diff --git a/Cargo.lock b/Cargo.lock index 8662be6f..768191b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8120,6 +8120,29 @@ dependencies = [ "sp-runtime", ] +[[package]] +name = "serai-bitcoin-processor" +version = "0.1.0" +dependencies = [ + "async-trait", + "bitcoin-serai", + "borsh", + "const-hex", + "env_logger", + "hex", + "k256", + "log", + "parity-scale-codec", + "secp256k1", + "serai-db", + "serai-env", + "serai-message-queue", + "serai-processor-messages", + "serde_json", + "tokio", + "zalloc", +] + [[package]] name = "serai-client" version = "0.1.0" @@ -8315,6 +8338,28 @@ dependencies = [ name = "serai-env" version = "0.1.0" +[[package]] +name = "serai-ethereum-processor" +version = "0.1.0" +dependencies = [ + "async-trait", + "borsh", + "const-hex", + "env_logger", + "ethereum-serai", + "hex", + "k256", + "log", + "parity-scale-codec", + "serai-db", + "serai-env", + "serai-message-queue", + "serai-processor-messages", + "serde_json", + "tokio", + "zalloc", +] + [[package]] name = "serai-ethereum-relayer" version = "0.1.0" @@ -8343,7 +8388,6 @@ dependencies = [ "serai-coordinator-tests", "serai-docker-tests", "serai-message-queue-tests", - "serai-processor", "serai-processor-tests", "serde", "serde_json", @@ -8459,6 +8503,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "serai-monero-processor" +version = "0.1.0" +dependencies = [ + "async-trait", + "borsh", + "const-hex", + "dalek-ff-group", + "env_logger", + "hex", + "log", + "monero-simple-request-rpc", + "monero-wallet", + "parity-scale-codec", + "serai-db", + "serai-env", + "serai-message-queue", + "serai-processor-messages", + "serde_json", + "tokio", + "zalloc", +] + [[package]] name = "serai-no-std-tests" version = "0.1.0" @@ -8558,47 +8625,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "serai-processor" -version = "0.1.0" -dependencies = [ - "async-trait", - "bitcoin-serai", - "borsh", - "ciphersuite", - "const-hex", - "dalek-ff-group", - "dkg", - "dockertest", - "ec-divisors", 
- "env_logger", - "ethereum-serai", - "flexible-transcript", - "frost-schnorrkel", - "hex", - "k256", - "log", - "modular-frost", - "monero-simple-request-rpc", - "monero-wallet", - "parity-scale-codec", - "rand_chacha", - "rand_core", - "secp256k1", - "serai-client", - "serai-db", - "serai-docker-tests", - "serai-env", - "serai-message-queue", - "serai-processor-messages", - "serde_json", - "sp-application-crypto", - "thiserror", - "tokio", - "zalloc", - "zeroize", -] - [[package]] name = "serai-processor-frost-attempt-manager" version = "0.1.0" @@ -8676,6 +8702,7 @@ dependencies = [ "serai-in-instructions-primitives", "serai-primitives", "serai-processor-primitives", + "serai-processor-scheduler-primitives", "tokio", ] @@ -8684,11 +8711,32 @@ name = "serai-processor-scheduler-primitives" version = "0.1.0" dependencies = [ "borsh", - "group", + "ciphersuite", + "modular-frost", "parity-scale-codec", "serai-db", ] +[[package]] +name = "serai-processor-signers" +version = "0.1.0" +dependencies = [ + "async-trait", + "borsh", + "ciphersuite", + "log", + "modular-frost", + "parity-scale-codec", + "serai-db", + "serai-processor-frost-attempt-manager", + "serai-processor-messages", + "serai-processor-primitives", + "serai-processor-scanner", + "serai-processor-scheduler-primitives", + "serai-validator-sets-primitives", + "tokio", +] + [[package]] name = "serai-processor-tests" version = "0.1.0" @@ -8711,7 +8759,6 @@ dependencies = [ "serai-docker-tests", "serai-message-queue", "serai-message-queue-tests", - "serai-processor", "serai-processor-messages", "serde_json", "tokio", diff --git a/processor/frost-attempt-manager/src/individual.rs b/processor/frost-attempt-manager/src/individual.rs index d7f4eec0..049731c6 100644 --- a/processor/frost-attempt-manager/src/individual.rs +++ b/processor/frost-attempt-manager/src/individual.rs @@ -80,10 +80,15 @@ impl SigningProtocol { We avoid this by saving to the DB we preprocessed before sending our preprocessed, and only keeping 
our preprocesses for this instance of the processor. Accordingly, on reboot, we will - flag the prior preprocess and not send new preprocesses. + flag the prior preprocess and not send new preprocesses. This does require our own DB + transaction (to ensure we save to the DB we preprocessed before yielding the preprocess + messages). We also won't send the share we were supposed to, unfortunately, yet caching/reloading the preprocess has enough safety issues it isn't worth the headache. + + Since we bind a signing attempt to the lifetime of the application, we're also safe against + nonce reuse (as the state machines enforce single-use and we never reuse a preprocess). */ { let mut txn = self.db.txn(); diff --git a/processor/frost-attempt-manager/src/lib.rs b/processor/frost-attempt-manager/src/lib.rs index c4d1708d..2ce46784 100644 --- a/processor/frost-attempt-manager/src/lib.rs +++ b/processor/frost-attempt-manager/src/lib.rs @@ -65,6 +65,10 @@ impl AttemptManager { } /// Handle a message for a signing protocol. + /// + /// Handling a message multiple times is safe and will cause subsequent calls to return + /// `Response::Messages(vec![])`. Handling a message for a signing protocol which isn't being + /// worked on (potentially due to rebooting) will also return `Response::Messages(vec![])`. pub fn handle(&mut self, msg: CoordinatorMessage) -> Response { match msg { CoordinatorMessage::Preprocesses { id, preprocesses } => { diff --git a/processor/primitives/src/eventuality.rs b/processor/primitives/src/eventuality.rs index 80337824..f68ceeae 100644 --- a/processor/primitives/src/eventuality.rs +++ b/processor/primitives/src/eventuality.rs @@ -7,7 +7,7 @@ pub trait Eventuality: Sized + Send + Sync { /// The type used to identify a received output. type OutputId: Id; - /// The ID of the transaction this Eventuality is for. + /// The ID of the SignableTransaction this Eventuality is for. /// /// This is an internal ID arbitrarily definable so long as it's unique. 
   fn id(&self) -> [u8; 32];
diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index f45d2966..246e5f46 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -520,3 +520,26 @@ impl SubstrateToEventualityDb {
     Burns::try_recv(txn, acknowledged_block)
   }
 }
+
+mod _completed_eventualities {
+  use serai_db::{Get, DbTxn, create_db, db_channel};
+
+  db_channel! {
+    ScannerPublic {
+      CompletedEventualities: (empty_key: ()) -> [u8; 32],
+    }
+  }
+}
+
+/// The IDs of completed Eventualities found on-chain, within a finalized block.
+pub struct CompletedEventualities<S: ScannerFeed>(PhantomData<S>);
+impl<S: ScannerFeed> CompletedEventualities<S> {
+  pub(crate) fn send(txn: &mut impl DbTxn, id: [u8; 32]) {
+    _completed_eventualities::CompletedEventualities::send(txn, (), &id);
+  }
+
+  /// Receive the ID of a completed Eventuality.
+  pub fn try_recv(txn: &mut impl DbTxn) -> Option<[u8; 32]> {
+    _completed_eventualities::CompletedEventualities::try_recv(txn, ())
+  }
+}
diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index 6db60b71..7dadbe55 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -14,7 +14,7 @@ use crate::{
     ScanToEventualityDb,
   },
   BlockExt, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler,
-  sort_outputs,
+  CompletedEventualities, sort_outputs,
   scan::{next_to_scan_for_outputs_block, queue_output_until_block},
 };
 
@@ -292,8 +292,13 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
       completed_eventualities
     };
 
-    for tx in completed_eventualities.keys() {
-      log::info!("eventuality resolved by {}", hex::encode(tx.as_ref()));
+    for (tx, eventuality) in &completed_eventualities {
+      log::info!(
+        "eventuality {} resolved by {}",
+        hex::encode(eventuality.id()),
+        hex::encode(tx.as_ref())
+      );
+      CompletedEventualities::<S>::send(&mut txn, eventuality.id());
     }
 
     // Fetch all non-External outputs
diff --git a/processor/scanner/src/lib.rs
b/processor/scanner/src/lib.rs
index 5573e484..7c699e9c 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -21,6 +21,7 @@ pub use lifetime::LifetimeStage;
 
 // Database schema definition and associated functions.
 mod db;
+pub use db::CompletedEventualities;
 // Task to index the blockchain, ensuring we don't reorganize finalized blocks.
 mod index;
 // Scans blocks for received coins.
@@ -170,6 +171,10 @@ pub type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
 /// The block type for this ScannerFeed.
 pub type BlockFor<S> = <S as ScannerFeed>::Block;
 
+/// An object usable to publish a Batch.
+// This will presumably be the Batch signer defined in `serai-processor-signers` or a test shim.
+// It could also be some app-layer database for the purpose of verifying the Batches published to
+// Serai.
 #[async_trait::async_trait]
 pub trait BatchPublisher: 'static + Send + Sync {
   /// An error encountered when publishing the Batch.
diff --git a/processor/scheduler/primitives/Cargo.toml b/processor/scheduler/primitives/Cargo.toml
index cdf12cbb..f847300a 100644
--- a/processor/scheduler/primitives/Cargo.toml
+++ b/processor/scheduler/primitives/Cargo.toml
@@ -20,7 +20,8 @@ ignored = ["scale", "borsh"]
 workspace = true
 
 [dependencies]
-group = { version = "0.13", default-features = false }
+ciphersuite = { path = "../../../crypto/ciphersuite", default-features = false, features = ["std"] }
+frost = { package = "modular-frost", path = "../../../crypto/frost", default-features = false }
 
 scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] }
 borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
