diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 9fa39a5a..dc261105 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,5 +1,6 @@ #![cfg(feature = "binaries")] +use std::path::Path; use std::{ io::Read, ops::Range, @@ -7,6 +8,7 @@ use std::{ sync::{Arc, RwLock}, }; +use monero_serai::{block::Block, transaction::Transaction}; use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; @@ -17,8 +19,8 @@ use monero_consensus::{ initialize_verifier, rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, transactions::VerifyTxRequest, - Database, DatabaseRequest, DatabaseResponse, HardFork, VerifiedBlockInformation, - VerifyBlockRequest, VerifyTxResponse, + ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, + VerifiedBlockInformation, VerifyBlockRequest, VerifyTxResponse, }; const MAX_BLOCKS_IN_RANGE: u64 = 500; @@ -29,9 +31,169 @@ async fn call_batch( range: Range, database: D, ) -> Result { - Ok(database + database .oneshot(DatabaseRequest::BlockBatchInRange(range)) - .await?) + .await +} + +fn simple_get_hf(height: u64) -> HardFork { + match height { + 0..=1009826 => HardFork::V1, + 1009827..=1141316 => HardFork::V2, + 1141317..=1220515 => HardFork::V3, + _ => todo!("rules past v3"), + } +} + +async fn update_cache_and_context( + cache: &RwLock, + context_updater: &mut Ctx, + verified_block_info: VerifiedBlockInformation, +) -> Result<(), tower::BoxError> +where + Ctx: tower::Service, +{ + // add the new block to the cache + cache.write().unwrap().add_new_block_data( + verified_block_info.generated_coins, + &verified_block_info.block.miner_tx, + &verified_block_info.txs, + ); + // update the chain context svc with the new block + context_updater + .ready() + .await? + .call(UpdateBlockchainCacheRequest { + new_top_hash: verified_block_info.block_hash, + height: verified_block_info.height, + timestamp: verified_block_info.block.header.timestamp, + weight: verified_block_info.weight, + long_term_weight: verified_block_info.long_term_weight, + vote: verified_block_info.hf_vote, + generated_coins: verified_block_info.generated_coins, + cumulative_difficulty: verified_block_info.cumulative_difficulty, + }) + .await?; + + Ok(()) +} + +/// Batches all transactions together when getting outs +/// +/// TODO: reduce the amount of parameters of this function +async fn batch_txs_verify_blocks( + cache: &RwLock, + save_file: &Path, + txs: Vec>, + blocks: Vec, + tx_verifier: &mut Tx, + block_verifier: &mut Blk, + context_updater: &mut Ctx, + current_height: u64, + hf: HardFork, +) -> Result<(), tower::BoxError> +where + Blk: tower::Service< + VerifyBlockRequest, + Response = VerifiedBlockInformation, + Error = ConsensusError, + >, + Tx: tower::Service, + Ctx: tower::Service, +{ + // This is an optimisation, we batch ALL the transactions together to get their outputs, saving a + // massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate + // is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value + // this will fail when this is an issue. + let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize]; + let txs = txs + .into_iter() + .enumerate() + .flat_map(|(block_id, block_batch_txs)| { + // block id is just this blocks position in the batch. + txs_per_block[block_id] = block_batch_txs.len(); + block_batch_txs + }) + .collect(); + + let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier + .ready() + .await? + .call(VerifyTxRequest::BatchSetup { txs, hf }) + .await? + else { + panic!("tx verifier returned incorrect response"); + }; + + let mut done_txs = 0; + for (block_id, block) in blocks.into_iter().enumerate() { + // block id is just this blocks position in the batch. + let txs = &txs[done_txs..done_txs + txs_per_block[block_id]]; + done_txs += txs_per_block[block_id]; + + let verified_block_info: VerifiedBlockInformation = block_verifier + .ready() + .await? + .call(VerifyBlockRequest::MainChain(block, txs.into())) + .await?; + + tracing::info!( + "verified block: {}", + current_height + u64::try_from(block_id).unwrap() + ); + + update_cache_and_context(cache, context_updater, verified_block_info).await?; + + if current_height + u64::try_from(block_id).unwrap() % 25000 == 0 { + tracing::info!("Saving cache to: {}", save_file.display()); + cache.read().unwrap().save(save_file)?; + } + } + + Ok(()) +} + +/// Batches only transactions per block together when getting outs +/// +/// TODO: reduce the amount of parameters of this function +async fn verify_blocks( + cache: &RwLock, + save_file: &Path, + txs: Vec>, + blocks: Vec, + block_verifier: &mut Blk, + context_updater: &mut Ctx, + current_height: u64, +) -> Result<(), tower::BoxError> +where + Blk: tower::Service< + VerifyBlockRequest, + Response = VerifiedBlockInformation, + Error = ConsensusError, + >, + Ctx: tower::Service, +{ + for (block_id, (block, txs)) in blocks.into_iter().zip(txs.into_iter()).enumerate() { + let verified_block_info: VerifiedBlockInformation = block_verifier + .ready() + .await? + .call(VerifyBlockRequest::MainChainBatchSetupVerify(block, txs)) + .await?; + + tracing::info!( + "verified block: {}", + current_height + u64::try_from(block_id).unwrap() + ); + + update_cache_and_context(cache, context_updater, verified_block_info).await?; + + if current_height + u64::try_from(block_id).unwrap() % 25000 == 0 { + tracing::info!("Saving cache to: {}", save_file.display()); + cache.read().unwrap().save(save_file)?; + } + } + + Ok(()) } async fn scan_chain( @@ -47,7 +209,7 @@ where tracing::info!("Beginning chain scan"); // TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs. - let chain_height = 1009827; + let chain_height = 3_000_000; tracing::info!("scanning to chain height: {}", chain_height); @@ -87,7 +249,7 @@ where )), ); - let (DatabaseResponse::BlockBatchInRange(blocks)) = current_fut.await?? else { + let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else { panic!("Database sent incorrect response!"); }; @@ -98,102 +260,42 @@ where ); let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + let batch_len = u64::try_from(blocks.len()).unwrap(); - // This is an optimisation, we batch ALL the transactions together to get their outputs, saving a - // massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate - // is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value - // this will fail when this is an issue. - let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize]; - let txs = txs - .into_iter() - .enumerate() - .flat_map(|(block_id, block_batch_txs)| { - // block id is just this blocks position in the batch. - txs_per_block[block_id] = block_batch_txs.len(); - block_batch_txs - }) - .collect(); + let hf_start_batch = simple_get_hf(current_height); + let hf_end_batch = simple_get_hf(current_height + batch_len); - let VerifyTxResponse::BatchSetupOk(txs) = transaction_verifier - .ready() - .await? - .call(VerifyTxRequest::BatchSetup { + if hf_start_batch == hf_end_batch { + // we can only batch transactions on the same hard fork + batch_txs_verify_blocks( + &cache, + &save_file, txs, - // TODO: we need to get the haf from the context svc - hf: HardFork::V1, - }) - .await? - else { - panic!("tx verifier returned incorrect response"); - }; - - let mut done_txs = 0; - for (block_id, block) in blocks.into_iter().enumerate() { - // block id is just this blocks position in the batch. - let txs = &txs[done_txs..done_txs + txs_per_block[block_id]]; - done_txs += txs_per_block[block_id]; - - let verified_block_info: VerifiedBlockInformation = block_verifier - .ready() - .await? - .call(VerifyBlockRequest::MainChain(block, txs.into())) - .await?; - - // add the new block to the cache - cache.write().unwrap().add_new_block_data( - verified_block_info.generated_coins, - &verified_block_info.block.miner_tx, - &verified_block_info.txs, + blocks, + &mut transaction_verifier, + &mut block_verifier, + &mut context_updater, + current_height, + hf_start_batch, + ) + .await?; + } else { + tracing::warn!( + "Hard fork during batch, getting outputs per block this will take a while!" ); - // update the chain context svc with the new block - context_updater - .ready() - .await? - .call(UpdateBlockchainCacheRequest { - new_top_hash: verified_block_info.block_hash, - height: verified_block_info.height, - timestamp: verified_block_info.block.header.timestamp, - weight: verified_block_info.weight, - long_term_weight: verified_block_info.long_term_weight, - vote: verified_block_info.hf_vote, - generated_coins: verified_block_info.generated_coins, - cumulative_difficulty: verified_block_info.cumulative_difficulty, - }) - .await?; - - tracing::info!("Verified block: {}", current_height); - - current_height += 1; - next_batch_start_height += 1; - - if current_height % 25000 == 0 { - tracing::info!("Saving cache to: {}", save_file.display()); - cache.read().unwrap().save(&save_file)?; - - // Get the block header to check our information matches what it should be, we don't need - // to do this all the time - let DatabaseResponse::BlockExtendedHeader(header) = database - .ready() - .await? - .call(DatabaseRequest::BlockExtendedHeader( - verified_block_info.height.into(), - )) - .await? - else { - panic!(); - }; - - assert_eq!(header.block_weight, verified_block_info.weight); - assert_eq!( - header.cumulative_difficulty, - verified_block_info.cumulative_difficulty - ); - assert_eq!( - header.long_term_weight, - verified_block_info.long_term_weight - ); - } + verify_blocks( + &cache, + &save_file, + txs, + blocks, + &mut block_verifier, + &mut context_updater, + current_height, + ) + .await?; } + current_height += batch_len; + next_batch_start_height += batch_len; } Ok(()) diff --git a/consensus/src/rpc.rs b/consensus/src/rpc.rs index 44d7378e..34e55d5b 100644 --- a/consensus/src/rpc.rs +++ b/consensus/src/rpc.rs @@ -6,6 +6,7 @@ use std::{ pin::Pin, sync::{Arc, Mutex, RwLock}, task::{Context, Poll}, + time::Duration, }; use curve25519_dalek::edwards::CompressedEdwardsY; @@ -89,8 +90,8 @@ pub fn init_rpc_load_balancer( let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30); let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok)); - // let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(120)); - let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 30); + let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(1200)); + let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 30); let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer); let discover = discover::RPCDiscover {