Plan scheduled payments whenever outputs are received

The scheduler prior waited for the next series of payments to be added.
This commit is contained in:
Luke Parker 2023-04-13 15:40:06 -04:00
parent a509dbfad6
commit 9bea368d36
No known key found for this signature in database
2 changed files with 24 additions and 16 deletions

View file

@ -394,6 +394,7 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
CoordinatorMessage::Substrate(msg) => {
match msg {
// TODO: Merge this with Burns so we don't have two distinct scheduling actions
messages::substrate::CoordinatorMessage::BlockAcknowledged {
context,
key: key_vec,

View file

@ -136,21 +136,21 @@ impl<C: Coin> Scheduler<C> {
}
}
// Sort the UTXOs by amount
utxos.sort_by(|a, b| a.amount().cmp(&b.amount()).reverse());
log::info!("{} planned TXs have had their required inputs confirmed", txs.len());
// Return the now possible TXs
log::info!("created {} planned TXs to sign from now recived outputs", txs.len());
// Additionally call schedule in case these outputs enable fulfilling scheduled payments
txs.extend(self.schedule(vec![]));
txs
}
// Schedule a series of payments. This should be called after `add_outputs`.
pub fn schedule(&mut self, payments: Vec<Payment<C>>) -> Vec<Plan<C>> {
log::debug!("scheduling payments");
assert!(!payments.is_empty(), "tried to schedule zero payments");
log::info!("scheduling {} new payments", payments.len());
// Add all new payments to the list of pending payments
self.payments.extend(payments);
let payments_at_start = self.payments.len();
log::info!("{} payments are now scheduled", payments_at_start);
// If we don't have UTXOs available, don't try to continue
if self.utxos.is_empty() {
@ -177,14 +177,9 @@ impl<C: Coin> Scheduler<C> {
}
}
let mut aggregating = vec![];
let mut txs = vec![];
for chunk in utxo_chunks.drain(..) {
aggregating.push(Plan {
key: self.key,
inputs: chunk,
payments: vec![],
change: Some(self.key),
})
txs.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) })
}
// We want to use all possible UTXOs for all possible payments
@ -204,14 +199,23 @@ impl<C: Coin> Scheduler<C> {
if balance.checked_sub(amount).is_some() {
balance -= amount;
executing.push(self.payments.pop_front().unwrap());
} else {
// TODO: We could continue checking other payments which aren't [0]
break;
}
}
// Now that we have the list of payments we can successfully handle right now, create the TX
// for them
let mut txs = vec![self.execute(utxos, executing)];
txs.append(&mut aggregating);
log::info!("created {} TXs to sign", txs.len());
if !executing.is_empty() {
txs.push(self.execute(utxos, executing));
}
log::info!(
"created {} TXs containing {} payments to sign",
txs.len(),
payments_at_start - self.payments.len(),
);
txs
}
@ -260,6 +264,9 @@ impl<C: Coin> Scheduler<C> {
payments.drain(..).filter(|payment| payment.amount >= C::DUST).collect::<Vec<_>>();
// Sanity check this was done properly
assert!(actual >= payments.iter().map(|payment| payment.amount).sum::<u64>());
if payments.is_empty() {
return;
}
self.plans.entry(actual).or_insert(VecDeque::new()).push_back(payments);
}