Allow scheduler's creation of transactions to be async and error

I don't love this, but it's the only way to select decoys without using a local
database. While the prior commit added such a databse, the performance of it
presumably wasn't viable, and while TODOs marked the needed improvements, it
was still messy with an immense scope re: any auditing.

The relevant scheduler functions now take `&self` (intentional, as all
mutations should be via the `&mut impl DbTxn` passed). The calls to `&self` are
expected to be completely deterministic (as usual).
This commit is contained in:
Luke Parker 2024-09-14 01:09:35 -04:00
parent 2edc2f3612
commit e1ad897f7e
11 changed files with 723 additions and 854 deletions

View file

@ -15,7 +15,7 @@ use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel};
use primitives::EncodableG; use primitives::EncodableG;
use ::key_gen::{KeyGenParams, KeyGen}; use ::key_gen::{KeyGenParams, KeyGen};
use scheduler::SignableTransaction; use scheduler::{SignableTransaction, TransactionFor};
use scanner::{ScannerFeed, Scanner, KeyFor, Scheduler}; use scanner::{ScannerFeed, Scanner, KeyFor, Scheduler};
use signers::{TransactionPublisher, Signers}; use signers::{TransactionPublisher, Signers};
@ -161,22 +161,23 @@ async fn first_block_after_time<S: ScannerFeed>(feed: &S, serai_time: u64) -> u6
pub async fn main_loop< pub async fn main_loop<
S: ScannerFeed, S: ScannerFeed,
K: KeyGenParams<ExternalNetworkCiphersuite: Ciphersuite<G = KeyFor<S>>>, K: KeyGenParams<ExternalNetworkCiphersuite: Ciphersuite<G = KeyFor<S>>>,
Sch: Scheduler< Sch: Clone
S, + Scheduler<
SignableTransaction: SignableTransaction<Ciphersuite = K::ExternalNetworkCiphersuite>, S,
>, SignableTransaction: SignableTransaction<Ciphersuite = K::ExternalNetworkCiphersuite>,
P: TransactionPublisher<<Sch::SignableTransaction as SignableTransaction>::Transaction>, >,
>( >(
mut db: Db, mut db: Db,
feed: S, feed: S,
publisher: P, scheduler: Sch,
publisher: impl TransactionPublisher<TransactionFor<Sch::SignableTransaction>>,
) { ) {
let mut coordinator = Coordinator::new(db.clone()); let mut coordinator = Coordinator::new(db.clone());
let mut key_gen = key_gen::<K>(); let mut key_gen = key_gen::<K>();
let mut scanner = Scanner::new::<Sch>(db.clone(), feed.clone()).await; let mut scanner = Scanner::new(db.clone(), feed.clone(), scheduler.clone()).await;
let mut signers = let mut signers =
Signers::<Db, S, Sch, P>::new(db.clone(), coordinator.coordinator_send(), publisher); Signers::<Db, S, Sch, _>::new(db.clone(), coordinator.coordinator_send(), publisher);
loop { loop {
let db_clone = db.clone(); let db_clone = db.clone();
@ -242,8 +243,10 @@ pub async fn main_loop<
if session == Session(0) { if session == Session(0) {
assert!(scanner.is_none()); assert!(scanner.is_none());
let start_block = first_block_after_time(&feed, serai_time).await; let start_block = first_block_after_time(&feed, serai_time).await;
scanner = scanner = Some(
Some(Scanner::initialize::<Sch>(db_clone, feed.clone(), start_block, key.0).await); Scanner::initialize(db_clone, feed.clone(), scheduler.clone(), start_block, key.0)
.await,
);
} }
} }
messages::substrate::CoordinatorMessage::SlashesReported { session } => { messages::substrate::CoordinatorMessage::SlashesReported { session } => {

View file

@ -22,7 +22,7 @@ use crate::key_gen::KeyGenParams;
mod rpc; mod rpc;
use rpc::Rpc; use rpc::Rpc;
mod scheduler; mod scheduler;
use scheduler::Scheduler; use scheduler::{Planner, Scheduler};
// Our custom code for Bitcoin // Our custom code for Bitcoin
mod db; mod db;
@ -57,7 +57,7 @@ async fn main() {
tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![])); tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![]));
core::mem::forget(index_handle); core::mem::forget(index_handle);
bin::main_loop::<_, KeyGenParams, Scheduler<_>, Rpc<bin::Db>>(db, feed.clone(), feed).await; bin::main_loop::<_, KeyGenParams, _>(db, feed.clone(), Scheduler::new(Planner), feed).await;
} }
/* /*

View file

@ -1,3 +1,5 @@
use core::future::Future;
use ciphersuite::{Ciphersuite, Secp256k1}; use ciphersuite::{Ciphersuite, Secp256k1};
use bitcoin_serai::{ use bitcoin_serai::{
@ -89,8 +91,10 @@ fn signable_transaction<D: Db>(
.map(|bst| (SignableTransaction { inputs, payments, change, fee_per_vbyte }, bst)) .map(|bst| (SignableTransaction { inputs, payments, change, fee_per_vbyte }, bst))
} }
#[derive(Clone)]
pub(crate) struct Planner; pub(crate) struct Planner;
impl<D: Db> TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>> for Planner { impl<D: Db> TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>> for Planner {
type EphemeralError = ();
type FeeRate = u64; type FeeRate = u64;
type SignableTransaction = SignableTransaction; type SignableTransaction = SignableTransaction;
@ -153,50 +157,59 @@ impl<D: Db> TransactionPlanner<Rpc<D>, EffectedReceivedOutputs<Rpc<D>>> for Plan
} }
fn plan( fn plan(
&self,
fee_rate: Self::FeeRate, fee_rate: Self::FeeRate,
inputs: Vec<OutputFor<Rpc<D>>>, inputs: Vec<OutputFor<Rpc<D>>>,
payments: Vec<Payment<AddressFor<Rpc<D>>>>, payments: Vec<Payment<AddressFor<Rpc<D>>>>,
change: Option<KeyFor<Rpc<D>>>, change: Option<KeyFor<Rpc<D>>>,
) -> PlannedTransaction<Rpc<D>, Self::SignableTransaction, EffectedReceivedOutputs<Rpc<D>>> { ) -> impl Send
let key = inputs.first().unwrap().key(); + Future<
for input in &inputs { Output = Result<
assert_eq!(key, input.key()); PlannedTransaction<Rpc<D>, Self::SignableTransaction, EffectedReceivedOutputs<Rpc<D>>>,
} Self::EphemeralError,
>,
> {
async move {
let key = inputs.first().unwrap().key();
for input in &inputs {
assert_eq!(key, input.key());
}
let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id()); let singular_spent_output = (inputs.len() == 1).then(|| inputs[0].id());
match signable_transaction::<D>(fee_rate, inputs.clone(), payments, change) { match signable_transaction::<D>(fee_rate, inputs.clone(), payments, change) {
Ok(tx) => PlannedTransaction { Ok(tx) => Ok(PlannedTransaction {
signable: tx.0, signable: tx.0,
eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output }, eventuality: Eventuality { txid: tx.1.txid(), singular_spent_output },
auxilliary: EffectedReceivedOutputs({ auxilliary: EffectedReceivedOutputs({
let tx = tx.1.transaction(); let tx = tx.1.transaction();
let scanner = scanner(key); let scanner = scanner(key);
let mut res = vec![]; let mut res = vec![];
for output in scanner.scan_transaction(tx) { for output in scanner.scan_transaction(tx) {
res.push(Output::new_with_presumed_origin( res.push(Output::new_with_presumed_origin(
key, key,
tx, tx,
// It shouldn't matter if this is wrong as we should never try to return these // It shouldn't matter if this is wrong as we should never try to return these
// We still provide an accurate value to ensure a lack of discrepancies // We still provide an accurate value to ensure a lack of discrepancies
Some(Address::new(inputs[0].output.output().script_pubkey.clone()).unwrap()), Some(Address::new(inputs[0].output.output().script_pubkey.clone()).unwrap()),
output, output,
)); ));
} }
res res
}),
}), }),
}, Err(
Err( TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment,
TransactionError::NoInputs | TransactionError::NoOutputs | TransactionError::DustPayment, ) => panic!("malformed arguments to plan"),
) => panic!("malformed arguments to plan"), // No data, we have a minimum fee rate, we checked the amount of inputs/outputs
// No data, we have a minimum fee rate, we checked the amount of inputs/outputs Err(
Err( TransactionError::TooMuchData |
TransactionError::TooMuchData | TransactionError::TooLowFee |
TransactionError::TooLowFee | TransactionError::TooLargeTransaction,
TransactionError::TooLargeTransaction, ) => unreachable!(),
) => unreachable!(), Err(TransactionError::NotEnoughFunds { .. }) => {
Err(TransactionError::NotEnoughFunds { .. }) => { panic!("plan called for a transaction without enough funds")
panic!("plan called for a transaction without enough funds") }
} }
} }
} }

View file

@ -1,294 +0,0 @@
use core::{
future::Future,
ops::{Bound, RangeBounds},
};
use curve25519_dalek::{
scalar::Scalar,
edwards::{CompressedEdwardsY, EdwardsPoint},
};
use monero_wallet::{
DEFAULT_LOCK_WINDOW,
primitives::Commitment,
transaction::{Timelock, Input, Pruned, Transaction},
rpc::{OutputInformation, RpcError, Rpc as MRpcTrait, DecoyRpc},
};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, Db, create_db};
use primitives::task::ContinuallyRan;
use scanner::ScannerFeed;
use crate::Rpc;
#[derive(BorshSerialize, BorshDeserialize)]
struct EncodableOutputInformation {
height: u64,
timelocked: bool,
key: [u8; 32],
commitment: [u8; 32],
}
create_db! {
MoneroProcessorDecoys {
NextToIndexBlock: () -> u64,
PriorIndexedBlock: () -> [u8; 32],
DistributionStartBlock: () -> u64,
Distribution: () -> Vec<u64>,
Out: (index: u64) -> EncodableOutputInformation,
}
}
/*
We want to be able to select decoys when planning transactions, but planning transactions is a
synchronous process. We store the decoys to a local database and have our database implement
`DecoyRpc` to achieve synchronous decoy selection.
This is only needed as the transactions we sign must have decoys decided and agreed upon. With
FCMP++s, we'll be able to sign transactions without the membership proof, letting any signer
prove for membership after the fact (with their local views). Until then, this task remains.
*/
pub(crate) struct DecoysTask<D: Db> {
pub(crate) rpc: Rpc<D>,
pub(crate) current_distribution: Vec<u64>,
}
impl<D: Db> ContinuallyRan for DecoysTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let finalized_block_number = self
.rpc
.rpc
.get_height()
.await
.map_err(|e| format!("couldn't fetch latest block number: {e:?}"))?
.checked_sub(Rpc::<D>::CONFIRMATIONS.try_into().unwrap())
.ok_or(format!(
"blockchain only just started and doesn't have {} blocks yet",
Rpc::<D>::CONFIRMATIONS
))?;
if NextToIndexBlock::get(&self.rpc.db).is_none() {
let distribution = self
.rpc
.rpc
.get_output_distribution(..= finalized_block_number)
.await
.map_err(|e| format!("failed to get output distribution: {e:?}"))?;
if distribution.is_empty() {
Err("distribution was empty".to_string())?;
}
let distribution_start_block = finalized_block_number - (distribution.len() - 1);
// There may have been a reorg between the time of getting the distribution and the time of
// getting this block. This is an invariant and assumed not to have happened in the split
// second it's possible.
let block = self
.rpc
.rpc
.get_block_by_number(distribution_start_block)
.await
.map_err(|e| format!("failed to get the start block for the distribution: {e:?}"))?;
let mut txn = self.rpc.db.txn();
NextToIndexBlock::set(&mut txn, &distribution_start_block.try_into().unwrap());
PriorIndexedBlock::set(&mut txn, &block.header.previous);
DistributionStartBlock::set(&mut txn, &u64::try_from(distribution_start_block).unwrap());
txn.commit();
}
let next_to_index_block =
usize::try_from(NextToIndexBlock::get(&self.rpc.db).unwrap()).unwrap();
if next_to_index_block >= finalized_block_number {
return Ok(false);
}
for b in next_to_index_block ..= finalized_block_number {
// Fetch the block
let block = self
.rpc
.rpc
.get_block_by_number(b)
.await
.map_err(|e| format!("decoys task failed to fetch block: {e:?}"))?;
let prior = PriorIndexedBlock::get(&self.rpc.db).unwrap();
if block.header.previous != prior {
panic!(
"decoys task detected reorg: expected {}, found {}",
hex::encode(prior),
hex::encode(block.header.previous)
);
}
// Fetch the transactions in the block
let transactions = self
.rpc
.rpc
.get_pruned_transactions(&block.transactions)
.await
.map_err(|e| format!("failed to get the pruned transactions within a block: {e:?}"))?;
fn outputs(
list: &mut Vec<EncodableOutputInformation>,
block_number: u64,
tx: Transaction<Pruned>,
) {
match tx {
Transaction::V1 { .. } => {}
Transaction::V2 { prefix, proofs } => {
for (i, output) in prefix.outputs.into_iter().enumerate() {
list.push(EncodableOutputInformation {
// This is correct per the documentation on OutputInformation, which this maps to
height: block_number,
timelocked: prefix.additional_timelock != Timelock::None,
key: output.key.to_bytes(),
commitment: if matches!(
prefix.inputs.first().expect("Monero transaction had no inputs"),
Input::Gen(_)
) {
Commitment::new(
Scalar::ONE,
output.amount.expect("miner transaction outputs didn't have amounts set"),
)
.calculate()
.compress()
.to_bytes()
} else {
proofs
.as_ref()
.expect("non-miner V2 transaction didn't have proofs")
.base
.commitments
.get(i)
.expect("amount of commitments didn't match amount of outputs")
.compress()
.to_bytes()
},
});
}
}
}
}
let block_hash = block.hash();
let b = u64::try_from(b).unwrap();
let mut encodable = Vec::with_capacity(2 * (1 + block.transactions.len()));
outputs(&mut encodable, b, block.miner_transaction.into());
for transaction in transactions {
outputs(&mut encodable, b, transaction);
}
let existing_outputs = self.current_distribution.last().copied().unwrap_or(0);
let now_outputs = existing_outputs + u64::try_from(encodable.len()).unwrap();
self.current_distribution.push(now_outputs);
let mut txn = self.rpc.db.txn();
NextToIndexBlock::set(&mut txn, &(b + 1));
PriorIndexedBlock::set(&mut txn, &block_hash);
// TODO: Don't write the entire 10 MB distribution to the DB every two minutes
Distribution::set(&mut txn, &self.current_distribution);
for (b, out) in (existing_outputs .. now_outputs).zip(encodable) {
Out::set(&mut txn, b, &out);
}
txn.commit();
}
Ok(true)
}
}
}
// TODO: Cache the distribution in a static
pub(crate) struct Decoys<'a, G: Get>(&'a G);
impl<'a, G: Sync + Get> DecoyRpc for Decoys<'a, G> {
#[rustfmt::skip]
fn get_output_distribution_end_height(
&self,
) -> impl Send + Future<Output = Result<usize, RpcError>> {
async move {
Ok(NextToIndexBlock::get(self.0).map_or(0, |b| usize::try_from(b).unwrap() + 1))
}
}
fn get_output_distribution(
&self,
range: impl Send + RangeBounds<usize>,
) -> impl Send + Future<Output = Result<Vec<u64>, RpcError>> {
async move {
let from = match range.start_bound() {
Bound::Included(from) => *from,
Bound::Excluded(from) => from.checked_add(1).ok_or_else(|| {
RpcError::InternalError("range's from wasn't representable".to_string())
})?,
Bound::Unbounded => 0,
};
let to = match range.end_bound() {
Bound::Included(to) => *to,
Bound::Excluded(to) => to
.checked_sub(1)
.ok_or_else(|| RpcError::InternalError("range's to wasn't representable".to_string()))?,
Bound::Unbounded => {
panic!("requested distribution till latest block, which is non-deterministic")
}
};
if from > to {
Err(RpcError::InternalError(format!(
"malformed range: inclusive start {from}, inclusive end {to}"
)))?;
}
let distribution_start_block = usize::try_from(
DistributionStartBlock::get(self.0).expect("never populated the distribution start block"),
)
.unwrap();
let len_of_distribution_until_to =
to.checked_sub(distribution_start_block).ok_or_else(|| {
RpcError::InternalError(
"requested distribution until a block when the distribution had yet to start"
.to_string(),
)
})? +
1;
let distribution = Distribution::get(self.0).expect("never populated the distribution");
assert!(
distribution.len() >= len_of_distribution_until_to,
"requested distribution until block we have yet to index"
);
Ok(
distribution[from.saturating_sub(distribution_start_block) .. len_of_distribution_until_to]
.to_vec(),
)
}
}
fn get_outs(
&self,
_indexes: &[u64],
) -> impl Send + Future<Output = Result<Vec<OutputInformation>, RpcError>> {
async move { unimplemented!("get_outs is unused") }
}
fn get_unlocked_outputs(
&self,
indexes: &[u64],
height: usize,
fingerprintable_deterministic: bool,
) -> impl Send + Future<Output = Result<Vec<Option<[EdwardsPoint; 2]>>, RpcError>> {
assert!(fingerprintable_deterministic, "processor wasn't using deterministic output selection");
async move {
let mut res = vec![];
for index in indexes {
let out = Out::get(self.0, *index).expect("requested output we didn't index");
let unlocked = (!out.timelocked) &&
((usize::try_from(out.height).unwrap() + DEFAULT_LOCK_WINDOW) <= height);
res.push(unlocked.then(|| CompressedEdwardsY(out.key).decompress()).flatten().map(|key| {
[
key,
CompressedEdwardsY(out.commitment)
.decompress()
.expect("output with invalid commitment"),
]
}));
}
Ok(res)
}
}
}

