add more config options for the verifier

This commit is contained in:
Boog900 2023-10-15 20:35:33 +01:00
parent 55b7699a82
commit 21f1448343
No known key found for this signature in database
GPG key ID: 5401367FB7302004
16 changed files with 714 additions and 242 deletions

View file

@ -31,7 +31,7 @@ futures = "0.3"
crypto-bigint = "0.5" crypto-bigint = "0.5"
randomx-rs = "1" randomx-rs = "1"
monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "46f4370"} monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "df4af7b"}
cuprate-common = {path = "../common"} cuprate-common = {path = "../common"}
cryptonight-cuprate = {path = "../cryptonight"} cryptonight-cuprate = {path = "../cryptonight"}
@ -46,3 +46,5 @@ tracing-subscriber = {version = "0.3", optional = true}
# here to help cargo to pick a version - remove me # here to help cargo to pick a version - remove me
syn = "2.0.37" syn = "2.0.37"
[profile.dev]
opt-level = 3

View file

@ -2,6 +2,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::sync::{Arc, RwLock};
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
@ -9,13 +10,16 @@ use tracing::level_filters::LevelFilter;
use cuprate_common::Network; use cuprate_common::Network;
use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE}; use monero_consensus::hardforks::HardFork;
use monero_consensus::rpc::{init_rpc_load_balancer, RpcConfig};
use monero_consensus::{ use monero_consensus::{
verifier::{Config, Verifier}, verifier::{Config, Verifier},
Database, DatabaseRequest, DatabaseResponse, Database, DatabaseRequest, DatabaseResponse,
}; };
const BATCH_SIZE: u64 = MAX_BLOCKS_IN_RANGE * 3; const INITIAL_MAX_BLOCKS_IN_RANGE: u64 = 250;
const MAX_BLOCKS_IN_RANGE: u64 = 1000;
const INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250;
/// A cache which can keep chain state while scanning. /// A cache which can keep chain state while scanning.
/// ///
@ -70,11 +74,10 @@ impl Display for ScanningCache {
.finish() .finish()
} }
} }
#[instrument(skip_all, level = "info")]
async fn scan_chain<D: Database + Clone + Send + 'static>( async fn scan_chain<D: Database + Clone + Send + 'static>(
cache: ScanningCache, cache: ScanningCache,
network: Network, network: Network,
rpc_config: Arc<RwLock<RpcConfig>>,
mut database: D, mut database: D,
) -> Result<(), tower::BoxError> ) -> Result<(), tower::BoxError>
where where
@ -98,20 +101,28 @@ where
_ => todo!(), _ => todo!(),
}; };
let verifier = Verifier::init_at_chain_height(config, cache.height, database.clone()).await?; //let verifier = Verifier::init_at_chain_height(config, cache.height, database.clone()).await?;
tracing::info!("Initialised verifier, begging scan"); tracing::info!("Initialised verifier, begging scan");
let batch_size = rpc_config.read().unwrap().block_batch_size();
let mut next_fut = tokio::spawn(database.clone().ready().await?.call( let mut next_fut = tokio::spawn(database.clone().ready().await?.call(
DatabaseRequest::BlockBatchInRange( DatabaseRequest::BlockBatchInRange(
cache.height..(cache.height + BATCH_SIZE).min(chain_height), cache.height..(cache.height + batch_size).min(chain_height),
), ),
)); ));
for height in (cache.height..chain_height) let mut current_height = cache.height;
.step_by(BATCH_SIZE as usize) let mut next_batch_start_height = cache.height + batch_size;
.skip(1)
{ let mut time_to_verify_last_batch: u128 = 0;
let mut first = true;
while next_batch_start_height < chain_height {
let next_batch_size = rpc_config.read().unwrap().block_batch_size();
let time_to_retrieve_batch = std::time::Instant::now();
// Call the next batch while we handle this batch. // Call the next batch while we handle this batch.
let current_fut = std::mem::replace( let current_fut = std::mem::replace(
&mut next_fut, &mut next_fut,
@ -120,7 +131,8 @@ where
.ready() .ready()
.await? .await?
.call(DatabaseRequest::BlockBatchInRange( .call(DatabaseRequest::BlockBatchInRange(
height..(height + BATCH_SIZE).min(chain_height), next_batch_start_height
..(next_batch_start_height + next_batch_size).min(chain_height),
)), )),
), ),
); );
@ -129,15 +141,55 @@ where
panic!("Database sent incorrect response!"); panic!("Database sent incorrect response!");
}; };
for (block, txs) in blocks.into_iter() { let time_to_verify_batch = std::time::Instant::now();
println!("{}, {}", hex::encode(block.hash()), txs.len());
let time_to_retrieve_batch = time_to_retrieve_batch.elapsed().as_millis();
if time_to_retrieve_batch > 2000 && !first {
let mut conf = rpc_config.write().unwrap();
conf.max_blocks_per_node = (conf.max_blocks_per_node
* TryInto::<u64>::try_into(
time_to_verify_last_batch
/ (time_to_verify_last_batch + time_to_retrieve_batch),
)
.unwrap())
.max(10_u64)
.min(MAX_BLOCKS_IN_RANGE);
tracing::info!("Decreasing batch size to: {}", conf.max_blocks_per_node);
} else if time_to_retrieve_batch < 100 {
let mut conf = rpc_config.write().unwrap();
conf.max_blocks_per_node = (conf.max_blocks_per_node * 2)
.max(10_u64)
.min(MAX_BLOCKS_IN_RANGE);
tracing::info!("Increasing batch size to: {}", conf.max_blocks_per_node);
} }
first = false;
tracing::info!( tracing::info!(
"Moving onto next batch: {:?}, chain height: {}", "Handling batch: {:?}, chain height: {}",
height..(height + BATCH_SIZE).min(chain_height), current_height..(current_height + blocks.len() as u64),
chain_height chain_height
); );
for (block, txs) in blocks.into_iter() {
let pow_hash = monero_consensus::block::pow::calculate_pow_hash(
&block.serialize_hashable(),
block.number() as u64,
&HardFork::V1,
);
tracing::info!(
"Verified block: {}, numb txs: {}",
current_height,
txs.len()
);
current_height += 1;
next_batch_start_height += 1;
}
time_to_verify_last_batch = time_to_verify_batch.elapsed().as_millis();
} }
Ok(()) Ok(())
@ -155,12 +207,33 @@ async fn main() {
"http://nodex.monerujo.io:18081".to_string(), "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(), "http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(), "http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://38.105.209.54:18089".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
//
"http://xmr-node.cakewallet.com:18081".to_string(),
"http://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://38.105.209.54:18089".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
]; ];
let rpc = init_rpc_load_balancer(urls); let rpc_config = RpcConfig::new(10, INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE);
let rpc_config = Arc::new(RwLock::new(rpc_config));
let rpc = init_rpc_load_balancer(urls, rpc_config.clone());
let network = Network::Mainnet; let network = Network::Mainnet;
let cache = ScanningCache::default(); let cache = ScanningCache::default();
scan_chain(cache, network, rpc).await.unwrap(); scan_chain(cache, network, rpc_config, rpc).await.unwrap();
} }

