consensus: add block weight calculations + rcp speedup

Instead of asking for one block at a time, we ask for batches of blocks, which significantly speeds up getting blocks.
This commit is contained in:
Boog900 2023-09-28 12:21:06 +01:00
parent 545189f523
commit d7798328b6
No known key found for this signature in database
GPG key ID: 5401367FB7302004
14 changed files with 420 additions and 109 deletions

View file

@ -1,4 +1,3 @@
[workspace]
resolver = "2"
@ -14,5 +13,3 @@ members = [
# "p2p",
# "p2p/sync-states"
]

View file

@ -9,3 +9,4 @@ authors = ["Boog900"]
[dependencies]
chrono = "0.4.24"
thiserror = "1"
hex = "0.4"

View file

@ -2,6 +2,7 @@
pub mod network;
pub mod pruning;
use std::fmt::{Formatter, Write};
//pub use hardforks::HardForks;
pub use network::Network;
pub use pruning::{PruningError, PruningSeed};
@ -19,6 +20,15 @@ pub enum BlockID {
Height(u64),
}
impl std::fmt::Display for BlockID {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
BlockID::Hash(hash) => f.write_str(&format!("Hash: {}", hex::encode(hash))),
BlockID::Height(height) => f.write_str(&format!("Height: {}", height)),
}
}
}
impl From<u64> for BlockID {
fn from(value: u64) -> Self {
BlockID::Height(value)

View file

@ -34,4 +34,4 @@ serde = {version = "1", optional = true, features = ["derive"]}
tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
tracing-subscriber = {version = "0.3", optional = true}
# here to help cargo to pick a version - remove me
syn = "2.0.29"
syn = "2.0.37"

View file

@ -5,17 +5,13 @@ use monero_serai::rpc::HttpRpc;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::balance::p2c::Balance;
use tower::discover::Change;
use tower::util::BoxService;
use tower::{Service, ServiceExt};
use monero_consensus::DatabaseRequest;
use tracing::level_filters::LevelFilter;
use monero_consensus::block::weight::BlockWeightsCache;
use monero_consensus::hardforks::HardFork;
use monero_consensus::pow::difficulty::DifficultyCalculator;
use monero_consensus::rpc::Rpc;
use monero_consensus::rpc::{init_rpc_load_balancer, Rpc};
struct RpcDiscoverer(Vec<String>, u64);
@ -74,31 +70,18 @@ async fn main() {
"http://node.c3pool.com:18081".to_string(),
];
let rpc_discoverer = tower::discover::ServiceList::new(
urls.into_iter()
.map(|url| tower::load::Constant::new(Rpc::new_http(url), 0)),
let rpc = init_rpc_load_balancer(urls);
let difficulty = BlockWeightsCache::init_from_chain_height(2984089, rpc.clone())
.await
.unwrap();
println!(
"{:?}",
difficulty.next_block_long_term_weight(&HardFork::V15, 175819)
);
let rpc_balance = Balance::new(rpc_discoverer);
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
let mut rpc = tower::retry::Retry::new(Attempts(3), rpc_buffer);
let pow_info = rpc
.ready()
.await
.unwrap()
.call(DatabaseRequest::BlockPOWInfo(64.into()))
.await
.unwrap();
println!("{pow_info:?}");
let difficulty = DifficultyCalculator::init_from_chain_height(578656, rpc.clone())
.await
.unwrap();
println!("{:?}", difficulty);
println!("{:?}", difficulty.next_difficulty(&HardFork::V1)); //774466376
// println!("{:?}", difficulty.next_difficulty(&HardFork::V1)); //774466376
//let _hfs = HardForks::init_at_chain_height(HardForkConfig::default(), 1009827, rpc.clone())
// .await

2
consensus/src/block.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod pow;
pub mod weight;

View file

@ -15,14 +15,3 @@ pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool {
int_hash.checked_mul(&difficulty).is_some().unwrap_u8() == 1
}
#[test]
fn chekc() {
let hash = hex::decode("5aeebb3de73859d92f3f82fdb97286d81264ecb72a42e4b9f1e6d62eb682d7c0")
.unwrap()
.try_into()
.unwrap();
let diff = 257344482654;
assert!(check_block_pow(&hash, diff))
}

View file

@ -1,5 +1,5 @@
use futures::stream::FuturesOrdered;
use futures::{StreamExt, TryFutureExt};
use futures::{TryFutureExt, TryStreamExt};
use std::ops::Range;
use tower::ServiceExt;
use tracing::instrument;
@ -184,24 +184,12 @@ async fn get_blocks_in_range_timestamps<D: Database + Clone>(
database: D,
block_heights: Range<u64>,
) -> Result<Vec<u64>, Error> {
let start = block_heights.start;
let mut timestamps = Vec::with_capacity(
TryInto::<usize>::try_into(block_heights.end - start)
.expect("Height does not fit into usize!"),
let timestamp_fut = FuturesOrdered::from_iter(
block_heights
.map(|height| get_block_timestamp(database.clone(), height).map_ok(move |res| res)),
);
let mut timestamp_fut = FuturesOrdered::from_iter(block_heights.map(|height| {
get_block_timestamp(database.clone(), height).map_ok(move |res| (height, res))
}));
while let Some(res) = timestamp_fut.next().await {
let (height, timestamp): (u64, u64) = res?;
tracing::debug!("Block timestamp for height: {} = {:?}", height, timestamp);
timestamps.push(timestamp);
}
Ok(timestamps)
timestamp_fut.try_collect().await
}
async fn get_block_timestamp<D: Database>(database: D, height: u64) -> Result<u64, Error> {

View file

@ -0,0 +1,176 @@
use std::cmp::{max, min};
use std::ops::Range;
use monero_serai::{block::Block, transaction::Transaction};
use tower::ServiceExt;
use tracing::instrument;
use crate::{hardforks::HardFork, Database, DatabaseRequest, DatabaseResponse, Error};
const PENALTY_FREE_ZONE_1: usize = 20000;
const PENALTY_FREE_ZONE_2: usize = 60000;
const PENALTY_FREE_ZONE_5: usize = 300000;
const SHORT_TERM_WINDOW: u64 = 100;
const LONG_TERM_WINDOW: u64 = 100000;
#[derive(Debug)]
pub struct BlockWeightInfo {
pub block_weight: usize,
pub long_term_weight: usize,
}
/// Calculates the blocks weight.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#blocks-weight
pub fn block_weight(block: &Block, txs: &[Transaction]) -> usize {
txs.iter()
.chain([&block.miner_tx])
.map(|tx| tx.weight())
.sum()
}
/// Returns the penalty free zone
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/weight_limit.html#penalty-free-zone
pub fn penalty_free_zone(hf: &HardFork) -> usize {
if hf == &HardFork::V1 {
PENALTY_FREE_ZONE_1
} else if hf.in_range(&HardFork::V2, &HardFork::V5) {
PENALTY_FREE_ZONE_2
} else {
PENALTY_FREE_ZONE_5
}
}
pub struct BlockWeightsCache {
/// This list is not sorted.
short_term_block_weights: Vec<usize>,
/// This list is sorted.
long_term_weights: Vec<usize>,
/// The height of the top block.
tip_height: u64,
}
impl BlockWeightsCache {
pub async fn init<D: Database + Clone>(mut database: D) -> Result<Self, Error> {
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response!");
};
Self::init_from_chain_height(chain_height, database).await
}
pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64,
database: D,
) -> Result<Self, Error> {
let mut long_term_weights = get_long_term_weight_in_range(
chain_height.saturating_sub(LONG_TERM_WINDOW)..chain_height,
database.clone(),
)
.await?;
long_term_weights.sort_unstable();
tracing::debug!(
"Sorted long term weights with length: {}",
long_term_weights.len()
);
let short_term_block_weights = get_blocks_weight_in_range(
chain_height.saturating_sub(SHORT_TERM_WINDOW)..chain_height,
database,
)
.await?;
Ok(BlockWeightsCache {
short_term_block_weights,
long_term_weights,
tip_height: chain_height - 1,
})
}
pub fn next_block_long_term_weight(&self, hf: &HardFork, block_weight: usize) -> usize {
calculate_block_long_term_weight(hf, block_weight, &self.long_term_weights)
}
}
pub fn calculate_block_long_term_weight(
hf: &HardFork,
block_weight: usize,
sorted_long_term_window: &[usize],
) -> usize {
if hf.in_range(&HardFork::V1, &HardFork::V10) {
return block_weight;
}
let long_term_median = max(penalty_free_zone(hf), median(sorted_long_term_window));
let (short_term_constraint, adjusted_block_weight) =
if hf.in_range(&HardFork::V10, &HardFork::V15) {
let stc = long_term_median + long_term_median * 2 / 5;
(stc, block_weight)
} else {
let stc = long_term_median + long_term_median * 7 / 10;
(stc, max(block_weight, long_term_median * 10 / 17))
};
min(short_term_constraint, adjusted_block_weight)
}
fn get_mid(a: usize, b: usize) -> usize {
// https://github.com/monero-project/monero/blob/90294f09ae34ef96f3dea5fea544816786df87c8/contrib/epee/include/misc_language.h#L43
(a / 2) + (b / 2) + ((a - 2 * (a / 2)) + (b - 2 * (b / 2))) / 2
}
fn median(array: &[usize]) -> usize {
let mid = array.len() / 2;
if array.len() == 1 {
return array[0];
}
if array.len() % 2 == 0 {
get_mid(array[mid - 1], array[mid])
} else {
array[mid]
}
}
#[instrument(skip(database))]
async fn get_blocks_weight_in_range<D: Database + Clone>(
range: Range<u64>,
database: D,
) -> Result<Vec<usize>, Error> {
let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range))
.await?
else {
panic!()
};
Ok(weights.into_iter().map(|info| info.block_weight).collect())
}
#[instrument(skip(database))]
async fn get_long_term_weight_in_range<D: Database + Clone>(
range: Range<u64>,
database: D,
) -> Result<Vec<usize>, Error> {
let DatabaseResponse::BlockWeightsInRange(weights) = database
.oneshot(DatabaseRequest::BlockWeightsInRange(range))
.await?
else {
panic!()
};
Ok(weights
.into_iter()
.map(|info| info.long_term_weight)
.collect())
}

View file

@ -81,24 +81,7 @@ impl HardFork {
/// Returns the next hard-fork.
pub fn next_fork(&self) -> Option<HardFork> {
match self {
HardFork::V1 => Some(HardFork::V2),
HardFork::V2 => Some(HardFork::V3),
HardFork::V3 => Some(HardFork::V4),
HardFork::V4 => Some(HardFork::V5),
HardFork::V5 => Some(HardFork::V6),
HardFork::V6 => Some(HardFork::V7),
HardFork::V7 => Some(HardFork::V8),
HardFork::V8 => Some(HardFork::V9),
HardFork::V9 => Some(HardFork::V10),
HardFork::V10 => Some(HardFork::V11),
HardFork::V11 => Some(HardFork::V12),
HardFork::V12 => Some(HardFork::V13),
HardFork::V13 => Some(HardFork::V14),
HardFork::V14 => Some(HardFork::V15),
HardFork::V15 => Some(HardFork::V16),
HardFork::V16 => None,
}
HardFork::from_version(&(*self as u8 + 1)).ok()
}
/// Returns the threshold of this fork.
@ -154,6 +137,13 @@ impl HardFork {
HardFork::V16 => 2689608,
}
}
/// Returns if the hard-fork is in range:
///
/// start <= hf < end
pub fn in_range(&self, start: &HardFork, end: &HardFork) -> bool {
start <= self && self < end
}
}
/// A struct holding the current voting state of the blockchain.

