diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 391ecef1..e07faf9a 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -22,7 +22,6 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorS use message_queue::{Service, client::MessageQueue}; -use futures::stream::StreamExt; use tokio::{ sync::{RwLock, mpsc, broadcast}, time::sleep, @@ -108,89 +107,6 @@ async fn add_tributary( .unwrap(); } -pub async fn scan_substrate( - db: D, - key: Zeroizing<::F>, - processors: Pro, - serai: Arc, - new_tributary_spec: mpsc::UnboundedSender, -) { - log::info!("scanning substrate"); - - let mut db = SubstrateDb::new(db); - let mut next_substrate_block = db.next_block(); - - let new_substrate_block_notifier = { - let serai = &serai; - move || async move { - loop { - match serai.newly_finalized_block().await { - Ok(sub) => return sub, - Err(e) => { - log::error!("couldn't communicate with serai node: {e}"); - sleep(Duration::from_secs(5)).await; - } - } - } - } - }; - let mut substrate_block_notifier = new_substrate_block_notifier().await; - - loop { - // await the next block, yet if our notifier had an error, re-create it - { - let Ok(next_block) = - tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await - else { - // Timed out, which may be because Serai isn't finalizing or may be some issue with the - // notifier - if serai.get_latest_block().await.map(|block| block.number()).ok() == - Some(next_substrate_block.saturating_sub(1)) - { - log::info!("serai hasn't finalized a block in the last 60s..."); - } else { - substrate_block_notifier = new_substrate_block_notifier().await; - } - continue; - }; - - // next_block is a Option - if next_block.and_then(Result::ok).is_none() { - substrate_block_notifier = new_substrate_block_notifier().await; - continue; - } - } - - match substrate::handle_new_blocks( - &mut db, - &key, - |db: &mut D, spec: TributarySpec| { - log::info!("creating new tributary for {:?}", spec.set()); - - // Save it to the database - let mut txn = db.txn(); - MainDb::::add_active_tributary(&mut txn, &spec); - txn.commit(); - - // If we reboot before this is read, the fact it was saved to the database means it'll be - // handled on reboot - new_tributary_spec.send(spec).unwrap(); - }, - &processors, - &serai, - &mut next_substrate_block, - ) - .await - { - Ok(()) => {} - Err(e) => { - log::error!("couldn't communicate with serai node: {e}"); - sleep(Duration::from_secs(5)).await; - } - } - } -} - pub async fn heartbeat_tributaries( p2p: P, mut new_tributary: broadcast::Receiver>, @@ -1032,7 +948,7 @@ pub async fn run( } // Handle new Substrate blocks - tokio::spawn(scan_substrate( + tokio::spawn(crate::substrate::scan_task( raw_db.clone(), key.clone(), processors.clone(), @@ -1147,7 +1063,7 @@ pub async fn run( // Handle new blocks for each Tributary { let raw_db = raw_db.clone(); - tokio::spawn(tributary::scanner::scan_tributaries( + tokio::spawn(tributary::scanner::scan_tributaries_task( raw_db, key.clone(), recognized_id, diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index a8e3856a..1ba2e2c7 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -1,5 +1,8 @@ use core::{ops::Deref, time::Duration}; -use std::collections::{HashSet, HashMap}; +use std::{ + sync::Arc, + collections::{HashSet, HashMap}, +}; use zeroize::Zeroizing; @@ -20,7 +23,8 @@ use serai_db::DbTxn; use processor_messages::SubstrateContext; -use tokio::time::sleep; +use futures::stream::StreamExt; +use tokio::{sync::mpsc, time::sleep}; use crate::{ Db, @@ -313,7 +317,7 @@ async fn handle_block( +async fn handle_new_blocks( db: &mut SubstrateDb, key: &Zeroizing<::F>, create_new_tributary: CNT, @@ -407,3 +411,86 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool { true } + +pub async fn scan_task( + db: D, + key: Zeroizing<::F>, + processors: Pro, + serai: Arc, + new_tributary_spec: mpsc::UnboundedSender, +) { + log::info!("scanning substrate"); + + let mut db = SubstrateDb::new(db); + let mut next_substrate_block = db.next_block(); + + let new_substrate_block_notifier = { + let serai = &serai; + move || async move { + loop { + match serai.newly_finalized_block().await { + Ok(sub) => return sub, + Err(e) => { + log::error!("couldn't communicate with serai node: {e}"); + sleep(Duration::from_secs(5)).await; + } + } + } + } + }; + let mut substrate_block_notifier = new_substrate_block_notifier().await; + + loop { + // await the next block, yet if our notifier had an error, re-create it + { + let Ok(next_block) = + tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await + else { + // Timed out, which may be because Serai isn't finalizing or may be some issue with the + // notifier + if serai.get_latest_block().await.map(|block| block.number()).ok() == + Some(next_substrate_block.saturating_sub(1)) + { + log::info!("serai hasn't finalized a block in the last 60s..."); + } else { + substrate_block_notifier = new_substrate_block_notifier().await; + } + continue; + }; + + // next_block is a Option + if next_block.and_then(Result::ok).is_none() { + substrate_block_notifier = new_substrate_block_notifier().await; + continue; + } + } + + match handle_new_blocks( + &mut db, + &key, + |db: &mut D, spec: TributarySpec| { + log::info!("creating new tributary for {:?}", spec.set()); + + // Save it to the database + let mut txn = db.txn(); + crate::MainDb::::add_active_tributary(&mut txn, &spec); + txn.commit(); + + // If we reboot before this is read, the fact it was saved to the database means it'll be + // handled on reboot + new_tributary_spec.send(spec).unwrap(); + }, + &processors, + &serai, + &mut next_substrate_block, + ) + .await + { + Ok(()) => {} + Err(e) => { + log::error!("couldn't communicate with serai node: {e}"); + sleep(Duration::from_secs(5)).await; + } + } + } +} diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index 20cdea6d..bd0268a0 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -163,7 +163,7 @@ pub(crate) async fn handle_new_blocks< } } -pub(crate) async fn scan_tributaries< +pub(crate) async fn scan_tributaries_task< D: Db, Pro: Processors, P: P2p,