use get_blocks_by_height.bin in RPC

This commit is contained in:
Boog900 2023-10-05 17:54:19 +01:00
parent 88b646c5a3
commit 20f6af7951
No known key found for this signature in database
GPG key ID: 5401367FB7302004
10 changed files with 133 additions and 87 deletions

View file

@ -9,7 +9,17 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus"
[features] [features]
default = ["binaries"] default = ["binaries"]
binaries = ["dep:tokio", "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer", "dep:serde_json", "dep:serde", "dep:epee-encoding"] binaries = [
"dep:tokio",
"dep:tracing-subscriber",
"tower/retry",
"tower/balance",
"tower/buffer",
"dep:serde_json",
"dep:serde",
"dep:monero-epee-bin-serde",
"dep:monero-wire"
]
[dependencies] [dependencies]
hex = "0.4" hex = "0.4"
@ -27,7 +37,8 @@ cuprate-common = {path = "../common"}
cryptonight-cuprate = {path = "../cryptonight"} cryptonight-cuprate = {path = "../cryptonight"}
# used in binaries # used in binaries
epee-encoding = {version = "0.5", optional = true} monero-wire = {path="../net/monero-wire", optional = true}
monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a", optional = true}
serde_json = {version = "1", optional = true} serde_json = {version = "1", optional = true}
serde = {version = "1", optional = true, features = ["derive"]} serde = {version = "1", optional = true, features = ["derive"]}
tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
@ -35,5 +46,3 @@ tracing-subscriber = {version = "0.3", optional = true}
# here to help cargo to pick a version - remove me # here to help cargo to pick a version - remove me
syn = "2.0.37" syn = "2.0.37"
[profile.dev]
opt-level = 3

View file