diff --git a/processor/scheduler/primitives/src/lib.rs b/processor/scheduler/primitives/src/lib.rs
index b3bf525c..4de4f67a 100644
--- a/processor/scheduler/primitives/src/lib.rs
+++ b/processor/scheduler/primitives/src/lib.rs
@@ -5,16 +5,25 @@
 use core::marker::PhantomData;
 use std::io;
-use group::GroupEncoding;
+use ciphersuite::{group::GroupEncoding, Ciphersuite};
+use frost::{dkg::ThresholdKeys, sign::PreprocessMachine};
 
 use serai_db::DbTxn;
 
+/// A transaction.
+pub trait Transaction: Sized {
+  /// Read a `Transaction`.
+  fn read(reader: &mut impl io::Read) -> io::Result<Self>;
+  /// Write a `Transaction`.
+  fn write(&self, writer: &mut impl io::Write) -> io::Result<()>;
+}
+
 /// A signable transaction.
 pub trait SignableTransaction: 'static + Sized + Send + Sync + Clone {
   /// The ciphersuite used to sign this transaction.
-  type Ciphersuite: Cuphersuite;
+  type Ciphersuite: Ciphersuite;
   /// The preprocess machine for the signing protocol for this transaction.
-  type PreprocessMachine: PreprocessMachine;
+  type PreprocessMachine: Clone + PreprocessMachine;
 
   /// Read a `SignableTransaction`.
   fn read(reader: &mut impl io::Read) -> io::Result<Self>;
