diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 8bd7d944..073d5d42 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -1,11 +1,9 @@ use core::marker::PhantomData; -use group::GroupEncoding; - use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db}; -use primitives::{Id, Block, BorshG}; +use primitives::{Id, ReceivedOutput, Block, BorshG}; use crate::ScannerFeed; @@ -14,7 +12,7 @@ trait Borshy: BorshSerialize + BorshDeserialize {} impl Borshy for T {} #[derive(BorshSerialize, BorshDeserialize)] -struct SeraiKey { +pub(crate) struct SeraiKey { activation_block_number: u64, retirement_block_number: Option, key: K, @@ -35,6 +33,10 @@ create_db!( NextToScanForOutputsBlock: () -> u64, // The next block to check for resolving eventualities NextToCheckForEventualitiesBlock: () -> u64, + // The next block to potentially report + NextToPotentiallyReportBlock: () -> u64, + // The highest acknowledged block + HighestAcknowledgedBlock: () -> u64, // If a block was notable /* @@ -55,6 +57,8 @@ create_db!( */ // This collapses from `bool` to `()`, using if the value was set for true and false otherwise NotableBlock: (number: u64) -> (), + + SerializedOutputs: (block_number: u64) -> Vec, } ); @@ -74,6 +78,10 @@ impl ScannerDb { // activation_block_number is inclusive, so the key will be scanned for starting at the specified // block pub(crate) fn queue_key(txn: &mut impl DbTxn, activation_block_number: u64, key: S::Key) { + // Set this block as notable + NotableBlock::set(txn, activation_block_number, &()); + + // Push the key let mut keys: Vec>> = ActiveKeys::get(txn).unwrap_or(vec![]); for key_i in &keys { if key == key_i.key.0 { @@ -124,6 +132,7 @@ impl ScannerDb { LatestScannableBlock::set(txn, &start_block); NextToScanForOutputsBlock::set(txn, &start_block); NextToCheckForEventualitiesBlock::set(txn, &start_block); + NextToPotentiallyReportBlock::set(txn, &start_block); } pub(crate) fn set_latest_finalized_block(txn: &mut impl DbTxn, latest_finalized_block: u64) { @@ -159,4 +168,47 @@ impl ScannerDb { pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option { NextToCheckForEventualitiesBlock::get(getter) } + + pub(crate) fn set_next_to_potentially_report_block( + txn: &mut impl DbTxn, + next_to_potentially_report_block: u64, + ) { + NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block); + } + pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option { + NextToPotentiallyReportBlock::get(getter) + } + + pub(crate) fn set_highest_acknowledged_block( + txn: &mut impl DbTxn, + highest_acknowledged_block: u64, + ) { + HighestAcknowledgedBlock::set(txn, &highest_acknowledged_block); + } + pub(crate) fn highest_acknowledged_block(getter: &impl Get) -> Option { + HighestAcknowledgedBlock::get(getter) + } + + pub(crate) fn set_outputs( + txn: &mut impl DbTxn, + block_number: u64, + outputs: Vec>, + ) { + if outputs.is_empty() { + return; + } + + // Set this block as notable + NotableBlock::set(txn, block_number, &()); + + let mut buf = Vec::with_capacity(outputs.len() * 128); + for output in outputs { + output.write(&mut buf).unwrap(); + } + SerializedOutputs::set(txn, block_number, &buf); + } + + pub(crate) fn is_notable_block(getter: &impl Get, number: u64) -> bool { + NotableBlock::get(getter, number).is_some() + } } diff --git a/processor/scanner/src/eventuality.rs b/processor/scanner/src/eventuality.rs index e69de29b..70b786d1 100644 --- a/processor/scanner/src/eventuality.rs +++ b/processor/scanner/src/eventuality.rs @@ -0,0 +1 @@ +// TODO diff --git a/processor/scanner/src/index.rs b/processor/scanner/src/index.rs index 66477cdb..7967d5df 100644 --- a/processor/scanner/src/index.rs +++ b/processor/scanner/src/index.rs @@ -20,7 +20,7 @@ struct IndexFinalizedTask { #[async_trait::async_trait] impl ContinuallyRan for IndexFinalizedTask { - async fn run_instance(&mut self) -> Result<(), String> { + async fn run_iteration(&mut self) -> Result { // Fetch the latest finalized block let our_latest_finalized = ScannerDb::::latest_finalized_block(&self.db) .expect("IndexTask run before writing the start block"); @@ -29,6 +29,18 @@ impl ContinuallyRan for IndexFinalizedTask { Err(e) => Err(format!("couldn't fetch the latest finalized block number: {e:?}"))?, }; + if latest_finalized < our_latest_finalized { + // Explicitly log this as an error as returned ephemeral errors are logged with debug + // This doesn't panic as the node should sync along our indexed chain, and if it doesn't, + // we'll panic at that point in time + log::error!( + "node is out of sync, latest finalized {} is behind our indexed {}", + latest_finalized, + our_latest_finalized + ); + Err("node is out of sync".to_string())?; + } + // Index the hashes of all blocks until the latest finalized block for b in (our_latest_finalized + 1) ..= latest_finalized { let block = match self.feed.block_by_number(b).await { @@ -57,16 +69,7 @@ impl ContinuallyRan for IndexFinalizedTask { txn.commit(); } - Ok(()) + // Have dependents run if we updated the latest finalized block + Ok(our_latest_finalized != latest_finalized) } } - -/* - The processor can't index the blockchain unilaterally. It needs to develop a totally ordered view - of the blockchain. That requires consensus with other validators on when certain keys are set to - activate (and retire). We solve this by only scanning `n` blocks ahead of the last agreed upon - block, then waiting for Serai to acknowledge the block. This lets us safely schedule events after - this `n` block window (as demonstrated/proven with `mini`). - - TODO -*/ diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 736a62b9..04dcf824 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -1,4 +1,6 @@ -use core::fmt::Debug; +use core::{fmt::Debug, time::Duration}; + +use tokio::sync::mpsc; use primitives::{ReceivedOutput, Block}; @@ -50,11 +52,50 @@ pub trait ScannerFeed: Send + Sync { ) -> Result; } +/// A handle to immediately run an iteration of a task. +#[derive(Clone)] +pub(crate) struct RunNowHandle(mpsc::Sender<()>); +/// An instruction recipient to immediately run an iteration of a task. +pub(crate) struct RunNowRecipient(mpsc::Receiver<()>); + +impl RunNowHandle { + /// Create a new run-now handle to be assigned to a task. + pub(crate) fn new() -> (Self, RunNowRecipient) { + // Uses a capacity of 1 as any call to run as soon as possible satisfies all calls to run as + // soon as possible + let (send, recv) = mpsc::channel(1); + (Self(send), RunNowRecipient(recv)) + } + + /// Tell the task to run now (and not whenever its next iteration on a timer is). + /// + /// Panics if the task has been dropped. + pub(crate) fn run_now(&self) { + #[allow(clippy::match_same_arms)] + match self.0.try_send(()) { + Ok(()) => {} + // NOP on full, as this task will already be ran as soon as possible + Err(mpsc::error::TrySendError::Full(())) => {} + Err(mpsc::error::TrySendError::Closed(())) => { + panic!("task was unexpectedly closed when calling run_now") + } + } + } +} + #[async_trait::async_trait] pub(crate) trait ContinuallyRan: Sized { - async fn run_instance(&mut self) -> Result<(), String>; + /// Run an iteration of the task. + /// + /// If this returns `true`, all dependents of the task will immediately have a new iteration ran + /// (without waiting for whatever timer they were already on). + async fn run_iteration(&mut self) -> Result; - async fn continually_run(mut self) { + /// Continually run the task. + /// + /// This returns a channel which can have a message set to immediately trigger a new run of an + /// iteration. + async fn continually_run(mut self, mut run_now: RunNowRecipient, dependents: Vec) { // The default number of seconds to sleep before running the task again let default_sleep_before_next_task = 5; // The current number of seconds to sleep before running the task again @@ -67,10 +108,16 @@ pub(crate) trait ContinuallyRan: Sized { }; loop { - match self.run_instance().await { - Ok(()) => { + match self.run_iteration().await { + Ok(run_dependents) => { // Upon a successful (error-free) loop iteration, reset the amount of time we sleep current_sleep_before_next_task = default_sleep_before_next_task; + + if run_dependents { + for dependent in &dependents { + dependent.run_now(); + } + } } Err(e) => { log::debug!("{}", e); @@ -78,9 +125,11 @@ pub(crate) trait ContinuallyRan: Sized { } } - // Don't run the task again for another few seconds - // This is at the start of the loop so we can continue without skipping this delay - tokio::time::sleep(core::time::Duration::from_secs(current_sleep_before_next_task)).await; + // Don't run the task again for another few seconds UNLESS told to run now + tokio::select! { + () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, + msg = run_now.0.recv() => assert_eq!(msg, Some(()), "run now handle was dropped"), + } } } } diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs new file mode 100644 index 00000000..4d378b9c --- /dev/null +++ b/processor/scanner/src/report.rs @@ -0,0 +1,50 @@ +/* + We only report blocks once both tasks, scanning for received ouputs and eventualities, have + processed the block. This ensures we've performed all ncessary options. +*/ + +use serai_db::{Db, DbTxn}; + +use primitives::{Id, Block}; + +// TODO: Localize to ReportDb? +use crate::{db::ScannerDb, ScannerFeed}; + +struct ReportTask { + db: D, + feed: S, +} + +#[async_trait::async_trait] +impl ContinuallyRan for ReportTask { + async fn run_iteration(&mut self) -> Result { + let highest_reportable = { + // Fetch the latest scanned and latest checked block + let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db).expect("ReportTask run before writing the start block"); + let next_to_check = ScannerDb::::next_to_check_for_eventualities_block(&self.db).expect("ReportTask run before writing the start block"); + // If we haven't done any work, return + if (next_to_scan == 0) || (next_to_check == 0) { + return Ok(false); + } + let last_scanned = next_to_scan - 1; + let last_checked = next_to_check - 1; + last_scanned.min(last_checked) + }; + + let next_to_potentially_report = ScannerDb::::next_block_to_potentially_report(&self.db).expect("ReportTask run before writing the start block"); + + for b in next_to_potentially_report ..= highest_reportable { + if ScannerDb::::is_block_notable(b) { + todo!("TODO: Make Batches, which requires handling Forwarded within this crate"); + } + + let mut txn = self.db.txn(); + // Update the next to potentially report block + ScannerDb::::set_next_to_potentially_report_block(&mut txn, b + 1); + txn.commit(); + } + + // Run dependents if we decided to report any blocks + Ok(next_to_potentially_report <= highest_reportable) + } +} diff --git a/processor/scanner/src/safe.rs b/processor/scanner/src/safe.rs new file mode 100644 index 00000000..a5de448d --- /dev/null +++ b/processor/scanner/src/safe.rs @@ -0,0 +1,73 @@ +use core::marker::PhantomData; + +use serai_db::{Db, DbTxn}; + +use primitives::{Id, Block}; + +// TODO: Localize to SafeDb? +use crate::{db::ScannerDb, ScannerFeed}; + +/* + We mark blocks safe to scan when they're no more than `(CONFIRMATIONS - 1)` blocks after the + oldest notable block still pending acknowledgement (creating a window of length `CONFIRMATIONS` + when including the block pending acknowledgement). This means that if all known notable blocks + have been acknowledged, and a stretch of non-notable blocks occurs, they'll automatically be + marked safe to scan (since they come before the next oldest notable block still pending + acknowledgement). + + This design lets Serai safely schedule events `CONFIRMATIONS` blocks after the latest + acknowledged block. For an exhaustive proof of this, please see `mini`. +*/ +struct SafeToScanTask { + db: D, + _S: PhantomData, +} + +#[async_trait::async_trait] +impl ContinuallyRan for SafeToScanTask { + async fn run_iteration(&mut self) -> Result { + // First, we fetch the highest acknowledged block + let Some(highest_acknowledged_block) = ScannerDb::::highest_acknowledged_block(&self.db) else { + // If no blocks have been acknowledged, we don't mark any safe + // Once the start block (implicitly safe) has been acknowledged, we proceed from there + return Ok(false); + }; + + let latest_block_known_if_pending_acknowledgement = { + // The next block to potentially report comes after all blocks we've decided to report or not + // If we've decided to report (or not report) a block, we know if it needs acknowledgement + // (and accordingly is pending acknowledgement) + // Accordingly, the block immediately before this is the latest block with a known status + ScannerDb::::next_block_to_potentially_report(&self.db).expect("SafeToScanTask run before writing the start block") - 1 + }; + + let mut oldest_pending_acknowledgement = None; + for b in (highest_acknowledged_block + 1) ..= latest_block_known_if_pending_acknowledgement { + // If the block isn't notable, immediately flag it as acknowledged + if !ScannerDb::::is_block_notable(b) { + let mut txn = self.db.txn(); + ScannerDb::::set_highest_acknowledged_block(&mut txn, b); + txn.commit(); + continue; + } + + oldest_pending_acknowledgement = Some(b); + break; + } + + // `oldest_pending_acknowledgement` is now the oldest block pending acknowledgement or `None` + // If it's `None`, then we were able to implicitly acknowledge all blocks within this span + // Since the safe block is `(CONFIRMATIONS - 1)` blocks after the oldest block still pending + // acknowledgement, and the oldest block still pending acknowledgement is in the future, + // we know the safe block to scan to is + // `>= latest_block_known_if_pending_acknowledgement + (CONFIRMATIONS - 1)` + let oldest_pending_acknowledgement = oldest_pending_acknowledgement.unwrap_or(latest_block_known_if_pending_acknowledgement); + + // Update the latest scannable block + let mut txn = self.db.txn(); + ScannerDb::::set_latest_scannable_block(oldest_pending_acknowledgement + (CONFIRMATIONS - 1)); + txn.commit(); + + Ok(next_to_potentially_report <= highest_reportable) + } +} diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index 6f784a7e..b96486d4 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -12,7 +12,7 @@ struct ScanForOutputsTask { #[async_trait::async_trait] impl ContinuallyRan for ScanForOutputsTask { - async fn run_instance(&mut self) -> Result<(), String> { + async fn run_iteration(&mut self) -> Result { // Fetch the safe to scan block let latest_scannable = ScannerDb::::latest_scannable_block(&self.db).expect("ScanForOutputsTask run before writing the start block"); // Fetch the next block to scan @@ -43,6 +43,7 @@ impl ContinuallyRan for ScanForOutputsTask { } assert!(keys.len() <= 2); + let mut outputs = vec![]; // Scan for each key for key in keys { // If this key has yet to active, skip it @@ -50,7 +51,6 @@ impl ContinuallyRan for ScanForOutputsTask { continue; } - let mut outputs = vec![]; for output in network.scan_for_outputs(&block, key).awaits { assert_eq!(output.key(), key); // TODO: Check for dust @@ -59,15 +59,14 @@ impl ContinuallyRan for ScanForOutputsTask { } let mut txn = self.db.txn(); - // Update the latest scanned block + // Save the outputs + ScannerDb::::set_outputs(&mut txn, b, outputs); + // Update the next to scan block ScannerDb::::set_next_to_scan_for_outputs_block(&mut txn, b + 1); - // TODO: If this had outputs, yield them and mark this block notable - /* - A block is notable if it's an activation, had outputs, or a retirement block. - */ txn.commit(); } - Ok(()) + // Run dependents if we successfully scanned any blocks + Ok(next_to_scan <= latest_scannable) } }