Add ReportDb

This commit is contained in:
Luke Parker 2024-08-28 19:58:28 -04:00
parent 7cc07d64d1
commit 65f3f48517
5 changed files with 65 additions and 39 deletions

View file

@ -47,7 +47,7 @@ impl<S: ScannerFeed> OutputWithInInstruction<S> {
} }
create_db!( create_db!(
Scanner { ScannerGlobal {
ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>, ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>,
RetireAt: <K: Encode>(key: K) -> u64, RetireAt: <K: Encode>(key: K) -> u64,
@ -78,8 +78,8 @@ create_db!(
} }
); );
pub(crate) struct ScannerDb<S: ScannerFeed>(PhantomData<S>); pub(crate) struct ScannerGlobalDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScannerDb<S> { impl<S: ScannerFeed> ScannerGlobalDb<S> {
/// Queue a key. /// Queue a key.
/// ///
/// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks /// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks
@ -180,20 +180,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
Some(keys) Some(keys)
} }
pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) {
NextToPotentiallyReportBlock::set(txn, &start_block);
}
pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn,
next_to_potentially_report_block: u64,
) {
NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block);
}
pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option<u64> {
NextToPotentiallyReportBlock::get(getter)
}
pub(crate) fn set_highest_acknowledged_block( pub(crate) fn set_highest_acknowledged_block(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
highest_acknowledged_block: u64, highest_acknowledged_block: u64,
@ -224,10 +210,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
NotableBlock::get(getter, number).is_some() NotableBlock::get(getter, number).is_some()
} }
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
todo!("TODO")
}
pub(crate) fn return_address_and_in_instruction_for_forwarded_output( pub(crate) fn return_address_and_in_instruction_for_forwarded_output(
getter: &impl Get, getter: &impl Get,
output: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id, output: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id,

View file

@ -4,10 +4,9 @@ use serai_db::{Get, DbTxn, Db};
use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block}; use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block};
// TODO: Localize to EventualityDb?
use crate::{ use crate::{
lifetime::LifetimeStage, lifetime::LifetimeStage,
db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb}, db::{OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, ScanToEventualityDb},
BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs, BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
scan::{next_to_scan_for_outputs_block, queue_output_until_block}, scan::{next_to_scan_for_outputs_block, queue_output_until_block},
}; };
@ -69,7 +68,7 @@ pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Optio
This forms a backlog only if the latency of scanning, acknowledgement, and intake (including This forms a backlog only if the latency of scanning, acknowledgement, and intake (including
checking Eventualities) exceeds the window duration (the desired property). checking Eventualities) exceeds the window duration (the desired property).
*/ */
struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> { pub(crate) struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> {
db: D, db: D,
feed: S, feed: S,
scheduler: Sch, scheduler: Sch,
@ -115,7 +114,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
}; };
// Fetch the highest acknowledged block // Fetch the highest acknowledged block
let highest_acknowledged = ScannerDb::<S>::highest_acknowledged_block(&self.db) let highest_acknowledged = ScannerGlobalDb::<S>::highest_acknowledged_block(&self.db)
.expect("EventualityTask run before writing the start block"); .expect("EventualityTask run before writing the start block");
// Fetch the next block to check // Fetch the next block to check
@ -132,7 +131,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// This is possible since even if we receive coins in block 0, any transactions we'd make // This is possible since even if we receive coins in block 0, any transactions we'd make
// would resolve in block 1 (the first block we'll check under this non-zero rule) // would resolve in block 1 (the first block we'll check under this non-zero rule)
let prior_block = b - 1; let prior_block = b - 1;
if ScannerDb::<S>::is_block_notable(&self.db, prior_block) && if ScannerGlobalDb::<S>::is_block_notable(&self.db, prior_block) &&
(prior_block > highest_acknowledged) (prior_block > highest_acknowledged)
{ {
break; break;
@ -156,8 +155,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
one which decides when to retire a key, and when it marks a key to be retired, it is done one which decides when to retire a key, and when it marks a key to be retired, it is done
with it. Accordingly, it's not an issue if such a key was dropped. with it. Accordingly, it's not an issue if such a key was dropped.
*/ */
let mut keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db) let mut keys =
.expect("scanning for a blockchain without any keys set"); ScannerGlobalDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
.expect("scanning for a blockchain without any keys set");
// Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate // Since the next-to-scan block is ahead of us, drop keys which have yet to actually activate
keys.retain(|key| b <= key.activation_block_number); keys.retain(|key| b <= key.activation_block_number);
@ -226,7 +226,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
}; };
let (return_address, in_instruction) = let (return_address, in_instruction) =
ScannerDb::<S>::return_address_and_in_instruction_for_forwarded_output( ScannerGlobalDb::<S>::return_address_and_in_instruction_for_forwarded_output(
&txn, &forwarded, &txn, &forwarded,
) )
.expect("forwarded an output yet didn't save its InInstruction to the DB"); .expect("forwarded an output yet didn't save its InInstruction to the DB");
@ -281,7 +281,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never // Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never
// has a malleable view of the keys. // has a malleable view of the keys.
ScannerDb::<S>::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key); ScannerGlobalDb::<S>::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key);
} }
} }
} }

