Add non-transaction-chaining scheduler

This commit is contained in:
Luke Parker 2024-09-04 03:54:12 -04:00
parent 0c1aec29bb
commit 6e9cb74022
17 changed files with 951 additions and 145 deletions

View file

@ -45,6 +45,7 @@ jobs:
-p serai-processor-scanner \
-p serai-processor-scheduler-primitives \
-p serai-processor-utxo-scheduler-primitives \
-p serai-processor-utxo-scheduler \
-p serai-processor-transaction-chaining-scheduler \
-p serai-processor \
-p tendermint-machine \

16
Cargo.lock generated
View file

@ -8733,11 +8733,27 @@ dependencies = [
"serai-processor-utxo-scheduler-primitives",
]
[[package]]
name = "serai-processor-utxo-scheduler"
version = "0.1.0"
dependencies = [
"borsh",
"group",
"parity-scale-codec",
"serai-db",
"serai-primitives",
"serai-processor-primitives",
"serai-processor-scanner",
"serai-processor-scheduler-primitives",
"serai-processor-utxo-scheduler-primitives",
]
[[package]]
name = "serai-processor-utxo-scheduler-primitives"
version = "0.1.0"
dependencies = [
"async-trait",
"borsh",
"serai-primitives",
"serai-processor-primitives",
"serai-processor-scanner",

View file

@ -77,6 +77,7 @@ members = [
"processor/scanner",
"processor/scheduler/primitives",
"processor/scheduler/utxo/primitives",
"processor/scheduler/utxo/standard",
"processor/scheduler/utxo/transaction-chaining",
"processor",

View file

@ -52,6 +52,7 @@ exceptions = [
{ allow = ["AGPL-3.0"], name = "serai-processor-scanner" },
{ allow = ["AGPL-3.0"], name = "serai-processor-scheduler-primitives" },
{ allow = ["AGPL-3.0"], name = "serai-processor-utxo-scheduler-primitives" },
{ allow = ["AGPL-3.0"], name = "serai-processor-standard-scheduler" },
{ allow = ["AGPL-3.0"], name = "serai-processor-transaction-chaining-scheduler" },
{ allow = ["AGPL-3.0"], name = "serai-processor" },

View file

@ -3,12 +3,22 @@ use std::io;
use group::GroupEncoding;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_primitives::{ExternalAddress, Balance};
use crate::Id;
/// An address on the external network.
pub trait Address: Send + Sync + Clone + Into<ExternalAddress> + TryFrom<ExternalAddress> {
pub trait Address:
Send
+ Sync
+ Clone
+ Into<ExternalAddress>
+ TryFrom<ExternalAddress>
+ BorshSerialize
+ BorshDeserialize
{
/// Write this address.
fn write(&self, writer: &mut impl io::Write) -> io::Result<()>;
/// Read an address.

View file

@ -1,6 +1,7 @@
use std::io;
use scale::{Encode, Decode, IoReader};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_primitives::{Balance, Data};
use serai_coins_primitives::OutInstructionWithBalance;
@ -8,7 +9,7 @@ use serai_coins_primitives::OutInstructionWithBalance;
use crate::Address;
/// A payment to fulfill.
#[derive(Clone)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub struct Payment<A: Address> {
address: A,
balance: Balance,

View file

@ -13,6 +13,9 @@ publish = false
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.cargo-machete]
ignored = ["scale", "borsh"]
[lints]
workspace = true

View file

@ -19,6 +19,8 @@ workspace = true
[dependencies]
async-trait = { version = "0.1", default-features = false }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-primitives = { path = "../../../../substrate/primitives", default-features = false, features = ["std"] }
primitives = { package = "serai-processor-primitives", path = "../../../primitives" }

View file

@ -8,6 +8,9 @@ use primitives::{ReceivedOutput, Payment};
use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor};
use scheduler_primitives::*;
mod tree;
pub use tree::*;
/// A planned transaction.
pub struct PlannedTransaction<S: ScannerFeed, ST: SignableTransaction, A> {
/// The signable transaction.
@ -18,6 +21,23 @@ pub struct PlannedTransaction<S: ScannerFeed, ST: SignableTransaction, A> {
pub auxilliary: A,
}
/// A planned transaction which was created via amortizing the fee.
pub struct AmortizePlannedTransaction<S: ScannerFeed, ST: SignableTransaction, A> {
/// The amounts the included payments were worth.
///
/// If the payments passed as an argument are sorted from highest to lowest valued, these `n`
/// amounts will be for the first `n` payments.
pub effected_payments: Vec<Amount>,
/// Whether or not the planned transaction had a change output.
pub has_change: bool,
/// The signable transaction.
pub signable: ST,
/// The Eventuality to watch for.
pub eventuality: EventualityFor<S>,
/// The auxilliary data for this transaction.
pub auxilliary: A,
}
/// An object able to plan a transaction.
#[async_trait::async_trait]
pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
@ -60,7 +80,8 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
/// This must only require the same fee as would be returned by `calculate_fee`. The caller is
/// trusted to maintain `sum(inputs) - sum(payments) >= if change.is_some() { DUST } else { 0 }`.
///
/// `change` will always be an address belonging to the Serai network.
/// `change` will always be an address belonging to the Serai network. If it is `Some`, a change
/// output must be created.
fn plan(
fee_rate: Self::FeeRate,
inputs: Vec<OutputFor<S>>,
@ -82,7 +103,7 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
inputs: Vec<OutputFor<S>>,
mut payments: Vec<Payment<AddressFor<S>>>,
mut change: Option<KeyFor<S>>,
) -> Option<PlannedTransaction<S, Self::SignableTransaction, A>> {
) -> Option<AmortizePlannedTransaction<S, Self::SignableTransaction, A>> {
// If there's no change output, we can't recoup any operating costs we would amortize
// We also don't have any losses if the inputs are written off/the change output is reduced
let mut operating_costs_if_no_change = 0;
@ -192,6 +213,48 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
}
// Because we amortized, or accrued as operating costs, the fee, make the transaction
Some(Self::plan(fee_rate, inputs, payments, change))
let effected_payments = payments.iter().map(|payment| payment.balance().amount).collect();
let has_change = change.is_some();
let PlannedTransaction { signable, eventuality, auxilliary } =
Self::plan(fee_rate, inputs, payments, change);
Some(AmortizePlannedTransaction {
effected_payments,
has_change,
signable,
eventuality,
auxilliary,
})
}
/// Create a tree to fulfill a set of payments.
///
/// Returns a `TreeTransaction` whose children (and arbitrary children of children) fulfill all
/// these payments. This tree root will be able to be made with a change output.
fn tree(payments: &[Payment<AddressFor<S>>]) -> TreeTransaction<AddressFor<S>> {
// This variable is for the current layer of the tree being built
let mut tree = Vec::with_capacity(payments.len().div_ceil(Self::MAX_OUTPUTS));
// Push the branches for the leaves (the payments out)
for payments in payments.chunks(Self::MAX_OUTPUTS) {
let value = payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
tree.push(TreeTransaction::<AddressFor<S>>::Leaves { payments: payments.to_vec(), value });
}
// While we haven't calculated a tree root, or the tree root doesn't support a change output,
// keep working
while (tree.len() != 1) || (tree[0].children() == Self::MAX_OUTPUTS) {
let mut branch_layer = vec![];
for children in tree.chunks(Self::MAX_OUTPUTS) {
branch_layer.push(TreeTransaction::<AddressFor<S>>::Branch {
children: children.to_vec(),
value: children.iter().map(TreeTransaction::value).sum(),
});
}
tree = branch_layer;
}
assert_eq!(tree.len(), 1);
let tree_root = tree.remove(0);
assert!((tree_root.children() + 1) <= Self::MAX_OUTPUTS);
tree_root
}
}

View file

@ -0,0 +1,146 @@
use borsh::{BorshSerialize, BorshDeserialize};
use serai_primitives::{Coin, Amount, Balance};
use primitives::{Address, Payment};
use scanner::ScannerFeed;
/// A transaction within a tree to fulfill payments.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub enum TreeTransaction<A: Address> {
/// A transaction for the leaves (payments) of the tree.
Leaves {
/// The payments within this transaction.
payments: Vec<Payment<A>>,
/// The sum value of the payments.
value: u64,
},
/// A transaction for the branches of the tree.
Branch {
/// The child transactions.
children: Vec<Self>,
/// The sum value of the child transactions.
value: u64,
},
}
impl<A: Address> TreeTransaction<A> {
/// How many children this transaction has.
///
/// A child is defined as any dependent, whether payment or transaction.
pub fn children(&self) -> usize {
match self {
Self::Leaves { payments, .. } => payments.len(),
Self::Branch { children, .. } => children.len(),
}
}
/// The value this transaction wants to spend.
pub fn value(&self) -> u64 {
match self {
Self::Leaves { value, .. } | Self::Branch { value, .. } => *value,
}
}
/// The payments to make to enable this transaction's children.
///
/// A child is defined as any dependent, whether payment or transaction.
///
/// The input value given to this transaction MUST be less than or equal to the desired value.
/// The difference will be amortized over all dependents.
///
/// Returns None if no payments should be made. Returns Some containing a non-empty Vec if any
/// payments should be made.
pub fn payments<S: ScannerFeed>(
&self,
coin: Coin,
branch_address: &A,
input_value: u64,
) -> Option<Vec<Payment<A>>> {
// Fetch the amounts for the payments we'll make
let mut amounts: Vec<_> = match self {
Self::Leaves { payments, .. } => payments
.iter()
.map(|payment| {
assert_eq!(payment.balance().coin, coin);
Some(payment.balance().amount.0)
})
.collect(),
Self::Branch { children, .. } => children.iter().map(|child| Some(child.value())).collect(),
};
// We need to reduce them so their sum is our input value
assert!(input_value <= self.value());
let amount_to_amortize = self.value() - input_value;
// If any payments won't survive the reduction, set them to None
let mut amortized = 0;
'outer: while amounts.iter().any(Option::is_some) && (amortized < amount_to_amortize) {
let adjusted_fee = amount_to_amortize - amortized;
let amounts_len =
u64::try_from(amounts.iter().filter(|amount| amount.is_some()).count()).unwrap();
let per_payment_fee_check = adjusted_fee.div_ceil(amounts_len);
// Check each amount to see if it's not viable
let mut i = 0;
while i < amounts.len() {
if let Some(amount) = amounts[i] {
if amount.saturating_sub(per_payment_fee_check) < S::dust(coin).0 {
amounts[i] = None;
amortized += amount;
// If this amount wasn't viable, re-run with the new fee/amortization amounts
continue 'outer;
}
}
i += 1;
}
// Now that we have the payments which will survive, reduce them
for (i, amount) in amounts.iter_mut().enumerate() {
if let Some(amount) = amount {
*amount -= adjusted_fee / amounts_len;
if i < usize::try_from(adjusted_fee % amounts_len).unwrap() {
*amount -= 1;
}
}
}
break;
}
// Now that we have the reduced amounts, create the payments
let payments: Vec<_> = match self {
Self::Leaves { payments, .. } => {
payments
.iter()
.zip(amounts)
.filter_map(|(payment, amount)| {
amount.map(|amount| {
// The existing payment, with the new amount
Payment::new(
payment.address().clone(),
Balance { coin, amount: Amount(amount) },
payment.data().clone(),
)
})
})
.collect()
}
Self::Branch { .. } => {
amounts
.into_iter()
.filter_map(|amount| {
amount.map(|amount| {
// A branch output with the new amount
Payment::new(branch_address.clone(), Balance { coin, amount: Amount(amount) }, None)
})
})
.collect()
}
};
// Use None for vec![] so we never actually use vec![]
if payments.is_empty() {
None?;
}
Some(payments)
}
}

