diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 8bad36dc..cc5a0b7b 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -39,9 +39,7 @@ pub mod tests; // This is a static to satisfy lifetime expectations lazy_static::lazy_static! { - static ref NEW_TRIBUTARIES: Arc>> = Arc::new( - RwLock::new(VecDeque::new()) - ); + static ref NEW_TRIBUTARIES: RwLock> = RwLock::new(VecDeque::new()); } async fn run( @@ -79,6 +77,8 @@ async fn run( ) .await { + // TODO: Should this use a notification system for new blocks? + // Right now it's sleeping for half the block time. Ok(()) => sleep(Duration::from_secs(3)).await, Err(e) => { log::error!("couldn't communicate with serai node: {e}"); @@ -93,8 +93,11 @@ async fn run( { struct ActiveTributary { spec: TributarySpec, - tributary: Tributary, + tributary: Arc>>, } + + // Arc so this can be shared between the Tributary scanner task and the P2P task + // Write locks on this may take a while to acquire let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary>::new())); async fn add_tributary( @@ -116,7 +119,10 @@ async fn run( .await .unwrap(); - tributaries.insert(tributary.genesis(), ActiveTributary { spec, tributary }); + tributaries.insert( + tributary.genesis(), + ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) }, + ); } // Reload active tributaries from the database @@ -140,10 +146,9 @@ async fn run( tokio::spawn(async move { loop { // The following handle_new_blocks function may take an arbitrary amount of time - // If registering a new tributary waited for a lock on the tributaries table, the - // substrate scanner may wait on a lock for an arbitrary amount of time - // By instead using the distinct NEW_TRIBUTARIES, there should be minimal - // competition/blocking + // Accordingly, it may take a long time to acquire a write lock on the tributaries table + // By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately, + // meaning the Substrate scanner won't become blocked on this { let mut new_tributaries = NEW_TRIBUTARIES.write().await; while let Some(spec) = new_tributaries.pop_front() { @@ -159,20 +164,22 @@ async fn run( } } - // Unknown-length read acquisition. This would risk screwing over the P2P process EXCEPT - // they both use read locks. Accordingly, they can co-exist + // TODO: Instead of holding this lock long term, should this take in Arc RwLock and + // re-acquire read locks? for ActiveTributary { spec, tributary } in tributaries.read().await.values() { tributary::scanner::handle_new_blocks::<_, _, P>( &mut tributary_db, &key, &mut processor, spec, - tributary, + &*tributary.read().await, ) .await; } - sleep(Duration::from_secs(3)).await; + // Sleep for half the block time + // TODO: Should we define a notification system for when a new block occurs? + sleep(Duration::from_secs(Tributary::::block_time() / 2)).await; } }); } @@ -190,7 +197,10 @@ async fn run( continue; }; - if tributary.tributary.handle_message(&msg.msg).await { + // This is misleading being read, as it will mutate the Tributary, yet there's + // greater efficiency when it is read + // The safety of it is also justified by Tributary::handle_message's documentation + if tributary.tributary.read().await.handle_message(&msg.msg).await { P2p::broadcast(&p2p, msg.kind, msg.msg).await; } } diff --git a/coordinator/tributary/README.md b/coordinator/tributary/README.md index be75b102..6fce976e 100644 --- a/coordinator/tributary/README.md +++ b/coordinator/tributary/README.md @@ -1,3 +1,3 @@ # Tributary -A micro-blockchain to provide consensus and ordering to P2P communication. +A verifiable, ordered broadcast layer implemented as a BFT micro-blockchain. diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index dc97baec..9e7eefbd 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -240,6 +240,9 @@ impl Network for TendermintNetwork { type Weights = Arc; type Block = TendermintBlock; + // These are in seconds and create a six-second block time. + // The block time is the latency on message delivery (where a message is some piece of data + // embedded in a transaction), hence why it should be kept low. const BLOCK_PROCESSING_TIME: u32 = 3; const LATENCY_TIME: u32 = 1; @@ -307,7 +310,7 @@ impl Network for TendermintNetwork { hex::encode(hash), hex::encode(self.genesis) ); - sleep(Duration::from_secs(30)).await; + sleep(Duration::from_secs(Tendermint::::block_time())).await; } _ => return invalid_block(), }