diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 308d629b..e52b7fdc 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -99,7 +99,7 @@ async fn add_tributary( spec.n(), spec .i(Ristretto::generator() * key.deref()) - .expect("adding a tribuary for a set we aren't in set for"), + .expect("adding a tribtuary for a set we aren't in set for"), ) .unwrap(), }, @@ -195,13 +195,22 @@ pub async fn scan_substrate( } } +pub(crate) trait RIDTrait: + Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid +{ +} +impl FRid> + RIDTrait for F +{ +} + #[allow(clippy::type_complexity)] -pub async fn scan_tributaries< +pub(crate) async fn scan_tributaries< D: Db, Pro: Processors, P: P2p, - FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, + FRid: Send + Future, + RID: 'static + Send + Sync + RIDTrait, >( raw_db: D, key: Zeroizing<::F>, @@ -213,64 +222,74 @@ pub async fn scan_tributaries< ) { log::info!("scanning tributaries"); - // Handle new Tributary blocks - let mut tributary_readers = vec![]; - let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); loop { - while let Ok(ActiveTributary { spec, tributary }) = { - match new_tributary.try_recv() { - Ok(tributary) => Ok(tributary), - Err(broadcast::error::TryRecvError::Empty) => Err(()), - Err(broadcast::error::TryRecvError::Lagged(_)) => { - panic!("scan_tributaries lagged to handle new_tributary") - } - Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), - } - } { - tributary_readers.push((spec, tributary.reader())); - } - - for (spec, reader) in &tributary_readers { - tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>( - &mut tributary_db, - &key, - recognized_id.clone(), - &processors, - |set, tx| { + match new_tributary.recv().await { + Ok(ActiveTributary { spec, tributary }) => { + // For each Tributary, spawn a dedicated scanner task + tokio::spawn({ + let raw_db = raw_db.clone(); + let key = key.clone(); + let recognized_id = recognized_id.clone(); + let p2p = p2p.clone(); + let processors = processors.clone(); let serai = serai.clone(); async move { + let spec = &spec; + let reader = tributary.reader(); + let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); loop { - match serai.publish(&tx).await { - Ok(_) => { - log::info!("set key pair for {set:?}"); - break; - } - // This is assumed to be some ephemeral error due to the assumed fault-free - // creation - // TODO2: Differentiate connection errors from invariants - Err(e) => { - // Check if this failed because the keys were already set by someone else - if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) { - log::info!("another coordinator set key pair for {:?}", set); - break; - } + tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>( + &mut tributary_db, + &key, + recognized_id.clone(), + &processors, + |set, tx| { + let serai = serai.clone(); + async move { + loop { + match serai.publish(&tx).await { + Ok(_) => { + log::info!("set key pair for {set:?}"); + break; + } + // This is assumed to be some ephemeral error due to the assumed fault-free + // creation + // TODO2: Differentiate connection errors from invariants + Err(e) => { + // Check if this failed because the keys were already set by someone else + if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) { + log::info!("another coordinator set key pair for {:?}", set); + break; + } - log::error!("couldn't connect to Serai node to publish set_keys TX: {:?}", e); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } + log::error!( + "couldn't connect to Serai node to publish set_keys TX: {:?}", + e + ); + tokio::time::sleep(Duration::from_secs(10)).await; + } + } + } + } + }, + spec, + &reader, + ) + .await; + + // Sleep for half the block time + // TODO: Define a notification system for when a new block occurs + sleep(Duration::from_secs((Tributary::::block_time() / 2).into())) + .await; } } - }, - spec, - reader, - ) - .await; + }); + } + Err(broadcast::error::RecvError::Lagged(_)) => { + panic!("scan_tributaries lagged to handle new_tributary") + } + Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"), } - - // Sleep for half the block time - // TODO2: Define a notification system for when a new block occurs - sleep(Duration::from_secs((Tributary::::block_time() / 2).into())).await; } } @@ -288,7 +307,7 @@ pub async fn heartbeat_tributaries( Ok(tributary) => Ok(tributary), Err(broadcast::error::TryRecvError::Empty) => Err(()), Err(broadcast::error::TryRecvError::Lagged(_)) => { - panic!("heartbeat lagged to handle new_tributary") + panic!("heartbeat_tributaries lagged to handle new_tributary") } Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), } @@ -328,30 +347,27 @@ pub async fn handle_p2p( p2p: P, mut new_tributary: broadcast::Receiver>, ) { - // TODO: Merge this into the below loop. We don't need an extra task here let tributaries = Arc::new(RwLock::new(HashMap::new())); - tokio::spawn({ - let tributaries = tributaries.clone(); - async move { - loop { - match new_tributary.recv().await { - Ok(tributary) => { - tributaries.write().await.insert(tributary.spec.genesis(), tributary); - } - Err(broadcast::error::RecvError::Lagged(_)) => { - panic!("handle_p2p lagged to handle new_tributary") - } - Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"), - } - } - } - }); - loop { + while let Ok(tributary) = { + match new_tributary.try_recv() { + Ok(tributary) => Ok(tributary), + Err(broadcast::error::TryRecvError::Empty) => Err(()), + Err(broadcast::error::TryRecvError::Lagged(_)) => { + panic!("handle_p2p lagged to handle new_tributary") + } + Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), + } + } { + // TODO: Because the below maintains a read lock, this will never process until all prior P2P + // messages were handled. That's a notable latency spike + tributaries.write().await.insert(tributary.spec.genesis(), tributary); + } + let mut msg = p2p.receive().await; // Spawn a dedicated task to handle this message, ensuring any singularly latent message // doesn't hold everything up - // TODO2: Move to one task per tributary (or two. One for Tendermint, one for Tributary) + // TODO: Move to one task per tributary (or two. One for Tendermint, one for Tributary) tokio::spawn({ let p2p = p2p.clone(); let tributaries = tributaries.clone(); @@ -494,26 +510,21 @@ pub async fn handle_processors( ) { let pub_key = Ristretto::generator() * key.deref(); - // TODO: Merge this into the below loop. We don't need an extra task here - let tributaries = Arc::new(RwLock::new(HashMap::new())); - tokio::spawn({ - let tributaries = tributaries.clone(); - async move { - loop { - match new_tributary.recv().await { - Ok(tributary) => { - tributaries.write().await.insert(tributary.spec.genesis(), tributary); - } - Err(broadcast::error::RecvError::Lagged(_)) => { - panic!("handle_processors lagged to handle new_tributary") - } - Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"), - } - } - } - }); - + let mut tributaries = HashMap::new(); loop { + while let Ok(tributary) = { + match new_tributary.try_recv() { + Ok(tributary) => Ok(tributary), + Err(broadcast::error::TryRecvError::Empty) => Err(()), + Err(broadcast::error::TryRecvError::Lagged(_)) => { + panic!("handle_processors lagged to handle new_tributary") + } + Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), + } + } { + tributaries.insert(tributary.spec.genesis(), tributary); + } + // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one // processor from holding up all the others. This would require a peek method be added to the // message-queue (to view multiple future messages at once) @@ -525,7 +536,7 @@ pub async fn handle_processors( // (which means there's a lack of multisig rotation) let spec = { let mut spec = None; - for tributary in tributaries.read().await.values() { + for tributary in tributaries.values() { if tributary.spec.set().network == msg.network { spec = Some(tributary.spec.clone()); break; @@ -763,7 +774,6 @@ pub async fn handle_processors( // If this created a transaction, publish it if let Some(mut tx) = tx { log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); - let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { // TODO: This can happen since Substrate tells the Processor to generate commitments // at the same time it tells the Tributary to be created diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index c1476dc4..257f6932 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -17,7 +17,6 @@ use frost_schnorrkel::Schnorrkel; use serai_client::{ Signature, - primitives::NetworkId, validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message}, subxt::utils::Encoded, Serai, @@ -225,13 +224,13 @@ pub fn generated_key_pair( DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair) } -pub async fn handle_application_tx< +pub(crate) async fn handle_application_tx< D: Db, Pro: Processors, FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, + RID: crate::RIDTrait, >( tx: Transaction, spec: &TributarySpec, diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index a3708b71..5d8f0016 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -4,9 +4,7 @@ use zeroize::Zeroizing; use ciphersuite::{Ciphersuite, Ristretto}; -use serai_client::{ - primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, -}; +use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded}; use tributary::{ Transaction as TributaryTransaction, Block, TributaryReader, @@ -40,7 +38,7 @@ async fn handle_block< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, + RID: crate::RIDTrait, P: P2p, >( db: &mut TributaryDb, @@ -101,13 +99,13 @@ async fn handle_block< // TODO2: Trigger any necessary re-attempts } -pub async fn handle_new_blocks< +pub(crate) async fn handle_new_blocks< D: Db, Pro: Processors, FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, + RID: crate::RIDTrait, P: P2p, >( db: &mut TributaryDb,