View file

@ -1,5 +1,77 @@
use monero_serai::block::Block;
use crate::{hardforks::BlockHFInfo, helper::current_time, ConsensusError};
pub mod difficulty;
pub mod pow; pub mod pow;
pub mod reward;
pub mod weight; pub mod weight;
pub use pow::{check_block_pow, difficulty::DifficultyCache, BlockPOWInfo}; pub use difficulty::{DifficultyCache, DifficultyCacheConfig};
pub use weight::{block_weight, BlockWeightInfo, BlockWeightsCache}; pub use pow::{check_block_pow, BlockPOWInfo};
pub use weight::{block_weight, BlockWeightInfo, BlockWeightsCache, BlockWeightsCacheConfig};
const BLOCK_SIZE_SANITY_LEEWAY: usize = 100;
const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2;
pub struct BlockVerificationData {
hf: BlockHFInfo,
pow: BlockPOWInfo,
weights: BlockWeightInfo,
block_blob: Vec<u8>,
block: Block,
block_hash: [u8; 32],
pow_hash: [u8; 32],
}
/// Sanity check on the block blob size.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#block-weight-and-size
fn block_size_sanity_check(
block_blob_len: usize,
effective_median: usize,
) -> Result<(), ConsensusError> {
if block_blob_len > effective_median * 2 + BLOCK_SIZE_SANITY_LEEWAY {
Err(ConsensusError::BlockIsTooLarge)
} else {
Ok(())
}
}
/// Sanity check on the block weight.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#block-weight-and-siz
fn block_weight_check(
block_weight: usize,
median_for_block_reward: usize,
) -> Result<(), ConsensusError> {
if block_weight > median_for_block_reward * 2 {
Err(ConsensusError::BlockIsTooLarge)
} else {
Ok(())
}
}
/// Verifies the previous id is the last blocks hash
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#previous-id
fn check_prev_id(block: &Block, top_hash: &[u8; 32]) -> Result<(), ConsensusError> {
if &block.header.previous != top_hash {
Err(ConsensusError::BlockIsNotApartOfChain)
} else {
Ok(())
}
}
/// Checks the blocks timestamp is in the valid range.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#timestamp
fn check_timestamp(block: &Block, median_timestamp: u64) -> Result<(), ConsensusError> {
if block.header.timestamp < median_timestamp
|| block.header.timestamp > current_time() + BLOCK_FUTURE_TIME_LIMIT
{
return Err(ConsensusError::BlockTimestampInvalid);
} else {
Ok(())
}
}

View file

