service: wrap ConcreteEnv in RwLock and doc why

This commit is contained in:
hinto.janai 2024-04-23 16:46:45 -04:00
parent 4dbf7dafa9
commit ba1d96522f
No known key found for this signature in database
GPG key ID: D47CE05FA175A499
5 changed files with 147 additions and 27 deletions

View file

@ -191,6 +191,10 @@
// although it is sometimes nice.
clippy::must_use_candidate,
// FIXME: good lint but too many false positives
// with our `Env` + `RwLock` setup.
clippy::significant_drop_tightening,
// TODO: should be removed after all `todo!()`'s are gone.
clippy::diverging_sub_expression,

View file

@ -0,0 +1,86 @@
//! General constants used throughout `cuprate-database`.
//---------------------------------------------------------------------------------------------------- Import
use cfg_if::cfg_if;
//---------------------------------------------------------------------------------------------------- Version
/// Current major version of the database.
///
/// Returned by [`crate::ops::property::db_version`].
///
/// This is incremented by 1 when `cuprate_database`'s
/// structure/schema/tables change.
///
/// This is akin to `VERSION` in `monerod`:
/// <https://github.com/monero-project/monero/blob/c8214782fb2a769c57382a999eaf099691c836e7/src/blockchain_db/lmdb/db_lmdb.cpp#L57>
pub const DATABASE_VERSION: u64 = 0;
//---------------------------------------------------------------------------------------------------- Error Messages
/// Corrupt database error message.
///
/// The error message shown to end-users in panic
/// messages if we think the database is corrupted.
///
/// This is meant to be user-friendly.
pub const DATABASE_CORRUPT_MSG: &str = r"Cuprate has encountered a fatal error. The database may be corrupted.
TODO: instructions on:
1. What to do
2. How to fix (re-sync, recover, etc)
3. General advice for preventing corruption
4. etc";
//---------------------------------------------------------------------------------------------------- Misc
/// Static string of the `crate` being used as the database backend.
///
/// | Backend | Value |
/// |---------|-------|
/// | `heed` | "heed"
/// | `redb` | "redb"
pub const DATABASE_BACKEND: &str = {
cfg_if! {
if #[cfg(all(feature = "redb", not(feature = "heed")))] {
"redb"
} else {
"heed"
}
}
};
/// Cuprate's database filename.
///
/// Used in [`Config::db_file`](crate::config::Config::db_file).
///
/// | Backend | Value |
/// |---------|-------|
/// | `heed` | "data.mdb"
/// | `redb` | "data.redb"
pub const DATABASE_DATA_FILENAME: &str = {
cfg_if! {
if #[cfg(all(feature = "redb", not(feature = "heed")))] {
"data.redb"
} else {
"data.mdb"
}
}
};
/// Cuprate's database lock filename.
///
/// | Backend | Value |
/// |---------|-------|
/// | `heed` | Some("lock.mdb")
/// | `redb` | None (redb doesn't use a file lock)
pub const DATABASE_LOCK_FILENAME: Option<&str> = {
cfg_if! {
if #[cfg(all(feature = "redb", not(feature = "heed")))] {
None
} else {
Some("lock.mdb")
}
}
};
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
mod test {}

View file

@ -1,7 +1,7 @@
//! General free functions used (related to `cuprate_database::service`).
//---------------------------------------------------------------------------------------------------- Import
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use crate::{
config::Config,
@ -34,7 +34,29 @@ pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle),
let reader_threads = config.reader_threads;
// Initialize the database itself.
let db: Arc<ConcreteEnv> = Arc::new(ConcreteEnv::open(config)?);
//
// INVARIANT:
// To prevent the reader thread-pool seeing different states of the
// database amongst themselves in the face of a write, i.e:
// ```
// Reader 1 (same request as reader 2)
// |
// v Writer
// tx_ro |
// v Reader 2
// tx_rw + commit |
// v
// tx_ro <- different state than reader 1
// ```
// We must ensure that all reader threads see the same
// database state, and that if the writer writes, all
// reader threads also see the changes at the same time.
//
// This invariant is protected by this `RwLock`.
//
// Functions that do not necessarily need multi-transaction
// synchronization (resizing, disk size, etc) can use `.read()` instead.
let db = Arc::new(RwLock::new(ConcreteEnv::open(config)?));
// Spawn the Reader thread pool and Writer.
let readers = DatabaseReadHandle::init(&db, reader_threads);

View file

