This commit is contained in:
Boog900 2024-07-26 21:30:35 +01:00
parent c1cba04123
commit d3c6128fc7
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
10 changed files with 46 additions and 38 deletions

View file

@ -34,6 +34,7 @@ monero-serai = { workspace = true, features = ["std"] }
serde = { workspace = true, optional = true }
# `service` feature.
tower = { workspace = true }
thread_local = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
@ -42,7 +43,6 @@ cuprate-helper = { path = "../../helper", features = ["thread"] }
cuprate-test-utils = { path = "../../test-utils" }
tokio = { workspace = true, features = ["full"] }
tower = { workspace = true }
tempfile = { workspace = true }
pretty_assertions = { workspace = true }
proptest = { workspace = true }

View file

@ -52,7 +52,7 @@
unused_crate_dependencies,
unused_doc_comments,
unused_mut,
//missing_docs,
missing_docs,
deprecated,
unused_comparisons,
nonstandard_style

View file

@ -14,8 +14,8 @@
//!
//! ## Handles
//! The 2 handles to the database are:
//! - [`DatabaseReadHandle`]
//! - [`DatabaseWriteHandle`]
//! - [`BCReadHandle`]
//! - [`BCWriteHandle`]
//!
//! The 1st allows any caller to send [`ReadRequest`][req_r]s.
//!
@ -33,12 +33,12 @@
//!
//! ## Shutdown
//! Upon the above handles being dropped, the corresponding thread(s) will automatically exit, i.e:
//! - The last [`DatabaseReadHandle`] is dropped => reader thread-pool exits
//! - The last [`DatabaseWriteHandle`] is dropped => writer thread exits
//! - The last [`BCReadHandle`] is dropped => reader thread-pool exits
//! - The last [`BCWriteHandle`] is dropped => writer thread exits
//!
//! Upon dropping the [`cuprate_database::ConcreteEnv`]:
//! Upon dropping the [`cuprate_database::Env`]:
//! - All un-processed database transactions are completed
//! - All data gets flushed to disk (caused by [`Drop::drop`] impl on `ConcreteEnv`)
//! - All data gets flushed to disk (caused by [`Drop::drop`] impl on `Env`)
//!
//! ## Request and Response
//! To interact with the database (whether reading or writing data),
@ -118,6 +118,9 @@
//! # Ok(()) }
//! ```
// needed for docs
use tower as _;
mod read;
pub use read::{init_read_service, init_read_service_with_pool};

View file

@ -36,12 +36,14 @@ use crate::{
types::{Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId},
};
//---------------------------------------------------------------------------------------------------- DatabaseReadHandle
//---------------------------------------------------------------------------------------------------- init_read_service
/// Initialize the blockchain database read service.
pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> BCReadHandle {
init_read_service_with_pool(env, init_thread_pool(threads))
}
/// Initialize the blockchain database read service, with a specific rayon thread-pool instead of
/// creating a new one.
pub fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) -> BCReadHandle {
DatabaseReadService::new(env, pool, map_request)
}
@ -65,7 +67,7 @@ fn map_request(
/* SOMEDAY: pre-request handling, run some code for each request? */
let response = match request {
match request {
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block) => block_hash(env, block),
R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes),
@ -77,11 +79,9 @@ fn map_request(
R::KeyImagesSpent(set) => key_images_spent(env, set),
R::CompactChainHistory => compact_chain_history(env),
R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids),
};
}
/* SOMEDAY: post-request handling, run some code for each request? */
response
}
//---------------------------------------------------------------------------------------------------- Thread Local

View file

@ -13,6 +13,8 @@ use cuprate_types::blockchain::{BCReadRequest, BCResponse, BCWriteRequest};
/// Either our [`BCResponse`], or a database error occurred.
pub(super) type ResponseResult = Result<BCResponse, RuntimeError>;
/// The blockchain database write service.
pub type BCWriteHandle = DatabaseWriteHandle<BCWriteRequest, BCResponse>;
pub type BCReadHandle = DatabaseReadService<BCReadRequest, BCResponse, RuntimeError>;
/// The blockchain database read service.
pub type BCReadHandle = DatabaseReadService<BCReadRequest, BCResponse>;

View file

