diff --git a/common/src/lib.rs b/common/src/lib.rs index f6f9426e..4cf66dd4 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -13,7 +13,7 @@ pub const CRYPTONOTE_PRUNING_LOG_STRIPES: u32 = 3; pub const CRYPTONOTE_PRUNING_STRIPE_SIZE: u64 = 4096; pub const CRYPTONOTE_PRUNING_TIP_BLOCKS: u64 = 5500; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BlockID { Hash([u8; 32]), Height(u64), diff --git a/common/src/pruning.rs b/common/src/pruning.rs index 13e85506..d4575696 100644 --- a/common/src/pruning.rs +++ b/common/src/pruning.rs @@ -78,7 +78,7 @@ impl PruningSeed { /// and 3 for `log_stripes`.* /// pub fn new(stripe: u32, log_stripes: u32) -> Result { - if !(log_stripes <= PRUNING_SEED_LOG_STRIPES_MASK) { + if log_stripes > PRUNING_SEED_LOG_STRIPES_MASK { Err(PruningError::LogStripesOutOfRange) } else if !(stripe > 0 && stripe <= (1 << log_stripes)) { Err(PruningError::StripeOutOfRange) diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index cd2978fd..4d0e927d 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -9,7 +9,7 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [features] default = ["binaries"] -binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance"] +binaries = ["rpc", "dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer"] rpc = ["dep:futures"] [dependencies] diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 5cc269b4..d8bfcdbd 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,39 +1,86 @@ #![cfg(feature = "binaries")] -use tower::{Service, ServiceExt}; +use futures::Stream; +use monero_serai::rpc::HttpRpc; +use std::pin::Pin; + +use std::task::{Context, Poll}; +use tower::balance::p2c::Balance; +use tower::discover::Change; +use tower::util::{BoxService}; + use tracing::level_filters::LevelFilter; -use monero_consensus::hardforks::{HardFork, HardForkConfig, HardForks}; +use monero_consensus::hardforks::{HardForkConfig, HardForks}; use monero_consensus::rpc::Rpc; -use monero_consensus::DatabaseRequest; + + +struct RpcDiscoverer(Vec, u64); + +impl Stream for RpcDiscoverer { + type Item = Result>, tower::BoxError>; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if let Some(url) = this.0.pop() { + this.1 += 1; + return Poll::Ready(Some(Ok(Change::Insert(this.1, Rpc::new_http(url))))); + } + Poll::Ready(None) + } +} + +#[derive(Clone)] +pub struct Attempts(u64); + +impl tower::retry::Policy for Attempts { + type Future = futures::future::Ready; + fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option { + if result.is_err() { + Some(futures::future::ready(Attempts(self.0 - 1))) + } else { + None + } + } + + fn clone_request(&self, req: &Req) -> Option { + Some(req.clone()) + } +} #[tokio::main] async fn main() { tracing_subscriber::fmt() - .with_max_level(LevelFilter::INFO) + .with_max_level(LevelFilter::DEBUG) .init(); - let mut rpc = Rpc::new_http("http://xmr-node.cakewallet.com:18081".to_string()); + let urls = vec![ + "http://xmr-node.cakewallet.com:18081".to_string(), + "http://node.sethforprivacy.com".to_string(), + "http://nodex.monerujo.io:18081".to_string(), + "http://node.community.rino.io:18081".to_string(), + "http://nodes.hashvault.pro:18081".to_string(), + "http://node.moneroworld.com:18089".to_string(), + "http://node.c3pool.com:18081".to_string(), + // + "http://xmr-node.cakewallet.com:18081".to_string(), + "http://node.sethforprivacy.com".to_string(), + "http://nodex.monerujo.io:18081".to_string(), + "http://node.community.rino.io:18081".to_string(), + "http://nodes.hashvault.pro:18081".to_string(), + "http://node.moneroworld.com:18089".to_string(), + "http://node.c3pool.com:18081".to_string(), + ]; - let res = rpc - .ready() - .await - .unwrap() - .call(DatabaseRequest::ChainHeight) + let rpc_discoverer = tower::discover::ServiceList::new( + urls.into_iter() + .map(|url| tower::load::Constant::new(Rpc::new_http(url), 0)), + ); + let rpc_balance = Balance::new(rpc_discoverer); + let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3); + let rpc = tower::retry::Retry::new(Attempts(3), rpc_buffer); + + let _hfs = HardForks::init(HardForkConfig::default(), rpc.clone()) .await .unwrap(); - - println!("{:?}", res); - - let mut hfs = HardForks::init(HardForkConfig::default(), &mut rpc) - .await - .unwrap(); - - println!("{:?}", hfs); - - hfs.new_block(HardFork::V2, 1009827, &mut rpc).await; - println!("{:?}", hfs); - - hfs.new_block(HardFork::V2, 1009828, &mut rpc).await; - println!("{:?}", hfs); } diff --git a/consensus/src/hardforks.rs b/consensus/src/hardforks.rs index e2d9a65e..2873e4c9 100644 --- a/consensus/src/hardforks.rs +++ b/consensus/src/hardforks.rs @@ -1,3 +1,5 @@ +use futures::stream::FuturesUnordered; +use futures::{StreamExt, TryFutureExt}; use std::ops::Range; use monero_serai::block::BlockHeader; @@ -195,7 +197,7 @@ impl Default for HardForkConfig { fn default() -> Self { Self { network: Network::Mainnet, - window: 3, //DEFAULT_WINDOW_SIZE, + window: DEFAULT_WINDOW_SIZE, } } } @@ -213,9 +215,12 @@ pub struct HardForks { } impl HardForks { - pub async fn init(config: HardForkConfig, database: &mut D) -> Result + pub async fn init( + config: HardForkConfig, + mut database: D, + ) -> Result where - D: Database, + D::Future: Send + 'static, { let DatabaseResponse::ChainHeight(chain_height) = database .ready() @@ -231,13 +236,13 @@ impl HardForks { 0..chain_height }; - let votes = get_votes_in_range(database, block_heights).await?; + let votes = get_votes_in_range(database.clone(), block_heights).await?; if chain_height > config.window { - assert_eq!(votes.total_votes(), config.window) + debug_assert_eq!(votes.total_votes(), config.window) } - let latest_header = get_block_header(database, chain_height - 1).await?; + let latest_header = get_block_header(&mut database, chain_height - 1).await?; let current_hardfork = HardFork::from_version(&latest_header.major_version) .expect("Invalid major version in stored block"); @@ -252,40 +257,108 @@ impl HardForks { last_height: chain_height - 1, }; - // chain_height = height + 1 - hfs.check_set_new_hf(chain_height); + hfs.resync(&mut database).await?; + + hfs.check_set_new_hf(); + + tracing::info!("HardFork state: {:?}", hfs); Ok(hfs) } + #[instrument(skip(self, database))] + async fn resync(&mut self, mut database: D) -> Result<(), Error> { + let DatabaseResponse::ChainHeight(mut chain_height) = database + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? else { + panic!("Database sent incorrect response") + }; + + tracing::debug!( + "chain-tip: {}, last height: {}", + chain_height - 1, + self.last_height + ); + + loop { + while chain_height > self.last_height + 1 { + self.get_and_account_new_block(self.last_height + 1, &mut database) + .await; + } + + let DatabaseResponse::ChainHeight(c_h) = database + .ready() + .await? + .call(DatabaseRequest::ChainHeight) + .await? else { + panic!("Database sent incorrect response") + }; + chain_height = c_h; + + if chain_height == self.last_height + 1 { + return Ok(()); + } + + tracing::debug!( + "chain-tip: {}, last height: {}", + chain_height - 1, + self.last_height + ); + } + } + + async fn get_and_account_new_block(&mut self, height: u64, mut database: D) { + let header = get_block_header(&mut database, height) + .await + .expect("Error retrieving block we should have in database"); + + self.new_block(HardFork::from_vote(&header.minor_version), height, database) + .await + } + pub fn check_block_version_vote(&self, version: &HardFork, vote: &HardFork) -> bool { &self.current_hardfork == version && vote >= &self.current_hardfork } - pub async fn new_block(&mut self, vote: HardFork, height: u64, database: &mut D) { - assert_eq!(self.last_height + 1, height); + pub async fn new_block(&mut self, vote: HardFork, height: u64, mut database: D) { + debug_assert_eq!(self.last_height + 1, height); self.last_height += 1; + tracing::debug!( + "Accounting for new blocks vote, height: {}, vote: {:?}", + self.last_height, + vote + ); + self.votes.add_vote_for_hf(&vote); for offset in self.config.window..self.votes.total_votes() { - let header = get_block_header(database, height - offset) + let header = get_block_header(&mut database, height - offset) .await .expect("Error retrieving block we should have in database"); - self.votes - .remove_vote_for_hf(&HardFork::from_vote(&header.minor_version)); + + let vote = HardFork::from_vote(&header.minor_version); + tracing::debug!( + "Removing block {} vote ({:?}) as they have left the window", + height - offset, + vote + ); + + self.votes.remove_vote_for_hf(&vote); } if height > self.config.window { - assert_eq!(self.votes.total_votes(), self.config.window); + debug_assert_eq!(self.votes.total_votes(), self.config.window); } - self.check_set_new_hf(height + 1) + self.check_set_new_hf() } - fn check_set_new_hf(&mut self, height: u64) { + fn check_set_new_hf(&mut self) { while let Some(new_hf) = self.next_hardfork { - if height >= new_hf.fork_height(&self.config.network) + if self.last_height + 1 >= new_hf.fork_height(&self.config.network) && self.votes.get_votes_for_hf(&new_hf) >= new_hf.votes_needed(&self.config.network, self.config.window) { @@ -303,18 +376,25 @@ impl HardForks { } #[instrument(skip(database))] -async fn get_votes_in_range( - database: &mut D, +async fn get_votes_in_range( + database: D, block_heights: Range, -) -> Result { +) -> Result +where + D::Future: Send + 'static, +{ let mut votes = HFVotes::default(); - for height in block_heights { - let header = get_block_header(database, height).await?; + let mut fut = + FuturesUnordered::from_iter(block_heights.map(|height| { + get_block_header(database.clone(), height).map_ok(move |res| (height, res)) + })); + while let Some(res) = fut.next().await { + let (height, header): (u64, BlockHeader) = res?; let vote = HardFork::from_vote(&header.minor_version); - tracing::info!("Block vote for height: {} = {:?}", height, vote); + tracing::debug!("Block vote for height: {} = {:?}", height, vote); votes.add_vote_for_hf(&HardFork::from_vote(&header.minor_version)); } @@ -323,7 +403,7 @@ async fn get_votes_in_range( } async fn get_block_header( - database: &mut D, + database: D, block_id: impl Into, ) -> Result { let DatabaseResponse::BlockHeader(header) = database diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 4b24ce8e..08647739 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,4 +1,6 @@ -use tower::ServiceExt; + + + pub mod genesis; pub mod hardforks; @@ -23,7 +25,7 @@ impl { pub struct Rpc( Arc>>, RpcState, + Arc>, ); impl Rpc { @@ -27,22 +30,27 @@ impl Rpc { Rpc( Arc::new(futures::lock::Mutex::new(http_rpc)), RpcState::Locked, + Arc::new(Mutex::new(false)), ) } } impl Clone for Rpc { fn clone(&self) -> Self { - Rpc(Arc::clone(&self.0), RpcState::Locked) + Rpc(Arc::clone(&self.0), RpcState::Locked, Arc::clone(&self.2)) } } impl tower::Service for Rpc { type Response = DatabaseResponse; type Error = tower::BoxError; - type Future = Pin> + 'static>>; + type Future = + Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if *self.2.lock().unwrap() { + return Poll::Ready(Err("Rpc has errored".into())); + } loop { match &mut self.1 { RpcState::Locked => self.1 = RpcState::Acquiring(self.0.clone().lock_owned()), @@ -59,28 +67,45 @@ impl tower::Service f panic!("poll_ready was not called first!"); }; + let err = self.2.clone(); + match req { DatabaseRequest::ChainHeight => async move { - rpc.get_height() + let res = rpc + .get_height() .map_ok(|height| DatabaseResponse::ChainHeight(height.try_into().unwrap())) .map_err(Into::into) - .await + .await; + if res.is_err() { + *err.lock().unwrap() = true; + } + res } .boxed(), DatabaseRequest::BlockHeader(id) => match id { BlockID::Hash(hash) => async move { - rpc.get_block(hash) + let res = rpc + .get_block(hash) .map_ok(|block| DatabaseResponse::BlockHeader(block.header)) - .map_err(Into::into) - .await + .map_err(Into::::into) + .await; + if res.is_err() { + *err.lock().unwrap() = true; + } + res } .boxed(), BlockID::Height(height) => async move { - rpc.get_block_by_number(height.try_into().unwrap()) + let res = rpc + .get_block_by_number(height.try_into().unwrap()) .map_ok(|block| DatabaseResponse::BlockHeader(block.header)) .map_err(Into::into) - .await + .await; + if res.is_err() { + *err.lock().unwrap() = true; + } + res } .boxed(), },