From 04a971a024074d24ac2c54aa864a609849743d6d Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 28 Aug 2024 23:31:31 -0400 Subject: [PATCH] Fill in various DB functions --- processor/primitives/src/eventuality.rs | 19 ++++++++++++++--- processor/primitives/src/output.rs | 7 +++++- processor/scanner/src/db.rs | 27 +++++++++++++++++++++--- processor/scanner/src/eventuality/db.rs | 26 +++++++++++++++-------- processor/scanner/src/eventuality/mod.rs | 2 +- processor/scanner/src/report/db.rs | 6 +++++- processor/scanner/src/scan/db.rs | 9 +++++++- 7 files changed, 77 insertions(+), 19 deletions(-) diff --git a/processor/primitives/src/eventuality.rs b/processor/primitives/src/eventuality.rs index 7203031b..eb6cda9c 100644 --- a/processor/primitives/src/eventuality.rs +++ b/processor/primitives/src/eventuality.rs @@ -23,9 +23,9 @@ pub trait Eventuality: Sized + Send + Sync { fn forwarded_output(&self) -> Option; /// Read an Eventuality. - fn read(reader: &mut R) -> io::Result; - /// Serialize an Eventuality to a `Vec`. - fn serialize(&self) -> Vec; + fn read(reader: &mut impl io::Read) -> io::Result; + /// Write an Eventuality. + fn write(&self, writer: &mut impl io::Write) -> io::Result<()>; } /// A tracker of unresolved Eventualities. @@ -36,3 +36,16 @@ pub struct EventualityTracker { /// These are keyed by their lookups. pub active_eventualities: HashMap, E>, } + +impl Default for EventualityTracker { + fn default() -> Self { + EventualityTracker { active_eventualities: HashMap::new() } + } +} + +impl EventualityTracker { + /// Insert an Eventuality into the tracker. + pub fn insert(&mut self, eventuality: E) { + self.active_eventualities.insert(eventuality.lookup(), eventuality); + } +} diff --git a/processor/primitives/src/output.rs b/processor/primitives/src/output.rs index 152a59e0..777b2c52 100644 --- a/processor/primitives/src/output.rs +++ b/processor/primitives/src/output.rs @@ -8,7 +8,12 @@ use serai_primitives::{ExternalAddress, Balance}; use crate::Id; /// An address on the external network. -pub trait Address: Send + Sync + TryFrom {} +pub trait Address: Send + Sync + TryFrom { + /// Write this address. + fn write(&self, writer: &mut impl io::Write) -> io::Result<()>; + /// Read an address. + fn read(reader: &mut impl io::Read) -> io::Result; +} /// The type of the output. #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 59af768f..810859a6 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -1,13 +1,13 @@ use core::marker::PhantomData; use std::io; -use scale::Encode; +use scale::{Encode, Decode, IoReader}; use borsh::{BorshSerialize, BorshDeserialize}; use serai_db::{Get, DbTxn, create_db, db_channel}; use serai_in_instructions_primitives::InInstructionWithBalance; -use primitives::{EncodableG, ReceivedOutput}; +use primitives::{EncodableG, Address, ReceivedOutput}; use crate::{ lifetime::LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, Return, @@ -38,9 +38,30 @@ pub(crate) struct OutputWithInInstruction { } impl OutputWithInInstruction { + pub(crate) fn read(reader: &mut impl io::Read) -> io::Result { + let output = OutputFor::::read(reader)?; + let return_address = { + let mut opt = [0xff]; + reader.read_exact(&mut opt)?; + assert!((opt[0] == 0) || (opt[0] == 1)); + if opt[0] == 0 { + None + } else { + Some(AddressFor::::read(reader)?) + } + }; + let in_instruction = + InInstructionWithBalance::decode(&mut IoReader(reader)).map_err(io::Error::other)?; + Ok(Self { output, return_address, in_instruction }) + } pub(crate) fn write(&self, writer: &mut impl io::Write) -> io::Result<()> { self.output.write(writer)?; - // TODO self.return_address.write(writer)?; + if let Some(return_address) = &self.return_address { + writer.write_all(&[1])?; + return_address.write(writer)?; + } else { + writer.write_all(&[0])?; + } self.in_instruction.encode_to(writer); Ok(()) } diff --git a/processor/scanner/src/eventuality/db.rs b/processor/scanner/src/eventuality/db.rs index baed33c4..c5a07b04 100644 --- a/processor/scanner/src/eventuality/db.rs +++ b/processor/scanner/src/eventuality/db.rs @@ -1,22 +1,18 @@ use core::marker::PhantomData; -use borsh::{BorshSerialize, BorshDeserialize}; +use scale::Encode; use serai_db::{Get, DbTxn, create_db}; -use primitives::EventualityTracker; +use primitives::{EncodableG, Eventuality, EventualityTracker}; use crate::{ScannerFeed, KeyFor, EventualityFor}; -// The DB macro doesn't support `BorshSerialize + BorshDeserialize` as a bound, hence this. -trait Borshy: BorshSerialize + BorshDeserialize {} -impl Borshy for T {} - create_db!( ScannerEventuality { // The next block to check for resolving eventualities NextToCheckForEventualitiesBlock: () -> u64, - SerializedEventualities: () -> Vec, + SerializedEventualities: (key: K) -> Vec, } ); @@ -41,13 +37,25 @@ impl EventualityDb { key: KeyFor, eventualities: &EventualityTracker>, ) { - todo!("TODO") + let mut serialized = Vec::with_capacity(eventualities.active_eventualities.len() * 128); + for eventuality in eventualities.active_eventualities.values() { + eventuality.write(&mut serialized).unwrap(); + } + SerializedEventualities::set(txn, EncodableG(key), &serialized); } pub(crate) fn eventualities( getter: &impl Get, key: KeyFor, ) -> EventualityTracker> { - todo!("TODO") + let serialized = SerializedEventualities::get(getter, EncodableG(key)).unwrap_or(vec![]); + let mut serialized = serialized.as_slice(); + + let mut res = EventualityTracker::default(); + while !serialized.is_empty() { + let eventuality = EventualityFor::::read(&mut serialized).unwrap(); + res.insert(eventuality); + } + res } } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index e10aab54..b5dc3dd9 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -263,7 +263,7 @@ impl> ContinuallyRan for EventualityTas let mut eventualities = EventualityDb::::eventualities(&txn, key); for new_eventuality in new_eventualities { - eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality); + eventualities.insert(new_eventuality); } EventualityDb::::set_eventualities(&mut txn, key, &eventualities); } diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/report/db.rs index 745aa772..2fd98d4b 100644 --- a/processor/scanner/src/report/db.rs +++ b/processor/scanner/src/report/db.rs @@ -4,6 +4,8 @@ create_db!( ScannerReport { // The next block to potentially report NextToPotentiallyReportBlock: () -> u64, + // The next Batch ID to use + NextBatchId: () -> u32, } ); @@ -20,6 +22,8 @@ impl ReportDb { } pub(crate) fn acquire_batch_id(txn: &mut impl DbTxn) -> u32 { - todo!("TODO") + let id = NextBatchId::get(txn).unwrap_or(0); + NextBatchId::set(txn, &(id + 1)); + id } } diff --git a/processor/scanner/src/scan/db.rs b/processor/scanner/src/scan/db.rs index 9b98150f..6df84df1 100644 --- a/processor/scanner/src/scan/db.rs +++ b/processor/scanner/src/scan/db.rs @@ -29,7 +29,14 @@ impl ScanDb { txn: &mut impl DbTxn, block_number: u64, ) -> Vec> { - todo!("TODO") + let serialized = SerializedQueuedOutputs::get(txn, block_number).unwrap_or(vec![]); + let mut serialized = serialized.as_slice(); + + let mut res = Vec::with_capacity(serialized.len() / 128); + while !serialized.is_empty() { + res.push(OutputWithInInstruction::::read(&mut serialized).unwrap()); + } + res } pub(crate) fn queue_output_until_block(