From af9b1ad5f91777dc38cfe7dee4d3b488ef9a4dc9 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 22 Mar 2024 23:18:53 -0400 Subject: [PATCH] Initial pruning of backlogged consensus messages --- coordinator/tributary/src/lib.rs | 4 +- coordinator/tributary/src/tendermint/mod.rs | 51 ++++++++----------- coordinator/tributary/src/tests/mod.rs | 3 ++ coordinator/tributary/src/tests/tendermint.rs | 28 ++++++++++ 4 files changed, 54 insertions(+), 32 deletions(-) create mode 100644 coordinator/tributary/src/tests/tendermint.rs diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 81b4fc17..99deb588 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt::Debug}; -use std::{sync::Arc, io}; +use std::{sync::Arc, io, collections::VecDeque}; use async_trait::async_trait; @@ -194,7 +194,7 @@ impl Tributary { ); let blockchain = Arc::new(RwLock::new(blockchain)); - let to_rebroadcast = Arc::new(RwLock::new(vec![])); + let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new())); // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the // P2P layer let p2p_meta_task_handle = Arc::new( diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index 40d01380..df8f7219 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -1,5 +1,8 @@ use core::ops::Deref; -use std::{sync::Arc, collections::HashMap}; +use std::{ + sync::Arc, + collections::{VecDeque, HashMap}, +}; use async_trait::async_trait; @@ -268,7 +271,7 @@ pub struct TendermintNetwork { pub(crate) validators: Arc, pub(crate) blockchain: Arc>>, - pub(crate) to_rebroadcast: Arc>>>, + pub(crate) to_rebroadcast: Arc>>>, pub(crate) p2p: P, } @@ -277,29 +280,6 @@ pub const BLOCK_PROCESSING_TIME: u32 = 999; pub const LATENCY_TIME: u32 = 1667; pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME); -#[test] -fn assert_target_block_time() { - use serai_db::MemDb; - - #[derive(Clone, Debug)] - pub struct DummyP2p; - - #[async_trait::async_trait] - impl P2p for DummyP2p { - async fn broadcast(&self, _: [u8; 32], _: Vec) { - unimplemented!() - } - } - - // Type paremeters don't matter here since we only need to call the block_time() - // and it only relies on the constants of the trait implementation. block_time() is in seconds, - // TARGET_BLOCK_TIME is in milliseconds. - assert_eq!( - as Network>::block_time(), - TARGET_BLOCK_TIME / 1000 - ) -} - #[async_trait] impl Network for TendermintNetwork { type Db = D; @@ -327,6 +307,9 @@ impl Network for TendermintNetwork } async fn broadcast(&mut self, msg: SignedMessageFor) { + let mut to_broadcast = vec![TENDERMINT_MESSAGE]; + to_broadcast.extend(msg.encode()); + // Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second // until the block it's trying to build is complete // If the P2P layer drops a message before all nodes obtained access, or a node had an @@ -334,10 +317,18 @@ impl Network for TendermintNetwork // This is atrocious if there's no content-based deduplication protocol for messages actively // being gossiped // LibP2p, as used by Serai, is configured to content-based deduplicate - let mut to_broadcast = vec![TENDERMINT_MESSAGE]; - to_broadcast.extend(msg.encode()); - // TODO: Prune messages from old rounds which are no longer necessary - self.to_rebroadcast.write().await.push(to_broadcast.clone()); + { + let mut to_rebroadcast_lock = self.to_rebroadcast.write().await; + to_rebroadcast_lock.push_back(to_broadcast.clone()); + // We should have, ideally, 3 * validators messages within a round + // Therefore, this should keep the most recent 2-rounds + // TODO: This isn't perfect. Each participant should just rebroadcast their latest round of + // messages + while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) { + to_rebroadcast_lock.pop_front(); + } + } + self.p2p.broadcast(self.genesis, to_broadcast).await } @@ -443,7 +434,7 @@ impl Network for TendermintNetwork } // Since we've added a valid block, clear to_rebroadcast - *self.to_rebroadcast.write().await = vec![]; + *self.to_rebroadcast.write().await = VecDeque::new(); Some(TendermintBlock( self.blockchain.write().await.build_block::(&self.signature_scheme()).serialize(), diff --git a/coordinator/tributary/src/tests/mod.rs b/coordinator/tributary/src/tests/mod.rs index 7c75ac36..dcaa11a5 100644 --- a/coordinator/tributary/src/tests/mod.rs +++ b/coordinator/tributary/src/tests/mod.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tendermint; + mod transaction; pub use transaction::*; diff --git a/coordinator/tributary/src/tests/tendermint.rs b/coordinator/tributary/src/tests/tendermint.rs new file mode 100644 index 00000000..77dfc9e5 --- /dev/null +++ b/coordinator/tributary/src/tests/tendermint.rs @@ -0,0 +1,28 @@ +use tendermint::ext::Network; +use crate::{ + P2p, TendermintTx, + tendermint::{TARGET_BLOCK_TIME, TendermintNetwork}, +}; + +#[test] +fn assert_target_block_time() { + use serai_db::MemDb; + + #[derive(Clone, Debug)] + pub struct DummyP2p; + + #[async_trait::async_trait] + impl P2p for DummyP2p { + async fn broadcast(&self, _: [u8; 32], _: Vec) { + unimplemented!() + } + } + + // Type paremeters don't matter here since we only need to call the block_time() + // and it only relies on the constants of the trait implementation. block_time() is in seconds, + // TARGET_BLOCK_TIME is in milliseconds. + assert_eq!( + as Network>::block_time(), + TARGET_BLOCK_TIME / 1000 + ) +}