mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-22 18:54:40 +00:00
Add helper to intake Eventualities
This commit is contained in:
parent
9f4b28e5ae
commit
920303e1b4
2 changed files with 40 additions and 41 deletions
|
@ -1,3 +1,5 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use group::GroupEncoding;
|
||||
|
||||
use serai_db::{Get, DbTxn, Db};
|
||||
|
@ -10,7 +12,7 @@ use crate::{
|
|||
OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
|
||||
ScanToEventualityDb,
|
||||
},
|
||||
BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
|
||||
BlockExt, ScannerFeed, KeyFor, EventualityFor, SchedulerUpdate, Scheduler, sort_outputs,
|
||||
scan::{next_to_scan_for_outputs_block, queue_output_until_block},
|
||||
};
|
||||
|
||||
|
@ -28,6 +30,29 @@ pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Optio
|
|||
.map(|b| b + S::WINDOW_LENGTH - 1)
|
||||
}
|
||||
|
||||
/// Intake a set of Eventualities into the DB.
|
||||
///
|
||||
/// The HashMap is keyed by the key these Eventualities are for.
|
||||
fn intake_eventualities<S: ScannerFeed>(
|
||||
txn: &mut impl DbTxn,
|
||||
to_intake: HashMap<Vec<u8>, Vec<EventualityFor<S>>>,
|
||||
) {
|
||||
for (key, new_eventualities) in to_intake {
|
||||
let key = {
|
||||
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
|
||||
assert_eq!(key.len(), key_repr.as_ref().len());
|
||||
key_repr.as_mut().copy_from_slice(&key);
|
||||
KeyFor::<S>::from_bytes(&key_repr).unwrap()
|
||||
};
|
||||
|
||||
let mut eventualities = EventualityDb::<S>::eventualities(txn, key);
|
||||
for new_eventuality in new_eventualities {
|
||||
eventualities.insert(new_eventuality);
|
||||
}
|
||||
EventualityDb::<S>::set_eventualities(txn, key, &eventualities);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
|
||||
outputs into some scheduler, potentially causing certain transactions to begin their signing
|
||||
|
@ -106,22 +131,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
|
|||
intaked_any = true;
|
||||
|
||||
let new_eventualities = self.scheduler.fulfill(&mut txn, burns);
|
||||
|
||||
// TODO: De-duplicate this with below instance via a helper function
|
||||
for (key, new_eventualities) in new_eventualities {
|
||||
let key = {
|
||||
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
|
||||
assert_eq!(key.len(), key_repr.as_ref().len());
|
||||
key_repr.as_mut().copy_from_slice(&key);
|
||||
KeyFor::<S>::from_bytes(&key_repr).unwrap()
|
||||
};
|
||||
|
||||
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
|
||||
for new_eventuality in new_eventualities {
|
||||
eventualities.insert(new_eventuality);
|
||||
}
|
||||
EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
|
||||
}
|
||||
intake_eventualities::<S>(&mut txn, new_eventualities);
|
||||
}
|
||||
txn.commit();
|
||||
}
|
||||
|
@ -310,30 +320,20 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
|
|||
}
|
||||
|
||||
// Update the scheduler
|
||||
let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
|
||||
scheduler_update.outputs.sort_by(sort_outputs);
|
||||
scheduler_update.forwards.sort_by(sort_outputs);
|
||||
scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
|
||||
// Intake the new Eventualities
|
||||
let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
|
||||
for (key, new_eventualities) in new_eventualities {
|
||||
let key = {
|
||||
let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
|
||||
assert_eq!(key.len(), key_repr.as_ref().len());
|
||||
key_repr.as_mut().copy_from_slice(&key);
|
||||
KeyFor::<S>::from_bytes(&key_repr).unwrap()
|
||||
};
|
||||
|
||||
keys
|
||||
.iter()
|
||||
.find(|serai_key| serai_key.key == key)
|
||||
.expect("queueing eventuality for key which isn't active");
|
||||
|
||||
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
|
||||
for new_eventuality in new_eventualities {
|
||||
eventualities.insert(new_eventuality);
|
||||
{
|
||||
let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
|
||||
scheduler_update.outputs.sort_by(sort_outputs);
|
||||
scheduler_update.forwards.sort_by(sort_outputs);
|
||||
scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
|
||||
// Intake the new Eventualities
|
||||
let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
|
||||
for key in new_eventualities.keys() {
|
||||
keys
|
||||
.iter()
|
||||
.find(|serai_key| serai_key.key.to_bytes().as_ref() == key.as_slice())
|
||||
.expect("intaking Eventuality for key which isn't active");
|
||||
}
|
||||
EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
|
||||
intake_eventualities::<S>(&mut txn, new_eventualities);
|
||||
}
|
||||
|
||||
// Now that we've intaked any Eventualities caused, check if we're retiring any keys
|
||||
|
|
|
@ -242,7 +242,6 @@ impl<S: ScannerFeed> Scanner<S> {
|
|||
/// Create a new scanner.
|
||||
///
|
||||
/// This will begin its execution, spawning several asynchronous tasks.
|
||||
// TODO: Take start_time and binary search here?
|
||||
pub async fn new(
|
||||
db: impl Db,
|
||||
feed: S,
|
||||
|
|
Loading…
Reference in a new issue