diff --git a/Cargo.lock b/Cargo.lock index 0e507c41..b89874db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,16 @@ dependencies = [ "libc", ] +[[package]] +name = "async-buffer" +version = "0.1.0" +dependencies = [ + "futures", + "pin-project", + "thiserror", + "tokio", +] + [[package]] name = "async-lock" version = "3.3.0" @@ -525,6 +535,7 @@ dependencies = [ "tokio-util", "tower", "tracing", + "tracing-subscriber", ] [[package]] @@ -613,7 +624,7 @@ dependencies = [ ] [[package]] -name = "dandelion_tower" +name = "dandelion-tower" version = "0.1.0" dependencies = [ "futures", @@ -1463,6 +1474,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-traits" version = "0.2.18" @@ -1510,6 +1531,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.6.0" @@ -2136,6 +2163,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2476,6 +2512,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", ] [[package]] @@ -2484,7 +2532,12 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", "tracing-core", + "tracing-log", ] [[package]] @@ -2543,6 +2596,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.4" diff --git a/consensus/src/block.rs b/consensus/src/block.rs index a17709c6..7a423746 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -7,24 +7,82 @@ use std::{ task::{Context, Poll}, }; -use cuprate_helper::asynch::rayon_spawn_async; use futures::FutureExt; -use monero_serai::{block::Block, transaction::Input}; +use monero_serai::{ + block::Block, + transaction::{Input, Transaction}, +}; +use rayon::prelude::*; use tower::{Service, ServiceExt}; +use tracing::instrument; use cuprate_consensus_rules::{ - blocks::{calculate_pow_hash, check_block, check_block_pow, BlockError, RandomX}, + blocks::{ + calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height, + randomx_seed_height, BlockError, RandomX, + }, miner_tx::MinerTxError, ConsensusError, HardFork, }; +use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; use crate::{ - context::{BlockChainContextRequest, BlockChainContextResponse}, + context::{ + rx_vms::RandomXVM, BlockChainContextRequest, BlockChainContextResponse, + RawBlockChainContext, + }, transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, Database, ExtendedConsensusError, }; +/// A pre-prepared block with all data needed to verify it, except the block's proof of work. +#[derive(Debug)] +pub struct PrePreparedBlockExPOW { + /// The block + pub block: Block, + /// The serialised blocks bytes + pub block_blob: Vec, + + /// The blocks hf vote + pub hf_vote: HardFork, + /// The blocks hf version + pub hf_version: HardFork, + + /// The blocks hash + pub block_hash: [u8; 32], + /// The height of the block. + pub height: u64, + + /// The weight of the blocks miner transaction. + pub miner_tx_weight: usize, +} + +impl PrePreparedBlockExPOW { + pub fn new(block: Block) -> Result { + let (hf_version, hf_vote) = + HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?; + + let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else { + Err(ConsensusError::Block(BlockError::MinerTxError( + MinerTxError::InputNotOfTypeGen, + )))? + }; + + Ok(PrePreparedBlockExPOW { + block_blob: block.serialize(), + hf_vote, + hf_version, + + block_hash: block.hash(), + height: *height, + + miner_tx_weight: block.miner_tx.weight(), + block, + }) + } +} + /// A pre-prepared block with all data needed to verify it. #[derive(Debug)] pub struct PrePreparedBlock { @@ -82,6 +140,32 @@ impl PrePreparedBlock { block, }) } + + /// Creates a new [`PrePreparedBlock`] from a [`PrePreparedBlockExPOW`]. + /// + /// The randomX VM must be Some if RX is needed or this will panic. + /// The randomX VM must also be initialised with the correct seed. + fn new_prepped( + block: PrePreparedBlockExPOW, + randomx_vm: Option<&R>, + ) -> Result { + Ok(PrePreparedBlock { + block_blob: block.block_blob, + hf_vote: block.hf_vote, + hf_version: block.hf_version, + + block_hash: block.block_hash, + pow_hash: calculate_pow_hash( + randomx_vm, + &block.block.serialize_hashable(), + block.height, + &block.hf_version, + )?, + + miner_tx_weight: block.block.miner_tx.weight(), + block: block.block, + }) + } } /// A request to verify a block. @@ -91,12 +175,28 @@ pub enum VerifyBlockRequest { block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, }, + /// Verifies a prepared block. + MainChainPrepped { + /// The already prepared block. + block: PrePreparedBlock, + /// The full list of transactions for this block, in order given in `block`. + txs: Vec>, + }, + /// Batch prepares a list of blocks and transactions for verification. + MainChainBatchPrepareBlocks { + /// The list of blocks. + blocks: Vec<(Block, Vec)>, + }, } /// A response from a verify block request. +#[allow(clippy::large_enum_variant)] // The largest variant is most common ([`MainChain`]) pub enum VerifyBlockResponse { /// This block is valid. MainChain(VerifiedBlockInformation), + /// A list of prepared blocks for verification, you should call [`VerifyBlockRequest::MainChainPrepped`] on each of the returned + /// blocks to fully verify them. + MainChainBatchPrepped(Vec<(PrePreparedBlock, Vec>)>), } /// The block verifier service. @@ -178,17 +278,185 @@ where } => { verify_main_chain_block(block, prepared_txs, context_svc, tx_verifier_svc).await } + VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks } => { + batch_prepare_main_chain_block(blocks, context_svc).await + } + VerifyBlockRequest::MainChainPrepped { block, txs } => { + verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None) + .await + } } } .boxed() } } +/// Batch prepares a list of blocks for verification. +#[instrument(level="debug", name="batch_prep_blocks" skip_all, fields(amt=blocks.len()))] +async fn batch_prepare_main_chain_block( + blocks: Vec<(Block, Vec)>, + mut context_svc: C, +) -> Result +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + + 'static, + C::Future: Send + 'static, +{ + let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + + tracing::debug!("Calculating block hashes."); + let blocks: Vec = rayon_spawn_async(|| { + blocks + .into_iter() + .map(PrePreparedBlockExPOW::new) + .collect::, _>>() + }) + .await?; + + // A Vec of (timestamp, HF) for each block to calculate the expected difficulty for each block. + let mut timestamps_hfs = Vec::with_capacity(blocks.len()); + let mut new_rx_vm = None; + + tracing::debug!("Checking blocks follow each other."); + + // For every block make sure they have the correct height and previous ID + for window in blocks.windows(2) { + if window[0].block_hash != window[1].block.header.previous + || window[0].height != window[1].height - 1 + { + tracing::debug!("Blocks do not follow each other, verification failed."); + Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?; + } + + // Cache any potential RX VM seeds as we may need them for future blocks in the batch. + if is_randomx_seed_height(window[0].height) { + new_rx_vm = Some((window[0].height, window[0].block_hash)); + } + + timestamps_hfs.push((window[0].block.header.timestamp, window[0].hf_version)) + } + + // Get the current blockchain context. + let BlockChainContextResponse::Context(checked_context) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + // Calculate the expected difficulties for each block in the batch. + let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::BatchGetDifficulties( + timestamps_hfs, + )) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let context = checked_context.unchecked_blockchain_context().clone(); + + // Make sure the blocks follow the main chain. + + if context.chain_height != blocks[0].height { + tracing::debug!("Blocks do not follow main chain, verification failed."); + + Err(ConsensusError::Block(BlockError::MinerTxError( + MinerTxError::InputsHeightIncorrect, + )))?; + } + + if context.top_hash != blocks[0].block.header.previous { + tracing::debug!("Blocks do not follow main chain, verification failed."); + + Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?; + } + + let mut rx_vms = context.rx_vms; + + // If we have a RX seed in the batch calculate it. + if let Some((new_vm_height, new_vm_seed)) = new_rx_vm { + tracing::debug!("New randomX seed in batch, initialising VM"); + + let new_vm = rayon_spawn_async(move || { + Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!")) + }) + .await; + + context_svc + .ready() + .await? + .call(BlockChainContextRequest::NewRXVM(( + new_vm_seed, + new_vm.clone(), + ))) + .await + .map_err(Into::::into)?; + + rx_vms.insert(new_vm_height, new_vm); + } + + tracing::debug!("Calculating PoW and prepping transaction"); + + let blocks = rayon_spawn_async(move || { + blocks + .into_par_iter() + .zip(difficulties) + .zip(txs) + .map(|((block, difficultly), txs)| { + // Calculate the PoW for the block. + let height = block.height; + let block = PrePreparedBlock::new_prepped( + block, + rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref), + )?; + + // Check the PoW + check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?; + + // Now setup the txs. + let mut txs = txs + .into_par_iter() + .map(|tx| { + let tx = TransactionVerificationData::new(tx)?; + Ok::<_, ConsensusError>((tx.tx_hash, tx)) + }) + .collect::, _>>()?; + + // Order the txs correctly. + let mut ordered_txs = Vec::with_capacity(txs.len()); + + for tx_hash in &block.block.txs { + let tx = txs + .remove(tx_hash) + .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; + ordered_txs.push(Arc::new(tx)); + } + + Ok((block, ordered_txs)) + }) + .collect::, ExtendedConsensusError>>() + }) + .await?; + + Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks)) +} + /// Verifies a prepared block. async fn verify_main_chain_block( block: Block, mut txs: HashMap<[u8; 32], TransactionVerificationData>, - context_svc: C, + mut context_svc: C, tx_verifier_svc: TxV, ) -> Result where @@ -201,12 +469,11 @@ where C::Future: Send + 'static, TxV: Service, { - tracing::debug!("getting blockchain context"); - let BlockChainContextResponse::Context(checked_context) = context_svc - .oneshot(BlockChainContextRequest::GetContext) - .await - .map_err(Into::::into)? + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? else { panic!("Context service returned wrong response!"); }; @@ -214,18 +481,24 @@ where let context = checked_context.unchecked_blockchain_context().clone(); tracing::debug!("got blockchain context: {:?}", context); - // Set up the block and just pass it to [`verify_main_chain_block_prepared`] + tracing::debug!( + "Preparing block for verification, expected height: {}", + context.chain_height + ); + + // Set up the block and just pass it to [`verify_prepped_main_chain_block`] let rx_vms = context.rx_vms.clone(); let height = context.chain_height; let prepped_block = rayon_spawn_async(move || { - PrePreparedBlock::new(block, rx_vms.get(&height).map(AsRef::as_ref)) + PrePreparedBlock::new( + block, + rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref), + ) }) .await?; - tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash)); - check_block_pow(&prepped_block.pow_hash, context.next_difficulty) .map_err(ConsensusError::Block)?; @@ -233,31 +506,93 @@ where let mut ordered_txs = Vec::with_capacity(txs.len()); - tracing::debug!("Checking we have correct transactions for block."); + tracing::debug!("Ordering transactions for block."); - for tx_hash in &prepped_block.block.txs { - let tx = txs - .remove(tx_hash) - .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; - ordered_txs.push(Arc::new(tx)); + if !prepped_block.block.txs.is_empty() { + for tx_hash in &prepped_block.block.txs { + let tx = txs + .remove(tx_hash) + .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; + ordered_txs.push(Arc::new(tx)); + } + drop(txs); } - drop(txs); - tracing::debug!("Verifying transactions for block."); + verify_prepped_main_chain_block( + prepped_block, + ordered_txs, + context_svc, + tx_verifier_svc, + Some(context), + ) + .await +} - tx_verifier_svc - .oneshot(VerifyTxRequest::Prepped { - txs: ordered_txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) - .await?; +async fn verify_prepped_main_chain_block( + prepped_block: PrePreparedBlock, + txs: Vec>, + context_svc: C, + tx_verifier_svc: TxV, + cached_context: Option, +) -> Result +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + + 'static, + C::Future: Send + 'static, + TxV: Service, +{ + let context = if let Some(context) = cached_context { + context + } else { + let BlockChainContextResponse::Context(checked_context) = context_svc + .oneshot(BlockChainContextRequest::GetContext) + .await + .map_err(Into::::into)? + else { + panic!("Context service returned wrong response!"); + }; + + let context = checked_context.unchecked_blockchain_context().clone(); + + tracing::debug!("got blockchain context: {:?}", context); + + context + }; + + tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash)); + + check_block_pow(&prepped_block.pow_hash, context.next_difficulty) + .map_err(ConsensusError::Block)?; + + if prepped_block.block.txs.len() != txs.len() { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } + + if !prepped_block.block.txs.is_empty() { + for (expected_tx_hash, tx) in prepped_block.block.txs.iter().zip(txs.iter()) { + if expected_tx_hash != &tx.tx_hash { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } + } + + tx_verifier_svc + .oneshot(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await?; + } let block_weight = - prepped_block.miner_tx_weight + ordered_txs.iter().map(|tx| tx.tx_weight).sum::(); - let total_fees = ordered_txs.iter().map(|tx| tx.fee).sum::(); + prepped_block.miner_tx_weight + txs.iter().map(|tx| tx.tx_weight).sum::(); + let total_fees = txs.iter().map(|tx| tx.fee).sum::(); tracing::debug!("Verifying block header."); let (_, generated_coins) = check_block( @@ -273,7 +608,7 @@ where block_hash: prepped_block.block_hash, block: prepped_block.block, block_blob: prepped_block.block_blob, - txs: ordered_txs + txs: txs .into_iter() .map(|tx| { // Note: it would be possible for the transaction verification service to hold onto the tx after the call