diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index a242bde4..ddb0bbfe 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -34,8 +34,9 @@ use ::tributary::{ }; mod tributary; -#[rustfmt::skip] -use crate::tributary::{TributarySpec, SignData, Transaction, TributaryDb, scanner::RecognizedIdType}; +use crate::tributary::{ + TributarySpec, SignData, Transaction, TributaryDb, NonceDecider, scanner::RecognizedIdType, +}; mod db; use db::MainDb; @@ -186,7 +187,7 @@ pub async fn scan_tributaries< Pro: Processors, P: P2p, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, >( raw_db: D, key: Zeroizing<::F>, @@ -455,6 +456,7 @@ pub async fn publish_transaction( ) { log::debug!("publishing transaction {}", hex::encode(tx.hash())); if let TransactionKind::Signed(signed) = tx.kind() { + // TODO: What if we try to publish TX with a nonce of 5 when the blockchain only has 3? if tributary .next_nonce(signed.signer) .await @@ -610,6 +612,7 @@ pub async fn handle_processors( // Safe to use its own txn since this is static and just needs to be written before we // provide SubstrateBlock let mut txn = db.txn(); + // TODO: This needs to be scoped per multisig TributaryDb::::set_plan_ids(&mut txn, genesis, block, &plans); txn.commit(); @@ -756,23 +759,29 @@ pub async fn handle_processors( tributary.add_transaction(tx).await; } TransactionKind::Signed(_) => { - // Get the next nonce - // TODO: This should be deterministic, not just DB-backed, to allow rebuilding validators - // without the prior instance's DB - // let mut txn = db.txn(); - // let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary); - - // TODO: This isn't deterministic, or at least DB-backed, and accordingly is unsafe log::trace!("getting next nonce for Tributary TX in response to processor message"); - let nonce = tributary - .next_nonce(Ristretto::generator() * key.deref()) - .await - .expect("publishing a TX to a tributary we aren't in"); + + let nonce = loop { + let Some(nonce) = + NonceDecider::::nonce(&db, genesis, &tx).expect("signed TX didn't have nonce") + else { + // This can be None if: + // 1) We scanned the relevant transaction(s) in a Tributary block + // 2) The processor was sent a message and responded + // 3) The Tributary TXN has yet to be committed + log::warn!("nonce has yet to be saved for processor-instigated transaction"); + sleep(Duration::from_millis(100)).await; + continue; + }; + break nonce; + }; tx.sign(&mut OsRng, genesis, &key, nonce); + let Some(tributary) = tributaries.get(&genesis) else { + panic!("tributary we don't have came to consensus on an Batch"); + }; + let tributary = tributary.tributary.read().await; publish_transaction(&tributary, tx).await; - - // txn.commit(); } } } @@ -816,7 +825,7 @@ pub async fn run( let raw_db = raw_db.clone(); let key = key.clone(); let tributaries = tributaries.clone(); - move |network, genesis, id_type, id| { + move |network, genesis, id_type, id, nonce| { let raw_db = raw_db.clone(); let key = key.clone(); let tributaries = tributaries.clone(); @@ -851,20 +860,13 @@ pub async fn run( }), }; + tx.sign(&mut OsRng, genesis, &key, nonce); + let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { panic!("tributary we don't have came to consensus on an Batch"); }; let tributary = tributary.tributary.read().await; - - // TODO: Same note as prior nonce acquisition - log::trace!("getting next nonce for Tributary TX containing Batch signing data"); - let nonce = tributary - .next_nonce(Ristretto::generator() * key.deref()) - .await - .expect("publishing a TX to a tributary we aren't in"); - tx.sign(&mut OsRng, genesis, &key, nonce); - publish_transaction(&tributary, tx).await; } } diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index 1da472fa..874a7895 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -86,7 +86,7 @@ async fn dkg_test() { handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, key, - |_, _, _, _| async { + |_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called in new_processors") }, &processors, @@ -112,7 +112,7 @@ async fn dkg_test() { handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - |_, _, _, _| async { + |_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called after Commitments") }, &processors, @@ -191,7 +191,7 @@ async fn dkg_test() { handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - |_, _, _, _| async { + |_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called after some shares") }, &processors, @@ -239,7 +239,7 @@ async fn dkg_test() { handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - |_, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") }, + |_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") }, &processors, |_, _| async { panic!("test tried to publish a new Serai TX") }, &spec, @@ -306,7 +306,7 @@ async fn dkg_test() { handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( &mut scanner_db, &keys[0], - |_, _, _, _| async { + |_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called after DKG confirmation") }, &processors, diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs index cfca4500..c1476dc4 100644 --- a/coordinator/src/tributary/handle.rs +++ b/coordinator/src/tributary/handle.rs @@ -36,7 +36,8 @@ use serai_db::{Get, Db}; use crate::{ processors::Processors, tributary::{ - Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, scanner::RecognizedIdType, + Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, nonce_decider::NonceDecider, + scanner::RecognizedIdType, }, }; @@ -230,7 +231,7 @@ pub async fn handle_application_tx< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, >( tx: Transaction, spec: &TributarySpec, @@ -414,7 +415,8 @@ pub async fn handle_application_tx< Transaction::Batch(_, batch) => { // Because this Batch has achieved synchrony, its batch ID should be authorized TributaryDb::::recognize_topic(txn, genesis, Topic::Batch(batch)); - recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch).await; + let nonce = NonceDecider::::handle_batch(txn, genesis, batch); + recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch, nonce).await; } Transaction::SubstrateBlock(block) => { @@ -423,9 +425,10 @@ pub async fn handle_application_tx< despite us not providing that transaction", ); - for id in plan_ids { + let nonces = NonceDecider::::handle_substrate_block(txn, genesis, &plan_ids); + for (nonce, id) in nonces.into_iter().zip(plan_ids.into_iter()) { TributaryDb::::recognize_topic(txn, genesis, Topic::Sign(id)); - recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id).await; + recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id, nonce).await; } } @@ -441,6 +444,7 @@ pub async fn handle_application_tx< &data.signed, ) { Some(Some(preprocesses)) => { + NonceDecider::::selected_for_signing_batch(txn, genesis, data.plan); processors .send( spec.set().network, @@ -498,6 +502,7 @@ pub async fn handle_application_tx< &data.signed, ) { Some(Some(preprocesses)) => { + NonceDecider::::selected_for_signing_plan(txn, genesis, data.plan); processors .send( spec.set().network, diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary/mod.rs index c97107db..9ae31ce1 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary/mod.rs @@ -30,6 +30,9 @@ use tributary::{ mod db; pub use db::*; +mod nonce_decider; +pub use nonce_decider::*; + mod handle; pub use handle::*; diff --git a/coordinator/src/tributary/nonce_decider.rs b/coordinator/src/tributary/nonce_decider.rs new file mode 100644 index 00000000..eb95c539 --- /dev/null +++ b/coordinator/src/tributary/nonce_decider.rs @@ -0,0 +1,127 @@ +use core::marker::PhantomData; + +use serai_db::{Get, DbTxn, Db}; + +use crate::tributary::Transaction; + +/// Decides the nonce which should be used for a transaction on a Tributary. +/// +/// Deterministically builds a list of nonces to use based on the on-chain events and expected +/// transactions in response. Enables rebooting/rebuilding validators with full safety. +pub struct NonceDecider(PhantomData); + +const BATCH_CODE: u8 = 0; +const BATCH_SIGNING_CODE: u8 = 1; +const PLAN_CODE: u8 = 2; +const PLAN_SIGNING_CODE: u8 = 3; + +impl NonceDecider { + fn next_nonce_key(genesis: [u8; 32]) -> Vec { + D::key(b"coordinator_tributary_nonce", b"next", genesis) + } + fn allocate_nonce(txn: &mut D::Transaction<'_>, genesis: [u8; 32]) -> u32 { + let key = Self::next_nonce_key(genesis); + let next = + txn.get(&key).map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap())).unwrap_or(3); + txn.put(key, (next + 1).to_le_bytes()); + next + } + + fn item_nonce_key(genesis: [u8; 32], code: u8, id: [u8; 32]) -> Vec { + D::key( + b"coordinator_tributary_nonce", + b"item", + [genesis.as_slice(), [code].as_ref(), id.as_ref()].concat(), + ) + } + fn set_nonce( + txn: &mut D::Transaction<'_>, + genesis: [u8; 32], + code: u8, + id: [u8; 32], + nonce: u32, + ) { + txn.put(Self::item_nonce_key(genesis, code, id), nonce.to_le_bytes()) + } + fn db_nonce(getter: &G, genesis: [u8; 32], code: u8, id: [u8; 32]) -> Option { + getter + .get(Self::item_nonce_key(genesis, code, id)) + .map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap())) + } + + pub fn handle_batch(txn: &mut D::Transaction<'_>, genesis: [u8; 32], batch: [u8; 32]) -> u32 { + let nonce_for = Self::allocate_nonce(txn, genesis); + Self::set_nonce(txn, genesis, BATCH_CODE, batch, nonce_for); + nonce_for + } + pub fn selected_for_signing_batch( + txn: &mut D::Transaction<'_>, + genesis: [u8; 32], + batch: [u8; 32], + ) { + let nonce_for = Self::allocate_nonce(txn, genesis); + Self::set_nonce(txn, genesis, BATCH_SIGNING_CODE, batch, nonce_for); + } + + pub fn handle_substrate_block( + txn: &mut D::Transaction<'_>, + genesis: [u8; 32], + plans: &[[u8; 32]], + ) -> Vec { + let mut res = Vec::with_capacity(plans.len()); + for plan in plans { + let nonce_for = Self::allocate_nonce(txn, genesis); + Self::set_nonce(txn, genesis, PLAN_CODE, *plan, nonce_for); + res.push(nonce_for); + } + res + } + pub fn selected_for_signing_plan( + txn: &mut D::Transaction<'_>, + genesis: [u8; 32], + plan: [u8; 32], + ) { + let nonce_for = Self::allocate_nonce(txn, genesis); + Self::set_nonce(txn, genesis, PLAN_SIGNING_CODE, plan, nonce_for); + } + + pub fn nonce(getter: &G, genesis: [u8; 32], tx: &Transaction) -> Option> { + match tx { + Transaction::DkgCommitments(attempt, _, _) => { + assert_eq!(*attempt, 0); + Some(Some(0)) + } + Transaction::DkgShares { attempt, .. } => { + assert_eq!(*attempt, 0); + Some(Some(1)) + } + Transaction::DkgConfirmed(attempt, _, _) => { + assert_eq!(*attempt, 0); + Some(Some(2)) + } + + Transaction::Batch(_, _) => None, + Transaction::SubstrateBlock(_) => None, + + Transaction::BatchPreprocess(data) => { + assert_eq!(data.attempt, 0); + Some(Self::db_nonce(getter, genesis, BATCH_CODE, data.plan)) + } + Transaction::BatchShare(data) => { + assert_eq!(data.attempt, 0); + Some(Self::db_nonce(getter, genesis, BATCH_SIGNING_CODE, data.plan)) + } + + Transaction::SignPreprocess(data) => { + assert_eq!(data.attempt, 0); + Some(Self::db_nonce(getter, genesis, PLAN_CODE, data.plan)) + } + Transaction::SignShare(data) => { + assert_eq!(data.attempt, 0); + Some(Self::db_nonce(getter, genesis, PLAN_SIGNING_CODE, data.plan)) + } + + Transaction::SignCompleted { .. } => None, + } + } +} diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index fe8a18d2..a3708b71 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -40,7 +40,7 @@ async fn handle_block< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, P: P2p, >( db: &mut TributaryDb, @@ -107,7 +107,7 @@ pub async fn handle_new_blocks< FPst: Future, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, FRid: Future, - RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, + RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid, P: P2p, >( db: &mut TributaryDb,