diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 6b5b8fcc..adaf92a8 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -38,7 +38,7 @@ use ::tributary::{ mod tributary; use crate::tributary::{ - TributarySpec, SignData, Transaction, TributaryDb, NonceDecider, scanner::RecognizedIdType, + TributarySpec, SignData, Transaction, NonceDecider, scanner::RecognizedIdType, PlanIds, }; mod db; @@ -256,7 +256,7 @@ async fn handle_processor_message( .iter() .filter_map(|plan| Some(plan.id).filter(|_| plan.key == key)) .collect::>(); - TributaryDb::::set_plan_ids(&mut txn, tributary.spec.genesis(), *block, &plans); + PlanIds::set(&mut txn, &tributary.spec.genesis(), *block, &plans); let tx = Transaction::SubstrateBlock(*block); log::trace!( @@ -472,7 +472,7 @@ async fn handle_processor_message( // As for the safety of calling error_generating_key_pair, the processor is presumed // to only send InvalidShare or GeneratedKeyPair for a given attempt let mut txs = if let Some(faulty) = - crate::tributary::error_generating_key_pair::(&txn, key, spec, id.attempt) + crate::tributary::error_generating_key_pair::<_>(&txn, key, spec, id.attempt) { vec![Transaction::RemoveParticipant(faulty)] } else { diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index e34ca199..032b8740 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -30,7 +30,7 @@ use tokio::{sync::mpsc, time::sleep}; use crate::{ Db, processors::Processors, - tributary::{TributarySpec, SeraiBlockNumber, TributaryDb}, + tributary::{TributarySpec, SeraiBlockNumber, KeyPairDb}, }; mod db; @@ -305,7 +305,7 @@ async fn handle_block( // Immediately ensure this key pair is accessible to the tributary, before we fire any // events off of it let mut txn = db.0.txn(); - TributaryDb::::set_key_pair(&mut txn, set, &key_pair); + KeyPairDb::set(&mut txn, set, &key_pair); txn.commit(); handle_key_gen(&mut db.0, processors, serai, &block, set, key_pair).await?; diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index b96468d0..8434f334 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -13,7 +13,7 @@ use sp_runtime::traits::Verify; use tokio::time::sleep; -use serai_db::{DbTxn, Db, MemDb}; +use serai_db::{Db, MemDb, DbTxn}; use processor_messages::{ key_gen::{self, KeyGenId}, @@ -23,7 +23,7 @@ use processor_messages::{ use tributary::{TransactionTrait, Tributary}; use crate::{ - tributary::{TributaryDb, Transaction, TributarySpec, scanner::handle_new_blocks}, + tributary::{Transaction, TributarySpec, scanner::handle_new_blocks}, tests::{ MemProcessors, LocalP2p, tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion}, @@ -81,8 +81,8 @@ async fn dkg_test() { key: &Zeroizing<::F>, spec: &TributarySpec, tributary: &Tributary, - ) -> (TributaryDb, MemProcessors) { - let mut scanner_db = TributaryDb(MemDb::new()); + ) -> (MemDb, MemProcessors) { + let mut scanner_db = MemDb::new(); let processors = MemProcessors::new(); handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, @@ -289,7 +289,7 @@ async fn dkg_test() { if i != 0 { scanner_db = &mut local_scanner_db; } - let mut txn = scanner_db.0.txn(); + let mut txn = scanner_db.txn(); let share = crate::tributary::generated_key_pair::(&mut txn, key, &spec, &key_pair, 0).unwrap(); txn.commit(); diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index f231c0a0..20421f47 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -1,26 +1,20 @@ -use core::{marker::PhantomData, ops::Deref}; -use std::{io::Read, collections::HashMap}; - -use scale::{Encode, Decode}; +use core::ops::Deref; +use std::collections::HashMap; use zeroize::Zeroizing; -use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; +use ciphersuite::{Ciphersuite, Ristretto, group::GroupEncoding}; use frost::Participant; use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair}; use processor_messages::coordinator::SubstrateSignableId; +use scale::Encode; + pub use serai_db::*; use crate::tributary::TributarySpec; -create_db! { - NewTributaryDb { - SeraiBlockNumber: (hash: [u8; 32]) -> u64, - } -} - #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum Topic { Dkg, @@ -58,7 +52,7 @@ pub struct DataSpecification { } impl DataSpecification { - fn as_key(&self, genesis: [u8; 32]) -> Vec { + pub fn as_key(&self, genesis: [u8; 32]) -> Vec { let mut res = self.topic.as_key(genesis); let label_bytes = self.label.bytes(); res.push(u8::try_from(label_bytes.len()).unwrap()); @@ -68,226 +62,6 @@ impl DataSpecification { } } -#[derive(Debug)] -pub struct TributaryDb(pub D); -impl TributaryDb { - pub fn new(db: D) -> Self { - Self(db) - } - - fn tributary_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"coordinator_tributary", dst, key) - } - - // Last block scanned - fn last_block_key(genesis: [u8; 32]) -> Vec { - Self::tributary_key(b"block", genesis) - } - pub fn set_last_block(&mut self, genesis: [u8; 32], block: [u8; 32]) { - let mut txn = self.0.txn(); - txn.put(Self::last_block_key(genesis), block); - txn.commit(); - } - pub fn last_block(&self, genesis: [u8; 32]) -> [u8; 32] { - self - .0 - .get(Self::last_block_key(genesis)) - .map(|last| last.try_into().unwrap()) - .unwrap_or(genesis) - } - - // If a validator has been fatally slashed - fn fatal_slashes_key(genesis: [u8; 32]) -> Vec { - Self::tributary_key(b"fatal_slashes", genesis) - } - fn fatally_slashed_key(genesis: [u8; 32], account: [u8; 32]) -> Vec { - Self::tributary_key(b"fatally_slashed", (genesis, account).encode()) - } - pub fn is_fatally_slashed(getter: &G, genesis: [u8; 32], account: [u8; 32]) -> bool { - getter.get(Self::fatally_slashed_key(genesis, account)).is_some() - } - pub fn set_fatally_slashed(txn: &mut D::Transaction<'_>, genesis: [u8; 32], account: [u8; 32]) { - txn.put(Self::fatally_slashed_key(genesis, account), []); - - let key = Self::fatal_slashes_key(genesis); - let mut existing = txn.get(&key).unwrap_or(vec![]); - - // Don't append if we already have it - if existing.chunks(32).any(|existing| existing == account) { - return; - } - - existing.extend(account); - txn.put(key, existing); - } - - fn share_for_blame_key(genesis: &[u8], from: Participant, to: Participant) -> Vec { - Self::tributary_key(b"share_for_blame", (genesis, u16::from(from), u16::from(to)).encode()) - } - pub fn save_share_for_blame( - txn: &mut D::Transaction<'_>, - genesis: &[u8], - from: Participant, - to: Participant, - share: &[u8], - ) { - txn.put(Self::share_for_blame_key(genesis, from, to), share); - } - pub fn share_for_blame( - getter: &G, - genesis: &[u8], - from: Participant, - to: Participant, - ) -> Option> { - getter.get(Self::share_for_blame_key(genesis, from, to)) - } - - // The plan IDs associated with a Substrate block - fn plan_ids_key(genesis: &[u8], block: u64) -> Vec { - Self::tributary_key(b"plan_ids", [genesis, block.to_le_bytes().as_ref()].concat()) - } - pub fn set_plan_ids( - txn: &mut D::Transaction<'_>, - genesis: [u8; 32], - block: u64, - plans: &[[u8; 32]], - ) { - txn.put(Self::plan_ids_key(&genesis, block), plans.concat()); - } - pub fn plan_ids(getter: &G, genesis: [u8; 32], block: u64) -> Option> { - getter.get(Self::plan_ids_key(&genesis, block)).map(|bytes| { - let mut res = vec![]; - let mut bytes_ref: &[u8] = bytes.as_ref(); - while !bytes_ref.is_empty() { - let mut id = [0; 32]; - bytes_ref.read_exact(&mut id).unwrap(); - res.push(id); - } - res - }) - } - - fn confirmation_nonces_key(genesis: [u8; 32], attempt: u32) -> Vec { - Self::tributary_key(b"confirmation_nonces", (genesis, attempt).encode()) - } - pub fn save_confirmation_nonces( - txn: &mut D::Transaction<'_>, - genesis: [u8; 32], - attempt: u32, - nonces: HashMap>, - ) { - let nonces = - nonces.into_iter().map(|(key, value)| (u16::from(key), value)).collect::>(); - txn.put(Self::confirmation_nonces_key(genesis, attempt), bincode::serialize(&nonces).unwrap()) - } - pub fn confirmation_nonces( - getter: &G, - genesis: [u8; 32], - attempt: u32, - ) -> Option>> { - let bytes = getter.get(Self::confirmation_nonces_key(genesis, attempt))?; - let map: HashMap> = bincode::deserialize(&bytes).unwrap(); - Some(map.into_iter().map(|(key, value)| (Participant::new(key).unwrap(), value)).collect()) - } - - // The key pair which we're actively working on completing - fn currently_completing_key_pair_key(genesis: [u8; 32]) -> Vec { - Self::tributary_key(b"currently_completing_key_pair", genesis) - } - pub fn save_currently_completing_key_pair( - txn: &mut D::Transaction<'_>, - genesis: [u8; 32], - key_pair: &KeyPair, - ) { - txn.put(Self::currently_completing_key_pair_key(genesis), key_pair.encode()) - } - pub fn currently_completing_key_pair(getter: &G, genesis: [u8; 32]) -> Option { - getter - .get(Self::currently_completing_key_pair_key(genesis)) - .map(|bytes| KeyPair::decode(&mut bytes.as_slice()).unwrap()) - } - - // The key pair confirmed for this Tributary - pub fn key_pair_key(set: ValidatorSet) -> Vec { - Self::tributary_key(b"key_pair", set.encode()) - } - pub fn set_key_pair(txn: &mut D::Transaction<'_>, set: ValidatorSet, key_pair: &KeyPair) { - txn.put(Self::key_pair_key(set), key_pair.encode()); - } - pub fn key_pair(getter: &G, set: ValidatorSet) -> Option { - Some(KeyPair::decode(&mut getter.get(Self::key_pair_key(set))?.as_slice()).unwrap()) - } - - // The current attempt to resolve a topic - fn attempt_key(genesis: [u8; 32], topic: Topic) -> Vec { - Self::tributary_key(b"attempt", topic.as_key(genesis)) - } - pub fn recognize_topic(txn: &mut D::Transaction<'_>, genesis: [u8; 32], topic: Topic) { - txn.put(Self::attempt_key(genesis, topic), 0u32.to_le_bytes()) - } - pub fn attempt(getter: &G, genesis: [u8; 32], topic: Topic) -> Option { - let attempt_bytes = getter.get(Self::attempt_key(genesis, topic)); - // DKGs start when the chain starts - if attempt_bytes.is_none() && (topic == Topic::Dkg) { - return Some(0); - } - Some(u32::from_le_bytes(attempt_bytes?.try_into().unwrap())) - } - - // Key for the amount of instances received thus far - fn data_received_key(genesis: [u8; 32], data_spec: &DataSpecification) -> Vec { - Self::tributary_key(b"data_received", data_spec.as_key(genesis)) - } - // Key for an instance of data from a specific validator - fn data_key( - genesis: [u8; 32], - data_spec: &DataSpecification, - signer: ::G, - ) -> Vec { - Self::tributary_key( - b"data", - [data_spec.as_key(genesis).as_slice(), signer.to_bytes().as_ref()].concat(), - ) - } - pub fn data( - getter: &G, - genesis: [u8; 32], - data_spec: &DataSpecification, - signer: ::G, - ) -> Option> { - getter.get(Self::data_key(genesis, data_spec, signer)) - } - fn set_data( - txn: &mut D::Transaction<'_>, - genesis: [u8; 32], - data_spec: &DataSpecification, - signer: ::G, - signer_shares: u16, - data: &[u8], - ) -> (u16, u16) { - let received_key = Self::data_received_key(genesis, data_spec); - let prior_received = - u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap()); - let received = prior_received + signer_shares; - - txn.put(received_key, received.to_le_bytes()); - txn.put(Self::data_key(genesis, data_spec, signer), data); - - (prior_received, received) - } - - fn event_key(id: &[u8], index: u32) -> Vec { - Self::tributary_key(b"event", [id, index.to_le_bytes().as_ref()].concat()) - } - pub fn handled_event(getter: &G, id: [u8; 32], index: u32) -> bool { - getter.get(Self::event_key(&id, index)).is_some() - } - pub fn handle_event(txn: &mut D::Transaction<'_>, id: [u8; 32], index: u32) { - assert!(!Self::handled_event(txn, id, index)); - txn.put(Self::event_key(&id, index), []); - } -} - pub enum DataSet { Participating(HashMap>), NotParticipating, @@ -298,17 +72,81 @@ pub enum Accumulation { NotReady, } -pub struct TributaryState(PhantomData); -impl TributaryState { +create_db!( + NewTributary { + SeraiBlockNumber: (hash: [u8; 32]) -> u64, + LastBlock: (genesis: [u8; 32]) -> [u8; 32], + FatalSlashes: (genesis: [u8; 32]) -> Vec, + FatallySlashed: (genesis: [u8; 32], account: [u8; 32]) -> (), + ShareBlame: (genesis: [u8; 32], from: u16, to: u16) -> Vec, + PlanIds: (genesis: &[u8], block: u64) -> Vec<[u8; 32]>, + ConfirmationNonces: (genesis: [u8; 32], attempt: u32) -> HashMap>, + CurrentlyCompletingKeyPair: (genesis: [u8; 32]) -> KeyPair, + KeyPairDb: (set: ValidatorSet) -> KeyPair, + AttemptDb: (genesis: [u8; 32], topic_key: &Vec) -> u32, + DataReceived: (genesis: [u8; 32], data_spec_key: &Vec) -> u16, + DataDb: (genesis: [u8; 32], data_spec_key: &Vec, signer_bytes: &[u8; 32]) -> Vec, + EventDb: (id: [u8; 32], index: u32) -> (), + } +); + +impl FatallySlashed { + pub fn set_fatally_slashed(txn: &mut impl DbTxn, genesis: [u8; 32], account: [u8; 32]) { + Self::set(txn, genesis, account, &()); + let mut existing = FatalSlashes::get(txn, genesis).unwrap_or_default(); + + // Don't append if we already have it + if existing.chunks(32).any(|existing| existing == account) { + return; + } + + existing.extend(account); + FatalSlashes::set(txn, genesis, &existing); + } +} + +impl AttemptDb { + pub fn recognize_topic(txn: &mut impl DbTxn, genesis: [u8; 32], topic: Topic) { + Self::set(txn, genesis, &topic.as_key(genesis), &0u32); + } + + pub fn attempt(getter: &impl Get, genesis: [u8; 32], topic: Topic) -> Option { + let attempt = Self::get(getter, genesis, &topic.as_key(genesis)); + if attempt.is_none() && topic == Topic::Dkg { + return Some(0); + } + attempt + } +} + +impl DataDb { + pub fn set_data( + txn: &mut impl DbTxn, + genesis: [u8; 32], + data_spec: &DataSpecification, + signer: ::G, + signer_shares: u16, + data: &Vec, + ) -> (u16, u16) { + let data_spec = data_spec.as_key(genesis); + let prior_received = DataReceived::get(txn, genesis, &data_spec).unwrap_or_default(); + let received = prior_received + signer_shares; + DataReceived::set(txn, genesis, &data_spec, &received); + DataDb::set(txn, genesis, &data_spec, &signer.to_bytes(), data); + (prior_received, received) + } + pub fn accumulate( - txn: &mut D::Transaction<'_>, + txn: &mut impl DbTxn, our_key: &Zeroizing<::F>, spec: &TributarySpec, data_spec: &DataSpecification, signer: ::G, - data: &[u8], + data: &Vec, ) -> Accumulation { - if TributaryDb::::data(txn, spec.genesis(), data_spec, signer).is_some() { + let genesis = spec.genesis(); + let data_spec_key = data_spec.as_key(genesis); + if Self::get(txn, genesis, &data_spec_key, &signer.to_bytes()).is_some() { panic!("accumulating data for a participant multiple times"); } let signer_shares = { @@ -317,7 +155,7 @@ impl TributaryState { u16::from(signer_i.end) - u16::from(signer_i.start) }; let (prior_received, now_received) = - TributaryDb::::set_data(txn, spec.genesis(), data_spec, signer, signer_shares, data); + Self::set_data(txn, spec.genesis(), data_spec, signer, signer_shares, data); // If we have all the needed commitments/preprocesses/shares, tell the processor let needed = if data_spec.topic == Topic::Dkg { spec.n() } else { spec.t() }; @@ -327,13 +165,14 @@ impl TributaryState { for validator in spec.validators().iter().map(|validator| validator.0) { data.insert( spec.i(validator).unwrap().start, - if let Some(data) = TributaryDb::::data(txn, spec.genesis(), data_spec, validator) { + if let Some(data) = Self::get(txn, genesis, &data_spec_key, &validator.to_bytes()) { data } else { continue; }, ); } + assert_eq!(data.len(), usize::from(needed)); // Remove our own piece of data, if we were involved @@ -355,3 +194,10 @@ impl TributaryState { Accumulation::NotReady } } + +impl EventDb { + pub fn handle_event(txn: &mut impl DbTxn, id: [u8; 32], index: u32) { + assert!(Self::get(txn, id, index).is_none()); + Self::set(txn, id, index, &()); + } +} diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index f94f5e84..63b5ac2b 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -27,14 +27,16 @@ use serai_db::{Get, Db}; use crate::{ processors::Processors, tributary::{ - Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, TributaryDb, DataSet, - Accumulation, TributaryState, + Transaction, TributarySpec, SeraiBlockNumber, Topic, DataSpecification, DataSet, Accumulation, nonce_decider::NonceDecider, dkg_confirmer::DkgConfirmer, scanner::{RecognizedIdType, RIDTrait}, + FatallySlashed, ShareBlame, PlanIds, ConfirmationNonces, KeyPairDb, AttemptDb, DataDb, }, }; +use super::CurrentlyCompletingKeyPair; + const DKG_COMMITMENTS: &str = "commitments"; const DKG_SHARES: &str = "shares"; const DKG_CONFIRMATION_NONCES: &str = "confirmation_nonces"; @@ -61,14 +63,13 @@ pub fn dkg_confirmation_nonces( // // The caller must ensure only error_generating_key_pair or generated_key_pair is called for a // given attempt. -pub fn error_generating_key_pair( +pub fn error_generating_key_pair( getter: &G, key: &Zeroizing<::F>, spec: &TributarySpec, attempt: u32, ) -> Option { - let preprocesses = - TributaryDb::::confirmation_nonces(getter, spec.genesis(), attempt).unwrap(); + let preprocesses = ConfirmationNonces::get(getter, spec.genesis(), attempt).unwrap(); // Sign a key pair which can't be valid // (0xff used as 0 would be the Ristretto identity point, 0-length for the network key) @@ -90,8 +91,8 @@ pub fn generated_key_pair( key_pair: &KeyPair, attempt: u32, ) -> Result<[u8; 32], Participant> { - TributaryDb::::save_currently_completing_key_pair(txn, spec.genesis(), key_pair); - let preprocesses = TributaryDb::::confirmation_nonces(txn, spec.genesis(), attempt).unwrap(); + CurrentlyCompletingKeyPair::set(txn, spec.genesis(), key_pair); + let preprocesses = ConfirmationNonces::get(txn, spec.genesis(), attempt).unwrap(); DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair) } @@ -102,7 +103,7 @@ pub(super) fn fatal_slash( reason: &str, ) { log::warn!("fatally slashing {}. reason: {}", hex::encode(account), reason); - TributaryDb::::set_fatally_slashed(txn, genesis, account); + FatallySlashed::set_fatally_slashed(txn, genesis, account); // TODO: disconnect the node from network/ban from further participation in all Tributaries // TODO: If during DKG, trigger a re-attempt @@ -155,7 +156,7 @@ pub(crate) async fn handle_application_tx< // TODO: Because fatally slashed participants can still publish onto the blockchain, they have // a notable DoS ability if let TransactionKind::Signed(signed) = tx.kind() { - if TributaryDb::::is_fatally_slashed(txn, genesis, signed.signer.to_bytes()) { + if FatallySlashed::get(txn, genesis, signed.signer.to_bytes()).is_some() { return; } } @@ -164,7 +165,7 @@ pub(crate) async fn handle_application_tx< data_spec: &DataSpecification, bytes: Vec, signed: &Signed| { - let Some(curr_attempt) = TributaryDb::::attempt(txn, genesis, data_spec.topic) else { + let Some(curr_attempt) = AttemptDb::attempt(txn, genesis, data_spec.topic) else { // Premature publication of a valid ID/publication of an invalid ID fatal_slash::( txn, @@ -176,7 +177,7 @@ pub(crate) async fn handle_application_tx< }; // If they've already published a TX for this attempt, slash - if TributaryDb::::data(txn, genesis, data_spec, signed.signer).is_some() { + if DataDb::get(txn, genesis, &data_spec.as_key(genesis), &signed.signer.to_bytes()).is_some() { fatal_slash::(txn, genesis, signed.signer.to_bytes(), "published data multiple times"); return Accumulation::NotReady; } @@ -203,7 +204,7 @@ pub(crate) async fn handle_application_tx< // TODO: If this is shares, we need to check they are part of the selected signing set // Accumulate this data - TributaryState::::accumulate(txn, key, spec, data_spec, signed.signer, &bytes) + DataDb::accumulate(txn, key, spec, data_spec, signed.signer, &bytes) }; fn check_sign_data_len( @@ -312,7 +313,7 @@ pub(crate) async fn handle_application_tx< } let to = Participant::new(to).unwrap(); - TributaryDb::::save_share_for_blame(txn, &genesis, from, to, share); + ShareBlame::set(txn, genesis, from.into(), to.into(), &share); } } } @@ -367,7 +368,7 @@ pub(crate) async fn handle_application_tx< else { panic!("got all DKG shares yet confirmation nonces aren't Ready(Participating(_))"); }; - TributaryDb::::save_confirmation_nonces(txn, genesis, attempt, confirmation_nonces); + ConfirmationNonces::set(txn, genesis, attempt, &confirmation_nonces); // shares is a HashMap>>>, with the values representing: // - Each of the sender's shares @@ -438,7 +439,7 @@ pub(crate) async fn handle_application_tx< return; } - let share = TributaryDb::::share_for_blame(txn, &genesis, accuser, faulty).unwrap(); + let share = ShareBlame::get(txn, genesis, accuser.into(), faulty.into()).unwrap(); processors .send( spec.set().network, @@ -463,16 +464,13 @@ pub(crate) async fn handle_application_tx< Accumulation::Ready(DataSet::Participating(shares)) => { log::info!("got all DkgConfirmed for {}", hex::encode(genesis)); - let preprocesses = TributaryDb::::confirmation_nonces(txn, genesis, attempt).unwrap(); + let preprocesses = ConfirmationNonces::get(txn, genesis, attempt).unwrap(); // TODO: This can technically happen under very very very specific timing as the txn put // happens before DkgConfirmed, yet the txn commit isn't guaranteed to - let key_pair = TributaryDb::::currently_completing_key_pair(txn, genesis) - .unwrap_or_else(|| { - panic!( - "in DkgConfirmed handling, which happens after everyone {}", - "(including us) fires DkgConfirmed, yet no confirming key pair" - ) - }); + let key_pair = CurrentlyCompletingKeyPair::get(txn, genesis).expect( + "in DkgConfirmed handling, which happens after everyone \ + (including us) fires DkgConfirmed, yet no confirming key pair", + ); let sig = match DkgConfirmer::complete(spec, key, attempt, preprocesses, &key_pair, shares) { Ok(sig) => sig, @@ -496,7 +494,7 @@ pub(crate) async fn handle_application_tx< } Transaction::CosignSubstrateBlock(hash) => { - TributaryDb::::recognize_topic( + AttemptDb::recognize_topic( txn, genesis, Topic::SubstrateSign(SubstrateSignableId::CosigningSubstrateBlock(hash)), @@ -508,7 +506,7 @@ pub(crate) async fn handle_application_tx< ); let key = loop { - let Some(key_pair) = TributaryDb::::key_pair(txn, spec.set()) else { + let Some(key_pair) = KeyPairDb::get(txn, spec.set()) else { // This can happen based on a timing condition log::warn!("CosignSubstrateBlock yet keys weren't set yet"); tokio::time::sleep(core::time::Duration::from_secs(1)).await; @@ -535,7 +533,7 @@ pub(crate) async fn handle_application_tx< Transaction::Batch(_, batch) => { // Because this Batch has achieved synchrony, its batch ID should be authorized - TributaryDb::::recognize_topic( + AttemptDb::recognize_topic( txn, genesis, Topic::SubstrateSign(SubstrateSignableId::Batch(batch)), @@ -546,14 +544,14 @@ pub(crate) async fn handle_application_tx< } Transaction::SubstrateBlock(block) => { - let plan_ids = TributaryDb::::plan_ids(txn, genesis, block).expect( + let plan_ids = PlanIds::get(txn, &genesis, block).expect( "synced a tributary block finalizing a substrate block in a provided transaction \ despite us not providing that transaction", ); let nonces = NonceDecider::handle_substrate_block(txn, genesis, &plan_ids); for (nonce, id) in nonces.into_iter().zip(plan_ids.into_iter()) { - TributaryDb::::recognize_topic(txn, genesis, Topic::Sign(id)); + AttemptDb::recognize_topic(txn, genesis, Topic::Sign(id)); recognized_id(spec.set(), genesis, RecognizedIdType::Plan, id.to_vec(), nonce).await; } } @@ -575,7 +573,7 @@ pub(crate) async fn handle_application_tx< Accumulation::Ready(DataSet::Participating(mut preprocesses)) => { unflatten(spec, &mut preprocesses); NonceDecider::selected_for_signing_substrate(txn, genesis, data.plan); - let key = TributaryDb::::key_pair(txn, spec.set()).unwrap().0 .0; + let key = KeyPairDb::get(txn, spec.set()).unwrap().0 .0; processors .send( spec.set().network, @@ -606,7 +604,7 @@ pub(crate) async fn handle_application_tx< ) { Accumulation::Ready(DataSet::Participating(mut shares)) => { unflatten(spec, &mut shares); - let key = TributaryDb::::key_pair(txn, spec.set()).unwrap().0 .0; + let key = KeyPairDb::get(txn, spec.set()).unwrap().0 .0; processors .send( spec.set().network, @@ -629,7 +627,7 @@ pub(crate) async fn handle_application_tx< let Ok(_) = check_sign_data_len::(txn, spec, data.signed.signer, data.data.len()) else { return; }; - let key_pair = TributaryDb::::key_pair(txn, spec.set()); + let key_pair = KeyPairDb::get(txn, spec.set()); match handle( txn, &DataSpecification { @@ -668,7 +666,7 @@ pub(crate) async fn handle_application_tx< let Ok(_) = check_sign_data_len::(txn, spec, data.signed.signer, data.data.len()) else { return; }; - let key_pair = TributaryDb::::key_pair(txn, spec.set()); + let key_pair = KeyPairDb::get(txn, spec.set()); match handle( txn, &DataSpecification { @@ -709,7 +707,7 @@ pub(crate) async fn handle_application_tx< hex::encode(plan) ); - if TributaryDb::::attempt(txn, genesis, Topic::Sign(plan)).is_none() { + if AttemptDb::attempt(txn, genesis, Topic::Sign(plan)).is_none() { fatal_slash::( txn, genesis, @@ -720,7 +718,7 @@ pub(crate) async fn handle_application_tx< }; // TODO: Confirm this signer hasn't prior published a completion - let Some(key_pair) = TributaryDb::::key_pair(txn, spec.set()) else { + let Some(key_pair) = KeyPairDb::get(txn, spec.set()) else { panic!("SignCompleted for recognized plan ID despite not having a key pair for this set") }; processors diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index 895c7754..40afe6e1 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -18,16 +18,18 @@ use tributary::{ }, }; -use serai_db::DbTxn; - use crate::{ Db, tributary::handle::{fatal_slash, handle_application_tx}, processors::Processors, - tributary::{TributaryDb, TributarySpec, Transaction}, + tributary::{TributarySpec, Transaction, EventDb}, P2p, }; +use super::LastBlock; + +use serai_db::DbTxn; + #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] pub enum RecognizedIdType { Batch, @@ -53,7 +55,7 @@ async fn handle_block< RID: RIDTrait, P: P2p, >( - db: &mut TributaryDb, + db: &mut D, key: &Zeroizing<::F>, recognized_id: RID, processors: &Pro, @@ -69,12 +71,12 @@ async fn handle_block< let mut event_id = 0; #[allow(clippy::explicit_counter_loop)] // event_id isn't TX index. It just currently lines up for tx in block.transactions { - if TributaryDb::::handled_event(&db.0, hash, event_id) { + if EventDb::get(db, hash, event_id).is_some() { event_id += 1; continue; } - let mut txn = db.0.txn(); + let mut txn = db.txn(); match tx { TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => { @@ -121,7 +123,7 @@ async fn handle_block< } } - TributaryDb::::handle_event(&mut txn, hash, event_id); + EventDb::handle_event(&mut txn, hash, event_id); txn.commit(); event_id += 1; @@ -139,7 +141,7 @@ pub(crate) async fn handle_new_blocks< RID: RIDTrait, P: P2p, >( - db: &mut TributaryDb, + db: &mut D, key: &Zeroizing<::F>, recognized_id: RID, processors: &Pro, @@ -148,7 +150,7 @@ pub(crate) async fn handle_new_blocks< tributary: &TributaryReader, ) { let genesis = tributary.genesis(); - let mut last_block = db.last_block(genesis); + let mut last_block = LastBlock::get(db, genesis).unwrap_or(genesis); while let Some(next) = tributary.block_after(&last_block) { let block = tributary.block(&next).unwrap(); @@ -176,7 +178,9 @@ pub(crate) async fn handle_new_blocks< ) .await; last_block = next; - db.set_last_block(genesis, next); + let mut txn = db.txn(); + LastBlock::set(&mut txn, genesis, &next); + txn.commit(); } } @@ -209,7 +213,7 @@ pub(crate) async fn scan_tributaries_task< async move { let spec = &spec; let reader = tributary.reader(); - let mut tributary_db = TributaryDb::new(raw_db.clone()); + let mut tributary_db = raw_db.clone(); loop { // Check if the set was retired, and if so, don't further operate if crate::MainDb::::is_tributary_retired(&raw_db, spec.set()) {