Revert coordinator/tributary to fd4f247917

#560 is causing notable CI failures, with its logs including slashes at 10x
the prior rate.
This commit is contained in:
Luke Parker 2024-04-21 10:14:22 -04:00
parent 5fa7e3d450
commit 933b17aa91
No known key found for this signature in database
8 changed files with 529 additions and 553 deletions

View file

@ -1,5 +1,5 @@
use core::{marker::PhantomData, fmt::Debug}; use core::{marker::PhantomData, fmt::Debug};
use std::{sync::Arc, io}; use std::{sync::Arc, io, collections::VecDeque};
use async_trait::async_trait; use async_trait::async_trait;
@ -154,6 +154,14 @@ pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>, synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>, synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>, messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
p2p_meta_task_handle: Arc<tokio::task::AbortHandle>,
}
impl<D: Db, T: TransactionTrait, P: P2p> Drop for Tributary<D, T, P> {
fn drop(&mut self) {
self.p2p_meta_task_handle.abort();
}
} }
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> { impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
@ -185,7 +193,28 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
); );
let blockchain = Arc::new(RwLock::new(blockchain)); let blockchain = Arc::new(RwLock::new(blockchain));
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new()));
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
// P2P layer
let p2p_meta_task_handle = Arc::new(
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
}
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
})
.abort_handle(),
);
let network =
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };
let TendermintHandle { synced_block, synced_block_result, messages, machine } = let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new( TendermintMachine::new(
@ -206,6 +235,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
synced_block: Arc::new(RwLock::new(synced_block)), synced_block: Arc::new(RwLock::new(synced_block)),
synced_block_result: Arc::new(RwLock::new(synced_block_result)), synced_block_result: Arc::new(RwLock::new(synced_block_result)),
messages: Arc::new(RwLock::new(messages)), messages: Arc::new(RwLock::new(messages)),
p2p_meta_task_handle,
}) })
} }

View file

@ -1,5 +1,8 @@
use core::ops::Deref; use core::ops::Deref;
use std::{sync::Arc, collections::HashMap}; use std::{
sync::Arc,
collections::{VecDeque, HashMap},
};
use async_trait::async_trait; use async_trait::async_trait;
@ -267,6 +270,8 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
pub(crate) validators: Arc<Validators>, pub(crate) validators: Arc<Validators>,
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>, pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,
pub(crate) to_rebroadcast: Arc<RwLock<VecDeque<Vec<u8>>>>,
pub(crate) p2p: P, pub(crate) p2p: P,
} }
@ -303,6 +308,26 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
async fn broadcast(&mut self, msg: SignedMessageFor<Self>) { async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
let mut to_broadcast = vec![TENDERMINT_MESSAGE]; let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode()); to_broadcast.extend(msg.encode());
// Since we're broadcasting a Tendermint message, set it to be rebroadcast every 60 seconds
// until the block it's trying to build is complete
// If the P2P layer drops a message before all nodes obtained access, or a node had an
// intermittent failure, this will ensure reconciliation
// This is atrocious if there's no content-based deduplication protocol for messages actively
// being gossiped
// LibP2p, as used by Serai, is configured to content-based deduplicate
{
let mut to_rebroadcast_lock = self.to_rebroadcast.write().await;
to_rebroadcast_lock.push_back(to_broadcast.clone());
// We should have, ideally, 3 * validators messages within a round
// Therefore, this should keep the most recent 2-rounds
// TODO: This isn't perfect. Each participant should just rebroadcast their latest round of
// messages
while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) {
to_rebroadcast_lock.pop_front();
}
}
self.p2p.broadcast(self.genesis, to_broadcast).await self.p2p.broadcast(self.genesis, to_broadcast).await
} }
@ -341,7 +366,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
} }
} }
async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> { async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
let block = let block =
Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
self self
@ -403,6 +428,9 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
} }
} }
// Since we've added a valid block, clear to_rebroadcast
*self.to_rebroadcast.write().await = VecDeque::new();
Some(TendermintBlock( Some(TendermintBlock(
self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(), self.blockchain.write().await.build_block::<Self>(&self.signature_scheme()).serialize(),
)) ))

View file

