diff --git a/substrate/tendermint/machine/src/ext.rs b/substrate/tendermint/machine/src/ext.rs index b9ffe5cc..270a3b12 100644 --- a/substrate/tendermint/machine/src/ext.rs +++ b/substrate/tendermint/machine/src/ext.rs @@ -121,7 +121,7 @@ impl<S: SignatureScheme> SignatureScheme for Arc<S> { /// a valid commit. #[derive(Clone, PartialEq, Debug, Encode, Decode)] pub struct Commit<S: SignatureScheme> { - /// End time of the round, used as the start time of next round. + /// End time of the round which created this commit, used as the start time of the next block. pub end_time: u64, /// Validators participating in the signature. pub validators: Vec<S::ValidatorId>, diff --git a/substrate/tendermint/machine/src/lib.rs b/substrate/tendermint/machine/src/lib.rs index 6e45f7e2..6b674ae5 100644 --- a/substrate/tendermint/machine/src/lib.rs +++ b/substrate/tendermint/machine/src/lib.rs @@ -119,6 +119,7 @@ pub struct TendermintMachine<N: Network> { msg_recv: mpsc::UnboundedReceiver< SignedMessage<N::ValidatorId, N::Block, <N::SignatureScheme as SignatureScheme>::Signature>, >, + step_recv: mpsc::UnboundedReceiver<(Commit<N::SignatureScheme>, N::Block)>, log: MessageLog<N>, slashes: HashSet<N::ValidatorId>, @@ -142,6 +143,9 @@ pub type MessageSender<N> = mpsc::UnboundedSender< /// A Tendermint machine and its channel to receive messages from the gossip layer over. pub struct TendermintHandle<N: Network> { + /// Channel to trigger the machine to move to the next height. + /// Takes in the the previous block's commit, along with the new proposal. + pub step: mpsc::UnboundedSender<(Commit<N::SignatureScheme>, N::Block)>, /// Channel to send messages received from the P2P layer. pub messages: MessageSender<N>, /// Tendermint machine to be run on an asynchronous task. @@ -251,6 +255,26 @@ impl<N: Network + 'static> TendermintMachine<N> { self.round(Round(0)); } + async fn reset_by_commit(&mut self, commit: Commit<N::SignatureScheme>, proposal: N::Block) { + // Determine the Round number this commit ended on + let mut round = Round(0); + // Use < to prevent an infinite loop + while self.canonical_end_time(round) < commit.end_time { + round.0 += 1; + } + debug_assert_eq!( + self.canonical_end_time(round), + commit.end_time, + "resetting by commit for a different block" + ); + + // Populate the various pieces of round info + if self.round.0 < round.0 { + self.round(round); + } + self.reset(round, proposal).await; + } + async fn slash(&mut self, validator: N::ValidatorId) { if !self.slashes.contains(&validator) { self.slashes.insert(validator); @@ -268,7 +292,9 @@ impl<N: Network + 'static> TendermintMachine<N> { proposal: N::Block, ) -> TendermintHandle<N> { let (msg_send, msg_recv) = mpsc::unbounded(); + let (step_send, step_recv) = mpsc::unbounded(); TendermintHandle { + step: step_send, messages: msg_send, machine: { let last_end = UNIX_EPOCH + Duration::from_secs(last.1); @@ -317,6 +343,7 @@ impl<N: Network + 'static> TendermintMachine<N> { queue: VecDeque::new(), msg_recv, + step_recv, log: MessageLog::new(weights), slashes: HashSet::new(), @@ -364,6 +391,16 @@ impl<N: Network + 'static> TendermintMachine<N> { if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() }; if let Some((broadcast, msg)) = futures::select_biased! { + // Handle a new height occuring externally (an external sync loop) + msg = self.step_recv.next() => { + if let Some((commit, proposal)) = msg { + self.reset_by_commit(commit, proposal).await; + None + } else { + break; + } + }, + // Handle our messages _ = queue_future => { Some((true, self.queue.pop_front().unwrap())) diff --git a/substrate/tendermint/machine/tests/ext.rs b/substrate/tendermint/machine/tests/ext.rs index 3e3e52ce..62552ff7 100644 --- a/substrate/tendermint/machine/tests/ext.rs +++ b/substrate/tendermint/machine/tests/ext.rs @@ -152,7 +152,7 @@ impl TestNetwork { let mut write = arc.write().await; for i in 0 .. validators { let i = u16::try_from(i).unwrap(); - let TendermintHandle { messages, machine } = TendermintMachine::new( + let TendermintHandle { messages, machine, .. } = TendermintMachine::new( TestNetwork(i, arc.clone()), (BlockNumber(1), (SystemTime::now().duration_since(UNIX_EPOCH)).unwrap().as_secs()), TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) },