From 9ab8ba021544a174b3da7ebfce90163688dbc927 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Tue, 27 Aug 2024 00:23:15 -0400
Subject: [PATCH] Add dedicated Eventuality DB and stub missing fns

---
 processor/scanner/src/db.rs              | 16 +++++++++--
 processor/scanner/src/eventuality/db.rs  | 36 ++++++++++++++++++++++++
 processor/scanner/src/eventuality/mod.rs | 15 ++++++----
 processor/scanner/src/lib.rs             | 26 +++++++++--------
 processor/scanner/src/report.rs          | 10 +++----
 processor/scanner/src/scan.rs            |  4 +--
 6 files changed, 81 insertions(+), 26 deletions(-)
 create mode 100644 processor/scanner/src/eventuality/db.rs

diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index 4ac6bada..18511222 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -215,8 +215,8 @@ impl<S: ScannerFeed> ScannerDb<S> {
   pub(crate) fn queue_return(
     txn: &mut impl DbTxn,
     block_queued_from: u64,
-    return_addr: AddressFor<S>,
-    output: OutputFor<S>,
+    return_addr: &AddressFor<S>,
+    output: &OutputFor<S>,
   ) {
     todo!("TODO")
   }
@@ -253,6 +253,10 @@ impl<S: ScannerFeed> ScannerDb<S> {
   }
 
   pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) {
+    assert!(
+      NextToPotentiallyReportBlock::get(txn).unwrap() <= block_number,
+      "already potentially reported a block we're only now flagging as notable"
+    );
     NotableBlock::set(txn, block_number, &());
   }
 
@@ -285,4 +289,12 @@ impl<S: ScannerFeed> ScannerDb<S> {
   pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool {
     NotableBlock::get(getter, number).is_some()
   }
+
+  pub(crate) fn take_queued_returns(txn: &mut impl DbTxn, block_number: u64) -> Vec<OutputFor<S>> {
+    todo!("TODO")
+  }
+
+  pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
+    todo!("TODO")
+  }
 }
diff --git a/processor/scanner/src/eventuality/db.rs b/processor/scanner/src/eventuality/db.rs
new file mode 100644
index 00000000..e379532d
--- /dev/null
+++ b/processor/scanner/src/eventuality/db.rs
@@ -0,0 +1,36 @@
+use core::marker::PhantomData;
+
+use borsh::{BorshSerialize, BorshDeserialize};
+use serai_db::{Get, DbTxn, create_db};
+
+use primitives::EventualityTracker;
+
+use crate::{ScannerFeed, KeyFor, EventualityFor};
+
+// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
+trait Borshy: BorshSerialize + BorshDeserialize {}
+impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
+
+create_db!(
+  ScannerEventuality {
+    SerializedEventualities: () -> Vec<u8>,
+  }
+);
+
+pub(crate) struct EventualityDb<S: ScannerFeed>(PhantomData<S>);
+impl<S: ScannerFeed> EventualityDb<S> {
+  pub(crate) fn set_eventualities(
+    txn: &mut impl DbTxn,
+    key: KeyFor<S>,
+    eventualities: &EventualityTracker<EventualityFor<S>>,
+  ) {
+    todo!("TODO")
+  }
+
+  pub(crate) fn eventualities(
+    getter: &impl Get,
+    key: KeyFor<S>,
+  ) -> EventualityTracker<EventualityFor<S>> {
+    todo!("TODO")
+  }
+}
diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index 8fc18246..3d70d650 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -2,13 +2,16 @@ use group::GroupEncoding;
 
 use serai_db::{DbTxn, Db};
 
-use primitives::{OutputType, ReceivedOutput, Block};
+use primitives::{OutputType, ReceivedOutput, Eventuality, Block};
 
 // TODO: Localize to EventualityDb?
 use crate::{
   lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan,
 };
 
+mod db;
+use db::EventualityDb;
+
 /*
   When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
   outputs into some scheduler, potentially causing certain transactions to begin their signing
@@ -110,7 +113,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
 
       iterated = true;
 
-      let block = self.feed.block_by_number(b).await?;
+      let block = self.feed.block_by_number(&self.db, b).await?;
 
       log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
 
@@ -144,9 +147,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
 
       for key in keys {
         let completed_eventualities = {
-          let mut eventualities = ScannerDb::<S>::eventualities(&txn, key.key);
+          let mut eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
           let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities);
-          ScannerDb::<S>::set_eventualities(&mut txn, eventualities);
+          EventualityDb::<S>::set_eventualities(&mut txn, key.key, &eventualities);
           completed_eventualities
         };
 
@@ -200,11 +203,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
           KeyFor::<S>::from_bytes(&key_repr).unwrap()
         };
 
-        let mut eventualities = ScannerDb::<S>::eventualities(&txn, key);
+        let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
         for new_eventuality in new_eventualities {
          eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality);
        }
-        ScannerDb::<S>::set_eventualities(&mut txn, eventualities);
+        EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
       }
 
       // Update the next to check block
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index d38c2ec3..fb6599b7 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -1,13 +1,11 @@
-use core::{marker::PhantomData, fmt::Debug, time::Duration};
+use core::{marker::PhantomData, fmt::Debug};
 use std::collections::HashMap;
 
-use tokio::sync::mpsc;
-
-use serai_db::DbTxn;
+use serai_db::{Get, DbTxn};
 
 use serai_primitives::{NetworkId, Coin, Amount};
 
-use primitives::{task::*, Block};
+use primitives::{task::*, ReceivedOutput, Block};
 
 // Logic for deciding where in its lifetime a multisig is.
 mod lifetime;
@@ -29,10 +27,10 @@ pub(crate) trait BlockExt: Block {
 }
 impl<B: Block> BlockExt for B {
   fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output> {
-    let mut outputs = self.scan_for_outputs_unordered();
+    let mut outputs = self.scan_for_outputs_unordered(key);
     outputs.sort_by(|a, b| {
       use core::cmp::{Ordering, Ord};
-      let res = a.id().as_ref().cmp(&b.id().as_ref());
+      let res = a.id().as_ref().cmp(b.id().as_ref());
       assert!(res != Ordering::Equal, "scanned two outputs within a block with the same ID");
       res
     });
@@ -103,7 +101,11 @@ pub trait ScannerFeed: Send + Sync {
   /// Fetch a block by its number.
   ///
   /// Panics if the block requested wasn't indexed.
-  async fn block_by_number(&self, getter: &impl Get, number: u64) -> Result<Self::Block, String> {
+  async fn block_by_number(
+    &self,
+    getter: &(impl Send + Sync + Get),
+    number: u64,
+  ) -> Result<Self::Block, String> {
     let block = match self.unchecked_block_by_number(number).await {
       Ok(block) => block,
       Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?,
@@ -111,8 +113,8 @@ pub trait ScannerFeed: Send + Sync {
 
     // Check the ID of this block is the expected ID
     {
-      let expected =
-        crate::index::IndexDb::block_id(&self.db, number).expect("requested a block which wasn't indexed");
+      let expected = crate::index::IndexDb::block_id(getter, number)
+        .expect("requested a block which wasn't indexed");
       if block.id() != expected {
         panic!(
           "finalized chain reorganized from {} to {} at {}",
@@ -122,6 +124,8 @@ pub trait ScannerFeed: Send + Sync {
         );
       }
     }
+
+    Ok(block)
   }
 
   /// The cost to aggregate an input as of the specified block.
@@ -146,7 +150,7 @@ type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
 type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
 
 /// The object responsible for accumulating outputs and planning new transactions.
-pub trait Scheduler<S: ScannerFeed> {
+pub trait Scheduler<S: ScannerFeed>: Send {
   /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
   ///
   /// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key these
diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs
index ec87845f..8f37d7a6 100644
--- a/processor/scanner/src/report.rs
+++ b/processor/scanner/src/report.rs
@@ -7,7 +7,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
 use primitives::ReceivedOutput;
 
 // TODO: Localize to ReportDb?
-use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan};
+use crate::{db::ScannerDb, index::IndexDb, ScannerFeed, ContinuallyRan};
 
 /*
   This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
@@ -57,7 +57,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
         // methods to be used in the future)
         in_instructions.sort_by(|a, b| {
           use core::cmp::{Ordering, Ord};
-          let res = a.output.id().as_ref().cmp(&b.output.id().as_ref());
+          let res = a.output.id().as_ref().cmp(b.output.id().as_ref());
           assert!(res != Ordering::Equal);
           res
         });
@@ -66,8 +66,8 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
 
         let network = S::NETWORK;
         let block_hash =
-          ScannerDb::<S>::block_id(&txn, b).expect("reporting block we didn't save the ID for");
-        let mut batch_id = ScannerDb::<S>::acquire_batch_id(txn);
+          IndexDb::block_id(&txn, b).expect("reporting block we didn't save the ID for");
+        let mut batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn);
 
         // start with empty batch
         let mut batches =
@@ -83,7 +83,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
             let instruction = batch.instructions.pop().unwrap();
 
             // bump the id for the new batch
-            batch_id = ScannerDb::<S>::acquire_batch_id(txn);
+            batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn);
 
             // make a new batch with this instruction included
             batches.push(Batch {
diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs
index ddc1110e..7e59c92d 100644
--- a/processor/scanner/src/scan.rs
+++ b/processor/scanner/src/scan.rs
@@ -76,7 +76,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
       .expect("ScanForOutputsTask run before writing the start block");
 
     for b in next_to_scan ..= latest_scannable {
-      let block = self.feed.block_by_number(b).await?;
+      let block = self.feed.block_by_number(&self.db, b).await?;
 
       log::info!("scanning block: {} ({b})", hex::encode(block.id()));
 
@@ -173,7 +173,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
             },
             (Some(return_addr), None) => {
               // Since there was no instruction here, return this since we parsed a return address
-              ScannerDb::<S>::queue_return(&mut txn, b, return_addr, output);
+              ScannerDb::<S>::queue_return(&mut txn, b, &return_addr, &output);
               continue;
             }
             // Since we didn't receive an instruction nor can we return this, move on
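
Below is one possible shape for the `ScannerDb::acquire_batch_id` stub left as `todo!("TODO")` above. This is a minimal sketch rather than the commit's implementation: it assumes a dedicated counter table (the `ScannerReport` DB name and `NextBatchId` table are illustrative, not from this patch) and reuses the `create_db!` / `Get` / `DbTxn` pattern the patch already relies on.

use serai_db::{Get, DbTxn, create_db};

create_db!(
  ScannerReport {
    // Hypothetical table holding the next batch ID to hand out.
    NextBatchId: () -> u32,
  }
);

// Hand out the next batch ID, bumping the stored counter within the same transaction so the
// allocation is only persisted if the batch it was acquired for is also persisted.
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
  let id = NextBatchId::get(txn).unwrap_or(0);
  NextBatchId::set(txn, &(id + 1));
  id
}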