diff --git a/common/db/src/parity_db.rs b/common/db/src/parity_db.rs index 8c913468..9ae345f6 100644 --- a/common/db/src/parity_db.rs +++ b/common/db/src/parity_db.rs @@ -4,6 +4,7 @@ pub use ::parity_db::{Options, Db as ParityDb}; use crate::*; +#[must_use] pub struct Transaction<'a>(&'a Arc, Vec<(u8, Vec, Option>)>); impl Get for Transaction<'_> { diff --git a/common/db/src/rocks.rs b/common/db/src/rocks.rs index 6a724563..1d42d902 100644 --- a/common/db/src/rocks.rs +++ b/common/db/src/rocks.rs @@ -7,6 +7,7 @@ use rocksdb::{ use crate::*; +#[must_use] pub struct Transaction<'a, T: ThreadMode>( RocksTransaction<'a, OptimisticTransactionDB>, &'a OptimisticTransactionDB, diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index ef876f9a..06b87eb9 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -9,7 +9,7 @@ use std::{ use async_trait::async_trait; use rand_core::{RngCore, OsRng}; -use scale::Encode; +use scale::{Decode, Encode}; use borsh::{BorshSerialize, BorshDeserialize}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai}; @@ -29,7 +29,7 @@ use libp2p::{ noise, yamux, request_response::{ Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig, - Behaviour as RrBehavior, + Behaviour as RrBehavior, ProtocolSupport, }, gossipsub::{ IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, @@ -45,9 +45,20 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p}; use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; // Block size limit + 1 KB of space for signatures/metadata -const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; +const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; + +const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize = + (tributary::BLOCK_SIZE_LIMIT * BLOCKS_PER_BATCH) + 1024; + const LIBP2P_TOPIC: &str = "serai-coordinator"; +// Amount of blocks in a minute +// We can't use tendermint::TARGET_BLOCK_TIME here to calculate this since that is a u32. +const BLOCKS_PER_MINUTE: usize = 10; + +// Maximum amount of blocks to send in a batch +const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1; + #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] pub struct CosignedBlock { pub network: NetworkId, @@ -173,6 +184,18 @@ pub struct Message { pub msg: Vec, } +#[derive(Clone, Debug, Encode, Decode)] +pub struct BlockCommit { + pub block: Vec, + pub commit: Vec, +} + +#[derive(Clone, Debug, Encode, Decode)] +pub struct HeartbeatBatch { + pub blocks: Vec, + pub timestamp: u64, +} + #[async_trait] pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { type Id: Send + Sync + Clone + Copy + fmt::Debug; @@ -228,8 +251,8 @@ impl RrCodecTrait for RrCodec { let mut len = [0; 4]; io.read_exact(&mut len).await?; let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?"); - if len > MAX_LIBP2P_MESSAGE_SIZE { - Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?; + if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE { + Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?; } // This may be a non-trivial allocation easily causable // While we could chunk the read, meaning we only perform the allocation as bandwidth is used, @@ -297,7 +320,7 @@ impl LibP2p { let throwaway_key_pair = Keypair::generate_ed25519(); let behavior = Behavior { - reqres: { RrBehavior::new([], RrConfig::default()) }, + reqres: { RrBehavior::new([("/coordinator", ProtocolSupport::Full)], RrConfig::default()) }, gossipsub: { let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2; let heartbeats_per_block = @@ -308,7 +331,7 @@ impl LibP2p { .heartbeat_interval(Duration::from_millis(heartbeat_interval.into())) .history_length(heartbeats_per_block * 2) .history_gossip(heartbeats_per_block) - .max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE) + .max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE) // We send KeepAlive after 80s .idle_timeout(Duration::from_secs(85)) .validation_mode(ValidationMode::Strict) @@ -348,10 +371,11 @@ impl LibP2p { .with_tcp(TcpConfig::default().nodelay(true), noise::Config::new, || { let mut config = yamux::Config::default(); // 1 MiB default + max message size - config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE); + config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE); // 256 KiB default + max message size - config - .set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap()); + config.set_receive_window_size( + ((256 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE).try_into().unwrap(), + ); config }) .unwrap() @@ -868,7 +892,7 @@ pub async fn handle_p2p_task( let p2p = p2p.clone(); async move { loop { - let Some(mut msg) = recv.recv().await else { + let Some(msg) = recv.recv().await else { // Channel closure happens when the tributary retires break; }; @@ -913,34 +937,53 @@ pub async fn handle_p2p_task( latest = next; } if to_send.len() > 3 { - for next in to_send { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await; + // prepare the batch to sends + let mut blocks = vec![]; + for (i, next) in to_send.iter().enumerate() { + if i >= BLOCKS_PER_BATCH { + break; + } + + blocks.push(BlockCommit { + block: reader.block(next).unwrap().serialize(), + commit: reader.commit(next).unwrap(), + }); } + let batch = HeartbeatBatch { blocks, timestamp: msg_time }; + + p2p + .send(msg.sender, ReqResMessageKind::Block(genesis), batch.encode()) + .await; } }); } P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => { assert_eq!(msg_genesis, genesis); - let mut msg_ref: &[u8] = msg.msg.as_ref(); - let Ok(block) = Block::::read(&mut msg_ref) else { - log::error!("received block message with an invalidly serialized block"); + // decode the batch + let Ok(batch) = HeartbeatBatch::decode(&mut msg.msg.as_ref()) else { + log::error!( + "received HeartBeatBatch message with an invalidly serialized batch" + ); continue; }; - // Get just the commit - msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); - msg.msg.drain((msg.msg.len() - 8) ..); - let res = tributary.tributary.sync_block(block, msg.msg).await; - log::debug!( - "received block from {:?}, sync_block returned {}", - msg.sender, - res - ); + // sync blocks + for bc in batch.blocks { + // TODO: why do we use ReadWrite instead of Encode/Decode for blocks? + // Should we use the same for batches so we can read both at the same time? + let Ok(block) = Block::::read(&mut bc.block.as_slice()) else { + log::error!("received block message with an invalidly serialized block"); + continue; + }; + + let res = tributary.tributary.sync_block(block, bc.commit).await; + log::debug!( + "received block from {:?}, sync_block returned {}", + msg.sender, + res + ); + } } P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index a4c6bfe5..0ea74bfe 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt::Debug}; -use std::{sync::Arc, io, collections::VecDeque}; +use std::{sync::Arc, io}; use async_trait::async_trait; @@ -154,14 +154,6 @@ pub struct Tributary { synced_block: Arc>>>, synced_block_result: Arc>, messages: Arc>>>, - - p2p_meta_task_handle: Arc, -} - -impl Drop for Tributary { - fn drop(&mut self) { - self.p2p_meta_task_handle.abort(); - } } impl Tributary { @@ -193,28 +185,7 @@ impl Tributary { ); let blockchain = Arc::new(RwLock::new(blockchain)); - let to_rebroadcast = Arc::new(RwLock::new(VecDeque::new())); - // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the - // P2P layer - let p2p_meta_task_handle = Arc::new( - tokio::spawn({ - let to_rebroadcast = to_rebroadcast.clone(); - let p2p = p2p.clone(); - async move { - loop { - let to_rebroadcast = to_rebroadcast.read().await.clone(); - for msg in to_rebroadcast { - p2p.broadcast(genesis, msg).await; - } - tokio::time::sleep(core::time::Duration::from_secs(60)).await; - } - } - }) - .abort_handle(), - ); - - let network = - TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; + let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new( @@ -235,7 +206,6 @@ impl Tributary { synced_block: Arc::new(RwLock::new(synced_block)), synced_block_result: Arc::new(RwLock::new(synced_block_result)), messages: Arc::new(RwLock::new(messages)), - p2p_meta_task_handle, }) } diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index e38efa5d..0ce6232c 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -1,8 +1,5 @@ use core::ops::Deref; -use std::{ - sync::Arc, - collections::{VecDeque, HashMap}, -}; +use std::{sync::Arc, collections::HashMap}; use async_trait::async_trait; @@ -270,8 +267,6 @@ pub struct TendermintNetwork { pub(crate) validators: Arc, pub(crate) blockchain: Arc>>, - pub(crate) to_rebroadcast: Arc>>>, - pub(crate) p2p: P, } @@ -308,26 +303,6 @@ impl Network for TendermintNetwork async fn broadcast(&mut self, msg: SignedMessageFor) { let mut to_broadcast = vec![TENDERMINT_MESSAGE]; to_broadcast.extend(msg.encode()); - - // Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second - // until the block it's trying to build is complete - // If the P2P layer drops a message before all nodes obtained access, or a node had an - // intermittent failure, this will ensure reconcilliation - // This is atrocious if there's no content-based deduplication protocol for messages actively - // being gossiped - // LibP2p, as used by Serai, is configured to content-based deduplicate - { - let mut to_rebroadcast_lock = self.to_rebroadcast.write().await; - to_rebroadcast_lock.push_back(to_broadcast.clone()); - // We should have, ideally, 3 * validators messages within a round - // Therefore, this should keep the most recent 2-rounds - // TODO: This isn't perfect. Each participant should just rebroadcast their latest round of - // messages - while to_rebroadcast_lock.len() > (6 * self.validators.weights.len()) { - to_rebroadcast_lock.pop_front(); - } - } - self.p2p.broadcast(self.genesis, to_broadcast).await } @@ -366,7 +341,7 @@ impl Network for TendermintNetwork } } - async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> { + async fn validate(&self, block: &Self::Block) -> Result<(), TendermintBlockError> { let block = Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?; self @@ -428,9 +403,6 @@ impl Network for TendermintNetwork } } - // Since we've added a valid block, clear to_rebroadcast - *self.to_rebroadcast.write().await = VecDeque::new(); - Some(TendermintBlock( self.blockchain.write().await.build_block::(&self.signature_scheme()).serialize(), )) diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary/tendermint/src/block.rs index 8fc79018..8afe00f0 100644 --- a/coordinator/tributary/tendermint/src/block.rs +++ b/coordinator/tributary/tendermint/src/block.rs @@ -3,7 +3,6 @@ use std::{ collections::{HashSet, HashMap}, }; -use parity_scale_codec::Encode; use serai_db::{Get, DbTxn, Db}; use crate::{ @@ -20,7 +19,7 @@ pub(crate) struct BlockData { pub(crate) number: BlockNumber, pub(crate) validator_id: Option, - pub(crate) proposal: Option, + pub(crate) our_proposal: Option, pub(crate) log: MessageLog, pub(crate) slashes: HashSet, @@ -43,7 +42,7 @@ impl BlockData { weights: Arc, number: BlockNumber, validator_id: Option, - proposal: Option, + our_proposal: Option, ) -> BlockData { BlockData { db, @@ -51,7 +50,7 @@ impl BlockData { number, validator_id, - proposal, + our_proposal, log: MessageLog::new(weights), slashes: HashSet::new(), @@ -108,17 +107,17 @@ impl BlockData { self.populate_end_time(round); } - // 11-13 + // L11-13 self.round = Some(RoundData::::new( round, time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), )); self.end_time.insert(round, self.round().end_time()); - // 14-21 + // L14-21 if Some(proposer) == self.validator_id { let (round, block) = self.valid.clone().unzip(); - block.or_else(|| self.proposal.clone()).map(|block| Data::Proposal(round, block)) + block.or_else(|| self.our_proposal.clone()).map(|block| Data::Proposal(round, block)) } else { self.round_mut().set_timeout(Step::Propose); None @@ -198,8 +197,8 @@ impl BlockData { assert!(!new_round); None?; } - // Put this message to the DB - txn.put(&msg_key, res.encode()); + // Put that we're sending this message to the DB + txn.put(&msg_key, []); txn.commit(); } diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index b3d568a2..3869d9d9 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -288,7 +288,7 @@ pub trait Network: Sized + Send + Sync { async fn slash(&mut self, validator: Self::ValidatorId, slash_event: SlashEvent); /// Validate a block. - async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; + async fn validate(&self, block: &Self::Block) -> Result<(), BlockError>; /// Add a block, returning the proposal for the next one. /// diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 1d3765aa..e5193908 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -3,10 +3,10 @@ use core::fmt::Debug; use std::{ sync::Arc, time::{SystemTime, Instant, Duration}, - collections::VecDeque, + collections::{VecDeque, HashMap}, }; -use parity_scale_codec::{Encode, Decode}; +use parity_scale_codec::{Encode, Decode, IoReader}; use futures_channel::mpsc; use futures_util::{ @@ -15,6 +15,8 @@ use futures_util::{ }; use tokio::time::sleep; +use serai_db::{Get, DbTxn, Db}; + pub mod time; use time::{sys_time, CanonicalInstant}; @@ -30,6 +32,11 @@ pub(crate) mod message_log; pub mod ext; use ext::*; +const MESSAGE_TAPE_KEY: &[u8] = b"tendermint-machine-message_tape"; +fn message_tape_key(genesis: [u8; 32]) -> Vec { + [MESSAGE_TAPE_KEY, &genesis].concat() +} + pub fn commit_msg(end_time: u64, id: &[u8]) -> Vec { [&end_time.to_le_bytes(), id].concat() } @@ -62,6 +69,17 @@ impl PartialEq for Data { } } +impl core::hash::Hash for Data { + fn hash(&self, state: &mut H) { + match self { + Data::Proposal(valid_round, block) => (0, valid_round, block.id().as_ref()).hash(state), + Data::Prevote(id) => (1, id.as_ref().map(AsRef::<[u8]>::as_ref)).hash(state), + Data::Precommit(None) => (2, 0).hash(state), + Data::Precommit(Some((id, _))) => (2, 1, id.as_ref()).hash(state), + } + } +} + impl Data { pub fn step(&self) -> Step { match self { @@ -103,9 +121,23 @@ impl SignedMessage { } } +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] +pub enum SlashReason { + FailToPropose, + InvalidBlock, + InvalidProposer, +} + +#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] +pub enum Evidence { + ConflictingMessages(Vec, Vec), + InvalidPrecommit(Vec), + InvalidValidRound(Vec), +} + #[derive(Clone, PartialEq, Eq, Debug)] -pub enum TendermintError { - Malicious(N::ValidatorId, Option), +pub enum TendermintError { + Malicious, Temporal, AlreadyHandled, InvalidEvidence, @@ -126,20 +158,6 @@ pub type SignedMessageFor = SignedMessage< <::SignatureScheme as SignatureScheme>::Signature, >; -#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)] -pub enum SlashReason { - FailToPropose, - InvalidBlock, - InvalidMessage, -} - -#[derive(Clone, PartialEq, Eq, Debug, Encode, Decode)] -pub enum Evidence { - ConflictingMessages(Vec, Vec), - InvalidPrecommit(Vec), - InvalidValidRound(Vec), -} - pub fn decode_signed_message(mut data: &[u8]) -> Option> { SignedMessageFor::::decode(&mut data).ok() } @@ -147,7 +165,7 @@ pub fn decode_signed_message(mut data: &[u8]) -> Option( data: &[u8], schema: &N::SignatureScheme, -) -> Result, TendermintError> { +) -> Result, TendermintError> { let msg = decode_signed_message::(data).ok_or(TendermintError::InvalidEvidence)?; // verify that evidence messages are signed correctly @@ -162,7 +180,7 @@ pub fn verify_tendermint_evidence( evidence: &Evidence, schema: &N::SignatureScheme, commit: impl Fn(u64) -> Option>, -) -> Result<(), TendermintError> { +) -> Result<(), TendermintError> { match evidence { Evidence::ConflictingMessages(first, second) => { let first = decode_and_verify_signed_message::(first, schema)?.msg; @@ -186,15 +204,16 @@ pub fn verify_tendermint_evidence( }; // TODO: We need to be passed in the genesis time to handle this edge case if msg.block.0 == 0 { - todo!("invalid precommit signature on first block") + Err(TendermintError::InvalidEvidence)? + // todo!("invalid precommit signature on first block") } // get the last commit let prior_commit = match commit(msg.block.0 - 1) { Some(c) => c, - // If we have yet to sync the block in question, we will return InvalidContent based + // If we have yet to sync the block in question, we will return InvalidEvidence based // on our own temporal ambiguity - // This will also cause an InvalidContent for anything using a non-existent block, + // This will also cause an InvalidEvidence for anything using a non-existent block, // yet that's valid behavior // TODO: Double check the ramifications of this _ => Err(TendermintError::InvalidEvidence)?, @@ -229,6 +248,16 @@ pub enum SlashEvent { WithEvidence(Evidence), } +// Struct for if various upon handlers have been triggered to ensure they don't trigger multiple +// times. +#[derive(Clone, PartialEq, Eq, Debug)] +struct Upons { + upon_prevotes: bool, + upon_successful_current_round_prevotes: bool, + upon_negative_current_round_prevotes: bool, + upon_precommits: bool, +} + /// A machine executing the Tendermint protocol. pub struct TendermintMachine { db: N::Db, @@ -245,6 +274,10 @@ pub struct TendermintMachine { synced_block_result_send: mpsc::UnboundedSender, block: BlockData, + // TODO: Move this into the Block struct + round_proposals: HashMap, N::Block)>, + // TODO: Move this into the Round struct + upons: Upons, } pub struct SyncedBlock { @@ -287,6 +320,14 @@ impl TendermintMachine { // Start a new round. Returns true if we were the proposer fn round(&mut self, round: RoundNumber, time: Option) -> bool { + // Clear upons + self.upons = Upons { + upon_prevotes: false, + upon_successful_current_round_prevotes: false, + upon_negative_current_round_prevotes: false, + upon_precommits: false, + }; + let proposer = self.weights.proposer(self.block.number, round); let res = if let Some(data) = self.block.new_round(round, proposer, time) { self.broadcast(data); @@ -325,6 +366,13 @@ impl TendermintMachine { ); sleep(time_until_round_end).await; + // Clear the message tape + { + let mut txn = self.db.txn(); + txn.del(message_tape_key(self.genesis)); + txn.commit(); + } + // Clear our outbound message queue self.queue = VecDeque::new(); @@ -338,6 +386,9 @@ impl TendermintMachine { proposal, ); + // Reset the round proposals + self.round_proposals = HashMap::new(); + // Start the first round self.round(RoundNumber(0), Some(round_end)); } @@ -375,6 +426,410 @@ impl TendermintMachine { } } + fn proposal_for_round(&self, round: RoundNumber) -> Option<(Option, &N::Block)> { + self.round_proposals.get(&round).map(|(round, block)| (*round, block)) + } + + // L22-27 + fn upon_proposal_without_valid_round(&mut self) { + if self.block.round().step != Step::Propose { + return; + } + + // If we have the proposal message... + let Some((None, block)) = self.proposal_for_round(self.block.round().number) else { + return; + }; + + // There either needs to not be a locked value or it must be equivalent + #[allow(clippy::map_unwrap_or)] + if self + .block + .locked + .as_ref() + .map(|(_round, locked_block)| block.id() == *locked_block) + .unwrap_or(true) + { + self.broadcast(Data::Prevote(Some(block.id()))); + } else { + self.broadcast(Data::Prevote(None)); + } + } + + // L28-33 + fn upon_proposal_with_valid_round(&mut self) { + if self.block.round().step != Step::Propose { + return; + } + + // If we have the proposal message... + let Some((Some(proposal_valid_round), block)) = + self.proposal_for_round(self.block.round().number) + else { + return; + }; + + // Check we have the necessary prevotes + if !self.block.log.has_consensus(proposal_valid_round, &Data::Prevote(Some(block.id()))) { + return; + } + + // We don't check valid round < current round as the `message` function does + + // If locked is None, lockedRoundp is -1 and less than valid round + #[allow(clippy::map_unwrap_or)] + let locked_clause_1 = self + .block + .locked + .as_ref() + .map(|(locked_round, _block)| locked_round.0 <= proposal_valid_round.0) + .unwrap_or(true); + // The second clause is if the locked values are equivalent. If no value is locked, they aren't + #[allow(clippy::map_unwrap_or)] + let locked_clause_2 = self + .block + .locked + .as_ref() + .map(|(_round, locked_block)| block.id() == *locked_block) + .unwrap_or(false); + + if locked_clause_1 || locked_clause_2 { + self.broadcast(Data::Prevote(Some(block.id()))); + } else { + self.broadcast(Data::Prevote(None)); + } + } + + // L34-35 + fn upon_prevotes(&mut self) { + if self.upons.upon_prevotes || (self.block.round().step != Step::Prevote) { + return; + } + + if self.block.log.has_participation(self.block.round().number, Step::Prevote) { + self.block.round_mut().set_timeout(Step::Prevote); + self.upons.upon_prevotes = true; + } + } + + // L36-43 + async fn upon_successful_current_round_prevotes(&mut self) { + // Returning if `self.step == Step::Propose` is equivalent to guarding `step >= prevote` + if self.upons.upon_successful_current_round_prevotes || + (self.block.round().step == Step::Propose) + { + return; + } + + // If we have the proposal message... + let Some((_, block)) = self.proposal_for_round(self.block.round().number) else { + return; + }; + + // Check we have the necessary prevotes + if !self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { + return; + } + + let block = block.clone(); + self.upons.upon_successful_current_round_prevotes = true; + + if self.block.round().step == Step::Prevote { + self.block.locked = Some((self.block.round().number, block.id())); + let signature = self + .signer + .sign(&commit_msg( + self.block.end_time[&self.block.round().number].canonical(), + block.id().as_ref(), + )) + .await; + self.broadcast(Data::Precommit(Some((block.id(), signature)))); + } + self.block.valid = Some((self.block.round().number, block)); + } + + // L44-46 + fn upon_negative_current_round_prevotes(&mut self) { + if self.upons.upon_negative_current_round_prevotes || (self.block.round().step != Step::Prevote) + { + return; + } + + if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(None)) { + self.broadcast(Data::Precommit(None)); + } + + self.upons.upon_negative_current_round_prevotes = true; + } + + // L47-48 + fn upon_precommits(&mut self) { + if self.upons.upon_precommits { + return; + } + + if self.block.log.has_participation(self.block.round().number, Step::Precommit) { + self.block.round_mut().set_timeout(Step::Precommit); + self.upons.upon_precommits = true; + } + } + + // L22-48 + async fn all_current_round_upons(&mut self) { + self.upon_proposal_without_valid_round(); + self.upon_proposal_with_valid_round(); + self.upon_prevotes(); + self.upon_successful_current_round_prevotes().await; + self.upon_negative_current_round_prevotes(); + self.upon_precommits(); + } + + // L49-54 + async fn upon_successful_precommits(&mut self, round: RoundNumber) -> bool { + // If we have the proposal message... + let Some((_, block)) = self.proposal_for_round(round) else { return false }; + + // Check we have the necessary precommits + // The precommit we check we have consensus upon uses a junk signature since message equality + // disregards the signature + if !self + .block + .log + .has_consensus(round, &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await)))) + { + return false; + } + + // Get all participants in this commit + let mut validators = vec![]; + let mut sigs = vec![]; + // Get all precommits for this round + for (validator, msgs) in &self.block.log.log[&round] { + if let Some(signed) = msgs.get(&Step::Precommit) { + if let Data::Precommit(Some((id, sig))) = &signed.msg.data { + // If this precommit was for this block, include it + if *id == block.id() { + validators.push(*validator); + sigs.push(sig.clone()); + } + } + } + } + + // Form the commit itself + let commit_msg = commit_msg(self.block.end_time[&round].canonical(), block.id().as_ref()); + let commit = Commit { + end_time: self.block.end_time[&round].canonical(), + validators: validators.clone(), + signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), + }; + debug_assert!(self.network.verify_commit(block.id(), &commit)); + + // Add the block and reset the machine + log::info!( + target: "tendermint", + "TendermintMachine produced block {}", + hex::encode(block.id().as_ref()), + ); + let id = block.id(); + let proposal = self.network.add_block(block.clone(), commit).await; + log::trace!( + target: "tendermint", + "added block {} (produced by machine)", + hex::encode(id.as_ref()), + ); + self.reset(round, proposal).await; + + true + } + + // L49-54 + async fn all_any_round_upons(&mut self, round: RoundNumber) -> bool { + self.upon_successful_precommits(round).await + } + + // Returns Ok(true) if this was a Precommit which had either no signature or its signature + // validated + // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet + // Returns Err if the signature was invalid + async fn verify_precommit_signature( + &mut self, + signed: &SignedMessageFor, + ) -> Result { + let msg = &signed.msg; + if let Data::Precommit(precommit) = &msg.data { + let Some((id, sig)) = precommit else { return Ok(true) }; + // Also verify the end_time of the commit + // Only perform this verification if we already have the end_time + // Else, there's a DoS where we receive a precommit for some round infinitely in the future + // which forces us to calculate every end time + if let Some(end_time) = self.block.end_time.get(&msg.round) { + if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) + { + log::warn!(target: "tendermint", "validator produced an invalid commit signature"); + self + .slash( + msg.sender, + SlashEvent::WithEvidence(Evidence::InvalidPrecommit(signed.encode())), + ) + .await; + Err(TendermintError::Malicious)?; + } + return Ok(true); + } + } + Ok(false) + } + + async fn message(&mut self, signed: &SignedMessageFor) -> Result<(), TendermintError> { + let msg = &signed.msg; + if msg.block != self.block.number { + Err(TendermintError::Temporal)?; + } + + // If this is a precommit, verify its signature + self.verify_precommit_signature(signed).await?; + + // Only let the proposer propose + if matches!(msg.data, Data::Proposal(..)) && + (msg.sender != self.weights.proposer(msg.block, msg.round)) + { + log::warn!(target: "tendermint", "validator who wasn't the proposer proposed"); + // TODO: This should have evidence + self + .slash(msg.sender, SlashEvent::Id(SlashReason::InvalidProposer, msg.block.0, msg.round.0)) + .await; + Err(TendermintError::Malicious)?; + }; + + // If this is a proposal, verify the block + // If the block is invalid, drop the message, letting the timeout cover it + // This prevents needing to check if valid inside every `upon` block + if let Data::Proposal(_, block) = &msg.data { + match self.network.validate(block).await { + Ok(()) => {} + Err(BlockError::Temporal) => return Err(TendermintError::Temporal), + Err(BlockError::Fatal) => { + log::warn!(target: "tendermint", "validator proposed a fatally invalid block"); + self + .slash( + msg.sender, + SlashEvent::Id(SlashReason::InvalidBlock, self.block.number.0, msg.round.0), + ) + .await; + Err(TendermintError::Malicious)?; + } + }; + } + + // If this is a proposal, verify the valid round isn't fundamentally invalid + if let Data::Proposal(Some(valid_round), _) = msg.data { + if valid_round.0 >= msg.round.0 { + log::warn!( + target: "tendermint", + "proposed proposed with a syntactically invalid valid round", + ); + self + .slash(msg.sender, SlashEvent::WithEvidence(Evidence::InvalidValidRound(msg.encode()))) + .await; + Err(TendermintError::Malicious)?; + } + } + + // Add it to the log, returning if it was already handled + match self.block.log.log(signed.clone()) { + Ok(true) => {} + Ok(false) => Err(TendermintError::AlreadyHandled)?, + Err(evidence) => { + self.slash(msg.sender, SlashEvent::WithEvidence(evidence)).await; + Err(TendermintError::Malicious)?; + } + } + log::debug!( + target: "tendermint", + "received new tendermint message (block: {}, round: {}, step: {:?})", + msg.block.0, + msg.round.0, + msg.data.step(), + ); + + // If this is a proposal, insert it + if let Data::Proposal(vr, block) = &msg.data { + self.round_proposals.insert(msg.round, (*vr, block.clone())); + } + + // L55-56 + // Jump ahead if we should + if (msg.round.0 > self.block.round().number.0) && + (self.block.log.round_participation(msg.round) >= self.weights.fault_threshold()) + { + log::debug!( + target: "tendermint", + "jumping from round {} to round {}", + self.block.round().number.0, + msg.round.0, + ); + + // Jump to the new round. + let old_round = self.block.round().number; + self.round(msg.round, None); + + // If any jumped over/to round already has precommit messages, verify their signatures + for jumped in (old_round.0 + 1) ..= msg.round.0 { + let jumped = RoundNumber(jumped); + let round_msgs = self.block.log.log.get(&jumped).cloned().unwrap_or_default(); + for (validator, msgs) in &round_msgs { + if let Some(existing) = msgs.get(&Step::Precommit) { + if let Ok(res) = self.verify_precommit_signature(existing).await { + // Ensure this actually verified the signature instead of believing it shouldn't yet + assert!(res); + } else { + // Remove the message so it isn't counted towards forming a commit/included in one + // This won't remove the fact they precommitted for this block hash in the MessageLog + // TODO: Don't even log these in the first place until we jump, preventing needing + // to do this in the first place + self + .block + .log + .log + .get_mut(&jumped) + .unwrap() + .get_mut(validator) + .unwrap() + .remove(&Step::Precommit) + .unwrap(); + } + } + } + } + } + + // Now that we've jumped, and: + // 1) If this is a message for an old round, verified the precommit signatures + // 2) If this is a message for what was the current round, verified the precommit signatures + // 3) If this is a message for what was a future round, verified the precommit signatures if it + // has 34+% participation + // Run all `upons` run for any round, which may produce a Commit if it has 67+% participation + // (returning true if it does, letting us return now) + // It's necessary to verify the precommit signatures before Commit production is allowed, hence + // this specific flow + if self.all_any_round_upons(msg.round).await { + return Ok(()); + } + + // If this is a historic round, or a future round without sufficient participation, return + if msg.round.0 != self.block.round().number.0 { + return Ok(()); + } + // msg.round is now guaranteed to be equal to self.block.round().number + debug_assert_eq!(msg.round, self.block.round().number); + + // Run all `upons` run for the current round + self.all_current_round_upons().await; + + Ok(()) + } + /// Create a new Tendermint machine, from the specified point, with the specified block as the /// one to propose next. This will return a channel to send messages from the gossip layer and /// the machine itself. The machine should have `run` called from an asynchronous task. @@ -419,7 +874,7 @@ impl TendermintMachine { let validators = network.signature_scheme(); let weights = Arc::new(network.weights()); let validator_id = signer.validator_id().await; - // 01-10 + // L01-10 let mut machine = TendermintMachine { db: db.clone(), genesis, @@ -442,6 +897,15 @@ impl TendermintMachine { validator_id, Some(proposal), ), + + round_proposals: HashMap::new(), + + upons: Upons { + upon_prevotes: false, + upon_successful_current_round_prevotes: false, + upon_negative_current_round_prevotes: false, + upon_precommits: false, + }, }; // The end time of the last block is the start time for this one @@ -460,16 +924,16 @@ impl TendermintMachine { pub async fn run(mut self) { log::debug!(target: "tendermint", "running TendermintMachine"); + let mut rebroadcast_future = Box::pin(sleep(Duration::from_secs(60))).fuse(); loop { // Also create a future for if the queue has a message // Does not pop_front as if another message has higher priority, its future will be handled // instead in this loop, and the popped value would be dropped with the next iteration - // While no other message has a higher priority right now, this is a safer practice let mut queue_future = if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; if let Some((our_message, msg, mut sig)) = futures_util::select_biased! { - // Handle a new block occurring externally (an external sync loop) + // Handle a new block occurring externally (from an external sync loop) // Has the highest priority as it makes all other futures here irrelevant msg = self.synced_block_recv.next() => { if let Some(SyncedBlock { number, block, commit }) = msg { @@ -503,16 +967,19 @@ impl TendermintMachine { Some((true, self.queue.pop_front().unwrap(), None)) }, + // L57-67 // Handle any timeouts step = self.block.round().timeout_future().fuse() => { // Remove the timeout so it doesn't persist, always being the selected future due to bias // While this does enable the timeout to be entered again, the timeout setting code will // never attempt to add a timeout after its timeout has expired + // (due to it setting an `upon` boolean) self.block.round_mut().timeouts.remove(&step); - // Only run if it's still the step in question - if self.block.round().step == step { - match step { - Step::Propose => { + + match step { + Step::Propose => { + // Only run if it's still the step in question + if self.block.round().step == step { // Slash the validator for not proposing when they should've log::debug!(target: "tendermint", "validator didn't propose when they should have"); // this slash will be voted on. @@ -525,14 +992,43 @@ impl TendermintMachine { ), ).await; self.broadcast(Data::Prevote(None)); - }, - Step::Prevote => self.broadcast(Data::Precommit(None)), - Step::Precommit => { - self.round(RoundNumber(self.block.round().number.0 + 1), None); - continue; } + }, + Step::Prevote => { + // Only run if it's still the step in question + if self.block.round().step == step { + self.broadcast(Data::Precommit(None)) + } + }, + Step::Precommit => { + self.round(RoundNumber(self.block.round().number.0 + 1), None); } + }; + + // Execute the upons now that the state has changed + self.all_any_round_upons(self.block.round().number).await; + self.all_current_round_upons().await; + + None + }, + + // If it's been more than 60s, rebroadcast our own messages + () = rebroadcast_future => { + log::trace!("rebroadcast future hit within tendermint machine"); + let key = message_tape_key(self.genesis); + let messages = self.db.get(key).unwrap_or(vec![]); + let mut messages = messages.as_slice(); + + while !messages.is_empty() { + self.network.broadcast( + SignedMessageFor::::decode(&mut IoReader(&mut messages)) + .expect("saved invalid message to DB") + ).await; } + + // Reset the rebroadcast future + rebroadcast_future = Box::pin(sleep(core::time::Duration::from_secs(60))).fuse(); + None }, @@ -554,429 +1050,32 @@ impl TendermintMachine { } let sig = sig.unwrap(); - // TODO: message may internally call broadcast. We should check within broadcast it's not - // broadcasting our own message at this time. let signed_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; let res = self.message(&signed_msg).await; + // If this is our message, and we hit an invariant, we could be slashed. + // We only broadcast our message after running it ourselves, to ensure it doesn't error, to + // ensure we don't get slashed on invariants. if res.is_err() && our_message { panic!("honest node (ourselves) had invalid behavior"); } - // Only now should we allow broadcasts since we're sure an invariant wasn't reached causing - // us to have invalid messages. + // Save this message to a linear tape of all our messages for this block, if ours + // TODO: Since we do this after we mark this message as sent to prevent equivocations, a + // precisely time reboot could cause this message marked as sent yet not added to the tape + if our_message { + let message_tape_key = message_tape_key(self.genesis); + let mut txn = self.db.txn(); + let mut message_tape = txn.get(&message_tape_key).unwrap_or(vec![]); + message_tape.extend(signed_msg.encode()); + txn.put(&message_tape_key, message_tape); + txn.commit(); + } + + // Re-broadcast this since it's an original consensus message worth handling if res.is_ok() { - // Re-broadcast this since it's an original consensus message self.network.broadcast(signed_msg).await; } - - match res { - Ok(None) => {} - Ok(Some(block)) => { - let mut validators = vec![]; - let mut sigs = vec![]; - // Get all precommits for this round - for (validator, msgs) in &self.block.log.log[&msg.round] { - if let Some(signed) = msgs.get(&Step::Precommit) { - if let Data::Precommit(Some((id, sig))) = &signed.msg.data { - // If this precommit was for this block, include it - if *id == block.id() { - validators.push(*validator); - sigs.push(sig.clone()); - } - } - } - } - - let commit_msg = - commit_msg(self.block.end_time[&msg.round].canonical(), block.id().as_ref()); - let commit = Commit { - end_time: self.block.end_time[&msg.round].canonical(), - validators: validators.clone(), - signature: self.network.signature_scheme().aggregate(&validators, &commit_msg, &sigs), - }; - debug_assert!(self.network.verify_commit(block.id(), &commit)); - - log::info!( - target: "tendermint", - "TendermintMachine produced block {}", - hex::encode(block.id().as_ref()), - ); - let id = block.id(); - let proposal = self.network.add_block(block, commit).await; - log::trace!( - target: "tendermint", - "added block {} (produced by machine)", - hex::encode(id.as_ref()), - ); - self.reset(msg.round, proposal).await; - } - Err(TendermintError::Malicious(sender, evidence)) => { - let current_msg = SignedMessage { msg: msg.clone(), sig: sig.clone() }; - - let slash = if let Some(ev) = evidence { - // if the malicious message contains a block, only vote to slash - // TODO: Should this decision be made at a higher level? - // A higher-level system may be able to verify if the contained block is fatally - // invalid - // A higher-level system may accept the bandwidth size of this, even if the issue is - // just the valid round field - if let Data::Proposal(_, _) = ¤t_msg.msg.data { - SlashEvent::Id( - SlashReason::InvalidBlock, - self.block.number.0, - self.block.round().number.0, - ) - } else { - // slash with evidence otherwise - SlashEvent::WithEvidence(ev) - } - } else { - // we don't have evidence. Slash with vote. - SlashEvent::Id( - SlashReason::InvalidMessage, - self.block.number.0, - self.block.round().number.0, - ) - }; - - // Each message that we're voting to slash over needs to be re-broadcasted so other - // validators also trigger their own votes - // TODO: should this be inside slash function? - if let SlashEvent::Id(_, _, _) = slash { - self.network.broadcast(current_msg).await; - } - - self.slash(sender, slash).await - } - Err( - TendermintError::Temporal | - TendermintError::AlreadyHandled | - TendermintError::InvalidEvidence, - ) => (), - } } } } - - // Returns Ok(true) if this was a Precommit which had either no signature or its signature - // validated - // Returns Ok(false) if it wasn't a Precommit or the signature wasn't validated yet - // Returns Err if the signature was invalid - fn verify_precommit_signature( - &self, - signed: &SignedMessageFor, - ) -> Result> { - let msg = &signed.msg; - if let Data::Precommit(precommit) = &msg.data { - let Some((id, sig)) = precommit else { return Ok(true) }; - // Also verify the end_time of the commit - // Only perform this verification if we already have the end_time - // Else, there's a DoS where we receive a precommit for some round infinitely in the future - // which forces us to calculate every end time - if let Some(end_time) = self.block.end_time.get(&msg.round) { - if !self.validators.verify(msg.sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) - { - log::warn!(target: "tendermint", "Validator produced an invalid commit signature"); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::InvalidPrecommit(signed.encode())), - ))?; - } - return Ok(true); - } - } - Ok(false) - } - - async fn message( - &mut self, - signed: &SignedMessageFor, - ) -> Result, TendermintError> { - let msg = &signed.msg; - if msg.block != self.block.number { - Err(TendermintError::Temporal)?; - } - - if (msg.block == self.block.number) && - (msg.round == self.block.round().number) && - (msg.data.step() == Step::Propose) - { - log::trace!( - target: "tendermint", - "received Propose for block {}, round {}", - msg.block.0, - msg.round.0, - ); - } - - // If this is a precommit, verify its signature - self.verify_precommit_signature(signed)?; - - // Only let the proposer propose - if matches!(msg.data, Data::Proposal(..)) && - (msg.sender != self.weights.proposer(msg.block, msg.round)) - { - log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed"); - // TODO: This should have evidence - Err(TendermintError::Malicious(msg.sender, None))?; - }; - - if !self.block.log.log(signed.clone())? { - return Err(TendermintError::AlreadyHandled); - } - log::trace!( - target: "tendermint", - "received new tendermint message (block: {}, round: {}, step: {:?})", - msg.block.0, - msg.round.0, - msg.data.step(), - ); - - // All functions, except for the finalizer and the jump, are locked to the current round - - // Run the finalizer to see if it applies - // 49-52 - if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) { - let proposer = self.weights.proposer(self.block.number, msg.round); - - // Get the proposal - if let Some(proposal_signed) = self.block.log.get(msg.round, proposer, Step::Propose) { - if let Data::Proposal(_, block) = &proposal_signed.msg.data { - // Check if it has gotten a sufficient amount of precommits - // Uses a junk signature since message equality disregards the signature - if self.block.log.has_consensus( - msg.round, - &Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))), - ) { - // If msg.round is in the future, these Precommits won't have their inner signatures - // verified - // It should be impossible for msg.round to be in the future however, as this requires - // 67% of validators to Precommit, and we jump on 34% participating in the new round - // The one exception would be if a validator had 34%, and could cause participation to - // go from 33% (not enough to jump) to 67%, without executing the below code - // This also would require the local machine to be outside of allowed time tolerances, - // or the validator with 34% to not be publishing Prevotes (as those would cause a - // a jump) - // Both are invariants - // TODO: Replace this panic with an inner signature check - assert!(msg.round.0 <= self.block.round().number.0); - - log::debug!(target: "tendermint", "block {} has consensus", msg.block.0); - return Ok(Some(block.clone())); - } - } - } - } - - // Else, check if we need to jump ahead - #[allow(clippy::comparison_chain)] - if msg.round.0 < self.block.round().number.0 { - // Prior round, disregard if not finalizing - return Ok(None); - } else if msg.round.0 > self.block.round().number.0 { - // 55-56 - // Jump, enabling processing by the below code - if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { - log::debug!( - target: "tendermint", - "jumping from round {} to round {}", - self.block.round().number.0, - msg.round.0, - ); - - // Jump to the new round. - let proposer = self.round(msg.round, None); - - // If this round already has precommit messages, verify their signatures - let round_msgs = self.block.log.log[&msg.round].clone(); - for (validator, msgs) in &round_msgs { - if let Some(existing) = msgs.get(&Step::Precommit) { - if let Ok(res) = self.verify_precommit_signature(existing) { - // Ensure this actually verified the signature instead of believing it shouldn't yet - assert!(res); - } else { - // Remove the message so it isn't counted towards forming a commit/included in one - // This won't remove the fact they precommitted for this block hash in the MessageLog - // TODO: Don't even log these in the first place until we jump, preventing needing - // to do this in the first place - let msg = self - .block - .log - .log - .get_mut(&msg.round) - .unwrap() - .get_mut(validator) - .unwrap() - .remove(&Step::Precommit) - .unwrap(); - - // Slash the validator for publishing an invalid commit signature - self - .slash( - *validator, - SlashEvent::WithEvidence(Evidence::InvalidPrecommit(msg.encode())), - ) - .await; - } - } - } - - // If we're the proposer, return now we don't waste time on the current round - // (as it doesn't have a proposal, since we didn't propose, and cannot complete) - if proposer { - return Ok(None); - } - } else { - // Future round which we aren't ready to jump to, so return for now - return Ok(None); - } - } - - // msg.round is now guaranteed to be equal to self.block.round().number - debug_assert_eq!(msg.round, self.block.round().number); - - // The paper executes these checks when the step is prevote. Making sure this message warrants - // rerunning these checks is a sane optimization since message instances is a full iteration - // of the round map - if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { - let (participation, weight) = - self.block.log.message_instances(self.block.round().number, &Data::Prevote(None)); - let threshold_weight = self.weights.threshold(); - if participation < threshold_weight { - log::trace!( - target: "tendermint", - "progess towards setting prevote timeout, participation: {}, needed: {}", - participation, - threshold_weight, - ); - } - // 34-35 - if participation >= threshold_weight { - log::trace!( - target: "tendermint", - "setting timeout for prevote due to sufficient participation", - ); - self.block.round_mut().set_timeout(Step::Prevote); - } - - // 44-46 - if weight >= threshold_weight { - self.broadcast(Data::Precommit(None)); - return Ok(None); - } - } - - // 47-48 - if matches!(msg.data, Data::Precommit(_)) && - self.block.log.has_participation(self.block.round().number, Step::Precommit) - { - log::trace!( - target: "tendermint", - "setting timeout for precommit due to sufficient participation", - ); - self.block.round_mut().set_timeout(Step::Precommit); - } - - // All further operations require actually having the proposal in question - let proposer = self.weights.proposer(self.block.number, self.block.round().number); - let (vr, block) = if let Some(proposal_signed) = - self.block.log.get(self.block.round().number, proposer, Step::Propose) - { - if let Data::Proposal(vr, block) = &proposal_signed.msg.data { - (vr, block) - } else { - panic!("message for Step::Propose didn't have Data::Proposal"); - } - } else { - return Ok(None); - }; - - // 22-33 - if self.block.round().step == Step::Propose { - // Delay error handling (triggering a slash) until after we vote. - let (valid, err) = match self.network.validate(block).await { - Ok(()) => (true, Ok(None)), - Err(BlockError::Temporal) => (false, Ok(None)), - Err(BlockError::Fatal) => (false, { - log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); - // TODO: Produce evidence of this for the higher level code to decide what to do with - Err(TendermintError::Malicious(proposer, None)) - }), - }; - // Create a raw vote which only requires block validity as a basis for the actual vote. - let raw_vote = Some(block.id()).filter(|_| valid); - - // If locked is none, it has a round of -1 according to the protocol. That satisfies - // 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some - // with different IDs, the function on 22 rejects yet the function on 28 has one other - // condition - let locked = self.block.locked.as_ref().map_or(true, |(_, id)| id == &block.id()); - let mut vote = raw_vote.filter(|_| locked); - - if let Some(vr) = vr { - // Malformed message - if vr.0 >= self.block.round().number.0 { - log::warn!(target: "tendermint", "Validator claimed a round from the future was valid"); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::InvalidValidRound(signed.encode())), - ))?; - } - - if self.block.log.has_consensus(*vr, &Data::Prevote(Some(block.id()))) { - // Allow differing locked values if the proposal has a newer valid round - // This is the other condition described above - if let Some((locked_round, _)) = self.block.locked.as_ref() { - vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0)); - } - - self.broadcast(Data::Prevote(vote)); - return err; - } - } else { - self.broadcast(Data::Prevote(vote)); - return err; - } - - return Ok(None); - } - - if self.block.valid.as_ref().map_or(true, |(round, _)| round != &self.block.round().number) { - // 36-43 - - // The run once condition is implemented above. Since valid will always be set by this, it - // not being set, or only being set historically, means this has yet to be run - - if self.block.log.has_consensus(self.block.round().number, &Data::Prevote(Some(block.id()))) { - match self.network.validate(block).await { - // BlockError::Temporal is due to a temporal error we have, yet a supermajority of the - // network does not, Because we do not believe this block to be fatally invalid, and - // because a supermajority deems it valid, accept it. - Ok(()) | Err(BlockError::Temporal) => (), - Err(BlockError::Fatal) => { - log::warn!(target: "tendermint", "Validator proposed a fatally invalid block"); - // TODO: Produce evidence of this for the higher level code to decide what to do with - Err(TendermintError::Malicious(proposer, None))? - } - }; - - self.block.valid = Some((self.block.round().number, block.clone())); - if self.block.round().step == Step::Prevote { - self.block.locked = Some((self.block.round().number, block.id())); - self.broadcast(Data::Precommit(Some(( - block.id(), - self - .signer - .sign(&commit_msg( - self.block.end_time[&self.block.round().number].canonical(), - block.id().as_ref(), - )) - .await, - )))); - } - } - } - - Ok(None) - } } diff --git a/coordinator/tributary/tendermint/src/message_log.rs b/coordinator/tributary/tendermint/src/message_log.rs index 3959852d..c6d172c4 100644 --- a/coordinator/tributary/tendermint/src/message_log.rs +++ b/coordinator/tributary/tendermint/src/message_log.rs @@ -2,21 +2,30 @@ use std::{sync::Arc, collections::HashMap}; use parity_scale_codec::Encode; -use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence}; +use crate::{ext::*, RoundNumber, Step, DataFor, SignedMessageFor, Evidence}; type RoundLog = HashMap<::ValidatorId, HashMap>>; pub(crate) struct MessageLog { weights: Arc, + round_participation: HashMap, + participation: HashMap<(RoundNumber, Step), u64>, + message_instances: HashMap<(RoundNumber, DataFor), u64>, pub(crate) log: HashMap>, } impl MessageLog { pub(crate) fn new(weights: Arc) -> MessageLog { - MessageLog { weights, log: HashMap::new() } + MessageLog { + weights, + round_participation: HashMap::new(), + participation: HashMap::new(), + message_instances: HashMap::new(), + log: HashMap::new(), + } } // Returns true if it's a new message - pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result> { + pub(crate) fn log(&mut self, signed: SignedMessageFor) -> Result { let msg = &signed.msg; // Clarity, and safety around default != new edge cases let round = self.log.entry(msg.round).or_insert_with(HashMap::new); @@ -30,69 +39,36 @@ impl MessageLog { target: "tendermint", "Validator sent multiple messages for the same block + round + step" ); - Err(TendermintError::Malicious( - msg.sender, - Some(Evidence::ConflictingMessages(existing.encode(), signed.encode())), - ))?; + Err(Evidence::ConflictingMessages(existing.encode(), signed.encode()))?; } return Ok(false); } + // Since we have a new message, update the participation + let sender_weight = self.weights.weight(msg.sender); + if msgs.is_empty() { + *self.round_participation.entry(msg.round).or_insert_with(|| 0) += sender_weight; + } + *self.participation.entry((msg.round, step)).or_insert_with(|| 0) += sender_weight; + *self.message_instances.entry((msg.round, msg.data.clone())).or_insert_with(|| 0) += + sender_weight; + msgs.insert(step, signed); Ok(true) } - // For a given round, return the participating weight for this step, and the weight agreeing with - // the data. - pub(crate) fn message_instances(&self, round: RoundNumber, data: &DataFor) -> (u64, u64) { - let mut participating = 0; - let mut weight = 0; - for (participant, msgs) in &self.log[&round] { - if let Some(msg) = msgs.get(&data.step()) { - let validator_weight = self.weights.weight(*participant); - participating += validator_weight; - if data == &msg.msg.data { - weight += validator_weight; - } - } - } - (participating, weight) - } - // Get the participation in a given round pub(crate) fn round_participation(&self, round: RoundNumber) -> u64 { - let mut weight = 0; - if let Some(round) = self.log.get(&round) { - for participant in round.keys() { - weight += self.weights.weight(*participant); - } - }; - weight + *self.round_participation.get(&round).unwrap_or(&0) } // Check if a supermajority of nodes have participated on a specific step pub(crate) fn has_participation(&self, round: RoundNumber, step: Step) -> bool { - let mut participating = 0; - for (participant, msgs) in &self.log[&round] { - if msgs.get(&step).is_some() { - participating += self.weights.weight(*participant); - } - } - participating >= self.weights.threshold() + *self.participation.get(&(round, step)).unwrap_or(&0) >= self.weights.threshold() } // Check if consensus has been reached on a specific piece of data pub(crate) fn has_consensus(&self, round: RoundNumber, data: &DataFor) -> bool { - let (_, weight) = self.message_instances(round, data); - weight >= self.weights.threshold() - } - - pub(crate) fn get( - &self, - round: RoundNumber, - sender: N::ValidatorId, - step: Step, - ) -> Option<&SignedMessageFor> { - self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step))) + *self.message_instances.get(&(round, data.clone())).unwrap_or(&0) >= self.weights.threshold() } } diff --git a/coordinator/tributary/tendermint/src/round.rs b/coordinator/tributary/tendermint/src/round.rs index 445c2784..a97e3ed1 100644 --- a/coordinator/tributary/tendermint/src/round.rs +++ b/coordinator/tributary/tendermint/src/round.rs @@ -57,6 +57,7 @@ impl RoundData { // Poll all set timeouts, returning the Step whose timeout has just expired pub(crate) async fn timeout_future(&self) -> Step { + /* let now = Instant::now(); log::trace!( target: "tendermint", @@ -64,6 +65,7 @@ impl RoundData { self.step, self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::>() ); + */ let timeout_future = |step| { let timeout = self.timeouts.get(&step).copied(); diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index 3b3cf7c3..bec95ddc 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -145,7 +145,7 @@ impl Network for TestNetwork { println!("Slash for {id} due to {event:?}"); } - async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { + async fn validate(&self, block: &TestBlock) -> Result<(), BlockError> { block.valid }