remove validity token + cache response

This commit is contained in:
Boog900 2024-10-28 01:27:59 +00:00
parent 63216aecae
commit 1f792573bd
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
3 changed files with 90 additions and 124 deletions

View file

@ -8,6 +8,10 @@
// FIXME: should we pull in a dependency just to link docs? // FIXME: should we pull in a dependency just to link docs?
use monero_serai as _; use monero_serai as _;
use futures::{channel::oneshot, FutureExt};
use std::future::ready;
use std::sync::RwLock;
use std::task::ready;
use std::{ use std::{
cmp::min, cmp::min,
collections::HashMap, collections::HashMap,
@ -16,8 +20,6 @@ use std::{
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::{channel::oneshot, FutureExt};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::sync::PollSender; use tokio_util::sync::PollSender;
use tower::Service; use tower::Service;
@ -33,7 +35,6 @@ pub mod weight;
mod alt_chains; mod alt_chains;
mod task; mod task;
mod tokens;
use cuprate_types::{Chain, ChainInfo, FeeEstimate, HardForkInfo}; use cuprate_types::{Chain, ChainInfo, FeeEstimate, HardForkInfo};
use difficulty::DifficultyCache; use difficulty::DifficultyCache;
@ -43,7 +44,6 @@ use weight::BlockWeightsCache;
pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache}; pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache};
pub use difficulty::DifficultyCacheConfig; pub use difficulty::DifficultyCacheConfig;
pub use hardforks::HardForkConfig; pub use hardforks::HardForkConfig;
pub use tokens::*;
pub use weight::BlockWeightsCacheConfig; pub use weight::BlockWeightsCacheConfig;
pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60; pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
@ -100,7 +100,7 @@ where
D: Database + Clone + Send + Sync + 'static, D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static, D::Future: Send + 'static,
{ {
let context_task = task::ContextTask::init_context(cfg, database).await?; let (context_task, cached_context) = task::ContextTask::init_context(cfg, database).await?;
// TODO: make buffer size configurable. // TODO: make buffer size configurable.
let (tx, rx) = mpsc::channel(15); let (tx, rx) = mpsc::channel(15);
@ -109,13 +109,13 @@ where
Ok(BlockChainContextService { Ok(BlockChainContextService {
channel: PollSender::new(tx), channel: PollSender::new(tx),
cached_context,
}) })
} }
/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep /// [`BlockChainContext`]
/// around. You should keep around [`BlockChainContext`] instead.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RawBlockChainContext { pub struct BlockChainContext {
/// The current cumulative difficulty. /// The current cumulative difficulty.
pub cumulative_difficulty: u128, pub cumulative_difficulty: u128,
/// Context to verify a block, as needed by [`cuprate-consensus-rules`] /// Context to verify a block, as needed by [`cuprate-consensus-rules`]
@ -126,14 +126,14 @@ pub struct RawBlockChainContext {
top_block_timestamp: Option<u64>, top_block_timestamp: Option<u64>,
} }
impl std::ops::Deref for RawBlockChainContext { impl std::ops::Deref for BlockChainContext {
type Target = ContextToVerifyBlock; type Target = ContextToVerifyBlock;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
&self.context_to_verify_block &self.context_to_verify_block
} }
} }
impl RawBlockChainContext { impl BlockChainContext {
/// Returns the timestamp the should be used when checking locked outputs. /// Returns the timestamp the should be used when checking locked outputs.
/// ///
/// ref: <https://cuprate.github.io/monero-book/consensus_rules/transactions/unlock_time.html#getting-the-current-time> /// ref: <https://cuprate.github.io/monero-book/consensus_rules/transactions/unlock_time.html#getting-the-current-time>
@ -166,40 +166,6 @@ impl RawBlockChainContext {
} }
} }
/// Blockchain context which keeps a token of validity so users will know when the data is no longer valid.
#[derive(Debug, Clone)]
pub struct BlockChainContext {
/// A token representing this data's validity.
validity_token: ValidityToken,
/// The actual block chain context.
raw: RawBlockChainContext,
}
#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("data is no longer valid")]
pub struct DataNoLongerValid;
impl BlockChainContext {
/// Checks if the data is still valid.
pub fn is_still_valid(&self) -> bool {
self.validity_token.is_data_valid()
}
/// Checks if the data is valid returning an Err if not and a reference to the blockchain context if
/// it is.
pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> {
if !self.is_still_valid() {
return Err(DataNoLongerValid);
}
Ok(&self.raw)
}
/// Returns the blockchain context without checking the validity token.
pub const fn unchecked_blockchain_context(&self) -> &RawBlockChainContext {
&self.raw
}
}
/// Data needed from a new block to add it to the context cache. /// Data needed from a new block to add it to the context cache.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NewBlockData { pub struct NewBlockData {
@ -386,6 +352,8 @@ pub enum BlockChainContextResponse {
#[derive(Clone)] #[derive(Clone)]
pub struct BlockChainContextService { pub struct BlockChainContextService {
channel: PollSender<task::ContextTaskRequest>, channel: PollSender<task::ContextTaskRequest>,
/// The cached context, in the context cache, so we don't have to calculate it each time.
cached_context: Arc<RwLock<BlockChainContext>>,
} }
impl Service<BlockChainContextRequest> for BlockChainContextService { impl Service<BlockChainContextRequest> for BlockChainContextService {
@ -401,6 +369,13 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
} }
fn call(&mut self, req: BlockChainContextRequest) -> Self::Future { fn call(&mut self, req: BlockChainContextRequest) -> Self::Future {
if matches!(req, BlockChainContextRequest::Context) {
self.channel.abort_send();
let context = self.cached_context.read().unwrap().clone();
return ready(Ok(BlockChainContextResponse::Context(context))).boxed();
}
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let req = task::ContextTaskRequest { let req = task::ContextTaskRequest {

View file

@ -3,7 +3,9 @@
//! This module contains the async task that handles keeping track of blockchain context. //! This module contains the async task that handles keeping track of blockchain context.
//! It holds all the context caches and handles [`tower::Service`] requests. //! It holds all the context caches and handles [`tower::Service`] requests.
//! //!
use futures::channel::oneshot; use futures::channel::oneshot;
use std::sync::{Arc, RwLock};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tower::ServiceExt; use tower::ServiceExt;
use tracing::Instrument; use tracing::Instrument;
@ -12,14 +14,14 @@ use cuprate_consensus_rules::blocks::ContextToVerifyBlock;
use cuprate_helper::cast::u64_to_usize; use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{ use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse}, blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, Chain, HardFork,
}; };
use crate::{ use crate::{
alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap}, alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap},
difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest, difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, ContextCacheError, ContextConfig, Database, RawBlockChainContext, BlockChainContextResponse, ContextCacheError, ContextConfig, Database,
ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
}; };
/// A request from the context service to the context task. /// A request from the context service to the context task.
@ -34,9 +36,7 @@ pub(super) struct ContextTaskRequest {
/// The Context task that keeps the blockchain context and handles requests. /// The Context task that keeps the blockchain context and handles requests.
pub(crate) struct ContextTask<D: Database> { pub(crate) struct ContextTask<D: Database> {
/// A token used to invalidate previous contexts when a new cached_context: Arc<RwLock<BlockChainContext>>,
/// block is added to the chain.
current_validity_token: ValidityToken,
/// The difficulty cache. /// The difficulty cache.
difficulty_cache: difficulty::DifficultyCache, difficulty_cache: difficulty::DifficultyCache,
@ -65,7 +65,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
pub(crate) async fn init_context( pub(crate) async fn init_context(
cfg: ContextConfig, cfg: ContextConfig,
mut database: D, mut database: D,
) -> Result<Self, ContextCacheError> { ) -> Result<(Self, Arc<RwLock<BlockChainContext>>), ContextCacheError> {
let ContextConfig { let ContextConfig {
difficulty_cfg, difficulty_cfg,
weights_config, weights_config,
@ -128,11 +128,26 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
rx_vms::RandomXVmCache::init_from_chain_height(chain_height, &current_hf, db).await rx_vms::RandomXVmCache::init_from_chain_height(chain_height, &current_hf, db).await
}); });
let difficulty_cache = difficulty_cache_handle.await.unwrap()?;
let weight_cache = weight_cache_handle.await.unwrap()?;
let rx_vm_cache = rx_seed_handle.await.unwrap()?;
let context = blockchain_context(
&difficulty_cache,
&weight_cache,
top_block_hash,
chain_height,
current_hf,
already_generated_coins,
);
let cached_context = Arc::new(RwLock::new(context));
let context_svc = Self { let context_svc = Self {
current_validity_token: ValidityToken::new(), cached_context: Arc::clone(&cached_context),
difficulty_cache: difficulty_cache_handle.await.unwrap()?, difficulty_cache,
weight_cache: weight_cache_handle.await.unwrap()?, weight_cache,
rx_vm_cache: rx_seed_handle.await.unwrap()?, rx_vm_cache,
hardfork_state, hardfork_state,
alt_chain_cache_map: AltChainMap::new(), alt_chain_cache_map: AltChainMap::new(),
chain_height, chain_height,
@ -141,44 +156,30 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
database, database,
}; };
Ok(context_svc) Ok((context_svc, cached_context))
}
fn update_blockchain_context(&self) {
let blockchain_context = blockchain_context(
&self.difficulty_cache,
&self.weight_cache,
self.top_block_hash,
self.chain_height,
self.hardfork_state.current_hardfork(),
self.already_generated_coins,
);
*self.cached_context.write().unwrap() = blockchain_context;
} }
/// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`]. /// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`].
pub(crate) async fn handle_req( async fn handle_req(
&mut self, &mut self,
req: BlockChainContextRequest, req: BlockChainContextRequest,
) -> Result<BlockChainContextResponse, tower::BoxError> { ) -> Result<BlockChainContextResponse, tower::BoxError> {
Ok(match req { Ok(match req {
BlockChainContextRequest::Context => { BlockChainContextRequest::Context => {
tracing::debug!("Getting blockchain context"); unreachable!("This is handled directly in the service.")
let current_hf = self.hardfork_state.current_hardfork();
BlockChainContextResponse::Context(BlockChainContext {
validity_token: self.current_validity_token.clone(),
raw: RawBlockChainContext {
context_to_verify_block: ContextToVerifyBlock {
median_weight_for_block_reward: self
.weight_cache
.median_for_block_reward(current_hf),
effective_median_weight: self
.weight_cache
.effective_median_block_weight(current_hf),
top_hash: self.top_block_hash,
median_block_timestamp: self
.difficulty_cache
.median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
chain_height: self.chain_height,
current_hf,
next_difficulty: self.difficulty_cache.next_difficulty(current_hf),
already_generated_coins: self.already_generated_coins,
},
cumulative_difficulty: self.difficulty_cache.cumulative_difficulty(),
median_long_term_weight: self.weight_cache.median_long_term_weight(),
top_block_timestamp: self.difficulty_cache.top_block_timestamp(),
},
})
} }
BlockChainContextRequest::CurrentRxVms => { BlockChainContextRequest::CurrentRxVms => {
BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await) BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await)
@ -202,10 +203,6 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
"Updating blockchain cache with new block, height: {}", "Updating blockchain cache with new block, height: {}",
new.height new.height
); );
// Cancel the validity token and replace it with a new one.
std::mem::replace(&mut self.current_validity_token, ValidityToken::new())
.set_data_invalid();
self.difficulty_cache.new_block( self.difficulty_cache.new_block(
new.height, new.height,
new.timestamp, new.timestamp,
@ -225,6 +222,8 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
.already_generated_coins .already_generated_coins
.saturating_add(new.generated_coins); .saturating_add(new.generated_coins);
self.update_blockchain_context();
BlockChainContextResponse::Ok BlockChainContextResponse::Ok
} }
BlockChainContextRequest::PopBlocks { numb_blocks } => { BlockChainContextRequest::PopBlocks { numb_blocks } => {
@ -272,8 +271,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
self.already_generated_coins = already_generated_coins; self.already_generated_coins = already_generated_coins;
self.top_block_hash = top_block_hash; self.top_block_hash = top_block_hash;
std::mem::replace(&mut self.current_validity_token, ValidityToken::new()) self.update_blockchain_context();
.set_data_invalid();
BlockChainContextResponse::Ok BlockChainContextResponse::Ok
} }
@ -341,3 +339,29 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
tracing::info!("Shutting down blockchain context task."); tracing::info!("Shutting down blockchain context task.");
} }
} }
fn blockchain_context(
difficulty_cache: &difficulty::DifficultyCache,
weight_cache: &weight::BlockWeightsCache,
top_hash: [u8; 32],
chain_height: usize,
current_hf: HardFork,
already_generated_coins: u64,
) -> BlockChainContext {
BlockChainContext {
context_to_verify_block: ContextToVerifyBlock {
median_weight_for_block_reward: weight_cache.median_for_block_reward(current_hf),
effective_median_weight: weight_cache.effective_median_block_weight(current_hf),
top_hash,
median_block_timestamp: difficulty_cache
.median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
chain_height,
current_hf,
next_difficulty: difficulty_cache.next_difficulty(current_hf),
already_generated_coins,
},
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
median_long_term_weight: weight_cache.median_long_term_weight(),
top_block_timestamp: difficulty_cache.top_block_timestamp(),
}
}

View file

@ -1,33 +0,0 @@
//! Tokens
//!
//! This module contains tokens which keep track of the validity of certain data.
//! Currently, there is 1 token:
//! - [`ValidityToken`]
//!
use tokio_util::sync::CancellationToken;
/// A token representing if a piece of data is valid.
#[derive(Debug, Clone, Default)]
pub struct ValidityToken {
token: CancellationToken,
}
impl ValidityToken {
/// Creates a new [`ValidityToken`]
pub fn new() -> Self {
Self {
token: CancellationToken::new(),
}
}
/// Returns `true` if the data is still valid.
pub fn is_data_valid(&self) -> bool {
!self.token.is_cancelled()
}
/// Sets the data to invalid.
pub fn set_data_invalid(self) {
self.token.cancel();
}
}