View file

@ -0,0 +1,35 @@
[package]
name = "serai-processor-utxo-scheduler"
version = "0.1.0"
description = "Scheduler for UTXO networks for the Serai processor"
license = "AGPL-3.0-only"
repository = "https://github.com/serai-dex/serai/tree/develop/processor/scheduler/utxo/standard"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
keywords = []
edition = "2021"
publish = false
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.cargo-machete]
ignored = ["scale", "borsh"]
[lints]
workspace = true
[dependencies]
group = { version = "0.13", default-features = false }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-primitives = { path = "../../../../substrate/primitives", default-features = false, features = ["std"] }
serai-db = { path = "../../../../common/db" }
primitives = { package = "serai-processor-primitives", path = "../../../primitives" }
scanner = { package = "serai-processor-scanner", path = "../../../scanner" }
scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" }
utxo-scheduler-primitives = { package = "serai-processor-utxo-scheduler-primitives", path = "../primitives" }

View file

@ -0,0 +1,15 @@
AGPL-3.0-only license
Copyright (c) 2024 Luke Parker
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License Version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

View file

@ -0,0 +1,17 @@
# UTXO Scheduler
A scheduler of transactions for networks premised on the UTXO model.
### Design
The scheduler is designed to achieve fulfillment of all expected payments with
an `O(1)` delay (regardless of prior scheduler state), `O(log n)` time, and
`O(log(n) + n)` computational complexity.
For the time/computational complexity, we use a tree to fulfill payments.
This quickly gives us the ability to make as many outputs as necessary
(regardless of per-transaction output limits) and only has the latency of
including a chain of `O(log n)` transactions on-chain. The only computational
overhead is in creating the transactions which are branches in the tree.
Since we split off the root of the tree from a master output, the delay to start
fulfillment is the delay for the master output to re-appear on-chain.

