mirror of
https://github.com/serai-dex/serai.git
synced 2025-03-12 09:26:51 +00:00
Add a TributaryState struct with higher-level DB logic
This commit is contained in:
parent
7c10873cd5
commit
0198d4cc46
5 changed files with 142 additions and 97 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -8188,6 +8188,7 @@ name = "serai-coordinator"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bincode",
|
||||
"blake2",
|
||||
"ciphersuite",
|
||||
"env_logger",
|
||||
|
|
|
@ -42,6 +42,7 @@ tributary = { package = "tributary-chain", path = "./tributary" }
|
|||
serai-client = { path = "../substrate/client", features = ["serai"] }
|
||||
|
||||
hex = "0.4"
|
||||
bincode = "1"
|
||||
serde_json = { version = "1", default-features = false }
|
||||
|
||||
log = "0.4"
|
||||
|
|
|
@ -283,8 +283,7 @@ async fn dkg_test() {
|
|||
let mut txs = vec![];
|
||||
for key in keys.iter() {
|
||||
let attempt = 0;
|
||||
// This is fine to re-use the one DB as such, due to exactly how this specific call is coded,
|
||||
// albeit poor
|
||||
let (mut scanner_db, _) = new_processors(key, &spec, &tributaries[0].1).await;
|
||||
let mut txn = scanner_db.0.txn();
|
||||
let share =
|
||||
crate::tributary::generated_key_pair::<MemDb>(&mut txn, key, &spec, &key_pair, 0).unwrap();
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
use std::io::Read;
|
||||
use core::{marker::PhantomData, ops::Deref};
|
||||
use std::{io::Read, collections::HashMap};
|
||||
|
||||
use scale::{Encode, Decode};
|
||||
|
||||
use zeroize::Zeroizing;
|
||||
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
|
||||
use frost::Participant;
|
||||
|
||||
use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair};
|
||||
|
||||
pub use serai_db::*;
|
||||
|
||||
use crate::tributary::TributarySpec;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
pub enum Topic {
|
||||
Dkg,
|
||||
|
@ -125,6 +130,29 @@ impl<D: Db> TributaryDb<D> {
|
|||
})
|
||||
}
|
||||
|
||||
fn confirmation_nonces_key(genesis: [u8; 32], attempt: u32) -> Vec<u8> {
|
||||
Self::tributary_key(b"confirmation_nonces", (genesis, attempt).encode())
|
||||
}
|
||||
pub fn save_confirmation_nonces(
|
||||
txn: &mut D::Transaction<'_>,
|
||||
genesis: [u8; 32],
|
||||
attempt: u32,
|
||||
nonces: HashMap<Participant, Vec<u8>>,
|
||||
) {
|
||||
let nonces =
|
||||
nonces.into_iter().map(|(key, value)| (u16::from(key), value)).collect::<HashMap<_, _>>();
|
||||
txn.put(Self::confirmation_nonces_key(genesis, attempt), bincode::serialize(&nonces).unwrap())
|
||||
}
|
||||
pub fn confirmation_nonces<G: Get>(
|
||||
getter: &G,
|
||||
genesis: [u8; 32],
|
||||
attempt: u32,
|
||||
) -> Option<HashMap<Participant, Vec<u8>>> {
|
||||
let bytes = getter.get(Self::confirmation_nonces_key(genesis, attempt))?;
|
||||
let map: HashMap<u16, Vec<u8>> = bincode::deserialize(&bytes).unwrap();
|
||||
Some(map.into_iter().map(|(key, value)| (Participant::new(key).unwrap(), value)).collect())
|
||||
}
|
||||
|
||||
// The key pair which we're actively working on completing
|
||||
fn currently_completing_key_pair_key(genesis: [u8; 32]) -> Vec<u8> {
|
||||
Self::tributary_key(b"currently_completing_key_pair", genesis)
|
||||
|
@ -221,3 +249,65 @@ impl<D: Db> TributaryDb<D> {
|
|||
txn.put(Self::event_key(&id, index), []);
|
||||
}
|
||||
}
|
||||
|
||||
pub enum DataSet {
|
||||
Participating(HashMap<Participant, Vec<u8>>),
|
||||
NotParticipating,
|
||||
}
|
||||
|
||||
pub enum Accumulation {
|
||||
Ready(DataSet),
|
||||
NotReady,
|
||||
}
|
||||
|
||||
pub struct TributaryState<D: Db>(PhantomData<D>);
|
||||
impl<D: Db> TributaryState<D> {
|
||||
pub fn accumulate(
|
||||
txn: &mut D::Transaction<'_>,
|
||||
our_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
spec: &TributarySpec,
|
||||
data_spec: &DataSpecification,
|
||||
signer: <Ristretto as Ciphersuite>::G,
|
||||
data: &[u8],
|
||||
) -> Accumulation {
|
||||
if TributaryDb::<D>::data(txn, spec.genesis(), data_spec, signer).is_some() {
|
||||
panic!("accumulating data for a participant multiple times");
|
||||
}
|
||||
let received = TributaryDb::<D>::set_data(txn, spec.genesis(), data_spec, signer, data);
|
||||
|
||||
// If we have all the needed commitments/preprocesses/shares, tell the processor
|
||||
// TODO: This needs to be coded by weight, not by validator count
|
||||
let needed = if data_spec.topic == Topic::Dkg { spec.n() } else { spec.t() };
|
||||
if received == needed {
|
||||
return Accumulation::Ready({
|
||||
let mut data = HashMap::new();
|
||||
for validator in spec.validators().iter().map(|validator| validator.0) {
|
||||
data.insert(
|
||||
spec.i(validator).unwrap(),
|
||||
if let Some(data) = TributaryDb::<D>::data(txn, spec.genesis(), data_spec, validator) {
|
||||
data
|
||||
} else {
|
||||
continue;
|
||||
},
|
||||
);
|
||||
}
|
||||
assert_eq!(data.len(), usize::from(needed));
|
||||
|
||||
// Remove our own piece of data, if we were involved
|
||||
if data
|
||||
.remove(
|
||||
&spec
|
||||
.i(Ristretto::generator() * our_key.deref())
|
||||
.expect("handling a message for a Tributary we aren't part of"),
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
DataSet::Participating(data)
|
||||
} else {
|
||||
DataSet::NotParticipating
|
||||
}
|
||||
});
|
||||
}
|
||||
Accumulation::NotReady
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use core::{ops::Deref, future::Future};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
|
@ -21,12 +20,13 @@ use processor_messages::{
|
|||
sign::{self, SignId},
|
||||
};
|
||||
|
||||
use serai_db::{Get, Db};
|
||||
use serai_db::Db;
|
||||
|
||||
use crate::{
|
||||
processors::Processors,
|
||||
tributary::{
|
||||
Transaction, TributarySpec, Topic, DataSpecification, TributaryDb,
|
||||
Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, DataSet, Accumulation,
|
||||
TributaryState,
|
||||
nonce_decider::NonceDecider,
|
||||
dkg_confirmer::DkgConfirmer,
|
||||
scanner::{RecognizedIdType, RIDTrait},
|
||||
|
@ -46,41 +46,6 @@ const BATCH_SHARE: &str = "b_share";
|
|||
const SIGN_PREPROCESS: &str = "s_preprocess";
|
||||
const SIGN_SHARE: &str = "s_share";
|
||||
|
||||
fn read_known_to_exist_data<D: Db, G: Get>(
|
||||
getter: &G,
|
||||
spec: &TributarySpec,
|
||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
data_spec: &DataSpecification,
|
||||
needed: u16,
|
||||
) -> Option<HashMap<Participant, Vec<u8>>> {
|
||||
let mut data = HashMap::new();
|
||||
for validator in spec.validators().iter().map(|validator| validator.0) {
|
||||
data.insert(
|
||||
spec.i(validator).unwrap(),
|
||||
if let Some(data) = TributaryDb::<D>::data(getter, spec.genesis(), data_spec, validator) {
|
||||
data
|
||||
} else {
|
||||
continue;
|
||||
},
|
||||
);
|
||||
}
|
||||
assert_eq!(data.len(), usize::from(needed));
|
||||
|
||||
// Remove our own piece of data, if we were involved
|
||||
if data
|
||||
.remove(
|
||||
&spec
|
||||
.i(Ristretto::generator() * key.deref())
|
||||
.expect("handling a message for a Tributary we aren't part of"),
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
Some(data)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dkg_confirmation_nonces(
|
||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
spec: &TributarySpec,
|
||||
|
@ -98,16 +63,7 @@ pub fn generated_key_pair<D: Db>(
|
|||
attempt: u32,
|
||||
) -> Result<[u8; 32], Participant> {
|
||||
TributaryDb::<D>::save_currently_completing_key_pair(txn, spec.genesis(), key_pair);
|
||||
|
||||
let Some(preprocesses) = read_known_to_exist_data::<D, _>(
|
||||
txn,
|
||||
spec,
|
||||
key,
|
||||
&DataSpecification { topic: Topic::Dkg, label: DKG_CONFIRMATION_NONCES, attempt },
|
||||
spec.n(),
|
||||
) else {
|
||||
panic!("wasn't a participant in confirming a key pair");
|
||||
};
|
||||
let preprocesses = TributaryDb::<D>::confirmation_nonces(txn, spec.genesis(), attempt).unwrap();
|
||||
DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair)
|
||||
}
|
||||
|
||||
|
@ -152,19 +108,19 @@ pub(crate) async fn handle_application_tx<
|
|||
signed.signer.to_bytes(),
|
||||
"published data for ID without an attempt",
|
||||
);
|
||||
return None;
|
||||
return Accumulation::NotReady;
|
||||
};
|
||||
|
||||
// If they've already published a TX for this attempt, slash
|
||||
if TributaryDb::<D>::data(txn, genesis, data_spec, signed.signer).is_some() {
|
||||
fatal_slash::<D>(txn, genesis, signed.signer.to_bytes(), "published data multiple times");
|
||||
return None;
|
||||
return Accumulation::NotReady;
|
||||
}
|
||||
|
||||
// If the attempt is lesser than the blockchain's, slash
|
||||
if data_spec.attempt < curr_attempt {
|
||||
// TODO: Slash for being late
|
||||
return None;
|
||||
return Accumulation::NotReady;
|
||||
}
|
||||
// If the attempt is greater, this is a premature publication, full slash
|
||||
if data_spec.attempt > curr_attempt {
|
||||
|
@ -174,7 +130,7 @@ pub(crate) async fn handle_application_tx<
|
|||
signed.signer.to_bytes(),
|
||||
"published data with an attempt which hasn't started",
|
||||
);
|
||||
return None;
|
||||
return Accumulation::NotReady;
|
||||
}
|
||||
|
||||
// TODO: We can also full slash if shares before all commitments, or share before the
|
||||
|
@ -182,16 +138,8 @@ pub(crate) async fn handle_application_tx<
|
|||
|
||||
// TODO: If this is shares, we need to check they are part of the selected signing set
|
||||
|
||||
// Store this data
|
||||
let received = TributaryDb::<D>::set_data(txn, genesis, data_spec, signed.signer, &bytes);
|
||||
|
||||
// If we have all the needed commitments/preprocesses/shares, tell the processor
|
||||
// TODO: This needs to be coded by weight, not by validator count
|
||||
let needed = if data_spec.topic == Topic::Dkg { spec.n() } else { spec.t() };
|
||||
if received == needed {
|
||||
return Some(read_known_to_exist_data::<D, _>(txn, spec, key, data_spec, needed));
|
||||
}
|
||||
None
|
||||
// Accumulate this data
|
||||
TributaryState::<D>::accumulate(txn, key, spec, data_spec, signed.signer, &bytes)
|
||||
};
|
||||
|
||||
match tx {
|
||||
|
@ -202,7 +150,7 @@ pub(crate) async fn handle_application_tx<
|
|||
bytes,
|
||||
&signed,
|
||||
) {
|
||||
Some(Some(commitments)) => {
|
||||
Accumulation::Ready(DataSet::Participating(commitments)) => {
|
||||
log::info!("got all DkgCommitments for {}", hex::encode(genesis));
|
||||
processors
|
||||
.send(
|
||||
|
@ -214,8 +162,10 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => panic!("wasn't a participant in DKG commitments"),
|
||||
None => {}
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {
|
||||
panic!("wasn't a participant in DKG commitments")
|
||||
}
|
||||
Accumulation::NotReady => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,9 +207,16 @@ pub(crate) async fn handle_application_tx<
|
|||
bytes,
|
||||
&signed,
|
||||
) {
|
||||
Some(Some(shares)) => {
|
||||
Accumulation::Ready(DataSet::Participating(shares)) => {
|
||||
log::info!("got all DkgShares for {}", hex::encode(genesis));
|
||||
assert!(confirmation_nonces.is_some());
|
||||
|
||||
let Accumulation::Ready(DataSet::Participating(confirmation_nonces)) =
|
||||
confirmation_nonces
|
||||
else {
|
||||
panic!("got all DKG shares yet confirmation nonces aren't Ready(Participating(_))");
|
||||
};
|
||||
TributaryDb::<D>::save_confirmation_nonces(txn, genesis, attempt, confirmation_nonces);
|
||||
|
||||
processors
|
||||
.send(
|
||||
spec.set().network,
|
||||
|
@ -270,8 +227,10 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => panic!("wasn't a participant in DKG shares"),
|
||||
None => assert!(confirmation_nonces.is_none()),
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {
|
||||
panic!("wasn't a participant in DKG shares")
|
||||
}
|
||||
Accumulation::NotReady => assert!(matches!(confirmation_nonces, Accumulation::NotReady)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -282,19 +241,12 @@ pub(crate) async fn handle_application_tx<
|
|||
shares.to_vec(),
|
||||
&signed,
|
||||
) {
|
||||
Some(Some(shares)) => {
|
||||
Accumulation::Ready(DataSet::Participating(shares)) => {
|
||||
log::info!("got all DkgConfirmed for {}", hex::encode(genesis));
|
||||
|
||||
let Some(preprocesses) = read_known_to_exist_data::<D, _>(
|
||||
txn,
|
||||
spec,
|
||||
key,
|
||||
&DataSpecification { topic: Topic::Dkg, label: DKG_CONFIRMATION_NONCES, attempt },
|
||||
spec.n(),
|
||||
) else {
|
||||
panic!("wasn't a participant in DKG confirmation nonces");
|
||||
};
|
||||
|
||||
let preprocesses = TributaryDb::<D>::confirmation_nonces(txn, genesis, attempt).unwrap();
|
||||
// TODO: This can technically happen under very very very specific timing as the txn put
|
||||
// happens before DkgConfirmed, yet the txn commit isn't guaranteed to
|
||||
let key_pair = TributaryDb::<D>::currently_completing_key_pair(txn, genesis)
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
|
@ -314,8 +266,10 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => panic!("wasn't a participant in DKG confirmination shares"),
|
||||
None => {}
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {
|
||||
panic!("wasn't a participant in DKG confirmination shares")
|
||||
}
|
||||
Accumulation::NotReady => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -350,7 +304,7 @@ pub(crate) async fn handle_application_tx<
|
|||
data.data,
|
||||
&data.signed,
|
||||
) {
|
||||
Some(Some(preprocesses)) => {
|
||||
Accumulation::Ready(DataSet::Participating(preprocesses)) => {
|
||||
NonceDecider::<D>::selected_for_signing_batch(txn, genesis, data.plan);
|
||||
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
|
||||
processors
|
||||
|
@ -363,8 +317,8 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => {}
|
||||
None => {}
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {}
|
||||
Accumulation::NotReady => {}
|
||||
}
|
||||
}
|
||||
Transaction::BatchShare(data) => {
|
||||
|
@ -378,7 +332,7 @@ pub(crate) async fn handle_application_tx<
|
|||
data.data,
|
||||
&data.signed,
|
||||
) {
|
||||
Some(Some(shares)) => {
|
||||
Accumulation::Ready(DataSet::Participating(shares)) => {
|
||||
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
|
||||
processors
|
||||
.send(
|
||||
|
@ -393,8 +347,8 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => {}
|
||||
None => {}
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {}
|
||||
Accumulation::NotReady => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -410,7 +364,7 @@ pub(crate) async fn handle_application_tx<
|
|||
data.data,
|
||||
&data.signed,
|
||||
) {
|
||||
Some(Some(preprocesses)) => {
|
||||
Accumulation::Ready(DataSet::Participating(preprocesses)) => {
|
||||
NonceDecider::<D>::selected_for_signing_plan(txn, genesis, data.plan);
|
||||
processors
|
||||
.send(
|
||||
|
@ -429,8 +383,8 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => {}
|
||||
None => {}
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {}
|
||||
Accumulation::NotReady => {}
|
||||
}
|
||||
}
|
||||
Transaction::SignShare(data) => {
|
||||
|
@ -445,7 +399,7 @@ pub(crate) async fn handle_application_tx<
|
|||
data.data,
|
||||
&data.signed,
|
||||
) {
|
||||
Some(Some(shares)) => {
|
||||
Accumulation::Ready(DataSet::Participating(shares)) => {
|
||||
processors
|
||||
.send(
|
||||
spec.set().network,
|
||||
|
@ -463,8 +417,8 @@ pub(crate) async fn handle_application_tx<
|
|||
)
|
||||
.await;
|
||||
}
|
||||
Some(None) => {}
|
||||
None => {}
|
||||
Accumulation::Ready(DataSet::NotParticipating) => {}
|
||||
Accumulation::NotReady => {}
|
||||
}
|
||||
}
|
||||
Transaction::SignCompleted { plan, tx_hash, .. } => {
|
||||
|
|
Loading…
Reference in a new issue