diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs index 9d5bcea2..60ea1e20 100644 --- a/coordinator/src/substrate.rs +++ b/coordinator/src/substrate.rs @@ -83,7 +83,7 @@ async fn handle_new_set( .unwrap(); // Trigger a DKG - // TODO: Check how the processor handles thi being fired multiple times + // TODO: Check how the processor handles this being fired multiple times // We already have a unique event ID based on block, event index (where event index is // the one generated in this handle_block function) // We could use that on this end and the processor end? @@ -114,9 +114,9 @@ async fn handle_key_gen( .expect("KeyGen occurred for a set which doesn't exist") .is_some() { - // TODO: Check how the processor handles thi being fired multiple times + // TODO: Check how the processor handles this being fired multiple times processor - .send(CoordinatorMessage::KeyGen( + .send(CoordinatorMessage::Substrate( processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair { context: SubstrateContext { coin_latest_finalized_block: serai @@ -168,23 +168,8 @@ async fn handle_batch_and_burns( // prior batches // Since batches within a block are guaranteed to be ordered, thanks to their incremental ID, // the last batch will be the latest batch, so its block will be the latest block + // This is just a mild optimization to prevent needing an additional RPC call to grab this batch_block.insert(network, network_block); - - // TODO: Check how the processor handles thi being fired multiple times - processor - .send(CoordinatorMessage::Coordinator( - processor_messages::coordinator::CoordinatorMessage::BatchSigned { - key: get_coin_key( - serai, - // TODO2 - ValidatorSet { network, session: Session(0) }, - ) - .await? - .expect("ValidatorSet without keys signed a batch"), - block: network_block, - }, - )) - .await; } else { panic!("Batch event wasn't Batch: {batch:?}"); } @@ -228,7 +213,7 @@ async fn handle_batch_and_burns( .expect("network had a batch/burn yet never set a latest block") }; - // TODO: Check how the processor handles thi being fired multiple times + // TODO: Check how the processor handles this being fired multiple times processor .send(CoordinatorMessage::Substrate( processor_messages::substrate::CoordinatorMessage::SubstrateBlock { diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 35a9215b..d9107752 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -110,8 +110,6 @@ pub mod coordinator { BatchShares { id: SignId, shares: HashMap }, // Re-attempt a batch signing protocol. BatchReattempt { id: SignId }, - // Needed so a client which didn't participate in signing can still realize signing completed - BatchSigned { key: Vec, block: BlockHash }, } impl CoordinatorMessage { @@ -120,7 +118,6 @@ pub mod coordinator { CoordinatorMessage::BatchPreprocesses { id, .. } => BlockHash(id.id), CoordinatorMessage::BatchShares { id, .. } => BlockHash(id.id), CoordinatorMessage::BatchReattempt { id } => BlockHash(id.id), - CoordinatorMessage::BatchSigned { block, .. } => *block, }) } @@ -129,7 +126,6 @@ pub mod coordinator { CoordinatorMessage::BatchPreprocesses { id, .. } => &id.key, CoordinatorMessage::BatchShares { id, .. } => &id.key, CoordinatorMessage::BatchReattempt { id } => &id.key, - CoordinatorMessage::BatchSigned { key, .. } => key, } } } diff --git a/processor/src/main.rs b/processor/src/main.rs index 1a14c1ad..a88d9ddb 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -141,9 +141,13 @@ struct TributaryMutable { key_gen: KeyGen, signers: HashMap, Signer>, - // This isn't mutably borrowed by Substrate. It is also mutably borrowed by the Scanner. - // The safety of this is from the Scanner starting new sign tasks, and Tributary only mutating - // already-created sign tasks. The Scanner does not mutate sign tasks post-creation. + // This is also mutably borrowed by the Scanner. + // The Scanner starts new sign tasks. + // The Tributary mutates already-created signed tasks, potentially completing them. + // Substrate may mark tasks as completed, invalidating any existing mutable borrows. + // The safety of this follows as written above. + + // TODO: There should only be one SubstrateSigner at a time (see #277) substrate_signers: HashMap, SubstrateSigner>, } @@ -343,7 +347,13 @@ async fn handle_coordinator_msg( let key = ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); // We now have to acknowledge every block for this key up to the acknowledged block - let outputs = substrate_mutable.scanner.ack_up_to_block(key, block_id).await; + let (blocks, outputs) = substrate_mutable.scanner.ack_up_to_block(key, block_id).await; + // Since this block was acknowledged, we no longer have to sign the batch for it + for block in blocks { + for (_, signer) in tributary_mutable.substrate_signers.iter_mut() { + signer.batch_signed(block); + } + } let mut payments = vec![]; for out in burns { diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 419a1b9e..653f5aa7 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -14,6 +14,8 @@ use tokio::{ time::sleep, }; +use serai_client::primitives::BlockHash; + use crate::{ Get, DbTxn, Db, coins::{Output, Transaction, EventualitiesTracker, Block, Coin}, @@ -175,13 +177,19 @@ impl ScannerDb { fn scanned_block_key(key: &::G) -> Vec { Self::scanner_key(b"scanned_block", key.to_bytes()) } + + #[allow(clippy::type_complexity)] fn save_scanned_block( txn: &mut D::Transaction<'_>, key: &::G, block: usize, - ) -> Vec { - let outputs = Self::block(txn, block).and_then(|id| Self::outputs(txn, key, &id)); - let outputs = outputs.unwrap_or(vec![]); + ) -> (Option<>::Id>, Vec) { + let id = Self::block(txn, block); // It may be None for the first key rotated to + let outputs = if let Some(id) = id.as_ref() { + Self::outputs(txn, key, id).unwrap_or(vec![]) + } else { + vec![] + }; // Mark all the outputs from this block as seen for output in &outputs { @@ -191,7 +199,7 @@ impl ScannerDb { txn.put(Self::scanned_block_key(key), u64::try_from(block).unwrap().to_le_bytes()); // Return this block's outputs so they can be pruned from the RAM cache - outputs + (id, outputs) } fn latest_scanned_block(&self, key: ::G) -> usize { let bytes = self @@ -269,7 +277,8 @@ impl ScannerHandle { info!("Rotating to key {}", hex::encode(key.to_bytes())); let mut txn = scanner.db.0.txn(); - assert!(ScannerDb::::save_scanned_block(&mut txn, &key, activation_number).is_empty()); + let (_, outputs) = ScannerDb::::save_scanned_block(&mut txn, &key, activation_number); + assert!(outputs.is_empty()); ScannerDb::::add_active_key(&mut txn, key); txn.commit(); scanner.keys.push(key); @@ -284,7 +293,7 @@ impl ScannerHandle { &mut self, key: ::G, id: >::Id, - ) -> Vec { + ) -> (Vec, Vec) { let mut scanner = self.scanner.write().await; debug!("Block {} acknowledged", hex::encode(&id)); @@ -294,11 +303,16 @@ impl ScannerHandle { // Get the number of the last block we acknowledged let prior = scanner.db.latest_scanned_block(key); + let mut blocks = vec![]; let mut outputs = vec![]; let mut txn = scanner.db.0.txn(); for number in (prior + 1) ..= number { - outputs.extend(ScannerDb::::save_scanned_block(&mut txn, &key, number)); + let (block, these_outputs) = ScannerDb::::save_scanned_block(&mut txn, &key, number); + let block = BlockHash(block.unwrap().as_ref().try_into().unwrap()); + blocks.push(block); + outputs.extend(these_outputs); } + assert_eq!(blocks.last().unwrap().as_ref(), id.as_ref()); // TODO: This likely needs to be atomic with the scheduler? txn.commit(); @@ -306,7 +320,7 @@ impl ScannerHandle { assert!(scanner.ram_outputs.remove(output.id().as_ref())); } - outputs + (blocks, outputs) } } diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs index 28e09b2c..368813c1 100644 --- a/processor/src/substrate_signer.rs +++ b/processor/src/substrate_signer.rs @@ -18,7 +18,10 @@ use frost_schnorrkel::Schnorrkel; use log::{info, debug, warn}; -use serai_client::in_instructions::primitives::{Batch, SignedBatch}; +use serai_client::{ + primitives::BlockHash, + in_instructions::primitives::{Batch, SignedBatch}, +}; use messages::{sign::SignId, coordinator::*}; use crate::{DbTxn, Db}; @@ -104,13 +107,10 @@ impl SubstrateSigner { // Check the attempt lines up match self.attempt.get(&id.id) { // If we don't have an attempt logged, it's because the coordinator is faulty OR because we - // rebooted + // rebooted OR we detected the signed batch on chain + // The latter is the expected flow for batches not actively being participated in None => { - warn!( - "not attempting batch {} #{}. this is an error if we didn't reboot", - hex::encode(id.id), - id.attempt - ); + warn!("not attempting batch {} #{}", hex::encode(id.id), id.attempt); Err(())?; } Some(attempt) => { @@ -204,7 +204,7 @@ impl SubstrateSigner { pub async fn sign(&mut self, batch: Batch) { if self.db.completed(batch.block.0) { debug!("Sign batch order for ID we've already completed signing"); - // See BatchSigned for commentary on why this simply returns + // See batch_signed for commentary on why this simply returns return; } @@ -318,26 +318,28 @@ impl SubstrateSigner { CoordinatorMessage::BatchReattempt { id } => { self.attempt(id.id, id.attempt).await; } - - CoordinatorMessage::BatchSigned { key: _, block } => { - // Stop trying to sign for this batch - let mut txn = self.db.0.txn(); - SubstrateSignerDb::::complete(&mut txn, block.0); - txn.commit(); - - self.signable.remove(&block.0); - self.attempt.remove(&block.0); - self.preprocessing.remove(&block.0); - self.signing.remove(&block.0); - - // This doesn't emit SignedBatch because it doesn't have access to the SignedBatch - // The coordinator is expected to only claim a batch was signed if it's on the Substrate - // chain, hence why it's unnecessary to check it/back it up here - - // This also doesn't emit any further events since all mutation happen on the - // substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is meant to - // end up triggering) - } } } + + pub fn batch_signed(&mut self, block: BlockHash) { + // Stop trying to sign for this batch + let mut txn = self.db.0.txn(); + SubstrateSignerDb::::complete(&mut txn, block.0); + txn.commit(); + + self.signable.remove(&block.0); + self.attempt.remove(&block.0); + self.preprocessing.remove(&block.0); + self.signing.remove(&block.0); + + // This doesn't emit SignedBatch because it doesn't have access to the SignedBatch + // This function is expected to only be called once Substrate acknowledges this block, + // which means its batch must have been signed + // While a successive batch's signing would also cause this block to be acknowledged, Substrate + // guarantees a batch's ordered inclusion + + // This also doesn't emit any further events since all mutation from the Batch being signed + // happens on the substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is + // meant to end up triggering) + } } diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 7650eb5e..f900ed01 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -7,6 +7,8 @@ use frost::Participant; use tokio::time::timeout; +use serai_client::primitives::BlockHash; + use serai_db::MemDb; use crate::{ @@ -25,13 +27,14 @@ pub async fn test_scanner(coin: C) { } let first = Arc::new(Mutex::new(true)); + let activation_number = coin.get_latest_block_number().await.unwrap(); let db = MemDb::new(); let new_scanner = || async { let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone()); let mut first = first.lock().unwrap(); if *first { assert!(active_keys.is_empty()); - scanner.rotate_key(coin.get_latest_block_number().await.unwrap(), keys.group_key()).await; + scanner.rotate_key(activation_number, keys.group_key()).await; *first = false; } else { assert_eq!(active_keys.len(), 1); @@ -68,7 +71,19 @@ pub async fn test_scanner(coin: C) { verify_event(new_scanner().await).await; // Acknowledge the block - assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id.clone()).await, outputs); + + // Acknowledging it should yield a list of all blocks since the last acknowledged block + let mut blocks = vec![]; + let mut curr_block = activation_number + 1; + loop { + let block = coin.get_block(curr_block).await.unwrap().id(); + blocks.push(BlockHash(block.as_ref().try_into().unwrap())); + if block == block_id { + break; + } + curr_block += 1; + } + assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id).await, (blocks, outputs)); // There should be no more events assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err()); diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs index 65fb3dae..6b33b7fb 100644 --- a/processor/src/tests/wallet.rs +++ b/processor/src/tests/wallet.rs @@ -115,7 +115,7 @@ pub async fn test_wallet(coin: C) { // Check the Scanner DB can reload the outputs assert_eq!( - scanner.ack_up_to_block(key, block.id()).await, + scanner.ack_up_to_block(key, block.id()).await.1, [first_outputs, outputs].concat().to_vec() ); }