Use a channel for the InInstructions

It's still unclear how we'll handle refunding failed InInstructions at this
time. Presumably, extending the InInstruction channel with the associated
output ID?
This commit is contained in:
Luke Parker 2024-08-27 02:14:59 -04:00
parent 6196642beb
commit 75251f04b4
3 changed files with 79 additions and 53 deletions

View file

@ -226,32 +226,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
NotableBlock::set(txn, block_number, &());
}
// TODO: Use a DbChannel here, and send the instructions to the report task and the outputs to
// the eventuality task? That way this cleans up after itself
pub(crate) fn set_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,
outputs: Vec<OutputWithInInstruction<S>>,
) {
if !outputs.is_empty() {
// Set this block as notable
NotableBlock::set(txn, block_number, &());
}
let mut buf = Vec::with_capacity(outputs.len() * 128);
for output in outputs {
output.write(&mut buf).unwrap();
}
SerializedOutputs::set(txn, block_number, &buf);
}
pub(crate) fn in_instructions(
getter: &impl Get,
block_number: u64,
) -> Option<Vec<OutputWithInInstruction<S>>> {
todo!("TODO")
}
pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool {
NotableBlock::get(getter, number).is_some()
}
@ -352,3 +326,44 @@ impl<S: ScannerFeed> ScanToEventualityDb<S> {
todo!("TODO")
}
}
#[derive(BorshSerialize, BorshDeserialize)]
pub(crate) struct BlockBoundInInstructions {
pub(crate) block_number: u64,
pub(crate) in_instructions: Vec<InInstructionWithBalance>,
}
db_channel! {
ScannerScanReport {
InInstructions: (empty_key: ()) -> BlockBoundInInstructions,
}
}
pub(crate) struct ScanToReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScanToReportDb<S> {
pub(crate) fn send_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,
in_instructions: Vec<InInstructionWithBalance>,
) {
if !in_instructions.is_empty() {
// Set this block as notable
NotableBlock::set(txn, block_number, &());
}
InInstructions::send(txn, (), &BlockBoundInInstructions { block_number, in_instructions });
}
pub(crate) fn recv_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,
) -> Vec<InInstructionWithBalance> {
let data = InInstructions::try_recv(txn, ())
.expect("receiving InInstructions for a scanned block not yet sent");
assert_eq!(
block_number, data.block_number,
"received InInstructions for a scanned block distinct than expected"
);
data.in_instructions
}
}

View file

@ -4,10 +4,11 @@ use serai_db::{DbTxn, Db};
use serai_primitives::BlockHash;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::ReceivedOutput;
// TODO: Localize to ReportDb?
use crate::{db::ScannerDb, index, ScannerFeed, ContinuallyRan};
// TODO: Localize to Report?
use crate::{
db::{ScannerDb, ScanToReportDb},
index, ScannerFeed, ContinuallyRan,
};
/*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
@ -47,23 +48,15 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
for b in next_to_potentially_report ..= highest_reportable {
let mut txn = self.db.txn();
// Receive the InInstructions for this block
// We always do this as we can't trivially tell if we should recv InInstructions before we do
let in_instructions = ScanToReportDb::<S>::recv_in_instructions(&mut txn, b);
let notable = ScannerDb::<S>::is_block_notable(&txn, b);
if !notable {
assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
}
// If this block is notable, create the Batch(s) for it
if ScannerDb::<S>::is_block_notable(&txn, b) {
let in_instructions = {
let mut in_instructions = ScannerDb::<S>::in_instructions(&txn, b)
.expect("reporting block which didn't set its InInstructions");
// Sort these before reporting them in case anything we did is non-deterministic/to have
// a well-defined order (not implicit to however we got this result, enabling different
// methods to be used in the future)
in_instructions.sort_by(|a, b| {
use core::cmp::{Ordering, Ord};
let res = a.output.id().as_ref().cmp(b.output.id().as_ref());
assert!(res != Ordering::Equal);
res
});
in_instructions
};
if notable {
let network = S::NETWORK;
let block_hash = index::block_id(&txn, b);
let mut batch_id = ScannerDb::<S>::acquire_batch_id(&mut txn);
@ -74,7 +67,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
for instruction in in_instructions {
let batch = batches.last_mut().unwrap();
batch.instructions.push(instruction.in_instruction);
batch.instructions.push(instruction);
// check if batch is over-size
if batch.encode().len() > MAX_BATCH_SIZE {

View file

@ -11,7 +11,7 @@ use primitives::{OutputType, ReceivedOutput, Block};
// TODO: Localize to ScanDb?
use crate::{
lifetime::LifetimeStage,
db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToEventualityDb},
db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToReportDb, ScanToEventualityDb},
BlockExt, ScannerFeed, AddressFor, OutputFor, Return, ContinuallyRan,
};
@ -92,7 +92,25 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
forwards: vec![],
returns: vec![],
};
let mut in_instructions = ScannerDb::<S>::take_queued_outputs(&mut txn, b);
let mut in_instructions = vec![];
let queued_outputs = {
let mut queued_outputs = ScannerDb::<S>::take_queued_outputs(&mut txn, b);
// Sort the queued outputs in case they weren't queued in a deterministic fashion
queued_outputs.sort_by(|a, b| {
use core::cmp::{Ordering, Ord};
let res = a.output.id().as_ref().cmp(b.output.id().as_ref());
assert!(res != Ordering::Equal);
res
});
queued_outputs
};
for queued_output in queued_outputs {
scan_data.received_external_outputs.push(queued_output.output);
in_instructions.push(queued_output.in_instruction);
}
// Scan for each key
for key in keys {
@ -228,14 +246,14 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange));
scan_data.received_external_outputs.push(output_with_in_instruction.output.clone());
in_instructions.push(output_with_in_instruction);
in_instructions.push(output_with_in_instruction.in_instruction);
}
}
// Save the outputs to return
// Send the scan data to the eventuality task
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
// Save the in instructions
ScannerDb::<S>::set_in_instructions(&mut txn, b, in_instructions);
// Send the in instructions to the report task
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
// Update the next to scan block
ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
txn.commit();