add effective median weight calculations

+ more rpc speedup for the rest of the consensus lib
This commit is contained in:
Boog900 2023-10-02 21:07:11 +01:00
parent d7798328b6
commit eb3c727b4d
No known key found for this signature in database
GPG key ID: 5401367FB7302004
7 changed files with 405 additions and 258 deletions

View file

@ -1,50 +1,12 @@
#![cfg(feature = "binaries")] #![cfg(feature = "binaries")]
use futures::Stream; use tower::ServiceExt;
use monero_serai::rpc::HttpRpc;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::discover::Change;
use tracing::level_filters::LevelFilter; use tracing::level_filters::LevelFilter;
use monero_consensus::block::weight::BlockWeightsCache; use monero_consensus::block::{pow::difficulty::DifficultyCache, weight::BlockWeightsCache};
use monero_consensus::hardforks::HardFork; use monero_consensus::hardforks::HardFork;
use monero_consensus::rpc::{init_rpc_load_balancer, Rpc}; use monero_consensus::rpc::init_rpc_load_balancer;
use monero_consensus::{DatabaseRequest, DatabaseResponse};
struct RpcDiscoverer(Vec<String>, u64);
impl Stream for RpcDiscoverer {
type Item = Result<Change<u64, Rpc<HttpRpc>>, tower::BoxError>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(url) = this.0.pop() {
this.1 += 1;
return Poll::Ready(Some(Ok(Change::Insert(this.1, Rpc::new_http(url)))));
}
Poll::Ready(None)
}
}
#[derive(Clone)]
pub struct Attempts(u64);
impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
type Future = futures::future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
Some(futures::future::ready(Attempts(self.0)))
} else {
None
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -70,18 +32,27 @@ async fn main() {
"http://node.c3pool.com:18081".to_string(), "http://node.c3pool.com:18081".to_string(),
]; ];
let rpc = init_rpc_load_balancer(urls); let mut rpc = init_rpc_load_balancer(urls);
let difficulty = BlockWeightsCache::init_from_chain_height(2984089, rpc.clone()) let mut difficulty = DifficultyCache::init_from_chain_height(2985610, rpc.clone())
.await .await
.unwrap(); .unwrap();
/*
let DatabaseResponse::BlockWeights(weights) = rpc
.oneshot(DatabaseRequest::BlockWeights(2985610.into()))
.await
.unwrap()
else {
panic!()
};
println!( assert_eq!(
"{:?}", weights.long_term_weight,
difficulty.next_block_long_term_weight(&HardFork::V15, 175819) difficulty.next_block_long_term_weight(&HardFork::V16, weights.block_weight)
); );
// println!("{:?}", difficulty.next_difficulty(&HardFork::V1)); //774466376 */
println!("{:?}", difficulty.next_difficulty(&HardFork::V16)); //774466376
//let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone()) //let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone())
// .await // .await

View file

