add read & write svcs
Some checks failed
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled

This commit is contained in:
Boog900 2024-08-03 01:25:19 +01:00
parent b1f41f17f4
commit 251c467931
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
11 changed files with 176 additions and 20 deletions

2
Cargo.lock generated
View file

@ -806,12 +806,14 @@ name = "cuprate-txpool"
version = "0.0.0"
dependencies = [
"bytemuck",
"cuprate-dandelion-tower",
"cuprate-database",
"cuprate-database-service",
"cuprate-helper",
"cuprate-types",
"hex",
"monero-serai",
"rayon",
"thiserror",
"tower",
]

View file

@ -15,6 +15,7 @@ cuprate-database = { path = "../database" }
cuprate-database-service = { path = "../service" }
cuprate-types = { path = "../../types" }
cuprate-helper = { path = "../../helper" , default-features = false, features = ["constants"] }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" }
monero-serai = { workspace = true, features = ["std"] }
bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] }
@ -22,4 +23,6 @@ thiserror = { workspace = true }
hex = { workspace = true }
tower = { workspace = true }
rayon = { workspace = true }
[dev-dependencies]

View file

@ -1,13 +1,5 @@
use crate::ops::key_images::{add_tx_key_images, remove_tx_key_images};
use crate::tables::TablesMut;
use crate::types::{TransactionHash, TransactionInfo};
use crate::TxPoolWriteError;
use bytemuck::TransparentWrapper;
use cuprate_database::{RuntimeError, StorableVec};
use cuprate_types::TransactionVerificationData;
use monero_serai::transaction::{Input, Transaction};
use std::sync::{Arc, Mutex};
mod key_images;
mod tx_read;
mod tx_write;
pub use tx_read::get_transaction_verification_data;

View file

@ -1,4 +1,4 @@
use crate::tables::{TablesMut, TransactionBlobs};
use crate::tables::{Tables, TransactionBlobs};
use crate::types::TransactionHash;
use cuprate_database::{DatabaseRw, RuntimeError};
use cuprate_types::TransactionVerificationData;
@ -8,7 +8,7 @@ use std::sync::Mutex;
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
pub fn get_transaction_verification_data(
tx_hash: &TransactionHash,
tables: &mut impl TablesMut,
tables: &impl Tables,
) -> Result<TransactionVerificationData, RuntimeError> {
let tx_blob = tables.transaction_blobs_mut().get(tx_hash)?.0;

View file

@ -1,3 +1,5 @@
mod free;
mod interface;
mod read;
mod types;
mod write;

View file

@ -2,7 +2,8 @@ use std::sync::Arc;
use cuprate_database::{ConcreteEnv, InitError};
use crate::service::types::{TxPoolReadHandle, TxPoolWriteHandle};
use crate::service::read::init_read_service;
use crate::service::types::{TxpoolReadHandle, TxpoolWriteHandle};
use crate::Config;
//---------------------------------------------------------------------------------------------------- Init
@ -17,7 +18,7 @@ use crate::Config;
/// This will forward the error if [`crate::open`] failed.
pub fn init(
config: Config,
) -> Result<(TxPoolReadHandle, TxPoolWriteHandle, Arc<ConcreteEnv>), InitError> {
) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc<ConcreteEnv>), InitError> {
let reader_threads = config.reader_threads;
// Initialize the database itself.

View file

@ -1,4 +1,5 @@
use crate::types::TransactionHash;
use cuprate_dandelion_tower::State;
use cuprate_types::TransactionVerificationData;
use std::sync::Arc;
@ -9,6 +10,8 @@ pub enum TxpoolReadRequest {
TxBlob(TransactionHash),
/// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
TxVerificationData(TransactionHash),
/// Returns if we have a transaction in the pool.
TxInPool(TransactionHash),
}
//---------------------------------------------------------------------------------------------------- TxpoolReadResponse
@ -18,17 +21,20 @@ pub enum TxpoolReadResponse {
TxBlob(Vec<u8>),
/// A response of [`TransactionVerificationData`].
TxVerificationData(TransactionVerificationData),
TxInPool(Option<State>),
}
//---------------------------------------------------------------------------------------------------- TxpoolWriteRequest
pub enum TxpoolWriteRequest {
AddTransaction {
tx: Arc<TransactionVerificationData>,
state_fluff: bool,
},
AddTransaction(NewTransaction),
RemoveTransaction(TransactionHash),
PromoteTransactionToFluffPool(TransactionHash),
}
//---------------------------------------------------------------------------------------------------- TxpoolWriteResponse
pub enum TxpoolWriteResponse {}
pub struct NewTransaction {
tx: Arc<TransactionVerificationData>,
dpp_state: State,
}

View file

@ -0,0 +1,100 @@
use std::sync::Arc;
use rayon::ThreadPool;
use crate::ops::get_transaction_verification_data;
use crate::service::interface::{TxpoolReadRequest, TxpoolReadResponse};
use crate::service::types::{ReadResponseResult, TxpoolReadHandle};
use crate::tables::{OpenTables, TransactionBlobs};
use crate::types::TransactionHash;
use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
// TODO: update the docs here
//---------------------------------------------------------------------------------------------------- init_read_service
/// Initialize the [`BCReadHandle`] thread-pool backed by `rayon`.
///
/// This spawns `threads` amount of reader threads
/// attached to `env` and returns a handle to the pool.
///
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
pub fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -> TxpoolReadHandle {
init_read_service_with_pool(env, init_thread_pool(threads))
}
/// Initialize the blockchain database read service, with a specific rayon thread-pool instead of
/// creating a new one.
///
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
pub fn init_read_service_with_pool(
env: Arc<ConcreteEnv>,
pool: Arc<ThreadPool>,
) -> TxpoolReadHandle {
DatabaseReadService::new(env, pool, map_request)
}
//---------------------------------------------------------------------------------------------------- Request Mapping
// This function maps [`Request`]s to function calls
// executed by the rayon DB reader threadpool.
/// Map [`TxpoolReadRequest`]'s to specific database handler functions.
///
/// This is the main entrance into all `Request` handler functions.
/// The basic structure is:
/// 1. `Request` is mapped to a handler function
/// 2. Handler function is called
/// 3. [`TxpoolReadResponse`] is returned
fn map_request(
env: &ConcreteEnv, // Access to the database
request: TxpoolReadRequest, // The request we must fulfill
) -> ReadResponseResult {
match request {
TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
}
}
//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`TxpoolReadRequest`].
//
// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
// the enum variant name, e.g: `TxBlob` -> `tx_blob`.
//
// Each function will return the [`TxpoolReadResponse`] that we
// should send back to the caller in [`map_request()`].
//
// INVARIANT:
// These functions are called above in `tower::Service::call()`
// using a custom threadpool which means any call to `par_*()` functions
// will be using the custom rayon DB reader thread-pool, not the global one.
//
// All functions below assume that this is the case, such that
// `par_*()` functions will not block the _global_ rayon thread-pool.
/// [`TxpoolReadRequest::TxBlob`].
#[inline]
fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
tx_blobs_table
.get(tx_hash)
.map(|blob| TxpoolReadResponse::TxBlob(blob.0))
}
/// [`TxpoolReadRequest::TxVerificationData`].
#[inline]
fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tables = inner_env.open_tables(&tx_ro)?;
get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
}

