database: use rayon for service's reader thread-pool (#93)

* add `rayon 1.9.0`

* service: re-impl reader threadpool with `rayon`

* service: impl `tower::Service` for writer

* backend: create db dir in `Env::open`

* service: read + write request/response tests

* docs, name changes

* service: always return `Poll::Ready` in writer

* service: use `spawn()` instead of `install()`

* service: replace `DatabaseReader` with free functions

* cargo: add `tokio-utils`

* service: acquire permit before `call()` for read requests

* service: acquire permit in tests

* docs

* service: use loop for write request tests

* service: use `ready!()`
This commit is contained in:
hinto-janai 2024-03-21 16:16:12 -04:00 committed by GitHub
parent 93372fa4b5
commit 004bb153b4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 352 additions and 195 deletions

13
Cargo.lock generated
View file

@ -40,6 +40,12 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "allocator-api2"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]] [[package]]
name = "android-tzdata" name = "android-tzdata"
version = "0.1.1" version = "0.1.1"
@ -589,14 +595,17 @@ dependencies = [
"cfg-if", "cfg-if",
"crossbeam", "crossbeam",
"cuprate-helper", "cuprate-helper",
"futures",
"heed", "heed",
"page_size", "page_size",
"paste", "paste",
"rayon",
"redb", "redb",
"serde", "serde",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util",
"tower", "tower",
] ]
@ -1071,6 +1080,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
dependencies = [ dependencies = [
"ahash", "ahash",
"allocator-api2",
] ]
[[package]] [[package]]
@ -2862,7 +2872,10 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
"futures-io",
"futures-sink", "futures-sink",
"futures-util",
"hashbrown 0.14.3",
"pin-project-lite", "pin-project-lite",
"slab", "slab",
"tokio", "tokio",

View file

@ -58,7 +58,7 @@ paste = { version = "1.0.14", default-features = false }
pin-project = { version = "1.1.3", default-features = false } pin-project = { version = "1.1.3", default-features = false }
randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false } randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
rand = { version = "0.8.5", default-features = false } rand = { version = "0.8.5", default-features = false }
rayon = { version = "1.8.0", default-features = false } rayon = { version = "1.9.0", default-features = false }
serde_bytes = { version = "0.11.12", default-features = false } serde_bytes = { version = "0.11.12", default-features = false }
serde_json = { version = "1.0.108", default-features = false } serde_json = { version = "1.0.108", default-features = false }
serde = { version = "1.0.190", default-features = false } serde = { version = "1.0.190", default-features = false }

View file

@ -9,11 +9,11 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/database"
keywords = ["cuprate", "database"] keywords = ["cuprate", "database"]
[features] [features]
default = ["heed", "redb", "service"] # default = ["heed", "redb", "service"]
# default = ["redb", "service"] default = ["redb", "service"]
heed = ["dep:heed"] heed = ["dep:heed"]
redb = ["dep:redb"] redb = ["dep:redb"]
service = ["dep:crossbeam", "dep:tokio", "dep:tower"] service = ["dep:crossbeam", "dep:futures", "dep:tokio", "dep:tokio-util", "dep:tower", "dep:rayon"]
[dependencies] [dependencies]
bytemuck = { version = "1.14.3", features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } bytemuck = { version = "1.14.3", features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] }
@ -27,9 +27,12 @@ page_size = { version = "0.6.0" } # Needed for database resizes, they must
thiserror = { workspace = true } thiserror = { workspace = true }
# `service` feature. # `service` feature.
crossbeam = { workspace = true, features = ["std"], optional = true } crossbeam = { workspace = true, features = ["std"], optional = true }
tokio = { workspace = true, features = ["full"], optional = true } futures = { workspace = true, optional = true }
tower = { workspace = true, features = ["full"], optional = true } tokio = { workspace = true, features = ["full"], optional = true }
tokio-util = { workspace = true, features = ["full"], optional = true }
tower = { workspace = true, features = ["full"], optional = true }
rayon = { workspace = true, optional = true }
# Optional features. # Optional features.
heed = { version = "0.20.0-alpha.9", optional = true } heed = { version = "0.20.0-alpha.9", optional = true }

View file

