#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![doc = include_str!("../README.md")] #![deny(missing_docs)] use core::marker::PhantomData; use std::collections::HashMap; use group::GroupEncoding; use serai_primitives::{Coin, Amount, Balance}; use serai_db::DbTxn; use primitives::{OutputType, ReceivedOutput, Payment}; use scanner::{ LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler as SchedulerTrait, }; use scheduler_primitives::*; use utxo_scheduler_primitives::*; mod db; use db::Db; /// The outputs which will be effected by a PlannedTransaction and received by Serai. pub struct EffectedReceivedOutputs(Vec>); /// A scheduler of transactions for networks premised on the UTXO model which support /// transaction chaining. pub struct Scheduler>>( PhantomData, PhantomData

, ); impl>> Scheduler { fn handle_queued_payments( &mut self, txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], fee_rates: &HashMap, key: KeyFor, ) -> Vec> { let mut eventualities = vec![]; let mut accumulate_outputs = |txn, outputs: Vec>| { let mut outputs_by_key = HashMap::new(); for output in outputs { Db::::set_already_accumulated_output(txn, output.id()); let coin = output.balance().coin; outputs_by_key .entry((output.key().to_bytes().as_ref().to_vec(), coin)) .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) .1 .push(output); } for ((_key_vec, coin), (key, outputs)) in outputs_by_key { Db::::set_outputs(txn, key, coin, &outputs); } }; for coin in S::NETWORK.coins() { // Fetch our operating costs and all our outputs let mut operating_costs = Db::::operating_costs(txn, *coin).0; let mut outputs = Db::::outputs(txn, key, *coin).unwrap(); // If we have more than the maximum amount of inputs, aggregate until we don't { while outputs.len() > P::MAX_INPUTS { let Some(planned) = P::plan_transaction_with_fee_amortization( &mut operating_costs, fee_rates[coin], outputs.drain(.. P::MAX_INPUTS).collect::>(), vec![], Some(key_for_change), ) else { // We amortized all payments, and even when just trying to make the change output, these // inputs couldn't afford their own aggregation and were written off Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); continue; }; // Send the transactions off for signing TransactionsToSign::::send(txn, &key, &planned.signable); // Push the Eventualities onto the result eventualities.push(planned.eventuality); // Accumulate the outputs Db::set_outputs(txn, key, *coin, &outputs); accumulate_outputs(txn, planned.auxilliary.0); outputs = Db::outputs(txn, key, *coin).unwrap(); } Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); } // Now, handle the payments let mut payments = Db::::queued_payments(txn, key, *coin).unwrap(); if payments.is_empty() { continue; } // If this is our only key, our outputs and operating costs should be greater than the // payments' value if active_keys.len() == 1 { // The available amount to fulfill is the amount we have plus the amount we'll reduce by // An alternative formulation would be `outputs >= (payments - operating costs)`, but // that'd risk underflow let value_available = operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); assert!( value_available >= payments.iter().map(|payment| payment.balance().amount.0).sum::() ); } // Find the set of payments we should fulfill at this time loop { let value_available = operating_costs + outputs.iter().map(|output| output.balance().amount.0).sum::(); // Drop to just the payments we currently have the outputs for { let mut can_handle = 0; let mut value_used = 0; for payment in payments { value_used += payment.balance().amount.0; if value_available < value_used { break; } can_handle += 1; } let remaining_payments = payments.drain(can_handle ..).collect::>(); // Restore the rest to the database Db::::set_queued_payments(txn, key, *coin, &remaining_payments); } let payments_value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); // If these payments are worth less than the operating costs, immediately drop them if payments_value <= operating_costs { operating_costs -= payments_value; Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); // Reset payments to the queued payments payments = Db::::queued_payments(txn, key, *coin).unwrap(); // If there's no more payments, stop looking for which payments we should fulfill if payments.is_empty() { break; } // Find which of these we should handle continue; } break; } if payments.is_empty() { continue; } // Create a tree to fulfill all of the payments #[derive(Clone)] struct TreeTransaction { payments: Vec>>, children: Vec>, value: u64, } let mut tree_transactions = vec![]; for payments in payments.chunks(P::MAX_OUTPUTS) { let value = payments.iter().map(|payment| payment.balance().amount.0).sum::(); tree_transactions.push(TreeTransaction:: { payments: payments.to_vec(), children: vec![], value, }); } // While we haven't calculated a tree root, or the tree root doesn't support a change output, // keep working while (tree_transactions.len() != 1) || (tree_transactions[0].payments.len() == P::MAX_OUTPUTS) { let mut next_tree_transactions = vec![]; for children in tree_transactions.chunks(P::MAX_OUTPUTS) { // If this is the last chunk, and it doesn't need to accumulated, continue if (children.len() < P::MAX_OUTPUTS) && ((next_tree_transactions.len() + children.len()) < P::MAX_OUTPUTS) { for child in children { next_tree_transactions.push(child.clone()); } continue; } let payments = children .iter() .map(|child| { Payment::new( P::branch_address(key), Balance { coin: *coin, amount: Amount(child.value) }, None, ) }) .collect(); let value = children.iter().map(|child| child.value).sum(); next_tree_transactions.push(TreeTransaction { payments, children: children.to_vec(), value, }); } tree_transactions = next_tree_transactions; } // This is recursive, yet only recurses with logarithmic depth fn execute_tree_transaction< S: ScannerFeed, P: TransactionPlanner>, >( txn: &mut impl DbTxn, fee_rate: P::FeeRate, eventualities: &mut Vec>, key: KeyFor, mut branch_outputs: Vec>, mut children: Vec>, ) { assert_eq!(branch_outputs.len(), children.len()); // Sort the branch outputs by their value branch_outputs.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0)); // Find the child for each branch output // This is only done within a transaction, not across the layer, so we don't have branches // created in transactions with less outputs (and therefore less fees) jump places with // other branches children.sort_by(|a, b| a.value.cmp(&b.value)); for (branch_output, mut child) in branch_outputs.into_iter().zip(children) { assert_eq!(branch_output.kind(), OutputType::Branch); Db::::set_already_accumulated_output(txn, branch_output.id()); // We need to compensate for the value of this output being less than the value of the // payments { let fee_to_amortize = child.value - branch_output.balance().amount.0; let mut amortized = 0; 'outer: while (!child.payments.is_empty()) && (amortized < fee_to_amortize) { let adjusted_fee = fee_to_amortize - amortized; let payments_len = u64::try_from(child.payments.len()).unwrap(); let per_payment_fee_check = adjusted_fee.div_ceil(payments_len); let mut i = 0; while i < child.payments.len() { let amount = child.payments[i].balance().amount.0; if amount <= per_payment_fee_check { child.payments.swap_remove(i); child.children.swap_remove(i); amortized += amount; continue 'outer; } i += 1; } // Since all payments can pay the fee, deduct accordingly for (i, payment) in child.payments.iter_mut().enumerate() { let Balance { coin, amount } = payment.balance(); let mut amount = amount.0; amount -= adjusted_fee / payments_len; if i < usize::try_from(adjusted_fee % payments_len).unwrap() { amount -= 1; } *payment = Payment::new( payment.address().clone(), Balance { coin, amount: Amount(amount) }, None, ); } } if child.payments.is_empty() { continue; } } let Some(planned) = P::plan_transaction_with_fee_amortization( // Uses 0 as there's no operating costs to incur/amortize here &mut 0, fee_rate, vec![branch_output], child.payments, None, ) else { // This Branch isn't viable, so drop it (and its children) continue; }; TransactionsToSign::::send(txn, &key, &planned.signable); eventualities.push(planned.eventuality); if !child.children.is_empty() { execute_tree_transaction::( txn, fee_rate, eventualities, key, planned.auxilliary.0, child.children, ); } } } assert_eq!(tree_transactions.len(), 1); assert!((tree_transactions[0].payments.len() + 1) <= P::MAX_OUTPUTS); // Create the transaction for the root of the tree let Some(planned) = P::plan_transaction_with_fee_amortization( &mut operating_costs, fee_rates[coin], outputs, tree_transactions[0].payments, Some(key_for_change), ) else { Db::::set_operating_costs(txn, *coin, Amount(operating_costs)); continue; }; TransactionsToSign::::send(txn, &key, &planned.signable); eventualities.push(planned.eventuality); // We accumulate the change output, but consume the branches here accumulate_outputs( txn, planned .auxilliary .0 .iter() .filter(|output| output.kind() == OutputType::Change) .cloned() .collect(), ); // Filter the outputs to the change outputs let mut branch_outputs = planned.auxilliary.0; branch_outputs.retain(|output| output.kind() == OutputType::Branch); if !tree_transactions[0].children.is_empty() { execute_tree_transaction::( txn, fee_rates[coin], &mut eventualities, key, branch_outputs, tree_transactions[0].children, ); } } eventualities } } impl>> SchedulerTrait for Scheduler { fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor) { for coin in S::NETWORK.coins() { assert!(Db::::outputs(txn, key, *coin).is_none()); Db::::set_outputs(txn, key, *coin, &[]); assert!(Db::::queued_payments(txn, key, *coin).is_none()); Db::::set_queued_payments(txn, key, *coin, &vec![]); } } fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor, new_key: KeyFor) { for coin in S::NETWORK.coins() { let still_queued = Db::::queued_payments(txn, retiring_key, *coin).unwrap(); let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); let mut queued = still_queued; queued.append(&mut new_queued); Db::::set_queued_payments(txn, retiring_key, *coin, &vec![]); Db::::set_queued_payments(txn, new_key, *coin, &queued); } } fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor) { for coin in S::NETWORK.coins() { assert!(Db::::outputs(txn, key, *coin).unwrap().is_empty()); Db::::del_outputs(txn, key, *coin); assert!(Db::::queued_payments(txn, key, *coin).unwrap().is_empty()); Db::::del_queued_payments(txn, key, *coin); } } fn update( &mut self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, ) -> HashMap, Vec>> { // Accumulate all the outputs for (key, _) in active_keys { // Accumulate them in memory let mut outputs_by_coin = HashMap::with_capacity(1); for output in update.outputs().iter().filter(|output| output.key() == *key) { match output.kind() { OutputType::External | OutputType::Forwarded => {} // Only accumulate these if we haven't already OutputType::Branch | OutputType::Change => { if Db::::take_if_already_accumulated_output(txn, output.id()) { continue; } } } let coin = output.balance().coin; if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) { e.insert(Db::::outputs(txn, *key, coin).unwrap()); } outputs_by_coin.get_mut(&coin).unwrap().push(output.clone()); } // Flush them to the database for (coin, outputs) in outputs_by_coin { Db::::set_outputs(txn, *key, coin, &outputs); } } let fee_rates = block.fee_rates(); // Fulfill the payments we prior couldn't let mut eventualities = HashMap::new(); for (key, _stage) in active_keys { eventualities.insert( key.to_bytes().as_ref().to_vec(), self.handle_queued_payments(txn, active_keys, fee_rates, *key), ); } // TODO: If this key has been flushed, forward all outputs // Create the transactions for the forwards/burns { let mut planned_txs = vec![]; for forward in update.forwards() { let key = forward.key(); assert_eq!(active_keys.len(), 2); assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); assert_eq!(active_keys[1].1, LifetimeStage::Active); let forward_to_key = active_keys[1].0; let Some(plan) = P::plan_transaction_with_fee_amortization( // This uses 0 for the operating costs as we don't incur any here // If the output can't pay for itself to be forwarded, we simply drop it &mut 0, fee_rates[&forward.balance().coin], vec![forward.clone()], vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], None, ) else { continue; }; planned_txs.push((key, plan)); } for to_return in update.returns() { let key = to_return.output().key(); let out_instruction = Payment::new(to_return.address().clone(), to_return.output().balance(), None); let Some(plan) = P::plan_transaction_with_fee_amortization( // This uses 0 for the operating costs as we don't incur any here // If the output can't pay for itself to be returned, we simply drop it &mut 0, fee_rates[&out_instruction.balance().coin], vec![to_return.output().clone()], vec![out_instruction], None, ) else { continue; }; planned_txs.push((key, plan)); } for (key, planned_tx) in planned_txs { // Send the transactions off for signing TransactionsToSign::::send(txn, &key, &planned_tx.signable); // Insert the Eventualities into the result eventualities[key.to_bytes().as_ref()].push(planned_tx.eventuality); } eventualities } } fn fulfill( &mut self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], mut payments: Vec>>, ) -> HashMap, Vec>> { // Find the key to filfill these payments with let fulfillment_key = match active_keys[0].1 { LifetimeStage::ActiveYetNotReporting => { panic!("expected to fulfill payments despite not reporting for the oldest key") } LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0, LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0, }; // Queue the payments for this key for coin in S::NETWORK.coins() { let mut queued_payments = Db::::queued_payments(txn, fulfillment_key, *coin).unwrap(); queued_payments .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); Db::::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments); } // Handle the queued payments HashMap::from([( fulfillment_key.to_bytes().as_ref().to_vec(), self.handle_queued_payments(txn, active_keys, block.fee_rates(), fulfillment_key), )]) } }