From f9d02d43c211e463a3087c98460b7f12b22ad5ac Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Thu, 29 Aug 2024 12:45:47 -0400
Subject: [PATCH] Route burns through the scanner

---
 Cargo.lock                               |   1 +
 processor/scanner/Cargo.toml             |   1 +
 processor/scanner/src/db.rs              |  31 +++++-
 processor/scanner/src/eventuality/db.rs  |  16 ++-
 processor/scanner/src/eventuality/mod.rs | 124 ++++++++++++++++++-----
 processor/scanner/src/lib.rs             |  89 ++++++++++++++--
 6 files changed, 223 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4cc54e15..2a9de4b9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8670,6 +8670,7 @@ dependencies = [
  "hex",
  "log",
  "parity-scale-codec",
+ "serai-coins-primitives",
  "serai-db",
  "serai-in-instructions-primitives",
  "serai-primitives",
diff --git a/processor/scanner/Cargo.toml b/processor/scanner/Cargo.toml
index a16b55f2..e7cdef97 100644
--- a/processor/scanner/Cargo.toml
+++ b/processor/scanner/Cargo.toml
@@ -37,6 +37,7 @@ serai-db = { path = "../../common/db" }
 
 serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] }
 serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] }
+serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std"] }
 
 messages = { package = "serai-processor-messages", path = "../messages" }
 primitives = { package = "serai-processor-primitives", path = "../primitives" }
diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index 810859a6..a6272eeb 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -6,6 +6,7 @@ use borsh::{BorshSerialize, BorshDeserialize};
 use serai_db::{Get, DbTxn, create_db, db_channel};
 
 use serai_in_instructions_primitives::InInstructionWithBalance;
+use serai_coins_primitives::OutInstructionWithBalance;
 
 use primitives::{EncodableG, Address, ReceivedOutput};
 
@@ -336,9 +337,9 @@ impl ScanToEventualityDb {
 }
 
 #[derive(BorshSerialize, BorshDeserialize)]
-pub(crate) struct BlockBoundInInstructions {
-  pub(crate) block_number: u64,
-  pub(crate) in_instructions: Vec<InInstructionWithBalance>,
+struct BlockBoundInInstructions {
+  block_number: u64,
+  in_instructions: Vec<InInstructionWithBalance>,
 }
 
 db_channel! {
@@ -370,3 +371,27 @@ impl ScanToReportDb {
     data.in_instructions
   }
 }
+
+db_channel! {
+  ScannerSubstrateEventuality {
+    Burns: (acknowledged_block: u64) -> Vec<OutInstructionWithBalance>,
+  }
+}
+
+pub(crate) struct SubstrateToEventualityDb;
+impl SubstrateToEventualityDb {
+  pub(crate) fn send_burns(
+    txn: &mut impl DbTxn,
+    acknowledged_block: u64,
+    burns: &Vec<OutInstructionWithBalance>,
+  ) {
+    Burns::send(txn, acknowledged_block, burns);
+  }
+
+  pub(crate) fn try_recv_burns(
+    txn: &mut impl DbTxn,
+    acknowledged_block: u64,
+  ) -> Option<Vec<OutInstructionWithBalance>> {
+    Burns::try_recv(txn, acknowledged_block)
+  }
+}
diff --git a/processor/scanner/src/eventuality/db.rs b/processor/scanner/src/eventuality/db.rs
index c5a07b04..f810ba2f 100644
--- a/processor/scanner/src/eventuality/db.rs
+++ b/processor/scanner/src/eventuality/db.rs
@@ -11,6 +11,8 @@ create_db!(
   ScannerEventuality {
     // The next block to check for resolving eventualities
     NextToCheckForEventualitiesBlock: () -> u64,
+    // The latest block this task has handled which was notable
+    LatestHandledNotableBlock: () -> u64,
 
     SerializedEventualities: (key: K) -> Vec<u8>,
   }
@@ -22,16 +24,22 @@ impl<S: ScannerFeed> EventualityDb<S> {
     txn: &mut impl DbTxn,
     next_to_check_for_eventualities_block: u64,
   ) {
-    assert!(
-      next_to_check_for_eventualities_block != 0,
-      "next-to-check-for-eventualities block was 0 when it's bound non-zero"
-    );
     NextToCheckForEventualitiesBlock::set(txn, &next_to_check_for_eventualities_block);
   }
   pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option<u64> {
     NextToCheckForEventualitiesBlock::get(getter)
   }
 
+  pub(crate) fn set_latest_handled_notable_block(
+    txn: &mut impl DbTxn,
+    latest_handled_notable_block: u64,
+  ) {
+    LatestHandledNotableBlock::set(txn, &latest_handled_notable_block);
+  }
+  pub(crate) fn latest_handled_notable_block(getter: &impl Get) -> Option<u64> {
+    LatestHandledNotableBlock::get(getter)
+  }
+
   pub(crate) fn set_eventualities(
     txn: &mut impl DbTxn,
     key: KeyFor<S>,
diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index b5dc3dd9..38176ed4 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -6,7 +6,10 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality,
 
 use crate::{
   lifetime::LifetimeStage,
-  db::{OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, ScanToEventualityDb},
+  db::{
+    OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
+    ScanToEventualityDb,
+  },
   BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
   scan::{next_to_scan_for_outputs_block, queue_output_until_block},
 };
@@ -20,6 +23,7 @@ use db::EventualityDb;
 /// only allowed to scan `S::WINDOW_LENGTH - 1` blocks ahead so we can safely schedule keys to
 /// retire `S::WINDOW_LENGTH` blocks out.
 pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> {
+  assert!(S::WINDOW_LENGTH > 0);
   EventualityDb::<S>::next_to_check_for_eventualities_block(getter)
     .map(|b| b + S::WINDOW_LENGTH - 1)
 }
@@ -79,24 +83,81 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
     if EventualityDb::<S>::next_to_check_for_eventualities_block(&db).is_none() {
       // Initialize the DB
       let mut txn = db.txn();
-      // We can receive outputs in `start_block`, but any descending transactions will be in the
-      // next block
-      EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, start_block + 1);
+      EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, start_block);
       txn.commit();
     }
 
     Self { db, feed, scheduler }
   }
+
+  // Returns a boolean of whether we intaked any Burns.
+  fn intake_burns(&mut self) -> bool {
+    let mut intaked_any = false;
+
+    // If we've handled a notable block, we may have Burns being queued with it as the reference
+    if let Some(latest_handled_notable_block) =
+      EventualityDb::<S>::latest_handled_notable_block(&self.db)
+    {
+      let mut txn = self.db.txn();
+      // Drain the entire channel
+      while let Some(burns) =
+        SubstrateToEventualityDb::try_recv_burns(&mut txn, latest_handled_notable_block)
+      {
+        intaked_any = true;
+
+        let new_eventualities = self.scheduler.fulfill(&mut txn, burns);
+
+        // TODO: De-duplicate this with the below instance via a helper function
+        for (key, new_eventualities) in new_eventualities {
+          let key = {
+            let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
+            assert_eq!(key.len(), key_repr.as_ref().len());
+            key_repr.as_mut().copy_from_slice(&key);
+            KeyFor::<S>::from_bytes(&key_repr).unwrap()
+          };
+
+          let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
+          for new_eventuality in new_eventualities {
+            eventualities.insert(new_eventuality);
+          }
+          EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
+        }
+      }
+      txn.commit();
+    }
+
+    intaked_any
+  }
 }
 
 #[async_trait::async_trait]
 impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
+    // Fetch the highest acknowledged block
+    let Some(highest_acknowledged) = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
+    else {
+      // If we've never acknowledged a block, return
+      return Ok(false);
+    };
+
+    // A boolean of whether we've made any progress, to return at the end of the function
+    let mut made_progress = false;
+
+    // Start by intaking any Burns we have sitting around
+    made_progress |= self.intake_burns();
+
     /*
-      The set of Eventualities only increase when a block is acknowledged. Accordingly, we can only
-      iterate up to (and including) the block currently pending acknowledgement. "including" is
-      because even if block `b` causes new Eventualities, they'll only potentially resolve in block
-      `b + 1`.
+      Eventualities increase upon one of two cases:
+
+      1) We're fulfilling Burns
+      2) We acknowledged a block
+
+      We can't know the processor has intaked all Burns it should have when we process block `b`.
+      We solve this by executing a consensus protocol whenever a resolution for an Eventuality
+      created to fulfill Burns occurs. Accordingly, we force ourselves to obtain synchrony on such
+      blocks (and all preceding Burns).
+
+      This means we can only iterate up to the block currently pending acknowledgement.
 
       We only know blocks will need acknowledgement *for sure* if they were scanned. The only
       other causes are key activation and retirement (both scheduled outside the scan window). This makes
@@ -113,32 +174,38 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
       next_to_scan
     };
 
-    // Fetch the highest acknowledged block
-    let highest_acknowledged = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
-      .expect("EventualityTask run before writing the start block");
-
     // Fetch the next block to check
     let next_to_check = EventualityDb::<S>::next_to_check_for_eventualities_block(&self.db)
       .expect("EventualityTask run before writing the start block");
 
     // Check all blocks
-    let mut iterated = false;
     for b in next_to_check .. exclusive_upper_bound {
-      // If the prior block was notable *and* not acknowledged, break
-      // This is so if it caused any Eventualities (which may resolve this block), we have them
-      {
-        // This `- 1` is safe as next to check is bound to be non-zero
-        // This is possible since even if we receive coins in block 0, any transactions we'd make
-        // would resolve in block 1 (the first block we'll check under this non-zero rule)
-        let prior_block = b - 1;
-        if ScannerGlobalDb::<S>::is_block_notable(&self.db, prior_block) &&
-          (prior_block > highest_acknowledged)
-        {
+      let is_block_notable = ScannerGlobalDb::<S>::is_block_notable(&self.db, b);
+      if is_block_notable {
+        /*
+          If this block is notable *and* not acknowledged, break.
+
+          This is so if Burns queued prior to this block's acknowledgement caused any Eventualities
+          (which may resolve this block), we have them. If it wasn't for that, it'd be so if this
+          block's acknowledgement caused any Eventualities, we have them, though those would only
+          potentially resolve in the next block (letting us scan this block without delay).
+        */
+        if b > highest_acknowledged {
           break;
         }
+
+        // Since this block is notable, ensure we've intaked all the Burns preceding it
+        // We can know with certainty that the channel is fully populated at this time since we've
+        // acknowledged a newer block (so we've handled the state up to this point and new state
+        // will be for the newer block)
+        #[allow(unused_assignments)]
+        {
+          made_progress |= self.intake_burns();
+        }
       }
 
-      iterated = true;
+      // Since we're handling this block, we are making progress
+      made_progress = true;
 
       let block = self.feed.block_by_number(&self.db, b).await?;
 
@@ -186,6 +253,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
         let mut non_external_outputs = block.scan_for_outputs(key.key);
         non_external_outputs.retain(|output| output.kind() != OutputType::External);
         // Drop any outputs less than the dust limit
+        // TODO: Either further filter to outputs we made or also check cost_to_aggregate
         non_external_outputs.retain(|output| {
           let balance = output.balance();
           balance.amount.0 >= self.feed.dust(balance.coin).0
@@ -288,10 +356,16 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
 
       // Update the next-to-check block
       EventualityDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);
+
+      // If this block was notable, update the latest-handled notable block
+      if is_block_notable {
+        EventualityDb::<S>::set_latest_handled_notable_block(&mut txn, b);
+      }
+
       txn.commit();
     }
 
     // Run dependents if we successfully checked any blocks
-    Ok(iterated)
+    Ok(made_progress)
   }
 }
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index 3e828fcb..27395d79 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -7,6 +7,7 @@ use serai_db::{Get, DbTxn, Db};
 
 use serai_primitives::{NetworkId, Coin, Amount};
 use serai_in_instructions_primitives::Batch;
+use serai_coins_primitives::OutInstructionWithBalance;
 
 use primitives::{task::*, Address, ReceivedOutput, Block};
 
@@ -15,7 +16,7 @@ mod lifetime;
 
 // Database schema definition and associated functions.
 mod db;
-use db::ScannerGlobalDb;
+use db::{ScannerGlobalDb, SubstrateToEventualityDb};
 // Task to index the blockchain, ensuring we don't reorganize finalized blocks.
 mod index;
 // Scans blocks for received coins.
@@ -147,7 +148,7 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone {
 
   /// The dust threshold for the specified coin.
   ///
-  /// This MUST be constant. Serai MJUST NOT create internal outputs worth less than this. This
+  /// This MUST be constant. Serai MUST NOT create internal outputs worth less than this. This
This + /// This MUST be constant. Serai MUST NOT create internal outputs worth less than this. This /// SHOULD be a value worth handling at a human level. fn dust(&self, coin: Coin) -> Amount; } @@ -195,6 +196,40 @@ pub trait Scheduler: 'static + Send { txn: &mut impl DbTxn, update: SchedulerUpdate, ) -> HashMap, Vec>>; + + /// Fulfill a series of payments, yielding the Eventualities now to be scanned for. + /// + /// Any Eventualities returned by this function must include an output-to-self (such as a Branch + /// or Change), unless they descend from a transaction returned by this function which satisfies + /// that requirement. + /// + /// The `Vec` used as the key in the returned HashMap should be the encoded key the + /// Eventualities are for. + /* + We need an output-to-self so we can detect a block with an Eventuality completion with regards + to Burns, forcing us to ensure we have accumulated all the Burns we should by the time we + handle that block. We explicitly don't require children have this requirement as by detecting + the first resolution, we ensure we'll accumulate the Burns (therefore becoming aware of the + childrens' Eventualities, enabling recognizing their resolutions). + + This carve out enables the following: + + ------------------ Fulfillment TX ---------------------- + | Primary Output | ---------------> | New Primary Output | + ------------------ | ---------------------- + | + | ------------------------------ + |------> | Branching Output for Burns | + ------------------------------ + + Without wasting pointless Change outputs on every transaction (as there's a single parent which + has an output-to-self). + */ + fn fulfill( + &mut self, + txn: &mut impl DbTxn, + payments: Vec, + ) -> HashMap, Vec>>; } /// A representation of a scanner. @@ -242,6 +277,8 @@ impl Scanner { /// /// This means this block was ordered on Serai in relation to `Burn` events, and all validators /// have achieved synchrony on it. + /// + /// The calls to this function must be ordered with regards to `queue_burns`. pub fn acknowledge_block( &mut self, mut txn: impl DbTxn, @@ -249,10 +286,23 @@ impl Scanner { key_to_activate: Option>, ) { log::info!("acknowledging block {block_number}"); + assert!( ScannerGlobalDb::::is_block_notable(&txn, block_number), "acknowledging a block which wasn't notable" ); + if let Some(prior_highest_acknowledged_block) = + ScannerGlobalDb::::highest_acknowledged_block(&txn) + { + assert!(block_number > prior_highest_acknowledged_block, "acknowledging blocks out-of-order"); + for b in (prior_highest_acknowledged_block + 1) .. (block_number - 1) { + assert!( + !ScannerGlobalDb::::is_block_notable(&txn, b), + "skipped acknowledging a block which was notable" + ); + } + } + ScannerGlobalDb::::set_highest_acknowledged_block(&mut txn, block_number); if let Some(key_to_activate) = key_to_activate { ScannerGlobalDb::::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate); @@ -268,13 +318,38 @@ impl Scanner { /// Queue Burns. /// - /// The scanner only updates the scheduler with new outputs upon acknowledging a block. We can - /// safely queue Burns so long as they're only actually added once we've handled the outputs from - /// the block acknowledged prior to their queueing. - pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: Vec<()>) { + /// The scanner only updates the scheduler with new outputs upon acknowledging a block. The + /// ability to fulfill Burns, and therefore their order, is dependent on the current output + /// state. 
+  /// `acknowledge_block`.
+  /*
+    The fact Burns can be queued during any Substrate block is problematic. The scanner is allowed
+    to scan anything within the window set by the Eventuality task. The Eventuality task is
+    allowed to handle all blocks until it reaches a block needing acknowledgement.
+
+    This means we may queue Burns when the latest acknowledged block is 1, yet we've already
+    scanned 101. Such Burns may complete back in block 2, and we simply wouldn't have noticed due
+    to not having yet generated the Eventualities.
+
+    We solve this by mandating all transactions made as the result of an Eventuality include an
+    output-to-self worth at least `N::DUST`. If that occurs, the scanner will force a consensus
+    protocol on block 2. Accordingly, we won't scan all the way to block 101 (missing the
+    resolution of the Eventuality) as we'll obtain synchrony on block 2 and all Burns queued prior
+    to it.
+
+    Another option would be to re-check historical blocks, yet this would potentially redo an
+    unbounded amount of work. It would also not allow us to safely detect if received outputs were
+    in fact the result of Eventualities or not.
+
+    Another option would be to schedule Burns after the next-acknowledged block, yet this would
+    add latency and likely practically require we add regularly scheduled notable blocks (which
+    may be unnecessary).
+  */
+  pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: &Vec<OutInstructionWithBalance>) {
     let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(txn)
       .expect("queueing Burns yet never acknowledged a block");
-    todo!("TODO")
+
+    SubstrateToEventualityDb::send_burns(txn, queue_as_of, burns)
   }
 }
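
The new Burns channel is easiest to see end to end in miniature. The sketch below is not part of the patch and does not use the real `serai-db` types; `Burn` and `BurnsChannel` are hypothetical stand-ins for `OutInstructionWithBalance` and the `db_channel!`-generated `Burns` channel. It only illustrates the keying scheme the patch relies on: `queue_burns` sends under the highest acknowledged block, and the Eventuality task drains under its latest handled notable block, so Burns are only taken in once the block they were ordered against has been handled.

use std::collections::{HashMap, VecDeque};

// Hypothetical stand-in for OutInstructionWithBalance.
#[derive(Clone, Debug)]
struct Burn(&'static str);

// Hypothetical stand-in for the `Burns` db_channel: one FIFO queue per acknowledged block.
#[derive(Default)]
struct BurnsChannel {
  queues: HashMap<u64, VecDeque<Vec<Burn>>>,
}

impl BurnsChannel {
  // Mirrors SubstrateToEventualityDb::send_burns (sans the DbTxn).
  fn send(&mut self, acknowledged_block: u64, burns: Vec<Burn>) {
    self.queues.entry(acknowledged_block).or_default().push_back(burns);
  }

  // Mirrors SubstrateToEventualityDb::try_recv_burns (sans the DbTxn).
  fn try_recv(&mut self, acknowledged_block: u64) -> Option<Vec<Burn>> {
    self.queues.get_mut(&acknowledged_block)?.pop_front()
  }
}

fn main() {
  let mut channel = BurnsChannel::default();

  // The Substrate-facing side queues Burns as of the highest acknowledged block (here, block 2).
  let highest_acknowledged = 2;
  channel.send(highest_acknowledged, vec![Burn("burn-a")]);
  channel.send(highest_acknowledged, vec![Burn("burn-b")]);

  // The Eventuality task only drains the channel keyed by its latest handled notable block. If it
  // has only handled notable block 1 so far, nothing is visible to it yet.
  assert!(channel.try_recv(1).is_none());

  // Once block 2 (the block these Burns were ordered against) has been handled, the task drains
  // the entire channel for it, as intake_burns does with its `while let` loop.
  let latest_handled_notable_block = 2;
  let mut intaked = vec![];
  while let Some(burns) = channel.try_recv(latest_handled_notable_block) {
    intaked.extend(burns);
  }
  assert_eq!(intaked.len(), 2);
  println!("intaked {intaked:?} as of block {latest_handled_notable_block}");
}

In the patch itself the send key and the drain key line up because `acknowledge_block` records the highest acknowledged block before `queue_burns` reads it, and the Eventuality task only writes `LatestHandledNotableBlock` for notable (and therefore acknowledged) blocks.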