From 8ac501028de44ac968b99aea9c750226468fc7b6 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 29 Aug 2024 00:01:31 -0400 Subject: [PATCH] Add API to publish Batches with This doesn't have to be abstract, we can generate the message and use the message-queue API, yet this should help with testing. --- processor/scanner/src/lib.rs | 27 ++++++++++++++++++++++++--- processor/scanner/src/report/mod.rs | 22 +++++++++++++++------- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index d8a29951..3e828fcb 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -6,6 +6,7 @@ use group::GroupEncoding; use serai_db::{Get, DbTxn, Db}; use serai_primitives::{NetworkId, Coin, Amount}; +use serai_in_instructions_primitives::Batch; use primitives::{task::*, Address, ReceivedOutput, Block}; @@ -81,7 +82,7 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { /// An error encountered when fetching data from the blockchain. /// /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually - /// resolve without manual intervention. + /// resolve without manual intervention/changing the arguments. type EphemeralError: Debug; /// Fetch the number of the latest finalized block. @@ -156,6 +157,20 @@ type AddressFor = <::Block as Block>::Address; type OutputFor = <::Block as Block>::Output; type EventualityFor = <::Block as Block>::Eventuality; +#[async_trait::async_trait] +pub trait BatchPublisher: 'static + Send + Sync { + /// An error encountered when publishing the Batch. + /// + /// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual + /// intervention/changing the arguments. + type EphemeralError: Debug; + + /// Publish a Batch. + /// + /// This function must be safe to call with the same Batch multiple times. + async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>; +} + /// A return to occur. pub struct Return { address: AddressFor, @@ -193,10 +208,16 @@ impl Scanner { /// /// This will begin its execution, spawning several asynchronous tasks. // TODO: Take start_time and binary search here? - pub async fn new(db: impl Db, feed: S, scheduler: impl Scheduler, start_block: u64) -> Self { + pub async fn new( + db: impl Db, + feed: S, + batch_publisher: impl BatchPublisher, + scheduler: impl Scheduler, + start_block: u64, + ) -> Self { let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); - let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block); + let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block); let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block); let (_index_handle, index_run) = RunNowHandle::new(); diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index 18f842e2..b789ea58 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -6,11 +6,12 @@ use serai_db::{DbTxn, Db}; use serai_primitives::BlockHash; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; +use primitives::task::ContinuallyRan; use crate::{ db::{ScannerGlobalDb, ScanToReportDb}, index, scan::next_to_scan_for_outputs_block, - ScannerFeed, ContinuallyRan, + ScannerFeed, BatchPublisher, }; mod db; @@ -24,13 +25,14 @@ use db::ReportDb; the InInstructions for it. */ #[allow(non_snake_case)] -pub(crate) struct ReportTask { +pub(crate) struct ReportTask { db: D, + batch_publisher: B, _S: PhantomData, } -impl ReportTask { - pub(crate) fn new(mut db: D, start_block: u64) -> Self { +impl ReportTask { + pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self { if ReportDb::next_to_potentially_report_block(&db).is_none() { // Initialize the DB let mut txn = db.txn(); @@ -38,12 +40,12 @@ impl ReportTask { txn.commit(); } - Self { db, _S: PhantomData } + Self { db, batch_publisher, _S: PhantomData } } } #[async_trait::async_trait] -impl ContinuallyRan for ReportTask { +impl ContinuallyRan for ReportTask { async fn run_iteration(&mut self) -> Result { let highest_reportable = { // Fetch the next to scan block @@ -107,7 +109,13 @@ impl ContinuallyRan for ReportTask { } } - todo!("TODO: Set/emit batches"); + for batch in batches { + self + .batch_publisher + .publish_batch(batch) + .await + .map_err(|e| format!("failed to publish batch: {e:?}"))?; + } } // Update the next to potentially report block