View file

@ -16,7 +16,6 @@ use crate::key_gen::KeyGenParams;
mod rpc; mod rpc;
use rpc::Rpc; use rpc::Rpc;
mod decoys;
/* /*
mod scheduler; mod scheduler;
use scheduler::Scheduler; use scheduler::Scheduler;

View file

@ -5,7 +5,6 @@ use monero_simple_request_rpc::SimpleRequestRpc;
use serai_client::primitives::{NetworkId, Coin, Amount}; use serai_client::primitives::{NetworkId, Coin, Amount};
use serai_db::Db;
use scanner::ScannerFeed; use scanner::ScannerFeed;
use signers::TransactionPublisher; use signers::TransactionPublisher;
@ -15,12 +14,11 @@ use crate::{
}; };
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Rpc<D: Db> { pub(crate) struct Rpc {
pub(crate) db: D,
pub(crate) rpc: SimpleRequestRpc, pub(crate) rpc: SimpleRequestRpc,
} }
impl<D: Db> ScannerFeed for Rpc<D> { impl ScannerFeed for Rpc {
const NETWORK: NetworkId = NetworkId::Monero; const NETWORK: NetworkId = NetworkId::Monero;
// Outputs aren't spendable until 10 blocks later due to the 10-block lock // Outputs aren't spendable until 10 blocks later due to the 10-block lock
// Since we assumed scanned outputs are spendable, that sets a minimum confirmation depth of 10 // Since we assumed scanned outputs are spendable, that sets a minimum confirmation depth of 10
@ -39,15 +37,16 @@ impl<D: Db> ScannerFeed for Rpc<D> {
&self, &self,
) -> impl Send + Future<Output = Result<u64, Self::EphemeralError>> { ) -> impl Send + Future<Output = Result<u64, Self::EphemeralError>> {
async move { async move {
// The decoys task only indexes finalized blocks Ok(
crate::decoys::NextToIndexBlock::get(&self.db) self
.ok_or_else(|| { .rpc
RpcError::InternalError("decoys task hasn't indexed any blocks yet".to_string()) .get_height()
})? .await?
.checked_sub(1) .checked_sub(1)
.ok_or_else(|| { .expect("connected to an invalid Monero RPC")
RpcError::InternalError("only the genesis block has been indexed".to_string()) .try_into()
}) .unwrap(),
)
} }
} }
@ -128,7 +127,7 @@ impl<D: Db> ScannerFeed for Rpc<D> {
} }
} }
impl<D: Db> TransactionPublisher<Transaction> for Rpc<D> { impl TransactionPublisher<Transaction> for Rpc {
type EphemeralError = RpcError; type EphemeralError = RpcError;
fn publish( fn publish(

View file

@ -1,4 +1,4 @@
use core::{marker::PhantomData, future::Future}; use core::future::Future;
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use group::GroupEncoding; use group::GroupEncoding;
@ -102,11 +102,11 @@ fn intake_eventualities<S: ScannerFeed>(
pub(crate) struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> { pub(crate) struct EventualityTask<D: Db, S: ScannerFeed, Sch: Scheduler<S>> {
db: D, db: D,
feed: S, feed: S,
scheduler: PhantomData<Sch>, scheduler: Sch,
} }
impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> { impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self { pub(crate) fn new(mut db: D, feed: S, scheduler: Sch, start_block: u64) -> Self {
if EventualityDb::<S>::next_to_check_for_eventualities_block(&db).is_none() { if EventualityDb::<S>::next_to_check_for_eventualities_block(&db).is_none() {
// Initialize the DB // Initialize the DB
let mut txn = db.txn(); let mut txn = db.txn();
@ -114,7 +114,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
txn.commit(); txn.commit();
} }
Self { db, feed, scheduler: PhantomData } Self { db, feed, scheduler }
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -167,15 +167,19 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
{ {
intaked_any = true; intaked_any = true;
let new_eventualities = Sch::fulfill( let new_eventualities = self
&mut txn, .scheduler
&block, .fulfill(
&keys_with_stages, &mut txn,
burns &block,
.into_iter() &keys_with_stages,
.filter_map(|burn| Payment::<AddressFor<S>>::try_from(burn).ok()) burns
.collect(), .into_iter()
); .filter_map(|burn| Payment::<AddressFor<S>>::try_from(burn).ok())
.collect(),
)
.await
.map_err(|e| format!("failed to queue fulfilling payments: {e:?}"))?;
intake_eventualities::<S>(&mut txn, new_eventualities); intake_eventualities::<S>(&mut txn, new_eventualities);
} }
txn.commit(); txn.commit();
@ -443,8 +447,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
determined off an earlier block than this (enabling an earlier LifetimeStage to be determined off an earlier block than this (enabling an earlier LifetimeStage to be
used after a later one was already used). used after a later one was already used).
*/ */
let new_eventualities = let new_eventualities = self
Sch::update(&mut txn, &block, &keys_with_stages, scheduler_update); .scheduler
.update(&mut txn, &block, &keys_with_stages, scheduler_update)
.await
.map_err(|e| format!("failed to update scheduler: {e:?}"))?;
// Intake the new Eventualities // Intake the new Eventualities
for key in new_eventualities.keys() { for key in new_eventualities.keys() {
keys keys
@ -464,8 +471,11 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
key.key != keys.last().unwrap().key, key.key != keys.last().unwrap().key,
"key which was forwarding was the last key (which has no key after it to forward to)" "key which was forwarding was the last key (which has no key after it to forward to)"
); );
let new_eventualities = let new_eventualities = self
Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key); .scheduler
.flush_key(&mut txn, &block, key.key, keys.last().unwrap().key)
.await
.map_err(|e| format!("failed to flush key from scheduler: {e:?}"))?;
intake_eventualities::<S>(&mut txn, new_eventualities); intake_eventualities::<S>(&mut txn, new_eventualities);
} }

