Work on the tree logic in the transaction-chaining scheduler

This commit is contained in:
Luke Parker 2024-09-03 18:51:27 -04:00
parent ebef38d93b
commit 0601d47789
2 changed files with 193 additions and 109 deletions

View file

@ -79,7 +79,8 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
///
/// `operating_costs` is accrued to if Serai faces the burden of a fee or drops inputs not worth
/// accumulating. `operating_costs` will be amortized along with this transaction's fee as
/// possible. Please see `spec/processor/UTXO Management.md` for more information.
/// possible, if there is a change output. Please see `spec/processor/UTXO Management.md` for
/// more information.
///
/// Returns `None` if the fee exceeded the inputs, or `Some` otherwise.
fn plan_transaction_with_fee_amortization(
@ -89,6 +90,12 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
mut payments: Vec<Payment<AddressFor<S>>>,
mut change: Option<KeyFor<S>>,
) -> Option<PlannedTransaction<S, Self::SignableTransaction, A>> {
// If there's no change output, we can't recoup any operating costs we would amortize
// We also don't have any losses if the inputs are written off/the change output is reduced
let mut operating_costs_if_no_change = 0;
let operating_costs_in_effect =
if change.is_none() { &mut operating_costs_if_no_change } else { operating_costs };
// Sanity checks
{
assert!(!inputs.is_empty());
@ -101,7 +108,8 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
assert_eq!(coin, payment.balance().coin);
}
assert!(
(inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() + *operating_costs) >=
(inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() +
*operating_costs_in_effect) >=
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>(),
"attempted to fulfill payments without a sufficient input set"
);
@ -119,7 +127,7 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
while !payments.is_empty() {
// We need to pay the fee, and any accrued operating costs, minus what we've already
// amortized
let adjusted_fee = (*operating_costs + fee).saturating_sub(amortized);
let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized);
/*
Ideally, we wouldn't use a ceil div yet would be accurate about it. Any remainder could
@ -154,16 +162,16 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
// dust
if inputs < (fee + S::dust(coin).0) {
// Write off these inputs
*operating_costs += inputs;
*operating_costs_in_effect += inputs;
// Yet also claw back the payments we dropped, as we only lost the change
// The dropped payments will be worth less than the inputs + operating_costs we started
// with, so this shouldn't use `saturating_sub`
*operating_costs -= amortized;
*operating_costs_in_effect -= amortized;
None?;
}
} else {
// Since we have payments which can pay the fee we ended up with, amortize it
let adjusted_fee = (*operating_costs + fee).saturating_sub(amortized);
let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized);
let per_payment_base_fee = adjusted_fee / u64::try_from(payments.len()).unwrap();
let payments_paying_one_atomic_unit_more =
usize::try_from(adjusted_fee % u64::try_from(payments.len()).unwrap()).unwrap();
@ -174,7 +182,7 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
payment.balance().amount.0 -= per_payment_fee;
amortized += per_payment_fee;
}
assert!(amortized >= (*operating_costs + fee));
assert!(amortized >= (*operating_costs_in_effect + fee));
// If the change is less than the dust, drop it
let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() -
@ -182,12 +190,12 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
fee;
if would_be_change < S::dust(coin).0 {
change = None;
*operating_costs += would_be_change;
*operating_costs_in_effect += would_be_change;
}
}
// Update the amount of operating costs
*operating_costs = (*operating_costs + fee).saturating_sub(amortized);
*operating_costs_in_effect = (*operating_costs_in_effect + fee).saturating_sub(amortized);
}
// Because we amortized, or accrued as operating costs, the fee, make the transaction

View file

@ -7,7 +7,7 @@ use std::collections::HashMap;
use group::GroupEncoding;
use serai_primitives::{Coin, Amount};
use serai_primitives::{Coin, Amount, Balance};
use serai_db::DbTxn;
@ -41,12 +41,56 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
) -> Vec<EventualityFor<S>> {
let mut eventualities = vec![];
let mut accumulate_outputs = |txn, outputs: Vec<OutputFor<S>>| {
let mut outputs_by_key = HashMap::new();
for output in outputs {
Db::<S>::set_already_accumulated_output(txn, output.id());
let coin = output.balance().coin;
outputs_by_key
.entry((output.key().to_bytes().as_ref().to_vec(), coin))
.or_insert_with(|| (output.key(), Db::<S>::outputs(txn, output.key(), coin).unwrap()))
.1
.push(output);
}
for ((_key_vec, coin), (key, outputs)) in outputs_by_key {
Db::<S>::set_outputs(txn, key, coin, &outputs);
}
};
for coin in S::NETWORK.coins() {
// Fetch our operating costs and all our outputs
let mut operating_costs = Db::<S>::operating_costs(txn, *coin).0;
let mut outputs = Db::<S>::outputs(txn, key, *coin).unwrap();
// Fetch the queued payments
// If we have more than the maximum amount of inputs, aggregate until we don't
{
while outputs.len() > MAX_INPUTS {
let Some(planned) = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
fee_rates[coin],
outputs.drain(.. MAX_INPUTS).collect::<Vec<_>>(),
vec![],
Some(key_for_change),
) else {
// We amortized all payments, and even when just trying to make the change output, these
// inputs couldn't afford their own aggregation and were written off
Db::<S>::set_operating_costs(txn, *coin, Amount(operating_costs));
continue;
};
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
// Push the Eventualities onto the result
eventualities.push(planned.eventuality);
// Accumulate the outputs
Db::set_outputs(txn, key, *coin, &outputs);
accumulate_outputs(txn, planned.auxilliary.0);
outputs = Db::outputs(txn, key, *coin).unwrap();
}
Db::<S>::set_operating_costs(txn, *coin, Amount(operating_costs));
}
// Now, handle the payments
let mut payments = Db::<S>::queued_payments(txn, key, *coin).unwrap();
if payments.is_empty() {
continue;
@ -55,21 +99,24 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
// If this is our only key, our outputs and operating costs should be greater than the
// payments' value
if active_keys.len() == 1 {
// The available amount of fulfill is the amount we have plus the amount we'll reduce by
// The available amount to fulfill is the amount we have plus the amount we'll reduce by
// An alternative formulation would be `outputs >= (payments - operating costs)`, but
// that'd risk underflow
let available =
let value_available =
operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
assert!(
available >= payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>()
value_available >= payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>()
);
}
let amount_of_payments_that_can_be_handled =
|operating_costs: u64, outputs: &[_], payments: &[_]| {
let value_available =
operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
// Find the set of payments we should fulfill at this time
loop {
let value_available =
operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::<u64>();
// Drop to just the payments we currently have the outputs for
{
let mut can_handle = 0;
let mut value_used = 0;
for payment in payments {
@ -80,15 +127,6 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
can_handle += 1;
}
can_handle
};
// Find the set of payments we should fulfill at this time
{
// Drop to just the payments we currently have the outputs for
{
let can_handle =
amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments);
let remaining_payments = payments.drain(can_handle ..).collect::<Vec<_>>();
// Restore the rest to the database
Db::<S>::set_queued_payments(txn, key, *coin, &remaining_payments);
@ -99,96 +137,132 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
if payments_value <= operating_costs {
operating_costs -= payments_value;
Db::<S>::set_operating_costs(txn, *coin, Amount(operating_costs));
return vec![];
}
// We explicitly sort AFTER deciding which payments to handle so we always handle the
// oldest queued payments first (preventing any from eternally being shuffled to the back
// of the line)
payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
}
assert!(!payments.is_empty());
// Find the smallest set of outputs usable to fulfill these outputs
// Size is determined by the largest output, not quantity nor aggregate value
{
// We start by sorting low to high
outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
let value_needed =
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>() - operating_costs;
let mut needed = 0;
let mut value_present = 0;
for output in &outputs {
needed += 1;
value_present += output.balance().amount.0;
if value_present >= value_needed {
// Reset payments to the queued payments
payments = Db::<S>::queued_payments(txn, key, *coin).unwrap();
// If there's no more payments, stop looking for which payments we should fulfill
if payments.is_empty() {
break;
}
}
// Drain, and save back to the DB, the unnecessary outputs
let remaining_outputs = outputs.drain(needed ..).collect::<Vec<_>>();
Db::<S>::set_outputs(txn, key, *coin, &remaining_outputs);
}
assert!(!outputs.is_empty());
// We now have the current operating costs, the outputs we're using, and the payments
// The database has the unused outputs/unfilfillable payments
// Actually plan/send off the transactions
// While our set of outputs exceed the input limit, aggregate them
while outputs.len() > MAX_INPUTS {
let outputs_chunk = outputs.drain(.. MAX_INPUTS).collect::<Vec<_>>();
// While we're aggregating these outputs, handle any payments we can
let payments_chunk = loop {
let can_handle =
amount_of_payments_that_can_be_handled(operating_costs, &outputs, &payments);
let payments_chunk = payments.drain(.. can_handle.min(MAX_OUTPUTS)).collect::<Vec<_>>();
let payments_value =
payments_chunk.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
if payments_value <= operating_costs {
operating_costs -= payments_value;
continue;
}
break payments_chunk;
};
let Some(planned) = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
fee_rates[coin],
outputs_chunk,
payments_chunk,
// We always use our key for the change here since we may need this change output to
// finish fulfilling these payments
Some(key),
) else {
// We amortized all payments, and even when just trying to make the change output, these
// inputs couldn't afford their own aggregation and were written off
// Find which of these we should handle
continue;
};
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
// Push the Eventualities onto the result
eventualities.push(planned.eventuality);
let mut effected_received_outputs = planned.auxilliary.0;
// Only handle Change so if someone burns to an External address, we don't use it here
// when the scanner will tell us to return it (without accumulating it)
effected_received_outputs.retain(|output| output.kind() == OutputType::Change);
for output in &effected_received_outputs {
Db::<S>::set_already_accumulated_output(txn, output.id());
}
outputs.append(&mut effected_received_outputs);
break;
}
if payments.is_empty() {
continue;
}
// Now that we have an aggregated set of inputs, create the tree for payments
todo!("TODO");
// Create a tree to fulfill all of the payments
struct TreeTransaction<S: ScannerFeed> {
payments: Vec<Payment<AddressFor<S>>>,
children: Vec<TreeTransaction<S>>,
value: u64,
}
let mut tree_transactions = vec![];
for payments in payments.chunks(MAX_OUTPUTS) {
let value = payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>();
tree_transactions.push(TreeTransaction::<S> {
payments: payments.to_vec(),
children: vec![],
value,
});
}
// While we haven't calculated a tree root, or the tree root doesn't support a change output,
// keep working
while (tree_transactions.len() != 1) || (tree_transactions[0].payments.len() == MAX_OUTPUTS) {
let mut next_tree_transactions = vec![];
for children in tree_transactions.chunks(MAX_OUTPUTS) {
let payments = children
.iter()
.map(|child| {
Payment::new(
P::branch_address(key),
Balance { coin: *coin, amount: Amount(child.value) },
None,
)
})
.collect();
let value = children.iter().map(|child| child.value).sum();
next_tree_transactions.push(TreeTransaction {
payments,
children: children.to_vec(),
value,
});
}
tree_transactions = next_tree_transactions;
}
assert_eq!(tree_transactions.len(), 1);
assert!((tree_transactions.payments.len() + 1) <= MAX_OUTPUTS);
// Create the transaction for the root of the tree
let Some(planned) = P::plan_transaction_with_fee_amortization(
&mut operating_costs,
fee_rates[coin],
outputs,
tree_transactions.payments,
Some(key_for_change),
) else {
Db::<S>::set_operating_costs(txn, *coin, Amount(operating_costs));
continue;
};
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
eventualities.push(planned.eventuality);
// We accumulate the change output, but consume the branches here
accumulate_outputs(
txn,
planned
.auxilliary
.0
.iter()
.filter(|output| output.kind() == OutputType::Change)
.cloned()
.collect(),
);
// Filter the outputs to the change outputs
let mut branch_outputs = planned.auxilliary.0;
branch_outputs.retain(|output| output.kind() == OutputType::Branch);
// This is recursive, yet only recurses with logarithmic depth
let execute_tree_transaction = |branch_outputs, children| {
assert_eq!(branch_outputs.len(), children.len());
// Sort the branch outputs by their value
branch_outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0));
// Find the child for each branch output
// This is only done within a transaction, not across the layer, so we don't have branches
// created in transactions with less outputs (and therefore less fees) jump places with
// other branches
children.sort_by(|a, b| a.value.cmp(&b.value));
for (branch_output, child) in branch_outputs.into_iter().zip(children) {
assert_eq!(branch_output.kind(), OutputType::Branch);
Db::<S>::set_already_accumulated_output(txn, branch_output.id());
let Some(planned) = P::plan_transaction_with_fee_amortization(
// Uses 0 as there's no operating costs to incur/amortize here
&mut 0,
fee_rates[coin],
vec![branch_output],
child.payments,
None,
) else {
// This Branch isn't viable, so drop it (and its children)
continue;
};
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
eventualities.push(planned.eventuality);
if !child.children.is_empty() {
execute_tree_transaction(planned.auxilliary.0, child.children);
}
}
};
if !tree_transaction.children.is_empty() {
execute_tree_transaction(branch_outputs, tree_transaction.children);
}
}
eventualities
@ -288,6 +362,7 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be forwarded, we simply drop it
&mut 0,
fee_rates[&forward.balance().coin],
vec![forward.clone()],
@ -304,6 +379,7 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
Payment::new(to_return.address().clone(), to_return.output().balance(), None);
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be returned, we simply drop it
&mut 0,
fee_rates[&out_instruction.balance().coin],
vec![to_return.output().clone()],