mirror of
https://github.com/serai-dex/serai.git
synced 2025-04-13 09:41:57 +00:00
Add the DKG confirmation flow
Finishes the coordinator redo
This commit is contained in:
parent
505f1b20a4
commit
19b87c7f5a
5 changed files with 541 additions and 34 deletions
coordinator/src
|
@ -7,7 +7,7 @@ use dkg::Participant;
|
|||
|
||||
use serai_client::{
|
||||
primitives::NetworkId,
|
||||
validator_sets::primitives::{Session, ValidatorSet},
|
||||
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
|
||||
};
|
||||
|
||||
use serai_cosign::SignedCosign;
|
||||
|
@ -78,6 +78,8 @@ create_db! {
|
|||
LastProcessorMessage: (network: NetworkId) -> u64,
|
||||
// Cosigns we produced and tried to intake yet incurred an error while doing so
|
||||
ErroneousCosigns: () -> Vec<SignedCosign>,
|
||||
// The keys to confirm and set on the Serai network
|
||||
KeysToConfirm: (set: ValidatorSet) -> KeyPair,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,24 +97,39 @@ mod _internal_db {
|
|||
|
||||
db_channel! {
|
||||
Coordinator {
|
||||
// Tributary transactions to publish
|
||||
TributaryTransactions: (set: ValidatorSet) -> Transaction,
|
||||
// Tributary transactions to publish from the Processor messages
|
||||
TributaryTransactionsFromProcessorMessages: (set: ValidatorSet) -> Transaction,
|
||||
// Tributary transactions to publish from the DKG confirmation task
|
||||
TributaryTransactionsFromDkgConfirmation: (set: ValidatorSet) -> Transaction,
|
||||
// Participants to remove
|
||||
RemoveParticipant: (set: ValidatorSet) -> Participant,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TributaryTransactions;
|
||||
impl TributaryTransactions {
|
||||
pub(crate) struct TributaryTransactionsFromProcessorMessages;
|
||||
impl TributaryTransactionsFromProcessorMessages {
|
||||
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
|
||||
// If this set has yet to be retired, send this transaction
|
||||
if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
|
||||
_internal_db::TributaryTransactions::send(txn, set, tx);
|
||||
_internal_db::TributaryTransactionsFromProcessorMessages::send(txn, set, tx);
|
||||
}
|
||||
}
|
||||
pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
|
||||
_internal_db::TributaryTransactions::try_recv(txn, set)
|
||||
_internal_db::TributaryTransactionsFromProcessorMessages::try_recv(txn, set)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TributaryTransactionsFromDkgConfirmation;
|
||||
impl TributaryTransactionsFromDkgConfirmation {
|
||||
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet, tx: &Transaction) {
|
||||
// If this set has yet to be retired, send this transaction
|
||||
if RetiredTributary::get(txn, set.network).map(|session| session.0) < Some(set.session.0) {
|
||||
_internal_db::TributaryTransactionsFromDkgConfirmation::send(txn, set, tx);
|
||||
}
|
||||
}
|
||||
pub(crate) fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<Transaction> {
|
||||
_internal_db::TributaryTransactionsFromDkgConfirmation::try_recv(txn, set)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
442
coordinator/src/dkg_confirmation.rs
Normal file
442
coordinator/src/dkg_confirmation.rs
Normal file
|
@ -0,0 +1,442 @@
|
|||
use core::{ops::Deref, future::Future};
|
||||
use std::{boxed::Box, sync::Arc, collections::HashMap};
|
||||
|
||||
use zeroize::Zeroizing;
|
||||
use rand_core::OsRng;
|
||||
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
|
||||
use frost_schnorrkel::{
|
||||
frost::{
|
||||
dkg::{Participant, musig::musig},
|
||||
FrostError,
|
||||
sign::*,
|
||||
},
|
||||
Schnorrkel,
|
||||
};
|
||||
|
||||
use serai_db::{DbTxn, Db as DbTrait};
|
||||
|
||||
use serai_client::{
|
||||
primitives::SeraiAddress,
|
||||
validator_sets::primitives::{ValidatorSet, musig_context, set_keys_message},
|
||||
SeraiError, Serai,
|
||||
};
|
||||
|
||||
use serai_task::ContinuallyRan;
|
||||
|
||||
use serai_coordinator_substrate::{NewSetInformation, Keys};
|
||||
use serai_coordinator_tributary::{Transaction, DkgConfirmationMessages};
|
||||
|
||||
use crate::{KeysToConfirm, TributaryTransactionsFromDkgConfirmation};
|
||||
|
||||
fn schnorrkel() -> Schnorrkel {
|
||||
Schnorrkel::new(b"substrate") // TODO: Pull the constant for this
|
||||
}
|
||||
|
||||
fn our_i(
|
||||
set: &NewSetInformation,
|
||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
data: &HashMap<Participant, Vec<u8>>,
|
||||
) -> Participant {
|
||||
let public = SeraiAddress((Ristretto::generator() * key.deref()).to_bytes());
|
||||
|
||||
let mut our_i = None;
|
||||
for participant in data.keys() {
|
||||
let validator_index = usize::from(u16::from(*participant) - 1);
|
||||
let (validator, _weight) = set.validators[validator_index];
|
||||
if validator == public {
|
||||
our_i = Some(*participant);
|
||||
}
|
||||
}
|
||||
our_i.unwrap()
|
||||
}
|
||||
|
||||
// Take a HashMap of participations with non-contiguous Participants and convert them to a
|
||||
// contiguous sequence.
|
||||
//
|
||||
// The input data is expected to not include our own data, which also won't be in the output data.
|
||||
//
|
||||
// Returns the mapping from the contiguous Participants to the original Participants.
|
||||
fn make_contiguous<T>(
|
||||
our_i: Participant,
|
||||
mut data: HashMap<Participant, Vec<u8>>,
|
||||
transform: impl Fn(Vec<u8>) -> std::io::Result<T>,
|
||||
) -> Result<HashMap<Participant, T>, Participant> {
|
||||
assert!(!data.contains_key(&our_i));
|
||||
|
||||
let mut ordered_participants = data.keys().copied().collect::<Vec<_>>();
|
||||
ordered_participants.sort_by_key(|participant| u16::from(*participant));
|
||||
|
||||
let mut our_i = Some(our_i);
|
||||
let mut contiguous = HashMap::new();
|
||||
let mut i = 1;
|
||||
for participant in ordered_participants {
|
||||
// If this is the first participant after our own index, increment to account for our index
|
||||
if let Some(our_i_value) = our_i {
|
||||
if u16::from(participant) > u16::from(our_i_value) {
|
||||
i += 1;
|
||||
our_i = None;
|
||||
}
|
||||
}
|
||||
|
||||
let contiguous_index = Participant::new(i).unwrap();
|
||||
let data = match transform(data.remove(&participant).unwrap()) {
|
||||
Ok(data) => data,
|
||||
Err(_) => Err(participant)?,
|
||||
};
|
||||
contiguous.insert(contiguous_index, data);
|
||||
i += 1;
|
||||
}
|
||||
Ok(contiguous)
|
||||
}
|
||||
|
||||
fn handle_frost_error<T>(result: Result<T, FrostError>) -> Result<T, Participant> {
|
||||
match &result {
|
||||
Ok(_) => Ok(result.unwrap()),
|
||||
Err(FrostError::InvalidPreprocess(participant) | FrostError::InvalidShare(participant)) => {
|
||||
Err(*participant)
|
||||
}
|
||||
// All of these should be unreachable
|
||||
Err(
|
||||
FrostError::InternalError(_) |
|
||||
FrostError::InvalidParticipant(_, _) |
|
||||
FrostError::InvalidSigningSet(_) |
|
||||
FrostError::InvalidParticipantQuantity(_, _) |
|
||||
FrostError::DuplicatedParticipant(_) |
|
||||
FrostError::MissingParticipant(_),
|
||||
) => {
|
||||
result.unwrap();
|
||||
unreachable!("continued execution after unwrapping Result::Err");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[rustfmt::skip]
|
||||
enum Signer {
|
||||
Preprocess { attempt: u32, seed: CachedPreprocess, preprocess: [u8; 64] },
|
||||
Share {
|
||||
attempt: u32,
|
||||
musig_validators: Vec<SeraiAddress>,
|
||||
share: [u8; 32],
|
||||
machine: Box<AlgorithmSignatureMachine<Ristretto, Schnorrkel>>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Performs the DKG Confirmation protocol.
|
||||
pub(crate) struct ConfirmDkgTask<CD: DbTrait, TD: DbTrait> {
|
||||
db: CD,
|
||||
|
||||
set: NewSetInformation,
|
||||
tributary_db: TD,
|
||||
|
||||
serai: Arc<Serai>,
|
||||
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
signer: Option<Signer>,
|
||||
}
|
||||
|
||||
impl<CD: DbTrait, TD: DbTrait> ConfirmDkgTask<CD, TD> {
|
||||
pub(crate) fn new(
|
||||
db: CD,
|
||||
set: NewSetInformation,
|
||||
tributary_db: TD,
|
||||
serai: Arc<Serai>,
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
) -> Self {
|
||||
Self { db, set, tributary_db, serai, key, signer: None }
|
||||
}
|
||||
|
||||
fn slash(db: &mut CD, set: ValidatorSet, validator: SeraiAddress) {
|
||||
let mut txn = db.txn();
|
||||
TributaryTransactionsFromDkgConfirmation::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::RemoveParticipant { participant: validator, signed: Default::default() },
|
||||
);
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
fn preprocess(
|
||||
db: &mut CD,
|
||||
set: ValidatorSet,
|
||||
attempt: u32,
|
||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
signer: &mut Option<Signer>,
|
||||
) {
|
||||
// Perform the preprocess
|
||||
let (machine, preprocess) = AlgorithmMachine::new(
|
||||
schnorrkel(),
|
||||
// We use a 1-of-1 Musig here as we don't know who will actually be in this Musig yet
|
||||
musig(&musig_context(set), key, &[Ristretto::generator() * key.deref()]).unwrap().into(),
|
||||
)
|
||||
.preprocess(&mut OsRng);
|
||||
// We take the preprocess so we can use it in a distinct machine with the actual Musig
|
||||
// parameters
|
||||
let seed = machine.cache();
|
||||
|
||||
let mut preprocess_bytes = [0u8; 64];
|
||||
preprocess_bytes.copy_from_slice(&preprocess.serialize());
|
||||
let preprocess = preprocess_bytes;
|
||||
|
||||
let mut txn = db.txn();
|
||||
// If this attempt has already been preprocessed for, the Tributary will de-duplicate it
|
||||
// This may mean the Tributary preprocess is distinct from ours, but we check for that later
|
||||
TributaryTransactionsFromDkgConfirmation::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed: Default::default() },
|
||||
);
|
||||
txn.commit();
|
||||
|
||||
*signer = Some(Signer::Preprocess { attempt, seed, preprocess });
|
||||
}
|
||||
}
|
||||
|
||||
impl<CD: DbTrait, TD: DbTrait> ContinuallyRan for ConfirmDkgTask<CD, TD> {
|
||||
type Error = SeraiError;
|
||||
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
|
||||
async move {
|
||||
let mut made_progress = false;
|
||||
|
||||
// If we were sent a key to set, create the signer for it
|
||||
if self.signer.is_none() && KeysToConfirm::get(&self.db, self.set.set).is_some() {
|
||||
// Create and publish the initial preprocess
|
||||
Self::preprocess(&mut self.db, self.set.set, 0, &self.key, &mut self.signer);
|
||||
|
||||
made_progress = true;
|
||||
}
|
||||
|
||||
// If we have keys to confirm, handle all messages from the tributary
|
||||
if let Some(key_pair) = KeysToConfirm::get(&self.db, self.set.set) {
|
||||
// Handle all messages from the Tributary
|
||||
loop {
|
||||
let mut tributary_txn = self.tributary_db.txn();
|
||||
let Some(msg) = DkgConfirmationMessages::try_recv(&mut tributary_txn, self.set.set)
|
||||
else {
|
||||
break;
|
||||
};
|
||||
|
||||
match msg {
|
||||
messages::sign::CoordinatorMessage::Reattempt {
|
||||
id: messages::sign::SignId { attempt, .. },
|
||||
} => {
|
||||
// Create and publish the preprocess for the specified attempt
|
||||
Self::preprocess(&mut self.db, self.set.set, attempt, &self.key, &mut self.signer);
|
||||
}
|
||||
messages::sign::CoordinatorMessage::Preprocesses {
|
||||
id: messages::sign::SignId { attempt, .. },
|
||||
mut preprocesses,
|
||||
} => {
|
||||
// Confirm the preprocess we're expected to sign with is the one we locally have
|
||||
// It may be different if we rebooted and made a second preprocess for this attempt
|
||||
let Some(Signer::Preprocess { attempt: our_attempt, seed, preprocess }) =
|
||||
self.signer.take()
|
||||
else {
|
||||
// If this message is not expected, commit the txn to drop it and move on
|
||||
// At some point, we'll get a Reattempt and reset
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
};
|
||||
|
||||
// Determine the MuSig key signed with
|
||||
let musig_validators = {
|
||||
let mut ordered_participants = preprocesses.keys().copied().collect::<Vec<_>>();
|
||||
ordered_participants.sort_by_key(|participant| u16::from(*participant));
|
||||
|
||||
let mut res = vec![];
|
||||
for participant in ordered_participants {
|
||||
let (validator, _weight) =
|
||||
self.set.validators[usize::from(u16::from(participant) - 1)];
|
||||
res.push(validator);
|
||||
}
|
||||
res
|
||||
};
|
||||
|
||||
let musig_public_keys = musig_validators
|
||||
.iter()
|
||||
.map(|key| {
|
||||
Ristretto::read_G(&mut key.0.as_slice())
|
||||
.expect("Serai validator had invalid public key")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let keys =
|
||||
musig(&musig_context(self.set.set), &self.key, &musig_public_keys).unwrap().into();
|
||||
|
||||
// Rebuild the machine
|
||||
let (machine, preprocess_from_cache) =
|
||||
AlgorithmSignMachine::from_cache(schnorrkel(), keys, seed);
|
||||
assert_eq!(preprocess.as_slice(), preprocess_from_cache.serialize().as_slice());
|
||||
|
||||
// Ensure this is a consistent signing session
|
||||
let our_i = our_i(&self.set, &self.key, &preprocesses);
|
||||
let consistent = (attempt == our_attempt) &&
|
||||
(preprocesses.remove(&our_i).unwrap().as_slice() == preprocess.as_slice());
|
||||
if !consistent {
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
}
|
||||
|
||||
// Reformat the preprocesses into the expected format for Musig
|
||||
let preprocesses = match make_contiguous(our_i, preprocesses, |preprocess| {
|
||||
machine.read_preprocess(&mut preprocess.as_slice())
|
||||
}) {
|
||||
Ok(preprocesses) => preprocesses,
|
||||
// This yields the *original participant index*
|
||||
Err(participant) => {
|
||||
Self::slash(
|
||||
&mut self.db,
|
||||
self.set.set,
|
||||
self.set.validators[usize::from(u16::from(participant) - 1)].0,
|
||||
);
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// Calculate our share
|
||||
let (machine, share) = match handle_frost_error(
|
||||
machine.sign(preprocesses, &set_keys_message(&self.set.set, &key_pair)),
|
||||
) {
|
||||
Ok((machine, share)) => (machine, share),
|
||||
// This yields the *musig participant index*
|
||||
Err(participant) => {
|
||||
Self::slash(
|
||||
&mut self.db,
|
||||
self.set.set,
|
||||
musig_validators[usize::from(u16::from(participant) - 1)],
|
||||
);
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// Send our share
|
||||
let share = <[u8; 32]>::try_from(share.serialize()).unwrap();
|
||||
let mut txn = self.db.txn();
|
||||
TributaryTransactionsFromDkgConfirmation::send(
|
||||
&mut txn,
|
||||
self.set.set,
|
||||
&Transaction::DkgConfirmationShare { attempt, share, signed: Default::default() },
|
||||
);
|
||||
txn.commit();
|
||||
|
||||
self.signer = Some(Signer::Share {
|
||||
attempt,
|
||||
musig_validators,
|
||||
share,
|
||||
machine: Box::new(machine),
|
||||
});
|
||||
}
|
||||
messages::sign::CoordinatorMessage::Shares {
|
||||
id: messages::sign::SignId { attempt, .. },
|
||||
mut shares,
|
||||
} => {
|
||||
let Some(Signer::Share { attempt: our_attempt, musig_validators, share, machine }) =
|
||||
self.signer.take()
|
||||
else {
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
};
|
||||
|
||||
// Ensure this is a consistent signing session
|
||||
let our_i = our_i(&self.set, &self.key, &shares);
|
||||
let consistent = (attempt == our_attempt) &&
|
||||
(shares.remove(&our_i).unwrap().as_slice() == share.as_slice());
|
||||
if !consistent {
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
}
|
||||
|
||||
// Reformat the shares into the expected format for Musig
|
||||
let shares = match make_contiguous(our_i, shares, |share| {
|
||||
machine.read_share(&mut share.as_slice())
|
||||
}) {
|
||||
Ok(shares) => shares,
|
||||
// This yields the *original participant index*
|
||||
Err(participant) => {
|
||||
Self::slash(
|
||||
&mut self.db,
|
||||
self.set.set,
|
||||
self.set.validators[usize::from(u16::from(participant) - 1)].0,
|
||||
);
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match handle_frost_error(machine.complete(shares)) {
|
||||
Ok(signature) => {
|
||||
// Create the bitvec of the participants
|
||||
let mut signature_participants;
|
||||
{
|
||||
use bitvec::prelude::*;
|
||||
signature_participants = bitvec![u8, Lsb0; 0; 0];
|
||||
let mut i = 0;
|
||||
for (validator, _) in self.set.validators {
|
||||
if Some(validator) == musig_validators.get(i) {
|
||||
signature_participants.push(true);
|
||||
i += 1;
|
||||
} else {
|
||||
signature_participants.push(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is safe to call multiple times as it'll just change which *valid*
|
||||
// signature to publish
|
||||
let mut txn = self.db.txn();
|
||||
Keys::set(
|
||||
&mut txn,
|
||||
self.set.set,
|
||||
key_pair.clone(),
|
||||
signature_participants,
|
||||
signature.into(),
|
||||
);
|
||||
txn.commit();
|
||||
}
|
||||
// This yields the *musig participant index*
|
||||
Err(participant) => {
|
||||
Self::slash(
|
||||
&mut self.db,
|
||||
self.set.set,
|
||||
musig_validators[usize::from(u16::from(participant) - 1)],
|
||||
);
|
||||
tributary_txn.commit();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Because we successfully handled this message, note we made proress
|
||||
made_progress = true;
|
||||
tributary_txn.commit();
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the key has been set on Serai
|
||||
if KeysToConfirm::get(&self.db, self.set.set).is_some() {
|
||||
let serai = self.serai.as_of_latest_finalized_block().await?;
|
||||
let serai = serai.validator_sets();
|
||||
let is_historic_set = serai.session(self.set.set.network).await?.map(|session| session.0) >
|
||||
Some(self.set.set.session.0);
|
||||
let key_set_on_serai = is_historic_set || serai.keys(self.set.set).await?.is_some();
|
||||
if key_set_on_serai {
|
||||
// Take the keys to confirm so we never instantiate the signer again
|
||||
let mut txn = self.db.txn();
|
||||
KeysToConfirm::take(&mut txn, self.set.set);
|
||||
txn.commit();
|
||||
|
||||
// Drop our own signer
|
||||
// The task won't die until the Tributary does, but now it'll never do anything again
|
||||
self.signer = None;
|
||||
|
||||
made_progress = true;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(made_progress)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,8 +14,8 @@ use borsh::BorshDeserialize;
|
|||
use tokio::sync::mpsc;
|
||||
|
||||
use serai_client::{
|
||||
primitives::{NetworkId, SeraiAddress, Signature},
|
||||
validator_sets::primitives::ValidatorSet,
|
||||
primitives::{NetworkId, PublicKey, SeraiAddress, Signature},
|
||||
validator_sets::primitives::{ValidatorSet, KeyPair},
|
||||
Serai,
|
||||
};
|
||||
use message_queue::{Service, client::MessageQueue};
|
||||
|
@ -33,6 +33,7 @@ mod db;
|
|||
use db::*;
|
||||
|
||||
mod tributary;
|
||||
mod dkg_confirmation;
|
||||
|
||||
mod substrate;
|
||||
use substrate::SubstrateTask;
|
||||
|
@ -197,7 +198,7 @@ async fn handle_network(
|
|||
messages::ProcessorMessage::KeyGen(msg) => match msg {
|
||||
messages::key_gen::ProcessorMessage::Participation { session, participation } => {
|
||||
let set = ValidatorSet { network, session };
|
||||
TributaryTransactions::send(
|
||||
TributaryTransactionsFromProcessorMessages::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::DkgParticipation { participation, signed: Signed::default() },
|
||||
|
@ -207,7 +208,18 @@ async fn handle_network(
|
|||
session,
|
||||
substrate_key,
|
||||
network_key,
|
||||
} => todo!("TODO DkgConfirmationMessages, Transaction::DkgConfirmationPreprocess"),
|
||||
} => {
|
||||
KeysToConfirm::set(
|
||||
&mut txn,
|
||||
ValidatorSet { network, session },
|
||||
&KeyPair(
|
||||
PublicKey::from_raw(substrate_key),
|
||||
network_key
|
||||
.try_into()
|
||||
.expect("generated a network key which exceeds the maximum key length"),
|
||||
),
|
||||
);
|
||||
}
|
||||
messages::key_gen::ProcessorMessage::Blame { session, participant } => {
|
||||
RemoveParticipant::send(&mut txn, ValidatorSet { network, session }, participant);
|
||||
}
|
||||
|
@ -221,11 +233,15 @@ async fn handle_network(
|
|||
if id.attempt == 0 {
|
||||
// Batches are declared by their intent to be signed
|
||||
if let messages::sign::VariantSignId::Batch(hash) = id.id {
|
||||
TributaryTransactions::send(&mut txn, set, &Transaction::Batch { hash });
|
||||
TributaryTransactionsFromProcessorMessages::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::Batch { hash },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
TributaryTransactions::send(
|
||||
TributaryTransactionsFromProcessorMessages::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::Sign {
|
||||
|
@ -239,7 +255,7 @@ async fn handle_network(
|
|||
}
|
||||
messages::sign::ProcessorMessage::Shares { id, shares } => {
|
||||
let set = ValidatorSet { network, session: id.session };
|
||||
TributaryTransactions::send(
|
||||
TributaryTransactionsFromProcessorMessages::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::Sign {
|
||||
|
@ -284,7 +300,7 @@ async fn handle_network(
|
|||
for (session, plans) in by_session {
|
||||
let set = ValidatorSet { network, session };
|
||||
SubstrateBlockPlans::set(&mut txn, set, block, &plans);
|
||||
TributaryTransactions::send(
|
||||
TributaryTransactionsFromProcessorMessages::send(
|
||||
&mut txn,
|
||||
set,
|
||||
&Transaction::SubstrateBlock { hash: block },
|
||||
|
@ -350,10 +366,13 @@ async fn main() {
|
|||
// Cleanup all historic Tributaries
|
||||
while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
|
||||
prune_tributary_db(to_cleanup);
|
||||
// Remove the keys to confirm for this network
|
||||
KeysToConfirm::take(&mut txn, to_cleanup);
|
||||
// Drain the cosign intents created for this set
|
||||
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
|
||||
// Drain the transactions to publish for this set
|
||||
while TributaryTransactions::try_recv(&mut txn, to_cleanup).is_some() {}
|
||||
while TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, to_cleanup).is_some() {}
|
||||
while TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, to_cleanup).is_some() {}
|
||||
// Drain the participants to remove for this set
|
||||
while RemoveParticipant::try_recv(&mut txn, to_cleanup).is_some() {}
|
||||
// Remove the SignSlashReport notification
|
||||
|
@ -442,6 +461,7 @@ async fn main() {
|
|||
p2p.clone(),
|
||||
&p2p_add_tributary_send,
|
||||
tributary,
|
||||
serai.clone(),
|
||||
serai_key.clone(),
|
||||
)
|
||||
.await;
|
||||
|
@ -456,6 +476,7 @@ async fn main() {
|
|||
p2p: p2p.clone(),
|
||||
p2p_add_tributary: p2p_add_tributary_send.clone(),
|
||||
p2p_retire_tributary: p2p_retire_tributary_send.clone(),
|
||||
serai: serai.clone(),
|
||||
})
|
||||
.continually_run(substrate_task_def, vec![]),
|
||||
);
|
||||
|
|
|
@ -9,7 +9,10 @@ use tokio::sync::mpsc;
|
|||
|
||||
use serai_db::{DbTxn, Db as DbTrait};
|
||||
|
||||
use serai_client::validator_sets::primitives::{Session, ValidatorSet};
|
||||
use serai_client::{
|
||||
validator_sets::primitives::{Session, ValidatorSet},
|
||||
Serai,
|
||||
};
|
||||
use message_queue::{Service, Metadata, client::MessageQueue};
|
||||
|
||||
use tributary_sdk::Tributary;
|
||||
|
@ -29,6 +32,7 @@ pub(crate) struct SubstrateTask<P: P2p> {
|
|||
pub(crate) p2p_add_tributary:
|
||||
mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
|
||||
pub(crate) p2p_retire_tributary: mpsc::UnboundedSender<ValidatorSet>,
|
||||
pub(crate) serai: Arc<Serai>,
|
||||
}
|
||||
|
||||
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
||||
|
@ -146,6 +150,7 @@ impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
|
|||
self.p2p.clone(),
|
||||
&self.p2p_add_tributary,
|
||||
new_set,
|
||||
self.serai.clone(),
|
||||
self.serai_key.clone(),
|
||||
)
|
||||
.await;
|
||||
|
|
|
@ -11,7 +11,7 @@ use tokio::sync::mpsc;
|
|||
use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
|
||||
|
||||
use scale::Encode;
|
||||
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
use serai_client::{validator_sets::primitives::ValidatorSet, Serai};
|
||||
|
||||
use tributary_sdk::{TransactionKind, TransactionError, ProvidedError, TransactionTrait, Tributary};
|
||||
|
||||
|
@ -26,7 +26,10 @@ use serai_coordinator_tributary::{
|
|||
};
|
||||
use serai_coordinator_p2p::P2p;
|
||||
|
||||
use crate::{Db, TributaryTransactions, RemoveParticipant};
|
||||
use crate::{
|
||||
Db, TributaryTransactionsFromProcessorMessages, TributaryTransactionsFromDkgConfirmation,
|
||||
RemoveParticipant, dkg_confirmation::ConfirmDkgTask,
|
||||
};
|
||||
|
||||
create_db! {
|
||||
Coordinator {
|
||||
|
@ -172,6 +175,7 @@ async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
|
|||
Ok(true | false) => {}
|
||||
// InvalidNonce may be out-of-order TXs, not invalid ones, but we only create nonce #n+1 after
|
||||
// on-chain inclusion of the TX with nonce #n, so it is invalid within our context
|
||||
// TODO: We need to handle publishing #n when #n already on-chain
|
||||
Err(
|
||||
TransactionError::TooLargeTransaction |
|
||||
TransactionError::InvalidSigner |
|
||||
|
@ -192,7 +196,7 @@ async fn add_signed_unsigned_transaction<TD: DbTrait, P: P2p>(
|
|||
true
|
||||
}
|
||||
|
||||
/// Adds all of the transactions sent via `TributaryTransactions`.
|
||||
/// Adds all of the transactions sent via `TributaryTransactionsFromProcessorMessages`.
|
||||
pub(crate) struct AddTributaryTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
|
||||
db: CD,
|
||||
tributary_db: TD,
|
||||
|
@ -210,7 +214,19 @@ impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for AddTributaryTransactio
|
|||
// Provide/add all transactions sent our way
|
||||
loop {
|
||||
let mut txn = self.db.txn();
|
||||
let Some(tx) = TributaryTransactions::try_recv(&mut txn, self.set.set) else { break };
|
||||
// This gives priority to DkgConfirmation as that will only yield transactions at the start
|
||||
// of the Tributary, ensuring this will be exhausted and yield to ProcessorMessages
|
||||
let tx = match TributaryTransactionsFromDkgConfirmation::try_recv(&mut txn, self.set.set) {
|
||||
Some(tx) => tx,
|
||||
None => {
|
||||
let Some(tx) =
|
||||
TributaryTransactionsFromProcessorMessages::try_recv(&mut txn, self.set.set)
|
||||
else {
|
||||
break;
|
||||
};
|
||||
tx
|
||||
}
|
||||
};
|
||||
|
||||
let kind = tx.kind();
|
||||
match kind {
|
||||
|
@ -399,6 +415,8 @@ async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
|
|||
/// - Spawn the ScanTributaryTask
|
||||
/// - Spawn the ProvideCosignCosignedTransactionsTask
|
||||
/// - Spawn the TributaryProcessorMessagesTask
|
||||
/// - Spawn the AddTributaryTransactionsTask
|
||||
/// - Spawn the ConfirmDkgTask
|
||||
/// - Spawn the SignSlashReportTask
|
||||
/// - Iterate the scan task whenever a new block occurs (not just on the standard interval)
|
||||
pub(crate) async fn spawn_tributary<P: P2p>(
|
||||
|
@ -407,6 +425,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
|||
p2p: P,
|
||||
p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
|
||||
set: NewSetInformation,
|
||||
serai: Arc<Serai>,
|
||||
serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
) {
|
||||
// Don't spawn retired Tributaries
|
||||
|
@ -485,30 +504,37 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
|||
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
|
||||
);
|
||||
|
||||
// Spawn the sign slash report task
|
||||
let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
|
||||
// Spawn the add transactions task
|
||||
let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
|
||||
tokio::spawn(
|
||||
(SignSlashReportTask {
|
||||
(AddTributaryTransactionsTask {
|
||||
db: db.clone(),
|
||||
tributary_db: tributary_db.clone(),
|
||||
tributary: tributary.clone(),
|
||||
set: set.clone(),
|
||||
key: serai_key.clone(),
|
||||
})
|
||||
.continually_run(sign_slash_report_task_def, vec![]),
|
||||
.continually_run(add_tributary_transactions_task_def, vec![]),
|
||||
);
|
||||
|
||||
// Spawn the add transactions task
|
||||
let (add_tributary_transactions_task_def, add_tributary_transactions_task) = Task::new();
|
||||
// Spawn the task to confirm the DKG result
|
||||
let (confirm_dkg_task_def, confirm_dkg_task) = Task::new();
|
||||
tokio::spawn(
|
||||
(AddTributaryTransactionsTask {
|
||||
ConfirmDkgTask::new(db.clone(), set.clone(), tributary_db.clone(), serai, serai_key.clone())
|
||||
.continually_run(confirm_dkg_task_def, vec![add_tributary_transactions_task]),
|
||||
);
|
||||
|
||||
// Spawn the sign slash report task
|
||||
let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
|
||||
tokio::spawn(
|
||||
(SignSlashReportTask {
|
||||
db: db.clone(),
|
||||
tributary_db,
|
||||
tributary: tributary.clone(),
|
||||
set: set.clone(),
|
||||
key: serai_key,
|
||||
})
|
||||
.continually_run(add_tributary_transactions_task_def, vec![]),
|
||||
.continually_run(sign_slash_report_task_def, vec![]),
|
||||
);
|
||||
|
||||
// Whenever a new block occurs, immediately run the scan task
|
||||
|
@ -520,10 +546,6 @@ pub(crate) async fn spawn_tributary<P: P2p>(
|
|||
set.set,
|
||||
tributary,
|
||||
scan_tributary_task,
|
||||
vec![
|
||||
provide_cosign_cosigned_transactions_task,
|
||||
sign_slash_report_task,
|
||||
add_tributary_transactions_task,
|
||||
],
|
||||
vec![provide_cosign_cosigned_transactions_task, confirm_dkg_task, sign_slash_report_task],
|
||||
));
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue