From 19b87c7f5acab14bbc4ee094ef59ce520be31f98 Mon Sep 17 00:00:00 2001
From: Luke Parker <lukeparker5132@gmail.com>
Date: Wed, 15 Jan 2025 20:29:57 -0500
Subject: [PATCH] Add the DKG confirmation flow

Finishes the coordinator redo
---
 coordinator/src/db.rs               |  31 +-
 coordinator/src/dkg_confirmation.rs | 442 ++++++++++++++++++++++++++++
 coordinator/src/main.rs             |  39 ++-
 coordinator/src/substrate.rs        |   7 +-
 coordinator/src/tributary.rs        |  56 ++--
 5 files changed, 541 insertions(+), 34 deletions(-)
 create mode 100644 coordinator/src/dkg_confirmation.rs

diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs
index 6f336bcc..97500b4e 100644
--- a/coordinator/src/db.rs
+++ b/coordinator/src/db.rs
@@ -7,7 +7,7 @@ use dkg::Participant;
 
 use serai_client::{
   primitives::NetworkId,
-  validator_sets::primitives::{Session, ValidatorSet},
+  validator_sets::primitives::{Session, ValidatorSet, KeyPair},
 };
 
 use serai_cosign::SignedCosign;
@@ -78,6 +78,8 @@ create_db! {
     LastProcessorMessage: (network: NetworkId) -> u64,
     // Cosigns we produced and tried to intake yet incurred an error while doing so
     ErroneousCosigns: () -> Vec<SignedCosign>,
+    // The keys to confirm and set on the Serai network
+    KeysToConfirm: (set: ValidatorSet) -> KeyPair,
   }
 }
 
@@ -95,24 +97,39 @@ mod _internal_db {
 
   db_channel! {
     Coordinator {
-      // Tributary transactions to publish
-      TributaryTransactions: (set: ValidatorSet) -> Transaction,
+      // Tributary transactions to publish from the Processor messages
+      TributaryTransactionsFromProcessorMessages: (set: ValidatorSet) -> Transaction,
+      // Tributary transactions to publish from the DKG confirmation task
+      TributaryTransactionsFromDkgConfirmation: (set: ValidatorSet) -> Transaction,
       // Participants to remove
       RemoveParticipant: (set: ValidatorSet) -> Participant,
     }
   }
 }
 
-pub(crate) struct TributaryTransactions;
-impl TributaryTransactions {
+pub(crate) struct TributaryTransactionsFromProcessorMessages;
+impl TributaryTransactionsFromProcessorMessages {
   pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
     // If this set has yet to be retired, send this transaction
     if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
-      _internal_db::TributaryTransactions::send(txn, set, tx);
+      _internal_db::TributaryTransactionsFromProcessorMessages::send(txn, set, tx);
     }
   }
   pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
-    _internal_db::TributaryTransactions::try_recv(txn, set)
+    _internal_db::TributaryTransactionsFromProcessorMessages::try_recv(txn, set)
+  }
+}
+
+pub(crate) struct TributaryTransactionsFromDkgConfirmation;
+impl TributaryTransactionsFromDkgConfirmation {
+  pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
+    // If this set has yet to be retired, send this transaction
+    if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
+      _internal_db::TributaryTransactionsFromDkgConfirmation::send(txn, set, tx);
+    }
+  }
+  pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
+    _internal_db::TributaryTransactionsFromDkgConfirmation::try_recv(txn, set)
   }
 }
 
