diff --git a/processor/scanner/src/eventuality.rs b/processor/scanner/src/eventuality.rs index b223fd79..83ab4eba 100644 --- a/processor/scanner/src/eventuality.rs +++ b/processor/scanner/src/eventuality.rs @@ -1,9 +1,11 @@ -use serai_db::{Db, DbTxn}; +use group::GroupEncoding; + +use serai_db::{DbTxn, Db}; use primitives::{OutputType, ReceivedOutput, Block}; // TODO: Localize to EventualityDb? -use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, ContinuallyRan}; +use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, KeyFor, Scheduler, ContinuallyRan}; /* Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we @@ -53,13 +55,14 @@ use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, ContinuallyRan} This forms a backlog only if the latency of scanning, acknowledgement, and intake (including checking Eventualities) exceeds the window duration (the desired property). */ -struct EventualityTask { +struct EventualityTask> { db: D, feed: S, + scheduler: Sch, } #[async_trait::async_trait] -impl ContinuallyRan for EventualityTask { +impl> ContinuallyRan for EventualityTask { async fn run_iteration(&mut self) -> Result { /* The set of Eventualities only increase when a block is acknowledged. Accordingly, we can only @@ -168,6 +171,7 @@ impl ContinuallyRan for EventualityTask { }; // Fetch all non-External outputs + // TODO: Have a scan_for_outputs_ext which sorts for us let mut non_external_outputs = block.scan_for_outputs(key.key); non_external_outputs.retain(|output| output.kind() != OutputType::External); // Drop any outputs less than the dust limit @@ -206,8 +210,24 @@ impl ContinuallyRan for EventualityTask { let outputs_to_return = ScannerDb::::take_queued_returns(&mut txn, b); + let new_eventualities = + self.scheduler.accumulate_outputs_and_return_outputs(&mut txn, outputs, outputs_to_return); + for (key, new_eventualities) in new_eventualities { + let key = { + let mut key_repr = as GroupEncoding>::Repr::default(); + assert_eq!(key.len(), key_repr.as_ref().len()); + key_repr.as_mut().copy_from_slice(&key); + KeyFor::::from_bytes(&key_repr).unwrap() + }; + + let mut eventualities = ScannerDb::::eventualities(&txn, key.key); + for new_eventuality in new_eventualities { + eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality); + } + ScannerDb::::set_eventualities(&mut txn, eventualities); + } + // Update the next to check block - // TODO: Two-stage process ScannerDb::::set_next_to_check_for_eventualities_block(&mut txn, next_to_check); txn.commit(); } diff --git a/processor/scanner/src/index.rs b/processor/scanner/src/index.rs index b5c4fd0f..1d278015 100644 --- a/processor/scanner/src/index.rs +++ b/processor/scanner/src/index.rs @@ -1,4 +1,4 @@ -use serai_db::{Db, DbTxn}; +use serai_db::{DbTxn, Db}; use primitives::BlockHeader; diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index b683a4b7..7919f006 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -1,8 +1,12 @@ use core::{marker::PhantomData, fmt::Debug, time::Duration}; +use std::collections::HashMap; use tokio::sync::mpsc; +use serai_db::DbTxn; + use serai_primitives::{NetworkId, Coin, Amount}; + use primitives::Block; // Logic for deciding where in its lifetime a multisig is. @@ -91,6 +95,21 @@ pub trait ScannerFeed: Send + Sync { type KeyFor = <::Block as Block>::Key; type AddressFor = <::Block as Block>::Address; type OutputFor = <::Block as Block>::Output; +type EventualityFor = <::Block as Block>::Eventuality; + +/// The object responsible for accumulating outputs and planning new transactions. +pub trait Scheduler { + /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for. + /// + /// The `Vec` used as the key in the returned HashMap should be the encoded key these + /// Eventualities are for. + fn accumulate_outputs_and_return_outputs( + &mut self, + txn: &mut impl DbTxn, + outputs: Vec>, + outputs_to_return: Vec>, + ) -> HashMap, Vec>>; +} /// A handle to immediately run an iteration of a task. #[derive(Clone)] @@ -189,8 +208,9 @@ impl Scanner { /// /// This means this block was ordered on Serai in relation to `Burn` events, and all validators /// have achieved synchrony on it. - // TODO: If we're acknowledge block `b`, the Eventuality task was already eligible to check it + // TODO: If we're acknowledging block `b`, the Eventuality task was already eligible to check it // for Eventualities. We need this to block until the Eventuality task has actually checked it. + // TODO: Does the prior TODO hold with how the callback is now handled? pub fn acknowledge_block( &mut self, block_number: u64, @@ -198,13 +218,6 @@ impl Scanner { ) -> Vec> { todo!("TODO") } - - /// Register the Eventualities caused by a block. - // TODO: Replace this with a callback returned by acknowledge_block which panics if it's not - // called yet dropped - pub fn register_eventualities(&mut self, block_number: u64, eventualities: Vec<()>) { - todo!("TODO") - } } /* diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 2c35d0f5..ec87845f 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -1,5 +1,5 @@ use scale::Encode; -use serai_db::{Db, DbTxn}; +use serai_db::{DbTxn, Db}; use serai_primitives::BlockHash; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index 2bfb112f..cd010d7c 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -1,5 +1,5 @@ use scale::Decode; -use serai_db::{Db, DbTxn}; +use serai_db::{DbTxn, Db}; use serai_primitives::MAX_DATA_LEN; use serai_in_instructions_primitives::{