database: impl service::{Request,Response} mappings (#101)

* add `cuprate-types`

* remove `cuprate_database::service::{request,response}`

* use `cuprate_types::service::{request,response}`

* service: fix `Request` `match`'s

* service: create `ReadRequest` function mappings

* service: create `WriteRequest` function mappings

* service: add rough `WriteRequest` retry loop

* service: handle `RuntimeError::ResizeNeeded` in writer

* add `{R,r}o` exception to typos

* docs

* env: make `resize_map()` return new memory map byte size

* write: proactively handle resizes

`add_block()` takes `VerifiedBlockInformation` by value so
that it can consume the inner blobs of data directly.

This is a problem when resizing reactively: we no longer
have the block struct we just gave away, so we would be
forced to `.clone()` it for each retry.

Instead, we proactively resize so the resize error
never occurs in the first place.
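As a standalone illustration of the trade-off (toy types only; `Db`, `add_reactive`, and
`add_proactive` below are hypothetical and not the real `cuprate_database` API):

```rust
// Toy model: the write consumes its input by value,
// like `add_block()` taking `VerifiedBlockInformation`.
#[derive(Clone)]
struct Block(Vec<u8>);

#[derive(Debug)]
enum DbError {
    ResizeNeeded,
}

struct Db {
    capacity: usize,
    used: usize,
}

impl Db {
    /// Consumes `block`; fails if the "memory map" is too small.
    fn add_block(&mut self, block: Block) -> Result<(), DbError> {
        if self.used + block.0.len() > self.capacity {
            return Err(DbError::ResizeNeeded); // `block` is already gone
        }
        self.used += block.0.len();
        Ok(())
    }

    /// Grow the "memory map".
    fn resize(&mut self) {
        self.capacity = (self.capacity * 2).max(1);
    }

    fn needs_resize_for(&self, len: usize) -> bool {
        self.used + len > self.capacity
    }
}

/// Reactive: every failed attempt consumed the original, so each retry needs a `.clone()`.
fn add_reactive(db: &mut Db, block: Block) {
    loop {
        match db.add_block(block.clone()) {
            Ok(()) => return,
            Err(DbError::ResizeNeeded) => db.resize(),
        }
    }
}

/// Proactive: resize up front, then consume the block exactly once, no clone.
fn add_proactive(db: &mut Db, block: Block) {
    while db.needs_resize_for(block.0.len()) {
        db.resize();
    }
    db.add_block(block).expect("resized up-front, cannot fail");
}
```

The later `write: handle resizes reactively` change below sidesteps the clone differently:
the writer matches on `&request` and passes `&VerifiedBlockInformation` to `write_block()`,
so the request is only borrowed on each retry.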

* read: use type aliases

* docs

* fix import

* write: handle resizes reactively

* service: panic if response can't be sent back

* write: add loop unreachable asserts

* service: print and drop error instead of panic

* write: fix retry loop off-by-1

* write: fix docs

* review changes

* update readme

* remove `BlockBatchInRange` request/response

* Update database/README.md

Co-authored-by: Boog900 <boog900@tutanota.com>

---------

Co-authored-by: Boog900 <boog900@tutanota.com>
hinto-janai committed 2024-04-16 18:05:38 -04:00 (via GitHub)
parent e6465ec613
commit ad7b750d76
16 changed files with 293 additions and 257 deletions

Cargo.lock (generated)

@@ -596,6 +596,7 @@ dependencies = [
  "cfg-if",
  "crossbeam",
  "cuprate-helper",
+ "cuprate-types",
  "futures",
  "heed",
  "page_size",

@@ -25,6 +25,7 @@ cfg-if = { workspace = true }
 # We only need the `thread` feature if `service` is enabled.
 # Figure out how to enable features of an already pulled in dependency conditionally.
 cuprate-helper = { path = "../helper", features = ["fs", "thread"] }
+cuprate-types = { path = "../types", features = ["service"] }
 paste = { workspace = true }
 page_size = { version = "0.6.0" } # Needed for database resizes, they must be a multiple of the OS page size.
 thiserror = { workspace = true }

@@ -105,9 +105,8 @@ This folder contains the `cupate_database::service` module.
 |----------------|---------|
 | `free.rs`     | General free functions used (related to `cuprate_database::service`)
 | `read.rs`     | Read thread-pool definitions and logic
-| `request.rs`  | Read/write `Request`s to the database
-| `response.rs` | Read/write `Response`'s from the database
 | `tests.rs`    | Thread-pool tests and test helper functions
+| `types.rs`    | `cuprate_database::service`-related type aliases
 | `write.rs`    | Write thread-pool definitions and logic

 ## `src/backend/`

@@ -4,6 +4,7 @@
 use std::{
     cell::RefCell,
     fmt::Debug,
+    num::NonZeroUsize,
     ops::Deref,
     sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
 };
@@ -249,11 +250,11 @@ impl Env for ConcreteEnv {
         Ok(self.env.read().unwrap().force_sync()?)
     }

-    fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) {
+    fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) -> NonZeroUsize {
         let resize_algorithm = resize_algorithm.unwrap_or_else(|| self.config().resize_algorithm);

         let current_size_bytes = self.current_map_size();
-        let new_size_bytes = resize_algorithm.resize(current_size_bytes).get();
+        let new_size_bytes = resize_algorithm.resize(current_size_bytes);

         // SAFETY:
         // Resizing requires that we have
@@ -264,8 +265,14 @@ impl Env for ConcreteEnv {
         // <http://www.lmdb.tech/doc/group__mdb.html#gaa2506ec8dab3d969b0e609cd82e619e5>
         unsafe {
             // INVARIANT: `resize()` returns a valid `usize` to resize to.
-            self.env.write().unwrap().resize(new_size_bytes).unwrap();
+            self.env
+                .write()
+                .unwrap()
+                .resize(new_size_bytes.get())
+                .unwrap();
         }
+
+        new_size_bytes
     }

     #[inline]

@@ -1,7 +1,7 @@
 //! Abstracted database environment; `trait Env`.

 //---------------------------------------------------------------------------------------------------- Import
-use std::{fmt::Debug, ops::Deref};
+use std::{fmt::Debug, num::NonZeroUsize, ops::Deref};

 use crate::{
     config::Config,
@@ -109,11 +109,13 @@ pub trait Env: Sized {
     ///
     /// If `resize_algorithm` is `Some`, that will be used instead.
     ///
+    /// This function returns the _new_ memory map size in bytes.
+    ///
     /// # Invariant
     /// This function _must_ be re-implemented if [`Env::MANUAL_RESIZE`] is `true`.
     ///
     /// Otherwise, this function will panic with `unreachable!()`.
-    fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) {
+    fn resize_map(&self, resize_algorithm: Option<ResizeAlgorithm>) -> NonZeroUsize {
         unreachable!()
     }

@@ -100,6 +100,8 @@
 //!     config::Config,
 //!     ConcreteEnv,
 //!     Env, Key, TxRo, TxRw,
+//! };
+//! use cuprate_types::{
 //!     service::{ReadRequest, WriteRequest, Response},
 //! };
 //!

@@ -59,6 +59,8 @@ impl ResizeAlgorithm {
     }

     /// Maps the `self` variant to the free functions in [`crate::resize`].
+    ///
+    /// This function returns the _new_ memory map size in bytes.
     #[inline]
     pub fn resize(&self, current_size_bytes: usize) -> NonZeroUsize {
         match self {

@@ -5,7 +5,7 @@
 //! along with the reader/writer thread-pool system.
 //!
 //! The thread-pool allows outside crates to communicate with it by
-//! sending database [`Request`](ReadRequest)s and receiving [`Response`]s `async`hronously -
+//! sending database [`Request`][req_r]s and receiving [`Response`][resp]s `async`hronously -
 //! without having to actually worry and handle the database themselves.
 //!
 //! The system is managed by this crate, and only requires [`init`] by the user.
@@ -17,9 +17,9 @@
 //! - [`DatabaseReadHandle`]
 //! - [`DatabaseWriteHandle`]
 //!
-//! The 1st allows any caller to send [`ReadRequest`]s.
+//! The 1st allows any caller to send [`ReadRequest`][req_r]s.
 //!
-//! The 2nd allows any caller to send [`WriteRequest`]s.
+//! The 2nd allows any caller to send [`WriteRequest`][req_w]s.
 //!
 //! The `DatabaseReadHandle` can be shared as it is cheaply [`Clone`]able, however,
 //! the `DatabaseWriteHandle` cannot be cloned. There is only 1 place in Cuprate that
@@ -49,6 +49,14 @@
 //! An `async`hronous channel will be returned from the call.
 //! This channel can be `.await`ed upon to (eventually) receive
 //! the corresponding `Response` to your `Request`.
+//!
+//!
+//!
+//! [req_r]: cuprate_types::service::ReadRequest
+//!
+//! [req_w]: cuprate_types::service::WriteRequest
+//!
+//! [resp]: cuprate_types::service::Response

 mod read;
 pub use read::DatabaseReadHandle;
@@ -59,11 +67,8 @@ pub use write::DatabaseWriteHandle;
 mod free;
 pub use free::init;

-mod request;
-pub use request::{ReadRequest, WriteRequest};
-mod response;
-pub use response::Response;
+// Internal type aliases for `service`.
+mod types;

 #[cfg(test)]
 mod tests;

@@ -2,6 +2,8 @@
 //---------------------------------------------------------------------------------------------------- Import
 use std::{
+    collections::{HashMap, HashSet},
+    ops::Range,
     sync::Arc,
     task::{Context, Poll},
 };
@@ -14,35 +16,16 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio_util::sync::PollSemaphore;

 use cuprate_helper::asynch::InfallibleOneshotReceiver;
+use cuprate_types::service::{ReadRequest, Response};

 use crate::{
     config::ReaderThreads,
     error::RuntimeError,
-    service::{request::ReadRequest, response::Response},
+    service::types::{ResponseReceiver, ResponseResult, ResponseSender},
+    types::{Amount, AmountIndex, BlockHeight, KeyImage},
     ConcreteEnv,
 };

-//---------------------------------------------------------------------------------------------------- Types
-/// The actual type of the response.
-///
-/// Either our [`Response`], or a database error occurred.
-type ResponseResult = Result<Response, RuntimeError>;
-
-/// The `Receiver` channel that receives the read response.
-///
-/// This is owned by the caller (the reader)
-/// who `.await`'s for the response.
-///
-/// The channel itself should never fail,
-/// but the actual database operation might.
-type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
-
-/// The `Sender` channel for the response.
-///
-/// The database reader thread uses this to send
-/// the database result to the caller.
-type ResponseSender = oneshot::Sender<ResponseResult>;
-
 //---------------------------------------------------------------------------------------------------- DatabaseReadHandle
 /// Read handle to the database.
 ///
@@ -155,15 +138,14 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
         }

         // Acquire a permit before returning `Ready`.
-        let Some(permit) = ready!(self.semaphore.poll_acquire(cx)) else {
-            // `self` itself owns the backing semaphore, so it can't be closed.
-            unreachable!();
-        };
+        let permit =
+            ready!(self.semaphore.poll_acquire(cx)).expect("this semaphore is never closed");

         self.permit = Some(permit);
         Poll::Ready(Ok(()))
     }

+    #[inline]
     fn call(&mut self, request: ReadRequest) -> Self::Future {
         let permit = self
             .permit
@@ -193,28 +175,37 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
 // This function maps [`Request`]s to function calls
 // executed by the rayon DB reader threadpool.

-#[inline]
-#[allow(clippy::needless_pass_by_value)]
+#[allow(clippy::needless_pass_by_value)] // TODO: fix me
 /// Map [`Request`]'s to specific database handler functions.
 ///
 /// This is the main entrance into all `Request` handler functions.
 /// The basic structure is:
+///
 /// 1. `Request` is mapped to a handler function
 /// 2. Handler function is called
 /// 3. [`Response`] is sent
 fn map_request(
-    _permit: OwnedSemaphorePermit,   // Permit for this request
+    _permit: OwnedSemaphorePermit,   // Permit for this request, dropped at end of function
     env: Arc<ConcreteEnv>,           // Access to the database
     request: ReadRequest,            // The request we must fulfill
     response_sender: ResponseSender, // The channel we must send the response back to
 ) {
     /* TODO: pre-request handling, run some code for each request? */

-    match request {
-        ReadRequest::Example1 => example_handler_1(env, response_sender),
-        ReadRequest::Example2(x) => example_handler_2(env, response_sender, x),
-        ReadRequest::Example3(x) => example_handler_3(env, response_sender, x),
+    use ReadRequest as R;
+
+    let response = match request {
+        R::BlockExtendedHeader(block) => block_extended_header(&env, block),
+        R::BlockHash(block) => block_hash(&env, block),
+        R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range),
+        R::ChainHeight => chain_height(&env),
+        R::GeneratedCoins => generated_coins(&env),
+        R::Outputs(map) => outputs(&env, map),
+        R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec),
+        R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set),
+    };
+
+    if let Err(e) = response_sender.send(response) {
+        // TODO: use tracing.
+        println!("database reader failed to send response: {e:?}");
     }

     /* TODO: post-request handling, run some code for each request? */
@@ -223,6 +214,12 @@ fn map_request(
 //---------------------------------------------------------------------------------------------------- Handler functions
 // These are the actual functions that do stuff according to the incoming [`Request`].
 //
+// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
+// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
+//
+// Each function will return the [`Response`] that we
+// should send back to the caller in [`map_request()`].
+//
 // INVARIANT:
 // These functions are called above in `tower::Service::call()`
 // using a custom threadpool which means any call to `par_*()` functions
@@ -231,26 +228,57 @@ fn map_request(
 // All functions below assume that this is the case, such that
 // `par_*()` functions will not block the _global_ rayon thread-pool.

-/// TODO
+/// [`ReadRequest::BlockExtendedHeader`].
 #[inline]
-#[allow(clippy::needless_pass_by_value)] // TODO: remove me
-fn example_handler_1(env: Arc<ConcreteEnv>, response_sender: ResponseSender) {
-    let db_result = Ok(Response::Example1);
-    response_sender.send(db_result).unwrap();
+fn block_extended_header(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
+    todo!()
 }

-/// TODO
+/// [`ReadRequest::BlockHash`].
 #[inline]
-#[allow(clippy::needless_pass_by_value)] // TODO: remove me
-fn example_handler_2(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: usize) {
-    let db_result = Ok(Response::Example2(x));
-    response_sender.send(db_result).unwrap();
+fn block_hash(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
+    todo!()
 }

+/// [`ReadRequest::BlockExtendedHeaderInRange`].
+#[inline]
+fn block_extended_header_in_range(
+    env: &Arc<ConcreteEnv>,
+    range: std::ops::Range<BlockHeight>,
+) -> ResponseResult {
+    todo!()
+}
+
+/// [`ReadRequest::ChainHeight`].
+#[inline]
+fn chain_height(env: &Arc<ConcreteEnv>) -> ResponseResult {
+    todo!()
+}
+
+/// [`ReadRequest::GeneratedCoins`].
+#[inline]
+fn generated_coins(env: &Arc<ConcreteEnv>) -> ResponseResult {
+    todo!()
+}
+
+/// [`ReadRequest::Outputs`].
+#[inline]
+#[allow(clippy::needless_pass_by_value)] // TODO: remove me
+fn outputs(env: &Arc<ConcreteEnv>, map: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
+    todo!()
+}
+
+/// [`ReadRequest::NumberOutputsWithAmount`].
 /// TODO
 #[inline]
 #[allow(clippy::needless_pass_by_value)] // TODO: remove me
-fn example_handler_3(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: String) {
-    let db_result = Ok(Response::Example3(x));
-    response_sender.send(db_result).unwrap();
+fn number_outputs_with_amount(env: &Arc<ConcreteEnv>, vec: Vec<Amount>) -> ResponseResult {
+    todo!()
 }
+
+/// [`ReadRequest::CheckKIsNotSpent`].
+#[inline]
+#[allow(clippy::needless_pass_by_value)] // TODO: remove me
+fn check_k_is_not_spent(env: &Arc<ConcreteEnv>, set: HashSet<KeyImage>) -> ResponseResult {
+    todo!()
+}

@@ -1,41 +0,0 @@
-//! Read/write `Request`s to the database.
-//!
-//! TODO: could add `strum` derives.
-
-//---------------------------------------------------------------------------------------------------- Import
-
-//---------------------------------------------------------------------------------------------------- Constants
-
-//---------------------------------------------------------------------------------------------------- ReadRequest
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
-/// A read request to the database.
-pub enum ReadRequest {
-    /// TODO
-    Example1,
-    /// TODO
-    Example2(usize),
-    /// TODO
-    Example3(String),
-}
-
-//---------------------------------------------------------------------------------------------------- WriteRequest
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
-/// A write request to the database.
-pub enum WriteRequest {
-    /// TODO
-    Example1,
-    /// TODO
-    Example2(usize),
-    /// TODO
-    Example3(String),
-}
-
-//---------------------------------------------------------------------------------------------------- IMPL
-
-//---------------------------------------------------------------------------------------------------- Trait Impl
-
-//---------------------------------------------------------------------------------------------------- Tests
-#[cfg(test)]
-mod test {
-    // use super::*;
-}

@@ -1,38 +0,0 @@
-//! Read/write `Response`'s from the database.
-//!
-//! TODO: could add `strum` derives.
-
-//---------------------------------------------------------------------------------------------------- Import
-
-//---------------------------------------------------------------------------------------------------- Constants
-
-//---------------------------------------------------------------------------------------------------- Response
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
-/// A response from the database.
-///
-/// TODO
-pub enum Response {
-    //-------------------------------------------------------- Read responses
-    /// TODO
-    Example1,
-    /// TODO
-    Example2(usize),
-    /// TODO
-    Example3(String),
-
-    //-------------------------------------------------------- Write responses
-    /// The response
-    ///
-    /// TODO
-    ExampleWriteResponse, // Probably will be just `Ok`
-}
-
-//---------------------------------------------------------------------------------------------------- IMPL
-
-//---------------------------------------------------------------------------------------------------- Trait Impl
-
-//---------------------------------------------------------------------------------------------------- Tests
-#[cfg(test)]
-mod test {
-    // use super::*;
-}

@@ -12,9 +12,11 @@
 //---------------------------------------------------------------------------------------------------- Use
 use tower::{Service, ServiceExt};

+use cuprate_types::service::{ReadRequest, Response, WriteRequest};
+
 use crate::{
     config::Config,
-    service::{init, DatabaseReadHandle, DatabaseWriteHandle, ReadRequest, Response, WriteRequest},
+    service::{init, DatabaseReadHandle, DatabaseWriteHandle},
 };

 //---------------------------------------------------------------------------------------------------- Tests
@@ -34,43 +36,47 @@ fn init_drop() {
     let (reader, writer, _tempdir) = init_service();
 }

-/// Send a read request, and receive a response,
-/// asserting the response the expected value.
-#[tokio::test]
-async fn read_request() {
-    let (reader, writer, _tempdir) = init_service();
-
-    for (request, expected_response) in [
-        (ReadRequest::Example1, Response::Example1),
-        (ReadRequest::Example2(123), Response::Example2(123)),
-        (
-            ReadRequest::Example3("hello".into()),
-            Response::Example3("hello".into()),
-        ),
-    ] {
-        // This calls `poll_ready()` asserting we have a permit before `call()`.
-        let response_channel = reader.clone().oneshot(request);
-        let response = response_channel.await.unwrap();
-        assert_eq!(response, expected_response);
-    }
-}
-
-/// Send a write request, and receive a response,
-/// asserting the response the expected value.
-#[tokio::test]
-async fn write_request() {
-    let (reader, mut writer, _tempdir) = init_service();
-
-    for (request, expected_response) in [
-        (WriteRequest::Example1, Response::Example1),
-        (WriteRequest::Example2(123), Response::Example2(123)),
-        (
-            WriteRequest::Example3("hello".into()),
-            Response::Example3("hello".into()),
-        ),
-    ] {
-        let response_channel = writer.call(request);
-        let response = response_channel.await.unwrap();
-        assert_eq!(response, expected_response);
-    }
-}
+// TODO:
+// un-comment and fix these tests when all `{read,write}`
+// service functions are implemented.
+
+// /// Send a read request, and receive a response,
+// /// asserting the response the expected value.
+// #[tokio::test]
+// async fn read_request() {
+//     let (reader, writer, _tempdir) = init_service();
+
+//     for (request, expected_response) in [
+//         (ReadRequest::Example1, Response::Example1),
+//         (ReadRequest::Example2(123), Response::Example2(123)),
+//         (
+//             ReadRequest::Example3("hello".into()),
+//             Response::Example3("hello".into()),
+//         ),
+//     ] {
+//         // This calls `poll_ready()` asserting we have a permit before `call()`.
+//         let response_channel = reader.clone().oneshot(request);
+//         let response = response_channel.await.unwrap();
+//         assert_eq!(response, expected_response);
+//     }
+// }

+// /// Send a write request, and receive a response,
+// /// asserting the response the expected value.
+// #[tokio::test]
+// async fn write_request() {
+//     let (reader, mut writer, _tempdir) = init_service();

+//     for (request, expected_response) in [
+//         (WriteRequest::Example1, Response::Example1),
+//         (WriteRequest::Example2(123), Response::Example2(123)),
+//         (
+//             WriteRequest::Example3("hello".into()),
+//             Response::Example3("hello".into()),
+//         ),
+//     ] {
+//         let response_channel = writer.call(request);
+//         let response = response_channel.await.unwrap();
+//         assert_eq!(response, expected_response);
+//     }
+// }

@@ -0,0 +1,31 @@
+//! Database service type aliases.
+//!
+//! Only used internally for our `tower::Service` impls.
+
+//---------------------------------------------------------------------------------------------------- Use
+use futures::channel::oneshot::Sender;
+
+use cuprate_helper::asynch::InfallibleOneshotReceiver;
+use cuprate_types::service::Response;
+
+use crate::error::RuntimeError;
+
+//---------------------------------------------------------------------------------------------------- Types
+/// The actual type of the response.
+///
+/// Either our [`Response`], or a database error occurred.
+pub(super) type ResponseResult = Result<Response, RuntimeError>;
+
+/// The `Receiver` channel that receives the read response.
+///
+/// This is owned by the caller (the reader/writer thread)
+/// who `.await`'s for the response.
+///
+/// The channel itself should never fail,
+/// but the actual database operation might.
+pub(super) type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
+
+/// The `Sender` channel for the response.
+///
+/// The database reader/writer thread uses this to send the database result to the caller.
+pub(super) type ResponseSender = Sender<ResponseResult>;

@@ -9,31 +9,20 @@ use std::{
 use futures::channel::oneshot;

 use cuprate_helper::asynch::InfallibleOneshotReceiver;
+use cuprate_types::{
+    service::{Response, WriteRequest},
+    VerifiedBlockInformation,
+};

 use crate::{
     error::RuntimeError,
-    service::{request::WriteRequest, response::Response},
+    service::types::{ResponseReceiver, ResponseResult, ResponseSender},
     ConcreteEnv, Env,
 };

 //---------------------------------------------------------------------------------------------------- Constants
 /// Name of the writer thread.
-const WRITER_THREAD_NAME: &str = "cuprate_helper::service::read::DatabaseWriter";
-
-//---------------------------------------------------------------------------------------------------- Types
-/// The actual type of the response.
-///
-/// Either our [Response], or a database error occurred.
-type ResponseResult = Result<Response, RuntimeError>;
-
-/// The `Receiver` channel that receives the write response.
-///
-/// The channel itself should never fail,
-/// but the actual database operation might.
-type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
-
-/// The `Sender` channel for the response.
-type ResponseSender = oneshot::Sender<ResponseResult>;
+const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");

 //---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
 /// Write handle to the database.
@@ -57,7 +46,7 @@ impl DatabaseWriteHandle {
     /// Initialize the single `DatabaseWriter` thread.
     #[cold]
     #[inline(never)] // Only called once.
-    pub(super) fn init(db: Arc<ConcreteEnv>) -> Self {
+    pub(super) fn init(env: Arc<ConcreteEnv>) -> Self {
         // Initialize `Request/Response` channels.
         let (sender, receiver) = crossbeam::channel::unbounded();
@@ -65,7 +54,7 @@ impl DatabaseWriteHandle {
         std::thread::Builder::new()
             .name(WRITER_THREAD_NAME.into())
             .spawn(move || {
-                let this = DatabaseWriter { receiver, db };
+                let this = DatabaseWriter { receiver, env };
                 DatabaseWriter::main(this);
             })
             .unwrap();
@@ -107,7 +96,7 @@ pub(super) struct DatabaseWriter {
     receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSender)>,

     /// Access to the database.
-    db: Arc<ConcreteEnv>,
+    env: Arc<ConcreteEnv>,
 }

 impl Drop for DatabaseWriter {
@@ -119,7 +108,8 @@ impl Drop for DatabaseWriter {
 impl DatabaseWriter {
     /// The `DatabaseWriter`'s main function.
     ///
-    /// The writer just loops in this function.
+    /// The writer just loops in this function, handling requests forever
+    /// until the request channel is dropped or a panic occurs.
     #[cold]
     #[inline(never)] // Only called once.
     fn main(self) {
@@ -127,7 +117,7 @@ impl DatabaseWriter {
         // 2. Map request to some database function
         // 3. Execute that function, get the result
         // 4. Return the result via channel
-        loop {
+        'main: loop {
             let Ok((request, response_sender)) = self.receiver.recv() else {
                 // If this receive errors, it means that the channel is empty
                 // and disconnected, meaning the other side (all senders) have
@@ -140,60 +130,101 @@ impl DatabaseWriter {
                 return;
             };

+            /// How many times should we retry handling the request on resize errors?
+            ///
+            /// This is 1 on automatically resizing databases, meaning there is only 1 iteration.
+            #[allow(clippy::items_after_statements)]
+            const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 };
+
             // Map [`Request`]'s to specific database functions.
-            match request {
-                WriteRequest::Example1 => self.example_handler_1(response_sender),
-                WriteRequest::Example2(x) => self.example_handler_2(response_sender, x),
-                WriteRequest::Example3(x) => self.example_handler_3(response_sender, x),
-            }
-        }
-    }
-
-    /// Resize the database's memory map.
-    fn resize_map(&self) {
-        // The compiler most likely optimizes out this
-        // entire function call if this returns here.
-        if !ConcreteEnv::MANUAL_RESIZE {
-            return;
-        }
-
-        // INVARIANT:
-        // [`Env`]'s that are `MANUAL_RESIZE` are expected to implement
-        // their internals such that we have exclusive access when calling
-        // this function. We do not handle the exclusion part, `resize_map()`
-        // itself does. The `heed` backend does this with `RwLock`.
-        //
-        // We need mutual exclusion due to:
-        // <http://www.lmdb.tech/doc/group__mdb.html#gaa2506ec8dab3d969b0e609cd82e619e5>
-        self.db.resize_map(None);
-
-        // TODO:
-        // We could pass in custom resizes to account for
-        // batch transactions, i.e., we're about to add ~5GB
-        // of data, add that much instead of the default 1GB.
-        // <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
-    }
-
-    /// TODO
-    #[inline]
-    #[allow(clippy::unused_self)] // TODO: remove me
-    fn example_handler_1(&self, response_sender: ResponseSender) {
-        let db_result = Ok(Response::Example1);
-        response_sender.send(db_result).unwrap();
-    }
-
-    /// TODO
-    #[inline]
-    #[allow(clippy::unused_self)] // TODO: remove me
-    fn example_handler_2(&self, response_sender: ResponseSender, x: usize) {
-        let db_result = Ok(Response::Example2(x));
-        response_sender.send(db_result).unwrap();
-    }
-
-    /// TODO
-    #[inline]
-    #[allow(clippy::unused_self)] // TODO: remove me
-    fn example_handler_3(&self, response_sender: ResponseSender, x: String) {
-        let db_result = Ok(Response::Example3(x));
-        response_sender.send(db_result).unwrap();
-    }
+            //
+            // Both will:
+            // 1. Map the request to a function
+            // 2. Call the function
+            // 3. (manual resize only) If resize is needed, resize and retry
+            // 4. (manual resize only) Redo step {1, 2}
+            // 5. Send the function's `Result` back to the requester
+            //
+            // FIXME: there's probably a more elegant way
+            // to represent this retry logic with recursive
+            // functions instead of a loop.
+            'retry: for retry in 0..REQUEST_RETRY_LIMIT {
+                // TODO: will there be more than 1 write request?
+                // this won't have to be an enum.
+                let response = match &request {
+                    WriteRequest::WriteBlock(block) => write_block(&self.env, block),
+                };
+
+                // If the database needs to resize, do so.
+                if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded))
+                {
+                    // If this is the last iteration of the outer `for` loop and we
+                    // encounter a resize error _again_, it means something is wrong.
+                    assert_ne!(
+                        retry, REQUEST_RETRY_LIMIT,
+                        "database resize failed maximum of {REQUEST_RETRY_LIMIT} times"
+                    );
+
+                    // Resize the map, and retry the request handling loop.
+                    //
+                    // FIXME:
+                    // We could pass in custom resizes to account for
+                    // batches, i.e., we're about to add ~5GB of data,
+                    // add that much instead of the default 1GB.
+                    // <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L665-L695>
+                    let old = self.env.current_map_size();
+                    let new = self.env.resize_map(None);
+
+                    // TODO: use tracing.
+                    println!("resizing database memory map, old: {old}B, new: {new}B");
+
+                    // Try handling the request again.
+                    continue 'retry;
+                }
+
+                // Automatically resizing databases should not be returning a resize error.
+                #[cfg(debug_assertions)]
+                if !ConcreteEnv::MANUAL_RESIZE {
+                    assert!(
+                        !matches!(response, Err(RuntimeError::ResizeNeeded)),
+                        "auto-resizing database returned a ResizeNeeded error"
+                    );
+                }
+
+                // Send the response back, whether if it's an `Ok` or `Err`.
+                if let Err(e) = response_sender.send(response) {
+                    // TODO: use tracing.
+                    println!("database writer failed to send response: {e:?}");
+                }
+
+                continue 'main;
+            }
+
+            // Above retry loop should either:
+            // - continue to the next ['main] loop or...
+            // - ...retry until panic
+            unreachable!();
+        }
+
+        // The only case the ['main] loop breaks should be a:
+        // - direct function return
+        // - panic
+        // anything below should be unreachable.
+        unreachable!();
+    }
 }
+
+//---------------------------------------------------------------------------------------------------- Handler functions
+// These are the actual functions that do stuff according to the incoming [`Request`].
+//
+// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
+// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
+//
+// Each function will return the [`Response`] that we
+// should send back to the caller in [`map_request()`].
+
+/// [`WriteRequest::WriteBlock`].
+#[inline]
+fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
+    todo!()
+}

@@ -37,8 +37,6 @@ pub enum ReadRequest {
     NumberOutputsWithAmount(Vec<u64>),
     /// TODO
     CheckKIsNotSpent(HashSet<[u8; 32]>),
-    /// TODO
-    BlockBatchInRange(Range<u64>),
 }

 //---------------------------------------------------------------------------------------------------- WriteRequest
@@ -73,8 +71,6 @@ pub enum Response {
     /// TODO
     /// returns true if key images are spent
     CheckKIsNotSpent(bool),
-    /// TODO
-    BlockBatchInRange(Vec<(Block, Vec<Transaction>)>),

     //------------------------------------------------------ Writes
     /// TODO

@@ -6,6 +6,10 @@ extend-ignore-identifiers-re = [
     # in file: `/cryptonight/c/oaes_lib.c:1213`
    # not sure if false-positive or not.
    "InvMixColums",
+
+    # cuprate_database's `TxRo` and `tx_ro`
+    "RO",
+    "Ro",
+    "ro",
 ]

 [files]