From bd5491dfd514bc4fe0ff56abf1b67ac413c9bac8 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 29 Sep 2023 04:19:59 -0400 Subject: [PATCH] Simplify Coordinator/Processors::send by accepting impl Into *Message --- coordinator/src/main.rs | 28 ++++++++--------- coordinator/src/processors.rs | 5 +-- coordinator/src/substrate/mod.rs | 48 +++++++++++++---------------- coordinator/src/tests/mod.rs | 4 +-- coordinator/src/tributary/handle.rs | 32 +++++++++---------- processor/src/coordinator.rs | 5 +-- processor/src/main.rs | 36 ++++++++-------------- 7 files changed, 69 insertions(+), 89 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 17a021f2..013781f1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -88,19 +88,17 @@ async fn add_tributary( processors .send( set.network, - processor_messages::CoordinatorMessage::KeyGen( - processor_messages::key_gen::CoordinatorMessage::GenerateKey { - id: processor_messages::key_gen::KeyGenId { set, attempt: 0 }, - params: frost::ThresholdParams::new( - spec.t(), - spec.n(), - spec - .i(Ristretto::generator() * key.deref()) - .expect("adding a tributary for a set we aren't in set for"), - ) - .unwrap(), - }, - ), + processor_messages::key_gen::CoordinatorMessage::GenerateKey { + id: processor_messages::key_gen::KeyGenId { set, attempt: 0 }, + params: frost::ThresholdParams::new( + spec.t(), + spec.n(), + spec + .i(Ristretto::generator() * key.deref()) + .expect("adding a tributary for a set we aren't in set for"), + ) + .unwrap(), + }, ) .await; @@ -625,10 +623,8 @@ async fn handle_processor_messages( // Re-define batch // We can't drop it, yet it shouldn't be accidentally used in the following block - #[allow(clippy::let_unit_value)] + #[allow(clippy::let_unit_value, unused_variables)] let batch = (); - #[allow(clippy::let_unit_value)] - let _ = batch; // Verify all `Batch`s which we've already indexed from Substrate // This won't be complete, as it only runs when a `Batch` 
message is received, which diff --git a/coordinator/src/processors.rs b/coordinator/src/processors.rs index da31d4b6..c147fad1 100644 --- a/coordinator/src/processors.rs +++ b/coordinator/src/processors.rs @@ -14,14 +14,15 @@ pub struct Message { #[async_trait::async_trait] pub trait Processors: 'static + Send + Sync + Clone { - async fn send(&self, network: NetworkId, msg: CoordinatorMessage); + async fn send(&self, network: NetworkId, msg: impl Send + Into); async fn recv(&mut self, network: NetworkId) -> Message; async fn ack(&mut self, msg: Message); } #[async_trait::async_trait] impl Processors for Arc { - async fn send(&self, network: NetworkId, msg: CoordinatorMessage) { + async fn send(&self, network: NetworkId, msg: impl Send + Into) { + let msg: CoordinatorMessage = msg.into(); let metadata = Metadata { from: self.service, to: Service::Processor(network), intent: msg.intent() }; let msg = serde_json::to_string(&msg).unwrap(); diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 38662143..0419bb08 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -18,7 +18,7 @@ use serai_client::{ use serai_db::DbTxn; -use processor_messages::{SubstrateContext, CoordinatorMessage}; +use processor_messages::SubstrateContext; use tokio::time::sleep; @@ -102,21 +102,19 @@ async fn handle_key_gen( processors .send( set.network, - CoordinatorMessage::Substrate( - processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair { - context: SubstrateContext { - serai_time: block.time().unwrap() / 1000, - network_latest_finalized_block: serai - .get_latest_block_for_network(block.hash(), set.network) - .await? 
- // The processor treats this as a magic value which will cause it to find a network - // block which has a time greater than or equal to the Serai time - .unwrap_or(BlockHash([0; 32])), - }, - set, - key_pair, + processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair { + context: SubstrateContext { + serai_time: block.time().unwrap() / 1000, + network_latest_finalized_block: serai + .get_latest_block_for_network(block.hash(), set.network) + .await? + // The processor treats this as a magic value which will cause it to find a network + // block which has a time greater than or equal to the Serai time + .unwrap_or(BlockHash([0; 32])), }, - ), + set, + key_pair, + }, ) .await; @@ -197,18 +195,16 @@ async fn handle_batch_and_burns( processors .send( network, - CoordinatorMessage::Substrate( - processor_messages::substrate::CoordinatorMessage::SubstrateBlock { - context: SubstrateContext { - serai_time: block.time().unwrap() / 1000, - network_latest_finalized_block, - }, - network, - block: block.number(), - burns: burns.remove(&network).unwrap(), - batches: batches.remove(&network).unwrap(), + processor_messages::substrate::CoordinatorMessage::SubstrateBlock { + context: SubstrateContext { + serai_time: block.time().unwrap() / 1000, + network_latest_finalized_block, }, - ), + network, + block: block.number(), + burns: burns.remove(&network).unwrap(), + batches: batches.remove(&network).unwrap(), + }, ) .await; } diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 4e766ff3..6aaa907a 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -30,10 +30,10 @@ impl MemProcessors { #[async_trait::async_trait] impl Processors for MemProcessors { - async fn send(&self, network: NetworkId, msg: CoordinatorMessage) { + async fn send(&self, network: NetworkId, msg: impl Send + Into) { let mut processors = self.0.write().await; let processor = processors.entry(network).or_insert_with(VecDeque::new); - 
processor.push_back(msg); + processor.push_back(msg.into()); } async fn recv(&mut self, _: NetworkId) -> Message { todo!() diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index b548df30..1ec83a97 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -25,8 +25,8 @@ use serai_client::{ use tributary::Signed; use processor_messages::{ - CoordinatorMessage, coordinator, key_gen::{self, KeyGenId}, + coordinator, sign::{self, SignId}, }; @@ -309,10 +309,10 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments { + key_gen::CoordinatorMessage::Commitments { id: KeyGenId { set: spec.set(), attempt }, commitments, - }), + }, ) .await; } @@ -365,10 +365,10 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares { + key_gen::CoordinatorMessage::Shares { id: KeyGenId { set: spec.set(), attempt }, shares, - }), + }, ) .await; } @@ -458,10 +458,10 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchPreprocesses { + coordinator::CoordinatorMessage::BatchPreprocesses { id: SignId { key, id: data.plan, attempt: data.attempt }, preprocesses, - }), + }, ) .await; } @@ -485,13 +485,13 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares { + coordinator::CoordinatorMessage::BatchShares { id: SignId { key, id: data.plan, attempt: data.attempt }, shares: shares .into_iter() .map(|(validator, share)| (validator, share.try_into().unwrap())) .collect(), - }), + }, ) .await; } @@ -517,7 +517,7 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - 
CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses { + sign::CoordinatorMessage::Preprocesses { id: SignId { key: key_pair .expect("completed SignPreprocess despite not setting the key pair") @@ -527,7 +527,7 @@ pub(crate) async fn handle_application_tx< attempt: data.attempt, }, preprocesses, - }), + }, ) .await; } @@ -551,7 +551,7 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares { + sign::CoordinatorMessage::Shares { id: SignId { key: key_pair .expect("completed SignShares despite not setting the key pair") @@ -561,7 +561,7 @@ pub(crate) async fn handle_application_tx< attempt: data.attempt, }, shares, - }), + }, ) .await; } @@ -581,11 +581,7 @@ pub(crate) async fn handle_application_tx< processors .send( spec.set().network, - CoordinatorMessage::Sign(sign::CoordinatorMessage::Completed { - key: key_pair.1.to_vec(), - id: plan, - tx: tx_hash, - }), + sign::CoordinatorMessage::Completed { key: key_pair.1.to_vec(), id: plan, tx: tx_hash }, ) .await; } diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs index 294c273c..7f4e39fb 100644 --- a/processor/src/coordinator.rs +++ b/processor/src/coordinator.rs @@ -10,14 +10,15 @@ pub struct Message { #[async_trait::async_trait] pub trait Coordinator { - async fn send(&mut self, msg: ProcessorMessage); + async fn send(&mut self, msg: impl Send + Into); async fn recv(&mut self) -> Message; async fn ack(&mut self, msg: Message); } #[async_trait::async_trait] impl Coordinator for MessageQueue { - async fn send(&mut self, msg: ProcessorMessage) { + async fn send(&mut self, msg: impl Send + Into) { + let msg: ProcessorMessage = msg.into(); let metadata = Metadata { from: self.service, to: Service::Coordinator, intent: msg.intent() }; let msg = serde_json::to_string(&msg).unwrap(); diff --git a/processor/src/main.rs b/processor/src/main.rs index 2b6d0d44..471d9328 100644 --- a/processor/src/main.rs +++ 
b/processor/src/main.rs @@ -13,7 +13,7 @@ use serai_client::{ validator_sets::primitives::{ValidatorSet, KeyPair}, }; -use messages::{CoordinatorMessage, ProcessorMessage}; +use messages::CoordinatorMessage; use serai_env as env; @@ -202,9 +202,7 @@ async fn handle_coordinator_msg( match msg.msg.clone() { CoordinatorMessage::KeyGen(msg) => { - coordinator - .send(ProcessorMessage::KeyGen(tributary_mutable.key_gen.handle(txn, msg).await)) - .await; + coordinator.send(tributary_mutable.key_gen.handle(txn, msg).await).await; } CoordinatorMessage::Sign(msg) => { @@ -343,13 +341,11 @@ async fn handle_coordinator_msg( // plans if !tributary_mutable.signers.is_empty() { coordinator - .send(messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::SubstrateBlockAck { - network: N::NETWORK, - block: substrate_block, - plans: to_sign.iter().map(|signable| signable.1).collect(), - }, - )) + .send(messages::coordinator::ProcessorMessage::SubstrateBlockAck { + network: N::NETWORK, + block: substrate_block, + plans: to_sign.iter().map(|signable| signable.1).collect(), + }) .await; } @@ -531,9 +527,7 @@ async fn run(mut raw_db: D, network: N, mut info!("created batch {} ({} instructions)", batch.id, batch.instructions.len()); coordinator.send( - messages::ProcessorMessage::Substrate( - messages::substrate::ProcessorMessage::Batch { batch: batch.clone() } - ) + messages::substrate::ProcessorMessage::Batch { batch: batch.clone() } ).await; if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() { @@ -568,18 +562,18 @@ async fn run(mut raw_db: D, network: N, mut while let Some(msg) = signer.events.pop_front() { match msg { SignerEvent::ProcessorMessage(msg) => { - coordinator.send(ProcessorMessage::Sign(msg)).await; + coordinator.send(msg).await; } SignerEvent::SignedTransaction { id, tx } => { // It is important ProcessorMessage::Completed is only emitted if a Signer we had // created the TX completed (which having it only emitted after 
a SignerEvent ensures) coordinator - .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { + .send(messages::sign::ProcessorMessage::Completed { key: key.clone(), id, tx: tx.as_ref().to_vec(), - })) + }) .await; } } @@ -590,14 +584,10 @@ async fn run(mut raw_db: D, network: N, mut while let Some(msg) = signer.events.pop_front() { match msg { SubstrateSignerEvent::ProcessorMessage(msg) => { - coordinator.send(ProcessorMessage::Coordinator(msg)).await; + coordinator.send(msg).await; } SubstrateSignerEvent::SignedBatch(batch) => { - coordinator - .send(ProcessorMessage::Substrate( - messages::substrate::ProcessorMessage::SignedBatch { batch }, - )) - .await; + coordinator.send(messages::substrate::ProcessorMessage::SignedBatch { batch }).await; } } }