Add dedicated Eventuality DB and stub missing fns

This commit is contained in:
Luke Parker 2024-08-27 00:23:15 -04:00
parent 33e0c85f34
commit 9ab8ba0215
6 changed files with 81 additions and 26 deletions

View file

@ -215,8 +215,8 @@ impl<S: ScannerFeed> ScannerDb<S> {
pub(crate) fn queue_return( pub(crate) fn queue_return(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block_queued_from: u64, block_queued_from: u64,
return_addr: AddressFor<S>, return_addr: &AddressFor<S>,
output: OutputFor<S>, output: &OutputFor<S>,
) { ) {
todo!("TODO") todo!("TODO")
} }
@ -253,6 +253,10 @@ impl<S: ScannerFeed> ScannerDb<S> {
} }
pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) { pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) {
assert!(
NextToPotentiallyReportBlock::get(txn).unwrap() <= block_number,
"already potentially reported a block we're only now flagging as notable"
);
NotableBlock::set(txn, block_number, &()); NotableBlock::set(txn, block_number, &());
} }
@ -285,4 +289,12 @@ impl<S: ScannerFeed> ScannerDb<S> {
pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool { pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool {
NotableBlock::get(getter, number).is_some() NotableBlock::get(getter, number).is_some()
} }
pub(crate) fn take_queued_returns(txn: &mut impl DbTxn, block_number: u64) -> Vec<OutputFor<S>> {
todo!("TODO")
}
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
todo!("TODO")
}
} }

View file

@ -0,0 +1,36 @@
use core::marker::PhantomData;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db};
use primitives::EventualityTracker;
use crate::{ScannerFeed, KeyFor, EventualityFor};
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
create_db!(
ScannerEventuality {
SerializedEventualities: <K: Borshy>() -> Vec<u8>,
}
);
pub(crate) struct EventualityDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> EventualityDb<S> {
pub(crate) fn set_eventualities(
txn: &mut impl DbTxn,
key: KeyFor<S>,
eventualities: &EventualityTracker<EventualityFor<S>>,
) {
todo!("TODO")
}
pub(crate) fn eventualities(
getter: &impl Get,
key: KeyFor<S>,
) -> EventualityTracker<EventualityFor<S>> {
todo!("TODO")
}
}

View file

