Simply Coordinator/Processors::send by accepting impl Into *Message

This commit is contained in:
Luke Parker 2023-09-29 04:19:59 -04:00
parent 0eff3d9453
commit bd5491dfd5
No known key found for this signature in database
7 changed files with 69 additions and 89 deletions

View file

@ -88,7 +88,6 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
processors processors
.send( .send(
set.network, set.network,
processor_messages::CoordinatorMessage::KeyGen(
processor_messages::key_gen::CoordinatorMessage::GenerateKey { processor_messages::key_gen::CoordinatorMessage::GenerateKey {
id: processor_messages::key_gen::KeyGenId { set, attempt: 0 }, id: processor_messages::key_gen::KeyGenId { set, attempt: 0 },
params: frost::ThresholdParams::new( params: frost::ThresholdParams::new(
@ -100,7 +99,6 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
) )
.unwrap(), .unwrap(),
}, },
),
) )
.await; .await;
@ -625,10 +623,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
// Re-define batch // Re-define batch
// We can't drop it, yet it shouldn't be accidentally used in the following block // We can't drop it, yet it shouldn't be accidentally used in the following block
#[allow(clippy::let_unit_value)] #[allow(clippy::let_unit_value, unused_variables)]
let batch = (); let batch = ();
#[allow(clippy::let_unit_value)]
let _ = batch;
// Verify all `Batch`s which we've already indexed from Substrate // Verify all `Batch`s which we've already indexed from Substrate
// This won't be complete, as it only runs when a `Batch` message is received, which // This won't be complete, as it only runs when a `Batch` message is received, which

View file

