mirror of
synced 2025-03-12 09:26:51 +00:00
Txn for handling a processor message
handle_processor_messages function added to remove a very large block of nested code. MainDb cleaned to never be instantiated.
This commit is contained in:
2 changed files with 335 additions and 358 deletions
@ -1,3 +1,5 @@
use core::marker::PhantomData;
use scale::{Encode, Decode};
use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBatch};
@ -6,12 +8,8 @@ pub use serai_db::*;
use crate::tributary::TributarySpec;
pub struct MainDb<'a, D: Db>(&'a mut D);
impl<'a, D: Db> MainDb<'a, D> {
pub fn new(db: &'a mut D) -> Self {
pub struct MainDb<D: Db>(PhantomData<D>);
impl<D: Db> MainDb<D> {
fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"coordinator_main", dst, key)
@ -29,8 +27,8 @@ impl<'a, D: Db> MainDb<'a, D> {
fn acive_tributaries_key() -> Vec<u8> {
Self::main_key(b"active_tributaries", [])
pub fn active_tributaries(&self) -> (Vec<u8>, Vec<TributarySpec>) {
let bytes = self.0.get(Self::acive_tributaries_key()).unwrap_or(vec![]);
pub fn active_tributaries<G: Get>(getter: &G) -> (Vec<u8>, Vec<TributarySpec>) {
let bytes = getter.get(Self::acive_tributaries_key()).unwrap_or(vec![]);
let mut bytes_ref: &[u8] = bytes.as_ref();
let mut tributaries = vec![];
@ -40,9 +38,9 @@ impl<'a, D: Db> MainDb<'a, D> {
(bytes, tributaries)
pub fn add_active_tributary(&mut self, spec: &TributarySpec) {
pub fn add_active_tributary(txn: &mut D::Transaction<'_>, spec: &TributarySpec) {
let key = Self::acive_tributaries_key();
let (mut existing_bytes, existing) = self.active_tributaries();
let (mut existing_bytes, existing) = Self::active_tributaries(txn);
for tributary in &existing {
if tributary == spec {
@ -50,9 +48,7 @@ impl<'a, D: Db> MainDb<'a, D> {
spec.write(&mut existing_bytes).unwrap();
let mut txn = self.0.txn();
txn.put(key, existing_bytes);
fn first_preprocess_key(id: [u8; 32]) -> Vec<u8> {
@ -73,14 +69,11 @@ impl<'a, D: Db> MainDb<'a, D> {
fn batch_key(network: NetworkId, id: u32) -> Vec<u8> {
Self::main_key(b"batch", (network, id).encode())
pub fn save_batch(&mut self, batch: SignedBatch) {
let mut txn = self.0.txn();
pub fn save_batch(txn: &mut D::Transaction<'_>, batch: SignedBatch) {
txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode());
pub fn batch(&self, network: NetworkId, id: u32) -> Option<SignedBatch> {
pub fn batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<SignedBatch> {
.get(Self::batch_key(network, id))
.map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap())
@ -99,7 +99,7 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
.i(Ristretto::generator() * key.deref())
.expect("adding a tribtuary for a set we aren't in set for"),
.expect("adding a tributary for a set we aren't in set for"),
@ -173,7 +173,9 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
log::info!("creating new tributary for {:?}", spec.set());
// Save it to the database
let mut txn = db.txn();
MainDb::<D>::add_active_tributary(&mut txn, &spec);
// If we reboot before this is read, the fact it was saved to the database means it'll be
// handled on reboot
@ -513,6 +515,305 @@ pub async fn publish_signed_transaction<D: Db, P: P2p>(
async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
mut db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
mut processors: Pro,
tributary: ActiveTributary<D, P>,
mut recv: mpsc::UnboundedReceiver<processors::Message>,
) {
let db_clone = db.clone(); // Enables cloning the DB while we have a txn
let pub_key = Ristretto::generator() * key.deref();
let ActiveTributary { spec, tributary } = tributary;
let genesis = spec.genesis();
loop {
let msg: processors::Message = recv.recv().await.unwrap();
if !MainDb::<D>::handled_message(&db, msg.id) {
let mut txn = db.txn();
// TODO: We probably want to NOP here, not panic?
// TODO: We do have to track produced Batches in order to ensure their integrity
let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in");
let tx = match msg.msg.clone() {
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
key_gen::ProcessorMessage::Commitments { id, commitments } => {
Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()))
key_gen::ProcessorMessage::Shares { id, mut shares } => {
// Create a MuSig-based machine to inform Substrate of this key generation
let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt);
let mut tx_shares = Vec::with_capacity(shares.len());
for i in 1 ..= spec.n() {
let i = Participant::new(i).unwrap();
if i == my_i {
shares.remove(&i).expect("processor didn't send share for another validator"),
Some(Transaction::DkgShares {
attempt: id.attempt,
shares: tx_shares,
confirmation_nonces: nonces,
signed: Transaction::empty_signed(),
key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => {
id.set.network, msg.network,
"processor claimed to be a different network than it was for GeneratedKeyPair",
// TODO: Also check the other KeyGenId fields
// Tell the Tributary the key pair, get back the share for the MuSig
// signature
let share = crate::tributary::generated_key_pair::<D>(
&mut txn,
&(Public(substrate_key), network_key.try_into().unwrap()),
match share {
Ok(share) => {
Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed()))
Err(p) => {
todo!("participant {p:?} sent invalid DKG confirmation preprocesses")
ProcessorMessage::Sign(msg) => match msg {
sign::ProcessorMessage::Preprocess { id, preprocess } => {
if id.attempt == 0 {
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
} else {
Some(Transaction::SignPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share,
signed: Transaction::empty_signed(),
sign::ProcessorMessage::Completed { key: _, id, tx } => {
let r = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let R = <Ristretto as Ciphersuite>::generator() * r.deref();
let mut tx = Transaction::SignCompleted {
plan: id,
tx_hash: tx,
first_signer: pub_key,
signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
match &mut tx {
Transaction::SignCompleted { signature, .. } => {
*signature = signed;
_ => unreachable!(),
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
network, msg.network,
"processor claimed to be a different network than it was for SubstrateBlockAck",
// TODO: This needs to be scoped per multisig
TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
"informed of batch (sign ID {}, attempt {}) for block {}",
// If this is the first attempt instance, wait until we synchronize around
// the batch first
if id.attempt == 0 {
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
Some(Transaction::Batch(block.0, id.id))
} else {
Some(Transaction::BatchPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
coordinator::ProcessorMessage::BatchShare { id, share } => {
Some(Transaction::BatchShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share.to_vec(),
signed: Transaction::empty_signed(),
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Update { batch } => {
batch.batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
// TODO: Check this key's key pair's substrate key is authorized to publish
// batches
// Save this batch to the disk
MainDb::<D>::save_batch(&mut txn, batch);
Use a dedicated task to publish batches due to the latency potentially
This does not guarantee the batch has actually been published when the
message is `ack`ed to message-queue. Accordingly, if we reboot, these
batches would be dropped (as we wouldn't see the `Update` again, triggering
our re-attempt to publish).
The solution to this is to have the task try not to publish the batch which
caused it to be spawned, yet all saved batches which have yet to published.
This does risk having multiple tasks trying to publish all pending batches,
yet these aren't notably complex.
let db = db_clone.clone();
let serai = serai.clone();
let network = msg.network;
async move {
// Since we have a new batch, publish all batches yet to be published to
// Serai
// This handles the edge-case where batch n+1 is signed before batch n is
while let Some(batch) = {
// Get the next-to-execute batch ID
let next = {
let mut first = true;
loop {
if !first {
"{} {network:?}",
"couldn't connect to Serai node to get the next batch ID for",
first = false;
let Ok(latest_block) = serai.get_latest_block().await else {
let Ok(last) =
serai.get_last_batch_for_network(latest_block.hash(), network).await
else {
break if let Some(last) = last { last + 1 } else { 0 };
// If we have this batch, attempt to publish it
MainDb::<D>::batch(&db, network, next)
} {
let id = batch.batch.id;
let block = batch.batch.block;
let tx = Serai::execute_batch(batch);
// This publish may fail if this transactions already exists in the
// mempool, which is possible, or if this batch was already executed
// on-chain
// Either case will have eventual resolution and be handled by the
// above check on if this batch should execute
if serai.publish(&tx).await.is_ok() {
log::info!("published batch {network:?} {id} (block {})", hex::encode(block));
// If this created a transaction, publish it
// TODO: This block may be fired multiple times, with the Tributary maintaining its
// own txns. How safe is that?
if let Some(mut tx) = tx {
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
match tx.kind() {
TransactionKind::Provided(_) => {
log::trace!("providing transaction {}", hex::encode(tx.hash()));
let res = tributary.provide_transaction(tx).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
panic!("provided an invalid transaction: {res:?}");
TransactionKind::Unsigned => {
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
// Ignores the result since we can't differentiate already in-mempool from
// already on-chain from invalid
// TODO: Don't ignore the result
TransactionKind::Signed(_) => {
log::trace!("getting next nonce for Tributary TX in response to processor message");
let nonce = loop {
let Some(nonce) =
NonceDecider::<D>::nonce(&txn, genesis, &tx).expect("signed TX didn't have nonce")
else {
// This can be None if:
// 1) We scanned the relevant transaction(s) in a Tributary block
// 2) The processor was sent a message and responded
// 3) The Tributary TXN has yet to be committed
log::warn!("nonce has yet to be saved for processor-instigated transaction");
break nonce;
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_signed_transaction(&tributary, tx).await;
MainDb::<D>::save_handled_message(&mut txn, msg.id);
pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@ -520,9 +821,8 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
mut processors: Pro,
mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) {
let pub_key = Ristretto::generator() * key.deref();
let channels = Arc::new(RwLock::new(HashMap::new()));
// Listen to new tributary events
let db = db.clone();
let processors = processors.clone();
@ -530,339 +830,27 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
async move {
loop {
let channels = channels.clone();
let ActiveTributary { spec, tributary } = new_tributary.recv().await.unwrap();
let genesis = spec.genesis();
let mut db = db.clone();
let key = key.clone();
let serai = serai.clone();
let mut processors = processors.clone();
async move {
let (send, mut recv) = mpsc::unbounded_channel();
// TODO: Support multisig rotation (not per-Tributary yet per-network?)
channels.write().await.insert(spec.set().network, send);
let tributary = new_tributary.recv().await.unwrap();
loop {
let msg: processors::Message = recv.recv().await.unwrap();
let (send, recv) = mpsc::unbounded_channel();
// TODO: Support multisig rotation (not per-Tributary yet per-network?)
channels.write().await.insert(tributary.spec.set().network, send);
// TODO: We probably want to NOP here, not panic?
// TODO: We do have to track produced Batches in order to ensure their integrity
let my_i =
spec.i(pub_key).expect("processor message for network we aren't a validator in");
let tx = match msg.msg.clone() {
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
key_gen::ProcessorMessage::Commitments { id, commitments } => {
key_gen::ProcessorMessage::Shares { id, mut shares } => {
// Create a MuSig-based machine to inform Substrate of this key generation
let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt);
let mut tx_shares = Vec::with_capacity(shares.len());
for i in 1 ..= spec.n() {
let i = Participant::new(i).unwrap();
if i == my_i {
.expect("processor didn't send share for another validator"),
Some(Transaction::DkgShares {
attempt: id.attempt,
shares: tx_shares,
confirmation_nonces: nonces,
signed: Transaction::empty_signed(),
key_gen::ProcessorMessage::GeneratedKeyPair {
} => {
id.set.network, msg.network,
"processor claimed to be a different network than it was for GeneratedKeyPair",
// TODO: Also check the other KeyGenId fields
// Tell the Tributary the key pair, get back the share for the MuSig signature
let mut txn = db.txn();
let share = crate::tributary::generated_key_pair::<D>(
&mut txn,
&(Public(substrate_key), network_key.try_into().unwrap()),
match share {
Ok(share) => Some(Transaction::DkgConfirmed(
Err(p) => {
todo!("participant {p:?} sent invalid DKG confirmation preprocesses")
ProcessorMessage::Sign(msg) => match msg {
sign::ProcessorMessage::Preprocess { id, preprocess } => {
if id.attempt == 0 {
let mut txn = db.txn();
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
} else {
Some(Transaction::SignPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
sign::ProcessorMessage::Share { id, share } => {
Some(Transaction::SignShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share,
signed: Transaction::empty_signed(),
sign::ProcessorMessage::Completed { key: _, id, tx } => {
let r = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let R = <Ristretto as Ciphersuite>::generator() * r.deref();
let mut tx = Transaction::SignCompleted {
plan: id,
tx_hash: tx,
first_signer: pub_key,
signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
match &mut tx {
Transaction::SignCompleted { signature, .. } => {
*signature = signed;
_ => unreachable!(),
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
network, msg.network,
"processor claimed to be a different network than it was for SubstrateBlockAck",
// Safe to use its own txn since this is static and just needs to be written
// before we provide SubstrateBlock
let mut txn = db.txn();
// TODO: This needs to be scoped per multisig
TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
"informed of batch (sign ID {}, attempt {}) for block {}",
// If this is the first attempt instance, wait until we synchronize around the
// batch first
if id.attempt == 0 {
// Save the preprocess to disk so we can publish it later
// This is fine to use its own TX since it's static and just needs to be
// written before this message finishes it handling (or with this message's
// finished handling)
let mut txn = db.txn();
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
Some(Transaction::Batch(block.0, id.id))
} else {
Some(Transaction::BatchPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
coordinator::ProcessorMessage::BatchShare { id, share } => {
Some(Transaction::BatchShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share.to_vec(),
signed: Transaction::empty_signed(),
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Update { batch } => {
batch.batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
// TODO: Check this key's key pair's substrate key is authorized to publish
// batches
// Save this batch to the disk
MainDb::new(&mut db).save_batch(batch);
Use a dedicated task to publish batches due to the latency potentially
This does not guarantee the batch has actually been published when the
message is `ack`ed to message-queue. Accordingly, if we reboot, these batches
would be dropped (as we wouldn't see the `Update` again, triggering our
re-attempt to publish).
The solution to this is to have the task try not to publish the batch which
caused it to be spawned, yet all saved batches which have yet to published.
This does risk having multiple tasks trying to publish all pending batches,
yet these aren't notably complex.
let mut db = db.clone();
let serai = serai.clone();
let network = msg.network;
async move {
// Since we have a new batch, publish all batches yet to be published to
// Serai
// This handles the edge-case where batch n+1 is signed before batch n is
while let Some(batch) = {
// Get the next-to-execute batch ID
let next = {
let mut first = true;
loop {
if !first {
"{} {network:?}",
"couldn't connect to Serai node to get the next batch ID for",
first = false;
let Ok(latest_block) = serai.get_latest_block().await else {
let Ok(last) = serai
.get_last_batch_for_network(latest_block.hash(), network)
else {
break if let Some(last) = last { last + 1 } else { 0 };
// If we have this batch, attempt to publish it
MainDb::new(&mut db).batch(network, next)
} {
let id = batch.batch.id;
let block = batch.batch.block;
let tx = Serai::execute_batch(batch);
// This publish may fail if this transactions already exists in the
// mempool, which is possible, or if this batch was already executed
// on-chain
// Either case will have eventual resolution and be handled by the above
// check on if this batch should execute
if serai.publish(&tx).await.is_ok() {
"published batch {network:?} {id} (block {})",
// If this created a transaction, publish it
if let Some(mut tx) = tx {
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
match tx.kind() {
TransactionKind::Provided(_) => {
log::trace!("providing transaction {}", hex::encode(tx.hash()));
let res = tributary.provide_transaction(tx).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
panic!("provided an invalid transaction: {res:?}");
TransactionKind::Unsigned => {
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
// Ignores the result since we can't differentiate already in-mempool from
// already on-chain from invalid
// TODO: Don't ignore the result
TransactionKind::Signed(_) => {
"getting next nonce for Tributary TX in response to processor message"
let nonce = loop {
let Some(nonce) = NonceDecider::<D>::nonce(&db, genesis, &tx)
.expect("signed TX didn't have nonce")
else {
// This can be None if:
// 1) We scanned the relevant transaction(s) in a Tributary block
// 2) The processor was sent a message and responded
// 3) The Tributary TXN has yet to be committed
"nonce has yet to be saved for processor-instigated transaction"
break nonce;
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_signed_transaction(&tributary, tx).await;
// TODO: Consider a global txn for this message?
let mut txn = db.txn();
MainDb::<'static, D>::save_handled_message(&mut txn, msg.id);
// For each new tributary, spawn a dedicated task to handle its messages from the processor
// TODO: Redo per network, not per tributary
// Dispatch task
let mut last_msg = None;
loop {
// TODO: We dispatch this to an async task per-processor, yet we don't move to the next message
@ -871,10 +859,6 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// Alternatively, a peek method with local delineation of handled messages would work.
let msg = processors.recv().await;
if MainDb::<'static, D>::handled_message(&db, msg.id) {
if last_msg == Some(msg.id) {
@ -892,7 +876,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
pub async fn run<D: Db, Pro: Processors, P: P2p>(
mut raw_db: D,
raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
p2p: P,
processors: Pro,
@ -902,7 +886,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel();
// Reload active tributaries from the database
for spec in MainDb::new(&mut raw_db).active_tributaries().1 {
for spec in MainDb::<D>::active_tributaries(&raw_db).1 {
@ -975,13 +959,13 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let key = key.clone();
let tributaries = tributaries.clone();
async move {
// SubstrateBlockAck is fired before Preprocess, creating a race between Tributary ack
// of the SubstrateBlock and the sending of all Preprocesses
// The transactions for these are fired before the preprocesses are actually
// received/saved, creating a race between Tributary ack and the availability of all
// Preprocesses
// This waits until the necessary preprocess is available
let get_preprocess = |raw_db, id| async move {
loop {
let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, id) else {
assert_eq!(id_type, RecognizedIdType::Plan);
Reference in a new issue