From 9471ca5d6a84cb41e4973003529ce66048dda32d Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 7 Nov 2023 23:52:56 +0000 Subject: [PATCH] consensus: use a custom spawn function when using rayon in async code. Before we were using tokio's `spawn_blocking`, which wasn't ideal as this put tasks in a queue to be put on rayon's pool. Instead, now we create an oneshot and use rayon::spawn. --- consensus/src/bin/scan_chain.rs | 7 ++- consensus/src/context.rs | 8 ++-- consensus/src/context/weight.rs | 22 ++++++--- consensus/src/helper.rs | 13 ++++++ consensus/src/rpc.rs | 81 +++++++++++++++++++-------------- consensus/src/rpc/cache.rs | 3 +- consensus/src/transactions.rs | 14 +++--- 7 files changed, 94 insertions(+), 54 deletions(-) diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 01c5ebb8..2a12097b 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -124,7 +124,7 @@ where async fn scan_chain( cache: Arc>, - _save_file: PathBuf, + save_file: PathBuf, _rpc_config: Arc>, database: D, ) -> Result<(), tower::BoxError> @@ -171,6 +171,11 @@ where tracing::info!("verified block: {}", verified_block_info.height); + if verified_block_info.height % 5000 == 0 { + tracing::info!("saving cache to: {}", save_file.display()); + cache.write().unwrap().save(&save_file).unwrap(); + } + update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?; } } diff --git a/consensus/src/context.rs b/consensus/src/context.rs index aa588829..437153e5 100644 --- a/consensus/src/context.rs +++ b/consensus/src/context.rs @@ -297,10 +297,12 @@ impl Service for BlockChainContextService { next_difficulty: difficulty_cache.next_difficulty(¤t_hf), cumulative_difficulty: difficulty_cache.cumulative_difficulty(), effective_median_weight: weight_cache - .effective_median_block_weight(¤t_hf), - median_long_term_weight: weight_cache.median_long_term_weight(), + .effective_median_block_weight(¤t_hf) + .await, + median_long_term_weight: weight_cache.median_long_term_weight().await, median_weight_for_block_reward: weight_cache - .median_for_block_reward(¤t_hf), + .median_for_block_reward(¤t_hf) + .await, already_generated_coins: *already_generated_coins, top_block_timestamp: difficulty_cache.top_block_timestamp(), median_block_timestamp: difficulty_cache.median_timestamp( diff --git a/consensus/src/context/weight.rs b/consensus/src/context/weight.rs index a4c46015..1c0cc6ae 100644 --- a/consensus/src/context/weight.rs +++ b/consensus/src/context/weight.rs @@ -17,7 +17,8 @@ use tower::ServiceExt; use tracing::instrument; use crate::{ - helper::median, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, + helper::{median, rayon_spawn_async}, + ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, }; #[cfg(test)] @@ -144,9 +145,16 @@ impl BlockWeightsCache { } /// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or custom amount of blocks in the config. - pub fn median_long_term_weight(&self) -> usize { + pub async fn median_long_term_weight(&self) -> usize { let mut sorted_long_term_weights: Vec = self.long_term_weights.clone().into(); - sorted_long_term_weights.par_sort_unstable(); + + // Move this out of the async runtime as this can take a bit. + let sorted_long_term_weights = rayon_spawn_async(|| { + sorted_long_term_weights.par_sort_unstable(); + sorted_long_term_weights + }) + .await; + median(&sorted_long_term_weights) } @@ -161,22 +169,22 @@ impl BlockWeightsCache { /// the block weight limit. /// /// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-effective-median-weight - pub fn effective_median_block_weight(&self, hf: &HardFork) -> usize { + pub async fn effective_median_block_weight(&self, hf: &HardFork) -> usize { calculate_effective_median_block_weight( hf, self.median_short_term_weight(), - self.median_long_term_weight(), + self.median_long_term_weight().await, ) } /// Returns the median weight used to calculate block reward punishment. /// /// https://cuprate.github.io/monero-book/consensus_rules/blocks/reward.html#calculating-block-reward - pub fn median_for_block_reward(&self, hf: &HardFork) -> usize { + pub async fn median_for_block_reward(&self, hf: &HardFork) -> usize { if hf.in_range(&HardFork::V1, &HardFork::V12) { self.median_short_term_weight() } else { - self.effective_median_block_weight(hf) + self.effective_median_block_weight(hf).await } .max(penalty_free_zone(hf)) } diff --git a/consensus/src/helper.rs b/consensus/src/helper.rs index 614bed63..ca7ba0af 100644 --- a/consensus/src/helper.rs +++ b/consensus/src/helper.rs @@ -5,6 +5,19 @@ use std::{ use curve25519_dalek::edwards::CompressedEdwardsY; +/// Spawns a task for the rayon thread pool and awaits the result without blocking the async runtime. +pub(crate) async fn rayon_spawn_async(f: F) -> R +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let (tx, rx) = tokio::sync::oneshot::channel(); + rayon::spawn(|| { + let _ = tx.send(f()); + }); + rx.await.expect("The sender must not be dropped") +} + pub(crate) fn get_mid(a: T, b: T) -> T where T: Add + Sub + Div + Mul + Copy + From, diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index ba00a501..97f0ec8c 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -25,7 +25,10 @@ use tracing::{instrument, Instrument}; use cuprate_common::BlockID; use monero_wire::common::{BlockCompleteEntry, TransactionBlobs}; -use crate::{DatabaseRequest, DatabaseResponse, ExtendedBlockHeader, HardFork, OutputOnChain}; +use crate::{ + helper::rayon_spawn_async, DatabaseRequest, DatabaseResponse, ExtendedBlockHeader, HardFork, + OutputOnChain, +}; pub mod cache; mod discover; @@ -503,25 +506,30 @@ async fn get_blocks_in_range( let blocks: Response = monero_epee_bin_serde::from_bytes(res)?; Ok(DatabaseResponse::BlockBatchInRange( - blocks - .blocks - .into_par_iter() - .map(|b| { - Ok(( - monero_serai::block::Block::read(&mut b.block.as_slice())?, - match b.txs { - TransactionBlobs::Pruned(_) => return Err("node sent pruned txs!".into()), - TransactionBlobs::Normal(txs) => txs - .into_par_iter() - .map(|tx| { - monero_serai::transaction::Transaction::read(&mut tx.as_slice()) - }) - .collect::>()?, - TransactionBlobs::None => vec![], - }, - )) - }) - .collect::>()?, + rayon_spawn_async(|| { + blocks + .blocks + .into_par_iter() + .map(|b| { + Ok(( + monero_serai::block::Block::read(&mut b.block.as_slice())?, + match b.txs { + TransactionBlobs::Pruned(_) => { + return Err("node sent pruned txs!".into()) + } + TransactionBlobs::Normal(txs) => txs + .into_par_iter() + .map(|tx| { + monero_serai::transaction::Transaction::read(&mut tx.as_slice()) + }) + .collect::>()?, + TransactionBlobs::None => vec![], + }, + )) + }) + .collect::>() + }) + .await?, )) } @@ -556,21 +564,24 @@ async fn get_block_info_in_range( tracing::info!("Retrieved block headers in range: {:?}", range); Ok(DatabaseResponse::BlockExtendedHeaderInRange( - res.headers - .into_iter() - .map(|info| ExtendedBlockHeader { - version: HardFork::from_version(&info.major_version) - .expect("previously checked block has incorrect version"), - vote: HardFork::from_vote(&info.minor_version), - timestamp: info.timestamp, - cumulative_difficulty: u128_from_low_high( - info.cumulative_difficulty, - info.cumulative_difficulty_top64, - ), - block_weight: info.block_weight, - long_term_weight: info.long_term_weight, - }) - .collect(), + rayon_spawn_async(|| { + res.headers + .into_iter() + .map(|info| ExtendedBlockHeader { + version: HardFork::from_version(&info.major_version) + .expect("previously checked block has incorrect version"), + vote: HardFork::from_vote(&info.minor_version), + timestamp: info.timestamp, + cumulative_difficulty: u128_from_low_high( + info.cumulative_difficulty, + info.cumulative_difficulty_top64, + ), + block_weight: info.block_weight, + long_term_weight: info.long_term_weight, + }) + .collect() + }) + .await, )) } diff --git a/consensus/src/rpc/cache.rs b/consensus/src/rpc/cache.rs index 2856f6c6..2c3366c9 100644 --- a/consensus/src/rpc/cache.rs +++ b/consensus/src/rpc/cache.rs @@ -82,8 +82,9 @@ impl ScanningCache { self.height += 1; } + /// Returns true if any kis are included in our spent set. pub fn are_kis_spent(&self, kis: HashSet<[u8; 32]>) -> bool { - self.kis.is_disjoint(&kis) + !self.kis.is_disjoint(&kis) } pub fn outputs_time_lock(&self, tx: &[u8; 32]) -> Timelock { diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index ea18ac9f..9c9cfb05 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -14,7 +14,8 @@ use tower::{Service, ServiceExt}; use tracing::instrument; use crate::{ - context::ReOrgToken, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, + context::ReOrgToken, helper::rayon_spawn_async, ConsensusError, Database, DatabaseRequest, + DatabaseResponse, HardFork, }; mod contextual_data; @@ -158,13 +159,12 @@ where D: Database + Clone + Sync + Send + 'static, { // Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs. - let txs = tokio::task::spawn_blocking(|| { + let txs = rayon_spawn_async(|| { txs.into_par_iter() .map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?))) .collect::, ConsensusError>>() }) - .await - .unwrap()?; + .await?; contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?; @@ -191,7 +191,8 @@ where let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new())); let cloned_spent_kis = spent_kis.clone(); - tokio::task::spawn_blocking(move || { + + rayon_spawn_async(move || { txs.par_iter().try_for_each(|tx| { verify_transaction_for_block( tx, @@ -202,8 +203,7 @@ where ) }) }) - .await - .unwrap()?; + .await?; let DatabaseResponse::CheckKIsNotSpent(kis_spent) = database .oneshot(DatabaseRequest::CheckKIsNotSpent(