mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-22 10:44:53 +00:00
Don't have acknowledge_batch
immediately run
`acknowledge_batch` can only be run if we know what the Batch should be. If we don't know what the Batch should be, we have to block until we do. Specifically, we need the block number associated with the Batch. Instead of blocking over the Scanner API, the Scanner API now solely queues actions. A new task intakes those actions once we can. This ensures we can intake the entire Substrate chain, even if our daemon for the external network is stalled at its genesis block. All of this for the block number alone seems ridiculous. To go from the block hash in the Batch to the block number without this task, we'd at least need the index task to be up to date (still requiring blocking or an API returning ephemeral errors).
This commit is contained in:
parent
f21838e0d5
commit
13b74195f7
3 changed files with 282 additions and 80 deletions
|
@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, Db};
|
|||
|
||||
use serai_primitives::{NetworkId, Coin, Amount};
|
||||
use serai_in_instructions_primitives::Batch;
|
||||
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
|
||||
use serai_coins_primitives::OutInstructionWithBalance;
|
||||
|
||||
use primitives::{task::*, Address, ReceivedOutput, Block};
|
||||
|
||||
|
@ -17,15 +17,16 @@ pub use lifetime::LifetimeStage;
|
|||
|
||||
// Database schema definition and associated functions.
|
||||
mod db;
|
||||
use db::{ScannerGlobalDb, SubstrateToEventualityDb};
|
||||
// Task to index the blockchain, ensuring we don't reorganize finalized blocks.
|
||||
mod index;
|
||||
// Scans blocks for received coins.
|
||||
mod scan;
|
||||
/// Task which reports Batches to Substrate.
|
||||
mod report;
|
||||
/// Task which handles events from Substrate once we can.
|
||||
mod substrate;
|
||||
/// Check blocks for transactions expected to eventually occur.
|
||||
mod eventuality;
|
||||
/// Task which reports `Batch`s to Substrate.
|
||||
mod report;
|
||||
|
||||
pub(crate) fn sort_outputs<K: GroupEncoding, A: Address, O: ReceivedOutput<K, A>>(
|
||||
a: &O,
|
||||
|
@ -280,7 +281,7 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
|
|||
/// A representation of a scanner.
|
||||
#[allow(non_snake_case)]
|
||||
pub struct Scanner<S: ScannerFeed> {
|
||||
eventuality_handle: RunNowHandle,
|
||||
substrate_handle: RunNowHandle,
|
||||
_S: PhantomData<S>,
|
||||
}
|
||||
impl<S: ScannerFeed> Scanner<S> {
|
||||
|
@ -297,24 +298,29 @@ impl<S: ScannerFeed> Scanner<S> {
|
|||
let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
|
||||
let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
|
||||
let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block);
|
||||
let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone());
|
||||
let eventuality_task = eventuality::EventualityTask::new(db, feed, scheduler, start_block);
|
||||
|
||||
let (_index_handle, index_run) = RunNowHandle::new();
|
||||
let (scan_handle, scan_run) = RunNowHandle::new();
|
||||
let (report_handle, report_run) = RunNowHandle::new();
|
||||
let (substrate_handle, substrate_run) = RunNowHandle::new();
|
||||
let (eventuality_handle, eventuality_run) = RunNowHandle::new();
|
||||
|
||||
// Upon indexing a new block, scan it
|
||||
tokio::spawn(index_task.continually_run(index_run, vec![scan_handle.clone()]));
|
||||
// Upon scanning a block, report it
|
||||
tokio::spawn(scan_task.continually_run(scan_run, vec![report_handle]));
|
||||
// Upon reporting a block, we do nothing
|
||||
// Upon reporting a block, we do nothing (as the burden is on Substrate which won't be
|
||||
// immediately ready)
|
||||
tokio::spawn(report_task.continually_run(report_run, vec![]));
|
||||
// Upon handling an event from Substrate, we run the Eventuality task (as it's what's affected)
|
||||
tokio::spawn(substrate_task.continually_run(substrate_run, vec![eventuality_handle]));
|
||||
// Upon handling the Eventualities in a block, we run the scan task as we've advanced the
|
||||
// window its allowed to scan
|
||||
tokio::spawn(eventuality_task.continually_run(eventuality_run, vec![scan_handle]));
|
||||
|
||||
Self { eventuality_handle, _S: PhantomData }
|
||||
Self { substrate_handle, _S: PhantomData }
|
||||
}
|
||||
|
||||
/// Acknowledge a Batch having been published on Serai.
|
||||
|
@ -335,80 +341,23 @@ impl<S: ScannerFeed> Scanner<S> {
|
|||
mut txn: impl DbTxn,
|
||||
batch_id: u32,
|
||||
in_instruction_succeededs: Vec<bool>,
|
||||
mut burns: Vec<OutInstructionWithBalance>,
|
||||
burns: Vec<OutInstructionWithBalance>,
|
||||
key_to_activate: Option<KeyFor<S>>,
|
||||
) {
|
||||
log::info!("acknowledging batch {batch_id}");
|
||||
|
||||
// TODO: We need to take all of these arguments and send them to a task
|
||||
// Then, when we do have this block number, we need to execute this function
|
||||
let block_number = report::take_block_number_for_batch::<S>(&mut txn, batch_id)
|
||||
.expect("didn't have the block number for a Batch");
|
||||
|
||||
assert!(
|
||||
ScannerGlobalDb::<S>::is_block_notable(&txn, block_number),
|
||||
"acknowledging a block which wasn't notable"
|
||||
// Queue acknowledging this block via the Substrate task
|
||||
substrate::queue_acknowledge_batch::<S>(
|
||||
&mut txn,
|
||||
batch_id,
|
||||
in_instruction_succeededs,
|
||||
burns,
|
||||
key_to_activate,
|
||||
);
|
||||
if let Some(prior_highest_acknowledged_block) =
|
||||
ScannerGlobalDb::<S>::highest_acknowledged_block(&txn)
|
||||
{
|
||||
// If a single block produced multiple Batches, the block number won't increment
|
||||
assert!(
|
||||
block_number >= prior_highest_acknowledged_block,
|
||||
"acknowledging blocks out-of-order"
|
||||
);
|
||||
for b in (prior_highest_acknowledged_block + 1) .. block_number {
|
||||
assert!(
|
||||
!ScannerGlobalDb::<S>::is_block_notable(&txn, b),
|
||||
"skipped acknowledging a block which was notable"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
ScannerGlobalDb::<S>::set_highest_acknowledged_block(&mut txn, block_number);
|
||||
if let Some(key_to_activate) = key_to_activate {
|
||||
ScannerGlobalDb::<S>::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate);
|
||||
}
|
||||
|
||||
// Return the balances for any InInstructions which failed to execute
|
||||
{
|
||||
let return_information = report::take_return_information::<S>(&mut txn, batch_id)
|
||||
.expect("didn't save the return information for Batch we published");
|
||||
assert_eq!(
|
||||
in_instruction_succeededs.len(),
|
||||
return_information.len(),
|
||||
"amount of InInstruction succeededs differed from amount of return information saved"
|
||||
);
|
||||
|
||||
// We map these into standard Burns
|
||||
for (succeeded, return_information) in
|
||||
in_instruction_succeededs.into_iter().zip(return_information)
|
||||
{
|
||||
if succeeded {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(report::ReturnInformation { address, balance }) = return_information {
|
||||
burns.push(OutInstructionWithBalance {
|
||||
instruction: OutInstruction { address: address.into(), data: None },
|
||||
balance,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !burns.is_empty() {
|
||||
// We send these Burns as stemming from this block we just acknowledged
|
||||
// This causes them to be acted on after we accumulate the outputs from this block
|
||||
SubstrateToEventualityDb::send_burns(&mut txn, block_number, &burns);
|
||||
}
|
||||
|
||||
// Commit the txn
|
||||
// Commit this txn so this data is flushed
|
||||
txn.commit();
|
||||
// Run the Eventuality task since we've advanced it
|
||||
// We couldn't successfully do this if that txn was still floating around, uncommitted
|
||||
// The execution of this task won't actually have more work until the txn is committed
|
||||
self.eventuality_handle.run_now();
|
||||
// Then run the Substrate task
|
||||
self.substrate_handle.run_now();
|
||||
}
|
||||
|
||||
/// Queue Burns.
|
||||
|
@ -442,14 +391,16 @@ impl<S: ScannerFeed> Scanner<S> {
|
|||
latency and likely practically require we add regularly scheduled notable blocks (which may be
|
||||
unnecessary).
|
||||
*/
|
||||
pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: &Vec<OutInstructionWithBalance>) {
|
||||
pub fn queue_burns(&mut self, mut txn: impl DbTxn, burns: Vec<OutInstructionWithBalance>) {
|
||||
if burns.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(txn)
|
||||
.expect("queueing Burns yet never acknowledged a block");
|
||||
|
||||
SubstrateToEventualityDb::send_burns(txn, queue_as_of, burns)
|
||||
// Queue queueing these burns via the Substrate task
|
||||
substrate::queue_queue_burns::<S>(&mut txn, burns);
|
||||
// Commit this txn so this data is flushed
|
||||
txn.commit();
|
||||
// Then run the Substrate task
|
||||
self.substrate_handle.run_now();
|
||||
}
|
||||
}
|
||||
|
|
89
processor/scanner/src/substrate/db.rs
Normal file
89
processor/scanner/src/substrate/db.rs
Normal file
|
@ -0,0 +1,89 @@
|
|||
use core::marker::PhantomData;
|
||||
|
||||
use group::GroupEncoding;
|
||||
|
||||
use borsh::{BorshSerialize, BorshDeserialize};
|
||||
use serai_db::{Get, DbTxn, create_db, db_channel};
|
||||
|
||||
use serai_coins_primitives::OutInstructionWithBalance;
|
||||
|
||||
use crate::{ScannerFeed, KeyFor};
|
||||
|
||||
#[derive(BorshSerialize, BorshDeserialize)]
|
||||
struct AcknowledgeBatchEncodable {
|
||||
batch_id: u32,
|
||||
in_instruction_succeededs: Vec<bool>,
|
||||
burns: Vec<OutInstructionWithBalance>,
|
||||
key_to_activate: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
#[derive(BorshSerialize, BorshDeserialize)]
|
||||
enum ActionEncodable {
|
||||
AcknowledgeBatch(AcknowledgeBatchEncodable),
|
||||
QueueBurns(Vec<OutInstructionWithBalance>),
|
||||
}
|
||||
|
||||
pub(crate) struct AcknowledgeBatch<S: ScannerFeed> {
|
||||
pub(crate) batch_id: u32,
|
||||
pub(crate) in_instruction_succeededs: Vec<bool>,
|
||||
pub(crate) burns: Vec<OutInstructionWithBalance>,
|
||||
pub(crate) key_to_activate: Option<KeyFor<S>>,
|
||||
}
|
||||
|
||||
pub(crate) enum Action<S: ScannerFeed> {
|
||||
AcknowledgeBatch(AcknowledgeBatch<S>),
|
||||
QueueBurns(Vec<OutInstructionWithBalance>),
|
||||
}
|
||||
|
||||
db_channel!(
|
||||
ScannerSubstrate {
|
||||
Actions: (empty_key: ()) -> ActionEncodable,
|
||||
}
|
||||
);
|
||||
|
||||
pub(crate) struct SubstrateDb<S: ScannerFeed>(PhantomData<S>);
|
||||
impl<S: ScannerFeed> SubstrateDb<S> {
|
||||
pub(crate) fn queue_acknowledge_batch(
|
||||
txn: &mut impl DbTxn,
|
||||
batch_id: u32,
|
||||
in_instruction_succeededs: Vec<bool>,
|
||||
burns: Vec<OutInstructionWithBalance>,
|
||||
key_to_activate: Option<KeyFor<S>>,
|
||||
) {
|
||||
Actions::send(
|
||||
txn,
|
||||
(),
|
||||
&ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable {
|
||||
batch_id,
|
||||
in_instruction_succeededs,
|
||||
burns,
|
||||
key_to_activate: key_to_activate.map(|key| key.to_bytes().as_ref().to_vec()),
|
||||
}),
|
||||
);
|
||||
}
|
||||
pub(crate) fn queue_queue_burns(txn: &mut impl DbTxn, burns: Vec<OutInstructionWithBalance>) {
|
||||
Actions::send(txn, (), &ActionEncodable::QueueBurns(burns));
|
||||
}
|
||||
|
||||
pub(crate) fn next_action(txn: &mut impl DbTxn) -> Option<Action<S>> {
|
||||
let action_encodable = Actions::try_recv(txn, ())?;
|
||||
Some(match action_encodable {
|
||||
ActionEncodable::AcknowledgeBatch(AcknowledgeBatchEncodable {
|
||||
batch_id,
|
||||
in_instruction_succeededs,
|
||||
burns,
|
||||
key_to_activate,
|
||||
}) => Action::AcknowledgeBatch(AcknowledgeBatch {
|
||||
batch_id,
|
||||
in_instruction_succeededs,
|
||||
burns,
|
||||
key_to_activate: key_to_activate.map(|key| {
|
||||
let mut repr = <KeyFor<S> as GroupEncoding>::Repr::default();
|
||||
repr.as_mut().copy_from_slice(&key);
|
||||
KeyFor::<S>::from_bytes(&repr).unwrap()
|
||||
}),
|
||||
}),
|
||||
ActionEncodable::QueueBurns(burns) => Action::QueueBurns(burns),
|
||||
})
|
||||
}
|
||||
}
|
162
processor/scanner/src/substrate/mod.rs
Normal file
162
processor/scanner/src/substrate/mod.rs
Normal file
|
@ -0,0 +1,162 @@
|
|||
use core::marker::PhantomData;
|
||||
|
||||
use serai_db::{DbTxn, Db};
|
||||
|
||||
use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
|
||||
|
||||
use primitives::task::ContinuallyRan;
|
||||
use crate::{
|
||||
db::{ScannerGlobalDb, SubstrateToEventualityDb},
|
||||
report, ScannerFeed, KeyFor,
|
||||
};
|
||||
|
||||
mod db;
|
||||
use db::*;
|
||||
|
||||
pub(crate) fn queue_acknowledge_batch<S: ScannerFeed>(
|
||||
txn: &mut impl DbTxn,
|
||||
batch_id: u32,
|
||||
in_instruction_succeededs: Vec<bool>,
|
||||
burns: Vec<OutInstructionWithBalance>,
|
||||
key_to_activate: Option<KeyFor<S>>,
|
||||
) {
|
||||
SubstrateDb::<S>::queue_acknowledge_batch(
|
||||
txn,
|
||||
batch_id,
|
||||
in_instruction_succeededs,
|
||||
burns,
|
||||
key_to_activate,
|
||||
)
|
||||
}
|
||||
pub(crate) fn queue_queue_burns<S: ScannerFeed>(
|
||||
txn: &mut impl DbTxn,
|
||||
burns: Vec<OutInstructionWithBalance>,
|
||||
) {
|
||||
SubstrateDb::<S>::queue_queue_burns(txn, burns)
|
||||
}
|
||||
|
||||
/*
|
||||
When Serai acknowledges a Batch, we can only handle it once we've scanned the chain and generated
|
||||
the same Batch ourselves. This takes the `acknowledge_batch`, `queue_burns` arguments and sits on
|
||||
them until we're able to process them.
|
||||
*/
|
||||
#[allow(non_snake_case)]
|
||||
pub(crate) struct SubstrateTask<D: Db, S: ScannerFeed> {
|
||||
db: D,
|
||||
_S: PhantomData<S>,
|
||||
}
|
||||
|
||||
impl<D: Db, S: ScannerFeed> SubstrateTask<D, S> {
|
||||
pub(crate) fn new(db: D) -> Self {
|
||||
Self { db, _S: PhantomData }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
|
||||
async fn run_iteration(&mut self) -> Result<bool, String> {
|
||||
let mut made_progress = false;
|
||||
loop {
|
||||
// Fetch the next action to handle
|
||||
let mut txn = self.db.txn();
|
||||
let Some(action) = SubstrateDb::<S>::next_action(&mut txn) else {
|
||||
drop(txn);
|
||||
return Ok(made_progress);
|
||||
};
|
||||
|
||||
match action {
|
||||
Action::AcknowledgeBatch(AcknowledgeBatch {
|
||||
batch_id,
|
||||
in_instruction_succeededs,
|
||||
mut burns,
|
||||
key_to_activate,
|
||||
}) => {
|
||||
// Check if we have the information for this batch
|
||||
let Some(block_number) = report::take_block_number_for_batch::<S>(&mut txn, batch_id)
|
||||
else {
|
||||
// If we don't, drop this txn (restoring the action to the database)
|
||||
drop(txn);
|
||||
return Ok(made_progress);
|
||||
};
|
||||
|
||||
// Mark we made progress and handle this
|
||||
made_progress = true;
|
||||
|
||||
assert!(
|
||||
ScannerGlobalDb::<S>::is_block_notable(&txn, block_number),
|
||||
"acknowledging a block which wasn't notable"
|
||||
);
|
||||
if let Some(prior_highest_acknowledged_block) =
|
||||
ScannerGlobalDb::<S>::highest_acknowledged_block(&txn)
|
||||
{
|
||||
// If a single block produced multiple Batches, the block number won't increment
|
||||
assert!(
|
||||
block_number >= prior_highest_acknowledged_block,
|
||||
"acknowledging blocks out-of-order"
|
||||
);
|
||||
for b in (prior_highest_acknowledged_block + 1) .. block_number {
|
||||
assert!(
|
||||
!ScannerGlobalDb::<S>::is_block_notable(&txn, b),
|
||||
"skipped acknowledging a block which was notable"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
ScannerGlobalDb::<S>::set_highest_acknowledged_block(&mut txn, block_number);
|
||||
if let Some(key_to_activate) = key_to_activate {
|
||||
ScannerGlobalDb::<S>::queue_key(
|
||||
&mut txn,
|
||||
block_number + S::WINDOW_LENGTH,
|
||||
key_to_activate,
|
||||
);
|
||||
}
|
||||
|
||||
// Return the balances for any InInstructions which failed to execute
|
||||
{
|
||||
let return_information = report::take_return_information::<S>(&mut txn, batch_id)
|
||||
.expect("didn't save the return information for Batch we published");
|
||||
assert_eq!(
|
||||
in_instruction_succeededs.len(),
|
||||
return_information.len(),
|
||||
"amount of InInstruction succeededs differed from amount of return information saved"
|
||||
);
|
||||
|
||||
// We map these into standard Burns
|
||||
for (succeeded, return_information) in
|
||||
in_instruction_succeededs.into_iter().zip(return_information)
|
||||
{
|
||||
if succeeded {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(report::ReturnInformation { address, balance }) = return_information {
|
||||
burns.push(OutInstructionWithBalance {
|
||||
instruction: OutInstruction { address: address.into(), data: None },
|
||||
balance,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !burns.is_empty() {
|
||||
// We send these Burns as stemming from this block we just acknowledged
|
||||
// This causes them to be acted on after we accumulate the outputs from this block
|
||||
SubstrateToEventualityDb::send_burns(&mut txn, block_number, &burns);
|
||||
}
|
||||
}
|
||||
|
||||
Action::QueueBurns(burns) => {
|
||||
// We can instantly handle this so long as we've handled all prior actions
|
||||
made_progress = true;
|
||||
|
||||
let queue_as_of = ScannerGlobalDb::<S>::highest_acknowledged_block(&txn)
|
||||
.expect("queueing Burns yet never acknowledged a block");
|
||||
|
||||
SubstrateToEventualityDb::send_burns(&mut txn, queue_as_of, &burns);
|
||||
}
|
||||
}
|
||||
|
||||
txn.commit();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue