From 525d1322c8aa3511458e8637b832e85f0d7e4915 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 23 Nov 2024 23:55:01 +0000 Subject: [PATCH] use arc-swap to cache context --- Cargo.lock | 7 ++ Cargo.toml | 2 +- binaries/cuprated/src/blockchain.rs | 4 +- binaries/cuprated/src/blockchain/manager.rs | 25 +--- .../src/blockchain/manager/handler.rs | 40 +++---- binaries/cuprated/src/blockchain/syncer.rs | 56 +-------- binaries/cuprated/src/blockchain/types.rs | 9 +- binaries/cuprated/src/rpc/handler.rs | 6 +- .../src/rpc/request/blockchain_context.rs | 28 +---- binaries/cuprated/src/txpool.rs | 2 +- binaries/cuprated/src/txpool/incoming_tx.rs | 21 +--- consensus/context/Cargo.toml | 1 + consensus/context/src/lib.rs | 79 ++++--------- consensus/context/src/task.rs | 108 ++++++++++-------- consensus/context/src/tokens.rs | 33 ------ consensus/fast-sync/src/fast_sync.rs | 66 +++-------- consensus/src/block.rs | 91 +++------------ consensus/src/block/batch_prepare.rs | 38 ++---- consensus/src/lib.rs | 21 +--- 19 files changed, 193 insertions(+), 444 deletions(-) delete mode 100644 consensus/context/src/tokens.rs diff --git a/Cargo.lock b/Cargo.lock index 08c017c4..4e41b012 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,12 @@ version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.9" @@ -642,6 +648,7 @@ dependencies = [ name = "cuprate-consensus-context" version = "0.1.0" dependencies = [ + "arc-swap", "cuprate-consensus-rules", "cuprate-helper", "cuprate-types", diff --git a/Cargo.toml b/Cargo.toml index 1bfd680a..52bc0e6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ cuprate-zmq-types = { path = "zmq/types", default-features = # External dependencies anyhow = { version = "1", default-features = false } +arc-swap = { version = "1", default-features = false } arrayvec = { version = "0.7", default-features = false } async-trait = { version = "0.1", default-features = false } bitflags = { version = "2", default-features = false } @@ -135,7 +136,6 @@ tokio-test = { version = "0.4" } ## TODO: ## Potential dependencies. -# arc-swap = { version = "1.6.0" } # Atomically swappable Arc | https://github.com/vorner/arc-swap # itoa = { version = "1.0.9" } # Fast integer to string formatting | https://github.com/dtolnay/itoa # notify = { version = "6.1.1" } # Filesystem watching | https://github.com/notify-rs/notify # once_cell = { version = "1.18.0" } # Lazy/one-time initialization | https://github.com/matklad/once_cell diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index c4b75e4e..29f0f433 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -8,7 +8,7 @@ use tokio::sync::{mpsc, Notify}; use tower::{BoxError, Service, ServiceExt}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig}; +use cuprate_consensus::{generate_genesis_block, BlockchainContextService, ContextConfig}; use cuprate_cryptonight::cryptonight_hash_v0; use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface}; use cuprate_p2p_core::{ClearNet, Network}; @@ -84,7 +84,7 @@ pub async fn init_consensus( ( ConcreteBlockVerifierService, ConcreteTxVerifierService, - BlockChainContextService, + BlockchainContextService, ), BoxError, > { diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 2166795e..0024fafc 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -8,11 +8,10 @@ use tracing::error; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, - BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, + BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, + BlockchainContextService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; -use cuprate_consensus_context::RawBlockChainContext; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig}, BroadcastSvc, NetworkInterface, @@ -48,7 +47,7 @@ pub async fn init_blockchain_manager( blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, txpool_write_handle: TxpoolWriteHandle, - mut blockchain_context_service: BlockChainContextService, + mut blockchain_context_service: BlockchainContextService, block_verifier_service: ConcreteBlockVerifierService, block_downloader_config: BlockDownloaderConfig, ) { @@ -68,23 +67,11 @@ pub async fn init_blockchain_manager( block_downloader_config, )); - let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!() - }; - let manager = BlockchainManager { blockchain_write_handle, blockchain_read_handle, txpool_write_handle, blockchain_context_service, - cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), block_verifier_service, stop_current_block_downloader, broadcast_svc: clearnet_interface.broadcast_svc(), @@ -107,13 +94,9 @@ pub struct BlockchainManager { blockchain_read_handle: BlockchainReadHandle, /// A [`TxpoolWriteHandle`]. txpool_write_handle: TxpoolWriteHandle, - // TODO: Improve the API of the cache service. - // TODO: rename the cache service -> `BlockchainContextService`. /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve /// values without needing to go to a [`BlockchainReadHandle`]. - blockchain_context_service: BlockChainContextService, - /// A cached context representing the current state. - cached_blockchain_context: RawBlockChainContext, + blockchain_context_service: BlockchainContextService, /// The block verifier service, to verify incoming blocks. block_verifier_service: ConcreteBlockVerifierService, /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 5d1cd2d6..507d9713 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -83,7 +83,11 @@ impl super::BlockchainManager { block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, ) -> Result { - if block.header.previous != self.cached_blockchain_context.top_hash { + let context = self.blockchain_context_service.blockchain_context(); + let top_hash = context.top_hash; + let chain_height = context.chain_height; + + if block.header.previous != top_hash { self.handle_incoming_alt_block(block, prepared_txs).await?; return Ok(IncomingBlockOk::AddedToAltChain); } @@ -105,8 +109,7 @@ impl super::BlockchainManager { let block_blob = Bytes::copy_from_slice(&verified_block.block_blob); self.add_valid_block_to_main_chain(verified_block).await; - self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height) - .await; + self.broadcast_block(block_blob, chain_height).await; Ok(IncomingBlockOk::AddedToMainChain) } @@ -126,7 +129,12 @@ impl super::BlockchainManager { .first() .expect("Block batch should not be empty"); - if first_block.header.previous == self.cached_blockchain_context.top_hash { + if first_block.header.previous + == self + .blockchain_context_service + .blockchain_context() + .top_hash + { self.handle_incoming_block_batch_main_chain(batch).await; } else { self.handle_incoming_block_batch_alt_chain(batch).await; @@ -285,7 +293,10 @@ impl super::BlockchainManager { // If this alt chain if alt_block_info.cumulative_difficulty - > self.cached_blockchain_context.cumulative_difficulty + > self + .blockchain_context_service + .blockchain_context() + .cumulative_difficulty { self.try_do_reorg(alt_block_info).await?; return Ok(AddAltBlock::Reorged); @@ -338,7 +349,10 @@ impl super::BlockchainManager { alt_blocks.push(top_alt_block); let split_height = alt_blocks[0].height; - let current_main_chain_height = self.cached_blockchain_context.chain_height; + let current_main_chain_height = self + .blockchain_context_service + .blockchain_context() + .chain_height; let BlockchainResponse::PopBlocks(old_main_chain_id) = self .blockchain_write_handle @@ -472,20 +486,6 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR); - let BlockChainContextResponse::Context(blockchain_context) = self - .blockchain_context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!(); - }; - - self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); - self.txpool_write_handle .ready() .await diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 69ad3303..691d2568 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -9,7 +9,8 @@ use tokio::{ use tower::{Service, ServiceExt}; use tracing::instrument; -use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext}; +use cuprate_consensus_context::BlockchainContextService; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, NetworkInterface, PeerSetRequest, PeerSetResponse, @@ -29,8 +30,8 @@ pub enum SyncerError { /// The syncer tasks that makes sure we are fully synchronised with our connected peers. #[instrument(level = "debug", skip_all)] -pub async fn syncer( - mut context_svc: C, +pub async fn syncer( + mut context_svc: BlockchainContextService, our_chain: CN, mut clearnet_interface: NetworkInterface, incoming_block_batch_tx: mpsc::Sender, @@ -38,12 +39,6 @@ pub async fn syncer( block_downloader_config: BlockDownloaderConfig, ) -> Result<(), SyncerError> where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - >, - C::Future: Send + 'static, CN: Service + Clone + Send @@ -54,15 +49,6 @@ where let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY); - let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - unreachable!(); - }; - tracing::debug!("Waiting for new sync info in top sync channel"); loop { @@ -70,8 +56,7 @@ where tracing::trace!("Checking connected peers to see if we are behind",); - check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; - let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + let blockchain_context = context_svc.blockchain_context(); let PeerSetResponse::MostPoWSeen { cumulative_difficulty, @@ -86,7 +71,7 @@ where unreachable!(); }; - if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { + if cumulative_difficulty <= blockchain_context.cumulative_difficulty { continue; } @@ -116,32 +101,3 @@ where } } } - -/// Checks if we should update the given [`BlockChainContext`] and updates it if needed. -async fn check_update_blockchain_context( - context_svc: C, - old_context: &mut BlockChainContext, -) -> Result<(), tower::BoxError> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - >, - C::Future: Send + 'static, -{ - if old_context.blockchain_context().is_ok() { - return Ok(()); - } - - let BlockChainContextResponse::Context(ctx) = context_svc - .oneshot(BlockChainContextRequest::Context) - .await? - else { - unreachable!(); - }; - - *old_context = ctx; - - Ok(()) -} diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs index 54e46621..5390eb63 100644 --- a/binaries/cuprated/src/blockchain/types.rs +++ b/binaries/cuprated/src/blockchain/types.rs @@ -1,14 +1,11 @@ use tower::util::MapErr; use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle}; -use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; +use cuprate_consensus::{BlockVerifierService, BlockchainContextService, TxVerifierService}; /// The [`BlockVerifierService`] with all generic types defined. -pub type ConcreteBlockVerifierService = BlockVerifierService< - BlockChainContextService, - ConcreteTxVerifierService, - ConsensusBlockchainReadHandle, ->; +pub type ConcreteBlockVerifierService = + BlockVerifierService; /// The [`TxVerifierService`] with all generic types defined. pub type ConcreteTxVerifierService = TxVerifierService; diff --git a/binaries/cuprated/src/rpc/handler.rs b/binaries/cuprated/src/rpc/handler.rs index 1f73403b..c16248d9 100644 --- a/binaries/cuprated/src/rpc/handler.rs +++ b/binaries/cuprated/src/rpc/handler.rs @@ -8,7 +8,7 @@ use monero_serai::block::Block; use tower::Service; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::BlockChainContextService; +use cuprate_consensus::BlockchainContextService; use cuprate_pruning::PruningSeed; use cuprate_rpc_interface::RpcHandler; use cuprate_rpc_types::{ @@ -148,7 +148,7 @@ pub struct CupratedRpcHandler { pub blockchain_read: BlockchainReadHandle, /// Handle to the blockchain context service. - pub blockchain_context: BlockChainContextService, + pub blockchain_context: BlockchainContextService, /// Handle to the blockchain manager. pub blockchain_manager: BlockchainManagerHandle, @@ -165,7 +165,7 @@ impl CupratedRpcHandler { pub const fn new( restricted: bool, blockchain_read: BlockchainReadHandle, - blockchain_context: BlockChainContextService, + blockchain_context: BlockchainContextService, blockchain_manager: BlockchainManagerHandle, txpool_read: TxpoolReadHandle, txpool_manager: std::convert::Infallible, diff --git a/binaries/cuprated/src/rpc/request/blockchain_context.rs b/binaries/cuprated/src/rpc/request/blockchain_context.rs index c6f0f225..1d4e28f5 100644 --- a/binaries/cuprated/src/rpc/request/blockchain_context.rs +++ b/binaries/cuprated/src/rpc/request/blockchain_context.rs @@ -7,35 +7,17 @@ use monero_serai::block::Block; use tower::{Service, ServiceExt}; use cuprate_consensus_context::{ - BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContext, + BlockchainContextService, }; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{FeeEstimate, HardFork, HardForkInfo}; // FIXME: use `anyhow::Error` over `tower::BoxError` in blockchain context. -/// [`BlockChainContextRequest::Context`]. -pub(crate) async fn context( - blockchain_context: &mut BlockChainContextService, -) -> Result { - let BlockChainContextResponse::Context(context) = blockchain_context - .ready() - .await - .map_err(|e| anyhow!(e))? - .call(BlockChainContextRequest::Context) - .await - .map_err(|e| anyhow!(e))? - else { - unreachable!(); - }; - - Ok(context) -} - /// [`BlockChainContextRequest::HardForkInfo`]. pub(crate) async fn hard_fork_info( - blockchain_context: &mut BlockChainContextService, + blockchain_context: &mut BlockchainContextService, hard_fork: HardFork, ) -> Result { let BlockChainContextResponse::HardForkInfo(hf_info) = blockchain_context @@ -54,7 +36,7 @@ pub(crate) async fn hard_fork_info( /// [`BlockChainContextRequest::FeeEstimate`]. pub(crate) async fn fee_estimate( - blockchain_context: &mut BlockChainContextService, + blockchain_context: &mut BlockchainContextService, grace_blocks: u64, ) -> Result { let BlockChainContextResponse::FeeEstimate(fee) = blockchain_context @@ -73,7 +55,7 @@ pub(crate) async fn fee_estimate( /// [`BlockChainContextRequest::CalculatePow`] pub(crate) async fn calculate_pow( - blockchain_context: &mut BlockChainContextService, + blockchain_context: &mut BlockchainContextService, hardfork: HardFork, height: u64, block: Box, diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 9592c2bf..bf32c539 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -1,7 +1,7 @@ //! Transaction Pool //! //! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool. -use cuprate_consensus::BlockChainContextService; +use cuprate_consensus::BlockchainContextService; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index e2041598..109425c7 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -11,7 +11,7 @@ use tower::{Service, ServiceExt}; use cuprate_consensus::{ transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, ExtendedConsensusError, VerifyTxRequest, + BlockchainContextService, ExtendedConsensusError, VerifyTxRequest, }; use cuprate_dandelion_tower::{ pool::{DandelionPoolService, IncomingTxBuilder}, @@ -71,7 +71,7 @@ pub struct IncomingTxHandler { /// A store of txs currently being handled in incoming tx requests. pub(super) txs_being_handled: TxsBeingHandled, /// The blockchain context cache. - pub(super) blockchain_context_cache: BlockChainContextService, + pub(super) blockchain_context_cache: BlockchainContextService, /// The dandelion txpool manager. pub(super) dandelion_pool_manager: DandelionPoolService, @@ -90,7 +90,7 @@ impl IncomingTxHandler { clear_net: NetworkInterface, txpool_write_handle: TxpoolWriteHandle, txpool_read_handle: TxpoolReadHandle, - blockchain_context_cache: BlockChainContextService, + blockchain_context_cache: BlockchainContextService, tx_verifier_service: ConcreteTxVerifierService, ) -> Self { let dandelion_router = dandelion::dandelion_router(clear_net); @@ -139,7 +139,7 @@ impl Service for IncomingTxHandler { async fn handle_incoming_txs( IncomingTxs { txs, state }: IncomingTxs, txs_being_handled: TxsBeingHandled, - mut blockchain_context_cache: BlockChainContextService, + mut blockchain_context_cache: BlockchainContextService, mut tx_verifier_service: ConcreteTxVerifierService, mut txpool_write_handle: TxpoolWriteHandle, mut txpool_read_handle: TxpoolReadHandle, @@ -150,18 +150,7 @@ async fn handle_incoming_txs( let (txs, stem_pool_txs, txs_being_handled_guard) = prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; - let BlockChainContextResponse::Context(context) = blockchain_context_cache - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!() - }; - - let context = context.unchecked_blockchain_context(); + let context = blockchain_context_cache.blockchain_context(); tx_verifier_service .ready() diff --git a/consensus/context/Cargo.toml b/consensus/context/Cargo.toml index 76790464..fb5d8a8a 100644 --- a/consensus/context/Cargo.toml +++ b/consensus/context/Cargo.toml @@ -10,6 +10,7 @@ cuprate-consensus-rules = { workspace = true, features = ["proptest"]} cuprate-helper = { workspace = true, default-features = false, features = ["std", "cast", "num", "asynch"] } cuprate-types = { workspace = true, default-features = false, features = ["blockchain"] } +arc-swap = { workspace = true } futures = { workspace = true, features = ["std", "async-await"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} tokio-util = { workspace = true } diff --git a/consensus/context/src/lib.rs b/consensus/context/src/lib.rs index acc4d23d..c516fb85 100644 --- a/consensus/context/src/lib.rs +++ b/consensus/context/src/lib.rs @@ -1,6 +1,6 @@ //! # Blockchain Context //! -//! This crate contains a service to get cached context from the blockchain: [`BlockChainContext`]. +//! This crate contains a service to get cached context from the blockchain: [`BlockchainContext`]. //! This is used during contextual validation, this does not have all the data for contextual validation //! (outputs) for that you will need a [`Database`]. @@ -8,6 +8,9 @@ // FIXME: should we pull in a dependency just to link docs? use monero_serai as _; +use arc_swap::ArcSwap; +use futures::{channel::oneshot, FutureExt}; +use monero_serai::block::Block; use std::{ cmp::min, collections::HashMap, @@ -16,9 +19,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; - -use futures::{channel::oneshot, FutureExt}; -use monero_serai::block::Block; use tokio::sync::mpsc; use tokio_util::sync::PollSender; use tower::Service; @@ -34,7 +34,6 @@ pub mod weight; mod alt_chains; mod task; -mod tokens; use cuprate_types::{Chain, ChainInfo, FeeEstimate, HardForkInfo}; use difficulty::DifficultyCache; @@ -44,7 +43,6 @@ use weight::BlockWeightsCache; pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache}; pub use difficulty::DifficultyCacheConfig; pub use hardforks::HardForkConfig; -pub use tokens::*; pub use weight::BlockWeightsCacheConfig; pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60; @@ -96,27 +94,30 @@ impl ContextConfig { pub async fn initialize_blockchain_context( cfg: ContextConfig, database: D, -) -> Result +) -> Result where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, { let context_task = task::ContextTask::init_context(cfg, database).await?; + let context_cache = arc_swap::Cache::new(Arc::clone(&context_task.context_swap)); + // TODO: make buffer size configurable. let (tx, rx) = mpsc::channel(15); tokio::spawn(context_task.run(rx)); - Ok(BlockChainContextService { + Ok(BlockchainContextService { channel: PollSender::new(tx), + context_cache, }) } -/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep -/// around. You should keep around [`BlockChainContext`] instead. +/// Raw blockchain context, gotten from [`BlockchainContext`]. This data may turn invalid so is not ok to keep +/// around. You should keep around [`BlockchainContext`] instead. #[derive(Debug, Clone)] -pub struct RawBlockChainContext { +pub struct BlockchainContext { /// The current cumulative difficulty. pub cumulative_difficulty: u128, /// Context to verify a block, as needed by [`cuprate-consensus-rules`] @@ -127,14 +128,14 @@ pub struct RawBlockChainContext { top_block_timestamp: Option, } -impl std::ops::Deref for RawBlockChainContext { +impl std::ops::Deref for BlockchainContext { type Target = ContextToVerifyBlock; fn deref(&self) -> &Self::Target { &self.context_to_verify_block } } -impl RawBlockChainContext { +impl BlockchainContext { /// Returns the timestamp the should be used when checking locked outputs. /// /// ref: @@ -167,40 +168,6 @@ impl RawBlockChainContext { } } -/// Blockchain context which keeps a token of validity so users will know when the data is no longer valid. -#[derive(Debug, Clone)] -pub struct BlockChainContext { - /// A token representing this data's validity. - validity_token: ValidityToken, - /// The actual block chain context. - raw: RawBlockChainContext, -} - -#[derive(Debug, Clone, Copy, thiserror::Error)] -#[error("data is no longer valid")] -pub struct DataNoLongerValid; - -impl BlockChainContext { - /// Checks if the data is still valid. - pub fn is_still_valid(&self) -> bool { - self.validity_token.is_data_valid() - } - - /// Checks if the data is valid returning an Err if not and a reference to the blockchain context if - /// it is. - pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> { - if !self.is_still_valid() { - return Err(DataNoLongerValid); - } - Ok(&self.raw) - } - - /// Returns the blockchain context without checking the validity token. - pub const fn unchecked_blockchain_context(&self) -> &RawBlockChainContext { - &self.raw - } -} - /// Data needed from a new block to add it to the context cache. #[derive(Debug, Clone)] pub struct NewBlockData { @@ -225,9 +192,6 @@ pub struct NewBlockData { /// A request to the blockchain context cache. #[derive(Debug, Clone)] pub enum BlockChainContextRequest { - /// Get the current blockchain context. - Context, - /// Gets all the current `RandomX` VMs. CurrentRxVms, @@ -363,9 +327,6 @@ pub enum BlockChainContextResponse { /// - [`BlockChainContextRequest::AddAltChainContextCache`] Ok, - /// Response to [`BlockChainContextRequest::Context`] - Context(BlockChainContext), - /// Response to [`BlockChainContextRequest::CurrentRxVms`] /// /// A map of seed height to `RandomX` VMs. @@ -403,11 +364,19 @@ pub enum BlockChainContextResponse { /// The blockchain context service. #[derive(Clone)] -pub struct BlockChainContextService { +pub struct BlockchainContextService { channel: PollSender, + + context_cache: arc_swap::Cache>, Arc>, } -impl Service for BlockChainContextService { +impl BlockchainContextService { + pub fn blockchain_context(&mut self) -> &Arc { + self.context_cache.load() + } +} + +impl Service for BlockchainContextService { type Response = BlockChainContextResponse; type Error = tower::BoxError; type Future = diff --git a/consensus/context/src/task.rs b/consensus/context/src/task.rs index b0759952..8bb6367a 100644 --- a/consensus/context/src/task.rs +++ b/consensus/context/src/task.rs @@ -3,7 +3,10 @@ //! This module contains the async task that handles keeping track of blockchain context. //! It holds all the context caches and handles [`tower::Service`] requests. //! + +use arc_swap::ArcSwap; use futures::channel::oneshot; +use std::sync::Arc; use tokio::sync::mpsc; use tower::ServiceExt; use tracing::Instrument; @@ -12,14 +15,14 @@ use cuprate_consensus_rules::blocks::ContextToVerifyBlock; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, + Chain, HardFork, }; use crate::{ alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap}, - difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest, - BlockChainContextResponse, ContextCacheError, ContextConfig, Database, RawBlockChainContext, - ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, + difficulty, hardforks, rx_vms, weight, BlockChainContextRequest, BlockChainContextResponse, + BlockchainContext, ContextCacheError, ContextConfig, Database, + BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, }; /// A request from the context service to the context task. @@ -34,9 +37,7 @@ pub(super) struct ContextTaskRequest { /// The Context task that keeps the blockchain context and handles requests. pub(crate) struct ContextTask { - /// A token used to invalidate previous contexts when a new - /// block is added to the chain. - current_validity_token: ValidityToken, + pub(crate) context_swap: Arc>, /// The difficulty cache. difficulty_cache: difficulty::DifficultyCache, @@ -128,10 +129,22 @@ impl ContextTask { rx_vms::RandomXVmCache::init_from_chain_height(chain_height, ¤t_hf, db).await }); + let difficulty_cache = difficulty_cache_handle.await.unwrap()?; + let weight_cache = weight_cache_handle.await.unwrap()?; + + let blockchain_context = blockchain_context( + current_hf, + top_block_hash, + chain_height, + already_generated_coins, + &weight_cache, + &difficulty_cache, + ); + let context_svc = Self { - current_validity_token: ValidityToken::new(), - difficulty_cache: difficulty_cache_handle.await.unwrap()?, - weight_cache: weight_cache_handle.await.unwrap()?, + context_swap: Arc::new(ArcSwap::new(Arc::new(blockchain_context))), + difficulty_cache, + weight_cache, rx_vm_cache: rx_seed_handle.await.unwrap()?, hardfork_state, alt_chain_cache_map: AltChainMap::new(), @@ -144,42 +157,23 @@ impl ContextTask { Ok(context_svc) } + fn update_blockchain_context(&self) { + self.context_swap.store(Arc::new(blockchain_context( + self.hardfork_state.current_hardfork, + self.top_block_hash, + self.chain_height, + self.already_generated_coins, + &self.weight_cache, + &self.difficulty_cache, + ))); + } + /// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`]. pub(crate) async fn handle_req( &mut self, req: BlockChainContextRequest, ) -> Result { Ok(match req { - BlockChainContextRequest::Context => { - tracing::debug!("Getting blockchain context"); - - let current_hf = self.hardfork_state.current_hardfork(); - - BlockChainContextResponse::Context(BlockChainContext { - validity_token: self.current_validity_token.clone(), - raw: RawBlockChainContext { - context_to_verify_block: ContextToVerifyBlock { - median_weight_for_block_reward: self - .weight_cache - .median_for_block_reward(current_hf), - effective_median_weight: self - .weight_cache - .effective_median_block_weight(current_hf), - top_hash: self.top_block_hash, - median_block_timestamp: self - .difficulty_cache - .median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)), - chain_height: self.chain_height, - current_hf, - next_difficulty: self.difficulty_cache.next_difficulty(current_hf), - already_generated_coins: self.already_generated_coins, - }, - cumulative_difficulty: self.difficulty_cache.cumulative_difficulty(), - median_long_term_weight: self.weight_cache.median_long_term_weight(), - top_block_timestamp: self.difficulty_cache.top_block_timestamp(), - }, - }) - } BlockChainContextRequest::CurrentRxVms => { BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await) } @@ -202,9 +196,6 @@ impl ContextTask { "Updating blockchain cache with new block, height: {}", new.height ); - // Cancel the validity token and replace it with a new one. - std::mem::replace(&mut self.current_validity_token, ValidityToken::new()) - .set_data_invalid(); self.difficulty_cache.new_block( new.height, @@ -225,6 +216,8 @@ impl ContextTask { .already_generated_coins .saturating_add(new.generated_coins); + self.update_blockchain_context(); + BlockChainContextResponse::Ok } BlockChainContextRequest::PopBlocks { numb_blocks } => { @@ -272,8 +265,7 @@ impl ContextTask { self.already_generated_coins = already_generated_coins; self.top_block_hash = top_block_hash; - std::mem::replace(&mut self.current_validity_token, ValidityToken::new()) - .set_data_invalid(); + self.update_blockchain_context(); BlockChainContextResponse::Ok } @@ -342,3 +334,29 @@ impl ContextTask { tracing::info!("Shutting down blockchain context task."); } } + +fn blockchain_context( + hf: HardFork, + top_hash: [u8; 32], + chain_height: usize, + already_generated_coins: u64, + weight_cache: &weight::BlockWeightsCache, + difficulty_cache: &difficulty::DifficultyCache, +) -> BlockchainContext { + BlockchainContext { + context_to_verify_block: ContextToVerifyBlock { + median_weight_for_block_reward: weight_cache.median_for_block_reward(hf), + effective_median_weight: weight_cache.effective_median_block_weight(hf), + top_hash, + median_block_timestamp: difficulty_cache + .median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)), + chain_height, + current_hf: hf, + next_difficulty: difficulty_cache.next_difficulty(hf), + already_generated_coins, + }, + cumulative_difficulty: difficulty_cache.cumulative_difficulty(), + median_long_term_weight: weight_cache.median_long_term_weight(), + top_block_timestamp: difficulty_cache.top_block_timestamp(), + } +} diff --git a/consensus/context/src/tokens.rs b/consensus/context/src/tokens.rs deleted file mode 100644 index d2223039..00000000 --- a/consensus/context/src/tokens.rs +++ /dev/null @@ -1,33 +0,0 @@ -//! Tokens -//! -//! This module contains tokens which keep track of the validity of certain data. -//! Currently, there is 1 token: -//! - [`ValidityToken`] -//! - -use tokio_util::sync::CancellationToken; - -/// A token representing if a piece of data is valid. -#[derive(Debug, Clone, Default)] -pub struct ValidityToken { - token: CancellationToken, -} - -impl ValidityToken { - /// Creates a new [`ValidityToken`] - pub fn new() -> Self { - Self { - token: CancellationToken::new(), - } - } - - /// Returns `true` if the data is still valid. - pub fn is_data_valid(&self) -> bool { - !self.token.is_cancelled() - } - - /// Sets the data to invalid. - pub fn set_data_invalid(self) { - self.token.cancel(); - } -} diff --git a/consensus/fast-sync/src/fast_sync.rs b/consensus/fast-sync/src/fast_sync.rs index 3764e217..647e2246 100644 --- a/consensus/fast-sync/src/fast_sync.rs +++ b/consensus/fast-sync/src/fast_sync.rs @@ -10,10 +10,10 @@ use monero_serai::{ block::Block, transaction::{Input, Transaction}, }; -use tower::{Service, ServiceExt}; +use tower::Service; use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_consensus_context::{BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_consensus_context::BlockchainContextService; use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError}; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; @@ -110,37 +110,18 @@ impl From for FastSyncError { } } -pub struct FastSyncService { - context_svc: C, +pub struct FastSyncService { + context_svc: BlockchainContextService, } -impl FastSyncService -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, -{ +impl FastSyncService { #[expect(dead_code)] - pub(crate) const fn new(context_svc: C) -> Self { + pub(crate) const fn new(context_svc: BlockchainContextService) -> Self { Self { context_svc } } } -impl Service for FastSyncService -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, -{ +impl Service for FastSyncService { type Response = FastSyncResponse; type Error = FastSyncError; type Future = @@ -210,31 +191,13 @@ fn validate_hashes( }) } -async fn validate_block( - mut context_svc: C, +async fn validate_block( + mut context_svc: BlockchainContextService, block: Block, mut txs: HashMap<[u8; 32], Transaction>, token: ValidBlockId, -) -> Result -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, -{ - let BlockChainContextResponse::Context(checked_context) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - - let block_chain_ctx = checked_context.unchecked_blockchain_context().clone(); +) -> Result { + let context = context_svc.blockchain_context(); let block_hash = block.hash(); if block_hash != token.0 { @@ -246,7 +209,7 @@ where let Some(Input::Gen(height)) = block.miner_transaction.prefix().inputs.first() else { return Err(FastSyncError::MinerTx(MinerTxError::InputNotOfTypeGen)); }; - if *height != block_chain_ctx.chain_height { + if *height != context.chain_height { return Err(FastSyncError::BlockHeightMismatch); } @@ -288,9 +251,8 @@ where height: *height, generated_coins, weight, - long_term_weight: block_chain_ctx.next_block_long_term_weight(weight), - cumulative_difficulty: block_chain_ctx.cumulative_difficulty - + block_chain_ctx.next_difficulty, + long_term_weight: context.next_block_long_term_weight(weight), + cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, block, })) } diff --git a/consensus/src/block.rs b/consensus/src/block.rs index 3f5d749e..719825ce 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -15,7 +15,7 @@ use monero_serai::{ use tower::{Service, ServiceExt}; use cuprate_consensus_context::{ - BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, }; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{ @@ -242,9 +242,9 @@ pub enum VerifyBlockResponse { } /// The block verifier service. -pub struct BlockVerifierService { +pub struct BlockVerifierService { /// The context service. - context_svc: C, + context_svc: BlockchainContextService, /// The tx verifier service. tx_verifier_svc: TxV, /// The database. @@ -252,12 +252,8 @@ pub struct BlockVerifierService { _database: D, } -impl BlockVerifierService +impl BlockVerifierService where - C: Service - + Clone - + Send - + 'static, TxV: Service + Clone + Send @@ -266,7 +262,11 @@ where D::Future: Send + 'static, { /// Creates a new block verifier. - pub(crate) const fn new(context_svc: C, tx_verifier_svc: TxV, database: D) -> Self { + pub(crate) const fn new( + context_svc: BlockchainContextService, + tx_verifier_svc: TxV, + database: D, + ) -> Self { Self { context_svc, tx_verifier_svc, @@ -275,17 +275,8 @@ where } } -impl Service for BlockVerifierService +impl Service for BlockVerifierService where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - TxV: Service + Clone + Send @@ -320,8 +311,7 @@ where batch_prepare_main_chain_block(blocks, context_svc).await } VerifyBlockRequest::MainChainPrepped { block, txs } => { - verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None) - .await + verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc).await } VerifyBlockRequest::AltChain { block, @@ -334,32 +324,17 @@ where } /// Verifies a prepared block. -async fn verify_main_chain_block( +async fn verify_main_chain_block( block: Block, txs: HashMap<[u8; 32], TransactionVerificationData>, - mut context_svc: C, + mut context_svc: BlockchainContextService, tx_verifier_svc: TxV, ) -> Result where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, TxV: Service, { - let BlockChainContextResponse::Context(checked_context) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; + let context = context_svc.blockchain_context().clone(); - let context = checked_context.unchecked_blockchain_context().clone(); tracing::debug!("got blockchain context: {:?}", context); tracing::debug!( @@ -404,49 +379,19 @@ where .map(Arc::new) .collect(); - verify_prepped_main_chain_block( - prepped_block, - ordered_txs, - context_svc, - tx_verifier_svc, - Some(context), - ) - .await + verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, tx_verifier_svc).await } -async fn verify_prepped_main_chain_block( +async fn verify_prepped_main_chain_block( prepped_block: PreparedBlock, txs: Vec>, - context_svc: C, + mut context_svc: BlockchainContextService, tx_verifier_svc: TxV, - cached_context: Option, ) -> Result where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, TxV: Service, { - let context = if let Some(context) = cached_context { - context - } else { - let BlockChainContextResponse::Context(checked_context) = context_svc - .oneshot(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - - let context = checked_context.unchecked_blockchain_context().clone(); - - tracing::debug!("got blockchain context: {context:?}"); - - context - }; + let context = context_svc.blockchain_context(); tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash)); diff --git a/consensus/src/block/batch_prepare.rs b/consensus/src/block/batch_prepare.rs index ef384f5d..37aa2dc0 100644 --- a/consensus/src/block/batch_prepare.rs +++ b/consensus/src/block/batch_prepare.rs @@ -1,11 +1,7 @@ use std::{collections::HashMap, sync::Arc}; -use monero_serai::{block::Block, transaction::Transaction}; -use rayon::prelude::*; -use tower::{Service, ServiceExt}; -use tracing::instrument; - use cuprate_consensus_context::rx_vms::RandomXVm; +use cuprate_consensus_context::BlockchainContextService; use cuprate_consensus_rules::{ blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError}, hard_forks::HardForkError, @@ -13,6 +9,10 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, }; use cuprate_helper::asynch::rayon_spawn_async; +use monero_serai::{block::Block, transaction::Transaction}; +use rayon::prelude::*; +use tower::{Service, ServiceExt}; +use tracing::instrument; use crate::{ block::{free::pull_ordered_transactions, PreparedBlock, PreparedBlockExPow}, @@ -23,19 +23,10 @@ use crate::{ /// Batch prepares a list of blocks for verification. #[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))] -pub(crate) async fn batch_prepare_main_chain_block( +pub(crate) async fn batch_prepare_main_chain_block( blocks: Vec<(Block, Vec)>, - mut context_svc: C, -) -> Result -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, -{ + mut context_svc: BlockchainContextService, +) -> Result { let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); tracing::debug!("Calculating block hashes."); @@ -89,16 +80,6 @@ where timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version)); } - // Get the current blockchain context. - let BlockChainContextResponse::Context(checked_context) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - // Calculate the expected difficulties for each block in the batch. let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc .ready() @@ -111,7 +92,8 @@ where panic!("Context service returned wrong response!"); }; - let context = checked_context.unchecked_blockchain_context().clone(); + // Get the current blockchain context. + let context = context_svc.blockchain_context(); // Make sure the blocks follow the main chain. diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index f21d00b2..65fb1d8e 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -2,7 +2,7 @@ //! //! This crate contains 3 [`tower::Service`]s that implement Monero's consensus rules: //! -//! - [`BlockChainContextService`] Which handles keeping the current state of the blockchain. +//! - [`BlockchainContextService`] Which handles keeping the current state of the blockchain. //! - [`BlockVerifierService`] Which handles block verification. //! - [`TxVerifierService`] Which handles transaction verification. //! @@ -30,8 +30,8 @@ pub mod transactions; pub use block::{BlockVerifierService, VerifyBlockRequest, VerifyBlockResponse}; pub use cuprate_consensus_context::{ - initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, - BlockChainContextResponse, BlockChainContextService, ContextConfig, + initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse, + BlockchainContext, BlockchainContextService, ContextConfig, }; pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; @@ -64,25 +64,16 @@ pub enum ExtendedConsensusError { } /// Initialize the 2 verifier [`tower::Service`]s (block and transaction). -pub fn initialize_verifier( +pub fn initialize_verifier( database: D, - ctx_svc: Ctx, + ctx_svc: BlockchainContextService, ) -> ( - BlockVerifierService, D>, + BlockVerifierService, D>, TxVerifierService, ) where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, - Ctx: tower::Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + Sync - + 'static, - Ctx::Future: Send + 'static, { let tx_svc = TxVerifierService::new(database.clone()); let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);