diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index 77e7e816..5ca2acec 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -1,8 +1,9 @@ use core::fmt::Debug; +use std::collections::HashMap; use group::{Group, GroupEncoding}; -use crate::{Id, Address, ReceivedOutput}; +use crate::{Id, Address, ReceivedOutput, Eventuality, EventualityTracker}; /// A block header from an external network. pub trait BlockHeader: Send + Sync + Sized + Clone + Debug { @@ -15,6 +16,12 @@ pub trait BlockHeader: Send + Sync + Sized + Clone + Debug { fn parent(&self) -> [u8; 32]; } +/// A transaction from an external network. +pub trait Transaction: Send + Sync + Sized { + /// The type used to identify transactions on this external network. + type Id: Id; +} + /// A block from an external network. /// /// A block is defined as a consensus event associated with a set of transactions. It is not @@ -30,12 +37,31 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { type Key: Group + GroupEncoding; /// The type used to represent addresses on this external network. type Address: Address; + /// The type used to represent transactions on this external network. + type Transaction: Transaction; /// The type used to represent received outputs on this external network. - type Output: ReceivedOutput; + type Output: ReceivedOutput< + Self::Key, + Self::Address, + TransactionId = ::Id, + >; + /// The type used to represent an Eventuality for a transaction on this external network. + type Eventuality: Eventuality< + OutputId = >::Id, + >; /// The ID of this block. fn id(&self) -> [u8; 32]; /// Scan all outputs within this block to find the outputs spendable by this key. fn scan_for_outputs(&self, key: Self::Key) -> Vec; + + /// Check if this block resolved any Eventualities. + /// + /// Returns tbe resolved Eventualities, indexed by the ID of the transactions which resolved + /// them. + fn check_for_eventuality_resolutions( + &self, + eventualities: &mut EventualityTracker, + ) -> HashMap<::Id, Self::Eventuality>; } diff --git a/processor/primitives/src/eventuality.rs b/processor/primitives/src/eventuality.rs index 6e16637d..7203031b 100644 --- a/processor/primitives/src/eventuality.rs +++ b/processor/primitives/src/eventuality.rs @@ -1,8 +1,12 @@ -use std::collections::HashMap; -use std::io; +use std::{io, collections::HashMap}; + +use crate::Id; /// A description of a transaction which will eventually happen. pub trait Eventuality: Sized + Send + Sync { + /// The type used to identify a received output. + type OutputId: Id; + /// A unique byte sequence which can be used to identify potentially resolving transactions. /// /// Both a transaction and an Eventuality are expected to be able to yield lookup sequences. @@ -15,6 +19,9 @@ pub trait Eventuality: Sized + Send + Sync { /// identified, the full check is performed. fn lookup(&self) -> Vec; + /// The output this plan forwarded. + fn forwarded_output(&self) -> Option; + /// Read an Eventuality. fn read(reader: &mut R) -> io::Result; /// Serialize an Eventuality to a `Vec`. diff --git a/processor/primitives/src/lib.rs b/processor/primitives/src/lib.rs index dc64facf..f796a13a 100644 --- a/processor/primitives/src/lib.rs +++ b/processor/primitives/src/lib.rs @@ -2,7 +2,7 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::fmt::Debug; +use core::{hash::Hash, fmt::Debug}; use group::GroupEncoding; @@ -29,6 +29,8 @@ pub trait Id: + Clone + Default + PartialEq + + Eq + + Hash + AsRef<[u8]> + AsMut<[u8]> + Debug diff --git a/processor/primitives/src/output.rs b/processor/primitives/src/output.rs index 2b96d229..152a59e0 100644 --- a/processor/primitives/src/output.rs +++ b/processor/primitives/src/output.rs @@ -89,12 +89,16 @@ pub trait ReceivedOutput: { /// The type used to identify this output. type Id: 'static + Id; + /// The type used to identify the transaction which created this output. + type TransactionId: 'static + Id; /// The type of this output. fn kind(&self) -> OutputType; /// The ID of this output. fn id(&self) -> Self::Id; + /// The ID of the transaction which created this output. + fn transaction_id(&self) -> Self::TransactionId; /// The key this output was received by. fn key(&self) -> K; diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 0710ae30..09807a09 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -1,15 +1,13 @@ use core::marker::PhantomData; use std::io; -use group::GroupEncoding; - -use scale::{Encode, Decode}; +use scale::Encode; use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db}; use serai_in_instructions_primitives::InInstructionWithBalance; -use primitives::{Id, ReceivedOutput, Block, BorshG}; +use primitives::{ReceivedOutput, BorshG}; use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor}; diff --git a/processor/scanner/src/eventuality.rs b/processor/scanner/src/eventuality.rs index cb91ca42..b223fd79 100644 --- a/processor/scanner/src/eventuality.rs +++ b/processor/scanner/src/eventuality.rs @@ -1,9 +1,9 @@ use serai_db::{Db, DbTxn}; -use primitives::{Id, ReceivedOutput, Block}; +use primitives::{OutputType, ReceivedOutput, Block}; // TODO: Localize to EventualityDb? -use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; +use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, ContinuallyRan}; /* Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we @@ -109,12 +109,105 @@ impl ContinuallyRan for EventualityTask { iterated = true; - // TODO: Not only check/clear eventualities, if this eventuality forwarded an output, queue - // it to be reported in however many blocks - todo!("TODO"); + // TODO: Add a helper to fetch an indexed block, de-duplicate with scan + let block = match self.feed.block_by_number(b).await { + Ok(block) => block, + Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, + }; + + // Check the ID of this block is the expected ID + { + let expected = + ScannerDb::::block_id(&self.db, b).expect("scannable block didn't have its ID saved"); + if block.id() != expected { + panic!( + "finalized chain reorganized from {} to {} at {}", + hex::encode(expected), + hex::encode(block.id()), + b + ); + } + } + + log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); + + /* + This is proper as the keys for the next to scan block (at most `WINDOW_LENGTH` ahead, + which is `<= CONFIRMATIONS`) will be the keys to use here. + + If we had added a new key (which hasn't actually actived by the block we're currently + working on), it won't have any Eventualities for at least `CONFIRMATIONS` blocks (so it'd + have no impact here). + + As for retiring a key, that's done on this task's timeline. We ensure we don't bork the + scanner by officially retiring the key `WINDOW_LENGTH` blocks in the future (ensuring the + scanner never has a malleable view of the keys). + */ + // TODO: Ensure the add key/remove key DB fns are called by the same task to prevent issues + // there + // TODO: On register eventuality, assert the above timeline assumptions + let mut keys = ScannerDb::::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) + .expect("scanning for a blockchain without any keys set"); let mut txn = self.db.txn(); + + // Fetch the External outputs we reported, and therefore should yield after handling this + // block + let mut outputs = ScannerDb::::in_instructions(&txn, b) + .expect("handling eventualities/outputs for block which didn't set its InInstructions") + .into_iter() + .map(|output| output.output) + .collect::>(); + + for key in keys { + let completed_eventualities = { + let mut eventualities = ScannerDb::::eventualities(&txn, key.key); + let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities); + ScannerDb::::set_eventualities(&mut txn, eventualities); + completed_eventualities + }; + + // Fetch all non-External outputs + let mut non_external_outputs = block.scan_for_outputs(key.key); + non_external_outputs.retain(|output| output.kind() != OutputType::External); + // Drop any outputs less than the dust limit + non_external_outputs.retain(|output| { + let balance = output.balance(); + balance.amount.0 >= self.feed.dust(balance.coin).0 + }); + + /* + Now that we have all non-External outputs, we filter them to be only the outputs which + are from transactions which resolve our own Eventualities *if* the multisig is retiring. + This implements step 6 of `spec/processor/Multisig Rotation.md`. + + We may receive a Change output. The only issue with accumulating this would be if it + extends the multisig's lifetime (by increasing the amount of outputs yet to be + forwarded). By checking it's one we made, either: + 1) It's a legitimate Change output to be forwarded + 2) It's a Change output created by a user burning coins (specifying the Change address), + which can only be created while the multisig is actively handling `Burn`s (therefore + ensuring this multisig cannot be kept alive ad-infinitum) + + The commentary on Change outputs also applies to Branch/Forwarded. They'll presumably get + ignored if not usable however. + */ + if key.stage == LifetimeStage::Finishing { + non_external_outputs + .retain(|output| completed_eventualities.contains_key(&output.transaction_id())); + } + + // Now, we iterate over all Forwarded outputs and queue their InInstructions + todo!("TODO"); + + // Accumulate all of these outputs + outputs.extend(non_external_outputs); + } + + let outputs_to_return = ScannerDb::::take_queued_returns(&mut txn, b); + // Update the next to check block + // TODO: Two-stage process ScannerDb::::set_next_to_check_for_eventualities_block(&mut txn, next_to_check); txn.commit(); } diff --git a/processor/scanner/src/index.rs b/processor/scanner/src/index.rs index de68522e..b5c4fd0f 100644 --- a/processor/scanner/src/index.rs +++ b/processor/scanner/src/index.rs @@ -1,6 +1,6 @@ use serai_db::{Db, DbTxn}; -use primitives::{Id, BlockHeader}; +use primitives::BlockHeader; // TODO: Localize to IndexDb? use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 5b5f6fe2..b683a4b7 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -3,7 +3,7 @@ use core::{marker::PhantomData, fmt::Debug, time::Duration}; use tokio::sync::mpsc; use serai_primitives::{NetworkId, Coin, Amount}; -use primitives::{ReceivedOutput, BlockHeader, Block}; +use primitives::Block; // Logic for deciding where in its lifetime a multisig is. mod lifetime; @@ -34,8 +34,8 @@ pub trait ScannerFeed: Send + Sync { /// The amount of blocks to process in parallel. /// - /// This value must be at least `1`. This value should be the worst-case latency to handle a - /// block divided by the expected block time. + /// This must be at least `1`. This must be less than or equal to `CONFIRMATIONS`. This value + /// should be the worst-case latency to handle a block divided by the expected block time. const WINDOW_LENGTH: u64; /// The amount of blocks which will occur in 10 minutes (approximate). @@ -83,7 +83,8 @@ pub trait ScannerFeed: Send + Sync { /// The dust threshold for the specified coin. /// - /// This should be a value worth handling at a human level. + /// This MUST be constant. Serai MJUST NOT create internal outputs worth less than this. This + /// SHOULD be a value worth handling at a human level. fn dust(&self, coin: Coin) -> Amount; } @@ -188,6 +189,8 @@ impl Scanner { /// /// This means this block was ordered on Serai in relation to `Burn` events, and all validators /// have achieved synchrony on it. + // TODO: If we're acknowledge block `b`, the Eventuality task was already eligible to check it + // for Eventualities. We need this to block until the Eventuality task has actually checked it. pub fn acknowledge_block( &mut self, block_number: u64, diff --git a/processor/scanner/src/lifetime.rs b/processor/scanner/src/lifetime.rs index 6d189bca..09df7a37 100644 --- a/processor/scanner/src/lifetime.rs +++ b/processor/scanner/src/lifetime.rs @@ -6,6 +6,7 @@ use crate::ScannerFeed; /// rotation process. Steps 7-8 regard a multisig which isn't retiring yet retired, and /// accordingly, no longer exists, so they are not modelled here (as this only models active /// multisigs. Inactive multisigs aren't represented in the first place). +#[derive(PartialEq)] pub(crate) enum LifetimeStage { /// A new multisig, once active, shouldn't actually start receiving coins until several blocks /// later. If any UI is premature in sending to this multisig, we delay to report the outputs to diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 37ef8874..2c35d0f5 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -3,7 +3,8 @@ use serai_db::{Db, DbTxn}; use serai_primitives::BlockHash; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; -use primitives::{Id, OutputType, Block}; + +use primitives::ReceivedOutput; // TODO: Localize to ReportDb? use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; @@ -48,8 +49,20 @@ impl ContinuallyRan for ReportTask { // If this block is notable, create the Batch(s) for it if ScannerDb::::is_block_notable(&txn, b) { - let in_instructions = ScannerDb::::in_instructions(&txn, b) - .expect("reporting block which didn't set its InInstructions"); + let in_instructions = { + let mut in_instructions = ScannerDb::::in_instructions(&txn, b) + .expect("reporting block which didn't set its InInstructions"); + // Sort these before reporting them in case anything we did is non-deterministic/to have + // a well-defined order (not implicit to however we got this result, enabling different + // methods to be used in the future) + in_instructions.sort_by(|a, b| { + use core::cmp::{Ordering, Ord}; + let res = a.output.id().as_ref().cmp(&b.output.id().as_ref()); + assert!(res != Ordering::Equal); + res + }); + in_instructions + }; let network = S::NETWORK; let block_hash = diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index 8c8e07b3..2bfb112f 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -1,14 +1,12 @@ -use group::GroupEncoding; - -use scale::{Encode, Decode}; +use scale::Decode; use serai_db::{Db, DbTxn}; -use serai_primitives::{MAX_DATA_LEN, ExternalAddress}; +use serai_primitives::MAX_DATA_LEN; use serai_in_instructions_primitives::{ Shorthand, RefundableInInstruction, InInstruction, InInstructionWithBalance, }; -use primitives::{Id, OutputType, ReceivedOutput, Block}; +use primitives::{OutputType, ReceivedOutput, Block}; // TODO: Localize to ScanDb? use crate::{ @@ -100,7 +98,7 @@ impl ContinuallyRan for ScanForOutputsTask { log::info!("scanning block: {} ({b})", hex::encode(block.id())); assert_eq!(ScannerDb::::next_to_scan_for_outputs_block(&self.db).unwrap(), b); - let mut keys = ScannerDb::::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) + let keys = ScannerDb::::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) .expect("scanning for a blockchain without any keys set"); let mut txn = self.db.txn(); @@ -154,7 +152,7 @@ impl ContinuallyRan for ScanForOutputsTask { if output.kind() != OutputType::External { // While we don't report these outputs, we still need consensus on this block and // accordingly still need to set it as notable - let balance = outputs.balance(); + let balance = output.balance(); // We ensure it's over the dust limit to prevent people sending 1 satoshi from causing // an invocation of a consensus/signing protocol if balance.amount.0 >= self.feed.dust(balance.coin).0 {