Consensus: add alt-chain handling (#214)

* add `pop_blocks` to the difficulty cache

* add a rolling median struct

* use RollingMedian in weight cache

* add pop_blocks to weight cache

* add alt context cache

* add getting alt RX vms

* rework alt cache

* add alt block verify function

* keep alt caches around

* add alt checked alt blocks to the cache

* check the alt blocks timestamp

* add docs + cleanup code

* add popping blocks from the context cache

* finish popping blocks + fix tests

* fix doc

* add a test popping blocks from HF cache

* add a request to clear alt caches

* add back lint

* Apply suggestions from code review

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

* review fixes

* small changes

* change panic doc

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
This commit is contained in:
Boog900 2024-07-29 00:13:08 +00:00 committed by GitHub
parent a2bca1b889
commit b44c6b045b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 1801 additions and 412 deletions

2
Cargo.lock generated
View file

@ -535,11 +535,13 @@ dependencies = [
"multiexp",
"proptest",
"proptest-derive",
"rand",
"randomx-rs",
"rayon",
"thiserror",
"thread_local",
"tokio",
"tokio-test",
"tokio-util",
"tower",
"tracing",

View file

@ -29,6 +29,7 @@ tokio = { workspace = true, features = ["rt"] }
tokio-util = { workspace = true }
hex = { workspace = true }
rand = { workspace = true }
[dev-dependencies]
cuprate-test-utils = { path = "../test-utils" }
@ -37,5 +38,6 @@ cuprate-consensus-rules = {path = "./rules", features = ["proptest"]}
hex-literal = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"]}
tokio-test = { workspace = true }
proptest = { workspace = true }
proptest-derive = { workspace = true }

View file

@ -6,7 +6,10 @@ use tower::{Service, ServiceExt};
use cuprate_blockchain::{
config::ConfigBuilder, cuprate_database::RuntimeError, service::DatabaseReadHandle,
};
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain,
};
use cuprate_fast_sync::{hash_of_hashes, BlockId, HashOfHashes};
@ -19,7 +22,7 @@ async fn read_batch(
let mut block_ids = Vec::<BlockId>::with_capacity(BATCH_SIZE as usize);
for height in height_from..(height_from + BATCH_SIZE) {
let request = BCReadRequest::BlockHash(height);
let request = BCReadRequest::BlockHash(height, Chain::Main);
let response_channel = handle.ready().await?.call(request);
let response = response_channel.await?;

View file

@ -148,7 +148,7 @@ fn block_size_sanity_check(
/// Sanity check on the block weight.
///
/// ref: <https://monero-book.cuprate.org/consensus_rules/blocks.html#block-weight-and-size>
fn check_block_weight(
pub fn check_block_weight(
block_weight: usize,
median_for_block_reward: usize,
) -> Result<(), BlockError> {
@ -184,7 +184,7 @@ fn check_prev_id(block: &Block, top_hash: &[u8; 32]) -> Result<(), BlockError> {
/// Checks the blocks timestamp is in the valid range.
///
/// ref: <https://monero-book.cuprate.org/consensus_rules/blocks.html#timestamp>
fn check_timestamp(block: &Block, median_timestamp: u64) -> Result<(), BlockError> {
pub fn check_timestamp(block: &Block, median_timestamp: u64) -> Result<(), BlockError> {
if block.header.timestamp < median_timestamp
|| block.header.timestamp > current_unix_timestamp() + BLOCK_FUTURE_TIME_LIMIT
{

View file

@ -38,7 +38,7 @@ pub enum HardForkError {
}
/// Information about a given hard-fork.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct HFInfo {
height: u64,
threshold: u64,
@ -50,7 +50,7 @@ impl HFInfo {
}
/// Information about every hard-fork Monero has had.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct HFsInfo([HFInfo; NUMB_OF_HARD_FORKS]);
impl HFsInfo {
@ -243,7 +243,7 @@ impl HardFork {
}
/// A struct holding the current voting state of the blockchain.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct HFVotes {
votes: [u64; NUMB_OF_HARD_FORKS],
vote_list: VecDeque<HardFork>,
@ -293,6 +293,28 @@ impl HFVotes {
}
}
/// Pop a number of blocks from the top of the cache and push some values into the front of the cache,
/// i.e. the oldest blocks.
///
/// `old_block_votes` should contain the HFs below the window that now will be in the window after popping
/// blocks from the top.
///
/// # Panics
///
/// This will panic if `old_block_votes` contains more HFs than `numb_blocks`.
pub fn reverse_blocks(&mut self, numb_blocks: usize, old_block_votes: Self) {
assert!(old_block_votes.vote_list.len() <= numb_blocks);
for hf in self.vote_list.drain(self.vote_list.len() - numb_blocks..) {
self.votes[hf as usize - 1] -= 1;
}
for old_vote in old_block_votes.vote_list.into_iter().rev() {
self.vote_list.push_front(old_vote);
self.votes[old_vote as usize - 1] += 1;
}
}
/// Returns the total votes for a hard-fork.
///
/// ref: <https://monero-book.cuprate.org/consensus_rules/hardforks.html#accepting-a-fork>

View file

@ -12,31 +12,35 @@ use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{
AltBlockInformation, VerifiedBlockInformation, VerifiedTransactionInformation,
};
use cuprate_consensus_rules::{
blocks::{
calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height,
randomx_seed_height, BlockError, RandomX,
calculate_pow_hash, check_block, check_block_pow, randomx_seed_height, BlockError, RandomX,
},
hard_forks::HardForkError,
miner_tx::MinerTxError,
ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation};
use crate::{
context::{
rx_vms::RandomXVM, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
context::{BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
Database, ExtendedConsensusError,
};
mod alt_block;
mod batch_prepare;
mod free;
use alt_block::sanity_check_alt_block;
use batch_prepare::batch_prepare_main_chain_block;
use free::pull_ordered_transactions;
/// A pre-prepared block with all data needed to verify it, except the block's proof of work.
#[derive(Debug)]
pub struct PreparedBlockExPow {
@ -124,7 +128,7 @@ impl PreparedBlock {
let (hf_version, hf_vote) =
HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?;
let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else {
let [Input::Gen(height)] = &block.miner_tx.prefix.inputs[..] else {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputNotOfTypeGen,
)))?
@ -191,6 +195,7 @@ pub enum VerifyBlockRequest {
/// The already prepared block.
block: PreparedBlock,
/// The full list of transactions for this block, in the order given in `block`.
// TODO: Remove the Arc here
txs: Vec<Arc<TransactionVerificationData>>,
},
/// Batch prepares a list of blocks and transactions for verification.
@ -198,6 +203,16 @@ pub enum VerifyBlockRequest {
/// The list of blocks and their transactions (not necessarily in the order given in the block).
blocks: Vec<(Block, Vec<Transaction>)>,
},
/// A request to sanity check an alt block, also returning the cumulative difficulty of the alt chain.
///
/// Unlike requests to verify main chain blocks, you do not need to add the returned block to the context
/// service, you will still have to add it to the database though.
AltChain {
/// The alt block to sanity check.
block: Block,
/// The alt transactions.
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
},
}
/// A response from a verify block request.
@ -205,6 +220,8 @@ pub enum VerifyBlockRequest {
pub enum VerifyBlockResponse {
/// This block is valid.
MainChain(VerifiedBlockInformation),
/// The sanity checked alt block.
AltChain(AltBlockInformation),
/// A list of prepared blocks for verification, you should call [`VerifyBlockRequest::MainChainPrepped`] on each of the returned
/// blocks to fully verify them.
MainChainBatchPrepped(Vec<(PreparedBlock, Vec<Arc<TransactionVerificationData>>)>),
@ -296,206 +313,20 @@ where
verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None)
.await
}
VerifyBlockRequest::AltChain {
block,
prepared_txs,
} => sanity_check_alt_block(block, prepared_txs, context_svc).await,
}
}
.boxed()
}
}
/// Batch prepares a list of blocks for verification.
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
async fn batch_prepare_main_chain_block<C>(
blocks: Vec<(Block, Vec<Transaction>)>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
let blocks: Vec<PreparedBlockExPow> = rayon_spawn_async(|| {
blocks
.into_iter()
.map(PreparedBlockExPow::new)
.collect::<Result<Vec<_>, _>>()
})
.await?;
let Some(last_block) = blocks.last() else {
return Err(ExtendedConsensusError::NoBlocksToVerify);
};
// hard-forks cannot be reversed, so the last block will contain the highest hard fork (provided the
// batch is valid).
let top_hf_in_batch = last_block.hf_version;
// A Vec of (timestamp, HF) for each block to calculate the expected difficulty for each block.
let mut timestamps_hfs = Vec::with_capacity(blocks.len());
let mut new_rx_vm = None;
tracing::debug!("Checking blocks follow each other.");
// For every block make sure they have the correct height and previous ID
for window in blocks.windows(2) {
let block_0 = &window[0];
let block_1 = &window[1];
// Make sure no blocks in the batch have a higher hard fork than the last block.
if block_0.hf_version > top_hf_in_batch {
Err(ConsensusError::Block(BlockError::HardForkError(
HardForkError::VersionIncorrect,
)))?;
}
if block_0.block_hash != block_1.block.header.previous
|| block_0.height != block_1.height - 1
{
tracing::debug!("Blocks do not follow each other, verification failed.");
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
// Cache any potential RX VM seeds as we may need them for future blocks in the batch.
if is_randomx_seed_height(block_0.height) && top_hf_in_batch >= HardFork::V12 {
new_rx_vm = Some((block_0.height, block_0.block_hash));
}
timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version))
}
// Get the current blockchain context.
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
// Calculate the expected difficulties for each block in the batch.
let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::BatchGetDifficulties(
timestamps_hfs,
))
.await
.map_err(Into::<ExtendedConsensusError>::into)?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
// Make sure the blocks follow the main chain.
if context.chain_height != blocks[0].height {
tracing::debug!("Blocks do not follow main chain, verification failed.");
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputsHeightIncorrect,
)))?;
}
if context.top_hash != blocks[0].block.header.previous {
tracing::debug!("Blocks do not follow main chain, verification failed.");
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
let mut rx_vms = if top_hf_in_batch < HardFork::V12 {
HashMap::new()
} else {
let BlockChainContextResponse::RxVms(rx_vms) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetCurrentRxVm)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
rx_vms
};
// If we have a RX seed in the batch calculate it.
if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
tracing::debug!("New randomX seed in batch, initialising VM");
let new_vm = rayon_spawn_async(move || {
Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!"))
})
.await;
context_svc
.oneshot(BlockChainContextRequest::NewRXVM((
new_vm_seed,
new_vm.clone(),
)))
.await
.map_err(Into::<ExtendedConsensusError>::into)?;
rx_vms.insert(new_vm_height, new_vm);
}
tracing::debug!("Calculating PoW and prepping transaction");
let blocks = rayon_spawn_async(move || {
blocks
.into_par_iter()
.zip(difficulties)
.zip(txs)
.map(|((block, difficultly), txs)| {
// Calculate the PoW for the block.
let height = block.height;
let block = PreparedBlock::new_prepped(
block,
rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
)?;
// Check the PoW
check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?;
// Now setup the txs.
let mut txs = txs
.into_par_iter()
.map(|tx| {
let tx = TransactionVerificationData::new(tx)?;
Ok::<_, ConsensusError>((tx.tx_hash, tx))
})
.collect::<Result<HashMap<_, _>, _>>()?;
// Order the txs correctly.
let mut ordered_txs = Vec::with_capacity(txs.len());
for tx_hash in &block.block.txs {
let tx = txs
.remove(tx_hash)
.ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?;
ordered_txs.push(Arc::new(tx));
}
Ok((block, ordered_txs))
})
.collect::<Result<Vec<_>, ExtendedConsensusError>>()
})
.await?;
Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks))
}
/// Verifies a prepared block.
async fn verify_main_chain_block<C, TxV>(
block: Block,
mut txs: HashMap<[u8; 32], TransactionVerificationData>,
txs: HashMap<[u8; 32], TransactionVerificationData>,
mut context_svc: C,
tx_verifier_svc: TxV,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
@ -557,20 +388,11 @@ where
.map_err(ConsensusError::Block)?;
// Check that the txs included are what we need and that there are not any extra.
let mut ordered_txs = Vec::with_capacity(txs.len());
tracing::debug!("Ordering transactions for block.");
if !prepped_block.block.txs.is_empty() {
for tx_hash in &prepped_block.block.txs {
let tx = txs
.remove(tx_hash)
.ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?;
ordered_txs.push(Arc::new(tx));
}
drop(txs);
}
// TODO: Remove the Arc here
let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?
.into_iter()
.map(Arc::new)
.collect();
verify_prepped_main_chain_block(
prepped_block,
@ -604,8 +426,7 @@ where
} else {
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::GetContext)
.await
.map_err(Into::<ExtendedConsensusError>::into)?
.await?
else {
panic!("Context service returned wrong response!");
};

View file

@ -0,0 +1,304 @@
//! Alt Blocks
//!
//! Alt blocks are sanity checked by [`sanity_check_alt_block`], that function will also compute the cumulative
//! difficulty of the alt chain so callers will know if they should re-org to the alt chain.
use std::{collections::HashMap, sync::Arc};
use monero_serai::{block::Block, transaction::Input};
use tower::{Service, ServiceExt};
use cuprate_consensus_rules::{
blocks::{
check_block_pow, check_block_weight, check_timestamp, randomx_seed_height, BlockError,
},
miner_tx::MinerTxError,
ConsensusError,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{AltBlockInformation, Chain, ChainId, VerifiedTransactionInformation};
use crate::{
block::{free::pull_ordered_transactions, PreparedBlock},
context::{
difficulty::DifficultyCache,
rx_vms::RandomXVM,
weight::{self, BlockWeightsCache},
AltChainContextCache, AltChainRequestToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
},
transactions::TransactionVerificationData,
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
VerifyBlockResponse,
};
/// This function sanity checks an alt-block.
///
/// Returns [`AltBlockInformation`], which contains the cumulative difficulty of the alt chain.
///
/// This function only checks the block's PoW and its weight.
pub async fn sanity_check_alt_block<C>(
block: Block,
txs: HashMap<[u8; 32], TransactionVerificationData>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
// Fetch the alt-chains context cache.
let BlockChainContextResponse::AltChainContextCache(mut alt_context_cache) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::AltChainContextCache {
prev_id: block.header.previous,
_token: AltChainRequestToken,
})
.await?
else {
panic!("Context service returned wrong response!");
};
// Check if the block's miner input is formed correctly.
let [Input::Gen(height)] = &block.miner_tx.prefix.inputs[..] else {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputNotOfTypeGen,
)))?
};
if *height != alt_context_cache.chain_height {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputsHeightIncorrect,
)))?
}
// prep the alt block.
let prepped_block = {
let rx_vm = alt_rx_vm(
alt_context_cache.chain_height,
block.header.major_version,
alt_context_cache.parent_chain,
&mut alt_context_cache,
&mut context_svc,
)
.await?;
rayon_spawn_async(move || PreparedBlock::new(block, rx_vm.as_deref())).await?
};
// get the difficulty cache for this alt chain.
let difficulty_cache = alt_difficulty_cache(
prepped_block.block.header.previous,
&mut alt_context_cache,
&mut context_svc,
)
.await?;
// Check the alt block timestamp is in the correct range.
if let Some(median_timestamp) =
difficulty_cache.median_timestamp(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW.try_into().unwrap())
{
check_timestamp(&prepped_block.block, median_timestamp).map_err(ConsensusError::Block)?
};
let next_difficulty = difficulty_cache.next_difficulty(&prepped_block.hf_version);
// make sure the block's PoW is valid for this difficulty.
check_block_pow(&prepped_block.pow_hash, next_difficulty).map_err(ConsensusError::Block)?;
let cumulative_difficulty = difficulty_cache.cumulative_difficulty() + next_difficulty;
let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?;
let block_weight =
prepped_block.miner_tx_weight + ordered_txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
let alt_weight_cache = alt_weight_cache(
prepped_block.block.header.previous,
&mut alt_context_cache,
&mut context_svc,
)
.await?;
// Check the block weight is below the limit.
check_block_weight(
block_weight,
alt_weight_cache.median_for_block_reward(&prepped_block.hf_version),
)
.map_err(ConsensusError::Block)?;
let long_term_weight = weight::calculate_block_long_term_weight(
&prepped_block.hf_version,
block_weight,
alt_weight_cache.median_long_term_weight(),
);
// Get the chainID or generate a new one if this is the first alt block in this alt chain.
let chain_id = *alt_context_cache
.chain_id
.get_or_insert_with(|| ChainId(rand::random()));
// Create the alt block info.
let block_info = AltBlockInformation {
block_hash: prepped_block.block_hash,
block: prepped_block.block,
block_blob: prepped_block.block_blob,
txs: ordered_txs
.into_iter()
.map(|tx| VerifiedTransactionInformation {
tx_blob: tx.tx_blob,
tx_weight: tx.tx_weight,
fee: tx.fee,
tx_hash: tx.tx_hash,
tx: tx.tx,
})
.collect(),
pow_hash: prepped_block.pow_hash,
weight: block_weight,
height: alt_context_cache.chain_height,
long_term_weight,
cumulative_difficulty,
chain_id,
};
// Add this block to the cache.
alt_context_cache.add_new_block(
block_info.height,
block_info.block_hash,
block_info.weight,
block_info.long_term_weight,
block_info.block.header.timestamp,
);
// Add this alt cache back to the context service.
context_svc
.oneshot(BlockChainContextRequest::AddAltChainContextCache {
prev_id: block_info.block.header.previous,
cache: alt_context_cache,
_token: AltChainRequestToken,
})
.await?;
Ok(VerifyBlockResponse::AltChain(block_info))
}
/// Retrieves the alt RX VM for the chosen block height.
///
/// If the `hf` is less than 12 (the height RX activates), then [`None`] is returned.
async fn alt_rx_vm<C>(
block_height: u64,
hf: u8,
parent_chain: Chain,
alt_chain_context: &mut AltChainContextCache,
context_svc: C,
) -> Result<Option<Arc<RandomXVM>>, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send,
C::Future: Send + 'static,
{
if hf < 12 {
return Ok(None);
}
let seed_height = randomx_seed_height(block_height);
let cached_vm = match alt_chain_context.cached_rx_vm.take() {
// If the VM is cached and the height is the height we need, we can use this VM.
Some((cached_seed_height, vm)) if seed_height == cached_seed_height => {
(cached_seed_height, vm)
}
// Otherwise we need to make a new VM.
_ => {
let BlockChainContextResponse::AltChainRxVM(vm) = context_svc
.oneshot(BlockChainContextRequest::AltChainRxVM {
height: block_height,
chain: parent_chain,
_token: AltChainRequestToken,
})
.await?
else {
panic!("Context service returned wrong response!");
};
(seed_height, vm)
}
};
Ok(Some(
alt_chain_context.cached_rx_vm.insert(cached_vm).1.clone(),
))
}
/// Returns the [`DifficultyCache`] for the alt chain.
async fn alt_difficulty_cache<C>(
prev_id: [u8; 32],
alt_chain_context: &mut AltChainContextCache,
context_svc: C,
) -> Result<&mut DifficultyCache, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send,
C::Future: Send + 'static,
{
// First look to see if the difficulty cache for this alt chain is already cached.
match &mut alt_chain_context.difficulty_cache {
Some(cache) => Ok(cache),
// Otherwise make a new one.
difficulty_cache => {
let BlockChainContextResponse::AltChainDifficultyCache(cache) = context_svc
.oneshot(BlockChainContextRequest::AltChainDifficultyCache {
prev_id,
_token: AltChainRequestToken,
})
.await?
else {
panic!("Context service returned wrong response!");
};
Ok(difficulty_cache.insert(cache))
}
}
}
/// Returns the [`BlockWeightsCache`] for the alt chain.
async fn alt_weight_cache<C>(
prev_id: [u8; 32],
alt_chain_context: &mut AltChainContextCache,
context_svc: C,
) -> Result<&mut BlockWeightsCache, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send,
C::Future: Send + 'static,
{
// First look to see if the weight cache for this alt chain is already cached.
match &mut alt_chain_context.weight_cache {
Some(cache) => Ok(cache),
// Otherwise make a new one.
weight_cache => {
let BlockChainContextResponse::AltChainWeightCache(cache) = context_svc
.oneshot(BlockChainContextRequest::AltChainWeightCache {
prev_id,
_token: AltChainRequestToken,
})
.await?
else {
panic!("Context service returned wrong response!");
};
Ok(weight_cache.insert(cache))
}
}
}

