Expand primitives/scanner with niceties needed for the scheduler

This commit is contained in:
Luke Parker 2024-09-01 00:05:08 -04:00
parent bd277e7032
commit 6deb60513c
5 changed files with 75 additions and 13 deletions

View file

@ -8,7 +8,7 @@ use serai_primitives::{ExternalAddress, Balance};
use crate::Id;
/// An address on the external network.
pub trait Address: Send + Sync + Into<ExternalAddress> + TryFrom<ExternalAddress> {
pub trait Address: Send + Sync + Clone + Into<ExternalAddress> + TryFrom<ExternalAddress> {
/// Write this address.
fn write(&self, writer: &mut impl io::Write) -> io::Result<()>;
/// Read an address.

View file

@ -12,7 +12,7 @@ use crate::{
SeraiKey, OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
ScanToEventualityDb,
},
BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler,
BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, Payment, SchedulerUpdate, Scheduler,
sort_outputs,
scan::{next_to_scan_for_outputs_block, queue_output_until_block},
};
@ -165,7 +165,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
{
intaked_any = true;
let new_eventualities = self.scheduler.fulfill(&mut txn, &keys_with_stages, burns);
let new_eventualities = self.scheduler.fulfill(
&mut txn,
&keys_with_stages,
burns.into_iter().filter_map(|burn| Payment::try_from(burn).ok()).collect(),
);
intake_eventualities::<S>(&mut txn, new_eventualities);
}
txn.commit();
@ -291,7 +295,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
// Drop any outputs less than the dust limit
non_external_outputs.retain(|output| {
let balance = output.balance();
balance.amount.0 >= self.feed.dust(balance.coin).0
balance.amount.0 >= S::dust(balance.coin).0
});
/*

View file

@ -5,7 +5,7 @@ use group::GroupEncoding;
use serai_db::{Get, DbTxn, Db};
use serai_primitives::{NetworkId, Coin, Amount};
use serai_primitives::{NetworkId, Coin, Amount, Balance, Data};
use serai_in_instructions_primitives::Batch;
use serai_coins_primitives::OutInstructionWithBalance;
@ -143,7 +143,7 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone {
///
/// This MUST be constant. Serai MUST NOT create internal outputs worth less than this. This
/// SHOULD be a value worth handling at a human level.
fn dust(&self, coin: Coin) -> Amount;
fn dust(coin: Coin) -> Amount;
/// The cost to aggregate an input as of the specified block.
///
@ -155,10 +155,14 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone {
) -> Result<Amount, Self::EphemeralError>;
}
type KeyFor<S> = <<S as ScannerFeed>::Block as Block>::Key;
type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address;
type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
/// The key type for this ScannerFeed.
pub type KeyFor<S> = <<S as ScannerFeed>::Block as Block>::Key;
/// The address type for this ScannerFeed.
pub type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address;
/// The output type for this ScannerFeed.
pub type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
/// The eventuality type for this ScannerFeed.
pub type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
#[async_trait::async_trait]
pub trait BatchPublisher: 'static + Send + Sync {
@ -200,6 +204,55 @@ pub struct SchedulerUpdate<S: ScannerFeed> {
returns: Vec<Return<S>>,
}
impl<S: ScannerFeed> SchedulerUpdate<S> {
/// The outputs to accumulate.
pub fn outputs(&self) -> &[OutputFor<S>] {
&self.outputs
}
/// The outputs to forward to the latest multisig.
pub fn forwards(&self) -> &[OutputFor<S>] {
&self.forwards
}
/// The outputs to return.
pub fn returns(&self) -> &[Return<S>] {
&self.returns
}
}
/// A payment to fulfill.
#[derive(Clone)]
pub struct Payment<S: ScannerFeed> {
address: AddressFor<S>,
balance: Balance,
data: Option<Vec<u8>>,
}
impl<S: ScannerFeed> TryFrom<OutInstructionWithBalance> for Payment<S> {
type Error = ();
fn try_from(out_instruction_with_balance: OutInstructionWithBalance) -> Result<Self, ()> {
Ok(Payment {
address: out_instruction_with_balance.instruction.address.try_into().map_err(|_| ())?,
balance: out_instruction_with_balance.balance,
data: out_instruction_with_balance.instruction.data.map(Data::consume),
})
}
}
impl<S: ScannerFeed> Payment<S> {
/// The address to pay.
pub fn address(&self) -> &AddressFor<S> {
&self.address
}
/// The balance to transfer.
pub fn balance(&self) -> Balance {
self.balance
}
/// The data to associate with this payment.
pub fn data(&self) -> &Option<Vec<u8>> {
&self.data
}
}
/// The object responsible for accumulating outputs and planning new transactions.
pub trait Scheduler<S: ScannerFeed>: 'static + Send {
/// Activate a key.
@ -274,7 +327,7 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
&mut self,
txn: &mut impl DbTxn,
active_keys: &[(KeyFor<S>, LifetimeStage)],
payments: Vec<OutInstructionWithBalance>,
payments: Vec<Payment<S>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>;
}

View file

@ -215,7 +215,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
let balance = output.balance();
// We ensure it's over the dust limit to prevent people sending 1 satoshi from causing
// an invocation of a consensus/signing protocol
if balance.amount.0 >= self.feed.dust(balance.coin).0 {
if balance.amount.0 >= S::dust(balance.coin).0 {
ScannerGlobalDb::<S>::flag_notable_due_to_non_external_output(&mut txn, b);
}
continue;
@ -243,7 +243,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
balance.amount.0 -= 2 * cost_to_aggregate.0;
// Now, check it's still past the dust threshold
if balance.amount.0 < self.feed.dust(balance.coin).0 {
if balance.amount.0 < S::dust(balance.coin).0 {
continue;
}

View file

@ -138,6 +138,11 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
}
}
// Drop burns less than the dust
let burns = burns
.into_iter()
.filter(|burn| burn.balance.amount.0 >= S::dust(burn.balance.coin).0)
.collect::<Vec<_>>();
if !burns.is_empty() {
// We send these Burns as stemming from this block we just acknowledged
// This causes them to be acted on after we accumulate the outputs from this block