Ensure we don't reply to stale heartbeats

This commit is contained in:
Luke Parker 2024-04-18 01:24:38 -04:00
parent 05e6d81948
commit 2b481ab71e
No known key found for this signature in database

View file

@ -558,6 +558,17 @@ impl TributaryP2p for LibP2p {
} }
} }
fn heartbeat_time_unit<D: Db, P: P2p>() -> u64 {
// Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is wrong")
.as_secs();
// Divide by the block time so if multiple parties send a Heartbeat, they're more likely to
// overlap
timestamp / u64::from(Tributary::<D, Transaction, P>::block_time())
}
pub async fn heartbeat_tributaries_task<D: Db, P: P2p>( pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
p2p: P, p2p: P,
mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>, mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
@ -592,14 +603,7 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
if SystemTime::now() > (block_time + Duration::from_secs(60)) { if SystemTime::now() > (block_time + Duration::from_secs(60)) {
log::warn!("last known tributary block was over a minute ago"); log::warn!("last known tributary block was over a minute ago");
let mut msg = tip.to_vec(); let mut msg = tip.to_vec();
// Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating let time_unit = heartbeat_time_unit::<D, P>();
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is wrong")
.as_secs();
// Divide by the block time so if multiple parties send a Heartbeat, they're more likely to
// overlap
let time_unit = timestamp / u64::from(Tributary::<D, Transaction, P>::block_time());
msg.extend(time_unit.to_le_bytes()); msg.extend(time_unit.to_le_bytes());
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await; P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
} }
@ -666,6 +670,13 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
log::error!("validator sent invalid heartbeat"); log::error!("validator sent invalid heartbeat");
continue; continue;
} }
// Only respond to recent heartbeats
let msg_time_unit = u64::from_le_bytes(msg.msg[32 .. 40].try_into().expect(
"length-checked heartbeat message didn't have 8 bytes for the u64",
));
if heartbeat_time_unit::<D, P>().saturating_sub(msg_time_unit) > 1 {
continue;
}
let p2p = p2p.clone(); let p2p = p2p.clone();
let spec = tributary.spec.clone(); let spec = tributary.spec.clone();
@ -715,7 +726,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
to_send.push(next); to_send.push(next);
latest = next; latest = next;
} }
if to_send.len() > 1 { if to_send.len() > 3 {
for next in to_send { for next in to_send {
let mut res = reader.block(&next).unwrap().serialize(); let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap()); res.extend(reader.commit(&next).unwrap());