Add a callback to accumulate outputs and return the new Eventualities

This commit is contained in:
Luke Parker 2024-08-26 22:49:57 -04:00
parent 379780a3c9
commit 2fcd9530dd
5 changed files with 49 additions and 16 deletions

View file

@ -1,9 +1,11 @@
use serai_db::{Db, DbTxn}; use group::GroupEncoding;
use serai_db::{DbTxn, Db};
use primitives::{OutputType, ReceivedOutput, Block}; use primitives::{OutputType, ReceivedOutput, Block};
// TODO: Localize to EventualityDb? // TODO: Localize to EventualityDb?
use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, ContinuallyRan}; use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, KeyFor, Scheduler, ContinuallyRan};
/* /*
Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we
@ -53,13 +55,14 @@ use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, ContinuallyRan}
This forms a backlog only if the latency of scanning, acknowledgement, and intake (including This forms a backlog only if the latency of scanning, acknowledgement, and intake (including
checking Eventualities) exceeds the window duration (the desired property). checking Eventualities) exceeds the window duration (the desired property).
*/ */
struct EventualityTask<D: Db, S: ScannerFeed> { struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> {
db: D, db: D,
feed: S, feed: S,
scheduler: Sch,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for EventualityTask<D, S> { impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTask<D, S, Sch> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
/* /*
The set of Eventualities only increase when a block is acknowledged. Accordingly, we can only The set of Eventualities only increase when a block is acknowledged. Accordingly, we can only
@ -168,6 +171,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for EventualityTask<D, S> {
}; };
// Fetch all non-External outputs // Fetch all non-External outputs
// TODO: Have a scan_for_outputs_ext which sorts for us
let mut non_external_outputs = block.scan_for_outputs(key.key); let mut non_external_outputs = block.scan_for_outputs(key.key);
non_external_outputs.retain(|output| output.kind() != OutputType::External); non_external_outputs.retain(|output| output.kind() != OutputType::External);
// Drop any outputs less than the dust limit // Drop any outputs less than the dust limit
@ -206,8 +210,24 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for EventualityTask<D, S> {
let outputs_to_return = ScannerDb::<S>::take_queued_returns(&mut txn, b); let outputs_to_return = ScannerDb::<S>::take_queued_returns(&mut txn, b);
let new_eventualities =
self.scheduler.accumulate_outputs_and_return_outputs(&mut txn, outputs, outputs_to_return);
for (key, new_eventualities) in new_eventualities {
let key = {
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
assert_eq!(key.len(), key_repr.as_ref().len());
key_repr.as_mut().copy_from_slice(&key);
KeyFor::<S>::from_bytes(&key_repr).unwrap()
};
let mut eventualities = ScannerDb::<S>::eventualities(&txn, key.key);
for new_eventuality in new_eventualities {
eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality);
}
ScannerDb::<S>::set_eventualities(&mut txn, eventualities);
}
// Update the next to check block // Update the next to check block
// TODO: Two-stage process
ScannerDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check); ScannerDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);
txn.commit(); txn.commit();
} }

View file

@ -1,4 +1,4 @@
use serai_db::{Db, DbTxn}; use serai_db::{DbTxn, Db};
use primitives::BlockHeader; use primitives::BlockHeader;

View file

@ -1,8 +1,12 @@
use core::{marker::PhantomData, fmt::Debug, time::Duration}; use core::{marker::PhantomData, fmt::Debug, time::Duration};
use std::collections::HashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use serai_db::DbTxn;
use serai_primitives::{NetworkId, Coin, Amount}; use serai_primitives::{NetworkId, Coin, Amount};
use primitives::Block; use primitives::Block;
// Logic for deciding where in its lifetime a multisig is. // Logic for deciding where in its lifetime a multisig is.
@ -91,6 +95,21 @@ pub trait ScannerFeed: Send + Sync {
type KeyFor<S> = <<S as ScannerFeed>::Block as Block>::Key; type KeyFor<S> = <<S as ScannerFeed>::Block as Block>::Key;
type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address; type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address;
type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output; type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
/// The object responsible for accumulating outputs and planning new transactions.
pub trait Scheduler<S: ScannerFeed> {
/// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
///
/// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key these
/// Eventualities are for.
fn accumulate_outputs_and_return_outputs(
&mut self,
txn: &mut impl DbTxn,
outputs: Vec<OutputFor<S>>,
outputs_to_return: Vec<OutputFor<S>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>;
}
/// A handle to immediately run an iteration of a task. /// A handle to immediately run an iteration of a task.
#[derive(Clone)] #[derive(Clone)]
@ -189,8 +208,9 @@ impl<S: ScannerFeed> Scanner<S> {
/// ///
/// This means this block was ordered on Serai in relation to `Burn` events, and all validators /// This means this block was ordered on Serai in relation to `Burn` events, and all validators
/// have achieved synchrony on it. /// have achieved synchrony on it.
// TODO: If we're acknowledge block `b`, the Eventuality task was already eligible to check it // TODO: If we're acknowledging block `b`, the Eventuality task was already eligible to check it
// for Eventualities. We need this to block until the Eventuality task has actually checked it. // for Eventualities. We need this to block until the Eventuality task has actually checked it.
// TODO: Does the prior TODO hold with how the callback is now handled?
pub fn acknowledge_block( pub fn acknowledge_block(
&mut self, &mut self,
block_number: u64, block_number: u64,
@ -198,13 +218,6 @@ impl<S: ScannerFeed> Scanner<S> {
) -> Vec<OutputFor<S>> { ) -> Vec<OutputFor<S>> {
todo!("TODO") todo!("TODO")
} }
/// Register the Eventualities caused by a block.
// TODO: Replace this with a callback returned by acknowledge_block which panics if it's not
// called yet dropped
pub fn register_eventualities(&mut self, block_number: u64, eventualities: Vec<()>) {
todo!("TODO")
}
} }
/* /*

View file

@ -1,5 +1,5 @@
use scale::Encode; use scale::Encode;
use serai_db::{Db, DbTxn}; use serai_db::{DbTxn, Db};
use serai_primitives::BlockHash; use serai_primitives::BlockHash;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};

View file

@ -1,5 +1,5 @@
use scale::Decode; use scale::Decode;
use serai_db::{Db, DbTxn}; use serai_db::{DbTxn, Db};
use serai_primitives::MAX_DATA_LEN; use serai_primitives::MAX_DATA_LEN;
use serai_in_instructions_primitives::{ use serai_in_instructions_primitives::{