diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index a1d36f1c..48affa58 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -41,4 +41,19 @@ impl<'a, D: Db> MainDb<'a, D> { txn.put(key, existing_bytes); txn.commit(); } + + fn first_preprocess_key(id: [u8; 32]) -> Vec { + Self::main_key(b"first_preprocess", id) + } + pub fn save_first_preprocess(txn: &mut D::Transaction<'_>, id: [u8; 32], preprocess: Vec) { + let key = Self::first_preprocess_key(id); + if let Some(existing) = txn.get(&key) { + assert_eq!(existing, preprocess, "saved a distinct first preprocess"); + return; + } + txn.put(key, preprocess); + } + pub fn first_preprocess(getter: &G, id: [u8; 32]) -> Vec { + getter.get(Self::first_preprocess_key(id)).expect("asked for first preprocess we never saved") + } } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b8e6dfba..b6ba622d 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -14,15 +14,25 @@ use rand_core::OsRng; use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; -use serai_db::{Db, MemDb}; +use serai_db::{DbTxn, Db, MemDb}; use serai_client::Serai; -use tokio::{sync::RwLock, time::sleep}; +use tokio::{ + sync::{ + mpsc::{self, UnboundedSender}, + RwLock, + }, + time::sleep, +}; -use ::tributary::{ReadWrite, Block, Tributary, TributaryReader}; +use ::tributary::{ + ReadWrite, ProvidedError, TransactionKind, Transaction as TransactionTrait, Block, Tributary, + TributaryReader, +}; mod tributary; -use crate::tributary::{TributarySpec, SignData, Transaction}; +#[rustfmt::skip] +use crate::tributary::{TributarySpec, SignData, Transaction, TributaryDb, scanner::RecognizedIdType}; mod db; use db::MainDb; @@ -125,6 +135,7 @@ pub async fn scan_substrate( pub async fn scan_tributaries( raw_db: D, key: Zeroizing<::F>, + recognized_id_send: UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>, p2p: P, processor: Pro, tributaries: Arc>>>, @@ -162,6 +173,7 @@ pub async fn scan_tributaries( tributary::scanner::handle_new_blocks::<_, _>( &mut tributary_db, &key, + &recognized_id_send, &processor, spec, reader, @@ -223,19 +235,18 @@ pub async fn handle_p2p( } } - // TODO2: Rate limit this + // TODO2: Rate limit this per validator P2pMessageKind::Heartbeat(genesis) => { - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received heartbeat message for unknown network"); - continue; - }; - if msg.msg.len() != 32 { log::error!("validator sent invalid heartbeat"); continue; } + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + log::debug!("received heartbeat message for unknown network"); + continue; + }; let tributary_read = tributary.tributary.read().await; /* @@ -312,8 +323,30 @@ pub async fn handle_p2p( } } +pub async fn publish_transaction( + tributary: &Tributary, + tx: Transaction, +) { + if let TransactionKind::Signed(signed) = tx.kind() { + if tributary + .next_nonce(signed.signer) + .await + .expect("we don't have a nonce, meaning we aren't a participant on this tributary") > + signed.nonce + { + log::warn!("we've already published this transaction. this should only appear on reboot"); + } else { + // We should've created a valid transaction + assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); + } + } else { + panic!("non-signed transaction passed to publish_transaction"); + } +} + #[allow(clippy::type_complexity)] pub async fn handle_processors( + mut db: D, key: Zeroizing<::F>, mut processor: Pro, tributaries: Arc>>>, @@ -339,12 +372,20 @@ pub async fn handle_processors( }, ProcessorMessage::Sign(msg) => match msg { sign::ProcessorMessage::Preprocess { id, preprocess } => { - Some(Transaction::SignPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })) + if id.attempt == 0 { + let mut txn = db.txn(); + MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); + txn.commit(); + + None + } else { + Some(Transaction::SignPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })) + } } sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData { plan: id.id, @@ -356,15 +397,36 @@ pub async fn handle_processors( sign::ProcessorMessage::Completed { .. } => todo!(), }, ProcessorMessage::Coordinator(msg) => match msg { - // TODO - coordinator::ProcessorMessage::SubstrateBlockAck { .. } => todo!(), + coordinator::ProcessorMessage::SubstrateBlockAck { network: _, block, plans } => { + // TODO2: Check this network aligns with this processor + + // Safe to use its own txn since this is static and just needs to be written before we + // provide SubstrateBlock + let mut txn = db.txn(); + TributaryDb::::set_plan_ids(&mut txn, genesis, block, &plans); + txn.commit(); + + Some(Transaction::SubstrateBlock(block)) + } coordinator::ProcessorMessage::BatchPreprocess { id, preprocess } => { - Some(Transaction::BatchPreprocess(SignData { - plan: id.id, - attempt: id.attempt, - data: preprocess, - signed: Transaction::empty_signed(), - })) + // If this is the first attempt instance, synchronize around the block first + if id.attempt == 0 { + // Save the preprocess to disk so we can publish it later + // This is fine to use its own TX since it's static and just needs to be written + // before this message finishes it handling (or with this message's finished handling) + let mut txn = db.txn(); + MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); + txn.commit(); + + Some(Transaction::ExternalBlock(id.id)) + } else { + Some(Transaction::BatchPreprocess(SignData { + plan: id.id, + attempt: id.attempt, + data: preprocess, + signed: Transaction::empty_signed(), + })) + } } coordinator::ProcessorMessage::BatchShare { id, share } => { Some(Transaction::BatchShare(SignData { @@ -383,13 +445,6 @@ pub async fn handle_processors( // If this created a transaction, publish it if let Some(mut tx) = tx { - // Get the next nonce - // let mut txn = db.txn(); - // let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary); - - let nonce = 0; // TODO - tx.sign(&mut OsRng, genesis, &key, nonce); - let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { // TODO: This can happen since Substrate tells the Processor to generate commitments @@ -399,20 +454,28 @@ pub async fn handle_processors( }; let tributary = tributary.tributary.read().await; - if tributary - .next_nonce(pub_key) - .await - .expect("we don't have a nonce, meaning we aren't a participant on this tributary") > - nonce - { - log::warn!("we've already published this transaction. this should only appear on reboot"); - } else { - // We should've created a valid transaction - // TODO: Delay SignPreprocess/BatchPreprocess until associated ID is valid - assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); - } - // txn.commit(); + match tx.kind() { + TransactionKind::Provided(_) => { + let res = tributary.provide_transaction(tx).await; + if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { + panic!("provided an invalid transaction: {res:?}"); + } + } + TransactionKind::Signed(_) => { + // Get the next nonce + // let mut txn = db.txn(); + // let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary); + + let nonce = 0; // TODO + tx.sign(&mut OsRng, genesis, &key, nonce); + + publish_transaction(&tributary, tx).await; + + // txn.commit(); + } + _ => panic!("created an unexpected transaction"), + } } } } @@ -446,13 +509,61 @@ pub async fn run( } // Handle new blocks for each Tributary - tokio::spawn(scan_tributaries( - raw_db.clone(), - key.clone(), - p2p.clone(), - processor.clone(), - tributaries.clone(), - )); + let (recognized_id_send, mut recognized_id_recv) = mpsc::unbounded_channel(); + { + let raw_db = raw_db.clone(); + tokio::spawn(scan_tributaries( + raw_db, + key.clone(), + recognized_id_send, + p2p.clone(), + processor.clone(), + tributaries.clone(), + )); + } + + // When we reach consensus on a new external block, send our BatchPreprocess for it + tokio::spawn({ + let raw_db = raw_db.clone(); + let key = key.clone(); + let tributaries = tributaries.clone(); + async move { + loop { + if let Some((genesis, id_type, id)) = recognized_id_recv.recv().await { + let mut tx = match id_type { + RecognizedIdType::Block => Transaction::BatchPreprocess(SignData { + plan: id, + attempt: 0, + data: MainDb::::first_preprocess(&raw_db, id), + signed: Transaction::empty_signed(), + }), + + RecognizedIdType::Plan => Transaction::SignPreprocess(SignData { + plan: id, + attempt: 0, + data: MainDb::::first_preprocess(&raw_db, id), + signed: Transaction::empty_signed(), + }), + }; + + let nonce = 0; // TODO + tx.sign(&mut OsRng, genesis, &key, nonce); + + // TODO: Consolidate this code with the above instance + let tributaries = tributaries.read().await; + let Some(tributary) = tributaries.get(&genesis) else { + panic!("tributary we don't have came to consensus on an ExternalBlock"); + }; + let tributary = tributary.tributary.read().await; + + publish_transaction(&tributary, tx).await; + } else { + log::warn!("recognized_id_recv was dropped. are we shutting down?"); + break; + } + } + } + }); // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block // in a while (presumably because we're behind) @@ -462,7 +573,7 @@ pub async fn run( tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone())); // Handle all messages from processors - handle_processors(key, processor, tributaries).await; + handle_processors(raw_db, key, processor, tributaries).await; } #[tokio::main] diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index 0c4e8a99..bb6e49f7 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -7,7 +7,7 @@ use rand_core::{RngCore, OsRng}; use ciphersuite::{Ciphersuite, Ristretto}; use frost::Participant; -use tokio::time::sleep; +use tokio::{time::sleep, sync::mpsc}; use serai_db::MemDb; @@ -81,7 +81,16 @@ async fn dkg_test() { ) -> (TributaryDb, MemProcessor) { let mut scanner_db = TributaryDb(MemDb::new()); let processor = MemProcessor::new(); - handle_new_blocks(&mut scanner_db, key, &processor, spec, &tributary.reader()).await; + // Uses a brand new channel since this channel won't be used within this test + handle_new_blocks( + &mut scanner_db, + key, + &mpsc::unbounded_channel().0, + &processor, + spec, + &tributary.reader(), + ) + .await; (scanner_db, processor) } @@ -96,7 +105,15 @@ async fn dkg_test() { sleep(Duration::from_secs(Tributary::::block_time().into())).await; // Verify the scanner emits a KeyGen::Commitments message - handle_new_blocks(&mut scanner_db, &keys[0], &processor, &spec, &tributaries[0].1.reader()).await; + handle_new_blocks( + &mut scanner_db, + &keys[0], + &mpsc::unbounded_channel().0, + &processor, + &spec, + &tributaries[0].1.reader(), + ) + .await; { let mut msgs = processor.0.write().await; assert_eq!(msgs.pop_front().unwrap(), expected_commitments); @@ -137,7 +154,15 @@ async fn dkg_test() { } // With just 4 sets of shares, nothing should happen yet - handle_new_blocks(&mut scanner_db, &keys[0], &processor, &spec, &tributaries[0].1.reader()).await; + handle_new_blocks( + &mut scanner_db, + &keys[0], + &mpsc::unbounded_channel().0, + &processor, + &spec, + &tributaries[0].1.reader(), + ) + .await; assert!(processor.0.write().await.is_empty()); // Publish the final set of shares @@ -168,7 +193,15 @@ async fn dkg_test() { }; // Any scanner which has handled the prior blocks should only emit the new event - handle_new_blocks(&mut scanner_db, &keys[0], &processor, &spec, &tributaries[0].1.reader()).await; + handle_new_blocks( + &mut scanner_db, + &keys[0], + &mpsc::unbounded_channel().0, + &processor, + &spec, + &tributaries[0].1.reader(), + ) + .await; { let mut msgs = processor.0.write().await; assert_eq!(msgs.pop_front().unwrap(), shares_for(0)); diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs index 940e3e0d..b0b66aa2 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/src/tributary/db.rs @@ -27,17 +27,17 @@ impl TributaryDb { self.0.get(Self::block_key(genesis)).map(|last| last.try_into().unwrap()).unwrap_or(genesis) } - // This shouldn't need genesis? Yet it's saner to have then quibble about. - fn batch_id_key(genesis: &[u8], ext_block: [u8; 32]) -> Vec { - Self::tributary_key(b"batch_id", [genesis, ext_block.as_ref()].concat()) - } - pub fn batch_id(getter: &G, genesis: [u8; 32], ext_block: [u8; 32]) -> Option<[u8; 32]> { - getter.get(Self::batch_id_key(&genesis, ext_block)).map(|bytes| bytes.try_into().unwrap()) - } - fn plan_ids_key(genesis: &[u8], block: u64) -> Vec { Self::tributary_key(b"plan_ids", [genesis, block.to_le_bytes().as_ref()].concat()) } + pub fn set_plan_ids( + txn: &mut D::Transaction<'_>, + genesis: [u8; 32], + block: u64, + plans: &[[u8; 32]], + ) { + txn.put(Self::plan_ids_key(&genesis, block), plans.concat()); + } pub fn plan_ids(getter: &G, genesis: [u8; 32], block: u64) -> Option> { getter.get(Self::plan_ids_key(&genesis, block)).map(|bytes| { let mut res = vec![]; diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index 4e00124f..2db25be7 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -5,6 +5,8 @@ use zeroize::Zeroizing; use ciphersuite::{Ciphersuite, Ristretto}; +use tokio::sync::mpsc::UnboundedSender; + use tributary::{Signed, Block, TributaryReader}; use processor_messages::{ @@ -21,10 +23,17 @@ use crate::{ tributary::{TributaryDb, TributarySpec, Transaction}, }; +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum RecognizedIdType { + Block, + Plan, +} + // Handle a specific Tributary block async fn handle_block( db: &mut TributaryDb, key: &Zeroizing<::F>, + recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>, processor: &Pro, spec: &TributarySpec, block: Block, @@ -172,16 +181,10 @@ async fn handle_block( Transaction::ExternalBlock(block) => { // Because this external block has been finalized, its batch ID should be authorized - - // If we didn't provide this transaction, we should halt until we do - // If we provided a distinct transaction, we should error - // If we did provide this transaction, we should've set the batch ID for the block - let batch_id = TributaryDb::::batch_id(&txn, genesis, block).expect( - "synced a tributary block finalizing a external block in a provided transaction \ - despite us not providing that transaction", - ); - - TributaryDb::::recognize_id(&mut txn, Zone::Batch.label(), genesis, batch_id); + TributaryDb::::recognize_id(&mut txn, Zone::Batch.label(), genesis, block); + recognized_id + .send((genesis, RecognizedIdType::Block, block)) + .expect("recognized_id_recv was dropped. are we shutting down?"); } Transaction::SubstrateBlock(block) => { @@ -192,6 +195,9 @@ async fn handle_block( for id in plan_ids { TributaryDb::::recognize_id(&mut txn, Zone::Sign.label(), genesis, id); + recognized_id + .send((genesis, RecognizedIdType::Plan, id)) + .expect("recognized_id_recv was dropped. are we shutting down?"); } } @@ -287,6 +293,7 @@ async fn handle_block( pub async fn handle_new_blocks( db: &mut TributaryDb, key: &Zeroizing<::F>, + recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>, processor: &Pro, spec: &TributarySpec, tributary: &TributaryReader, @@ -295,7 +302,7 @@ pub async fn handle_new_blocks( let mut last_block = db.last_block(genesis); while let Some(next) = tributary.block_after(&last_block) { let block = tributary.block(&next).unwrap(); - handle_block(db, key, processor, spec, block).await; + handle_block(db, key, recognized_id, processor, spec, block).await; last_block = next; db.set_last_block(genesis, next); } diff --git a/processor/src/signer.rs b/processor/src/signer.rs index 85804fa5..44696d34 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -325,6 +325,10 @@ impl Signer { Ok(machine) => machine, }; + // TODO: Use a seeded RNG here so we don't produce distinct messages with the same intent + // This is also needed so we don't preprocess, send preprocess, reboot before ack'ing the + // message, send distinct preprocess, and then attempt a signing session premised on the former + // with the latter let (machine, preprocess) = machine.preprocess(&mut OsRng); self.preprocessing.insert(id.id, machine); diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs index 248e3cce..87ed7ea6 100644 --- a/processor/src/substrate_signer.rs +++ b/processor/src/substrate_signer.rs @@ -190,6 +190,10 @@ impl SubstrateSigner { // b"substrate" is a literal from sp-core let machine = AlgorithmMachine::new(Schnorrkel::new(b"substrate"), self.keys.clone()); + // TODO: Use a seeded RNG here so we don't produce distinct messages with the same purpose + // This is also needed so we don't preprocess, send preprocess, reboot before ack'ing the + // message, send distinct preprocess, and then attempt a signing session premised on the former + // with the latter let (machine, preprocess) = machine.preprocess(&mut OsRng); self.preprocessing.insert(id.id, machine); @@ -206,6 +210,7 @@ impl SubstrateSigner { return; } + // Use the block hash as the ID let id = batch.block.0; self.signable.insert(id, batch); self.attempt(txn, id, 0).await;