From b91bd44476c357dca2b7c3e53fb8d5d9861be546 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Thu, 24 Aug 2023 19:06:22 -0400
Subject: [PATCH] Support multiple batches per block by the coordinator

Also corrects an assumption block hash == batch ID.
---
 coordinator/src/db.rs                | 39 ++++++++++++++++++++++
 coordinator/src/main.rs              | 49 ++++++++++++++++------------
 coordinator/src/tributary/handle.rs  |  7 ++--
 coordinator/src/tributary/scanner.rs |  8 +++--
 4 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs
index 48affa58..aa4a39e0 100644
--- a/coordinator/src/db.rs
+++ b/coordinator/src/db.rs
@@ -1,3 +1,6 @@
+use scale::Encode;
+use serai_client::primitives::{NetworkId, BlockHash};
+
 pub use serai_db::*;
 
 use crate::tributary::TributarySpec;
@@ -42,6 +45,42 @@ impl<'a, D: Db> MainDb<'a, D> {
     txn.commit();
   }
 
+  fn batches_in_block_key(network: NetworkId, block: [u8; 32]) -> Vec<u8> {
+    Self::main_key(b"batches_in_block", (network, block).encode())
+  }
+  pub fn batches_in_block<G: Get>(
+    getter: &G,
+    network: NetworkId,
+    block: [u8; 32],
+  ) -> Vec<[u8; 32]> {
+    getter
+      .get(Self::batches_in_block_key(network, block))
+      .expect("asking for batches in block for block without batches")
+      .chunks(32)
+      .map(|id| id.try_into().unwrap())
+      .collect()
+  }
+  pub fn add_batch_to_block(
+    txn: &mut D::Transaction<'_>,
+    network: NetworkId,
+    block: BlockHash,
+    id: [u8; 32],
+  ) {
+    let key = Self::batches_in_block_key(network, block.0);
+    let Some(mut existing) = txn.get(&key) else {
+      txn.put(&key, id);
+      return;
+    };
+
+    if existing.chunks(32).any(|existing_id| existing_id == id) {
+      // TODO: Is this an invariant?
+      return;
+    }
+
+    existing.extend(id);
+    txn.put(&key, existing);
+  }
+
   fn first_preprocess_key(id: [u8; 32]) -> Vec<u8> {
     Self::main_key(b"first_preprocess", id)
   }
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index a664b49c..eb5e237d 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -17,7 +17,7 @@ use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
 use serai_db::{DbTxn, Db};
 use serai_env as env;
 
-use serai_client::{Public, Serai};
+use serai_client::{primitives::NetworkId, Public, Serai};
 
 use message_queue::{Service, client::MessageQueue};
 
@@ -146,7 +146,7 @@ pub async fn scan_substrate(
 pub async fn scan_tributaries<D: Db, Pro: Processors, P: P2p>(
   raw_db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-  recognized_id_send: UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
+  recognized_id_send: UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
   p2p: P,
   processors: Pro,
   serai: Arc<Serai>,
@@ -544,6 +544,7 @@
           // before this message finishes it handling (or with this message's finished handling)
           let mut txn = db.txn();
           MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
+          MainDb::<D>::add_batch_to_block(&mut txn, msg.network, block, id.id);
           txn.commit();
 
           // TODO: This will publish one ExternalBlock per Batch. We should only publish one per
@@ -701,21 +702,27 @@ pub async fn run(
     let tributaries = tributaries.clone();
     async move {
       loop {
-        if let Some((genesis, id_type, id)) = recognized_id_recv.recv().await {
-          let mut tx = match id_type {
-            RecognizedIdType::Block => Transaction::BatchPreprocess(SignData {
-              plan: id,
-              attempt: 0,
-              data: MainDb::<D>::first_preprocess(&raw_db, id),
-              signed: Transaction::empty_signed(),
-            }),
+        if let Some((network, genesis, id_type, id)) = recognized_id_recv.recv().await {
+          let txs = match id_type {
+            RecognizedIdType::Block => {
+              let mut txs = vec![];
+              for id in MainDb::<D>::batches_in_block(&raw_db, network, id) {
+                txs.push(Transaction::BatchPreprocess(SignData {
+                  plan: id,
+                  attempt: 0,
+                  data: MainDb::<D>::first_preprocess(&raw_db, id),
+                  signed: Transaction::empty_signed(),
+                }));
+              }
+              txs
+            }
 
-            RecognizedIdType::Plan => Transaction::SignPreprocess(SignData {
+            RecognizedIdType::Plan => vec![Transaction::SignPreprocess(SignData {
               plan: id,
               attempt: 0,
               data: MainDb::<D>::first_preprocess(&raw_db, id),
               signed: Transaction::empty_signed(),
-            }),
+            })],
           };
 
           let tributaries = tributaries.read().await;
@@ -724,15 +731,17 @@
           };
           let tributary = tributary.tributary.read().await;
 
-          // TODO: Same note as prior nonce acquisition
-          log::trace!("getting next nonce for Tributary TX containing Batch signing data");
-          let nonce = tributary
-            .next_nonce(Ristretto::generator() * key.deref())
-            .await
-            .expect("publishing a TX to a tributary we aren't in");
-          tx.sign(&mut OsRng, genesis, &key, nonce);
+          for mut tx in txs {
+            // TODO: Same note as prior nonce acquisition
+            log::trace!("getting next nonce for Tributary TX containing Batch signing data");
+            let nonce = tributary
+              .next_nonce(Ristretto::generator() * key.deref())
+              .await
+              .expect("publishing a TX to a tributary we aren't in");
+            tx.sign(&mut OsRng, genesis, &key, nonce);
 
-          publish_transaction(&tributary, tx).await;
+            publish_transaction(&tributary, tx).await;
+          }
         } else {
           log::warn!("recognized_id_send was dropped. are we shutting down?");
           break;
diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs
index ed033f73..86611b1c 100644
--- a/coordinator/src/tributary/handle.rs
+++ b/coordinator/src/tributary/handle.rs
@@ -19,6 +19,7 @@ use tokio::sync::mpsc::UnboundedSender;
 
 use serai_client::{
   Signature,
+  primitives::NetworkId,
   validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message},
   subxt::utils::Encoded,
   Serai,
@@ -232,7 +233,7 @@ pub async fn handle_application_tx<
   publish_serai_tx: PST,
   genesis: [u8; 32],
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
+  recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
   txn: &mut <D as Db>::Transaction<'_>,
 ) {
   // Used to determine if an ID is acceptable
@@ -433,7 +434,7 @@
       // Because this external block has been finalized, its batch ID should be authorized
       TributaryDb::<D>::recognize_id(txn, Zone::Batch.label(), genesis, block);
       recognized_id
-        .send((genesis, RecognizedIdType::Block, block))
+        .send((spec.set().network, genesis, RecognizedIdType::Block, block))
         .expect("recognized_id_recv was dropped. are we shutting down?");
     }
 
@@ -446,7 +447,7 @@
       for id in plan_ids {
         TributaryDb::<D>::recognize_id(txn, Zone::Sign.label(), genesis, id);
         recognized_id
-          .send((genesis, RecognizedIdType::Plan, id))
+          .send((spec.set().network, genesis, RecognizedIdType::Plan, id))
           .expect("recognized_id_recv was dropped. are we shutting down?");
       }
     }
diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index acd3abb6..a6894e8a 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -4,7 +4,9 @@ use zeroize::Zeroizing;
 
 use ciphersuite::{Ciphersuite, Ristretto};
 
-use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded};
+use serai_client::{
+  primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded,
+};
 
 use tokio::sync::mpsc::UnboundedSender;
 
@@ -43,7 +45,7 @@ async fn handle_block<
 >(
   db: &mut TributaryDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
+  recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
   processors: &Pro,
   publish_serai_tx: PST,
   spec: &TributarySpec,
@@ -109,7 +111,7 @@ pub async fn handle_new_blocks<
 >(
   db: &mut TributaryDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
-  recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
+  recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
   processors: &Pro,
   publish_serai_tx: PST,
   spec: &TributarySpec,
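
Note on the storage layout the new db.rs helpers introduce: the batch IDs seen for a block are kept as a single value of concatenated 32-byte IDs, keyed by the (network, block hash) pair, with add_batch_to_block deduplicating before appending and batches_in_block splitting the value back into IDs. The standalone sketch below mirrors that encode/dedup/decode logic; it is illustrative only and not part of the patch. The HashMap stand-in for the coordinator's DB, the plain byte-concatenation key (in place of SCALE-encoding a NetworkId), and the add_batch/batches names are assumptions made for the example.

use std::collections::HashMap;

// Hypothetical stand-in for the coordinator's key-value DB: batch IDs for a
// (network, block) pair are stored as one value of concatenated 32-byte IDs.
struct BatchesInBlock {
  store: HashMap<Vec<u8>, Vec<u8>>,
}

impl BatchesInBlock {
  fn key(network: u8, block: [u8; 32]) -> Vec<u8> {
    // The patch derives this key by SCALE-encoding (NetworkId, block hash);
    // plain concatenation stands in for that here.
    let mut key = vec![network];
    key.extend(block);
    key
  }

  // Mirrors add_batch_to_block: append the batch ID unless it is already present.
  fn add_batch(&mut self, network: u8, block: [u8; 32], id: [u8; 32]) {
    let existing = self.store.entry(Self::key(network, block)).or_default();
    if existing.chunks(32).any(|existing_id| existing_id == id) {
      return;
    }
    existing.extend(id);
  }

  // Mirrors batches_in_block: split the stored value back into 32-byte IDs.
  // (The patch expects the entry to exist and panics otherwise; this sketch
  // simply returns an empty Vec.)
  fn batches(&self, network: u8, block: [u8; 32]) -> Vec<[u8; 32]> {
    self
      .store
      .get(&Self::key(network, block))
      .map(|bytes| bytes.chunks(32).map(|id| id.try_into().unwrap()).collect())
      .unwrap_or_default()
  }
}

fn main() {
  let mut db = BatchesInBlock { store: HashMap::new() };
  let block = [1; 32];
  db.add_batch(0, block, [2; 32]);
  db.add_batch(0, block, [3; 32]);
  db.add_batch(0, block, [2; 32]); // duplicate, ignored
  assert_eq!(db.batches(0, block), vec![[2; 32], [3; 32]]);
}

Calling add_batch twice with the same ID is a no-op here, mirroring the early return guarded by the patch's "Is this an invariant?" TODO.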