From 0e5555d4d4cef01a3d7c79433c079794f785b393 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Fri, 7 Jun 2024 20:23:48 +0000 Subject: [PATCH] Consensus: block batch prepper (#151) * add block batch prepper * Apply suggestions from code review Co-authored-by: hinto-janai * review comments * review comments * fix doc --------- Co-authored-by: hinto-janai --- consensus/src/block.rs | 426 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 387 insertions(+), 39 deletions(-) diff --git a/consensus/src/block.rs b/consensus/src/block.rs index a17709c6..9fb46b95 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -7,27 +7,91 @@ use std::{ task::{Context, Poll}, }; -use cuprate_helper::asynch::rayon_spawn_async; use futures::FutureExt; -use monero_serai::{block::Block, transaction::Input}; +use monero_serai::{ + block::Block, + transaction::{Input, Transaction}, +}; +use rayon::prelude::*; use tower::{Service, ServiceExt}; +use tracing::instrument; use cuprate_consensus_rules::{ - blocks::{calculate_pow_hash, check_block, check_block_pow, BlockError, RandomX}, + blocks::{ + calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height, + randomx_seed_height, BlockError, RandomX, + }, miner_tx::MinerTxError, ConsensusError, HardFork, }; +use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; use crate::{ - context::{BlockChainContextRequest, BlockChainContextResponse}, + context::{ + rx_vms::RandomXVM, BlockChainContextRequest, BlockChainContextResponse, + RawBlockChainContext, + }, transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, Database, ExtendedConsensusError, }; +/// A pre-prepared block with all data needed to verify it, except the block's proof of work. +#[derive(Debug)] +pub struct PreparedBlockExPow { + /// The block. + pub block: Block, + /// The serialised block's bytes. + pub block_blob: Vec, + + /// The block's hard-fork vote. + pub hf_vote: HardFork, + /// The block's hard-fork version. + pub hf_version: HardFork, + + /// The block's hash. + pub block_hash: [u8; 32], + /// The height of the block. + pub height: u64, + + /// The weight of the block's miner transaction. + pub miner_tx_weight: usize, +} + +impl PreparedBlockExPow { + /// Prepare a new block. + /// + /// # Errors + /// This errors if either the `block`'s: + /// - Hard-fork values are invalid + /// - Miner transaction is missing a miner input + pub fn new(block: Block) -> Result { + let (hf_version, hf_vote) = + HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?; + + let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else { + Err(ConsensusError::Block(BlockError::MinerTxError( + MinerTxError::InputNotOfTypeGen, + )))? + }; + + Ok(PreparedBlockExPow { + block_blob: block.serialize(), + hf_vote, + hf_version, + + block_hash: block.hash(), + height: *height, + + miner_tx_weight: block.miner_tx.weight(), + block, + }) + } +} + /// A pre-prepared block with all data needed to verify it. #[derive(Debug)] -pub struct PrePreparedBlock { +pub struct PreparedBlock { /// The block pub block: Block, /// The serialised blocks bytes @@ -47,15 +111,15 @@ pub struct PrePreparedBlock { pub miner_tx_weight: usize, } -impl PrePreparedBlock { - /// Creates a new [`PrePreparedBlock`]. +impl PreparedBlock { + /// Creates a new [`PreparedBlock`]. /// /// The randomX VM must be Some if RX is needed or this will panic. /// The randomX VM must also be initialised with the correct seed. fn new( block: Block, randomx_vm: Option<&R>, - ) -> Result { + ) -> Result { let (hf_version, hf_vote) = HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?; @@ -65,7 +129,7 @@ impl PrePreparedBlock { )))? }; - Ok(PrePreparedBlock { + Ok(PreparedBlock { block_blob: block.serialize(), hf_vote, hf_version, @@ -82,6 +146,36 @@ impl PrePreparedBlock { block, }) } + + /// Creates a new [`PreparedBlock`] from a [`PreparedBlockExPow`]. + /// + /// This function will give an invalid PoW hash if `randomx_vm` is not initialised + /// with the correct seed. + /// + /// # Panics + /// This function will panic if `randomx_vm` is + /// [`None`] even though RandomX is needed. + fn new_prepped( + block: PreparedBlockExPow, + randomx_vm: Option<&R>, + ) -> Result { + Ok(PreparedBlock { + block_blob: block.block_blob, + hf_vote: block.hf_vote, + hf_version: block.hf_version, + + block_hash: block.block_hash, + pow_hash: calculate_pow_hash( + randomx_vm, + &block.block.serialize_hashable(), + block.height, + &block.hf_version, + )?, + + miner_tx_weight: block.block.miner_tx.weight(), + block: block.block, + }) + } } /// A request to verify a block. @@ -91,12 +185,28 @@ pub enum VerifyBlockRequest { block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, }, + /// Verifies a prepared block. + MainChainPrepped { + /// The already prepared block. + block: PreparedBlock, + /// The full list of transactions for this block, in the order given in `block`. + txs: Vec>, + }, + /// Batch prepares a list of blocks and transactions for verification. + MainChainBatchPrepareBlocks { + /// The list of blocks and their transactions (not necessarily in the order given in the block). + blocks: Vec<(Block, Vec)>, + }, } /// A response from a verify block request. +#[allow(clippy::large_enum_variant)] // The largest variant is most common ([`MainChain`]) pub enum VerifyBlockResponse { /// This block is valid. MainChain(VerifiedBlockInformation), + /// A list of prepared blocks for verification, you should call [`VerifyBlockRequest::MainChainPrepped`] on each of the returned + /// blocks to fully verify them. + MainChainBatchPrepped(Vec<(PreparedBlock, Vec>)>), } /// The block verifier service. @@ -178,17 +288,188 @@ where } => { verify_main_chain_block(block, prepared_txs, context_svc, tx_verifier_svc).await } + VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks } => { + batch_prepare_main_chain_block(blocks, context_svc).await + } + VerifyBlockRequest::MainChainPrepped { block, txs } => { + verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None) + .await + } } } .boxed() } } +/// Batch prepares a list of blocks for verification. +#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))] +async fn batch_prepare_main_chain_block( + blocks: Vec<(Block, Vec)>, + mut context_svc: C, +) -> Result +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + + 'static, + C::Future: Send + 'static, +{ + let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + + tracing::debug!("Calculating block hashes."); + let blocks: Vec = rayon_spawn_async(|| { + blocks + .into_iter() + .map(PreparedBlockExPow::new) + .collect::, _>>() + }) + .await?; + + // A Vec of (timestamp, HF) for each block to calculate the expected difficulty for each block. + let mut timestamps_hfs = Vec::with_capacity(blocks.len()); + let mut new_rx_vm = None; + + tracing::debug!("Checking blocks follow each other."); + + // For every block make sure they have the correct height and previous ID + for window in blocks.windows(2) { + let block_0 = &window[0]; + let block_1 = &window[1]; + + if block_0.block_hash != block_1.block.header.previous + || block_0.height != block_1.height - 1 + { + tracing::debug!("Blocks do not follow each other, verification failed."); + Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?; + } + + // Cache any potential RX VM seeds as we may need them for future blocks in the batch. + if is_randomx_seed_height(block_0.height) { + new_rx_vm = Some((block_0.height, block_0.block_hash)); + } + + timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version)) + } + + // Get the current blockchain context. + let BlockChainContextResponse::Context(checked_context) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + // Calculate the expected difficulties for each block in the batch. + let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::BatchGetDifficulties( + timestamps_hfs, + )) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let context = checked_context.unchecked_blockchain_context().clone(); + + // Make sure the blocks follow the main chain. + + if context.chain_height != blocks[0].height { + tracing::debug!("Blocks do not follow main chain, verification failed."); + + Err(ConsensusError::Block(BlockError::MinerTxError( + MinerTxError::InputsHeightIncorrect, + )))?; + } + + if context.top_hash != blocks[0].block.header.previous { + tracing::debug!("Blocks do not follow main chain, verification failed."); + + Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?; + } + + let mut rx_vms = context.rx_vms; + + // If we have a RX seed in the batch calculate it. + if let Some((new_vm_height, new_vm_seed)) = new_rx_vm { + tracing::debug!("New randomX seed in batch, initialising VM"); + + let new_vm = rayon_spawn_async(move || { + Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!")) + }) + .await; + + context_svc + .ready() + .await? + .call(BlockChainContextRequest::NewRXVM(( + new_vm_seed, + new_vm.clone(), + ))) + .await + .map_err(Into::::into)?; + + rx_vms.insert(new_vm_height, new_vm); + } + + tracing::debug!("Calculating PoW and prepping transaction"); + + let blocks = rayon_spawn_async(move || { + blocks + .into_par_iter() + .zip(difficulties) + .zip(txs) + .map(|((block, difficultly), txs)| { + // Calculate the PoW for the block. + let height = block.height; + let block = PreparedBlock::new_prepped( + block, + rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref), + )?; + + // Check the PoW + check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?; + + // Now setup the txs. + let mut txs = txs + .into_par_iter() + .map(|tx| { + let tx = TransactionVerificationData::new(tx)?; + Ok::<_, ConsensusError>((tx.tx_hash, tx)) + }) + .collect::, _>>()?; + + // Order the txs correctly. + let mut ordered_txs = Vec::with_capacity(txs.len()); + + for tx_hash in &block.block.txs { + let tx = txs + .remove(tx_hash) + .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; + ordered_txs.push(Arc::new(tx)); + } + + Ok((block, ordered_txs)) + }) + .collect::, ExtendedConsensusError>>() + }) + .await?; + + Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks)) +} + /// Verifies a prepared block. async fn verify_main_chain_block( block: Block, mut txs: HashMap<[u8; 32], TransactionVerificationData>, - context_svc: C, + mut context_svc: C, tx_verifier_svc: TxV, ) -> Result where @@ -201,12 +482,11 @@ where C::Future: Send + 'static, TxV: Service, { - tracing::debug!("getting blockchain context"); - let BlockChainContextResponse::Context(checked_context) = context_svc - .oneshot(BlockChainContextRequest::GetContext) - .await - .map_err(Into::::into)? + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? else { panic!("Context service returned wrong response!"); }; @@ -214,18 +494,24 @@ where let context = checked_context.unchecked_blockchain_context().clone(); tracing::debug!("got blockchain context: {:?}", context); - // Set up the block and just pass it to [`verify_main_chain_block_prepared`] + tracing::debug!( + "Preparing block for verification, expected height: {}", + context.chain_height + ); + + // Set up the block and just pass it to [`verify_prepped_main_chain_block`] let rx_vms = context.rx_vms.clone(); let height = context.chain_height; let prepped_block = rayon_spawn_async(move || { - PrePreparedBlock::new(block, rx_vms.get(&height).map(AsRef::as_ref)) + PreparedBlock::new( + block, + rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref), + ) }) .await?; - tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash)); - check_block_pow(&prepped_block.pow_hash, context.next_difficulty) .map_err(ConsensusError::Block)?; @@ -233,31 +519,93 @@ where let mut ordered_txs = Vec::with_capacity(txs.len()); - tracing::debug!("Checking we have correct transactions for block."); + tracing::debug!("Ordering transactions for block."); - for tx_hash in &prepped_block.block.txs { - let tx = txs - .remove(tx_hash) - .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; - ordered_txs.push(Arc::new(tx)); + if !prepped_block.block.txs.is_empty() { + for tx_hash in &prepped_block.block.txs { + let tx = txs + .remove(tx_hash) + .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; + ordered_txs.push(Arc::new(tx)); + } + drop(txs); } - drop(txs); - tracing::debug!("Verifying transactions for block."); + verify_prepped_main_chain_block( + prepped_block, + ordered_txs, + context_svc, + tx_verifier_svc, + Some(context), + ) + .await +} - tx_verifier_svc - .oneshot(VerifyTxRequest::Prepped { - txs: ordered_txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) - .await?; +async fn verify_prepped_main_chain_block( + prepped_block: PreparedBlock, + txs: Vec>, + context_svc: C, + tx_verifier_svc: TxV, + cached_context: Option, +) -> Result +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + + 'static, + C::Future: Send + 'static, + TxV: Service, +{ + let context = if let Some(context) = cached_context { + context + } else { + let BlockChainContextResponse::Context(checked_context) = context_svc + .oneshot(BlockChainContextRequest::GetContext) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let context = checked_context.unchecked_blockchain_context().clone(); + + tracing::debug!("got blockchain context: {context:?}"); + + context + }; + + tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash)); + + check_block_pow(&prepped_block.pow_hash, context.next_difficulty) + .map_err(ConsensusError::Block)?; + + if prepped_block.block.txs.len() != txs.len() { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } + + if !prepped_block.block.txs.is_empty() { + for (expected_tx_hash, tx) in prepped_block.block.txs.iter().zip(txs.iter()) { + if expected_tx_hash != &tx.tx_hash { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } + } + + tx_verifier_svc + .oneshot(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await?; + } let block_weight = - prepped_block.miner_tx_weight + ordered_txs.iter().map(|tx| tx.tx_weight).sum::(); - let total_fees = ordered_txs.iter().map(|tx| tx.fee).sum::(); + prepped_block.miner_tx_weight + txs.iter().map(|tx| tx.tx_weight).sum::(); + let total_fees = txs.iter().map(|tx| tx.fee).sum::(); tracing::debug!("Verifying block header."); let (_, generated_coins) = check_block( @@ -273,7 +621,7 @@ where block_hash: prepped_block.block_hash, block: prepped_block.block, block_blob: prepped_block.block_blob, - txs: ordered_txs + txs: txs .into_iter() .map(|tx| { // Note: it would be possible for the transaction verification service to hold onto the tx after the call