View file

@ -256,8 +256,17 @@ impl<S: ScannerFeed> SchedulerUpdate<S> {
} }
} }
/// Eventualities, keyed by the encoding of the key the Eventualities are for.
pub type KeyScopedEventualities<S> = HashMap<Vec<u8>, Vec<EventualityFor<S>>>;
/// The object responsible for accumulating outputs and planning new transactions. /// The object responsible for accumulating outputs and planning new transactions.
pub trait Scheduler<S: ScannerFeed>: 'static + Send { pub trait Scheduler<S: ScannerFeed>: 'static + Send {
/// An error encountered when handling updates/payments.
///
/// This MUST be an ephemeral error. Retrying handling updates/payments MUST eventually
/// resolve without manual intervention/changing the arguments.
type EphemeralError: Debug;
/// The type for a signable transaction. /// The type for a signable transaction.
type SignableTransaction: scheduler_primitives::SignableTransaction; type SignableTransaction: scheduler_primitives::SignableTransaction;
@ -278,11 +287,12 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
/// If the retiring key has any unfulfilled payments associated with it, those MUST be made /// If the retiring key has any unfulfilled payments associated with it, those MUST be made
/// the responsibility of the new key. /// the responsibility of the new key.
fn flush_key( fn flush_key(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
retiring_key: KeyFor<S>, retiring_key: KeyFor<S>,
new_key: KeyFor<S>, new_key: KeyFor<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>; ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>>;
/// Retire a key as it'll no longer be used. /// Retire a key as it'll no longer be used.
/// ///
@ -300,11 +310,12 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
/// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key the /// The `Vec<u8>` used as the key in the returned HashMap should be the encoded key the
/// Eventualities are for. /// Eventualities are for.
fn update( fn update(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
update: SchedulerUpdate<S>, update: SchedulerUpdate<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>; ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>>;
/// Fulfill a series of payments, yielding the Eventualities now to be scanned for. /// Fulfill a series of payments, yielding the Eventualities now to be scanned for.
/// ///
@ -339,11 +350,12 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
has an output-to-Serai, the new primary output). has an output-to-Serai, the new primary output).
*/ */
fn fulfill( fn fulfill(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
payments: Vec<Payment<AddressFor<S>>>, payments: Vec<Payment<AddressFor<S>>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>>; ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>>;
} }
/// A representation of a scanner. /// A representation of a scanner.
@ -358,14 +370,15 @@ impl<S: ScannerFeed> Scanner<S> {
/// This will begin its execution, spawning several asynchronous tasks. /// This will begin its execution, spawning several asynchronous tasks.
/// ///
/// This will return None if the Scanner was never initialized. /// This will return None if the Scanner was never initialized.
pub async fn new<Sch: Scheduler<S>>(db: impl Db, feed: S) -> Option<Self> { pub async fn new(db: impl Db, feed: S, scheduler: impl Scheduler<S>) -> Option<Self> {
let start_block = ScannerGlobalDb::<S>::start_block(&db)?; let start_block = ScannerGlobalDb::<S>::start_block(&db)?;
let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block); let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block);
let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone()); let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone());
let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block); let eventuality_task =
eventuality::EventualityTask::<_, _, _>::new(db, feed, scheduler, start_block);
let (index_task_def, _index_handle) = Task::new(); let (index_task_def, _index_handle) = Task::new();
let (scan_task_def, scan_handle) = Task::new(); let (scan_task_def, scan_handle) = Task::new();
@ -394,9 +407,10 @@ impl<S: ScannerFeed> Scanner<S> {
/// This will begin its execution, spawning several asynchronous tasks. /// This will begin its execution, spawning several asynchronous tasks.
/// ///
/// This passes through to `Scanner::new` if prior called. /// This passes through to `Scanner::new` if prior called.
pub async fn initialize<Sch: Scheduler<S>>( pub async fn initialize(
mut db: impl Db, mut db: impl Db,
feed: S, feed: S,
scheduler: impl Scheduler<S>,
start_block: u64, start_block: u64,
start_key: KeyFor<S>, start_key: KeyFor<S>,
) -> Self { ) -> Self {
@ -407,7 +421,7 @@ impl<S: ScannerFeed> Scanner<S> {
txn.commit(); txn.commit();
} }
Self::new::<Sch>(db, feed).await.unwrap() Self::new(db, feed, scheduler).await.unwrap()
} }
/// Acknowledge a Batch having been published on Serai. /// Acknowledge a Batch having been published on Serai.

View file

@ -2,6 +2,8 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
use core::{fmt::Debug, future::Future};
use serai_primitives::{Coin, Amount}; use serai_primitives::{Coin, Amount};
use primitives::{ReceivedOutput, Payment}; use primitives::{ReceivedOutput, Payment};
@ -40,8 +42,14 @@ pub struct AmortizePlannedTransaction<S: ScannerFeed, ST: SignableTransaction, A
/// An object able to plan a transaction. /// An object able to plan a transaction.
pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync { pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
/// An error encountered when handling planning transactions.
///
/// This MUST be an ephemeral error. Retrying planning transactions MUST eventually resolve
/// resolve manual intervention/changing the arguments.
type EphemeralError: Debug;
/// The type representing a fee rate to use for transactions. /// The type representing a fee rate to use for transactions.
type FeeRate: Clone + Copy; type FeeRate: Send + Clone + Copy;
/// The type representing a signable transaction. /// The type representing a signable transaction.
type SignableTransaction: SignableTransaction; type SignableTransaction: SignableTransaction;
@ -82,11 +90,15 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
/// `change` will always be an address belonging to the Serai network. If it is `Some`, a change /// `change` will always be an address belonging to the Serai network. If it is `Some`, a change
/// output must be created. /// output must be created.
fn plan( fn plan(
&self,
fee_rate: Self::FeeRate, fee_rate: Self::FeeRate,
inputs: Vec<OutputFor<S>>, inputs: Vec<OutputFor<S>>,
payments: Vec<Payment<AddressFor<S>>>, payments: Vec<Payment<AddressFor<S>>>,
change: Option<KeyFor<S>>, change: Option<KeyFor<S>>,
) -> PlannedTransaction<S, Self::SignableTransaction, A>; ) -> impl Send
+ Future<
Output = Result<PlannedTransaction<S, Self::SignableTransaction, A>, Self::EphemeralError>,
>;
/// Obtain a PlannedTransaction via amortizing the fee over the payments. /// Obtain a PlannedTransaction via amortizing the fee over the payments.
/// ///
@ -98,132 +110,142 @@ pub trait TransactionPlanner<S: ScannerFeed, A>: 'static + Send + Sync {
/// Returns `None` if the fee exceeded the inputs, or `Some` otherwise. /// Returns `None` if the fee exceeded the inputs, or `Some` otherwise.
// TODO: Enum for Change of None, Some, Mandatory // TODO: Enum for Change of None, Some, Mandatory
fn plan_transaction_with_fee_amortization( fn plan_transaction_with_fee_amortization(
&self,
operating_costs: &mut u64, operating_costs: &mut u64,
fee_rate: Self::FeeRate, fee_rate: Self::FeeRate,
inputs: Vec<OutputFor<S>>, inputs: Vec<OutputFor<S>>,
mut payments: Vec<Payment<AddressFor<S>>>, mut payments: Vec<Payment<AddressFor<S>>>,
mut change: Option<KeyFor<S>>, mut change: Option<KeyFor<S>>,
) -> Option<AmortizePlannedTransaction<S, Self::SignableTransaction, A>> { ) -> impl Send
// If there's no change output, we can't recoup any operating costs we would amortize + Future<
// We also don't have any losses if the inputs are written off/the change output is reduced Output = Result<
let mut operating_costs_if_no_change = 0; Option<AmortizePlannedTransaction<S, Self::SignableTransaction, A>>,
let operating_costs_in_effect = Self::EphemeralError,
if change.is_none() { &mut operating_costs_if_no_change } else { operating_costs }; >,
> {
async move {
// If there's no change output, we can't recoup any operating costs we would amortize
// We also don't have any losses if the inputs are written off/the change output is reduced
let mut operating_costs_if_no_change = 0;
let operating_costs_in_effect =
if change.is_none() { &mut operating_costs_if_no_change } else { operating_costs };
// Sanity checks
{
assert!(!inputs.is_empty());
assert!((!payments.is_empty()) || change.is_some());
let coin = inputs.first().unwrap().balance().coin;
for input in &inputs {
assert_eq!(coin, input.balance().coin);
}
for payment in &payments {
assert_eq!(coin, payment.balance().coin);
}
assert!(
(inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() +
*operating_costs_in_effect) >=
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>(),
"attempted to fulfill payments without a sufficient input set"
);
}
// Sanity checks
{
assert!(!inputs.is_empty());
assert!((!payments.is_empty()) || change.is_some());
let coin = inputs.first().unwrap().balance().coin; let coin = inputs.first().unwrap().balance().coin;
for input in &inputs {
assert_eq!(coin, input.balance().coin); // Amortization
{
// Sort payments from high amount to low amount
payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse());
let mut fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0;
let mut amortized = 0;
while !payments.is_empty() {
// We need to pay the fee, and any accrued operating costs, minus what we've already
// amortized
let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized);
/*
Ideally, we wouldn't use a ceil div yet would be accurate about it. Any remainder could
be amortized over the largest outputs, which wouldn't be relevant here as we only work
with the smallest output. The issue is the theoretical edge case where all outputs have
the same value and are of the minimum value. In that case, none would be able to have
the remainder amortized as it'd cause them to need to be dropped. Using a ceil div
avoids this.
*/
let per_payment_fee = adjusted_fee.div_ceil(u64::try_from(payments.len()).unwrap());
// Pop the last payment if it can't pay the fee, remaining about the dust limit as it does
if payments.last().unwrap().balance().amount.0 <= (per_payment_fee + S::dust(coin).0) {
amortized += payments.pop().unwrap().balance().amount.0;
// Recalculate the fee and try again
fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0;
continue;
}
// Break since all of these payments shouldn't be dropped
break;
}
// If we couldn't amortize the fee over the payments, check if we even have enough to pay it
if payments.is_empty() {
// If we don't have a change output, we simply return here
// We no longer have anything to do here, nor any expectations
if change.is_none() {
return Ok(None);
}
let inputs = inputs.iter().map(|input| input.balance().amount.0).sum::<u64>();
// Checks not just if we can pay for it, yet that the would-be change output is at least
// dust
if inputs < (fee + S::dust(coin).0) {
// Write off these inputs
*operating_costs_in_effect += inputs;
// Yet also claw back the payments we dropped, as we only lost the change
// The dropped payments will be worth less than the inputs + operating_costs we started
// with, so this shouldn't use `saturating_sub`
*operating_costs_in_effect -= amortized;
return Ok(None);
}
} else {
// Since we have payments which can pay the fee we ended up with, amortize it
let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized);
let per_payment_base_fee = adjusted_fee / u64::try_from(payments.len()).unwrap();
let payments_paying_one_atomic_unit_more =
usize::try_from(adjusted_fee % u64::try_from(payments.len()).unwrap()).unwrap();
for (i, payment) in payments.iter_mut().enumerate() {
let per_payment_fee =
per_payment_base_fee + u64::from(u8::from(i < payments_paying_one_atomic_unit_more));
payment.balance().amount.0 -= per_payment_fee;
amortized += per_payment_fee;
}
assert!(amortized >= (*operating_costs_in_effect + fee));
// If the change is less than the dust, drop it
let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() -
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>() -
fee;
if would_be_change < S::dust(coin).0 {
change = None;
*operating_costs_in_effect += would_be_change;
}
}
// Update the amount of operating costs
*operating_costs_in_effect = (*operating_costs_in_effect + fee).saturating_sub(amortized);
} }
for payment in &payments {
assert_eq!(coin, payment.balance().coin); // Because we amortized, or accrued as operating costs, the fee, make the transaction
} let effected_payments = payments.iter().map(|payment| payment.balance().amount).collect();
assert!( let has_change = change.is_some();
(inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() +
*operating_costs_in_effect) >= let PlannedTransaction { signable, eventuality, auxilliary } =
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>(), self.plan(fee_rate, inputs, payments, change).await?;
"attempted to fulfill payments without a sufficient input set" Ok(Some(AmortizePlannedTransaction {
); effected_payments,
has_change,
signable,
eventuality,
auxilliary,
}))
} }
let coin = inputs.first().unwrap().balance().coin;
// Amortization
{
// Sort payments from high amount to low amount
payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse());
let mut fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0;
let mut amortized = 0;
while !payments.is_empty() {
// We need to pay the fee, and any accrued operating costs, minus what we've already
// amortized
let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized);
/*
Ideally, we wouldn't use a ceil div yet would be accurate about it. Any remainder could
be amortized over the largest outputs, which wouldn't be relevant here as we only work
with the smallest output. The issue is the theoretical edge case where all outputs have
the same value and are of the minimum value. In that case, none would be able to have the
remainder amortized as it'd cause them to need to be dropped. Using a ceil div avoids
this.
*/
let per_payment_fee = adjusted_fee.div_ceil(u64::try_from(payments.len()).unwrap());
// Pop the last payment if it can't pay the fee, remaining about the dust limit as it does
if payments.last().unwrap().balance().amount.0 <= (per_payment_fee + S::dust(coin).0) {
amortized += payments.pop().unwrap().balance().amount.0;
// Recalculate the fee and try again
fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0;
continue;
}
// Break since all of these payments shouldn't be dropped
break;
}
// If we couldn't amortize the fee over the payments, check if we even have enough to pay it
if payments.is_empty() {
// If we don't have a change output, we simply return here
// We no longer have anything to do here, nor any expectations
if change.is_none() {
None?;
}
let inputs = inputs.iter().map(|input| input.balance().amount.0).sum::<u64>();
// Checks not just if we can pay for it, yet that the would-be change output is at least
// dust
if inputs < (fee + S::dust(coin).0) {
// Write off these inputs
*operating_costs_in_effect += inputs;
// Yet also claw back the payments we dropped, as we only lost the change
// The dropped payments will be worth less than the inputs + operating_costs we started
// with, so this shouldn't use `saturating_sub`
*operating_costs_in_effect -= amortized;
None?;
}
} else {
// Since we have payments which can pay the fee we ended up with, amortize it
let adjusted_fee = (*operating_costs_in_effect + fee).saturating_sub(amortized);
let per_payment_base_fee = adjusted_fee / u64::try_from(payments.len()).unwrap();
let payments_paying_one_atomic_unit_more =
usize::try_from(adjusted_fee % u64::try_from(payments.len()).unwrap()).unwrap();
for (i, payment) in payments.iter_mut().enumerate() {
let per_payment_fee =
per_payment_base_fee + u64::from(u8::from(i < payments_paying_one_atomic_unit_more));
payment.balance().amount.0 -= per_payment_fee;
amortized += per_payment_fee;
}
assert!(amortized >= (*operating_costs_in_effect + fee));
// If the change is less than the dust, drop it
let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::<u64>() -
payments.iter().map(|payment| payment.balance().amount.0).sum::<u64>() -
fee;
if would_be_change < S::dust(coin).0 {
change = None;
*operating_costs_in_effect += would_be_change;
}
}
// Update the amount of operating costs
*operating_costs_in_effect = (*operating_costs_in_effect + fee).saturating_sub(amortized);
}
// Because we amortized, or accrued as operating costs, the fee, make the transaction
let effected_payments = payments.iter().map(|payment| payment.balance().amount).collect();
let has_change = change.is_some();
let PlannedTransaction { signable, eventuality, auxilliary } =
Self::plan(fee_rate, inputs, payments, change);
Some(AmortizePlannedTransaction {
effected_payments,
has_change,
signable,
eventuality,
auxilliary,
})
} }
/// Create a tree to fulfill a set of payments. /// Create a tree to fulfill a set of payments.

