Add helper methods

Has fetched blocks checked to be the indexed blocks. Has scanned outputs be
sorted, meaning they aren't subject to implicit order/may be non-deterministic
(such as if handled by a threadpool).
This commit is contained in:
Luke Parker 2024-08-26 23:15:19 -04:00
parent b65dbacd6a
commit 7e71840822
5 changed files with 63 additions and 50 deletions

View file

@ -54,7 +54,9 @@ pub trait Block: Send + Sync + Sized + Clone + Debug {
fn id(&self) -> [u8; 32];
/// Scan all outputs within this block to find the outputs spendable by this key.
fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output>;
///
/// No assumption on the order of the returned outputs is made.
fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec<Self::Output>;
/// Check if this block resolved any Eventualities.
///

View file

@ -5,13 +5,11 @@ use serai_db::{DbTxn, Db};
use primitives::{OutputType, ReceivedOutput, Block};
// TODO: Localize to EventualityDb?
use crate::{lifetime::LifetimeStage, db::ScannerDb, ScannerFeed, KeyFor, Scheduler, ContinuallyRan};
use crate::{
lifetime::LifetimeStage, db::ScannerDb, BlockExt, ScannerFeed, KeyFor, Scheduler, ContinuallyRan,
};
/*
Note: The following assumes there's some value, `CONFIRMATIONS`, and the finalized block we
operate on is `CONFIRMATIONS` blocks deep. This is true for Proof-of-Work chains yet not the API
actively used here.
When we scan a block, we receive outputs. When this block is acknowledged, we accumulate those
outputs into some scheduler, potentially causing certain transactions to begin their signing
protocol.
@ -112,25 +110,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
iterated = true;
// TODO: Add a helper to fetch an indexed block, de-duplicate with scan
let block = match self.feed.block_by_number(b).await {
Ok(block) => block,
Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?,
};
// Check the ID of this block is the expected ID
{
let expected =
ScannerDb::<S>::block_id(&self.db, b).expect("scannable block didn't have its ID saved");
if block.id() != expected {
panic!(
"finalized chain reorganized from {} to {} at {}",
hex::encode(expected),
hex::encode(block.id()),
b
);
}
}
let block = self.feed.block_by_number(b).await?;
log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
@ -171,7 +151,6 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
};
// Fetch all non-External outputs
// TODO: Have a scan_for_outputs_ext which sorts for us
let mut non_external_outputs = block.scan_for_outputs(key.key);
non_external_outputs.retain(|output| output.kind() != OutputType::External);
// Drop any outputs less than the dust limit
@ -210,6 +189,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
let outputs_to_return = ScannerDb::<S>::take_queued_returns(&mut txn, b);
// TODO: This also has to intake Burns
let new_eventualities =
self.scheduler.accumulate_outputs_and_return_outputs(&mut txn, outputs, outputs_to_return);
for (key, new_eventualities) in new_eventualities {
@ -220,7 +200,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
KeyFor::<S>::from_bytes(&key_repr).unwrap()
};
let mut eventualities = ScannerDb::<S>::eventualities(&txn, key.key);
let mut eventualities = ScannerDb::<S>::eventualities(&txn, key);
for new_eventuality in new_eventualities {
eventualities.active_eventualities.insert(new_eventuality.lookup(), new_eventuality);
}

View file

@ -43,7 +43,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for IndexFinalizedTask<D, S> {
// Index the hashes of all blocks until the latest finalized block
for b in (our_latest_finalized + 1) ..= latest_finalized {
let block = match self.feed.block_header_by_number(b).await {
let block = match self.feed.unchecked_block_header_by_number(b).await {
Ok(block) => block,
Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?,
};

View file

@ -23,6 +23,23 @@ mod eventuality;
/// Task which reports `Batch`s to Substrate.
mod report;
/// Extension traits around Block.
pub(crate) trait BlockExt: Block {
fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output>;
}
impl<B: Block> BlockExt for B {
fn scan_for_outputs(&self, key: Self::Key) -> Vec<Self::Output> {
let mut outputs = self.scan_for_outputs_unordered();
outputs.sort_by(|a, b| {
use core::cmp::{Ordering, Ord};
let res = a.id().as_ref().cmp(&b.id().as_ref());
assert!(res != Ordering::Equal, "scanned two outputs within a block with the same ID");
res
});
outputs
}
}
/// A feed usable to scan a blockchain.
///
/// This defines the primitive types used, along with various getters necessary for indexing.
@ -68,13 +85,44 @@ pub trait ScannerFeed: Send + Sync {
async fn latest_finalized_block_number(&self) -> Result<u64, Self::EphemeralError>;
/// Fetch a block header by its number.
async fn block_header_by_number(
///
/// This does not check the returned BlockHeader is the header for the block we indexed.
async fn unchecked_block_header_by_number(
&self,
number: u64,
) -> Result<<Self::Block as Block>::Header, Self::EphemeralError>;
/// Fetch a block by its number.
async fn block_by_number(&self, number: u64) -> Result<Self::Block, Self::EphemeralError>;
///
/// This does not check the returned Block is the block we indexed.
async fn unchecked_block_by_number(
&self,
number: u64,
) -> Result<Self::Block, Self::EphemeralError>;
/// Fetch a block by its number.
///
/// Panics if the block requested wasn't indexed.
async fn block_by_number(&self, getter: &impl Get, number: u64) -> Result<Self::Block, String> {
let block = match self.unchecked_block_by_number(number).await {
Ok(block) => block,
Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?,
};
// Check the ID of this block is the expected ID
{
let expected =
ScannerDb::<S>::block_id(&self.db, number).expect("requested a block which wasn't indexed");
if block.id() != expected {
panic!(
"finalized chain reorganized from {} to {} at {}",
hex::encode(expected),
hex::encode(block.id()),
number,
);
}
}
}
/// The cost to aggregate an input as of the specified block.
///

View file

@ -12,7 +12,7 @@ use primitives::{OutputType, ReceivedOutput, Block};
use crate::{
lifetime::LifetimeStage,
db::{OutputWithInInstruction, ScannerDb},
ScannerFeed, AddressFor, OutputFor, ContinuallyRan,
BlockExt, ScannerFeed, AddressFor, OutputFor, ContinuallyRan,
};
// Construct an InInstruction from an external output.
@ -76,24 +76,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
.expect("ScanForOutputsTask run before writing the start block");
for b in next_to_scan ..= latest_scannable {
let block = match self.feed.block_by_number(b).await {
Ok(block) => block,
Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?,
};
// Check the ID of this block is the expected ID
{
let expected =
ScannerDb::<S>::block_id(&self.db, b).expect("scannable block didn't have its ID saved");
if block.id() != expected {
panic!(
"finalized chain reorganized from {} to {} at {}",
hex::encode(expected),
hex::encode(block.id()),
b
);
}
}
let block = self.feed.block_by_number(b).await?;
log::info!("scanning block: {} ({b})", hex::encode(block.id()));
@ -143,7 +126,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
worthwhile, and even if they're not economically, they are technically).
The alternative, we drop outputs here with a generic filter rule and then report back
the insolvency created, still doesn't work as we'd only be creating if an insolvency if
the insolvency created, still doesn't work as we'd only be creating an insolvency if
the output was actually made by us (and not simply someone else sending in). We can
have the Eventuality task report the insolvency, yet that requires the scanner be
responsible for such filter logic. It's more flexible, and has a cleaner API,