add logic to build all caches synchronously

This commit is contained in:
Boog900 2023-10-03 22:10:31 +01:00
parent eb3c727b4d
commit d5595b7eaf
No known key found for this signature in database
GPG key ID: 5401367FB7302004
9 changed files with 240 additions and 80 deletions

View file

@ -2,7 +2,7 @@
pub mod network;
pub mod pruning;
use std::fmt::{Formatter, Write};
use std::fmt::Formatter;
//pub use hardforks::HardForks;
pub use network::Network;
pub use pruning::{PruningError, PruningSeed};

View file

@ -8,8 +8,9 @@ const STAGENET_NETWORK_ID: [u8; 16] = [
0x12, 0x30, 0xF1, 0x71, 0x61, 0x04, 0x41, 0x61, 0x17, 0x31, 0x00, 0x82, 0x16, 0xA1, 0xA1, 0x12,
];
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Default)]
pub enum Network {
#[default]
Mainnet,
Testnet,
Stagenet,

View file

@ -9,14 +9,14 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus"
[features]
default = ["binaries"]
binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer"]
rpc = ["dep:futures", "dep:serde_json", "dep:serde"]
binaries = ["dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer", "dep:serde_json", "dep:serde", "dep:epee-encoding"]
[dependencies]
hex = "0.4"
thiserror = "1"
tower = {version = "0.4", features = ["util"]}
tracing = "0.1"
futures = "0.3"
crypto-bigint = "0.5"
@ -26,11 +26,10 @@ monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "84b77b1"}
cuprate-common = {path = "../common"}
cryptonight-cuprate = {path = "../cryptonight"}
# used for rpc
futures = {version = "0.3", optional = true}
# used in binaries
epee-encoding = {version = "0.5", optional = true}
serde_json = {version = "1", optional = true}
serde = {version = "1", optional = true, features = ["derive"]}
# used in binaries
tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
tracing-subscriber = {version = "0.3", optional = true}
# here to help cargo to pick a version - remove me

View file

@ -1,17 +1,123 @@
#![cfg(feature = "binaries")]
use tower::ServiceExt;
use cuprate_common::Network;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use tower::{Service, ServiceExt};
use tracing::instrument;
use tracing::level_filters::LevelFilter;
use monero_consensus::block::{pow::difficulty::DifficultyCache, weight::BlockWeightsCache};
use monero_consensus::hardforks::HardFork;
use monero_consensus::rpc::init_rpc_load_balancer;
use monero_consensus::{DatabaseRequest, DatabaseResponse};
use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE};
use monero_consensus::{
state::{Config, State},
ConsensusError, Database, DatabaseRequest, DatabaseResponse,
};
/// A cache which can keep chain state while scanning.
///
/// Because we are using a RPC interface with node we need to keep track
/// of certain data that node doesn't hold like the number of outputs at
/// a certain time.
#[derive(Debug, Clone)]
struct ScanningCache {
network: Network,
numb_outs: HashMap<u64, u64>,
/// The height of the *next* block to scan.
height: u64,
}
impl Default for ScanningCache {
fn default() -> Self {
ScanningCache {
network: Default::default(),
numb_outs: Default::default(),
height: 1,
}
}
}
impl ScanningCache {
fn total_outs(&self) -> u64 {
self.numb_outs.values().sum()
}
fn numb_outs(&self, amount: u64) -> u64 {
*self.numb_outs.get(&amount).unwrap_or(&0)
}
fn add_outs(&mut self, amount: u64, count: u64) {
if let Some(numb_outs) = self.numb_outs.get_mut(&amount) {
*numb_outs += count;
} else {
self.numb_outs.insert(amount, count);
}
}
}
impl Display for ScanningCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let rct_outs = self.numb_outs(0);
let total_outs = self.total_outs();
f.debug_struct("Cache")
.field("next_block", &self.height)
.field("rct_outs", &rct_outs)
.field("total_outs", &total_outs)
.finish()
}
}
#[instrument(skip_all, level = "info")]
async fn scan_chain<D: Database + Clone>(
cache: ScanningCache,
network: Network,
mut database: D,
) -> Result<(), ConsensusError> {
tracing::info!("Beginning chain scan, {}", &cache);
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response!");
};
tracing::info!("scanning to chain height: {}", chain_height);
let config = match network {
Network::Mainnet => Config::main_net(),
_ => todo!(),
};
let _state = State::init_at_chain_height(config, cache.height, database.clone()).await?;
tracing::info!("Initialised state, begging scan");
for height in (cache.height..chain_height).step_by(MAX_BLOCKS_IN_RANGE as usize) {
let DatabaseResponse::BlockBatchInRange(_blocks) = database
.ready()
.await?
.call(DatabaseRequest::BlockBatchInRange(
height..(height + MAX_BLOCKS_IN_RANGE).max(chain_height),
))
.await?
else {
panic!("Database sent incorrect response!");
};
}
Ok(())
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(LevelFilter::DEBUG)
.with_max_level(LevelFilter::INFO)
.init();
let urls = vec![
@ -32,29 +138,10 @@ async fn main() {
"http://node.c3pool.com:18081".to_string(),
];
let mut rpc = init_rpc_load_balancer(urls);
let rpc = init_rpc_load_balancer(urls);
let mut difficulty = DifficultyCache::init_from_chain_height(2985610, rpc.clone())
.await
.unwrap();
/*
let DatabaseResponse::BlockWeights(weights) = rpc
.oneshot(DatabaseRequest::BlockWeights(2985610.into()))
.await
.unwrap()
else {
panic!()
};
let network = Network::Mainnet;
let cache = ScanningCache::default();
assert_eq!(
weights.long_term_weight,
difficulty.next_block_long_term_weight(&HardFork::V16, weights.block_weight)
);
*/
println!("{:?}", difficulty.next_difficulty(&HardFork::V16)); //774466376
//let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone())
// .await
// .unwrap();
scan_chain(cache, network, rpc).await.unwrap();
}

