From f5d6d2e5a5ab9e8945a107833073c99b740fec93 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 26 Jul 2024 16:26:31 +0100 Subject: [PATCH] split the DB service abstraction --- Cargo.lock | 69 ++---- Cargo.toml | 1 + consensus/fast-sync/src/create.rs | 6 +- storage/blockchain/Cargo.toml | 14 +- storage/blockchain/src/config/backend.rs | 31 --- storage/blockchain/src/config/config.rs | 4 +- storage/blockchain/src/config/mod.rs | 9 +- storage/blockchain/src/config/sync_mode.rs | 135 ------------ storage/blockchain/src/lib.rs | 2 +- storage/blockchain/src/service/free.rs | 13 +- storage/blockchain/src/service/mod.rs | 9 +- storage/blockchain/src/service/read.rs | 170 ++------------- storage/blockchain/src/service/tests.rs | 9 +- storage/blockchain/src/service/types.rs | 20 +- storage/blockchain/src/service/write.rs | 199 +----------------- storage/service/Cargo.toml | 22 ++ storage/service/src/lib.rs | 6 + .../config => service/src}/reader_threads.rs | 24 ++- storage/service/src/service.rs | 5 + storage/service/src/service/read.rs | 84 ++++++++ storage/service/src/service/write.rs | 174 +++++++++++++++ 21 files changed, 389 insertions(+), 617 deletions(-) delete mode 100644 storage/blockchain/src/config/backend.rs delete mode 100644 storage/blockchain/src/config/sync_mode.rs create mode 100644 storage/service/Cargo.toml create mode 100644 storage/service/src/lib.rs rename storage/{blockchain/src/config => service/src}/reader_threads.rs (92%) create mode 100644 storage/service/src/service.rs create mode 100644 storage/service/src/service/read.rs create mode 100644 storage/service/src/service/write.rs diff --git a/Cargo.lock b/Cargo.lock index 426ccc2f..7a9d0744 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,12 +29,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "allocator-api2" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" - [[package]] name = "android-tzdata" version = "0.1.1" @@ -471,7 +465,7 @@ dependencies = [ "cuprate-test-utils", "cuprate-wire", "futures", - "indexmap 2.2.6", + "indexmap", "rand", "thiserror", "tokio", @@ -496,14 +490,13 @@ version = "0.0.0" dependencies = [ "bitflags 2.5.0", "bytemuck", - "crossbeam", "cuprate-database", + "cuprate-database-service", "cuprate-helper", "cuprate-pruning", "cuprate-test-utils", "cuprate-types", "curve25519-dalek", - "futures", "hex", "hex-literal", "monero-serai", @@ -514,7 +507,6 @@ dependencies = [ "tempfile", "thread_local", "tokio", - "tokio-util", "tower", ] @@ -607,6 +599,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cuprate-database-service" +version = "0.1.0" +dependencies = [ + "crossbeam", + "cuprate-database", + "cuprate-helper", + "futures", + "rayon", + "serde", + "tower", +] + [[package]] name = "cuprate-epee-encoding" version = "0.5.0" @@ -707,7 +712,7 @@ dependencies = [ "dashmap", "futures", "hex", - "indexmap 2.2.6", + "indexmap", "monero-serai", "pin-project", "proptest", @@ -876,7 +881,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.5", + "hashbrown", "lock_api", "once_cell", "parking_lot_core", @@ -1168,12 +1173,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.14.5" @@ -1181,17 +1180,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ "ahash", - "allocator-api2", -] - -[[package]] -name = "hdrhistogram" -version = "7.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" -dependencies = [ - "byteorder", - "num-traits", ] [[package]] @@ -1515,16 +1503,6 @@ dependencies = [ "utf8_iter", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.2.6" @@ -1532,7 +1510,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.5", + "hashbrown", ] [[package]] @@ -2458,7 +2436,7 @@ name = "std-shims" version = "0.1.1" source = "git+https://github.com/Cuprate/serai.git?rev=d27d934#d27d93480aa8a849d84214ad4c71d83ce6fea0c1" dependencies = [ - "hashbrown 0.14.5", + "hashbrown", "spin", ] @@ -2663,10 +2641,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", - "futures-util", - "hashbrown 0.14.5", "pin-project-lite", "slab", "tokio", @@ -2685,7 +2660,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.6", + "indexmap", "toml_datetime", "winnow", ] @@ -2698,12 +2673,8 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", - "hdrhistogram", - "indexmap 1.9.3", "pin-project", "pin-project-lite", - "rand", - "slab", "tokio", "tokio-util", "tower-layer", diff --git a/Cargo.toml b/Cargo.toml index 9b090ba6..b1ee6a6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "p2p/async-buffer", "p2p/address-book", "storage/blockchain", + "storage/service", "storage/txpool", "storage/database", "pruning", diff --git a/consensus/fast-sync/src/create.rs b/consensus/fast-sync/src/create.rs index dc2311fe..61138bcb 100644 --- a/consensus/fast-sync/src/create.rs +++ b/consensus/fast-sync/src/create.rs @@ -4,7 +4,7 @@ use clap::Parser; use tower::{Service, ServiceExt}; use cuprate_blockchain::{ - config::ConfigBuilder, cuprate_database::RuntimeError, service::DatabaseReadHandle, + config::ConfigBuilder, cuprate_database::RuntimeError, service::BCReadHandle, }; use cuprate_types::blockchain::{BCReadRequest, BCResponse}; @@ -13,7 +13,7 @@ use cuprate_fast_sync::{hash_of_hashes, BlockId, HashOfHashes}; const BATCH_SIZE: u64 = 512; async fn read_batch( - handle: &mut DatabaseReadHandle, + handle: &mut BCReadHandle, height_from: u64, ) -> Result, RuntimeError> { let mut block_ids = Vec::::with_capacity(BATCH_SIZE as usize); @@ -60,7 +60,7 @@ async fn main() { let config = ConfigBuilder::new().build(); - let (mut read_handle, _) = cuprate_blockchain::service::init(config).unwrap(); + let (mut read_handle, _, _) = 
cuprate_blockchain::service::init(config).unwrap(); let mut hashes_of_hashes = Vec::new(); diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 79d0dc47..d8d588fd 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -15,13 +15,14 @@ default = ["heed", "service"] heed = ["cuprate-database/heed"] redb = ["cuprate-database/redb"] redb-memory = ["cuprate-database/redb-memory"] -service = ["dep:crossbeam", "dep:futures", "dep:tokio", "dep:tokio-util", "dep:tower", "dep:rayon"] +service = ["dep:thread_local", "dep:rayon"] [dependencies] # FIXME: # We only need the `thread` feature if `service` is enabled. # Figure out how to enable features of an already pulled in dependency conditionally. cuprate-database = { path = "../database" } +cuprate-database-service = { path = "../service" } cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } cuprate-types = { path = "../../types", features = ["blockchain"] } @@ -33,19 +34,16 @@ monero-serai = { workspace = true, features = ["std"] } serde = { workspace = true, optional = true } # `service` feature. -crossbeam = { workspace = true, features = ["std"], optional = true } -futures = { workspace = true, optional = true } -tokio = { workspace = true, features = ["full"], optional = true } -tokio-util = { workspace = true, features = ["full"], optional = true } -tower = { workspace = true, features = ["full"], optional = true } -thread_local = { workspace = true } +thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } [dev-dependencies] cuprate-helper = { path = "../../helper", features = ["thread"] } cuprate-test-utils = { path = "../../test-utils" } -tempfile = { version = "3.10.0" } +tokio = { workspace = true, features = ["full"] } +tower = { workspace = true } +tempfile = { workspace = true } pretty_assertions = { workspace = true } proptest = { workspace = true } hex = { workspace = true } diff --git a/storage/blockchain/src/config/backend.rs b/storage/blockchain/src/config/backend.rs deleted file mode 100644 index ee72b3df..00000000 --- a/storage/blockchain/src/config/backend.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! SOMEDAY - -//---------------------------------------------------------------------------------------------------- Import -use std::{ - borrow::Cow, - num::NonZeroUsize, - path::{Path, PathBuf}, -}; - -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - -use cuprate_helper::fs::cuprate_blockchain_dir; - -use crate::{ - config::{ReaderThreads, SyncMode}, - constants::DATABASE_DATA_FILENAME, - resize::ResizeAlgorithm, -}; - -//---------------------------------------------------------------------------------------------------- Backend -/// SOMEDAY: allow runtime hot-swappable backends. 
-#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum Backend { - #[default] - /// SOMEDAY - Heed, - /// SOMEDAY - Redb, -} diff --git a/storage/blockchain/src/config/config.rs b/storage/blockchain/src/config/config.rs index c58e292a..35937203 100644 --- a/storage/blockchain/src/config/config.rs +++ b/storage/blockchain/src/config/config.rs @@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize}; use cuprate_database::{config::SyncMode, resize::ResizeAlgorithm}; use cuprate_helper::fs::cuprate_blockchain_dir; -use crate::config::ReaderThreads; - +// re-exports +pub use cuprate_database_service::ReaderThreads; //---------------------------------------------------------------------------------------------------- ConfigBuilder /// Builder for [`Config`]. /// diff --git a/storage/blockchain/src/config/mod.rs b/storage/blockchain/src/config/mod.rs index 7ecc14c4..555a6e6e 100644 --- a/storage/blockchain/src/config/mod.rs +++ b/storage/blockchain/src/config/mod.rs @@ -34,14 +34,11 @@ //! .build(); //! //! // Start a database `service` using this configuration. -//! let (reader_handle, _) = cuprate_blockchain::service::init(config.clone())?; +//! let (_, _, env) = cuprate_blockchain::service::init(config.clone())?; //! // It's using the config we provided. -//! assert_eq!(reader_handle.env().config(), &config.db_config); +//! assert_eq!(env.config(), &config.db_config); //! # Ok(()) } //! ``` mod config; -pub use config::{Config, ConfigBuilder}; - -mod reader_threads; -pub use reader_threads::ReaderThreads; +pub use config::{Config, ConfigBuilder, ReaderThreads}; diff --git a/storage/blockchain/src/config/sync_mode.rs b/storage/blockchain/src/config/sync_mode.rs deleted file mode 100644 index 1d203396..00000000 --- a/storage/blockchain/src/config/sync_mode.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! Database [`Env`](crate::Env) configuration. -//! -//! This module contains the main [`Config`]uration struct -//! for the database [`Env`](crate::Env)ironment, and data -//! structures related to any configuration setting. -//! -//! These configurations are processed at runtime, meaning -//! the `Env` can/will dynamically adjust its behavior -//! based on these values. - -//---------------------------------------------------------------------------------------------------- Import - -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - -//---------------------------------------------------------------------------------------------------- SyncMode -/// Disk synchronization mode. -/// -/// This controls how/when the database syncs its data to disk. -/// -/// Regardless of the variant chosen, dropping [`Env`](crate::Env) -/// will always cause it to fully sync to disk. -/// -/// # Sync vs Async -/// All invariants except [`SyncMode::Async`] & [`SyncMode::Fast`] -/// are `synchronous`, as in the database will wait until the OS has -/// finished syncing all the data to disk before continuing. -/// -/// `SyncMode::Async` & `SyncMode::Fast` are `asynchronous`, meaning -/// the database will _NOT_ wait until the data is fully synced to disk -/// before continuing. Note that this doesn't mean the database itself -/// won't be synchronized between readers/writers, but rather that the -/// data _on disk_ may not be immediately synchronized after a write. 
-/// -/// Something like: -/// ```rust,ignore -/// db.put("key", value); -/// db.get("key"); -/// ``` -/// will be fine, most likely pulling from memory instead of disk. -/// -/// # SOMEDAY -/// Dynamic sync's are not yet supported. -/// -/// Only: -/// -/// - [`SyncMode::Safe`] -/// - [`SyncMode::Async`] -/// - [`SyncMode::Fast`] -/// -/// are supported, all other variants will panic on [`crate::Env::open`]. -#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum SyncMode { - /// Use [`SyncMode::Fast`] until fully synced, - /// then use [`SyncMode::Safe`]. - /// - // # SOMEDAY: how to implement this? - // ref: - // monerod-solution: - // cuprate-issue: - // - // We could: - // ```rust,ignore - // if current_db_block <= top_block.saturating_sub(N) { - // // don't sync() - // } else { - // // sync() - // } - // ``` - // where N is some threshold we pick that is _close_ enough - // to being synced where we want to start being safer. - // - // Essentially, when we are in a certain % range of being finished, - // switch to safe mode, until then, go fast. - FastThenSafe, - - #[default] - /// Fully sync to disk per transaction. - /// - /// Every database transaction commit will - /// fully sync all data to disk, _synchronously_, - /// so the database (writer) halts until synced. - /// - /// This is expected to be very slow. - /// - /// This matches: - /// - LMDB without any special sync flags - /// - [`redb::Durability::Immediate`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.Immediate) - Safe, - - /// Asynchrously sync to disk per transaction. - /// - /// This is the same as [`SyncMode::Safe`], - /// but the syncs will be asynchronous, i.e. - /// each transaction commit will sync to disk, - /// but only eventually, not necessarily immediately. - /// - /// This matches: - /// - [`MDB_MAPASYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#gab034ed0d8e5938090aef5ee0997f7e94) - /// - [`redb::Durability::Eventual`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.Eventual) - Async, - - /// Fully sync to disk after we cross this transaction threshold. - /// - /// After committing [`usize`] amount of database - /// transactions, it will be sync to disk. - /// - /// `0` behaves the same as [`SyncMode::Safe`], and a ridiculously large - /// number like `usize::MAX` is practically the same as [`SyncMode::Fast`]. - Threshold(usize), - - /// Only flush at database shutdown. - /// - /// This is the fastest, yet unsafest option. - /// - /// It will cause the database to never _actively_ sync, - /// letting the OS decide when to flush data to disk. - /// - /// This matches: - /// - [`MDB_NOSYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#ga5791dd1adb09123f82dd1f331209e12e) + [`MDB_MAPASYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#gab034ed0d8e5938090aef5ee0997f7e94) - /// - [`redb::Durability::None`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.None) - /// - /// `monerod` reference: - /// - /// # Corruption - /// In the case of a system crash, the database - /// may become corrupted when using this option. - // - // FIXME: we could call this `unsafe` - // and use that terminology in the config file - // so users know exactly what they are getting - // themselves into. 
-    Fast,
-}
diff --git a/storage/blockchain/src/lib.rs b/storage/blockchain/src/lib.rs
index 9db0862a..51b0fc78 100644
--- a/storage/blockchain/src/lib.rs
+++ b/storage/blockchain/src/lib.rs
@@ -52,7 +52,7 @@
     unused_crate_dependencies,
     unused_doc_comments,
     unused_mut,
-    missing_docs,
+    //missing_docs,
     deprecated,
     unused_comparisons,
     nonstandard_style
diff --git a/storage/blockchain/src/service/free.rs b/storage/blockchain/src/service/free.rs
index 3701f66f..17d01e28 100644
--- a/storage/blockchain/src/service/free.rs
+++ b/storage/blockchain/src/service/free.rs
@@ -3,11 +3,12 @@
 //---------------------------------------------------------------------------------------------------- Import
 use std::sync::Arc;
 
-use cuprate_database::InitError;
+use cuprate_database::{ConcreteEnv, InitError};
 
+use crate::service::{init_read_service, init_write_service};
 use crate::{
     config::Config,
-    service::{DatabaseReadHandle, DatabaseWriteHandle},
+    service::types::{BCReadHandle, BCWriteHandle},
 };
 
 //---------------------------------------------------------------------------------------------------- Init
@@ -20,17 +21,17 @@ use crate::{
 ///
 /// # Errors
 /// This will forward the error if [`crate::open`] failed.
-pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle), InitError> {
+pub fn init(config: Config) -> Result<(BCReadHandle, BCWriteHandle, Arc<ConcreteEnv>), InitError> {
     let reader_threads = config.reader_threads;
 
     // Initialize the database itself.
     let db = Arc::new(crate::open(config)?);
 
     // Spawn the Reader thread pool and Writer.
-    let readers = DatabaseReadHandle::init(&db, reader_threads);
-    let writer = DatabaseWriteHandle::init(db);
+    let readers = init_read_service(db.clone(), reader_threads);
+    let writer = init_write_service(db.clone());
 
-    Ok((readers, writer))
+    Ok((readers, writer, db))
 }
 
 //---------------------------------------------------------------------------------------------------- Compact history
diff --git a/storage/blockchain/src/service/mod.rs b/storage/blockchain/src/service/mod.rs
index 1d9d10b4..2f436908 100644
--- a/storage/blockchain/src/service/mod.rs
+++ b/storage/blockchain/src/service/mod.rs
@@ -81,11 +81,11 @@
 //!     .build();
 //!
 //! // Initialize the database thread-pool.
-//! let (mut read_handle, mut write_handle) = cuprate_blockchain::service::init(config)?;
+//! let (mut read_handle, mut write_handle, _) = cuprate_blockchain::service::init(config)?;
 //!
 //! // Prepare a request to write block.
 //! let mut block = block_v16_tx0().clone();
-//! # block.height = 0 as u64; // must be 0th height or panic in `add_block()`
+//! # block.height = 0_u64; // must be 0th height or panic in `add_block()`
 //! let request = BCWriteRequest::WriteBlock(block);
 //!
 //! // Send the request.
@@ -119,16 +119,17 @@
 //! ```
 
 mod read;
-pub use read::DatabaseReadHandle;
+pub use read::{init_read_service, init_read_service_with_pool};
 
 mod write;
-pub use write::DatabaseWriteHandle;
+pub use write::init_write_service;
 
 mod free;
 pub use free::init;
 
 // Internal type aliases for `service`.
 mod types;
+pub use types::{BCReadHandle, BCWriteHandle};
 
 #[cfg(test)]
 mod tests;
diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs
index 3f0b2633..e66d9a79 100644
--- a/storage/blockchain/src/service/read.rs
+++ b/storage/blockchain/src/service/read.rs
@@ -4,24 +4,21 @@
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
-    task::{Context, Poll},
 };
 
-use futures::{channel::oneshot, ready};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::ThreadPool;
 use thread_local::ThreadLocal;
-use tokio::sync::{OwnedSemaphorePermit, Semaphore};
-use tokio_util::sync::PollSemaphore;
 
 use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
-use cuprate_helper::{asynch::InfallibleOneshotReceiver, map::combine_low_high_bits_to_u128};
+use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
+use cuprate_helper::map::combine_low_high_bits_to_u128;
 use cuprate_types::{
     blockchain::{BCReadRequest, BCResponse},
     ExtendedBlockHeader, OutputOnChain,
 };
 
 use crate::{
-    config::ReaderThreads,
     ops::{
         block::{
             block_exists, get_block_extended_header_from_height, get_block_height, get_block_info,
@@ -32,7 +29,7 @@ use crate::{
     },
     service::{
         free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
-        types::{ResponseReceiver, ResponseResult, ResponseSender},
+        types::{BCReadHandle, ResponseResult},
     },
     tables::OpenTables,
     tables::{BlockHeights, BlockInfos, Tables},
@@ -40,148 +37,13 @@ use crate::{
 };
 
 //---------------------------------------------------------------------------------------------------- DatabaseReadHandle
-/// Read handle to the database.
-///
-/// This is cheaply [`Clone`]able handle that
-/// allows `async`hronously reading from the database.
-///
-/// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`BCReadRequest`]
-/// will return an `async`hronous channel that can be `.await`ed upon
-/// to receive the corresponding [`BCResponse`].
-pub struct DatabaseReadHandle {
-    /// Handle to the custom `rayon` DB reader thread-pool.
-    ///
-    /// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool,
-    /// and responses are returned via a channel we (the caller) provide.
-    pool: Arc<rayon::ThreadPool>,
-
-    /// Counting semaphore asynchronous permit for database access.
-    /// Each [`tower::Service::poll_ready`] will acquire a permit
-    /// before actually sending a request to the `rayon` DB threadpool.
-    semaphore: PollSemaphore,
-
-    /// An owned permit.
-    /// This will be set to [`Some`] in `poll_ready()` when we successfully acquire
-    /// the permit, and will be [`Option::take()`]n after `tower::Service::call()` is called.
-    ///
-    /// The actual permit will be dropped _after_ the rayon DB thread has finished
-    /// the request, i.e., after [`map_request()`] finishes.
-    permit: Option<OwnedSemaphorePermit>,
-
-    /// Access to the database.
-    env: Arc<ConcreteEnv>,
+pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BCReadHandle {
+    init_read_service_with_pool(env, init_thread_pool(threads))
 }
 
-// `OwnedSemaphorePermit` does not implement `Clone`,
-// so manually clone all elements, while keeping `permit`
-// `None` across clones.
-impl Clone for DatabaseReadHandle {
-    fn clone(&self) -> Self {
-        Self {
-            pool: Arc::clone(&self.pool),
-            semaphore: self.semaphore.clone(),
-            permit: None,
-            env: Arc::clone(&self.env),
-        }
-    }
-}
-
-impl DatabaseReadHandle {
-    /// Initialize the `DatabaseReader` thread-pool backed by `rayon`.
-    ///
-    /// This spawns `N` amount of `DatabaseReader`'s
-    /// attached to `env` and returns a handle to the pool.
-    ///
-    /// Should be called _once_ per actual database.
-    #[cold]
-    #[inline(never)] // Only called once.
-    pub(super) fn init(env: &Arc<ConcreteEnv>, reader_threads: ReaderThreads) -> Self {
-        // How many reader threads to spawn?
-        let reader_count = reader_threads.as_threads().get();
-
-        // Spawn `rayon` reader threadpool.
-        let pool = rayon::ThreadPoolBuilder::new()
-            .num_threads(reader_count)
-            .thread_name(|i| format!("cuprate_helper::service::read::DatabaseReader{i}"))
-            .build()
-            .unwrap();
-
-        // Create a semaphore with the same amount of
-        // permits as the amount of reader threads.
-        let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(reader_count)));
-
-        // Return a handle to the pool.
-        Self {
-            pool: Arc::new(pool),
-            semaphore,
-            permit: None,
-            env: Arc::clone(env),
-        }
-    }
-
-    /// Access to the actual database environment.
-    ///
-    /// # ⚠️ Warning
-    /// This function gives you access to the actual
-    /// underlying database connected to by `self`.
-    ///
-    /// I.e. it allows you to read/write data _directly_
-    /// instead of going through a request.
-    ///
-    /// Be warned that using the database directly
-    /// in this manner has not been tested.
-    #[inline]
-    pub const fn env(&self) -> &Arc<ConcreteEnv> {
-        &self.env
-    }
-}
-
-impl tower::Service<BCReadRequest> for DatabaseReadHandle {
-    type Response = BCResponse;
-    type Error = RuntimeError;
-    type Future = ResponseReceiver;
-
-    #[inline]
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        // Check if we already have a permit.
-        if self.permit.is_some() {
-            return Poll::Ready(Ok(()));
-        }
-
-        // Acquire a permit before returning `Ready`.
-        let permit =
-            ready!(self.semaphore.poll_acquire(cx)).expect("this semaphore is never closed");
-
-        self.permit = Some(permit);
-        Poll::Ready(Ok(()))
-    }
-
-    #[inline]
-    fn call(&mut self, request: BCReadRequest) -> Self::Future {
-        let permit = self
-            .permit
-            .take()
-            .expect("poll_ready() should have acquire a permit before calling call()");
-
-        // Response channel we `.await` on.
-        let (response_sender, receiver) = oneshot::channel();
-
-        // Spawn the request in the rayon DB thread-pool.
-        //
-        // Note that this uses `self.pool` instead of `rayon::spawn`
-        // such that any `rayon` parallel code that runs within
-        // the passed closure uses the same `rayon` threadpool.
-        //
-        // INVARIANT:
-        // The below `DatabaseReader` function impl block relies on this behavior.
-        let env = Arc::clone(&self.env);
-        self.pool.spawn(move || {
-            let _permit: OwnedSemaphorePermit = permit;
-            map_request(&env, request, response_sender);
-        }); // drop(permit/env);
-
-        InfallibleOneshotReceiver::from(receiver)
-    }
+pub fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) -> BCReadHandle {
+    DatabaseReadService::new(env, pool, map_request)
 }
 
 //---------------------------------------------------------------------------------------------------- Request Mapping
 // This function maps [`Request`]s to function calls
 // executed by the rayon DB reader threadpool.
 
 /// Map [`Request`]'s to specific database handler functions.
 ///
 /// This is the main entrance into all `Request` handler functions.
 /// The basic structure is:
 /// 1. `Request` is mapped to a handler function
 /// 2. Handler function is called
-/// 3. [`BCResponse`] is sent
+/// 3. [`BCResponse`] is returned
 fn map_request(
-    env: &ConcreteEnv,               // Access to the database
-    request: BCReadRequest,          // The request we must fulfill
-    response_sender: ResponseSender, // The channel we must send the response back to
-) {
+    env: &ConcreteEnv,      // Access to the database
+    request: BCReadRequest, // The request we must fulfill
+) -> ResponseResult {
     use BCReadRequest as R;
 
     /* SOMEDAY: pre-request handling, run some code for each request? */
@@ -218,12 +79,9 @@ fn map_request(
         R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
     };
 
-    if let Err(e) = response_sender.send(response) {
-        // TODO: use tracing.
-        println!("database reader failed to send response: {e:?}");
-    }
-
     /* SOMEDAY: post-request handling, run some code for each request? */
+
+    response
 }
 
 //---------------------------------------------------------------------------------------------------- Thread Local
diff --git a/storage/blockchain/src/service/tests.rs b/storage/blockchain/src/service/tests.rs
index 4f3fbe49..4bff452a 100644
--- a/storage/blockchain/src/service/tests.rs
+++ b/storage/blockchain/src/service/tests.rs
@@ -29,7 +29,7 @@ use crate::{
         blockchain::chain_height, output::id_to_output_on_chain,
     },
-    service::{init, DatabaseReadHandle, DatabaseWriteHandle},
+    service::{init, BCReadHandle, BCWriteHandle},
     tables::{OpenTables, Tables, TablesIter},
     tests::AssertTableLen,
     types::{Amount, AmountIndex, PreRctOutputId},
@@ -38,8 +38,8 @@ use crate::{
 //---------------------------------------------------------------------------------------------------- Helper functions
 /// Initialize the `service`.
 fn init_service() -> (
-    DatabaseReadHandle,
-    DatabaseWriteHandle,
+    BCReadHandle,
+    BCWriteHandle,
     Arc<ConcreteEnv>,
     tempfile::TempDir,
 ) {
@@ -48,8 +48,7 @@ fn init_service() -> (
         .db_directory(Cow::Owned(tempdir.path().into()))
         .low_power()
         .build();
-    let (reader, writer) = init(config).unwrap();
-    let env = reader.env().clone();
+    let (reader, writer, env) = init(config).unwrap();
     (reader, writer, env, tempdir)
 }
 
diff --git a/storage/blockchain/src/service/types.rs b/storage/blockchain/src/service/types.rs
index c6ee67e7..b612af8d 100644
--- a/storage/blockchain/src/service/types.rs
+++ b/storage/blockchain/src/service/types.rs
@@ -3,11 +3,9 @@
 //! Only used internally for our `tower::Service` impls.
 
 //---------------------------------------------------------------------------------------------------- Use
-use futures::channel::oneshot::Sender;
-
 use cuprate_database::RuntimeError;
-use cuprate_helper::asynch::InfallibleOneshotReceiver;
-use cuprate_types::blockchain::BCResponse;
+use cuprate_database_service::{DatabaseReadService, DatabaseWriteHandle};
+use cuprate_types::blockchain::{BCReadRequest, BCResponse, BCWriteRequest};
 
 //---------------------------------------------------------------------------------------------------- Types
 /// The actual type of the response.
 ///
 /// Either our [`BCResponse`], or a database error occurred.
 pub(super) type ResponseResult = Result<BCResponse, RuntimeError>;
 
-/// The `Receiver` channel that receives the read response.
-///
-/// This is owned by the caller (the reader/writer thread)
-/// who `.await`'s for the response.
-///
-/// The channel itself should never fail,
-/// but the actual database operation might.
-pub(super) type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
+pub type BCWriteHandle = DatabaseWriteHandle<BCWriteRequest, BCResponse>;
 
-/// The `Sender` channel for the response.
-///
-/// The database reader/writer thread uses this to send the database result to the caller.
-pub(super) type ResponseSender = Sender<ResponseResult>;
+pub type BCReadHandle = DatabaseReadService<BCReadRequest, BCResponse, RuntimeError>;
diff --git a/storage/blockchain/src/service/write.rs b/storage/blockchain/src/service/write.rs
index 041ae7b6..505fbe48 100644
--- a/storage/blockchain/src/service/write.rs
+++ b/storage/blockchain/src/service/write.rs
@@ -1,209 +1,28 @@
 //! Database writer thread definitions and logic.
 
 //---------------------------------------------------------------------------------------------------- Import
-use std::{
-    sync::Arc,
-    task::{Context, Poll},
-};
-
-use futures::channel::oneshot;
+use std::sync::Arc;
 
 use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
-use cuprate_helper::asynch::InfallibleOneshotReceiver;
+use cuprate_database_service::DatabaseWriteHandle;
 use cuprate_types::{
     blockchain::{BCResponse, BCWriteRequest},
     VerifiedBlockInformation,
 };
 
 use crate::{
-    service::types::{ResponseReceiver, ResponseResult, ResponseSender},
+    service::types::{BCWriteHandle, ResponseResult},
     tables::OpenTables,
 };
+//---------------------------------------------------------------------------------------------------- init_write_service
 
-//---------------------------------------------------------------------------------------------------- Constants
-/// Name of the writer thread.
-const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
-
-//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
-/// Write handle to the database.
-///
-/// This is handle that allows `async`hronously writing to the database,
-/// it is not [`Clone`]able as there is only ever 1 place within Cuprate
-/// that writes.
-///
-/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] & [`BCWriteRequest`]
-/// will return an `async`hronous channel that can be `.await`ed upon
-/// to receive the corresponding [`BCResponse`].
-#[derive(Debug)]
-pub struct DatabaseWriteHandle {
-    /// Sender channel to the database write thread-pool.
-    ///
-    /// We provide the response channel for the thread-pool.
-    pub(super) sender: crossbeam::channel::Sender<(BCWriteRequest, ResponseSender)>,
+pub fn init_write_service(env: Arc<ConcreteEnv>) -> BCWriteHandle {
+    DatabaseWriteHandle::init(env, handle_bc_request)
 }
 
-impl DatabaseWriteHandle {
-    /// Initialize the single `DatabaseWriter` thread.
-    #[cold]
-    #[inline(never)] // Only called once.
-    pub(super) fn init(env: Arc<ConcreteEnv>) -> Self {
-        // Initialize `Request/Response` channels.
-        let (sender, receiver) = crossbeam::channel::unbounded();
-
-        // Spawn the writer.
-        std::thread::Builder::new()
-            .name(WRITER_THREAD_NAME.into())
-            .spawn(move || {
-                let this = DatabaseWriter { receiver, env };
-                DatabaseWriter::main(this);
-            })
-            .unwrap();
-
-        Self { sender }
-    }
-}
-
-impl tower::Service<BCWriteRequest> for DatabaseWriteHandle {
-    type Response = BCResponse;
-    type Error = RuntimeError;
-    type Future = ResponseReceiver;
-
-    #[inline]
-    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
-    }
-
-    #[inline]
-    fn call(&mut self, request: BCWriteRequest) -> Self::Future {
-        // Response channel we `.await` on.
-        let (response_sender, receiver) = oneshot::channel();
-
-        // Send the write request.
- self.sender.send((request, response_sender)).unwrap(); - - InfallibleOneshotReceiver::from(receiver) - } -} - -//---------------------------------------------------------------------------------------------------- DatabaseWriter -/// The single database writer thread. -pub(super) struct DatabaseWriter { - /// Receiver side of the database request channel. - /// - /// Any caller can send some requests to this channel. - /// They send them alongside another `Response` channel, - /// which we will eventually send to. - receiver: crossbeam::channel::Receiver<(BCWriteRequest, ResponseSender)>, - - /// Access to the database. - env: Arc, -} - -impl Drop for DatabaseWriter { - fn drop(&mut self) { - // TODO: log the writer thread has exited? - } -} - -impl DatabaseWriter { - /// The `DatabaseWriter`'s main function. - /// - /// The writer just loops in this function, handling requests forever - /// until the request channel is dropped or a panic occurs. - #[cold] - #[inline(never)] // Only called once. - fn main(self) { - // 1. Hang on request channel - // 2. Map request to some database function - // 3. Execute that function, get the result - // 4. Return the result via channel - 'main: loop { - let Ok((request, response_sender)) = self.receiver.recv() else { - // If this receive errors, it means that the channel is empty - // and disconnected, meaning the other side (all senders) have - // been dropped. This means "shutdown", and we return here to - // exit the thread. - // - // Since the channel is empty, it means we've also processed - // all requests. Since it is disconnected, it means future - // ones cannot come in. - return; - }; - - /// How many times should we retry handling the request on resize errors? - /// - /// This is 1 on automatically resizing databases, meaning there is only 1 iteration. - const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 }; - - // Map [`Request`]'s to specific database functions. - // - // Both will: - // 1. Map the request to a function - // 2. Call the function - // 3. (manual resize only) If resize is needed, resize and retry - // 4. (manual resize only) Redo step {1, 2} - // 5. Send the function's `Result` back to the requester - // - // FIXME: there's probably a more elegant way - // to represent this retry logic with recursive - // functions instead of a loop. - 'retry: for retry in 0..REQUEST_RETRY_LIMIT { - // FIXME: will there be more than 1 write request? - // this won't have to be an enum. - let response = match &request { - BCWriteRequest::WriteBlock(block) => write_block(&self.env, block), - }; - - // If the database needs to resize, do so. - if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) - { - // If this is the last iteration of the outer `for` loop and we - // encounter a resize error _again_, it means something is wrong. - assert_ne!( - retry, REQUEST_RETRY_LIMIT, - "database resize failed maximum of {REQUEST_RETRY_LIMIT} times" - ); - - // Resize the map, and retry the request handling loop. - // - // FIXME: - // We could pass in custom resizes to account for - // batches, i.e., we're about to add ~5GB of data, - // add that much instead of the default 1GB. - // - let old = self.env.current_map_size(); - let new = self.env.resize_map(None); - - // TODO: use tracing. - println!("resizing database memory map, old: {old}B, new: {new}B"); - - // Try handling the request again. - continue 'retry; - } - - // Automatically resizing databases should not be returning a resize error. 
-                #[cfg(debug_assertions)]
-                if !ConcreteEnv::MANUAL_RESIZE {
-                    assert!(
-                        !matches!(response, Err(RuntimeError::ResizeNeeded)),
-                        "auto-resizing database returned a ResizeNeeded error"
-                    );
-                }
-
-                // Send the response back, whether if it's an `Ok` or `Err`.
-                if let Err(e) = response_sender.send(response) {
-                    // TODO: use tracing.
-                    println!("database writer failed to send response: {e:?}");
-                }
-
-                continue 'main;
-            }
-
-            // Above retry loop should either:
-            // - continue to the next ['main] loop or...
-            // - ...retry until panic
-            unreachable!();
-        }
+fn handle_bc_request(env: &ConcreteEnv, req: &BCWriteRequest) -> Result<BCResponse, RuntimeError> {
+    match req {
+        BCWriteRequest::WriteBlock(block) => write_block(env, block),
     }
 }
diff --git a/storage/service/Cargo.toml b/storage/service/Cargo.toml
new file mode 100644
index 00000000..131c5320
--- /dev/null
+++ b/storage/service/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "cuprate-database-service"
+version = "0.1.0"
+edition = "2021"
+description = "Cuprate's database service abstraction"
+license = "MIT"
+authors = ["Boog900"]
+repository = "https://github.com/Cuprate/cuprate/tree/main/storage/service"
+keywords = ["cuprate", "service", "database"]
+
+[dependencies]
+cuprate-database = { path = "../database" }
+cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] }
+
+serde = { workspace = true, optional = true }
+rayon = { workspace = true }
+tower = { workspace = true }
+futures = { workspace = true }
+crossbeam = { workspace = true, features = ["std"] }
+
+[lints]
+workspace = true
diff --git a/storage/service/src/lib.rs b/storage/service/src/lib.rs
new file mode 100644
index 00000000..543174a2
--- /dev/null
+++ b/storage/service/src/lib.rs
@@ -0,0 +1,6 @@
+mod reader_threads;
+mod service;
+
+pub use reader_threads::{init_thread_pool, ReaderThreads};
+
+pub use service::{DatabaseReadService, DatabaseWriteHandle};
diff --git a/storage/blockchain/src/config/reader_threads.rs b/storage/service/src/reader_threads.rs
similarity index 92%
rename from storage/blockchain/src/config/reader_threads.rs
rename to storage/service/src/reader_threads.rs
index d4dd6ac4..a8dc5d63 100644
--- a/storage/blockchain/src/config/reader_threads.rs
+++ b/storage/service/src/reader_threads.rs
@@ -9,10 +9,24 @@
 //! based on these values.
 
 //---------------------------------------------------------------------------------------------------- Import
-use std::num::NonZeroUsize;
-
+use rayon::ThreadPool;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
+use std::num::NonZeroUsize;
+use std::sync::Arc;
+
+pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
+    // How many reader threads to spawn?
+    let reader_count = reader_threads.as_threads().get();
+
+    Arc::new(
+        rayon::ThreadPoolBuilder::new()
+            .num_threads(reader_count)
+            .thread_name(|i| format!("{}::DatabaseReader({i})", module_path!()))
+            .build()
+            .unwrap(),
+    )
+}
 
 //---------------------------------------------------------------------------------------------------- ReaderThreads
 /// Amount of database reader threads to spawn when using [`service`](crate::service).
@@ -48,7 +62,7 @@ pub enum ReaderThreads {
     /// as such, it is equal to [`ReaderThreads::OnePerThread`].
     ///
     /// ```rust
-    /// # use cuprate_blockchain::config::*;
+    /// # use cuprate_database_service::*;
     /// let reader_threads = ReaderThreads::from(0_usize);
     /// assert!(matches!(reader_threads, ReaderThreads::OnePerThread));
     /// ```
     OnePerThread,
@@ -80,7 +94,7 @@ pub enum ReaderThreads {
     /// non-zero, but not 1 thread, the minimum value 1 will be returned.
     ///
     /// ```rust
-    /// # use cuprate_blockchain::config::*;
+    /// # use cuprate_database_service::ReaderThreads;
    /// assert_eq!(ReaderThreads::Percent(0.000000001).as_threads().get(), 1);
     /// ```
     Percent(f32),
@@ -96,7 +110,7 @@ impl ReaderThreads {
     ///
     /// # Example
     /// ```rust
-    /// use cuprate_blockchain::config::ReaderThreads as R;
+    /// use cuprate_database_service::ReaderThreads as R;
     ///
     /// let total_threads: std::num::NonZeroUsize =
     ///     cuprate_helper::thread::threads();
diff --git a/storage/service/src/service.rs b/storage/service/src/service.rs
new file mode 100644
index 00000000..cd4957ff
--- /dev/null
+++ b/storage/service/src/service.rs
@@ -0,0 +1,5 @@
+mod read;
+mod write;
+
+pub use read::DatabaseReadService;
+pub use write::DatabaseWriteHandle;
diff --git a/storage/service/src/service/read.rs b/storage/service/src/service/read.rs
new file mode 100644
index 00000000..e0f7465e
--- /dev/null
+++ b/storage/service/src/service/read.rs
@@ -0,0 +1,84 @@
+use std::{
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use futures::channel::oneshot;
+use rayon::ThreadPool;
+use tower::Service;
+
+use cuprate_database::ConcreteEnv;
+use cuprate_helper::asynch::InfallibleOneshotReceiver;
+
+/// The [`rayon::ThreadPool`] service.
+///
+/// Uses an inner request handler and a rayon thread-pool to asynchronously handle requests.
+pub struct DatabaseReadService<Req, Res, Err> {
+    /// The rayon thread-pool.
+    pool: Arc<ThreadPool>,
+
+    /// The function used to handle requests.
+    inner_handler: Arc<dyn Fn(Req) -> Result<Res, Err> + Send + Sync + 'static>,
+}
+
+impl<Req, Res, Err> Clone for DatabaseReadService<Req, Res, Err> {
+    fn clone(&self) -> Self {
+        Self {
+            pool: self.pool.clone(),
+            inner_handler: self.inner_handler.clone(),
+        }
+    }
+}
+
+impl<Req, Res, Err> DatabaseReadService<Req, Res, Err>
+where
+    Req: Send + 'static,
+    Res: Send + 'static,
+    Err: Send + 'static,
+{
+    pub fn new(
+        env: Arc<ConcreteEnv>,
+        pool: Arc<ThreadPool>,
+        req_handler: impl Fn(&ConcreteEnv, Req) -> Result<Res, Err> + Send + Sync + 'static,
+    ) -> Self {
+        let inner_handler = Arc::new(move |req| req_handler(&env, req));
+
+        Self {
+            pool,
+            inner_handler,
+        }
+    }
+}
+
+impl<Req, Res, Err> Service<Req> for DatabaseReadService<Req, Res, Err>
+where
+    Req: Send + 'static,
+    Res: Send + 'static,
+    Err: Send + 'static,
+{
+    type Response = Res;
+    type Error = Err;
+    type Future = InfallibleOneshotReceiver<Result<Res, Err>>;
+
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: Req) -> Self::Future {
+        // Response channel we `.await` on.
+        let (response_sender, receiver) = oneshot::channel();
+
+        let handler = self.inner_handler.clone();
+
+        // Spawn the request in the rayon DB thread-pool.
+        //
+        // Note that this uses `self.pool` instead of `rayon::spawn`
+        // such that any `rayon` parallel code that runs within
+        // the passed closure uses the same `rayon` threadpool.
+        self.pool.spawn(move || {
+            drop(response_sender.send(handler(req)));
+        });
+
+        InfallibleOneshotReceiver::from(receiver)
+    }
+}
diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs
new file mode 100644
index 00000000..25e758c7
--- /dev/null
+++ b/storage/service/src/service/write.rs
@@ -0,0 +1,174 @@
+use cuprate_database::{ConcreteEnv, Env, RuntimeError};
+use cuprate_helper::asynch::InfallibleOneshotReceiver;
+use futures::channel::oneshot;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+//---------------------------------------------------------------------------------------------------- Constants
+/// Name of the writer thread.
+const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
+
+//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
+/// Write handle to the database.
+///
+/// This is a handle that allows `async`hronously writing to the database.
+///
+/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
+/// will return an `async`hronous channel that can be `.await`ed upon
+/// to receive the corresponding response.
+#[derive(Debug)]
+pub struct DatabaseWriteHandle<Req, Res> {
+    /// Sender channel to the database write thread-pool.
+    ///
+    /// We provide the response channel for the thread-pool.
+    pub(super) sender:
+        crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
+}
+
+impl<Req, Res> DatabaseWriteHandle<Req, Res>
+where
+    Req: Send + 'static,
+    Res: Debug + Send + 'static,
+{
+    /// Initialize the single `DatabaseWriter` thread.
+    #[cold]
+    #[inline(never)] // Only called once.
+    pub fn init(
+        env: Arc<ConcreteEnv>,
+        inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result<Res, RuntimeError> + Send + 'static,
+    ) -> Self {
+        // Initialize `Request/Response` channels.
+        let (sender, receiver) = crossbeam::channel::unbounded();
+
+        // Spawn the writer.
+        std::thread::Builder::new()
+            .name(WRITER_THREAD_NAME.into())
+            .spawn(move || database_writer(env, receiver, inner_handler))
+            .unwrap();
+
+        Self { sender }
+    }
+}
+
+impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
+    type Response = Res;
+    type Error = RuntimeError;
+    type Future = InfallibleOneshotReceiver<Result<Res, RuntimeError>>;
+
+    #[inline]
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    #[inline]
+    fn call(&mut self, request: Req) -> Self::Future {
+        // Response channel we `.await` on.
+        let (response_sender, receiver) = oneshot::channel();
+
+        // Send the write request.
+        self.sender.send((request, response_sender)).unwrap();
+
+        InfallibleOneshotReceiver::from(receiver)
+    }
+}
+
+//---------------------------------------------------------------------------------------------------- database_writer
+/// The main function of the writer thread.
+fn database_writer<Req, Res>(
+    env: Arc<ConcreteEnv>,
+    receiver: crossbeam::channel::Receiver<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
+    inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result<Res, RuntimeError>,
+) where
+    Req: Send + 'static,
+    Res: Debug + Send + 'static,
+{
+    // 1. Hang on request channel
+    // 2. Map request to some database function
+    // 3. Execute that function, get the result
+    // 4. Return the result via channel
+    'main: loop {
+        let Ok((request, response_sender)) = receiver.recv() else {
+            // If this receive errors, it means that the channel is empty
+            // and disconnected, meaning the other side (all senders) have
+            // been dropped. This means "shutdown", and we return here to
+            // exit the thread.
+            //
+            // Since the channel is empty, it means we've also processed
+            // all requests. Since it is disconnected, it means future
+            // ones cannot come in.
+            return;
+        };
+
+        /// How many times should we retry handling the request on resize errors?
+        ///
+        /// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
+        const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
+
+        // Map [`Request`]'s to specific database functions.
+        //
+        // Both will:
+        // 1. Map the request to a function
+        // 2. Call the function
+        // 3. (manual resize only) If resize is needed, resize and retry
+        // 4. (manual resize only) Redo step {1, 2}
+        // 5. Send the function's `Result` back to the requester
+        //
+        // FIXME: there's probably a more elegant way
+        // to represent this retry logic with recursive
+        // functions instead of a loop.
+        'retry: for retry in 0..REQUEST_RETRY_LIMIT {
+            // FIXME: will there be more than 1 write request?
+            // this won't have to be an enum.
+            let response = inner_handler(&env, &request);
+
+            // If the database needs to resize, do so.
+            if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
+                // If this is the last iteration of the outer `for` loop and we
+                // encounter a resize error _again_, it means something is wrong.
+                assert_ne!(
+                    retry + 1, REQUEST_RETRY_LIMIT,
+                    "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
+                );
+
+                // Resize the map, and retry the request handling loop.
+                //
+                // FIXME:
+                // We could pass in custom resizes to account for
+                // batches, i.e., we're about to add ~5GB of data,
+                // add that much instead of the default 1GB.
+                //
+                let old = env.current_map_size();
+                let new = env.resize_map(None);
+
+                // TODO: use tracing.
+                println!("resizing database memory map, old: {old}B, new: {new}B");
+
+                // Try handling the request again.
+                continue 'retry;
+            }
+
+            // Automatically resizing databases should not be returning a resize error.
+            #[cfg(debug_assertions)]
+            if !ConcreteEnv::MANUAL_RESIZE {
+                assert!(
+                    !matches!(response, Err(RuntimeError::ResizeNeeded)),
+                    "auto-resizing database returned a ResizeNeeded error"
+                );
+            }
+
+            // Send the response back, whether it's an `Ok` or `Err`.
+            if let Err(e) = response_sender.send(response) {
+                // TODO: use tracing.
+                println!("database writer failed to send response: {e:?}");
+            }
+
+            continue 'main;
+        }
+
+        // Above retry loop should either:
+        // - continue to the next `'main` loop or...
+        // - ...retry until panic
+        unreachable!();
+    }
+}
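
The split makes the `tower::Service` plumbing reusable by any store, not just the blockchain database: `cuprate-database-service` is generic over the request and response types, while `cuprate-blockchain` now only supplies `map_request`/`handle_bc_request` plus the `BCReadHandle`/`BCWriteHandle` aliases. Below is a minimal sketch of how another store (e.g. the planned `storage/txpool`) could wire itself up on top of the new crate; the `My*` types, the handler bodies, and `init_my_service` are hypothetical placeholders, and only the `cuprate_database_service` items are from this patch:

```rust
use std::sync::Arc;

use cuprate_database::{ConcreteEnv, RuntimeError};
use cuprate_database_service::{
    init_thread_pool, DatabaseReadService, DatabaseWriteHandle, ReaderThreads,
};

// Hypothetical request/response types for some new store.
pub enum MyReadRequest {
    Len,
}
pub enum MyWriteRequest {
    Put(u64),
}
#[derive(Debug)]
pub enum MyResponse {
    Len(usize),
    Ok,
}

// Read handler: runs on the shared `rayon` pool, one invocation per request.
fn map_read(_env: &ConcreteEnv, req: MyReadRequest) -> Result<MyResponse, RuntimeError> {
    match req {
        // ...open a read transaction on `env` and do the actual lookup here...
        MyReadRequest::Len => Ok(MyResponse::Len(0)),
    }
}

// Write handler: runs on the single writer thread; `ResizeNeeded` errors are
// retried by `database_writer()` on manually-resizing backends.
fn map_write(_env: &ConcreteEnv, req: &MyWriteRequest) -> Result<MyResponse, RuntimeError> {
    match req {
        // ...open a write transaction on `env`, insert, and commit here...
        MyWriteRequest::Put(_value) => Ok(MyResponse::Ok),
    }
}

/// Build both handles, mirroring what `cuprate_blockchain::service` now does.
pub fn init_my_service(
    env: Arc<ConcreteEnv>,
    threads: ReaderThreads,
) -> (
    DatabaseReadService<MyReadRequest, MyResponse, RuntimeError>,
    DatabaseWriteHandle<MyWriteRequest, MyResponse>,
) {
    let read = DatabaseReadService::new(Arc::clone(&env), init_thread_pool(threads), map_read);
    let write = DatabaseWriteHandle::init(env, map_write);
    (read, write)
}
```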
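On the caller side the flow is unchanged, apart from `init()` returning the environment as a third tuple element (it was previously reachable through the read handle's `env()` method, which is gone). A sketch based on the updated doc-test in `storage/blockchain/src/service/mod.rs`; the `ChainHeight` request and the error plumbing here are illustrative, not taken verbatim from this patch:

```rust
use tower::{Service, ServiceExt};

use cuprate_blockchain::config::ConfigBuilder;
use cuprate_types::blockchain::{BCReadRequest, BCResponse};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ConfigBuilder::new().build();

    // `init()` now returns (read handle, write handle, env).
    let (mut read_handle, _write_handle, _env) = cuprate_blockchain::service::init(config)?;

    // The handles are plain `tower::Service`s, same as before the split.
    let response = read_handle
        .ready()
        .await?
        .call(BCReadRequest::ChainHeight)
        .await?;

    if let BCResponse::ChainHeight(height, top_hash) = response {
        println!("height: {height}, top hash: {top_hash:?}");
    }

    Ok(())
}
```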