@ -14,14 +14,15 @@ pub struct Message {
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Processors: 'static + Send + Sync + Clone { pub trait Processors: 'static + Send + Sync + Clone {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage); async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>);
async fn recv(&mut self, network: NetworkId) -> Message; async fn recv(&mut self, network: NetworkId) -> Message;
async fn ack(&mut self, msg: Message); async fn ack(&mut self, msg: Message);
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Processors for Arc<MessageQueue> { impl Processors for Arc<MessageQueue> {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage) { async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>) {
let msg: CoordinatorMessage = msg.into();
let metadata = let metadata =
Metadata { from: self.service, to: Service::Processor(network), intent: msg.intent() }; Metadata { from: self.service, to: Service::Processor(network), intent: msg.intent() };
let msg = serde_json::to_string(&msg).unwrap(); let msg = serde_json::to_string(&msg).unwrap();

View file

@ -18,7 +18,7 @@ use serai_client::{
use serai_db::DbTxn; use serai_db::DbTxn;
use processor_messages::{SubstrateContext, CoordinatorMessage}; use processor_messages::SubstrateContext;
use tokio::time::sleep; use tokio::time::sleep;
@ -102,7 +102,6 @@ async fn handle_key_gen<D: Db, Pro: Processors>(
processors processors
.send( .send(
set.network, set.network,
CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair { processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext { context: SubstrateContext {
serai_time: block.time().unwrap() / 1000, serai_time: block.time().unwrap() / 1000,
@ -116,7 +115,6 @@ async fn handle_key_gen<D: Db, Pro: Processors>(
set, set,
key_pair, key_pair,
}, },
),
) )
.await; .await;
@ -197,7 +195,6 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
processors processors
.send( .send(
network, network,
CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::SubstrateBlock { processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext { context: SubstrateContext {
serai_time: block.time().unwrap() / 1000, serai_time: block.time().unwrap() / 1000,
@ -208,7 +205,6 @@ async fn handle_batch_and_burns<D: Db, Pro: Processors>(
burns: burns.remove(&network).unwrap(), burns: burns.remove(&network).unwrap(),
batches: batches.remove(&network).unwrap(), batches: batches.remove(&network).unwrap(),
}, },
),
) )
.await; .await;
} }

View file

@ -30,10 +30,10 @@ impl MemProcessors {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Processors for MemProcessors { impl Processors for MemProcessors {
async fn send(&self, network: NetworkId, msg: CoordinatorMessage) { async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>) {
let mut processors = self.0.write().await; let mut processors = self.0.write().await;
let processor = processors.entry(network).or_insert_with(VecDeque::new); let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg); processor.push_back(msg.into());
} }
async fn recv(&mut self, _: NetworkId) -> Message { async fn recv(&mut self, _: NetworkId) -> Message {
todo!() todo!()

View file

@ -25,8 +25,8 @@ use serai_client::{
use tributary::Signed; use tributary::Signed;
use processor_messages::{ use processor_messages::{
CoordinatorMessage, coordinator,
key_gen::{self, KeyGenId}, key_gen::{self, KeyGenId},
coordinator,
sign::{self, SignId}, sign::{self, SignId},
}; };
@ -309,10 +309,10 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments { key_gen::CoordinatorMessage::Commitments {
id: KeyGenId { set: spec.set(), attempt }, id: KeyGenId { set: spec.set(), attempt },
commitments, commitments,
}), },
) )
.await; .await;
} }
@ -365,10 +365,10 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares { key_gen::CoordinatorMessage::Shares {
id: KeyGenId { set: spec.set(), attempt }, id: KeyGenId { set: spec.set(), attempt },
shares, shares,
}), },
) )
.await; .await;
} }
@ -458,10 +458,10 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchPreprocesses { coordinator::CoordinatorMessage::BatchPreprocesses {
id: SignId { key, id: data.plan, attempt: data.attempt }, id: SignId { key, id: data.plan, attempt: data.attempt },
preprocesses, preprocesses,
}), },
) )
.await; .await;
} }
@ -485,13 +485,13 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares { coordinator::CoordinatorMessage::BatchShares {
id: SignId { key, id: data.plan, attempt: data.attempt }, id: SignId { key, id: data.plan, attempt: data.attempt },
shares: shares shares: shares
.into_iter() .into_iter()
.map(|(validator, share)| (validator, share.try_into().unwrap())) .map(|(validator, share)| (validator, share.try_into().unwrap()))
.collect(), .collect(),
}), },
) )
.await; .await;
} }
@ -517,7 +517,7 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::Sign(sign::CoordinatorMessage::Preprocesses { sign::CoordinatorMessage::Preprocesses {
id: SignId { id: SignId {
key: key_pair key: key_pair
.expect("completed SignPreprocess despite not setting the key pair") .expect("completed SignPreprocess despite not setting the key pair")
@ -527,7 +527,7 @@ pub(crate) async fn handle_application_tx<
attempt: data.attempt, attempt: data.attempt,
}, },
preprocesses, preprocesses,
}), },
) )
.await; .await;
} }
@ -551,7 +551,7 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::Sign(sign::CoordinatorMessage::Shares { sign::CoordinatorMessage::Shares {
id: SignId { id: SignId {
key: key_pair key: key_pair
.expect("completed SignShares despite not setting the key pair") .expect("completed SignShares despite not setting the key pair")
@ -561,7 +561,7 @@ pub(crate) async fn handle_application_tx<
attempt: data.attempt, attempt: data.attempt,
}, },
shares, shares,
}), },
) )
.await; .await;
} }
@ -581,11 +581,7 @@ pub(crate) async fn handle_application_tx<
processors processors
.send( .send(
spec.set().network, spec.set().network,
CoordinatorMessage::Sign(sign::CoordinatorMessage::Completed { sign::CoordinatorMessage::Completed { key: key_pair.1.to_vec(), id: plan, tx: tx_hash },
key: key_pair.1.to_vec(),
id: plan,
tx: tx_hash,
}),
) )
.await; .await;
} }

View file