@ -164,6 +164,8 @@ impl Env for ConcreteEnv {
reader_threads + 16 reader_threads + 16
}); });
// Create the database directory if it doesn't exist.
std::fs::create_dir_all(config.db_directory())?;
// Open the environment in the user's PATH. // Open the environment in the user's PATH.
let env = env_open_options.open(config.db_directory())?; let env = env_open_options.open(config.db_directory())?;

View file

@ -72,6 +72,9 @@ impl Env for ConcreteEnv {
// TODO: we can set cache sizes with: // TODO: we can set cache sizes with:
// env_builder.set_cache(bytes); // env_builder.set_cache(bytes);
// Create the database directory if it doesn't exist.
std::fs::create_dir_all(config.db_directory())?;
// Open the database file, create if needed. // Open the database file, create if needed.
let db_file = std::fs::OpenOptions::new() let db_file = std::fs::OpenOptions::new()
.read(true) .read(true)

View file

@ -142,7 +142,6 @@
keyword_idents, keyword_idents,
non_ascii_idents, non_ascii_idents,
variant_size_differences, variant_size_differences,
unused_mut, // Annoying when debugging, maybe put in allow.
// Probably can be put into `#[deny]`. // Probably can be put into `#[deny]`.
future_incompatible, future_incompatible,
@ -168,6 +167,7 @@
clippy::pedantic, clippy::pedantic,
clippy::nursery, clippy::nursery,
clippy::cargo, clippy::cargo,
unused_mut,
missing_docs, missing_docs,
deprecated, deprecated,
unused_comparisons, unused_comparisons,

View file

@ -6,9 +6,7 @@ use std::sync::Arc;
use crate::{ use crate::{
config::Config, config::Config,
error::InitError, error::InitError,
service::{ service::{write::DatabaseWriter, DatabaseReadHandle, DatabaseWriteHandle},
read::DatabaseReader, write::DatabaseWriter, DatabaseReadHandle, DatabaseWriteHandle,
},
ConcreteEnv, Env, ConcreteEnv, Env,
}; };
@ -39,11 +37,10 @@ pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle),
let db: Arc<ConcreteEnv> = Arc::new(ConcreteEnv::open(config)?); let db: Arc<ConcreteEnv> = Arc::new(ConcreteEnv::open(config)?);
// Spawn the Reader thread pool and Writer. // Spawn the Reader thread pool and Writer.
let readers = DatabaseReader::init(&db, reader_threads); let readers = DatabaseReadHandle::init(&db, reader_threads);
let writers = DatabaseWriter::init(db); let writer = DatabaseWriteHandle::init(db);
// Return the handles to those pools. Ok((readers, writer))
Ok((readers, writers))
} }
//---------------------------------------------------------------------------------------------------- Tests //---------------------------------------------------------------------------------------------------- Tests

View file

@ -38,7 +38,7 @@
//! //!
//! Upon dropping the [`crate::ConcreteEnv`]: //! Upon dropping the [`crate::ConcreteEnv`]:
//! - All un-processed database transactions are completed //! - All un-processed database transactions are completed
//! - All data gets flushed to disk (caused by [`Drop::drop`] impl of [`crate::ConcreteEnv`]) //! - All data gets flushed to disk (caused by [`Drop::drop`] impl on [`crate::ConcreteEnv`])
//! //!
//! ## Request and Response //! ## Request and Response
//! To interact with the database (whether reading or writing data), //! To interact with the database (whether reading or writing data),

View file