@ -14,12 +14,15 @@ use crate::{
service::types::{BCWriteHandle, ResponseResult},
tables::OpenTables,
};
//---------------------------------------------------------------------------------------------------- init_write_service
//---------------------------------------------------------------------------------------------------- init_write_service
/// Initialize the blockchain write service from a [`ConcreteEnv`].
pub fn init_write_service(env: Arc<ConcreteEnv>) -> BCWriteHandle {
DatabaseWriteHandle::init(env, handle_bc_request)
}
//---------------------------------------------------------------------------------------------------- handle_bc_request
/// Handle an incoming [`BCWriteRequest`], returning a [`BCResponse`].
fn handle_bc_request(env: &ConcreteEnv, req: &BCWriteRequest) -> Result<BCResponse, RuntimeError> {
match req {
BCWriteRequest::WriteBlock(block) => write_block(env, block),

View file

@ -1,3 +1,5 @@
#![allow(clippy::impl_trait_in_params)]
mod reader_threads;
mod service;

View file

@ -9,11 +9,13 @@
//! based on these values.
//---------------------------------------------------------------------------------------------------- Import
use std::{num::NonZeroUsize, sync::Arc};
use rayon::ThreadPool;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::sync::Arc;
//---------------------------------------------------------------------------------------------------- init_thread_pool
pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
// How many reader threads to spawn?
@ -29,8 +31,6 @@ pub fn init_thread_pool(reader_threads: ReaderThreads) -> Arc<ThreadPool> {
}
//---------------------------------------------------------------------------------------------------- ReaderThreads
/// Amount of database reader threads to spawn when using [`service`](crate::service).
///
/// This controls how many reader thread `service`'s
/// thread-pool will spawn to receive and send requests/responses.
///

View file

@ -7,39 +7,38 @@ use futures::channel::oneshot;
use rayon::ThreadPool;
use tower::Service;
use cuprate_database::ConcreteEnv;
use cuprate_database::{ConcreteEnv, RuntimeError};
use cuprate_helper::asynch::InfallibleOneshotReceiver;
/// The [`rayon::ThreadPool`] service.
///
/// Uses an inner request handler and a rayon thread-pool to asynchronously handler requests.
pub struct DatabaseReadService<Req, Res, Err> {
pub struct DatabaseReadService<Req, Res> {
/// The rayon thread-pool.
pool: Arc<ThreadPool>,
/// The function used to handle request.
inner_handler: Arc<dyn Fn(Req) -> Result<Res, Err> + Send + Sync + 'static>,
inner_handler: Arc<dyn Fn(Req) -> Result<Res, RuntimeError> + Send + Sync + 'static>,
}
impl<Req, Res, Err> Clone for DatabaseReadService<Req, Res, Err> {
impl<Req, Res> Clone for DatabaseReadService<Req, Res> {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
inner_handler: self.inner_handler.clone(),
pool: Arc::clone(&self.pool),
inner_handler: Arc::clone(&self.inner_handler),
}
}
}
impl<Req, Res, Err> DatabaseReadService<Req, Res, Err>
impl<Req, Res> DatabaseReadService<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
Err: Send + 'static,
{
pub fn new(
env: Arc<ConcreteEnv>,
pool: Arc<ThreadPool>,
req_handler: impl Fn(&ConcreteEnv, Req) -> Result<Res, Err> + Send + Sync + 'static,
req_handler: impl Fn(&ConcreteEnv, Req) -> Result<Res, RuntimeError> + Send + Sync + 'static,
) -> Self {
let inner_handler = Arc::new(move |req| req_handler(&env, req));
@ -50,14 +49,13 @@ where
}
}
impl<Req, Res, Err> Service<Req> for DatabaseReadService<Req, Res, Err>
impl<Req, Res> Service<Req> for DatabaseReadService<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
Err: Send + 'static,
{
type Response = Res;
type Error = Err;
type Error = RuntimeError;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -68,7 +66,7 @@ where
// Response channel we `.await` on.
let (response_sender, receiver) = oneshot::channel();
let handler = self.inner_handler.clone();
let handler = Arc::clone(&self.inner_handler);
// Spawn the request in the rayon DB thread-pool.
//

View file

@ -16,7 +16,7 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
///
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
/// will return an `async`hronous channel that can be `.await`ed upon
/// to receive the corresponding [`BCResponse`].
/// to receive the corresponding response.
#[derive(Debug)]
pub struct DatabaseWriteHandle<Req, Res> {
/// Sender channel to the database write thread-pool.
@ -44,7 +44,7 @@ where
// Spawn the writer.
std::thread::Builder::new()
.name(WRITER_THREAD_NAME.into())
.spawn(move || database_writer(env, receiver, inner_handler))
.spawn(move || database_writer(&env, &receiver, inner_handler))
.unwrap();
Self { sender }
@ -76,8 +76,8 @@ impl<Req, Res> tower::Service<Req> for DatabaseWriteHandle<Req, Res> {
//---------------------------------------------------------------------------------------------------- database_writer
/// The main function of the writer thread.
fn database_writer<Req, Res>(
env: Arc<ConcreteEnv>,
receiver: crossbeam::channel::Receiver<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
env: &ConcreteEnv,
receiver: &crossbeam::channel::Receiver<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
inner_handler: impl Fn(&ConcreteEnv, &Req) -> Result<Res, RuntimeError>,
) where
Req: Send + 'static,
@ -120,7 +120,7 @@ fn database_writer<Req, Res>(
'retry: for retry in 0..REQUEST_RETRY_LIMIT {
// FIXME: will there be more than 1 write request?
// this won't have to be an enum.
let response = inner_handler(&env, &request);
let response = inner_handler(env, &request);
// If the database needs to resize, do so.
if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) {