diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ca0bd4f5..a6260579 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -45,6 +45,7 @@ jobs: -p serai-processor-scanner \ -p serai-processor-scheduler-primitives \ -p serai-processor-utxo-scheduler-primitives \ + -p serai-processor-utxo-scheduler \ -p serai-processor-transaction-chaining-scheduler \ -p serai-processor \ -p tendermint-machine \ diff --git a/Cargo.lock b/Cargo.lock index dd1cc19e..b3fa4e36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8733,11 +8733,27 @@ dependencies = [ "serai-processor-utxo-scheduler-primitives", ] +[[package]] +name = "serai-processor-utxo-scheduler" +version = "0.1.0" +dependencies = [ + "borsh", + "group", + "parity-scale-codec", + "serai-db", + "serai-primitives", + "serai-processor-primitives", + "serai-processor-scanner", + "serai-processor-scheduler-primitives", + "serai-processor-utxo-scheduler-primitives", +] + [[package]] name = "serai-processor-utxo-scheduler-primitives" version = "0.1.0" dependencies = [ "async-trait", + "borsh", "serai-primitives", "serai-processor-primitives", "serai-processor-scanner", diff --git a/Cargo.toml b/Cargo.toml index b61cde68..a2d86c82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ members = [ "processor/scanner", "processor/scheduler/primitives", "processor/scheduler/utxo/primitives", + "processor/scheduler/utxo/standard", "processor/scheduler/utxo/transaction-chaining", "processor", diff --git a/deny.toml b/deny.toml index 2ca0ca50..16d3cbea 100644 --- a/deny.toml +++ b/deny.toml @@ -52,6 +52,7 @@ exceptions = [ { allow = ["AGPL-3.0"], name = "serai-processor-scanner" }, { allow = ["AGPL-3.0"], name = "serai-processor-scheduler-primitives" }, { allow = ["AGPL-3.0"], name = "serai-processor-utxo-scheduler-primitives" }, + { allow = ["AGPL-3.0"], name = "serai-processor-standard-scheduler" }, { allow = ["AGPL-3.0"], name = "serai-processor-transaction-chaining-scheduler" }, { allow = ["AGPL-3.0"], name = "serai-processor" }, diff --git a/processor/primitives/src/output.rs b/processor/primitives/src/output.rs index d59b4fd0..cbfe59f3 100644 --- a/processor/primitives/src/output.rs +++ b/processor/primitives/src/output.rs @@ -3,12 +3,22 @@ use std::io; use group::GroupEncoding; +use borsh::{BorshSerialize, BorshDeserialize}; + use serai_primitives::{ExternalAddress, Balance}; use crate::Id; /// An address on the external network. -pub trait Address: Send + Sync + Clone + Into + TryFrom { +pub trait Address: + Send + + Sync + + Clone + + Into + + TryFrom + + BorshSerialize + + BorshDeserialize +{ /// Write this address. fn write(&self, writer: &mut impl io::Write) -> io::Result<()>; /// Read an address. diff --git a/processor/primitives/src/payment.rs b/processor/primitives/src/payment.rs index bf3c918c..67a5bbad 100644 --- a/processor/primitives/src/payment.rs +++ b/processor/primitives/src/payment.rs @@ -1,6 +1,7 @@ use std::io; use scale::{Encode, Decode, IoReader}; +use borsh::{BorshSerialize, BorshDeserialize}; use serai_primitives::{Balance, Data}; use serai_coins_primitives::OutInstructionWithBalance; @@ -8,7 +9,7 @@ use serai_coins_primitives::OutInstructionWithBalance; use crate::Address; /// A payment to fulfill. 
-#[derive(Clone)] +#[derive(Clone, BorshSerialize, BorshDeserialize)] pub struct Payment { address: A, balance: Balance, diff --git a/processor/scheduler/primitives/Cargo.toml b/processor/scheduler/primitives/Cargo.toml index 31d73853..cdf12cbb 100644 --- a/processor/scheduler/primitives/Cargo.toml +++ b/processor/scheduler/primitives/Cargo.toml @@ -13,6 +13,9 @@ publish = false all-features = true rustdoc-args = ["--cfg", "docsrs"] +[package.metadata.cargo-machete] +ignored = ["scale", "borsh"] + [lints] workspace = true diff --git a/processor/scheduler/utxo/primitives/Cargo.toml b/processor/scheduler/utxo/primitives/Cargo.toml index 4f2499f9..85935ae0 100644 --- a/processor/scheduler/utxo/primitives/Cargo.toml +++ b/processor/scheduler/utxo/primitives/Cargo.toml @@ -19,6 +19,8 @@ workspace = true [dependencies] async-trait = { version = "0.1", default-features = false } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + serai-primitives = { path = "../../../../substrate/primitives", default-features = false, features = ["std"] } primitives = { package = "serai-processor-primitives", path = "../../../primitives" } diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs index 81d5ebd7..274eb2a4 100644 --- a/processor/scheduler/utxo/primitives/src/lib.rs +++ b/processor/scheduler/utxo/primitives/src/lib.rs @@ -8,6 +8,9 @@ use primitives::{ReceivedOutput, Payment}; use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor}; use scheduler_primitives::*; +mod tree; +pub use tree::*; + /// A planned transaction. pub struct PlannedTransaction { /// The signable transaction. @@ -18,6 +21,23 @@ pub struct PlannedTransaction { pub auxilliary: A, } +/// A planned transaction which was created via amortizing the fee. +pub struct AmortizePlannedTransaction { + /// The amounts the included payments were worth. + /// + /// If the payments passed as an argument are sorted from highest to lowest valued, these `n` + /// amounts will be for the first `n` payments. + pub effected_payments: Vec, + /// Whether or not the planned transaction had a change output. + pub has_change: bool, + /// The signable transaction. + pub signable: ST, + /// The Eventuality to watch for. + pub eventuality: EventualityFor, + /// The auxilliary data for this transaction. + pub auxilliary: A, +} + /// An object able to plan a transaction. #[async_trait::async_trait] pub trait TransactionPlanner: 'static + Send + Sync { @@ -60,7 +80,8 @@ pub trait TransactionPlanner: 'static + Send + Sync { /// This must only require the same fee as would be returned by `calculate_fee`. The caller is /// trusted to maintain `sum(inputs) - sum(payments) >= if change.is_some() { DUST } else { 0 }`. /// - /// `change` will always be an address belonging to the Serai network. + /// `change` will always be an address belonging to the Serai network. If it is `Some`, a change + /// output must be created. 
fn plan( fee_rate: Self::FeeRate, inputs: Vec>, @@ -82,7 +103,7 @@ pub trait TransactionPlanner: 'static + Send + Sync { inputs: Vec>, mut payments: Vec>>, mut change: Option>, - ) -> Option> { + ) -> Option> { // If there's no change output, we can't recoup any operating costs we would amortize // We also don't have any losses if the inputs are written off/the change output is reduced let mut operating_costs_if_no_change = 0; @@ -192,6 +213,48 @@ pub trait TransactionPlanner: 'static + Send + Sync { } // Because we amortized, or accrued as operating costs, the fee, make the transaction - Some(Self::plan(fee_rate, inputs, payments, change)) + let effected_payments = payments.iter().map(|payment| payment.balance().amount).collect(); + let has_change = change.is_some(); + let PlannedTransaction { signable, eventuality, auxilliary } = + Self::plan(fee_rate, inputs, payments, change); + Some(AmortizePlannedTransaction { + effected_payments, + has_change, + signable, + eventuality, + auxilliary, + }) + } + + /// Create a tree to fulfill a set of payments. + /// + /// Returns a `TreeTransaction` whose children (and arbitrary children of children) fulfill all + /// these payments. This tree root will be able to be made with a change output. + fn tree(payments: &[Payment>]) -> TreeTransaction> { + // This variable is for the current layer of the tree being built + let mut tree = Vec::with_capacity(payments.len().div_ceil(Self::MAX_OUTPUTS)); + + // Push the branches for the leaves (the payments out) + for payments in payments.chunks(Self::MAX_OUTPUTS) { + let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); + tree.push(TreeTransaction::>::Leaves { payments: payments.to_vec(), value }); + } + + // While we haven't calculated a tree root, or the tree root doesn't support a change output, + // keep working + while (tree.len() != 1) || (tree[0].children() == Self::MAX_OUTPUTS) { + let mut branch_layer = vec![]; + for children in tree.chunks(Self::MAX_OUTPUTS) { + branch_layer.push(TreeTransaction::>::Branch { + children: children.to_vec(), + value: children.iter().map(TreeTransaction::value).sum(), + }); + } + tree = branch_layer; + } + assert_eq!(tree.len(), 1); + let tree_root = tree.remove(0); + assert!((tree_root.children() + 1) <= Self::MAX_OUTPUTS); + tree_root } } diff --git a/processor/scheduler/utxo/primitives/src/tree.rs b/processor/scheduler/utxo/primitives/src/tree.rs new file mode 100644 index 00000000..b52f3ba3 --- /dev/null +++ b/processor/scheduler/utxo/primitives/src/tree.rs @@ -0,0 +1,146 @@ +use borsh::{BorshSerialize, BorshDeserialize}; + +use serai_primitives::{Coin, Amount, Balance}; + +use primitives::{Address, Payment}; +use scanner::ScannerFeed; + +/// A transaction within a tree to fulfill payments. +#[derive(Clone, BorshSerialize, BorshDeserialize)] +pub enum TreeTransaction { + /// A transaction for the leaves (payments) of the tree. + Leaves { + /// The payments within this transaction. + payments: Vec>, + /// The sum value of the payments. + value: u64, + }, + /// A transaction for the branches of the tree. + Branch { + /// The child transactions. + children: Vec, + /// The sum value of the child transactions. + value: u64, + }, +} +impl TreeTransaction { + /// How many children this transaction has. + /// + /// A child is defined as any dependent, whether payment or transaction. + pub fn children(&self) -> usize { + match self { + Self::Leaves { payments, .. } => payments.len(), + Self::Branch { children, .. 
} => children.len(), + } + } + + /// The value this transaction wants to spend. + pub fn value(&self) -> u64 { + match self { + Self::Leaves { value, .. } | Self::Branch { value, .. } => *value, + } + } + + /// The payments to make to enable this transaction's children. + /// + /// A child is defined as any dependent, whether payment or transaction. + /// + /// The input value given to this transaction MUST be less than or equal to the desired value. + /// The difference will be amortized over all dependents. + /// + /// Returns None if no payments should be made. Returns Some containing a non-empty Vec if any + /// payments should be made. + pub fn payments( + &self, + coin: Coin, + branch_address: &A, + input_value: u64, + ) -> Option>> { + // Fetch the amounts for the payments we'll make + let mut amounts: Vec<_> = match self { + Self::Leaves { payments, .. } => payments + .iter() + .map(|payment| { + assert_eq!(payment.balance().coin, coin); + Some(payment.balance().amount.0) + }) + .collect(), + Self::Branch { children, .. } => children.iter().map(|child| Some(child.value())).collect(), + }; + + // We need to reduce them so their sum is our input value + assert!(input_value <= self.value()); + let amount_to_amortize = self.value() - input_value; + + // If any payments won't survive the reduction, set them to None + let mut amortized = 0; + 'outer: while amounts.iter().any(Option::is_some) && (amortized < amount_to_amortize) { + let adjusted_fee = amount_to_amortize - amortized; + let amounts_len = + u64::try_from(amounts.iter().filter(|amount| amount.is_some()).count()).unwrap(); + let per_payment_fee_check = adjusted_fee.div_ceil(amounts_len); + + // Check each amount to see if it's not viable + let mut i = 0; + while i < amounts.len() { + if let Some(amount) = amounts[i] { + if amount.saturating_sub(per_payment_fee_check) < S::dust(coin).0 { + amounts[i] = None; + amortized += amount; + // If this amount wasn't viable, re-run with the new fee/amortization amounts + continue 'outer; + } + } + i += 1; + } + + // Now that we have the payments which will survive, reduce them + for (i, amount) in amounts.iter_mut().enumerate() { + if let Some(amount) = amount { + *amount -= adjusted_fee / amounts_len; + if i < usize::try_from(adjusted_fee % amounts_len).unwrap() { + *amount -= 1; + } + } + } + break; + } + + // Now that we have the reduced amounts, create the payments + let payments: Vec<_> = match self { + Self::Leaves { payments, .. } => { + payments + .iter() + .zip(amounts) + .filter_map(|(payment, amount)| { + amount.map(|amount| { + // The existing payment, with the new amount + Payment::new( + payment.address().clone(), + Balance { coin, amount: Amount(amount) }, + payment.data().clone(), + ) + }) + }) + .collect() + } + Self::Branch { .. 
} => { + amounts + .into_iter() + .filter_map(|amount| { + amount.map(|amount| { + // A branch output with the new amount + Payment::new(branch_address.clone(), Balance { coin, amount: Amount(amount) }, None) + }) + }) + .collect() + } + }; + + // Use None for vec![] so we never actually use vec![] + if payments.is_empty() { + None?; + } + Some(payments) + } +} diff --git a/processor/scheduler/utxo/standard/Cargo.toml b/processor/scheduler/utxo/standard/Cargo.toml new file mode 100644 index 00000000..d6c16161 --- /dev/null +++ b/processor/scheduler/utxo/standard/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "serai-processor-utxo-scheduler" +version = "0.1.0" +description = "Scheduler for UTXO networks for the Serai processor" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/processor/scheduler/utxo/standard" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[package.metadata.cargo-machete] +ignored = ["scale", "borsh"] + +[lints] +workspace = true + +[dependencies] +group = { version = "0.13", default-features = false } + +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +serai-primitives = { path = "../../../../substrate/primitives", default-features = false, features = ["std"] } + +serai-db = { path = "../../../../common/db" } + +primitives = { package = "serai-processor-primitives", path = "../../../primitives" } +scanner = { package = "serai-processor-scanner", path = "../../../scanner" } +scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" } +utxo-scheduler-primitives = { package = "serai-processor-utxo-scheduler-primitives", path = "../primitives" } diff --git a/processor/scheduler/utxo/standard/LICENSE b/processor/scheduler/utxo/standard/LICENSE new file mode 100644 index 00000000..e091b149 --- /dev/null +++ b/processor/scheduler/utxo/standard/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2024 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/processor/scheduler/utxo/standard/README.md b/processor/scheduler/utxo/standard/README.md new file mode 100644 index 00000000..8e5360f0 --- /dev/null +++ b/processor/scheduler/utxo/standard/README.md @@ -0,0 +1,17 @@ +# UTXO Scheduler + +A scheduler of transactions for networks premised on the UTXO model. + +### Design + +The scheduler is designed to achieve fulfillment of all expected payments with +an `O(1)` delay (regardless of prior scheduler state), `O(log n)` time, and +`O(log(n) + n)` computational complexity. + +For the time/computational complexity, we use a tree to fulfill payments. 
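An illustrative aside on the `O(log n)` claim: `TransactionPlanner::tree` builds the tree by repeatedly grouping up to `MAX_OUTPUTS` children per transaction, so the number of layers (and therefore the chain of transactions which must confirm on-chain) grows logarithmically in the number of payments. The sketch below only counts those layers; `tree_depth` and `max_outputs` are hypothetical stand-ins for the planner's constants, and it ignores the patch's additional requirement that the root leave room for a change output.

fn tree_depth(n_payments: usize, max_outputs: usize) -> usize {
  assert!(max_outputs >= 2);
  // One layer of leaf transactions, each handling up to `max_outputs` payments
  let mut nodes = n_payments.div_ceil(max_outputs);
  let mut layers = 1;
  // Add branch layers until a single root transaction remains
  while nodes > 1 {
    nodes = nodes.div_ceil(max_outputs);
    layers += 1;
  }
  layers
}

fn main() {
  // e.g. 1000 payments with 16 outputs per transaction fit within a 3-layer tree
  assert_eq!(tree_depth(1000, 16), 3);
}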
+This quickly gives us the ability to make as many outputs as necessary +(regardless of per-transaction output limits) and only has the latency of +including a chain of `O(log n)` transactions on-chain. The only computational +overhead is in creating the transactions which are branches in the tree. +Since we split off the root of the tree from a master output, the delay to start +fulfillment is the delay for the master output to re-appear on-chain. diff --git a/processor/scheduler/utxo/standard/src/db.rs b/processor/scheduler/utxo/standard/src/db.rs new file mode 100644 index 00000000..00761595 --- /dev/null +++ b/processor/scheduler/utxo/standard/src/db.rs @@ -0,0 +1,113 @@ +use core::marker::PhantomData; + +use group::GroupEncoding; + +use serai_primitives::{Coin, Amount, Balance}; + +use borsh::BorshDeserialize; +use serai_db::{Get, DbTxn, create_db, db_channel}; + +use primitives::{Payment, ReceivedOutput}; +use utxo_scheduler_primitives::TreeTransaction; +use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor}; + +create_db! { + UtxoScheduler { + OperatingCosts: (coin: Coin) -> Amount, + SerializedOutputs: (key: &[u8], coin: Coin) -> Vec, + SerializedQueuedPayments: (key: &[u8], coin: Coin) -> Vec, + } +} + +db_channel! { + UtxoScheduler { + PendingBranch: (key: &[u8], balance: Balance) -> Vec, + } +} + +pub(crate) struct Db(PhantomData); +impl Db { + pub(crate) fn operating_costs(getter: &impl Get, coin: Coin) -> Amount { + OperatingCosts::get(getter, coin).unwrap_or(Amount(0)) + } + pub(crate) fn set_operating_costs(txn: &mut impl DbTxn, coin: Coin, amount: Amount) { + OperatingCosts::set(txn, coin, &amount) + } + + pub(crate) fn outputs( + getter: &impl Get, + key: KeyFor, + coin: Coin, + ) -> Option>> { + let buf = SerializedOutputs::get(getter, key.to_bytes().as_ref(), coin)?; + let mut buf = buf.as_slice(); + + let mut res = Vec::with_capacity(buf.len() / 128); + while !buf.is_empty() { + res.push(OutputFor::::read(&mut buf).unwrap()); + } + Some(res) + } + pub(crate) fn set_outputs( + txn: &mut impl DbTxn, + key: KeyFor, + coin: Coin, + outputs: &[OutputFor], + ) { + let mut buf = Vec::with_capacity(outputs.len() * 128); + for output in outputs { + output.write(&mut buf).unwrap(); + } + SerializedOutputs::set(txn, key.to_bytes().as_ref(), coin, &buf); + } + pub(crate) fn del_outputs(txn: &mut impl DbTxn, key: KeyFor, coin: Coin) { + SerializedOutputs::del(txn, key.to_bytes().as_ref(), coin); + } + + pub(crate) fn queued_payments( + getter: &impl Get, + key: KeyFor, + coin: Coin, + ) -> Option>>> { + let buf = SerializedQueuedPayments::get(getter, key.to_bytes().as_ref(), coin)?; + let mut buf = buf.as_slice(); + + let mut res = Vec::with_capacity(buf.len() / 128); + while !buf.is_empty() { + res.push(Payment::read(&mut buf).unwrap()); + } + Some(res) + } + pub(crate) fn set_queued_payments( + txn: &mut impl DbTxn, + key: KeyFor, + coin: Coin, + queued: &[Payment>], + ) { + let mut buf = Vec::with_capacity(queued.len() * 128); + for queued in queued { + queued.write(&mut buf).unwrap(); + } + SerializedQueuedPayments::set(txn, key.to_bytes().as_ref(), coin, &buf); + } + pub(crate) fn del_queued_payments(txn: &mut impl DbTxn, key: KeyFor, coin: Coin) { + SerializedQueuedPayments::del(txn, key.to_bytes().as_ref(), coin); + } + + pub(crate) fn queue_pending_branch( + txn: &mut impl DbTxn, + key: KeyFor, + balance: Balance, + child: &TreeTransaction>, + ) { + PendingBranch::send(txn, key.to_bytes().as_ref(), balance, &borsh::to_vec(child).unwrap()) + } + pub(crate) fn 
take_pending_branch( + txn: &mut impl DbTxn, + key: KeyFor, + balance: Balance, + ) -> Option>> { + PendingBranch::try_recv(txn, key.to_bytes().as_ref(), balance) + .map(|bytes| TreeTransaction::>::deserialize(&mut bytes.as_slice()).unwrap()) + } +} diff --git a/processor/scheduler/utxo/standard/src/lib.rs b/processor/scheduler/utxo/standard/src/lib.rs new file mode 100644 index 00000000..f69ca54b --- /dev/null +++ b/processor/scheduler/utxo/standard/src/lib.rs @@ -0,0 +1,508 @@ +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![doc = include_str!("../README.md")] +#![deny(missing_docs)] + +use core::marker::PhantomData; +use std::collections::HashMap; + +use group::GroupEncoding; + +use serai_primitives::{Coin, Amount, Balance}; + +use serai_db::DbTxn; + +use primitives::{ReceivedOutput, Payment}; +use scanner::{ + LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor, + SchedulerUpdate, Scheduler as SchedulerTrait, +}; +use scheduler_primitives::*; +use utxo_scheduler_primitives::*; + +mod db; +use db::Db; + +/// A scheduler of transactions for networks premised on the UTXO model. +pub struct Scheduler>(PhantomData, PhantomData
); + +impl> Scheduler { + fn aggregate_inputs( + txn: &mut impl DbTxn, + block: &BlockFor, + key_for_change: KeyFor, + key: KeyFor, + coin: Coin, + ) -> Vec> { + let mut eventualities = vec![]; + + let mut operating_costs = Db::::operating_costs(txn, coin).0; + let mut outputs = Db::::outputs(txn, key, coin).unwrap(); + outputs.sort_by_key(|output| output.balance().amount.0); + while outputs.len() > P::MAX_INPUTS { + let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::>(); + + let Some(planned) = P::plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + to_aggregate, + vec![], + Some(key_for_change), + ) else { + continue; + }; + + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + } + + Db::::set_outputs(txn, key, coin, &outputs); + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + eventualities + } + + fn fulfillable_payments( + txn: &mut impl DbTxn, + operating_costs: &mut u64, + key: KeyFor, + coin: Coin, + value_of_outputs: u64, + ) -> Vec>> { + // Fetch all payments for this key + let mut payments = Db::::queued_payments(txn, key, coin).unwrap(); + if payments.is_empty() { + return vec![]; + } + + loop { + // inputs must be >= (payments - operating costs) + // Accordingly, (inputs + operating costs) must be >= payments + let value_fulfillable = value_of_outputs + *operating_costs; + + // Drop to just the payments we can currently fulfill + { + let mut can_handle = 0; + let mut value_used = 0; + for payment in &payments { + value_used += payment.balance().amount.0; + if value_fulfillable < value_used { + break; + } + can_handle += 1; + } + + let remaining_payments = payments.drain(can_handle ..).collect::>(); + // Restore the rest to the database + Db::::set_queued_payments(txn, key, coin, &remaining_payments); + } + + // If these payments are worth less than the operating costs, immediately drop them + let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); + if payments_value <= *operating_costs { + *operating_costs -= payments_value; + Db::::set_operating_costs(txn, coin, Amount(*operating_costs)); + + // Reset payments to the queued payments + payments = Db::::queued_payments(txn, key, coin).unwrap(); + // If there's no more payments, stop looking for which payments we should fulfill + if payments.is_empty() { + return vec![]; + } + // Find which of these we should handle + continue; + } + + return payments; + } + } + + fn queue_branches( + txn: &mut impl DbTxn, + key: KeyFor, + coin: Coin, + effected_payments: Vec, + tx: TreeTransaction>, + ) { + match tx { + TreeTransaction::Leaves { .. } => {} + TreeTransaction::Branch { mut children, .. } => { + children.sort_by_key(TreeTransaction::value); + children.reverse(); + + /* + This may only be a subset of payments but it'll be the originally-highest-valued + payments. `zip` will truncate to the first children which will be the highest-valued + children thanks to our sort. 
+ */ + for (amount, child) in effected_payments.into_iter().zip(children) { + Db::::queue_pending_branch(txn, key, Balance { coin, amount }, &child); + } + } + } + } + + fn handle_branch( + txn: &mut impl DbTxn, + block: &BlockFor, + eventualities: &mut Vec>, + output: OutputFor, + tx: TreeTransaction>, + ) -> bool { + let key = output.key(); + let coin = output.balance().coin; + let Some(payments) = tx.payments::(coin, &P::branch_address(key), output.balance().amount.0) + else { + // If this output has become too small to satisfy this branch, drop it + return false; + }; + + let Some(planned) = P::plan_transaction_with_fee_amortization( + // Uses 0 as there's no operating costs to incur/amortize here + &mut 0, + P::fee_rate(block, coin), + vec![output], + payments, + None, + ) else { + // This Branch isn't viable, so drop it (and its children) + return false; + }; + + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + + Self::queue_branches(txn, key, coin, planned.effected_payments, tx); + + true + } + + fn step( + txn: &mut impl DbTxn, + active_keys: &[(KeyFor, LifetimeStage)], + block: &BlockFor, + key: KeyFor, + ) -> Vec> { + let mut eventualities = vec![]; + + let key_for_change = match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting => { + panic!("expected to fulfill payments despite not reporting for the oldest key") + } + LifetimeStage::Active => active_keys[0].0, + LifetimeStage::UsingNewForChange | LifetimeStage::Forwarding | LifetimeStage::Finishing => { + active_keys[1].0 + } + }; + let branch_address = P::branch_address(key); + + 'coin: for coin in S::NETWORK.coins() { + let coin = *coin; + + // Perform any input aggregation we should + eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin)); + + // Fetch the operating costs/outputs + let mut operating_costs = Db::::operating_costs(txn, coin).0; + let outputs = Db::::outputs(txn, key, coin).unwrap(); + + // Fetch the fulfillable payments + let payments = Self::fulfillable_payments( + txn, + &mut operating_costs, + key, + coin, + outputs.iter().map(|output| output.balance().amount.0).sum(), + ); + if payments.is_empty() { + continue; + } + + // Create a tree to fulfill the payments + let mut tree = vec![P::tree(&payments)]; + + // Create the transaction for the root of the tree + // Try creating this transaction twice, once with a change output and once with increased + // operating costs to ensure a change output (as necessary to meet the requirements of the + // scanner API) + let mut planned_outer = None; + for i in 0 .. 2 { + let Some(planned) = P::plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + outputs.clone(), + tree[0] + .payments::(coin, &branch_address, tree[0].value()) + .expect("payments were dropped despite providing an input of the needed value"), + Some(key_for_change), + ) else { + // This should trip on the first iteration or not at all + assert_eq!(i, 0); + // This doesn't have inputs even worth aggregating so drop the entire tree + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + continue 'coin; + }; + + // If this doesn't have a change output, increase operating costs and try again + if !planned.has_change { + /* + Since we'll create a change output if it's worth at least dust, amortizing dust from + the payments should solve this. 
If the new transaction can't afford those operating + costs, then the payments should be amortized out, causing there to be a change or no + transaction at all. + */ + operating_costs += S::dust(coin).0; + continue; + } + + // Since this had a change output, move forward with it + planned_outer = Some(planned); + break; + } + let Some(planned) = planned_outer else { + panic!("couldn't create a tree root with a change output") + }; + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + + // Now save the next layer of the tree to the database + // We'll execute it when it appears + Self::queue_branches(txn, key, coin, planned.effected_payments, tree.remove(0)); + } + + eventualities + } + + fn flush_outputs( + txn: &mut impl DbTxn, + eventualities: &mut HashMap, Vec>>, + block: &BlockFor, + from: KeyFor, + to: KeyFor, + coin: Coin, + ) { + let from_bytes = from.to_bytes().as_ref().to_vec(); + // Ensure our inputs are aggregated + eventualities + .entry(from_bytes.clone()) + .or_insert(vec![]) + .append(&mut Self::aggregate_inputs(txn, block, to, from, coin)); + + // Now that our inputs are aggregated, transfer all of them to the new key + let mut operating_costs = Db::::operating_costs(txn, coin).0; + let outputs = Db::::outputs(txn, from, coin).unwrap(); + if outputs.is_empty() { + return; + } + let planned = P::plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + outputs, + vec![], + Some(to), + ); + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + let Some(planned) = planned else { return }; + + TransactionsToSign::::send(txn, &from, &planned.signable); + eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality); + } +} + +impl> SchedulerTrait for Scheduler { + fn activate_key(txn: &mut impl DbTxn, key: KeyFor) { + for coin in S::NETWORK.coins() { + assert!(Db::::outputs(txn, key, *coin).is_none()); + Db::::set_outputs(txn, key, *coin, &[]); + assert!(Db::::queued_payments(txn, key, *coin).is_none()); + Db::::set_queued_payments(txn, key, *coin, &[]); + } + } + + fn flush_key( + txn: &mut impl DbTxn, + block: &BlockFor, + retiring_key: KeyFor, + new_key: KeyFor, + ) -> HashMap, Vec>> { + let mut eventualities = HashMap::new(); + for coin in S::NETWORK.coins() { + // Move the payments to the new key + { + let still_queued = Db::::queued_payments(txn, retiring_key, *coin).unwrap(); + let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); + + let mut queued = still_queued; + queued.append(&mut new_queued); + + Db::::set_queued_payments(txn, retiring_key, *coin, &[]); + Db::::set_queued_payments(txn, new_key, *coin, &queued); + } + + // Move the outputs to the new key + Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin); + } + eventualities + } + + fn retire_key(txn: &mut impl DbTxn, key: KeyFor) { + for coin in S::NETWORK.coins() { + assert!(Db::::outputs(txn, key, *coin).unwrap().is_empty()); + Db::::del_outputs(txn, key, *coin); + assert!(Db::::queued_payments(txn, key, *coin).unwrap().is_empty()); + Db::::del_queued_payments(txn, key, *coin); + } + } + + fn update( + txn: &mut impl DbTxn, + block: &BlockFor, + active_keys: &[(KeyFor, LifetimeStage)], + update: SchedulerUpdate, + ) -> HashMap, Vec>> { + let mut eventualities = HashMap::new(); + + // Accumulate the new outputs + { + let mut outputs_by_key = HashMap::new(); + for output in update.outputs() { + 
// If this aligns for a branch, handle it + if let Some(branch) = Db::::take_pending_branch(txn, output.key(), output.balance()) { + if Self::handle_branch( + txn, + block, + eventualities.entry(output.key().to_bytes().as_ref().to_vec()).or_insert(vec![]), + output.clone(), + branch, + ) { + // If we could use it for a branch, we do and move on + // Else, we let it be accumulated by the standard accumulation code + continue; + } + } + + let coin = output.balance().coin; + outputs_by_key + // Index by key and coin + .entry((output.key().to_bytes().as_ref().to_vec(), coin)) + // If we haven't accumulated here prior, read the outputs from the database + .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) + .1 + .push(output.clone()); + } + // Write the outputs back to the database + for ((_key_vec, coin), (key, outputs)) in outputs_by_key { + Db::::set_outputs(txn, key, coin, &outputs); + } + } + + // Fulfill the payments we prior couldn't + for (key, _stage) in active_keys { + eventualities + .entry(key.to_bytes().as_ref().to_vec()) + .or_insert(vec![]) + .append(&mut Self::step(txn, active_keys, block, *key)); + } + + // If this key has been flushed, forward all outputs + match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting | + LifetimeStage::Active | + LifetimeStage::UsingNewForChange => {} + LifetimeStage::Forwarding | LifetimeStage::Finishing => { + for coin in S::NETWORK.coins() { + Self::flush_outputs( + txn, + &mut eventualities, + block, + active_keys[0].0, + active_keys[1].0, + *coin, + ); + } + } + } + + // Create the transactions for the forwards/burns + { + let mut planned_txs = vec![]; + for forward in update.forwards() { + let key = forward.key(); + + assert_eq!(active_keys.len(), 2); + assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); + assert_eq!(active_keys[1].1, LifetimeStage::Active); + let forward_to_key = active_keys[1].0; + + let Some(plan) = P::plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be forwarded, we simply drop it + &mut 0, + P::fee_rate(block, forward.balance().coin), + vec![forward.clone()], + vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], + None, + ) else { + continue; + }; + planned_txs.push((key, plan)); + } + for to_return in update.returns() { + let key = to_return.output().key(); + let out_instruction = + Payment::new(to_return.address().clone(), to_return.output().balance(), None); + let Some(plan) = P::plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be returned, we simply drop it + &mut 0, + P::fee_rate(block, out_instruction.balance().coin), + vec![to_return.output().clone()], + vec![out_instruction], + None, + ) else { + continue; + }; + planned_txs.push((key, plan)); + } + + for (key, planned_tx) in planned_txs { + // Send the transactions off for signing + TransactionsToSign::::send(txn, &key, &planned_tx.signable); + + // Insert the Eventualities into the result + eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality); + } + + eventualities + } + } + + fn fulfill( + txn: &mut impl DbTxn, + block: &BlockFor, + active_keys: &[(KeyFor, LifetimeStage)], + payments: Vec>>, + ) -> HashMap, Vec>> { + // Find the key to filfill these payments with + let fulfillment_key = match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting => 
{ + panic!("expected to fulfill payments despite not reporting for the oldest key") + } + LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0, + LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0, + }; + + // Queue the payments for this key + for coin in S::NETWORK.coins() { + let mut queued_payments = Db::::queued_payments(txn, fulfillment_key, *coin).unwrap(); + queued_payments + .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); + Db::::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments); + } + + // Handle the queued payments + HashMap::from([( + fulfillment_key.to_bytes().as_ref().to_vec(), + Self::step(txn, active_keys, block, fulfillment_key), + )]) + } +} diff --git a/processor/scheduler/utxo/transaction-chaining/README.md b/processor/scheduler/utxo/transaction-chaining/README.md index 0788ff53..a129b669 100644 --- a/processor/scheduler/utxo/transaction-chaining/README.md +++ b/processor/scheduler/utxo/transaction-chaining/README.md @@ -9,7 +9,7 @@ to build and sign a transaction spending it. The scheduler is designed to achieve fulfillment of all expected payments with an `O(1)` delay (regardless of prior scheduler state), `O(log n)` time, and -`O(n)` computational complexity. +`O(log(n) + n)` computational complexity. Due to the ability to chain transactions, we can immediately plan/sign dependent transactions. For the time/computational complexity, we use a tree to fulfill diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index 321d4b60..9a4ed2eb 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use group::GroupEncoding; -use serai_primitives::{Coin, Amount, Balance}; +use serai_primitives::{Coin, Amount}; use serai_db::DbTxn; @@ -22,114 +22,6 @@ use utxo_scheduler_primitives::*; mod db; use db::Db; -#[derive(Clone)] -enum TreeTransaction { - Leaves { payments: Vec>>, value: u64 }, - Branch { children: Vec, value: u64 }, -} -impl TreeTransaction { - fn children(&self) -> usize { - match self { - Self::Leaves { payments, .. } => payments.len(), - Self::Branch { children, .. } => children.len(), - } - } - fn value(&self) -> u64 { - match self { - Self::Leaves { value, .. } | Self::Branch { value, .. } => *value, - } - } - fn payments( - &self, - coin: Coin, - branch_address: &AddressFor, - input_value: u64, - ) -> Option>>> { - // Fetch the amounts for the payments we'll make - let mut amounts: Vec<_> = match self { - Self::Leaves { payments, .. } => { - payments.iter().map(|payment| Some(payment.balance().amount.0)).collect() - } - Self::Branch { children, .. 
} => children.iter().map(|child| Some(child.value())).collect(), - }; - - // We need to reduce them so their sum is our input value - assert!(input_value <= self.value()); - let amount_to_amortize = self.value() - input_value; - - // If any payments won't survive the reduction, set them to None - let mut amortized = 0; - 'outer: while amounts.iter().any(Option::is_some) && (amortized < amount_to_amortize) { - let adjusted_fee = amount_to_amortize - amortized; - let amounts_len = - u64::try_from(amounts.iter().filter(|amount| amount.is_some()).count()).unwrap(); - let per_payment_fee_check = adjusted_fee.div_ceil(amounts_len); - - // Check each amount to see if it's not viable - let mut i = 0; - while i < amounts.len() { - if let Some(amount) = amounts[i] { - if amount.saturating_sub(per_payment_fee_check) < S::dust(coin).0 { - amounts[i] = None; - amortized += amount; - // If this amount wasn't viable, re-run with the new fee/amortization amounts - continue 'outer; - } - } - i += 1; - } - - // Now that we have the payments which will survive, reduce them - for (i, amount) in amounts.iter_mut().enumerate() { - if let Some(amount) = amount { - *amount -= adjusted_fee / amounts_len; - if i < usize::try_from(adjusted_fee % amounts_len).unwrap() { - *amount -= 1; - } - } - } - break; - } - - // Now that we have the reduced amounts, create the payments - let payments: Vec<_> = match self { - Self::Leaves { payments, .. } => { - payments - .iter() - .zip(amounts) - .filter_map(|(payment, amount)| { - amount.map(|amount| { - // The existing payment, with the new amount - Payment::new( - payment.address().clone(), - Balance { coin, amount: Amount(amount) }, - payment.data().clone(), - ) - }) - }) - .collect() - } - Self::Branch { .. } => { - amounts - .into_iter() - .filter_map(|amount| { - amount.map(|amount| { - // A branch output with the new amount - Payment::new(branch_address.clone(), Balance { coin, amount: Amount(amount) }, None) - }) - }) - .collect() - } - }; - - // Use None for vec![] so we never actually use vec![] - if payments.is_empty() { - None?; - } - Some(payments) - } -} - /// The outputs which will be effected by a PlannedTransaction and received by Serai. 
pub struct EffectedReceivedOutputs(Vec>); @@ -306,30 +198,8 @@ impl>> Sched assert!(Db::::queued_payments(txn, key, coin).unwrap().is_empty()); } - // Create a tree to fulfillthe payments - // This variable is for the current layer of the tree being built - let mut tree = Vec::with_capacity(payments.len().div_ceil(P::MAX_OUTPUTS)); - - // Push the branches for the leaves (the payments out) - for payments in payments.chunks(P::MAX_OUTPUTS) { - let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); - tree.push(TreeTransaction::::Leaves { payments: payments.to_vec(), value }); - } - - // While we haven't calculated a tree root, or the tree root doesn't support a change output, - // keep working - while (tree.len() != 1) || (tree[0].children() == P::MAX_OUTPUTS) { - let mut branch_layer = vec![]; - for children in tree.chunks(P::MAX_OUTPUTS) { - branch_layer.push(TreeTransaction::::Branch { - children: children.to_vec(), - value: children.iter().map(TreeTransaction::value).sum(), - }); - } - tree = branch_layer; - } - assert_eq!(tree.len(), 1); - assert!((tree[0].children() + 1) <= P::MAX_OUTPUTS); + // Create a tree to fulfill the payments + let mut tree = vec![P::tree(&payments)]; // Create the transaction for the root of the tree let mut branch_outputs = { @@ -343,7 +213,7 @@ impl>> Sched P::fee_rate(block, coin), outputs.clone(), tree[0] - .payments(coin, &branch_address, tree[0].value()) + .payments::(coin, &branch_address, tree[0].value()) .expect("payments were dropped despite providing an input of the needed value"), Some(key_for_change), ) else { @@ -355,7 +225,7 @@ impl>> Sched }; // If this doesn't have a change output, increase operating costs and try again - if !planned.auxilliary.0.iter().any(|output| output.kind() == OutputType::Change) { + if !planned.has_change { /* Since we'll create a change output if it's worth at least dust, amortizing dust from the payments should solve this. If the new transaction can't afford those operating @@ -399,11 +269,13 @@ impl>> Sched TreeTransaction::Branch { children, .. } => children, }; while !tree.is_empty() { - // Sort the branch outputs by their value + // Sort the branch outputs by their value (high to low) branch_outputs.sort_by_key(|a| a.balance().amount.0); + branch_outputs.reverse(); // Sort the transactions we should create by their value so they share an order with the // branch outputs tree.sort_by_key(TreeTransaction::value); + tree.reverse(); // If we dropped any Branch outputs, drop the associated children tree.truncate(branch_outputs.len()); @@ -417,7 +289,8 @@ impl>> Sched for (branch_output, tx) in branch_outputs_for_this_layer.into_iter().zip(this_layer) { assert_eq!(branch_output.kind(), OutputType::Branch); - let Some(payments) = tx.payments(coin, &branch_address, branch_output.balance().amount.0) + let Some(payments) = + tx.payments::(coin, &branch_address, branch_output.balance().amount.0) else { // If this output has become too small to satisfy this branch, drop it continue; @@ -550,8 +423,9 @@ impl>> Sched // Fulfill the payments we prior couldn't let mut eventualities = HashMap::new(); for (key, _stage) in active_keys { - eventualities - .insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key)); + assert!(eventualities + .insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key)) + .is_none()); } // If this key has been flushed, forward all outputs
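A self-contained sketch of the fee-amortization rule this patch centralizes in `TreeTransaction::payments` (previously inlined in the transaction-chaining scheduler), reduced to plain integers. `amortize` and its parameters are illustrative names rather than APIs from this codebase; the logic mirrors the patch's loop: a payment which cannot cover an even share of the fee and still clear the dust threshold is dropped (its full value counts as amortized), and the remaining fee is split evenly across the survivors, with any remainder charged to the earliest entries.

fn amortize(mut amounts: Vec<Option<u64>>, fee_to_amortize: u64, dust: u64) -> Vec<Option<u64>> {
  let mut amortized = 0;
  'outer: while amounts.iter().any(Option::is_some) && (amortized < fee_to_amortize) {
    let adjusted_fee = fee_to_amortize - amortized;
    let survivors = amounts.iter().filter(|amount| amount.is_some()).count() as u64;
    let per_payment_fee_check = adjusted_fee.div_ceil(survivors);

    // Drop any payment which couldn't pay this share and still clear the dust threshold,
    // counting its whole value as amortized, then recalculate from scratch
    for amount in &mut amounts {
      if let Some(value) = *amount {
        if value.saturating_sub(per_payment_fee_check) < dust {
          *amount = None;
          amortized += value;
          continue 'outer;
        }
      }
    }

    // Every remaining payment survives: charge the even share, with the remainder assigned by
    // index as in the patch's reduction loop
    for (i, amount) in amounts.iter_mut().enumerate() {
      if let Some(value) = amount {
        *value -= adjusted_fee / survivors;
        if (i as u64) < (adjusted_fee % survivors) {
          *value -= 1;
        }
      }
    }
    break;
  }
  amounts
}

fn main() {
  // Payments worth 100/50/5, a fee of 10 to amortize, and a dust limit of 10: the 5-value
  // payment is dropped (covering half the fee) and the remaining 5 is charged as 3 against
  // the first payment and 2 against the second.
  assert_eq!(amortize(vec![Some(100), Some(50), Some(5)], 10, 10), vec![Some(97), Some(48), None]);
}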