From 0cac02260567b2481df7a1f2e0b9616c7ee492b0 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 26 Oct 2023 03:16:03 +0100 Subject: [PATCH] batch the retrieval of outputs --- consensus/src/bin/scan_chain.rs | 99 +++++++++++++++++++++++---------- consensus/src/block.rs | 89 +++++++++++++++++++++++++++++ consensus/src/rpc.rs | 88 +++++++++++++++++++++++------ consensus/src/rpc/cache.rs | 7 +-- consensus/src/rpc/discover.rs | 20 ++++--- consensus/src/transactions.rs | 32 +++++++++++ 6 files changed, 275 insertions(+), 60 deletions(-) diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 22adb62..9fa39a5 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,38 +1,37 @@ #![cfg(feature = "binaries")] -use std::ops::Range; -use std::path::PathBuf; -use std::sync::{Arc, RwLock}; -use std::time::Duration; +use std::{ + io::Read, + ops::Range, + path::PathBuf, + sync::{Arc, RwLock}, +}; use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; use cuprate_common::Network; -use monero_consensus::rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}; use monero_consensus::{ context::{ContextConfig, UpdateBlockchainCacheRequest}, - initialize_verifier, Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, - VerifyBlockRequest, + initialize_verifier, + rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, + transactions::VerifyTxRequest, + Database, DatabaseRequest, DatabaseResponse, HardFork, VerifiedBlockInformation, + VerifyBlockRequest, VerifyTxResponse, }; -const INITIAL_MAX_BLOCKS_IN_RANGE: u64 = 1000; -const MAX_BLOCKS_IN_RANGE: u64 = 1000; -const INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250; +const MAX_BLOCKS_IN_RANGE: u64 = 500; +const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250; /// Calls for a batch of blocks, returning the response and the time it took. async fn call_batch( range: Range, database: D, -) -> Result<(DatabaseResponse, Duration), tower::BoxError> { - let now = std::time::Instant::now(); - Ok(( - database - .oneshot(DatabaseRequest::BlockBatchInRange(range)) - .await?, - now.elapsed(), - )) +) -> Result { + Ok(database + .oneshot(DatabaseRequest::BlockBatchInRange(range)) + .await?) } async fn scan_chain( @@ -48,13 +47,13 @@ where tracing::info!("Beginning chain scan"); // TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs. - let chain_height = 1288616; + let chain_height = 1009827; tracing::info!("scanning to chain height: {}", chain_height); let config = ContextConfig::main_net(); - let (mut block_verifier, _, mut context_updater) = + let (mut block_verifier, mut transaction_verifier, mut context_updater) = initialize_verifier(database.clone(), config).await?; let batch_size = rpc_config.read().unwrap().block_batch_size(); @@ -75,6 +74,7 @@ where let mut next_batch_start_height = start_height + batch_size; while next_batch_start_height < chain_height { + // TODO: utilize dynamic batch sizes let next_batch_size = rpc_config.read().unwrap().block_batch_size(); // Call the next batch while we handle this batch. @@ -87,7 +87,7 @@ where )), ); - let (DatabaseResponse::BlockBatchInRange(blocks), _) = current_fut.await?? else { + let (DatabaseResponse::BlockBatchInRange(blocks)) = current_fut.await?? else { panic!("Database sent incorrect response!"); }; @@ -97,19 +97,55 @@ where chain_height ); - // let block_len = blocks.len(); - for (block, txs) in blocks { + let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + + // This is an optimisation, we batch ALL the transactions together to get their outputs, saving a + // massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate + // is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value + // this will fail when this is an issue. + let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize]; + let txs = txs + .into_iter() + .enumerate() + .flat_map(|(block_id, block_batch_txs)| { + // block id is just this blocks position in the batch. + txs_per_block[block_id] = block_batch_txs.len(); + block_batch_txs + }) + .collect(); + + let VerifyTxResponse::BatchSetupOk(txs) = transaction_verifier + .ready() + .await? + .call(VerifyTxRequest::BatchSetup { + txs, + // TODO: we need to get the haf from the context svc + hf: HardFork::V1, + }) + .await? + else { + panic!("tx verifier returned incorrect response"); + }; + + let mut done_txs = 0; + for (block_id, block) in blocks.into_iter().enumerate() { + // block id is just this blocks position in the batch. + let txs = &txs[done_txs..done_txs + txs_per_block[block_id]]; + done_txs += txs_per_block[block_id]; + let verified_block_info: VerifiedBlockInformation = block_verifier .ready() .await? - .call(VerifyBlockRequest::MainChainBatchSetupVerify(block, txs)) + .call(VerifyBlockRequest::MainChain(block, txs.into())) .await?; + // add the new block to the cache cache.write().unwrap().add_new_block_data( verified_block_info.generated_coins, &verified_block_info.block.miner_tx, &verified_block_info.txs, ); + // update the chain context svc with the new block context_updater .ready() .await? @@ -130,10 +166,12 @@ where current_height += 1; next_batch_start_height += 1; - if current_height % 500 == 0 { + if current_height % 25000 == 0 { tracing::info!("Saving cache to: {}", save_file.display()); cache.read().unwrap().save(&save_file)?; + // Get the block header to check our information matches what it should be, we don't need + // to do this all the time let DatabaseResponse::BlockExtendedHeader(header) = database .ready() .await? @@ -163,6 +201,12 @@ where #[tokio::main] async fn main() { + // TODO: take this in as config options: + // - nodes to connect to + // - block batch size (not header) + // - network + // - tracing level + tracing_subscriber::fmt() .with_max_level(LevelFilter::INFO) .init(); @@ -198,10 +242,7 @@ async fn main() { "http://145.239.97.211:18089".to_string(), ]; - let rpc_config = RpcConfig::new( - INITIAL_MAX_BLOCKS_IN_RANGE, - INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE, - ); + let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE); let rpc_config = Arc::new(RwLock::new(rpc_config)); let cache = match ScanningCache::load(&file_for_cache) { diff --git a/consensus/src/block.rs b/consensus/src/block.rs index e5f11e0..d0326f2 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -99,6 +99,9 @@ where batch_setup_verify_main_chain_block(block, txs, context_svc, tx_verifier_svc) .await } + VerifyBlockRequest::MainChain(block, txs) => { + verify_main_chain_block(block, txs, context_svc, tx_verifier_svc).await + } _ => todo!(), } } @@ -106,6 +109,92 @@ where } } +async fn verify_main_chain_block( + block: Block, + txs: Vec>, + context_svc: C, + tx_verifier_svc: Tx, +) -> Result +where + C: Service + + Send + + 'static, + C::Future: Send + 'static, + Tx: Service, +{ + tracing::debug!("getting blockchain context"); + let context = context_svc + .oneshot(BlockChainContextRequest) + .await + .map_err(Into::::into)?; + + tracing::debug!("got blockchain context: {:?}", context); + + let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::(); + let total_fees = txs.iter().map(|tx| tx.fee).sum::(); + + tx_verifier_svc + .oneshot(VerifyTxRequest::Block { + txs: txs.clone(), + current_chain_height: context.chain_height, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hard_fork, + }) + .await?; + + let generated_coins = miner_tx::check_miner_tx( + &block.miner_tx, + total_fees, + context.chain_height, + block_weight, + context.median_weight_for_block_reward, + context.already_generated_coins, + &context.current_hard_fork, + )?; + + let hashing_blob = block.serialize_hashable(); + + checks::block_size_sanity_check(block.serialize().len(), context.effective_median_weight)?; + checks::block_weight_check(block_weight, context.median_weight_for_block_reward)?; + + checks::check_amount_txs(block.txs.len())?; + checks::check_prev_id(&block, &context.top_hash)?; + if let Some(median_timestamp) = context.median_block_timestamp { + // will only be None for the first 60 blocks + checks::check_timestamp(&block, median_timestamp)?; + } + + // do POW test last + let pow_hash = tokio::task::spawn_blocking(move || { + hash_worker::calculate_pow_hash( + &hashing_blob, + context.chain_height, + &context.current_hard_fork, + ) + }) + .await + .unwrap()?; + + checks::check_block_pow(&pow_hash, context.next_difficulty)?; + + context + .current_hard_fork + .check_block_version_vote(&block.header)?; + + Ok(VerifiedBlockInformation { + block_hash: block.hash(), + block, + txs, + pow_hash, + generated_coins, + weight: block_weight, + height: context.chain_height, + long_term_weight: context.next_block_long_term_weight(block_weight), + hf_vote: HardFork::V1, + cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, + }) +} + async fn batch_setup_verify_main_chain_block( block: Block, txs: Vec, diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index b593e22..44d7378 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -1,22 +1,24 @@ -use curve25519_dalek::edwards::CompressedEdwardsY; -use std::cmp::min; -use std::collections::{HashMap, HashSet}; -use std::future::Future; -use std::ops::Range; -use std::pin::Pin; -use std::sync::{Arc, Mutex, RwLock}; -use std::task::{Context, Poll}; -use std::time::Duration; +use std::{ + cmp::min, + collections::{HashMap, HashSet}, + future::Future, + ops::Range, + pin::Pin, + sync::{Arc, Mutex, RwLock}, + task::{Context, Poll}, +}; -use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; -use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use curve25519_dalek::edwards::CompressedEdwardsY; +use futures::{ + lock::{OwnedMutexGuard, OwnedMutexLockFuture}, + stream::{FuturesOrdered, FuturesUnordered}, + FutureExt, StreamExt, TryFutureExt, TryStreamExt, +}; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use serde_json::json; -use tower::balance::p2c::Balance; -use tower::util::BoxService; -use tower::ServiceExt; +use tower::{balance::p2c::Balance, util::BoxService, ServiceExt}; use tracing::{instrument, Instrument}; use cuprate_common::BlockID; @@ -29,6 +31,8 @@ mod discover; use cache::ScanningCache; +const MAX_OUTS_PER_RPC: usize = 5000; // the cap for monerod is 5000 + #[derive(Debug, Copy, Clone)] pub struct RpcConfig { pub max_blocks_per_node: u64, @@ -85,9 +89,9 @@ pub fn init_rpc_load_balancer( let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30); let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok)); - let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120)); - let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 30); - let rpcs = tower::retry::Retry::new(Attempts(3), rpc_buffer); + // let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120)); + let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 30); + let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer); let discover = discover::RPCDiscover { rpc: rpcs.clone(), @@ -164,6 +168,56 @@ where config.max_block_headers_per_node, ) } + DatabaseRequest::Outputs(outs) => async move { + let mut split_outs: Vec>> = Vec::new(); + let mut i: usize = 0; + for (amount, ixs) in outs { + if ixs.len() > MAX_OUTS_PER_RPC { + for ii in (0..ixs.len()).step_by(MAX_OUTS_PER_RPC) { + let mut amt_map = HashSet::with_capacity(MAX_OUTS_PER_RPC); + amt_map.extend(ixs.iter().skip(ii).copied().take(MAX_OUTS_PER_RPC)); + + let mut map = HashMap::new(); + map.insert(amount, amt_map); + split_outs.push(map); + i += 1; + } + continue; + } + + if let Some(map) = split_outs.get_mut(i.saturating_sub(1)) { + if map.iter().map(|(_, amt_map)| amt_map.len()).sum::() + ixs.len() + < MAX_OUTS_PER_RPC + { + assert!(map.insert(amount, ixs).is_none()); + continue; + } + } + let mut map = HashMap::new(); + map.insert(amount, ixs); + split_outs.push(map); + i += 1; + } + + let mut futs = FuturesUnordered::from_iter( + split_outs + .into_iter() + .map(|map| this.clone().oneshot(DatabaseRequest::Outputs(map))), + ); + + let mut outs = HashMap::new(); + + while let Some(out_response) = futs.next().await { + let DatabaseResponse::Outputs(out_response) = out_response? else { + panic!("RPC sent incorrect response!"); + }; + out_response.into_iter().for_each(|(amt, amt_map)| { + outs.entry(amt).or_insert_with(HashMap::new).extend(amt_map) + }); + } + Ok(DatabaseResponse::Outputs(outs)) + } + .boxed(), req => this.oneshot(req).boxed(), } } diff --git a/consensus/src/rpc/cache.rs b/consensus/src/rpc/cache.rs index d44b4ce..f50aa4d 100644 --- a/consensus/src/rpc/cache.rs +++ b/consensus/src/rpc/cache.rs @@ -1,9 +1,8 @@ -use std::collections::HashSet; -use std::io::Read; -use std::path::Path; use std::{ collections::HashMap, + collections::HashSet, fmt::{Display, Formatter}, + path::Path, sync::Arc, }; @@ -11,8 +10,6 @@ use bincode::{Decode, Encode}; use monero_serai::transaction::{Input, Timelock, Transaction}; use tracing_subscriber::fmt::MakeWriter; -use cuprate_common::Network; - use crate::transactions::TransactionVerificationData; /// A cache which can keep chain state while scanning. diff --git a/consensus/src/rpc/discover.rs b/consensus/src/rpc/discover.rs index 8f2de57..4841bc0 100644 --- a/consensus/src/rpc/discover.rs +++ b/consensus/src/rpc/discover.rs @@ -1,15 +1,17 @@ -use std::collections::HashSet; -use std::sync::{Arc, RwLock}; -use std::time::Duration; +use std::{ + collections::HashSet, + sync::{Arc, RwLock}, + time::Duration, +}; -use futures::channel::mpsc::SendError; -use futures::stream::FuturesUnordered; -use futures::{channel::mpsc, SinkExt, Stream, StreamExt, TryFutureExt, TryStream}; +use futures::{ + channel::mpsc::{self, SendError}, + stream::FuturesUnordered, + SinkExt, +}; use monero_serai::rpc::HttpRpc; use tokio::time::timeout; -use tower::discover::Change; -use tower::load::PeakEwma; -use tower::ServiceExt; +use tower::{discover::Change, load::PeakEwma}; use tracing::instrument; use super::{cache::ScanningCache, Rpc}; diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index 28d0e70..5bd651f 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -74,6 +74,8 @@ pub enum VerifyTxRequest { time_for_time_lock: u64, hf: HardFork, }, + /// Batches the setup of [`TransactionVerificationData`]. + BatchSetup { txs: Vec, hf: HardFork }, /// Batches the setup of [`TransactionVerificationData`] and verifies the transactions /// in the context of a block. BatchSetupVerifyBlock { @@ -135,6 +137,14 @@ where hf, ) .boxed(), + VerifyTxRequest::BatchSetup { + txs, + hf + } => batch_setup_transactions( + database, + txs, + hf + ).boxed(), VerifyTxRequest::BatchSetupVerifyBlock { txs, current_chain_height, @@ -179,6 +189,28 @@ where Ok(()) } +async fn batch_setup_transactions( + database: D, + txs: Vec, + hf: HardFork, +) -> Result + where + D: Database + Clone + Sync + Send + 'static, +{ + // Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs. + let txs = tokio::task::spawn_blocking(|| { + txs.into_par_iter() + .map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?))) + .collect::, ConsensusError>>() + }) + .await + .unwrap()?; + + set_missing_ring_members(database, &txs, &hf).await?; + + Ok(VerifyTxResponse::BatchSetupOk(txs)) +} + async fn batch_setup_verify_transactions_for_block( database: D, txs: Vec,