mirror of
https://github.com/serai-dex/serai.git
synced 2024-12-23 03:59:22 +00:00
Rewrite tendermint's message handling loop to much more clearly match the paper (#560)
* Rewrite tendermint's message handling loop to much more clearly match the paper No longer checks relevant branches upon messages, yet all branches upon any state change. This is slower, yet easier to review and likely without one or two rare edge cases. When reviewing, please see page 5 of https://arxiv.org/pdf/1807.04938.pdf. Lines from the specified algorithm can be found in the code by searching for "// L". * Sane rebroadcasting of consensus messages Instead of broadcasting the last n messages on the Tributary side of things, we now have the machine rebroadcast the message tape for the current block. * Only rebroadcast messages which didn't error in some way * Only rebroadcast our own messages for tendermint
This commit is contained in:
parent
fd4f247917
commit
523d2ac911
7 changed files with 555 additions and 528 deletions
|
@ -1,5 +1,5 @@
|
||||||
use core::{marker::PhantomData, fmt::Debug};
|
use core::{marker::PhantomData, fmt::Debug};
|
||||||
use std::{sync::Arc, io, collections::VecDeque};
|
use std::{sync::Arc, io};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
@ -154,14 +154,6 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
|
||||||
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
|
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
|
||||||
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
|
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
|
||||||
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
|
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
|
||||||
|
|
||||||
p2p_meta_task_handle: Arc<tokio::task::AbortHandle>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<D: Db, T: TransactionTrait, P: P2p> Drop for Tributary<D, T, P> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.p2p_meta_task_handle.abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
|
@ -193,28 +185,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
);
|
);
|
||||||
let blockchain = Arc::new(RwLock::new(blockchain));
|
let blockchain = Arc::new(RwLock::new(blockchain));
|
||||||
|
|
||||||
let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new()));
|
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
|
||||||
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
|
|
||||||
// P2P layer
|
|
||||||
let p2p_meta_task_handle = Arc::new(
|
|
||||||
tokio::spawn({
|
|
||||||
let to_rebroadcast = to_rebroadcast.clone();
|
|
||||||
let p2p = p2p.clone();
|
|
||||||
async move {
|
|
||||||
loop {
|
|
||||||
let to_rebroadcast = to_rebroadcast.read().await.clone();
|
|
||||||
for msg in to_rebroadcast {
|
|
||||||
p2p.broadcast(genesis, msg).await;
|
|
||||||
}
|
|
||||||
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.abort_handle(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let network =
|
|
||||||
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
|
|
||||||
|
|
||||||
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
|
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
|
||||||
TendermintMachine::new(
|
TendermintMachine::new(
|
||||||
|
@ -235,7 +206,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||||
synced_block: Arc::new(RwLock::new(synced_block)),
|
synced_block: Arc::new(RwLock::new(synced_block)),
|
||||||
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
|
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
|
||||||
messages: Arc::new(RwLock::new(messages)),
|
messages: Arc::new(RwLock::new(messages)),
|
||||||
p2p_meta_task_handle,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,5 @@
|
||||||
use core::ops::Deref;
|
use core::ops::Deref;
|
||||||
use std::{
|
use std::{sync::Arc, collections::HashMap};
|
||||||
sync::Arc,
|
|
||||||
collections::{VecDeque, HashMap},
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
@ -270,8 +267,6 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
|
||||||
pub(crate) validators: Arc<Validators>,
|
pub(crate) validators: Arc<Validators>,
|
||||||
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
|
||||||
|
|
||||||
pub(crate) to_rebroadcast: Arc<RwLock<VecDeque<Vec<u8>>>>,
|
|
||||||
|
|
||||||
pub(crate) p2p: P,
|
pub(crate) p2p: P,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,26 +303,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
|
||||||
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
|
||||||
to_broadcast.extend(msg.encode());
|
to_broadcast.extend(msg.encode());
|
||||||
|
|
||||||
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
|
|
||||||
// until the block it's trying to build is complete
|
|
||||||
// If the P2P layer drops a message before all nodes obtained access, or a node had an
|
|
||||||
// intermittent failure, this will ensure reconcilliation
|
|
||||||
// This is atrocious if there's no content-based deduplication protocol for messages actively
|
|
||||||
// being gossiped
|
|
||||||
// LibP2p, as used by Serai, is configured to content-based deduplicate
|
|
||||||
{
|
|
||||||
let mut to_rebroadcast_lock = self.to_rebroadcast.write().await;
|
|
||||||
to_rebroadcast_lock.push_back(to_broadcast.clone());
|
|
||||||
// We should have, ideally, 3 * validators messages within a round
|
|
||||||
// Therefore, this should keep the most recent 2-rounds
|
|
||||||
// TODO: This isn't perfect. Each participant should just rebroadcast their latest round of
|
|
||||||
// messages
|
|
||||||
while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) {
|
|
||||||
to_rebroadcast_lock.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.p2p.broadcast(self.genesis, to_broadcast).await
|
self.p2p.broadcast(self.genesis, to_broadcast).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,7 +341,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
|
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> {
|
||||||
let block =
|
let block =
|
||||||
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
|
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
|
||||||
self
|
self
|
||||||
|
@ -428,9 +403,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since we've added a valid block, clear to_rebroadcast
|
|
||||||
*self.to_rebroadcast.write().await = VecDeque::new();
|
|
||||||
|
|
||||||
Some(TendermintBlock(
|
Some(TendermintBlock(
|
||||||
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
|
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
|
||||||
))
|
))
|
||||||
|
|
|
@ -3,7 +3,6 @@ use std::{
|
||||||
collections::{HashSet, HashMap},
|
collections::{HashSet, HashMap},
|
||||||
};
|
};
|
||||||
|
|
||||||
use parity_scale_codec::Encode;
|
|
||||||
use serai_db::{Get, DbTxn, Db};
|
use serai_db::{Get, DbTxn, Db};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -20,7 +19,7 @@ pub(crate) struct BlockData<N: Network> {
|
||||||
|
|
||||||
pub(crate) number: BlockNumber,
|
pub(crate) number: BlockNumber,
|
||||||
pub(crate) validator_id: Option<N::ValidatorId>,
|
pub(crate) validator_id: Option<N::ValidatorId>,
|
||||||
pub(crate) proposal: Option<N::Block>,
|
pub(crate) our_proposal: Option<N::Block>,
|
||||||
|
|
||||||
pub(crate) log: MessageLog<N>,
|
pub(crate) log: MessageLog<N>,
|
||||||
pub(crate) slashes: HashSet<N::ValidatorId>,
|
pub(crate) slashes: HashSet<N::ValidatorId>,
|
||||||
|
@ -43,7 +42,7 @@ impl<N: Network> BlockData<N> {
|
||||||
weights: Arc<N::Weights>,
|
weights: Arc<N::Weights>,
|
||||||
number: BlockNumber,
|
number: BlockNumber,
|
||||||
validator_id: Option<N::ValidatorId>,
|
validator_id: Option<N::ValidatorId>,
|
||||||
proposal: Option<N::Block>,
|
our_proposal: Option<N::Block>,
|
||||||
) -> BlockData<N> {
|
) -> BlockData<N> {
|
||||||
BlockData {
|
BlockData {
|
||||||
db,
|
db,
|
||||||
|
@ -51,7 +50,7 @@ impl<N: Network> BlockData<N> {
|
||||||
|
|
||||||
number,
|
number,
|
||||||
validator_id,
|
validator_id,
|
||||||
proposal,
|
our_proposal,
|
||||||
|
|
||||||
log: MessageLog::new(weights),
|
log: MessageLog::new(weights),
|
||||||
slashes: HashSet::new(),
|
slashes: HashSet::new(),
|
||||||
|
@ -108,17 +107,17 @@ impl<N: Network> BlockData<N> {
|
||||||
self.populate_end_time(round);
|
self.populate_end_time(round);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 11-13
|
// L11-13
|
||||||
self.round = Some(RoundData::<N>::new(
|
self.round = Some(RoundData::<N>::new(
|
||||||
round,
|
round,
|
||||||
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
|
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
|
||||||
));
|
));
|
||||||
self.end_time.insert(round, self.round().end_time());
|
self.end_time.insert(round, self.round().end_time());
|
||||||
|
|
||||||
// 14-21
|
// L14-21
|
||||||
if Some(proposer) == self.validator_id {
|
if Some(proposer) == self.validator_id {
|
||||||
let (round, block) = self.valid.clone().unzip();
|
let (round, block) = self.valid.clone().unzip();
|
||||||
block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block))
|
block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block))
|
||||||
} else {
|
} else {
|
||||||
self.round_mut().set_timeout(Step::Propose);
|
self.round_mut().set_timeout(Step::Propose);
|
||||||
None
|
None
|
||||||
|
@ -198,8 +197,8 @@ impl<N: Network> BlockData<N> {
|
||||||
assert!(!new_round);
|
assert!(!new_round);
|
||||||
None?;
|
None?;
|
||||||
}
|
}
|
||||||
// Put this message to the DB
|
// Put that we're sending this message to the DB
|
||||||
txn.put(&msg_key, res.encode());
|
txn.put(&msg_key, []);
|
||||||
|
|
||||||
txn.commit();
|
txn.commit();
|
||||||
}
|
}
|
||||||
|
|
|
@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync {
|
||||||
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);
|
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);
|
||||||
|
|
||||||
/// Validate a block.
|
/// Validate a block.
|
||||||
async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
|
async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>;
|
||||||
|
|
||||||
/// Add a block, returning the proposal for the next one.
|
/// Add a block, returning the proposal for the next one.
|
||||||
///
|
///
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap};
|
||||||
|
|
||||||
use parity_scale_codec::Encode;
|
use parity_scale_codec::Encode;
|
||||||
|
|
||||||
use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
|
use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence};
|
||||||
|
|
||||||
type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, SignedMessageFor<N>>>;
|
type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, SignedMessageFor<N>>>;
|
||||||
pub(crate) struct MessageLog<N: Network> {
|
pub(crate) struct MessageLog<N: Network> {
|
||||||
|
@ -16,7 +16,7 @@ impl<N: Network> MessageLog<N> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if it's a new message
|
// Returns true if it's a new message
|
||||||
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, TendermintError<N>> {
|
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, Evidence> {
|
||||||
let msg = &signed.msg;
|
let msg = &signed.msg;
|
||||||
// Clarity, and safety around default != new edge cases
|
// Clarity, and safety around default != new edge cases
|
||||||
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
|
let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
|
||||||
|
@ -30,10 +30,7 @@ impl<N: Network> MessageLog<N> {
|
||||||
target: "tendermint",
|
target: "tendermint",
|
||||||
"Validator sent multiple messages for the same block + round + step"
|
"Validator sent multiple messages for the same block + round + step"
|
||||||
);
|
);
|
||||||
Err(TendermintError::Malicious(
|
Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?;
|
||||||
msg.sender,
|
|
||||||
Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())),
|
|
||||||
))?;
|
|
||||||
}
|
}
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
}
|
}
|
||||||
|
@ -47,7 +44,8 @@ impl<N: Network> MessageLog<N> {
|
||||||
pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor<N>) -> (u64, u64) {
|
pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor<N>) -> (u64, u64) {
|
||||||
let mut participating = 0;
|
let mut participating = 0;
|
||||||
let mut weight = 0;
|
let mut weight = 0;
|
||||||
for (participant, msgs) in &self.log[&round] {
|
let Some(log) = self.log.get(&round) else { return (0, 0) };
|
||||||
|
for (participant, msgs) in log {
|
||||||
if let Some(msg) = msgs.get(&data.step()) {
|
if let Some(msg) = msgs.get(&data.step()) {
|
||||||
let validator_weight = self.weights.weight(*participant);
|
let validator_weight = self.weights.weight(*participant);
|
||||||
participating += validator_weight;
|
participating += validator_weight;
|
||||||
|
@ -73,7 +71,8 @@ impl<N: Network> MessageLog<N> {
|
||||||
// Check if a supermajority of nodes have participated on a specific step
|
// Check if a supermajority of nodes have participated on a specific step
|
||||||
pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool {
|
pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool {
|
||||||
let mut participating = 0;
|
let mut participating = 0;
|
||||||
for (participant, msgs) in &self.log[&round] {
|
let Some(log) = self.log.get(&round) else { return false };
|
||||||
|
for (participant, msgs) in log {
|
||||||
if msgs.get(&step).is_some() {
|
if msgs.get(&step).is_some() {
|
||||||
participating += self.weights.weight(*participant);
|
participating += self.weights.weight(*participant);
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,7 +145,7 @@ impl Network for TestNetwork {
|
||||||
println!("Slash for {id} due to {event:?}");
|
println!("Slash for {id} due to {event:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
|
async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> {
|
||||||
block.valid
|
block.valid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue