Replace scanner's BatchPublisher with a pair of DB channels

Luke Parker 2024-09-08 23:42:18 -04:00
parent f07ec7bee0
commit 4152bcacb2
7 changed files with 152 additions and 61 deletions
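In place of the async `BatchPublisher::publish_batch` hook, the scanner now exposes two DB channels keyed by the external key for the session that must sign: `BatchToSign` carries each produced `Batch`, and `AcknowledgedBatch` carries the IDs of Batches acknowledged on-chain (alongside the existing `CompletedEventualities` channel). Below is a minimal sketch of how a downstream consumer, presumably the Batch signer in `serai-processor-signers` or a test shim, might drain these channels; the import paths, function name, and loop structure are illustrative assumptions, not part of this commit.

// Sketch only: import paths assume the scanner crate re-exports these types
// (per `pub use db::{BatchToSign, AcknowledgedBatch, CompletedEventualities}`).
use group::GroupEncoding;
use serai_db::{Db, DbTxn};
use serai_in_instructions_primitives::Batch;
use scanner::{BatchToSign, AcknowledgedBatch};

// Drain every Batch queued for signing under `key`, then every acknowledged Batch ID.
fn drain_batch_channels<D: Db, K: GroupEncoding>(db: &mut D, key: &K) {
  loop {
    let mut txn = db.txn();
    let Some(batch) = BatchToSign::<K>::try_recv(&mut txn, key) else { break };
    // Hand `batch` off to the actual signing machinery here.
    let _: Batch = batch;
    txn.commit();
  }
  loop {
    let mut txn = db.txn();
    let Some(batch_id) = AcknowledgedBatch::<K>::try_recv(&mut txn, key) else { break };
    // `batch_id` was acknowledged on-chain; the signer may drop its state for it.
    let _: u32 = batch_id;
    txn.commit();
  }
}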

View file

@ -35,7 +35,7 @@ tokio = { version = "1", default-features = false, features = ["rt-multi-thread"
serai-db = { path = "../../common/db" }
serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] }
serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] }
serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] }
serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] }
primitives = { package = "serai-processor-primitives", path = "../primitives" }

View file

@ -2,11 +2,12 @@ use core::marker::PhantomData;
use std::io::{self, Read, Write};
use group::GroupEncoding;
use scale::{Encode, Decode, IoReader};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_db::{Get, DbTxn, create_db, db_channel};
use serai_in_instructions_primitives::InInstructionWithBalance;
use serai_in_instructions_primitives::{InInstructionWithBalance, Batch};
use serai_coins_primitives::OutInstructionWithBalance;
use primitives::{EncodableG, Address, ReceivedOutput};
@ -105,6 +106,10 @@ create_db!(
pub(crate) struct ScannerGlobalDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScannerGlobalDb<S> {
pub(crate) fn has_any_key_been_queued(getter: &impl Get) -> bool {
ActiveKeys::get::<EncodableG<KeyFor<S>>>(getter).is_some()
}
/// Queue a key.
///
/// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks
@ -460,15 +465,20 @@ db_channel! {
}
}
pub(crate) struct InInstructionData<S: ScannerFeed> {
pub(crate) external_key_for_session_to_sign_batch: KeyFor<S>,
pub(crate) returnable_in_instructions: Vec<Returnable<S>>,
}
pub(crate) struct ScanToReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScanToReportDb<S> {
pub(crate) fn send_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,
returnable_in_instructions: &[Returnable<S>],
data: &InInstructionData<S>,
) {
let mut buf = vec![];
for returnable_in_instruction in returnable_in_instructions {
let mut buf = data.external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec();
for returnable_in_instruction in &data.returnable_in_instructions {
returnable_in_instruction.write(&mut buf).unwrap();
}
InInstructions::send(
@ -481,7 +491,7 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
pub(crate) fn recv_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,
) -> Vec<Returnable<S>> {
) -> InInstructionData<S> {
let data = InInstructions::try_recv(txn, ())
.expect("receiving InInstructions for a scanned block not yet sent");
assert_eq!(
@ -490,11 +500,20 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
);
let mut buf = data.returnable_in_instructions.as_slice();
let external_key_for_session_to_sign_batch = {
let mut external_key_for_session_to_sign_batch =
<KeyFor<S> as GroupEncoding>::Repr::default();
let key_len = external_key_for_session_to_sign_batch.as_ref().len();
external_key_for_session_to_sign_batch.as_mut().copy_from_slice(&buf[.. key_len]);
buf = &buf[key_len ..];
KeyFor::<S>::from_bytes(&external_key_for_session_to_sign_batch).unwrap()
};
let mut returnable_in_instructions = vec![];
while !buf.is_empty() {
returnable_in_instructions.push(Returnable::read(&mut buf).unwrap());
}
returnable_in_instructions
InInstructionData { external_key_for_session_to_sign_batch, returnable_in_instructions }
}
}
@ -522,25 +541,55 @@ impl SubstrateToEventualityDb {
}
}
mod _completed_eventualities {
mod _public_db {
use serai_in_instructions_primitives::Batch;
use serai_db::{Get, DbTxn, create_db, db_channel};
db_channel! {
ScannerPublic {
BatchToSign: (key: &[u8]) -> Batch,
AcknowledgedBatch: (key: &[u8]) -> u32,
CompletedEventualities: (key: &[u8]) -> [u8; 32],
}
}
}
/// The batches to sign and publish.
pub struct BatchToSign<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> BatchToSign<K> {
pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) {
_public_db::BatchToSign::send(txn, key.to_bytes().as_ref(), batch);
}
/// Receive a batch to sign and publish.
pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<Batch> {
_public_db::BatchToSign::try_recv(txn, key.to_bytes().as_ref())
}
}
/// The batches which were acknowledged on-chain.
pub struct AcknowledgedBatch<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> AcknowledgedBatch<K> {
pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: u32) {
_public_db::AcknowledgedBatch::send(txn, key.to_bytes().as_ref(), &batch);
}
/// Receive the ID of a batch which was acknowledged.
pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<u32> {
_public_db::AcknowledgedBatch::try_recv(txn, key.to_bytes().as_ref())
}
}
/// The IDs of completed Eventualities found on-chain, within a finalized block.
pub struct CompletedEventualities<K: GroupEncoding>(PhantomData<K>);
impl<K: GroupEncoding> CompletedEventualities<K> {
pub(crate) fn send(txn: &mut impl DbTxn, key: &K, id: [u8; 32]) {
_completed_eventualities::CompletedEventualities::send(txn, key.to_bytes().as_ref(), &id);
_public_db::CompletedEventualities::send(txn, key.to_bytes().as_ref(), &id);
}
/// Receive the ID of a completed Eventuality.
pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<[u8; 32]> {
_completed_eventualities::CompletedEventualities::try_recv(txn, key.to_bytes().as_ref())
_public_db::CompletedEventualities::try_recv(txn, key.to_bytes().as_ref())
}
}
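Tying the pieces together, the two crate-internal producer call sites (shown in full in the report and Substrate task diffs below) reduce to roughly the following sketch; `queue_batch` and `acknowledge_batch` are hypothetical wrappers for illustration only, and since `send` is `pub(crate)` this only applies inside the scanner crate.

use group::GroupEncoding;
use serai_db::DbTxn;
use serai_in_instructions_primitives::Batch;
use crate::{BatchToSign, AcknowledgedBatch};

// Report task: replaces `batch_publisher.publish_batch(batch).await`, keying the
// Batch by the external key for the session which must sign it.
fn queue_batch<K: GroupEncoding>(txn: &mut impl DbTxn, external_key: &K, batch: &Batch) {
  BatchToSign::send(txn, external_key, batch);
}

// Substrate task: once Serai acknowledges a Batch on-chain, forward its ID under
// the same key so the signer can clean up.
fn acknowledge_batch<K: GroupEncoding>(txn: &mut impl DbTxn, external_key: &K, batch_id: u32) {
  AcknowledgedBatch::send(txn, external_key, batch_id);
}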

View file

@ -10,7 +10,6 @@ use group::GroupEncoding;
use serai_db::{Get, DbTxn, Db};
use serai_primitives::{NetworkId, Coin, Amount};
use serai_in_instructions_primitives::Batch;
use serai_coins_primitives::OutInstructionWithBalance;
use primitives::{task::*, Address, ReceivedOutput, Block, Payment};
@ -21,7 +20,8 @@ pub use lifetime::LifetimeStage;
// Database schema definition and associated functions.
mod db;
pub use db::CompletedEventualities;
use db::ScannerGlobalDb;
pub use db::{BatchToSign, AcknowledgedBatch, CompletedEventualities};
// Task to index the blockchain, ensuring we don't reorganize finalized blocks.
mod index;
// Scans blocks for received coins.
@ -171,24 +171,6 @@ pub type EventualityFor<S> = <<S as ScannerFeed>::Block as Block>::Eventuality;
/// The block type for this ScannerFeed.
pub type BlockFor<S> = <S as ScannerFeed>::Block;
/// An object usable to publish a Batch.
// This will presumably be the Batch signer defined in `serai-processor-signers` or a test shim.
// It could also be some app-layer database for the purpose of verifying the Batches published to
// Serai.
#[async_trait::async_trait]
pub trait BatchPublisher: 'static + Send + Sync {
/// An error encountered when publishing the Batch.
///
/// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual
/// intervention/changing the arguments.
type EphemeralError: Debug;
/// Publish a Batch.
///
/// This function must be safe to call with the same Batch multiple times.
async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>;
}
/// A return to occur.
pub struct Return<S: ScannerFeed> {
address: AddressFor<S>,
@ -351,14 +333,20 @@ impl<S: ScannerFeed> Scanner<S> {
///
/// This will begin its execution, spawning several asynchronous tasks.
pub async fn new<Sch: Scheduler<S>>(
db: impl Db,
mut db: impl Db,
feed: S,
batch_publisher: impl BatchPublisher,
start_block: u64,
start_key: KeyFor<S>,
) -> Self {
if !ScannerGlobalDb::<S>::has_any_key_been_queued(&db) {
let mut txn = db.txn();
ScannerGlobalDb::<S>::queue_key(&mut txn, start_block, start_key);
txn.commit();
}
let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block);
let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block);
let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone());
let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block);

View file

@ -1,6 +1,8 @@
use core::marker::PhantomData;
use std::io::{Read, Write};
use group::GroupEncoding;
use scale::{Encode, Decode, IoReader};
use serai_db::{Get, DbTxn, create_db};
@ -8,7 +10,7 @@ use serai_primitives::Balance;
use primitives::Address;
use crate::{ScannerFeed, AddressFor};
use crate::{ScannerFeed, KeyFor, AddressFor};
create_db!(
ScannerReport {
@ -20,6 +22,9 @@ create_db!(
// The block number which caused a batch
BlockNumberForBatch: (batch: u32) -> u64,
// The external key for the session which should sign a batch
ExternalKeyForSessionToSignBatch: (batch: u32) -> Vec<u8>,
// The return addresses for the InInstructions within a Batch
SerializedReturnAddresses: (batch: u32) -> Vec<u8>,
}
@ -55,6 +60,29 @@ impl<S: ScannerFeed> ReportDb<S> {
Some(block_number)
}
pub(crate) fn save_external_key_for_session_to_sign_batch(
txn: &mut impl DbTxn,
id: u32,
external_key_for_session_to_sign_batch: &KeyFor<S>,
) {
ExternalKeyForSessionToSignBatch::set(
txn,
id,
&external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec(),
);
}
pub(crate) fn take_external_key_for_session_to_sign_batch(
txn: &mut impl DbTxn,
id: u32,
) -> Option<KeyFor<S>> {
ExternalKeyForSessionToSignBatch::get(txn, id).map(|key_vec| {
let mut key = <KeyFor<S> as GroupEncoding>::Repr::default();
key.as_mut().copy_from_slice(&key_vec);
KeyFor::<S>::from_bytes(&key).unwrap()
})
}
pub(crate) fn save_return_information(
txn: &mut impl DbTxn,
id: u32,

View file

@ -8,23 +8,16 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::task::ContinuallyRan;
use crate::{
db::{Returnable, ScannerGlobalDb, ScanToReportDb},
db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchToSign},
index,
scan::next_to_scan_for_outputs_block,
ScannerFeed, BatchPublisher,
ScannerFeed, KeyFor,
};
mod db;
pub(crate) use db::ReturnInformation;
use db::ReportDb;
pub(crate) fn take_return_information<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<Vec<Option<ReturnInformation<S>>>> {
ReportDb::<S>::take_return_information(txn, id)
}
pub(crate) fn take_block_number_for_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
@ -32,6 +25,20 @@ pub(crate) fn take_block_number_for_batch<S: ScannerFeed>(
ReportDb::<S>::take_block_number_for_batch(txn, id)
}
pub(crate) fn take_external_key_for_session_to_sign_batch<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<KeyFor<S>> {
ReportDb::<S>::take_external_key_for_session_to_sign_batch(txn, id)
}
pub(crate) fn take_return_information<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<Vec<Option<ReturnInformation<S>>>> {
ReportDb::<S>::take_return_information(txn, id)
}
/*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
@ -40,14 +47,13 @@ pub(crate) fn take_block_number_for_batch<S: ScannerFeed>(
the InInstructions for it.
*/
#[allow(non_snake_case)]
pub(crate) struct ReportTask<D: Db, S: ScannerFeed, B: BatchPublisher> {
pub(crate) struct ReportTask<D: Db, S: ScannerFeed> {
db: D,
batch_publisher: B,
_S: PhantomData<S>,
}
impl<D: Db, S: ScannerFeed, B: BatchPublisher> ReportTask<D, S, B> {
pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self {
impl<D: Db, S: ScannerFeed> ReportTask<D, S> {
pub(crate) fn new(mut db: D, start_block: u64) -> Self {
if ReportDb::<S>::next_to_potentially_report_block(&db).is_none() {
// Initialize the DB
let mut txn = db.txn();
@ -55,12 +61,12 @@ impl<D: Db, S: ScannerFeed, B: BatchPublisher> ReportTask<D, S, B> {
txn.commit();
}
Self { db, batch_publisher, _S: PhantomData }
Self { db, _S: PhantomData }
}
}
#[async_trait::async_trait]
impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D, S, B> {
impl<D: Db, S: ScannerFeed> ContinuallyRan for ReportTask<D, S> {
async fn run_iteration(&mut self) -> Result<bool, String> {
let highest_reportable = {
// Fetch the next to scan block
@ -87,7 +93,10 @@ impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D,
// Receive the InInstructions for this block
// We always do this as we can't trivially tell if we should recv InInstructions before we do
let in_instructions = ScanToReportDb::<S>::recv_in_instructions(&mut txn, b);
let InInstructionData {
external_key_for_session_to_sign_batch,
returnable_in_instructions: in_instructions,
} = ScanToReportDb::<S>::recv_in_instructions(&mut txn, b);
let notable = ScannerGlobalDb::<S>::is_block_notable(&txn, b);
if !notable {
assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
@ -138,19 +147,20 @@ impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D,
.push(return_address.map(|address| ReturnInformation { address, balance }));
}
// Save the return addresses to the databse
// Save the return addresses to the database
assert_eq!(batches.len(), return_information.len());
for (batch, return_information) in batches.iter().zip(&return_information) {
assert_eq!(batch.instructions.len(), return_information.len());
ReportDb::<S>::save_external_key_for_session_to_sign_batch(
&mut txn,
batch.id,
&external_key_for_session_to_sign_batch,
);
ReportDb::<S>::save_return_information(&mut txn, batch.id, return_information);
}
for batch in batches {
self
.batch_publisher
.publish_batch(batch)
.await
.map_err(|e| format!("failed to publish batch: {e:?}"))?;
BatchToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch);
}
}

View file

@ -13,8 +13,8 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block};
use crate::{
lifetime::LifetimeStage,
db::{
OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, ScanToReportDb,
ScanToEventualityDb,
OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, InInstructionData,
ScanToReportDb, ScanToEventualityDb,
},
BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
eventuality::latest_scannable_block,
@ -166,7 +166,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
let mut costs_to_aggregate = HashMap::with_capacity(1);
// Scan for each key
for key in keys {
for key in &keys {
for output in block.scan_for_outputs(key.key) {
assert_eq!(output.key(), key.key);
@ -339,7 +339,17 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
let in_instructions =
in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::<Vec<_>>();
// Send the InInstructions to the report task
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, &in_instructions);
// We need to also specify which key is responsible for signing the Batch for these, which
// will always be the oldest key (as the new key signing the Batch signifies handover
// acceptance)
ScanToReportDb::<S>::send_in_instructions(
&mut txn,
b,
&InInstructionData {
external_key_for_session_to_sign_batch: keys[0].key,
returnable_in_instructions: in_instructions,
},
);
// Send the scan data to the eventuality task
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);

View file

@ -6,7 +6,7 @@ use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use primitives::task::ContinuallyRan;
use crate::{
db::{ScannerGlobalDb, SubstrateToEventualityDb},
db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatch},
report, ScannerFeed, KeyFor,
};
@ -79,6 +79,12 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for SubstrateTask<D, S> {
return Ok(made_progress);
};
{
let external_key_for_session_to_sign_batch =
report::take_external_key_for_session_to_sign_batch::<S>(&mut txn, batch_id).unwrap();
AcknowledgedBatch::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id);
}
// Mark we made progress and handle this
made_progress = true;