use arc-swap to cache context

This commit is contained in:
Boog900 2024-11-23 23:55:01 +00:00
parent 4b925b8c78
commit 525d1322c8
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
19 changed files with 193 additions and 444 deletions

7
Cargo.lock generated
View file

@ -56,6 +56,12 @@ version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arrayref"
version = "0.3.9"
@ -642,6 +648,7 @@ dependencies = [
name = "cuprate-consensus-context"
version = "0.1.0"
dependencies = [
"arc-swap",
"cuprate-consensus-rules",
"cuprate-helper",
"cuprate-types",

View file

@ -84,6 +84,7 @@ cuprate-zmq-types = { path = "zmq/types", default-features =
# External dependencies
anyhow = { version = "1", default-features = false }
arc-swap = { version = "1", default-features = false }
arrayvec = { version = "0.7", default-features = false }
async-trait = { version = "0.1", default-features = false }
bitflags = { version = "2", default-features = false }
@ -135,7 +136,6 @@ tokio-test = { version = "0.4" }
## TODO:
## Potential dependencies.
# arc-swap = { version = "1.6.0" } # Atomically swappable Arc<T> | https://github.com/vorner/arc-swap
# itoa = { version = "1.0.9" } # Fast integer to string formatting | https://github.com/dtolnay/itoa
# notify = { version = "6.1.1" } # Filesystem watching | https://github.com/notify-rs/notify
# once_cell = { version = "1.18.0" } # Lazy/one-time initialization | https://github.com/matklad/once_cell

View file

@ -8,7 +8,7 @@ use tokio::sync::{mpsc, Notify};
use tower::{BoxError, Service, ServiceExt};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig};
use cuprate_consensus::{generate_genesis_block, BlockchainContextService, ContextConfig};
use cuprate_cryptonight::cryptonight_hash_v0;
use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface};
use cuprate_p2p_core::{ClearNet, Network};
@ -84,7 +84,7 @@ pub async fn init_consensus(
(
ConcreteBlockVerifierService,
ConcreteTxVerifierService,
BlockChainContextService,
BlockchainContextService,
),
BoxError,
> {

View file

@ -8,11 +8,10 @@ use tracing::error;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService,
BlockchainContextService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_consensus_context::RawBlockChainContext;
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig},
BroadcastSvc, NetworkInterface,
@ -48,7 +47,7 @@ pub async fn init_blockchain_manager(
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
mut blockchain_context_service: BlockChainContextService,
mut blockchain_context_service: BlockchainContextService,
block_verifier_service: ConcreteBlockVerifierService,
block_downloader_config: BlockDownloaderConfig,
) {
@ -68,23 +67,11 @@ pub async fn init_blockchain_manager(
block_downloader_config,
));
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let manager = BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
txpool_write_handle,
blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
stop_current_block_downloader,
broadcast_svc: clearnet_interface.broadcast_svc(),
@ -107,13 +94,9 @@ pub struct BlockchainManager {
blockchain_read_handle: BlockchainReadHandle,
/// A [`TxpoolWriteHandle`].
txpool_write_handle: TxpoolWriteHandle,
// TODO: Improve the API of the cache service.
// TODO: rename the cache service -> `BlockchainContextService`.
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
/// values without needing to go to a [`BlockchainReadHandle`].
blockchain_context_service: BlockChainContextService,
/// A cached context representing the current state.
cached_blockchain_context: RawBlockChainContext,
blockchain_context_service: BlockchainContextService,
/// The block verifier service, to verify incoming blocks.
block_verifier_service: ConcreteBlockVerifierService,
/// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download

View file

@ -83,7 +83,11 @@ impl super::BlockchainManager {
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<IncomingBlockOk, anyhow::Error> {
if block.header.previous != self.cached_blockchain_context.top_hash {
let context = self.blockchain_context_service.blockchain_context();
let top_hash = context.top_hash;
let chain_height = context.chain_height;
if block.header.previous != top_hash {
self.handle_incoming_alt_block(block, prepared_txs).await?;
return Ok(IncomingBlockOk::AddedToAltChain);
}
@ -105,8 +109,7 @@ impl super::BlockchainManager {
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
self.add_valid_block_to_main_chain(verified_block).await;
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
.await;
self.broadcast_block(block_blob, chain_height).await;
Ok(IncomingBlockOk::AddedToMainChain)
}
@ -126,7 +129,12 @@ impl super::BlockchainManager {
.first()
.expect("Block batch should not be empty");
if first_block.header.previous == self.cached_blockchain_context.top_hash {
if first_block.header.previous
== self
.blockchain_context_service
.blockchain_context()
.top_hash
{
self.handle_incoming_block_batch_main_chain(batch).await;
} else {
self.handle_incoming_block_batch_alt_chain(batch).await;
@ -285,7 +293,10 @@ impl super::BlockchainManager {
// If this alt chain
if alt_block_info.cumulative_difficulty
> self.cached_blockchain_context.cumulative_difficulty
> self
.blockchain_context_service
.blockchain_context()
.cumulative_difficulty
{
self.try_do_reorg(alt_block_info).await?;
return Ok(AddAltBlock::Reorged);
@ -338,7 +349,10 @@ impl super::BlockchainManager {
alt_blocks.push(top_alt_block);
let split_height = alt_blocks[0].height;
let current_main_chain_height = self.cached_blockchain_context.chain_height;
let current_main_chain_height = self
.blockchain_context_service
.blockchain_context()
.chain_height;
let BlockchainResponse::PopBlocks(old_main_chain_id) = self
.blockchain_write_handle
@ -472,20 +486,6 @@ impl super::BlockchainManager {
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
let BlockChainContextResponse::Context(blockchain_context) = self
.blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!();
};
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
self.txpool_write_handle
.ready()
.await

View file

@ -9,7 +9,8 @@ use tokio::{
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext};
use cuprate_consensus_context::BlockchainContextService;
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface, PeerSetRequest, PeerSetResponse,
@ -29,8 +30,8 @@ pub enum SyncerError {
/// The syncer tasks that makes sure we are fully synchronised with our connected peers.
#[instrument(level = "debug", skip_all)]
pub async fn syncer<C, CN>(
mut context_svc: C,
pub async fn syncer<CN>(
mut context_svc: BlockchainContextService,
our_chain: CN,
mut clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
@ -38,12 +39,6 @@ pub async fn syncer<C, CN>(
block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Clone
+ Send
@ -54,15 +49,6 @@ where
let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY);
let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
unreachable!();
};
tracing::debug!("Waiting for new sync info in top sync channel");
loop {
@ -70,8 +56,7 @@ where
tracing::trace!("Checking connected peers to see if we are behind",);
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
let blockchain_context = context_svc.blockchain_context();
let PeerSetResponse::MostPoWSeen {
cumulative_difficulty,
@ -86,7 +71,7 @@ where
unreachable!();
};
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
if cumulative_difficulty <= blockchain_context.cumulative_difficulty {
continue;
}
@ -116,32 +101,3 @@ where
}
}
}
/// Checks if we should update the given [`BlockChainContext`] and updates it if needed.
async fn check_update_blockchain_context<C>(
context_svc: C,
old_context: &mut BlockChainContext,
) -> Result<(), tower::BoxError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
{
if old_context.blockchain_context().is_ok() {
return Ok(());
}
let BlockChainContextResponse::Context(ctx) = context_svc
.oneshot(BlockChainContextRequest::Context)
.await?
else {
unreachable!();
};
*old_context = ctx;
Ok(())
}

View file

@ -1,14 +1,11 @@
use tower::util::MapErr;
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
use cuprate_consensus::{BlockVerifierService, BlockchainContextService, TxVerifierService};
/// The [`BlockVerifierService`] with all generic types defined.
pub type ConcreteBlockVerifierService = BlockVerifierService<
BlockChainContextService,
ConcreteTxVerifierService,
ConsensusBlockchainReadHandle,
>;
pub type ConcreteBlockVerifierService =
BlockVerifierService<ConcreteTxVerifierService, ConsensusBlockchainReadHandle>;
/// The [`TxVerifierService`] with all generic types defined.
pub type ConcreteTxVerifierService = TxVerifierService<ConsensusBlockchainReadHandle>;

View file

@ -8,7 +8,7 @@ use monero_serai::block::Block;
use tower::Service;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::BlockChainContextService;
use cuprate_consensus::BlockchainContextService;
use cuprate_pruning::PruningSeed;
use cuprate_rpc_interface::RpcHandler;
use cuprate_rpc_types::{
@ -148,7 +148,7 @@ pub struct CupratedRpcHandler {
pub blockchain_read: BlockchainReadHandle,
/// Handle to the blockchain context service.
pub blockchain_context: BlockChainContextService,
pub blockchain_context: BlockchainContextService,
/// Handle to the blockchain manager.
pub blockchain_manager: BlockchainManagerHandle,
@ -165,7 +165,7 @@ impl CupratedRpcHandler {
pub const fn new(
restricted: bool,
blockchain_read: BlockchainReadHandle,
blockchain_context: BlockChainContextService,
blockchain_context: BlockchainContextService,
blockchain_manager: BlockchainManagerHandle,
txpool_read: TxpoolReadHandle,
txpool_manager: std::convert::Infallible,

View file

@ -7,35 +7,17 @@ use monero_serai::block::Block;
use tower::{Service, ServiceExt};
use cuprate_consensus_context::{
BlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContext,
BlockchainContextService,
};
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{FeeEstimate, HardFork, HardForkInfo};
// FIXME: use `anyhow::Error` over `tower::BoxError` in blockchain context.
/// [`BlockChainContextRequest::Context`].
pub(crate) async fn context(
blockchain_context: &mut BlockChainContextService,
) -> Result<BlockChainContext, Error> {
let BlockChainContextResponse::Context(context) = blockchain_context
.ready()
.await
.map_err(|e| anyhow!(e))?
.call(BlockChainContextRequest::Context)
.await
.map_err(|e| anyhow!(e))?
else {
unreachable!();
};
Ok(context)
}
/// [`BlockChainContextRequest::HardForkInfo`].
pub(crate) async fn hard_fork_info(
blockchain_context: &mut BlockChainContextService,
blockchain_context: &mut BlockchainContextService,
hard_fork: HardFork,
) -> Result<HardForkInfo, Error> {
let BlockChainContextResponse::HardForkInfo(hf_info) = blockchain_context
@ -54,7 +36,7 @@ pub(crate) async fn hard_fork_info(
/// [`BlockChainContextRequest::FeeEstimate`].
pub(crate) async fn fee_estimate(
blockchain_context: &mut BlockChainContextService,
blockchain_context: &mut BlockchainContextService,
grace_blocks: u64,
) -> Result<FeeEstimate, Error> {
let BlockChainContextResponse::FeeEstimate(fee) = blockchain_context
@ -73,7 +55,7 @@ pub(crate) async fn fee_estimate(
/// [`BlockChainContextRequest::CalculatePow`]
pub(crate) async fn calculate_pow(
blockchain_context: &mut BlockChainContextService,
blockchain_context: &mut BlockchainContextService,
hardfork: HardFork,
height: u64,
block: Box<Block>,

View file

@ -1,7 +1,7 @@
//! Transaction Pool
//!
//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool.
use cuprate_consensus::BlockChainContextService;
use cuprate_consensus::BlockchainContextService;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};

View file

@ -11,7 +11,7 @@ use tower::{Service, ServiceExt};
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
BlockchainContextService, ExtendedConsensusError, VerifyTxRequest,
};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTxBuilder},
@ -71,7 +71,7 @@ pub struct IncomingTxHandler {
/// A store of txs currently being handled in incoming tx requests.
pub(super) txs_being_handled: TxsBeingHandled,
/// The blockchain context cache.
pub(super) blockchain_context_cache: BlockChainContextService,
pub(super) blockchain_context_cache: BlockchainContextService,
/// The dandelion txpool manager.
pub(super) dandelion_pool_manager:
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
@ -90,7 +90,7 @@ impl IncomingTxHandler {
clear_net: NetworkInterface<ClearNet>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
blockchain_context_cache: BlockChainContextService,
blockchain_context_cache: BlockchainContextService,
tx_verifier_service: ConcreteTxVerifierService,
) -> Self {
let dandelion_router = dandelion::dandelion_router(clear_net);
@ -139,7 +139,7 @@ impl Service<IncomingTxs> for IncomingTxHandler {
async fn handle_incoming_txs(
IncomingTxs { txs, state }: IncomingTxs,
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockChainContextService,
mut blockchain_context_cache: BlockchainContextService,
mut tx_verifier_service: ConcreteTxVerifierService,
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
@ -150,18 +150,7 @@ async fn handle_incoming_txs(
let (txs, stem_pool_txs, txs_being_handled_guard) =
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
let BlockChainContextResponse::Context(context) = blockchain_context_cache
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let context = context.unchecked_blockchain_context();
let context = blockchain_context_cache.blockchain_context();
tx_verifier_service
.ready()

View file

@ -10,6 +10,7 @@ cuprate-consensus-rules = { workspace = true, features = ["proptest"]}
cuprate-helper = { workspace = true, default-features = false, features = ["std", "cast", "num", "asynch"] }
cuprate-types = { workspace = true, default-features = false, features = ["blockchain"] }
arc-swap = { workspace = true }
futures = { workspace = true, features = ["std", "async-await"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"]}
tokio-util = { workspace = true }

View file

@ -1,6 +1,6 @@
//! # Blockchain Context
//!
//! This crate contains a service to get cached context from the blockchain: [`BlockChainContext`].
//! This crate contains a service to get cached context from the blockchain: [`BlockchainContext`].
//! This is used during contextual validation, this does not have all the data for contextual validation
//! (outputs) for that you will need a [`Database`].
@ -8,6 +8,9 @@
// FIXME: should we pull in a dependency just to link docs?
use monero_serai as _;
use arc_swap::ArcSwap;
use futures::{channel::oneshot, FutureExt};
use monero_serai::block::Block;
use std::{
cmp::min,
collections::HashMap,
@ -16,9 +19,6 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use futures::{channel::oneshot, FutureExt};
use monero_serai::block::Block;
use tokio::sync::mpsc;
use tokio_util::sync::PollSender;
use tower::Service;
@ -34,7 +34,6 @@ pub mod weight;
mod alt_chains;
mod task;
mod tokens;
use cuprate_types::{Chain, ChainInfo, FeeEstimate, HardForkInfo};
use difficulty::DifficultyCache;
@ -44,7 +43,6 @@ use weight::BlockWeightsCache;
pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache};
pub use difficulty::DifficultyCacheConfig;
pub use hardforks::HardForkConfig;
pub use tokens::*;
pub use weight::BlockWeightsCacheConfig;
pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
@ -96,27 +94,30 @@ impl ContextConfig {
pub async fn initialize_blockchain_context<D>(
cfg: ContextConfig,
database: D,
) -> Result<BlockChainContextService, ContextCacheError>
) -> Result<BlockchainContextService, ContextCacheError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let context_task = task::ContextTask::init_context(cfg, database).await?;
let context_cache = arc_swap::Cache::new(Arc::clone(&context_task.context_swap));
// TODO: make buffer size configurable.
let (tx, rx) = mpsc::channel(15);
tokio::spawn(context_task.run(rx));
Ok(BlockChainContextService {
Ok(BlockchainContextService {
channel: PollSender::new(tx),
context_cache,
})
}
/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep
/// around. You should keep around [`BlockChainContext`] instead.
/// Raw blockchain context, gotten from [`BlockchainContext`]. This data may turn invalid so is not ok to keep
/// around. You should keep around [`BlockchainContext`] instead.
#[derive(Debug, Clone)]
pub struct RawBlockChainContext {
pub struct BlockchainContext {
/// The current cumulative difficulty.
pub cumulative_difficulty: u128,
/// Context to verify a block, as needed by [`cuprate-consensus-rules`]
@ -127,14 +128,14 @@ pub struct RawBlockChainContext {
top_block_timestamp: Option<u64>,
}
impl std::ops::Deref for RawBlockChainContext {
impl std::ops::Deref for BlockchainContext {
type Target = ContextToVerifyBlock;
fn deref(&self) -> &Self::Target {
&self.context_to_verify_block
}
}
impl RawBlockChainContext {
impl BlockchainContext {
/// Returns the timestamp the should be used when checking locked outputs.
///
/// ref: <https://cuprate.github.io/monero-book/consensus_rules/transactions/unlock_time.html#getting-the-current-time>
@ -167,40 +168,6 @@ impl RawBlockChainContext {
}
}
/// Blockchain context which keeps a token of validity so users will know when the data is no longer valid.
#[derive(Debug, Clone)]
pub struct BlockChainContext {
/// A token representing this data's validity.
validity_token: ValidityToken,
/// The actual block chain context.
raw: RawBlockChainContext,
}
#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("data is no longer valid")]
pub struct DataNoLongerValid;
impl BlockChainContext {
/// Checks if the data is still valid.
pub fn is_still_valid(&self) -> bool {
self.validity_token.is_data_valid()
}
/// Checks if the data is valid returning an Err if not and a reference to the blockchain context if
/// it is.
pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> {
if !self.is_still_valid() {
return Err(DataNoLongerValid);
}
Ok(&self.raw)
}
/// Returns the blockchain context without checking the validity token.
pub const fn unchecked_blockchain_context(&self) -> &RawBlockChainContext {
&self.raw
}
}
/// Data needed from a new block to add it to the context cache.
#[derive(Debug, Clone)]
pub struct NewBlockData {
@ -225,9 +192,6 @@ pub struct NewBlockData {
/// A request to the blockchain context cache.
#[derive(Debug, Clone)]
pub enum BlockChainContextRequest {
/// Get the current blockchain context.
Context,
/// Gets all the current `RandomX` VMs.
CurrentRxVms,
@ -363,9 +327,6 @@ pub enum BlockChainContextResponse {
/// - [`BlockChainContextRequest::AddAltChainContextCache`]
Ok,
/// Response to [`BlockChainContextRequest::Context`]
Context(BlockChainContext),
/// Response to [`BlockChainContextRequest::CurrentRxVms`]
///
/// A map of seed height to `RandomX` VMs.
@ -403,11 +364,19 @@ pub enum BlockChainContextResponse {
/// The blockchain context service.
#[derive(Clone)]
pub struct BlockChainContextService {
pub struct BlockchainContextService {
channel: PollSender<task::ContextTaskRequest>,
context_cache: arc_swap::Cache<Arc<ArcSwap<BlockchainContext>>, Arc<BlockchainContext>>,
}
impl Service<BlockChainContextRequest> for BlockChainContextService {
impl BlockchainContextService {
pub fn blockchain_context(&mut self) -> &Arc<BlockchainContext> {
self.context_cache.load()
}
}
impl Service<BlockChainContextRequest> for BlockchainContextService {
type Response = BlockChainContextResponse;
type Error = tower::BoxError;
type Future =

View file

@ -3,7 +3,10 @@
//! This module contains the async task that handles keeping track of blockchain context.
//! It holds all the context caches and handles [`tower::Service`] requests.
//!
use arc_swap::ArcSwap;
use futures::channel::oneshot;
use std::sync::Arc;
use tokio::sync::mpsc;
use tower::ServiceExt;
use tracing::Instrument;
@ -12,14 +15,14 @@ use cuprate_consensus_rules::blocks::ContextToVerifyBlock;
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
Chain, HardFork,
};
use crate::{
alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap},
difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, ContextCacheError, ContextConfig, Database, RawBlockChainContext,
ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
difficulty, hardforks, rx_vms, weight, BlockChainContextRequest, BlockChainContextResponse,
BlockchainContext, ContextCacheError, ContextConfig, Database,
BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
};
/// A request from the context service to the context task.
@ -34,9 +37,7 @@ pub(super) struct ContextTaskRequest {
/// The Context task that keeps the blockchain context and handles requests.
pub(crate) struct ContextTask<D: Database> {
/// A token used to invalidate previous contexts when a new
/// block is added to the chain.
current_validity_token: ValidityToken,
pub(crate) context_swap: Arc<ArcSwap<BlockchainContext>>,
/// The difficulty cache.
difficulty_cache: difficulty::DifficultyCache,
@ -128,10 +129,22 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
rx_vms::RandomXVmCache::init_from_chain_height(chain_height, &current_hf, db).await
});
let difficulty_cache = difficulty_cache_handle.await.unwrap()?;
let weight_cache = weight_cache_handle.await.unwrap()?;
let blockchain_context = blockchain_context(
current_hf,
top_block_hash,
chain_height,
already_generated_coins,
&weight_cache,
&difficulty_cache,
);
let context_svc = Self {
current_validity_token: ValidityToken::new(),
difficulty_cache: difficulty_cache_handle.await.unwrap()?,
weight_cache: weight_cache_handle.await.unwrap()?,
context_swap: Arc::new(ArcSwap::new(Arc::new(blockchain_context))),
difficulty_cache,
weight_cache,
rx_vm_cache: rx_seed_handle.await.unwrap()?,
hardfork_state,
alt_chain_cache_map: AltChainMap::new(),
@ -144,42 +157,23 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
Ok(context_svc)
}
fn update_blockchain_context(&self) {
self.context_swap.store(Arc::new(blockchain_context(
self.hardfork_state.current_hardfork,
self.top_block_hash,
self.chain_height,
self.already_generated_coins,
&self.weight_cache,
&self.difficulty_cache,
)));
}
/// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`].
pub(crate) async fn handle_req(
&mut self,
req: BlockChainContextRequest,
) -> Result<BlockChainContextResponse, tower::BoxError> {
Ok(match req {
BlockChainContextRequest::Context => {
tracing::debug!("Getting blockchain context");
let current_hf = self.hardfork_state.current_hardfork();
BlockChainContextResponse::Context(BlockChainContext {
validity_token: self.current_validity_token.clone(),
raw: RawBlockChainContext {
context_to_verify_block: ContextToVerifyBlock {
median_weight_for_block_reward: self
.weight_cache
.median_for_block_reward(current_hf),
effective_median_weight: self
.weight_cache
.effective_median_block_weight(current_hf),
top_hash: self.top_block_hash,
median_block_timestamp: self
.difficulty_cache
.median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
chain_height: self.chain_height,
current_hf,
next_difficulty: self.difficulty_cache.next_difficulty(current_hf),
already_generated_coins: self.already_generated_coins,
},
cumulative_difficulty: self.difficulty_cache.cumulative_difficulty(),
median_long_term_weight: self.weight_cache.median_long_term_weight(),
top_block_timestamp: self.difficulty_cache.top_block_timestamp(),
},
})
}
BlockChainContextRequest::CurrentRxVms => {
BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await)
}
@ -202,9 +196,6 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
"Updating blockchain cache with new block, height: {}",
new.height
);
// Cancel the validity token and replace it with a new one.
std::mem::replace(&mut self.current_validity_token, ValidityToken::new())
.set_data_invalid();
self.difficulty_cache.new_block(
new.height,
@ -225,6 +216,8 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
.already_generated_coins
.saturating_add(new.generated_coins);
self.update_blockchain_context();
BlockChainContextResponse::Ok
}
BlockChainContextRequest::PopBlocks { numb_blocks } => {
@ -272,8 +265,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
self.already_generated_coins = already_generated_coins;
self.top_block_hash = top_block_hash;
std::mem::replace(&mut self.current_validity_token, ValidityToken::new())
.set_data_invalid();
self.update_blockchain_context();
BlockChainContextResponse::Ok
}
@ -342,3 +334,29 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
tracing::info!("Shutting down blockchain context task.");
}
}
fn blockchain_context(
hf: HardFork,
top_hash: [u8; 32],
chain_height: usize,
already_generated_coins: u64,
weight_cache: &weight::BlockWeightsCache,
difficulty_cache: &difficulty::DifficultyCache,
) -> BlockchainContext {
BlockchainContext {
context_to_verify_block: ContextToVerifyBlock {
median_weight_for_block_reward: weight_cache.median_for_block_reward(hf),
effective_median_weight: weight_cache.effective_median_block_weight(hf),
top_hash,
median_block_timestamp: difficulty_cache
.median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
chain_height,
current_hf: hf,
next_difficulty: difficulty_cache.next_difficulty(hf),
already_generated_coins,
},
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
median_long_term_weight: weight_cache.median_long_term_weight(),
top_block_timestamp: difficulty_cache.top_block_timestamp(),
}
}

View file

@ -1,33 +0,0 @@
//! Tokens
//!
//! This module contains tokens which keep track of the validity of certain data.
//! Currently, there is 1 token:
//! - [`ValidityToken`]
//!
use tokio_util::sync::CancellationToken;
/// A token representing if a piece of data is valid.
#[derive(Debug, Clone, Default)]
pub struct ValidityToken {
token: CancellationToken,
}
impl ValidityToken {
/// Creates a new [`ValidityToken`]
pub fn new() -> Self {
Self {
token: CancellationToken::new(),
}
}
/// Returns `true` if the data is still valid.
pub fn is_data_valid(&self) -> bool {
!self.token.is_cancelled()
}
/// Sets the data to invalid.
pub fn set_data_invalid(self) {
self.token.cancel();
}
}

View file

@ -10,10 +10,10 @@ use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use tower::{Service, ServiceExt};
use tower::Service;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus_context::{BlockChainContextRequest, BlockChainContextResponse};
use cuprate_consensus_context::BlockchainContextService;
use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError};
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation};
@ -110,37 +110,18 @@ impl From<tower::BoxError> for FastSyncError {
}
}
pub struct FastSyncService<C> {
context_svc: C,
pub struct FastSyncService {
context_svc: BlockchainContextService,
}
impl<C> FastSyncService<C>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
{
impl FastSyncService {
#[expect(dead_code)]
pub(crate) const fn new(context_svc: C) -> Self {
pub(crate) const fn new(context_svc: BlockchainContextService) -> Self {
Self { context_svc }
}
}
impl<C> Service<FastSyncRequest> for FastSyncService<C>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
{
impl Service<FastSyncRequest> for FastSyncService {
type Response = FastSyncResponse;
type Error = FastSyncError;
type Future =
@ -210,31 +191,13 @@ fn validate_hashes(
})
}
async fn validate_block<C>(
mut context_svc: C,
async fn validate_block(
mut context_svc: BlockchainContextService,
block: Block,
mut txs: HashMap<[u8; 32], Transaction>,
token: ValidBlockId,
) -> Result<FastSyncResponse, FastSyncError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
let block_chain_ctx = checked_context.unchecked_blockchain_context().clone();
) -> Result<FastSyncResponse, FastSyncError> {
let context = context_svc.blockchain_context();
let block_hash = block.hash();
if block_hash != token.0 {
@ -246,7 +209,7 @@ where
let Some(Input::Gen(height)) = block.miner_transaction.prefix().inputs.first() else {
return Err(FastSyncError::MinerTx(MinerTxError::InputNotOfTypeGen));
};
if *height != block_chain_ctx.chain_height {
if *height != context.chain_height {
return Err(FastSyncError::BlockHeightMismatch);
}
@ -288,9 +251,8 @@ where
height: *height,
generated_coins,
weight,
long_term_weight: block_chain_ctx.next_block_long_term_weight(weight),
cumulative_difficulty: block_chain_ctx.cumulative_difficulty
+ block_chain_ctx.next_difficulty,
long_term_weight: context.next_block_long_term_weight(weight),
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
block,
}))
}

View file

@ -15,7 +15,7 @@ use monero_serai::{
use tower::{Service, ServiceExt};
use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{
@ -242,9 +242,9 @@ pub enum VerifyBlockResponse {
}
/// The block verifier service.
pub struct BlockVerifierService<C, TxV, D> {
pub struct BlockVerifierService<TxV, D> {
/// The context service.
context_svc: C,
context_svc: BlockchainContextService,
/// The tx verifier service.
tx_verifier_svc: TxV,
/// The database.
@ -252,12 +252,8 @@ pub struct BlockVerifierService<C, TxV, D> {
_database: D,
}
impl<C, TxV, D> BlockVerifierService<C, TxV, D>
impl<TxV, D> BlockVerifierService<TxV, D>
where
C: Service<BlockChainContextRequest, Response = BlockChainContextResponse>
+ Clone
+ Send
+ 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
@ -266,7 +262,11 @@ where
D::Future: Send + 'static,
{
/// Creates a new block verifier.
pub(crate) const fn new(context_svc: C, tx_verifier_svc: TxV, database: D) -> Self {
pub(crate) const fn new(
context_svc: BlockchainContextService,
tx_verifier_svc: TxV,
database: D,
) -> Self {
Self {
context_svc,
tx_verifier_svc,
@ -275,17 +275,8 @@ where
}
}
impl<C, TxV, D> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, D>
impl<TxV, D> Service<VerifyBlockRequest> for BlockVerifierService<TxV, D>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
@ -320,8 +311,7 @@ where
batch_prepare_main_chain_block(blocks, context_svc).await
}
VerifyBlockRequest::MainChainPrepped { block, txs } => {
verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None)
.await
verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc).await
}
VerifyBlockRequest::AltChain {
block,
@ -334,32 +324,17 @@ where
}
/// Verifies a prepared block.
async fn verify_main_chain_block<C, TxV>(
async fn verify_main_chain_block<TxV>(
block: Block,
txs: HashMap<[u8; 32], TransactionVerificationData>,
mut context_svc: C,
mut context_svc: BlockchainContextService,
tx_verifier_svc: TxV,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
{
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
let context = context_svc.blockchain_context().clone();
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);
tracing::debug!(
@ -404,49 +379,19 @@ where
.map(Arc::new)
.collect();
verify_prepped_main_chain_block(
prepped_block,
ordered_txs,
context_svc,
tx_verifier_svc,
Some(context),
)
.await
verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, tx_verifier_svc).await
}
async fn verify_prepped_main_chain_block<C, TxV>(
async fn verify_prepped_main_chain_block<TxV>(
prepped_block: PreparedBlock,
txs: Vec<Arc<TransactionVerificationData>>,
context_svc: C,
mut context_svc: BlockchainContextService,
tx_verifier_svc: TxV,
cached_context: Option<RawBlockChainContext>,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
{
let context = if let Some(context) = cached_context {
context
} else {
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {context:?}");
context
};
let context = context_svc.blockchain_context();
tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash));

View file

@ -1,11 +1,7 @@
use std::{collections::HashMap, sync::Arc};
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus_context::rx_vms::RandomXVm;
use cuprate_consensus_context::BlockchainContextService;
use cuprate_consensus_rules::{
blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError},
hard_forks::HardForkError,
@ -13,6 +9,10 @@ use cuprate_consensus_rules::{
ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use crate::{
block::{free::pull_ordered_transactions, PreparedBlock, PreparedBlockExPow},
@ -23,19 +23,10 @@ use crate::{
/// Batch prepares a list of blocks for verification.
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
pub(crate) async fn batch_prepare_main_chain_block<C>(
pub(crate) async fn batch_prepare_main_chain_block(
blocks: Vec<(Block, Vec<Transaction>)>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
mut context_svc: BlockchainContextService,
) -> Result<VerifyBlockResponse, ExtendedConsensusError> {
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
@ -89,16 +80,6 @@ where
timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version));
}
// Get the current blockchain context.
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
// Calculate the expected difficulties for each block in the batch.
let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
.ready()
@ -111,7 +92,8 @@ where
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
// Get the current blockchain context.
let context = context_svc.blockchain_context();
// Make sure the blocks follow the main chain.

View file

@ -2,7 +2,7 @@
//!
//! This crate contains 3 [`tower::Service`]s that implement Monero's consensus rules:
//!
//! - [`BlockChainContextService`] Which handles keeping the current state of the blockchain.
//! - [`BlockchainContextService`] Which handles keeping the current state of the blockchain.
//! - [`BlockVerifierService`] Which handles block verification.
//! - [`TxVerifierService`] Which handles transaction verification.
//!
@ -30,8 +30,8 @@ pub mod transactions;
pub use block::{BlockVerifierService, VerifyBlockRequest, VerifyBlockResponse};
pub use cuprate_consensus_context::{
initialize_blockchain_context, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, BlockChainContextService, ContextConfig,
initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse,
BlockchainContext, BlockchainContextService, ContextConfig,
};
pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse};
@ -64,25 +64,16 @@ pub enum ExtendedConsensusError {
}
/// Initialize the 2 verifier [`tower::Service`]s (block and transaction).
pub fn initialize_verifier<D, Ctx>(
pub fn initialize_verifier<D>(
database: D,
ctx_svc: Ctx,
ctx_svc: BlockchainContextService,
) -> (
BlockVerifierService<Ctx, TxVerifierService<D>, D>,
BlockVerifierService<TxVerifierService<D>, D>,
TxVerifierService<D>,
)
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Ctx::Future: Send + 'static,
{
let tx_svc = TxVerifierService::new(database.clone());
let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);