diff --git a/substrate/tendermint/machine/src/block.rs b/substrate/tendermint/machine/src/block.rs index a6b7b190..3e7dc6a5 100644 --- a/substrate/tendermint/machine/src/block.rs +++ b/substrate/tendermint/machine/src/block.rs @@ -8,6 +8,7 @@ use crate::{ ext::{RoundNumber, BlockNumber, Block, Network}, round::RoundData, message_log::MessageLog, + Step, Data, DataFor, Message, MessageFor, }; pub(crate) struct BlockData<N: Network> { @@ -56,4 +57,71 @@ impl<N: Network> BlockData<N> { pub(crate) fn round_mut(&mut self) -> &mut RoundData<N> { self.round.as_mut().unwrap() } + + pub(crate) fn populate_end_time(&mut self, round: RoundNumber) { + for r in (self.round().number.0 + 1) .. round.0 { + self.end_time.insert( + RoundNumber(r), + RoundData::<N>::new(RoundNumber(r), self.end_time[&RoundNumber(r - 1)]).end_time(), + ); + } + } + + // Start a new round. Optionally takes in the time for when this is the first round, and the time + // isn't simply the time of the prior round (yet rather the prior block). Returns the proposal + // data, if we are the proposer. + pub(crate) fn new_round( + &mut self, + round: RoundNumber, + proposer: N::ValidatorId, + time: Option<CanonicalInstant>, + ) -> Option<DataFor<N>> { + debug_assert_eq!(round.0 == 0, time.is_some()); + + // If skipping rounds, populate end_time + if round.0 != 0 { + self.populate_end_time(round); + } + + // 11-13 + self.round = Some(RoundData::<N>::new( + round, + time.unwrap_or_else(|| self.end_time[&RoundNumber(round.0 - 1)]), + )); + self.end_time.insert(round, self.round().end_time()); + + // 14-21 + if Some(proposer) == self.validator_id { + let (round, block) = if let Some((round, block)) = &self.valid { + (Some(*round), block.clone()) + } else { + (None, self.proposal.clone()) + }; + Some(Data::Proposal(round, block)) + } else { + self.round_mut().set_timeout(Step::Propose); + None + } + } + + // Transform Data into an actual Message, using the contextual data from this block + pub(crate) fn message(&mut self, data: DataFor<N>) -> Option<MessageFor<N>> { + debug_assert_eq!( + self.round().step, + match data.step() { + Step::Propose | Step::Prevote => Step::Propose, + Step::Precommit => Step::Prevote, + }, + ); + // 27, 33, 41, 46, 60, 64 + self.round_mut().step = data.step(); + + // Only return a message to if we're actually a current validator + self.validator_id.map(|validator_id| Message { + sender: validator_id, + number: self.number, + round: self.round().number, + data, + }) + } } diff --git a/substrate/tendermint/machine/src/ext.rs b/substrate/tendermint/machine/src/ext.rs index ba411e6f..b7295f2c 100644 --- a/substrate/tendermint/machine/src/ext.rs +++ b/substrate/tendermint/machine/src/ext.rs @@ -6,7 +6,7 @@ use thiserror::Error; use parity_scale_codec::{Encode, Decode}; -use crate::{SignedMessage, commit_msg}; +use crate::{SignedMessageFor, commit_msg}; /// An alias for a series of traits required for a type to be usable as a validator ID, /// automatically implemented for all types satisfying those traits. @@ -249,14 +249,7 @@ pub trait Network: Send + Sync { /// established, this will double-authenticate. Switching to unauthenticated channels in a system /// already providing authenticated channels is not recommended as this is a minor, temporal /// inefficiency while downgrading channels may have wider implications. - async fn broadcast( - &mut self, - msg: SignedMessage< - Self::ValidatorId, - Self::Block, - <Self::SignatureScheme as SignatureScheme>::Signature, - >, - ); + async fn broadcast(&mut self, msg: SignedMessageFor<Self>); /// Trigger a slash for the validator in question who was definitively malicious. /// The exact process of triggering a slash is undefined and left to the network as a whole. diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index cdd3e90a..876b11ec 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -19,7 +19,6 @@ mod time; use time::{sys_time, CanonicalInstant}; mod round; -use round::RoundData; mod block; use block::BlockData; @@ -108,6 +107,21 @@ enum TendermintError<V: ValidatorId> { Temporal, } +// Type aliases to abstract over generic hell +pub(crate) type DataFor<N> = + Data<<N as Network>::Block, <<N as Network>::SignatureScheme as SignatureScheme>::Signature>; +pub(crate) type MessageFor<N> = Message< + <N as Network>::ValidatorId, + <N as Network>::Block, + <<N as Network>::SignatureScheme as SignatureScheme>::Signature, +>; +/// Type alias to the SignedMessage type for a given Network +pub type SignedMessageFor<N> = SignedMessage< + <N as Network>::ValidatorId, + <N as Network>::Block, + <<N as Network>::SignatureScheme as SignatureScheme>::Signature, +>; + /// A machine executing the Tendermint protocol. pub struct TendermintMachine<N: Network> { network: N, @@ -115,11 +129,8 @@ pub struct TendermintMachine<N: Network> { validators: N::SignatureScheme, weights: Arc<N::Weights>, - queue: - VecDeque<Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>, - msg_recv: mpsc::UnboundedReceiver< - SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, - >, + queue: VecDeque<MessageFor<N>>, + msg_recv: mpsc::UnboundedReceiver<SignedMessageFor<N>>, step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>, block: BlockData<N>, @@ -128,13 +139,7 @@ pub struct TendermintMachine<N: Network> { pub type StepSender<N> = mpsc::UnboundedSender<(Commit<<N as Network>::SignatureScheme>, <N as Network>::Block)>; -pub type MessageSender<N> = mpsc::UnboundedSender< - SignedMessage< - <N as Network>::ValidatorId, - <N as Network>::Block, - <<N as Network>::SignatureScheme as SignatureScheme>::Signature, - >, ->; +pub type MessageSender<N> = mpsc::UnboundedSender<SignedMessageFor<N>>; /// A Tendermint machine and its channel to receive messages from the gossip layer over. pub struct TendermintHandle<N: Network> { @@ -148,58 +153,20 @@ pub struct TendermintHandle<N: Network> { } impl<N: Network + 'static> TendermintMachine<N> { - fn broadcast( - &mut self, - data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, - ) { - if let Some(validator_id) = self.block.validator_id { - // 27, 33, 41, 46, 60, 64 - self.block.round_mut().step = data.step(); - self.queue.push_back(Message { - sender: validator_id, - number: self.block.number, - round: self.block.round().number, - data, - }); - } - } - - fn populate_end_time(&mut self, round: RoundNumber) { - for r in (self.block.round().number.0 + 1) .. round.0 { - self.block.end_time.insert( - RoundNumber(r), - RoundData::<N>::new(RoundNumber(r), self.block.end_time[&RoundNumber(r - 1)]).end_time(), - ); + fn broadcast(&mut self, data: DataFor<N>) { + if let Some(msg) = self.block.message(data) { + self.queue.push_back(msg); } } // Start a new round. Returns true if we were the proposer fn round(&mut self, round: RoundNumber, time: Option<CanonicalInstant>) -> bool { - // If skipping rounds, populate end_time - if round.0 != 0 { - self.populate_end_time(round); - } - - // 11-13 - self.block.round = Some(RoundData::<N>::new( - round, - time.unwrap_or_else(|| self.block.end_time[&RoundNumber(round.0 - 1)]), - )); - self.block.end_time.insert(round, self.block.round().end_time()); - - // 14-21 - if Some(self.weights.proposer(self.block.number, self.block.round().number)) == - self.block.validator_id + if let Some(data) = + self.block.new_round(round, self.weights.proposer(self.block.number, round), time) { - let (round, block) = if let Some((round, block)) = &self.block.valid { - (Some(*round), block.clone()) - } else { - (None, self.block.proposal.clone()) - }; - self.broadcast(Data::Proposal(round, block)); + self.broadcast(data); true } else { - self.block.round_mut().set_timeout(Step::Propose); false } } @@ -207,7 +174,7 @@ impl<N: Network + 'static> TendermintMachine<N> { // 53-54 async fn reset(&mut self, end_round: RoundNumber, proposal: N::Block) { // Ensure we have the end time data for the last round - self.populate_end_time(end_round); + self.block.populate_end_time(end_round); // Sleep until this round ends let round_end = self.block.end_time[&end_round]; @@ -233,7 +200,7 @@ impl<N: Network + 'static> TendermintMachine<N> { // If this commit is for a round we don't have, jump up to it while self.block.end_time[&round].canonical() < commit.end_time { round.0 += 1; - self.populate_end_time(round); + self.block.populate_end_time(round); } // If this commit is for a prior round, find it while self.block.end_time[&round].canonical() > commit.end_time { @@ -417,7 +384,7 @@ impl<N: Network + 'static> TendermintMachine<N> { &self, sender: N::ValidatorId, round: RoundNumber, - data: &Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, + data: &DataFor<N>, ) -> Result<(), TendermintError<N::ValidatorId>> { if let Data::Precommit(Some((id, sig))) = data { // Also verify the end_time of the commit @@ -435,7 +402,7 @@ impl<N: Network + 'static> TendermintMachine<N> { async fn message( &mut self, - msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, + msg: MessageFor<N>, ) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> { if msg.number != self.block.number { Err(TendermintError::Temporal)?; diff --git a/substrate/tendermint/machine/src/message_log.rs b/substrate/tendermint/machine/src/message_log.rs index e9877130..0592160d 100644 --- a/substrate/tendermint/machine/src/message_log.rs +++ b/substrate/tendermint/machine/src/message_log.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, collections::HashMap}; -use crate::{ext::*, RoundNumber, Step, Data, Message, TendermintError}; +use crate::{ext::*, RoundNumber, Step, Data, DataFor, MessageFor, TendermintError}; pub(crate) struct MessageLog<N: Network> { weights: Arc<N::Weights>, @@ -8,13 +8,7 @@ pub(crate) struct MessageLog<N: Network> { N::ValidatorId, (<N::Block as Block>::Id, <N::SignatureScheme as SignatureScheme>::Signature), >, - pub(crate) log: HashMap< - RoundNumber, - HashMap< - N::ValidatorId, - HashMap<Step, Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>, - >, - >, + pub(crate) log: HashMap<RoundNumber, HashMap<N::ValidatorId, HashMap<Step, DataFor<N>>>>, } impl<N: Network> MessageLog<N> { @@ -25,7 +19,7 @@ impl<N: Network> MessageLog<N> { // Returns true if it's a new message pub(crate) fn log( &mut self, - msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, + msg: MessageFor<N>, ) -> Result<bool, TendermintError<N::ValidatorId>> { let round = self.log.entry(msg.round).or_insert_with(HashMap::new); let msgs = round.entry(msg.sender).or_insert_with(HashMap::new); @@ -55,11 +49,7 @@ impl<N: Network> MessageLog<N> { // For a given round, return the participating weight for this step, and the weight agreeing with // the data. - pub(crate) fn message_instances( - &self, - round: RoundNumber, - data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, - ) -> (u64, u64) { + pub(crate) fn message_instances(&self, round: RoundNumber, data: DataFor<N>) -> (u64, u64) { let mut participating = 0; let mut weight = 0; for (participant, msgs) in &self.log[&round] { @@ -97,11 +87,7 @@ impl<N: Network> MessageLog<N> { } // Check if consensus has been reached on a specific piece of data - pub(crate) fn has_consensus( - &self, - round: RoundNumber, - data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, - ) -> bool { + pub(crate) fn has_consensus(&self, round: RoundNumber, data: DataFor<N>) -> bool { let (_, weight) = self.message_instances(round, data); weight >= self.weights.threshold() } @@ -111,7 +97,7 @@ impl<N: Network> MessageLog<N> { round: RoundNumber, sender: N::ValidatorId, step: Step, - ) -> Option<&Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>> { + ) -> Option<&DataFor<N>> { self.log.get(&round).and_then(|round| round.get(&sender).and_then(|msgs| msgs.get(&step))) } } diff --git a/substrate/tendermint/machine/tests/ext.rs b/substrate/tendermint/machine/tests/ext.rs index 69b8e55f..086c96d8 100644 --- a/substrate/tendermint/machine/tests/ext.rs +++ b/substrate/tendermint/machine/tests/ext.rs @@ -11,7 +11,7 @@ use futures::SinkExt; use tokio::{sync::RwLock, time::sleep}; use tendermint_machine::{ - ext::*, SignedMessage, StepSender, MessageSender, TendermintMachine, TendermintHandle, + ext::*, SignedMessageFor, StepSender, MessageSender, TendermintMachine, TendermintHandle, }; type TestValidatorId = u16; @@ -120,7 +120,7 @@ impl Network for TestNetwork { TestWeights } - async fn broadcast(&mut self, msg: SignedMessage<TestValidatorId, Self::Block, [u8; 32]>) { + async fn broadcast(&mut self, msg: SignedMessageFor<Self>) { for (messages, _) in self.1.write().await.iter_mut() { messages.send(msg.clone()).await.unwrap(); }