View file

@ -2,14 +2,20 @@
//!
//! Only used internally for our [`tower::Service`] impls.
use cuprate_database::RuntimeError;
use cuprate_database_service::{DatabaseReadService, DatabaseWriteHandle};
use crate::service::interface::{
TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
};
/// The actual type of the response.
///
/// Either our [`TxpoolReadResponse`], or a database error occurred.
pub(super) type ReadResponseResult = Result<TxpoolReadResponse, RuntimeError>;
/// The transaction pool database write service.
pub type TxPoolWriteHandle = DatabaseWriteHandle<TxpoolWriteRequest, TxpoolWriteResponse>;
pub type TxpoolWriteHandle = DatabaseWriteHandle<TxpoolWriteRequest, TxpoolWriteResponse>;
/// The transaction pool database read service.
pub type TxPoolReadHandle = DatabaseReadService<TxpoolReadRequest, TxpoolReadResponse>;
pub type TxpoolReadHandle = DatabaseReadService<TxpoolReadRequest, TxpoolReadResponse>;

View file

@ -0,0 +1,39 @@
use cuprate_database::{ConcreteEnv, RuntimeError};
use cuprate_database_service::DatabaseWriteHandle;
use std::sync::Arc;
use cuprate_types::blockchain::BCWriteRequest;
use crate::{
service::{
interface::{TxpoolWriteRequest, TxpoolWriteResponse},
types::TxpoolWriteHandle,
},
TxPoolWriteError,
};
//---------------------------------------------------------------------------------------------------- init_write_service
/// Initialize the txpool write service from a [`ConcreteEnv`].
pub fn init_write_service(env: Arc<ConcreteEnv>) -> TxpoolWriteHandle {
DatabaseWriteHandle::init(env, handle_txpool_request)
}
//---------------------------------------------------------------------------------------------------- handle_txpool_request
/// Handle an incoming [`TxpoolWriteRequest`], returning a [`TxpoolWriteResponse`].
fn handle_txpool_request(
env: &ConcreteEnv,
req: &TxpoolWriteRequest,
) -> Result<TxpoolWriteResponse, TxPoolWriteError> {
match req {
TxpoolWriteRequest::AddTransaction(tx) =>
}
}
//---------------------------------------------------------------------------------------------------- Handler functions
// These are the actual functions that do stuff according to the incoming [`TxpoolWriteRequest`].
//
// Each function name is a 1-1 mapping (from CamelCase -> snake_case) to
// the enum variant name, e.g: `BlockExtendedHeader` -> `block_extended_header`.
//
// Each function will return the [`Response`] that we
// should send back to the caller in [`map_request()`].
/// [`BCWriteRequest::WriteBlock`].

View file

@ -1,5 +1,6 @@
use bytemuck::{Pod, Zeroable};
use cuprate_dandelion_tower::State;
use cuprate_types::{CachedVerificationState, HardFork};
use monero_serai::transaction::Timelock;
@ -17,6 +18,10 @@ pub struct TransactionInfo {
pub double_spend_seen: bool,
}
impl TransactionInfo {
pub fn dpp_state(&self) -> State {}
}
/// [`CachedVerificationState`] in a format that can be stored into the database.
///
/// This type impls [`Into`] & [`From`] [`CachedVerificationState`].