diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml
index 405ee168..59adbaf2 100644
--- a/consensus/Cargo.toml
+++ b/consensus/Cargo.toml
@@ -21,7 +21,7 @@ futures = "0.3"
 crypto-bigint = "0.5"
 randomx-rs = "1"
 
-monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "84b77b1"}
+monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "46f4370"}
 
 cuprate-common = {path = "../common"}
 cryptonight-cuprate = {path = "../cryptonight"}
@@ -33,4 +33,7 @@ serde = {version = "1", optional = true, features = ["derive"]}
 tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
 tracing-subscriber = {version = "0.3", optional = true}
 # here to help cargo to pick a version - remove me
-syn = "2.0.37"
\ No newline at end of file
+syn = "2.0.37"
+
+[profile.dev]
+opt-level = 3
diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs
index 498246c3..8e79e3dd 100644
--- a/consensus/src/bin/scan_chain.rs
+++ b/consensus/src/bin/scan_chain.rs
@@ -1,6 +1,8 @@
 #![cfg(feature = "binaries")]
 
 use cuprate_common::Network;
+use futures::stream::FuturesOrdered;
+use futures::{stream, StreamExt};
 
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
@@ -8,14 +10,14 @@
 use tower::{Service, ServiceExt};
 use tracing::instrument;
 use tracing::level_filters::LevelFilter;
-
-
-use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE};
+use monero_consensus::rpc::init_rpc_load_balancer;
 use monero_consensus::{
-    state::{Config, State},
+    verifier::{Config, Verifier},
     ConsensusError, Database, DatabaseRequest, DatabaseResponse,
 };
 
+const BATCH_SIZE: u64 = 50;
+
 /// A cache which can keep chain state while scanning.
 ///
 /// Because we are using a RPC interface with node we need to keep track
@@ -27,6 +29,8 @@ struct ScanningCache {
     numb_outs: HashMap<u64, u64>,
     /// The height of the *next* block to scan.
     height: u64,
+    /// The hash of the *last* block scanned.
+    last_block_hash: [u8; 32],
 }
 
 impl Default for ScanningCache {
@@ -34,7 +38,8 @@ impl Default for ScanningCache {
         ScanningCache {
             network: Default::default(),
             numb_outs: Default::default(),
-            height: 1,
+            height: 0,
+            last_block_hash: [0; 32],
         }
     }
 }
@@ -71,11 +76,14 @@ impl Display for ScanningCache {
 }
 
 #[instrument(skip_all, level = "info")]
-async fn scan_chain<D: Database + Clone>(
+async fn scan_chain<D: Database + Clone + Send + 'static>(
     cache: ScanningCache,
     network: Network,
     mut database: D,
-) -> Result<(), ConsensusError> {
+) -> Result<(), tower::BoxError>
+where
+    D::Future: Send + 'static,
+{
     tracing::info!("Beginning chain scan, {}", &cache);
 
     let DatabaseResponse::ChainHeight(chain_height) = database
@@ -94,21 +102,67 @@ async fn scan_chain(
         _ => todo!(),
     };
 
-    let _state = State::init_at_chain_height(config, cache.height, database.clone()).await?;
+    let _state = Verifier::init_at_chain_height(config, cache.height + 1, database.clone()).await?;
 
-    tracing::info!("Initialised state, begging scan");
+    tracing::info!("Initialised verifier, beginning scan");
 
-    for height in (cache.height..chain_height).step_by(MAX_BLOCKS_IN_RANGE as usize) {
-        let DatabaseResponse::BlockBatchInRange(_blocks) = database
-            .ready()
-            .await?
-            .call(DatabaseRequest::BlockBatchInRange(
-                height..(height + MAX_BLOCKS_IN_RANGE).max(chain_height),
-            ))
-            .await?
-        else {
+    let mut next_fut = tokio::spawn(database.clone().ready().await?.call(
+        DatabaseRequest::BlockBatchInRange(
+            cache.height..(cache.height + BATCH_SIZE).min(chain_height),
+        ),
+    ));
+
+    for height in (cache.height..chain_height)
+        .step_by(BATCH_SIZE as usize)
+        .skip(1)
+    {
+        // Call the next batch while we handle this batch. The RPC does not require us to use .await before
+        // it starts working on the request.
+        let current_fut = std::mem::replace(
+            &mut next_fut,
+            tokio::spawn(
+                database
+                    .ready()
+                    .await?
+                    .call(DatabaseRequest::BlockBatchInRange(
+                        height..(height + BATCH_SIZE).min(chain_height),
+                    )),
+            ),
+        );
+
+        let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
             panic!("Database sent incorrect response!");
         };
+
+        let mut block_data_fut = FuturesOrdered::from_iter(blocks.iter().map(|b| async {
+            if !b.txs.is_empty() {
+                let txs = b.txs.clone();
+                let db = database.clone();
+                tokio::spawn(async move {
+                    let DatabaseResponse::Transactions(txs) =
+                        db.oneshot(DatabaseRequest::Transactions(txs)).await?
+                    else {
+                        panic!("Database sent incorrect response!");
+                    };
+                    Ok(txs)
+                })
+                .await
+                .unwrap()
+            } else {
+                Ok(vec![])
+            }
+        }))
+        .zip(stream::iter(blocks.iter()));
+
+        while let Some((txs, block)) = block_data_fut.next().await {
+            let txs = txs.map_err(|e: ConsensusError| e)?;
+        }
+
+        tracing::info!(
+            "Moving onto next batch: {:?}, chain height: {}",
+            height..(height + BATCH_SIZE).min(chain_height),
+            chain_height
+        );
     }
 
     Ok(())
@@ -124,17 +178,17 @@ async fn main() {
         "http://xmr-node.cakewallet.com:18081".to_string(),
         "http://node.sethforprivacy.com".to_string(),
         "http://nodex.monerujo.io:18081".to_string(),
-        "http://node.community.rino.io:18081".to_string(),
+        // "http://node.community.rino.io:18081".to_string(),
         "http://nodes.hashvault.pro:18081".to_string(),
-        "http://node.moneroworld.com:18089".to_string(),
+        // "http://node.moneroworld.com:18089".to_string(),
         "http://node.c3pool.com:18081".to_string(),
         // "http://xmr-node.cakewallet.com:18081".to_string(),
         "http://node.sethforprivacy.com".to_string(),
         "http://nodex.monerujo.io:18081".to_string(),
-        "http://node.community.rino.io:18081".to_string(),
+        // "http://node.community.rino.io:18081".to_string(),
         "http://nodes.hashvault.pro:18081".to_string(),
-        "http://node.moneroworld.com:18089".to_string(),
+        // "http://node.moneroworld.com:18089".to_string(),
         "http://node.c3pool.com:18081".to_string(),
     ];
diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs
index b35e4f30..075cbc20 100644
--- a/consensus/src/lib.rs
+++ b/consensus/src/lib.rs
@@ -4,7 +4,7 @@ pub mod hardforks;
 pub mod miner_tx;
 #[cfg(feature = "binaries")]
 pub mod rpc;
-pub mod state;
+pub mod verifier;
 
 #[derive(Debug, thiserror::Error)]
 pub enum ConsensusError {
@@ -38,6 +38,8 @@ pub enum DatabaseRequest {
 
     #[cfg(feature = "binaries")]
     BlockBatchInRange(std::ops::Range<u64>),
+    #[cfg(feature = "binaries")]
+    Transactions(Vec<[u8; 32]>),
 }
 
 #[derive(Debug)]
@@ -54,4 +56,6 @@ pub enum DatabaseResponse {
 
     #[cfg(feature = "binaries")]
     BlockBatchInRange(Vec<monero_serai::block::Block>),
+    #[cfg(feature = "binaries")]
+    Transactions(Vec<monero_serai::transaction::Transaction>),
 }
diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs
index 384dfc68..1ba0affb 100644
--- a/consensus/src/rpc.rs
+++ b/consensus/src/rpc.rs
@@ -7,7 +7,6 @@ use std::task::{Context, Poll};
 
 use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
 use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt};
-use monero_serai::block::Block;
 use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
 use serde::Deserialize;
 use serde_json::json;
@@ -22,7 +21,8 @@ use crate::block::weight::BlockWeightInfo;
 use crate::hardforks::BlockHFInfo;
 use crate::{DatabaseRequest, DatabaseResponse};
 
-pub const MAX_BLOCKS_IN_RANGE: u64 = 50;
+pub const MAX_BLOCKS_IN_RANGE: u64 = 10;
+pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 50;
 
 #[derive(Clone)]
 pub struct Attempts(u64);
@@ -31,8 +31,11 @@ impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
     type Future = futures::future::Ready<Self>;
     fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
         if result.is_err() {
-            // TODO:
-            Some(futures::future::ready(Attempts(self.0)))
+            if self.0 == 0 {
+                None
+            } else {
+                Some(futures::future::ready(Attempts(self.0 - 1)))
+            }
         } else {
             None
         }
@@ -45,8 +48,14 @@
 
 pub fn init_rpc_load_balancer(
     addresses: Vec<String>,
-) -> impl tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError> + Clone
-{
+) -> impl tower::Service<
+    DatabaseRequest,
+    Response = DatabaseResponse,
+    Error = tower::BoxError,
+    Future = Pin<
+        Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>,
+    >,
+> + Clone {
     let rpc_discoverer = tower::discover::ServiceList::new(
         addresses
             .into_iter()
@@ -54,7 +63,7 @@
     );
     let rpc_balance = Balance::new(rpc_discoverer);
     let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
-    let rpcs = tower::retry::Retry::new(Attempts(3), rpc_buffer);
+    let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer);
 
     RpcBalancer { rpcs }
 }
@@ -99,6 +108,7 @@ where
                 DatabaseRequest::BlockBatchInRange,
                 DatabaseResponse::BlockBatchInRange,
                 resp_to_ret,
+                MAX_BLOCKS_IN_RANGE,
             )
         }
         DatabaseRequest::BlockPOWInfoInRange(range) => {
@@ -114,6 +124,7 @@
                 DatabaseRequest::BlockPOWInfoInRange,
                 DatabaseResponse::BlockPOWInfoInRange,
                 resp_to_ret,
+                MAX_BLOCKS_HEADERS_IN_RANGE,
             )
         }
 
@@ -130,6 +141,7 @@
                 DatabaseRequest::BlockWeightsInRange,
                 DatabaseResponse::BlockWeightsInRange,
                 resp_to_ret,
+                MAX_BLOCKS_HEADERS_IN_RANGE,
             )
         }
         DatabaseRequest::BlockHfInfoInRange(range) => {
@@ -145,6 +157,7 @@
                 DatabaseRequest::BlockHfInfoInRange,
                 DatabaseResponse::BlockHfInfoInRange,
                 resp_to_ret,
+                MAX_BLOCKS_HEADERS_IN_RANGE,
             )
         }
         req => this.oneshot(req).boxed(),
@@ -158,6 +171,7 @@ fn split_range_request(
     req: impl FnOnce(Range<u64>) -> DatabaseRequest + Clone + Send + 'static,
     resp: impl FnOnce(Vec<Ret>) -> DatabaseResponse + Send + 'static,
     resp_to_ret: impl Fn(DatabaseResponse) -> Vec<Ret> + Copy + Send + 'static,
+    max_request_per_rpc: u64,
 ) -> Pin<Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>>
 where
     T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError>
@@ -169,11 +183,11 @@
     Ret: Send + 'static,
 {
     let iter = (0..range.clone().count() as u64)
-        .step_by(MAX_BLOCKS_IN_RANGE as usize)
+        .step_by(max_request_per_rpc as usize)
         .map(|i| {
             let req = req.clone();
             let new_range =
-                (range.start + i)..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end));
+                (range.start + i)..(min(range.start + i + max_request_per_rpc, range.end));
             rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret)
         });
 
@@ -280,46 +294,39 @@ impl tower::Service f
             get_blocks_pow_info_in_range(range, rpc).boxed()
         }
         DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(),
+        DatabaseRequest::Transactions(txs) => get_transactions(txs, rpc).boxed(),
     }
 }
 
+async fn get_transactions<R: RpcConnection>(
+    txs: Vec<[u8; 32]>,
+    rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
+) -> Result<DatabaseResponse, tower::BoxError> {
+    if txs.is_empty() {
+        return Ok(DatabaseResponse::Transactions(vec![]));
+    }
+    tracing::info!("Getting transactions, count: {}", txs.len());
+
+    let txs = rpc.get_transactions(&txs).await?;
+
+    Ok(DatabaseResponse::Transactions(txs))
+}
+
 async fn get_blocks_in_range<R: RpcConnection>(
     range: Range<u64>,
     rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
 ) -> Result<DatabaseResponse, tower::BoxError> {
+    let fut = FuturesOrdered::from_iter(
+        range
+            .clone()
+            .map(|height| rpc.get_block_by_number(height as usize)),
+    );
+
     tracing::info!("Getting blocks in range: {:?}", range);
 
-    mod i_64079 {
-        use epee_encoding::EpeeObject;
-
-        #[derive(EpeeObject)]
-        pub struct Request {
-            pub heights: Vec<u64>,
-        }
-
-        #[derive(EpeeObject)]
-        pub struct Response {
-            pub blocks: Vec<Vec<u8>>,
-        }
-    }
-    use i_64079::*;
-
-    let res = rpc
-        .bin_call(
-            "get_blocks_by_height.bin",
-            epee_encoding::to_bytes(&Request {
-                heights: range.collect(),
-            })?,
-        )
-        .await?;
-    let res: Response = epee_encoding::from_bytes(&res)?;
-
     Ok(DatabaseResponse::BlockBatchInRange(
-        res.blocks
-            .into_iter()
-            .map(|buf| Block::read(&mut buf.as_slice()))
-            .collect::<Result<_, _>>()?,
+        fut.try_collect().await?,
     ))
 }
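Note on the new scan loop in scan_chain.rs: the request for the next batch is spawned before the current batch is handled, so RPC latency overlaps with verification work. Below is a minimal, self-contained sketch of that prefetch pattern, assuming hypothetical fetch_batch/process_batch helpers (not the real Database service) and the tokio runtime the crate already depends on; BATCH_SIZE simply mirrors the constant added in the patch.

    use std::ops::Range;

    const BATCH_SIZE: u64 = 50;

    // Stand-in for the RPC call that fetches one batch of blocks.
    async fn fetch_batch(range: Range<u64>) -> Vec<u64> {
        range.collect()
    }

    // Stand-in for verifying/handling a fetched batch.
    async fn process_batch(blocks: Vec<u64>) {
        let _ = blocks.len();
    }

    #[tokio::main]
    async fn main() {
        let chain_height = 200u64;

        // Kick off the first request immediately.
        let mut next_fut = tokio::spawn(fetch_batch(0..BATCH_SIZE.min(chain_height)));

        for start in (0..chain_height).step_by(BATCH_SIZE as usize).skip(1) {
            // Swap in the request for the *next* batch and take back the one in flight.
            let current_fut = std::mem::replace(
                &mut next_fut,
                tokio::spawn(fetch_batch(start..(start + BATCH_SIZE).min(chain_height))),
            );

            // Handle the batch that has (hopefully) already arrived while the next one downloads.
            let blocks = current_fut.await.expect("fetch task panicked");
            process_batch(blocks).await;
        }

        // The last spawned request is still in flight after the loop.
        let blocks = next_fut.await.expect("fetch task panicked");
        process_batch(blocks).await;
    }

Because tokio::spawn is used rather than a bare future, the next request makes progress even while the current batch is being processed; the patch relies on the same property, plus the RPC starting work before .await, to hide network latency.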
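Note on the retry change in rpc.rs: Attempts now counts down and gives up at zero instead of re-issuing itself forever. A standalone sketch of that idea against the tower retry::Policy trait shape the patched impl already uses (tower 0.4) is below; the clone_request method is added here only to make the example complete and is not part of the patch.

    use tower::retry::Policy;

    #[derive(Clone)]
    struct Attempts(u64);

    impl<Req: Clone, Res, E> Policy<Req, Res, E> for Attempts {
        type Future = futures::future::Ready<Self>;

        fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
            match result {
                // Success: stop retrying.
                Ok(_) => None,
                // Budget spent: give up and surface the error.
                Err(_) if self.0 == 0 => None,
                // Otherwise retry with one fewer attempt left.
                Err(_) => Some(futures::future::ready(Attempts(self.0 - 1))),
            }
        }

        fn clone_request(&self, req: &Req) -> Option<Req> {
            Some(req.clone())
        }
    }

Wrapped as tower::retry::Retry::new(Attempts(2), service), this allows the initial call plus at most two retries, matching the Attempts(2) now passed in init_rpc_load_balancer.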