@ -10,7 +10,7 @@ use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
ops::Range,
sync::Arc,
sync::{Arc, RwLock},
task::{Context, Poll},
};
@ -29,6 +29,7 @@ use cuprate_types::{
use crate::{
config::ReaderThreads,
constants::DATABASE_CORRUPT_MSG,
error::RuntimeError,
ops::block::{get_block_extended_header_from_height, get_block_info},
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
@ -67,7 +68,7 @@ pub struct DatabaseReadHandle {
permit: Option<OwnedSemaphorePermit>,
/// Access to the database.
env: Arc<ConcreteEnv>,
env: Arc<RwLock<ConcreteEnv>>,
}
// `OwnedSemaphorePermit` does not implement `Clone`,
@ -76,10 +77,10 @@ pub struct DatabaseReadHandle {
impl Clone for DatabaseReadHandle {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
pool: Arc::clone(&self.pool),
semaphore: self.semaphore.clone(),
permit: None,
env: self.env.clone(),
env: Arc::clone(&self.env),
}
}
}
@ -93,7 +94,7 @@ impl DatabaseReadHandle {
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(env: &Arc<ConcreteEnv>, reader_threads: ReaderThreads) -> Self {
pub(super) fn init(env: &Arc<RwLock<ConcreteEnv>>, reader_threads: ReaderThreads) -> Self {
// How many reader threads to spawn?
let reader_count = reader_threads.as_threads().get();
@ -119,7 +120,7 @@ impl DatabaseReadHandle {
/// TODO
#[inline]
pub const fn env(&self) -> &Arc<ConcreteEnv> {
pub const fn env(&self) -> &Arc<RwLock<ConcreteEnv>> {
&self.env
}
@ -174,7 +175,7 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
//
// INVARIANT:
// The below `DatabaseReader` function impl block relies on this behavior.
let env = Arc::clone(self.env());
let env = Arc::clone(&self.env);
self.pool
.spawn(move || map_request(permit, env, request, response_sender));
@ -196,13 +197,15 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
/// 3. [`Response`] is sent
fn map_request(
_permit: OwnedSemaphorePermit, // Permit for this request, dropped at end of function
env: Arc<ConcreteEnv>, // Access to the database
env: Arc<RwLock<ConcreteEnv>>, // Access to the database
request: ReadRequest, // The request we must fulfill
response_sender: ResponseSender, // The channel we must send the response back to
) {
/* TODO: pre-request handling, run some code for each request? */
use ReadRequest as R;
/* TODO: pre-request handling, run some code for each request? */
let env = env.read().expect(DATABASE_CORRUPT_MSG);
let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(&env, block),
R::BlockHash(block) => block_hash(&env, block),
@ -272,16 +275,14 @@ fn block_extended_header_in_range(
let env_inner = env.env_inner();
// This iterator will early return as `Err` if there's even 1 error.
let vec = {
range
.into_par_iter()
.map(|block_height| {
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
get_block_extended_header_from_height(&block_height, &tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?
};
let vec = range
.into_par_iter()
.map(|block_height| {
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
get_block_extended_header_from_height(&block_height, &tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?;
Ok(Response::BlockExtendedHeaderInRange(vec))
}

View file

@ -2,7 +2,7 @@
//---------------------------------------------------------------------------------------------------- Import
use std::{
sync::Arc,
sync::{Arc, RwLock},
task::{Context, Poll},
};
@ -15,6 +15,7 @@ use cuprate_types::{
};
use crate::{
constants::DATABASE_CORRUPT_MSG,
env::{Env, EnvInner},
error::RuntimeError,
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
@ -48,7 +49,7 @@ impl DatabaseWriteHandle {
/// Initialize the single `DatabaseWriter` thread.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(env: Arc<ConcreteEnv>) -> Self {
pub(super) fn init(env: Arc<RwLock<ConcreteEnv>>) -> Self {
// Initialize `Request/Response` channels.
let (sender, receiver) = crossbeam::channel::unbounded();
@ -98,7 +99,7 @@ pub(super) struct DatabaseWriter {
receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSender)>,
/// Access to the database.
env: Arc<ConcreteEnv>,
env: Arc<RwLock<ConcreteEnv>>,
}
impl Drop for DatabaseWriter {
@ -174,8 +175,12 @@ impl DatabaseWriter {
// batches, i.e., we're about to add ~5GB of data,
// add that much instead of the default 1GB.
// <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
let old = self.env.current_map_size();
let new = self.env.resize_map(None);
let (old, new) = {
let env = self.env.read().expect(DATABASE_CORRUPT_MSG);
let old = env.current_map_size();
let new = env.resize_map(None);
(old, new)
};
// TODO: use tracing.
println!("resizing database memory map, old: {old}B, new: {new}B");
@ -228,7 +233,9 @@ impl DatabaseWriter {
/// [`WriteRequest::WriteBlock`].
#[inline]
#[allow(clippy::significant_drop_tightening)]
fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
fn write_block(env: &RwLock<ConcreteEnv>, block: &VerifiedBlockInformation) -> ResponseResult {
let env = env.write().expect(DATABASE_CORRUPT_MSG);
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;