View file

@ -0,0 +1,113 @@
use core::marker::PhantomData;
use group::GroupEncoding;
use serai_primitives::{Coin, Amount, Balance};
use borsh::BorshDeserialize;
use serai_db::{Get, DbTxn, create_db, db_channel};
use primitives::{Payment, ReceivedOutput};
use utxo_scheduler_primitives::TreeTransaction;
use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor};
create_db! {
UtxoScheduler {
OperatingCosts: (coin: Coin) -> Amount,
SerializedOutputs: (key: &[u8], coin: Coin) -> Vec<u8>,
SerializedQueuedPayments: (key: &[u8], coin: Coin) -> Vec<u8>,
}
}
db_channel! {
UtxoScheduler {
PendingBranch: (key: &[u8], balance: Balance) -> Vec<u8>,
}
}
pub(crate) struct Db<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> Db<S> {
pub(crate) fn operating_costs(getter: &impl Get, coin: Coin) -> Amount {
OperatingCosts::get(getter, coin).unwrap_or(Amount(0))
}
pub(crate) fn set_operating_costs(txn: &mut impl DbTxn, coin: Coin, amount: Amount) {
OperatingCosts::set(txn, coin, &amount)
}
pub(crate) fn outputs(
getter: &impl Get,
key: KeyFor<S>,
coin: Coin,
) -> Option<Vec<OutputFor<S>>> {
let buf = SerializedOutputs::get(getter, key.to_bytes().as_ref(), coin)?;
let mut buf = buf.as_slice();
let mut res = Vec::with_capacity(buf.len() / 128);
while !buf.is_empty() {
res.push(OutputFor::<S>::read(&mut buf).unwrap());
}
Some(res)
}
pub(crate) fn set_outputs(
txn: &mut impl DbTxn,
key: KeyFor<S>,
coin: Coin,
outputs: &[OutputFor<S>],
) {
let mut buf = Vec::with_capacity(outputs.len() * 128);
for output in outputs {
output.write(&mut buf).unwrap();
}
SerializedOutputs::set(txn, key.to_bytes().as_ref(), coin, &buf);
}
pub(crate) fn del_outputs(txn: &mut impl DbTxn, key: KeyFor<S>, coin: Coin) {
SerializedOutputs::del(txn, key.to_bytes().as_ref(), coin);
}
pub(crate) fn queued_payments(
getter: &impl Get,
key: KeyFor<S>,
coin: Coin,
) -> Option<Vec<Payment<AddressFor<S>>>> {
let buf = SerializedQueuedPayments::get(getter, key.to_bytes().as_ref(), coin)?;
let mut buf = buf.as_slice();
let mut res = Vec::with_capacity(buf.len() / 128);
while !buf.is_empty() {
res.push(Payment::read(&mut buf).unwrap());
}
Some(res)
}
pub(crate) fn set_queued_payments(
txn: &mut impl DbTxn,
key: KeyFor<S>,
coin: Coin,
queued: &[Payment<AddressFor<S>>],
) {
let mut buf = Vec::with_capacity(queued.len() * 128);
for queued in queued {
queued.write(&mut buf).unwrap();
}
SerializedQueuedPayments::set(txn, key.to_bytes().as_ref(), coin, &buf);
}
pub(crate) fn del_queued_payments(txn: &mut impl DbTxn, key: KeyFor<S>, coin: Coin) {
SerializedQueuedPayments::del(txn, key.to_bytes().as_ref(), coin);
}
pub(crate) fn queue_pending_branch(
txn: &mut impl DbTxn,
key: KeyFor<S>,
balance: Balance,
child: &TreeTransaction<AddressFor<S>>,
) {
PendingBranch::send(txn, key.to_bytes().as_ref(), balance, &borsh::to_vec(child).unwrap())
}
pub(crate) fn take_pending_branch(
txn: &mut impl DbTxn,
key: KeyFor<S>,
balance: Balance,
) -> Option<TreeTransaction<AddressFor<S>>> {
PendingBranch::try_recv(txn, key.to_bytes().as_ref(), balance)
.map(|bytes| TreeTransaction::<AddressFor<S>>::deserialize(&mut bytes.as_slice()).unwrap())
}
}

