diff --git a/consensus/src/context.rs b/consensus/src/context.rs new file mode 100644 index 0000000..2ec01b6 --- /dev/null +++ b/consensus/src/context.rs @@ -0,0 +1,248 @@ +//! # Blockchain Context +//! +//! This module contains a service to get cached context from the blockchain: [`BlockChainContext`]. +//! This is used during contextual validation, this does not have all the data for contextual validation +//! (outputs) for that you will need a [`Database`]. +//! + +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures::FutureExt; +use tokio::sync::RwLock; +use tower::buffer::future::ResponseFuture; +use tower::{buffer::Buffer, Service, ServiceExt}; + +use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse}; + +pub mod difficulty; +mod hardforks; +mod weight; + +pub use difficulty::DifficultyCacheConfig; +pub use hardforks::{HardFork, HardForkConfig}; +pub use weight::BlockWeightsCacheConfig; + +const BUFFER_CONTEXT_CHANNEL_SIZE: usize = 5; + +pub struct ContextConfig { + hard_fork_cfg: HardForkConfig, + difficulty_cfg: DifficultyCacheConfig, + weights_config: BlockWeightsCacheConfig, +} + +impl ContextConfig { + pub fn main_net() -> ContextConfig { + ContextConfig { + hard_fork_cfg: HardForkConfig::main_net(), + difficulty_cfg: DifficultyCacheConfig::main_net(), + weights_config: BlockWeightsCacheConfig::main_net(), + } + } +} + +pub async fn initialize_blockchain_context( + cfg: ContextConfig, + mut database: D, +) -> Result< + impl Service< + BlockChainContextRequest, + Response = BlockChainContext, + Error = tower::BoxError, + Future = impl Future> + Send + 'static, + > + Clone + + Send + + Sync + + 'static, + ConsensusError, +> +where + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, +{ + let ContextConfig { + difficulty_cfg, + weights_config, + hard_fork_cfg, + } = cfg; + + tracing::debug!("Initialising blockchain context"); + + let DatabaseResponse::ChainHeight(chain_height, top_block_hash) = database + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? + else { + panic!("Database sent incorrect response!"); + }; + + let db = database.clone(); + let difficulty_cache_handle = tokio::spawn(async move { + difficulty::DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db).await + }); + + let db = database.clone(); + let weight_cache_handle = tokio::spawn(async move { + weight::BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db).await + }); + + let db = database.clone(); + let hardfork_state_handle = tokio::spawn(async move { + hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await + }); + + let context_svc = BlockChainContextService { + difficulty_cache: Arc::new(difficulty_cache_handle.await.unwrap()?.into()), + weight_cache: Arc::new(weight_cache_handle.await.unwrap()?.into()), + hardfork_state: Arc::new(hardfork_state_handle.await.unwrap()?.into()), + chain_height, + top_block_hash, + database, + }; + + let buffered_svc = Buffer::new(context_svc.boxed(), BUFFER_CONTEXT_CHANNEL_SIZE); + + Ok(buffered_svc) +} + +#[derive(Debug, Clone, Copy)] +pub struct BlockChainContext { + /// The next blocks difficulty. + next_difficulty: u128, + /// The current cumulative difficulty. + cumulative_difficulty: u128, + /// The current effective median block weight. + effective_median_weight: usize, + /// The median long term block weight. + median_long_term_weight: usize, + /// Median weight to use for block reward calculations. + median_weight_for_block_reward: usize, + /// Timestamp to use to check time locked outputs. + time_lock_timestamp: u64, + /// The height of the chain. + chain_height: u64, + /// The top blocks hash + top_hash: [u8; 32], + /// The current hard fork. + pub current_hard_fork: HardFork, +} + +#[derive(Debug, Clone)] +pub struct BlockChainContextRequest; + +pub struct BlockChainContextService { + difficulty_cache: Arc>, + weight_cache: Arc>, + hardfork_state: Arc>, + + chain_height: u64, + top_block_hash: [u8; 32], + + database: D, +} + +impl Service for BlockChainContextService { + type Response = BlockChainContext; + type Error = ConsensusError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: BlockChainContextRequest) -> Self::Future { + let hardfork_state = self.hardfork_state.clone(); + let difficulty_cache = self.difficulty_cache.clone(); + let weight_cache = self.weight_cache.clone(); + + let chain_height = self.chain_height; + let top_hash = self.top_block_hash; + + async move { + let hardfork_state = hardfork_state.read().await; + let difficulty_cache = difficulty_cache.read().await; + let weight_cache = weight_cache.read().await; + + let current_hf = hardfork_state.current_hardfork(); + + Ok(BlockChainContext { + next_difficulty: difficulty_cache.next_difficulty(¤t_hf), + cumulative_difficulty: difficulty_cache.cumulative_difficulty(), + effective_median_weight: weight_cache.effective_median_block_weight(¤t_hf), + median_long_term_weight: weight_cache.median_long_term_weight(), + median_weight_for_block_reward: weight_cache.median_for_block_reward(¤t_hf), + time_lock_timestamp: 0, //TODO: + chain_height, + top_hash, + current_hard_fork: current_hf, + }) + } + .boxed() + } +} + +pub struct UpdateBlockchainCacheRequest { + new_top_hash: [u8; 32], + height: u64, + timestamp: u64, + weight: usize, + long_term_weight: usize, + vote: HardFork, +} + +impl tower::Service for BlockChainContextService +where + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, +{ + type Response = (); + type Error = ConsensusError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.database.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future { + let hardfork_state = self.hardfork_state.clone(); + let difficulty_cache = self.difficulty_cache.clone(); + let weight_cache = self.weight_cache.clone(); + + let database = self.database.clone(); + + async move { + difficulty_cache + .write() + .await + .new_block(new.height, new.timestamp, database.clone()) + .await?; + + weight_cache + .write() + .await + .new_block( + new.height, + new.weight, + new.long_term_weight, + database.clone(), + ) + .await?; + + hardfork_state + .write() + .await + .new_block(new.vote, new.height, database) + .await?; + + Ok(()) + } + .boxed() + } +} diff --git a/consensus/src/block/difficulty.rs b/consensus/src/context/difficulty.rs similarity index 92% rename from consensus/src/block/difficulty.rs rename to consensus/src/context/difficulty.rs index dd5630e..7c52fe3 100644 --- a/consensus/src/block/difficulty.rs +++ b/consensus/src/context/difficulty.rs @@ -1,12 +1,10 @@ -use std::collections::VecDeque; -use std::ops::Range; +use std::{collections::VecDeque, ops::Range}; use tower::ServiceExt; use tracing::instrument; use crate::{ - hardforks::HardFork, helper::median, ConsensusError, Database, DatabaseRequest, - DatabaseResponse, + helper::median, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, }; /// The amount of blocks we account for to calculate difficulty @@ -53,6 +51,7 @@ impl DifficultyCacheConfig { } /// This struct is able to calculate difficulties from blockchain information. +/// #[derive(Debug, Clone)] pub struct DifficultyCache { /// The list of timestamps in the window. @@ -74,7 +73,7 @@ impl DifficultyCache { config: DifficultyCacheConfig, mut database: D, ) -> Result { - let DatabaseResponse::ChainHeight(chain_height) = database + let DatabaseResponse::ChainHeight(chain_height, _) = database .ready() .await? .call(DatabaseRequest::ChainHeight) @@ -90,7 +89,7 @@ impl DifficultyCache { pub async fn init_from_chain_height( chain_height: u64, config: DifficultyCacheConfig, - mut database: D, + database: D, ) -> Result { tracing::info!("Initializing difficulty cache this may take a while."); @@ -112,7 +111,7 @@ impl DifficultyCache { config, }; - diff.update_windowed_work(&mut database).await?; + diff.update_windowed_work(database).await?; tracing::info!( "Current chain height: {}, accounting for {} blocks timestamps", @@ -251,22 +250,22 @@ async fn get_blocks_in_range_timestamps( ) -> Result, ConsensusError> { tracing::info!("Getting blocks timestamps"); - let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database - .oneshot(DatabaseRequest::BlockPOWInfoInRange(block_heights)) + let DatabaseResponse::BlockExtendedHeaderInRange(ext_header) = database + .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights)) .await? else { panic!("Database sent incorrect response"); }; - Ok(pow_infos.into_iter().map(|info| info.timestamp).collect()) + Ok(ext_header.into_iter().map(|info| info.timestamp).collect()) } async fn get_block_cum_diff(database: D, height: u64) -> Result { - let DatabaseResponse::BlockPOWInfo(pow) = database - .oneshot(DatabaseRequest::BlockPOWInfo(height.into())) + let DatabaseResponse::BlockExtendedHeader(ext_header) = database + .oneshot(DatabaseRequest::BlockExtendedHeader(height.into())) .await? else { panic!("Database service sent incorrect response!"); }; - Ok(pow.cumulative_difficulty) + Ok(ext_header.cumulative_difficulty) } diff --git a/consensus/src/hardforks.rs b/consensus/src/context/hardforks.rs similarity index 90% rename from consensus/src/hardforks.rs rename to consensus/src/context/hardforks.rs index f6994c3..2b17459 100644 --- a/consensus/src/hardforks.rs +++ b/consensus/src/context/hardforks.rs @@ -1,13 +1,13 @@ -use std::fmt::{Display, Formatter}; -use std::ops::Range; -use std::time::Duration; +use std::{ + fmt::{Display, Formatter}, + ops::Range, + time::Duration, +}; use monero_serai::block::BlockHeader; use tower::ServiceExt; use tracing::instrument; -use cuprate_common::Network; - use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse}; // https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork @@ -182,6 +182,13 @@ impl HardFork { _ => BLOCK_TIME_V2, } } + + /// Checks a blocks version and vote, assuming that `self` is the current hard-fork. + /// + /// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#version-and-vote + pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool { + self == &block_hf_info.version && &block_hf_info.vote >= self + } } /// A struct holding the current voting state of the blockchain. @@ -260,7 +267,7 @@ impl HardForkState { config: HardForkConfig, mut database: D, ) -> Result { - let DatabaseResponse::ChainHeight(chain_height) = database + let DatabaseResponse::ChainHeight(chain_height, _) = database .ready() .await? .call(DatabaseRequest::ChainHeight) @@ -269,15 +276,15 @@ impl HardForkState { panic!("Database sent incorrect response") }; - let hfs = HardForkState::init_from_chain_height(config, chain_height, database).await?; + let hfs = HardForkState::init_from_chain_height(chain_height, config, database).await?; Ok(hfs) } #[instrument(name = "init_hardfork_state", skip(config, database), level = "info")] pub async fn init_from_chain_height( - config: HardForkConfig, chain_height: u64, + config: HardForkConfig, mut database: D, ) -> Result { tracing::info!("Initializing hard-fork state this may take a while."); @@ -290,16 +297,18 @@ impl HardForkState { debug_assert_eq!(votes.total_votes(), config.window) } - let DatabaseResponse::BlockHFInfo(hf_info) = database + let DatabaseResponse::BlockExtendedHeader(ext_header) = database .ready() .await? - .call(DatabaseRequest::BlockHFInfo((chain_height - 1).into())) + .call(DatabaseRequest::BlockExtendedHeader( + (chain_height - 1).into(), + )) .await? else { panic!("Database sent incorrect response!"); }; - let current_hardfork = hf_info.version; + let current_hardfork = ext_header.version; let next_hardfork = current_hardfork.next_fork(); @@ -322,11 +331,6 @@ impl HardForkState { Ok(hfs) } - pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool { - self.current_hardfork == block_hf_info.version - && block_hf_info.vote >= self.current_hardfork - } - pub async fn new_block( &mut self, vote: HardFork, @@ -347,10 +351,12 @@ impl HardForkState { for height_to_remove in (self.config.window..self.votes.total_votes()).map(|offset| height - offset) { - let DatabaseResponse::BlockHFInfo(hf_info) = database + let DatabaseResponse::BlockExtendedHeader(ext_header) = database .ready() .await? - .call(DatabaseRequest::BlockHFInfo(height_to_remove.into())) + .call(DatabaseRequest::BlockExtendedHeader( + height_to_remove.into(), + )) .await? else { panic!("Database sent incorrect response!"); @@ -359,10 +365,10 @@ impl HardForkState { tracing::debug!( "Removing block {} vote ({:?}) as they have left the window", height_to_remove, - hf_info.vote + ext_header.vote ); - self.votes.remove_vote_for_hf(&hf_info.vote); + self.votes.remove_vote_for_hf(&ext_header.vote); } if height > self.config.window { @@ -395,6 +401,10 @@ impl HardForkState { self.next_hardfork = new_hf.next_fork(); self.current_hardfork = new_hf; } + + pub fn current_hardfork(&self) -> HardFork { + self.current_hardfork + } } /// Returns the votes needed for this fork. @@ -411,8 +421,8 @@ async fn get_votes_in_range( ) -> Result { let mut votes = HFVotes::default(); - let DatabaseResponse::BlockHfInfoInRange(vote_list) = database - .oneshot(DatabaseRequest::BlockHfInfoInRange(block_heights)) + let DatabaseResponse::BlockExtendedHeaderInRange(vote_list) = database + .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights)) .await? else { panic!("Database sent incorrect response!"); diff --git a/consensus/src/block/weight.rs b/consensus/src/context/weight.rs similarity index 89% rename from consensus/src/block/weight.rs rename to consensus/src/context/weight.rs index 486e476..4a70970 100644 --- a/consensus/src/block/weight.rs +++ b/consensus/src/context/weight.rs @@ -6,17 +6,18 @@ //! For more information please see the [block weights chapter](https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html) //! in the Monero Book. //! -use std::cmp::{max, min}; -use std::collections::VecDeque; -use std::ops::Range; +use std::{ + cmp::{max, min}, + collections::VecDeque, + ops::Range, +}; use monero_serai::{block::Block, transaction::Transaction}; use tower::ServiceExt; use tracing::instrument; use crate::{ - hardforks::HardFork, helper::median, ConsensusError, Database, DatabaseRequest, - DatabaseResponse, + helper::median, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, }; const PENALTY_FREE_ZONE_1: usize = 20000; @@ -102,7 +103,7 @@ impl BlockWeightsCache { config: BlockWeightsCacheConfig, mut database: D, ) -> Result { - let DatabaseResponse::ChainHeight(chain_height) = database + let DatabaseResponse::ChainHeight(chain_height, _) = database .ready() .await? .call(DatabaseRequest::ChainHeight) @@ -156,12 +157,12 @@ impl BlockWeightsCache { /// /// The block_height **MUST** be one more than the last height the cache has /// seen. - pub async fn new_block_added( + pub async fn new_block( &mut self, block_height: u64, block_weight: usize, long_term_weight: usize, - database: &mut D, + database: D, ) -> Result<(), ConsensusError> { tracing::debug!( "Adding new block's {} weights to block cache, weight: {}, long term weight: {}", @@ -181,15 +182,17 @@ impl BlockWeightsCache { "Block {} is out of the long term weight window, removing it", height_to_remove ); - let DatabaseResponse::BlockWeights(weights) = database - .oneshot(DatabaseRequest::BlockWeights(height_to_remove.into())) + let DatabaseResponse::BlockExtendedHeader(ext_header) = database + .oneshot(DatabaseRequest::BlockExtendedHeader( + height_to_remove.into(), + )) .await? else { panic!("Database sent incorrect response!"); }; let idx = self .long_term_weights - .binary_search(&weights.long_term_weight) + .binary_search(&ext_header.long_term_weight) .expect("Weight must be in list if in the window"); self.long_term_weights.remove(idx); } @@ -209,6 +212,11 @@ impl BlockWeightsCache { calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights) } + /// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or custom amount of blocks in the config. + pub fn median_long_term_weight(&self) -> usize { + median(&self.long_term_weights) + } + /// Returns the effective median weight, used for block reward calculations and to calculate /// the block weight limit. /// @@ -295,14 +303,17 @@ async fn get_blocks_weight_in_range( ) -> Result, ConsensusError> { tracing::info!("getting block weights."); - let DatabaseResponse::BlockWeightsInRange(weights) = database - .oneshot(DatabaseRequest::BlockWeightsInRange(range)) + let DatabaseResponse::BlockExtendedHeaderInRange(ext_headers) = database + .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(range)) .await? else { panic!("Database sent incorrect response!") }; - Ok(weights.into_iter().map(|info| info.block_weight).collect()) + Ok(ext_headers + .into_iter() + .map(|info| info.block_weight) + .collect()) } #[instrument(name = "get_long_term_weights", skip(database), level = "info")] @@ -312,14 +323,14 @@ async fn get_long_term_weight_in_range( ) -> Result, ConsensusError> { tracing::info!("getting block long term weights."); - let DatabaseResponse::BlockWeightsInRange(weights) = database - .oneshot(DatabaseRequest::BlockWeightsInRange(range)) + let DatabaseResponse::BlockExtendedHeaderInRange(ext_headers) = database + .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(range)) .await? else { panic!("Database sent incorrect response!") }; - Ok(weights + Ok(ext_headers .into_iter() .map(|info| info.long_term_weight) .collect()) diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 0924954..7a02ed2 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,14 +1,37 @@ use std::collections::{HashMap, HashSet}; pub mod block; +pub mod context; pub mod genesis; -pub mod hardforks; mod helper; pub mod miner_tx; #[cfg(feature = "binaries")] pub mod rpc; pub mod transactions; -pub mod verifier; + +pub use block::VerifyBlockRequest; +pub use context::{ContextConfig, HardFork}; +pub use transactions::VerifyTxRequest; + +pub async fn initialize_verifier( + database: D, + cfg: ContextConfig, +) -> Result< + ( + impl tower::Service, + impl tower::Service, + ), + ConsensusError, +> +where + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, +{ + let context_svc = context::initialize_blockchain_context(cfg, database.clone()).await?; + let tx_svc = transactions::TxVerifierService::new(database); + let block_svc = block::BlockVerifierService::new(context_svc, tx_svc.clone()); + Ok((block_svc, tx_svc)) +} #[derive(Debug, thiserror::Error)] pub enum ConsensusError { @@ -50,16 +73,32 @@ impl), - BlockWeightsInRange(std::ops::Range), - BlockPOWInfoInRange(std::ops::Range), + BlockExtendedHeaderInRange(std::ops::Range), ChainHeight, @@ -72,18 +111,14 @@ pub enum DatabaseRequest { #[derive(Debug)] pub enum DatabaseResponse { - BlockHFInfo(hardforks::BlockHFInfo), - BlockPOWInfo(block::BlockPOWInfo), - BlockWeights(block::weight::BlockWeightInfo), + BlockExtendedHeader(ExtendedBlockHeader), BlockHash([u8; 32]), - BlockHfInfoInRange(Vec), - BlockWeightsInRange(Vec), - BlockPOWInfoInRange(Vec), + BlockExtendedHeaderInRange(Vec), - ChainHeight(u64), + ChainHeight(u64, [u8; 32]), - Outputs(HashMap>), + Outputs(HashMap>), NumberOutputsWithAmount(usize), #[cfg(feature = "binaries")]