From ad7b750d76dd4c603eb00b3f2b61f4340054706e Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Tue, 16 Apr 2024 18:05:38 -0400 Subject: [PATCH] database: impl `service::{Request,Response}` mappings (#101) * add `cuprate-types` * remove `cuprate_database::service::{request,response}` * use `cuprate_types::service::{request,response}` * service: fix `Request` `match`'s * service: create `ReadRequest` function mappings * service: create `WriteRequest` function mappings * service: add rough `WriteRequest` retry loop * service: handle `RuntimeError::ResizeNeeded` in writer * add `{R,r}o` exception to typos * docs * env: make `resize_map()` return new memory map byte size * write: proactively handle resizes `add_block()` takes `VerifiedBlockInformation` such that it can just take the inner blobs of data. This is a problem when reactively resizing since we no longer have the block struct we just gave away so we're forced to `.clone()` each retry. Instead of that - we will proactively resize so the resize error will never occur in the first place. * read: use type aliases * docs * fix import * write: handle resizes reactively * service: panic if response can't be sent back * write: add loop unreachable asserts * service: print and drop error instead of panic * write: fix retry loop off-by-1 * write: fix docs * review changes * update readme * remove `BlockBatchInRange` request/response * Update database/README.md Co-authored-by: Boog900 --------- Co-authored-by: Boog900 --- Cargo.lock | 1 + database/Cargo.toml | 1 + database/README.md | 3 +- database/src/backend/heed/env.rs | 13 ++- database/src/env.rs | 6 +- database/src/lib.rs | 2 + database/src/resize.rs | 2 + database/src/service/mod.rs | 21 ++-- database/src/service/read.rs | 126 +++++++++++++--------- database/src/service/request.rs | 41 -------- database/src/service/response.rs | 38 ------- database/src/service/tests.rs | 82 ++++++++------- database/src/service/types.rs | 31 ++++++ database/src/service/write.rs | 175 ++++++++++++++++++------------- types/src/service.rs | 4 - typos.toml | 4 + 16 files changed, 293 insertions(+), 257 deletions(-) delete mode 100644 database/src/service/request.rs delete mode 100644 database/src/service/response.rs create mode 100644 database/src/service/types.rs diff --git a/Cargo.lock b/Cargo.lock index 6a69a1da..92a275ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -596,6 +596,7 @@ dependencies = [ "cfg-if", "crossbeam", "cuprate-helper", + "cuprate-types", "futures", "heed", "page_size", diff --git a/database/Cargo.toml b/database/Cargo.toml index 33c3937c..646b76bf 100644 --- a/database/Cargo.toml +++ b/database/Cargo.toml @@ -25,6 +25,7 @@ cfg-if = { workspace = true } # We only need the `thread` feature if `service` is enabled. # Figure out how to enable features of an already pulled in dependency conditionally. cuprate-helper = { path = "../helper", features = ["fs", "thread"] } +cuprate-types = { path = "../types", features = ["service"] } paste = { workspace = true } page_size = { version = "0.6.0" } # Needed for database resizes, they must be a multiple of the OS page size. thiserror = { workspace = true } diff --git a/database/README.md b/database/README.md index 74d07149..6d47d2b6 100644 --- a/database/README.md +++ b/database/README.md @@ -105,9 +105,8 @@ This folder contains the `cupate_database::service` module. |----------------|---------| | `free.rs` | General free functions used (related to `cuprate_database::service`) | `read.rs` | Read thread-pool definitions and logic -| `request.rs` | Read/write `Request`s to the database -| `response.rs` | Read/write `Response`'s from the database | `tests.rs` | Thread-pool tests and test helper functions +| `types.rs` | `cuprate_database::service`-related type aliases | `write.rs` | Write thread-pool definitions and logic ## `src/backend/` diff --git a/database/src/backend/heed/env.rs b/database/src/backend/heed/env.rs index d9e3fdc2..9362cd72 100644 --- a/database/src/backend/heed/env.rs +++ b/database/src/backend/heed/env.rs @@ -4,6 +4,7 @@ use std::{ cell::RefCell, fmt::Debug, + num::NonZeroUsize, ops::Deref, sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, }; @@ -249,11 +250,11 @@ impl Env for ConcreteEnv { Ok(self.env.read().unwrap().force_sync()?) } - fn resize_map(&self, resize_algorithm: Option) { + fn resize_map(&self, resize_algorithm: Option) -> NonZeroUsize { let resize_algorithm = resize_algorithm.unwrap_or_else(|| self.config().resize_algorithm); let current_size_bytes = self.current_map_size(); - let new_size_bytes = resize_algorithm.resize(current_size_bytes).get(); + let new_size_bytes = resize_algorithm.resize(current_size_bytes); // SAFETY: // Resizing requires that we have @@ -264,8 +265,14 @@ impl Env for ConcreteEnv { // unsafe { // INVARIANT: `resize()` returns a valid `usize` to resize to. - self.env.write().unwrap().resize(new_size_bytes).unwrap(); + self.env + .write() + .unwrap() + .resize(new_size_bytes.get()) + .unwrap(); } + + new_size_bytes } #[inline] diff --git a/database/src/env.rs b/database/src/env.rs index 26adc975..4109d889 100644 --- a/database/src/env.rs +++ b/database/src/env.rs @@ -1,7 +1,7 @@ //! Abstracted database environment; `trait Env`. //---------------------------------------------------------------------------------------------------- Import -use std::{fmt::Debug, ops::Deref}; +use std::{fmt::Debug, num::NonZeroUsize, ops::Deref}; use crate::{ config::Config, @@ -109,11 +109,13 @@ pub trait Env: Sized { /// /// If `resize_algorithm` is `Some`, that will be used instead. /// + /// This function returns the _new_ memory map size in bytes. + /// /// # Invariant /// This function _must_ be re-implemented if [`Env::MANUAL_RESIZE`] is `true`. /// /// Otherwise, this function will panic with `unreachable!()`. - fn resize_map(&self, resize_algorithm: Option) { + fn resize_map(&self, resize_algorithm: Option) -> NonZeroUsize { unreachable!() } diff --git a/database/src/lib.rs b/database/src/lib.rs index a80647f0..d9f0181f 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -100,6 +100,8 @@ //! config::Config, //! ConcreteEnv, //! Env, Key, TxRo, TxRw, +//! }; +//! use cuprate_types::{ //! service::{ReadRequest, WriteRequest, Response}, //! }; //! diff --git a/database/src/resize.rs b/database/src/resize.rs index 62ecf5e7..ac54781c 100644 --- a/database/src/resize.rs +++ b/database/src/resize.rs @@ -59,6 +59,8 @@ impl ResizeAlgorithm { } /// Maps the `self` variant to the free functions in [`crate::resize`]. + /// + /// This function returns the _new_ memory map size in bytes. #[inline] pub fn resize(&self, current_size_bytes: usize) -> NonZeroUsize { match self { diff --git a/database/src/service/mod.rs b/database/src/service/mod.rs index 83f8088a..88773052 100644 --- a/database/src/service/mod.rs +++ b/database/src/service/mod.rs @@ -5,7 +5,7 @@ //! along with the reader/writer thread-pool system. //! //! The thread-pool allows outside crates to communicate with it by -//! sending database [`Request`](ReadRequest)s and receiving [`Response`]s `async`hronously - +//! sending database [`Request`][req_r]s and receiving [`Response`][resp]s `async`hronously - //! without having to actually worry and handle the database themselves. //! //! The system is managed by this crate, and only requires [`init`] by the user. @@ -17,9 +17,9 @@ //! - [`DatabaseReadHandle`] //! - [`DatabaseWriteHandle`] //! -//! The 1st allows any caller to send [`ReadRequest`]s. +//! The 1st allows any caller to send [`ReadRequest`][req_r]s. //! -//! The 2nd allows any caller to send [`WriteRequest`]s. +//! The 2nd allows any caller to send [`WriteRequest`][req_w]s. //! //! The `DatabaseReadHandle` can be shared as it is cheaply [`Clone`]able, however, //! the `DatabaseWriteHandle` cannot be cloned. There is only 1 place in Cuprate that @@ -49,6 +49,14 @@ //! An `async`hronous channel will be returned from the call. //! This channel can be `.await`ed upon to (eventually) receive //! the corresponding `Response` to your `Request`. +//! +//! +//! +//! [req_r]: cuprate_types::service::ReadRequest +//! +//! [req_w]: cuprate_types::service::WriteRequest +//! +//! [resp]: cuprate_types::service::Response mod read; pub use read::DatabaseReadHandle; @@ -59,11 +67,8 @@ pub use write::DatabaseWriteHandle; mod free; pub use free::init; -mod request; -pub use request::{ReadRequest, WriteRequest}; - -mod response; -pub use response::Response; +// Internal type aliases for `service`. +mod types; #[cfg(test)] mod tests; diff --git a/database/src/service/read.rs b/database/src/service/read.rs index 7361ce72..cabe3350 100644 --- a/database/src/service/read.rs +++ b/database/src/service/read.rs @@ -2,6 +2,8 @@ //---------------------------------------------------------------------------------------------------- Import use std::{ + collections::{HashMap, HashSet}, + ops::Range, sync::Arc, task::{Context, Poll}, }; @@ -14,35 +16,16 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::PollSemaphore; use cuprate_helper::asynch::InfallibleOneshotReceiver; +use cuprate_types::service::{ReadRequest, Response}; use crate::{ config::ReaderThreads, error::RuntimeError, - service::{request::ReadRequest, response::Response}, + service::types::{ResponseReceiver, ResponseResult, ResponseSender}, + types::{Amount, AmountIndex, BlockHeight, KeyImage}, ConcreteEnv, }; -//---------------------------------------------------------------------------------------------------- Types -/// The actual type of the response. -/// -/// Either our [`Response`], or a database error occurred. -type ResponseResult = Result; - -/// The `Receiver` channel that receives the read response. -/// -/// This is owned by the caller (the reader) -/// who `.await`'s for the response. -/// -/// The channel itself should never fail, -/// but the actual database operation might. -type ResponseReceiver = InfallibleOneshotReceiver; - -/// The `Sender` channel for the response. -/// -/// The database reader thread uses this to send -/// the database result to the caller. -type ResponseSender = oneshot::Sender; - //---------------------------------------------------------------------------------------------------- DatabaseReadHandle /// Read handle to the database. /// @@ -155,15 +138,14 @@ impl tower::Service for DatabaseReadHandle { } // Acquire a permit before returning `Ready`. - let Some(permit) = ready!(self.semaphore.poll_acquire(cx)) else { - // `self` itself owns the backing semaphore, so it can't be closed. - unreachable!(); - }; + let permit = + ready!(self.semaphore.poll_acquire(cx)).expect("this semaphore is never closed"); self.permit = Some(permit); Poll::Ready(Ok(())) } + #[inline] fn call(&mut self, request: ReadRequest) -> Self::Future { let permit = self .permit @@ -193,28 +175,37 @@ impl tower::Service for DatabaseReadHandle { // This function maps [`Request`]s to function calls // executed by the rayon DB reader threadpool. -#[inline] -#[allow(clippy::needless_pass_by_value)] +#[allow(clippy::needless_pass_by_value)] // TODO: fix me /// Map [`Request`]'s to specific database handler functions. /// /// This is the main entrance into all `Request` handler functions. /// The basic structure is: -/// /// 1. `Request` is mapped to a handler function /// 2. Handler function is called /// 3. [`Response`] is sent fn map_request( - _permit: OwnedSemaphorePermit, // Permit for this request - env: Arc, // Access to the database - request: ReadRequest, // The request we must fulfill + _permit: OwnedSemaphorePermit, // Permit for this request, dropped at end of function + env: Arc, // Access to the database + request: ReadRequest, // The request we must fulfill response_sender: ResponseSender, // The channel we must send the response back to ) { /* TODO: pre-request handling, run some code for each request? */ + use ReadRequest as R; - match request { - ReadRequest::Example1 => example_handler_1(env, response_sender), - ReadRequest::Example2(x) => example_handler_2(env, response_sender, x), - ReadRequest::Example3(x) => example_handler_3(env, response_sender, x), + let response = match request { + R::BlockExtendedHeader(block) => block_extended_header(&env, block), + R::BlockHash(block) => block_hash(&env, block), + R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range), + R::ChainHeight => chain_height(&env), + R::GeneratedCoins => generated_coins(&env), + R::Outputs(map) => outputs(&env, map), + R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec), + R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set), + }; + + if let Err(e) = response_sender.send(response) { + // TODO: use tracing. + println!("database reader failed to send response: {e:?}"); } /* TODO: post-request handling, run some code for each request? */ @@ -223,6 +214,12 @@ fn map_request( //---------------------------------------------------------------------------------------------------- Handler functions // These are the actual functions that do stuff according to the incoming [`Request`]. // +// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to +// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`. +// +// Each function will return the [`Response`] that we +// should send back to the caller in [`map_request()`]. +// // INVARIANT: // These functions are called above in `tower::Service::call()` // using a custom threadpool which means any call to `par_*()` functions @@ -231,26 +228,57 @@ fn map_request( // All functions below assume that this is the case, such that // `par_*()` functions will not block the _global_ rayon thread-pool. -/// TODO +/// [`ReadRequest::BlockExtendedHeader`]. #[inline] -#[allow(clippy::needless_pass_by_value)] // TODO: remove me -fn example_handler_1(env: Arc, response_sender: ResponseSender) { - let db_result = Ok(Response::Example1); - response_sender.send(db_result).unwrap(); +fn block_extended_header(env: &Arc, block_height: BlockHeight) -> ResponseResult { + todo!() } -/// TODO +/// [`ReadRequest::BlockHash`]. #[inline] -#[allow(clippy::needless_pass_by_value)] // TODO: remove me -fn example_handler_2(env: Arc, response_sender: ResponseSender, x: usize) { - let db_result = Ok(Response::Example2(x)); - response_sender.send(db_result).unwrap(); +fn block_hash(env: &Arc, block_height: BlockHeight) -> ResponseResult { + todo!() } +/// [`ReadRequest::BlockExtendedHeaderInRange`]. +#[inline] +fn block_extended_header_in_range( + env: &Arc, + range: std::ops::Range, +) -> ResponseResult { + todo!() +} + +/// [`ReadRequest::ChainHeight`]. +#[inline] +fn chain_height(env: &Arc) -> ResponseResult { + todo!() +} + +/// [`ReadRequest::GeneratedCoins`]. +#[inline] +fn generated_coins(env: &Arc) -> ResponseResult { + todo!() +} + +/// [`ReadRequest::Outputs`]. +#[inline] +#[allow(clippy::needless_pass_by_value)] // TODO: remove me +fn outputs(env: &Arc, map: HashMap>) -> ResponseResult { + todo!() +} + +/// [`ReadRequest::NumberOutputsWithAmount`]. /// TODO #[inline] #[allow(clippy::needless_pass_by_value)] // TODO: remove me -fn example_handler_3(env: Arc, response_sender: ResponseSender, x: String) { - let db_result = Ok(Response::Example3(x)); - response_sender.send(db_result).unwrap(); +fn number_outputs_with_amount(env: &Arc, vec: Vec) -> ResponseResult { + todo!() +} + +/// [`ReadRequest::CheckKIsNotSpent`]. +#[inline] +#[allow(clippy::needless_pass_by_value)] // TODO: remove me +fn check_k_is_not_spent(env: &Arc, set: HashSet) -> ResponseResult { + todo!() } diff --git a/database/src/service/request.rs b/database/src/service/request.rs deleted file mode 100644 index 93877ecd..00000000 --- a/database/src/service/request.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! Read/write `Request`s to the database. -//! -//! TODO: could add `strum` derives. - -//---------------------------------------------------------------------------------------------------- Import - -//---------------------------------------------------------------------------------------------------- Constants - -//---------------------------------------------------------------------------------------------------- ReadRequest -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -/// A read request to the database. -pub enum ReadRequest { - /// TODO - Example1, - /// TODO - Example2(usize), - /// TODO - Example3(String), -} - -//---------------------------------------------------------------------------------------------------- WriteRequest -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -/// A write request to the database. -pub enum WriteRequest { - /// TODO - Example1, - /// TODO - Example2(usize), - /// TODO - Example3(String), -} - -//---------------------------------------------------------------------------------------------------- IMPL - -//---------------------------------------------------------------------------------------------------- Trait Impl - -//---------------------------------------------------------------------------------------------------- Tests -#[cfg(test)] -mod test { - // use super::*; -} diff --git a/database/src/service/response.rs b/database/src/service/response.rs deleted file mode 100644 index 6977efdb..00000000 --- a/database/src/service/response.rs +++ /dev/null @@ -1,38 +0,0 @@ -//! Read/write `Response`'s from the database. -//! -//! TODO: could add `strum` derives. - -//---------------------------------------------------------------------------------------------------- Import - -//---------------------------------------------------------------------------------------------------- Constants - -//---------------------------------------------------------------------------------------------------- Response -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -/// A response from the database. -/// -/// TODO -pub enum Response { - //-------------------------------------------------------- Read responses - /// TODO - Example1, - /// TODO - Example2(usize), - /// TODO - Example3(String), - - //-------------------------------------------------------- Write responses - /// The response - /// - /// TODO - ExampleWriteResponse, // Probably will be just `Ok` -} - -//---------------------------------------------------------------------------------------------------- IMPL - -//---------------------------------------------------------------------------------------------------- Trait Impl - -//---------------------------------------------------------------------------------------------------- Tests -#[cfg(test)] -mod test { - // use super::*; -} diff --git a/database/src/service/tests.rs b/database/src/service/tests.rs index a01f67a5..03c03dbc 100644 --- a/database/src/service/tests.rs +++ b/database/src/service/tests.rs @@ -12,9 +12,11 @@ //---------------------------------------------------------------------------------------------------- Use use tower::{Service, ServiceExt}; +use cuprate_types::service::{ReadRequest, Response, WriteRequest}; + use crate::{ config::Config, - service::{init, DatabaseReadHandle, DatabaseWriteHandle, ReadRequest, Response, WriteRequest}, + service::{init, DatabaseReadHandle, DatabaseWriteHandle}, }; //---------------------------------------------------------------------------------------------------- Tests @@ -34,43 +36,47 @@ fn init_drop() { let (reader, writer, _tempdir) = init_service(); } -/// Send a read request, and receive a response, -/// asserting the response the expected value. -#[tokio::test] -async fn read_request() { - let (reader, writer, _tempdir) = init_service(); +// TODO: +// un-comment and fix these tests when all `{read,write}` +// service functions are implemented. - for (request, expected_response) in [ - (ReadRequest::Example1, Response::Example1), - (ReadRequest::Example2(123), Response::Example2(123)), - ( - ReadRequest::Example3("hello".into()), - Response::Example3("hello".into()), - ), - ] { - // This calls `poll_ready()` asserting we have a permit before `call()`. - let response_channel = reader.clone().oneshot(request); - let response = response_channel.await.unwrap(); - assert_eq!(response, expected_response); - } -} +// /// Send a read request, and receive a response, +// /// asserting the response the expected value. +// #[tokio::test] +// async fn read_request() { +// let (reader, writer, _tempdir) = init_service(); -/// Send a write request, and receive a response, -/// asserting the response the expected value. -#[tokio::test] -async fn write_request() { - let (reader, mut writer, _tempdir) = init_service(); +// for (request, expected_response) in [ +// (ReadRequest::Example1, Response::Example1), +// (ReadRequest::Example2(123), Response::Example2(123)), +// ( +// ReadRequest::Example3("hello".into()), +// Response::Example3("hello".into()), +// ), +// ] { +// // This calls `poll_ready()` asserting we have a permit before `call()`. +// let response_channel = reader.clone().oneshot(request); +// let response = response_channel.await.unwrap(); +// assert_eq!(response, expected_response); +// } +// } - for (request, expected_response) in [ - (WriteRequest::Example1, Response::Example1), - (WriteRequest::Example2(123), Response::Example2(123)), - ( - WriteRequest::Example3("hello".into()), - Response::Example3("hello".into()), - ), - ] { - let response_channel = writer.call(request); - let response = response_channel.await.unwrap(); - assert_eq!(response, expected_response); - } -} +// /// Send a write request, and receive a response, +// /// asserting the response the expected value. +// #[tokio::test] +// async fn write_request() { +// let (reader, mut writer, _tempdir) = init_service(); + +// for (request, expected_response) in [ +// (WriteRequest::Example1, Response::Example1), +// (WriteRequest::Example2(123), Response::Example2(123)), +// ( +// WriteRequest::Example3("hello".into()), +// Response::Example3("hello".into()), +// ), +// ] { +// let response_channel = writer.call(request); +// let response = response_channel.await.unwrap(); +// assert_eq!(response, expected_response); +// } +// } diff --git a/database/src/service/types.rs b/database/src/service/types.rs new file mode 100644 index 00000000..265bf42c --- /dev/null +++ b/database/src/service/types.rs @@ -0,0 +1,31 @@ +//! Database service type aliases. +//! +//! Only used internally for our `tower::Service` impls. + +//---------------------------------------------------------------------------------------------------- Use +use futures::channel::oneshot::Sender; + +use cuprate_helper::asynch::InfallibleOneshotReceiver; +use cuprate_types::service::Response; + +use crate::error::RuntimeError; + +//---------------------------------------------------------------------------------------------------- Types +/// The actual type of the response. +/// +/// Either our [`Response`], or a database error occurred. +pub(super) type ResponseResult = Result; + +/// The `Receiver` channel that receives the read response. +/// +/// This is owned by the caller (the reader/writer thread) +/// who `.await`'s for the response. +/// +/// The channel itself should never fail, +/// but the actual database operation might. +pub(super) type ResponseReceiver = InfallibleOneshotReceiver; + +/// The `Sender` channel for the response. +/// +/// The database reader/writer thread uses this to send the database result to the caller. +pub(super) type ResponseSender = Sender; diff --git a/database/src/service/write.rs b/database/src/service/write.rs index 13e6f979..e05f9ad2 100644 --- a/database/src/service/write.rs +++ b/database/src/service/write.rs @@ -9,31 +9,20 @@ use std::{ use futures::channel::oneshot; use cuprate_helper::asynch::InfallibleOneshotReceiver; +use cuprate_types::{ + service::{Response, WriteRequest}, + VerifiedBlockInformation, +}; use crate::{ error::RuntimeError, - service::{request::WriteRequest, response::Response}, + service::types::{ResponseReceiver, ResponseResult, ResponseSender}, ConcreteEnv, Env, }; //---------------------------------------------------------------------------------------------------- Constants /// Name of the writer thread. -const WRITER_THREAD_NAME: &str = "cuprate_helper::service::read::DatabaseWriter"; - -//---------------------------------------------------------------------------------------------------- Types -/// The actual type of the response. -/// -/// Either our [Response], or a database error occurred. -type ResponseResult = Result; - -/// The `Receiver` channel that receives the write response. -/// -/// The channel itself should never fail, -/// but the actual database operation might. -type ResponseReceiver = InfallibleOneshotReceiver; - -/// The `Sender` channel for the response. -type ResponseSender = oneshot::Sender; +const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); //---------------------------------------------------------------------------------------------------- DatabaseWriteHandle /// Write handle to the database. @@ -57,7 +46,7 @@ impl DatabaseWriteHandle { /// Initialize the single `DatabaseWriter` thread. #[cold] #[inline(never)] // Only called once. - pub(super) fn init(db: Arc) -> Self { + pub(super) fn init(env: Arc) -> Self { // Initialize `Request/Response` channels. let (sender, receiver) = crossbeam::channel::unbounded(); @@ -65,7 +54,7 @@ impl DatabaseWriteHandle { std::thread::Builder::new() .name(WRITER_THREAD_NAME.into()) .spawn(move || { - let this = DatabaseWriter { receiver, db }; + let this = DatabaseWriter { receiver, env }; DatabaseWriter::main(this); }) .unwrap(); @@ -107,7 +96,7 @@ pub(super) struct DatabaseWriter { receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSender)>, /// Access to the database. - db: Arc, + env: Arc, } impl Drop for DatabaseWriter { @@ -119,7 +108,8 @@ impl Drop for DatabaseWriter { impl DatabaseWriter { /// The `DatabaseWriter`'s main function. /// - /// The writer just loops in this function. + /// The writer just loops in this function, handling requests forever + /// until the request channel is dropped or a panic occurs. #[cold] #[inline(never)] // Only called once. fn main(self) { @@ -127,7 +117,7 @@ impl DatabaseWriter { // 2. Map request to some database function // 3. Execute that function, get the result // 4. Return the result via channel - loop { + 'main: loop { let Ok((request, response_sender)) = self.receiver.recv() else { // If this receive errors, it means that the channel is empty // and disconnected, meaning the other side (all senders) have @@ -140,60 +130,101 @@ impl DatabaseWriter { return; }; + /// How many times should we retry handling the request on resize errors? + /// + /// This is 1 on automatically resizing databases, meaning there is only 1 iteration. + #[allow(clippy::items_after_statements)] + const REQUEST_RETRY_LIMIT: usize = if ConcreteEnv::MANUAL_RESIZE { 3 } else { 1 }; + // Map [`Request`]'s to specific database functions. - match request { - WriteRequest::Example1 => self.example_handler_1(response_sender), - WriteRequest::Example2(x) => self.example_handler_2(response_sender, x), - WriteRequest::Example3(x) => self.example_handler_3(response_sender, x), + // + // Both will: + // 1. Map the request to a function + // 2. Call the function + // 3. (manual resize only) If resize is needed, resize and retry + // 4. (manual resize only) Redo step {1, 2} + // 5. Send the function's `Result` back to the requester + // + // FIXME: there's probably a more elegant way + // to represent this retry logic with recursive + // functions instead of a loop. + 'retry: for retry in 0..REQUEST_RETRY_LIMIT { + // TODO: will there be more than 1 write request? + // this won't have to be an enum. + let response = match &request { + WriteRequest::WriteBlock(block) => write_block(&self.env, block), + }; + + // If the database needs to resize, do so. + if ConcreteEnv::MANUAL_RESIZE && matches!(response, Err(RuntimeError::ResizeNeeded)) + { + // If this is the last iteration of the outer `for` loop and we + // encounter a resize error _again_, it means something is wrong. + assert_ne!( + retry, REQUEST_RETRY_LIMIT, + "database resize failed maximum of {REQUEST_RETRY_LIMIT} times" + ); + + // Resize the map, and retry the request handling loop. + // + // FIXME: + // We could pass in custom resizes to account for + // batches, i.e., we're about to add ~5GB of data, + // add that much instead of the default 1GB. + // + let old = self.env.current_map_size(); + let new = self.env.resize_map(None); + + // TODO: use tracing. + println!("resizing database memory map, old: {old}B, new: {new}B"); + + // Try handling the request again. + continue 'retry; + } + + // Automatically resizing databases should not be returning a resize error. + #[cfg(debug_assertions)] + if !ConcreteEnv::MANUAL_RESIZE { + assert!( + !matches!(response, Err(RuntimeError::ResizeNeeded)), + "auto-resizing database returned a ResizeNeeded error" + ); + } + + // Send the response back, whether if it's an `Ok` or `Err`. + if let Err(e) = response_sender.send(response) { + // TODO: use tracing. + println!("database writer failed to send response: {e:?}"); + } + + continue 'main; } - } - } - /// Resize the database's memory map. - fn resize_map(&self) { - // The compiler most likely optimizes out this - // entire function call if this returns here. - if !ConcreteEnv::MANUAL_RESIZE { - return; + // Above retry loop should either: + // - continue to the next ['main] loop or... + // - ...retry until panic + unreachable!(); } - // INVARIANT: - // [`Env`]'s that are `MANUAL_RESIZE` are expected to implement - // their internals such that we have exclusive access when calling - // this function. We do not handle the exclusion part, `resize_map()` - // itself does. The `heed` backend does this with `RwLock`. - // - // We need mutual exclusion due to: - // - self.db.resize_map(None); - // TODO: - // We could pass in custom resizes to account for - // batch transactions, i.e., we're about to add ~5GB - // of data, add that much instead of the default 1GB. - // - } - - /// TODO - #[inline] - #[allow(clippy::unused_self)] // TODO: remove me - fn example_handler_1(&self, response_sender: ResponseSender) { - let db_result = Ok(Response::Example1); - response_sender.send(db_result).unwrap(); - } - - /// TODO - #[inline] - #[allow(clippy::unused_self)] // TODO: remove me - fn example_handler_2(&self, response_sender: ResponseSender, x: usize) { - let db_result = Ok(Response::Example2(x)); - response_sender.send(db_result).unwrap(); - } - - /// TODO - #[inline] - #[allow(clippy::unused_self)] // TODO: remove me - fn example_handler_3(&self, response_sender: ResponseSender, x: String) { - let db_result = Ok(Response::Example3(x)); - response_sender.send(db_result).unwrap(); + // The only case the ['main] loop breaks should be a: + // - direct function return + // - panic + // anything below should be unreachable. + unreachable!(); } } + +//---------------------------------------------------------------------------------------------------- Handler functions +// These are the actual functions that do stuff according to the incoming [`Request`]. +// +// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to +// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`. +// +// Each function will return the [`Response`] that we +// should send back to the caller in [`map_request()`]. + +/// [`WriteRequest::WriteBlock`]. +#[inline] +fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult { + todo!() +} diff --git a/types/src/service.rs b/types/src/service.rs index 97344f64..2917536b 100644 --- a/types/src/service.rs +++ b/types/src/service.rs @@ -37,8 +37,6 @@ pub enum ReadRequest { NumberOutputsWithAmount(Vec), /// TODO CheckKIsNotSpent(HashSet<[u8; 32]>), - /// TODO - BlockBatchInRange(Range), } //---------------------------------------------------------------------------------------------------- WriteRequest @@ -73,8 +71,6 @@ pub enum Response { /// TODO /// returns true if key images are spent CheckKIsNotSpent(bool), - /// TODO - BlockBatchInRange(Vec<(Block, Vec)>), //------------------------------------------------------ Writes /// TODO diff --git a/typos.toml b/typos.toml index 299b8eb8..abab1903 100644 --- a/typos.toml +++ b/typos.toml @@ -6,6 +6,10 @@ extend-ignore-identifiers-re = [ # in file: `/cryptonight/c/oaes_lib.c:1213` # not sure if false-positive or not. "InvMixColums", + # cuprate_database's `TxRo` and `tx_ro` + "RO", + "Ro", + "ro", ] [files]