diff --git a/processor/src/main.rs b/processor/src/main.rs
index f562925b..68082946 100644
--- a/processor/src/main.rs
+++ b/processor/src/main.rs
@@ -408,6 +408,7 @@ async fn run(raw_db: D, coin: C, mut coordinato
         .get_mut(&key_vec)
         .expect("key we don't have a scheduler for acknowledged a block")
         .add_outputs(scanner.ack_block(key, block_id).await);
+
       sign_plans(
         &mut main_db,
         &coin,
@@ -459,18 +460,15 @@ async fn run(raw_db: D, coin: C, mut coordinato
 
     msg = scanner.events.recv() => {
       match msg.unwrap() {
-        ScannerEvent::Block(key, block, time, outputs) => {
+        ScannerEvent::Block { key, block, time, batch, outputs } => {
           let key = key.to_bytes().as_ref().to_vec();
 
           let mut block_hash = [0; 32];
           block_hash.copy_from_slice(block.as_ref());
 
-          // TODO
-          let id = 0;
-
           let batch = Batch {
             network: C::NETWORK,
-            id,
+            id: batch,
             block: BlockHash(block_hash),
             instructions: outputs.iter().filter_map(|output| {
               // If these aren't externally received funds, don't handle it as an instruction
diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs
index 4082b8d9..9fdf8492 100644
--- a/processor/src/scanner.rs
+++ b/processor/src/scanner.rs
@@ -22,7 +22,13 @@ use crate::{
 #[derive(Clone, Debug)]
 pub enum ScannerEvent<C: Coin> {
   // Block scanned
-  Block(<C::Curve as Ciphersuite>::G, <C::Block as Block<C>>::Id, SystemTime, Vec<C::Output>),
+  Block {
+    key: <C::Curve as Ciphersuite>::G,
+    block: <C::Block as Block<C>>::Id,
+    time: SystemTime,
+    batch: u32,
+    outputs: Vec<C::Output>,
+  },
   // Eventuality completion found on-chain
   Completed([u8; 32], <C::Transaction as Transaction<C>>::Id),
 }
@@ -111,18 +117,17 @@ impl ScannerDb {
     self.0.get(Self::seen_key(id)).is_some()
   }
 
+  fn next_batch_key() -> Vec<u8> {
+    Self::scanner_key(b"next_batch", [])
+  }
+  fn batch_key(key: &<C::Curve as Ciphersuite>::G, block: &<C::Block as Block<C>>::Id) -> Vec<u8> {
+    Self::scanner_key(b"batch", [key.to_bytes().as_ref(), block.as_ref()].concat())
+  }
   fn outputs_key(
     key: &<C::Curve as Ciphersuite>::G,
     block: &<C::Block as Block<C>>::Id,
   ) -> Vec<u8> {
-    let key_bytes = key.to_bytes();
-    let key = key_bytes.as_ref();
-    // This should be safe without the bincode serialize. Using bincode lets us not worry/have to
-    // think about this
-    let db_key = bincode::serialize(&(key, block.as_ref())).unwrap();
-    // Assert this is actually length prefixing
-    debug_assert!(db_key.len() >= (1 + key.len() + 1 + block.as_ref().len()));
-    Self::scanner_key(b"outputs", db_key)
+    Self::scanner_key(b"outputs", [key.to_bytes().as_ref(), block.as_ref()].concat())
   }
   fn save_outputs(
     &mut self,
@@ -130,12 +135,34 @@ impl ScannerDb {
     key: &<C::Curve as Ciphersuite>::G,
     block: &<C::Block as Block<C>>::Id,
     outputs: &[C::Output],
-  ) {
+  ) -> u32 {
+    let batch_key = Self::batch_key(key, block);
+    if let Some(batch) = txn.get(batch_key) {
+      return u32::from_le_bytes(batch.try_into().unwrap());
+    }
+
     let mut bytes = Vec::with_capacity(outputs.len() * 64);
     for output in outputs {
       output.write(&mut bytes).unwrap();
     }
     txn.put(Self::outputs_key(key, block), bytes);
+
+    // This is a new set of outputs, which are expected to be handled in a perfectly ordered
+    // fashion
+
+    // TODO2: This is not currently how this works
+    // There may be new blocks 0 .. 5, which A will scan, yet then B may be activated at block 4
+    // This would cause
+    // 0a, 1a, 2a, 3a, 4a, 5a, 4b, 5b
+    // when it should be
+    // 0a, 1a, 2a, 3a, 4a, 4b, 5a, 5b
+
+    // Because it's a new set of outputs, allocate a batch ID for it
+    let next_bytes = txn.get(Self::next_batch_key()).unwrap_or(vec![0; 4]).try_into().unwrap();
+    let next = u32::from_le_bytes(next_bytes);
+    txn.put(Self::next_batch_key(), (next + 1).to_le_bytes());
+    txn.put(Self::batch_key(key, block), next_bytes);
+    next
   }
   fn outputs(
     &self,
@@ -434,7 +461,7 @@ impl Scanner {
 
         // Save the outputs to disk
         let mut txn = scanner.db.0.txn();
-        scanner.db.save_outputs(&mut txn, &key, &block_id, &outputs);
+        let batch = scanner.db.save_outputs(&mut txn, &key, &block_id, &outputs);
         txn.commit();
 
         const TIME_TOLERANCE: u64 = 15;
@@ -477,7 +504,7 @@ impl Scanner {
         }
 
         // Send all outputs
-        if !scanner.emit(ScannerEvent::Block(key, block_id, time, outputs)) {
+        if !scanner.emit(ScannerEvent::Block { key, block: block_id, time, batch, outputs }) {
           return;
         }
         // Write this number as scanned so we won't re-fire these outputs
diff --git a/processor/src/tests/addresses.rs b/processor/src/tests/addresses.rs
index 29a15f58..f31ee942 100644
--- a/processor/src/tests/addresses.rs
+++ b/processor/src/tests/addresses.rs
@@ -18,6 +18,7 @@ async fn spend(
   coin: &C,
   keys: &HashMap<Participant, ThresholdKeys<C::Curve>>,
   scanner: &mut ScannerHandle<C, D>,
+  batch: u32,
   outputs: Vec<C::Output>,
 ) -> Vec<C::Output> {
   let key = keys[&Participant::new(1).unwrap()].group_key();
@@ -49,8 +50,9 @@ async fn spend(
     coin.mine_block().await;
   }
   match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-    ScannerEvent::Block(this_key, _, _, outputs) => {
+    ScannerEvent::Block { key: this_key, block: _, time: _, batch: this_batch, outputs } => {
      assert_eq!(this_key, key);
+     assert_eq!(this_batch, batch);
      assert_eq!(outputs.len(), 1);
      // Make sure this is actually a change output
      assert_eq!(outputs[0].kind(), OutputType::Change);
@@ -85,9 +87,10 @@ pub async fn test_addresses(coin: C) {
   // Verify the Scanner picked them up
   let outputs =
     match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-      ScannerEvent::Block(this_key, block, _, outputs) => {
+      ScannerEvent::Block { key: this_key, block, time: _, batch, outputs } => {
        assert_eq!(this_key, key);
        assert_eq!(block, block_id);
+       assert_eq!(batch, 0);
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].kind(), OutputType::Branch);
        outputs
@@ -98,7 +101,7 @@ pub async fn test_addresses(coin: C) {
     };
 
   // Spend the branch output, creating a change output and ensuring we actually get change
-  let outputs = spend(&coin, &keys, &mut scanner, outputs).await;
+  let outputs = spend(&coin, &keys, &mut scanner, 1, outputs).await;
   // Also test spending the change output
-  spend(&coin, &keys, &mut scanner, outputs).await;
+  spend(&coin, &keys, &mut scanner, 2, outputs).await;
 }
diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs
index e4cfd706..696b0348 100644
--- a/processor/src/tests/scanner.rs
+++ b/processor/src/tests/scanner.rs
@@ -48,10 +48,11 @@ pub async fn test_scanner(coin: C) {
   let verify_event = |mut scanner: ScannerHandle<C, MemDb>| async {
     let outputs =
       match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-        ScannerEvent::Block(key, block, time, outputs) => {
+        ScannerEvent::Block { key, block, time, batch, outputs } => {
          assert_eq!(key, keys.group_key());
          assert_eq!(block, block_id);
          assert_eq!(time, block_time);
+         assert_eq!(batch, 0);
          assert_eq!(outputs.len(), 1);
          assert_eq!(outputs[0].kind(), OutputType::External);
          outputs
diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs
index dd0a0aae..acb4f711 100644
--- a/processor/src/tests/wallet.rs
+++ b/processor/src/tests/wallet.rs
@@ -32,10 +32,11 @@ pub async fn test_wallet(coin: C) {
   let block_time = block.time();
 
   match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-    ScannerEvent::Block(this_key, block, time, outputs) => {
+    ScannerEvent::Block { key: this_key, block, time, batch, outputs } => {
      assert_eq!(this_key, key);
      assert_eq!(block, block_id);
      assert_eq!(time, block_time);
+     assert_eq!(batch, 0);
      assert_eq!(outputs.len(), 1);
      (block_id, outputs)
    }
@@ -102,10 +103,11 @@ pub async fn test_wallet(coin: C) {
   }
 
   match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-    ScannerEvent::Block(this_key, block_id, time, these_outputs) => {
+    ScannerEvent::Block { key: this_key, block: block_id, time, batch, outputs: these_outputs } => {
      assert_eq!(this_key, key);
      assert_eq!(block_id, block.id());
      assert_eq!(time, block.time());
+     assert_eq!(batch, 1);
      assert_eq!(these_outputs, outputs);
    }
    ScannerEvent::Completed(_, _) => {
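For reference, the allocation scheme the new save_outputs implements is: reuse the batch ID already stored for this (key, block) pair if one exists, otherwise take the current value of a little-endian "next_batch" counter, bump the counter, and pin the block to the old value. Below is a minimal, self-contained sketch of that logic, not the processor's actual API: the HashMap, the allocate_batch name, and the byte-slice arguments are illustrative stand-ins for the DB transaction and the curve/coin generics used in scanner.rs.

// Illustrative sketch only: a HashMap stands in for the DB transaction, and plain byte slices
// stand in for the processor's key/block generics.
use std::collections::HashMap;

const NEXT_BATCH_KEY: &[u8] = b"next_batch";

fn batch_key(key: &[u8], block: &[u8]) -> Vec<u8> {
  [b"batch".as_ref(), key, block].concat()
}

// Return the batch ID for (key, block), allocating the next counter value on first use.
// Re-saving the same block returns the previously assigned ID, so batch IDs stay stable.
fn allocate_batch(db: &mut HashMap<Vec<u8>, Vec<u8>>, key: &[u8], block: &[u8]) -> u32 {
  let this_batch = batch_key(key, block);
  if let Some(batch) = db.get(&this_batch) {
    return u32::from_le_bytes(batch.clone().try_into().unwrap());
  }

  // Read the monotonic counter (0 if never written), bump it, and pin this block to the old value
  let next_bytes: [u8; 4] =
    db.get(NEXT_BATCH_KEY).cloned().unwrap_or_else(|| vec![0; 4]).try_into().unwrap();
  let next = u32::from_le_bytes(next_bytes);
  db.insert(NEXT_BATCH_KEY.to_vec(), (next + 1).to_le_bytes().to_vec());
  db.insert(this_batch, next_bytes.to_vec());
  next
}

fn main() {
  let mut db = HashMap::new();
  assert_eq!(allocate_batch(&mut db, b"key a", b"block 0"), 0);
  assert_eq!(allocate_batch(&mut db, b"key a", b"block 1"), 1);
  // Scanning block 0 again reuses its batch ID instead of allocating a new one
  assert_eq!(allocate_batch(&mut db, b"key a", b"block 0"), 0);
}

Because a single counter backs every allocation, IDs increase monotonically across all keys in one processor DB, matching the sequential id: batch that the main.rs hunk feeds into Batch.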