init scan chain code

This commit is contained in:
Boog900 2023-10-04 14:50:13 +01:00
parent d5595b7eaf
commit f60bb1a678
No known key found for this signature in database
GPG key ID: 5401367FB7302004
4 changed files with 131 additions and 63 deletions

View file

@ -21,7 +21,7 @@ futures = "0.3"
crypto-bigint = "0.5" crypto-bigint = "0.5"
randomx-rs = "1" randomx-rs = "1"
monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "84b77b1"} monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "46f4370"}
cuprate-common = {path = "../common"} cuprate-common = {path = "../common"}
cryptonight-cuprate = {path = "../cryptonight"} cryptonight-cuprate = {path = "../cryptonight"}
@ -33,4 +33,7 @@ serde = {version = "1", optional = true, features = ["derive"]}
tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true } tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
tracing-subscriber = {version = "0.3", optional = true} tracing-subscriber = {version = "0.3", optional = true}
# here to help cargo to pick a version - remove me # here to help cargo to pick a version - remove me
syn = "2.0.37" syn = "2.0.37"
[profile.dev]
opt-level = 3

View file

@ -1,6 +1,8 @@
#![cfg(feature = "binaries")] #![cfg(feature = "binaries")]
use cuprate_common::Network; use cuprate_common::Network;
use futures::stream::FuturesOrdered;
use futures::{stream, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
@ -8,14 +10,14 @@ use tower::{Service, ServiceExt};
use tracing::instrument; use tracing::instrument;
use tracing::level_filters::LevelFilter; use tracing::level_filters::LevelFilter;
use monero_consensus::rpc::init_rpc_load_balancer;
use monero_consensus::rpc::{init_rpc_load_balancer, MAX_BLOCKS_IN_RANGE};
use monero_consensus::{ use monero_consensus::{
state::{Config, State}, verifier::{Config, Verifier},
ConsensusError, Database, DatabaseRequest, DatabaseResponse, ConsensusError, Database, DatabaseRequest, DatabaseResponse,
}; };
const BATCH_SIZE: u64 = 50;
/// A cache which can keep chain state while scanning. /// A cache which can keep chain state while scanning.
/// ///
/// Because we are using a RPC interface with node we need to keep track /// Because we are using a RPC interface with node we need to keep track
@ -27,6 +29,8 @@ struct ScanningCache {
numb_outs: HashMap<u64, u64>, numb_outs: HashMap<u64, u64>,
/// The height of the *next* block to scan. /// The height of the *next* block to scan.
height: u64, height: u64,
/// The hash of the *last* block scanned.
last_block_hash: [u8; 32],
} }
impl Default for ScanningCache { impl Default for ScanningCache {
@ -34,7 +38,8 @@ impl Default for ScanningCache {
ScanningCache { ScanningCache {
network: Default::default(), network: Default::default(),
numb_outs: Default::default(), numb_outs: Default::default(),
height: 1, height: 0,
last_block_hash: [0; 32],
} }
} }
} }
@ -71,11 +76,14 @@ impl Display for ScanningCache {
} }
#[instrument(skip_all, level = "info")] #[instrument(skip_all, level = "info")]
async fn scan_chain<D: Database + Clone>( async fn scan_chain<D: Database + Clone + Send + 'static>(
cache: ScanningCache, cache: ScanningCache,
network: Network, network: Network,
mut database: D, mut database: D,
) -> Result<(), ConsensusError> { ) -> Result<(), tower::BoxError>
where
D::Future: Send + 'static,
{
tracing::info!("Beginning chain scan, {}", &cache); tracing::info!("Beginning chain scan, {}", &cache);
let DatabaseResponse::ChainHeight(chain_height) = database let DatabaseResponse::ChainHeight(chain_height) = database
@ -94,21 +102,67 @@ async fn scan_chain<D: Database + Clone>(
_ => todo!(), _ => todo!(),
}; };
let _state = State::init_at_chain_height(config, cache.height, database.clone()).await?; let _state = Verifier::init_at_chain_height(config, cache.height + 1, database.clone()).await?;
tracing::info!("Initialised state, begging scan"); tracing::info!("Initialised verifier, begging scan");
for height in (cache.height..chain_height).step_by(MAX_BLOCKS_IN_RANGE as usize) { let mut next_fut = tokio::spawn(database.clone().ready().await?.call(
let DatabaseResponse::BlockBatchInRange(_blocks) = database DatabaseRequest::BlockBatchInRange(
.ready() cache.height..(cache.height + BATCH_SIZE).min(chain_height),
.await? ),
.call(DatabaseRequest::BlockBatchInRange( ));
height..(height + MAX_BLOCKS_IN_RANGE).max(chain_height),
)) for height in (cache.height..chain_height)
.await? .step_by(BATCH_SIZE as usize)
else { .skip(1)
{
// Call the next batch while we handle this batch. The RPC does not require use to use .await before
// it starts working on the request.
let current_fut = std::mem::replace(
&mut next_fut,
tokio::spawn(
database
.ready()
.await?
.call(DatabaseRequest::BlockBatchInRange(
height..(height + BATCH_SIZE).min(chain_height),
)),
),
);
let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
panic!("Database sent incorrect response!"); panic!("Database sent incorrect response!");
}; };
let mut block_data_fut = FuturesOrdered::from_iter(blocks.iter().map(|b| async {
if !b.txs.is_empty() {
let txs = b.txs.clone();
let db = database.clone();
tokio::spawn(async move {
let DatabaseResponse::Transactions(txs) =
db.oneshot(DatabaseRequest::Transactions(txs)).await?
else {
panic!("Database sent incorrect response!");
};
Ok(txs)
})
.await
.unwrap()
} else {
Ok(vec![])
}
}))
.zip(stream::iter(blocks.iter()));
while let Some((txs, block)) = block_data_fut.next().await {
let txs = txs.map_err(|e: ConsensusError| e)?;
}
tracing::info!(
"Moving onto next batch: {:?}, chain height: {}",
height..(height + BATCH_SIZE).min(chain_height),
chain_height
);
} }
Ok(()) Ok(())
@ -124,17 +178,17 @@ async fn main() {
"http://xmr-node.cakewallet.com:18081".to_string(), "http://xmr-node.cakewallet.com:18081".to_string(),
"http://node.sethforprivacy.com".to_string(), "http://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(), "http://nodex.monerujo.io:18081".to_string(),
"http://node.community.rino.io:18081".to_string(), // "http://node.community.rino.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(), "http://nodes.hashvault.pro:18081".to_string(),
"http://node.moneroworld.com:18089".to_string(), // "http://node.moneroworld.com:18089".to_string(),
"http://node.c3pool.com:18081".to_string(), "http://node.c3pool.com:18081".to_string(),
// //
"http://xmr-node.cakewallet.com:18081".to_string(), "http://xmr-node.cakewallet.com:18081".to_string(),
"http://node.sethforprivacy.com".to_string(), "http://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(), "http://nodex.monerujo.io:18081".to_string(),
"http://node.community.rino.io:18081".to_string(), // "http://node.community.rino.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(), "http://nodes.hashvault.pro:18081".to_string(),
"http://node.moneroworld.com:18089".to_string(), // "http://node.moneroworld.com:18089".to_string(),
"http://node.c3pool.com:18081".to_string(), "http://node.c3pool.com:18081".to_string(),
]; ];