View file

@ -0,0 +1,508 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use core::marker::PhantomData;
use std::collections::HashMap;
use group::GroupEncoding;
use serai_primitives::{Coin, Amount, Balance};
use serai_db::DbTxn;
use primitives::{ReceivedOutput, Payment};
use scanner::{
LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor,
SchedulerUpdate, Scheduler as SchedulerTrait,
};
use scheduler_primitives::*;
use utxo_scheduler_primitives::*;
mod db;
use db::Db;
/// A scheduler of transactions for networks premised on the UTXO model.
pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, ()>>(PhantomData<S>, PhantomData<P>);
impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
fn aggregate_inputs(
txn: &mut impl DbTxn,
block: &BlockFor<S>,
key_for_change: KeyFor<S>,
key: KeyFor<S>,
coin: Coin,
) -> Vec<EventualityFor<S>> {
let mut eventualities = vec![];
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
let mut outputs = Db::<S>::outputs(txn, key, coin).unwrap();
outputs.sort_by_key(|output| output.balance().amount.0);
while outputs.len() > P::MAX_INPUTS {
let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::<Vec<_>>();
let Some(planned) = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
P::fee_rate(block, coin),
to_aggregate,
vec![],
Some(key_for_change),
) else {
continue;
};
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
eventualities.push(planned.eventuality);
}
Db::<S>::set_outputs(txn, key, coin, &outputs);
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
eventualities
}
fn fulfillable_payments(
txn: &mut impl DbTxn,
operating_costs: &mut u64,
key: KeyFor<S>,
coin: Coin,
value_of_outputs: u64,
) -> Vec<Payment<AddressFor<S>>> {
// Fetch all payments for this key
let mut payments = Db::<S>::queued_payments(txn, key, coin).unwrap();
if payments.is_empty() {
return vec![];
}
loop {
// inputs must be >= (payments - operating costs)
// Accordingly, (inputs + operating costs) must be >= payments
let value_fulfillable = value_of_outputs + *operating_costs;
// Drop to just the payments we can currently fulfill
{
let mut can_handle = 0;
let mut value_used = 0;
for payment in &payments {
value_used += payment.balance().amount.0;
if value_fulfillable < value_used {
break;
}
can_handle += 1;
}
let remaining_payments = payments.drain(can_handle ..).collect::<Vec<_>>();
// Restore the rest to the database
Db::<S>::set_queued_payments(txn, key, coin, &remaining_payments);
}
// If these payments are worth less than the operating costs, immediately drop them
let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
if payments_value <= *operating_costs {
*operating_costs -= payments_value;
Db::<S>::set_operating_costs(txn, coin, Amount(*operating_costs));
// Reset payments to the queued payments
payments = Db::<S>::queued_payments(txn, key, coin).unwrap();
// If there's no more payments, stop looking for which payments we should fulfill
if payments.is_empty() {
return vec![];
}
// Find which of these we should handle
continue;
}
return payments;
}
}
fn queue_branches(
txn: &mut impl DbTxn,
key: KeyFor<S>,
coin: Coin,
effected_payments: Vec<Amount>,
tx: TreeTransaction<AddressFor<S>>,
) {
match tx {
TreeTransaction::Leaves { .. } => {}
TreeTransaction::Branch { mut children, .. } => {
children.sort_by_key(TreeTransaction::value);
children.reverse();
/*
This may only be a subset of payments but it'll be the originally-highest-valued
payments. `zip` will truncate to the first children which will be the highest-valued
children thanks to our sort.
*/
for (amount, child) in effected_payments.into_iter().zip(children) {
Db::<S>::queue_pending_branch(txn, key, Balance { coin, amount }, &child);
}
}
}
}
fn handle_branch(
txn: &mut impl DbTxn,
block: &BlockFor<S>,
eventualities: &mut Vec<EventualityFor<S>>,
output: OutputFor<S>,
tx: TreeTransaction<AddressFor<S>>,
) -> bool {
let key = output.key();
let coin = output.balance().coin;
let Some(payments) = tx.payments::<S>(coin, &P::branch_address(key), output.balance().amount.0)
else {
// If this output has become too small to satisfy this branch, drop it
return false;
};
let Some(planned) = P::plan_transaction_with_fee_amortization(
// Uses 0 as there's no operating costs to incur/amortize here
&mut 0,
P::fee_rate(block, coin),
vec![output],
payments,
None,
) else {
// This Branch isn't viable, so drop it (and its children)
return false;
};
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
eventualities.push(planned.eventuality);
Self::queue_branches(txn, key, coin, planned.effected_payments, tx);
true
}
fn step(
txn: &mut impl DbTxn,
active_keys: &[(KeyFor<S>, LifetimeStage)],
block: &BlockFor<S>,
key: KeyFor<S>,
) -> Vec<EventualityFor<S>> {
let mut eventualities = vec![];
let key_for_change = match active_keys[0].1 {
LifetimeStage::ActiveYetNotReporting => {
panic!("expected to fulfill payments despite not reporting for the oldest key")
}
LifetimeStage::Active => active_keys[0].0,
LifetimeStage::UsingNewForChange | LifetimeStage::Forwarding | LifetimeStage::Finishing => {
active_keys[1].0
}
};
let branch_address = P::branch_address(key);
'coin: for coin in S::NETWORK.coins() {
let coin = *coin;
// Perform any input aggregation we should
eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin));
// Fetch the operating costs/outputs
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
let outputs = Db::<S>::outputs(txn, key, coin).unwrap();
// Fetch the fulfillable payments
let payments = Self::fulfillable_payments(
txn,
&mut operating_costs,
key,
coin,
outputs.iter().map(|output| output.balance().amount.0).sum(),
);
if payments.is_empty() {
continue;
}
// Create a tree to fulfill the payments
let mut tree = vec![P::tree(&payments)];
// Create the transaction for the root of the tree
// Try creating this transaction twice, once with a change output and once with increased
// operating costs to ensure a change output (as necessary to meet the requirements of the
// scanner API)
let mut planned_outer = None;
for i in 0 .. 2 {
let Some(planned) = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
P::fee_rate(block, coin),
outputs.clone(),
tree[0]
.payments::<S>(coin, &branch_address, tree[0].value())
.expect("payments were dropped despite providing an input of the needed value"),
Some(key_for_change),
) else {
// This should trip on the first iteration or not at all
assert_eq!(i, 0);
// This doesn't have inputs even worth aggregating so drop the entire tree
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
continue 'coin;
};
// If this doesn't have a change output, increase operating costs and try again
if !planned.has_change {
/*
Since we'll create a change output if it's worth at least dust, amortizing dust from
the payments should solve this. If the new transaction can't afford those operating
costs, then the payments should be amortized out, causing there to be a change or no
transaction at all.
*/
operating_costs += S::dust(coin).0;
continue;
}
// Since this had a change output, move forward with it
planned_outer = Some(planned);
break;
}
let Some(planned) = planned_outer else {
panic!("couldn't create a tree root with a change output")
};
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
eventualities.push(planned.eventuality);
// Now save the next layer of the tree to the database
// We'll execute it when it appears
Self::queue_branches(txn, key, coin, planned.effected_payments, tree.remove(0));
}
eventualities
}
fn flush_outputs(
txn: &mut impl DbTxn,
eventualities: &mut HashMap<Vec<u8>, Vec<EventualityFor<S>>>,
block: &BlockFor<S>,
from: KeyFor<S>,
to: KeyFor<S>,
coin: Coin,
) {
let from_bytes = from.to_bytes().as_ref().to_vec();
// Ensure our inputs are aggregated
eventualities
.entry(from_bytes.clone())
.or_insert(vec![])
.append(&mut Self::aggregate_inputs(txn, block, to, from, coin));
// Now that our inputs are aggregated, transfer all of them to the new key
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
let outputs = Db::<S>::outputs(txn, from, coin).unwrap();
if outputs.is_empty() {
return;
}
let planned = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
P::fee_rate(block, coin),
outputs,
vec![],
Some(to),
);
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
let Some(planned) = planned else { return };
TransactionsToSign::<P::SignableTransaction>::send(txn, &from, &planned.signable);
eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality);
}
}
impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> SchedulerTrait<S> for Scheduler<S, P> {
fn activate_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
for coin in S::NETWORK.coins() {
assert!(Db::<S>::outputs(txn, key, *coin).is_none());
Db::<S>::set_outputs(txn, key, *coin, &[]);
assert!(Db::<S>::queued_payments(txn, key, *coin).is_none());
Db::<S>::set_queued_payments(txn, key, *coin, &[]);
}
}
fn flush_key(
txn: &mut impl DbTxn,
block: &BlockFor<S>,
retiring_key: KeyFor<S>,
new_key: KeyFor<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
let mut eventualities = HashMap::new();
for coin in S::NETWORK.coins() {
// Move the payments to the new key
{
let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
let mut queued = still_queued;
queued.append(&mut new_queued);
Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]);
Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
}
// Move the outputs to the new key
Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin);
}
eventualities
}
fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
for coin in S::NETWORK.coins() {
assert!(Db::<S>::outputs(txn, key, *coin).unwrap().is_empty());
Db::<S>::del_outputs(txn, key, *coin);
assert!(Db::<S>::queued_payments(txn, key, *coin).unwrap().is_empty());
Db::<S>::del_queued_payments(txn, key, *coin);
}
}
fn update(
txn: &mut impl DbTxn,
block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)],
update: SchedulerUpdate<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
let mut eventualities = HashMap::new();
// Accumulate the new outputs
{
let mut outputs_by_key = HashMap::new();
for output in update.outputs() {
// If this aligns for a branch, handle it
if let Some(branch) = Db::<S>::take_pending_branch(txn, output.key(), output.balance()) {
if Self::handle_branch(
txn,
block,
eventualities.entry(output.key().to_bytes().as_ref().to_vec()).or_insert(vec![]),
output.clone(),
branch,
) {
// If we could use it for a branch, we do and move on
// Else, we let it be accumulated by the standard accumulation code
continue;
}
}
let coin = output.balance().coin;
outputs_by_key
// Index by key and coin
.entry((output.key().to_bytes().as_ref().to_vec(), coin))
// If we haven't accumulated here prior, read the outputs from the database
.or_insert_with(|| (output.key(), Db::<S>::outputs(txn, output.key(), coin).unwrap()))
.1
.push(output.clone());
}
// Write the outputs back to the database
for ((_key_vec, coin), (key, outputs)) in outputs_by_key {
Db::<S>::set_outputs(txn, key, coin, &outputs);
}
}
// Fulfill the payments we prior couldn't
for (key, _stage) in active_keys {
eventualities
.entry(key.to_bytes().as_ref().to_vec())
.or_insert(vec![])
.append(&mut Self::step(txn, active_keys, block, *key));
}
// If this key has been flushed, forward all outputs
match active_keys[0].1 {
LifetimeStage::ActiveYetNotReporting |
LifetimeStage::Active |
LifetimeStage::UsingNewForChange => {}
LifetimeStage::Forwarding | LifetimeStage::Finishing => {
for coin in S::NETWORK.coins() {
Self::flush_outputs(
txn,
&mut eventualities,
block,
active_keys[0].0,
active_keys[1].0,
*coin,
);
}
}
}
// Create the transactions for the forwards/burns
{
let mut planned_txs = vec![];
for forward in update.forwards() {
let key = forward.key();
assert_eq!(active_keys.len(), 2);
assert_eq!(active_keys[0].1, LifetimeStage::Forwarding);
assert_eq!(active_keys[1].1, LifetimeStage::Active);
let forward_to_key = active_keys[1].0;
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be forwarded, we simply drop it
&mut 0,
P::fee_rate(block, forward.balance().coin),
vec![forward.clone()],
vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)],
None,
) else {
continue;
};
planned_txs.push((key, plan));
}
for to_return in update.returns() {
let key = to_return.output().key();
let out_instruction =
Payment::new(to_return.address().clone(), to_return.output().balance(), None);
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be returned, we simply drop it
&mut 0,
P::fee_rate(block, out_instruction.balance().coin),
vec![to_return.output().clone()],
vec![out_instruction],
None,
) else {
continue;
};
planned_txs.push((key, plan));
}
for (key, planned_tx) in planned_txs {
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
// Insert the Eventualities into the result
eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality);
}
eventualities
}
}
fn fulfill(
txn: &mut impl DbTxn,
block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)],
payments: Vec<Payment<AddressFor<S>>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
// Find the key to filfill these payments with
let fulfillment_key = match active_keys[0].1 {
LifetimeStage::ActiveYetNotReporting => {
panic!("expected to fulfill payments despite not reporting for the oldest key")
}
LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
};
// Queue the payments for this key
for coin in S::NETWORK.coins() {
let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap();
queued_payments
.extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned());
Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
}
// Handle the queued payments
HashMap::from([(
fulfillment_key.to_bytes().as_ref().to_vec(),
Self::step(txn, active_keys, block, fulfillment_key),
)])
}
}