@ -10,14 +10,15 @@ pub struct Message {
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Coordinator { pub trait Coordinator {
async fn send(&mut self, msg: ProcessorMessage); async fn send(&mut self, msg: impl Send + Into<ProcessorMessage>);
async fn recv(&mut self) -> Message; async fn recv(&mut self) -> Message;
async fn ack(&mut self, msg: Message); async fn ack(&mut self, msg: Message);
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl Coordinator for MessageQueue { impl Coordinator for MessageQueue {
async fn send(&mut self, msg: ProcessorMessage) { async fn send(&mut self, msg: impl Send + Into<ProcessorMessage>) {
let msg: ProcessorMessage = msg.into();
let metadata = Metadata { from: self.service, to: Service::Coordinator, intent: msg.intent() }; let metadata = Metadata { from: self.service, to: Service::Coordinator, intent: msg.intent() };
let msg = serde_json::to_string(&msg).unwrap(); let msg = serde_json::to_string(&msg).unwrap();

View file

@ -13,7 +13,7 @@ use serai_client::{
validator_sets::primitives::{ValidatorSet, KeyPair}, validator_sets::primitives::{ValidatorSet, KeyPair},
}; };
use messages::{CoordinatorMessage, ProcessorMessage}; use messages::CoordinatorMessage;
use serai_env as env; use serai_env as env;
@ -202,9 +202,7 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
match msg.msg.clone() { match msg.msg.clone() {
CoordinatorMessage::KeyGen(msg) => { CoordinatorMessage::KeyGen(msg) => {
coordinator coordinator.send(tributary_mutable.key_gen.handle(txn, msg).await).await;
.send(ProcessorMessage::KeyGen(tributary_mutable.key_gen.handle(txn, msg).await))
.await;
} }
CoordinatorMessage::Sign(msg) => { CoordinatorMessage::Sign(msg) => {
@ -343,13 +341,11 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
// plans // plans
if !tributary_mutable.signers.is_empty() { if !tributary_mutable.signers.is_empty() {
coordinator coordinator
.send(messages::ProcessorMessage::Coordinator( .send(messages::coordinator::ProcessorMessage::SubstrateBlockAck {
messages::coordinator::ProcessorMessage::SubstrateBlockAck {
network: N::NETWORK, network: N::NETWORK,
block: substrate_block, block: substrate_block,
plans: to_sign.iter().map(|signable| signable.1).collect(), plans: to_sign.iter().map(|signable| signable.1).collect(),
}, })
))
.await; .await;
} }
@ -531,9 +527,7 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
info!("created batch {} ({} instructions)", batch.id, batch.instructions.len()); info!("created batch {} ({} instructions)", batch.id, batch.instructions.len());
coordinator.send( coordinator.send(
messages::ProcessorMessage::Substrate(
messages::substrate::ProcessorMessage::Batch { batch: batch.clone() } messages::substrate::ProcessorMessage::Batch { batch: batch.clone() }
)
).await; ).await;
if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() { if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() {
@ -568,18 +562,18 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
while let Some(msg) = signer.events.pop_front() { while let Some(msg) = signer.events.pop_front() {
match msg { match msg {
SignerEvent::ProcessorMessage(msg) => { SignerEvent::ProcessorMessage(msg) => {
coordinator.send(ProcessorMessage::Sign(msg)).await; coordinator.send(msg).await;
} }
SignerEvent::SignedTransaction { id, tx } => { SignerEvent::SignedTransaction { id, tx } => {
// It is important ProcessorMessage::Completed is only emitted if a Signer we had // It is important ProcessorMessage::Completed is only emitted if a Signer we had
// created the TX completed (which having it only emitted after a SignerEvent ensures) // created the TX completed (which having it only emitted after a SignerEvent ensures)
coordinator coordinator
.send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { .send(messages::sign::ProcessorMessage::Completed {
key: key.clone(), key: key.clone(),
id, id,
tx: tx.as_ref().to_vec(), tx: tx.as_ref().to_vec(),
})) })
.await; .await;
} }
} }
@ -590,14 +584,10 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
while let Some(msg) = signer.events.pop_front() { while let Some(msg) = signer.events.pop_front() {
match msg { match msg {
SubstrateSignerEvent::ProcessorMessage(msg) => { SubstrateSignerEvent::ProcessorMessage(msg) => {
coordinator.send(ProcessorMessage::Coordinator(msg)).await; coordinator.send(msg).await;
} }
SubstrateSignerEvent::SignedBatch(batch) => { SubstrateSignerEvent::SignedBatch(batch) => {
coordinator coordinator.send(messages::substrate::ProcessorMessage::SignedBatch { batch }).await;
.send(ProcessorMessage::Substrate(
messages::substrate::ProcessorMessage::SignedBatch { batch },
))
.await;
} }
} }
} }