From 4152bcacb250ddcf229bbce73a037cb9be0cb5cf Mon Sep 17 00:00:00 2001
From: Luke Parker
Date: Sun, 8 Sep 2024 23:42:18 -0400
Subject: [PATCH] Replace scanner's BatchPublisher with a pair of DB channels

---
 processor/scanner/Cargo.toml           |  2 +-
 processor/scanner/src/db.rs            | 67 ++++++++++++++++++++++----
 processor/scanner/src/lib.rs           | 34 +++++--------
 processor/scanner/src/report/db.rs     | 30 +++++++++++-
 processor/scanner/src/report/mod.rs    | 54 ++++++++++++---------
 processor/scanner/src/scan/mod.rs      | 18 +++++--
 processor/scanner/src/substrate/mod.rs |  8 ++-
 7 files changed, 152 insertions(+), 61 deletions(-)

diff --git a/processor/scanner/Cargo.toml b/processor/scanner/Cargo.toml
index 2a3e7e0a..e3e08329 100644
--- a/processor/scanner/Cargo.toml
+++ b/processor/scanner/Cargo.toml
@@ -35,7 +35,7 @@ tokio = { version = "1", default-features = false, features = ["rt-multi-thread"
 serai-db = { path = "../../common/db" }
 
 serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] }
-serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] }
+serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std", "borsh"] }
 serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] }
 
 primitives = { package = "serai-processor-primitives", path = "../primitives" }
diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs
index f72fa202..f54ff8e1 100644
--- a/processor/scanner/src/db.rs
+++ b/processor/scanner/src/db.rs
@@ -2,11 +2,12 @@ use core::marker::PhantomData;
 use std::io::{self, Read, Write};
 
 use group::GroupEncoding;
+
 use scale::{Encode, Decode, IoReader};
 use borsh::{BorshSerialize, BorshDeserialize};
 use serai_db::{Get, DbTxn, create_db, db_channel};
 
-use serai_in_instructions_primitives::InInstructionWithBalance;
+use serai_in_instructions_primitives::{InInstructionWithBalance, Batch};
 use serai_coins_primitives::OutInstructionWithBalance;
 
 use primitives::{EncodableG, Address, ReceivedOutput};
@@ -105,6 +106,10 @@ create_db!(
 pub(crate) struct ScannerGlobalDb(PhantomData);
 impl ScannerGlobalDb {
+  pub(crate) fn has_any_key_been_queued(getter: &impl Get) -> bool {
+    ActiveKeys::get::>>(getter).is_some()
+  }
+
   /// Queue a key.
   ///
   /// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks
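The "borsh" feature newly enabled on serai-in-instructions-primitives matters because Batch is about to travel through DB channels, which presumably persist their values as borsh-encoded bytes. A minimal, self-contained illustration of that kind of round-trip (ExampleValue is a hypothetical stand-in for Batch, not a type from this patch, and borsh's derive feature is assumed):

use borsh::{BorshSerialize, BorshDeserialize};

// Hypothetical stand-in for a value sent through a DB channel (e.g. `Batch`).
#[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize)]
struct ExampleValue {
  id: u32,
  payload: Vec<u8>,
}

fn main() {
  let value = ExampleValue { id: 5, payload: vec![1, 2, 3] };
  // Serialize to the bytes a channel would persist, then decode them back out.
  let bytes = borsh::to_vec(&value).unwrap();
  let decoded = ExampleValue::try_from_slice(&bytes).unwrap();
  assert_eq!(value, decoded);
}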
@@ -460,15 +465,20 @@ db_channel! {
   }
 }
 
+pub(crate) struct InInstructionData {
+  pub(crate) external_key_for_session_to_sign_batch: KeyFor,
+  pub(crate) returnable_in_instructions: Vec>,
+}
+
 pub(crate) struct ScanToReportDb(PhantomData);
 impl ScanToReportDb {
   pub(crate) fn send_in_instructions(
     txn: &mut impl DbTxn,
     block_number: u64,
-    returnable_in_instructions: &[Returnable],
+    data: &InInstructionData,
   ) {
-    let mut buf = vec![];
-    for returnable_in_instruction in returnable_in_instructions {
+    let mut buf = data.external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec();
+    for returnable_in_instruction in &data.returnable_in_instructions {
       returnable_in_instruction.write(&mut buf).unwrap();
     }
     InInstructions::send(
@@ -481,7 +491,7 @@ impl ScanToReportDb {
   pub(crate) fn recv_in_instructions(
     txn: &mut impl DbTxn,
     block_number: u64,
-  ) -> Vec> {
+  ) -> InInstructionData {
     let data = InInstructions::try_recv(txn, ())
       .expect("receiving InInstructions for a scanned block not yet sent");
     assert_eq!(
@@ -490,11 +500,20 @@ impl ScanToReportDb {
     );
 
     let mut buf = data.returnable_in_instructions.as_slice();
+    let external_key_for_session_to_sign_batch = {
+      let mut external_key_for_session_to_sign_batch =
+        as GroupEncoding>::Repr::default();
+      let key_len = external_key_for_session_to_sign_batch.as_ref().len();
+      external_key_for_session_to_sign_batch.as_mut().copy_from_slice(&buf[.. key_len]);
+      buf = &buf[key_len ..];
+      KeyFor::::from_bytes(&external_key_for_session_to_sign_batch).unwrap()
+    };
+
     let mut returnable_in_instructions = vec![];
     while !buf.is_empty() {
       returnable_in_instructions.push(Returnable::read(&mut buf).unwrap());
     }
-    returnable_in_instructions
+    InInstructionData { external_key_for_session_to_sign_batch, returnable_in_instructions }
   }
 }
 
@@ -522,25 +541,55 @@ impl SubstrateToEventualityDb {
   }
 }
 
-mod _completed_eventualities {
+mod _public_db {
+  use serai_in_instructions_primitives::Batch;
+
   use serai_db::{Get, DbTxn, create_db, db_channel};
 
   db_channel! {
     ScannerPublic {
+      BatchToSign: (key: &[u8]) -> Batch,
+      AcknowledgedBatch: (key: &[u8]) -> u32,
       CompletedEventualities: (key: &[u8]) -> [u8; 32],
     }
   }
 }
 
+/// The batches to sign and publish.
+pub struct BatchToSign(PhantomData);
+impl BatchToSign {
+  pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: &Batch) {
+    _public_db::BatchToSign::send(txn, key.to_bytes().as_ref(), batch);
+  }
+
+  /// Receive a batch to sign and publish.
+  pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option {
+    _public_db::BatchToSign::try_recv(txn, key.to_bytes().as_ref())
+  }
+}
+
+/// The batches which were acknowledged on-chain.
+pub struct AcknowledgedBatch(PhantomData);
+impl AcknowledgedBatch {
+  pub(crate) fn send(txn: &mut impl DbTxn, key: &K, batch: u32) {
+    _public_db::AcknowledgedBatch::send(txn, key.to_bytes().as_ref(), &batch);
+  }
+
+  /// Receive the ID of a batch which was acknowledged.
+  pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option {
+    _public_db::AcknowledgedBatch::try_recv(txn, key.to_bytes().as_ref())
+  }
+}
+
 /// The IDs of completed Eventualities found on-chain, within a finalized block.
 pub struct CompletedEventualities(PhantomData);
 impl CompletedEventualities {
   pub(crate) fn send(txn: &mut impl DbTxn, key: &K, id: [u8; 32]) {
-    _completed_eventualities::CompletedEventualities::send(txn, key.to_bytes().as_ref(), &id);
+    _public_db::CompletedEventualities::send(txn, key.to_bytes().as_ref(), &id);
   }
 
   /// Receive the ID of a completed Eventuality.
   pub fn try_recv(txn: &mut impl DbTxn, key: &K) -> Option<[u8; 32]> {
-    _completed_eventualities::CompletedEventualities::try_recv(txn, key.to_bytes().as_ref())
+    _public_db::CompletedEventualities::try_recv(txn, key.to_bytes().as_ref())
   }
 }
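One detail of db.rs worth keeping in mind for the rest of the patch: the scan-to-report channel now prefixes its payload with the batch-signing key's fixed-length encoding before the concatenated Returnables, and the reader peels that prefix off by length alone. A self-contained sketch of that framing, using a 32-byte array as a hypothetical stand-in for the encoded key and raw byte strings in place of Returnables:

// Illustrative framing only; the real code uses `KeyFor`'s GroupEncoding representation and
// self-delimiting `Returnable`s rather than raw byte strings.
const KEY_LEN: usize = 32;

fn encode(key: [u8; KEY_LEN], instructions: &[Vec<u8>]) -> Vec<u8> {
  // The encoded key is written first, then every instruction is appended in order.
  let mut buf = key.to_vec();
  for instruction in instructions {
    buf.extend_from_slice(instruction);
  }
  buf
}

fn decode(buf: &[u8]) -> ([u8; KEY_LEN], &[u8]) {
  // The reader knows the key's encoded length up front, so it can split without a length prefix.
  let mut key = [0; KEY_LEN];
  key.copy_from_slice(&buf[.. KEY_LEN]);
  (key, &buf[KEY_LEN ..])
}

fn main() {
  let buf = encode([1; KEY_LEN], &[vec![2, 3], vec![4]]);
  let (key, rest) = decode(&buf);
  assert_eq!(key, [1; KEY_LEN]);
  assert_eq!(rest, &[2, 3, 4]);
}

No explicit length field is needed for the key because a GroupEncoding representation has a fixed size known to both sides of the channel.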
diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs
index 3323c6ff..bcd195ec 100644
--- a/processor/scanner/src/lib.rs
+++ b/processor/scanner/src/lib.rs
@@ -10,7 +10,6 @@ use group::GroupEncoding;
 use serai_db::{Get, DbTxn, Db};
 
 use serai_primitives::{NetworkId, Coin, Amount};
-use serai_in_instructions_primitives::Batch;
 use serai_coins_primitives::OutInstructionWithBalance;
 
 use primitives::{task::*, Address, ReceivedOutput, Block, Payment};
@@ -21,7 +20,8 @@ pub use lifetime::LifetimeStage;
 
 // Database schema definition and associated functions.
 mod db;
-pub use db::CompletedEventualities;
+use db::ScannerGlobalDb;
+pub use db::{BatchToSign, AcknowledgedBatch, CompletedEventualities};
 // Task to index the blockchain, ensuring we don't reorganize finalized blocks.
 mod index;
 // Scans blocks for received coins.
@@ -171,24 +171,6 @@ pub type EventualityFor = <::Block as Block>::Eventuality;
 /// The block type for this ScannerFeed.
 pub type BlockFor = ::Block;
 
-/// An object usable to publish a Batch.
-// This will presumably be the Batch signer defined in `serai-processor-signers` or a test shim.
-// It could also be some app-layer database for the purpose of verifying the Batches published to
-// Serai.
-#[async_trait::async_trait]
-pub trait BatchPublisher: 'static + Send + Sync {
-  /// An error encountered when publishing the Batch.
-  ///
-  /// This MUST be an ephemeral error. Retrying publication MUST eventually resolve without manual
-  /// intervention/changing the arguments.
-  type EphemeralError: Debug;
-
-  /// Publish a Batch.
-  ///
-  /// This function must be safe to call with the same Batch multiple times.
-  async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>;
-}
-
 /// A return to occur.
 pub struct Return {
   address: AddressFor,
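With the BatchPublisher trait removed above, whatever previously implemented publish_batch is expected to drain the BatchToSign channel instead. A rough sketch of such a consumer, assuming `use serai_db::{Db, DbTxn};` and the Batch import used elsewhere in this patch; the `sign` callback is a hypothetical stand-in for the real Batch signer:

// A sketch only, not an API from this patch.
fn drain_batches_to_sign<S: ScannerFeed>(
  db: &mut impl Db,
  key: &KeyFor<S>,
  mut sign: impl FnMut(Batch),
) {
  loop {
    let mut txn = db.txn();
    // Take the next unhandled Batch queued for this key, if any.
    let Some(batch) = BatchToSign::<KeyFor<S>>::try_recv(&mut txn, key) else { break };
    sign(batch);
    // Committing the transaction consumes the Batch from the channel.
    txn.commit();
  }
}

Since the channel is database-backed, a Batch not yet taken off the channel simply remains queued across restarts.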
@@ -351,14 +333,20 @@ impl Scanner {
   ///
   /// This will begin its execution, spawning several asynchronous tasks.
   pub async fn new>(
-    db: impl Db,
+    mut db: impl Db,
     feed: S,
-    batch_publisher: impl BatchPublisher,
     start_block: u64,
+    start_key: KeyFor,
   ) -> Self {
+    if !ScannerGlobalDb::::has_any_key_been_queued(&db) {
+      let mut txn = db.txn();
+      ScannerGlobalDb::::queue_key(&mut txn, start_block, start_key);
+      txn.commit();
+    }
+
     let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await;
     let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block);
-    let report_task = report::ReportTask::<_, S, _>::new(db.clone(), batch_publisher, start_block);
+    let report_task = report::ReportTask::<_, S>::new(db.clone(), start_block);
     let substrate_task = substrate::SubstrateTask::<_, S>::new(db.clone());
     let eventuality_task = eventuality::EventualityTask::<_, _, Sch>::new(db, feed, start_block);
diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/report/db.rs
index baff6635..05239779 100644
--- a/processor/scanner/src/report/db.rs
+++ b/processor/scanner/src/report/db.rs
@@ -1,6 +1,8 @@
 use core::marker::PhantomData;
 use std::io::{Read, Write};
 
+use group::GroupEncoding;
+
 use scale::{Encode, Decode, IoReader};
 use serai_db::{Get, DbTxn, create_db};
@@ -8,7 +10,7 @@ use serai_primitives::Balance;
 
 use primitives::Address;
 
-use crate::{ScannerFeed, AddressFor};
+use crate::{ScannerFeed, KeyFor, AddressFor};
 
 create_db!(
   ScannerReport {
@@ -20,6 +22,9 @@ create_db!(
     // The block number which caused a batch
     BlockNumberForBatch: (batch: u32) -> u64,
 
+    // The external key for the session which should sign a batch
+    ExternalKeyForSessionToSignBatch: (batch: u32) -> Vec,
+
     // The return addresses for the InInstructions within a Batch
     SerializedReturnAddresses: (batch: u32) -> Vec,
   }
@@ -55,6 +60,29 @@ impl ReportDb {
     Some(block_number)
   }
 
+  pub(crate) fn save_external_key_for_session_to_sign_batch(
+    txn: &mut impl DbTxn,
+    id: u32,
+    external_key_for_session_to_sign_batch: &KeyFor,
+  ) {
+    ExternalKeyForSessionToSignBatch::set(
+      txn,
+      id,
+      &external_key_for_session_to_sign_batch.to_bytes().as_ref().to_vec(),
+    );
+  }
+
+  pub(crate) fn take_external_key_for_session_to_sign_batch(
+    txn: &mut impl DbTxn,
+    id: u32,
+  ) -> Option> {
+    ExternalKeyForSessionToSignBatch::get(txn, id).map(|key_vec| {
+      let mut key = as GroupEncoding>::Repr::default();
+      key.as_mut().copy_from_slice(&key_vec);
+      KeyFor::::from_bytes(&key).unwrap()
+    })
+  }
+
   pub(crate) fn save_return_information(
     txn: &mut impl DbTxn,
     id: u32,
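The save/take pair above stores the key as its canonical GroupEncoding bytes and rebuilds the group element when read back. The same round-trip as a tiny generic helper (illustrative only; round_trip is not a function in this patch):

use group::GroupEncoding;

// Encode a group element to its fixed-size representation and decode it back, mirroring
// save_external_key_for_session_to_sign_batch / take_external_key_for_session_to_sign_batch.
fn round_trip<G: GroupEncoding>(key: &G) -> G {
  let bytes = key.to_bytes();
  let mut repr = G::Repr::default();
  repr.as_mut().copy_from_slice(bytes.as_ref());
  // `from_bytes` returns a CtOption; unwrapping is fine here as the bytes are known-canonical.
  G::from_bytes(&repr).unwrap()
}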
diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs
index ba851713..f983d0e7 100644
--- a/processor/scanner/src/report/mod.rs
+++ b/processor/scanner/src/report/mod.rs
@@ -8,23 +8,16 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
 use primitives::task::ContinuallyRan;
 
 use crate::{
-  db::{Returnable, ScannerGlobalDb, ScanToReportDb},
+  db::{Returnable, ScannerGlobalDb, InInstructionData, ScanToReportDb, BatchToSign},
   index,
   scan::next_to_scan_for_outputs_block,
-  ScannerFeed, BatchPublisher,
+  ScannerFeed, KeyFor,
 };
 
 mod db;
 pub(crate) use db::ReturnInformation;
 use db::ReportDb;
 
-pub(crate) fn take_return_information(
-  txn: &mut impl DbTxn,
-  id: u32,
-) -> Option>>> {
-  ReportDb::::take_return_information(txn, id)
-}
-
 pub(crate) fn take_block_number_for_batch(
   txn: &mut impl DbTxn,
   id: u32,
@@ -32,6 +25,20 @@ pub(crate) fn take_block_number_for_batch(
   ReportDb::::take_block_number_for_batch(txn, id)
 }
 
+pub(crate) fn take_external_key_for_session_to_sign_batch(
+  txn: &mut impl DbTxn,
+  id: u32,
+) -> Option> {
+  ReportDb::::take_external_key_for_session_to_sign_batch(txn, id)
+}
+
+pub(crate) fn take_return_information(
+  txn: &mut impl DbTxn,
+  id: u32,
+) -> Option>>> {
+  ReportDb::::take_return_information(txn, id)
+}
+
 /*
   This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
 
   the InInstructions for it.
 */
 #[allow(non_snake_case)]
-pub(crate) struct ReportTask {
+pub(crate) struct ReportTask {
   db: D,
-  batch_publisher: B,
   _S: PhantomData,
 }
 
-impl ReportTask {
-  pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self {
+impl ReportTask {
+  pub(crate) fn new(mut db: D, start_block: u64) -> Self {
     if ReportDb::::next_to_potentially_report_block(&db).is_none() {
       // Initialize the DB
       let mut txn = db.txn();
@@ -55,12 +61,12 @@ impl ReportTask {
       txn.commit();
     }
 
-    Self { db, batch_publisher, _S: PhantomData }
+    Self { db, _S: PhantomData }
   }
 }
 
 #[async_trait::async_trait]
-impl ContinuallyRan for ReportTask {
+impl ContinuallyRan for ReportTask {
   async fn run_iteration(&mut self) -> Result {
     let highest_reportable = {
       // Fetch the next to scan block
@@ -87,7 +93,10 @@ impl ContinuallyRan for ReportTask
-        let in_instructions = ScanToReportDb::::recv_in_instructions(&mut txn, b);
+        let InInstructionData {
+          external_key_for_session_to_sign_batch,
+          returnable_in_instructions: in_instructions,
+        } = ScanToReportDb::::recv_in_instructions(&mut txn, b);
         let notable = ScannerGlobalDb::::is_block_notable(&txn, b);
         if !notable {
           assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions");
@@ -138,19 +147,20 @@ impl ContinuallyRan for ReportTask
+          ReportDb::::save_external_key_for_session_to_sign_batch(
+            &mut txn,
+            batch.id,
+            &external_key_for_session_to_sign_batch,
+          );
           ReportDb::::save_return_information(&mut txn, batch.id, return_information);
         }
 
         for batch in batches {
-          self
-            .batch_publisher
-            .publish_batch(batch)
-            .await
-            .map_err(|e| format!("failed to publish batch: {e:?}"))?;
+          BatchToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch);
         }
       }
diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs
index 51671dc6..91c97f60 100644
--- a/processor/scanner/src/scan/mod.rs
+++ b/processor/scanner/src/scan/mod.rs
@@ -13,8 +13,8 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block};
 use crate::{
   lifetime::LifetimeStage,
   db::{
-    OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, ScanToReportDb,
-    ScanToEventualityDb,
+    OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, InInstructionData,
+    ScanToReportDb, ScanToEventualityDb,
   },
   BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
   eventuality::latest_scannable_block,
@@ -166,7 +166,7 @@ impl ContinuallyRan for ScanTask {
       let mut costs_to_aggregate = HashMap::with_capacity(1);
 
       // Scan for each key
-      for key in keys {
+      for key in &keys {
        for output in block.scan_for_outputs(key.key) {
          assert_eq!(output.key(), key.key);
 
@@ -339,7 +339,17 @@ impl ContinuallyRan for ScanTask {
      let in_instructions =
        in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::>();
 
      // Send the InInstructions to the report task
-     ScanToReportDb::::send_in_instructions(&mut txn, b, &in_instructions);
+     // We need to also specify which key is responsible for signing the Batch for these, which
+     // will always be the oldest key (as the new key signing the Batch signifies handover
+     // acceptance)
+     ScanToReportDb::::send_in_instructions(
+       &mut txn,
+       b,
+       &InInstructionData {
+         external_key_for_session_to_sign_batch: keys[0].key,
+         returnable_in_instructions: in_instructions,
+       },
+     );
 
      // Send the scan data to the eventuality task
      ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data);
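The comment above captures the handover rule: InInstructions are always attributed to the oldest active key, because a Batch signed by the newer key is what signals handover acceptance. A toy illustration of that selection (SessionKey is a hypothetical stand-in for the scanner's key entries, not a type from this patch):

// Hypothetical stand-in for the scanner's per-session key entries.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SessionKey {
  activation_block: u64,
  key: [u8; 32],
}

// `keys` is ordered oldest-first, so the batch-signing key is simply the first entry,
// matching `keys[0].key` in the hunk above.
fn key_to_sign_batch(keys: &[SessionKey]) -> [u8; 32] {
  keys[0].key
}

fn main() {
  let keys = [
    SessionKey { activation_block: 0, key: [1; 32] },
    SessionKey { activation_block: 100, key: [2; 32] },
  ];
  assert_eq!(key_to_sign_batch(&keys), [1; 32]);
}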
diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs
index d67be9dc..6f9cd86b 100644
--- a/processor/scanner/src/substrate/mod.rs
+++ b/processor/scanner/src/substrate/mod.rs
@@ -6,7 +6,7 @@ use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
 use primitives::task::ContinuallyRan;
 
 use crate::{
-  db::{ScannerGlobalDb, SubstrateToEventualityDb},
+  db::{ScannerGlobalDb, SubstrateToEventualityDb, AcknowledgedBatch},
   report, ScannerFeed, KeyFor,
 };
 
@@ -79,6 +79,12 @@ impl ContinuallyRan for SubstrateTask {
           return Ok(made_progress);
         };
 
+        {
+          let external_key_for_session_to_sign_batch =
+            report::take_external_key_for_session_to_sign_batch::(&mut txn, batch_id).unwrap();
+          AcknowledgedBatch::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id);
+        }
+
         // Mark we made progress and handle this
         made_progress = true;
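Taken together, the pair of channels named in the commit subject gives a Batch signer a database-backed lifecycle: the report task queues batches on BatchToSign, and once Serai acknowledges one, the substrate task above announces its ID on AcknowledgedBatch under the same external key. A rough sketch of how a consumer might poll both, again assuming `use serai_db::{Db, DbTxn};` and the crate's Batch import; start_signing and on_acknowledged are hypothetical callbacks, not APIs from this patch:

// Sketch only: poll both channels for one key within a single iteration.
fn poll_batch_channels<S: ScannerFeed>(
  db: &mut impl Db,
  key: &KeyFor<S>,
  mut start_signing: impl FnMut(Batch),
  mut on_acknowledged: impl FnMut(u32),
) {
  let mut txn = db.txn();
  // New batches produced by the report task, to be signed and published.
  while let Some(batch) = BatchToSign::<KeyFor<S>>::try_recv(&mut txn, key) {
    start_signing(batch);
  }
  // Batch IDs Serai has acknowledged, forwarded by the substrate task; work on these can be
  // wound down.
  while let Some(batch_id) = AcknowledgedBatch::<KeyFor<S>>::try_recv(&mut txn, key) {
    on_acknowledged(batch_id);
  }
  txn.commit();
}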