service: remove RwLock, impl some read functions

This commit is contained in:
hinto.janai 2024-04-23 18:57:21 -04:00
parent 6a0a17aead
commit 0a01ddda2f
No known key found for this signature in database
GPG key ID: D47CE05FA175A499
3 changed files with 100 additions and 52 deletions

View file

@ -34,29 +34,7 @@ pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle),
let reader_threads = config.reader_threads;
// Initialize the database itself.
//
// INVARIANT:
// To prevent the reader thread-pool seeing different states of the
// database amongst themselves in the face of a write, i.e:
// ```
// Reader 1 (same request as reader 2)
// |
// v Writer
// tx_ro |
// v Reader 2
// tx_rw + commit |
// v
// tx_ro <- different state than reader 1
// ```
// We must ensure that all reader threads see the same
// database state, and that if the writer writes, all
// reader threads also see the changes at the same time.
//
// This invariant is protected by this `RwLock`.
//
// Functions that do not necessarily need multi-transaction
// synchronization (resizing, disk size, etc) can use `.read()` instead.
let db = Arc::new(RwLock::new(ConcreteEnv::open(config)?));
let db = Arc::new(ConcreteEnv::open(config)?);
// Spawn the Reader thread pool and Writer.
let readers = DatabaseReadHandle::init(&db, reader_threads);

View file

