Merge branch 'main' into peer-set2

Boog900 2024-05-03 17:13:27 +01:00
commit 4b471a947f
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
39 changed files with 1335 additions and 1048 deletions


@ -0,0 +1,62 @@
# MIT License
#
# Copyright (c) 2022-2023 Luke Parker
# Copyright (c) Cuprate developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# Initially taken from Serai Dex: https://github.com/serai-dex/serai/blob/b823413c9b7ae6747b9af99e18379cfc49f4271a/.github/actions/monero/action.yml.
name: monerod-download
description: Downloads the core Monero daemon
inputs:
version:
description: "Version to download"
required: false
default: v0.18.3.3
runs:
using: "composite"
steps:
- name: Monero Daemon Cache
id: cache-monerod
uses: actions/cache@v3
with:
path: |
monerod
monerod.exe
key: monerod-${{ runner.os }}-${{ runner.arch }}-${{ inputs.version }}
- name: Download the Monero Daemon
if: steps.cache-monerod.outputs.cache-hit != 'true'
shell: bash
run: |
OS=${{ runner.os }}
ARCH=${{ runner.arch }}
case "$OS $ARCH" in
"Windows X64") FILE=monero-win-x64-${{ inputs.version }}.zip ;;
"Windows X86") FILE=monero-win-x86-${{ inputs.version }}.zip ;;
"Linux X64") FILE=monero-linux-x64-${{ inputs.version }}.tar.bz2 ;;
"Linux X86") FILE=monero-linux-x86-${{ inputs.version }}.tar.bz2 ;;
"macOS X64") FILE=monero-mac-x64-${{ inputs.version }}.tar.bz2 ;;
"macOS ARM64") FILE=monero-mac-armv8-${{ inputs.version }}.tar.bz2 ;;
*) exit 1 ;;
esac
curl -O -L https://downloads.getmonero.org/cli/$FILE
if [[ ${{ runner.os }} == Windows ]]; then
unzip $FILE
mv */monerod.exe monerod.exe
else
tar -xvf $FILE
mv */monerod monerod
fi


@ -77,6 +77,9 @@ jobs:
~/.rustup
key: ${{ matrix.os }}
- name: Download monerod
uses: ./.github/actions/monerod-download
# Packages other than `Boost` used by `Monero` are listed here.
# https://github.com/monero-project/monero/blob/c444a7e002036e834bfb4c68f04a121ce1af5825/.github/workflows/build.yml#L71
@ -103,7 +106,7 @@ jobs:
rustup default stable-x86_64-pc-windows-gnu
- name: Documentation
run: cargo doc --workspace --all-features
run: cargo doc --workspace --all-features --no-deps
- name: Clippy (fail on warnings)
run: cargo clippy --workspace --all-features --all-targets -- -D warnings
@ -113,7 +116,7 @@ jobs:
run: |
cargo test --all-features --workspace
cargo test --package cuprate-database --no-default-features --features redb --features service
# TODO: upload binaries with `actions/upload-artifact@v3`
- name: Build
run: cargo build --all-features --all-targets --workspace

.gitignore

@ -1,2 +1,3 @@
target/
.vscode
monerod

Cargo.lock

File diff suppressed because it is too large.


@ -78,7 +78,6 @@ tracing = { version = "0.1.40", default-features = false }
## workspace.dev-dependencies
tempfile = { version = "3" }
reqwest = { version = "0.11.24" }
pretty_assertions = { version = "1.4.0" }
proptest = { version = "1" }
proptest-derive = { version = "0.4.0" }


@ -35,16 +35,17 @@ page_size = { version = "0.6.0" } # Needed for database resizes, they must be a multiple of the OS page size.
thiserror = { workspace = true }
# `service` feature.
crossbeam = { workspace = true, features = ["std"], optional = true }
futures = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"], optional = true }
tokio-util = { workspace = true, features = ["full"], optional = true }
tower = { workspace = true, features = ["full"], optional = true }
rayon = { workspace = true, optional = true }
crossbeam = { workspace = true, features = ["std"], optional = true }
futures = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"], optional = true }
tokio-util = { workspace = true, features = ["full"], optional = true }
tower = { workspace = true, features = ["full"], optional = true }
thread_local = { workspace = true }
rayon = { workspace = true, optional = true }
# Optional features.
heed = { version = "0.20.0-alpha.9", optional = true }
redb = { version = "2.0.0", optional = true }
heed = { version = "0.20.0", features = ["read-txn-no-tls"], optional = true }
redb = { version = "2.1.0", optional = true }
serde = { workspace = true, optional = true }
[dev-dependencies]


@ -137,7 +137,8 @@ impl<T: Table> DatabaseIter<T> for HeedTableRo<'_, T> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRo Impl
impl<T: Table> DatabaseRo<T> for HeedTableRo<'_, T> {
// SAFETY: `HeedTableRo: !Send` as it holds a reference to `heed::RoTxn: Send + !Sync`.
unsafe impl<T: Table> DatabaseRo<T> for HeedTableRo<'_, T> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(&self.db, self.tx_ro, key)
@ -165,7 +166,9 @@ impl<T: Table> DatabaseRo<T> for HeedTableRo<'_, T> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRw Impl
impl<T: Table> DatabaseRo<T> for HeedTableRw<'_, '_, T> {
// SAFETY: The `Send` bound only applies to `HeedTableRo`.
// `HeedTableRw`'s write transaction is `!Send`.
unsafe impl<T: Table> DatabaseRo<T> for HeedTableRw<'_, '_, T> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(&self.db, &self.tx_rw.borrow(), key)


@ -185,7 +185,9 @@ impl Env for ConcreteEnv {
// Create the database directory if it doesn't exist.
std::fs::create_dir_all(config.db_directory())?;
// Open the environment in the user's PATH.
let env = env_open_options.open(config.db_directory())?;
// SAFETY: LMDB uses a memory-map backed file.
// <https://docs.rs/heed/0.20.0/heed/struct.EnvOpenOptions.html#method.open>
let env = unsafe { env_open_options.open(config.db_directory())? };
// TODO: Open/create tables with certain flags
// <https://github.com/monero-project/monero/blob/059028a30a8ae9752338a7897329fe8012a310d5/src/blockchain_db/lmdb/db_lmdb.cpp#L1324>


@ -49,10 +49,9 @@ impl From<heed::Error> for crate::InitError {
| E2::Panic => Self::Unknown(Box::new(mdb_error)),
},
E1::InvalidDatabaseTyping
| E1::BadOpenOptions { .. }
| E1::Encoding(_)
| E1::Decoding(_) => Self::Unknown(Box::new(error)),
E1::BadOpenOptions { .. } | E1::Encoding(_) | E1::Decoding(_) => {
Self::Unknown(Box::new(error))
}
}
}
}
@ -139,11 +138,9 @@ impl From<heed::Error> for crate::RuntimeError {
},
// Only if we write incorrect code.
E1::InvalidDatabaseTyping
| E1::DatabaseClosing
| E1::BadOpenOptions { .. }
| E1::Encoding(_)
| E1::Decoding(_) => panic!("fix the database code! {error:#?}"),
E1::DatabaseClosing | E1::BadOpenOptions { .. } | E1::Encoding(_) | E1::Decoding(_) => {
panic!("fix the database code! {error:#?}")
}
}
}
}


@ -118,7 +118,8 @@ impl<T: Table + 'static> DatabaseIter<T> for RedbTableRo<T::Key, T::Value> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRo
impl<T: Table + 'static> DatabaseRo<T> for RedbTableRo<T::Key, T::Value> {
// SAFETY: Both `redb`'s transaction and table types are `Send + Sync`.
unsafe impl<T: Table + 'static> DatabaseRo<T> for RedbTableRo<T::Key, T::Value> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(self, key)
@ -146,7 +147,8 @@ impl<T: Table + 'static> DatabaseRo<T> for RedbTableRo<T::Key, T::Value> {
}
//---------------------------------------------------------------------------------------------------- DatabaseRw
impl<T: Table + 'static> DatabaseRo<T> for RedbTableRw<'_, T::Key, T::Value> {
// SAFETY: Both `redb`'s transaction and table types are `Send + Sync`.
unsafe impl<T: Table + 'static> DatabaseRo<T> for RedbTableRw<'_, T::Key, T::Value> {
#[inline]
fn get(&self, key: &T::Key) -> Result<T::Value, RuntimeError> {
get::<T>(self, key)


@ -84,6 +84,7 @@ impl Env for ConcreteEnv {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(config.db_file())?;
env_builder.create_file(db_file)?


@ -82,7 +82,29 @@ pub trait DatabaseIter<T: Table> {
///
/// This is a read-only database table,
/// write operations are defined in [`DatabaseRw`].
pub trait DatabaseRo<T: Table> {
///
/// # Safety
/// The table type that implements this MUST be `Send`.
///
/// However if the table holds a reference to a transaction:
/// - only the transaction has to be `Send`
/// - the table cannot implement `Send`
///
/// For example:
///
/// `heed`'s transactions are `Send` but `HeedTableRo` contains a `&`
/// to the transaction, as such, if `Send` were implemented on `HeedTableRo`
/// then 1 transaction could be used to open multiple tables, then sent to
/// other threads - this would be a soundness hole in `HeedTableRo`.
///
/// `&T` is only `Send` if `T: Sync`.
///
/// `heed::RoTxn: !Sync`, therefore our table
/// holding `&heed::RoTxn` must NOT be `Send`.
///
/// - <https://doc.rust-lang.org/std/marker/trait.Sync.html>
/// - <https://doc.rust-lang.org/nomicon/send-and-sync.html>
pub unsafe trait DatabaseRo<T: Table> {
/// Get the value corresponding to a key.
///
/// The returned value is _owned_.
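To make the `&T: Send` reasoning above concrete, here is a small standalone sketch (ours, not code from this commit) modeling a `Send + !Sync` transaction and a table that borrows it; the compiler rejects sending the table across threads:

use std::{cell::Cell, marker::PhantomData};

// `Cell<()>` is `Send` but `!Sync`, mimicking `heed::RoTxn`.
struct RoTxnLike(PhantomData<Cell<()>>);

// Borrows the transaction, like `HeedTableRo` does.
struct TableLike<'a>(&'a RoTxnLike);

fn assert_send<T: Send>() {}

fn _check() {
    assert_send::<RoTxnLike>(); // OK: the transaction itself is `Send`.
    // assert_send::<TableLike<'static>>(); // ERROR: `&RoTxnLike: Send`
    // requires `RoTxnLike: Sync`, which does not hold.
}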


@ -12,7 +12,7 @@ use crate::{
tables::{
call_fn_on_all_tables_or_early_return, BlockBlobs, BlockHeights, BlockInfos, KeyImages,
NumOutputs, Outputs, PrunableHashes, PrunableTxBlobs, PrunedTxBlobs, RctOutputs, Tables,
TablesMut, TxHeights, TxIds, TxUnlockTime,
TablesIter, TablesMut, TxHeights, TxIds, TxUnlockTime,
},
transaction::{TxRo, TxRw},
};
@ -233,7 +233,7 @@ where
///
/// # Errors
/// TODO
fn open_tables(&self, tx_ro: &Ro) -> Result<impl Tables, RuntimeError> {
fn open_tables(&self, tx_ro: &Ro) -> Result<impl TablesIter, RuntimeError> {
call_fn_on_all_tables_or_early_return! {
Self::open_db_ro(self, tx_ro)
}


@ -1,6 +1,4 @@
//! General free functions (related to the database).
//!
//! TODO.
//---------------------------------------------------------------------------------------------------- Import


@ -191,6 +191,10 @@
// although it is sometimes nice.
clippy::must_use_candidate,
// FIXME: good lint but too many false positives
// with our `Env` + `RwLock` setup.
clippy::significant_drop_tightening,
// TODO: should be removed after all `todo!()`'s are gone.
clippy::diverging_sub_expression,
@ -238,7 +242,7 @@ pub use env::{Env, EnvInner};
mod error;
pub use error::{InitError, RuntimeError};
mod free;
pub(crate) mod free;
pub mod resize;
@ -269,3 +273,6 @@ pub mod service;
//---------------------------------------------------------------------------------------------------- Private
#[cfg(test)]
pub(crate) mod tests;
#[cfg(feature = "service")] // only needed in `service` for now
pub(crate) mod unsafe_sendable;


@ -89,8 +89,14 @@ pub fn add_block(
}
//------------------------------------------------------ Transaction / Outputs / Key Images
for tx_verification_data in &block.txs {
add_tx(tx_verification_data, &chain_height, tables)?;
// Add the miner transaction first.
{
let tx = &block.block.miner_tx;
add_tx(tx, &tx.serialize(), &tx.hash(), &chain_height, tables)?;
}
for tx in &block.txs {
add_tx(&tx.tx, &tx.tx_blob, &tx.tx_hash, &chain_height, tables)?;
}
//------------------------------------------------------ Block Info
@ -161,6 +167,7 @@ pub fn pop_block(
let block = Block::read(&mut block_blob.as_slice())?;
//------------------------------------------------------ Transaction / Outputs / Key Images
remove_tx(&block.miner_tx.hash(), tables)?;
for tx_hash in &block.txs {
remove_tx(tx_hash, tables)?;
}
@ -226,6 +233,16 @@ pub fn get_block_extended_header_top(
}
//---------------------------------------------------------------------------------------------------- Misc
/// Retrieve a [`BlockInfo`] via its [`BlockHeight`].
#[doc = doc_error!()]
#[inline]
pub fn get_block_info(
block_height: &BlockHeight,
table_block_infos: &impl DatabaseRo<BlockInfos>,
) -> Result<BlockInfo, RuntimeError> {
table_block_infos.get(block_height)
}
/// Retrieve a [`BlockHeight`] via its [`BlockHash`].
#[doc = doc_error!()]
#[inline]
@ -262,7 +279,7 @@ mod test {
use super::*;
use crate::{
ops::tx::{get_tx, tx_exists},
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
@ -315,20 +332,23 @@ mod test {
let tables = env_inner.open_tables(&tx_ro).unwrap();
// Assert only the proper tables were added to.
assert_eq!(tables.block_infos().len().unwrap(), 3);
assert_eq!(tables.block_blobs().len().unwrap(), 3);
assert_eq!(tables.block_heights().len().unwrap(), 3);
assert_eq!(tables.key_images().len().unwrap(), 69);
assert_eq!(tables.num_outputs().len().unwrap(), 38);
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.outputs().len().unwrap(), 107);
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 6);
assert_eq!(tables.tx_blobs().len().unwrap(), 5);
assert_eq!(tables.tx_ids().len().unwrap(), 5);
assert_eq!(tables.tx_heights().len().unwrap(), 5);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 0);
AssertTableLen {
block_infos: 3,
block_blobs: 3,
block_heights: 3,
key_images: 69,
num_outputs: 41,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 111,
prunable_tx_blobs: 0,
rct_outputs: 8,
tx_blobs: 8,
tx_ids: 8,
tx_heights: 8,
tx_unlock_time: 3,
}
.assert(&tables);
// Check `cumulative` functions work.
assert_eq!(


@ -101,7 +101,7 @@ mod test {
block::add_block,
tx::{get_tx, tx_exists},
},
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
@ -149,6 +149,24 @@ mod test {
}
// Assert reads are correct.
AssertTableLen {
block_infos: 3,
block_blobs: 3,
block_heights: 3,
key_images: 69,
num_outputs: 41,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 111,
prunable_tx_blobs: 0,
rct_outputs: 8,
tx_blobs: 8,
tx_ids: 8,
tx_heights: 8,
tx_unlock_time: 3,
}
.assert(&tables);
assert_eq!(blocks_len, chain_height(tables.block_heights()).unwrap());
assert_eq!(
blocks_len - 1,


@ -64,7 +64,7 @@ mod test {
use super::*;
use crate::{
ops::tx::{get_tx, tx_exists},
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
@ -108,23 +108,11 @@ mod test {
let tables = env_inner.open_tables(&tx_ro).unwrap();
// Assert only the proper tables were added to.
assert_eq!(
tables.key_images().len().unwrap(),
u64::try_from(key_images.len()).unwrap()
);
assert_eq!(tables.block_infos().len().unwrap(), 0);
assert_eq!(tables.block_blobs().len().unwrap(), 0);
assert_eq!(tables.block_heights().len().unwrap(), 0);
assert_eq!(tables.num_outputs().len().unwrap(), 0);
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.outputs().len().unwrap(), 0);
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 0);
assert_eq!(tables.tx_blobs().len().unwrap(), 0);
assert_eq!(tables.tx_ids().len().unwrap(), 0);
assert_eq!(tables.tx_heights().len().unwrap(), 0);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 0);
AssertTableLen {
key_images: tables.key_images().len().unwrap(),
..Default::default()
}
.assert(&tables);
for key_image in &key_images {
println!("key_image_exists(): {}", hex::encode(key_image));


@ -1,7 +1,12 @@
//! Outputs.
use cuprate_helper::map::u64_to_timelock;
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar};
//---------------------------------------------------------------------------------------------------- Import
use monero_serai::transaction::{Timelock, Transaction};
use monero_serai::{
transaction::{Timelock, Transaction},
H,
};
use cuprate_types::{OutputOnChain, VerifiedBlockInformation};
@ -17,8 +22,8 @@ use crate::{
},
transaction::{TxRo, TxRw},
types::{
Amount, AmountIndex, BlockHash, BlockHeight, BlockInfo, KeyImage, Output, PreRctOutputId,
RctOutput, TxHash,
Amount, AmountIndex, BlockHash, BlockHeight, BlockInfo, KeyImage, Output, OutputFlags,
PreRctOutputId, RctOutput, TxHash,
},
};
@ -153,13 +158,110 @@ pub fn get_rct_num_outputs(
table_rct_outputs.len()
}
//---------------------------------------------------------------------------------------------------- Mapping functions
/// Map an [`Output`] to a [`cuprate_types::OutputOnChain`].
#[doc = doc_error!()]
pub fn output_to_output_on_chain(
output: &Output,
amount: Amount,
table_tx_unlock_time: &impl DatabaseRo<TxUnlockTime>,
) -> Result<OutputOnChain, RuntimeError> {
// FIXME: implement lookup table for common values:
// <https://github.com/monero-project/monero/blob/c8214782fb2a769c57382a999eaf099691c836e7/src/ringct/rctOps.cpp#L322>
let commitment = ED25519_BASEPOINT_POINT + H() * Scalar::from(amount);
let time_lock = if output
.output_flags
.contains(OutputFlags::NON_ZERO_UNLOCK_TIME)
{
u64_to_timelock(table_tx_unlock_time.get(&output.tx_idx)?)
} else {
Timelock::None
};
let key = CompressedEdwardsY::from_slice(&output.key)
.map(|y| y.decompress())
.unwrap_or(None);
Ok(OutputOnChain {
height: u64::from(output.height),
time_lock,
key,
commitment,
})
}
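The FIXME above references monero's cached `zeroCommit` values; below is a hedged sketch (ours, not part of this commit) of such a lookup table, reusing the `H()` and `curve25519_dalek` imports already in this file. Pre-RCT amounts are one significant digit times a power of ten, so the set of common values is small:

use std::{collections::HashMap, sync::OnceLock};

fn zero_commitment(amount: u64) -> curve25519_dalek::edwards::EdwardsPoint {
    static LUT: OnceLock<HashMap<u64, curve25519_dalek::edwards::EdwardsPoint>> = OnceLock::new();
    let lut = LUT.get_or_init(|| {
        let mut map = HashMap::new();
        for power in 0..=19_u32 {
            let base = 10_u64.pow(power); // 10^19 still fits in a `u64`.
            for digit in 1..=9_u64 {
                if let Some(amount) = digit.checked_mul(base) {
                    map.insert(amount, ED25519_BASEPOINT_POINT + H() * Scalar::from(amount));
                }
            }
        }
        map
    });

    // Fall back to computing non-standard denominations directly.
    lut.get(&amount)
        .copied()
        .unwrap_or_else(|| ED25519_BASEPOINT_POINT + H() * Scalar::from(amount))
}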
/// Map an [`RctOutput`] to a [`cuprate_types::OutputOnChain`].
///
/// # Panics
/// This function will panic if `rct_output`'s `commitment` fails to decompress
/// into a valid [`EdwardsPoint`](curve25519_dalek::edwards::EdwardsPoint).
///
/// This should normally not happen as commitments that
/// are stored in the database should always be valid.
#[doc = doc_error!()]
pub fn rct_output_to_output_on_chain(
rct_output: &RctOutput,
table_tx_unlock_time: &impl DatabaseRo<TxUnlockTime>,
) -> Result<OutputOnChain, RuntimeError> {
// INVARIANT: Commitments stored are valid when stored by the database.
let commitment = CompressedEdwardsY::from_slice(&rct_output.commitment)
.unwrap()
.decompress()
.unwrap();
let time_lock = if rct_output
.output_flags
.contains(OutputFlags::NON_ZERO_UNLOCK_TIME)
{
u64_to_timelock(table_tx_unlock_time.get(&rct_output.tx_idx)?)
} else {
Timelock::None
};
let key = CompressedEdwardsY::from_slice(&rct_output.key)
.map(|y| y.decompress())
.unwrap_or(None);
Ok(OutputOnChain {
height: u64::from(rct_output.height),
time_lock,
key,
commitment,
})
}
/// Map a [`PreRctOutputId`] to an [`OutputOnChain`].
///
/// Note that this still supports RCT outputs; in that case, [`PreRctOutputId::amount`] should be `0`.
#[doc = doc_error!()]
pub fn id_to_output_on_chain(
id: &PreRctOutputId,
tables: &impl Tables,
) -> Result<OutputOnChain, RuntimeError> {
// v2 transactions.
if id.amount == 0 {
let rct_output = get_rct_output(&id.amount_index, tables.rct_outputs())?;
let output_on_chain = rct_output_to_output_on_chain(&rct_output, tables.tx_unlock_time())?;
Ok(output_on_chain)
} else {
// v1 transactions.
let output = get_output(id, tables.outputs())?;
let output_on_chain =
output_to_output_on_chain(&output, id.amount, tables.tx_unlock_time())?;
Ok(output_on_chain)
}
}
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
#[allow(clippy::significant_drop_tightening, clippy::cognitive_complexity)]
mod test {
use super::*;
use crate::{
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
types::OutputFlags,
Env,
};
@ -221,20 +323,23 @@ mod test {
// Assert all reads of the outputs are OK.
{
// Assert proper tables were added to.
assert_eq!(tables.block_infos().len().unwrap(), 0);
assert_eq!(tables.block_blobs().len().unwrap(), 0);
assert_eq!(tables.block_heights().len().unwrap(), 0);
assert_eq!(tables.key_images().len().unwrap(), 0);
assert_eq!(tables.num_outputs().len().unwrap(), 1);
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.outputs().len().unwrap(), 1);
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 1);
assert_eq!(tables.tx_blobs().len().unwrap(), 0);
assert_eq!(tables.tx_ids().len().unwrap(), 0);
assert_eq!(tables.tx_heights().len().unwrap(), 0);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 0);
AssertTableLen {
block_infos: 0,
block_blobs: 0,
block_heights: 0,
key_images: 0,
num_outputs: 1,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 1,
prunable_tx_blobs: 0,
rct_outputs: 1,
tx_blobs: 0,
tx_ids: 0,
tx_heights: 0,
tx_unlock_time: 0,
}
.assert(&tables);
// Assert length is correct.
assert_eq!(get_num_outputs(tables.outputs()).unwrap(), 1);


@ -36,7 +36,7 @@ use super::{
};
//---------------------------------------------------------------------------------------------------- Private
/// Add a [`TransactionVerificationData`] to the database.
/// Add a [`Transaction`] (and related data) to the database.
///
/// The `block_height` is the block that this `tx` belongs to.
///
@ -63,18 +63,20 @@ use super::{
#[doc = doc_error!()]
#[inline]
pub fn add_tx(
tx: &TransactionVerificationData,
tx: &Transaction,
tx_blob: &Vec<u8>,
tx_hash: &TxHash,
block_height: &BlockHeight,
tables: &mut impl TablesMut,
) -> Result<TxId, RuntimeError> {
let tx_id = get_num_tx(tables.tx_ids_mut())?;
//------------------------------------------------------ Transaction data
tables.tx_ids_mut().put(&tx.tx_hash, &tx_id)?;
tables.tx_ids_mut().put(tx_hash, &tx_id)?;
tables.tx_heights_mut().put(&tx_id, block_height)?;
tables
.tx_blobs_mut()
.put(&tx_id, StorableVec::wrap_ref(&tx.tx_blob))?;
.put(&tx_id, StorableVec::wrap_ref(tx_blob))?;
//------------------------------------------------------ Timelocks
// Height/time is not differentiated via type, but rather:
@ -82,7 +84,7 @@ pub fn add_tx(
// so the `u64/usize` is stored without any tag.
//
// <https://github.com/Cuprate/cuprate/pull/102#discussion_r1558504285>
match tx.tx.prefix.timelock {
match tx.prefix.timelock {
Timelock::None => (),
Timelock::Block(height) => tables.tx_unlock_time_mut().put(&tx_id, &(height as u64))?,
Timelock::Time(time) => tables.tx_unlock_time_mut().put(&tx_id, &time)?,
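For reference, a hedged sketch of what a `u64 -> Timelock` mapping like `cuprate_helper::map::u64_to_timelock` (used elsewhere in this commit) plausibly looks like under this convention; the `500_000_000` height/timestamp boundary is our assumption (Monero's `CRYPTONOTE_MAX_BLOCK_NUMBER`), not something stated in this diff:

use monero_serai::transaction::Timelock;

// Hedged sketch: map an untagged `u64` back into a `Timelock`.
fn u64_to_timelock_sketch(u: u64) -> Timelock {
    if u == 0 {
        Timelock::None
    } else if u < 500_000_000 {
        // Below the assumed boundary: a block height.
        Timelock::Block(usize::try_from(u).unwrap())
    } else {
        // Otherwise: a UNIX timestamp.
        Timelock::Time(u)
    }
}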
@ -95,8 +97,6 @@ pub fn add_tx(
// }
//------------------------------------------------------
// Refer to the inner transaction type from now on.
let tx: &Transaction = &tx.tx;
let Ok(height) = u32::try_from(*block_height) else {
panic!("add_tx(): block_height ({block_height}) > u32::MAX");
};
@ -128,7 +128,6 @@ pub fn add_tx(
};
let mut amount_indices = Vec::with_capacity(tx.prefix.outputs.len());
let tx_idx = get_num_tx(tables.tx_ids_mut())?;
for (i, output) in tx.prefix.outputs.iter().enumerate() {
let key = *output.key.as_bytes();
@ -151,7 +150,7 @@ pub fn add_tx(
key,
height,
output_flags,
tx_idx,
tx_idx: tx_id,
commitment,
},
tables.rct_outputs_mut(),
@ -164,7 +163,7 @@ pub fn add_tx(
key,
height,
output_flags,
tx_idx,
tx_idx: tx_id,
},
tables,
)?
@ -178,7 +177,7 @@ pub fn add_tx(
key,
height,
output_flags,
tx_idx,
tx_idx: tx_id,
commitment,
},
tables.rct_outputs_mut(),
@ -343,7 +342,7 @@ pub fn tx_exists(
mod test {
use super::*;
use crate::{
tests::{assert_all_tables_are_empty, tmp_concrete_env},
tests::{assert_all_tables_are_empty, tmp_concrete_env, AssertTableLen},
Env,
};
use cuprate_test_utils::data::{tx_v1_sig0, tx_v1_sig2, tx_v2_rct3};
@ -368,7 +367,7 @@ mod test {
.iter()
.map(|tx| {
println!("add_tx(): {tx:#?}");
add_tx(tx, &0, &mut tables).unwrap()
add_tx(&tx.tx, &tx.tx_blob, &tx.tx_hash, &0, &mut tables).unwrap()
})
.collect::<Vec<TxId>>();
@ -384,20 +383,23 @@ mod test {
let tables = env_inner.open_tables(&tx_ro).unwrap();
// Assert only the proper tables were added to.
assert_eq!(tables.block_infos().len().unwrap(), 0);
assert_eq!(tables.block_blobs().len().unwrap(), 0);
assert_eq!(tables.block_heights().len().unwrap(), 0);
assert_eq!(tables.key_images().len().unwrap(), 4); // added to key images
assert_eq!(tables.pruned_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.prunable_hashes().len().unwrap(), 0);
assert_eq!(tables.num_outputs().len().unwrap(), 9);
assert_eq!(tables.outputs().len().unwrap(), 10); // added to outputs
assert_eq!(tables.prunable_tx_blobs().len().unwrap(), 0);
assert_eq!(tables.rct_outputs().len().unwrap(), 2);
assert_eq!(tables.tx_blobs().len().unwrap(), 3);
assert_eq!(tables.tx_ids().len().unwrap(), 3);
assert_eq!(tables.tx_heights().len().unwrap(), 3);
assert_eq!(tables.tx_unlock_time().len().unwrap(), 1); // only 1 has a timelock
AssertTableLen {
block_infos: 0,
block_blobs: 0,
block_heights: 0,
key_images: 4, // added to key images
pruned_tx_blobs: 0,
prunable_hashes: 0,
num_outputs: 9,
outputs: 10, // added to outputs
prunable_tx_blobs: 0,
rct_outputs: 2,
tx_blobs: 3,
tx_ids: 3,
tx_heights: 3,
tx_unlock_time: 1, // only 1 has a timelock
}
.assert(&tables);
// Both from ID and hash should result in getting the same transaction.
let mut tx_hashes = vec![];


@ -3,27 +3,46 @@
//---------------------------------------------------------------------------------------------------- Import
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
ops::Range,
sync::Arc,
sync::{Arc, RwLock},
task::{Context, Poll},
};
use cfg_if::cfg_if;
use crossbeam::channel::Receiver;
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar};
use futures::{channel::oneshot, ready};
use monero_serai::{transaction::Timelock, H};
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use thread_local::ThreadLocal;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_types::service::{ReadRequest, Response};
use cuprate_types::{
service::{ReadRequest, Response},
ExtendedBlockHeader, OutputOnChain,
};
use crate::{
config::ReaderThreads,
constants::DATABASE_CORRUPT_MSG,
error::RuntimeError,
ops::{
block::{get_block_extended_header_from_height, get_block_info},
blockchain::{cumulative_generated_coins, top_block_height},
key_image::key_image_exists,
output::{
get_output, get_rct_output, id_to_output_on_chain, output_to_output_on_chain,
rct_output_to_output_on_chain,
},
},
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
types::{Amount, AmountIndex, BlockHeight, KeyImage},
ConcreteEnv,
tables::{BlockHeights, BlockInfos, KeyImages, NumOutputs, Outputs, Tables},
types::{Amount, AmountIndex, BlockHeight, KeyImage, OutputFlags, PreRctOutputId},
unsafe_sendable::UnsafeSendable,
ConcreteEnv, DatabaseRo, Env, EnvInner,
};
//---------------------------------------------------------------------------------------------------- DatabaseReadHandle
@ -65,10 +84,10 @@ pub struct DatabaseReadHandle {
impl Clone for DatabaseReadHandle {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
pool: Arc::clone(&self.pool),
semaphore: self.semaphore.clone(),
permit: None,
env: self.env.clone(),
env: Arc::clone(&self.env),
}
}
}
@ -106,23 +125,21 @@ impl DatabaseReadHandle {
}
}
/// TODO
/// Access to the actual database environment.
///
/// # ⚠️ Warning
/// This function gives you access to the actual
/// underlying database connected to by `self`.
///
/// I.e. it allows you to read/write data _directly_
/// instead of going through a request.
///
/// Be warned that using the database directly
/// in this manner has not been tested.
#[inline]
pub const fn env(&self) -> &Arc<ConcreteEnv> {
&self.env
}
/// TODO
#[inline]
pub const fn semaphore(&self) -> &PollSemaphore {
&self.semaphore
}
/// TODO
#[inline]
pub const fn permit(&self) -> &Option<OwnedSemaphorePermit> {
&self.permit
}
}
impl tower::Service<ReadRequest> for DatabaseReadHandle {
@ -163,9 +180,11 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
//
// INVARIANT:
// The below `DatabaseReader` function impl block relies on this behavior.
let env = Arc::clone(self.env());
self.pool
.spawn(move || map_request(permit, env, request, response_sender));
let env = Arc::clone(&self.env);
self.pool.spawn(move || {
let _permit: OwnedSemaphorePermit = permit;
map_request(&env, request, response_sender);
}); // drop(permit/env);
InfallibleOneshotReceiver::from(receiver)
}
@ -175,7 +194,6 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
// This function maps [`Request`]s to function calls
// executed by the rayon DB reader threadpool.
#[allow(clippy::needless_pass_by_value)] // TODO: fix me
/// Map [`Request`]'s to specific database handler functions.
///
/// This is the main entrance into all `Request` handler functions.
@ -184,23 +202,23 @@ impl tower::Service<ReadRequest> for DatabaseReadHandle {
/// 2. Handler function is called
/// 3. [`Response`] is sent
fn map_request(
_permit: OwnedSemaphorePermit, // Permit for this request, dropped at end of function
env: Arc<ConcreteEnv>, // Access to the database
request: ReadRequest, // The request we must fulfill
env: &ConcreteEnv, // Access to the database
request: ReadRequest, // The request we must fulfill
response_sender: ResponseSender, // The channel we must send the response back to
) {
/* TODO: pre-request handling, run some code for each request? */
use ReadRequest as R;
/* TODO: pre-request handling, run some code for each request? */
let response = match request {
R::BlockExtendedHeader(block) => block_extended_header(&env, block),
R::BlockHash(block) => block_hash(&env, block),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(&env, range),
R::ChainHeight => chain_height(&env),
R::GeneratedCoins => generated_coins(&env),
R::Outputs(map) => outputs(&env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(&env, vec),
R::CheckKIsNotSpent(set) => check_k_is_not_spent(&env, set),
R::BlockExtendedHeader(block) => block_extended_header(env, block),
R::BlockHash(block) => block_hash(env, block),
R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(env, range),
R::ChainHeight => chain_height(env),
R::GeneratedCoins => generated_coins(env),
R::Outputs(map) => outputs(env, map),
R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec),
R::CheckKIsNotSpent(set) => check_k_is_not_spent(env, set),
};
if let Err(e) = response_sender.send(response) {
@ -211,6 +229,54 @@ fn map_request(
/* TODO: post-request handling, run some code for each request? */
}
//---------------------------------------------------------------------------------------------------- Thread Local
/// Q: Why does this exist?
///
/// A1: `heed`'s transactions and tables are not `Sync`, so we cannot use
/// them with rayon; however, we set a feature such that they are `Send`.
///
/// A2: When sending to rayon, we want to ensure each read transaction
/// is only used by 1 thread, to scale reads.
///
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576762346>
#[inline]
fn thread_local<T: Send>(env: &impl Env) -> ThreadLocal<T> {
ThreadLocal::with_capacity(env.config().reader_threads.as_threads().get())
}
/// Take in a `ThreadLocal<impl Tables>` and return an `&impl Tables + Send`.
///
/// # Safety
/// See [`DatabaseRo`] docs.
///
/// We are safely using `UnsafeSendable` in `service`'s reader thread-pool
/// as we are pairing our usage with `ThreadLocal` - only 1 thread
/// will ever access a transaction at a time. This is an INVARIANT.
///
/// A `Mutex` was considered but:
/// - It is less performant
/// - It isn't technically needed for safety in our use-case
/// - It causes `DatabaseIter` function return issues as there is a `MutexGuard` object
///
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581684698>
///
/// # Notes
/// This is used for other backends as well instead of branching with `cfg_if`.
/// The other backends (currently) are `Send + Sync` so this is fine.
/// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1585618374>
macro_rules! get_tables {
($env_inner:ident, $tx_ro:ident, $tables:ident) => {{
$tables.get_or_try(|| {
#[allow(clippy::significant_drop_in_scrutinee)]
match $env_inner.open_tables($tx_ro) {
// SAFETY: see above macro doc comment.
Ok(tables) => Ok(unsafe { crate::unsafe_sendable::UnsafeSendable::new(tables) }),
Err(e) => Err(e),
}
})
}};
}
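As a standalone illustration (ours, not code from this commit) of the `ThreadLocal` pattern this macro builds on, where each rayon worker lazily creates a value that only it ever touches:

use std::cell::Cell;

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use thread_local::ThreadLocal;

fn total() -> u64 {
    let local: ThreadLocal<Cell<u64>> = ThreadLocal::new();

    (0..10_000_u64).into_par_iter().for_each(|_| {
        // `get_or` returns this worker's value, creating it on first use,
        // mirroring `tx_ro.get_or_try(|| env_inner.tx_ro())` below.
        let counter = local.get_or(|| Cell::new(0));
        counter.set(counter.get() + 1);
    });

    // Sum the per-thread counters; always 10_000, however many workers ran.
    local.into_iter().map(Cell::into_inner).sum()
}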
//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`Request`].
//
@ -228,57 +294,211 @@ fn map_request(
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.
// TODO: implement multi-transaction read atomicity.
// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1576874589>.
/// [`ReadRequest::BlockExtendedHeader`].
#[inline]
fn block_extended_header(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
todo!()
fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
Ok(Response::BlockExtendedHeader(
get_block_extended_header_from_height(&block_height, &tables)?,
))
}
/// [`ReadRequest::BlockHash`].
#[inline]
fn block_hash(env: &Arc<ConcreteEnv>, block_height: BlockHeight) -> ResponseResult {
todo!()
fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
Ok(Response::BlockHash(
get_block_info(&block_height, &table_block_infos)?.block_hash,
))
}
/// [`ReadRequest::BlockExtendedHeaderInRange`].
#[inline]
fn block_extended_header_in_range(
env: &Arc<ConcreteEnv>,
env: &ConcreteEnv,
range: std::ops::Range<BlockHeight>,
) -> ResponseResult {
todo!()
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Collect results using `rayon`.
let vec = range
.into_par_iter()
.map(|block_height| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
get_block_extended_header_from_height(&block_height, tables)
})
.collect::<Result<Vec<ExtendedBlockHeader>, RuntimeError>>()?;
Ok(Response::BlockExtendedHeaderInRange(vec))
}
/// [`ReadRequest::ChainHeight`].
#[inline]
fn chain_height(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
fn chain_height(env: &ConcreteEnv) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let chain_height = crate::ops::blockchain::chain_height(&table_block_heights)?;
let block_hash =
get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash;
Ok(Response::ChainHeight(chain_height, block_hash))
}
/// [`ReadRequest::GeneratedCoins`].
#[inline]
fn generated_coins(env: &Arc<ConcreteEnv>) -> ResponseResult {
todo!()
fn generated_coins(env: &ConcreteEnv) -> ResponseResult {
// Single-threaded, no `ThreadLocal` required.
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro()?;
let table_block_heights = env_inner.open_db_ro::<BlockHeights>(&tx_ro)?;
let table_block_infos = env_inner.open_db_ro::<BlockInfos>(&tx_ro)?;
let top_height = top_block_height(&table_block_heights)?;
Ok(Response::GeneratedCoins(cumulative_generated_coins(
&top_height,
&table_block_infos,
)?))
}
/// [`ReadRequest::Outputs`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn outputs(env: &Arc<ConcreteEnv>, map: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
todo!()
fn outputs(env: &ConcreteEnv, outputs: HashMap<Amount, HashSet<AmountIndex>>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// The 2nd mapping function.
// This is pulled out from the below `map()` for readability.
let inner_map = |amount, amount_index| -> Result<(AmountIndex, OutputOnChain), RuntimeError> {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
let id = PreRctOutputId {
amount,
amount_index,
};
let output_on_chain = id_to_output_on_chain(&id, tables)?;
Ok((amount_index, output_on_chain))
};
// Collect results using `rayon`.
let map = outputs
.into_par_iter()
.map(|(amount, amount_index_set)| {
Ok((
amount,
amount_index_set
.into_par_iter()
.map(|amount_index| inner_map(amount, amount_index))
.collect::<Result<HashMap<AmountIndex, OutputOnChain>, RuntimeError>>()?,
))
})
.collect::<Result<HashMap<Amount, HashMap<AmountIndex, OutputOnChain>>, RuntimeError>>()?;
Ok(Response::Outputs(map))
}
/// [`ReadRequest::NumberOutputsWithAmount`].
/// TODO
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn number_outputs_with_amount(env: &Arc<ConcreteEnv>, vec: Vec<Amount>) -> ResponseResult {
todo!()
fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec<Amount>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Cache the amount of RCT outputs once.
// INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
#[allow(clippy::cast_possible_truncation)]
let num_rct_outputs = {
let tx_ro = env_inner.tx_ro()?;
let tables = env_inner.open_tables(&tx_ro)?;
tables.rct_outputs().len()? as usize
};
// Collect results using `rayon`.
let map = amounts
.into_par_iter()
.map(|amount| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
if amount == 0 {
// v2 transactions.
Ok((amount, num_rct_outputs))
} else {
// v1 transactions.
match tables.num_outputs().get(&amount) {
// INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
#[allow(clippy::cast_possible_truncation)]
Ok(count) => Ok((amount, count as usize)),
// If we get a request for an `amount` that doesn't exist,
// we return `0` instead of an error.
Err(RuntimeError::KeyNotFound) => Ok((amount, 0)),
Err(e) => Err(e),
}
}
})
.collect::<Result<HashMap<Amount, usize>, RuntimeError>>()?;
Ok(Response::NumberOutputsWithAmount(map))
}
/// [`ReadRequest::CheckKIsNotSpent`].
#[inline]
#[allow(clippy::needless_pass_by_value)] // TODO: remove me
fn check_k_is_not_spent(env: &Arc<ConcreteEnv>, set: HashSet<KeyImage>) -> ResponseResult {
todo!()
fn check_k_is_not_spent(env: &ConcreteEnv, key_images: HashSet<KeyImage>) -> ResponseResult {
// Prepare tx/tables in `ThreadLocal`.
let env_inner = env.env_inner();
let tx_ro = thread_local(env);
let tables = thread_local(env);
// Key image check function.
let key_image_exists = |key_image| {
let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?;
let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref();
key_image_exists(&key_image, tables.key_images())
};
// TODO:
// Create/use `enum cuprate_types::Exist { Does, DoesNot }`
// or similar instead of `bool` for clarity.
// <https://github.com/Cuprate/cuprate/pull/113#discussion_r1581536526>
//
// Collect results using `rayon`.
match key_images
.into_par_iter()
.map(key_image_exists)
// If the result is either:
// `Ok(true)` => a key image was found, return early
// `Err` => an error was found, return early
//
// Else, `Ok(false)` will continue the iterator.
.find_any(|result| !matches!(result, Ok(false)))
{
None | Some(Ok(false)) => Ok(Response::CheckKIsNotSpent(true)), // Key image was NOT found.
Some(Ok(true)) => Ok(Response::CheckKIsNotSpent(false)), // Key image was found.
Some(Err(e)) => Err(e), // A database error occurred.
}
}
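A hedged sketch of the `enum` the TODO above suggests (hypothetical; not yet part of `cuprate_types`):

/// Hypothetical, clearer stand-in for the `bool` in `Response::CheckKIsNotSpent`.
enum Exist {
    Does,
    DoesNot,
}

`Response::CheckKIsNotSpent(Exist::DoesNot)` would be harder to misread than `Response::CheckKIsNotSpent(true)`.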


@ -7,76 +7,379 @@
// This is only imported on `#[cfg(test)]` in `mod.rs`.
#![allow(unused_mut, clippy::significant_drop_tightening)]
#![allow(
clippy::significant_drop_tightening,
clippy::await_holding_lock,
clippy::too_many_lines
)]
//---------------------------------------------------------------------------------------------------- Use
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use pretty_assertions::assert_eq;
use tower::{Service, ServiceExt};
use cuprate_types::service::{ReadRequest, Response, WriteRequest};
use cuprate_test_utils::data::{block_v16_tx0, block_v1_tx2, block_v9_tx3};
use cuprate_types::{
service::{ReadRequest, Response, WriteRequest},
ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation,
};
use crate::{
config::Config,
ops::{
block::{get_block_extended_header_from_height, get_block_info},
blockchain::{chain_height, top_block_height},
output::{get_output, id_to_output_on_chain, output_to_output_on_chain},
},
service::{init, DatabaseReadHandle, DatabaseWriteHandle},
tables::{KeyImages, Tables, TablesIter},
tests::AssertTableLen,
types::{Amount, AmountIndex, KeyImage, PreRctOutputId},
ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError,
};
//---------------------------------------------------------------------------------------------------- Tests
//---------------------------------------------------------------------------------------------------- Helper functions
/// Initialize the `service`.
fn init_service() -> (DatabaseReadHandle, DatabaseWriteHandle, tempfile::TempDir) {
fn init_service() -> (
DatabaseReadHandle,
DatabaseWriteHandle,
Arc<ConcreteEnv>,
tempfile::TempDir,
) {
let tempdir = tempfile::tempdir().unwrap();
let config = Config::low_power(Some(tempdir.path().into()));
let (reader, writer) = init(config).unwrap();
(reader, writer, tempdir)
let env = reader.env().clone();
(reader, writer, env, tempdir)
}
/// This is the template used in the actual test functions below.
///
/// - Send write request(s)
/// - Receive response(s)
/// - Assert proper tables were mutated
/// - Assert read requests lead to expected responses
#[allow(clippy::future_not_send)] // INVARIANT: tests are using a single threaded runtime
async fn test_template(
// Which block(s) to add?
block_fns: &[fn() -> &'static VerifiedBlockInformation],
// Total amount of generated coins after the block(s) have been added.
cumulative_generated_coins: u64,
// What should the table lengths be after the block(s) have been added?
assert_table_len: AssertTableLen,
) {
//----------------------------------------------------------------------- Write requests
let (reader, mut writer, env, _tempdir) = init_service();
let env_inner = env.env_inner();
let tx_ro = env_inner.tx_ro().unwrap();
let tables = env_inner.open_tables(&tx_ro).unwrap();
// HACK: `add_block()` asserts blocks with non-sequential heights
// cannot be added; to get around this, manually edit the block height.
for (i, block_fn) in block_fns.iter().enumerate() {
let mut block = block_fn().clone();
block.height = i as u64;
// Request a block to be written, assert it was written.
let request = WriteRequest::WriteBlock(block);
let response_channel = writer.call(request);
let response = response_channel.await.unwrap();
assert_eq!(response, Response::WriteBlockOk);
}
//----------------------------------------------------------------------- Reset the transaction
drop(tables);
drop(tx_ro);
let tx_ro = env_inner.tx_ro().unwrap();
let tables = env_inner.open_tables(&tx_ro).unwrap();
//----------------------------------------------------------------------- Assert all table lengths are correct
assert_table_len.assert(&tables);
//----------------------------------------------------------------------- Read request prep
// Next few lines are just for preparing the expected responses,
// see further below for usage.
let extended_block_header_0 = Ok(Response::BlockExtendedHeader(
get_block_extended_header_from_height(&0, &tables).unwrap(),
));
let extended_block_header_1 = if block_fns.len() > 1 {
Ok(Response::BlockExtendedHeader(
get_block_extended_header_from_height(&1, &tables).unwrap(),
))
} else {
Err(RuntimeError::KeyNotFound)
};
let block_hash_0 = Ok(Response::BlockHash(
get_block_info(&0, tables.block_infos()).unwrap().block_hash,
));
let block_hash_1 = if block_fns.len() > 1 {
Ok(Response::BlockHash(
get_block_info(&1, tables.block_infos()).unwrap().block_hash,
))
} else {
Err(RuntimeError::KeyNotFound)
};
let range_0_1 = Ok(Response::BlockExtendedHeaderInRange(vec![
get_block_extended_header_from_height(&0, &tables).unwrap(),
]));
let range_0_2 = if block_fns.len() >= 2 {
Ok(Response::BlockExtendedHeaderInRange(vec![
get_block_extended_header_from_height(&0, &tables).unwrap(),
get_block_extended_header_from_height(&1, &tables).unwrap(),
]))
} else {
Err(RuntimeError::KeyNotFound)
};
let chain_height = {
let height = chain_height(tables.block_heights()).unwrap();
let block_info = get_block_info(&height.saturating_sub(1), tables.block_infos()).unwrap();
Ok(Response::ChainHeight(height, block_info.block_hash))
};
let cumulative_generated_coins = Ok(Response::GeneratedCoins(cumulative_generated_coins));
let num_req = tables
.outputs_iter()
.keys()
.unwrap()
.map(Result::unwrap)
.map(|key| key.amount)
.collect::<Vec<Amount>>();
let num_resp = Ok(Response::NumberOutputsWithAmount(
num_req
.iter()
.map(|amount| match tables.num_outputs().get(amount) {
// INVARIANT: #[cfg] @ lib.rs asserts `usize == u64`
#[allow(clippy::cast_possible_truncation)]
Ok(count) => (*amount, count as usize),
Err(RuntimeError::KeyNotFound) => (*amount, 0),
Err(e) => panic!(),
})
.collect::<HashMap<Amount, usize>>(),
));
// Contains a fake non-spent key-image.
let ki_req = HashSet::from([[0; 32]]);
let ki_resp = Ok(Response::CheckKIsNotSpent(true));
//----------------------------------------------------------------------- Assert expected response
// Assert read requests lead to the expected responses.
for (request, expected_response) in [
(ReadRequest::BlockExtendedHeader(0), extended_block_header_0),
(ReadRequest::BlockExtendedHeader(1), extended_block_header_1),
(ReadRequest::BlockHash(0), block_hash_0),
(ReadRequest::BlockHash(1), block_hash_1),
(ReadRequest::BlockExtendedHeaderInRange(0..1), range_0_1),
(ReadRequest::BlockExtendedHeaderInRange(0..2), range_0_2),
(ReadRequest::ChainHeight, chain_height),
(ReadRequest::GeneratedCoins, cumulative_generated_coins),
(ReadRequest::NumberOutputsWithAmount(num_req), num_resp),
(ReadRequest::CheckKIsNotSpent(ki_req), ki_resp),
] {
let response = reader.clone().oneshot(request).await;
println!("response: {response:#?}, expected_response: {expected_response:#?}");
match response {
Ok(resp) => assert_eq!(resp, expected_response.unwrap()),
Err(ref e) => assert!(matches!(response, expected_response)),
}
}
//----------------------------------------------------------------------- Key image checks
// Assert each key image we inserted comes back as "spent".
for key_image in tables.key_images_iter().keys().unwrap() {
let key_image = key_image.unwrap();
let request = ReadRequest::CheckKIsNotSpent(HashSet::from([key_image]));
let response = reader.clone().oneshot(request).await;
println!("response: {response:#?}, key_image: {key_image:#?}");
assert_eq!(response.unwrap(), Response::CheckKIsNotSpent(false));
}
//----------------------------------------------------------------------- Output checks
// Create the map of amounts and amount indices.
//
// FIXME: There's definitely a better way to map
// `Vec<PreRctOutputId>` -> `HashMap<u64, HashSet<u64>>`
let (map, output_count) = {
let mut ids = tables
.outputs_iter()
.keys()
.unwrap()
.map(Result::unwrap)
.collect::<Vec<PreRctOutputId>>();
ids.extend(
tables
.rct_outputs_iter()
.keys()
.unwrap()
.map(Result::unwrap)
.map(|amount_index| PreRctOutputId {
amount: 0,
amount_index,
}),
);
// Used later to check that the amount of Outputs
// returned in the Response is equal to the amount
// we asked for.
let output_count = ids.len();
let mut map = HashMap::<Amount, HashSet<AmountIndex>>::new();
for id in ids {
map.entry(id.amount)
.and_modify(|set| {
set.insert(id.amount_index);
})
.or_insert_with(|| HashSet::from([id.amount_index]));
}
(map, output_count)
};
// Map `Output` -> `OutputOnChain`
// This is the expected output from the `Response`.
let outputs_on_chain = map
.iter()
.flat_map(|(amount, amount_index_set)| {
amount_index_set.iter().map(|amount_index| {
let id = PreRctOutputId {
amount: *amount,
amount_index: *amount_index,
};
id_to_output_on_chain(&id, &tables).unwrap()
})
})
.collect::<Vec<OutputOnChain>>();
// Send a request for every output we inserted before.
let request = ReadRequest::Outputs(map.clone());
let response = reader.clone().oneshot(request).await;
println!("Response::Outputs response: {response:#?}");
let Ok(Response::Outputs(response)) = response else {
panic!("{response:#?}")
};
// Assert amount of `Amount`'s are the same.
assert_eq!(map.len(), response.len());
// Assert we get back the same map of
// `Amount`'s and `AmountIndex`'s.
let mut response_output_count = 0;
for (amount, output_map) in response {
let amount_index_set = map.get(&amount).unwrap();
for (amount_index, output) in output_map {
response_output_count += 1;
assert!(amount_index_set.contains(&amount_index));
assert!(outputs_on_chain.contains(&output));
}
}
// Assert the amount of `Output`'s returned is as expected.
let table_output_len = tables.outputs().len().unwrap() + tables.rct_outputs().len().unwrap();
assert_eq!(output_count as u64, table_output_len);
assert_eq!(output_count, response_output_count);
}
//---------------------------------------------------------------------------------------------------- Tests
/// Simply `init()` the service and then drop it.
///
/// If this test fails, something is very wrong.
#[test]
fn init_drop() {
let (reader, writer, _tempdir) = init_service();
let (reader, writer, env, _tempdir) = init_service();
}
// TODO:
// un-comment and fix these tests when all `{read,write}`
// service functions are implemented.
/// Assert write/read correctness of [`block_v1_tx2`].
#[tokio::test]
async fn v1_tx2() {
test_template(
&[block_v1_tx2],
14_535_350_982_449,
AssertTableLen {
block_infos: 1,
block_blobs: 1,
block_heights: 1,
key_images: 65,
num_outputs: 41,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 111,
prunable_tx_blobs: 0,
rct_outputs: 0,
tx_blobs: 3,
tx_ids: 3,
tx_heights: 3,
tx_unlock_time: 1,
},
)
.await;
}
// /// Send a read request, and receive a response,
// /// asserting the response the expected value.
// #[tokio::test]
// async fn read_request() {
// let (reader, writer, _tempdir) = init_service();
/// Assert write/read correctness of [`block_v9_tx3`].
#[tokio::test]
async fn v9_tx3() {
test_template(
&[block_v9_tx3],
3_403_774_022_163,
AssertTableLen {
block_infos: 1,
block_blobs: 1,
block_heights: 1,
key_images: 4,
num_outputs: 0,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 0,
prunable_tx_blobs: 0,
rct_outputs: 7,
tx_blobs: 4,
tx_ids: 4,
tx_heights: 4,
tx_unlock_time: 1,
},
)
.await;
}
// for (request, expected_response) in [
// (ReadRequest::Example1, Response::Example1),
// (ReadRequest::Example2(123), Response::Example2(123)),
// (
// ReadRequest::Example3("hello".into()),
// Response::Example3("hello".into()),
// ),
// ] {
// // This calls `poll_ready()` asserting we have a permit before `call()`.
// let response_channel = reader.clone().oneshot(request);
// let response = response_channel.await.unwrap();
// assert_eq!(response, expected_response);
// }
// }
// /// Send a write request, and receive a response,
// /// asserting the response the expected value.
// #[tokio::test]
// async fn write_request() {
// let (reader, mut writer, _tempdir) = init_service();
// for (request, expected_response) in [
// (WriteRequest::Example1, Response::Example1),
// (WriteRequest::Example2(123), Response::Example2(123)),
// (
// WriteRequest::Example3("hello".into()),
// Response::Example3("hello".into()),
// ),
// ] {
// let response_channel = writer.call(request);
// let response = response_channel.await.unwrap();
// assert_eq!(response, expected_response);
// }
// }
/// Assert write/read correctness of [`block_v16_tx0`].
#[tokio::test]
async fn v16_tx0() {
test_template(
&[block_v16_tx0],
600_000_000_000,
AssertTableLen {
block_infos: 1,
block_blobs: 1,
block_heights: 1,
key_images: 0,
num_outputs: 0,
pruned_tx_blobs: 0,
prunable_hashes: 0,
outputs: 0,
prunable_tx_blobs: 0,
rct_outputs: 1,
tx_blobs: 1,
tx_ids: 1,
tx_heights: 1,
tx_unlock_time: 1,
},
)
.await;
}


@ -15,9 +15,12 @@ use cuprate_types::{
};
use crate::{
constants::DATABASE_CORRUPT_MSG,
env::{Env, EnvInner},
error::RuntimeError,
service::types::{ResponseReceiver, ResponseResult, ResponseSender},
ConcreteEnv, Env,
transaction::TxRw,
ConcreteEnv,
};
//---------------------------------------------------------------------------------------------------- Constants
@ -226,5 +229,26 @@ impl DatabaseWriter {
/// [`WriteRequest::WriteBlock`].
#[inline]
fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult {
todo!()
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
let result = {
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
crate::ops::block::add_block(block, &mut tables_mut)
};
match result {
Ok(()) => {
tx_rw.commit()?;
Ok(Response::WriteBlockOk)
}
Err(e) => {
// INVARIANT: ensure database atomicity by aborting
// the transaction on `add_block()` failures.
tx_rw
.abort()
.expect("could not maintain database atomicity by aborting write transaction");
Err(e)
}
}
}


@ -350,10 +350,10 @@ tables! {
/// Maps an output's amount to the number of outputs with that amount.
///
/// For a new output the `AmountIndex` value from this
/// table will be its index in a list of duplicate outputs.
/// For example, if there are 5 outputs with `amount = 123`
/// then calling `get(123)` on this table will return 5.
NumOutputs,
Amount => AmountIndex,
Amount => u64,
/// TODO
PrunedTxBlobs,


@ -16,12 +16,61 @@ use monero_serai::{
ringct::{RctPrunable, RctSignatures},
transaction::{Timelock, Transaction, TransactionPrefix},
};
use pretty_assertions::assert_eq;
use crate::{
config::Config, key::Key, storable::Storable, tables::Tables, transaction::TxRo, ConcreteEnv,
Env, EnvInner,
DatabaseRo, Env, EnvInner,
};
//---------------------------------------------------------------------------------------------------- Struct
/// Named struct to assert the length of all tables.
///
/// This is a struct with fields instead of a function
/// so that callers can name arguments; otherwise the call-site
/// is a little confusing, i.e. `assert_table_len(0, 25, 1, 123)`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct AssertTableLen {
pub(crate) block_infos: u64,
pub(crate) block_blobs: u64,
pub(crate) block_heights: u64,
pub(crate) key_images: u64,
pub(crate) num_outputs: u64,
pub(crate) pruned_tx_blobs: u64,
pub(crate) prunable_hashes: u64,
pub(crate) outputs: u64,
pub(crate) prunable_tx_blobs: u64,
pub(crate) rct_outputs: u64,
pub(crate) tx_blobs: u64,
pub(crate) tx_ids: u64,
pub(crate) tx_heights: u64,
pub(crate) tx_unlock_time: u64,
}
impl AssertTableLen {
/// Assert the length of all tables.
pub(crate) fn assert(self, tables: &impl Tables) {
let other = Self {
block_infos: tables.block_infos().len().unwrap(),
block_blobs: tables.block_blobs().len().unwrap(),
block_heights: tables.block_heights().len().unwrap(),
key_images: tables.key_images().len().unwrap(),
num_outputs: tables.num_outputs().len().unwrap(),
pruned_tx_blobs: tables.pruned_tx_blobs().len().unwrap(),
prunable_hashes: tables.prunable_hashes().len().unwrap(),
outputs: tables.outputs().len().unwrap(),
prunable_tx_blobs: tables.prunable_tx_blobs().len().unwrap(),
rct_outputs: tables.rct_outputs().len().unwrap(),
tx_blobs: tables.tx_blobs().len().unwrap(),
tx_ids: tables.tx_ids().len().unwrap(),
tx_heights: tables.tx_heights().len().unwrap(),
tx_unlock_time: tables.tx_unlock_time().len().unwrap(),
};
assert_eq!(self, other);
}
}
//---------------------------------------------------------------------------------------------------- fn
/// Create an `Env` in a temporary directory.
/// The directory is automatically removed after the `TempDir` is dropped.


@ -0,0 +1,86 @@
//! Wrapper type for partially-`unsafe` usage of `T: !Send`.
//---------------------------------------------------------------------------------------------------- Import
use std::{
borrow::Borrow,
ops::{Deref, DerefMut},
};
use bytemuck::TransparentWrapper;
use crate::storable::StorableVec;
//---------------------------------------------------------------------------------------------------- Aliases
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, TransparentWrapper)]
#[repr(transparent)]
/// A wrapper type that `unsafe`ly implements `Send` for any `T`.
///
/// This is a marker/wrapper type that allows wrapping
/// any type `T` such that it implements `Send`.
///
/// This is to be used when `T` is `Send`, but only in certain
/// situations not provable to the compiler, or when proving
/// so is otherwise a pain and/or less efficient.
///
/// It is up to the users of this type to ensure their
/// usage of `UnsafeSendable` is actually safe.
///
/// Notably, `heed`'s table type uses this inside `service`.
pub(crate) struct UnsafeSendable<T>(T);
#[allow(clippy::non_send_fields_in_send_ty)]
// SAFETY: Users ensure that their usage of this type is safe.
unsafe impl<T> Send for UnsafeSendable<T> {}
impl<T> UnsafeSendable<T> {
/// Create a new [`UnsafeSendable`].
///
/// # Safety
/// By constructing this type, you must ensure the usage
/// of the resulting `Self` follows all the [`Send`] rules.
pub(crate) const unsafe fn new(t: T) -> Self {
Self(t)
}
/// Extract the inner `T`.
pub(crate) fn into_inner(self) -> T {
self.0
}
}
impl<T> Borrow<T> for UnsafeSendable<T> {
fn borrow(&self) -> &T {
&self.0
}
}
impl<T> AsRef<T> for UnsafeSendable<T> {
fn as_ref(&self) -> &T {
&self.0
}
}
impl<T> AsMut<T> for UnsafeSendable<T> {
fn as_mut(&mut self) -> &mut T {
&mut self.0
}
}
impl<T> Deref for UnsafeSendable<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for UnsafeSendable<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
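
// A hedged usage sketch: wrapping a `!Send` value (here an `Rc`) so it can be
// moved to another thread. The `SAFETY` claim is the caller's responsibility.
//
// let rc = std::rc::Rc::new(0);
// // SAFETY: `rc` is moved into the new thread and never touched here again.
// let wrapped = unsafe { UnsafeSendable::new(rc) };
// std::thread::spawn(move || assert_eq!(**wrapped, 0));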
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
mod test {
// use super::*;
}

View file

@ -17,16 +17,17 @@ asynch = ["dep:futures", "dep:rayon"]
constants = []
fs = ["dep:dirs"]
num = []
map = []
map = ["dep:monero-serai"]
time = ["dep:chrono", "std"]
thread = ["std", "dep:target_os_lib"]
[dependencies]
crossbeam = { workspace = true, optional = true }
chrono = { workspace = true, optional = true, features = ["std", "clock"] }
dirs = { workspace = true, optional = true }
futures = { workspace = true, optional = true, features = ["std"] }
rayon = { workspace = true, optional = true }
crossbeam = { workspace = true, optional = true }
chrono = { workspace = true, optional = true, features = ["std", "clock"] }
dirs = { workspace = true, optional = true }
futures = { workspace = true, optional = true, features = ["std"] }
monero-serai = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }
# This is kind of a stupid workaround.
# [thread] needs to activate one of these libs (windows|libc)

View file

@ -5,6 +5,7 @@
//! `#[no_std]` compatible.
//---------------------------------------------------------------------------------------------------- Use
use monero_serai::transaction::Timelock;
//---------------------------------------------------------------------------------------------------- `(u64, u64) <-> u128`
/// Split a [`u128`] value into 2 64-bit values.
@ -53,6 +54,54 @@ pub const fn combine_low_high_bits_to_u128(low_bits: u64, high_bits: u64) -> u12
res | (low_bits as u128)
}
//---------------------------------------------------------------------------------------------------- Timelock
/// Map a [`u64`] to a [`Timelock`].
///
/// Height/time is not differentiated via type, but rather:
/// "height is any value less than 500_000_000 and timestamp is any value above",
/// so the `u64`/`usize` is stored without any tag.
///
/// See [`timelock_to_u64`] for the inverse function.
///
/// - <https://github.com/Cuprate/cuprate/pull/102#discussion_r1558504285>
/// - <https://github.com/serai-dex/serai/blob/bc1dec79917d37d326ac3d9bc571a64131b0424a/coins/monero/src/transaction.rs#L139>
///
/// ```rust
/// # use cuprate_helper::map::*;
/// # use monero_serai::transaction::*;
/// assert_eq!(u64_to_timelock(0), Timelock::None);
/// assert_eq!(u64_to_timelock(499_999_999), Timelock::Block(499_999_999));
/// assert_eq!(u64_to_timelock(500_000_000), Timelock::Time(500_000_000));
/// ```
pub fn u64_to_timelock(u: u64) -> Timelock {
if u == 0 {
Timelock::None
} else if u < 500_000_000 {
Timelock::Block(usize::try_from(u).unwrap())
} else {
Timelock::Time(u)
}
}
/// Map [`Timelock`] to a [`u64`].
///
/// See [`u64_to_timelock`] for the inverse function and more documentation.
///
/// ```rust
/// # use cuprate_helper::map::*;
/// # use monero_serai::transaction::*;
/// assert_eq!(timelock_to_u64(Timelock::None), 0);
/// assert_eq!(timelock_to_u64(Timelock::Block(499_999_999)), 499_999_999);
/// assert_eq!(timelock_to_u64(Timelock::Time(500_000_000)), 500_000_000);
/// ```
pub fn timelock_to_u64(timelock: Timelock) -> u64 {
match timelock {
Timelock::None => 0,
Timelock::Block(u) => u64::try_from(u).unwrap(),
Timelock::Time(u) => u,
}
}
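
// A hedged round-trip check implied by the docs above: the two maps are
// inverses across the documented ranges.
//
// for x in [0, 1, 499_999_999, 500_000_000, u64::MAX] {
//     assert_eq!(timelock_to_u64(u64_to_timelock(x)), x);
// }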
//---------------------------------------------------------------------------------------------------- Tests
#[cfg(test)]
mod test {}

View file

@ -0,0 +1,14 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
mDMEZb0y4RYJKwYBBAHaRw8BAQdAvMid+QsSxLULIkKPLf0XWgPxaoG89qPNiQ4S
fXH0BfW0VlN5bnRoZXRpY0JpcmQ0NSAoQ3VwcmF0ZSdzIGRldmVsb3BlcikgPHNv
bWVvbmVlbHNlLmlzX29uLmdpdGh1Yi5yaW83eEBzaW1wbGVsb2dpbi5jb20+iJME
ExYKADsWIQQEmOfWc9FTBiAKoHnHaXP3SFIeEQUCZb0y4QIbAwULCQgHAgIiAgYV
CgkICwIEFgIDAQIeBwIXgAAKCRDHaXP3SFIeEUx+AQDYd7t75+V4/aSTczLxMGuT
A84qGRuYNStXUJzjV8F21wD/YVlybZcr9dDQ/+YOgh5aXBzo+oGm+XhhSbI3QdIX
LAC4OARlvTLhEgorBgEEAZdVAQUBAQdAgRoSFUmnCqETElyry97kFwsdzlNyldk2
ZPgH9J4fCHwDAQgHiHgEGBYKACAWIQQEmOfWc9FTBiAKoHnHaXP3SFIeEQUCZb0y
4QIbDAAKCRDHaXP3SFIeETDSAP4k8+jUaStnjrkzN1jvRg136qNfwe8ZzjrsWJ0n
FOS8zAEA/fwRjRyvEP28KJNiKdyhDYWYJTpyLGTiPP8b43NsHAM=
=gqqy
-----END PGP PUBLIC KEY BLOCK-----

View file

@ -1,15 +0,0 @@
-----BEGIN PGP PUBLIC KEY BLOCK-----
mE8EZAt90BMFK4EEAAoCAwS8WnB3wMu+JxWm3LpuHO1jcdwIlMjndqoGCcJnFEKm
shkx1eE21AoCGJYYAjeVLrazF5hqTzs6UpBuP7ZNaXvJtEBTeW50aGV0aWNCaXJk
NDUgPHNvbWVvbmVlbHNlLmlzX29uLmdpdGh1Yi5yaW83eEBzaW1wbGVsb2dpbi5j
b20+iJAEExMIADgWIQTX0AOzMdcNEMyKDV31QokN0AEPEQUCZAt90AIbAwULCQgH
AgYVCgkICwIEFgIDAQIeAQIXgAAKCRD1QokN0AEPEWp0AQCDCOdgi3LRFLrF/rR9
zBy6ceMgAp4Z/GJMO66je3BeIgD9HPo7OkRsKvI1kCf7X9KDV6M0+bmYpC23HYpN
1zWnq++4UwRkC33QEgUrgQQACgIDBGfPz0WQRKwicAMkUF2InuOns4aU/1bDwidd
wP426408APfJ7vTtKOVFjfHzKLLiw1Z0texwhBL0y76nggkzVbMDAQgHiHgEGBMI
ACAWIQTX0AOzMdcNEMyKDV31QokN0AEPEQUCZAt90AIbDAAKCRD1QokN0AEPERQg
APsHUaCbt1BByhXpVu34C9bY6P1Sw9ARpfl9cc2kAEnQRQD+Klmx13c/WOj6euF6
RMKtt34En+0xhP99yfEpoofta/0=
=Pkk7
-----END PGP PUBLIC KEY BLOCK-----

View file

@ -105,7 +105,7 @@ impl<const N: usize> ByteArrayVec<N> {
/// Splits the byte array vec into two at the given index.
///
/// Afterwards self contains elements [0, at), and the returned Bytes contains elements [at, len).
/// Afterwards self contains elements [0, at), and the returned [`ByteArrayVec`] contains elements [at, len).
///
/// This is an O(1) operation that just increases the reference count and sets a few indices.
///

View file

@ -30,7 +30,6 @@ bitflags! {
impl PeerSupportFlags: u32 {
const FLUFFY_BLOCKS = 0b0000_0001;
const _ = !0;
}
}

View file

@ -109,20 +109,18 @@ impl<Z: NetworkZone> PeerList<Z> {
}
return self.remove_peer(&peer);
} else {
let len = self.len();
if len == 0 {
return None;
} else {
let n = r.gen_range(0..len);
}
let len = self.len();
let (&key, _) = self.peers.get_index(n).unwrap();
if must_keep_peers.contains(&key) {
continue;
}
if len == 0 {
return None;
}
return self.remove_peer(&key);
}
let n = r.gen_range(0..len);
let (&key, _) = self.peers.get_index(n).unwrap();
if !must_keep_peers.contains(&key) {
return self.remove_peer(&key);
}
}
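
// For readability, a hedged reconstruction of the new control flow above
// (the surrounding loop and exact types are assumed from context):
//
// loop {
//     let len = self.len();
//     if len == 0 {
//         return None;
//     }
//     let n = r.gen_range(0..len);
//     let (&key, _) = self.peers.get_index(n).unwrap();
//     if !must_keep_peers.contains(&key) {
//         return self.remove_peer(&key);
//     }
// }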

View file

@ -32,7 +32,7 @@ use monero_pruning::PruningSeed;
/// or a random u64 if not.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum InternalPeerID<A> {
/// A known address
/// A known address.
KnownAddr(A),
/// An unknown address (probably an inbound anonymity network connection).
Unknown(u64),
@ -42,7 +42,7 @@ impl<A: Display> Display for InternalPeerID<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
InternalPeerID::KnownAddr(addr) => addr.fmt(f),
InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown, ID: {}", id)),
InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")),
}
}
}
@ -79,7 +79,7 @@ pub struct Client<Z: NetworkZone> {
/// The semaphore that limits the requests sent to the peer.
semaphore: PollSemaphore,
/// A permit for the semaphore, will be some after `poll_ready` returns ready.
/// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready.
permit: Option<OwnedSemaphorePermit>,
/// The error slot shared between the [`Client`] and [`Connection`](connection::Connection).
@ -137,9 +137,8 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
return Poll::Ready(Ok(()));
}
let Some(permit) = ready!(self.semaphore.poll_acquire(cx)) else {
unreachable!("Client semaphore should not be closed!");
};
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
@ -147,9 +146,10 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
}
fn call(&mut self, request: PeerRequest) -> Self::Future {
let Some(permit) = self.permit.take() else {
panic!("poll_ready did not return ready before call to call")
};
let permit = self
.permit
.take()
.expect("poll_ready did not return ready before call to call");
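// (Tower's contract, relied on above: `poll_ready` must return
// `Poll::Ready(Ok(()))`, storing the permit, before `call` is invoked;
// a hedged caller-side sketch, `client.ready().await?.call(request).await?`
// via `tower::ServiceExt`, upholds this.)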
let (tx, rx) = oneshot::channel();
let req = connection::ConnectionTaskRequest {

View file

@ -160,21 +160,24 @@ where
self.send_message_to_peer(req.request.into()).await?;
// Set the timeout after sending the message. TODO: is this a good idea?
self.request_timeout = Some(Box::pin(sleep(REQUEST_TIMEOUT)));
} else {
// For future devs: This function cannot exit early without sending a response back down the
// response channel.
let res = self.send_message_to_peer(req.request.into()).await;
// send the response now, the request does not need a response from the peer.
if let Err(e) = res {
let err_str = e.to_string();
let _ = req.response_channel.send(Err(err_str.clone().into()));
Err(e)?
} else {
// We still need to respond even if the response is this.
let _ = req.response_channel.send(Ok(PeerResponse::NA));
}
return Ok(());
}
// INVARIANT: This function cannot exit early without sending a response back down the
// response channel.
let res = self.send_message_to_peer(req.request.into()).await;
// Send the response now; the request does not need a response from the peer.
if let Err(e) = res {
// The error can't be cloned, so convert it to a string first; hacky, but it works.
let err_str = e.to_string();
let _ = req.response_channel.send(Err(err_str.clone().into()));
return Err(e);
} else {
// We still need to respond, even though the response is just `PeerResponse::NA`.
let _ = req.response_channel.send(Ok(PeerResponse::NA));
}
Ok(())
}
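
// A hedged sketch of the caller side of this invariant: the `Client` awaits
// the `oneshot` receiver paired with `response_channel`, so exactly one reply
// (success or error) arrives per request and the receiver never hangs.
//
// let (tx, rx) = oneshot::channel();
// // ... send `ConnectionTaskRequest { response_channel: tx, .. }` to the task ...
// let res = rx.await.expect("connection task always sends a response");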

View file

@ -3,7 +3,7 @@
//! This module handles connecting to peers and giving the sink/stream to the handshaker which will then
//! perform a handshake and create a [`Client`].
//!
//! This is where outbound connections are crated.
//! This is where outbound connections are created.
//!
use std::{
future::Future,
@ -23,7 +23,7 @@ use crate::{
/// A request to connect to a peer.
pub struct ConnectRequest<Z: NetworkZone> {
/// The peers address
/// The peer's address.
pub addr: Z::Addr,
/// A permit which will be held by the connection, allowing you to set limits on the number of
/// connections.

View file

@ -18,7 +18,6 @@ futures = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
bytes = { workspace = true, features = ["std"] }
@ -26,13 +25,6 @@ tempfile = { workspace = true }
borsh = { workspace = true, features = ["derive"]}
[target.'cfg(unix)'.dependencies]
tar = "0.4.40"
bzip2 = "0.4.4"
[target.'cfg(windows)'.dependencies]
zip = "0.6"
[dev-dependencies]
hex = { workspace = true }
pretty_assertions = { workspace = true }

View file

@ -4,9 +4,12 @@
//! this to test compatibility with monerod.
//!
use std::{
env::current_dir,
ffi::OsStr,
fs::read_dir,
io::Read,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
path::PathBuf,
process::{Child, Command, Stdio},
str::from_utf8,
thread::panicking,
@ -15,14 +18,9 @@ use std::{
use tokio::{task::yield_now, time::timeout};
mod download;
/// IPv4 local host.
const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
/// The `monerod` version to use.
const MONEROD_VERSION: &str = "v0.18.3.1";
/// The log line `monerod` emits indicating it has successfully started up.
const MONEROD_STARTUP_TEXT: &str =
"The daemon will start synchronizing with the network. This may take a long time to complete.";
@ -34,7 +32,7 @@ const MONEROD_SHUTDOWN_TEXT: &str = "Stopping cryptonote protocol";
///
/// This function will set `regtest` and the P2P/RPC ports, so these can't be included in the flags.
pub async fn monerod<T: AsRef<OsStr>>(flags: impl IntoIterator<Item = T>) -> SpawnedMoneroD {
let path_to_monerod = download::check_download_monerod().await.unwrap();
let path_to_monerod = find_root().join("monerod");
let rpc_port = get_available_port(&[]);
let p2p_port = get_available_port(&[rpc_port]);
@ -54,7 +52,9 @@ pub async fn monerod<T: AsRef<OsStr>>(flags: impl IntoIterator<Item = T>) -> Spa
.arg(format!("--data-dir={}", data_dir.path().display()))
.arg("--non-interactive")
.spawn()
.unwrap();
.expect(
"Failed to start monerod, you need to have the monerod binary in the root of the repo",
);
let mut logs = String::new();
@ -92,6 +92,20 @@ pub async fn monerod<T: AsRef<OsStr>>(flags: impl IntoIterator<Item = T>) -> Spa
}
}
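
// A hedged usage sketch for `monerod` above (the flag is purely illustrative,
// and `SpawnedMoneroD`'s accessors are not shown in this diff):
//
// let daemon = monerod(["--log-level=1"]).await;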
/// Finds the root of the repo by locating the `target` directory: it walks up from the current
/// directory until it finds a `target` directory, then returns the directory containing it.
fn find_root() -> PathBuf {
let mut current_dir = current_dir().unwrap();
loop {
if read_dir(current_dir.join("target")).is_ok() {
return current_dir;
} else if !current_dir.pop() {
panic!("Could not find ./target");
}
}
}
/// Fetch an available TCP port on the machine for `monerod` to bind to.
fn get_available_port(already_taken: &[u16]) -> u16 {
loop {

View file

@ -1,106 +0,0 @@
//! Downloading Monerod Module
//!
//! This module handles finding the right monerod file to download, downloading it and extracting it.
//!
use std::{
env::{
consts::{ARCH, OS},
current_dir,
},
fs::read_dir,
path::{Path, PathBuf},
};
#[cfg(unix)]
use bytes::Buf;
use reqwest::{get, Error as ReqError};
use tokio::sync::Mutex;
use super::MONEROD_VERSION;
/// A mutex to make sure only one thread at a time downloads monerod.
static DOWNLOAD_MONEROD_MUTEX: Mutex<()> = Mutex::const_new(());
/// Returns the file name to download and the expected extracted folder name.
fn file_name(version: &str) -> (String, String) {
let download_file = match (OS, ARCH) {
("windows", "x64" | "x86_64") => format!("monero-win-x64-{version}.zip"),
("windows", "x86") => format!("monero-win-x86-{version}.zip"),
("linux", "x64" | "x86_64") => format!("monero-linux-x64-{version}.tar.bz2"),
("linux", "x86") => format!("monero-linux-x86-{version}.tar.bz2"),
("macos", "x64" | "x86_64") => format!("monero-mac-x64-{version}.tar.bz2"),
("macos", "aarch64") => format!("monero-mac-armv8-{version}.tar.bz2"),
_ => panic!("Can't get monerod for {OS}, {ARCH}."),
};
let extracted_dir = match (OS, ARCH) {
("windows", "x64" | "x86_64") => {
format!("monero-x86_64-w64-mingw32-{version}")
}
("windows", "x86") => format!("monero-i686-w64-mingw32-{version}"),
("linux", "x64" | "x86_64") => format!("monero-x86_64-linux-gnu-{version}"),
("linux", "x86") => format!("monero-i686-linux-gnu-{version}"),
("macos", "x64" | "x86_64") => {
format!("monero-x86_64-apple-darwin11-{version}")
}
("macos", "aarch64") => format!("monero-aarch64-apple-darwin11-{version}"),
_ => panic!("Can't get monerod for {OS}, {ARCH}."),
};
(download_file, extracted_dir)
}
/// Downloads the monerod file provided, extracts it and puts the extracted folder into `path_to_store`.
async fn download_monerod(file_name: &str, path_to_store: &Path) -> Result<(), ReqError> {
let res = get(format!("https://downloads.getmonero.org/cli/{file_name}")).await?;
let monerod_archive = res.bytes().await.unwrap();
#[cfg(unix)]
{
let bzip_decomp = bzip2::read::BzDecoder::new(monerod_archive.reader());
let mut tar_archive = tar::Archive::new(bzip_decomp);
tar_archive.unpack(path_to_store).unwrap();
}
#[cfg(windows)]
{
let mut zip = zip::ZipArchive::new(std::io::Cursor::new(monerod_archive.as_ref())).unwrap();
zip.extract(path_to_store).unwrap();
}
Ok(())
}
/// Finds the `target` directory; this walks up from the current directory until
/// it finds a `target` directory.
fn find_target() -> PathBuf {
let mut current_dir = current_dir().unwrap();
loop {
let potential_target = current_dir.join("target");
if read_dir(current_dir.join("target")).is_ok() {
return potential_target;
} else if !current_dir.pop() {
panic!("Could not find ./target");
}
}
}
/// Checks if we have monerod, downloads it if we don't, and then returns the path to it.
pub(crate) async fn check_download_monerod() -> Result<PathBuf, ReqError> {
// make sure no other threads are downloading monerod at the same time.
let _guard = DOWNLOAD_MONEROD_MUTEX.lock().await;
let path_to_store = find_target();
let (file_name, dir_name) = file_name(MONEROD_VERSION);
let path_to_monerod = path_to_store.join(dir_name);
// Check if we already have monerod
if read_dir(&path_to_monerod).is_ok() {
return Ok(path_to_monerod.join("monerod"));
}
download_monerod(&file_name, &path_to_store).await?;
Ok(path_to_monerod.join("monerod"))
}