fetch block headers in parallel and support multiple rpc endpoints

this significantly speeds up initiating the hardfork struct
This commit is contained in:
Boog900 2023-09-05 19:13:46 +01:00
parent 42548f733d
commit a56d8ea87f
No known key found for this signature in database
GPG key ID: 5401367FB7302004
7 changed files with 217 additions and 63 deletions

View file

@ -13,7 +13,7 @@ pub const CRYPTONOTE_PRUNING_LOG_STRIPES: u32 = 3;
pub const CRYPTONOTE_PRUNING_STRIPE_SIZE: u64 = 4096;
pub const CRYPTONOTE_PRUNING_TIP_BLOCKS: u64 = 5500;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum BlockID {
Hash([u8; 32]),
Height(u64),

View file

@ -78,7 +78,7 @@ impl PruningSeed {
/// and 3 for `log_stripes`.*
///
pub fn new(stripe: u32, log_stripes: u32) -> Result<PruningSeed, PruningError> {
if !(log_stripes <= PRUNING_SEED_LOG_STRIPES_MASK) {
if log_stripes > PRUNING_SEED_LOG_STRIPES_MASK {
Err(PruningError::LogStripesOutOfRange)
} else if !(stripe > 0 && stripe <= (1 << log_stripes)) {
Err(PruningError::StripeOutOfRange)

View file

@ -9,7 +9,7 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus"
[features]
default = ["binaries"]
binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance"]
binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer"]
rpc = ["dep:futures"]
[dependencies]

View file

@ -1,39 +1,86 @@
#![cfg(feature = "binaries")]
use tower::{Service, ServiceExt};
use futures::Stream;
use monero_serai::rpc::HttpRpc;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::balance::p2c::Balance;
use tower::discover::Change;
use tower::util::{BoxService};
use tracing::level_filters::LevelFilter;
use monero_consensus::hardforks::{HardFork, HardForkConfig, HardForks};
use monero_consensus::hardforks::{HardForkConfig, HardForks};
use monero_consensus::rpc::Rpc;
use monero_consensus::DatabaseRequest;
struct RpcDiscoverer(Vec<String>, u64);
impl Stream for RpcDiscoverer {
type Item = Result<Change<u64, Rpc<HttpRpc>>, tower::BoxError>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(url) = this.0.pop() {
this.1 += 1;
return Poll::Ready(Some(Ok(Change::Insert(this.1, Rpc::new_http(url)))));
}
Poll::Ready(None)
}
}
#[derive(Clone)]
pub struct Attempts(u64);
impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
type Future = futures::future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() {
Some(futures::future::ready(Attempts(self.0 - 1)))
} else {
None
}
}
fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(LevelFilter::INFO)
.with_max_level(LevelFilter::DEBUG)
.init();
let mut rpc = Rpc::new_http("http://xmr-node.cakewallet.com:18081".to_string());
let urls = vec![
"http://xmr-node.cakewallet.com:18081".to_string(),
"http://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(),
"http://node.community.rino.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.moneroworld.com:18089".to_string(),
"http://node.c3pool.com:18081".to_string(),
//
"http://xmr-node.cakewallet.com:18081".to_string(),
"http://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(),
"http://node.community.rino.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.moneroworld.com:18089".to_string(),
"http://node.c3pool.com:18081".to_string(),
];
let res = rpc
.ready()
.await
.unwrap()
.call(DatabaseRequest::ChainHeight)
let rpc_discoverer = tower::discover::ServiceList::new(
urls.into_iter()
.map(|url| tower::load::Constant::new(Rpc::new_http(url), 0)),
);
let rpc_balance = Balance::new(rpc_discoverer);
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
let rpc = tower::retry::Retry::new(Attempts(3), rpc_buffer);
let _hfs = HardForks::init(HardForkConfig::default(), rpc.clone())
.await
.unwrap();
println!("{:?}", res);
let mut hfs = HardForks::init(HardForkConfig::default(), &mut rpc)
.await
.unwrap();
println!("{:?}", hfs);
hfs.new_block(HardFork::V2, 1009827, &mut rpc).await;
println!("{:?}", hfs);
hfs.new_block(HardFork::V2, 1009828, &mut rpc).await;
println!("{:?}", hfs);
}

View file

@ -1,3 +1,5 @@
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use std::ops::Range;
use monero_serai::block::BlockHeader;
@ -195,7 +197,7 @@ impl Default for HardForkConfig {
fn default() -> Self {
Self {
network: Network::Mainnet,
window: 3, //DEFAULT_WINDOW_SIZE,
window: DEFAULT_WINDOW_SIZE,
}
}
}
@ -213,9 +215,12 @@ pub struct HardForks {
}
impl HardForks {
pub async fn init<D>(config: HardForkConfig, database: &mut D) -> Result<Self, Error>
pub async fn init<D: Database + Clone>(
config: HardForkConfig,
mut database: D,
) -> Result<Self, Error>
where
D: Database,
D::Future: Send + 'static,
{
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
@ -231,13 +236,13 @@ impl HardForks {
0..chain_height
};
let votes = get_votes_in_range(database, block_heights).await?;
let votes = get_votes_in_range(database.clone(), block_heights).await?;
if chain_height > config.window {
assert_eq!(votes.total_votes(), config.window)
debug_assert_eq!(votes.total_votes(), config.window)
}
let latest_header = get_block_header(database, chain_height - 1).await?;
let latest_header = get_block_header(&mut database, chain_height - 1).await?;
let current_hardfork = HardFork::from_version(&latest_header.major_version)
.expect("Invalid major version in stored block");
@ -252,40 +257,108 @@ impl HardForks {
last_height: chain_height - 1,
};
// chain_height = height + 1
hfs.check_set_new_hf(chain_height);
hfs.resync(&mut database).await?;
hfs.check_set_new_hf();
tracing::info!("HardFork state: {:?}", hfs);
Ok(hfs)
}
#[instrument(skip(self, database))]
async fn resync<D: Database>(&mut self, mut database: D) -> Result<(), Error> {
let DatabaseResponse::ChainHeight(mut chain_height) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await? else {
panic!("Database sent incorrect response")
};
tracing::debug!(
"chain-tip: {}, last height: {}",
chain_height - 1,
self.last_height
);
loop {
while chain_height > self.last_height + 1 {
self.get_and_account_new_block(self.last_height + 1, &mut database)
.await;
}
let DatabaseResponse::ChainHeight(c_h) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await? else {
panic!("Database sent incorrect response")
};
chain_height = c_h;
if chain_height == self.last_height + 1 {
return Ok(());
}
tracing::debug!(
"chain-tip: {}, last height: {}",
chain_height - 1,
self.last_height
);
}
}
async fn get_and_account_new_block<D: Database>(&mut self, height: u64, mut database: D) {
let header = get_block_header(&mut database, height)
.await
.expect("Error retrieving block we should have in database");
self.new_block(HardFork::from_vote(&header.minor_version), height, database)
.await
}
pub fn check_block_version_vote(&self, version: &HardFork, vote: &HardFork) -> bool {
&self.current_hardfork == version && vote >= &self.current_hardfork
}
pub async fn new_block<D: Database>(&mut self, vote: HardFork, height: u64, database: &mut D) {
assert_eq!(self.last_height + 1, height);
pub async fn new_block<D: Database>(&mut self, vote: HardFork, height: u64, mut database: D) {
debug_assert_eq!(self.last_height + 1, height);
self.last_height += 1;
tracing::debug!(
"Accounting for new blocks vote, height: {}, vote: {:?}",
self.last_height,
vote
);
self.votes.add_vote_for_hf(&vote);
for offset in self.config.window..self.votes.total_votes() {
let header = get_block_header(database, height - offset)
let header = get_block_header(&mut database, height - offset)
.await
.expect("Error retrieving block we should have in database");
self.votes
.remove_vote_for_hf(&HardFork::from_vote(&header.minor_version));
let vote = HardFork::from_vote(&header.minor_version);
tracing::debug!(
"Removing block {} vote ({:?}) as they have left the window",
height - offset,
vote
);
self.votes.remove_vote_for_hf(&vote);
}
if height > self.config.window {
assert_eq!(self.votes.total_votes(), self.config.window);
debug_assert_eq!(self.votes.total_votes(), self.config.window);
}
self.check_set_new_hf(height + 1)
self.check_set_new_hf()
}
fn check_set_new_hf(&mut self, height: u64) {
fn check_set_new_hf(&mut self) {
while let Some(new_hf) = self.next_hardfork {
if height >= new_hf.fork_height(&self.config.network)
if self.last_height + 1 >= new_hf.fork_height(&self.config.network)
&& self.votes.get_votes_for_hf(&new_hf)
>= new_hf.votes_needed(&self.config.network, self.config.window)
{
@ -303,18 +376,25 @@ impl HardForks {
}
#[instrument(skip(database))]
async fn get_votes_in_range<D: Database>(
database: &mut D,
async fn get_votes_in_range<D: Database + Clone>(
database: D,
block_heights: Range<u64>,
) -> Result<HFVotes, Error> {
) -> Result<HFVotes, Error>
where
D::Future: Send + 'static,
{
let mut votes = HFVotes::default();
for height in block_heights {
let header = get_block_header(database, height).await?;
let mut fut =
FuturesUnordered::from_iter(block_heights.map(|height| {
get_block_header(database.clone(), height).map_ok(move |res| (height, res))
}));
while let Some(res) = fut.next().await {
let (height, header): (u64, BlockHeader) = res?;
let vote = HardFork::from_vote(&header.minor_version);
tracing::info!("Block vote for height: {} = {:?}", height, vote);
tracing::debug!("Block vote for height: {} = {:?}", height, vote);
votes.add_vote_for_hf(&HardFork::from_vote(&header.minor_version));
}
@ -323,7 +403,7 @@ async fn get_votes_in_range<D: Database>(
}
async fn get_block_header<D: Database>(
database: &mut D,
database: D,
block_id: impl Into<BlockID>,
) -> Result<BlockHeader, Error> {
let DatabaseResponse::BlockHeader(header) = database

View file

@ -1,4 +1,6 @@
use tower::ServiceExt;
pub mod genesis;
pub mod hardforks;
@ -23,7 +25,7 @@ impl<T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tow
{
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DatabaseRequest {
BlockHeader(cuprate_common::BlockID),
ChainHeight,

View file

@ -1,11 +1,13 @@
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{FutureExt, TryFutureExt};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use monero_serai::rpc::{HttpRpc, RpcConnection};
use tower::BoxError;
use cuprate_common::BlockID;
@ -19,6 +21,7 @@ enum RpcState<R: RpcConnection> {
pub struct Rpc<R: RpcConnection>(
Arc<futures::lock::Mutex<monero_serai::rpc::Rpc<R>>>,
RpcState<R>,
Arc<Mutex<bool>>,
);
impl Rpc<HttpRpc> {
@ -27,22 +30,27 @@ impl Rpc<HttpRpc> {
Rpc(
Arc::new(futures::lock::Mutex::new(http_rpc)),
RpcState::Locked,
Arc::new(Mutex::new(false)),
)
}
}
impl<R: RpcConnection> Clone for Rpc<R> {
fn clone(&self) -> Self {
Rpc(Arc::clone(&self.0), RpcState::Locked)
Rpc(Arc::clone(&self.0), RpcState::Locked, Arc::clone(&self.2))
}
}
impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> for Rpc<R> {
type Response = DatabaseResponse;
type Error = tower::BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + 'static>>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if *self.2.lock().unwrap() {
return Poll::Ready(Err("Rpc has errored".into()));
}
loop {
match &mut self.1 {
RpcState::Locked => self.1 = RpcState::Acquiring(self.0.clone().lock_owned()),
@ -59,28 +67,45 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
panic!("poll_ready was not called first!");
};
let err = self.2.clone();
match req {
DatabaseRequest::ChainHeight => async move {
rpc.get_height()
let res = rpc
.get_height()
.map_ok(|height| DatabaseResponse::ChainHeight(height.try_into().unwrap()))
.map_err(Into::into)
.await
.await;
if res.is_err() {
*err.lock().unwrap() = true;
}
res
}
.boxed(),
DatabaseRequest::BlockHeader(id) => match id {
BlockID::Hash(hash) => async move {
rpc.get_block(hash)
let res = rpc
.get_block(hash)
.map_ok(|block| DatabaseResponse::BlockHeader(block.header))
.map_err(Into::into)
.await
.map_err(Into::<BoxError>::into)
.await;
if res.is_err() {
*err.lock().unwrap() = true;
}
res
}
.boxed(),
BlockID::Height(height) => async move {
rpc.get_block_by_number(height.try_into().unwrap())
let res = rpc
.get_block_by_number(height.try_into().unwrap())
.map_ok(|block| DatabaseResponse::BlockHeader(block.header))
.map_err(Into::into)
.await
.await;
if res.is_err() {
*err.lock().unwrap() = true;
}
res
}
.boxed(),
},