add output cache

This commit is contained in:
Boog900 2025-01-24 03:39:57 +00:00
parent 2756d9613c
commit 918d818581
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
16 changed files with 157 additions and 66 deletions
Cargo.lock
binaries/cuprated/src
blockchain/manager
config
main.rs
rpc/request
txpool
consensus/src
storage/blockchain
types

3
Cargo.lock generated
View file

@ -607,6 +607,7 @@ dependencies = [
"curve25519-dalek",
"hex",
"hex-literal",
"indexmap",
"monero-serai",
"pretty_assertions",
"proptest",
@ -1006,6 +1007,7 @@ dependencies = [
"curve25519-dalek",
"hex",
"hex-literal",
"indexmap",
"monero-serai",
"pretty_assertions",
"proptest",
@ -1831,6 +1833,7 @@ checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f"
dependencies = [
"equivalent",
"hashbrown 0.15.2",
"rayon",
]
[[package]]

View file

@ -166,8 +166,8 @@ impl super::BlockchainManager {
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from or if the incoming batch contains no blocks.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
let Ok(prepped_blocks) =
batch_prepare_main_chain_blocks(batch.blocks, &mut self.blockchain_context_service)
let Ok((prepped_blocks, mut output_cache)) =
batch_prepare_main_chain_blocks(batch.blocks, &mut self.blockchain_context_service, self.blockchain_read_handle.clone())
.await
else {
batch.peer_handle.ban_peer(LONG_BAN);
@ -181,6 +181,7 @@ impl super::BlockchainManager {
txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
Some(&mut output_cache)
)
.await
else {
@ -404,6 +405,7 @@ impl super::BlockchainManager {
prepped_txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
None
)
.await?;

View file

@ -39,7 +39,7 @@ impl From<BlockDownloaderConfig> for cuprate_p2p::block_downloader::BlockDownloa
in_progress_queue_bytes: value.in_progress_queue_bytes,
check_client_pool_interval: value.check_client_pool_interval,
target_batch_bytes: value.target_batch_bytes,
initial_batch_len: 1,
initial_batch_len: 20,
}
}
}

View file

@ -27,7 +27,7 @@ use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::time::secs_to_hms;
use cuprate_types::blockchain::BlockchainWriteRequest;
use crate::{
config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR, logging::CupratedTracingFilter,
};
@ -70,6 +70,7 @@ fn main() {
Arc::clone(&db_thread_pool),
)
.unwrap();
let (txpool_read_handle, txpool_write_handle, _) =
cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool).unwrap();

View file

@ -196,7 +196,7 @@ pub(crate) async fn outputs(
unreachable!();
};
Ok(outputs)
Ok(todo!())
}
/// [`BlockchainReadRequest::NumberOutputsWithAmount`]

View file

@ -172,6 +172,7 @@ async fn handle_incoming_txs(
context.current_adjusted_timestamp_for_time_lock(),
context.current_hf,
blockchain_read_handle,
None
)
.verify()
.await

View file

@ -37,6 +37,7 @@ mod free;
pub use alt_block::sanity_check_alt_block;
pub use batch_prepare::batch_prepare_main_chain_blocks;
use cuprate_types::output_cache::OutputCache;
use free::pull_ordered_transactions;
/// A pre-prepared block with all data needed to verify it, except the block's proof of work.
@ -243,7 +244,7 @@ where
// Check that the txs included are what we need and that there are not any extra.
let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?;
verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database).await
verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database, None).await
}
/// Fully verify a block that has already been prepared using [`batch_prepare_main_chain_blocks`].
@ -252,6 +253,7 @@ pub async fn verify_prepped_main_chain_block<D>(
mut txs: Vec<TransactionVerificationData>,
context_svc: &mut BlockchainContextService,
database: D,
output_cache: Option<&mut OutputCache>,
) -> Result<VerifiedBlockInformation, ExtendedConsensusError>
where
D: Database + Clone + Send + 'static,
@ -283,6 +285,7 @@ where
context.current_adjusted_timestamp_for_time_lock(),
context.current_hf,
database,
output_cache.as_ref().map(|o| &**o)
)
.verify()
.await?;
@ -304,7 +307,7 @@ where
)
.map_err(ConsensusError::Block)?;
Ok(VerifiedBlockInformation {
let block = VerifiedBlockInformation {
block_hash: prepped_block.block_hash,
block: prepped_block.block,
block_blob: prepped_block.block_blob,
@ -324,5 +327,11 @@ where
height: context.chain_height,
long_term_weight: context.next_block_long_term_weight(block_weight),
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
})
};
if let Some(output_cache) = output_cache {
output_cache.add_block_to_cache(&block);
}
Ok(block)
}

