diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index d0d1ad4f..b1aeddd5 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -8,11 +8,11 @@ use zeroize::Zeroizing; use ciphersuite::{Ciphersuite, Ristretto}; use scale::Decode; -use futures::SinkExt; +use futures::{StreamExt, SinkExt}; use ::tendermint::{ ext::{BlockNumber, Commit, Block as BlockTrait, Network}, - SignedMessageFor, SyncedBlock, SyncedBlockSender, MessageSender, TendermintMachine, - TendermintHandle, + SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender, + TendermintMachine, TendermintHandle, }; use serai_db::Db; @@ -53,8 +53,9 @@ pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50; // participant from flooding disks and causing out of space errors in order processes. pub const BLOCK_SIZE_LIMIT: usize = 350_000; -pub(crate) const TRANSACTION_MESSAGE: u8 = 0; -pub(crate) const TENDERMINT_MESSAGE: u8 = 1; +pub(crate) const TENDERMINT_MESSAGE: u8 = 0; +pub(crate) const BLOCK_MESSAGE: u8 = 1; +pub(crate) const TRANSACTION_MESSAGE: u8 = 2; /// An item which can be read and written. pub trait ReadWrite: Sized { @@ -89,6 +90,7 @@ pub struct Tributary { network: TendermintNetwork, synced_block: SyncedBlockSender>, + synced_block_result: Arc>, messages: Arc>>>, } @@ -119,11 +121,18 @@ impl Tributary { let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; - let TendermintHandle { synced_block, messages, machine } = + let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; tokio::task::spawn(machine.run()); - Some(Self { db, genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) }) + Some(Self { + db, + genesis, + network, + synced_block, + synced_block_result: Arc::new(RwLock::new(synced_block_result)), + messages: Arc::new(RwLock::new(messages)), + }) } pub fn block_time() -> u32 { @@ -149,6 +158,9 @@ impl Tributary { pub fn commit(&self, hash: &[u8; 32]) -> Option> { Blockchain::::commit_from_db(&self.db, hash) } + pub fn parsed_commit(&self, hash: &[u8; 32]) -> Option> { + self.commit(hash).map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap()) + } pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> { Blockchain::::block_after(&self.db, hash) } @@ -182,32 +194,35 @@ impl Tributary { // Sync a block. // TODO: Since we have a static validator set, we should only need the tail commit? pub async fn sync_block(&mut self, block: Block, commit: Vec) -> bool { + let mut result = self.synced_block_result.write().await; + let (tip, block_number) = { let blockchain = self.network.blockchain.read().await; (blockchain.tip(), blockchain.block_number()) }; if block.header.parent != tip { + log::debug!("told to sync a block whose parent wasn't our tip"); return false; } let block = TendermintBlock(block.serialize()); let Ok(commit) = Commit::>::decode(&mut commit.as_ref()) else { + log::error!("sent an invalidly serialized commit"); return false; }; if !self.network.verify_commit(block.id(), &commit) { + log::error!("sent an invalid commit"); return false; } let number = BlockNumber((block_number + 1).into()); self.synced_block.send(SyncedBlock { number, block, commit }).await.unwrap(); - true + result.next().await.unwrap() } // Return true if the message should be rebroadcasted. - // Safe to be &self since the only usage of self is on self.network.blockchain and self.messages, - // both which successfully acquire their own write locks and don't rely on each other - pub async fn handle_message(&self, msg: &[u8]) -> bool { + pub async fn handle_message(&mut self, msg: &[u8]) -> bool { match msg.first() { Some(&TRANSACTION_MESSAGE) => { let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else { @@ -234,6 +249,19 @@ impl Tributary { false } + Some(&BLOCK_MESSAGE) => { + let mut msg_ref = &msg[1 ..]; + let Ok(block) = Block::::read(&mut msg_ref) else { + log::error!("received invalid block message"); + return false; + }; + let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec(); + if self.sync_block(block, commit).await { + log::debug!("synced block over p2p net instead of building the commit ourselves"); + } + false + } + _ => false, } } diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 49c79178..8b894a56 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -37,7 +37,8 @@ use tokio::{ }; use crate::{ - TENDERMINT_MESSAGE, ReadWrite, Transaction, BlockHeader, Block, BlockError, Blockchain, P2p, + TENDERMINT_MESSAGE, BLOCK_MESSAGE, ReadWrite, Transaction, BlockHeader, Block, BlockError, + Blockchain, P2p, }; fn challenge( @@ -56,7 +57,7 @@ fn challenge( } #[derive(Clone, PartialEq, Eq, Debug)] -pub(crate) struct Signer { +pub struct Signer { genesis: [u8; 32], key: Zeroizing<::F>, } @@ -115,7 +116,7 @@ impl SignerTrait for Signer { } #[derive(Clone, PartialEq, Eq, Debug)] -pub(crate) struct Validators { +pub struct Validators { genesis: [u8; 32], total_weight: u64, weights: HashMap<[u8; 32], u64>, @@ -281,7 +282,7 @@ impl Network for TendermintNetwork { async fn add_block( &mut self, - block: Self::Block, + serialized_block: Self::Block, commit: Commit, ) -> Option { let invalid_block = || { @@ -294,16 +295,25 @@ impl Network for TendermintNetwork { }; // Tendermint should only produce valid commits - assert!(self.verify_commit(block.id(), &commit)); + assert!(self.verify_commit(serialized_block.id(), &commit)); - let Ok(block) = Block::read::<&[u8]>(&mut block.0.as_ref()) else { + let Ok(block) = Block::read::<&[u8]>(&mut serialized_block.0.as_ref()) else { return invalid_block(); }; + let encoded_commit = commit.encode(); loop { - let block_res = self.blockchain.write().await.add_block(&block, commit.encode()); + let block_res = self.blockchain.write().await.add_block(&block, encoded_commit.clone()); match block_res { - Ok(()) => break, + Ok(()) => { + // If we successfully added this block, broadcast it + // TODO: Move this under the coordinator once we set up on new block notifications? + let mut msg = serialized_block.0; + msg.insert(0, BLOCK_MESSAGE); + msg.extend(encoded_commit); + self.p2p.broadcast(self.genesis, msg).await; + break; + } Err(BlockError::NonLocalProvided(hash)) => { log::error!( "missing provided transaction {} which other validators on tributary {} had", diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary/tendermint/src/lib.rs index 7ed632a9..753c6526 100644 --- a/coordinator/tributary/tendermint/src/lib.rs +++ b/coordinator/tributary/tendermint/src/lib.rs @@ -9,7 +9,7 @@ use std::{ use parity_scale_codec::{Encode, Decode}; use futures::{ - FutureExt, StreamExt, + FutureExt, StreamExt, SinkExt, future::{self, Fuse}, channel::mpsc, }; @@ -135,6 +135,7 @@ pub struct TendermintMachine { queue: VecDeque>, msg_recv: mpsc::UnboundedReceiver>, synced_block_recv: mpsc::UnboundedReceiver>, + synced_block_result_send: mpsc::UnboundedSender, block: BlockData, } @@ -146,6 +147,7 @@ pub struct SyncedBlock { } pub type SyncedBlockSender = mpsc::UnboundedSender>; +pub type SyncedBlockResultReceiver = mpsc::UnboundedReceiver; pub type MessageSender = mpsc::UnboundedSender>; @@ -154,6 +156,8 @@ pub struct TendermintHandle { /// Channel to trigger the machine to move to the next block. /// Takes in the the previous block's commit, along with the new proposal. pub synced_block: SyncedBlockSender, + /// A channel to communicate the result of a synced_block message. + pub synced_block_result: SyncedBlockResultReceiver, /// Channel to send messages received from the P2P layer. pub messages: MessageSender, /// Tendermint machine to be run on an asynchronous task. @@ -253,8 +257,10 @@ impl TendermintMachine { ) -> TendermintHandle { let (msg_send, msg_recv) = mpsc::unbounded(); let (synced_block_send, synced_block_recv) = mpsc::unbounded(); + let (synced_block_result_send, synced_block_result_recv) = mpsc::unbounded(); TendermintHandle { synced_block: synced_block_send, + synced_block_result: synced_block_result_recv, messages: msg_send, machine: { let sys_time = sys_time(last_time); @@ -275,6 +281,7 @@ impl TendermintMachine { queue: VecDeque::new(), msg_recv, synced_block_recv, + synced_block_result_send, block: BlockData::new( weights, @@ -313,16 +320,19 @@ impl TendermintMachine { if let Some(SyncedBlock { number, block, commit }) = msg { // Commit is for a block we've already moved past if number != self.block.number { + self.synced_block_result_send.send(false).await.unwrap(); continue; } // Commit is invalid if !self.network.verify_commit(block.id(), &commit) { + self.synced_block_result_send.send(false).await.unwrap(); continue; } let proposal = self.network.add_block(block, commit.clone()).await; self.reset_by_commit(commit, proposal).await; + self.synced_block_result_send.send(true).await.unwrap(); None } else { break; diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index 9c1dd9ee..c6248277 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -11,7 +11,8 @@ use futures::SinkExt; use tokio::{sync::RwLock, time::sleep}; use tendermint_machine::{ - ext::*, SignedMessageFor, SyncedBlockSender, MessageSender, TendermintMachine, TendermintHandle, + ext::*, SignedMessageFor, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender, + TendermintMachine, TendermintHandle, }; type TestValidatorId = u16; @@ -97,7 +98,10 @@ impl Block for TestBlock { } #[allow(clippy::type_complexity)] -struct TestNetwork(u16, Arc, SyncedBlockSender)>>>); +struct TestNetwork( + u16, + Arc, SyncedBlockSender, SyncedBlockResultReceiver)>>>, +); #[async_trait] impl Network for TestNetwork { @@ -122,7 +126,7 @@ impl Network for TestNetwork { } async fn broadcast(&mut self, msg: SignedMessageFor) { - for (messages, _) in self.1.write().await.iter_mut() { + for (messages, _, _) in self.1.write().await.iter_mut() { messages.send(msg.clone()).await.unwrap(); } } @@ -151,21 +155,23 @@ impl Network for TestNetwork { impl TestNetwork { async fn new( validators: usize, - ) -> Arc, SyncedBlockSender)>>> { + ) -> Arc, SyncedBlockSender, SyncedBlockResultReceiver)>>> + { let arc = Arc::new(RwLock::new(vec![])); { let mut write = arc.write().await; for i in 0 .. validators { let i = u16::try_from(i).unwrap(); - let TendermintHandle { messages, synced_block, machine } = TendermintMachine::new( - TestNetwork(i, arc.clone()), - BlockNumber(1), - SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), - TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) }, - ) - .await; + let TendermintHandle { messages, synced_block, synced_block_result, machine } = + TendermintMachine::new( + TestNetwork(i, arc.clone()), + BlockNumber(1), + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(), + TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) }, + ) + .await; tokio::task::spawn(machine.run()); - write.push((messages, synced_block)); + write.push((messages, synced_block, synced_block_result)); } } arc