diff --git a/coordinator/src/dkg_confirmation.rs b/coordinator/src/dkg_confirmation.rs
new file mode 100644
index 00000000..e09b0a4d
--- /dev/null
+++ b/coordinator/src/dkg_confirmation.rs
@@ -0,0 +1,442 @@
+use core::{ops::Deref, future::Future};
+use std::{boxed::Box, sync::Arc, collections::HashMap};
+
+use zeroize::Zeroizing;
+use rand_core::OsRng;
+use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
+use frost_schnorrkel::{
+  frost::{
+    dkg::{Participant, musig::musig},
+    FrostError,
+    sign::*,
+  },
+  Schnorrkel,
+};
+
+use serai_db::{DbTxn, Db as DbTrait};
+
+use serai_client::{
+  primitives::SeraiAddress,
+  validator_sets::primitives::{ValidatorSet, musig_context, set_keys_message},
+  SeraiError, Serai,
+};
+
+use serai_task::ContinuallyRan;
+
+use serai_coordinator_substrate::{NewSetInformation, Keys};
+use serai_coordinator_tributary::{Transaction, DkgConfirmationMessages};
+
+use crate::{KeysToConfirm, TributaryTransactionsFromDkgConfirmation};
+
+fn schnorrkel() -> Schnorrkel {
+  Schnorrkel::new(b"substrate") // TODO: Pull the constant for this
+}
+
+fn our_i(
+  set: &NewSetInformation,
+  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+  data: &HashMap<Participant, Vec<u8>>,
+) -> Participant {
+  let public = SeraiAddress((Ristretto::generator() * key.deref()).to_bytes());
+
+  let mut our_i = None;
+  for participant in data.keys() {
+    let validator_index = usize::from(u16::from(*participant) - 1);
+    let (validator, _weight) = set.validators[validator_index];
+    if validator == public {
+      our_i = Some(*participant);
+    }
+  }
+  our_i.unwrap()
+}
+
+// Take a HashMap of participations with non-contiguous Participants and convert them to a
+// contiguous sequence.
+//
+// The input data is expected to not include our own data, which also won't be in the output data.
+//
+// Returns the mapping from the contiguous Participants to the original Participants.
+fn make_contiguous<T>(
+  our_i: Participant,
+  mut data: HashMap<Participant, Vec<u8>>,
+  transform: impl Fn(Vec<u8>) -> std::io::Result<T>,
+) -> Result<HashMap<Participant, T>, Participant> {
+  assert!(!data.contains_key(&our_i));
+
+  let mut ordered_participants = data.keys().copied().collect::<Vec<_>>();
+  ordered_participants.sort_by_key(|participant| u16::from(*participant));
+
+  let mut our_i = Some(our_i);
+  let mut contiguous = HashMap::new();
+  let mut i = 1;
+  for participant in ordered_participants {
+    // If this is the first participant after our own index, increment to account for our index
+    if let Some(our_i_value) = our_i {
+      if u16::from(participant) > u16::from(our_i_value) {
+        i += 1;
+        our_i = None;
+      }
+    }
+
+    let contiguous_index = Participant::new(i).unwrap();
+    let data = match transform(data.remove(&participant).unwrap()) {
+      Ok(data) => data,
+      Err(_) => Err(participant)?,
+    };
+    contiguous.insert(contiguous_index, data);
+    i += 1;
+  }
+  Ok(contiguous)
+}
+
+fn handle_frost_error<T>(result: Result<T, FrostError>) -> Result<T, Participant> {
+  match &result {
+    Ok(_) => Ok(result.unwrap()),
+    Err(FrostError::InvalidPreprocess(participant) | FrostError::InvalidShare(participant)) => {
+      Err(*participant)
+    }
+    // All of these should be unreachable
+    Err(
+      FrostError::InternalError(_) |
+      FrostError::InvalidParticipant(_, _) |
+      FrostError::InvalidSigningSet(_) |
+      FrostError::InvalidParticipantQuantity(_, _) |
+      FrostError::DuplicatedParticipant(_) |
+      FrostError::MissingParticipant(_),
+    ) => {
+      result.unwrap();
+      unreachable!("continued execution after unwrapping Result::Err");
+    }
+  }
+}
+
+#[rustfmt::skip]
+enum Signer {
+  Preprocess { attempt: u32, seed: CachedPreprocess, preprocess: [u8; 64] },
+  Share {
+    attempt: u32,
+    musig_validators: Vec<SeraiAddress>,
+    share: [u8; 32],
+    machine: Box<AlgorithmSignatureMachine<Ristretto, Schnorrkel>>,
+  },
+}
+
+/// Performs the DKG Confirmation protocol.
+pub(crate) struct ConfirmDkgTask<CD: DbTrait, TD: DbTrait> {
+  db: CD,
+
+  set: NewSetInformation,
+  tributary_db: TD,
+
+  serai: Arc<Serai>,
+
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  signer: Option<Signer>,
+}
+
+impl<CD: DbTrait, TD: DbTrait> ConfirmDkgTask<CD, TD> {
+  pub(crate) fn new(
+    db: CD,
+    set: NewSetInformation,
+    tributary_db: TD,
+    serai: Arc<Serai>,
+    key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  ) -> Self {
+    Self { db, set, tributary_db, serai, key, signer: None }
+  }
+
+  fn slash(db: &mut CD, set: ValidatorSet, validator: SeraiAddress) {
+    let mut txn = db.txn();
+    TributaryTransactionsFromDkgConfirmation::send(
+      &mut txn,
+      set,
+      &Transaction::RemoveParticipant { participant: validator, signed: Default::default() },
+    );
+    txn.commit();
+  }
+
+  fn preprocess(
+    db: &mut CD,
+    set: ValidatorSet,
+    attempt: u32,
+    key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+    signer: &mut Option<Signer>,
+  ) {
+    // Perform the preprocess
+    let (machine, preprocess) = AlgorithmMachine::new(
+      schnorrkel(),
+      // We use a 1-of-1 Musig here as we don't know who will actually be in this Musig yet
+      musig(&musig_context(set), key, &[Ristretto::generator() * key.deref()]).unwrap().into(),
+    )
+    .preprocess(&mut OsRng);
+    // We take the preprocess so we can use it in a distinct machine with the actual Musig
+    // parameters
+    let seed = machine.cache();
+
+    let mut preprocess_bytes = [0u8; 64];
+    preprocess_bytes.copy_from_slice(&preprocess.serialize());
+    let preprocess = preprocess_bytes;
+
+    let mut txn = db.txn();
+    // If this attempt has already been preprocessed for, the Tributary will de-duplicate it
+    // This may mean the Tributary preprocess is distinct from ours, but we check for that later
+    TributaryTransactionsFromDkgConfirmation::send(
+      &mut txn,
+      set,
+      &Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed: Default::default() },
+    );
+    txn.commit();
+
+    *signer = Some(Signer::Preprocess { attempt, seed, preprocess });
+  }
+}
+
+impl<CD: DbTrait, TD: DbTrait> ContinuallyRan for ConfirmDkgTask<CD, TD> {
+  type Error = SeraiError;
+
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
+    async move {
+      let mut made_progress = false;
+
+      // If we were sent a key to set, create the signer for it
+      if self.signer.is_none() && KeysToConfirm::get(&self.db, self.set.set).is_some() {
+        // Create and publish the initial preprocess
+        Self::preprocess(&mut self.db, self.set.set, 0, &self.key, &mut self.signer);
+
+        made_progress = true;
+      }
+
+      // If we have keys to confirm, handle all messages from the tributary
+      if let Some(key_pair) = KeysToConfirm::get(&self.db, self.set.set) {
+        // Handle all messages from the Tributary
+        loop {
+          let mut tributary_txn = self.tributary_db.txn();
+          let Some(msg) = DkgConfirmationMessages::try_recv(&mut tributary_txn, self.set.set)
+          else {
+            break;
+          };
+
+          match msg {
+            messages::sign::CoordinatorMessage::Reattempt {
+              id: messages::sign::SignId { attempt, .. },
+            } => {
+              // Create and publish the preprocess for the specified attempt
+              Self::preprocess(&mut self.db, self.set.set, attempt, &self.key, &mut self.signer);
+            }
+            messages::sign::CoordinatorMessage::Preprocesses {
+              id: messages::sign::SignId { attempt, .. },
+              mut preprocesses,
+            } => {
+              // Confirm the preprocess we're expected to sign with is the one we locally have
+              // It may be different if we rebooted and made a second preprocess for this attempt
+              let Some(Signer::Preprocess { attempt: our_attempt, seed, preprocess }) =
+                self.signer.take()
+              else {
+                // If this message is not expected, commit the txn to drop it and move on
+                // At some point, we'll get a Reattempt and reset
+                tributary_txn.commit();
+                break;
+              };
+
+              // Determine the MuSig key signed with
+              let musig_validators = {
+                let mut ordered_participants = preprocesses.keys().copied().collect::<Vec<_>>();
+                ordered_participants.sort_by_key(|participant| u16::from(*participant));
+
+                let mut res = vec![];
+                for participant in ordered_participants {
+                  let (validator, _weight) =
+                    self.set.validators[usize::from(u16::from(participant) - 1)];
+                  res.push(validator);
+                }
+                res
+              };
+
+              let musig_public_keys = musig_validators
+                .iter()
+                .map(|key| {
+                  Ristretto::read_G(&mut key.0.as_slice())
+                    .expect("Serai validator had invalid public key")
+                })
+                .collect::<Vec<_>>();
+
+              let keys =
+                musig(&musig_context(self.set.set), &self.key, &musig_public_keys).unwrap().into();
+
+              // Rebuild the machine
+              let (machine, preprocess_from_cache) =
+                AlgorithmSignMachine::from_cache(schnorrkel(), keys, seed);
+              assert_eq!(preprocess.as_slice(), preprocess_from_cache.serialize().as_slice());
+
+              // Ensure this is a consistent signing session
+              let our_i = our_i(&self.set, &self.key, &preprocesses);
+              let consistent = (attempt == our_attempt) &&
+                (preprocesses.remove(&our_i).unwrap().as_slice() == preprocess.as_slice());
+              if !consistent {
+                tributary_txn.commit();
+                break;
+              }
+
+              // Reformat the preprocesses into the expected format for Musig
+              let preprocesses = match make_contiguous(our_i, preprocesses, |preprocess| {
+                machine.read_preprocess(&mut preprocess.as_slice())
+              }) {
+                Ok(preprocesses) => preprocesses,
+                // This yields the *original participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    self.set.validators[usize::from(u16::from(participant) - 1)].0,
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              };
+
+              // Calculate our share
+              let (machine, share) = match handle_frost_error(
+                machine.sign(preprocesses, &set_keys_message(&self.set.set, &key_pair)),
+              ) {
+                Ok((machine, share)) => (machine, share),
+                // This yields the *musig participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    musig_validators[usize::from(u16::from(participant) - 1)],
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              };
+
+              // Send our share
+              let share = <[u8; 32]>::try_from(share.serialize()).unwrap();
+              let mut txn = self.db.txn();
+              TributaryTransactionsFromDkgConfirmation::send(
+                &mut txn,
+                self.set.set,
+                &Transaction::DkgConfirmationShare { attempt, share, signed: Default::default() },
+              );
+              txn.commit();
+
+              self.signer = Some(Signer::Share {
+                attempt,
+                musig_validators,
+                share,
+                machine: Box::new(machine),
+              });
+            }
+            messages::sign::CoordinatorMessage::Shares {
+              id: messages::sign::SignId { attempt, .. },
+              mut shares,
+            } => {
+              let Some(Signer::Share { attempt: our_attempt, musig_validators, share, machine }) =
+                self.signer.take()
+              else {
+                tributary_txn.commit();
+                break;
+              };
+
+              // Ensure this is a consistent signing session
+              let our_i = our_i(&self.set, &self.key, &shares);
+              let consistent = (attempt == our_attempt) &&
+                (shares.remove(&our_i).unwrap().as_slice() == share.as_slice());
+              if !consistent {
+                tributary_txn.commit();
+                break;
+              }
+
+              // Reformat the shares into the expected format for Musig
+              let shares = match make_contiguous(our_i, shares, |share| {
+                machine.read_share(&mut share.as_slice())
+              }) {
+                Ok(shares) => shares,
+                // This yields the *original participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    self.set.validators[usize::from(u16::from(participant) - 1)].0,
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              };
+
+              match handle_frost_error(machine.complete(shares)) {
+                Ok(signature) => {
+                  // Create the bitvec of the participants
+                  let mut signature_participants;
+                  {
+                    use bitvec::prelude::*;
+                    signature_participants = bitvec![u8, Lsb0; 0; 0];
+                    let mut i = 0;
+                    for (validator, _) in self.set.validators {
+                      if Some(validator) == musig_validators.get(i) {
+                        signature_participants.push(true);
+                        i += 1;
+                      } else {
+                        signature_participants.push(false);
+                      }
+                    }
+                  }
+
+                  // This is safe to call multiple times as it'll just change which *valid*
+                  // signature to publish
+                  let mut txn = self.db.txn();
+                  Keys::set(
+                    &mut txn,
+                    self.set.set,
+                    key_pair.clone(),
+                    signature_participants,
+                    signature.into(),
+                  );
+                  txn.commit();
+                }
+                // This yields the *musig participant index*
+                Err(participant) => {
+                  Self::slash(
+                    &mut self.db,
+                    self.set.set,
+                    musig_validators[usize::from(u16::from(participant) - 1)],
+                  );
+                  tributary_txn.commit();
+                  break;
+                }
+              }
+            }
+          }
+
+          // Because we successfully handled this message, note we made proress
+          made_progress = true;
+          tributary_txn.commit();
+        }
+      }
+
+      // Check if the key has been set on Serai
+      if KeysToConfirm::get(&self.db, self.set.set).is_some() {
+        let serai = self.serai.as_of_latest_finalized_block().await?;
+        let serai = serai.validator_sets();
+        let is_historic_set = serai.session(self.set.set.network).await?.map(|session| session.0) >
+          Some(self.set.set.session.0);
+        let key_set_on_serai = is_historic_set || serai.keys(self.set.set).await?.is_some();
+        if key_set_on_serai {
+          // Take the keys to confirm so we never instantiate the signer again
+          let mut txn = self.db.txn();
+          KeysToConfirm::take(&mut txn, self.set.set);
+          txn.commit();
+
+          // Drop our own signer
+          // The task won't die until the Tributary does, but now it'll never do anything again
+          self.signer = None;
+
+          made_progress = true;
+        }
+      }
+
+      Ok(made_progress)
+    }
+  }
+}
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 22043392..9d7afb17 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -14,8 +14,8 @@ use borsh::BorshDeserialize;
 use tokio::sync::mpsc;
 
 use serai_client::{
-  primitives::{NetworkId, SeraiAddress, Signature},
-  validator_sets::primitives::ValidatorSet,
+  primitives::{NetworkId, PublicKey, SeraiAddress, Signature},
+  validator_sets::primitives::{ValidatorSet, KeyPair},
   Serai,
 };
 use message_queue::{Service, client::MessageQueue};
