move more tasks to rayon and change some returned

futures to be explicit instead of Boxed
This commit is contained in:
Boog900 2023-12-02 22:57:34 +00:00
parent 8557073c15
commit f3d96ca2ce
No known key found for this signature in database
GPG key ID: 5401367FB7302004
14 changed files with 361 additions and 309 deletions

View file

@ -10,3 +10,5 @@ authors = ["Boog900"]
chrono = "0.4.24"
thiserror = "1"
hex = "0.4"
futures = "0.3.29"

View file

@ -1,6 +1,7 @@
//pub mod hardforks;
pub mod network;
pub mod pruning;
pub mod tower_utils;
use std::fmt::Formatter;
//pub use hardforks::HardForks;

49
common/src/tower_utils.rs Normal file
View file

@ -0,0 +1,49 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::channel::oneshot;
use futures::FutureExt;
/// A oneshot that doesn't return an Error. This requires the sender to always
/// return a response.
pub struct InfallibleOneshotReceiver<T>(oneshot::Receiver<T>);
impl<T> From<oneshot::Receiver<T>> for InfallibleOneshotReceiver<T> {
fn from(value: oneshot::Receiver<T>) -> Self {
InfallibleOneshotReceiver(value)
}
}
impl<T> Future for InfallibleOneshotReceiver<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0
.poll_unpin(cx)
.map(|res| res.expect("Oneshot must not be cancelled before response!"))
}
}
/// A future that is ready straight away.
pub struct InstaFuture<T>(Option<T>);
impl<T: Unpin> From<T> for InstaFuture<T> {
fn from(value: T) -> Self {
InstaFuture(Some(value))
}
}
impl<T: Unpin> Future for InstaFuture<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(
self.0
.take()
.expect("Can't call future twice after Poll::Ready"),
)
}
}

View file

@ -36,9 +36,9 @@ crypto-bigint = "0.5"
curve25519-dalek = "4"
randomx-rs = "1"
monero-serai = {git="https://github.com/serai-dex/serai.git", rev = "c328e5e"}
multiexp = {git="https://github.com/serai-dex/serai.git", rev = "c328e5e"}
dalek-ff-group = {git="https://github.com/serai-dex/serai.git", rev = "c328e5e"}
monero-serai = {git="https://github.com/cuprate/serai.git", rev = "4a5d860"}
multiexp = {git="https://github.com/cuprate/serai.git", rev = "4a5d860"}
dalek-ff-group = {git="https://github.com/cuprate/serai.git", rev = "4a5d860"}
cuprate-common = {path = "../common"}
cryptonight-cuprate = {path = "../cryptonight"}

View file

