From 6032af6692a5ce51fbecd24fb6b233e3d38efc7f Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 26 Apr 2023 00:10:06 -0400 Subject: [PATCH] Have Coordinator MainDb take a mutable borrow --- coordinator/src/db.rs | 6 +++--- coordinator/src/main.rs | 29 ++++++++++++++--------------- coordinator/src/substrate/mod.rs | 20 ++++++++++---------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index f1fd2f7a..a1d36f1c 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -3,9 +3,9 @@ pub use serai_db::*; use crate::tributary::TributarySpec; #[derive(Debug)] -pub struct MainDb(pub D); -impl MainDb { - pub fn new(db: D) -> Self { +pub struct MainDb<'a, D: Db>(&'a mut D); +impl<'a, D: Db> MainDb<'a, D> { + pub fn new(db: &'a mut D) -> Self { Self(db) } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 01b1905c..b8e6dfba 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] #![allow(unused_variables)] #![allow(unreachable_code)] #![allow(clippy::diverging_sub_expression)] @@ -46,16 +45,6 @@ lazy_static::lazy_static! { static ref NEW_TRIBUTARIES: RwLock> = RwLock::new(VecDeque::new()); } -// Specifies a new tributary -async fn create_new_tributary(db: D, spec: TributarySpec) { - // Save it to the database - MainDb(db).add_active_tributary(&spec); - // Add it to the queue - // If we reboot before this is read from the queue, the fact it was saved to the database - // means it'll be handled on reboot - NEW_TRIBUTARIES.write().await.push_back(spec); -} - pub struct ActiveTributary { pub spec: TributarySpec, pub tributary: Arc>>, @@ -104,7 +93,17 @@ pub async fn scan_substrate( match substrate::handle_new_blocks( &mut db, &key, - create_new_tributary, + |db: &mut D, spec: TributarySpec| { + // Save it to the database + MainDb::new(db).add_active_tributary(&spec); + + // Add it to the queue + // If we reboot before this is read from the queue, the fact it was saved to the database + // means it'll be handled on reboot + async { + NEW_TRIBUTARIES.write().await.push_back(spec); + } + }, &processor, &serai, &mut last_substrate_block, @@ -409,6 +408,7 @@ pub async fn handle_processors( log::warn!("we've already published this transaction. this should only appear on reboot"); } else { // We should've created a valid transaction + // TODO: Delay SignPreprocess/BatchPreprocess until associated ID is valid assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); } @@ -418,7 +418,7 @@ pub async fn handle_processors( } pub async fn run( - raw_db: D, + mut raw_db: D, key: Zeroizing<::F>, p2p: P, processor: Pro, @@ -434,8 +434,7 @@ pub async fn run( let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary>::new())); // Reload active tributaries from the database - // TODO: Can MainDb take a borrow? - for spec in MainDb(raw_db.clone()).active_tributaries().1 { + for spec in MainDb::new(&mut raw_db).active_tributaries().1 { let _ = add_tributary( raw_db.clone(), key.clone(), diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 7045f522..cccc8d66 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -41,12 +41,12 @@ async fn in_set( async fn handle_new_set< D: Db, Fut: Future, - ANT: Clone + Fn(D, TributarySpec) -> Fut, + CNT: Clone + Fn(&mut D, TributarySpec) -> Fut, Pro: Processor, >( - db: &D, + db: &mut D, key: &Zeroizing<::F>, - add_new_tributary: ANT, + create_new_tributary: CNT, processor: &Pro, serai: &Serai, block: &Block, @@ -56,7 +56,7 @@ async fn handle_new_set< let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist"); let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data); - add_new_tributary(db.clone(), spec.clone()); + create_new_tributary(db, spec.clone()); // Trigger a DKG // TODO: Check how the processor handles this being fired multiple times @@ -210,12 +210,12 @@ async fn handle_batch_and_burns( async fn handle_block< D: Db, Fut: Future, - ANT: Clone + Fn(D, TributarySpec) -> Fut, + CNT: Clone + Fn(&mut D, TributarySpec) -> Fut, Pro: Processor, >( db: &mut SubstrateDb, key: &Zeroizing<::F>, - add_new_tributary: ANT, + create_new_tributary: CNT, processor: &Pro, serai: &Serai, block: Block, @@ -233,7 +233,7 @@ async fn handle_block< // stable) if !SubstrateDb::::handled_event(&db.0, hash, event_id) { if let ValidatorSetsEvent::NewSet { set } = new_set { - handle_new_set(&db.0, key, add_new_tributary.clone(), processor, serai, &block, set) + handle_new_set(&mut db.0, key, create_new_tributary.clone(), processor, serai, &block, set) .await?; } else { panic!("NewSet event wasn't NewSet: {new_set:?}"); @@ -278,12 +278,12 @@ async fn handle_block< pub async fn handle_new_blocks< D: Db, Fut: Future, - ANT: Clone + Fn(D, TributarySpec) -> Fut, + CNT: Clone + Fn(&mut D, TributarySpec) -> Fut, Pro: Processor, >( db: &mut SubstrateDb, key: &Zeroizing<::F>, - add_new_tributary: ANT, + create_new_tributary: CNT, processor: &Pro, serai: &Serai, last_block: &mut u64, @@ -300,7 +300,7 @@ pub async fn handle_new_blocks< handle_block( db, key, - add_new_tributary.clone(), + create_new_tributary.clone(), processor, serai, if b == latest_number {