From 653ead1e8c46cc0b28b9e4613e5c19bf2715cd52 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 4 Sep 2024 01:44:21 -0400 Subject: [PATCH] Finish the tree logic in the transaction-chaining scheduler Also completes the DB functions, makes Scheduler never instantiated, and ensures tree roots have change outputs. --- processor/primitives/src/payment.rs | 21 + processor/scanner/src/eventuality/mod.rs | 28 +- processor/scanner/src/lib.rs | 21 +- .../scheduler/utxo/primitives/src/lib.rs | 18 +- .../utxo/transaction-chaining/src/db.rs | 21 +- .../utxo/transaction-chaining/src/lib.rs | 736 ++++++++++-------- 6 files changed, 477 insertions(+), 368 deletions(-) diff --git a/processor/primitives/src/payment.rs b/processor/primitives/src/payment.rs index 1bbb0604..bf3c918c 100644 --- a/processor/primitives/src/payment.rs +++ b/processor/primitives/src/payment.rs @@ -1,3 +1,7 @@ +use std::io; + +use scale::{Encode, Decode, IoReader}; + use serai_primitives::{Balance, Data}; use serai_coins_primitives::OutInstructionWithBalance; @@ -27,6 +31,7 @@ impl Payment { pub fn new(address: A, balance: Balance, data: Option>) -> Self { Payment { address, balance, data } } + /// The address to pay. pub fn address(&self) -> &A { &self.address @@ -39,4 +44,20 @@ impl Payment { pub fn data(&self) -> &Option> { &self.data } + + /// Read a Payment. + pub fn read(reader: &mut impl io::Read) -> io::Result { + let address = A::read(reader)?; + let reader = &mut IoReader(reader); + let balance = Balance::decode(reader).map_err(io::Error::other)?; + let data = Option::>::decode(reader).map_err(io::Error::other)?; + Ok(Self { address, balance, data }) + } + /// Write the Payment. 
+ pub fn write(&self, writer: &mut impl io::Write) -> io::Result<()> { + self.address.write(writer).unwrap(); + self.balance.encode_to(writer); + self.data.encode_to(writer); + Ok(()) + } } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 5a7b4cca..84670f79 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -1,3 +1,4 @@ +use core::marker::PhantomData; use std::collections::{HashSet, HashMap}; use group::GroupEncoding; @@ -101,11 +102,11 @@ fn intake_eventualities( pub(crate) struct EventualityTask> { db: D, feed: S, - scheduler: Sch, + scheduler: PhantomData, } impl> EventualityTask { - pub(crate) fn new(mut db: D, feed: S, scheduler: Sch, start_block: u64) -> Self { + pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { if EventualityDb::::next_to_check_for_eventualities_block(&db).is_none() { // Initialize the DB let mut txn = db.txn(); @@ -113,7 +114,7 @@ impl> EventualityTask { txn.commit(); } - Self { db, feed, scheduler } + Self { db, feed, scheduler: PhantomData } } #[allow(clippy::type_complexity)] @@ -146,7 +147,7 @@ impl> EventualityTask { } // Returns a boolean of if we intaked any Burns. 
- fn intake_burns(&mut self) -> bool { + async fn intake_burns(&mut self) -> Result { let mut intaked_any = false; // If we've handled an notable block, we may have Burns being queued with it as the reference @@ -158,6 +159,8 @@ impl> EventualityTask { // others the new key let (_keys, keys_with_stages) = self.keys_and_keys_with_stages(latest_handled_notable_block); + let block = self.feed.block_by_number(&self.db, latest_handled_notable_block).await?; + let mut txn = self.db.txn(); // Drain the entire channel while let Some(burns) = @@ -165,8 +168,9 @@ impl> EventualityTask { { intaked_any = true; - let new_eventualities = self.scheduler.fulfill( + let new_eventualities = Sch::fulfill( &mut txn, + &block, &keys_with_stages, burns .into_iter() @@ -178,7 +182,7 @@ impl> EventualityTask { txn.commit(); } - intaked_any + Ok(intaked_any) } } @@ -197,7 +201,7 @@ impl> ContinuallyRan for EventualityTas // Start by intaking any Burns we have sitting around // It's important we run this regardless of if we have a new block to handle - made_progress |= self.intake_burns(); + made_progress |= self.intake_burns().await?; /* Eventualities increase upon one of two cases: @@ -253,7 +257,7 @@ impl> ContinuallyRan for EventualityTas // state will be for the newer block) #[allow(unused_assignments)] { - made_progress |= self.intake_burns(); + made_progress |= self.intake_burns().await?; } } @@ -278,7 +282,7 @@ impl> ContinuallyRan for EventualityTas for key in &keys { // If this is the key's activation block, activate it if key.activation_block_number == b { - self.scheduler.activate_key(&mut txn, key.key); + Sch::activate_key(&mut txn, key.key); } let completed_eventualities = { @@ -431,7 +435,7 @@ impl> ContinuallyRan for EventualityTas after a later one was already used). 
*/ let new_eventualities = - self.scheduler.update(&mut txn, &keys_with_stages, scheduler_update); + Sch::update(&mut txn, &block, &keys_with_stages, scheduler_update); // Intake the new Eventualities for key in new_eventualities.keys() { keys @@ -451,7 +455,7 @@ impl> ContinuallyRan for EventualityTas key.key != keys.last().unwrap().key, "key which was forwarding was the last key (which has no key after it to forward to)" ); - self.scheduler.flush_key(&mut txn, key.key, keys.last().unwrap().key); + Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key); } // Now that we've intaked any Eventualities caused, check if we're retiring any keys @@ -469,7 +473,7 @@ impl> ContinuallyRan for EventualityTas // We tell the scheduler to retire it now as we're done with it, and this fn doesn't // require it be called with a canonical order - self.scheduler.retire_key(&mut txn, key.key); + Sch::retire_key(&mut txn, key.key); } } } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 1818fbf0..8ecb731f 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -163,6 +163,8 @@ pub type AddressFor = <::Block as Block>::Address; pub type OutputFor = <::Block as Block>::Output; /// The eventuality type for this ScannerFeed. pub type EventualityFor = <::Block as Block>::Eventuality; +/// The block type for this ScannerFeed. +pub type BlockFor = ::Block; #[async_trait::async_trait] pub trait BatchPublisher: 'static + Send + Sync { @@ -245,7 +247,7 @@ pub trait Scheduler: 'static + Send { /// /// This SHOULD setup any necessary database structures. This SHOULD NOT cause the new key to /// be used as the primary key. The multisig rotation time clearly establishes its steps. - fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor); + fn activate_key(txn: &mut impl DbTxn, key: KeyFor); /// Flush all outputs within a retiring key to the new key. 
/// @@ -257,14 +259,20 @@ pub trait Scheduler: 'static + Send { /// /// If the retiring key has any unfulfilled payments associated with it, those MUST be made /// the responsibility of the new key. - fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor, new_key: KeyFor); + // TODO: This needs to return a HashMap for the eventualities + fn flush_key( + txn: &mut impl DbTxn, + block: &BlockFor, + retiring_key: KeyFor, + new_key: KeyFor, + ); /// Retire a key as it'll no longer be used. /// /// Any key retired MUST NOT still have outputs associated with it. This SHOULD be a NOP other /// than any assertions and database cleanup. This MUST NOT be expected to be called in a fashion /// ordered to any other calls. - fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor); + fn retire_key(txn: &mut impl DbTxn, key: KeyFor); /// Accumulate outputs into the scheduler, yielding the Eventualities now to be scanned for. /// @@ -275,7 +283,6 @@ pub trait Scheduler: 'static + Send { /// The `Vec` used as the key in the returned HashMap should be the encoded key the /// Eventualities are for. fn update( - &mut self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], @@ -315,7 +322,6 @@ pub trait Scheduler: 'static + Send { has an output-to-Serai, the new primary output). */ fn fulfill( - &mut self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], @@ -333,18 +339,17 @@ impl Scanner { /// Create a new scanner. /// /// This will begin its execution, spawning several asynchronous tasks. 
- pub async fn new( + pub async fn new>( db: impl Db, feed: S, batch_publisher: impl BatchPublisher, - scheduler: impl Scheduler, start_block: u64, ) -> Self { let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block); let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone()); - let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block); + let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block); let (_index_handle, index_run) = RunNowHandle::new(); let (scan_handle, scan_run) = RunNowHandle::new(); diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs index 356192ee..81d5ebd7 100644 --- a/processor/scheduler/utxo/primitives/src/lib.rs +++ b/processor/scheduler/utxo/primitives/src/lib.rs @@ -2,12 +2,10 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::fmt::Debug; - use serai_primitives::{Coin, Amount}; use primitives::{ReceivedOutput, Payment}; -use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor}; +use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor}; use scheduler_primitives::*; /// A planned transaction. @@ -23,12 +21,6 @@ pub struct PlannedTransaction { /// An object able to plan a transaction. #[async_trait::async_trait] pub trait TransactionPlanner: 'static + Send + Sync { - /// An error encountered when determining the fee rate. - /// - /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually - /// resolve without manual intervention/changing the arguments. - type EphemeralError: Debug; - /// The type representing a fee rate to use for transactions. 
type FeeRate: Clone + Copy; @@ -42,12 +34,8 @@ pub trait TransactionPlanner: 'static + Send + Sync { /// Obtain the fee rate to pay. /// - /// This must be constant to the finalized block referenced by this block number and the coin. - async fn fee_rate( - &self, - block_number: u64, - coin: Coin, - ) -> Result; + /// This must be constant to the block and coin. + fn fee_rate(block: &BlockFor, coin: Coin) -> Self::FeeRate; /// The branch address for this key of Serai's. fn branch_address(key: KeyFor) -> AddressFor; diff --git a/processor/scheduler/utxo/transaction-chaining/src/db.rs b/processor/scheduler/utxo/transaction-chaining/src/db.rs index d629480f..697f1009 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/db.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/db.rs @@ -61,13 +61,13 @@ impl Db { pub(crate) fn set_already_accumulated_output( txn: &mut impl DbTxn, - output: as ReceivedOutput, AddressFor>>::Id, + output: & as ReceivedOutput, AddressFor>>::Id, ) { AlreadyAccumulatedOutput::set(txn, output.as_ref(), &()); } pub(crate) fn take_if_already_accumulated_output( txn: &mut impl DbTxn, - output: as ReceivedOutput, AddressFor>>::Id, + output: & as ReceivedOutput, AddressFor>>::Id, ) -> bool { let res = AlreadyAccumulatedOutput::get(txn, output.as_ref()).is_some(); AlreadyAccumulatedOutput::del(txn, output.as_ref()); @@ -79,15 +79,26 @@ impl Db { key: KeyFor, coin: Coin, ) -> Option>>> { - todo!("TODO") + let buf = SerializedQueuedPayments::get(getter, key.to_bytes().as_ref(), coin)?; + let mut buf = buf.as_slice(); + + let mut res = Vec::with_capacity(buf.len() / 128); + while !buf.is_empty() { + res.push(Payment::read(&mut buf).unwrap()); + } + Some(res) } pub(crate) fn set_queued_payments( txn: &mut impl DbTxn, key: KeyFor, coin: Coin, - queued: &Vec>>, + queued: &[Payment>], ) { - todo!("TODO") + let mut buf = Vec::with_capacity(queued.len() * 128); + for queued in queued { + queued.write(&mut buf).unwrap(); + } + 
SerializedQueuedPayments::set(txn, key.to_bytes().as_ref(), coin, &buf); } pub(crate) fn del_queued_payments(txn: &mut impl DbTxn, key: KeyFor, coin: Coin) { SerializedQueuedPayments::del(txn, key.to_bytes().as_ref(), coin); diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index 31c70c1e..7359a87c 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -13,8 +13,8 @@ use serai_db::DbTxn; use primitives::{OutputType, ReceivedOutput, Payment}; use scanner::{ - LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate, - Scheduler as SchedulerTrait, + LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor, + SchedulerUpdate, Scheduler as SchedulerTrait, }; use scheduler_primitives::*; use utxo_scheduler_primitives::*; @@ -22,6 +22,114 @@ use utxo_scheduler_primitives::*; mod db; use db::Db; +#[derive(Clone)] +enum TreeTransaction { + Leaves { payments: Vec>>, value: u64 }, + Branch { children: Vec, value: u64 }, +} +impl TreeTransaction { + fn children(&self) -> usize { + match self { + Self::Leaves { payments, .. } => payments.len(), + Self::Branch { children, .. } => children.len(), + } + } + fn value(&self) -> u64 { + match self { + Self::Leaves { value, .. } | Self::Branch { value, .. } => *value, + } + } + fn payments( + &self, + coin: Coin, + branch_address: &AddressFor, + input_value: u64, + ) -> Option>>> { + // Fetch the amounts for the payments we'll make + let mut amounts: Vec<_> = match self { + Self::Leaves { payments, .. } => { + payments.iter().map(|payment| Some(payment.balance().amount.0)).collect() + } + Self::Branch { children, .. 
} => children.iter().map(|child| Some(child.value())).collect(), + }; + + // We need to reduce them so their sum is our input value + assert!(input_value <= self.value()); + let amount_to_amortize = self.value() - input_value; + + // If any payments won't survive the reduction, set them to None + let mut amortized = 0; + 'outer: while amounts.iter().any(Option::is_some) && (amortized < amount_to_amortize) { + let adjusted_fee = amount_to_amortize - amortized; + let amounts_len = + u64::try_from(amounts.iter().filter(|amount| amount.is_some()).count()).unwrap(); + let per_payment_fee_check = adjusted_fee.div_ceil(amounts_len); + + // Check each amount to see if it's not viable + let mut i = 0; + while i < amounts.len() { + if let Some(amount) = amounts[i] { + if amount.saturating_sub(per_payment_fee_check) < S::dust(coin).0 { + amounts[i] = None; + amortized += amount; + // If this amount wasn't viable, re-run with the new fee/amortization amounts + continue 'outer; + } + } + i += 1; + } + + // Now that we have the payments which will survive, reduce them + for (i, amount) in amounts.iter_mut().enumerate() { + if let Some(amount) = amount { + *amount -= adjusted_fee / amounts_len; + if i < usize::try_from(adjusted_fee % amounts_len).unwrap() { + *amount -= 1; + } + } + } + break; + } + + // Now that we have the reduced amounts, create the payments + let payments: Vec<_> = match self { + Self::Leaves { payments, .. } => { + payments + .iter() + .zip(amounts) + .filter_map(|(payment, amount)| { + amount.map(|amount| { + // The existing payment, with the new amount + Payment::new( + payment.address().clone(), + Balance { coin, amount: Amount(amount) }, + payment.data().clone(), + ) + }) + }) + .collect() + } + Self::Branch { .. 
} => { + amounts + .into_iter() + .filter_map(|amount| { + amount.map(|amount| { + // A branch output with the new amount + Payment::new(branch_address.clone(), Balance { coin, amount: Amount(amount) }, None) + }) + }) + .collect() + } + }; + + // Use None for vec![] so we never actually use vec![] + if payments.is_empty() { + None?; + } + Some(payments) + } +} + /// The outputs which will be effected by a PlannedTransaction and received by Serai. pub struct EffectedReceivedOutputs(Vec>); @@ -33,319 +141,315 @@ pub struct Scheduler>> Scheduler { - fn handle_queued_payments( - &mut self, + fn accumulate_outputs(txn: &mut impl DbTxn, outputs: Vec>, from_scanner: bool) { + let mut outputs_by_key = HashMap::new(); + for output in outputs { + if !from_scanner { + // Since this isn't being reported by the scanner, flag it so when the scanner does report + // it, we don't accumulate it again + Db::::set_already_accumulated_output(txn, &output.id()); + } else if Db::::take_if_already_accumulated_output(txn, &output.id()) { + continue; + } + + let coin = output.balance().coin; + outputs_by_key + // Index by key and coin + .entry((output.key().to_bytes().as_ref().to_vec(), coin)) + // If we haven't accumulated here prior, read the outputs from the database + .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) + .1 + .push(output); + } + // Write the outputs back to the database + for ((_key_vec, coin), (key, outputs)) in outputs_by_key { + Db::::set_outputs(txn, key, coin, &outputs); + } + } + + fn aggregate_inputs( + txn: &mut impl DbTxn, + block: &BlockFor, + key_for_change: KeyFor, + key: KeyFor, + coin: Coin, + ) -> Vec> { + let mut eventualities = vec![]; + + let mut operating_costs = Db::::operating_costs(txn, coin).0; + let mut outputs = Db::::outputs(txn, key, coin).unwrap(); + while outputs.len() > P::MAX_INPUTS { + let to_aggregate = outputs.drain(.. 
P::MAX_INPUTS).collect::>(); + Db::::set_outputs(txn, key, coin, &outputs); + + let Some(planned) = P::plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + to_aggregate, + vec![], + Some(key_for_change), + ) else { + continue; + }; + + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + Self::accumulate_outputs(txn, planned.auxilliary.0, false); + + // Reload the outputs for the next loop iteration + outputs = Db::::outputs(txn, key, coin).unwrap(); + } + + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + eventualities + } + + fn fulfillable_payments( + txn: &mut impl DbTxn, + operating_costs: &mut u64, + key: KeyFor, + coin: Coin, + value_of_outputs: u64, + ) -> Vec>> { + // Fetch all payments for this key + let mut payments = Db::::queued_payments(txn, key, coin).unwrap(); + if payments.is_empty() { + return vec![]; + } + + loop { + // inputs must be >= (payments - operating costs) + // Accordingly, (inputs + operating costs) must be >= payments + let value_fulfillable = value_of_outputs + *operating_costs; + + // Drop to just the payments we can currently fulfill + { + let mut can_handle = 0; + let mut value_used = 0; + for payment in &payments { + value_used += payment.balance().amount.0; + if value_fulfillable < value_used { + break; + } + can_handle += 1; + } + + let remaining_payments = payments.drain(can_handle ..).collect::>(); + // Restore the rest to the database + Db::::set_queued_payments(txn, key, coin, &remaining_payments); + } + + // If these payments are worth less than the operating costs, immediately drop them + let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); + if payments_value <= *operating_costs { + *operating_costs -= payments_value; + Db::::set_operating_costs(txn, coin, Amount(*operating_costs)); + + // Reset payments to the queued payments + payments = Db::::queued_payments(txn, key, 
coin).unwrap(); + // If there's no more payments, stop looking for which payments we should fulfill + if payments.is_empty() { + return vec![]; + } + // Find which of these we should handle + continue; + } + + return payments; + } + } + + fn step( txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], - fee_rates: &HashMap, + block: &BlockFor, key: KeyFor, ) -> Vec> { let mut eventualities = vec![]; - let mut accumulate_outputs = |txn, outputs: Vec>| { - let mut outputs_by_key = HashMap::new(); - for output in outputs { - Db::::set_already_accumulated_output(txn, output.id()); - let coin = output.balance().coin; - outputs_by_key - .entry((output.key().to_bytes().as_ref().to_vec(), coin)) - .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) - .1 - .push(output); + let key_for_change = match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting => { + panic!("expected to fulfill payments despite not reporting for the oldest key") } - for ((_key_vec, coin), (key, outputs)) in outputs_by_key { - Db::::set_outputs(txn, key, coin, &outputs); + LifetimeStage::Active => active_keys[0].0, + LifetimeStage::UsingNewForChange | LifetimeStage::Forwarding | LifetimeStage::Finishing => { + active_keys[1].0 } }; + let branch_address = P::branch_address(key); - for coin in S::NETWORK.coins() { - // Fetch our operating costs and all our outputs - let mut operating_costs = Db::::operating_costs(txn, *coin).0; - let mut outputs = Db::::outputs(txn, key, *coin).unwrap(); + 'coin: for coin in S::NETWORK.coins() { + let coin = *coin; - // If we have more than the maximum amount of inputs, aggregate until we don't - { - while outputs.len() > P::MAX_INPUTS { + // Perform any input aggregation we should + eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin)); + + // Fetch the operating costs/outputs + let mut operating_costs = Db::::operating_costs(txn, coin).0; + let outputs = Db::::outputs(txn, key, 
coin).unwrap(); + + // Fetch the fulfillable payments + let payments = Self::fulfillable_payments( + txn, + &mut operating_costs, + key, + coin, + outputs.iter().map(|output| output.balance().amount.0).sum(), + ); + if payments.is_empty() { + continue; + } + + // If this is our only key, we should be able to fulfill all payments + // Else, we'd be insolvent + if active_keys.len() == 1 { + assert!(Db::::queued_payments(txn, key, coin).unwrap().is_empty()); + } + + // Create a tree to fulfill the payments + // This variable is for the current layer of the tree being built + let mut tree = Vec::with_capacity(payments.len().div_ceil(P::MAX_OUTPUTS)); + + // Push the branches for the leaves (the payments out) + for payments in payments.chunks(P::MAX_OUTPUTS) { + let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); + tree.push(TreeTransaction::::Leaves { payments: payments.to_vec(), value }); + } + + // While we haven't calculated a tree root, or the tree root doesn't support a change output, + // keep working + while (tree.len() != 1) || (tree[0].children() == P::MAX_OUTPUTS) { + let mut branch_layer = vec![]; + for children in tree.chunks(P::MAX_OUTPUTS) { + branch_layer.push(TreeTransaction::::Branch { + children: children.to_vec(), + value: children.iter().map(TreeTransaction::value).sum(), + }); + } + tree = branch_layer; + } + assert_eq!(tree.len(), 1); + assert!((tree[0].children() + 1) <= P::MAX_OUTPUTS); + + // Create the transaction for the root of the tree + let mut branch_outputs = { + // Try creating this transaction twice, once with a change output and once with increased + // operating costs to ensure a change output (as necessary to meet the requirements of the + // scanner API) + let mut planned_outer = None; + for i in 0 .. 2 { let Some(planned) = P::plan_transaction_with_fee_amortization( &mut operating_costs, - fee_rates[coin], - outputs.drain(.. 
P::MAX_INPUTS).collect::>(), - vec![], + P::fee_rate(block, coin), + outputs.clone(), + tree[0] + .payments(coin, &branch_address, tree[0].value()) + .expect("payments were dropped despite providing an input of the needed value"), Some(key_for_change), ) else { - // We amortized all payments, and even when just trying to make the change output, these - // inputs couldn't afford their own aggregation and were written off - Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); + // This should trip on the first iteration or not at all + assert_eq!(i, 0); + // This doesn't have inputs even worth aggregating so drop the entire tree + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + continue 'coin; + }; + + // If this doesn't have a change output, increase operating costs and try again + if !planned.auxilliary.0.iter().any(|output| output.kind() == OutputType::Change) { + /* + Since we'll create a change output if it's worth at least dust, amortizing dust from + the payments should solve this. If the new transaction can't afford those operating + costs, then the payments should be amortized out, causing there to be a change or no + transaction at all. 
+ */ + operating_costs += S::dust(coin).0; + continue; + } + + // Since this had a change output, move forward with it + planned_outer = Some(planned); + break; + } + let Some(mut planned) = planned_outer else { + panic!("couldn't create a tree root with a change output") + }; + Db::::set_operating_costs(txn, coin, Amount(operating_costs)); + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + + // We accumulate the change output, but not the branches as we'll consume them momentarily + Self::accumulate_outputs( + txn, + planned + .auxilliary + .0 + .iter() + .filter(|output| output.kind() == OutputType::Change) + .cloned() + .collect(), + false, + ); + planned.auxilliary.0.retain(|output| output.kind() == OutputType::Branch); + planned.auxilliary.0 + }; + + // Now execute each layer of the tree + tree = match tree.remove(0) { + TreeTransaction::Leaves { .. } => vec![], + TreeTransaction::Branch { children, .. } => children, + }; + while !tree.is_empty() { + // Sort the branch outputs by their value + branch_outputs.sort_by_key(|a| a.balance().amount.0); + // Sort the transactions we should create by their value so they share an order with the + // branch outputs + tree.sort_by_key(TreeTransaction::value); + + // If we dropped any Branch outputs, drop the associated children + tree.truncate(branch_outputs.len()); + assert_eq!(branch_outputs.len(), tree.len()); + + let branch_outputs_for_this_layer = branch_outputs; + let this_layer = tree; + branch_outputs = vec![]; + tree = vec![]; + + for (branch_output, tx) in branch_outputs_for_this_layer.into_iter().zip(this_layer) { + assert_eq!(branch_output.kind(), OutputType::Branch); + + let Some(payments) = tx.payments(coin, &branch_address, branch_output.balance().amount.0) + else { + // If this output has become too small to satisfy this branch, drop it continue; }; - // Send the transactions off for signing - TransactionsToSign::::send(txn, &key, &planned.signable); - 
// Push the Eventualities onto the result - eventualities.push(planned.eventuality); - // Accumulate the outputs - Db::set_outputs(txn, key, *coin, &outputs); - accumulate_outputs(txn, planned.auxilliary.0); - outputs = Db::outputs(txn, key, *coin).unwrap(); - } - Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); - } - - // Now, handle the payments - let mut payments = Db::::queued_payments(txn, key, *coin).unwrap(); - if payments.is_empty() { - continue; - } - - // If this is our only key, our outputs and operating costs should be greater than the - // payments' value - if active_keys.len() == 1 { - // The available amount to fulfill is the amount we have plus the amount we'll reduce by - // An alternative formulation would be `outputs >= (payments - operating costs)`, but - // that'd risk underflow - let value_available = - operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); - - assert!( - value_available >= payments.iter().map(|payment| payment.balance().amount.0).sum::() - ); - } - - // Find the set of payments we should fulfill at this time - loop { - let value_available = - operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); - - // Drop to just the payments we currently have the outputs for - { - let mut can_handle = 0; - let mut value_used = 0; - for payment in payments { - value_used += payment.balance().amount.0; - if value_available < value_used { - break; - } - can_handle += 1; - } - - let remaining_payments = payments.drain(can_handle ..).collect::>(); - // Restore the rest to the database - Db::::set_queued_payments(txn, key, *coin, &remaining_payments); - } - let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); - - // If these payments are worth less than the operating costs, immediately drop them - if payments_value <= operating_costs { - operating_costs -= payments_value; - Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); - - 
// Reset payments to the queued payments - payments = Db::::queued_payments(txn, key, *coin).unwrap(); - // If there's no more payments, stop looking for which payments we should fulfill - if payments.is_empty() { - break; - } - - // Find which of these we should handle - continue; - } - - break; - } - if payments.is_empty() { - continue; - } - - // Create a tree to fulfill all of the payments - #[derive(Clone)] - struct TreeTransaction { - payments: Vec>>, - children: Vec>, - value: u64, - } - let mut tree_transactions = vec![]; - for payments in payments.chunks(P::MAX_OUTPUTS) { - let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); - tree_transactions.push(TreeTransaction:: { - payments: payments.to_vec(), - children: vec![], - value, - }); - } - // While we haven't calculated a tree root, or the tree root doesn't support a change output, - // keep working - while (tree_transactions.len() != 1) || - (tree_transactions[0].payments.len() == P::MAX_OUTPUTS) - { - let mut next_tree_transactions = vec![]; - for children in tree_transactions.chunks(P::MAX_OUTPUTS) { - // If this is the last chunk, and it doesn't need to accumulated, continue - if (children.len() < P::MAX_OUTPUTS) && - ((next_tree_transactions.len() + children.len()) < P::MAX_OUTPUTS) - { - for child in children { - next_tree_transactions.push(child.clone()); - } - continue; - } - - let payments = children - .iter() - .map(|child| { - Payment::new( - P::branch_address(key), - Balance { coin: *coin, amount: Amount(child.value) }, - None, - ) - }) - .collect(); - let value = children.iter().map(|child| child.value).sum(); - next_tree_transactions.push(TreeTransaction { - payments, - children: children.to_vec(), - value, - }); - } - tree_transactions = next_tree_transactions; - } - - // This is recursive, yet only recurses with logarithmic depth - fn execute_tree_transaction< - S: ScannerFeed, - P: TransactionPlanner>, - >( - txn: &mut impl DbTxn, - fee_rate: P::FeeRate, - 
eventualities: &mut Vec>, - key: KeyFor, - mut branch_outputs: Vec>, - mut children: Vec>, - ) { - assert_eq!(branch_outputs.len(), children.len()); - - // Sort the branch outputs by their value - branch_outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); - // Find the child for each branch output - // This is only done within a transaction, not across the layer, so we don't have branches - // created in transactions with less outputs (and therefore less fees) jump places with - // other branches - children.sort_by(|a, b| a.value.cmp(&b.value)); - - for (branch_output, mut child) in branch_outputs.into_iter().zip(children) { - assert_eq!(branch_output.kind(), OutputType::Branch); - Db::::set_already_accumulated_output(txn, branch_output.id()); - - // We need to compensate for the value of this output being less than the value of the - // payments - { - let fee_to_amortize = child.value - branch_output.balance().amount.0; - let mut amortized = 0; - 'outer: while (!child.payments.is_empty()) && (amortized < fee_to_amortize) { - let adjusted_fee = fee_to_amortize - amortized; - let payments_len = u64::try_from(child.payments.len()).unwrap(); - let per_payment_fee_check = adjusted_fee.div_ceil(payments_len); - - let mut i = 0; - while i < child.payments.len() { - let amount = child.payments[i].balance().amount.0; - if amount <= per_payment_fee_check { - child.payments.swap_remove(i); - child.children.swap_remove(i); - amortized += amount; - continue 'outer; - } - i += 1; - } - - // Since all payments can pay the fee, deduct accordingly - for (i, payment) in child.payments.iter_mut().enumerate() { - let Balance { coin, amount } = payment.balance(); - let mut amount = amount.0; - amount -= adjusted_fee / payments_len; - if i < usize::try_from(adjusted_fee % payments_len).unwrap() { - amount -= 1; - } - - *payment = Payment::new( - payment.address().clone(), - Balance { coin, amount: Amount(amount) }, - None, - ); - } - } - if 
child.payments.is_empty() { - continue; - } - } - - let Some(planned) = P::plan_transaction_with_fee_amortization( + let branch_output_id = branch_output.id(); + let Some(mut planned) = P::plan_transaction_with_fee_amortization( // Uses 0 as there's no operating costs to incur/amortize here &mut 0, - fee_rate, + P::fee_rate(block, coin), vec![branch_output], - child.payments, + payments, None, ) else { // This Branch isn't viable, so drop it (and its children) continue; }; + // Since we've made a TX spending this output, don't accumulate it later + Db::::set_already_accumulated_output(txn, &branch_output_id); TransactionsToSign::::send(txn, &key, &planned.signable); eventualities.push(planned.eventuality); - if !child.children.is_empty() { - execute_tree_transaction::( - txn, - fee_rate, - eventualities, - key, - planned.auxilliary.0, - child.children, - ); + + match tx { + TreeTransaction::Leaves { .. } => {} + // If this was a branch, handle its children + TreeTransaction::Branch { mut children, .. 
} => { + branch_outputs.append(&mut planned.auxilliary.0); + tree.append(&mut children); + } } } } - - assert_eq!(tree_transactions.len(), 1); - assert!((tree_transactions[0].payments.len() + 1) <= P::MAX_OUTPUTS); - - // Create the transaction for the root of the tree - let Some(planned) = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - fee_rates[coin], - outputs, - tree_transactions[0].payments, - Some(key_for_change), - ) else { - Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); - continue; - }; - TransactionsToSign::::send(txn, &key, &planned.signable); - eventualities.push(planned.eventuality); - - // We accumulate the change output, but consume the branches here - accumulate_outputs( - txn, - planned - .auxilliary - .0 - .iter() - .filter(|output| output.kind() == OutputType::Change) - .cloned() - .collect(), - ); - // Filter the outputs to the change outputs - let mut branch_outputs = planned.auxilliary.0; - branch_outputs.retain(|output| output.kind() == OutputType::Branch); - - if !tree_transactions[0].children.is_empty() { - execute_tree_transaction::( - txn, - fee_rates[coin], - &mut eventualities, - key, - branch_outputs, - tree_transactions[0].children, - ); - } } eventualities @@ -355,16 +459,21 @@ impl>> Sched impl>> SchedulerTrait for Scheduler { - fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor) { + fn activate_key(txn: &mut impl DbTxn, key: KeyFor) { for coin in S::NETWORK.coins() { assert!(Db::::outputs(txn, key, *coin).is_none()); Db::::set_outputs(txn, key, *coin, &[]); assert!(Db::::queued_payments(txn, key, *coin).is_none()); - Db::::set_queued_payments(txn, key, *coin, &vec![]); + Db::::set_queued_payments(txn, key, *coin, &[]); } } - fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor, new_key: KeyFor) { + fn flush_key( + txn: &mut impl DbTxn, + _block: &BlockFor, + retiring_key: KeyFor, + new_key: KeyFor, + ) { for coin in S::NETWORK.coins() { let still_queued = 
Db::::queued_payments(txn, retiring_key, *coin).unwrap(); let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); @@ -372,12 +481,14 @@ impl>> Sched let mut queued = still_queued; queued.append(&mut new_queued); - Db::::set_queued_payments(txn, retiring_key, *coin, &vec![]); + Db::::set_queued_payments(txn, retiring_key, *coin, &[]); Db::::set_queued_payments(txn, new_key, *coin, &queued); + + // TODO: Forward all existing outputs } } - fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor) { + fn retire_key(txn: &mut impl DbTxn, key: KeyFor) { for coin in S::NETWORK.coins() { assert!(Db::::outputs(txn, key, *coin).unwrap().is_empty()); Db::::del_outputs(txn, key, *coin); @@ -387,48 +498,18 @@ impl>> Sched } fn update( - &mut self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, ) -> HashMap, Vec>> { - // Accumulate all the outputs - for (key, _) in active_keys { - // Accumulate them in memory - let mut outputs_by_coin = HashMap::with_capacity(1); - for output in update.outputs().iter().filter(|output| output.key() == *key) { - match output.kind() { - OutputType::External | OutputType::Forwarded => {} - // Only accumulate these if we haven't already - OutputType::Branch | OutputType::Change => { - if Db::::take_if_already_accumulated_output(txn, output.id()) { - continue; - } - } - } - let coin = output.balance().coin; - if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) { - e.insert(Db::::outputs(txn, *key, coin).unwrap()); - } - outputs_by_coin.get_mut(&coin).unwrap().push(output.clone()); - } - - // Flush them to the database - for (coin, outputs) in outputs_by_coin { - Db::::set_outputs(txn, *key, coin, &outputs); - } - } - - let fee_rates = block.fee_rates(); + Self::accumulate_outputs(txn, update.outputs().to_vec(), true); // Fulfill the payments we prior couldn't let mut eventualities = HashMap::new(); for (key, _stage) in active_keys { - 
eventualities.insert( - key.to_bytes().as_ref().to_vec(), - self.handle_queued_payments(txn, active_keys, fee_rates, *key), - ); + eventualities + .insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key)); } // TODO: If this key has been flushed, forward all outputs @@ -448,7 +529,7 @@ impl>> Sched // This uses 0 for the operating costs as we don't incur any here // If the output can't pay for itself to be forwarded, we simply drop it &mut 0, - fee_rates[&forward.balance().coin], + P::fee_rate(block, forward.balance().coin), vec![forward.clone()], vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], None, @@ -465,7 +546,7 @@ impl>> Sched // This uses 0 for the operating costs as we don't incur any here // If the output can't pay for itself to be returned, we simply drop it &mut 0, - fee_rates[&out_instruction.balance().coin], + P::fee_rate(block, out_instruction.balance().coin), vec![to_return.output().clone()], vec![out_instruction], None, @@ -480,7 +561,7 @@ impl>> Sched TransactionsToSign::::send(txn, &key, &planned_tx.signable); // Insert the Eventualities into the result - eventualities[key.to_bytes().as_ref()].push(planned_tx.eventuality); + eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality); } eventualities @@ -488,11 +569,10 @@ impl>> Sched } fn fulfill( - &mut self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], - mut payments: Vec>>, + payments: Vec>>, ) -> HashMap, Vec>> { // Find the key to filfill these payments with let fulfillment_key = match active_keys[0].1 { @@ -514,7 +594,7 @@ impl>> Sched // Handle the queued payments HashMap::from([( fulfillment_key.to_bytes().as_ref().to_vec(), - self.handle_queued_payments(txn, active_keys, block.fee_rates(), fulfillment_key), + Self::step(txn, active_keys, block, fulfillment_key), )]) } }