mirror of
https://github.com/serai-dex/serai.git
synced 2025-03-16 16:42:03 +00:00
Support multiple batches per block by the coordinator
Also corrects an assumption block hash == batch ID.
This commit is contained in:
parent
dc2656a538
commit
b91bd44476
4 changed files with 77 additions and 26 deletions
|
@ -1,3 +1,6 @@
|
||||||
|
use scale::Encode;
|
||||||
|
use serai_client::primitives::{NetworkId, BlockHash};
|
||||||
|
|
||||||
pub use serai_db::*;
|
pub use serai_db::*;
|
||||||
|
|
||||||
use crate::tributary::TributarySpec;
|
use crate::tributary::TributarySpec;
|
||||||
|
@ -42,6 +45,42 @@ impl<'a, D: Db> MainDb<'a, D> {
|
||||||
txn.commit();
|
txn.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn batches_in_block_key(network: NetworkId, block: [u8; 32]) -> Vec<u8> {
|
||||||
|
Self::main_key(b"batches_in_block", (network, block).encode())
|
||||||
|
}
|
||||||
|
pub fn batches_in_block<G: Get>(
|
||||||
|
getter: &G,
|
||||||
|
network: NetworkId,
|
||||||
|
block: [u8; 32],
|
||||||
|
) -> Vec<[u8; 32]> {
|
||||||
|
getter
|
||||||
|
.get(Self::batches_in_block_key(network, block))
|
||||||
|
.expect("asking for batches in block for block without batches")
|
||||||
|
.chunks(32)
|
||||||
|
.map(|id| id.try_into().unwrap())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
pub fn add_batch_to_block(
|
||||||
|
txn: &mut D::Transaction<'_>,
|
||||||
|
network: NetworkId,
|
||||||
|
block: BlockHash,
|
||||||
|
id: [u8; 32],
|
||||||
|
) {
|
||||||
|
let key = Self::batches_in_block_key(network, block.0);
|
||||||
|
let Some(mut existing) = txn.get(&key) else {
|
||||||
|
txn.put(&key, block.0);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if existing.chunks(32).any(|existing_id| existing_id == id) {
|
||||||
|
// TODO: Is this an invariant?
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
existing.extend(block.0);
|
||||||
|
txn.put(&key, existing);
|
||||||
|
}
|
||||||
|
|
||||||
fn first_preprocess_key(id: [u8; 32]) -> Vec<u8> {
|
fn first_preprocess_key(id: [u8; 32]) -> Vec<u8> {
|
||||||
Self::main_key(b"first_preprocess", id)
|
Self::main_key(b"first_preprocess", id)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
|
||||||
use serai_db::{DbTxn, Db};
|
use serai_db::{DbTxn, Db};
|
||||||
use serai_env as env;
|
use serai_env as env;
|
||||||
|
|
||||||
use serai_client::{Public, Serai};
|
use serai_client::{primitives::NetworkId, Public, Serai};
|
||||||
|
|
||||||
use message_queue::{Service, client::MessageQueue};
|
use message_queue::{Service, client::MessageQueue};
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
|
||||||
pub async fn scan_tributaries<D: Db, Pro: Processors, P: P2p>(
|
pub async fn scan_tributaries<D: Db, Pro: Processors, P: P2p>(
|
||||||
raw_db: D,
|
raw_db: D,
|
||||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
recognized_id_send: UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
|
recognized_id_send: UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
|
||||||
p2p: P,
|
p2p: P,
|
||||||
processors: Pro,
|
processors: Pro,
|
||||||
serai: Arc<Serai>,
|
serai: Arc<Serai>,
|
||||||
|
@ -544,6 +544,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
|
||||||
// before this message finishes it handling (or with this message's finished handling)
|
// before this message finishes it handling (or with this message's finished handling)
|
||||||
let mut txn = db.txn();
|
let mut txn = db.txn();
|
||||||
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
|
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
|
||||||
|
MainDb::<D>::add_batch_to_block(&mut txn, msg.network, block, id.id);
|
||||||
txn.commit();
|
txn.commit();
|
||||||
|
|
||||||
// TODO: This will publish one ExternalBlock per Batch. We should only publish one per
|
// TODO: This will publish one ExternalBlock per Batch. We should only publish one per
|
||||||
|
@ -701,21 +702,27 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
|
||||||
let tributaries = tributaries.clone();
|
let tributaries = tributaries.clone();
|
||||||
async move {
|
async move {
|
||||||
loop {
|
loop {
|
||||||
if let Some((genesis, id_type, id)) = recognized_id_recv.recv().await {
|
if let Some((network, genesis, id_type, id)) = recognized_id_recv.recv().await {
|
||||||
let mut tx = match id_type {
|
let txs = match id_type {
|
||||||
RecognizedIdType::Block => Transaction::BatchPreprocess(SignData {
|
RecognizedIdType::Block => {
|
||||||
plan: id,
|
let mut txs = vec![];
|
||||||
attempt: 0,
|
for id in MainDb::<D>::batches_in_block(&raw_db, network, id) {
|
||||||
data: MainDb::<D>::first_preprocess(&raw_db, id),
|
txs.push(Transaction::BatchPreprocess(SignData {
|
||||||
signed: Transaction::empty_signed(),
|
plan: id,
|
||||||
}),
|
attempt: 0,
|
||||||
|
data: MainDb::<D>::first_preprocess(&raw_db, id),
|
||||||
|
signed: Transaction::empty_signed(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
txs
|
||||||
|
}
|
||||||
|
|
||||||
RecognizedIdType::Plan => Transaction::SignPreprocess(SignData {
|
RecognizedIdType::Plan => vec![Transaction::SignPreprocess(SignData {
|
||||||
plan: id,
|
plan: id,
|
||||||
attempt: 0,
|
attempt: 0,
|
||||||
data: MainDb::<D>::first_preprocess(&raw_db, id),
|
data: MainDb::<D>::first_preprocess(&raw_db, id),
|
||||||
signed: Transaction::empty_signed(),
|
signed: Transaction::empty_signed(),
|
||||||
}),
|
})],
|
||||||
};
|
};
|
||||||
|
|
||||||
let tributaries = tributaries.read().await;
|
let tributaries = tributaries.read().await;
|
||||||
|
@ -724,15 +731,17 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
|
||||||
};
|
};
|
||||||
let tributary = tributary.tributary.read().await;
|
let tributary = tributary.tributary.read().await;
|
||||||
|
|
||||||
// TODO: Same note as prior nonce acquisition
|
for mut tx in txs {
|
||||||
log::trace!("getting next nonce for Tributary TX containing Batch signing data");
|
// TODO: Same note as prior nonce acquisition
|
||||||
let nonce = tributary
|
log::trace!("getting next nonce for Tributary TX containing Batch signing data");
|
||||||
.next_nonce(Ristretto::generator() * key.deref())
|
let nonce = tributary
|
||||||
.await
|
.next_nonce(Ristretto::generator() * key.deref())
|
||||||
.expect("publishing a TX to a tributary we aren't in");
|
.await
|
||||||
tx.sign(&mut OsRng, genesis, &key, nonce);
|
.expect("publishing a TX to a tributary we aren't in");
|
||||||
|
tx.sign(&mut OsRng, genesis, &key, nonce);
|
||||||
|
|
||||||
publish_transaction(&tributary, tx).await;
|
publish_transaction(&tributary, tx).await;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log::warn!("recognized_id_send was dropped. are we shutting down?");
|
log::warn!("recognized_id_send was dropped. are we shutting down?");
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -19,6 +19,7 @@ use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
use serai_client::{
|
use serai_client::{
|
||||||
Signature,
|
Signature,
|
||||||
|
primitives::NetworkId,
|
||||||
validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message},
|
validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message},
|
||||||
subxt::utils::Encoded,
|
subxt::utils::Encoded,
|
||||||
Serai,
|
Serai,
|
||||||
|
@ -232,7 +233,7 @@ pub async fn handle_application_tx<
|
||||||
publish_serai_tx: PST,
|
publish_serai_tx: PST,
|
||||||
genesis: [u8; 32],
|
genesis: [u8; 32],
|
||||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
|
recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
|
||||||
txn: &mut <D as Db>::Transaction<'_>,
|
txn: &mut <D as Db>::Transaction<'_>,
|
||||||
) {
|
) {
|
||||||
// Used to determine if an ID is acceptable
|
// Used to determine if an ID is acceptable
|
||||||
|
@ -433,7 +434,7 @@ pub async fn handle_application_tx<
|
||||||
// Because this external block has been finalized, its batch ID should be authorized
|
// Because this external block has been finalized, its batch ID should be authorized
|
||||||
TributaryDb::<D>::recognize_id(txn, Zone::Batch.label(), genesis, block);
|
TributaryDb::<D>::recognize_id(txn, Zone::Batch.label(), genesis, block);
|
||||||
recognized_id
|
recognized_id
|
||||||
.send((genesis, RecognizedIdType::Block, block))
|
.send((spec.set().network, genesis, RecognizedIdType::Block, block))
|
||||||
.expect("recognized_id_recv was dropped. are we shutting down?");
|
.expect("recognized_id_recv was dropped. are we shutting down?");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,7 +447,7 @@ pub async fn handle_application_tx<
|
||||||
for id in plan_ids {
|
for id in plan_ids {
|
||||||
TributaryDb::<D>::recognize_id(txn, Zone::Sign.label(), genesis, id);
|
TributaryDb::<D>::recognize_id(txn, Zone::Sign.label(), genesis, id);
|
||||||
recognized_id
|
recognized_id
|
||||||
.send((genesis, RecognizedIdType::Plan, id))
|
.send((spec.set().network, genesis, RecognizedIdType::Plan, id))
|
||||||
.expect("recognized_id_recv was dropped. are we shutting down?");
|
.expect("recognized_id_recv was dropped. are we shutting down?");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,9 @@ use zeroize::Zeroizing;
|
||||||
|
|
||||||
use ciphersuite::{Ciphersuite, Ristretto};
|
use ciphersuite::{Ciphersuite, Ristretto};
|
||||||
|
|
||||||
use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded};
|
use serai_client::{
|
||||||
|
primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded,
|
||||||
|
};
|
||||||
|
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
|
@ -43,7 +45,7 @@ async fn handle_block<
|
||||||
>(
|
>(
|
||||||
db: &mut TributaryDb<D>,
|
db: &mut TributaryDb<D>,
|
||||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
|
recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
|
||||||
processors: &Pro,
|
processors: &Pro,
|
||||||
publish_serai_tx: PST,
|
publish_serai_tx: PST,
|
||||||
spec: &TributarySpec,
|
spec: &TributarySpec,
|
||||||
|
@ -109,7 +111,7 @@ pub async fn handle_new_blocks<
|
||||||
>(
|
>(
|
||||||
db: &mut TributaryDb<D>,
|
db: &mut TributaryDb<D>,
|
||||||
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
|
recognized_id: &UnboundedSender<(NetworkId, [u8; 32], RecognizedIdType, [u8; 32])>,
|
||||||
processors: &Pro,
|
processors: &Pro,
|
||||||
publish_serai_tx: PST,
|
publish_serai_tx: PST,
|
||||||
spec: &TributarySpec,
|
spec: &TributarySpec,
|
||||||
|
|
Loading…
Reference in a new issue