batch the retrieval of outputs

This commit is contained in:
Boog900 2023-10-26 03:16:03 +01:00
parent cb7d8b7b5e
commit 0cac022605
No known key found for this signature in database
GPG key ID: 5401367FB7302004
6 changed files with 275 additions and 60 deletions

View file

@ -1,38 +1,37 @@
#![cfg(feature = "binaries")] #![cfg(feature = "binaries")]
use std::ops::Range; use std::{
use std::path::PathBuf; io::Read,
use std::sync::{Arc, RwLock}; ops::Range,
use std::time::Duration; path::PathBuf,
sync::{Arc, RwLock},
};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter; use tracing::level_filters::LevelFilter;
use cuprate_common::Network; use cuprate_common::Network;
use monero_consensus::rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig};
use monero_consensus::{ use monero_consensus::{
context::{ContextConfig, UpdateBlockchainCacheRequest}, context::{ContextConfig, UpdateBlockchainCacheRequest},
initialize_verifier, Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, initialize_verifier,
VerifyBlockRequest, rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
transactions::VerifyTxRequest,
Database, DatabaseRequest, DatabaseResponse, HardFork, VerifiedBlockInformation,
VerifyBlockRequest, VerifyTxResponse,
}; };
const INITIAL_MAX_BLOCKS_IN_RANGE: u64 = 1000; const MAX_BLOCKS_IN_RANGE: u64 = 500;
const MAX_BLOCKS_IN_RANGE: u64 = 1000; const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250;
const INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250;
/// Calls for a batch of blocks, returning the response and the time it took. /// Calls for a batch of blocks, returning the response and the time it took.
async fn call_batch<D: Database>( async fn call_batch<D: Database>(
range: Range<u64>, range: Range<u64>,
database: D, database: D,
) -> Result<(DatabaseResponse, Duration), tower::BoxError> { ) -> Result<DatabaseResponse, tower::BoxError> {
let now = std::time::Instant::now(); Ok(database
Ok(( .oneshot(DatabaseRequest::BlockBatchInRange(range))
database .await?)
.oneshot(DatabaseRequest::BlockBatchInRange(range))
.await?,
now.elapsed(),
))
} }
async fn scan_chain<D>( async fn scan_chain<D>(
@ -48,13 +47,13 @@ where
tracing::info!("Beginning chain scan"); tracing::info!("Beginning chain scan");
// TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs. // TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs.
let chain_height = 1288616; let chain_height = 1009827;
tracing::info!("scanning to chain height: {}", chain_height); tracing::info!("scanning to chain height: {}", chain_height);
let config = ContextConfig::main_net(); let config = ContextConfig::main_net();
let (mut block_verifier, _, mut context_updater) = let (mut block_verifier, mut transaction_verifier, mut context_updater) =
initialize_verifier(database.clone(), config).await?; initialize_verifier(database.clone(), config).await?;
let batch_size = rpc_config.read().unwrap().block_batch_size(); let batch_size = rpc_config.read().unwrap().block_batch_size();
@ -75,6 +74,7 @@ where
let mut next_batch_start_height = start_height + batch_size; let mut next_batch_start_height = start_height + batch_size;
while next_batch_start_height < chain_height { while next_batch_start_height < chain_height {
// TODO: utilize dynamic batch sizes
let next_batch_size = rpc_config.read().unwrap().block_batch_size(); let next_batch_size = rpc_config.read().unwrap().block_batch_size();
// Call the next batch while we handle this batch. // Call the next batch while we handle this batch.
@ -87,7 +87,7 @@ where
)), )),
); );
let (DatabaseResponse::BlockBatchInRange(blocks), _) = current_fut.await?? else { let (DatabaseResponse::BlockBatchInRange(blocks)) = current_fut.await?? else {
panic!("Database sent incorrect response!"); panic!("Database sent incorrect response!");
}; };
@ -97,19 +97,55 @@ where
chain_height chain_height
); );
// let block_len = blocks.len(); let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
for (block, txs) in blocks {
// This is an optimisation, we batch ALL the transactions together to get their outputs, saving a
// massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate
// is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value
// this will fail when this is an issue.
let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize];
let txs = txs
.into_iter()
.enumerate()
.flat_map(|(block_id, block_batch_txs)| {
// block id is just this blocks position in the batch.
txs_per_block[block_id] = block_batch_txs.len();
block_batch_txs
})
.collect();
let VerifyTxResponse::BatchSetupOk(txs) = transaction_verifier
.ready()
.await?
.call(VerifyTxRequest::BatchSetup {
txs,
// TODO: we need to get the haf from the context svc
hf: HardFork::V1,
})
.await?
else {
panic!("tx verifier returned incorrect response");
};
let mut done_txs = 0;
for (block_id, block) in blocks.into_iter().enumerate() {
// block id is just this blocks position in the batch.
let txs = &txs[done_txs..done_txs + txs_per_block[block_id]];
done_txs += txs_per_block[block_id];
let verified_block_info: VerifiedBlockInformation = block_verifier let verified_block_info: VerifiedBlockInformation = block_verifier
.ready() .ready()
.await? .await?
.call(VerifyBlockRequest::MainChainBatchSetupVerify(block, txs)) .call(VerifyBlockRequest::MainChain(block, txs.into()))
.await?; .await?;
// add the new block to the cache
cache.write().unwrap().add_new_block_data( cache.write().unwrap().add_new_block_data(
verified_block_info.generated_coins, verified_block_info.generated_coins,
&verified_block_info.block.miner_tx, &verified_block_info.block.miner_tx,
&verified_block_info.txs, &verified_block_info.txs,
); );
// update the chain context svc with the new block
context_updater context_updater
.ready() .ready()
.await? .await?
@ -130,10 +166,12 @@ where
current_height += 1; current_height += 1;
next_batch_start_height += 1; next_batch_start_height += 1;
if current_height % 500 == 0 { if current_height % 25000 == 0 {
tracing::info!("Saving cache to: {}", save_file.display()); tracing::info!("Saving cache to: {}", save_file.display());
cache.read().unwrap().save(&save_file)?; cache.read().unwrap().save(&save_file)?;
// Get the block header to check our information matches what it should be, we don't need
// to do this all the time
let DatabaseResponse::BlockExtendedHeader(header) = database let DatabaseResponse::BlockExtendedHeader(header) = database
.ready() .ready()
.await? .await?
@ -163,6 +201,12 @@ where
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// TODO: take this in as config options:
// - nodes to connect to
// - block batch size (not header)
// - network
// - tracing level
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_max_level(LevelFilter::INFO) .with_max_level(LevelFilter::INFO)
.init(); .init();
@ -198,10 +242,7 @@ async fn main() {
"http://145.239.97.211:18089".to_string(), "http://145.239.97.211:18089".to_string(),
]; ];
let rpc_config = RpcConfig::new( let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE);
INITIAL_MAX_BLOCKS_IN_RANGE,
INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE,
);
let rpc_config = Arc::new(RwLock::new(rpc_config)); let rpc_config = Arc::new(RwLock::new(rpc_config));
let cache = match ScanningCache::load(&file_for_cache) { let cache = match ScanningCache::load(&file_for_cache) {

View file

@ -99,6 +99,9 @@ where
batch_setup_verify_main_chain_block(block, txs, context_svc, tx_verifier_svc) batch_setup_verify_main_chain_block(block, txs, context_svc, tx_verifier_svc)
.await .await
} }
VerifyBlockRequest::MainChain(block, txs) => {
verify_main_chain_block(block, txs, context_svc, tx_verifier_svc).await
}
_ => todo!(), _ => todo!(),
} }
} }
@ -106,6 +109,92 @@ where
} }
} }
async fn verify_main_chain_block<C, Tx>(
block: Block,
txs: Vec<Arc<TransactionVerificationData>>,
context_svc: C,
tx_verifier_svc: Tx,
) -> Result<VerifiedBlockInformation, ConsensusError>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
{
tracing::debug!("getting blockchain context");
let context = context_svc
.oneshot(BlockChainContextRequest)
.await
.map_err(Into::<ConsensusError>::into)?;
tracing::debug!("got blockchain context: {:?}", context);
let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
tx_verifier_svc
.oneshot(VerifyTxRequest::Block {
txs: txs.clone(),
current_chain_height: context.chain_height,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hard_fork,
})
.await?;
let generated_coins = miner_tx::check_miner_tx(
&block.miner_tx,
total_fees,
context.chain_height,
block_weight,
context.median_weight_for_block_reward,
context.already_generated_coins,
&context.current_hard_fork,
)?;
let hashing_blob = block.serialize_hashable();
checks::block_size_sanity_check(block.serialize().len(), context.effective_median_weight)?;
checks::block_weight_check(block_weight, context.median_weight_for_block_reward)?;
checks::check_amount_txs(block.txs.len())?;
checks::check_prev_id(&block, &context.top_hash)?;
if let Some(median_timestamp) = context.median_block_timestamp {
// will only be None for the first 60 blocks
checks::check_timestamp(&block, median_timestamp)?;
}
// do POW test last
let pow_hash = tokio::task::spawn_blocking(move || {
hash_worker::calculate_pow_hash(
&hashing_blob,
context.chain_height,
&context.current_hard_fork,
)
})
.await
.unwrap()?;
checks::check_block_pow(&pow_hash, context.next_difficulty)?;
context
.current_hard_fork
.check_block_version_vote(&block.header)?;
Ok(VerifiedBlockInformation {
block_hash: block.hash(),
block,
txs,
pow_hash,
generated_coins,
weight: block_weight,
height: context.chain_height,
long_term_weight: context.next_block_long_term_weight(block_weight),
hf_vote: HardFork::V1,
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
})
}
async fn batch_setup_verify_main_chain_block<C, Tx>( async fn batch_setup_verify_main_chain_block<C, Tx>(
block: Block, block: Block,
txs: Vec<Transaction>, txs: Vec<Transaction>,

View file

@ -1,22 +1,24 @@
use curve25519_dalek::edwards::CompressedEdwardsY; use std::{
use std::cmp::min; cmp::min,
use std::collections::{HashMap, HashSet}; collections::{HashMap, HashSet},
use std::future::Future; future::Future,
use std::ops::Range; ops::Range,
use std::pin::Pin; pin::Pin,
use std::sync::{Arc, Mutex, RwLock}; sync::{Arc, Mutex, RwLock},
use std::task::{Context, Poll}; task::{Context, Poll},
use std::time::Duration; };
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture}; use curve25519_dalek::edwards::CompressedEdwardsY;
use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use futures::{
lock::{OwnedMutexGuard, OwnedMutexLockFuture},
stream::{FuturesOrdered, FuturesUnordered},
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
};
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError}; use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use rayon::prelude::*; use rayon::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use tower::balance::p2c::Balance; use tower::{balance::p2c::Balance, util::BoxService, ServiceExt};
use tower::util::BoxService;
use tower::ServiceExt;
use tracing::{instrument, Instrument}; use tracing::{instrument, Instrument};
use cuprate_common::BlockID; use cuprate_common::BlockID;
@ -29,6 +31,8 @@ mod discover;
use cache::ScanningCache; use cache::ScanningCache;
const MAX_OUTS_PER_RPC: usize = 5000; // the cap for monerod is 5000
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub struct RpcConfig { pub struct RpcConfig {
pub max_blocks_per_node: u64, pub max_blocks_per_node: u64,
@ -85,9 +89,9 @@ pub fn init_rpc_load_balancer(
let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30); let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30);
let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok)); let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok));
let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120)); // let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120));
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 30); let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 30);
let rpcs = tower::retry::Retry::new(Attempts(3), rpc_buffer); let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer);
let discover = discover::RPCDiscover { let discover = discover::RPCDiscover {
rpc: rpcs.clone(), rpc: rpcs.clone(),
@ -164,6 +168,56 @@ where
config.max_block_headers_per_node, config.max_block_headers_per_node,
) )
} }
DatabaseRequest::Outputs(outs) => async move {
let mut split_outs: Vec<HashMap<u64, HashSet<u64>>> = Vec::new();
let mut i: usize = 0;
for (amount, ixs) in outs {
if ixs.len() > MAX_OUTS_PER_RPC {
for ii in (0..ixs.len()).step_by(MAX_OUTS_PER_RPC) {
let mut amt_map = HashSet::with_capacity(MAX_OUTS_PER_RPC);
amt_map.extend(ixs.iter().skip(ii).copied().take(MAX_OUTS_PER_RPC));
let mut map = HashMap::new();
map.insert(amount, amt_map);
split_outs.push(map);
i += 1;
}
continue;
}
if let Some(map) = split_outs.get_mut(i.saturating_sub(1)) {
if map.iter().map(|(_, amt_map)| amt_map.len()).sum::<usize>() + ixs.len()
< MAX_OUTS_PER_RPC
{
assert!(map.insert(amount, ixs).is_none());
continue;
}
}
let mut map = HashMap::new();
map.insert(amount, ixs);
split_outs.push(map);
i += 1;
}
let mut futs = FuturesUnordered::from_iter(
split_outs
.into_iter()
.map(|map| this.clone().oneshot(DatabaseRequest::Outputs(map))),
);
let mut outs = HashMap::new();
while let Some(out_response) = futs.next().await {
let DatabaseResponse::Outputs(out_response) = out_response? else {
panic!("RPC sent incorrect response!");
};
out_response.into_iter().for_each(|(amt, amt_map)| {
outs.entry(amt).or_insert_with(HashMap::new).extend(amt_map)
});
}
Ok(DatabaseResponse::Outputs(outs))
}
.boxed(),
req => this.oneshot(req).boxed(), req => this.oneshot(req).boxed(),
} }
} }

View file

@ -1,9 +1,8 @@
use std::collections::HashSet;
use std::io::Read;
use std::path::Path;
use std::{ use std::{
collections::HashMap, collections::HashMap,
collections::HashSet,
fmt::{Display, Formatter}, fmt::{Display, Formatter},
path::Path,
sync::Arc, sync::Arc,
}; };
@ -11,8 +10,6 @@ use bincode::{Decode, Encode};
use monero_serai::transaction::{Input, Timelock, Transaction}; use monero_serai::transaction::{Input, Timelock, Transaction};
use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::fmt::MakeWriter;
use cuprate_common::Network;
use crate::transactions::TransactionVerificationData; use crate::transactions::TransactionVerificationData;
/// A cache which can keep chain state while scanning. /// A cache which can keep chain state while scanning.

View file

@ -1,15 +1,17 @@
use std::collections::HashSet; use std::{
use std::sync::{Arc, RwLock}; collections::HashSet,
use std::time::Duration; sync::{Arc, RwLock},
time::Duration,
};
use futures::channel::mpsc::SendError; use futures::{
use futures::stream::FuturesUnordered; channel::mpsc::{self, SendError},
use futures::{channel::mpsc, SinkExt, Stream, StreamExt, TryFutureExt, TryStream}; stream::FuturesUnordered,
SinkExt,
};
use monero_serai::rpc::HttpRpc; use monero_serai::rpc::HttpRpc;
use tokio::time::timeout; use tokio::time::timeout;
use tower::discover::Change; use tower::{discover::Change, load::PeakEwma};
use tower::load::PeakEwma;
use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use super::{cache::ScanningCache, Rpc}; use super::{cache::ScanningCache, Rpc};

