From 05b1fc5f0511d101bac99a0d270b705856044e4e Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 23 Apr 2023 18:55:43 -0400 Subject: [PATCH] Send a heartbeat message when a Tributary falls behind --- coordinator/src/main.rs | 37 ++++++++++++++++++++++-- coordinator/src/p2p.rs | 11 +++++++ coordinator/src/tests/tributary/chain.rs | 3 ++ coordinator/tributary/src/lib.rs | 3 ++ coordinator/tributary/src/tendermint.rs | 2 +- 5 files changed, 53 insertions(+), 3 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index cc5a0b7b..c5988709 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, - time::Duration, + time::{SystemTime, Duration}, collections::{VecDeque, HashMap}, }; @@ -179,7 +179,37 @@ async fn run( // Sleep for half the block time // TODO: Should we define a notification system for when a new block occurs? - sleep(Duration::from_secs(Tributary::::block_time() / 2)).await; + sleep(Duration::from_secs((Tributary::::block_time() / 2).into())) + .await; + } + }); + } + + // If a Tributary has fallen behind, trigger syncing + { + let p2p = p2p.clone(); + let tributaries = tributaries.clone(); + tokio::spawn(async move { + let ten_blocks_of_time = + Duration::from_secs((Tributary::::block_time() * 10).into()); + + loop { + for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() { + let tributary = tributary.read().await; + let tip = tributary.tip(); + let block_time = SystemTime::UNIX_EPOCH + + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0)); + + // Only trigger syncing if the block is more than a minute behind + if SystemTime::now() > (block_time + Duration::from_secs(60)) { + log::warn!("last known tributary block was over a minute ago"); + P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), tip.to_vec()) + .await; + } + } + + // Only check once every 10 blocks of time + sleep(ten_blocks_of_time).await; } }); } @@ -204,6 +234,9 @@ async fn run( P2p::broadcast(&p2p, msg.kind, msg.msg).await; } } + + // TODO: Respond with the missing block, if there are any + P2pMessageKind::Heartbeat(genesis) => todo!(), } } }); diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 85f1f909..3d78d9d4 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -12,6 +12,7 @@ pub use tributary::P2p as TributaryP2p; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum P2pMessageKind { Tributary([u8; 32]), + Heartbeat([u8; 32]), } impl P2pMessageKind { @@ -22,6 +23,11 @@ impl P2pMessageKind { res.extend(genesis); res } + P2pMessageKind::Heartbeat(genesis) => { + let mut res = vec![1]; + res.extend(genesis); + res + } } } @@ -34,6 +40,11 @@ impl P2pMessageKind { reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Tributary(genesis) }), + 1 => Some({ + let mut genesis = [0; 32]; + reader.read_exact(&mut genesis).ok()?; + P2pMessageKind::Heartbeat(genesis) + }), _ => None, } } diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index e355d265..429351e9 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -103,6 +103,7 @@ pub async fn run_tributaries( p2p.broadcast(msg.kind, msg.msg).await; } } + _ => panic!("unexpected p2p message found"), } } } @@ -170,6 +171,7 @@ async fn tributary_test() { assert_eq!(genesis, tributary.genesis()); tributary.handle_message(&msg.msg).await; } + _ => panic!("unexpected p2p message found"), } } } @@ -195,6 +197,7 @@ async fn tributary_test() { assert_eq!(genesis, tributary.genesis()); tributary.handle_message(&msg.msg).await; } + _ => panic!("unexpected p2p message found"), } } } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index ae47fdd9..acb31da3 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -143,6 +143,9 @@ impl Tributary { pub fn block(&self, hash: &[u8; 32]) -> Option> { self.network.blockchain.read().unwrap().block(hash) } + pub fn time_of_block(&self, hash: &[u8; 32]) -> Option { + self.network.blockchain.read().unwrap().commit(hash).map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap().end_time) + } pub fn commit(&self, hash: &[u8; 32]) -> Option> { self.network.blockchain.read().unwrap().commit(hash) } diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 9e7eefbd..2754d1a2 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -310,7 +310,7 @@ impl Network for TendermintNetwork { hex::encode(hash), hex::encode(self.genesis) ); - sleep(Duration::from_secs(Tendermint::::block_time())).await; + sleep(Duration::from_secs(Self::block_time().into())).await; } _ => return invalid_block(), }