mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-26 20:36:00 +00:00
split the DB service abstraction
This commit is contained in:
parent
aa718e224f
commit
f5d6d2e5a5
21 changed files with 389 additions and 617 deletions
69
Cargo.lock
generated
69
Cargo.lock
generated
|
@ -29,12 +29,6 @@ dependencies = [
|
|||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "allocator-api2"
|
||||
version = "0.2.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
|
||||
|
||||
[[package]]
|
||||
name = "android-tzdata"
|
||||
version = "0.1.1"
|
||||
|
@ -471,7 +465,7 @@ dependencies = [
|
|||
"cuprate-test-utils",
|
||||
"cuprate-wire",
|
||||
"futures",
|
||||
"indexmap 2.2.6",
|
||||
"indexmap",
|
||||
"rand",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
|
@ -496,14 +490,13 @@ version = "0.0.0"
|
|||
dependencies = [
|
||||
"bitflags 2.5.0",
|
||||
"bytemuck",
|
||||
"crossbeam",
|
||||
"cuprate-database",
|
||||
"cuprate-database-service",
|
||||
"cuprate-helper",
|
||||
"cuprate-pruning",
|
||||
"cuprate-test-utils",
|
||||
"cuprate-types",
|
||||
"curve25519-dalek",
|
||||
"futures",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"monero-serai",
|
||||
|
@ -514,7 +507,6 @@ dependencies = [
|
|||
"tempfile",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
]
|
||||
|
||||
|
@ -607,6 +599,19 @@ dependencies = [
|
|||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprate-database-service"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"crossbeam",
|
||||
"cuprate-database",
|
||||
"cuprate-helper",
|
||||
"futures",
|
||||
"rayon",
|
||||
"serde",
|
||||
"tower",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprate-epee-encoding"
|
||||
version = "0.5.0"
|
||||
|
@ -707,7 +712,7 @@ dependencies = [
|
|||
"dashmap",
|
||||
"futures",
|
||||
"hex",
|
||||
"indexmap 2.2.6",
|
||||
"indexmap",
|
||||
"monero-serai",
|
||||
"pin-project",
|
||||
"proptest",
|
||||
|
@ -876,7 +881,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown 0.14.5",
|
||||
"hashbrown",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
|
@ -1168,12 +1173,6 @@ dependencies = [
|
|||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
|
||||
|
||||
[[package]]
|
||||
name = "hashbrown"
|
||||
version = "0.14.5"
|
||||
|
@ -1181,17 +1180,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"allocator-api2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hdrhistogram"
|
||||
version = "7.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1515,16 +1503,6 @@ dependencies = [
|
|||
"utf8_iter",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.2.6"
|
||||
|
@ -1532,7 +1510,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.14.5",
|
||||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2458,7 +2436,7 @@ name = "std-shims"
|
|||
version = "0.1.1"
|
||||
source = "git+https://github.com/Cuprate/serai.git?rev=d27d934#d27d93480aa8a849d84214ad4c71d83ce6fea0c1"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
"hashbrown",
|
||||
"spin",
|
||||
]
|
||||
|
||||
|
@ -2663,10 +2641,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
|
|||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"hashbrown 0.14.5",
|
||||
"pin-project-lite",
|
||||
"slab",
|
||||
"tokio",
|
||||
|
@ -2685,7 +2660,7 @@ version = "0.21.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
|
||||
dependencies = [
|
||||
"indexmap 2.2.6",
|
||||
"indexmap",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
@ -2698,12 +2673,8 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
|||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"hdrhistogram",
|
||||
"indexmap 1.9.3",
|
||||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-layer",
|
||||
|
|
|
@ -17,6 +17,7 @@ members = [
|
|||
"p2p/async-buffer",
|
||||
"p2p/address-book",
|
||||
"storage/blockchain",
|
||||
"storage/service",
|
||||
"storage/txpool",
|
||||
"storage/database",
|
||||
"pruning",
|
||||
|
|
|
@ -4,7 +4,7 @@ use clap::Parser;
|
|||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_blockchain::{
|
||||
config::ConfigBuilder, cuprate_database::RuntimeError, service::DatabaseReadHandle,
|
||||
config::ConfigBuilder, cuprate_database::RuntimeError, service::BCReadHandle,
|
||||
};
|
||||
use cuprate_types::blockchain::{BCReadRequest, BCResponse};
|
||||
|
||||
|
@ -13,7 +13,7 @@ use cuprate_fast_sync::{hash_of_hashes, BlockId, HashOfHashes};
|
|||
const BATCH_SIZE: u64 = 512;
|
||||
|
||||
async fn read_batch(
|
||||
handle: &mut DatabaseReadHandle,
|
||||
handle: &mut BCReadHandle,
|
||||
height_from: u64,
|
||||
) -> Result<Vec<BlockId>, RuntimeError> {
|
||||
let mut block_ids = Vec::<BlockId>::with_capacity(BATCH_SIZE as usize);
|
||||
|
@ -60,7 +60,7 @@ async fn main() {
|
|||
|
||||
let config = ConfigBuilder::new().build();
|
||||
|
||||
let (mut read_handle, _) = cuprate_blockchain::service::init(config).unwrap();
|
||||
let (mut read_handle, _, _) = cuprate_blockchain::service::init(config).unwrap();
|
||||
|
||||
let mut hashes_of_hashes = Vec::new();
|
||||
|
||||
|
|
|
@ -15,13 +15,14 @@ default = ["heed", "service"]
|
|||
heed = ["cuprate-database/heed"]
|
||||
redb = ["cuprate-database/redb"]
|
||||
redb-memory = ["cuprate-database/redb-memory"]
|
||||
service = ["dep:crossbeam", "dep:futures", "dep:tokio", "dep:tokio-util", "dep:tower", "dep:rayon"]
|
||||
service = ["dep:thread_local", "dep:rayon"]
|
||||
|
||||
[dependencies]
|
||||
# FIXME:
|
||||
# We only need the `thread` feature if `service` is enabled.
|
||||
# Figure out how to enable features of an already pulled in dependency conditionally.
|
||||
cuprate-database = { path = "../database" }
|
||||
cuprate-database-service = { path = "../service" }
|
||||
cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] }
|
||||
cuprate-types = { path = "../../types", features = ["blockchain"] }
|
||||
|
||||
|
@ -33,19 +34,16 @@ monero-serai = { workspace = true, features = ["std"] }
|
|||
serde = { workspace = true, optional = true }
|
||||
|
||||
# `service` feature.
|
||||
crossbeam = { workspace = true, features = ["std"], optional = true }
|
||||
futures = { workspace = true, optional = true }
|
||||
tokio = { workspace = true, features = ["full"], optional = true }
|
||||
tokio-util = { workspace = true, features = ["full"], optional = true }
|
||||
tower = { workspace = true, features = ["full"], optional = true }
|
||||
thread_local = { workspace = true }
|
||||
thread_local = { workspace = true, optional = true }
|
||||
rayon = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
cuprate-helper = { path = "../../helper", features = ["thread"] }
|
||||
cuprate-test-utils = { path = "../../test-utils" }
|
||||
|
||||
tempfile = { version = "3.10.0" }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tower = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
proptest = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
//! SOMEDAY
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Import
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
num::NonZeroUsize,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use cuprate_helper::fs::cuprate_blockchain_dir;
|
||||
|
||||
use crate::{
|
||||
config::{ReaderThreads, SyncMode},
|
||||
constants::DATABASE_DATA_FILENAME,
|
||||
resize::ResizeAlgorithm,
|
||||
};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Backend
|
||||
/// SOMEDAY: allow runtime hot-swappable backends.
|
||||
#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
pub enum Backend {
|
||||
#[default]
|
||||
/// SOMEDAY
|
||||
Heed,
|
||||
/// SOMEDAY
|
||||
Redb,
|
||||
}
|
|
@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize};
|
|||
use cuprate_database::{config::SyncMode, resize::ResizeAlgorithm};
|
||||
use cuprate_helper::fs::cuprate_blockchain_dir;
|
||||
|
||||
use crate::config::ReaderThreads;
|
||||
|
||||
// re-exports
|
||||
pub use cuprate_database_service::ReaderThreads;
|
||||
//---------------------------------------------------------------------------------------------------- ConfigBuilder
|
||||
/// Builder for [`Config`].
|
||||
///
|
||||
|
|
|
@ -34,14 +34,11 @@
|
|||
//! .build();
|
||||
//!
|
||||
//! // Start a database `service` using this configuration.
|
||||
//! let (reader_handle, _) = cuprate_blockchain::service::init(config.clone())?;
|
||||
//! let (_, _, env) = cuprate_blockchain::service::init(config.clone())?;
|
||||
//! // It's using the config we provided.
|
||||
//! assert_eq!(reader_handle.env().config(), &config.db_config);
|
||||
//! assert_eq!(env.config(), &config.db_config);
|
||||
//! # Ok(()) }
|
||||
//! ```
|
||||
|
||||
mod config;
|
||||
pub use config::{Config, ConfigBuilder};
|
||||
|
||||
mod reader_threads;
|
||||
pub use reader_threads::ReaderThreads;
|
||||
pub use config::{Config, ConfigBuilder, ReaderThreads};
|
||||
|
|
|
@ -1,135 +0,0 @@
|
|||
//! Database [`Env`](crate::Env) configuration.
|
||||
//!
|
||||
//! This module contains the main [`Config`]uration struct
|
||||
//! for the database [`Env`](crate::Env)ironment, and data
|
||||
//! structures related to any configuration setting.
|
||||
//!
|
||||
//! These configurations are processed at runtime, meaning
|
||||
//! the `Env` can/will dynamically adjust its behavior
|
||||
//! based on these values.
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Import
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- SyncMode
|
||||
/// Disk synchronization mode.
|
||||
///
|
||||
/// This controls how/when the database syncs its data to disk.
|
||||
///
|
||||
/// Regardless of the variant chosen, dropping [`Env`](crate::Env)
|
||||
/// will always cause it to fully sync to disk.
|
||||
///
|
||||
/// # Sync vs Async
|
||||
/// All invariants except [`SyncMode::Async`] & [`SyncMode::Fast`]
|
||||
/// are `synchronous`, as in the database will wait until the OS has
|
||||
/// finished syncing all the data to disk before continuing.
|
||||
///
|
||||
/// `SyncMode::Async` & `SyncMode::Fast` are `asynchronous`, meaning
|
||||
/// the database will _NOT_ wait until the data is fully synced to disk
|
||||
/// before continuing. Note that this doesn't mean the database itself
|
||||
/// won't be synchronized between readers/writers, but rather that the
|
||||
/// data _on disk_ may not be immediately synchronized after a write.
|
||||
///
|
||||
/// Something like:
|
||||
/// ```rust,ignore
|
||||
/// db.put("key", value);
|
||||
/// db.get("key");
|
||||
/// ```
|
||||
/// will be fine, most likely pulling from memory instead of disk.
|
||||
///
|
||||
/// # SOMEDAY
|
||||
/// Dynamic sync's are not yet supported.
|
||||
///
|
||||
/// Only:
|
||||
///
|
||||
/// - [`SyncMode::Safe`]
|
||||
/// - [`SyncMode::Async`]
|
||||
/// - [`SyncMode::Fast`]
|
||||
///
|
||||
/// are supported, all other variants will panic on [`crate::Env::open`].
|
||||
#[derive(Copy, Clone, Debug, Default, PartialEq, PartialOrd, Eq, Ord, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
pub enum SyncMode {
|
||||
/// Use [`SyncMode::Fast`] until fully synced,
|
||||
/// then use [`SyncMode::Safe`].
|
||||
///
|
||||
// # SOMEDAY: how to implement this?
|
||||
// ref: <https://github.com/monero-project/monero/issues/1463>
|
||||
// monerod-solution: <https://github.com/monero-project/monero/pull/1506>
|
||||
// cuprate-issue: <https://github.com/Cuprate/cuprate/issues/78>
|
||||
//
|
||||
// We could:
|
||||
// ```rust,ignore
|
||||
// if current_db_block <= top_block.saturating_sub(N) {
|
||||
// // don't sync()
|
||||
// } else {
|
||||
// // sync()
|
||||
// }
|
||||
// ```
|
||||
// where N is some threshold we pick that is _close_ enough
|
||||
// to being synced where we want to start being safer.
|
||||
//
|
||||
// Essentially, when we are in a certain % range of being finished,
|
||||
// switch to safe mode, until then, go fast.
|
||||
FastThenSafe,
|
||||
|
||||
#[default]
|
||||
/// Fully sync to disk per transaction.
|
||||
///
|
||||
/// Every database transaction commit will
|
||||
/// fully sync all data to disk, _synchronously_,
|
||||
/// so the database (writer) halts until synced.
|
||||
///
|
||||
/// This is expected to be very slow.
|
||||
///
|
||||
/// This matches:
|
||||
/// - LMDB without any special sync flags
|
||||
/// - [`redb::Durability::Immediate`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.Immediate)
|
||||
Safe,
|
||||
|
||||
/// Asynchrously sync to disk per transaction.
|
||||
///
|
||||
/// This is the same as [`SyncMode::Safe`],
|
||||
/// but the syncs will be asynchronous, i.e.
|
||||
/// each transaction commit will sync to disk,
|
||||
/// but only eventually, not necessarily immediately.
|
||||
///
|
||||
/// This matches:
|
||||
/// - [`MDB_MAPASYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#gab034ed0d8e5938090aef5ee0997f7e94)
|
||||
/// - [`redb::Durability::Eventual`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.Eventual)
|
||||
Async,
|
||||
|
||||
/// Fully sync to disk after we cross this transaction threshold.
|
||||
///
|
||||
/// After committing [`usize`] amount of database
|
||||
/// transactions, it will be sync to disk.
|
||||
///
|
||||
/// `0` behaves the same as [`SyncMode::Safe`], and a ridiculously large
|
||||
/// number like `usize::MAX` is practically the same as [`SyncMode::Fast`].
|
||||
Threshold(usize),
|
||||
|
||||
/// Only flush at database shutdown.
|
||||
///
|
||||
/// This is the fastest, yet unsafest option.
|
||||
///
|
||||
/// It will cause the database to never _actively_ sync,
|
||||
/// letting the OS decide when to flush data to disk.
|
||||
///
|
||||
/// This matches:
|
||||
/// - [`MDB_NOSYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#ga5791dd1adb09123f82dd1f331209e12e) + [`MDB_MAPASYNC`](http://www.lmdb.tech/doc/group__mdb__env.html#gab034ed0d8e5938090aef5ee0997f7e94)
|
||||
/// - [`redb::Durability::None`](https://docs.rs/redb/1.5.0/redb/enum.Durability.html#variant.None)
|
||||
///
|
||||
/// `monerod` reference: <https://github.com/monero-project/monero/blob/7b7958bbd9d76375c47dc418b4adabba0f0b1785/src/blockchain_db/lmdb/db_lmdb.cpp#L1380-L1381>
|
||||
///
|
||||
/// # Corruption
|
||||
/// In the case of a system crash, the database
|
||||
/// may become corrupted when using this option.
|
||||
//
|
||||
// FIXME: we could call this `unsafe`
|
||||
// and use that terminology in the config file
|
||||
// so users know exactly what they are getting
|
||||
// themselves into.
|
||||
Fast,
|
||||
}
|
|
@ -52,7 +52,7 @@
|
|||
unused_crate_dependencies,
|
||||
unused_doc_comments,
|
||||
unused_mut,
|
||||
missing_docs,
|
||||
//missing_docs,
|
||||
deprecated,
|
||||
unused_comparisons,
|
||||
nonstandard_style
|
||||
|
|
|
@ -3,11 +3,12 @@
|
|||
//---------------------------------------------------------------------------------------------------- Import
|
||||
use std::sync::Arc;
|
||||
|
||||
use cuprate_database::InitError;
|
||||
use cuprate_database::{ConcreteEnv, InitError};
|
||||
|
||||
use crate::service::{init_read_service, init_write_service};
|
||||
use crate::{
|
||||
config::Config,
|
||||
service::{DatabaseReadHandle, DatabaseWriteHandle},
|
||||
service::types::{BCReadHandle, BCWriteHandle},
|
||||
};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Init
|
||||
|
@ -20,17 +21,17 @@ use crate::{
|
|||
///
|
||||
/// # Errors
|
||||
/// This will forward the error if [`crate::open`] failed.
|
||||
pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle), InitError> {
|
||||
pub fn init(config: Config) -> Result<(BCReadHandle, BCWriteHandle, Arc<ConcreteEnv>), InitError> {
|
||||
let reader_threads = config.reader_threads;
|
||||
|
||||
// Initialize the database itself.
|
||||
let db = Arc::new(crate::open(config)?);
|
||||
|
||||
// Spawn the Reader thread pool and Writer.
|
||||
let readers = DatabaseReadHandle::init(&db, reader_threads);
|
||||
let writer = DatabaseWriteHandle::init(db);
|
||||
let readers = init_read_service(db.clone(), reader_threads);
|
||||
let writer = init_write_service(db.clone());
|
||||
|
||||
Ok((readers, writer))
|
||||
Ok((readers, writer, db))
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Compact history
|
||||
|
|
|
@ -81,11 +81,11 @@
|
|||
//! .build();
|
||||
//!
|
||||
//! // Initialize the database thread-pool.
|
||||
//! let (mut read_handle, mut write_handle) = cuprate_blockchain::service::init(config)?;
|
||||
//! let (mut read_handle, mut write_handle, _) = cuprate_blockchain::service::init(config)?;
|
||||
//!
|
||||
//! // Prepare a request to write block.
|
||||
//! let mut block = block_v16_tx0().clone();
|
||||
//! # block.height = 0 as u64; // must be 0th height or panic in `add_block()`
|
||||
//! # block.height = 0_u64; // must be 0th height or panic in `add_block()`
|
||||
//! let request = BCWriteRequest::WriteBlock(block);
|
||||
//!
|
||||
//! // Send the request.
|
||||
|
@ -119,16 +119,17 @@
|
|||
//! ```
|
||||
|
||||
mod read;
|
||||
pub use read::DatabaseReadHandle;
|
||||
pub use read::{init_read_service, init_read_service_with_pool};
|
||||
|
||||
mod write;
|
||||
pub use write::DatabaseWriteHandle;
|
||||
pub use write::init_write_service;
|
||||
|
||||
mod free;
|
||||
pub use free::init;
|
||||
|
||||
// Internal type aliases for `service`.
|
||||
mod types;
|
||||
pub use types::{BCReadHandle, BCWriteHandle};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
|
|
@ -4,24 +4,21 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::{channel::oneshot, ready};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use rayon::ThreadPool;
|
||||
use thread_local::ThreadLocal;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio_util::sync::PollSemaphore;
|
||||
|
||||
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
|
||||
use cuprate_helper::{asynch::InfallibleOneshotReceiver, map::combine_low_high_bits_to_u128};
|
||||
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
|
||||
use cuprate_helper::map::combine_low_high_bits_to_u128;
|
||||
use cuprate_types::{
|
||||
blockchain::{BCReadRequest, BCResponse},
|
||||
ExtendedBlockHeader, OutputOnChain,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::ReaderThreads,
|
||||
ops::{
|
||||
block::{
|
||||
block_exists, get_block_extended_header_from_height, get_block_height, get_block_info,
|
||||
|
@ -32,7 +29,7 @@ use crate::{
|
|||
},
|
||||
service::{
|
||||
free::{compact_history_genesis_not_included, compact_history_index_to_height_offset},
|
||||
types::{ResponseReceiver, ResponseResult, ResponseSender},
|
||||
types::{BCReadHandle, ResponseResult},
|
||||
},
|
||||
tables::OpenTables,
|
||||
tables::{BlockHeights, BlockInfos, Tables},
|
||||
|
@ -40,148 +37,13 @@ use crate::{
|
|||
};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- DatabaseReadHandle
|
||||
/// Read handle to the database.
|
||||
///
|
||||
/// This is cheaply [`Clone`]able handle that
|
||||
/// allows `async`hronously reading from the database.
|
||||
///
|
||||
/// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`BCReadRequest`]
|
||||
/// will return an `async`hronous channel that can be `.await`ed upon
|
||||
/// to receive the corresponding [`BCResponse`].
|
||||
pub struct DatabaseReadHandle {
|
||||
/// Handle to the custom `rayon` DB reader thread-pool.
|
||||
///
|
||||
/// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool,
|
||||
/// and responses are returned via a channel we (the caller) provide.
|
||||
pool: Arc<rayon::ThreadPool>,
|
||||
|
||||
/// Counting semaphore asynchronous permit for database access.
|
||||
/// Each [`tower::Service::poll_ready`] will acquire a permit
|
||||
/// before actually sending a request to the `rayon` DB threadpool.
|
||||
semaphore: PollSemaphore,
|
||||
|
||||
/// An owned permit.
|
||||
/// This will be set to [`Some`] in `poll_ready()` when we successfully acquire
|
||||
/// the permit, and will be [`Option::take()`]n after `tower::Service::call()` is called.
|
||||
///
|
||||
/// The actual permit will be dropped _after_ the rayon DB thread has finished
|
||||
/// the request, i.e., after [`map_request()`] finishes.
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
|
||||
/// Access to the database.
|
||||
env: Arc<ConcreteEnv>,
|
||||
pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BCReadHandle {
|
||||
init_read_service_with_pool(env, init_thread_pool(threads))
|
||||
}
|
||||
|
||||
// `OwnedSemaphorePermit` does not implement `Clone`,
|
||||
// so manually clone all elements, while keeping `permit`
|
||||
// `None` across clones.
|
||||
impl Clone for DatabaseReadHandle {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
pool: Arc::clone(&self.pool),
|
||||
semaphore: self.semaphore.clone(),
|
||||
permit: None,
|
||||
env: Arc::clone(&self.env),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseReadHandle {
|
||||
/// Initialize the `DatabaseReader` thread-pool backed by `rayon`.
|
||||
///
|
||||
/// This spawns `N` amount of `DatabaseReader`'s
|
||||
/// attached to `env` and returns a handle to the pool.
|
||||
///
|
||||
/// Should be called _once_ per actual database.
|
||||
#[cold]
|
||||
#[inline(never)] // Only called once.
|
||||
pub(super) fn init(env: &Arc<ConcreteEnv>, reader_threads: ReaderThreads) -> Self {
|
||||
// How many reader threads to spawn?
|
||||
let reader_count = reader_threads.as_threads().get();
|
||||
|
||||
// Spawn `rayon` reader threadpool.
|
||||
let pool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(reader_count)
|
||||
.thread_name(|i| format!("cuprate_helper::service::read::DatabaseReader{i}"))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// Create a semaphore with the same amount of
|
||||
// permits as the amount of reader threads.
|
||||
let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(reader_count)));
|
||||
|
||||
// Return a handle to the pool.
|
||||
Self {
|
||||
pool: Arc::new(pool),
|
||||
semaphore,
|
||||
permit: None,
|
||||
env: Arc::clone(env),
|
||||
}
|
||||
}
|
||||
|
||||
/// Access to the actual database environment.
|
||||
///
|
||||
/// # ⚠️ Warning
|
||||
/// This function gives you access to the actual
|
||||
/// underlying database connected to by `self`.
|
||||
///
|
||||
/// I.e. it allows you to read/write data _directly_
|
||||
/// instead of going through a request.
|
||||
///
|
||||
/// Be warned that using the database directly
|
||||
/// in this manner has not been tested.
|
||||
#[inline]
|
||||
pub const fn env(&self) -> &Arc<ConcreteEnv> {
|
||||
&self.env
|
||||
}
|
||||
}
|
||||
|
||||
impl tower::Service<BCReadRequest> for DatabaseReadHandle {
|
||||
type Response = BCResponse;
|
||||
type Error = RuntimeError;
|
||||
type Future = ResponseReceiver;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
// Check if we already have a permit.
|
||||
if self.permit.is_some() {
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
// Acquire a permit before returning `Ready`.
|
||||
let permit =
|
||||
ready!(self.semaphore.poll_acquire(cx)).expect("this semaphore is never closed");
|
||||
|
||||
self.permit = Some(permit);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn call(&mut self, request: BCReadRequest) -> Self::Future {
|
||||
let permit = self
|
||||
.permit
|
||||
.take()
|
||||
.expect("poll_ready() should have acquire a permit before calling call()");
|
||||
|
||||
// Response channel we `.await` on.
|
||||
let (response_sender, receiver) = oneshot::channel();
|
||||
|
||||
// Spawn the request in the rayon DB thread-pool.
|
||||
//
|
||||
// Note that this uses `self.pool` instead of `rayon::spawn`
|
||||
// such that any `rayon` parallel code that runs within
|
||||
// the passed closure uses the same `rayon` threadpool.
|
||||
//
|
||||
// INVARIANT:
|
||||
// The below `DatabaseReader` function impl block relies on this behavior.
|
||||
let env = Arc::clone(&self.env);
|
||||
self.pool.spawn(move || {
|
||||
let _permit: OwnedSemaphorePermit = permit;
|
||||
map_request(&env, request, response_sender);
|
||||
}); // drop(permit/env);
|
||||
|
||||
InfallibleOneshotReceiver::from(receiver)
|
||||
}
|
||||
pub fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) -> BCReadHandle {
|
||||
DatabaseReadService::new(env, pool, map_request)
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Request Mapping
|
||||
|
@ -194,12 +56,11 @@ impl tower::Service<BCReadRequest> for DatabaseReadHandle {
|
|||
/// The basic structure is:
|
||||
/// 1. `Request` is mapped to a handler function
|
||||
/// 2. Handler function is called
|
||||
/// 3. [`BCResponse`] is sent
|
||||
/// 3. [`BCResponse`] is returned
|
||||
fn map_request(
|
||||
env: &ConcreteEnv, // Access to the database
|
||||
request: BCReadRequest, // The request we must fulfill
|
||||
response_sender: ResponseSender, // The channel we must send the response back to
|
||||
) {
|
||||
env: &ConcreteEnv, // Access to the database
|
||||
request: BCReadRequest, // The request we must fulfill
|
||||
) -> ResponseResult {
|
||||
use BCReadRequest as R;
|
||||
|
||||
/* SOMEDAY: pre-request handling, run some code for each request? */
|
||||
|
@ -218,12 +79,9 @@ fn map_request(
|
|||
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
|
||||
};
|
||||
|
||||
if let Err(e) = response_sender.send(response) {
|
||||
// TODO: use tracing.
|
||||
println!("database reader failed to send response: {e:?}");
|
||||
}
|
||||
|
||||
/* SOMEDAY: post-request handling, run some code for each request? */
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Thread Local
|
||||
|
|
|
@ -29,7 +29,7 @@ use crate::{
|
|||
blockchain::chain_height,
|
||||
output::id_to_output_on_chain,
|
||||
},
|
||||
service::{init, DatabaseReadHandle, DatabaseWriteHandle},
|
||||
service::{init, BCReadHandle, BCWriteHandle},
|
||||
tables::{OpenTables, Tables, TablesIter},
|
||||
tests::AssertTableLen,
|
||||
types::{Amount, AmountIndex, PreRctOutputId},
|
||||
|
@ -38,8 +38,8 @@ use crate::{
|
|||
//---------------------------------------------------------------------------------------------------- Helper functions
|
||||
/// Initialize the `service`.
|
||||
fn init_service() -> (
|
||||
DatabaseReadHandle,
|
||||
DatabaseWriteHandle,
|
||||
BCReadHandle,
|
||||
BCWriteHandle,
|
||||
Arc<ConcreteEnv>,
|
||||
tempfile::TempDir,
|
||||
) {
|
||||
|
@ -48,8 +48,7 @@ fn init_service() -> (
|
|||
.db_directory(Cow::Owned(tempdir.path().into()))
|
||||
.low_power()
|
||||
.build();
|
||||
let (reader, writer) = init(config).unwrap();
|
||||
let env = reader.env().clone();
|
||||
let (reader, writer, env) = init(config).unwrap();
|
||||
(reader, writer, env, tempdir)
|
||||
}
|
||||
|
||||
|
|
|
@ -3,11 +3,9 @@
|
|||
//! Only used internally for our `tower::Service` impls.
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Use
|
||||
use futures::channel::oneshot::Sender;
|
||||
|
||||
use cuprate_database::RuntimeError;
|
||||
use cuprate_helper::asynch::InfallibleOneshotReceiver;
|
||||
use cuprate_types::blockchain::BCResponse;
|
||||
use cuprate_database_service::{DatabaseReadService, DatabaseWriteHandle};
|
||||
use cuprate_types::blockchain::{BCReadRequest, BCResponse, BCWriteRequest};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Types
|
||||
/// The actual type of the response.
|
||||
|
@ -15,16 +13,6 @@ use cuprate_types::blockchain::BCResponse;
|
|||
/// Either our [`BCResponse`], or a database error occurred.
|
||||
pub(super) type ResponseResult = Result<BCResponse, RuntimeError>;
|
||||
|
||||
/// The `Receiver` channel that receives the read response.
|
||||
///
|
||||
/// This is owned by the caller (the reader/writer thread)
|
||||
/// who `.await`'s for the response.
|
||||
///
|
||||
/// The channel itself should never fail,
|
||||
/// but the actual database operation might.
|
||||
pub(super) type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
|
||||
pub type BCWriteHandle = DatabaseWriteHandle<BCWriteRequest, BCResponse>;
|
||||
|
||||
/// The `Sender` channel for the response.
|
||||
///
|
||||
/// The database reader/writer thread uses this to send the database result to the caller.
|
||||
pub(super) type ResponseSender = Sender<ResponseResult>;
|
||||
pub type BCReadHandle = DatabaseReadService<BCReadRequest, BCResponse, RuntimeError>;
|
||||
|
|
|
@ -1,209 +1,28 @@
|
|||
//! Database writer thread definitions and logic.
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Import
|
||||
use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use std::sync::Arc;
|
||||
|
||||
use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
|
||||
use cuprate_helper::asynch::InfallibleOneshotReceiver;
|
||||
use cuprate_database_service::DatabaseWriteHandle;
|
||||
use cuprate_types::{
|
||||
blockchain::{BCResponse, BCWriteRequest},
|
||||
VerifiedBlockInformation,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
|
||||
service::types::{BCWriteHandle, ResponseResult},
|
||||
tables::OpenTables,
|
||||
};
|
||||
//---------------------------------------------------------------------------------------------------- init_write_service
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Constants
|
||||
/// Name of the writer thread.
|
||||
const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
|
||||
/// Write handle to the database.
|
||||
///
|
||||
/// This is handle that allows `async`hronously writing to the database,
|
||||
/// it is not [`Clone`]able as there is only ever 1 place within Cuprate
|
||||
/// that writes.
|
||||
///
|
||||
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] & [`BCWriteRequest`]
|
||||
/// will return an `async`hronous channel that can be `.await`ed upon
|
||||
/// to receive the corresponding [`BCResponse`].
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseWriteHandle {
|
||||
/// Sender channel to the database write thread-pool.
|
||||
///
|
||||
/// We provide the response channel for the thread-pool.
|
||||
pub(super) sender: crossbeam::channel::Sender<(BCWriteRequest, ResponseSender)>,
|
||||
pub fn init_write_service(env: Arc<ConcreteEnv>) -> BCWriteHandle {
|
||||
DatabaseWriteHandle::init(env, handle_bc_request)
|
||||
}
|
||||
|
||||
impl DatabaseWriteHandle {
|
||||
/// Initialize the single `DatabaseWriter` thread.
|
||||
#[cold]
|
||||
#[inline(never)] // Only called once.
|
||||
pub(super) fn init(env: Arc<ConcreteEnv>) -> Self {
|
||||
// Initialize `Request/Response` channels.
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
|
||||
// Spawn the writer.
|
||||
std::thread::Builder::new()
|
||||
.name(WRITER_THREAD_NAME.into())
|
||||
.spawn(move || {
|
||||
let this = DatabaseWriter { receiver, env };
|
||||
DatabaseWriter::main(this);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Self { sender }
|
||||
}
|
||||
}
|
||||
|
||||
impl tower::Service<BCWriteRequest> for DatabaseWriteHandle {
|
||||
type Response = BCResponse;
|
||||
type Error = RuntimeError;
|
||||
type Future = ResponseReceiver;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn call(&mut self, request: BCWriteRequest) -> Self::Future {
|
||||
// Response channel we `.await` on.
|
||||
let (response_sender, receiver) = oneshot::channel();
|
||||
|
||||
// Send the write request.
|
||||
self.sender.send((request, response_sender)).unwrap();
|
||||
|
||||
InfallibleOneshotReceiver::from(receiver)
|
||||
}
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- DatabaseWriter
|
||||
/// The single database writer thread.
|
||||
pub(super) struct DatabaseWriter {
|
||||
/// Receiver side of the database request channel.
|
||||
///
|
||||
/// Any caller can send some requests to this channel.
|
||||
/// They send them alongside another `Response` channel,
|
||||
/// which we will eventually send to.
|
||||
receiver: crossbeam::channel::Receiver<(BCWriteRequest, ResponseSender)>,
|
||||
|
||||
/// Access to the database.
|
||||
env: Arc<ConcreteEnv>,
|
||||
}
|
||||
|
||||
impl Drop for DatabaseWriter {
|
||||
fn drop(&mut self) {
|
||||
// TODO: log the writer thread has exited?
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseWriter {
|
||||
/// The `DatabaseWriter`'s main function.
|
||||
///
|
||||
/// The writer just loops in this function, handling requests forever
|
||||
/// until the request channel is dropped or a panic occurs.
|
||||
#[cold]
|
||||
#[inline(never)] // Only called once.
|
||||
fn main(self) {
|
||||
// 1. Hang on request channel
|
||||
// 2. Map request to some database function
|
||||
// 3. Execute that function, get the result
|
||||
// 4. Return the result via channel
|
||||
'main: loop {
|
||||
let Ok((request, response_sender)) = self.receiver.recv() else {
|
||||
// If this receive errors, it means that the channel is empty
|
||||
// and disconnected, meaning the other side (all senders) have
|
||||
// been dropped. This means "shutdown", and we return here to
|
||||
// exit the thread.
|
||||
//
|
||||
// Since the channel is empty, it means we've also processed
|
||||
// all requests. Since it is disconnected, it means future
|
||||
// ones cannot come in.
|
||||
return;
|
||||
};
|
||||
|
||||
/// How many times should we retry handling the request on resize errors?
|
||||
///
|
||||
/// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
|
||||
const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
|
||||
|
||||
// Map [`Request`]'s to specific database functions.
|
||||
//
|
||||
// Both will:
|
||||
// 1. Map the request to a function
|
||||
// 2. Call the function
|
||||
// 3. (manual resize only) If resize is needed, resize and retry
|
||||
// 4. (manual resize only) Redo step {1, 2}
|
||||
// 5. Send the function's `Result` back to the requester
|
||||
//
|
||||
// FIXME: there's probably a more elegant way
|
||||
// to represent this retry logic with recursive
|
||||
// functions instead of a loop.
|
||||
'retry: for retry in 0..REQUEST_RETRY_LIMIT {
|
||||
// FIXME: will there be more than 1 write request?
|
||||
// this won't have to be an enum.
|
||||
let response = match &request {
|
||||
BCWriteRequest::WriteBlock(block) => write_block(&self.env, block),
|
||||
};
|
||||
|
||||
// If the database needs to resize, do so.
|
||||
if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded))
|
||||
{
|
||||
// If this is the last iteration of the outer `for` loop and we
|
||||
// encounter a resize error _again_, it means something is wrong.
|
||||
assert_ne!(
|
||||
retry, REQUEST_RETRY_LIMIT,
|
||||
"database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
|
||||
);
|
||||
|
||||
// Resize the map, and retry the request handling loop.
|
||||
//
|
||||
// FIXME:
|
||||
// We could pass in custom resizes to account for
|
||||
// batches, i.e., we're about to add ~5GB of data,
|
||||
// add that much instead of the default 1GB.
|
||||
// <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
|
||||
let old = self.env.current_map_size();
|
||||
let new = self.env.resize_map(None);
|
||||
|
||||
// TODO: use tracing.
|
||||
println!("resizing database memory map, old: {old}B, new: {new}B");
|
||||
|
||||
// Try handling the request again.
|
||||
continue 'retry;
|
||||
}
|
||||
|
||||
// Automatically resizing databases should not be returning a resize error.
|
||||
#[cfg(debug_assertions)]
|
||||
if !ConcreteEnv::MANUAL_RESIZE {
|
||||
assert!(
|
||||
!matches!(response, Err(RuntimeError::ResizeNeeded)),
|
||||
"auto-resizing database returned a ResizeNeeded error"
|
||||
);
|
||||
}
|
||||
|
||||
// Send the response back, whether if it's an `Ok` or `Err`.
|
||||
if let Err(e) = response_sender.send(response) {
|
||||
// TODO: use tracing.
|
||||
println!("database writer failed to send response: {e:?}");
|
||||
}
|
||||
|
||||
continue 'main;
|
||||
}
|
||||
|
||||
// Above retry loop should either:
|
||||
// - continue to the next ['main] loop or...
|
||||
// - ...retry until panic
|
||||
unreachable!();
|
||||
}
|
||||
fn handle_bc_request(env: &ConcreteEnv, req: &BCWriteRequest) -> Result<BCResponse, RuntimeError> {
|
||||
match req {
|
||||
BCWriteRequest::WriteBlock(block) => write_block(env, block),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
22
storage/service/Cargo.toml
Normal file
22
storage/service/Cargo.toml
Normal file
|
@ -0,0 +1,22 @@
|
|||
[package]
|
||||
name = "cuprate-database-service"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "Cuprate's database service abstraction"
|
||||
license = "MIT"
|
||||
authors = ["Boog900"]
|
||||
repository = "https://github.com/Cuprate/cuprate/tree/main/storage/service"
|
||||
keywords = ["cuprate", "service", "database"]
|
||||
|
||||
[dependencies]
|
||||
cuprate-database = { path = "../database" }
|
||||
cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] }
|
||||
|
||||
serde = { workspace = true, optional = true }
|
||||
rayon = { workspace = true }
|
||||
tower = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
crossbeam = { workspace = true, features = ["std"] }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
6
storage/service/src/lib.rs
Normal file
6
storage/service/src/lib.rs
Normal file
|
@ -0,0 +1,6 @@
|
|||
mod reader_threads;
|
||||
mod service;
|
||||
|
||||
pub use reader_threads::{init_thread_pool, ReaderThreads};
|
||||
|
||||
pub use service::{DatabaseReadService, DatabaseWriteHandle};
|
|
@ -9,10 +9,24 @@
|
|||
//! based on these values.
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Import
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use rayon::ThreadPool;
|
||||
#[cfg(feature = "serde")]
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
|
||||
// How many reader threads to spawn?
|
||||
let reader_count = reader_threads.as_threads().get();
|
||||
|
||||
Arc::new(
|
||||
rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(reader_count)
|
||||
.thread_name(|i| format!("{}::DatabaseReader({i})", module_path!()))
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- ReaderThreads
|
||||
/// Amount of database reader threads to spawn when using [`service`](crate::service).
|
||||
|
@ -48,7 +62,7 @@ pub enum ReaderThreads {
|
|||
/// as such, it is equal to [`ReaderThreads::OnePerThread`].
|
||||
///
|
||||
/// ```rust
|
||||
/// # use cuprate_blockchain::config::*;
|
||||
/// # use cuprate_database_service::*;
|
||||
/// let reader_threads = ReaderThreads::from(0_usize);
|
||||
/// assert!(matches!(reader_threads, ReaderThreads::OnePerThread));
|
||||
/// ```
|
||||
|
@ -80,7 +94,7 @@ pub enum ReaderThreads {
|
|||
/// non-zero, but not 1 thread, the minimum value 1 will be returned.
|
||||
///
|
||||
/// ```rust
|
||||
/// # use cuprate_blockchain::config::*;
|
||||
/// # use cuprate_database_service::ReaderThreads;
|
||||
/// assert_eq!(ReaderThreads::Percent(0.000000001).as_threads().get(), 1);
|
||||
/// ```
|
||||
Percent(f32),
|
||||
|
@ -96,7 +110,7 @@ impl ReaderThreads {
|
|||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use cuprate_blockchain::config::ReaderThreads as R;
|
||||
/// use cuprate_database_service::ReaderThreads as R;
|
||||
///
|
||||
/// let total_threads: std::num::NonZeroUsize =
|
||||
/// cuprate_helper::thread::threads();
|
5
storage/service/src/service.rs
Normal file
5
storage/service/src/service.rs
Normal file
|
@ -0,0 +1,5 @@
|
|||
mod read;
|
||||
mod write;
|
||||
|
||||
pub use read::DatabaseReadService;
|
||||
pub use write::DatabaseWriteHandle;
|
84
storage/service/src/service/read.rs
Normal file
84
storage/service/src/service/read.rs
Normal file
|
@ -0,0 +1,84 @@
|
|||
use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use rayon::ThreadPool;
|
||||
use tower::Service;
|
||||
|
||||
use cuprate_database::ConcreteEnv;
|
||||
use cuprate_helper::asynch::InfallibleOneshotReceiver;
|
||||
|
||||
/// The [`rayon::ThreadPool`] service.
|
||||
///
|
||||
/// Uses an inner request handler and a rayon thread-pool to asynchronously handler requests.
|
||||
pub struct DatabaseReadService<Req, Res, Err> {
|
||||
/// The rayon thread-pool.
|
||||
pool: Arc<ThreadPool>,
|
||||
|
||||
/// The function used to handle request.
|
||||
inner_handler: Arc<dyn Fn(Req) -> Result<Res, Err> + Send + Sync + 'static>,
|
||||
}
|
||||
|
||||
impl<Req, Res, Err> Clone for DatabaseReadService<Req, Res, Err> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
pool: self.pool.clone(),
|
||||
inner_handler: self.inner_handler.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Res, Err> DatabaseReadService<Req, Res, Err>
|
||||
where
|
||||
Req: Send + 'static,
|
||||
Res: Send + 'static,
|
||||
Err: Send + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
env: Arc<ConcreteEnv>,
|
||||
pool: Arc<ThreadPool>,
|
||||
req_handler: impl Fn(&ConcreteEnv, Req) -> Result<Res, Err> + Send + Sync + 'static,
|
||||
) -> Self {
|
||||
let inner_handler = Arc::new(move |req| req_handler(&env, req));
|
||||
|
||||
Self {
|
||||
pool,
|
||||
inner_handler,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Res, Err> Service<Req> for DatabaseReadService<Req, Res, Err>
|
||||
where
|
||||
Req: Send + 'static,
|
||||
Res: Send + 'static,
|
||||
Err: Send + 'static,
|
||||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
// Response channel we `.await` on.
|
||||
let (response_sender, receiver) = oneshot::channel();
|
||||
|
||||
let handler = self.inner_handler.clone();
|
||||
|
||||
// Spawn the request in the rayon DB thread-pool.
|
||||
//
|
||||
// Note that this uses `self.pool` instead of `rayon::spawn`
|
||||
// such that any `rayon` parallel code that runs within
|
||||
// the passed closure uses the same `rayon` threadpool.
|
||||
self.pool.spawn(move || {
|
||||
drop(response_sender.send(handler(req)));
|
||||
});
|
||||
|
||||
InfallibleOneshotReceiver::from(receiver)
|
||||
}
|
||||
}
|
174
storage/service/src/service/write.rs
Normal file
174
storage/service/src/service/write.rs
Normal file
|
@ -0,0 +1,174 @@
|
|||
use cuprate_database::{ConcreteEnv, Env, RuntimeError};
|
||||
use cuprate_helper::asynch::InfallibleOneshotReceiver;
|
||||
use futures::channel::oneshot;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Constants
|
||||
/// Name of the writer thread.
|
||||
const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
|
||||
/// Write handle to the database.
|
||||
///
|
||||
/// This is handle that allows `async`hronously writing to the database.
|
||||
///
|
||||
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
|
||||
/// will return an `async`hronous channel that can be `.await`ed upon
|
||||
/// to receive the corresponding [`BCResponse`].
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseWriteHandle<Req, Res> {
|
||||
/// Sender channel to the database write thread-pool.
|
||||
///
|
||||
/// We provide the response channel for the thread-pool.
|
||||
pub(super) sender:
|
||||
crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
|
||||
}
|
||||
|
||||
impl<Req, Res> DatabaseWriteHandle<Req, Res>
|
||||
where
|
||||
Req: Send + 'static,
|
||||
Res: Debug + Send + 'static,
|
||||
{
|
||||
/// Initialize the single `DatabaseWriter` thread.
|
||||
#[cold]
|
||||
#[inline(never)] // Only called once.
|
||||
pub fn init(
|
||||
env: Arc<ConcreteEnv>,
|
||||
inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result<Res, RuntimeError> + Send + 'static,
|
||||
) -> Self {
|
||||
// Initialize `Request/Response` channels.
|
||||
let (sender, receiver) = crossbeam::channel::unbounded();
|
||||
|
||||
// Spawn the writer.
|
||||
std::thread::Builder::new()
|
||||
.name(WRITER_THREAD_NAME.into())
|
||||
.spawn(move || database_writer(env, receiver, inner_handler))
|
||||
.unwrap();
|
||||
|
||||
Self { sender }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
|
||||
type Response = Res;
|
||||
type Error = RuntimeError;
|
||||
type Future = InfallibleOneshotReceiver<Result<Res, RuntimeError>>;
|
||||
|
||||
#[inline]
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn call(&mut self, request: Req) -> Self::Future {
|
||||
// Response channel we `.await` on.
|
||||
let (response_sender, receiver) = oneshot::channel();
|
||||
|
||||
// Send the write request.
|
||||
self.sender.send((request, response_sender)).unwrap();
|
||||
|
||||
InfallibleOneshotReceiver::from(receiver)
|
||||
}
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- database_writer
|
||||
/// The main function of the writer thread.
|
||||
fn database_writer<Req, Res>(
|
||||
env: Arc<ConcreteEnv>,
|
||||
receiver: crossbeam::channel::Receiver<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
|
||||
inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result<Res, RuntimeError>,
|
||||
) where
|
||||
Req: Send + 'static,
|
||||
Res: Debug + Send + 'static,
|
||||
{
|
||||
// 1. Hang on request channel
|
||||
// 2. Map request to some database function
|
||||
// 3. Execute that function, get the result
|
||||
// 4. Return the result via channel
|
||||
'main: loop {
|
||||
let Ok((request, response_sender)) = receiver.recv() else {
|
||||
// If this receive errors, it means that the channel is empty
|
||||
// and disconnected, meaning the other side (all senders) have
|
||||
// been dropped. This means "shutdown", and we return here to
|
||||
// exit the thread.
|
||||
//
|
||||
// Since the channel is empty, it means we've also processed
|
||||
// all requests. Since it is disconnected, it means future
|
||||
// ones cannot come in.
|
||||
return;
|
||||
};
|
||||
|
||||
/// How many times should we retry handling the request on resize errors?
|
||||
///
|
||||
/// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
|
||||
const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
|
||||
|
||||
// Map [`Request`]'s to specific database functions.
|
||||
//
|
||||
// Both will:
|
||||
// 1. Map the request to a function
|
||||
// 2. Call the function
|
||||
// 3. (manual resize only) If resize is needed, resize and retry
|
||||
// 4. (manual resize only) Redo step {1, 2}
|
||||
// 5. Send the function's `Result` back to the requester
|
||||
//
|
||||
// FIXME: there's probably a more elegant way
|
||||
// to represent this retry logic with recursive
|
||||
// functions instead of a loop.
|
||||
'retry: for retry in 0..REQUEST_RETRY_LIMIT {
|
||||
// FIXME: will there be more than 1 write request?
|
||||
// this won't have to be an enum.
|
||||
let response = inner_handler(&env, &request);
|
||||
|
||||
// If the database needs to resize, do so.
|
||||
if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {
|
||||
// If this is the last iteration of the outer `for` loop and we
|
||||
// encounter a resize error _again_, it means something is wrong.
|
||||
assert_ne!(
|
||||
retry, REQUEST_RETRY_LIMIT,
|
||||
"database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
|
||||
);
|
||||
|
||||
// Resize the map, and retry the request handling loop.
|
||||
//
|
||||
// FIXME:
|
||||
// We could pass in custom resizes to account for
|
||||
// batches, i.e., we're about to add ~5GB of data,
|
||||
// add that much instead of the default 1GB.
|
||||
// <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
|
||||
let old = env.current_map_size();
|
||||
let new = env.resize_map(None);
|
||||
|
||||
// TODO: use tracing.
|
||||
println!("resizing database memory map, old: {old}B, new: {new}B");
|
||||
|
||||
// Try handling the request again.
|
||||
continue 'retry;
|
||||
}
|
||||
|
||||
// Automatically resizing databases should not be returning a resize error.
|
||||
#[cfg(debug_assertions)]
|
||||
if !ConcreteEnv::MANUAL_RESIZE {
|
||||
assert!(
|
||||
!matches!(response, Err(RuntimeError::ResizeNeeded)),
|
||||
"auto-resizing database returned a ResizeNeeded error"
|
||||
);
|
||||
}
|
||||
|
||||
// Send the response back, whether if it's an `Ok` or `Err`.
|
||||
if let Err(e) = response_sender.send(response) {
|
||||
// TODO: use tracing.
|
||||
println!("database writer failed to send response: {e:?}");
|
||||
}
|
||||
|
||||
continue 'main;
|
||||
}
|
||||
|
||||
// Above retry loop should either:
|
||||
// - continue to the next ['main] loop or...
|
||||
// - ...retry until panic
|
||||
unreachable!();
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue