diff --git a/Cargo.lock b/Cargo.lock index fed0f8d9..b1cd5141 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8075,6 +8075,7 @@ dependencies = [ "serai-tokens-pallet", "serai-validator-sets-pallet", "sp-application-crypto", + "sp-core", "sp-runtime", "thiserror", ] diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 8c113131..04df8516 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -535,7 +535,7 @@ pub async fn handle_processors( Some(Transaction::SubstrateBlock(block)) } - coordinator::ProcessorMessage::BatchPreprocess { id, preprocess } => { + coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { // If this is the first attempt instance, synchronize around the block first if id.attempt == 0 { // Save the preprocess to disk so we can publish it later @@ -545,7 +545,9 @@ pub async fn handle_processors( MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); txn.commit(); - Some(Transaction::ExternalBlock(id.id)) + // TODO: This will publish one ExternalBlock per Batch. We should only publish one per + // all batches within a block + Some(Transaction::ExternalBlock(block.0)) } else { Some(Transaction::BatchPreprocess(SignData { plan: id.id, diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index 8a0b6d05..fa699305 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -8,7 +8,7 @@ use frost::ThresholdParams; use serai_client::{ SeraiError, Block, Serai, - primitives::BlockHash, + primitives::{BlockHash, NetworkId}, validator_sets::{ primitives::{Session, ValidatorSet, KeyPair}, ValidatorSetsEvent, @@ -157,21 +157,23 @@ async fn handle_batch_and_burns( // While that shouldn't be needed, ensuring order never hurts, and may enable design choices // with regards to Processor <-> Coordinator message passing let mut networks_with_event = vec![]; - let mut network_had_event = |burns: &mut HashMap<_, _>, network| { + let mut network_had_event = |burns: &mut HashMap<_, _>, batches: &mut HashMap<_, _>, network| { // Don't insert this network multiple times // A Vec is still used in order to maintain the insertion order if !networks_with_event.contains(&network) { networks_with_event.push(network); burns.insert(network, vec![]); + batches.insert(network, vec![]); } }; let mut batch_block = HashMap::new(); + let mut batches = HashMap::>::new(); let mut burns = HashMap::new(); for batch in serai.get_batch_events(hash).await? { - if let InInstructionsEvent::Batch { network, id: _, block: network_block } = batch { - network_had_event(&mut burns, network); + if let InInstructionsEvent::Batch { network, id, block: network_block } = batch { + network_had_event(&mut burns, &mut batches, network); // Track what Serai acknowledges as the latest block for this network // If this Substrate block has multiple batches, the last batch's block will overwrite the @@ -180,6 +182,9 @@ async fn handle_batch_and_burns( // the last batch will be the latest batch, so its block will be the latest block // This is just a mild optimization to prevent needing an additional RPC call to grab this batch_block.insert(network, network_block); + + // Add the batch included by this block + batches.get_mut(&network).unwrap().push(id); } else { panic!("Batch event wasn't Batch: {batch:?}"); } @@ -188,12 +193,10 @@ async fn handle_batch_and_burns( for burn in serai.get_burn_events(hash).await? { if let TokensEvent::Burn { address: _, balance, instruction } = burn { let network = balance.coin.network(); - network_had_event(&mut burns, network); + network_had_event(&mut burns, &mut batches, network); // network_had_event should register an entry in burns - let mut burns_so_far = burns.remove(&network).unwrap(); - burns_so_far.push(OutInstructionWithBalance { balance, instruction }); - burns.insert(network, burns_so_far); + burns.get_mut(&network).unwrap().push(OutInstructionWithBalance { balance, instruction }); } else { panic!("Burn event wasn't Burn: {burn:?}"); } @@ -230,6 +233,7 @@ async fn handle_batch_and_burns( .map(|keys| keys.1.into_inner()) .expect("batch/burn for network which never set keys"), burns: burns.remove(&network).unwrap(), + batches: batches.remove(&network).unwrap(), }, ), ) diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index d47ae233..17927ffb 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -115,12 +115,16 @@ pub mod coordinator { } impl CoordinatorMessage { + // The Coordinator will only send Batch messages once the Batch ID has been recognized + // The ID will only be recognized when the block is acknowledged by a super-majority of the + // network *and the local node* + // This synchrony obtained lets us ignore the synchrony requirement offered here pub fn required_block(&self) -> Option { - Some(match self { - CoordinatorMessage::BatchPreprocesses { id, .. } => BlockHash(id.id), - CoordinatorMessage::BatchShares { id, .. } => BlockHash(id.id), - CoordinatorMessage::BatchReattempt { id } => BlockHash(id.id), - }) + match self { + CoordinatorMessage::BatchPreprocesses { .. } => None, + CoordinatorMessage::BatchShares { .. } => None, + CoordinatorMessage::BatchReattempt { .. } => None, + } } pub fn key(&self) -> &[u8] { @@ -135,7 +139,7 @@ pub mod coordinator { #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { SubstrateBlockAck { network: NetworkId, block: u64, plans: Vec<[u8; 32]> }, - BatchPreprocess { id: SignId, preprocess: Vec }, + BatchPreprocess { id: SignId, block: BlockHash, preprocess: Vec }, BatchShare { id: SignId, share: [u8; 32] }, } } @@ -156,6 +160,7 @@ pub mod substrate { block: u64, key: Vec, burns: Vec, + batches: Vec, }, } diff --git a/processor/src/main.rs b/processor/src/main.rs index 89076ecc..50385d86 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -12,13 +12,13 @@ use frost::{curve::Ciphersuite, ThresholdKeys}; use log::{info, warn, error}; use tokio::time::sleep; -use scale::Decode; +use scale::{Encode, Decode}; use serai_client::{ primitives::{MAX_DATA_LEN, BlockHash, NetworkId}, tokens::primitives::{OutInstruction, OutInstructionWithBalance}, in_instructions::primitives::{ - Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, + Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, MAX_BATCH_SIZE, }, }; @@ -396,6 +396,7 @@ async fn handle_coordinator_msg( block, key: key_vec, burns, + batches, } => { assert_eq!(network_id, N::NETWORK, "coordinator sent us data for another network"); @@ -405,12 +406,11 @@ async fn handle_coordinator_msg( let key = ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); // We now have to acknowledge every block for this key up to the acknowledged block - let (blocks, outputs) = - substrate_mutable.scanner.ack_up_to_block(txn, key, block_id).await; + let outputs = substrate_mutable.scanner.ack_up_to_block(txn, key, block_id).await; // Since this block was acknowledged, we no longer have to sign the batch for it - for block in blocks { + for batch_id in batches { for (_, signer) in tributary_mutable.substrate_signers.iter_mut() { - signer.batch_signed(txn, block); + signer.batch_signed(txn, batch_id); } } @@ -665,51 +665,84 @@ async fn run(mut raw_db: D, network: N, mut let mut txn = raw_db.txn(); match msg.unwrap() { - ScannerEvent::Block { key, block, batch, outputs } => { + ScannerEvent::Block { key, block, outputs } => { let mut block_hash = [0; 32]; block_hash.copy_from_slice(block.as_ref()); + // TODO: Move this out from Scanner now that the Scanner no longer handles batches + let mut batch_id = substrate_mutable.scanner.next_batch_id(&txn); - let batch = Batch { + // start with empty batch + let mut batches = vec![Batch { network: N::NETWORK, - id: batch, + id: batch_id, block: BlockHash(block_hash), - instructions: outputs.iter().filter_map(|output| { - // If these aren't externally received funds, don't handle it as an instruction - if output.kind() != OutputType::External { - return None; - } + instructions: vec![], + }]; + for output in outputs { + // If these aren't externally received funds, don't handle it as an instruction + if output.kind() != OutputType::External { + continue; + } - let mut data = output.data(); - let max_data_len = MAX_DATA_LEN.try_into().unwrap(); - if data.len() > max_data_len { - error!( - "data in output {} exceeded MAX_DATA_LEN ({MAX_DATA_LEN}): {}", - hex::encode(output.id()), - data.len(), - ); - data = &data[.. max_data_len]; - } + let mut data = output.data(); + let max_data_len = usize::try_from(MAX_DATA_LEN).unwrap(); + // TODO: Should we drop this, instead of truncating? + // A truncating message likely doesn't have value yet has increased data load and is + // corrupt vs a NOP. The former seems more likely to cause problems + if data.len() > max_data_len { + error!( + "data in output {} exceeded MAX_DATA_LEN ({MAX_DATA_LEN}): {}. truncating", + hex::encode(output.id()), + data.len(), + ); + data = &data[.. max_data_len]; + } - let shorthand = Shorthand::decode(&mut data).ok()?; - let instruction = RefundableInInstruction::try_from(shorthand).ok()?; - // TODO2: Set instruction.origin if not set (and handle refunds in general) - Some(InInstructionWithBalance { - instruction: instruction.instruction, - balance: output.balance(), - }) - }).collect() - }; + let Ok(shorthand) = Shorthand::decode(&mut data) else { continue }; + let Ok(instruction) = RefundableInInstruction::try_from(shorthand) else { continue }; - info!("created batch {} ({} instructions)", batch.id, batch.instructions.len()); + // TODO2: Set instruction.origin if not set (and handle refunds in general) + let instruction = InInstructionWithBalance { + instruction: instruction.instruction, + balance: output.balance(), + }; + + let batch = batches.last_mut().unwrap(); + batch.instructions.push(instruction); + + // check if batch is over-size + if batch.encode().len() > MAX_BATCH_SIZE { + // pop the last instruction so it's back in size + let instruction = batch.instructions.pop().unwrap(); + + // bump the id for the new batch + batch_id += 1; + + // make a new batch with this instruction included + batches.push(Batch { + network: N::NETWORK, + id: batch_id, + block: BlockHash(block_hash), + instructions: vec![instruction], + }); + } + } + + // Save the next batch ID + substrate_mutable.scanner.set_next_batch_id(&mut txn, batch_id + 1); // Start signing this batch - // TODO: Don't reload both sets of keys in full just to get the Substrate public key - tributary_mutable - .substrate_signers - .get_mut(tributary_mutable.key_gen.keys(&key).0.group_key().to_bytes().as_slice()) - .unwrap() - .sign(&mut txn, batch) - .await; + for batch in batches { + info!("created batch {} ({} instructions)", batch.id, batch.instructions.len()); + + // TODO: Don't reload both sets of keys in full just to get the Substrate public key + tributary_mutable + .substrate_signers + .get_mut(tributary_mutable.key_gen.keys(&key).0.group_key().to_bytes().as_slice()) + .unwrap() + .sign(&mut txn, batch) + .await; + } }, ScannerEvent::Completed(id, tx) => { diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 6ebde415..51404431 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -14,8 +14,6 @@ use tokio::{ time::sleep, }; -use serai_client::primitives::BlockHash; - use crate::{ Get, DbTxn, Db, networks::{Output, Transaction, EventualitiesTracker, Block, Network}, @@ -27,7 +25,6 @@ pub enum ScannerEvent { Block { key: ::G, block: >::Id, - batch: u32, outputs: Vec, }, // Eventuality completion found on-chain @@ -115,9 +112,6 @@ impl ScannerDb { fn next_batch_key() -> Vec { Self::scanner_key(b"next_batch", []) } - fn batch_key(key: &::G, block: &>::Id) -> Vec { - Self::scanner_key(b"batch", [key.to_bytes().as_ref(), block.as_ref()].concat()) - } fn outputs_key( key: &::G, block: &>::Id, @@ -129,12 +123,7 @@ impl ScannerDb { key: &::G, block: &>::Id, outputs: &[N::Output], - ) -> u32 { - let batch_key = Self::batch_key(key, block); - if let Some(batch) = txn.get(batch_key) { - return u32::from_le_bytes(batch.try_into().unwrap()); - } - + ) { let mut bytes = Vec::with_capacity(outputs.len() * 64); for output in outputs { output.write(&mut bytes).unwrap(); @@ -150,13 +139,6 @@ impl ScannerDb { // 0a, 1a, 2a, 3a, 4a, 5a, 4b, 5b // when it should be // 0a, 1a, 2a, 3a, 4a, 4b, 5a, 5b - - // Because it's a new set of outputs, allocate a batch ID for it - let next_bytes = txn.get(Self::next_batch_key()).unwrap_or(vec![0; 4]).try_into().unwrap(); - let next = u32::from_le_bytes(next_bytes); - txn.put(Self::next_batch_key(), (next + 1).to_le_bytes()); - txn.put(Self::batch_key(key, block), next_bytes); - next } fn outputs( txn: &D::Transaction<'_>, @@ -182,7 +164,7 @@ impl ScannerDb { txn: &mut D::Transaction<'_>, key: &::G, block: usize, - ) -> (Option<>::Id>, Vec) { + ) -> Vec { let id = Self::block(txn, block); // It may be None for the first key rotated to let outputs = if let Some(id) = id.as_ref() { Self::outputs(txn, key, id).unwrap_or(vec![]) @@ -198,7 +180,7 @@ impl ScannerDb { txn.put(Self::scanned_block_key(key), u64::try_from(block).unwrap().to_le_bytes()); // Return this block's outputs so they can be pruned from the RAM cache - (id, outputs) + outputs } fn latest_scanned_block(getter: &G, key: ::G) -> usize { let bytes = getter @@ -280,7 +262,7 @@ impl ScannerHandle { info!("Rotating scanner to key {} at {activation_number}", hex::encode(key.to_bytes())); - let (_, outputs) = ScannerDb::::save_scanned_block(txn, &key, activation_number); + let outputs = ScannerDb::::save_scanned_block(txn, &key, activation_number); scanner.ram_scanned.insert(key.to_bytes().as_ref().to_vec(), activation_number); assert!(outputs.is_empty()); @@ -295,13 +277,25 @@ impl ScannerHandle { ScannerDb::::block_number(&self.scanner.read().await.db, id) } + // Set the next batch ID to use + pub fn set_next_batch_id(&self, txn: &mut D::Transaction<'_>, batch: u32) { + txn.put(ScannerDb::::next_batch_key(), batch.to_le_bytes()); + } + + // Get the next batch ID + pub fn next_batch_id(&self, txn: &D::Transaction<'_>) -> u32 { + txn + .get(ScannerDb::::next_batch_key()) + .map_or(0, |v| u32::from_le_bytes(v.try_into().unwrap())) + } + /// Acknowledge having handled a block for a key. pub async fn ack_up_to_block( &mut self, txn: &mut D::Transaction<'_>, key: ::G, id: >::Id, - ) -> (Vec, Vec) { + ) -> Vec { let mut scanner = self.scanner.write().await; debug!("Block {} acknowledged", hex::encode(&id)); @@ -311,21 +305,16 @@ impl ScannerHandle { // Get the number of the last block we acknowledged let prior = ScannerDb::::latest_scanned_block(txn, key); - let mut blocks = vec![]; let mut outputs = vec![]; for number in (prior + 1) ..= number { - let (block, these_outputs) = ScannerDb::::save_scanned_block(txn, &key, number); - let block = BlockHash(block.unwrap().as_ref().try_into().unwrap()); - blocks.push(block); - outputs.extend(these_outputs); + outputs.extend(ScannerDb::::save_scanned_block(txn, &key, number)); } - assert_eq!(blocks.last().unwrap().as_ref(), id.as_ref()); for output in &outputs { assert!(scanner.ram_outputs.remove(output.id().as_ref())); } - (blocks, outputs) + outputs } } @@ -514,11 +503,11 @@ impl Scanner { // Save the outputs to disk let mut txn = scanner.db.txn(); - let batch = ScannerDb::::save_outputs(&mut txn, &key, &block_id, &outputs); + ScannerDb::::save_outputs(&mut txn, &key, &block_id, &outputs); txn.commit(); // Send all outputs - if !scanner.emit(ScannerEvent::Block { key, block: block_id, batch, outputs }) { + if !scanner.emit(ScannerEvent::Block { key, block: block_id, outputs }) { return; } // Write this number as scanned so we won't re-fire these outputs diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs index 2a81e0c7..f54f7de7 100644 --- a/processor/src/substrate_signer.rs +++ b/processor/src/substrate_signer.rs @@ -18,10 +18,7 @@ use frost_schnorrkel::Schnorrkel; use log::{info, debug, warn}; -use serai_client::{ - primitives::BlockHash, - in_instructions::primitives::{Batch, SignedBatch, batch_message}, -}; +use serai_client::in_instructions::primitives::{Batch, SignedBatch, batch_message}; use messages::{sign::SignId, coordinator::*}; use crate::{Get, DbTxn, Db}; @@ -149,7 +146,9 @@ impl SubstrateSigner { } // Start this attempt - if !self.signable.contains_key(&id) { + let block = if let Some(batch) = self.signable.get(&id) { + batch.block + } else { warn!("told to attempt signing a batch we aren't currently signing for"); return; }; @@ -162,7 +161,7 @@ impl SubstrateSigner { self.attempt.insert(id, attempt); let id = SignId { key: self.keys.group_key().to_bytes().to_vec(), id, attempt }; - info!("signing batch {}, attempt #{}", hex::encode(id.id), id.attempt); + info!("signing batch {} #{}", hex::encode(id.id), id.attempt); // If we reboot mid-sign, the current design has us abort all signs and wait for latter // attempts/new signing protocols @@ -199,19 +198,21 @@ impl SubstrateSigner { // Broadcast our preprocess self.events.push_back(SubstrateSignerEvent::ProcessorMessage( - ProcessorMessage::BatchPreprocess { id, preprocess: preprocess.serialize() }, + ProcessorMessage::BatchPreprocess { id, block, preprocess: preprocess.serialize() }, )); } pub async fn sign(&mut self, txn: &mut D::Transaction<'_>, batch: Batch) { - if SubstrateSignerDb::::completed(txn, batch.block.0) { + // Use the batch id as the ID + let mut id = [0u8; 32]; + id[.. 4].copy_from_slice(&batch.id.to_le_bytes()); + + if SubstrateSignerDb::::completed(txn, id) { debug!("Sign batch order for ID we've already completed signing"); // See batch_signed for commentary on why this simply returns return; } - // Use the block hash as the ID - let id = batch.block.0; self.signable.insert(id, batch); self.attempt(txn, id, 0).await; } @@ -335,14 +336,19 @@ impl SubstrateSigner { } } - pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, block: BlockHash) { - // Stop trying to sign for this batch - SubstrateSignerDb::::complete(txn, block.0); + pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, batch_id: u32) { + // Convert into 32-byte ID + // TODO: Add a BatchSignId so we don't have this inefficiency + let mut id = [0u8; 32]; + id[.. 4].copy_from_slice(&batch_id.to_le_bytes()); - self.signable.remove(&block.0); - self.attempt.remove(&block.0); - self.preprocessing.remove(&block.0); - self.signing.remove(&block.0); + // Stop trying to sign for this batch + SubstrateSignerDb::::complete(txn, id); + + self.signable.remove(&id); + self.attempt.remove(&id); + self.preprocessing.remove(&id); + self.signing.remove(&id); // This doesn't emit SignedBatch because it doesn't have access to the SignedBatch // This function is expected to only be called once Substrate acknowledges this block, diff --git a/processor/src/tests/addresses.rs b/processor/src/tests/addresses.rs index bd134278..d3b23ddb 100644 --- a/processor/src/tests/addresses.rs +++ b/processor/src/tests/addresses.rs @@ -20,7 +20,6 @@ async fn spend( network: &N, keys: &HashMap>, scanner: &mut ScannerHandle, - batch: u32, outputs: Vec, ) -> Vec { let key = keys[&Participant::new(1).unwrap()].group_key(); @@ -52,9 +51,8 @@ async fn spend( network.mine_block().await; } match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Block { key: this_key, block: _, batch: this_batch, outputs } => { + ScannerEvent::Block { key: this_key, block: _, outputs } => { assert_eq!(this_key, key); - assert_eq!(this_batch, batch); assert_eq!(outputs.len(), 1); // Make sure this is actually a change output assert_eq!(outputs[0].kind(), OutputType::Change); @@ -91,10 +89,9 @@ pub async fn test_addresses(network: N) { // Verify the Scanner picked them up let outputs = match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Block { key: this_key, block, batch, outputs } => { + ScannerEvent::Block { key: this_key, block, outputs } => { assert_eq!(this_key, key); assert_eq!(block, block_id); - assert_eq!(batch, 0); assert_eq!(outputs.len(), 1); assert_eq!(outputs[0].kind(), OutputType::Branch); outputs @@ -105,7 +102,7 @@ pub async fn test_addresses(network: N) { }; // Spend the branch output, creating a change output and ensuring we actually get change - let outputs = spend(&network, &keys, &mut scanner, 1, outputs).await; + let outputs = spend(&network, &keys, &mut scanner, outputs).await; // Also test spending the change output - spend(&network, &keys, &mut scanner, 2, outputs).await; + spend(&network, &keys, &mut scanner, outputs).await; } diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 799e365b..4f20ed0a 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -55,10 +55,9 @@ pub async fn test_scanner(network: N) { let verify_event = |mut scanner: ScannerHandle| async { let outputs = match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Block { key, block, batch, outputs } => { + ScannerEvent::Block { key, block, outputs } => { assert_eq!(key, keys.group_key()); assert_eq!(block, block_id); - assert_eq!(batch, 0); assert_eq!(outputs.len(), 1); assert_eq!(outputs[0].kind(), OutputType::External); outputs @@ -90,10 +89,7 @@ pub async fn test_scanner(network: N) { let mut cloned_db = db.clone(); let mut txn = cloned_db.txn(); - assert_eq!( - scanner.ack_up_to_block(&mut txn, keys.group_key(), block_id).await, - (blocks, outputs) - ); + assert_eq!(scanner.ack_up_to_block(&mut txn, keys.group_key(), block_id).await, outputs); txn.commit(); // There should be no more events diff --git a/processor/src/tests/substrate_signer.rs b/processor/src/tests/substrate_signer.rs index a6f55bf7..592ce1d7 100644 --- a/processor/src/tests/substrate_signer.rs +++ b/processor/src/tests/substrate_signer.rs @@ -24,13 +24,16 @@ async fn test_substrate_signer() { let participant_one = Participant::new(1).unwrap(); + let id: u32 = 5; + let mut id_arr = [0u8; 32]; + id_arr[.. 4].copy_from_slice(&id.to_le_bytes()); let block = BlockHash([0xaa; 32]); let actual_id = - SignId { key: keys[&participant_one].group_key().to_bytes().to_vec(), id: block.0, attempt: 0 }; + SignId { key: keys[&participant_one].group_key().to_bytes().to_vec(), id: id_arr, attempt: 0 }; let batch = Batch { network: NetworkId::Monero, - id: 5, + id, block, instructions: vec![ InInstructionWithBalance { @@ -81,10 +84,12 @@ async fn test_substrate_signer() { let i = Participant::new(u16::try_from(i).unwrap()).unwrap(); if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess { id, + block: batch_block, preprocess, }) = signers.get_mut(&i).unwrap().events.pop_front().unwrap() { assert_eq!(id, actual_id); + assert_eq!(batch_block, block); if signing_set.contains(&i) { preprocesses.insert(i, preprocess); } diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs index 324e631f..3eebe6fc 100644 --- a/processor/src/tests/wallet.rs +++ b/processor/src/tests/wallet.rs @@ -36,10 +36,9 @@ pub async fn test_wallet(network: N) { let block_id = block.id(); match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Block { key: this_key, block, batch, outputs } => { + ScannerEvent::Block { key: this_key, block, outputs } => { assert_eq!(this_key, key); assert_eq!(block, block_id); - assert_eq!(batch, 0); assert_eq!(outputs.len(), 1); (block_id, outputs) } @@ -110,10 +109,9 @@ pub async fn test_wallet(network: N) { } match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { - ScannerEvent::Block { key: this_key, block: block_id, batch, outputs: these_outputs } => { + ScannerEvent::Block { key: this_key, block: block_id, outputs: these_outputs } => { assert_eq!(this_key, key); assert_eq!(block_id, block.id()); - assert_eq!(batch, 1); assert_eq!(these_outputs, outputs); } ScannerEvent::Completed(_, _) => { @@ -124,7 +122,7 @@ pub async fn test_wallet(network: N) { // Check the Scanner DB can reload the outputs let mut txn = db.txn(); assert_eq!( - scanner.ack_up_to_block(&mut txn, key, block.id()).await.1, + scanner.ack_up_to_block(&mut txn, key, block.id()).await, [first_outputs, outputs].concat().to_vec() ); txn.commit(); diff --git a/substrate/client/src/serai/in_instructions.rs b/substrate/client/src/serai/in_instructions.rs index 978830f4..4d744030 100644 --- a/substrate/client/src/serai/in_instructions.rs +++ b/substrate/client/src/serai/in_instructions.rs @@ -19,7 +19,7 @@ impl Serai { hash: [u8; 32], network: NetworkId, ) -> Result, SeraiError> { - self.storage(PALLET, "LatestBlock", Some(vec![scale_value(network)]), hash).await + self.storage(PALLET, "LatestNetworkBlock", Some(vec![scale_value(network)]), hash).await } pub async fn get_batch_events( diff --git a/substrate/in-instructions/pallet/Cargo.toml b/substrate/in-instructions/pallet/Cargo.toml index ea9dab34..5261e70c 100644 --- a/substrate/in-instructions/pallet/Cargo.toml +++ b/substrate/in-instructions/pallet/Cargo.toml @@ -19,6 +19,7 @@ scale-info = { version = "2", default-features = false, features = ["derive"] } sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false } sp-runtime = { git = "https://github.com/serai-dex/substrate", default-features = false } +sp-core = { git = "https://github.com/serai-dex/substrate", default-features = false } frame-system = { git = "https://github.com/serai-dex/substrate", default-features = false } frame-support = { git = "https://github.com/serai-dex/substrate", default-features = false } diff --git a/substrate/in-instructions/pallet/src/lib.rs b/substrate/in-instructions/pallet/src/lib.rs index afe2c2f6..f6729c91 100644 --- a/substrate/in-instructions/pallet/src/lib.rs +++ b/substrate/in-instructions/pallet/src/lib.rs @@ -23,6 +23,8 @@ pub enum PalletError { #[frame_support::pallet] pub mod pallet { use sp_application_crypto::RuntimePublic; + use sp_runtime::traits::Zero; + use sp_core::sr25519::Public; use frame_support::pallet_prelude::*; use frame_system::pallet_prelude::*; @@ -55,10 +57,16 @@ pub mod pallet { #[pallet::getter(fn batches)] pub(crate) type Batches = StorageMap<_, Blake2_256, NetworkId, u32, OptionQuery>; + // The last Serai block in which this validator set included a batch + #[pallet::storage] + #[pallet::getter(fn last_batch_block)] + pub(crate) type LastBatchBlock = + StorageMap<_, Blake2_256, NetworkId, BlockNumberFor, OptionQuery>; + // The latest block a network has acknowledged as finalized #[pallet::storage] - #[pallet::getter(fn last_block)] - pub(crate) type LatestBlock = + #[pallet::getter(fn latest_network_block)] + pub(crate) type LatestNetworkBlock = StorageMap<_, Blake2_256, NetworkId, BlockHash, OptionQuery>; impl Pallet { @@ -71,6 +79,31 @@ pub mod pallet { } } + fn key_for_network(network: NetworkId) -> Result { + // TODO: Get the latest session + let session = Session(0); + + let mut set = ValidatorSet { session, network }; + // TODO: If this session just set their keys, it'll invalidate any batches in the mempool + // Should there be a transitory period/future-set cut off? + if let Some(keys) = ValidatorSets::::keys(set) { + Ok(keys.0) + } else { + // If this set hasn't set their keys yet, use the previous set's + if set.session.0 == 0 { + // Since there haven't been any keys set, no signature can legitimately exist + Err(InvalidTransaction::BadProof)?; + } + set.session.0 -= 1; + + if let Some(keys) = ValidatorSets::::keys(set) { + Ok(keys.0) + } else { + Err(InvalidTransaction::BadProof)? + } + } + } + #[pallet::call] impl Pallet { #[pallet::call_index(0)] @@ -78,16 +111,20 @@ pub mod pallet { pub fn execute_batch(origin: OriginFor, batch: SignedBatch) -> DispatchResult { ensure_none(origin)?; - let mut batch = batch.batch; + let batch = batch.batch; + + // TODO: Test validate_unsigned is actually called prior to execution, which is required for + // this to be safe + LastBatchBlock::::insert(batch.network, frame_system::Pallet::::block_number()); Batches::::insert(batch.network, batch.id); - LatestBlock::::insert(batch.network, batch.block); + LatestNetworkBlock::::insert(batch.network, batch.block); Self::deposit_event(Event::Batch { network: batch.network, id: batch.id, block: batch.block, }); - for (i, instruction) in batch.instructions.drain(..).enumerate() { + for (i, instruction) in batch.instructions.into_iter().enumerate() { // TODO: Check this balance's coin belongs to this network // If they don't, the validator set should be completely slashed, without question @@ -116,37 +153,30 @@ pub mod pallet { }; let network = batch.batch.network; + let key = key_for_network::(network)?; - // TODO: Get the latest session - let session = Session(0); - - let mut set = ValidatorSet { session, network }; - // TODO: If this session just set their keys, it'll invalidate anything in the mempool - // Should there be a transitory period/future-set cut off? - let key = if let Some(keys) = ValidatorSets::::keys(set) { - keys.0 - } else { - // If this set hasn't set their keys yet, use the previous set's - if set.session.0 == 0 { - Err(InvalidTransaction::BadProof)?; - } - set.session.0 -= 1; - - if let Some(keys) = ValidatorSets::::keys(set) { - keys.0 - } else { - Err(InvalidTransaction::BadProof)? - } - }; + // verify the batch size + // TODO: Merge this encode with the one done by batch_message + if batch.batch.encode().len() > MAX_BATCH_SIZE { + Err(InvalidTransaction::ExhaustsResources)?; + } + // verify the signature if !key.verify(&batch_message(&batch.batch), &batch.signature) { Err(InvalidTransaction::BadProof)?; } + // check that this validator set isn't publishing a batch more than once per block + let current_block = >::block_number(); + let last_block = LastBatchBlock::::get(network).unwrap_or(Zero::zero()); + if last_block >= current_block { + Err(InvalidTransaction::Future)?; + } + // Verify the batch is sequential // Batches has the last ID set. The next ID should be it + 1 // If there's no ID, the next ID should be 0 - let expected = Batches::::get(network).map(|prev| prev + 1).unwrap_or(0); + let expected = Batches::::get(network).map_or(0, |prev| prev + 1); if batch.batch.id < expected { Err(InvalidTransaction::Stale)?; } diff --git a/substrate/in-instructions/primitives/src/lib.rs b/substrate/in-instructions/primitives/src/lib.rs index 62dd54d6..f4a730e1 100644 --- a/substrate/in-instructions/primitives/src/lib.rs +++ b/substrate/in-instructions/primitives/src/lib.rs @@ -21,6 +21,8 @@ use serai_primitives::{BlockHash, Balance, NetworkId, SeraiAddress, ExternalAddr mod shorthand; pub use shorthand::*; +pub const MAX_BATCH_SIZE: usize = 25_000; // ~25kb + #[derive( Clone, PartialEq, Eq, Debug, Serialize, Deserialize, Encode, Decode, MaxEncodedLen, TypeInfo, )] diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs index cc07487f..25391524 100644 --- a/tests/processor/src/tests/batch.rs +++ b/tests/processor/src/tests/batch.rs @@ -22,6 +22,7 @@ pub(crate) async fn recv_batch_preprocesses( attempt: u32, ) -> (SignId, HashMap>) { let mut id = None; + let mut block = None; let mut preprocesses = HashMap::new(); for (i, coordinator) in coordinators.iter_mut().enumerate() { let i = Participant::new(u16::try_from(i).unwrap() + 1).unwrap(); @@ -29,14 +30,20 @@ pub(crate) async fn recv_batch_preprocesses( let msg = coordinator.recv_message().await; match msg { messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::BatchPreprocess { id: this_id, preprocess }, + messages::coordinator::ProcessorMessage::BatchPreprocess { + id: this_id, + block: this_block, + preprocess, + }, ) => { if id.is_none() { assert_eq!(&this_id.key, &key); assert_eq!(this_id.attempt, attempt); id = Some(this_id.clone()); + block = Some(this_block); } assert_eq!(&this_id, id.as_ref().unwrap()); + assert_eq!(&this_block, block.as_ref().unwrap()); preprocesses.insert(i, preprocess); } @@ -147,6 +154,7 @@ pub(crate) async fn substrate_block( block: sent_block, key: _, burns: _, + batches: _, } => { coordinator.send_message(block).await; match coordinator.recv_message().await { @@ -280,6 +288,7 @@ fn batch_test() { // TODO: Should we use the network key here? Or should we only use the Ristretto key? key: key_pair.1.to_vec(), burns: vec![], + batches: vec![batch.batch.id], }, ) .await diff --git a/tests/processor/src/tests/send.rs b/tests/processor/src/tests/send.rs index dcf6bd6b..d0aa8643 100644 --- a/tests/processor/src/tests/send.rs +++ b/tests/processor/src/tests/send.rs @@ -219,6 +219,7 @@ fn send_test() { instruction: OutInstruction { address: wallet.address(), data: None }, balance: balance_sent, }], + batches: vec![batch.batch.id], }, ) .await;