diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 20b2935f..bcd07c65 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -54,8 +54,8 @@ async fn create_new_tributary(db: D, spec: TributarySpec) { } pub struct ActiveTributary { - spec: TributarySpec, - tributary: Arc>>, + pub spec: TributarySpec, + pub tributary: Arc>>, } // Adds a tributary into the specified HahMap @@ -145,8 +145,9 @@ pub async fn scan_tributaries( } } - // TODO: Instead of holding this lock long term, should this take in Arc RwLock and - // re-acquire read locks? + // TODO: Make a TributaryReader which only requires a DB handle and safely doesn't require + // locks + // Use that here for ActiveTributary { spec, tributary } in tributaries.read().await.values() { tributary::scanner::handle_new_blocks::<_, _, P>( &mut tributary_db, @@ -170,7 +171,7 @@ pub async fn heartbeat_tributaries( tributaries: Arc>>>, ) { let ten_blocks_of_time = - Duration::from_secs((Tributary::::block_time() * 10).into()); + Duration::from_secs((10 * Tributary::::block_time()).into()); loop { for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() { @@ -207,18 +208,16 @@ pub async fn handle_p2p( continue; }; - // This is misleading being read, as it will mutate the Tributary, yet there's - // greater efficiency when it is read - // The safety of it is also justified by Tributary::handle_message's documentation - if tributary.tributary.read().await.handle_message(&msg.msg).await { + if tributary.tributary.write().await.handle_message(&msg.msg).await { P2p::broadcast(&p2p, msg.kind, msg.msg).await; } } + // TODO: Rate limit this P2pMessageKind::Heartbeat(genesis) => { let tributaries_read = tributaries.read().await; let Some(tributary) = tributaries_read.get(&genesis) else { - log::debug!("received hearttbeat message for unknown network"); + log::debug!("received heartbeat message for unknown network"); continue; }; @@ -229,14 +228,21 @@ pub async fn handle_p2p( let 
tributary_read = tributary.tributary.read().await; + /* // Have sqrt(n) nodes reply with the blocks let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; // Try to have at least 3 responders if responders < 3 { responders = tributary.spec.n().min(3).into(); } + */ - // Only respond to this if randomly chosen + // Have up to three nodes respond + let responders = u64::from(tributary.spec.n().min(3)); + + // Decide which nodes will respond by using the latest block's hash as a mutually agreed + // upon entropy source + // This isn't a secure source of entropy, yet it's fine for this let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 @@ -252,9 +258,12 @@ pub async fn handle_p2p( } } if !selected { + log::debug!("received heartbeat and not selected to respond"); continue; } + log::debug!("received heartbeat and selected to respond"); + let mut latest = msg.msg.try_into().unwrap(); // TODO: All of these calls don't *actually* need a read lock, just access to a DB handle // We can reduce lock contention accordingly @@ -273,7 +282,7 @@ pub async fn handle_p2p( continue; }; // Get just the commit - msg.msg.drain((msg.msg.len() - msg_ref.len()) ..); + msg.msg.drain(.. 
(msg.msg.len() - msg_ref.len())); let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { @@ -281,7 +290,12 @@ pub async fn handle_p2p( continue; }; - tributary.tributary.write().await.sync_block(block, msg.msg).await; + // TODO: We take a notable amount of time to add blocks when we're missing provided + // transactions + // Any tributary with missing provided transactions will cause this P2P loop to halt + // Make a separate queue for this + let res = tributary.tributary.write().await.sync_block(block, msg.msg).await; + log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); } } } diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 710537cf..c4b430b0 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -106,7 +106,7 @@ pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { // TODO: Move this to tests #[allow(clippy::type_complexity)] #[derive(Clone, Debug)] -pub struct LocalP2p(usize, Arc)>>>>); +pub struct LocalP2p(usize, pub Arc)>>>>); impl LocalP2p { pub fn new(validators: usize) -> Vec { diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs new file mode 100644 index 00000000..9385c5c4 --- /dev/null +++ b/coordinator/src/tests/tributary/handle_p2p.rs @@ -0,0 +1,64 @@ +use core::time::Duration; +use std::{sync::Arc, collections::HashMap}; + +use rand_core::OsRng; + +use ciphersuite::{Ciphersuite, Ristretto}; + +use tokio::{sync::RwLock, time::sleep}; + +use serai_db::MemDb; + +use tributary::Tributary; + +use crate::{ + tributary::Transaction, + LocalP2p, ActiveTributary, handle_p2p, + tests::tributary::{new_keys, new_spec, new_tributaries}, +}; + +#[tokio::test] +async fn handle_p2p_test() { + let keys = new_keys(&mut OsRng); + let spec = new_spec(&mut OsRng, &keys); + + let mut tributaries = new_tributaries(&keys, &spec).await; + + let mut tributary_arcs = vec![]; + for (i, (p2p, tributary)) in 
tributaries.drain(..).enumerate() { + let tributary = Arc::new(RwLock::new(tributary)); + tributary_arcs.push(tributary.clone()); + tokio::spawn(handle_p2p( + Ristretto::generator() * *keys[i], + p2p, + Arc::new(RwLock::new(HashMap::from([( + spec.genesis(), + ActiveTributary { spec: spec.clone(), tributary }, + )]))), + )); + } + let tributaries = tributary_arcs; + + // After two blocks of time, we should have a new block + // We don't wait one block of time as we may have missed the chance for this block + sleep(Duration::from_secs((2 * Tributary::::block_time()).into())) + .await; + let tip = tributaries[0].read().await.tip().await; + assert!(tip != spec.genesis()); + + // Sleep one second to make sure this block propagates + sleep(Duration::from_secs(1)).await; + // Make sure every tributary has it + for tributary in &tributaries { + assert!(tributary.read().await.block(&tip).is_some()); + } + + // Then after another block of time, we should have yet another new block + sleep(Duration::from_secs(Tributary::::block_time().into())).await; + let new_tip = tributaries[0].read().await.tip().await; + assert!(new_tip != tip); + sleep(Duration::from_secs(1)).await; + for tributary in tributaries { + assert!(tributary.read().await.block(&new_tip).is_some()); + } +} diff --git a/coordinator/src/tests/tributary/mod.rs b/coordinator/src/tests/tributary/mod.rs index 866a979a..33a80a07 100644 --- a/coordinator/src/tests/tributary/mod.rs +++ b/coordinator/src/tests/tributary/mod.rs @@ -17,6 +17,9 @@ mod tx; mod dkg; // TODO: Test the other transactions +mod handle_p2p; +mod sync; + fn random_u32(rng: &mut R) -> u32 { u32::try_from(rng.next_u64() >> 32).unwrap() } diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs new file mode 100644 index 00000000..c559dda8 --- /dev/null +++ b/coordinator/src/tests/tributary/sync.rs @@ -0,0 +1,129 @@ +use core::time::Duration; +use std::{ + sync::Arc, + collections::{HashSet, HashMap}, +}; + +use 
rand_core::OsRng; + +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; + +use tokio::{sync::RwLock, time::sleep}; + +use serai_db::MemDb; + +use tributary::Tributary; + +use crate::{ + tributary::Transaction, + LocalP2p, ActiveTributary, handle_p2p, heartbeat_tributaries, + tests::tributary::{new_keys, new_spec, new_tributaries}, +}; + +#[tokio::test] +async fn sync_test() { + let mut keys = new_keys(&mut OsRng); + let spec = new_spec(&mut OsRng, &keys); + // Ensure this can have a node fail + assert!(spec.n() > spec.t()); + + let mut tributaries = new_tributaries(&keys, &spec).await; + + // Keep a Tributary back, effectively having it offline + let syncer_key = keys.pop().unwrap(); + let (syncer_p2p, syncer_tributary) = tributaries.pop().unwrap(); + + // Have the rest form a P2P net + let mut tributary_arcs = vec![]; + for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() { + let tributary = Arc::new(RwLock::new(tributary)); + tributary_arcs.push(tributary.clone()); + tokio::spawn(handle_p2p( + Ristretto::generator() * *keys[i], + p2p, + Arc::new(RwLock::new(HashMap::from([( + spec.genesis(), + ActiveTributary { spec: spec.clone(), tributary }, + )]))), + )); + } + let tributaries = tributary_arcs; + + // After three blocks of time, we should have a new block + // We don't wait one block of time as we may have missed the chance for the first block + // We don't wait two blocks because we may have missed the chance, and then had a failure to + // propose by our 'offline' validator + let block_time = u64::from(Tributary::::block_time()); + sleep(Duration::from_secs(3 * block_time)).await; + let tip = tributaries[0].read().await.tip().await; + assert!(tip != spec.genesis()); + + // Sleep one second to make sure this block propagates + sleep(Duration::from_secs(1)).await; + // Make sure every tributary has it + for tributary in &tributaries { + assert!(tributary.read().await.block(&tip).is_some()); + } + + // Now that we've confirmed the other 
tributaries formed a net without issue, drop the syncer's + // pending P2P messages + syncer_p2p.1.write().await.last_mut().unwrap().clear(); + + // Have it join the net + let syncer_key = Ristretto::generator() * *syncer_key; + let syncer_tributary = Arc::new(RwLock::new(syncer_tributary)); + let syncer_tributaries = Arc::new(RwLock::new(HashMap::from([( + spec.genesis(), + ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() }, + )]))); + tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributaries.clone())); + + // It shouldn't automatically catch up. If it somehow was, our test would be broken + // Sanity check this + let tip = tributaries[0].read().await.tip().await; + sleep(Duration::from_secs(2 * block_time)).await; + assert!(tributaries[0].read().await.tip().await != tip); + assert_eq!(syncer_tributary.read().await.tip().await, spec.genesis()); + + // Start the heartbeat protocol + tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_tributaries)); + + // The heartbeat is once every 10 blocks + sleep(Duration::from_secs(10 * block_time)).await; + assert!(syncer_tributary.read().await.tip().await != spec.genesis()); + + // Verify it synced to the tip + let syncer_tip = { + let tributary = tributaries[0].write().await; + let syncer_tributary = syncer_tributary.write().await; + + let tip = tributary.tip().await; + let syncer_tip = syncer_tributary.tip().await; + // Allow a one block tolerance in case of race conditions + assert!(HashSet::from([tip, tributary.block(&tip).unwrap().parent()]).contains(&syncer_tip)); + syncer_tip + }; + + sleep(Duration::from_secs(block_time)).await; + + // Verify it's now keeping up + assert!(syncer_tributary.read().await.tip().await != syncer_tip); + + // Verify it's now participating in consensus + // Because only `t` validators are used in a commit, check several commits + // This should be biased in favor of the syncer since we're using the syncer's view of the commit + for _ in 0 .. 
10 { + let syncer_tributary = syncer_tributary.read().await; + if syncer_tributary + .parsed_commit(&syncer_tributary.tip().await) + .unwrap() + .validators + .iter() + .any(|signer| signer == &syncer_key.to_bytes()) + { + return; + } + sleep(Duration::from_secs(block_time)).await; + } + panic!("synced tributary didn't start participating in consensus"); +}