View file

@ -2,7 +2,7 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
use core::marker::PhantomData; use core::{marker::PhantomData, future::Future};
use std::collections::HashMap; use std::collections::HashMap;
use group::GroupEncoding; use group::GroupEncoding;
@ -14,7 +14,7 @@ use serai_db::DbTxn;
use primitives::{ReceivedOutput, Payment}; use primitives::{ReceivedOutput, Payment};
use scanner::{ use scanner::{
LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor, LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor,
SchedulerUpdate, Scheduler as SchedulerTrait, SchedulerUpdate, KeyScopedEventualities, Scheduler as SchedulerTrait,
}; };
use scheduler_primitives::*; use scheduler_primitives::*;
use utxo_scheduler_primitives::*; use utxo_scheduler_primitives::*;
@ -23,16 +23,27 @@ mod db;
use db::Db; use db::Db;
/// A scheduler of transactions for networks premised on the UTXO model. /// A scheduler of transactions for networks premised on the UTXO model.
pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, ()>>(PhantomData<S>, PhantomData<P>); #[allow(non_snake_case)]
#[derive(Clone)]
pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, ()>> {
planner: P,
_S: PhantomData<S>,
}
impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> { impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
fn aggregate_inputs( /// Create a new scheduler.
pub fn new(planner: P) -> Self {
Self { planner, _S: PhantomData }
}
async fn aggregate_inputs(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
key_for_change: KeyFor<S>, key_for_change: KeyFor<S>,
key: KeyFor<S>, key: KeyFor<S>,
coin: Coin, coin: Coin,
) -> Vec<EventualityFor<S>> { ) -> Result<Vec<EventualityFor<S>>, <Self as SchedulerTrait<S>>::EphemeralError> {
let mut eventualities = vec![]; let mut eventualities = vec![];
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0; let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
@ -41,13 +52,17 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
while outputs.len() > P::MAX_INPUTS { while outputs.len() > P::MAX_INPUTS {
let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::<Vec<_>>(); let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::<Vec<_>>();
let Some(planned) = P::plan_transaction_with_fee_amortization( let Some(planned) = self
&mut operating_costs, .planner
P::fee_rate(block, coin), .plan_transaction_with_fee_amortization(
to_aggregate, &mut operating_costs,
vec![], P::fee_rate(block, coin),
Some(key_for_change), to_aggregate,
) else { vec![],
Some(key_for_change),
)
.await?
else {
continue; continue;
}; };
@ -57,7 +72,7 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
Db::<S>::set_outputs(txn, key, coin, &outputs); Db::<S>::set_outputs(txn, key, coin, &outputs);
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs)); Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
eventualities Ok(eventualities)
} }
fn fulfillable_payments( fn fulfillable_payments(
@ -140,31 +155,36 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
} }
} }
fn handle_branch( async fn handle_branch(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
eventualities: &mut Vec<EventualityFor<S>>, eventualities: &mut Vec<EventualityFor<S>>,
output: OutputFor<S>, output: OutputFor<S>,
tx: TreeTransaction<AddressFor<S>>, tx: TreeTransaction<AddressFor<S>>,
) -> bool { ) -> Result<bool, <Self as SchedulerTrait<S>>::EphemeralError> {
let key = output.key(); let key = output.key();
let coin = output.balance().coin; let coin = output.balance().coin;
let Some(payments) = tx.payments::<S>(coin, &P::branch_address(key), output.balance().amount.0) let Some(payments) = tx.payments::<S>(coin, &P::branch_address(key), output.balance().amount.0)
else { else {
// If this output has become too small to satisfy this branch, drop it // If this output has become too small to satisfy this branch, drop it
return false; return Ok(false);
}; };
let Some(planned) = P::plan_transaction_with_fee_amortization( let Some(planned) = self
// Uses 0 as there's no operating costs to incur/amortize here .planner
&mut 0, .plan_transaction_with_fee_amortization(
P::fee_rate(block, coin), // Uses 0 as there's no operating costs to incur/amortize here
vec![output], &mut 0,
payments, P::fee_rate(block, coin),
None, vec![output],
) else { payments,
None,
)
.await?
else {
// This Branch isn't viable, so drop it (and its children) // This Branch isn't viable, so drop it (and its children)
return false; return Ok(false);
}; };
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable); TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned.signable);
@ -172,15 +192,16 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
Self::queue_branches(txn, key, coin, planned.effected_payments, tx); Self::queue_branches(txn, key, coin, planned.effected_payments, tx);
true Ok(true)
} }
fn step( async fn step(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
block: &BlockFor<S>, block: &BlockFor<S>,
key: KeyFor<S>, key: KeyFor<S>,
) -> Vec<EventualityFor<S>> { ) -> Result<Vec<EventualityFor<S>>, <Self as SchedulerTrait<S>>::EphemeralError> {
let mut eventualities = vec![]; let mut eventualities = vec![];
let key_for_change = match active_keys[0].1 { let key_for_change = match active_keys[0].1 {
@ -198,7 +219,8 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
let coin = *coin; let coin = *coin;
// Perform any input aggregation we should // Perform any input aggregation we should
eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin)); eventualities
.append(&mut self.aggregate_inputs(txn, block, key_for_change, key, coin).await?);
// Fetch the operating costs/outputs // Fetch the operating costs/outputs
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0; let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
@ -228,15 +250,19 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
// scanner API) // scanner API)
let mut planned_outer = None; let mut planned_outer = None;
for i in 0 .. 2 { for i in 0 .. 2 {
let Some(planned) = P::plan_transaction_with_fee_amortization( let Some(planned) = self
&mut operating_costs, .planner
P::fee_rate(block, coin), .plan_transaction_with_fee_amortization(
outputs.clone(), &mut operating_costs,
tree[0] P::fee_rate(block, coin),
.payments::<S>(coin, &branch_address, tree[0].value()) outputs.clone(),
.expect("payments were dropped despite providing an input of the needed value"), tree[0]
Some(key_for_change), .payments::<S>(coin, &branch_address, tree[0].value())
) else { .expect("payments were dropped despite providing an input of the needed value"),
Some(key_for_change),
)
.await?
else {
// This should trip on the first iteration or not at all // This should trip on the first iteration or not at all
assert_eq!(i, 0); assert_eq!(i, 0);
// This doesn't have inputs even worth aggregating so drop the entire tree // This doesn't have inputs even worth aggregating so drop the entire tree
@ -272,46 +298,53 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> Scheduler<S, P> {
Self::queue_branches(txn, key, coin, planned.effected_payments, tree.remove(0)); Self::queue_branches(txn, key, coin, planned.effected_payments, tree.remove(0));
} }
eventualities Ok(eventualities)
} }
fn flush_outputs( async fn flush_outputs(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
eventualities: &mut HashMap<Vec<u8>, Vec<EventualityFor<S>>>, eventualities: &mut KeyScopedEventualities<S>,
block: &BlockFor<S>, block: &BlockFor<S>,
from: KeyFor<S>, from: KeyFor<S>,
to: KeyFor<S>, to: KeyFor<S>,
coin: Coin, coin: Coin,
) { ) -> Result<(), <Self as SchedulerTrait<S>>::EphemeralError> {
let from_bytes = from.to_bytes().as_ref().to_vec(); let from_bytes = from.to_bytes().as_ref().to_vec();
// Ensure our inputs are aggregated // Ensure our inputs are aggregated
eventualities eventualities
.entry(from_bytes.clone()) .entry(from_bytes.clone())
.or_insert(vec![]) .or_insert(vec![])
.append(&mut Self::aggregate_inputs(txn, block, to, from, coin)); .append(&mut self.aggregate_inputs(txn, block, to, from, coin).await?);
// Now that our inputs are aggregated, transfer all of them to the new key // Now that our inputs are aggregated, transfer all of them to the new key
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0; let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
let outputs = Db::<S>::outputs(txn, from, coin).unwrap(); let outputs = Db::<S>::outputs(txn, from, coin).unwrap();
if outputs.is_empty() { if outputs.is_empty() {
return; return Ok(());
} }
let planned = P::plan_transaction_with_fee_amortization( let planned = self
&mut operating_costs, .planner
P::fee_rate(block, coin), .plan_transaction_with_fee_amortization(
outputs, &mut operating_costs,
vec![], P::fee_rate(block, coin),
Some(to), outputs,
); vec![],
Some(to),
)
.await?;
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs)); Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
let Some(planned) = planned else { return }; let Some(planned) = planned else { return Ok(()) };
TransactionsToSign::<P::SignableTransaction>::send(txn, &from, &planned.signable); TransactionsToSign::<P::SignableTransaction>::send(txn, &from, &planned.signable);
eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality); eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality);
Ok(())
} }
} }
impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> SchedulerTrait<S> for Scheduler<S, P> { impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> SchedulerTrait<S> for Scheduler<S, P> {
type EphemeralError = P::EphemeralError;
type SignableTransaction = P::SignableTransaction; type SignableTransaction = P::SignableTransaction;
fn activate_key(txn: &mut impl DbTxn, key: KeyFor<S>) { fn activate_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
@ -324,29 +357,32 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> SchedulerTrait<S> for Schedul
} }
fn flush_key( fn flush_key(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
retiring_key: KeyFor<S>, retiring_key: KeyFor<S>,
new_key: KeyFor<S>, new_key: KeyFor<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>> {
let mut eventualities = HashMap::new(); async move {
for coin in S::NETWORK.coins() { let mut eventualities = HashMap::new();
// Move the payments to the new key for coin in S::NETWORK.coins() {
{ // Move the payments to the new key
let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap(); {
let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap(); let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
let mut queued = still_queued; let mut queued = still_queued;
queued.append(&mut new_queued); queued.append(&mut new_queued);
Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]); Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]);
Db::<S>::set_queued_payments(txn, new_key, *coin, &queued); Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
}
// Move the outputs to the new key
self.flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin).await?;
} }
Ok(eventualities)
// Move the outputs to the new key
Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin);
} }
eventualities
} }
fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) { fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
@ -359,155 +395,174 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, ()>> SchedulerTrait<S> for Schedul
} }
fn update( fn update(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
update: SchedulerUpdate<S>, update: SchedulerUpdate<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>> {
let mut eventualities = HashMap::new(); async move {
let mut eventualities = HashMap::new();
// Accumulate the new outputs // Accumulate the new outputs
{ {
let mut outputs_by_key = HashMap::new(); let mut outputs_by_key = HashMap::new();
for output in update.outputs() { for output in update.outputs() {
// If this aligns for a branch, handle it // If this aligns for a branch, handle it
if let Some(branch) = Db::<S>::take_pending_branch(txn, output.key(), output.balance()) { if let Some(branch) = Db::<S>::take_pending_branch(txn, output.key(), output.balance()) {
if Self::handle_branch( if self
txn, .handle_branch(
block, txn,
eventualities.entry(output.key().to_bytes().as_ref().to_vec()).or_insert(vec![]), block,
output.clone(), eventualities.entry(output.key().to_bytes().as_ref().to_vec()).or_insert(vec![]),
branch, output.clone(),
) { branch,
// If we could use it for a branch, we do and move on )
// Else, we let it be accumulated by the standard accumulation code .await?
continue; {
// If we could use it for a branch, we do and move on
// Else, we let it be accumulated by the standard accumulation code
continue;
}
}
let coin = output.balance().coin;
outputs_by_key
// Index by key and coin
.entry((output.key().to_bytes().as_ref().to_vec(), coin))
// If we haven't accumulated here prior, read the outputs from the database
.or_insert_with(|| (output.key(), Db::<S>::outputs(txn, output.key(), coin).unwrap()))
.1
.push(output.clone());
}
// Write the outputs back to the database
for ((_key_vec, coin), (key, outputs)) in outputs_by_key {
Db::<S>::set_outputs(txn, key, coin, &outputs);
}
}
// Fulfill the payments we prior couldn't
for (key, _stage) in active_keys {
eventualities
.entry(key.to_bytes().as_ref().to_vec())
.or_insert(vec![])
.append(&mut self.step(txn, active_keys, block, *key).await?);
}
// If this key has been flushed, forward all outputs
match active_keys[0].1 {
LifetimeStage::ActiveYetNotReporting |
LifetimeStage::Active |
LifetimeStage::UsingNewForChange => {}
LifetimeStage::Forwarding | LifetimeStage::Finishing => {
for coin in S::NETWORK.coins() {
self
.flush_outputs(
txn,
&mut eventualities,
block,
active_keys[0].0,
active_keys[1].0,
*coin,
)
.await?;
} }
} }
let coin = output.balance().coin;
outputs_by_key
// Index by key and coin
.entry((output.key().to_bytes().as_ref().to_vec(), coin))
// If we haven't accumulated here prior, read the outputs from the database
.or_insert_with(|| (output.key(), Db::<S>::outputs(txn, output.key(), coin).unwrap()))
.1
.push(output.clone());
} }
// Write the outputs back to the database
for ((_key_vec, coin), (key, outputs)) in outputs_by_key {
Db::<S>::set_outputs(txn, key, coin, &outputs);
}
}
// Fulfill the payments we prior couldn't // Create the transactions for the forwards/burns
for (key, _stage) in active_keys { {
eventualities let mut planned_txs = vec![];
.entry(key.to_bytes().as_ref().to_vec()) for forward in update.forwards() {
.or_insert(vec![]) let key = forward.key();
.append(&mut Self::step(txn, active_keys, block, *key));
}
// If this key has been flushed, forward all outputs assert_eq!(active_keys.len(), 2);
match active_keys[0].1 { assert_eq!(active_keys[0].1, LifetimeStage::Forwarding);
LifetimeStage::ActiveYetNotReporting | assert_eq!(active_keys[1].1, LifetimeStage::Active);
LifetimeStage::Active | let forward_to_key = active_keys[1].0;
LifetimeStage::UsingNewForChange => {}
LifetimeStage::Forwarding | LifetimeStage::Finishing => { let Some(plan) = self
for coin in S::NETWORK.coins() { .planner
Self::flush_outputs( .plan_transaction_with_fee_amortization(
txn, // This uses 0 for the operating costs as we don't incur any here
&mut eventualities, // If the output can't pay for itself to be forwarded, we simply drop it
block, &mut 0,
active_keys[0].0, P::fee_rate(block, forward.balance().coin),
active_keys[1].0, vec![forward.clone()],
*coin, vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)],
); None,
)
.await?
else {
continue;
};
planned_txs.push((key, plan));
} }
for to_return in update.returns() {
let key = to_return.output().key();
let out_instruction =
Payment::new(to_return.address().clone(), to_return.output().balance(), None);
let Some(plan) = self
.planner
.plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be returned, we simply drop it
&mut 0,
P::fee_rate(block, out_instruction.balance().coin),
vec![to_return.output().clone()],
vec![out_instruction],
None,
)
.await?
else {
continue;
};
planned_txs.push((key, plan));
}
for (key, planned_tx) in planned_txs {
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
// Insert the Eventualities into the result
eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality);
}
Ok(eventualities)
} }
} }
// Create the transactions for the forwards/burns
{
let mut planned_txs = vec![];
for forward in update.forwards() {
let key = forward.key();
assert_eq!(active_keys.len(), 2);
assert_eq!(active_keys[0].1, LifetimeStage::Forwarding);
assert_eq!(active_keys[1].1, LifetimeStage::Active);
let forward_to_key = active_keys[1].0;
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be forwarded, we simply drop it
&mut 0,
P::fee_rate(block, forward.balance().coin),
vec![forward.clone()],
vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)],
None,
) else {
continue;
};
planned_txs.push((key, plan));
}
for to_return in update.returns() {
let key = to_return.output().key();
let out_instruction =
Payment::new(to_return.address().clone(), to_return.output().balance(), None);
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be returned, we simply drop it
&mut 0,
P::fee_rate(block, out_instruction.balance().coin),
vec![to_return.output().clone()],
vec![out_instruction],
None,
) else {
continue;
};
planned_txs.push((key, plan));
}
for (key, planned_tx) in planned_txs {
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
// Insert the Eventualities into the result
eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality);
}
eventualities
}
} }
fn fulfill( fn fulfill(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
payments: Vec<Payment<AddressFor<S>>>, payments: Vec<Payment<AddressFor<S>>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>> {
// Find the key to filfill these payments with async move {
let fulfillment_key = match active_keys[0].1 { // Find the key to filfill these payments with
LifetimeStage::ActiveYetNotReporting => { let fulfillment_key = match active_keys[0].1 {
panic!("expected to fulfill payments despite not reporting for the oldest key") LifetimeStage::ActiveYetNotReporting => {
panic!("expected to fulfill payments despite not reporting for the oldest key")
}
LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
};
// Queue the payments for this key
for coin in S::NETWORK.coins() {
let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap();
queued_payments
.extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned());
Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
} }
LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
};
// Queue the payments for this key // Handle the queued payments
for coin in S::NETWORK.coins() { Ok(HashMap::from([(
let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap(); fulfillment_key.to_bytes().as_ref().to_vec(),
queued_payments self.step(txn, active_keys, block, fulfillment_key).await?,
.extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); )]))
Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
} }
// Handle the queued payments
HashMap::from([(
fulfillment_key.to_bytes().as_ref().to_vec(),
Self::step(txn, active_keys, block, fulfillment_key),
)])
} }
} }

