From 7e8aae016c55802070ccf7d152aa8966984d7186 Mon Sep 17 00:00:00 2001 From: "hinto.janai" Date: Sun, 28 Apr 2024 21:00:14 -0400 Subject: [PATCH] heed: use Mutex for `HeedTableRo`'s read tx --- database/src/backend/heed/database.rs | 53 +++++++++++++++++++-------- database/src/backend/heed/env.rs | 4 +- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/database/src/backend/heed/database.rs b/database/src/backend/heed/database.rs index 7a62855d..d9f8c676 100644 --- a/database/src/backend/heed/database.rs +++ b/database/src/backend/heed/database.rs @@ -6,7 +6,7 @@ use std::{ cell::RefCell, fmt::Debug, ops::RangeBounds, - sync::RwLockReadGuard, + sync::{Mutex, RwLockReadGuard}, }; use crate::{ @@ -36,7 +36,7 @@ pub(super) struct HeedTableRo<'tx, T: Table> { /// An already opened database table. pub(super) db: HeedDb, /// The associated read-only transaction that opened this table. - pub(super) tx_ro: &'tx heed::RoTxn<'tx>, + pub(super) tx_ro: Mutex<&'tx heed::RoTxn<'tx>>, } /// An opened read/write database associated with a transaction. @@ -49,16 +49,25 @@ pub(super) struct HeedTableRw<'env, 'tx, T: Table> { pub(super) tx_rw: &'tx RefCell>, } -/// SAFETY: `cuprate_database`'s Cargo.toml enables a feature -/// for `heed` that turns on the `MDB_NOTLS` flag for LMDB. +#[allow(clippy::non_send_fields_in_send_ty)] +/// SAFETY: 2 invariants for safety: /// +/// 1. `cuprate_database`'s Cargo.toml enables a feature +/// for `heed` that turns on the `MDB_NOTLS` flag for LMDB. /// This makes read transactions `Send`, but only if that flag is enabled. /// +/// 2. Our `tx_ro` is wrapped in Mutex, as `&T: Send` only if `T: Sync`. +/// This is what is happening as we have `&TxRw`, not `TxRw`. +/// +/// /// This is required as in `crate::service` we must put our transactions and /// tables inside `ThreadLocal`'s to use across multiple threads. -/// -/// `ThreadLocal` requires that `T: Send`. -unsafe impl Send for HeedTableRo<'_, T> {} +unsafe impl Send for HeedTableRo<'_, T> +where + T::Key: Send, + T::Value: Send, +{ +} //---------------------------------------------------------------------------------------------------- Shared functions // FIXME: we cannot just deref `HeedTableRw -> HeedTableRo` and @@ -121,7 +130,10 @@ impl DatabaseIter for HeedTableRo<'_, T> { where Range: RangeBounds + 'a, { - Ok(self.db.range(self.tx_ro, &range)?.map(|res| Ok(res?.1))) + Ok(self + .db + .range(&self.tx_ro.lock().unwrap(), &range)? + .map(|res| Ok(res?.1))) } #[inline] @@ -129,21 +141,30 @@ impl DatabaseIter for HeedTableRo<'_, T> { &self, ) -> Result> + '_, RuntimeError> { - Ok(self.db.iter(self.tx_ro)?.map(|res| Ok(res?))) + Ok(self + .db + .iter(&self.tx_ro.lock().unwrap())? + .map(|res| Ok(res?))) } #[inline] fn keys( &self, ) -> Result> + '_, RuntimeError> { - Ok(self.db.iter(self.tx_ro)?.map(|res| Ok(res?.0))) + Ok(self + .db + .iter(&self.tx_ro.lock().unwrap())? + .map(|res| Ok(res?.0))) } #[inline] fn values( &self, ) -> Result> + '_, RuntimeError> { - Ok(self.db.iter(self.tx_ro)?.map(|res| Ok(res?.1))) + Ok(self + .db + .iter(&self.tx_ro.lock().unwrap())? + .map(|res| Ok(res?.1))) } } @@ -151,27 +172,27 @@ impl DatabaseIter for HeedTableRo<'_, T> { impl DatabaseRo for HeedTableRo<'_, T> { #[inline] fn get(&self, key: &T::Key) -> Result { - get::(&self.db, self.tx_ro, key) + get::(&self.db, &self.tx_ro.lock().unwrap(), key) } #[inline] fn len(&self) -> Result { - len::(&self.db, self.tx_ro) + len::(&self.db, &self.tx_ro.lock().unwrap()) } #[inline] fn first(&self) -> Result<(T::Key, T::Value), RuntimeError> { - first::(&self.db, self.tx_ro) + first::(&self.db, &self.tx_ro.lock().unwrap()) } #[inline] fn last(&self) -> Result<(T::Key, T::Value), RuntimeError> { - last::(&self.db, self.tx_ro) + last::(&self.db, &self.tx_ro.lock().unwrap()) } #[inline] fn is_empty(&self) -> Result { - is_empty::(&self.db, self.tx_ro) + is_empty::(&self.db, &self.tx_ro.lock().unwrap()) } } diff --git a/database/src/backend/heed/env.rs b/database/src/backend/heed/env.rs index 56ece8f3..9b21d039 100644 --- a/database/src/backend/heed/env.rs +++ b/database/src/backend/heed/env.rs @@ -6,7 +6,7 @@ use std::{ fmt::Debug, num::NonZeroUsize, ops::Deref, - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; use heed::{DatabaseOpenOptions, EnvFlags, EnvOpenOptions}; @@ -312,7 +312,7 @@ where db: self .open_database(tx_ro, Some(T::NAME))? .expect(PANIC_MSG_MISSING_TABLE), - tx_ro, + tx_ro: Mutex::new(tx_ro), }) }