@ -3,6 +3,7 @@ use std::{
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
}; };
use parity_scale_codec::Encode;
use serai_db::{Get, DbTxn, Db}; use serai_db::{Get, DbTxn, Db};
use crate::{ use crate::{
@ -19,7 +20,7 @@ pub(crate) struct BlockData<N: Network> {
pub(crate) number: BlockNumber, pub(crate) number: BlockNumber,
pub(crate) validator_id: Option<N::ValidatorId>, pub(crate) validator_id: Option<N::ValidatorId>,
pub(crate) our_proposal: Option<N::Block>, pub(crate) proposal: Option<N::Block>,
pub(crate) log: MessageLog<N>, pub(crate) log: MessageLog<N>,
pub(crate) slashes: HashSet<N::ValidatorId>, pub(crate) slashes: HashSet<N::ValidatorId>,
@ -42,7 +43,7 @@ impl<N: Network> BlockData<N> {
weights: Arc<N::Weights>, weights: Arc<N::Weights>,
number: BlockNumber, number: BlockNumber,
validator_id: Option<N::ValidatorId>, validator_id: Option<N::ValidatorId>,
our_proposal: Option<N::Block>, proposal: Option<N::Block>,
) -> BlockData<N> { ) -> BlockData<N> {
BlockData { BlockData {
db, db,
@ -50,7 +51,7 @@ impl<N: Network> BlockData<N> {
number, number,
validator_id, validator_id,
our_proposal, proposal,
log: MessageLog::new(weights), log: MessageLog::new(weights),
slashes: HashSet::new(), slashes: HashSet::new(),
@ -107,17 +108,17 @@ impl<N: Network> BlockData<N> {
self.populate_end_time(round); self.populate_end_time(round);
} }
// L11-13 // 11-13
self.round = Some(RoundData::<N>::new( self.round = Some(RoundData::<N>::new(
round, round,
time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]),
)); ));
self.end_time.insert(round, self.round().end_time()); self.end_time.insert(round, self.round().end_time());
// L14-21 // 14-21
if Some(proposer) == self.validator_id { if Some(proposer) == self.validator_id {
let (round, block) = self.valid.clone().unzip(); let (round, block) = self.valid.clone().unzip();
block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block)) block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block))
} else { } else {
self.round_mut().set_timeout(Step::Propose); self.round_mut().set_timeout(Step::Propose);
None None
@ -197,8 +198,8 @@ impl<N: Network> BlockData<N> {
assert!(!new_round); assert!(!new_round);
None?; None?;
} }
// Put that we're sending this message to the DB // Put this message to the DB
txn.put(&msg_key, []); txn.put(&msg_key, res.encode());
txn.commit(); txn.commit();
} }

View file

@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync {
async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent); async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent);
/// Validate a block. /// Validate a block.
async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>; async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
/// Add a block, returning the proposal for the next one. /// Add a block, returning the proposal for the next one.
/// ///

File diff suppressed because it is too large Load diff

View file

@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap};
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence}; use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, SignedMessageFor<N>>>; type RoundLog<N> = HashMap<<N as Network>::ValidatorId, HashMap<Step, SignedMessageFor<N>>>;
pub(crate) struct MessageLog<N: Network> { pub(crate) struct MessageLog<N: Network> {
@ -16,7 +16,7 @@ impl<N: Network> MessageLog<N> {
} }
// Returns true if it's a new message // Returns true if it's a new message
pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, Evidence> { pub(crate) fn log(&mut self, signed: SignedMessageFor<N>) -> Result<bool, TendermintError<N>> {
let msg = &signed.msg; let msg = &signed.msg;
// Clarity, and safety around default != new edge cases // Clarity, and safety around default != new edge cases
let round = self.log.entry(msg.round).or_insert_with(HashMap::new); let round = self.log.entry(msg.round).or_insert_with(HashMap::new);
@ -30,7 +30,10 @@ impl<N: Network> MessageLog<N> {
target: "tendermint", target: "tendermint",
"Validator sent multiple messages for the same block + round + step" "Validator sent multiple messages for the same block + round + step"
); );
Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?; Err(TendermintError::Malicious(
msg.sender,
Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())),
))?;
} }
return Ok(false); return Ok(false);
} }
@ -44,8 +47,7 @@ impl<N: Network> MessageLog<N> {
pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor<N>) -> (u64, u64) { pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor<N>) -> (u64, u64) {
let mut participating = 0; let mut participating = 0;
let mut weight = 0; let mut weight = 0;
let Some(log) = self.log.get(&round) else { return (0, 0) }; for (participant, msgs) in &self.log[&round] {
for (participant, msgs) in log {
if let Some(msg) = msgs.get(&data.step()) { if let Some(msg) = msgs.get(&data.step()) {
let validator_weight = self.weights.weight(*participant); let validator_weight = self.weights.weight(*participant);
participating += validator_weight; participating += validator_weight;
@ -71,8 +73,7 @@ impl<N: Network> MessageLog<N> {
// Check if a supermajority of nodes have participated on a specific step // Check if a supermajority of nodes have participated on a specific step
pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool { pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool {
let mut participating = 0; let mut participating = 0;
let Some(log) = self.log.get(&round) else { return false }; for (participant, msgs) in &self.log[&round] {
for (participant, msgs) in log {
if msgs.get(&step).is_some() { if msgs.get(&step).is_some() {
participating += self.weights.weight(*participant); participating += self.weights.weight(*participant);
} }

View file

@ -57,7 +57,6 @@ impl<N: Network> RoundData<N> {
// Poll all set timeouts, returning the Step whose timeout has just expired // Poll all set timeouts, returning the Step whose timeout has just expired
pub(crate) async fn timeout_future(&self) -> Step { pub(crate) async fn timeout_future(&self) -> Step {
/*
let now = Instant::now(); let now = Instant::now();
log::trace!( log::trace!(
target: "tendermint", target: "tendermint",
@ -65,7 +64,6 @@ impl<N: Network> RoundData<N> {
self.step, self.step,
self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>() self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>()
); );
*/
let timeout_future = |step| { let timeout_future = |step| {
let timeout = self.timeouts.get(&step).copied(); let timeout = self.timeouts.get(&step).copied();

View file

@ -145,7 +145,7 @@ impl Network for TestNetwork {
println!("Slash for {id} due to {event:?}"); println!("Slash for {id} due to {event:?}");
} }
async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> { async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
block.valid block.valid
} }