diff --git a/processor/signers/Cargo.toml b/processor/signers/Cargo.toml
index 007c814c..06d64da2 100644
--- a/processor/signers/Cargo.toml
+++ b/processor/signers/Cargo.toml
@@ -20,13 +20,23 @@ ignored = ["borsh", "scale"]
 workspace = true
 
 [dependencies]
-group = { version = "0.13", default-features = false }
+async-trait = { version = "0.1", default-features = false }
+
+ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
+frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false }
+
+scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] }
+borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
+
+serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std"] }
+
+serai-db = { path = "../../common/db" }
 
 log = { version = "0.4", default-features = false, features = ["std"] }
 tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time",
"macros"] } +messages = { package = "serai-processor-messages", path = "../messages" } primitives = { package = "serai-processor-primitives", path = "../primitives" } scanner = { package = "serai-processor-scanner", path = "../scanner" } -scheduler = { package = "serai-scheduler-primitives", path = "../scheduler/primitives" } +scheduler = { package = "serai-processor-scheduler-primitives", path = "../scheduler/primitives" } frost-attempt-manager = { package = "serai-processor-frost-attempt-manager", path = "../frost-attempt-manager" } diff --git a/processor/signers/src/db.rs b/processor/signers/src/db.rs new file mode 100644 index 00000000..5ba5f7d4 --- /dev/null +++ b/processor/signers/src/db.rs @@ -0,0 +1,27 @@ +use serai_validator_sets_primitives::Session; + +use serai_db::{Get, DbTxn, create_db, db_channel}; + +use messages::sign::{ProcessorMessage, CoordinatorMessage}; + +db_channel! { + SignersGlobal { + // CompletedEventualities needs to be handled by each signer, meaning we need to turn its + // effective spsc into a spmc. We do this by duplicating its message for all keys we're + // signing for. 
+    // TODO: Populate from CompletedEventualities
+    CompletedEventualitiesForEachKey: (session: Session) -> [u8; 32],
+
+    CoordinatorToTransactionSignerMessages: (session: Session) -> CoordinatorMessage,
+    TransactionSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
+
+    CoordinatorToBatchSignerMessages: (session: Session) -> CoordinatorMessage,
+    BatchSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
+
+    CoordinatorToSlashReportSignerMessages: (session: Session) -> CoordinatorMessage,
+    SlashReportSignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
+
+    CoordinatorToCosignerMessages: (session: Session) -> CoordinatorMessage,
+    CosignerToCoordinatorMessages: (session: Session) -> ProcessorMessage,
+  }
+}
diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs
index c221ca4c..7453f4b6 100644
--- a/processor/signers/src/lib.rs
+++ b/processor/signers/src/lib.rs
@@ -2,8 +2,38 @@
 #![doc = include_str!("../README.md")]
 #![deny(missing_docs)]
 
