From 920303e1b4c60ee298c920db530a3a8cc92019a0 Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Thu, 29 Aug 2024 14:57:43 -0400
Subject: [PATCH] Add helper to intake Eventualities

---
 processor/scanner/src/eventuality/mod.rs | 80 ++++++++++++------------
 processor/scanner/src/lib.rs             |  1 -
 2 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index 38176ed4..7b5e3eed 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use group::GroupEncoding;
 
 use serai_db::{Get, DbTxn, Db};
@@ -10,7 +12,7 @@ use crate::{
     OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
     ScanToEventualityDb,
   },
-  BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
+  BlockExt, ScannerFeed, KeyFor, EventualityFor, SchedulerUpdate, Scheduler, sort_outputs,
   scan::{next_to_scan_for_outputs_block, queue_output_until_block},
 };
 
@@ -28,6 +30,29 @@ pub(crate) fn latest_scannable_block<S: ScannerFeed>(getter: &impl Get) -> Optio
     .map(|b| b + S::WINDOW_LENGTH - 1)
 }
 
+/// Intake a set of Eventualities into the DB.
+///
+/// The HashMap is keyed by the key these Eventualities are for.
+fn intake_eventualities<S: ScannerFeed>(
+  txn: &mut impl DbTxn,
+  to_intake: HashMap<Vec<u8>, Vec<EventualityFor<S>>>,
+) {
+  for (key, new_eventualities) in to_intake {
+    let key = {
+      let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
+      assert_eq!(key.len(), key_repr.as_ref().len());
+      key_repr.as_mut().copy_from_slice(&key);
+      KeyFor::<S>::from_bytes(&key_repr).unwrap()
+    };
+
+    let mut eventualities = EventualityDb::<S>::eventualities(txn, key);
+    for new_eventuality in new_eventualities {
+      eventualities.insert(new_eventuality);
+    }
+    EventualityDb::<S>::set_eventualities(txn, key, &eventualities);
+  }
+}
+
 /*
   When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
   outputs into some scheduler, potentially causing certain transactions to begin their signing
@@ -106,22 +131,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> EventualityTask<D, S, Sch> {
         intaked_any = true;
 
         let new_eventualities = self.scheduler.fulfill(&mut txn, burns);
-
-        // TODO: De-duplicate this with below instance via a helper function
-        for (key, new_eventualities) in new_eventualities {
-          let key = {
-            let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
-            assert_eq!(key.len(), key_repr.as_ref().len());
-            key_repr.as_mut().copy_from_slice(&key);
-            KeyFor::<S>::from_bytes(&key_repr).unwrap()
-          };
-
-          let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
-          for new_eventuality in new_eventualities {
-            eventualities.insert(new_eventuality);
-          }
-          EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
-        }
+        intake_eventualities::<S>(&mut txn, new_eventualities);
       }
       txn.commit();
     }
@@ -310,30 +320,20 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
       }
 
       // Update the scheduler
-      let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
-      scheduler_update.outputs.sort_by(sort_outputs);
-      scheduler_update.forwards.sort_by(sort_outputs);
-      scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
-      // Intake the new Eventualities
-      let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
-      for (key, new_eventualities) in new_eventualities {
-        let key = {
-          let mut key_repr = <KeyFor<S> as GroupEncoding>::Repr::default();
-          assert_eq!(key.len(), key_repr.as_ref().len());
-          key_repr.as_mut().copy_from_slice(&key);
-          KeyFor::<S>::from_bytes(&key_repr).unwrap()
-        };
-
-        keys
-          .iter()
-          .find(|serai_key| serai_key.key == key)
-          .expect("queueing eventuality for key which isn't active");
-
-        let mut eventualities = EventualityDb::<S>::eventualities(&txn, key);
-        for new_eventuality in new_eventualities {
-          eventualities.insert(new_eventuality);
+      {
+        let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
+        scheduler_update.outputs.sort_by(sort_outputs);
+        scheduler_update.forwards.sort_by(sort_outputs);
+        scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
+        // Intake the new Eventualities
+        let new_eventualities = self.scheduler.update(&mut txn, scheduler_update);
+        for key in new_eventualities.keys() {
+          keys
+            .iter()
+            .find(|serai_key| serai_key.key.to_bytes().as_ref() == key.as_slice())
+            .expect("intaking Eventuality for key which isn't active");
         }
-        EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
+        intake_eventualities::<S>(&mut txn, new_eventualities);
       }
 
       // Now that we've intaked any Eventualities caused, check if we're retiring any keys
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index 77bed7fc..5f7e44a2 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -242,7 +242,6 @@ impl<S: ScannerFeed> Scanner<S> {
   /// Create a new scanner.
   ///
   /// This will begin its execution, spawning several asynchronous tasks.
-  // TODO: Take start_time and binary search here?
   pub async fn new(
     db: impl Db,
     feed: S,
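
Note on the change above, not part of the patch: the new intake_eventualities helper is a single read-modify-write loop over a HashMap keyed by the serialized key, letting the Burns-fulfillment path and the scheduler-update path share one intake routine (resolving the de-duplication TODO). Below is a minimal, self-contained Rust sketch of that same pattern under simplified assumptions; EventualityStore, plain Vec<u8> keys, and String eventualities are hypothetical stand-ins, not the scanner's actual ScannerFeed, KeyFor, or EventualityDb types, and a Vec plus extend stands in for the real set-like Eventualities structure and its database transaction.

use std::collections::HashMap;

// Hypothetical stand-in types for illustration only.
type Key = Vec<u8>;
type Eventuality = String;

// Simplified in-memory store, mirroring the per-key read of
// `eventualities` and write of `set_eventualities` in the patch.
#[derive(Default)]
struct EventualityStore(HashMap<Key, Vec<Eventuality>>);

impl EventualityStore {
  // Merge every new Eventuality into the existing collection for its key,
  // the same loop the patch factors out into `intake_eventualities`.
  fn intake(&mut self, to_intake: HashMap<Key, Vec<Eventuality>>) {
    for (key, new_eventualities) in to_intake {
      let eventualities = self.0.entry(key).or_default();
      eventualities.extend(new_eventualities);
    }
  }
}

fn main() {
  let mut store = EventualityStore::default();
  // Two call sites (here, two calls) share one intake path instead of
  // duplicating the merge loop at each site.
  store.intake(HashMap::from([(vec![0u8], vec!["eventuality-a".to_string()])]));
  store.intake(HashMap::from([(vec![0u8], vec!["eventuality-b".to_string()])]));
  assert_eq!(store.0[&vec![0u8]].len(), 2);
}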