consensus: use a custom spawn function when using rayon in async code.

Before we were using tokio's `spawn_blocking`, which wasn't ideal as this put tasks in a queue to be put on rayon's pool.

Instead, now we create an oneshot and use rayon::spawn.
This commit is contained in:
Boog900 2023-11-07 23:52:56 +00:00
parent b49878ac6b
commit 9471ca5d6a
No known key found for this signature in database
GPG key ID: 5401367FB7302004
7 changed files with 94 additions and 54 deletions

View file

@ -124,7 +124,7 @@ where
async fn scan_chain<D>( async fn scan_chain<D>(
cache: Arc<RwLock<ScanningCache>>, cache: Arc<RwLock<ScanningCache>>,
_save_file: PathBuf, save_file: PathBuf,
_rpc_config: Arc<RwLock<RpcConfig>>, _rpc_config: Arc<RwLock<RpcConfig>>,
database: D, database: D,
) -> Result<(), tower::BoxError> ) -> Result<(), tower::BoxError>
@ -171,6 +171,11 @@ where
tracing::info!("verified block: {}", verified_block_info.height); tracing::info!("verified block: {}", verified_block_info.height);
if verified_block_info.height % 5000 == 0 {
tracing::info!("saving cache to: {}", save_file.display());
cache.write().unwrap().save(&save_file).unwrap();
}
update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?; update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?;
} }
} }

View file

