mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-03 09:29:46 +00:00
Initial pruning of backlogged consensus messages
This commit is contained in:
parent
e5afcda76b
commit
af9b1ad5f9
4 changed files with 54 additions and 32 deletions
|
@ -1,5 +1,5 @@
|
||||||
use core::{marker::PhantomData, fmt::Debug};
|
use core::{marker::PhantomData, fmt::Debug};
|
||||||
use std::{sync::Arc, io};
|
use std::{sync::Arc, io, collections::VecDeque};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
);
|
);
|
||||||
let blockchain = Arc::new(RwLock::new(blockchain));
|
let blockchain = Arc::new(RwLock::new(blockchain));
|
||||||
|
|
||||||
let to_rebroadcast = Arc::new(RwLock::new(vec![]));
|
let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new()));
|
||||||
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
|
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
|
||||||
// P2P layer
|
// P2P layer
|
||||||
let p2p_meta_task_handle = Arc::new(
|
let p2p_meta_task_handle = Arc::new(
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
use core::ops::Deref;
|
use core::ops::Deref;
|
||||||
use std::{sync::Arc, collections::HashMap};
|
use std::{
|
||||||
|
sync::Arc,
|
||||||
|
collections::{VecDeque, HashMap},
|
||||||
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
@ -268,7 +271,7 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
|
||||||
pub(crate) validators: Arc<Validators>,
|
pub(crate) validators: Arc<Validators>,
|
||||||
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
||||||
|
|
||||||
pub(crate) to_rebroadcast: Arc<RwLock<Vec<Vec<u8>>>>,
|
pub(crate) to_rebroadcast: Arc<RwLock<VecDeque<Vec<u8>>>>,
|
||||||
|
|
||||||
pub(crate) p2p: P,
|
pub(crate) p2p: P,
|
||||||
}
|
}
|
||||||
|
@ -277,29 +280,6 @@ pub const BLOCK_PROCESSING_TIME: u32 = 999;
|
||||||
pub const LATENCY_TIME: u32 = 1667;
|
pub const LATENCY_TIME: u32 = 1667;
|
||||||
pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME);
|
pub const TARGET_BLOCK_TIME: u32 = BLOCK_PROCESSING_TIME + (3 * LATENCY_TIME);
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn assert_target_block_time() {
|
|
||||||
use serai_db::MemDb;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct DummyP2p;
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl P2p for DummyP2p {
|
|
||||||
async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) {
|
|
||||||
unimplemented!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Type paremeters don't matter here since we only need to call the block_time()
|
|
||||||
// and it only relies on the constants of the trait implementation. block_time() is in seconds,
|
|
||||||
// TARGET_BLOCK_TIME is in milliseconds.
|
|
||||||
assert_eq!(
|
|
||||||
<TendermintNetwork<MemDb, TendermintTx, DummyP2p> as Network>::block_time(),
|
|
||||||
TARGET_BLOCK_TIME / 1000
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P> {
|
impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P> {
|
||||||
type Db = D;
|
type Db = D;
|
||||||
|
@ -327,6 +307,9 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
||||||
|
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
||||||
|
to_broadcast.extend(msg.encode());
|
||||||
|
|
||||||
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
|
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
|
||||||
// until the block it's trying to build is complete
|
// until the block it's trying to build is complete
|
||||||
// If the P2P layer drops a message before all nodes obtained access, or a node had an
|
// If the P2P layer drops a message before all nodes obtained access, or a node had an
|
||||||
|
@ -334,10 +317,18 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
// This is atrocious if there's no content-based deduplication protocol for messages actively
|
// This is atrocious if there's no content-based deduplication protocol for messages actively
|
||||||
// being gossiped
|
// being gossiped
|
||||||
// LibP2p, as used by Serai, is configured to content-based deduplicate
|
// LibP2p, as used by Serai, is configured to content-based deduplicate
|
||||||
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
{
|
||||||
to_broadcast.extend(msg.encode());
|
let mut to_rebroadcast_lock = self.to_rebroadcast.write().await;
|
||||||
// TODO: Prune messages from old rounds which are no longer necessary
|
to_rebroadcast_lock.push_back(to_broadcast.clone());
|
||||||
self.to_rebroadcast.write().await.push(to_broadcast.clone());
|
// We should have, ideally, 3 * validators messages within a round
|
||||||
|
// Therefore, this should keep the most recent 2-rounds
|
||||||
|
// TODO: This isn't perfect. Each participant should just rebroadcast their latest round of
|
||||||
|
// messages
|
||||||
|
while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) {
|
||||||
|
to_rebroadcast_lock.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.p2p.broadcast(self.genesis, to_broadcast).await
|
self.p2p.broadcast(self.genesis, to_broadcast).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,7 +434,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since we've added a valid block, clear to_rebroadcast
|
// Since we've added a valid block, clear to_rebroadcast
|
||||||
*self.to_rebroadcast.write().await = vec![];
|
*self.to_rebroadcast.write().await = VecDeque::new();
|
||||||
|
|
||||||
Some(TendermintBlock(
|
Some(TendermintBlock(
|
||||||
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
|
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tendermint;
|
||||||
|
|
||||||
mod transaction;
|
mod transaction;
|
||||||
pub use transaction::*;
|
pub use transaction::*;
|
||||||
|
|
||||||
|
|
28
coordinator/tributary/src/tests/tendermint.rs
Normal file
28
coordinator/tributary/src/tests/tendermint.rs
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
use tendermint::ext::Network;
|
||||||
|
use crate::{
|
||||||
|
P2p, TendermintTx,
|
||||||
|
tendermint::{TARGET_BLOCK_TIME, TendermintNetwork},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn assert_target_block_time() {
|
||||||
|
use serai_db::MemDb;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct DummyP2p;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl P2p for DummyP2p {
|
||||||
|
async fn broadcast(&self, _: [u8; 32], _: Vec<u8>) {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type paremeters don't matter here since we only need to call the block_time()
|
||||||
|
// and it only relies on the constants of the trait implementation. block_time() is in seconds,
|
||||||
|
// TARGET_BLOCK_TIME is in milliseconds.
|
||||||
|
assert_eq!(
|
||||||
|
<TendermintNetwork<MemDb, TendermintTx, DummyP2p> as Network>::block_time(),
|
||||||
|
TARGET_BLOCK_TIME / 1000
|
||||||
|
)
|
||||||
|
}
|
Loading…
Reference in a new issue