From 523d2ac911890322b86186b845093ff9b8033b92 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 21 Apr 2024 05:30:31 -0400 Subject: [PATCH] Rewrite tendermint's message handling loop to much more clearly match the paper (#560) * Rewrite tendermint's message handling loop to much more clearly match the paper No longer checks relevant branches upon messages, yet all branches upon any state change. This is slower, yet easier to review and likely without one or two rare edge cases. When reviewing, please see page 5 of https://arxiv.org/pdf/1807.04938.pdf. Lines from the specified algorithm can be found in the code by searching for "// L". * Sane rebroadcasting of consensus messages Instead of broadcasting the last n messages on the Tributary side of things, we now have the machine rebroadcast the message tape for the current block. * Only rebroadcast messages which didn't error in some way * Only rebroadcast our own messages for tendermint --- coordinator/tributary/src/lib.rs | 34 +- coordinator/tributary/src/tendermint/mod.rs | 32 +- coordinator/tributary/tendermint/src/block.rs | 17 +- coordinator/tributary/tendermint/src/ext.rs | 2 +- coordinator/tributary/tendermint/src/lib.rs | 981 ++++++++++-------- .../tributary/tendermint/src/message_log.rs | 15 +- coordinator/tributary/tendermint/tests/ext.rs | 2 +- 7 files changed, 555 insertions(+), 528 deletions(-) diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index dcf38c68..121ac385 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt::Debug}; -use std::{sync::Arc, io, collections::VecDeque}; +use std::{sync::Arc, io}; use async_trait::async_trait; @@ -154,14 +154,6 @@ pub struct Tributary { synced_block: Arc>>>, synced_block_result: Arc>, messages: Arc>>>, - - p2p_meta_task_handle: Arc, -} - -impl Drop for Tributary { - fn drop(&mut self) { - self.p2p_meta_task_handle.abort(); - } } impl Tributary { @@ -193,28 +185,7 @@ impl Tributary { ); let blockchain = Arc::new(RwLock::new(blockchain)); - let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new())); - // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the - // P2P layer - let p2p_meta_task_handle = Arc::new( - tokio::spawn({ - let to_rebroadcast = to_rebroadcast.clone(); - let p2p = p2p.clone(); - async move { - loop { - let to_rebroadcast = to_rebroadcast.read().await.clone(); - for msg in to_rebroadcast { - p2p.broadcast(genesis, msg).await; - } - tokio::time::sleep(core::time::Duration::from_secs(60)).await; - } - } - }) - .abort_handle(), - ); - - let network = - TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; + let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new( @@ -235,7 +206,6 @@ impl Tributary { synced_block: Arc::new(RwLock::new(synced_block)), synced_block_result: Arc::new(RwLock::new(synced_block_result)), messages: Arc::new(RwLock::new(messages)), - p2p_meta_task_handle, }) } diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index e38efa5d..0ce6232c 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -1,8 +1,5 @@ use core::ops::Deref; -use std::{ - sync::Arc, - collections::{VecDeque, HashMap}, -}; +use std::{sync::Arc, collections::HashMap}; use async_trait::async_trait; @@ -270,8 +267,6 @@ pub struct TendermintNetwork { pub(crate) validators: Arc, pub(crate) blockchain: Arc>>, - pub(crate) to_rebroadcast: Arc>>>, - pub(crate) p2p: P, } @@ -308,26 +303,6 @@ impl Network for TendermintNetwork async fn broadcast(&mut self, msg: SignedMessageFor) { let mut to_broadcast = vec![TENDERMINT_MESSAGE]; to_broadcast.extend(msg.encode()); - - // Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second - // until the block it's trying to build is complete - // If the P2P layer drops a message before all nodes obtained access, or a node had an - // intermittent failure, this will ensure reconcilliation - // This is atrocious if there's no content-based deduplication protocol for messages actively - // being gossiped - // LibP2p, as used by Serai, is configured to content-based deduplicate - { - let mut to_rebroadcast_lock = self.to_rebroadcast.write().await; - to_rebroadcast_lock.push_back(to_broadcast.clone()); - // We should have, ideally, 3 * validators messages within a round - // Therefore, this should keep the most recent 2-rounds - // TODO: This isn't perfect. Each participant should just rebroadcast their latest round of - // messages - while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) { - to_rebroadcast_lock.pop_front(); - } - } - self.p2p.broadcast(self.genesis, to_broadcast).await } @@ -366,7 +341,7 @@ impl Network for TendermintNetwork } } - async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> { + async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> { let block = Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; self @@ -428,9 +403,6 @@ impl Network for TendermintNetwork } } - // Since we've added a valid block, clear to_rebroadcast - *self.to_rebroadcast.write().await = VecDeque::new(); - Some(TendermintBlock( self.blockchain.write().await.build_block::(&self.signature_scheme()).serialize(), )) diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary/tendermint/src/block.rs index 6dfacfdb..236b4816 100644 --- a/coordinator/tributary/tendermint/src/block.rs +++ b/coordinator/tributary/tendermint/src/block.rs @@ -3,7 +3,6 @@ use std::{ collections::{HashSet, HashMap}, }; -use parity_scale_codec::Encode; use serai_db::{Get, DbTxn, Db}; use crate::{ @@ -20,7 +19,7 @@ pub(crate) struct BlockData { pub(crate) number: BlockNumber, pub(crate) validator_id: Option, - pub(crate) proposal: Option, + pub(crate) our_proposal: Option, pub(crate) log: MessageLog, pub(crate) slashes: HashSet, @@ -43,7 +42,7 @@ impl BlockData { weights: Arc, number: BlockNumber, validator_id: Option, - proposal: Option, + our_proposal: Option, ) -> BlockData { BlockData { db, @@ -51,7 +50,7 @@ impl BlockData { number, validator_id, - proposal, + our_proposal, log: MessageLog::new(weights), slashes: HashSet::new(), @@ -108,17 +107,17 @@ impl BlockData { self.populate_end_time(round); } - // 11-13 + // L11-13 self.round = Some(RoundData::::new( round, time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), )); self.end_time.insert(round, self.round().end_time()); - // 14-21 + // L14-21 if Some(proposer) == self.validator_id { let (round, block) = self.valid.clone().unzip(); - block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block)) + block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block)) } else { self.round_mut().set_timeout(Step::Propose); None @@ -198,8 +197,8 @@ impl BlockData { assert!(!new_round); None?; } - // Put this message to the DB - txn.put(&msg_key, res.encode()); + // Put that we're sending this message to the DB + txn.put(&msg_key, []); txn.commit(); } diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index b3d568a2..3869d9d9 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync { async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent); /// Validate a block. - async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; + async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>; /// Add a block, returning the proposal for the next one. /// diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index da80a41c..edd26183 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -6,7 +6,7 @@ use std::{ collections::VecDeque, }; -use parity_scale_codec::{Encode, Decode}; +use parity_scale_codec::{Encode, Decode, IoReader}; use futures_channel::mpsc; use futures_util::{ @@ -15,6 +15,8 @@ use futures_util::{ }; use tokio::time::sleep; +use serai_db::{Get, DbTxn, Db}; + pub mod time; use time::{sys_time, CanonicalInstant}; @@ -30,6 +32,11 @@ pub(crate) mod message_log; pub mod ext; use ext::*; +const MESSAGE_TAPE_KEY: &[u8] = b"tendermint-machine-message_tape"; +fn message_tape_key(genesis: [u8; 32]) -> Vec { + [MESSAGE_TAPE_KEY, &genesis].concat() +} + pub fn commit_msg(end_time: u64, id: &[u8]) -> Vec { [&end_time.to_le_bytes(), id].concat() } @@ -103,9 +110,23 @@ impl SignedMessage { } } +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] +pub enum SlashReason { + FailToPropose, + InvalidBlock, + InvalidProposer, +} + +#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] +pub enum Evidence { + ConflictingMessages(Vec, Vec), + InvalidPrecommit(Vec), + InvalidValidRound(Vec), +} + #[derive(Clone, PartialEq, Eq, Debug)] -pub enum TendermintError { - Malicious(N::ValidatorId, Option), +pub enum TendermintError { + Malicious, Temporal, AlreadyHandled, InvalidEvidence, @@ -126,20 +147,6 @@ pub type SignedMessageFor = SignedMessage< <::SignatureScheme as SignatureScheme>::Signature, >; -#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] -pub enum SlashReason { - FailToPropose, - InvalidBlock, - InvalidMessage, -} - -#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] -pub enum Evidence { - ConflictingMessages(Vec, Vec), - InvalidPrecommit(Vec), - InvalidValidRound(Vec), -} - pub fn decode_signed_message(mut data: &[u8]) -> Option> { SignedMessageFor::::decode(&mut data).ok() } @@ -147,7 +154,7 @@ pub fn decode_signed_message(mut data: &[u8]) -> Option( data: &[u8], schema: &N::SignatureScheme, -) -> Result, TendermintError> { +) -> Result, TendermintError> { let msg = decode_signed_message::(data).ok_or(TendermintError::InvalidEvidence)?; // verify that evidence messages are signed correctly @@ -162,7 +169,7 @@ pub fn verify_tendermint_evience( evidence: &Evidence, schema: &N::SignatureScheme, commit: impl Fn(u64) -> Option>, -) -> Result<(), TendermintError> { +) -> Result<(), TendermintError> { match evidence { Evidence::ConflictingMessages(first, second) => { let first = decode_and_verify_signed_message::(first, schema)?.msg; @@ -186,15 +193,16 @@ pub fn verify_tendermint_evience( }; // TODO: We need to be passed in the genesis time to handle this edge case if msg.block.0 == 0 { - todo!("invalid precommit signature on first block") + Err(TendermintError::InvalidEvidence)? + // todo!("invalid precommit signature on first block") } // get the last commit let prior_commit = match commit(msg.block.0 - 1) { Some(c) => c, - // If we have yet to sync the block in question, we will return InvalidContent based + // If we have yet to sync the block in question, we will return InvalidEvidence based // on our own temporal ambiguity - // This will also cause an InvalidContent for anything using a non-existent block, + // This will also cause an InvalidEvidence for anything using a non-existent block, // yet that's valid behavior // TODO: Double check the ramifications of this _ => Err(TendermintError::InvalidEvidence)?, @@ -229,6 +237,16 @@ pub enum SlashEvent { WithEvidence(Evidence), } +// Struct for if various upon handlers have been triggered to ensure they don't trigger multiple +// times. +#[derive(Clone, PartialEq, Eq, Debug)] +struct Upons { + upon_prevotes: bool, + upon_successful_current_round_prevotes: bool, + upon_negative_current_round_prevotes: bool, + upon_precommits: bool, +} + /// A machine executing the Tendermint protocol. pub struct TendermintMachine { db: N::Db, @@ -245,6 +263,7 @@ pub struct TendermintMachine { synced_block_result_send: mpsc::UnboundedSender, block: BlockData, + upons: Upons, } pub struct SyncedBlock { @@ -325,6 +344,13 @@ impl TendermintMachine { ); sleep(time_until_round_end).await; + // Clear the message tape + { + let mut txn = self.db.txn(); + txn.del(&message_tape_key(self.genesis)); + txn.commit(); + } + // Clear our outbound message queue self.queue = VecDeque::new(); @@ -338,6 +364,14 @@ impl TendermintMachine { proposal, ); + // Clear upons + self.upons = Upons { + upon_prevotes: false, + upon_successful_current_round_prevotes: false, + upon_negative_current_round_prevotes: false, + upon_precommits: false, + }; + // Start the first round self.round(RoundNumber(0), Some(round_end)); } @@ -375,6 +409,419 @@ impl TendermintMachine { } } + fn proposal_for_round(&self, round: RoundNumber) -> Option<(Option, &N::Block)> { + let proposer = self.weights.proposer(self.block.number, round); + if let Some(proposal_signed) = self.block.log.get(round, proposer, Step::Propose) { + if let Data::Proposal(vr, block) = &proposal_signed.msg.data { + Some((*vr, block)) + } else { + panic!("message for Step::Propose didn't have Data::Proposal"); + } + } else { + None? + } + } + + // L22-27 + fn upon_proposal_without_valid_round(&mut self) { + if self.block.round().step != Step::Propose { + return; + } + + // If we have the proposal message... + let Some((None, block)) = self.proposal_for_round(self.block.round().number) else { + return; + }; + + // There either needs to not be a locked value or it must be equivalent + #[allow(clippy::map_unwrap_or)] + if self + .block + .locked + .as_ref() + .map(|(_round, locked_block)| block.id() == *locked_block) + .unwrap_or(true) + { + self.broadcast(Data::Prevote(Some(block.id()))); + } else { + self.broadcast(Data::Prevote(None)); + } + } + + // L28-33 + fn upon_proposal_with_valid_round(&mut self) { + if self.block.round().step != Step::Propose { + return; + } + + // If we have the proposal message... + let Some((Some(proposal_valid_round), block)) = + self.proposal_for_round(self.block.round().number) + else { + return; + }; + + // Check we have the necessary prevotes + if !self.block.log.has_consensus(proposal_valid_round, &Data::Prevote(Some(block.id()))) { + return; + } + + // We don't check valid round < current round as the `message` function does + + // If locked is None, lockedRoundp is -1 and less than valid round + #[allow(clippy::map_unwrap_or)] + let locked_clause_1 = self + .block + .locked + .as_ref() + .map(|(locked_round, _block)| locked_round.0 <= proposal_valid_round.0) + .unwrap_or(true); + // The second clause is if the locked values are equivalent. If no value is locked, they aren't + #[allow(clippy::map_unwrap_or)] + let locked_clause_2 = self + .block + .locked + .as_ref() + .map(|(_round, locked_block)| block.id() == *locked_block) + .unwrap_or(false); + + if locked_clause_1 || locked_clause_2 { + self.broadcast(Data::Prevote(Some(block.id()))); + } else { + self.broadcast(Data::Prevote(None)); + } + } + + // L34-35 + fn upon_prevotes(&mut self) { + if self.upons.upon_prevotes || (self.block.round().step != Step::Prevote) { + return; + } + + if self.block.log.has_participation(self.block.round().number, Step::Prevote) { + self.block.round_mut().set_timeout(Step::Prevote); + self.upons.upon_prevotes = true; + } + } + + // L36-43 + async fn upon_successful_current_round_prevotes(&mut self) { + // Returning if `self.step == Step::Propose` is equivalent to guarding `step >= prevote` + if self.upons.upon_successful_current_round_prevotes || + (self.block.round().step == Step::Propose) + { + return; + } + + // If we have the proposal message... + let Some((_, block)) = self.proposal_for_round(self.block.round().number) else { + return; + }; + + // Check we have the necessary prevotes + if !self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { + return; + } + + let block = block.clone(); + self.upons.upon_successful_current_round_prevotes = true; + + if self.block.round().step == Step::Prevote { + self.block.locked = Some((self.block.round().number, block.id())); + let signature = self + .signer + .sign(&commit_msg( + self.block.end_time[&self.block.round().number].canonical(), + block.id().as_ref(), + )) + .await; + self.broadcast(Data::Precommit(Some((block.id(), signature)))); + } + self.block.valid = Some((self.block.round().number, block)); + } + + // L44-46 + fn upon_negative_current_round_prevotes(&mut self) { + if self.upons.upon_negative_current_round_prevotes || (self.block.round().step != Step::Prevote) + { + return; + } + + if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(None)) { + self.broadcast(Data::Precommit(None)); + } + + self.upons.upon_negative_current_round_prevotes = true; + } + + // L47-48 + fn upon_precommits(&mut self) { + if self.upons.upon_precommits { + return; + } + + if self.block.log.has_participation(self.block.round().number, Step::Precommit) { + self.block.round_mut().set_timeout(Step::Precommit); + self.upons.upon_precommits = true; + } + } + + // L22-48 + async fn all_current_round_upons(&mut self) { + self.upon_proposal_without_valid_round(); + self.upon_proposal_with_valid_round(); + self.upon_prevotes(); + self.upon_successful_current_round_prevotes().await; + self.upon_negative_current_round_prevotes(); + self.upon_precommits(); + } + + // L49-54 + async fn upon_successful_precommits(&mut self, round: RoundNumber) -> bool { + // If we have the proposal message... + let Some((_, block)) = self.proposal_for_round(round) else { return false }; + + // Check we have the necessary precommits + // The precommit we check we have consensus upon uses a junk signature since message equality + // disregards the signature + if !self + .block + .log + .has_consensus(round, &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await)))) + { + return false; + } + + // Get all participants in this commit + let mut validators = vec![]; + let mut sigs = vec![]; + // Get all precommits for this round + for (validator, msgs) in &self.block.log.log[&round] { + if let Some(signed) = msgs.get(&Step::Precommit) { + if let Data::Precommit(Some((id, sig))) = &signed.msg.data { + // If this precommit was for this block, include it + if *id == block.id() { + validators.push(*validator); + sigs.push(sig.clone()); + } + } + } + } + + // Form the commit itself + let commit_msg = commit_msg(self.block.end_time[&round].canonical(), block.id().as_ref()); + let commit = Commit { + end_time: self.block.end_time[&round].canonical(), + validators: validators.clone(), + signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), + }; + debug_assert!(self.network.verify_commit(block.id(), &commit)); + + // Add the block and reset the machine + log::info!( + target: "tendermint", + "TendermintMachine produced block {}", + hex::encode(block.id().as_ref()), + ); + let id = block.id(); + let proposal = self.network.add_block(block.clone(), commit).await; + log::trace!( + target: "tendermint", + "added block {} (produced by machine)", + hex::encode(id.as_ref()), + ); + self.reset(round, proposal).await; + + true + } + + // L49-54 + async fn all_any_round_upons(&mut self, round: RoundNumber) -> bool { + self.upon_successful_precommits(round).await + } + + // Returns Ok(true) if this was a Precommit which had either no signature or its signature + // validated + // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet + // Returns Err if the signature was invalid + async fn verify_precommit_signature( + &mut self, + signed: &SignedMessageFor, + ) -> Result { + let msg = &signed.msg; + if let Data::Precommit(precommit) = &msg.data { + let Some((id, sig)) = precommit else { return Ok(true) }; + // Also verify the end_time of the commit + // Only perform this verification if we already have the end_time + // Else, there's a DoS where we receive a precommit for some round infinitely in the future + // which forces us to calculate every end time + if let Some(end_time) = self.block.end_time.get(&msg.round) { + if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) + { + log::warn!(target: "tendermint", "validator produced an invalid commit signature"); + self + .slash( + msg.sender, + SlashEvent::WithEvidence(Evidence::InvalidPrecommit(signed.encode())), + ) + .await; + Err(TendermintError::Malicious)?; + } + return Ok(true); + } + } + Ok(false) + } + + async fn message(&mut self, signed: &SignedMessageFor) -> Result<(), TendermintError> { + let msg = &signed.msg; + if msg.block != self.block.number { + Err(TendermintError::Temporal)?; + } + + if (msg.block == self.block.number) && + (msg.round == self.block.round().number) && + (msg.data.step() == Step::Propose) + { + log::trace!( + target: "tendermint", + "received Propose for block {}, round {}", + msg.block.0, + msg.round.0, + ); + } + + // If this is a precommit, verify its signature + self.verify_precommit_signature(signed).await?; + + // Only let the proposer propose + if matches!(msg.data, Data::Proposal(..)) && + (msg.sender != self.weights.proposer(msg.block, msg.round)) + { + log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed"); + // TODO: This should have evidence + self + .slash(msg.sender, SlashEvent::Id(SlashReason::InvalidProposer, msg.block.0, msg.round.0)) + .await; + Err(TendermintError::Malicious)?; + }; + + // If this is a proposal, verify the block + // If the block is invalid, drop the message, letting the timeout cover it + // This prevents needing to check if valid inside every `upon` block + if let Data::Proposal(_, block) = &msg.data { + match self.network.validate(block).await { + Ok(()) => {} + Err(BlockError::Temporal) => return Err(TendermintError::Temporal), + Err(BlockError::Fatal) => { + log::warn!(target: "tendermint", "validator proposed a fatally invalid block"); + self + .slash( + msg.sender, + SlashEvent::Id(SlashReason::InvalidBlock, self.block.number.0, msg.round.0), + ) + .await; + Err(TendermintError::Malicious)?; + } + }; + } + + // If this is a proposal, verify the valid round isn't fundamentally invalid + if let Data::Proposal(Some(valid_round), _) = msg.data { + if valid_round.0 >= msg.round.0 { + log::warn!( + target: "tendermint", + "proposed proposed with a syntactically invalid valid round", + ); + self + .slash(msg.sender, SlashEvent::WithEvidence(Evidence::InvalidValidRound(msg.encode()))) + .await; + Err(TendermintError::Malicious)?; + } + } + + // Add it to the log, returning if it was already handled + match self.block.log.log(signed.clone()) { + Ok(true) => {} + Ok(false) => Err(TendermintError::AlreadyHandled)?, + Err(evidence) => { + self.slash(msg.sender, SlashEvent::WithEvidence(evidence)).await; + Err(TendermintError::Malicious)?; + } + } + log::debug!( + target: "tendermint", + "received new tendermint message (block: {}, round: {}, step: {:?})", + msg.block.0, + msg.round.0, + msg.data.step(), + ); + + // Run all `upons` run for any round + // If it returned true, we added a new block, so return + if self.all_any_round_upons(msg.round).await { + return Ok(()); + } + + // Check if we need to jump ahead + #[allow(clippy::comparison_chain)] + if msg.round.0 < self.block.round().number.0 { + // Prior round, disregard if not finalizing + return Ok(()); + } else if msg.round.0 > self.block.round().number.0 { + // 55-56 + // Jump, enabling processing by the below code + if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { + log::debug!( + target: "tendermint", + "jumping from round {} to round {}", + self.block.round().number.0, + msg.round.0, + ); + + // Jump to the new round. + self.round(msg.round, None); + + // If this round already has precommit messages, verify their signatures + let round_msgs = self.block.log.log[&msg.round].clone(); + for (validator, msgs) in &round_msgs { + if let Some(existing) = msgs.get(&Step::Precommit) { + if let Ok(res) = self.verify_precommit_signature(existing).await { + // Ensure this actually verified the signature instead of believing it shouldn't yet + assert!(res); + } else { + // Remove the message so it isn't counted towards forming a commit/included in one + // This won't remove the fact they precommitted for this block hash in the MessageLog + // TODO: Don't even log these in the first place until we jump, preventing needing + // to do this in the first place + self + .block + .log + .log + .get_mut(&msg.round) + .unwrap() + .get_mut(validator) + .unwrap() + .remove(&Step::Precommit) + .unwrap(); + } + } + } + } else { + // Future round which we aren't ready to jump to, so return for now + return Ok(()); + } + } + + // msg.round is now guaranteed to be equal to self.block.round().number + debug_assert_eq!(msg.round, self.block.round().number); + + // Run all `upons` run for the current round + self.all_current_round_upons().await; + + Ok(()) + } + /// Create a new Tendermint machine, from the specified point, with the specified block as the /// one to propose next. This will return a channel to send messages from the gossip layer and /// the machine itself. The machine should have `run` called from an asynchronous task. @@ -419,7 +866,7 @@ impl TendermintMachine { let validators = network.signature_scheme(); let weights = Arc::new(network.weights()); let validator_id = signer.validator_id().await; - // 01-10 + // L01-10 let mut machine = TendermintMachine { db: db.clone(), genesis, @@ -442,6 +889,13 @@ impl TendermintMachine { validator_id, Some(proposal), ), + + upons: Upons { + upon_prevotes: false, + upon_successful_current_round_prevotes: false, + upon_negative_current_round_prevotes: false, + upon_precommits: false, + }, }; // The end time of the last block is the start time for this one @@ -460,16 +914,16 @@ impl TendermintMachine { pub async fn run(mut self) { log::debug!(target: "tendermint", "running TendermintMachine"); + let mut rebroadcast_future = Box::pin(sleep(Duration::from_secs(60))).fuse(); loop { // Also create a future for if the queue has a message // Does not pop_front as if another message has higher priority, its future will be handled // instead in this loop, and the popped value would be dropped with the next iteration - // While no other message has a higher priority right now, this is a safer practice let mut queue_future = if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; if let Some((our_message, msg, mut sig)) = futures_util::select_biased! { - // Handle a new block occurring externally (an external sync loop) + // Handle a new block occurring externally (from an external sync loop) // Has the highest priority as it makes all other futures here irrelevant msg = self.synced_block_recv.next() => { if let Some(SyncedBlock { number, block, commit }) = msg { @@ -503,16 +957,19 @@ impl TendermintMachine { Some((true, self.queue.pop_front().unwrap(), None)) }, + // L57-67 // Handle any timeouts step = self.block.round().timeout_future().fuse() => { // Remove the timeout so it doesn't persist, always being the selected future due to bias // While this does enable the timeout to be entered again, the timeout setting code will // never attempt to add a timeout after its timeout has expired + // (due to it setting an `upon` boolean) self.block.round_mut().timeouts.remove(&step); - // Only run if it's still the step in question - if self.block.round().step == step { - match step { - Step::Propose => { + + match step { + Step::Propose => { + // Only run if it's still the step in question + if self.block.round().step == step { // Slash the validator for not proposing when they should've log::debug!(target: "tendermint", "Validator didn't propose when they should have"); // this slash will be voted on. @@ -525,14 +982,42 @@ impl TendermintMachine { ), ).await; self.broadcast(Data::Prevote(None)); - }, - Step::Prevote => self.broadcast(Data::Precommit(None)), - Step::Precommit => { - self.round(RoundNumber(self.block.round().number.0 + 1), None); - continue; } + }, + Step::Prevote => { + // Only run if it's still the step in question + if self.block.round().step == step { + self.broadcast(Data::Precommit(None)) + } + }, + Step::Precommit => { + self.round(RoundNumber(self.block.round().number.0 + 1), None); } + }; + + // Execute the upons now that the state has changed + self.all_any_round_upons(self.block.round().number).await; + self.all_current_round_upons().await; + + None + }, + + // If it's been more than 60s, rebroadcast our own messages + () = rebroadcast_future => { + let key = message_tape_key(self.genesis); + let messages = self.db.get(key).unwrap_or(vec![]); + let mut messages = messages.as_slice(); + + while !messages.is_empty() { + self.network.broadcast( + SignedMessageFor::::decode(&mut IoReader(&mut messages)) + .expect("saved invalid message to DB") + ).await; } + + // Reset the rebroadcast future + rebroadcast_future = Box::pin(sleep(core::time::Duration::from_secs(60))).fuse(); + None }, @@ -554,429 +1039,31 @@ impl TendermintMachine { } let sig = sig.unwrap(); - // TODO: message may internally call broadcast. We should check within broadcast it's not - // broadcasting our own message at this time. let signed_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; let res = self.message(&signed_msg).await; + // If this is our message, and we hit an invariant, we could be slashed. + // We only broadcast our message after running it ourselves, to ensure it doesn't error, to + // ensure we don't get slashed on invariants. if res.is_err() && our_message { panic!("honest node (ourselves) had invalid behavior"); } - // Only now should we allow broadcasts since we're sure an invariant wasn't reached causing - // us to have invalid messages. + // Save this message to a linear tape of all our messages for this block, if ours + // TODO: Since we do this after we mark this message as sent to prevent equivocations, a + // precisely time reboot could cause this message marked as sent yet not added to the tape + if our_message { + let message_tape_key = message_tape_key(self.genesis); + let mut txn = self.db.txn(); + let mut message_tape = txn.get(&message_tape_key).unwrap_or(vec![]); + message_tape.extend(signed_msg.encode()); + txn.put(&message_tape_key, message_tape); + } + + // Re-broadcast this since it's an original consensus message worth handling if res.is_ok() { - // Re-broadcast this since it's an original consensus message self.network.broadcast(signed_msg).await; } - - match res { - Ok(None) => {} - Ok(Some(block)) => { - let mut validators = vec![]; - let mut sigs = vec![]; - // Get all precommits for this round - for (validator, msgs) in &self.block.log.log[&msg.round] { - if let Some(signed) = msgs.get(&Step::Precommit) { - if let Data::Precommit(Some((id, sig))) = &signed.msg.data { - // If this precommit was for this block, include it - if *id == block.id() { - validators.push(*validator); - sigs.push(sig.clone()); - } - } - } - } - - let commit_msg = - commit_msg(self.block.end_time[&msg.round].canonical(), block.id().as_ref()); - let commit = Commit { - end_time: self.block.end_time[&msg.round].canonical(), - validators: validators.clone(), - signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), - }; - debug_assert!(self.network.verify_commit(block.id(), &commit)); - - log::info!( - target: "tendermint", - "TendermintMachine produced block {}", - hex::encode(block.id().as_ref()), - ); - let id = block.id(); - let proposal = self.network.add_block(block, commit).await; - log::trace!( - target: "tendermint", - "added block {} (produced by machine)", - hex::encode(id.as_ref()), - ); - self.reset(msg.round, proposal).await; - } - Err(TendermintError::Malicious(sender, evidence)) => { - let current_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; - - let slash = if let Some(ev) = evidence { - // if the malicious message contains a block, only vote to slash - // TODO: Should this decision be made at a higher level? - // A higher-level system may be able to verify if the contained block is fatally - // invalid - // A higher-level system may accept the bandwidth size of this, even if the issue is - // just the valid round field - if let Data::Proposal(_, _) = ¤t_msg.msg.data { - SlashEvent::Id( - SlashReason::InvalidBlock, - self.block.number.0, - self.block.round().number.0, - ) - } else { - // slash with evidence otherwise - SlashEvent::WithEvidence(ev) - } - } else { - // we don't have evidence. Slash with vote. - SlashEvent::Id( - SlashReason::InvalidMessage, - self.block.number.0, - self.block.round().number.0, - ) - }; - - // Each message that we're voting to slash over needs to be re-broadcasted so other - // validators also trigger their own votes - // TODO: should this be inside slash function? - if let SlashEvent::Id(_, _, _) = slash { - self.network.broadcast(current_msg).await; - } - - self.slash(sender, slash).await - } - Err( - TendermintError::Temporal | - TendermintError::AlreadyHandled | - TendermintError::InvalidEvidence, - ) => (), - } } } } - - // Returns Ok(true) if this was a Precommit which had either no signature or its signature - // validated - // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet - // Returns Err if the signature was invalid - fn verify_precommit_signature( - &self, - signed: &SignedMessageFor, - ) -> Result> { - let msg = &signed.msg; - if let Data::Precommit(precommit) = &msg.data { - let Some((id, sig)) = precommit else { return Ok(true) }; - // Also verify the end_time of the commit - // Only perform this verification if we already have the end_time - // Else, there's a DoS where we receive a precommit for some round infinitely in the future - // which forces us to calculate every end time - if let Some(end_time) = self.block.end_time.get(&msg.round) { - if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) - { - log::warn!(target: "tendermint", "Validator produced an invalid commit signature"); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::InvalidPrecommit(signed.encode())), - ))?; - } - return Ok(true); - } - } - Ok(false) - } - - async fn message( - &mut self, - signed: &SignedMessageFor, - ) -> Result, TendermintError> { - let msg = &signed.msg; - if msg.block != self.block.number { - Err(TendermintError::Temporal)?; - } - - if (msg.block == self.block.number) && - (msg.round == self.block.round().number) && - (msg.data.step() == Step::Propose) - { - log::trace!( - target: "tendermint", - "received Propose for block {}, round {}", - msg.block.0, - msg.round.0, - ); - } - - // If this is a precommit, verify its signature - self.verify_precommit_signature(signed)?; - - // Only let the proposer propose - if matches!(msg.data, Data::Proposal(..)) && - (msg.sender != self.weights.proposer(msg.block, msg.round)) - { - log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed"); - // TODO: This should have evidence - Err(TendermintError::Malicious(msg.sender, None))?; - }; - - if !self.block.log.log(signed.clone())? { - return Err(TendermintError::AlreadyHandled); - } - log::debug!( - target: "tendermint", - "received new tendermint message (block: {}, round: {}, step: {:?})", - msg.block.0, - msg.round.0, - msg.data.step(), - ); - - // All functions, except for the finalizer and the jump, are locked to the current round - - // Run the finalizer to see if it applies - // 49-52 - if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) { - let proposer = self.weights.proposer(self.block.number, msg.round); - - // Get the proposal - if let Some(proposal_signed) = self.block.log.get(msg.round, proposer, Step::Propose) { - if let Data::Proposal(_, block) = &proposal_signed.msg.data { - // Check if it has gotten a sufficient amount of precommits - // Uses a junk signature since message equality disregards the signature - if self.block.log.has_consensus( - msg.round, - &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))), - ) { - // If msg.round is in the future, these Precommits won't have their inner signatures - // verified - // It should be impossible for msg.round to be in the future however, as this requires - // 67% of validators to Precommit, and we jump on 34% participating in the new round - // The one exception would be if a validator had 34%, and could cause participation to - // go from 33% (not enough to jump) to 67%, without executing the below code - // This also would require the local machine to be outside of allowed time tolerances, - // or the validator with 34% to not be publishing Prevotes (as those would cause a - // a jump) - // Both are invariants - // TODO: Replace this panic with an inner signature check - assert!(msg.round.0 <= self.block.round().number.0); - - log::debug!(target: "tendermint", "block {} has consensus", msg.block.0); - return Ok(Some(block.clone())); - } - } - } - } - - // Else, check if we need to jump ahead - #[allow(clippy::comparison_chain)] - if msg.round.0 < self.block.round().number.0 { - // Prior round, disregard if not finalizing - return Ok(None); - } else if msg.round.0 > self.block.round().number.0 { - // 55-56 - // Jump, enabling processing by the below code - if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { - log::debug!( - target: "tendermint", - "jumping from round {} to round {}", - self.block.round().number.0, - msg.round.0, - ); - - // Jump to the new round. - let proposer = self.round(msg.round, None); - - // If this round already has precommit messages, verify their signatures - let round_msgs = self.block.log.log[&msg.round].clone(); - for (validator, msgs) in &round_msgs { - if let Some(existing) = msgs.get(&Step::Precommit) { - if let Ok(res) = self.verify_precommit_signature(existing) { - // Ensure this actually verified the signature instead of believing it shouldn't yet - assert!(res); - } else { - // Remove the message so it isn't counted towards forming a commit/included in one - // This won't remove the fact they precommitted for this block hash in the MessageLog - // TODO: Don't even log these in the first place until we jump, preventing needing - // to do this in the first place - let msg = self - .block - .log - .log - .get_mut(&msg.round) - .unwrap() - .get_mut(validator) - .unwrap() - .remove(&Step::Precommit) - .unwrap(); - - // Slash the validator for publishing an invalid commit signature - self - .slash( - *validator, - SlashEvent::WithEvidence(Evidence::InvalidPrecommit(msg.encode())), - ) - .await; - } - } - } - - // If we're the proposer, return now we don't waste time on the current round - // (as it doesn't have a proposal, since we didn't propose, and cannot complete) - if proposer { - return Ok(None); - } - } else { - // Future round which we aren't ready to jump to, so return for now - return Ok(None); - } - } - - // msg.round is now guaranteed to be equal to self.block.round().number - debug_assert_eq!(msg.round, self.block.round().number); - - // The paper executes these checks when the step is prevote. Making sure this message warrants - // rerunning these checks is a sane optimization since message instances is a full iteration - // of the round map - if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { - let (participation, weight) = - self.block.log.message_instances(self.block.round().number, &Data::Prevote(None)); - let threshold_weight = self.weights.threshold(); - if participation < threshold_weight { - log::trace!( - target: "tendermint", - "progess towards setting prevote timeout, participation: {}, needed: {}", - participation, - threshold_weight, - ); - } - // 34-35 - if participation >= threshold_weight { - log::trace!( - target: "tendermint", - "setting timeout for prevote due to sufficient participation", - ); - self.block.round_mut().set_timeout(Step::Prevote); - } - - // 44-46 - if weight >= threshold_weight { - self.broadcast(Data::Precommit(None)); - return Ok(None); - } - } - - // 47-48 - if matches!(msg.data, Data::Precommit(_)) && - self.block.log.has_participation(self.block.round().number, Step::Precommit) - { - log::trace!( - target: "tendermint", - "setting timeout for precommit due to sufficient participation", - ); - self.block.round_mut().set_timeout(Step::Precommit); - } - - // All further operations require actually having the proposal in question - let proposer = self.weights.proposer(self.block.number, self.block.round().number); - let (vr, block) = if let Some(proposal_signed) = - self.block.log.get(self.block.round().number, proposer, Step::Propose) - { - if let Data::Proposal(vr, block) = &proposal_signed.msg.data { - (vr, block) - } else { - panic!("message for Step::Propose didn't have Data::Proposal"); - } - } else { - return Ok(None); - }; - - // 22-33 - if self.block.round().step == Step::Propose { - // Delay error handling (triggering a slash) until after we vote. - let (valid, err) = match self.network.validate(block).await { - Ok(()) => (true, Ok(None)), - Err(BlockError::Temporal) => (false, Ok(None)), - Err(BlockError::Fatal) => (false, { - log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); - // TODO: Produce evidence of this for the higher level code to decide what to do with - Err(TendermintError::Malicious(proposer, None)) - }), - }; - // Create a raw vote which only requires block validity as a basis for the actual vote. - let raw_vote = Some(block.id()).filter(|_| valid); - - // If locked is none, it has a round of -1 according to the protocol. That satisfies - // 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some - // with different IDs, the function on 22 rejects yet the function on 28 has one other - // condition - let locked = self.block.locked.as_ref().map_or(true, |(_, id)| id == &block.id()); - let mut vote = raw_vote.filter(|_| locked); - - if let Some(vr) = vr { - // Malformed message - if vr.0 >= self.block.round().number.0 { - log::warn!(target: "tendermint", "Validator claimed a round from the future was valid"); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::InvalidValidRound(signed.encode())), - ))?; - } - - if self.block.log.has_consensus(*vr, &Data::Prevote(Some(block.id()))) { - // Allow differing locked values if the proposal has a newer valid round - // This is the other condition described above - if let Some((locked_round, _)) = self.block.locked.as_ref() { - vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0)); - } - - self.broadcast(Data::Prevote(vote)); - return err; - } - } else { - self.broadcast(Data::Prevote(vote)); - return err; - } - - return Ok(None); - } - - if self.block.valid.as_ref().map_or(true, |(round, _)| round != &self.block.round().number) { - // 36-43 - - // The run once condition is implemented above. Since valid will always be set by this, it - // not being set, or only being set historically, means this has yet to be run - - if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { - match self.network.validate(block).await { - // BlockError::Temporal is due to a temporal error we have, yet a supermajority of the - // network does not, Because we do not believe this block to be fatally invalid, and - // because a supermajority deems it valid, accept it. - Ok(()) | Err(BlockError::Temporal) => (), - Err(BlockError::Fatal) => { - log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); - // TODO: Produce evidence of this for the higher level code to decide what to do with - Err(TendermintError::Malicious(proposer, None))? - } - }; - - self.block.valid = Some((self.block.round().number, block.clone())); - if self.block.round().step == Step::Prevote { - self.block.locked = Some((self.block.round().number, block.id())); - self.broadcast(Data::Precommit(Some(( - block.id(), - self - .signer - .sign(&commit_msg( - self.block.end_time[&self.block.round().number].canonical(), - block.id().as_ref(), - )) - .await, - )))); - } - } - } - - Ok(None) - } } diff --git a/coordinator/tributary/tendermint/src/message_log.rs b/coordinator/tributary/tendermint/src/message_log.rs index 3959852d..e65568ca 100644 --- a/coordinator/tributary/tendermint/src/message_log.rs +++ b/coordinator/tributary/tendermint/src/message_log.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, collections::HashMap}; use parity_scale_codec::Encode; -use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence}; +use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence}; type RoundLog = HashMap<::ValidatorId, HashMap>>; pub(crate) struct MessageLog { @@ -16,7 +16,7 @@ impl MessageLog { } // Returns true if it's a new message - pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result> { + pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result { let msg = &signed.msg; // Clarity, and safety around default != new edge cases let round = self.log.entry(msg.round).or_insert_with(HashMap::new); @@ -30,10 +30,7 @@ impl MessageLog { target: "tendermint", "Validator sent multiple messages for the same block + round + step" ); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())), - ))?; + Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?; } return Ok(false); } @@ -47,7 +44,8 @@ impl MessageLog { pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor) -> (u64, u64) { let mut participating = 0; let mut weight = 0; - for (participant, msgs) in &self.log[&round] { + let Some(log) = self.log.get(&round) else { return (0, 0) }; + for (participant, msgs) in log { if let Some(msg) = msgs.get(&data.step()) { let validator_weight = self.weights.weight(*participant); participating += validator_weight; @@ -73,7 +71,8 @@ impl MessageLog { // Check if a supermajority of nodes have participated on a specific step pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool { let mut participating = 0; - for (participant, msgs) in &self.log[&round] { + let Some(log) = self.log.get(&round) else { return false }; + for (participant, msgs) in log { if msgs.get(&step).is_some() { participating += self.weights.weight(*participant); } diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index 3b3cf7c3..bec95ddc 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -145,7 +145,7 @@ impl Network for TestNetwork { println!("Slash for {id} due to {event:?}"); } - async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { + async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> { block.valid }