From 493a2224214979ff4614ae3256e9dd8eeb6a1758 Mon Sep 17 00:00:00 2001 From: Luke Parker <lukeparker5132@gmail.com> Date: Wed, 30 Aug 2023 17:57:33 -0400 Subject: [PATCH] Use a timeout in case the JSON-RPC notifications have unexpected behavior --- coordinator/src/main.rs | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 5146e0d0..bea2f984 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -122,12 +122,23 @@ pub async fn scan_substrate<D: Db, Pro: Processors>( loop { // await the next block, yet if our notifier had an error, re-create it { - if substrate_block_notifier - .next() - .await - .and_then(|result| if result.is_err() { None } else { Some(()) }) - .is_none() - { + let Ok(next_block) = + tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await + else { + // Timed out, which may be because Serai isn't finalizing or may be some issue with the + // notifier + if serai.get_latest_block().await.map(|block| block.number()).ok() == + Some(next_substrate_block.saturating_sub(1)) + { + log::info!("serai hasn't finalized a block in the last 60s..."); + } else { + substrate_block_notifier = new_substrate_block_notifier().await; + } + continue; + }; + + // next_block is a Option<Result> + if next_block.and_then(Result::ok).is_none() { substrate_block_notifier = new_substrate_block_notifier().await; continue; } @@ -249,11 +260,11 @@ pub async fn scan_tributaries< } // This is assumed to be some ephemeral error due to the assumed fault-free // creation - // TODO: Differentiate connection errors from invariants + // TODO2: Differentiate connection errors from invariants Err(e) => { // Check if this failed because the keys were already set by someone else if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) { - log::info!("other party set key pair for {:?}", set); + log::info!("another coordinator set key pair for {:?}", set); break; } @@ -271,7 +282,7 @@ pub async fn scan_tributaries< } // Sleep for half the block time - // TODO2: Should we define a notification system for when a new block occurs? + // TODO2: Define a notification system for when a new block occurs sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await; } } @@ -627,7 +638,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>( "processor sent us a batch for a different network than it was for", ); // TODO: Check this key's key pair's substrate key is authorized to publish batches - // TODO: Check the batch ID is an atomic increment + // TODO: Handle the fact batch n+1 can be signed before batch n let tx = Serai::execute_batch(batch.clone()); loop {