From 2bddf002226d0e92d3abc6fa39eca1bab177a10d Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 27 Aug 2024 00:44:11 -0400 Subject: [PATCH] Don't expose IndexDb throughout the crate --- processor/primitives/src/task.rs | 2 +- processor/scanner/src/db.rs | 3 --- processor/scanner/src/index/mod.rs | 40 ++++++++++++++++++++++++++++-- processor/scanner/src/lib.rs | 3 +-- processor/scanner/src/report.rs | 5 ++-- 5 files changed, 42 insertions(+), 11 deletions(-) diff --git a/processor/primitives/src/task.rs b/processor/primitives/src/task.rs index a7d6153c..94a576a0 100644 --- a/processor/primitives/src/task.rs +++ b/processor/primitives/src/task.rs @@ -78,7 +78,7 @@ pub trait ContinuallyRan: Sized { } } Err(e) => { - log::debug!("{}", e); + log::warn!("{}", e); increase_sleep_before_next_task(&mut current_sleep_before_next_task); } } diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 18511222..42086681 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -143,9 +143,6 @@ impl ScannerDb { "setting start block but prior set start block" ); - crate::index::IndexDb::set_block(txn, start_block, id); - crate::index::IndexDb::set_latest_finalized_block(txn, start_block); - NextToScanForOutputsBlock::set(txn, &start_block); // We can receive outputs in this block, but any descending transactions will be in the next // block. This, with the check on-set, creates a bound that this value in the DB is non-zero. diff --git a/processor/scanner/src/index/mod.rs b/processor/scanner/src/index/mod.rs index 07801650..7c70eedc 100644 --- a/processor/scanner/src/index/mod.rs +++ b/processor/scanner/src/index/mod.rs @@ -1,11 +1,17 @@ -use serai_db::{DbTxn, Db}; +use serai_db::{Get, DbTxn, Db}; use primitives::{task::ContinuallyRan, BlockHeader}; use crate::ScannerFeed; mod db; -pub(crate) use db::IndexDb; +use db::IndexDb; + +/// Panics if an unindexed block's ID is requested. +pub(crate) fn block_id(getter: &impl Get, block_number: u64) -> [u8; 32] { + IndexDb::block_id(getter, block_number) + .unwrap_or_else(|| panic!("requested block ID for unindexed block {block_number}")) +} /* This processor should build its own index of the blockchain, yet only for finalized blocks which @@ -20,6 +26,36 @@ struct IndexFinalizedTask { feed: S, } +impl IndexFinalizedTask { + pub(crate) async fn new(mut db: D, feed: S, start_block: u64) -> Self { + if IndexDb::block_id(&db, start_block).is_none() { + // Fetch the block for its ID + let block = { + let mut delay = Self::DELAY_BETWEEN_ITERATIONS; + loop { + match feed.unchecked_block_header_by_number(start_block).await { + Ok(block) => break block, + Err(e) => { + log::warn!("IndexFinalizedTask couldn't fetch start block {start_block}: {e:?}"); + tokio::time::sleep(core::time::Duration::from_secs(delay)).await; + delay += Self::DELAY_BETWEEN_ITERATIONS; + delay = delay.min(Self::MAX_DELAY_BETWEEN_ITERATIONS); + } + }; + } + }; + + // Initialize the DB + let mut txn = db.txn(); + IndexDb::set_block(&mut txn, start_block, block.id()); + IndexDb::set_latest_finalized_block(&mut txn, start_block); + txn.commit(); + } + + Self { db, feed } + } +} + #[async_trait::async_trait] impl ContinuallyRan for IndexFinalizedTask { async fn run_iteration(&mut self) -> Result { diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index fb6599b7..a29f1069 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -113,8 +113,7 @@ pub trait ScannerFeed: Send + Sync { // Check the ID of this block is the expected ID { - let expected = crate::index::IndexDb::block_id(getter, number) - .expect("requested a block which wasn't indexed"); + let expected = crate::index::block_id(getter, number); if block.id() != expected { panic!( "finalized chain reorganized from {} to {} at {}", diff --git a/processor/scanner/src/report.rs b/processor/scanner/src/report.rs index 8f37d7a6..f2caf692 100644 --- a/processor/scanner/src/report.rs +++ b/processor/scanner/src/report.rs @@ -7,7 +7,7 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use primitives::ReceivedOutput; // TODO: Localize to ReportDb? -use crate::{db::ScannerDb, index::IndexDb, ScannerFeed, ContinuallyRan}; +use crate::{db::ScannerDb, index, ScannerFeed, ContinuallyRan}; /* This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. @@ -65,8 +65,7 @@ impl ContinuallyRan for ReportTask { }; let network = S::NETWORK; - let block_hash = - IndexDb::block_id(&txn, b).expect("reporting block we didn't save the ID for"); + let block_hash = index::block_id(&txn, b); let mut batch_id = ScannerDb::::acquire_batch_id(&mut txn); // start with empty batch