From d5d1fc3eea493c11feae279025de3d60b0db6ad1 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 23 Aug 2024 22:29:15 -0400 Subject: [PATCH] Flesh out report task --- processor/primitives/src/block.rs | 11 +++--- processor/scanner/src/db.rs | 31 +++++++++------- processor/scanner/src/lib.rs | 6 ++-- processor/scanner/src/report.rs | 59 +++++++++++++++++++++++++------ processor/scanner/src/scan.rs | 2 +- 5 files changed, 79 insertions(+), 30 deletions(-) diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index 1fc92c3a..77e7e816 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -6,12 +6,13 @@ use crate::{Id, Address, ReceivedOutput}; /// A block header from an external network. pub trait BlockHeader: Send + Sync + Sized + Clone + Debug { - /// The type used to identify blocks. - type Id: 'static + Id; /// The ID of this block. - fn id(&self) -> Self::Id; + /// + /// This is fixed to 32-bytes and is expected to be cryptographically binding with 128-bit + /// security. This is not required to be the ID used natively by the external network. + fn id(&self) -> [u8; 32]; /// The ID of the parent block. - fn parent(&self) -> Self::Id; + fn parent(&self) -> [u8; 32]; } /// A block from an external network. @@ -33,7 +34,7 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { type Output: ReceivedOutput; /// The ID of this block. - fn id(&self) -> ::Id; + fn id(&self) -> [u8; 32]; /// Scan all outputs within this block to find the outputs spendable by this key. fn scan_for_outputs(&self, key: Self::Key) -> Vec; diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index fa2db781..cccbe5f6 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -11,7 +11,7 @@ use serai_in_instructions_primitives::InInstructionWithBalance; use primitives::{Id, ReceivedOutput, Block, BorshG}; -use crate::{lifetime::LifetimeStage, ScannerFeed, BlockIdFor, KeyFor, AddressFor, OutputFor}; +use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor}; // The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. trait Borshy: BorshSerialize + BorshDeserialize {} @@ -46,8 +46,8 @@ impl OutputWithInInstruction { create_db!( Scanner { - BlockId: (number: u64) -> I, - BlockNumber: (id: I) -> u64, + BlockId: (number: u64) -> [u8; 32], + BlockNumber: (id: [u8; 32]) -> u64, ActiveKeys: () -> Vec>, @@ -91,14 +91,14 @@ create_db!( pub(crate) struct ScannerDb(PhantomData); impl ScannerDb { - pub(crate) fn set_block(txn: &mut impl DbTxn, number: u64, id: BlockIdFor) { + pub(crate) fn set_block(txn: &mut impl DbTxn, number: u64, id: [u8; 32]) { BlockId::set(txn, number, &id); BlockNumber::set(txn, id, &number); } - pub(crate) fn block_id(getter: &impl Get, number: u64) -> Option> { + pub(crate) fn block_id(getter: &impl Get, number: u64) -> Option<[u8; 32]> { BlockId::get(getter, number) } - pub(crate) fn block_number(getter: &impl Get, id: BlockIdFor) -> Option { + pub(crate) fn block_number(getter: &impl Get, id: [u8; 32]) -> Option { BlockNumber::get(getter, id) } @@ -154,7 +154,7 @@ impl ScannerDb { Some(keys) } - pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: BlockIdFor) { + pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) { assert!( LatestFinalizedBlock::get(txn).is_none(), "setting start block but prior set start block" @@ -276,18 +276,18 @@ impl ScannerDb { SerializedForwardedOutput::set(txn, id.as_ref(), &buf); } + // TODO: Use a DbChannel here, and send the instructions to the report task and the outputs to + // the eventuality task? That way this cleans up after itself pub(crate) fn set_in_instructions( txn: &mut impl DbTxn, block_number: u64, outputs: Vec>, ) { - if outputs.is_empty() { - return; + if !outputs.is_empty() { + // Set this block as notable + NotableBlock::set(txn, block_number, &()); } - // Set this block as notable - NotableBlock::set(txn, block_number, &()); - let mut buf = Vec::with_capacity(outputs.len() * 128); for output in outputs { output.write(&mut buf).unwrap(); @@ -295,6 +295,13 @@ impl ScannerDb { SerializedOutputs::set(txn, block_number, &buf); } + pub(crate) fn in_instructions( + getter: &impl Get, + block_number: u64, + ) -> Option>> { + todo!("TODO") + } + pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool { NotableBlock::get(getter, number).is_some() } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 0a26f177..5b5f6fe2 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -2,7 +2,7 @@ use core::{marker::PhantomData, fmt::Debug, time::Duration}; use tokio::sync::mpsc; -use serai_primitives::{Coin, Amount}; +use serai_primitives::{NetworkId, Coin, Amount}; use primitives::{ReceivedOutput, BlockHeader, Block}; // Logic for deciding where in its lifetime a multisig is. @@ -24,6 +24,9 @@ mod report; /// This defines the primitive types used, along with various getters necessary for indexing. #[async_trait::async_trait] pub trait ScannerFeed: Send + Sync { + /// The ID of the network being scanned for. + const NETWORK: NetworkId; + /// The amount of confirmations a block must have to be considered finalized. /// /// This value must be at least `1`. @@ -84,7 +87,6 @@ pub trait ScannerFeed: Send + Sync { fn dust(&self, coin: Coin) -> Amount; } -type BlockIdFor = <<::Block as Block>::Header as BlockHeader>::Id; type KeyFor = <::Block as Block>::Key; type AddressFor = <::Block as Block>::Address; type OutputFor = <::Block as Block>::Output; diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 3c22556c..17cdca35 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -1,15 +1,20 @@ -/* - We only report blocks once both tasks, scanning for received ouputs and eventualities, have - processed the block. This ensures we've performed all ncessary options. -*/ - +use scale::Encode; use serai_db::{Db, DbTxn}; +use serai_primitives::BlockHash; +use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use primitives::{Id, OutputType, Block}; // TODO: Localize to ReportDb? use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; +/* + This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. + + We only report blocks once both tasks, scanning for received outputs and checking for resolved + Eventualities, have processed the block. This ensures we know if this block is notable, and have + the InInstructions for it. +*/ struct ReportTask { db: D, feed: S, @@ -39,15 +44,49 @@ impl ContinuallyRan for ReportTask { .expect("ReportTask run before writing the start block"); for b in next_to_potentially_report ..= highest_reportable { - if ScannerDb::::is_block_notable(&self.db, b) { - let in_instructions = todo!("TODO"); - // TODO: Also pull the InInstructions from forwarding - todo!("TODO: Make Batches, which requires handling Forwarded within this crate"); + let mut txn = self.db.txn(); + + if ScannerDb::::is_block_notable(&txn, b) { + let in_instructions = ScannerDb::::in_instructions(&txn, b) + .expect("reporting block which didn't set its InInstructions"); + + let network = S::NETWORK; + let block_hash = + ScannerDb::::block_id(&txn, b).expect("reporting block we didn't save the ID for"); + let mut batch_id = ScannerDb::::acquire_batch_id(txn); + + // start with empty batch + let mut batches = + vec![Batch { network, id: batch_id, block: BlockHash(block_hash), instructions: vec![] }]; + + for instruction in in_instructions { + let batch = batches.last_mut().unwrap(); + batch.instructions.push(instruction.in_instruction); + + // check if batch is over-size + if batch.encode().len() > MAX_BATCH_SIZE { + // pop the last instruction so it's back in size + let instruction = batch.instructions.pop().unwrap(); + + // bump the id for the new batch + batch_id = ScannerDb::::acquire_batch_id(txn); + + // make a new batch with this instruction included + batches.push(Batch { + network, + id: batch_id, + block: BlockHash(block_hash), + instructions: vec![instruction], + }); + } + } + + todo!("TODO: Set/emit batches"); } - let mut txn = self.db.txn(); // Update the next to potentially report block ScannerDb::::set_next_to_potentially_report_block(&mut txn, b + 1); + txn.commit(); } diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index e35eb749..365f0f14 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -212,7 +212,7 @@ impl ContinuallyRan for ScanForOutputsTask { LifetimeStage::Forwarding => { // When the forwarded output appears, we can see which Plan it's associated with and // from there recover this output - ScannerDb::::save_output_being_forwarded(&mut txn, &output_with_in_instruction); + ScannerDb::::save_output_being_forwarded(&mut txn, b, &output_with_in_instruction); continue; } // We should drop these as we should not be handling new External outputs at this