diff --git a/Cargo.lock b/Cargo.lock index 9db0bb74..81e3d1de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8731,6 +8731,7 @@ dependencies = [ "rand_core", "serai-db", "serai-in-instructions-primitives", + "serai-primitives", "serai-processor-frost-attempt-manager", "serai-processor-messages", "serai-processor-primitives", diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index dc7f2939..d9534293 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -7,9 +7,9 @@ use borsh::{BorshSerialize, BorshDeserialize}; use dkg::Participant; use serai_primitives::BlockHash; -use in_instructions_primitives::{Batch, SignedBatch}; -use coins_primitives::OutInstructionWithBalance; use validator_sets_primitives::{Session, KeyPair, Slash}; +use coins_primitives::OutInstructionWithBalance; +use in_instructions_primitives::{Batch, SignedBatch}; #[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct SubstrateContext { @@ -84,7 +84,7 @@ pub mod sign { pub enum VariantSignId { Cosign([u8; 32]), Batch(u32), - SlashReport([u8; 32]), + SlashReport(Session), Transaction([u8; 32]), } impl fmt::Debug for VariantSignId { @@ -94,10 +94,9 @@ pub mod sign { f.debug_struct("VariantSignId::Cosign").field("0", &hex::encode(cosign)).finish() } Self::Batch(batch) => f.debug_struct("VariantSignId::Batch").field("0", &batch).finish(), - Self::SlashReport(slash_report) => f - .debug_struct("VariantSignId::SlashReport") - .field("0", &hex::encode(slash_report)) - .finish(), + Self::SlashReport(session) => { + f.debug_struct("VariantSignId::SlashReport").field("0", &session).finish() + } Self::Transaction(tx) => { f.debug_struct("VariantSignId::Transaction").field("0", &hex::encode(tx)).finish() } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index e5b39cdd..5919ff7e 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -21,7 +21,7 @@ pub use lifetime::LifetimeStage; // Database schema definition and associated functions. mod db; use db::ScannerGlobalDb; -pub use db::{BatchesToSign, AcknowledgedBatches, CompletedEventualities}; +pub use db::{Batches, BatchesToSign, AcknowledgedBatches, CompletedEventualities}; // Task to index the blockchain, ensuring we don't reorganize finalized blocks. mod index; // Scans blocks for received coins. diff --git a/processor/signers/Cargo.toml b/processor/signers/Cargo.toml index 91192a9e..7b7ef098 100644 --- a/processor/signers/Cargo.toml +++ b/processor/signers/Cargo.toml @@ -31,6 +31,7 @@ frost-schnorrkel = { path = "../../crypto/schnorrkel", default-features = false scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } +serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] } serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std"] } serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] } diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index 77cdef59..0b1ee467 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -1,14 +1,9 @@ +use scale::Decode; use serai_db::{DbTxn, Db}; use primitives::task::ContinuallyRan; -use crate::{ - db::{ - RegisteredKeys, CosignerToCoordinatorMessages, BatchSignerToCoordinatorMessages, - SlashReportSignerToCoordinatorMessages, TransactionSignerToCoordinatorMessages, - }, - Coordinator, -}; +use crate::{db::*, Coordinator}; mod db; @@ -30,6 +25,7 @@ impl ContinuallyRan for CoordinatorTask { let mut iterated = false; for session in RegisteredKeys::get(&self.db).unwrap_or(vec![]) { + // Publish the messages generated by this key's signers loop { let mut txn = self.db.txn(); let Some(msg) = CosignerToCoordinatorMessages::try_recv(&mut txn, session) else { @@ -93,6 +89,26 @@ impl ContinuallyRan for CoordinatorTask { txn.commit(); } + + // If this session signed its slash report, publish its signature + { + let mut txn = self.db.txn(); + if let Some(slash_report_signature) = SlashReportSignature::try_recv(&mut txn, session) { + iterated = true; + + self + .coordinator + .publish_slash_report_signature( + <_>::decode(&mut slash_report_signature.as_slice()).unwrap(), + ) + .await + .map_err(|e| { + format!("couldn't send slash report signature to the coordinator: {e:?}") + })?; + + txn.commit(); + } + } } // Publish the Batches diff --git a/processor/signers/src/db.rs b/processor/signers/src/db.rs index 66894621..ea022fca 100644 --- a/processor/signers/src/db.rs +++ b/processor/signers/src/db.rs @@ -17,6 +17,7 @@ db_channel! { SignersGlobal { Cosign: (session: Session) -> (u64, [u8; 32]), SlashReport: (session: Session) -> Vec, + SlashReportSignature: (session: Session) -> Vec, CoordinatorToCosignerMessages: (session: Session) -> CoordinatorMessage, CosignerToCoordinatorMessages: (session: Session) -> ProcessorMessage, diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index de456296..cc40ce25 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -10,8 +10,9 @@ use zeroize::Zeroizing; use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; use frost::dkg::{ThresholdCore, ThresholdKeys}; +use serai_primitives::Signature; use serai_validator_sets_primitives::{Session, Slash}; -use serai_in_instructions_primitives::SignedBatch; +use serai_in_instructions_primitives::{Batch, SignedBatch}; use serai_db::{DbTxn, Db}; @@ -19,6 +20,7 @@ use messages::sign::{VariantSignId, ProcessorMessage, CoordinatorMessage}; use primitives::task::{Task, TaskHandle, ContinuallyRan}; use scheduler::{Transaction, SignableTransaction, TransactionFor}; +use scanner::{ScannerFeed, Scheduler}; mod wrapped_schnorrkel; pub(crate) use wrapped_schnorrkel::WrappedSchnorrkelMachine; @@ -31,6 +33,9 @@ use coordinator::CoordinatorTask; mod batch; use batch::BatchSignerTask; +mod slash_report; +use slash_report::SlashReportSignerTask; + mod transaction; use transaction::TransactionSignerTask; @@ -51,6 +56,12 @@ pub trait Coordinator: 'static + Send + Sync { /// Publish a `SignedBatch`. async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>; + + /// Publish a slash report's signature. + async fn publish_slash_report_signature( + &mut self, + signature: Signature, + ) -> Result<(), Self::EphemeralError>; } /// An object capable of publishing a transaction. @@ -81,12 +92,17 @@ struct Tasks { /// The signers used by a processor. #[allow(non_snake_case)] -pub struct Signers { +pub struct Signers> { coordinator_handle: TaskHandle, tasks: HashMap, - _ST: PhantomData, + _Sch: PhantomData, + _S: PhantomData, } +type CiphersuiteFor = + <>::SignableTransaction as SignableTransaction>::Ciphersuite; +type SignableTransactionFor = >::SignableTransaction; + /* This is completely outside of consensus, so the worst that can happen is: @@ -99,14 +115,14 @@ pub struct Signers { completion comes in *before* we registered a key, the signer will hold the signing protocol in memory until the session is retired entirely. */ -impl Signers { +impl> Signers { /// Initialize the signers. /// /// This will spawn tasks for any historically registered keys. pub fn new( mut db: impl Db, coordinator: impl Coordinator, - publisher: &impl TransactionPublisher>, + publisher: &impl TransactionPublisher>>, ) -> Self { /* On boot, perform any database cleanup which was queued. @@ -120,8 +136,7 @@ impl Signers { let mut txn = db.txn(); for (session, external_key_bytes) in db::ToCleanup::get(&txn).unwrap_or(vec![]) { let mut external_key_bytes = external_key_bytes.as_slice(); - let external_key = - ::read_G(&mut external_key_bytes).unwrap(); + let external_key = CiphersuiteFor::::read_G(&mut external_key_bytes).unwrap(); assert!(external_key_bytes.is_empty()); // Drain the Batches to sign @@ -133,7 +148,12 @@ impl Signers { // Drain the transactions to sign // This will be fully populated by the scheduler before retiry - while scheduler::TransactionsToSign::::try_recv(&mut txn, &external_key).is_some() {} + while scheduler::TransactionsToSign::>::try_recv( + &mut txn, + &external_key, + ) + .is_some() + {} // Drain the completed Eventualities while scanner::CompletedEventualities::try_recv(&mut txn, &external_key).is_some() {} @@ -170,11 +190,12 @@ impl Signers { while !buf.is_empty() { substrate_keys .push(ThresholdKeys::from(ThresholdCore::::read(&mut buf).unwrap())); - external_keys - .push(ThresholdKeys::from(ThresholdCore::::read(&mut buf).unwrap())); + external_keys.push(ThresholdKeys::from( + ThresholdCore::>::read(&mut buf).unwrap(), + )); } - // TODO: Cosigner and slash report signers + // TODO: Cosigner let (batch_task, batch_handle) = Task::new(); tokio::spawn( @@ -187,9 +208,15 @@ impl Signers { .continually_run(batch_task, vec![coordinator_handle.clone()]), ); + let (slash_report_task, slash_report_handle) = Task::new(); + tokio::spawn( + SlashReportSignerTask::<_, S>::new(db.clone(), session, substrate_keys.clone()) + .continually_run(slash_report_task, vec![coordinator_handle.clone()]), + ); + let (transaction_task, transaction_handle) = Task::new(); tokio::spawn( - TransactionSignerTask::<_, ST, _>::new( + TransactionSignerTask::<_, SignableTransactionFor, _>::new( db.clone(), publisher.clone(), session, @@ -203,13 +230,13 @@ impl Signers { Tasks { cosigner: todo!("TODO"), batch: batch_handle, - slash_report: todo!("TODO"), + slash_report: slash_report_handle, transaction: transaction_handle, }, ); } - Self { coordinator_handle, tasks, _ST: PhantomData } + Self { coordinator_handle, tasks, _Sch: PhantomData, _S: PhantomData } } /// Register a set of keys to sign with. @@ -220,7 +247,7 @@ impl Signers { txn: &mut impl DbTxn, session: Session, substrate_keys: Vec>, - network_keys: Vec>, + network_keys: Vec>>, ) { // Don't register already retired keys if Some(session.0) <= db::LatestRetiredSession::get(txn).map(|session| session.0) { @@ -246,7 +273,8 @@ impl Signers { /// Retire the signers for a session. /// /// This MUST be called in order, for every session (even if we didn't register keys for this - /// session). + /// session). This MUST only be called after slash report publication, or after that process + /// times out (not once the key is done with regards to the external network). pub fn retire_session( &mut self, txn: &mut impl DbTxn, @@ -324,7 +352,7 @@ impl Signers { txn.commit(); if let Some(tasks) = self.tasks.get(&session) { - tasks.cosign.run_now(); + tasks.cosigner.run_now(); } } @@ -335,9 +363,9 @@ impl Signers { &mut self, mut txn: impl DbTxn, session: Session, - slash_report: Vec, + slash_report: &Vec, ) { - db::SlashReport::send(&mut txn, session, &slash_report); + db::SlashReport::send(&mut txn, session, slash_report); txn.commit(); if let Some(tasks) = self.tasks.get(&session) { diff --git a/processor/signers/src/slash_report.rs b/processor/signers/src/slash_report.rs new file mode 100644 index 00000000..bdb6cdba --- /dev/null +++ b/processor/signers/src/slash_report.rs @@ -0,0 +1,120 @@ +use core::marker::PhantomData; + +use ciphersuite::Ristretto; +use frost::dkg::ThresholdKeys; + +use scale::Encode; +use serai_primitives::Signature; +use serai_validator_sets_primitives::{ + Session, ValidatorSet, SlashReport as SlashReportStruct, report_slashes_message, +}; + +use serai_db::{DbTxn, Db}; + +use messages::sign::VariantSignId; + +use primitives::task::ContinuallyRan; +use scanner::ScannerFeed; + +use frost_attempt_manager::*; + +use crate::{ + db::{ + SlashReport, SlashReportSignature, CoordinatorToSlashReportSignerMessages, + SlashReportSignerToCoordinatorMessages, + }, + WrappedSchnorrkelMachine, +}; + +// Fetches slash_reportes to sign and signs them. +#[allow(non_snake_case)] +pub(crate) struct SlashReportSignerTask { + db: D, + _S: PhantomData, + + session: Session, + keys: Vec>, + + has_slash_report: bool, + attempt_manager: AttemptManager, +} + +impl SlashReportSignerTask { + pub(crate) fn new(db: D, session: Session, keys: Vec>) -> Self { + let attempt_manager = AttemptManager::new( + db.clone(), + session, + keys.first().expect("creating a slash_report signer with 0 keys").params().i(), + ); + + Self { db, _S: PhantomData, session, keys, has_slash_report: false, attempt_manager } + } +} + +#[async_trait::async_trait] +impl ContinuallyRan for SlashReportSignerTask { + async fn run_iteration(&mut self) -> Result { + let mut iterated = false; + + // Check for the slash report to sign + if !self.has_slash_report { + let mut txn = self.db.txn(); + let Some(slash_report) = SlashReport::try_recv(&mut txn, self.session) else { + return Ok(false); + }; + // We only commit this upon successfully signing this slash report + drop(txn); + iterated = true; + + self.has_slash_report = true; + + let mut machines = Vec::with_capacity(self.keys.len()); + { + let message = report_slashes_message( + &ValidatorSet { network: S::NETWORK, session: self.session }, + &SlashReportStruct(slash_report.try_into().unwrap()), + ); + for keys in &self.keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone())); + } + } + let mut txn = self.db.txn(); + for msg in self.attempt_manager.register(VariantSignId::SlashReport(self.session), machines) { + SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + txn.commit(); + } + + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToSlashReportSignerMessages::try_recv(&mut txn, self.session) + else { + break; + }; + iterated = true; + + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { + SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature { id, signature } => { + let VariantSignId::SlashReport(session) = id else { + panic!("SlashReportSignerTask signed a non-SlashReport") + }; + assert_eq!(session, self.session); + // Drain the channel + SlashReport::try_recv(&mut txn, self.session).unwrap(); + // Send the signature + SlashReportSignature::send(&mut txn, session, &Signature::from(signature).encode()); + } + } + + txn.commit(); + } + + Ok(iterated) + } +}