View file

@ -2,7 +2,7 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
use core::marker::PhantomData; use core::{marker::PhantomData, future::Future};
use std::collections::HashMap; use std::collections::HashMap;
use group::GroupEncoding; use group::GroupEncoding;
@ -14,7 +14,7 @@ use serai_db::DbTxn;
use primitives::{OutputType, ReceivedOutput, Payment}; use primitives::{OutputType, ReceivedOutput, Payment};
use scanner::{ use scanner::{
LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor, LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, BlockFor,
SchedulerUpdate, Scheduler as SchedulerTrait, SchedulerUpdate, KeyScopedEventualities, Scheduler as SchedulerTrait,
}; };
use scheduler_primitives::*; use scheduler_primitives::*;
use utxo_scheduler_primitives::*; use utxo_scheduler_primitives::*;
@ -27,12 +27,19 @@ pub struct EffectedReceivedOutputs<S: ScannerFeed>(pub Vec<OutputFor<S>>);
/// A scheduler of transactions for networks premised on the UTXO model which support /// A scheduler of transactions for networks premised on the UTXO model which support
/// transaction chaining. /// transaction chaining.
pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>>( #[allow(non_snake_case)]
PhantomData<S>, #[derive(Clone)]
PhantomData<P>, pub struct Scheduler<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> {
); planner: P,
_S: PhantomData<S>,
}
impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Scheduler<S, P> { impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Scheduler<S, P> {
/// Create a new scheduler.
pub fn new(planner: P) -> Self {
Self { planner, _S: PhantomData }
}
fn accumulate_outputs(txn: &mut impl DbTxn, outputs: Vec<OutputFor<S>>, from_scanner: bool) { fn accumulate_outputs(txn: &mut impl DbTxn, outputs: Vec<OutputFor<S>>, from_scanner: bool) {
let mut outputs_by_key = HashMap::new(); let mut outputs_by_key = HashMap::new();
for output in outputs { for output in outputs {
@ -59,13 +66,14 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
} }
} }
fn aggregate_inputs( async fn aggregate_inputs(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
key_for_change: KeyFor<S>, key_for_change: KeyFor<S>,
key: KeyFor<S>, key: KeyFor<S>,
coin: Coin, coin: Coin,
) -> Vec<EventualityFor<S>> { ) -> Result<Vec<EventualityFor<S>>, <Self as SchedulerTrait<S>>::EphemeralError> {
let mut eventualities = vec![]; let mut eventualities = vec![];
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0; let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
@ -74,13 +82,17 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::<Vec<_>>(); let to_aggregate = outputs.drain(.. P::MAX_INPUTS).collect::<Vec<_>>();
Db::<S>::set_outputs(txn, key, coin, &outputs); Db::<S>::set_outputs(txn, key, coin, &outputs);
let Some(planned) = P::plan_transaction_with_fee_amortization( let Some(planned) = self
&mut operating_costs, .planner
P::fee_rate(block, coin), .plan_transaction_with_fee_amortization(
to_aggregate, &mut operating_costs,
vec![], P::fee_rate(block, coin),
Some(key_for_change), to_aggregate,
) else { vec![],
Some(key_for_change),
)
.await?
else {
continue; continue;
}; };
@ -93,7 +105,7 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
} }
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs)); Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
eventualities Ok(eventualities)
} }
fn fulfillable_payments( fn fulfillable_payments(
@ -151,12 +163,13 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
} }
} }
fn step( async fn step(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
block: &BlockFor<S>, block: &BlockFor<S>,
key: KeyFor<S>, key: KeyFor<S>,
) -> Vec<EventualityFor<S>> { ) -> Result<Vec<EventualityFor<S>>, <Self as SchedulerTrait<S>>::EphemeralError> {
let mut eventualities = vec![]; let mut eventualities = vec![];
let key_for_change = match active_keys[0].1 { let key_for_change = match active_keys[0].1 {
@ -174,7 +187,8 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
let coin = *coin; let coin = *coin;
// Perform any input aggregation we should // Perform any input aggregation we should
eventualities.append(&mut Self::aggregate_inputs(txn, block, key_for_change, key, coin)); eventualities
.append(&mut self.aggregate_inputs(txn, block, key_for_change, key, coin).await?);
// Fetch the operating costs/outputs // Fetch the operating costs/outputs
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0; let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
@ -211,15 +225,19 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
// scanner API) // scanner API)
let mut planned_outer = None; let mut planned_outer = None;
for i in 0 .. 2 { for i in 0 .. 2 {
let Some(planned) = P::plan_transaction_with_fee_amortization( let Some(planned) = self
&mut operating_costs, .planner
P::fee_rate(block, coin), .plan_transaction_with_fee_amortization(
outputs.clone(), &mut operating_costs,
tree[0] P::fee_rate(block, coin),
.payments::<S>(coin, &branch_address, tree[0].value()) outputs.clone(),
.expect("payments were dropped despite providing an input of the needed value"), tree[0]
Some(key_for_change), .payments::<S>(coin, &branch_address, tree[0].value())
) else { .expect("payments were dropped despite providing an input of the needed value"),
Some(key_for_change),
)
.await?
else {
// This should trip on the first iteration or not at all // This should trip on the first iteration or not at all
assert_eq!(i, 0); assert_eq!(i, 0);
// This doesn't have inputs even worth aggregating so drop the entire tree // This doesn't have inputs even worth aggregating so drop the entire tree
@ -300,14 +318,18 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
}; };
let branch_output_id = branch_output.id(); let branch_output_id = branch_output.id();
let Some(mut planned) = P::plan_transaction_with_fee_amortization( let Some(mut planned) = self
// Uses 0 as there's no operating costs to incur/amortize here .planner
&mut 0, .plan_transaction_with_fee_amortization(
P::fee_rate(block, coin), // Uses 0 as there's no operating costs to incur/amortize here
vec![branch_output], &mut 0,
payments, P::fee_rate(block, coin),
None, vec![branch_output],
) else { payments,
None,
)
.await?
else {
// This Branch isn't viable, so drop it (and its children) // This Branch isn't viable, so drop it (and its children)
continue; continue;
}; };
@ -328,49 +350,56 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
} }
} }
eventualities Ok(eventualities)
} }
fn flush_outputs( async fn flush_outputs(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
eventualities: &mut HashMap<Vec<u8>, Vec<EventualityFor<S>>>, eventualities: &mut KeyScopedEventualities<S>,
block: &BlockFor<S>, block: &BlockFor<S>,
from: KeyFor<S>, from: KeyFor<S>,
to: KeyFor<S>, to: KeyFor<S>,
coin: Coin, coin: Coin,
) { ) -> Result<(), <Self as SchedulerTrait<S>>::EphemeralError> {
let from_bytes = from.to_bytes().as_ref().to_vec(); let from_bytes = from.to_bytes().as_ref().to_vec();
// Ensure our inputs are aggregated // Ensure our inputs are aggregated
eventualities eventualities
.entry(from_bytes.clone()) .entry(from_bytes.clone())
.or_insert(vec![]) .or_insert(vec![])
.append(&mut Self::aggregate_inputs(txn, block, to, from, coin)); .append(&mut self.aggregate_inputs(txn, block, to, from, coin).await?);
// Now that our inputs are aggregated, transfer all of them to the new key // Now that our inputs are aggregated, transfer all of them to the new key
let mut operating_costs = Db::<S>::operating_costs(txn, coin).0; let mut operating_costs = Db::<S>::operating_costs(txn, coin).0;
let outputs = Db::<S>::outputs(txn, from, coin).unwrap(); let outputs = Db::<S>::outputs(txn, from, coin).unwrap();
if outputs.is_empty() { if outputs.is_empty() {
return; return Ok(());
} }
let planned = P::plan_transaction_with_fee_amortization( let planned = self
&mut operating_costs, .planner
P::fee_rate(block, coin), .plan_transaction_with_fee_amortization(
outputs, &mut operating_costs,
vec![], P::fee_rate(block, coin),
Some(to), outputs,
); vec![],
Some(to),
)
.await?;
Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs)); Db::<S>::set_operating_costs(txn, coin, Amount(operating_costs));
let Some(planned) = planned else { return }; let Some(planned) = planned else { return Ok(()) };
TransactionsToSign::<P::SignableTransaction>::send(txn, &from, &planned.signable); TransactionsToSign::<P::SignableTransaction>::send(txn, &from, &planned.signable);
eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality); eventualities.get_mut(&from_bytes).unwrap().push(planned.eventuality);
Self::accumulate_outputs(txn, planned.auxilliary.0, false); Self::accumulate_outputs(txn, planned.auxilliary.0, false);
Ok(())
} }
} }
impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> SchedulerTrait<S> impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> SchedulerTrait<S>
for Scheduler<S, P> for Scheduler<S, P>
{ {
type EphemeralError = P::EphemeralError;
type SignableTransaction = P::SignableTransaction; type SignableTransaction = P::SignableTransaction;
fn activate_key(txn: &mut impl DbTxn, key: KeyFor<S>) { fn activate_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
@ -383,29 +412,32 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
} }
fn flush_key( fn flush_key(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
retiring_key: KeyFor<S>, retiring_key: KeyFor<S>,
new_key: KeyFor<S>, new_key: KeyFor<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>> {
let mut eventualities = HashMap::new(); async move {
for coin in S::NETWORK.coins() { let mut eventualities = HashMap::new();
// Move the payments to the new key for coin in S::NETWORK.coins() {
{ // Move the payments to the new key
let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap(); {
let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap(); let still_queued = Db::<S>::queued_payments(txn, retiring_key, *coin).unwrap();
let mut new_queued = Db::<S>::queued_payments(txn, new_key, *coin).unwrap();
let mut queued = still_queued; let mut queued = still_queued;
queued.append(&mut new_queued); queued.append(&mut new_queued);
Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]); Db::<S>::set_queued_payments(txn, retiring_key, *coin, &[]);
Db::<S>::set_queued_payments(txn, new_key, *coin, &queued); Db::<S>::set_queued_payments(txn, new_key, *coin, &queued);
}
// Move the outputs to the new key
self.flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin).await?;
} }
Ok(eventualities)
// Move the outputs to the new key
Self::flush_outputs(txn, &mut eventualities, block, retiring_key, new_key, *coin);
} }
eventualities
} }
fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) { fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
@ -418,121 +450,137 @@ impl<S: ScannerFeed, P: TransactionPlanner<S, EffectedReceivedOutputs<S>>> Sched
} }
fn update( fn update(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
update: SchedulerUpdate<S>, update: SchedulerUpdate<S>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>> {
Self::accumulate_outputs(txn, update.outputs().to_vec(), true); async move {
Self::accumulate_outputs(txn, update.outputs().to_vec(), true);
// Fulfill the payments we prior couldn't // Fulfill the payments we prior couldn't
let mut eventualities = HashMap::new(); let mut eventualities = HashMap::new();
for (key, _stage) in active_keys { for (key, _stage) in active_keys {
assert!(eventualities assert!(eventualities
.insert(key.to_bytes().as_ref().to_vec(), Self::step(txn, active_keys, block, *key)) .insert(key.to_bytes().as_ref().to_vec(), self.step(txn, active_keys, block, *key).await?)
.is_none()); .is_none());
} }
// If this key has been flushed, forward all outputs // If this key has been flushed, forward all outputs
match active_keys[0].1 { match active_keys[0].1 {
LifetimeStage::ActiveYetNotReporting | LifetimeStage::ActiveYetNotReporting |
LifetimeStage::Active | LifetimeStage::Active |
LifetimeStage::UsingNewForChange => {} LifetimeStage::UsingNewForChange => {}
LifetimeStage::Forwarding | LifetimeStage::Finishing => { LifetimeStage::Forwarding | LifetimeStage::Finishing => {
for coin in S::NETWORK.coins() { for coin in S::NETWORK.coins() {
Self::flush_outputs( self
txn, .flush_outputs(
&mut eventualities, txn,
block, &mut eventualities,
active_keys[0].0, block,
active_keys[1].0, active_keys[0].0,
*coin, active_keys[1].0,
); *coin,
)
.await?;
}
} }
} }
}
// Create the transactions for the forwards/burns // Create the transactions for the forwards/burns
{ {
let mut planned_txs = vec![]; let mut planned_txs = vec![];
for forward in update.forwards() { for forward in update.forwards() {
let key = forward.key(); let key = forward.key();
assert_eq!(active_keys.len(), 2); assert_eq!(active_keys.len(), 2);
assert_eq!(active_keys[0].1, LifetimeStage::Forwarding); assert_eq!(active_keys[0].1, LifetimeStage::Forwarding);
assert_eq!(active_keys[1].1, LifetimeStage::Active); assert_eq!(active_keys[1].1, LifetimeStage::Active);
let forward_to_key = active_keys[1].0; let forward_to_key = active_keys[1].0;
let Some(plan) = P::plan_transaction_with_fee_amortization( let Some(plan) = self
// This uses 0 for the operating costs as we don't incur any here .planner
// If the output can't pay for itself to be forwarded, we simply drop it .plan_transaction_with_fee_amortization(
&mut 0, // This uses 0 for the operating costs as we don't incur any here
P::fee_rate(block, forward.balance().coin), // If the output can't pay for itself to be forwarded, we simply drop it
vec![forward.clone()], &mut 0,
vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)], P::fee_rate(block, forward.balance().coin),
None, vec![forward.clone()],
) else { vec![Payment::new(P::forwarding_address(forward_to_key), forward.balance(), None)],
continue; None,
}; )
planned_txs.push((key, plan)); .await?
else {
continue;
};
planned_txs.push((key, plan));
}
for to_return in update.returns() {
let key = to_return.output().key();
let out_instruction =
Payment::new(to_return.address().clone(), to_return.output().balance(), None);
let Some(plan) = self
.planner
.plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be returned, we simply drop it
&mut 0,
P::fee_rate(block, out_instruction.balance().coin),
vec![to_return.output().clone()],
vec![out_instruction],
None,
)
.await?
else {
continue;
};
planned_txs.push((key, plan));
}
for (key, planned_tx) in planned_txs {
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
// Insert the Eventualities into the result
eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality);
}
Ok(eventualities)
} }
for to_return in update.returns() {
let key = to_return.output().key();
let out_instruction =
Payment::new(to_return.address().clone(), to_return.output().balance(), None);
let Some(plan) = P::plan_transaction_with_fee_amortization(
// This uses 0 for the operating costs as we don't incur any here
// If the output can't pay for itself to be returned, we simply drop it
&mut 0,
P::fee_rate(block, out_instruction.balance().coin),
vec![to_return.output().clone()],
vec![out_instruction],
None,
) else {
continue;
};
planned_txs.push((key, plan));
}
for (key, planned_tx) in planned_txs {
// Send the transactions off for signing
TransactionsToSign::<P::SignableTransaction>::send(txn, &key, &planned_tx.signable);
// Insert the Eventualities into the result
eventualities.get_mut(key.to_bytes().as_ref()).unwrap().push(planned_tx.eventuality);
}
eventualities
} }
} }
fn fulfill( fn fulfill(
&self,
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block: &BlockFor<S>, block: &BlockFor<S>,
active_keys: &[(KeyFor<S>, LifetimeStage)], active_keys: &[(KeyFor<S>, LifetimeStage)],
payments: Vec<Payment<AddressFor<S>>>, payments: Vec<Payment<AddressFor<S>>>,
) -> HashMap<Vec<u8>, Vec<EventualityFor<S>>> { ) -> impl Send + Future<Output = Result<KeyScopedEventualities<S>, Self::EphemeralError>> {
// Find the key to filfill these payments with async move {
let fulfillment_key = match active_keys[0].1 { // Find the key to filfill these payments with
LifetimeStage::ActiveYetNotReporting => { let fulfillment_key = match active_keys[0].1 {
panic!("expected to fulfill payments despite not reporting for the oldest key") LifetimeStage::ActiveYetNotReporting => {
panic!("expected to fulfill payments despite not reporting for the oldest key")
}
LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
};
// Queue the payments for this key
for coin in S::NETWORK.coins() {
let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap();
queued_payments
.extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned());
Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
} }
LifetimeStage::Active | LifetimeStage::UsingNewForChange => active_keys[0].0,
LifetimeStage::Forwarding | LifetimeStage::Finishing => active_keys[1].0,
};
// Queue the payments for this key // Handle the queued payments
for coin in S::NETWORK.coins() { Ok(HashMap::from([(
let mut queued_payments = Db::<S>::queued_payments(txn, fulfillment_key, *coin).unwrap(); fulfillment_key.to_bytes().as_ref().to_vec(),
queued_payments self.step(txn, active_keys, block, fulfillment_key).await?,
.extend(payments.iter().filter(|payment| payment.balance().coin == *coin).cloned()); )]))
Db::<S>::set_queued_payments(txn, fulfillment_key, *coin, &queued_payments);
} }
// Handle the queued payments
HashMap::from([(
fulfillment_key.to_bytes().as_ref().to_vec(),
Self::step(txn, active_keys, block, fulfillment_key),
)])
} }
} }