From 6bfc4da4e4bd2222ea2bc057432592d9247de589 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 23 Oct 2023 22:24:02 +0100 Subject: [PATCH] cache more blockchain data to reduce number of RPC calls --- consensus/Cargo.toml | 1 + consensus/src/context.rs | 33 ++++------- consensus/src/context/difficulty.rs | 88 +++++++---------------------- consensus/src/context/hardforks.rs | 68 +++++++++------------- cryptonight/build.rs | 2 + 5 files changed, 59 insertions(+), 133 deletions(-) diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 1f2aa8d..456eafd 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -16,6 +16,7 @@ binaries = [ "tower/retry", "tower/balance", "tower/buffer", + "tower/timeout", "dep:serde_json", "dep:serde", "dep:monero-epee-bin-serde", diff --git a/consensus/src/context.rs b/consensus/src/context.rs index 81d5515..acd5ac2 100644 --- a/consensus/src/context.rs +++ b/consensus/src/context.rs @@ -117,7 +117,6 @@ where chain_height: Arc::new(chain_height.into()), already_generated_coins: Arc::new(already_generated_coins.into()), top_block_hash: Arc::new(top_block_hash.into()), - database, }; let context_svc_update = context_svc.clone(); @@ -130,9 +129,9 @@ where #[derive(Debug, Clone, Copy)] pub struct BlockChainContext { /// The next blocks difficulty. - next_difficulty: u128, + pub next_difficulty: u128, /// The current cumulative difficulty. - cumulative_difficulty: u128, + pub cumulative_difficulty: u128, /// The current effective median block weight. effective_median_weight: usize, /// The median long term block weight. @@ -155,7 +154,7 @@ pub struct BlockChainContext { pub struct BlockChainContextRequest; #[derive(Clone)] -pub struct BlockChainContextService { +pub struct BlockChainContextService { difficulty_cache: Arc>, weight_cache: Arc>, hardfork_state: Arc>, @@ -163,11 +162,9 @@ pub struct BlockChainContextService { chain_height: Arc>, top_block_hash: Arc>, already_generated_coins: Arc>, - - database: D, } -impl Service for BlockChainContextService { +impl Service for BlockChainContextService { type Response = BlockChainContext; type Error = ConsensusError; type Future = @@ -218,20 +215,17 @@ pub struct UpdateBlockchainCacheRequest { pub long_term_weight: usize, pub generated_coins: u64, pub vote: HardFork, + pub cumulative_difficulty: u128, } -impl tower::Service for BlockChainContextService -where - D: Database + Clone + Send + Sync + 'static, - D::Future: Send + 'static, -{ +impl tower::Service for BlockChainContextService { type Response = (); type Error = tower::BoxError; type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.database.poll_ready(cx).map_err(Into::into) + Poll::Ready(Ok(())) } fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future { @@ -243,30 +237,23 @@ where let top_hash = self.top_block_hash.clone(); let already_generated_coins = self.already_generated_coins.clone(); - let database = self.database.clone(); - async move { difficulty_cache .write() .await - .new_block(new.height, new.timestamp, database.clone()) + .new_block(new.height, new.timestamp, new.cumulative_difficulty) .await?; weight_cache .write() .await - .new_block( - new.height, - new.weight, - new.long_term_weight, - database.clone(), - ) + .new_block(new.height, new.weight, new.long_term_weight) .await?; hardfork_state .write() .await - .new_block(new.vote, new.height, database) + .new_block(new.vote, new.height) .await?; *chain_height.write().await = new.height + 1; diff --git a/consensus/src/context/difficulty.rs b/consensus/src/context/difficulty.rs index f6f7faf..c9437ad 100644 --- a/consensus/src/context/difficulty.rs +++ b/consensus/src/context/difficulty.rs @@ -57,11 +57,8 @@ pub struct DifficultyCache { /// The list of timestamps in the window. /// len <= [`DIFFICULTY_BLOCKS_COUNT`] timestamps: VecDeque, - /// The work done in the [`DIFFICULTY_ACCOUNTED_WINDOW_LEN`] window, this is an optimisation - /// so we don't need to keep track of cumulative difficulties as well as timestamps. - windowed_work: u128, /// The current cumulative difficulty of the chain. - cumulative_difficulty: u128, + cumulative_difficulties: VecDeque, /// The last height we accounted for. last_accounted_height: u64, /// The config @@ -100,19 +97,16 @@ impl DifficultyCache { block_start = 1; } - let timestamps = - get_blocks_in_range_timestamps(database.clone(), block_start..chain_height).await?; + let (timestamps, cumulative_difficulties) = + get_blocks_in_pow_info(database.clone(), block_start..chain_height).await?; let mut diff = DifficultyCache { timestamps, - windowed_work: 0, - cumulative_difficulty: 0, + cumulative_difficulties, last_accounted_height: chain_height - 1, config, }; - diff.update_windowed_work(database).await?; - tracing::info!( "Current chain height: {}, accounting for {} blocks timestamps", chain_height, @@ -122,61 +116,23 @@ impl DifficultyCache { Ok(diff) } - pub async fn new_block( + pub async fn new_block( &mut self, height: u64, timestamp: u64, - database: D, + cumulative_difficulty: u128, ) -> Result<(), ConsensusError> { assert_eq!(self.last_accounted_height + 1, height); self.last_accounted_height += 1; self.timestamps.push_back(timestamp); + self.cumulative_difficulties + .push_back(cumulative_difficulty); if u64::try_from(self.timestamps.len()).unwrap() > self.config.total_block_count() { self.timestamps.pop_front(); + self.cumulative_difficulties.pop_front(); } - self.update_windowed_work(database).await?; - - Ok(()) - } - - async fn update_windowed_work( - &mut self, - mut database: D, - ) -> Result<(), ConsensusError> { - if self.last_accounted_height == 0 { - return Ok(()); - } - - let mut block_start = - (self.last_accounted_height + 1).saturating_sub(self.config.total_block_count()); - - // skip the genesis block - if block_start == 0 { - block_start = 1; - } - - let (start, end) = - get_window_start_and_end(self.timestamps.len(), self.config.accounted_window_len()); - - let low_cumulative_difficulty = get_block_cum_diff( - &mut database, - block_start + TryInto::::try_into(start).unwrap(), - ) - .await?; - - let high_cumulative_difficulty = get_block_cum_diff( - &mut database, - block_start + TryInto::::try_into(end).unwrap() - 1, - ) - .await?; - - let chain_cumulative_difficulty = - get_block_cum_diff(&mut database, self.last_accounted_height).await?; - - self.cumulative_difficulty = chain_cumulative_difficulty; - self.windowed_work = high_cumulative_difficulty - low_cumulative_difficulty; Ok(()) } @@ -200,11 +156,14 @@ impl DifficultyCache { let mut time_span = u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]); + let windowed_work = self.cumulative_difficulties[window_end - 1] + - self.cumulative_difficulties[window_start]; + if time_span == 0 { time_span = 1; } - (self.windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span + (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span } /// Returns the median timestamp over the last `numb_blocks`. @@ -222,7 +181,7 @@ impl DifficultyCache { /// Returns the cumulative difficulty of the chain. pub fn cumulative_difficulty(&self) -> u128 { - self.cumulative_difficulty + self.cumulative_difficulties.back().copied().unwrap_or(0) } pub fn top_block_timestamp(&self) -> Option { @@ -246,10 +205,10 @@ fn get_window_start_and_end(window_len: usize, accounted_window: usize) -> (usiz } #[instrument(name = "get_blocks_timestamps", skip(database), level = "info")] -async fn get_blocks_in_range_timestamps( +async fn get_blocks_in_pow_info( database: D, block_heights: Range, -) -> Result, ConsensusError> { +) -> Result<(VecDeque, VecDeque), ConsensusError> { tracing::info!("Getting blocks timestamps"); let DatabaseResponse::BlockExtendedHeaderInRange(ext_header) = database @@ -259,15 +218,8 @@ async fn get_blocks_in_range_timestamps( panic!("Database sent incorrect response"); }; - Ok(ext_header.into_iter().map(|info| info.timestamp).collect()) -} - -async fn get_block_cum_diff(database: D, height: u64) -> Result { - let DatabaseResponse::BlockExtendedHeader(ext_header) = database - .oneshot(DatabaseRequest::BlockExtendedHeader(height.into())) - .await? - else { - panic!("Database service sent incorrect response!"); - }; - Ok(ext_header.cumulative_difficulty) + Ok(ext_header + .into_iter() + .map(|info| (info.timestamp, info.cumulative_difficulty)) + .unzip()) } diff --git a/consensus/src/context/hardforks.rs b/consensus/src/context/hardforks.rs index 2b17459..9e57a3d 100644 --- a/consensus/src/context/hardforks.rs +++ b/consensus/src/context/hardforks.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, fmt::{Display, Formatter}, ops::Range, time::Duration, @@ -192,9 +193,11 @@ impl HardFork { } /// A struct holding the current voting state of the blockchain. -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] struct HFVotes { votes: [u64; NUMB_OF_HARD_FORKS], + vote_list: VecDeque, + window_size: usize, } impl Display for HFVotes { @@ -222,19 +225,22 @@ impl Display for HFVotes { } impl HFVotes { - /// Add votes for a hard-fork - pub fn add_votes_for_hf(&mut self, hf: &HardFork, votes: u64) { - self.votes[*hf as usize - 1] += votes; + pub fn new(window_size: usize) -> HFVotes { + HFVotes { + votes: [0; NUMB_OF_HARD_FORKS], + vote_list: VecDeque::with_capacity(window_size), + window_size, + } } - /// Add a vote for a hard-fork. + /// Add a vote for a hard-fork, this function removes votes outside of the window. pub fn add_vote_for_hf(&mut self, hf: &HardFork) { - self.add_votes_for_hf(hf, 1) - } - - /// Remove a vote for a hard-fork. - pub fn remove_vote_for_hf(&mut self, hf: &HardFork) { - self.votes[*hf as usize - 1] -= 1; + self.vote_list.push_back(*hf); + self.votes[*hf as usize - 1] += 1; + if self.vote_list.len() > self.window_size { + let hf = self.vote_list.pop_front().unwrap(); + self.votes[hf as usize - 1] -= 1; + } } /// Returns the total votes for a hard-fork. @@ -291,7 +297,12 @@ impl HardForkState { let block_start = chain_height.saturating_sub(config.window); - let votes = get_votes_in_range(database.clone(), block_start..chain_height).await?; + let votes = get_votes_in_range( + database.clone(), + block_start..chain_height, + usize::try_from(config.window).unwrap(), + ) + .await?; if chain_height > config.window { debug_assert_eq!(votes.total_votes(), config.window) @@ -331,12 +342,7 @@ impl HardForkState { Ok(hfs) } - pub async fn new_block( - &mut self, - vote: HardFork, - height: u64, - mut database: D, - ) -> Result<(), ConsensusError> { + pub async fn new_block(&mut self, vote: HardFork, height: u64) -> Result<(), ConsensusError> { assert_eq!(self.last_height + 1, height); self.last_height += 1; @@ -348,29 +354,6 @@ impl HardForkState { self.votes.add_vote_for_hf(&vote); - for height_to_remove in - (self.config.window..self.votes.total_votes()).map(|offset| height - offset) - { - let DatabaseResponse::BlockExtendedHeader(ext_header) = database - .ready() - .await? - .call(DatabaseRequest::BlockExtendedHeader( - height_to_remove.into(), - )) - .await? - else { - panic!("Database sent incorrect response!"); - }; - - tracing::debug!( - "Removing block {} vote ({:?}) as they have left the window", - height_to_remove, - ext_header.vote - ); - - self.votes.remove_vote_for_hf(&ext_header.vote); - } - if height > self.config.window { debug_assert_eq!(self.votes.total_votes(), self.config.window); } @@ -418,8 +401,9 @@ pub fn votes_needed(threshold: u64, window: u64) -> u64 { async fn get_votes_in_range( database: D, block_heights: Range, + window_size: usize, ) -> Result { - let mut votes = HFVotes::default(); + let mut votes = HFVotes::new(window_size); let DatabaseResponse::BlockExtendedHeaderInRange(vote_list) = database .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights)) diff --git a/cryptonight/build.rs b/cryptonight/build.rs index 7fd2c95..92a7fea 100644 --- a/cryptonight/build.rs +++ b/cryptonight/build.rs @@ -21,5 +21,7 @@ fn main() { .file("c/CryptonightR_JIT.c") .file("c/CryptonightR_template.S") .flag("-maes") + .flag("-Ofast") + .flag("-fexceptions") .compile("cryptonight") }