From ed0221d8043ddc7d0b7cc81555ffaf38ed111321 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 9 Sep 2024 01:01:29 -0400 Subject: [PATCH] Add BatchSignerTask Uses a wrapper around AlgorithmMachine Schnorrkel to let the message be &[]. --- Cargo.lock | 3 + processor/scanner/src/db.rs | 20 +-- processor/scanner/src/lib.rs | 2 +- processor/scanner/src/report/mod.rs | 4 +- processor/scanner/src/substrate/mod.rs | 4 +- processor/signers/Cargo.toml | 3 + processor/signers/src/batch/db.rs | 13 ++ processor/signers/src/batch/mod.rs | 180 ++++++++++++++++++++ processor/signers/src/coordinator.rs | 2 + processor/signers/src/lib.rs | 80 ++++++--- processor/signers/src/transaction/mod.rs | 17 +- processor/signers/src/wrapped_schnorrkel.rs | 86 ++++++++++ processor/src/multisigs/mod.rs | 8 - 13 files changed, 371 insertions(+), 51 deletions(-) create mode 100644 processor/signers/src/batch/db.rs create mode 100644 processor/signers/src/batch/mod.rs create mode 100644 processor/signers/src/wrapped_schnorrkel.rs delete mode 100644 processor/src/multisigs/mod.rs diff --git a/Cargo.lock b/Cargo.lock index d6b0e3de..9db0bb74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8724,10 +8724,13 @@ dependencies = [ "async-trait", "borsh", "ciphersuite", + "frost-schnorrkel", "log", "modular-frost", "parity-scale-codec", + "rand_core", "serai-db", + "serai-in-instructions-primitives", "serai-processor-frost-attempt-manager", "serai-processor-messages", "serai-processor-primitives", diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index f54ff8e1..3dd5a2e2 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -548,36 +548,36 @@ mod _public_db { db_channel! { ScannerPublic { - BatchToSign: (key: &[u8]) -> Batch, - AcknowledgedBatch: (key: &[u8]) -> u32, + BatchesToSign: (key: &[u8]) -> Batch, + AcknowledgedBatches: (key: &[u8]) -> u32, CompletedEventualities: (key: &[u8]) -> [u8; 32], } } } /// The batches to sign and publish. -pub struct BatchToSign(PhantomData); -impl BatchToSign { +pub struct BatchesToSign(PhantomData); +impl BatchesToSign { pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) { - _public_db::BatchToSign::send(txn, key.to_bytes().as_ref(), batch); + _public_db::BatchesToSign::send(txn, key.to_bytes().as_ref(), batch); } /// Receive a batch to sign and publish. pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option { - _public_db::BatchToSign::try_recv(txn, key.to_bytes().as_ref()) + _public_db::BatchesToSign::try_recv(txn, key.to_bytes().as_ref()) } } /// The batches which were acknowledged on-chain. -pub struct AcknowledgedBatch(PhantomData); -impl AcknowledgedBatch { +pub struct AcknowledgedBatches(PhantomData); +impl AcknowledgedBatches { pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: u32) { - _public_db::AcknowledgedBatch::send(txn, key.to_bytes().as_ref(), &batch); + _public_db::AcknowledgedBatches::send(txn, key.to_bytes().as_ref(), &batch); } /// Receive the ID of a batch which was acknowledged. pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option { - _public_db::AcknowledgedBatch::try_recv(txn, key.to_bytes().as_ref()) + _public_db::AcknowledgedBatches::try_recv(txn, key.to_bytes().as_ref()) } } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index bcd195ec..e5b39cdd 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -21,7 +21,7 @@ pub use lifetime::LifetimeStage; // Database schema definition and associated functions. mod db; use db::ScannerGlobalDb; -pub use db::{BatchToSign, AcknowledgedBatch, CompletedEventualities}; +pub use db::{BatchesToSign, AcknowledgedBatches, CompletedEventualities}; // Task to index the blockchain, ensuring we don't reorganize finalized blocks. mod index; // Scans blocks for received coins. diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index f983d0e7..309b44aa 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -8,7 +8,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use primitives::task::ContinuallyRan; use crate::{ - db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchToSign}, + db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchesToSign}, index, scan::next_to_scan_for_outputs_block, ScannerFeed, KeyFor, @@ -160,7 +160,7 @@ impl ContinuallyRan for ReportTask { } for batch in batches { - BatchToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch); + BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch); } } diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index 6f9cd86b..76961c37 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -6,7 +6,7 @@ use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; use primitives::task::ContinuallyRan; use crate::{ - db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatch}, + db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatches}, report, ScannerFeed, KeyFor, }; @@ -82,7 +82,7 @@ impl ContinuallyRan for SubstrateTask { { let external_key_for_session_to_sign_batch = report::take_external_key_for_session_to_sign_batch::(&mut txn, batch_id).unwrap(); - AcknowledgedBatch::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id); + AcknowledgedBatches::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id); } // Mark we made progress and handle this diff --git a/processor/signers/Cargo.toml b/processor/signers/Cargo.toml index 3a96c043..91192a9e 100644 --- a/processor/signers/Cargo.toml +++ b/processor/signers/Cargo.toml @@ -21,15 +21,18 @@ workspace = true [dependencies] async-trait = { version = "0.1", default-features = false } +rand_core = { version = "0.6", default-features = false } zeroize = { version = "1", default-features = false, features = ["std"] } ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] } frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false } +frost-schnorrkel = { path = "../../crypto/schnorrkel", default-features = false } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std"] } +serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] } serai-db = { path = "../../common/db" } log = { version = "0.4", default-features = false, features = ["std"] } diff --git a/processor/signers/src/batch/db.rs b/processor/signers/src/batch/db.rs new file mode 100644 index 00000000..fec0a894 --- /dev/null +++ b/processor/signers/src/batch/db.rs @@ -0,0 +1,13 @@ +use serai_validator_sets_primitives::Session; +use serai_in_instructions_primitives::{Batch, SignedBatch}; + +use serai_db::{Get, DbTxn, create_db}; + +create_db! { + BatchSigner { + ActiveSigningProtocols: (session: Session) -> Vec, + Batches: (id: u32) -> Batch, + SignedBatches: (id: u32) -> SignedBatch, + LastAcknowledgedBatch: () -> u32, + } +} diff --git a/processor/signers/src/batch/mod.rs b/processor/signers/src/batch/mod.rs new file mode 100644 index 00000000..410ca378 --- /dev/null +++ b/processor/signers/src/batch/mod.rs @@ -0,0 +1,180 @@ +use std::collections::HashSet; + +use ciphersuite::{group::GroupEncoding, Ristretto}; +use frost::dkg::ThresholdKeys; + +use serai_validator_sets_primitives::Session; +use serai_in_instructions_primitives::{SignedBatch, batch_message}; + +use serai_db::{DbTxn, Db}; + +use messages::sign::VariantSignId; + +use primitives::task::ContinuallyRan; +use scanner::{BatchesToSign, AcknowledgedBatches}; + +use frost_attempt_manager::*; + +use crate::{ + db::{CoordinatorToBatchSignerMessages, BatchSignerToCoordinatorMessages}, + WrappedSchnorrkelMachine, +}; + +mod db; +use db::*; + +// Fetches batches to sign and signs them. +pub(crate) struct BatchSignerTask { + db: D, + + session: Session, + external_key: E, + keys: Vec>, + + active_signing_protocols: HashSet, + attempt_manager: AttemptManager, +} + +impl BatchSignerTask { + pub(crate) fn new( + db: D, + session: Session, + external_key: E, + keys: Vec>, + ) -> Self { + let mut active_signing_protocols = HashSet::new(); + let mut attempt_manager = AttemptManager::new( + db.clone(), + session, + keys.first().expect("creating a batch signer with 0 keys").params().i(), + ); + + // Re-register all active signing protocols + for id in ActiveSigningProtocols::get(&db, session).unwrap_or(vec![]) { + active_signing_protocols.insert(id); + + let batch = Batches::get(&db, id).unwrap(); + assert_eq!(batch.id, id); + + let mut machines = Vec::with_capacity(keys.len()); + for keys in &keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch))); + } + attempt_manager.register(VariantSignId::Batch(id), machines); + } + + Self { db, session, external_key, keys, active_signing_protocols, attempt_manager } + } +} + +#[async_trait::async_trait] +impl ContinuallyRan for BatchSignerTask { + async fn run_iteration(&mut self) -> Result { + let mut iterated = false; + + // Check for new batches to sign + loop { + let mut txn = self.db.txn(); + let Some(batch) = BatchesToSign::try_recv(&mut txn, &self.external_key) else { + break; + }; + iterated = true; + + // Save this to the database as a transaction to sign + self.active_signing_protocols.insert(batch.id); + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + Batches::set(&mut txn, batch.id, &batch); + + let mut machines = Vec::with_capacity(self.keys.len()); + for keys in &self.keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch))); + } + for msg in self.attempt_manager.register(VariantSignId::Batch(batch.id), machines) { + BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + + txn.commit(); + } + + // Check for acknowledged Batches (meaning we should no longer sign for these Batches) + loop { + let mut txn = self.db.txn(); + let Some(id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else { + break; + }; + + { + let last_acknowledged = LastAcknowledgedBatch::get(&txn); + if Some(id) > last_acknowledged { + LastAcknowledgedBatch::set(&mut txn, &id); + } + } + + /* + We may have yet to register this signing protocol. + + While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically have + `BatchesToSign` populated with a new batch _while iterating over `AcknowledgedBatches`_, and + then have `AcknowledgedBatched` populated. In that edge case, we will see the + acknowledgement notification before we see the transaction. + + In such a case, we break (dropping the txn, re-queueing the acknowledgement notification). + On the task's next iteration, we'll process the Batch from `BatchesToSign` and be + able to make progress. + */ + if !self.active_signing_protocols.remove(&id) { + break; + } + iterated = true; + + // Since it was, remove this as an active signing protocol + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + // Clean up the database + Batches::del(&mut txn, id); + SignedBatches::del(&mut txn, id); + + // We retire with a txn so we either successfully flag this Batch as acknowledged, and + // won't re-register it (making this retire safe), or we don't flag it, meaning we will + // re-register it, yet that's safe as we have yet to retire it + self.attempt_manager.retire(&mut txn, VariantSignId::Batch(id)); + + txn.commit(); + } + + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToBatchSignerMessages::try_recv(&mut txn, self.session) else { + break; + }; + iterated = true; + + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { + BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature { id, signature } => { + let VariantSignId::Batch(id) = id else { panic!("BatchSignerTask signed a non-Batch") }; + let batch = + Batches::get(&txn, id).expect("signed a Batch we didn't save to the database"); + let signed_batch = SignedBatch { batch, signature: signature.into() }; + SignedBatches::set(&mut txn, signed_batch.batch.id, &signed_batch); + } + } + + txn.commit(); + } + + Ok(iterated) + } +} diff --git a/processor/signers/src/coordinator.rs b/processor/signers/src/coordinator.rs index 43dcc571..c87dc4bb 100644 --- a/processor/signers/src/coordinator.rs +++ b/processor/signers/src/coordinator.rs @@ -93,6 +93,8 @@ impl ContinuallyRan for CoordinatorTask { } } + // TODO: For max(last acknowledged batch, last published batch) onwards, publish every batch + Ok(iterated) } } diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index a53f2208..def6ef16 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -11,6 +11,7 @@ use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; use frost::dkg::{ThresholdCore, ThresholdKeys}; use serai_validator_sets_primitives::Session; +use serai_in_instructions_primitives::SignedBatch; use serai_db::{DbTxn, Db}; @@ -19,25 +20,34 @@ use messages::sign::{VariantSignId, ProcessorMessage, CoordinatorMessage}; use primitives::task::{Task, TaskHandle, ContinuallyRan}; use scheduler::{Transaction, SignableTransaction, TransactionFor}; +mod wrapped_schnorrkel; +pub(crate) use wrapped_schnorrkel::WrappedSchnorrkelMachine; + pub(crate) mod db; mod coordinator; use coordinator::CoordinatorTask; +mod batch; +use batch::BatchSignerTask; + mod transaction; -use transaction::TransactionTask; +use transaction::TransactionSignerTask; /// A connection to the Coordinator which messages can be published with. #[async_trait::async_trait] pub trait Coordinator: 'static + Send + Sync { - /// An error encountered when sending a message. + /// An error encountered when interacting with a coordinator. /// - /// This MUST be an ephemeral error. Retrying sending a message MUST eventually resolve without + /// This MUST be an ephemeral error. Retrying an interaction MUST eventually resolve without /// manual intervention/changing the arguments. type EphemeralError: Debug; /// Send a `messages::sign::ProcessorMessage`. async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>; + + /// Publish a `SignedBatch`. + async fn publish_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>; } /// An object capable of publishing a transaction. @@ -111,13 +121,18 @@ impl Signers { ::read_G(&mut external_key_bytes).unwrap(); assert!(external_key_bytes.is_empty()); + // Drain the Batches to sign + // This will be fully populated by the scanner before retiry occurs, making this perfect + // in not leaving any pending blobs behind + while scanner::BatchesToSign::try_recv(&mut txn, &external_key).is_some() {} + // Drain the acknowledged batches to no longer sign + while scanner::AcknowledgedBatches::try_recv(&mut txn, &external_key).is_some() {} + // Drain the transactions to sign - // TransactionsToSign will be fully populated by the scheduler before retiry occurs, making - // this perfect in not leaving any pending blobs behind + // This will be fully populated by the scheduler before retiry while scheduler::TransactionsToSign::::try_recv(&mut txn, &external_key).is_some() {} // Drain the completed Eventualities - // This will be fully populated by the scanner before retiry while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {} // Drain our DB channels @@ -156,18 +171,37 @@ impl Signers { // TODO: Batch signer, cosigner, slash report signers - let (transaction_task, transaction_handle) = Task::new(); + let (batch_task, batch_handle) = Task::new(); tokio::spawn( - TransactionTask::<_, ST, _>::new(db.clone(), publisher.clone(), session, external_keys) - .continually_run(transaction_task, vec![coordinator_handle.clone()]), + BatchSignerTask::new( + db.clone(), + session, + external_keys[0].group_key(), + substrate_keys.clone(), + ) + .continually_run(batch_task, vec![coordinator_handle.clone()]), ); - tasks.insert(session, Tasks { - cosigner: todo!("TODO"), - batch: todo!("TODO"), - slash_report: todo!("TODO"), - transaction: transaction_handle, - }); + let (transaction_task, transaction_handle) = Task::new(); + tokio::spawn( + TransactionSignerTask::<_, ST, _>::new( + db.clone(), + publisher.clone(), + session, + external_keys, + ) + .continually_run(transaction_task, vec![coordinator_handle.clone()]), + ); + + tasks.insert( + session, + Tasks { + cosigner: todo!("TODO"), + batch: batch_handle, + slash_report: todo!("TODO"), + transaction: transaction_handle, + }, + ); } Self { coordinator_handle, tasks, _ST: PhantomData } @@ -246,19 +280,27 @@ impl Signers { match sign_id.id { VariantSignId::Cosign(_) => { db::CoordinatorToCosignerMessages::send(txn, sign_id.session, message); - if let Some(tasks) = tasks { tasks.cosigner.run_now(); } + if let Some(tasks) = tasks { + tasks.cosigner.run_now(); + } } VariantSignId::Batch(_) => { db::CoordinatorToBatchSignerMessages::send(txn, sign_id.session, message); - if let Some(tasks) = tasks { tasks.batch.run_now(); } + if let Some(tasks) = tasks { + tasks.batch.run_now(); + } } VariantSignId::SlashReport(_) => { db::CoordinatorToSlashReportSignerMessages::send(txn, sign_id.session, message); - if let Some(tasks) = tasks { tasks.slash_report.run_now(); } + if let Some(tasks) = tasks { + tasks.slash_report.run_now(); + } } VariantSignId::Transaction(_) => { db::CoordinatorToTransactionSignerMessages::send(txn, sign_id.session, message); - if let Some(tasks) = tasks { tasks.transaction.run_now(); } + if let Some(tasks) = tasks { + tasks.transaction.run_now(); + } } } } diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index be08cec2..9311eb32 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -26,7 +26,7 @@ mod db; use db::*; // Fetches transactions to sign and signs them. -pub(crate) struct TransactionTask< +pub(crate) struct TransactionSignerTask< D: Db, ST: SignableTransaction, P: TransactionPublisher>, @@ -44,7 +44,7 @@ pub(crate) struct TransactionTask< } impl>> - TransactionTask + TransactionSignerTask { pub(crate) fn new( db: D, @@ -90,7 +90,7 @@ impl> #[async_trait::async_trait] impl>> ContinuallyRan - for TransactionTask + for TransactionSignerTask { async fn run_iteration(&mut self) -> Result { let mut iterated = false; @@ -193,17 +193,16 @@ impl> &mut txn, match id { VariantSignId::Transaction(id) => id, - _ => panic!("TransactionTask signed a non-transaction"), + _ => panic!("TransactionSignerTask signed a non-transaction"), }, &buf, ); } - self - .publisher - .publish(signed_tx) - .await - .map_err(|e| format!("couldn't publish transaction: {e:?}"))?; + match self.publisher.publish(signed_tx).await { + Ok(()) => {} + Err(e) => log::warn!("couldn't broadcast transaction: {e:?}"), + } } } diff --git a/processor/signers/src/wrapped_schnorrkel.rs b/processor/signers/src/wrapped_schnorrkel.rs new file mode 100644 index 00000000..d81eaa70 --- /dev/null +++ b/processor/signers/src/wrapped_schnorrkel.rs @@ -0,0 +1,86 @@ +use std::{ + collections::HashMap, + io::{self, Read}, +}; + +use rand_core::{RngCore, CryptoRng}; + +use ciphersuite::Ristretto; +use frost::{ + dkg::{Participant, ThresholdKeys}, + FrostError, + algorithm::Algorithm, + sign::*, +}; +use frost_schnorrkel::Schnorrkel; + +// This wraps a Schnorrkel sign machine into one with a preset message. +#[derive(Clone)] +pub(crate) struct WrappedSchnorrkelMachine(ThresholdKeys, Vec); +impl WrappedSchnorrkelMachine { + pub(crate) fn new(keys: ThresholdKeys, msg: Vec) -> Self { + Self(keys, msg) + } +} + +pub(crate) struct WrappedSchnorrkelSignMachine( + as PreprocessMachine>::SignMachine, + Vec, +); + +type Signature = as PreprocessMachine>::Signature; +impl PreprocessMachine for WrappedSchnorrkelMachine { + type Preprocess = as PreprocessMachine>::Preprocess; + type Signature = Signature; + type SignMachine = WrappedSchnorrkelSignMachine; + + fn preprocess( + self, + rng: &mut R, + ) -> (Self::SignMachine, Preprocess>::Addendum>) + { + let WrappedSchnorrkelMachine(keys, batch) = self; + let (machine, preprocess) = + AlgorithmMachine::new(Schnorrkel::new(b"substrate"), keys).preprocess(rng); + (WrappedSchnorrkelSignMachine(machine, batch), preprocess) + } +} + +impl SignMachine for WrappedSchnorrkelSignMachine { + type Params = as SignMachine>::Params; + type Keys = as SignMachine>::Keys; + type Preprocess = + as SignMachine>::Preprocess; + type SignatureShare = + as SignMachine>::SignatureShare; + type SignatureMachine = + as SignMachine>::SignatureMachine; + + fn cache(self) -> CachedPreprocess { + unimplemented!() + } + + fn from_cache( + _algorithm: Schnorrkel, + _keys: ThresholdKeys, + _cache: CachedPreprocess, + ) -> (Self, Self::Preprocess) { + unimplemented!() + } + + fn read_preprocess(&self, reader: &mut R) -> io::Result { + self.0.read_preprocess(reader) + } + + fn sign( + self, + preprocesses: HashMap< + Participant, + Preprocess>::Addendum>, + >, + msg: &[u8], + ) -> Result<(Self::SignatureMachine, SignatureShare), FrostError> { + assert!(msg.is_empty()); + self.0.sign(preprocesses, &self.1) + } +} diff --git a/processor/src/multisigs/mod.rs b/processor/src/multisigs/mod.rs deleted file mode 100644 index 1c4adabf..00000000 --- a/processor/src/multisigs/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -#[allow(clippy::type_complexity)] -#[derive(Clone, Debug)] -pub enum MultisigEvent { - // Batches to publish - Batches(Option<(::G, ::G)>, Vec), - // Eventuality completion found on-chain - Completed(Vec, [u8; 32], N::Eventuality), -}