View file

@ -9,7 +9,7 @@ to build and sign a transaction spending it.
The scheduler is designed to achieve fulfillment of all expected payments with
an `O(1)` delay (regardless of prior scheduler state), `O(log n)` time, and
`O(n)` computational complexity.
`O(log(n) + n)` computational complexity.
Due to the ability to chain transactions, we can immediately plan/sign dependent
transactions. For the time/computational complexity, we use a tree to fulfill

View file

@ -7,7 +7,7 @@ use std::collections::HashMap;
use group::GroupEncoding;
use serai_primitives::{Coin, Amount, Balance};
use serai_primitives::{Coin, Amount};
use serai_db::DbTxn;
@ -22,114 +22,6 @@ use utxo_scheduler_primitives::*;
mod db;
use db::Db;
#[derive(Clone)]
enum TreeTransaction<S: ScannerFeed> {
Leaves { payments: Vec<Payment<AddressFor<S>>>, value: u64 },
Branch { children: Vec<Self>, value: u64 },
}
impl<S: ScannerFeed> TreeTransaction<S> {
fn children(&self) -> usize {
match self {
Self::Leaves { payments, .. } => payments.len(),
Self::Branch { children, .. } => children.len(),
}
}
fn value(&self) -> u64 {
match self {
Self::Leaves { value, .. } | Self::Branch { value, .. } => *value,
}
}
fn payments(
&self,
coin: Coin,
branch_address: &AddressFor<S>,
input_value: u64,
) -> Option<Vec<Payment<AddressFor<S>>>> {
// Fetch the amounts for the payments we'll make
let mut amounts: Vec<_> = match self {
Self::Leaves { payments, .. } => {
payments.iter().map(|payment| Some(payment.balance().amount.0)).collect()
}
Self::Branch { children, .. } => children.iter().map(|child| Some(child.value())).collect(),
};
// We need to reduce them so their sum is our input value
assert!(input_value <= self.value());
let amount_to_amortize = self.value() - input_value;
// If any payments won't survive the reduction, set them to None
let mut amortized = 0;
'outer: while amounts.iter().any(Option::is_some) && (amortized < amount_to_amortize) {
let adjusted_fee = amount_to_amortize - amortized;
let amounts_len =
u64::try_from(amounts.iter().filter(|amount| amount.is_some()).count()).unwrap();
let per_payment_fee_check = adjusted_fee.div_ceil(amounts_len);
// Check each amount to see if it's not viable
let mut i = 0;
while i < amounts.len() {
if let Some(amount) = amounts[i] {
if amount.saturating_sub(per_payment_fee_check) < S::dust(coin).0 {
amounts[i] = None;
amortized += amount;
// If this amount wasn't viable, re-run with the new fee/amortization amounts
continue 'outer;
}
}
i += 1;
}
// Now that we have the payments which will survive, reduce them
for (i, amount) in amounts.iter_mut().enumerate() {
if let Some(amount) = amount {
*amount -= adjusted_fee / amounts_len;
if i < usize::try_from(adjusted_fee % amounts_len).unwrap() {
*amount -= 1;
}
}
}
break;
}
// Now that we have the reduced amounts, create the payments
let payments: Vec<_> = match self {
Self::Leaves { payments, .. } => {
payments
.iter()
.zip(amounts)
.filter_map(|(payment, amount)| {
amount.map(|amount| {
// The existing payment, with the new amount
Payment::new(
payment.address().clone(),
Balance { coin, amount: Amount(amount) },
payment.data().clone(),
)
})
})
.collect()
}
Self::Branch { .. } => {
amounts
.into_iter()
.filter_map(|amount| {
amount.map(|amount| {
// A branch output with the new amount
Payment::new(branch_address.clone(), Balance { coin, amount: Amount(amount) }, None)
})
})
.collect()
}
};
// Use None for vec![] so we never actually use vec![]
if payments.is_empty() {
None?;
}
Some(payments)
}
}
/// The outputs which will be effected by a PlannedTransaction and received by Serai.
pub struct EffectedReceivedOutputs<S: ScannerFeed>(Vec<OutputFor<S>>);
@ -306,30 +198,8 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
assert!(Db::<S>::queued_payments(txn, key, coin).unwrap().is_empty());
}
// Create a tree to fulfillthe payments
// This variable is for the current layer of the tree being built
let mut tree = Vec::with_capacity(payments.len().div_ceil(P::MAX_OUTPUTS));
// Push the branches for the leaves (the payments out)
for payments in payments.chunks(P::MAX_OUTPUTS) {
let value = payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
tree.push(TreeTransaction::<S>::Leaves { payments: payments.to_vec(), value });
}
// While we haven't calculated a tree root, or the tree root doesn't support a change output,
// keep working
while (tree.len() != 1) || (tree[0].children() == P::MAX_OUTPUTS) {
let mut branch_layer = vec![];
for children in tree.chunks(P::MAX_OUTPUTS) {
branch_layer.push(TreeTransaction::<S>::Branch {
children: children.to_vec(),
value: children.iter().map(TreeTransaction::value).sum(),
});
}
tree = branch_layer;
}
assert_eq!(tree.len(), 1);
assert!((tree[0].children() + 1) <= P::MAX_OUTPUTS);
// Create a tree to fulfill the payments
let mut tree = vec![P::tree(&payments)];
// Create the transaction for the root of the tree
let mut branch_outputs = {
@ -343,7 +213,7 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
P::fee_rate(block, coin),
outputs.clone(),
tree[0]
.payments(coin, &branch_address, tree[0].value())
.payments::<S>(coin, &branch_address, tree[0].value())
.expect("payments were dropped despite providing an input of the needed value"),
Some(key_for_change),
) else {
@ -355,7 +225,7 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
};
// If this doesn't have a change output, increase operating costs and try again
if !planned.auxilliary.0.iter().any(|output| output.kind() == OutputType::Change) {
if !planned.has_change {
/*
Since we'll create a change output if it's worth at least dust, amortizing dust from
the payments should solve this. If the new transaction can't afford those operating
@ -399,11 +269,13 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
TreeTransaction::Branch { children, .. } => children,
};
while !tree.is_empty() {
// Sort the branch outputs by their value
// Sort the branch outputs by their value (high to low)
branch_outputs.sort_by_key(|a| a.balance().amount.0);
branch_outputs.reverse();
// Sort the transactions we should create by their value so they share an order with the
// branch outputs
tree.sort_by_key(TreeTransaction::value);
tree.reverse();
// If we dropped any Branch outputs, drop the associated children
tree.truncate(branch_outputs.len());
@ -417,7 +289,8 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
for (branch_output, tx) in branch_outputs_for_this_layer.into_iter().zip(this_layer) {
assert_eq!(branch_output.kind(), OutputType::Branch);
let Some(payments) = tx.payments(coin, &branch_address, branch_output.balance().amount.0)
let Some(payments) =
tx.payments::<S>(coin, &branch_address, branch_output.balance().amount.0)
else {
// If this output has become too small to satisfy this branch, drop it
continue;
@ -550,8 +423,9 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
// Fulfill the payments we prior couldn't
let mut eventualities = HashMap::new();
for (key, _stage) in active_keys {
eventualities
.insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key));
assert!(eventualities
.insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key))
.is_none());
}
// If this key has been flushed, forward all outputs