From 1a178381dd0fd4d8436e86f5e03dbe475e91b24b Mon Sep 17 00:00:00 2001 From: Boog900 Date: Mon, 5 Aug 2024 20:47:30 +0000 Subject: [PATCH] Storage: split the DB service abstraction (#237) * split the DB service abstraction * fix ci * misc changes * Apply suggestions from code review Co-authored-by: hinto-janai * review fixes * Update storage/service/Cargo.toml Co-authored-by: hinto-janai * Update storage/service/Cargo.toml Co-authored-by: hinto-janai * fix clippy * fix doc * `bc` -> `blockchain` * doc fixes * Update storage/service/README.md Co-authored-by: hinto-janai * cargo fmt --------- Co-authored-by: hinto-janai --- Cargo.lock | 69 ++--- Cargo.toml | 2 +- consensus/fast-sync/src/create.rs | 12 +- consensus/src/context/alt_chains.rs | 15 +- consensus/src/context/difficulty.rs | 6 +- consensus/src/context/hardforks.rs | 10 +- consensus/src/context/rx_vms.rs | 10 +- consensus/src/context/task.rs | 21 +- consensus/src/context/weight.rs | 14 +- consensus/src/lib.rs | 21 +- consensus/src/tests/mock_db.rs | 26 +- consensus/src/transactions.rs | 12 +- consensus/src/transactions/contextual_data.rs | 14 +- consensus/tests/verify_correct_txs.rs | 14 +- storage/blockchain/Cargo.toml | 20 +- storage/blockchain/src/config/backend.rs | 31 --- storage/blockchain/src/config/config.rs | 3 +- storage/blockchain/src/config/mod.rs | 9 +- storage/blockchain/src/config/sync_mode.rs | 135 ---------- storage/blockchain/src/service/free.rs | 22 +- storage/blockchain/src/service/mod.rs | 38 +-- storage/blockchain/src/service/read.rs | 254 +++++------------- storage/blockchain/src/service/tests.rs | 74 ++--- storage/blockchain/src/service/types.rs | 30 +-- storage/blockchain/src/service/write.rs | 211 ++------------- storage/service/Cargo.toml | 22 ++ storage/service/README.md | 7 + storage/service/src/lib.rs | 8 + .../config => service/src}/reader_threads.rs | 39 ++- storage/service/src/service.rs | 5 + storage/service/src/service/read.rs | 95 +++++++ 
storage/service/src/service/write.rs | 178 ++++++++++++ types/src/blockchain.rs | 48 ++-- 33 files changed, 666 insertions(+), 809 deletions(-) delete mode 100644 storage/blockchain/src/config/backend.rs delete mode 100644 storage/blockchain/src/config/sync_mode.rs create mode 100644 storage/service/Cargo.toml create mode 100644 storage/service/README.md create mode 100644 storage/service/src/lib.rs rename storage/{blockchain/src/config => service/src}/reader_threads.rs (84%) create mode 100644 storage/service/src/service.rs create mode 100644 storage/service/src/service/read.rs create mode 100644 storage/service/src/service/write.rs diff --git a/Cargo.lock b/Cargo.lock index 32a5cbd..eaf5f99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,12 +29,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "allocator-api2" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" - [[package]] name = "android-tzdata" version = "0.1.1" @@ -471,7 +465,7 @@ dependencies = [ "cuprate-test-utils", "cuprate-wire", "futures", - "indexmap 2.2.6", + "indexmap", "rand", "thiserror", "tokio", @@ -496,14 +490,13 @@ version = "0.0.0" dependencies = [ "bitflags 2.5.0", "bytemuck", - "crossbeam", "cuprate-database", + "cuprate-database-service", "cuprate-helper", "cuprate-pruning", "cuprate-test-utils", "cuprate-types", "curve25519-dalek", - "futures", "hex", "hex-literal", "monero-serai", @@ -514,7 +507,6 @@ dependencies = [ "tempfile", "thread_local", "tokio", - "tokio-util", "tower", ] @@ -609,6 +601,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cuprate-database-service" +version = "0.1.0" +dependencies = [ + "crossbeam", + "cuprate-database", + "cuprate-helper", + "futures", + "rayon", + "serde", + "tower", +] + [[package]] name = "cuprate-epee-encoding" version = "0.5.0" @@ -709,7 +714,7 @@ dependencies = [ "dashmap", "futures", "hex", - "indexmap 2.2.6", + 
"indexmap", "monero-serai", "pin-project", "proptest", @@ -882,7 +887,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.5", + "hashbrown", "lock_api", "once_cell", "parking_lot_core", @@ -1174,12 +1179,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.14.5" @@ -1187,17 +1186,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", - "allocator-api2", -] - -[[package]] -name = "hdrhistogram" -version = "7.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" -dependencies = [ - "byteorder", - "num-traits", ] [[package]] @@ -1521,16 +1509,6 @@ dependencies = [ "utf8_iter", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.2.6" @@ -1538,7 +1516,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.5", + "hashbrown", ] [[package]] @@ -2464,7 +2442,7 @@ name = "std-shims" version = "0.1.1" source = "git+https://github.com/Cuprate/serai.git?rev=d27d934#d27d93480aa8a849d84214ad4c71d83ce6fea0c1" dependencies = [ - "hashbrown 0.14.5", + "hashbrown", "spin", ] @@ -2669,10 +2647,7 @@ checksum = 
"9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", - "futures-util", - "hashbrown 0.14.5", "pin-project-lite", "slab", "tokio", @@ -2691,7 +2666,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.6", + "indexmap", "toml_datetime", "winnow", ] @@ -2704,12 +2679,8 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "hdrhistogram", - "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", - "slab", "tokio", "tokio-util", "tower-layer", diff --git a/Cargo.toml b/Cargo.toml index 22a1585..da82d9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "p2p/async-buffer", "p2p/address-book", "storage/blockchain", + "storage/service", "storage/txpool", "storage/database", "pruning", @@ -243,7 +244,6 @@ try_err = "deny" lossy_float_literal = "deny" let_underscore_must_use = "deny" iter_over_hash_type = "deny" -impl_trait_in_params = "deny" get_unwrap = "deny" error_impl_error = "deny" empty_structs_with_brackets = "deny" diff --git a/consensus/fast-sync/src/create.rs b/consensus/fast-sync/src/create.rs index adae100..8d4f9a6 100644 --- a/consensus/fast-sync/src/create.rs +++ b/consensus/fast-sync/src/create.rs @@ -4,10 +4,10 @@ use clap::Parser; use tower::{Service, ServiceExt}; use cuprate_blockchain::{ - config::ConfigBuilder, cuprate_database::RuntimeError, service::DatabaseReadHandle, + config::ConfigBuilder, cuprate_database::RuntimeError, service::BlockchainReadHandle, }; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, }; @@ -16,18 +16,18 @@ use cuprate_fast_sync::{hash_of_hashes, BlockId, HashOfHashes}; const BATCH_SIZE: u64 = 512; async fn 
read_batch( - handle: &mut DatabaseReadHandle, + handle: &mut BlockchainReadHandle, height_from: u64, ) -> Result, RuntimeError> { let mut block_ids = Vec::::with_capacity(BATCH_SIZE as usize); for height in height_from..(height_from + BATCH_SIZE) { - let request = BCReadRequest::BlockHash(height, Chain::Main); + let request = BlockchainReadRequest::BlockHash(height, Chain::Main); let response_channel = handle.ready().await?.call(request); let response = response_channel.await?; match response { - BCResponse::BlockHash(block_id) => block_ids.push(block_id), + BlockchainResponse::BlockHash(block_id) => block_ids.push(block_id), _ => unreachable!(), } } @@ -63,7 +63,7 @@ async fn main() { let config = ConfigBuilder::new().build(); - let (mut read_handle, _) = cuprate_blockchain::service::init(config).unwrap(); + let (mut read_handle, _, _) = cuprate_blockchain::service::init(config).unwrap(); let mut hashes_of_hashes = Vec::new(); diff --git a/consensus/src/context/alt_chains.rs b/consensus/src/context/alt_chains.rs index 71af8a1..f0c391d 100644 --- a/consensus/src/context/alt_chains.rs +++ b/consensus/src/context/alt_chains.rs @@ -4,7 +4,7 @@ use tower::ServiceExt; use cuprate_consensus_rules::{blocks::BlockError, ConsensusError}; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, ChainId, }; @@ -100,8 +100,9 @@ impl AltChainMap { } // find the block with hash == prev_id. - let BCResponse::FindBlock(res) = - database.oneshot(BCReadRequest::FindBlock(prev_id)).await? + let BlockchainResponse::FindBlock(res) = database + .oneshot(BlockchainReadRequest::FindBlock(prev_id)) + .await? else { panic!("Database returned wrong response"); }; @@ -130,10 +131,10 @@ pub async fn get_alt_chain_difficulty_cache( mut database: D, ) -> Result { // find the block with hash == prev_id. - let BCResponse::FindBlock(res) = database + let BlockchainResponse::FindBlock(res) = database .ready() .await? 
- .call(BCReadRequest::FindBlock(prev_id)) + .call(BlockchainReadRequest::FindBlock(prev_id)) .await? else { panic!("Database returned wrong response"); @@ -177,10 +178,10 @@ pub async fn get_alt_chain_weight_cache( mut database: D, ) -> Result { // find the block with hash == prev_id. - let BCResponse::FindBlock(res) = database + let BlockchainResponse::FindBlock(res) = database .ready() .await? - .call(BCReadRequest::FindBlock(prev_id)) + .call(BlockchainReadRequest::FindBlock(prev_id)) .await? else { panic!("Database returned wrong response"); diff --git a/consensus/src/context/difficulty.rs b/consensus/src/context/difficulty.rs index b025dfc..9ec0f1e 100644 --- a/consensus/src/context/difficulty.rs +++ b/consensus/src/context/difficulty.rs @@ -13,7 +13,7 @@ use tracing::instrument; use cuprate_helper::num::median; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, }; @@ -373,8 +373,8 @@ async fn get_blocks_in_pow_info( ) -> Result<(VecDeque, VecDeque), ExtendedConsensusError> { tracing::info!("Getting blocks timestamps"); - let BCResponse::BlockExtendedHeaderInRange(ext_header) = database - .oneshot(BCReadRequest::BlockExtendedHeaderInRange( + let BlockchainResponse::BlockExtendedHeaderInRange(ext_header) = database + .oneshot(BlockchainReadRequest::BlockExtendedHeaderInRange( block_heights, chain, )) diff --git a/consensus/src/context/hardforks.rs b/consensus/src/context/hardforks.rs index 2243350..7972a0e 100644 --- a/consensus/src/context/hardforks.rs +++ b/consensus/src/context/hardforks.rs @@ -5,7 +5,7 @@ use tracing::instrument; use cuprate_consensus_rules::{HFVotes, HFsInfo, HardFork}; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, }; @@ -90,10 +90,10 @@ impl HardForkState { debug_assert_eq!(votes.total_votes(), config.window) } - let BCResponse::BlockExtendedHeader(ext_header) = database + let 
BlockchainResponse::BlockExtendedHeader(ext_header) = database .ready() .await? - .call(BCReadRequest::BlockExtendedHeader(chain_height - 1)) + .call(BlockchainReadRequest::BlockExtendedHeader(chain_height - 1)) .await? else { panic!("Database sent incorrect response!"); @@ -214,8 +214,8 @@ async fn get_votes_in_range( ) -> Result { let mut votes = HFVotes::new(window_size); - let BCResponse::BlockExtendedHeaderInRange(vote_list) = database - .oneshot(BCReadRequest::BlockExtendedHeaderInRange( + let BlockchainResponse::BlockExtendedHeaderInRange(vote_list) = database + .oneshot(BlockchainReadRequest::BlockExtendedHeaderInRange( block_heights, Chain::Main, )) diff --git a/consensus/src/context/rx_vms.rs b/consensus/src/context/rx_vms.rs index 3154648..649146f 100644 --- a/consensus/src/context/rx_vms.rs +++ b/consensus/src/context/rx_vms.rs @@ -22,7 +22,7 @@ use cuprate_consensus_rules::{ }; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, }; @@ -138,8 +138,8 @@ impl RandomXVMCache { ) -> Result, ExtendedConsensusError> { let seed_height = randomx_seed_height(height); - let BCResponse::BlockHash(seed_hash) = database - .oneshot(BCReadRequest::BlockHash(seed_height, chain)) + let BlockchainResponse::BlockHash(seed_hash) = database + .oneshot(BlockchainReadRequest::BlockHash(seed_height, chain)) .await? else { panic!("Database returned wrong response!"); @@ -273,9 +273,9 @@ async fn get_block_hashes( for height in heights { let db = database.clone(); fut.push_back(async move { - let BCResponse::BlockHash(hash) = db + let BlockchainResponse::BlockHash(hash) = db .clone() - .oneshot(BCReadRequest::BlockHash(height, Chain::Main)) + .oneshot(BlockchainReadRequest::BlockHash(height, Chain::Main)) .await? 
else { panic!("Database sent incorrect response!"); diff --git a/consensus/src/context/task.rs b/consensus/src/context/task.rs index 1fa68a2..79ddf4c 100644 --- a/consensus/src/context/task.rs +++ b/consensus/src/context/task.rs @@ -10,7 +10,7 @@ use tracing::Instrument; use cuprate_consensus_rules::blocks::ContextToVerifyBlock; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, }; @@ -76,19 +76,19 @@ impl ContextTask { tracing::debug!("Initialising blockchain context"); - let BCResponse::ChainHeight(chain_height, top_block_hash) = database + let BlockchainResponse::ChainHeight(chain_height, top_block_hash) = database .ready() .await? - .call(BCReadRequest::ChainHeight) + .call(BlockchainReadRequest::ChainHeight) .await? else { panic!("Database sent incorrect response!"); }; - let BCResponse::GeneratedCoins(already_generated_coins) = database + let BlockchainResponse::GeneratedCoins(already_generated_coins) = database .ready() .await? - .call(BCReadRequest::GeneratedCoins(chain_height - 1)) + .call(BlockchainReadRequest::GeneratedCoins(chain_height - 1)) .await? else { panic!("Database sent incorrect response!"); @@ -248,21 +248,24 @@ impl ContextTask { self.chain_height -= numb_blocks; - let BCResponse::GeneratedCoins(already_generated_coins) = self + let BlockchainResponse::GeneratedCoins(already_generated_coins) = self .database .ready() .await? - .call(BCReadRequest::GeneratedCoins(self.chain_height - 1)) + .call(BlockchainReadRequest::GeneratedCoins(self.chain_height - 1)) .await? else { panic!("Database sent incorrect response!"); }; - let BCResponse::BlockHash(top_block_hash) = self + let BlockchainResponse::BlockHash(top_block_hash) = self .database .ready() .await? - .call(BCReadRequest::BlockHash(self.chain_height - 1, Chain::Main)) + .call(BlockchainReadRequest::BlockHash( + self.chain_height - 1, + Chain::Main, + )) .await? 
else { panic!("Database returned incorrect response!"); diff --git a/consensus/src/context/weight.rs b/consensus/src/context/weight.rs index 1084086..7cd5454 100644 --- a/consensus/src/context/weight.rs +++ b/consensus/src/context/weight.rs @@ -17,7 +17,7 @@ use tracing::instrument; use cuprate_consensus_rules::blocks::{penalty_free_zone, PENALTY_FREE_ZONE_5}; use cuprate_helper::{asynch::rayon_spawn_async, num::RollingMedian}; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, }; @@ -296,8 +296,10 @@ async fn get_blocks_weight_in_range( ) -> Result, ExtendedConsensusError> { tracing::info!("getting block weights."); - let BCResponse::BlockExtendedHeaderInRange(ext_headers) = database - .oneshot(BCReadRequest::BlockExtendedHeaderInRange(range, chain)) + let BlockchainResponse::BlockExtendedHeaderInRange(ext_headers) = database + .oneshot(BlockchainReadRequest::BlockExtendedHeaderInRange( + range, chain, + )) .await? else { panic!("Database sent incorrect response!") @@ -318,8 +320,10 @@ async fn get_long_term_weight_in_range( ) -> Result, ExtendedConsensusError> { tracing::info!("getting block long term weights."); - let BCResponse::BlockExtendedHeaderInRange(ext_headers) = database - .oneshot(BCReadRequest::BlockExtendedHeaderInRange(range, chain)) + let BlockchainResponse::BlockExtendedHeaderInRange(ext_headers) = database + .oneshot(BlockchainReadRequest::BlockExtendedHeaderInRange( + range, chain, + )) .await? else { panic!("Database sent incorrect response!") diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 1edafdc..3b7f2ae 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -7,8 +7,8 @@ //! - [`TxVerifierService`] Which handles transaction verification. //! //! This crate is generic over the database which is implemented as a [`tower::Service`]. To -//! implement a database you need to have a service which accepts [`BCReadRequest`] and responds -//! 
with [`BCResponse`]. +//! implement a database you need to have a service which accepts [`BlockchainReadRequest`] and responds +//! with [`BlockchainResponse`]. //! use cuprate_consensus_rules::{ConsensusError, HardFork}; @@ -27,7 +27,7 @@ pub use context::{ pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; // re-export. -pub use cuprate_types::blockchain::{BCReadRequest, BCResponse}; +pub use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; /// An Error returned from one of the consensus services. #[derive(Debug, thiserror::Error)] @@ -83,7 +83,7 @@ use __private::Database; pub mod __private { use std::future::Future; - use cuprate_types::blockchain::{BCReadRequest, BCResponse}; + use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; /// A type alias trait used to represent a database, so we don't have to write [`tower::Service`] bounds /// everywhere. @@ -94,8 +94,8 @@ pub mod __private { /// ``` pub trait Database: tower::Service< - BCReadRequest, - Response = BCResponse, + BlockchainReadRequest, + Response = BlockchainResponse, Error = tower::BoxError, Future = Self::Future2, > @@ -103,8 +103,13 @@ pub mod __private { type Future2: Future> + Send + 'static; } - impl> - crate::Database for T + impl< + T: tower::Service< + BlockchainReadRequest, + Response = BlockchainResponse, + Error = tower::BoxError, + >, + > crate::Database for T where T::Future: Future> + Send + 'static, { diff --git a/consensus/src/tests/mock_db.rs b/consensus/src/tests/mock_db.rs index c4fd75d..a620003 100644 --- a/consensus/src/tests/mock_db.rs +++ b/consensus/src/tests/mock_db.rs @@ -16,7 +16,7 @@ use proptest_derive::Arbitrary; use tower::{BoxError, Service}; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, ExtendedBlockHeader, }; @@ -133,8 +133,8 @@ impl DummyDatabase { } } -impl Service for DummyDatabase { - type Response = BCResponse; 
+impl Service for DummyDatabase { + type Response = BlockchainResponse; type Error = BoxError; type Future = Pin> + Send + 'static>>; @@ -143,13 +143,13 @@ impl Service for DummyDatabase { Poll::Ready(Ok(())) } - fn call(&mut self, req: BCReadRequest) -> Self::Future { + fn call(&mut self, req: BlockchainReadRequest) -> Self::Future { let blocks = self.blocks.clone(); let dummy_height = self.dummy_height; async move { Ok(match req { - BCReadRequest::BlockExtendedHeader(id) => { + BlockchainReadRequest::BlockExtendedHeader(id) => { let mut id = usize::try_from(id).unwrap(); if let Some(dummy_height) = dummy_height { let block_len = blocks.read().unwrap().len(); @@ -157,7 +157,7 @@ impl Service for DummyDatabase { id -= dummy_height - block_len; } - BCResponse::BlockExtendedHeader( + BlockchainResponse::BlockExtendedHeader( blocks .read() .unwrap() @@ -167,12 +167,12 @@ impl Service for DummyDatabase { .ok_or("block not in database!")?, ) } - BCReadRequest::BlockHash(id, _) => { + BlockchainReadRequest::BlockHash(id, _) => { let mut hash = [0; 32]; hash[0..8].copy_from_slice(&id.to_le_bytes()); - BCResponse::BlockHash(hash) + BlockchainResponse::BlockHash(hash) } - BCReadRequest::BlockExtendedHeaderInRange(range, _) => { + BlockchainReadRequest::BlockExtendedHeaderInRange(range, _) => { let mut end = usize::try_from(range.end).unwrap(); let mut start = usize::try_from(range.start).unwrap(); @@ -183,7 +183,7 @@ impl Service for DummyDatabase { start -= dummy_height - block_len; } - BCResponse::BlockExtendedHeaderInRange( + BlockchainResponse::BlockExtendedHeaderInRange( blocks .read() .unwrap() @@ -195,7 +195,7 @@ impl Service for DummyDatabase { .collect(), ) } - BCReadRequest::ChainHeight => { + BlockchainReadRequest::ChainHeight => { let height: u64 = dummy_height .unwrap_or(blocks.read().unwrap().len()) .try_into() @@ -204,9 +204,9 @@ impl Service for DummyDatabase { let mut top_hash = [0; 32]; top_hash[0..8].copy_from_slice(&height.to_le_bytes()); - 
BCResponse::ChainHeight(height, top_hash) + BlockchainResponse::ChainHeight(height, top_hash) } - BCReadRequest::GeneratedCoins(_) => BCResponse::GeneratedCoins(0), + BlockchainReadRequest::GeneratedCoins(_) => BlockchainResponse::GeneratedCoins(0), _ => unimplemented!("the context svc should not need these requests!"), }) } diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index 417eb48..78104e9 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -28,7 +28,7 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, TxVersion, }; use cuprate_helper::asynch::rayon_spawn_async; -use cuprate_types::blockchain::{BCReadRequest, BCResponse}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use crate::{ batch_verifier::MultiThreadedBatchVerifier, @@ -308,10 +308,10 @@ where }) })?; - let BCResponse::KeyImagesSpent(kis_spent) = database + let BlockchainResponse::KeyImagesSpent(kis_spent) = database .ready() .await? - .call(BCReadRequest::KeyImagesSpent(spent_kis)) + .call(BlockchainReadRequest::KeyImagesSpent(spent_kis)) .await? else { panic!("Database sent incorrect response!"); @@ -340,10 +340,12 @@ where if !verified_at_block_hashes.is_empty() { tracing::trace!("Filtering block hashes not in the main chain."); - let BCResponse::FilterUnknownHashes(known_hashes) = database + let BlockchainResponse::FilterUnknownHashes(known_hashes) = database .ready() .await? - .call(BCReadRequest::FilterUnknownHashes(verified_at_block_hashes)) + .call(BlockchainReadRequest::FilterUnknownHashes( + verified_at_block_hashes, + )) .await? 
else { panic!("Database returned wrong response!"); diff --git a/consensus/src/transactions/contextual_data.rs b/consensus/src/transactions/contextual_data.rs index 95e5262..b17fbe0 100644 --- a/consensus/src/transactions/contextual_data.rs +++ b/consensus/src/transactions/contextual_data.rs @@ -27,7 +27,7 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, TxVersion, }; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, OutputOnChain, }; @@ -153,19 +153,19 @@ pub async fn batch_get_ring_member_info( .map_err(ConsensusError::Transaction)?; } - let BCResponse::Outputs(outputs) = database + let BlockchainResponse::Outputs(outputs) = database .ready() .await? - .call(BCReadRequest::Outputs(output_ids)) + .call(BlockchainReadRequest::Outputs(output_ids)) .await? else { panic!("Database sent incorrect response!") }; - let BCResponse::NumberOutputsWithAmount(outputs_with_amount) = database + let BlockchainResponse::NumberOutputsWithAmount(outputs_with_amount) = database .ready() .await? - .call(BCReadRequest::NumberOutputsWithAmount( + .call(BlockchainReadRequest::NumberOutputsWithAmount( outputs.keys().copied().collect(), )) .await? @@ -234,10 +234,10 @@ pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>( unique_input_amounts.len() ); - let BCResponse::NumberOutputsWithAmount(outputs_with_amount) = database + let BlockchainResponse::NumberOutputsWithAmount(outputs_with_amount) = database .ready() .await? - .call(BCReadRequest::NumberOutputsWithAmount( + .call(BlockchainReadRequest::NumberOutputsWithAmount( unique_input_amounts.into_iter().collect(), )) .await? 
diff --git a/consensus/tests/verify_correct_txs.rs b/consensus/tests/verify_correct_txs.rs index b71b52d..7afb370 100644 --- a/consensus/tests/verify_correct_txs.rs +++ b/consensus/tests/verify_correct_txs.rs @@ -12,7 +12,7 @@ use cuprate_consensus::{ TxVerifierService, VerifyTxRequest, VerifyTxResponse, __private::Database, }; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, OutputOnChain, }; @@ -23,12 +23,12 @@ use cuprate_test_utils::data::TX_E2D393; fn dummy_database(outputs: BTreeMap) -> impl Database + Clone { let outputs = Arc::new(outputs); - service_fn(move |req: BCReadRequest| { + service_fn(move |req: BlockchainReadRequest| { ready(Ok(match req { - BCReadRequest::NumberOutputsWithAmount(_) => { - BCResponse::NumberOutputsWithAmount(HashMap::new()) + BlockchainReadRequest::NumberOutputsWithAmount(_) => { + BlockchainResponse::NumberOutputsWithAmount(HashMap::new()) } - BCReadRequest::Outputs(outs) => { + BlockchainReadRequest::Outputs(outs) => { let idxs = outs.get(&0).unwrap(); let mut ret = HashMap::new(); @@ -40,9 +40,9 @@ fn dummy_database(outputs: BTreeMap) -> impl Database + Clon .collect::>(), ); - BCResponse::Outputs(ret) + BlockchainResponse::Outputs(ret) } - BCReadRequest::KeyImagesSpent(_) => BCResponse::KeyImagesSpent(false), + BlockchainReadRequest::KeyImagesSpent(_) => BlockchainResponse::KeyImagesSpent(false), _ => panic!("Database request not needed for this test"), })) }) diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 79d0dc4..f45f1bc 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -15,15 +15,16 @@ default = ["heed", "service"] heed = ["cuprate-database/heed"] redb = ["cuprate-database/redb"] redb-memory = ["cuprate-database/redb-memory"] -service = ["dep:crossbeam", "dep:futures", "dep:tokio", "dep:tokio-util", "dep:tower", "dep:rayon"] +service = ["dep:thread_local", "dep:rayon"] [dependencies] # 
FIXME: # We only need the `thread` feature if `service` is enabled. # Figure out how to enable features of an already pulled in dependency conditionally. -cuprate-database = { path = "../database" } -cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } -cuprate-types = { path = "../../types", features = ["blockchain"] } +cuprate-database = { path = "../database" } +cuprate-database-service = { path = "../service" } +cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } +cuprate-types = { path = "../../types", features = ["blockchain"] } bitflags = { workspace = true, features = ["serde", "bytemuck"] } bytemuck = { version = "1.14.3", features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } @@ -33,19 +34,16 @@ monero-serai = { workspace = true, features = ["std"] } serde = { workspace = true, optional = true } # `service` feature. -crossbeam = { workspace = true, features = ["std"], optional = true } -futures = { workspace = true, optional = true } -tokio = { workspace = true, features = ["full"], optional = true } -tokio-util = { workspace = true, features = ["full"], optional = true } -tower = { workspace = true, features = ["full"], optional = true } -thread_local = { workspace = true } +tower = { workspace = true } +thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } [dev-dependencies] cuprate-helper = { path = "../../helper", features = ["thread"] } cuprate-test-utils = { path = "../../test-utils" } -tempfile = { version = "3.10.0" } +tokio = { workspace = true, features = ["full"] } +tempfile = { workspace = true } pretty_assertions = { workspace = true } proptest = { workspace = true } hex = { workspace = true } diff --git a/storage/blockchain/src/config/backend.rs b/storage/blockchain/src/config/backend.rs deleted file mode 100644 index ee72b3d..0000000 --- a/storage/blockchain/src/config/backend.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! 
SOMEDAY - -//---------------------------------------------------------------------------------------------------- Import -use std::{ - borrow::Cow, - num::NonZeroUsize, - path::{Path, PathBuf}, -}; - -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - -use cuprate_helper::fs::cuprate_blockchain_dir; - -use crate::{ - config::{ReaderThreads, SyncMode}, - constants::DATABASE_DATA_FILENAME, - resize::ResizeAlgorithm, -}; - -//---------------------------------------------------------------------------------------------------- Backend -/// SOMEDAY: allow runtime hot-swappable backends. -#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum Backend { - #[default] - /// SOMEDAY - Heed, - /// SOMEDAY - Redb, -} diff --git a/storage/blockchain/src/config/config.rs b/storage/blockchain/src/config/config.rs index c58e292..5bfbf74 100644 --- a/storage/blockchain/src/config/config.rs +++ b/storage/blockchain/src/config/config.rs @@ -9,7 +9,8 @@ use serde::{Deserialize, Serialize}; use cuprate_database::{config::SyncMode, resize::ResizeAlgorithm}; use cuprate_helper::fs::cuprate_blockchain_dir; -use crate::config::ReaderThreads; +// re-exports +pub use cuprate_database_service::ReaderThreads; //---------------------------------------------------------------------------------------------------- ConfigBuilder /// Builder for [`Config`]. diff --git a/storage/blockchain/src/config/mod.rs b/storage/blockchain/src/config/mod.rs index 7ecc14c..555a6e6 100644 --- a/storage/blockchain/src/config/mod.rs +++ b/storage/blockchain/src/config/mod.rs @@ -34,14 +34,11 @@ //! .build(); //! //! // Start a database `service` using this configuration. -//! let (reader_handle, _) = cuprate_blockchain::service::init(config.clone())?; +//! let (_, _, env) = cuprate_blockchain::service::init(config.clone())?; //! // It's using the config we provided. -//! 
assert_eq!(reader_handle.env().config(), &config.db_config); +//! assert_eq!(env.config(), &config.db_config); //! # Ok(()) } //! ``` mod config; -pub use config::{Config, ConfigBuilder}; - -mod reader_threads; -pub use reader_threads::ReaderThreads; +pub use config::{Config, ConfigBuilder, ReaderThreads}; diff --git a/storage/blockchain/src/config/sync_mode.rs b/storage/blockchain/src/config/sync_mode.rs deleted file mode 100644 index 1d20339..0000000 --- a/storage/blockchain/src/config/sync_mode.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! Database [`Env`](crate::Env) configuration. -//! -//! This module contains the main [`Config`]uration struct -//! for the database [`Env`](crate::Env)ironment, and data -//! structures related to any configuration setting. -//! -//! These configurations are processed at runtime, meaning -//! the `Env` can/will dynamically adjust its behavior -//! based on these values. - -//---------------------------------------------------------------------------------------------------- Import - -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - -//---------------------------------------------------------------------------------------------------- SyncMode -/// Disk synchronization mode. -/// -/// This controls how/when the database syncs its data to disk. -/// -/// Regardless of the variant chosen, dropping [`Env`](crate::Env) -/// will always cause it to fully sync to disk. -/// -/// # Sync vs Async -/// All invariants except [`SyncMode::Async`] & [`SyncMode::Fast`] -/// are `synchronous`, as in the database will wait until the OS has -/// finished syncing all the data to disk before continuing. -/// -/// `SyncMode::Async` & `SyncMode::Fast` are `asynchronous`, meaning -/// the database will _NOT_ wait until the data is fully synced to disk -/// before continuing. 
Note that this doesn't mean the database itself -/// won't be synchronized between readers/writers, but rather that the -/// data _on disk_ may not be immediately synchronized after a write. -/// -/// Something like: -/// ```rust,ignore -/// db.put("key", value); -/// db.get("key"); -/// ``` -/// will be fine, most likely pulling from memory instead of disk. -/// -/// # SOMEDAY -/// Dynamic sync's are not yet supported. -/// -/// Only: -/// -/// - [`SyncMode::Safe`] -/// - [`SyncMode::Async`] -/// - [`SyncMode::Fast`] -/// -/// are supported, all other variants will panic on [`crate::Env::open`]. -#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum SyncMode { - /// Use [`SyncMode::Fast`] until fully synced, - /// then use [`SyncMode::Safe`]. - /// - // # SOMEDAY: how to implement this? - // ref: - // monerod-solution: - // cuprate-issue: - // - // We could: - // ```rust,ignore - // if current_db_block <= top_block.saturating_sub(N) { - // // don't sync() - // } else { - // // sync() - // } - // ``` - // where N is some threshold we pick that is _close_ enough - // to being synced where we want to start being safer. - // - // Essentially, when we are in a certain % range of being finished, - // switch to safe mode, until then, go fast. - FastThenSafe, - - #[default] - /// Fully sync to disk per transaction. - /// - /// Every database transaction commit will - /// fully sync all data to disk, _synchronously_, - /// so the database (writer) halts until synced. - /// - /// This is expected to be very slow. - /// - /// This matches: - /// - LMDB without any special sync flags - /// - [`redb::Durability::Immediate`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.Immediate) - Safe, - - /// Asynchrously sync to disk per transaction. - /// - /// This is the same as [`SyncMode::Safe`], - /// but the syncs will be asynchronous, i.e. 
- /// each transaction commit will sync to disk, - /// but only eventually, not necessarily immediately. - /// - /// This matches: - /// - [`MDB_MAPASYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#gab034ed0d8e5938090aef5ee0997f7e94) - /// - [`redb::Durability::Eventual`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.Eventual) - Async, - - /// Fully sync to disk after we cross this transaction threshold. - /// - /// After committing [`usize`] amount of database - /// transactions, it will be sync to disk. - /// - /// `0` behaves the same as [`SyncMode::Safe`], and a ridiculously large - /// number like `usize::MAX` is practically the same as [`SyncMode::Fast`]. - Threshold(usize), - - /// Only flush at database shutdown. - /// - /// This is the fastest, yet unsafest option. - /// - /// It will cause the database to never _actively_ sync, - /// letting the OS decide when to flush data to disk. - /// - /// This matches: - /// - [`MDB_NOSYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#ga5791dd1adb09123f82dd1f331209e12e) + [`MDB_MAPASYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#gab034ed0d8e5938090aef5ee0997f7e94) - /// - [`redb::Durability::None`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.None) - /// - /// `monerod` reference: - /// - /// # Corruption - /// In the case of a system crash, the database - /// may become corrupted when using this option. - // - // FIXME: we could call this `unsafe` - // and use that terminology in the config file - // so users know exactly what they are getting - // themselves into. 
- Fast, -} diff --git a/storage/blockchain/src/service/free.rs b/storage/blockchain/src/service/free.rs index 3701f66..21fb05b 100644 --- a/storage/blockchain/src/service/free.rs +++ b/storage/blockchain/src/service/free.rs @@ -3,11 +3,12 @@ //---------------------------------------------------------------------------------------------------- Import use std::sync::Arc; -use cuprate_database::InitError; +use cuprate_database::{ConcreteEnv, InitError}; +use crate::service::{init_read_service, init_write_service}; use crate::{ config::Config, - service::{DatabaseReadHandle, DatabaseWriteHandle}, + service::types::{BlockchainReadHandle, BlockchainWriteHandle}, }; //---------------------------------------------------------------------------------------------------- Init @@ -20,17 +21,26 @@ use crate::{ /// /// # Errors /// This will forward the error if [`crate::open`] failed. -pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle), InitError> { +pub fn init( + config: Config, +) -> Result< + ( + BlockchainReadHandle, + BlockchainWriteHandle, + Arc, + ), + InitError, +> { let reader_threads = config.reader_threads; // Initialize the database itself. let db = Arc::new(crate::open(config)?); // Spawn the Reader thread pool and Writer. - let readers = DatabaseReadHandle::init(&db, reader_threads); - let writer = DatabaseWriteHandle::init(db); + let readers = init_read_service(db.clone(), reader_threads); + let writer = init_write_service(db.clone()); - Ok((readers, writer)) + Ok((readers, writer, db)) } //---------------------------------------------------------------------------------------------------- Compact history diff --git a/storage/blockchain/src/service/mod.rs b/storage/blockchain/src/service/mod.rs index bf2d8e7..993c52d 100644 --- a/storage/blockchain/src/service/mod.rs +++ b/storage/blockchain/src/service/mod.rs @@ -14,8 +14,8 @@ //! //! ## Handles //! The 2 handles to the database are: -//! - [`DatabaseReadHandle`] -//! 
- [`DatabaseWriteHandle`] +//! - [`BlockchainReadHandle`] +//! - [`BlockchainWriteHandle`] //! //! The 1st allows any caller to send [`ReadRequest`][req_r]s. //! @@ -33,8 +33,10 @@ //! //! ## Shutdown //! Upon the above handles being dropped, the corresponding thread(s) will automatically exit, i.e: -//! - The last [`DatabaseReadHandle`] is dropped => reader thread-pool exits -//! - The last [`DatabaseWriteHandle`] is dropped => writer thread exits +//! - The last [`BlockchainReadHandle`] is dropped => reader thread-pool exits +//! - The last [`BlockchainWriteHandle`] is dropped => writer thread exits +//! +//! TODO: update this when `ConcreteEnv` is removed //! //! Upon dropping the [`cuprate_database::ConcreteEnv`]: //! - All un-processed database transactions are completed @@ -50,11 +52,11 @@ //! This channel can be `.await`ed upon to (eventually) receive //! the corresponding `Response` to your `Request`. //! -//! [req_r]: cuprate_types::blockchain::BCReadRequest +//! [req_r]: cuprate_types::blockchain::BlockchainReadRequest //! -//! [req_w]: cuprate_types::blockchain::BCWriteRequest +//! [req_w]: cuprate_types::blockchain::BlockchainWriteRequest //! -//! [resp]: cuprate_types::blockchain::BCResponse +//! [resp]: cuprate_types::blockchain::BlockchainResponse //! //! # Example //! Simple usage of `service`. @@ -63,7 +65,7 @@ //! use hex_literal::hex; //! use tower::{Service, ServiceExt}; //! -//! use cuprate_types::{blockchain::{BCReadRequest, BCWriteRequest, BCResponse}, Chain}; +//! use cuprate_types::{blockchain::{BlockchainReadRequest, BlockchainWriteRequest, BlockchainResponse}, Chain}; //! use cuprate_test_utils::data::block_v16_tx0; //! //! use cuprate_blockchain::{ @@ -81,12 +83,12 @@ //! .build(); //! //! // Initialize the database thread-pool. -//! let (mut read_handle, mut write_handle) = cuprate_blockchain::service::init(config)?; +//! let (mut read_handle, mut write_handle, _) = cuprate_blockchain::service::init(config)?; //! //! 
// Prepare a request to write block. //! let mut block = block_v16_tx0().clone(); //! # block.height = 0_u64; // must be 0th height or panic in `add_block()` -//! let request = BCWriteRequest::WriteBlock(block); +//! let request = BlockchainWriteRequest::WriteBlock(block); //! //! // Send the request. //! // We receive back an `async` channel that will @@ -96,16 +98,16 @@ //! //! // Block write was OK. //! let response = response_channel.await?; -//! assert_eq!(response, BCResponse::WriteBlockOk); +//! assert_eq!(response, BlockchainResponse::WriteBlockOk); //! //! // Now, let's try getting the block hash //! // of the block we just wrote. -//! let request = BCReadRequest::BlockHash(0, Chain::Main); +//! let request = BlockchainReadRequest::BlockHash(0, Chain::Main); //! let response_channel = read_handle.ready().await?.call(request); //! let response = response_channel.await?; //! assert_eq!( //! response, -//! BCResponse::BlockHash( +//! BlockchainResponse::BlockHash( //! hex!("43bd1f2b6556dcafa413d8372974af59e4e8f37dbf74dc6b2a9b7212d0577428") //! ) //! ); @@ -118,17 +120,19 @@ //! # Ok(()) } //! ``` +// needed for docs +use tower as _; + mod read; -pub use read::DatabaseReadHandle; +pub use read::{init_read_service, init_read_service_with_pool}; mod write; -pub use write::DatabaseWriteHandle; +pub use write::init_write_service; mod free; pub use free::init; - -// Internal type aliases for `service`. 
mod types; +pub use types::{BlockchainReadHandle, BlockchainWriteHandle}; #[cfg(test)] mod tests; diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index a5d51f1..fbd9f89 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -4,24 +4,23 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, - task::{Context, Poll}, }; -use futures::{channel::oneshot, ready}; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::{ + iter::{IntoParallelIterator, ParallelIterator}, + ThreadPool, +}; use thread_local::ThreadLocal; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use tokio_util::sync::PollSemaphore; use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; -use cuprate_helper::{asynch::InfallibleOneshotReceiver, map::combine_low_high_bits_to_u128}; +use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; +use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ - blockchain::{BCReadRequest, BCResponse}, + blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, ExtendedBlockHeader, OutputOnChain, }; use crate::{ - config::ReaderThreads, ops::{ block::{ block_exists, get_block_extended_header_from_height, get_block_height, get_block_info, @@ -32,156 +31,38 @@ use crate::{ }, service::{ free::{compact_history_genesis_not_included, compact_history_index_to_height_offset}, - types::{ResponseReceiver, ResponseResult, ResponseSender}, + types::{BlockchainReadHandle, ResponseResult}, }, - tables::OpenTables, - tables::{BlockHeights, BlockInfos, Tables}, + tables::{BlockHeights, BlockInfos, OpenTables, Tables}, types::{Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId}, }; -//---------------------------------------------------------------------------------------------------- DatabaseReadHandle -/// Read handle to the database. 
+//---------------------------------------------------------------------------------------------------- init_read_service +/// Initialize the [`BlockchainReadHandle`] thread-pool backed by [`rayon`]. /// -/// This is cheaply [`Clone`]able handle that -/// allows `async`hronously reading from the database. +/// This spawns `threads` amount of reader threads +/// attached to `env` and returns a handle to the pool. /// -/// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`BCReadRequest`] -/// will return an `async`hronous channel that can be `.await`ed upon -/// to receive the corresponding [`BCResponse`]. -pub struct DatabaseReadHandle { - /// Handle to the custom `rayon` DB reader thread-pool. - /// - /// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool, - /// and responses are returned via a channel we (the caller) provide. - pool: Arc, +/// Should be called _once_ per actual database. Calling this function more than once will create +/// multiple unnecessary rayon thread-pools. +#[cold] +#[inline(never)] // Only called once. +pub fn init_read_service(env: Arc, threads: ReaderThreads) -> BlockchainReadHandle { + init_read_service_with_pool(env, init_thread_pool(threads)) +} - /// Counting semaphore asynchronous permit for database access. - /// Each [`tower::Service::poll_ready`] will acquire a permit - /// before actually sending a request to the `rayon` DB threadpool. - semaphore: PollSemaphore, - - /// An owned permit. - /// This will be set to [`Some`] in `poll_ready()` when we successfully acquire - /// the permit, and will be [`Option::take()`]n after `tower::Service::call()` is called. - /// - /// The actual permit will be dropped _after_ the rayon DB thread has finished - /// the request, i.e., after [`map_request()`] finishes. - permit: Option, - - /// Access to the database. +/// Initialize the blockchain database read service, with a specific rayon thread-pool instead of +/// creating a new one. 
+/// +/// Should be called _once_ per actual database, although nothing bad will happen, cloning the [`BlockchainReadHandle`] +/// is the correct way to get multiple handles to the database. +#[cold] +#[inline(never)] // Only called once. +pub fn init_read_service_with_pool( env: Arc, -} - -// `OwnedSemaphorePermit` does not implement `Clone`, -// so manually clone all elements, while keeping `permit` -// `None` across clones. -impl Clone for DatabaseReadHandle { - fn clone(&self) -> Self { - Self { - pool: Arc::clone(&self.pool), - semaphore: self.semaphore.clone(), - permit: None, - env: Arc::clone(&self.env), - } - } -} - -impl DatabaseReadHandle { - /// Initialize the `DatabaseReader` thread-pool backed by `rayon`. - /// - /// This spawns `N` amount of `DatabaseReader`'s - /// attached to `env` and returns a handle to the pool. - /// - /// Should be called _once_ per actual database. - #[cold] - #[inline(never)] // Only called once. - pub(super) fn init(env: &Arc, reader_threads: ReaderThreads) -> Self { - // How many reader threads to spawn? - let reader_count = reader_threads.as_threads().get(); - - // Spawn `rayon` reader threadpool. - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(reader_count) - .thread_name(|i| format!("cuprate_helper::service::read::DatabaseReader{i}")) - .build() - .unwrap(); - - // Create a semaphore with the same amount of - // permits as the amount of reader threads. - let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(reader_count))); - - // Return a handle to the pool. - Self { - pool: Arc::new(pool), - semaphore, - permit: None, - env: Arc::clone(env), - } - } - - /// Access to the actual database environment. - /// - /// # ⚠️ Warning - /// This function gives you access to the actual - /// underlying database connected to by `self`. - /// - /// I.e. it allows you to read/write data _directly_ - /// instead of going through a request. 
- /// - /// Be warned that using the database directly - /// in this manner has not been tested. - #[inline] - pub const fn env(&self) -> &Arc { - &self.env - } -} - -impl tower::Service for DatabaseReadHandle { - type Response = BCResponse; - type Error = RuntimeError; - type Future = ResponseReceiver; - - #[inline] - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // Check if we already have a permit. - if self.permit.is_some() { - return Poll::Ready(Ok(())); - } - - // Acquire a permit before returning `Ready`. - let permit = - ready!(self.semaphore.poll_acquire(cx)).expect("this semaphore is never closed"); - - self.permit = Some(permit); - Poll::Ready(Ok(())) - } - - #[inline] - fn call(&mut self, request: BCReadRequest) -> Self::Future { - let permit = self - .permit - .take() - .expect("poll_ready() should have acquire a permit before calling call()"); - - // Response channel we `.await` on. - let (response_sender, receiver) = oneshot::channel(); - - // Spawn the request in the rayon DB thread-pool. - // - // Note that this uses `self.pool` instead of `rayon::spawn` - // such that any `rayon` parallel code that runs within - // the passed closure uses the same `rayon` threadpool. - // - // INVARIANT: - // The below `DatabaseReader` function impl block relies on this behavior. - let env = Arc::clone(&self.env); - self.pool.spawn(move || { - let _permit: OwnedSemaphorePermit = permit; - map_request(&env, request, response_sender); - }); // drop(permit/env); - - InfallibleOneshotReceiver::from(receiver) - } + pool: Arc, +) -> BlockchainReadHandle { + DatabaseReadService::new(env, pool, map_request) } //---------------------------------------------------------------------------------------------------- Request Mapping @@ -194,17 +75,16 @@ impl tower::Service for DatabaseReadHandle { /// The basic structure is: /// 1. `Request` is mapped to a handler function /// 2. Handler function is called -/// 3. [`BCResponse`] is sent +/// 3. 
[`BlockchainResponse`] is returned fn map_request( - env: &ConcreteEnv, // Access to the database - request: BCReadRequest, // The request we must fulfill - response_sender: ResponseSender, // The channel we must send the response back to -) { - use BCReadRequest as R; + env: &ConcreteEnv, // Access to the database + request: BlockchainReadRequest, // The request we must fulfill +) -> ResponseResult { + use BlockchainReadRequest as R; /* SOMEDAY: pre-request handling, run some code for each request? */ - let response = match request { + match request { R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::FindBlock(_) => todo!("Add alt blocks to DB"), @@ -219,11 +99,6 @@ fn map_request( R::KeyImagesSpent(set) => key_images_spent(env, set), R::CompactChainHistory => compact_chain_history(env), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), - }; - - if let Err(e) = response_sender.send(response) { - // TODO: use tracing. - println!("database reader failed to send response: {e:?}"); } /* SOMEDAY: post-request handling, run some code for each request? */ @@ -300,7 +175,7 @@ macro_rules! get_tables { // TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal // amount of parallelism. -/// [`BCReadRequest::BlockExtendedHeader`]. +/// [`BlockchainReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -308,12 +183,12 @@ fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> Respon let tx_ro = env_inner.tx_ro()?; let tables = env_inner.open_tables(&tx_ro)?; - Ok(BCResponse::BlockExtendedHeader( + Ok(BlockchainResponse::BlockExtendedHeader( get_block_extended_header_from_height(&block_height, &tables)?, )) } -/// [`BCReadRequest::BlockHash`]. 
+/// [`BlockchainReadRequest::BlockHash`]. #[inline] fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -326,10 +201,10 @@ fn block_hash(env: &ConcreteEnv, block_height: BlockHeight, chain: Chain) -> Res Chain::Alt(_) => todo!("Add alt blocks to DB"), }; - Ok(BCResponse::BlockHash(block_hash)) + Ok(BlockchainResponse::BlockHash(block_hash)) } -/// [`BCReadRequest::FilterUnknownHashes`]. +/// [`BlockchainReadRequest::FilterUnknownHashes`]. #[inline] fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -353,11 +228,11 @@ fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet) -> R if let Some(e) = err { Err(e) } else { - Ok(BCResponse::FilterUnknownHashes(hashes)) + Ok(BlockchainResponse::FilterUnknownHashes(hashes)) } } -/// [`BCReadRequest::BlockExtendedHeaderInRange`]. +/// [`BlockchainReadRequest::BlockExtendedHeaderInRange`]. #[inline] fn block_extended_header_in_range( env: &ConcreteEnv, @@ -382,10 +257,10 @@ fn block_extended_header_in_range( Chain::Alt(_) => todo!("Add alt blocks to DB"), }; - Ok(BCResponse::BlockExtendedHeaderInRange(vec)) + Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec)) } -/// [`BCReadRequest::ChainHeight`]. +/// [`BlockchainReadRequest::ChainHeight`]. #[inline] fn chain_height(env: &ConcreteEnv) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -398,10 +273,10 @@ fn chain_height(env: &ConcreteEnv) -> ResponseResult { let block_hash = get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash; - Ok(BCResponse::ChainHeight(chain_height, block_hash)) + Ok(BlockchainResponse::ChainHeight(chain_height, block_hash)) } -/// [`BCReadRequest::GeneratedCoins`]. +/// [`BlockchainReadRequest::GeneratedCoins`]. 
#[inline] fn generated_coins(env: &ConcreteEnv, height: u64) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -409,13 +284,12 @@ fn generated_coins(env: &ConcreteEnv, height: u64) -> ResponseResult { let tx_ro = env_inner.tx_ro()?; let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; - Ok(BCResponse::GeneratedCoins(cumulative_generated_coins( - &height, - &table_block_infos, - )?)) + Ok(BlockchainResponse::GeneratedCoins( + cumulative_generated_coins(&height, &table_block_infos)?, + )) } -/// [`BCReadRequest::Outputs`]. +/// [`BlockchainReadRequest::Outputs`]. #[inline] fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. @@ -453,10 +327,10 @@ fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> }) .collect::>, RuntimeError>>()?; - Ok(BCResponse::Outputs(map)) + Ok(BlockchainResponse::Outputs(map)) } -/// [`BCReadRequest::NumberOutputsWithAmount`]. +/// [`BlockchainReadRequest::NumberOutputsWithAmount`]. #[inline] fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. @@ -498,10 +372,10 @@ fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec) -> Respon }) .collect::, RuntimeError>>()?; - Ok(BCResponse::NumberOutputsWithAmount(map)) + Ok(BlockchainResponse::NumberOutputsWithAmount(map)) } -/// [`BCReadRequest::KeyImagesSpent`]. +/// [`BlockchainReadRequest::KeyImagesSpent`]. #[inline] fn key_images_spent(env: &ConcreteEnv, key_images: HashSet) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. @@ -532,13 +406,13 @@ fn key_images_spent(env: &ConcreteEnv, key_images: HashSet) -> Respons // Else, `Ok(false)` will continue the iterator. .find_any(|result| !matches!(result, Ok(false))) { - None | Some(Ok(false)) => Ok(BCResponse::KeyImagesSpent(false)), // Key image was NOT found. - Some(Ok(true)) => Ok(BCResponse::KeyImagesSpent(true)), // Key image was found. 
+ None | Some(Ok(false)) => Ok(BlockchainResponse::KeyImagesSpent(false)), // Key image was NOT found. + Some(Ok(true)) => Ok(BlockchainResponse::KeyImagesSpent(true)), // Key image was found. Some(Err(e)) => Err(e), // A database error occurred. } } -/// [`BCReadRequest::CompactChainHistory`] +/// [`BlockchainReadRequest::CompactChainHistory`] fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { let env_inner = env.env_inner(); let tx_ro = env_inner.tx_ro()?; @@ -568,13 +442,13 @@ fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { block_ids.push(get_block_info(&0, &table_block_infos)?.block_hash); } - Ok(BCResponse::CompactChainHistory { + Ok(BlockchainResponse::CompactChainHistory { cumulative_difficulty, block_ids, }) } -/// [`BCReadRequest::FindFirstUnknown`] +/// [`BlockchainReadRequest::FindFirstUnknown`] /// /// # Invariant /// `block_ids` must be sorted in chronological block order, or else @@ -606,12 +480,12 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes } Ok(if idx == block_ids.len() { - BCResponse::FindFirstUnknown(None) + BlockchainResponse::FindFirstUnknown(None) } else if idx == 0 { - BCResponse::FindFirstUnknown(Some((0, 0))) + BlockchainResponse::FindFirstUnknown(Some((0, 0))) } else { let last_known_height = get_block_height(&block_ids[idx - 1], &table_block_heights)?; - BCResponse::FindFirstUnknown(Some((idx, last_known_height + 1))) + BlockchainResponse::FindFirstUnknown(Some((idx, last_known_height + 1))) }) } diff --git a/storage/blockchain/src/service/tests.rs b/storage/blockchain/src/service/tests.rs index c00e32f..72b60e2 100644 --- a/storage/blockchain/src/service/tests.rs +++ b/storage/blockchain/src/service/tests.rs @@ -18,7 +18,7 @@ use tower::{Service, ServiceExt}; use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_test_utils::data::{block_v16_tx0, block_v1_tx2, block_v9_tx3}; use cuprate_types::{ - 
blockchain::{BCReadRequest, BCResponse, BCWriteRequest}, + blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, Chain, OutputOnChain, VerifiedBlockInformation, }; @@ -29,7 +29,7 @@ use crate::{ blockchain::chain_height, output::id_to_output_on_chain, }, - service::{init, DatabaseReadHandle, DatabaseWriteHandle}, + service::{init, BlockchainReadHandle, BlockchainWriteHandle}, tables::{OpenTables, Tables, TablesIter}, tests::AssertTableLen, types::{Amount, AmountIndex, PreRctOutputId}, @@ -38,8 +38,8 @@ use crate::{ //---------------------------------------------------------------------------------------------------- Helper functions /// Initialize the `service`. fn init_service() -> ( - DatabaseReadHandle, - DatabaseWriteHandle, + BlockchainReadHandle, + BlockchainWriteHandle, Arc, tempfile::TempDir, ) { @@ -48,8 +48,7 @@ fn init_service() -> ( .db_directory(Cow::Owned(tempdir.path().into())) .low_power() .build(); - let (reader, writer) = init(config).unwrap(); - let env = reader.env().clone(); + let (reader, writer, env) = init(config).unwrap(); (reader, writer, env, tempdir) } @@ -82,10 +81,10 @@ async fn test_template( block.height = i as u64; // Request a block to be written, assert it was written. - let request = BCWriteRequest::WriteBlock(block); + let request = BlockchainWriteRequest::WriteBlock(block); let response_channel = writer.call(request); let response = response_channel.await.unwrap(); - assert_eq!(response, BCResponse::WriteBlockOk); + assert_eq!(response, BlockchainResponse::WriteBlockOk); } //----------------------------------------------------------------------- Reset the transaction @@ -101,36 +100,36 @@ async fn test_template( // Next few lines are just for preparing the expected responses, // see further below for usage. 
- let extended_block_header_0 = Ok(BCResponse::BlockExtendedHeader( + let extended_block_header_0 = Ok(BlockchainResponse::BlockExtendedHeader( get_block_extended_header_from_height(&0, &tables).unwrap(), )); let extended_block_header_1 = if block_fns.len() > 1 { - Ok(BCResponse::BlockExtendedHeader( + Ok(BlockchainResponse::BlockExtendedHeader( get_block_extended_header_from_height(&1, &tables).unwrap(), )) } else { Err(RuntimeError::KeyNotFound) }; - let block_hash_0 = Ok(BCResponse::BlockHash( + let block_hash_0 = Ok(BlockchainResponse::BlockHash( get_block_info(&0, tables.block_infos()).unwrap().block_hash, )); let block_hash_1 = if block_fns.len() > 1 { - Ok(BCResponse::BlockHash( + Ok(BlockchainResponse::BlockHash( get_block_info(&1, tables.block_infos()).unwrap().block_hash, )) } else { Err(RuntimeError::KeyNotFound) }; - let range_0_1 = Ok(BCResponse::BlockExtendedHeaderInRange(vec![ + let range_0_1 = Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec![ get_block_extended_header_from_height(&0, &tables).unwrap(), ])); let range_0_2 = if block_fns.len() >= 2 { - Ok(BCResponse::BlockExtendedHeaderInRange(vec![ + Ok(BlockchainResponse::BlockExtendedHeaderInRange(vec![ get_block_extended_header_from_height(&0, &tables).unwrap(), get_block_extended_header_from_height(&1, &tables).unwrap(), ])) @@ -143,13 +142,15 @@ async fn test_template( let chain_height = { let block_info = get_block_info(&test_chain_height.saturating_sub(1), tables.block_infos()).unwrap(); - Ok(BCResponse::ChainHeight( + Ok(BlockchainResponse::ChainHeight( test_chain_height, block_info.block_hash, )) }; - let cumulative_generated_coins = Ok(BCResponse::GeneratedCoins(cumulative_generated_coins)); + let cumulative_generated_coins = Ok(BlockchainResponse::GeneratedCoins( + cumulative_generated_coins, + )); let num_req = tables .outputs_iter() @@ -159,7 +160,7 @@ async fn test_template( .map(|key| key.amount) .collect::>(); - let num_resp = Ok(BCResponse::NumberOutputsWithAmount( + let 
num_resp = Ok(BlockchainResponse::NumberOutputsWithAmount( num_req .iter() .map(|amount| match tables.num_outputs().get(amount) { @@ -174,36 +175,45 @@ async fn test_template( // Contains a fake non-spent key-image. let ki_req = HashSet::from([[0; 32]]); - let ki_resp = Ok(BCResponse::KeyImagesSpent(false)); + let ki_resp = Ok(BlockchainResponse::KeyImagesSpent(false)); //----------------------------------------------------------------------- Assert expected response // Assert read requests lead to the expected responses. for (request, expected_response) in [ ( - BCReadRequest::BlockExtendedHeader(0), + BlockchainReadRequest::BlockExtendedHeader(0), extended_block_header_0, ), ( - BCReadRequest::BlockExtendedHeader(1), + BlockchainReadRequest::BlockExtendedHeader(1), extended_block_header_1, ), - (BCReadRequest::BlockHash(0, Chain::Main), block_hash_0), - (BCReadRequest::BlockHash(1, Chain::Main), block_hash_1), ( - BCReadRequest::BlockExtendedHeaderInRange(0..1, Chain::Main), + BlockchainReadRequest::BlockHash(0, Chain::Main), + block_hash_0, + ), + ( + BlockchainReadRequest::BlockHash(1, Chain::Main), + block_hash_1, + ), + ( + BlockchainReadRequest::BlockExtendedHeaderInRange(0..1, Chain::Main), range_0_1, ), ( - BCReadRequest::BlockExtendedHeaderInRange(0..2, Chain::Main), + BlockchainReadRequest::BlockExtendedHeaderInRange(0..2, Chain::Main), range_0_2, ), - (BCReadRequest::ChainHeight, chain_height), + (BlockchainReadRequest::ChainHeight, chain_height), ( - BCReadRequest::GeneratedCoins(test_chain_height), + BlockchainReadRequest::GeneratedCoins(test_chain_height), cumulative_generated_coins, ), - (BCReadRequest::NumberOutputsWithAmount(num_req), num_resp), - (BCReadRequest::KeyImagesSpent(ki_req), ki_resp), + ( + BlockchainReadRequest::NumberOutputsWithAmount(num_req), + num_resp, + ), + (BlockchainReadRequest::KeyImagesSpent(ki_req), ki_resp), ] { let response = reader.clone().oneshot(request).await; println!("response: {response:#?}, expected_response: 
{expected_response:#?}"); @@ -217,10 +227,10 @@ async fn test_template( // Assert each key image we inserted comes back as "spent". for key_image in tables.key_images_iter().keys().unwrap() { let key_image = key_image.unwrap(); - let request = BCReadRequest::KeyImagesSpent(HashSet::from([key_image])); + let request = BlockchainReadRequest::KeyImagesSpent(HashSet::from([key_image])); let response = reader.clone().oneshot(request).await; println!("response: {response:#?}, key_image: {key_image:#?}"); - assert_eq!(response.unwrap(), BCResponse::KeyImagesSpent(true)); + assert_eq!(response.unwrap(), BlockchainResponse::KeyImagesSpent(true)); } //----------------------------------------------------------------------- Output checks @@ -281,10 +291,10 @@ async fn test_template( .collect::>(); // Send a request for every output we inserted before. - let request = BCReadRequest::Outputs(map.clone()); + let request = BlockchainReadRequest::Outputs(map.clone()); let response = reader.clone().oneshot(request).await; println!("Response::Outputs response: {response:#?}"); - let Ok(BCResponse::Outputs(response)) = response else { + let Ok(BlockchainResponse::Outputs(response)) = response else { panic!("{response:#?}") }; diff --git a/storage/blockchain/src/service/types.rs b/storage/blockchain/src/service/types.rs index c6ee67e..9cd86e9 100644 --- a/storage/blockchain/src/service/types.rs +++ b/storage/blockchain/src/service/types.rs @@ -1,30 +1,20 @@ //! Database service type aliases. -//! -//! Only used internally for our `tower::Service` impls. 
//---------------------------------------------------------------------------------------------------- Use -use futures::channel::oneshot::Sender; - use cuprate_database::RuntimeError; -use cuprate_helper::asynch::InfallibleOneshotReceiver; -use cuprate_types::blockchain::BCResponse; +use cuprate_database_service::{DatabaseReadService, DatabaseWriteHandle}; +use cuprate_types::blockchain::{ + BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest, +}; //---------------------------------------------------------------------------------------------------- Types /// The actual type of the response. /// -/// Either our [`BCResponse`], or a database error occurred. -pub(super) type ResponseResult = Result; +/// Either our [`BlockchainResponse`], or a database error occurred. +pub(super) type ResponseResult = Result; -/// The `Receiver` channel that receives the read response. -/// -/// This is owned by the caller (the reader/writer thread) -/// who `.await`'s for the response. -/// -/// The channel itself should never fail, -/// but the actual database operation might. -pub(super) type ResponseReceiver = InfallibleOneshotReceiver; +/// The blockchain database write service. +pub type BlockchainWriteHandle = DatabaseWriteHandle; -/// The `Sender` channel for the response. -/// -/// The database reader/writer thread uses this to send the database result to the caller. -pub(super) type ResponseSender = Sender; +/// The blockchain database read service. +pub type BlockchainReadHandle = DatabaseReadService; diff --git a/storage/blockchain/src/service/write.rs b/storage/blockchain/src/service/write.rs index 041ae7b..816afc4 100644 --- a/storage/blockchain/src/service/write.rs +++ b/storage/blockchain/src/service/write.rs @@ -1,209 +1,34 @@ //! Database writer thread definitions and logic. 
//---------------------------------------------------------------------------------------------------- Import -use std::{ - sync::Arc, - task::{Context, Poll}, -}; - -use futures::channel::oneshot; +use std::sync::Arc; use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw}; -use cuprate_helper::asynch::InfallibleOneshotReceiver; +use cuprate_database_service::DatabaseWriteHandle; use cuprate_types::{ - blockchain::{BCResponse, BCWriteRequest}, + blockchain::{BlockchainResponse, BlockchainWriteRequest}, VerifiedBlockInformation, }; use crate::{ - service::types::{ResponseReceiver, ResponseResult, ResponseSender}, + service::types::{BlockchainWriteHandle, ResponseResult}, tables::OpenTables, }; -//---------------------------------------------------------------------------------------------------- Constants -/// Name of the writer thread. -const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); - -//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle -/// Write handle to the database. -/// -/// This is handle that allows `async`hronously writing to the database, -/// it is not [`Clone`]able as there is only ever 1 place within Cuprate -/// that writes. -/// -/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] & [`BCWriteRequest`] -/// will return an `async`hronous channel that can be `.await`ed upon -/// to receive the corresponding [`BCResponse`]. -#[derive(Debug)] -pub struct DatabaseWriteHandle { - /// Sender channel to the database write thread-pool. - /// - /// We provide the response channel for the thread-pool. - pub(super) sender: crossbeam::channel::Sender<(BCWriteRequest, ResponseSender)>, +//---------------------------------------------------------------------------------------------------- init_write_service +/// Initialize the blockchain write service from a [`ConcreteEnv`]. 
+pub fn init_write_service(env: Arc) -> BlockchainWriteHandle { + DatabaseWriteHandle::init(env, handle_blockchain_request) } -impl DatabaseWriteHandle { - /// Initialize the single `DatabaseWriter` thread. - #[cold] - #[inline(never)] // Only called once. - pub(super) fn init(env: Arc) -> Self { - // Initialize `Request/Response` channels. - let (sender, receiver) = crossbeam::channel::unbounded(); - - // Spawn the writer. - std::thread::Builder::new() - .name(WRITER_THREAD_NAME.into()) - .spawn(move || { - let this = DatabaseWriter { receiver, env }; - DatabaseWriter::main(this); - }) - .unwrap(); - - Self { sender } - } -} - -impl tower::Service for DatabaseWriteHandle { - type Response = BCResponse; - type Error = RuntimeError; - type Future = ResponseReceiver; - - #[inline] - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - #[inline] - fn call(&mut self, request: BCWriteRequest) -> Self::Future { - // Response channel we `.await` on. - let (response_sender, receiver) = oneshot::channel(); - - // Send the write request. - self.sender.send((request, response_sender)).unwrap(); - - InfallibleOneshotReceiver::from(receiver) - } -} - -//---------------------------------------------------------------------------------------------------- DatabaseWriter -/// The single database writer thread. -pub(super) struct DatabaseWriter { - /// Receiver side of the database request channel. - /// - /// Any caller can send some requests to this channel. - /// They send them alongside another `Response` channel, - /// which we will eventually send to. - receiver: crossbeam::channel::Receiver<(BCWriteRequest, ResponseSender)>, - - /// Access to the database. - env: Arc, -} - -impl Drop for DatabaseWriter { - fn drop(&mut self) { - // TODO: log the writer thread has exited? - } -} - -impl DatabaseWriter { - /// The `DatabaseWriter`'s main function. 
- /// - /// The writer just loops in this function, handling requests forever - /// until the request channel is dropped or a panic occurs. - #[cold] - #[inline(never)] // Only called once. - fn main(self) { - // 1. Hang on request channel - // 2. Map request to some database function - // 3. Execute that function, get the result - // 4. Return the result via channel - 'main: loop { - let Ok((request, response_sender)) = self.receiver.recv() else { - // If this receive errors, it means that the channel is empty - // and disconnected, meaning the other side (all senders) have - // been dropped. This means "shutdown", and we return here to - // exit the thread. - // - // Since the channel is empty, it means we've also processed - // all requests. Since it is disconnected, it means future - // ones cannot come in. - return; - }; - - /// How many times should we retry handling the request on resize errors? - /// - /// This is 1 on automatically resizing databases, meaning there is only 1 iteration. - const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 }; - - // Map [`Request`]'s to specific database functions. - // - // Both will: - // 1. Map the request to a function - // 2. Call the function - // 3. (manual resize only) If resize is needed, resize and retry - // 4. (manual resize only) Redo step {1, 2} - // 5. Send the function's `Result` back to the requester - // - // FIXME: there's probably a more elegant way - // to represent this retry logic with recursive - // functions instead of a loop. - 'retry: for retry in 0..REQUEST_RETRY_LIMIT { - // FIXME: will there be more than 1 write request? - // this won't have to be an enum. - let response = match &request { - BCWriteRequest::WriteBlock(block) => write_block(&self.env, block), - }; - - // If the database needs to resize, do so. 
-                if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded))
-                {
-                    // If this is the last iteration of the outer `for` loop and we
-                    // encounter a resize error _again_, it means something is wrong.
-                    assert_ne!(
-                        retry, REQUEST_RETRY_LIMIT,
-                        "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
-                    );
-
-                    // Resize the map, and retry the request handling loop.
-                    //
-                    // FIXME:
-                    // We could pass in custom resizes to account for
-                    // batches, i.e., we're about to add ~5GB of data,
-                    // add that much instead of the default 1GB.
-                    //
-                    let old = self.env.current_map_size();
-                    let new = self.env.resize_map(None);
-
-                    // TODO: use tracing.
-                    println!("resizing database memory map, old: {old}B, new: {new}B");
-
-                    // Try handling the request again.
-                    continue 'retry;
-                }
-
-                // Automatically resizing databases should not be returning a resize error.
-                #[cfg(debug_assertions)]
-                if !ConcreteEnv::MANUAL_RESIZE {
-                    assert!(
-                        !matches!(response, Err(RuntimeError::ResizeNeeded)),
-                        "auto-resizing database returned a ResizeNeeded error"
-                    );
-                }
-
-                // Send the response back, whether if it's an `Ok` or `Err`.
-                if let Err(e) = response_sender.send(response) {
-                    // TODO: use tracing.
-                    println!("database writer failed to send response: {e:?}");
-                }
-
-                continue 'main;
-            }
-
-            // Above retry loop should either:
-            // - continue to the next ['main] loop or...
-            // - ...retry until panic
-            unreachable!();
-        }
+//---------------------------------------------------------------------------------------------------- handle_blockchain_request
+/// Handle an incoming [`BlockchainWriteRequest`], returning a [`BlockchainResponse`].
+fn handle_blockchain_request( + env: &ConcreteEnv, + req: &BlockchainWriteRequest, +) -> Result { + match req { + BlockchainWriteRequest::WriteBlock(block) => write_block(env, block), } } @@ -216,7 +41,7 @@ impl DatabaseWriter { // Each function will return the [`Response`] that we // should send back to the caller in [`map_request()`]. -/// [`BCWriteRequest::WriteBlock`]. +/// [`BlockchainWriteRequest::WriteBlock`]. #[inline] fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult { let env_inner = env.env_inner(); @@ -230,7 +55,7 @@ fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseR match result { Ok(()) => { TxRw::commit(tx_rw)?; - Ok(BCResponse::WriteBlockOk) + Ok(BlockchainResponse::WriteBlockOk) } Err(e) => { // INVARIANT: ensure database atomicity by aborting diff --git a/storage/service/Cargo.toml b/storage/service/Cargo.toml new file mode 100644 index 0000000..ed46b35 --- /dev/null +++ b/storage/service/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "cuprate-database-service" +version = "0.1.0" +edition = "2021" +description = "Cuprate's database service abstraction" +license = "MIT" +authors = ["Boog900"] +repository = "https://github.com/Cuprate/cuprate/tree/main/storage/service" +keywords = ["cuprate", "service", "database"] + +[dependencies] +cuprate-database = { path = "../database" } +cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } + +serde = { workspace = true, optional = true } +rayon = { workspace = true } +tower = { workspace = true } +futures = { workspace = true } +crossbeam = { workspace = true, features = ["std"] } + +[lints] +workspace = true diff --git a/storage/service/README.md b/storage/service/README.md new file mode 100644 index 0000000..32e743c --- /dev/null +++ b/storage/service/README.md @@ -0,0 +1,7 @@ +# Cuprate's `tower::Service` database abstraction. 
+ +This crate contains the building blocks for creating a [`tower::Service`] interface to [`cuprate_blockchain`](https://doc.cuprate.org/cuprate_blockchain). + +It is split into 2 `tower::Service`s: +1. A [read service](crate::DatabaseReadService) which is backed by a [`rayon::ThreadPool`] +1. A [write service](crate::DatabaseWriteHandle) which spawns a single thread to handle write requests diff --git a/storage/service/src/lib.rs b/storage/service/src/lib.rs new file mode 100644 index 0000000..51d896a --- /dev/null +++ b/storage/service/src/lib.rs @@ -0,0 +1,8 @@ +#![doc = include_str!("../README.md")] + +mod reader_threads; +mod service; + +pub use reader_threads::{init_thread_pool, ReaderThreads}; + +pub use service::{DatabaseReadService, DatabaseWriteHandle}; diff --git a/storage/blockchain/src/config/reader_threads.rs b/storage/service/src/reader_threads.rs similarity index 84% rename from storage/blockchain/src/config/reader_threads.rs rename to storage/service/src/reader_threads.rs index d4dd6ac..72f619a 100644 --- a/storage/blockchain/src/config/reader_threads.rs +++ b/storage/service/src/reader_threads.rs @@ -1,23 +1,36 @@ -//! Database [`Env`](crate::Env) configuration. +//! Reader thread-pool configuration and initiation. //! -//! This module contains the main [`Config`]uration struct -//! for the database [`Env`](crate::Env)ironment, and data -//! structures related to any configuration setting. +//! This module contains [`ReaderThreads`] which allow specifying the amount of +//! reader threads for the [`rayon::ThreadPool`]. //! -//! These configurations are processed at runtime, meaning -//! the `Env` can/will dynamically adjust its behavior -//! based on these values. +//! It also contains [`init_thread_pool`] which initiates the thread-pool. 
//---------------------------------------------------------------------------------------------------- Import -use std::num::NonZeroUsize; +use std::{num::NonZeroUsize, sync::Arc}; +use rayon::ThreadPool; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +//---------------------------------------------------------------------------------------------------- init_thread_pool +/// Initialize the reader thread-pool backed by `rayon`. +pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc { + // How many reader threads to spawn? + let reader_count = reader_threads.as_threads().get(); + + Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(reader_count) + .thread_name(|i| format!("{}::DatabaseReader({i})", module_path!())) + .build() + .unwrap(), + ) +} + //---------------------------------------------------------------------------------------------------- ReaderThreads -/// Amount of database reader threads to spawn when using [`service`](crate::service). +/// Amount of database reader threads to spawn. /// -/// This controls how many reader thread `service`'s +/// This controls how many reader threads the [`DatabaseReadService`](crate::DatabaseReadService) /// thread-pool will spawn to receive and send requests/responses. /// /// # Invariant @@ -48,7 +61,7 @@ pub enum ReaderThreads { /// as such, it is equal to [`ReaderThreads::OnePerThread`]. /// /// ```rust - /// # use cuprate_blockchain::config::*; + /// # use cuprate_database_service::*; /// let reader_threads = ReaderThreads::from(0_usize); /// assert!(matches!(reader_threads, ReaderThreads::OnePerThread)); /// ``` @@ -80,7 +93,7 @@ pub enum ReaderThreads { /// non-zero, but not 1 thread, the minimum value 1 will be returned. 
/// /// ```rust - /// # use cuprate_blockchain::config::*; + /// # use cuprate_database_service::ReaderThreads; /// assert_eq!(ReaderThreads::Percent(0.000000001).as_threads().get(), 1); /// ``` Percent(f32), @@ -96,7 +109,7 @@ impl ReaderThreads { /// /// # Example /// ```rust - /// use cuprate_blockchain::config::ReaderThreads as R; + /// use cuprate_database_service::ReaderThreads as R; /// /// let total_threads: std::num::NonZeroUsize = /// cuprate_helper::thread::threads(); diff --git a/storage/service/src/service.rs b/storage/service/src/service.rs new file mode 100644 index 0000000..cd4957f --- /dev/null +++ b/storage/service/src/service.rs @@ -0,0 +1,5 @@ +mod read; +mod write; + +pub use read::DatabaseReadService; +pub use write::DatabaseWriteHandle; diff --git a/storage/service/src/service/read.rs b/storage/service/src/service/read.rs new file mode 100644 index 0000000..0ab6853 --- /dev/null +++ b/storage/service/src/service/read.rs @@ -0,0 +1,95 @@ +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use futures::channel::oneshot; +use rayon::ThreadPool; +use tower::Service; + +use cuprate_database::{ConcreteEnv, RuntimeError}; +use cuprate_helper::asynch::InfallibleOneshotReceiver; + +/// The [`rayon::ThreadPool`] service. +/// +/// Uses an inner request handler and a rayon thread-pool to asynchronously handle requests. +/// +/// - `Req` is the request type +/// - `Res` is the response type +pub struct DatabaseReadService { + /// Handle to the custom `rayon` DB reader thread-pool. + /// + /// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool, + /// and responses are returned via a channel we (the caller) provide. + pool: Arc, + + /// The function used to handle request. + inner_handler: Arc Result + Send + Sync + 'static>, +} + +// Deriving [`Clone`] means `Req` & `Res` need to be `Clone`, even if they aren't. 
+impl Clone for DatabaseReadService { + fn clone(&self) -> Self { + Self { + pool: Arc::clone(&self.pool), + inner_handler: Arc::clone(&self.inner_handler), + } + } +} + +impl DatabaseReadService +where + Req: Send + 'static, + Res: Send + 'static, +{ + /// Creates the [`DatabaseReadService`] with the provided backing thread-pool. + /// + /// Should be called _once_ per actual database, although nothing bad will happen, cloning the [`DatabaseReadService`] + /// is the correct way to get multiple handles to the database. + #[cold] + #[inline(never)] // Only called once. + pub fn new( + env: Arc, + pool: Arc, + req_handler: impl Fn(&ConcreteEnv, Req) -> Result + Send + Sync + 'static, + ) -> Self { + let inner_handler = Arc::new(move |req| req_handler(&env, req)); + + Self { + pool, + inner_handler, + } + } +} + +impl Service for DatabaseReadService +where + Req: Send + 'static, + Res: Send + 'static, +{ + type Response = Res; + type Error = RuntimeError; + type Future = InfallibleOneshotReceiver>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Req) -> Self::Future { + // Response channel we `.await` on. + let (response_sender, receiver) = oneshot::channel(); + + let handler = Arc::clone(&self.inner_handler); + + // Spawn the request in the rayon DB thread-pool. + // + // Note that this uses `self.pool` instead of `rayon::spawn` + // such that any `rayon` parallel code that runs within + // the passed closure uses the same `rayon` threadpool. 
+        self.pool.spawn(move || {
+            drop(response_sender.send(handler(req)));
+        });
+
+        InfallibleOneshotReceiver::from(receiver)
+    }
+}
diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs
new file mode 100644
index 0000000..f75d615
--- /dev/null
+++ b/storage/service/src/service/write.rs
@@ -0,0 +1,178 @@
+use std::{
+    fmt::Debug,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use futures::channel::oneshot;
+
+use cuprate_database::{ConcreteEnv, Env, RuntimeError};
+use cuprate_helper::asynch::InfallibleOneshotReceiver;
+
+//---------------------------------------------------------------------------------------------------- Constants
+/// Name of the writer thread.
+const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
+
+//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
+/// Write handle to the database.
+///
+/// This is a handle that allows `async`hronously writing to the database.
+///
+/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
+/// will return an `async`hronous channel that can be `.await`ed upon
+/// to receive the corresponding response.
+#[derive(Debug)]
+pub struct DatabaseWriteHandle {
+    /// Sender channel to the database write thread-pool.
+    ///
+    /// We provide the response channel for the thread-pool.
+    pub(super) sender:
+        crossbeam::channel::Sender<(Req, oneshot::Sender>)>,
+}
+
+impl DatabaseWriteHandle
+where
+    Req: Send + 'static,
+    Res: Debug + Send + 'static,
+{
+    /// Initialize the single `DatabaseWriter` thread.
+    #[cold]
+    #[inline(never)] // Only called once.
+    pub fn init(
+        env: Arc,
+        inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result + Send + 'static,
+    ) -> Self {
+        // Initialize `Request/Response` channels.
+        let (sender, receiver) = crossbeam::channel::unbounded();
+
+        // Spawn the writer.
+ std::thread::Builder::new() + .name(WRITER_THREAD_NAME.into()) + .spawn(move || database_writer(&env, &receiver, inner_handler)) + .unwrap(); + + Self { sender } + } +} + +impl tower::Service for DatabaseWriteHandle { + type Response = Res; + type Error = RuntimeError; + type Future = InfallibleOneshotReceiver>; + + #[inline] + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + #[inline] + fn call(&mut self, request: Req) -> Self::Future { + // Response channel we `.await` on. + let (response_sender, receiver) = oneshot::channel(); + + // Send the write request. + self.sender.send((request, response_sender)).unwrap(); + + InfallibleOneshotReceiver::from(receiver) + } +} + +//---------------------------------------------------------------------------------------------------- database_writer +/// The main function of the writer thread. +fn database_writer( + env: &ConcreteEnv, + receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender>)>, + inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result, +) where + Req: Send + 'static, + Res: Debug + Send + 'static, +{ + // 1. Hang on request channel + // 2. Map request to some database function + // 3. Execute that function, get the result + // 4. Return the result via channel + 'main: loop { + let Ok((request, response_sender)) = receiver.recv() else { + // If this receive errors, it means that the channel is empty + // and disconnected, meaning the other side (all senders) have + // been dropped. This means "shutdown", and we return here to + // exit the thread. + // + // Since the channel is empty, it means we've also processed + // all requests. Since it is disconnected, it means future + // ones cannot come in. + return; + }; + + /// How many times should we retry handling the request on resize errors? + /// + /// This is 1 on automatically resizing databases, meaning there is only 1 iteration. 
+ const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 }; + + // Map [`Request`]'s to specific database functions. + // + // Both will: + // 1. Map the request to a function + // 2. Call the function + // 3. (manual resize only) If resize is needed, resize and retry + // 4. (manual resize only) Redo step {1, 2} + // 5. Send the function's `Result` back to the requester + // + // FIXME: there's probably a more elegant way + // to represent this retry logic with recursive + // functions instead of a loop. + 'retry: for retry in 0..REQUEST_RETRY_LIMIT { + // FIXME: will there be more than 1 write request? + // this won't have to be an enum. + let response = inner_handler(env, &request); + + // If the database needs to resize, do so. + if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) { + // If this is the last iteration of the outer `for` loop and we + // encounter a resize error _again_, it means something is wrong. + assert_ne!( + retry, REQUEST_RETRY_LIMIT, + "database resize failed maximum of {REQUEST_RETRY_LIMIT} times" + ); + + // Resize the map, and retry the request handling loop. + // + // FIXME: + // We could pass in custom resizes to account for + // batches, i.e., we're about to add ~5GB of data, + // add that much instead of the default 1GB. + // + let old = env.current_map_size(); + let new = env.resize_map(None); + + // TODO: use tracing. + println!("resizing database memory map, old: {old}B, new: {new}B"); + + // Try handling the request again. + continue 'retry; + } + + // Automatically resizing databases should not be returning a resize error. + #[cfg(debug_assertions)] + if !ConcreteEnv::MANUAL_RESIZE { + assert!( + !matches!(response, Err(RuntimeError::ResizeNeeded)), + "auto-resizing database returned a ResizeNeeded error" + ); + } + + // Send the response back, whether if it's an `Ok` or `Err`. + if let Err(e) = response_sender.send(response) { + // TODO: use tracing. 
+ println!("database writer failed to send response: {e:?}"); + } + + continue 'main; + } + + // Above retry loop should either: + // - continue to the next ['main] loop or... + // - ...retry until panic + unreachable!(); + } +} diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 1ff06c2..f1a8a75 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -1,4 +1,4 @@ -//! Database [`BCReadRequest`]s, [`BCWriteRequest`]s, and [`BCResponse`]s. +//! Database [`BlockchainReadRequest`]s, [`BlockchainWriteRequest`]s, and [`BlockchainResponse`]s. //! //! Tests that assert particular requests lead to particular //! responses are also tested in Cuprate's blockchain database crate. @@ -14,14 +14,14 @@ use crate::types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInfor //---------------------------------------------------------------------------------------------------- ReadRequest /// A read request to the blockchain database. /// -/// This pairs with [`BCResponse`], where each variant here -/// matches in name with a [`BCResponse`] variant. For example, -/// the proper response for a [`BCReadRequest::BlockHash`] -/// would be a [`BCResponse::BlockHash`]. +/// This pairs with [`BlockchainResponse`], where each variant here +/// matches in name with a [`BlockchainResponse`] variant. For example, +/// the proper response for a [`BlockchainReadRequest::BlockHash`] +/// would be a [`BlockchainResponse::BlockHash`]. /// /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum BCReadRequest { +pub enum BlockchainReadRequest { /// Request a block's extended header. /// /// The input is the block's height. @@ -104,10 +104,10 @@ pub enum BCReadRequest { /// A write request to the blockchain database. /// /// There is currently only 1 write request to the database, -/// as such, the only valid [`BCResponse`] to this request is -/// the proper response for a [`BCResponse::WriteBlockOk`]. 
+/// as such, the only valid [`BlockchainResponse`] to this request is +/// the proper response for a [`BlockchainResponse::WriteBlockOk`]. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum BCWriteRequest { +pub enum BlockchainWriteRequest { /// Request that a block be written to the database. /// /// Input is an already verified block. @@ -119,60 +119,60 @@ pub enum BCWriteRequest { /// /// These are the data types returned when using sending a `Request`. /// -/// This pairs with [`BCReadRequest`] and [`BCWriteRequest`], +/// This pairs with [`BlockchainReadRequest`] and [`BlockchainWriteRequest`], /// see those two for more info. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum BCResponse { +pub enum BlockchainResponse { //------------------------------------------------------ Reads - /// Response to [`BCReadRequest::BlockExtendedHeader`]. + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. BlockExtendedHeader(ExtendedBlockHeader), - /// Response to [`BCReadRequest::BlockHash`]. + /// Response to [`BlockchainReadRequest::BlockHash`]. /// /// Inner value is the hash of the requested block. BlockHash([u8; 32]), - /// Response to [`BCReadRequest::FindBlock`]. + /// Response to [`BlockchainReadRequest::FindBlock`]. /// /// Inner value is the chain and height of the block if found. FindBlock(Option<(Chain, u64)>), - /// Response to [`BCReadRequest::FilterUnknownHashes`]. + /// Response to [`BlockchainReadRequest::FilterUnknownHashes`]. /// /// Inner value is the list of hashes that were in the main chain. FilterUnknownHashes(HashSet<[u8; 32]>), - /// Response to [`BCReadRequest::BlockExtendedHeaderInRange`]. + /// Response to [`BlockchainReadRequest::BlockExtendedHeaderInRange`]. /// /// Inner value is the list of extended header(s) of the requested block(s). BlockExtendedHeaderInRange(Vec), - /// Response to [`BCReadRequest::ChainHeight`]. 
+ /// Response to [`BlockchainReadRequest::ChainHeight`]. /// /// Inner value is the chain height, and the top block's hash. ChainHeight(u64, [u8; 32]), - /// Response to [`BCReadRequest::GeneratedCoins`]. + /// Response to [`BlockchainReadRequest::GeneratedCoins`]. /// /// Inner value is the total amount of generated coins up to and including the chosen height, in atomic units. GeneratedCoins(u64), - /// Response to [`BCReadRequest::Outputs`]. + /// Response to [`BlockchainReadRequest::Outputs`]. /// /// Inner value is all the outputs requested, /// associated with their amount and amount index. Outputs(HashMap>), - /// Response to [`BCReadRequest::NumberOutputsWithAmount`]. + /// Response to [`BlockchainReadRequest::NumberOutputsWithAmount`]. /// /// Inner value is a `HashMap` of all the outputs requested where: /// - Key = output amount /// - Value = count of outputs with the same amount NumberOutputsWithAmount(HashMap), - /// Response to [`BCReadRequest::KeyImagesSpent`]. + /// Response to [`BlockchainReadRequest::KeyImagesSpent`]. /// /// The inner value is `true` if _any_ of the key images /// were spent (existed in the database already). @@ -180,7 +180,7 @@ pub enum BCResponse { /// The inner value is `false` if _none_ of the key images were spent. KeyImagesSpent(bool), - /// Response to [`BCReadRequest::CompactChainHistory`]. + /// Response to [`BlockchainReadRequest::CompactChainHistory`]. CompactChainHistory { /// A list of blocks IDs in our chain, starting with the most recent block, all the way to the genesis block. /// @@ -190,7 +190,7 @@ pub enum BCResponse { cumulative_difficulty: u128, }, - /// The response for [`BCReadRequest::FindFirstUnknown`]. + /// The response for [`BlockchainReadRequest::FindFirstUnknown`]. /// /// Contains the index of the first unknown block and its expected height. 
/// @@ -198,7 +198,7 @@ pub enum BCResponse { FindFirstUnknown(Option<(usize, u64)>), //------------------------------------------------------ Writes - /// Response to [`BCWriteRequest::WriteBlock`]. + /// Response to [`BlockchainWriteRequest::WriteBlock`]. /// /// This response indicates that the requested block has /// successfully been written to the database without error.