diff --git a/Cargo.lock b/Cargo.lock index 052b1ee6..77531897 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -850,6 +850,24 @@ dependencies = [ [[package]] name = "cuprate-txpool" version = "0.0.0" +dependencies = [ + "bitflags 2.5.0", + "bytemuck", + "cuprate-database", + "cuprate-database-service", + "cuprate-helper", + "cuprate-test-utils", + "cuprate-types", + "hex", + "hex-literal", + "monero-serai", + "rayon", + "serde", + "tempfile", + "thiserror", + "tokio", + "tower", +] [[package]] name = "cuprate-types" diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 3b7f2ae1..004285d3 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -10,7 +10,7 @@ //! implement a database you need to have a service which accepts [`BlockchainReadRequest`] and responds //! with [`BlockchainResponse`]. //! -use cuprate_consensus_rules::{ConsensusError, HardFork}; +use cuprate_consensus_rules::ConsensusError; mod batch_verifier; pub mod block; @@ -27,7 +27,10 @@ pub use context::{ pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; // re-export. -pub use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +pub use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + HardFork, +}; /// An Error returned from one of the consensus services. #[derive(Debug, thiserror::Error)] diff --git a/helper/src/fs.rs b/helper/src/fs.rs index d38ee334..7290361f 100644 --- a/helper/src/fs.rs +++ b/helper/src/fs.rs @@ -163,6 +163,19 @@ impl_path_lazylock! { CUPRATE_BLOCKCHAIN_DIR, data_dir, "blockchain", + + /// Cuprate's transaction pool directory. + /// + /// This is the PATH used for any Cuprate txpool files. + /// + /// | OS | PATH | + /// |---------|------------------------------------------------------------| + /// | Windows | `C:\Users\Alice\AppData\Roaming\Cuprate\txpool\` | + /// | macOS | `/Users/Alice/Library/Application Support/Cuprate/txpool/` | + /// | Linux | `/home/alice/.local/share/cuprate/txpool/` | + CUPRATE_TXPOOL_DIR, + data_dir, + "txpool", } //---------------------------------------------------------------------------------------------------- Tests @@ -198,6 +211,10 @@ mod test { let dir = &*CUPRATE_BLOCKCHAIN_DIR; println!("cuprate_blockchain_dir: {dir:?}"); assert!(dir.ends_with(r"AppData\Roaming\Cuprate\blockchain")); + + let dir = &*CUPRATE_TXPOOL_DIR; + println!("cuprate_txpool_dir: {dir:?}"); + assert!(dir.ends_with(r"AppData\Roaming\Cuprate\txpool")); } else if cfg!(target_os = "macos") { let dir = &*CUPRATE_CACHE_DIR; println!("cuprate_cache_dir: {dir:?}"); @@ -214,6 +231,10 @@ mod test { let dir = &*CUPRATE_BLOCKCHAIN_DIR; println!("cuprate_blockchain_dir: {dir:?}"); assert!(dir.ends_with("Library/Application Support/Cuprate/blockchain")); + + let dir = &*CUPRATE_TXPOOL_DIR; + println!("cuprate_txpool_dir: {dir:?}"); + assert!(dir.ends_with("Library/Application Support/Cuprate/txpool")); } else { // Assumes Linux. let dir = &*CUPRATE_CACHE_DIR; @@ -231,6 +252,10 @@ mod test { let dir = &*CUPRATE_BLOCKCHAIN_DIR; println!("cuprate_blockchain_dir: {dir:?}"); assert!(dir.ends_with(".local/share/cuprate/blockchain")); + + let dir = &*CUPRATE_TXPOOL_DIR; + println!("cuprate_txpool_dir: {dir:?}"); + assert!(dir.ends_with(".local/share/cuprate/txpool")); } } } diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index f45f1bcb..7e79305a 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -26,8 +26,8 @@ cuprate-database-service = { path = "../service" } cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } cuprate-types = { path = "../../types", features = ["blockchain"] } -bitflags = { workspace = true, features = ["serde", "bytemuck"] } -bytemuck = { version = "1.14.3", features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } +bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] } +bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } curve25519-dalek = { workspace = true } cuprate-pruning = { path = "../../pruning" } monero-serai = { workspace = true, features = ["std"] } diff --git a/storage/blockchain/src/constants.rs b/storage/blockchain/src/constants.rs index 7f00d4cd..87268858 100644 --- a/storage/blockchain/src/constants.rs +++ b/storage/blockchain/src/constants.rs @@ -14,21 +14,6 @@ /// pub const DATABASE_VERSION: u64 = 0; -//---------------------------------------------------------------------------------------------------- Error Messages -/// Corrupt database error message. -/// -/// The error message shown to end-users in panic -/// messages if we think the database is corrupted. -/// -/// This is meant to be user-friendly. -pub const DATABASE_CORRUPT_MSG: &str = r"Cuprate has encountered a fatal error. The database may be corrupted. - -TODO: instructions on: -1. What to do -2. How to fix (re-sync, recover, etc) -3. General advice for preventing corruption -4. etc"; - //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test {} diff --git a/storage/blockchain/src/lib.rs b/storage/blockchain/src/lib.rs index ec6d0824..e544a69e 100644 --- a/storage/blockchain/src/lib.rs +++ b/storage/blockchain/src/lib.rs @@ -117,7 +117,7 @@ compile_error!("Cuprate is only compatible with 64-bit CPUs"); mod constants; mod free; -pub use constants::{DATABASE_CORRUPT_MSG, DATABASE_VERSION}; +pub use constants::DATABASE_VERSION; pub use cuprate_database; pub use free::open; diff --git a/storage/blockchain/src/tables.rs b/storage/blockchain/src/tables.rs index caac7873..122ac31b 100644 --- a/storage/blockchain/src/tables.rs +++ b/storage/blockchain/src/tables.rs @@ -28,7 +28,6 @@ use crate::types::{ // - Tables are defined in plural to avoid name conflicts with types // - If adding/changing a table also edit: // - the tests in `src/backend/tests.rs` -// - `call_fn_on_all_tables_or_early_return!()` macro in `src/open_tables.rs` cuprate_database::define_tables! { /// Serialized block blobs (bytes). /// diff --git a/storage/blockchain/src/types.rs b/storage/blockchain/src/types.rs index 9abd1750..eb1dc647 100644 --- a/storage/blockchain/src/types.rs +++ b/storage/blockchain/src/types.rs @@ -1,4 +1,4 @@ -//! Database [table](crate::tables) types. +//! Blockchain [table](crate::tables) types. //! //! This module contains all types used by the database tables, //! and aliases for common Monero-related types that use the diff --git a/storage/txpool/Cargo.toml b/storage/txpool/Cargo.toml index 536d445a..d5ea77d0 100644 --- a/storage/txpool/Cargo.toml +++ b/storage/txpool/Cargo.toml @@ -4,12 +4,40 @@ version = "0.0.0" edition = "2021" description = "Cuprate's transaction pool database" license = "MIT" -authors = ["hinto-janai"] -repository = "https://github.com/Cuprate/cuprate/tree/main/storage/cuprate-txpool" +authors = ["Boog900"] +repository = "https://github.com/Cuprate/cuprate/tree/main/storage/txpool" keywords = ["cuprate", "txpool", "transaction", "pool", "database"] [features] +default = ["heed", "service"] +# default = ["redb", "service"] +# default = ["redb-memory", "service"] +heed = ["cuprate-database/heed"] +redb = ["cuprate-database/redb"] +redb-memory = ["cuprate-database/redb-memory"] +service = ["dep:tower", "dep:rayon", "dep:cuprate-database-service"] +serde = ["dep:serde", "cuprate-database/serde", "cuprate-database-service/serde"] [dependencies] +cuprate-database = { path = "../database", features = ["heed"] } +cuprate-database-service = { path = "../service", optional = true } +cuprate-types = { path = "../../types" } +cuprate-helper = { path = "../../helper", default-features = false, features = ["constants"] } + +monero-serai = { workspace = true, features = ["std"] } +bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } +bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] } +thiserror = { workspace = true } +hex = { workspace = true } + +tower = { workspace = true, optional = true } +rayon = { workspace = true, optional = true } + +serde = { workspace = true, optional = true } [dev-dependencies] +cuprate-test-utils = { path = "../../test-utils" } + +tokio = { workspace = true } +tempfile = { workspace = true } +hex-literal = { workspace = true } diff --git a/storage/txpool/README.md b/storage/txpool/README.md new file mode 100644 index 00000000..80d3b25b --- /dev/null +++ b/storage/txpool/README.md @@ -0,0 +1,114 @@ +Cuprate's tx-pool database. + +This documentation is mostly for practical usage of `cuprate_txpool`. + +For a high-level overview, see the database section in +[Cuprate's architecture book](https://architecture.cuprate.org). + +If you're looking for a database crate, consider using the lower-level +[`cuprate-database`](https://doc.cuprate.org/cuprate_database) +crate that this crate is built on-top of. + +# Purpose + +This crate does 3 things: + +1. Uses [`cuprate_database`] as a base database layer +1. Implements various transaction pool related [operations](ops), [tables], and [types] +1. Exposes a [`tower::Service`] backed by a thread-pool + +Each layer builds on-top of the previous. + +As a user of `cuprate_txpool`, consider using the higher-level [`service`] module, +or at the very least the [`ops`] module instead of interacting with the `cuprate_database` traits directly. + +# `cuprate_database` + +Consider reading `cuprate_database`'s crate documentation before this crate, as it is the first layer. + +If/when this crate needs is used, be sure to use the version that this crate re-exports, e.g.: + +```rust +use cuprate_txpool::{ + cuprate_database::RuntimeError, +}; +``` + +This ensures the types/traits used from `cuprate_database` are the same ones used by `cuprate_txpool` internally. + +# Feature flags + +The `service` module requires the `service` feature to be enabled. +See the module for more documentation. + +Different database backends are enabled by the feature flags: + +- `heed` (LMDB) +- `redb` + +The default is `heed`. + +`tracing` is always enabled and cannot be disabled via feature-flag. + + +# Invariants when not using `service` + +See [`cuprate_blockchain`](https://doc.cuprate.org/cuprate_blockchain), the invariants are the same. + +# Examples + +The below is an example of using `cuprate_txpool`'s +lowest API, i.e. using a mix of this crate and `cuprate_database`'s traits directly - +**this is NOT recommended.** + +For examples of the higher-level APIs, see: + +- [`ops`] +- [`service`] + +```rust +use cuprate_txpool::{ + cuprate_database::{ + ConcreteEnv, + Env, EnvInner, + DatabaseRo, DatabaseRw, TxRo, TxRw, + }, + config::ConfigBuilder, + tables::{Tables, TablesMut, OpenTables}, +}; + +# fn main() -> Result<(), Box> { + // Create a configuration for the database environment. + let tmp_dir = tempfile::tempdir()?; + let db_dir = tmp_dir.path().to_owned(); + let config = ConfigBuilder::new() + .db_directory(db_dir.into()) + .build(); + + // Initialize the database environment. + let env = cuprate_txpool::open(config)?; + + // Open up a transaction + tables for writing. + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + let mut tables = env_inner.open_tables_mut(&tx_rw)?; + + // ⚠️ Write data to the tables directly. + // (not recommended, use `ops` or `service`). + const KEY_IMAGE: [u8; 32] = [88; 32]; + const TX_HASH: [u8; 32] = [88; 32]; + tables.spent_key_images_mut().put(&KEY_IMAGE, &TX_HASH)?; + + // Commit the data written. + drop(tables); + TxRw::commit(tx_rw)?; + + // Read the data, assert it is correct. + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + let (key_image, tx_hash) = tables.spent_key_images().first()?; + assert_eq!(key_image, KEY_IMAGE); + assert_eq!(tx_hash, TX_HASH); + # Ok(()) +} +``` diff --git a/storage/txpool/src/config.rs b/storage/txpool/src/config.rs new file mode 100644 index 00000000..8d09b5e5 --- /dev/null +++ b/storage/txpool/src/config.rs @@ -0,0 +1,232 @@ +//! The transaction pool [`Config`]. +use std::{borrow::Cow, path::Path}; + +use cuprate_database::{ + config::{Config as DbConfig, SyncMode}, + resize::ResizeAlgorithm, +}; +use cuprate_database_service::ReaderThreads; +use cuprate_helper::fs::CUPRATE_TXPOOL_DIR; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; + +/// The default transaction pool weight limit. +const DEFAULT_TXPOOL_WEIGHT_LIMIT: usize = 600 * 1024 * 1024; + +//---------------------------------------------------------------------------------------------------- ConfigBuilder +/// Builder for [`Config`]. +/// +// SOMEDAY: there's are many more options to add in the future. +#[derive(Debug, Clone, PartialEq, PartialOrd)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct ConfigBuilder { + /// [`Config::db_directory`]. + db_directory: Option>, + + /// [`Config::cuprate_database_config`]. + db_config: cuprate_database::config::ConfigBuilder, + + /// [`Config::reader_threads`]. + reader_threads: Option, + + /// [`Config::max_txpool_weight`]. + max_txpool_weight: Option, +} + +impl ConfigBuilder { + /// Create a new [`ConfigBuilder`]. + /// + /// [`ConfigBuilder::build`] can be called immediately + /// after this function to use default values. + pub fn new() -> Self { + Self { + db_directory: None, + db_config: cuprate_database::config::ConfigBuilder::new(Cow::Borrowed( + &*CUPRATE_TXPOOL_DIR, + )), + reader_threads: None, + max_txpool_weight: None, + } + } + + /// Build into a [`Config`]. + /// + /// # Default values + /// If [`ConfigBuilder::db_directory`] was not called, + /// the default [`CUPRATE_TXPOOL_DIR`] will be used. + /// + /// For all other values, [`Default::default`] is used. + pub fn build(self) -> Config { + // INVARIANT: all PATH safety checks are done + // in `helper::fs`. No need to do them here. + let db_directory = self + .db_directory + .unwrap_or_else(|| Cow::Borrowed(&*CUPRATE_TXPOOL_DIR)); + + let reader_threads = self.reader_threads.unwrap_or_default(); + + let max_txpool_weight = self + .max_txpool_weight + .unwrap_or(DEFAULT_TXPOOL_WEIGHT_LIMIT); + + let db_config = self + .db_config + .db_directory(db_directory) + .reader_threads(reader_threads.as_threads()) + .build(); + + Config { + db_config, + reader_threads, + max_txpool_weight, + } + } + + /// Sets a new maximum weight for the transaction pool. + #[must_use] + pub const fn max_txpool_weight(mut self, max_txpool_weight: usize) -> Self { + self.max_txpool_weight = Some(max_txpool_weight); + self + } + + /// Set a custom database directory (and file) [`Path`]. + #[must_use] + pub fn db_directory(mut self, db_directory: Cow<'static, Path>) -> Self { + self.db_directory = Some(db_directory); + self + } + + /// Calls [`cuprate_database::config::ConfigBuilder::sync_mode`]. + #[must_use] + pub fn sync_mode(mut self, sync_mode: SyncMode) -> Self { + self.db_config = self.db_config.sync_mode(sync_mode); + self + } + + /// Calls [`cuprate_database::config::ConfigBuilder::resize_algorithm`]. + #[must_use] + pub fn resize_algorithm(mut self, resize_algorithm: ResizeAlgorithm) -> Self { + self.db_config = self.db_config.resize_algorithm(resize_algorithm); + self + } + + /// Set a custom [`ReaderThreads`]. + #[must_use] + pub const fn reader_threads(mut self, reader_threads: ReaderThreads) -> Self { + self.reader_threads = Some(reader_threads); + self + } + + /// Tune the [`ConfigBuilder`] for the highest performing, + /// but also most resource-intensive & maybe risky settings. + /// + /// Good default for testing, and resource-available machines. + #[must_use] + pub fn fast(mut self) -> Self { + self.db_config = + cuprate_database::config::ConfigBuilder::new(Cow::Borrowed(&*CUPRATE_TXPOOL_DIR)) + .fast(); + + self.reader_threads = Some(ReaderThreads::OnePerThread); + self + } + + /// Tune the [`ConfigBuilder`] for the lowest performing, + /// but also least resource-intensive settings. + /// + /// Good default for resource-limited machines, e.g. a cheap VPS. + #[must_use] + pub fn low_power(mut self) -> Self { + self.db_config = + cuprate_database::config::ConfigBuilder::new(Cow::Borrowed(&*CUPRATE_TXPOOL_DIR)) + .low_power(); + + self.reader_threads = Some(ReaderThreads::One); + self + } +} + +impl Default for ConfigBuilder { + fn default() -> Self { + let db_directory = Cow::Borrowed(CUPRATE_TXPOOL_DIR.as_path()); + Self { + db_directory: Some(db_directory.clone()), + db_config: cuprate_database::config::ConfigBuilder::new(db_directory), + reader_threads: Some(ReaderThreads::default()), + max_txpool_weight: Some(DEFAULT_TXPOOL_WEIGHT_LIMIT), + } + } +} + +//---------------------------------------------------------------------------------------------------- Config +/// `cuprate_txpool` configuration. +/// +/// This is a configuration built on-top of [`DbConfig`]. +/// +/// It contains configuration specific to this crate, plus the database config. +/// +/// For construction, either use [`ConfigBuilder`] or [`Config::default`]. +#[derive(Debug, Clone, PartialEq, PartialOrd)] +pub struct Config { + /// The database configuration. + pub db_config: DbConfig, + + /// Database reader thread count. + pub reader_threads: ReaderThreads, + + /// The maximum weight of the transaction pool, after which we will start dropping transactions. + // TODO: enforce this max size. + pub max_txpool_weight: usize, +} + +impl Config { + /// Create a new [`Config`] with sane default settings. + /// + /// The [`DbConfig::db_directory`] + /// will be set to [`CUPRATE_TXPOOL_DIR`]. + /// + /// All other values will be [`Default::default`]. + /// + /// Same as [`Config::default`]. + /// + /// ```rust + /// use cuprate_database::{ + /// config::SyncMode, + /// resize::ResizeAlgorithm, + /// DATABASE_DATA_FILENAME, + /// }; + /// use cuprate_database_service::ReaderThreads; + /// use cuprate_helper::fs::*; + /// + /// use cuprate_txpool::Config; + /// + /// let config = Config::new(); + /// + /// assert_eq!(config.db_config.db_directory(), &*CUPRATE_TXPOOL_DIR); + /// assert!(config.db_config.db_file().starts_with(&*CUPRATE_TXPOOL_DIR)); + /// assert!(config.db_config.db_file().ends_with(DATABASE_DATA_FILENAME)); + /// assert_eq!(config.db_config.sync_mode, SyncMode::default()); + /// assert_eq!(config.db_config.resize_algorithm, ResizeAlgorithm::default()); + /// assert_eq!(config.reader_threads, ReaderThreads::default()); + /// ``` + pub fn new() -> Self { + Config { + db_config: DbConfig::new(Cow::Borrowed(&*CUPRATE_TXPOOL_DIR)), + reader_threads: ReaderThreads::default(), + max_txpool_weight: 0, + } + } +} + +impl Default for Config { + /// Same as [`Config::new`]. + /// + /// ```rust + /// # use cuprate_txpool::Config; + /// assert_eq!(Config::default(), Config::new()); + /// ``` + fn default() -> Self { + Self::new() + } +} diff --git a/storage/txpool/src/free.rs b/storage/txpool/src/free.rs new file mode 100644 index 00000000..d394002b --- /dev/null +++ b/storage/txpool/src/free.rs @@ -0,0 +1,62 @@ +//! General free functions (related to the tx-pool database). + +//---------------------------------------------------------------------------------------------------- Import +use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw}; + +use crate::{config::Config, tables::OpenTables}; + +//---------------------------------------------------------------------------------------------------- Free functions +/// Open the txpool database using the passed [`Config`]. +/// +/// This calls [`cuprate_database::Env::open`] and prepares the +/// database to be ready for txpool-related usage, e.g. +/// table creation, table sort order, etc. +/// +/// All tables found in [`crate::tables`] will be +/// ready for usage in the returned [`ConcreteEnv`]. +/// +/// # Errors +/// This will error if: +/// - The database file could not be opened +/// - A write transaction could not be opened +/// - A table could not be created/opened +#[cold] +#[inline(never)] // only called once +pub fn open(config: Config) -> Result { + // Attempt to open the database environment. + let env = ::open(config.db_config)?; + + /// Convert runtime errors to init errors. + /// + /// INVARIANT: + /// [`cuprate_database`]'s functions mostly return the former + /// so we must convert them. We have knowledge of which errors + /// makes sense in this functions context so we panic on + /// unexpected ones. + fn runtime_to_init_error(runtime: RuntimeError) -> InitError { + match runtime { + RuntimeError::Io(io_error) => io_error.into(), + + // These errors shouldn't be happening here. + RuntimeError::KeyExists + | RuntimeError::KeyNotFound + | RuntimeError::ResizeNeeded + | RuntimeError::TableNotFound => unreachable!(), + } + } + + // INVARIANT: We must ensure that all tables are created, + // `cuprate_database` has no way of knowing _which_ tables + // we want since it is agnostic, so we are responsible for this. + { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw().map_err(runtime_to_init_error)?; + + // Create all tables. + OpenTables::create_tables(&env_inner, &tx_rw).map_err(runtime_to_init_error)?; + + TxRw::commit(tx_rw).map_err(runtime_to_init_error)?; + } + + Ok(env) +} diff --git a/storage/txpool/src/lib.rs b/storage/txpool/src/lib.rs index 8b137891..f200c348 100644 --- a/storage/txpool/src/lib.rs +++ b/storage/txpool/src/lib.rs @@ -1 +1,15 @@ +#![doc = include_str!("../README.md")] +pub mod config; +mod free; +pub mod ops; +#[cfg(feature = "service")] +pub mod service; +pub mod tables; +pub mod types; + +pub use config::Config; +pub use free::open; + +//re-exports +pub use cuprate_database; diff --git a/storage/txpool/src/ops.rs b/storage/txpool/src/ops.rs new file mode 100644 index 00000000..50d9ea4a --- /dev/null +++ b/storage/txpool/src/ops.rs @@ -0,0 +1,102 @@ +//! Abstracted Monero tx-pool database operations. +//! +//! This module contains many free functions that use the +//! traits in [`cuprate_database`] to generically call Monero-related +//! tx-pool database operations. +//! +//! # `impl Table` +//! Functions in this module take [`Tables`](crate::tables::Tables) and +//! [`TablesMut`](crate::tables::TablesMut) directly - these are +//! _already opened_ database tables. +//! +//! As such, the responsibility of +//! transactions, tables, etc, are on the caller. +//! +//! Notably, this means that these functions are as lean +//! as possible, so calling them in a loop should be okay. +//! +//! # Atomicity +//! As transactions are handled by the _caller_ of these functions, +//! it is up to the caller to decide what happens if one them return +//! an error. +//! +//! To maintain atomicity, transactions should be [`abort`](cuprate_database::TxRw::abort)ed +//! if one of the functions failed. +//! +//! For example, if [`add_transaction`] is called and returns an [`Err`], +//! `abort`ing the transaction that opened the input `TableMut` would reverse all tables +//! mutated by [`add_transaction`] up until the error, leaving it in the state it was in before +//! [`add_transaction`] was called. +//! +//! # Example +//! Simple usage of `ops`. +//! +//! ```rust +//! use hex_literal::hex; +//! +//! use cuprate_test_utils::data::TX_V1_SIG2; +//! use cuprate_txpool::{ +//! cuprate_database::{ +//! ConcreteEnv, +//! Env, EnvInner, +//! DatabaseRo, DatabaseRw, TxRo, TxRw, +//! }, +//! config::ConfigBuilder, +//! tables::{Tables, TablesMut, OpenTables}, +//! ops::{add_transaction, get_transaction_verification_data}, +//! }; +//! +//! # fn main() -> Result<(), Box> { +//! // Create a configuration for the database environment. +//! let tmp_dir = tempfile::tempdir()?; +//! let db_dir = tmp_dir.path().to_owned(); +//! let config = ConfigBuilder::new() +//! .db_directory(db_dir.into()) +//! .build(); +//! +//! // Initialize the database environment. +//! let env = cuprate_txpool::open(config)?; +//! +//! // Open up a transaction + tables for writing. +//! let env_inner = env.env_inner(); +//! let tx_rw = env_inner.tx_rw()?; +//! let mut tables = env_inner.open_tables_mut(&tx_rw)?; +//! +//! // Write a tx to the database. +//! let mut tx = TX_V1_SIG2.clone(); +//! let tx_hash = tx.tx_hash; +//! add_transaction(&tx.try_into().unwrap(), true, &mut tables)?; +//! +//! // Commit the data written. +//! drop(tables); +//! TxRw::commit(tx_rw)?; +//! +//! // Read the data, assert it is correct. +//! let tx_rw = env_inner.tx_rw()?; +//! let mut tables = env_inner.open_tables_mut(&tx_rw)?; +//! let tx = get_transaction_verification_data(&tx_hash, &mut tables)?; +//! +//! assert_eq!(tx.tx_hash, tx_hash); +//! assert_eq!(tx.tx, TX_V1_SIG2.tx); +//! # Ok(()) } +//! ``` + +mod key_images; +mod tx_read; +mod tx_write; + +pub use tx_read::get_transaction_verification_data; +pub use tx_write::{add_transaction, remove_transaction}; + +/// An error that can occur on some tx-write ops. +#[derive(thiserror::Error, Debug)] +pub enum TxPoolWriteError { + /// The transaction could not be added as it double spends another tx in the pool. + /// + /// The inner value is the hash of the transaction that was double spent. + #[error("Transaction doubles spent transaction already in the pool ({}).", hex::encode(.0))] + DoubleSpend(crate::types::TransactionHash), + /// A database error. + #[error("Database error: {0}")] + Database(#[from] cuprate_database::RuntimeError), +} diff --git a/storage/txpool/src/ops/key_images.rs b/storage/txpool/src/ops/key_images.rs new file mode 100644 index 00000000..c6e44152 --- /dev/null +++ b/storage/txpool/src/ops/key_images.rs @@ -0,0 +1,54 @@ +//! Tx-pool key image ops. +use monero_serai::transaction::Input; + +use cuprate_database::{DatabaseRw, RuntimeError}; + +use crate::{ops::TxPoolWriteError, tables::SpentKeyImages, types::TransactionHash}; + +/// Adds the transaction key images to the [`SpentKeyImages`] table. +/// +/// This function will return an error if any of the key images are already spent. +/// +/// # Panics +/// This function will panic if any of the [`Input`]s are not [`Input::ToKey`] +pub fn add_tx_key_images( + inputs: &[Input], + tx_hash: &TransactionHash, + kis_table: &mut impl DatabaseRw, +) -> Result<(), TxPoolWriteError> { + for ki in inputs.iter().map(ki_from_input) { + if let Ok(double_spend_tx_hash) = kis_table.get(&ki) { + return Err(TxPoolWriteError::DoubleSpend(double_spend_tx_hash)); + } + + kis_table.put(&ki, tx_hash)?; + } + + Ok(()) +} + +/// Removes key images from the [`SpentKeyImages`] table. +/// +/// # Panics +/// This function will panic if any of the [`Input`]s are not [`Input::ToKey`] +pub fn remove_tx_key_images( + inputs: &[Input], + kis_table: &mut impl DatabaseRw, +) -> Result<(), RuntimeError> { + for ki in inputs.iter().map(ki_from_input) { + kis_table.delete(&ki)?; + } + + Ok(()) +} + +/// Maps an input to a key image. +/// +/// # Panics +/// This function will panic if the [`Input`] is not [`Input::ToKey`] +fn ki_from_input(input: &Input) -> [u8; 32] { + match input { + Input::ToKey { key_image, .. } => key_image.compress().0, + Input::Gen(_) => panic!("miner tx cannot be added to the txpool"), + } +} diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs new file mode 100644 index 00000000..db894151 --- /dev/null +++ b/storage/txpool/src/ops/tx_read.rs @@ -0,0 +1,36 @@ +//! Transaction read ops. +//! +//! This module handles reading full transaction data, like getting a transaction from the pool. +use std::sync::Mutex; + +use monero_serai::transaction::Transaction; + +use cuprate_database::{DatabaseRo, RuntimeError}; +use cuprate_types::{TransactionVerificationData, TxVersion}; + +use crate::{tables::Tables, types::TransactionHash}; + +/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool. +pub fn get_transaction_verification_data( + tx_hash: &TransactionHash, + tables: &impl Tables, +) -> Result { + let tx_blob = tables.transaction_blobs().get(tx_hash)?.0; + + let tx_info = tables.transaction_infos().get(tx_hash)?; + + let cached_verification_state = tables.cached_verification_state().get(tx_hash)?.into(); + + let tx = + Transaction::read(&mut tx_blob.as_slice()).expect("Tx in the tx-pool must be parseable"); + + Ok(TransactionVerificationData { + version: TxVersion::from_raw(tx.version()).expect("Tx in tx-pool has invalid version"), + tx, + tx_blob, + tx_weight: tx_info.weight, + fee: tx_info.fee, + tx_hash: *tx_hash, + cached_verification_state: Mutex::new(cached_verification_state), + }) +} diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs new file mode 100644 index 00000000..9885b9c5 --- /dev/null +++ b/storage/txpool/src/ops/tx_write.rs @@ -0,0 +1,83 @@ +//! Transaction writing ops. +//! +//! This module handles writing full transaction data, like removing or adding a transaction. +use bytemuck::TransparentWrapper; +use monero_serai::transaction::{NotPruned, Transaction}; + +use cuprate_database::{DatabaseRw, RuntimeError, StorableVec}; +use cuprate_types::TransactionVerificationData; + +use crate::{ + ops::{ + key_images::{add_tx_key_images, remove_tx_key_images}, + TxPoolWriteError, + }, + tables::TablesMut, + types::{TransactionHash, TransactionInfo, TxStateFlags}, +}; + +/// Adds a transaction to the tx-pool. +/// +/// This function fills in all tables necessary to add the transaction to the pool. +/// +/// # Panics +/// This function will panic if the transactions inputs are not all of type [`Input::ToKey`](monero_serai::transaction::Input::ToKey). +pub fn add_transaction( + tx: &TransactionVerificationData, + state_stem: bool, + tables: &mut impl TablesMut, +) -> Result<(), TxPoolWriteError> { + // Add the tx blob to table 0. + tables + .transaction_blobs_mut() + .put(&tx.tx_hash, StorableVec::wrap_ref(&tx.tx_blob))?; + + let mut flags = TxStateFlags::empty(); + flags.set(TxStateFlags::STATE_STEM, state_stem); + + // Add the tx info to table 1. + tables.transaction_infos_mut().put( + &tx.tx_hash, + &TransactionInfo { + fee: tx.fee, + weight: tx.tx_weight, + flags, + _padding: [0; 7], + }, + )?; + + // Add the cached verification state to table 2. + let cached_verification_state = (*tx.cached_verification_state.lock().unwrap()).into(); + tables + .cached_verification_state_mut() + .put(&tx.tx_hash, &cached_verification_state)?; + + // Add the tx key images to table 3. + let kis_table = tables.spent_key_images_mut(); + add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?; + + Ok(()) +} + +/// Removes a transaction from the transaction pool. +pub fn remove_transaction( + tx_hash: &TransactionHash, + tables: &mut impl TablesMut, +) -> Result<(), RuntimeError> { + // Remove the tx blob from table 0. + let tx_blob = tables.transaction_blobs_mut().take(tx_hash)?.0; + + // Remove the tx info from table 1. + tables.transaction_infos_mut().delete(tx_hash)?; + + // Remove the cached verification state from table 2. + tables.cached_verification_state_mut().delete(tx_hash)?; + + // Remove the tx key images from table 3. + let tx = Transaction::::read(&mut tx_blob.as_slice()) + .expect("Tx in the tx-pool must be parseable"); + let kis_table = tables.spent_key_images_mut(); + remove_tx_key_images(&tx.prefix().inputs, kis_table)?; + + Ok(()) +} diff --git a/storage/txpool/src/service.rs b/storage/txpool/src/service.rs new file mode 100644 index 00000000..d87adcea --- /dev/null +++ b/storage/txpool/src/service.rs @@ -0,0 +1,136 @@ +//! [`tower::Service`] integeration + thread-pool. +//! +//! ## `service` +//! The `service` module implements the [`tower`] integration, +//! along with the reader/writer thread-pool system. +//! +//! The thread-pool allows outside crates to communicate with it by +//! sending database [`Request`][req_r]s and receiving [`Response`][resp]s `async`hronously - +//! without having to actually worry and handle the database themselves. +//! +//! The system is managed by this crate, and only requires [`init`] by the user. +//! +//! This module must be enabled with the `service` feature. +//! +//! ## Handles +//! The 2 handles to the database are: +//! - [`TxpoolReadHandle`] +//! - [`TxpoolWriteHandle`] +//! +//! The 1st allows any caller to send [`ReadRequest`][req_r]s. +//! +//! The 2nd allows any caller to send [`WriteRequest`][req_w]s. +//! +//! The `DatabaseReadHandle` can be shared as it is cheaply [`Clone`]able, however, +//! the `DatabaseWriteHandle` cannot be cloned. There is only 1 place in Cuprate that +//! writes, so it is passed there and used. +//! +//! ## Initialization +//! The database & thread-pool system can be initialized with [`init()`]. +//! +//! This causes the underlying database/threads to be setup +//! and returns a read/write handle to that database. +//! +//! ## Shutdown +//! Upon the above handles being dropped, the corresponding thread(s) will automatically exit, i.e: +//! - The last [`TxpoolReadHandle`] is dropped => reader thread-pool exits +//! - The last [`TxpoolWriteHandle`] is dropped => writer thread exits +//! +//! Upon dropping the [`cuprate_database::Env`]: +//! - All un-processed database transactions are completed +//! - All data gets flushed to disk (caused by [`Drop::drop`] impl on `Env`) +//! +//! ## Request and Response +//! To interact with the database (whether reading or writing data), +//! a `Request` can be sent using one of the above handles. +//! +//! Both the handles implement `tower::Service`, so they can be [`tower::Service::call`]ed. +//! +//! An `async`hronous channel will be returned from the call. +//! This channel can be `.await`ed upon to (eventually) receive +//! the corresponding `Response` to your `Request`. +//! +//! [req_r]: interface::TxpoolReadRequest +//! +//! [req_w]: interface::TxpoolWriteRequest +//! +//! // TODO: we have 2 responses +//! +//! [resp]: interface::TxpoolWriteResponse +//! +//! # Example +//! Simple usage of `service`. +//! +//! ```rust +//! use std::sync::Arc; +//! +//! use hex_literal::hex; +//! use tower::{Service, ServiceExt}; +//! +//! use cuprate_test_utils::data::TX_V1_SIG2; +//! +//! use cuprate_txpool::{ +//! cuprate_database::Env, +//! config::ConfigBuilder, +//! service::interface::{ +//! TxpoolWriteRequest, +//! TxpoolWriteResponse, +//! TxpoolReadRequest, +//! TxpoolReadResponse +//! } +//! }; +//! +//! # #[tokio::main] +//! # async fn main() -> Result<(), Box> { +//! // Create a configuration for the database environment. +//! use cuprate_test_utils::data::TX_V1_SIG2; +//! let tmp_dir = tempfile::tempdir()?; +//! let db_dir = tmp_dir.path().to_owned(); +//! let config = ConfigBuilder::new() +//! .db_directory(db_dir.into()) +//! .build(); +//! +//! // Initialize the database thread-pool. +//! let (mut read_handle, mut write_handle, _) = cuprate_txpool::service::init(config)?; +//! +//! // Prepare a request to write block. +//! let tx = TX_V1_SIG2.clone(); +//! let request = TxpoolWriteRequest::AddTransaction { +//! tx: Arc::new(tx.try_into().unwrap()), +//! state_stem: false, +//! }; +//! +//! // Send the request. +//! // We receive back an `async` channel that will +//! // eventually yield the result when `service` +//! // is done writing the tx. +//! let response_channel = write_handle.ready().await?.call(request); +//! +//! // Block write was OK. +//! let TxpoolWriteResponse::AddTransaction(double_spent) = response_channel.await? else { +//! panic!("tx-pool returned wrong response!"); +//! }; +//! assert!(double_spent.is_none()); +//! +//! // Now, let's try getting the block hash +//! // of the block we just wrote. +//! let request = TxpoolReadRequest::TxBlob(TX_V1_SIG2.tx_hash); +//! let response_channel = read_handle.ready().await?.call(request); +//! let response = response_channel.await?; +//! +//! // This causes the writer thread on the +//! // other side of this handle to exit... +//! drop(write_handle); +//! // ...and this causes the reader thread-pool to exit. +//! drop(read_handle); +//! # Ok(()) } +//! ``` + +mod free; +pub mod interface; +mod read; +mod types; +mod write; + +pub use free::init; +pub use types::{TxpoolReadHandle, TxpoolWriteHandle}; diff --git a/storage/txpool/src/service/free.rs b/storage/txpool/src/service/free.rs new file mode 100644 index 00000000..614ab5c4 --- /dev/null +++ b/storage/txpool/src/service/free.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use cuprate_database::{ConcreteEnv, InitError}; + +use crate::{ + service::{ + read::init_read_service, + types::{TxpoolReadHandle, TxpoolWriteHandle}, + write::init_write_service, + }, + Config, +}; + +//---------------------------------------------------------------------------------------------------- Init +#[cold] +#[inline(never)] // Only called once (?) +/// Initialize a database & thread-pool, and return a read/write handle to it. +/// +/// Once the returned handles are [`Drop::drop`]ed, the reader +/// thread-pool and writer thread will exit automatically. +/// +/// # Errors +/// This will forward the error if [`crate::open`] failed. +pub fn init( + config: Config, +) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc), InitError> { + let reader_threads = config.reader_threads; + + // Initialize the database itself. + let db = Arc::new(crate::open(config)?); + + // Spawn the Reader thread pool and Writer. + let readers = init_read_service(db.clone(), reader_threads); + let writer = init_write_service(db.clone()); + + Ok((readers, writer, db)) +} diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs new file mode 100644 index 00000000..93235c00 --- /dev/null +++ b/storage/txpool/src/service/interface.rs @@ -0,0 +1,59 @@ +//! Tx-pool [`service`](super) interface. +//! +//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. +use std::sync::Arc; + +use cuprate_types::TransactionVerificationData; + +use crate::types::TransactionHash; + +//---------------------------------------------------------------------------------------------------- TxpoolReadRequest +/// The transaction pool [`tower::Service`] read request type. +pub enum TxpoolReadRequest { + /// A request for the blob (raw bytes) of a transaction with the given hash. + TxBlob(TransactionHash), + /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. + TxVerificationData(TransactionHash), +} + +//---------------------------------------------------------------------------------------------------- TxpoolReadResponse +/// The transaction pool [`tower::Service`] read response type. +#[allow(clippy::large_enum_variant)] +pub enum TxpoolReadResponse { + /// A response containing the raw bytes of a transaction. + // TODO: use bytes::Bytes. + TxBlob(Vec), + /// A response of [`TransactionVerificationData`]. + TxVerificationData(TransactionVerificationData), +} + +//---------------------------------------------------------------------------------------------------- TxpoolWriteRequest +/// The transaction pool [`tower::Service`] write request type. +pub enum TxpoolWriteRequest { + /// Add a transaction to the pool. + /// + /// Returns [`TxpoolWriteResponse::AddTransaction`]. + AddTransaction { + /// The tx to add. + tx: Arc, + /// A [`bool`] denoting the routing state of this tx. + /// + /// [`true`] if this tx is in the stem state. + state_stem: bool, + }, + /// Remove a transaction with the given hash from the pool. + /// + /// Returns [`TxpoolWriteResponse::Ok`]. + RemoveTransaction(TransactionHash), +} + +//---------------------------------------------------------------------------------------------------- TxpoolWriteResponse +/// The transaction pool [`tower::Service`] write response type. +#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum TxpoolWriteResponse { + /// A [`TxpoolWriteRequest::AddTransaction`] response. + /// + /// If the inner value is [`Some`] the tx was not added to the pool as it double spends a tx with the given hash. + AddTransaction(Option), + Ok, +} diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs new file mode 100644 index 00000000..c2fee66d --- /dev/null +++ b/storage/txpool/src/service/read.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +use rayon::ThreadPool; + +use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner}; +use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; + +use crate::{ + ops::get_transaction_verification_data, + service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse}, + types::{ReadResponseResult, TxpoolReadHandle}, + }, + tables::{OpenTables, TransactionBlobs}, + types::TransactionHash, +}; + +// TODO: update the docs here +//---------------------------------------------------------------------------------------------------- init_read_service +/// Initialize the [`TxpoolReadHandle`] thread-pool backed by `rayon`. +/// +/// This spawns `threads` amount of reader threads +/// attached to `env` and returns a handle to the pool. +/// +/// Should be called _once_ per actual database. +#[cold] +#[inline(never)] // Only called once. +pub fn init_read_service(env: Arc, threads: ReaderThreads) -> TxpoolReadHandle { + init_read_service_with_pool(env, init_thread_pool(threads)) +} + +/// Initialize the [`TxpoolReadHandle`], with a specific rayon thread-pool instead of +/// creating a new one. +/// +/// Should be called _once_ per actual database. +#[cold] +#[inline(never)] // Only called once. +pub fn init_read_service_with_pool( + env: Arc, + pool: Arc, +) -> TxpoolReadHandle { + DatabaseReadService::new(env, pool, map_request) +} + +//---------------------------------------------------------------------------------------------------- Request Mapping +// This function maps [`Request`]s to function calls +// executed by the rayon DB reader threadpool. + +/// Map [`TxpoolReadRequest`]'s to specific database handler functions. +/// +/// This is the main entrance into all `Request` handler functions. +/// The basic structure is: +/// 1. `Request` is mapped to a handler function +/// 2. Handler function is called +/// 3. [`TxpoolReadResponse`] is returned +fn map_request( + env: &ConcreteEnv, // Access to the database + request: TxpoolReadRequest, // The request we must fulfill +) -> ReadResponseResult { + match request { + TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), + TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), + } +} + +//---------------------------------------------------------------------------------------------------- Handler functions +// These are the actual functions that do stuff according to the incoming [`TxpoolReadRequest`]. +// +// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to +// the enum variant name, e.g: `TxBlob` -> `tx_blob`. +// +// Each function will return the [`TxpoolReadResponse`] that we +// should send back to the caller in [`map_request()`]. +// +// INVARIANT: +// These functions are called above in `tower::Service::call()` +// using a custom threadpool which means any call to `par_*()` functions +// will be using the custom rayon DB reader thread-pool, not the global one. +// +// All functions below assume that this is the case, such that +// `par_*()` functions will not block the _global_ rayon thread-pool. + +/// [`TxpoolReadRequest::TxBlob`]. +#[inline] +fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tx_blobs_table = inner_env.open_db_ro::(&tx_ro)?; + + tx_blobs_table + .get(tx_hash) + .map(|blob| TxpoolReadResponse::TxBlob(blob.0)) +} + +/// [`TxpoolReadRequest::TxVerificationData`]. +#[inline] +fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tables = inner_env.open_tables(&tx_ro)?; + + get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData) +} diff --git a/storage/txpool/src/service/types.rs b/storage/txpool/src/service/types.rs new file mode 100644 index 00000000..5c6b97ce --- /dev/null +++ b/storage/txpool/src/service/types.rs @@ -0,0 +1,21 @@ +//! Database service type aliases. +//! +//! Only used internally for our [`tower::Service`] impls. + +use cuprate_database::RuntimeError; +use cuprate_database_service::{DatabaseReadService, DatabaseWriteHandle}; + +use crate::service::interface::{ + TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse, +}; + +/// The actual type of the response. +/// +/// Either our [`TxpoolReadResponse`], or a database error occurred. +pub(super) type ReadResponseResult = Result; + +/// The transaction pool database write service. +pub type TxpoolWriteHandle = DatabaseWriteHandle; + +/// The transaction pool database read service. +pub type TxpoolReadHandle = DatabaseReadService; diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs new file mode 100644 index 00000000..f6bdb385 --- /dev/null +++ b/storage/txpool/src/service/write.rs @@ -0,0 +1,103 @@ +use std::sync::Arc; + +use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw}; +use cuprate_database_service::DatabaseWriteHandle; +use cuprate_types::TransactionVerificationData; + +use crate::{ + ops::{self, TxPoolWriteError}, + service::{ + interface::{TxpoolWriteRequest, TxpoolWriteResponse}, + types::TxpoolWriteHandle, + }, + tables::OpenTables, + types::TransactionHash, +}; + +//---------------------------------------------------------------------------------------------------- init_write_service +/// Initialize the txpool write service from a [`ConcreteEnv`]. +pub fn init_write_service(env: Arc) -> TxpoolWriteHandle { + DatabaseWriteHandle::init(env, handle_txpool_request) +} + +//---------------------------------------------------------------------------------------------------- handle_txpool_request +/// Handle an incoming [`TxpoolWriteRequest`], returning a [`TxpoolWriteResponse`]. +fn handle_txpool_request( + env: &ConcreteEnv, + req: &TxpoolWriteRequest, +) -> Result { + match req { + TxpoolWriteRequest::AddTransaction { tx, state_stem } => { + add_transaction(env, tx, *state_stem) + } + TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash), + } +} + +//---------------------------------------------------------------------------------------------------- Handler functions +// These are the actual functions that do stuff according to the incoming [`TxpoolWriteRequest`]. +// +// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to +// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`. +// +// Each function will return the [`Response`] that we +// should send back to the caller in [`map_request()`]. + +/// [`TxpoolWriteRequest::AddTransaction`] +fn add_transaction( + env: &ConcreteEnv, + tx: &TransactionVerificationData, + state_stem: bool, +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?; + + if let Err(e) = ops::add_transaction(tx, state_stem, &mut tables_mut) { + drop(tables_mut); + // error adding the tx, abort the DB transaction. + TxRw::abort(tx_rw) + .expect("could not maintain database atomicity by aborting write transaction"); + + return match e { + TxPoolWriteError::DoubleSpend(tx_hash) => { + // If we couldn't add the tx due to a double spend still return ok, but include the tx + // this double spent. + // TODO: mark the double spent tx? + Ok(TxpoolWriteResponse::AddTransaction(Some(tx_hash))) + } + TxPoolWriteError::Database(e) => Err(e), + }; + }; + + drop(tables_mut); + // The tx was added to the pool successfully. + TxRw::commit(tx_rw)?; + Ok(TxpoolWriteResponse::AddTransaction(None)) +} + +/// [`TxpoolWriteRequest::RemoveTransaction`] +fn remove_transaction( + env: &ConcreteEnv, + tx_hash: &TransactionHash, +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?; + + if let Err(e) = ops::remove_transaction(tx_hash, &mut tables_mut) { + drop(tables_mut); + // error removing the tx, abort the DB transaction. + TxRw::abort(tx_rw) + .expect("could not maintain database atomicity by aborting write transaction"); + + return Err(e); + } + + drop(tables_mut); + + TxRw::commit(tx_rw)?; + Ok(TxpoolWriteResponse::Ok) +} diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs new file mode 100644 index 00000000..dbb686ae --- /dev/null +++ b/storage/txpool/src/tables.rs @@ -0,0 +1,45 @@ +//! Tx-pool Database tables. +//! +//! # Table marker structs +//! This module contains all the table definitions used by [`cuprate_txpool`](crate). +//! +//! The zero-sized structs here represents the table type; +//! they all are essentially marker types that implement [`cuprate_database::Table`]. +//! +//! Table structs are `CamelCase`, and their static string +//! names used by the actual database backend are `snake_case`. +//! +//! For example: [`TransactionBlobs`] -> `transaction_blobs`. +//! +//! # Traits +//! This module also contains a set of traits for +//! accessing _all_ tables defined here at once. +use cuprate_database::{define_tables, StorableVec}; + +use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo}; + +define_tables! { + /// Serialized transaction blobs. + /// + /// This table contains the transaction blobs of all the transactions in the pool. + 0 => TransactionBlobs, + TransactionHash => StorableVec, + + /// Transaction information. + /// + /// This table contains information of all transactions currently in the pool. + 1 => TransactionInfos, + TransactionHash => TransactionInfo, + + /// Cached transaction verification state. + /// + /// This table contains the cached verification state of all translations in the pool. + 2 => CachedVerificationState, + TransactionHash => RawCachedVerificationState, + + /// Spent key images. + /// + /// This table contains the spent key images from all transactions in the pool. + 3 => SpentKeyImages, + KeyImage => TransactionHash +} diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs new file mode 100644 index 00000000..5c89d3b9 --- /dev/null +++ b/storage/txpool/src/types.rs @@ -0,0 +1,124 @@ +//! Tx-pool [table](crate::tables) types. +//! +//! This module contains all types used by the database tables, +//! and aliases for common types that use the same underlying +//! primitive type. +//! +//! +use bytemuck::{Pod, Zeroable}; + +use monero_serai::transaction::Timelock; + +use cuprate_types::{CachedVerificationState, HardFork}; + +/// An inputs key image. +pub type KeyImage = [u8; 32]; + +/// A transaction hash. +pub type TransactionHash = [u8; 32]; + +bitflags::bitflags! { + /// Flags representing the state of the transaction in the pool. + #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] + #[repr(transparent)] + pub struct TxStateFlags: u8 { + /// A flag for if the transaction is in the stem state. + const STATE_STEM = 0b0000_0001; + /// A flag for if we have seen another tx double spending this tx. + const DOUBLE_SPENT = 0b0000_0010; + } +} + +/// Information on a tx-pool transaction. +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] +#[repr(C)] +pub struct TransactionInfo { + /// The transaction's fee. + pub fee: u64, + /// The transaction`s weight. + pub weight: usize, + /// [`TxStateFlags`] of this transaction. + pub flags: TxStateFlags, + /// Explicit padding so that we have no implicit padding bytes in `repr(C)`. + /// + /// Allows potential future expansion of this type. + pub _padding: [u8; 7], +} + +/// [`CachedVerificationState`] in a format that can be stored into the database. +/// +/// This type impls [`Into`] & [`From`] [`CachedVerificationState`]. +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] +#[repr(C)] +pub struct RawCachedVerificationState { + /// The raw hash, will be all `0`s if there is no block hash that this is valid for. + raw_valid_at_hash: [u8; 32], + /// The raw hard-fork, will be `0` if there is no hf this was validated at. + raw_hf: u8, + /// The raw [`u64`] timestamp as little endian bytes ([`u64::to_le_bytes`]). + /// + /// This will be `0` if there is no timestamp that needs to be passed for this to + /// be valid. + /// + /// Not a [`u64`] as if it was this type would have an alignment requirement. + raw_valid_past_timestamp: [u8; 8], +} + +impl From for CachedVerificationState { + fn from(value: RawCachedVerificationState) -> Self { + // if the hash is all `0`s then there is no hash this is valid at. + if value.raw_valid_at_hash == [0; 32] { + return CachedVerificationState::NotVerified; + } + + let raw_valid_past_timestamp = u64::from_le_bytes(value.raw_valid_past_timestamp); + + // if the timestamp is 0, there is no timestamp that needs to be passed. + if raw_valid_past_timestamp == 0 { + return CachedVerificationState::ValidAtHashAndHF { + block_hash: value.raw_valid_at_hash, + hf: HardFork::from_version(value.raw_hf) + .expect("hard-fork values stored in the DB should always be valid"), + }; + } + + CachedVerificationState::ValidAtHashAndHFWithTimeBasedLock { + block_hash: value.raw_valid_at_hash, + hf: HardFork::from_version(value.raw_hf) + .expect("hard-fork values stored in the DB should always be valid"), + time_lock: Timelock::Time(raw_valid_past_timestamp), + } + } +} + +impl From for RawCachedVerificationState { + fn from(value: CachedVerificationState) -> Self { + match value { + CachedVerificationState::NotVerified => Self { + raw_valid_at_hash: [0; 32], + raw_hf: 0, + raw_valid_past_timestamp: [0; 8], + }, + CachedVerificationState::ValidAtHashAndHF { block_hash, hf } => Self { + raw_valid_at_hash: block_hash, + raw_hf: hf.as_u8(), + raw_valid_past_timestamp: [0; 8], + }, + CachedVerificationState::ValidAtHashAndHFWithTimeBasedLock { + block_hash, + hf, + time_lock, + } => { + let Timelock::Time(time) = time_lock else { + panic!("ValidAtHashAndHFWithTimeBasedLock timelock was not time-based"); + }; + + Self { + raw_valid_at_hash: block_hash, + raw_hf: hf.as_u8(), + raw_valid_past_timestamp: time.to_le_bytes(), + } + } + } + } +} diff --git a/types/src/transaction_verification_data.rs b/types/src/transaction_verification_data.rs index 68e17b81..3dfe5fdf 100644 --- a/types/src/transaction_verification_data.rs +++ b/types/src/transaction_verification_data.rs @@ -4,7 +4,7 @@ use std::sync::Mutex; use monero_serai::transaction::{Timelock, Transaction}; -use crate::HardFork; +use crate::{HardFork, VerifiedTransactionInformation}; /// An enum representing all valid Monero transaction versions. #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] @@ -92,3 +92,23 @@ pub struct TransactionVerificationData { /// The verification state of this transaction. pub cached_verification_state: Mutex, } + +#[derive(Debug, Copy, Clone, thiserror::Error)] +#[error("Error converting a verified tx to a cached verification data tx.")] +pub struct TxConversionError; + +impl TryFrom for TransactionVerificationData { + type Error = TxConversionError; + + fn try_from(value: VerifiedTransactionInformation) -> Result { + Ok(Self { + version: TxVersion::from_raw(value.tx.version()).ok_or(TxConversionError)?, + tx: value.tx, + tx_blob: value.tx_blob, + tx_weight: value.tx_weight, + fee: value.fee, + tx_hash: value.tx_hash, + cached_verification_state: Mutex::new(CachedVerificationState::NotVerified), + }) + } +}