@ -297,10 +297,12 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
next_difficulty: difficulty_cache.next_difficulty(&current_hf), next_difficulty: difficulty_cache.next_difficulty(&current_hf),
cumulative_difficulty: difficulty_cache.cumulative_difficulty(), cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
effective_median_weight: weight_cache effective_median_weight: weight_cache
.effective_median_block_weight(&current_hf), .effective_median_block_weight(&current_hf)
median_long_term_weight: weight_cache.median_long_term_weight(), .await,
median_long_term_weight: weight_cache.median_long_term_weight().await,
median_weight_for_block_reward: weight_cache median_weight_for_block_reward: weight_cache
.median_for_block_reward(&current_hf), .median_for_block_reward(&current_hf)
.await,
already_generated_coins: *already_generated_coins, already_generated_coins: *already_generated_coins,
top_block_timestamp: difficulty_cache.top_block_timestamp(), top_block_timestamp: difficulty_cache.top_block_timestamp(),
median_block_timestamp: difficulty_cache.median_timestamp( median_block_timestamp: difficulty_cache.median_timestamp(

View file

@ -17,7 +17,8 @@ use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use crate::{ use crate::{
helper::median, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, helper::{median, rayon_spawn_async},
ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork,
}; };
#[cfg(test)] #[cfg(test)]
@ -144,9 +145,16 @@ impl BlockWeightsCache {
} }
/// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or custom amount of blocks in the config. /// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or custom amount of blocks in the config.
pub fn median_long_term_weight(&self) -> usize { pub async fn median_long_term_weight(&self) -> usize {
let mut sorted_long_term_weights: Vec<usize> = self.long_term_weights.clone().into(); let mut sorted_long_term_weights: Vec<usize> = self.long_term_weights.clone().into();
sorted_long_term_weights.par_sort_unstable();
// Move this out of the async runtime as this can take a bit.
let sorted_long_term_weights = rayon_spawn_async(|| {
sorted_long_term_weights.par_sort_unstable();
sorted_long_term_weights
})
.await;
median(&sorted_long_term_weights) median(&sorted_long_term_weights)
} }
@ -161,22 +169,22 @@ impl BlockWeightsCache {
/// the block weight limit. /// the block weight limit.
/// ///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-effective-median-weight /// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-effective-median-weight
pub fn effective_median_block_weight(&self, hf: &HardFork) -> usize { pub async fn effective_median_block_weight(&self, hf: &HardFork) -> usize {
calculate_effective_median_block_weight( calculate_effective_median_block_weight(
hf, hf,
self.median_short_term_weight(), self.median_short_term_weight(),
self.median_long_term_weight(), self.median_long_term_weight().await,
) )
} }
/// Returns the median weight used to calculate block reward punishment. /// Returns the median weight used to calculate block reward punishment.
/// ///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/reward.html#calculating-block-reward /// https://cuprate.github.io/monero-book/consensus_rules/blocks/reward.html#calculating-block-reward
pub fn median_for_block_reward(&self, hf: &HardFork) -> usize { pub async fn median_for_block_reward(&self, hf: &HardFork) -> usize {
if hf.in_range(&HardFork::V1, &HardFork::V12) { if hf.in_range(&HardFork::V1, &HardFork::V12) {
self.median_short_term_weight() self.median_short_term_weight()
} else { } else {
self.effective_median_block_weight(hf) self.effective_median_block_weight(hf).await
} }
.max(penalty_free_zone(hf)) .max(penalty_free_zone(hf))
} }

View file

@ -5,6 +5,19 @@ use std::{
use curve25519_dalek::edwards::CompressedEdwardsY; use curve25519_dalek::edwards::CompressedEdwardsY;
/// Spawns a task for the rayon thread pool and awaits the result without blocking the async runtime.
pub(crate) async fn rayon_spawn_async<F, R>(f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(|| {
let _ = tx.send(f());
});
rx.await.expect("The sender must not be dropped")
}
pub(crate) fn get_mid<T>(a: T, b: T) -> T pub(crate) fn get_mid<T>(a: T, b: T) -> T
where where
T: Add<Output = T> + Sub<Output = T> + Div<Output = T> + Mul<Output = T> + Copy + From<u8>, T: Add<Output = T> + Sub<Output = T> + Div<Output = T> + Mul<Output = T> + Copy + From<u8>,

View file

@ -25,7 +25,10 @@ use tracing::{instrument, Instrument};
use cuprate_common::BlockID; use cuprate_common::BlockID;
use monero_wire::common::{BlockCompleteEntry, TransactionBlobs}; use monero_wire::common::{BlockCompleteEntry, TransactionBlobs};
use crate::{DatabaseRequest, DatabaseResponse, ExtendedBlockHeader, HardFork, OutputOnChain}; use crate::{
helper::rayon_spawn_async, DatabaseRequest, DatabaseResponse, ExtendedBlockHeader, HardFork,
OutputOnChain,
};
pub mod cache; pub mod cache;
mod discover; mod discover;
@ -503,25 +506,30 @@ async fn get_blocks_in_range<R: RpcConnection>(
let blocks: Response = monero_epee_bin_serde::from_bytes(res)?; let blocks: Response = monero_epee_bin_serde::from_bytes(res)?;
Ok(DatabaseResponse::BlockBatchInRange( Ok(DatabaseResponse::BlockBatchInRange(
blocks rayon_spawn_async(|| {
.blocks blocks
.into_par_iter() .blocks
.map(|b| { .into_par_iter()
Ok(( .map(|b| {
monero_serai::block::Block::read(&mut b.block.as_slice())?, Ok((
match b.txs { monero_serai::block::Block::read(&mut b.block.as_slice())?,
TransactionBlobs::Pruned(_) => return Err("node sent pruned txs!".into()), match b.txs {
TransactionBlobs::Normal(txs) => txs TransactionBlobs::Pruned(_) => {
.into_par_iter() return Err("node sent pruned txs!".into())
.map(|tx| { }
monero_serai::transaction::Transaction::read(&mut tx.as_slice()) TransactionBlobs::Normal(txs) => txs
}) .into_par_iter()
.collect::<Result<_, _>>()?, .map(|tx| {
TransactionBlobs::None => vec![], monero_serai::transaction::Transaction::read(&mut tx.as_slice())
}, })
)) .collect::<Result<_, _>>()?,
}) TransactionBlobs::None => vec![],
.collect::<Result<_, tower::BoxError>>()?, },
))
})
.collect::<Result<_, tower::BoxError>>()
})
.await?,
)) ))
} }
@ -556,21 +564,24 @@ async fn get_block_info_in_range<R: RpcConnection>(
tracing::info!("Retrieved block headers in range: {:?}", range); tracing::info!("Retrieved block headers in range: {:?}", range);
Ok(DatabaseResponse::BlockExtendedHeaderInRange( Ok(DatabaseResponse::BlockExtendedHeaderInRange(
res.headers rayon_spawn_async(|| {
.into_iter() res.headers
.map(|info| ExtendedBlockHeader { .into_iter()
version: HardFork::from_version(&info.major_version) .map(|info| ExtendedBlockHeader {
.expect("previously checked block has incorrect version"), version: HardFork::from_version(&info.major_version)
vote: HardFork::from_vote(&info.minor_version), .expect("previously checked block has incorrect version"),
timestamp: info.timestamp, vote: HardFork::from_vote(&info.minor_version),
cumulative_difficulty: u128_from_low_high( timestamp: info.timestamp,
info.cumulative_difficulty, cumulative_difficulty: u128_from_low_high(
info.cumulative_difficulty_top64, info.cumulative_difficulty,
), info.cumulative_difficulty_top64,
block_weight: info.block_weight, ),
long_term_weight: info.long_term_weight, block_weight: info.block_weight,
}) long_term_weight: info.long_term_weight,
.collect(), })
.collect()
})
.await,
)) ))
} }

View file

@ -82,8 +82,9 @@ impl ScanningCache {
self.height += 1; self.height += 1;
} }
/// Returns true if any kis are included in our spent set.
pub fn are_kis_spent(&self, kis: HashSet<[u8; 32]>) -> bool { pub fn are_kis_spent(&self, kis: HashSet<[u8; 32]>) -> bool {
self.kis.is_disjoint(&kis) !self.kis.is_disjoint(&kis)
} }
pub fn outputs_time_lock(&self, tx: &[u8; 32]) -> Timelock { pub fn outputs_time_lock(&self, tx: &[u8; 32]) -> Timelock {

View file

@ -14,7 +14,8 @@ use tower::{Service, ServiceExt};
use tracing::instrument; use tracing::instrument;
use crate::{ use crate::{
context::ReOrgToken, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, context::ReOrgToken, helper::rayon_spawn_async, ConsensusError, Database, DatabaseRequest,
DatabaseResponse, HardFork,
}; };
mod contextual_data; mod contextual_data;
@ -158,13 +159,12 @@ where
D: Database + Clone + Sync + Send + 'static, D: Database + Clone + Sync + Send + 'static,
{ {
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs. // Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
let txs = tokio::task::spawn_blocking(|| { let txs = rayon_spawn_async(|| {
txs.into_par_iter() txs.into_par_iter()
.map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?))) .map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?)))
.collect::<Result<Vec<_>, ConsensusError>>() .collect::<Result<Vec<_>, ConsensusError>>()
}) })
.await .await?;
.unwrap()?;
contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?; contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?;
@ -191,7 +191,8 @@ where
let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new())); let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));
let cloned_spent_kis = spent_kis.clone(); let cloned_spent_kis = spent_kis.clone();
tokio::task::spawn_blocking(move || {
rayon_spawn_async(move || {
txs.par_iter().try_for_each(|tx| { txs.par_iter().try_for_each(|tx| {
verify_transaction_for_block( verify_transaction_for_block(
tx, tx,
@ -202,8 +203,7 @@ where
) )
}) })
}) })
.await .await?;
.unwrap()?;
let DatabaseResponse::CheckKIsNotSpent(kis_spent) = database let DatabaseResponse::CheckKIsNotSpent(kis_spent) = database
.oneshot(DatabaseRequest::CheckKIsNotSpent( .oneshot(DatabaseRequest::CheckKIsNotSpent(