@ -8,6 +8,9 @@ pub struct BlockPOWInfo {
pub cumulative_difficulty: u128, pub cumulative_difficulty: u128,
} }
/// Returns if the blocks POW hash is valid for the current difficulty.
///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/difficulty.html#checking-a-blocks-proof-of-work
pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool { pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool {
let int_hash = U256::from_le_slice(hash); let int_hash = U256::from_le_slice(hash);

View file

@ -4,7 +4,7 @@ use std::ops::Range;
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use crate::{hardforks::HardFork, Database, DatabaseRequest, DatabaseResponse, Error}; use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
/// The amount of blocks we account for to calculate difficulty /// The amount of blocks we account for to calculate difficulty
const DIFFICULTY_WINDOW: usize = 720; const DIFFICULTY_WINDOW: usize = 720;
@ -22,7 +22,7 @@ const DIFFICULTY_ACCOUNTED_WINDOW_LEN: usize = DIFFICULTY_WINDOW - 2 * DIFFICULT
/// This struct is able to calculate difficulties from blockchain information. /// This struct is able to calculate difficulties from blockchain information.
#[derive(Debug)] #[derive(Debug)]
pub struct DifficultyCalculator { pub struct DifficultyCache {
/// The list of timestamps in the window. /// The list of timestamps in the window.
/// len <= [`DIFFICULTY_BLOCKS_COUNT`] /// len <= [`DIFFICULTY_BLOCKS_COUNT`]
timestamps: Vec<u64>, timestamps: Vec<u64>,
@ -33,8 +33,8 @@ pub struct DifficultyCalculator {
last_accounted_height: u64, last_accounted_height: u64,
} }
impl DifficultyCalculator { impl DifficultyCache {
pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, Error> { pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database let DatabaseResponse::ChainHeight(chain_height) = database
.ready() .ready()
.await? .await?
@ -44,13 +44,14 @@ impl DifficultyCalculator {
panic!("Database sent incorrect response") panic!("Database sent incorrect response")
}; };
DifficultyCalculator::init_from_chain_height(chain_height, database).await DifficultyCache::init_from_chain_height(chain_height, database).await
} }
#[instrument(name = "init_difficulty_cache", level = "info", skip(database))]
pub async fn init_from_chain_height<D: Database + Clone>( pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64, chain_height: u64,
mut database: D, mut database: D,
) -> Result<Self, Error> { ) -> Result<Self, ConsensusError> {
let mut block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT); let mut block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT);
if block_start == 0 { if block_start == 0 {
@ -60,13 +61,7 @@ impl DifficultyCalculator {
let timestamps = let timestamps =
get_blocks_in_range_timestamps(database.clone(), block_start..chain_height).await?; get_blocks_in_range_timestamps(database.clone(), block_start..chain_height).await?;
tracing::debug!( let mut diff = DifficultyCache {
"Current chain height: {}, accounting for {} blocks timestamps",
chain_height,
timestamps.len()
);
let mut diff = DifficultyCalculator {
timestamps, timestamps,
windowed_work: 0, windowed_work: 0,
last_accounted_height: chain_height - 1, last_accounted_height: chain_height - 1,
@ -74,10 +69,19 @@ impl DifficultyCalculator {
diff.update_windowed_work(&mut database).await?; diff.update_windowed_work(&mut database).await?;
tracing::info!(
"Current chain height: {}, accounting for {} blocks timestamps",
chain_height,
diff.timestamps.len()
);
Ok(diff) Ok(diff)
} }
pub async fn resync<D: Database + Clone>(&mut self, mut database: D) -> Result<(), Error> { pub async fn resync<D: Database + Clone>(
&mut self,
mut database: D,
) -> Result<(), ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database let DatabaseResponse::ChainHeight(chain_height) = database
.ready() .ready()
.await? .await?
@ -114,7 +118,10 @@ impl DifficultyCalculator {
self.update_windowed_work(database).await self.update_windowed_work(database).await
} }
async fn update_windowed_work<D: Database>(&mut self, mut database: D) -> Result<(), Error> { async fn update_windowed_work<D: Database>(
&mut self,
mut database: D,
) -> Result<(), ConsensusError> {
let mut block_start = let mut block_start =
(self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT); (self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT);
@ -140,6 +147,9 @@ impl DifficultyCalculator {
Ok(()) Ok(())
} }
/// Returns the required difficulty for the next block.
///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/difficulty.html#calculating-difficulty
pub fn next_difficulty(&self, hf: &HardFork) -> u128 { pub fn next_difficulty(&self, hf: &HardFork) -> u128 {
if self.timestamps.len() <= 1 { if self.timestamps.len() <= 1 {
return 1; return 1;
@ -179,31 +189,22 @@ fn get_window_start_and_end(window_len: usize) -> (usize, usize) {
} }
} }
#[instrument(skip(database))] #[instrument(name = "get_blocks_timestamps", skip(database))]
async fn get_blocks_in_range_timestamps<D: Database + Clone>( async fn get_blocks_in_range_timestamps<D: Database + Clone>(
database: D, database: D,
block_heights: Range<u64>, block_heights: Range<u64>,
) -> Result<Vec<u64>, Error> { ) -> Result<Vec<u64>, ConsensusError> {
let timestamp_fut = FuturesOrdered::from_iter( let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database
block_heights .oneshot(DatabaseRequest::BlockPOWInfoInRange(block_heights))
.map(|height| get_block_timestamp(database.clone(), height).map_ok(move |res| res)),
);
timestamp_fut.try_collect().await
}
async fn get_block_timestamp<D: Database>(database: D, height: u64) -> Result<u64, Error> {
tracing::debug!("Getting block timestamp: {}", height);
let DatabaseResponse::BlockPOWInfo(pow) = database
.oneshot(DatabaseRequest::BlockPOWInfo(height.into()))
.await? .await?
else { else {
panic!("Database service sent incorrect response!"); panic!("Database sent incorrect response");
}; };
Ok(pow.timestamp)
Ok(pow_infos.into_iter().map(|info| info.timestamp).collect())
} }
async fn get_block_cum_diff<D: Database>(database: D, height: u64) -> Result<u128, Error> { async fn get_block_cum_diff<D: Database>(database: D, height: u64) -> Result<u128, ConsensusError> {
let DatabaseResponse::BlockPOWInfo(pow) = database let DatabaseResponse::BlockPOWInfo(pow) = database
.oneshot(DatabaseRequest::BlockPOWInfo(height.into())) .oneshot(DatabaseRequest::BlockPOWInfo(height.into()))
.await? .await?

View file

@ -1,11 +1,20 @@
//! # Block Weights
//!
//! This module contains calculations for block weights, including calculating block weight
//! limits, effective medians and long term block weights.
//!
//! For more information please see the [block weights chapter](https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html)
//! in the Monero Book.
//!
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use monero_serai::{block::Block, transaction::Transaction}; use monero_serai::{block::Block, transaction::Transaction};
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use crate::{hardforks::HardFork, Database, DatabaseRequest, DatabaseResponse, Error}; use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
const PENALTY_FREE_ZONE_1: usize = 20000; const PENALTY_FREE_ZONE_1: usize = 20000;
const PENALTY_FREE_ZONE_2: usize = 60000; const PENALTY_FREE_ZONE_2: usize = 60000;
@ -43,9 +52,14 @@ pub fn penalty_free_zone(hf: &HardFork) -> usize {
} }
} }
/// A cache used to calculate block weight limits, the effective median and
/// long term block weights.
///
/// These calculations require a lot of data from the database so by caching
/// this data it reduces the load on the database.
pub struct BlockWeightsCache { pub struct BlockWeightsCache {
/// This list is not sorted. /// This list is not sorted.
short_term_block_weights: Vec<usize>, short_term_block_weights: VecDeque<usize>,
/// This list is sorted. /// This list is sorted.
long_term_weights: Vec<usize>, long_term_weights: Vec<usize>,
/// The height of the top block. /// The height of the top block.
@ -53,7 +67,8 @@ pub struct BlockWeightsCache {
} }
impl BlockWeightsCache { impl BlockWeightsCache {
pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, Error> { /// Initialize the [`BlockWeightsCache`] at the the height of the database.
pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database let DatabaseResponse::ChainHeight(chain_height) = database
.ready() .ready()
.await? .await?
@ -66,10 +81,12 @@ impl BlockWeightsCache {
Self::init_from_chain_height(chain_height, database).await Self::init_from_chain_height(chain_height, database).await
} }
/// Initialize the [`BlockWeightsCache`] at the the given chain height.
#[instrument(name = "init_weight_cache", level = "info", skip(database))]
pub async fn init_from_chain_height<D: Database + Clone>( pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64, chain_height: u64,
database: D, database: D,
) -> Result<Self, Error> { ) -> Result<Self, ConsensusError> {
let mut long_term_weights = get_long_term_weight_in_range( let mut long_term_weights = get_long_term_weight_in_range(
chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height, chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height,
database.clone(), database.clone(),
@ -82,11 +99,14 @@ impl BlockWeightsCache {
long_term_weights.len() long_term_weights.len()
); );
let short_term_block_weights = get_blocks_weight_in_range( let short_term_block_weights: VecDeque<usize> = get_blocks_weight_in_range(
chain_height.saturating_sub(SHORT_TERM_WINDOW)..chain_height, chain_height.saturating_sub(SHORT_TERM_WINDOW)..chain_height,
database, database,
) )
.await?; .await?
.into();
tracing::info!("Initialized block weight cache, chain-height: {:?}, long term weights length: {:?}, short term weights length: {:?}", chain_height, long_term_weights.len(), short_term_block_weights.len());
Ok(BlockWeightsCache { Ok(BlockWeightsCache {
short_term_block_weights, short_term_block_weights,
@ -95,12 +115,111 @@ impl BlockWeightsCache {
}) })
} }
/// Add a new block to the cache.
///
/// The block_height **MUST** be one more than the last height the cache has
/// seen.
pub async fn new_block_added<D: Database>(
&mut self,
block_height: u64,
block_weight: usize,
long_term_weight: usize,
database: &mut D,
) -> Result<(), ConsensusError> {
tracing::debug!(
"Adding new block's {} weights to block cache, weight: {}, long term weight: {}",
block_weight,
block_weight,
long_term_weight
);
assert_eq!(self.tip_height + 1, block_height);
self.tip_height += 1;
match self.long_term_weights.binary_search(&long_term_weight) {
Ok(idx) | Err(idx) => self.long_term_weights.insert(idx, long_term_weight),
};
if let Some(height_to_remove) = block_height.checked_sub(LONG_TERM_WINDOW) {
tracing::debug!(
"Block {} is out of the long term weight window, removing it",
height_to_remove
);
let DatabaseResponse::BlockWeights(weights) = database
.oneshot(DatabaseRequest::BlockWeights(height_to_remove.into()))
.await?
else {
panic!("Database sent incorrect response!");
};
let idx = self
.long_term_weights
.binary_search(&weights.long_term_weight)
.expect("Weight must be in list if in the window");
self.long_term_weights.remove(idx);
}
self.short_term_block_weights.push_back(block_weight);
if self.short_term_block_weights.len() > SHORT_TERM_WINDOW.try_into().unwrap() {
self.short_term_block_weights.pop_front();
}
Ok(())
}
/// Returns the next blocks long term weight.
///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-a-blocks-long-term-weight
pub fn next_block_long_term_weight(&self, hf: &HardFork, block_weight: usize) -> usize { pub fn next_block_long_term_weight(&self, hf: &HardFork, block_weight: usize) -> usize {
calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights) calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights)
} }
/// Returns the effective median weight, used for block reward calculations and to calculate
/// the block weight limit.
///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#calculating-effective-median-weight
pub fn effective_median_block_weight(&self, hf: &HardFork) -> usize {
let mut sorted_short_term_weights: Vec<usize> =
self.short_term_block_weights.clone().into();
sorted_short_term_weights.sort_unstable();
calculate_effective_median_block_weight(
hf,
&sorted_short_term_weights,
&self.long_term_weights,
)
} }
pub fn calculate_block_long_term_weight( /// Returns the block weight limit.
pub fn next_block_weight_limit(&self, hf: &HardFork) -> usize {
2 * self.effective_median_block_weight(hf)
}
}
fn calculate_effective_median_block_weight(
hf: &HardFork,
sorted_short_term_window: &[usize],
sorted_long_term_window: &[usize],
) -> usize {
if hf.in_range(&HardFork::V1, &HardFork::V10) {
return median(sorted_short_term_window);
}
let long_term_median = median(sorted_long_term_window).max(PENALTY_FREE_ZONE_5);
let short_term_median = median(sorted_short_term_window);
let effective_median = if hf.in_range(&HardFork::V10, &HardFork::V15) {
min(
max(PENALTY_FREE_ZONE_5, short_term_median),
50 * long_term_median,
)
} else {
min(
max(long_term_median, short_term_median),
50 * long_term_median,
)
};
effective_median.max(penalty_free_zone(hf))
}
fn calculate_block_long_term_weight(
hf: &HardFork, hf: &HardFork,
block_weight: usize, block_weight: usize,
sorted_long_term_window: &[usize], sorted_long_term_window: &[usize],
@ -142,31 +261,31 @@ fn median(array: &[usize]) -> usize {
} }
} }
#[instrument(skip(database))] #[instrument(name = "get_block_weights", skip(database))]
async fn get_blocks_weight_in_range<D: Database + Clone>( async fn get_blocks_weight_in_range<D: Database + Clone>(
range: Range<u64>, range: Range<u64>,
database: D, database: D,
) -> Result<Vec<usize>, Error> { ) -> Result<Vec<usize>, ConsensusError> {
let DatabaseResponse::BlockWeightsInRange(weights) = database let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range)) .oneshot(DatabaseRequest::BlockWeightsInRange(range))
.await? .await?
else { else {
panic!() panic!("Database sent incorrect response!")
}; };
Ok(weights.into_iter().map(|info| info.block_weight).collect()) Ok(weights.into_iter().map(|info| info.block_weight).collect())
} }
#[instrument(skip(database))] #[instrument(name = "get_long_term_weights", skip(database))]
async fn get_long_term_weight_in_range<D: Database + Clone>( async fn get_long_term_weight_in_range<D: Database + Clone>(
range: Range<u64>, range: Range<u64>,
database: D, database: D,
) -> Result<Vec<usize>, Error> { ) -> Result<Vec<usize>, ConsensusError> {
let DatabaseResponse::BlockWeightsInRange(weights) = database let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range)) .oneshot(DatabaseRequest::BlockWeightsInRange(range))
.await? .await?
else { else {
panic!() panic!("Database sent incorrect response!")
}; };
Ok(weights Ok(weights

View file

@ -1,5 +1,3 @@
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::ops::Range; use std::ops::Range;
@ -7,13 +5,35 @@ use monero_serai::block::BlockHeader;
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use cuprate_common::{BlockID, Network}; use cuprate_common::Network;
use crate::{Database, DatabaseRequest, DatabaseResponse, Error}; use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse};
// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork // https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
const DEFAULT_WINDOW_SIZE: u64 = 10080; // supermajority window check length - a week const DEFAULT_WINDOW_SIZE: u64 = 10080; // supermajority window check length - a week
#[derive(Debug, Clone, Copy)]
pub struct BlockHFInfo {
version: HardFork,
vote: HardFork,
}
impl BlockHFInfo {
pub fn from_block_header(block_header: &BlockHeader) -> Result<BlockHFInfo, ConsensusError> {
BlockHFInfo::from_major_minor(block_header.major_version, block_header.minor_version)
}
pub fn from_major_minor(
major_version: u8,
minor_version: u8,
) -> Result<BlockHFInfo, ConsensusError> {
Ok(BlockHFInfo {
version: HardFork::from_version(&major_version)?,
vote: HardFork::from_vote(&minor_version),
})
}
}
/// An identifier for every hard-fork Monero has had. /// An identifier for every hard-fork Monero has had.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
#[repr(u8)] #[repr(u8)]
@ -41,7 +61,7 @@ impl HardFork {
/// Returns the hard-fork for a blocks `major_version` field. /// Returns the hard-fork for a blocks `major_version` field.
/// ///
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#blocks-version-and-vote /// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#blocks-version-and-vote
pub fn from_version(version: &u8) -> Result<HardFork, Error> { pub fn from_version(version: &u8) -> Result<HardFork, ConsensusError> {
Ok(match version { Ok(match version {
1 => HardFork::V1, 1 => HardFork::V1,
2 => HardFork::V2, 2 => HardFork::V2,
@ -60,7 +80,7 @@ impl HardFork {
15 => HardFork::V15, 15 => HardFork::V15,
16 => HardFork::V16, 16 => HardFork::V16,
_ => { _ => {
return Err(Error::InvalidHardForkVersion( return Err(ConsensusError::InvalidHardForkVersion(
"Version is not a known hard fork", "Version is not a known hard fork",
)) ))
} }
@ -240,7 +260,7 @@ impl HardForks {
pub async fn init<D: Database + Clone>( pub async fn init<D: Database + Clone>(
config: HardForkConfig, config: HardForkConfig,
mut database: D, mut database: D,
) -> Result<Self, Error> ) -> Result<Self, ConsensusError>
where where
D::Future: Send + 'static, D::Future: Send + 'static,
{ {
@ -256,11 +276,6 @@ impl HardForks {
let mut hfs = let mut hfs =
HardForks::init_at_chain_height(config, chain_height, database.clone()).await?; HardForks::init_at_chain_height(config, chain_height, database.clone()).await?;
// This is only needed if the database moves independently of the HardFork class aka if we are checking a node instead of keeping state ourself.
hfs.resync(&mut database).await?;
hfs.check_set_new_hf();
tracing::info!("HardFork state: {:?}", hfs); tracing::info!("HardFork state: {:?}", hfs);
Ok(hfs) Ok(hfs)
@ -270,7 +285,7 @@ impl HardForks {
config: HardForkConfig, config: HardForkConfig,
chain_height: u64, chain_height: u64,
mut database: D, mut database: D,
) -> Result<Self, Error> ) -> Result<Self, ConsensusError>
where where
D::Future: Send + 'static, D::Future: Send + 'static,
{ {
@ -282,10 +297,16 @@ impl HardForks {
debug_assert_eq!(votes.total_votes(), config.window) debug_assert_eq!(votes.total_votes(), config.window)
} }
let latest_header = get_block_header(&mut database, chain_height - 1).await?; let DatabaseResponse::BlockHfInfo(hf_info) = database
.ready()
.await?
.call(DatabaseRequest::BlockPOWInfo((chain_height - 1).into()))
.await?
else {
panic!("Database sent incorrect response!");
};
let current_hardfork = HardFork::from_version(&latest_header.major_version) let current_hardfork = hf_info.version;
.expect("Invalid major version in stored block");
let next_hardfork = current_hardfork.next_fork(); let next_hardfork = current_hardfork.next_fork();
@ -304,69 +325,18 @@ impl HardForks {
Ok(hfs) Ok(hfs)
} }
#[instrument(skip(self, database))] pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool {
async fn resync<D: Database>(&mut self, mut database: D) -> Result<(), Error> { &self.current_hardfork == &block_hf_info.version
let DatabaseResponse::ChainHeight(mut chain_height) = database && &block_hf_info.vote >= &self.current_hardfork
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response")
};
tracing::debug!(
"chain-tip: {}, last height: {}",
chain_height - 1,
self.last_height
);
loop {
while chain_height > self.last_height + 1 {
self.get_and_account_new_block(self.last_height + 1, &mut database)
.await?;
} }
let DatabaseResponse::ChainHeight(c_h) = database pub async fn new_block<D: Database>(
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response")
};
chain_height = c_h;
if chain_height == self.last_height + 1 {
return Ok(());
}
tracing::debug!(
"chain-tip: {}, last height: {}",
chain_height - 1,
self.last_height
);
}
}
async fn get_and_account_new_block<D: Database>(
&mut self, &mut self,
vote: HardFork,
height: u64, height: u64,
mut database: D, mut database: D,
) -> Result<(), Error> { ) -> Result<(), ConsensusError> {
let header = get_block_header(&mut database, height).await?; assert_eq!(self.last_height + 1, height);
self.new_block(HardFork::from_vote(&header.minor_version), height, database)
.await;
Ok(())
}
pub fn check_block_version_vote(&self, version: &HardFork, vote: &HardFork) -> bool {
&self.current_hardfork == version && vote >= &self.current_hardfork
}
pub async fn new_block<D: Database>(&mut self, vote: HardFork, height: u64, mut database: D) {
debug_assert_eq!(self.last_height + 1, height);
self.last_height += 1; self.last_height += 1;
tracing::debug!( tracing::debug!(
@ -377,29 +347,36 @@ impl HardForks {
self.votes.add_vote_for_hf(&vote); self.votes.add_vote_for_hf(&vote);
for offset in self.config.window..self.votes.total_votes() { for height_to_remove in
let header = get_block_header(&mut database, height - offset) (self.config.window..self.votes.total_votes()).map(|offset| height - offset)
.await {
.expect("Error retrieving block we should have in database"); let DatabaseResponse::BlockHfInfo(hf_info) = database
.ready()
.await?
.call(DatabaseRequest::BlockPOWInfo(height_to_remove.into()))
.await?
else {
panic!("Database sent incorrect response!");
};
let vote = HardFork::from_vote(&header.minor_version);
tracing::debug!( tracing::debug!(
"Removing block {} vote ({:?}) as they have left the window", "Removing block {} vote ({:?}) as they have left the window",
height - offset, height_to_remove,
vote hf_info.vote
); );
self.votes.remove_vote_for_hf(&vote); self.votes.remove_vote_for_hf(&hf_info.vote);
} }
if height > self.config.window { if height > self.config.window {
debug_assert_eq!(self.votes.total_votes(), self.config.window); debug_assert_eq!(self.votes.total_votes(), self.config.window);
} }
self.check_set_new_hf() self.check_set_new_hf();
Ok(())
} }
/// Checks if the next hard-fork should be activated and sets it it it should. /// Checks if the next hard-fork should be activated and activates it if it should.
/// ///
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork /// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
fn check_set_new_hf(&mut self) { fn check_set_new_hf(&mut self) {
@ -422,42 +399,26 @@ impl HardForks {
} }
} }
#[instrument(skip(database))] #[instrument(name = "get_votes", skip(database))]
async fn get_votes_in_range<D: Database + Clone>( async fn get_votes_in_range<D: Database + Clone>(
database: D, database: D,
block_heights: Range<u64>, block_heights: Range<u64>,
) -> Result<HFVotes, Error> ) -> Result<HFVotes, ConsensusError>
where where
D::Future: Send + 'static, D::Future: Send + 'static,
{ {
let mut votes = HFVotes::default(); let mut votes = HFVotes::default();
let mut fut = let DatabaseResponse::BlockHfInfoInRange(vote_list) = database
FuturesUnordered::from_iter(block_heights.map(|height| { .oneshot(DatabaseRequest::BlockHfInfoInRange(block_heights))
get_block_header(database.clone(), height).map_ok(move |res| (height, res)) .await?
})); else {
panic!("Database sent incorrect response!");
};
while let Some(res) = fut.next().await { for hf_info in vote_list.into_iter() {
let (height, header): (u64, BlockHeader) = res?; votes.add_vote_for_hf(&hf_info.vote);
let vote = HardFork::from_vote(&header.minor_version);
tracing::debug!("Block vote for height: {} = {:?}", height, vote);
votes.add_vote_for_hf(&HardFork::from_vote(&header.minor_version));
} }
Ok(votes) Ok(votes)
} }
async fn get_block_header<D: Database>(
database: D,
block_id: impl Into<BlockID>,
) -> Result<BlockHeader, Error> {
let DatabaseResponse::BlockHeader(header) = database
.oneshot(DatabaseRequest::BlockHeader(block_id.into()))
.await?
else {
panic!("Database sent incorrect response for block header request")
};
Ok(header)
}

View file

@ -6,7 +6,7 @@ pub mod miner_tx;
pub mod rpc; pub mod rpc;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub enum ConsensusError {
#[error("Invalid hard fork version: {0}")] #[error("Invalid hard fork version: {0}")]
InvalidHardForkVersion(&'static str), InvalidHardForkVersion(&'static str),
#[error("Database error: {0}")] #[error("Database error: {0}")]
@ -25,22 +25,26 @@ impl<T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tow
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum DatabaseRequest { pub enum DatabaseRequest {
BlockHeader(cuprate_common::BlockID), BlockHFInfo(cuprate_common::BlockID),
BlockPOWInfo(cuprate_common::BlockID), BlockPOWInfo(cuprate_common::BlockID),
BlockWeights(cuprate_common::BlockID), BlockWeights(cuprate_common::BlockID),
BlockHfInfoInRange(std::ops::Range<u64>),
BlockWeightsInRange(std::ops::Range<u64>), BlockWeightsInRange(std::ops::Range<u64>),
BlockPOWInfoInRange(std::ops::Range<u64>),
ChainHeight, ChainHeight,
} }
#[derive(Debug)] #[derive(Debug)]
pub enum DatabaseResponse { pub enum DatabaseResponse {
BlockHeader(monero_serai::block::BlockHeader), BlockHfInfo(hardforks::BlockHFInfo),
BlockPOWInfo(block::pow::BlockPOWInfo), BlockPOWInfo(block::pow::BlockPOWInfo),
BlockWeights(block::weight::BlockWeightInfo), BlockWeights(block::weight::BlockWeightInfo),
BlockHfInfoInRange(Vec<hardforks::BlockHFInfo>),
BlockWeightsInRange(Vec<block::weight::BlockWeightInfo>), BlockWeightsInRange(Vec<block::weight::BlockWeightInfo>),
BlockPOWInfoInRange(Vec<block::pow::BlockPOWInfo>),
ChainHeight(u64), ChainHeight(u64),
} }

View file

@ -18,9 +18,10 @@ use cuprate_common::BlockID;
use crate::block::pow::BlockPOWInfo; use crate::block::pow::BlockPOWInfo;
use crate::block::weight::BlockWeightInfo; use crate::block::weight::BlockWeightInfo;
use crate::hardforks::BlockHFInfo;
use crate::{DatabaseRequest, DatabaseResponse}; use crate::{DatabaseRequest, DatabaseResponse};
const MAX_BLOCKS_IN_RANGE: u64 = 25; const MAX_BLOCKS_IN_RANGE: u64 = 50;
#[derive(Clone)] #[derive(Clone)]
pub struct Attempts(u64); pub struct Attempts(u64);
@ -84,38 +85,94 @@ where
let this = self.rpcs.clone(); let this = self.rpcs.clone();
match req { match req {
DatabaseRequest::BlockWeightsInRange(range) => async move { DatabaseRequest::BlockPOWInfoInRange(range) => {
let res_to_weights = |res| { let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockWeightsInRange(range) = res else { let DatabaseResponse::BlockPOWInfoInRange(pow_info) = resp else {
panic!("Incorrect Response!"); panic!("Database sent incorrect response");
}; };
range pow_info
}; };
split_range_request(
this,
range,
DatabaseRequest::BlockPOWInfoInRange,
DatabaseResponse::BlockPOWInfoInRange,
resp_to_ret,
)
}
DatabaseRequest::BlockWeightsInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockWeightsInRange(weights) = resp else {
panic!("Database sent incorrect response");
};
weights
};
split_range_request(
this,
range,
DatabaseRequest::BlockWeightsInRange,
DatabaseResponse::BlockWeightsInRange,
resp_to_ret,
)
}
DatabaseRequest::BlockHfInfoInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockHfInfoInRange(hf_info) = resp else {
panic!("Database sent incorrect response");
};
hf_info
};
split_range_request(
this,
range,
DatabaseRequest::BlockHfInfoInRange,
DatabaseResponse::BlockHfInfoInRange,
resp_to_ret,
)
}
req => this.oneshot(req).boxed(),
}
}
}
fn split_range_request<T, Ret>(
rpc: T,
range: Range<u64>,
req: impl FnOnce(Range<u64>) -> DatabaseRequest + Clone + Send + 'static,
resp: impl FnOnce(Vec<Ret>) -> DatabaseResponse + Send + 'static,
resp_to_ret: impl Fn(DatabaseResponse) -> Vec<Ret> + Copy + Send + 'static,
) -> Pin<Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>>
where
T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
T::Future: Send + 'static,
Ret: Send + 'static,
{
let iter = (0..range.clone().count() as u64) let iter = (0..range.clone().count() as u64)
.step_by(MAX_BLOCKS_IN_RANGE as usize) .step_by(MAX_BLOCKS_IN_RANGE as usize)
.map(|i| { .map(|i| {
let new_range = (range.start + i) let req = req.clone();
..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end)); let new_range =
this.clone() (range.start + i)..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end));
.oneshot(DatabaseRequest::BlockWeightsInRange(new_range)) rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret)
.map_ok(res_to_weights)
}); });
let fut = FuturesOrdered::from_iter(iter); let fut = FuturesOrdered::from_iter(iter);
let mut res = Vec::with_capacity(range.count()); let mut res = Vec::with_capacity(range.count());
async move {
for mut rpc_res in fut.try_collect::<Vec<Vec<_>>>().await?.into_iter() { for mut rpc_res in fut.try_collect::<Vec<Vec<_>>>().await?.into_iter() {
res.append(&mut rpc_res) res.append(&mut rpc_res)
} }
Ok(DatabaseResponse::BlockWeightsInRange(res)) Ok(resp(res))
}
.boxed(),
req => this.oneshot(req).boxed(),
}
} }
.boxed()
} }
enum RpcState<R: RpcConnection> { enum RpcState<R: RpcConnection> {
@ -194,35 +251,18 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
} }
.boxed(), .boxed(),
DatabaseRequest::BlockHeader(id) => match id {
BlockID::Hash(hash) => async move {
let res: Result<_, RpcError> = rpc
.get_block(hash)
.map_ok(|block| DatabaseResponse::BlockHeader(block.header))
.await;
if let Err(e) = &res {
*err_slot.lock().unwrap() = Some(e.clone());
}
res.map_err(Into::into)
}
.boxed(),
BlockID::Height(height) => async move {
let res: Result<_, RpcError> = rpc
.get_block_by_number(height.try_into().unwrap())
.map_ok(|block| DatabaseResponse::BlockHeader(block.header))
.await;
if let Err(e) = &res {
*err_slot.lock().unwrap() = Some(e.clone());
}
res.map_err(Into::into)
}
.boxed(),
},
DatabaseRequest::BlockPOWInfo(id) => get_blocks_pow_info(id, rpc).boxed(), DatabaseRequest::BlockPOWInfo(id) => get_blocks_pow_info(id, rpc).boxed(),
DatabaseRequest::BlockWeights(id) => get_blocks_weight_info(id, rpc).boxed(), DatabaseRequest::BlockWeights(id) => get_blocks_weight_info(id, rpc).boxed(),
DatabaseRequest::BlockHFInfo(id) => get_blocks_hf_info(id, rpc).boxed(),
DatabaseRequest::BlockHfInfoInRange(range) => {
get_blocks_hf_info_in_range(range, rpc).boxed()
}
DatabaseRequest::BlockWeightsInRange(range) => { DatabaseRequest::BlockWeightsInRange(range) => {
get_blocks_weight_info_in_range(range, rpc).boxed() get_blocks_weight_info_in_range(range, rpc).boxed()
} }
DatabaseRequest::BlockPOWInfoInRange(range) => {
get_blocks_pow_info_in_range(range, rpc).boxed()
}
} }
} }
} }
@ -234,6 +274,9 @@ struct BlockInfo {
timestamp: u64, timestamp: u64,
block_weight: usize, block_weight: usize,
long_term_weight: usize, long_term_weight: usize,
major_version: u8,
minor_version: u8,
} }
async fn get_block_info_in_range<R: RpcConnection>( async fn get_block_info_in_range<R: RpcConnection>(
@ -252,7 +295,7 @@ async fn get_block_info_in_range<R: RpcConnection>(
) )
.await?; .await?;
tracing::info!("Retrieved blocks in range: {:?}", range); tracing::debug!("Retrieved blocks in range: {:?}", range);
Ok(res.headers) Ok(res.headers)
} }
@ -303,6 +346,25 @@ async fn get_blocks_weight_info_in_range<R: RpcConnection>(
)) ))
} }
async fn get_blocks_pow_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info_in_range(range, rpc).await?;
Ok(DatabaseResponse::BlockPOWInfoInRange(
info.into_iter()
.map(|info| BlockPOWInfo {
timestamp: info.timestamp,
cumulative_difficulty: u128_from_low_high(
info.cumulative_difficulty,
info.cumulative_difficulty_top64,
),
})
.collect(),
))
}
async fn get_blocks_weight_info<R: RpcConnection>( async fn get_blocks_weight_info<R: RpcConnection>(
id: BlockID, id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>, rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
@ -334,3 +396,29 @@ fn u128_from_low_high(low: u64, high: u64) -> u128 {
let res: u128 = high as u128; let res: u128 = high as u128;
res << 64 | low as u128 res << 64 | low as u128
} }
async fn get_blocks_hf_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
Ok(DatabaseResponse::BlockHfInfo(
BlockHFInfo::from_major_minor(info.major_version, info.minor_version)?,
))
}
async fn get_blocks_hf_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info_in_range(range, rpc).await?;
Ok(DatabaseResponse::BlockHfInfoInRange(
info.into_iter()
.map(|info| {
BlockHFInfo::from_major_minor(info.major_version, info.minor_version).unwrap()
})
.collect(),
))
}