+use core::fmt::Debug;
+
+use frost::sign::PreprocessMachine;
+
+use scheduler::SignableTransaction;
+
+pub(crate) mod db;
+
 mod transaction;
 
+/// An object capable of publishing a transaction.
+#[async_trait::async_trait]
+pub trait TransactionPublisher<S: SignableTransaction>: 'static + Send + Sync {
+  /// An error encountered when publishing a transaction.
+  ///
+  /// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual
+  /// intervention/changing the arguments.
+  ///
+  /// The transaction already being present in the mempool/on-chain SHOULD NOT be considered an
+  /// error.
+  type EphemeralError: Debug;
+
+  /// Publish a transaction.
+  ///
+  /// This will be called multiple times, with the same transaction, until the transaction is
+  /// confirmed on-chain.
+  async fn publish(
+    &self,
+    tx: <S::PreprocessMachine as PreprocessMachine>::Signature,
+  ) -> Result<(), Self::EphemeralError>;
+}
+
 /*
 // The signers used by a Processor, key-scoped.
struct KeySigners { diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index ba1487cb..4ed573f4 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -1,68 +1,127 @@ -use serai_db::{Get, DbTxn, Db}; +use frost::dkg::ThresholdKeys; + +use serai_validator_sets_primitives::Session; + +use serai_db::{DbTxn, Db}; use primitives::task::ContinuallyRan; -use scanner::ScannerFeed; -use scheduler::TransactionsToSign; +use scheduler::{SignableTransaction, TransactionsToSign}; +use scanner::{ScannerFeed, Scheduler}; + +use frost_attempt_manager::*; + +use crate::{ + db::{ + CoordinatorToTransactionSignerMessages, TransactionSignerToCoordinatorMessages, + CompletedEventualitiesForEachKey, + }, + TransactionPublisher, +}; mod db; -use db::IndexDb; // Fetches transactions to sign and signs them. -pub(crate) struct TransactionTask { +pub(crate) struct TransactionTask< + D: Db, + S: ScannerFeed, + Sch: Scheduler, + P: TransactionPublisher, +> { db: D, - keys: ThresholdKeys<::Ciphersuite>, + session: Session, + keys: Vec::Ciphersuite>>, attempt_manager: AttemptManager::PreprocessMachine>, + publisher: P, } -impl TransactionTask { - pub(crate) async fn new( +impl, P: TransactionPublisher> + TransactionTask +{ + pub(crate) fn new( db: D, - keys: ThresholdKeys<::Ciphersuite>, + session: Session, + keys: Vec::Ciphersuite>>, + publisher: P, ) -> Self { - Self { db, keys, attempt_manager: AttemptManager::new() } + let attempt_manager = AttemptManager::new( + db.clone(), + session, + keys.first().expect("creating a transaction signer with 0 keys").params().i(), + ); + Self { db, session, keys, attempt_manager, publisher } } } #[async_trait::async_trait] -impl ContinuallyRan for TransactionTask { +impl, P: TransactionPublisher> + ContinuallyRan for TransactionTask +{ async fn run_iteration(&mut self) -> Result { let mut iterated = false; // Check for new transactions to sign loop { let mut txn = 
self.db.txn(); - let Some(tx) = TransactionsToSign::try_recv(&mut txn, self.key) else { break }; + let Some(tx) = TransactionsToSign::::try_recv( + &mut txn, + &self.keys[0].group_key(), + ) else { + break; + }; iterated = true; let mut machines = Vec::with_capacity(self.keys.len()); for keys in &self.keys { machines.push(tx.clone().sign(keys.clone())); } - let messages = self.attempt_manager.register(tx.id(), machines); - todo!("TODO"); + for msg in self.attempt_manager.register(tx.id(), machines) { + TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } txn.commit(); } // Check for completed Eventualities (meaning we should no longer sign for these transactions) loop { let mut txn = self.db.txn(); - let Some(tx) = CompletedEventualities::try_recv(&mut txn, self.key) else { break }; + let Some(id) = CompletedEventualitiesForEachKey::try_recv(&mut txn, self.session) else { + break; + }; iterated = true; - self.attempt_manager.retire(tx); + self.attempt_manager.retire(id); + // TODO: Stop rebroadcasting this transaction txn.commit(); } + // Handle any messages sent to us loop { let mut txn = self.db.txn(); - let Some(msg) = TransactionSignMessages::try_recv(&mut txn, self.key) else { break }; + let Some(msg) = CoordinatorToTransactionSignerMessages::try_recv(&mut txn, self.session) + else { + break; + }; iterated = true; match self.attempt_manager.handle(msg) { - Response::Messages(messages) => todo!("TODO"), - Response::Signature(signature) => todo!("TODO"), + Response::Messages(msgs) => { + for msg in msgs { + TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature(signed_tx) => { + // TODO: Save this TX to the DB + // TODO: Attempt publication every minute + // TODO: On boot, reload all TXs to rebroadcast + self + .publisher + .publish(signed_tx) + .await + .map_err(|e| format!("couldn't publish transaction: {e:?}"))?; + } } + + txn.commit(); } Ok(iterated)