From 75b47070027985c423b14610166720a5bd859a20 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Tue, 3 Sep 2024 01:41:51 -0400
Subject: [PATCH] Add input aggregation in the transaction-chaining scheduler

Also handles some other miscellaneous work within it.
---
 Cargo.lock                                   |   1 +
 .../scheduler/utxo/primitives/Cargo.toml     |   1 +
 .../scheduler/utxo/primitives/src/lib.rs     |  23 +-
 .../utxo/transaction-chaining/Cargo.toml     |   2 +-
 .../utxo/transaction-chaining/src/db.rs      |  20 +-
 .../utxo/transaction-chaining/src/lib.rs     | 291 ++++++++++++++----
 6 files changed, 268 insertions(+), 70 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6e7ced07..dd1cc19e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8741,6 +8741,7 @@ dependencies = [
  "serai-primitives",
  "serai-processor-primitives",
  "serai-processor-scanner",
+ "serai-processor-scheduler-primitives",
 ]
 
 [[package]]
diff --git a/processor/scheduler/utxo/primitives/Cargo.toml b/processor/scheduler/utxo/primitives/Cargo.toml
index 01d3db7d..4f2499f9 100644
--- a/processor/scheduler/utxo/primitives/Cargo.toml
+++ b/processor/scheduler/utxo/primitives/Cargo.toml
@@ -23,3 +23,4 @@ serai-primitives = { path = "../../../../substrate/primitives", default-features
 
 primitives = { package = "serai-processor-primitives", path = "../../../primitives" }
 scanner = { package = "serai-processor-scanner", path = "../../../scanner" }
+scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" }
diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs
index 2c6da97b..f3e220b0 100644
--- a/processor/scheduler/utxo/primitives/src/lib.rs
+++ b/processor/scheduler/utxo/primitives/src/lib.rs
@@ -7,11 +7,22 @@ use core::fmt::Debug;
 use serai_primitives::{Coin, Amount};
 
 use primitives::{ReceivedOutput, Payment};
-use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor};
+use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor};
+use scheduler_primitives::*;
+
+/// A planned transaction.
+pub struct PlannedTransaction<S: ScannerFeed, ST, A> {
+  /// The signable transaction.
+  pub signable: ST,
+  /// The Eventuality to watch for.
+  pub eventuality: EventualityFor<S>,
+  /// The auxilliary data for this transaction.
+  pub auxilliary: A,
+}
 
 /// An object able to plan a transaction.
 #[async_trait::async_trait]
-pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
+pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
   /// An error encountered when determining the fee rate.
   ///
   /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually
@@ -21,8 +32,8 @@ pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
   /// The type representing a fee rate to use for transactions.
   type FeeRate: Clone + Copy;
 
-  /// The type representing a planned transaction.
-  type PlannedTransaction;
+  /// The type representing a signable transaction.
+  type SignableTransaction: SignableTransaction;
 
   /// Obtain the fee rate to pay.
   ///
@@ -62,7 +73,7 @@ pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
     inputs: Vec<OutputFor<S>>,
     payments: Vec<Payment<AddressFor<S>>>,
     change: Option<KeyFor<S>>,
-  ) -> Self::PlannedTransaction;
+  ) -> PlannedTransaction<S, Self::SignableTransaction, A>;
 
   /// Obtain a PlannedTransaction via amortizing the fee over the payments.
   ///
@@ -77,7 +88,7 @@ pub trait TransactionPlanner<S: ScannerFeed>: 'static + Send + Sync {
     inputs: Vec<OutputFor<S>>,
     mut payments: Vec<Payment<AddressFor<S>>>,
     mut change: Option<KeyFor<S>>,
-  ) -> Option<Self::PlannedTransaction> {
+  ) -> Option<PlannedTransaction<S, Self::SignableTransaction, A>> {
     // Sanity checks
     {
       assert!(!inputs.is_empty());
diff --git a/processor/scheduler/utxo/transaction-chaining/Cargo.toml b/processor/scheduler/utxo/transaction-chaining/Cargo.toml
index a6b12128..0b1eb155 100644
--- a/processor/scheduler/utxo/transaction-chaining/Cargo.toml
+++ b/processor/scheduler/utxo/transaction-chaining/Cargo.toml
@@ -30,6 +30,6 @@ serai-primitives = { path = "../../../../substrate/primitives", default-features
 serai-db = { path = "../../../../common/db" }
 
 primitives = { package = "serai-processor-primitives", path = "../../../primitives" }
+scanner = { package = "serai-processor-scanner", path = "../../../scanner" }
 scheduler-primitives = { package = "serai-processor-scheduler-primitives", path = "../../primitives" }
 utxo-scheduler-primitives = { package = "serai-processor-utxo-scheduler-primitives", path = "../primitives" }
-scanner = { package = "serai-processor-scanner", path = "../../../scanner" }
diff --git a/processor/scheduler/utxo/transaction-chaining/src/db.rs b/processor/scheduler/utxo/transaction-chaining/src/db.rs
index f6de26d1..7d800718 100644
--- a/processor/scheduler/utxo/transaction-chaining/src/db.rs
+++ b/processor/scheduler/utxo/transaction-chaining/src/db.rs
@@ -6,8 +6,8 @@ use serai_primitives::{Coin, Amount};
 
 use serai_db::{Get, DbTxn, create_db};
 
-use primitives::ReceivedOutput;
-use scanner::{ScannerFeed, KeyFor, OutputFor};
+use primitives::{Payment, ReceivedOutput};
+use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor};
 
 create_db! {
   TransactionChainingScheduler {
@@ -15,7 +15,7 @@
     SerializedOutputs: (key: &[u8], coin: Coin) -> Vec<u8>,
     // We should be immediately able to schedule the fulfillment of payments, yet this may not be
     // possible if we're in the middle of a multisig rotation (as our output set will be split)
-    SerializedQueuedPayments: (key: &[u8]) -> Vec<u8>,
+    SerializedQueuedPayments: (key: &[u8], coin: Coin) -> Vec<u8>,
   }
 }
 
@@ -61,13 +61,19 @@ impl<S: ScannerFeed> Db<S> {
   pub(crate) fn queued_payments(
     getter: &impl Get,
     key: KeyFor<S>,
-  ) -> Option<Vec<Payment<AddressFor<S>>>> {
+    coin: Coin,
+  ) -> Option<Vec<Payment<AddressFor<S>>>> {
     todo!("TODO")
   }
-  pub(crate) fn set_queued_payments(txn: &mut impl DbTxn, key: KeyFor<S>, queued: Vec<Payment<AddressFor<S>>>) {
+  pub(crate) fn set_queued_payments(
+    txn: &mut impl DbTxn,
+    key: KeyFor<S>,
+    coin: Coin,
+    queued: &Vec<Payment<AddressFor<S>>>,
+  ) {
     todo!("TODO")
   }
-  pub(crate) fn del_outputs(txn: &mut impl DbTxn, key: KeyFor<S>) {
-    SerializedQueuedPayments::del(txn, key.to_bytes().as_ref());
+  pub(crate) fn del_queued_payments(txn: &mut impl DbTxn, key: KeyFor<S>, coin: Coin) {
+    SerializedQueuedPayments::del(txn, key.to_bytes().as_ref(), coin);
   }
 }
diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs
index 8f21e9d6..9e552c13 100644
--- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs
+++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs
@@ -7,11 +7,11 @@ use std::collections::HashMap;
 
 use group::GroupEncoding;
 
-use serai_primitives::Coin;
+use serai_primitives::{Coin, Amount};
 
 use serai_db::DbTxn;
 
-use primitives::{ReceivedOutput, Payment};
+use primitives::{OutputType, ReceivedOutput, Payment};
 use scanner::{
   LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate,
   Scheduler as SchedulerTrait,
@@ -22,65 +22,205 @@ use utxo_scheduler_primitives::*;
 mod db;
 use db::Db;
 
-/// A planned transaction.
-pub struct PlannedTransaction<S: ScannerFeed, T> {
-  /// The signable transaction.
-  signable: T,
-  /// The outputs we'll receive from this.
-  effected_received_outputs: OutputFor<S>,
-  /// The Eventuality to watch for.
-  eventuality: EventualityFor<S>,
-}
+/// The outputs which will be effected by a PlannedTransaction and received by Serai.
+pub struct EffectedReceivedOutputs<S: ScannerFeed>(Vec<OutputFor<S>>);
 
 /// A scheduler of transactions for networks premised on the UTXO model which support
 /// transaction chaining.
-pub struct Scheduler<
-  S: ScannerFeed,
-  T,
-  P: TransactionPlanner<S, PlannedTransaction = PlannedTransaction<S, T>>,
->(PhantomData<S>, PhantomData<T>, PhantomData<P>);
+pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>>(
+  PhantomData<S>,
+  PhantomData<P>,
+);
 
-impl<S: ScannerFeed, T, P: TransactionPlanner<S, PlannedTransaction = PlannedTransaction<S, T>>>
-  Scheduler<S, T, P>
-{
-  fn accumulate_outputs(txn: &mut impl DbTxn, key: KeyFor<S>, outputs: &[OutputFor<S>]) {
-    // Accumulate them in memory
-    let mut outputs_by_coin = HashMap::with_capacity(1);
-    for output in outputs.iter().filter(|output| output.key() == key) {
-      let coin = output.balance().coin;
-      if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) {
-        e.insert(Db::<S>::outputs(txn, key, coin).unwrap());
+impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Scheduler<S, P> {
+  fn handle_queued_payments(
+    &mut self,
+    txn: &mut impl DbTxn,
+    active_keys: &[(KeyFor<S>, LifetimeStage)],
+    key: KeyFor<S>,
+  ) -> Vec<EventualityFor<S>> {
+    let mut eventualities = vec![];
+
+    for coin in S::NETWORK.coins() {
+      // Fetch our operating costs and all our outputs
+      let mut operating_costs = Db::<S>::operating_costs(txn, *coin).0;
+      let mut outputs = Db::<S>::outputs(txn, key, *coin).unwrap();
+
+      // Fetch the queued payments
+      let mut payments = Db::<S>::queued_payments(txn, key, *coin).unwrap();
+      if payments.is_empty() {
+        continue;
       }
-      outputs_by_coin.get_mut(&coin).unwrap().push(output.clone());
+
+      // If this is our only key, our outputs and operating costs should be greater than the
+      // payments' value
+      if active_keys.len() == 1 {
+        // The available amount to fulfill is the amount we have plus the amount we'll reduce by
+        // An alternative formulation would be `outputs >= (payments - operating costs)`, but
+        // that'd risk underflow
+        let available =
+          operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
+        assert!(available >= payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>());
+      }
+
+      let amount_of_payments_that_can_be_handled =
+        |operating_costs: u64, outputs: &[_], payments: &[_]| {
+          let value_available =
+            operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
+
+          let mut can_handle = 0;
+          let mut value_used = 0;
+          for payment in payments {
+            value_used += payment.balance().amount.0;
+            if value_available < value_used {
+              break;
+            }
+            can_handle += 1;
+          }
+
+          can_handle
+        };
+
+      // Find the set of payments we should fulfill at this time
+      {
+        // Drop to just the payments we currently have the outputs for
+        {
+          let can_handle =
+            amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments);
+          let remaining_payments = payments.drain(can_handle ..).collect::<Vec<_>>();
+          // Restore the rest to the database
+          Db::<S>::set_queued_payments(txn, key, *coin, &remaining_payments);
+        }
+        let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
+
+        // If these payments are worth less than the operating costs, immediately drop them
+        if payments_value <= operating_costs {
+          operating_costs -= payments_value;
+          Db::<S>::set_operating_costs(txn, *coin, Amount(operating_costs));
+          return vec![];
+        }
+
+        // We explicitly sort AFTER deciding which payments to handle so we always handle the
+        // oldest queued payments first (preventing any from eternally being shuffled to the back
+        // of the line)
+        payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
+      }
+      assert!(!payments.is_empty());
+
+      // Find the smallest set of outputs usable to fulfill these payments
+      // Size is determined by the largest output, not quantity nor aggregate value
+      {
+        // We start by sorting low to high
+        outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
+
+        let value_needed =
+          payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>() - operating_costs;
+
+        let mut needed = 0;
+        let mut value_present = 0;
+        for output in &outputs {
+          needed += 1;
+          value_present += output.balance().amount.0;
+          if value_present >= value_needed {
+            break;
+          }
+        }
+
+        // Drain, and save back to the DB, the unnecessary outputs
+        let remaining_outputs = outputs.drain(needed ..).collect::<Vec<_>>();
+        Db::<S>::set_outputs(txn, key, *coin, &remaining_outputs);
+      }
+      assert!(!outputs.is_empty());
+
+      // We now have the current operating costs, the outputs we're using, and the payments
+      // The database has the unused outputs/unfulfillable payments
+      // Actually plan/send off the transactions
+
+      // While our set of outputs exceeds the input limit, aggregate them
+      while outputs.len() > MAX_INPUTS {
+        let outputs_chunk = outputs.drain(.. MAX_INPUTS).collect::<Vec<_>>();
+
+        // While we're aggregating these outputs, handle any payments we can
+        let payments_chunk = loop {
+          let can_handle =
+            amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments);
+          let payments_chunk = payments.drain(.. can_handle.min(MAX_OUTPUTS)).collect::<Vec<_>>();
+
+          let payments_value =
+            payments_chunk.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
+          if payments_value <= operating_costs {
+            operating_costs -= payments_value;
+            continue;
+          }
+          break payments_chunk;
+        };
+
+        let Some(planned) = P::plan_transaction_with_fee_amortization(
+          &mut operating_costs,
+          fee_rates[coin],
+          outputs_chunk,
+          payments_chunk,
+          // We always use our key for the change here since we may need this change output to
+          // finish fulfilling these payments
+          Some(key),
+        ) else {
+          // We amortized all payments, and even when just trying to make the change output, these
+          // inputs couldn't afford their own aggregation and were written off
+          continue;
+        };
+
+        // Send the transactions off for signing
+        TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
+
+        // Push the Eventualities onto the result
+        eventualities.push(planned.eventuality);
+
+        let mut effected_received_outputs = planned.auxilliary.0;
+        // Only handle Change so if someone burns to an External address, we don't use it here
+        // when the scanner will tell us to return it (without accumulating it)
+        effected_received_outputs.retain(|output| output.kind() == OutputType::Change);
+        outputs.append(&mut effected_received_outputs);
+      }
+
+      // Now that we have an aggregated set of inputs, create the tree for payments
+      todo!("TODO");
     }
-
-    // Flush them to the database
-    for (coin, outputs) in outputs_by_coin {
-      Db::<S>::set_outputs(txn, key, coin, &outputs);
-    }
-  }
+
+    eventualities
+  }
 }
 
-impl<
-    S: ScannerFeed,
-    T: 'static + Send + Sync + SignableTransaction,
-    P: TransactionPlanner<S, PlannedTransaction = PlannedTransaction<S, T>>,
-  > SchedulerTrait<S> for Scheduler<S, T, P>
+impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> SchedulerTrait<S>
+  for Scheduler<S, P>
 {
   fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor<S>) {
     for coin in S::NETWORK.coins() {
+      assert!(Db::<S>::outputs(txn, key, *coin).is_none());
       Db::<S>::set_outputs(txn, key, *coin, &[]);
+      assert!(Db::<S>::queued_payments(txn, key, *coin).is_none());
+      Db::<S>::set_queued_payments(txn, key, *coin, &vec![]);
     }
   }
 
   fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor<S>, new_key: KeyFor<S>) {
-    todo!("TODO")
+    for coin in S::NETWORK.coins() {
+      let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
+      let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
+
+      let mut queued = still_queued;
+      queued.append(&mut new_queued);
+
+      Db::<S>::set_queued_payments(txn, retiring_key, *coin, &vec![]);
+      Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
+    }
   }
 
   fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor<S>) {
     for coin in S::NETWORK.coins() {
-      assert!(Db::<S>::outputs(txn, key, *coin).is_none());
+      assert!(Db::<S>::outputs(txn, key, *coin).unwrap().is_empty());
       Db::<S>::del_outputs(txn, key, *coin);
+      assert!(Db::<S>::queued_payments(txn, key, *coin).unwrap().is_empty());
+      Db::<S>::del_queued_payments(txn, key, *coin);
     }
   }
 
@@ -91,12 +231,41 @@ impl<
     update: SchedulerUpdate<S>,
   ) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
     // Accumulate all the outputs
-    for key in active_keys {
-      Self::accumulate_outputs(txn, key.0, update.outputs());
+    for (key, _) in active_keys {
+      // Accumulate them in memory
+      let mut outputs_by_coin = HashMap::with_capacity(1);
+      for output in update.outputs().iter().filter(|output| output.key() == *key) {
+        match output.kind() {
+          OutputType::External | OutputType::Forwarded => {},
+          // TODO: Only accumulate these if we haven't already, but do accumulate if not
+          OutputType::Branch | OutputType::Change => todo!("TODO"),
+        }
+        let coin = output.balance().coin;
+        if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) {
+          e.insert(Db::<S>::outputs(txn, *key, coin).unwrap());
+        }
+        outputs_by_coin.get_mut(&coin).unwrap().push(output.clone());
+      }
+
+      // Flush them to the database
+      for (coin, outputs) in outputs_by_coin {
+        Db::<S>::set_outputs(txn, *key, coin, &outputs);
+      }
     }
 
     let mut fee_rates: HashMap<Coin, P::FeeRate> = todo!("TODO");
 
+    // Fulfill the payments we prior couldn't
+    let mut eventualities = HashMap::new();
+    for (key, _stage) in active_keys {
+      eventualities.insert(
+        key.to_bytes().as_ref().to_vec(),
+        self.handle_queued_payments(txn, active_keys, *key),
+      );
+    }
+
+    // TODO: If this key has been flushed, forward all outputs
+
     // Create the transactions for the forwards/burns
     {
       let mut planned_txs = vec![];
@@ -137,20 +306,14 @@ impl<
       planned_txs.push((key, plan));
     }
 
-    let mut eventualities = HashMap::new();
     for (key, planned_tx) in planned_txs {
       // Send the transactions off for signing
-      TransactionsToSign::<T>::send(txn, &key, &planned_tx.signable);
+      TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
 
-      // Insert the eventualities into the result
-      eventualities
-        .entry(key.to_bytes().as_ref().to_vec())
-        .or_insert(Vec::with_capacity(1))
-        .push(planned_tx.eventuality);
+      // Insert the Eventualities into the result
+      eventualities[key.to_bytes().as_ref()].push(planned_tx.eventuality);
     }
 
-    // TODO: Fulfill any payments we prior couldn't
-
     eventualities
   }
 
@@ -159,13 +322,29 @@ impl<
     &mut self,
     txn: &mut impl DbTxn,
     active_keys: &[(KeyFor<S>, LifetimeStage)],
-    payments: Vec<Payment<AddressFor<S>>>,
+    mut payments: Vec<Payment<AddressFor<S>>>,
   ) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> {
-    // TODO: Find the key to use for fulfillment
-    // TODO: Sort outputs and payments by amount
-    // TODO: For as long as we don't have sufficiently aggregated inputs to handle all payments,
-    //   aggregate
-    // TODO: Create the tree for the payments
-    todo!("TODO")
+    // Find the key to fulfill these payments with
+    let fulfillment_key = match active_keys[0].1 {
+      LifetimeStage::ActiveYetNotReporting => {
+        panic!("expected to fulfill payments despite not reporting for the oldest key")
+      }
+      LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
+      LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
+    };
+
+    // Queue the payments for this key
+    for coin in S::NETWORK.coins() {
+      let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap();
+      queued_payments
+        .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned());
+      Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
+    }
+
+    // Handle the queued payments
+    HashMap::from([(
+      fulfillment_key.to_bytes().as_ref().to_vec(),
+      self.handle_queued_payments(txn, active_keys, fulfillment_key),
+    )])
  }
 }
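
The amount_of_payments_that_can_be_handled closure above is, at heart, a prefix scan over the
queued payments. A minimal self-contained sketch with a worked example, assuming plain u64 values
in place of the scheduler's output and payment types (the free-function name and the specific
numbers are illustrative only):

fn payments_that_can_be_handled(operating_costs: u64, outputs: &[u64], payments: &[u64]) -> usize {
  // The value spendable is the outputs' value plus the accrued operating costs
  let value_available = operating_costs + outputs.iter().sum::<u64>();

  // Count the prefix of payments which fits within that value
  let mut can_handle = 0;
  let mut value_used = 0;
  for payment in payments {
    value_used += payment;
    if value_available < value_used {
      break;
    }
    can_handle += 1;
  }
  can_handle
}

fn main() {
  // 5 + (40 + 60) = 105 is available: the first two 50-value payments fit, the third doesn't
  assert_eq!(payments_that_can_be_handled(5, &[40, 60], &[50, 50, 50]), 2);
}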
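
The aggregation loop admits the same treatment. The following is a simplified model rather than
the patch's implementation: plain u64 values stand in for OutputFor<S> and Payment, a flat FEE
constant stands in for plan_transaction_with_fee_amortization, and MAX_INPUTS/MAX_OUTPUTS are
assumed constants. It shows how draining MAX_INPUTS-sized chunks and chaining each chunk's change
output back into the working set converges on at most MAX_INPUTS inputs, fulfilling whatever
payments each chunk can afford along the way:

const MAX_INPUTS: usize = 16;
const MAX_OUTPUTS: usize = 16;

// Plan a transaction spending `inputs` and fulfilling `payments`, returning the change value
// (None if the inputs can't cover the payments plus the fee)
fn plan(inputs: &[u64], payments: &[u64], fee: u64) -> Option<u64> {
  inputs.iter().sum::<u64>().checked_sub(payments.iter().sum::<u64>() + fee)
}

// While the input set exceeds MAX_INPUTS, aggregate a chunk of inputs into a change output,
// fulfilling whatever payments that chunk can afford along the way
fn aggregate(mut inputs: Vec<u64>, mut payments: Vec<u64>) -> (Vec<u64>, Vec<u64>) {
  const FEE: u64 = 1;
  inputs.sort();
  payments.sort();

  while inputs.len() > MAX_INPUTS {
    let chunk = inputs.drain(.. MAX_INPUTS).collect::<Vec<_>>();
    let chunk_value = chunk.iter().sum::<u64>();

    // Take the prefix of payments this chunk can afford, capped at MAX_OUTPUTS
    let mut affordable = 0;
    let mut value_used = FEE;
    for payment in &payments {
      value_used += payment;
      if value_used > chunk_value {
        break;
      }
      affordable += 1;
    }
    let chunk_payments = payments.drain(.. affordable.min(MAX_OUTPUTS)).collect::<Vec<_>>();

    // Chain the change output back into the remaining input set; inputs which can't afford
    // their own aggregation are written off
    if let Some(change) = plan(&chunk, &chunk_payments, FEE) {
      inputs.push(change);
    }
  }

  (inputs, payments)
}

fn main() {
  // Forty 100-value outputs and two 250-value payments reduce to at most MAX_INPUTS inputs
  let (inputs, payments) = aggregate(vec![100; 40], vec![250, 250]);
  assert!(inputs.len() <= MAX_INPUTS);
  assert!(payments.is_empty());
  println!("{} inputs remain", inputs.len());
}

Transaction chaining is what makes this sound: the change output of each aggregation transaction
is spendable by the next transaction immediately, without waiting for the prior transaction to
confirm on-chain. On a network without chaining, each round would have to wait for a confirmation.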