@ -1,21 +1,23 @@
#![cfg(feature = "binaries")]
use std::{
ops::Range,
path::PathBuf,
sync::{Arc, RwLock},
};
use std::{ops::Range, path::PathBuf, sync::Arc};
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use monero_serai::block::Block;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
use cuprate_common::Network;
use monero_consensus::{
context::{ContextConfig, UpdateBlockchainCacheRequest},
context::{
BlockChainContextRequest, BlockChainContextResponse, ContextConfig,
UpdateBlockchainCacheData,
},
initialize_blockchain_context, initialize_verifier,
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest,
@ -24,8 +26,8 @@ use monero_consensus::{
mod tx_pool;
const MAX_BLOCKS_IN_RANGE: u64 = 500;
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 500;
const MAX_BLOCKS_IN_RANGE: u64 = 1000;
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000;
/// Calls for a batch of blocks, returning the response and the time it took.
async fn call_batch<D: Database>(
@ -43,10 +45,14 @@ async fn update_cache_and_context<Ctx>(
verified_block_info: VerifiedBlockInformation,
) -> Result<(), tower::BoxError>
where
Ctx: tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
{
// add the new block to the cache
cache.write().unwrap().add_new_block_data(
cache.write().await.add_new_block_data(
verified_block_info.generated_coins,
&verified_block_info.block.miner_tx,
&verified_block_info.txs,
@ -55,16 +61,18 @@ where
context_updater
.ready()
.await?
.call(UpdateBlockchainCacheRequest {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
cumulative_difficulty: verified_block_info.cumulative_difficulty,
})
.call(BlockChainContextRequest::Update(
UpdateBlockchainCacheData {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
cumulative_difficulty: verified_block_info.cumulative_difficulty,
},
))
.await?;
Ok(())
@ -126,7 +134,7 @@ where
async fn scan_chain<D>(
cache: Arc<RwLock<ScanningCache>>,
save_file: PathBuf,
_rpc_config: Arc<RwLock<RpcConfig>>,
_rpc_config: Arc<std::sync::RwLock<RpcConfig>>,
database: D,
) -> Result<(), tower::BoxError>
where
@ -142,19 +150,18 @@ where
let config = ContextConfig::main_net();
let (ctx_svc, mut context_updater) =
initialize_blockchain_context(config, database.clone()).await?;
let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?;
let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = oneshot::channel();
let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?;
let (mut block_verifier, transaction_verifier) =
initialize_verifier(database.clone(), tx_pool_svc, ctx_svc).await?;
initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?;
tx.send(transaction_verifier).map_err(|_| "").unwrap();
let start_height = cache.read().unwrap().height;
let start_height = cache.read().await.height;
let (block_tx, mut incoming_blocks) = mpsc::channel(3);
@ -207,10 +214,10 @@ where
if verified_block_info.height % 5000 == 0 {
tracing::info!("saving cache to: {}", save_file.display());
cache.write().unwrap().save(&save_file).unwrap();
cache.write().await.save(&save_file).unwrap();
}
update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?;
update_cache_and_context(&cache, &mut ctx_svc, verified_block_info).await?;
}
tracing::info!(
@ -267,7 +274,7 @@ async fn main() {
];
let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE);
let rpc_config = Arc::new(RwLock::new(rpc_config));
let rpc_config = Arc::new(std::sync::RwLock::new(rpc_config));
tracing::info!("Attempting to open cache at: {}", file_for_cache.display());
let cache = match ScanningCache::load(&file_for_cache) {

View file

@ -2,19 +2,24 @@
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures::{channel::mpsc, FutureExt, StreamExt};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use monero_serai::transaction::Transaction;
use tokio::sync::oneshot;
use tower::{Service, ServiceExt};
use cuprate_common::tower_utils::InfallibleOneshotReceiver;
use monero_consensus::{
context::{BlockChainContext, BlockChainContextRequest, RawBlockChainContext},
context::{
BlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
};
@ -31,8 +36,7 @@ pub struct TxPoolHandle {
impl tower::Service<TxPoolRequest> for TxPoolHandle {
type Response = TxPoolResponse;
type Error = TxNotInPool;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.tx_pool_task.is_finished() {
@ -50,11 +54,7 @@ impl tower::Service<TxPoolRequest> for TxPoolHandle {
.try_send((req, tx))
.expect("You need to use `poll_ready` to check capacity!");
async move {
rx.await
.expect("Tx pool will always respond without dropping the sender")
}
.boxed()
rx.into()
}
}
@ -83,8 +83,11 @@ where
+ Send
+ 'static,
TxV::Future: Send + 'static,
Ctx: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Send
Ctx: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
Ctx::Future: Send + 'static,
{
@ -101,11 +104,14 @@ where
),
tower::BoxError,
> {
let current_ctx = ctx_svc
let BlockChainContextResponse::Context(current_ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest)
.await?;
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("Context service service returned wrong response!")
};
let tx_pool = TxPool {
txs: Default::default(),
@ -133,12 +139,17 @@ where
if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() {
Ok(current_ctx)
} else {
self.current_ctx = self
let BlockChainContextResponse::Context(current_ctx) = self
.ctx_svc
.ready()
.await?
.call(BlockChainContextRequest)
.await?;
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("Context service service returned wrong response!")
};
self.current_ctx = current_ctx;
Ok(self.current_ctx.unchecked_blockchain_context().clone())
}

View file

@ -11,7 +11,7 @@ use rayon::prelude::*;
use tower::{Service, ServiceExt};
use crate::{
context::{BlockChainContext, BlockChainContextRequest},
context::{BlockChainContextRequest, BlockChainContextResponse},
helper::rayon_spawn_async,
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ConsensusError, HardFork, TxNotInPool, TxPoolRequest, TxPoolResponse,
@ -75,7 +75,10 @@ pub struct BlockVerifierService<C: Clone, TxV: Clone, TxP: Clone> {
impl<C, TxV, TxP> BlockVerifierService<C, TxV, TxP>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext> + Clone + Send + 'static,
C: Service<BlockChainContextRequest, Response = BlockChainContextResponse>
+ Clone
+ Send
+ 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
+ Clone
+ Send
@ -100,8 +103,11 @@ where
impl<C, TxV, TxP> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, TxP>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Clone
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
@ -123,14 +129,12 @@ where
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.context_svc.poll_ready(cx)).map(Into::into)?;
self.tx_verifier_svc.poll_ready(cx)
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
let context_svc = self.context_svc.clone();
let context_svc = std::mem::replace(&mut self.context_svc, context_svc);
let tx_verifier_svc = self.tx_verifier_svc.clone();
let tx_pool = self.tx_pool.clone();
@ -195,8 +199,11 @@ async fn verify_prepared_main_chain_block<C, TxV, TxP>(
tx_pool: TxP,
) -> Result<VerifyBlockResponse, ConsensusError>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Send
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
@ -206,12 +213,14 @@ where
+ 'static,
{
tracing::debug!("getting blockchain context");
let checked_context = context_svc
.oneshot(BlockChainContextRequest)
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::Get)
.await
.map_err(Into::<ConsensusError>::into)?;
.map_err(Into::<ConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
// TODO: should we unwrap here, we did just get the data so it should be ok.
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);
@ -275,7 +284,7 @@ where
weight: block_weight,
height: context.chain_height,
long_term_weight: context.next_block_long_term_weight(block_weight),
hf_vote: HardFork::V1,
hf_vote: block.hf_vote,
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
}))
}
@ -287,8 +296,11 @@ async fn verify_main_chain_block<C, TxV, TxP>(
tx_pool: TxP,
) -> Result<VerifyBlockResponse, ConsensusError>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Send
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
@ -298,12 +310,14 @@ where
+ 'static,
{
tracing::debug!("getting blockchain context");
let checked_context = context_svc
.oneshot(BlockChainContextRequest)
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::Get)
.await
.map_err(Into::<ConsensusError>::into)?;
.map_err(Into::<ConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
// TODO: should we unwrap here, we did just get the data so it should be ok.
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);

View file

@ -8,8 +8,7 @@
use std::{
cmp::min,
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
ops::DerefMut,
sync::Arc,
task::{Context, Poll},
};
@ -20,6 +19,8 @@ use futures::{
};
use tower::{Service, ServiceExt};
use cuprate_common::tower_utils::InstaFuture;
use crate::{helper::current_time, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
mod difficulty;
@ -57,20 +58,17 @@ pub async fn initialize_blockchain_context<D>(
cfg: ContextConfig,
mut database: D,
) -> Result<
(
impl Service<
BlockChainContextRequest,
Response = BlockChainContext,
Error = tower::BoxError,
Future = impl Future<Output = Result<BlockChainContext, tower::BoxError>>
+ Send
+ 'static,
> + Clone
+ Send
+ Sync
+ 'static,
impl Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
),
impl Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
Future = impl Future<Output = Result<BlockChainContextResponse, tower::BoxError>>
+ Send
+ 'static,
> + Clone
+ Send
+ Sync
+ 'static,
ConsensusError,
>
where
@ -135,9 +133,7 @@ where
lock_state: MutexLockState::Locked,
};
let context_svc_update = context_svc.clone();
Ok((context_svc_update.clone(), context_svc_update))
Ok(context_svc)
}
/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep
@ -246,7 +242,27 @@ impl BlockChainContext {
}
#[derive(Debug, Clone)]
pub struct BlockChainContextRequest;
pub struct UpdateBlockchainCacheData {
pub new_top_hash: [u8; 32],
pub height: u64,
pub timestamp: u64,
pub weight: usize,
pub long_term_weight: usize,
pub generated_coins: u64,
pub vote: HardFork,
pub cumulative_difficulty: u128,
}
#[derive(Debug, Clone)]
pub enum BlockChainContextRequest {
Get,
Update(UpdateBlockchainCacheData),
}
pub enum BlockChainContextResponse {
Context(BlockChainContext),
Ok,
}
#[derive(Clone)]
struct InternalBlockChainContext {
@ -285,10 +301,9 @@ impl Clone for BlockChainContextService {
}
impl Service<BlockChainContextRequest> for BlockChainContextService {
type Response = BlockChainContext;
type Response = BlockChainContextResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Future = InstaFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
@ -306,121 +321,67 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
}
}
fn call(&mut self, _: BlockChainContextRequest) -> Self::Future {
let MutexLockState::Acquired(internal_blockchain_context) =
std::mem::replace(&mut self.lock_state, MutexLockState::Locked)
else {
panic!("poll_ready() was not called first!")
};
async move {
let InternalBlockChainContext {
current_validity_token,
current_reorg_token,
difficulty_cache,
weight_cache,
hardfork_state,
chain_height,
top_block_hash,
already_generated_coins,
} = internal_blockchain_context.deref();
let current_hf = hardfork_state.current_hardfork();
Ok(BlockChainContext {
validity_token: current_validity_token.clone(),
raw: RawBlockChainContext {
next_difficulty: difficulty_cache.next_difficulty(&current_hf),
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
effective_median_weight: weight_cache
.effective_median_block_weight(&current_hf),
median_long_term_weight: weight_cache.median_long_term_weight(),
median_weight_for_block_reward: weight_cache
.median_for_block_reward(&current_hf),
already_generated_coins: *already_generated_coins,
top_block_timestamp: difficulty_cache.top_block_timestamp(),
median_block_timestamp: difficulty_cache.median_timestamp(
usize::try_from(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW).unwrap(),
),
chain_height: *chain_height,
top_hash: *top_block_hash,
current_hard_fork: current_hf,
re_org_token: current_reorg_token.clone(),
},
})
}
.boxed()
}
}
// TODO: join these services, there is no need for 2.
pub struct UpdateBlockchainCacheRequest {
pub new_top_hash: [u8; 32],
pub height: u64,
pub timestamp: u64,
pub weight: usize,
pub long_term_weight: usize,
pub generated_coins: u64,
pub vote: HardFork,
pub cumulative_difficulty: u128,
}
impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
type Response = ();
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match &mut self.lock_state {
MutexLockState::Locked => {
self.lock_state = MutexLockState::Acquiring(
Arc::clone(&self.internal_blockchain_context).lock_owned(),
)
}
MutexLockState::Acquiring(rpc) => {
self.lock_state = MutexLockState::Acquired(futures::ready!(rpc.poll_unpin(cx)))
}
MutexLockState::Acquired(_) => return Poll::Ready(Ok(())),
}
}
}
fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future {
fn call(&mut self, req: BlockChainContextRequest) -> Self::Future {
let MutexLockState::Acquired(mut internal_blockchain_context) =
std::mem::replace(&mut self.lock_state, MutexLockState::Locked)
else {
panic!("poll_ready() was not called first!")
};
async move {
let InternalBlockChainContext {
current_validity_token,
current_reorg_token: _,
difficulty_cache,
weight_cache,
hardfork_state,
chain_height,
top_block_hash,
already_generated_coins,
} = internal_blockchain_context.deref_mut();
let InternalBlockChainContext {
current_validity_token,
current_reorg_token,
difficulty_cache,
weight_cache,
hardfork_state,
chain_height,
top_block_hash,
already_generated_coins,
} = internal_blockchain_context.deref_mut();
// Cancel the validity token and replace it with a new one.
std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid();
match req {
BlockChainContextRequest::Get => {
let current_hf = hardfork_state.current_hardfork();
difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty);
InstaFuture::from(Ok(BlockChainContextResponse::Context(BlockChainContext {
validity_token: current_validity_token.clone(),
raw: RawBlockChainContext {
next_difficulty: difficulty_cache.next_difficulty(&current_hf),
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
effective_median_weight: weight_cache
.effective_median_block_weight(&current_hf),
median_long_term_weight: weight_cache.median_long_term_weight(),
median_weight_for_block_reward: weight_cache
.median_for_block_reward(&current_hf),
already_generated_coins: *already_generated_coins,
top_block_timestamp: difficulty_cache.top_block_timestamp(),
median_block_timestamp: difficulty_cache.median_timestamp(
usize::try_from(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW).unwrap(),
),
chain_height: *chain_height,
top_hash: *top_block_hash,
current_hard_fork: current_hf,
re_org_token: current_reorg_token.clone(),
},
})))
}
BlockChainContextRequest::Update(new) => {
// Cancel the validity token and replace it with a new one.
std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid();
weight_cache.new_block(new.height, new.weight, new.long_term_weight);
difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty);
hardfork_state.new_block(new.vote, new.height);
weight_cache.new_block(new.height, new.weight, new.long_term_weight);
*chain_height = new.height + 1;
*top_block_hash = new.new_top_hash;
*already_generated_coins = already_generated_coins.saturating_add(new.generated_coins);
hardfork_state.new_block(new.vote, new.height);
Ok(())
*chain_height = new.height + 1;
*top_block_hash = new.new_top_hash;
*already_generated_coins =
already_generated_coins.saturating_add(new.generated_coins);
InstaFuture::from(Ok(BlockChainContextResponse::Ok))
}
}
.boxed()
}
}

View file

@ -5,7 +5,7 @@ use tower::ServiceExt;
use super::{
difficulty::tests::TEST_DIFFICULTY_CONFIG, hardforks::tests::TEST_HARD_FORK_CONFIG,
initialize_blockchain_context, weight::tests::TEST_WEIGHT_CONFIG, BlockChainContextRequest,
ContextConfig, UpdateBlockchainCacheRequest,
BlockChainContextResponse, ContextConfig, UpdateBlockchainCacheData,
};
use crate::{test_utils::mock_db::*, HardFork};
@ -25,25 +25,33 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> {
.unwrap()
.current();
let (ctx_svc, updater) = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let context = ctx_svc.oneshot(BlockChainContextRequest).await?;
let BlockChainContextResponse::Context(context) = ctx_svc
.clone()
.oneshot(BlockChainContextRequest::Get)
.await?
else {
panic!("Context service returned wrong response!");
};
assert!(context.is_still_valid());
assert!(context.is_still_valid());
assert!(context.is_still_valid());
updater
.oneshot(UpdateBlockchainCacheRequest {
new_top_hash: [0; 32],
height: BLOCKCHAIN_HEIGHT,
timestamp: 0,
weight: 0,
long_term_weight: 0,
generated_coins: 0,
vote: HardFork::V1,
cumulative_difficulty: 0,
})
ctx_svc
.oneshot(BlockChainContextRequest::Update(
UpdateBlockchainCacheData {
new_top_hash: [0; 32],
height: BLOCKCHAIN_HEIGHT,
timestamp: 0,
weight: 0,
long_term_weight: 0,
generated_coins: 0,
vote: HardFork::V1,
cumulative_difficulty: 0,
},
))
.await?;
assert!(!context.is_still_valid());
@ -61,9 +69,13 @@ async fn context_height_correct() -> Result<(), tower::BoxError> {
.unwrap()
.current();
let (ctx_svc, _) = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let context = ctx_svc.oneshot(BlockChainContextRequest).await?;
let BlockChainContextResponse::Context(context) =
ctx_svc.oneshot(BlockChainContextRequest::Get).await?
else {
panic!("context service returned incorrect response!")
};
assert_eq!(
context.blockchain_context().unwrap().chain_height,

View file

@ -19,8 +19,8 @@ pub use block::{
PrePreparedBlock, VerifiedBlockInformation, VerifyBlockRequest, VerifyBlockResponse,
};
pub use context::{
initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, ContextConfig,
HardFork, UpdateBlockchainCacheRequest,
initialize_blockchain_context, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, ContextConfig, HardFork,
};
pub use transactions::{VerifyTxRequest, VerifyTxResponse};
@ -63,7 +63,7 @@ where
TxP::Future: Send + 'static,
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContext,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send

View file

@ -4,22 +4,18 @@ use std::{
future::Future,
ops::Range,
pin::Pin,
sync::{Arc, Mutex, RwLock},
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use curve25519_dalek::edwards::CompressedEdwardsY;
use futures::{
lock::{OwnedMutexGuard, OwnedMutexLockFuture},
stream::{FuturesOrdered, FuturesUnordered},
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
};
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use tokio::sync::RwLock;
use tower::{balance::p2c::Balance, util::BoxService, ServiceExt};
use tracing_subscriber::filter::FilterExt;
use crate::{DatabaseRequest, DatabaseResponse};
use crate::{helper::rayon_spawn_async, DatabaseRequest, DatabaseResponse};
pub mod cache;
mod connection;
@ -73,7 +69,7 @@ impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
pub fn init_rpc_load_balancer(
addresses: Vec<String>,
cache: Arc<RwLock<ScanningCache>>,
config: Arc<RwLock<RpcConfig>>,
config: Arc<std::sync::RwLock<RpcConfig>>,
) -> impl tower::Service<
DatabaseRequest,
Response = DatabaseResponse,
@ -87,7 +83,7 @@ pub fn init_rpc_load_balancer(
let rpc_balance = Balance::new(Box::pin(
rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok),
));
let rpc_buffer = tower::buffer::Buffer::new(rpc_balance, 500);
let rpc_buffer = tower::buffer::Buffer::new(rpc_balance, 50);
let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer);
let discover = discover::RPCDiscover {
@ -109,7 +105,7 @@ pub fn init_rpc_load_balancer(
#[derive(Clone)]
pub struct RpcBalancer<T: Clone> {
rpcs: T,
config: Arc<RwLock<RpcConfig>>,
config: Arc<std::sync::RwLock<RpcConfig>>,
cache: Arc<RwLock<ScanningCache>>,
}
@ -141,19 +137,19 @@ where
match req {
DatabaseRequest::CheckKIsNotSpent(kis) => async move {
Ok(DatabaseResponse::CheckKIsNotSpent(
cache.read().unwrap().are_kis_spent(kis),
cache.read().await.are_kis_spent(kis),
))
}
.boxed(),
DatabaseRequest::GeneratedCoins => async move {
Ok(DatabaseResponse::GeneratedCoins(
cache.read().unwrap().already_generated_coins,
cache.read().await.already_generated_coins,
))
}
.boxed(),
DatabaseRequest::NumberOutputsWithAmount(amt) => async move {
Ok(DatabaseResponse::NumberOutputsWithAmount(
cache.read().unwrap().numb_outs(amt),
cache.read().await.numb_outs(amt),
))
}
.boxed(),
@ -172,6 +168,7 @@ where
resp_to_ret,
config.max_blocks_per_node,
)
.boxed()
}
DatabaseRequest::BlockExtendedHeaderInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
@ -188,37 +185,43 @@ where
resp_to_ret,
config.max_block_headers_per_node,
)
.boxed()
}
DatabaseRequest::Outputs(outs) => async move {
let mut split_outs: Vec<HashMap<u64, HashSet<u64>>> = Vec::new();
let mut i: usize = 0;
for (amount, ixs) in outs {
if ixs.len() > MAX_OUTS_PER_RPC {
for ii in (0..ixs.len()).step_by(MAX_OUTS_PER_RPC) {
let mut amt_map = HashSet::with_capacity(MAX_OUTS_PER_RPC);
amt_map.extend(ixs.iter().skip(ii).copied().take(MAX_OUTS_PER_RPC));
let split_outs = rayon_spawn_async(|| {
let mut split_outs: Vec<HashMap<u64, HashSet<u64>>> = Vec::new();
let mut i: usize = 0;
for (amount, ixs) in outs {
if ixs.len() > MAX_OUTS_PER_RPC {
for ii in (0..ixs.len()).step_by(MAX_OUTS_PER_RPC) {
let mut amt_map = HashSet::with_capacity(MAX_OUTS_PER_RPC);
amt_map.extend(ixs.iter().skip(ii).copied().take(MAX_OUTS_PER_RPC));
let mut map = HashMap::new();
map.insert(amount, amt_map);
split_outs.push(map);
i += 1;
}
continue;
}
if let Some(map) = split_outs.get_mut(i.saturating_sub(1)) {
if map.iter().map(|(_, amt_map)| amt_map.len()).sum::<usize>() + ixs.len()
< MAX_OUTS_PER_RPC
{
assert!(map.insert(amount, ixs).is_none());
let mut map = HashMap::new();
map.insert(amount, amt_map);
split_outs.push(map);
i += 1;
}
continue;
}
if let Some(map) = split_outs.get_mut(i.saturating_sub(1)) {
if map.iter().map(|(_, amt_map)| amt_map.len()).sum::<usize>()
+ ixs.len()
< MAX_OUTS_PER_RPC
{
assert!(map.insert(amount, ixs).is_none());
continue;
}
}
let mut map = HashMap::new();
map.insert(amount, ixs);
split_outs.push(map);
i += 1;
}
let mut map = HashMap::new();
map.insert(amount, ixs);
split_outs.push(map);
i += 1;
}
split_outs
})
.await;
let mut futs = FuturesUnordered::from_iter(
split_outs
@ -247,11 +250,11 @@ where
fn split_range_request<T, Ret>(
rpc: T,
range: Range<u64>,
req: impl FnOnce(Range<u64>) -> DatabaseRequest + Clone + Send + 'static,
req: impl Fn(Range<u64>) -> DatabaseRequest + Send + 'static,
resp: impl FnOnce(Vec<Ret>) -> DatabaseResponse + Send + 'static,
resp_to_ret: impl Fn(DatabaseResponse) -> Vec<Ret> + Copy + Send + 'static,
max_request_per_rpc: u64,
) -> Pin<Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>>
) -> impl Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static
where
T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError>
+ Clone
@ -264,7 +267,6 @@ where
let iter = (0..range.clone().count() as u64)
.step_by(max_request_per_rpc as usize)
.map(|i| {
let req = req.clone();
let new_range =
(range.start + i)..(min(range.start + i + max_request_per_rpc, range.end));
rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret)
@ -281,5 +283,4 @@ where
Ok(resp(res))
}
.boxed()
}

View file

@ -1,9 +1,8 @@
use std::io::Write;
use std::{
collections::HashMap,
collections::HashSet,
fmt::{Display, Formatter},
io::BufWriter,
io::{BufWriter, Write},
path::Path,
sync::Arc,
};

View file

@ -1,16 +1,14 @@
use std::{
collections::{HashMap, HashSet},
future::Future,
ops::Range,
pin::Pin,
sync::{Arc, RwLock},
sync::Arc,
task::{Context, Poll},
};
use curve25519_dalek::edwards::CompressedEdwardsY;
use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
StreamExt,
};
use monero_serai::{
block::Block,
@ -24,11 +22,12 @@ use serde_json::json;
use tokio::{
task::JoinHandle,
time::{timeout, Duration},
sync::RwLock
};
use tower::Service;
use tracing::{instrument, Instrument};
use cuprate_common::BlockID;
use cuprate_common::{tower_utils::InfallibleOneshotReceiver, BlockID};
use super::ScanningCache;
use crate::{
@ -36,6 +35,7 @@ use crate::{
OutputOnChain,
};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
const OUTPUTS_TIMEOUT: Duration = Duration::from_secs(20);
pub struct RpcConnectionSvc {
pub(crate) address: String,
@ -47,8 +47,7 @@ pub struct RpcConnectionSvc {
impl Service<DatabaseRequest> for RpcConnectionSvc {
type Response = DatabaseResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.rpc_task_handle.is_finished() {
@ -70,11 +69,7 @@ impl Service<DatabaseRequest> for RpcConnectionSvc {
.try_send(req)
.expect("poll_ready should be called first!");
async move {
rx.await
.expect("sender will not be dropped without response")
}
.boxed()
rx.into()
}
}
@ -214,9 +209,9 @@ impl RpcConnection {
)
.await?;
let blocks: Response = monero_epee_bin_serde::from_bytes(res)?;
rayon_spawn_async(|| {
let blocks: Response = monero_epee_bin_serde::from_bytes(res)?;
blocks
.blocks
.into_par_iter()
@ -256,8 +251,8 @@ impl RpcConnection {
}
#[derive(Serialize, Clone)]
struct Request {
outputs: Vec<OutputID>,
struct Request<'a> {
outputs: &'a [OutputID],
}
#[derive(Deserialize)]
@ -273,30 +268,32 @@ impl RpcConnection {
outs: Vec<OutputRes>,
}
let outputs = out_ids
.into_iter()
.flat_map(|(amt, amt_map)| {
amt_map
.into_iter()
.map(|amt_idx| OutputID {
amount: amt,
index: amt_idx,
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let outputs = rayon_spawn_async(|| {
out_ids
.into_iter()
.flat_map(|(amt, amt_map)| {
amt_map
.into_iter()
.map(|amt_idx| OutputID {
amount: amt,
index: amt_idx,
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.await;
let res = self
.con
.bin_call(
"get_outs.bin",
monero_epee_bin_serde::to_bytes(&Request {
outputs: outputs.clone(),
})?,
monero_epee_bin_serde::to_bytes(&Request { outputs: &outputs })?,
)
.await?;
let cache = self.cache.clone();
let cache = self.cache.clone().read_owned().await;
let span = tracing::Span::current();
rayon_spawn_async(move || {
let outs: Response = monero_epee_bin_serde::from_bytes(&res)?;
@ -304,9 +301,8 @@ impl RpcConnection {
tracing::info!(parent: &span, "Got outputs len: {}", outs.outs.len());
let mut ret = HashMap::new();
let cache = cache.read().unwrap();
for (out, idx) in outs.outs.iter().zip(outputs) {
for (out, idx) in outs.outs.into_iter().zip(outputs) {
ret.entry(idx.amount).or_insert_with(HashMap::new).insert(
idx.index,
OutputOnChain {
@ -341,7 +337,7 @@ impl RpcConnection {
.map(DatabaseResponse::BlockHash)
}
DatabaseRequest::ChainHeight => {
let height = self.cache.read().unwrap().height;
let height = self.cache.read().await.height;
let hash = timeout(DEFAULT_TIMEOUT, self.get_block_hash(height - 1)).await??;
@ -364,7 +360,7 @@ impl RpcConnection {
.map(DatabaseResponse::BlockBatchInRange)
}
DatabaseRequest::Outputs(out_ids) => {
timeout(DEFAULT_TIMEOUT, self.get_outputs(out_ids))
timeout(OUTPUTS_TIMEOUT, self.get_outputs(out_ids))
.await?
.map(DatabaseResponse::Outputs)
}

View file

@ -1,8 +1,4 @@
use std::{
collections::HashSet,
sync::{Arc, RwLock},
time::Duration,
};
use std::{sync::Arc, time::Duration};
use futures::{
channel::mpsc::{self, SendError},
@ -10,6 +6,7 @@ use futures::{
SinkExt, StreamExt,
};
use monero_serai::rpc::HttpRpc;
use tokio::sync::RwLock;
use tower::{discover::Change, load::PeakEwma};
use tracing::instrument;
@ -22,8 +19,10 @@ use super::{
async fn check_rpc(addr: String, cache: Arc<RwLock<ScanningCache>>) -> Option<RpcConnectionSvc> {
tracing::debug!("Sending request to node.");
let con = HttpRpc::new(addr.clone()).await.ok()?;
let (tx, rx) = mpsc::channel(1);
let con = HttpRpc::new_custom_timeout(addr.clone(), Duration::from_secs(u64::MAX))
.await
.ok()?;
let (tx, rx) = mpsc::channel(0);
let rpc = RpcConnection {
address: addr.clone(),
con,
@ -58,7 +57,7 @@ impl RPCDiscover {
PeakEwma::new(
rpc,
Duration::from_secs(5000),
300.0,
3000.0,
tower::load::CompleteOnResponse::default(),
),
))