@ -1,27 +1,26 @@
#![cfg(feature = "binaries")] #![cfg(feature = "binaries")]
use cuprate_common::Network;
use futures::stream::FuturesOrdered;
use futures::{stream, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use tower::{Service, ServiceExt}; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use tracing::level_filters::LevelFilter; use tracing::level_filters::LevelFilter;
use monero_consensus::rpc::init_rpc_load_balancer; use cuprate_common::Network;
use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE};
use monero_consensus::{ use monero_consensus::{
verifier::{Config, Verifier}, verifier::{Config, Verifier},
ConsensusError, Database, DatabaseRequest, DatabaseResponse, Database, DatabaseRequest, DatabaseResponse,
}; };
const BATCH_SIZE: u64 = 50; const BATCH_SIZE: u64 = MAX_BLOCKS_IN_RANGE * 4;
/// A cache which can keep chain state while scanning. /// A cache which can keep chain state while scanning.
/// ///
/// Because we are using a RPC interface with node we need to keep track /// Because we are using a RPC interface with a node we need to keep track
/// of certain data that node doesn't hold like the number of outputs at /// of certain data that the node doesn't hold like the number of outputs at
/// a certain time. /// a certain time.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct ScanningCache { struct ScanningCache {
@ -29,8 +28,6 @@ struct ScanningCache {
numb_outs: HashMap<u64, u64>, numb_outs: HashMap<u64, u64>,
/// The height of the *next* block to scan. /// The height of the *next* block to scan.
height: u64, height: u64,
/// The hash of the *last* block scanned.
last_block_hash: [u8; 32],
} }
impl Default for ScanningCache { impl Default for ScanningCache {
@ -38,8 +35,7 @@ impl Default for ScanningCache {
ScanningCache { ScanningCache {
network: Default::default(), network: Default::default(),
numb_outs: Default::default(), numb_outs: Default::default(),
height: 0, height: 1,
last_block_hash: [0; 32],
} }
} }
} }
@ -102,7 +98,7 @@ where
_ => todo!(), _ => todo!(),
}; };
let _state = Verifier::init_at_chain_height(config, cache.height + 1, database.clone()).await?; let verifier = Verifier::init_at_chain_height(config, cache.height, database.clone()).await?;
tracing::info!("Initialised verifier, begging scan"); tracing::info!("Initialised verifier, begging scan");
@ -116,8 +112,7 @@ where
.step_by(BATCH_SIZE as usize) .step_by(BATCH_SIZE as usize)
.skip(1) .skip(1)
{ {
// Call the next batch while we handle this batch. The RPC does not require use to use .await before // Call the next batch while we handle this batch.
// it starts working on the request.
let current_fut = std::mem::replace( let current_fut = std::mem::replace(
&mut next_fut, &mut next_fut,
tokio::spawn( tokio::spawn(
@ -134,28 +129,8 @@ where
panic!("Database sent incorrect response!"); panic!("Database sent incorrect response!");
}; };
let mut block_data_fut = FuturesOrdered::from_iter(blocks.iter().map(|b| async { for (block, txs) in blocks.into_iter() {
if !b.txs.is_empty() { println!("{}, {}", hex::encode(block.hash()), txs.len());
let txs = b.txs.clone();
let db = database.clone();
tokio::spawn(async move {
let DatabaseResponse::Transactions(txs) =
db.oneshot(DatabaseRequest::Transactions(txs)).await?
else {
panic!("Database sent incorrect response!");
};
Ok(txs)
})
.await
.unwrap()
} else {
Ok(vec![])
}
}))
.zip(stream::iter(blocks.iter()));
while let Some((txs, block)) = block_data_fut.next().await {
let txs = txs.map_err(|e: ConsensusError| e)?;
} }
tracing::info!( tracing::info!(

View file

@ -1,2 +1,5 @@
pub mod pow; pub mod pow;
pub mod weight; pub mod weight;
pub use pow::{check_block_pow, difficulty::DifficultyCache, BlockPOWInfo};
pub use weight::{block_weight, BlockWeightInfo, BlockWeightsCache};

View file

@ -10,6 +10,8 @@ pub mod verifier;
pub enum ConsensusError { pub enum ConsensusError {
#[error("Invalid hard fork version: {0}")] #[error("Invalid hard fork version: {0}")]
InvalidHardForkVersion(&'static str), InvalidHardForkVersion(&'static str),
#[error("The block has a different previous hash than expected")]
BlockIsNotApartOfChain,
#[error("Database error: {0}")] #[error("Database error: {0}")]
Database(#[from] tower::BoxError), Database(#[from] tower::BoxError),
} }
@ -29,6 +31,7 @@ pub enum DatabaseRequest {
BlockHFInfo(cuprate_common::BlockID), BlockHFInfo(cuprate_common::BlockID),
BlockPOWInfo(cuprate_common::BlockID), BlockPOWInfo(cuprate_common::BlockID),
BlockWeights(cuprate_common::BlockID), BlockWeights(cuprate_common::BlockID),
BlockHash(u64),
BlockHfInfoInRange(std::ops::Range<u64>), BlockHfInfoInRange(std::ops::Range<u64>),
BlockWeightsInRange(std::ops::Range<u64>), BlockWeightsInRange(std::ops::Range<u64>),
@ -38,8 +41,6 @@ pub enum DatabaseRequest {
#[cfg(feature = "binaries")] #[cfg(feature = "binaries")]
BlockBatchInRange(std::ops::Range<u64>), BlockBatchInRange(std::ops::Range<u64>),
#[cfg(feature = "binaries")]
Transactions(Vec<[u8; 32]>),
} }
#[derive(Debug)] #[derive(Debug)]
@ -47,6 +48,7 @@ pub enum DatabaseResponse {
BlockHFInfo(hardforks::BlockHFInfo), BlockHFInfo(hardforks::BlockHFInfo),
BlockPOWInfo(block::pow::BlockPOWInfo), BlockPOWInfo(block::pow::BlockPOWInfo),
BlockWeights(block::weight::BlockWeightInfo), BlockWeights(block::weight::BlockWeightInfo),
BlockHash([u8; 32]),
BlockHfInfoInRange(Vec<hardforks::BlockHFInfo>), BlockHfInfoInRange(Vec<hardforks::BlockHFInfo>),
BlockWeightsInRange(Vec<block::weight::BlockWeightInfo>), BlockWeightsInRange(Vec<block::weight::BlockWeightInfo>),
@ -55,7 +57,10 @@ pub enum DatabaseResponse {
ChainHeight(u64), ChainHeight(u64),
#[cfg(feature = "binaries")] #[cfg(feature = "binaries")]
BlockBatchInRange(Vec<monero_serai::block::Block>), BlockBatchInRange(
#[cfg(feature = "binaries")] Vec<(
Transactions(Vec<monero_serai::transaction::Transaction>), monero_serai::block::Block,
Vec<monero_serai::transaction::Transaction>,
)>,
),
} }

View file

@ -8,21 +8,22 @@ use std::task::{Context, Poll};
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt};
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use serde::Deserialize; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use tower::balance::p2c::Balance; use tower::balance::p2c::Balance;
use tower::util::BoxService; use tower::util::BoxService;
use tower::ServiceExt; use tower::ServiceExt;
use cuprate_common::BlockID; use cuprate_common::BlockID;
use monero_wire::common::{BlockCompleteEntry, TransactionBlobs};
use crate::block::pow::BlockPOWInfo; use crate::block::pow::BlockPOWInfo;
use crate::block::weight::BlockWeightInfo; use crate::block::weight::BlockWeightInfo;
use crate::hardforks::BlockHFInfo; use crate::hardforks::BlockHFInfo;
use crate::{DatabaseRequest, DatabaseResponse}; use crate::{DatabaseRequest, DatabaseResponse};
pub const MAX_BLOCKS_IN_RANGE: u64 = 10; pub const MAX_BLOCKS_IN_RANGE: u64 = 75;
pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 50; pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 200;
#[derive(Clone)] #[derive(Clone)]
pub struct Attempts(u64); pub struct Attempts(u64);
@ -227,16 +228,6 @@ impl Rpc<HttpRpc> {
} }
} }
impl<R: RpcConnection> Clone for Rpc<R> {
fn clone(&self) -> Self {
Rpc {
rpc: Arc::clone(&self.rpc),
rpc_state: RpcState::Locked,
error_slot: Arc::clone(&self.error_slot),
}
}
}
impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> for Rpc<R> { impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> for Rpc<R> {
type Response = DatabaseResponse; type Response = DatabaseResponse;
type Error = tower::BoxError; type Error = tower::BoxError;
@ -269,6 +260,17 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
let err_slot = self.error_slot.clone(); let err_slot = self.error_slot.clone();
match req { match req {
DatabaseRequest::BlockHash(height) => async move {
let res: Result<_, RpcError> = rpc
.get_block_hash(height as usize)
.map_ok(|hash| DatabaseResponse::BlockHash(hash))
.await;
if let Err(e) = &res {
*err_slot.lock().unwrap() = Some(e.clone());
}
res.map_err(Into::into)
}
.boxed(),
DatabaseRequest::ChainHeight => async move { DatabaseRequest::ChainHeight => async move {
let res: Result<_, RpcError> = rpc let res: Result<_, RpcError> = rpc
.get_height() .get_height()
@ -294,39 +296,62 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
get_blocks_pow_info_in_range(range, rpc).boxed() get_blocks_pow_info_in_range(range, rpc).boxed()
} }
DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(), DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(),
DatabaseRequest::Transactions(txs) => get_transactions(txs, rpc).boxed(),
} }
} }
} }
async fn get_transactions<R: RpcConnection>(
txs: Vec<[u8; 32]>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
if txs.is_empty() {
return Ok(DatabaseResponse::Transactions(vec![]));
}
tracing::info!("Getting transactions, count: {}", txs.len());
let txs = rpc.get_transactions(&txs).await?;
Ok(DatabaseResponse::Transactions(txs))
}
async fn get_blocks_in_range<R: RpcConnection>( async fn get_blocks_in_range<R: RpcConnection>(
range: Range<u64>, range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>, rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> { ) -> Result<DatabaseResponse, tower::BoxError> {
let fut = FuturesOrdered::from_iter(
range
.clone()
.map(|height| rpc.get_block_by_number(height as usize)),
);
tracing::info!("Getting blocks in range: {:?}", range); tracing::info!("Getting blocks in range: {:?}", range);
#[derive(Serialize)]
pub struct Request {
pub heights: Vec<u64>,
}
#[derive(Deserialize)]
pub struct Response {
pub blocks: Vec<BlockCompleteEntry>,
}
let res = rpc
.bin_call(
"get_blocks_by_height.bin",
monero_epee_bin_serde::to_bytes(&Request {
heights: range.collect(),
})?,
)
.await?;
let blocks: Response = monero_epee_bin_serde::from_bytes(&res)?;
Ok(DatabaseResponse::BlockBatchInRange( Ok(DatabaseResponse::BlockBatchInRange(
fut.try_collect().await?, blocks
.blocks
.into_iter()
.map(|b| {
Ok((
monero_serai::block::Block::read(&mut b.block.as_slice())?,
if let Some(txs) = b.txs {
match txs {
TransactionBlobs::Pruned(_) => {
return Err("node sent pruned txs!".into())
}
TransactionBlobs::Normal(txs) => txs
.into_iter()
.map(|tx| {
monero_serai::transaction::Transaction::read(&mut tx.as_slice())
})
.collect::<Result<_, _>>()?,
}
} else {
vec![]
},
))
})
.collect::<Result<_, tower::BoxError>>()?,
)) ))
} }
@ -485,3 +510,8 @@ async fn get_blocks_hf_info_in_range<R: RpcConnection>(
.collect(), .collect(),
)) ))
} }
#[derive(Deserialize)]
pub struct BResponse {
pub blocks: Vec<BlockCompleteEntry>,
}

