diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index 0670d5da..796015f0 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -117,9 +117,10 @@ impl SignatureScheme for Arc { } } -/// A commit for a specific block. The list of validators have weight exceeding the threshold for -/// a valid commit. -#[derive(Clone, PartialEq, Debug, Encode, Decode)] +/// A commit for a specific block. +/// +/// The list of validators have weight exceeding the threshold for a valid commit. +#[derive(PartialEq, Debug, Encode, Decode)] pub struct Commit { /// End time of the round which created this commit, used as the start time of the next block. pub end_time: u64, @@ -129,6 +130,16 @@ pub struct Commit { pub signature: S::AggregateSignature, } +impl Clone for Commit { + fn clone(&self) -> Self { + Self { + end_time: self.end_time, + validators: self.validators.clone(), + signature: self.signature.clone(), + } + } +} + /// Weights for the validators present. pub trait Weights: Send + Sync { type ValidatorId: ValidatorId; diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 4146dbb5..ee355b02 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -135,17 +135,18 @@ pub struct TendermintMachine { queue: VecDeque>, msg_recv: mpsc::UnboundedReceiver>, - #[allow(clippy::type_complexity)] - step_recv: mpsc::UnboundedReceiver<(BlockNumber, Commit, Option)>, + synced_block_recv: mpsc::UnboundedReceiver>, block: BlockData, } -pub type StepSender = mpsc::UnboundedSender<( - BlockNumber, - Commit<::SignatureScheme>, - Option<::Block>, -)>; +pub struct SyncedBlock { + pub number: BlockNumber, + pub block: ::Block, + pub commit: Commit<::SignatureScheme>, +} + +pub type SyncedBlockSender = mpsc::UnboundedSender>; pub type MessageSender = mpsc::UnboundedSender>; @@ -153,7 +154,7 @@ pub type MessageSender = mpsc::UnboundedSender>; pub struct TendermintHandle { /// Channel to trigger the machine to move to the next block. /// Takes in the the previous block's commit, along with the new proposal. - pub step: StepSender, + pub synced_block: SyncedBlockSender, /// Channel to send messages received from the P2P layer. pub messages: MessageSender, /// Tendermint machine to be run on an asynchronous task. @@ -252,9 +253,9 @@ impl TendermintMachine { proposal: N::Block, ) -> TendermintHandle { let (msg_send, msg_recv) = mpsc::unbounded(); - let (step_send, step_recv) = mpsc::unbounded(); + let (synced_block_send, synced_block_recv) = mpsc::unbounded(); TendermintHandle { - step: step_send, + synced_block: synced_block_send, messages: msg_send, machine: { let sys_time = sys_time(last_time); @@ -274,7 +275,7 @@ impl TendermintMachine { queue: VecDeque::new(), msg_recv, - step_recv, + synced_block_recv, block: BlockData::new( weights, @@ -309,12 +310,19 @@ impl TendermintMachine { if let Some((broadcast, msg)) = futures::select_biased! { // Handle a new block occuring externally (an external sync loop) // Has the highest priority as it makes all other futures here irrelevant - msg = self.step_recv.next() => { - if let Some((block_number, commit, proposal)) = msg { + msg = self.synced_block_recv.next() => { + if let Some(SyncedBlock { number, block, commit }) = msg { // Commit is for a block we've already moved past - if block_number != self.block.number { + if number != self.block.number { continue; } + + // Commit is invalid + if !self.network.verify_commit(block.id(), &commit) { + continue; + } + + let proposal = self.network.add_block(block, commit.clone()).await; self.reset_by_commit(commit, proposal).await; None } else { diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index 818f2acd..9c1dd9ee 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -11,7 +11,7 @@ use futures::SinkExt; use tokio::{sync::RwLock, time::sleep}; use tendermint_machine::{ - ext::*, SignedMessageFor, StepSender, MessageSender, TendermintMachine, TendermintHandle, + ext::*, SignedMessageFor, SyncedBlockSender, MessageSender, TendermintMachine, TendermintHandle, }; type TestValidatorId = u16; @@ -97,7 +97,7 @@ impl Block for TestBlock { } #[allow(clippy::type_complexity)] -struct TestNetwork(u16, Arc, StepSender)>>>); +struct TestNetwork(u16, Arc, SyncedBlockSender)>>>); #[async_trait] impl Network for TestNetwork { @@ -149,13 +149,15 @@ impl Network for TestNetwork { } impl TestNetwork { - async fn new(validators: usize) -> Arc, StepSender)>>> { + async fn new( + validators: usize, + ) -> Arc, SyncedBlockSender)>>> { let arc = Arc::new(RwLock::new(vec![])); { let mut write = arc.write().await; for i in 0 .. validators { let i = u16::try_from(i).unwrap(); - let TendermintHandle { messages, machine, step } = TendermintMachine::new( + let TendermintHandle { messages, synced_block, machine } = TendermintMachine::new( TestNetwork(i, arc.clone()), BlockNumber(1), SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), @@ -163,7 +165,7 @@ impl TestNetwork { ) .await; tokio::task::spawn(machine.run()); - write.push((messages, step)); + write.push((messages, synced_block)); } } arc