Flesh out new scanner a bit more

Adds the task to mark blocks safe to scan, and outlines the task to report
blocks.
This commit is contained in:
Luke Parker 2024-08-20 16:24:18 -04:00
parent 8763ef23ed
commit a2717d73f0
7 changed files with 259 additions and 32 deletions

View file

@ -1,11 +1,9 @@
use core::marker::PhantomData;
use group::GroupEncoding;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db};
use primitives::{Id, Block, BorshG};
use primitives::{Id, ReceivedOutput, Block, BorshG};
use crate::ScannerFeed;
@ -14,7 +12,7 @@ trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
#[derive(BorshSerialize, BorshDeserialize)]
struct SeraiKey<K: Borshy> {
pub(crate) struct SeraiKey<K: Borshy> {
activation_block_number: u64,
retirement_block_number: Option<u64>,
key: K,
@ -35,6 +33,10 @@ create_db!(
NextToScanForOutputsBlock: () -> u64,
// The next block to check for resolving eventualities
NextToCheckForEventualitiesBlock: () -> u64,
// The next block to potentially report
NextToPotentiallyReportBlock: () -> u64,
// The highest acknowledged block
HighestAcknowledgedBlock: () -> u64,
// If a block was notable
/*
@ -55,6 +57,8 @@ create_db!(
*/
// This collapses from `bool` to `()`, using if the value was set for true and false otherwise
NotableBlock: (number: u64) -> (),
SerializedOutputs: (block_number: u64) -> Vec<u8>,
}
);
@ -74,6 +78,10 @@ impl<S: ScannerFeed> ScannerDb<S> {
// activation_block_number is inclusive, so the key will be scanned for starting at the specified
// block
pub(crate) fn queue_key(txn: &mut impl DbTxn, activation_block_number: u64, key: S::Key) {
// Set this block as notable
NotableBlock::set(txn, activation_block_number, &());
// Push the key
let mut keys: Vec<SeraiKey<BorshG<S::Key>>> = ActiveKeys::get(txn).unwrap_or(vec![]);
for key_i in &keys {
if key == key_i.key.0 {
@ -124,6 +132,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
LatestScannableBlock::set(txn, &start_block);
NextToScanForOutputsBlock::set(txn, &start_block);
NextToCheckForEventualitiesBlock::set(txn, &start_block);
NextToPotentiallyReportBlock::set(txn, &start_block);
}
pub(crate) fn set_latest_finalized_block(txn: &mut impl DbTxn, latest_finalized_block: u64) {
@ -159,4 +168,47 @@ impl<S: ScannerFeed> ScannerDb<S> {
pub(crate) fn next_to_check_for_eventualities_block(getter: &impl Get) -> Option<u64> {
NextToCheckForEventualitiesBlock::get(getter)
}
pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn,
next_to_potentially_report_block: u64,
) {
NextToPotentiallyReportBlock::set(txn, &next_to_potentially_report_block);
}
pub(crate) fn next_to_potentially_report_block(getter: &impl Get) -> Option<u64> {
NextToPotentiallyReportBlock::get(getter)
}
pub(crate) fn set_highest_acknowledged_block(
txn: &mut impl DbTxn,
highest_acknowledged_block: u64,
) {
HighestAcknowledgedBlock::set(txn, &highest_acknowledged_block);
}
pub(crate) fn highest_acknowledged_block(getter: &impl Get) -> Option<u64> {
HighestAcknowledgedBlock::get(getter)
}
pub(crate) fn set_outputs(
txn: &mut impl DbTxn,
block_number: u64,
outputs: Vec<impl ReceivedOutput<S::Key, S::Address>>,
) {
if outputs.is_empty() {
return;
}
// Set this block as notable
NotableBlock::set(txn, block_number, &());
let mut buf = Vec::with_capacity(outputs.len() * 128);
for output in outputs {
output.write(&mut buf).unwrap();
}
SerializedOutputs::set(txn, block_number, &buf);
}
pub(crate) fn is_notable_block(getter: &impl Get, number: u64) -> bool {
NotableBlock::get(getter, number).is_some()
}
}

View file

@ -0,0 +1 @@
// TODO

View file

@ -20,7 +20,7 @@ struct IndexFinalizedTask<D: Db, S: ScannerFeed> {
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexFinalizedTask<D, S> {
async fn run_instance(&mut self) -> Result<(), String> {
async fn run_iteration(&mut self) -> Result<bool, String> {
// Fetch the latest finalized block
let our_latest_finalized = ScannerDb::<S>::latest_finalized_block(&self.db)
.expect("IndexTask run before writing the start block");
@ -29,6 +29,18 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexFinalizedTask<D, S> {
Err(e) => Err(format!("couldn't fetch the latest finalized block number: {e:?}"))?,
};
if latest_finalized < our_latest_finalized {
// Explicitly log this as an error as returned ephemeral errors are logged with debug
// This doesn't panic as the node should sync along our indexed chain, and if it doesn't,
// we'll panic at that point in time
log::error!(
"node is out of sync, latest finalized {} is behind our indexed {}",
latest_finalized,
our_latest_finalized
);
Err("node is out of sync".to_string())?;
}
// Index the hashes of all blocks until the latest finalized block
for b in (our_latest_finalized + 1) ..= latest_finalized {
let block = match self.feed.block_by_number(b).await {
@ -57,16 +69,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexFinalizedTask<D, S> {
txn.commit();
}
Ok(())
// Have dependents run if we updated the latest finalized block
Ok(our_latest_finalized != latest_finalized)
}
}
/*
The processor can't index the blockchain unilaterally. It needs to develop a totally ordered view
of the blockchain. That requires consensus with other validators on when certain keys are set to
activate (and retire). We solve this by only scanning `n` blocks ahead of the last agreed upon
block, then waiting for Serai to acknowledge the block. This lets us safely schedule events after
this `n` block window (as demonstrated/proven with `mini`).
TODO
*/

View file

@ -1,4 +1,6 @@
use core::fmt::Debug;
use core::{fmt::Debug, time::Duration};
use tokio::sync::mpsc;
use primitives::{ReceivedOutput, Block};
@ -50,11 +52,50 @@ pub trait ScannerFeed: Send + Sync {
) -> Result<Self::Output, Self::EphemeralError>;
}
/// A handle to immediately run an iteration of a task.
#[derive(Clone)]
pub(crate) struct RunNowHandle(mpsc::Sender<()>);
/// An instruction recipient to immediately run an iteration of a task.
pub(crate) struct RunNowRecipient(mpsc::Receiver<()>);
impl RunNowHandle {
/// Create a new run-now handle to be assigned to a task.
pub(crate) fn new() -> (Self, RunNowRecipient) {
// Uses a capacity of 1 as any call to run as soon as possible satisfies all calls to run as
// soon as possible
let (send, recv) = mpsc::channel(1);
(Self(send), RunNowRecipient(recv))
}
/// Tell the task to run now (and not whenever its next iteration on a timer is).
///
/// Panics if the task has been dropped.
pub(crate) fn run_now(&self) {
#[allow(clippy::match_same_arms)]
match self.0.try_send(()) {
Ok(()) => {}
// NOP on full, as this task will already be ran as soon as possible
Err(mpsc::error::TrySendError::Full(())) => {}
Err(mpsc::error::TrySendError::Closed(())) => {
panic!("task was unexpectedly closed when calling run_now")
}
}
}
}
#[async_trait::async_trait]
pub(crate) trait ContinuallyRan: Sized {
async fn run_instance(&mut self) -> Result<(), String>;
/// Run an iteration of the task.
///
/// If this returns `true`, all dependents of the task will immediately have a new iteration ran
/// (without waiting for whatever timer they were already on).
async fn run_iteration(&mut self) -> Result<bool, String>;
async fn continually_run(mut self) {
/// Continually run the task.
///
/// This returns a channel which can have a message set to immediately trigger a new run of an
/// iteration.
async fn continually_run(mut self, mut run_now: RunNowRecipient, dependents: Vec<RunNowHandle>) {
// The default number of seconds to sleep before running the task again
let default_sleep_before_next_task = 5;
// The current number of seconds to sleep before running the task again
@ -67,10 +108,16 @@ pub(crate) trait ContinuallyRan: Sized {
};
loop {
match self.run_instance().await {
Ok(()) => {
match self.run_iteration().await {
Ok(run_dependents) => {
// Upon a successful (error-free) loop iteration, reset the amount of time we sleep
current_sleep_before_next_task = default_sleep_before_next_task;
if run_dependents {
for dependent in &dependents {
dependent.run_now();
}
}
}
Err(e) => {
log::debug!("{}", e);
@ -78,9 +125,11 @@ pub(crate) trait ContinuallyRan: Sized {
}
}
// Don't run the task again for another few seconds
// This is at the start of the loop so we can continue without skipping this delay
tokio::time::sleep(core::time::Duration::from_secs(current_sleep_before_next_task)).await;
// Don't run the task again for another few seconds UNLESS told to run now
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {},
msg = run_now.0.recv() => assert_eq!(msg, Some(()), "run now handle was dropped"),
}
}
}
}

View file

@ -0,0 +1,50 @@
/*
We only report blocks once both tasks, scanning for received ouputs and eventualities, have
processed the block. This ensures we've performed all ncessary options.
*/
use serai_db::{Db, DbTxn};
use primitives::{Id, Block};
// TODO: Localize to ReportDb?
use crate::{db::ScannerDb, ScannerFeed};
struct ReportTask<D: Db, S: ScannerFeed> {
db: D,
feed: S,
}
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> {
let highest_reportable = {
// Fetch the latest scanned and latest checked block
let next_to_scan = ScannerDb::<S>::next_to_scan_for_outputs_block(&self.db).expect("ReportTask run before writing the start block");
let next_to_check = ScannerDb::<S>::next_to_check_for_eventualities_block(&self.db).expect("ReportTask run before writing the start block");
// If we haven't done any work, return
if (next_to_scan == 0) || (next_to_check == 0) {
return Ok(false);
}
let last_scanned = next_to_scan - 1;
let last_checked = next_to_check - 1;
last_scanned.min(last_checked)
};
let next_to_potentially_report = ScannerDb::<S>::next_block_to_potentially_report(&self.db).expect("ReportTask run before writing the start block");
for b in next_to_potentially_report ..= highest_reportable {
if ScannerDb::<S>::is_block_notable(b) {
todo!("TODO: Make Batches, which requires handling Forwarded within this crate");
}
let mut txn = self.db.txn();
// Update the next to potentially report block
ScannerDb::<S>::set_next_to_potentially_report_block(&mut txn, b + 1);
txn.commit();
}
// Run dependents if we decided to report any blocks
Ok(next_to_potentially_report <= highest_reportable)
}
}

View file

@ -0,0 +1,73 @@
use core::marker::PhantomData;
use serai_db::{Db, DbTxn};
use primitives::{Id, Block};
// TODO: Localize to SafeDb?
use crate::{db::ScannerDb, ScannerFeed};
/*
We mark blocks safe to scan when they're no more than `(CONFIRMATIONS - 1)` blocks after the
oldest notable block still pending acknowledgement (creating a window of length `CONFIRMATIONS`
when including the block pending acknowledgement). This means that if all known notable blocks
have been acknowledged, and a stretch of non-notable blocks occurs, they'll automatically be
marked safe to scan (since they come before the next oldest notable block still pending
acknowledgement).
This design lets Serai safely schedule events `CONFIRMATIONS` blocks after the latest
acknowledged block. For an exhaustive proof of this, please see `mini`.
*/
struct SafeToScanTask<D: Db, S: ScannerFeed> {
db: D,
_S: PhantomData<S>,
}
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for SafeToScanTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> {
// First, we fetch the highest acknowledged block
let Some(highest_acknowledged_block) = ScannerDb::<S>::highest_acknowledged_block(&self.db) else {
// If no blocks have been acknowledged, we don't mark any safe
// Once the start block (implicitly safe) has been acknowledged, we proceed from there
return Ok(false);
};
let latest_block_known_if_pending_acknowledgement = {
// The next block to potentially report comes after all blocks we've decided to report or not
// If we've decided to report (or not report) a block, we know if it needs acknowledgement
// (and accordingly is pending acknowledgement)
// Accordingly, the block immediately before this is the latest block with a known status
ScannerDb::<S>::next_block_to_potentially_report(&self.db).expect("SafeToScanTask run before writing the start block") - 1
};
let mut oldest_pending_acknowledgement = None;
for b in (highest_acknowledged_block + 1) ..= latest_block_known_if_pending_acknowledgement {
// If the block isn't notable, immediately flag it as acknowledged
if !ScannerDb::<S>::is_block_notable(b) {
let mut txn = self.db.txn();
ScannerDb::<S>::set_highest_acknowledged_block(&mut txn, b);
txn.commit();
continue;
}
oldest_pending_acknowledgement = Some(b);
break;
}
// `oldest_pending_acknowledgement` is now the oldest block pending acknowledgement or `None`
// If it's `None`, then we were able to implicitly acknowledge all blocks within this span
// Since the safe block is `(CONFIRMATIONS - 1)` blocks after the oldest block still pending
// acknowledgement, and the oldest block still pending acknowledgement is in the future,
// we know the safe block to scan to is
// `>= latest_block_known_if_pending_acknowledgement + (CONFIRMATIONS - 1)`
let oldest_pending_acknowledgement = oldest_pending_acknowledgement.unwrap_or(latest_block_known_if_pending_acknowledgement);
// Update the latest scannable block
let mut txn = self.db.txn();
ScannerDb::<S>::set_latest_scannable_block(oldest_pending_acknowledgement + (CONFIRMATIONS - 1));
txn.commit();
Ok(next_to_potentially_report <= highest_reportable)
}
}

View file

@ -12,7 +12,7 @@ struct ScanForOutputsTask<D: Db, S: ScannerFeed> {
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
async fn run_instance(&mut self) -> Result<(), String> {
async fn run_iteration(&mut self) -> Result<bool, String> {
// Fetch the safe to scan block
let latest_scannable = ScannerDb::<S>::latest_scannable_block(&self.db).expect("ScanForOutputsTask run before writing the start block");
// Fetch the next block to scan
@ -43,6 +43,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
}
assert!(keys.len() <= 2);
let mut outputs = vec![];
// Scan for each key
for key in keys {
// If this key has yet to active, skip it
@ -50,7 +51,6 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
continue;
}
let mut outputs = vec![];
for output in network.scan_for_outputs(&block, key).awaits {
assert_eq!(output.key(), key);
// TODO: Check for dust
@ -59,15 +59,14 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
}
let mut txn = self.db.txn();
// Update the latest scanned block
// Save the outputs
ScannerDb::<S>::set_outputs(&mut txn, b, outputs);
// Update the next to scan block
ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);
// TODO: If this had outputs, yield them and mark this block notable
/*
A block is notable if it's an activation, had outputs, or a retirement block.
*/
txn.commit();
}
Ok(())
// Run dependents if we successfully scanned any blocks
Ok(next_to_scan <= latest_scannable)
}
}