Consensus: fix Rx VM initialization (#190)
Some checks failed
CI / fmt (push) Has been cancelled
CI / typo (push) Has been cancelled
CI / ci (macos-latest, stable, bash) (push) Has been cancelled
CI / ci (ubuntu-latest, stable, bash) (push) Has been cancelled
CI / ci (windows-latest, stable-x86_64-pc-windows-gnu, msys2 {0}) (push) Has been cancelled

* fix Rx VM initialization

* fix imports

* Apply suggestions from code review

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

* use checked_sub

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
This commit is contained in:
Boog900 2024-06-25 00:55:04 +00:00 committed by GitHub
parent 4b93dbec4c
commit 5c08d1a0e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 113 additions and 80 deletions

View file

@ -21,6 +21,7 @@ use cuprate_consensus_rules::{
calculate_pow_hash, check_block, check_block_pow, is_randomx_seed_height,
randomx_seed_height, BlockError, RandomX,
},
hard_forks::HardForkError,
miner_tx::MinerTxError,
ConsensusError, HardFork,
};
@ -327,6 +328,14 @@ where
})
.await?;
let Some(last_block) = blocks.last() else {
return Err(ExtendedConsensusError::NoBlocksToVerify);
};
// hard-forks cannot be reversed, so the last block will contain the highest hard fork (provided the
// batch is valid).
let top_hf_in_batch = last_block.hf_version;
// A Vec of (timestamp, HF) for each block to calculate the expected difficulty for each block.
let mut timestamps_hfs = Vec::with_capacity(blocks.len());
let mut new_rx_vm = None;
@ -338,6 +347,13 @@ where
let block_0 = &window[0];
let block_1 = &window[1];
// Make sure no blocks in the batch have a higher hard fork than the last block.
if block_0.hf_version > top_hf_in_batch {
Err(ConsensusError::Block(BlockError::HardForkError(
HardForkError::VersionIncorrect,
)))?;
}
if block_0.block_hash != block_1.block.header.previous
|| block_0.height != block_1.height - 1
{
@ -346,7 +362,7 @@ where
}
// Cache any potential RX VM seeds as we may need them for future blocks in the batch.
if is_randomx_seed_height(block_0.height) {
if is_randomx_seed_height(block_0.height) && top_hf_in_batch >= HardFork::V12 {
new_rx_vm = Some((block_0.height, block_0.block_hash));
}
@ -395,7 +411,20 @@ where
Err(ConsensusError::Block(BlockError::PreviousIDIncorrect))?;
}
let mut rx_vms = context.rx_vms;
let mut rx_vms = if top_hf_in_batch < HardFork::V12 {
HashMap::new()
} else {
let BlockChainContextResponse::RxVms(rx_vms) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetCurrentRxVm)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
rx_vms
};
// If we have a RX seed in the batch calculate it.
if let Some((new_vm_height, new_vm_seed)) = new_rx_vm {
@ -407,9 +436,7 @@ where
.await;
context_svc
.ready()
.await?
.call(BlockChainContextRequest::NewRXVM((
.oneshot(BlockChainContextRequest::NewRXVM((
new_vm_seed,
new_vm.clone(),
)))
@ -501,7 +528,21 @@ where
// Set up the block and just pass it to [`verify_prepped_main_chain_block`]
let rx_vms = context.rx_vms.clone();
// We just use the raw `major_version` here, no need to turn it into a `HardFork`.
let rx_vms = if block.header.major_version < 12 {
HashMap::new()
} else {
let BlockChainContextResponse::RxVms(rx_vms) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetCurrentRxVm)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
rx_vms
};
let height = context.chain_height;
let prepped_block = rayon_spawn_async(move || {

View file

@ -85,20 +85,7 @@ impl ContextConfig {
pub async fn initialize_blockchain_context<D>(
cfg: ContextConfig,
database: D,
) -> Result<
impl Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
Future = impl Future<Output = Result<BlockChainContextResponse, tower::BoxError>>
+ Send
+ 'static,
> + Clone
+ Send
+ Sync
+ 'static,
ExtendedConsensusError,
>
) -> Result<BlockChainContextService, ExtendedConsensusError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
@ -121,9 +108,6 @@ where
pub struct RawBlockChainContext {
/// The current cumulative difficulty.
pub cumulative_difficulty: u128,
/// RandomX VMs, this maps seeds height to VM. Will definitely contain the VM required to calculate the current blocks
/// POW hash (if a RX VM is required), may contain more.
pub rx_vms: HashMap<u64, Arc<RandomXVM>>,
/// Context to verify a block, as needed by [`cuprate-consensus-rules`]
pub context_to_verify_block: ContextToVerifyBlock,
/// The median long term block weight.
@ -162,7 +146,7 @@ impl RawBlockChainContext {
}
}
/// Returns the next blocks long term weight from it's block weight.
/// Returns the next blocks long term weight from its block weight.
pub fn next_block_long_term_weight(&self, block_weight: usize) -> usize {
weight::calculate_block_long_term_weight(
&self.current_hf,
@ -232,6 +216,8 @@ pub struct NewBlockData {
pub enum BlockChainContextRequest {
/// Get the current blockchain context.
GetContext,
/// Gets the current RandomX VM.
GetCurrentRxVm,
/// Get the next difficulties for these blocks.
///
/// Inputs: a list of block timestamps and hfs
@ -252,6 +238,8 @@ pub enum BlockChainContextRequest {
pub enum BlockChainContextResponse {
/// Blockchain context response.
Context(BlockChainContext),
/// A map of seed height to RandomX VMs.
RxVms(HashMap<u64, Arc<RandomXVM>>),
/// A list of difficulties.
BatchDifficulties(Vec<u128>),
/// Ok response.

View file

@ -125,64 +125,69 @@ impl RandomXVMCache {
}
/// Get the RandomX VMs.
pub fn get_vms(&self) -> HashMap<u64, Arc<RandomXVM>> {
pub async fn get_vms(&mut self) -> HashMap<u64, Arc<RandomXVM>> {
match self.seeds.len().checked_sub(self.vms.len()) {
// No difference in the amount of seeds to VMs.
Some(0) => (),
// One more seed than VM.
Some(1) => {
let (seed_height, next_seed_hash) = *self.seeds.front().unwrap();
let new_vm = 'new_vm_block: {
tracing::debug!(
"Initializing RandomX VM for seed: {}",
hex::encode(next_seed_hash)
);
// Check if we have been given the RX VM from another part of Cuprate.
if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
if cached_hash == next_seed_hash {
tracing::debug!("VM was already created.");
break 'new_vm_block cached_vm;
}
};
rayon_spawn_async(move || Arc::new(RandomXVM::new(&next_seed_hash).unwrap()))
.await
};
self.vms.insert(seed_height, new_vm);
}
// More than one more seed than VM.
_ => {
// this will only happen when syncing and rx activates.
tracing::debug!("RandomX has activated, initialising VMs");
let seeds_clone = self.seeds.clone();
self.vms = rayon_spawn_async(move || {
seeds_clone
.par_iter()
.map(|(height, seed)| {
let vm = RandomXVM::new(seed).expect("Failed to create RandomX VM!");
let vm = Arc::new(vm);
(*height, vm)
})
.collect()
})
.await
}
}
self.vms.clone()
}
/// Add a new block to the VM cache.
///
/// hash is the block hash not the blocks PoW hash.
pub async fn new_block(&mut self, height: u64, hash: &[u8; 32], hf: &HardFork) {
let should_make_vms = hf >= &HardFork::V12;
if should_make_vms && self.vms.len() != self.seeds.len() {
// this will only happen when syncing and rx activates.
tracing::debug!("RandomX has activated, initialising VMs");
let seeds_clone = self.seeds.clone();
self.vms = rayon_spawn_async(move || {
seeds_clone
.par_iter()
.map(|(height, seed)| {
(
*height,
Arc::new(RandomXVM::new(seed).expect("Failed to create RandomX VM!")),
)
})
.collect()
})
.await
}
pub fn new_block(&mut self, height: u64, hash: &[u8; 32]) {
if is_randomx_seed_height(height) {
tracing::debug!("Block {height} is a randomX seed height, adding it to the cache.",);
self.seeds.push_front((height, *hash));
if should_make_vms {
let new_vm = 'new_vm_block: {
tracing::debug!(
"Past hard-fork 12 initializing VM for seed: {}",
hex::encode(hash)
);
// Check if we have been given the RX VM from another part of Cuprate.
if let Some((cached_hash, cached_vm)) = self.cached_vm.take() {
if &cached_hash == hash {
tracing::debug!("VM was already created.");
break 'new_vm_block cached_vm;
}
};
let hash_clone = *hash;
rayon_spawn_async(move || Arc::new(RandomXVM::new(&hash_clone).unwrap())).await
};
self.vms.insert(height, new_vm);
}
if self.seeds.len() > RX_SEEDS_CACHED {
self.seeds.pop_back();
// TODO: This is really not efficient but the amount of VMs cached is not a lot.
// HACK: This is really inefficient but the amount of VMs cached is not a lot.
self.vms.retain(|height, _| {
self.seeds
.iter()

View file

@ -158,13 +158,15 @@ impl ContextTask {
next_difficulty: self.difficulty_cache.next_difficulty(&current_hf),
already_generated_coins: self.already_generated_coins,
},
rx_vms: self.rx_vm_cache.get_vms(),
cumulative_difficulty: self.difficulty_cache.cumulative_difficulty(),
median_long_term_weight: self.weight_cache.median_long_term_weight(),
top_block_timestamp: self.difficulty_cache.top_block_timestamp(),
},
})
}
BlockChainContextRequest::GetCurrentRxVm => {
BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await)
}
BlockChainContextRequest::BatchGetDifficulties(blocks) => {
tracing::debug!("Getting batch difficulties len: {}", blocks.len() + 1);
@ -199,15 +201,7 @@ impl ContextTask {
self.hardfork_state.new_block(new.vote, new.height);
self.rx_vm_cache
.new_block(
new.height,
&new.block_hash,
// We use the current hf and not the hf of the top block as when syncing we need to generate VMs
// on the switch to RX not after it.
&self.hardfork_state.current_hardfork(),
)
.await;
self.rx_vm_cache.new_block(new.height, &new.block_hash);
self.chain_height = new.height + 1;
self.top_block_hash = new.block_hash;

View file

@ -44,6 +44,9 @@ pub enum ExtendedConsensusError {
/// One or more statements in the batch verifier was invalid.
#[error("One or more statements in the batch verifier was invalid.")]
OneOrMoreBatchVerificationStatementsInvalid,
/// A request to verify a batch of blocks had no blocks in the batch.
#[error("A request to verify a batch of blocks had no blocks in the batch.")]
NoBlocksToVerify,
}
/// Initialize the 2 verifier [`tower::Service`]s (block and transaction).

View file

@ -47,7 +47,9 @@ async fn rx_vm_created_on_hf_12() {
.unwrap();
assert!(cache.vms.is_empty());
cache.new_block(11, &[30; 32], &HardFork::V12).await;
cache.new_block(11, &[30; 32]);
cache.get_vms().await;
assert!(!cache.vms.is_empty());
}