From 16b22dd105f67fd457ce43191c76397a8a3553e2 Mon Sep 17 00:00:00 2001 From: David Bell <17103917+davidjohnbell@users.noreply.github.com> Date: Fri, 8 Dec 2023 14:12:16 +0400 Subject: [PATCH] Convert coordinator/substrate/db to use create_db macro (#436) * chore: implement create_db for substrate (fix broken branch) * Correct rebase artifacts * chore: remove todo statement * chore: rename BlockDb to NextBlock * chore: return empty tuple instead of empty array for event storage * Finish rebasing * .Minor tweaks to remove leftover variables These may be rebase artifacts. --------- Co-authored-by: Luke Parker --- coordinator/src/cosign_evaluator.rs | 6 +- coordinator/src/substrate/db.rs | 96 ++++++++--------------------- coordinator/src/substrate/mod.rs | 50 +++++++-------- 3 files changed, 53 insertions(+), 99 deletions(-) diff --git a/coordinator/src/cosign_evaluator.rs b/coordinator/src/cosign_evaluator.rs index c097acd7..a38377b4 100644 --- a/coordinator/src/cosign_evaluator.rs +++ b/coordinator/src/cosign_evaluator.rs @@ -23,7 +23,7 @@ use processor_messages::coordinator::cosign_block_msg; use crate::{ p2p::{CosignedBlock, P2pMessageKind, P2p}, - substrate::SubstrateDb, + substrate::LatestCosignedBlock, }; create_db! { @@ -67,9 +67,9 @@ impl CosignEvaluator { let mut db_lock = self.db.lock().await; let mut txn = db_lock.txn(); - if highest_block > SubstrateDb::::latest_cosigned_block(&txn) { + if highest_block > LatestCosignedBlock::latest_cosigned_block(&txn) { log::info!("setting latest cosigned block to {}", highest_block); - SubstrateDb::::set_latest_cosigned_block(&mut txn, highest_block); + LatestCosignedBlock::set(&mut txn, &highest_block); } txn.commit(); } diff --git a/coordinator/src/substrate/db.rs b/coordinator/src/substrate/db.rs index 02fe65cf..e2e33c51 100644 --- a/coordinator/src/substrate/db.rs +++ b/coordinator/src/substrate/db.rs @@ -7,13 +7,17 @@ use serai_client::{ pub use serai_db::*; -create_db! { +create_db!( SubstrateDb { CosignTriggered: () -> (), IntendedCosign: () -> (u64, Option), BlockHasEvents: (block: u64) -> u8, + LatestCosignedBlock: () -> u64, + NextBlock: () -> u64, + EventDb: (id: &[u8], index: u32) -> (), + BatchInstructionsHashDb: (network: NetworkId, id: u32) -> [u8; 32] } -} +); impl IntendedCosign { pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) { @@ -26,82 +30,32 @@ impl IntendedCosign { } } +impl LatestCosignedBlock { + pub fn latest_cosigned_block(getter: &impl Get) -> u64 { + Self::get(getter).unwrap_or_default().max(1) + } +} + +impl EventDb { + pub fn is_unhandled(getter: &impl Get, id: &[u8], index: u32) -> bool { + Self::get(getter, id, index).is_none() + } + + pub fn handle_event(txn: &mut impl DbTxn, id: &[u8], index: u32) { + assert!(Self::is_unhandled(txn, id, index)); + Self::set(txn, id, index, &()); + } +} + db_channel! { - SubstrateDb { + SubstrateDbChannels { CosignTransactions: (network: NetworkId) -> (Session, u64, [u8; 32]), } } + impl CosignTransactions { // Append a cosign transaction. pub fn append_cosign(txn: &mut impl DbTxn, set: ValidatorSet, number: u64, hash: [u8; 32]) { CosignTransactions::send(txn, set.network, &(set.session, number, hash)) } } - -#[derive(Debug)] -pub struct SubstrateDb(pub D); -impl SubstrateDb { - pub fn new(db: D) -> Self { - Self(db) - } - - fn substrate_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"coordinator_substrate", dst, key) - } - - fn next_block_key() -> Vec { - Self::substrate_key(b"next_block", []) - } - pub fn set_next_block(&mut self, block: u64) { - let mut txn = self.0.txn(); - txn.put(Self::next_block_key(), block.to_le_bytes()); - txn.commit(); - } - pub fn next_block(&self) -> u64 { - u64::from_le_bytes(self.0.get(Self::next_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap()) - } - - fn latest_cosigned_block_key() -> Vec { - Self::substrate_key(b"latest_cosigned_block", []) - } - pub fn set_latest_cosigned_block(txn: &mut D::Transaction<'_>, latest_cosigned_block: u64) { - txn.put(Self::latest_cosigned_block_key(), latest_cosigned_block.to_le_bytes()); - } - pub fn latest_cosigned_block(getter: &G) -> u64 { - let db = u64::from_le_bytes( - getter.get(Self::latest_cosigned_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap(), - ); - // Mark the genesis as cosigned - db.max(1) - } - - fn event_key(id: &[u8], index: u32) -> Vec { - Self::substrate_key(b"event", [id, index.to_le_bytes().as_ref()].concat()) - } - pub fn handled_event(getter: &G, id: [u8; 32], index: u32) -> bool { - getter.get(Self::event_key(&id, index)).is_some() - } - pub fn handle_event(txn: &mut D::Transaction<'_>, id: [u8; 32], index: u32) { - assert!(!Self::handled_event(txn, id, index)); - txn.put(Self::event_key(&id, index), []); - } - - fn batch_instructions_key(network: NetworkId, id: u32) -> Vec { - Self::substrate_key(b"batch", (network, id).encode()) - } - pub fn batch_instructions_hash( - getter: &G, - network: NetworkId, - id: u32, - ) -> Option<[u8; 32]> { - getter.get(Self::batch_instructions_key(network, id)).map(|bytes| bytes.try_into().unwrap()) - } - pub fn save_batch_instructions_hash( - txn: &mut D::Transaction<'_>, - network: NetworkId, - id: u32, - hash: [u8; 32], - ) { - txn.put(Self::batch_instructions_key(network, id), hash); - } -} diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 2d33a9a2..65af3fcb 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -178,7 +178,7 @@ async fn handle_batch_and_burns( network_had_event(&mut burns, &mut batches, network); let mut txn = db.txn(); - SubstrateDb::::save_batch_instructions_hash(&mut txn, network, id, instructions_hash); + BatchInstructionsHashDb::set(&mut txn, network, id, &instructions_hash); txn.commit(); // Make sure this is the only Batch event for this network in this Block @@ -239,7 +239,7 @@ async fn handle_batch_and_burns( // Handle a specific Substrate block, returning an error when it fails to get data // (not blocking / holding) async fn handle_block( - db: &mut SubstrateDb, + db: &mut D, key: &Zeroizing<::F>, new_tributary_spec: &mpsc::UnboundedSender, tributary_retired: &mpsc::UnboundedSender, @@ -268,11 +268,11 @@ async fn handle_block( continue; } - if !SubstrateDb::::handled_event(&db.0, hash, event_id) { + if EventDb::is_unhandled(db, &hash, event_id) { log::info!("found fresh new set event {:?}", new_set); - let mut txn = db.0.txn(); + let mut txn = db.txn(); handle_new_set::(&mut txn, key, new_tributary_spec, serai, &block, set).await?; - SubstrateDb::::handle_event(&mut txn, hash, event_id); + EventDb::handle_event(&mut txn, &hash, event_id); txn.commit(); } event_id += 1; @@ -280,15 +280,15 @@ async fn handle_block( // If a key pair was confirmed, inform the processor for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? { - if !SubstrateDb::::handled_event(&db.0, hash, event_id) { + if EventDb::is_unhandled(db, &hash, event_id) { log::info!("found fresh key gen event {:?}", key_gen); if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen { handle_key_gen(processors, serai, &block, set, key_pair).await?; } else { panic!("KeyGen event wasn't KeyGen: {key_gen:?}"); } - let mut txn = db.0.txn(); - SubstrateDb::::handle_event(&mut txn, hash, event_id); + let mut txn = db.txn(); + EventDb::handle_event(&mut txn, &hash, event_id); txn.commit(); } event_id += 1; @@ -303,12 +303,12 @@ async fn handle_block( continue; } - if !SubstrateDb::::handled_event(&db.0, hash, event_id) { + if EventDb::is_unhandled(db, &hash, event_id) { log::info!("found fresh set retired event {:?}", retired_set); - let mut txn = db.0.txn(); + let mut txn = db.txn(); crate::ActiveTributaryDb::retire_tributary(&mut txn, set); tributary_retired.send(set).unwrap(); - SubstrateDb::::handle_event(&mut txn, hash, event_id); + EventDb::handle_event(&mut txn, &hash, event_id); txn.commit(); } event_id += 1; @@ -319,18 +319,18 @@ async fn handle_block( // following events share data collection // This does break the uniqueness of (hash, event_id) -> one event, yet // (network, (hash, event_id)) remains valid as a unique ID for an event - if !SubstrateDb::::handled_event(&db.0, hash, event_id) { - handle_batch_and_burns(&mut db.0, processors, serai, &block).await?; + if EventDb::is_unhandled(db, &hash, event_id) { + handle_batch_and_burns(db, processors, serai, &block).await?; } - let mut txn = db.0.txn(); - SubstrateDb::::handle_event(&mut txn, hash, event_id); + let mut txn = db.txn(); + EventDb::handle_event(&mut txn, &hash, event_id); txn.commit(); Ok(()) } async fn handle_new_blocks( - db: &mut SubstrateDb, + db: &mut D, key: &Zeroizing<::F>, new_tributary_spec: &mpsc::UnboundedSender, tributary_retired: &mpsc::UnboundedSender, @@ -394,7 +394,7 @@ async fn handle_new_blocks( } } - let mut txn = db.0.txn(); + let mut txn = db.txn(); let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else { IntendedCosign::set_intended_cosign(&mut txn, 1); txn.commit(); @@ -506,7 +506,7 @@ async fn handle_new_blocks( // cosigned if let Some(has_no_cosigners) = has_no_cosigners { log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number()); - SubstrateDb::::set_latest_cosigned_block(&mut txn, has_no_cosigners.number()); + LatestCosignedBlock::set(&mut txn, &has_no_cosigners.number()); } else { CosignTriggered::set(&mut txn, &()); for (set, block, hash) in cosign { @@ -518,7 +518,7 @@ async fn handle_new_blocks( } // Reduce to the latest cosigned block - let latest_number = latest_number.min(SubstrateDb::::latest_cosigned_block(&db.0)); + let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db)); if latest_number < *next_block { return Ok(()); @@ -540,7 +540,9 @@ async fn handle_new_blocks( ) .await?; *next_block += 1; - db.set_next_block(*next_block); + let mut txn = db.txn(); + NextBlock::set(&mut txn, next_block); + txn.commit(); log::info!("handled substrate block {b}"); } @@ -548,7 +550,7 @@ async fn handle_new_blocks( } pub async fn scan_task( - db: D, + mut db: D, key: Zeroizing<::F>, processors: Pro, serai: Arc, @@ -556,9 +558,7 @@ pub async fn scan_task( tributary_retired: mpsc::UnboundedSender, ) { log::info!("scanning substrate"); - - let mut db = SubstrateDb::new(db); - let mut next_substrate_block = db.next_block(); + let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default(); /* let new_substrate_block_notifier = { @@ -680,7 +680,7 @@ pub(crate) async fn verify_published_batches( // TODO: Localize from MainDb to SubstrateDb let last = crate::LastVerifiedBatchDb::get(txn, network); for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to { - let Some(on_chain) = SubstrateDb::::batch_instructions_hash(txn, network, id) else { + let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else { break; }; let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap();