stop batching transactions across blocks when a hard-fork happens..

in the batch.
This commit is contained in:
Boog900 2023-10-27 00:39:06 +01:00
parent d0bd17c560
commit 216bedaf06
No known key found for this signature in database
GPG key ID: 5401367FB7302004
2 changed files with 202 additions and 99 deletions

View file

@ -1,5 +1,6 @@
#![cfg(feature = "binaries")]
use std::path::Path;
use std::{
io::Read,
ops::Range,
@ -7,6 +8,7 @@ use std::{
sync::{Arc, RwLock},
};
use monero_serai::{block::Block, transaction::Transaction};
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
@ -17,8 +19,8 @@ use monero_consensus::{
initialize_verifier,
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
transactions::VerifyTxRequest,
Database, DatabaseRequest, DatabaseResponse, HardFork, VerifiedBlockInformation,
VerifyBlockRequest, VerifyTxResponse,
ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork,
VerifiedBlockInformation, VerifyBlockRequest, VerifyTxResponse,
};
const MAX_BLOCKS_IN_RANGE: u64 = 500;
@ -29,9 +31,169 @@ async fn call_batch<D: Database>(
range: Range<u64>,
database: D,
) -> Result<DatabaseResponse, tower::BoxError> {
Ok(database
database
.oneshot(DatabaseRequest::BlockBatchInRange(range))
.await?)
.await
}
fn simple_get_hf(height: u64) -> HardFork {
match height {
0..=1009826 => HardFork::V1,
1009827..=1141316 => HardFork::V2,
1141317..=1220515 => HardFork::V3,
_ => todo!("rules past v3"),
}
}
async fn update_cache_and_context<Ctx>(
cache: &RwLock<ScanningCache>,
context_updater: &mut Ctx,
verified_block_info: VerifiedBlockInformation,
) -> Result<(), tower::BoxError>
where
Ctx: tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
{
// add the new block to the cache
cache.write().unwrap().add_new_block_data(
verified_block_info.generated_coins,
&verified_block_info.block.miner_tx,
&verified_block_info.txs,
);
// update the chain context svc with the new block
context_updater
.ready()
.await?
.call(UpdateBlockchainCacheRequest {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
cumulative_difficulty: verified_block_info.cumulative_difficulty,
})
.await?;
Ok(())
}
/// Batches all transactions together when getting outs
///
/// TODO: reduce the amount of parameters of this function
async fn batch_txs_verify_blocks<Tx, Blk, Ctx>(
cache: &RwLock<ScanningCache>,
save_file: &Path,
txs: Vec<Vec<Transaction>>,
blocks: Vec<Block>,
tx_verifier: &mut Tx,
block_verifier: &mut Blk,
context_updater: &mut Ctx,
current_height: u64,
hf: HardFork,
) -> Result<(), tower::BoxError>
where
Blk: tower::Service<
VerifyBlockRequest,
Response = VerifiedBlockInformation,
Error = ConsensusError,
>,
Tx: tower::Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
Ctx: tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
{
// This is an optimisation, we batch ALL the transactions together to get their outputs, saving a
// massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate
// is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value
// this will fail when this is an issue.
let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize];
let txs = txs
.into_iter()
.enumerate()
.flat_map(|(block_id, block_batch_txs)| {
// block id is just this blocks position in the batch.
txs_per_block[block_id] = block_batch_txs.len();
block_batch_txs
})
.collect();
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
.ready()
.await?
.call(VerifyTxRequest::BatchSetup { txs, hf })
.await?
else {
panic!("tx verifier returned incorrect response");
};
let mut done_txs = 0;
for (block_id, block) in blocks.into_iter().enumerate() {
// block id is just this blocks position in the batch.
let txs = &txs[done_txs..done_txs + txs_per_block[block_id]];
done_txs += txs_per_block[block_id];
let verified_block_info: VerifiedBlockInformation = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChain(block, txs.into()))
.await?;
tracing::info!(
"verified block: {}",
current_height + u64::try_from(block_id).unwrap()
);
update_cache_and_context(cache, context_updater, verified_block_info).await?;
if current_height + u64::try_from(block_id).unwrap() % 25000 == 0 {
tracing::info!("Saving cache to: {}", save_file.display());
cache.read().unwrap().save(save_file)?;
}
}
Ok(())
}
/// Batches only transactions per block together when getting outs
///
/// TODO: reduce the amount of parameters of this function
async fn verify_blocks<Blk, Ctx>(
cache: &RwLock<ScanningCache>,
save_file: &Path,
txs: Vec<Vec<Transaction>>,
blocks: Vec<Block>,
block_verifier: &mut Blk,
context_updater: &mut Ctx,
current_height: u64,
) -> Result<(), tower::BoxError>
where
Blk: tower::Service<
VerifyBlockRequest,
Response = VerifiedBlockInformation,
Error = ConsensusError,
>,
Ctx: tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
{
for (block_id, (block, txs)) in blocks.into_iter().zip(txs.into_iter()).enumerate() {
let verified_block_info: VerifiedBlockInformation = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChainBatchSetupVerify(block, txs))
.await?;
tracing::info!(
"verified block: {}",
current_height + u64::try_from(block_id).unwrap()
);
update_cache_and_context(cache, context_updater, verified_block_info).await?;
if current_height + u64::try_from(block_id).unwrap() % 25000 == 0 {
tracing::info!("Saving cache to: {}", save_file.display());
cache.read().unwrap().save(save_file)?;
}
}
Ok(())
}
async fn scan_chain<D>(
@ -47,7 +209,7 @@ where
tracing::info!("Beginning chain scan");
// TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs.
let chain_height = 1009827;
let chain_height = 3_000_000;
tracing::info!("scanning to chain height: {}", chain_height);
@ -87,7 +249,7 @@ where
)),
);
let (DatabaseResponse::BlockBatchInRange(blocks)) = current_fut.await?? else {
let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
panic!("Database sent incorrect response!");
};
@ -98,102 +260,42 @@ where
);
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
let batch_len = u64::try_from(blocks.len()).unwrap();
// This is an optimisation, we batch ALL the transactions together to get their outputs, saving a
// massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate
// is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value
// this will fail when this is an issue.
let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize];
let txs = txs
.into_iter()
.enumerate()
.flat_map(|(block_id, block_batch_txs)| {
// block id is just this blocks position in the batch.
txs_per_block[block_id] = block_batch_txs.len();
block_batch_txs
})
.collect();
let hf_start_batch = simple_get_hf(current_height);
let hf_end_batch = simple_get_hf(current_height + batch_len);
let VerifyTxResponse::BatchSetupOk(txs) = transaction_verifier
.ready()
.await?
.call(VerifyTxRequest::BatchSetup {
if hf_start_batch == hf_end_batch {
// we can only batch transactions on the same hard fork
batch_txs_verify_blocks(
&cache,
&save_file,
txs,
// TODO: we need to get the haf from the context svc
hf: HardFork::V1,
})
.await?
else {
panic!("tx verifier returned incorrect response");
};
let mut done_txs = 0;
for (block_id, block) in blocks.into_iter().enumerate() {
// block id is just this blocks position in the batch.
let txs = &txs[done_txs..done_txs + txs_per_block[block_id]];
done_txs += txs_per_block[block_id];
let verified_block_info: VerifiedBlockInformation = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChain(block, txs.into()))
.await?;
// add the new block to the cache
cache.write().unwrap().add_new_block_data(
verified_block_info.generated_coins,
&verified_block_info.block.miner_tx,
&verified_block_info.txs,
blocks,
&mut transaction_verifier,
&mut block_verifier,
&mut context_updater,
current_height,
hf_start_batch,
)
.await?;
} else {
tracing::warn!(
"Hard fork during batch, getting outputs per block this will take a while!"
);
// update the chain context svc with the new block
context_updater
.ready()
.await?
.call(UpdateBlockchainCacheRequest {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
cumulative_difficulty: verified_block_info.cumulative_difficulty,
})
.await?;
tracing::info!("Verified block: {}", current_height);
current_height += 1;
next_batch_start_height += 1;
if current_height % 25000 == 0 {
tracing::info!("Saving cache to: {}", save_file.display());
cache.read().unwrap().save(&save_file)?;
// Get the block header to check our information matches what it should be, we don't need
// to do this all the time
let DatabaseResponse::BlockExtendedHeader(header) = database
.ready()
.await?
.call(DatabaseRequest::BlockExtendedHeader(
verified_block_info.height.into(),
))
.await?
else {
panic!();
};
assert_eq!(header.block_weight, verified_block_info.weight);
assert_eq!(
header.cumulative_difficulty,
verified_block_info.cumulative_difficulty
);
assert_eq!(
header.long_term_weight,
verified_block_info.long_term_weight
);
}
verify_blocks(
&cache,
&save_file,
txs,
blocks,
&mut block_verifier,
&mut context_updater,
current_height,
)
.await?;
}
current_height += batch_len;
next_batch_start_height += batch_len;
}
Ok(())

View file

@ -6,6 +6,7 @@ use std::{
pin::Pin,
sync::{Arc, Mutex, RwLock},
task::{Context, Poll},
time::Duration,
};
use curve25519_dalek::edwards::CompressedEdwardsY;
@ -89,8 +90,8 @@ pub fn init_rpc_load_balancer(
let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30);
let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok));
// let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120));
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 30);
let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(1200));
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 30);
let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer);
let discover = discover::RPCDiscover {