diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 2e20a78d..54fbbb9e 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -791,11 +791,14 @@ pub async fn run( }); // When we reach synchrony on an event requiring signing, send our preprocess for it + // TODO: Properly place this into the Tributary scanner, as it's a mess out here let recognized_id = { let raw_db = raw_db.clone(); let key = key.clone(); let tributaries = Arc::new(RwLock::new(HashMap::new())); + // Spawn a task to maintain a local view of the tributaries for whenever recognized_id is + // called tokio::spawn({ let tributaries = tributaries.clone(); let mut set_to_genesis = HashMap::new(); diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 52c1c5cc..d435f186 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -450,6 +450,7 @@ pub async fn handle_p2p_task( let (send, mut recv) = mpsc::unbounded_channel(); channels.write().await.insert(genesis, send); + // Per-Tributary P2P message handler tokio::spawn({ let p2p = p2p.clone(); async move { diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index b5b26314..25bb3f2f 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -1,4 +1,4 @@ -use core::future::Future; +use core::{future::Future, time::Duration}; use std::sync::Arc; use zeroize::Zeroizing; @@ -43,7 +43,6 @@ impl::is_tributary_retired(&raw_db, spec.set()) { + break; + } + // Obtain the next block notification now to prevent obtaining it immediately after // the next block occurs let next_block_notification = tributary.next_block_notification().await; @@ -256,16 +260,18 @@ pub(crate) async fn scan_tributaries_task< ) .await; - next_block_notification - .await - .map_err(|_| "") - .expect("tributary dropped its notifications?"); + // Run either when the notification fires, or every interval of block_time + let _ = tokio::time::timeout( + Duration::from_secs(tributary::Tributary::::block_time().into()), + next_block_notification, + ) + .await; } } }); } - // TODO - Ok(crate::TributaryEvent::TributaryRetired(_)) => todo!(), + // The above loop simply checks the DB every few seconds, voiding the need for this event + Ok(crate::TributaryEvent::TributaryRetired(_)) => {} Err(broadcast::error::RecvError::Lagged(_)) => { panic!("scan_tributaries lagged to handle tributary_event") }