diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 0edfad97..e92435bc 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -27,16 +27,12 @@ create_db!( // The latest finalized block to appear of a blockchain LatestFinalizedBlock: () -> u64, - // The latest block which it's safe to scan (dependent on what Serai has acknowledged scanning) - LatestScannableBlock: () -> u64, // The next block to scan for received outputs NextToScanForOutputsBlock: () -> u64, // The next block to check for resolving eventualities NextToCheckForEventualitiesBlock: () -> u64, // The next block to potentially report NextToPotentiallyReportBlock: () -> u64, - // The highest acknowledged block - HighestAcknowledgedBlock: () -> u64, // If a block was notable /* @@ -125,7 +121,6 @@ impl ScannerDb { pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: BlockIdFor) { Self::set_block(txn, start_block, id); LatestFinalizedBlock::set(txn, &start_block); - LatestScannableBlock::set(txn, &start_block); NextToScanForOutputsBlock::set(txn, &start_block); NextToCheckForEventualitiesBlock::set(txn, &start_block); NextToPotentiallyReportBlock::set(txn, &start_block); @@ -138,11 +133,10 @@ impl ScannerDb { LatestFinalizedBlock::get(getter) } - pub(crate) fn set_latest_scannable_block(txn: &mut impl DbTxn, latest_scannable_block: u64) { - LatestScannableBlock::set(txn, &latest_scannable_block); - } pub(crate) fn latest_scannable_block(getter: &impl Get) -> Option { - LatestScannableBlock::get(getter) + // This is whatever block we've checked the Eventualities of, plus the window length + // See `eventuality.rs` for more info + NextToCheckForEventualitiesBlock::get(getter).map(|b| b + S::WINDOW_LENGTH) } pub(crate) fn set_next_to_scan_for_outputs_block( @@ -175,16 +169,6 @@ impl ScannerDb { NextToPotentiallyReportBlock::get(getter) } - pub(crate) fn set_highest_acknowledged_block( - txn: &mut impl DbTxn, - highest_acknowledged_block: u64, 
- ) { - HighestAcknowledgedBlock::set(txn, &highest_acknowledged_block); - } - pub(crate) fn highest_acknowledged_block(getter: &impl Get) -> Option { - HighestAcknowledgedBlock::get(getter) - } - pub(crate) fn set_outputs(txn: &mut impl DbTxn, block_number: u64, outputs: Vec>) { if outputs.is_empty() { return; diff --git a/processor/scanner/src/eventuality.rs b/processor/scanner/src/eventuality.rs index 70b786d1..38f1d112 100644 --- a/processor/scanner/src/eventuality.rs +++ b/processor/scanner/src/eventuality.rs @@ -1 +1,50 @@ // TODO + +/* + Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we + operate on is `CONFIRMATIONS` blocks deep. This is true for Proof-of-Work chains yet not the API + actively used here. + + When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those + outputs into some scheduler, potentially causing certain transactions to begin their signing + protocol. + + Despite only scanning blocks with `CONFIRMATIONS`, we cannot assume that these transactions (in + their signed form) will only appear after `CONFIRMATIONS`. For `CONFIRMATIONS = 10`, the scanned + block's number being `1`, the blockchain will have blocks with numbers `0 ..= 10`. While this + implies the earliest the transaction will appear is when the block number is `11`, which is + `1 + CONFIRMATIONS` (the number of the scanned block, plus the confirmations), this isn't + guaranteed. + + A reorganization could occur which causes all unconfirmed blocks to be replaced, with the new + blockchain having the signed transaction present immediately. + + This means that in order to detect Eventuality completions, we can only check block `b+1` once + we've acknowledged block `b`, accumulated its outputs, triggered any transactions, and prepared + for their Eventualities. 
This is important as both the completion of Eventualities, and the scan + process, may cause a block to be considered notable (where notable blocks must be perfectly + ordered). + + We do not want to fully serialize the scan flow solely because the Eventuality flow must be. If + the time to scan, acknowledge, and intake a block ever exceeded the block time, we'd form a + backlog. + + The solution is to form a window of blocks we can scan/acknowledge/intake, safely, such that we + only form a backlog if the latency for a block exceeds the duration of the entire window (the + amount of blocks in the window * the block time). + + By treating an Eventuality as resolving not in the block where it actually does, but in the block + one window later, we enable the following flow: + + - The scanner scans within its window, submitting blocks for acknowledgement. + - We have the blocks acknowledged (the consensus protocol handling this in parallel). + - The scanner checks for Eventualities completed following acknowledged blocks. + - If all Eventualities for a retiring multisig have been cleared, the notable block is one window + later. + - The start of the window shifts to the last block we've checked for Eventualities. This means + the end of the window is the block we just set as notable, and yes, once that's scanned we can + successfully publish a batch for it in a canonical fashion. + + This forms a backlog only if the latency of scanning, acknowledgement, and intake (including + checking Eventualities) exceeds the window duration (the desired property). +*/ diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 02c88599..5f51e7d0 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -10,17 +10,17 @@ mod index; mod scan; mod eventuality; mod report; -mod safe; /// A feed usable to scan a blockchain. /// /// This defines the primitive types used, along with various getters necessary for indexing.
#[async_trait::async_trait] pub trait ScannerFeed: Send + Sync { - /// The amount of confirmations required for a block to be finalized. + /// The amount of blocks to process in parallel. /// - /// This value must be at least `1`. - const CONFIRMATIONS: u64; + /// This value must be at least `1`. This value should be the worst-case latency to handle a + /// block divided by the expected block time. + const WINDOW_LENGTH: u64; /// The representation of a block for this blockchain. /// @@ -36,19 +36,11 @@ pub trait ScannerFeed: Send + Sync { /// resolve without manual intervention. type EphemeralError: Debug; - /// Fetch the number of the latest block. - /// - /// The block number is its zero-indexed position within a linear view of the external network's - /// consensus. The genesis block accordingly has block number 0. - async fn latest_block_number(&self) -> Result; - /// Fetch the number of the latest finalized block. /// /// The block number is its zero-indexed position within a linear view of the external network's /// consensus. The genesis block accordingly has block number 0. - async fn latest_finalized_block_number(&self) -> Result { - Ok(self.latest_block_number().await? - Self::CONFIRMATIONS) - } + async fn latest_finalized_block_number(&self) -> Result; /// Fetch a block header by its number. async fn block_header_by_number( @@ -262,77 +254,7 @@ impl ScannerDb { } } -/// The Scanner emits events relating to the blockchain, notably received outputs. -/// -/// It WILL NOT fail to emit an event, even if it reboots at selected moments. -/// -/// It MAY fire the same event multiple times. 
-#[derive(Debug)] -pub struct Scanner { - _db: PhantomData, - - keys: Vec<(usize, ::G)>, - - eventualities: HashMap, EventualitiesTracker>, - - ram_scanned: Option, - ram_outputs: HashSet>, - - need_ack: VecDeque, - - events: mpsc::UnboundedSender>, -} - -#[derive(Clone, Debug)] -struct ScannerHold { - scanner: Arc>>>, -} -impl ScannerHold { - async fn read(&self) -> RwLockReadGuard<'_, Option>> { - loop { - let lock = self.scanner.read().await; - if lock.is_none() { - drop(lock); - tokio::task::yield_now().await; - continue; - } - return lock; - } - } - async fn write(&self) -> RwLockWriteGuard<'_, Option>> { - loop { - let lock = self.scanner.write().await; - if lock.is_none() { - drop(lock); - tokio::task::yield_now().await; - continue; - } - return lock; - } - } - // This is safe to not check if something else already acquired the Scanner as the only caller is - // sequential. - async fn long_term_acquire(&self) -> Scanner { - self.scanner.write().await.take().unwrap() - } - async fn restore(&self, scanner: Scanner) { - let _ = self.scanner.write().await.insert(scanner); - } -} - -#[derive(Debug)] -pub struct ScannerHandle { - scanner: ScannerHold, - held_scanner: Option>, - pub events: ScannerEventChannel, - pub multisig_completed: mpsc::UnboundedSender, -} - impl ScannerHandle { - pub async fn ram_scanned(&self) -> usize { - self.scanner.read().await.as_ref().unwrap().ram_scanned.unwrap_or(0) - } - /// Register a key to scan for. 
pub async fn register_key( &mut self, @@ -363,17 +285,6 @@ impl ScannerHandle { scanner.eventualities.insert(key.to_bytes().as_ref().to_vec(), EventualitiesTracker::new()); } - pub fn db_scanned(getter: &G) -> Option { - ScannerDb::::latest_scanned_block(getter) - } - - // This perform a database read which isn't safe with regards to if the value is set or not - // It may be set, when it isn't expected to be set, or not set, when it is expected to be set - // Since the value is static, if it's set, it's correctly set - pub fn block_number(getter: &G, id: &>::Id) -> Option { - ScannerDb::::block_number(getter, id) - } - /// Acknowledge having handled a block. /// /// Creates a lock over the Scanner, preventing its independent scanning operations until @@ -447,7 +358,6 @@ impl Scanner { network: N, db: D, ) -> (ScannerHandle, Vec<(usize, ::G)>) { - let (events_send, events_recv) = mpsc::unbounded_channel(); let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel(); let keys = ScannerDb::::keys(&db); @@ -455,44 +365,6 @@ impl Scanner { for key in &keys { eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new()); } - - let ram_scanned = ScannerDb::::latest_scanned_block(&db); - - let scanner = ScannerHold { - scanner: Arc::new(RwLock::new(Some(Scanner { - _db: PhantomData, - - keys: keys.clone(), - - eventualities, - - ram_scanned, - ram_outputs: HashSet::new(), - - need_ack: VecDeque::new(), - - events: events_send, - }))), - }; - tokio::spawn(Scanner::run(db, network, scanner.clone(), multisig_completed_recv)); - - ( - ScannerHandle { - scanner, - held_scanner: None, - events: events_recv, - multisig_completed: multisig_completed_send, - }, - keys, - ) - } - - fn emit(&mut self, event: ScannerEvent) -> bool { - if self.events.send(event).is_err() { - info!("Scanner handler was dropped. 
Shutting down?"); - return false; - } - true } // An async function, to be spawned on a task, to discover and report outputs @@ -576,30 +448,6 @@ impl Scanner { info!("scanning block: {} ({block_being_scanned})", hex::encode(&block_id)); - // These DB calls are safe, despite not having a txn, since they're static values - // There's no issue if they're written in advance of expected (such as on reboot) - // They're also only expected here - if let Some(id) = ScannerDb::::block(&db, block_being_scanned) { - if id != block_id { - panic!("reorg'd from finalized {} to {}", hex::encode(id), hex::encode(block_id)); - } - } else { - // TODO: Move this to an unwrap - if let Some(id) = ScannerDb::::block(&db, block_being_scanned.saturating_sub(1)) { - if id != block.parent() { - panic!( - "block {} doesn't build off expected parent {}", - hex::encode(block_id), - hex::encode(id), - ); - } - } - - let mut txn = db.txn(); - ScannerDb::::save_block(&mut txn, block_being_scanned, &block_id); - txn.commit(); - } - // Scan new blocks // TODO: This lock acquisition may be long-lived... 
let mut scanner_lock = scanner_hold.write().await; @@ -617,16 +465,6 @@ impl Scanner { has_activation = true; } - let key_vec = key.to_bytes().as_ref().to_vec(); - - // TODO: These lines are the ones which will cause a really long-lived lock acquisition - for output in network.get_outputs(&block, key).await { - assert_eq!(output.key(), key); - if output.balance().amount.0 >= N::DUST { - outputs.push(output); - } - } - for (id, (block_number, tx, completion)) in network .get_eventuality_completions(scanner.eventualities.get_mut(&key_vec).unwrap(), &block) .await @@ -778,10 +616,6 @@ impl Scanner { let retired = scanner.keys.remove(0).1; scanner.eventualities.remove(retired.to_bytes().as_ref()); } - - // Update ram_scanned - scanner.ram_scanned = Some(block_being_scanned); - drop(scanner_lock); // If we sent a Block event, once again check multisig_completed if sent_block && diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 5c57a3f5..34a59617 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -19,18 +19,20 @@ struct ReportTask { impl ContinuallyRan for ReportTask { async fn run_iteration(&mut self) -> Result { let highest_reportable = { - // Fetch the latest scanned and latest checked block + // Fetch the next to scan block let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db) .expect("ReportTask run before writing the start block"); - let next_to_check = ScannerDb::::next_to_check_for_eventualities_block(&self.db) - .expect("ReportTask run before writing the start block"); // If we haven't done any work, return - if (next_to_scan == 0) || (next_to_check == 0) { + if next_to_scan == 0 { return Ok(false); } + // The last scanned block is the block prior to this + #[allow(clippy::let_and_return)] let last_scanned = next_to_scan - 1; - let last_checked = next_to_check - 1; - last_scanned.min(last_checked) + // The last scanned block is the highest reportable block as we only scan 
blocks within a + // window where it's safe to immediately report the block + // See `eventuality.rs` for more info + last_scanned }; let next_to_potentially_report = ScannerDb::::next_to_potentially_report_block(&self.db) diff --git a/processor/scanner/src/safe.rs b/processor/scanner/src/safe.rs deleted file mode 100644 index a0b4f547..00000000 --- a/processor/scanner/src/safe.rs +++ /dev/null @@ -1,82 +0,0 @@ -use core::marker::PhantomData; - -use serai_db::{Db, DbTxn}; - -use primitives::{Id, Block}; - -// TODO: Localize to SafeDb? -use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; - -/* - We mark blocks safe to scan when they're no more than `(CONFIRMATIONS - 1)` blocks after the - oldest notable block still pending acknowledgement (creating a window of length `CONFIRMATIONS` - when including the block pending acknowledgement). This means that if all known notable blocks - have been acknowledged, and a stretch of non-notable blocks occurs, they'll automatically be - marked safe to scan (since they come before the next oldest notable block still pending - acknowledgement). - - This design lets Serai safely schedule events `CONFIRMATIONS` blocks after the latest - acknowledged block. For an exhaustive proof of this, please see `mini`. 
-*/ -struct SafeToScanTask { - db: D, - _S: PhantomData, -} - -#[async_trait::async_trait] -impl ContinuallyRan for SafeToScanTask { - async fn run_iteration(&mut self) -> Result { - // First, we fetch the highest acknowledged block - let Some(highest_acknowledged_block) = ScannerDb::::highest_acknowledged_block(&self.db) - else { - // If no blocks have been acknowledged, we don't mark any safe - // Once the start block (implicitly safe) has been acknowledged, we proceed from there - return Ok(false); - }; - - let latest_block_known_if_pending_acknowledgement = { - // The next block to potentially report comes after all blocks we've decided to report or not - // If we've decided to report (or not report) a block, we know if it needs acknowledgement - // (and accordingly is pending acknowledgement) - // Accordingly, the block immediately before this is the latest block with a known status - ScannerDb::::next_to_potentially_report_block(&self.db) - .expect("SafeToScanTask run before writing the start block") - - 1 - }; - - let mut oldest_pending_acknowledgement = None; - for b in (highest_acknowledged_block + 1) ..= latest_block_known_if_pending_acknowledgement { - // If the block isn't notable, immediately flag it as acknowledged - if !ScannerDb::::is_block_notable(&self.db, b) { - let mut txn = self.db.txn(); - ScannerDb::::set_highest_acknowledged_block(&mut txn, b); - txn.commit(); - continue; - } - - oldest_pending_acknowledgement = Some(b); - break; - } - - // `oldest_pending_acknowledgement` is now the oldest block pending acknowledgement or `None` - // If it's `None`, then we were able to implicitly acknowledge all blocks within this span - // Since the safe block is `(CONFIRMATIONS - 1)` blocks after the oldest block still pending - // acknowledgement, and the oldest block still pending acknowledgement is in the future, - // we know the safe block to scan to is - // `>= latest_block_known_if_pending_acknowledgement + (CONFIRMATIONS - 1)` - let 
oldest_pending_acknowledgement = - oldest_pending_acknowledgement.unwrap_or(latest_block_known_if_pending_acknowledgement); - - let old_safe_block = ScannerDb::::latest_scannable_block(&self.db) - .expect("SafeToScanTask run before writing the start block"); - let new_safe_block = oldest_pending_acknowledgement + - (S::CONFIRMATIONS.checked_sub(1).expect("CONFIRMATIONS wasn't at least 1")); - - // Update the latest scannable block - let mut txn = self.db.txn(); - ScannerDb::::set_latest_scannable_block(&mut txn, new_safe_block); - txn.commit(); - - Ok(old_safe_block != new_safe_block) - } -}