From e78236276aee0cc10dacbe473c07715ebc2b3494 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 13 Sep 2024 01:14:47 -0400 Subject: [PATCH] Remove async-trait from processor/ Part of https://github.com/serai-dex/issues/607. --- processor/bin/Cargo.toml | 1 - processor/bin/src/coordinator.rs | 84 ++-- processor/bitcoin/Cargo.toml | 1 - processor/bitcoin/src/main.rs | 2 - processor/bitcoin/src/primitives/block.rs | 1 - processor/bitcoin/src/rpc.rs | 155 +++--- processor/bitcoin/src/txindex.rs | 144 +++--- processor/ethereum/Cargo.toml | 2 - processor/monero/Cargo.toml | 1 - processor/monero/src/primitives/block.rs | 1 - processor/monero/src/rpc.rs | 67 ++- processor/primitives/Cargo.toml | 2 - processor/primitives/src/block.rs | 1 - processor/primitives/src/task.rs | 101 ++-- processor/scanner/Cargo.toml | 3 - processor/scanner/src/eventuality/mod.rs | 586 +++++++++++----------- processor/scanner/src/index/mod.rs | 98 ++-- processor/scanner/src/lib.rs | 32 +- processor/scanner/src/report/mod.rs | 186 +++---- processor/scanner/src/scan/mod.rs | 475 +++++++++--------- processor/scanner/src/substrate/mod.rs | 184 +++---- processor/signers/Cargo.toml | 3 +- processor/signers/src/batch/mod.rs | 192 +++---- processor/signers/src/coordinator/mod.rs | 272 +++++----- processor/signers/src/cosign/mod.rs | 117 ++--- processor/signers/src/lib.rs | 27 +- processor/signers/src/slash_report.rs | 114 ++--- processor/signers/src/transaction/mod.rs | 5 +- processor/src/tests/scanner.rs | 2 +- 29 files changed, 1481 insertions(+), 1378 deletions(-) diff --git a/processor/bin/Cargo.toml b/processor/bin/Cargo.toml index 01a774ac..f6da8b7c 100644 --- a/processor/bin/Cargo.toml +++ b/processor/bin/Cargo.toml @@ -17,7 +17,6 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -async-trait = { version = "0.1", default-features = false } zeroize = { version = "1", default-features = false, features = ["std"] } hex = { version = "0.4", default-features = false, features = ["std"] } diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs index ead4a131..6fe5aea0 100644 --- a/processor/bin/src/coordinator.rs +++ b/processor/bin/src/coordinator.rs @@ -1,3 +1,4 @@ +use core::future::Future; use std::sync::{LazyLock, Arc, Mutex}; use tokio::sync::mpsc; @@ -169,59 +170,74 @@ impl Coordinator { } } -#[async_trait::async_trait] impl signers::Coordinator for CoordinatorSend { type EphemeralError = (); - async fn send( + fn send( &mut self, msg: messages::sign::ProcessorMessage, - ) -> Result<(), Self::EphemeralError> { - self.send(&messages::ProcessorMessage::Sign(msg)); - Ok(()) + ) -> impl Send + Future> { + async move { + self.send(&messages::ProcessorMessage::Sign(msg)); + Ok(()) + } } - async fn publish_cosign( + fn publish_cosign( &mut self, block_number: u64, block: [u8; 32], signature: Signature, - ) -> Result<(), Self::EphemeralError> { - self.send(&messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::CosignedBlock { - block_number, - block, - signature: signature.encode(), - }, - )); - Ok(()) + ) -> impl Send + Future> { + async move { + self.send(&messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::CosignedBlock { + block_number, + block, + signature: signature.encode(), + }, + )); + Ok(()) + } } - async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError> { - self.send(&messages::ProcessorMessage::Substrate( - messages::substrate::ProcessorMessage::Batch { batch }, - )); - Ok(()) + fn publish_batch( + &mut self, + batch: Batch, + ) -> impl Send + Future> { + async move { + self.send(&messages::ProcessorMessage::Substrate( + messages::substrate::ProcessorMessage::Batch { batch }, + )); + Ok(()) + } } - async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError> { - self.send(&messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::SignedBatch { batch }, - )); - Ok(()) + fn publish_signed_batch( + &mut self, + batch: SignedBatch, + ) -> impl Send + Future> { + async move { + self.send(&messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SignedBatch { batch }, + )); + Ok(()) + } } - async fn publish_slash_report_signature( + fn publish_slash_report_signature( &mut self, session: Session, signature: Signature, - ) -> Result<(), Self::EphemeralError> { - self.send(&messages::ProcessorMessage::Coordinator( - messages::coordinator::ProcessorMessage::SignedSlashReport { - session, - signature: signature.encode(), - }, - )); - Ok(()) + ) -> impl Send + Future> { + async move { + self.send(&messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SignedSlashReport { + session, + signature: signature.encode(), + }, + )); + Ok(()) + } } } diff --git a/processor/bitcoin/Cargo.toml b/processor/bitcoin/Cargo.toml index 2d4958c7..52cca1ae 100644 --- a/processor/bitcoin/Cargo.toml +++ b/processor/bitcoin/Cargo.toml @@ -17,7 +17,6 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -async-trait = { version = "0.1", default-features = false } rand_core = { version = "0.6", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index 74e174ee..56bfd619 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -96,7 +96,6 @@ use serai_client::{ */ /* -#[async_trait] impl TransactionTrait for Transaction { #[cfg(test)] async fn fee(&self, network: &Bitcoin) -> u64 { @@ -210,7 +209,6 @@ impl Bitcoin { } } -#[async_trait] impl Network for Bitcoin { // 2 inputs should be 2 * 230 = 460 weight units // The output should be ~36 bytes, or 144 weight units diff --git a/processor/bitcoin/src/primitives/block.rs b/processor/bitcoin/src/primitives/block.rs index 8221c8b5..e3df7e69 100644 --- a/processor/bitcoin/src/primitives/block.rs +++ b/processor/bitcoin/src/primitives/block.rs @@ -31,7 +31,6 @@ impl fmt::Debug for Block { } } -#[async_trait::async_trait] impl primitives::Block for Block { type Header = BlockHeader; diff --git a/processor/bitcoin/src/rpc.rs b/processor/bitcoin/src/rpc.rs index 23db5570..acd3be85 100644 --- a/processor/bitcoin/src/rpc.rs +++ b/processor/bitcoin/src/rpc.rs @@ -1,3 +1,5 @@ +use core::future::Future; + use bitcoin_serai::rpc::{RpcError, Rpc as BRpc}; use serai_client::primitives::{NetworkId, Coin, Amount}; @@ -18,7 +20,6 @@ pub(crate) struct Rpc { pub(crate) rpc: BRpc, } -#[async_trait::async_trait] impl ScannerFeed for Rpc { const NETWORK: NetworkId = NetworkId::Bitcoin; // 6 confirmations is widely accepted as secure and shouldn't occur @@ -32,71 +33,89 @@ impl ScannerFeed for Rpc { type EphemeralError = RpcError; - async fn latest_finalized_block_number(&self) -> Result { - db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError) + fn latest_finalized_block_number( + &self, + ) -> impl Send + Future> { + async move { db::LatestBlockToYieldAsFinalized::get(&self.db).ok_or(RpcError::ConnectionError) } } - async fn time_of_block(&self, number: u64) -> Result { - let number = usize::try_from(number).unwrap(); - - /* - The block time isn't guaranteed to be monotonic. It is guaranteed to be greater than the - median time of prior blocks, as detailed in BIP-0113 (a BIP which used that fact to improve - CLTV). This creates a monotonic median time which we use as the block time. - */ - // This implements `GetMedianTimePast` - let median = { - const MEDIAN_TIMESPAN: usize = 11; - let mut timestamps = Vec::with_capacity(MEDIAN_TIMESPAN); - for i in number.saturating_sub(MEDIAN_TIMESPAN) .. number { - timestamps.push(self.rpc.get_block(&self.rpc.get_block_hash(i).await?).await?.header.time); - } - timestamps.sort(); - timestamps[timestamps.len() / 2] - }; - - /* - This block's timestamp is guaranteed to be greater than this median: - https://github.com/bitcoin/bitcoin/blob/0725a374941355349bb4bc8a79dad1affb27d3b9 - /src/validation.cpp#L4182-L4184 - - This does not guarantee the median always increases however. Take the following trivial - example, as the window is initially built: - - 0 block has time 0 // Prior blocks: [] - 1 block has time 1 // Prior blocks: [0] - 2 block has time 2 // Prior blocks: [0, 1] - 3 block has time 2 // Prior blocks: [0, 1, 2] - - These two blocks have the same time (both greater than the median of their prior blocks) and - the same median. - - The median will never decrease however. The values pushed onto the window will always be - greater than the median. If a value greater than the median is popped, the median will remain - the same (due to the counterbalance of the pushed value). If a value less than the median is - popped, the median will increase (either to another instance of the same value, yet one - closer to the end of the repeating sequence, or to a higher value). - */ - Ok(median.into()) - } - - async fn unchecked_block_header_by_number( + fn time_of_block( &self, number: u64, - ) -> Result<::Header, Self::EphemeralError> { - Ok(BlockHeader( - self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?.header, - )) + ) -> impl Send + Future> { + async move { + let number = usize::try_from(number).unwrap(); + + /* + The block time isn't guaranteed to be monotonic. It is guaranteed to be greater than the + median time of prior blocks, as detailed in BIP-0113 (a BIP which used that fact to improve + CLTV). This creates a monotonic median time which we use as the block time. + */ + // This implements `GetMedianTimePast` + let median = { + const MEDIAN_TIMESPAN: usize = 11; + let mut timestamps = Vec::with_capacity(MEDIAN_TIMESPAN); + for i in number.saturating_sub(MEDIAN_TIMESPAN) .. number { + timestamps + .push(self.rpc.get_block(&self.rpc.get_block_hash(i).await?).await?.header.time); + } + timestamps.sort(); + timestamps[timestamps.len() / 2] + }; + + /* + This block's timestamp is guaranteed to be greater than this median: + https://github.com/bitcoin/bitcoin/blob/0725a374941355349bb4bc8a79dad1affb27d3b9 + /src/validation.cpp#L4182-L4184 + + This does not guarantee the median always increases however. Take the following trivial + example, as the window is initially built: + + 0 block has time 0 // Prior blocks: [] + 1 block has time 1 // Prior blocks: [0] + 2 block has time 2 // Prior blocks: [0, 1] + 3 block has time 2 // Prior blocks: [0, 1, 2] + + These two blocks have the same time (both greater than the median of their prior blocks) and + the same median. + + The median will never decrease however. The values pushed onto the window will always be + greater than the median. If a value greater than the median is popped, the median will + remain the same (due to the counterbalance of the pushed value). If a value less than the + median is popped, the median will increase (either to another instance of the same value, + yet one closer to the end of the repeating sequence, or to a higher value). + */ + Ok(median.into()) + } } - async fn unchecked_block_by_number( + fn unchecked_block_header_by_number( &self, number: u64, - ) -> Result { - Ok(Block( - self.db.clone(), - self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?, - )) + ) -> impl Send + + Future::Header, Self::EphemeralError>> + { + async move { + Ok(BlockHeader( + self + .rpc + .get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?) + .await? + .header, + )) + } + } + + fn unchecked_block_by_number( + &self, + number: u64, + ) -> impl Send + Future> { + async move { + Ok(Block( + self.db.clone(), + self.rpc.get_block(&self.rpc.get_block_hash(number.try_into().unwrap()).await?).await?, + )) + } } fn dust(coin: Coin) -> Amount { @@ -137,22 +156,26 @@ impl ScannerFeed for Rpc { Amount(10_000) } - async fn cost_to_aggregate( + fn cost_to_aggregate( &self, coin: Coin, _reference_block: &Self::Block, - ) -> Result { - assert_eq!(coin, Coin::Bitcoin); - // TODO - Ok(Amount(0)) + ) -> impl Send + Future> { + async move { + assert_eq!(coin, Coin::Bitcoin); + // TODO + Ok(Amount(0)) + } } } -#[async_trait::async_trait] impl TransactionPublisher for Rpc { type EphemeralError = RpcError; - async fn publish(&self, tx: Transaction) -> Result<(), Self::EphemeralError> { - self.rpc.send_raw_transaction(&tx.0).await.map(|_| ()) + fn publish( + &self, + tx: Transaction, + ) -> impl Send + Future> { + async move { self.rpc.send_raw_transaction(&tx.0).await.map(|_| ()) } } } diff --git a/processor/bitcoin/src/txindex.rs b/processor/bitcoin/src/txindex.rs index 4ed38973..6a55a4c4 100644 --- a/processor/bitcoin/src/txindex.rs +++ b/processor/bitcoin/src/txindex.rs @@ -1,18 +1,4 @@ -/* - We want to be able to return received outputs. We do that by iterating over the inputs to find an - address format we recognize, then setting that address as the address to return to. - - Since inputs only contain the script signatures, yet addresses are for script public keys, we - need to pull up the output spent by an input and read the script public key from that. While we - could use `txindex=1`, and an asynchronous call to the Bitcoin node, we: - - 1) Can maintain a much smaller index ourselves - 2) Don't want the asynchronous call (which would require the flow be async, allowed to - potentially error, and more latent) - 3) Don't want to risk Bitcoin's `txindex` corruptions (frequently observed on testnet) - - This task builds that index. -*/ +use core::future::Future; use bitcoin_serai::bitcoin::ScriptBuf; @@ -35,72 +21,88 @@ pub(crate) fn script_pubkey_for_on_chain_output( ) } +/* + We want to be able to return received outputs. We do that by iterating over the inputs to find an + address format we recognize, then setting that address as the address to return to. + + Since inputs only contain the script signatures, yet addresses are for script public keys, we + need to pull up the output spent by an input and read the script public key from that. While we + could use `txindex=1`, and an asynchronous call to the Bitcoin node, we: + + 1) Can maintain a much smaller index ourselves + 2) Don't want the asynchronous call (which would require the flow be async, allowed to + potentially error, and more latent) + 3) Don't want to risk Bitcoin's `txindex` corruptions (frequently observed on testnet) + + This task builds that index. +*/ pub(crate) struct TxIndexTask(pub(crate) Rpc); -#[async_trait::async_trait] impl ContinuallyRan for TxIndexTask { - async fn run_iteration(&mut self) -> Result { - let latest_block_number = self - .0 - .rpc - .get_latest_block_number() - .await - .map_err(|e| format!("couldn't fetch latest block number: {e:?}"))?; - let latest_block_number = u64::try_from(latest_block_number).unwrap(); - // `CONFIRMATIONS - 1` as any on-chain block inherently has one confirmation (itself) - let finalized_block_number = - latest_block_number.checked_sub(Rpc::::CONFIRMATIONS - 1).ok_or(format!( - "blockchain only just started and doesn't have {} blocks yet", - Rpc::::CONFIRMATIONS - ))?; - - /* - `finalized_block_number` is the latest block number minus confirmations. The blockchain may - undetectably re-organize though, as while the scanner will maintain an index of finalized - blocks and panics on reorganization, this runs prior to the scanner and that index. - - A reorganization of `CONFIRMATIONS` blocks is still an invariant. Even if that occurs, this - saves the script public keys *by the transaction hash an output index*. Accordingly, it isn't - invalidated on reorganization. The only risk would be if the new chain reorganized to - include a transaction to Serai which we didn't index the parents of. If that happens, we'll - panic when we scan the transaction, causing the invariant to be detected. - */ - - let finalized_block_number_in_db = db::LatestBlockToYieldAsFinalized::get(&self.0.db); - let next_block = finalized_block_number_in_db.map_or(0, |block| block + 1); - - let mut iterated = false; - for b in next_block ..= finalized_block_number { - iterated = true; - - // Fetch the block - let block_hash = self + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let latest_block_number = self .0 .rpc - .get_block_hash(b.try_into().unwrap()) + .get_latest_block_number() .await - .map_err(|e| format!("couldn't fetch block hash for block {b}: {e:?}"))?; - let block = self - .0 - .rpc - .get_block(&block_hash) - .await - .map_err(|e| format!("couldn't fetch block {b}: {e:?}"))?; + .map_err(|e| format!("couldn't fetch latest block number: {e:?}"))?; + let latest_block_number = u64::try_from(latest_block_number).unwrap(); + // `CONFIRMATIONS - 1` as any on-chain block inherently has one confirmation (itself) + let finalized_block_number = + latest_block_number.checked_sub(Rpc::::CONFIRMATIONS - 1).ok_or(format!( + "blockchain only just started and doesn't have {} blocks yet", + Rpc::::CONFIRMATIONS + ))?; - let mut txn = self.0.db.txn(); + /* + `finalized_block_number` is the latest block number minus confirmations. The blockchain may + undetectably re-organize though, as while the scanner will maintain an index of finalized + blocks and panics on reorganization, this runs prior to the scanner and that index. - for tx in &block.txdata { - let txid = hash_bytes(tx.compute_txid().to_raw_hash()); - for (o, output) in tx.output.iter().enumerate() { - let o = u32::try_from(o).unwrap(); - // Set the script public key for this transaction - db::ScriptPubKey::set(&mut txn, txid, o, &output.script_pubkey.clone().into_bytes()); + A reorganization of `CONFIRMATIONS` blocks is still an invariant. Even if that occurs, this + saves the script public keys *by the transaction hash an output index*. Accordingly, it + isn't invalidated on reorganization. The only risk would be if the new chain reorganized to + include a transaction to Serai which we didn't index the parents of. If that happens, we'll + panic when we scan the transaction, causing the invariant to be detected. + */ + + let finalized_block_number_in_db = db::LatestBlockToYieldAsFinalized::get(&self.0.db); + let next_block = finalized_block_number_in_db.map_or(0, |block| block + 1); + + let mut iterated = false; + for b in next_block ..= finalized_block_number { + iterated = true; + + // Fetch the block + let block_hash = self + .0 + .rpc + .get_block_hash(b.try_into().unwrap()) + .await + .map_err(|e| format!("couldn't fetch block hash for block {b}: {e:?}"))?; + let block = self + .0 + .rpc + .get_block(&block_hash) + .await + .map_err(|e| format!("couldn't fetch block {b}: {e:?}"))?; + + let mut txn = self.0.db.txn(); + + for tx in &block.txdata { + let txid = hash_bytes(tx.compute_txid().to_raw_hash()); + for (o, output) in tx.output.iter().enumerate() { + let o = u32::try_from(o).unwrap(); + // Set the script public key for this transaction + db::ScriptPubKey::set(&mut txn, txid, o, &output.script_pubkey.clone().into_bytes()); + } } - } - db::LatestBlockToYieldAsFinalized::set(&mut txn, &b); - txn.commit(); + db::LatestBlockToYieldAsFinalized::set(&mut txn, &b); + txn.commit(); + } + Ok(iterated) } - Ok(iterated) } } diff --git a/processor/ethereum/Cargo.toml b/processor/ethereum/Cargo.toml index eff47af9..ea65d570 100644 --- a/processor/ethereum/Cargo.toml +++ b/processor/ethereum/Cargo.toml @@ -17,8 +17,6 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -async-trait = { version = "0.1", default-features = false } - const-hex = { version = "1", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } diff --git a/processor/monero/Cargo.toml b/processor/monero/Cargo.toml index f70d6187..22137b2d 100644 --- a/processor/monero/Cargo.toml +++ b/processor/monero/Cargo.toml @@ -17,7 +17,6 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -async-trait = { version = "0.1", default-features = false } rand_core = { version = "0.6", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } diff --git a/processor/monero/src/primitives/block.rs b/processor/monero/src/primitives/block.rs index 40d0f296..ad28b0c1 100644 --- a/processor/monero/src/primitives/block.rs +++ b/processor/monero/src/primitives/block.rs @@ -24,7 +24,6 @@ impl primitives::BlockHeader for BlockHeader { #[derive(Clone, Debug)] pub(crate) struct Block(pub(crate) MBlock, Vec); -#[async_trait::async_trait] impl primitives::Block for Block { type Header = BlockHeader; diff --git a/processor/monero/src/rpc.rs b/processor/monero/src/rpc.rs index 21a202cc..0e0739b8 100644 --- a/processor/monero/src/rpc.rs +++ b/processor/monero/src/rpc.rs @@ -1,3 +1,5 @@ +use core::future::Future; + use monero_wallet::rpc::{RpcError, Rpc as RpcTrait}; use monero_simple_request_rpc::SimpleRequestRpc; @@ -16,7 +18,6 @@ pub(crate) struct Rpc { pub(crate) rpc: SimpleRequestRpc, } -#[async_trait::async_trait] impl ScannerFeed for Rpc { const NETWORK: NetworkId = NetworkId::Monero; // Outputs aren't spendable until 10 blocks later due to the 10-block lock @@ -32,28 +33,44 @@ impl ScannerFeed for Rpc { type EphemeralError = RpcError; - async fn latest_finalized_block_number(&self) -> Result { - Ok(self.rpc.get_height().await?.checked_sub(1).expect("connected to an invalid Monero RPC").try_into().unwrap()) + fn latest_finalized_block_number( + &self, + ) -> impl Send + Future> { + async move { + Ok( + self + .rpc + .get_height() + .await? + .checked_sub(1) + .expect("connected to an invalid Monero RPC") + .try_into() + .unwrap(), + ) + } } - async fn time_of_block(&self, number: u64) -> Result { - todo!("TODO") - } - - async fn unchecked_block_header_by_number( + fn time_of_block( &self, number: u64, - ) -> Result<::Header, Self::EphemeralError> { - Ok(BlockHeader( - self.rpc.get_block_by_number(number.try_into().unwrap()).await? - )) + ) -> impl Send + Future> { + async move{todo!("TODO")} } - async fn unchecked_block_by_number( + fn unchecked_block_header_by_number( &self, number: u64, - ) -> Result { - todo!("TODO") + ) -> impl Send + + Future::Header, Self::EphemeralError>> + { + async move { Ok(BlockHeader(self.rpc.get_block_by_number(number.try_into().unwrap()).await?)) } + } + + fn unchecked_block_by_number( + &self, + number: u64, + ) -> impl Send + Future> { + async move { todo!("TODO") } } fn dust(coin: Coin) -> Amount { @@ -62,22 +79,26 @@ impl ScannerFeed for Rpc { todo!("TODO") } - async fn cost_to_aggregate( + fn cost_to_aggregate( &self, coin: Coin, _reference_block: &Self::Block, - ) -> Result { - assert_eq!(coin, Coin::Bitcoin); - // TODO - Ok(Amount(0)) + ) -> impl Send + Future> { + async move { + assert_eq!(coin, Coin::Bitcoin); + // TODO + Ok(Amount(0)) + } } } -#[async_trait::async_trait] impl TransactionPublisher for Rpc { type EphemeralError = RpcError; - async fn publish(&self, tx: Transaction) -> Result<(), Self::EphemeralError> { - self.rpc.publish_transaction(&tx.0).await + fn publish( + &self, + tx: Transaction, + ) -> impl Send + Future> { + async move { self.rpc.publish_transaction(&tx.0).await } } } diff --git a/processor/primitives/Cargo.toml b/processor/primitives/Cargo.toml index dd1b74ea..6dd3082b 100644 --- a/processor/primitives/Cargo.toml +++ b/processor/primitives/Cargo.toml @@ -17,8 +17,6 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -async-trait = { version = "0.1", default-features = false } - group = { version = "0.13", default-features = false } serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] } diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index 4f721d02..da481247 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -22,7 +22,6 @@ pub trait BlockHeader: Send + Sync + Sized + Clone + Debug { /// necessary to literally define it as whatever the external network defines as a block. For /// external networks which finalize block(s), this block type should be a representation of all /// transactions within a period finalization (whether block or epoch). -#[async_trait::async_trait] pub trait Block: Send + Sync + Sized + Clone + Debug { /// The type used for this block's header. type Header: BlockHeader; diff --git a/processor/primitives/src/task.rs b/processor/primitives/src/task.rs index a40fb9ff..e8efc64c 100644 --- a/processor/primitives/src/task.rs +++ b/processor/primitives/src/task.rs @@ -1,4 +1,4 @@ -use core::time::Duration; +use core::{future::Future, time::Duration}; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; @@ -78,8 +78,7 @@ impl TaskHandle { } /// A task to be continually ran. -#[async_trait::async_trait] -pub trait ContinuallyRan: Sized { +pub trait ContinuallyRan: Sized + Send { /// The amount of seconds before this task should be polled again. const DELAY_BETWEEN_ITERATIONS: u64 = 5; /// The maximum amount of seconds before this task should be run again. @@ -91,60 +90,66 @@ pub trait ContinuallyRan: Sized { /// /// If this returns `true`, all dependents of the task will immediately have a new iteration ran /// (without waiting for whatever timer they were already on). - async fn run_iteration(&mut self) -> Result; + fn run_iteration(&mut self) -> impl Send + Future>; /// Continually run the task. - async fn continually_run(mut self, mut task: Task, dependents: Vec) { - // The default number of seconds to sleep before running the task again - let default_sleep_before_next_task = Self::DELAY_BETWEEN_ITERATIONS; - // The current number of seconds to sleep before running the task again - // We increment this upon errors in order to not flood the logs with errors - let mut current_sleep_before_next_task = default_sleep_before_next_task; - let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| { - let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task; - // Set a limit of sleeping for two minutes - *current_sleep_before_next_task = new_sleep.max(Self::MAX_DELAY_BETWEEN_ITERATIONS); - }; + fn continually_run( + mut self, + mut task: Task, + dependents: Vec, + ) -> impl Send + Future { + async move { + // The default number of seconds to sleep before running the task again + let default_sleep_before_next_task = Self::DELAY_BETWEEN_ITERATIONS; + // The current number of seconds to sleep before running the task again + // We increment this upon errors in order to not flood the logs with errors + let mut current_sleep_before_next_task = default_sleep_before_next_task; + let increase_sleep_before_next_task = |current_sleep_before_next_task: &mut u64| { + let new_sleep = *current_sleep_before_next_task + default_sleep_before_next_task; + // Set a limit of sleeping for two minutes + *current_sleep_before_next_task = new_sleep.max(Self::MAX_DELAY_BETWEEN_ITERATIONS); + }; - loop { - // If we were told to close/all handles were dropped, drop it - { - let should_close = task.close.try_recv(); - match should_close { - Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => break, - Err(mpsc::error::TryRecvError::Empty) => {} + loop { + // If we were told to close/all handles were dropped, drop it + { + let should_close = task.close.try_recv(); + match should_close { + Ok(()) | Err(mpsc::error::TryRecvError::Disconnected) => break, + Err(mpsc::error::TryRecvError::Empty) => {} + } } - } - match self.run_iteration().await { - Ok(run_dependents) => { - // Upon a successful (error-free) loop iteration, reset the amount of time we sleep - current_sleep_before_next_task = default_sleep_before_next_task; + match self.run_iteration().await { + Ok(run_dependents) => { + // Upon a successful (error-free) loop iteration, reset the amount of time we sleep + current_sleep_before_next_task = default_sleep_before_next_task; - if run_dependents { - for dependent in &dependents { - dependent.run_now(); + if run_dependents { + for dependent in &dependents { + dependent.run_now(); + } } } - } - Err(e) => { - log::warn!("{}", e); - increase_sleep_before_next_task(&mut current_sleep_before_next_task); - } - } - - // Don't run the task again for another few seconds UNLESS told to run now - tokio::select! { - () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, - msg = task.run_now.recv() => { - // Check if this is firing because the handle was dropped - if msg.is_none() { - break; + Err(e) => { + log::warn!("{}", e); + increase_sleep_before_next_task(&mut current_sleep_before_next_task); } - }, - } - } + } - task.closed.send(()).unwrap(); + // Don't run the task again for another few seconds UNLESS told to run now + tokio::select! { + () = tokio::time::sleep(Duration::from_secs(current_sleep_before_next_task)) => {}, + msg = task.run_now.recv() => { + // Check if this is firing because the handle was dropped + if msg.is_none() { + break; + } + }, + } + } + + task.closed.send(()).unwrap(); + } } } diff --git a/processor/scanner/Cargo.toml b/processor/scanner/Cargo.toml index e3e08329..1ff154cd 100644 --- a/processor/scanner/Cargo.toml +++ b/processor/scanner/Cargo.toml @@ -17,9 +17,6 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -# Macros -async-trait = { version = "0.1", default-features = false } - # Encoders hex = { version = "0.4", default-features = false, features = ["std"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 5d139c6d..46a5e13b 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -1,4 +1,4 @@ -use core::marker::PhantomData; +use core::{marker::PhantomData, future::Future}; use std::collections::{HashSet, HashMap}; use group::GroupEncoding; @@ -185,317 +185,323 @@ impl> EventualityTask { } } -#[async_trait::async_trait] impl> ContinuallyRan for EventualityTask { - async fn run_iteration(&mut self) -> Result { - // Fetch the highest acknowledged block - let Some(highest_acknowledged) = ScannerGlobalDb::::highest_acknowledged_block(&self.db) - else { - // If we've never acknowledged a block, return - return Ok(false); - }; - - // A boolean of if we've made any progress to return at the end of the function - let mut made_progress = false; - - // Start by intaking any Burns we have sitting around - // It's important we run this regardless of if we have a new block to handle - made_progress |= self.intake_burns().await?; - - /* - Eventualities increase upon one of two cases: - - 1) We're fulfilling Burns - 2) We acknowledged a block - - We can't know the processor has intaked all Burns it should have when we process block `b`. - We solve this by executing a consensus protocol whenever a resolution for an Eventuality - created to fulfill Burns occurs. Accordingly, we force ourselves to obtain synchrony on such - blocks (and all preceding Burns). - - This means we can only iterate up to the block currently pending acknowledgement. - - We only know blocks will need acknowledgement *for sure* if they were scanned. The only other - causes are key activation and retirement (both scheduled outside the scan window). This makes - the exclusive upper bound the *next block to scan*. - */ - let exclusive_upper_bound = { - // Fetch the next to scan block - let next_to_scan = next_to_scan_for_outputs_block::(&self.db) - .expect("EventualityTask run before writing the start block"); - // If we haven't done any work, return - if next_to_scan == 0 { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + // Fetch the highest acknowledged block + let Some(highest_acknowledged) = ScannerGlobalDb::::highest_acknowledged_block(&self.db) + else { + // If we've never acknowledged a block, return return Ok(false); - } - next_to_scan - }; + }; - // Fetch the next block to check - let next_to_check = EventualityDb::::next_to_check_for_eventualities_block(&self.db) - .expect("EventualityTask run before writing the start block"); + // A boolean of if we've made any progress to return at the end of the function + let mut made_progress = false; - // Check all blocks - for b in next_to_check .. exclusive_upper_bound { - let is_block_notable = ScannerGlobalDb::::is_block_notable(&self.db, b); - if is_block_notable { - /* - If this block is notable *and* not acknowledged, break. + // Start by intaking any Burns we have sitting around + // It's important we run this regardless of if we have a new block to handle + made_progress |= self.intake_burns().await?; - This is so if Burns queued prior to this block's acknowledgement caused any Eventualities - (which may resolve this block), we have them. If it wasn't for that, it'd be so if this - block's acknowledgement caused any Eventualities, we have them, though those would only - potentially resolve in the next block (letting us scan this block without delay). - */ - if b > highest_acknowledged { - break; + /* + Eventualities increase upon one of two cases: + + 1) We're fulfilling Burns + 2) We acknowledged a block + + We can't know the processor has intaked all Burns it should have when we process block `b`. + We solve this by executing a consensus protocol whenever a resolution for an Eventuality + created to fulfill Burns occurs. Accordingly, we force ourselves to obtain synchrony on + such blocks (and all preceding Burns). + + This means we can only iterate up to the block currently pending acknowledgement. + + We only know blocks will need acknowledgement *for sure* if they were scanned. The only + other causes are key activation and retirement (both scheduled outside the scan window). + This makes the exclusive upper bound the *next block to scan*. + */ + let exclusive_upper_bound = { + // Fetch the next to scan block + let next_to_scan = next_to_scan_for_outputs_block::(&self.db) + .expect("EventualityTask run before writing the start block"); + // If we haven't done any work, return + if next_to_scan == 0 { + return Ok(false); } + next_to_scan + }; - // Since this block is notable, ensure we've intaked all the Burns preceding it - // We can know with certainty that the channel is fully populated at this time since we've - // acknowledged a newer block (so we've handled the state up to this point and any new - // state will be for the newer block) - #[allow(unused_assignments)] - { - made_progress |= self.intake_burns().await?; - } - } + // Fetch the next block to check + let next_to_check = EventualityDb::::next_to_check_for_eventualities_block(&self.db) + .expect("EventualityTask run before writing the start block"); - // Since we're handling this block, we are making progress - made_progress = true; - - let block = self.feed.block_by_number(&self.db, b).await?; - - log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); - - let (keys, keys_with_stages) = self.keys_and_keys_with_stages(b); - - let mut txn = self.db.txn(); - - // Fetch the data from the scanner - let scan_data = ScanToEventualityDb::recv_scan_data(&mut txn, b); - assert_eq!(scan_data.block_number, b); - let ReceiverScanData { block_number: _, received_external_outputs, forwards, returns } = - scan_data; - let mut outputs = received_external_outputs; - - for key in &keys { - // If this is the key's activation block, activate it - if key.activation_block_number == b { - Sch::activate_key(&mut txn, key.key); - } - - let completed_eventualities = { - let mut eventualities = EventualityDb::::eventualities(&txn, key.key); - let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities); - EventualityDb::::set_eventualities(&mut txn, key.key, &eventualities); - completed_eventualities - }; - - for (tx, eventuality) in &completed_eventualities { - log::info!( - "eventuality {} resolved by {}", - hex::encode(eventuality.id()), - hex::encode(tx.as_ref()) - ); - CompletedEventualities::send(&mut txn, &key.key, eventuality.id()); - } - - // Fetch all non-External outputs - let mut non_external_outputs = block.scan_for_outputs(key.key); - non_external_outputs.retain(|output| output.kind() != OutputType::External); - // Drop any outputs less than the dust limit - non_external_outputs.retain(|output| { - let balance = output.balance(); - balance.amount.0 >= S::dust(balance.coin).0 - }); - - /* - Now that we have all non-External outputs, we filter them to be only the outputs which - are from transactions which resolve our own Eventualities *if* the multisig is retiring. - This implements step 6 of `spec/processor/Multisig Rotation.md`. - - We may receive a Change output. The only issue with accumulating this would be if it - extends the multisig's lifetime (by increasing the amount of outputs yet to be - forwarded). By checking it's one we made, either: - 1) It's a legitimate Change output to be forwarded - 2) It's a Change output created by a user burning coins (specifying the Change address), - which can only be created while the multisig is actively handling `Burn`s (therefore - ensuring this multisig cannot be kept alive ad-infinitum) - - The commentary on Change outputs also applies to Branch/Forwarded. They'll presumably get - ignored if not usable however. - */ - if key.stage == LifetimeStage::Finishing { - non_external_outputs - .retain(|output| completed_eventualities.contains_key(&output.transaction_id())); - } - - // Finally, for non-External outputs we didn't make, we check they're worth more than the - // cost to aggregate them to avoid some profitable spam attacks by malicious miners - { - // Fetch and cache the costs to aggregate as this call may be expensive - let coins = - non_external_outputs.iter().map(|output| output.balance().coin).collect::>(); - let mut costs_to_aggregate = HashMap::new(); - for coin in coins { - costs_to_aggregate.insert( - coin, - self.feed.cost_to_aggregate(coin, &block).await.map_err(|e| { - format!("EventualityTask couldn't fetch cost to aggregate {coin:?} at {b}: {e:?}") - })?, - ); - } - - // Only retain out outputs/outputs sufficiently worthwhile - non_external_outputs.retain(|output| { - completed_eventualities.contains_key(&output.transaction_id()) || { - let balance = output.balance(); - balance.amount.0 >= (2 * costs_to_aggregate[&balance.coin].0) - } - }); - } - - // Now, we iterate over all Forwarded outputs and queue their InInstructions - for output in - non_external_outputs.iter().filter(|output| output.kind() == OutputType::Forwarded) - { - let Some(eventuality) = completed_eventualities.get(&output.transaction_id()) else { - // Output sent to the forwarding address yet not one we made - continue; - }; - let Some(forwarded) = eventuality.singular_spent_output() else { - // This was a TX made by us, yet someone burned to the forwarding address as it doesn't - // follow the structure of forwarding transactions - continue; - }; - - let Some((return_address, mut in_instruction)) = - ScannerGlobalDb::::return_address_and_in_instruction_for_forwarded_output( - &txn, &forwarded, - ) - else { - // This was a TX made by us, coincidentally with the necessary structure, yet wasn't - // forwarding an output - continue; - }; - - // We use the original amount, minus twice the cost to aggregate - // If the fees we paid to forward this now (less than the cost to aggregate now, yet not - // necessarily the cost to aggregate historically) caused this amount to be less, reduce - // it accordingly - in_instruction.balance.amount.0 = - in_instruction.balance.amount.0.min(output.balance().amount.0); - - queue_output_until_block::( - &mut txn, - b + S::WINDOW_LENGTH, - &OutputWithInInstruction { output: output.clone(), return_address, in_instruction }, - ); - } - - // Accumulate all of these outputs - outputs.extend(non_external_outputs); - } - - // Update the scheduler - { - let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns }; - scheduler_update.outputs.sort_by(sort_outputs); - scheduler_update.forwards.sort_by(sort_outputs); - scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output)); - - let empty = { - let a: core::slice::Iter<'_, OutputFor> = scheduler_update.outputs.iter(); - let b: core::slice::Iter<'_, OutputFor> = scheduler_update.forwards.iter(); - let c = scheduler_update.returns.iter().map(|output_to_return| &output_to_return.output); - let mut all_outputs = a.chain(b).chain(c).peekable(); - - // If we received any output, sanity check this block is notable - let empty = all_outputs.peek().is_none(); - if !empty { - assert!(is_block_notable, "accumulating output(s) in non-notable block"); - } - - // Sanity check we've never accumulated these outputs before - for output in all_outputs { - assert!( - !EventualityDb::::prior_accumulated_output(&txn, &output.id()), - "prior accumulated an output with this ID" - ); - EventualityDb::::accumulated_output(&mut txn, &output.id()); - } - - empty - }; - - if !empty { - // Accumulate the outputs + // Check all blocks + for b in next_to_check .. exclusive_upper_bound { + let is_block_notable = ScannerGlobalDb::::is_block_notable(&self.db, b); + if is_block_notable { /* - This uses the `keys_with_stages` for the current block, yet this block is notable. - Accordingly, all future intaked Burns will use at least this block when determining - what LifetimeStage a key is. That makes the LifetimeStage monotonically incremented. If - this block wasn't notable, we'd potentially intake Burns with the LifetimeStage - determined off an earlier block than this (enabling an earlier LifetimeStage to be used - after a later one was already used). + If this block is notable *and* not acknowledged, break. + + This is so if Burns queued prior to this block's acknowledgement caused any + Eventualities (which may resolve this block), we have them. If it wasn't for that, it'd + be so if this block's acknowledgement caused any Eventualities, we have them, though + those would only potentially resolve in the next block (letting us scan this block + without delay). */ - let new_eventualities = - Sch::update(&mut txn, &block, &keys_with_stages, scheduler_update); - // Intake the new Eventualities - for key in new_eventualities.keys() { - keys - .iter() - .find(|serai_key| serai_key.key.to_bytes().as_ref() == key.as_slice()) - .expect("intaking Eventuality for key which isn't active"); + if b > highest_acknowledged { + break; } - intake_eventualities::(&mut txn, new_eventualities); - } - } - for key in &keys { - // If this is the block at which forwarding starts for this key, flush it - // We do this after we issue the above update for any efficiencies gained by doing so - if key.block_at_which_forwarding_starts == Some(b) { - assert!( - key.key != keys.last().unwrap().key, - "key which was forwarding was the last key (which has no key after it to forward to)" - ); - let new_eventualities = - Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key); - intake_eventualities::(&mut txn, new_eventualities); + // Since this block is notable, ensure we've intaked all the Burns preceding it + // We can know with certainty that the channel is fully populated at this time since + // we've acknowledged a newer block (so we've handled the state up to this point and any + // new state will be for the newer block) + #[allow(unused_assignments)] + { + made_progress |= self.intake_burns().await?; + } } - // Now that we've intaked any Eventualities caused, check if we're retiring any keys - if key.stage == LifetimeStage::Finishing { - let eventualities = EventualityDb::::eventualities(&txn, key.key); - if eventualities.active_eventualities.is_empty() { + // Since we're handling this block, we are making progress + made_progress = true; + + let block = self.feed.block_by_number(&self.db, b).await?; + + log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); + + let (keys, keys_with_stages) = self.keys_and_keys_with_stages(b); + + let mut txn = self.db.txn(); + + // Fetch the data from the scanner + let scan_data = ScanToEventualityDb::recv_scan_data(&mut txn, b); + assert_eq!(scan_data.block_number, b); + let ReceiverScanData { block_number: _, received_external_outputs, forwards, returns } = + scan_data; + let mut outputs = received_external_outputs; + + for key in &keys { + // If this is the key's activation block, activate it + if key.activation_block_number == b { + Sch::activate_key(&mut txn, key.key); + } + + let completed_eventualities = { + let mut eventualities = EventualityDb::::eventualities(&txn, key.key); + let completed_eventualities = + block.check_for_eventuality_resolutions(&mut eventualities); + EventualityDb::::set_eventualities(&mut txn, key.key, &eventualities); + completed_eventualities + }; + + for (tx, eventuality) in &completed_eventualities { log::info!( - "key {} has finished and is being retired", - hex::encode(key.key.to_bytes().as_ref()) + "eventuality {} resolved by {}", + hex::encode(eventuality.id()), + hex::encode(tx.as_ref()) ); + CompletedEventualities::send(&mut txn, &key.key, eventuality.id()); + } - // Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never - // has a malleable view of the keys. - ScannerGlobalDb::::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key); + // Fetch all non-External outputs + let mut non_external_outputs = block.scan_for_outputs(key.key); + non_external_outputs.retain(|output| output.kind() != OutputType::External); + // Drop any outputs less than the dust limit + non_external_outputs.retain(|output| { + let balance = output.balance(); + balance.amount.0 >= S::dust(balance.coin).0 + }); - // We tell the scheduler to retire it now as we're done with it, and this fn doesn't - // require it be called with a canonical order - Sch::retire_key(&mut txn, key.key); + /* + Now that we have all non-External outputs, we filter them to be only the outputs which + are from transactions which resolve our own Eventualities *if* the multisig is retiring. + This implements step 6 of `spec/processor/Multisig Rotation.md`. + + We may receive a Change output. The only issue with accumulating this would be if it + extends the multisig's lifetime (by increasing the amount of outputs yet to be + forwarded). By checking it's one we made, either: + 1) It's a legitimate Change output to be forwarded + 2) It's a Change output created by a user burning coins (specifying the Change address), + which can only be created while the multisig is actively handling `Burn`s (therefore + ensuring this multisig cannot be kept alive ad-infinitum) + + The commentary on Change outputs also applies to Branch/Forwarded. They'll presumably + get ignored if not usable however. + */ + if key.stage == LifetimeStage::Finishing { + non_external_outputs + .retain(|output| completed_eventualities.contains_key(&output.transaction_id())); + } + + // Finally, for non-External outputs we didn't make, we check they're worth more than the + // cost to aggregate them to avoid some profitable spam attacks by malicious miners + { + // Fetch and cache the costs to aggregate as this call may be expensive + let coins = non_external_outputs + .iter() + .map(|output| output.balance().coin) + .collect::>(); + let mut costs_to_aggregate = HashMap::new(); + for coin in coins { + costs_to_aggregate.insert( + coin, + self.feed.cost_to_aggregate(coin, &block).await.map_err(|e| { + format!("EventualityTask couldn't fetch cost to aggregate {coin:?} at {b}: {e:?}") + })?, + ); + } + + // Only retain out outputs/outputs sufficiently worthwhile + non_external_outputs.retain(|output| { + completed_eventualities.contains_key(&output.transaction_id()) || { + let balance = output.balance(); + balance.amount.0 >= (2 * costs_to_aggregate[&balance.coin].0) + } + }); + } + + // Now, we iterate over all Forwarded outputs and queue their InInstructions + for output in + non_external_outputs.iter().filter(|output| output.kind() == OutputType::Forwarded) + { + let Some(eventuality) = completed_eventualities.get(&output.transaction_id()) else { + // Output sent to the forwarding address yet not one we made + continue; + }; + let Some(forwarded) = eventuality.singular_spent_output() else { + // This was a TX made by us, yet someone burned to the forwarding address as it + // doesn't follow the structure of forwarding transactions + continue; + }; + + let Some((return_address, mut in_instruction)) = + ScannerGlobalDb::::return_address_and_in_instruction_for_forwarded_output( + &txn, &forwarded, + ) + else { + // This was a TX made by us, coincidentally with the necessary structure, yet wasn't + // forwarding an output + continue; + }; + + // We use the original amount, minus twice the cost to aggregate + // If the fees we paid to forward this now (less than the cost to aggregate now, yet not + // necessarily the cost to aggregate historically) caused this amount to be less, reduce + // it accordingly + in_instruction.balance.amount.0 = + in_instruction.balance.amount.0.min(output.balance().amount.0); + + queue_output_until_block::( + &mut txn, + b + S::WINDOW_LENGTH, + &OutputWithInInstruction { output: output.clone(), return_address, in_instruction }, + ); + } + + // Accumulate all of these outputs + outputs.extend(non_external_outputs); + } + + // Update the scheduler + { + let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns }; + scheduler_update.outputs.sort_by(sort_outputs); + scheduler_update.forwards.sort_by(sort_outputs); + scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output)); + + let empty = { + let a: core::slice::Iter<'_, OutputFor> = scheduler_update.outputs.iter(); + let b: core::slice::Iter<'_, OutputFor> = scheduler_update.forwards.iter(); + let c = + scheduler_update.returns.iter().map(|output_to_return| &output_to_return.output); + let mut all_outputs = a.chain(b).chain(c).peekable(); + + // If we received any output, sanity check this block is notable + let empty = all_outputs.peek().is_none(); + if !empty { + assert!(is_block_notable, "accumulating output(s) in non-notable block"); + } + + // Sanity check we've never accumulated these outputs before + for output in all_outputs { + assert!( + !EventualityDb::::prior_accumulated_output(&txn, &output.id()), + "prior accumulated an output with this ID" + ); + EventualityDb::::accumulated_output(&mut txn, &output.id()); + } + + empty + }; + + if !empty { + // Accumulate the outputs + /* + This uses the `keys_with_stages` for the current block, yet this block is notable. + Accordingly, all future intaked Burns will use at least this block when determining + what LifetimeStage a key is. That makes the LifetimeStage monotonically incremented. + If this block wasn't notable, we'd potentially intake Burns with the LifetimeStage + determined off an earlier block than this (enabling an earlier LifetimeStage to be + used after a later one was already used). + */ + let new_eventualities = + Sch::update(&mut txn, &block, &keys_with_stages, scheduler_update); + // Intake the new Eventualities + for key in new_eventualities.keys() { + keys + .iter() + .find(|serai_key| serai_key.key.to_bytes().as_ref() == key.as_slice()) + .expect("intaking Eventuality for key which isn't active"); + } + intake_eventualities::(&mut txn, new_eventualities); } } + + for key in &keys { + // If this is the block at which forwarding starts for this key, flush it + // We do this after we issue the above update for any efficiencies gained by doing so + if key.block_at_which_forwarding_starts == Some(b) { + assert!( + key.key != keys.last().unwrap().key, + "key which was forwarding was the last key (which has no key after it to forward to)" + ); + let new_eventualities = + Sch::flush_key(&mut txn, &block, key.key, keys.last().unwrap().key); + intake_eventualities::(&mut txn, new_eventualities); + } + + // Now that we've intaked any Eventualities caused, check if we're retiring any keys + if key.stage == LifetimeStage::Finishing { + let eventualities = EventualityDb::::eventualities(&txn, key.key); + if eventualities.active_eventualities.is_empty() { + log::info!( + "key {} has finished and is being retired", + hex::encode(key.key.to_bytes().as_ref()) + ); + + // Retire this key `WINDOW_LENGTH` blocks in the future to ensure the scan task never + // has a malleable view of the keys. + ScannerGlobalDb::::retire_key(&mut txn, b + S::WINDOW_LENGTH, key.key); + + // We tell the scheduler to retire it now as we're done with it, and this fn doesn't + // require it be called with a canonical order + Sch::retire_key(&mut txn, key.key); + } + } + } + + // Update the next-to-check block + EventualityDb::::set_next_to_check_for_eventualities_block(&mut txn, next_to_check); + + // If this block was notable, update the latest-handled notable block + if is_block_notable { + EventualityDb::::set_latest_handled_notable_block(&mut txn, b); + } + + txn.commit(); } - // Update the next-to-check block - EventualityDb::::set_next_to_check_for_eventualities_block(&mut txn, next_to_check); - - // If this block was notable, update the latest-handled notable block - if is_block_notable { - EventualityDb::::set_latest_handled_notable_block(&mut txn, b); - } - - txn.commit(); + // Run dependents if we successfully checked any blocks + Ok(made_progress) } - - // Run dependents if we successfully checked any blocks - Ok(made_progress) } } diff --git a/processor/scanner/src/index/mod.rs b/processor/scanner/src/index/mod.rs index 930ce55a..03abc8a8 100644 --- a/processor/scanner/src/index/mod.rs +++ b/processor/scanner/src/index/mod.rs @@ -1,5 +1,6 @@ -use serai_db::{Get, DbTxn, Db}; +use core::future::Future; +use serai_db::{Get, DbTxn, Db}; use primitives::{task::ContinuallyRan, BlockHeader}; use crate::ScannerFeed; @@ -56,58 +57,59 @@ impl IndexTask { } } -#[async_trait::async_trait] impl ContinuallyRan for IndexTask { - async fn run_iteration(&mut self) -> Result { - // Fetch the latest finalized block - let our_latest_finalized = IndexDb::latest_finalized_block(&self.db) - .expect("IndexTask run before writing the start block"); - let latest_finalized = match self.feed.latest_finalized_block_number().await { - Ok(latest_finalized) => latest_finalized, - Err(e) => Err(format!("couldn't fetch the latest finalized block number: {e:?}"))?, - }; - - if latest_finalized < our_latest_finalized { - // Explicitly log this as an error as returned ephemeral errors are logged with debug - // This doesn't panic as the node should sync along our indexed chain, and if it doesn't, - // we'll panic at that point in time - log::error!( - "node is out of sync, latest finalized {} is behind our indexed {}", - latest_finalized, - our_latest_finalized - ); - Err("node is out of sync".to_string())?; - } - - // Index the hashes of all blocks until the latest finalized block - for b in (our_latest_finalized + 1) ..= latest_finalized { - let block = match self.feed.unchecked_block_header_by_number(b).await { - Ok(block) => block, - Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + // Fetch the latest finalized block + let our_latest_finalized = IndexDb::latest_finalized_block(&self.db) + .expect("IndexTask run before writing the start block"); + let latest_finalized = match self.feed.latest_finalized_block_number().await { + Ok(latest_finalized) => latest_finalized, + Err(e) => Err(format!("couldn't fetch the latest finalized block number: {e:?}"))?, }; - // Check this descends from our indexed chain - { - let expected_parent = - IndexDb::block_id(&self.db, b - 1).expect("didn't have the ID of the prior block"); - if block.parent() != expected_parent { - panic!( - "current finalized block (#{b}, {}) doesn't build off finalized block (#{}, {})", - hex::encode(block.parent()), - b - 1, - hex::encode(expected_parent) - ); - } + if latest_finalized < our_latest_finalized { + // Explicitly log this as an error as returned ephemeral errors are logged with debug + // This doesn't panic as the node should sync along our indexed chain, and if it doesn't, + // we'll panic at that point in time + log::error!( + "node is out of sync, latest finalized {} is behind our indexed {}", + latest_finalized, + our_latest_finalized + ); + Err("node is out of sync".to_string())?; } - // Update the latest finalized block - let mut txn = self.db.txn(); - IndexDb::set_block(&mut txn, b, block.id()); - IndexDb::set_latest_finalized_block(&mut txn, b); - txn.commit(); - } + // Index the hashes of all blocks until the latest finalized block + for b in (our_latest_finalized + 1) ..= latest_finalized { + let block = match self.feed.unchecked_block_header_by_number(b).await { + Ok(block) => block, + Err(e) => Err(format!("couldn't fetch block {b}: {e:?}"))?, + }; - // Have dependents run if we updated the latest finalized block - Ok(our_latest_finalized != latest_finalized) + // Check this descends from our indexed chain + { + let expected_parent = + IndexDb::block_id(&self.db, b - 1).expect("didn't have the ID of the prior block"); + if block.parent() != expected_parent { + panic!( + "current finalized block (#{b}, {}) doesn't build off finalized block (#{}, {})", + hex::encode(block.parent()), + b - 1, + hex::encode(expected_parent) + ); + } + } + + // Update the latest finalized block + let mut txn = self.db.txn(); + IndexDb::set_block(&mut txn, b, block.id()); + IndexDb::set_latest_finalized_block(&mut txn, b); + txn.commit(); + } + + // Have dependents run if we updated the latest finalized block + Ok(our_latest_finalized != latest_finalized) + } } } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index d100815d..a5c5c038 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -2,7 +2,7 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::{marker::PhantomData, fmt::Debug}; +use core::{marker::PhantomData, future::Future, fmt::Debug}; use std::{io, collections::HashMap}; use group::GroupEncoding; @@ -59,7 +59,6 @@ impl BlockExt for B { /// A feed usable to scan a blockchain. /// /// This defines the primitive types used, along with various getters necessary for indexing. -#[async_trait::async_trait] pub trait ScannerFeed: 'static + Send + Sync + Clone { /// The ID of the network being scanned for. const NETWORK: NetworkId; @@ -110,38 +109,43 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { /// /// The block number is its zero-indexed position within a linear view of the external network's /// consensus. The genesis block accordingly has block number 0. - async fn latest_finalized_block_number(&self) -> Result; + fn latest_finalized_block_number( + &self, + ) -> impl Send + Future>; /// Fetch the timestamp of a block (represented in seconds since the epoch). /// /// This must be monotonically incrementing. Two blocks may share a timestamp. - async fn time_of_block(&self, number: u64) -> Result; + fn time_of_block( + &self, + number: u64, + ) -> impl Send + Future>; /// Fetch a block header by its number. /// /// This does not check the returned BlockHeader is the header for the block we indexed. - async fn unchecked_block_header_by_number( + fn unchecked_block_header_by_number( &self, number: u64, - ) -> Result<::Header, Self::EphemeralError>; + ) -> impl Send + Future::Header, Self::EphemeralError>>; /// Fetch a block by its number. /// /// This does not check the returned Block is the block we indexed. - async fn unchecked_block_by_number( + fn unchecked_block_by_number( &self, number: u64, - ) -> Result; + ) -> impl Send + Future>; /// Fetch a block by its number. /// /// Panics if the block requested wasn't indexed. - async fn block_by_number( + fn block_by_number( &self, getter: &(impl Send + Sync + Get), number: u64, - ) -> Result { - let block = match self.unchecked_block_by_number(number).await { + ) -> impl Send + Future> { + async move {let block = match self.unchecked_block_by_number(number).await { Ok(block) => block, Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?, }; @@ -159,7 +163,7 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { } } - Ok(block) + Ok(block)} } /// The dust threshold for the specified coin. @@ -171,11 +175,11 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { /// The cost to aggregate an input as of the specified block. /// /// This is defined as the transaction fee for a 2-input, 1-output transaction. - async fn cost_to_aggregate( + fn cost_to_aggregate( &self, coin: Coin, reference_block: &Self::Block, - ) -> Result; + ) -> impl Send + Future>; } /// The key type for this ScannerFeed. diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index 5fd2c7eb..afb1b672 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -1,4 +1,4 @@ -use core::marker::PhantomData; +use core::{marker::PhantomData, future::Future}; use scale::Encode; use serai_db::{DbTxn, Db}; @@ -65,113 +65,119 @@ impl ReportTask { } } -#[async_trait::async_trait] impl ContinuallyRan for ReportTask { - async fn run_iteration(&mut self) -> Result { - let highest_reportable = { - // Fetch the next to scan block - let next_to_scan = next_to_scan_for_outputs_block::(&self.db) + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let highest_reportable = { + // Fetch the next to scan block + let next_to_scan = next_to_scan_for_outputs_block::(&self.db) + .expect("ReportTask run before writing the start block"); + // If we haven't done any work, return + if next_to_scan == 0 { + return Ok(false); + } + // The last scanned block is the block prior to this + #[allow(clippy::let_and_return)] + let last_scanned = next_to_scan - 1; + // The last scanned block is the highest reportable block as we only scan blocks within a + // window where it's safe to immediately report the block + // See `eventuality.rs` for more info + last_scanned + }; + + let next_to_potentially_report = ReportDb::::next_to_potentially_report_block(&self.db) .expect("ReportTask run before writing the start block"); - // If we haven't done any work, return - if next_to_scan == 0 { - return Ok(false); - } - // The last scanned block is the block prior to this - #[allow(clippy::let_and_return)] - let last_scanned = next_to_scan - 1; - // The last scanned block is the highest reportable block as we only scan blocks within a - // window where it's safe to immediately report the block - // See `eventuality.rs` for more info - last_scanned - }; - let next_to_potentially_report = ReportDb::::next_to_potentially_report_block(&self.db) - .expect("ReportTask run before writing the start block"); + for b in next_to_potentially_report ..= highest_reportable { + let mut txn = self.db.txn(); - for b in next_to_potentially_report ..= highest_reportable { - let mut txn = self.db.txn(); + // Receive the InInstructions for this block + // We always do this as we can't trivially tell if we should recv InInstructions before we + // do + let InInstructionData { + external_key_for_session_to_sign_batch, + returnable_in_instructions: in_instructions, + } = ScanToReportDb::::recv_in_instructions(&mut txn, b); + let notable = ScannerGlobalDb::::is_block_notable(&txn, b); + if !notable { + assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); + } + // If this block is notable, create the Batch(s) for it + if notable { + let network = S::NETWORK; + let block_hash = index::block_id(&txn, b); + let mut batch_id = ReportDb::::acquire_batch_id(&mut txn, b); - // Receive the InInstructions for this block - // We always do this as we can't trivially tell if we should recv InInstructions before we do - let InInstructionData { - external_key_for_session_to_sign_batch, - returnable_in_instructions: in_instructions, - } = ScanToReportDb::::recv_in_instructions(&mut txn, b); - let notable = ScannerGlobalDb::::is_block_notable(&txn, b); - if !notable { - assert!(in_instructions.is_empty(), "block wasn't notable yet had InInstructions"); - } - // If this block is notable, create the Batch(s) for it - if notable { - let network = S::NETWORK; - let block_hash = index::block_id(&txn, b); - let mut batch_id = ReportDb::::acquire_batch_id(&mut txn, b); + // start with empty batch + let mut batches = vec![Batch { + network, + id: batch_id, + block: BlockHash(block_hash), + instructions: vec![], + }]; + // We also track the return information for the InInstructions within a Batch in case + // they error + let mut return_information = vec![vec![]]; - // start with empty batch - let mut batches = - vec![Batch { network, id: batch_id, block: BlockHash(block_hash), instructions: vec![] }]; - // We also track the return information for the InInstructions within a Batch in case they - // error - let mut return_information = vec![vec![]]; + for Returnable { return_address, in_instruction } in in_instructions { + let balance = in_instruction.balance; - for Returnable { return_address, in_instruction } in in_instructions { - let balance = in_instruction.balance; + let batch = batches.last_mut().unwrap(); + batch.instructions.push(in_instruction); - let batch = batches.last_mut().unwrap(); - batch.instructions.push(in_instruction); + // check if batch is over-size + if batch.encode().len() > MAX_BATCH_SIZE { + // pop the last instruction so it's back in size + let in_instruction = batch.instructions.pop().unwrap(); - // check if batch is over-size - if batch.encode().len() > MAX_BATCH_SIZE { - // pop the last instruction so it's back in size - let in_instruction = batch.instructions.pop().unwrap(); + // bump the id for the new batch + batch_id = ReportDb::::acquire_batch_id(&mut txn, b); - // bump the id for the new batch - batch_id = ReportDb::::acquire_batch_id(&mut txn, b); + // make a new batch with this instruction included + batches.push(Batch { + network, + id: batch_id, + block: BlockHash(block_hash), + instructions: vec![in_instruction], + }); + // Since we're allocating a new batch, allocate a new set of return addresses for it + return_information.push(vec![]); + } - // make a new batch with this instruction included - batches.push(Batch { - network, - id: batch_id, - block: BlockHash(block_hash), - instructions: vec![in_instruction], - }); - // Since we're allocating a new batch, allocate a new set of return addresses for it - return_information.push(vec![]); + // For the set of return addresses for the InInstructions for the batch we just pushed + // onto, push this InInstruction's return addresses + return_information + .last_mut() + .unwrap() + .push(return_address.map(|address| ReturnInformation { address, balance })); } - // For the set of return addresses for the InInstructions for the batch we just pushed - // onto, push this InInstruction's return addresses - return_information - .last_mut() - .unwrap() - .push(return_address.map(|address| ReturnInformation { address, balance })); + // Save the return addresses to the database + assert_eq!(batches.len(), return_information.len()); + for (batch, return_information) in batches.iter().zip(&return_information) { + assert_eq!(batch.instructions.len(), return_information.len()); + ReportDb::::save_external_key_for_session_to_sign_batch( + &mut txn, + batch.id, + &external_key_for_session_to_sign_batch, + ); + ReportDb::::save_return_information(&mut txn, batch.id, return_information); + } + + for batch in batches { + Batches::send(&mut txn, &batch); + BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch); + } } - // Save the return addresses to the database - assert_eq!(batches.len(), return_information.len()); - for (batch, return_information) in batches.iter().zip(&return_information) { - assert_eq!(batch.instructions.len(), return_information.len()); - ReportDb::::save_external_key_for_session_to_sign_batch( - &mut txn, - batch.id, - &external_key_for_session_to_sign_batch, - ); - ReportDb::::save_return_information(&mut txn, batch.id, return_information); - } + // Update the next to potentially report block + ReportDb::::set_next_to_potentially_report_block(&mut txn, b + 1); - for batch in batches { - Batches::send(&mut txn, &batch); - BatchesToSign::send(&mut txn, &external_key_for_session_to_sign_batch, &batch); - } + txn.commit(); } - // Update the next to potentially report block - ReportDb::::set_next_to_potentially_report_block(&mut txn, b + 1); - - txn.commit(); + // Run dependents if we decided to report any blocks + Ok(next_to_potentially_report <= highest_reportable) } - - // Run dependents if we decided to report any blocks - Ok(next_to_potentially_report <= highest_reportable) } } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 91c97f60..c54dc3e0 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -1,3 +1,4 @@ +use core::future::Future; use std::collections::HashMap; use scale::Decode; @@ -107,258 +108,262 @@ impl ScanTask { } } -#[async_trait::async_trait] impl ContinuallyRan for ScanTask { - async fn run_iteration(&mut self) -> Result { - // Fetch the safe to scan block - let latest_scannable = - latest_scannable_block::(&self.db).expect("ScanTask run before writing the start block"); - // Fetch the next block to scan - let next_to_scan = ScanDb::::next_to_scan_for_outputs_block(&self.db) - .expect("ScanTask run before writing the start block"); + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + // Fetch the safe to scan block + let latest_scannable = + latest_scannable_block::(&self.db).expect("ScanTask run before writing the start block"); + // Fetch the next block to scan + let next_to_scan = ScanDb::::next_to_scan_for_outputs_block(&self.db) + .expect("ScanTask run before writing the start block"); - for b in next_to_scan ..= latest_scannable { - let block = self.feed.block_by_number(&self.db, b).await?; + for b in next_to_scan ..= latest_scannable { + let block = self.feed.block_by_number(&self.db, b).await?; - log::info!("scanning block: {} ({b})", hex::encode(block.id())); + log::info!("scanning block: {} ({b})", hex::encode(block.id())); - let mut txn = self.db.txn(); + let mut txn = self.db.txn(); - assert_eq!(ScanDb::::next_to_scan_for_outputs_block(&txn).unwrap(), b); + assert_eq!(ScanDb::::next_to_scan_for_outputs_block(&txn).unwrap(), b); - // Tidy the keys, then fetch them - // We don't have to tidy them here, we just have to somewhere, so why not here? - ScannerGlobalDb::::tidy_keys(&mut txn); - let keys = ScannerGlobalDb::::active_keys_as_of_next_to_scan_for_outputs_block(&txn) - .expect("scanning for a blockchain without any keys set"); + // Tidy the keys, then fetch them + // We don't have to tidy them here, we just have to somewhere, so why not here? + ScannerGlobalDb::::tidy_keys(&mut txn); + let keys = ScannerGlobalDb::::active_keys_as_of_next_to_scan_for_outputs_block(&txn) + .expect("scanning for a blockchain without any keys set"); - // The scan data for this block - let mut scan_data = SenderScanData { - block_number: b, - received_external_outputs: vec![], - forwards: vec![], - returns: vec![], - }; - // The InInstructions for this block - let mut in_instructions = vec![]; - - // The outputs queued for this block - let queued_outputs = { - let mut queued_outputs = ScanDb::::take_queued_outputs(&mut txn, b); - // Sort the queued outputs in case they weren't queued in a deterministic fashion - queued_outputs.sort_by(|a, b| sort_outputs(&a.output, &b.output)); - queued_outputs - }; - for queued_output in queued_outputs { - in_instructions.push(( - queued_output.output.id(), - Returnable { - return_address: queued_output.return_address, - in_instruction: queued_output.in_instruction, - }, - )); - scan_data.received_external_outputs.push(queued_output.output); - } - - // We subtract the cost to aggregate from some outputs we scan - // This cost is fetched with an asynchronous function which may be non-trivial - // We cache the result of this function here to avoid calling it multiple times - let mut costs_to_aggregate = HashMap::with_capacity(1); - - // Scan for each key - for key in &keys { - for output in block.scan_for_outputs(key.key) { - assert_eq!(output.key(), key.key); - - /* - The scan task runs ahead of time, obtaining ordering on the external network's blocks - with relation to events on the Serai network. This is done via publishing a Batch which - contains the InInstructions from External outputs. Accordingly, the scan process only - has to yield External outputs. - - It'd appear to make sense to scan for all outputs, and after scanning for all outputs, - yield all outputs. The issue is we can't identify outputs we created here. We can only - identify the outputs we receive and their *declared intention*. - - We only want to handle Change/Branch/Forwarded outputs we made ourselves. For - Forwarded, the reasoning is obvious (retiring multisigs should only downsize, yet - accepting new outputs solely because they claim to be Forwarded would increase the size - of the multisig). For Change/Branch, it's because such outputs which aren't ours are - pointless. They wouldn't hurt to accumulate though. - - The issue is they would hurt to accumulate. We want to filter outputs which are less - than their cost to aggregate, a variable itself variable to the current blockchain. We - can filter such outputs here, yet if we drop a Change output, we create an insolvency. - We'd need to track the loss and offset it later. That means we can't filter such - outputs, as we expect any Change output we make. - - The issue is the Change outputs we don't make. Someone can create an output declaring - to be Change, yet not actually Change. If we don't filter it, it'd be queued for - accumulation, yet it may cost more to accumulate than it's worth. - - The solution is to let the Eventuality task, which does know if we made an output or - not (or rather, if a transaction is identical to a transaction which should exist - regarding effects) decide to keep/yield the outputs which we should only keep if we - made them (as Serai itself should not make worthless outputs, so we can assume they're - worthwhile, and even if they're not economically, they are technically). - - The alternative, we drop outputs here with a generic filter rule and then report back - the insolvency created, still doesn't work as we'd only be creating an insolvency if - the output was actually made by us (and not simply someone else sending in). We can - have the Eventuality task report the insolvency, yet that requires the scanner be - responsible for such filter logic. It's more flexible, and has a cleaner API, - to do so at a higher level. - */ - if output.kind() != OutputType::External { - // While we don't report these outputs, we still need consensus on this block and - // accordingly still need to set it as notable - let balance = output.balance(); - // We ensure it's over the dust limit to prevent people sending 1 satoshi from causing - // an invocation of a consensus/signing protocol - if balance.amount.0 >= S::dust(balance.coin).0 { - ScannerGlobalDb::::flag_notable_due_to_non_external_output(&mut txn, b); - } - continue; - } - - // Check this isn't dust - let balance_to_use = { - let mut balance = output.balance(); - - // First, subtract 2 * the cost to aggregate, as detailed in - // `spec/processor/UTXO Management.md` - - // We cache this, so if it isn't yet cached, insert it into the cache - if let std::collections::hash_map::Entry::Vacant(e) = - costs_to_aggregate.entry(balance.coin) - { - e.insert(self.feed.cost_to_aggregate(balance.coin, &block).await.map_err(|e| { - format!( - "ScanTask couldn't fetch cost to aggregate {:?} at {b}: {e:?}", - balance.coin - ) - })?); - } - let cost_to_aggregate = costs_to_aggregate[&balance.coin]; - balance.amount.0 -= 2 * cost_to_aggregate.0; - - // Now, check it's still past the dust threshold - if balance.amount.0 < S::dust(balance.coin).0 { - continue; - } - - balance - }; - - // Fetch the InInstruction/return addr for this output - let output_with_in_instruction = match in_instruction_from_output::(&output) { - (return_address, Some(instruction)) => OutputWithInInstruction { - output, - return_address, - in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use }, - }, - (Some(address), None) => { - // Since there was no instruction here, return this since we parsed a return address - if key.stage != LifetimeStage::Finishing { - scan_data.returns.push(Return { address, output }); - } - continue; - } - // Since we didn't receive an instruction nor can we return this, queue this for - // accumulation and move on - (None, None) => { - if key.stage != LifetimeStage::Finishing { - scan_data.received_external_outputs.push(output); - } - continue; - } - }; - - // Drop External outputs if they're to a multisig which won't report them - // This means we should report any External output we save to disk here - #[allow(clippy::match_same_arms)] - match key.stage { - // This multisig isn't yet reporting its External outputs to avoid a DoS - // Queue the output to be reported when this multisig starts reporting - LifetimeStage::ActiveYetNotReporting => { - ScanDb::::queue_output_until_block( - &mut txn, - key.block_at_which_reporting_starts, - &output_with_in_instruction, - ); - continue; - } - // We should report External outputs in these cases - LifetimeStage::Active | LifetimeStage::UsingNewForChange => {} - // We should report External outputs only once forwarded, where they'll appear as - // OutputType::Forwarded. We save them now for when they appear - LifetimeStage::Forwarding => { - // When the forwarded output appears, we can see which Plan it's associated with and - // from there recover this output - scan_data.forwards.push(output_with_in_instruction); - continue; - } - // We should drop these as we should not be handling new External outputs at this - // time - LifetimeStage::Finishing => { - continue; - } - } - // Ensures we didn't miss a `continue` above - assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange)); + // The scan data for this block + let mut scan_data = SenderScanData { + block_number: b, + received_external_outputs: vec![], + forwards: vec![], + returns: vec![], + }; + // The InInstructions for this block + let mut in_instructions = vec![]; + // The outputs queued for this block + let queued_outputs = { + let mut queued_outputs = ScanDb::::take_queued_outputs(&mut txn, b); + // Sort the queued outputs in case they weren't queued in a deterministic fashion + queued_outputs.sort_by(|a, b| sort_outputs(&a.output, &b.output)); + queued_outputs + }; + for queued_output in queued_outputs { in_instructions.push(( - output_with_in_instruction.output.id(), + queued_output.output.id(), Returnable { - return_address: output_with_in_instruction.return_address, - in_instruction: output_with_in_instruction.in_instruction, + return_address: queued_output.return_address, + in_instruction: queued_output.in_instruction, }, )); - scan_data.received_external_outputs.push(output_with_in_instruction.output); + scan_data.received_external_outputs.push(queued_output.output); } - } - // Sort the InInstructions by the output ID - in_instructions.sort_by(|(output_id_a, _), (output_id_b, _)| { - use core::cmp::{Ordering, Ord}; - let res = output_id_a.as_ref().cmp(output_id_b.as_ref()); - assert!(res != Ordering::Equal, "two outputs within a collection had the same ID"); - res - }); - // Check we haven't prior reported an InInstruction for this output - // This is a sanity check which is intended to prevent multiple instances of sriXYZ on-chain - // due to a single output - for (id, _) in &in_instructions { - assert!( - !ScanDb::::prior_reported_in_instruction_for_output(&txn, id), - "prior reported an InInstruction for an output with this ID" + // We subtract the cost to aggregate from some outputs we scan + // This cost is fetched with an asynchronous function which may be non-trivial + // We cache the result of this function here to avoid calling it multiple times + let mut costs_to_aggregate = HashMap::with_capacity(1); + + // Scan for each key + for key in &keys { + for output in block.scan_for_outputs(key.key) { + assert_eq!(output.key(), key.key); + + /* + The scan task runs ahead of time, obtaining ordering on the external network's blocks + with relation to events on the Serai network. This is done via publishing a Batch + which contains the InInstructions from External outputs. Accordingly, the scan + process only has to yield External outputs. + + It'd appear to make sense to scan for all outputs, and after scanning for all + outputs, yield all outputs. The issue is we can't identify outputs we created here. + We can only identify the outputs we receive and their *declared intention*. + + We only want to handle Change/Branch/Forwarded outputs we made ourselves. For + Forwarded, the reasoning is obvious (retiring multisigs should only downsize, yet + accepting new outputs solely because they claim to be Forwarded would increase the + size of the multisig). For Change/Branch, it's because such outputs which aren't ours + are pointless. They wouldn't hurt to accumulate though. + + The issue is they would hurt to accumulate. We want to filter outputs which are less + than their cost to aggregate, a variable itself variable to the current blockchain. + We can filter such outputs here, yet if we drop a Change output, we create an + insolvency. We'd need to track the loss and offset it later. That means we can't + filter such outputs, as we expect any Change output we make. + + The issue is the Change outputs we don't make. Someone can create an output declaring + to be Change, yet not actually Change. If we don't filter it, it'd be queued for + accumulation, yet it may cost more to accumulate than it's worth. + + The solution is to let the Eventuality task, which does know if we made an output or + not (or rather, if a transaction is identical to a transaction which should exist + regarding effects) decide to keep/yield the outputs which we should only keep if we + made them (as Serai itself should not make worthless outputs, so we can assume + they're worthwhile, and even if they're not economically, they are technically). + + The alternative, we drop outputs here with a generic filter rule and then report back + the insolvency created, still doesn't work as we'd only be creating an insolvency if + the output was actually made by us (and not simply someone else sending in). We can + have the Eventuality task report the insolvency, yet that requires the scanner be + responsible for such filter logic. It's more flexible, and has a cleaner API, + to do so at a higher level. + */ + if output.kind() != OutputType::External { + // While we don't report these outputs, we still need consensus on this block and + // accordingly still need to set it as notable + let balance = output.balance(); + // We ensure it's over the dust limit to prevent people sending 1 satoshi from + // causing an invocation of a consensus/signing protocol + if balance.amount.0 >= S::dust(balance.coin).0 { + ScannerGlobalDb::::flag_notable_due_to_non_external_output(&mut txn, b); + } + continue; + } + + // Check this isn't dust + let balance_to_use = { + let mut balance = output.balance(); + + // First, subtract 2 * the cost to aggregate, as detailed in + // `spec/processor/UTXO Management.md` + + // We cache this, so if it isn't yet cached, insert it into the cache + if let std::collections::hash_map::Entry::Vacant(e) = + costs_to_aggregate.entry(balance.coin) + { + e.insert(self.feed.cost_to_aggregate(balance.coin, &block).await.map_err(|e| { + format!( + "ScanTask couldn't fetch cost to aggregate {:?} at {b}: {e:?}", + balance.coin + ) + })?); + } + let cost_to_aggregate = costs_to_aggregate[&balance.coin]; + balance.amount.0 -= 2 * cost_to_aggregate.0; + + // Now, check it's still past the dust threshold + if balance.amount.0 < S::dust(balance.coin).0 { + continue; + } + + balance + }; + + // Fetch the InInstruction/return addr for this output + let output_with_in_instruction = match in_instruction_from_output::(&output) { + (return_address, Some(instruction)) => OutputWithInInstruction { + output, + return_address, + in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use }, + }, + (Some(address), None) => { + // Since there was no instruction here, return this since we parsed a return + // address + if key.stage != LifetimeStage::Finishing { + scan_data.returns.push(Return { address, output }); + } + continue; + } + // Since we didn't receive an instruction nor can we return this, queue this for + // accumulation and move on + (None, None) => { + if key.stage != LifetimeStage::Finishing { + scan_data.received_external_outputs.push(output); + } + continue; + } + }; + + // Drop External outputs if they're to a multisig which won't report them + // This means we should report any External output we save to disk here + #[allow(clippy::match_same_arms)] + match key.stage { + // This multisig isn't yet reporting its External outputs to avoid a DoS + // Queue the output to be reported when this multisig starts reporting + LifetimeStage::ActiveYetNotReporting => { + ScanDb::::queue_output_until_block( + &mut txn, + key.block_at_which_reporting_starts, + &output_with_in_instruction, + ); + continue; + } + // We should report External outputs in these cases + LifetimeStage::Active | LifetimeStage::UsingNewForChange => {} + // We should report External outputs only once forwarded, where they'll appear as + // OutputType::Forwarded. We save them now for when they appear + LifetimeStage::Forwarding => { + // When the forwarded output appears, we can see which Plan it's associated with + // and from there recover this output + scan_data.forwards.push(output_with_in_instruction); + continue; + } + // We should drop these as we should not be handling new External outputs at this + // time + LifetimeStage::Finishing => { + continue; + } + } + // Ensures we didn't miss a `continue` above + assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange)); + + in_instructions.push(( + output_with_in_instruction.output.id(), + Returnable { + return_address: output_with_in_instruction.return_address, + in_instruction: output_with_in_instruction.in_instruction, + }, + )); + scan_data.received_external_outputs.push(output_with_in_instruction.output); + } + } + + // Sort the InInstructions by the output ID + in_instructions.sort_by(|(output_id_a, _), (output_id_b, _)| { + use core::cmp::{Ordering, Ord}; + let res = output_id_a.as_ref().cmp(output_id_b.as_ref()); + assert!(res != Ordering::Equal, "two outputs within a collection had the same ID"); + res + }); + // Check we haven't prior reported an InInstruction for this output + // This is a sanity check which is intended to prevent multiple instances of sriXYZ + // on-chain due to a single output + for (id, _) in &in_instructions { + assert!( + !ScanDb::::prior_reported_in_instruction_for_output(&txn, id), + "prior reported an InInstruction for an output with this ID" + ); + ScanDb::::reported_in_instruction_for_output(&mut txn, id); + } + // Reformat the InInstructions to just the InInstructions + let in_instructions = in_instructions + .into_iter() + .map(|(_id, in_instruction)| in_instruction) + .collect::>(); + // Send the InInstructions to the report task + // We need to also specify which key is responsible for signing the Batch for these, which + // will always be the oldest key (as the new key signing the Batch signifies handover + // acceptance) + ScanToReportDb::::send_in_instructions( + &mut txn, + b, + &InInstructionData { + external_key_for_session_to_sign_batch: keys[0].key, + returnable_in_instructions: in_instructions, + }, ); - ScanDb::::reported_in_instruction_for_output(&mut txn, id); + + // Send the scan data to the eventuality task + ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data); + // Update the next to scan block + ScanDb::::set_next_to_scan_for_outputs_block(&mut txn, b + 1); + txn.commit(); } - // Reformat the InInstructions to just the InInstructions - let in_instructions = - in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::>(); - // Send the InInstructions to the report task - // We need to also specify which key is responsible for signing the Batch for these, which - // will always be the oldest key (as the new key signing the Batch signifies handover - // acceptance) - ScanToReportDb::::send_in_instructions( - &mut txn, - b, - &InInstructionData { - external_key_for_session_to_sign_batch: keys[0].key, - returnable_in_instructions: in_instructions, - }, - ); - // Send the scan data to the eventuality task - ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data); - // Update the next to scan block - ScanDb::::set_next_to_scan_for_outputs_block(&mut txn, b + 1); - txn.commit(); + // Run dependents if we successfully scanned any blocks + Ok(next_to_scan <= latest_scannable) } - - // Run dependents if we successfully scanned any blocks - Ok(next_to_scan <= latest_scannable) } } diff --git a/processor/scanner/src/substrate/mod.rs b/processor/scanner/src/substrate/mod.rs index fc97daf3..a7302e5c 100644 --- a/processor/scanner/src/substrate/mod.rs +++ b/processor/scanner/src/substrate/mod.rs @@ -1,4 +1,4 @@ -use core::marker::PhantomData; +use core::{marker::PhantomData, future::Future}; use serai_db::{DbTxn, Db}; @@ -52,115 +52,121 @@ impl SubstrateTask { } } -#[async_trait::async_trait] impl ContinuallyRan for SubstrateTask { - async fn run_iteration(&mut self) -> Result { - let mut made_progress = false; - loop { - // Fetch the next action to handle - let mut txn = self.db.txn(); - let Some(action) = SubstrateDb::::next_action(&mut txn) else { - drop(txn); - return Ok(made_progress); - }; + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut made_progress = false; + loop { + // Fetch the next action to handle + let mut txn = self.db.txn(); + let Some(action) = SubstrateDb::::next_action(&mut txn) else { + drop(txn); + return Ok(made_progress); + }; - match action { - Action::AcknowledgeBatch(AcknowledgeBatch { - batch_id, - in_instruction_succeededs, - mut burns, - key_to_activate, - }) => { - // Check if we have the information for this batch - let Some(block_number) = report::take_block_number_for_batch::(&mut txn, batch_id) - else { - // If we don't, drop this txn (restoring the action to the database) - drop(txn); - return Ok(made_progress); - }; + match action { + Action::AcknowledgeBatch(AcknowledgeBatch { + batch_id, + in_instruction_succeededs, + mut burns, + key_to_activate, + }) => { + // Check if we have the information for this batch + let Some(block_number) = report::take_block_number_for_batch::(&mut txn, batch_id) + else { + // If we don't, drop this txn (restoring the action to the database) + drop(txn); + return Ok(made_progress); + }; - { - let external_key_for_session_to_sign_batch = - report::take_external_key_for_session_to_sign_batch::(&mut txn, batch_id).unwrap(); - AcknowledgedBatches::send(&mut txn, &external_key_for_session_to_sign_batch, batch_id); - } - - // Mark we made progress and handle this - made_progress = true; - - assert!( - ScannerGlobalDb::::is_block_notable(&txn, block_number), - "acknowledging a block which wasn't notable" - ); - if let Some(prior_highest_acknowledged_block) = - ScannerGlobalDb::::highest_acknowledged_block(&txn) - { - // If a single block produced multiple Batches, the block number won't increment - assert!( - block_number >= prior_highest_acknowledged_block, - "acknowledging blocks out-of-order" - ); - for b in (prior_highest_acknowledged_block + 1) .. block_number { - assert!( - !ScannerGlobalDb::::is_block_notable(&txn, b), - "skipped acknowledging a block which was notable" + { + let external_key_for_session_to_sign_batch = + report::take_external_key_for_session_to_sign_batch::(&mut txn, batch_id) + .unwrap(); + AcknowledgedBatches::send( + &mut txn, + &external_key_for_session_to_sign_batch, + batch_id, ); } - } - ScannerGlobalDb::::set_highest_acknowledged_block(&mut txn, block_number); - if let Some(key_to_activate) = key_to_activate { - ScannerGlobalDb::::queue_key( - &mut txn, - block_number + S::WINDOW_LENGTH, - key_to_activate, + // Mark we made progress and handle this + made_progress = true; + + assert!( + ScannerGlobalDb::::is_block_notable(&txn, block_number), + "acknowledging a block which wasn't notable" ); - } + if let Some(prior_highest_acknowledged_block) = + ScannerGlobalDb::::highest_acknowledged_block(&txn) + { + // If a single block produced multiple Batches, the block number won't increment + assert!( + block_number >= prior_highest_acknowledged_block, + "acknowledging blocks out-of-order" + ); + for b in (prior_highest_acknowledged_block + 1) .. block_number { + assert!( + !ScannerGlobalDb::::is_block_notable(&txn, b), + "skipped acknowledging a block which was notable" + ); + } + } - // Return the balances for any InInstructions which failed to execute - { - let return_information = report::take_return_information::(&mut txn, batch_id) - .expect("didn't save the return information for Batch we published"); - assert_eq!( + ScannerGlobalDb::::set_highest_acknowledged_block(&mut txn, block_number); + if let Some(key_to_activate) = key_to_activate { + ScannerGlobalDb::::queue_key( + &mut txn, + block_number + S::WINDOW_LENGTH, + key_to_activate, + ); + } + + // Return the balances for any InInstructions which failed to execute + { + let return_information = report::take_return_information::(&mut txn, batch_id) + .expect("didn't save the return information for Batch we published"); + assert_eq!( in_instruction_succeededs.len(), return_information.len(), "amount of InInstruction succeededs differed from amount of return information saved" ); - // We map these into standard Burns - for (succeeded, return_information) in - in_instruction_succeededs.into_iter().zip(return_information) - { - if succeeded { - continue; - } + // We map these into standard Burns + for (succeeded, return_information) in + in_instruction_succeededs.into_iter().zip(return_information) + { + if succeeded { + continue; + } - if let Some(report::ReturnInformation { address, balance }) = return_information { - burns.push(OutInstructionWithBalance { - instruction: OutInstruction { address: address.into(), data: None }, - balance, - }); + if let Some(report::ReturnInformation { address, balance }) = return_information { + burns.push(OutInstructionWithBalance { + instruction: OutInstruction { address: address.into(), data: None }, + balance, + }); + } } } + + // We send these Burns as stemming from this block we just acknowledged + // This causes them to be acted on after we accumulate the outputs from this block + SubstrateToEventualityDb::send_burns::(&mut txn, block_number, burns); } - // We send these Burns as stemming from this block we just acknowledged - // This causes them to be acted on after we accumulate the outputs from this block - SubstrateToEventualityDb::send_burns::(&mut txn, block_number, burns); + Action::QueueBurns(burns) => { + // We can instantly handle this so long as we've handled all prior actions + made_progress = true; + + let queue_as_of = ScannerGlobalDb::::highest_acknowledged_block(&txn) + .expect("queueing Burns yet never acknowledged a block"); + + SubstrateToEventualityDb::send_burns::(&mut txn, queue_as_of, burns); + } } - Action::QueueBurns(burns) => { - // We can instantly handle this so long as we've handled all prior actions - made_progress = true; - - let queue_as_of = ScannerGlobalDb::::highest_acknowledged_block(&txn) - .expect("queueing Burns yet never acknowledged a block"); - - SubstrateToEventualityDb::send_burns::(&mut txn, queue_as_of, burns); - } + txn.commit(); } - - txn.commit(); } } } diff --git a/processor/signers/Cargo.toml b/processor/signers/Cargo.toml index 7b7ef098..65222896 100644 --- a/processor/signers/Cargo.toml +++ b/processor/signers/Cargo.toml @@ -14,13 +14,12 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [package.metadata.cargo-machete] -ignored = ["borsh", "scale"] +ignored = ["borsh"] [lints] workspace = true [dependencies] -async-trait = { version = "0.1", default-features = false } rand_core = { version = "0.6", default-features = false } zeroize = { version = "1", default-features = false, features = ["std"] } diff --git a/processor/signers/src/batch/mod.rs b/processor/signers/src/batch/mod.rs index f08fb5e2..b8ad7ccb 100644 --- a/processor/signers/src/batch/mod.rs +++ b/processor/signers/src/batch/mod.rs @@ -1,3 +1,4 @@ +use core::future::Future; use std::collections::HashSet; use ciphersuite::{group::GroupEncoding, Ristretto}; @@ -75,114 +76,115 @@ impl BatchSignerTask { } } -#[async_trait::async_trait] impl ContinuallyRan for BatchSignerTask { - async fn run_iteration(&mut self) -> Result { - let mut iterated = false; + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut iterated = false; - // Check for new batches to sign - loop { - let mut txn = self.db.txn(); - let Some(batch) = BatchesToSign::try_recv(&mut txn, &self.external_key) else { - break; - }; - iterated = true; + // Check for new batches to sign + loop { + let mut txn = self.db.txn(); + let Some(batch) = BatchesToSign::try_recv(&mut txn, &self.external_key) else { + break; + }; + iterated = true; - // Save this to the database as a transaction to sign - self.active_signing_protocols.insert(batch.id); - ActiveSigningProtocols::set( - &mut txn, - self.session, - &self.active_signing_protocols.iter().copied().collect(), - ); - Batches::set(&mut txn, batch.id, &batch); + // Save this to the database as a transaction to sign + self.active_signing_protocols.insert(batch.id); + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + Batches::set(&mut txn, batch.id, &batch); - let mut machines = Vec::with_capacity(self.keys.len()); - for keys in &self.keys { - machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch))); - } - for msg in self.attempt_manager.register(VariantSignId::Batch(batch.id), machines) { - BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); - } - - txn.commit(); - } - - // Check for acknowledged Batches (meaning we should no longer sign for these Batches) - loop { - let mut txn = self.db.txn(); - let Some(id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else { - break; - }; - - { - let last_acknowledged = LastAcknowledgedBatch::get(&txn); - if Some(id) > last_acknowledged { - LastAcknowledgedBatch::set(&mut txn, &id); + let mut machines = Vec::with_capacity(self.keys.len()); + for keys in &self.keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch))); } + for msg in self.attempt_manager.register(VariantSignId::Batch(batch.id), machines) { + BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + + txn.commit(); } - /* - We may have yet to register this signing protocol. + // Check for acknowledged Batches (meaning we should no longer sign for these Batches) + loop { + let mut txn = self.db.txn(); + let Some(id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else { + break; + }; - While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically have - `BatchesToSign` populated with a new batch _while iterating over `AcknowledgedBatches`_, and - then have `AcknowledgedBatched` populated. In that edge case, we will see the - acknowledgement notification before we see the transaction. - - In such a case, we break (dropping the txn, re-queueing the acknowledgement notification). - On the task's next iteration, we'll process the Batch from `BatchesToSign` and be - able to make progress. - */ - if !self.active_signing_protocols.remove(&id) { - break; - } - iterated = true; - - // Since it was, remove this as an active signing protocol - ActiveSigningProtocols::set( - &mut txn, - self.session, - &self.active_signing_protocols.iter().copied().collect(), - ); - // Clean up the database - Batches::del(&mut txn, id); - SignedBatches::del(&mut txn, id); - - // We retire with a txn so we either successfully flag this Batch as acknowledged, and - // won't re-register it (making this retire safe), or we don't flag it, meaning we will - // re-register it, yet that's safe as we have yet to retire it - self.attempt_manager.retire(&mut txn, VariantSignId::Batch(id)); - - txn.commit(); - } - - // Handle any messages sent to us - loop { - let mut txn = self.db.txn(); - let Some(msg) = CoordinatorToBatchSignerMessages::try_recv(&mut txn, self.session) else { - break; - }; - iterated = true; - - match self.attempt_manager.handle(msg) { - Response::Messages(msgs) => { - for msg in msgs { - BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + { + let last_acknowledged = LastAcknowledgedBatch::get(&txn); + if Some(id) > last_acknowledged { + LastAcknowledgedBatch::set(&mut txn, &id); } } - Response::Signature { id, signature } => { - let VariantSignId::Batch(id) = id else { panic!("BatchSignerTask signed a non-Batch") }; - let batch = - Batches::get(&txn, id).expect("signed a Batch we didn't save to the database"); - let signed_batch = SignedBatch { batch, signature: signature.into() }; - SignedBatches::set(&mut txn, signed_batch.batch.id, &signed_batch); + + /* + We may have yet to register this signing protocol. + + While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically + have `BatchesToSign` populated with a new batch _while iterating over + `AcknowledgedBatches`_, and then have `AcknowledgedBatched` populated. In that edge case, + we will see the acknowledgement notification before we see the transaction. + + In such a case, we break (dropping the txn, re-queueing the acknowledgement notification). + On the task's next iteration, we'll process the Batch from `BatchesToSign` and be + able to make progress. + */ + if !self.active_signing_protocols.remove(&id) { + break; } + iterated = true; + + // Since it was, remove this as an active signing protocol + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + // Clean up the database + Batches::del(&mut txn, id); + SignedBatches::del(&mut txn, id); + + // We retire with a txn so we either successfully flag this Batch as acknowledged, and + // won't re-register it (making this retire safe), or we don't flag it, meaning we will + // re-register it, yet that's safe as we have yet to retire it + self.attempt_manager.retire(&mut txn, VariantSignId::Batch(id)); + + txn.commit(); } - txn.commit(); - } + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToBatchSignerMessages::try_recv(&mut txn, self.session) else { + break; + }; + iterated = true; - Ok(iterated) + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { + BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature { id, signature } => { + let VariantSignId::Batch(id) = id else { panic!("BatchSignerTask signed a non-Batch") }; + let batch = + Batches::get(&txn, id).expect("signed a Batch we didn't save to the database"); + let signed_batch = SignedBatch { batch, signature: signature.into() }; + SignedBatches::set(&mut txn, signed_batch.batch.id, &signed_batch); + } + } + + txn.commit(); + } + + Ok(iterated) + } } } diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index e749f841..1e3c84d2 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -1,3 +1,5 @@ +use core::future::Future; + use scale::Decode; use serai_db::{DbTxn, Db}; @@ -19,149 +21,157 @@ impl CoordinatorTask { } } -#[async_trait::async_trait] impl ContinuallyRan for CoordinatorTask { - async fn run_iteration(&mut self) -> Result { - let mut iterated = false; + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut iterated = false; - for session in RegisteredKeys::get(&self.db).unwrap_or(vec![]) { - // Publish the messages generated by this key's signers - loop { - let mut txn = self.db.txn(); - let Some(msg) = CosignerToCoordinatorMessages::try_recv(&mut txn, session) else { - break; - }; - iterated = true; - - self - .coordinator - .send(msg) - .await - .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; - - txn.commit(); - } - - loop { - let mut txn = self.db.txn(); - let Some(msg) = BatchSignerToCoordinatorMessages::try_recv(&mut txn, session) else { - break; - }; - iterated = true; - - self - .coordinator - .send(msg) - .await - .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; - - txn.commit(); - } - - loop { - let mut txn = self.db.txn(); - let Some(msg) = SlashReportSignerToCoordinatorMessages::try_recv(&mut txn, session) else { - break; - }; - iterated = true; - - self - .coordinator - .send(msg) - .await - .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; - - txn.commit(); - } - - loop { - let mut txn = self.db.txn(); - let Some(msg) = TransactionSignerToCoordinatorMessages::try_recv(&mut txn, session) else { - break; - }; - iterated = true; - - self - .coordinator - .send(msg) - .await - .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; - - txn.commit(); - } - - // Publish the cosigns from this session - { - let mut txn = self.db.txn(); - while let Some(((block_number, block_id), signature)) = Cosign::try_recv(&mut txn, session) - { - iterated = true; - self - .coordinator - .publish_cosign(block_number, block_id, <_>::decode(&mut signature.as_slice()).unwrap()) - .await - .map_err(|e| format!("couldn't publish Cosign: {e:?}"))?; - } - txn.commit(); - } - - // If this session signed its slash report, publish its signature - { - let mut txn = self.db.txn(); - if let Some(slash_report_signature) = SlashReportSignature::try_recv(&mut txn, session) { + for session in RegisteredKeys::get(&self.db).unwrap_or(vec![]) { + // Publish the messages generated by this key's signers + loop { + let mut txn = self.db.txn(); + let Some(msg) = CosignerToCoordinatorMessages::try_recv(&mut txn, session) else { + break; + }; iterated = true; self .coordinator - .publish_slash_report_signature( - session, - <_>::decode(&mut slash_report_signature.as_slice()).unwrap(), - ) + .send(msg) .await - .map_err(|e| { - format!("couldn't send slash report signature to the coordinator: {e:?}") - })?; + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; txn.commit(); } - } - } - // Publish the Batches - { - let mut txn = self.db.txn(); - while let Some(batch) = scanner::Batches::try_recv(&mut txn) { - iterated = true; - self - .coordinator - .publish_batch(batch) - .await - .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; - } - txn.commit(); - } + loop { + let mut txn = self.db.txn(); + let Some(msg) = BatchSignerToCoordinatorMessages::try_recv(&mut txn, session) else { + break; + }; + iterated = true; - // Publish the signed Batches - { - let mut txn = self.db.txn(); - // The last acknowledged Batch may exceed the last Batch we published if we didn't sign for - // the prior Batch(es) (and accordingly didn't publish them) - let last_batch = - crate::batch::last_acknowledged_batch(&txn).max(db::LastPublishedBatch::get(&txn)); - let mut next_batch = last_batch.map_or(0, |id| id + 1); - while let Some(batch) = crate::batch::signed_batch(&txn, next_batch) { - iterated = true; - db::LastPublishedBatch::set(&mut txn, &batch.batch.id); - self - .coordinator - .publish_signed_batch(batch) - .await - .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; - next_batch += 1; - } - txn.commit(); - } + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; - Ok(iterated) + txn.commit(); + } + + loop { + let mut txn = self.db.txn(); + let Some(msg) = SlashReportSignerToCoordinatorMessages::try_recv(&mut txn, session) + else { + break; + }; + iterated = true; + + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; + + txn.commit(); + } + + loop { + let mut txn = self.db.txn(); + let Some(msg) = TransactionSignerToCoordinatorMessages::try_recv(&mut txn, session) + else { + break; + }; + iterated = true; + + self + .coordinator + .send(msg) + .await + .map_err(|e| format!("couldn't send sign message to the coordinator: {e:?}"))?; + + txn.commit(); + } + + // Publish the cosigns from this session + { + let mut txn = self.db.txn(); + while let Some(((block_number, block_id), signature)) = + Cosign::try_recv(&mut txn, session) + { + iterated = true; + self + .coordinator + .publish_cosign( + block_number, + block_id, + <_>::decode(&mut signature.as_slice()).unwrap(), + ) + .await + .map_err(|e| format!("couldn't publish Cosign: {e:?}"))?; + } + txn.commit(); + } + + // If this session signed its slash report, publish its signature + { + let mut txn = self.db.txn(); + if let Some(slash_report_signature) = SlashReportSignature::try_recv(&mut txn, session) { + iterated = true; + + self + .coordinator + .publish_slash_report_signature( + session, + <_>::decode(&mut slash_report_signature.as_slice()).unwrap(), + ) + .await + .map_err(|e| { + format!("couldn't send slash report signature to the coordinator: {e:?}") + })?; + + txn.commit(); + } + } + } + + // Publish the Batches + { + let mut txn = self.db.txn(); + while let Some(batch) = scanner::Batches::try_recv(&mut txn) { + iterated = true; + self + .coordinator + .publish_batch(batch) + .await + .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; + } + txn.commit(); + } + + // Publish the signed Batches + { + let mut txn = self.db.txn(); + // The last acknowledged Batch may exceed the last Batch we published if we didn't sign for + // the prior Batch(es) (and accordingly didn't publish them) + let last_batch = + crate::batch::last_acknowledged_batch(&txn).max(db::LastPublishedBatch::get(&txn)); + let mut next_batch = last_batch.map_or(0, |id| id + 1); + while let Some(batch) = crate::batch::signed_batch(&txn, next_batch) { + iterated = true; + db::LastPublishedBatch::set(&mut txn, &batch.batch.id); + self + .coordinator + .publish_signed_batch(batch) + .await + .map_err(|e| format!("couldn't publish Batch: {e:?}"))?; + next_batch += 1; + } + txn.commit(); + } + + Ok(iterated) + } } } diff --git a/processor/signers/src/cosign/mod.rs b/processor/signers/src/cosign/mod.rs index 41db8050..2de18e86 100644 --- a/processor/signers/src/cosign/mod.rs +++ b/processor/signers/src/cosign/mod.rs @@ -1,3 +1,5 @@ +use core::future::Future; + use ciphersuite::Ristretto; use frost::dkg::ThresholdKeys; @@ -48,75 +50,76 @@ impl CosignerTask { } } -#[async_trait::async_trait] impl ContinuallyRan for CosignerTask { - async fn run_iteration(&mut self) -> Result { - let mut iterated = false; + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut iterated = false; - // Check the cosign to work on - { - let mut txn = self.db.txn(); - if let Some(cosign) = ToCosign::get(&txn, self.session) { - // If this wasn't already signed for... - if LatestCosigned::get(&txn, self.session) < Some(cosign.0) { - // If this isn't the cosign we're currently working on, meaning it's fresh - if self.current_cosign != Some(cosign) { - // Retire the current cosign - if let Some(current_cosign) = self.current_cosign { - assert!(current_cosign.0 < cosign.0); - self.attempt_manager.retire(&mut txn, VariantSignId::Cosign(current_cosign.0)); - } - - // Set the cosign being worked on - self.current_cosign = Some(cosign); - - let mut machines = Vec::with_capacity(self.keys.len()); - { - let message = cosign_block_msg(cosign.0, cosign.1); - for keys in &self.keys { - machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone())); + // Check the cosign to work on + { + let mut txn = self.db.txn(); + if let Some(cosign) = ToCosign::get(&txn, self.session) { + // If this wasn't already signed for... + if LatestCosigned::get(&txn, self.session) < Some(cosign.0) { + // If this isn't the cosign we're currently working on, meaning it's fresh + if self.current_cosign != Some(cosign) { + // Retire the current cosign + if let Some(current_cosign) = self.current_cosign { + assert!(current_cosign.0 < cosign.0); + self.attempt_manager.retire(&mut txn, VariantSignId::Cosign(current_cosign.0)); } + + // Set the cosign being worked on + self.current_cosign = Some(cosign); + + let mut machines = Vec::with_capacity(self.keys.len()); + { + let message = cosign_block_msg(cosign.0, cosign.1); + for keys in &self.keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone())); + } + } + for msg in self.attempt_manager.register(VariantSignId::Cosign(cosign.0), machines) { + CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + + txn.commit(); } - for msg in self.attempt_manager.register(VariantSignId::Cosign(cosign.0), machines) { + } + } + } + + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToCosignerMessages::try_recv(&mut txn, self.session) else { + break; + }; + iterated = true; + + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg); } + } + Response::Signature { id, signature } => { + let VariantSignId::Cosign(block_number) = id else { + panic!("CosignerTask signed a non-Cosign") + }; + assert_eq!(Some(block_number), self.current_cosign.map(|cosign| cosign.0)); - txn.commit(); + let cosign = self.current_cosign.take().unwrap(); + LatestCosigned::set(&mut txn, self.session, &cosign.0); + // Send the cosign + Cosign::send(&mut txn, self.session, &(cosign, Signature::from(signature).encode())); } } - } - } - // Handle any messages sent to us - loop { - let mut txn = self.db.txn(); - let Some(msg) = CoordinatorToCosignerMessages::try_recv(&mut txn, self.session) else { - break; - }; - iterated = true; - - match self.attempt_manager.handle(msg) { - Response::Messages(msgs) => { - for msg in msgs { - CosignerToCoordinatorMessages::send(&mut txn, self.session, &msg); - } - } - Response::Signature { id, signature } => { - let VariantSignId::Cosign(block_number) = id else { - panic!("CosignerTask signed a non-Cosign") - }; - assert_eq!(Some(block_number), self.current_cosign.map(|cosign| cosign.0)); - - let cosign = self.current_cosign.take().unwrap(); - LatestCosigned::set(&mut txn, self.session, &cosign.0); - // Send the cosign - Cosign::send(&mut txn, self.session, &(cosign, Signature::from(signature).encode())); - } + txn.commit(); } - txn.commit(); + Ok(iterated) } - - Ok(iterated) } } diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index e06dd07f..c76fbd32 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -2,7 +2,7 @@ #![doc = include_str!("../README.md")] #![deny(missing_docs)] -use core::{fmt::Debug, marker::PhantomData}; +use core::{future::Future, fmt::Debug, marker::PhantomData}; use std::collections::HashMap; use zeroize::Zeroizing; @@ -43,7 +43,6 @@ mod transaction; use transaction::TransactionSignerTask; /// A connection to the Coordinator which messages can be published with. -#[async_trait::async_trait] pub trait Coordinator: 'static + Send + Sync { /// An error encountered when interacting with a coordinator. /// @@ -52,32 +51,38 @@ pub trait Coordinator: 'static + Send + Sync { type EphemeralError: Debug; /// Send a `messages::sign::ProcessorMessage`. - async fn send(&mut self, message: ProcessorMessage) -> Result<(), Self::EphemeralError>; + fn send( + &mut self, + message: ProcessorMessage, + ) -> impl Send + Future>; /// Publish a cosign. - async fn publish_cosign( + fn publish_cosign( &mut self, block_number: u64, block_id: [u8; 32], signature: Signature, - ) -> Result<(), Self::EphemeralError>; + ) -> impl Send + Future>; /// Publish a `Batch`. - async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError>; + fn publish_batch(&mut self, batch: Batch) + -> impl Send + Future>; /// Publish a `SignedBatch`. - async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError>; + fn publish_signed_batch( + &mut self, + batch: SignedBatch, + ) -> impl Send + Future>; /// Publish a slash report's signature. - async fn publish_slash_report_signature( + fn publish_slash_report_signature( &mut self, session: Session, signature: Signature, - ) -> Result<(), Self::EphemeralError>; + ) -> impl Send + Future>; } /// An object capable of publishing a transaction. -#[async_trait::async_trait] pub trait TransactionPublisher: 'static + Send + Sync + Clone { /// An error encountered when publishing a transaction. /// @@ -92,7 +97,7 @@ pub trait TransactionPublisher: 'static + Send + Sync + Clone { /// /// The transaction already being present in the mempool/on-chain MUST NOT be considered an /// error. - async fn publish(&self, tx: T) -> Result<(), Self::EphemeralError>; + fn publish(&self, tx: T) -> impl Send + Future>; } struct Tasks { diff --git a/processor/signers/src/slash_report.rs b/processor/signers/src/slash_report.rs index 19a2523b..e040798c 100644 --- a/processor/signers/src/slash_report.rs +++ b/processor/signers/src/slash_report.rs @@ -1,4 +1,4 @@ -use core::marker::PhantomData; +use core::{marker::PhantomData, future::Future}; use ciphersuite::Ristretto; use frost::dkg::ThresholdKeys; @@ -51,70 +51,72 @@ impl SlashReportSignerTask { } } -#[async_trait::async_trait] impl ContinuallyRan for SlashReportSignerTask { - async fn run_iteration(&mut self) -> Result { - let mut iterated = false; + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut iterated = false; - // Check for the slash report to sign - if !self.has_slash_report { - let mut txn = self.db.txn(); - let Some(slash_report) = SlashReport::try_recv(&mut txn, self.session) else { - return Ok(false); - }; - // We only commit this upon successfully signing this slash report - drop(txn); - iterated = true; + // Check for the slash report to sign + if !self.has_slash_report { + let mut txn = self.db.txn(); + let Some(slash_report) = SlashReport::try_recv(&mut txn, self.session) else { + return Ok(false); + }; + // We only commit this upon successfully signing this slash report + drop(txn); + iterated = true; - self.has_slash_report = true; + self.has_slash_report = true; - let mut machines = Vec::with_capacity(self.keys.len()); - { - let message = report_slashes_message( - &ValidatorSet { network: S::NETWORK, session: self.session }, - &SlashReportStruct(slash_report.try_into().unwrap()), - ); - for keys in &self.keys { - machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone())); - } - } - let mut txn = self.db.txn(); - for msg in self.attempt_manager.register(VariantSignId::SlashReport(self.session), machines) { - SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); - } - txn.commit(); - } - - // Handle any messages sent to us - loop { - let mut txn = self.db.txn(); - let Some(msg) = CoordinatorToSlashReportSignerMessages::try_recv(&mut txn, self.session) - else { - break; - }; - iterated = true; - - match self.attempt_manager.handle(msg) { - Response::Messages(msgs) => { - for msg in msgs { - SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + let mut machines = Vec::with_capacity(self.keys.len()); + { + let message = report_slashes_message( + &ValidatorSet { network: S::NETWORK, session: self.session }, + &SlashReportStruct(slash_report.try_into().unwrap()), + ); + for keys in &self.keys { + machines.push(WrappedSchnorrkelMachine::new(keys.clone(), message.clone())); } } - Response::Signature { id, signature } => { - let VariantSignId::SlashReport(session) = id else { - panic!("SlashReportSignerTask signed a non-SlashReport") - }; - assert_eq!(session, self.session); - // Drain the channel - SlashReport::try_recv(&mut txn, self.session).unwrap(); - // Send the signature - SlashReportSignature::send(&mut txn, session, &Signature::from(signature).encode()); + let mut txn = self.db.txn(); + for msg in self.attempt_manager.register(VariantSignId::SlashReport(self.session), machines) + { + SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); } + txn.commit(); } - txn.commit(); - } + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToSlashReportSignerMessages::try_recv(&mut txn, self.session) + else { + break; + }; + iterated = true; - Ok(iterated) + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { + SlashReportSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature { id, signature } => { + let VariantSignId::SlashReport(session) = id else { + panic!("SlashReportSignerTask signed a non-SlashReport") + }; + assert_eq!(session, self.session); + // Drain the channel + SlashReport::try_recv(&mut txn, self.session).unwrap(); + // Send the signature + SlashReportSignature::send(&mut txn, session, &Signature::from(signature).encode()); + } + } + + txn.commit(); + } + + Ok(iterated) + } } } diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index b9b62e75..f089e931 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -1,3 +1,4 @@ +use core::future::Future; use std::{ collections::HashSet, time::{Duration, Instant}, @@ -88,11 +89,10 @@ impl> } } -#[async_trait::async_trait] impl>> ContinuallyRan for TransactionSignerTask { - async fn run_iteration(&mut self) -> Result { + fn run_iteration(&mut self) -> impl Send + Future> {async{ let mut iterated = false; // Check for new transactions to sign @@ -233,3 +233,4 @@ impl> Ok(iterated) } } +} diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 6421c499..a40e465c 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -71,7 +71,7 @@ pub async fn test_scanner( let block_id = block.id(); // Verify the Scanner picked them up - let verify_event = |mut scanner: ScannerHandle| async { + let verify_event = |mut scanner: ScannerHandle| async move { let outputs = match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() { ScannerEvent::Block { is_retirement_block, block, outputs } => {