diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 539bd4a7..1818fbf0 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -277,6 +277,7 @@ pub trait Scheduler: 'static + Send { fn update( &mut self, txn: &mut impl DbTxn, + block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, ) -> HashMap, Vec>>; @@ -316,6 +317,7 @@ pub trait Scheduler: 'static + Send { fn fulfill( &mut self, txn: &mut impl DbTxn, + block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], payments: Vec>>, ) -> HashMap, Vec>>; diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs index af8b985f..356192ee 100644 --- a/processor/scheduler/utxo/primitives/src/lib.rs +++ b/processor/scheduler/utxo/primitives/src/lib.rs @@ -35,6 +35,11 @@ pub trait TransactionPlanner: 'static + Send + Sync { /// The type representing a signable transaction. type SignableTransaction: SignableTransaction; + /// The maximum amount of inputs allowed in a transaction. + const MAX_INPUTS: usize; + /// The maximum amount of outputs allowed in a transaction, including the change output. + const MAX_OUTPUTS: usize; + /// Obtain the fee rate to pay. /// /// This must be constant to the finalized block referenced by this block number and the coin. diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index 8e567e14..31c70c1e 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -37,6 +37,7 @@ impl>> Sched &mut self, txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], + fee_rates: &HashMap, key: KeyFor, ) -> Vec> { let mut eventualities = vec![]; @@ -64,11 +65,11 @@ impl>> Sched // If we have more than the maximum amount of inputs, aggregate until we don't { - while outputs.len() > MAX_INPUTS { + while outputs.len() > P::MAX_INPUTS { let Some(planned) = P::plan_transaction_with_fee_amortization( &mut operating_costs, fee_rates[coin], - outputs.drain(.. MAX_INPUTS).collect::>(), + outputs.drain(.. P::MAX_INPUTS).collect::>(), vec![], Some(key_for_change), ) else { @@ -156,13 +157,14 @@ impl>> Sched } // Create a tree to fulfill all of the payments + #[derive(Clone)] struct TreeTransaction { payments: Vec>>, children: Vec>, value: u64, } let mut tree_transactions = vec![]; - for payments in payments.chunks(MAX_OUTPUTS) { + for payments in payments.chunks(P::MAX_OUTPUTS) { let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); tree_transactions.push(TreeTransaction:: { payments: payments.to_vec(), @@ -172,9 +174,21 @@ impl>> Sched } // While we haven't calculated a tree root, or the tree root doesn't support a change output, // keep working - while (tree_transactions.len() != 1) || (tree_transactions[0].payments.len() == MAX_OUTPUTS) { + while (tree_transactions.len() != 1) || + (tree_transactions[0].payments.len() == P::MAX_OUTPUTS) + { let mut next_tree_transactions = vec![]; - for children in tree_transactions.chunks(MAX_OUTPUTS) { + for children in tree_transactions.chunks(P::MAX_OUTPUTS) { + // If this is the last chunk, and it doesn't need to accumulated, continue + if (children.len() < P::MAX_OUTPUTS) && + ((next_tree_transactions.len() + children.len()) < P::MAX_OUTPUTS) + { + for child in children { + next_tree_transactions.push(child.clone()); + } + continue; + } + let payments = children .iter() .map(|child| { @@ -194,15 +208,111 @@ impl>> Sched } tree_transactions = next_tree_transactions; } + + // This is recursive, yet only recurses with logarithmic depth + fn execute_tree_transaction< + S: ScannerFeed, + P: TransactionPlanner>, + >( + txn: &mut impl DbTxn, + fee_rate: P::FeeRate, + eventualities: &mut Vec>, + key: KeyFor, + mut branch_outputs: Vec>, + mut children: Vec>, + ) { + assert_eq!(branch_outputs.len(), children.len()); + + // Sort the branch outputs by their value + branch_outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); + // Find the child for each branch output + // This is only done within a transaction, not across the layer, so we don't have branches + // created in transactions with less outputs (and therefore less fees) jump places with + // other branches + children.sort_by(|a, b| a.value.cmp(&b.value)); + + for (branch_output, mut child) in branch_outputs.into_iter().zip(children) { + assert_eq!(branch_output.kind(), OutputType::Branch); + Db::::set_already_accumulated_output(txn, branch_output.id()); + + // We need to compensate for the value of this output being less than the value of the + // payments + { + let fee_to_amortize = child.value - branch_output.balance().amount.0; + let mut amortized = 0; + 'outer: while (!child.payments.is_empty()) && (amortized < fee_to_amortize) { + let adjusted_fee = fee_to_amortize - amortized; + let payments_len = u64::try_from(child.payments.len()).unwrap(); + let per_payment_fee_check = adjusted_fee.div_ceil(payments_len); + + let mut i = 0; + while i < child.payments.len() { + let amount = child.payments[i].balance().amount.0; + if amount <= per_payment_fee_check { + child.payments.swap_remove(i); + child.children.swap_remove(i); + amortized += amount; + continue 'outer; + } + i += 1; + } + + // Since all payments can pay the fee, deduct accordingly + for (i, payment) in child.payments.iter_mut().enumerate() { + let Balance { coin, amount } = payment.balance(); + let mut amount = amount.0; + amount -= adjusted_fee / payments_len; + if i < usize::try_from(adjusted_fee % payments_len).unwrap() { + amount -= 1; + } + + *payment = Payment::new( + payment.address().clone(), + Balance { coin, amount: Amount(amount) }, + None, + ); + } + } + if child.payments.is_empty() { + continue; + } + } + + let Some(planned) = P::plan_transaction_with_fee_amortization( + // Uses 0 as there's no operating costs to incur/amortize here + &mut 0, + fee_rate, + vec![branch_output], + child.payments, + None, + ) else { + // This Branch isn't viable, so drop it (and its children) + continue; + }; + TransactionsToSign::::send(txn, &key, &planned.signable); + eventualities.push(planned.eventuality); + if !child.children.is_empty() { + execute_tree_transaction::( + txn, + fee_rate, + eventualities, + key, + planned.auxilliary.0, + child.children, + ); + } + } + } + assert_eq!(tree_transactions.len(), 1); - assert!((tree_transactions.payments.len() + 1) <= MAX_OUTPUTS); + assert!((tree_transactions[0].payments.len() + 1) <= P::MAX_OUTPUTS); // Create the transaction for the root of the tree let Some(planned) = P::plan_transaction_with_fee_amortization( &mut operating_costs, fee_rates[coin], outputs, - tree_transactions.payments, + tree_transactions[0].payments, Some(key_for_change), ) else { Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); @@ -226,42 +336,15 @@ impl>> Sched let mut branch_outputs = planned.auxilliary.0; branch_outputs.retain(|output| output.kind() == OutputType::Branch); - // This is recursive, yet only recurses with logarithmic depth - let execute_tree_transaction = |branch_outputs, children| { - assert_eq!(branch_outputs.len(), children.len()); - - // Sort the branch outputs by their value - branch_outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); - // Find the child for each branch output - // This is only done within a transaction, not across the layer, so we don't have branches - // created in transactions with less outputs (and therefore less fees) jump places with - // other branches - children.sort_by(|a, b| a.value.cmp(&b.value)); - - for (branch_output, child) in branch_outputs.into_iter().zip(children) { - assert_eq!(branch_output.kind(), OutputType::Branch); - Db::::set_already_accumulated_output(txn, branch_output.id()); - - let Some(planned) = P::plan_transaction_with_fee_amortization( - // Uses 0 as there's no operating costs to incur/amortize here - &mut 0, - fee_rates[coin], - vec![branch_output], - child.payments, - None, - ) else { - // This Branch isn't viable, so drop it (and its children) - continue; - }; - TransactionsToSign::::send(txn, &key, &planned.signable); - eventualities.push(planned.eventuality); - if !child.children.is_empty() { - execute_tree_transaction(planned.auxilliary.0, child.children); - } - } - }; - if !tree_transaction.children.is_empty() { - execute_tree_transaction(branch_outputs, tree_transaction.children); + if !tree_transactions[0].children.is_empty() { + execute_tree_transaction::( + txn, + fee_rates[coin], + &mut eventualities, + key, + branch_outputs, + tree_transactions[0].children, + ); } } @@ -306,6 +389,7 @@ impl>> Sched fn update( &mut self, txn: &mut impl DbTxn, + block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, ) -> HashMap, Vec>> { @@ -336,14 +420,14 @@ impl>> Sched } } - let mut fee_rates: HashMap = todo!("TODO"); + let fee_rates = block.fee_rates(); // Fulfill the payments we prior couldn't let mut eventualities = HashMap::new(); for (key, _stage) in active_keys { eventualities.insert( key.to_bytes().as_ref().to_vec(), - self.handle_queued_payments(txn, active_keys, *key), + self.handle_queued_payments(txn, active_keys, fee_rates, *key), ); } @@ -406,6 +490,7 @@ impl>> Sched fn fulfill( &mut self, txn: &mut impl DbTxn, + block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], mut payments: Vec>>, ) -> HashMap, Vec>> { @@ -429,7 +514,7 @@ impl>> Sched // Handle the queued payments HashMap::from([( fulfillment_key.to_bytes().as_ref().to_vec(), - self.handle_queued_payments(txn, active_keys, fulfillment_key), + self.handle_queued_payments(txn, active_keys, block.fee_rates(), fulfillment_key), )]) } }