From 65f3f485174a156119a17536bdb32a5938affa55 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 28 Aug 2024 19:58:28 -0400 Subject: [PATCH] Add ReportDb --- processor/scanner/src/db.rs | 24 +++--------------- processor/scanner/src/eventuality/mod.rs | 18 +++++++------- processor/scanner/src/report/db.rs | 27 +++++++++++++++++++++ processor/scanner/src/report/mod.rs | 31 ++++++++++++++++++------ processor/scanner/src/scan/mod.rs | 4 ++- 5 files changed, 65 insertions(+), 39 deletions(-) create mode 100644 processor/scanner/src/report/db.rs diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index e3e31c38..7a2d68a9 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -47,7 +47,7 @@ impl OutputWithInInstruction { } create_db!( - Scanner { + ScannerGlobal { ActiveKeys: () -> Vec>, RetireAt: (key: K) -> u64, @@ -78,8 +78,8 @@ create_db!( } ); -pub(crate) struct ScannerDb(PhantomData); -impl ScannerDb { +pub(crate) struct ScannerGlobalDb(PhantomData); +impl ScannerGlobalDb { /// Queue a key. /// /// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks @@ -180,20 +180,6 @@ impl ScannerDb { Some(keys) } - pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) { - NextToPotentiallyReportBlock::set(txn, &start_block); - } - - pub(crate) fn set_next_to_potentially_report_block( - txn: &mut impl DbTxn, - next_to_potentially_report_block: u64, - ) { - NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block); - } - pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option { - NextToPotentiallyReportBlock::get(getter) - } - pub(crate) fn set_highest_acknowledged_block( txn: &mut impl DbTxn, highest_acknowledged_block: u64, @@ -224,10 +210,6 @@ impl ScannerDb { NotableBlock::get(getter, number).is_some() } - pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 { - todo!("TODO") - } - pub(crate) fn return_address_and_in_instruction_for_forwarded_output( getter: &impl Get, output: & as ReceivedOutput, AddressFor>>::Id, diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index a29e5301..e10aab54 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -4,10 +4,9 @@ use serai_db::{Get, DbTxn, Db}; use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block}; -// TODO: Localize to EventualityDb? use crate::{ lifetime::LifetimeStage, - db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb}, + db::{OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, ScanToEventualityDb}, BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs, scan::{next_to_scan_for_outputs_block, queue_output_until_block}, }; @@ -69,7 +68,7 @@ pub(crate) fn latest_scannable_block(getter: &impl Get) -> Optio This forms a backlog only if the latency of scanning, acknowledgement, and intake (including checking Eventualities) exceeds the window duration (the desired property). */ -struct EventualityTask> { +pub(crate) struct EventualityTask> { db: D, feed: S, scheduler: Sch, @@ -115,7 +114,7 @@ impl> ContinuallyRan for EventualityTas }; // Fetch the highest acknowledged block - let highest_acknowledged = ScannerDb::::highest_acknowledged_block(&self.db) + let highest_acknowledged = ScannerGlobalDb::::highest_acknowledged_block(&self.db) .expect("EventualityTask run before writing the start block"); // Fetch the next block to check @@ -132,7 +131,7 @@ impl> ContinuallyRan for EventualityTas // This is possible since even if we receive coins in block 0, any transactions we'd make // would resolve in block 1 (the first block we'll check under this non-zero rule) let prior_block = b - 1; - if ScannerDb::::is_block_notable(&self.db, prior_block) && + if ScannerGlobalDb::::is_block_notable(&self.db, prior_block) && (prior_block > highest_acknowledged) { break; @@ -156,8 +155,9 @@ impl> ContinuallyRan for EventualityTas one which decides when to retire a key, and when it marks a key to be retired, it is done with it. Accordingly, it's not an issue if such a key was dropped. */ - let mut keys = ScannerDb::::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) - .expect("scanning for a blockchain without any keys set"); + let mut keys = + ScannerGlobalDb::::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) + .expect("scanning for a blockchain without any keys set"); // Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate keys.retain(|key| b <= key.activation_block_number); @@ -226,7 +226,7 @@ impl> ContinuallyRan for EventualityTas }; let (return_address, in_instruction) = - ScannerDb::::return_address_and_in_instruction_for_forwarded_output( + ScannerGlobalDb::::return_address_and_in_instruction_for_forwarded_output( &txn, &forwarded, ) .expect("forwarded an output yet didn't save its InInstruction to the DB"); @@ -281,7 +281,7 @@ impl> ContinuallyRan for EventualityTas // Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never // has a malleable view of the keys. - ScannerDb::::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key); + ScannerGlobalDb::::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key); } } } diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/report/db.rs new file mode 100644 index 00000000..cca2148e --- /dev/null +++ b/processor/scanner/src/report/db.rs @@ -0,0 +1,27 @@ +use core::marker::PhantomData; + +use serai_db::{Get, DbTxn, Db, create_db}; + +create_db!( + ScannerReport { + // The next block to potentially report + NextToPotentiallyReportBlock: () -> u64, + } +); + +pub(crate) struct ReportDb; +impl ReportDb { + pub(crate) fn set_next_to_potentially_report_block( + txn: &mut impl DbTxn, + next_to_potentially_report_block: u64, + ) { + NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block); + } + pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option { + NextToPotentiallyReportBlock::get(getter) + } + + pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 { + todo!("TODO") + } +} diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index f69459f0..95bbbbd2 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -4,14 +4,16 @@ use serai_db::{DbTxn, Db}; use serai_primitives::BlockHash; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; -// TODO: Localize to Report? use crate::{ - db::{ScannerDb, ScanToReportDb}, + db::{ScannerGlobalDb, ScanToReportDb}, index, scan::next_to_scan_for_outputs_block, ScannerFeed, ContinuallyRan, }; +mod db; +use db::ReportDb; + /* This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. @@ -19,11 +21,24 @@ use crate::{ Eventualities, have processed the block. This ensures we know if this block is notable, and have the InInstructions for it. */ -struct ReportTask { +pub(crate) struct ReportTask { db: D, feed: S, } +impl ReportTask { + pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { + if ReportDb::next_to_potentially_report_block(&db).is_none() { + // Initialize the DB + let mut txn = db.txn(); + ReportDb::set_next_to_potentially_report_block(&mut txn, start_block); + txn.commit(); + } + + Self { db, feed } + } +} + #[async_trait::async_trait] impl ContinuallyRan for ReportTask { async fn run_iteration(&mut self) -> Result { @@ -44,7 +59,7 @@ impl ContinuallyRan for ReportTask { last_scanned }; - let next_to_potentially_report = ScannerDb::::next_to_potentially_report_block(&self.db) + let next_to_potentially_report = ReportDb::next_to_potentially_report_block(&self.db) .expect("ReportTask run before writing the start block"); for b in next_to_potentially_report ..= highest_reportable { @@ -53,7 +68,7 @@ impl ContinuallyRan for ReportTask { // Receive the InInstructions for this block // We always do this as we can't trivially tell if we should recv InInstructions before we do let in_instructions = ScanToReportDb::::recv_in_instructions(&mut txn, b); - let notable = ScannerDb::::is_block_notable(&txn, b); + let notable = ScannerGlobalDb::::is_block_notable(&txn, b); if !notable { assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); } @@ -61,7 +76,7 @@ impl ContinuallyRan for ReportTask { if notable { let network = S::NETWORK; let block_hash = index::block_id(&txn, b); - let mut batch_id = ScannerDb::::acquire_batch_id(&mut txn); + let mut batch_id = ReportDb::acquire_batch_id(&mut txn); // start with empty batch let mut batches = @@ -77,7 +92,7 @@ impl ContinuallyRan for ReportTask { let instruction = batch.instructions.pop().unwrap(); // bump the id for the new batch - batch_id = ScannerDb::::acquire_batch_id(&mut txn); + batch_id = ReportDb::acquire_batch_id(&mut txn); // make a new batch with this instruction included batches.push(Batch { @@ -93,7 +108,7 @@ impl ContinuallyRan for ReportTask { } // Update the next to potentially report block - ScannerDb::::set_next_to_potentially_report_block(&mut txn, b + 1); + ReportDb::set_next_to_potentially_report_block(&mut txn, b + 1); txn.commit(); } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 1f143809..54f9bd77 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -8,7 +8,6 @@ use serai_in_instructions_primitives::{ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block}; -// TODO: Localize to ScanDb? use crate::{ lifetime::LifetimeStage, db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb}, @@ -28,6 +27,9 @@ pub(crate) fn queue_output_until_block( queue_for_block: u64, output: &OutputWithInInstruction, ) { + // This isn't a perfect assertion as by the time this txn commits, we may have already started + // scanning this block. That doesn't change it should never trip as we queue outside the window + // we'll scan assert!( queue_for_block >= next_to_scan_for_outputs_block::(txn)