diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 7eb276ce..fa2db781 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -24,8 +24,9 @@ struct SeraiKeyDbEntry { } pub(crate) struct SeraiKey { - pub(crate) stage: LifetimeStage, pub(crate) key: K, + pub(crate) stage: LifetimeStage, + pub(crate) block_at_which_reporting_starts: u64, } pub(crate) struct OutputWithInInstruction { @@ -81,6 +82,9 @@ create_db!( // This collapses from `bool` to `()`, using if the value was set for true and false otherwise NotableBlock: (number: u64) -> (), + SerializedQueuedOutputs: (block_number: u64) -> Vec, + SerializedForwardedOutputsIndex: (block_number: u64) -> Vec, + SerializedForwardedOutput: (output_id: &[u8]) -> Vec, SerializedOutputs: (block_number: u64) -> Vec, } ); @@ -138,14 +142,13 @@ impl ScannerDb { if block_number < raw_keys[i].activation_block_number { continue; } - keys.push(SeraiKey { - key: raw_keys[i].key.0, - stage: LifetimeStage::calculate::( + let (stage, block_at_which_reporting_starts) = + LifetimeStage::calculate_stage_and_reporting_start_block::( block_number, raw_keys[i].activation_block_number, raw_keys.get(i + 1).map(|key| key.activation_block_number), - ), - }); + ); + keys.push(SeraiKey { key: raw_keys[i].key.0, stage, block_at_which_reporting_starts }); } assert!(keys.len() <= 2); Some(keys) @@ -226,6 +229,53 @@ impl ScannerDb { HighestAcknowledgedBlock::get(getter) } + pub(crate) fn take_queued_outputs( + txn: &mut impl DbTxn, + block_number: u64, + ) -> Vec> { + todo!("TODO") + } + + pub(crate) fn queue_return( + txn: &mut impl DbTxn, + block_queued_from: u64, + return_addr: AddressFor, + output: OutputFor, + ) { + todo!("TODO") + } + + pub(crate) fn queue_output_until_block( + txn: &mut impl DbTxn, + queue_for_block: u64, + output: &OutputWithInInstruction, + ) { + let mut outputs = + SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128)); + output.write(&mut outputs).unwrap(); + SerializedQueuedOutputs::set(txn, queue_for_block, &outputs); + } + + pub(crate) fn save_output_being_forwarded( + txn: &mut impl DbTxn, + block_forwarded_from: u64, + output: &OutputWithInInstruction, + ) { + let mut buf = Vec::with_capacity(128); + output.write(&mut buf).unwrap(); + + let id = output.output.id(); + + // Save this to an index so we can later fetch all outputs to forward + let mut forwarded_outputs = SerializedForwardedOutputsIndex::get(txn, block_forwarded_from) + .unwrap_or(Vec::with_capacity(32)); + forwarded_outputs.extend(id.as_ref()); + SerializedForwardedOutputsIndex::set(txn, block_forwarded_from, &forwarded_outputs); + + // Save the output itself + SerializedForwardedOutput::set(txn, id.as_ref(), &buf); + } + pub(crate) fn set_in_instructions( txn: &mut impl DbTxn, block_number: u64, diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 7bd8cc2e..0a26f177 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -195,6 +195,8 @@ impl Scanner { } /// Register the Eventualities caused by a block. + // TODO: Replace this with a callback returned by acknowledge_block which panics if it's not + // called yet dropped pub fn register_eventualities(&mut self, block_number: u64, eventualities: Vec<()>) { todo!("TODO") } diff --git a/processor/scanner/src/lifetime.rs b/processor/scanner/src/lifetime.rs index 62ee91c3..6d189bca 100644 --- a/processor/scanner/src/lifetime.rs +++ b/processor/scanner/src/lifetime.rs @@ -35,16 +35,16 @@ pub(crate) enum LifetimeStage { } impl LifetimeStage { - /// Get the stage of its lifetime this multisig is in based on when the next multisig's key - /// activates. + /// Get the stage of its lifetime this multisig is in, and the block at which we start reporting + /// outputs to it. /// /// Panics if the multisig being calculated for isn't actually active and a variety of other /// insane cases. - pub(crate) fn calculate( + pub(crate) fn calculate_stage_and_reporting_start_block( block_number: u64, activation_block_number: u64, next_keys_activation_block_number: Option, - ) -> Self { + ) -> (Self, u64) { assert!( activation_block_number >= block_number, "calculating lifetime stage for an inactive multisig" @@ -53,13 +53,15 @@ impl LifetimeStage { // activation block itself is the first block within this window let active_yet_not_reporting_end_block = activation_block_number + S::CONFIRMATIONS + S::TEN_MINUTES; + // The exclusive end block is the inclusive start block + let reporting_start_block = active_yet_not_reporting_end_block; if block_number < active_yet_not_reporting_end_block { - return LifetimeStage::ActiveYetNotReporting; + return (LifetimeStage::ActiveYetNotReporting, reporting_start_block); } let Some(next_keys_activation_block_number) = next_keys_activation_block_number else { // If there is no next multisig, this is the active multisig - return LifetimeStage::Active; + return (LifetimeStage::Active, reporting_start_block); }; assert!( @@ -72,14 +74,14 @@ impl LifetimeStage { let new_active_yet_not_reporting_end_block = next_keys_activation_block_number + S::CONFIRMATIONS + S::TEN_MINUTES; if block_number < new_active_yet_not_reporting_end_block { - return LifetimeStage::Active; + return (LifetimeStage::Active, reporting_start_block); } // Step 4 details a further CONFIRMATIONS let new_active_and_used_for_change_end_block = new_active_yet_not_reporting_end_block + S::CONFIRMATIONS; if block_number < new_active_and_used_for_change_end_block { - return LifetimeStage::UsingNewForChange; + return (LifetimeStage::UsingNewForChange, reporting_start_block); } // Step 5 details a further 6 hours @@ -87,10 +89,10 @@ impl LifetimeStage { let new_active_and_forwarded_to_end_block = new_active_and_used_for_change_end_block + (6 * 6 * S::TEN_MINUTES); if block_number < new_active_and_forwarded_to_end_block { - return LifetimeStage::Forwarding; + return (LifetimeStage::Forwarding, reporting_start_block); } // Step 6 - LifetimeStage::Finishing + (LifetimeStage::Finishing, reporting_start_block) } } diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index 13332586..e35eb749 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -103,7 +103,10 @@ impl ContinuallyRan for ScanForOutputsTask { let mut keys = ScannerDb::::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) .expect("scanning for a blockchain without any keys set"); - let mut in_instructions = vec![]; + let mut txn = self.db.txn(); + + let mut in_instructions = ScannerDb::::take_queued_outputs(&mut txn, b); + // Scan for each key for key in keys { for output in block.scan_for_outputs(key.key) { @@ -152,24 +155,6 @@ impl ContinuallyRan for ScanForOutputsTask { continue; } - // Drop External outputs if they're to a multisig which won't report them - // This means we should report any External output we save to disk here - #[allow(clippy::match_same_arms)] - match key.stage { - // TODO: Delay External outputs - LifetimeStage::ActiveYetNotReporting => todo!("TODO"), - // We should report External outputs in these cases - LifetimeStage::Active | LifetimeStage::UsingNewForChange => {} - // We should report External outputs only once forwarded, where they'll appear as - // OutputType::Forwarded - LifetimeStage::Forwarding => todo!("TODO"), - // We should drop these as we should not be handling new External outputs at this - // time - LifetimeStage::Finishing => { - continue; - } - } - // Check this isn't dust let balance_to_use = { let mut balance = output.balance(); @@ -190,27 +175,59 @@ impl ContinuallyRan for ScanForOutputsTask { balance }; - // Decode and save the InInstruction/return addr for this output - match in_instruction_from_output::(&output) { - (return_address, Some(instruction)) => { - let in_instruction = - InInstructionWithBalance { instruction, balance: balance_to_use }; - // TODO: Make a proper struct out of this - in_instructions.push(OutputWithInInstruction { - output, - return_address, - in_instruction, - }); - todo!("TODO: Save to be reported") + // Fetch the InInstruction/return addr for this output + let output_with_in_instruction = match in_instruction_from_output::(&output) { + (return_address, Some(instruction)) => OutputWithInInstruction { + output, + return_address, + in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use }, + }, + (Some(return_addr), None) => { + // Since there was no instruction here, return this since we parsed a return address + ScannerDb::::queue_return(&mut txn, b, return_addr, output); + continue; + } + // Since we didn't receive an instruction nor can we return this, move on + (None, None) => continue, + }; + + // Drop External outputs if they're to a multisig which won't report them + // This means we should report any External output we save to disk here + #[allow(clippy::match_same_arms)] + match key.stage { + // This multisig isn't yet reporting its External outputs to avoid a DoS + // Queue the output to be reported when this multisig starts reporting + LifetimeStage::ActiveYetNotReporting => { + ScannerDb::::queue_output_until_block( + &mut txn, + key.block_at_which_reporting_starts, + &output_with_in_instruction, + ); + continue; + } + // We should report External outputs in these cases + LifetimeStage::Active | LifetimeStage::UsingNewForChange => {} + // We should report External outputs only once forwarded, where they'll appear as + // OutputType::Forwarded. We save them now for when they appear + LifetimeStage::Forwarding => { + // When the forwarded output appears, we can see which Plan it's associated with and + // from there recover this output + ScannerDb::::save_output_being_forwarded(&mut txn, &output_with_in_instruction); + continue; + } + // We should drop these as we should not be handling new External outputs at this + // time + LifetimeStage::Finishing => { + continue; } - (Some(return_addr), None) => todo!("TODO: Queue return"), - // Since we didn't receive an instruction nor can we return this, accumulate it - (None, None) => {} } + // Ensures we didn't miss a `continue` above + assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange)); + + in_instructions.push(output_with_in_instruction); } } - let mut txn = self.db.txn(); // Save the in instructions ScannerDb::::set_in_instructions(&mut txn, b, in_instructions); // Update the next to scan block