View file

@ -74,6 +74,8 @@ pub enum VerifyTxRequest {
time_for_time_lock: u64, time_for_time_lock: u64,
hf: HardFork, hf: HardFork,
}, },
/// Batches the setup of [`TransactionVerificationData`].
BatchSetup { txs: Vec<Transaction>, hf: HardFork },
/// Batches the setup of [`TransactionVerificationData`] and verifies the transactions /// Batches the setup of [`TransactionVerificationData`] and verifies the transactions
/// in the context of a block. /// in the context of a block.
BatchSetupVerifyBlock { BatchSetupVerifyBlock {
@ -135,6 +137,14 @@ where
hf, hf,
) )
.boxed(), .boxed(),
VerifyTxRequest::BatchSetup {
txs,
hf
} => batch_setup_transactions(
database,
txs,
hf
).boxed(),
VerifyTxRequest::BatchSetupVerifyBlock { VerifyTxRequest::BatchSetupVerifyBlock {
txs, txs,
current_chain_height, current_chain_height,
@ -179,6 +189,28 @@ where
Ok(()) Ok(())
} }
async fn batch_setup_transactions<D>(
database: D,
txs: Vec<Transaction>,
hf: HardFork,
) -> Result<VerifyTxResponse, ConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
let txs = tokio::task::spawn_blocking(|| {
txs.into_par_iter()
.map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?)))
.collect::<Result<Vec<_>, ConsensusError>>()
})
.await
.unwrap()?;
set_missing_ring_members(database, &txs, &hf).await?;
Ok(VerifyTxResponse::BatchSetupOk(txs))
}
async fn batch_setup_verify_transactions_for_block<D>( async fn batch_setup_verify_transactions_for_block<D>(
database: D, database: D,
txs: Vec<Transaction>, txs: Vec<Transaction>,