move hardforks, weights and PoW cache to a single context with a unified API

Boog900 2023-10-22 17:27:37 +01:00
parent 387278b821
commit 50f9458528
No known key found for this signature in database
GPG key ID: 5401367FB7302004
5 changed files with 371 additions and 68 deletions

consensus/src/context.rs (new file, 248 additions)

@@ -0,0 +1,248 @@
//! # Blockchain Context
//!
//! This module contains a service to get cached context from the blockchain: [`BlockChainContext`].
//! This is used during contextual validation; it does not hold all the data needed for contextual
//! validation (outputs), for that you will need a [`Database`].
//!
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::FutureExt;
use tokio::sync::RwLock;
use tower::buffer::future::ResponseFuture;
use tower::{buffer::Buffer, Service, ServiceExt};
use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse};
pub mod difficulty;
mod hardforks;
mod weight;
pub use difficulty::DifficultyCacheConfig;
pub use hardforks::{HardFork, HardForkConfig};
pub use weight::BlockWeightsCacheConfig;
const BUFFER_CONTEXT_CHANNEL_SIZE: usize = 5;
pub struct ContextConfig {
hard_fork_cfg: HardForkConfig,
difficulty_cfg: DifficultyCacheConfig,
weights_config: BlockWeightsCacheConfig,
}
impl ContextConfig {
pub fn main_net() -> ContextConfig {
ContextConfig {
hard_fork_cfg: HardForkConfig::main_net(),
difficulty_cfg: DifficultyCacheConfig::main_net(),
weights_config: BlockWeightsCacheConfig::main_net(),
}
}
}
pub async fn initialize_blockchain_context<D>(
cfg: ContextConfig,
mut database: D,
) -> Result<
impl Service<
BlockChainContextRequest,
Response = BlockChainContext,
Error = tower::BoxError,
Future = impl Future<Output = Result<BlockChainContext, tower::BoxError>> + Send + 'static,
> + Clone
+ Send
+ Sync
+ 'static,
ConsensusError,
>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let ContextConfig {
difficulty_cfg,
weights_config,
hard_fork_cfg,
} = cfg;
tracing::debug!("Initialising blockchain context");
let DatabaseResponse::ChainHeight(chain_height, top_block_hash) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response!");
};
let db = database.clone();
let difficulty_cache_handle = tokio::spawn(async move {
difficulty::DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db).await
});
let db = database.clone();
let weight_cache_handle = tokio::spawn(async move {
weight::BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db).await
});
let db = database.clone();
let hardfork_state_handle = tokio::spawn(async move {
hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
});
let context_svc = BlockChainContextService {
difficulty_cache: Arc::new(difficulty_cache_handle.await.unwrap()?.into()),
weight_cache: Arc::new(weight_cache_handle.await.unwrap()?.into()),
hardfork_state: Arc::new(hardfork_state_handle.await.unwrap()?.into()),
chain_height,
top_block_hash,
database,
};
let buffered_svc = Buffer::new(context_svc.boxed(), BUFFER_CONTEXT_CHANNEL_SIZE);
Ok(buffered_svc)
}
#[derive(Debug, Clone, Copy)]
pub struct BlockChainContext {
/// The next block's difficulty.
next_difficulty: u128,
/// The current cumulative difficulty.
cumulative_difficulty: u128,
/// The current effective median block weight.
effective_median_weight: usize,
/// The median long term block weight.
median_long_term_weight: usize,
/// Median weight to use for block reward calculations.
median_weight_for_block_reward: usize,
/// Timestamp to use to check time locked outputs.
time_lock_timestamp: u64,
/// The height of the chain.
chain_height: u64,
/// The top block's hash.
top_hash: [u8; 32],
/// The current hard fork.
pub current_hard_fork: HardFork,
}
#[derive(Debug, Clone)]
pub struct BlockChainContextRequest;
pub struct BlockChainContextService<D> {
difficulty_cache: Arc<RwLock<difficulty::DifficultyCache>>,
weight_cache: Arc<RwLock<weight::BlockWeightsCache>>,
hardfork_state: Arc<RwLock<hardforks::HardForkState>>,
chain_height: u64,
top_block_hash: [u8; 32],
database: D,
}
impl<D> Service<BlockChainContextRequest> for BlockChainContextService<D> {
type Response = BlockChainContext;
type Error = ConsensusError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: BlockChainContextRequest) -> Self::Future {
let hardfork_state = self.hardfork_state.clone();
let difficulty_cache = self.difficulty_cache.clone();
let weight_cache = self.weight_cache.clone();
let chain_height = self.chain_height;
let top_hash = self.top_block_hash;
async move {
let hardfork_state = hardfork_state.read().await;
let difficulty_cache = difficulty_cache.read().await;
let weight_cache = weight_cache.read().await;
let current_hf = hardfork_state.current_hardfork();
Ok(BlockChainContext {
next_difficulty: difficulty_cache.next_difficulty(&current_hf),
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
effective_median_weight: weight_cache.effective_median_block_weight(&current_hf),
median_long_term_weight: weight_cache.median_long_term_weight(),
median_weight_for_block_reward: weight_cache.median_for_block_reward(&current_hf),
time_lock_timestamp: 0, //TODO:
chain_height,
top_hash,
current_hard_fork: current_hf,
})
}
.boxed()
}
}
pub struct UpdateBlockchainCacheRequest {
new_top_hash: [u8; 32],
height: u64,
timestamp: u64,
weight: usize,
long_term_weight: usize,
vote: HardFork,
}
impl<D> tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService<D>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
type Response = ();
type Error = ConsensusError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.database.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future {
let hardfork_state = self.hardfork_state.clone();
let difficulty_cache = self.difficulty_cache.clone();
let weight_cache = self.weight_cache.clone();
let database = self.database.clone();
async move {
difficulty_cache
.write()
.await
.new_block(new.height, new.timestamp, database.clone())
.await?;
weight_cache
.write()
.await
.new_block(
new.height,
new.weight,
new.long_term_weight,
database.clone(),
)
.await?;
hardfork_state
.write()
.await
.new_block(new.vote, new.height, database)
.await?;
Ok(())
}
.boxed()
}
}
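A minimal usage sketch of the new unified API (not part of the commit): `database` is any implementor of this crate's `Database` trait, and conversion from `ConsensusError` into `tower::BoxError` is assumed. Note that the buffered handle returned above only serves `BlockChainContextRequest`; `UpdateBlockchainCacheRequest` is handled by `BlockChainContextService` itself.

use tower::{Service, ServiceExt};

async fn query_context<D>(database: D) -> Result<BlockChainContext, tower::BoxError>
where
    D: Database + Clone + Send + Sync + 'static,
    D::Future: Send + 'static,
{
    // Spawns the difficulty, weight and hard-fork caches and wraps them behind
    // one buffered service, using main-net consensus config.
    let mut ctx_svc =
        initialize_blockchain_context(ContextConfig::main_net(), database).await?;

    // A single request returns the whole cached context: next difficulty,
    // effective median weight, current hard-fork, chain height and top hash.
    let context = ctx_svc
        .ready()
        .await?
        .call(BlockChainContextRequest)
        .await?;

    Ok(context)
}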


@@ -1,12 +1,10 @@
use std::collections::VecDeque;
use std::ops::Range;
use std::{collections::VecDeque, ops::Range};
use tower::ServiceExt;
use tracing::instrument;
use crate::{
hardforks::HardFork, helper::median, ConsensusError, Database, DatabaseRequest,
DatabaseResponse,
helper::median, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork,
};
/// The number of blocks we account for when calculating difficulty
@@ -53,6 +51,7 @@ impl DifficultyCacheConfig {
}
/// This struct is able to calculate difficulties from blockchain information.
///
#[derive(Debug, Clone)]
pub struct DifficultyCache {
/// The list of timestamps in the window.
@@ -74,7 +73,7 @@ impl DifficultyCache {
config: DifficultyCacheConfig,
mut database: D,
) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database
let DatabaseResponse::ChainHeight(chain_height, _) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
@@ -90,7 +89,7 @@ impl DifficultyCache {
pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64,
config: DifficultyCacheConfig,
mut database: D,
database: D,
) -> Result<Self, ConsensusError> {
tracing::info!("Initializing difficulty cache this may take a while.");
@@ -112,7 +111,7 @@ impl DifficultyCache {
config,
};
diff.update_windowed_work(&mut database).await?;
diff.update_windowed_work(database).await?;
tracing::info!(
"Current chain height: {}, accounting for {} blocks timestamps",
@@ -251,22 +250,22 @@ async fn get_blocks_in_range_timestamps<D: Database + Clone>(
) -> Result<VecDeque<u64>, ConsensusError> {
tracing::info!("Getting blocks timestamps");
let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database
.oneshot(DatabaseRequest::BlockPOWInfoInRange(block_heights))
let DatabaseResponse::BlockExtendedHeaderInRange(ext_header) = database
.oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights))
.await?
else {
panic!("Database sent incorrect response");
};
Ok(pow_infos.into_iter().map(|info| info.timestamp).collect())
Ok(ext_header.into_iter().map(|info| info.timestamp).collect())
}
async fn get_block_cum_diff<D: Database>(database: D, height: u64) -> Result<u128, ConsensusError> {
let DatabaseResponse::BlockPOWInfo(pow) = database
.oneshot(DatabaseRequest::BlockPOWInfo(height.into()))
let DatabaseResponse::BlockExtendedHeader(ext_header) = database
.oneshot(DatabaseRequest::BlockExtendedHeader(height.into()))
.await?
else {
panic!("Database service sent incorrect response!");
};
Ok(pow.cumulative_difficulty)
Ok(ext_header.cumulative_difficulty)
}


@@ -1,13 +1,13 @@
use std::fmt::{Display, Formatter};
use std::ops::Range;
use std::time::Duration;
use std::{
fmt::{Display, Formatter},
ops::Range,
time::Duration,
};
use monero_serai::block::BlockHeader;
use tower::ServiceExt;
use tracing::instrument;
use cuprate_common::Network;
use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse};
// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
@@ -182,6 +182,13 @@ impl HardFork {
_ => BLOCK_TIME_V2,
}
}
/// Checks a block's version and vote, assuming that `self` is the current hard-fork.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#version-and-vote
pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool {
self == &block_hf_info.version && &block_hf_info.vote >= self
}
}
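A hedged migration sketch (import paths and surrounding plumbing are assumptions): the check that previously lived on `HardForkState` (removed further down in this diff) can now go through the cached context's public hard-fork field.

// `BlockChainContext` exposes `current_hard_fork`, so contextual block checks no
// longer need direct access to the hard-fork state.
fn block_version_vote_ok(context: &BlockChainContext, block_hf_info: &BlockHFInfo) -> bool {
    context.current_hard_fork.check_block_version_vote(block_hf_info)
}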
/// A struct holding the current voting state of the blockchain.
@@ -260,7 +267,7 @@ impl HardForkState {
config: HardForkConfig,
mut database: D,
) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database
let DatabaseResponse::ChainHeight(chain_height, _) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
@@ -269,15 +276,15 @@ impl HardForkState {
panic!("Database sent incorrect response")
};
let hfs = HardForkState::init_from_chain_height(config, chain_height, database).await?;
let hfs = HardForkState::init_from_chain_height(chain_height, config, database).await?;
Ok(hfs)
}
#[instrument(name = "init_hardfork_state", skip(config, database), level = "info")]
pub async fn init_from_chain_height<D: Database + Clone>(
config: HardForkConfig,
chain_height: u64,
config: HardForkConfig,
mut database: D,
) -> Result<Self, ConsensusError> {
tracing::info!("Initializing hard-fork state this may take a while.");
@@ -290,16 +297,18 @@ impl HardForkState {
debug_assert_eq!(votes.total_votes(), config.window)
}
let DatabaseResponse::BlockHFInfo(hf_info) = database
let DatabaseResponse::BlockExtendedHeader(ext_header) = database
.ready()
.await?
.call(DatabaseRequest::BlockHFInfo((chain_height - 1).into()))
.call(DatabaseRequest::BlockExtendedHeader(
(chain_height - 1).into(),
))
.await?
else {
panic!("Database sent incorrect response!");
};
let current_hardfork = hf_info.version;
let current_hardfork = ext_header.version;
let next_hardfork = current_hardfork.next_fork();
@@ -322,11 +331,6 @@ impl HardForkState {
Ok(hfs)
}
pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool {
self.current_hardfork == block_hf_info.version
&& block_hf_info.vote >= self.current_hardfork
}
pub async fn new_block<D: Database>(
&mut self,
vote: HardFork,
@@ -347,10 +351,12 @@ impl HardForkState {
for height_to_remove in
(self.config.window..self.votes.total_votes()).map(|offset| height - offset)
{
let DatabaseResponse::BlockHFInfo(hf_info) = database
let DatabaseResponse::BlockExtendedHeader(ext_header) = database
.ready()
.await?
.call(DatabaseRequest::BlockHFInfo(height_to_remove.into()))
.call(DatabaseRequest::BlockExtendedHeader(
height_to_remove.into(),
))
.await?
else {
panic!("Database sent incorrect response!");
@@ -359,10 +365,10 @@ impl HardForkState {
tracing::debug!(
"Removing block {} vote ({:?}) as they have left the window",
height_to_remove,
hf_info.vote
ext_header.vote
);
self.votes.remove_vote_for_hf(&hf_info.vote);
self.votes.remove_vote_for_hf(&ext_header.vote);
}
if height > self.config.window {
@@ -395,6 +401,10 @@ impl HardForkState {
self.next_hardfork = new_hf.next_fork();
self.current_hardfork = new_hf;
}
pub fn current_hardfork(&self) -> HardFork {
self.current_hardfork
}
}
/// Returns the votes needed for this fork.
@@ -411,8 +421,8 @@ async fn get_votes_in_range<D: Database>(
) -> Result<HFVotes, ConsensusError> {
let mut votes = HFVotes::default();
let DatabaseResponse::BlockHfInfoInRange(vote_list) = database
.oneshot(DatabaseRequest::BlockHfInfoInRange(block_heights))
let DatabaseResponse::BlockExtendedHeaderInRange(vote_list) = database
.oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights))
.await?
else {
panic!("Database sent incorrect response!");


@@ -6,17 +6,18 @@
//! For more information please see the [block weights chapter](https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html)
//! in the Monero Book.
//!
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::ops::Range;
use std::{
cmp::{max, min},
collections::VecDeque,
ops::Range,
};
use monero_serai::{block::Block, transaction::Transaction};
use tower::ServiceExt;
use tracing::instrument;
use crate::{
hardforks::HardFork, helper::median, ConsensusError, Database, DatabaseRequest,
DatabaseResponse,
helper::median, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork,
};
const PENALTY_FREE_ZONE_1: usize = 20000;
@@ -102,7 +103,7 @@ impl BlockWeightsCache {
config: BlockWeightsCacheConfig,
mut database: D,
) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database
let DatabaseResponse::ChainHeight(chain_height, _) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
@@ -156,12 +157,12 @@ impl BlockWeightsCache {
///
/// The block_height **MUST** be one more than the last height the cache has
/// seen.
pub async fn new_block_added<D: Database>(
pub async fn new_block<D: Database>(
&mut self,
block_height: u64,
block_weight: usize,
long_term_weight: usize,
database: &mut D,
database: D,
) -> Result<(), ConsensusError> {
tracing::debug!(
"Adding new block's {} weights to block cache, weight: {}, long term weight: {}",
@@ -181,15 +182,17 @@ impl BlockWeightsCache {
"Block {} is out of the long term weight window, removing it",
height_to_remove
);
let DatabaseResponse::BlockWeights(weights) = database
.oneshot(DatabaseRequest::BlockWeights(height_to_remove.into()))
let DatabaseResponse::BlockExtendedHeader(ext_header) = database
.oneshot(DatabaseRequest::BlockExtendedHeader(
height_to_remove.into(),
))
.await?
else {
panic!("Database sent incorrect response!");
};
let idx = self
.long_term_weights
.binary_search(&weights.long_term_weight)
.binary_search(&ext_header.long_term_weight)
.expect("Weight must be in list if in the window");
self.long_term_weights.remove(idx);
}
@@ -209,6 +212,11 @@ impl BlockWeightsCache {
calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights)
}
/// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or a custom number of blocks set in the config.
pub fn median_long_term_weight(&self) -> usize {
median(&self.long_term_weights)
}
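// Illustrative sketch, not part of this file: the long term weight list is kept
// sorted (the `binary_search` used above when pruning old blocks relies on that),
// and `helper::median` is assumed to follow Monero's convention for a sorted
// slice, roughly:
fn median_sorted(sorted: &[usize]) -> usize {
    if sorted.is_empty() {
        return 0;
    }
    let mid = sorted.len() / 2;
    if sorted.len() % 2 == 0 {
        // Even-length window: average the two middle values (integer division).
        (sorted[mid - 1] + sorted[mid]) / 2
    } else {
        sorted[mid]
    }
}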
/// Returns the effective median weight, used for block reward calculations and to calculate
/// the block weight limit.
///
@@ -295,14 +303,17 @@ async fn get_blocks_weight_in_range<D: Database + Clone>(
) -> Result<Vec<usize>, ConsensusError> {
tracing::info!("getting block weights.");
let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range))
let DatabaseResponse::BlockExtendedHeaderInRange(ext_headers) = database
.oneshot(DatabaseRequest::BlockExtendedHeaderInRange(range))
.await?
else {
panic!("Database sent incorrect response!")
};
Ok(weights.into_iter().map(|info| info.block_weight).collect())
Ok(ext_headers
.into_iter()
.map(|info| info.block_weight)
.collect())
}
#[instrument(name = "get_long_term_weights", skip(database), level = "info")]
@@ -312,14 +323,14 @@ async fn get_long_term_weight_in_range<D: Database + Clone>(
) -> Result<Vec<usize>, ConsensusError> {
tracing::info!("getting block long term weights.");
let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range))
let DatabaseResponse::BlockExtendedHeaderInRange(ext_headers) = database
.oneshot(DatabaseRequest::BlockExtendedHeaderInRange(range))
.await?
else {
panic!("Database sent incorrect response!")
};
Ok(weights
Ok(ext_headers
.into_iter()
.map(|info| info.long_term_weight)
.collect())


@@ -1,14 +1,37 @@
use std::collections::{HashMap, HashSet};
pub mod block;
pub mod context;
pub mod genesis;
pub mod hardforks;
mod helper;
pub mod miner_tx;
#[cfg(feature = "binaries")]
pub mod rpc;
pub mod transactions;
pub mod verifier;
pub use block::VerifyBlockRequest;
pub use context::{ContextConfig, HardFork};
pub use transactions::VerifyTxRequest;
pub async fn initialize_verifier<D>(
database: D,
cfg: ContextConfig,
) -> Result<
(
impl tower::Service<VerifyBlockRequest, Response = (), Error = ConsensusError>,
impl tower::Service<VerifyTxRequest, Response = (), Error = ConsensusError>,
),
ConsensusError,
>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let context_svc = context::initialize_blockchain_context(cfg, database.clone()).await?;
let tx_svc = transactions::TxVerifierService::new(database);
let block_svc = block::BlockVerifierService::new(context_svc, tx_svc.clone());
Ok((block_svc, tx_svc))
}
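A hedged sketch of wiring this up from a consumer (the `database` handle is an assumption; only `initialize_verifier`, `ContextConfig` and the request types come from this commit):

async fn start_verifiers<D>(database: D) -> Result<(), ConsensusError>
where
    D: Database + Clone + Send + Sync + 'static,
    D::Future: Send + 'static,
{
    // Builds the shared blockchain context internally, then hands back a block
    // verifier and a transaction verifier backed by the same database handle.
    let (_block_verifier, _tx_verifier) =
        initialize_verifier(database, ContextConfig::main_net()).await?;

    // Both are `tower::Service`s, so requests are driven with the usual
    // `ready().await?.call(request).await?` pattern; the request payloads are
    // defined in the `block` and `transactions` modules.
    Ok(())
}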
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
@@ -50,16 +73,32 @@ impl<T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tow
{
}
#[derive(Debug)]
pub struct OutputOnChain {
height: u64,
time_lock: monero_serai::transaction::Timelock,
key: curve25519_dalek::EdwardsPoint,
mask: curve25519_dalek::EdwardsPoint,
}
#[derive(Debug, Copy, Clone)]
pub struct ExtendedBlockHeader {
pub version: HardFork,
pub vote: HardFork,
pub timestamp: u64,
pub cumulative_difficulty: u128,
pub block_weight: usize,
pub long_term_weight: usize,
}
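For a feel of the unified request surface, a test double could answer the new variants (shown just below) with `tower::service_fn`; everything here is a hypothetical sketch, including the stub values and the `HardFork::V1` variant name.

use tower::service_fn;

// Fixed header returned for every height; a real database would read this from storage.
fn dummy_header() -> ExtendedBlockHeader {
    ExtendedBlockHeader {
        version: HardFork::V1,
        vote: HardFork::V1,
        timestamp: 0,
        cumulative_difficulty: 1,
        block_weight: 1,
        long_term_weight: 1,
    }
}

// A stub database answering only the requests the caches need.
fn stub_database() -> impl Database + Clone {
    service_fn(|req: DatabaseRequest| async move {
        Ok::<_, tower::BoxError>(match req {
            DatabaseRequest::BlockExtendedHeader(_) => {
                DatabaseResponse::BlockExtendedHeader(dummy_header())
            }
            DatabaseRequest::BlockExtendedHeaderInRange(range) => {
                DatabaseResponse::BlockExtendedHeaderInRange(
                    range.map(|_| dummy_header()).collect(),
                )
            }
            DatabaseRequest::ChainHeight => DatabaseResponse::ChainHeight(1, [0; 32]),
            _ => todo!("other requests are not needed for this sketch"),
        })
    })
}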
#[derive(Debug, Clone)]
pub enum DatabaseRequest {
BlockHFInfo(cuprate_common::BlockID),
BlockPOWInfo(cuprate_common::BlockID),
BlockWeights(cuprate_common::BlockID),
BlockExtendedHeader(cuprate_common::BlockID),
BlockHash(u64),
BlockHfInfoInRange(std::ops::Range<u64>),
BlockWeightsInRange(std::ops::Range<u64>),
BlockPOWInfoInRange(std::ops::Range<u64>),
BlockExtendedHeaderInRange(std::ops::Range<u64>),
ChainHeight,
@@ -72,18 +111,14 @@ pub enum DatabaseRequest {
#[derive(Debug)]
pub enum DatabaseResponse {
BlockHFInfo(hardforks::BlockHFInfo),
BlockPOWInfo(block::BlockPOWInfo),
BlockWeights(block::weight::BlockWeightInfo),
BlockExtendedHeader(ExtendedBlockHeader),
BlockHash([u8; 32]),
BlockHfInfoInRange(Vec<hardforks::BlockHFInfo>),
BlockWeightsInRange(Vec<block::BlockWeightInfo>),
BlockPOWInfoInRange(Vec<block::BlockPOWInfo>),
BlockExtendedHeaderInRange(Vec<ExtendedBlockHeader>),
ChainHeight(u64),
ChainHeight(u64, [u8; 32]),
Outputs(HashMap<u64, HashMap<u64, [curve25519_dalek::EdwardsPoint; 2]>>),
Outputs(HashMap<u64, HashMap<u64, OutputOnChain>>),
NumberOutputsWithAmount(usize),
#[cfg(feature = "binaries")]