From 64d370ac11dada9d433eaef1bb0760da023df4d2 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 27 Sep 2023 00:44:31 -0400 Subject: [PATCH] Make publish_signed_transaction safe for out of order publications This is a possibility under the new deterministic nonce scheme. While there is a concern of us never creating a transaction with a nonce, blocking everything, we should always create transactions. We'll always publish preprocesses, and while we'll only publish shares if everyone else does, we only allocate for shares once everyone else does. --- coordinator/src/db.rs | 18 ++++++++++++++- coordinator/src/main.rs | 51 ++++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 4393037c..5019b6c0 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -5,7 +5,8 @@ use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBat pub use serai_db::*; -use crate::tributary::TributarySpec; +use ::tributary::ReadWrite; +use crate::tributary::{TributarySpec, Transaction}; #[derive(Debug)] pub struct MainDb<D: Db>(PhantomData<D>); @@ -51,6 +52,21 @@ impl<D: Db> MainDb<D> { txn.put(key, existing_bytes); } + fn signed_transaction_key(nonce: u32) -> Vec<u8> { + Self::main_key(b"signed_transaction", nonce.to_le_bytes()) + } + pub fn save_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32, tx: Transaction) { + txn.put(Self::signed_transaction_key(nonce), tx.serialize()); + } + pub fn take_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32) -> Option<Transaction> { + let key = Self::signed_transaction_key(nonce); + let res = txn.get(&key).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); + if res.is_some() { + txn.del(&key); + } + res + } + fn first_preprocess_key(id: [u8; 32]) -> Vec<u8> { Self::main_key(b"first_preprocess", id) } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 05cb3add..61bbc013 100644 --- a/coordinator/src/main.rs +++ 
b/coordinator/src/main.rs @@ -490,27 +490,42 @@ pub async fn handle_p2p( } } -pub async fn publish_signed_transaction<D: Db, P: P2p>( +async fn publish_signed_transaction<D: Db, P: P2p>( + db: &mut D, tributary: &Tributary<D, Transaction, P>, tx: Transaction, ) { log::debug!("publishing transaction {}", hex::encode(tx.hash())); - if let TransactionKind::Signed(signed) = tx.kind() { - // TODO: What if we try to publish TX with a nonce of 5 when the blockchain only has 3? - if tributary - .next_nonce(signed.signer) - .await - .expect("we don't have a nonce, meaning we aren't a participant on this tributary") > - signed.nonce - { - log::warn!("we've already published this transaction. this should only appear on reboot"); - } else { - // We should've created a valid transaction - assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); - } + + let mut txn = db.txn(); + let signer = if let TransactionKind::Signed(signed) = tx.kind() { + let signer = signed.signer; + + // Safe as we should deterministically create transactions, meaning if this is already on-disk, + // it's what we're saving now + MainDb::<D>::save_signed_transaction(&mut txn, signed.nonce, tx); + + signer } else { panic!("non-signed transaction passed to publish_signed_transaction"); + }; + + // If we're trying to publish 5, when the last transaction published was 3, this will delay + // publication until the point in time we publish 4 + while let Some(tx) = MainDb::<D>::take_signed_transaction( + &mut txn, + tributary + .next_nonce(signer) + .await + .expect("we don't have a nonce, meaning we aren't a participant on this tributary"), + ) { + // We should've created a valid transaction + // This does assume publish_signed_transaction hasn't been called twice with the same + // transaction, which risks a race condition on the validity of this assert + // Our use case only calls this function sequentially + assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); } + txn.commit(); } async fn 
handle_processor_messages( @@ -521,7 +536,7 @@ async fn handle_processor_messages( tributary: ActiveTributary, mut recv: mpsc::UnboundedReceiver, ) { - let db_clone = db.clone(); // Enables cloning the DB while we have a txn + let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn let pub_key = Ristretto::generator() * key.deref(); let ActiveTributary { spec, tributary } = tributary; @@ -799,7 +814,7 @@ async fn handle_processor_messages( }; tx.sign(&mut OsRng, genesis, &key, nonce); - publish_signed_transaction(&tributary, tx).await; + publish_signed_transaction(&mut db_clone, &tributary, tx).await; } } } @@ -953,7 +968,7 @@ pub async fn run( }); move |network, genesis, id_type, id, nonce| { - let raw_db = raw_db.clone(); + let mut raw_db = raw_db.clone(); let key = key.clone(); let tributaries = tributaries.clone(); async move { @@ -994,7 +1009,7 @@ pub async fn run( // TODO: This may happen if the task above is simply slow panic!("tributary we don't have came to consensus on an Batch"); }; - publish_signed_transaction(tributary, tx).await; + publish_signed_transaction(&mut raw_db, tributary, tx).await; } } };