View file

@ -0,0 +1,207 @@
use std::{collections::HashMap, sync::Arc};
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus_rules::{
blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError},
hard_forks::HardForkError,
miner_tx::MinerTxError,
ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use crate::{
block::{free::pull_ordered_transactions, PreparedBlock, PreparedBlockExPow},
context::rx_vms::RandomXVM,
transactions::TransactionVerificationData,
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
VerifyBlockResponse,
};
/// Batch prepares a list of blocks for verification.
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
pub(crate) async fn batch_prepare_main_chain_block<C>(
blocks: Vec<(Block, Vec<Transaction>)>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
let blocks: Vec<PreparedBlockExPow> = rayon_spawn_async(|| {
blocks
.into_iter()
.map(PreparedBlockExPow::new)
.collect::<Result<Vec<_>, _>>()
})
.await?;
let Some(last_block) = blocks.last() else {
return Err(ExtendedConsensusError::NoBlocksToVerify);
};
// hard-forks cannot be reversed, so the last block will contain the highest hard fork (provided the
// batch is valid).
let top_hf_in_batch = last_block.hf_version;
// A Vec of (timestamp, HF) for each block to calculate the expected difficulty for each block.
let mut timestamps_hfs = Vec::with_capacity(blocks.len());
let mut new_rx_vm = None;
tracing::debug!("Checking blocks follow each other.");
// For every block make sure they have the correct height and previous ID
for window in blocks.windows(2) {
let block_0 = &window[0];
let block_1 = &window[1];
// Make sure no blocks in the batch have a higher hard fork than the last block.
if block_0.hf_version > top_hf_in_batch {
Err(ConsensusError::Block(BlockError::HardForkError(
HardForkError::VersionIncorrect,
)))?;
}
if block_0.block_hash != block_1.block.header.previous
|| block_0.height != block_1.height - 1
{
tracing::debug!("Blocks do not follow each other, verification failed.");
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
// Cache any potential RX VM seeds as we may need them for future blocks in the batch.
if is_randomx_seed_height(block_0.height) && top_hf_in_batch >= HardFork::V12 {
new_rx_vm = Some((block_0.height, block_0.block_hash));
}
timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version))
}
// Get the current blockchain context.
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await?
else {
panic!("Context service returned wrong response!");
};
// Calculate the expected difficulties for each block in the batch.
let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::BatchGetDifficulties(
timestamps_hfs,
))
.await?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
// Make sure the blocks follow the main chain.
if context.chain_height != blocks[0].height {
tracing::debug!("Blocks do not follow main chain, verification failed.");
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputsHeightIncorrect,
)))?;
}
if context.top_hash != blocks[0].block.header.previous {
tracing::debug!("Blocks do not follow main chain, verification failed.");
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
let mut rx_vms = if top_hf_in_batch < HardFork::V12 {
HashMap::new()
} else {
let BlockChainContextResponse::RxVms(rx_vms) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetCurrentRxVm)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
rx_vms
};
// If we have a RX seed in the batch calculate it.
if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
tracing::debug!("New randomX seed in batch, initialising VM");
let new_vm = rayon_spawn_async(move || {
Arc::new(RandomXVM::new(&new_vm_seed).expect("RandomX VM gave an error on set up!"))
})
.await;
// Give the new VM to the context service, so it can cache it.
context_svc
.oneshot(BlockChainContextRequest::NewRXVM((
new_vm_seed,
new_vm.clone(),
)))
.await?;
rx_vms.insert(new_vm_height, new_vm);
}
tracing::debug!("Calculating PoW and prepping transaction");
let blocks = rayon_spawn_async(move || {
blocks
.into_par_iter()
.zip(difficulties)
.zip(txs)
.map(|((block, difficultly), txs)| {
// Calculate the PoW for the block.
let height = block.height;
let block = PreparedBlock::new_prepped(
block,
rx_vms.get(&randomx_seed_height(height)).map(AsRef::as_ref),
)?;
// Check the PoW
check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?;
// Now setup the txs.
let txs = txs
.into_par_iter()
.map(|tx| {
let tx = TransactionVerificationData::new(tx)?;
Ok::<_, ConsensusError>((tx.tx_hash, tx))
})
.collect::<Result<HashMap<_, _>, _>>()?;
// Order the txs correctly.
// TODO: Remove the Arc here
let ordered_txs = pull_ordered_transactions(&block.block, txs)?
.into_iter()
.map(Arc::new)
.collect();
Ok((block, ordered_txs))
})
.collect::<Result<Vec<_>, ExtendedConsensusError>>()
})
.await?;
Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks))
}

