Send a heartbeat message when a Tributary falls behind

This commit is contained in:
Luke Parker 2023-04-23 18:55:43 -04:00
parent 72633d6421
commit 05b1fc5f05
No known key found for this signature in database
5 changed files with 53 additions and 3 deletions

View file

@ -5,7 +5,7 @@
use std::{ use std::{
sync::Arc, sync::Arc,
time::Duration, time::{SystemTime, Duration},
collections::{VecDeque, HashMap}, collections::{VecDeque, HashMap},
}; };
@ -179,7 +179,37 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
// Sleep for half the block time // Sleep for half the block time
// TODO: Should we define a notification system for when a new block occurs? // TODO: Should we define a notification system for when a new block occurs?
sleep(Duration::from_secs(Tributary::<D, Transaction, P>::block_time() / 2)).await; sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into()))
.await;
}
});
}
// If a Tributary has fallen behind, trigger syncing
{
let p2p = p2p.clone();
let tributaries = tributaries.clone();
tokio::spawn(async move {
let ten_blocks_of_time =
Duration::from_secs((Tributary::<D, Transaction, P>::block_time() * 10).into());
loop {
for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
let tributary = tributary.read().await;
let tip = tributary.tip();
let block_time = SystemTime::UNIX_EPOCH +
Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0));
// Only trigger syncing if the block is more than a minute behind
if SystemTime::now() > (block_time + Duration::from_secs(60)) {
log::warn!("last known tributary block was over a minute ago");
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), tip.to_vec())
.await;
}
}
// Only check once every 10 blocks of time
sleep(ten_blocks_of_time).await;
} }
}); });
} }
@ -204,6 +234,9 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
P2p::broadcast(&p2p, msg.kind, msg.msg).await; P2p::broadcast(&p2p, msg.kind, msg.msg).await;
} }
} }
// TODO: Respond with the missing block, if there are any
P2pMessageKind::Heartbeat(genesis) => todo!(),
} }
} }
}); });

View file

@ -12,6 +12,7 @@ pub use tributary::P2p as TributaryP2p;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind { pub enum P2pMessageKind {
Tributary([u8; 32]), Tributary([u8; 32]),
Heartbeat([u8; 32]),
} }
impl P2pMessageKind { impl P2pMessageKind {
@ -22,6 +23,11 @@ impl P2pMessageKind {
res.extend(genesis); res.extend(genesis);
res res
} }
P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
} }
} }
@ -34,6 +40,11 @@ impl P2pMessageKind {
reader.read_exact(&mut genesis).ok()?; reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis) P2pMessageKind::Tributary(genesis)
}), }),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Heartbeat(genesis)
}),
_ => None, _ => None,
} }
} }

View file

@ -103,6 +103,7 @@ pub async fn run_tributaries(
p2p.broadcast(msg.kind, msg.msg).await; p2p.broadcast(msg.kind, msg.msg).await;
} }
} }
_ => panic!("unexpected p2p message found"),
} }
} }
} }
@ -170,6 +171,7 @@ async fn tributary_test() {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await; tributary.handle_message(&msg.msg).await;
} }
_ => panic!("unexpected p2p message found"),
} }
} }
} }
@ -195,6 +197,7 @@ async fn tributary_test() {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await; tributary.handle_message(&msg.msg).await;
} }
_ => panic!("unexpected p2p message found"),
} }
} }
} }

View file

@ -143,6 +143,9 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> { pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
self.network.blockchain.read().unwrap().block(hash) self.network.blockchain.read().unwrap().block(hash)
} }
pub fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> {
self.network.blockchain.read().unwrap().commit(hash).map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}
pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> { pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
self.network.blockchain.read().unwrap().commit(hash) self.network.blockchain.read().unwrap().commit(hash)
} }

View file

@ -310,7 +310,7 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
hex::encode(hash), hex::encode(hash),
hex::encode(self.genesis) hex::encode(self.genesis)
); );
sleep(Duration::from_secs(Tendermint::<D, T, P>::block_time())).await; sleep(Duration::from_secs(Self::block_time().into())).await;
} }
_ => return invalid_block(), _ => return invalid_block(),
} }