@ -2,13 +2,16 @@ use group::GroupEncoding;
use serai_db::{DbTxn, Db}; use serai_db::{DbTxn, Db};
use primitives::{OutputType, ReceivedOutput, Block}; use primitives::{OutputType, ReceivedOutput, Eventuality, Block};
// TODO: Localize to EventualityDb? // TODO: Localize to EventualityDb?
use crate::{ use crate::{
lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan, lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan,
}; };
mod db;
use db::EventualityDb;
/* /*
When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
outputs into some scheduler, potentially causing certain transactions to begin their signing outputs into some scheduler, potentially causing certain transactions to begin their signing
@ -110,7 +113,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
iterated = true; iterated = true;
let block = self.feed.block_by_number(b).await?; let block = self.feed.block_by_number(&self.db, b).await?;
log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
@ -144,9 +147,9 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
for key in keys { for key in keys {
let completed_eventualities = { let completed_eventualities = {
let mut eventualities = ScannerDb::<S>::eventualities(&txn, key.key); let mut eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities); let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities);
ScannerDb::<S>::set_eventualities(&mut txn, eventualities); EventualityDb::<S>::set_eventualities(&mut txn, key.key, &eventualities);
completed_eventualities completed_eventualities
}; };
@ -200,11 +203,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
KeyFor::<S>::from_bytes(&key_repr).unwrap() KeyFor::<S>::from_bytes(&key_repr).unwrap()
}; };
let mut eventualities = ScannerDb::<S>::eventualities(&txn, key); let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
for new_eventuality in new_eventualities { for new_eventuality in new_eventualities {
eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality); eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality);
} }
ScannerDb::<S>::set_eventualities(&mut txn, eventualities); EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
} }
// Update the next to check block // Update the next to check block

View file

@ -1,13 +1,11 @@
use core::{marker::PhantomData, fmt::Debug, time::Duration}; use core::{marker::PhantomData, fmt::Debug};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::mpsc; use serai_db::{Get, DbTxn};
use serai_db::DbTxn;
use serai_primitives::{NetworkId, Coin, Amount}; use serai_primitives::{NetworkId, Coin, Amount};
use primitives::{task::*, Block}; use primitives::{task::*, ReceivedOutput, Block};
// Logic for deciding where in its lifetime a multisig is. // Logic for deciding where in its lifetime a multisig is.
mod lifetime; mod lifetime;
@ -29,10 +27,10 @@ pub(crate) trait BlockExt: Block {
} }
impl<B: Block> BlockExt for B { impl<B: Block> BlockExt for B {
fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output> { fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output> {
let mut outputs = self.scan_for_outputs_unordered(); let mut outputs = self.scan_for_outputs_unordered(key);
outputs.sort_by(|a, b| { outputs.sort_by(|a, b| {
use core::cmp::{Ordering, Ord}; use core::cmp::{Ordering, Ord};
let res = a.id().as_ref().cmp(&b.id().as_ref()); let res = a.id().as_ref().cmp(b.id().as_ref());
assert!(res != Ordering::Equal, "scanned two outputs within a block with the same ID"); assert!(res != Ordering::Equal, "scanned two outputs within a block with the same ID");
res res
}); });
@ -103,7 +101,11 @@ pub trait ScannerFeed: Send + Sync {
/// Fetch a block by its number. /// Fetch a block by its number.
/// ///
/// Panics if the block requested wasn't indexed. /// Panics if the block requested wasn't indexed.
async fn block_by_number(&self, getter: &impl Get, number: u64) -> Result<Self::Block, String> { async fn block_by_number(
&self,
getter: &(impl Send + Sync + Get),
number: u64,
) -> Result<Self::Block, String> {
let block = match self.unchecked_block_by_number(number).await { let block = match self.unchecked_block_by_number(number).await {
Ok(block) => block, Ok(block) => block,
Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?, Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?,
@ -111,8 +113,8 @@ pub trait ScannerFeed: Send + Sync {
// Check the ID of this block is the expected ID // Check the ID of this block is the expected ID
{ {
let expected = let expected = crate::index::IndexDb::block_id(getter, number)
crate::index::IndexDb::block_id(&self.db, number).expect("requested a block which wasn't indexed"); .expect("requested a block which wasn't indexed");
if block.id() != expected { if block.id() != expected {
panic!( panic!(
"finalized chain reorganized from {} to {} at {}", "finalized chain reorganized from {} to {} at {}",
@ -122,6 +124,8 @@ pub trait ScannerFeed: Send + Sync {
); );
} }
} }
Ok(block)
} }
/// The cost to aggregate an input as of the specified block. /// The cost to aggregate an input as of the specified block.
@ -146,7 +150,7 @@ type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality; type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
/// The object responsible for accumulating outputs and planning new transactions. /// The object responsible for accumulating outputs and planning new transactions.
pub trait Scheduler<S: ScannerFeed> { pub trait Scheduler<S: ScannerFeed>: Send {
/// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for. /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
/// ///
/// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key these /// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key these

View file

@ -7,7 +7,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::ReceivedOutput; use primitives::ReceivedOutput;
// TODO: Localize to ReportDb? // TODO: Localize to ReportDb?
use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan}; use crate::{db::ScannerDb, index::IndexDb, ScannerFeed, ContinuallyRan};
/* /*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
@ -57,7 +57,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
// methods to be used in the future) // methods to be used in the future)
in_instructions.sort_by(|a, b| { in_instructions.sort_by(|a, b| {
use core::cmp::{Ordering, Ord}; use core::cmp::{Ordering, Ord};
let res = a.output.id().as_ref().cmp(&b.output.id().as_ref()); let res = a.output.id().as_ref().cmp(b.output.id().as_ref());
assert!(res != Ordering::Equal); assert!(res != Ordering::Equal);
res res
}); });
@ -66,8 +66,8 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
let network = S::NETWORK; let network = S::NETWORK;
let block_hash = let block_hash =
ScannerDb::<S>::block_id(&txn, b).expect("reporting block we didn't save the ID for"); IndexDb::block_id(&txn, b).expect("reporting block we didn't save the ID for");
let mut batch_id = ScannerDb::<S>::acquire_batch_id(txn); let mut batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn);
// start with empty batch // start with empty batch
let mut batches = let mut batches =
@ -83,7 +83,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
let instruction = batch.instructions.pop().unwrap(); let instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch // bump the id for the new batch
batch_id = ScannerDb::<S>::acquire_batch_id(txn); batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn);
// make a new batch with this instruction included // make a new batch with this instruction included
batches.push(Batch { batches.push(Batch {

View file

@ -76,7 +76,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
.expect("ScanForOutputsTask run before writing the start block"); .expect("ScanForOutputsTask run before writing the start block");
for b in next_to_scan ..= latest_scannable { for b in next_to_scan ..= latest_scannable {
let block = self.feed.block_by_number(b).await?; let block = self.feed.block_by_number(&self.db, b).await?;
log::info!("scanning block: {} ({b})", hex::encode(block.id())); log::info!("scanning block: {} ({b})", hex::encode(block.id()));
@ -173,7 +173,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
}, },
(Some(return_addr), None) => { (Some(return_addr), None) => {
// Since there was no instruction here, return this since we parsed a return address // Since there was no instruction here, return this since we parsed a return address
ScannerDb::<S>::queue_return(&mut txn, b, return_addr, output); ScannerDb::<S>::queue_return(&mut txn, b, &return_addr, &output);
continue; continue;
} }
// Since we didn't receive an instruction nor can we return this, move on // Since we didn't receive an instruction nor can we return this, move on