View file

@ -4,7 +4,7 @@ pub mod hardforks;
pub mod miner_tx; pub mod miner_tx;
#[cfg(feature = "binaries")] #[cfg(feature = "binaries")]
pub mod rpc; pub mod rpc;
pub mod state; pub mod verifier;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum ConsensusError { pub enum ConsensusError {
@ -38,6 +38,8 @@ pub enum DatabaseRequest {
#[cfg(feature = "binaries")] #[cfg(feature = "binaries")]
BlockBatchInRange(std::ops::Range<u64>), BlockBatchInRange(std::ops::Range<u64>),
#[cfg(feature = "binaries")]
Transactions(Vec<[u8; 32]>),
} }
#[derive(Debug)] #[derive(Debug)]
@ -54,4 +56,6 @@ pub enum DatabaseResponse {
#[cfg(feature = "binaries")] #[cfg(feature = "binaries")]
BlockBatchInRange(Vec<monero_serai::block::Block>), BlockBatchInRange(Vec<monero_serai::block::Block>),
#[cfg(feature = "binaries")]
Transactions(Vec<monero_serai::transaction::Transaction>),
} }

View file

@ -7,7 +7,6 @@ use std::task::{Context, Poll};
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt}; use futures::{stream::FuturesOrdered, FutureExt, TryFutureExt, TryStreamExt};
use monero_serai::block::Block;
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use serde::Deserialize; use serde::Deserialize;
use serde_json::json; use serde_json::json;
@ -22,7 +21,8 @@ use crate::block::weight::BlockWeightInfo;
use crate::hardforks::BlockHFInfo; use crate::hardforks::BlockHFInfo;
use crate::{DatabaseRequest, DatabaseResponse}; use crate::{DatabaseRequest, DatabaseResponse};
pub const MAX_BLOCKS_IN_RANGE: u64 = 50; pub const MAX_BLOCKS_IN_RANGE: u64 = 10;
pub const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 50;
#[derive(Clone)] #[derive(Clone)]
pub struct Attempts(u64); pub struct Attempts(u64);
@ -31,8 +31,11 @@ impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
type Future = futures::future::Ready<Self>; type Future = futures::future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> { fn retry(&self, _: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
if result.is_err() { if result.is_err() {
// TODO: if self.0 == 0 {
Some(futures::future::ready(Attempts(self.0))) None
} else {
Some(futures::future::ready(Attempts(self.0 - 1)))
}
} else { } else {
None None
} }
@ -45,8 +48,14 @@ impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
pub fn init_rpc_load_balancer( pub fn init_rpc_load_balancer(
addresses: Vec<String>, addresses: Vec<String>,
) -> impl tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError> + Clone ) -> impl tower::Service<
{ DatabaseRequest,
Response = DatabaseResponse,
Error = tower::BoxError,
Future = Pin<
Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>,
>,
> + Clone {
let rpc_discoverer = tower::discover::ServiceList::new( let rpc_discoverer = tower::discover::ServiceList::new(
addresses addresses
.into_iter() .into_iter()
@ -54,7 +63,7 @@ pub fn init_rpc_load_balancer(
); );
let rpc_balance = Balance::new(rpc_discoverer); let rpc_balance = Balance::new(rpc_discoverer);
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3); let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
let rpcs = tower::retry::Retry::new(Attempts(3), rpc_buffer); let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer);
RpcBalancer { rpcs } RpcBalancer { rpcs }
} }
@ -99,6 +108,7 @@ where
DatabaseRequest::BlockBatchInRange, DatabaseRequest::BlockBatchInRange,
DatabaseResponse::BlockBatchInRange, DatabaseResponse::BlockBatchInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_IN_RANGE,
) )
} }
DatabaseRequest::BlockPOWInfoInRange(range) => { DatabaseRequest::BlockPOWInfoInRange(range) => {
@ -114,6 +124,7 @@ where
DatabaseRequest::BlockPOWInfoInRange, DatabaseRequest::BlockPOWInfoInRange,
DatabaseResponse::BlockPOWInfoInRange, DatabaseResponse::BlockPOWInfoInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_HEADERS_IN_RANGE,
) )
} }
@ -130,6 +141,7 @@ where
DatabaseRequest::BlockWeightsInRange, DatabaseRequest::BlockWeightsInRange,
DatabaseResponse::BlockWeightsInRange, DatabaseResponse::BlockWeightsInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_HEADERS_IN_RANGE,
) )
} }
DatabaseRequest::BlockHfInfoInRange(range) => { DatabaseRequest::BlockHfInfoInRange(range) => {
@ -145,6 +157,7 @@ where
DatabaseRequest::BlockHfInfoInRange, DatabaseRequest::BlockHfInfoInRange,
DatabaseResponse::BlockHfInfoInRange, DatabaseResponse::BlockHfInfoInRange,
resp_to_ret, resp_to_ret,
MAX_BLOCKS_HEADERS_IN_RANGE,
) )
} }
req => this.oneshot(req).boxed(), req => this.oneshot(req).boxed(),
@ -158,6 +171,7 @@ fn split_range_request<T, Ret>(
req: impl FnOnce(Range<u64>) -> DatabaseRequest + Clone + Send + 'static, req: impl FnOnce(Range<u64>) -> DatabaseRequest + Clone + Send + 'static,
resp: impl FnOnce(Vec<Ret>) -> DatabaseResponse + Send + 'static, resp: impl FnOnce(Vec<Ret>) -> DatabaseResponse + Send + 'static,
resp_to_ret: impl Fn(DatabaseResponse) -> Vec<Ret> + Copy + Send + 'static, resp_to_ret: impl Fn(DatabaseResponse) -> Vec<Ret> + Copy + Send + 'static,
max_request_per_rpc: u64,
) -> Pin<Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>> ) -> Pin<Box<dyn Future<Output = Result<DatabaseResponse, tower::BoxError>> + Send + 'static>>
where where
T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError> T: tower::Service<DatabaseRequest, Response = DatabaseResponse, Error = tower::BoxError>
@ -169,11 +183,11 @@ where
Ret: Send + 'static, Ret: Send + 'static,
{ {
let iter = (0..range.clone().count() as u64) let iter = (0..range.clone().count() as u64)
.step_by(MAX_BLOCKS_IN_RANGE as usize) .step_by(max_request_per_rpc as usize)
.map(|i| { .map(|i| {
let req = req.clone(); let req = req.clone();
let new_range = let new_range =
(range.start + i)..(min(range.start + i + MAX_BLOCKS_IN_RANGE, range.end)); (range.start + i)..(min(range.start + i + max_request_per_rpc, range.end));
rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret) rpc.clone().oneshot(req(new_range)).map_ok(resp_to_ret)
}); });
@ -280,46 +294,39 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
get_blocks_pow_info_in_range(range, rpc).boxed() get_blocks_pow_info_in_range(range, rpc).boxed()
} }
DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(), DatabaseRequest::BlockBatchInRange(range) => get_blocks_in_range(range, rpc).boxed(),
DatabaseRequest::Transactions(txs) => get_transactions(txs, rpc).boxed(),
} }
} }
} }
async fn get_transactions<R: RpcConnection>(
txs: Vec<[u8; 32]>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
if txs.is_empty() {
return Ok(DatabaseResponse::Transactions(vec![]));
}
tracing::info!("Getting transactions, count: {}", txs.len());
let txs = rpc.get_transactions(&txs).await?;
Ok(DatabaseResponse::Transactions(txs))
}
async fn get_blocks_in_range<R: RpcConnection>( async fn get_blocks_in_range<R: RpcConnection>(
range: Range<u64>, range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>, rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> { ) -> Result<DatabaseResponse, tower::BoxError> {
let fut = FuturesOrdered::from_iter(
range
.clone()
.map(|height| rpc.get_block_by_number(height as usize)),
);
tracing::info!("Getting blocks in range: {:?}", range); tracing::info!("Getting blocks in range: {:?}", range);
mod i_64079 {
use epee_encoding::EpeeObject;
#[derive(EpeeObject)]
pub struct Request {
pub heights: Vec<u64>,
}
#[derive(EpeeObject)]
pub struct Response {
pub blocks: Vec<Vec<u8>>,
}
}
use i_64079::*;
let res = rpc
.bin_call(
"get_blocks_by_height.bin",
epee_encoding::to_bytes(&Request {
heights: range.collect(),
})?,
)
.await?;
let res: Response = epee_encoding::from_bytes(&res)?;
Ok(DatabaseResponse::BlockBatchInRange( Ok(DatabaseResponse::BlockBatchInRange(
res.blocks fut.try_collect().await?,
.into_iter()
.map(|buf| Block::read(&mut buf.as_slice()))
.collect::<Result<_, _>>()?,
)) ))
} }