diff --git a/Cargo.lock b/Cargo.lock index 4b392177..7f64474f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8089,6 +8089,7 @@ dependencies = [ "bitcoin", "ciphersuite", "frost-schnorrkel", + "futures", "lazy_static", "modular-frost", "monero-serai", diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 1c49d650..5146e0d0 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -21,6 +21,7 @@ use serai_client::{primitives::NetworkId, Public, Serai}; use message_queue::{Service, client::MessageQueue}; +use futures::stream::StreamExt; use tokio::{sync::RwLock, time::sleep}; use ::tributary::{ @@ -70,7 +71,7 @@ async fn add_tributary( log::info!("adding tributary {:?}", spec.set()); let tributary = Tributary::<_, Transaction, _>::new( - // TODO2: Use a db on a distinct volume + // TODO2: Use a db on a distinct volume to protect against DoS attacks db, spec.genesis(), spec.start_time(), @@ -102,7 +103,36 @@ pub async fn scan_substrate( let mut db = substrate::SubstrateDb::new(db); let mut next_substrate_block = db.next_block(); + let new_substrate_block_notifier = { + let serai = &serai; + move || async move { + loop { + match serai.newly_finalized_block().await { + Ok(sub) => return sub, + Err(e) => { + log::error!("couldn't communicate with serai node: {e}"); + sleep(Duration::from_secs(5)).await; + } + } + } + } + }; + let mut substrate_block_notifier = new_substrate_block_notifier().await; + loop { + // await the next block, yet if our notifier had an error, re-create it + { + if substrate_block_notifier + .next() + .await + .and_then(|result| if result.is_err() { None } else { Some(()) }) + .is_none() + { + substrate_block_notifier = new_substrate_block_notifier().await; + continue; + } + } + match substrate::handle_new_blocks( &mut db, &key, @@ -125,9 +155,7 @@ pub async fn scan_substrate( ) .await { - // TODO2: Should this use a notification system for new blocks? - // Right now it's sleeping for half the block time. - Ok(()) => sleep(Duration::from_secs(3)).await, + Ok(()) => {} Err(e) => { log::error!("couldn't communicate with serai node: {e}"); sleep(Duration::from_secs(5)).await; diff --git a/substrate/client/Cargo.toml b/substrate/client/Cargo.toml index 8bfa3d6a..bd904858 100644 --- a/substrate/client/Cargo.toml +++ b/substrate/client/Cargo.toml @@ -16,6 +16,8 @@ rustdoc-args = ["--cfg", "docsrs"] zeroize = "^1.5" thiserror = { version = "1", optional = true } +futures = "0.3" + scale = { package = "parity-scale-codec", version = "3" } scale-info = { version = "2", optional = true } diff --git a/substrate/client/src/serai/mod.rs b/substrate/client/src/serai/mod.rs index 10ea17ae..d8135f06 100644 --- a/substrate/client/src/serai/mod.rs +++ b/substrate/client/src/serai/mod.rs @@ -1,5 +1,7 @@ use thiserror::Error; +use futures::stream::{Stream, StreamExt}; + use scale::{Encode, Decode, Compact}; mod scale_value; pub(crate) use scale_value::{Value, Composite, scale_value, scale_composite}; @@ -259,6 +261,18 @@ impl Serai { self.get_block(hash.into()).await } + /// A stream which yields whenever new block(s) have been finalized. + pub async fn newly_finalized_block( + &self, + ) -> Result>, SeraiError> { + Ok(self.0.rpc().subscribe_finalized_block_headers().await.map_err(SeraiError::RpcError)?.map( + |next| { + next.map_err(SeraiError::RpcError)?; + Ok(()) + }, + )) + } + pub async fn get_nonce(&self, address: &SeraiAddress) -> Result { self .0