@ -17,7 +17,8 @@ use std::{
use cfg_if::cfg_if;
use crossbeam::channel::Receiver;
use futures::{channel::oneshot, ready};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use thread_local::ThreadLocal;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
@ -33,9 +34,9 @@ use crate::{
error::RuntimeError,
ops::block::{get_block_extended_header_from_height, get_block_info},
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
tables::{BlockInfos, Tables},
types::{Amount, AmountIndex, BlockHeight, KeyImage},
ConcreteEnv, Env, EnvInner,
tables::{BlockHeights, BlockInfos, KeyImages, NumOutputs, Outputs, Tables},
types::{Amount, AmountIndex, BlockHeight, KeyImage, PreRctOutputId},
ConcreteEnv, DatabaseRo, Env, EnvInner,
};
//---------------------------------------------------------------------------------------------------- DatabaseReadHandle
@ -68,7 +69,7 @@ pub struct DatabaseReadHandle {
permit: Option<OwnedSemaphorePermit>,
/// Access to the database.
env: Arc<RwLock<ConcreteEnv>>,
env: Arc<ConcreteEnv>,
}
// `OwnedSemaphorePermit` does not implement `Clone`,
@ -94,7 +95,7 @@ impl DatabaseReadHandle {
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(env: &Arc<RwLock<ConcreteEnv>>, reader_threads: ReaderThreads) -> Self {
pub(super) fn init(env: &Arc<ConcreteEnv>, reader_threads: ReaderThreads) -> Self {
// How many reader threads to spawn?
let reader_count = reader_threads.as_threads().get();
@ -120,7 +121,7 @@ impl DatabaseReadHandle {
/// TODO
#[inline]
pub const fn env(&self) -> &Arc<RwLock<ConcreteEnv>> {
pub const fn env(&self) -> &Arc<ConcreteEnv> {
&self.env
}
@ -197,14 +198,13 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
/// 3. [`Response`] is sent
fn map_request(
_permit: OwnedSemaphorePermit, // Permit for this request, dropped at end of function
env: Arc<RwLock<ConcreteEnv>>, // Access to the database
env: Arc<ConcreteEnv>, // Access to the database
request: ReadRequest, // The request we must fulfill
response_sender: ResponseSender, // The channel we must send the response back to
) {
use ReadRequest as R;
/* TODO: pre-request handling, run some code for each request? */
let env = env.read().expect(DATABASE_CORRUPT_MSG);
let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(&env, block),
@ -242,6 +242,9 @@ fn map_request(
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.
// TODO: implement multi-transaction read atomicity.
// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>.
/// [`ReadRequest::BlockExtendedHeader`].
#[inline]
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
@ -274,13 +277,16 @@ fn block_extended_header_in_range(
) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get());
let tables = ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get());
// This iterator will early return as `Err` if there's even 1 error.
let vec = range
.into_par_iter()
.map(|block_height| {
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
get_block_extended_header_from_height(&block_height, &tables)
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = tables.get_or_try(|| env_inner.open_tables(tx_ro))?;
get_block_extended_header_from_height(&block_height, tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?;
@ -290,19 +296,54 @@ fn block_extended_header_in_range(
/// [`ReadRequest::ChainHeight`].
#[inline]
fn chain_height(env: &ConcreteEnv) -> ResponseResult {
todo!()
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_height = crate::ops::blockchain::top_block_height(&table_block_heights)?;
let block_hash = crate::ops::block::get_block_info(&top_height, &table_block_infos)?.block_hash;
Ok(Response::ChainHeight(top_height, block_hash))
}
/// [`ReadRequest::GeneratedCoins`].
#[inline]
fn generated_coins(env: &ConcreteEnv) -> ResponseResult {
todo!()
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_height = crate::ops::blockchain::top_block_height(&table_block_heights)?;
Ok(Response::GeneratedCoins(
crate::ops::blockchain::cumulative_generated_coins(&top_height, &table_block_infos)?,
))
}
/// [`ReadRequest::Outputs`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn outputs(env: &ConcreteEnv, map: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
// let env_inner = env.env_inner();
// let vec = map
// .par_iter()
// .map(|(amount, amount_index_set)| {
// amount_index_set.par_iter().map(|amount_index| {
// let tx_ro = env_inner.tx_ro()?;
// let table_outputs = env_inner.open_db_ro::<Outputs>(&tx_ro)?;
// let pre_rct_output_id = PreRctOutputId {
// amount: *amount,
// amount_index: *amount_index,
// };
// crate::ops::output::get_output(&pre_rct_output_id, &table_outputs)
// })
// })
// .collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?;
todo!()
}
@ -310,13 +351,48 @@ fn outputs(env: &ConcreteEnv, map: HashMap<Amount, HashSet<AmountIndex>>) -> Res
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn number_outputs_with_amount(env: &ConcreteEnv, vec: Vec<Amount>) -> ResponseResult {
fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
// let env_inner = env.env_inner();
// let vec = amounts
// .into_par_iter()
// .map(|amount| {
// let tx_ro = env_inner.tx_ro()?;
// let table_num_outputs = env_inner.open_db_ro::<NumOutputs>(&tx_ro)?;
// match table_num_outputs.get(amount) {}
// })
// .collect::<Result<HashMap<Amount, usize>, RuntimeError>>()?;
todo!()
}
/// [`ReadRequest::CheckKIsNotSpent`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn check_k_is_not_spent(env: &ConcreteEnv, set: HashSet<KeyImage>) -> ResponseResult {
todo!()
fn check_k_is_not_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
let env_inner = env.env_inner();
let tx_ro = ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get());
let table_key_images =
ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get());
let key_image_exists = |key_image| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let table_key_images =
table_key_images.get_or_try(|| env_inner.open_db_ro::<KeyImages>(tx_ro))?;
crate::ops::key_image::key_image_exists(&key_image, table_key_images)
};
match key_images
.into_par_iter()
.map(key_image_exists)
// If the result is either:
// `Ok(true)` => a key image was found, return early
// `Err` => an error was found, return early
//
// Else, `Ok(false)` will continue the iterator.
.find_any(|result| !matches!(result, Ok(false)))
{
None | Some(Ok(false)) => Ok(Response::CheckKIsNotSpent(true)), // Key image was NOT found.
Some(Ok(true)) => Ok(Response::CheckKIsNotSpent(false)), // Key image was found.
Some(Err(e)) => Err(e), // A database error occured.
}
}

View file

@ -49,7 +49,7 @@ impl DatabaseWriteHandle {
/// Initialize the single `DatabaseWriter` thread.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(env: Arc<RwLock<ConcreteEnv>>) -> Self {
pub(super) fn init(env: Arc<ConcreteEnv>) -> Self {
// Initialize `Request/Response` channels.
let (sender, receiver) = crossbeam::channel::unbounded();
@ -99,7 +99,7 @@ pub(super) struct DatabaseWriter {
receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSender)>,
/// Access to the database.
env: Arc<RwLock<ConcreteEnv>>,
env: Arc<ConcreteEnv>,
}
impl Drop for DatabaseWriter {
@ -175,12 +175,8 @@ impl DatabaseWriter {
// batches, i.e., we're about to add ~5GB of data,
// add that much instead of the default 1GB.
// <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
let (old, new) = {
let env = self.env.read().expect(DATABASE_CORRUPT_MSG);
let old = env.current_map_size();
let new = env.resize_map(None);
(old, new)
};
let old = self.env.current_map_size();
let new = self.env.resize_map(None);
// TODO: use tracing.
println!("resizing database memory map, old: {old}B, new: {new}B");
@ -233,9 +229,7 @@ impl DatabaseWriter {
/// [`WriteRequest::WriteBlock`].
#[inline]
#[allow(clippy::significant_drop_tightening)]
fn write_block(env: &RwLock<ConcreteEnv>, block: &VerifiedBlockInformation) -> ResponseResult {
let env = env.write().expect(DATABASE_CORRUPT_MSG);
fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;