View file

@ -0,0 +1,27 @@
use core::marker::PhantomData;
use serai_db::{Get, DbTxn, Db, create_db};
create_db!(
ScannerReport {
// The next block to potentially report
NextToPotentiallyReportBlock: () -> u64,
}
);
pub(crate) struct ReportDb;
impl ReportDb {
pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn,
next_to_potentially_report_block: u64,
) {
NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block);
}
pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option<u64> {
NextToPotentiallyReportBlock::get(getter)
}
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
todo!("TODO")
}
}

View file

@ -4,14 +4,16 @@ use serai_db::{DbTxn, Db};
use serai_primitives::BlockHash; use serai_primitives::BlockHash;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
// TODO: Localize to Report?
use crate::{ use crate::{
db::{ScannerDb, ScanToReportDb}, db::{ScannerGlobalDb, ScanToReportDb},
index, index,
scan::next_to_scan_for_outputs_block, scan::next_to_scan_for_outputs_block,
ScannerFeed, ContinuallyRan, ScannerFeed, ContinuallyRan,
}; };
mod db;
use db::ReportDb;
/* /*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
@ -19,11 +21,24 @@ use crate::{
Eventualities, have processed the block. This ensures we know if this block is notable, and have Eventualities, have processed the block. This ensures we know if this block is notable, and have
the InInstructions for it. the InInstructions for it.
*/ */
struct ReportTask<D: Db, S: ScannerFeed> { pub(crate) struct ReportTask<D: Db, S: ScannerFeed> {
db: D, db: D,
feed: S, feed: S,
} }
impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self {
if ReportDb::next_to_potentially_report_block(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
ReportDb::set_next_to_potentially_report_block(&mut txn, start_block);
txn.commit();
}
Self { db, feed }
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> { impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
@ -44,7 +59,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
last_scanned last_scanned
}; };
let next_to_potentially_report = ScannerDb::<S>::next_to_potentially_report_block(&self.db) let next_to_potentially_report = ReportDb::next_to_potentially_report_block(&self.db)
.expect("ReportTask run before writing the start block"); .expect("ReportTask run before writing the start block");
for b in next_to_potentially_report ..= highest_reportable { for b in next_to_potentially_report ..= highest_reportable {
@ -53,7 +68,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
// Receive the InInstructions for this block // Receive the InInstructions for this block
// We always do this as we can't trivially tell if we should recv InInstructions before we do // We always do this as we can't trivially tell if we should recv InInstructions before we do
let in_instructions = ScanToReportDb::<S>::recv_in_instructions(&mut txn, b); let in_instructions = ScanToReportDb::<S>::recv_in_instructions(&mut txn, b);
let notable = ScannerDb::<S>::is_block_notable(&txn, b); let notable = ScannerGlobalDb::<S>::is_block_notable(&txn, b);
if !notable { if !notable {
assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
} }
@ -61,7 +76,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
if notable { if notable {
let network = S::NETWORK; let network = S::NETWORK;
let block_hash = index::block_id(&txn, b); let block_hash = index::block_id(&txn, b);
let mut batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn); let mut batch_id = ReportDb::acquire_batch_id(&mut txn);
// start with empty batch // start with empty batch
let mut batches = let mut batches =
@ -77,7 +92,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
let instruction = batch.instructions.pop().unwrap(); let instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch // bump the id for the new batch
batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn); batch_id = ReportDb::acquire_batch_id(&mut txn);
// make a new batch with this instruction included // make a new batch with this instruction included
batches.push(Batch { batches.push(Batch {
@ -93,7 +108,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
} }
// Update the next to potentially report block // Update the next to potentially report block
ScannerDb::<S>::set_next_to_potentially_report_block(&mut txn, b + 1); ReportDb::set_next_to_potentially_report_block(&mut txn, b + 1);
txn.commit(); txn.commit();
} }

View file

@ -8,7 +8,6 @@ use serai_in_instructions_primitives::{
use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block}; use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block};
// TODO: Localize to ScanDb?
use crate::{ use crate::{
lifetime::LifetimeStage, lifetime::LifetimeStage,
db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb}, db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb},
@ -28,6 +27,9 @@ pub(crate) fn queue_output_until_block<S: ScannerFeed>(
queue_for_block: u64, queue_for_block: u64,
output: &OutputWithInInstruction<S>, output: &OutputWithInInstruction<S>,
) { ) {
// This isn't a perfect assertion as by the time this txn commits, we may have already started
// scanning this block. That doesn't change it should never trip as we queue outside the window
// we'll scan
assert!( assert!(
queue_for_block >= queue_for_block >=
next_to_scan_for_outputs_block::<S>(txn) next_to_scan_for_outputs_block::<S>(txn)