From d5595b7eaf9786181c77c5d32f0534ee5e94a400 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 3 Oct 2023 22:10:31 +0100 Subject: [PATCH] add logic to build all caches synchronously --- common/src/lib.rs | 2 +- common/src/network.rs | 3 +- consensus/Cargo.toml | 9 +- consensus/src/bin/scan_chain.rs | 145 ++++++++++++++++++++------ consensus/src/block/pow/difficulty.rs | 16 ++- consensus/src/block/weight.rs | 9 +- consensus/src/hardforks.rs | 61 +++++------ consensus/src/lib.rs | 11 +- consensus/src/rpc.rs | 64 +++++++++++- 9 files changed, 240 insertions(+), 80 deletions(-) diff --git a/common/src/lib.rs b/common/src/lib.rs index 4bd48d11..4bac38b8 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,7 +2,7 @@ pub mod network; pub mod pruning; -use std::fmt::{Formatter, Write}; +use std::fmt::Formatter; //pub use hardforks::HardForks; pub use network::Network; pub use pruning::{PruningError, PruningSeed}; diff --git a/common/src/network.rs b/common/src/network.rs index 85891f97..81f17a6c 100644 --- a/common/src/network.rs +++ b/common/src/network.rs @@ -8,8 +8,9 @@ const STAGENET_NETWORK_ID: [u8; 16] = [ 0x12, 0x30, 0xF1, 0x71, 0x61, 0x04, 0x41, 0x61, 0x17, 0x31, 0x00, 0x82, 0x16, 0xA1, 0xA1, 0x12, ]; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub enum Network { + #[default] Mainnet, Testnet, Stagenet, diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index a57d3e6b..405ee168 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -9,14 +9,14 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [features] default = ["binaries"] -binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer"] -rpc = ["dep:futures", "dep:serde_json", "dep:serde"] +binaries = ["dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer", "dep:serde_json", "dep:serde", "dep:epee-encoding"] [dependencies] hex = "0.4" thiserror = "1" tower = {version = "0.4", features = ["util"]} tracing = "0.1" +futures = "0.3" crypto-bigint = "0.5" @@ -26,11 +26,10 @@ monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "84b77b1"} cuprate-common = {path = "../common"} cryptonight-cuprate = {path = "../cryptonight"} -# used for rpc -futures = {version = "0.3", optional = true} +# used in binaries +epee-encoding = {version = "0.5", optional = true} serde_json = {version = "1", optional = true} serde = {version = "1", optional = true, features = ["derive"]} -# used in binaries tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } tracing-subscriber = {version = "0.3", optional = true} # here to help cargo to pick a version - remove me diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 2fdea5fb..498246c3 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,17 +1,123 @@ #![cfg(feature = "binaries")] -use tower::ServiceExt; +use cuprate_common::Network; +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; + +use tower::{Service, ServiceExt}; +use tracing::instrument; use tracing::level_filters::LevelFilter; -use monero_consensus::block::{pow::difficulty::DifficultyCache, weight::BlockWeightsCache}; -use monero_consensus::hardforks::HardFork; -use monero_consensus::rpc::init_rpc_load_balancer; -use monero_consensus::{DatabaseRequest, DatabaseResponse}; + + +use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE}; +use monero_consensus::{ + state::{Config, State}, + ConsensusError, Database, DatabaseRequest, DatabaseResponse, +}; + +/// A cache which can keep chain state while scanning. +/// +/// Because we are using a RPC interface with node we need to keep track +/// of certain data that node doesn't hold like the number of outputs at +/// a certain time. +#[derive(Debug, Clone)] +struct ScanningCache { + network: Network, + numb_outs: HashMap, + /// The height of the *next* block to scan. + height: u64, +} + +impl Default for ScanningCache { + fn default() -> Self { + ScanningCache { + network: Default::default(), + numb_outs: Default::default(), + height: 1, + } + } +} + +impl ScanningCache { + fn total_outs(&self) -> u64 { + self.numb_outs.values().sum() + } + + fn numb_outs(&self, amount: u64) -> u64 { + *self.numb_outs.get(&amount).unwrap_or(&0) + } + + fn add_outs(&mut self, amount: u64, count: u64) { + if let Some(numb_outs) = self.numb_outs.get_mut(&amount) { + *numb_outs += count; + } else { + self.numb_outs.insert(amount, count); + } + } +} + +impl Display for ScanningCache { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let rct_outs = self.numb_outs(0); + let total_outs = self.total_outs(); + + f.debug_struct("Cache") + .field("next_block", &self.height) + .field("rct_outs", &rct_outs) + .field("total_outs", &total_outs) + .finish() + } +} + +#[instrument(skip_all, level = "info")] +async fn scan_chain( + cache: ScanningCache, + network: Network, + mut database: D, +) -> Result<(), ConsensusError> { + tracing::info!("Beginning chain scan, {}", &cache); + + let DatabaseResponse::ChainHeight(chain_height) = database + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? + else { + panic!("Database sent incorrect response!"); + }; + + tracing::info!("scanning to chain height: {}", chain_height); + + let config = match network { + Network::Mainnet => Config::main_net(), + _ => todo!(), + }; + + let _state = State::init_at_chain_height(config, cache.height, database.clone()).await?; + + tracing::info!("Initialised state, begging scan"); + + for height in (cache.height..chain_height).step_by(MAX_BLOCKS_IN_RANGE as usize) { + let DatabaseResponse::BlockBatchInRange(_blocks) = database + .ready() + .await? + .call(DatabaseRequest::BlockBatchInRange( + height..(height + MAX_BLOCKS_IN_RANGE).max(chain_height), + )) + .await? + else { + panic!("Database sent incorrect response!"); + }; + } + + Ok(()) +} #[tokio::main] async fn main() { tracing_subscriber::fmt() - .with_max_level(LevelFilter::DEBUG) + .with_max_level(LevelFilter::INFO) .init(); let urls = vec![ @@ -32,29 +138,10 @@ async fn main() { "http://node.c3pool.com:18081".to_string(), ]; - let mut rpc = init_rpc_load_balancer(urls); + let rpc = init_rpc_load_balancer(urls); - let mut difficulty = DifficultyCache::init_from_chain_height(2985610, rpc.clone()) - .await - .unwrap(); - /* - let DatabaseResponse::BlockWeights(weights) = rpc - .oneshot(DatabaseRequest::BlockWeights(2985610.into())) - .await - .unwrap() - else { - panic!() - }; + let network = Network::Mainnet; + let cache = ScanningCache::default(); - assert_eq!( - weights.long_term_weight, - difficulty.next_block_long_term_weight(&HardFork::V16, weights.block_weight) - ); - - */ - println!("{:?}", difficulty.next_difficulty(&HardFork::V16)); //774466376 - - //let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone()) - // .await - // .unwrap(); + scan_chain(cache, network, rpc).await.unwrap(); } diff --git a/consensus/src/block/pow/difficulty.rs b/consensus/src/block/pow/difficulty.rs index d13d3555..5279e947 100644 --- a/consensus/src/block/pow/difficulty.rs +++ b/consensus/src/block/pow/difficulty.rs @@ -1,5 +1,5 @@ -use futures::stream::FuturesOrdered; -use futures::{TryFutureExt, TryStreamExt}; + + use std::ops::Range; use tower::ServiceExt; use tracing::instrument; @@ -21,7 +21,7 @@ const DIFFICULTY_BLOCKS_COUNT: u64 = (DIFFICULTY_WINDOW + DIFFICULTY_LAG) as u64 const DIFFICULTY_ACCOUNTED_WINDOW_LEN: usize = DIFFICULTY_WINDOW - 2 * DIFFICULTY_CUT; /// This struct is able to calculate difficulties from blockchain information. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DifficultyCache { /// The list of timestamps in the window. /// len <= [`DIFFICULTY_BLOCKS_COUNT`] @@ -52,6 +52,8 @@ impl DifficultyCache { chain_height: u64, mut database: D, ) -> Result { + tracing::info!("Initializing difficulty cache this may take a while."); + let mut block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT); if block_start == 0 { @@ -122,6 +124,10 @@ impl DifficultyCache { &mut self, mut database: D, ) -> Result<(), ConsensusError> { + if self.last_accounted_height == 0 { + return Ok(()); + } + let mut block_start = (self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT); @@ -189,11 +195,13 @@ fn get_window_start_and_end(window_len: usize) -> (usize, usize) { } } -#[instrument(name = "get_blocks_timestamps", skip(database))] +#[instrument(name = "get_blocks_timestamps", skip(database), level = "info")] async fn get_blocks_in_range_timestamps( database: D, block_heights: Range, ) -> Result, ConsensusError> { + tracing::info!("Getting blocks timestamps"); + let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database .oneshot(DatabaseRequest::BlockPOWInfoInRange(block_heights)) .await? diff --git a/consensus/src/block/weight.rs b/consensus/src/block/weight.rs index d17e8f64..4e6855d2 100644 --- a/consensus/src/block/weight.rs +++ b/consensus/src/block/weight.rs @@ -57,6 +57,7 @@ pub fn penalty_free_zone(hf: &HardFork) -> usize { /// /// These calculations require a lot of data from the database so by caching /// this data it reduces the load on the database. +#[derive(Clone)] pub struct BlockWeightsCache { /// This list is not sorted. short_term_block_weights: VecDeque, @@ -87,6 +88,8 @@ impl BlockWeightsCache { chain_height: u64, database: D, ) -> Result { + tracing::info!("Initializing weight cache this may take a while."); + let mut long_term_weights = get_long_term_weight_in_range( chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height, database.clone(), @@ -266,6 +269,8 @@ async fn get_blocks_weight_in_range( range: Range, database: D, ) -> Result, ConsensusError> { + tracing::info!("getting block weights."); + let DatabaseResponse::BlockWeightsInRange(weights) = database .oneshot(DatabaseRequest::BlockWeightsInRange(range)) .await? @@ -276,11 +281,13 @@ async fn get_blocks_weight_in_range( Ok(weights.into_iter().map(|info| info.block_weight).collect()) } -#[instrument(name = "get_long_term_weights", skip(database))] +#[instrument(name = "get_long_term_weights", skip(database), level = "info")] async fn get_long_term_weight_in_range( range: Range, database: D, ) -> Result, ConsensusError> { + tracing::info!("getting block long term weights."); + let DatabaseResponse::BlockWeightsInRange(weights) = database .oneshot(DatabaseRequest::BlockWeightsInRange(range)) .await? diff --git a/consensus/src/hardforks.rs b/consensus/src/hardforks.rs index 0d69c89d..1193e7ee 100644 --- a/consensus/src/hardforks.rs +++ b/consensus/src/hardforks.rs @@ -167,7 +167,7 @@ impl HardFork { } /// A struct holding the current voting state of the blockchain. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] struct HFVotes { votes: [u64; 16], } @@ -227,7 +227,7 @@ impl HFVotes { /// Configuration for hard-forks. /// -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct HardForkConfig { /// The network we are on. network: Network, @@ -235,8 +235,8 @@ pub struct HardForkConfig { window: u64, } -impl Default for HardForkConfig { - fn default() -> Self { +impl HardForkConfig { + pub fn main_net() -> HardForkConfig { Self { network: Network::Mainnet, window: DEFAULT_WINDOW_SIZE, @@ -245,8 +245,8 @@ impl Default for HardForkConfig { } /// A struct that keeps track of the current hard-fork and current votes. -#[derive(Debug)] -pub struct HardForks { +#[derive(Debug, Clone)] +pub struct HardForkState { current_hardfork: HardFork, next_hardfork: Option, @@ -256,14 +256,11 @@ pub struct HardForks { last_height: u64, } -impl HardForks { +impl HardForkState { pub async fn init( config: HardForkConfig, mut database: D, - ) -> Result - where - D::Future: Send + 'static, - { + ) -> Result { let DatabaseResponse::ChainHeight(chain_height) = database .ready() .await? @@ -273,22 +270,19 @@ impl HardForks { panic!("Database sent incorrect response") }; - let mut hfs = - HardForks::init_at_chain_height(config, chain_height, database.clone()).await?; - - tracing::info!("HardFork state: {:?}", hfs); + let hfs = HardForkState::init_from_chain_height(config, chain_height, database).await?; Ok(hfs) } - pub async fn init_at_chain_height( + #[instrument(name = "init_hardfork_state", skip(config, database), level = "info")] + pub async fn init_from_chain_height( config: HardForkConfig, chain_height: u64, mut database: D, - ) -> Result - where - D::Future: Send + 'static, - { + ) -> Result { + tracing::info!("Initializing hard-fork state this may take a while."); + let block_start = chain_height.saturating_sub(config.window); let votes = get_votes_in_range(database.clone(), block_start..chain_height).await?; @@ -297,10 +291,10 @@ impl HardForks { debug_assert_eq!(votes.total_votes(), config.window) } - let DatabaseResponse::BlockHfInfo(hf_info) = database + let DatabaseResponse::BlockHFInfo(hf_info) = database .ready() .await? - .call(DatabaseRequest::BlockPOWInfo((chain_height - 1).into())) + .call(DatabaseRequest::BlockHFInfo((chain_height - 1).into())) .await? else { panic!("Database sent incorrect response!"); @@ -310,7 +304,7 @@ impl HardForks { let next_hardfork = current_hardfork.next_fork(); - let mut hfs = HardForks { + let mut hfs = HardForkState { config, current_hardfork, next_hardfork, @@ -320,14 +314,18 @@ impl HardForks { hfs.check_set_new_hf(); - tracing::info!("HardFork state: {:?}", hfs); + tracing::info!( + "Initialized Hfs, current fork: {:?}, {}", + hfs.current_hardfork, + hfs.votes + ); Ok(hfs) } pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool { - &self.current_hardfork == &block_hf_info.version - && &block_hf_info.vote >= &self.current_hardfork + self.current_hardfork == block_hf_info.version + && block_hf_info.vote >= self.current_hardfork } pub async fn new_block( @@ -350,10 +348,10 @@ impl HardForks { for height_to_remove in (self.config.window..self.votes.total_votes()).map(|offset| height - offset) { - let DatabaseResponse::BlockHfInfo(hf_info) = database + let DatabaseResponse::BlockHFInfo(hf_info) = database .ready() .await? - .call(DatabaseRequest::BlockPOWInfo(height_to_remove.into())) + .call(DatabaseRequest::BlockHFInfo(height_to_remove.into())) .await? else { panic!("Database sent incorrect response!"); @@ -400,13 +398,10 @@ impl HardForks { } #[instrument(name = "get_votes", skip(database))] -async fn get_votes_in_range( +async fn get_votes_in_range( database: D, block_heights: Range, -) -> Result -where - D::Future: Send + 'static, -{ +) -> Result { let mut votes = HFVotes::default(); let DatabaseResponse::BlockHfInfoInRange(vote_list) = database diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index dbf7a28c..b35e4f30 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -2,8 +2,9 @@ pub mod block; pub mod genesis; pub mod hardforks; pub mod miner_tx; -#[cfg(feature = "rpc")] +#[cfg(feature = "binaries")] pub mod rpc; +pub mod state; #[derive(Debug, thiserror::Error)] pub enum ConsensusError { @@ -34,11 +35,14 @@ pub enum DatabaseRequest { BlockPOWInfoInRange(std::ops::Range), ChainHeight, + + #[cfg(feature = "binaries")] + BlockBatchInRange(std::ops::Range), } #[derive(Debug)] pub enum DatabaseResponse { - BlockHfInfo(hardforks::BlockHFInfo), + BlockHFInfo(hardforks::BlockHFInfo), BlockPOWInfo(block::pow::BlockPOWInfo), BlockWeights(block::weight::BlockWeightInfo), @@ -47,4 +51,7 @@ pub enum DatabaseResponse { BlockPOWInfoInRange(Vec), ChainHeight(u64), + + #[cfg(feature = "binaries")] + BlockBatchInRange(Vec), } diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index 46df90bb..384dfc68 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -7,6 +7,7 @@ use std::task::{Context, Poll}; use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; +use monero_serai::block::Block; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; use serde::Deserialize; use serde_json::json; @@ -21,7 +22,7 @@ use crate::block::weight::BlockWeightInfo; use crate::hardforks::BlockHFInfo; use crate::{DatabaseRequest, DatabaseResponse}; -const MAX_BLOCKS_IN_RANGE: u64 = 50; +pub const MAX_BLOCKS_IN_RANGE: u64 = 50; #[derive(Clone)] pub struct Attempts(u64); @@ -85,6 +86,21 @@ where let this = self.rpcs.clone(); match req { + DatabaseRequest::BlockBatchInRange(range) => { + let resp_to_ret = |resp: DatabaseResponse| { + let DatabaseResponse::BlockBatchInRange(pow_info) = resp else { + panic!("Database sent incorrect response"); + }; + pow_info + }; + split_range_request( + this, + range, + DatabaseRequest::BlockBatchInRange, + DatabaseResponse::BlockBatchInRange, + resp_to_ret, + ) + } DatabaseRequest::BlockPOWInfoInRange(range) => { let resp_to_ret = |resp: DatabaseResponse| { let DatabaseResponse::BlockPOWInfoInRange(pow_info) = resp else { @@ -263,10 +279,50 @@ impl tower::Service f DatabaseRequest::BlockPOWInfoInRange(range) => { get_blocks_pow_info_in_range(range, rpc).boxed() } + DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(), } } } +async fn get_blocks_in_range( + range: Range, + rpc: OwnedMutexGuard>, +) -> Result { + tracing::info!("Getting blocks in range: {:?}", range); + + mod i_64079 { + use epee_encoding::EpeeObject; + + #[derive(EpeeObject)] + pub struct Request { + pub heights: Vec, + } + + #[derive(EpeeObject)] + pub struct Response { + pub blocks: Vec>, + } + } + use i_64079::*; + + let res = rpc + .bin_call( + "get_blocks_by_height.bin", + epee_encoding::to_bytes(&Request { + heights: range.collect(), + })?, + ) + .await?; + let res: Response = epee_encoding::from_bytes(&res)?; + + Ok(DatabaseResponse::BlockBatchInRange( + res.blocks + .into_iter() + .map(|buf| Block::read(&mut buf.as_slice())) + .collect::>()?, + )) +} + #[derive(Deserialize, Debug)] struct BlockInfo { cumulative_difficulty: u64, @@ -295,7 +351,7 @@ async fn get_block_info_in_range( ) .await?; - tracing::debug!("Retrieved blocks in range: {:?}", range); + tracing::info!("Retrieved block headers in range: {:?}", range); Ok(res.headers) } @@ -304,7 +360,7 @@ async fn get_block_info( id: BlockID, rpc: OwnedMutexGuard>, ) -> Result { - tracing::debug!("Retrieving block info with id: {}", id); + tracing::info!("Retrieving block info with id: {}", id); #[derive(Deserialize, Debug)] struct Response { @@ -403,7 +459,7 @@ async fn get_blocks_hf_info( ) -> Result { let info = get_block_info(id, rpc).await?; - Ok(DatabaseResponse::BlockHfInfo( + Ok(DatabaseResponse::BlockHFInfo( BlockHFInfo::from_major_minor(info.major_version, info.minor_version)?, )) }