add block batch prepper

This commit is contained in:
Boog900 2024-06-06 00:24:34 +01:00
parent 6df67bb9d3
commit c4b14727fd
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
2 changed files with 429 additions and 35 deletions

61
Cargo.lock generated
View file

@ -50,6 +50,16 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "async-buffer"
version = "0.1.0"
dependencies = [
"futures",
"pin-project",
"thiserror",
"tokio",
]
[[package]] [[package]]
name = "async-lock" name = "async-lock"
version = "3.3.0" version = "3.3.0"
@ -525,6 +535,7 @@ dependencies = [
"tokio-util", "tokio-util",
"tower", "tower",
"tracing", "tracing",
"tracing-subscriber",
] ]
[[package]] [[package]]
@ -613,7 +624,7 @@ dependencies = [
] ]
[[package]] [[package]]
name = "dandelion_tower" name = "dandelion-tower"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures", "futures",
@ -1463,6 +1474,16 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.18" version = "0.2.18"
@ -1510,6 +1531,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "page_size" name = "page_size"
version = "0.6.0" version = "0.6.0"
@ -2136,6 +2163,15 @@ dependencies = [
"keccak", "keccak",
] ]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.2" version = "1.4.2"
@ -2476,6 +2512,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
] ]
[[package]] [[package]]
@ -2484,7 +2532,12 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [ dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core", "tracing-core",
"tracing-log",
] ]
[[package]] [[package]]
@ -2543,6 +2596,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.4" version = "0.9.4"

View file

