diff --git a/Cargo.lock b/Cargo.lock index 874df28..d80f20d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -576,9 +576,11 @@ dependencies = [ "crossbeam", "cuprate-helper", "heed", + "page_size", "paste", "sanakirja", "serde", + "tempfile", "thiserror", "tokio", "tower", diff --git a/database/Cargo.toml b/database/Cargo.toml index 91930c9..281edea 100644 --- a/database/Cargo.toml +++ b/database/Cargo.toml @@ -13,21 +13,38 @@ default = ["heed", "service"] # default = ["sanakirja", "service"] # For testing `sanakirja`. heed = ["dep:heed"] sanakirja = ["dep:sanakirja"] -service = ["dep:cuprate-helper", "dep:crossbeam", "dep:tokio", "dep:tower"] +service = ["dep:crossbeam", "dep:tokio", "dep:tower"] [dependencies] -cfg-if = { workspace = true } -paste = { workspace = true } -thiserror = { workspace = true } +cfg-if = { workspace = true } +# FIXME: +# We only need the `thread` feature if `service` is enabled. +# Figure out how to enable features of an already pulled in dependency conditionally. +cuprate-helper = { path = "../helper", features = ["fs", "thread"] } +paste = { workspace = true } +# Needed for database resizes. +# They must be a multiple of the OS page size. +page_size = { version = "0.6.0" } +thiserror = { workspace = true } # `service` feature. -cuprate-helper = { path = "../helper", features = ["thread"], optional = true } crossbeam = { workspace = true, features = ["std"], optional = true } tokio = { workspace = true, features = ["full"], optional = true } tower = { workspace = true, features = ["full"], optional = true } +# SOMEDAY: could be used in `service` as +# the database mutual exclusive `RwLock`. +# +# `parking_lot` has a fairness policy unlike `std`, +# although for now (and until testing is done), +# `std` is fine. +# parking_lot = { workspace = true, optional = true } # Optional features. 
-borsh = { workspace = true, optional = true } -heed = { git = "https://github.com/Cuprate/heed", rev = "5aa75b7", optional = true } -sanakirja = { version = "1.4.0", optional = true } -serde = { workspace = true, optional = true } +borsh = { workspace = true, optional = true } +heed = { git = "https://github.com/Cuprate/heed", rev = "5aa75b7", optional = true } +sanakirja = { version = "1.4.0", optional = true } +serde = { workspace = true, optional = true } + +[dev-dependencies] +page_size = { version = "0.6.0" } +tempfile = { version = "3.10.0" } \ No newline at end of file diff --git a/database/README.md b/database/README.md index 622a146..4279972 100644 --- a/database/README.md +++ b/database/README.md @@ -1,8 +1,6 @@ # Database Cuprate's database implementation. -TODO: document `Pod` and how databases use (de)serialize objects when storing/fetching, essentially using `<[u8], [u8]>`. - 1. [Documentation](#documentation) 1. [File Structure](#file-structure) @@ -16,9 +14,12 @@ TODO: document `Pod` and how databases use (de)serialize objects when storing/fe 1. [Layers](#layers) - [Database](#database) - [Trait](#trait) - - [ConcreteDatabase](#concretedatabase) + - [ConcreteEnv](#concreteenv) - [Thread-pool](#thread-pool) - [Service](#service) +1. [Resizing](#resizing) +1. [Flushing](#flushing) +1. [(De)serialization](#deserialization) --- @@ -52,7 +53,7 @@ The code within `src/` is also littered with some `grep`-able comments containin | `FIXME` | This code works but isn't ideal | `HACK` | This code is a brittle workaround | `PERF` | This code is weird for performance reasons -| `TODO` | This has to be implemented +| `TODO` | This must be implemented; there should be 0 of these in production code | `SOMEDAY` | This should be implemented... someday # File Structure @@ -65,6 +66,7 @@ The top-level `src/` files. 
| File | Purpose | |------------------|---------| +| `config.rs` | Database `Env` configuration | `constants.rs` | General constants used throughout `cuprate-database` | `database.rs` | Abstracted database; `trait Database` | `env.rs` | Abstracted database environment; `trait Env` @@ -138,6 +140,8 @@ cargo doc ``` `LMDB` should not need to be installed as `heed` has a build script that pulls it in automatically. +TODO: document max readers limit: https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L1372. Other potential processes (e.g. `xmrblocks`) that are also reading the `data.mdb` file need to be accounted for. + ## `sanakirja` TODO @@ -146,6 +150,21 @@ TODO: update with accurate information when ready, update image. ## Database ## Trait -## ConcreteDatabase +## ConcreteEnv ## Thread ## Service + +# Resizing +TODO: document resize algorithm: +- Exactly when it occurs +- How many bytes are added + +All backends follow the same algorithm. + +# Flushing +TODO: document disk flushing behavior. +- Config options +- Backend-specific behavior + +# (De)serialization +TODO: document `Pod` and how databases (de)serialize objects when storing/fetching, essentially using `<[u8], [u8]>`. \ No newline at end of file diff --git a/database/src/backend/heed/env.rs b/database/src/backend/heed/env.rs index 7026f1b..bb7deb9 100644 --- a/database/src/backend/heed/env.rs +++ b/database/src/backend/heed/env.rs @@ -1,66 +1,132 @@ //! Implementation of `trait Env` for `heed`. 
//---------------------------------------------------------------------------------------------------- Import -use std::path::Path; +use std::sync::RwLock; use crate::{ backend::heed::types::HeedDb, + config::Config, database::Database, env::Env, error::{InitError, RuntimeError}, + resize::ResizeAlgorithm, table::Table, }; //---------------------------------------------------------------------------------------------------- Env /// A strongly typed, concrete database environment, backed by `heed`. -pub struct ConcreteEnv(heed::Env); +pub struct ConcreteEnv { + /// The actual database environment. + /// + /// # Why `RwLock`? + /// We need mutually exclusive access to the environment for resizing. + /// + /// Using 2 atomics for mutual exclusion was considered: + /// - `currently_resizing: AtomicBool` + /// - `reader_count: AtomicUsize` + /// + /// This is how `monerod` does it: + /// + /// + /// `currently_resizing` would be set to `true` on resizes and + /// `reader_count` would be spun on until 0, at which point + /// we are safe to resize. + /// + /// Although, 3 atomic operations (check atomic bool, reader_count++, reader_count--) + /// turn out to be roughly as expensive as acquiring a non-contended `RwLock`, + /// and the CPU sleeping instead of spinning is much better too. + /// + /// # `unwrap()` + /// This will be [`unwrap()`]ed everywhere. + /// + /// If the lock is poisoned, we want all of Cuprate to panic. + env: RwLock, + + /// The configuration we were opened with + /// (and in current use). + config: Config, +} + +impl Drop for ConcreteEnv { + fn drop(&mut self) { + if let Err(e) = self.sync() { + // TODO: log error? + } + + // TODO: log that we are dropping the database. 
+ } +} //---------------------------------------------------------------------------------------------------- Env Impl impl Env for ConcreteEnv { - /// TODO + const MANUAL_RESIZE: bool = true; + const SYNCS_PER_TX: bool = false; type RoTx<'db> = heed::RoTxn<'db>; - - /// TODO type RwTx<'db> = heed::RwTxn<'db>; - //------------------------------------------------ Required #[cold] #[inline(never)] // called once. - /// TODO - /// # Errors - /// TODO - fn open>(path: P) -> Result { + fn open(config: Config) -> Result { + // INVARIANT: + // We must open LMDB using `heed::EnvOpenOptions::max_readers` + // and input whatever is in `config.reader_threads` or else + // LMDB will start throwing errors if there are >126 readers. + // + // + // We should also leave reader slots for other processes, e.g. `xmrblocks`. + // + + // todo!() } - /// TODO - /// # Errors - /// TODO + fn config(&self) -> &Config { + &self.config + } + fn sync(&self) -> Result<(), RuntimeError> { todo!() } + fn resize_map(&self, resize_algorithm: Option) { + let resize_algorithm = resize_algorithm.unwrap_or_else(|| self.config().resize_algorithm); + + let current_size_bytes = self.current_map_size(); + let new_size_bytes = resize_algorithm.resize(current_size_bytes); + + // SAFETY: + // Resizing requires that we have + // exclusive access to the database environment. + // Our `heed::Env` is wrapped within a `RwLock`, + // and we have a WriteGuard to it, so we're safe. + // + // + unsafe { + // INVARIANT: `resize()` returns a valid `usize` to resize to. + self.env + .write() + .unwrap() + .resize(new_size_bytes.get()) + .unwrap(); + } + } + + fn current_map_size(&self) -> usize { + self.env.read().unwrap().info().map_size + } + #[inline] - /// TODO - /// # Errors - /// TODO fn ro_tx(&self) -> Result, RuntimeError> { todo!() } #[inline] - /// TODO - /// # Errors - /// TODO fn rw_tx(&self) -> Result, RuntimeError> { todo!() } #[cold] #[inline(never)] // called infrequently?. 
- /// TODO - /// # Errors - /// TODO fn create_tables_if_needed( &self, tx_rw: &mut Self::RwTx<'_>, @@ -69,9 +135,6 @@ impl Env for ConcreteEnv { } #[inline] - /// TODO - /// # Errors - /// TODO fn open_database( &self, to_rw: &Self::RoTx<'_>, diff --git a/database/src/backend/heed/error.rs b/database/src/backend/heed/error.rs index d371217..4bd4c16 100644 --- a/database/src/backend/heed/error.rs +++ b/database/src/backend/heed/error.rs @@ -1,7 +1,7 @@ //! Conversion from `heed::Error` -> `cuprate_database::RuntimeError`. //---------------------------------------------------------------------------------------------------- Use -use crate::constants::CUPRATE_DATABASE_CORRUPT_MSG; +use crate::constants::DATABASE_CORRUPT_MSG; //---------------------------------------------------------------------------------------------------- InitError impl From for crate::InitError { @@ -74,6 +74,7 @@ impl From for crate::RuntimeError { E1::Mdb(mdb_error) => match mdb_error { E2::KeyExist => Self::KeyExists, E2::NotFound => Self::KeyNotFound, + E2::MapFull => Self::ResizeNeeded, // Corruption errors, these have special panic messages. // @@ -82,7 +83,7 @@ impl From for crate::RuntimeError { // // "Requested page not found - this usually indicates corruption." // - E2::Corrupted | E2::PageNotFound => panic!("{mdb_error:?}\n{CUPRATE_DATABASE_CORRUPT_MSG}"), + E2::Corrupted | E2::PageNotFound => panic!("{mdb_error:?}\n{DATABASE_CORRUPT_MSG}"), // These errors should not occur, and if they do, // the best thing `cuprate_database` can do for @@ -102,21 +103,44 @@ impl From for crate::RuntimeError { // These errors are the same as above, but instead // of being errors we can't control, these are errors // that only happen if we write incorrect code. - E2::MapFull // Resize the map when needed. - | E2::ReadersFull // Don't spawn too many reader threads. - | E2::DbsFull // Don't create too many database tables. - | E2::CursorFull // Don't do crazy multi-nested LMDB cursor stuff. 
- | E2::MapResized // Resize the map when needed. - | E2::Incompatible // - | E2::BadValSize // Unsupported size of key/DB name/data, or wrong DUP_FIXED size. + + // "Database contents grew beyond environment mapsize." + // We should be resizing the map when needed, this error + // occurring indicates we did _not_ do that, which is a bug + // and we should panic. + // + // TODO: This can also mean _another_ process wrote to our + // LMDB file and increased the size. I don't think we need to accommodate for this. + // + // Although `monerod` reacts to that instead of `MDB_MAP_FULL` + // which is what `mdb_put()` returns so... idk? + // + | E2::MapResized + // We should be setting `heed::EnvOpenOptions::max_readers()` + // with our reader thread value in [`crate::config::Config`], + // thus this error should never occur. + // + | E2::ReadersFull + // Do not open more database tables than we initially started with. + // We know this number at compile time (amount of `Table`'s) so this + // should never happen. + // + // + | E2::DbsFull + // Don't do crazy multi-nested LMDB cursor stuff. + | E2::CursorFull + // + | E2::Incompatible + // Unsupported size of key/DB name/data, or wrong DUP_FIXED size. + // Don't use a key that is `>511` bytes. + // + | E2::BadValSize => panic!("fix the database code! {mdb_error:?}"), }, - // Database is shutting down. - E1::DatabaseClosing => Self::ShuttingDown, - // Only if we write incorrect code. E1::InvalidDatabaseTyping + | E1::DatabaseClosing | E1::BadOpenOptions { .. } | E1::Encoding(_) | E1::Decoding(_) => panic!("fix the database code! {error:?}"), diff --git a/database/src/backend/sanakirja/env.rs b/database/src/backend/sanakirja/env.rs index 823a41e..42e6e9a 100644 --- a/database/src/backend/sanakirja/env.rs +++ b/database/src/backend/sanakirja/env.rs @@ -1,10 +1,11 @@ //! Implementation of `trait Env` for `sanakirja`. 
//---------------------------------------------------------------------------------------------------- Import -use std::path::Path; +use std::{path::Path, sync::Arc}; use crate::{ backend::sanakirja::types::SanakirjaDb, + config::Config, database::Database, env::Env, error::{InitError, RuntimeError}, @@ -13,57 +14,61 @@ use crate::{ //---------------------------------------------------------------------------------------------------- ConcreteEnv /// A strongly typed, concrete database environment, backed by `sanakirja`. -pub struct ConcreteEnv(sanakirja::Env); +pub struct ConcreteEnv { + /// The actual database environment. + env: sanakirja::Env, + + /// The configuration we were opened with + /// (and in current use). + config: Config, +} + +impl Drop for ConcreteEnv { + fn drop(&mut self) { + if let Err(e) = self.sync() { + // TODO: log error? + } + + // TODO: log that we are dropping the database. + } +} //---------------------------------------------------------------------------------------------------- Env Impl impl Env for ConcreteEnv { - /// TODO - /// + const MANUAL_RESIZE: bool = false; + const SYNCS_PER_TX: bool = true; + /// FIXME: /// We could also implement `Borrow for ConcreteEnv` /// instead of this reference. type RoTx<'db> = sanakirja::Txn<&'db sanakirja::Env>; - - /// TODO type RwTx<'db> = sanakirja::MutTxn<&'db sanakirja::Env, ()>; - //------------------------------------------------ Required #[cold] #[inline(never)] // called once. - /// TODO - /// # Errors - /// TODO - fn open>(path: P) -> Result { + fn open(config: Config) -> Result { todo!() } - /// TODO - /// # Errors - /// TODO + fn config(&self) -> &Config { + &self.config + } + fn sync(&self) -> Result<(), RuntimeError> { todo!() } #[inline] - /// TODO - /// # Errors - /// TODO fn ro_tx(&self) -> Result, RuntimeError> { todo!() } #[inline] - /// TODO - /// # Errors - /// TODO fn rw_tx(&self) -> Result, RuntimeError> { todo!() } #[cold] #[inline(never)] // called infrequently?. 
- /// TODO - /// # Errors - /// TODO fn create_tables_if_needed( &self, tx_rw: &mut Self::RwTx<'_>, @@ -72,9 +77,6 @@ impl Env for ConcreteEnv { } #[inline] - /// TODO - /// # Errors - /// TODO fn open_database( &self, to_rw: &Self::RoTx<'_>, diff --git a/database/src/backend/sanakirja/error.rs b/database/src/backend/sanakirja/error.rs index 952881b..9fa0bb1 100644 --- a/database/src/backend/sanakirja/error.rs +++ b/database/src/backend/sanakirja/error.rs @@ -1,7 +1,7 @@ //! Conversion from `sanakirja::Error` -> `cuprate_database::RuntimeError`. //---------------------------------------------------------------------------------------------------- Import -use crate::constants::CUPRATE_DATABASE_CORRUPT_MSG; +use crate::constants::DATABASE_CORRUPT_MSG; //---------------------------------------------------------------------------------------------------- InitError impl From for crate::InitError { @@ -34,7 +34,7 @@ impl From for crate::RuntimeError { // A CRC failure essentially means a `sanakirja` page was corrupt. // - E::Corrupt(_) | E::CRC(_) => panic!("{error:?}\n{CUPRATE_DATABASE_CORRUPT_MSG}"), + E::Corrupt(_) | E::CRC(_) => panic!("{error:?}\n{DATABASE_CORRUPT_MSG}"), // These errors should not occur, and if they do, // the best thing `cuprate_database` can do for diff --git a/database/src/config.rs b/database/src/config.rs new file mode 100644 index 0000000..c09b477 --- /dev/null +++ b/database/src/config.rs @@ -0,0 +1,194 @@ +//! Database [`Env`](crate::Env) configuration. +//! +//! TODO + +//---------------------------------------------------------------------------------------------------- Import +use std::{borrow::Cow, num::NonZeroUsize, path::Path}; + +use cuprate_helper::fs::cuprate_database_dir; + +use crate::{constants::DATABASE_FILENAME, resize::ResizeAlgorithm}; + +//---------------------------------------------------------------------------------------------------- Config +/// Database [`Env`](crate::Env) configuration. 
+/// +/// This is the struct passed to [`Env::open`](crate::Env::open) that +/// allows the database to be configured in various ways. +/// +/// TODO: there's probably more options to add. +#[derive(Clone, PartialEq, PartialOrd)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Config { + /// The directory used to store all database files. + /// + /// By default, if no value is provided in the [`Config`] + /// constructor functions, this will be [`cuprate_database_dir`]. + pub db_directory: Cow<'static, Path>, + + /// The actual database data file. + /// + /// This is private, and created from the above `db_directory`. + pub(crate) db_file: Cow<'static, Path>, + + /// TODO + pub sync_mode: SyncMode, + + /// Database reader thread count. + pub reader_threads: ReaderThreads, + + /// TODO + pub resize_algorithm: ResizeAlgorithm, +} + +impl Config { + /// TODO + fn return_db_dir_and_file>( + db_directory: Option

, + ) -> (Cow<'static, Path>, Cow<'static, Path>) { + // INVARIANT: all PATH safety checks are done + // in `helper::fs`. No need to do them here. + let db_directory = db_directory.map_or_else( + || Cow::Borrowed(cuprate_database_dir()), + |p| Cow::Owned(p.as_ref().to_path_buf()), + ); + + let mut db_file = db_directory.to_path_buf(); + db_file.push(DATABASE_FILENAME); + + (db_directory, Cow::Owned(db_file)) + } + + /// TODO + pub fn new>(db_directory: Option

) -> Self { + let (db_directory, db_file) = Self::return_db_dir_and_file(db_directory); + Self { + db_directory, + db_file, + sync_mode: SyncMode::Safe, + reader_threads: ReaderThreads::OnePerThread, + resize_algorithm: ResizeAlgorithm::new(), + } + } + + /// TODO + pub fn fast>(db_directory: Option

) -> Self { + let (db_directory, db_file) = Self::return_db_dir_and_file(db_directory); + Self { + db_directory, + db_file, + sync_mode: SyncMode::Fastest, + reader_threads: ReaderThreads::OnePerThread, + resize_algorithm: ResizeAlgorithm::new(), + } + } + + /// TODO + pub fn low_power>(db_directory: Option

) -> Self { + let (db_directory, db_file) = Self::return_db_dir_and_file(db_directory); + Self { + db_directory, + db_file, + sync_mode: SyncMode::Safe, + reader_threads: ReaderThreads::One, + resize_algorithm: ResizeAlgorithm::new(), + } + } + + /// Return the absolute [`Path`] to the database data file. + pub fn db_file_path(&self) -> &Path { + &self.db_file + } +} + +impl Default for Config { + fn default() -> Self { + Self::new(None::<&'static Path>) + } +} + +//---------------------------------------------------------------------------------------------------- SyncMode +/// TODO +#[derive(Copy, Clone, Default, PartialEq, PartialOrd, Eq, Ord, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr( + feature = "borsh", + derive(borsh::BorshSerialize, borsh::BorshDeserialize) +)] +pub enum SyncMode { + /// Fully sync to disk per transaction. + #[default] + Safe, + + /// Asynchronously sync, only flush at database shutdown. + Fastest, +} + +//---------------------------------------------------------------------------------------------------- ReaderThreads +/// TODO +#[derive(Copy, Clone, Default, PartialEq, PartialOrd)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr( + feature = "borsh", + derive(borsh::BorshSerialize, borsh::BorshDeserialize) +)] +pub enum ReaderThreads { + #[default] + /// TODO + OnePerThread, + + /// TODO + One, + + /// TODO + Number(NonZeroUsize), + + /// TODO + /// + /// # Invariant + /// Must be `0.0..=1.0`. + Percent(f32), +} + +impl ReaderThreads { + /// TODO + // # Invariant + // LMDB will error if we input zero, so don't allow that. + // + pub fn as_threads(&self) -> NonZeroUsize { + let total_threads = cuprate_helper::thread::threads(); + + match self { + Self::OnePerThread => total_threads, + Self::One => NonZeroUsize::MIN, + Self::Number(n) => std::cmp::min(*n, total_threads), + + // We handle the casting loss. 
+ #[allow( + clippy::cast_precision_loss, + clippy::cast_possible_truncation, + clippy::cast_sign_loss + )] + Self::Percent(f) => { + if !f.is_normal() || !(0.0..=1.0).contains(f) { + return total_threads; + } + + let thread_percent = (total_threads.get() as f32) * f; + let Some(threads) = NonZeroUsize::new(thread_percent as usize) else { + return total_threads; + }; + + std::cmp::min(threads, total_threads) + } + } + } +} + +impl> From for ReaderThreads { + fn from(value: T) -> Self { + match NonZeroUsize::new(value.into()) { + Some(n) => Self::Number(n), + None => Self::One, + } + } +} diff --git a/database/src/constants.rs b/database/src/constants.rs index 7a4ffd8..67037f5 100644 --- a/database/src/constants.rs +++ b/database/src/constants.rs @@ -2,29 +2,6 @@ //---------------------------------------------------------------------------------------------------- Import -//---------------------------------------------------------------------------------------------------- Directory/Files -/// The directory that contains database-related files. -/// -/// This is a sub-directory within the Cuprate folder, e.g: -/// ```txt -/// ~/.local/share/cuprate/ -/// ├─ database/ # <- -/// ├─ data.mdb -/// ├─ lock.mdb -/// ``` -pub const CUPRATE_DATABASE_DIR: &str = "database"; - -/// The actual database file name. -/// -/// This is a _file_ within [`CUPRATE_DATABASE_DIR`], e.g: -/// ```txt -/// ~/.local/share/cuprate/ -/// ├─ database/ -/// ├─ data.mdb # <- -/// ├─ lock.mdb -/// ``` -pub const CUPRATE_DATABASE_FILE: &str = "data"; - //---------------------------------------------------------------------------------------------------- Error Messages /// Corrupt database error message. /// @@ -32,7 +9,7 @@ pub const CUPRATE_DATABASE_FILE: &str = "data"; /// messages if we think the database is corrupted. /// /// This is meant to be user-friendly. -pub const CUPRATE_DATABASE_CORRUPT_MSG: &str = r"Cuprate has encountered a fatal error. The database may be corrupted. 
+pub const DATABASE_CORRUPT_MSG: &str = r"Cuprate has encountered a fatal error. The database may be corrupted. TODO: instructions on: 1. What to do @@ -47,21 +24,22 @@ cfg_if::cfg_if! { if #[cfg(all(feature = "sanakirja", not(feature = "heed")))] { /// Static string of the `crate` being used as the database backend. pub const DATABASE_BACKEND: &str = "sanakirja"; + + /// Cuprate's database filename. + /// + /// This is the filename for Cuprate's database, used in [`Config::db_file_path`](crate::config::Config::db_file_path). + pub const DATABASE_FILENAME: &str = "data.san"; // TODO: pick a name + extension. } else { /// Static string of the `crate` being used as the database backend. pub const DATABASE_BACKEND: &str = "heed"; + + /// Cuprate's database filename. + /// + /// This is the filename for Cuprate's database, used in [`Config::db_file_path`](crate::config::Config::db_file_path). + pub const DATABASE_FILENAME: &str = "data.mdb"; } } //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] -mod test { - use super::*; - - #[test] - /// Sanity check that our PATHs aren't empty... (will cause disaster). - fn non_empty_path() { - assert!(!CUPRATE_DATABASE_DIR.is_empty()); - assert!(!CUPRATE_DATABASE_FILE.is_empty()); - } -} +mod test {} diff --git a/database/src/env.rs b/database/src/env.rs index e9da72f..4b434a5 100644 --- a/database/src/env.rs +++ b/database/src/env.rs @@ -1,11 +1,11 @@ //! Abstracted database environment; `trait Env`. //---------------------------------------------------------------------------------------------------- Import -use std::path::Path; - use crate::{ + config::Config, database::Database, error::{InitError, RuntimeError}, + resize::ResizeAlgorithm, table::Table, transaction::{RoTx, RwTx}, }; @@ -13,8 +13,31 @@ use crate::{ //---------------------------------------------------------------------------------------------------- Env /// Database environment abstraction. 
/// -/// TODO +/// Essentially, the functions that can be called on [`ConcreteEnv`](crate::ConcreteEnv). +/// +/// # `Drop` +/// Objects that implement [`Env`] _should_ probably +/// [`Env::sync`] in their drop implementations, +/// although, no invariant relies on this (yet). pub trait Env: Sized { + //------------------------------------------------ Constants + /// Does the database backend need to be manually + /// resized when the memory-map is full? + /// + /// # Invariant + /// If this is `false`, that means this [`Env`] + /// can _never_ return a [`RuntimeError::ResizeNeeded`]. + /// + /// If this is `true`, [`Env::resize_map`] & [`Env::current_map_size`] + /// _must_ be re-implemented, as they just panic by default. + const MANUAL_RESIZE: bool; + + /// Does the database backend forcefully sync/flush + /// to disk on every transaction commit? + /// + /// This is used as an optimization. + const SYNCS_PER_TX: bool; + //------------------------------------------------ Types /// TODO type RoTx<'db>: RoTx<'db>; @@ -26,13 +49,64 @@ pub trait Env: Sized { /// TODO /// # Errors /// TODO - fn open>(path: P) -> Result; + fn open(config: Config) -> Result; + + /// Return the [`Config`] that this database was [`Env::open`]ed with. + fn config(&self) -> &Config; + + /// Return the actual number of bytes the database is taking up on disk. + /// + /// This is the current _disk_ value in bytes, not the memory map. + /// + /// # Errors + /// This will error if either: + /// + /// - [`std::fs::File::open`] + /// - [`std::fs::File::metadata`] + /// + /// failed on the database file on disk. + fn disk_size_bytes(&self) -> std::io::Result { + // We have the direct PATH to the file, + // no need to use backend-specific functions. + // + // SAFETY: as we are only accessing the metadata of + // the file and not reading the bytes, it should be + // fine even with a memory mapped file being actively + // written to. + Ok(std::fs::File::open(&self.config().db_file)? + .metadata()? 
+ .len()) + } /// TODO /// # Errors /// TODO fn sync(&self) -> Result<(), RuntimeError>; + /// Resize the database's memory map to a + /// new (bigger) size using a [`ResizeAlgorithm`]. + /// + /// By default, this function will use the `ResizeAlgorithm` in [`Env::config`]. + /// + /// If `resize_algorithm` is `Some`, that will be used instead. + /// + /// # Invariant + /// This function _must_ be re-implemented if [`Env::MANUAL_RESIZE`] is `true`. + /// + /// Otherwise, this function will panic with `unreachable!()`. + fn resize_map(&self, resize_algorithm: Option) { + unreachable!() + } + + /// What is the _current_ size of the database's memory map in bytes? + /// + /// # Invariant + /// 1. This function _must_ be re-implemented if [`Env::MANUAL_RESIZE`] is `true`. + /// 2. This function must be accurate, as [`Env::resize_map()`] may depend on it. + fn current_map_size(&self) -> usize { + unreachable!() + } + /// TODO /// # Errors /// TODO diff --git a/database/src/error.rs b/database/src/error.rs index f3990d1..33c6434 100644 --- a/database/src/error.rs +++ b/database/src/error.rs @@ -4,15 +4,12 @@ //---------------------------------------------------------------------------------------------------- Import use std::fmt::Debug; -#[allow(unused_imports)] // docs -use crate::env::Env; - //---------------------------------------------------------------------------------------------------- Types /// Alias for a thread-safe boxed error. type BoxError = Box; //---------------------------------------------------------------------------------------------------- InitError -/// Errors that occur during ([`Env::open`]). +/// Errors that occur during ([`Env::open`](crate::env::Env::open)). 
/// /// # Handling /// As this is a database initialization error, the correct @@ -59,16 +56,18 @@ pub enum InitError { } //---------------------------------------------------------------------------------------------------- RuntimeError -/// Errors that occur _after_ successful ([`Env::open`]). +/// Errors that occur _after_ successful ([`Env::open`](crate::env::Env::open)). /// /// There are no errors for: /// 1. Missing tables /// 2. (De)serialization +/// 3. Shutdown errors /// /// as `cuprate_database` upholds the invariant that: /// /// 1. All tables exist /// 2. (De)serialization never fails +/// 3. The database (thread-pool) only shuts down when all channels are dropped #[derive(thiserror::Error, Debug)] pub enum RuntimeError { /// The given key already existed in the database. @@ -79,12 +78,14 @@ pub enum RuntimeError { #[error("key/value pair was not found")] KeyNotFound, + /// The database memory map is full and needs a resize. + /// + /// # Invariant + /// This error can only occur if [`Env::MANUAL_RESIZE`](crate::Env::MANUAL_RESIZE) is `true`. + #[error("database memory map must be resized")] + ResizeNeeded, + /// A [`std::io::Error`]. #[error("I/O error: {0}")] Io(#[from] std::io::Error), - - /// The database is currently in the process - /// of shutting down and cannot respond. - #[error("database is shutting down")] - ShuttingDown, } diff --git a/database/src/free.rs b/database/src/free.rs index d7889c5..a9b9385 100644 --- a/database/src/free.rs +++ b/database/src/free.rs @@ -1,14 +1,10 @@ //! General free functions (related to the database). +//! +//! TODO. 
//---------------------------------------------------------------------------------------------------- Import -//---------------------------------------------------------------------------------------------------- Constants - -//---------------------------------------------------------------------------------------------------- TYPE - -//---------------------------------------------------------------------------------------------------- IMPL - -//---------------------------------------------------------------------------------------------------- Trait Impl +//---------------------------------------------------------------------------------------------------- Free functions //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] diff --git a/database/src/key.rs b/database/src/key.rs index 57fe9b4..4257290 100644 --- a/database/src/key.rs +++ b/database/src/key.rs @@ -1,13 +1,10 @@ //! Database key abstraction; `trait Key`. //---------------------------------------------------------------------------------------------------- Import -#[allow(unused_imports)] // docs -use crate::table::Table; - use crate::pod::Pod; //---------------------------------------------------------------------------------------------------- Table -/// Database [`Table`] key metadata. +/// Database [`Table`](crate::table::Table) key metadata. /// /// Purely compile time information for database table keys, supporting duplicate keys. pub trait Key { diff --git a/database/src/lib.rs b/database/src/lib.rs index 1efb0eb..88755a0 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1,5 +1,11 @@ //! Database abstraction and utilities. //! +//! This documentation is mostly for practical usage of `cuprate_database`. +//! +//! For a high-level overview, +//! see [`database/README.md`](https://github.com/Cuprate/cuprate/blob/main/database/README.md). +//! +//! # Purpose //! This crate does 3 things: //! 1. 
Abstracts various databases with the [`Env`], [`Database`], [`Table`], [`Key`], [`RoTx`], and [`RwTx`] traits //! 2. Implements various `Monero` related [functions](ops) & [`tables`] @@ -39,13 +45,16 @@ //! For example: //! - [`std::mem::size_of::`] //! - [`std::mem::align_of::`] -//! - [`Drop::::drop`] //! //! Things like these functions are affected by the backend and inner data, //! and should not be relied upon. This extends to any `struct/enum` that contains `ConcreteEnv`. //! -//! The only thing about `ConcreteEnv` that should -//! be relied upon is that it implements [`Env`]. +//! `ConcreteEnv` invariants you can rely on: +//! - It implements [`Env`] +//! - Upon [`Drop::drop`], all database data will sync to disk +//! +//! Note that `ConcreteEnv` itself is not a clonable type, +//! it should be wrapped in [`std::sync::Arc`]. //! //! TODO: we could also expose `ConcreteDatabase` if we're //! going to be storing any databases in structs, to lessen @@ -60,13 +69,55 @@ //! - `sanakirja` //! //! The default is `heed`. +//! +//! # Invariants when not using `service` +//! `cuprate_database` can be used without the `service` feature enabled but +//! there are some things that must be kept in mind when doing so: +//! +//! TODO: make pretty. these will need to be updated +//! as things change and as more backends are added. +//! +//! 1. Memory map resizing (must resize as needed) +//! 1. Must not exceed `Config`'s maximum reader count +//! 1. Avoid many nested transactions +//! 1. `heed::MdbError::BadValSize` +//! 1. `heed::Error::InvalidDatabaseTyping` +//! 1. `heed::Error::BadOpenOptions` +//! 1. Encoding/decoding into `[u8]` +//! +//! # Example +//! Simple usage of this crate. +//! +//! ```rust +//! use cuprate_database::{ +//! config::Config, +//! ConcreteEnv, +//! Env, Key, RoTx, RwTx, +//! service::{ReadRequest, WriteRequest, Response}, +//! }; +//! +//! // Create a configuration for the database environment. +//! let db_dir = tempfile::tempdir().unwrap(); +//! 
let config = Config::new(Some(db_dir)); +//! +//! // Initialize the database thread-pool. +//! +//! // TODO: +//! // 1. let (read_handle, write_handle) = cuprate_database::service::init(config).unwrap(); +//! // 2. Send write/read requests +//! // 3. Use some other `Env` functions +//! // 4. Shutdown +//! ``` //---------------------------------------------------------------------------------------------------- Lints // Forbid lints. // Our code, and code generated (e.g macros) cannot overrule these. #![forbid( + // `unsafe` is allowed but it _must_ be + // commented with `SAFETY: reason`. + clippy::undocumented_unsafe_blocks, + // Never. - unsafe_code, unused_unsafe, redundant_semicolons, unused_allocation, @@ -215,6 +266,7 @@ clippy::module_name_repetitions, clippy::module_inception, clippy::redundant_pub_crate, + clippy::option_if_let_else, )] // Allow some lints when running in debug mode. #![cfg_attr(debug_assertions, allow(clippy::todo, clippy::multiple_crate_versions))] @@ -234,10 +286,10 @@ compile_error!("Cuprate is only compatible with 64-bit CPUs"); mod backend; pub use backend::ConcreteEnv; +pub mod config; + mod constants; -pub use constants::{ - CUPRATE_DATABASE_CORRUPT_MSG, CUPRATE_DATABASE_DIR, CUPRATE_DATABASE_FILE, DATABASE_BACKEND, -}; +pub use constants::{DATABASE_BACKEND, DATABASE_CORRUPT_MSG, DATABASE_FILENAME}; mod database; pub use database::Database; @@ -250,6 +302,8 @@ pub use error::{InitError, RuntimeError}; mod free; +pub mod resize; + mod key; pub use key::{DupKey, Key}; diff --git a/database/src/resize.rs b/database/src/resize.rs new file mode 100644 index 0000000..c77c1ee --- /dev/null +++ b/database/src/resize.rs @@ -0,0 +1,309 @@ +//! Database memory map resizing algorithms. +//! +//! This module contains [`ResizeAlgorithm`] which determines how the +//! [`ConcreteEnv`](crate::ConcreteEnv) resizes its memory map when needing more space. +//! This value is in [`Config`](crate::config::Config) and can be selected at runtime. +//!
+//! Although, it is only used by `ConcreteEnv` if [`Env::MANUAL_RESIZE`](crate::env::Env::MANUAL_RESIZE) is `true`. +//! +//! The algorithms are available as free functions in this module as well. +//! +//! # Page size +//! All free functions in this module will +//! return a multiple of the OS page size ([`page_size()`]), +//! [LMDB will error](http://www.lmdb.tech/doc/group__mdb.html#gaa2506ec8dab3d969b0e609cd82e619e5) +//! if this is not the case. +//! +//! # Invariants +//! All returned [`NonZeroUsize`] values of the free functions in this module +//! (including [`ResizeAlgorithm::resize`]) uphold the following invariants: +//! 1. It will always be `>=` the input `current_size_bytes` +//! 2. It will always be a multiple of [`page_size()`] + +//---------------------------------------------------------------------------------------------------- Import +use std::{num::NonZeroUsize, sync::OnceLock}; + +//---------------------------------------------------------------------------------------------------- ResizeAlgorithm +/// The function/algorithm used by the +/// database when resizing the memory map. +/// +/// # TODO +/// We could test around with different algorithms. +/// Calling [`heed::Env::resize`] is surprisingly fast, +/// around `0.0000082s` on my machine. We could probably +/// get away with smaller and more frequent resizes. +/// **With the caveat being we are taking a `WriteGuard` to a `RwLock`.** +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr( + feature = "borsh", + derive(borsh::BorshSerialize, borsh::BorshDeserialize) +)] +pub enum ResizeAlgorithm { + /// Uses [`monero`]. + Monero, + + /// Uses [`fixed_bytes`]. + FixedBytes(NonZeroUsize), + + /// Uses [`percent`]. + Percent(f32), +} + +impl ResizeAlgorithm { + /// Returns [`Self::Monero`]. 
+ /// + /// ```rust + /// # use cuprate_database::resize::*; + /// assert!(matches!(ResizeAlgorithm::new(), ResizeAlgorithm::Monero)); + /// ``` + #[inline] + pub const fn new() -> Self { + Self::Monero + } + + /// Maps the `self` variant to the free functions in [`crate::resize`]. + #[inline] + pub fn resize(&self, current_size_bytes: usize) -> NonZeroUsize { + match self { + Self::Monero => monero(current_size_bytes), + Self::FixedBytes(u) => fixed_bytes(current_size_bytes, u.get()), + Self::Percent(f) => percent(current_size_bytes, *f), + } + } +} + +impl Default for ResizeAlgorithm { + /// Calls [`Self::new`]. + /// + /// ```rust + /// # use cuprate_database::resize::*; + /// assert_eq!(ResizeAlgorithm::new(), ResizeAlgorithm::default()); + /// ``` + #[inline] + fn default() -> Self { + Self::new() + } +} + +//---------------------------------------------------------------------------------------------------- Free functions +/// Cached result of [`page_size()`]. +static PAGE_SIZE: OnceLock<NonZeroUsize> = OnceLock::new(); +/// This function retrieves the system’s memory page size. +/// +/// It is just [`page_size::get`](https://docs.rs/page_size) internally. +/// +/// This caches the result, so this function is cheap after the 1st call. +/// +/// # Panics +/// This function will panic if the OS returns a page size of `0` (impossible?). +#[inline] +pub fn page_size() -> NonZeroUsize { + *PAGE_SIZE + .get_or_init(|| NonZeroUsize::new(page_size::get()).expect("page_size::get() returned 0")) +} + +/// Memory map resize closely matching `monerod`. +/// +/// # Method +/// This function mostly matches `monerod`'s current resize implementation[^1], +/// and will increase `current_size_bytes` by `1 << 30`[^2] exactly, then +/// round the result up to the nearest multiple of the OS page size. +/// +/// [^1]: +/// +/// [^2]: `1_073_741_824` +/// +/// ```rust +/// # use cuprate_database::resize::*; +/// // The value this function will increment by +/// // (assuming page multiple of 4096).
+/// const N: usize = 1_073_741_824; +/// +/// // 0 returns the minimum value. +/// assert_eq!(monero(0).get(), N); +/// +/// // Rounds up to nearest OS page size. +/// assert_eq!(monero(1).get(), N + page_size().get()); +/// ``` +/// +/// # Panics +/// This function will panic if adding onto `current_size_bytes` overflows [`usize::MAX`]. +/// +/// ```rust,should_panic +/// # use cuprate_database::resize::*; +/// // Ridiculously large numbers panic. +/// monero(usize::MAX); +/// ``` +pub fn monero(current_size_bytes: usize) -> NonZeroUsize { + /// The exact expression used by `monerod` + /// when calculating how many bytes to add. + /// + /// The nominal value is `1_073_741_824`. + /// Not actually 1 GB but close enough I guess. + /// + /// + const ADD_SIZE: usize = 1_usize << 30; + + let page_size = page_size().get(); + let new_size_bytes = current_size_bytes.checked_add(ADD_SIZE).expect("map size overflow"); + + // Round up the new size to the + // nearest multiple of the OS page size. + let remainder = new_size_bytes % page_size; + + // INVARIANT: minimum is always at least `ADD_SIZE`. + NonZeroUsize::new(if remainder == 0 { + new_size_bytes + } else { + new_size_bytes.checked_add(page_size - remainder).expect("map size overflow") + }) + .unwrap() +} + +/// Memory map resize by a fixed amount of bytes. +/// +/// # Method +/// This function will `current_size_bytes + add_bytes` +/// and then round up to nearest OS page size. +/// +/// ```rust +/// # use cuprate_database::resize::*; +/// let page_size: usize = page_size().get(); +/// +/// // Anything below the page size will round up to the page size. +/// for i in 0..=page_size { +/// assert_eq!(fixed_bytes(0, i).get(), page_size); +/// } +/// +/// // (page_size + 1) will round up to (page_size * 2). +/// assert_eq!(fixed_bytes(page_size, 1).get(), page_size * 2); +/// +/// // (page_size + page_size) doesn't require any rounding.
+/// assert_eq!(fixed_bytes(page_size, page_size).get(), page_size * 2); +/// ``` +/// +/// # Panics +/// This function will panic if adding onto `current_size_bytes` overflows [`usize::MAX`]. +/// +/// ```rust,should_panic +/// # use cuprate_database::resize::*; +/// // Ridiculously large numbers panic. +/// fixed_bytes(1, usize::MAX); +/// ``` +pub fn fixed_bytes(current_size_bytes: usize, add_bytes: usize) -> NonZeroUsize { + let page_size = page_size(); + let new_size_bytes = current_size_bytes.checked_add(add_bytes).expect("map size overflow"); + + // Guard against < page_size. + if new_size_bytes <= page_size.get() { + return page_size; + } + + // Round up the new size to the + // nearest multiple of the OS page size. + let remainder = new_size_bytes % page_size; + + // INVARIANT: we guarded against < page_size above. + NonZeroUsize::new(if remainder == 0 { + new_size_bytes + } else { + new_size_bytes.checked_add(page_size.get() - remainder).expect("map size overflow") + }) + .unwrap() +} + +/// Memory map resize by a percentage. +/// +/// # Method +/// This function will multiply `current_size_bytes` by `percent`. +/// +/// Any input `<= 1.0` or non-normal float ([`f32::NAN`], [`f32::INFINITY`]) +/// will make the returning `NonZeroUsize` the same as `current_size_bytes` +/// (rounded up to the OS page size). +/// +/// ```rust +/// # use cuprate_database::resize::*; +/// let page_size: usize = page_size().get(); +/// +/// // Anything below the page size will round up to the page size. +/// for i in 0..=page_size { +/// assert_eq!(percent(i, 1.0).get(), page_size); +/// } +/// +/// // Same for 2 page sizes. +/// for i in (page_size + 1)..=(page_size * 2) { +/// assert_eq!(percent(i, 1.0).get(), page_size * 2); +/// } +/// +/// // Weird floats do nothing.
+/// assert_eq!(percent(page_size, f32::NAN).get(), page_size); +/// assert_eq!(percent(page_size, f32::INFINITY).get(), page_size); +/// assert_eq!(percent(page_size, f32::NEG_INFINITY).get(), page_size); +/// assert_eq!(percent(page_size, -1.0).get(), page_size); +/// assert_eq!(percent(page_size, 0.999).get(), page_size); +/// ``` +/// +/// # Panics +/// This function will panic if `current_size_bytes * percent` +/// is closer to [`usize::MAX`] than the OS page size. +/// +/// ```rust,should_panic +/// # use cuprate_database::resize::*; +/// // Ridiculous large numbers panic. +/// percent(usize::MAX, 1.001); +/// ``` +pub fn percent(current_size_bytes: usize, percent: f32) -> NonZeroUsize { + // Guard against bad floats. + use std::num::FpCategory; + let percent = match percent.classify() { + FpCategory::Normal => { + if percent <= 1.0 { + 1.0 + } else { + percent + } + } + _ => 1.0, + }; + + let page_size = page_size(); + + // INVARIANT: Allow `f32` <-> `usize` casting, we handle all cases. + #[allow( + clippy::cast_possible_truncation, + clippy::cast_sign_loss, + clippy::cast_precision_loss + )] + let new_size_bytes = ((current_size_bytes as f32) * percent) as usize; + + // Panic if rounding up to the nearest page size would overflow. + let new_size_bytes = if new_size_bytes > (usize::MAX - page_size.get()) { + panic!("new_size_bytes is percent() near usize::MAX"); + } else { + new_size_bytes + }; + + // Guard against < page_size. + if new_size_bytes <= page_size.get() { + return page_size; + } + + // Round up the new size to the + // nearest multiple of the OS page size. + let remainder = new_size_bytes % page_size; + + // INVARIANT: we guarded against < page_size above. 
+ NonZeroUsize::new(if remainder == 0 { + new_size_bytes + } else { + (new_size_bytes + page_size.get()) - remainder + }) + .unwrap() +} + +//---------------------------------------------------------------------------------------------------- Tests +#[cfg(test)] +mod test { + // use super::*; +} diff --git a/database/src/service/free.rs b/database/src/service/free.rs index d77c6e0..ae64bf1 100644 --- a/database/src/service/free.rs +++ b/database/src/service/free.rs @@ -4,72 +4,46 @@ use std::sync::Arc; use crate::{ - service::read::DatabaseReader, - service::write::DatabaseWriter, - service::{DatabaseReadHandle, DatabaseWriteHandle}, - ConcreteEnv, + config::Config, + error::InitError, + service::{ + read::DatabaseReader, write::DatabaseWriter, DatabaseReadHandle, DatabaseWriteHandle, + }, + ConcreteEnv, Env, }; //---------------------------------------------------------------------------------------------------- Init #[cold] -#[inline(never)] // Only called once. +#[inline(never)] // Only called once (?) /// Initialize a database & thread-pool, and return a read/write handle to it. /// -/// The returned handles are cheaply [`Clone`]able. +/// Once the returned handles are [`Drop::drop`]ed, the reader +/// thread-pool and writer thread will exit automatically. /// -/// TODO: add blocking behavior docs. -pub fn init() -> (DatabaseReadHandle, DatabaseWriteHandle) { - // TODO: - // This should only ever be called once? - // We could `panic!()` if called twice. +/// # Errors +/// This will forward the error if [`Env::open`] failed. +// +// INVARIANT: +// `cuprate_database` depends on the fact that this is the only +// function that hands out the handles. After that, they can be +// cloned, however they must eventually be dropped and shouldn't +// be leaked. +// +// As the reader thread-pool and writer thread both rely on the +// disconnection (drop) of these channels for shutdown behavior, +// leaking these handles could cause data to not get flushed to disk. 
+pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle), InitError> { + let reader_threads = config.reader_threads; // Initialize the database itself. - // TODO: there's probably shutdown code we have to run. - let db: Arc = Arc::new(todo!()); + let db: Arc = Arc::new(ConcreteEnv::open(config)?); - // Spawn the `Reader/Writer` thread pools. - let readers = DatabaseReader::init(&db); - let writers = DatabaseWriter::init(&db); + // Spawn the Reader thread pool and Writer. + let readers = DatabaseReader::init(&db, reader_threads); + let writers = DatabaseWriter::init(db); // Return the handles to those pools. - (readers, writers) -} - -#[cold] -#[inline(never)] // Only called once. -/// Sync/flush all data, and shutdown the database thread-pool. -/// -/// This function **blocks**, waiting until: -/// 1. All database transactions are complete -/// 2. All data has been flushed to disk -/// 3. All database threads have exited -/// -/// The database being shutdown is the one started in [`init()`], -/// aka, the single program global database. -/// -/// # TODO -/// Maybe the visibility/access of this function should somehow be -/// limited such that only certain parts of `cuprate` can actually -/// call this function. -/// -/// Anyone/everyone being able to shutdown the database seems dangerous. -/// -/// Counter-argument: we can just CTRL+F to see who calls this i guess. -pub fn shutdown(db: Arc) { - // Not sure how this function is going - // to work on a `&'static` database, but: - - // 1. Send a shutdown message to all database threads, maybe `Request::Shutdown` - // 2. Wait on barrier until all threads are "ready" (all tx's are done) - // 3. Writer thread will flush all data to disk - // 4. All threads exit, 1 of them sends us back an OK - // 5. We don't need to reclaim ownership of `&'static ConcreteEnv` because... - // 5a) a bunch of threads have a `&` to it, so this is hard (impossible?) 
- // 5b) as along as data is flushed, we can just `std::process::exit` - // and there's no need to (manually) drop the actual database - - drop(db); - todo!(); + Ok((readers, writers)) } //---------------------------------------------------------------------------------------------------- Tests diff --git a/database/src/service/mod.rs b/database/src/service/mod.rs index 2146995..3ce2bfb 100644 --- a/database/src/service/mod.rs +++ b/database/src/service/mod.rs @@ -8,17 +8,10 @@ //! sending database [`Request`](ReadRequest)s and receiving [`Response`]s `async`hronously - //! without having to actually worry and handle the database themselves. //! -//! The system is managed by this crate, and only -//! requires [`init`] and [`shutdown`] by the user. +//! The system is managed by this crate, and only requires [`init`] by the user. //! //! This module must be enabled with the `service` feature. //! -//! ## Initialization -//! The database & thread-pool system can be initialized with [`init()`]. -//! -//! This causes the underlying database/threads to be setup -//! and returns a read/write handle to that database. -//! //! ## Handles //! The 2 handles to the database are: //! - [`DatabaseReadHandle`] @@ -32,6 +25,21 @@ //! the `DatabaseWriteHandle` cannot be cloned. There is only 1 place in Cuprate that //! writes, so it is passed there and used. //! +//! ## Initialization +//! The database & thread-pool system can be initialized with [`init()`]. +//! +//! This causes the underlying database/threads to be setup +//! and returns a read/write handle to that database. +//! +//! ## Shutdown +//! Upon the above handles being dropped, the corresponding thread(s) will automatically exit, i.e: +//! - The last [`DatabaseReadHandle`] is dropped => reader thread-pool exits +//! - The last [`DatabaseWriteHandle`] is dropped => writer thread exits +//! +//! Upon dropping the [`crate::ConcreteEnv`]: +//! - All un-processed database transactions are completed +//! 
- All data gets flushed to disk (caused by [`Drop::drop`] impl of [`crate::ConcreteEnv`]) +//! //! ## Request and Response //! To interact with the database (whether reading or writing data), //! a `Request` can be sent using one of the above handles. @@ -40,7 +48,7 @@ //! //! An `async`hronous channel will be returned from the call. //! This channel can be `.await`ed upon to (eventually) receive -//! corresponding `Response` to your `Request`. +//! the corresponding `Response` to your `Request`. mod read; pub use read::DatabaseReadHandle; @@ -49,7 +57,7 @@ mod write; pub use write::DatabaseWriteHandle; mod free; -pub use free::{init, shutdown}; +pub use free::init; mod request; pub use request::{ReadRequest, WriteRequest}; diff --git a/database/src/service/read.rs b/database/src/service/read.rs index 3681ab8..2285db2 100644 --- a/database/src/service/read.rs +++ b/database/src/service/read.rs @@ -6,9 +6,12 @@ use std::{ task::{Context, Poll}, }; +use crossbeam::channel::Receiver; + use cuprate_helper::asynch::InfallibleOneshotReceiver; use crate::{ + config::ReaderThreads, error::RuntimeError, service::{request::ReadRequest, response::Response}, ConcreteEnv, @@ -68,7 +71,7 @@ impl tower::Service for DatabaseReadHandle { } } -//---------------------------------------------------------------------------------------------------- DatabaseReader Impl +//---------------------------------------------------------------------------------------------------- DatabaseReader /// Database reader thread. /// /// This struct essentially represents a thread. @@ -86,12 +89,21 @@ pub(super) struct DatabaseReader { /// /// SOMEDAY: this struct itself could cache a return channel /// instead of creating a new `oneshot` each request. - receiver: crossbeam::channel::Receiver<(ReadRequest, ResponseSend)>, + receiver: Receiver<(ReadRequest, ResponseSend)>, /// Access to the database. 
db: Arc, } +impl Drop for DatabaseReader { + fn drop(&mut self) { + // INVARIANT: we set the thread name when spawning it. + let thread_name = std::thread::current().name().unwrap(); + + // TODO: log that this thread has exited? + } +} + impl DatabaseReader { /// Initialize the `DatabaseReader` thread-pool. /// @@ -101,31 +113,29 @@ impl DatabaseReader { /// Should be called _once_ per actual database. #[cold] #[inline(never)] // Only called once. - pub(super) fn init(db: &Arc) -> DatabaseReadHandle { + pub(super) fn init(db: &Arc, reader_threads: ReaderThreads) -> DatabaseReadHandle { // Initialize `Request/Response` channels. let (sender, receiver) = crossbeam::channel::unbounded(); - // TODO: slightly _less_ readers per thread may be more ideal. - // We could account for the writer count as well such that - // readers + writers == total_thread_count - // - // TODO: take in a config option that allows - // manually adjusting this thread-count. - let readers = cuprate_helper::thread::threads().get(); + // How many reader threads to spawn? + let reader_count = reader_threads.as_threads(); // Spawn pool of readers. - for _ in 0..readers { + for i in 0..reader_count.get() { let receiver = receiver.clone(); - let db = db.clone(); + let db = Arc::clone(db); - std::thread::spawn(move || { - let this = Self { receiver, db }; - - Self::main(this); - }); + std::thread::Builder::new() + .name(format!("cuprate_database::service::read::DatabaseReader{i}")) + .spawn(move || { + let this = Self { receiver, db }; + Self::main(this); + }) + .unwrap(); } - // Return a handle to the pool. + // Return a handle to the pool and channels to + // allow clean shutdown of all reader threads. DatabaseReadHandle { sender } } @@ -134,20 +144,19 @@ impl DatabaseReader { /// Each thread just loops in this function. #[cold] #[inline(never)] // Only called once. - fn main(mut self) { + fn main(self) { + // 1. Hang on request channel + // 2. Map request to some database function + // 3.
Execute that function, get the result + // 4. Return the result via channel loop { - // 1. Hang on request channel - // 2. Map request to some database function - // 3. Execute that function, get the result - // 4. Return the result via channel - let (request, response_send) = match self.receiver.recv() { - Ok((r, c)) => (r, c), - - // Shutdown on error. - Err(e) => { - Self::shutdown(self); - return; - } + // Database requests. + let Ok((request, response_send)) = self.receiver.recv() else { + // If this receive errors, it means that the channel is empty + // and disconnected, meaning the other side (all senders) have + // been dropped. This means "shutdown", and we return here to + // exit the thread. + return; }; // Map [`Request`]'s to specific database functions. @@ -155,37 +164,28 @@ impl DatabaseReader { ReadRequest::Example1 => self.example_handler_1(response_send), ReadRequest::Example2(_x) => self.example_handler_2(response_send), ReadRequest::Example3(_x) => self.example_handler_3(response_send), - ReadRequest::Shutdown => { - /* TODO: run shutdown code */ - Self::shutdown(self); - - // Return, exiting the thread. 
- return; - } } } } /// TODO - fn example_handler_1(&mut self, response_send: ResponseSend) { + #[inline] + fn example_handler_1(&self, response_send: ResponseSend) { let db_result = todo!(); response_send.send(db_result).unwrap(); } /// TODO - fn example_handler_2(&mut self, response_send: ResponseSend) { + #[inline] + fn example_handler_2(&self, response_send: ResponseSend) { let db_result = todo!(); response_send.send(db_result).unwrap(); } /// TODO - fn example_handler_3(&mut self, response_send: ResponseSend) { + #[inline] + fn example_handler_3(&self, response_send: ResponseSend) { let db_result = todo!(); response_send.send(db_result).unwrap(); } - - /// TODO - fn shutdown(self) { - todo!() - } } diff --git a/database/src/service/request.rs b/database/src/service/request.rs index 73064c7..1c5baf4 100644 --- a/database/src/service/request.rs +++ b/database/src/service/request.rs @@ -16,8 +16,6 @@ pub enum ReadRequest { Example2(usize), /// TODO Example3(String), - /// TODO - Shutdown, } //---------------------------------------------------------------------------------------------------- WriteRequest @@ -30,8 +28,6 @@ pub enum WriteRequest { Example2(usize), /// TODO Example3(String), - /// - Shutdown, } //---------------------------------------------------------------------------------------------------- IMPL diff --git a/database/src/service/write.rs b/database/src/service/write.rs index e17e88f..b269dfc 100644 --- a/database/src/service/write.rs +++ b/database/src/service/write.rs @@ -11,9 +11,13 @@ use cuprate_helper::asynch::InfallibleOneshotReceiver; use crate::{ error::RuntimeError, service::{request::WriteRequest, response::Response}, - ConcreteEnv, + ConcreteEnv, Env, }; +//---------------------------------------------------------------------------------------------------- Constants +/// Name of the writer thread. 
+const WRITER_THREAD_NAME: &str = "cuprate_database::service::write::DatabaseWriter"; + //---------------------------------------------------------------------------------------------------- Types /// The actual type of the response. /// @@ -77,21 +81,28 @@ pub(super) struct DatabaseWriter { db: Arc, } +impl Drop for DatabaseWriter { + fn drop(&mut self) { + // TODO: log the writer thread has exited? + } +} + impl DatabaseWriter { /// Initialize the single `DatabaseWriter` thread. #[cold] #[inline(never)] // Only called once. - pub(super) fn init(db: &Arc) -> DatabaseWriteHandle { + pub(super) fn init(db: Arc) -> DatabaseWriteHandle { // Initialize `Request/Response` channels. let (sender, receiver) = crossbeam::channel::unbounded(); // Spawn the writer. - let db = Arc::clone(db); - std::thread::spawn(move || { - let this = Self { receiver, db }; - - Self::main(this); - }); + std::thread::Builder::new() + .name(WRITER_THREAD_NAME.into()) + .spawn(move || { + let this = Self { receiver, db }; + Self::main(this); + }) + .unwrap(); // Return a handle to the pool. DatabaseWriteHandle { sender } @@ -103,17 +114,21 @@ impl DatabaseWriter { #[cold] #[inline(never)] // Only called once. fn main(mut self) { + // 1. Hang on request channel + // 2. Map request to some database function + // 3. Execute that function, get the result + // 4. Return the result via channel loop { - // 1. Hang on request channel - // 2. Map request to some database function - // 3. Execute that function, get the result - // 4. Return the result via channel - let (request, response_send) = match self.receiver.recv() { - Ok(tuple) => tuple, - Err(e) => { - // TODO: what to do with this channel error? - todo!(); - } + let Ok((request, response_send)) = self.receiver.recv() else { + // If this receive errors, it means that the channel is empty + // and disconnected, meaning the other side (all senders) have + // been dropped. This means "shutdown", and we return here to + // exit the thread.
+ // + // Since the channel is empty, it means we've also processed + // all requests. Since it is disconnected, it means future + // ones cannot come in. + return; }; // Map [`Request`]'s to specific database functions. @@ -121,37 +136,52 @@ impl DatabaseWriter { WriteRequest::Example1 => self.example_handler_1(response_send), WriteRequest::Example2(_x) => self.example_handler_2(response_send), WriteRequest::Example3(_x) => self.example_handler_3(response_send), - WriteRequest::Shutdown => { - /* TODO: run shutdown code */ - Self::shutdown(self); - - // Return, exiting the thread. - return; - } } } } + /// Resize the database's memory map. + fn resize_map(&self) { + // The compiler most likely optimizes out this + // entire function call if this returns here. + if !ConcreteEnv::MANUAL_RESIZE { + return; + } + + // INVARIANT: + // [`Env`]'s that are `MANUAL_RESIZE` are expected to implement + // their internals such that we have exclusive access when calling + // this function. We do not handle the exclusion part, `resize_map()` + // itself does. The `heed` backend does this with `RwLock`. + // + // We need mutual exclusion due to: + // + self.db.resize_map(None); + // TODO: + // We could pass in custom resizes to account for + // batch transactions, i.e., we're about to add ~5GB + // of data, add that much instead of the default 1GB. 
+ // + } + /// TODO + #[inline] fn example_handler_1(&mut self, response_send: ResponseSend) { let db_result = todo!(); response_send.send(db_result).unwrap(); } /// TODO + #[inline] fn example_handler_2(&mut self, response_send: ResponseSend) { let db_result = todo!(); response_send.send(db_result).unwrap(); } /// TODO + #[inline] fn example_handler_3(&mut self, response_send: ResponseSend) { let db_result = todo!(); response_send.send(db_result).unwrap(); } - - /// TODO - fn shutdown(self) { - todo!() - } } diff --git a/database/src/table.rs b/database/src/table.rs index 9ad459a..82b4c63 100644 --- a/database/src/table.rs +++ b/database/src/table.rs @@ -7,9 +7,12 @@ use crate::{key::Key, pod::Pod}; /// Database table metadata. /// /// Purely compile time information for database tables. -/// Not really an accurate name for `K/V` database but -/// this represents the metadata of a `K/V` storing object. -pub trait Table { +/// +/// ## Sealed +/// This trait is [`Sealed`](https://rust-lang.github.io/api-guidelines/future-proofing.html#sealed-traits-protect-against-downstream-implementations-c-sealed). +/// +/// It is, and can only be implemented on the types inside [`tables`][crate::tables]. +pub trait Table: crate::tables::private::Sealed { // TODO: // // Add K/V comparison `type`s that define diff --git a/database/src/tables.rs b/database/src/tables.rs index fdede7c..492fa7d 100644 --- a/database/src/tables.rs +++ b/database/src/tables.rs @@ -6,6 +6,18 @@ //---------------------------------------------------------------------------------------------------- Import use crate::table::Table; +//---------------------------------------------------------------------------------------------------- Tables +/// Private module, should not be accessible outside this crate. +/// +/// Used to block outsiders implementing [`Table`]. +/// All [`Table`] types must also implement [`Sealed`]. +pub(super) mod private { + /// Private sealed trait. 
+ /// + /// Cannot be implemented outside this crate. + pub trait Sealed {} +} + //---------------------------------------------------------------------------------------------------- Tables /// An enumeration of _all_ database tables. /// @@ -90,6 +102,10 @@ macro_rules! tables { #[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash)] pub struct [<$table:camel>]; + // Implement the `Sealed` in this file. + // Required by `Table`. + impl private::Sealed for [<$table:camel>] {} + // Table trait impl. impl Table for [<$table:camel>] { const NAME: &'static str = stringify!([<$table:snake>]); diff --git a/helper/src/fs.rs b/helper/src/fs.rs index 8a82024..9a08b3c 100644 --- a/helper/src/fs.rs +++ b/helper/src/fs.rs @@ -48,16 +48,18 @@ pub const CUPRATE_DIR: &str = { /// - [`cuprate_cache_dir()`] /// - [`cuprate_config_dir()`] /// - [`cuprate_data_dir()`] +/// - [`cuprate_database_dir()`] /// /// FIXME: Use `LazyLock` when stabilized. /// . /// . -macro_rules! impl_dir_oncelock_and_fn { +macro_rules! impl_path_oncelock_and_fn { ($( $(#[$attr:meta])* // Documentation and any `derive`'s. $fn:ident, // Name of the corresponding access function. $dirs_fn:ident, // Name of the `dirs` function to use, the PATH prefix. - $once_lock:ident // Name of the `OnceLock`. + $once_lock:ident, // Name of the `OnceLock`. + $sub_dirs:literal // Any sub-directories to add onto the PATH. ),* $(,)?) => {$( /// Local `OnceLock` containing the Path. static $once_lock: OnceLock = OnceLock::new(); @@ -88,14 +90,24 @@ macro_rules! impl_dir_oncelock_and_fn { // Returned OS PATH should be absolute, not relative. assert!(path.is_absolute(), "SAFETY: returned OS PATH was not absolute"); + // Unconditionally prefix with the top-level Cuprate directory. path.push(CUPRATE_DIR); + + // Add any sub directories if specified in the macro. + if !$sub_dirs.is_empty() { + path.push($sub_dirs); + } + path }) } )*}; } -impl_dir_oncelock_and_fn! 
{ +// Note that the `OnceLock`'s are prefixed with `__` to indicate: +// 1. They're not really to be used directly +// 2. To avoid name conflicts +impl_path_oncelock_and_fn! { /// Cuprate's cache directory. /// /// This is the PATH used for any Cuprate cache files. @@ -107,7 +119,8 @@ impl_dir_oncelock_and_fn! { /// | Linux | `/home/alice/.cache/cuprate/` | cuprate_cache_dir, cache_dir, - CUPRATE_CACHE_DIR, + __CUPRATE_CACHE_DIR, + "", /// Cuprate's config directory. /// @@ -120,7 +133,8 @@ impl_dir_oncelock_and_fn! { /// | Linux | `/home/alice/.config/cuprate/` | cuprate_config_dir, config_dir, - CUPRATE_CONFIG_DIR, + __CUPRATE_CONFIG_DIR, + "", /// Cuprate's data directory. /// @@ -133,7 +147,22 @@ impl_dir_oncelock_and_fn! { /// | Linux | `/home/alice/.local/share/cuprate/` | cuprate_data_dir, data_dir, - CUPRATE_DATA_DIR, + __CUPRATE_DATA_DIR, + "", + + /// Cuprate's database directory. + /// + /// This is the PATH used for any Cuprate database files. + /// + /// | OS | PATH | + /// |---------|--------------------------------------------------------------| + /// | Windows | `C:\Users\Alice\AppData\Roaming\Cuprate\database\` | + /// | macOS | `/Users/Alice/Library/Application Support/Cuprate/database/` | + /// | Linux | `/home/alice/.local/share/cuprate/database/` | + cuprate_database_dir, + data_dir, + __CUPRATE_DATABASE_DIR, + "database", } //---------------------------------------------------------------------------------------------------- Tests @@ -141,11 +170,17 @@ impl_dir_oncelock_and_fn! { mod test { use super::*; + // Sanity check every PATH defined in this file. 
+ // + // Each new PATH should be added to this test: + // - It must be `is_absolute()` + // - It must `ends_with()` the expected end PATH for the OS #[test] - fn dir_sanity_check() { + fn path_sanity_check() { assert!(cuprate_cache_dir().is_absolute()); assert!(cuprate_config_dir().is_absolute()); assert!(cuprate_data_dir().is_absolute()); + assert!(cuprate_database_dir().is_absolute()); if cfg!(target_os = "windows") { let dir = cuprate_cache_dir(); @@ -159,6 +194,10 @@ mod test { let dir = cuprate_data_dir(); println!("cuprate_data_dir: {dir:?}"); assert!(dir.ends_with(r"AppData\Roaming\Cuprate")); + + let dir = cuprate_database_dir(); + println!("cuprate_database_dir: {dir:?}"); + assert!(dir.ends_with(r"AppData\Roaming\Cuprate\database")); } else if cfg!(target_os = "macos") { let dir = cuprate_cache_dir(); println!("cuprate_cache_dir: {dir:?}"); @@ -171,6 +210,10 @@ mod test { let dir = cuprate_data_dir(); println!("cuprate_data_dir: {dir:?}"); assert!(dir.ends_with("Library/Application Support/Cuprate")); + + let dir = cuprate_database_dir(); + println!("cuprate_database_dir: {dir:?}"); + assert!(dir.ends_with("Library/Application Support/Cuprate/database")); } else { // Assumes Linux. let dir = cuprate_cache_dir(); @@ -184,6 +227,10 @@ mod test { let dir = cuprate_data_dir(); println!("cuprate_data_dir: {dir:?}"); assert!(dir.ends_with(".local/share/cuprate")); + + let dir = cuprate_database_dir(); + println!("cuprate_database_dir: {dir:?}"); + assert!(dir.ends_with(".local/share/cuprate/database")); } } }