diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 3435e06..2fdea5f 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,50 +1,12 @@ #![cfg(feature = "binaries")] -use futures::Stream; -use monero_serai::rpc::HttpRpc; -use std::pin::Pin; - -use std::task::{Context, Poll}; -use tower::discover::Change; - +use tower::ServiceExt; use tracing::level_filters::LevelFilter; -use monero_consensus::block::weight::BlockWeightsCache; +use monero_consensus::block::{pow::difficulty::DifficultyCache, weight::BlockWeightsCache}; use monero_consensus::hardforks::HardFork; -use monero_consensus::rpc::{init_rpc_load_balancer, Rpc}; - -struct RpcDiscoverer(Vec, u64); - -impl Stream for RpcDiscoverer { - type Item = Result>, tower::BoxError>; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - if let Some(url) = this.0.pop() { - this.1 += 1; - return Poll::Ready(Some(Ok(Change::Insert(this.1, Rpc::new_http(url))))); - } - Poll::Ready(None) - } -} - -#[derive(Clone)] -pub struct Attempts(u64); - -impl tower::retry::Policy for Attempts { - type Future = futures::future::Ready; - fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option { - if result.is_err() { - Some(futures::future::ready(Attempts(self.0))) - } else { - None - } - } - - fn clone_request(&self, req: &Req) -> Option { - Some(req.clone()) - } -} +use monero_consensus::rpc::init_rpc_load_balancer; +use monero_consensus::{DatabaseRequest, DatabaseResponse}; #[tokio::main] async fn main() { @@ -70,18 +32,27 @@ async fn main() { "http://node.c3pool.com:18081".to_string(), ]; - let rpc = init_rpc_load_balancer(urls); + let mut rpc = init_rpc_load_balancer(urls); - let difficulty = BlockWeightsCache::init_from_chain_height(2984089, rpc.clone()) + let mut difficulty = DifficultyCache::init_from_chain_height(2985610, rpc.clone()) .await .unwrap(); + /* + let DatabaseResponse::BlockWeights(weights) = rpc + .oneshot(DatabaseRequest::BlockWeights(2985610.into())) + .await + .unwrap() + else { + panic!() + }; - println!( - "{:?}", - difficulty.next_block_long_term_weight(&HardFork::V15, 175819) + assert_eq!( + weights.long_term_weight, + difficulty.next_block_long_term_weight(&HardFork::V16, weights.block_weight) ); - // println!("{:?}", difficulty.next_difficulty(&HardFork::V1)); //774466376 + */ + println!("{:?}", difficulty.next_difficulty(&HardFork::V16)); //774466376 //let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone()) // .await diff --git a/consensus/src/block/pow.rs b/consensus/src/block/pow.rs index 25b5bb7..2172b42 100644 --- a/consensus/src/block/pow.rs +++ b/consensus/src/block/pow.rs @@ -8,6 +8,9 @@ pub struct BlockPOWInfo { pub cumulative_difficulty: u128, } +/// Returns if the blocks POW hash is valid for the current difficulty. +/// +/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/difficulty.html#checking-a-blocks-proof-of-work pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool { let int_hash = U256::from_le_slice(hash); diff --git a/consensus/src/block/pow/difficulty.rs b/consensus/src/block/pow/difficulty.rs index b75f99a..d13d355 100644 --- a/consensus/src/block/pow/difficulty.rs +++ b/consensus/src/block/pow/difficulty.rs @@ -4,7 +4,7 @@ use std::ops::Range; use tower::ServiceExt; use tracing::instrument; -use crate::{hardforks::HardFork, Database, DatabaseRequest, DatabaseResponse, Error}; +use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse}; /// The amount of blocks we account for to calculate difficulty const DIFFICULTY_WINDOW: usize = 720; @@ -22,7 +22,7 @@ const DIFFICULTY_ACCOUNTED_WINDOW_LEN: usize = DIFFICULTY_WINDOW - 2 * DIFFICULT /// This struct is able to calculate difficulties from blockchain information. #[derive(Debug)] -pub struct DifficultyCalculator { +pub struct DifficultyCache { /// The list of timestamps in the window. /// len <= [`DIFFICULTY_BLOCKS_COUNT`] timestamps: Vec, @@ -33,8 +33,8 @@ pub struct DifficultyCalculator { last_accounted_height: u64, } -impl DifficultyCalculator { - pub async fn init(mut database: D) -> Result { +impl DifficultyCache { + pub async fn init(mut database: D) -> Result { let DatabaseResponse::ChainHeight(chain_height) = database .ready() .await? @@ -44,13 +44,14 @@ impl DifficultyCalculator { panic!("Database sent incorrect response") }; - DifficultyCalculator::init_from_chain_height(chain_height, database).await + DifficultyCache::init_from_chain_height(chain_height, database).await } + #[instrument(name = "init_difficulty_cache", level = "info", skip(database))] pub async fn init_from_chain_height( chain_height: u64, mut database: D, - ) -> Result { + ) -> Result { let mut block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT); if block_start == 0 { @@ -60,13 +61,7 @@ impl DifficultyCalculator { let timestamps = get_blocks_in_range_timestamps(database.clone(), block_start..chain_height).await?; - tracing::debug!( - "Current chain height: {}, accounting for {} blocks timestamps", - chain_height, - timestamps.len() - ); - - let mut diff = DifficultyCalculator { + let mut diff = DifficultyCache { timestamps, windowed_work: 0, last_accounted_height: chain_height - 1, @@ -74,10 +69,19 @@ impl DifficultyCalculator { diff.update_windowed_work(&mut database).await?; + tracing::info!( + "Current chain height: {}, accounting for {} blocks timestamps", + chain_height, + diff.timestamps.len() + ); + Ok(diff) } - pub async fn resync(&mut self, mut database: D) -> Result<(), Error> { + pub async fn resync( + &mut self, + mut database: D, + ) -> Result<(), ConsensusError> { let DatabaseResponse::ChainHeight(chain_height) = database .ready() .await? @@ -114,7 +118,10 @@ impl DifficultyCalculator { self.update_windowed_work(database).await } - async fn update_windowed_work(&mut self, mut database: D) -> Result<(), Error> { + async fn update_windowed_work( + &mut self, + mut database: D, + ) -> Result<(), ConsensusError> { let mut block_start = (self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT); @@ -140,6 +147,9 @@ impl DifficultyCalculator { Ok(()) } + /// Returns the required difficulty for the next block. + /// + /// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/difficulty.html#calculating-difficulty pub fn next_difficulty(&self, hf: &HardFork) -> u128 { if self.timestamps.len() <= 1 { return 1; @@ -179,31 +189,22 @@ fn get_window_start_and_end(window_len: usize) -> (usize, usize) { } } -#[instrument(skip(database))] +#[instrument(name = "get_blocks_timestamps", skip(database))] async fn get_blocks_in_range_timestamps( database: D, block_heights: Range, -) -> Result, Error> { - let timestamp_fut = FuturesOrdered::from_iter( - block_heights - .map(|height| get_block_timestamp(database.clone(), height).map_ok(move |res| res)), - ); - - timestamp_fut.try_collect().await -} - -async fn get_block_timestamp(database: D, height: u64) -> Result { - tracing::debug!("Getting block timestamp: {}", height); - let DatabaseResponse::BlockPOWInfo(pow) = database - .oneshot(DatabaseRequest::BlockPOWInfo(height.into())) +) -> Result, ConsensusError> { + let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database + .oneshot(DatabaseRequest::BlockPOWInfoInRange(block_heights)) .await? else { - panic!("Database service sent incorrect response!"); + panic!("Database sent incorrect response"); }; - Ok(pow.timestamp) + + Ok(pow_infos.into_iter().map(|info| info.timestamp).collect()) } -async fn get_block_cum_diff(database: D, height: u64) -> Result { +async fn get_block_cum_diff(database: D, height: u64) -> Result { let DatabaseResponse::BlockPOWInfo(pow) = database .oneshot(DatabaseRequest::BlockPOWInfo(height.into())) .await? diff --git a/consensus/src/block/weight.rs b/consensus/src/block/weight.rs index 3094bfd..d17e8f6 100644 --- a/consensus/src/block/weight.rs +++ b/consensus/src/block/weight.rs @@ -1,11 +1,20 @@ +//! # Block Weights +//! +//! This module contains calculations for block weights, including calculating block weight +//! limits, effective medians and long term block weights. +//! +//! For more information please see the [block weights chapter](https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html) +//! in the Monero Book. +//! use std::cmp::{max, min}; +use std::collections::VecDeque; use std::ops::Range; use monero_serai::{block::Block, transaction::Transaction}; use tower::ServiceExt; use tracing::instrument; -use crate::{hardforks::HardFork, Database, DatabaseRequest, DatabaseResponse, Error}; +use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse}; const PENALTY_FREE_ZONE_1: usize = 20000; const PENALTY_FREE_ZONE_2: usize = 60000; @@ -43,9 +52,14 @@ pub fn penalty_free_zone(hf: &HardFork) -> usize { } } +/// A cache used to calculate block weight limits, the effective median and +/// long term block weights. +/// +/// These calculations require a lot of data from the database so by caching +/// this data it reduces the load on the database. pub struct BlockWeightsCache { /// This list is not sorted. - short_term_block_weights: Vec, + short_term_block_weights: VecDeque, /// This list is sorted. long_term_weights: Vec, /// The height of the top block. @@ -53,7 +67,8 @@ pub struct BlockWeightsCache { } impl BlockWeightsCache { - pub async fn init(mut database: D) -> Result { + /// Initialize the [`BlockWeightsCache`] at the the height of the database. + pub async fn init(mut database: D) -> Result { let DatabaseResponse::ChainHeight(chain_height) = database .ready() .await? @@ -66,10 +81,12 @@ impl BlockWeightsCache { Self::init_from_chain_height(chain_height, database).await } + /// Initialize the [`BlockWeightsCache`] at the the given chain height. + #[instrument(name = "init_weight_cache", level = "info", skip(database))] pub async fn init_from_chain_height( chain_height: u64, database: D, - ) -> Result { + ) -> Result { let mut long_term_weights = get_long_term_weight_in_range( chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height, database.clone(), @@ -82,11 +99,14 @@ impl BlockWeightsCache { long_term_weights.len() ); - let short_term_block_weights = get_blocks_weight_in_range( + let short_term_block_weights: VecDeque = get_blocks_weight_in_range( chain_height.saturating_sub(SHORT_TERM_WINDOW)..chain_height, database, ) - .await?; + .await? + .into(); + + tracing::info!("Initialized block weight cache, chain-height: {:?}, long term weights length: {:?}, short term weights length: {:?}", chain_height, long_term_weights.len(), short_term_block_weights.len()); Ok(BlockWeightsCache { short_term_block_weights, @@ -95,12 +115,111 @@ impl BlockWeightsCache { }) } + /// Add a new block to the cache. + /// + /// The block_height **MUST** be one more than the last height the cache has + /// seen. + pub async fn new_block_added( + &mut self, + block_height: u64, + block_weight: usize, + long_term_weight: usize, + database: &mut D, + ) -> Result<(), ConsensusError> { + tracing::debug!( + "Adding new block's {} weights to block cache, weight: {}, long term weight: {}", + block_weight, + block_weight, + long_term_weight + ); + assert_eq!(self.tip_height + 1, block_height); + self.tip_height += 1; + + match self.long_term_weights.binary_search(&long_term_weight) { + Ok(idx) | Err(idx) => self.long_term_weights.insert(idx, long_term_weight), + }; + + if let Some(height_to_remove) = block_height.checked_sub(LONG_TERM_WINDOW) { + tracing::debug!( + "Block {} is out of the long term weight window, removing it", + height_to_remove + ); + let DatabaseResponse::BlockWeights(weights) = database + .oneshot(DatabaseRequest::BlockWeights(height_to_remove.into())) + .await? + else { + panic!("Database sent incorrect response!"); + }; + let idx = self + .long_term_weights + .binary_search(&weights.long_term_weight) + .expect("Weight must be in list if in the window"); + self.long_term_weights.remove(idx); + } + + self.short_term_block_weights.push_back(block_weight); + if self.short_term_block_weights.len() > SHORT_TERM_WINDOW.try_into().unwrap() { + self.short_term_block_weights.pop_front(); + } + + Ok(()) + } + + /// Returns the next blocks long term weight. + /// + /// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-a-blocks-long-term-weight pub fn next_block_long_term_weight(&self, hf: &HardFork, block_weight: usize) -> usize { calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights) } + + /// Returns the effective median weight, used for block reward calculations and to calculate + /// the block weight limit. + /// + /// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-effective-median-weight + pub fn effective_median_block_weight(&self, hf: &HardFork) -> usize { + let mut sorted_short_term_weights: Vec = + self.short_term_block_weights.clone().into(); + sorted_short_term_weights.sort_unstable(); + calculate_effective_median_block_weight( + hf, + &sorted_short_term_weights, + &self.long_term_weights, + ) + } + + /// Returns the block weight limit. + pub fn next_block_weight_limit(&self, hf: &HardFork) -> usize { + 2 * self.effective_median_block_weight(hf) + } } -pub fn calculate_block_long_term_weight( +fn calculate_effective_median_block_weight( + hf: &HardFork, + sorted_short_term_window: &[usize], + sorted_long_term_window: &[usize], +) -> usize { + if hf.in_range(&HardFork::V1, &HardFork::V10) { + return median(sorted_short_term_window); + } + + let long_term_median = median(sorted_long_term_window).max(PENALTY_FREE_ZONE_5); + let short_term_median = median(sorted_short_term_window); + let effective_median = if hf.in_range(&HardFork::V10, &HardFork::V15) { + min( + max(PENALTY_FREE_ZONE_5, short_term_median), + 50 * long_term_median, + ) + } else { + min( + max(long_term_median, short_term_median), + 50 * long_term_median, + ) + }; + + effective_median.max(penalty_free_zone(hf)) +} + +fn calculate_block_long_term_weight( hf: &HardFork, block_weight: usize, sorted_long_term_window: &[usize], @@ -142,31 +261,31 @@ fn median(array: &[usize]) -> usize { } } -#[instrument(skip(database))] +#[instrument(name = "get_block_weights", skip(database))] async fn get_blocks_weight_in_range( range: Range, database: D, -) -> Result, Error> { +) -> Result, ConsensusError> { let DatabaseResponse::BlockWeightsInRange(weights) = database .oneshot(DatabaseRequest::BlockWeightsInRange(range)) .await? else { - panic!() + panic!("Database sent incorrect response!") }; Ok(weights.into_iter().map(|info| info.block_weight).collect()) } -#[instrument(skip(database))] +#[instrument(name = "get_long_term_weights", skip(database))] async fn get_long_term_weight_in_range( range: Range, database: D, -) -> Result, Error> { +) -> Result, ConsensusError> { let DatabaseResponse::BlockWeightsInRange(weights) = database .oneshot(DatabaseRequest::BlockWeightsInRange(range)) .await? else { - panic!() + panic!("Database sent incorrect response!") }; Ok(weights diff --git a/consensus/src/hardforks.rs b/consensus/src/hardforks.rs index c83eec8..0d69c89 100644 --- a/consensus/src/hardforks.rs +++ b/consensus/src/hardforks.rs @@ -1,5 +1,3 @@ -use futures::stream::FuturesUnordered; -use futures::{StreamExt, TryFutureExt}; use std::fmt::{Display, Formatter}; use std::ops::Range; @@ -7,13 +5,35 @@ use monero_serai::block::BlockHeader; use tower::ServiceExt; use tracing::instrument; -use cuprate_common::{BlockID, Network}; +use cuprate_common::Network; -use crate::{Database, DatabaseRequest, DatabaseResponse, Error}; +use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse}; // https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork const DEFAULT_WINDOW_SIZE: u64 = 10080; // supermajority window check length - a week +#[derive(Debug, Clone, Copy)] +pub struct BlockHFInfo { + version: HardFork, + vote: HardFork, +} + +impl BlockHFInfo { + pub fn from_block_header(block_header: &BlockHeader) -> Result { + BlockHFInfo::from_major_minor(block_header.major_version, block_header.minor_version) + } + + pub fn from_major_minor( + major_version: u8, + minor_version: u8, + ) -> Result { + Ok(BlockHFInfo { + version: HardFork::from_version(&major_version)?, + vote: HardFork::from_vote(&minor_version), + }) + } +} + /// An identifier for every hard-fork Monero has had. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] #[repr(u8)] @@ -41,7 +61,7 @@ impl HardFork { /// Returns the hard-fork for a blocks `major_version` field. /// /// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#blocks-version-and-vote - pub fn from_version(version: &u8) -> Result { + pub fn from_version(version: &u8) -> Result { Ok(match version { 1 => HardFork::V1, 2 => HardFork::V2, @@ -60,7 +80,7 @@ impl HardFork { 15 => HardFork::V15, 16 => HardFork::V16, _ => { - return Err(Error::InvalidHardForkVersion( + return Err(ConsensusError::InvalidHardForkVersion( "Version is not a known hard fork", )) } @@ -240,7 +260,7 @@ impl HardForks { pub async fn init( config: HardForkConfig, mut database: D, - ) -> Result + ) -> Result where D::Future: Send + 'static, { @@ -256,11 +276,6 @@ impl HardForks { let mut hfs = HardForks::init_at_chain_height(config, chain_height, database.clone()).await?; - // This is only needed if the database moves independently of the HardFork class aka if we are checking a node instead of keeping state ourself. - hfs.resync(&mut database).await?; - - hfs.check_set_new_hf(); - tracing::info!("HardFork state: {:?}", hfs); Ok(hfs) @@ -270,7 +285,7 @@ impl HardForks { config: HardForkConfig, chain_height: u64, mut database: D, - ) -> Result + ) -> Result where D::Future: Send + 'static, { @@ -282,10 +297,16 @@ impl HardForks { debug_assert_eq!(votes.total_votes(), config.window) } - let latest_header = get_block_header(&mut database, chain_height - 1).await?; + let DatabaseResponse::BlockHfInfo(hf_info) = database + .ready() + .await? + .call(DatabaseRequest::BlockPOWInfo((chain_height - 1).into())) + .await? + else { + panic!("Database sent incorrect response!"); + }; - let current_hardfork = HardFork::from_version(&latest_header.major_version) - .expect("Invalid major version in stored block"); + let current_hardfork = hf_info.version; let next_hardfork = current_hardfork.next_fork(); @@ -304,69 +325,18 @@ impl HardForks { Ok(hfs) } - #[instrument(skip(self, database))] - async fn resync(&mut self, mut database: D) -> Result<(), Error> { - let DatabaseResponse::ChainHeight(mut chain_height) = database - .ready() - .await? - .call(DatabaseRequest::ChainHeight) - .await? - else { - panic!("Database sent incorrect response") - }; - - tracing::debug!( - "chain-tip: {}, last height: {}", - chain_height - 1, - self.last_height - ); - - loop { - while chain_height > self.last_height + 1 { - self.get_and_account_new_block(self.last_height + 1, &mut database) - .await?; - } - - let DatabaseResponse::ChainHeight(c_h) = database - .ready() - .await? - .call(DatabaseRequest::ChainHeight) - .await? - else { - panic!("Database sent incorrect response") - }; - chain_height = c_h; - - if chain_height == self.last_height + 1 { - return Ok(()); - } - - tracing::debug!( - "chain-tip: {}, last height: {}", - chain_height - 1, - self.last_height - ); - } + pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool { + &self.current_hardfork == &block_hf_info.version + && &block_hf_info.vote >= &self.current_hardfork } - async fn get_and_account_new_block( + pub async fn new_block( &mut self, + vote: HardFork, height: u64, mut database: D, - ) -> Result<(), Error> { - let header = get_block_header(&mut database, height).await?; - - self.new_block(HardFork::from_vote(&header.minor_version), height, database) - .await; - Ok(()) - } - - pub fn check_block_version_vote(&self, version: &HardFork, vote: &HardFork) -> bool { - &self.current_hardfork == version && vote >= &self.current_hardfork - } - - pub async fn new_block(&mut self, vote: HardFork, height: u64, mut database: D) { - debug_assert_eq!(self.last_height + 1, height); + ) -> Result<(), ConsensusError> { + assert_eq!(self.last_height + 1, height); self.last_height += 1; tracing::debug!( @@ -377,29 +347,36 @@ impl HardForks { self.votes.add_vote_for_hf(&vote); - for offset in self.config.window..self.votes.total_votes() { - let header = get_block_header(&mut database, height - offset) - .await - .expect("Error retrieving block we should have in database"); + for height_to_remove in + (self.config.window..self.votes.total_votes()).map(|offset| height - offset) + { + let DatabaseResponse::BlockHfInfo(hf_info) = database + .ready() + .await? + .call(DatabaseRequest::BlockPOWInfo(height_to_remove.into())) + .await? + else { + panic!("Database sent incorrect response!"); + }; - let vote = HardFork::from_vote(&header.minor_version); tracing::debug!( "Removing block {} vote ({:?}) as they have left the window", - height - offset, - vote + height_to_remove, + hf_info.vote ); - self.votes.remove_vote_for_hf(&vote); + self.votes.remove_vote_for_hf(&hf_info.vote); } if height > self.config.window { debug_assert_eq!(self.votes.total_votes(), self.config.window); } - self.check_set_new_hf() + self.check_set_new_hf(); + Ok(()) } - /// Checks if the next hard-fork should be activated and sets it it it should. + /// Checks if the next hard-fork should be activated and activates it if it should. /// /// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork fn check_set_new_hf(&mut self) { @@ -422,42 +399,26 @@ impl HardForks { } } -#[instrument(skip(database))] +#[instrument(name = "get_votes", skip(database))] async fn get_votes_in_range( database: D, block_heights: Range, -) -> Result +) -> Result where D::Future: Send + 'static, { let mut votes = HFVotes::default(); - let mut fut = - FuturesUnordered::from_iter(block_heights.map(|height| { - get_block_header(database.clone(), height).map_ok(move |res| (height, res)) - })); + let DatabaseResponse::BlockHfInfoInRange(vote_list) = database + .oneshot(DatabaseRequest::BlockHfInfoInRange(block_heights)) + .await? + else { + panic!("Database sent incorrect response!"); + }; - while let Some(res) = fut.next().await { - let (height, header): (u64, BlockHeader) = res?; - let vote = HardFork::from_vote(&header.minor_version); - - tracing::debug!("Block vote for height: {} = {:?}", height, vote); - - votes.add_vote_for_hf(&HardFork::from_vote(&header.minor_version)); + for hf_info in vote_list.into_iter() { + votes.add_vote_for_hf(&hf_info.vote); } Ok(votes) } - -async fn get_block_header( - database: D, - block_id: impl Into, -) -> Result { - let DatabaseResponse::BlockHeader(header) = database - .oneshot(DatabaseRequest::BlockHeader(block_id.into())) - .await? - else { - panic!("Database sent incorrect response for block header request") - }; - Ok(header) -} diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 67b21c1..dbf7a28 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -6,7 +6,7 @@ pub mod miner_tx; pub mod rpc; #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum ConsensusError { #[error("Invalid hard fork version: {0}")] InvalidHardForkVersion(&'static str), #[error("Database error: {0}")] @@ -25,22 +25,26 @@ impl), BlockWeightsInRange(std::ops::Range), + BlockPOWInfoInRange(std::ops::Range), ChainHeight, } #[derive(Debug)] pub enum DatabaseResponse { - BlockHeader(monero_serai::block::BlockHeader), + BlockHfInfo(hardforks::BlockHFInfo), BlockPOWInfo(block::pow::BlockPOWInfo), BlockWeights(block::weight::BlockWeightInfo), + BlockHfInfoInRange(Vec), BlockWeightsInRange(Vec), + BlockPOWInfoInRange(Vec), ChainHeight(u64), } diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index b48815a..46df90b 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -18,9 +18,10 @@ use cuprate_common::BlockID; use crate::block::pow::BlockPOWInfo; use crate::block::weight::BlockWeightInfo; +use crate::hardforks::BlockHFInfo; use crate::{DatabaseRequest, DatabaseResponse}; -const MAX_BLOCKS_IN_RANGE: u64 = 25; +const MAX_BLOCKS_IN_RANGE: u64 = 50; #[derive(Clone)] pub struct Attempts(u64); @@ -84,40 +85,96 @@ where let this = self.rpcs.clone(); match req { - DatabaseRequest::BlockWeightsInRange(range) => async move { - let res_to_weights = |res| { - let DatabaseResponse::BlockWeightsInRange(range) = res else { - panic!("Incorrect Response!"); + DatabaseRequest::BlockPOWInfoInRange(range) => { + let resp_to_ret = |resp: DatabaseResponse| { + let DatabaseResponse::BlockPOWInfoInRange(pow_info) = resp else { + panic!("Database sent incorrect response"); }; - range + pow_info }; - - let iter = (0..range.clone().count() as u64) - .step_by(MAX_BLOCKS_IN_RANGE as usize) - .map(|i| { - let new_range = (range.start + i) - ..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end)); - this.clone() - .oneshot(DatabaseRequest::BlockWeightsInRange(new_range)) - .map_ok(res_to_weights) - }); - - let fut = FuturesOrdered::from_iter(iter); - - let mut res = Vec::with_capacity(range.count()); - - for mut rpc_res in fut.try_collect::>>().await?.into_iter() { - res.append(&mut rpc_res) - } - - Ok(DatabaseResponse::BlockWeightsInRange(res)) + split_range_request( + this, + range, + DatabaseRequest::BlockPOWInfoInRange, + DatabaseResponse::BlockPOWInfoInRange, + resp_to_ret, + ) + } + + DatabaseRequest::BlockWeightsInRange(range) => { + let resp_to_ret = |resp: DatabaseResponse| { + let DatabaseResponse::BlockWeightsInRange(weights) = resp else { + panic!("Database sent incorrect response"); + }; + weights + }; + split_range_request( + this, + range, + DatabaseRequest::BlockWeightsInRange, + DatabaseResponse::BlockWeightsInRange, + resp_to_ret, + ) + } + DatabaseRequest::BlockHfInfoInRange(range) => { + let resp_to_ret = |resp: DatabaseResponse| { + let DatabaseResponse::BlockHfInfoInRange(hf_info) = resp else { + panic!("Database sent incorrect response"); + }; + hf_info + }; + split_range_request( + this, + range, + DatabaseRequest::BlockHfInfoInRange, + DatabaseResponse::BlockHfInfoInRange, + resp_to_ret, + ) } - .boxed(), req => this.oneshot(req).boxed(), } } } +fn split_range_request( + rpc: T, + range: Range, + req: impl FnOnce(Range) -> DatabaseRequest + Clone + Send + 'static, + resp: impl FnOnce(Vec) -> DatabaseResponse + Send + 'static, + resp_to_ret: impl Fn(DatabaseResponse) -> Vec + Copy + Send + 'static, +) -> Pin> + Send + 'static>> +where + T: tower::Service + + Clone + + Send + + Sync + + 'static, + T::Future: Send + 'static, + Ret: Send + 'static, +{ + let iter = (0..range.clone().count() as u64) + .step_by(MAX_BLOCKS_IN_RANGE as usize) + .map(|i| { + let req = req.clone(); + let new_range = + (range.start + i)..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end)); + rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret) + }); + + let fut = FuturesOrdered::from_iter(iter); + + let mut res = Vec::with_capacity(range.count()); + + async move { + for mut rpc_res in fut.try_collect::>>().await?.into_iter() { + res.append(&mut rpc_res) + } + + Ok(resp(res)) + } + .boxed() +} + enum RpcState { Locked, Acquiring(OwnedMutexLockFuture>), @@ -194,35 +251,18 @@ impl tower::Service f } .boxed(), - DatabaseRequest::BlockHeader(id) => match id { - BlockID::Hash(hash) => async move { - let res: Result<_, RpcError> = rpc - .get_block(hash) - .map_ok(|block| DatabaseResponse::BlockHeader(block.header)) - .await; - if let Err(e) = &res { - *err_slot.lock().unwrap() = Some(e.clone()); - } - res.map_err(Into::into) - } - .boxed(), - BlockID::Height(height) => async move { - let res: Result<_, RpcError> = rpc - .get_block_by_number(height.try_into().unwrap()) - .map_ok(|block| DatabaseResponse::BlockHeader(block.header)) - .await; - if let Err(e) = &res { - *err_slot.lock().unwrap() = Some(e.clone()); - } - res.map_err(Into::into) - } - .boxed(), - }, DatabaseRequest::BlockPOWInfo(id) => get_blocks_pow_info(id, rpc).boxed(), DatabaseRequest::BlockWeights(id) => get_blocks_weight_info(id, rpc).boxed(), + DatabaseRequest::BlockHFInfo(id) => get_blocks_hf_info(id, rpc).boxed(), + DatabaseRequest::BlockHfInfoInRange(range) => { + get_blocks_hf_info_in_range(range, rpc).boxed() + } DatabaseRequest::BlockWeightsInRange(range) => { get_blocks_weight_info_in_range(range, rpc).boxed() } + DatabaseRequest::BlockPOWInfoInRange(range) => { + get_blocks_pow_info_in_range(range, rpc).boxed() + } } } } @@ -234,6 +274,9 @@ struct BlockInfo { timestamp: u64, block_weight: usize, long_term_weight: usize, + + major_version: u8, + minor_version: u8, } async fn get_block_info_in_range( @@ -252,7 +295,7 @@ async fn get_block_info_in_range( ) .await?; - tracing::info!("Retrieved blocks in range: {:?}", range); + tracing::debug!("Retrieved blocks in range: {:?}", range); Ok(res.headers) } @@ -303,6 +346,25 @@ async fn get_blocks_weight_info_in_range( )) } +async fn get_blocks_pow_info_in_range( + range: Range, + rpc: OwnedMutexGuard>, +) -> Result { + let info = get_block_info_in_range(range, rpc).await?; + + Ok(DatabaseResponse::BlockPOWInfoInRange( + info.into_iter() + .map(|info| BlockPOWInfo { + timestamp: info.timestamp, + cumulative_difficulty: u128_from_low_high( + info.cumulative_difficulty, + info.cumulative_difficulty_top64, + ), + }) + .collect(), + )) +} + async fn get_blocks_weight_info( id: BlockID, rpc: OwnedMutexGuard>, @@ -334,3 +396,29 @@ fn u128_from_low_high(low: u64, high: u64) -> u128 { let res: u128 = high as u128; res << 64 | low as u128 } + +async fn get_blocks_hf_info( + id: BlockID, + rpc: OwnedMutexGuard>, +) -> Result { + let info = get_block_info(id, rpc).await?; + + Ok(DatabaseResponse::BlockHfInfo( + BlockHFInfo::from_major_minor(info.major_version, info.minor_version)?, + )) +} + +async fn get_blocks_hf_info_in_range( + range: Range, + rpc: OwnedMutexGuard>, +) -> Result { + let info = get_block_info_in_range(range, rpc).await?; + + Ok(DatabaseResponse::BlockHfInfoInRange( + info.into_iter() + .map(|info| { + BlockHFInfo::from_major_minor(info.major_version, info.minor_version).unwrap() + }) + .collect(), + )) +}