View file

@ -13,6 +13,7 @@ use cuprate_consensus_rules::{
ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::output_cache::OutputCache;
use cuprate_types::TransactionVerificationData;
use crate::{
@ -21,13 +22,16 @@ use crate::{
transactions::start_tx_verification,
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
};
use crate::__private::Database;
use crate::transactions::contextual_data::get_output_cache;
/// Batch prepares a list of blocks for verification.
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
pub async fn batch_prepare_main_chain_blocks(
pub async fn batch_prepare_main_chain_blocks<D: Database>(
blocks: Vec<(Block, Vec<Transaction>)>,
context_svc: &mut BlockchainContextService,
) -> Result<Vec<(PreparedBlock, Vec<TransactionVerificationData>)>, ExtendedConsensusError> {
database: D
) -> Result<(Vec<(PreparedBlock, Vec<TransactionVerificationData>)>, OutputCache), ExtendedConsensusError> {
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
@ -189,5 +193,7 @@ pub async fn batch_prepare_main_chain_blocks(
})
.await?;
Ok(blocks)
let output_cache = get_output_cache(blocks.iter().flat_map(|(_, txs)| txs.iter()), database).await?;
Ok((blocks, output_cache))
}

View file

@ -75,7 +75,7 @@ pub mod __private {
BlockchainReadRequest,
Response = BlockchainResponse,
Error = tower::BoxError,
Future: Send + 'static,
//Future: Send + 'static,
>
{
}
@ -85,7 +85,7 @@ pub mod __private {
BlockchainReadRequest,
Response = BlockchainResponse,
Error = tower::BoxError,
Future: Send + 'static,
// Future: Send + 'static,
>,
> Database for T
{

View file

@ -43,7 +43,7 @@ use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
CachedVerificationState, TransactionVerificationData, TxVersion,
};
use cuprate_types::output_cache::OutputCache;
use crate::{
batch_verifier::MultiThreadedBatchVerifier,
transactions::contextual_data::{batch_get_decoy_info, batch_get_ring_member_info},
@ -155,6 +155,7 @@ impl VerificationWanted {
time_for_time_lock: u64,
hf: HardFork,
database: D,
output_cache: Option<&OutputCache>
) -> FullVerification<D> {
FullVerification {
prepped_txs: self.prepped_txs,
@ -163,6 +164,7 @@ impl VerificationWanted {
time_for_time_lock,
hf,
database,
output_cache
}
}
}
@ -208,7 +210,7 @@ impl SemanticVerification {
/// Full transaction verification.
///
/// [`VerificationWanted::full`]
pub struct FullVerification<D> {
pub struct FullVerification<'a, D> {
prepped_txs: Vec<TransactionVerificationData>,
current_chain_height: usize,
@ -216,9 +218,10 @@ pub struct FullVerification<D> {
time_for_time_lock: u64,
hf: HardFork,
database: D,
output_cache: Option<&'a OutputCache>
}
impl<D: Database + Clone> FullVerification<D> {
impl<D: Database + Clone> FullVerification<'_, D> {
/// Fully verify each transaction.
pub async fn verify(
mut self,
@ -262,6 +265,7 @@ impl<D: Database + Clone> FullVerification<D> {
self.time_for_time_lock,
self.hf,
self.database,
self.output_cache
)
.await
}
@ -458,6 +462,7 @@ async fn verify_transactions<D>(
current_time_lock_timestamp: u64,
hf: HardFork,
database: D,
output_cache: Option<&OutputCache>
) -> Result<Vec<TransactionVerificationData>, ExtendedConsensusError>
where
D: Database,
@ -478,6 +483,7 @@ where
.map(|(tx, _)| tx),
hf,
database,
output_cache
)
.await?;

View file

@ -10,9 +10,11 @@
//!
//! Because this data is unique for *every* transaction and the context service is just for blockchain state data.
//!
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use monero_serai::transaction::{Input, Timelock};
use monero_serai::transaction::{Input, Timelock, Transaction};
use tower::ServiceExt;
use tracing::instrument;
@ -27,7 +29,7 @@ use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
OutputOnChain,
};
use cuprate_types::output_cache::OutputCache;
use crate::{transactions::TransactionVerificationData, Database, ExtendedConsensusError};
/// Get the ring members for the inputs from the outputs on the chain.
@ -134,18 +136,13 @@ fn new_rings(
})
}
/// Retrieves the [`TxRingMembersInfo`] for the inputted [`TransactionVerificationData`].
///
/// This function batch gets all the ring members for the inputted transactions and fills in data about
/// them.
pub async fn batch_get_ring_member_info<D: Database>(
txs_verification_data: impl Iterator<Item = &TransactionVerificationData> + Clone,
hf: HardFork,
pub async fn get_output_cache<D: Database>(
txs_verification_data: impl Iterator<Item = &TransactionVerificationData>,
mut database: D,
) -> Result<Vec<TxRingMembersInfo>, ExtendedConsensusError> {
) -> Result<OutputCache, ExtendedConsensusError> {
let mut output_ids = HashMap::new();
for tx_v_data in txs_verification_data.clone() {
for tx_v_data in txs_verification_data {
insert_ring_member_ids(&tx_v_data.tx.prefix().inputs, &mut output_ids)
.map_err(ConsensusError::Transaction)?;
}
@ -159,23 +156,49 @@ pub async fn batch_get_ring_member_info<D: Database>(
panic!("Database sent incorrect response!")
};
let BlockchainResponse::NumberOutputsWithAmount(outputs_with_amount) = database
.ready()
.await?
.call(BlockchainReadRequest::NumberOutputsWithAmount(
outputs.keys().copied().collect(),
))
.await?
else {
panic!("Database sent incorrect response!")
Ok(outputs)
}
/// Retrieves the [`TxRingMembersInfo`] for the inputted [`TransactionVerificationData`].
///
/// This function batch gets all the ring members for the inputted transactions and fills in data about
/// them.
pub async fn batch_get_ring_member_info<D: Database>(
txs_verification_data: impl Iterator<Item = &TransactionVerificationData> + Clone,
hf: HardFork,
mut database: D,
cache: Option<&OutputCache>,
) -> Result<Vec<TxRingMembersInfo>, ExtendedConsensusError> {
let mut output_ids = HashMap::new();
for tx_v_data in txs_verification_data.clone() {
insert_ring_member_ids(&tx_v_data.tx.prefix().inputs, &mut output_ids)
.map_err(ConsensusError::Transaction)?;
}
let outputs =if let Some(cache) = cache {
Cow::Borrowed(cache)
} else {
let BlockchainResponse::Outputs(outputs) = database
.ready()
.await?
.call(BlockchainReadRequest::Outputs(output_ids))
.await?
else {
panic!("Database sent incorrect response!")
};
Cow::Owned(outputs)
};
Ok(txs_verification_data
.map(move |tx_v_data| {
let numb_outputs = |amt| outputs_with_amount.get(&amt).copied().unwrap_or(0);
let numb_outputs = |amt| outputs.number_outs_with_amount(amt);
let ring_members_for_tx = get_ring_members_for_inputs(
|amt, idx| outputs.get(&amt)?.get(&idx).copied(),
|amt, idx| outputs.get_output(amt, idx).copied(),
&tx_v_data.tx.prefix().inputs,
)
.map_err(ConsensusError::Transaction)?;

View file

@ -31,6 +31,7 @@ rand = { workspace = true, features = ["std", "std_rng"] }
monero-serai = { workspace = true, features = ["std"] }
serde = { workspace = true, optional = true }
indexmap = { workspace = true, features = ["rayon"] }
tower = { workspace = true }
thread_local = { workspace = true }
rayon = { workspace = true }

View file

@ -9,26 +9,18 @@
)]
//---------------------------------------------------------------------------------------------------- Import
use std::{
cmp::min,
collections::{HashMap, HashSet},
sync::Arc,
};
use indexmap::{IndexMap, IndexSet};
use rayon::{
iter::{Either, IntoParallelIterator, ParallelIterator},
prelude::*,
ThreadPool,
};
use thread_local::ThreadLocal;
use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, TxsInBlock,
use std::{
cmp::min,
collections::{HashMap, HashSet},
sync::Arc,
};
use thread_local::ThreadLocal;
use crate::{
ops::{
@ -53,6 +45,14 @@ use crate::{
AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId,
},
};
use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use cuprate_helper::map::combine_low_high_bits_to_u128;
use cuprate_types::output_cache::OutputCache;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, TxsInBlock,
};
//---------------------------------------------------------------------------------------------------- init_read_service
/// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`].
@ -419,9 +419,34 @@ fn outputs(env: &ConcreteEnv, outputs: HashMap<Amount, HashSet<AmountIndex>>) ->
let tx_ro = thread_local(env);
let tables = thread_local(env);
let amount_of_outs = outputs
.par_iter()
.map(|(&amount, _)| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
if amount == 0 {
Ok((amount, tables.rct_outputs().len()?))
} else {
// v1 transactions.
match tables.num_outputs().get(&amount) {
#[expect(
clippy::cast_possible_truncation,
reason = "INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`"
)]
Ok(count) => Ok((amount, count)),
// If we get a request for an `amount` that doesn't exist,
// we return `0` instead of an error.
Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
Err(e) => Err(e),
}
}
})
.collect::<Result<_, _>>()?;
// The 2nd mapping function.
// This is pulled out from the below `map()` for readability.
let inner_map = |amount, amount_index| -> DbResult<(AmountIndex, OutputOnChain)> {
let inner_map = |amount, amount_index| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
@ -430,26 +455,36 @@ fn outputs(env: &ConcreteEnv, outputs: HashMap<Amount, HashSet<AmountIndex>>) ->
amount_index,
};
let output_on_chain = id_to_output_on_chain(&id, tables)?;
let output_on_chain = match id_to_output_on_chain(&id, tables) {
Ok(output) => output,
Err(RuntimeError::KeyNotFound) => return Ok(Either::Right(amount_index)),
Err(e) => return Err(e),
};
Ok((amount_index, output_on_chain))
Ok(Either::Left((amount_index, output_on_chain)))
};
// Collect results using `rayon`.
let map = outputs
let (map, wanted_outputs) = outputs
.into_par_iter()
.map(|(amount, amount_index_set)| {
let (left, right) = amount_index_set
.into_par_iter()
.map(|amount_index| inner_map(amount, amount_index))
.collect::<Result<_, _>>()?;
Ok((
amount,
amount_index_set
.into_par_iter()
.map(|amount_index| inner_map(amount, amount_index))
.collect::<DbResult<HashMap<AmountIndex, OutputOnChain>>>()?,
(amount,
left),
(amount,
right)
))
})
.collect::<DbResult<HashMap<Amount, HashMap<AmountIndex, OutputOnChain>>>>()?;
.collect::<DbResult<(IndexMap<_, IndexMap<_, _>>, IndexMap<_, IndexSet<_>>)>>()?;
Ok(BlockchainResponse::Outputs(map))
let cache = OutputCache::new(map, amount_of_outs, wanted_outputs);
Ok(BlockchainResponse::Outputs(cache))
}
/// [`BlockchainReadRequest::NumberOutputsWithAmount`].

View file

@ -10,7 +10,7 @@ keywords = ["cuprate", "types"]
[features]
default = ["blockchain", "epee", "serde", "json", "hex"]
blockchain = []
blockchain = ["dep:indexmap"]
epee = ["dep:cuprate-epee-encoding"]
serde = ["dep:serde", "hex"]
proptest = ["dep:proptest", "dep:proptest-derive"]
@ -31,6 +31,7 @@ hex = { workspace = true, features = ["serde", "alloc"], optional =
serde = { workspace = true, features = ["std", "derive"], optional = true }
strum = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
indexmap = { workspace = true, features = ["std"], optional = true }
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }

View file

@ -10,6 +10,7 @@ use std::{
use monero_serai::block::Block;
use crate::output_cache::OutputCache;
use crate::{
types::{Chain, ExtendedBlockHeader, OutputOnChain, TxsInBlock, VerifiedBlockInformation},
AltBlockInformation, BlockCompleteEntry, ChainId, ChainInfo, CoinbaseTxSum,
@ -256,7 +257,7 @@ pub enum BlockchainResponse {
///
/// Inner value is all the outputs requested,
/// associated with their amount and amount index.
Outputs(HashMap<u64, HashMap<u64, OutputOnChain>>),
Outputs(OutputCache),
/// Response to [`BlockchainReadRequest::NumberOutputsWithAmount`].
///

View file

@ -34,10 +34,12 @@ pub use types::{
#[cfg(feature = "blockchain")]
pub mod blockchain;
#[cfg(feature = "blockchain")]
pub mod output_cache;
#[cfg(feature = "json")]
pub mod json;
#[cfg(feature = "hex")]
pub mod hex;
//---------------------------------------------------------------------------------------------------- Private