@ -1,4 +1,4 @@
//! Database read thread-pool definitions and logic. //! Database reader thread-pool definitions and logic.
//---------------------------------------------------------------------------------------------------- Import //---------------------------------------------------------------------------------------------------- Import
use std::{ use std::{
@ -8,6 +8,11 @@ use std::{
use crossbeam::channel::Receiver; use crossbeam::channel::Receiver;
use futures::{channel::oneshot, ready};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use cuprate_helper::asynch::InfallibleOneshotReceiver; use cuprate_helper::asynch::InfallibleOneshotReceiver;
use crate::{ use crate::{
@ -20,7 +25,7 @@ use crate::{
//---------------------------------------------------------------------------------------------------- Types //---------------------------------------------------------------------------------------------------- Types
/// The actual type of the response. /// The actual type of the response.
/// ///
/// Either our [Response], or a database error occurred. /// Either our [`Response`], or a database error occurred.
type ResponseResult = Result<Response, RuntimeError>; type ResponseResult = Result<Response, RuntimeError>;
/// The `Receiver` channel that receives the read response. /// The `Receiver` channel that receives the read response.
@ -30,13 +35,13 @@ type ResponseResult = Result<Response, RuntimeError>;
/// ///
/// The channel itself should never fail, /// The channel itself should never fail,
/// but the actual database operation might. /// but the actual database operation might.
type ResponseRecv = InfallibleOneshotReceiver<ResponseResult>; type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
/// The `Sender` channel for the response. /// The `Sender` channel for the response.
/// ///
/// The database reader thread uses this to send /// The database reader thread uses this to send
/// the database result to the caller. /// the database result to the caller.
type ResponseSend = tokio::sync::oneshot::Sender<ResponseResult>; type ResponseSender = oneshot::Sender<ResponseResult>;
//---------------------------------------------------------------------------------------------------- DatabaseReadHandle //---------------------------------------------------------------------------------------------------- DatabaseReadHandle
/// Read handle to the database. /// Read handle to the database.
@ -47,145 +52,205 @@ type ResponseSend = tokio::sync::oneshot::Sender<ResponseResult>;
/// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`ReadRequest`] /// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`ReadRequest`]
/// will return an `async`hronous channel that can be `.await`ed upon /// will return an `async`hronous channel that can be `.await`ed upon
/// to receive the corresponding [`Response`]. /// to receive the corresponding [`Response`].
#[derive(Clone, Debug)]
pub struct DatabaseReadHandle { pub struct DatabaseReadHandle {
/// Sender channel to the database read thread-pool. /// Handle to the custom `rayon` DB reader thread-pool.
/// ///
/// We provide the response channel for the thread-pool. /// Requests are [`rayon::ThreadPool::spawn`]ed in this thread-pool,
pub(super) sender: crossbeam::channel::Sender<(ReadRequest, ResponseSend)>, /// and responses are returned via a channel we (the caller) provide.
pool: Arc<rayon::ThreadPool>,
/// Counting semaphore asynchronous permit for database access.
/// Each [`tower::Service::poll_ready`] will acquire a permit
/// before actually sending a request to the `rayon` DB threadpool.
semaphore: PollSemaphore,
/// An owned permit.
/// This will be set to [`Some`] in `poll_ready()` when we successfully acquire
/// the permit, and will be [`Option::take()`]n after `tower::Service::call()` is called.
///
/// The actual permit will be dropped _after_ the rayon DB thread has finished
/// the request, i.e., after [`map_request()`] finishes.
permit: Option<OwnedSemaphorePermit>,
/// Access to the database.
env: Arc<ConcreteEnv>,
}
// `OwnedSemaphorePermit` does not implement `Clone`,
// so manually clone all elements, while keeping `permit`
// `None` across clones.
impl Clone for DatabaseReadHandle {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
semaphore: self.semaphore.clone(),
permit: None,
env: self.env.clone(),
}
}
}
impl DatabaseReadHandle {
/// Initialize the `DatabaseReader` thread-pool backed by `rayon`.
///
/// This spawns `N` amount of `DatabaseReader`'s
/// attached to `env` and returns a handle to the pool.
///
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(env: &Arc<ConcreteEnv>, reader_threads: ReaderThreads) -> Self {
// How many reader threads to spawn?
let reader_count = reader_threads.as_threads().get();
// Spawn `rayon` reader threadpool.
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(reader_count)
.thread_name(|i| format!("cuprate_helper::service::read::DatabaseReader{i}"))
.build()
.unwrap();
// Create a semaphore with the same amount of
// permits as the amount of reader threads.
let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(reader_count)));
// Return a handle to the pool.
Self {
pool: Arc::new(pool),
semaphore,
permit: None,
env: Arc::clone(env),
}
}
/// TODO
#[inline]
pub const fn env(&self) -> &Arc<ConcreteEnv> {
&self.env
}
/// TODO
#[inline]
pub const fn semaphore(&self) -> &PollSemaphore {
&self.semaphore
}
/// TODO
#[inline]
pub const fn permit(&self) -> &Option<OwnedSemaphorePermit> {
&self.permit
}
} }
impl tower::Service<ReadRequest> for DatabaseReadHandle { impl tower::Service<ReadRequest> for DatabaseReadHandle {
type Response = Response; type Response = Response;
type Error = RuntimeError; type Error = RuntimeError;
type Future = ResponseRecv; type Future = ResponseReceiver;
#[inline] #[inline]
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!() // Check if we already have a permit.
} if self.permit.is_some() {
return Poll::Ready(Ok(()));
#[inline]
fn call(&mut self, _req: ReadRequest) -> Self::Future {
todo!()
}
}
//---------------------------------------------------------------------------------------------------- DatabaseReader
/// Database reader thread.
///
/// This struct essentially represents a thread.
///
/// Each reader thread is spawned with access to this struct (self).
pub(super) struct DatabaseReader {
/// Receiver side of the database request channel.
///
/// Any caller can send some requests to this channel.
/// They send them alongside another `Response` channel,
/// which we will eventually send to.
///
/// We (the database reader thread) are not responsible
/// for creating this channel, the caller provides it.
///
/// SOMEDAY: this struct itself could cache a return channel
/// instead of creating a new `oneshot` each request.
receiver: Receiver<(ReadRequest, ResponseSend)>,
/// Access to the database.
db: Arc<ConcreteEnv>,
}
impl Drop for DatabaseReader {
fn drop(&mut self) {
// INVARIANT: we set the thread name when spawning it.
let thread_name = std::thread::current().name().unwrap();
// TODO: log that this thread has exited?
}
}
impl DatabaseReader {
/// Initialize the `DatabaseReader` thread-pool.
///
/// This spawns `N` amount of `DatabaseReader`'s
/// attached to `db` and returns a handle to the pool.
///
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(db: &Arc<ConcreteEnv>, reader_threads: ReaderThreads) -> DatabaseReadHandle {
// Initialize `Request/Response` channels.
let (sender, receiver) = crossbeam::channel::unbounded();
// How many reader threads to spawn?
let reader_count = reader_threads.as_threads();
// Spawn pool of readers.
for i in 0..reader_count.get() {
let receiver = receiver.clone();
let db = Arc::clone(db);
std::thread::Builder::new()
.name(format!("cuprate_helper::service::read::DatabaseReader{i}"))
.spawn(move || {
let this = Self { receiver, db };
Self::main(this);
})
.unwrap();
} }
// Return a handle to the pool and channels to // Acquire a permit before returning `Ready`.
// allow clean shutdown of all reader threads. let Some(permit) = ready!(self.semaphore.poll_acquire(cx)) else {
DatabaseReadHandle { sender } // `self` itself owns the backing semaphore, so it can't be closed.
unreachable!();
};
self.permit = Some(permit);
Poll::Ready(Ok(()))
} }
/// The `DatabaseReader`'s main function. fn call(&mut self, request: ReadRequest) -> Self::Future {
/// let permit = self
/// Each thread just loops in this function. .permit
#[cold] .take()
#[inline(never)] // Only called once. .expect("poll_ready() should have acquire a permit before calling call()");
fn main(self) {
// 1. Hang on request channel
// 2. Map request to some database function
// 3. Execute that function, get the result
// 4. Return the result via channel
loop {
// Database requests.
let Ok((request, response_send)) = self.receiver.recv() else {
// If this receive errors, it means that the channel is empty
// and disconnected, meaning the other side (all senders) have
// been dropped. This means "shutdown", and we return here to
// exit the thread.
return;
};
// Map [`Request`]'s to specific database functions. // Response channel we `.await` on.
match request { let (response_sender, receiver) = oneshot::channel();
ReadRequest::Example1 => self.example_handler_1(response_send),
ReadRequest::Example2(_x) => self.example_handler_2(response_send),
ReadRequest::Example3(_x) => self.example_handler_3(response_send),
}
}
}
/// TODO // Spawn the request in the rayon DB thread-pool.
#[inline] //
fn example_handler_1(&self, response_send: ResponseSend) { // Note that this uses `self.pool` instead of `rayon::spawn`
let db_result = todo!(); // such that any `rayon` parallel code that runs within
response_send.send(db_result).unwrap(); // the passed closure uses the same `rayon` threadpool.
} //
// INVARIANT:
// The below `DatabaseReader` function impl block relies on this behavior.
let env = Arc::clone(self.env());
self.pool
.spawn(move || map_request(permit, env, request, response_sender));
/// TODO InfallibleOneshotReceiver::from(receiver)
#[inline]
fn example_handler_2(&self, response_send: ResponseSend) {
let db_result = todo!();
response_send.send(db_result).unwrap();
}
/// TODO
#[inline]
fn example_handler_3(&self, response_send: ResponseSend) {
let db_result = todo!();
response_send.send(db_result).unwrap();
} }
} }
//---------------------------------------------------------------------------------------------------- Request Mapping
// This function maps [`Request`]s to function calls
// executed by the rayon DB reader threadpool.
#[inline]
#[allow(clippy::needless_pass_by_value)]
/// Map [`Request`]'s to specific database handler functions.
///
/// This is the main entrance into all `Request` handler functions.
/// The basic structure is:
///
/// 1. `Request` is mapped to a handler function
/// 2. Handler function is called
/// 3. [`Response`] is sent
fn map_request(
_permit: OwnedSemaphorePermit, // Permit for this request
env: Arc<ConcreteEnv>, // Access to the database
request: ReadRequest, // The request we must fulfill
response_sender: ResponseSender, // The channel we must send the response back to
) {
/* TODO: pre-request handling, run some code for each request? */
match request {
ReadRequest::Example1 => example_handler_1(env, response_sender),
ReadRequest::Example2(x) => example_handler_2(env, response_sender, x),
ReadRequest::Example3(x) => example_handler_3(env, response_sender, x),
}
/* TODO: post-request handling, run some code for each request? */
}
//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`Request`].
//
// INVARIANT:
// These functions are called above in `tower::Service::call()`
// using a custom threadpool which means any call to `par_*()` functions
// will be using the custom rayon DB reader thread-pool, not the global one.
//
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_1(env: Arc<ConcreteEnv>, response_sender: ResponseSender) {
let db_result = Ok(Response::Example1);
response_sender.send(db_result).unwrap();
}
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_2(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: usize) {
let db_result = Ok(Response::Example2(x));
response_sender.send(db_result).unwrap();
}
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn example_handler_3(env: Arc<ConcreteEnv>, response_sender: ResponseSender, x: String) {
let db_result = Ok(Response::Example3(x));
response_sender.send(db_result).unwrap();
}