@ -1,10 +1,13 @@
use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse}; use crate::{
hardforks::HardFork, helper::median, ConsensusError, Database, DatabaseRequest,
DatabaseResponse,
};
/// The amount of blocks we account for to calculate difficulty /// The amount of blocks we account for to calculate difficulty
const DIFFICULTY_WINDOW: usize = 720; const DIFFICULTY_WINDOW: usize = 720;
@ -15,26 +18,62 @@ const DIFFICULTY_CUT: usize = 60;
/// The amount of blocks we add onto the window before doing any calculations so that the /// The amount of blocks we add onto the window before doing any calculations so that the
/// difficulty lags by this amount of blocks /// difficulty lags by this amount of blocks
const DIFFICULTY_LAG: usize = 15; const DIFFICULTY_LAG: usize = 15;
/// The total amount of blocks we need to track to calculate difficulty
const DIFFICULTY_BLOCKS_COUNT: u64 = (DIFFICULTY_WINDOW + DIFFICULTY_LAG) as u64; /// Configuration for the difficulty cache.
/// The amount of blocks we account for after removing the outliers. ///
const DIFFICULTY_ACCOUNTED_WINDOW_LEN: usize = DIFFICULTY_WINDOW - 2 * DIFFICULTY_CUT; #[derive(Debug, Clone)]
pub struct DifficultyCacheConfig {
window: usize,
cut: usize,
lag: usize,
}
impl DifficultyCacheConfig {
pub fn new(window: usize, cut: usize, lag: usize) -> DifficultyCacheConfig {
DifficultyCacheConfig { window, cut, lag }
}
/// Returns the total amount of blocks we need to track to calculate difficulty
pub fn total_block_count(&self) -> u64 {
(self.window + self.lag).try_into().unwrap()
}
/// The amount of blocks we account for after removing the outliers.
pub fn accounted_window_len(&self) -> usize {
self.window - 2 * self.cut
}
pub fn main_net() -> DifficultyCacheConfig {
DifficultyCacheConfig {
window: DIFFICULTY_WINDOW,
cut: DIFFICULTY_CUT,
lag: DIFFICULTY_LAG,
}
}
}
/// This struct is able to calculate difficulties from blockchain information. /// This struct is able to calculate difficulties from blockchain information.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DifficultyCache { pub struct DifficultyCache {
/// The list of timestamps in the window. /// The list of timestamps in the window.
/// len <= [`DIFFICULTY_BLOCKS_COUNT`] /// len <= [`DIFFICULTY_BLOCKS_COUNT`]
timestamps: Vec<u64>, timestamps: VecDeque<u64>,
/// The work done in the [`DIFFICULTY_ACCOUNTED_WINDOW_LEN`] window, this is an optimisation /// The work done in the [`DIFFICULTY_ACCOUNTED_WINDOW_LEN`] window, this is an optimisation
/// so we don't need to keep track of cumulative difficulties as well as timestamps. /// so we don't need to keep track of cumulative difficulties as well as timestamps.
windowed_work: u128, windowed_work: u128,
/// The current cumulative difficulty of the chain.
cumulative_difficulty: u128,
/// The last height we accounted for. /// The last height we accounted for.
last_accounted_height: u64, last_accounted_height: u64,
/// The config
config: DifficultyCacheConfig,
} }
impl DifficultyCache { impl DifficultyCache {
pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, ConsensusError> { pub async fn init<D: Database + Clone>(
config: DifficultyCacheConfig,
mut database: D,
) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database let DatabaseResponse::ChainHeight(chain_height) = database
.ready() .ready()
.await? .await?
@ -44,17 +83,18 @@ impl DifficultyCache {
panic!("Database sent incorrect response") panic!("Database sent incorrect response")
}; };
DifficultyCache::init_from_chain_height(chain_height, database).await DifficultyCache::init_from_chain_height(chain_height, config, database).await
} }
#[instrument(name = "init_difficulty_cache", level = "info", skip(database))] #[instrument(name = "init_difficulty_cache", level = "info", skip(database, config))]
pub async fn init_from_chain_height<D: Database + Clone>( pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64, chain_height: u64,
config: DifficultyCacheConfig,
mut database: D, mut database: D,
) -> Result<Self, ConsensusError> { ) -> Result<Self, ConsensusError> {
tracing::info!("Initializing difficulty cache this may take a while."); tracing::info!("Initializing difficulty cache this may take a while.");
let mut block_start = chain_height.saturating_sub(DIFFICULTY_BLOCKS_COUNT); let mut block_start = chain_height.saturating_sub(config.total_block_count());
if block_start == 0 { if block_start == 0 {
block_start = 1; block_start = 1;
@ -66,7 +106,9 @@ impl DifficultyCache {
let mut diff = DifficultyCache { let mut diff = DifficultyCache {
timestamps, timestamps,
windowed_work: 0, windowed_work: 0,
cumulative_difficulty: 0,
last_accounted_height: chain_height - 1, last_accounted_height: chain_height - 1,
config,
}; };
diff.update_windowed_work(&mut database).await?; diff.update_windowed_work(&mut database).await?;
@ -80,44 +122,21 @@ impl DifficultyCache {
Ok(diff) Ok(diff)
} }
pub async fn resync<D: Database + Clone>( pub async fn new_block<D: Database>(
&mut self, &mut self,
mut database: D, height: u64,
timestamp: u64,
database: D,
) -> Result<(), ConsensusError> { ) -> Result<(), ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database assert_eq!(self.last_accounted_height + 1, height);
.ready() self.last_accounted_height += 1;
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response")
};
// TODO: We need to handle re-orgs self.timestamps.pop_front();
assert!(chain_height > self.last_accounted_height); self.timestamps.push_back(timestamp);
if chain_height == self.last_accounted_height + 1 { self.update_windowed_work(database).await?;
return Ok(());
}
let mut timestamps = get_blocks_in_range_timestamps( Ok(())
database.clone(),
self.last_accounted_height + 1..chain_height,
)
.await?;
self.timestamps.append(&mut timestamps);
self.timestamps.drain(
0..self
.timestamps
.len()
.saturating_sub(DIFFICULTY_BLOCKS_COUNT as usize),
);
self.last_accounted_height = chain_height - 1;
self.update_windowed_work(database).await
} }
async fn update_windowed_work<D: Database>( async fn update_windowed_work<D: Database>(
@ -129,13 +148,14 @@ impl DifficultyCache {
} }
let mut block_start = let mut block_start =
(self.last_accounted_height + 1).saturating_sub(DIFFICULTY_BLOCKS_COUNT); (self.last_accounted_height + 1).saturating_sub(self.config.total_block_count());
if block_start == 0 { if block_start == 0 {
block_start = 1; block_start = 1;
} }
let (start, end) = get_window_start_and_end(self.timestamps.len()); let (start, end) =
get_window_start_and_end(self.timestamps.len(), self.config.accounted_window_len());
let low_cumulative_difficulty = get_block_cum_diff( let low_cumulative_difficulty = get_block_cum_diff(
&mut database, &mut database,
@ -149,6 +169,10 @@ impl DifficultyCache {
) )
.await?; .await?;
let chain_cumulative_difficulty =
get_block_cum_diff(&mut database, self.last_accounted_height).await?;
self.cumulative_difficulty = chain_cumulative_difficulty;
self.windowed_work = high_cumulative_difficulty - low_cumulative_difficulty; self.windowed_work = high_cumulative_difficulty - low_cumulative_difficulty;
Ok(()) Ok(())
} }
@ -165,9 +189,10 @@ impl DifficultyCache {
if sorted_timestamps.len() > DIFFICULTY_WINDOW { if sorted_timestamps.len() > DIFFICULTY_WINDOW {
sorted_timestamps.drain(DIFFICULTY_WINDOW..); sorted_timestamps.drain(DIFFICULTY_WINDOW..);
}; };
sorted_timestamps.sort_unstable(); sorted_timestamps.make_contiguous().sort_unstable();
let (window_start, window_end) = get_window_start_and_end(sorted_timestamps.len()); let (window_start, window_end) =
get_window_start_and_end(sorted_timestamps.len(), self.config.accounted_window_len());
let mut time_span = let mut time_span =
u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]); u128::from(sorted_timestamps[window_end - 1] - sorted_timestamps[window_start]);
@ -176,22 +201,35 @@ impl DifficultyCache {
time_span = 1; time_span = 1;
} }
(self.windowed_work * target_time_for_hf(hf) + time_span - 1) / time_span (self.windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
}
/// Returns the median timestamp over the last `numb_blocks`.
///
/// Will panic if `numb_blocks` is larger than amount of blocks in the cache.
pub fn median_timestamp(&self, numb_blocks: usize) -> u64 {
median(
&self
.timestamps
.range(self.timestamps.len().checked_sub(numb_blocks).unwrap()..)
.copied()
.collect::<Vec<_>>(),
)
} }
} }
fn get_window_start_and_end(window_len: usize) -> (usize, usize) { fn get_window_start_and_end(window_len: usize, accounted_window: usize) -> (usize, usize) {
let window_len = if window_len > DIFFICULTY_WINDOW { let window_len = if window_len > DIFFICULTY_WINDOW {
DIFFICULTY_WINDOW DIFFICULTY_WINDOW
} else { } else {
window_len window_len
}; };
if window_len <= DIFFICULTY_ACCOUNTED_WINDOW_LEN { if window_len <= accounted_window {
(0, window_len) (0, window_len)
} else { } else {
let start = (window_len - (DIFFICULTY_ACCOUNTED_WINDOW_LEN) + 1) / 2; let start = (window_len - (accounted_window) + 1) / 2;
(start, start + DIFFICULTY_ACCOUNTED_WINDOW_LEN) (start, start + accounted_window)
} }
} }
@ -199,7 +237,7 @@ fn get_window_start_and_end(window_len: usize) -> (usize, usize) {
async fn get_blocks_in_range_timestamps<D: Database + Clone>( async fn get_blocks_in_range_timestamps<D: Database + Clone>(
database: D, database: D,
block_heights: Range<u64>, block_heights: Range<u64>,
) -> Result<Vec<u64>, ConsensusError> { ) -> Result<VecDeque<u64>, ConsensusError> {
tracing::info!("Getting blocks timestamps"); tracing::info!("Getting blocks timestamps");
let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database let DatabaseResponse::BlockPOWInfoInRange(pow_infos) = database
@ -221,10 +259,3 @@ async fn get_block_cum_diff<D: Database>(database: D, height: u64) -> Result<u12
}; };
Ok(pow.cumulative_difficulty) Ok(pow.cumulative_difficulty)
} }
fn target_time_for_hf(hf: &HardFork) -> u128 {
match hf {
HardFork::V1 => 60,
_ => 120,
}
}

View file

