Implement deterministic nonces for Tributary transactions

This commit is contained in:
Luke Parker 2023-09-25 15:42:39 -04:00
parent ca69f97fef
commit 4babf898d7
No known key found for this signature in database
6 changed files with 175 additions and 38 deletions

View file

@ -34,8 +34,9 @@ use ::tributary::{
}; };
mod tributary; mod tributary;
#[rustfmt::skip] use crate::tributary::{
use crate::tributary::{TributarySpec, SignData, Transaction, TributaryDb, scanner::RecognizedIdType}; TributarySpec, SignData, Transaction, TributaryDb, NonceDecider, scanner::RecognizedIdType,
};
mod db; mod db;
use db::MainDb; use db::MainDb;
@ -186,7 +187,7 @@ pub async fn scan_tributaries<
Pro: Processors, Pro: Processors,
P: P2p, P: P2p,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid,
>( >(
raw_db: D, raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@ -455,6 +456,7 @@ pub async fn publish_transaction<D: Db, P: P2p>(
) { ) {
log::debug!("publishing transaction {}", hex::encode(tx.hash())); log::debug!("publishing transaction {}", hex::encode(tx.hash()));
if let TransactionKind::Signed(signed) = tx.kind() { if let TransactionKind::Signed(signed) = tx.kind() {
// TODO: What if we try to publish TX with a nonce of 5 when the blockchain only has 3?
if tributary if tributary
.next_nonce(signed.signer) .next_nonce(signed.signer)
.await .await
@ -610,6 +612,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// Safe to use its own txn since this is static and just needs to be written before we // Safe to use its own txn since this is static and just needs to be written before we
// provide SubstrateBlock // provide SubstrateBlock
let mut txn = db.txn(); let mut txn = db.txn();
// TODO: This needs to be scoped per multisig
TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans); TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
txn.commit(); txn.commit();
@ -756,23 +759,29 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
tributary.add_transaction(tx).await; tributary.add_transaction(tx).await;
} }
TransactionKind::Signed(_) => { TransactionKind::Signed(_) => {
// Get the next nonce
// TODO: This should be deterministic, not just DB-backed, to allow rebuilding validators
// without the prior instance's DB
// let mut txn = db.txn();
// let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary);
// TODO: This isn't deterministic, or at least DB-backed, and accordingly is unsafe
log::trace!("getting next nonce for Tributary TX in response to processor message"); log::trace!("getting next nonce for Tributary TX in response to processor message");
let nonce = tributary
.next_nonce(Ristretto::generator() * key.deref()) let nonce = loop {
.await let Some(nonce) =
.expect("publishing a TX to a tributary we aren't in"); NonceDecider::<D>::nonce(&db, genesis, &tx).expect("signed TX didn't have nonce")
else {
// This can be None if:
// 1) We scanned the relevant transaction(s) in a Tributary block
// 2) The processor was sent a message and responded
// 3) The Tributary TXN has yet to be committed
log::warn!("nonce has yet to be saved for processor-instigated transaction");
sleep(Duration::from_millis(100)).await;
continue;
};
break nonce;
};
tx.sign(&mut OsRng, genesis, &key, nonce); tx.sign(&mut OsRng, genesis, &key, nonce);
let Some(tributary) = tributaries.get(&genesis) else {
panic!("tributary we don't have came to consensus on an Batch");
};
let tributary = tributary.tributary.read().await;
publish_transaction(&tributary, tx).await; publish_transaction(&tributary, tx).await;
// txn.commit();
} }
} }
} }
@ -816,7 +825,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let raw_db = raw_db.clone(); let raw_db = raw_db.clone();
let key = key.clone(); let key = key.clone();
let tributaries = tributaries.clone(); let tributaries = tributaries.clone();
move |network, genesis, id_type, id| { move |network, genesis, id_type, id, nonce| {
let raw_db = raw_db.clone(); let raw_db = raw_db.clone();
let key = key.clone(); let key = key.clone();
let tributaries = tributaries.clone(); let tributaries = tributaries.clone();
@ -851,20 +860,13 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}), }),
}; };
tx.sign(&mut OsRng, genesis, &key, nonce);
let tributaries = tributaries.read().await; let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else { let Some(tributary) = tributaries.get(&genesis) else {
panic!("tributary we don't have came to consensus on an Batch"); panic!("tributary we don't have came to consensus on an Batch");
}; };
let tributary = tributary.tributary.read().await; let tributary = tributary.tributary.read().await;
// TODO: Same note as prior nonce acquisition
log::trace!("getting next nonce for Tributary TX containing Batch signing data");
let nonce = tributary
.next_nonce(Ristretto::generator() * key.deref())
.await
.expect("publishing a TX to a tributary we aren't in");
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_transaction(&tributary, tx).await; publish_transaction(&tributary, tx).await;
} }
} }

