From 20f6af79511020ea210994bd6c789a36597d5aa1 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 5 Oct 2023 17:54:19 +0100 Subject: [PATCH] use `get_blocks_by_height.bin` in RPC --- consensus/Cargo.toml | 17 ++- consensus/src/bin/scan_chain.rs | 55 +++------- consensus/src/block.rs | 3 + consensus/src/lib.rs | 15 ++- consensus/src/rpc.rs | 100 ++++++++++++------ net/monero-wire/Cargo.toml | 3 +- net/monero-wire/src/lib.rs | 1 + net/monero-wire/src/messages/common.rs | 13 ++- .../{serde_helpers.rs => serde_impls.rs} | 0 net/monero-wire/src/serde_helpers.rs | 13 +++ 10 files changed, 133 insertions(+), 87 deletions(-) rename net/monero-wire/src/messages/common/{serde_helpers.rs => serde_impls.rs} (100%) create mode 100644 net/monero-wire/src/serde_helpers.rs diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 59adbaf2..4e3853c9 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -9,7 +9,17 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [features] default = ["binaries"] -binaries = ["dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer", "dep:serde_json", "dep:serde", "dep:epee-encoding"] +binaries = [ + "dep:tokio", + "dep:tracing-subscriber", + "tower/retry", + "tower/balance", + "tower/buffer", + "dep:serde_json", + "dep:serde", + "dep:monero-epee-bin-serde", + "dep:monero-wire" +] [dependencies] hex = "0.4" @@ -27,7 +37,8 @@ cuprate-common = {path = "../common"} cryptonight-cuprate = {path = "../cryptonight"} # used in binaries -epee-encoding = {version = "0.5", optional = true} +monero-wire = {path="../net/monero-wire", optional = true} +monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a", optional = true} serde_json = {version = "1", optional = true} serde = {version = "1", optional = true, features = ["derive"]} tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } @@ -35,5 +46,3 @@ tracing-subscriber = {version = "0.3", optional = true} # here to help cargo to pick a version - remove me syn = "2.0.37" -[profile.dev] -opt-level = 3 diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 8e79e3dd..62525388 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,27 +1,26 @@ #![cfg(feature = "binaries")] -use cuprate_common::Network; -use futures::stream::FuturesOrdered; -use futures::{stream, StreamExt}; use std::collections::HashMap; use std::fmt::{Display, Formatter}; -use tower::{Service, ServiceExt}; +use tower::ServiceExt; use tracing::instrument; use tracing::level_filters::LevelFilter; -use monero_consensus::rpc::init_rpc_load_balancer; +use cuprate_common::Network; + +use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE}; use monero_consensus::{ verifier::{Config, Verifier}, - ConsensusError, Database, DatabaseRequest, DatabaseResponse, + Database, DatabaseRequest, DatabaseResponse, }; -const BATCH_SIZE: u64 = 50; +const BATCH_SIZE: u64 = MAX_BLOCKS_IN_RANGE * 4; /// A cache which can keep chain state while scanning. /// -/// Because we are using a RPC interface with node we need to keep track -/// of certain data that node doesn't hold like the number of outputs at +/// Because we are using a RPC interface with a node we need to keep track +/// of certain data that the node doesn't hold like the number of outputs at /// a certain time. #[derive(Debug, Clone)] struct ScanningCache { @@ -29,8 +28,6 @@ struct ScanningCache { numb_outs: HashMap, /// The height of the *next* block to scan. height: u64, - /// The hash of the *last* block scanned. - last_block_hash: [u8; 32], } impl Default for ScanningCache { @@ -38,8 +35,7 @@ impl Default for ScanningCache { ScanningCache { network: Default::default(), numb_outs: Default::default(), - height: 0, - last_block_hash: [0; 32], + height: 1, } } } @@ -102,7 +98,7 @@ where _ => todo!(), }; - let _state = Verifier::init_at_chain_height(config, cache.height + 1, database.clone()).await?; + let verifier = Verifier::init_at_chain_height(config, cache.height, database.clone()).await?; tracing::info!("Initialised verifier, begging scan"); @@ -116,8 +112,7 @@ where .step_by(BATCH_SIZE as usize) .skip(1) { - // Call the next batch while we handle this batch. The RPC does not require use to use .await before - // it starts working on the request. + // Call the next batch while we handle this batch. let current_fut = std::mem::replace( &mut next_fut, tokio::spawn( @@ -134,28 +129,8 @@ where panic!("Database sent incorrect response!"); }; - let mut block_data_fut = FuturesOrdered::from_iter(blocks.iter().map(|b| async { - if !b.txs.is_empty() { - let txs = b.txs.clone(); - let db = database.clone(); - tokio::spawn(async move { - let DatabaseResponse::Transactions(txs) = - db.oneshot(DatabaseRequest::Transactions(txs)).await? - else { - panic!("Database sent incorrect response!"); - }; - Ok(txs) - }) - .await - .unwrap() - } else { - Ok(vec![]) - } - })) - .zip(stream::iter(blocks.iter())); - - while let Some((txs, block)) = block_data_fut.next().await { - let txs = txs.map_err(|e: ConsensusError| e)?; + for (block, txs) in blocks.into_iter() { + println!("{}, {}", hex::encode(block.hash()), txs.len()); } tracing::info!( @@ -178,7 +153,7 @@ async fn main() { "http://xmr-node.cakewallet.com:18081".to_string(), "http://node.sethforprivacy.com".to_string(), "http://nodex.monerujo.io:18081".to_string(), - // "http://node.community.rino.io:18081".to_string(), + //"http://node.community.rino.io:18081".to_string(), "http://nodes.hashvault.pro:18081".to_string(), // "http://node.moneroworld.com:18089".to_string(), "http://node.c3pool.com:18081".to_string(), @@ -186,7 +161,7 @@ async fn main() { "http://xmr-node.cakewallet.com:18081".to_string(), "http://node.sethforprivacy.com".to_string(), "http://nodex.monerujo.io:18081".to_string(), - // "http://node.community.rino.io:18081".to_string(), + //"http://node.community.rino.io:18081".to_string(), "http://nodes.hashvault.pro:18081".to_string(), // "http://node.moneroworld.com:18089".to_string(), "http://node.c3pool.com:18081".to_string(), diff --git a/consensus/src/block.rs b/consensus/src/block.rs index c23a5da6..e971c7aa 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -1,2 +1,5 @@ pub mod pow; pub mod weight; + +pub use pow::{check_block_pow, difficulty::DifficultyCache, BlockPOWInfo}; +pub use weight::{block_weight, BlockWeightInfo, BlockWeightsCache}; diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 075cbc20..6de677d7 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -10,6 +10,8 @@ pub mod verifier; pub enum ConsensusError { #[error("Invalid hard fork version: {0}")] InvalidHardForkVersion(&'static str), + #[error("The block has a different previous hash than expected")] + BlockIsNotApartOfChain, #[error("Database error: {0}")] Database(#[from] tower::BoxError), } @@ -29,6 +31,7 @@ pub enum DatabaseRequest { BlockHFInfo(cuprate_common::BlockID), BlockPOWInfo(cuprate_common::BlockID), BlockWeights(cuprate_common::BlockID), + BlockHash(u64), BlockHfInfoInRange(std::ops::Range), BlockWeightsInRange(std::ops::Range), @@ -38,8 +41,6 @@ pub enum DatabaseRequest { #[cfg(feature = "binaries")] BlockBatchInRange(std::ops::Range), - #[cfg(feature = "binaries")] - Transactions(Vec<[u8; 32]>), } #[derive(Debug)] @@ -47,6 +48,7 @@ pub enum DatabaseResponse { BlockHFInfo(hardforks::BlockHFInfo), BlockPOWInfo(block::pow::BlockPOWInfo), BlockWeights(block::weight::BlockWeightInfo), + BlockHash([u8; 32]), BlockHfInfoInRange(Vec), BlockWeightsInRange(Vec), @@ -55,7 +57,10 @@ pub enum DatabaseResponse { ChainHeight(u64), #[cfg(feature = "binaries")] - BlockBatchInRange(Vec), - #[cfg(feature = "binaries")] - Transactions(Vec), + BlockBatchInRange( + Vec<( + monero_serai::block::Block, + Vec, + )>, + ), } diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index 1ba0affb..4317882a 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -8,21 +8,22 @@ use std::task::{Context, Poll}; use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::json; use tower::balance::p2c::Balance; use tower::util::BoxService; use tower::ServiceExt; use cuprate_common::BlockID; +use monero_wire::common::{BlockCompleteEntry, TransactionBlobs}; use crate::block::pow::BlockPOWInfo; use crate::block::weight::BlockWeightInfo; use crate::hardforks::BlockHFInfo; use crate::{DatabaseRequest, DatabaseResponse}; -pub const MAX_BLOCKS_IN_RANGE: u64 = 10; -pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 50; +pub const MAX_BLOCKS_IN_RANGE: u64 = 75; +pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 200; #[derive(Clone)] pub struct Attempts(u64); @@ -227,16 +228,6 @@ impl Rpc { } } -impl Clone for Rpc { - fn clone(&self) -> Self { - Rpc { - rpc: Arc::clone(&self.rpc), - rpc_state: RpcState::Locked, - error_slot: Arc::clone(&self.error_slot), - } - } -} - impl tower::Service for Rpc { type Response = DatabaseResponse; type Error = tower::BoxError; @@ -269,6 +260,17 @@ impl tower::Service f let err_slot = self.error_slot.clone(); match req { + DatabaseRequest::BlockHash(height) => async move { + let res: Result<_, RpcError> = rpc + .get_block_hash(height as usize) + .map_ok(|hash| DatabaseResponse::BlockHash(hash)) + .await; + if let Err(e) = &res { + *err_slot.lock().unwrap() = Some(e.clone()); + } + res.map_err(Into::into) + } + .boxed(), DatabaseRequest::ChainHeight => async move { let res: Result<_, RpcError> = rpc .get_height() @@ -294,39 +296,62 @@ impl tower::Service f get_blocks_pow_info_in_range(range, rpc).boxed() } DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(), - DatabaseRequest::Transactions(txs) => get_transactions(txs, rpc).boxed(), } } } -async fn get_transactions( - txs: Vec<[u8; 32]>, - rpc: OwnedMutexGuard>, -) -> Result { - if txs.is_empty() { - return Ok(DatabaseResponse::Transactions(vec![])); - } - tracing::info!("Getting transactions, count: {}", txs.len()); - - let txs = rpc.get_transactions(&txs).await?; - - Ok(DatabaseResponse::Transactions(txs)) -} - async fn get_blocks_in_range( range: Range, rpc: OwnedMutexGuard>, ) -> Result { - let fut = FuturesOrdered::from_iter( - range - .clone() - .map(|height| rpc.get_block_by_number(height as usize)), - ); - tracing::info!("Getting blocks in range: {:?}", range); + #[derive(Serialize)] + pub struct Request { + pub heights: Vec, + } + + #[derive(Deserialize)] + pub struct Response { + pub blocks: Vec, + } + + let res = rpc + .bin_call( + "get_blocks_by_height.bin", + monero_epee_bin_serde::to_bytes(&Request { + heights: range.collect(), + })?, + ) + .await?; + + let blocks: Response = monero_epee_bin_serde::from_bytes(&res)?; + Ok(DatabaseResponse::BlockBatchInRange( - fut.try_collect().await?, + blocks + .blocks + .into_iter() + .map(|b| { + Ok(( + monero_serai::block::Block::read(&mut b.block.as_slice())?, + if let Some(txs) = b.txs { + match txs { + TransactionBlobs::Pruned(_) => { + return Err("node sent pruned txs!".into()) + } + TransactionBlobs::Normal(txs) => txs + .into_iter() + .map(|tx| { + monero_serai::transaction::Transaction::read(&mut tx.as_slice()) + }) + .collect::>()?, + } + } else { + vec![] + }, + )) + }) + .collect::>()?, )) } @@ -485,3 +510,8 @@ async fn get_blocks_hf_info_in_range( .collect(), )) } + +#[derive(Deserialize)] +pub struct BResponse { + pub blocks: Vec, +} diff --git a/net/monero-wire/Cargo.toml b/net/monero-wire/Cargo.toml index cf927503..b6f787b3 100644 --- a/net/monero-wire/Cargo.toml +++ b/net/monero-wire/Cargo.toml @@ -10,8 +10,9 @@ repository = "https://github.com/SyntheticBird45/cuprate/tree/main/net/monero-wi [dependencies] levin-cuprate = {path="../levin"} epee-encoding = { git = "https://github.com/boog900/epee-encoding"} -monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git"} +monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev="e4a585a"} serde = {version = "1", features = ["derive"]} +serde_with = "3" [dev-dependencies] hex = "0.4.3" diff --git a/net/monero-wire/src/lib.rs b/net/monero-wire/src/lib.rs index 4428ee13..36336e90 100644 --- a/net/monero-wire/src/lib.rs +++ b/net/monero-wire/src/lib.rs @@ -31,6 +31,7 @@ pub mod messages; pub mod network_address; +mod serde_helpers; pub use network_address::NetworkAddress; diff --git a/net/monero-wire/src/messages/common.rs b/net/monero-wire/src/messages/common.rs index 5228fb73..5c69882a 100644 --- a/net/monero-wire/src/messages/common.rs +++ b/net/monero-wire/src/messages/common.rs @@ -17,11 +17,15 @@ // use epee_encoding::EpeeObject; use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, Bytes}; -use crate::NetworkAddress; +use crate::{ + serde_helpers::{default_false, default_zero}, + NetworkAddress, +}; mod builders; -mod serde_helpers; +mod serde_impls; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct PeerSupportFlags(u32); @@ -197,18 +201,23 @@ impl TransactionBlobs { } /// A Block that can contain transactions +#[serde_as] #[derive(Clone, Debug, EpeeObject, Serialize, Deserialize, PartialEq, Eq)] pub struct BlockCompleteEntry { /// True if tx data is pruned #[epee_default(false)] + #[serde(default = "default_false")] pub pruned: bool, /// The Block + #[serde_as(as = "Bytes")] pub block: Vec, /// The Block Weight/Size #[epee_default(0)] + #[serde(default = "default_zero")] pub block_weight: u64, /// The blocks txs #[epee_default(None)] + #[serde(skip_serializing_if = "Option::is_none")] pub txs: Option, } diff --git a/net/monero-wire/src/messages/common/serde_helpers.rs b/net/monero-wire/src/messages/common/serde_impls.rs similarity index 100% rename from net/monero-wire/src/messages/common/serde_helpers.rs rename to net/monero-wire/src/messages/common/serde_impls.rs diff --git a/net/monero-wire/src/serde_helpers.rs b/net/monero-wire/src/serde_helpers.rs new file mode 100644 index 00000000..80d9c3d6 --- /dev/null +++ b/net/monero-wire/src/serde_helpers.rs @@ -0,0 +1,13 @@ +use std::fmt::Debug; + +pub(crate) fn default_false() -> bool { + false +} + +pub(crate) fn default_true() -> bool { + true +} + +pub(crate) fn default_zero>() -> T { + 0.try_into().map_err(|_ |"Couldn't fit 0 into integer type!").unwrap() +}