mirror of
https://github.com/serai-dex/serai.git
synced 2024-12-23 12:09:37 +00:00
Add a DbChannel between scan and eventuality task
This commit is contained in:
parent
2bddf00222
commit
6196642beb
4 changed files with 168 additions and 61 deletions
|
@ -3,13 +3,13 @@ use std::io;
|
||||||
|
|
||||||
use scale::Encode;
|
use scale::Encode;
|
||||||
use borsh::{BorshSerialize, BorshDeserialize};
|
use borsh::{BorshSerialize, BorshDeserialize};
|
||||||
use serai_db::{Get, DbTxn, create_db};
|
use serai_db::{Get, DbTxn, create_db, db_channel};
|
||||||
|
|
||||||
use serai_in_instructions_primitives::InInstructionWithBalance;
|
use serai_in_instructions_primitives::InInstructionWithBalance;
|
||||||
|
|
||||||
use primitives::{ReceivedOutput, BorshG};
|
use primitives::{ReceivedOutput, BorshG};
|
||||||
|
|
||||||
use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor};
|
use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return};
|
||||||
|
|
||||||
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
|
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
|
||||||
trait Borshy: BorshSerialize + BorshDeserialize {}
|
trait Borshy: BorshSerialize + BorshDeserialize {}
|
||||||
|
@ -76,8 +76,6 @@ create_db!(
|
||||||
NotableBlock: (number: u64) -> (),
|
NotableBlock: (number: u64) -> (),
|
||||||
|
|
||||||
SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
|
SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
|
||||||
SerializedForwardedOutputsIndex: (block_number: u64) -> Vec<u8>,
|
|
||||||
SerializedForwardedOutput: (output_id: &[u8]) -> Vec<u8>,
|
|
||||||
SerializedOutputs: (block_number: u64) -> Vec<u8>,
|
SerializedOutputs: (block_number: u64) -> Vec<u8>,
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
@ -209,15 +207,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
|
||||||
todo!("TODO")
|
todo!("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn queue_return(
|
|
||||||
txn: &mut impl DbTxn,
|
|
||||||
block_queued_from: u64,
|
|
||||||
return_addr: &AddressFor<S>,
|
|
||||||
output: &OutputFor<S>,
|
|
||||||
) {
|
|
||||||
todo!("TODO")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn queue_output_until_block(
|
pub(crate) fn queue_output_until_block(
|
||||||
txn: &mut impl DbTxn,
|
txn: &mut impl DbTxn,
|
||||||
queue_for_block: u64,
|
queue_for_block: u64,
|
||||||
|
@ -229,26 +218,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
|
||||||
SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
|
SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn save_output_being_forwarded(
|
|
||||||
txn: &mut impl DbTxn,
|
|
||||||
block_forwarded_from: u64,
|
|
||||||
output: &OutputWithInInstruction<S>,
|
|
||||||
) {
|
|
||||||
let mut buf = Vec::with_capacity(128);
|
|
||||||
output.write(&mut buf).unwrap();
|
|
||||||
|
|
||||||
let id = output.output.id();
|
|
||||||
|
|
||||||
// Save this to an index so we can later fetch all outputs to forward
|
|
||||||
let mut forwarded_outputs = SerializedForwardedOutputsIndex::get(txn, block_forwarded_from)
|
|
||||||
.unwrap_or(Vec::with_capacity(32));
|
|
||||||
forwarded_outputs.extend(id.as_ref());
|
|
||||||
SerializedForwardedOutputsIndex::set(txn, block_forwarded_from, &forwarded_outputs);
|
|
||||||
|
|
||||||
// Save the output itself
|
|
||||||
SerializedForwardedOutput::set(txn, id.as_ref(), &buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) {
|
pub(crate) fn flag_notable(txn: &mut impl DbTxn, block_number: u64) {
|
||||||
assert!(
|
assert!(
|
||||||
NextToPotentiallyReportBlock::get(txn).unwrap() <= block_number,
|
NextToPotentiallyReportBlock::get(txn).unwrap() <= block_number,
|
||||||
|
@ -287,11 +256,99 @@ impl<S: ScannerFeed> ScannerDb<S> {
|
||||||
NotableBlock::get(getter, number).is_some()
|
NotableBlock::get(getter, number).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn take_queued_returns(txn: &mut impl DbTxn, block_number: u64) -> Vec<OutputFor<S>> {
|
|
||||||
todo!("TODO")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
|
pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 {
|
||||||
todo!("TODO")
|
todo!("TODO")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn return_address_and_in_instruction_for_forwarded_output(
|
||||||
|
getter: &impl Get,
|
||||||
|
output: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id,
|
||||||
|
) -> Option<(Option<AddressFor<S>>, InInstructionWithBalance)> {
|
||||||
|
todo!("TODO")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The data produced by scanning a block.
|
||||||
|
///
|
||||||
|
/// This is the sender's version which includes the forwarded outputs with their InInstructions,
|
||||||
|
/// which need to be saved to the database for later retrieval.
|
||||||
|
pub(crate) struct SenderScanData<S: ScannerFeed> {
|
||||||
|
/// The block number.
|
||||||
|
pub(crate) block_number: u64,
|
||||||
|
/// The received outputs which should be accumulated into the scheduler.
|
||||||
|
pub(crate) received_external_outputs: Vec<OutputFor<S>>,
|
||||||
|
/// The outputs which need to be forwarded.
|
||||||
|
pub(crate) forwards: Vec<OutputWithInInstruction<S>>,
|
||||||
|
/// The outputs which need to be returned.
|
||||||
|
pub(crate) returns: Vec<Return<S>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The data produced by scanning a block.
|
||||||
|
///
|
||||||
|
/// This is the receiver's version which doesn't include the forwarded outputs' InInstructions, as
|
||||||
|
/// the Eventuality task doesn't need it to process this block.
|
||||||
|
pub(crate) struct ReceiverScanData<S: ScannerFeed> {
|
||||||
|
/// The block number.
|
||||||
|
pub(crate) block_number: u64,
|
||||||
|
/// The received outputs which should be accumulated into the scheduler.
|
||||||
|
pub(crate) received_external_outputs: Vec<OutputFor<S>>,
|
||||||
|
/// The outputs which need to be forwarded.
|
||||||
|
pub(crate) forwards: Vec<OutputFor<S>>,
|
||||||
|
/// The outputs which need to be returned.
|
||||||
|
pub(crate) returns: Vec<Return<S>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(BorshSerialize, BorshDeserialize)]
|
||||||
|
pub(crate) struct SerializedScanData {
|
||||||
|
pub(crate) block_number: u64,
|
||||||
|
pub(crate) data: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
db_channel! {
|
||||||
|
ScannerScanEventuality {
|
||||||
|
ScannedBlock: (empty_key: ()) -> SerializedScanData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct ScanToEventualityDb<S: ScannerFeed>(PhantomData<S>);
|
||||||
|
impl<S: ScannerFeed> ScanToEventualityDb<S> {
|
||||||
|
pub(crate) fn send_scan_data(txn: &mut impl DbTxn, block_number: u64, data: &SenderScanData<S>) {
|
||||||
|
/*
|
||||||
|
SerializedForwardedOutputsIndex: (block_number: u64) -> Vec<u8>,
|
||||||
|
SerializedForwardedOutput: (output_id: &[u8]) -> Vec<u8>,
|
||||||
|
|
||||||
|
pub(crate) fn save_output_being_forwarded(
|
||||||
|
txn: &mut impl DbTxn,
|
||||||
|
block_forwarded_from: u64,
|
||||||
|
output: &OutputWithInInstruction<S>,
|
||||||
|
) {
|
||||||
|
let mut buf = Vec::with_capacity(128);
|
||||||
|
output.write(&mut buf).unwrap();
|
||||||
|
|
||||||
|
let id = output.output.id();
|
||||||
|
|
||||||
|
// Save this to an index so we can later fetch all outputs to forward
|
||||||
|
let mut forwarded_outputs = SerializedForwardedOutputsIndex::get(txn, block_forwarded_from)
|
||||||
|
.unwrap_or(Vec::with_capacity(32));
|
||||||
|
forwarded_outputs.extend(id.as_ref());
|
||||||
|
SerializedForwardedOutputsIndex::set(txn, block_forwarded_from, &forwarded_outputs);
|
||||||
|
|
||||||
|
// Save the output itself
|
||||||
|
SerializedForwardedOutput::set(txn, id.as_ref(), &buf);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
ScannedBlock::send(txn, (), todo!("TODO"));
|
||||||
|
}
|
||||||
|
pub(crate) fn recv_scan_data(txn: &mut impl DbTxn, block_number: u64) -> ReceiverScanData<S> {
|
||||||
|
let data =
|
||||||
|
ScannedBlock::try_recv(txn, ()).expect("receiving data for a scanned block not yet sent");
|
||||||
|
assert_eq!(
|
||||||
|
block_number, data.block_number,
|
||||||
|
"received data for a scanned block distinct than expected"
|
||||||
|
);
|
||||||
|
let data = &data.data;
|
||||||
|
|
||||||
|
todo!("TODO")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,11 +2,13 @@ use group::GroupEncoding;
|
||||||
|
|
||||||
use serai_db::{DbTxn, Db};
|
use serai_db::{DbTxn, Db};
|
||||||
|
|
||||||
use primitives::{OutputType, ReceivedOutput, Eventuality, Block};
|
use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block};
|
||||||
|
|
||||||
// TODO: Localize to EventualityDb?
|
// TODO: Localize to EventualityDb?
|
||||||
use crate::{
|
use crate::{
|
||||||
lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan,
|
lifetime::LifetimeStage,
|
||||||
|
db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb},
|
||||||
|
BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod db;
|
mod db;
|
||||||
|
@ -137,13 +139,12 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
|
||||||
|
|
||||||
let mut txn = self.db.txn();
|
let mut txn = self.db.txn();
|
||||||
|
|
||||||
// Fetch the External outputs we reported, and therefore should yield after handling this
|
// Fetch the data from the scanner
|
||||||
// block
|
let scan_data = ScanToEventualityDb::recv_scan_data(&mut txn, b);
|
||||||
let mut outputs = ScannerDb::<S>::in_instructions(&txn, b)
|
assert_eq!(scan_data.block_number, b);
|
||||||
.expect("handling eventualities/outputs for block which didn't set its InInstructions")
|
let ReceiverScanData { block_number: _, received_external_outputs, forwards, returns } =
|
||||||
.into_iter()
|
scan_data;
|
||||||
.map(|output| output.output)
|
let mut outputs = received_external_outputs;
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
for key in keys {
|
for key in keys {
|
||||||
let completed_eventualities = {
|
let completed_eventualities = {
|
||||||
|
@ -184,17 +185,37 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now, we iterate over all Forwarded outputs and queue their InInstructions
|
// Now, we iterate over all Forwarded outputs and queue their InInstructions
|
||||||
todo!("TODO");
|
for output in
|
||||||
|
non_external_outputs.iter().filter(|output| output.kind() == OutputType::Forwarded)
|
||||||
|
{
|
||||||
|
let Some(eventuality) = completed_eventualities.get(&output.transaction_id()) else {
|
||||||
|
// Output sent to the forwarding address yet not actually forwarded
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let Some(forwarded) = eventuality.forwarded_output() else {
|
||||||
|
// This was a TX made by us, yet someone burned to the forwarding address
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let (return_address, in_instruction) =
|
||||||
|
ScannerDb::<S>::return_address_and_in_instruction_for_forwarded_output(
|
||||||
|
&txn, &forwarded,
|
||||||
|
)
|
||||||
|
.expect("forwarded an output yet didn't save its InInstruction to the DB");
|
||||||
|
ScannerDb::<S>::queue_output_until_block(
|
||||||
|
&mut txn,
|
||||||
|
b + S::WINDOW_LENGTH,
|
||||||
|
&OutputWithInInstruction { output: output.clone(), return_address, in_instruction },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Accumulate all of these outputs
|
// Accumulate all of these outputs
|
||||||
outputs.extend(non_external_outputs);
|
outputs.extend(non_external_outputs);
|
||||||
}
|
}
|
||||||
|
|
||||||
let outputs_to_return = ScannerDb::<S>::take_queued_returns(&mut txn, b);
|
|
||||||
|
|
||||||
// TODO: This also has to intake Burns
|
// TODO: This also has to intake Burns
|
||||||
let new_eventualities =
|
let new_eventualities =
|
||||||
self.scheduler.accumulate_outputs_and_return_outputs(&mut txn, outputs, outputs_to_return);
|
self.scheduler.update(&mut txn, SchedulerUpdate { outputs, forwards, returns });
|
||||||
for (key, new_eventualities) in new_eventualities {
|
for (key, new_eventualities) in new_eventualities {
|
||||||
let key = {
|
let key = {
|
||||||
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
|
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
|
||||||
|
|
|
@ -148,17 +148,29 @@ type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address;
|
||||||
type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
|
type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
|
||||||
type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
|
type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
|
||||||
|
|
||||||
|
/// A return to occur.
|
||||||
|
pub struct Return<S: ScannerFeed> {
|
||||||
|
address: AddressFor<S>,
|
||||||
|
output: OutputFor<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An update for the scheduler.
|
||||||
|
pub struct SchedulerUpdate<S: ScannerFeed> {
|
||||||
|
outputs: Vec<OutputFor<S>>,
|
||||||
|
forwards: Vec<OutputFor<S>>,
|
||||||
|
returns: Vec<Return<S>>,
|
||||||
|
}
|
||||||
|
|
||||||
/// The object responsible for accumulating outputs and planning new transactions.
|
/// The object responsible for accumulating outputs and planning new transactions.
|
||||||
pub trait Scheduler<S: ScannerFeed>: Send {
|
pub trait Scheduler<S: ScannerFeed>: Send {
|
||||||
/// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
|
/// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for.
|
||||||
///
|
///
|
||||||
/// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key these
|
/// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key the
|
||||||
/// Eventualities are for.
|
/// Eventualities are for.
|
||||||
fn accumulate_outputs_and_return_outputs(
|
fn update(
|
||||||
&mut self,
|
&mut self,
|
||||||
txn: &mut impl DbTxn,
|
txn: &mut impl DbTxn,
|
||||||
outputs: Vec<OutputFor<S>>,
|
update: SchedulerUpdate<S>,
|
||||||
outputs_to_return: Vec<OutputFor<S>>,
|
|
||||||
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>;
|
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,8 +11,8 @@ use primitives::{OutputType, ReceivedOutput, Block};
|
||||||
// TODO: Localize to ScanDb?
|
// TODO: Localize to ScanDb?
|
||||||
use crate::{
|
use crate::{
|
||||||
lifetime::LifetimeStage,
|
lifetime::LifetimeStage,
|
||||||
db::{OutputWithInInstruction, ScannerDb},
|
db::{OutputWithInInstruction, SenderScanData, ScannerDb, ScanToEventualityDb},
|
||||||
BlockExt, ScannerFeed, AddressFor, OutputFor, ContinuallyRan,
|
BlockExt, ScannerFeed, AddressFor, OutputFor, Return, ContinuallyRan,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Construct an InInstruction from an external output.
|
// Construct an InInstruction from an external output.
|
||||||
|
@ -86,6 +86,12 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
|
||||||
|
|
||||||
let mut txn = self.db.txn();
|
let mut txn = self.db.txn();
|
||||||
|
|
||||||
|
let mut scan_data = SenderScanData {
|
||||||
|
block_number: b,
|
||||||
|
received_external_outputs: vec![],
|
||||||
|
forwards: vec![],
|
||||||
|
returns: vec![],
|
||||||
|
};
|
||||||
let mut in_instructions = ScannerDb::<S>::take_queued_outputs(&mut txn, b);
|
let mut in_instructions = ScannerDb::<S>::take_queued_outputs(&mut txn, b);
|
||||||
|
|
||||||
// Scan for each key
|
// Scan for each key
|
||||||
|
@ -171,13 +177,21 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
|
||||||
return_address,
|
return_address,
|
||||||
in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use },
|
in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use },
|
||||||
},
|
},
|
||||||
(Some(return_addr), None) => {
|
(Some(address), None) => {
|
||||||
// Since there was no instruction here, return this since we parsed a return address
|
// Since there was no instruction here, return this since we parsed a return address
|
||||||
ScannerDb::<S>::queue_return(&mut txn, b, &return_addr, &output);
|
if key.stage != LifetimeStage::Finishing {
|
||||||
|
scan_data.returns.push(Return { address, output });
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Since we didn't receive an instruction nor can we return this, queue this for
|
||||||
|
// accumulation and move on
|
||||||
|
(None, None) => {
|
||||||
|
if key.stage != LifetimeStage::Finishing {
|
||||||
|
scan_data.received_external_outputs.push(output);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Since we didn't receive an instruction nor can we return this, move on
|
|
||||||
(None, None) => continue,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Drop External outputs if they're to a multisig which won't report them
|
// Drop External outputs if they're to a multisig which won't report them
|
||||||
|
@ -201,7 +215,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
|
||||||
LifetimeStage::Forwarding => {
|
LifetimeStage::Forwarding => {
|
||||||
// When the forwarded output appears, we can see which Plan it's associated with and
|
// When the forwarded output appears, we can see which Plan it's associated with and
|
||||||
// from there recover this output
|
// from there recover this output
|
||||||
ScannerDb::<S>::save_output_being_forwarded(&mut txn, b, &output_with_in_instruction);
|
scan_data.forwards.push(output_with_in_instruction);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// We should drop these as we should not be handling new External outputs at this
|
// We should drop these as we should not be handling new External outputs at this
|
||||||
|
@ -213,10 +227,13 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
|
||||||
// Ensures we didn't miss a `continue` above
|
// Ensures we didn't miss a `continue` above
|
||||||
assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange));
|
assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange));
|
||||||
|
|
||||||
|
scan_data.received_external_outputs.push(output_with_in_instruction.output.clone());
|
||||||
in_instructions.push(output_with_in_instruction);
|
in_instructions.push(output_with_in_instruction);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Save the outputs to return
|
||||||
|
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
|
||||||
// Save the in instructions
|
// Save the in instructions
|
||||||
ScannerDb::<S>::set_in_instructions(&mut txn, b, in_instructions);
|
ScannerDb::<S>::set_in_instructions(&mut txn, b, in_instructions);
|
||||||
// Update the next to scan block
|
// Update the next to scan block
|
||||||
|
|
Loading…
Reference in a new issue