View file

@ -86,7 +86,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db, &mut scanner_db,
key, key,
|_, _, _, _| async { |_, _, _, _, _| async {
panic!("provided TX caused recognized_id to be called in new_processors") panic!("provided TX caused recognized_id to be called in new_processors")
}, },
&processors, &processors,
@ -112,7 +112,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db, &mut scanner_db,
&keys[0], &keys[0],
|_, _, _, _| async { |_, _, _, _, _| async {
panic!("provided TX caused recognized_id to be called after Commitments") panic!("provided TX caused recognized_id to be called after Commitments")
}, },
&processors, &processors,
@ -191,7 +191,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db, &mut scanner_db,
&keys[0], &keys[0],
|_, _, _, _| async { |_, _, _, _, _| async {
panic!("provided TX caused recognized_id to be called after some shares") panic!("provided TX caused recognized_id to be called after some shares")
}, },
&processors, &processors,
@ -239,7 +239,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db, &mut scanner_db,
&keys[0], &keys[0],
|_, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") }, |_, _, _, _, _| async { panic!("provided TX caused recognized_id to be called after shares") },
&processors, &processors,
|_, _| async { panic!("test tried to publish a new Serai TX") }, |_, _| async { panic!("test tried to publish a new Serai TX") },
&spec, &spec,
@ -306,7 +306,7 @@ async fn dkg_test() {
handle_new_blocks::<_, _, _, _, _, _, LocalP2p>( handle_new_blocks::<_, _, _, _, _, _, LocalP2p>(
&mut scanner_db, &mut scanner_db,
&keys[0], &keys[0],
|_, _, _, _| async { |_, _, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DKG confirmation") panic!("provided TX caused recognized_id to be called after DKG confirmation")
}, },
&processors, &processors,

View file

@ -36,7 +36,8 @@ use serai_db::{Get, Db};
use crate::{ use crate::{
processors::Processors, processors::Processors,
tributary::{ tributary::{
Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, scanner::RecognizedIdType, Transaction, TributarySpec, Topic, DataSpecification, TributaryDb, nonce_decider::NonceDecider,
scanner::RecognizedIdType,
}, },
}; };
@ -230,7 +231,7 @@ pub async fn handle_application_tx<
FPst: Future<Output = ()>, FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid,
>( >(
tx: Transaction, tx: Transaction,
spec: &TributarySpec, spec: &TributarySpec,
@ -414,7 +415,8 @@ pub async fn handle_application_tx<
Transaction::Batch(_, batch) => { Transaction::Batch(_, batch) => {
// Because this Batch has achieved synchrony, its batch ID should be authorized // Because this Batch has achieved synchrony, its batch ID should be authorized
TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Batch(batch)); TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Batch(batch));
recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch).await; let nonce = NonceDecider::<D>::handle_batch(txn, genesis, batch);
recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch, nonce).await;
} }
Transaction::SubstrateBlock(block) => { Transaction::SubstrateBlock(block) => {
@ -423,9 +425,10 @@ pub async fn handle_application_tx<
despite us not providing that transaction", despite us not providing that transaction",
); );
for id in plan_ids { let nonces = NonceDecider::<D>::handle_substrate_block(txn, genesis, &plan_ids);
for (nonce, id) in nonces.into_iter().zip(plan_ids.into_iter()) {
TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Sign(id)); TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Sign(id));
recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id).await; recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id, nonce).await;
} }
} }
@ -441,6 +444,7 @@ pub async fn handle_application_tx<
&data.signed, &data.signed,
) { ) {
Some(Some(preprocesses)) => { Some(Some(preprocesses)) => {
NonceDecider::<D>::selected_for_signing_batch(txn, genesis, data.plan);
processors processors
.send( .send(
spec.set().network, spec.set().network,
@ -498,6 +502,7 @@ pub async fn handle_application_tx<
&data.signed, &data.signed,
) { ) {
Some(Some(preprocesses)) => { Some(Some(preprocesses)) => {
NonceDecider::<D>::selected_for_signing_plan(txn, genesis, data.plan);
processors processors
.send( .send(
spec.set().network, spec.set().network,

View file

@ -30,6 +30,9 @@ use tributary::{
mod db; mod db;
pub use db::*; pub use db::*;
mod nonce_decider;
pub use nonce_decider::*;
mod handle; mod handle;
pub use handle::*; pub use handle::*;

View file

@ -0,0 +1,127 @@
use core::marker::PhantomData;
use serai_db::{Get, DbTxn, Db};
use crate::tributary::Transaction;
/// Decides the nonce which should be used for a transaction on a Tributary.
///
/// Deterministically builds a list of nonces to use based on the on-chain events and expected
/// transactions in response. Enables rebooting/rebuilding validators with full safety.
pub struct NonceDecider<D: Db>(PhantomData<D>);
const BATCH_CODE: u8 = 0;
const BATCH_SIGNING_CODE: u8 = 1;
const PLAN_CODE: u8 = 2;
const PLAN_SIGNING_CODE: u8 = 3;
impl<D: Db> NonceDecider<D> {
fn next_nonce_key(genesis: [u8; 32]) -> Vec<u8> {
D::key(b"coordinator_tributary_nonce", b"next", genesis)
}
fn allocate_nonce(txn: &mut D::Transaction<'_>, genesis: [u8; 32]) -> u32 {
let key = Self::next_nonce_key(genesis);
let next =
txn.get(&key).map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap())).unwrap_or(3);
txn.put(key, (next + 1).to_le_bytes());
next
}
fn item_nonce_key(genesis: [u8; 32], code: u8, id: [u8; 32]) -> Vec<u8> {
D::key(
b"coordinator_tributary_nonce",
b"item",
[genesis.as_slice(), [code].as_ref(), id.as_ref()].concat(),
)
}
fn set_nonce(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
code: u8,
id: [u8; 32],
nonce: u32,
) {
txn.put(Self::item_nonce_key(genesis, code, id), nonce.to_le_bytes())
}
fn db_nonce<G: Get>(getter: &G, genesis: [u8; 32], code: u8, id: [u8; 32]) -> Option<u32> {
getter
.get(Self::item_nonce_key(genesis, code, id))
.map(|bytes| u32::from_le_bytes(bytes.try_into().unwrap()))
}
pub fn handle_batch(txn: &mut D::Transaction<'_>, genesis: [u8; 32], batch: [u8; 32]) -> u32 {
let nonce_for = Self::allocate_nonce(txn, genesis);
Self::set_nonce(txn, genesis, BATCH_CODE, batch, nonce_for);
nonce_for
}
pub fn selected_for_signing_batch(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
batch: [u8; 32],
) {
let nonce_for = Self::allocate_nonce(txn, genesis);
Self::set_nonce(txn, genesis, BATCH_SIGNING_CODE, batch, nonce_for);
}
pub fn handle_substrate_block(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
plans: &[[u8; 32]],
) -> Vec<u32> {
let mut res = Vec::with_capacity(plans.len());
for plan in plans {
let nonce_for = Self::allocate_nonce(txn, genesis);
Self::set_nonce(txn, genesis, PLAN_CODE, *plan, nonce_for);
res.push(nonce_for);
}
res
}
pub fn selected_for_signing_plan(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
plan: [u8; 32],
) {
let nonce_for = Self::allocate_nonce(txn, genesis);
Self::set_nonce(txn, genesis, PLAN_SIGNING_CODE, plan, nonce_for);
}
pub fn nonce<G: Get>(getter: &G, genesis: [u8; 32], tx: &Transaction) -> Option<Option<u32>> {
match tx {
Transaction::DkgCommitments(attempt, _, _) => {
assert_eq!(*attempt, 0);
Some(Some(0))
}
Transaction::DkgShares { attempt, .. } => {
assert_eq!(*attempt, 0);
Some(Some(1))
}
Transaction::DkgConfirmed(attempt, _, _) => {
assert_eq!(*attempt, 0);
Some(Some(2))
}
Transaction::Batch(_, _) => None,
Transaction::SubstrateBlock(_) => None,
Transaction::BatchPreprocess(data) => {
assert_eq!(data.attempt, 0);
Some(Self::db_nonce(getter, genesis, BATCH_CODE, data.plan))
}
Transaction::BatchShare(data) => {
assert_eq!(data.attempt, 0);
Some(Self::db_nonce(getter, genesis, BATCH_SIGNING_CODE, data.plan))
}
Transaction::SignPreprocess(data) => {
assert_eq!(data.attempt, 0);
Some(Self::db_nonce(getter, genesis, PLAN_CODE, data.plan))
}
Transaction::SignShare(data) => {
assert_eq!(data.attempt, 0);
Some(Self::db_nonce(getter, genesis, PLAN_SIGNING_CODE, data.plan))
}
Transaction::SignCompleted { .. } => None,
}
}
}

View file

@ -40,7 +40,7 @@ async fn handle_block<
FPst: Future<Output = ()>, FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid,
P: P2p, P: P2p,
>( >(
db: &mut TributaryDb<D>, db: &mut TributaryDb<D>,
@ -107,7 +107,7 @@ pub async fn handle_new_blocks<
FPst: Future<Output = ()>, FPst: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> FPst, PST: Clone + Fn(ValidatorSet, Encoded) -> FPst,
FRid: Future<Output = ()>, FRid: Future<Output = ()>,
RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid,
P: P2p, P: P2p,
>( >(
db: &mut TributaryDb<D>, db: &mut TributaryDb<D>,