@@ -33,6 +33,7 @@ mod db;
 use db::*;
 
 mod tributary;
+mod dkg_confirmation;
 
 mod substrate;
 use substrate::SubstrateTask;
@@ -197,7 +198,7 @@ async fn handle_network(
       messages::ProcessorMessage::KeyGen(msg) => match msg {
         messages::key_gen::ProcessorMessage::Participation { session, participation } => {
           let set = ValidatorSet { network, session };
-          TributaryTransactions::send(
+          TributaryTransactionsFromProcessorMessages::send(
             &mut txn,
             set,
             &Transaction::DkgParticipation { participation, signed: Signed::default() },
@@ -207,7 +208,18 @@ async fn handle_network(
           session,
           substrate_key,
           network_key,
-        } => todo!("TODO DkgConfirmationMessages, Transaction::DkgConfirmationPreprocess"),
+        } => {
+          KeysToConfirm::set(
+            &mut txn,
+            ValidatorSet { network, session },
+            &KeyPair(
+              PublicKey::from_raw(substrate_key),
+              network_key
+                .try_into()
+                .expect("generated a network key which exceeds the maximum key length"),
+            ),
+          );
+        }
         messages::key_gen::ProcessorMessage::Blame { session, participant } => {
           RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
         }
@@ -221,11 +233,15 @@ async fn handle_network(
           if id.attempt == 0 {
             // Batches are declared by their intent to be signed
             if let messages::sign::VariantSignId::Batch(hash) = id.id {
-              TributaryTransactions::send(&mut txn, set, &Transaction::Batch { hash });
+              TributaryTransactionsFromProcessorMessages::send(
+                &mut txn,
+                set,
+                &Transaction::Batch { hash },
+              );
             }
           }
 
-          TributaryTransactions::send(
+          TributaryTransactionsFromProcessorMessages::send(
             &mut txn,
             set,
             &Transaction::Sign {
@@ -239,7 +255,7 @@ async fn handle_network(
         }
         messages::sign::ProcessorMessage::Shares { id, shares } => {
           let set = ValidatorSet { network, session: id.session };
-          TributaryTransactions::send(
+          TributaryTransactionsFromProcessorMessages::send(
             &mut txn,
             set,
             &Transaction::Sign {
@@ -284,7 +300,7 @@ async fn handle_network(
           for (session, plans) in by_session {
             let set = ValidatorSet { network, session };
             SubstrateBlockPlans::set(&mut txn, set, block, &plans);
-            TributaryTransactions::send(
+            TributaryTransactionsFromProcessorMessages::send(
               &mut txn,
               set,
               &Transaction::SubstrateBlock { hash: block },
@@ -350,10 +366,13 @@ async fn main() {
     // Cleanup all historic Tributaries
     while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
       prune_tributary_db(to_cleanup);
+      // Remove the keys to confirm for this network
+      KeysToConfirm::take(&mut txn, to_cleanup);
       // Drain the cosign intents created for this set
       while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
       // Drain the transactions to publish for this set
-      while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
+      while TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, to_cleanup).is_some() {}
+      while TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, to_cleanup).is_some() {}
       // Drain the participants to remove for this set
       while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {}
       // Remove the SignSlashReport notification
@@ -442,6 +461,7 @@ async fn main() {
       p2p.clone(),
       &p2p_add_tributary_send,
       tributary,
+      serai.clone(),
       serai_key.clone(),
     )
     .await;
@@ -456,6 +476,7 @@ async fn main() {
       p2p: p2p.clone(),
       p2p_add_tributary: p2p_add_tributary_send.clone(),
       p2p_retire_tributary: p2p_retire_tributary_send.clone(),
+      serai: serai.clone(),
     })
     .continually_run(substrate_task_def, vec![]),
   );
diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs
index 7601b2cc..518db079 100644
--- a/coordinator/src/substrate.rs
+++ b/coordinator/src/substrate.rs
@@ -9,7 +9,10 @@ use tokio::sync::mpsc;
 
 use serai_db::{DbTxn, Db as DbTrait};
 
-use serai_client::validator_sets::primitives::{Session, ValidatorSet};
+use serai_client::{
+  validator_sets::primitives::{Session, ValidatorSet},
+  Serai,
+};
 use message_queue::{Service, Metadata, client::MessageQueue};
 
 use tributary_sdk::Tributary;
@@ -29,6 +32,7 @@ pub(crate) struct SubstrateTask<P: P2p> {
   pub(crate) p2p_add_tributary:
     mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
   pub(crate) p2p_retire_tributary: mpsc::UnboundedSender<ValidatorSet>,
+  pub(crate) serai: Arc<Serai>,
 }
 
 impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
@@ -146,6 +150,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
           self.p2p.clone(),
           &self.p2p_add_tributary,
           new_set,
+          self.serai.clone(),
           self.serai_key.clone(),
         )
         .await;
diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs
index 7162bbe1..fdb9c090 100644
--- a/coordinator/src/tributary.rs
+++ b/coordinator/src/tributary.rs
@@ -11,7 +11,7 @@ use tokio::sync::mpsc;
 use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
 
 use scale::Encode;
-use serai_client::validator_sets::primitives::ValidatorSet;
+use serai_client::{validator_sets::primitives::ValidatorSet, Serai};
 
 use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
 
@@ -26,7 +26,10 @@ use serai_coordinator_tributary::{
 };
 use serai_coordinator_p2p::P2p;
 
-use crate::{Db, TributaryTransactions, RemoveParticipant};
+use crate::{
+  Db, TributaryTransactionsFromProcessorMessages, TributaryTransactionsFromDkgConfirmation,
+  RemoveParticipant, dkg_confirmation::ConfirmDkgTask,
+};
 
 create_db! {
   Coordinator {
@@ -172,6 +175,7 @@ async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
     Ok(true | false) => {}
     // InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after
     // on-chain inclusion of the TX with nonce #n, so it is invalid within our context
+    // TODO: We need to handle publishing #n when #n already on-chain
     Err(
       TransactionError::TooLargeTransaction |
       TransactionError::InvalidSigner |
@@ -192,7 +196,7 @@ async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
   true
 }
 
-/// Adds all of the transactions sent via `TributaryTransactions`.
+/// Adds all of the transactions sent via `TributaryTransactionsFromProcessorMessages`.
 pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
   db: CD,
   tributary_db: TD,
@@ -210,7 +214,19 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
       // Provide/add all transactions sent our way
       loop {
         let mut txn = self.db.txn();
-        let Some(tx) = TributaryTransactions::try_recv(&mut txn, self.set.set) else { break };
+        // This gives priority to DkgConfirmation as that will only yield transactions at the start
+        // of the Tributary, ensuring this will be exhausted and yield to ProcessorMessages
+        let tx = match TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, self.set.set) {
+          Some(tx) => tx,
+          None => {
+            let Some(tx) =
+              TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, self.set.set)
+            else {
+              break;
+            };
+            tx
+          }
+        };
 
         let kind = tx.kind();
         match kind {
@@ -399,6 +415,8 @@ async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
 /// - Spawn the ScanTributaryTask
 /// - Spawn the ProvideCosignCosignedTransactionsTask
 /// - Spawn the TributaryProcessorMessagesTask
+/// - Spawn the AddTributaryTransactionsTask
+/// - Spawn the ConfirmDkgTask
 /// - Spawn the SignSlashReportTask
 /// - Iterate the scan task whenever a new block occurs (not just on the standard interval)
 pub(crate) async fn spawn_tributary<P: P2p>(
@@ -407,6 +425,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
   p2p: P,
   p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
   set: NewSetInformation,
+  serai: Arc<Serai>,
   serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
 ) {
   // Don't spawn retired Tributaries
@@ -485,30 +504,37 @@ pub(crate) async fn spawn_tributary<P: P2p>(
       .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
   );
 
-  // Spawn the sign slash report task
-  let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
+  // Spawn the add transactions task
+  let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
   tokio::spawn(
-    (SignSlashReportTask {
+    (AddTributaryTransactionsTask {
       db: db.clone(),
       tributary_db: tributary_db.clone(),
       tributary: tributary.clone(),
       set: set.clone(),
       key: serai_key.clone(),
     })
-    .continually_run(sign_slash_report_task_def, vec![]),
+    .continually_run(add_tributary_transactions_task_def, vec![]),
   );
 
-  // Spawn the add transactions task
-  let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
+  // Spawn the task to confirm the DKG result
+  let (confirm_dkg_task_def, confirm_dkg_task) = Task::new();
   tokio::spawn(
-    (AddTributaryTransactionsTask {
+    ConfirmDkgTask::new(db.clone(), set.clone(), tributary_db.clone(), serai, serai_key.clone())
+      .continually_run(confirm_dkg_task_def, vec![add_tributary_transactions_task]),
+  );
+
+  // Spawn the sign slash report task
+  let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
+  tokio::spawn(
+    (SignSlashReportTask {
       db: db.clone(),
       tributary_db,
       tributary: tributary.clone(),
       set: set.clone(),
       key: serai_key,
     })
-    .continually_run(add_tributary_transactions_task_def, vec![]),
+    .continually_run(sign_slash_report_task_def, vec![]),
   );
 
   // Whenever a new block occurs, immediately run the scan task
@@ -520,10 +546,6 @@ pub(crate) async fn spawn_tributary<P: P2p>(
     set.set,
     tributary,
     scan_tributary_task,
-    vec![
-      provide_cosign_cosigned_transactions_task,
-      sign_slash_report_task,
-      add_tributary_transactions_task,
-    ],
+    vec![provide_cosign_cosigned_transactions_task, confirm_dkg_task, sign_slash_report_task],
   ));
 }