From 2b09309adca6dd8def0da1781e3f2c595441f018 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 23 Apr 2023 03:48:50 -0400 Subject: [PATCH] Handle adding new Tributaries Removes last_block as an argument from Tendermint. It now loads from the DB as needed. While slightly less performant, it's easiest and should be fine. --- Cargo.lock | 1 + coordinator/Cargo.toml | 1 + coordinator/src/main.rs | 83 ++++++++++++++++++++++---- coordinator/src/substrate/mod.rs | 49 ++++++++------- coordinator/src/tests/tributary/dkg.rs | 51 ++++------------ coordinator/src/tests/tributary/mod.rs | 1 + coordinator/src/tributary/db.rs | 2 +- coordinator/src/tributary/scanner.rs | 41 +++++++++---- 8 files changed, 149 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b798a7ae..aaf0c4b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1309,6 +1309,7 @@ dependencies = [ "blake2", "ciphersuite", "flexible-transcript", + "lazy_static", "log", "modular-frost", "parity-scale-codec", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 1daf5def..a2620c21 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -15,6 +15,7 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] async-trait = "0.1" +lazy_static = "1" zeroize = "^1.5" rand_core = "0.6" diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 6a1f63cf..50aebb48 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -3,7 +3,11 @@ #![allow(unreachable_code)] #![allow(clippy::diverging_sub_expression)] -use std::{time::Duration, collections::HashMap}; +use std::{ + sync::Arc, + time::Duration, + collections::{VecDeque, HashMap}, +}; use zeroize::Zeroizing; @@ -12,9 +16,12 @@ use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; use serai_db::{Db, MemDb}; use serai_client::Serai; -use tokio::time::sleep; +use tokio::{sync::RwLock, time::sleep}; + +use ::tributary::Tributary; mod tributary; +use crate::tributary::{TributarySpec, Transaction}; mod p2p; pub use 
p2p::*; @@ -27,6 +34,13 @@ mod substrate; #[cfg(test)] pub mod tests; +// This is a static to satisfy lifetime expectations +lazy_static::lazy_static! { + static ref NEW_TRIBUTARIES: Arc>> = Arc::new( + RwLock::new(VecDeque::new()) + ); +} + async fn run( raw_db: D, key: Zeroizing<::F>, @@ -34,11 +48,17 @@ async fn run( mut processor: Pro, serai: Serai, ) { - let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone()); - let mut last_substrate_block = substrate_db.last_block(); - let mut last_tributary_block = HashMap::<[u8; 32], _>::new(); + let add_new_tributary = |spec: TributarySpec| async { + NEW_TRIBUTARIES.write().await.push_back(spec); + // TODO: Save this tributary's information to the database before returning + }; { + let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone()); + let mut last_substrate_block = substrate_db.last_block(); + + let p2p = p2p.clone(); + let key = key.clone(); let mut processor = processor.clone(); tokio::spawn(async move { @@ -46,6 +66,7 @@ async fn run( match substrate::handle_new_blocks( &mut substrate_db, &key, + add_new_tributary, &p2p, &mut processor, &serai, @@ -64,20 +85,62 @@ async fn run( } { - let mut tributary_db = tributary::TributaryDb::new(raw_db); + struct ActiveTributary { + spec: TributarySpec, + tributary: Tributary, + } + + let mut tributaries = HashMap::<[u8; 32], ActiveTributary>::new(); + + async fn add_tributary( + db: D, + key: Zeroizing<::F>, + p2p: P, + tributaries: &mut HashMap<[u8; 32], ActiveTributary>, + spec: TributarySpec, + ) { + let tributary = Tributary::<_, Transaction, _>::new( + db, + spec.genesis(), + spec.start_time(), + key, + spec.validators(), + p2p, + ) + .await + .unwrap(); + + tributaries.insert(tributary.genesis(), ActiveTributary { spec, tributary }); + } + + // TODO: Reload tributaries + + let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); tokio::spawn(async move { loop { - for (_, last_block) in last_tributary_block.iter_mut() { + // The following 
handle_new_blocks function may take an arbitrary amount of time + // If registering a new tributary waited for a lock on the tributaries table, the substrate + // scanner may wait on a lock for an arbitrary amount of time + // By instead using the distinct NEW_TRIBUTARIES, there should be minimal + // competition/blocking + { + let mut new_tributaries = NEW_TRIBUTARIES.write().await; + while let Some(spec) = new_tributaries.pop_front() { + add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await; + } + } + + for (genesis, ActiveTributary { spec, tributary }) in tributaries.iter_mut() { tributary::scanner::handle_new_blocks::<_, _, P>( &mut tributary_db, &key, &mut processor, - todo!(), - todo!(), - last_block, + spec, + tributary, ) .await; } + sleep(Duration::from_secs(3)).await; } }); diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 887e58dc..d7e56ef1 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -1,4 +1,4 @@ -use core::ops::Deref; +use core::{ops::Deref, future::Future}; use std::collections::{HashSet, HashMap}; use zeroize::Zeroizing; @@ -19,8 +19,6 @@ use serai_client::{ use serai_db::DbTxn; -use tributary::Tributary; - use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage}; use crate::{Db, P2p, processor::Processor, tributary::TributarySpec}; @@ -40,10 +38,15 @@ async fn in_set( Ok(Some(data.participants.iter().any(|(participant, _)| participant.0 == key))) } -async fn handle_new_set( +async fn handle_new_set< + D: Db, + Fut: Future, + ANT: Clone + Fn(TributarySpec) -> Fut, + Pro: Processor, +>( db: D, key: &Zeroizing<::F>, - p2p: &P, + add_new_tributary: ANT, processor: &mut Pro, serai: &Serai, block: &Block, @@ -53,24 +56,14 @@ async fn handle_new_set( let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist"); let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, 
set_data); - - // TODO: Do something with this - let tributary = Tributary::<_, crate::tributary::Transaction, _>::new( - db, - spec.genesis(), - spec.start_time(), - key.clone(), - spec.validators(), - p2p.clone(), - ) - .await - .unwrap(); + add_new_tributary(spec.clone()); // Trigger a DKG // TODO: Check how the processor handles this being fired multiple times // We already have a unique event ID based on block, event index (where event index is // the one generated in this handle_block function) // We could use that on this end and the processor end? + // TODO: Should this be handled in the Tributary code? processor .send(CoordinatorMessage::KeyGen( processor_messages::key_gen::CoordinatorMessage::GenerateKey { @@ -214,9 +207,16 @@ async fn handle_batch_and_burns( // Handle a specific Substrate block, returning an error when it fails to get data // (not blocking / holding) -async fn handle_block( +async fn handle_block< + D: Db, + Fut: Future, + ANT: Clone + Fn(TributarySpec) -> Fut, + Pro: Processor, + P: P2p, +>( db: &mut SubstrateDb, key: &Zeroizing<::F>, + add_new_tributary: ANT, p2p: &P, processor: &mut Pro, serai: &Serai, @@ -236,7 +236,8 @@ async fn handle_block( if !SubstrateDb::::handled_event(&db.0, hash, event_id) { if let ValidatorSetsEvent::NewSet { set } = new_set { // TODO2: Use a DB on a dedicated volume - handle_new_set(db.0.clone(), key, p2p, processor, serai, &block, set).await?; + handle_new_set(db.0.clone(), key, add_new_tributary.clone(), processor, serai, &block, set) + .await?; } else { panic!("NewSet event wasn't NewSet: {new_set:?}"); } @@ -277,9 +278,16 @@ async fn handle_block( Ok(()) } -pub async fn handle_new_blocks( +pub async fn handle_new_blocks< + D: Db, + Fut: Future, + ANT: Clone + Fn(TributarySpec) -> Fut, + Pro: Processor, + P: P2p, +>( db: &mut SubstrateDb, key: &Zeroizing<::F>, + add_new_tributary: ANT, p2p: &P, processor: &mut Pro, serai: &Serai, @@ -297,6 +305,7 @@ pub async fn handle_new_blocks( handle_block( db, key, 
+ add_new_tributary.clone(), p2p, processor, serai, diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index 66d1e8d1..2cc3fd3c 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -78,35 +78,25 @@ async fn dkg_test() { key: &Zeroizing<::F>, spec: &TributarySpec, tributary: &Tributary, - ) -> (TributaryDb, MemProcessor, [u8; 32]) { + ) -> (TributaryDb, MemProcessor) { let mut scanner_db = TributaryDb(MemDb::new()); let mut processor = MemProcessor::new(); - let mut last_block = tributary.genesis(); - handle_new_blocks(&mut scanner_db, key, &mut processor, spec, tributary, &mut last_block).await; - assert!(last_block != tributary.genesis()); - (scanner_db, processor, last_block) + handle_new_blocks(&mut scanner_db, key, &mut processor, spec, tributary).await; + (scanner_db, processor) } // Instantiate a scanner and verify it has nothing to report - let (mut scanner_db, mut processor, mut last_block) = - new_processor(&keys[0], &spec, &tributaries[0].1).await; + let (mut scanner_db, mut processor) = new_processor(&keys[0], &spec, &tributaries[0].1).await; assert!(processor.0.read().unwrap().is_empty()); // Publish the last commitment + let block_before_tx = tributaries[0].1.tip(); assert!(tributaries[0].1.add_transaction(txs[0].clone()).await); - wait_for_tx_inclusion(&tributaries[0].1, last_block, txs[0].hash()).await; + wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, txs[0].hash()).await; sleep(Duration::from_secs(Tributary::::block_time().into())).await; // Verify the scanner emits a KeyGen::Commitments message - handle_new_blocks( - &mut scanner_db, - &keys[0], - &mut processor, - &spec, - &tributaries[0].1, - &mut last_block, - ) - .await; + handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1).await; { let mut msgs = processor.0.write().unwrap(); assert_eq!(msgs.pop_front().unwrap(), expected_commitments); @@ -115,7 +105,7 @@ async 
fn dkg_test() { // Verify all keys exhibit this scanner behavior for (i, key) in keys.iter().enumerate() { - let (_, processor, _) = new_processor(key, &spec, &tributaries[i].1).await; + let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await; let mut msgs = processor.0.write().unwrap(); assert_eq!(msgs.pop_front().unwrap(), expected_commitments); assert!(msgs.is_empty()); @@ -147,20 +137,13 @@ async fn dkg_test() { } // With just 4 sets of shares, nothing should happen yet - handle_new_blocks( - &mut scanner_db, - &keys[0], - &mut processor, - &spec, - &tributaries[0].1, - &mut last_block, - ) - .await; + handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1).await; assert!(processor.0.write().unwrap().is_empty()); // Publish the final set of shares + let block_before_tx = tributaries[0].1.tip(); assert!(tributaries[0].1.add_transaction(txs[0].clone()).await); - wait_for_tx_inclusion(&tributaries[0].1, last_block, txs[0].hash()).await; + wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, txs[0].hash()).await; sleep(Duration::from_secs(Tributary::::block_time().into())).await; // Each scanner should emit a distinct shares message @@ -185,15 +168,7 @@ async fn dkg_test() { }; // Any scanner which has handled the prior blocks should only emit the new event - handle_new_blocks( - &mut scanner_db, - &keys[0], - &mut processor, - &spec, - &tributaries[0].1, - &mut last_block, - ) - .await; + handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1).await; { let mut msgs = processor.0.write().unwrap(); assert_eq!(msgs.pop_front().unwrap(), shares_for(0)); @@ -202,7 +177,7 @@ async fn dkg_test() { // Yet new scanners should emit all events for (i, key) in keys.iter().enumerate() { - let (_, processor, _) = new_processor(key, &spec, &tributaries[i].1).await; + let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await; let mut msgs = processor.0.write().unwrap(); 
assert_eq!(msgs.pop_front().unwrap(), expected_commitments); assert_eq!(msgs.pop_front().unwrap(), shares_for(i)); diff --git a/coordinator/src/tests/tributary/mod.rs b/coordinator/src/tests/tributary/mod.rs index cee7249a..866a979a 100644 --- a/coordinator/src/tests/tributary/mod.rs +++ b/coordinator/src/tests/tributary/mod.rs @@ -15,6 +15,7 @@ pub use chain::*; mod tx; mod dkg; +// TODO: Test the other transactions fn random_u32(rng: &mut R) -> u32 { u32::try_from(rng.next_u64() >> 32).unwrap() diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index e860d1bd..940e3e0d 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -24,7 +24,7 @@ impl TributaryDb { txn.commit(); } pub fn last_block(&self, genesis: [u8; 32]) -> [u8; 32] { - self.0.get(Self::block_key(genesis)).unwrap_or(genesis.to_vec()).try_into().unwrap() + self.0.get(Self::block_key(genesis)).map(|last| last.try_into().unwrap()).unwrap_or(genesis) } // This shouldn't need genesis? Yet it's saner to have then quibble about. diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index e5299410..e9f5fdbf 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -1,5 +1,5 @@ use core::ops::Deref; -use std::collections::HashMap; +use std::collections::{VecDeque, HashMap}; use zeroize::Zeroizing; @@ -296,25 +296,44 @@ pub async fn handle_new_blocks( processor: &mut Pro, spec: &TributarySpec, tributary: &Tributary, - last_block: &mut [u8; 32], ) { + let last_block = db.last_block(tributary.genesis()); + // Check if there's been a new Tributary block let latest = tributary.tip(); - if latest == *last_block { + if latest == last_block { return; } - // TODO: Only handle n blocks at a time. - // This may load the entire chain into RAM as-is. 
- let mut blocks = vec![tributary.block(&latest).unwrap()]; - while blocks.last().unwrap().parent() != *last_block { - blocks.push(tributary.block(&blocks.last().unwrap().parent()).unwrap()); + let mut blocks = VecDeque::new(); + // This is a new block, as per the prior if check + blocks.push_back(tributary.block(&latest).unwrap()); + + let mut block = None; + while { + let parent = blocks.back().unwrap().parent(); + // If the parent is the genesis, we've reached the end + if parent == tributary.genesis() { + false + } else { + // Get this block + block = Some(tributary.block(&parent).unwrap()); + // If it's the last block we've scanned, it's the end. Else, push it + block.as_ref().unwrap().hash() != last_block + } + } { + blocks.push_back(block.take().unwrap()); + + // Prevent this from loading the entire chain into RAM by setting a limit of 1000 blocks at a + // time (roughly 350 MB under the current block size limit) + if blocks.len() > 1000 { + blocks.pop_front(); + } } - while let Some(block) = blocks.pop() { + while let Some(block) = blocks.pop_back() { let hash = block.hash(); handle_block(db, key, processor, spec, tributary, block).await; - *last_block = hash; - db.set_last_block(tributary.genesis(), *last_block); + db.set_last_block(tributary.genesis(), hash); } }