From 8763ef23ed6d42dd99a46d5f452b8748b7cfc68c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 20 Aug 2024 11:57:56 -0400 Subject: [PATCH] Definition and delineation of tasks within the scanner Also defines primitives for the processor. --- .github/workflows/tests.yml | 1 + Cargo.lock | 28 ++++ Cargo.toml | 2 + processor/frost-attempt-manager/Cargo.toml | 3 + processor/primitives/Cargo.toml | 27 +++ processor/primitives/LICENSE | 15 ++ processor/primitives/README.md | 3 + processor/primitives/src/lib.rs | 167 +++++++++++++++++++ processor/scanner/Cargo.toml | 20 ++- processor/scanner/src/db.rs | 162 ++++++++++++++++++ processor/scanner/src/eventuality.rs | 0 processor/scanner/src/index.rs | 72 ++++++++ processor/scanner/src/lib.rs | 183 ++++++++++----------- processor/scanner/src/scan.rs | 73 ++++++++ processor/src/multisigs/mod.rs | 2 + 15 files changed, 653 insertions(+), 105 deletions(-) create mode 100644 processor/primitives/Cargo.toml create mode 100644 processor/primitives/LICENSE create mode 100644 processor/primitives/README.md create mode 100644 processor/primitives/src/lib.rs create mode 100644 processor/scanner/src/db.rs create mode 100644 processor/scanner/src/eventuality.rs create mode 100644 processor/scanner/src/index.rs create mode 100644 processor/scanner/src/scan.rs diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 385d54c4..5032676f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -41,6 +41,7 @@ jobs: -p serai-processor-messages \ -p serai-processor-key-gen \ -p serai-processor-frost-attempt-manager \ + -p serai-processor-primitives \ -p serai-processor-scanner \ -p serai-processor \ -p tendermint-machine \ diff --git a/Cargo.lock b/Cargo.lock index f5e1151d..230ed22f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8647,6 +8647,34 @@ dependencies = [ "serai-validator-sets-primitives", ] +[[package]] +name = "serai-processor-primitives" +version = "0.1.0" +dependencies = [ + "async-trait", + "borsh", + "group", + "parity-scale-codec", + "serai-primitives", +] + +[[package]] +name = "serai-processor-scanner" +version = "0.1.0" +dependencies = [ + "async-trait", + "borsh", + "group", + "hex", + "log", + "parity-scale-codec", + "serai-db", + "serai-processor-messages", + "serai-processor-primitives", + "thiserror", + "tokio", +] + [[package]] name = "serai-processor-tests" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8d6d9416..7ad08a51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,8 @@ members = [ "processor/messages", "processor/key-gen", "processor/frost-attempt-manager", + + "processor/primitives", "processor/scanner", "processor", diff --git a/processor/frost-attempt-manager/Cargo.toml b/processor/frost-attempt-manager/Cargo.toml index 01c1e4c5..a01acf0f 100644 --- a/processor/frost-attempt-manager/Cargo.toml +++ b/processor/frost-attempt-manager/Cargo.toml @@ -13,6 +13,9 @@ rust-version = "1.79" all-features = true rustdoc-args = ["--cfg", "docsrs"] +[package.metadata.cargo-machete] +ignored = ["borsh", "scale"] + [lints] workspace = true diff --git a/processor/primitives/Cargo.toml b/processor/primitives/Cargo.toml new file mode 100644 index 00000000..dd59c0a8 --- /dev/null +++ b/processor/primitives/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "serai-processor-primitives" +version = "0.1.0" +description = "Primitives for the Serai processor" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/processor/primitives" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true + +[dependencies] +async-trait = { version = "0.1", default-features = false } + +group = { version = "0.13", default-features = false } + +serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] } + +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } diff --git a/processor/primitives/LICENSE b/processor/primitives/LICENSE new file mode 100644 index 00000000..41d5a261 --- /dev/null +++ b/processor/primitives/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2022-2024 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/processor/primitives/README.md b/processor/primitives/README.md new file mode 100644 index 00000000..d616993c --- /dev/null +++ b/processor/primitives/README.md @@ -0,0 +1,3 @@ +# Primitives + +Primitive types/traits/structs used by the Processor. diff --git a/processor/primitives/src/lib.rs b/processor/primitives/src/lib.rs new file mode 100644 index 00000000..535dd14f --- /dev/null +++ b/processor/primitives/src/lib.rs @@ -0,0 +1,167 @@ +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![doc = include_str!("../README.md")] +#![deny(missing_docs)] + +use core::fmt::Debug; +use std::io; + +use group::GroupEncoding; + +use serai_primitives::Balance; + +use scale::{Encode, Decode}; +use borsh::{BorshSerialize, BorshDeserialize}; + +/// An ID for an output/transaction/block/etc. +/// +/// IDs don't need to implement `Copy`, enabling `[u8; 33]`, `[u8; 64]` to be used. IDs are still +/// bound to being of a constant-size, where `Default::default()` returns an instance of such size +/// (making `Vec` invalid as an `Id`). +pub trait Id: + Send + + Sync + + Clone + + Default + + PartialEq + + AsRef<[u8]> + + AsMut<[u8]> + + Debug + + Encode + + Decode + + BorshSerialize + + BorshDeserialize +{ +} +impl Id for [u8; N] where [u8; N]: Default {} + +/// The type of the output. +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub enum OutputType { + /// An output received to the address external payments use. + /// + /// This is reported to Substrate in a `Batch`. + External, + + /// A branch output. + /// + /// Given a known output set, and a known series of outbound transactions, we should be able to + /// form a completely deterministic schedule S. The issue is when S has TXs which spend prior TXs + /// in S (which is needed for our logarithmic scheduling). In order to have the descendant TX, + /// say S[1], build off S[0], we need to observe when S[0] is included on-chain. + /// + /// We cannot. + /// + /// Monero (and other privacy coins) do not expose their UTXO graphs. Even if we know how to + /// create S[0], and the actual payment info behind it, we cannot observe it on the blockchain + /// unless we participated in creating it. Locking the entire schedule, when we cannot sign for + /// the entire schedule at once, to a single signing set isn't feasible. + /// + /// While any member of the active signing set can provide data enabling other signers to + /// participate, it's several KB of data which we then have to code communication for. + /// The other option is to simply not observe S[0]. Instead, observe a TX with an identical + /// output to the one in S[0] we intended to use for S[1]. It's either from S[0], or Eve, a + /// malicious actor, has sent us a forged TX which is... equally as usable? So who cares? + /// + /// The only issue is if we have multiple outputs on-chain with identical amounts and purposes. + /// Accordingly, when the scheduler makes a plan for when a specific output is available, it + /// shouldn't set that plan. It should *push* that plan to a queue of plans to perform when + /// instances of that output occur. + Branch, + + /// A change output. + /// + /// This should be added to the available UTXO pool with no further action taken. It does not + /// need to be reported (though we do still need synchrony on the block it's in). There's no + /// explicit expectation for the usage of this output at time of recipience. + Change, + + /// A forwarded output from the prior multisig. + /// + /// This is distinguished for technical reasons around detecting when a multisig should be + /// retired. + Forwarded, +} + +impl OutputType { + fn write(&self, writer: &mut W) -> io::Result<()> { + writer.write_all(&[match self { + OutputType::External => 0, + OutputType::Branch => 1, + OutputType::Change => 2, + OutputType::Forwarded => 3, + }]) + } + + fn read(reader: &mut R) -> io::Result { + let mut byte = [0; 1]; + reader.read_exact(&mut byte)?; + Ok(match byte[0] { + 0 => OutputType::External, + 1 => OutputType::Branch, + 2 => OutputType::Change, + 3 => OutputType::Forwarded, + _ => Err(io::Error::other("invalid OutputType"))?, + }) + } +} + +/// A received output. +pub trait ReceivedOutput: + Send + Sync + Sized + Clone + PartialEq + Eq + Debug +{ + /// The type used to identify this output. + type Id: 'static + Id; + + /// The type of this output. + fn kind(&self) -> OutputType; + + /// The ID of this output. + fn id(&self) -> Self::Id; + /// The key this output was received by. + fn key(&self) -> K; + + /// The presumed origin for this output. + /// + /// This is used as the address to refund coins to if we can't handle the output as desired + /// (unless overridden). + fn presumed_origin(&self) -> Option; + + /// The balance associated with this output. + fn balance(&self) -> Balance; + /// The arbitrary data (presumably an InInstruction) associated with this output. + fn data(&self) -> &[u8]; + + /// Write this output. + fn write(&self, writer: &mut W) -> io::Result<()>; + /// Read an output. + fn read(reader: &mut R) -> io::Result; +} + +/// A block from an external network. +#[async_trait::async_trait] +pub trait Block: Send + Sync + Sized + Clone + Debug { + /// The type used to identify blocks. + type Id: 'static + Id; + /// The ID of this block. + fn id(&self) -> Self::Id; + /// The ID of the parent block. + fn parent(&self) -> Self::Id; +} + +/// A wrapper for a group element which implements the borsh traits. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct BorshG(pub G); +impl BorshSerialize for BorshG { + fn serialize(&self, writer: &mut W) -> borsh::io::Result<()> { + writer.write_all(self.0.to_bytes().as_ref()) + } +} +impl BorshDeserialize for BorshG { + fn deserialize_reader(reader: &mut R) -> borsh::io::Result { + let mut repr = G::Repr::default(); + reader.read_exact(repr.as_mut())?; + Ok(Self( + Option::::from(G::from_bytes(&repr)).ok_or(borsh::io::Error::other("invalid point"))?, + )) + } +} diff --git a/processor/scanner/Cargo.toml b/processor/scanner/Cargo.toml index f3b5ad37..670581d9 100644 --- a/processor/scanner/Cargo.toml +++ b/processor/scanner/Cargo.toml @@ -17,17 +17,23 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -rand_core = { version = "0.6", default-features = false, features = ["std", "getrandom"] } - -frost = { package = "modular-frost", path = "../../crypto/frost", version = "^0.8.1", default-features = false } - -serai-validator-sets-primitives = { path = "../../substrate/validator-sets/primitives", default-features = false, features = ["std"] } +# Macros +async-trait = { version = "0.1", default-features = false } +thiserror = { version = "1", default-features = false } +# Encoders hex = { version = "0.4", default-features = false, features = ["std"] } -log = { version = "0.4", default-features = false, features = ["std"] } - scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +# Cryptography +group = { version = "0.13", default-features = false } + +# Application +log = { version = "0.4", default-features = false, features = ["std"] } +tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } + serai-db = { path = "../../common/db" } messages = { package = "serai-processor-messages", path = "../messages" } +primitives = { package = "serai-processor-primitives", path = "../primitives" } diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs new file mode 100644 index 00000000..8bd7d944 --- /dev/null +++ b/processor/scanner/src/db.rs @@ -0,0 +1,162 @@ +use core::marker::PhantomData; + +use group::GroupEncoding; + +use borsh::{BorshSerialize, BorshDeserialize}; +use serai_db::{Get, DbTxn, create_db}; + +use primitives::{Id, Block, BorshG}; + +use crate::ScannerFeed; + +// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. +trait Borshy: BorshSerialize + BorshDeserialize {} +impl Borshy for T {} + +#[derive(BorshSerialize, BorshDeserialize)] +struct SeraiKey { + activation_block_number: u64, + retirement_block_number: Option, + key: K, +} + +create_db!( + Scanner { + BlockId: (number: u64) -> I, + BlockNumber: (id: I) -> u64, + + ActiveKeys: () -> Vec>, + + // The latest finalized block to appear of a blockchain + LatestFinalizedBlock: () -> u64, + // The latest block which it's safe to scan (dependent on what Serai has acknowledged scanning) + LatestScannableBlock: () -> u64, + // The next block to scan for received outputs + NextToScanForOutputsBlock: () -> u64, + // The next block to check for resolving eventualities + NextToCheckForEventualitiesBlock: () -> u64, + + // If a block was notable + /* + A block is notable if one of three conditions are met: + + 1) We activated a key within this block. + 2) We retired a key within this block. + 3) We received outputs within this block. + + The first two conditions, and the reasoning for them, is extensively documented in + `spec/processor/Multisig Rotation.md`. The third is obvious (as any block we receive outputs + in needs synchrony so that we can spend the received outputs). + + We save if a block is notable here by either the scan for received outputs task or the + check for eventuality completion task. Once a block has been processed by both, the reporting + task will report any notable blocks. Finally, the task which sets the block safe to scan to + makes its decision based on the notable blocks and the acknowledged blocks. + */ + // This collapses from `bool` to `()`, using if the value was set for true and false otherwise + NotableBlock: (number: u64) -> (), + } +); + +pub(crate) struct ScannerDb(PhantomData); +impl ScannerDb { + pub(crate) fn set_block(txn: &mut impl DbTxn, number: u64, id: ::Id) { + BlockId::set(txn, number, &id); + BlockNumber::set(txn, id, &number); + } + pub(crate) fn block_id(getter: &impl Get, number: u64) -> Option<::Id> { + BlockId::get(getter, number) + } + pub(crate) fn block_number(getter: &impl Get, id: ::Id) -> Option { + BlockNumber::get(getter, id) + } + + // activation_block_number is inclusive, so the key will be scanned for starting at the specified + // block + pub(crate) fn queue_key(txn: &mut impl DbTxn, activation_block_number: u64, key: S::Key) { + let mut keys: Vec>> = ActiveKeys::get(txn).unwrap_or(vec![]); + for key_i in &keys { + if key == key_i.key.0 { + panic!("queueing a key prior queued"); + } + } + keys.push(SeraiKey { + activation_block_number, + retirement_block_number: None, + key: BorshG(key), + }); + ActiveKeys::set(txn, &keys); + } + // retirement_block_number is inclusive, so the key will no longer be scanned for as of the + // specified block + pub(crate) fn retire_key(txn: &mut impl DbTxn, retirement_block_number: u64, key: S::Key) { + let mut keys: Vec>> = + ActiveKeys::get(txn).expect("retiring key yet no active keys"); + + assert!(keys.len() > 1, "retiring our only key"); + for i in 0 .. keys.len() { + if key == keys[i].key.0 { + keys[i].retirement_block_number = Some(retirement_block_number); + ActiveKeys::set(txn, &keys); + return; + } + + // This is not the key in question, but since it's older, it already should've been queued + // for retirement + assert!( + keys[i].retirement_block_number.is_some(), + "older key wasn't retired before newer key" + ); + } + panic!("retiring key yet not present in keys") + } + pub(crate) fn keys(getter: &impl Get) -> Option>>> { + ActiveKeys::get(getter) + } + + pub(crate) fn set_start_block( + txn: &mut impl DbTxn, + start_block: u64, + id: ::Id, + ) { + Self::set_block(txn, start_block, id); + LatestFinalizedBlock::set(txn, &start_block); + LatestScannableBlock::set(txn, &start_block); + NextToScanForOutputsBlock::set(txn, &start_block); + NextToCheckForEventualitiesBlock::set(txn, &start_block); + } + + pub(crate) fn set_latest_finalized_block(txn: &mut impl DbTxn, latest_finalized_block: u64) { + LatestFinalizedBlock::set(txn, &latest_finalized_block); + } + pub(crate) fn latest_finalized_block(getter: &impl Get) -> Option { + LatestFinalizedBlock::get(getter) + } + + pub(crate) fn set_latest_scannable_block(txn: &mut impl DbTxn, latest_scannable_block: u64) { + LatestScannableBlock::set(txn, &latest_scannable_block); + } + pub(crate) fn latest_scannable_block(getter: &impl Get) -> Option { + LatestScannableBlock::get(getter) + } + + pub(crate) fn set_next_to_scan_for_outputs_block( + txn: &mut impl DbTxn, + next_to_scan_for_outputs_block: u64, + ) { + NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block); + } + pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option { + NextToScanForOutputsBlock::get(getter) + } + + pub(crate) fn set_next_to_check_for_eventualities_block( + txn: &mut impl DbTxn, + next_to_check_for_eventualities_block: u64, + ) { + NextToCheckForEventualitiesBlock::set(txn, &next_to_check_for_eventualities_block); + } + pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option { + NextToCheckForEventualitiesBlock::get(getter) + } +} diff --git a/processor/scanner/src/eventuality.rs b/processor/scanner/src/eventuality.rs new file mode 100644 index 00000000..e69de29b diff --git a/processor/scanner/src/index.rs b/processor/scanner/src/index.rs new file mode 100644 index 00000000..66477cdb --- /dev/null +++ b/processor/scanner/src/index.rs @@ -0,0 +1,72 @@ +use serai_db::{Db, DbTxn}; + +use primitives::{Id, Block}; + +// TODO: Localize to IndexDb? +use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; + +/* + This processor should build its own index of the blockchain, yet only for finalized blocks which + are safe to process. For Proof of Work blockchains, which only have probabilistic finality, these + are the set of sufficiently confirmed blocks. For blockchains with finality, these are the + finalized blocks. + + This task finds the finalized blocks, verifies they're continguous, and saves their IDs. +*/ +struct IndexFinalizedTask { + db: D, + feed: S, +} + +#[async_trait::async_trait] +impl ContinuallyRan for IndexFinalizedTask { + async fn run_instance(&mut self) -> Result<(), String> { + // Fetch the latest finalized block + let our_latest_finalized = ScannerDb::::latest_finalized_block(&self.db) + .expect("IndexTask run before writing the start block"); + let latest_finalized = match self.feed.latest_finalized_block_number().await { + Ok(latest_finalized) => latest_finalized, + Err(e) => Err(format!("couldn't fetch the latest finalized block number: {e:?}"))?, + }; + + // Index the hashes of all blocks until the latest finalized block + for b in (our_latest_finalized + 1) ..= latest_finalized { + let block = match self.feed.block_by_number(b).await { + Ok(block) => block, + Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, + }; + + // Check this descends from our indexed chain + { + let expected_parent = + ScannerDb::::block_id(&self.db, b - 1).expect("didn't have the ID of the prior block"); + if block.parent() != expected_parent { + panic!( + "current finalized block (#{b}, {}) doesn't build off finalized block (#{}, {})", + hex::encode(block.parent()), + b - 1, + hex::encode(expected_parent) + ); + } + } + + // Update the latest finalized block + let mut txn = self.db.txn(); + ScannerDb::::set_block(&mut txn, b, block.id()); + ScannerDb::::set_latest_finalized_block(&mut txn, b); + txn.commit(); + } + + Ok(()) + } +} + +/* + The processor can't index the blockchain unilaterally. It needs to develop a totally ordered view + of the blockchain. That requires consensus with other validators on when certain keys are set to + activate (and retire). We solve this by only scanning `n` blocks ahead of the last agreed upon + block, then waiting for Serai to acknowledge the block. This lets us safely schedule events after + this `n` block window (as demonstrated/proven with `mini`). + + TODO +*/ diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 1b25e108..736a62b9 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -1,25 +1,91 @@ -use core::marker::PhantomData; -use std::{ - sync::Arc, - io::Read, - time::Duration, - collections::{VecDeque, HashSet, HashMap}, -}; +use core::fmt::Debug; -use ciphersuite::group::GroupEncoding; -use frost::curve::Ciphersuite; +use primitives::{ReceivedOutput, Block}; -use log::{info, debug, warn}; -use tokio::{ - sync::{RwLockReadGuard, RwLockWriteGuard, RwLock, mpsc}, - time::sleep, -}; +mod db; +mod index; -use crate::{ - Get, DbTxn, Db, - networks::{Output, Transaction, Eventuality, EventualitiesTracker, Block, Network}, -}; +/// A feed usable to scan a blockchain. +/// +/// This defines the primitive types used, along with various getters necessary for indexing. +#[async_trait::async_trait] +pub trait ScannerFeed: Send + Sync { + /// The type of the key used to receive coins on this blockchain. + type Key: group::Group + group::GroupEncoding; + /// The type of the address used to specify who to send coins to on this blockchain. + type Address; + + /// The type representing a received (and spendable) output. + type Output: ReceivedOutput; + + /// The representation of a block for this blockchain. + /// + /// A block is defined as a consensus event associated with a set of transactions. It is not + /// necessary to literally define it as whatever the external network defines as a block. For + /// external networks which finalize block(s), this block type should be a representation of all + /// transactions within a finalization event. + type Block: Block; + + /// An error encountered when fetching data from the blockchain. + /// + /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually + /// resolve without manual intervention. + type EphemeralError: Debug; + + /// Fetch the number of the latest finalized block. + /// + /// The block number is its zero-indexed position within a linear view of the external network's + /// consensus. The genesis block accordingly has block number 0. + async fn latest_finalized_block_number(&self) -> Result; + + /// Fetch a block by its number. + async fn block_by_number(&self, number: u64) -> Result; + + /// Scan a block for its outputs. + async fn scan_for_outputs( + &self, + block: &Self::Block, + key: Self::Key, + ) -> Result; +} + +#[async_trait::async_trait] +pub(crate) trait ContinuallyRan: Sized { + async fn run_instance(&mut self) -> Result<(), String>; + + async fn continually_run(mut self) { + // The default number of seconds to sleep before running the task again + let default_sleep_before_next_task = 5; + // The current number of seconds to sleep before running the task again + // We increment this upon errors in order to not flood the logs with errors + let mut current_sleep_before_next_task = default_sleep_before_next_task; + let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| { + let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task; + // Set a limit of sleeping for two minutes + *current_sleep_before_next_task = new_sleep.max(120); + }; + + loop { + match self.run_instance().await { + Ok(()) => { + // Upon a successful (error-free) loop iteration, reset the amount of time we sleep + current_sleep_before_next_task = default_sleep_before_next_task; + } + Err(e) => { + log::debug!("{}", e); + increase_sleep_before_next_task(&mut current_sleep_before_next_task); + } + } + + // Don't run the task again for another few seconds + // This is at the start of the loop so we can continue without skipping this delay + tokio::time::sleep(core::time::Duration::from_secs(current_sleep_before_next_task)).await; + } + } +} + +/* #[derive(Clone, Debug)] pub enum ScannerEvent { // Block scanned @@ -44,86 +110,6 @@ pub type ScannerEventChannel = mpsc::UnboundedReceiver>; #[derive(Clone, Debug)] struct ScannerDb(PhantomData, PhantomData); impl ScannerDb { - fn scanner_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"SCANNER", dst, key) - } - - fn block_key(number: usize) -> Vec { - Self::scanner_key(b"block_id", u64::try_from(number).unwrap().to_le_bytes()) - } - fn block_number_key(id: &>::Id) -> Vec { - Self::scanner_key(b"block_number", id) - } - fn save_block(txn: &mut D::Transaction<'_>, number: usize, id: &>::Id) { - txn.put(Self::block_number_key(id), u64::try_from(number).unwrap().to_le_bytes()); - txn.put(Self::block_key(number), id); - } - fn block(getter: &G, number: usize) -> Option<>::Id> { - getter.get(Self::block_key(number)).map(|id| { - let mut res = >::Id::default(); - res.as_mut().copy_from_slice(&id); - res - }) - } - fn block_number(getter: &G, id: &>::Id) -> Option { - getter - .get(Self::block_number_key(id)) - .map(|number| u64::from_le_bytes(number.try_into().unwrap()).try_into().unwrap()) - } - - fn keys_key() -> Vec { - Self::scanner_key(b"keys", b"") - } - fn register_key( - txn: &mut D::Transaction<'_>, - activation_number: usize, - key: ::G, - ) { - let mut keys = txn.get(Self::keys_key()).unwrap_or(vec![]); - - let key_bytes = key.to_bytes(); - - let key_len = key_bytes.as_ref().len(); - assert_eq!(keys.len() % (8 + key_len), 0); - - // Sanity check this key isn't already present - let mut i = 0; - while i < keys.len() { - if &keys[(i + 8) .. ((i + 8) + key_len)] == key_bytes.as_ref() { - panic!("adding {} as a key yet it was already present", hex::encode(key_bytes)); - } - i += 8 + key_len; - } - - keys.extend(u64::try_from(activation_number).unwrap().to_le_bytes()); - keys.extend(key_bytes.as_ref()); - txn.put(Self::keys_key(), keys); - } - fn keys(getter: &G) -> Vec<(usize, ::G)> { - let bytes_vec = getter.get(Self::keys_key()).unwrap_or(vec![]); - let mut bytes: &[u8] = bytes_vec.as_ref(); - - // Assumes keys will be 32 bytes when calculating the capacity - // If keys are larger, this may allocate more memory than needed - // If keys are smaller, this may require additional allocations - // Either are fine - let mut res = Vec::with_capacity(bytes.len() / (8 + 32)); - while !bytes.is_empty() { - let mut activation_number = [0; 8]; - bytes.read_exact(&mut activation_number).unwrap(); - let activation_number = u64::from_le_bytes(activation_number).try_into().unwrap(); - - res.push((activation_number, N::Curve::read_G(&mut bytes).unwrap())); - } - res - } - fn retire_key(txn: &mut D::Transaction<'_>) { - let keys = Self::keys(txn); - assert_eq!(keys.len(), 2); - txn.del(Self::keys_key()); - Self::register_key(txn, keys[1].0, keys[1].1); - } - fn seen_key(id: &>::Id) -> Vec { Self::scanner_key(b"seen", id) } @@ -737,3 +723,4 @@ impl Scanner { } } } +*/ diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs new file mode 100644 index 00000000..6f784a7e --- /dev/null +++ b/processor/scanner/src/scan.rs @@ -0,0 +1,73 @@ +use serai_db::{Db, DbTxn}; + +use primitives::{Id, Block}; + +// TODO: Localize to ScanDb? +use crate::{db::ScannerDb, ScannerFeed}; + +struct ScanForOutputsTask { + db: D, + feed: S, +} + +#[async_trait::async_trait] +impl ContinuallyRan for ScanForOutputsTask { + async fn run_instance(&mut self) -> Result<(), String> { + // Fetch the safe to scan block + let latest_scannable = ScannerDb::::latest_scannable_block(&self.db).expect("ScanForOutputsTask run before writing the start block"); + // Fetch the next block to scan + let next_to_scan = ScannerDb::::next_to_scan_for_outputs_block(&self.db).expect("ScanForOutputsTask run before writing the start block"); + + for b in next_to_scan ..= latest_scannable { + let block = match self.feed.block_by_number(b).await { + Ok(block) => block, + Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, + }; + + // Check the ID of this block is the expected ID + { + let expected = ScannerDb::::block_id(b).expect("scannable block didn't have its ID saved"); + if block.id() != expected { + panic!("finalized chain reorganized from {} to {} at {}", hex::encode(expected), hex::encode(block.id()), b); + } + } + + log::info!("scanning block: {} ({b})", hex::encode(block.id())); + + let keys = ScannerDb::::keys(&self.db).expect("scanning for a blockchain without any keys set"); + // Remove all the retired keys + while let Some(retire_at) = keys[0].retirement_block_number { + if retire_at <= b { + keys.remove(0); + } + } + assert!(keys.len() <= 2); + + // Scan for each key + for key in keys { + // If this key has yet to active, skip it + if key.activation_block_number > b { + continue; + } + + let mut outputs = vec![]; + for output in network.scan_for_outputs(&block, key).awaits { + assert_eq!(output.key(), key); + // TODO: Check for dust + outputs.push(output); + } + } + + let mut txn = self.db.txn(); + // Update the latest scanned block + ScannerDb::::set_next_to_scan_for_outputs_block(&mut txn, b + 1); + // TODO: If this had outputs, yield them and mark this block notable + /* + A block is notable if it's an activation, had outputs, or a retirement block. + */ + txn.commit(); + } + + Ok(()) + } +} diff --git a/processor/src/multisigs/mod.rs b/processor/src/multisigs/mod.rs index 12f01715..92ea0271 100644 --- a/processor/src/multisigs/mod.rs +++ b/processor/src/multisigs/mod.rs @@ -18,10 +18,12 @@ use log::{info, error}; use tokio::time::sleep; +/* TODO #[cfg(not(test))] mod scanner; #[cfg(test)] pub mod scanner; +*/ use scanner::{ScannerEvent, ScannerHandle, Scanner};