@ -1,6 +1,7 @@
use crypto_bigint::{CheckedMul, U256}; use crypto_bigint::{CheckedMul, U256};
use cryptonight_cuprate::{cryptonight_hash, Variant};
pub mod difficulty; use crate::{hardforks::HardFork, ConsensusError};
#[derive(Debug)] #[derive(Debug)]
pub struct BlockPOWInfo { pub struct BlockPOWInfo {
@ -18,3 +19,26 @@ pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool {
int_hash.checked_mul(&difficulty).is_some().unwrap_u8() == 1 int_hash.checked_mul(&difficulty).is_some().unwrap_u8() == 1
} }
/// Calcualtes the POW hash of this block.
pub fn calculate_pow_hash(buf: &[u8], height: u64, hf: &HardFork) -> [u8; 32] {
if height == 202612 {
return hex::decode("84f64766475d51837ac9efbef1926486e58563c95a19fef4aec3254f03000000")
.unwrap()
.try_into()
.unwrap();
}
if hf.in_range(&HardFork::V1, &HardFork::V7) {
cryptonight_hash(buf, &Variant::V0)
//cryptonight_hash::cryptonight_hash(buf, &Variant::V0)
} else if hf == &HardFork::V7 {
cryptonight_hash(buf, &Variant::V1)
} else if hf.in_range(&HardFork::V8, &HardFork::V10) {
cryptonight_hash(buf, &Variant::V2)
} else if hf.in_range(&HardFork::V10, &HardFork::V12) {
cryptonight_hash(buf, &Variant::R { height })
} else {
todo!("RandomX")
}
}

View file

@ -0,0 +1,31 @@
use crate::hardforks::HardFork;
const MONEY_SUPPLY: u64 = u64::MAX;
const MINIMUM_REWARD_PER_MIN: u64 = 3 * 10_u64.pow(11);
fn calculate_base_reward(already_generated_coins: u64, hf: &HardFork) -> u64 {
let target_mins = hf.block_time().as_secs() / 60;
let emission_speed_factor = 20 - (target_mins - 1);
((MONEY_SUPPLY - already_generated_coins) >> emission_speed_factor)
.max(MINIMUM_REWARD_PER_MIN * target_mins)
}
pub fn calculate_block_reward(
block_weight: u64,
effective_median_bw: u64,
already_generated_coins: u64,
hf: &HardFork,
) -> u64 {
let base_reward = calculate_base_reward(already_generated_coins, hf);
let multiplicand = (2 * effective_median_bw - block_weight) * block_weight;
let effective_median_bw: u128 = effective_median_bw.into();
((mul_128(base_reward, multiplicand) / effective_median_bw) / effective_median_bw)
.try_into()
.unwrap()
}
fn mul_128(a: u64, b: u64) -> u128 {
a as u128 * b as u128
}

View file

@ -14,7 +14,10 @@ use monero_serai::{block::Block, transaction::Transaction};
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse}; use crate::{
hardforks::HardFork, helper::median, ConsensusError, Database, DatabaseRequest,
DatabaseResponse,
};
const PENALTY_FREE_ZONE_1: usize = 20000; const PENALTY_FREE_ZONE_1: usize = 20000;
const PENALTY_FREE_ZONE_2: usize = 60000; const PENALTY_FREE_ZONE_2: usize = 60000;
@ -52,6 +55,30 @@ pub fn penalty_free_zone(hf: &HardFork) -> usize {
} }
} }
/// Configuration for the block weight cache.
///
#[derive(Debug, Clone)]
pub struct BlockWeightsCacheConfig {
short_term_window: u64,
long_term_window: u64,
}
impl BlockWeightsCacheConfig {
pub fn new(short_term_window: u64, long_term_window: u64) -> BlockWeightsCacheConfig {
BlockWeightsCacheConfig {
short_term_window,
long_term_window,
}
}
pub fn main_net() -> BlockWeightsCacheConfig {
BlockWeightsCacheConfig {
short_term_window: SHORT_TERM_WINDOW,
long_term_window: LONG_TERM_WINDOW,
}
}
}
/// A cache used to calculate block weight limits, the effective median and /// A cache used to calculate block weight limits, the effective median and
/// long term block weights. /// long term block weights.
/// ///
@ -65,11 +92,16 @@ pub struct BlockWeightsCache {
long_term_weights: Vec<usize>, long_term_weights: Vec<usize>,
/// The height of the top block. /// The height of the top block.
tip_height: u64, tip_height: u64,
config: BlockWeightsCacheConfig,
} }
impl BlockWeightsCache { impl BlockWeightsCache {
/// Initialize the [`BlockWeightsCache`] at the the height of the database. /// Initialize the [`BlockWeightsCache`] at the the height of the database.
pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, ConsensusError> { pub async fn init<D: Database + Clone>(
config: BlockWeightsCacheConfig,
mut database: D,
) -> Result<Self, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database let DatabaseResponse::ChainHeight(chain_height) = database
.ready() .ready()
.await? .await?
@ -79,19 +111,20 @@ impl BlockWeightsCache {
panic!("Database sent incorrect response!"); panic!("Database sent incorrect response!");
}; };
Self::init_from_chain_height(chain_height, database).await Self::init_from_chain_height(chain_height, config, database).await
} }
/// Initialize the [`BlockWeightsCache`] at the the given chain height. /// Initialize the [`BlockWeightsCache`] at the the given chain height.
#[instrument(name = "init_weight_cache", level = "info", skip(database))] #[instrument(name = "init_weight_cache", level = "info", skip(database, config))]
pub async fn init_from_chain_height<D: Database + Clone>( pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64, chain_height: u64,
config: BlockWeightsCacheConfig,
database: D, database: D,
) -> Result<Self, ConsensusError> { ) -> Result<Self, ConsensusError> {
tracing::info!("Initializing weight cache this may take a while."); tracing::info!("Initializing weight cache this may take a while.");
let mut long_term_weights = get_long_term_weight_in_range( let mut long_term_weights = get_long_term_weight_in_range(
chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height, chain_height.saturating_sub(config.long_term_window)..chain_height,
database.clone(), database.clone(),
) )
.await?; .await?;
@ -103,7 +136,7 @@ impl BlockWeightsCache {
); );
let short_term_block_weights: VecDeque<usize> = get_blocks_weight_in_range( let short_term_block_weights: VecDeque<usize> = get_blocks_weight_in_range(
chain_height.saturating_sub(SHORT_TERM_WINDOW)..chain_height, chain_height.saturating_sub(config.short_term_window)..chain_height,
database, database,
) )
.await? .await?
@ -115,6 +148,7 @@ impl BlockWeightsCache {
short_term_block_weights, short_term_block_weights,
long_term_weights, long_term_weights,
tip_height: chain_height - 1, tip_height: chain_height - 1,
config,
}) })
} }
@ -142,7 +176,7 @@ impl BlockWeightsCache {
Ok(idx) | Err(idx) => self.long_term_weights.insert(idx, long_term_weight), Ok(idx) | Err(idx) => self.long_term_weights.insert(idx, long_term_weight),
}; };
if let Some(height_to_remove) = block_height.checked_sub(LONG_TERM_WINDOW) { if let Some(height_to_remove) = block_height.checked_sub(self.config.long_term_window) {
tracing::debug!( tracing::debug!(
"Block {} is out of the long term weight window, removing it", "Block {} is out of the long term weight window, removing it",
height_to_remove height_to_remove
@ -161,7 +195,7 @@ impl BlockWeightsCache {
} }
self.short_term_block_weights.push_back(block_weight); self.short_term_block_weights.push_back(block_weight);
if self.short_term_block_weights.len() > SHORT_TERM_WINDOW.try_into().unwrap() { if self.short_term_block_weights.len() > self.config.short_term_window.try_into().unwrap() {
self.short_term_block_weights.pop_front(); self.short_term_block_weights.pop_front();
} }
@ -190,9 +224,18 @@ impl BlockWeightsCache {
) )
} }
/// Returns the block weight limit. /// Returns the median weight used to calculate block reward punishment.
pub fn next_block_weight_limit(&self, hf: &HardFork) -> usize { ///
2 * self.effective_median_block_weight(hf) /// https://cuprate.github.io/monero-book/consensus_rules/blocks/reward.html#calculating-block-reward
pub fn median_for_block_reward(&self, hf: &HardFork) -> usize {
if hf.in_range(&HardFork::V1, &HardFork::V12) {
let mut sorted_short_term_weights: Vec<usize> =
self.short_term_block_weights.clone().into();
sorted_short_term_weights.sort_unstable();
median(&sorted_short_term_weights)
} else {
self.effective_median_block_weight(hf)
}
} }
} }
@ -245,25 +288,6 @@ fn calculate_block_long_term_weight(
min(short_term_constraint, adjusted_block_weight) min(short_term_constraint, adjusted_block_weight)
} }
fn get_mid(a: usize, b: usize) -> usize {
// https://github.com/monero-project/monero/blob/90294f09ae34ef96f3dea5fea544816786df87c8/contrib/epee/include/misc_language.h#L43
(a / 2) + (b / 2) + ((a - 2 * (a / 2)) + (b - 2 * (b / 2))) / 2
}
fn median(array: &[usize]) -> usize {
let mid = array.len() / 2;
if array.len() == 1 {
return array[0];
}
if array.len() % 2 == 0 {
get_mid(array[mid - 1], array[mid])
} else {
array[mid]
}
}
#[instrument(name = "get_block_weights", skip(database))] #[instrument(name = "get_block_weights", skip(database))]
async fn get_blocks_weight_in_range<D: Database + Clone>( async fn get_blocks_weight_in_range<D: Database + Clone>(
range: Range<u64>, range: Range<u64>,

View file

@ -1,5 +1,6 @@
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::ops::Range; use std::ops::Range;
use std::time::Duration;
use monero_serai::block::BlockHeader; use monero_serai::block::BlockHeader;
use tower::ServiceExt; use tower::ServiceExt;
@ -11,6 +12,10 @@ use crate::{ConsensusError, Database, DatabaseRequest, DatabaseResponse};
// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork // https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
const DEFAULT_WINDOW_SIZE: u64 = 10080; // supermajority window check length - a week const DEFAULT_WINDOW_SIZE: u64 = 10080; // supermajority window check length - a week
const BLOCK_TIME_V1: Duration = Duration::from_secs(60);
const BLOCK_TIME_V2: Duration = Duration::from_secs(120);
const NUMB_OF_HARD_FORKS: usize = 16;
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct BlockHFInfo { pub struct BlockHFInfo {
@ -34,6 +39,65 @@ impl BlockHFInfo {
} }
} }
/// Information about a given hard-fork.
#[derive(Debug, Clone, Copy)]
pub struct HFInfo {
height: u64,
threshold: u64,
}
impl HFInfo {
pub fn new(height: u64, threshold: u64) -> HFInfo {
HFInfo { height, threshold }
}
/// Returns the main-net hard-fork information.
///
/// https://cuprate.github.io/monero-book/consensus_rules/hardforks.html#Mainnet-Hard-Forks
pub fn main_net() -> [HFInfo; NUMB_OF_HARD_FORKS] {
[
HFInfo::new(0, 0),
HFInfo::new(1009827, 0),
HFInfo::new(1141317, 0),
HFInfo::new(1220516, 0),
HFInfo::new(1288616, 0),
HFInfo::new(1400000, 0),
HFInfo::new(1546000, 0),
HFInfo::new(1685555, 0),
HFInfo::new(1686275, 0),
HFInfo::new(1788000, 0),
HFInfo::new(1788720, 0),
HFInfo::new(1978433, 0),
HFInfo::new(2210000, 0),
HFInfo::new(2210720, 0),
HFInfo::new(2688888, 0),
HFInfo::new(2689608, 0),
]
}
}
/// Configuration for hard-forks.
///
#[derive(Debug, Clone)]
pub struct HardForkConfig {
/// The network we are on.
forks: [HFInfo; NUMB_OF_HARD_FORKS],
/// The amount of votes we are taking into account to decide on a fork activation.
window: u64,
}
impl HardForkConfig {
fn fork_info(&self, hf: &HardFork) -> HFInfo {
self.forks[*hf as usize - 1]
}
pub fn main_net() -> HardForkConfig {
Self {
forks: HFInfo::main_net(),
window: DEFAULT_WINDOW_SIZE,
}
}
}
/// An identifier for every hard-fork Monero has had. /// An identifier for every hard-fork Monero has had.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
#[repr(u8)] #[repr(u8)]
@ -104,72 +168,26 @@ impl HardFork {
HardFork::from_version(&(*self as u8 + 1)).ok() HardFork::from_version(&(*self as u8 + 1)).ok()
} }
/// Returns the threshold of this fork.
pub fn fork_threshold(&self, _: &Network) -> u64 {
// No Monero hard forks actually use voting
0
}
/// Returns the votes needed for this fork.
///
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
pub fn votes_needed(&self, network: &Network, window: u64) -> u64 {
(self.fork_threshold(network) * window + 99) / 100
}
/// Returns the minimum height this fork will activate at
pub fn fork_height(&self, network: &Network) -> u64 {
match network {
Network::Mainnet => self.mainnet_fork_height(),
Network::Stagenet => self.stagenet_fork_height(),
Network::Testnet => self.testnet_fork_height(),
}
}
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#Stagenet-Hard-Forks
fn stagenet_fork_height(&self) -> u64 {
todo!()
}
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#Testnet-Hard-Forks
fn testnet_fork_height(&self) -> u64 {
todo!()
}
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#Mainnet-Hard-Forks
fn mainnet_fork_height(&self) -> u64 {
match self {
HardFork::V1 => 0, // Monero core has this as 1, which is strange
HardFork::V2 => 1009827,
HardFork::V3 => 1141317,
HardFork::V4 => 1220516,
HardFork::V5 => 1288616,
HardFork::V6 => 1400000,
HardFork::V7 => 1546000,
HardFork::V8 => 1685555,
HardFork::V9 => 1686275,
HardFork::V10 => 1788000,
HardFork::V11 => 1788720,
HardFork::V12 => 1978433,
HardFork::V13 => 2210000,
HardFork::V14 => 2210720,
HardFork::V15 => 2688888,
HardFork::V16 => 2689608,
}
}
/// Returns if the hard-fork is in range: /// Returns if the hard-fork is in range:
/// ///
/// start <= hf < end /// start <= hf < end
pub fn in_range(&self, start: &HardFork, end: &HardFork) -> bool { pub fn in_range(&self, start: &HardFork, end: &HardFork) -> bool {
start <= self && self < end start <= self && self < end
} }
/// Returns the target block time for this hardfork.
pub fn block_time(&self) -> Duration {
match self {
HardFork::V1 => BLOCK_TIME_V1,
_ => BLOCK_TIME_V2,
}
}
} }
/// A struct holding the current voting state of the blockchain. /// A struct holding the current voting state of the blockchain.
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
struct HFVotes { struct HFVotes {
votes: [u64; 16], votes: [u64; NUMB_OF_HARD_FORKS],
} }
impl Display for HFVotes { impl Display for HFVotes {
@ -225,25 +243,6 @@ impl HFVotes {
} }
} }
/// Configuration for hard-forks.
///
#[derive(Debug, Clone)]
pub struct HardForkConfig {
/// The network we are on.
network: Network,
/// The amount of votes we are taking into account to decide on a fork activation.
window: u64,
}
impl HardForkConfig {
pub fn main_net() -> HardForkConfig {
Self {
network: Network::Mainnet,
window: DEFAULT_WINDOW_SIZE,
}
}
}
/// A struct that keeps track of the current hard-fork and current votes. /// A struct that keeps track of the current hard-fork and current votes.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HardForkState { pub struct HardForkState {
@ -379,9 +378,10 @@ impl HardForkState {
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork /// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
fn check_set_new_hf(&mut self) { fn check_set_new_hf(&mut self) {
while let Some(new_hf) = self.next_hardfork { while let Some(new_hf) = self.next_hardfork {
if self.last_height + 1 >= new_hf.fork_height(&self.config.network) let hf_info = self.config.fork_info(&new_hf);
if self.last_height + 1 >= hf_info.height
&& self.votes.votes_for_hf(&new_hf) && self.votes.votes_for_hf(&new_hf)
>= new_hf.votes_needed(&self.config.network, self.config.window) >= votes_needed(hf_info.threshold, self.config.window)
{ {
self.set_hf(new_hf); self.set_hf(new_hf);
} else { } else {
@ -397,6 +397,13 @@ impl HardForkState {
} }
} }
/// Returns the votes needed for this fork.
///
/// https://cuprate.github.io/monero-docs/consensus_rules/hardforks.html#accepting-a-fork
pub fn votes_needed(threshold: u64, window: u64) -> u64 {
(threshold * window + 99) / 100
}
#[instrument(name = "get_votes", skip(database))] #[instrument(name = "get_votes", skip(database))]
async fn get_votes_in_range<D: Database>( async fn get_votes_in_range<D: Database>(
database: D, database: D,

56
consensus/src/helper.rs Normal file
View file

@ -0,0 +1,56 @@
use std::{
io::{Cursor, Error, ErrorKind},
ops::{Add, Div, Mul, Sub},
time::{SystemTime, UNIX_EPOCH},
};
/// Deserializes an object using the give `des` function, checking that all the bytes
/// are consumed.
pub(crate) fn size_check_decode<T>(
buf: &[u8],
des: impl Fn(&mut Cursor<&[u8]>) -> Result<T, Error>,
) -> Result<T, Error> {
let mut cur = Cursor::new(buf);
let t = des(&mut cur)?;
if TryInto::<usize>::try_into(cur.position()).unwrap() != buf.len() {
return Err(Error::new(
ErrorKind::Other,
"Data not fully consumed while decoding!",
));
}
Ok(t)
}
pub(crate) fn get_mid<T>(a: T, b: T) -> T
where
T: Add<Output = T> + Sub<Output = T> + Div<Output = T> + Mul<Output = T> + Copy + From<u8>,
{
let two: T = 2_u8.into();
// https://github.com/monero-project/monero/blob/90294f09ae34ef96f3dea5fea544816786df87c8/contrib/epee/include/misc_language.h#L43
(a / two) + (b / two) + ((a - two * (a / two)) + (b - two * (b / two))) / two
}
pub(crate) fn median<T>(array: &[T]) -> T
where
T: Add<Output = T> + Sub<Output = T> + Div<Output = T> + Mul<Output = T> + Copy + From<u8>,
{
let mid = array.len() / 2;
if array.len() == 1 {
return array[0];
}
if array.len() % 2 == 0 {
get_mid(array[mid - 1], array[mid])
} else {
array[mid]
}
}
pub(crate) fn current_time() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}

View file

@ -1,6 +1,7 @@
pub mod block; pub mod block;
pub mod genesis; pub mod genesis;
pub mod hardforks; pub mod hardforks;
mod helper;
pub mod miner_tx; pub mod miner_tx;
#[cfg(feature = "binaries")] #[cfg(feature = "binaries")]
pub mod rpc; pub mod rpc;
@ -8,6 +9,10 @@ pub mod verifier;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ConsensusError { pub enum ConsensusError {
#[error("Block has a timestamp outside of the valid range")]
BlockTimestampInvalid,
#[error("Block is too large")]
BlockIsTooLarge,
#[error("Invalid hard fork version: {0}")] #[error("Invalid hard fork version: {0}")]
InvalidHardForkVersion(&'static str), InvalidHardForkVersion(&'static str),
#[error("The block has a different previous hash than expected")] #[error("The block has a different previous hash than expected")]
@ -46,13 +51,13 @@ pub enum DatabaseRequest {
#[derive(Debug)] #[derive(Debug)]
pub enum DatabaseResponse { pub enum DatabaseResponse {
BlockHFInfo(hardforks::BlockHFInfo), BlockHFInfo(hardforks::BlockHFInfo),
BlockPOWInfo(block::pow::BlockPOWInfo), BlockPOWInfo(block::BlockPOWInfo),
BlockWeights(block::weight::BlockWeightInfo), BlockWeights(block::weight::BlockWeightInfo),
BlockHash([u8; 32]), BlockHash([u8; 32]),
BlockHfInfoInRange(Vec<hardforks::BlockHFInfo>), BlockHfInfoInRange(Vec<hardforks::BlockHFInfo>),
BlockWeightsInRange(Vec<block::weight::BlockWeightInfo>), BlockWeightsInRange(Vec<block::BlockWeightInfo>),
BlockPOWInfoInRange(Vec<block::pow::BlockPOWInfo>), BlockPOWInfoInRange(Vec<block::BlockPOWInfo>),
ChainHeight(u64), ChainHeight(u64),

View file

@ -2,11 +2,11 @@ use std::cmp::min;
use std::future::Future; use std::future::Future;
use std::ops::Range; use std::ops::Range;
use std::pin::Pin; use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex, RwLock};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
@ -25,8 +25,24 @@ use crate::{DatabaseRequest, DatabaseResponse};
mod discover; mod discover;
pub const MAX_BLOCKS_IN_RANGE: u64 = 200; #[derive(Debug, Copy, Clone)]
pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 200; pub struct RpcConfig {
pub max_blocks_per_node: u64,
pub max_block_headers_per_node: u64,
}
impl RpcConfig {
pub fn block_batch_size(&self) -> u64 {
self.max_blocks_per_node * 3
}
pub fn new(max_blocks_per_node: u64, max_block_headers_per_node: u64) -> RpcConfig {
RpcConfig {
max_block_headers_per_node,
max_blocks_per_node,
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Attempts(u64); pub struct Attempts(u64);
@ -52,6 +68,7 @@ impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
pub fn init_rpc_load_balancer( pub fn init_rpc_load_balancer(
addresses: Vec<String>, addresses: Vec<String>,
config: Arc<RwLock<RpcConfig>>,
) -> impl tower::Service< ) -> impl tower::Service<
DatabaseRequest, DatabaseRequest,
Response = DatabaseResponse, Response = DatabaseResponse,
@ -60,21 +77,28 @@ pub fn init_rpc_load_balancer(
Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>, Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>,
>, >,
> + Clone { > + Clone {
let rpc_discoverer = tower::discover::ServiceList::new( let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30);
addresses
.into_iter() let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok));
.map(|addr| tower::load::Constant::new(Rpc::new_http(addr), 0)),
);
let rpc_balance = Balance::new(rpc_discoverer);
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3); let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer); let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer);
RpcBalancer { rpcs } let discover = discover::RPCDiscover {
rpc: rpcs.clone(),
initial_list: addresses,
ok_channel: rpc_discoverer_tx,
already_connected: Default::default(),
};
tokio::spawn(discover.run());
RpcBalancer { rpcs, config }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RpcBalancer<T: Clone> { pub struct RpcBalancer<T: Clone> {
rpcs: T, rpcs: T,
config: Arc<RwLock<RpcConfig>>,
} }
impl<T> tower::Service<DatabaseRequest> for RpcBalancer<T> impl<T> tower::Service<DatabaseRequest> for RpcBalancer<T>
@ -97,6 +121,8 @@ where
fn call(&mut self, req: DatabaseRequest) -> Self::Future { fn call(&mut self, req: DatabaseRequest) -> Self::Future {
let this = self.rpcs.clone(); let this = self.rpcs.clone();
let config_mutex = self.config.clone();
let config = config_mutex.read().unwrap();
match req { match req {
DatabaseRequest::BlockBatchInRange(range) => { DatabaseRequest::BlockBatchInRange(range) => {
@ -112,7 +138,7 @@ where
DatabaseRequest::BlockBatchInRange, DatabaseRequest::BlockBatchInRange,
DatabaseResponse::BlockBatchInRange, DatabaseResponse::BlockBatchInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_IN_RANGE, config.max_blocks_per_node,
) )
} }
DatabaseRequest::BlockPOWInfoInRange(range) => { DatabaseRequest::BlockPOWInfoInRange(range) => {
@ -128,7 +154,7 @@ where
DatabaseRequest::BlockPOWInfoInRange, DatabaseRequest::BlockPOWInfoInRange,
DatabaseResponse::BlockPOWInfoInRange, DatabaseResponse::BlockPOWInfoInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_HEADERS_IN_RANGE, config.max_block_headers_per_node,
) )
} }
@ -145,7 +171,7 @@ where
DatabaseRequest::BlockWeightsInRange, DatabaseRequest::BlockWeightsInRange,
DatabaseResponse::BlockWeightsInRange, DatabaseResponse::BlockWeightsInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_HEADERS_IN_RANGE, config.max_block_headers_per_node,
) )
} }
DatabaseRequest::BlockHfInfoInRange(range) => { DatabaseRequest::BlockHfInfoInRange(range) => {
@ -161,7 +187,7 @@ where
DatabaseRequest::BlockHfInfoInRange, DatabaseRequest::BlockHfInfoInRange,
DatabaseResponse::BlockHfInfoInRange, DatabaseResponse::BlockHfInfoInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_HEADERS_IN_RANGE, config.max_block_headers_per_node,
) )
} }
req => this.oneshot(req).boxed(), req => this.oneshot(req).boxed(),
@ -524,3 +550,17 @@ async fn get_blocks_hf_info_in_range<R: RpcConnection>(
.collect(), .collect(),
)) ))
} }
#[tokio::test]
async fn t() {
let rpc = Rpc::new_http("http://node.c3pool.com:18081".to_string());
let res: serde_json::Value = rpc
.rpc
.try_lock()
.unwrap()
.json_rpc_call("get_connections", None)
.await
.unwrap();
println!("{res}");
}

