From 9c27ba5791377d639cb5d30d0f692c228568c122 Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Wed, 1 May 2024 13:52:20 -0400 Subject: [PATCH] database: impl `service` fn bodies (#113) * write: impl write_block() * ops: add `get_block_info()` * read: impl block fn's * read: fix signatures * service: wrap `ConcreteEnv` in `RwLock` and doc why * heed: use `read-txn-no-tls` for `Send` read transactions * service: remove RwLock, impl some read functions * read: impl `outputs()` * read: flatten indentation, add `thread_local()` * read: impl `number_outputs_with_amount()` * tests: add `AssertTableLen` * ops: replace all table len asserts with `AssertTableLen` * service: initial tests * service: finish most tests * service: fix bad block data in test * tables: fix incorrect doc * service: add `ReadRequest::Outputs` test * read: use macros for set/getting `ThreadLocal`'s based on backend * small fixes * fix review * small fixes * read: fix ThreadLocal macros for `redb` * read: move `Output` mapping to `crate::free` it's needed in tests too * service: check output value correctness in tests * helper: add timelock <-> u64 mapping functions * free: use `u64_to_timelock()` * read: rct outputs * read: fix variable name * read: use ThreadLocal for both backends * heed: use Mutex for `HeedTableRo`'s read tx * block: add miner_tx * heed: remove Table bound oops * Revert "heed: use Mutex for `HeedTableRo`'s read tx" This reverts commit 7e8aae016c55802070ccf7d152aa8966984d7186. * add `UnsafeSendable` * read: use `UnsafeSendable` for `heed`, branch on backend * read: safety docs * cargo.toml: re-add `read-txn-no-tls` for heed * ops: fix tests, remove miner_tx * fix tx_idx calculation, account for RCT outputs in tests * read: docs, fix `get_tables!()` for both backends * fix clippy * database: `unsafe trait DatabaseRo` * tx: use correct tx_id * free: remove miner_tx comment * free: remove `amount` input for rct outputs * ops: split `add_tx` inputs * read: use `UnsafeSendable` for all backends * heed: update safety comment * move output functions `free` -> `ops` * read: fix `chain_height()` handling * remove serde on `UnsafeSendable` * de-dup docs on `trait DatabaseRo`, `get_tables!()` * Update database/src/unsafe_sendable.rs Co-authored-by: Boog900 --------- Co-authored-by: Boog900 --- Cargo.lock | 2 + database/Cargo.toml | 15 +- database/src/backend/heed/database.rs | 7 +- database/src/backend/redb/database.rs | 6 +- database/src/database.rs | 24 +- database/src/env.rs | 4 +- database/src/free.rs | 2 - database/src/lib.rs | 9 +- database/src/ops/block.rs | 54 ++-- database/src/ops/blockchain.rs | 20 +- database/src/ops/key_image.rs | 24 +- database/src/ops/output.rs | 141 +++++++-- database/src/ops/tx.rs | 56 ++-- database/src/service/read.rs | 334 +++++++++++++++++---- database/src/service/tests.rs | 399 ++++++++++++++++++++++---- database/src/service/write.rs | 28 +- database/src/tables.rs | 6 +- database/src/tests.rs | 51 +++- database/src/unsafe_sendable.rs | 86 ++++++ helper/Cargo.toml | 13 +- helper/src/map.rs | 49 ++++ 21 files changed, 1115 insertions(+), 215 deletions(-) create mode 100644 database/src/unsafe_sendable.rs diff --git a/Cargo.lock b/Cargo.lock index 8b45568..e9cb1b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -555,6 +555,7 @@ dependencies = [ "serde", "tempfile", "thiserror", + "thread_local", "tokio", "tokio-util", "tower", @@ -569,6 +570,7 @@ dependencies = [ "dirs", "futures", "libc", + "monero-serai", "rayon", "tokio", "windows", diff --git a/database/Cargo.toml 
b/database/Cargo.toml index c8efac2..712dbb1 100644 --- a/database/Cargo.toml +++ b/database/Cargo.toml @@ -35,15 +35,16 @@ page_size = { version = "0.6.0" } # Needed for database resizes, they mus thiserror = { workspace = true } # `service` feature. -crossbeam = { workspace = true, features = ["std"], optional = true } -futures = { workspace = true, optional = true } -tokio = { workspace = true, features = ["full"], optional = true } -tokio-util = { workspace = true, features = ["full"], optional = true } -tower = { workspace = true, features = ["full"], optional = true } -rayon = { workspace = true, optional = true } +crossbeam = { workspace = true, features = ["std"], optional = true } +futures = { workspace = true, optional = true } +tokio = { workspace = true, features = ["full"], optional = true } +tokio-util = { workspace = true, features = ["full"], optional = true } +tower = { workspace = true, features = ["full"], optional = true } +thread_local = { workspace = true } +rayon = { workspace = true, optional = true } # Optional features. -heed = { version = "0.20.0", optional = true } +heed = { version = "0.20.0", features = ["read-txn-no-tls"], optional = true } redb = { version = "2.1.0", optional = true } serde = { workspace = true, optional = true } diff --git a/database/src/backend/heed/database.rs b/database/src/backend/heed/database.rs index 01d544d..573d32c 100644 --- a/database/src/backend/heed/database.rs +++ b/database/src/backend/heed/database.rs @@ -137,7 +137,8 @@ impl DatabaseIter for HeedTableRo<'_, T> { } //---------------------------------------------------------------------------------------------------- DatabaseRo Impl -impl DatabaseRo for HeedTableRo<'_, T> { +// SAFETY: `HeedTableRo: !Send` as it holds a reference to `heed::RoTxn: Send + !Sync`. +unsafe impl DatabaseRo for HeedTableRo<'_, T> { #[inline] fn get(&self, key: &T::Key) -> Result { get::(&self.db, self.tx_ro, key) @@ -165,7 +166,9 @@ impl DatabaseRo for HeedTableRo<'_, T> { } //---------------------------------------------------------------------------------------------------- DatabaseRw Impl -impl DatabaseRo for HeedTableRw<'_, '_, T> { +// SAFETY: The `Send` bound only applies to `HeedTableRo`. +// `HeedTableRw`'s write transaction is `!Send`. +unsafe impl DatabaseRo for HeedTableRw<'_, '_, T> { #[inline] fn get(&self, key: &T::Key) -> Result { get::(&self.db, &self.tx_rw.borrow(), key) diff --git a/database/src/backend/redb/database.rs b/database/src/backend/redb/database.rs index bcfb2be..3224c24 100644 --- a/database/src/backend/redb/database.rs +++ b/database/src/backend/redb/database.rs @@ -118,7 +118,8 @@ impl DatabaseIter for RedbTableRo { } //---------------------------------------------------------------------------------------------------- DatabaseRo -impl DatabaseRo for RedbTableRo { +// SAFETY: Both `redb`'s transaction and table types are `Send + Sync`. +unsafe impl DatabaseRo for RedbTableRo { #[inline] fn get(&self, key: &T::Key) -> Result { get::(self, key) @@ -146,7 +147,8 @@ impl DatabaseRo for RedbTableRo { } //---------------------------------------------------------------------------------------------------- DatabaseRw -impl DatabaseRo for RedbTableRw<'_, T::Key, T::Value> { +// SAFETY: Both `redb`'s transaction and table types are `Send + Sync`. 
+unsafe impl DatabaseRo for RedbTableRw<'_, T::Key, T::Value> {
     #[inline]
     fn get(&self, key: &T::Key) -> Result {
         get::(self, key)
diff --git a/database/src/database.rs b/database/src/database.rs
index 2ec3d32..8273a92 100644
--- a/database/src/database.rs
+++ b/database/src/database.rs
@@ -82,7 +82,29 @@ pub trait DatabaseIter {
 ///
 /// This is a read-only database table,
 /// write operations are defined in [`DatabaseRw`].
-pub trait DatabaseRo {
+///
+/// # Safety
+/// The table type that implements this MUST be `Send`.
+///
+/// However, if the table holds a reference to a transaction:
+/// - only the transaction has to be `Send`
+/// - the table cannot implement `Send`
+///
+/// For example:
+///
+/// `heed`'s transactions are `Send` but `HeedTableRo` contains a `&`
+/// to the transaction, as such, if `Send` were implemented on `HeedTableRo`
+/// then 1 transaction could be used to open multiple tables, then sent to
+/// other threads - this would be a soundness hole in `HeedTableRo`.
+///
+/// `&T` is only `Send` if `T: Sync`.
+///
+/// `heed::RoTxn: !Sync`, therefore our table
+/// holding `&heed::RoTxn` must NOT be `Send`.
+///
+/// -
+/// -
+pub unsafe trait DatabaseRo {
     /// Get the value corresponding to a key.
     ///
     /// The returned value is _owned_.
diff --git a/database/src/env.rs b/database/src/env.rs
index d21595c..437928a 100644
--- a/database/src/env.rs
+++ b/database/src/env.rs
@@ -12,7 +12,7 @@ use crate::{
     tables::{
         call_fn_on_all_tables_or_early_return, BlockBlobs, BlockHeights, BlockInfos, KeyImages,
         NumOutputs, Outputs, PrunableHashes, PrunableTxBlobs, PrunedTxBlobs, RctOutputs, Tables,
-        TablesMut, TxHeights, TxIds, TxUnlockTime,
+        TablesIter, TablesMut, TxHeights, TxIds, TxUnlockTime,
     },
     transaction::{TxRo, TxRw},
 };
@@ -233,7 +233,7 @@ where
     ///
     /// # Errors
     /// TODO
-    fn open_tables(&self, tx_ro: &Ro) -> Result {
+    fn open_tables(&self, tx_ro: &Ro) -> Result {
         call_fn_on_all_tables_or_early_return! {
             Self::open_db_ro(self, tx_ro)
         }
diff --git a/database/src/free.rs b/database/src/free.rs
index a9b9385..7e145a2 100644
--- a/database/src/free.rs
+++ b/database/src/free.rs
@@ -1,6 +1,4 @@
 //! General free functions (related to the database).
-//!
-//! TODO.
 
 //---------------------------------------------------------------------------------------------------- Import
diff --git a/database/src/lib.rs b/database/src/lib.rs
index ce7df28..4d0fe87 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -191,6 +191,10 @@
     // although it is sometimes nice.
     clippy::must_use_candidate,
 
+    // FIXME: good lint but too many false positives
+    // with our `Env` + `RwLock` setup.
+    clippy::significant_drop_tightening,
+
     // TODO: should be removed after all `todo!()`'s are gone.
clippy::diverging_sub_expression, @@ -238,7 +242,7 @@ pub use env::{Env, EnvInner}; mod error; pub use error::{InitError, RuntimeError}; -mod free; +pub(crate) mod free; pub mod resize; @@ -269,3 +273,6 @@ pub mod service; //---------------------------------------------------------------------------------------------------- Private #[cfg(test)] pub(crate) mod tests; + +#[cfg(feature = "service")] // only needed in `service` for now +pub(crate) mod unsafe_sendable; diff --git a/database/src/ops/block.rs b/database/src/ops/block.rs index 8a70ba1..77a8091 100644 --- a/database/src/ops/block.rs +++ b/database/src/ops/block.rs @@ -89,8 +89,14 @@ pub fn add_block( } //------------------------------------------------------ Transaction / Outputs / Key Images - for tx_verification_data in &block.txs { - add_tx(tx_verification_data, &chain_height, tables)?; + // Add the miner transaction first. + { + let tx = &block.block.miner_tx; + add_tx(tx, &tx.serialize(), &tx.hash(), &chain_height, tables)?; + } + + for tx in &block.txs { + add_tx(&tx.tx, &tx.tx_blob, &tx.tx_hash, &chain_height, tables)?; } //------------------------------------------------------ Block Info @@ -161,6 +167,7 @@ pub fn pop_block( let block = Block::read(&mut block_blob.as_slice())?; //------------------------------------------------------ Transaction / Outputs / Key Images + remove_tx(&block.miner_tx.hash(), tables)?; for tx_hash in &block.txs { remove_tx(tx_hash, tables)?; } @@ -226,6 +233,16 @@ pub fn get_block_extended_header_top( } //---------------------------------------------------------------------------------------------------- Misc +/// Retrieve a [`BlockInfo`] via its [`BlockHeight`]. +#[doc = doc_error!()] +#[inline] +pub fn get_block_info( + block_height: &BlockHeight, + table_block_infos: &impl DatabaseRo, +) -> Result { + table_block_infos.get(block_height) +} + /// Retrieve a [`BlockHeight`] via its [`BlockHash`]. #[doc = doc_error!()] #[inline] @@ -262,7 +279,7 @@ mod test { use super::*; use crate::{ ops::tx::{get_tx, tx_exists}, - tests::{assert_all_tables_are_empty, tmp_concrete_env}, + tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen}, Env, }; @@ -315,20 +332,23 @@ mod test { let tables = env_inner.open_tables(&tx_ro).unwrap(); // Assert only the proper tables were added to. - assert_eq!(tables.block_infos().len().unwrap(), 3); - assert_eq!(tables.block_blobs().len().unwrap(), 3); - assert_eq!(tables.block_heights().len().unwrap(), 3); - assert_eq!(tables.key_images().len().unwrap(), 69); - assert_eq!(tables.num_outputs().len().unwrap(), 38); - assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.prunable_hashes().len().unwrap(), 0); - assert_eq!(tables.outputs().len().unwrap(), 107); - assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.rct_outputs().len().unwrap(), 6); - assert_eq!(tables.tx_blobs().len().unwrap(), 5); - assert_eq!(tables.tx_ids().len().unwrap(), 5); - assert_eq!(tables.tx_heights().len().unwrap(), 5); - assert_eq!(tables.tx_unlock_time().len().unwrap(), 0); + AssertTableLen { + block_infos: 3, + block_blobs: 3, + block_heights: 3, + key_images: 69, + num_outputs: 41, + pruned_tx_blobs: 0, + prunable_hashes: 0, + outputs: 111, + prunable_tx_blobs: 0, + rct_outputs: 8, + tx_blobs: 8, + tx_ids: 8, + tx_heights: 8, + tx_unlock_time: 3, + } + .assert(&tables); // Check `cumulative` functions work. 
assert_eq!( diff --git a/database/src/ops/blockchain.rs b/database/src/ops/blockchain.rs index 784ec50..9208995 100644 --- a/database/src/ops/blockchain.rs +++ b/database/src/ops/blockchain.rs @@ -101,7 +101,7 @@ mod test { block::add_block, tx::{get_tx, tx_exists}, }, - tests::{assert_all_tables_are_empty, tmp_concrete_env}, + tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen}, Env, }; @@ -149,6 +149,24 @@ mod test { } // Assert reads are correct. + AssertTableLen { + block_infos: 3, + block_blobs: 3, + block_heights: 3, + key_images: 69, + num_outputs: 41, + pruned_tx_blobs: 0, + prunable_hashes: 0, + outputs: 111, + prunable_tx_blobs: 0, + rct_outputs: 8, + tx_blobs: 8, + tx_ids: 8, + tx_heights: 8, + tx_unlock_time: 3, + } + .assert(&tables); + assert_eq!(blocks_len, chain_height(tables.block_heights()).unwrap()); assert_eq!( blocks_len - 1, diff --git a/database/src/ops/key_image.rs b/database/src/ops/key_image.rs index 1fa00ea..0938447 100644 --- a/database/src/ops/key_image.rs +++ b/database/src/ops/key_image.rs @@ -64,7 +64,7 @@ mod test { use super::*; use crate::{ ops::tx::{get_tx, tx_exists}, - tests::{assert_all_tables_are_empty, tmp_concrete_env}, + tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen}, Env, }; @@ -108,23 +108,11 @@ mod test { let tables = env_inner.open_tables(&tx_ro).unwrap(); // Assert only the proper tables were added to. - assert_eq!( - tables.key_images().len().unwrap(), - u64::try_from(key_images.len()).unwrap() - ); - assert_eq!(tables.block_infos().len().unwrap(), 0); - assert_eq!(tables.block_blobs().len().unwrap(), 0); - assert_eq!(tables.block_heights().len().unwrap(), 0); - assert_eq!(tables.num_outputs().len().unwrap(), 0); - assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.prunable_hashes().len().unwrap(), 0); - assert_eq!(tables.outputs().len().unwrap(), 0); - assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.rct_outputs().len().unwrap(), 0); - assert_eq!(tables.tx_blobs().len().unwrap(), 0); - assert_eq!(tables.tx_ids().len().unwrap(), 0); - assert_eq!(tables.tx_heights().len().unwrap(), 0); - assert_eq!(tables.tx_unlock_time().len().unwrap(), 0); + AssertTableLen { + key_images: tables.key_images().len().unwrap(), + ..Default::default() + } + .assert(&tables); for key_image in &key_images { println!("key_image_exists(): {}", hex::encode(key_image)); diff --git a/database/src/ops/output.rs b/database/src/ops/output.rs index b2c615f..e290245 100644 --- a/database/src/ops/output.rs +++ b/database/src/ops/output.rs @@ -1,7 +1,12 @@ //! Outputs. 
+use cuprate_helper::map::u64_to_timelock;
+use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar};
 //---------------------------------------------------------------------------------------------------- Import
-use monero_serai::transaction::{Timelock, Transaction};
+use monero_serai::{
+    transaction::{Timelock, Transaction},
+    H,
+};
 
 use cuprate_types::{OutputOnChain, VerifiedBlockInformation};
 
@@ -17,8 +22,8 @@ use crate::{
     },
     transaction::{TxRo, TxRw},
     types::{
-        Amount, AmountIndex, BlockHash, BlockHeight, BlockInfo, KeyImage, Output, PreRctOutputId,
-        RctOutput, TxHash,
+        Amount, AmountIndex, BlockHash, BlockHeight, BlockInfo, KeyImage, Output, OutputFlags,
+        PreRctOutputId, RctOutput, TxHash,
     },
 };
 
@@ -153,13 +158,110 @@ pub fn get_rct_num_outputs(
     table_rct_outputs.len()
 }
 
+//---------------------------------------------------------------------------------------------------- Mapping functions
+/// Map an [`Output`] to a [`cuprate_types::OutputOnChain`].
+#[doc = doc_error!()]
+pub fn output_to_output_on_chain(
+    output: &Output,
+    amount: Amount,
+    table_tx_unlock_time: &impl DatabaseRo,
+) -> Result {
+    // FIXME: implement lookup table for common values:
+    //
+    let commitment = ED25519_BASEPOINT_POINT + H() * Scalar::from(amount);
+
+    let time_lock = if output
+        .output_flags
+        .contains(OutputFlags::NON_ZERO_UNLOCK_TIME)
+    {
+        u64_to_timelock(table_tx_unlock_time.get(&output.tx_idx)?)
+    } else {
+        Timelock::None
+    };
+
+    let key = CompressedEdwardsY::from_slice(&output.key)
+        .map(|y| y.decompress())
+        .unwrap_or(None);
+
+    Ok(OutputOnChain {
+        height: u64::from(output.height),
+        time_lock,
+        key,
+        commitment,
+    })
+}
+
+/// Map an [`RctOutput`] to a [`cuprate_types::OutputOnChain`].
+///
+/// # Panics
+/// This function will panic if `rct_output`'s `commitment` fails to decompress
+/// into a valid [`EdwardsPoint`](curve25519_dalek::edwards::EdwardsPoint).
+///
+/// This should normally not happen as commitments that
+/// are stored in the database should always be valid.
+#[doc = doc_error!()]
+pub fn rct_output_to_output_on_chain(
+    rct_output: &RctOutput,
+    table_tx_unlock_time: &impl DatabaseRo,
+) -> Result {
+    // INVARIANT: Commitments stored are valid when stored by the database.
+    let commitment = CompressedEdwardsY::from_slice(&rct_output.commitment)
+        .unwrap()
+        .decompress()
+        .unwrap();
+
+    let time_lock = if rct_output
+        .output_flags
+        .contains(OutputFlags::NON_ZERO_UNLOCK_TIME)
+    {
+        u64_to_timelock(table_tx_unlock_time.get(&rct_output.tx_idx)?)
+    } else {
+        Timelock::None
+    };
+
+    let key = CompressedEdwardsY::from_slice(&rct_output.key)
+        .map(|y| y.decompress())
+        .unwrap_or(None);
+
+    Ok(OutputOnChain {
+        height: u64::from(rct_output.height),
+        time_lock,
+        key,
+        commitment,
+    })
+}
+
+/// Map a [`PreRctOutputId`] to an [`OutputOnChain`].
+///
+/// Note that this still supports RCT outputs; in that case, [`PreRctOutputId::amount`] should be `0`.
+#[doc = doc_error!()]
+pub fn id_to_output_on_chain(
+    id: &PreRctOutputId,
+    tables: &impl Tables,
+) -> Result {
+    // v2 transactions.
+    if id.amount == 0 {
+        let rct_output = get_rct_output(&id.amount_index, tables.rct_outputs())?;
+        let output_on_chain = rct_output_to_output_on_chain(&rct_output, tables.tx_unlock_time())?;
+
+        Ok(output_on_chain)
+    } else {
+        // v1 transactions.
+ let output = get_output(id, tables.outputs())?; + let output_on_chain = + output_to_output_on_chain(&output, id.amount, tables.tx_unlock_time())?; + + Ok(output_on_chain) + } +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] #[allow(clippy::significant_drop_tightening, clippy::cognitive_complexity)] mod test { use super::*; use crate::{ - tests::{assert_all_tables_are_empty, tmp_concrete_env}, + tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen}, types::OutputFlags, Env, }; @@ -221,20 +323,23 @@ mod test { // Assert all reads of the outputs are OK. { // Assert proper tables were added to. - assert_eq!(tables.block_infos().len().unwrap(), 0); - assert_eq!(tables.block_blobs().len().unwrap(), 0); - assert_eq!(tables.block_heights().len().unwrap(), 0); - assert_eq!(tables.key_images().len().unwrap(), 0); - assert_eq!(tables.num_outputs().len().unwrap(), 1); - assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.prunable_hashes().len().unwrap(), 0); - assert_eq!(tables.outputs().len().unwrap(), 1); - assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.rct_outputs().len().unwrap(), 1); - assert_eq!(tables.tx_blobs().len().unwrap(), 0); - assert_eq!(tables.tx_ids().len().unwrap(), 0); - assert_eq!(tables.tx_heights().len().unwrap(), 0); - assert_eq!(tables.tx_unlock_time().len().unwrap(), 0); + AssertTableLen { + block_infos: 0, + block_blobs: 0, + block_heights: 0, + key_images: 0, + num_outputs: 1, + pruned_tx_blobs: 0, + prunable_hashes: 0, + outputs: 1, + prunable_tx_blobs: 0, + rct_outputs: 1, + tx_blobs: 0, + tx_ids: 0, + tx_heights: 0, + tx_unlock_time: 0, + } + .assert(&tables); // Assert length is correct. assert_eq!(get_num_outputs(tables.outputs()).unwrap(), 1); diff --git a/database/src/ops/tx.rs b/database/src/ops/tx.rs index a66750e..a76de39 100644 --- a/database/src/ops/tx.rs +++ b/database/src/ops/tx.rs @@ -36,7 +36,7 @@ use super::{ }; //---------------------------------------------------------------------------------------------------- Private -/// Add a [`TransactionVerificationData`] to the database. +/// Add a [`Transaction`] (and related data) to the database. /// /// The `block_height` is the block that this `tx` belongs to. /// @@ -63,18 +63,20 @@ use super::{ #[doc = doc_error!()] #[inline] pub fn add_tx( - tx: &TransactionVerificationData, + tx: &Transaction, + tx_blob: &Vec, + tx_hash: &TxHash, block_height: &BlockHeight, tables: &mut impl TablesMut, ) -> Result { let tx_id = get_num_tx(tables.tx_ids_mut())?; //------------------------------------------------------ Transaction data - tables.tx_ids_mut().put(&tx.tx_hash, &tx_id)?; + tables.tx_ids_mut().put(tx_hash, &tx_id)?; tables.tx_heights_mut().put(&tx_id, block_height)?; tables .tx_blobs_mut() - .put(&tx_id, StorableVec::wrap_ref(&tx.tx_blob))?; + .put(&tx_id, StorableVec::wrap_ref(tx_blob))?; //------------------------------------------------------ Timelocks // Height/time is not differentiated via type, but rather: @@ -82,7 +84,7 @@ pub fn add_tx( // so the `u64/usize` is stored without any tag. 
// // - match tx.tx.prefix.timelock { + match tx.prefix.timelock { Timelock::None => (), Timelock::Block(height) => tables.tx_unlock_time_mut().put(&tx_id, &(height as u64))?, Timelock::Time(time) => tables.tx_unlock_time_mut().put(&tx_id, &time)?, @@ -95,8 +97,6 @@ pub fn add_tx( // } //------------------------------------------------------ - // Refer to the inner transaction type from now on. - let tx: &Transaction = &tx.tx; let Ok(height) = u32::try_from(*block_height) else { panic!("add_tx(): block_height ({block_height}) > u32::MAX"); }; @@ -128,7 +128,6 @@ pub fn add_tx( }; let mut amount_indices = Vec::with_capacity(tx.prefix.outputs.len()); - let tx_idx = get_num_tx(tables.tx_ids_mut())?; for (i, output) in tx.prefix.outputs.iter().enumerate() { let key = *output.key.as_bytes(); @@ -151,7 +150,7 @@ pub fn add_tx( key, height, output_flags, - tx_idx, + tx_idx: tx_id, commitment, }, tables.rct_outputs_mut(), @@ -164,7 +163,7 @@ pub fn add_tx( key, height, output_flags, - tx_idx, + tx_idx: tx_id, }, tables, )? @@ -178,7 +177,7 @@ pub fn add_tx( key, height, output_flags, - tx_idx, + tx_idx: tx_id, commitment, }, tables.rct_outputs_mut(), @@ -343,7 +342,7 @@ pub fn tx_exists( mod test { use super::*; use crate::{ - tests::{assert_all_tables_are_empty, tmp_concrete_env}, + tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen}, Env, }; use cuprate_test_utils::data::{tx_v1_sig0, tx_v1_sig2, tx_v2_rct3}; @@ -368,7 +367,7 @@ mod test { .iter() .map(|tx| { println!("add_tx(): {tx:#?}"); - add_tx(tx, &0, &mut tables).unwrap() + add_tx(&tx.tx, &tx.tx_blob, &tx.tx_hash, &0, &mut tables).unwrap() }) .collect::>(); @@ -384,20 +383,23 @@ mod test { let tables = env_inner.open_tables(&tx_ro).unwrap(); // Assert only the proper tables were added to. - assert_eq!(tables.block_infos().len().unwrap(), 0); - assert_eq!(tables.block_blobs().len().unwrap(), 0); - assert_eq!(tables.block_heights().len().unwrap(), 0); - assert_eq!(tables.key_images().len().unwrap(), 4); // added to key images - assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.prunable_hashes().len().unwrap(), 0); - assert_eq!(tables.num_outputs().len().unwrap(), 9); - assert_eq!(tables.outputs().len().unwrap(), 10); // added to outputs - assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0); - assert_eq!(tables.rct_outputs().len().unwrap(), 2); - assert_eq!(tables.tx_blobs().len().unwrap(), 3); - assert_eq!(tables.tx_ids().len().unwrap(), 3); - assert_eq!(tables.tx_heights().len().unwrap(), 3); - assert_eq!(tables.tx_unlock_time().len().unwrap(), 1); // only 1 has a timelock + AssertTableLen { + block_infos: 0, + block_blobs: 0, + block_heights: 0, + key_images: 4, // added to key images + pruned_tx_blobs: 0, + prunable_hashes: 0, + num_outputs: 9, + outputs: 10, // added to outputs + prunable_tx_blobs: 0, + rct_outputs: 2, + tx_blobs: 3, + tx_ids: 3, + tx_heights: 3, + tx_unlock_time: 1, // only 1 has a timelock + } + .assert(&tables); // Both from ID and hash should result in getting the same transaction. 
let mut tx_hashes = vec![]; diff --git a/database/src/service/read.rs b/database/src/service/read.rs index cabe335..b178bd1 100644 --- a/database/src/service/read.rs +++ b/database/src/service/read.rs @@ -3,27 +3,46 @@ //---------------------------------------------------------------------------------------------------- Import use std::{ collections::{HashMap, HashSet}, + num::NonZeroUsize, ops::Range, - sync::Arc, + sync::{Arc, RwLock}, task::{Context, Poll}, }; +use cfg_if::cfg_if; use crossbeam::channel::Receiver; - +use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar}; use futures::{channel::oneshot, ready}; - +use monero_serai::{transaction::Timelock, H}; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use thread_local::ThreadLocal; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::PollSemaphore; use cuprate_helper::asynch::InfallibleOneshotReceiver; -use cuprate_types::service::{ReadRequest, Response}; +use cuprate_types::{ + service::{ReadRequest, Response}, + ExtendedBlockHeader, OutputOnChain, +}; use crate::{ config::ReaderThreads, + constants::DATABASE_CORRUPT_MSG, error::RuntimeError, + ops::{ + block::{get_block_extended_header_from_height, get_block_info}, + blockchain::{cumulative_generated_coins, top_block_height}, + key_image::key_image_exists, + output::{ + get_output, get_rct_output, id_to_output_on_chain, output_to_output_on_chain, + rct_output_to_output_on_chain, + }, + }, service::types::{ResponseReceiver, ResponseResult, ResponseSender}, - types::{Amount, AmountIndex, BlockHeight, KeyImage}, - ConcreteEnv, + tables::{BlockHeights, BlockInfos, KeyImages, NumOutputs, Outputs, Tables}, + types::{Amount, AmountIndex, BlockHeight, KeyImage, OutputFlags, PreRctOutputId}, + unsafe_sendable::UnsafeSendable, + ConcreteEnv, DatabaseRo, Env, EnvInner, }; //---------------------------------------------------------------------------------------------------- DatabaseReadHandle @@ -65,10 +84,10 @@ pub struct DatabaseReadHandle { impl Clone for DatabaseReadHandle { fn clone(&self) -> Self { Self { - pool: self.pool.clone(), + pool: Arc::clone(&self.pool), semaphore: self.semaphore.clone(), permit: None, - env: self.env.clone(), + env: Arc::clone(&self.env), } } } @@ -106,23 +125,21 @@ impl DatabaseReadHandle { } } - /// TODO + /// Access to the actual database environment. + /// + /// # ⚠️ Warning + /// This function gives you access to the actual + /// underlying database connected to by `self`. + /// + /// I.e. it allows you to read/write data _directly_ + /// instead of going through a request. + /// + /// Be warned that using the database directly + /// in this manner has not been tested. #[inline] pub const fn env(&self) -> &Arc { &self.env } - - /// TODO - #[inline] - pub const fn semaphore(&self) -> &PollSemaphore { - &self.semaphore - } - - /// TODO - #[inline] - pub const fn permit(&self) -> &Option { - &self.permit - } } impl tower::Service for DatabaseReadHandle { @@ -163,9 +180,11 @@ impl tower::Service for DatabaseReadHandle { // // INVARIANT: // The below `DatabaseReader` function impl block relies on this behavior. 
-        let env = Arc::clone(self.env());
-        self.pool
-            .spawn(move || map_request(permit, env, request, response_sender));
+        let env = Arc::clone(&self.env);
+        self.pool.spawn(move || {
+            let _permit: OwnedSemaphorePermit = permit;
+            map_request(&env, request, response_sender);
+        }); // drop(permit/env);
 
         InfallibleOneshotReceiver::from(receiver)
     }
@@ -175,7 +194,6 @@ impl tower::Service for DatabaseReadHandle {
 // This function maps [`Request`]s to function calls
 // executed by the rayon DB reader threadpool.
 
-#[allow(clippy::needless_pass_by_value)] // TODO: fix me
 /// Map [`Request`]'s to specific database handler functions.
 ///
 /// This is the main entrance into all `Request` handler functions.
 /// 1. `Request` is mapped to a handler function
 /// 2. Handler function is called
 /// 3. [`Response`] is sent
 fn map_request(
-    _permit: OwnedSemaphorePermit,   // Permit for this request, dropped at end of function
-    env: Arc,              // Access to the database
-    request: ReadRequest,            // The request we must fulfill
+    env: &ConcreteEnv,               // Access to the database
+    request: ReadRequest,            // The request we must fulfill
     response_sender: ResponseSender, // The channel we must send the response back to
 ) {
-    /* TODO: pre-request handling, run some code for each request? */
     use ReadRequest as R;
 
+    /* TODO: pre-request handling, run some code for each request? */
+
     let response = match request {
-        R::BlockExtendedHeader(block) => block_extended_header(&env, block),
-        R::BlockHash(block) => block_hash(&env, block),
-        R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range),
-        R::ChainHeight => chain_height(&env),
-        R::GeneratedCoins => generated_coins(&env),
-        R::Outputs(map) => outputs(&env, map),
-        R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec),
-        R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set),
+        R::BlockExtendedHeader(block) => block_extended_header(env, block),
+        R::BlockHash(block) => block_hash(env, block),
+        R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(env, range),
+        R::ChainHeight => chain_height(env),
+        R::GeneratedCoins => generated_coins(env),
+        R::Outputs(map) => outputs(env, map),
+        R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
+        R::CheckKIsNotSpent(set) => check_k_is_not_spent(env, set),
     };
 
     if let Err(e) = response_sender.send(response) {
@@ -211,6 +229,54 @@ fn map_request(
     /* TODO: post-request handling, run some code for each request? */
 }
 
+//---------------------------------------------------------------------------------------------------- Thread Local
+/// Q: Why does this exist?
+///
+/// A1: `heed`'s transactions and tables are not `Sync`, so we cannot use
+/// them with rayon, however, we set a feature such that they are `Send`.
+///
+/// A2: When sending to rayon, we want to ensure each read transaction
+/// is only ever used by 1 thread, in order to scale reads.
+///
+///
+#[inline]
+fn thread_local(env: &impl Env) -> ThreadLocal {
+    ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get())
+}
+
+/// Take in a `ThreadLocal` and return an `&impl Tables + Send`.
+///
+/// # Safety
+/// See [`DatabaseRo`] docs.
+///
+/// We are safely using `UnsafeSendable` in `service`'s reader thread-pool
+/// as we are pairing our usage with `ThreadLocal` - only 1 thread
+/// will ever access a transaction at a time. This is an INVARIANT.
+/// +/// A `Mutex` was considered but: +/// - It is less performant +/// - It isn't technically needed for safety in our use-case +/// - It causes `DatabaseIter` function return issues as there is a `MutexGuard` object +/// +/// +/// +/// # Notes +/// This is used for other backends as well instead of branching with `cfg_if`. +/// The other backends (as of current) are `Send + Sync` so this is fine. +/// +macro_rules! get_tables { + ($env_inner:ident, $tx_ro:ident, $tables:ident) => {{ + $tables.get_or_try(|| { + #[allow(clippy::significant_drop_in_scrutinee)] + match $env_inner.open_tables($tx_ro) { + // SAFETY: see above macro doc comment. + Ok(tables) => Ok(unsafe { crate::unsafe_sendable::UnsafeSendable::new(tables) }), + Err(e) => Err(e), + } + }) + }}; +} + //---------------------------------------------------------------------------------------------------- Handler functions // These are the actual functions that do stuff according to the incoming [`Request`]. // @@ -228,57 +294,211 @@ fn map_request( // All functions below assume that this is the case, such that // `par_*()` functions will not block the _global_ rayon thread-pool. +// TODO: implement multi-transaction read atomicity. +// . + /// [`ReadRequest::BlockExtendedHeader`]. #[inline] -fn block_extended_header(env: &Arc, block_height: BlockHeight) -> ResponseResult { - todo!() +fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + + Ok(Response::BlockExtendedHeader( + get_block_extended_header_from_height(&block_height, &tables)?, + )) } /// [`ReadRequest::BlockHash`]. #[inline] -fn block_hash(env: &Arc, block_height: BlockHeight) -> ResponseResult { - todo!() +fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; + + Ok(Response::BlockHash( + get_block_info(&block_height, &table_block_infos)?.block_hash, + )) } /// [`ReadRequest::BlockExtendedHeaderInRange`]. #[inline] fn block_extended_header_in_range( - env: &Arc, + env: &ConcreteEnv, range: std::ops::Range, ) -> ResponseResult { - todo!() + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + // Collect results using `rayon`. + let vec = range + .into_par_iter() + .map(|block_height| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + get_block_extended_header_from_height(&block_height, tables) + }) + .collect::, RuntimeError>>()?; + + Ok(Response::BlockExtendedHeaderInRange(vec)) } /// [`ReadRequest::ChainHeight`]. #[inline] -fn chain_height(env: &Arc) -> ResponseResult { - todo!() +fn chain_height(env: &ConcreteEnv) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. 
+ let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; + let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; + + let chain_height = crate::ops::blockchain::chain_height(&table_block_heights)?; + let block_hash = + get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash; + + Ok(Response::ChainHeight(chain_height, block_hash)) } /// [`ReadRequest::GeneratedCoins`]. #[inline] -fn generated_coins(env: &Arc) -> ResponseResult { - todo!() +fn generated_coins(env: &ConcreteEnv) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; + let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; + + let top_height = top_block_height(&table_block_heights)?; + + Ok(Response::GeneratedCoins(cumulative_generated_coins( + &top_height, + &table_block_infos, + )?)) } /// [`ReadRequest::Outputs`]. #[inline] -#[allow(clippy::needless_pass_by_value)] // TODO: remove me -fn outputs(env: &Arc, map: HashMap>) -> ResponseResult { - todo!() +fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + // The 2nd mapping function. + // This is pulled out from the below `map()` for readability. + let inner_map = |amount, amount_index| -> Result<(AmountIndex, OutputOnChain), RuntimeError> { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + let id = PreRctOutputId { + amount, + amount_index, + }; + + let output_on_chain = id_to_output_on_chain(&id, tables)?; + + Ok((amount_index, output_on_chain)) + }; + + // Collect results using `rayon`. + let map = outputs + .into_par_iter() + .map(|(amount, amount_index_set)| { + Ok(( + amount, + amount_index_set + .into_par_iter() + .map(|amount_index| inner_map(amount, amount_index)) + .collect::, RuntimeError>>()?, + )) + }) + .collect::>, RuntimeError>>()?; + + Ok(Response::Outputs(map)) } /// [`ReadRequest::NumberOutputsWithAmount`]. -/// TODO #[inline] -#[allow(clippy::needless_pass_by_value)] // TODO: remove me -fn number_outputs_with_amount(env: &Arc, vec: Vec) -> ResponseResult { - todo!() +fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + // Cache the amount of RCT outputs once. + // INVARIANT: #[cfg] @ lib.rs asserts `usize == u64` + #[allow(clippy::cast_possible_truncation)] + let num_rct_outputs = { + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + tables.rct_outputs().len()? as usize + }; + + // Collect results using `rayon`. + let map = amounts + .into_par_iter() + .map(|amount| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + if amount == 0 { + // v2 transactions. + Ok((amount, num_rct_outputs)) + } else { + // v1 transactions. + match tables.num_outputs().get(&amount) { + // INVARIANT: #[cfg] @ lib.rs asserts `usize == u64` + #[allow(clippy::cast_possible_truncation)] + Ok(count) => Ok((amount, count as usize)), + // If we get a request for an `amount` that doesn't exist, + // we return `0` instead of an error. 
+ Err(RuntimeError::KeyNotFound) => Ok((amount, 0)), + Err(e) => Err(e), + } + } + }) + .collect::, RuntimeError>>()?; + + Ok(Response::NumberOutputsWithAmount(map)) } /// [`ReadRequest::CheckKIsNotSpent`]. #[inline] -#[allow(clippy::needless_pass_by_value)] // TODO: remove me -fn check_k_is_not_spent(env: &Arc, set: HashSet) -> ResponseResult { - todo!() +fn check_k_is_not_spent(env: &ConcreteEnv, key_images: HashSet) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + // Key image check function. + let key_image_exists = |key_image| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + key_image_exists(&key_image, tables.key_images()) + }; + + // TODO: + // Create/use `enum cuprate_types::Exist { Does, DoesNot }` + // or similar instead of `bool` for clarity. + // + // + // Collect results using `rayon`. + match key_images + .into_par_iter() + .map(key_image_exists) + // If the result is either: + // `Ok(true)` => a key image was found, return early + // `Err` => an error was found, return early + // + // Else, `Ok(false)` will continue the iterator. + .find_any(|result| !matches!(result, Ok(false))) + { + None | Some(Ok(false)) => Ok(Response::CheckKIsNotSpent(true)), // Key image was NOT found. + Some(Ok(true)) => Ok(Response::CheckKIsNotSpent(false)), // Key image was found. + Some(Err(e)) => Err(e), // A database error occurred. + } } diff --git a/database/src/service/tests.rs b/database/src/service/tests.rs index 03c03db..5f09b19 100644 --- a/database/src/service/tests.rs +++ b/database/src/service/tests.rs @@ -7,76 +7,379 @@ // This is only imported on `#[cfg(test)]` in `mod.rs`. -#![allow(unused_mut, clippy::significant_drop_tightening)] +#![allow( + clippy::significant_drop_tightening, + clippy::await_holding_lock, + clippy::too_many_lines +)] //---------------------------------------------------------------------------------------------------- Use +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; + +use pretty_assertions::assert_eq; use tower::{Service, ServiceExt}; -use cuprate_types::service::{ReadRequest, Response, WriteRequest}; +use cuprate_test_utils::data::{block_v16_tx0, block_v1_tx2, block_v9_tx3}; +use cuprate_types::{ + service::{ReadRequest, Response, WriteRequest}, + ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation, +}; use crate::{ config::Config, + ops::{ + block::{get_block_extended_header_from_height, get_block_info}, + blockchain::{chain_height, top_block_height}, + output::{get_output, id_to_output_on_chain, output_to_output_on_chain}, + }, service::{init, DatabaseReadHandle, DatabaseWriteHandle}, + tables::{KeyImages, Tables, TablesIter}, + tests::AssertTableLen, + types::{Amount, AmountIndex, KeyImage, PreRctOutputId}, + ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError, }; -//---------------------------------------------------------------------------------------------------- Tests +//---------------------------------------------------------------------------------------------------- Helper functions /// Initialize the `service`. 
-fn init_service() -> (DatabaseReadHandle, DatabaseWriteHandle, tempfile::TempDir) {
+fn init_service() -> (
+    DatabaseReadHandle,
+    DatabaseWriteHandle,
+    Arc,
+    tempfile::TempDir,
+) {
     let tempdir = tempfile::tempdir().unwrap();
     let config = Config::low_power(Some(tempdir.path().into()));
     let (reader, writer) = init(config).unwrap();
-    (reader, writer, tempdir)
+    let env = reader.env().clone();
+    (reader, writer, env, tempdir)
 }
 
+/// This is the template used in the actual test functions below.
+///
+/// - Send write request(s)
+/// - Receive response(s)
+/// - Assert proper tables were mutated
+/// - Assert read requests lead to expected responses
+#[allow(clippy::future_not_send)] // INVARIANT: tests are using a single threaded runtime
+async fn test_template(
+    // Which block(s) to add?
+    block_fns: &[fn() -> &'static VerifiedBlockInformation],
+    // Total amount of generated coins after the block(s) have been added.
+    cumulative_generated_coins: u64,
+    // What should the table lengths be after the block(s) have been added?
+    assert_table_len: AssertTableLen,
+) {
+    //----------------------------------------------------------------------- Write requests
+    let (reader, mut writer, env, _tempdir) = init_service();
+
+    let env_inner = env.env_inner();
+    let tx_ro = env_inner.tx_ro().unwrap();
+    let tables = env_inner.open_tables(&tx_ro).unwrap();
+
+    // HACK: `add_block()` asserts blocks with non-sequential heights
+    // cannot be added; to get around this, manually edit the block height.
+    for (i, block_fn) in block_fns.iter().enumerate() {
+        let mut block = block_fn().clone();
+        block.height = i as u64;
+
+        // Request a block to be written, assert it was written.
+        let request = WriteRequest::WriteBlock(block);
+        let response_channel = writer.call(request);
+        let response = response_channel.await.unwrap();
+        assert_eq!(response, Response::WriteBlockOk);
+    }
+
+    //----------------------------------------------------------------------- Reset the transaction
+    drop(tables);
+    drop(tx_ro);
+    let tx_ro = env_inner.tx_ro().unwrap();
+    let tables = env_inner.open_tables(&tx_ro).unwrap();
+
+    //----------------------------------------------------------------------- Assert all table lengths are correct
+    assert_table_len.assert(&tables);
+
+    //----------------------------------------------------------------------- Read request prep
+    // Next few lines are just for preparing the expected responses,
+    // see further below for usage.
+
+    let extended_block_header_0 = Ok(Response::BlockExtendedHeader(
+        get_block_extended_header_from_height(&0, &tables).unwrap(),
+    ));
+
+    let extended_block_header_1 = if block_fns.len() > 1 {
+        Ok(Response::BlockExtendedHeader(
+            get_block_extended_header_from_height(&1, &tables).unwrap(),
+        ))
+    } else {
+        Err(RuntimeError::KeyNotFound)
+    };
+
+    let block_hash_0 = Ok(Response::BlockHash(
+        get_block_info(&0, tables.block_infos()).unwrap().block_hash,
+    ));
+
+    let block_hash_1 = if block_fns.len() > 1 {
+        Ok(Response::BlockHash(
+            get_block_info(&1, tables.block_infos()).unwrap().block_hash,
+        ))
+    } else {
+        Err(RuntimeError::KeyNotFound)
+    };
+
+    let range_0_1 = Ok(Response::BlockExtendedHeaderInRange(vec![
+        get_block_extended_header_from_height(&0, &tables).unwrap(),
+    ]));
+
+    let range_0_2 = if block_fns.len() >= 2 {
+        Ok(Response::BlockExtendedHeaderInRange(vec![
+            get_block_extended_header_from_height(&0, &tables).unwrap(),
+            get_block_extended_header_from_height(&1, &tables).unwrap(),
+        ]))
+    } else {
+        Err(RuntimeError::KeyNotFound)
+    };
+
+    let chain_height = {
+        let height = chain_height(tables.block_heights()).unwrap();
+        let block_info = get_block_info(&height.saturating_sub(1), tables.block_infos()).unwrap();
+        Ok(Response::ChainHeight(height, block_info.block_hash))
+    };
+
+    let cumulative_generated_coins = Ok(Response::GeneratedCoins(cumulative_generated_coins));
+
+    let num_req = tables
+        .outputs_iter()
+        .keys()
+        .unwrap()
+        .map(Result::unwrap)
+        .map(|key| key.amount)
+        .collect::>();
+
+    let num_resp = Ok(Response::NumberOutputsWithAmount(
+        num_req
+            .iter()
+            .map(|amount| match tables.num_outputs().get(amount) {
+                // INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
+                #[allow(clippy::cast_possible_truncation)]
+                Ok(count) => (*amount, count as usize),
+                Err(RuntimeError::KeyNotFound) => (*amount, 0),
+                Err(e) => panic!("{e:?}"),
+            })
+            .collect::>(),
+    ));
+
+    // Contains a fake non-spent key-image.
+    let ki_req = HashSet::from([[0; 32]]);
+    let ki_resp = Ok(Response::CheckKIsNotSpent(true));
+
+    //----------------------------------------------------------------------- Assert expected response
+    // Assert read requests lead to the expected responses.
+    for (request, expected_response) in [
+        (ReadRequest::BlockExtendedHeader(0), extended_block_header_0),
+        (ReadRequest::BlockExtendedHeader(1), extended_block_header_1),
+        (ReadRequest::BlockHash(0), block_hash_0),
+        (ReadRequest::BlockHash(1), block_hash_1),
+        (ReadRequest::BlockExtendedHeaderInRange(0..1), range_0_1),
+        (ReadRequest::BlockExtendedHeaderInRange(0..2), range_0_2),
+        (ReadRequest::ChainHeight, chain_height),
+        (ReadRequest::GeneratedCoins, cumulative_generated_coins),
+        (ReadRequest::NumberOutputsWithAmount(num_req), num_resp),
+        (ReadRequest::CheckKIsNotSpent(ki_req), ki_resp),
+    ] {
+        let response = reader.clone().oneshot(request).await;
+        println!("response: {response:#?}, expected_response: {expected_response:#?}");
+        match response {
+            Ok(resp) => assert_eq!(resp, expected_response.unwrap()),
+            Err(_) => assert!(expected_response.is_err()),
+        }
+    }
+
+    //----------------------------------------------------------------------- Key image checks
+    // Assert each key image we inserted comes back as "spent".
+ for key_image in tables.key_images_iter().keys().unwrap() { + let key_image = key_image.unwrap(); + let request = ReadRequest::CheckKIsNotSpent(HashSet::from([key_image])); + let response = reader.clone().oneshot(request).await; + println!("response: {response:#?}, key_image: {key_image:#?}"); + assert_eq!(response.unwrap(), Response::CheckKIsNotSpent(false)); + } + + //----------------------------------------------------------------------- Output checks + // Create the map of amounts and amount indices. + // + // FIXME: There's definitely a better way to map + // `Vec` -> `HashMap>` + let (map, output_count) = { + let mut ids = tables + .outputs_iter() + .keys() + .unwrap() + .map(Result::unwrap) + .collect::>(); + + ids.extend( + tables + .rct_outputs_iter() + .keys() + .unwrap() + .map(Result::unwrap) + .map(|amount_index| PreRctOutputId { + amount: 0, + amount_index, + }), + ); + + // Used later to compare the amount of Outputs + // returned in the Response is equal to the amount + // we asked for. + let output_count = ids.len(); + + let mut map = HashMap::>::new(); + for id in ids { + map.entry(id.amount) + .and_modify(|set| { + set.insert(id.amount_index); + }) + .or_insert_with(|| HashSet::from([id.amount_index])); + } + + (map, output_count) + }; + + // Map `Output` -> `OutputOnChain` + // This is the expected output from the `Response`. + let outputs_on_chain = map + .iter() + .flat_map(|(amount, amount_index_set)| { + amount_index_set.iter().map(|amount_index| { + let id = PreRctOutputId { + amount: *amount, + amount_index: *amount_index, + }; + id_to_output_on_chain(&id, &tables).unwrap() + }) + }) + .collect::>(); + + // Send a request for every output we inserted before. + let request = ReadRequest::Outputs(map.clone()); + let response = reader.clone().oneshot(request).await; + println!("Response::Outputs response: {response:#?}"); + let Ok(Response::Outputs(response)) = response else { + panic!("{response:#?}") + }; + + // Assert amount of `Amount`'s are the same. + assert_eq!(map.len(), response.len()); + + // Assert we get back the same map of + // `Amount`'s and `AmountIndex`'s. + let mut response_output_count = 0; + for (amount, output_map) in response { + let amount_index_set = map.get(&amount).unwrap(); + + for (amount_index, output) in output_map { + response_output_count += 1; + assert!(amount_index_set.contains(&amount_index)); + assert!(outputs_on_chain.contains(&output)); + } + } + + // Assert the amount of `Output`'s returned is as expected. + let table_output_len = tables.outputs().len().unwrap() + tables.rct_outputs().len().unwrap(); + assert_eq!(output_count as u64, table_output_len); + assert_eq!(output_count, response_output_count); +} + +//---------------------------------------------------------------------------------------------------- Tests /// Simply `init()` the service and then drop it. /// /// If this test fails, something is very wrong. #[test] fn init_drop() { - let (reader, writer, _tempdir) = init_service(); + let (reader, writer, env, _tempdir) = init_service(); } -// TODO: -// un-comment and fix these tests when all `{read,write}` -// service functions are implemented. +/// Assert write/read correctness of [`block_v1_tx2`]. 
+#[tokio::test] +async fn v1_tx2() { + test_template( + &[block_v1_tx2], + 14_535_350_982_449, + AssertTableLen { + block_infos: 1, + block_blobs: 1, + block_heights: 1, + key_images: 65, + num_outputs: 41, + pruned_tx_blobs: 0, + prunable_hashes: 0, + outputs: 111, + prunable_tx_blobs: 0, + rct_outputs: 0, + tx_blobs: 3, + tx_ids: 3, + tx_heights: 3, + tx_unlock_time: 1, + }, + ) + .await; +} -// /// Send a read request, and receive a response, -// /// asserting the response the expected value. -// #[tokio::test] -// async fn read_request() { -// let (reader, writer, _tempdir) = init_service(); +/// Assert write/read correctness of [`block_v9_tx3`]. +#[tokio::test] +async fn v9_tx3() { + test_template( + &[block_v9_tx3], + 3_403_774_022_163, + AssertTableLen { + block_infos: 1, + block_blobs: 1, + block_heights: 1, + key_images: 4, + num_outputs: 0, + pruned_tx_blobs: 0, + prunable_hashes: 0, + outputs: 0, + prunable_tx_blobs: 0, + rct_outputs: 7, + tx_blobs: 4, + tx_ids: 4, + tx_heights: 4, + tx_unlock_time: 1, + }, + ) + .await; +} -// for (request, expected_response) in [ -// (ReadRequest::Example1, Response::Example1), -// (ReadRequest::Example2(123), Response::Example2(123)), -// ( -// ReadRequest::Example3("hello".into()), -// Response::Example3("hello".into()), -// ), -// ] { -// // This calls `poll_ready()` asserting we have a permit before `call()`. -// let response_channel = reader.clone().oneshot(request); -// let response = response_channel.await.unwrap(); -// assert_eq!(response, expected_response); -// } -// } - -// /// Send a write request, and receive a response, -// /// asserting the response the expected value. -// #[tokio::test] -// async fn write_request() { -// let (reader, mut writer, _tempdir) = init_service(); - -// for (request, expected_response) in [ -// (WriteRequest::Example1, Response::Example1), -// (WriteRequest::Example2(123), Response::Example2(123)), -// ( -// WriteRequest::Example3("hello".into()), -// Response::Example3("hello".into()), -// ), -// ] { -// let response_channel = writer.call(request); -// let response = response_channel.await.unwrap(); -// assert_eq!(response, expected_response); -// } -// } +/// Assert write/read correctness of [`block_v16_tx0`]. +#[tokio::test] +async fn v16_tx0() { + test_template( + &[block_v16_tx0], + 600_000_000_000, + AssertTableLen { + block_infos: 1, + block_blobs: 1, + block_heights: 1, + key_images: 0, + num_outputs: 0, + pruned_tx_blobs: 0, + prunable_hashes: 0, + outputs: 0, + prunable_tx_blobs: 0, + rct_outputs: 1, + tx_blobs: 1, + tx_ids: 1, + tx_heights: 1, + tx_unlock_time: 1, + }, + ) + .await; +} diff --git a/database/src/service/write.rs b/database/src/service/write.rs index e05f9ad..3cd7e80 100644 --- a/database/src/service/write.rs +++ b/database/src/service/write.rs @@ -15,9 +15,12 @@ use cuprate_types::{ }; use crate::{ + constants::DATABASE_CORRUPT_MSG, + env::{Env, EnvInner}, error::RuntimeError, service::types::{ResponseReceiver, ResponseResult, ResponseSender}, - ConcreteEnv, Env, + transaction::TxRw, + ConcreteEnv, }; //---------------------------------------------------------------------------------------------------- Constants @@ -226,5 +229,26 @@ impl DatabaseWriter { /// [`WriteRequest::WriteBlock`]. 
#[inline] fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult { - todo!() + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let result = { + let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?; + crate::ops::block::add_block(block, &mut tables_mut) + }; + + match result { + Ok(()) => { + tx_rw.commit()?; + Ok(Response::WriteBlockOk) + } + Err(e) => { + // INVARIANT: ensure database atomicity by aborting + // the transaction on `add_block()` failures. + tx_rw + .abort() + .expect("could not maintain database atomicity by aborting write transaction"); + Err(e) + } + } } diff --git a/database/src/tables.rs b/database/src/tables.rs index 846d405..6ac71df 100644 --- a/database/src/tables.rs +++ b/database/src/tables.rs @@ -350,10 +350,10 @@ tables! { /// Maps an output's amount to the number of outputs with that amount. /// - /// For a new output the `AmountIndex` value from this - /// table will be its index in a list of duplicate outputs. + /// For example, if there are 5 outputs with `amount = 123` + /// then calling `get(123)` on this table will return 5. NumOutputs, - Amount => AmountIndex, + Amount => u64, /// TODO PrunedTxBlobs, diff --git a/database/src/tests.rs b/database/src/tests.rs index e7b7e1a..dbebbe9 100644 --- a/database/src/tests.rs +++ b/database/src/tests.rs @@ -16,12 +16,61 @@ use monero_serai::{ ringct::{RctPrunable, RctSignatures}, transaction::{Timelock, Transaction, TransactionPrefix}, }; +use pretty_assertions::assert_eq; use crate::{ config::Config, key::Key, storable::Storable, tables::Tables, transaction::TxRo, ConcreteEnv, - Env, EnvInner, + DatabaseRo, Env, EnvInner, }; +//---------------------------------------------------------------------------------------------------- Struct +/// Named struct to assert the length of all tables. +/// +/// This is a struct with fields instead of a function +/// so that callers can name arguments, otherwise the call-site +/// is a little confusing, i.e. `assert_table_len(0, 25, 1, 123)`. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct AssertTableLen { + pub(crate) block_infos: u64, + pub(crate) block_blobs: u64, + pub(crate) block_heights: u64, + pub(crate) key_images: u64, + pub(crate) num_outputs: u64, + pub(crate) pruned_tx_blobs: u64, + pub(crate) prunable_hashes: u64, + pub(crate) outputs: u64, + pub(crate) prunable_tx_blobs: u64, + pub(crate) rct_outputs: u64, + pub(crate) tx_blobs: u64, + pub(crate) tx_ids: u64, + pub(crate) tx_heights: u64, + pub(crate) tx_unlock_time: u64, +} + +impl AssertTableLen { + /// Assert the length of all tables. 
+    pub(crate) fn assert(self, tables: &impl Tables) {
+        let other = Self {
+            block_infos: tables.block_infos().len().unwrap(),
+            block_blobs: tables.block_blobs().len().unwrap(),
+            block_heights: tables.block_heights().len().unwrap(),
+            key_images: tables.key_images().len().unwrap(),
+            num_outputs: tables.num_outputs().len().unwrap(),
+            pruned_tx_blobs: tables.pruned_tx_blobs().len().unwrap(),
+            prunable_hashes: tables.prunable_hashes().len().unwrap(),
+            outputs: tables.outputs().len().unwrap(),
+            prunable_tx_blobs: tables.prunable_tx_blobs().len().unwrap(),
+            rct_outputs: tables.rct_outputs().len().unwrap(),
+            tx_blobs: tables.tx_blobs().len().unwrap(),
+            tx_ids: tables.tx_ids().len().unwrap(),
+            tx_heights: tables.tx_heights().len().unwrap(),
+            tx_unlock_time: tables.tx_unlock_time().len().unwrap(),
+        };
+
+        assert_eq!(self, other);
+    }
+}
+
 //---------------------------------------------------------------------------------------------------- fn
 /// Create an `Env` in a temporary directory.
 /// The directory is automatically removed after the `TempDir` is dropped.
diff --git a/database/src/unsafe_sendable.rs b/database/src/unsafe_sendable.rs
new file mode 100644
index 0000000..b11d82a
--- /dev/null
+++ b/database/src/unsafe_sendable.rs
@@ -0,0 +1,86 @@
+//! Wrapper type for partially-`unsafe` usage of `T: !Send`.
+
+//---------------------------------------------------------------------------------------------------- Import
+use std::{
+    borrow::Borrow,
+    ops::{Deref, DerefMut},
+};
+
+use bytemuck::TransparentWrapper;
+
+use crate::storable::StorableVec;
+
+//---------------------------------------------------------------------------------------------------- Aliases
+#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, TransparentWrapper)]
+#[repr(transparent)]
+/// A wrapper type that `unsafe`ly implements `Send` for any `T`.
+///
+/// This is a marker/wrapper type that allows wrapping
+/// any type `T` such that it implements `Send`.
+///
+/// This is to be used when `T` is `Send`, but only in certain
+/// situations not provable to the compiler, or is otherwise
+/// a pain to prove and/or less efficient.
+///
+/// It is up to the users of this type to ensure their
+/// usage of `UnsafeSendable` is actually safe.
+///
+/// Notably, `heed`'s table type uses this inside `service`.
pub(crate) struct UnsafeSendable(T);
+
+#[allow(clippy::non_send_fields_in_send_ty)]
+// SAFETY: Users ensure that their usage of this type is safe.
+unsafe impl Send for UnsafeSendable {}
+
+impl UnsafeSendable {
+    /// Create a new [`UnsafeSendable`].
+    ///
+    /// # Safety
+    /// By constructing this type, you must ensure the usage
+    /// of the resulting `Self` follows all the [`Send`] rules.
+    pub(crate) const unsafe fn new(t: T) -> Self {
+        Self(t)
+    }
+
+    /// Extract the inner `T`.
+ pub(crate) fn into_inner(self) -> T { + self.0 + } +} + +impl Borrow for UnsafeSendable { + fn borrow(&self) -> &T { + &self.0 + } +} + +impl AsRef for UnsafeSendable { + fn as_ref(&self) -> &T { + &self.0 + } +} + +impl AsMut for UnsafeSendable { + fn as_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +impl Deref for UnsafeSendable { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for UnsafeSendable { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +//---------------------------------------------------------------------------------------------------- Tests +#[cfg(test)] +mod test { + // use super::*; +} diff --git a/helper/Cargo.toml b/helper/Cargo.toml index e77fe4e..59e4e71 100644 --- a/helper/Cargo.toml +++ b/helper/Cargo.toml @@ -17,16 +17,17 @@ asynch = ["dep:futures", "dep:rayon"] constants = [] fs = ["dep:dirs"] num = [] -map = [] +map = ["dep:monero-serai"] time = ["dep:chrono", "std"] thread = ["std", "dep:target_os_lib"] [dependencies] -crossbeam = { workspace = true, optional = true } -chrono = { workspace = true, optional = true, features = ["std", "clock"] } -dirs = { workspace = true, optional = true } -futures = { workspace = true, optional = true, features = ["std"] } -rayon = { workspace = true, optional = true } +crossbeam = { workspace = true, optional = true } +chrono = { workspace = true, optional = true, features = ["std", "clock"] } +dirs = { workspace = true, optional = true } +futures = { workspace = true, optional = true, features = ["std"] } +monero-serai = { workspace = true, optional = true } +rayon = { workspace = true, optional = true } # This is kinda a stupid work around. # [thread] needs to activate one of these libs (windows|libc) diff --git a/helper/src/map.rs b/helper/src/map.rs index b17d634..96d9f61 100644 --- a/helper/src/map.rs +++ b/helper/src/map.rs @@ -5,6 +5,7 @@ //! `#[no_std]` compatible. //---------------------------------------------------------------------------------------------------- Use +use monero_serai::transaction::Timelock; //---------------------------------------------------------------------------------------------------- `(u64, u64) <-> u128` /// Split a [`u128`] value into 2 64-bit values. @@ -53,6 +54,54 @@ pub const fn combine_low_high_bits_to_u128(low_bits: u64, high_bits: u64) -> u12 res | (low_bits as u128) } +//---------------------------------------------------------------------------------------------------- Timelock +/// Map a [`u64`] to a [`Timelock`]. +/// +/// Height/time is not differentiated via type, but rather: +/// "height is any value less than 500_000_000 and timestamp is any value above" +/// so the `u64/usize` is stored without any tag. +/// +/// See [`timelock_to_u64`] for the inverse function. +/// +/// - +/// - +/// +/// ```rust +/// # use cuprate_helper::map::*; +/// # use monero_serai::transaction::*; +/// assert_eq!(u64_to_timelock(0), Timelock::None); +/// assert_eq!(u64_to_timelock(499_999_999), Timelock::Block(499_999_999)); +/// assert_eq!(u64_to_timelock(500_000_000), Timelock::Time(500_000_000)); +/// ``` +pub fn u64_to_timelock(u: u64) -> Timelock { + if u == 0 { + Timelock::None + } else if u < 500_000_000 { + Timelock::Block(usize::try_from(u).unwrap()) + } else { + Timelock::Time(u) + } +} + +/// Map [`Timelock`] to a [`u64`]. +/// +/// See [`u64_to_timelock`] for the inverse function and more documentation. 
+/// +/// ```rust +/// # use cuprate_helper::map::*; +/// # use monero_serai::transaction::*; +/// assert_eq!(timelock_to_u64(Timelock::None), 0); +/// assert_eq!(timelock_to_u64(Timelock::Block(499_999_999)), 499_999_999); +/// assert_eq!(timelock_to_u64(Timelock::Time(500_000_000)), 500_000_000); +/// ``` +pub fn timelock_to_u64(timelock: Timelock) -> u64 { + match timelock { + Timelock::None => 0, + Timelock::Block(u) => u64::try_from(u).unwrap(), + Timelock::Time(u) => u, + } +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test {}
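For reference, the per-thread read-transaction pattern that `read.rs` builds on above can be reduced to a minimal, self-contained sketch. `Env` and `TxRo` below are hypothetical stand-ins, not the real `cuprate-database` types, and the sketch omits the `UnsafeSendable` table wrapping documented in `read.rs`; it only shows how each rayon worker lazily opens and then reuses its own read transaction via the `thread_local` crate:

// A minimal sketch (not the real cuprate-database API) of the
// `thread_local()` + `get_or_try()` pattern used in `read.rs`,
// using the same `rayon` + `thread_local` crates this patch adds.
use rayon::prelude::*;
use thread_local::ThreadLocal;

// Hypothetical stand-ins for `ConcreteEnv` and a read transaction.
struct Env;
struct TxRo;

impl Env {
    /// Open a read-only transaction (stand-in for `EnvInner::tx_ro`).
    fn tx_ro(&self) -> Result<TxRo, String> {
        Ok(TxRo)
    }
}

impl TxRo {
    /// Stand-in for a table read such as `DatabaseRo::get`.
    fn get(&self, key: u64) -> u64 {
        key * 2
    }
}

/// Read many keys in parallel, creating at most 1 `TxRo` per thread.
fn read_many(env: &Env, keys: Vec<u64>) -> Result<Vec<u64>, String> {
    // Each rayon worker lazily opens its own transaction on first use
    // and reuses it for every item it processes afterwards - this is
    // what allows reads to scale without sharing 1 transaction.
    let tx_ro: ThreadLocal<TxRo> = ThreadLocal::new();

    keys.into_par_iter()
        .map(|key| {
            let tx = tx_ro.get_or_try(|| env.tx_ro())?;
            Ok(tx.get(key))
        })
        .collect()
}

fn main() {
    let env = Env;
    let values = read_many(&env, (0..1024).collect()).unwrap();
    assert_eq!(values[3], 6);
}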