diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index 3e388051..46386dc9 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -16,7 +16,10 @@ use futures::{ use tokio::time::sleep; mod time; -use time::{sys_time, CanonicalInstant}; +use time::*; + +mod round; +use round::*; mod message_log; use message_log::MessageLog; @@ -103,6 +106,21 @@ enum TendermintError<V: ValidatorId> { Temporal, } +struct BlockData<N: Network> { + number: BlockNumber, + validator_id: Option<N::ValidatorId>, + proposal: N::Block, + + log: MessageLog<N>, + slashes: HashSet<N::ValidatorId>, + end_time: HashMap<Round, CanonicalInstant>, + + round: RoundData<N>, + + locked: Option<(Round, <N::Block as Block>::Id)>, + valid: Option<(Round, N::Block)>, +} + /// A machine executing the Tendermint protocol. pub struct TendermintMachine<N: Network> { network: N, @@ -110,11 +128,6 @@ pub struct TendermintMachine<N: Network> { validators: N::SignatureScheme, weights: Arc<N::Weights>, - validator_id: Option<N::ValidatorId>, - - number: BlockNumber, - personal_proposal: N::Block, - queue: VecDeque<Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>>, msg_recv: mpsc::UnboundedReceiver< @@ -122,17 +135,7 @@ pub struct TendermintMachine<N: Network> { >, step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>, - log: MessageLog<N>, - slashes: HashSet<N::ValidatorId>, - end_time: HashMap<Round, CanonicalInstant>, - round: Round, - start_time: CanonicalInstant, - step: Step, - - locked: Option<(Round, <N::Block as Block>::Id)>, - valid: Option<(Round, N::Block)>, - - timeouts: HashMap<Step, Instant>, + block: BlockData<N>, } pub type StepSender<N> = @@ -158,126 +161,113 @@ pub struct TendermintHandle<N: Network> { } impl<N: Network + 'static> TendermintMachine<N> { - fn timeout(&self, step: Step) -> CanonicalInstant { - let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1); - let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1); - let offset = Duration::from_secs( - (match step { - Step::Propose => adjusted_block + adjusted_latency, - Step::Prevote => adjusted_block + (2 * adjusted_latency), - Step::Precommit => adjusted_block + (3 * adjusted_latency), - }) - .into(), - ); - self.start_time + offset - } - fn broadcast( &mut self, data: Data<N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, ) { - if let Some(validator_id) = &self.validator_id { + if let Some(validator_id) = &self.block.validator_id { // 27, 33, 41, 46, 60, 64 - self.step = data.step(); + self.block.round.step = data.step(); self.queue.push_back(Message { sender: *validator_id, - number: self.number, - round: self.round, + number: self.block.number, + round: self.block.round.round, data, }); } } - // 14-21 - fn round_propose(&mut self) -> bool { - if Some(self.weights.proposer(self.number, self.round)) == self.validator_id { - let (round, block) = self - .valid - .clone() - .map(|(r, b)| (Some(r), b)) - .unwrap_or((None, self.personal_proposal.clone())); + fn populate_end_time(&mut self, round: Round) { + for r in (self.block.round.round.0 + 1) .. round.0 { + self.block.end_time.insert( + Round(r), + RoundData::<N>::new(Round(r), self.block.end_time[&Round(r - 1)]).end_time(), + ); + } + } + + // Start a new round. Returns true if we were the proposer + fn round(&mut self, round: Round, time: Option<CanonicalInstant>) -> bool { + // If skipping rounds, populate end_time + self.populate_end_time(round); + + // 11-13 + self.block.round = + RoundData::<N>::new(round, time.unwrap_or_else(|| self.block.end_time[&Round(round.0 - 1)])); + self.block.end_time.insert(round, self.block.round.end_time()); + + // 14-21 + if Some(self.weights.proposer(self.block.number, self.block.round.round)) == + self.block.validator_id + { + let (round, block) = if let Some((round, block)) = &self.block.valid { + (Some(*round), block.clone()) + } else { + (None, self.block.proposal.clone()) + }; self.broadcast(Data::Proposal(round, block)); true } else { - self.timeouts.insert(Step::Propose, self.timeout(Step::Propose).instant()); + self.block.round.set_timeout(Step::Propose); false } } - fn round(&mut self, round: Round) -> bool { - // If moving to a new round, correct the start time and populate end_time - for r in self.round.0 .. round.0 { - let end = self.timeout(Step::Precommit); - self.end_time.insert(Round(r), end); - self.round.0 += 1; - self.start_time = end; - } - - // Write the round regardless in case of reset - - // 11-13 - self.round = round; - self.step = Step::Propose; - - // Write the end time - self.end_time.insert(round, self.timeout(Step::Precommit)); - // Clear timeouts - self.timeouts = HashMap::new(); - - self.round_propose() - } - // 53-54 async fn reset(&mut self, end_round: Round, proposal: N::Block) { - // Wait for the next block interval - let round_end = self.end_time[&end_round]; + // Ensure we have the end time data for the last round + self.populate_end_time(end_round); + + // Sleep until this round ends + let round_end = self.block.end_time[&end_round]; sleep(round_end.instant().saturating_duration_since(Instant::now())).await; - self.validator_id = self.signer.validator_id().await; + // Only keep queued messages for this block + self.queue = self.queue.drain(..).filter(|msg| msg.number == self.block.number).collect(); - self.number.0 += 1; - self.personal_proposal = proposal; + // Create the new block + self.block = BlockData { + number: BlockNumber(self.block.number.0 + 1), + validator_id: self.signer.validator_id().await, + proposal, - self.queue = self.queue.drain(..).filter(|msg| msg.number == self.number).collect(); + log: MessageLog::new(self.weights.clone()), + slashes: HashSet::new(), + end_time: HashMap::new(), - self.log = MessageLog::new(self.weights.clone()); - self.slashes = HashSet::new(); - self.end_time = HashMap::new(); - self.start_time = round_end; + // This will be populated in the following round() call + round: RoundData::<N>::new(Round(0), CanonicalInstant::new(0)), - self.locked = None; - self.valid = None; + locked: None, + valid: None, + }; - self.round(Round(0)); + // Start the first round + self.round(Round(0), Some(round_end)); } async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) { - let mut round = None; - // If our start time is >= the commit's end time, it's from a previous round - if self.start_time.canonical() >= commit.end_time { - for (round_i, end_time) in &self.end_time { - if end_time.canonical() == commit.end_time { - round = Some(*round_i); - break; - } - } - } else { - // Increment rounds until we find the round - while { - self.round(Round(self.round.0 + 1)); - // Use < to prevent an infinite loop - self.end_time[&self.round].canonical() < commit.end_time - } {} - round = - Some(self.round).filter(|_| self.end_time[&self.round].canonical() == commit.end_time); + let mut round = self.block.round.round; + // If this commit is for a round we don't have, jump up to it + while self.block.end_time[&round].canonical() < commit.end_time { + round.0 += 1; + self.populate_end_time(round); } + // If this commit is for a prior round, find it + while self.block.end_time[&round].canonical() > commit.end_time { + if round.0 == 0 { + panic!("commit isn't for this machine's next block"); + } + round.0 -= 1; + } + debug_assert_eq!(self.block.end_time[&round].canonical(), commit.end_time); - self.reset(round.expect("commit wasn't for the machine's next block"), proposal).await; + self.reset(round, proposal).await; } async fn slash(&mut self, validator: N::ValidatorId) { - if !self.slashes.contains(&validator) { - self.slashes.insert(validator); + if !self.block.slashes.contains(&validator) { + self.block.slashes.insert(validator); self.network.slash(validator).await; } } @@ -312,61 +302,42 @@ impl<N: Network + 'static> TendermintMachine<N> { validators, weights: weights.clone(), - validator_id, - - number: BlockNumber(last.0 .0 + 1), - personal_proposal: proposal, - queue: VecDeque::new(), msg_recv, step_recv, - log: MessageLog::new(weights), - slashes: HashSet::new(), - end_time: HashMap::new(), - round: Round(0), - // The end time of the last block is the start time for this one - // The Commit explicitly contains the end time, so loading the last commit will provide - // this. The only exception is for the genesis block, which doesn't have a commit - // Using the genesis time in place will cause this block to be created immediately - // after it, without the standard amount of separation (so their times will be - // equivalent or minimally offset) - // For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time()) - start_time: CanonicalInstant::new(last.1), - step: Step::Propose, + block: BlockData { + number: BlockNumber(last.0 .0 + 1), + validator_id, + proposal, - locked: None, - valid: None, + log: MessageLog::new(weights), + slashes: HashSet::new(), + end_time: HashMap::new(), - timeouts: HashMap::new(), + // This will be populated in the following round() call + round: RoundData::<N>::new(Round(0), CanonicalInstant::new(0)), + + locked: None, + valid: None, + }, }; - machine.round(Round(0)); + + // The end time of the last block is the start time for this one + // The Commit explicitly contains the end time, so loading the last commit will provide + // this. The only exception is for the genesis block, which doesn't have a commit + // Using the genesis time in place will cause this block to be created immediately + // after it, without the standard amount of separation (so their times will be + // equivalent or minimally offset) + // For callers wishing to avoid this, they should pass (0, GENESIS + N::block_time()) + machine.round(Round(0), Some(CanonicalInstant::new(last.1))); machine }, } } pub async fn run(mut self) { - self.round(Round(0)); - loop { - // Create futures for the various timeouts - let timeout_future = |step| { - let timeout = self.timeouts.get(&step).copied(); - (async move { - if let Some(timeout) = timeout { - sleep(timeout.saturating_duration_since(Instant::now())).await; - } else { - future::pending::<()>().await; - } - }) - .fuse() - }; - let propose_timeout = timeout_future(Step::Propose); - let prevote_timeout = timeout_future(Step::Prevote); - let precommit_timeout = timeout_future(Step::Precommit); - futures::pin_mut!(propose_timeout, prevote_timeout, precommit_timeout); - // Also create a future for if the queue has a message // Does not pop_front as if another message has higher priority, its future will be handled // instead in this loop, and the popped value would be dropped with the next iteration @@ -391,31 +362,28 @@ impl<N: Network + 'static> TendermintMachine<N> { }, // Handle any timeouts - _ = &mut propose_timeout => { + step = self.block.round.timeout_future().fuse() => { // Remove the timeout so it doesn't persist, always being the selected future due to bias - // While this does enable the below get_entry calls to enter timeouts again, they'll - // never attempt to add a timeout after this timeout has expired - self.timeouts.remove(&Step::Propose); - if self.step == Step::Propose { - // Slash the validator for not proposing when they should've - self.slash(self.weights.proposer(self.number, self.round)).await; - self.broadcast(Data::Prevote(None)); + // While this does enable the timeout to be entered again, the timeout setting code will + // never attempt to add a timeout after its timeout has expired + self.block.round.timeouts.remove(&step); + // Only run if it's still the step in question + if self.block.round.step == step { + match step { + Step::Propose => { + // Slash the validator for not proposing when they should've + self.slash(self.weights.proposer(self.block.number, self.block.round.round)).await; + self.broadcast(Data::Prevote(None)); + }, + Step::Prevote => self.broadcast(Data::Precommit(None)), + Step::Precommit => { + self.round(Round(self.block.round.round.0 + 1), None); + continue; + } + } } None }, - _ = &mut prevote_timeout => { - self.timeouts.remove(&Step::Prevote); - if self.step == Step::Prevote { - self.broadcast(Data::Precommit(None)); - } - None - }, - _ = &mut precommit_timeout => { - // Technically unnecessary since round() will clear the timeouts - self.timeouts.remove(&Step::Precommit); - self.round(Round(self.round.0.wrapping_add(1))); - continue; - }, // Handle any received messages msg = self.msg_recv.next() => { @@ -440,6 +408,7 @@ impl<N: Network + 'static> TendermintMachine<N> { let mut validators = vec![]; let mut sigs = vec![]; for (v, sig) in self + .block .log .precommitted .iter() @@ -450,7 +419,7 @@ impl<N: Network + 'static> TendermintMachine<N> { } let commit = Commit { - end_time: self.end_time[&msg.round].canonical(), + end_time: self.block.end_time[&msg.round].canonical(), validators, signature: N::SignatureScheme::aggregate(&sigs), }; @@ -484,7 +453,7 @@ impl<N: Network + 'static> TendermintMachine<N> { // Only perform this verification if we already have the end_time // Else, there's a DoS where we receive a precommit for some round infinitely in the future // which forces to calculate every end time - if let Some(end_time) = self.end_time.get(&round) { + if let Some(end_time) = self.block.end_time.get(&round) { if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) { Err(TendermintError::Malicious(sender))?; } @@ -497,7 +466,7 @@ impl<N: Network + 'static> TendermintMachine<N> { &mut self, msg: Message<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, ) -> Result<Option<N::Block>, TendermintError<N::ValidatorId>> { - if msg.number != self.number { + if msg.number != self.block.number { Err(TendermintError::Temporal)?; } @@ -511,7 +480,7 @@ impl<N: Network + 'static> TendermintMachine<N> { Err(TendermintError::Malicious(msg.sender))?; }; - if !self.log.log(msg.clone())? { + if !self.block.log.log(msg.clone())? { return Ok(None); } @@ -520,13 +489,14 @@ impl<N: Network + 'static> TendermintMachine<N> { // Run the finalizer to see if it applies // 49-52 if matches!(msg.data, Data::Proposal(..)) || matches!(msg.data, Data::Precommit(_)) { - let proposer = self.weights.proposer(self.number, msg.round); + let proposer = self.weights.proposer(self.block.number, msg.round); // Get the proposal - if let Some(Data::Proposal(_, block)) = self.log.get(msg.round, proposer, Step::Propose) { + if let Some(Data::Proposal(_, block)) = self.block.log.get(msg.round, proposer, Step::Propose) + { // Check if it has gotten a sufficient amount of precommits // Use a junk signature since message equality disregards the signature - if self.log.has_consensus( + if self.block.log.has_consensus( msg.round, Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))), ) { @@ -537,15 +507,15 @@ impl<N: Network + 'static> TendermintMachine<N> { // Else, check if we need to jump ahead #[allow(clippy::comparison_chain)] - if msg.round.0 < self.round.0 { + if msg.round.0 < self.block.round.round.0 { // Prior round, disregard if not finalizing return Ok(None); - } else if msg.round.0 > self.round.0 { + } else if msg.round.0 > self.block.round.round.0 { // 55-56 // Jump, enabling processing by the below code - if self.log.round_participation(msg.round) > self.weights.fault_thresold() { + if self.block.log.round_participation(msg.round) > self.weights.fault_thresold() { // If this round already has precommit messages, verify their signatures - let round_msgs = self.log.log[&msg.round].clone(); + let round_msgs = self.block.log.log[&msg.round].clone(); for (validator, msgs) in &round_msgs { if let Some(data) = msgs.get(&Step::Precommit) { if self.verify_precommit_signature(*validator, msg.round, data).is_err() { @@ -555,7 +525,7 @@ impl<N: Network + 'static> TendermintMachine<N> { } // If we're the proposer, return now so we re-run processing with our proposal // If we continue now, it'd just be wasted ops - if self.round(msg.round) { + if self.round(msg.round, None) { return Ok(None); } } else { @@ -567,12 +537,12 @@ impl<N: Network + 'static> TendermintMachine<N> { // The paper executes these checks when the step is prevote. Making sure this message warrants // rerunning these checks is a sane optimization since message instances is a full iteration // of the round map - if (self.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { - let (participation, weight) = self.log.message_instances(self.round, Data::Prevote(None)); + if (self.block.round.step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { + let (participation, weight) = + self.block.log.message_instances(self.block.round.round, Data::Prevote(None)); // 34-35 if participation >= self.weights.threshold() { - let timeout = self.timeout(Step::Prevote); - self.timeouts.entry(Step::Prevote).or_insert_with(|| timeout.instant()); + self.block.round.set_timeout(Step::Prevote); } // 44-46 @@ -584,16 +554,17 @@ impl<N: Network + 'static> TendermintMachine<N> { // 47-48 if matches!(msg.data, Data::Precommit(_)) && - self.log.has_participation(self.round, Step::Precommit) + self.block.log.has_participation(self.block.round.round, Step::Precommit) { - let timeout = self.timeout(Step::Precommit); - self.timeouts.entry(Step::Precommit).or_insert_with(|| timeout.instant()); + self.block.round.set_timeout(Step::Precommit); } - let proposer = self.weights.proposer(self.number, self.round); - if let Some(Data::Proposal(vr, block)) = self.log.get(self.round, proposer, Step::Propose) { + let proposer = self.weights.proposer(self.block.number, self.block.round.round); + if let Some(Data::Proposal(vr, block)) = + self.block.log.get(self.block.round.round, proposer, Step::Propose) + { // 22-33 - if self.step == Step::Propose { + if self.block.round.step == Step::Propose { // Delay error handling (triggering a slash) until after we vote. let (valid, err) = match self.network.validate(block).await { Ok(_) => (true, Ok(None)), @@ -607,19 +578,19 @@ impl<N: Network + 'static> TendermintMachine<N> { // 23 and 29. If it's some, both are satisfied if they're for the same ID. If it's some // with different IDs, the function on 22 rejects yet the function on 28 has one other // condition - let locked = self.locked.as_ref().map(|(_, id)| id == &block.id()).unwrap_or(true); + let locked = self.block.locked.as_ref().map(|(_, id)| id == &block.id()).unwrap_or(true); let mut vote = raw_vote.filter(|_| locked); if let Some(vr) = vr { // Malformed message - if vr.0 >= self.round.0 { + if vr.0 >= self.block.round.round.0 { Err(TendermintError::Malicious(msg.sender))?; } - if self.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) { + if self.block.log.has_consensus(*vr, Data::Prevote(Some(block.id()))) { // Allow differing locked values if the proposal has a newer valid round // This is the other condition described above - if let Some((locked_round, _)) = self.locked.as_ref() { + if let Some((locked_round, _)) = self.block.locked.as_ref() { vote = vote.or_else(|| raw_vote.filter(|_| locked_round.0 <= vr.0)); } @@ -630,27 +601,36 @@ impl<N: Network + 'static> TendermintMachine<N> { self.broadcast(Data::Prevote(vote)); return err; } - } else if self.valid.as_ref().map(|(round, _)| round != &self.round).unwrap_or(true) { + } else if self + .block + .valid + .as_ref() + .map(|(round, _)| round != &self.block.round.round) + .unwrap_or(true) + { // 36-43 // The run once condition is implemented above. Sinve valid will always be set, it not // being set, or only being set historically, means this has yet to be run - if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) { + if self.block.log.has_consensus(self.block.round.round, Data::Prevote(Some(block.id()))) { match self.network.validate(block).await { Ok(_) => (), Err(BlockError::Temporal) => (), Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?, }; - self.valid = Some((self.round, block.clone())); - if self.step == Step::Prevote { - self.locked = Some((self.round, block.id())); + self.block.valid = Some((self.block.round.round, block.clone())); + if self.block.round.step == Step::Prevote { + self.block.locked = Some((self.block.round.round, block.id())); self.broadcast(Data::Precommit(Some(( block.id(), self .signer - .sign(&commit_msg(self.end_time[&self.round].canonical(), block.id().as_ref())) + .sign(&commit_msg( + self.block.end_time[&self.block.round.round].canonical(), + block.id().as_ref(), + )) .await, )))); return Ok(None); diff --git a/substrate/tendermint/machine/src/round.rs b/substrate/tendermint/machine/src/round.rs new file mode 100644 index 00000000..2763e5f7 --- /dev/null +++ b/substrate/tendermint/machine/src/round.rs @@ -0,0 +1,82 @@ +use std::{ + marker::PhantomData, + time::{Duration, Instant}, + collections::HashMap, +}; + +use futures::{FutureExt, future}; +use tokio::time::sleep; + +use crate::{ + time::CanonicalInstant, + Step, + ext::{Round, Network}, +}; + +pub(crate) struct RoundData<N: Network> { + _network: PhantomData<N>, + pub(crate) round: Round, + pub(crate) start_time: CanonicalInstant, + pub(crate) step: Step, + pub(crate) timeouts: HashMap<Step, Instant>, +} + +impl<N: Network> RoundData<N> { + pub(crate) fn new(round: Round, start_time: CanonicalInstant) -> Self { + RoundData { + _network: PhantomData, + round, + start_time, + step: Step::Propose, + timeouts: HashMap::new(), + } + } + + fn timeout(&self, step: Step) -> CanonicalInstant { + let adjusted_block = N::BLOCK_PROCESSING_TIME * (self.round.0 + 1); + let adjusted_latency = N::LATENCY_TIME * (self.round.0 + 1); + let offset = Duration::from_secs( + (match step { + Step::Propose => adjusted_block + adjusted_latency, + Step::Prevote => adjusted_block + (2 * adjusted_latency), + Step::Precommit => adjusted_block + (3 * adjusted_latency), + }) + .into(), + ); + self.start_time + offset + } + + pub(crate) fn end_time(&self) -> CanonicalInstant { + self.timeout(Step::Precommit) + } + + pub(crate) fn set_timeout(&mut self, step: Step) { + let timeout = self.timeout(step).instant(); + self.timeouts.entry(step).or_insert(timeout); + } + + pub(crate) async fn timeout_future(&self) -> Step { + let timeout_future = |step| { + let timeout = self.timeouts.get(&step).copied(); + (async move { + if let Some(timeout) = timeout { + sleep(timeout.saturating_duration_since(Instant::now())).await; + } else { + future::pending::<()>().await; + } + step + }) + .fuse() + }; + let propose_timeout = timeout_future(Step::Propose); + let prevote_timeout = timeout_future(Step::Prevote); + let precommit_timeout = timeout_future(Step::Precommit); + futures::pin_mut!(propose_timeout, prevote_timeout, precommit_timeout); + + futures::select_biased! { + step = propose_timeout => step, + step = prevote_timeout => step, + step = precommit_timeout => step, + } + } +}