View file

@ -0,0 +1,32 @@
//! Free functions for block verification
use std::collections::HashMap;
use monero_serai::block::Block;
use crate::{transactions::TransactionVerificationData, ExtendedConsensusError};
/// Returns a list of transactions, pulled from `txs` in the order they are in the [`Block`].
///
/// Will error if a tx need is not in `txs` or if `txs` contain more txs than needed.
pub(crate) fn pull_ordered_transactions(
block: &Block,
mut txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<Vec<TransactionVerificationData>, ExtendedConsensusError> {
if block.txs.len() != txs.len() {
return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect);
}
let mut ordered_txs = Vec::with_capacity(txs.len());
if !block.txs.is_empty() {
for tx_hash in &block.txs {
let tx = txs
.remove(tx_hash)
.ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?;
ordered_txs.push(tx);
}
drop(txs);
}
Ok(ordered_txs)
}

View file

@ -27,16 +27,22 @@ pub(crate) mod hardforks;
pub(crate) mod rx_vms;
pub(crate) mod weight;
mod alt_chains;
mod task;
mod tokens;
use cuprate_types::Chain;
use difficulty::DifficultyCache;
use rx_vms::RandomXVM;
use weight::BlockWeightsCache;
pub(crate) use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache};
pub use difficulty::DifficultyCacheConfig;
pub use hardforks::HardForkConfig;
use rx_vms::RandomXVM;
pub use tokens::*;
pub use weight::BlockWeightsCacheConfig;
const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
pub(crate) const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
/// Config for the context service.
pub struct ContextConfig {
@ -233,6 +239,74 @@ pub enum BlockChainContextRequest {
NewRXVM(([u8; 32], Arc<RandomXVM>)),
/// A request to add a new block to the cache.
Update(NewBlockData),
/// Pop blocks from the cache to the specified height.
PopBlocks {
/// The number of blocks to pop from the top of the chain.
///
/// # Panics
///
/// This will panic if the number of blocks will pop the genesis block.
numb_blocks: u64,
},
/// Clear the alt chain context caches.
ClearAltCache,
//----------------------------------------------------------------------------------------------------------- AltChainRequests
/// A request for an alt chain context cache.
///
/// This variant is private and is not callable from outside this crate, the block verifier service will
/// handle getting the alt cache.
AltChainContextCache {
/// The previous block field in a [`BlockHeader`](monero_serai::block::BlockHeader).
prev_id: [u8; 32],
/// An internal token to prevent external crates calling this request.
_token: AltChainRequestToken,
},
/// A request for a difficulty cache of an alternative chin.
///
/// This variant is private and is not callable from outside this crate, the block verifier service will
/// handle getting the difficulty cache of an alt chain.
AltChainDifficultyCache {
/// The previous block field in a [`BlockHeader`](monero_serai::block::BlockHeader).
prev_id: [u8; 32],
/// An internal token to prevent external crates calling this request.
_token: AltChainRequestToken,
},
/// A request for a block weight cache of an alternative chin.
///
/// This variant is private and is not callable from outside this crate, the block verifier service will
/// handle getting the weight cache of an alt chain.
AltChainWeightCache {
/// The previous block field in a [`BlockHeader`](monero_serai::block::BlockHeader).
prev_id: [u8; 32],
/// An internal token to prevent external crates calling this request.
_token: AltChainRequestToken,
},
/// A request for a RX VM for an alternative chin.
///
/// Response variant: [`BlockChainContextResponse::AltChainRxVM`].
///
/// This variant is private and is not callable from outside this crate, the block verifier service will
/// handle getting the randomX VM of an alt chain.
AltChainRxVM {
/// The height the RandomX VM is needed for.
height: u64,
/// The chain to look in for the seed.
chain: Chain,
/// An internal token to prevent external crates calling this request.
_token: AltChainRequestToken,
},
/// A request to add an alt chain context cache to the context cache.
///
/// This variant is private and is not callable from outside this crate, the block verifier service will
/// handle returning the alt cache to the context service.
AddAltChainContextCache {
/// The previous block field in a [`BlockHeader`](monero_serai::block::BlockHeader).
prev_id: [u8; 32],
/// The cache.
cache: Box<AltChainContextCache>,
/// An internal token to prevent external crates calling this request.
_token: AltChainRequestToken,
},
}
pub enum BlockChainContextResponse {
@ -242,7 +316,15 @@ pub enum BlockChainContextResponse {
RxVms(HashMap<u64, Arc<RandomXVM>>),
/// A list of difficulties.
BatchDifficulties(Vec<u128>),
/// Ok response.
/// An alt chain context cache.
AltChainContextCache(Box<AltChainContextCache>),
/// A difficulty cache for an alt chain.
AltChainDifficultyCache(DifficultyCache),
/// A randomX VM for an alt chain.
AltChainRxVM(Arc<RandomXVM>),
/// A weight cache for an alt chain
AltChainWeightCache(BlockWeightsCache),
/// A generic Ok response.
Ok,
}

View file

@ -0,0 +1,215 @@
use std::{collections::HashMap, sync::Arc};
use tower::ServiceExt;
use cuprate_consensus_rules::{blocks::BlockError, ConsensusError};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain, ChainId,
};
use crate::{
ExtendedConsensusError,
__private::Database,
context::{difficulty::DifficultyCache, rx_vms::RandomXVM, weight::BlockWeightsCache},
};
pub(crate) mod sealed {
/// A token that should be hard to create from outside this crate.
///
/// It is currently possible to safely create this from outside this crate, **DO NOT** rely on this
/// as it will be broken once we find a way to completely seal this.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct AltChainRequestToken;
}
/// The context cache of an alternative chain.
#[derive(Debug, Clone)]
pub struct AltChainContextCache {
/// The alt chain weight cache, [`None`] if it has not been built yet.
pub weight_cache: Option<BlockWeightsCache>,
/// The alt chain difficulty cache, [`None`] if it has not been built yet.
pub difficulty_cache: Option<DifficultyCache>,
/// A cached RX VM.
pub cached_rx_vm: Option<(u64, Arc<RandomXVM>)>,
/// The chain height of the alt chain.
pub chain_height: u64,
/// The top hash of the alt chain.
pub top_hash: [u8; 32],
/// The [`ChainID`] of the alt chain.
pub chain_id: Option<ChainId>,
/// The parent [`Chain`] of this alt chain.
pub parent_chain: Chain,
}
impl AltChainContextCache {
/// Add a new block to the cache.
pub fn add_new_block(
&mut self,
height: u64,
block_hash: [u8; 32],
block_weight: usize,
long_term_block_weight: usize,
timestamp: u64,
) {
if let Some(difficulty_cache) = &mut self.difficulty_cache {
difficulty_cache.new_block(height, timestamp, difficulty_cache.cumulative_difficulty());
}
if let Some(weight_cache) = &mut self.weight_cache {
weight_cache.new_block(height, block_weight, long_term_block_weight);
}
self.chain_height += 1;
self.top_hash = block_hash;
}
}
/// A map of top IDs to alt chains.
pub struct AltChainMap {
alt_cache_map: HashMap<[u8; 32], Box<AltChainContextCache>>,
}
impl AltChainMap {
pub fn new() -> Self {
Self {
alt_cache_map: HashMap::new(),
}
}
pub fn clear(&mut self) {
self.alt_cache_map.clear();
}
/// Add an alt chain cache to the map.
pub fn add_alt_cache(&mut self, prev_id: [u8; 32], alt_cache: Box<AltChainContextCache>) {
self.alt_cache_map.insert(prev_id, alt_cache);
}
/// Attempts to take an [`AltChainContextCache`] from the map, returning [`None`] if no cache is
/// present.
pub async fn get_alt_chain_context<D: Database>(
&mut self,
prev_id: [u8; 32],
database: D,
) -> Result<Box<AltChainContextCache>, ExtendedConsensusError> {
if let Some(cache) = self.alt_cache_map.remove(&prev_id) {
return Ok(cache);
}
// find the block with hash == prev_id.
let BCResponse::FindBlock(res) =
database.oneshot(BCReadRequest::FindBlock(prev_id)).await?
else {
panic!("Database returned wrong response");
};
let Some((parent_chain, top_height)) = res else {
// Couldn't find prev_id
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?
};
Ok(Box::new(AltChainContextCache {
weight_cache: None,
difficulty_cache: None,
cached_rx_vm: None,
chain_height: top_height,
top_hash: prev_id,
chain_id: None,
parent_chain,
}))
}
}
/// Builds a [`DifficultyCache`] for an alt chain.
pub async fn get_alt_chain_difficulty_cache<D: Database + Clone>(
prev_id: [u8; 32],
main_chain_difficulty_cache: &DifficultyCache,
mut database: D,
) -> Result<DifficultyCache, ExtendedConsensusError> {
// find the block with hash == prev_id.
let BCResponse::FindBlock(res) = database
.ready()
.await?
.call(BCReadRequest::FindBlock(prev_id))
.await?
else {
panic!("Database returned wrong response");
};
let Some((chain, top_height)) = res else {
// Can't find prev_id
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?
};
Ok(match chain {
Chain::Main => {
// prev_id is in main chain, we can use the fast path and clone the main chain cache.
let mut difficulty_cache = main_chain_difficulty_cache.clone();
difficulty_cache
.pop_blocks_main_chain(
difficulty_cache.last_accounted_height - top_height,
database,
)
.await?;
difficulty_cache
}
Chain::Alt(_) => {
// prev_id is in an alt chain, completely rebuild the cache.
DifficultyCache::init_from_chain_height(
top_height + 1,
main_chain_difficulty_cache.config,
database,
chain,
)
.await?
}
})
}
/// Builds a [`BlockWeightsCache`] for an alt chain.
pub async fn get_alt_chain_weight_cache<D: Database + Clone>(
prev_id: [u8; 32],
main_chain_weight_cache: &BlockWeightsCache,
mut database: D,
) -> Result<BlockWeightsCache, ExtendedConsensusError> {
// find the block with hash == prev_id.
let BCResponse::FindBlock(res) = database
.ready()
.await?
.call(BCReadRequest::FindBlock(prev_id))
.await?
else {
panic!("Database returned wrong response");
};
let Some((chain, top_height)) = res else {
// Can't find prev_id
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?
};
Ok(match chain {
Chain::Main => {
// prev_id is in main chain, we can use the fast path and clone the main chain cache.
let mut weight_cache = main_chain_weight_cache.clone();
weight_cache
.pop_blocks_main_chain(weight_cache.tip_height - top_height, database)
.await?;
weight_cache
}
Chain::Alt(_) => {
// prev_id is in an alt chain, completely rebuild the cache.
BlockWeightsCache::init_from_chain_height(
top_height + 1,
main_chain_weight_cache.config,
database,
chain,
)
.await?
}
})
}

View file

@ -12,7 +12,10 @@ use tower::ServiceExt;
use tracing::instrument;
use cuprate_helper::num::median;
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain,
};
use crate::{Database, ExtendedConsensusError, HardFork};
@ -28,7 +31,7 @@ const DIFFICULTY_LAG: usize = 15;
/// Configuration for the difficulty cache.
///
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct DifficultyCacheConfig {
pub(crate) window: usize,
pub(crate) cut: usize,
@ -68,7 +71,7 @@ impl DifficultyCacheConfig {
/// This struct is able to calculate difficulties from blockchain information.
///
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct DifficultyCache {
pub struct DifficultyCache {
/// The list of timestamps in the window.
/// len <= [`DIFFICULTY_BLOCKS_COUNT`]
pub(crate) timestamps: VecDeque<u64>,
@ -87,6 +90,7 @@ impl DifficultyCache {
chain_height: u64,
config: DifficultyCacheConfig,
database: D,
chain: Chain,
) -> Result<Self, ExtendedConsensusError> {
tracing::info!("Initializing difficulty cache this may take a while.");
@ -98,7 +102,9 @@ impl DifficultyCache {
}
let (timestamps, cumulative_difficulties) =
get_blocks_in_pow_info(database.clone(), block_start..chain_height).await?;
get_blocks_in_pow_info(database.clone(), block_start..chain_height, chain).await?;
debug_assert_eq!(timestamps.len() as u64, chain_height - block_start);
tracing::info!(
"Current chain height: {}, accounting for {} blocks timestamps",
@ -116,6 +122,70 @@ impl DifficultyCache {
Ok(diff)
}
/// Pop some blocks from the top of the cache.
///
/// The cache will be returned to the state it would have been in `numb_blocks` ago.
///
/// # Invariant
///
/// This _must_ only be used on a main-chain cache.
#[instrument(name = "pop_blocks_diff_cache", skip_all, fields(numb_blocks = numb_blocks))]
pub async fn pop_blocks_main_chain<D: Database + Clone>(
&mut self,
numb_blocks: u64,
database: D,
) -> Result<(), ExtendedConsensusError> {
let Some(retained_blocks) = self
.timestamps
.len()
.checked_sub(usize::try_from(numb_blocks).unwrap())
else {
// More blocks to pop than we have in the cache, so just restart a new cache.
*self = Self::init_from_chain_height(
self.last_accounted_height - numb_blocks + 1,
self.config,
database,
Chain::Main,
)
.await?;
return Ok(());
};
let current_chain_height = self.last_accounted_height + 1;
let mut new_start_height = current_chain_height
.saturating_sub(self.config.total_block_count())
.saturating_sub(numb_blocks);
// skip the genesis block.
if new_start_height == 0 {
new_start_height = 1;
}
let (mut timestamps, mut cumulative_difficulties) = get_blocks_in_pow_info(
database,
new_start_height
// current_chain_height - self.timestamps.len() blocks are already in the cache.
..(current_chain_height - u64::try_from(self.timestamps.len()).unwrap()),
Chain::Main,
)
.await?;
self.timestamps.drain(retained_blocks..);
self.cumulative_difficulties.drain(retained_blocks..);
timestamps.append(&mut self.timestamps);
cumulative_difficulties.append(&mut self.cumulative_difficulties);
self.timestamps = timestamps;
self.cumulative_difficulties = cumulative_difficulties;
self.last_accounted_height -= numb_blocks;
assert_eq!(self.timestamps.len(), self.cumulative_difficulties.len());
Ok(())
}
/// Add a new block to the difficulty cache.
pub fn new_block(&mut self, height: u64, timestamp: u64, cumulative_difficulty: u128) {
assert_eq!(self.last_accounted_height + 1, height);
@ -200,7 +270,7 @@ impl DifficultyCache {
if self.last_accounted_height + 1 == u64::try_from(numb_blocks).unwrap() {
// if the chain height is equal to `numb_blocks` add the genesis block.
// otherwise if the chain height is less than `numb_blocks` None is returned
// and if its more than it would be excluded from calculations.
// and if it's more it would be excluded from calculations.
let mut timestamps = self.timestamps.clone();
// all genesis blocks have a timestamp of 0.
// https://cuprate.github.io/monero-book/consensus_rules/genesis_block.html
@ -299,11 +369,15 @@ fn get_window_start_and_end(
async fn get_blocks_in_pow_info<D: Database + Clone>(
database: D,
block_heights: Range<u64>,
chain: Chain,
) -> Result<(VecDeque<u64>, VecDeque<u128>), ExtendedConsensusError> {
tracing::info!("Getting blocks timestamps");
let BCResponse::BlockExtendedHeaderInRange(ext_header) = database
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(block_heights))
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(
block_heights,
chain,
))
.await?
else {
panic!("Database sent incorrect response");

View file

@ -4,7 +4,10 @@ use tower::ServiceExt;
use tracing::instrument;
use cuprate_consensus_rules::{HFVotes, HFsInfo, HardFork};
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain,
};
use crate::{Database, ExtendedConsensusError};
@ -15,7 +18,7 @@ const DEFAULT_WINDOW_SIZE: u64 = 10080; // supermajority window check length - a
/// Configuration for hard-forks.
///
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct HardForkConfig {
/// The network we are on.
pub(crate) info: HFsInfo,
@ -50,7 +53,7 @@ impl HardForkConfig {
}
/// A struct that keeps track of the current hard-fork and current votes.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct HardForkState {
/// The current active hard-fork.
pub(crate) current_hardfork: HardFork,
@ -117,6 +120,50 @@ impl HardForkState {
Ok(hfs)
}
/// Pop some blocks from the top of the cache.
///
/// The cache will be returned to the state it would have been in `numb_blocks` ago.
///
/// # Invariant
///
/// This _must_ only be used on a main-chain cache.
pub async fn pop_blocks_main_chain<D: Database + Clone>(
&mut self,
numb_blocks: u64,
database: D,
) -> Result<(), ExtendedConsensusError> {
let Some(retained_blocks) = self.votes.total_votes().checked_sub(self.config.window) else {
*self = Self::init_from_chain_height(
self.last_height + 1 - numb_blocks,
self.config,
database,
)
.await?;
return Ok(());
};
let current_chain_height = self.last_height + 1;
let oldest_votes = get_votes_in_range(
database,
current_chain_height
.saturating_sub(self.config.window)
.saturating_sub(numb_blocks)
..current_chain_height
.saturating_sub(numb_blocks)
.saturating_sub(retained_blocks),
usize::try_from(numb_blocks).unwrap(),
)
.await?;
self.votes
.reverse_blocks(usize::try_from(numb_blocks).unwrap(), oldest_votes);
self.last_height -= numb_blocks;
Ok(())
}
/// Add a new block to the cache.
pub fn new_block(&mut self, vote: HardFork, height: u64) {
// We don't _need_ to take in `height` but it's for safety, so we don't silently loose track
@ -168,7 +215,10 @@ async fn get_votes_in_range<D: Database>(
let mut votes = HFVotes::new(window_size);
let BCResponse::BlockExtendedHeaderInRange(vote_list) = database
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(block_heights))
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(
block_heights,
Chain::Main,
))
.await?
else {
panic!("Database sent incorrect response!");

View file

@ -15,12 +15,16 @@ use thread_local::ThreadLocal;
use tower::ServiceExt;
use tracing::instrument;
use cuprate_consensus_rules::blocks::randomx_seed_height;
use cuprate_consensus_rules::{
blocks::{is_randomx_seed_height, RandomX, RX_SEEDHASH_EPOCH_BLOCKS},
HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain,
};
use crate::{Database, ExtendedConsensusError};
@ -124,7 +128,39 @@ impl RandomXVMCache {
self.cached_vm.replace(vm);
}
/// Get the RandomX VMs.
/// Creates a RX VM for an alt chain, looking at the main chain RX VMs to see if we can use one
/// of them first.
pub async fn get_alt_vm<D: Database>(
&mut self,
height: u64,
chain: Chain,
database: D,
) -> Result<Arc<RandomXVM>, ExtendedConsensusError> {
let seed_height = randomx_seed_height(height);
let BCResponse::BlockHash(seed_hash) = database
.oneshot(BCReadRequest::BlockHash(seed_height, chain))
.await?
else {
panic!("Database returned wrong response!");
};
for (vm_main_chain_height, vm_seed_hash) in &self.seeds {
if vm_seed_hash == &seed_hash {
let Some(vm) = self.vms.get(vm_main_chain_height) else {
break;
};
return Ok(vm.clone());
}
}
let alt_vm = rayon_spawn_async(move || Arc::new(RandomXVM::new(&seed_hash).unwrap())).await;
Ok(alt_vm)
}
/// Get the main-chain RandomX VMs.
pub async fn get_vms(&mut self) -> HashMap<u64, Arc<RandomXVM>> {
match self.seeds.len().checked_sub(self.vms.len()) {
// No difference in the amount of seeds to VMs.
@ -176,6 +212,12 @@ impl RandomXVMCache {
self.vms.clone()
}
/// Removes all the RandomX VMs above the `new_height`.
pub fn pop_blocks_main_chain(&mut self, new_height: u64) {
self.seeds.retain(|(height, _)| *height < new_height);
self.vms.retain(|height, _| *height < new_height);
}
/// Add a new block to the VM cache.
///
/// hash is the block hash not the blocks PoW hash.
@ -231,8 +273,10 @@ async fn get_block_hashes<D: Database + Clone>(
for height in heights {
let db = database.clone();
fut.push_back(async move {
let BCResponse::BlockHash(hash) =
db.clone().oneshot(BCReadRequest::BlockHash(height)).await?
let BCResponse::BlockHash(hash) = db
.clone()
.oneshot(BCReadRequest::BlockHash(height, Chain::Main))
.await?
else {
panic!("Database sent incorrect response!");
};

View file

@ -9,14 +9,20 @@ use tower::ServiceExt;
use tracing::Instrument;
use cuprate_consensus_rules::blocks::ContextToVerifyBlock;
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain,
};
use super::{
use crate::{
context::{
alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap},
difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, ContextConfig, RawBlockChainContext, ValidityToken,
BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
},
Database, ExtendedConsensusError,
};
use crate::{Database, ExtendedConsensusError};
/// A request from the context service to the context task.
pub(super) struct ContextTaskRequest {
@ -29,7 +35,7 @@ pub(super) struct ContextTaskRequest {
}
/// The Context task that keeps the blockchain context and handles requests.
pub struct ContextTask {
pub struct ContextTask<D: Database> {
/// A token used to invalidate previous contexts when a new
/// block is added to the chain.
current_validity_token: ValidityToken,
@ -43,25 +49,25 @@ pub struct ContextTask {
/// The hard-fork state cache.
hardfork_state: hardforks::HardForkState,
alt_chain_cache_map: AltChainMap,
/// The current chain height.
chain_height: u64,
/// The top block hash.
top_block_hash: [u8; 32],
/// The total amount of coins generated.
already_generated_coins: u64,
database: D,
}
impl ContextTask {
impl<D: Database + Clone + Send + 'static> ContextTask<D> {
/// Initialize the [`ContextTask`], this will need to pull a lot of data from the database so may take a
/// while to complete.
pub async fn init_context<D>(
pub async fn init_context(
cfg: ContextConfig,
mut database: D,
) -> Result<ContextTask, ExtendedConsensusError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
) -> Result<Self, ExtendedConsensusError> {
let ContextConfig {
difficulty_cfg,
weights_config,
@ -82,7 +88,7 @@ impl ContextTask {
let BCResponse::GeneratedCoins(already_generated_coins) = database
.ready()
.await?
.call(BCReadRequest::GeneratedCoins)
.call(BCReadRequest::GeneratedCoins(chain_height - 1))
.await?
else {
panic!("Database sent incorrect response!");
@ -95,13 +101,23 @@ impl ContextTask {
let db = database.clone();
let difficulty_cache_handle = tokio::spawn(async move {
difficulty::DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db)
difficulty::DifficultyCache::init_from_chain_height(
chain_height,
difficulty_cfg,
db,
Chain::Main,
)
.await
});
let db = database.clone();
let weight_cache_handle = tokio::spawn(async move {
weight::BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db)
weight::BlockWeightsCache::init_from_chain_height(
chain_height,
weights_config,
db,
Chain::Main,
)
.await
});
@ -120,9 +136,11 @@ impl ContextTask {
weight_cache: weight_cache_handle.await.unwrap()?,
rx_vm_cache: rx_seed_handle.await.unwrap()?,
hardfork_state,
alt_chain_cache_map: AltChainMap::new(),
chain_height,
already_generated_coins,
top_block_hash,
database,
};
Ok(context_svc)
@ -211,6 +229,98 @@ impl ContextTask {
BlockChainContextResponse::Ok
}
BlockChainContextRequest::PopBlocks { numb_blocks } => {
assert!(numb_blocks < self.chain_height);
self.difficulty_cache
.pop_blocks_main_chain(numb_blocks, self.database.clone())
.await?;
self.weight_cache
.pop_blocks_main_chain(numb_blocks, self.database.clone())
.await?;
self.rx_vm_cache
.pop_blocks_main_chain(self.chain_height - numb_blocks - 1);
self.hardfork_state
.pop_blocks_main_chain(numb_blocks, self.database.clone())
.await?;
self.alt_chain_cache_map.clear();
self.chain_height -= numb_blocks;
let BCResponse::GeneratedCoins(already_generated_coins) = self
.database
.ready()
.await?
.call(BCReadRequest::GeneratedCoins(self.chain_height - 1))
.await?
else {
panic!("Database sent incorrect response!");
};
let BCResponse::BlockHash(top_block_hash) = self
.database
.ready()
.await?
.call(BCReadRequest::BlockHash(self.chain_height - 1, Chain::Main))
.await?
else {
panic!("Database returned incorrect response!");
};
self.already_generated_coins = already_generated_coins;
self.top_block_hash = top_block_hash;
std::mem::replace(&mut self.current_validity_token, ValidityToken::new())
.set_data_invalid();
BlockChainContextResponse::Ok
}
BlockChainContextRequest::ClearAltCache => {
self.alt_chain_cache_map.clear();
BlockChainContextResponse::Ok
}
BlockChainContextRequest::AltChainContextCache { prev_id, _token } => {
BlockChainContextResponse::AltChainContextCache(
self.alt_chain_cache_map
.get_alt_chain_context(prev_id, &mut self.database)
.await?,
)
}
BlockChainContextRequest::AltChainDifficultyCache { prev_id, _token } => {
BlockChainContextResponse::AltChainDifficultyCache(
get_alt_chain_difficulty_cache(
prev_id,
&self.difficulty_cache,
self.database.clone(),
)
.await?,
)
}
BlockChainContextRequest::AltChainWeightCache { prev_id, _token } => {
BlockChainContextResponse::AltChainWeightCache(
get_alt_chain_weight_cache(prev_id, &self.weight_cache, self.database.clone())
.await?,
)
}
BlockChainContextRequest::AltChainRxVM {
height,
chain,
_token,
} => BlockChainContextResponse::AltChainRxVM(
self.rx_vm_cache
.get_alt_vm(height, chain, &mut self.database)
.await?,
),
BlockChainContextRequest::AddAltChainContextCache {
prev_id,
cache,
_token,
} => {
self.alt_chain_cache_map.add_alt_cache(prev_id, cache);
BlockChainContextResponse::Ok
}
})
}

View file

@ -8,17 +8,18 @@
//!
use std::{
cmp::{max, min},
collections::VecDeque,
ops::Range,
};
use rayon::prelude::*;
use tower::ServiceExt;
use tracing::instrument;
use cuprate_consensus_rules::blocks::{penalty_free_zone, PENALTY_FREE_ZONE_5};
use cuprate_helper::{asynch::rayon_spawn_async, num::median};
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
use cuprate_helper::{asynch::rayon_spawn_async, num::RollingMedian};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
Chain,
};
use crate::{Database, ExtendedConsensusError, HardFork};
@ -29,7 +30,7 @@ const LONG_TERM_WINDOW: u64 = 100000;
/// Configuration for the block weight cache.
///
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct BlockWeightsCacheConfig {
short_term_window: u64,
long_term_window: u64,
@ -58,25 +59,17 @@ impl BlockWeightsCacheConfig {
///
/// These calculations require a lot of data from the database so by caching
/// this data it reduces the load on the database.
#[derive(Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockWeightsCache {
/// The short term block weights.
short_term_block_weights: VecDeque<usize>,
short_term_block_weights: RollingMedian<usize>,
/// The long term block weights.
long_term_weights: VecDeque<usize>,
/// The short term block weights sorted so we don't have to sort them every time we need
/// the median.
cached_sorted_long_term_weights: Vec<usize>,
/// The long term block weights sorted so we don't have to sort them every time we need
/// the median.
cached_sorted_short_term_weights: Vec<usize>,
long_term_weights: RollingMedian<usize>,
/// The height of the top block.
tip_height: u64,
pub(crate) tip_height: u64,
/// The block weight config.
config: BlockWeightsCacheConfig,
pub(crate) config: BlockWeightsCacheConfig,
}
impl BlockWeightsCache {
@ -86,45 +79,109 @@ impl BlockWeightsCache {
chain_height: u64,
config: BlockWeightsCacheConfig,
database: D,
chain: Chain,
) -> Result<Self, ExtendedConsensusError> {
tracing::info!("Initializing weight cache this may take a while.");
let long_term_weights = get_long_term_weight_in_range(
chain_height.saturating_sub(config.long_term_window)..chain_height,
database.clone(),
chain,
)
.await?;
let short_term_block_weights = get_blocks_weight_in_range(
chain_height.saturating_sub(config.short_term_window)..chain_height,
database,
chain,
)
.await?;
tracing::info!("Initialized block weight cache, chain-height: {:?}, long term weights length: {:?}, short term weights length: {:?}", chain_height, long_term_weights.len(), short_term_block_weights.len());
let mut cloned_short_term_weights = short_term_block_weights.clone();
let mut cloned_long_term_weights = long_term_weights.clone();
Ok(BlockWeightsCache {
short_term_block_weights: short_term_block_weights.into(),
long_term_weights: long_term_weights.into(),
cached_sorted_long_term_weights: rayon_spawn_async(|| {
cloned_long_term_weights.par_sort_unstable();
cloned_long_term_weights
short_term_block_weights: rayon_spawn_async(move || {
RollingMedian::from_vec(
short_term_block_weights,
usize::try_from(config.short_term_window).unwrap(),
)
})
.await,
cached_sorted_short_term_weights: rayon_spawn_async(|| {
cloned_short_term_weights.par_sort_unstable();
cloned_short_term_weights
long_term_weights: rayon_spawn_async(move || {
RollingMedian::from_vec(
long_term_weights,
usize::try_from(config.long_term_window).unwrap(),
)
})
.await,
tip_height: chain_height - 1,
config,
})
}
/// Pop some blocks from the top of the cache.
///
/// The cache will be returned to the state it would have been in `numb_blocks` ago.
#[instrument(name = "pop_blocks_weight_cache", skip_all, fields(numb_blocks = numb_blocks))]
pub async fn pop_blocks_main_chain<D: Database + Clone>(
&mut self,
numb_blocks: u64,
database: D,
) -> Result<(), ExtendedConsensusError> {
if self.long_term_weights.window_len() <= usize::try_from(numb_blocks).unwrap() {
// More blocks to pop than we have in the cache, so just restart a new cache.
*self = Self::init_from_chain_height(
self.tip_height - numb_blocks + 1,
self.config,
database,
Chain::Main,
)
.await?;
return Ok(());
}
let chain_height = self.tip_height + 1;
let new_long_term_start_height = chain_height
.saturating_sub(self.config.long_term_window)
.saturating_sub(numb_blocks);
let old_long_term_weights = get_long_term_weight_in_range(
new_long_term_start_height
// current_chain_height - self.long_term_weights.len() blocks are already in the cache.
..(chain_height - u64::try_from(self.long_term_weights.window_len()).unwrap()),
database.clone(),
Chain::Main,
)
.await?;
let new_short_term_start_height = chain_height
.saturating_sub(self.config.short_term_window)
.saturating_sub(numb_blocks);
let old_short_term_weights = get_blocks_weight_in_range(
new_short_term_start_height
// current_chain_height - self.long_term_weights.len() blocks are already in the cache.
..(chain_height - u64::try_from(self.short_term_block_weights.window_len()).unwrap()),
database,
Chain::Main
)
.await?;
for _ in 0..numb_blocks {
self.short_term_block_weights.pop_back();
self.long_term_weights.pop_back();
}
self.long_term_weights.append_front(old_long_term_weights);
self.short_term_block_weights
.append_front(old_short_term_weights);
self.tip_height -= numb_blocks;
Ok(())
}
/// Add a new block to the cache.
///
/// The block_height **MUST** be one more than the last height the cache has
@ -139,74 +196,19 @@ impl BlockWeightsCache {
long_term_weight
);
// add the new block to the `long_term_weights` list and the sorted `cached_sorted_long_term_weights` list.
self.long_term_weights.push_back(long_term_weight);
match self
.cached_sorted_long_term_weights
.binary_search(&long_term_weight)
{
Ok(idx) | Err(idx) => self
.cached_sorted_long_term_weights
.insert(idx, long_term_weight),
}
self.long_term_weights.push(long_term_weight);
// If the list now has too many entries remove the oldest.
if u64::try_from(self.long_term_weights.len()).unwrap() > self.config.long_term_window {
let val = self
.long_term_weights
.pop_front()
.expect("long term window can't be negative");
match self.cached_sorted_long_term_weights.binary_search(&val) {
Ok(idx) => self.cached_sorted_long_term_weights.remove(idx),
Err(_) => panic!("Long term cache has incorrect values!"),
};
}
// add the block to the short_term_block_weights and the sorted cached_sorted_short_term_weights list.
self.short_term_block_weights.push_back(block_weight);
match self
.cached_sorted_short_term_weights
.binary_search(&block_weight)
{
Ok(idx) | Err(idx) => self
.cached_sorted_short_term_weights
.insert(idx, block_weight),
}
// If there are now too many entries remove the oldest.
if u64::try_from(self.short_term_block_weights.len()).unwrap()
> self.config.short_term_window
{
let val = self
.short_term_block_weights
.pop_front()
.expect("short term window can't be negative");
match self.cached_sorted_short_term_weights.binary_search(&val) {
Ok(idx) => self.cached_sorted_short_term_weights.remove(idx),
Err(_) => panic!("Short term cache has incorrect values"),
};
}
debug_assert_eq!(
self.cached_sorted_long_term_weights.len(),
self.long_term_weights.len()
);
debug_assert_eq!(
self.cached_sorted_short_term_weights.len(),
self.short_term_block_weights.len()
);
self.short_term_block_weights.push(block_weight);
}
/// Returns the median long term weight over the last [`LONG_TERM_WINDOW`] blocks, or custom amount of blocks in the config.
pub fn median_long_term_weight(&self) -> usize {
median(&self.cached_sorted_long_term_weights)
self.long_term_weights.median()
}
/// Returns the median weight over the last [`SHORT_TERM_WINDOW`] blocks, or custom amount of blocks in the config.
pub fn median_short_term_weight(&self) -> usize {
median(&self.cached_sorted_short_term_weights)
self.short_term_block_weights.median()
}
/// Returns the effective median weight, used for block reward calculations and to calculate
@ -290,11 +292,12 @@ pub fn calculate_block_long_term_weight(
async fn get_blocks_weight_in_range<D: Database + Clone>(
range: Range<u64>,
database: D,
chain: Chain,
) -> Result<Vec<usize>, ExtendedConsensusError> {
tracing::info!("getting block weights.");
let BCResponse::BlockExtendedHeaderInRange(ext_headers) = database
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(range))
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(range, chain))
.await?
else {
panic!("Database sent incorrect response!")
@ -311,11 +314,12 @@ async fn get_blocks_weight_in_range<D: Database + Clone>(
async fn get_long_term_weight_in_range<D: Database + Clone>(
range: Range<u64>,
database: D,
chain: Chain,
) -> Result<Vec<usize>, ExtendedConsensusError> {
tracing::info!("getting block long term weights.");
let BCResponse::BlockExtendedHeaderInRange(ext_headers) = database
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(range))
.oneshot(BCReadRequest::BlockExtendedHeaderInRange(range, chain))
.await?
else {
panic!("Database sent incorrect response!")

View file

@ -1,15 +1,15 @@
use std::collections::VecDeque;
use proptest::collection::size_range;
use proptest::collection::{size_range, vec};
use proptest::{prelude::*, prop_assert_eq, prop_compose, proptest};
use cuprate_helper::num::median;
use crate::{
context::difficulty::*,
tests::{context::data::DIF_3000000_3002000, mock_db::*},
HardFork,
};
use cuprate_helper::num::median;
use cuprate_types::Chain;
const TEST_WINDOW: usize = 72;
const TEST_CUT: usize = 6;
@ -26,8 +26,12 @@ async fn first_3_blocks_fixed_difficulty() -> Result<(), tower::BoxError> {
let genesis = DummyBlockExtendedHeader::default().with_difficulty_info(0, 1);
db_builder.add_block(genesis);
let mut difficulty_cache =
DifficultyCache::init_from_chain_height(1, TEST_DIFFICULTY_CONFIG, db_builder.finish(None))
let mut difficulty_cache = DifficultyCache::init_from_chain_height(
1,
TEST_DIFFICULTY_CONFIG,
db_builder.finish(None),
Chain::Main,
)
.await?;
for height in 1..3 {
@ -42,8 +46,12 @@ async fn genesis_block_skipped() -> Result<(), tower::BoxError> {
let mut db_builder = DummyDatabaseBuilder::default();
let genesis = DummyBlockExtendedHeader::default().with_difficulty_info(0, 1);
db_builder.add_block(genesis);
let diff_cache =
DifficultyCache::init_from_chain_height(1, TEST_DIFFICULTY_CONFIG, db_builder.finish(None))
let diff_cache = DifficultyCache::init_from_chain_height(
1,
TEST_DIFFICULTY_CONFIG,
db_builder.finish(None),
Chain::Main,
)
.await?;
assert!(diff_cache.cumulative_difficulties.is_empty());
assert!(diff_cache.timestamps.is_empty());
@ -66,8 +74,9 @@ async fn calculate_diff_3000000_3002000() -> Result<(), tower::BoxError> {
let mut diff_cache = DifficultyCache::init_from_chain_height(
3_000_720,
cfg.clone(),
cfg,
db_builder.finish(Some(3_000_720)),
Chain::Main,
)
.await?;
@ -208,4 +217,52 @@ proptest! {
}
}
#[test]
fn pop_blocks_below_total_blocks(
mut database in arb_dummy_database(20),
new_blocks in vec(any::<(u64, u128)>(), 0..500)
) {
tokio_test::block_on(async move {
let old_cache = DifficultyCache::init_from_chain_height(19, TEST_DIFFICULTY_CONFIG, database.clone(), Chain::Main).await.unwrap();
let blocks_to_pop = new_blocks.len();
let mut new_cache = old_cache.clone();
for (timestamp, cumulative_difficulty) in new_blocks.into_iter() {
database.add_block(DummyBlockExtendedHeader::default().with_difficulty_info(timestamp, cumulative_difficulty));
new_cache.new_block(new_cache.last_accounted_height+1, timestamp, cumulative_difficulty);
}
new_cache.pop_blocks_main_chain(blocks_to_pop as u64, database).await?;
prop_assert_eq!(new_cache, old_cache);
Ok::<_, TestCaseError>(())
})?;
}
#[test]
fn pop_blocks_above_total_blocks(
mut database in arb_dummy_database(2000),
new_blocks in vec(any::<(u64, u128)>(), 0..5_000)
) {
tokio_test::block_on(async move {
let old_cache = DifficultyCache::init_from_chain_height(1999, TEST_DIFFICULTY_CONFIG, database.clone(), Chain::Main).await.unwrap();
let blocks_to_pop = new_blocks.len();
let mut new_cache = old_cache.clone();
for (timestamp, cumulative_difficulty) in new_blocks.into_iter() {
database.add_block(DummyBlockExtendedHeader::default().with_difficulty_info(timestamp, cumulative_difficulty));
new_cache.new_block(new_cache.last_accounted_height+1, timestamp, cumulative_difficulty);
}
new_cache.pop_blocks_main_chain(blocks_to_pop as u64, database).await?;
prop_assert_eq!(new_cache, old_cache);
Ok::<_, TestCaseError>(())
})?;
}
}

View file

@ -1,3 +1,5 @@
use proptest::{collection::vec, prelude::*};
use cuprate_consensus_rules::hard_forks::{HFInfo, HFsInfo, HardFork, NUMB_OF_HARD_FORKS};
use crate::{
@ -82,3 +84,44 @@ async fn hf_v15_v16_correct() {
assert_eq!(state.current_hardfork, HardFork::V16);
}
proptest! {
fn pop_blocks(
hfs in vec(any::<HardFork>(), 0..100),
extra_hfs in vec(any::<HardFork>(), 0..100)
) {
tokio_test::block_on(async move {
let numb_hfs = hfs.len() as u64;
let numb_pop_blocks = extra_hfs.len() as u64;
let mut db_builder = DummyDatabaseBuilder::default();
for hf in hfs {
db_builder.add_block(
DummyBlockExtendedHeader::default().with_hard_fork_info(hf, hf),
);
}
let db = db_builder.finish(Some(numb_hfs as usize));
let mut state = HardForkState::init_from_chain_height(
numb_hfs,
TEST_HARD_FORK_CONFIG,
db.clone(),
)
.await?;
let state_clone = state.clone();
for (i, hf) in extra_hfs.into_iter().enumerate() {
state.new_block(hf, state.last_height + u64::try_from(i).unwrap() + 1);
}
state.pop_blocks_main_chain(numb_pop_blocks, db).await?;
prop_assert_eq!(state_clone, state);
Ok::<(), TestCaseError>(())
})?;
}
}

View file

@ -6,6 +6,7 @@ use crate::{
tests::{context::data::BW_2850000_3050000, mock_db::*},
HardFork,
};
use cuprate_types::Chain;
pub const TEST_WEIGHT_CONFIG: BlockWeightsCacheConfig = BlockWeightsCacheConfig::new(100, 5000);
@ -21,6 +22,7 @@ async fn blocks_out_of_window_not_counted() -> Result<(), tower::BoxError> {
5000,
TEST_WEIGHT_CONFIG,
db_builder.finish(None),
Chain::Main,
)
.await?;
assert_eq!(weight_cache.median_long_term_weight(), 2500);
@ -37,6 +39,74 @@ async fn blocks_out_of_window_not_counted() -> Result<(), tower::BoxError> {
Ok(())
}
#[tokio::test]
async fn pop_blocks_greater_than_window() -> Result<(), tower::BoxError> {
let mut db_builder = DummyDatabaseBuilder::default();
for weight in 1..=5000 {
let block = DummyBlockExtendedHeader::default().with_weight_into(weight, weight);
db_builder.add_block(block);
}
let database = db_builder.finish(None);
let mut weight_cache = BlockWeightsCache::init_from_chain_height(
5000,
TEST_WEIGHT_CONFIG,
database.clone(),
Chain::Main,
)
.await?;
let old_cache = weight_cache.clone();
weight_cache.new_block(5000, 0, 0);
weight_cache.new_block(5001, 0, 0);
weight_cache.new_block(5002, 0, 0);
weight_cache
.pop_blocks_main_chain(3, database)
.await
.unwrap();
assert_eq!(weight_cache, old_cache);
Ok(())
}
#[tokio::test]
async fn pop_blocks_less_than_window() -> Result<(), tower::BoxError> {
let mut db_builder = DummyDatabaseBuilder::default();
for weight in 1..=500 {
let block = DummyBlockExtendedHeader::default().with_weight_into(weight, weight);
db_builder.add_block(block);
}
let database = db_builder.finish(None);
let mut weight_cache = BlockWeightsCache::init_from_chain_height(
500,
TEST_WEIGHT_CONFIG,
database.clone(),
Chain::Main,
)
.await?;
let old_cache = weight_cache.clone();
weight_cache.new_block(500, 0, 0);
weight_cache.new_block(501, 0, 0);
weight_cache.new_block(502, 0, 0);
weight_cache
.pop_blocks_main_chain(3, database)
.await
.unwrap();
assert_eq!(weight_cache, old_cache);
Ok(())
}
#[tokio::test]
async fn weight_cache_calculates_correct_median() -> Result<(), tower::BoxError> {
let mut db_builder = DummyDatabaseBuilder::default();
@ -44,8 +114,12 @@ async fn weight_cache_calculates_correct_median() -> Result<(), tower::BoxError>
let block = DummyBlockExtendedHeader::default().with_weight_into(0, 0);
db_builder.add_block(block);
let mut weight_cache =
BlockWeightsCache::init_from_chain_height(1, TEST_WEIGHT_CONFIG, db_builder.finish(None))
let mut weight_cache = BlockWeightsCache::init_from_chain_height(
1,
TEST_WEIGHT_CONFIG,
db_builder.finish(None),
Chain::Main,
)
.await?;
for height in 1..=100 {
@ -76,6 +150,7 @@ async fn calc_bw_ltw_2850000_3050000() {
2950000,
TEST_WEIGHT_CONFIG,
db_builder.finish(Some(2950000)),
Chain::Main,
)
.await
.unwrap();

View file

@ -127,6 +127,12 @@ pub struct DummyDatabase {
dummy_height: Option<usize>,
}
impl DummyDatabase {
pub fn add_block(&mut self, block: DummyBlockExtendedHeader) {
self.blocks.write().unwrap().push(block)
}
}
impl Service<BCReadRequest> for DummyDatabase {
type Response = BCResponse;
type Error = BoxError;
@ -161,12 +167,12 @@ impl Service<BCReadRequest> for DummyDatabase {
.ok_or("block not in database!")?,
)
}
BCReadRequest::BlockHash(id) => {
BCReadRequest::BlockHash(id, _) => {
let mut hash = [0; 32];
hash[0..8].copy_from_slice(&id.to_le_bytes());
BCResponse::BlockHash(hash)
}
BCReadRequest::BlockExtendedHeaderInRange(range) => {
BCReadRequest::BlockExtendedHeaderInRange(range, _) => {
let mut end = usize::try_from(range.end).unwrap();
let mut start = usize::try_from(range.start).unwrap();
@ -200,7 +206,7 @@ impl Service<BCReadRequest> for DummyDatabase {
BCResponse::ChainHeight(height, top_hash)
}
BCReadRequest::GeneratedCoins => BCResponse::GeneratedCoins(0),
BCReadRequest::GeneratedCoins(_) => BCResponse::GeneratedCoins(0),
_ => unimplemented!("the context svc should not need these requests!"),
})
}

View file

@ -8,6 +8,9 @@ use core::{
ops::{Add, Div, Mul, Sub},
};
#[cfg(feature = "std")]
mod rolling_median;
//---------------------------------------------------------------------------------------------------- Types
// INVARIANT: must be private.
// Protects against outside-crate implementations.
@ -15,6 +18,9 @@ mod private {
pub trait Sealed: Copy + PartialOrd<Self> + core::fmt::Display {}
}
#[cfg(feature = "std")]
pub use rolling_median::RollingMedian;
/// Non-floating point numbers
///
/// This trait is sealed and is only implemented on:

View file

@ -0,0 +1,150 @@
use std::{
collections::VecDeque,
ops::{Add, Div, Mul, Sub},
};
use crate::num::median;
/// A rolling median type.
///
/// This keeps track of a window of items and allows calculating the [`RollingMedian::median`] of them.
///
/// Example:
/// ```rust
/// # use cuprate_helper::num::RollingMedian;
/// let mut rolling_median = RollingMedian::new(2);
///
/// rolling_median.push(1);
/// assert_eq!(rolling_median.median(), 1);
/// assert_eq!(rolling_median.window_len(), 1);
///
/// rolling_median.push(3);
/// assert_eq!(rolling_median.median(), 2);
/// assert_eq!(rolling_median.window_len(), 2);
///
/// rolling_median.push(5);
/// assert_eq!(rolling_median.median(), 4);
/// assert_eq!(rolling_median.window_len(), 2);
/// ```
///
// TODO: a more efficient structure is probably possible.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
pub struct RollingMedian<T> {
/// The window of items, in order of insertion.
window: VecDeque<T>,
/// The window of items, sorted.
sorted_window: Vec<T>,
/// The target window length.
target_window: usize,
}
impl<T> RollingMedian<T>
where
T: Ord
+ PartialOrd
+ Add<Output = T>
+ Sub<Output = T>
+ Div<Output = T>
+ Mul<Output = T>
+ Copy
+ From<u8>,
{
/// Creates a new [`RollingMedian`] with a certain target window length.
///
/// `target_window` is the maximum amount of items to keep in the rolling window.
pub fn new(target_window: usize) -> Self {
Self {
window: VecDeque::with_capacity(target_window),
sorted_window: Vec::with_capacity(target_window),
target_window,
}
}
/// Creates a new [`RollingMedian`] from a [`Vec`] with a certain target window length.
///
/// `target_window` is the maximum amount of items to keep in the rolling window.
///
/// # Panics
/// This function panics if `vec.len() > target_window`.
pub fn from_vec(vec: Vec<T>, target_window: usize) -> Self {
assert!(vec.len() <= target_window);
let mut sorted_window = vec.clone();
sorted_window.sort_unstable();
Self {
window: vec.into(),
sorted_window,
target_window,
}
}
/// Pops the front of the window, i.e. the oldest item.
///
/// This is often not needed as [`RollingMedian::push`] will handle popping old values when they fall
/// out of the window.
pub fn pop_front(&mut self) {
if let Some(item) = self.window.pop_front() {
match self.sorted_window.binary_search(&item) {
Ok(idx) => {
self.sorted_window.remove(idx);
}
Err(_) => panic!("Value expected to be in sorted_window was not there"),
}
}
}
/// Pops the back of the window, i.e. the youngest item.
pub fn pop_back(&mut self) {
if let Some(item) = self.window.pop_back() {
match self.sorted_window.binary_search(&item) {
Ok(idx) => {
self.sorted_window.remove(idx);
}
Err(_) => panic!("Value expected to be in sorted_window was not there"),
}
}
}
/// Push an item to the _back_ of the window.
///
/// This will pop the oldest item in the window if the target length has been exceeded.
pub fn push(&mut self, item: T) {
if self.window.len() >= self.target_window {
self.pop_front();
}
self.window.push_back(item);
match self.sorted_window.binary_search(&item) {
Ok(idx) | Err(idx) => self.sorted_window.insert(idx, item),
}
}
/// Append some values to the _front_ of the window.
///
/// These new values will be the oldest items in the window. The order of the inputted items will be
/// kept, i.e. the first item in the [`Vec`] will be the oldest item in the queue.
pub fn append_front(&mut self, items: Vec<T>) {
for item in items.into_iter().rev() {
self.window.push_front(item);
match self.sorted_window.binary_search(&item) {
Ok(idx) | Err(idx) => self.sorted_window.insert(idx, item),
}
if self.window.len() > self.target_window {
self.pop_back();
}
}
}
/// Returns the number of items currently in the [`RollingMedian`].
pub fn window_len(&self) -> usize {
self.window.len()
}
/// Calculates the median of the values currently in the [`RollingMedian`].
pub fn median(&self) -> T {
median(&self.sorted_window)
}
}

View file

@ -63,7 +63,7 @@
//! use hex_literal::hex;
//! use tower::{Service, ServiceExt};
//!
//! use cuprate_types::blockchain::{BCReadRequest, BCWriteRequest, BCResponse};
//! use cuprate_types::{blockchain::{BCReadRequest, BCWriteRequest, BCResponse}, Chain};
//! use cuprate_test_utils::data::block_v16_tx0;
//!
//! use cuprate_blockchain::{
@ -85,7 +85,7 @@
//!
//! // Prepare a request to write block.
//! let mut block = block_v16_tx0().clone();
//! # block.height = 0 as u64; // must be 0th height or panic in `add_block()`
//! # block.height = 0_u64; // must be 0th height or panic in `add_block()`
//! let request = BCWriteRequest::WriteBlock(block);
//!
//! // Send the request.
@ -100,7 +100,7 @@
//!
//! // Now, let's try getting the block hash
//! // of the block we just wrote.
//! let request = BCReadRequest::BlockHash(0);
//! let request = BCReadRequest::BlockHash(0, Chain::Main);
//! let response_channel = read_handle.ready().await?.call(request);
//! let response = response_channel.await?;
//! assert_eq!(

View file

@ -17,7 +17,7 @@ use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_helper::{asynch::InfallibleOneshotReceiver, map::combine_low_high_bits_to_u128};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse},
ExtendedBlockHeader, OutputOnChain,
Chain, ExtendedBlockHeader, OutputOnChain,
};
use crate::{
@ -206,11 +206,14 @@ fn map_request(
let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block) => block_hash(env, block),
R::BlockHash(block, chain) => block_hash(env, block, chain),
R::FindBlock(_) => todo!("Add alt blocks to DB"),
R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(env, range),
R::BlockExtendedHeaderInRange(range, chain) => {
block_extended_header_in_range(env, range, chain)
}
R::ChainHeight => chain_height(env),
R::GeneratedCoins => generated_coins(env),
R::GeneratedCoins(height) => generated_coins(env, height),
R::Outputs(map) => outputs(env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
R::KeyImagesSpent(set) => key_images_spent(env, set),
@ -312,15 +315,18 @@ fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> Respon
/// [`BCReadRequest::BlockHash`].
#[inline]
fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
Ok(BCResponse::BlockHash(
get_block_info(&block_height, &table_block_infos)?.block_hash,
))
let block_hash = match chain {
Chain::Main => get_block_info(&block_height, &table_block_infos)?.block_hash,
Chain::Alt(_) => todo!("Add alt blocks to DB"),
};
Ok(BCResponse::BlockHash(block_hash))
}
/// [`BCReadRequest::FilterUnknownHashes`].
@ -356,6 +362,7 @@ fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet<BlockHash>) -> R
fn block_extended_header_in_range(
env: &ConcreteEnv,
range: std::ops::Range<BlockHeight>,
chain: Chain,
) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
@ -363,14 +370,17 @@ fn block_extended_header_in_range(
let tables = thread_local(env);
// Collect results using `rayon`.
let vec = range
let vec = match chain {
Chain::Main => range
.into_par_iter()
.map(|block_height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_block_extended_header_from_height(&block_height, tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?;
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?,
Chain::Alt(_) => todo!("Add alt blocks to DB"),
};
Ok(BCResponse::BlockExtendedHeaderInRange(vec))
}
@ -393,17 +403,14 @@ fn chain_height(env: &ConcreteEnv) -> ResponseResult {
/// [`BCReadRequest::GeneratedCoins`].
#[inline]
fn generated_coins(env: &ConcreteEnv) -> ResponseResult {
fn generated_coins(env: &ConcreteEnv, height: u64) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_height = top_block_height(&table_block_heights)?;
Ok(BCResponse::GeneratedCoins(cumulative_generated_coins(
&top_height,
&height,
&table_block_infos,
)?))
}

View file

@ -19,7 +19,7 @@ use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, Run
use cuprate_test_utils::data::{block_v16_tx0, block_v1_tx2, block_v9_tx3};
use cuprate_types::{
blockchain::{BCReadRequest, BCResponse, BCWriteRequest},
OutputOnChain, VerifiedBlockInformation,
Chain, OutputOnChain, VerifiedBlockInformation,
};
use crate::{
@ -138,10 +138,15 @@ async fn test_template(
Err(RuntimeError::KeyNotFound)
};
let test_chain_height = chain_height(tables.block_heights()).unwrap();
let chain_height = {
let height = chain_height(tables.block_heights()).unwrap();
let block_info = get_block_info(&height.saturating_sub(1), tables.block_infos()).unwrap();
Ok(BCResponse::ChainHeight(height, block_info.block_hash))
let block_info =
get_block_info(&test_chain_height.saturating_sub(1), tables.block_infos()).unwrap();
Ok(BCResponse::ChainHeight(
test_chain_height,
block_info.block_hash,
))
};
let cumulative_generated_coins = Ok(BCResponse::GeneratedCoins(cumulative_generated_coins));
@ -182,12 +187,21 @@ async fn test_template(
BCReadRequest::BlockExtendedHeader(1),
extended_block_header_1,
),
(BCReadRequest::BlockHash(0), block_hash_0),
(BCReadRequest::BlockHash(1), block_hash_1),
(BCReadRequest::BlockExtendedHeaderInRange(0..1), range_0_1),
(BCReadRequest::BlockExtendedHeaderInRange(0..2), range_0_2),
(BCReadRequest::BlockHash(0, Chain::Main), block_hash_0),
(BCReadRequest::BlockHash(1, Chain::Main), block_hash_1),
(
BCReadRequest::BlockExtendedHeaderInRange(0..1, Chain::Main),
range_0_1,
),
(
BCReadRequest::BlockExtendedHeaderInRange(0..2, Chain::Main),
range_0_2,
),
(BCReadRequest::ChainHeight, chain_height),
(BCReadRequest::GeneratedCoins, cumulative_generated_coins),
(
BCReadRequest::GeneratedCoins(test_chain_height),
cumulative_generated_coins,
),
(BCReadRequest::NumberOutputsWithAmount(num_req), num_resp),
(BCReadRequest::KeyImagesSpent(ki_req), ki_resp),
] {

View file

@ -9,7 +9,7 @@ use std::{
ops::Range,
};
use crate::types::{ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation};
use crate::types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation};
//---------------------------------------------------------------------------------------------------- ReadRequest
/// A read request to the blockchain database.
@ -29,8 +29,13 @@ pub enum BCReadRequest {
/// Request a block's hash.
///
/// The input is the block's height.
BlockHash(u64),
/// The input is the block's height and the chain it is on.
BlockHash(u64, Chain),
/// Request to check if we have a block and which [`Chain`] it is on.
///
/// The input is the block's hash.
FindBlock([u8; 32]),
/// Removes the block hashes that are not in the _main_ chain.
///
@ -40,15 +45,15 @@ pub enum BCReadRequest {
/// Request a range of block extended headers.
///
/// The input is a range of block heights.
BlockExtendedHeaderInRange(Range<u64>),
BlockExtendedHeaderInRange(Range<u64>, Chain),
/// Request the current chain height.
///
/// Note that this is not the top-block height.
ChainHeight,
/// Request the total amount of generated coins (atomic units) so far.
GeneratedCoins,
/// Request the total amount of generated coins (atomic units) at this height.
GeneratedCoins(u64),
/// Request data for multiple outputs.
///
@ -129,6 +134,11 @@ pub enum BCResponse {
/// Inner value is the hash of the requested block.
BlockHash([u8; 32]),
/// Response to [`BCReadRequest::FindBlock`].
///
/// Inner value is the chain and height of the block if found.
FindBlock(Option<(Chain, u64)>),
/// Response to [`BCReadRequest::FilterUnknownHashes`].
///
/// Inner value is the list of hashes that were in the main chain.
@ -146,7 +156,7 @@ pub enum BCResponse {
/// Response to [`BCReadRequest::GeneratedCoins`].
///
/// Inner value is the total amount of generated coins so far, in atomic units.
/// Inner value is the total amount of generated coins up to and including the chosen height, in atomic units.
GeneratedCoins(u64),
/// Response to [`BCReadRequest::Outputs`].

View file

@ -84,7 +84,8 @@ mod types;
pub use block_complete_entry::{BlockCompleteEntry, PrunedTxBlobEntry, TransactionBlobs};
pub use types::{
ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation, VerifiedTransactionInformation,
AltBlockInformation, Chain, ChainId, ExtendedBlockHeader, OutputOnChain,
VerifiedBlockInformation, VerifiedTransactionInformation,
};
//---------------------------------------------------------------------------------------------------- Feature-gated

View file

@ -38,7 +38,8 @@ pub struct ExtendedBlockHeader {
//---------------------------------------------------------------------------------------------------- VerifiedTransactionInformation
/// Verified information of a transaction.
///
/// This represents a transaction in a valid block.
/// - If this is in a [`VerifiedBlockInformation`] this represents a valid transaction
/// - If this is in an [`AltBlockInformation`] this represents a potentially valid transaction
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VerifiedTransactionInformation {
/// The transaction itself.
@ -91,6 +92,53 @@ pub struct VerifiedBlockInformation {
pub cumulative_difficulty: u128,
}
//---------------------------------------------------------------------------------------------------- ChainID
/// A unique ID for an alt chain.
///
/// The inner value is meaningless.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct ChainId(pub u64);
//---------------------------------------------------------------------------------------------------- Chain
/// An identifier for a chain.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub enum Chain {
/// The main chain.
Main,
/// An alt chain.
Alt(ChainId),
}
//---------------------------------------------------------------------------------------------------- AltBlockInformation
/// A block on an alternative chain.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AltBlockInformation {
/// The block itself.
pub block: Block,
/// The serialized byte form of [`Self::block`].
///
/// [`Block::serialize`].
pub block_blob: Vec<u8>,
/// All the transactions in the block, excluding the [`Block::miner_tx`].
pub txs: Vec<VerifiedTransactionInformation>,
/// The block's hash.
///
/// [`Block::hash`].
pub block_hash: [u8; 32],
/// The block's proof-of-work hash.
pub pow_hash: [u8; 32],
/// The block's height.
pub height: u64,
/// The adjusted block size, in bytes.
pub weight: usize,
/// The long term block weight, which is the weight factored in with previous block weights.
pub long_term_weight: usize,
/// The cumulative difficulty of all blocks up until and including this block.
pub cumulative_difficulty: u128,
/// The [`ChainId`] of the chain this alt block is on.
pub chain_id: ChainId,
}
//---------------------------------------------------------------------------------------------------- OutputOnChain
/// An already existing transaction output.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]