View file

@ -1,5 +1,5 @@
use futures::stream::FuturesOrdered;
use futures::{TryFutureExt, TryStreamExt};
use std::ops::Range;
use tower::ServiceExt;
use tracing::instrument;
@ -21,7 +21,7 @@ const DIFFICULTY_BLOCKS_COUNT: u64 = (DIFFICULTY_WINDOW + DIFFICULTY_LAG) as u64
const DIFFICULTY_ACCOUNTED_WINDOW_LEN: usize = DIFFICULTY_WINDOW - 2 * DIFFICULTY_CUT;
/// This struct is able to calculate difficulties from blockchain information.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DifficultyCache {
/// The list of timestamps in the window.
/// len <= [`DIFFICULTY_BLOCKS_COUNT`]
@ -52,6 +52,8 @@ impl DifficultyCache {
chain_height: u64,
mut database: D,
) -> Result<Self, ConsensusError> {
tracing::info!("Initializing difficulty cache this may take a while.");
let mut block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT);
if block_start == 0 {
@ -122,6 +124,10 @@ impl DifficultyCache {
&mut self,
mut database: D,
) -> Result<(), ConsensusError> {
if self.last_accounted_height == 0 {
return Ok(());
}
let mut block_start =
(self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT);
@ -189,11 +195,13 @@ fn get_window_start_and_end(window_len: usize) -> (usize, usize) {
}
}
#[instrument(name = "get_blocks_timestamps", skip(database))]
#[instrument(name = "get_blocks_timestamps", skip(database), level = "info")]
async fn get_blocks_in_range_timestamps<D: Database + Clone>(
database: D,
block_heights: Range<u64>,
) -> Result<Vec<u64>, ConsensusError> {
tracing::info!("Getting blocks timestamps");
let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database
.oneshot(DatabaseRequest::BlockPOWInfoInRange(block_heights))
.await?

View file

@ -57,6 +57,7 @@ pub fn penalty_free_zone(hf: &HardFork) -> usize {
///
/// These calculations require a lot of data from the database so by caching
/// this data it reduces the load on the database.
#[derive(Clone)]
pub struct BlockWeightsCache {
/// This list is not sorted.
short_term_block_weights: VecDeque<usize>,
@ -87,6 +88,8 @@ impl BlockWeightsCache {
chain_height: u64,
database: D,
) -> Result<Self, ConsensusError> {
tracing::info!("Initializing weight cache this may take a while.");
let mut long_term_weights = get_long_term_weight_in_range(
chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height,
database.clone(),
@ -266,6 +269,8 @@ async fn get_blocks_weight_in_range<D: Database + Clone>(
range: Range<u64>,
database: D,
) -> Result<Vec<usize>, ConsensusError> {
tracing::info!("getting block weights.");
let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range))
.await?
@ -276,11 +281,13 @@ async fn get_blocks_weight_in_range<D: Database + Clone>(
Ok(weights.into_iter().map(|info| info.block_weight).collect())
}
#[instrument(name = "get_long_term_weights", skip(database))]
#[instrument(name = "get_long_term_weights", skip(database), level = "info")]
async fn get_long_term_weight_in_range<D: Database + Clone>(
range: Range<u64>,
database: D,
) -> Result<Vec<usize>, ConsensusError> {
tracing::info!("getting block long term weights.");
let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range))
.await?

View file

@ -167,7 +167,7 @@ impl HardFork {
}
/// A struct holding the current voting state of the blockchain.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
struct HFVotes {
votes: [u64; 16],
}
@ -227,7 +227,7 @@ impl HFVotes {
/// Configuration for hard-forks.
///
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct HardForkConfig {
/// The network we are on.
network: Network,
@ -235,8 +235,8 @@ pub struct HardForkConfig {
window: u64,
}
impl Default for HardForkConfig {
fn default() -> Self {
impl HardForkConfig {
pub fn main_net() -> HardForkConfig {
Self {
network: Network::Mainnet,
window: DEFAULT_WINDOW_SIZE,
@ -245,8 +245,8 @@ impl Default for HardForkConfig {
}
/// A struct that keeps track of the current hard-fork and current votes.
#[derive(Debug)]
pub struct HardForks {
#[derive(Debug, Clone)]
pub struct HardForkState {
current_hardfork: HardFork,
next_hardfork: Option<HardFork>,
@ -256,14 +256,11 @@ pub struct HardForks {
last_height: u64,
}
impl HardForks {
impl HardForkState {
pub async fn init<D: Database + Clone>(
config: HardForkConfig,
mut database: D,
) -> Result<Self, ConsensusError>
where
D::Future: Send + 'static,
{
) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
.await?
@ -273,22 +270,19 @@ impl HardForks {
panic!("Database sent incorrect response")
};
let mut hfs =
HardForks::init_at_chain_height(config, chain_height, database.clone()).await?;
tracing::info!("HardFork state: {:?}", hfs);
let hfs = HardForkState::init_from_chain_height(config, chain_height, database).await?;
Ok(hfs)
}
pub async fn init_at_chain_height<D: Database + Clone>(
#[instrument(name = "init_hardfork_state", skip(config, database), level = "info")]
pub async fn init_from_chain_height<D: Database + Clone>(
config: HardForkConfig,
chain_height: u64,
mut database: D,
) -> Result<Self, ConsensusError>
where
D::Future: Send + 'static,
{
) -> Result<Self, ConsensusError> {
tracing::info!("Initializing hard-fork state this may take a while.");
let block_start = chain_height.saturating_sub(config.window);
let votes = get_votes_in_range(database.clone(), block_start..chain_height).await?;
@ -297,10 +291,10 @@ impl HardForks {
debug_assert_eq!(votes.total_votes(), config.window)
}
let DatabaseResponse::BlockHfInfo(hf_info) = database
let DatabaseResponse::BlockHFInfo(hf_info) = database
.ready()
.await?
.call(DatabaseRequest::BlockPOWInfo((chain_height - 1).into()))
.call(DatabaseRequest::BlockHFInfo((chain_height - 1).into()))
.await?
else {
panic!("Database sent incorrect response!");
@ -310,7 +304,7 @@ impl HardForks {
let next_hardfork = current_hardfork.next_fork();
let mut hfs = HardForks {
let mut hfs = HardForkState {
config,
current_hardfork,
next_hardfork,
@ -320,14 +314,18 @@ impl HardForks {
hfs.check_set_new_hf();
tracing::info!("HardFork state: {:?}", hfs);
tracing::info!(
"Initialized Hfs, current fork: {:?}, {}",
hfs.current_hardfork,
hfs.votes
);
Ok(hfs)
}
pub fn check_block_version_vote(&self, block_hf_info: &BlockHFInfo) -> bool {
&self.current_hardfork == &block_hf_info.version
&& &block_hf_info.vote >= &self.current_hardfork
self.current_hardfork == block_hf_info.version
&& block_hf_info.vote >= self.current_hardfork
}
pub async fn new_block<D: Database>(
@ -350,10 +348,10 @@ impl HardForks {
for height_to_remove in
(self.config.window..self.votes.total_votes()).map(|offset| height - offset)
{
let DatabaseResponse::BlockHfInfo(hf_info) = database
let DatabaseResponse::BlockHFInfo(hf_info) = database
.ready()
.await?
.call(DatabaseRequest::BlockPOWInfo(height_to_remove.into()))
.call(DatabaseRequest::BlockHFInfo(height_to_remove.into()))
.await?
else {
panic!("Database sent incorrect response!");
@ -400,13 +398,10 @@ impl HardForks {
}
#[instrument(name = "get_votes", skip(database))]
async fn get_votes_in_range<D: Database + Clone>(
async fn get_votes_in_range<D: Database>(
database: D,
block_heights: Range<u64>,
) -> Result<HFVotes, ConsensusError>
where
D::Future: Send + 'static,
{
) -> Result<HFVotes, ConsensusError> {
let mut votes = HFVotes::default();
let DatabaseResponse::BlockHfInfoInRange(vote_list) = database

View file

@ -2,8 +2,9 @@ pub mod block;
pub mod genesis;
pub mod hardforks;
pub mod miner_tx;
#[cfg(feature = "rpc")]
#[cfg(feature = "binaries")]
pub mod rpc;
pub mod state;
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
@ -34,11 +35,14 @@ pub enum DatabaseRequest {
BlockPOWInfoInRange(std::ops::Range<u64>),
ChainHeight,
#[cfg(feature = "binaries")]
BlockBatchInRange(std::ops::Range<u64>),
}
#[derive(Debug)]
pub enum DatabaseResponse {
BlockHfInfo(hardforks::BlockHFInfo),
BlockHFInfo(hardforks::BlockHFInfo),
BlockPOWInfo(block::pow::BlockPOWInfo),
BlockWeights(block::weight::BlockWeightInfo),
@ -47,4 +51,7 @@ pub enum DatabaseResponse {
BlockPOWInfoInRange(Vec<block::pow::BlockPOWInfo>),
ChainHeight(u64),
#[cfg(feature = "binaries")]
BlockBatchInRange(Vec<monero_serai::block::Block>),
}

View file

@ -7,6 +7,7 @@ use std::task::{Context, Poll};
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt};
use monero_serai::block::Block;
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use serde::Deserialize;
use serde_json::json;
@ -21,7 +22,7 @@ use crate::block::weight::BlockWeightInfo;
use crate::hardforks::BlockHFInfo;
use crate::{DatabaseRequest, DatabaseResponse};
const MAX_BLOCKS_IN_RANGE: u64 = 50;
pub const MAX_BLOCKS_IN_RANGE: u64 = 50;
#[derive(Clone)]
pub struct Attempts(u64);
@ -85,6 +86,21 @@ where
let this = self.rpcs.clone();
match req {
DatabaseRequest::BlockBatchInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockBatchInRange(pow_info) = resp else {
panic!("Database sent incorrect response");
};
pow_info
};
split_range_request(
this,
range,
DatabaseRequest::BlockBatchInRange,
DatabaseResponse::BlockBatchInRange,
resp_to_ret,
)
}
DatabaseRequest::BlockPOWInfoInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockPOWInfoInRange(pow_info) = resp else {
@ -263,10 +279,50 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
DatabaseRequest::BlockPOWInfoInRange(range) => {
get_blocks_pow_info_in_range(range, rpc).boxed()
}
DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(),
}
}
}
async fn get_blocks_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
tracing::info!("Getting blocks in range: {:?}", range);
mod i_64079 {
use epee_encoding::EpeeObject;
#[derive(EpeeObject)]
pub struct Request {
pub heights: Vec<u64>,
}
#[derive(EpeeObject)]
pub struct Response {
pub blocks: Vec<Vec<u8>>,
}
}
use i_64079::*;
let res = rpc
.bin_call(
"get_blocks_by_height.bin",
epee_encoding::to_bytes(&Request {
heights: range.collect(),
})?,
)
.await?;
let res: Response = epee_encoding::from_bytes(&res)?;
Ok(DatabaseResponse::BlockBatchInRange(
res.blocks
.into_iter()
.map(|buf| Block::read(&mut buf.as_slice()))
.collect::<Result<_, _>>()?,
))
}
#[derive(Deserialize, Debug)]
struct BlockInfo {
cumulative_difficulty: u64,
@ -295,7 +351,7 @@ async fn get_block_info_in_range<R: RpcConnection>(
)
.await?;
tracing::debug!("Retrieved blocks in range: {:?}", range);
tracing::info!("Retrieved block headers in range: {:?}", range);
Ok(res.headers)
}
@ -304,7 +360,7 @@ async fn get_block_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<BlockInfo, tower::BoxError> {
tracing::debug!("Retrieving block info with id: {}", id);
tracing::info!("Retrieving block info with id: {}", id);
#[derive(Deserialize, Debug)]
struct Response {
@ -403,7 +459,7 @@ async fn get_blocks_hf_info<R: RpcConnection>(
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
Ok(DatabaseResponse::BlockHfInfo(
Ok(DatabaseResponse::BlockHFInfo(
BlockHFInfo::from_major_minor(info.major_version, info.minor_version)?,
))
}