From d323fc8b7bacbee7e337a94539faf839764df2f3 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 10 Apr 2023 11:11:46 -0400 Subject: [PATCH] Handle signing batches in the processor Duplicates the existing signer for one tailored to batch signing. --- Cargo.lock | 1 + processor/Cargo.toml | 1 + processor/messages/src/lib.rs | 46 ++-- processor/src/coins/bitcoin.rs | 11 +- processor/src/coins/mod.rs | 3 +- processor/src/coins/monero.rs | 10 +- processor/src/main.rs | 95 +++++-- processor/src/scanner.rs | 49 +++- processor/src/signer.rs | 3 - processor/src/substrate_signer.rs | 405 ++++++++++++++++++++++++++++++ processor/src/tests/addresses.rs | 4 +- processor/src/tests/scanner.rs | 7 +- processor/src/tests/wallet.rs | 10 +- 13 files changed, 576 insertions(+), 69 deletions(-) create mode 100644 processor/src/substrate_signer.rs diff --git a/Cargo.lock b/Cargo.lock index 7809352e..9a453635 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6591,6 +6591,7 @@ dependencies = [ "dalek-ff-group", "env_logger", "flexible-transcript", + "frost-schnorrkel", "futures", "group 0.13.0", "hex", diff --git a/processor/Cargo.toml b/processor/Cargo.toml index b14ca99d..11f0d613 100644 --- a/processor/Cargo.toml +++ b/processor/Cargo.toml @@ -36,6 +36,7 @@ group = "0.13" transcript = { package = "flexible-transcript", path = "../crypto/transcript" } frost = { package = "modular-frost", path = "../crypto/frost", features = ["ristretto"] } +frost-schnorrkel = { path = "../crypto/schnorrkel" } # Substrate sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index fce2b948..9d3e8282 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -106,12 +106,6 @@ pub mod sign { Completed { key: Vec, id: [u8; 32], tx: Vec }, } - impl CoordinatorMessage { - pub fn required_block(&self) -> Option { - None - } - } - #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { // Created preprocess for the specified signing protocol. @@ -123,6 +117,10 @@ pub mod sign { } impl CoordinatorMessage { + pub fn required_block(&self) -> Option { + None + } + pub fn key(&self) -> &[u8] { match self { CoordinatorMessage::Preprocesses { id, .. } => &id.key, @@ -134,41 +132,39 @@ pub mod sign { } pub mod coordinator { - use super::*; + use super::{sign::SignId, *}; #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] pub enum CoordinatorMessage { - // The validators have off-chain agreeance on this block being finalized. That means it should - // be signed and published to Substrate. - BlockFinalized { key: Vec, block: BlockHash }, - // Uses Vec instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64] - BlockPreprocesses { key: Vec, block: BlockHash, preprocesses: HashMap> }, - BlockShares { key: Vec, block: BlockHash, shares: HashMap }, + BatchPreprocesses { id: SignId, preprocesses: HashMap> }, + BatchShares { id: SignId, shares: HashMap }, // Needed so a client which didn't participate in signing can still realize signing completed - BlockSigned { key: Vec, block: BlockHash, signature: Vec }, + BatchSigned { key: Vec, block: BlockHash }, } impl CoordinatorMessage { pub fn required_block(&self) -> Option { Some(match self { - CoordinatorMessage::BlockFinalized { block, .. } => *block, - CoordinatorMessage::BlockPreprocesses { block, .. } => *block, - CoordinatorMessage::BlockShares { block, .. } => *block, - CoordinatorMessage::BlockSigned { block, .. } => *block, + CoordinatorMessage::BatchPreprocesses { id, .. } => BlockHash(id.id), + CoordinatorMessage::BatchShares { id, .. } => BlockHash(id.id), + CoordinatorMessage::BatchSigned { block, .. } => *block, }) } + + pub fn key(&self) -> &[u8] { + match self { + CoordinatorMessage::BatchPreprocesses { id, .. } => &id.key, + CoordinatorMessage::BatchShares { id, .. } => &id.key, + CoordinatorMessage::BatchSigned { key, .. } => key, + } + } } #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { - // This should become an inherent transaction by the block producer. - // As an inherent, this should be ~41 bytes per update. - // Ideally, we don't need to put finalized_block on chain though. - Block { key: Vec, latest_number: u64, finalized_block: BlockHash }, - - BlockPreprocess { key: Vec, block: BlockHash, preprocess: Vec }, - BlockSign { key: Vec, block: BlockHash, share: [u8; 32] }, + BatchPreprocess { id: SignId, preprocess: Vec }, + BatchShare { id: SignId, share: [u8; 32] }, } } diff --git a/processor/src/coins/bitcoin.rs b/processor/src/coins/bitcoin.rs index c3cd32f2..994346ca 100644 --- a/processor/src/coins/bitcoin.rs +++ b/processor/src/coins/bitcoin.rs @@ -1,4 +1,8 @@ -use std::{io, collections::HashMap}; +use std::{ + time::{SystemTime, Duration}, + io, + collections::HashMap, +}; use async_trait::async_trait; @@ -10,7 +14,7 @@ use frost::{ ThresholdKeys, }; -use tokio::time::{Duration, sleep}; +use tokio::time::sleep; use bitcoin_serai::{ bitcoin::{ @@ -197,6 +201,9 @@ impl BlockTrait for Block { hash.reverse(); hash } + fn time(&self) -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(self.header.time.into()) + } fn median_fee(&self) -> Fee { // TODO Fee(20) diff --git a/processor/src/coins/mod.rs b/processor/src/coins/mod.rs index 5c57d4a8..9d6b28de 100644 --- a/processor/src/coins/mod.rs +++ b/processor/src/coins/mod.rs @@ -1,5 +1,5 @@ use core::fmt::Debug; -use std::{collections::HashMap, io}; +use std::{time::SystemTime, io, collections::HashMap}; use async_trait::async_trait; use thiserror::Error; @@ -160,6 +160,7 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { // This is currently bounded to being 32-bytes. type Id: 'static + Id; fn id(&self) -> Self::Id; + fn time(&self) -> SystemTime; fn median_fee(&self) -> C::Fee; } diff --git a/processor/src/coins/monero.rs b/processor/src/coins/monero.rs index 7ba78fb3..ec030e13 100644 --- a/processor/src/coins/monero.rs +++ b/processor/src/coins/monero.rs @@ -1,4 +1,8 @@ -use std::{time::Duration, collections::HashMap, io}; +use std::{ + time::{SystemTime, Duration}, + collections::HashMap, + io, +}; use async_trait::async_trait; @@ -142,6 +146,10 @@ impl BlockTrait for Block { self.0 } + fn time(&self) -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(self.1.header.timestamp) + } + fn median_fee(&self) -> Fee { // TODO Fee { per_weight: 80000, mask: 10000 } diff --git a/processor/src/main.rs b/processor/src/main.rs index 4acdd31b..b5c0173a 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -22,7 +22,7 @@ use serai_client::{ primitives::{MAX_DATA_LEN, BlockHash}, tokens::primitives::{OutInstruction, OutInstructionWithBalance}, in_instructions::primitives::{ - Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, SignedBatch, + Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, }, }; @@ -50,6 +50,9 @@ use key_gen::{KeyGenEvent, KeyGen}; mod signer; use signer::{SignerEvent, Signer, SignerHandle}; +mod substrate_signer; +use substrate_signer::{SubstrateSignerEvent, SubstrateSigner, SubstrateSignerHandle}; + mod scanner; use scanner::{ScannerEvent, Scanner, ScannerHandle}; @@ -84,6 +87,20 @@ impl<'a, C: Coin, D: Db> Future for SignerMessageFuture<'a, C, D> { } } +struct SubstrateSignerMessageFuture<'a, D: Db>(&'a mut HashMap, SubstrateSignerHandle>); +impl<'a, D: Db> Future for SubstrateSignerMessageFuture<'a, D> { + type Output = (Vec, SubstrateSignerEvent); + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + for (key, signer) in self.0.iter_mut() { + match signer.events.poll_recv(ctx) { + Poll::Ready(event) => return Poll::Ready((key.clone(), event.unwrap())), + Poll::Pending => {} + } + } + Poll::Pending + } +} + async fn get_fee(coin: &C, block_number: usize) -> C::Fee { loop { // TODO2: Use an fee representative of several blocks @@ -179,6 +196,12 @@ async fn sign_plans( } async fn run(raw_db: D, coin: C, mut coordinator: Co) { + // We currently expect a contextless bidirectional mapping between these two values + // (which is that any value of A can be interpreted as B and vice versa) + // While we can write a contextual mapping, we have yet to do so + // This check ensures no coin which doesn't have a bidirectional mapping is defined + assert_eq!(>::Id::default().as_ref().len(), BlockHash([0u8; 32]).0.len()); + let mut entropy_transcript = { let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't provided as an env var")); @@ -211,6 +234,7 @@ async fn run(raw_db: D, coin: C, mut coordinato let (mut scanner, active_keys) = Scanner::new(coin.clone(), raw_db.clone()); let mut schedulers = HashMap::, Scheduler>::new(); + let mut substrate_signers = HashMap::new(); let mut signers = HashMap::new(); let mut main_db = MainDb::new(raw_db.clone()); @@ -218,8 +242,15 @@ async fn run(raw_db: D, coin: C, mut coordinato for key in &active_keys { // TODO: Load existing schedulers - // TODO: Handle the Ristretto key - let signer = Signer::new(raw_db.clone(), coin.clone(), key_gen.keys(key).1); + let (substrate_keys, coin_keys) = key_gen.keys(key); + + let substrate_key = substrate_keys.group_key(); + let substrate_signer = SubstrateSigner::new(raw_db.clone(), substrate_keys); + // We don't have to load any state for this since the Scanner will re-fire any events + // necessary + substrate_signers.insert(substrate_key.to_bytes().to_vec(), substrate_signer); + + let signer = Signer::new(raw_db.clone(), coin.clone(), coin_keys); // Load any TXs being actively signed let key = key.to_bytes(); @@ -319,10 +350,13 @@ async fn run(raw_db: D, coin: C, mut coordinato match msg.msg.clone() { CoordinatorMessage::KeyGen(msg) => { match key_gen.handle(msg).await { - // TODO: Handle substrate_keys - KeyGenEvent::KeyConfirmed { activation_block, substrate_keys: _, coin_keys } => { - let keys = coin_keys; - let key = keys.group_key(); + KeyGenEvent::KeyConfirmed { activation_block, substrate_keys, coin_keys } => { + substrate_signers.insert( + substrate_keys.group_key().to_bytes().to_vec(), + SubstrateSigner::new(raw_db.clone(), substrate_keys), + ); + + let key = coin_keys.group_key(); let mut activation_block_hash = >::Id::default(); activation_block_hash.as_mut().copy_from_slice(&activation_block.0); @@ -335,8 +369,8 @@ async fn run(raw_db: D, coin: C, mut coordinato scanner.rotate_key(activation_number, key).await; schedulers.insert(key.to_bytes().as_ref().to_vec(), Scheduler::::new(key)); signers.insert( - keys.group_key().to_bytes().as_ref().to_vec(), - Signer::new(raw_db.clone(), coin.clone(), keys) + key.to_bytes().as_ref().to_vec(), + Signer::new(raw_db.clone(), coin.clone(), coin_keys) ); }, @@ -345,13 +379,15 @@ async fn run(raw_db: D, coin: C, mut coordinato coordinator.send(ProcessorMessage::KeyGen(msg)).await; }, } - } + }, CoordinatorMessage::Sign(msg) => { signers[msg.key()].handle(msg).await; - } + }, - CoordinatorMessage::Coordinator(_) => todo!(), + CoordinatorMessage::Coordinator(msg) => { + substrate_signers[msg.key()].handle(msg).await; + }, CoordinatorMessage::Substrate(msg) => { match msg { @@ -422,7 +458,7 @@ async fn run(raw_db: D, coin: C, mut coordinato // These need to be sent to the coordinator which needs to check they aren't replayed // TODO match msg.unwrap() { - ScannerEvent::Outputs(key, block, outputs) => { + ScannerEvent::Block(key, block, time, outputs) => { let key = key.to_bytes().as_ref().to_vec(); let mut block_hash = [0; 32]; @@ -462,26 +498,38 @@ async fn run(raw_db: D, coin: C, mut coordinato }).collect() }; - coordinator.send(ProcessorMessage::Substrate( - messages::substrate::ProcessorMessage::Update { + substrate_signers[&key].sign(time, batch).await; + }, + } + }, + + (key, msg) = SubstrateSignerMessageFuture(&mut substrate_signers) => { + match msg { + SubstrateSignerEvent::ProcessorMessage(msg) => { + coordinator.send(ProcessorMessage::Coordinator(msg)).await; + }, + SubstrateSignerEvent::SignedBatch(batch) => { + coordinator + .send(ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Update { key, - batch: SignedBatch { - batch, - signature: sp_application_crypto::sr25519::Signature([0; 64]), - }, - } - )).await; + batch, + })) + .await; }, } }, (key, msg) = SignerMessageFuture(&mut signers) => { match msg { + SignerEvent::ProcessorMessage(msg) => { + coordinator.send(ProcessorMessage::Sign(msg)).await; + }, + SignerEvent::SignedTransaction { id, tx } => { main_db.finish_signing(&key, id); coordinator .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { - key, + key: key.to_vec(), id, tx: tx.as_ref().to_vec() })) @@ -495,9 +543,6 @@ async fn run(raw_db: D, coin: C, mut coordinato // 3) When the chain has an eventuality, if it had an outbound payment, report it up to // Substrate for logging purposes }, - SignerEvent::ProcessorMessage(msg) => { - coordinator.send(ProcessorMessage::Sign(msg)).await; - }, } }, } diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 79450447..2115e86e 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -1,6 +1,7 @@ -use core::{marker::PhantomData, time::Duration}; +use core::marker::PhantomData; use std::{ sync::Arc, + time::{SystemTime, Duration}, collections::{HashSet, HashMap}, }; @@ -20,8 +21,8 @@ use crate::{ #[derive(Clone, Debug)] pub enum ScannerEvent { - // Outputs received - Outputs(::G, >::Id, Vec), + // Block scanned + Block(::G, >::Id, SystemTime, Vec), } pub type ScannerEventChannel = mpsc::UnboundedReceiver>; @@ -395,7 +396,6 @@ impl Scanner { scanner.ram_outputs.insert(id); } - // TODO: Still fire an empty Outputs event if we haven't had inputs in a while if outputs.is_empty() { continue; } @@ -405,8 +405,47 @@ impl Scanner { scanner.db.save_outputs(&mut txn, &key, &block_id, &outputs); txn.commit(); + const TIME_TOLERANCE: u64 = 15; + + let now = SystemTime::now(); + let mut time = block.time(); + + // Block is older than the tolerance + // This isn't an issue, yet shows our daemon may have fallen behind/been disconnected + if now.duration_since(time).unwrap_or(Duration::ZERO) > + Duration::from_secs(TIME_TOLERANCE) + { + warn!( + "the time is {} and we only just received a block dated {}", + (now.duration_since(SystemTime::UNIX_EPOCH)).expect("now before epoch").as_secs(), + (time.duration_since(SystemTime::UNIX_EPOCH)) + .expect("block time before epoch") + .as_secs(), + ); + } + + // If this block is in the future, either this server's clock is wrong OR the block's + // miner's clock is wrong. The latter is the problem + // + // This time is used to schedule signing sessions over the content of this block + // If it's in the future, the first attempt won't time out until this block is no + // longer in the future + // + // Since we don't need consensus, if this time is more than 15s in the future, + // set it to the local time + // + // As long as a supermajority of nodes set a time within ~15s of each other, this + // should be fine + + // TODO2: Make more robust + if time.duration_since(now).unwrap_or(Duration::ZERO) > + Duration::from_secs(TIME_TOLERANCE) + { + time = now; + } + // Send all outputs - if !scanner.emit(ScannerEvent::Outputs(key, block_id, outputs)) { + if !scanner.emit(ScannerEvent::Block(key, block_id, time, outputs)) { return; } // Write this number as scanned so we won't re-fire these outputs diff --git a/processor/src/signer.rs b/processor/src/signer.rs index a403afb0..6ba62f1d 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -94,9 +94,6 @@ impl SignerDb { } } -/// Coded so if the processor spontaneously reboots, one of two paths occur: -/// 1) It either didn't send its response, so the attempt will be aborted -/// 2) It did send its response, and has locally saved enough data to continue pub struct Signer { coin: C, db: SignerDb, diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs new file mode 100644 index 00000000..5f59a9d3 --- /dev/null +++ b/processor/src/substrate_signer.rs @@ -0,0 +1,405 @@ +use core::fmt; +use std::{ + sync::Arc, + time::{SystemTime, Duration}, + collections::HashMap, +}; + +use rand_core::OsRng; + +use scale::Encode; + +use group::GroupEncoding; +use frost::{ + curve::Ristretto, + ThresholdKeys, + sign::{ + Writable, PreprocessMachine, SignMachine, SignatureMachine, AlgorithmMachine, + AlgorithmSignMachine, AlgorithmSignatureMachine, + }, +}; +use frost_schnorrkel::Schnorrkel; + +use log::{info, debug, warn}; +use tokio::{ + sync::{RwLock, mpsc}, + time::sleep, +}; + +use serai_client::in_instructions::primitives::{Batch, SignedBatch}; + +use messages::{sign::SignId, coordinator::*}; +use crate::{DbTxn, Db}; + +const CHANNEL_MSG: &str = "SubstrateSigner handler was dropped. Shutting down?"; + +#[derive(Debug)] +pub enum SubstrateSignerEvent { + ProcessorMessage(ProcessorMessage), + SignedBatch(SignedBatch), +} + +pub type SubstrateSignerEventChannel = mpsc::UnboundedReceiver; + +#[derive(Debug)] +struct SubstrateSignerDb(D); +impl SubstrateSignerDb { + fn sign_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + D::key(b"SUBSTRATE_SIGNER", dst, key) + } + + fn completed_key(id: [u8; 32]) -> Vec { + Self::sign_key(b"completed", id) + } + fn complete(&mut self, txn: &mut D::Transaction, id: [u8; 32]) { + txn.put(Self::completed_key(id), [1]); + } + fn completed(&self, id: [u8; 32]) -> bool { + self.0.get(Self::completed_key(id)).is_some() + } + + fn attempt_key(id: &SignId) -> Vec { + Self::sign_key(b"attempt", bincode::serialize(id).unwrap()) + } + fn attempt(&mut self, txn: &mut D::Transaction, id: &SignId) { + txn.put(Self::attempt_key(id), []); + } + fn has_attempt(&mut self, id: &SignId) -> bool { + self.0.get(Self::attempt_key(id)).is_some() + } + + fn save_batch(&mut self, txn: &mut D::Transaction, batch: &SignedBatch) { + txn.put(Self::sign_key(b"batch", batch.batch.block), batch.encode()); + } +} + +pub struct SubstrateSigner { + db: SubstrateSignerDb, + + keys: ThresholdKeys, + + signable: HashMap<[u8; 32], (SystemTime, Batch)>, + attempt: HashMap<[u8; 32], u32>, + preprocessing: HashMap<[u8; 32], AlgorithmSignMachine>, + signing: HashMap<[u8; 32], AlgorithmSignatureMachine>, + + events: mpsc::UnboundedSender, +} + +impl fmt::Debug for SubstrateSigner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt + .debug_struct("SubstrateSigner") + .field("signable", &self.signable) + .field("attempt", &self.attempt) + .finish_non_exhaustive() + } +} + +#[derive(Debug)] +pub struct SubstrateSignerHandle { + signer: Arc>>, + pub events: SubstrateSignerEventChannel, +} + +impl SubstrateSigner { + #[allow(clippy::new_ret_no_self)] + pub fn new(db: D, keys: ThresholdKeys) -> SubstrateSignerHandle { + let (events_send, events_recv) = mpsc::unbounded_channel(); + + let signer = Arc::new(RwLock::new(SubstrateSigner { + db: SubstrateSignerDb(db), + + keys, + + signable: HashMap::new(), + attempt: HashMap::new(), + preprocessing: HashMap::new(), + signing: HashMap::new(), + + events: events_send, + })); + + tokio::spawn(SubstrateSigner::run(signer.clone())); + + SubstrateSignerHandle { signer, events: events_recv } + } + + fn verify_id(&self, id: &SignId) -> Result<(), ()> { + if !id.signing_set(&self.keys.params()).contains(&self.keys.params().i()) { + panic!("coordinator sent us preprocesses for a signing attempt we're not participating in"); + } + + // Check the attempt lines up + match self.attempt.get(&id.id) { + // If we don't have an attempt logged, it's because the coordinator is faulty OR + // because we rebooted + None => { + warn!("not attempting {:?}. this is an error if we didn't reboot", id); + // Don't panic on the assumption we rebooted + Err(())?; + } + Some(attempt) => { + // This could be an old attempt, or it may be a 'future' attempt if we rebooted and + // our SystemTime wasn't monotonic, as it may be + if attempt != &id.attempt { + debug!("sent signing data for a distinct attempt"); + Err(())?; + } + } + } + + Ok(()) + } + + fn emit(&mut self, event: SubstrateSignerEvent) -> bool { + if self.events.send(event).is_err() { + info!("{}", CHANNEL_MSG); + false + } else { + true + } + } + + async fn handle(&mut self, msg: CoordinatorMessage) { + match msg { + CoordinatorMessage::BatchPreprocesses { id, mut preprocesses } => { + if self.verify_id(&id).is_err() { + return; + } + + let machine = match self.preprocessing.remove(&id.id) { + // Either rebooted or RPC error, or some invariant + None => { + warn!("not preprocessing for {:?}. this is an error if we didn't reboot", id); + return; + } + Some(machine) => machine, + }; + + let preprocesses = match preprocesses + .drain() + .map(|(l, preprocess)| { + machine + .read_preprocess::<&[u8]>(&mut preprocess.as_ref()) + .map(|preprocess| (l, preprocess)) + }) + .collect::>() + { + Ok(preprocesses) => preprocesses, + Err(e) => todo!("malicious signer: {:?}", e), + }; + + let (machine, share) = match machine.sign(preprocesses, &self.signable[&id.id].1.encode()) { + Ok(res) => res, + Err(e) => todo!("malicious signer: {:?}", e), + }; + self.signing.insert(id.id, machine); + + // Broadcast our share + let mut share_bytes = [0; 32]; + share_bytes.copy_from_slice(&share.serialize()); + self.emit(SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare { + id, + share: share_bytes, + })); + } + + CoordinatorMessage::BatchShares { id, mut shares } => { + if self.verify_id(&id).is_err() { + return; + } + + let machine = match self.signing.remove(&id.id) { + // Rebooted, RPC error, or some invariant + None => { + // If preprocessing has this ID, it means we were never sent the preprocess by the + // coordinator + if self.preprocessing.contains_key(&id.id) { + panic!("never preprocessed yet signing?"); + } + + warn!("not preprocessing for {:?}. this is an error if we didn't reboot", id); + return; + } + Some(machine) => machine, + }; + + let shares = match shares + .drain() + .map(|(l, share)| { + machine.read_share::<&[u8]>(&mut share.as_ref()).map(|share| (l, share)) + }) + .collect::>() + { + Ok(shares) => shares, + Err(e) => todo!("malicious signer: {:?}", e), + }; + + let sig = match machine.complete(shares) { + Ok(res) => res, + Err(e) => todo!("malicious signer: {:?}", e), + }; + + let batch = + SignedBatch { batch: self.signable.remove(&id.id).unwrap().1, signature: sig.into() }; + + // Save the batch in case it's needed for recovery + let mut txn = self.db.0.txn(); + self.db.save_batch(&mut txn, &batch); + self.db.complete(&mut txn, id.id); + txn.commit(); + + // Stop trying to sign for this batch + assert!(self.attempt.remove(&id.id).is_some()); + assert!(self.preprocessing.remove(&id.id).is_none()); + assert!(self.signing.remove(&id.id).is_none()); + + self.emit(SubstrateSignerEvent::SignedBatch(batch)); + } + + CoordinatorMessage::BatchSigned { key: _, block } => { + // Stop trying to sign for this batch + let mut txn = self.db.0.txn(); + self.db.complete(&mut txn, block.0); + txn.commit(); + + self.signable.remove(&block.0); + self.attempt.remove(&block.0); + self.preprocessing.remove(&block.0); + self.signing.remove(&block.0); + + // This doesn't emit SignedBatch because it doesn't have access to the SignedBatch + // The coordinator is expected to only claim a batch was signed if it's on the Substrate + // chain, hence why it's unnecessary to check it/back it up here + + // This also doesn't emit any further events since all mutation happen on the + // substrate::CoordinatorMessage::BlockAcknowledged message (which SignedBatch is meant to + // end up triggering) + } + } + } + + // An async function, to be spawned on a task, to handle signing + async fn run(signer_arc: Arc>) { + const SIGN_TIMEOUT: u64 = 30; + + loop { + // Sleep until a timeout expires (or five seconds expire) + // Since this code start new sessions, it will delay any ordered signing sessions from + // starting for up to 5 seconds, hence why this number can't be too high (such as 30 seconds, + // the full timeout) + // This won't delay re-attempting any signing session however, nor will it block the + // sign_transaction function (since this doesn't hold any locks) + sleep({ + let now = SystemTime::now(); + let mut lowest = Duration::from_secs(5); + let signer = signer_arc.read().await; + for (id, (start, _)) in &signer.signable { + let until = if let Some(attempt) = signer.attempt.get(id) { + // Get when this attempt times out + (*start + Duration::from_secs(u64::from(attempt + 1) * SIGN_TIMEOUT)) + .duration_since(now) + .unwrap_or(Duration::ZERO) + } else { + Duration::ZERO + }; + + if until < lowest { + lowest = until; + } + } + lowest + }) + .await; + + // Because a signing attempt has timed out (or five seconds has passed), check all + // sessions' timeouts + { + let mut signer = signer_arc.write().await; + let keys = signer.signable.keys().cloned().collect::>(); + for id in keys { + let (start, _) = &signer.signable[&id]; + let start = *start; + + let attempt = u32::try_from( + SystemTime::now().duration_since(start).unwrap_or(Duration::ZERO).as_secs() / + SIGN_TIMEOUT, + ) + .unwrap(); + + // Check if we're already working on this attempt + if let Some(curr_attempt) = signer.attempt.get(&id) { + if curr_attempt >= &attempt { + continue; + } + } + + // Delete any existing machines + signer.preprocessing.remove(&id); + signer.signing.remove(&id); + + // Update the attempt number so we don't re-enter this conditional + signer.attempt.insert(id, attempt); + + let id = SignId { key: signer.keys.group_key().to_bytes().to_vec(), id, attempt }; + // Only preprocess if we're a signer + if !id.signing_set(&signer.keys.params()).contains(&signer.keys.params().i()) { + continue; + } + info!("selected to sign {:?}", id); + + // If we reboot mid-sign, the current design has us abort all signs and wait for latter + // attempts/new signing protocols + // This is distinct from the DKG which will continue DKG sessions, even on reboot + // This is because signing is tolerant of failures of up to 1/3rd of the group + // The DKG requires 100% participation + // While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for + // reboots, it's not worth the complexity when messing up here leaks our secret share + // + // Despite this, on reboot, we'll get told of active signing items, and may be in this + // branch again for something we've already attempted + // + // Only run if this hasn't already been attempted + if signer.db.has_attempt(&id) { + warn!("already attempted {:?}. this is an error if we didn't reboot", id); + continue; + } + + let mut txn = signer.db.0.txn(); + signer.db.attempt(&mut txn, &id); + txn.commit(); + + // b"substrate" is a literal from sp-core + let machine = AlgorithmMachine::new(Schnorrkel::new(b"substrate"), signer.keys.clone()); + + let (machine, preprocess) = machine.preprocess(&mut OsRng); + signer.preprocessing.insert(id.id, machine); + + // Broadcast our preprocess + if !signer.emit(SubstrateSignerEvent::ProcessorMessage( + ProcessorMessage::BatchPreprocess { id, preprocess: preprocess.serialize() }, + )) { + return; + } + } + } + } + } +} + +impl SubstrateSignerHandle { + pub async fn sign(&self, start: SystemTime, batch: Batch) { + let mut signer = self.signer.write().await; + if signer.db.completed(batch.block.0) { + debug!("Sign batch order for ID we've already completed signing"); + // See BatchSigned for commentary on why this simply returns + return; + } + signer.signable.insert(batch.block.0, (start, batch)); + } + + pub async fn handle(&self, msg: CoordinatorMessage) { + self.signer.write().await.handle(msg).await; + } +} diff --git a/processor/src/tests/addresses.rs b/processor/src/tests/addresses.rs index 10310f90..5c07ceac 100644 --- a/processor/src/tests/addresses.rs +++ b/processor/src/tests/addresses.rs @@ -49,7 +49,7 @@ async fn spend( coin.mine_block().await; } match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Outputs(this_key, _, outputs) => { + ScannerEvent::Block(this_key, _, _, outputs) => { assert_eq!(this_key, key); assert_eq!(outputs.len(), 1); // Make sure this is actually a change output @@ -82,7 +82,7 @@ pub async fn test_addresses(coin: C) { // Verify the Scanner picked them up let outputs = match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Outputs(this_key, block, outputs) => { + ScannerEvent::Block(this_key, block, _, outputs) => { assert_eq!(this_key, key); assert_eq!(block, block_id); assert_eq!(outputs.len(), 1); diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 22b4987e..0829f880 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -40,15 +40,18 @@ pub async fn test_scanner(coin: C) { let scanner = new_scanner().await; // Receive funds - let block_id = coin.test_send(C::address(keys.group_key())).await.id(); + let block = coin.test_send(C::address(keys.group_key())).await; + let block_id = block.id(); + let block_time = block.time(); // Verify the Scanner picked them up let verify_event = |mut scanner: ScannerHandle| async { let outputs = match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Outputs(key, block, outputs) => { + ScannerEvent::Block(key, block, time, outputs) => { assert_eq!(key, keys.group_key()); assert_eq!(block, block_id); + assert_eq!(time, block_time); assert_eq!(outputs.len(), 1); assert_eq!(outputs[0].kind(), OutputType::External); outputs diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs index 85171d74..bd85a6c4 100644 --- a/processor/src/tests/wallet.rs +++ b/processor/src/tests/wallet.rs @@ -27,12 +27,15 @@ pub async fn test_wallet(coin: C) { let (block_id, outputs) = { scanner.rotate_key(coin.get_latest_block_number().await.unwrap(), key).await; - let block_id = coin.test_send(C::address(key)).await.id(); + let block = coin.test_send(C::address(key)).await; + let block_id = block.id(); + let block_time = block.time(); match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Outputs(this_key, block, outputs) => { + ScannerEvent::Block(this_key, block, time, outputs) => { assert_eq!(this_key, key); assert_eq!(block, block_id); + assert_eq!(time, block_time); assert_eq!(outputs.len(), 1); (block_id, outputs) } @@ -96,9 +99,10 @@ pub async fn test_wallet(coin: C) { } match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Outputs(this_key, block_id, these_outputs) => { + ScannerEvent::Block(this_key, block_id, time, these_outputs) => { assert_eq!(this_key, key); assert_eq!(block_id, block.id()); + assert_eq!(time, block.time()); assert_eq!(these_outputs, outputs); } }