diff --git a/processor/src/main.rs b/processor/src/main.rs index fd919c86..103888e6 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -45,7 +45,7 @@ mod signer; use signer::Signer; mod substrate_signer; -use substrate_signer::{SubstrateSignerEvent, SubstrateSigner}; +use substrate_signer::SubstrateSigner; mod multisigs; use multisigs::{MultisigEvent, MultisigManager}; @@ -218,14 +218,17 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>( } CoordinatorMessage::Coordinator(msg) => { - tributary_mutable + if let Some(msg) = tributary_mutable .substrate_signer .as_mut() .expect( "coordinator told us to sign a batch when we don't have a Substrate signer at this time", ) .handle(txn, msg) - .await; + .await + { + coordinator.send(msg).await; + } } CoordinatorMessage::Substrate(msg) => { @@ -552,7 +555,9 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut ).await; if let Some(substrate_signer) = tributary_mutable.substrate_signer.as_mut() { - substrate_signer.sign(txn, batch).await; + if let Some(msg) = substrate_signer.sign(txn, batch).await { + coordinator.send(msg).await; + } } } @@ -577,23 +582,6 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut }, } - // Check if the signers have events - // The signers will only have events after the above select executes, so having no timeout on - // the above is fine - // TODO: Have the Signers return these events, allowing removing these channels? - if let Some(signer) = tributary_mutable.substrate_signer.as_mut() { - while let Some(msg) = signer.events.pop_front() { - match msg { - SubstrateSignerEvent::ProcessorMessage(msg) => { - coordinator.send(msg).await; - } - SubstrateSignerEvent::SignedBatch(batch) => { - coordinator.send(messages::substrate::ProcessorMessage::SignedBatch { batch }).await; - } - } - } - } - txn.into_inner().unwrap().commit(); if let Some(msg) = outer_msg { coordinator.ack(msg).await; diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs index 2a93f42b..d64bfe3a 100644 --- a/processor/src/substrate_signer.rs +++ b/processor/src/substrate_signer.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt}; -use std::collections::{VecDeque, HashMap}; +use std::collections::HashMap; use rand_core::OsRng; @@ -31,12 +31,6 @@ fn batch_sign_id(network: NetworkId, id: u32) -> [u8; 5] { (network, id).encode().try_into().unwrap() } -#[derive(Debug)] -pub enum SubstrateSignerEvent { - ProcessorMessage(ProcessorMessage), - SignedBatch(SignedBatch), -} - #[derive(Debug)] struct SubstrateSignerDb<D: Db>(D); impl<D: Db> SubstrateSignerDb<D> { @@ -88,8 +82,6 @@ pub struct SubstrateSigner<D: Db> { #[allow(clippy::type_complexity)] signing: HashMap<[u8; 5], (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare>)>, - - pub events: VecDeque<SubstrateSignerEvent>, } impl<D: Db> fmt::Debug for SubstrateSigner<D> { @@ -115,8 +107,6 @@ impl<D: Db> SubstrateSigner<D> { attempt: HashMap::new(), preprocessing: HashMap::new(), signing: HashMap::new(), - - events: VecDeque::new(), } } @@ -146,10 +136,16 @@ impl<D: Db> SubstrateSigner<D> { Ok(()) } - async fn attempt(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 5], attempt: u32) { + #[must_use] + async fn attempt( + &mut self, + txn: &mut D::Transaction<'_>, + id: [u8; 5], + attempt: u32, + ) -> Option<ProcessorMessage> { // See above commentary for why this doesn't emit SignedBatch if SubstrateSignerDb::<D>::completed(txn, id) { - return; + return None; } // Check if we're already working on this attempt @@ -161,7 +157,7 @@ impl<D: Db> SubstrateSigner<D> { attempt, curr_attempt ); - return; + return None; } } @@ -170,7 +166,7 @@ impl<D: Db> SubstrateSigner<D> { batch.block } else { warn!("told to attempt signing a batch we aren't currently signing for"); - return; + return None; }; // Delete any existing machines @@ -201,7 +197,7 @@ impl<D: Db> SubstrateSigner<D> { hex::encode(id.id), id.attempt ); - return; + return None; } SubstrateSignerDb::<D>::attempt(txn, &id); @@ -225,29 +221,37 @@ impl<D: Db> SubstrateSigner<D> { self.preprocessing.insert(id.id, (machines, preprocesses)); // Broadcast our preprocesses - self.events.push_back(SubstrateSignerEvent::ProcessorMessage( - ProcessorMessage::BatchPreprocess { id, block, preprocesses: serialized_preprocesses }, - )); + Some(ProcessorMessage::BatchPreprocess { id, block, preprocesses: serialized_preprocesses }) } - pub async fn sign(&mut self, txn: &mut D::Transaction<'_>, batch: Batch) { + #[must_use] + pub async fn sign( + &mut self, + txn: &mut D::Transaction<'_>, + batch: Batch, + ) -> Option<ProcessorMessage> { debug_assert_eq!(self.network, batch.network); let id = batch_sign_id(batch.network, batch.id); if SubstrateSignerDb::<D>::completed(txn, id) { debug!("Sign batch order for ID we've already completed signing"); // See batch_signed for commentary on why this simply returns - return; + return None; } self.signable.insert(id, batch); - self.attempt(txn, id, 0).await; + self.attempt(txn, id, 0).await } - pub async fn handle(&mut self, txn: &mut D::Transaction<'_>, msg: CoordinatorMessage) { + #[must_use] + pub async fn handle( + &mut self, + txn: &mut D::Transaction<'_>, + msg: CoordinatorMessage, + ) -> Option<messages::ProcessorMessage> { match msg { CoordinatorMessage::BatchPreprocesses { id, mut preprocesses } => { if self.verify_id(&id).is_err() { - return; + return None; } let (machines, our_preprocesses) = match self.preprocessing.remove(&id.id) { @@ -257,7 +261,7 @@ impl<D: Db> SubstrateSigner<D> { "not preprocessing for {}. this is an error if we didn't reboot", hex::encode(id.id), ); - return; + return None; } Some(preprocess) => preprocess, }; @@ -310,14 +314,12 @@ impl<D: Db> SubstrateSigner<D> { self.signing.insert(id.id, (signature_machine.unwrap(), shares)); // Broadcast our shares - self.events.push_back(SubstrateSignerEvent::ProcessorMessage( - ProcessorMessage::BatchShare { id, shares: serialized_shares }, - )); + Some((ProcessorMessage::BatchShare { id, shares: serialized_shares }).into()) } CoordinatorMessage::BatchShares { id, mut shares } => { if self.verify_id(&id).is_err() { - return; + return None; } let (machine, our_shares) = match self.signing.remove(&id.id) { @@ -333,7 +335,7 @@ impl<D: Db> SubstrateSigner<D> { "not preprocessing for {}. this is an error if we didn't reboot", hex::encode(id.id) ); - return; + return None; } Some(signing) => signing, }; @@ -377,11 +379,11 @@ impl<D: Db> SubstrateSigner<D> { assert!(self.preprocessing.remove(&id.id).is_none()); assert!(self.signing.remove(&id.id).is_none()); - self.events.push_back(SubstrateSignerEvent::SignedBatch(batch)); + Some((messages::substrate::ProcessorMessage::SignedBatch { batch }).into()) } CoordinatorMessage::BatchReattempt { id } => { - self.attempt(txn, id.id, id.attempt).await; + self.attempt(txn, id.id, id.attempt).await.map(Into::into) } } } @@ -407,8 +409,8 @@ impl<D: Db> SubstrateSigner<D> { // While a successive batch's signing would also cause this block to be acknowledged, Substrate // guarantees a batch's ordered inclusion - // This also doesn't emit any further events since all mutation from the Batch being signed - // happens on the substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is - // meant to end up triggering) + // This also doesn't return any messages since all mutation from the Batch being signed happens + // on the substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is meant to + // end up triggering) } } diff --git a/processor/src/tests/substrate_signer.rs b/processor/src/tests/substrate_signer.rs index 418a5a90..def5b747 100644 --- a/processor/src/tests/substrate_signer.rs +++ b/processor/src/tests/substrate_signer.rs @@ -16,8 +16,12 @@ use serai_db::{DbTxn, Db, MemDb}; use scale::Encode; use serai_client::{primitives::*, in_instructions::primitives::*}; -use messages::coordinator::*; -use crate::substrate_signer::{SubstrateSignerEvent, SubstrateSigner}; +use messages::{ + substrate, + coordinator::{self, BatchSignId, CoordinatorMessage}, + ProcessorMessage, +}; +use crate::substrate_signer::SubstrateSigner; #[tokio::test] async fn test_substrate_signer() { @@ -50,28 +54,10 @@ async fn test_substrate_signer() { attempt: 0, }; - let mut signers = HashMap::new(); - let mut dbs = HashMap::new(); - let mut t = 0; - for i in 1 ..= keys.len() { - let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); - let keys = keys.get(&i).unwrap().clone(); - t = keys.params().t(); - - let mut signer = SubstrateSigner::<MemDb>::new(NetworkId::Monero, vec![keys]); - let mut db = MemDb::new(); - let mut txn = db.txn(); - signer.sign(&mut txn, batch.clone()).await; - txn.commit(); - - signers.insert(i, signer); - dbs.insert(i, db); - } - let mut signing_set = vec![]; - while signing_set.len() < usize::from(t) { + while signing_set.len() < usize::from(keys.values().next().unwrap().params().t()) { let candidate = Participant::new( - u16::try_from((OsRng.next_u64() % u64::try_from(signers.len()).unwrap()) + 1).unwrap(), + u16::try_from((OsRng.next_u64() % u64::try_from(keys.len()).unwrap()) + 1).unwrap(), ) .unwrap(); if signing_set.contains(&candidate) { @@ -80,31 +66,43 @@ async fn test_substrate_signer() { signing_set.push(candidate); } - // All participants should emit a preprocess + let mut signers = HashMap::new(); + let mut dbs = HashMap::new(); let mut preprocesses = HashMap::new(); - for i in 1 ..= signers.len() { + for i in 1 ..= keys.len() { let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); - if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess { - id, - block: batch_block, - preprocesses: mut these_preprocesses, - }) = signers.get_mut(&i).unwrap().events.pop_front().unwrap() - { - assert_eq!(id, actual_id); - assert_eq!(batch_block, block); - assert_eq!(these_preprocesses.len(), 1); - if signing_set.contains(&i) { - preprocesses.insert(i, these_preprocesses.swap_remove(0)); + let keys = keys.get(&i).unwrap().clone(); + + let mut signer = SubstrateSigner::<MemDb>::new(NetworkId::Monero, vec![keys]); + let mut db = MemDb::new(); + + let mut txn = db.txn(); + match signer.sign(&mut txn, batch.clone()).await.unwrap() { + // All participants should emit a preprocess + coordinator::ProcessorMessage::BatchPreprocess { + id, + block: batch_block, + preprocesses: mut these_preprocesses, + } => { + assert_eq!(id, actual_id); + assert_eq!(batch_block, block); + assert_eq!(these_preprocesses.len(), 1); + if signing_set.contains(&i) { + preprocesses.insert(i, these_preprocesses.swap_remove(0)); + } } - } else { - panic!("didn't get preprocess back"); + _ => panic!("didn't get preprocess back"), } + txn.commit(); + + signers.insert(i, signer); + dbs.insert(i, db); } let mut shares = HashMap::new(); for i in &signing_set { let mut txn = dbs.get_mut(i).unwrap().txn(); - signers + match signers .get_mut(i) .unwrap() .handle( @@ -114,25 +112,25 @@ async fn test_substrate_signer() { preprocesses: clone_without(&preprocesses, i), }, ) - .await; - txn.commit(); - - if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare { - id, - shares: mut these_shares, - }) = signers.get_mut(i).unwrap().events.pop_front().unwrap() + .await + .unwrap() { - assert_eq!(id, actual_id); - assert_eq!(these_shares.len(), 1); - shares.insert(*i, these_shares.swap_remove(0)); - } else { - panic!("didn't get share back"); + ProcessorMessage::Coordinator(coordinator::ProcessorMessage::BatchShare { + id, + shares: mut these_shares, + }) => { + assert_eq!(id, actual_id); + assert_eq!(these_shares.len(), 1); + shares.insert(*i, these_shares.swap_remove(0)); + } + _ => panic!("didn't get share back"), } + txn.commit(); } for i in &signing_set { let mut txn = dbs.get_mut(i).unwrap().txn(); - signers + match signers .get_mut(i) .unwrap() .handle( @@ -142,22 +140,18 @@ async fn test_substrate_signer() { shares: clone_without(&shares, i), }, ) - .await; - txn.commit(); - - if let SubstrateSignerEvent::SignedBatch(signed_batch) = - signers.get_mut(i).unwrap().events.pop_front().unwrap() + .await + .unwrap() { - assert_eq!(signed_batch.batch, batch); - assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()) - .verify(&batch_message(&batch), &signed_batch.signature)); - } else { - panic!("didn't get signed batch back"); + ProcessorMessage::Substrate(substrate::ProcessorMessage::SignedBatch { + batch: signed_batch, + }) => { + assert_eq!(signed_batch.batch, batch); + assert!(Public::from_raw(keys[&participant_one].group_key().to_bytes()) + .verify(&batch_message(&batch), &signed_batch.signature)); + } + _ => panic!("didn't get signed batch back"), } - } - - // Make sure there's no events left - for (_, mut signer) in signers.drain() { - assert!(signer.events.pop_front().is_none()); + txn.commit(); } }