View file

@ -1,6 +1,7 @@
pub mod block;
pub mod genesis;
pub mod hardforks;
pub mod pow;
pub mod miner_tx;
#[cfg(feature = "rpc")]
pub mod rpc;
@ -26,12 +27,20 @@ impl<T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tow
pub enum DatabaseRequest {
BlockHeader(cuprate_common::BlockID),
BlockPOWInfo(cuprate_common::BlockID),
BlockWeights(cuprate_common::BlockID),
BlockWeightsInRange(std::ops::Range<u64>),
ChainHeight,
}
#[derive(Debug)]
pub enum DatabaseResponse {
BlockHeader(monero_serai::block::BlockHeader),
BlockPOWInfo(pow::BlockPOWInfo),
BlockPOWInfo(block::pow::BlockPOWInfo),
BlockWeights(block::weight::BlockWeightInfo),
BlockWeightsInRange(Vec<block::weight::BlockWeightInfo>),
ChainHeight(u64),
}

View file

@ -0,0 +1 @@

View file

@ -1,19 +1,123 @@
use std::cmp::min;
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{FutureExt, TryFutureExt};
use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt};
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use serde::Deserialize;
use serde_json::json;
use tower::balance::p2c::Balance;
use tower::util::BoxService;
use tower::ServiceExt;
use cuprate_common::BlockID;
use crate::pow::BlockPOWInfo;
use crate::block::pow::BlockPOWInfo;
use crate::block::weight::BlockWeightInfo;
use crate::{DatabaseRequest, DatabaseResponse};
const MAX_BLOCKS_IN_RANGE: u64 = 25;
#[derive(Clone)]
pub struct Attempts(u64);
impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
type Future = futures::future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
// TODO:
Some(futures::future::ready(Attempts(self.0)))
} else {
None
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
pub fn init_rpc_load_balancer(
addresses: Vec<String>,
) -> impl tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError> + Clone
{
let rpc_discoverer = tower::discover::ServiceList::new(
addresses
.into_iter()
.map(|addr| tower::load::Constant::new(Rpc::new_http(addr), 0)),
);
let rpc_balance = Balance::new(rpc_discoverer);
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
let rpcs = tower::retry::Retry::new(Attempts(3), rpc_buffer);
RpcBalancer { rpcs }
}
#[derive(Clone)]
pub struct RpcBalancer<T: Clone> {
rpcs: T,
}
impl<T> tower::Service<DatabaseRequest> for RpcBalancer<T>
where
T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
T::Future: Send + 'static,
{
type Response = DatabaseResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: DatabaseRequest) -> Self::Future {
let this = self.rpcs.clone();
match req {
DatabaseRequest::BlockWeightsInRange(range) => async move {
let res_to_weights = |res| {
let DatabaseResponse::BlockWeightsInRange(range) = res else {
panic!("Incorrect Response!");
};
range
};
let iter = (0..range.clone().count() as u64)
.step_by(MAX_BLOCKS_IN_RANGE as usize)
.map(|i| {
let new_range = (range.start + i)
..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end));
this.clone()
.oneshot(DatabaseRequest::BlockWeightsInRange(new_range))
.map_ok(res_to_weights)
});
let fut = FuturesOrdered::from_iter(iter);
let mut res = Vec::with_capacity(range.count());
for mut rpc_res in fut.try_collect::<Vec<Vec<_>>>().await?.into_iter() {
res.append(&mut rpc_res)
}
Ok(DatabaseResponse::BlockWeightsInRange(res))
}
.boxed(),
req => this.oneshot(req).boxed(),
}
}
}
enum RpcState<R: RpcConnection> {
Locked,
Acquiring(OwnedMutexLockFuture<monero_serai::rpc::Rpc<R>>),
@ -115,24 +219,53 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
.boxed(),
},
DatabaseRequest::BlockPOWInfo(id) => get_blocks_pow_info(id, rpc).boxed(),
DatabaseRequest::BlockWeights(id) => get_blocks_weight_info(id, rpc).boxed(),
DatabaseRequest::BlockWeightsInRange(range) => {
get_blocks_weight_info_in_range(range, rpc).boxed()
}
}
}
}
async fn get_blocks_pow_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
#[derive(Deserialize, Debug)]
struct BlockHeaderResponse {
struct BlockInfo {
cumulative_difficulty: u64,
cumulative_difficulty_top64: u64,
timestamp: u64,
block_weight: usize,
long_term_weight: usize,
}
async fn get_block_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<Vec<BlockInfo>, tower::BoxError> {
#[derive(Deserialize, Debug)]
struct Response {
headers: Vec<BlockInfo>,
}
let res = rpc
.json_rpc_call::<Response>(
"get_block_headers_range",
Some(json!({"start_height": range.start, "end_height": range.end - 1})),
)
.await?;
tracing::info!("Retrieved blocks in range: {:?}", range);
Ok(res.headers)
}
async fn get_block_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<BlockInfo, tower::BoxError> {
tracing::debug!("Retrieving block info with id: {}", id);
#[derive(Deserialize, Debug)]
struct Response {
block_header: BlockHeaderResponse,
block_header: BlockInfo,
}
match id {
@ -143,27 +276,58 @@ async fn get_blocks_pow_info<R: RpcConnection>(
Some(json!({"height": height})),
)
.await?;
Ok(DatabaseResponse::BlockPOWInfo(BlockPOWInfo {
timestamp: res.block_header.timestamp,
cumulative_difficulty: u128_from_low_high(
res.block_header.cumulative_difficulty,
res.block_header.cumulative_difficulty_top64,
),
}))
Ok(res.block_header)
}
BlockID::Hash(hash) => {
let res = rpc
.json_rpc_call::<Response>("get_block_header_by_hash", Some(json!({"hash": hash})))
.await?;
Ok(DatabaseResponse::BlockPOWInfo(BlockPOWInfo {
timestamp: res.block_header.timestamp,
cumulative_difficulty: u128_from_low_high(
res.block_header.cumulative_difficulty,
res.block_header.cumulative_difficulty_top64,
),
Ok(res.block_header)
}
}
}
async fn get_blocks_weight_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info_in_range(range, rpc).await?;
Ok(DatabaseResponse::BlockWeightsInRange(
info.into_iter()
.map(|info| BlockWeightInfo {
block_weight: info.block_weight,
long_term_weight: info.long_term_weight,
})
.collect(),
))
}
async fn get_blocks_weight_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
Ok(DatabaseResponse::BlockWeights(BlockWeightInfo {
block_weight: info.block_weight,
long_term_weight: info.long_term_weight,
}))
}
}
async fn get_blocks_pow_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
Ok(DatabaseResponse::BlockPOWInfo(BlockPOWInfo {
timestamp: info.timestamp,
cumulative_difficulty: u128_from_low_high(
info.cumulative_difficulty,
info.cumulative_difficulty_top64,
),
}))
}
fn u128_from_low_high(low: u64, high: u64) -> u128 {

View file

@ -249,7 +249,8 @@ fn get_imm32(gen: &mut Blake2Generator, id: &ScalarInstructionID) -> Option<u32>
fn get_mod_shift(gen: &mut Blake2Generator, id: &ScalarInstructionID) -> Option<u8> {
match id {
ScalarInstructionID::IADD_RS => Some(gen.next_u8()),
// keep the shit between 0 and 3.
ScalarInstructionID::IADD_RS => Some((gen.next_u8() >> 2) % 4),
_ => None,
}
}