mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-25 20:16:01 +00:00
Flesh out report task
This commit is contained in:
parent
fd12cc0213
commit
d5d1fc3eea
5 changed files with 79 additions and 30 deletions
|
@ -6,12 +6,13 @@ use crate::{Id, Address, ReceivedOutput};
|
|||
|
||||
/// A block header from an external network.
|
||||
pub trait BlockHeader: Send + Sync + Sized + Clone + Debug {
|
||||
/// The type used to identify blocks.
|
||||
type Id: 'static + Id;
|
||||
/// The ID of this block.
|
||||
fn id(&self) -> Self::Id;
|
||||
///
|
||||
/// This is fixed to 32-bytes and is expected to be cryptographically binding with 128-bit
|
||||
/// security. This is not required to be the ID used natively by the external network.
|
||||
fn id(&self) -> [u8; 32];
|
||||
/// The ID of the parent block.
|
||||
fn parent(&self) -> Self::Id;
|
||||
fn parent(&self) -> [u8; 32];
|
||||
}
|
||||
|
||||
/// A block from an external network.
|
||||
|
@ -33,7 +34,7 @@ pub trait Block: Send + Sync + Sized + Clone + Debug {
|
|||
type Output: ReceivedOutput<Self::Key, Self::Address>;
|
||||
|
||||
/// The ID of this block.
|
||||
fn id(&self) -> <Self::Header as BlockHeader>::Id;
|
||||
fn id(&self) -> [u8; 32];
|
||||
|
||||
/// Scan all outputs within this block to find the outputs spendable by this key.
|
||||
fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output>;
|
||||
|
|
|
@ -11,7 +11,7 @@ use serai_in_instructions_primitives::InInstructionWithBalance;
|
|||
|
||||
use primitives::{Id, ReceivedOutput, Block, BorshG};
|
||||
|
||||
use crate::{lifetime::LifetimeStage, ScannerFeed, BlockIdFor, KeyFor, AddressFor, OutputFor};
|
||||
use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor};
|
||||
|
||||
// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
|
||||
trait Borshy: BorshSerialize + BorshDeserialize {}
|
||||
|
@ -46,8 +46,8 @@ impl<S: ScannerFeed> OutputWithInInstruction<S> {
|
|||
|
||||
create_db!(
|
||||
Scanner {
|
||||
BlockId: <I: Id>(number: u64) -> I,
|
||||
BlockNumber: <I: Id>(id: I) -> u64,
|
||||
BlockId: (number: u64) -> [u8; 32],
|
||||
BlockNumber: (id: [u8; 32]) -> u64,
|
||||
|
||||
ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>,
|
||||
|
||||
|
@ -91,14 +91,14 @@ create_db!(
|
|||
|
||||
pub(crate) struct ScannerDb<S: ScannerFeed>(PhantomData<S>);
|
||||
impl<S: ScannerFeed> ScannerDb<S> {
|
||||
pub(crate) fn set_block(txn: &mut impl DbTxn, number: u64, id: BlockIdFor<S>) {
|
||||
pub(crate) fn set_block(txn: &mut impl DbTxn, number: u64, id: [u8; 32]) {
|
||||
BlockId::set(txn, number, &id);
|
||||
BlockNumber::set(txn, id, &number);
|
||||
}
|
||||
pub(crate) fn block_id(getter: &impl Get, number: u64) -> Option<BlockIdFor<S>> {
|
||||
pub(crate) fn block_id(getter: &impl Get, number: u64) -> Option<[u8; 32]> {
|
||||
BlockId::get(getter, number)
|
||||
}
|
||||
pub(crate) fn block_number(getter: &impl Get, id: BlockIdFor<S>) -> Option<u64> {
|
||||
pub(crate) fn block_number(getter: &impl Get, id: [u8; 32]) -> Option<u64> {
|
||||
BlockNumber::get(getter, id)
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
|
|||
Some(keys)
|
||||
}
|
||||
|
||||
pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: BlockIdFor<S>) {
|
||||
pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) {
|
||||
assert!(
|
||||
LatestFinalizedBlock::get(txn).is_none(),
|
||||
"setting start block but prior set start block"
|
||||
|
@ -276,18 +276,18 @@ impl<S: ScannerFeed> ScannerDb<S> {
|
|||
SerializedForwardedOutput::set(txn, id.as_ref(), &buf);
|
||||
}
|
||||
|
||||
// TODO: Use a DbChannel here, and send the instructions to the report task and the outputs to
|
||||
// the eventuality task? That way this cleans up after itself
|
||||
pub(crate) fn set_in_instructions(
|
||||
txn: &mut impl DbTxn,
|
||||
block_number: u64,
|
||||
outputs: Vec<OutputWithInInstruction<S>>,
|
||||
) {
|
||||
if outputs.is_empty() {
|
||||
return;
|
||||
if !outputs.is_empty() {
|
||||
// Set this block as notable
|
||||
NotableBlock::set(txn, block_number, &());
|
||||
}
|
||||
|
||||
// Set this block as notable
|
||||
NotableBlock::set(txn, block_number, &());
|
||||
|
||||
let mut buf = Vec::with_capacity(outputs.len() * 128);
|
||||
for output in outputs {
|
||||
output.write(&mut buf).unwrap();
|
||||
|
@ -295,6 +295,13 @@ impl<S: ScannerFeed> ScannerDb<S> {
|
|||
SerializedOutputs::set(txn, block_number, &buf);
|
||||
}
|
||||
|
||||
pub(crate) fn in_instructions(
|
||||
getter: &impl Get,
|
||||
block_number: u64,
|
||||
) -> Option<Vec<OutputWithInInstruction<S>>> {
|
||||
todo!("TODO")
|
||||
}
|
||||
|
||||
pub(crate) fn is_block_notable(getter: &impl Get, number: u64) -> bool {
|
||||
NotableBlock::get(getter, number).is_some()
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ use core::{marker::PhantomData, fmt::Debug, time::Duration};
|
|||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use serai_primitives::{Coin, Amount};
|
||||
use serai_primitives::{NetworkId, Coin, Amount};
|
||||
use primitives::{ReceivedOutput, BlockHeader, Block};
|
||||
|
||||
// Logic for deciding where in its lifetime a multisig is.
|
||||
|
@ -24,6 +24,9 @@ mod report;
|
|||
/// This defines the primitive types used, along with various getters necessary for indexing.
|
||||
#[async_trait::async_trait]
|
||||
pub trait ScannerFeed: Send + Sync {
|
||||
/// The ID of the network being scanned for.
|
||||
const NETWORK: NetworkId;
|
||||
|
||||
/// The amount of confirmations a block must have to be considered finalized.
|
||||
///
|
||||
/// This value must be at least `1`.
|
||||
|
@ -84,7 +87,6 @@ pub trait ScannerFeed: Send + Sync {
|
|||
fn dust(&self, coin: Coin) -> Amount;
|
||||
}
|
||||
|
||||
type BlockIdFor<S> = <<<S as ScannerFeed>::Block as Block>::Header as BlockHeader>::Id;
|
||||
type KeyFor<S> = <<S as ScannerFeed>::Block as Block>::Key;
|
||||
type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address;
|
||||
type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
|
||||
|
|
|
@ -1,15 +1,20 @@
|
|||
/*
|
||||
We only report blocks once both tasks, scanning for received ouputs and eventualities, have
|
||||
processed the block. This ensures we've performed all ncessary options.
|
||||
*/
|
||||
|
||||
use scale::Encode;
|
||||
use serai_db::{Db, DbTxn};
|
||||
|
||||
use serai_primitives::BlockHash;
|
||||
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
|
||||
use primitives::{Id, OutputType, Block};
|
||||
|
||||
// TODO: Localize to ReportDb?
|
||||
use crate::{db::ScannerDb, ScannerFeed, ContinuallyRan};
|
||||
|
||||
/*
|
||||
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
|
||||
|
||||
We only report blocks once both tasks, scanning for received outputs and checking for resolved
|
||||
Eventualities, have processed the block. This ensures we know if this block is notable, and have
|
||||
the InInstructions for it.
|
||||
*/
|
||||
struct ReportTask<D: Db, S: ScannerFeed> {
|
||||
db: D,
|
||||
feed: S,
|
||||
|
@ -39,15 +44,49 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
|
|||
.expect("ReportTask run before writing the start block");
|
||||
|
||||
for b in next_to_potentially_report ..= highest_reportable {
|
||||
if ScannerDb::<S>::is_block_notable(&self.db, b) {
|
||||
let in_instructions = todo!("TODO");
|
||||
// TODO: Also pull the InInstructions from forwarding
|
||||
todo!("TODO: Make Batches, which requires handling Forwarded within this crate");
|
||||
let mut txn = self.db.txn();
|
||||
|
||||
if ScannerDb::<S>::is_block_notable(&txn, b) {
|
||||
let in_instructions = ScannerDb::<S>::in_instructions(&txn, b)
|
||||
.expect("reporting block which didn't set its InInstructions");
|
||||
|
||||
let network = S::NETWORK;
|
||||
let block_hash =
|
||||
ScannerDb::<S>::block_id(&txn, b).expect("reporting block we didn't save the ID for");
|
||||
let mut batch_id = ScannerDb::<S>::acquire_batch_id(txn);
|
||||
|
||||
// start with empty batch
|
||||
let mut batches =
|
||||
vec![Batch { network, id: batch_id, block: BlockHash(block_hash), instructions: vec![] }];
|
||||
|
||||
for instruction in in_instructions {
|
||||
let batch = batches.last_mut().unwrap();
|
||||
batch.instructions.push(instruction.in_instruction);
|
||||
|
||||
// check if batch is over-size
|
||||
if batch.encode().len() > MAX_BATCH_SIZE {
|
||||
// pop the last instruction so it's back in size
|
||||
let instruction = batch.instructions.pop().unwrap();
|
||||
|
||||
// bump the id for the new batch
|
||||
batch_id = ScannerDb::<S>::acquire_batch_id(txn);
|
||||
|
||||
// make a new batch with this instruction included
|
||||
batches.push(Batch {
|
||||
network,
|
||||
id: batch_id,
|
||||
block: BlockHash(block_hash),
|
||||
instructions: vec![instruction],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
todo!("TODO: Set/emit batches");
|
||||
}
|
||||
|
||||
let mut txn = self.db.txn();
|
||||
// Update the next to potentially report block
|
||||
ScannerDb::<S>::set_next_to_potentially_report_block(&mut txn, b + 1);
|
||||
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
|
|
|
@ -212,7 +212,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
|
|||
LifetimeStage::Forwarding => {
|
||||
// When the forwarded output appears, we can see which Plan it's associated with and
|
||||
// from there recover this output
|
||||
ScannerDb::<S>::save_output_being_forwarded(&mut txn, &output_with_in_instruction);
|
||||
ScannerDb::<S>::save_output_being_forwarded(&mut txn, b, &output_with_in_instruction);
|
||||
continue;
|
||||
}
|
||||
// We should drop these as we should not be handling new External outputs at this
|
||||
|
|
Loading…
Reference in a new issue