database: impl service fn bodies (#113)

* write: impl write_block()

* ops: add `get_block_info()`

* read: impl block fn's

* read: fix signatures

* service: wrap `ConcreteEnv` in `RwLock` and doc why

* heed: use `read-txn-no-tls` for `Send` read transactions

* service: remove RwLock, impl some read functions

* read: impl `outputs()`

* read: flatten indentation, add `thread_local()`

* read: impl `number_outputs_with_amount()`

* tests: add `AssertTableLen`

* ops: replace all table len asserts with `AssertTableLen`

* service: initial tests

* service: finish most tests

* service: fix bad block data in test

* tables: fix incorrect doc

* service: add `ReadRequest::Outputs` test

* read: use macros for set/getting `ThreadLocal`'s based on backend

* small fixes

* fix review

* small fixes

* read: fix ThreadLocal macros for `redb`

* read: move `Output` mapping to `crate::free`

it's needed in tests too

* service: check output value correctness in tests

* helper: add timelock <-> u64 mapping functions

* free: use `u64_to_timelock()`

* read: rct outputs

* read: fix variable name

* read: use ThreadLocal for both backends

* heed: use Mutex for `HeedTableRo`'s read tx

* block: add miner_tx

* heed: remove Table bound

oops

* Revert "heed: use Mutex for `HeedTableRo`'s read tx"

This reverts commit 7e8aae016c55802070ccf7d152aa8966984d7186.

* add `UnsafeSendable`

* read: use `UnsafeSendable` for `heed`, branch on backend

* read: safety docs

* cargo.toml: re-add `read-txn-no-tls` for heed

* ops: fix tests, remove miner_tx

* fix tx_idx calculation, account for RCT outputs in tests

* read: docs, fix `get_tables!()` for both backends

* fix clippy

* database: `unsafe trait DatabaseRo`

* tx: use correct tx_id

* free: remove miner_tx comment

* free: remove `amount` input for rct outputs

* ops: split `add_tx` inputs

* read: use `UnsafeSendable` for all backends

* heed: update safety comment

* move output functions `free` -> `ops`

* read: fix `chain_height()` handling

* remove serde on `UnsafeSendable`

* de-dup docs on `trait DatabaseRo`, `get_tables!()`

* Update database/src/unsafe_sendable.rs

Co-authored-by: Boog900 <boog900@tutanota.com>

---------

Co-authored-by: Boog900 <boog900@tutanota.com>
This commit is contained in:
hinto-janai 2024-05-01 13:52:20 -04:00 committed by GitHub
parent 88f7d1f212
commit 9c27ba5791
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1115 additions and 215 deletions

2
Cargo.lock generated
View file

@ -555,6 +555,7 @@ dependencies = [
"serde",
"tempfile",
"thiserror",
"thread_local",
"tokio",
"tokio-util",
"tower",
@ -569,6 +570,7 @@ dependencies = [
"dirs",
"futures",
"libc",
"monero-serai",
"rayon",
"tokio",
"windows",

View file

@ -40,10 +40,11 @@ futures = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"], optional = true }
tokio-util = { workspace = true, features = ["full"], optional = true }
tower = { workspace = true, features = ["full"], optional = true }
thread_local = { workspace = true }
rayon = { workspace = true, optional = true }
# Optional features.
heed = { version = "0.20.0", optional = true }
heed = { version = "0.20.0", features = ["read-txn-no-tls"], optional = true }
redb = { version = "2.1.0", optional = true }
serde = { workspace = true, optional = true }

View file

@ -137,7 +137,8 @@ impl<T: Table> DatabaseIter<T> for HeedTableRo<'_, T> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRo Impl
impl<T: Table> DatabaseRo<T> for HeedTableRo<'_, T> {
// SAFETY: `HeedTableRo: !Send` as it holds a reference to `heed::RoTxn: Send + !Sync`.
unsafe impl<T: Table> DatabaseRo<T> for HeedTableRo<'_, T> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(&self.db, self.tx_ro, key)
@ -165,7 +166,9 @@ impl<T: Table> DatabaseRo<T> for HeedTableRo<'_, T> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRw Impl
impl<T: Table> DatabaseRo<T> for HeedTableRw<'_, '_, T> {
// SAFETY: The `Send` bound only applies to `HeedTableRo`.
// `HeedTableRw`'s write transaction is `!Send`.
unsafe impl<T: Table> DatabaseRo<T> for HeedTableRw<'_, '_, T> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(&self.db, &self.tx_rw.borrow(), key)

View file

@ -118,7 +118,8 @@ impl<T: Table + 'static> DatabaseIter<T> for RedbTableRo<T::Key, T::Value> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRo
impl<T: Table + 'static> DatabaseRo<T> for RedbTableRo<T::Key, T::Value> {
// SAFETY: Both `redb`'s transaction and table types are `Send + Sync`.
unsafe impl<T: Table + 'static> DatabaseRo<T> for RedbTableRo<T::Key, T::Value> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(self, key)
@ -146,7 +147,8 @@ impl<T: Table + 'static> DatabaseRo<T> for RedbTableRo<T::Key, T::Value> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRw
impl<T: Table + 'static> DatabaseRo<T> for RedbTableRw<'_, T::Key, T::Value> {
// SAFETY: Both `redb`'s transaction and table types are `Send + Sync`.
unsafe impl<T: Table + 'static> DatabaseRo<T> for RedbTableRw<'_, T::Key, T::Value> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(self, key)

View file

@ -82,7 +82,29 @@ pub trait DatabaseIter<T: Table> {
///
/// This is a read-only database table,
/// write operations are defined in [`DatabaseRw`].
pub trait DatabaseRo<T: Table> {
///
/// # Safety
/// The table type that implements this MUST be `Send`.
///
/// However if the table holds a reference to a transaction:
/// - only the transaction only has to be `Send`
/// - the table cannot implement `Send`
///
/// For example:
///
/// `heed`'s transactions are `Send` but `HeedTableRo` contains a `&`
/// to the transaction, as such, if `Send` were implemented on `HeedTableRo`
/// then 1 transaction could be used to open multiple tables, then sent to
/// other threads - this would be a soundness hole against `HeedTableRo`.
///
/// `&T` is only `Send` if `T: Sync`.
///
/// `heed::RoTxn: !Sync`, therefore our table
/// holding `&heed::RoTxn` must NOT be `Send`.
///
/// - <https://doc.rust-lang.org/std/marker/trait.Sync.html>
/// - <https://doc.rust-lang.org/nomicon/send-and-sync.html>
pub unsafe trait DatabaseRo<T: Table> {
/// Get the value corresponding to a key.
///
/// The returned value is _owned_.

View file

@ -12,7 +12,7 @@ use crate::{
tables::{
call_fn_on_all_tables_or_early_return, BlockBlobs, BlockHeights, BlockInfos, KeyImages,
NumOutputs, Outputs, PrunableHashes, PrunableTxBlobs, PrunedTxBlobs, RctOutputs, Tables,
TablesMut, TxHeights, TxIds, TxUnlockTime,
TablesIter, TablesMut, TxHeights, TxIds, TxUnlockTime,
},
transaction::{TxRo, TxRw},
};
@ -233,7 +233,7 @@ where
///
/// # Errors
/// TODO
fn open_tables(&self, tx_ro: &Ro) -> Result<impl Tables, RuntimeError> {
fn open_tables(&self, tx_ro: &Ro) -> Result<impl TablesIter, RuntimeError> {
call_fn_on_all_tables_or_early_return! {
Self::open_db_ro(self, tx_ro)
}

View file

@ -1,6 +1,4 @@
//! General free functions (related to the database).
//!
//! TODO.
//---------------------------------------------------------------------------------------------------- Import

View file

@ -191,6 +191,10 @@
// although it is sometimes nice.
clippy::must_use_candidate,
// FIXME: good lint but too many false positives
// with our `Env` + `RwLock` setup.
clippy::significant_drop_tightening,
// TODO: should be removed after all `todo!()`'s are gone.
clippy::diverging_sub_expression,
@ -238,7 +242,7 @@ pub use env::{Env, EnvInner};
mod error;
pub use error::{InitError, RuntimeError};
mod free;
pub(crate) mod free;
pub mod resize;
@ -269,3 +273,6 @@ pub mod service;
//---------------------------------------------------------------------------------------------------- Private
#[cfg(test)]
pub(crate) mod tests;
#[cfg(feature = "service")] // only needed in `service` for now
pub(crate) mod unsafe_sendable;

View file

@ -89,8 +89,14 @@ pub fn add_block(
}
//------------------------------------------------------ Transaction / Outputs / Key Images
for tx_verification_data in &block.txs {
add_tx(tx_verification_data, &chain_height, tables)?;
// Add the miner transaction first.
{
let tx = &block.block.miner_tx;
add_tx(tx, &tx.serialize(), &tx.hash(), &chain_height, tables)?;
}
for tx in &block.txs {
add_tx(&tx.tx, &tx.tx_blob, &tx.tx_hash, &chain_height, tables)?;
}
//------------------------------------------------------ Block Info
@ -161,6 +167,7 @@ pub fn pop_block(
let block = Block::read(&mut block_blob.as_slice())?;
//------------------------------------------------------ Transaction / Outputs / Key Images
remove_tx(&block.miner_tx.hash(), tables)?;
for tx_hash in &block.txs {
remove_tx(tx_hash, tables)?;
}
@ -226,6 +233,16 @@ pub fn get_block_extended_header_top(
}
//---------------------------------------------------------------------------------------------------- Misc
/// Retrieve a [`BlockInfo`] via its [`BlockHeight`].
#[doc = doc_error!()]
#[inline]
pub fn get_block_info(
block_height: &BlockHeight,
table_block_infos: &impl DatabaseRo<BlockInfos>,
) -> Result<BlockInfo, RuntimeError> {
table_block_infos.get(block_height)
}
/// Retrieve a [`BlockHeight`] via its [`BlockHash`].
#[doc = doc_error!()]
#[inline]
@ -262,7 +279,7 @@ mod test {
use super::*;
use crate::{
ops::tx::{get_tx, tx_exists},
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
@ -315,20 +332,23 @@ mod test {
let tables = env_inner.open_tables(&tx_ro).unwrap();
// Assert only the proper tables were added to.
assert_eq!(tables.block_infos().len().unwrap(), 3);
assert_eq!(tables.block_blobs().len().unwrap(), 3);
assert_eq!(tables.block_heights().len().unwrap(), 3);
assert_eq!(tables.key_images().len().unwrap(), 69);
assert_eq!(tables.num_outputs().len().unwrap(), 38);
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.outputs().len().unwrap(), 107);
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 6);
assert_eq!(tables.tx_blobs().len().unwrap(), 5);
assert_eq!(tables.tx_ids().len().unwrap(), 5);
assert_eq!(tables.tx_heights().len().unwrap(), 5);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 0);
AssertTableLen {
block_infos: 3,
block_blobs: 3,
block_heights: 3,
key_images: 69,
num_outputs: 41,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 111,
prunable_tx_blobs: 0,
rct_outputs: 8,
tx_blobs: 8,
tx_ids: 8,
tx_heights: 8,
tx_unlock_time: 3,
}
.assert(&tables);
// Check `cumulative` functions work.
assert_eq!(

View file

@ -101,7 +101,7 @@ mod test {
block::add_block,
tx::{get_tx, tx_exists},
},
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
@ -149,6 +149,24 @@ mod test {
}
// Assert reads are correct.
AssertTableLen {
block_infos: 3,
block_blobs: 3,
block_heights: 3,
key_images: 69,
num_outputs: 41,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 111,
prunable_tx_blobs: 0,
rct_outputs: 8,
tx_blobs: 8,
tx_ids: 8,
tx_heights: 8,
tx_unlock_time: 3,
}
.assert(&tables);
assert_eq!(blocks_len, chain_height(tables.block_heights()).unwrap());
assert_eq!(
blocks_len - 1,

View file

@ -64,7 +64,7 @@ mod test {
use super::*;
use crate::{
ops::tx::{get_tx, tx_exists},
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
@ -108,23 +108,11 @@ mod test {
let tables = env_inner.open_tables(&tx_ro).unwrap();
// Assert only the proper tables were added to.
assert_eq!(
tables.key_images().len().unwrap(),
u64::try_from(key_images.len()).unwrap()
);
assert_eq!(tables.block_infos().len().unwrap(), 0);
assert_eq!(tables.block_blobs().len().unwrap(), 0);
assert_eq!(tables.block_heights().len().unwrap(), 0);
assert_eq!(tables.num_outputs().len().unwrap(), 0);
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.outputs().len().unwrap(), 0);
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 0);
assert_eq!(tables.tx_blobs().len().unwrap(), 0);
assert_eq!(tables.tx_ids().len().unwrap(), 0);
assert_eq!(tables.tx_heights().len().unwrap(), 0);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 0);
AssertTableLen {
key_images: tables.key_images().len().unwrap(),
..Default::default()
}
.assert(&tables);
for key_image in &key_images {
println!("key_image_exists(): {}", hex::encode(key_image));

View file

@ -1,7 +1,12 @@
//! Outputs.
use cuprate_helper::map::u64_to_timelock;
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar};
//---------------------------------------------------------------------------------------------------- Import
use monero_serai::transaction::{Timelock, Transaction};
use monero_serai::{
transaction::{Timelock, Transaction},
H,
};
use cuprate_types::{OutputOnChain, VerifiedBlockInformation};
@ -17,8 +22,8 @@ use crate::{
},
transaction::{TxRo, TxRw},
types::{
Amount, AmountIndex, BlockHash, BlockHeight, BlockInfo, KeyImage, Output, PreRctOutputId,
RctOutput, TxHash,
Amount, AmountIndex, BlockHash, BlockHeight, BlockInfo, KeyImage, Output, OutputFlags,
PreRctOutputId, RctOutput, TxHash,
},
};
@ -153,13 +158,110 @@ pub fn get_rct_num_outputs(
table_rct_outputs.len()
}
//---------------------------------------------------------------------------------------------------- Mapping functions
/// Map an [`Output`] to a [`cuprate_types::OutputOnChain`].
#[doc = doc_error!()]
pub fn output_to_output_on_chain(
output: &Output,
amount: Amount,
table_tx_unlock_time: &impl DatabaseRo<TxUnlockTime>,
) -> Result<OutputOnChain, RuntimeError> {
// FIXME: implement lookup table for common values:
// <https://github.com/monero-project/monero/blob/c8214782fb2a769c57382a999eaf099691c836e7/src/ringct/rctOps.cpp#L322>
let commitment = ED25519_BASEPOINT_POINT + H() * Scalar::from(amount);
let time_lock = if output
.output_flags
.contains(OutputFlags::NON_ZERO_UNLOCK_TIME)
{
u64_to_timelock(table_tx_unlock_time.get(&output.tx_idx)?)
} else {
Timelock::None
};
let key = CompressedEdwardsY::from_slice(&output.key)
.map(|y| y.decompress())
.unwrap_or(None);
Ok(OutputOnChain {
height: u64::from(output.height),
time_lock,
key,
commitment,
})
}
/// Map an [`RctOutput`] to a [`cuprate_types::OutputOnChain`].
///
/// # Panics
/// This function will panic if `rct_output`'s `commitment` fails to decompress
/// into a valid [`EdwardsPoint`](curve25519_dalek::edwards::EdwardsPoint).
///
/// This should normally not happen as commitments that
/// are stored in the database should always be valid.
#[doc = doc_error!()]
pub fn rct_output_to_output_on_chain(
rct_output: &RctOutput,
table_tx_unlock_time: &impl DatabaseRo<TxUnlockTime>,
) -> Result<OutputOnChain, RuntimeError> {
// INVARIANT: Commitments stored are valid when stored by the database.
let commitment = CompressedEdwardsY::from_slice(&rct_output.commitment)
.unwrap()
.decompress()
.unwrap();
let time_lock = if rct_output
.output_flags
.contains(OutputFlags::NON_ZERO_UNLOCK_TIME)
{
u64_to_timelock(table_tx_unlock_time.get(&rct_output.tx_idx)?)
} else {
Timelock::None
};
let key = CompressedEdwardsY::from_slice(&rct_output.key)
.map(|y| y.decompress())
.unwrap_or(None);
Ok(OutputOnChain {
height: u64::from(rct_output.height),
time_lock,
key,
commitment,
})
}
/// Map an [`PreRctOutputId`] to an [`OutputOnChain`].
///
/// Note that this still support RCT outputs, in that case, [`PreRctOutputId::amount`] should be `0`.
#[doc = doc_error!()]
pub fn id_to_output_on_chain(
id: &PreRctOutputId,
tables: &impl Tables,
) -> Result<OutputOnChain, RuntimeError> {
// v2 transactions.
if id.amount == 0 {
let rct_output = get_rct_output(&id.amount_index, tables.rct_outputs())?;
let output_on_chain = rct_output_to_output_on_chain(&rct_output, tables.tx_unlock_time())?;
Ok(output_on_chain)
} else {
// v1 transactions.
let output = get_output(id, tables.outputs())?;
let output_on_chain =
output_to_output_on_chain(&output, id.amount, tables.tx_unlock_time())?;
Ok(output_on_chain)
}
}
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
#[allow(clippy::significant_drop_tightening, clippy::cognitive_complexity)]
mod test {
use super::*;
use crate::{
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
types::OutputFlags,
Env,
};
@ -221,20 +323,23 @@ mod test {
// Assert all reads of the outputs are OK.
{
// Assert proper tables were added to.
assert_eq!(tables.block_infos().len().unwrap(), 0);
assert_eq!(tables.block_blobs().len().unwrap(), 0);
assert_eq!(tables.block_heights().len().unwrap(), 0);
assert_eq!(tables.key_images().len().unwrap(), 0);
assert_eq!(tables.num_outputs().len().unwrap(), 1);
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.outputs().len().unwrap(), 1);
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 1);
assert_eq!(tables.tx_blobs().len().unwrap(), 0);
assert_eq!(tables.tx_ids().len().unwrap(), 0);
assert_eq!(tables.tx_heights().len().unwrap(), 0);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 0);
AssertTableLen {
block_infos: 0,
block_blobs: 0,
block_heights: 0,
key_images: 0,
num_outputs: 1,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 1,
prunable_tx_blobs: 0,
rct_outputs: 1,
tx_blobs: 0,
tx_ids: 0,
tx_heights: 0,
tx_unlock_time: 0,
}
.assert(&tables);
// Assert length is correct.
assert_eq!(get_num_outputs(tables.outputs()).unwrap(), 1);

View file

@ -36,7 +36,7 @@ use super::{
};
//---------------------------------------------------------------------------------------------------- Private
/// Add a [`TransactionVerificationData`] to the database.
/// Add a [`Transaction`] (and related data) to the database.
///
/// The `block_height` is the block that this `tx` belongs to.
///
@ -63,18 +63,20 @@ use super::{
#[doc = doc_error!()]
#[inline]
pub fn add_tx(
tx: &TransactionVerificationData,
tx: &Transaction,
tx_blob: &Vec<u8>,
tx_hash: &TxHash,
block_height: &BlockHeight,
tables: &mut impl TablesMut,
) -> Result<TxId, RuntimeError> {
let tx_id = get_num_tx(tables.tx_ids_mut())?;
//------------------------------------------------------ Transaction data
tables.tx_ids_mut().put(&tx.tx_hash, &tx_id)?;
tables.tx_ids_mut().put(tx_hash, &tx_id)?;
tables.tx_heights_mut().put(&tx_id, block_height)?;
tables
.tx_blobs_mut()
.put(&tx_id, StorableVec::wrap_ref(&tx.tx_blob))?;
.put(&tx_id, StorableVec::wrap_ref(tx_blob))?;
//------------------------------------------------------ Timelocks
// Height/time is not differentiated via type, but rather:
@ -82,7 +84,7 @@ pub fn add_tx(
// so the `u64/usize` is stored without any tag.
//
// <https://github.com/Cuprate/cuprate/pull/102#discussion_r1558504285>
match tx.tx.prefix.timelock {
match tx.prefix.timelock {
Timelock::None => (),
Timelock::Block(height) => tables.tx_unlock_time_mut().put(&tx_id, &(height as u64))?,
Timelock::Time(time) => tables.tx_unlock_time_mut().put(&tx_id, &time)?,
@ -95,8 +97,6 @@ pub fn add_tx(
// }
//------------------------------------------------------
// Refer to the inner transaction type from now on.
let tx: &Transaction = &tx.tx;
let Ok(height) = u32::try_from(*block_height) else {
panic!("add_tx(): block_height ({block_height}) > u32::MAX");
};
@ -128,7 +128,6 @@ pub fn add_tx(
};
let mut amount_indices = Vec::with_capacity(tx.prefix.outputs.len());
let tx_idx = get_num_tx(tables.tx_ids_mut())?;
for (i, output) in tx.prefix.outputs.iter().enumerate() {
let key = *output.key.as_bytes();
@ -151,7 +150,7 @@ pub fn add_tx(
key,
height,
output_flags,
tx_idx,
tx_idx: tx_id,
commitment,
},
tables.rct_outputs_mut(),
@ -164,7 +163,7 @@ pub fn add_tx(
key,
height,
output_flags,
tx_idx,
tx_idx: tx_id,
},
tables,
)?
@ -178,7 +177,7 @@ pub fn add_tx(
key,
height,
output_flags,
tx_idx,
tx_idx: tx_id,
commitment,
},
tables.rct_outputs_mut(),
@ -343,7 +342,7 @@ pub fn tx_exists(
mod test {
use super::*;
use crate::{
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
use cuprate_test_utils::data::{tx_v1_sig0, tx_v1_sig2, tx_v2_rct3};
@ -368,7 +367,7 @@ mod test {
.iter()
.map(|tx| {
println!("add_tx(): {tx:#?}");
add_tx(tx, &0, &mut tables).unwrap()
add_tx(&tx.tx, &tx.tx_blob, &tx.tx_hash, &0, &mut tables).unwrap()
})
.collect::<Vec<TxId>>();
@ -384,20 +383,23 @@ mod test {
let tables = env_inner.open_tables(&tx_ro).unwrap();
// Assert only the proper tables were added to.
assert_eq!(tables.block_infos().len().unwrap(), 0);
assert_eq!(tables.block_blobs().len().unwrap(), 0);
assert_eq!(tables.block_heights().len().unwrap(), 0);
assert_eq!(tables.key_images().len().unwrap(), 4); // added to key images
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.num_outputs().len().unwrap(), 9);
assert_eq!(tables.outputs().len().unwrap(), 10); // added to outputs
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 2);
assert_eq!(tables.tx_blobs().len().unwrap(), 3);
assert_eq!(tables.tx_ids().len().unwrap(), 3);
assert_eq!(tables.tx_heights().len().unwrap(), 3);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 1); // only 1 has a timelock
AssertTableLen {
block_infos: 0,
block_blobs: 0,
block_heights: 0,
key_images: 4, // added to key images
pruned_tx_blobs: 0,
prunable_hashes: 0,
num_outputs: 9,
outputs: 10, // added to outputs
prunable_tx_blobs: 0,
rct_outputs: 2,
tx_blobs: 3,
tx_ids: 3,
tx_heights: 3,
tx_unlock_time: 1, // only 1 has a timelock
}
.assert(&tables);
// Both from ID and hash should result in getting the same transaction.
let mut tx_hashes = vec![];

View file

@ -3,27 +3,46 @@
//---------------------------------------------------------------------------------------------------- Import
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
ops::Range,
sync::Arc,
sync::{Arc, RwLock},
task::{Context, Poll},
};
use cfg_if::cfg_if;
use crossbeam::channel::Receiver;
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar};
use futures::{channel::oneshot, ready};
use monero_serai::{transaction::Timelock, H};
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use thread_local::ThreadLocal;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_types::service::{ReadRequest, Response};
use cuprate_types::{
service::{ReadRequest, Response},
ExtendedBlockHeader, OutputOnChain,
};
use crate::{
config::ReaderThreads,
constants::DATABASE_CORRUPT_MSG,
error::RuntimeError,
ops::{
block::{get_block_extended_header_from_height, get_block_info},
blockchain::{cumulative_generated_coins, top_block_height},
key_image::key_image_exists,
output::{
get_output, get_rct_output, id_to_output_on_chain, output_to_output_on_chain,
rct_output_to_output_on_chain,
},
},
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
types::{Amount, AmountIndex, BlockHeight, KeyImage},
ConcreteEnv,
tables::{BlockHeights, BlockInfos, KeyImages, NumOutputs, Outputs, Tables},
types::{Amount, AmountIndex, BlockHeight, KeyImage, OutputFlags, PreRctOutputId},
unsafe_sendable::UnsafeSendable,
ConcreteEnv, DatabaseRo, Env, EnvInner,
};
//---------------------------------------------------------------------------------------------------- DatabaseReadHandle
@ -65,10 +84,10 @@ pub struct DatabaseReadHandle {
impl Clone for DatabaseReadHandle {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
pool: Arc::clone(&self.pool),
semaphore: self.semaphore.clone(),
permit: None,
env: self.env.clone(),
env: Arc::clone(&self.env),
}
}
}
@ -106,23 +125,21 @@ impl DatabaseReadHandle {
}
}
/// TODO
/// Access to the actual database environment.
///
/// # ⚠️ Warning
/// This function gives you access to the actual
/// underlying database connected to by `self`.
///
/// I.e. it allows you to read/write data _directly_
/// instead of going through a request.
///
/// Be warned that using the database directly
/// in this manner has not been tested.
#[inline]
pub const fn env(&self) -> &Arc<ConcreteEnv> {
&self.env
}
/// TODO
#[inline]
pub const fn semaphore(&self) -> &PollSemaphore {
&self.semaphore
}
/// TODO
#[inline]
pub const fn permit(&self) -> &Option<OwnedSemaphorePermit> {
&self.permit
}
}
impl tower::Service<ReadRequest> for DatabaseReadHandle {
@ -163,9 +180,11 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
//
// INVARIANT:
// The below `DatabaseReader` function impl block relies on this behavior.
let env = Arc::clone(self.env());
self.pool
.spawn(move || map_request(permit, env, request, response_sender));
let env = Arc::clone(&self.env);
self.pool.spawn(move || {
let _permit: OwnedSemaphorePermit = permit;
map_request(&env, request, response_sender);
}); // drop(permit/env);
InfallibleOneshotReceiver::from(receiver)
}
@ -175,7 +194,6 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
// This function maps [`Request`]s to function calls
// executed by the rayon DB reader threadpool.
#[allow(clippy::needless_pass_by_value)] // TODO: fix me
/// Map [`Request`]'s to specific database handler functions.
///
/// This is the main entrance into all `Request` handler functions.
@ -184,23 +202,23 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
/// 2. Handler function is called
/// 3. [`Response`] is sent
fn map_request(
_permit: OwnedSemaphorePermit, // Permit for this request, dropped at end of function
env: Arc<ConcreteEnv>, // Access to the database
env: &ConcreteEnv, // Access to the database
request: ReadRequest, // The request we must fulfill
response_sender: ResponseSender, // The channel we must send the response back to
) {
/* TODO: pre-request handling, run some code for each request? */
use ReadRequest as R;
/* TODO: pre-request handling, run some code for each request? */
let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(&env, block),
R::BlockHash(block) => block_hash(&env, block),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range),
R::ChainHeight => chain_height(&env),
R::GeneratedCoins => generated_coins(&env),
R::Outputs(map) => outputs(&env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec),
R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set),
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block) => block_hash(env, block),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(env, range),
R::ChainHeight => chain_height(env),
R::GeneratedCoins => generated_coins(env),
R::Outputs(map) => outputs(env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
R::CheckKIsNotSpent(set) => check_k_is_not_spent(env, set),
};
if let Err(e) = response_sender.send(response) {
@ -211,6 +229,54 @@ fn map_request(
/* TODO: post-request handling, run some code for each request? */
}
//---------------------------------------------------------------------------------------------------- Thread Local
/// Q: Why does this exist?
///
/// A1: `heed`'s transactions and tables are not `Sync`, so we cannot use
/// them with rayon, however, we set a feature such that they are `Send`.
///
/// A2: When sending to rayon, we want to ensure each read transaction
/// is only being used by 1 thread only to scale reads
///
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576762346>
#[inline]
fn thread_local<T: Send>(env: &impl Env) -> ThreadLocal<T> {
ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get())
}
/// Take in a `ThreadLocal<impl Tables>` and return an `&impl Tables + Send`.
///
/// # Safety
/// See [`DatabaseRo`] docs.
///
/// We are safely using `UnsafeSendable` in `service`'s reader thread-pool
/// as we are pairing our usage with `ThreadLocal` - only 1 thread
/// will ever access a transaction at a time. This is an INVARIANT.
///
/// A `Mutex` was considered but:
/// - It is less performant
/// - It isn't technically needed for safety in our use-case
/// - It causes `DatabaseIter` function return issues as there is a `MutexGuard` object
///
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581684698>
///
/// # Notes
/// This is used for other backends as well instead of branching with `cfg_if`.
/// The other backends (as of current) are `Send + Sync` so this is fine.
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1585618374>
macro_rules! get_tables {
($env_inner:ident, $tx_ro:ident, $tables:ident) => {{
$tables.get_or_try(|| {
#[allow(clippy::significant_drop_in_scrutinee)]
match $env_inner.open_tables($tx_ro) {
// SAFETY: see above macro doc comment.
Ok(tables) => Ok(unsafe { crate::unsafe_sendable::UnsafeSendable::new(tables) }),
Err(e) => Err(e),
}
})
}};
}
//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`Request`].
//
@ -228,57 +294,211 @@ fn map_request(
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.
// TODO: implement multi-transaction read atomicity.
// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>.
/// [`ReadRequest::BlockExtendedHeader`].
#[inline]
fn block_extended_header(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
todo!()
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
Ok(Response::BlockExtendedHeader(
get_block_extended_header_from_height(&block_height, &tables)?,
))
}
/// [`ReadRequest::BlockHash`].
#[inline]
fn block_hash(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
todo!()
fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
Ok(Response::BlockHash(
get_block_info(&block_height, &table_block_infos)?.block_hash,
))
}
/// [`ReadRequest::BlockExtendedHeaderInRange`].
#[inline]
fn block_extended_header_in_range(
env: &Arc<ConcreteEnv>,
env: &ConcreteEnv,
range: std::ops::Range<BlockHeight>,
) -> ResponseResult {
todo!()
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Collect results using `rayon`.
let vec = range
.into_par_iter()
.map(|block_height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_block_extended_header_from_height(&block_height, tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?;
Ok(Response::BlockExtendedHeaderInRange(vec))
}
/// [`ReadRequest::ChainHeight`].
#[inline]
fn chain_height(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
fn chain_height(env: &ConcreteEnv) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let chain_height = crate::ops::blockchain::chain_height(&table_block_heights)?;
let block_hash =
get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash;
Ok(Response::ChainHeight(chain_height, block_hash))
}
/// [`ReadRequest::GeneratedCoins`].
#[inline]
fn generated_coins(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
fn generated_coins(env: &ConcreteEnv) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_height = top_block_height(&table_block_heights)?;
Ok(Response::GeneratedCoins(cumulative_generated_coins(
&top_height,
&table_block_infos,
)?))
}
/// [`ReadRequest::Outputs`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn outputs(env: &Arc<ConcreteEnv>, map: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
todo!()
fn outputs(env: &ConcreteEnv, outputs: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// The 2nd mapping function.
// This is pulled out from the below `map()` for readability.
let inner_map = |amount, amount_index| -> Result<(AmountIndex, OutputOnChain), RuntimeError> {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let id = PreRctOutputId {
amount,
amount_index,
};
let output_on_chain = id_to_output_on_chain(&id, tables)?;
Ok((amount_index, output_on_chain))
};
// Collect results using `rayon`.
let map = outputs
.into_par_iter()
.map(|(amount, amount_index_set)| {
Ok((
amount,
amount_index_set
.into_par_iter()
.map(|amount_index| inner_map(amount, amount_index))
.collect::<Result<HashMap<AmountIndex, OutputOnChain>, RuntimeError>>()?,
))
})
.collect::<Result<HashMap<Amount, HashMap<AmountIndex, OutputOnChain>>, RuntimeError>>()?;
Ok(Response::Outputs(map))
}
/// [`ReadRequest::NumberOutputsWithAmount`].
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn number_outputs_with_amount(env: &Arc<ConcreteEnv>, vec: Vec<Amount>) -> ResponseResult {
todo!()
fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Cache the amount of RCT outputs once.
// INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
#[allow(clippy::cast_possible_truncation)]
let num_rct_outputs = {
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
tables.rct_outputs().len()? as usize
};
// Collect results using `rayon`.
let map = amounts
.into_par_iter()
.map(|amount| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
if amount == 0 {
// v2 transactions.
Ok((amount, num_rct_outputs))
} else {
// v1 transactions.
match tables.num_outputs().get(&amount) {
// INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
#[allow(clippy::cast_possible_truncation)]
Ok(count) => Ok((amount, count as usize)),
// If we get a request for an `amount` that doesn't exist,
// we return `0` instead of an error.
Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
Err(e) => Err(e),
}
}
})
.collect::<Result<HashMap<Amount, usize>, RuntimeError>>()?;
Ok(Response::NumberOutputsWithAmount(map))
}
/// [`ReadRequest::CheckKIsNotSpent`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn check_k_is_not_spent(env: &Arc<ConcreteEnv>, set: HashSet<KeyImage>) -> ResponseResult {
todo!()
fn check_k_is_not_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Key image check function.
let key_image_exists = |key_image| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
key_image_exists(&key_image, tables.key_images())
};
// TODO:
// Create/use `enum cuprate_types::Exist { Does, DoesNot }`
// or similar instead of `bool` for clarity.
// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581536526>
//
// Collect results using `rayon`.
match key_images
.into_par_iter()
.map(key_image_exists)
// If the result is either:
// `Ok(true)` => a key image was found, return early
// `Err` => an error was found, return early
//
// Else, `Ok(false)` will continue the iterator.
.find_any(|result| !matches!(result, Ok(false)))
{
None | Some(Ok(false)) => Ok(Response::CheckKIsNotSpent(true)), // Key image was NOT found.
Some(Ok(true)) => Ok(Response::CheckKIsNotSpent(false)), // Key image was found.
Some(Err(e)) => Err(e), // A database error occurred.
}
}

View file

@ -7,76 +7,379 @@
// This is only imported on `#[cfg(test)]` in `mod.rs`.
#![allow(unused_mut, clippy::significant_drop_tightening)]
#![allow(
clippy::significant_drop_tightening,
clippy::await_holding_lock,
clippy::too_many_lines
)]
//---------------------------------------------------------------------------------------------------- Use
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use pretty_assertions::assert_eq;
use tower::{Service, ServiceExt};
use cuprate_types::service::{ReadRequest, Response, WriteRequest};
use cuprate_test_utils::data::{block_v16_tx0, block_v1_tx2, block_v9_tx3};
use cuprate_types::{
service::{ReadRequest, Response, WriteRequest},
ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation,
};
use crate::{
config::Config,
ops::{
block::{get_block_extended_header_from_height, get_block_info},
blockchain::{chain_height, top_block_height},
output::{get_output, id_to_output_on_chain, output_to_output_on_chain},
},
service::{init, DatabaseReadHandle, DatabaseWriteHandle},
tables::{KeyImages, Tables, TablesIter},
tests::AssertTableLen,
types::{Amount, AmountIndex, KeyImage, PreRctOutputId},
ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError,
};
//---------------------------------------------------------------------------------------------------- Tests
//---------------------------------------------------------------------------------------------------- Helper functions
/// Initialize the `service`.
fn init_service() -> (DatabaseReadHandle, DatabaseWriteHandle, tempfile::TempDir) {
fn init_service() -> (
DatabaseReadHandle,
DatabaseWriteHandle,
Arc<ConcreteEnv>,
tempfile::TempDir,
) {
let tempdir = tempfile::tempdir().unwrap();
let config = Config::low_power(Some(tempdir.path().into()));
let (reader, writer) = init(config).unwrap();
(reader, writer, tempdir)
let env = reader.env().clone();
(reader, writer, env, tempdir)
}
/// This is the template used in the actual test functions below.
///
/// - Send write request(s)
/// - Receive response(s)
/// - Assert proper tables were mutated
/// - Assert read requests lead to expected responses
#[allow(clippy::future_not_send)] // INVARIANT: tests are using a single threaded runtime
async fn test_template(
// Which block(s) to add?
block_fns: &[fn() -> &'static VerifiedBlockInformation],
// Total amount of generated coins after the block(s) have been added.
cumulative_generated_coins: u64,
// What are the table lengths be after the block(s) have been added?
assert_table_len: AssertTableLen,
) {
//----------------------------------------------------------------------- Write requests
let (reader, mut writer, env, _tempdir) = init_service();
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro().unwrap();
let tables = env_inner.open_tables(&tx_ro).unwrap();
// HACK: `add_block()` asserts blocks with non-sequential heights
// cannot be added, to get around this, manually edit the block height.
for (i, block_fn) in block_fns.iter().enumerate() {
let mut block = block_fn().clone();
block.height = i as u64;
// Request a block to be written, assert it was written.
let request = WriteRequest::WriteBlock(block);
let response_channel = writer.call(request);
let response = response_channel.await.unwrap();
assert_eq!(response, Response::WriteBlockOk);
}
//----------------------------------------------------------------------- Reset the transaction
drop(tables);
drop(tx_ro);
let tx_ro = env_inner.tx_ro().unwrap();
let tables = env_inner.open_tables(&tx_ro).unwrap();
//----------------------------------------------------------------------- Assert all table lengths are correct
assert_table_len.assert(&tables);
//----------------------------------------------------------------------- Read request prep
// Next few lines are just for preparing the expected responses,
// see further below for usage.
let extended_block_header_0 = Ok(Response::BlockExtendedHeader(
get_block_extended_header_from_height(&0, &tables).unwrap(),
));
let extended_block_header_1 = if block_fns.len() > 1 {
Ok(Response::BlockExtendedHeader(
get_block_extended_header_from_height(&1, &tables).unwrap(),
))
} else {
Err(RuntimeError::KeyNotFound)
};
let block_hash_0 = Ok(Response::BlockHash(
get_block_info(&0, tables.block_infos()).unwrap().block_hash,
));
let block_hash_1 = if block_fns.len() > 1 {
Ok(Response::BlockHash(
get_block_info(&1, tables.block_infos()).unwrap().block_hash,
))
} else {
Err(RuntimeError::KeyNotFound)
};
let range_0_1 = Ok(Response::BlockExtendedHeaderInRange(vec![
get_block_extended_header_from_height(&0, &tables).unwrap(),
]));
let range_0_2 = if block_fns.len() >= 2 {
Ok(Response::BlockExtendedHeaderInRange(vec![
get_block_extended_header_from_height(&0, &tables).unwrap(),
get_block_extended_header_from_height(&1, &tables).unwrap(),
]))
} else {
Err(RuntimeError::KeyNotFound)
};
let chain_height = {
let height = chain_height(tables.block_heights()).unwrap();
let block_info = get_block_info(&height.saturating_sub(1), tables.block_infos()).unwrap();
Ok(Response::ChainHeight(height, block_info.block_hash))
};
let cumulative_generated_coins = Ok(Response::GeneratedCoins(cumulative_generated_coins));
let num_req = tables
.outputs_iter()
.keys()
.unwrap()
.map(Result::unwrap)
.map(|key| key.amount)
.collect::<Vec<Amount>>();
let num_resp = Ok(Response::NumberOutputsWithAmount(
num_req
.iter()
.map(|amount| match tables.num_outputs().get(amount) {
// INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
#[allow(clippy::cast_possible_truncation)]
Ok(count) => (*amount, count as usize),
Err(RuntimeError::KeyNotFound) => (*amount, 0),
Err(e) => panic!(),
})
.collect::<HashMap<Amount, usize>>(),
));
// Contains a fake non-spent key-image.
let ki_req = HashSet::from([[0; 32]]);
let ki_resp = Ok(Response::CheckKIsNotSpent(true));
//----------------------------------------------------------------------- Assert expected response
// Assert read requests lead to the expected responses.
for (request, expected_response) in [
(ReadRequest::BlockExtendedHeader(0), extended_block_header_0),
(ReadRequest::BlockExtendedHeader(1), extended_block_header_1),
(ReadRequest::BlockHash(0), block_hash_0),
(ReadRequest::BlockHash(1), block_hash_1),
(ReadRequest::BlockExtendedHeaderInRange(0..1), range_0_1),
(ReadRequest::BlockExtendedHeaderInRange(0..2), range_0_2),
(ReadRequest::ChainHeight, chain_height),
(ReadRequest::GeneratedCoins, cumulative_generated_coins),
(ReadRequest::NumberOutputsWithAmount(num_req), num_resp),
(ReadRequest::CheckKIsNotSpent(ki_req), ki_resp),
] {
let response = reader.clone().oneshot(request).await;
println!("response: {response:#?}, expected_response: {expected_response:#?}");
match response {
Ok(resp) => assert_eq!(resp, expected_response.unwrap()),
Err(ref e) => assert!(matches!(response, expected_response)),
}
}
//----------------------------------------------------------------------- Key image checks
// Assert each key image we inserted comes back as "spent".
for key_image in tables.key_images_iter().keys().unwrap() {
let key_image = key_image.unwrap();
let request = ReadRequest::CheckKIsNotSpent(HashSet::from([key_image]));
let response = reader.clone().oneshot(request).await;
println!("response: {response:#?}, key_image: {key_image:#?}");
assert_eq!(response.unwrap(), Response::CheckKIsNotSpent(false));
}
//----------------------------------------------------------------------- Output checks
// Create the map of amounts and amount indices.
//
// FIXME: There's definitely a better way to map
// `Vec<PreRctOutputId>` -> `HashMap<u64, HashSet<u64>>`
let (map, output_count) = {
let mut ids = tables
.outputs_iter()
.keys()
.unwrap()
.map(Result::unwrap)
.collect::<Vec<PreRctOutputId>>();
ids.extend(
tables
.rct_outputs_iter()
.keys()
.unwrap()
.map(Result::unwrap)
.map(|amount_index| PreRctOutputId {
amount: 0,
amount_index,
}),
);
// Used later to compare the amount of Outputs
// returned in the Response is equal to the amount
// we asked for.
let output_count = ids.len();
let mut map = HashMap::<Amount, HashSet<AmountIndex>>::new();
for id in ids {
map.entry(id.amount)
.and_modify(|set| {
set.insert(id.amount_index);
})
.or_insert_with(|| HashSet::from([id.amount_index]));
}
(map, output_count)
};
// Map `Output` -> `OutputOnChain`
// This is the expected output from the `Response`.
let outputs_on_chain = map
.iter()
.flat_map(|(amount, amount_index_set)| {
amount_index_set.iter().map(|amount_index| {
let id = PreRctOutputId {
amount: *amount,
amount_index: *amount_index,
};
id_to_output_on_chain(&id, &tables).unwrap()
})
})
.collect::<Vec<OutputOnChain>>();
// Send a request for every output we inserted before.
let request = ReadRequest::Outputs(map.clone());
let response = reader.clone().oneshot(request).await;
println!("Response::Outputs response: {response:#?}");
let Ok(Response::Outputs(response)) = response else {
panic!("{response:#?}")
};
// Assert amount of `Amount`'s are the same.
assert_eq!(map.len(), response.len());
// Assert we get back the same map of
// `Amount`'s and `AmountIndex`'s.
let mut response_output_count = 0;
for (amount, output_map) in response {
let amount_index_set = map.get(&amount).unwrap();
for (amount_index, output) in output_map {
response_output_count += 1;
assert!(amount_index_set.contains(&amount_index));
assert!(outputs_on_chain.contains(&output));
}
}
// Assert the amount of `Output`'s returned is as expected.
let table_output_len = tables.outputs().len().unwrap() + tables.rct_outputs().len().unwrap();
assert_eq!(output_count as u64, table_output_len);
assert_eq!(output_count, response_output_count);
}
//---------------------------------------------------------------------------------------------------- Tests
/// Simply `init()` the service and then drop it.
///
/// If this test fails, something is very wrong.
#[test]
fn init_drop() {
let (reader, writer, _tempdir) = init_service();
let (reader, writer, env, _tempdir) = init_service();
}
// TODO:
// un-comment and fix these tests when all `{read,write}`
// service functions are implemented.
/// Assert write/read correctness of [`block_v1_tx2`].
#[tokio::test]
async fn v1_tx2() {
test_template(
&[block_v1_tx2],
14_535_350_982_449,
AssertTableLen {
block_infos: 1,
block_blobs: 1,
block_heights: 1,
key_images: 65,
num_outputs: 41,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 111,
prunable_tx_blobs: 0,
rct_outputs: 0,
tx_blobs: 3,
tx_ids: 3,
tx_heights: 3,
tx_unlock_time: 1,
},
)
.await;
}
// /// Send a read request, and receive a response,
// /// asserting the response the expected value.
// #[tokio::test]
// async fn read_request() {
// let (reader, writer, _tempdir) = init_service();
/// Assert write/read correctness of [`block_v9_tx3`].
#[tokio::test]
async fn v9_tx3() {
test_template(
&[block_v9_tx3],
3_403_774_022_163,
AssertTableLen {
block_infos: 1,
block_blobs: 1,
block_heights: 1,
key_images: 4,
num_outputs: 0,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 0,
prunable_tx_blobs: 0,
rct_outputs: 7,
tx_blobs: 4,
tx_ids: 4,
tx_heights: 4,
tx_unlock_time: 1,
},
)
.await;
}
// for (request, expected_response) in [
// (ReadRequest::Example1, Response::Example1),
// (ReadRequest::Example2(123), Response::Example2(123)),
// (
// ReadRequest::Example3("hello".into()),
// Response::Example3("hello".into()),
// ),
// ] {
// // This calls `poll_ready()` asserting we have a permit before `call()`.
// let response_channel = reader.clone().oneshot(request);
// let response = response_channel.await.unwrap();
// assert_eq!(response, expected_response);
// }
// }
// /// Send a write request, and receive a response,
// /// asserting the response the expected value.
// #[tokio::test]
// async fn write_request() {
// let (reader, mut writer, _tempdir) = init_service();
// for (request, expected_response) in [
// (WriteRequest::Example1, Response::Example1),
// (WriteRequest::Example2(123), Response::Example2(123)),
// (
// WriteRequest::Example3("hello".into()),
// Response::Example3("hello".into()),
// ),
// ] {
// let response_channel = writer.call(request);
// let response = response_channel.await.unwrap();
// assert_eq!(response, expected_response);
// }
// }
/// Assert write/read correctness of [`block_v16_tx0`].
#[tokio::test]
async fn v16_tx0() {
test_template(
&[block_v16_tx0],
600_000_000_000,
AssertTableLen {
block_infos: 1,
block_blobs: 1,
block_heights: 1,
key_images: 0,
num_outputs: 0,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 0,
prunable_tx_blobs: 0,
rct_outputs: 1,
tx_blobs: 1,
tx_ids: 1,
tx_heights: 1,
tx_unlock_time: 1,
},
)
.await;
}

View file

@ -15,9 +15,12 @@ use cuprate_types::{
};
use crate::{
constants::DATABASE_CORRUPT_MSG,
env::{Env, EnvInner},
error::RuntimeError,
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
ConcreteEnv, Env,
transaction::TxRw,
ConcreteEnv,
};
//---------------------------------------------------------------------------------------------------- Constants
@ -226,5 +229,26 @@ impl DatabaseWriter {
/// [`WriteRequest::WriteBlock`].
#[inline]
fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
todo!()
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
let result = {
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
crate::ops::block::add_block(block, &mut tables_mut)
};
match result {
Ok(()) => {
tx_rw.commit()?;
Ok(Response::WriteBlockOk)
}
Err(e) => {
// INVARIANT: ensure database atomicity by aborting
// the transaction on `add_block()` failures.
tx_rw
.abort()
.expect("could not maintain database atomicity by aborting write transaction");
Err(e)
}
}
}

View file

@ -350,10 +350,10 @@ tables! {
/// Maps an output's amount to the number of outputs with that amount.
///
/// For a new output the `AmountIndex` value from this
/// table will be its index in a list of duplicate outputs.
/// For example, if there are 5 outputs with `amount = 123`
/// then calling `get(123)` on this table will return 5.
NumOutputs,
Amount => AmountIndex,
Amount => u64,
/// TODO
PrunedTxBlobs,

View file

@ -16,12 +16,61 @@ use monero_serai::{
ringct::{RctPrunable, RctSignatures},
transaction::{Timelock, Transaction, TransactionPrefix},
};
use pretty_assertions::assert_eq;
use crate::{
config::Config, key::Key, storable::Storable, tables::Tables, transaction::TxRo, ConcreteEnv,
Env, EnvInner,
DatabaseRo, Env, EnvInner,
};
//---------------------------------------------------------------------------------------------------- Struct
/// Named struct to assert the length of all tables.
///
/// This is a struct with fields instead of a function
/// so that callers can name arguments, otherwise the call-site
/// is a little confusing, i.e. `assert_table_len(0, 25, 1, 123)`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct AssertTableLen {
pub(crate) block_infos: u64,
pub(crate) block_blobs: u64,
pub(crate) block_heights: u64,
pub(crate) key_images: u64,
pub(crate) num_outputs: u64,
pub(crate) pruned_tx_blobs: u64,
pub(crate) prunable_hashes: u64,
pub(crate) outputs: u64,
pub(crate) prunable_tx_blobs: u64,
pub(crate) rct_outputs: u64,
pub(crate) tx_blobs: u64,
pub(crate) tx_ids: u64,
pub(crate) tx_heights: u64,
pub(crate) tx_unlock_time: u64,
}
impl AssertTableLen {
/// Assert the length of all tables.
pub(crate) fn assert(self, tables: &impl Tables) {
let other = Self {
block_infos: tables.block_infos().len().unwrap(),
block_blobs: tables.block_blobs().len().unwrap(),
block_heights: tables.block_heights().len().unwrap(),
key_images: tables.key_images().len().unwrap(),
num_outputs: tables.num_outputs().len().unwrap(),
pruned_tx_blobs: tables.pruned_tx_blobs().len().unwrap(),
prunable_hashes: tables.prunable_hashes().len().unwrap(),
outputs: tables.outputs().len().unwrap(),
prunable_tx_blobs: tables.prunable_tx_blobs().len().unwrap(),
rct_outputs: tables.rct_outputs().len().unwrap(),
tx_blobs: tables.tx_blobs().len().unwrap(),
tx_ids: tables.tx_ids().len().unwrap(),
tx_heights: tables.tx_heights().len().unwrap(),
tx_unlock_time: tables.tx_unlock_time().len().unwrap(),
};
assert_eq!(self, other);
}
}
//---------------------------------------------------------------------------------------------------- fn
/// Create an `Env` in a temporarily directory.
/// The directory is automatically removed after the `TempDir` is dropped.

View file

@ -0,0 +1,86 @@
//! Wrapper type for partially-`unsafe` usage of `T: !Send`.
//---------------------------------------------------------------------------------------------------- Import
use std::{
borrow::Borrow,
ops::{Deref, DerefMut},
};
use bytemuck::TransparentWrapper;
use crate::storable::StorableVec;
//---------------------------------------------------------------------------------------------------- Aliases
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, TransparentWrapper)]
#[repr(transparent)]
/// A wrapper type that `unsafe`ly implements `Send` for any `T`.
///
/// This is a marker/wrapper type that allows wrapping
/// any type `T` such that it implements `Send`.
///
/// This is to be used when `T` is `Send`, but only in certain
/// situations not provable to the compiler, or is otherwise a
/// a pain to prove and/or less efficient.
///
/// It is up to the users of this type to ensure their
/// usage of `UnsafeSendable` are actually safe.
///
/// Notably, `heed`'s table type uses this inside `service`.
pub(crate) struct UnsafeSendable<T>(T);
#[allow(clippy::non_send_fields_in_send_ty)]
// SAFETY: Users ensure that their usage of this type is safe.
unsafe impl<T> Send for UnsafeSendable<T> {}
impl<T> UnsafeSendable<T> {
/// Create a new [`UnsafeSendable`].
///
/// # Safety
/// By constructing this type, you must ensure the usage
/// of the resulting `Self` is follows all the [`Send`] rules.
pub(crate) const unsafe fn new(t: T) -> Self {
Self(t)
}
/// Extract the inner `T`.
pub(crate) fn into_inner(self) -> T {
self.0
}
}
impl<T> Borrow<T> for UnsafeSendable<T> {
fn borrow(&self) -> &T {
&self.0
}
}
impl<T> AsRef<T> for UnsafeSendable<T> {
fn as_ref(&self) -> &T {
&self.0
}
}
impl<T> AsMut<T> for UnsafeSendable<T> {
fn as_mut(&mut self) -> &mut T {
&mut self.0
}
}
impl<T> Deref for UnsafeSendable<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for UnsafeSendable<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
mod test {
// use super::*;
}

View file

@ -17,7 +17,7 @@ asynch = ["dep:futures", "dep:rayon"]
constants = []
fs = ["dep:dirs"]
num = []
map = []
map = ["dep:monero-serai"]
time = ["dep:chrono", "std"]
thread = ["std", "dep:target_os_lib"]
@ -26,6 +26,7 @@ crossbeam = { workspace = true, optional = true }
chrono = { workspace = true, optional = true, features = ["std", "clock"] }
dirs = { workspace = true, optional = true }
futures = { workspace = true, optional = true, features = ["std"] }
monero-serai = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
# This is kinda a stupid work around.

View file

@ -5,6 +5,7 @@
//! `#[no_std]` compatible.
//---------------------------------------------------------------------------------------------------- Use
use monero_serai::transaction::Timelock;
//---------------------------------------------------------------------------------------------------- `(u64, u64) <-> u128`
/// Split a [`u128`] value into 2 64-bit values.
@ -53,6 +54,54 @@ pub const fn combine_low_high_bits_to_u128(low_bits: u64, high_bits: u64) -> u12
res | (low_bits as u128)
}
//---------------------------------------------------------------------------------------------------- Timelock
/// Map a [`u64`] to a [`Timelock`].
///
/// Height/time is not differentiated via type, but rather:
/// "height is any value less than 500_000_000 and timestamp is any value above"
/// so the `u64/usize` is stored without any tag.
///
/// See [`timelock_to_u64`] for the inverse function.
///
/// - <https://github.com/Cuprate/cuprate/pull/102#discussion_r1558504285>
/// - <https://github.com/serai-dex/serai/blob/bc1dec79917d37d326ac3d9bc571a64131b0424a/coins/monero/src/transaction.rs#L139>
///
/// ```rust
/// # use cuprate_helper::map::*;
/// # use monero_serai::transaction::*;
/// assert_eq!(u64_to_timelock(0), Timelock::None);
/// assert_eq!(u64_to_timelock(499_999_999), Timelock::Block(499_999_999));
/// assert_eq!(u64_to_timelock(500_000_000), Timelock::Time(500_000_000));
/// ```
pub fn u64_to_timelock(u: u64) -> Timelock {
if u == 0 {
Timelock::None
} else if u < 500_000_000 {
Timelock::Block(usize::try_from(u).unwrap())
} else {
Timelock::Time(u)
}
}
/// Map [`Timelock`] to a [`u64`].
///
/// See [`u64_to_timelock`] for the inverse function and more documentation.
///
/// ```rust
/// # use cuprate_helper::map::*;
/// # use monero_serai::transaction::*;
/// assert_eq!(timelock_to_u64(Timelock::None), 0);
/// assert_eq!(timelock_to_u64(Timelock::Block(499_999_999)), 499_999_999);
/// assert_eq!(timelock_to_u64(Timelock::Time(500_000_000)), 500_000_000);
/// ```
pub fn timelock_to_u64(timelock: Timelock) -> u64 {
match timelock {
Timelock::None => 0,
Timelock::Block(u) => u64::try_from(u).unwrap(),
Timelock::Time(u) => u,
}
}
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
mod test {}