diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 073d5d42..c7cbd253 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -13,9 +13,9 @@ impl Borshy for T {} #[derive(BorshSerialize, BorshDeserialize)] pub(crate) struct SeraiKey { - activation_block_number: u64, - retirement_block_number: Option, - key: K, + pub(crate) activation_block_number: u64, + pub(crate) retirement_block_number: Option, + pub(crate) key: K, } create_db!( @@ -208,7 +208,7 @@ impl ScannerDb { SerializedOutputs::set(txn, block_number, &buf); } - pub(crate) fn is_notable_block(getter: &impl Get, number: u64) -> bool { + pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool { NotableBlock::get(getter, number).is_some() } } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 04dcf824..a6f3e899 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -6,12 +6,21 @@ use primitives::{ReceivedOutput, Block}; mod db; mod index; +mod scan; +mod eventuality; +mod report; +mod safe; /// A feed usable to scan a blockchain. /// /// This defines the primitive types used, along with various getters necessary for indexing. #[async_trait::async_trait] pub trait ScannerFeed: Send + Sync { + /// The amount of confirmations required for a block to be finalized. + /// + /// This value must be at least `1`. + const CONFIRMATIONS: u64; + /// The type of the key used to receive coins on this blockchain. type Key: group::Group + group::GroupEncoding; @@ -35,11 +44,19 @@ pub trait ScannerFeed: Send + Sync { /// resolve without manual intervention. type EphemeralError: Debug; + /// Fetch the number of the latest block. + /// + /// The block number is its zero-indexed position within a linear view of the external network's + /// consensus. The genesis block accordingly has block number 0. + async fn latest_block_number(&self) -> Result; + /// Fetch the number of the latest finalized block. /// /// The block number is its zero-indexed position within a linear view of the external network's /// consensus. The genesis block accordingly has block number 0. - async fn latest_finalized_block_number(&self) -> Result; + async fn latest_finalized_block_number(&self) -> Result { + Ok(self.latest_block_number().await? - Self::CONFIRMATIONS) + } /// Fetch a block by its number. async fn block_by_number(&self, number: u64) -> Result; @@ -49,7 +66,7 @@ pub trait ScannerFeed: Send + Sync { &self, block: &Self::Block, key: Self::Key, - ) -> Result; + ) -> Result, Self::EphemeralError>; } /// A handle to immediately run an iteration of a task. diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 4d378b9c..5c57a3f5 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -8,7 +8,7 @@ use serai_db::{Db, DbTxn}; use primitives::{Id, Block}; // TODO: Localize to ReportDb? -use crate::{db::ScannerDb, ScannerFeed}; +use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; struct ReportTask { db: D, @@ -20,8 +20,10 @@ impl ContinuallyRan for ReportTask { async fn run_iteration(&mut self) -> Result { let highest_reportable = { // Fetch the latest scanned and latest checked block - let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db).expect("ReportTask run before writing the start block"); - let next_to_check = ScannerDb::::next_to_check_for_eventualities_block(&self.db).expect("ReportTask run before writing the start block"); + let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db) + .expect("ReportTask run before writing the start block"); + let next_to_check = ScannerDb::::next_to_check_for_eventualities_block(&self.db) + .expect("ReportTask run before writing the start block"); // If we haven't done any work, return if (next_to_scan == 0) || (next_to_check == 0) { return Ok(false); @@ -31,10 +33,11 @@ impl ContinuallyRan for ReportTask { last_scanned.min(last_checked) }; - let next_to_potentially_report = ScannerDb::::next_block_to_potentially_report(&self.db).expect("ReportTask run before writing the start block"); + let next_to_potentially_report = ScannerDb::::next_to_potentially_report_block(&self.db) + .expect("ReportTask run before writing the start block"); for b in next_to_potentially_report ..= highest_reportable { - if ScannerDb::::is_block_notable(b) { + if ScannerDb::::is_block_notable(&self.db, b) { todo!("TODO: Make Batches, which requires handling Forwarded within this crate"); } diff --git a/processor/scanner/src/safe.rs b/processor/scanner/src/safe.rs index a5de448d..a0b4f547 100644 --- a/processor/scanner/src/safe.rs +++ b/processor/scanner/src/safe.rs @@ -5,7 +5,7 @@ use serai_db::{Db, DbTxn}; use primitives::{Id, Block}; // TODO: Localize to SafeDb? -use crate::{db::ScannerDb, ScannerFeed}; +use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; /* We mark blocks safe to scan when they're no more than `(CONFIRMATIONS - 1)` blocks after the @@ -27,7 +27,8 @@ struct SafeToScanTask { impl ContinuallyRan for SafeToScanTask { async fn run_iteration(&mut self) -> Result { // First, we fetch the highest acknowledged block - let Some(highest_acknowledged_block) = ScannerDb::::highest_acknowledged_block(&self.db) else { + let Some(highest_acknowledged_block) = ScannerDb::::highest_acknowledged_block(&self.db) + else { // If no blocks have been acknowledged, we don't mark any safe // Once the start block (implicitly safe) has been acknowledged, we proceed from there return Ok(false); @@ -38,13 +39,15 @@ impl ContinuallyRan for SafeToScanTask { // If we've decided to report (or not report) a block, we know if it needs acknowledgement // (and accordingly is pending acknowledgement) // Accordingly, the block immediately before this is the latest block with a known status - ScannerDb::::next_block_to_potentially_report(&self.db).expect("SafeToScanTask run before writing the start block") - 1 + ScannerDb::::next_to_potentially_report_block(&self.db) + .expect("SafeToScanTask run before writing the start block") - + 1 }; let mut oldest_pending_acknowledgement = None; for b in (highest_acknowledged_block + 1) ..= latest_block_known_if_pending_acknowledgement { // If the block isn't notable, immediately flag it as acknowledged - if !ScannerDb::::is_block_notable(b) { + if !ScannerDb::::is_block_notable(&self.db, b) { let mut txn = self.db.txn(); ScannerDb::::set_highest_acknowledged_block(&mut txn, b); txn.commit(); @@ -61,13 +64,19 @@ impl ContinuallyRan for SafeToScanTask { // acknowledgement, and the oldest block still pending acknowledgement is in the future, // we know the safe block to scan to is // `>= latest_block_known_if_pending_acknowledgement + (CONFIRMATIONS - 1)` - let oldest_pending_acknowledgement = oldest_pending_acknowledgement.unwrap_or(latest_block_known_if_pending_acknowledgement); + let oldest_pending_acknowledgement = + oldest_pending_acknowledgement.unwrap_or(latest_block_known_if_pending_acknowledgement); + + let old_safe_block = ScannerDb::::latest_scannable_block(&self.db) + .expect("SafeToScanTask run before writing the start block"); + let new_safe_block = oldest_pending_acknowledgement + + (S::CONFIRMATIONS.checked_sub(1).expect("CONFIRMATIONS wasn't at least 1")); // Update the latest scannable block let mut txn = self.db.txn(); - ScannerDb::::set_latest_scannable_block(oldest_pending_acknowledgement + (CONFIRMATIONS - 1)); + ScannerDb::::set_latest_scannable_block(&mut txn, new_safe_block); txn.commit(); - Ok(next_to_potentially_report <= highest_reportable) + Ok(old_safe_block != new_safe_block) } } diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index b96486d4..92165002 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -1,9 +1,9 @@ use serai_db::{Db, DbTxn}; -use primitives::{Id, Block}; +use primitives::{Id, ReceivedOutput, Block}; // TODO: Localize to ScanDb? -use crate::{db::ScannerDb, ScannerFeed}; +use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; struct ScanForOutputsTask { db: D, @@ -14,9 +14,11 @@ struct ScanForOutputsTask { impl ContinuallyRan for ScanForOutputsTask { async fn run_iteration(&mut self) -> Result { // Fetch the safe to scan block - let latest_scannable = ScannerDb::::latest_scannable_block(&self.db).expect("ScanForOutputsTask run before writing the start block"); + let latest_scannable = ScannerDb::::latest_scannable_block(&self.db) + .expect("ScanForOutputsTask run before writing the start block"); // Fetch the next block to scan - let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db).expect("ScanForOutputsTask run before writing the start block"); + let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db) + .expect("ScanForOutputsTask run before writing the start block"); for b in next_to_scan ..= latest_scannable { let block = match self.feed.block_by_number(b).await { @@ -26,15 +28,22 @@ impl ContinuallyRan for ScanForOutputsTask { // Check the ID of this block is the expected ID { - let expected = ScannerDb::::block_id(b).expect("scannable block didn't have its ID saved"); + let expected = + ScannerDb::::block_id(&self.db, b).expect("scannable block didn't have its ID saved"); if block.id() != expected { - panic!("finalized chain reorganized from {} to {} at {}", hex::encode(expected), hex::encode(block.id()), b); + panic!( + "finalized chain reorganized from {} to {} at {}", + hex::encode(expected), + hex::encode(block.id()), + b + ); } } log::info!("scanning block: {} ({b})", hex::encode(block.id())); - let keys = ScannerDb::::keys(&self.db).expect("scanning for a blockchain without any keys set"); + let mut keys = + ScannerDb::::keys(&self.db).expect("scanning for a blockchain without any keys set"); // Remove all the retired keys while let Some(retire_at) = keys[0].retirement_block_number { if retire_at <= b { @@ -51,8 +60,13 @@ impl ContinuallyRan for ScanForOutputsTask { continue; } - for output in network.scan_for_outputs(&block, key).awaits { - assert_eq!(output.key(), key); + for output in self + .feed + .scan_for_outputs(&block, key.key.0) + .await + .map_err(|e| format!("failed to scan block {b}: {e:?}"))? + { + assert_eq!(output.key(), key.key.0); // TODO: Check for dust outputs.push(output); }