diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs
index 6f336bcc..97500b4e 100644
--- a/coordinator/src/db.rs
+++ b/coordinator/src/db.rs
@@ -7,7 +7,7 @@ use dkg::Participant;
 use serai_client::{
   primitives::NetworkId,
-  validator_sets::primitives::{Session, ValidatorSet},
+  validator_sets::primitives::{Session, ValidatorSet, KeyPair},
 };
 
 use serai_cosign::SignedCosign;
@@ -78,6 +78,8 @@ create_db! {
   Coordinator {
     LastProcessorMessage: (network: NetworkId) -> u64,
     // Cosigns we produced and tried to intake yet incurred an error while doing so
     ErroneousCosigns: () -> Vec<SignedCosign>,
+    // The keys to confirm and set on the Serai network
+    KeysToConfirm: (set: ValidatorSet) -> KeyPair,
   }
 }
@@ -95,24 +97,39 @@ mod _internal_db {
   db_channel! {
     Coordinator {
-      // Tributary transactions to publish
-      TributaryTransactions: (set: ValidatorSet) -> Transaction,
+      // Tributary transactions to publish from Processor messages
+      TributaryTransactionsFromProcessorMessages: (set: ValidatorSet) -> Transaction,
+      // Tributary transactions to publish from the DKG confirmation task
+      TributaryTransactionsFromDkgConfirmation: (set: ValidatorSet) -> Transaction,
       // Participants to remove
       RemoveParticipant: (set: ValidatorSet) -> Participant,
     }
   }
 }
 
-pub(crate) struct TributaryTransactions;
-impl TributaryTransactions {
+pub(crate) struct TributaryTransactionsFromProcessorMessages;
+impl TributaryTransactionsFromProcessorMessages {
   pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
     // If this set has yet to be retired, send this transaction
     if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
-      _internal_db::TributaryTransactions::send(txn, set, tx);
+      _internal_db::TributaryTransactionsFromProcessorMessages::send(txn, set, tx);
     }
   }
   pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
-    _internal_db::TributaryTransactions::try_recv(txn, set)
+    _internal_db::TributaryTransactionsFromProcessorMessages::try_recv(txn, set)
+  }
+}
+
+pub(crate) struct TributaryTransactionsFromDkgConfirmation;
+impl TributaryTransactionsFromDkgConfirmation {
+  pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
+    // If this set has yet to be retired, send this transaction
+    if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
+      _internal_db::TributaryTransactionsFromDkgConfirmation::send(txn, set, tx);
+    }
+  }
+  pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
+    _internal_db::TributaryTransactionsFromDkgConfirmation::try_recv(txn, set)
   }
 }
diff --git a/coordinator/src/dkg_confirmation.rs b/coordinator/src/dkg_confirmation.rs
new file mode 100644
index 00000000..e09b0a4d
--- /dev/null
+++ b/coordinator/src/dkg_confirmation.rs
@@ -0,0 +1,442 @@
+use core::{ops::Deref, future::Future};
+use std::{boxed::Box, sync::Arc, collections::HashMap};
+
+use zeroize::Zeroizing;
+use rand_core::OsRng;
+use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
+use frost_schnorrkel::{
+  frost::{
+    dkg::{Participant, musig::musig},
+    FrostError,
+    sign::*,
+  },
+  Schnorrkel,
+};
+
+use serai_db::{DbTxn, Db as DbTrait};
+
+use serai_client::{
+  primitives::SeraiAddress,
+  validator_sets::primitives::{ValidatorSet, musig_context, set_keys_message},
+  SeraiError, Serai,
+};
+
+use serai_task::ContinuallyRan;
+
+use serai_coordinator_substrate::{NewSetInformation, Keys};
+use serai_coordinator_tributary::{Transaction, DkgConfirmationMessages};
+
+use crate::{KeysToConfirm, TributaryTransactionsFromDkgConfirmation};
+
+fn schnorrkel() -> Schnorrkel {
+  Schnorrkel::new(b"substrate") // TODO: Pull the constant for this
+}
+
+fn our_i(
+  set: &NewSetInformation,
+  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+  data: &HashMap<Participant, Vec<u8>>,
+) -> Participant {
+  let public = SeraiAddress((Ristretto::generator() * key.deref()).to_bytes());
+
+  let mut our_i = None;
+  for participant in data.keys() {
+    let validator_index = usize::from(u16::from(*participant) - 1);
+    let (validator, _weight) = set.validators[validator_index];
+    if validator == public {
+      our_i = Some(*participant);
+    }
+  }
+  our_i.unwrap()
+}
+
+// Take a HashMap of participations with non-contiguous Participants and convert them to a
+// contiguous sequence.
+//
+// The input data is expected to not include our own data, which also won't be in the output data.
+//
+// Returns the transformed data, keyed by the contiguous Participants. On error, yields the
+// original Participant whose data failed to be read.
+fn make_contiguous<T>(
+  our_i: Participant,
+  mut data: HashMap<Participant, Vec<u8>>,
+  transform: impl Fn(Vec<u8>) -> std::io::Result<T>,
+) -> Result<HashMap<Participant, T>, Participant> {
+  assert!(!data.contains_key(&our_i));
+
+  let mut ordered_participants = data.keys().copied().collect::<Vec<_>>();
+  ordered_participants.sort_by_key(|participant| u16::from(*participant));
+
+  let mut our_i = Some(our_i);
+  let mut contiguous = HashMap::new();
+  let mut i = 1;
+  for participant in ordered_participants {
+    // If this is the first participant after our own index, increment to account for our index
+    if let Some(our_i_value) = our_i {
+      if u16::from(participant) > u16::from(our_i_value) {
+        i += 1;
+        our_i = None;
+      }
+    }
+
+    let contiguous_index = Participant::new(i).unwrap();
+    let data = match transform(data.remove(&participant).unwrap()) {
+      Ok(data) => data,
+      Err(_) => Err(participant)?,
+    };
+    contiguous.insert(contiguous_index, data);
+    i += 1;
+  }
+  Ok(contiguous)
+}
+
+fn handle_frost_error<T>(result: Result<T, FrostError>) -> Result<T, Participant> {
+  match &result {
+    Ok(_) => Ok(result.unwrap()),
+    Err(FrostError::InvalidPreprocess(participant) | FrostError::InvalidShare(participant)) => {
+      Err(*participant)
+    }
+    // All of these should be unreachable
+    Err(
+      FrostError::InternalError(_) |
+      FrostError::InvalidParticipant(_, _) |
+      FrostError::InvalidSigningSet(_) |
+      FrostError::InvalidParticipantQuantity(_, _) |
+      FrostError::DuplicatedParticipant(_) |
+      FrostError::MissingParticipant(_),
+    ) => {
+      result.unwrap();
+      unreachable!("continued execution after unwrapping Result::Err");
+    }
+  }
+}
+
+#[rustfmt::skip]
+enum Signer {
+  Preprocess { attempt: u32, seed: CachedPreprocess, preprocess: [u8; 64] },
+  Share {
+    attempt: u32,
+    musig_validators: Vec<SeraiAddress>,
+    share: [u8; 32],
+    machine: Box<AlgorithmSignatureMachine<Ristretto, Schnorrkel>>,
+  },
+}
+
+/// Performs the DKG Confirmation protocol.
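+///
+/// Once the Processor reports its generated key pair, this task runs a two-round MuSig signing
+/// protocol over the Tributary (a preprocess, then a signature share) to produce a Schnorrkel
+/// signature confirming those keys. The signature is queued for publication to Serai, and
+/// validators who provide invalid preprocesses/shares are reported for removal.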
+pub(crate) struct ConfirmDkgTask<CD: DbTrait, TD: DbTrait> {
+  db: CD,
+
+  set: NewSetInformation,
+  tributary_db: TD,
+
+  serai: Arc<Serai>,
+
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  signer: Option<Signer>,
+}
+
+impl<CD: DbTrait, TD: DbTrait> ConfirmDkgTask<CD, TD> {
+  pub(crate) fn new(
+    db: CD,
+    set: NewSetInformation,
+    tributary_db: TD,
+    serai: Arc<Serai>,
+    key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  ) -> Self {
+    Self { db, set, tributary_db, serai, key, signer: None }
+  }
+
+  fn slash(db: &mut CD, set: ValidatorSet, validator: SeraiAddress) {
+    let mut txn = db.txn();
+    TributaryTransactionsFromDkgConfirmation::send(
+      &mut txn,
+      set,
+      &Transaction::RemoveParticipant { participant: validator, signed: Default::default() },
+    );
+    txn.commit();
+  }
+
+  fn preprocess(
+    db: &mut CD,
+    set: ValidatorSet,
+    attempt: u32,
+    key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+    signer: &mut Option<Signer>,
+  ) {
+    // Perform the preprocess
+    let (machine, preprocess) = AlgorithmMachine::new(
+      schnorrkel(),
+      // We use a 1-of-1 MuSig here as we don't know who will actually be in this MuSig yet
+      musig(&musig_context(set), key, &[Ristretto::generator() * key.deref()]).unwrap().into(),
+    )
+    .preprocess(&mut OsRng);
+    // We cache the machine so this preprocess can be reused in a distinct machine with the
+    // actual MuSig parameters
+    let seed = machine.cache();
+
+    let mut preprocess_bytes = [0u8; 64];
+    preprocess_bytes.copy_from_slice(&preprocess.serialize());
+    let preprocess = preprocess_bytes;
+
+    let mut txn = db.txn();
+    // If this attempt has already been preprocessed for, the Tributary will de-duplicate it
+    // This may mean the Tributary preprocess is distinct from ours, but we check for that later
+    TributaryTransactionsFromDkgConfirmation::send(
+      &mut txn,
+      set,
+      &Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed: Default::default() },
+    );
+    txn.commit();
+
+    *signer = Some(Signer::Preprocess { attempt, seed, preprocess });
+  }
+}
+
+impl<CD: DbTrait, TD: DbTrait> ContinuallyRan for ConfirmDkgTask<CD, TD> {
+  type Error = SeraiError;
+
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
+    async move {
+      let mut made_progress = false;
+
+      // If we were sent a key to set, create the signer for it
+      if self.signer.is_none() && KeysToConfirm::get(&self.db, self.set.set).is_some() {
+        // Create and publish the initial preprocess
+        Self::preprocess(&mut self.db, self.set.set, 0, &self.key, &mut self.signer);
+
+        made_progress = true;
+      }
+
+      // If we have keys to confirm, handle all of the confirmation messages from the Tributary
+      if let Some(key_pair) = KeysToConfirm::get(&self.db, self.set.set) {
+        loop {
+          let mut tributary_txn = self.tributary_db.txn();
+          let Some(msg) = DkgConfirmationMessages::try_recv(&mut tributary_txn, self.set.set)
+          else {
+            break;
+          };
+
+          match msg {
+            messages::sign::CoordinatorMessage::Reattempt {
+              id: messages::sign::SignId { attempt, .. },
+            } => {
+              // Create and publish the preprocess for the specified attempt
+              Self::preprocess(&mut self.db, self.set.set, attempt, &self.key, &mut self.signer);
+            }
+            messages::sign::CoordinatorMessage::Preprocesses {
+              id: messages::sign::SignId { attempt, .. },
+              mut preprocesses,
+            } => {
+              // Confirm the preprocess we're expected to sign with is the one we locally have
+              // It may be different if we rebooted and made a second preprocess for this attempt
+              let Some(Signer::Preprocess { attempt: our_attempt, seed, preprocess }) =
+                self.signer.take()
+              else {
+                // If this message is not expected, commit the txn to drop it and move on
+                // At some point, we'll get a Reattempt and reset
+                tributary_txn.commit();
+                break;
+              };
+
+              // Determine the MuSig key signed with
+              let musig_validators = {
+                let mut ordered_participants = preprocesses.keys().copied().collect::<Vec<_>>();
+                ordered_participants.sort_by_key(|participant| u16::from(*participant));
+
+                let mut res = vec![];
+                for participant in ordered_participants {
+                  let (validator, _weight) =
+                    self.set.validators[usize::from(u16::from(participant) - 1)];
+                  res.push(validator);
+                }
+                res
+              };
+
+              let musig_public_keys = musig_validators
+                .iter()
+                .map(|key| {
+                  Ristretto::read_G(&mut key.0.as_slice())
+                    .expect("Serai validator had invalid public key")
+                })
+                .collect::<Vec<_>>();
+
+              let keys =
+                musig(&musig_context(self.set.set), &self.key, &musig_public_keys).unwrap().into();
+
+              // Rebuild the machine
+              let (machine, preprocess_from_cache) =
+                AlgorithmSignMachine::from_cache(schnorrkel(), keys, seed);
+              assert_eq!(preprocess.as_slice(), preprocess_from_cache.serialize().as_slice());
+
+              // Ensure this is a consistent signing session
+              let our_i = our_i(&self.set, &self.key, &preprocesses);
+              let consistent = (attempt == our_attempt) &&
+                (preprocesses.remove(&our_i).unwrap().as_slice() == preprocess.as_slice());
+              if !consistent {
+                tributary_txn.commit();
+                break;
+              }
+
+              // Reformat the preprocesses into the expected format for MuSig
+              let preprocesses = match make_contiguous(our_i, preprocesses, |preprocess| {
+                machine.read_preprocess(&mut preprocess.as_slice())
+              }) {
+                Ok(preprocesses) => preprocesses,
+                // This yields the *original participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    self.set.validators[usize::from(u16::from(participant) - 1)].0,
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              };
+
+              // Calculate our share
+              let (machine, share) = match handle_frost_error(
+                machine.sign(preprocesses, &set_keys_message(&self.set.set, &key_pair)),
+              ) {
+                Ok((machine, share)) => (machine, share),
+                // This yields the *MuSig participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    musig_validators[usize::from(u16::from(participant) - 1)],
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              };
+
+              // Send our share
+              let share = <[u8; 32]>::try_from(share.serialize()).unwrap();
+              let mut txn = self.db.txn();
+              TributaryTransactionsFromDkgConfirmation::send(
+                &mut txn,
+                self.set.set,
+                &Transaction::DkgConfirmationShare { attempt, share, signed: Default::default() },
+              );
+              txn.commit();
+
+              self.signer = Some(Signer::Share {
+                attempt,
+                musig_validators,
+                share,
+                machine: Box::new(machine),
+              });
+            }
+            messages::sign::CoordinatorMessage::Shares {
+              id: messages::sign::SignId { attempt, .. },
+              mut shares,
+            } => {
+              let Some(Signer::Share { attempt: our_attempt, musig_validators, share, machine }) =
+                self.signer.take()
+              else {
+                tributary_txn.commit();
+                break;
+              };
+
+              // Ensure this is a consistent signing session
+              let our_i = our_i(&self.set, &self.key, &shares);
+              let consistent = (attempt == our_attempt) &&
+                (shares.remove(&our_i).unwrap().as_slice() == share.as_slice());
+              if !consistent {
+                tributary_txn.commit();
+                break;
+              }
+
+              // Reformat the shares into the expected format for MuSig
+              let shares = match make_contiguous(our_i, shares, |share| {
+                machine.read_share(&mut share.as_slice())
+              }) {
+                Ok(shares) => shares,
+                // This yields the *original participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    self.set.validators[usize::from(u16::from(participant) - 1)].0,
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              };
+
+              match handle_frost_error(machine.complete(shares)) {
+                Ok(signature) => {
+                  // Create the bitvec of the participants
+                  let mut signature_participants;
+                  {
+                    use bitvec::prelude::*;
+                    signature_participants = bitvec![u8, Lsb0; 0; 0];
+                    let mut i = 0;
+                    for (validator, _) in &self.set.validators {
+                      if Some(validator) == musig_validators.get(i) {
+                        signature_participants.push(true);
+                        i += 1;
+                      } else {
+                        signature_participants.push(false);
+                      }
+                    }
+                  }
+
+                  // This is safe to call multiple times as it'll just change which *valid*
+                  // signature to publish
+                  let mut txn = self.db.txn();
+                  Keys::set(
+                    &mut txn,
+                    self.set.set,
+                    key_pair.clone(),
+                    signature_participants,
+                    signature.into(),
+                  );
+                  txn.commit();
+                }
+                // This yields the *MuSig participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    musig_validators[usize::from(u16::from(participant) - 1)],
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              }
+            }
+          }
+
+          // Because we successfully handled this message, note we made progress
+          made_progress = true;
+          tributary_txn.commit();
+        }
+      }
+
+      // Check if the key has been set on Serai
+      if KeysToConfirm::get(&self.db, self.set.set).is_some() {
+        let serai = self.serai.as_of_latest_finalized_block().await?;
+        let serai = serai.validator_sets();
+        let is_historic_set = serai.session(self.set.set.network).await?.map(|session| session.0) >
+          Some(self.set.set.session.0);
+        let key_set_on_serai = is_historic_set || serai.keys(self.set.set).await?.is_some();
+        if key_set_on_serai {
+          // Take the keys to confirm so we never instantiate the signer again
+          let mut txn = self.db.txn();
+          KeysToConfirm::take(&mut txn, self.set.set);
+          txn.commit();
+
+          // Drop our own signer
+          // The task won't die until the Tributary does, but now it'll never do anything again
+          self.signer = None;
+
+          made_progress = true;
+        }
+      }
+
+      Ok(made_progress)
+    }
+  }
+}
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 22043392..9d7afb17 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -14,8 +14,8 @@ use borsh::BorshDeserialize;
 use tokio::sync::mpsc;
 
 use serai_client::{
-  primitives::{NetworkId, SeraiAddress, Signature},
-  validator_sets::primitives::ValidatorSet,
+  primitives::{NetworkId, PublicKey, SeraiAddress, Signature},
+  validator_sets::primitives::{ValidatorSet, KeyPair},
   Serai,
 };
 use message_queue::{Service, client::MessageQueue};
@@ -33,6 +33,7 @@ mod db;
 use db::*;
 
 mod tributary;
+mod dkg_confirmation;
 
 mod substrate;
 use substrate::SubstrateTask;
@@ -197,7 +198,7 @@ async fn handle_network(
       messages::ProcessorMessage::KeyGen(msg) => match msg {
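+        // Forward DKG participations to the Tributary; store a generated key pair for the DKG
+        // confirmation task to sign its publication to Serai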
        messages::key_gen::ProcessorMessage::Participation { session, participation } => {
           let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
+          TributaryTransactionsFromProcessorMessages::send(
             &mut txn,
             set,
             &Transaction::DkgParticipation { participation, signed: Signed::default() },
@@ -207,7 +208,18 @@ async fn handle_network(
           session,
           substrate_key,
           network_key,
-        } => todo!("TODO DkgConfirmationMessages, Transaction::DkgConfirmationPreprocess"),
+        } => {
+          KeysToConfirm::set(
+            &mut txn,
+            ValidatorSet { network, session },
+            &KeyPair(
+              PublicKey::from_raw(substrate_key),
+              network_key
+                .try_into()
+                .expect("generated a network key which exceeds the maximum key length"),
+            ),
+          );
+        }
         messages::key_gen::ProcessorMessage::Blame { session, participant } => {
           RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
         }
@@ -221,11 +233,15 @@ async fn handle_network(
         if id.attempt == 0 {
           // Batches are declared by their intent to be signed
           if let messages::sign::VariantSignId::Batch(hash) = id.id {
-            TributaryTransactions::send(&mut txn, set, &Transaction::Batch { hash });
+            TributaryTransactionsFromProcessorMessages::send(
+              &mut txn,
+              set,
+              &Transaction::Batch { hash },
+            );
           }
         }
 
-        TributaryTransactions::send(
+        TributaryTransactionsFromProcessorMessages::send(
           &mut txn,
           set,
           &Transaction::Sign {
@@ -239,7 +255,7 @@ async fn handle_network(
       }
       messages::sign::ProcessorMessage::Shares { id, shares } => {
         let set = ValidatorSet { network, session: id.session };
-        TributaryTransactions::send(
+        TributaryTransactionsFromProcessorMessages::send(
           &mut txn,
           set,
           &Transaction::Sign {
@@ -284,7 +300,7 @@ async fn handle_network(
       for (session, plans) in by_session {
         let set = ValidatorSet { network, session };
         SubstrateBlockPlans::set(&mut txn, set, block, &plans);
-        TributaryTransactions::send(
+        TributaryTransactionsFromProcessorMessages::send(
           &mut txn,
           set,
           &Transaction::SubstrateBlock { hash: block },
@@ -350,10 +366,13 @@ async fn main() {
   // Cleanup all historic Tributaries
   while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
     prune_tributary_db(to_cleanup);
+    // Remove the keys to confirm for this set
+    KeysToConfirm::take(&mut txn, to_cleanup);
     // Drain the cosign intents created for this set
     while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
     // Drain the transactions to publish for this set
-    while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
+    while TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, to_cleanup).is_some() {}
+    while TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, to_cleanup).is_some() {}
     // Drain the participants to remove for this set
     while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {}
     // Remove the SignSlashReport notification
@@ -442,6 +461,7 @@ async fn main() {
         p2p.clone(),
         &p2p_add_tributary_send,
         tributary,
+        serai.clone(),
         serai_key.clone(),
       )
       .await;
@@ -456,6 +476,7 @@ async fn main() {
       p2p: p2p.clone(),
       p2p_add_tributary: p2p_add_tributary_send.clone(),
      p2p_retire_tributary: p2p_retire_tributary_send.clone(),
+      serai: serai.clone(),
     })
     .continually_run(substrate_task_def, vec![]),
   );
diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs
index 7601b2cc..518db079 100644
--- a/coordinator/src/substrate.rs
+++ b/coordinator/src/substrate.rs
@@ -9,7 +9,10 @@ use tokio::sync::mpsc;
 
 use serai_db::{DbTxn, Db as DbTrait};
 
-use serai_client::validator_sets::primitives::{Session, ValidatorSet};
+use serai_client::{
+  validator_sets::primitives::{Session, ValidatorSet},
+  Serai,
+};
 
 use message_queue::{Service, Metadata, client::MessageQueue};
 
 use tributary_sdk::Tributary;
@@ -29,6 +32,7 @@ pub(crate) struct SubstrateTask<P: P2p> {
   pub(crate) p2p_add_tributary:
     mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
   pub(crate) p2p_retire_tributary: mpsc::UnboundedSender<ValidatorSet>,
+  pub(crate) serai: Arc<Serai>,
 }
 
 impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
@@ -146,6 +150,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
             self.p2p.clone(),
             &self.p2p_add_tributary,
             new_set,
+            self.serai.clone(),
             self.serai_key.clone(),
           )
           .await;
diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs
index 7162bbe1..fdb9c090 100644
--- a/coordinator/src/tributary.rs
+++ b/coordinator/src/tributary.rs
@@ -11,7 +11,7 @@ use tokio::sync::mpsc;
 use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
 use scale::Encode;
 
-use serai_client::validator_sets::primitives::ValidatorSet;
+use serai_client::{validator_sets::primitives::ValidatorSet, Serai};
 
 use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
 
@@ -26,7 +26,10 @@ use serai_coordinator_tributary::{
 };
 use serai_coordinator_p2p::P2p;
 
-use crate::{Db, TributaryTransactions, RemoveParticipant};
+use crate::{
+  Db, TributaryTransactionsFromProcessorMessages, TributaryTransactionsFromDkgConfirmation,
+  RemoveParticipant, dkg_confirmation::ConfirmDkgTask,
+};
 
 create_db! {
   Coordinator {
@@ -172,6 +175,7 @@ async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
     Ok(true | false) => {}
     // InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after
     // on-chain inclusion of the TX with nonce #n, so it is invalid within our context
+    // TODO: We need to handle publishing #n when #n is already on-chain
     Err(
       TransactionError::TooLargeTransaction |
       TransactionError::InvalidSigner |
@@ -192,7 +196,7 @@ async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
   true
 }
 
-/// Adds all of the transactions sent via `TributaryTransactions`.
+/// Adds all of the transactions sent via `TributaryTransactionsFromDkgConfirmation` and
+/// `TributaryTransactionsFromProcessorMessages`.
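+///
+/// Transactions from the DKG confirmation task are drained with priority, as they're only
+/// produced at the start of a Tributary's lifetime.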
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
   db: CD,
   tributary_db: TD,
@@ -210,7 +214,19 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
     // Provide/add all transactions sent our way
     loop {
       let mut txn = self.db.txn();
-      let Some(tx) = TributaryTransactions::try_recv(&mut txn, self.set.set) else { break };
+      // Prioritize transactions from the DKG confirmation task, as it only yields transactions
+      // at the start of the Tributary's lifetime, so its channel is quickly exhausted and then
+      // yields to the Processor-messages channel
+      let tx = match TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, self.set.set) {
+        Some(tx) => tx,
+        None => {
+          let Some(tx) =
+            TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, self.set.set)
+          else {
+            break;
+          };
+          tx
+        }
+      };
 
       let kind = tx.kind();
       match kind {
@@ -399,6 +415,8 @@ async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
 /// - Spawn the ScanTributaryTask
 /// - Spawn the ProvideCosignCosignedTransactionsTask
 /// - Spawn the TributaryProcessorMessagesTask
+/// - Spawn the AddTributaryTransactionsTask
+/// - Spawn the ConfirmDkgTask
 /// - Spawn the SignSlashReportTask
 /// - Iterate the scan task whenever a new block occurs (not just on the standard interval)
 pub(crate) async fn spawn_tributary<P: P2p>(
@@ -407,6 +425,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
   p2p: P,
   p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
   set: NewSetInformation,
+  serai: Arc<Serai>,
   serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
 ) {
   // Don't spawn retired Tributaries
@@ -485,30 +504,37 @@ pub(crate) async fn spawn_tributary<P: P2p>(
     .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
   );
 
-  // Spawn the sign slash report task
-  let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
+  // Spawn the add transactions task
+  let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
   tokio::spawn(
-    (SignSlashReportTask {
+    (AddTributaryTransactionsTask {
       db: db.clone(),
       tributary_db: tributary_db.clone(),
       tributary: tributary.clone(),
       set: set.clone(),
       key: serai_key.clone(),
     })
-    .continually_run(sign_slash_report_task_def, vec![]),
+    .continually_run(add_tributary_transactions_task_def, vec![]),
   );
 
-  // Spawn the add transactions task
-  let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
+  // Spawn the task to confirm the DKG result
+  let (confirm_dkg_task_def, confirm_dkg_task) = Task::new();
   tokio::spawn(
-    (AddTributaryTransactionsTask {
+    ConfirmDkgTask::new(db.clone(), set.clone(), tributary_db.clone(), serai, serai_key.clone())
+      .continually_run(confirm_dkg_task_def, vec![add_tributary_transactions_task]),
+  );
+
+  // Spawn the sign slash report task
+  let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
+  tokio::spawn(
+    (SignSlashReportTask {
       db: db.clone(),
       tributary_db,
       tributary: tributary.clone(),
       set: set.clone(),
       key: serai_key,
     })
-    .continually_run(add_tributary_transactions_task_def, vec![]),
+    .continually_run(sign_slash_report_task_def, vec![]),
   );
 
   // Whenever a new block occurs, immediately run the scan task
@@ -520,10 +546,6 @@ pub(crate) async fn spawn_tributary<P: P2p>(
     set.set,
     tributary,
     scan_tributary_task,
-    vec![
-      provide_cosign_cosigned_transactions_task,
-      sign_slash_report_task,
-      add_tributary_transactions_task,
-    ],
+    vec![provide_cosign_cosigned_transactions_task, confirm_dkg_task, sign_slash_report_task],
   ));
 }