View file

@ -7,7 +7,7 @@
//---------------------------------------------------------------------------------------------------- Constants //---------------------------------------------------------------------------------------------------- Constants
//---------------------------------------------------------------------------------------------------- ReadRequest //---------------------------------------------------------------------------------------------------- ReadRequest
#[derive(Debug)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// A read request to the database. /// A read request to the database.
pub enum ReadRequest { pub enum ReadRequest {
/// TODO /// TODO
@ -19,7 +19,7 @@ pub enum ReadRequest {
} }
//---------------------------------------------------------------------------------------------------- WriteRequest //---------------------------------------------------------------------------------------------------- WriteRequest
#[derive(Debug)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// A write request to the database. /// A write request to the database.
pub enum WriteRequest { pub enum WriteRequest {
/// TODO /// TODO

View file

@ -7,7 +7,7 @@
//---------------------------------------------------------------------------------------------------- Constants //---------------------------------------------------------------------------------------------------- Constants
//---------------------------------------------------------------------------------------------------- Response //---------------------------------------------------------------------------------------------------- Response
#[derive(Debug)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// A response from the database. /// A response from the database.
/// ///
/// TODO /// TODO

View file

@ -7,8 +7,70 @@
// This is only imported on `#[cfg(test)]` in `mod.rs`. // This is only imported on `#[cfg(test)]` in `mod.rs`.
#[test] #![allow(unused_mut, clippy::significant_drop_tightening)]
const fn test() {
// TODO: remove me. //---------------------------------------------------------------------------------------------------- Use
// Just to see if the module gets imported correctly on test mode. use tower::{Service, ServiceExt};
use crate::{
config::Config,
service::{init, DatabaseReadHandle, DatabaseWriteHandle, ReadRequest, Response, WriteRequest},
};
//---------------------------------------------------------------------------------------------------- Tests
/// Initialize the `service`.
fn init_service() -> (DatabaseReadHandle, DatabaseWriteHandle, tempfile::TempDir) {
let tempdir = tempfile::tempdir().unwrap();
let config = Config::low_power(Some(tempdir.path().into()));
let (reader, writer) = init(config).unwrap();
(reader, writer, tempdir)
}
/// Simply `init()` the service and then drop it.
///
/// If this test fails, something is very wrong.
#[test]
fn init_drop() {
let (reader, writer, _tempdir) = init_service();
}
/// Send a read request, and receive a response,
/// asserting the response the expected value.
#[tokio::test]
async fn read_request() {
let (reader, writer, _tempdir) = init_service();
for (request, expected_response) in [
(ReadRequest::Example1, Response::Example1),
(ReadRequest::Example2(123), Response::Example2(123)),
(
ReadRequest::Example3("hello".into()),
Response::Example3("hello".into()),
),
] {
// This calls `poll_ready()` asserting we have a permit before `call()`.
let response_channel = reader.clone().oneshot(request);
let response = response_channel.await.unwrap();
assert_eq!(response, expected_response);
}
}
/// Send a write request, and receive a response,
/// asserting the response the expected value.
#[tokio::test]
async fn write_request() {
let (reader, mut writer, _tempdir) = init_service();
for (request, expected_response) in [
(WriteRequest::Example1, Response::Example1),
(WriteRequest::Example2(123), Response::Example2(123)),
(
WriteRequest::Example3("hello".into()),
Response::Example3("hello".into()),
),
] {
let response_channel = writer.call(request);
let response = response_channel.await.unwrap();
assert_eq!(response, expected_response);
}
} }

View file

@ -1,4 +1,4 @@
//! Database write thread-pool definitions and logic. //! Database writer thread definitions and logic.
//---------------------------------------------------------------------------------------------------- Import //---------------------------------------------------------------------------------------------------- Import
use std::{ use std::{
@ -6,6 +6,8 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::channel::oneshot;
use cuprate_helper::asynch::InfallibleOneshotReceiver; use cuprate_helper::asynch::InfallibleOneshotReceiver;
use crate::{ use crate::{
@ -28,10 +30,10 @@ type ResponseResult = Result<Response, RuntimeError>;
/// ///
/// The channel itself should never fail, /// The channel itself should never fail,
/// but the actual database operation might. /// but the actual database operation might.
type ResponseRecv = InfallibleOneshotReceiver<ResponseResult>; type ResponseReceiver = InfallibleOneshotReceiver<ResponseResult>;
/// The `Sender` channel for the response. /// The `Sender` channel for the response.
type ResponseSend = tokio::sync::oneshot::Sender<ResponseResult>; type ResponseSender = oneshot::Sender<ResponseResult>;
//---------------------------------------------------------------------------------------------------- DatabaseWriteHandle //---------------------------------------------------------------------------------------------------- DatabaseWriteHandle
/// Write handle to the database. /// Write handle to the database.
@ -48,22 +50,49 @@ pub struct DatabaseWriteHandle {
/// Sender channel to the database write thread-pool. /// Sender channel to the database write thread-pool.
/// ///
/// We provide the response channel for the thread-pool. /// We provide the response channel for the thread-pool.
pub(super) sender: crossbeam::channel::Sender<(WriteRequest, ResponseSend)>, pub(super) sender: crossbeam::channel::Sender<(WriteRequest, ResponseSender)>,
}
impl DatabaseWriteHandle {
/// Initialize the single `DatabaseWriter` thread.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(db: Arc<ConcreteEnv>) -> Self {
// Initialize `Request/Response` channels.
let (sender, receiver) = crossbeam::channel::unbounded();
// Spawn the writer.
std::thread::Builder::new()
.name(WRITER_THREAD_NAME.into())
.spawn(move || {
let this = DatabaseWriter { receiver, db };
DatabaseWriter::main(this);
})
.unwrap();
Self { sender }
}
} }
impl tower::Service<WriteRequest> for DatabaseWriteHandle { impl tower::Service<WriteRequest> for DatabaseWriteHandle {
type Response = Response; type Response = Response;
type Error = RuntimeError; type Error = RuntimeError;
type Future = ResponseRecv; type Future = ResponseReceiver;
#[inline] #[inline]
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!() Poll::Ready(Ok(()))
} }
#[inline] #[inline]
fn call(&mut self, _req: WriteRequest) -> Self::Future { fn call(&mut self, request: WriteRequest) -> Self::Future {
todo!() // Response channel we `.await` on.
let (response_sender, receiver) = oneshot::channel();
// Send the write request.
self.sender.send((request, response_sender)).unwrap();
InfallibleOneshotReceiver::from(receiver)
} }
} }
@ -75,7 +104,7 @@ pub(super) struct DatabaseWriter {
/// Any caller can send some requests to this channel. /// Any caller can send some requests to this channel.
/// They send them alongside another `Response` channel, /// They send them alongside another `Response` channel,
/// which we will eventually send to. /// which we will eventually send to.
receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSend)>, receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSender)>,
/// Access to the database. /// Access to the database.
db: Arc<ConcreteEnv>, db: Arc<ConcreteEnv>,
@ -88,38 +117,18 @@ impl Drop for DatabaseWriter {
} }
impl DatabaseWriter { impl DatabaseWriter {
/// Initialize the single `DatabaseWriter` thread.
#[cold]
#[inline(never)] // Only called once.
pub(super) fn init(db: Arc<ConcreteEnv>) -> DatabaseWriteHandle {
// Initialize `Request/Response` channels.
let (sender, receiver) = crossbeam::channel::unbounded();
// Spawn the writer.
std::thread::Builder::new()
.name(WRITER_THREAD_NAME.into())
.spawn(move || {
let this = Self { receiver, db };
Self::main(this);
})
.unwrap();
// Return a handle to the pool.
DatabaseWriteHandle { sender }
}
/// The `DatabaseWriter`'s main function. /// The `DatabaseWriter`'s main function.
/// ///
/// The writer just loops in this function. /// The writer just loops in this function.
#[cold] #[cold]
#[inline(never)] // Only called once. #[inline(never)] // Only called once.
fn main(mut self) { fn main(self) {
// 1. Hang on request channel // 1. Hang on request channel
// 2. Map request to some database function // 2. Map request to some database function
// 3. Execute that function, get the result // 3. Execute that function, get the result
// 4. Return the result via channel // 4. Return the result via channel
loop { loop {
let Ok((request, response_send)) = self.receiver.recv() else { let Ok((request, response_sender)) = self.receiver.recv() else {
// If this receive errors, it means that the channel is empty // If this receive errors, it means that the channel is empty
// and disconnected, meaning the other side (all senders) have // and disconnected, meaning the other side (all senders) have
// been dropped. This means "shutdown", and we return here to // been dropped. This means "shutdown", and we return here to
@ -133,9 +142,9 @@ impl DatabaseWriter {
// Map [`Request`]'s to specific database functions. // Map [`Request`]'s to specific database functions.
match request { match request {
WriteRequest::Example1 => self.example_handler_1(response_send), WriteRequest::Example1 => self.example_handler_1(response_sender),
WriteRequest::Example2(_x) => self.example_handler_2(response_send), WriteRequest::Example2(x) => self.example_handler_2(response_sender, x),
WriteRequest::Example3(_x) => self.example_handler_3(response_send), WriteRequest::Example3(x) => self.example_handler_3(response_sender, x),
} }
} }
} }
@ -166,22 +175,25 @@ impl DatabaseWriter {
/// TODO /// TODO
#[inline] #[inline]
fn example_handler_1(&mut self, response_send: ResponseSend) { #[allow(clippy::unused_self)] // TODO: remove me
let db_result = todo!(); fn example_handler_1(&self, response_sender: ResponseSender) {
response_send.send(db_result).unwrap(); let db_result = Ok(Response::Example1);
response_sender.send(db_result).unwrap();
} }
/// TODO /// TODO
#[inline] #[inline]
fn example_handler_2(&mut self, response_send: ResponseSend) { #[allow(clippy::unused_self)] // TODO: remove me
let db_result = todo!(); fn example_handler_2(&self, response_sender: ResponseSender, x: usize) {
response_send.send(db_result).unwrap(); let db_result = Ok(Response::Example2(x));
response_sender.send(db_result).unwrap();
} }
/// TODO /// TODO
#[inline] #[inline]
fn example_handler_3(&mut self, response_send: ResponseSend) { #[allow(clippy::unused_self)] // TODO: remove me
let db_result = todo!(); fn example_handler_3(&self, response_sender: ResponseSender, x: String) {
response_send.send(db_result).unwrap(); let db_result = Ok(Response::Example3(x));
response_sender.send(db_result).unwrap();
} }
} }