From f3d96ca2cec42e7af5e9aa1c681367247eaf6ffd Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 2 Dec 2023 22:57:34 +0000 Subject: [PATCH] move more tasks to rayon and change some returned futures to be explicit instead of Boxed --- common/Cargo.toml | 4 +- common/src/lib.rs | 1 + common/src/tower_utils.rs | 49 ++++++++ consensus/Cargo.toml | 6 +- consensus/src/bin/scan_chain.rs | 69 ++++++----- consensus/src/bin/tx_pool.rs | 51 +++++--- consensus/src/block.rs | 56 +++++---- consensus/src/context.rs | 213 +++++++++++++------------------- consensus/src/context/tests.rs | 44 ++++--- consensus/src/lib.rs | 6 +- consensus/src/rpc.rs | 85 ++++++------- consensus/src/rpc/cache.rs | 3 +- consensus/src/rpc/connection.rs | 68 +++++----- consensus/src/rpc/discover.rs | 15 ++- 14 files changed, 361 insertions(+), 309 deletions(-) create mode 100644 common/src/tower_utils.rs diff --git a/common/Cargo.toml b/common/Cargo.toml index f9c2bb9..919b36d 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -9,4 +9,6 @@ authors = ["Boog900"] [dependencies] chrono = "0.4.24" thiserror = "1" -hex = "0.4" \ No newline at end of file +hex = "0.4" + +futures = "0.3.29" \ No newline at end of file diff --git a/common/src/lib.rs b/common/src/lib.rs index 4bac38b..79bdcfd 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,6 +1,7 @@ //pub mod hardforks; pub mod network; pub mod pruning; +pub mod tower_utils; use std::fmt::Formatter; //pub use hardforks::HardForks; diff --git a/common/src/tower_utils.rs b/common/src/tower_utils.rs new file mode 100644 index 0000000..0b6825b --- /dev/null +++ b/common/src/tower_utils.rs @@ -0,0 +1,49 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::channel::oneshot; +use futures::FutureExt; + +/// A oneshot that doesn't return an Error. This requires the sender to always +/// return a response. +pub struct InfallibleOneshotReceiver(oneshot::Receiver); + +impl From> for InfallibleOneshotReceiver { + fn from(value: oneshot::Receiver) -> Self { + InfallibleOneshotReceiver(value) + } +} + +impl Future for InfallibleOneshotReceiver { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0 + .poll_unpin(cx) + .map(|res| res.expect("Oneshot must not be cancelled before response!")) + } +} + +/// A future that is ready straight away. +pub struct InstaFuture(Option); + +impl From for InstaFuture { + fn from(value: T) -> Self { + InstaFuture(Some(value)) + } +} + +impl Future for InstaFuture { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready( + self.0 + .take() + .expect("Can't call future twice after Poll::Ready"), + ) + } +} diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index dce5ce8..935b70b 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -36,9 +36,9 @@ crypto-bigint = "0.5" curve25519-dalek = "4" randomx-rs = "1" -monero-serai = {git="https://github.com/serai-dex/serai.git", rev = "c328e5e"} -multiexp = {git="https://github.com/serai-dex/serai.git", rev = "c328e5e"} -dalek-ff-group = {git="https://github.com/serai-dex/serai.git", rev = "c328e5e"} +monero-serai = {git="https://github.com/cuprate/serai.git", rev = "4a5d860"} +multiexp = {git="https://github.com/cuprate/serai.git", rev = "4a5d860"} +dalek-ff-group = {git="https://github.com/cuprate/serai.git", rev = "4a5d860"} cuprate-common = {path = "../common"} cryptonight-cuprate = {path = "../cryptonight"} diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 6e05c05..68319da 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,21 +1,23 @@ #![cfg(feature = "binaries")] -use std::{ - ops::Range, - path::PathBuf, - sync::{Arc, RwLock}, -}; +use std::{ops::Range, path::PathBuf, sync::Arc}; -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + SinkExt, StreamExt, +}; use monero_serai::block::Block; -use tokio::sync::oneshot; +use tokio::sync::RwLock; use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; use cuprate_common::Network; use monero_consensus::{ - context::{ContextConfig, UpdateBlockchainCacheRequest}, + context::{ + BlockChainContextRequest, BlockChainContextResponse, ContextConfig, + UpdateBlockchainCacheData, + }, initialize_blockchain_context, initialize_verifier, rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest, @@ -24,8 +26,8 @@ use monero_consensus::{ mod tx_pool; -const MAX_BLOCKS_IN_RANGE: u64 = 500; -const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 500; +const MAX_BLOCKS_IN_RANGE: u64 = 1000; +const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000; /// Calls for a batch of blocks, returning the response and the time it took. async fn call_batch( @@ -43,10 +45,14 @@ async fn update_cache_and_context( verified_block_info: VerifiedBlockInformation, ) -> Result<(), tower::BoxError> where - Ctx: tower::Service, + Ctx: tower::Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, { // add the new block to the cache - cache.write().unwrap().add_new_block_data( + cache.write().await.add_new_block_data( verified_block_info.generated_coins, &verified_block_info.block.miner_tx, &verified_block_info.txs, @@ -55,16 +61,18 @@ where context_updater .ready() .await? - .call(UpdateBlockchainCacheRequest { - new_top_hash: verified_block_info.block_hash, - height: verified_block_info.height, - timestamp: verified_block_info.block.header.timestamp, - weight: verified_block_info.weight, - long_term_weight: verified_block_info.long_term_weight, - vote: verified_block_info.hf_vote, - generated_coins: verified_block_info.generated_coins, - cumulative_difficulty: verified_block_info.cumulative_difficulty, - }) + .call(BlockChainContextRequest::Update( + UpdateBlockchainCacheData { + new_top_hash: verified_block_info.block_hash, + height: verified_block_info.height, + timestamp: verified_block_info.block.header.timestamp, + weight: verified_block_info.weight, + long_term_weight: verified_block_info.long_term_weight, + vote: verified_block_info.hf_vote, + generated_coins: verified_block_info.generated_coins, + cumulative_difficulty: verified_block_info.cumulative_difficulty, + }, + )) .await?; Ok(()) @@ -126,7 +134,7 @@ where async fn scan_chain( cache: Arc>, save_file: PathBuf, - _rpc_config: Arc>, + _rpc_config: Arc>, database: D, ) -> Result<(), tower::BoxError> where @@ -142,19 +150,18 @@ where let config = ContextConfig::main_net(); - let (ctx_svc, mut context_updater) = - initialize_blockchain_context(config, database.clone()).await?; + let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?; - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?; let (mut block_verifier, transaction_verifier) = - initialize_verifier(database.clone(), tx_pool_svc, ctx_svc).await?; + initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?; tx.send(transaction_verifier).map_err(|_| "").unwrap(); - let start_height = cache.read().unwrap().height; + let start_height = cache.read().await.height; let (block_tx, mut incoming_blocks) = mpsc::channel(3); @@ -207,10 +214,10 @@ where if verified_block_info.height % 5000 == 0 { tracing::info!("saving cache to: {}", save_file.display()); - cache.write().unwrap().save(&save_file).unwrap(); + cache.write().await.save(&save_file).unwrap(); } - update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?; + update_cache_and_context(&cache, &mut ctx_svc, verified_block_info).await?; } tracing::info!( @@ -267,7 +274,7 @@ async fn main() { ]; let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE); - let rpc_config = Arc::new(RwLock::new(rpc_config)); + let rpc_config = Arc::new(std::sync::RwLock::new(rpc_config)); tracing::info!("Attempting to open cache at: {}", file_for_cache.display()); let cache = match ScanningCache::load(&file_for_cache) { diff --git a/consensus/src/bin/tx_pool.rs b/consensus/src/bin/tx_pool.rs index dbcdd28..516918a 100644 --- a/consensus/src/bin/tx_pool.rs +++ b/consensus/src/bin/tx_pool.rs @@ -2,19 +2,24 @@ use std::{ collections::HashMap, - future::Future, - pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, }; -use futures::{channel::mpsc, FutureExt, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; use monero_serai::transaction::Transaction; -use tokio::sync::oneshot; use tower::{Service, ServiceExt}; +use cuprate_common::tower_utils::InfallibleOneshotReceiver; + use monero_consensus::{ - context::{BlockChainContext, BlockChainContextRequest, RawBlockChainContext}, + context::{ + BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, + RawBlockChainContext, + }, transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, ConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, }; @@ -31,8 +36,7 @@ pub struct TxPoolHandle { impl tower::Service for TxPoolHandle { type Response = TxPoolResponse; type Error = TxNotInPool; - type Future = - Pin> + Send + 'static>>; + type Future = InfallibleOneshotReceiver>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.tx_pool_task.is_finished() { @@ -50,11 +54,7 @@ impl tower::Service for TxPoolHandle { .try_send((req, tx)) .expect("You need to use `poll_ready` to check capacity!"); - async move { - rx.await - .expect("Tx pool will always respond without dropping the sender") - } - .boxed() + rx.into() } } @@ -83,8 +83,11 @@ where + Send + 'static, TxV::Future: Send + 'static, - Ctx: Service - + Send + Ctx: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + 'static, Ctx::Future: Send + 'static, { @@ -101,11 +104,14 @@ where ), tower::BoxError, > { - let current_ctx = ctx_svc + let BlockChainContextResponse::Context(current_ctx) = ctx_svc .ready() .await? - .call(BlockChainContextRequest) - .await?; + .call(BlockChainContextRequest::Get) + .await? + else { + panic!("Context service service returned wrong response!") + }; let tx_pool = TxPool { txs: Default::default(), @@ -133,12 +139,17 @@ where if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() { Ok(current_ctx) } else { - self.current_ctx = self + let BlockChainContextResponse::Context(current_ctx) = self .ctx_svc .ready() .await? - .call(BlockChainContextRequest) - .await?; + .call(BlockChainContextRequest::Get) + .await? + else { + panic!("Context service service returned wrong response!") + }; + + self.current_ctx = current_ctx; Ok(self.current_ctx.unchecked_blockchain_context().clone()) } diff --git a/consensus/src/block.rs b/consensus/src/block.rs index 3e8331f..6127819 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -11,7 +11,7 @@ use rayon::prelude::*; use tower::{Service, ServiceExt}; use crate::{ - context::{BlockChainContext, BlockChainContextRequest}, + context::{BlockChainContextRequest, BlockChainContextResponse}, helper::rayon_spawn_async, transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, ConsensusError, HardFork, TxNotInPool, TxPoolRequest, TxPoolResponse, @@ -75,7 +75,10 @@ pub struct BlockVerifierService { impl BlockVerifierService where - C: Service + Clone + Send + 'static, + C: Service + + Clone + + Send + + 'static, TxV: Service + Clone + Send @@ -100,8 +103,11 @@ where impl Service for BlockVerifierService where - C: Service - + Clone + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + Send + 'static, C::Future: Send + 'static, @@ -123,14 +129,12 @@ where type Future = Pin> + Send + 'static>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - futures::ready!(self.context_svc.poll_ready(cx)).map(Into::into)?; - self.tx_verifier_svc.poll_ready(cx) + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: VerifyBlockRequest) -> Self::Future { let context_svc = self.context_svc.clone(); - let context_svc = std::mem::replace(&mut self.context_svc, context_svc); let tx_verifier_svc = self.tx_verifier_svc.clone(); let tx_pool = self.tx_pool.clone(); @@ -195,8 +199,11 @@ async fn verify_prepared_main_chain_block( tx_pool: TxP, ) -> Result where - C: Service - + Send + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + 'static, C::Future: Send + 'static, TxV: Service, @@ -206,12 +213,14 @@ where + 'static, { tracing::debug!("getting blockchain context"); - let checked_context = context_svc - .oneshot(BlockChainContextRequest) + let BlockChainContextResponse::Context(checked_context) = context_svc + .oneshot(BlockChainContextRequest::Get) .await - .map_err(Into::::into)?; + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; - // TODO: should we unwrap here, we did just get the data so it should be ok. let context = checked_context.unchecked_blockchain_context().clone(); tracing::debug!("got blockchain context: {:?}", context); @@ -275,7 +284,7 @@ where weight: block_weight, height: context.chain_height, long_term_weight: context.next_block_long_term_weight(block_weight), - hf_vote: HardFork::V1, + hf_vote: block.hf_vote, cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, })) } @@ -287,8 +296,11 @@ async fn verify_main_chain_block( tx_pool: TxP, ) -> Result where - C: Service - + Send + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + 'static, C::Future: Send + 'static, TxV: Service, @@ -298,12 +310,14 @@ where + 'static, { tracing::debug!("getting blockchain context"); - let checked_context = context_svc - .oneshot(BlockChainContextRequest) + let BlockChainContextResponse::Context(checked_context) = context_svc + .oneshot(BlockChainContextRequest::Get) .await - .map_err(Into::::into)?; + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; - // TODO: should we unwrap here, we did just get the data so it should be ok. let context = checked_context.unchecked_blockchain_context().clone(); tracing::debug!("got blockchain context: {:?}", context); diff --git a/consensus/src/context.rs b/consensus/src/context.rs index 93d54c2..6ab98fd 100644 --- a/consensus/src/context.rs +++ b/consensus/src/context.rs @@ -8,8 +8,7 @@ use std::{ cmp::min, future::Future, - ops::{Deref, DerefMut}, - pin::Pin, + ops::DerefMut, sync::Arc, task::{Context, Poll}, }; @@ -20,6 +19,8 @@ use futures::{ }; use tower::{Service, ServiceExt}; +use cuprate_common::tower_utils::InstaFuture; + use crate::{helper::current_time, ConsensusError, Database, DatabaseRequest, DatabaseResponse}; mod difficulty; @@ -57,20 +58,17 @@ pub async fn initialize_blockchain_context( cfg: ContextConfig, mut database: D, ) -> Result< - ( - impl Service< - BlockChainContextRequest, - Response = BlockChainContext, - Error = tower::BoxError, - Future = impl Future> - + Send - + 'static, - > + Clone - + Send - + Sync - + 'static, - impl Service, - ), + impl Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + Future = impl Future> + + Send + + 'static, + > + Clone + + Send + + Sync + + 'static, ConsensusError, > where @@ -135,9 +133,7 @@ where lock_state: MutexLockState::Locked, }; - let context_svc_update = context_svc.clone(); - - Ok((context_svc_update.clone(), context_svc_update)) + Ok(context_svc) } /// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep @@ -246,7 +242,27 @@ impl BlockChainContext { } #[derive(Debug, Clone)] -pub struct BlockChainContextRequest; +pub struct UpdateBlockchainCacheData { + pub new_top_hash: [u8; 32], + pub height: u64, + pub timestamp: u64, + pub weight: usize, + pub long_term_weight: usize, + pub generated_coins: u64, + pub vote: HardFork, + pub cumulative_difficulty: u128, +} + +#[derive(Debug, Clone)] +pub enum BlockChainContextRequest { + Get, + Update(UpdateBlockchainCacheData), +} + +pub enum BlockChainContextResponse { + Context(BlockChainContext), + Ok, +} #[derive(Clone)] struct InternalBlockChainContext { @@ -285,10 +301,9 @@ impl Clone for BlockChainContextService { } impl Service for BlockChainContextService { - type Response = BlockChainContext; + type Response = BlockChainContextResponse; type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; + type Future = InstaFuture>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { @@ -306,121 +321,67 @@ impl Service for BlockChainContextService { } } - fn call(&mut self, _: BlockChainContextRequest) -> Self::Future { - let MutexLockState::Acquired(internal_blockchain_context) = - std::mem::replace(&mut self.lock_state, MutexLockState::Locked) - else { - panic!("poll_ready() was not called first!") - }; - - async move { - let InternalBlockChainContext { - current_validity_token, - current_reorg_token, - difficulty_cache, - weight_cache, - hardfork_state, - chain_height, - top_block_hash, - already_generated_coins, - } = internal_blockchain_context.deref(); - - let current_hf = hardfork_state.current_hardfork(); - - Ok(BlockChainContext { - validity_token: current_validity_token.clone(), - raw: RawBlockChainContext { - next_difficulty: difficulty_cache.next_difficulty(¤t_hf), - cumulative_difficulty: difficulty_cache.cumulative_difficulty(), - effective_median_weight: weight_cache - .effective_median_block_weight(¤t_hf), - median_long_term_weight: weight_cache.median_long_term_weight(), - median_weight_for_block_reward: weight_cache - .median_for_block_reward(¤t_hf), - already_generated_coins: *already_generated_coins, - top_block_timestamp: difficulty_cache.top_block_timestamp(), - median_block_timestamp: difficulty_cache.median_timestamp( - usize::try_from(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW).unwrap(), - ), - chain_height: *chain_height, - top_hash: *top_block_hash, - current_hard_fork: current_hf, - re_org_token: current_reorg_token.clone(), - }, - }) - } - .boxed() - } -} - -// TODO: join these services, there is no need for 2. -pub struct UpdateBlockchainCacheRequest { - pub new_top_hash: [u8; 32], - pub height: u64, - pub timestamp: u64, - pub weight: usize, - pub long_term_weight: usize, - pub generated_coins: u64, - pub vote: HardFork, - pub cumulative_difficulty: u128, -} - -impl tower::Service for BlockChainContextService { - type Response = (); - type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - match &mut self.lock_state { - MutexLockState::Locked => { - self.lock_state = MutexLockState::Acquiring( - Arc::clone(&self.internal_blockchain_context).lock_owned(), - ) - } - MutexLockState::Acquiring(rpc) => { - self.lock_state = MutexLockState::Acquired(futures::ready!(rpc.poll_unpin(cx))) - } - MutexLockState::Acquired(_) => return Poll::Ready(Ok(())), - } - } - } - - fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future { + fn call(&mut self, req: BlockChainContextRequest) -> Self::Future { let MutexLockState::Acquired(mut internal_blockchain_context) = std::mem::replace(&mut self.lock_state, MutexLockState::Locked) else { panic!("poll_ready() was not called first!") }; - async move { - let InternalBlockChainContext { - current_validity_token, - current_reorg_token: _, - difficulty_cache, - weight_cache, - hardfork_state, - chain_height, - top_block_hash, - already_generated_coins, - } = internal_blockchain_context.deref_mut(); + let InternalBlockChainContext { + current_validity_token, + current_reorg_token, + difficulty_cache, + weight_cache, + hardfork_state, + chain_height, + top_block_hash, + already_generated_coins, + } = internal_blockchain_context.deref_mut(); - // Cancel the validity token and replace it with a new one. - std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid(); + match req { + BlockChainContextRequest::Get => { + let current_hf = hardfork_state.current_hardfork(); - difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty); + InstaFuture::from(Ok(BlockChainContextResponse::Context(BlockChainContext { + validity_token: current_validity_token.clone(), + raw: RawBlockChainContext { + next_difficulty: difficulty_cache.next_difficulty(¤t_hf), + cumulative_difficulty: difficulty_cache.cumulative_difficulty(), + effective_median_weight: weight_cache + .effective_median_block_weight(¤t_hf), + median_long_term_weight: weight_cache.median_long_term_weight(), + median_weight_for_block_reward: weight_cache + .median_for_block_reward(¤t_hf), + already_generated_coins: *already_generated_coins, + top_block_timestamp: difficulty_cache.top_block_timestamp(), + median_block_timestamp: difficulty_cache.median_timestamp( + usize::try_from(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW).unwrap(), + ), + chain_height: *chain_height, + top_hash: *top_block_hash, + current_hard_fork: current_hf, + re_org_token: current_reorg_token.clone(), + }, + }))) + } + BlockChainContextRequest::Update(new) => { + // Cancel the validity token and replace it with a new one. + std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid(); - weight_cache.new_block(new.height, new.weight, new.long_term_weight); + difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty); - hardfork_state.new_block(new.vote, new.height); + weight_cache.new_block(new.height, new.weight, new.long_term_weight); - *chain_height = new.height + 1; - *top_block_hash = new.new_top_hash; - *already_generated_coins = already_generated_coins.saturating_add(new.generated_coins); + hardfork_state.new_block(new.vote, new.height); - Ok(()) + *chain_height = new.height + 1; + *top_block_hash = new.new_top_hash; + *already_generated_coins = + already_generated_coins.saturating_add(new.generated_coins); + + InstaFuture::from(Ok(BlockChainContextResponse::Ok)) + } } - .boxed() } } diff --git a/consensus/src/context/tests.rs b/consensus/src/context/tests.rs index e70c3ec..5eb6dfc 100644 --- a/consensus/src/context/tests.rs +++ b/consensus/src/context/tests.rs @@ -5,7 +5,7 @@ use tower::ServiceExt; use super::{ difficulty::tests::TEST_DIFFICULTY_CONFIG, hardforks::tests::TEST_HARD_FORK_CONFIG, initialize_blockchain_context, weight::tests::TEST_WEIGHT_CONFIG, BlockChainContextRequest, - ContextConfig, UpdateBlockchainCacheRequest, + BlockChainContextResponse, ContextConfig, UpdateBlockchainCacheData, }; use crate::{test_utils::mock_db::*, HardFork}; @@ -25,25 +25,33 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> { .unwrap() .current(); - let (ctx_svc, updater) = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; + let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; - let context = ctx_svc.oneshot(BlockChainContextRequest).await?; + let BlockChainContextResponse::Context(context) = ctx_svc + .clone() + .oneshot(BlockChainContextRequest::Get) + .await? + else { + panic!("Context service returned wrong response!"); + }; assert!(context.is_still_valid()); assert!(context.is_still_valid()); assert!(context.is_still_valid()); - updater - .oneshot(UpdateBlockchainCacheRequest { - new_top_hash: [0; 32], - height: BLOCKCHAIN_HEIGHT, - timestamp: 0, - weight: 0, - long_term_weight: 0, - generated_coins: 0, - vote: HardFork::V1, - cumulative_difficulty: 0, - }) + ctx_svc + .oneshot(BlockChainContextRequest::Update( + UpdateBlockchainCacheData { + new_top_hash: [0; 32], + height: BLOCKCHAIN_HEIGHT, + timestamp: 0, + weight: 0, + long_term_weight: 0, + generated_coins: 0, + vote: HardFork::V1, + cumulative_difficulty: 0, + }, + )) .await?; assert!(!context.is_still_valid()); @@ -61,9 +69,13 @@ async fn context_height_correct() -> Result<(), tower::BoxError> { .unwrap() .current(); - let (ctx_svc, _) = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; + let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; - let context = ctx_svc.oneshot(BlockChainContextRequest).await?; + let BlockChainContextResponse::Context(context) = + ctx_svc.oneshot(BlockChainContextRequest::Get).await? + else { + panic!("context service returned incorrect response!") + }; assert_eq!( context.blockchain_context().unwrap().chain_height, diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 190b8a6..4dca640 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -19,8 +19,8 @@ pub use block::{ PrePreparedBlock, VerifiedBlockInformation, VerifyBlockRequest, VerifyBlockResponse, }; pub use context::{ - initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, ContextConfig, - HardFork, UpdateBlockchainCacheRequest, + initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, + BlockChainContextResponse, ContextConfig, HardFork, }; pub use transactions::{VerifyTxRequest, VerifyTxResponse}; @@ -63,7 +63,7 @@ where TxP::Future: Send + 'static, Ctx: tower::Service< BlockChainContextRequest, - Response = BlockChainContext, + Response = BlockChainContextResponse, Error = tower::BoxError, > + Clone + Send diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index 507b9d4..fba6b29 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -4,22 +4,18 @@ use std::{ future::Future, ops::Range, pin::Pin, - sync::{Arc, Mutex, RwLock}, + sync::Arc, task::{Context, Poll}, - time::Duration, }; -use curve25519_dalek::edwards::CompressedEdwardsY; use futures::{ - lock::{OwnedMutexGuard, OwnedMutexLockFuture}, stream::{FuturesOrdered, FuturesUnordered}, FutureExt, StreamExt, TryFutureExt, TryStreamExt, }; -use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; +use tokio::sync::RwLock; use tower::{balance::p2c::Balance, util::BoxService, ServiceExt}; -use tracing_subscriber::filter::FilterExt; -use crate::{DatabaseRequest, DatabaseResponse}; +use crate::{helper::rayon_spawn_async, DatabaseRequest, DatabaseResponse}; pub mod cache; mod connection; @@ -73,7 +69,7 @@ impl tower::retry::Policy for Attempts { pub fn init_rpc_load_balancer( addresses: Vec, cache: Arc>, - config: Arc>, + config: Arc>, ) -> impl tower::Service< DatabaseRequest, Response = DatabaseResponse, @@ -87,7 +83,7 @@ pub fn init_rpc_load_balancer( let rpc_balance = Balance::new(Box::pin( rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok), )); - let rpc_buffer = tower::buffer::Buffer::new(rpc_balance, 500); + let rpc_buffer = tower::buffer::Buffer::new(rpc_balance, 50); let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer); let discover = discover::RPCDiscover { @@ -109,7 +105,7 @@ pub fn init_rpc_load_balancer( #[derive(Clone)] pub struct RpcBalancer { rpcs: T, - config: Arc>, + config: Arc>, cache: Arc>, } @@ -141,19 +137,19 @@ where match req { DatabaseRequest::CheckKIsNotSpent(kis) => async move { Ok(DatabaseResponse::CheckKIsNotSpent( - cache.read().unwrap().are_kis_spent(kis), + cache.read().await.are_kis_spent(kis), )) } .boxed(), DatabaseRequest::GeneratedCoins => async move { Ok(DatabaseResponse::GeneratedCoins( - cache.read().unwrap().already_generated_coins, + cache.read().await.already_generated_coins, )) } .boxed(), DatabaseRequest::NumberOutputsWithAmount(amt) => async move { Ok(DatabaseResponse::NumberOutputsWithAmount( - cache.read().unwrap().numb_outs(amt), + cache.read().await.numb_outs(amt), )) } .boxed(), @@ -172,6 +168,7 @@ where resp_to_ret, config.max_blocks_per_node, ) + .boxed() } DatabaseRequest::BlockExtendedHeaderInRange(range) => { let resp_to_ret = |resp: DatabaseResponse| { @@ -188,37 +185,43 @@ where resp_to_ret, config.max_block_headers_per_node, ) + .boxed() } DatabaseRequest::Outputs(outs) => async move { - let mut split_outs: Vec>> = Vec::new(); - let mut i: usize = 0; - for (amount, ixs) in outs { - if ixs.len() > MAX_OUTS_PER_RPC { - for ii in (0..ixs.len()).step_by(MAX_OUTS_PER_RPC) { - let mut amt_map = HashSet::with_capacity(MAX_OUTS_PER_RPC); - amt_map.extend(ixs.iter().skip(ii).copied().take(MAX_OUTS_PER_RPC)); + let split_outs = rayon_spawn_async(|| { + let mut split_outs: Vec>> = Vec::new(); + let mut i: usize = 0; + for (amount, ixs) in outs { + if ixs.len() > MAX_OUTS_PER_RPC { + for ii in (0..ixs.len()).step_by(MAX_OUTS_PER_RPC) { + let mut amt_map = HashSet::with_capacity(MAX_OUTS_PER_RPC); + amt_map.extend(ixs.iter().skip(ii).copied().take(MAX_OUTS_PER_RPC)); - let mut map = HashMap::new(); - map.insert(amount, amt_map); - split_outs.push(map); - i += 1; - } - continue; - } - - if let Some(map) = split_outs.get_mut(i.saturating_sub(1)) { - if map.iter().map(|(_, amt_map)| amt_map.len()).sum::() + ixs.len() - < MAX_OUTS_PER_RPC - { - assert!(map.insert(amount, ixs).is_none()); + let mut map = HashMap::new(); + map.insert(amount, amt_map); + split_outs.push(map); + i += 1; + } continue; } + + if let Some(map) = split_outs.get_mut(i.saturating_sub(1)) { + if map.iter().map(|(_, amt_map)| amt_map.len()).sum::() + + ixs.len() + < MAX_OUTS_PER_RPC + { + assert!(map.insert(amount, ixs).is_none()); + continue; + } + } + let mut map = HashMap::new(); + map.insert(amount, ixs); + split_outs.push(map); + i += 1; } - let mut map = HashMap::new(); - map.insert(amount, ixs); - split_outs.push(map); - i += 1; - } + split_outs + }) + .await; let mut futs = FuturesUnordered::from_iter( split_outs @@ -247,11 +250,11 @@ where fn split_range_request( rpc: T, range: Range, - req: impl FnOnce(Range) -> DatabaseRequest + Clone + Send + 'static, + req: impl Fn(Range) -> DatabaseRequest + Send + 'static, resp: impl FnOnce(Vec) -> DatabaseResponse + Send + 'static, resp_to_ret: impl Fn(DatabaseResponse) -> Vec + Copy + Send + 'static, max_request_per_rpc: u64, -) -> Pin> + Send + 'static>> +) -> impl Future> + Send + 'static where T: tower::Service + Clone @@ -264,7 +267,6 @@ where let iter = (0..range.clone().count() as u64) .step_by(max_request_per_rpc as usize) .map(|i| { - let req = req.clone(); let new_range = (range.start + i)..(min(range.start + i + max_request_per_rpc, range.end)); rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret) @@ -281,5 +283,4 @@ where Ok(resp(res)) } - .boxed() } diff --git a/consensus/src/rpc/cache.rs b/consensus/src/rpc/cache.rs index 3ad0531..a871516 100644 --- a/consensus/src/rpc/cache.rs +++ b/consensus/src/rpc/cache.rs @@ -1,9 +1,8 @@ -use std::io::Write; use std::{ collections::HashMap, collections::HashSet, fmt::{Display, Formatter}, - io::BufWriter, + io::{BufWriter, Write}, path::Path, sync::Arc, }; diff --git a/consensus/src/rpc/connection.rs b/consensus/src/rpc/connection.rs index c54e74e..8660e45 100644 --- a/consensus/src/rpc/connection.rs +++ b/consensus/src/rpc/connection.rs @@ -1,16 +1,14 @@ use std::{ collections::{HashMap, HashSet}, - future::Future, ops::Range, - pin::Pin, - sync::{Arc, RwLock}, + sync::Arc, task::{Context, Poll}, }; use curve25519_dalek::edwards::CompressedEdwardsY; use futures::{ channel::{mpsc, oneshot}, - FutureExt, StreamExt, + StreamExt, }; use monero_serai::{ block::Block, @@ -24,11 +22,12 @@ use serde_json::json; use tokio::{ task::JoinHandle, time::{timeout, Duration}, + sync::RwLock }; use tower::Service; use tracing::{instrument, Instrument}; -use cuprate_common::BlockID; +use cuprate_common::{tower_utils::InfallibleOneshotReceiver, BlockID}; use super::ScanningCache; use crate::{ @@ -36,6 +35,7 @@ use crate::{ OutputOnChain, }; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); +const OUTPUTS_TIMEOUT: Duration = Duration::from_secs(20); pub struct RpcConnectionSvc { pub(crate) address: String, @@ -47,8 +47,7 @@ pub struct RpcConnectionSvc { impl Service for RpcConnectionSvc { type Response = DatabaseResponse; type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; + type Future = InfallibleOneshotReceiver>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.rpc_task_handle.is_finished() { @@ -70,11 +69,7 @@ impl Service for RpcConnectionSvc { .try_send(req) .expect("poll_ready should be called first!"); - async move { - rx.await - .expect("sender will not be dropped without response") - } - .boxed() + rx.into() } } @@ -214,9 +209,9 @@ impl RpcConnection { ) .await?; - let blocks: Response = monero_epee_bin_serde::from_bytes(res)?; - rayon_spawn_async(|| { + let blocks: Response = monero_epee_bin_serde::from_bytes(res)?; + blocks .blocks .into_par_iter() @@ -256,8 +251,8 @@ impl RpcConnection { } #[derive(Serialize, Clone)] - struct Request { - outputs: Vec, + struct Request<'a> { + outputs: &'a [OutputID], } #[derive(Deserialize)] @@ -273,30 +268,32 @@ impl RpcConnection { outs: Vec, } - let outputs = out_ids - .into_iter() - .flat_map(|(amt, amt_map)| { - amt_map - .into_iter() - .map(|amt_idx| OutputID { - amount: amt, - index: amt_idx, - }) - .collect::>() - }) - .collect::>(); + let outputs = rayon_spawn_async(|| { + out_ids + .into_iter() + .flat_map(|(amt, amt_map)| { + amt_map + .into_iter() + .map(|amt_idx| OutputID { + amount: amt, + index: amt_idx, + }) + .collect::>() + }) + .collect::>() + }) + .await; let res = self .con .bin_call( "get_outs.bin", - monero_epee_bin_serde::to_bytes(&Request { - outputs: outputs.clone(), - })?, + monero_epee_bin_serde::to_bytes(&Request { outputs: &outputs })?, ) .await?; - let cache = self.cache.clone(); + let cache = self.cache.clone().read_owned().await; + let span = tracing::Span::current(); rayon_spawn_async(move || { let outs: Response = monero_epee_bin_serde::from_bytes(&res)?; @@ -304,9 +301,8 @@ impl RpcConnection { tracing::info!(parent: &span, "Got outputs len: {}", outs.outs.len()); let mut ret = HashMap::new(); - let cache = cache.read().unwrap(); - for (out, idx) in outs.outs.iter().zip(outputs) { + for (out, idx) in outs.outs.into_iter().zip(outputs) { ret.entry(idx.amount).or_insert_with(HashMap::new).insert( idx.index, OutputOnChain { @@ -341,7 +337,7 @@ impl RpcConnection { .map(DatabaseResponse::BlockHash) } DatabaseRequest::ChainHeight => { - let height = self.cache.read().unwrap().height; + let height = self.cache.read().await.height; let hash = timeout(DEFAULT_TIMEOUT, self.get_block_hash(height - 1)).await??; @@ -364,7 +360,7 @@ impl RpcConnection { .map(DatabaseResponse::BlockBatchInRange) } DatabaseRequest::Outputs(out_ids) => { - timeout(DEFAULT_TIMEOUT, self.get_outputs(out_ids)) + timeout(OUTPUTS_TIMEOUT, self.get_outputs(out_ids)) .await? .map(DatabaseResponse::Outputs) } diff --git a/consensus/src/rpc/discover.rs b/consensus/src/rpc/discover.rs index ca01072..a442fca 100644 --- a/consensus/src/rpc/discover.rs +++ b/consensus/src/rpc/discover.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use futures::{ channel::mpsc::{self, SendError}, @@ -10,6 +6,7 @@ use futures::{ SinkExt, StreamExt, }; use monero_serai::rpc::HttpRpc; +use tokio::sync::RwLock; use tower::{discover::Change, load::PeakEwma}; use tracing::instrument; @@ -22,8 +19,10 @@ use super::{ async fn check_rpc(addr: String, cache: Arc>) -> Option { tracing::debug!("Sending request to node."); - let con = HttpRpc::new(addr.clone()).await.ok()?; - let (tx, rx) = mpsc::channel(1); + let con = HttpRpc::new_custom_timeout(addr.clone(), Duration::from_secs(u64::MAX)) + .await + .ok()?; + let (tx, rx) = mpsc::channel(0); let rpc = RpcConnection { address: addr.clone(), con, @@ -58,7 +57,7 @@ impl RPCDiscover { PeakEwma::new( rpc, Duration::from_secs(5000), - 300.0, + 3000.0, tower::load::CompleteOnResponse::default(), ), ))