From ab3c496bbd38c56bd025eff5df96113cb1da5bc8 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Wed, 6 Sep 2023 15:54:49 +0100 Subject: [PATCH] add difficulty calculations --- Cargo.toml | 2 + common/src/pruning.rs | 4 +- consensus/Cargo.toml | 6 +- consensus/src/bin/scan_chain.rs | 25 +++- consensus/src/hardforks.rs | 59 ++++++--- consensus/src/lib.rs | 7 +- consensus/src/pow.rs | 28 ++++ consensus/src/pow/difficulty.rs | 221 ++++++++++++++++++++++++++++++++ consensus/src/rpc.rs | 61 ++++++++- net/levin/src/codec.rs | 7 +- 10 files changed, 388 insertions(+), 32 deletions(-) create mode 100644 consensus/src/pow.rs create mode 100644 consensus/src/pow/difficulty.rs diff --git a/Cargo.toml b/Cargo.toml index 02ef432c..56f4eb88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] +resolver = "2" members = [ "common", @@ -13,3 +14,4 @@ members = [ # "p2p/sync-states" ] + diff --git a/common/src/pruning.rs b/common/src/pruning.rs index d4575696..ccd1d55e 100644 --- a/common/src/pruning.rs +++ b/common/src/pruning.rs @@ -126,7 +126,7 @@ impl PruningSeed { Err(PruningError::BlockChainHeightTooLarge) } else { let Some(seed_stripe) = self.get_stripe() else { - // If the `get_stripe` returns None that means no pruning so the next + // If the `get_stripe` returns None that means no pruning so the next // unpruned block is the one inputted. return Ok(block_height); }; @@ -192,7 +192,7 @@ impl PruningSeed { blockchain_height: u64, ) -> Result { let Some(seed_stripe) = self.get_stripe() else { - // If the `get_stripe` returns None that means no pruning so the next + // If the `get_stripe` returns None that means no pruning so the next // pruned block is nonexistent so we return the blockchain_height. return Ok(blockchain_height); }; diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 4d0e927d..a87c9180 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [features] default = ["binaries"] binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer"] -rpc = ["dep:futures"] +rpc = ["dep:futures", "dep:serde_json", "dep:serde"] [dependencies] hex = "0.4" @@ -18,6 +18,8 @@ thiserror = "1" tower = {version = "0.4", features = ["util"]} tracing = "0.1" +crypto-bigint = "0.5" + randomx-rs = "1" monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "84b77b1"} @@ -26,6 +28,8 @@ cryptonight-cuprate = {path = "../cryptonight"} # used for rpc futures = {version = "0.3", optional = true} +serde_json = {version = "1", optional = true} +serde = {version = "1", optional = true, features = ["derive"]} # used in binaries tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } tracing-subscriber = {version = "0.3", optional = true} diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 8d71db7d..c26b9610 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -8,10 +8,13 @@ use std::task::{Context, Poll}; use tower::balance::p2c::Balance; use tower::discover::Change; use tower::util::BoxService; +use tower::{Service, ServiceExt}; +use monero_consensus::DatabaseRequest; use tracing::level_filters::LevelFilter; -use monero_consensus::hardforks::{HardForkConfig, HardForks}; +use monero_consensus::hardforks::HardFork; +use monero_consensus::pow::difficulty::DifficultyCalculator; use monero_consensus::rpc::Rpc; struct RpcDiscoverer(Vec, u64); @@ -77,9 +80,25 @@ async fn main() { ); let rpc_balance = Balance::new(rpc_discoverer); let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3); - let rpc = tower::retry::Retry::new(Attempts(3), rpc_buffer); + let mut rpc = tower::retry::Retry::new(Attempts(3), rpc_buffer); - let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone()) + let pow_info = rpc + .ready() + .await + .unwrap() + .call(DatabaseRequest::BlockPOWInfo(64.into())) .await .unwrap(); + + println!("{pow_info:?}"); + + let difficulty = DifficultyCalculator::init_from_chain_height(2968227, rpc.clone()) + .await + .unwrap(); + + println!("{:?}", difficulty.next_difficulty(&HardFork::V16)); //257344482654 + + //let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone()) + // .await + // .unwrap(); } diff --git a/consensus/src/hardforks.rs b/consensus/src/hardforks.rs index 6d1abfb9..923e14d3 100644 --- a/consensus/src/hardforks.rs +++ b/consensus/src/hardforks.rs @@ -1,5 +1,6 @@ use futures::stream::FuturesUnordered; use futures::{StreamExt, TryFutureExt}; +use std::fmt::{Display, Formatter}; use std::ops::Range; use monero_serai::block::BlockHeader; @@ -154,6 +155,30 @@ struct HFVotes { votes: [u64; 16], } +impl Display for HFVotes { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HFVotes") + .field("total", &self.total_votes()) + .field("V1", &self.votes_for_hf(&HardFork::V1)) + .field("V2", &self.votes_for_hf(&HardFork::V2)) + .field("V3", &self.votes_for_hf(&HardFork::V3)) + .field("V4", &self.votes_for_hf(&HardFork::V4)) + .field("V5", &self.votes_for_hf(&HardFork::V5)) + .field("V6", &self.votes_for_hf(&HardFork::V6)) + .field("V7", &self.votes_for_hf(&HardFork::V7)) + .field("V8", &self.votes_for_hf(&HardFork::V8)) + .field("V9", &self.votes_for_hf(&HardFork::V9)) + .field("V10", &self.votes_for_hf(&HardFork::V10)) + .field("V11", &self.votes_for_hf(&HardFork::V11)) + .field("V12", &self.votes_for_hf(&HardFork::V12)) + .field("V13", &self.votes_for_hf(&HardFork::V13)) + .field("V14", &self.votes_for_hf(&HardFork::V14)) + .field("V15", &self.votes_for_hf(&HardFork::V15)) + .field("V16", &self.votes_for_hf(&HardFork::V16)) + .finish() + } +} + impl HFVotes { /// Add votes for a hard-fork pub fn add_votes_for_hf(&mut self, hf: &HardFork, votes: u64) { @@ -173,7 +198,7 @@ impl HFVotes { /// Returns the total votes for a hard-fork. /// /// http://localhost:3000/consensus_rules/hardforks.html#accepting-a-fork - pub fn get_votes_for_hf(&self, hf: &HardFork) -> u64 { + pub fn votes_for_hf(&self, hf: &HardFork) -> u64 { self.votes[*hf as usize - 1..].iter().sum() } @@ -223,12 +248,13 @@ impl HardForks { D::Future: Send + 'static, { let DatabaseResponse::ChainHeight(chain_height) = database - .ready() - .await? - .call(DatabaseRequest::ChainHeight) - .await? else { - panic!("Database sent incorrect response") - }; + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? + else { + panic!("Database sent incorrect response") + }; let mut hfs = HardForks::init_at_chain_height(config, chain_height, database.clone()).await?; @@ -251,13 +277,9 @@ impl HardForks { where D::Future: Send + 'static, { - let block_heights = if chain_height > config.window { - chain_height - config.window..chain_height - } else { - 0..chain_height - }; + let block_start = chain_height.saturating_sub(config.window); - let votes = get_votes_in_range(database.clone(), block_heights).await?; + let votes = get_votes_in_range(database.clone(), block_start..chain_height).await?; if chain_height > config.window { debug_assert_eq!(votes.total_votes(), config.window) @@ -291,7 +313,8 @@ impl HardForks { .ready() .await? .call(DatabaseRequest::ChainHeight) - .await? else { + .await? + else { panic!("Database sent incorrect response") }; @@ -311,7 +334,8 @@ impl HardForks { .ready() .await? .call(DatabaseRequest::ChainHeight) - .await? else { + .await? + else { panic!("Database sent incorrect response") }; chain_height = c_h; @@ -381,7 +405,7 @@ impl HardForks { fn check_set_new_hf(&mut self) { while let Some(new_hf) = self.next_hardfork { if self.last_height + 1 >= new_hf.fork_height(&self.config.network) - && self.votes.get_votes_for_hf(&new_hf) + && self.votes.votes_for_hf(&new_hf) >= new_hf.votes_needed(&self.config.network, self.config.window) { self.set_hf(new_hf); @@ -430,7 +454,8 @@ async fn get_block_header( ) -> Result { let DatabaseResponse::BlockHeader(header) = database .oneshot(DatabaseRequest::BlockHeader(block_id.into())) - .await? else { + .await? + else { panic!("Database sent incorrect response for block header request") }; Ok(header) diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 08647739..a090de1c 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,9 +1,6 @@ - - - - pub mod genesis; pub mod hardforks; +pub mod pow; #[cfg(feature = "rpc")] pub mod rpc; @@ -28,11 +25,13 @@ impl bool { + let int_hash = U256::from_le_slice(hash); + + let difficulty = U256::from_u128(difficulty); + + int_hash.checked_mul(&difficulty).is_some().unwrap_u8() == 1 +} + +#[test] +fn chekc() { + let hash = hex::decode("5aeebb3de73859d92f3f82fdb97286d81264ecb72a42e4b9f1e6d62eb682d7c0") + .unwrap() + .try_into() + .unwrap(); + let diff = 257344482654; + + assert!(check_block_pow(&hash, diff)) +} diff --git a/consensus/src/pow/difficulty.rs b/consensus/src/pow/difficulty.rs new file mode 100644 index 00000000..24e12551 --- /dev/null +++ b/consensus/src/pow/difficulty.rs @@ -0,0 +1,221 @@ +use futures::stream::FuturesOrdered; +use futures::{StreamExt, TryFutureExt}; +use std::ops::Range; +use tower::ServiceExt; +use tracing::instrument; + +use crate::{hardforks::HardFork, Database, DatabaseRequest, DatabaseResponse, Error}; + +/// The amount of blocks we account for to calculate difficulty +const DIFFICULTY_WINDOW: usize = 720; +/// The proportion of blocks we remove from the [`DIFFICULTY_WINDOW`]. When the window +/// if 720 this means that 60 blocks are removed from the ends of the window so 120 +/// blocks removed in total. +const DIFFICULTY_CUT: usize = 60; +/// The amount of blocks we add onto the window before doing any calculations so that the +/// difficulty lags by this amount of blocks +const DIFFICULTY_LAG: usize = 15; +/// The total amount of blocks we need to track to calculate difficulty +const DIFFICULTY_BLOCKS_COUNT: u64 = (DIFFICULTY_WINDOW + DIFFICULTY_LAG) as u64; +/// The amount of blocks we account for after removing the outliers. +const DIFFICULTY_ACCOUNTED_WINDOW_LEN: usize = DIFFICULTY_WINDOW - 2 * DIFFICULTY_CUT; + +/// This struct is able to calculate difficulties from blockchain information. +#[derive(Debug)] +pub struct DifficultyCalculator { + /// The list of timestamps in the window. + /// len <= [`DIFFICULTY_BLOCKS_COUNT`] + timestamps: Vec, + /// The work done in the [`DIFFICULTY_ACCOUNTED_WINDOW_LEN`] window, this is an optimisation + /// so we don't need to keep track of cumulative difficulties as well as timestamps. + windowed_work: u128, + /// The last height we accounted for. + last_accounted_height: u64, +} + +impl DifficultyCalculator { + pub async fn init(mut database: D) -> Result { + let DatabaseResponse::ChainHeight(chain_height) = database + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? + else { + panic!("Database sent incorrect response") + }; + + DifficultyCalculator::init_from_chain_height(chain_height, database).await + } + + pub async fn init_from_chain_height( + chain_height: u64, + mut database: D, + ) -> Result { + let block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT); + + let timestamps = + get_blocks_in_range_timestamps(database.clone(), block_start..chain_height).await?; + + tracing::debug!( + "Current chain height: {}, accounting for {} blocks timestamps", + chain_height, + timestamps.len() + ); + + let mut diff = DifficultyCalculator { + timestamps, + windowed_work: 0, + last_accounted_height: chain_height - 1, + }; + + diff.update_windowed_work(&mut database).await?; + + Ok(diff) + } + + pub async fn resync(&mut self, mut database: D) -> Result<(), Error> { + let DatabaseResponse::ChainHeight(chain_height) = database + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? + else { + panic!("Database sent incorrect response") + }; + + // TODO: We need to handle re-orgs + assert!(chain_height > self.last_accounted_height); + + if chain_height == self.last_accounted_height + 1 { + return Ok(()); + } + + let mut timestamps = get_blocks_in_range_timestamps( + database.clone(), + self.last_accounted_height + 1..chain_height, + ) + .await?; + + self.timestamps.append(&mut timestamps); + + self.timestamps.drain( + 0..self + .timestamps + .len() + .saturating_sub(DIFFICULTY_BLOCKS_COUNT as usize), + ); + + self.last_accounted_height = chain_height - 1; + + self.update_windowed_work(database).await + } + + async fn update_windowed_work(&mut self, mut database: D) -> Result<(), Error> { + let block_start = (self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT); + + let (start, end) = get_window_start_and_end(self.timestamps.len()); + + let low_cumulative_difficulty = get_block_cum_diff( + &mut database, + block_start + TryInto::::try_into(start).unwrap(), + ) + .await?; + + let high_cumulative_difficulty = get_block_cum_diff( + &mut database, + block_start + TryInto::::try_into(end).unwrap() - 1, + ) + .await?; + + self.windowed_work = high_cumulative_difficulty - low_cumulative_difficulty; + Ok(()) + } + + pub fn next_difficulty(&self, hf: &HardFork) -> u128 { + if self.timestamps.len() <= 1 { + return 1; + } + + let mut sorted_timestamps = self.timestamps.clone(); + sorted_timestamps.drain(DIFFICULTY_WINDOW..); + sorted_timestamps.sort_unstable(); + + let (window_start, window_end) = get_window_start_and_end(sorted_timestamps.len()); + + let mut time_span = + u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]); + + if time_span == 0 { + time_span = 1; + } + + (self.windowed_work * target_time_for_hf(hf) + time_span - 1) / time_span + } +} + +fn get_window_start_and_end(window_len: usize) -> (usize, usize) { + let window_len = if window_len > DIFFICULTY_WINDOW { + DIFFICULTY_WINDOW + } else { + window_len + }; + + if window_len <= DIFFICULTY_ACCOUNTED_WINDOW_LEN { + (0, window_len) + } else { + let start = (window_len - (DIFFICULTY_ACCOUNTED_WINDOW_LEN) + 1) / 2; + (start, start + DIFFICULTY_ACCOUNTED_WINDOW_LEN) + } +} + +#[instrument(skip(database))] +async fn get_blocks_in_range_timestamps( + database: D, + block_heights: Range, +) -> Result, Error> { + let start = block_heights.start; + let mut timestamps = Vec::with_capacity( + TryInto::::try_into(block_heights.end - start) + .expect("Height does not fit into usize!"), + ); + + let mut timestamp_fut = FuturesOrdered::from_iter(block_heights.map(|height| { + get_block_timestamp(database.clone(), height).map_ok(move |res| (height, res)) + })); + + while let Some(res) = timestamp_fut.next().await { + let (height, timestamp): (u64, u64) = res?; + tracing::debug!("Block timestamp for height: {} = {:?}", height, timestamp); + + timestamps.push(timestamp); + } + + Ok(timestamps) +} + +async fn get_block_timestamp(database: D, height: u64) -> Result { + let DatabaseResponse::BlockPOWInfo(pow) = database + .oneshot(DatabaseRequest::BlockPOWInfo(height.into())) + .await? + else { + panic!("Database service sent incorrect response!"); + }; + Ok(pow.timestamp) +} + +async fn get_block_cum_diff(database: D, height: u64) -> Result { + let DatabaseResponse::BlockPOWInfo(pow) = database + .oneshot(DatabaseRequest::BlockPOWInfo(height.into())) + .await? + else { + panic!("Database service sent incorrect response!"); + }; + Ok(pow.cumulative_difficulty) +} + +fn target_time_for_hf(hf: &HardFork) -> u128 { + match hf { + HardFork::V1 => 60, + _ => 120, + } +} diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index 7c635400..d2a542fc 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -1,16 +1,18 @@ -use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; -use futures::{FutureExt, TryFutureExt}; use std::future::Future; - use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; +use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; +use futures::{FutureExt, TryFutureExt}; use monero_serai::rpc::{HttpRpc, RpcConnection}; +use serde::Deserialize; +use serde_json::json; use tower::BoxError; use cuprate_common::BlockID; +use crate::pow::BlockPOWInfo; use crate::{DatabaseRequest, DatabaseResponse}; enum RpcState { @@ -109,6 +111,59 @@ impl tower::Service f } .boxed(), }, + DatabaseRequest::BlockPOWInfo(id) => get_blocks_pow_info(id, rpc).boxed(), } } } + +async fn get_blocks_pow_info( + id: BlockID, + rpc: OwnedMutexGuard>, +) -> Result { + #[derive(Deserialize, Debug)] + struct BlockHeaderResponse { + cumulative_difficulty: u64, + cumulative_difficulty_top64: u64, + timestamp: u64, + } + + #[derive(Deserialize, Debug)] + struct Response { + block_header: BlockHeaderResponse, + } + + match id { + BlockID::Height(height) => { + let res = rpc + .json_rpc_call::( + "get_block_header_by_height", + Some(json!({"height": height})), + ) + .await?; + Ok(DatabaseResponse::BlockPOWInfo(BlockPOWInfo { + timestamp: res.block_header.timestamp, + cumulative_difficulty: u128_from_low_high( + res.block_header.cumulative_difficulty, + res.block_header.cumulative_difficulty_top64, + ), + })) + } + BlockID::Hash(hash) => { + let res = rpc + .json_rpc_call::("get_block_header_by_hash", Some(json!({"hash": hash}))) + .await?; + Ok(DatabaseResponse::BlockPOWInfo(BlockPOWInfo { + timestamp: res.block_header.timestamp, + cumulative_difficulty: u128_from_low_high( + res.block_header.cumulative_difficulty, + res.block_header.cumulative_difficulty_top64, + ), + })) + } + } +} + +fn u128_from_low_high(low: u64, high: u64) -> u128 { + let res: u128 = high as u128; + res << 64 | low as u128 +} diff --git a/net/levin/src/codec.rs b/net/levin/src/codec.rs index 24f69ee1..8333ffa5 100644 --- a/net/levin/src/codec.rs +++ b/net/levin/src/codec.rs @@ -61,7 +61,9 @@ impl Decoder for LevinCodec { return Ok(None); } - let LevinCodec::WaitingForBody(header) = std::mem::replace(self, LevinCodec::WaitingForHeader) else { + let LevinCodec::WaitingForBody(header) = + std::mem::replace(self, LevinCodec::WaitingForHeader) + else { unreachable!() }; @@ -212,7 +214,8 @@ impl Decoder for LevinMessageCodec { if end_fragment { let MessageState::WaitingForRestOfFragment(bytes, ty, command) = - std::mem::replace(&mut self.state, MessageState::WaitingForBucket) else { + std::mem::replace(&mut self.state, MessageState::WaitingForBucket) + else { unreachable!(); };