From 2eb155753aca8bdcbcb1d831b3fa332e7d537823 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 9 Nov 2023 01:26:30 -0500 Subject: [PATCH] Remove the Signer events pseudo-channel for a returned message Also replaces SignerEvent with usage of ProcessorMessage directly. --- processor/src/main.rs | 51 +++++++--------- processor/src/signer.rs | 109 +++++++++++++++++++--------------- processor/src/tests/signer.rs | 94 ++++++++++++++--------------- 3 files changed, 127 insertions(+), 127 deletions(-) diff --git a/processor/src/main.rs b/processor/src/main.rs index 074b92c2..fd919c86 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -42,7 +42,7 @@ mod key_gen; use key_gen::{KeyConfirmed, KeyGen}; mod signer; -use signer::{SignerEvent, Signer}; +use signer::Signer; mod substrate_signer; use substrate_signer::{SubstrateSignerEvent, SubstrateSigner}; @@ -206,12 +206,15 @@ async fn handle_coordinator_msg( } CoordinatorMessage::Sign(msg) => { - tributary_mutable + if let Some(msg) = tributary_mutable .signers .get_mut(msg.key()) .expect("coordinator told us to sign with a signer we don't have") .handle(txn, msg) - .await; + .await + { + coordinator.send(msg).await; + } } CoordinatorMessage::Coordinator(msg) => { @@ -359,7 +362,9 @@ async fn handle_coordinator_msg( let signers = &mut tributary_mutable.signers; for (key, id, tx, eventuality) in to_sign { if let Some(signer) = signers.get_mut(key.to_bytes().as_ref()) { - signer.sign_transaction(txn, id, tx, eventuality).await; + if let Some(msg) = signer.sign_transaction(txn, id, tx, eventuality).await { + coordinator.send(msg).await; + } } } @@ -374,9 +379,10 @@ async fn handle_coordinator_msg( } } -async fn boot( +async fn boot( raw_db: &mut D, network: &N, + coordinator: &mut Co, ) -> (MainDb, TributaryMutable, SubstrateMutable) { let mut entropy_transcript = { let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified")); @@ -450,7 +456,11 @@ async fn boot( for (plan, tx, eventuality) in &actively_signing { if plan.key == network_key { let mut txn = raw_db.txn(); - signer.sign_transaction(&mut txn, plan.id(), tx.clone(), eventuality.clone()).await; + if let Some(msg) = + signer.sign_transaction(&mut txn, plan.id(), tx.clone(), eventuality.clone()).await + { + coordinator.send(msg).await; + } // This should only have re-writes of existing data drop(txn); } @@ -474,7 +484,8 @@ async fn run(mut raw_db: D, network: N, mut // This check ensures no network which doesn't have a bidirectional mapping is defined assert_eq!(>::Id::default().as_ref().len(), BlockHash([0u8; 32]).0.len()); - let (main_db, mut tributary_mutable, mut substrate_mutable) = boot(&mut raw_db, &network).await; + let (main_db, mut tributary_mutable, mut substrate_mutable) = + boot(&mut raw_db, &network, &mut coordinator).await; // We can't load this from the DB as we can't guarantee atomic increments with the ack function // TODO: Load with a slight tolerance @@ -557,7 +568,9 @@ async fn run(mut raw_db: D, network: N, mut }, MultisigEvent::Completed(key, id, tx) => { if let Some(signer) = tributary_mutable.signers.get_mut(&key) { - signer.completed(txn, id, tx); + if let Some(msg) = signer.completed(txn, id, tx) { + coordinator.send(msg).await; + } } } } @@ -568,28 +581,6 @@ async fn run(mut raw_db: D, network: N, mut // The signers will only have events after the above select executes, so having no timeout on // the above is fine // TODO: Have the Signers return these events, allowing removing these channels? - for (key, signer) in tributary_mutable.signers.iter_mut() { - while let Some(msg) = signer.events.pop_front() { - match msg { - SignerEvent::ProcessorMessage(msg) => { - coordinator.send(msg).await; - } - - SignerEvent::SignedTransaction { id, tx } => { - // It is important ProcessorMessage::Completed is only emitted if a Signer we had - // created the TX completed (which having it only emitted after a SignerEvent ensures) - coordinator - .send(messages::sign::ProcessorMessage::Completed { - key: key.clone(), - id, - tx: tx.as_ref().to_vec(), - }) - .await; - } - } - } - } - if let Some(signer) = tributary_mutable.substrate_signer.as_mut() { while let Some(msg) = signer.events.pop_front() { match msg { diff --git a/processor/src/signer.rs b/processor/src/signer.rs index 36943962..d2bf6391 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -1,5 +1,5 @@ use core::{marker::PhantomData, fmt}; -use std::collections::{VecDeque, HashMap}; +use std::collections::HashMap; use rand_core::OsRng; @@ -18,12 +18,6 @@ use crate::{ networks::{Transaction, Eventuality, Network}, }; -#[derive(Debug)] -pub enum SignerEvent { - SignedTransaction { id: [u8; 32], tx: >::Id }, - ProcessorMessage(ProcessorMessage), -} - #[derive(Debug)] struct SignerDb(D, PhantomData); impl SignerDb { @@ -162,8 +156,6 @@ pub struct Signer { preprocessing: HashMap<[u8; 32], (Vec>, Vec>)>, #[allow(clippy::type_complexity)] signing: HashMap<[u8; 32], (SignatureMachineFor, Vec>)>, - - pub events: VecDeque>, } impl fmt::Debug for Signer { @@ -210,8 +202,6 @@ impl Signer { attempt: HashMap::new(), preprocessing: HashMap::new(), signing: HashMap::new(), - - events: VecDeque::new(), } } @@ -245,6 +235,7 @@ impl Signer { Ok(()) } + #[must_use] fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool { if !SignerDb::::completions(txn, id).is_empty() { debug!( @@ -258,7 +249,12 @@ impl Signer { } } - fn complete(&mut self, id: [u8; 32], tx_id: >::Id) { + #[must_use] + fn complete( + &mut self, + id: [u8; 32], + tx_id: >::Id, + ) -> ProcessorMessage { // Assert we're actively signing for this TX assert!(self.signable.remove(&id).is_some(), "completed a TX we weren't signing for"); assert!(self.attempt.remove(&id).is_some(), "attempt had an ID signable didn't have"); @@ -271,10 +267,20 @@ impl Signer { self.signing.remove(&id); // Emit the event for it - self.events.push_back(SignerEvent::SignedTransaction { id, tx: tx_id }); + ProcessorMessage::Completed { + key: self.keys[0].group_key().to_bytes().as_ref().to_vec(), + id, + tx: tx_id.as_ref().to_vec(), + } } - pub fn completed(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], tx: N::Transaction) { + #[must_use] + pub fn completed( + &mut self, + txn: &mut D::Transaction<'_>, + id: [u8; 32], + tx: N::Transaction, + ) -> Option { let first_completion = !self.already_completed(txn, id); // Save this completion to the DB @@ -282,17 +288,21 @@ impl Signer { SignerDb::::complete(txn, id, &tx); if first_completion { - self.complete(id, tx.id()); + Some(self.complete(id, tx.id())) + } else { + None } } + /// Returns Some if the first completion. // Doesn't use any loops/retries since we'll eventually get this from the Scanner anyways + #[must_use] async fn claimed_eventuality_completion( &mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], tx_id: &>::Id, - ) -> bool { + ) -> Option { if let Some(eventuality) = SignerDb::::eventuality(txn, id) { // Transaction hasn't hit our mempool/was dropped for a different signature // The latter can happen given certain latency conditions/a single malicious signer @@ -305,7 +315,7 @@ impl Signer { hex::encode(id), "(or had another connectivity issue)", ); - return false; + return None; }; if self.network.confirm_completion(&eventuality, &tx) { @@ -317,8 +327,7 @@ impl Signer { SignerDb::::complete(txn, id, &tx); if first_completion { - self.complete(id, tx.id()); - return true; + return Some(self.complete(id, tx.id())); } } else { warn!( @@ -337,12 +346,18 @@ impl Signer { "which we already marked as completed", ); } - false + None } - async fn attempt(&mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], attempt: u32) { + #[must_use] + async fn attempt( + &mut self, + txn: &mut D::Transaction<'_>, + id: [u8; 32], + attempt: u32, + ) -> Option { if self.already_completed(txn, id) { - return; + return None; } // Check if we're already working on this attempt @@ -354,7 +369,7 @@ impl Signer { attempt, curr_attempt ); - return; + return None; } } @@ -363,7 +378,7 @@ impl Signer { // (also because we do need an owned tx anyways) let Some(tx) = self.signable.get(&id).cloned() else { warn!("told to attempt a TX we aren't currently signing for"); - return; + return None; }; // Delete any existing machines @@ -395,7 +410,7 @@ impl Signer { hex::encode(id.id), id.attempt ); - return; + return None; } SignerDb::::attempt(txn, &id); @@ -408,7 +423,7 @@ impl Signer { let machine = match self.network.attempt_send(keys.clone(), tx.clone()).await { Err(e) => { error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e); - return; + return None; } Ok(machine) => machine, }; @@ -426,38 +441,41 @@ impl Signer { self.preprocessing.insert(id.id, (machines, preprocesses)); // Broadcast our preprocess - self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess { - id, - preprocesses: serialized_preprocesses, - })); + Some(ProcessorMessage::Preprocess { id, preprocesses: serialized_preprocesses }) } + #[must_use] pub async fn sign_transaction( &mut self, txn: &mut D::Transaction<'_>, id: [u8; 32], tx: N::SignableTransaction, eventuality: N::Eventuality, - ) { + ) -> Option { // The caller is expected to re-issue sign orders on reboot // This is solely used by the rebroadcast task SignerDb::::add_active_sign(txn, &id); if self.already_completed(txn, id) { - return; + return None; } SignerDb::::save_eventuality(txn, id, eventuality); self.signable.insert(id, tx); - self.attempt(txn, id, 0).await; + self.attempt(txn, id, 0).await } - pub async fn handle(&mut self, txn: &mut D::Transaction<'_>, msg: CoordinatorMessage) { + #[must_use] + pub async fn handle( + &mut self, + txn: &mut D::Transaction<'_>, + msg: CoordinatorMessage, + ) -> Option { match msg { CoordinatorMessage::Preprocesses { id, mut preprocesses } => { if self.verify_id(&id).is_err() { - return; + return None; } let (machines, our_preprocesses) = match self.preprocessing.remove(&id.id) { @@ -467,7 +485,7 @@ impl Signer { "not preprocessing for {}. this is an error if we didn't reboot", hex::encode(id.id) ); - return; + return None; } Some(machine) => machine, }; @@ -516,15 +534,12 @@ impl Signer { self.signing.insert(id.id, (signature_machine.unwrap(), shares)); // Broadcast our shares - self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Share { - id, - shares: serialized_shares, - })); + Some(ProcessorMessage::Share { id, shares: serialized_shares }) } CoordinatorMessage::Shares { id, mut shares } => { if self.verify_id(&id).is_err() { - return; + return None; } let (machine, our_shares) = match self.signing.remove(&id.id) { @@ -540,7 +555,7 @@ impl Signer { "not preprocessing for {}. this is an error if we didn't reboot", hex::encode(id.id) ); - return; + return None; } Some(machine) => machine, }; @@ -582,12 +597,10 @@ impl Signer { } // Stop trying to sign for this TX - self.complete(id.id, tx_id); + Some(self.complete(id.id, tx_id)) } - CoordinatorMessage::Reattempt { id } => { - self.attempt(txn, id.id, id.attempt).await; - } + CoordinatorMessage::Reattempt { id } => self.attempt(txn, id.id, id.attempt).await, CoordinatorMessage::Completed { key: _, id, tx: mut tx_vec } => { let mut tx = >::Id::default(); @@ -601,11 +614,11 @@ impl Signer { hex::encode(id), "that's not a valid TX ID", ); - return; + return None; } tx.as_mut().copy_from_slice(&tx_vec); - self.claimed_eventuality_completion(txn, id, &tx).await; + self.claimed_eventuality_completion(txn, id, &tx).await } } } diff --git a/processor/src/tests/signer.rs b/processor/src/tests/signer.rs index b83c0428..a16f7097 100644 --- a/processor/src/tests/signer.rs +++ b/processor/src/tests/signer.rs @@ -16,7 +16,7 @@ use messages::sign::*; use crate::{ Payment, Plan, networks::{Output, Transaction, Network}, - signer::{SignerEvent, Signer}, + signer::Signer, }; #[allow(clippy::type_complexity)] @@ -33,9 +33,11 @@ pub async fn sign( attempt: 0, }; + let mut group_key = None; let mut keys = HashMap::new(); let mut txs = HashMap::new(); for (i, (these_keys, this_tx)) in keys_txs.drain() { + group_key = Some(these_keys.group_key()); keys.insert(i, these_keys); txs.insert(i, this_tx); } @@ -52,14 +54,6 @@ pub async fn sign( } drop(keys); - for i in 1 ..= signers.len() { - let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); - let (tx, eventuality) = txs.remove(&i).unwrap(); - let mut txn = dbs.get_mut(&i).unwrap().txn(); - signers.get_mut(&i).unwrap().sign_transaction(&mut txn, actual_id.id, tx, eventuality).await; - txn.commit(); - } - let mut signing_set = vec![]; while signing_set.len() < usize::from(t) { let candidate = Participant::new( @@ -72,29 +66,35 @@ pub async fn sign( signing_set.push(candidate); } - // All participants should emit a preprocess let mut preprocesses = HashMap::new(); + for i in 1 ..= signers.len() { let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); - if let SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess { - id, - preprocesses: mut these_preprocesses, - }) = signers.get_mut(&i).unwrap().events.pop_front().unwrap() + let (tx, eventuality) = txs.remove(&i).unwrap(); + let mut txn = dbs.get_mut(&i).unwrap().txn(); + match signers + .get_mut(&i) + .unwrap() + .sign_transaction(&mut txn, actual_id.id, tx, eventuality) + .await { - assert_eq!(id, actual_id); - assert_eq!(these_preprocesses.len(), 1); - if signing_set.contains(&i) { - preprocesses.insert(i, these_preprocesses.swap_remove(0)); + // All participants should emit a preprocess + Some(ProcessorMessage::Preprocess { id, preprocesses: mut these_preprocesses }) => { + assert_eq!(id, actual_id); + assert_eq!(these_preprocesses.len(), 1); + if signing_set.contains(&i) { + preprocesses.insert(i, these_preprocesses.swap_remove(0)); + } } - } else { - panic!("didn't get preprocess back"); + _ => panic!("didn't get preprocess back"), } + txn.commit(); } let mut shares = HashMap::new(); for i in &signing_set { let mut txn = dbs.get_mut(i).unwrap().txn(); - signers + match signers .get_mut(i) .unwrap() .handle( @@ -104,52 +104,48 @@ pub async fn sign( preprocesses: clone_without(&preprocesses, i), }, ) - .await; - txn.commit(); - - if let SignerEvent::ProcessorMessage(ProcessorMessage::Share { id, shares: mut these_shares }) = - signers.get_mut(i).unwrap().events.pop_front().unwrap() + .await + .unwrap() { - assert_eq!(id, actual_id); - assert_eq!(these_shares.len(), 1); - shares.insert(*i, these_shares.swap_remove(0)); - } else { - panic!("didn't get share back"); + ProcessorMessage::Share { id, shares: mut these_shares } => { + assert_eq!(id, actual_id); + assert_eq!(these_shares.len(), 1); + shares.insert(*i, these_shares.swap_remove(0)); + } + _ => panic!("didn't get share back"), } + txn.commit(); } let mut tx_id = None; for i in &signing_set { let mut txn = dbs.get_mut(i).unwrap().txn(); - signers + match signers .get_mut(i) .unwrap() .handle( &mut txn, CoordinatorMessage::Shares { id: actual_id.clone(), shares: clone_without(&shares, i) }, ) - .await; - txn.commit(); - - if let SignerEvent::SignedTransaction { id, tx } = - signers.get_mut(i).unwrap().events.pop_front().unwrap() + .await + .unwrap() { - assert_eq!(id, actual_id.id); - if tx_id.is_none() { - tx_id = Some(tx.clone()); + ProcessorMessage::Completed { key, id, tx } => { + assert_eq!(&key, group_key.unwrap().to_bytes().as_ref()); + assert_eq!(id, actual_id.id); + if tx_id.is_none() { + tx_id = Some(tx.clone()); + } + assert_eq!(tx_id, Some(tx)); } - assert_eq!(tx_id, Some(tx)); - } else { - panic!("didn't get TX back"); + _ => panic!("didn't get TX back"), } + txn.commit(); } - // Make sure there's no events left - for (_, mut signer) in signers.drain() { - assert!(signer.events.pop_front().is_none()); - } - - tx_id.unwrap() + let mut typed_tx_id = >::Id::default(); + typed_tx_id.as_mut().copy_from_slice(tx_id.unwrap().as_ref()); + typed_tx_id } pub async fn test_signer(network: N) {