From fdfe520f9da1fc189083a0fc9372a5c8583dd931 Mon Sep 17 00:00:00 2001
From: Luke Parker <lukeparker5132@gmail.com>
Date: Wed, 28 Aug 2024 19:00:02 -0400
Subject: [PATCH] Add ScanDb

---
 processor/scanner/src/db.rs              | 50 +++-----------------
 processor/scanner/src/eventuality/mod.rs |  5 +-
 processor/scanner/src/report.rs          |  6 ++-
 processor/scanner/src/scan/db.rs         | 59 ++++++++++++++++++++++++
 processor/scanner/src/scan/mod.rs        | 46 +++++++++++++++---
 5 files changed, 113 insertions(+), 53 deletions(-)
 create mode 100644 processor/scanner/src/scan/db.rs

diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index a37e05f4..e3e31c38 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -9,7 +9,10 @@ use serai_in_instructions_primitives::InInstructionWithBalance;
 
 use primitives::{ReceivedOutput, EncodableG};
 
-use crate::{lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return};
+use crate::{
+  lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return,
+  scan::next_to_scan_for_outputs_block,
+};
 
 // The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
 trait Borshy: BorshSerialize + BorshDeserialize {}
@@ -35,7 +38,7 @@ pub(crate) struct OutputWithInInstruction<S: ScannerFeed> {
 }
 
 impl<S: ScannerFeed> OutputWithInInstruction<S> {
-  fn write(&self, writer: &mut impl io::Write) -> io::Result<()> {
+  pub(crate) fn write(&self, writer: &mut impl io::Write) -> io::Result<()> {
     self.output.write(writer)?;
     // TODO self.return_address.write(writer)?;
     self.in_instruction.encode_to(writer);
@@ -48,8 +51,6 @@ create_db!(
     ActiveKeys: <K: Borshy>() -> Vec<SeraiKeyDbEntry<K>>,
     RetireAt: <K: Encode>(key: K) -> u64,
 
-    // The next block to scan for received outputs
-    NextToScanForOutputsBlock: () -> u64,
     // The next block to potentially report
     NextToPotentiallyReportBlock: () -> u64,
     // Highest acknowledged block
@@ -74,9 +75,6 @@ create_db!(
     */
     // This collapses from `bool` to `()`, using if the value was set for true and false otherwise
     NotableBlock: (number: u64) -> (),
-
-    SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
-    SerializedOutputs: (block_number: u64) -> Vec<u8>,
   }
 );
 
@@ -127,7 +125,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
     let Some(key) = keys.first() else { return };
 
     // Get the block we're scanning for next
-    let block_number = Self::next_to_scan_for_outputs_block(txn).expect(
+    let block_number = next_to_scan_for_outputs_block::<S>(txn).expect(
       "tidying keys despite never setting the next to scan for block (done on initialization)",
     );
     // If this key is scheduled for retiry...
@@ -150,7 +148,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
   ) -> Option<Vec<SeraiKey<KeyFor<S>>>> {
     // We don't take this as an argument as we don't keep all historical keys in memory
     // If we've scanned block 1,000,000, we can't answer the active keys as of block 0
-    let block_number = Self::next_to_scan_for_outputs_block(getter)?;
+    let block_number = next_to_scan_for_outputs_block::<S>(getter)?;
 
     let raw_keys: Vec<SeraiKeyDbEntry<EncodableG<KeyFor<S>>>> = ActiveKeys::get(getter)?;
     let mut keys = Vec::with_capacity(2);
@@ -183,25 +181,9 @@ impl<S: ScannerFeed> ScannerDb<S> {
   }
 
   pub(crate) fn set_start_block(txn: &mut impl DbTxn, start_block: u64, id: [u8; 32]) {
-    assert!(
-      NextToScanForOutputsBlock::get(txn).is_none(),
-      "setting start block but prior set start block"
-    );
-
-    NextToScanForOutputsBlock::set(txn, &start_block);
     NextToPotentiallyReportBlock::set(txn, &start_block);
   }
 
-  pub(crate) fn set_next_to_scan_for_outputs_block(
-    txn: &mut impl DbTxn,
-    next_to_scan_for_outputs_block: u64,
-  ) {
-    NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block);
-  }
-  pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option<u64> {
-    NextToScanForOutputsBlock::get(getter)
-  }
-
   pub(crate) fn set_next_to_potentially_report_block(
     txn: &mut impl DbTxn,
     next_to_potentially_report_block: u64,
@@ -222,24 +204,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
     HighestAcknowledgedBlock::get(getter)
   }
 
-  pub(crate) fn take_queued_outputs(
-    txn: &mut impl DbTxn,
-    block_number: u64,
-  ) -> Vec<OutputWithInInstruction<S>> {
-    todo!("TODO")
-  }
-
-  pub(crate) fn queue_output_until_block(
-    txn: &mut impl DbTxn,
-    queue_for_block: u64,
-    output: &OutputWithInInstruction<S>,
-  ) {
-    let mut outputs =
-      SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128));
-    output.write(&mut outputs).unwrap();
-    SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
-  }
-
   /*
     This is so verbosely named as the DB itself already flags upon external outputs. Specifically,
     if any block yields External outputs to accumulate, we flag it as notable.
diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs
index f682bf36..a29e5301 100644
--- a/processor/scanner/src/eventuality/mod.rs
+++ b/processor/scanner/src/eventuality/mod.rs
@@ -9,6 +9,7 @@ use crate::{
   lifetime::LifetimeStage,
   db::{OutputWithInInstruction, ReceiverScanData, ScannerDb, ScanToEventualityDb},
   BlockExt, ScannerFeed, KeyFor, SchedulerUpdate, Scheduler, sort_outputs,
+  scan::{next_to_scan_for_outputs_block, queue_output_until_block},
 };
 
 mod db;
@@ -104,7 +105,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
     */
     let exclusive_upper_bound = {
       // Fetch the next to scan block
-      let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db)
+      let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
         .expect("EventualityTask run before writing the start block");
       // If we haven't done any work, return
       if next_to_scan == 0 {
@@ -229,7 +230,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
               &txn, &forwarded,
             )
             .expect("forwarded an output yet didn't save its InInstruction to the DB");
-          ScannerDb::<S>::queue_output_until_block(
+          queue_output_until_block::<S>(
             &mut txn,
             b + S::WINDOW_LENGTH,
             &OutputWithInInstruction { output: output.clone(), return_address, in_instruction },
diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs
index 39a72106..f69459f0 100644
--- a/processor/scanner/src/report.rs
+++ b/processor/scanner/src/report.rs
@@ -7,7 +7,9 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
 // TODO: Localize to Report?
 use crate::{
   db::{ScannerDb, ScanToReportDb},
-  index, ScannerFeed, ContinuallyRan,
+  index,
+  scan::next_to_scan_for_outputs_block,
+  ScannerFeed, ContinuallyRan,
 };
 
 /*
@@ -27,7 +29,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
     let highest_reportable = {
       // Fetch the next to scan block
-      let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db)
+      let next_to_scan = next_to_scan_for_outputs_block::<S>(&self.db)
         .expect("ReportTask run before writing the start block");
       // If we haven't done any work, return
       if next_to_scan == 0 {
diff --git a/processor/scanner/src/scan/db.rs b/processor/scanner/src/scan/db.rs
new file mode 100644
index 00000000..905e10be
--- /dev/null
+++ b/processor/scanner/src/scan/db.rs
@@ -0,0 +1,59 @@
+use core::marker::PhantomData;
+use std::io;
+
+use scale::Encode;
+use borsh::{BorshSerialize, BorshDeserialize};
+use serai_db::{Get, DbTxn, create_db};
+
+use serai_in_instructions_primitives::InInstructionWithBalance;
+
+use primitives::{EncodableG, ReceivedOutput, EventualityTracker};
+
+use crate::{
+  lifetime::LifetimeStage, db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor,
+  EventualityFor, Return, scan::next_to_scan_for_outputs_block,
+};
+
+// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this.
+trait Borshy: BorshSerialize + BorshDeserialize {}
+impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
+
+create_db!(
+  ScannerScan {
+    // The next block to scan for received outputs
+    NextToScanForOutputsBlock: () -> u64,
+
+    SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
+  }
+);
+
+pub(crate) struct ScanDb<S: ScannerFeed>(PhantomData<S>);
+impl<S: ScannerFeed> ScanDb<S> {
+  pub(crate) fn set_next_to_scan_for_outputs_block(
+    txn: &mut impl DbTxn,
+    next_to_scan_for_outputs_block: u64,
+  ) {
+    NextToScanForOutputsBlock::set(txn, &next_to_scan_for_outputs_block);
+  }
+  pub(crate) fn next_to_scan_for_outputs_block(getter: &impl Get) -> Option<u64> {
+    NextToScanForOutputsBlock::get(getter)
+  }
+
+  pub(crate) fn take_queued_outputs(
+    txn: &mut impl DbTxn,
+    block_number: u64,
+  ) -> Vec<OutputWithInInstruction<S>> {
+    todo!("TODO")
+  }
+
+  pub(crate) fn queue_output_until_block(
+    txn: &mut impl DbTxn,
+    queue_for_block: u64,
+    output: &OutputWithInInstruction<S>,
+  ) {
+    let mut outputs =
+      SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128));
+    output.write(&mut outputs).unwrap();
+    SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
+  }
+}
diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs
index 201f64a1..1f143809 100644
--- a/processor/scanner/src/scan/mod.rs
+++ b/processor/scanner/src/scan/mod.rs
@@ -1,5 +1,5 @@
 use scale::Decode;
-use serai_db::{DbTxn, Db};
+use serai_db::{Get, DbTxn, Db};
 
 use serai_primitives::MAX_DATA_LEN;
 use serai_in_instructions_primitives::{
@@ -16,6 +16,27 @@ use crate::{
   eventuality::latest_scannable_block,
 };
 
+mod db;
+use db::ScanDb;
+
+pub(crate) fn next_to_scan_for_outputs_block<S: ScannerFeed>(getter: &impl Get) -> Option<u64> {
+  ScanDb::<S>::next_to_scan_for_outputs_block(getter)
+}
+
+pub(crate) fn queue_output_until_block<S: ScannerFeed>(
+  txn: &mut impl DbTxn,
+  queue_for_block: u64,
+  output: &OutputWithInInstruction<S>,
+) {
+  assert!(
+    queue_for_block >=
+      next_to_scan_for_outputs_block::<S>(txn)
+        .expect("queueing an output despite no next-to-scan-for-outputs block"),
+    "queueing an output for a block already scanned"
+  );
+  ScanDb::<S>::queue_output_until_block(txn, queue_for_block, output)
+}
+
 // Construct an InInstruction from an external output.
 //
 // Also returns the address to return the coins to upon error.
@@ -66,6 +87,19 @@ struct ScanForOutputsTask<D: Db, S: ScannerFeed> {
   feed: S,
 }
 
+impl<D: Db, S: ScannerFeed> ScanForOutputsTask<D, S> {
+  pub(crate) fn new(mut db: D, feed: S, start_block: u64) -> Self {
+    if ScanDb::<S>::next_to_scan_for_outputs_block(&db).is_none() {
+      // Initialize the DB
+      let mut txn = db.txn();
+      ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, start_block);
+      txn.commit();
+    }
+
+    Self { db, feed }
+  }
+}
+
 #[async_trait::async_trait]
 impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
   async fn run_iteration(&mut self) -> Result<bool, String> {
@@ -73,7 +107,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
     let latest_scannable = latest_scannable_block::<S>(&self.db)
       .expect("ScanForOutputsTask run before writing the start block");
     // Fetch the next block to scan
-    let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db)
+    let next_to_scan = ScanDb::<S>::next_to_scan_for_outputs_block(&self.db)
       .expect("ScanForOutputsTask run before writing the start block");
 
     for b in next_to_scan ..= latest_scannable {
@@ -83,7 +117,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
 
       let mut txn = self.db.txn();
 
-      assert_eq!(ScannerDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b);
+      assert_eq!(ScanDb::<S>::next_to_scan_for_outputs_block(&txn).unwrap(), b);
 
       // Tidy the keys, then fetch them
       // We don't have to tidy them here, we just have to somewhere, so why not here?
@@ -100,7 +134,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
       let mut in_instructions = vec![];
 
       let queued_outputs = {
-        let mut queued_outputs = ScannerDb::<S>::take_queued_outputs(&mut txn, b);
+        let mut queued_outputs = ScanDb::<S>::take_queued_outputs(&mut txn, b);
         // Sort the queued outputs in case they weren't queued in a deterministic fashion
         queued_outputs.sort_by(|a, b| sort_outputs(&a.output, &b.output));
         queued_outputs
@@ -217,7 +251,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
             // This multisig isn't yet reporting its External outputs to avoid a DoS
             // Queue the output to be reported when this multisig starts reporting
             LifetimeStage::ActiveYetNotReporting => {
-              ScannerDb::<S>::queue_output_until_block(
+              ScanDb::<S>::queue_output_until_block(
                 &mut txn,
                 key.block_at_which_reporting_starts,
                 &output_with_in_instruction,
@@ -253,7 +287,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
       // Send the InInstructions to the report task
       ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
       // Update the next to scan block
-      ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
+      ScanDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
       txn.commit();
     }