@ -7,24 +7,82 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use cuprate_helper::asynch::rayon_spawn_async;
use futures::FutureExt; use futures::FutureExt;
use monero_serai::{block::Block, transaction::Input}; use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use rayon::prelude::*;
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus_rules::{ use cuprate_consensus_rules::{
blocks::{calculate_pow_hash, check_block, check_block_pow, BlockError, RandomX}, blocks::{
calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height,
randomx_seed_height, BlockError, RandomX,
},
miner_tx::MinerTxError, miner_tx::MinerTxError,
ConsensusError, HardFork, ConsensusError, HardFork,
}; };
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation};
use crate::{ use crate::{
context::{BlockChainContextRequest, BlockChainContextResponse}, context::{
rx_vms::RandomXVM, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
Database, ExtendedConsensusError, Database, ExtendedConsensusError,
}; };
/// A pre-prepared block with all data needed to verify it, except the block's proof of work.
#[derive(Debug)]
pub struct PrePreparedBlockExPOW {
/// The block
pub block: Block,
/// The serialised blocks bytes
pub block_blob: Vec<u8>,
/// The blocks hf vote
pub hf_vote: HardFork,
/// The blocks hf version
pub hf_version: HardFork,
/// The blocks hash
pub block_hash: [u8; 32],
/// The height of the block.
pub height: u64,
/// The weight of the blocks miner transaction.
pub miner_tx_weight: usize,
}
impl PrePreparedBlockExPOW {
pub fn new(block: Block) -> Result<PrePreparedBlockExPOW, ConsensusError> {
let (hf_version, hf_vote) =
HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?;
let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputNotOfTypeGen,
)))?
};
Ok(PrePreparedBlockExPOW {
block_blob: block.serialize(),
hf_vote,
hf_version,
block_hash: block.hash(),
height: *height,
miner_tx_weight: block.miner_tx.weight(),
block,
})
}
}
/// A pre-prepared block with all data needed to verify it. /// A pre-prepared block with all data needed to verify it.
#[derive(Debug)] #[derive(Debug)]
pub struct PrePreparedBlock { pub struct PrePreparedBlock {
@ -82,6 +140,32 @@ impl PrePreparedBlock {
block, block,
}) })
} }
/// Creates a new [`PrePreparedBlock`] from a [`PrePreparedBlockExPOW`].
///
/// The randomX VM must be Some if RX is needed or this will panic.
/// The randomX VM must also be initialised with the correct seed.
fn new_prepped<R: RandomX>(
block: PrePreparedBlockExPOW,
randomx_vm: Option<&R>,
) -> Result<PrePreparedBlock, ConsensusError> {
Ok(PrePreparedBlock {
block_blob: block.block_blob,
hf_vote: block.hf_vote,
hf_version: block.hf_version,
block_hash: block.block_hash,
pow_hash: calculate_pow_hash(
randomx_vm,
&block.block.serialize_hashable(),
block.height,
&block.hf_version,
)?,
miner_tx_weight: block.block.miner_tx.weight(),
block: block.block,
})
}
} }
/// A request to verify a block. /// A request to verify a block.
@ -91,12 +175,28 @@ pub enum VerifyBlockRequest {
block: Block, block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
}, },
/// Verifies a prepared block.
MainChainPrepped {
/// The already prepared block.
block: PrePreparedBlock,
/// The full list of transactions for this block, in order given in `block`.
txs: Vec<Arc<TransactionVerificationData>>,
},
/// Batch prepares a list of blocks and transactions for verification.
MainChainBatchPrepareBlocks {
/// The list of blocks.
blocks: Vec<(Block, Vec<Transaction>)>,
},
} }
/// A response from a verify block request. /// A response from a verify block request.
#[allow(clippy::large_enum_variant)] // The largest variant is most common ([`MainChain`])
pub enum VerifyBlockResponse { pub enum VerifyBlockResponse {
/// This block is valid. /// This block is valid.
MainChain(VerifiedBlockInformation), MainChain(VerifiedBlockInformation),
/// A list of prepared blocks for verification, you should call [`VerifyBlockRequest::MainChainPrepped`] on each of the returned
/// blocks to fully verify them.
MainChainBatchPrepped(Vec<(PrePreparedBlock, Vec<Arc<TransactionVerificationData>>)>),
} }
/// The block verifier service. /// The block verifier service.
@ -178,17 +278,185 @@ where
} => { } => {
verify_main_chain_block(block, prepared_txs, context_svc, tx_verifier_svc).await verify_main_chain_block(block, prepared_txs, context_svc, tx_verifier_svc).await
} }
VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks } => {
batch_prepare_main_chain_block(blocks, context_svc).await
}
VerifyBlockRequest::MainChainPrepped { block, txs } => {
verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None)
.await
}
} }
} }
.boxed() .boxed()
} }
} }
/// Batch prepares a list of blocks for verification.
#[instrument(level="debug", name="batch_prep_blocks" skip_all, fields(amt=blocks.len()))]
async fn batch_prepare_main_chain_block<C>(
blocks: Vec<(Block, Vec<Transaction>)>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
let blocks: Vec<PrePreparedBlockExPOW> = rayon_spawn_async(|| {
blocks
.into_iter()
.map(PrePreparedBlockExPOW::new)
.collect::<Result<Vec<_>, _>>()
})
.await?;
// A Vec of (timestamp, HF) for each block to calculate the expected difficulty for each block.
let mut timestamps_hfs = Vec::with_capacity(blocks.len());
let mut new_rx_vm = None;
tracing::debug!("Checking blocks follow each other.");
// For every block make sure they have the correct height and previous ID
for window in blocks.windows(2) {
if window[0].block_hash != window[1].block.header.previous
|| window[0].height != window[1].height - 1
{
tracing::debug!("Blocks do not follow each other, verification failed.");
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
// Cache any potential RX VM seeds as we may need them for future blocks in the batch.
if is_randomx_seed_height(window[0].height) {
new_rx_vm = Some((window[0].height, window[0].block_hash));
}
timestamps_hfs.push((window[0].block.header.timestamp, window[0].hf_version))
}
// Get the current blockchain context.
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
// Calculate the expected difficulties for each block in the batch.
let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::BatchGetDifficulties(
timestamps_hfs,
))
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
// Make sure the blocks follow the main chain.
if context.chain_height != blocks[0].height {
tracing::debug!("Blocks do not follow main chain, verification failed.");
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputsHeightIncorrect,
)))?;
}
if context.top_hash != blocks[0].block.header.previous {
tracing::debug!("Blocks do not follow main chain, verification failed.");
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
let mut rx_vms = context.rx_vms;
// If we have a RX seed in the batch calculate it.
if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
tracing::debug!("New randomX seed in batch, initialising VM");
let new_vm = rayon_spawn_async(move || {
Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!"))
})
.await;
context_svc
.ready()
.await?
.call(BlockChainContextRequest::NewRXVM((
new_vm_seed,
new_vm.clone(),
)))
.await
.map_err(Into::<ExtendedConsensusError>::into)?;
rx_vms.insert(new_vm_height, new_vm);
}
tracing::debug!("Calculating PoW and prepping transaction");
let blocks = rayon_spawn_async(move || {
blocks
.into_par_iter()
.zip(difficulties)
.zip(txs)
.map(|((block, difficultly), txs)| {
// Calculate the PoW for the block.
let height = block.height;
let block = PrePreparedBlock::new_prepped(
block,
rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
)?;
// Check the PoW
check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?;
// Now setup the txs.
let mut txs = txs
.into_par_iter()
.map(|tx| {
let tx = TransactionVerificationData::new(tx)?;
Ok::<_, ConsensusError>((tx.tx_hash, tx))
})
.collect::<Result<HashMap<_, _>, _>>()?;
// Order the txs correctly.
let mut ordered_txs = Vec::with_capacity(txs.len());
for tx_hash in &block.block.txs {
let tx = txs
.remove(tx_hash)
.ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?;
ordered_txs.push(Arc::new(tx));
}
Ok((block, ordered_txs))
})
.collect::<Result<Vec<_>, ExtendedConsensusError>>()
})
.await?;
Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks))
}
/// Verifies a prepared block. /// Verifies a prepared block.
async fn verify_main_chain_block<C, TxV>( async fn verify_main_chain_block<C, TxV>(
block: Block, block: Block,
mut txs: HashMap<[u8; 32], TransactionVerificationData>, mut txs: HashMap<[u8; 32], TransactionVerificationData>,
context_svc: C, mut context_svc: C,
tx_verifier_svc: TxV, tx_verifier_svc: TxV,
) -> Result<VerifyBlockResponse, ExtendedConsensusError> ) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where where
@ -201,12 +469,11 @@ where
C::Future: Send + 'static, C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>, TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
{ {
tracing::debug!("getting blockchain context");
let BlockChainContextResponse::Context(checked_context) = context_svc let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::GetContext) .ready()
.await .await?
.map_err(Into::<ExtendedConsensusError>::into)? .call(BlockChainContextRequest::GetContext)
.await?
else { else {
panic!("Context service returned wrong response!"); panic!("Context service returned wrong response!");
}; };
@ -214,18 +481,24 @@ where
let context = checked_context.unchecked_blockchain_context().clone(); let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context); tracing::debug!("got blockchain context: {:?}", context);
// Set up the block and just pass it to [`verify_main_chain_block_prepared`] tracing::debug!(
"Preparing block for verification, expected height: {}",
context.chain_height
);
// Set up the block and just pass it to [`verify_prepped_main_chain_block`]
let rx_vms = context.rx_vms.clone(); let rx_vms = context.rx_vms.clone();
let height = context.chain_height; let height = context.chain_height;
let prepped_block = rayon_spawn_async(move || { let prepped_block = rayon_spawn_async(move || {
PrePreparedBlock::new(block, rx_vms.get(&height).map(AsRef::as_ref)) PrePreparedBlock::new(
block,
rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
)
}) })
.await?; .await?;
tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash));
check_block_pow(&prepped_block.pow_hash, context.next_difficulty) check_block_pow(&prepped_block.pow_hash, context.next_difficulty)
.map_err(ConsensusError::Block)?; .map_err(ConsensusError::Block)?;
@ -233,31 +506,93 @@ where
let mut ordered_txs = Vec::with_capacity(txs.len()); let mut ordered_txs = Vec::with_capacity(txs.len());
tracing::debug!("Checking we have correct transactions for block."); tracing::debug!("Ordering transactions for block.");
for tx_hash in &prepped_block.block.txs { if !prepped_block.block.txs.is_empty() {
let tx = txs for tx_hash in &prepped_block.block.txs {
.remove(tx_hash) let tx = txs
.ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; .remove(tx_hash)
ordered_txs.push(Arc::new(tx)); .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?;
ordered_txs.push(Arc::new(tx));
}
drop(txs);
} }
drop(txs);
tracing::debug!("Verifying transactions for block."); verify_prepped_main_chain_block(
prepped_block,
ordered_txs,
context_svc,
tx_verifier_svc,
Some(context),
)
.await
}
tx_verifier_svc async fn verify_prepped_main_chain_block<C, TxV>(
.oneshot(VerifyTxRequest::Prepped { prepped_block: PrePreparedBlock,
txs: ordered_txs.clone(), txs: Vec<Arc<TransactionVerificationData>>,
current_chain_height: context.chain_height, context_svc: C,
top_hash: context.top_hash, tx_verifier_svc: TxV,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), cached_context: Option<RawBlockChainContext>,
hf: context.current_hf, ) -> Result<VerifyBlockResponse, ExtendedConsensusError>
}) where
.await?; C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
{
let context = if let Some(context) = cached_context {
context
} else {
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);
context
};
tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash));
check_block_pow(&prepped_block.pow_hash, context.next_difficulty)
.map_err(ConsensusError::Block)?;
if prepped_block.block.txs.len() != txs.len() {
return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect);
}
if !prepped_block.block.txs.is_empty() {
for (expected_tx_hash, tx) in prepped_block.block.txs.iter().zip(txs.iter()) {
if expected_tx_hash != &tx.tx_hash {
return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect);
}
}
tx_verifier_svc
.oneshot(VerifyTxRequest::Prepped {
txs: txs.clone(),
current_chain_height: context.chain_height,
top_hash: context.top_hash,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hf,
})
.await?;
}
let block_weight = let block_weight =
prepped_block.miner_tx_weight + ordered_txs.iter().map(|tx| tx.tx_weight).sum::<usize>(); prepped_block.miner_tx_weight + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
let total_fees = ordered_txs.iter().map(|tx| tx.fee).sum::<u64>(); let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
tracing::debug!("Verifying block header."); tracing::debug!("Verifying block header.");
let (_, generated_coins) = check_block( let (_, generated_coins) = check_block(
@ -273,7 +608,7 @@ where
block_hash: prepped_block.block_hash, block_hash: prepped_block.block_hash,
block: prepped_block.block, block: prepped_block.block,
block_blob: prepped_block.block_blob, block_blob: prepped_block.block_blob,
txs: ordered_txs txs: txs
.into_iter() .into_iter()
.map(|tx| { .map(|tx| {
// Note: it would be possible for the transaction verification service to hold onto the tx after the call // Note: it would be possible for the transaction verification service to hold onto the tx after the call