mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-09 12:29:27 +00:00
Regularly rebroadcast consensus messages to ensure presence even if dropped from the P2P layer
Attempts to fix #342, #382.
This commit is contained in:
parent
15edea1389
commit
b0fcdd3367
2 changed files with 40 additions and 1 deletions
|
@ -127,6 +127,11 @@ pub trait ReadWrite: Sized {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait P2p: 'static + Send + Sync + Clone + Debug {
|
pub trait P2p: 'static + Send + Sync + Clone + Debug {
|
||||||
|
/// Broadcast a message to all other members of the Tributary with the specified genesis.
|
||||||
|
///
|
||||||
|
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
|
||||||
|
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
|
||||||
|
/// deduplication to ensure a sane amount of load.
|
||||||
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
|
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,7 +183,25 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
);
|
);
|
||||||
let blockchain = Arc::new(RwLock::new(blockchain));
|
let blockchain = Arc::new(RwLock::new(blockchain));
|
||||||
|
|
||||||
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
|
let to_rebroadcast = Arc::new(RwLock::new(vec![]));
|
||||||
|
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
|
||||||
|
// P2P layer
|
||||||
|
tokio::spawn({
|
||||||
|
let to_rebroadcast = to_rebroadcast.clone();
|
||||||
|
let p2p = p2p.clone();
|
||||||
|
async move {
|
||||||
|
loop {
|
||||||
|
let to_rebroadcast = to_rebroadcast.read().await.clone();
|
||||||
|
for msg in to_rebroadcast {
|
||||||
|
p2p.broadcast(genesis, msg).await;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let network =
|
||||||
|
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
|
||||||
|
|
||||||
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
|
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
|
||||||
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
|
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
|
||||||
|
|
|
@ -271,6 +271,8 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
|
||||||
pub(crate) validators: Arc<Validators>,
|
pub(crate) validators: Arc<Validators>,
|
||||||
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
||||||
|
|
||||||
|
pub(crate) to_rebroadcast: Arc<RwLock<Vec<Vec<u8>>>>,
|
||||||
|
|
||||||
pub(crate) p2p: P,
|
pub(crate) p2p: P,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,8 +306,19 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
||||||
|
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
|
||||||
|
// until the block it's trying to build is complete
|
||||||
|
// If the P2P layer drops a message before all nodes obtained access, or a node had an
|
||||||
|
// intermittent failure, this will ensure reconcilliation
|
||||||
|
// Resolves halts caused by timing discrepancies, which technically are violations of
|
||||||
|
// Tendermint as a BFT protocol, and shouldn't occur yet have in low-powered testing
|
||||||
|
// environments
|
||||||
|
// This is atrocious if there's no content-based deduplication protocol for messages actively
|
||||||
|
// being gossiped
|
||||||
|
// LibP2p, as used by Serai, is configured to content-based deduplicate
|
||||||
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
||||||
to_broadcast.extend(msg.encode());
|
to_broadcast.extend(msg.encode());
|
||||||
|
self.to_rebroadcast.write().await.push(to_broadcast.clone());
|
||||||
self.p2p.broadcast(self.genesis, to_broadcast).await
|
self.p2p.broadcast(self.genesis, to_broadcast).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,6 +420,9 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Since we've added a valid block, clear to_rebroadcast
|
||||||
|
*self.to_rebroadcast.write().await = vec![];
|
||||||
|
|
||||||
Some(TendermintBlock(
|
Some(TendermintBlock(
|
||||||
self.blockchain.write().await.build_block::<Self>(self.signature_scheme()).serialize(),
|
self.blockchain.write().await.build_block::<Self>(self.signature_scheme()).serialize(),
|
||||||
))
|
))
|
||||||
|
|
Loading…
Reference in a new issue