Add API to publish Batches with

This doesn't have to be abstract, we can generate the message and use the
message-queue API, yet this should help with testing.
This commit is contained in:
Luke Parker 2024-08-29 00:01:31 -04:00
parent 612c67c537
commit 8ac501028d
2 changed files with 39 additions and 10 deletions

View file

@ -6,6 +6,7 @@ use group::GroupEncoding;
use serai_db::{Get, DbTxn, Db}; use serai_db::{Get, DbTxn, Db};
use serai_primitives::{NetworkId, Coin, Amount}; use serai_primitives::{NetworkId, Coin, Amount};
use serai_in_instructions_primitives::Batch;
use primitives::{task::*, Address, ReceivedOutput, Block}; use primitives::{task::*, Address, ReceivedOutput, Block};
@ -81,7 +82,7 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone {
/// An error encountered when fetching data from the blockchain. /// An error encountered when fetching data from the blockchain.
/// ///
/// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually
/// resolve without manual intervention. /// resolve without manual intervention/changing the arguments.
type EphemeralError: Debug; type EphemeralError: Debug;
/// Fetch the number of the latest finalized block. /// Fetch the number of the latest finalized block.
@ -156,6 +157,20 @@ type AddressFor<S> = <<S as ScannerFeed>::Block as Block>::Address;
type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output; type OutputFor<S> = <<S as ScannerFeed>::Block as Block>::Output;
type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality; type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
#[async_trait::async_trait]
pub trait BatchPublisher: 'static + Send + Sync {
/// An error encountered when publishing the Batch.
///
/// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual
/// intervention/changing the arguments.
type EphemeralError: Debug;
/// Publish a Batch.
///
/// This function must be safe to call with the same Batch multiple times.
async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>;
}
/// A return to occur. /// A return to occur.
pub struct Return<S: ScannerFeed> { pub struct Return<S: ScannerFeed> {
address: AddressFor<S>, address: AddressFor<S>,
@ -193,10 +208,16 @@ impl<S: ScannerFeed> Scanner<S> {
/// ///
/// This will begin its execution, spawning several asynchronous tasks. /// This will begin its execution, spawning several asynchronous tasks.
// TODO: Take start_time and binary search here? // TODO: Take start_time and binary search here?
pub async fn new(db: impl Db, feed: S, scheduler: impl Scheduler<S>, start_block: u64) -> Self { pub async fn new(
db: impl Db,
feed: S,
batch_publisher: impl BatchPublisher,
scheduler: impl Scheduler<S>,
start_block: u64,
) -> Self {
let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block); let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block);
let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block); let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block);
let (_index_handle, index_run) = RunNowHandle::new(); let (_index_handle, index_run) = RunNowHandle::new();

View file

@ -6,11 +6,12 @@ use serai_db::{DbTxn, Db};
use serai_primitives::BlockHash; use serai_primitives::BlockHash;
use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::task::ContinuallyRan;
use crate::{ use crate::{
db::{ScannerGlobalDb, ScanToReportDb}, db::{ScannerGlobalDb, ScanToReportDb},
index, index,
scan::next_to_scan_for_outputs_block, scan::next_to_scan_for_outputs_block,
ScannerFeed, ContinuallyRan, ScannerFeed, BatchPublisher,
}; };
mod db; mod db;
@ -24,13 +25,14 @@ use db::ReportDb;
the InInstructions for it. the InInstructions for it.
*/ */
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub(crate) struct ReportTask<D: Db, S: ScannerFeed> { pub(crate) struct ReportTask<D: Db, S: ScannerFeed, B: BatchPublisher> {
db: D, db: D,
batch_publisher: B,
_S: PhantomData<S>, _S: PhantomData<S>,
} }
impl<D: Db, S: ScannerFeed> ReportTask<D, S> { impl<D: Db, S: ScannerFeed, B: BatchPublisher> ReportTask<D, S, B> {
pub(crate) fn new(mut db: D, start_block: u64) -> Self { pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self {
if ReportDb::next_to_potentially_report_block(&db).is_none() { if ReportDb::next_to_potentially_report_block(&db).is_none() {
// Initialize the DB // Initialize the DB
let mut txn = db.txn(); let mut txn = db.txn();
@ -38,12 +40,12 @@ impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
txn.commit(); txn.commit();
} }
Self { db, _S: PhantomData } Self { db, batch_publisher, _S: PhantomData }
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> { impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D, S, B> {
async fn run_iteration(&mut self) -> Result<bool, String> { async fn run_iteration(&mut self) -> Result<bool, String> {
let highest_reportable = { let highest_reportable = {
// Fetch the next to scan block // Fetch the next to scan block
@ -107,7 +109,13 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
} }
} }
todo!("TODO: Set/emit batches"); for batch in batches {
self
.batch_publisher
.publish_batch(batch)
.await
.map_err(|e| format!("failed to publish batch: {e:?}"))?;
}
} }
// Update the next to potentially report block // Update the next to potentially report block