View file

@ -7,6 +7,7 @@ use futures::{channel::mpsc, SinkExt, Stream, StreamExt, TryFutureExt, TryStream
use monero_serai::rpc::HttpRpc; use monero_serai::rpc::HttpRpc;
use tokio::time::timeout; use tokio::time::timeout;
use tower::discover::Change; use tower::discover::Change;
use tower::load::PeakEwma;
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
@ -28,24 +29,32 @@ async fn check_rpc(addr: String) -> Option<Rpc<HttpRpc>> {
Some(Rpc::new_http(addr)) Some(Rpc::new_http(addr))
} }
struct RPCDiscover<T> { pub(crate) struct RPCDiscover<T> {
rpc: T, pub rpc: T,
initial_list: Vec<String>, pub initial_list: Vec<String>,
ok_channel: mpsc::Sender<Change<usize, Rpc<HttpRpc>>>, pub ok_channel: mpsc::Sender<Change<usize, PeakEwma<Rpc<HttpRpc>>>>,
already_connected: HashSet<String>, pub already_connected: HashSet<String>,
} }
impl<T: Database> RPCDiscover<T> { impl<T: Database> RPCDiscover<T> {
async fn found_rpc(&mut self, rpc: Rpc<HttpRpc>) -> Result<(), SendError> { async fn found_rpc(&mut self, rpc: Rpc<HttpRpc>) -> Result<(), SendError> {
if self.already_connected.contains(&rpc.addr) { //if self.already_connected.contains(&rpc.addr) {
return Ok(()); // return Ok(());
} //}
tracing::info!("Found node to connect to: {}", &rpc.addr); tracing::info!("Connecting to node: {}", &rpc.addr);
let addr = rpc.addr.clone(); let addr = rpc.addr.clone();
self.ok_channel self.ok_channel
.send(Change::Insert(self.already_connected.len(), rpc)) .send(Change::Insert(
self.already_connected.len(),
PeakEwma::new(
rpc,
Duration::from_secs(5000),
300.0,
tower::load::CompleteOnResponse::default(),
),
))
.await?; .await?;
self.already_connected.insert(addr); self.already_connected.insert(addr);
@ -53,10 +62,8 @@ impl<T: Database> RPCDiscover<T> {
} }
pub async fn run(mut self) { pub async fn run(mut self) {
loop {
if !self.initial_list.is_empty() { if !self.initial_list.is_empty() {
let mut fut = let mut fut = FuturesUnordered::from_iter(self.initial_list.drain(..).map(check_rpc));
FuturesUnordered::from_iter(self.initial_list.drain(..).map(check_rpc));
while let Some(res) = fut.next().await { while let Some(res) = fut.next().await {
if let Some(rpc) = res { if let Some(rpc) = res {
@ -67,14 +74,5 @@ impl<T: Database> RPCDiscover<T> {
} }
} }
} }
if self.already_connected.len() > 100 {
tracing::info!("Stopping RPC discover, connected to 100 nodes!");
}
tokio::time::sleep(Duration::from_secs(2)).await
// TODO: RPC request to get more peers
}
} }
} }

View file

@ -4,19 +4,26 @@ use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use crate::{ use crate::{
block::{pow::difficulty::DifficultyCache, weight::BlockWeightsCache}, block::{
difficulty::{DifficultyCache, DifficultyCacheConfig},
weight::{BlockWeightsCache, BlockWeightsCacheConfig},
},
hardforks::{HardForkConfig, HardForkState}, hardforks::{HardForkConfig, HardForkState},
ConsensusError, Database, DatabaseRequest, DatabaseResponse, ConsensusError, Database, DatabaseRequest, DatabaseResponse,
}; };
pub struct Config { pub struct Config {
hard_fork_cfg: HardForkConfig, hard_fork_cfg: HardForkConfig,
difficulty_cfg: DifficultyCacheConfig,
weights_config: BlockWeightsCacheConfig,
} }
impl Config { impl Config {
pub fn main_net() -> Config { pub fn main_net() -> Config {
Config { Config {
hard_fork_cfg: HardForkConfig::main_net(), hard_fork_cfg: HardForkConfig::main_net(),
difficulty_cfg: DifficultyCacheConfig::main_net(),
weights_config: BlockWeightsCacheConfig::main_net(),
} }
} }
} }
@ -47,7 +54,6 @@ impl State {
Self::init_at_chain_height(config, chain_height, database).await Self::init_at_chain_height(config, chain_height, database).await
} }
#[instrument(name = "init_state", skip_all)]
pub async fn init_at_chain_height<D: Database + Clone>( pub async fn init_at_chain_height<D: Database + Clone>(
config: Config, config: Config,
chain_height: u64, chain_height: u64,
@ -63,8 +69,16 @@ impl State {
}; };
let (block_weight, difficulty, hard_fork) = join!( let (block_weight, difficulty, hard_fork) = join!(
BlockWeightsCache::init_from_chain_height(chain_height, database.clone()), BlockWeightsCache::init_from_chain_height(
DifficultyCache::init_from_chain_height(chain_height, database.clone()), chain_height,
config.weights_config,
database.clone()
),
DifficultyCache::init_from_chain_height(
chain_height,
config.difficulty_cfg,
database.clone()
),
HardForkState::init_from_chain_height(config.hard_fork_cfg, chain_height, database) HardForkState::init_from_chain_height(config.hard_fork_cfg, chain_height, database)
); );
@ -99,7 +113,6 @@ impl Verifier {
Self::init_at_chain_height(config, chain_height, database).await Self::init_at_chain_height(config, chain_height, database).await
} }
#[instrument(name = "init_verifier", skip_all)]
pub async fn init_at_chain_height<D: Database + Clone>( pub async fn init_at_chain_height<D: Database + Clone>(
config: Config, config: Config,
chain_height: u64, chain_height: u64,

View file

@ -9,6 +9,8 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/cryptonight"
[dependencies] [dependencies]
[build-dependencies] [build-dependencies]
cc = "1" cc = "1"
[dev-dependencies]
hex = "0.4"

View file

@ -0,0 +1,92 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::NetworkAddress;
#[derive(Serialize, Deserialize)]
pub(crate) struct TaggedNetworkAddress {
#[serde(rename = "type")]
ty: u8,
#[serde(flatten)]
addr: RawNetworkAddress,
}
#[derive(Error, Debug)]
#[error("Invalid network address tag")]
pub(crate) struct InvalidNetworkAddressTag;
impl TryFrom<TaggedNetworkAddress> for NetworkAddress {
type Error = InvalidNetworkAddressTag;
fn try_from(value: TaggedNetworkAddress) -> Result<Self, Self::Error> {
Ok(match (value.ty, value.addr) {
(1, RawNetworkAddress::IPv4(addr)) => NetworkAddress::IPv4(addr),
(2, RawNetworkAddress::IPv6(addr)) => NetworkAddress::IPv6(addr),
_ => return Err(InvalidNetworkAddressTag),
})
}
}
impl From<NetworkAddress> for TaggedNetworkAddress {
fn from(value: NetworkAddress) -> Self {
match value {
NetworkAddress::IPv4(addr) => TaggedNetworkAddress {
ty: 1,
addr: RawNetworkAddress::IPv4(addr),
},
NetworkAddress::IPv6(addr) => TaggedNetworkAddress {
ty: 2,
addr: RawNetworkAddress::IPv6(addr),
},
}
}
}
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
pub(crate) enum RawNetworkAddress {
/// IPv4
IPv4(#[serde(with = "SocketAddrV4Def")] SocketAddrV4),
/// IPv6
IPv6(#[serde(with = "SocketAddrV6Def")] SocketAddrV6),
}
#[derive(Deserialize, Serialize)]
#[serde(remote = "SocketAddrV4")]
pub(crate) struct SocketAddrV4Def {
#[serde(getter = "get_ip_v4")]
m_ip: u32,
#[serde(getter = "SocketAddrV4::port")]
m_port: u16,
}
fn get_ip_v4(addr: &SocketAddrV4) -> u32 {
u32::from_be_bytes(addr.ip().octets())
}
impl From<SocketAddrV4Def> for SocketAddrV4 {
fn from(def: SocketAddrV4Def) -> SocketAddrV4 {
SocketAddrV4::new(Ipv4Addr::from(def.m_ip), def.m_port)
}
}
#[derive(Deserialize, Serialize)]
#[serde(remote = "SocketAddrV6")]
pub(crate) struct SocketAddrV6Def {
#[serde(getter = "get_ip_v6")]
addr: [u8; 16],
#[serde(getter = "SocketAddrV6::port")]
m_port: u16,
}
fn get_ip_v6(addr: &SocketAddrV6) -> [u8; 16] {
addr.ip().octets()
}
impl From<SocketAddrV6Def> for SocketAddrV6 {
fn from(def: SocketAddrV6Def) -> SocketAddrV6 {
SocketAddrV6::new(Ipv6Addr::from(def.addr), def.m_port, 0, 0)
}
}

View file

@ -9,5 +9,7 @@ pub(crate) fn default_true() -> bool {
} }
pub(crate) fn default_zero<T: TryFrom<u8>>() -> T { pub(crate) fn default_zero<T: TryFrom<u8>>() -> T {
0.try_into().map_err(|_ |"Couldn't fit 0 into integer type!").unwrap() 0.try_into()
.map_err(|_| "Couldn't fit 0 into integer type!")
.unwrap()
} }