diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index a37e05f4..e3e31c38 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -9,7 +9,10 @@ use serai_in_instructions_primitives::InInstructionWithBalance; use primitives::{ReceivedOutput, EncodableG}; -use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return}; +use crate::{ + lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return, + scan::next_to_scan_for_outputs_block, +}; // The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. trait Borshy: BorshSerialize + BorshDeserialize {} @@ -35,7 +38,7 @@ pub(crate) struct OutputWithInInstruction<S: ScannerFeed> { } impl<S: ScannerFeed> OutputWithInInstruction<S> { - fn write(&self, writer: &mut impl io::Write) -> io::Result<()> { + pub(crate) fn write(&self, writer: &mut impl io::Write) -> io::Result<()> { self.output.write(writer)?; // TODO self.return_address.write(writer)?; self.in_instruction.encode_to(writer); @@ -48,8 +51,6 @@ create_db!( ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>, RetireAt: <K: Encode>(key: K) -> u64, - // The next block to scan for received outputs - NextToScanForOutputsBlock: () -> u64, // The next block to potentially report NextToPotentiallyReportBlock: () -> u64, // Highest acknowledged block @@ -74,9 +75,6 @@ create_db!( */ // This collapses from `bool` to `()`, using if the value was set for true and false otherwise NotableBlock: (number: u64) -> (), - - SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>, - SerializedOutputs: (block_number: u64) -> Vec<u8>, } ); @@ -127,7 +125,7 @@ impl<S: ScannerFeed> ScannerDb<S> { let Some(key) = keys.first() else { return }; // Get the block we're scanning for next - let block_number = Self::next_to_scan_for_outputs_block(txn).expect( + let block_number = next_to_scan_for_outputs_block::<S>(txn).expect( "tidying keys despite never setting the next to scan for block (done on initialization)", ); // If this key is scheduled for retiry... @@ -150,7 +148,7 @@ impl<S: ScannerFeed> ScannerDb<S> { ) -> Option<Vec<SeraiKey<KeyFor<S>>>> { // We don't take this as an argument as we don't keep all historical keys in memory // If we've scanned block 1,000,000, we can't answer the active keys as of block 0 - let block_number = Self::next_to_scan_for_outputs_block(getter)?; + let block_number = next_to_scan_for_outputs_block::<S>(getter)?; let raw_keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> = ActiveKeys::get(getter)?; let mut keys = Vec::with_capacity(2); @@ -183,25 +181,9 @@ impl<S: ScannerFeed> ScannerDb<S> { } pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) { - assert!( - NextToScanForOutputsBlock::get(txn).is_none(), - "setting start block but prior set start block" - ); - - NextToScanForOutputsBlock::set(txn, &start_block); NextToPotentiallyReportBlock::set(txn, &start_block); } - pub(crate) fn set_next_to_scan_for_outputs_block( - txn: &mut impl DbTxn, - next_to_scan_for_outputs_block: u64, - ) { - NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block); - } - pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option<u64> { - NextToScanForOutputsBlock::get(getter) - } - pub(crate) fn set_next_to_potentially_report_block( txn: &mut impl DbTxn, next_to_potentially_report_block: u64, @@ -222,24 +204,6 @@ impl<S: ScannerFeed> ScannerDb<S> { HighestAcknowledgedBlock::get(getter) } - pub(crate) fn take_queued_outputs( - txn: &mut impl DbTxn, - block_number: u64, - ) -> Vec<OutputWithInInstruction<S>> { - todo!("TODO") - } - - pub(crate) fn queue_output_until_block( - txn: &mut impl DbTxn, - queue_for_block: u64, - output: &OutputWithInInstruction<S>, - ) { - let mut outputs = - SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128)); - output.write(&mut outputs).unwrap(); - SerializedQueuedOutputs::set(txn, queue_for_block, &outputs); - } - /* This is so verbosely named as the DB itself already flags upon external outputs. Specifically, if any block yields External outputs to accumulate, we flag it as notable. diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index f682bf36..a29e5301 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -9,6 +9,7 @@ use crate::{ lifetime::LifetimeStage, db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb}, BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs, + scan::{next_to_scan_for_outputs_block, queue_output_until_block}, }; mod db; @@ -104,7 +105,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas */ let exclusive_upper_bound = { // Fetch the next to scan block - let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db) + let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db) .expect("EventualityTask run before writing the start block"); // If we haven't done any work, return if next_to_scan == 0 { @@ -229,7 +230,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas &txn, &forwarded, ) .expect("forwarded an output yet didn't save its InInstruction to the DB"); - ScannerDb::<S>::queue_output_until_block( + queue_output_until_block::<S>( &mut txn, b + S::WINDOW_LENGTH, &OutputWithInInstruction { output: output.clone(), return_address, in_instruction }, diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 39a72106..f69459f0 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -7,7 +7,9 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; // TODO: Localize to Report? use crate::{ db::{ScannerDb, ScanToReportDb}, - index, ScannerFeed, ContinuallyRan, + index, + scan::next_to_scan_for_outputs_block, + ScannerFeed, ContinuallyRan, }; /* @@ -27,7 +29,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> { async fn run_iteration(&mut self) -> Result<bool, String> { let highest_reportable = { // Fetch the next to scan block - let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db) + let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db) .expect("ReportTask run before writing the start block"); // If we haven't done any work, return if next_to_scan == 0 { diff --git a/processor/scanner/src/scan/db.rs b/processor/scanner/src/scan/db.rs new file mode 100644 index 00000000..905e10be --- /dev/null +++ b/processor/scanner/src/scan/db.rs @@ -0,0 +1,59 @@ +use core::marker::PhantomData; +use std::io; + +use scale::Encode; +use borsh::{BorshSerialize, BorshDeserialize}; +use serai_db::{Get, DbTxn, create_db}; + +use serai_in_instructions_primitives::InInstructionWithBalance; + +use primitives::{EncodableG, ReceivedOutput, EventualityTracker}; + +use crate::{ + lifetime::LifetimeStage, db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor, + EventualityFor, Return, scan::next_to_scan_for_outputs_block, +}; + +// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. +trait Borshy: BorshSerialize + BorshDeserialize {} +impl<T: BorshSerialize + BorshDeserialize> Borshy for T {} + +create_db!( + ScannerScan { + // The next block to scan for received outputs + NextToScanForOutputsBlock: () -> u64, + + SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>, + } +); + +pub(crate) struct ScanDb<S: ScannerFeed>(PhantomData<S>); +impl<S: ScannerFeed> ScanDb<S> { + pub(crate) fn set_next_to_scan_for_outputs_block( + txn: &mut impl DbTxn, + next_to_scan_for_outputs_block: u64, + ) { + NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block); + } + pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option<u64> { + NextToScanForOutputsBlock::get(getter) + } + + pub(crate) fn take_queued_outputs( + txn: &mut impl DbTxn, + block_number: u64, + ) -> Vec<OutputWithInInstruction<S>> { + todo!("TODO") + } + + pub(crate) fn queue_output_until_block( + txn: &mut impl DbTxn, + queue_for_block: u64, + output: &OutputWithInInstruction<S>, + ) { + let mut outputs = + SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128)); + output.write(&mut outputs).unwrap(); + SerializedQueuedOutputs::set(txn, queue_for_block, &outputs); + } +} diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 201f64a1..1f143809 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -1,5 +1,5 @@ use scale::Decode; -use serai_db::{DbTxn, Db}; +use serai_db::{Get, DbTxn, Db}; use serai_primitives::MAX_DATA_LEN; use serai_in_instructions_primitives::{ @@ -16,6 +16,27 @@ use crate::{ eventuality::latest_scannable_block, }; +mod db; +use db::ScanDb; + +pub(crate) fn next_to_scan_for_outputs_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> { + ScanDb::<S>::next_to_scan_for_outputs_block(getter) +} + +pub(crate) fn queue_output_until_block<S: ScannerFeed>( + txn: &mut impl DbTxn, + queue_for_block: u64, + output: &OutputWithInInstruction<S>, +) { + assert!( + queue_for_block >= + next_to_scan_for_outputs_block::<S>(txn) + .expect("queueing an output despite no next-to-scan-for-outputs block"), + "queueing an output for a block already scanned" + ); + ScanDb::<S>::queue_output_until_block(txn, queue_for_block, output) +} + // Construct an InInstruction from an external output. // // Also returns the address to return the coins to upon error. @@ -66,6 +87,19 @@ struct ScanForOutputsTask<D: Db, S: ScannerFeed> { feed: S, } +impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> { + pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { + if ScanDb::<S>::next_to_scan_for_outputs_block(&db).is_none() { + // Initialize the DB + let mut txn = db.txn(); + ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, start_block); + txn.commit(); + } + + Self { db, feed } + } +} + #[async_trait::async_trait] impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { async fn run_iteration(&mut self) -> Result<bool, String> { @@ -73,7 +107,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { let latest_scannable = latest_scannable_block::<S>(&self.db) .expect("ScanForOutputsTask run before writing the start block"); // Fetch the next block to scan - let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db) + let next_to_scan = ScanDb::<S>::next_to_scan_for_outputs_block(&self.db) .expect("ScanForOutputsTask run before writing the start block"); for b in next_to_scan ..= latest_scannable { @@ -83,7 +117,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { let mut txn = self.db.txn(); - assert_eq!(ScannerDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b); + assert_eq!(ScanDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b); // Tidy the keys, then fetch them // We don't have to tidy them here, we just have to somewhere, so why not here? @@ -100,7 +134,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { let mut in_instructions = vec![]; let queued_outputs = { - let mut queued_outputs = ScannerDb::<S>::take_queued_outputs(&mut txn, b); + let mut queued_outputs = ScanDb::<S>::take_queued_outputs(&mut txn, b); // Sort the queued outputs in case they weren't queued in a deterministic fashion queued_outputs.sort_by(|a, b| sort_outputs(&a.output, &b.output)); queued_outputs @@ -217,7 +251,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { // This multisig isn't yet reporting its External outputs to avoid a DoS // Queue the output to be reported when this multisig starts reporting LifetimeStage::ActiveYetNotReporting => { - ScannerDb::<S>::queue_output_until_block( + ScanDb::<S>::queue_output_until_block( &mut txn, key.block_at_which_reporting_starts, &output_with_in_instruction, @@ -253,7 +287,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { // Send the InInstructions to the report task ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions); // Update the next to scan block - ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1); + ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1); txn.commit(); }