View file

@ -10,8 +10,9 @@ repository = "https://github.com/SyntheticBird45/cuprate/tree/main/net/monero-wi
[dependencies] [dependencies]
levin-cuprate = {path="../levin"} levin-cuprate = {path="../levin"}
epee-encoding = { git = "https://github.com/boog900/epee-encoding"} epee-encoding = { git = "https://github.com/boog900/epee-encoding"}
monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git"} monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev="e4a585a"}
serde = {version = "1", features = ["derive"]} serde = {version = "1", features = ["derive"]}
serde_with = "3"
[dev-dependencies] [dev-dependencies]
hex = "0.4.3" hex = "0.4.3"

View file

@ -31,6 +31,7 @@
pub mod messages; pub mod messages;
pub mod network_address; pub mod network_address;
mod serde_helpers;
pub use network_address::NetworkAddress; pub use network_address::NetworkAddress;

View file

@ -17,11 +17,15 @@
// //
use epee_encoding::EpeeObject; use epee_encoding::EpeeObject;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{serde_as, Bytes};
use crate::NetworkAddress; use crate::{
serde_helpers::{default_false, default_zero},
NetworkAddress,
};
mod builders; mod builders;
mod serde_helpers; mod serde_impls;
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PeerSupportFlags(u32); pub struct PeerSupportFlags(u32);
@ -197,18 +201,23 @@ impl TransactionBlobs {
} }
/// A Block that can contain transactions /// A Block that can contain transactions
#[serde_as]
#[derive(Clone, Debug, EpeeObject, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, EpeeObject, Serialize, Deserialize, PartialEq, Eq)]
pub struct BlockCompleteEntry { pub struct BlockCompleteEntry {
/// True if tx data is pruned /// True if tx data is pruned
#[epee_default(false)] #[epee_default(false)]
#[serde(default = "default_false")]
pub pruned: bool, pub pruned: bool,
/// The Block /// The Block
#[serde_as(as = "Bytes")]
pub block: Vec<u8>, pub block: Vec<u8>,
/// The Block Weight/Size /// The Block Weight/Size
#[epee_default(0)] #[epee_default(0)]
#[serde(default = "default_zero")]
pub block_weight: u64, pub block_weight: u64,
/// The blocks txs /// The blocks txs
#[epee_default(None)] #[epee_default(None)]
#[serde(skip_serializing_if = "Option::is_none")]
pub txs: Option<TransactionBlobs>, pub txs: Option<TransactionBlobs>,
} }

View file

@ -0,0 +1,13 @@
use std::fmt::Debug;
pub(crate) fn default_false() -> bool {
false
}
pub(crate) fn default_true() -> bool {
true
}
pub(crate) fn default_zero<T: TryFrom<u8>>() -> T {
0.try_into().map_err(|_ |"Couldn't fit 0 into integer type!").unwrap()
}