cache more blockchain data to reduce number of RPC calls

Boog900 2023-10-23 22:24:02 +01:00
parent eeefe49d63
commit 6bfc4da4e4
5 changed files with 59 additions and 133 deletions

View file

@@ -16,6 +16,7 @@ binaries = [
     "tower/retry",
     "tower/balance",
     "tower/buffer",
+    "tower/timeout",
     "dep:serde_json",
     "dep:serde",
     "dep:monero-epee-bin-serde",

View file

@@ -117,7 +117,6 @@ where
         chain_height: Arc::new(chain_height.into()),
         already_generated_coins: Arc::new(already_generated_coins.into()),
         top_block_hash: Arc::new(top_block_hash.into()),
-        database,
     };

     let context_svc_update = context_svc.clone();
@@ -130,9 +129,9 @@ where
 #[derive(Debug, Clone, Copy)]
 pub struct BlockChainContext {
     /// The next blocks difficulty.
-    next_difficulty: u128,
+    pub next_difficulty: u128,
     /// The current cumulative difficulty.
-    cumulative_difficulty: u128,
+    pub cumulative_difficulty: u128,
     /// The current effective median block weight.
     effective_median_weight: usize,
     /// The median long term block weight.
@@ -155,7 +154,7 @@ pub struct BlockChainContext
 pub struct BlockChainContextRequest;

 #[derive(Clone)]
-pub struct BlockChainContextService<D> {
+pub struct BlockChainContextService {
     difficulty_cache: Arc<RwLock<difficulty::DifficultyCache>>,
     weight_cache: Arc<RwLock<weight::BlockWeightsCache>>,
     hardfork_state: Arc<RwLock<hardforks::HardForkState>>,
@@ -163,11 +162,9 @@ pub struct BlockChainContextService<D> {
     chain_height: Arc<RwLock<u64>>,
     top_block_hash: Arc<RwLock<[u8; 32]>>,
     already_generated_coins: Arc<RwLock<u64>>,
-
-    database: D,
 }

-impl<D> Service<BlockChainContextRequest> for BlockChainContextService<D> {
+impl Service<BlockChainContextRequest> for BlockChainContextService {
     type Response = BlockChainContext;
     type Error = ConsensusError;
     type Future =
@@ -218,20 +215,17 @@ pub struct UpdateBlockchainCacheRequest {
     pub long_term_weight: usize,
     pub generated_coins: u64,
     pub vote: HardFork,
+    pub cumulative_difficulty: u128,
 }

-impl<D> tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService<D>
-where
-    D: Database + Clone + Send + Sync + 'static,
-    D::Future: Send + 'static,
-{
+impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
     type Response = ();
     type Error = tower::BoxError;
     type Future =
         Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        self.database.poll_ready(cx).map_err(Into::into)
+        Poll::Ready(Ok(()))
     }

     fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future {
@@ -243,30 +237,23 @@ where
         let top_hash = self.top_block_hash.clone();
         let already_generated_coins = self.already_generated_coins.clone();

-        let database = self.database.clone();
-
         async move {
             difficulty_cache
                 .write()
                 .await
-                .new_block(new.height, new.timestamp, database.clone())
+                .new_block(new.height, new.timestamp, new.cumulative_difficulty)
                 .await?;

             weight_cache
                 .write()
                 .await
-                .new_block(
-                    new.height,
-                    new.weight,
-                    new.long_term_weight,
-                    database.clone(),
-                )
+                .new_block(new.height, new.weight, new.long_term_weight)
                 .await?;

             hardfork_state
                 .write()
                 .await
-                .new_block(new.vote, new.height, database)
+                .new_block(new.vote, new.height)
                 .await?;

             *chain_height.write().await = new.height + 1;
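Side note, not part of the diff: the two `BlockChainContext` fields made `pub` above are what let a caller fill in the new `cumulative_difficulty` field on `UpdateBlockchainCacheRequest` without an extra database or RPC round trip, since the chain's cumulative difficulty after a block is simply the previous cumulative difficulty plus that block's difficulty. A minimal sketch of that bookkeeping, using an illustrative stand-in struct rather than the crate's real types:

// Illustrative only: mirrors the now-public fields on BlockChainContext.
struct ContextView {
    next_difficulty: u128,
    cumulative_difficulty: u128,
}

// The value a block verifier could hand back as the request's
// `cumulative_difficulty` once a block meeting `next_difficulty` is accepted.
fn cumulative_difficulty_after_next_block(ctx: &ContextView) -> u128 {
    ctx.cumulative_difficulty + ctx.next_difficulty
}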

View file

@@ -57,11 +57,8 @@ pub struct DifficultyCache {
     /// The list of timestamps in the window.
     /// len <= [`DIFFICULTY_BLOCKS_COUNT`]
     timestamps: VecDeque<u64>,
-    /// The work done in the [`DIFFICULTY_ACCOUNTED_WINDOW_LEN`] window, this is an optimisation
-    /// so we don't need to keep track of cumulative difficulties as well as timestamps.
-    windowed_work: u128,
     /// The current cumulative difficulty of the chain.
-    cumulative_difficulty: u128,
+    cumulative_difficulties: VecDeque<u128>,
     /// The last height we accounted for.
     last_accounted_height: u64,
     /// The config
@@ -100,19 +97,16 @@ impl DifficultyCache {
             block_start = 1;
         }

-        let timestamps =
-            get_blocks_in_range_timestamps(database.clone(), block_start..chain_height).await?;
+        let (timestamps, cumulative_difficulties) =
+            get_blocks_in_pow_info(database.clone(), block_start..chain_height).await?;

         let mut diff = DifficultyCache {
             timestamps,
-            windowed_work: 0,
-            cumulative_difficulty: 0,
+            cumulative_difficulties,
             last_accounted_height: chain_height - 1,
             config,
         };

-        diff.update_windowed_work(database).await?;
-
         tracing::info!(
             "Current chain height: {}, accounting for {} blocks timestamps",
             chain_height,
@@ -122,61 +116,23 @@ impl DifficultyCache {
         Ok(diff)
     }

-    pub async fn new_block<D: Database>(
+    pub async fn new_block(
         &mut self,
         height: u64,
         timestamp: u64,
-        database: D,
+        cumulative_difficulty: u128,
     ) -> Result<(), ConsensusError> {
         assert_eq!(self.last_accounted_height + 1, height);
         self.last_accounted_height += 1;

         self.timestamps.push_back(timestamp);
+        self.cumulative_difficulties
+            .push_back(cumulative_difficulty);

         if u64::try_from(self.timestamps.len()).unwrap() > self.config.total_block_count() {
             self.timestamps.pop_front();
+            self.cumulative_difficulties.pop_front();
         }

-        self.update_windowed_work(database).await?;
-        Ok(())
-    }
-
-    async fn update_windowed_work<D: Database>(
-        &mut self,
-        mut database: D,
-    ) -> Result<(), ConsensusError> {
-        if self.last_accounted_height == 0 {
-            return Ok(());
-        }
-
-        let mut block_start =
-            (self.last_accounted_height + 1).saturating_sub(self.config.total_block_count());
-
-        // skip the genesis block
-        if block_start == 0 {
-            block_start = 1;
-        }
-
-        let (start, end) =
-            get_window_start_and_end(self.timestamps.len(), self.config.accounted_window_len());
-
-        let low_cumulative_difficulty = get_block_cum_diff(
-            &mut database,
-            block_start + TryInto::<u64>::try_into(start).unwrap(),
-        )
-        .await?;
-
-        let high_cumulative_difficulty = get_block_cum_diff(
-            &mut database,
-            block_start + TryInto::<u64>::try_into(end).unwrap() - 1,
-        )
-        .await?;
-
-        let chain_cumulative_difficulty =
-            get_block_cum_diff(&mut database, self.last_accounted_height).await?;
-
-        self.cumulative_difficulty = chain_cumulative_difficulty;
-        self.windowed_work = high_cumulative_difficulty - low_cumulative_difficulty;
-
         Ok(())
     }
@@ -200,11 +156,14 @@ impl DifficultyCache {
         let mut time_span =
             u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]);

+        let windowed_work = self.cumulative_difficulties[window_end - 1]
+            - self.cumulative_difficulties[window_start];
+
         if time_span == 0 {
             time_span = 1;
         }

-        (self.windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
+        (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
     }

     /// Returns the median timestamp over the last `numb_blocks`.
@@ -222,7 +181,7 @@ impl DifficultyCache {

     /// Returns the cumulative difficulty of the chain.
     pub fn cumulative_difficulty(&self) -> u128 {
-        self.cumulative_difficulty
+        self.cumulative_difficulties.back().copied().unwrap_or(0)
     }

     pub fn top_block_timestamp(&self) -> Option<u64> {
@@ -246,10 +205,10 @@ fn get_window_start_and_end(window_len: usize, accounted_window: usize) -> (usiz
 }

 #[instrument(name = "get_blocks_timestamps", skip(database), level = "info")]
-async fn get_blocks_in_range_timestamps<D: Database + Clone>(
+async fn get_blocks_in_pow_info<D: Database + Clone>(
     database: D,
     block_heights: Range<u64>,
-) -> Result<VecDeque<u64>, ConsensusError> {
+) -> Result<(VecDeque<u64>, VecDeque<u128>), ConsensusError> {
     tracing::info!("Getting blocks timestamps");

     let DatabaseResponse::BlockExtendedHeaderInRange(ext_header) = database
@@ -259,15 +218,8 @@ async fn get_blocks_in_range_timestamps<D: Database + Clone>(
         panic!("Database sent incorrect response");
     };

-    Ok(ext_header.into_iter().map(|info| info.timestamp).collect())
-}
-
-async fn get_block_cum_diff<D: Database>(database: D, height: u64) -> Result<u128, ConsensusError> {
-    let DatabaseResponse::BlockExtendedHeader(ext_header) = database
-        .oneshot(DatabaseRequest::BlockExtendedHeader(height.into()))
-        .await?
-    else {
-        panic!("Database service sent incorrect response!");
-    };
-    Ok(ext_header.cumulative_difficulty)
+    Ok(ext_header
+        .into_iter()
+        .map(|info| (info.timestamp, info.cumulative_difficulty))
+        .unzip())
 }
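For reference (not part of the commit), the next-difficulty calculation now works entirely from the cached deques: the windowed work is just the difference between the cumulative difficulties at the window edges, and the rest of the formula is unchanged. A self-contained sketch of that arithmetic, with slices standing in for the cache's `VecDeque`s and an illustrative function name:

// Illustrative sketch of the windowed-work arithmetic used above; not the
// DifficultyCache implementation itself.
fn next_difficulty_sketch(
    sorted_timestamps: &[u64],        // timestamps in the accounted window, sorted
    cumulative_difficulties: &[u128], // cumulative difficulty of each block in the window
    window_start: usize,
    window_end: usize,
    target_seconds: u64,
) -> u128 {
    // Work done across the window, read straight from the cached cumulative difficulties.
    let windowed_work =
        cumulative_difficulties[window_end - 1] - cumulative_difficulties[window_start];

    // Time the window spanned, clamped to at least one second.
    let mut time_span =
        u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]);
    if time_span == 0 {
        time_span = 1;
    }

    // Difficulty = observed hash rate (work / time) * target block time, rounded up.
    (windowed_work * u128::from(target_seconds) + time_span - 1) / time_span
}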

View file

@@ -1,4 +1,5 @@
 use std::{
+    collections::VecDeque,
     fmt::{Display, Formatter},
     ops::Range,
     time::Duration,
@@ -192,9 +193,11 @@ impl HardFork {
 }

 /// A struct holding the current voting state of the blockchain.
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Clone)]
 struct HFVotes {
     votes: [u64; NUMB_OF_HARD_FORKS],
+    vote_list: VecDeque<HardFork>,
+    window_size: usize,
 }

 impl Display for HFVotes {
@@ -222,19 +225,22 @@ impl Display for HFVotes {
 }

 impl HFVotes {
-    /// Add votes for a hard-fork
-    pub fn add_votes_for_hf(&mut self, hf: &HardFork, votes: u64) {
-        self.votes[*hf as usize - 1] += votes;
+    pub fn new(window_size: usize) -> HFVotes {
+        HFVotes {
+            votes: [0; NUMB_OF_HARD_FORKS],
+            vote_list: VecDeque::with_capacity(window_size),
+            window_size,
+        }
     }

-    /// Add a vote for a hard-fork.
+    /// Add a vote for a hard-fork, this function removes votes outside of the window.
     pub fn add_vote_for_hf(&mut self, hf: &HardFork) {
-        self.add_votes_for_hf(hf, 1)
-    }
-
-    /// Remove a vote for a hard-fork.
-    pub fn remove_vote_for_hf(&mut self, hf: &HardFork) {
-        self.votes[*hf as usize - 1] -= 1;
+        self.vote_list.push_back(*hf);
+        self.votes[*hf as usize - 1] += 1;
+        if self.vote_list.len() > self.window_size {
+            let hf = self.vote_list.pop_front().unwrap();
+            self.votes[hf as usize - 1] -= 1;
+        }
     }

     /// Returns the total votes for a hard-fork.
@@ -291,7 +297,12 @@ impl HardForkState {
         let block_start = chain_height.saturating_sub(config.window);

-        let votes = get_votes_in_range(database.clone(), block_start..chain_height).await?;
+        let votes = get_votes_in_range(
+            database.clone(),
+            block_start..chain_height,
+            usize::try_from(config.window).unwrap(),
+        )
+        .await?;

         if chain_height > config.window {
             debug_assert_eq!(votes.total_votes(), config.window)
@ -331,12 +342,7 @@ impl HardForkState {
Ok(hfs) Ok(hfs)
} }
pub async fn new_block<D: Database>( pub async fn new_block(&mut self, vote: HardFork, height: u64) -> Result<(), ConsensusError> {
&mut self,
vote: HardFork,
height: u64,
mut database: D,
) -> Result<(), ConsensusError> {
assert_eq!(self.last_height + 1, height); assert_eq!(self.last_height + 1, height);
self.last_height += 1; self.last_height += 1;
@@ -348,29 +354,6 @@ impl HardForkState {
         self.votes.add_vote_for_hf(&vote);

-        for height_to_remove in
-            (self.config.window..self.votes.total_votes()).map(|offset| height - offset)
-        {
-            let DatabaseResponse::BlockExtendedHeader(ext_header) = database
-                .ready()
-                .await?
-                .call(DatabaseRequest::BlockExtendedHeader(
-                    height_to_remove.into(),
-                ))
-                .await?
-            else {
-                panic!("Database sent incorrect response!");
-            };
-
-            tracing::debug!(
-                "Removing block {} vote ({:?}) as they have left the window",
-                height_to_remove,
-                ext_header.vote
-            );
-
-            self.votes.remove_vote_for_hf(&ext_header.vote);
-        }
-
         if height > self.config.window {
             debug_assert_eq!(self.votes.total_votes(), self.config.window);
         }
@@ -418,8 +401,9 @@ pub fn votes_needed(threshold: u64, window: u64) -> u64 {
 async fn get_votes_in_range<D: Database>(
     database: D,
     block_heights: Range<u64>,
+    window_size: usize,
 ) -> Result<HFVotes, ConsensusError> {
-    let mut votes = HFVotes::default();
+    let mut votes = HFVotes::new(window_size);

     let DatabaseResponse::BlockExtendedHeaderInRange(vote_list) = database
         .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights))
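As context (not part of the commit), the reworked `HFVotes` keeps the whole vote window in memory: each new vote is pushed onto a `VecDeque`, and once the deque grows past `window_size` the oldest vote is popped and its tally decremented, which is what makes the per-block `BlockExtendedHeader` lookups above unnecessary. A small standalone model of that sliding-window tally, using plain indices instead of the crate's `HardFork` enum and an arbitrary array size in place of NUMB_OF_HARD_FORKS:

use std::collections::VecDeque;

// Toy model of the sliding-window vote tally shown in this diff.
struct WindowVotes {
    votes: [u64; 16],
    vote_list: VecDeque<usize>,
    window_size: usize,
}

impl WindowVotes {
    fn new(window_size: usize) -> Self {
        WindowVotes {
            votes: [0; 16],
            vote_list: VecDeque::with_capacity(window_size),
            window_size,
        }
    }

    // Record a vote and, if the window is now too large, retire the oldest one.
    fn add_vote(&mut self, hf_index: usize) {
        self.vote_list.push_back(hf_index);
        self.votes[hf_index] += 1;
        if self.vote_list.len() > self.window_size {
            let old = self.vote_list.pop_front().unwrap();
            self.votes[old] -= 1;
        }
    }
}

fn main() {
    let mut votes = WindowVotes::new(3);
    for v in [1, 1, 2, 2] {
        votes.add_vote(v);
    }
    // The first vote for fork 1 has left the 3-vote window.
    assert_eq!(votes.votes[1], 1);
    assert_eq!(votes.votes[2], 2);
}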

View file

@@ -21,5 +21,7 @@ fn main() {
         .file("c/CryptonightR_JIT.c")
         .file("c/CryptonightR_template.S")
         .flag("-maes")
+        .flag("-Ofast")
+        .flag("-fexceptions")
         .compile("cryptonight")
 }