diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 17f89c96..183c010f 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -281,7 +281,7 @@ pub async fn handle_p2p( }; log::trace!("handling message for tributary {:?}", tributary.spec.set()); - if tributary.tributary.write().await.handle_message(&msg.msg).await { + if tributary.tributary.read().await.handle_message(&msg.msg).await { P2p::broadcast(&p2p, msg.kind, msg.msg).await; } } @@ -385,11 +385,7 @@ pub async fn handle_p2p( return; }; - // TODO: Add a check which doesn't require write to see if this is the next block in - // line - // If it's in the future, hold it for up to T time - - let res = tributary.tributary.write().await.sync_block(block, msg.msg).await; + let res = tributary.tributary.read().await.sync_block(block, msg.msg).await; log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); } }); @@ -518,8 +514,9 @@ pub async fn handle_processors( data: share, signed: Transaction::empty_signed(), })), - // TODO - sign::ProcessorMessage::Completed { .. } => todo!(), + sign::ProcessorMessage::Completed { key: _, id, tx } => { + Some(Transaction::SignCompleted(id, tx, Transaction::empty_signed())) + } }, ProcessorMessage::Coordinator(inner_msg) => match inner_msg { coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { @@ -624,7 +621,9 @@ pub async fn handle_processors( // If this created a transaction, publish it if let Some(mut tx) = tx { + log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); let tributaries = tributaries.read().await; + log::trace!("read global tributaries"); let Some(tributary) = tributaries.get(&genesis) else { // TODO: This can happen since Substrate tells the Processor to generate commitments // at the same time it tells the Tributary to be created @@ -632,9 +631,11 @@ pub async fn handle_processors( panic!("processor is operating on tributary we don't have"); }; let tributary = tributary.tributary.read().await; + log::trace!("read specific tributary"); match tx.kind() { TransactionKind::Provided(_) => { + log::trace!("providing transaction {}", hex::encode(tx.hash())); let res = tributary.provide_transaction(tx).await; if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { panic!("provided an invalid transaction: {res:?}"); diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index 37fd4108..9f9cda79 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -589,8 +589,20 @@ pub async fn handle_application_tx< None => {} } } - Transaction::SignCompleted(_, _, _) => { - // TODO + Transaction::SignCompleted(id, tx, signed) => { + // TODO: Confirm this is a valid ID + // TODO: Confirm this signer hasn't prior published a completion + let Some(key_pair) = TributaryDb::::key_pair(txn, spec.set()) else { todo!() }; + processors + .send( + spec.set().network, + CoordinatorMessage::Sign(sign::CoordinatorMessage::Completed { + key: key_pair.1.to_vec(), + id, + tx, + }), + ) + .await; } } } diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index b9ab5c38..2c4f4d63 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -243,6 +243,9 @@ pub enum Transaction { SignPreprocess(SignData), SignShare(SignData), + // TODO: We can't make this an Unsigned as we need to prevent spam, which requires a max of 1 + // claim per sender + // Can we de-duplicate across senders though, if they claim the same hash completes? SignCompleted([u8; 32], Vec, Signed), } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index b7767181..e63a29d0 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -8,7 +8,7 @@ use zeroize::Zeroizing; use ciphersuite::{Ciphersuite, Ristretto}; use scale::Decode; -use futures::{StreamExt, SinkExt}; +use futures::{StreamExt, SinkExt, channel::mpsc::UnboundedReceiver}; use ::tendermint::{ ext::{BlockNumber, Commit, Block as BlockTrait, Network}, SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender, @@ -144,7 +144,7 @@ pub struct Tributary { genesis: [u8; 32], network: TendermintNetwork, - synced_block: SyncedBlockSender>, + synced_block: Arc>>>, synced_block_result: Arc>, messages: Arc>>>, } @@ -188,7 +188,7 @@ impl Tributary { db, genesis, network, - synced_block, + synced_block: Arc::new(RwLock::new(synced_block)), synced_block_result: Arc::new(RwLock::new(synced_block_result)), messages: Arc::new(RwLock::new(messages)), }) @@ -239,11 +239,12 @@ impl Tributary { res } - // Sync a block. - // TODO: Since we have a static validator set, we should only need the tail commit? - pub async fn sync_block(&mut self, block: Block, commit: Vec) -> bool { - let mut result = self.synced_block_result.write().await; - + async fn sync_block_internal( + &self, + block: Block, + commit: Vec, + result: &mut UnboundedReceiver, + ) -> bool { let (tip, block_number) = { let blockchain = self.network.blockchain.read().await; (blockchain.tip(), blockchain.block_number()) @@ -272,12 +273,22 @@ impl Tributary { } let number = BlockNumber((block_number + 1).into()); - self.synced_block.send(SyncedBlock { number, block, commit }).await.unwrap(); + self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap(); result.next().await.unwrap() } + // Sync a block. + // TODO: Since we have a static validator set, we should only need the tail commit? + pub async fn sync_block(&self, block: Block, commit: Vec) -> bool { + let mut result = self.synced_block_result.write().await; + self.sync_block_internal(block, commit, &mut result).await + } + // Return true if the message should be rebroadcasted. - pub async fn handle_message(&mut self, msg: &[u8]) -> bool { + pub async fn handle_message(&self, msg: &[u8]) -> bool { + // Acquire the lock now to prevent sync_block from being run at the same time + let mut sync_block = self.synced_block_result.write().await; + match msg.first() { Some(&TRANSACTION_MESSAGE) => { let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else { @@ -316,7 +327,7 @@ impl Tributary { return false; }; let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec(); - if self.sync_block(block, commit).await { + if self.sync_block_internal(block, commit, &mut sync_block).await { log::debug!("synced block over p2p net instead of building the commit ourselves"); } false diff --git a/tests/coordinator/src/tests/mod.rs b/tests/coordinator/src/tests/mod.rs index 72d30906..0e07d835 100644 --- a/tests/coordinator/src/tests/mod.rs +++ b/tests/coordinator/src/tests/mod.rs @@ -43,6 +43,6 @@ pub(crate) fn new_test() -> (Vec<(Handles, ::F)>, Dock pub(crate) async fn wait_for_tributary() { tokio::time::sleep(Duration::from_secs(20)).await; if std::env::var("GITHUB_CI") == Ok("true".to_string()) { - tokio::time::sleep(Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs(40)).await; } }