From e1ad897f7e64571bdf0ee15a8ddf508ed1e1dd4c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 14 Sep 2024 01:09:35 -0400 Subject: [PATCH] Allow scheduler's creation of transactions to be async and error I don't love this, but it's the only way to select decoys without using a local database. While the prior commit added such a databse, the performance of it presumably wasn't viable, and while TODOs marked the needed improvements, it was still messy with an immense scope re: any auditing. The relevant scheduler functions now take `&self` (intentional, as all mutations should be via the `&mut impl DbTxn` passed). The calls to `&self` are expected to be completely deterministic (as usual). --- processor/bin/src/lib.rs | 25 +- processor/bitcoin/src/main.rs | 4 +- processor/bitcoin/src/scheduler.rs | 87 ++-- processor/monero/src/decoys.rs | 294 ------------ processor/monero/src/main.rs | 1 - processor/monero/src/rpc.rs | 27 +- processor/scanner/src/eventuality/mod.rs | 44 +- processor/scanner/src/lib.rs | 28 +- .../scheduler/utxo/primitives/src/lib.rs | 260 +++++----- processor/scheduler/utxo/standard/src/lib.rs | 443 ++++++++++-------- .../utxo/transaction-chaining/src/lib.rs | 364 +++++++------- 11 files changed, 723 insertions(+), 854 deletions(-) delete mode 100644 processor/monero/src/decoys.rs diff --git a/processor/bin/src/lib.rs b/processor/bin/src/lib.rs index 67ea6150..7758b1ea 100644 --- a/processor/bin/src/lib.rs +++ b/processor/bin/src/lib.rs @@ -15,7 +15,7 @@ use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel}; use primitives::EncodableG; use ::key_gen::{KeyGenParams, KeyGen}; -use scheduler::SignableTransaction; +use scheduler::{SignableTransaction, TransactionFor}; use scanner::{ScannerFeed, Scanner, KeyFor, Scheduler}; use signers::{TransactionPublisher, Signers}; @@ -161,22 +161,23 @@ async fn first_block_after_time(feed: &S, serai_time: u64) -> u6 pub async fn main_loop< S: ScannerFeed, K: KeyGenParams>>, - Sch: Scheduler< - S, - SignableTransaction: SignableTransaction, - >, - P: TransactionPublisher<::Transaction>, + Sch: Clone + + Scheduler< + S, + SignableTransaction: SignableTransaction, + >, >( mut db: Db, feed: S, - publisher: P, + scheduler: Sch, + publisher: impl TransactionPublisher>, ) { let mut coordinator = Coordinator::new(db.clone()); let mut key_gen = key_gen::(); - let mut scanner = Scanner::new::(db.clone(), feed.clone()).await; + let mut scanner = Scanner::new(db.clone(), feed.clone(), scheduler.clone()).await; let mut signers = - Signers::::new(db.clone(), coordinator.coordinator_send(), publisher); + Signers::::new(db.clone(), coordinator.coordinator_send(), publisher); loop { let db_clone = db.clone(); @@ -242,8 +243,10 @@ pub async fn main_loop< if session == Session(0) { assert!(scanner.is_none()); let start_block = first_block_after_time(&feed, serai_time).await; - scanner = - Some(Scanner::initialize::(db_clone, feed.clone(), start_block, key.0).await); + scanner = Some( + Scanner::initialize(db_clone, feed.clone(), scheduler.clone(), start_block, key.0) + .await, + ); } } messages::substrate::CoordinatorMessage::SlashesReported { session } => { diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index 56bfd619..d029ad8b 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -22,7 +22,7 @@ use crate::key_gen::KeyGenParams; mod rpc; use rpc::Rpc; mod scheduler; -use scheduler::Scheduler; +use scheduler::{Planner, Scheduler}; // Our custom code for Bitcoin mod db; @@ -57,7 +57,7 @@ async fn main() { tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![])); core::mem::forget(index_handle); - bin::main_loop::<_, KeyGenParams, Scheduler<_>, Rpc>(db, feed.clone(), feed).await; + bin::main_loop::<_, KeyGenParams, _>(db, feed.clone(), Scheduler::new(Planner), feed).await; } /* diff --git a/processor/bitcoin/src/scheduler.rs b/processor/bitcoin/src/scheduler.rs index 6e49d23d..b6554bda 100644 --- a/processor/bitcoin/src/scheduler.rs +++ b/processor/bitcoin/src/scheduler.rs @@ -1,3 +1,5 @@ +use core::future::Future; + use ciphersuite::{Ciphersuite, Secp256k1}; use bitcoin_serai::{ @@ -89,8 +91,10 @@ fn signable_transaction( .map(|bst| (SignableTransaction { inputs, payments, change, fee_per_vbyte }, bst)) } +#[derive(Clone)] pub(crate) struct Planner; impl TransactionPlanner, EffectedReceivedOutputs>> for Planner { + type EphemeralError = (); type FeeRate = u64; type SignableTransaction = SignableTransaction; @@ -153,50 +157,59 @@ impl TransactionPlanner, EffectedReceivedOutputs>> for Plan } fn plan( + &self, fee_rate: Self::FeeRate, inputs: Vec>>, payments: Vec>>>, change: Option>>, - ) -> PlannedTransaction, Self::SignableTransaction, EffectedReceivedOutputs>> { - let key = inputs.first().unwrap().key(); - for input in &inputs { - assert_eq!(key, input.key()); - } + ) -> impl Send + + Future< + Output = Result< + PlannedTransaction, Self::SignableTransaction, EffectedReceivedOutputs>>, + Self::EphemeralError, + >, + > { + async move { + let key = inputs.first().unwrap().key(); + for input in &inputs { + assert_eq!(key, input.key()); + } - let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id()); - match signable_transaction::(fee_rate, inputs.clone(), payments, change) { - Ok(tx) => PlannedTransaction { - signable: tx.0, - eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output }, - auxilliary: EffectedReceivedOutputs({ - let tx = tx.1.transaction(); - let scanner = scanner(key); + let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id()); + match signable_transaction::(fee_rate, inputs.clone(), payments, change) { + Ok(tx) => Ok(PlannedTransaction { + signable: tx.0, + eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output }, + auxilliary: EffectedReceivedOutputs({ + let tx = tx.1.transaction(); + let scanner = scanner(key); - let mut res = vec![]; - for output in scanner.scan_transaction(tx) { - res.push(Output::new_with_presumed_origin( - key, - tx, - // It shouldn't matter if this is wrong as we should never try to return these - // We still provide an accurate value to ensure a lack of discrepancies - Some(Address::new(inputs[0].output.output().script_pubkey.clone()).unwrap()), - output, - )); - } - res + let mut res = vec![]; + for output in scanner.scan_transaction(tx) { + res.push(Output::new_with_presumed_origin( + key, + tx, + // It shouldn't matter if this is wrong as we should never try to return these + // We still provide an accurate value to ensure a lack of discrepancies + Some(Address::new(inputs[0].output.output().script_pubkey.clone()).unwrap()), + output, + )); + } + res + }), }), - }, - Err( - TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment, - ) => panic!("malformed arguments to plan"), - // No data, we have a minimum fee rate, we checked the amount of inputs/outputs - Err( - TransactionError::TooMuchData | - TransactionError::TooLowFee | - TransactionError::TooLargeTransaction, - ) => unreachable!(), - Err(TransactionError::NotEnoughFunds { .. }) => { - panic!("plan called for a transaction without enough funds") + Err( + TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment, + ) => panic!("malformed arguments to plan"), + // No data, we have a minimum fee rate, we checked the amount of inputs/outputs + Err( + TransactionError::TooMuchData | + TransactionError::TooLowFee | + TransactionError::TooLargeTransaction, + ) => unreachable!(), + Err(TransactionError::NotEnoughFunds { .. }) => { + panic!("plan called for a transaction without enough funds") + } } } } diff --git a/processor/monero/src/decoys.rs b/processor/monero/src/decoys.rs deleted file mode 100644 index 000463d0..00000000 --- a/processor/monero/src/decoys.rs +++ /dev/null @@ -1,294 +0,0 @@ -use core::{ - future::Future, - ops::{Bound, RangeBounds}, -}; - -use curve25519_dalek::{ - scalar::Scalar, - edwards::{CompressedEdwardsY, EdwardsPoint}, -}; -use monero_wallet::{ - DEFAULT_LOCK_WINDOW, - primitives::Commitment, - transaction::{Timelock, Input, Pruned, Transaction}, - rpc::{OutputInformation, RpcError, Rpc as MRpcTrait, DecoyRpc}, -}; - -use borsh::{BorshSerialize, BorshDeserialize}; -use serai_db::{Get, DbTxn, Db, create_db}; - -use primitives::task::ContinuallyRan; -use scanner::ScannerFeed; - -use crate::Rpc; - -#[derive(BorshSerialize, BorshDeserialize)] -struct EncodableOutputInformation { - height: u64, - timelocked: bool, - key: [u8; 32], - commitment: [u8; 32], -} - -create_db! { - MoneroProcessorDecoys { - NextToIndexBlock: () -> u64, - PriorIndexedBlock: () -> [u8; 32], - DistributionStartBlock: () -> u64, - Distribution: () -> Vec, - Out: (index: u64) -> EncodableOutputInformation, - } -} - -/* - We want to be able to select decoys when planning transactions, but planning transactions is a - synchronous process. We store the decoys to a local database and have our database implement - `DecoyRpc` to achieve synchronous decoy selection. - - This is only needed as the transactions we sign must have decoys decided and agreed upon. With - FCMP++s, we'll be able to sign transactions without the membership proof, letting any signer - prove for membership after the fact (with their local views). Until then, this task remains. -*/ -pub(crate) struct DecoysTask { - pub(crate) rpc: Rpc, - pub(crate) current_distribution: Vec, -} - -impl ContinuallyRan for DecoysTask { - fn run_iteration(&mut self) -> impl Send + Future> { - async move { - let finalized_block_number = self - .rpc - .rpc - .get_height() - .await - .map_err(|e| format!("couldn't fetch latest block number: {e:?}"))? - .checked_sub(Rpc::::CONFIRMATIONS.try_into().unwrap()) - .ok_or(format!( - "blockchain only just started and doesn't have {} blocks yet", - Rpc::::CONFIRMATIONS - ))?; - - if NextToIndexBlock::get(&self.rpc.db).is_none() { - let distribution = self - .rpc - .rpc - .get_output_distribution(..= finalized_block_number) - .await - .map_err(|e| format!("failed to get output distribution: {e:?}"))?; - if distribution.is_empty() { - Err("distribution was empty".to_string())?; - } - - let distribution_start_block = finalized_block_number - (distribution.len() - 1); - // There may have been a reorg between the time of getting the distribution and the time of - // getting this block. This is an invariant and assumed not to have happened in the split - // second it's possible. - let block = self - .rpc - .rpc - .get_block_by_number(distribution_start_block) - .await - .map_err(|e| format!("failed to get the start block for the distribution: {e:?}"))?; - - let mut txn = self.rpc.db.txn(); - NextToIndexBlock::set(&mut txn, &distribution_start_block.try_into().unwrap()); - PriorIndexedBlock::set(&mut txn, &block.header.previous); - DistributionStartBlock::set(&mut txn, &u64::try_from(distribution_start_block).unwrap()); - txn.commit(); - } - - let next_to_index_block = - usize::try_from(NextToIndexBlock::get(&self.rpc.db).unwrap()).unwrap(); - if next_to_index_block >= finalized_block_number { - return Ok(false); - } - - for b in next_to_index_block ..= finalized_block_number { - // Fetch the block - let block = self - .rpc - .rpc - .get_block_by_number(b) - .await - .map_err(|e| format!("decoys task failed to fetch block: {e:?}"))?; - let prior = PriorIndexedBlock::get(&self.rpc.db).unwrap(); - if block.header.previous != prior { - panic!( - "decoys task detected reorg: expected {}, found {}", - hex::encode(prior), - hex::encode(block.header.previous) - ); - } - - // Fetch the transactions in the block - let transactions = self - .rpc - .rpc - .get_pruned_transactions(&block.transactions) - .await - .map_err(|e| format!("failed to get the pruned transactions within a block: {e:?}"))?; - - fn outputs( - list: &mut Vec, - block_number: u64, - tx: Transaction, - ) { - match tx { - Transaction::V1 { .. } => {} - Transaction::V2 { prefix, proofs } => { - for (i, output) in prefix.outputs.into_iter().enumerate() { - list.push(EncodableOutputInformation { - // This is correct per the documentation on OutputInformation, which this maps to - height: block_number, - timelocked: prefix.additional_timelock != Timelock::None, - key: output.key.to_bytes(), - commitment: if matches!( - prefix.inputs.first().expect("Monero transaction had no inputs"), - Input::Gen(_) - ) { - Commitment::new( - Scalar::ONE, - output.amount.expect("miner transaction outputs didn't have amounts set"), - ) - .calculate() - .compress() - .to_bytes() - } else { - proofs - .as_ref() - .expect("non-miner V2 transaction didn't have proofs") - .base - .commitments - .get(i) - .expect("amount of commitments didn't match amount of outputs") - .compress() - .to_bytes() - }, - }); - } - } - } - } - - let block_hash = block.hash(); - - let b = u64::try_from(b).unwrap(); - let mut encodable = Vec::with_capacity(2 * (1 + block.transactions.len())); - outputs(&mut encodable, b, block.miner_transaction.into()); - for transaction in transactions { - outputs(&mut encodable, b, transaction); - } - - let existing_outputs = self.current_distribution.last().copied().unwrap_or(0); - let now_outputs = existing_outputs + u64::try_from(encodable.len()).unwrap(); - self.current_distribution.push(now_outputs); - - let mut txn = self.rpc.db.txn(); - NextToIndexBlock::set(&mut txn, &(b + 1)); - PriorIndexedBlock::set(&mut txn, &block_hash); - // TODO: Don't write the entire 10 MB distribution to the DB every two minutes - Distribution::set(&mut txn, &self.current_distribution); - for (b, out) in (existing_outputs .. now_outputs).zip(encodable) { - Out::set(&mut txn, b, &out); - } - txn.commit(); - } - Ok(true) - } - } -} - -// TODO: Cache the distribution in a static -pub(crate) struct Decoys<'a, G: Get>(&'a G); -impl<'a, G: Sync + Get> DecoyRpc for Decoys<'a, G> { - #[rustfmt::skip] - fn get_output_distribution_end_height( - &self, - ) -> impl Send + Future> { - async move { - Ok(NextToIndexBlock::get(self.0).map_or(0, |b| usize::try_from(b).unwrap() + 1)) - } - } - fn get_output_distribution( - &self, - range: impl Send + RangeBounds, - ) -> impl Send + Future, RpcError>> { - async move { - let from = match range.start_bound() { - Bound::Included(from) => *from, - Bound::Excluded(from) => from.checked_add(1).ok_or_else(|| { - RpcError::InternalError("range's from wasn't representable".to_string()) - })?, - Bound::Unbounded => 0, - }; - let to = match range.end_bound() { - Bound::Included(to) => *to, - Bound::Excluded(to) => to - .checked_sub(1) - .ok_or_else(|| RpcError::InternalError("range's to wasn't representable".to_string()))?, - Bound::Unbounded => { - panic!("requested distribution till latest block, which is non-deterministic") - } - }; - if from > to { - Err(RpcError::InternalError(format!( - "malformed range: inclusive start {from}, inclusive end {to}" - )))?; - } - - let distribution_start_block = usize::try_from( - DistributionStartBlock::get(self.0).expect("never populated the distribution start block"), - ) - .unwrap(); - let len_of_distribution_until_to = - to.checked_sub(distribution_start_block).ok_or_else(|| { - RpcError::InternalError( - "requested distribution until a block when the distribution had yet to start" - .to_string(), - ) - })? + - 1; - let distribution = Distribution::get(self.0).expect("never populated the distribution"); - assert!( - distribution.len() >= len_of_distribution_until_to, - "requested distribution until block we have yet to index" - ); - Ok( - distribution[from.saturating_sub(distribution_start_block) .. len_of_distribution_until_to] - .to_vec(), - ) - } - } - fn get_outs( - &self, - _indexes: &[u64], - ) -> impl Send + Future, RpcError>> { - async move { unimplemented!("get_outs is unused") } - } - fn get_unlocked_outputs( - &self, - indexes: &[u64], - height: usize, - fingerprintable_deterministic: bool, - ) -> impl Send + Future>, RpcError>> { - assert!(fingerprintable_deterministic, "processor wasn't using deterministic output selection"); - async move { - let mut res = vec![]; - for index in indexes { - let out = Out::get(self.0, *index).expect("requested output we didn't index"); - let unlocked = (!out.timelocked) && - ((usize::try_from(out.height).unwrap() + DEFAULT_LOCK_WINDOW) <= height); - res.push(unlocked.then(|| CompressedEdwardsY(out.key).decompress()).flatten().map(|key| { - [ - key, - CompressedEdwardsY(out.commitment) - .decompress() - .expect("output with invalid commitment"), - ] - })); - } - Ok(res) - } - } -} diff --git a/processor/monero/src/main.rs b/processor/monero/src/main.rs index 5b32e0f1..344b6c48 100644 --- a/processor/monero/src/main.rs +++ b/processor/monero/src/main.rs @@ -16,7 +16,6 @@ use crate::key_gen::KeyGenParams; mod rpc; use rpc::Rpc; -mod decoys; /* mod scheduler; use scheduler::Scheduler; diff --git a/processor/monero/src/rpc.rs b/processor/monero/src/rpc.rs index 58e6cf8b..9244b23f 100644 --- a/processor/monero/src/rpc.rs +++ b/processor/monero/src/rpc.rs @@ -5,7 +5,6 @@ use monero_simple_request_rpc::SimpleRequestRpc; use serai_client::primitives::{NetworkId, Coin, Amount}; -use serai_db::Db; use scanner::ScannerFeed; use signers::TransactionPublisher; @@ -15,12 +14,11 @@ use crate::{ }; #[derive(Clone)] -pub(crate) struct Rpc { - pub(crate) db: D, +pub(crate) struct Rpc { pub(crate) rpc: SimpleRequestRpc, } -impl ScannerFeed for Rpc { +impl ScannerFeed for Rpc { const NETWORK: NetworkId = NetworkId::Monero; // Outputs aren't spendable until 10 blocks later due to the 10-block lock // Since we assumed scanned outputs are spendable, that sets a minimum confirmation depth of 10 @@ -39,15 +37,16 @@ impl ScannerFeed for Rpc { &self, ) -> impl Send + Future> { async move { - // The decoys task only indexes finalized blocks - crate::decoys::NextToIndexBlock::get(&self.db) - .ok_or_else(|| { - RpcError::InternalError("decoys task hasn't indexed any blocks yet".to_string()) - })? - .checked_sub(1) - .ok_or_else(|| { - RpcError::InternalError("only the genesis block has been indexed".to_string()) - }) + Ok( + self + .rpc + .get_height() + .await? + .checked_sub(1) + .expect("connected to an invalid Monero RPC") + .try_into() + .unwrap(), + ) } } @@ -128,7 +127,7 @@ impl ScannerFeed for Rpc { } } -impl TransactionPublisher for Rpc { +impl TransactionPublisher for Rpc { type EphemeralError = RpcError; fn publish( diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 46a5e13b..99fea2fb 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -1,4 +1,4 @@ -use core::{marker::PhantomData, future::Future}; +use core::future::Future; use std::collections::{HashSet, HashMap}; use group::GroupEncoding; @@ -102,11 +102,11 @@ fn intake_eventualities( pub(crate) struct EventualityTask> { db: D, feed: S, - scheduler: PhantomData, + scheduler: Sch, } impl> EventualityTask { - pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { + pub(crate) fn new(mut db: D, feed: S, scheduler: Sch, start_block: u64) -> Self { if EventualityDb::::next_to_check_for_eventualities_block(&db).is_none() { // Initialize the DB let mut txn = db.txn(); @@ -114,7 +114,7 @@ impl> EventualityTask { txn.commit(); } - Self { db, feed, scheduler: PhantomData } + Self { db, feed, scheduler } } #[allow(clippy::type_complexity)] @@ -167,15 +167,19 @@ impl> EventualityTask { { intaked_any = true; - let new_eventualities = Sch::fulfill( - &mut txn, - &block, - &keys_with_stages, - burns - .into_iter() - .filter_map(|burn| Payment::>::try_from(burn).ok()) - .collect(), - ); + let new_eventualities = self + .scheduler + .fulfill( + &mut txn, + &block, + &keys_with_stages, + burns + .into_iter() + .filter_map(|burn| Payment::>::try_from(burn).ok()) + .collect(), + ) + .await + .map_err(|e| format!("failed to queue fulfilling payments: {e:?}"))?; intake_eventualities::(&mut txn, new_eventualities); } txn.commit(); @@ -443,8 +447,11 @@ impl> ContinuallyRan for EventualityTas determined off an earlier block than this (enabling an earlier LifetimeStage to be used after a later one was already used). */ - let new_eventualities = - Sch::update(&mut txn, &block, &keys_with_stages, scheduler_update); + let new_eventualities = self + .scheduler + .update(&mut txn, &block, &keys_with_stages, scheduler_update) + .await + .map_err(|e| format!("failed to update scheduler: {e:?}"))?; // Intake the new Eventualities for key in new_eventualities.keys() { keys @@ -464,8 +471,11 @@ impl> ContinuallyRan for EventualityTas key.key != keys.last().unwrap().key, "key which was forwarding was the last key (which has no key after it to forward to)" ); - let new_eventualities = - Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key); + let new_eventualities = self + .scheduler + .flush_key(&mut txn, &block, key.key, keys.last().unwrap().key) + .await + .map_err(|e| format!("failed to flush key from scheduler: {e:?}"))?; intake_eventualities::(&mut txn, new_eventualities); } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 6ac45223..1b6afaa9 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -256,8 +256,17 @@ impl SchedulerUpdate { } } +/// Eventualities, keyed by the encoding of the key the Eventualities are for. +pub type KeyScopedEventualities = HashMap, Vec>>; + /// The object responsible for accumulating outputs and planning new transactions. pub trait Scheduler: 'static + Send { + /// An error encountered when handling updates/payments. + /// + /// This MUST be an ephemeral error. Retrying handling updates/payments MUST eventually + /// resolve without manual intervention/changing the arguments. + type EphemeralError: Debug; + /// The type for a signable transaction. type SignableTransaction: scheduler_primitives::SignableTransaction; @@ -278,11 +287,12 @@ pub trait Scheduler: 'static + Send { /// If the retiring key has any unfulfilled payments associated with it, those MUST be made /// the responsibility of the new key. fn flush_key( + &self, txn: &mut impl DbTxn, block: &BlockFor, retiring_key: KeyFor, new_key: KeyFor, - ) -> HashMap, Vec>>; + ) -> impl Send + Future, Self::EphemeralError>>; /// Retire a key as it'll no longer be used. /// @@ -300,11 +310,12 @@ pub trait Scheduler: 'static + Send { /// The `Vec` used as the key in the returned HashMap should be the encoded key the /// Eventualities are for. fn update( + &self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, - ) -> HashMap, Vec>>; + ) -> impl Send + Future, Self::EphemeralError>>; /// Fulfill a series of payments, yielding the Eventualities now to be scanned for. /// @@ -339,11 +350,12 @@ pub trait Scheduler: 'static + Send { has an output-to-Serai, the new primary output). */ fn fulfill( + &self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], payments: Vec>>, - ) -> HashMap, Vec>>; + ) -> impl Send + Future, Self::EphemeralError>>; } /// A representation of a scanner. @@ -358,14 +370,15 @@ impl Scanner { /// This will begin its execution, spawning several asynchronous tasks. /// /// This will return None if the Scanner was never initialized. - pub async fn new>(db: impl Db, feed: S) -> Option { + pub async fn new(db: impl Db, feed: S, scheduler: impl Scheduler) -> Option { let start_block = ScannerGlobalDb::::start_block(&db)?; let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block); let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone()); - let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block); + let eventuality_task = + eventuality::EventualityTask::<_, _, _>::new(db, feed, scheduler, start_block); let (index_task_def, _index_handle) = Task::new(); let (scan_task_def, scan_handle) = Task::new(); @@ -394,9 +407,10 @@ impl Scanner { /// This will begin its execution, spawning several asynchronous tasks. /// /// This passes through to `Scanner::new` if prior called. - pub async fn initialize>( + pub async fn initialize( mut db: impl Db, feed: S, + scheduler: impl Scheduler, start_block: u64, start_key: KeyFor, ) -> Self { @@ -407,7 +421,7 @@ impl Scanner { txn.commit(); } - Self::new::(db, feed).await.unwrap() + Self::new(db, feed, scheduler).await.unwrap() } /// Acknowledge a Batch having been published on Serai. diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs index e48221a1..00b2d10f 100644 --- a/processor/scheduler/utxo/primitives/src/lib.rs +++ b/processor/scheduler/utxo/primitives/src/lib.rs @@ -2,6 +2,8 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] +use core::{fmt::Debug, future::Future}; + use serai_primitives::{Coin, Amount}; use primitives::{ReceivedOutput, Payment}; @@ -40,8 +42,14 @@ pub struct AmortizePlannedTransaction: 'static + Send + Sync { + /// An error encountered when handling planning transactions. + /// + /// This MUST be an ephemeral error. Retrying planning transactions MUST eventually resolve + /// resolve manual intervention/changing the arguments. + type EphemeralError: Debug; + /// The type representing a fee rate to use for transactions. - type FeeRate: Clone + Copy; + type FeeRate: Send + Clone + Copy; /// The type representing a signable transaction. type SignableTransaction: SignableTransaction; @@ -82,11 +90,15 @@ pub trait TransactionPlanner: 'static + Send + Sync { /// `change` will always be an address belonging to the Serai network. If it is `Some`, a change /// output must be created. fn plan( + &self, fee_rate: Self::FeeRate, inputs: Vec>, payments: Vec>>, change: Option>, - ) -> PlannedTransaction; + ) -> impl Send + + Future< + Output = Result, Self::EphemeralError>, + >; /// Obtain a PlannedTransaction via amortizing the fee over the payments. /// @@ -98,132 +110,142 @@ pub trait TransactionPlanner: 'static + Send + Sync { /// Returns `None` if the fee exceeded the inputs, or `Some` otherwise. // TODO: Enum for Change of None, Some, Mandatory fn plan_transaction_with_fee_amortization( + &self, operating_costs: &mut u64, fee_rate: Self::FeeRate, inputs: Vec>, mut payments: Vec>>, mut change: Option>, - ) -> Option> { - // If there's no change output, we can't recoup any operating costs we would amortize - // We also don't have any losses if the inputs are written off/the change output is reduced - let mut operating_costs_if_no_change = 0; - let operating_costs_in_effect = - if change.is_none() { &mut operating_costs_if_no_change } else { operating_costs }; + ) -> impl Send + + Future< + Output = Result< + Option>, + Self::EphemeralError, + >, + > { + async move { + // If there's no change output, we can't recoup any operating costs we would amortize + // We also don't have any losses if the inputs are written off/the change output is reduced + let mut operating_costs_if_no_change = 0; + let operating_costs_in_effect = + if change.is_none() { &mut operating_costs_if_no_change } else { operating_costs }; + + // Sanity checks + { + assert!(!inputs.is_empty()); + assert!((!payments.is_empty()) || change.is_some()); + let coin = inputs.first().unwrap().balance().coin; + for input in &inputs { + assert_eq!(coin, input.balance().coin); + } + for payment in &payments { + assert_eq!(coin, payment.balance().coin); + } + assert!( + (inputs.iter().map(|input| input.balance().amount.0).sum::() + + *operating_costs_in_effect) >= + payments.iter().map(|payment| payment.balance().amount.0).sum::(), + "attempted to fulfill payments without a sufficient input set" + ); + } - // Sanity checks - { - assert!(!inputs.is_empty()); - assert!((!payments.is_empty()) || change.is_some()); let coin = inputs.first().unwrap().balance().coin; - for input in &inputs { - assert_eq!(coin, input.balance().coin); + + // Amortization + { + // Sort payments from high amount to low amount + payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse()); + + let mut fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0; + let mut amortized = 0; + while !payments.is_empty() { + // We need to pay the fee, and any accrued operating costs, minus what we've already + // amortized + let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized); + + /* + Ideally, we wouldn't use a ceil div yet would be accurate about it. Any remainder could + be amortized over the largest outputs, which wouldn't be relevant here as we only work + with the smallest output. The issue is the theoretical edge case where all outputs have + the same value and are of the minimum value. In that case, none would be able to have + the remainder amortized as it'd cause them to need to be dropped. Using a ceil div + avoids this. + */ + let per_payment_fee = adjusted_fee.div_ceil(u64::try_from(payments.len()).unwrap()); + // Pop the last payment if it can't pay the fee, remaining about the dust limit as it does + if payments.last().unwrap().balance().amount.0 <= (per_payment_fee + S::dust(coin).0) { + amortized += payments.pop().unwrap().balance().amount.0; + // Recalculate the fee and try again + fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0; + continue; + } + // Break since all of these payments shouldn't be dropped + break; + } + + // If we couldn't amortize the fee over the payments, check if we even have enough to pay it + if payments.is_empty() { + // If we don't have a change output, we simply return here + // We no longer have anything to do here, nor any expectations + if change.is_none() { + return Ok(None); + } + + let inputs = inputs.iter().map(|input| input.balance().amount.0).sum::(); + // Checks not just if we can pay for it, yet that the would-be change output is at least + // dust + if inputs < (fee + S::dust(coin).0) { + // Write off these inputs + *operating_costs_in_effect += inputs; + // Yet also claw back the payments we dropped, as we only lost the change + // The dropped payments will be worth less than the inputs + operating_costs we started + // with, so this shouldn't use `saturating_sub` + *operating_costs_in_effect -= amortized; + return Ok(None); + } + } else { + // Since we have payments which can pay the fee we ended up with, amortize it + let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized); + let per_payment_base_fee = adjusted_fee / u64::try_from(payments.len()).unwrap(); + let payments_paying_one_atomic_unit_more = + usize::try_from(adjusted_fee % u64::try_from(payments.len()).unwrap()).unwrap(); + + for (i, payment) in payments.iter_mut().enumerate() { + let per_payment_fee = + per_payment_base_fee + u64::from(u8::from(i < payments_paying_one_atomic_unit_more)); + payment.balance().amount.0 -= per_payment_fee; + amortized += per_payment_fee; + } + assert!(amortized >= (*operating_costs_in_effect + fee)); + + // If the change is less than the dust, drop it + let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::() - + payments.iter().map(|payment| payment.balance().amount.0).sum::() - + fee; + if would_be_change < S::dust(coin).0 { + change = None; + *operating_costs_in_effect += would_be_change; + } + } + + // Update the amount of operating costs + *operating_costs_in_effect = (*operating_costs_in_effect + fee).saturating_sub(amortized); } - for payment in &payments { - assert_eq!(coin, payment.balance().coin); - } - assert!( - (inputs.iter().map(|input| input.balance().amount.0).sum::() + - *operating_costs_in_effect) >= - payments.iter().map(|payment| payment.balance().amount.0).sum::(), - "attempted to fulfill payments without a sufficient input set" - ); + + // Because we amortized, or accrued as operating costs, the fee, make the transaction + let effected_payments = payments.iter().map(|payment| payment.balance().amount).collect(); + let has_change = change.is_some(); + + let PlannedTransaction { signable, eventuality, auxilliary } = + self.plan(fee_rate, inputs, payments, change).await?; + Ok(Some(AmortizePlannedTransaction { + effected_payments, + has_change, + signable, + eventuality, + auxilliary, + })) } - - let coin = inputs.first().unwrap().balance().coin; - - // Amortization - { - // Sort payments from high amount to low amount - payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse()); - - let mut fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0; - let mut amortized = 0; - while !payments.is_empty() { - // We need to pay the fee, and any accrued operating costs, minus what we've already - // amortized - let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized); - - /* - Ideally, we wouldn't use a ceil div yet would be accurate about it. Any remainder could - be amortized over the largest outputs, which wouldn't be relevant here as we only work - with the smallest output. The issue is the theoretical edge case where all outputs have - the same value and are of the minimum value. In that case, none would be able to have the - remainder amortized as it'd cause them to need to be dropped. Using a ceil div avoids - this. - */ - let per_payment_fee = adjusted_fee.div_ceil(u64::try_from(payments.len()).unwrap()); - // Pop the last payment if it can't pay the fee, remaining about the dust limit as it does - if payments.last().unwrap().balance().amount.0 <= (per_payment_fee + S::dust(coin).0) { - amortized += payments.pop().unwrap().balance().amount.0; - // Recalculate the fee and try again - fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0; - continue; - } - // Break since all of these payments shouldn't be dropped - break; - } - - // If we couldn't amortize the fee over the payments, check if we even have enough to pay it - if payments.is_empty() { - // If we don't have a change output, we simply return here - // We no longer have anything to do here, nor any expectations - if change.is_none() { - None?; - } - - let inputs = inputs.iter().map(|input| input.balance().amount.0).sum::(); - // Checks not just if we can pay for it, yet that the would-be change output is at least - // dust - if inputs < (fee + S::dust(coin).0) { - // Write off these inputs - *operating_costs_in_effect += inputs; - // Yet also claw back the payments we dropped, as we only lost the change - // The dropped payments will be worth less than the inputs + operating_costs we started - // with, so this shouldn't use `saturating_sub` - *operating_costs_in_effect -= amortized; - None?; - } - } else { - // Since we have payments which can pay the fee we ended up with, amortize it - let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized); - let per_payment_base_fee = adjusted_fee / u64::try_from(payments.len()).unwrap(); - let payments_paying_one_atomic_unit_more = - usize::try_from(adjusted_fee % u64::try_from(payments.len()).unwrap()).unwrap(); - - for (i, payment) in payments.iter_mut().enumerate() { - let per_payment_fee = - per_payment_base_fee + u64::from(u8::from(i < payments_paying_one_atomic_unit_more)); - payment.balance().amount.0 -= per_payment_fee; - amortized += per_payment_fee; - } - assert!(amortized >= (*operating_costs_in_effect + fee)); - - // If the change is less than the dust, drop it - let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::() - - payments.iter().map(|payment| payment.balance().amount.0).sum::() - - fee; - if would_be_change < S::dust(coin).0 { - change = None; - *operating_costs_in_effect += would_be_change; - } - } - - // Update the amount of operating costs - *operating_costs_in_effect = (*operating_costs_in_effect + fee).saturating_sub(amortized); - } - - // Because we amortized, or accrued as operating costs, the fee, make the transaction - let effected_payments = payments.iter().map(|payment| payment.balance().amount).collect(); - let has_change = change.is_some(); - let PlannedTransaction { signable, eventuality, auxilliary } = - Self::plan(fee_rate, inputs, payments, change); - Some(AmortizePlannedTransaction { - effected_payments, - has_change, - signable, - eventuality, - auxilliary, - }) } /// Create a tree to fulfill a set of payments. diff --git a/processor/scheduler/utxo/standard/src/lib.rs b/processor/scheduler/utxo/standard/src/lib.rs index 3ae855e7..5ff786a7 100644 --- a/processor/scheduler/utxo/standard/src/lib.rs +++ b/processor/scheduler/utxo/standard/src/lib.rs @@ -2,7 +2,7 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::marker::PhantomData; +use core::{marker::PhantomData, future::Future}; use std::collections::HashMap; use group::GroupEncoding; @@ -14,7 +14,7 @@ use serai_db::DbTxn; use primitives::{ReceivedOutput, Payment}; use scanner::{ LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor, - SchedulerUpdate, Scheduler as SchedulerTrait, + SchedulerUpdate, KeyScopedEventualities, Scheduler as SchedulerTrait, }; use scheduler_primitives::*; use utxo_scheduler_primitives::*; @@ -23,16 +23,27 @@ mod db; use db::Db; /// A scheduler of transactions for networks premised on the UTXO model. -pub struct Scheduler>(PhantomData, PhantomData

); +#[allow(non_snake_case)] +#[derive(Clone)] +pub struct Scheduler> { + planner: P, + _S: PhantomData, +} impl> Scheduler { - fn aggregate_inputs( + /// Create a new scheduler. + pub fn new(planner: P) -> Self { + Self { planner, _S: PhantomData } + } + + async fn aggregate_inputs( + &self, txn: &mut impl DbTxn, block: &BlockFor, key_for_change: KeyFor, key: KeyFor, coin: Coin, - ) -> Vec> { + ) -> Result>, >::EphemeralError> { let mut eventualities = vec![]; let mut operating_costs = Db::::operating_costs(txn, coin).0; @@ -41,13 +52,17 @@ impl> Scheduler { while outputs.len() > P::MAX_INPUTS { let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::>(); - let Some(planned) = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - P::fee_rate(block, coin), - to_aggregate, - vec![], - Some(key_for_change), - ) else { + let Some(planned) = self + .planner + .plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + to_aggregate, + vec![], + Some(key_for_change), + ) + .await? + else { continue; }; @@ -57,7 +72,7 @@ impl> Scheduler { Db::::set_outputs(txn, key, coin, &outputs); Db::::set_operating_costs(txn, coin, Amount(operating_costs)); - eventualities + Ok(eventualities) } fn fulfillable_payments( @@ -140,31 +155,36 @@ impl> Scheduler { } } - fn handle_branch( + async fn handle_branch( + &self, txn: &mut impl DbTxn, block: &BlockFor, eventualities: &mut Vec>, output: OutputFor, tx: TreeTransaction>, - ) -> bool { + ) -> Result>::EphemeralError> { let key = output.key(); let coin = output.balance().coin; let Some(payments) = tx.payments::(coin, &P::branch_address(key), output.balance().amount.0) else { // If this output has become too small to satisfy this branch, drop it - return false; + return Ok(false); }; - let Some(planned) = P::plan_transaction_with_fee_amortization( - // Uses 0 as there's no operating costs to incur/amortize here - &mut 0, - P::fee_rate(block, coin), - vec![output], - payments, - None, - ) else { + let Some(planned) = self + .planner + .plan_transaction_with_fee_amortization( + // Uses 0 as there's no operating costs to incur/amortize here + &mut 0, + P::fee_rate(block, coin), + vec![output], + payments, + None, + ) + .await? + else { // This Branch isn't viable, so drop it (and its children) - return false; + return Ok(false); }; TransactionsToSign::::send(txn, &key, &planned.signable); @@ -172,15 +192,16 @@ impl> Scheduler { Self::queue_branches(txn, key, coin, planned.effected_payments, tx); - true + Ok(true) } - fn step( + async fn step( + &self, txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], block: &BlockFor, key: KeyFor, - ) -> Vec> { + ) -> Result>, >::EphemeralError> { let mut eventualities = vec![]; let key_for_change = match active_keys[0].1 { @@ -198,7 +219,8 @@ impl> Scheduler { let coin = *coin; // Perform any input aggregation we should - eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin)); + eventualities + .append(&mut self.aggregate_inputs(txn, block, key_for_change, key, coin).await?); // Fetch the operating costs/outputs let mut operating_costs = Db::::operating_costs(txn, coin).0; @@ -228,15 +250,19 @@ impl> Scheduler { // scanner API) let mut planned_outer = None; for i in 0 .. 2 { - let Some(planned) = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - P::fee_rate(block, coin), - outputs.clone(), - tree[0] - .payments::(coin, &branch_address, tree[0].value()) - .expect("payments were dropped despite providing an input of the needed value"), - Some(key_for_change), - ) else { + let Some(planned) = self + .planner + .plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + outputs.clone(), + tree[0] + .payments::(coin, &branch_address, tree[0].value()) + .expect("payments were dropped despite providing an input of the needed value"), + Some(key_for_change), + ) + .await? + else { // This should trip on the first iteration or not at all assert_eq!(i, 0); // This doesn't have inputs even worth aggregating so drop the entire tree @@ -272,46 +298,53 @@ impl> Scheduler { Self::queue_branches(txn, key, coin, planned.effected_payments, tree.remove(0)); } - eventualities + Ok(eventualities) } - fn flush_outputs( + async fn flush_outputs( + &self, txn: &mut impl DbTxn, - eventualities: &mut HashMap, Vec>>, + eventualities: &mut KeyScopedEventualities, block: &BlockFor, from: KeyFor, to: KeyFor, coin: Coin, - ) { + ) -> Result<(), >::EphemeralError> { let from_bytes = from.to_bytes().as_ref().to_vec(); // Ensure our inputs are aggregated eventualities .entry(from_bytes.clone()) .or_insert(vec![]) - .append(&mut Self::aggregate_inputs(txn, block, to, from, coin)); + .append(&mut self.aggregate_inputs(txn, block, to, from, coin).await?); // Now that our inputs are aggregated, transfer all of them to the new key let mut operating_costs = Db::::operating_costs(txn, coin).0; let outputs = Db::::outputs(txn, from, coin).unwrap(); if outputs.is_empty() { - return; + return Ok(()); } - let planned = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - P::fee_rate(block, coin), - outputs, - vec![], - Some(to), - ); + let planned = self + .planner + .plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + outputs, + vec![], + Some(to), + ) + .await?; Db::::set_operating_costs(txn, coin, Amount(operating_costs)); - let Some(planned) = planned else { return }; + let Some(planned) = planned else { return Ok(()) }; TransactionsToSign::::send(txn, &from, &planned.signable); eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality); + + Ok(()) } } impl> SchedulerTrait for Scheduler { + type EphemeralError = P::EphemeralError; type SignableTransaction = P::SignableTransaction; fn activate_key(txn: &mut impl DbTxn, key: KeyFor) { @@ -324,29 +357,32 @@ impl> SchedulerTrait for Schedul } fn flush_key( + &self, txn: &mut impl DbTxn, block: &BlockFor, retiring_key: KeyFor, new_key: KeyFor, - ) -> HashMap, Vec>> { - let mut eventualities = HashMap::new(); - for coin in S::NETWORK.coins() { - // Move the payments to the new key - { - let still_queued = Db::::queued_payments(txn, retiring_key, *coin).unwrap(); - let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); + ) -> impl Send + Future, Self::EphemeralError>> { + async move { + let mut eventualities = HashMap::new(); + for coin in S::NETWORK.coins() { + // Move the payments to the new key + { + let still_queued = Db::::queued_payments(txn, retiring_key, *coin).unwrap(); + let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); - let mut queued = still_queued; - queued.append(&mut new_queued); + let mut queued = still_queued; + queued.append(&mut new_queued); - Db::::set_queued_payments(txn, retiring_key, *coin, &[]); - Db::::set_queued_payments(txn, new_key, *coin, &queued); + Db::::set_queued_payments(txn, retiring_key, *coin, &[]); + Db::::set_queued_payments(txn, new_key, *coin, &queued); + } + + // Move the outputs to the new key + self.flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin).await?; } - - // Move the outputs to the new key - Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin); + Ok(eventualities) } - eventualities } fn retire_key(txn: &mut impl DbTxn, key: KeyFor) { @@ -359,155 +395,174 @@ impl> SchedulerTrait for Schedul } fn update( + &self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, - ) -> HashMap, Vec>> { - let mut eventualities = HashMap::new(); + ) -> impl Send + Future, Self::EphemeralError>> { + async move { + let mut eventualities = HashMap::new(); - // Accumulate the new outputs - { - let mut outputs_by_key = HashMap::new(); - for output in update.outputs() { - // If this aligns for a branch, handle it - if let Some(branch) = Db::::take_pending_branch(txn, output.key(), output.balance()) { - if Self::handle_branch( - txn, - block, - eventualities.entry(output.key().to_bytes().as_ref().to_vec()).or_insert(vec![]), - output.clone(), - branch, - ) { - // If we could use it for a branch, we do and move on - // Else, we let it be accumulated by the standard accumulation code - continue; + // Accumulate the new outputs + { + let mut outputs_by_key = HashMap::new(); + for output in update.outputs() { + // If this aligns for a branch, handle it + if let Some(branch) = Db::::take_pending_branch(txn, output.key(), output.balance()) { + if self + .handle_branch( + txn, + block, + eventualities.entry(output.key().to_bytes().as_ref().to_vec()).or_insert(vec![]), + output.clone(), + branch, + ) + .await? + { + // If we could use it for a branch, we do and move on + // Else, we let it be accumulated by the standard accumulation code + continue; + } + } + + let coin = output.balance().coin; + outputs_by_key + // Index by key and coin + .entry((output.key().to_bytes().as_ref().to_vec(), coin)) + // If we haven't accumulated here prior, read the outputs from the database + .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) + .1 + .push(output.clone()); + } + // Write the outputs back to the database + for ((_key_vec, coin), (key, outputs)) in outputs_by_key { + Db::::set_outputs(txn, key, coin, &outputs); + } + } + + // Fulfill the payments we prior couldn't + for (key, _stage) in active_keys { + eventualities + .entry(key.to_bytes().as_ref().to_vec()) + .or_insert(vec![]) + .append(&mut self.step(txn, active_keys, block, *key).await?); + } + + // If this key has been flushed, forward all outputs + match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting | + LifetimeStage::Active | + LifetimeStage::UsingNewForChange => {} + LifetimeStage::Forwarding | LifetimeStage::Finishing => { + for coin in S::NETWORK.coins() { + self + .flush_outputs( + txn, + &mut eventualities, + block, + active_keys[0].0, + active_keys[1].0, + *coin, + ) + .await?; } } - - let coin = output.balance().coin; - outputs_by_key - // Index by key and coin - .entry((output.key().to_bytes().as_ref().to_vec(), coin)) - // If we haven't accumulated here prior, read the outputs from the database - .or_insert_with(|| (output.key(), Db::::outputs(txn, output.key(), coin).unwrap())) - .1 - .push(output.clone()); } - // Write the outputs back to the database - for ((_key_vec, coin), (key, outputs)) in outputs_by_key { - Db::::set_outputs(txn, key, coin, &outputs); - } - } - // Fulfill the payments we prior couldn't - for (key, _stage) in active_keys { - eventualities - .entry(key.to_bytes().as_ref().to_vec()) - .or_insert(vec![]) - .append(&mut Self::step(txn, active_keys, block, *key)); - } + // Create the transactions for the forwards/burns + { + let mut planned_txs = vec![]; + for forward in update.forwards() { + let key = forward.key(); - // If this key has been flushed, forward all outputs - match active_keys[0].1 { - LifetimeStage::ActiveYetNotReporting | - LifetimeStage::Active | - LifetimeStage::UsingNewForChange => {} - LifetimeStage::Forwarding | LifetimeStage::Finishing => { - for coin in S::NETWORK.coins() { - Self::flush_outputs( - txn, - &mut eventualities, - block, - active_keys[0].0, - active_keys[1].0, - *coin, - ); + assert_eq!(active_keys.len(), 2); + assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); + assert_eq!(active_keys[1].1, LifetimeStage::Active); + let forward_to_key = active_keys[1].0; + + let Some(plan) = self + .planner + .plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be forwarded, we simply drop it + &mut 0, + P::fee_rate(block, forward.balance().coin), + vec![forward.clone()], + vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], + None, + ) + .await? + else { + continue; + }; + planned_txs.push((key, plan)); } + for to_return in update.returns() { + let key = to_return.output().key(); + let out_instruction = + Payment::new(to_return.address().clone(), to_return.output().balance(), None); + let Some(plan) = self + .planner + .plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be returned, we simply drop it + &mut 0, + P::fee_rate(block, out_instruction.balance().coin), + vec![to_return.output().clone()], + vec![out_instruction], + None, + ) + .await? + else { + continue; + }; + planned_txs.push((key, plan)); + } + + for (key, planned_tx) in planned_txs { + // Send the transactions off for signing + TransactionsToSign::::send(txn, &key, &planned_tx.signable); + + // Insert the Eventualities into the result + eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality); + } + + Ok(eventualities) } } - - // Create the transactions for the forwards/burns - { - let mut planned_txs = vec![]; - for forward in update.forwards() { - let key = forward.key(); - - assert_eq!(active_keys.len(), 2); - assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); - assert_eq!(active_keys[1].1, LifetimeStage::Active); - let forward_to_key = active_keys[1].0; - - let Some(plan) = P::plan_transaction_with_fee_amortization( - // This uses 0 for the operating costs as we don't incur any here - // If the output can't pay for itself to be forwarded, we simply drop it - &mut 0, - P::fee_rate(block, forward.balance().coin), - vec![forward.clone()], - vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], - None, - ) else { - continue; - }; - planned_txs.push((key, plan)); - } - for to_return in update.returns() { - let key = to_return.output().key(); - let out_instruction = - Payment::new(to_return.address().clone(), to_return.output().balance(), None); - let Some(plan) = P::plan_transaction_with_fee_amortization( - // This uses 0 for the operating costs as we don't incur any here - // If the output can't pay for itself to be returned, we simply drop it - &mut 0, - P::fee_rate(block, out_instruction.balance().coin), - vec![to_return.output().clone()], - vec![out_instruction], - None, - ) else { - continue; - }; - planned_txs.push((key, plan)); - } - - for (key, planned_tx) in planned_txs { - // Send the transactions off for signing - TransactionsToSign::::send(txn, &key, &planned_tx.signable); - - // Insert the Eventualities into the result - eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality); - } - - eventualities - } } fn fulfill( + &self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], payments: Vec>>, - ) -> HashMap, Vec>> { - // Find the key to filfill these payments with - let fulfillment_key = match active_keys[0].1 { - LifetimeStage::ActiveYetNotReporting => { - panic!("expected to fulfill payments despite not reporting for the oldest key") + ) -> impl Send + Future, Self::EphemeralError>> { + async move { + // Find the key to filfill these payments with + let fulfillment_key = match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting => { + panic!("expected to fulfill payments despite not reporting for the oldest key") + } + LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0, + LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0, + }; + + // Queue the payments for this key + for coin in S::NETWORK.coins() { + let mut queued_payments = Db::::queued_payments(txn, fulfillment_key, *coin).unwrap(); + queued_payments + .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); + Db::::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments); } - LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0, - LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0, - }; - // Queue the payments for this key - for coin in S::NETWORK.coins() { - let mut queued_payments = Db::::queued_payments(txn, fulfillment_key, *coin).unwrap(); - queued_payments - .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); - Db::::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments); + // Handle the queued payments + Ok(HashMap::from([( + fulfillment_key.to_bytes().as_ref().to_vec(), + self.step(txn, active_keys, block, fulfillment_key).await?, + )])) } - - // Handle the queued payments - HashMap::from([( - fulfillment_key.to_bytes().as_ref().to_vec(), - Self::step(txn, active_keys, block, fulfillment_key), - )]) } } diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index e43f5fec..cb0a8b15 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -2,7 +2,7 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::marker::PhantomData; +use core::{marker::PhantomData, future::Future}; use std::collections::HashMap; use group::GroupEncoding; @@ -14,7 +14,7 @@ use serai_db::DbTxn; use primitives::{OutputType, ReceivedOutput, Payment}; use scanner::{ LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor, - SchedulerUpdate, Scheduler as SchedulerTrait, + SchedulerUpdate, KeyScopedEventualities, Scheduler as SchedulerTrait, }; use scheduler_primitives::*; use utxo_scheduler_primitives::*; @@ -27,12 +27,19 @@ pub struct EffectedReceivedOutputs(pub Vec>); /// A scheduler of transactions for networks premised on the UTXO model which support /// transaction chaining. -pub struct Scheduler>>( - PhantomData, - PhantomData

, -); +#[allow(non_snake_case)] +#[derive(Clone)] +pub struct Scheduler>> { + planner: P, + _S: PhantomData, +} impl>> Scheduler { + /// Create a new scheduler. + pub fn new(planner: P) -> Self { + Self { planner, _S: PhantomData } + } + fn accumulate_outputs(txn: &mut impl DbTxn, outputs: Vec>, from_scanner: bool) { let mut outputs_by_key = HashMap::new(); for output in outputs { @@ -59,13 +66,14 @@ impl>> Sched } } - fn aggregate_inputs( + async fn aggregate_inputs( + &self, txn: &mut impl DbTxn, block: &BlockFor, key_for_change: KeyFor, key: KeyFor, coin: Coin, - ) -> Vec> { + ) -> Result>, >::EphemeralError> { let mut eventualities = vec![]; let mut operating_costs = Db::::operating_costs(txn, coin).0; @@ -74,13 +82,17 @@ impl>> Sched let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::>(); Db::::set_outputs(txn, key, coin, &outputs); - let Some(planned) = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - P::fee_rate(block, coin), - to_aggregate, - vec![], - Some(key_for_change), - ) else { + let Some(planned) = self + .planner + .plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + to_aggregate, + vec![], + Some(key_for_change), + ) + .await? + else { continue; }; @@ -93,7 +105,7 @@ impl>> Sched } Db::::set_operating_costs(txn, coin, Amount(operating_costs)); - eventualities + Ok(eventualities) } fn fulfillable_payments( @@ -151,12 +163,13 @@ impl>> Sched } } - fn step( + async fn step( + &self, txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], block: &BlockFor, key: KeyFor, - ) -> Vec> { + ) -> Result>, >::EphemeralError> { let mut eventualities = vec![]; let key_for_change = match active_keys[0].1 { @@ -174,7 +187,8 @@ impl>> Sched let coin = *coin; // Perform any input aggregation we should - eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin)); + eventualities + .append(&mut self.aggregate_inputs(txn, block, key_for_change, key, coin).await?); // Fetch the operating costs/outputs let mut operating_costs = Db::::operating_costs(txn, coin).0; @@ -211,15 +225,19 @@ impl>> Sched // scanner API) let mut planned_outer = None; for i in 0 .. 2 { - let Some(planned) = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - P::fee_rate(block, coin), - outputs.clone(), - tree[0] - .payments::(coin, &branch_address, tree[0].value()) - .expect("payments were dropped despite providing an input of the needed value"), - Some(key_for_change), - ) else { + let Some(planned) = self + .planner + .plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + outputs.clone(), + tree[0] + .payments::(coin, &branch_address, tree[0].value()) + .expect("payments were dropped despite providing an input of the needed value"), + Some(key_for_change), + ) + .await? + else { // This should trip on the first iteration or not at all assert_eq!(i, 0); // This doesn't have inputs even worth aggregating so drop the entire tree @@ -300,14 +318,18 @@ impl>> Sched }; let branch_output_id = branch_output.id(); - let Some(mut planned) = P::plan_transaction_with_fee_amortization( - // Uses 0 as there's no operating costs to incur/amortize here - &mut 0, - P::fee_rate(block, coin), - vec![branch_output], - payments, - None, - ) else { + let Some(mut planned) = self + .planner + .plan_transaction_with_fee_amortization( + // Uses 0 as there's no operating costs to incur/amortize here + &mut 0, + P::fee_rate(block, coin), + vec![branch_output], + payments, + None, + ) + .await? + else { // This Branch isn't viable, so drop it (and its children) continue; }; @@ -328,49 +350,56 @@ impl>> Sched } } - eventualities + Ok(eventualities) } - fn flush_outputs( + async fn flush_outputs( + &self, txn: &mut impl DbTxn, - eventualities: &mut HashMap, Vec>>, + eventualities: &mut KeyScopedEventualities, block: &BlockFor, from: KeyFor, to: KeyFor, coin: Coin, - ) { + ) -> Result<(), >::EphemeralError> { let from_bytes = from.to_bytes().as_ref().to_vec(); // Ensure our inputs are aggregated eventualities .entry(from_bytes.clone()) .or_insert(vec![]) - .append(&mut Self::aggregate_inputs(txn, block, to, from, coin)); + .append(&mut self.aggregate_inputs(txn, block, to, from, coin).await?); // Now that our inputs are aggregated, transfer all of them to the new key let mut operating_costs = Db::::operating_costs(txn, coin).0; let outputs = Db::::outputs(txn, from, coin).unwrap(); if outputs.is_empty() { - return; + return Ok(()); } - let planned = P::plan_transaction_with_fee_amortization( - &mut operating_costs, - P::fee_rate(block, coin), - outputs, - vec![], - Some(to), - ); + let planned = self + .planner + .plan_transaction_with_fee_amortization( + &mut operating_costs, + P::fee_rate(block, coin), + outputs, + vec![], + Some(to), + ) + .await?; Db::::set_operating_costs(txn, coin, Amount(operating_costs)); - let Some(planned) = planned else { return }; + let Some(planned) = planned else { return Ok(()) }; TransactionsToSign::::send(txn, &from, &planned.signable); eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality); Self::accumulate_outputs(txn, planned.auxilliary.0, false); + + Ok(()) } } impl>> SchedulerTrait for Scheduler { + type EphemeralError = P::EphemeralError; type SignableTransaction = P::SignableTransaction; fn activate_key(txn: &mut impl DbTxn, key: KeyFor) { @@ -383,29 +412,32 @@ impl>> Sched } fn flush_key( + &self, txn: &mut impl DbTxn, block: &BlockFor, retiring_key: KeyFor, new_key: KeyFor, - ) -> HashMap, Vec>> { - let mut eventualities = HashMap::new(); - for coin in S::NETWORK.coins() { - // Move the payments to the new key - { - let still_queued = Db::::queued_payments(txn, retiring_key, *coin).unwrap(); - let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); + ) -> impl Send + Future, Self::EphemeralError>> { + async move { + let mut eventualities = HashMap::new(); + for coin in S::NETWORK.coins() { + // Move the payments to the new key + { + let still_queued = Db::::queued_payments(txn, retiring_key, *coin).unwrap(); + let mut new_queued = Db::::queued_payments(txn, new_key, *coin).unwrap(); - let mut queued = still_queued; - queued.append(&mut new_queued); + let mut queued = still_queued; + queued.append(&mut new_queued); - Db::::set_queued_payments(txn, retiring_key, *coin, &[]); - Db::::set_queued_payments(txn, new_key, *coin, &queued); + Db::::set_queued_payments(txn, retiring_key, *coin, &[]); + Db::::set_queued_payments(txn, new_key, *coin, &queued); + } + + // Move the outputs to the new key + self.flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin).await?; } - - // Move the outputs to the new key - Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin); + Ok(eventualities) } - eventualities } fn retire_key(txn: &mut impl DbTxn, key: KeyFor) { @@ -418,121 +450,137 @@ impl>> Sched } fn update( + &self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], update: SchedulerUpdate, - ) -> HashMap, Vec>> { - Self::accumulate_outputs(txn, update.outputs().to_vec(), true); + ) -> impl Send + Future, Self::EphemeralError>> { + async move { + Self::accumulate_outputs(txn, update.outputs().to_vec(), true); - // Fulfill the payments we prior couldn't - let mut eventualities = HashMap::new(); - for (key, _stage) in active_keys { - assert!(eventualities - .insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key)) - .is_none()); - } + // Fulfill the payments we prior couldn't + let mut eventualities = HashMap::new(); + for (key, _stage) in active_keys { + assert!(eventualities + .insert(key.to_bytes().as_ref().to_vec(), self.step(txn, active_keys, block, *key).await?) + .is_none()); + } - // If this key has been flushed, forward all outputs - match active_keys[0].1 { - LifetimeStage::ActiveYetNotReporting | - LifetimeStage::Active | - LifetimeStage::UsingNewForChange => {} - LifetimeStage::Forwarding | LifetimeStage::Finishing => { - for coin in S::NETWORK.coins() { - Self::flush_outputs( - txn, - &mut eventualities, - block, - active_keys[0].0, - active_keys[1].0, - *coin, - ); + // If this key has been flushed, forward all outputs + match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting | + LifetimeStage::Active | + LifetimeStage::UsingNewForChange => {} + LifetimeStage::Forwarding | LifetimeStage::Finishing => { + for coin in S::NETWORK.coins() { + self + .flush_outputs( + txn, + &mut eventualities, + block, + active_keys[0].0, + active_keys[1].0, + *coin, + ) + .await?; + } } } - } - // Create the transactions for the forwards/burns - { - let mut planned_txs = vec![]; - for forward in update.forwards() { - let key = forward.key(); + // Create the transactions for the forwards/burns + { + let mut planned_txs = vec![]; + for forward in update.forwards() { + let key = forward.key(); - assert_eq!(active_keys.len(), 2); - assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); - assert_eq!(active_keys[1].1, LifetimeStage::Active); - let forward_to_key = active_keys[1].0; + assert_eq!(active_keys.len(), 2); + assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); + assert_eq!(active_keys[1].1, LifetimeStage::Active); + let forward_to_key = active_keys[1].0; - let Some(plan) = P::plan_transaction_with_fee_amortization( - // This uses 0 for the operating costs as we don't incur any here - // If the output can't pay for itself to be forwarded, we simply drop it - &mut 0, - P::fee_rate(block, forward.balance().coin), - vec![forward.clone()], - vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], - None, - ) else { - continue; - }; - planned_txs.push((key, plan)); + let Some(plan) = self + .planner + .plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be forwarded, we simply drop it + &mut 0, + P::fee_rate(block, forward.balance().coin), + vec![forward.clone()], + vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], + None, + ) + .await? + else { + continue; + }; + planned_txs.push((key, plan)); + } + for to_return in update.returns() { + let key = to_return.output().key(); + let out_instruction = + Payment::new(to_return.address().clone(), to_return.output().balance(), None); + let Some(plan) = self + .planner + .plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + // If the output can't pay for itself to be returned, we simply drop it + &mut 0, + P::fee_rate(block, out_instruction.balance().coin), + vec![to_return.output().clone()], + vec![out_instruction], + None, + ) + .await? + else { + continue; + }; + planned_txs.push((key, plan)); + } + + for (key, planned_tx) in planned_txs { + // Send the transactions off for signing + TransactionsToSign::::send(txn, &key, &planned_tx.signable); + + // Insert the Eventualities into the result + eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality); + } + + Ok(eventualities) } - for to_return in update.returns() { - let key = to_return.output().key(); - let out_instruction = - Payment::new(to_return.address().clone(), to_return.output().balance(), None); - let Some(plan) = P::plan_transaction_with_fee_amortization( - // This uses 0 for the operating costs as we don't incur any here - // If the output can't pay for itself to be returned, we simply drop it - &mut 0, - P::fee_rate(block, out_instruction.balance().coin), - vec![to_return.output().clone()], - vec![out_instruction], - None, - ) else { - continue; - }; - planned_txs.push((key, plan)); - } - - for (key, planned_tx) in planned_txs { - // Send the transactions off for signing - TransactionsToSign::::send(txn, &key, &planned_tx.signable); - - // Insert the Eventualities into the result - eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality); - } - - eventualities } } fn fulfill( + &self, txn: &mut impl DbTxn, block: &BlockFor, active_keys: &[(KeyFor, LifetimeStage)], payments: Vec>>, - ) -> HashMap, Vec>> { - // Find the key to filfill these payments with - let fulfillment_key = match active_keys[0].1 { - LifetimeStage::ActiveYetNotReporting => { - panic!("expected to fulfill payments despite not reporting for the oldest key") + ) -> impl Send + Future, Self::EphemeralError>> { + async move { + // Find the key to filfill these payments with + let fulfillment_key = match active_keys[0].1 { + LifetimeStage::ActiveYetNotReporting => { + panic!("expected to fulfill payments despite not reporting for the oldest key") + } + LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0, + LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0, + }; + + // Queue the payments for this key + for coin in S::NETWORK.coins() { + let mut queued_payments = Db::::queued_payments(txn, fulfillment_key, *coin).unwrap(); + queued_payments + .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); + Db::::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments); } - LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0, - LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0, - }; - // Queue the payments for this key - for coin in S::NETWORK.coins() { - let mut queued_payments = Db::::queued_payments(txn, fulfillment_key, *coin).unwrap(); - queued_payments - .extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); - Db::::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments); + // Handle the queued payments + Ok(HashMap::from([( + fulfillment_key.to_bytes().as_ref().to_vec(), + self.step(txn, active_keys, block, fulfillment_key).await?, + )])) } - - // Handle the queued payments - HashMap::from([( - fulfillment_key.to_bytes().as_ref().to_vec(), - Self::step(txn, active_keys, block, fulfillment_key), - )]) } }