diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 42086681..3ea41161 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -3,13 +3,13 @@ use std::io; use scale::Encode; use borsh::{BorshSerialize, BorshDeserialize}; -use serai_db::{Get, DbTxn, create_db}; +use serai_db::{Get, DbTxn, create_db, db_channel}; use serai_in_instructions_primitives::InInstructionWithBalance; use primitives::{ReceivedOutput, BorshG}; -use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor}; +use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return}; // The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. trait Borshy: BorshSerialize + BorshDeserialize {} @@ -76,8 +76,6 @@ create_db!( NotableBlock: (number: u64) -> (), SerializedQueuedOutputs: (block_number: u64) -> Vec, - SerializedForwardedOutputsIndex: (block_number: u64) -> Vec, - SerializedForwardedOutput: (output_id: &[u8]) -> Vec, SerializedOutputs: (block_number: u64) -> Vec, } ); @@ -209,15 +207,6 @@ impl ScannerDb { todo!("TODO") } - pub(crate) fn queue_return( - txn: &mut impl DbTxn, - block_queued_from: u64, - return_addr: &AddressFor, - output: &OutputFor, - ) { - todo!("TODO") - } - pub(crate) fn queue_output_until_block( txn: &mut impl DbTxn, queue_for_block: u64, @@ -229,26 +218,6 @@ impl ScannerDb { SerializedQueuedOutputs::set(txn, queue_for_block, &outputs); } - pub(crate) fn save_output_being_forwarded( - txn: &mut impl DbTxn, - block_forwarded_from: u64, - output: &OutputWithInInstruction, - ) { - let mut buf = Vec::with_capacity(128); - output.write(&mut buf).unwrap(); - - let id = output.output.id(); - - // Save this to an index so we can later fetch all outputs to forward - let mut forwarded_outputs = SerializedForwardedOutputsIndex::get(txn, block_forwarded_from) - .unwrap_or(Vec::with_capacity(32)); - forwarded_outputs.extend(id.as_ref()); - SerializedForwardedOutputsIndex::set(txn, block_forwarded_from, &forwarded_outputs); - - // Save the output itself - SerializedForwardedOutput::set(txn, id.as_ref(), &buf); - } - pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) { assert!( NextToPotentiallyReportBlock::get(txn).unwrap() <= block_number, @@ -287,11 +256,99 @@ impl ScannerDb { NotableBlock::get(getter, number).is_some() } - pub(crate) fn take_queued_returns(txn: &mut impl DbTxn, block_number: u64) -> Vec> { - todo!("TODO") - } - pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 { todo!("TODO") } + + pub(crate) fn return_address_and_in_instruction_for_forwarded_output( + getter: &impl Get, + output: & as ReceivedOutput, AddressFor>>::Id, + ) -> Option<(Option>, InInstructionWithBalance)> { + todo!("TODO") + } +} + +/// The data produced by scanning a block. +/// +/// This is the sender's version which includes the forwarded outputs with their InInstructions, +/// which need to be saved to the database for later retrieval. +pub(crate) struct SenderScanData { + /// The block number. + pub(crate) block_number: u64, + /// The received outputs which should be accumulated into the scheduler. + pub(crate) received_external_outputs: Vec>, + /// The outputs which need to be forwarded. + pub(crate) forwards: Vec>, + /// The outputs which need to be returned. + pub(crate) returns: Vec>, +} + +/// The data produced by scanning a block. +/// +/// This is the receiver's version which doesn't include the forwarded outputs' InInstructions, as +/// the Eventuality task doesn't need it to process this block. +pub(crate) struct ReceiverScanData { + /// The block number. + pub(crate) block_number: u64, + /// The received outputs which should be accumulated into the scheduler. + pub(crate) received_external_outputs: Vec>, + /// The outputs which need to be forwarded. + pub(crate) forwards: Vec>, + /// The outputs which need to be returned. + pub(crate) returns: Vec>, +} + +#[derive(BorshSerialize, BorshDeserialize)] +pub(crate) struct SerializedScanData { + pub(crate) block_number: u64, + pub(crate) data: Vec, +} + +db_channel! { + ScannerScanEventuality { + ScannedBlock: (empty_key: ()) -> SerializedScanData, + } +} + +pub(crate) struct ScanToEventualityDb(PhantomData); +impl ScanToEventualityDb { + pub(crate) fn send_scan_data(txn: &mut impl DbTxn, block_number: u64, data: &SenderScanData) { + /* + SerializedForwardedOutputsIndex: (block_number: u64) -> Vec, + SerializedForwardedOutput: (output_id: &[u8]) -> Vec, + + pub(crate) fn save_output_being_forwarded( + txn: &mut impl DbTxn, + block_forwarded_from: u64, + output: &OutputWithInInstruction, + ) { + let mut buf = Vec::with_capacity(128); + output.write(&mut buf).unwrap(); + + let id = output.output.id(); + + // Save this to an index so we can later fetch all outputs to forward + let mut forwarded_outputs = SerializedForwardedOutputsIndex::get(txn, block_forwarded_from) + .unwrap_or(Vec::with_capacity(32)); + forwarded_outputs.extend(id.as_ref()); + SerializedForwardedOutputsIndex::set(txn, block_forwarded_from, &forwarded_outputs); + + // Save the output itself + SerializedForwardedOutput::set(txn, id.as_ref(), &buf); + } + */ + + ScannedBlock::send(txn, (), todo!("TODO")); + } + pub(crate) fn recv_scan_data(txn: &mut impl DbTxn, block_number: u64) -> ReceiverScanData { + let data = + ScannedBlock::try_recv(txn, ()).expect("receiving data for a scanned block not yet sent"); + assert_eq!( + block_number, data.block_number, + "received data for a scanned block distinct than expected" + ); + let data = &data.data; + + todo!("TODO") + } } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 3d70d650..4f5fbe63 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -2,11 +2,13 @@ use group::GroupEncoding; use serai_db::{DbTxn, Db}; -use primitives::{OutputType, ReceivedOutput, Eventuality, Block}; +use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block}; // TODO: Localize to EventualityDb? use crate::{ - lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan, + lifetime::LifetimeStage, + db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb}, + BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, }; mod db; @@ -137,13 +139,12 @@ impl> ContinuallyRan for EventualityTas let mut txn = self.db.txn(); - // Fetch the External outputs we reported, and therefore should yield after handling this - // block - let mut outputs = ScannerDb::::in_instructions(&txn, b) - .expect("handling eventualities/outputs for block which didn't set its InInstructions") - .into_iter() - .map(|output| output.output) - .collect::>(); + // Fetch the data from the scanner + let scan_data = ScanToEventualityDb::recv_scan_data(&mut txn, b); + assert_eq!(scan_data.block_number, b); + let ReceiverScanData { block_number: _, received_external_outputs, forwards, returns } = + scan_data; + let mut outputs = received_external_outputs; for key in keys { let completed_eventualities = { @@ -184,17 +185,37 @@ impl> ContinuallyRan for EventualityTas } // Now, we iterate over all Forwarded outputs and queue their InInstructions - todo!("TODO"); + for output in + non_external_outputs.iter().filter(|output| output.kind() == OutputType::Forwarded) + { + let Some(eventuality) = completed_eventualities.get(&output.transaction_id()) else { + // Output sent to the forwarding address yet not actually forwarded + continue; + }; + let Some(forwarded) = eventuality.forwarded_output() else { + // This was a TX made by us, yet someone burned to the forwarding address + continue; + }; + + let (return_address, in_instruction) = + ScannerDb::::return_address_and_in_instruction_for_forwarded_output( + &txn, &forwarded, + ) + .expect("forwarded an output yet didn't save its InInstruction to the DB"); + ScannerDb::::queue_output_until_block( + &mut txn, + b + S::WINDOW_LENGTH, + &OutputWithInInstruction { output: output.clone(), return_address, in_instruction }, + ); + } // Accumulate all of these outputs outputs.extend(non_external_outputs); } - let outputs_to_return = ScannerDb::::take_queued_returns(&mut txn, b); - // TODO: This also has to intake Burns let new_eventualities = - self.scheduler.accumulate_outputs_and_return_outputs(&mut txn, outputs, outputs_to_return); + self.scheduler.update(&mut txn, SchedulerUpdate { outputs, forwards, returns }); for (key, new_eventualities) in new_eventualities { let key = { let mut key_repr = as GroupEncoding>::Repr::default(); diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index a29f1069..ef295471 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -148,17 +148,29 @@ type AddressFor = <::Block as Block>::Address; type OutputFor = <::Block as Block>::Output; type EventualityFor = <::Block as Block>::Eventuality; +/// A return to occur. +pub struct Return { + address: AddressFor, + output: OutputFor, +} + +/// An update for the scheduler. +pub struct SchedulerUpdate { + outputs: Vec>, + forwards: Vec>, + returns: Vec>, +} + /// The object responsible for accumulating outputs and planning new transactions. pub trait Scheduler: Send { /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for. /// - /// The `Vec` used as the key in the returned HashMap should be the encoded key these + /// The `Vec` used as the key in the returned HashMap should be the encoded key the /// Eventualities are for. - fn accumulate_outputs_and_return_outputs( + fn update( &mut self, txn: &mut impl DbTxn, - outputs: Vec>, - outputs_to_return: Vec>, + update: SchedulerUpdate, ) -> HashMap, Vec>>; } diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index 7e59c92d..d8312e3b 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -11,8 +11,8 @@ use primitives::{OutputType, ReceivedOutput, Block}; // TODO: Localize to ScanDb? use crate::{ lifetime::LifetimeStage, - db::{OutputWithInInstruction, ScannerDb}, - BlockExt, ScannerFeed, AddressFor, OutputFor, ContinuallyRan, + db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToEventualityDb}, + BlockExt, ScannerFeed, AddressFor, OutputFor, Return, ContinuallyRan, }; // Construct an InInstruction from an external output. @@ -86,6 +86,12 @@ impl ContinuallyRan for ScanForOutputsTask { let mut txn = self.db.txn(); + let mut scan_data = SenderScanData { + block_number: b, + received_external_outputs: vec![], + forwards: vec![], + returns: vec![], + }; let mut in_instructions = ScannerDb::::take_queued_outputs(&mut txn, b); // Scan for each key @@ -171,13 +177,21 @@ impl ContinuallyRan for ScanForOutputsTask { return_address, in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use }, }, - (Some(return_addr), None) => { + (Some(address), None) => { // Since there was no instruction here, return this since we parsed a return address - ScannerDb::::queue_return(&mut txn, b, &return_addr, &output); + if key.stage != LifetimeStage::Finishing { + scan_data.returns.push(Return { address, output }); + } + continue; + } + // Since we didn't receive an instruction nor can we return this, queue this for + // accumulation and move on + (None, None) => { + if key.stage != LifetimeStage::Finishing { + scan_data.received_external_outputs.push(output); + } continue; } - // Since we didn't receive an instruction nor can we return this, move on - (None, None) => continue, }; // Drop External outputs if they're to a multisig which won't report them @@ -201,7 +215,7 @@ impl ContinuallyRan for ScanForOutputsTask { LifetimeStage::Forwarding => { // When the forwarded output appears, we can see which Plan it's associated with and // from there recover this output - ScannerDb::::save_output_being_forwarded(&mut txn, b, &output_with_in_instruction); + scan_data.forwards.push(output_with_in_instruction); continue; } // We should drop these as we should not be handling new External outputs at this @@ -213,10 +227,13 @@ impl ContinuallyRan for ScanForOutputsTask { // Ensures we didn't miss a `continue` above assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange)); + scan_data.received_external_outputs.push(output_with_in_instruction.output.clone()); in_instructions.push(output_with_in_instruction); } } + // Save the outputs to return + ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data); // Save the in instructions ScannerDb::::set_in_instructions(&mut txn, b, in_instructions); // Update the next to scan block