diff --git a/Cargo.lock b/Cargo.lock index 0436f69..271a776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -911,6 +911,7 @@ dependencies = [ "monero-serai", "rayon", "serde", + "sha3", "tempfile", "thiserror", "tokio", @@ -1010,6 +1011,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", + "sha3", "thiserror", "thread_local", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6c322fb..d0dc418 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ rayon = { version = "1.10.0", default-features = false } serde_bytes = { version = "0.11.15", default-features = false } serde_json = { version = "1.0.128", default-features = false } serde = { version = "1.0.210", default-features = false } +sha3 = { version = "0.10.8", default-features = false } strum = { version = "0.26.3", default-features = false } thiserror = { version = "1.0.63", default-features = false } thread_local = { version = "1.1.8", default-features = false } diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 325406b..c8a4247 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -20,7 +20,7 @@ cuprate-levin = { path = "../../net/levin" } cuprate-wire = { path = "../../net/wire" } cuprate-p2p = { path = "../../p2p/p2p" } cuprate-p2p-core = { path = "../../p2p/p2p-core" } -cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } +cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower", features = ["txpool"] } cuprate-async-buffer = { path = "../../p2p/async-buffer" } cuprate-address-book = { path = "../../p2p/address-book" } cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] } @@ -64,6 +64,7 @@ rayon = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } +sha3 = { workspace = true, features = ["std"] } thiserror = { workspace = true } thread_local = { workspace = true } tokio-util = { workspace = true } diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index a06f3fa..c4b75e4 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -25,7 +25,7 @@ mod manager; mod syncer; mod types; -use types::{ +pub use types::{ ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 3a8797e..7f7dfd3 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -15,9 +15,11 @@ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; +use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; +use cuprate_txpool::service::TxpoolReadHandle; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, TransactionVerificationData, + Chain, }; use crate::{ @@ -38,7 +40,7 @@ pub enum IncomingBlockError { /// /// The inner values are the block hash and the indexes of the missing txs in the block. #[error("Unknown transactions in block.")] - UnknownTransactions([u8; 32], Vec), + UnknownTransactions([u8; 32], Vec), /// We are missing the block's parent. #[error("The block has an unknown parent.")] Orphan, @@ -59,8 +61,9 @@ pub enum IncomingBlockError { /// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, - given_txs: Vec, + mut given_txs: HashMap<[u8; 32], Transaction>, blockchain_read_handle: &mut BlockchainReadHandle, + txpool_read_handle: &mut TxpoolReadHandle, ) -> Result { /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. /// @@ -72,7 +75,12 @@ pub async fn handle_incoming_block( /// which are also more expensive than `Mutex`s. static BLOCKS_BEING_HANDLED: LazyLock>> = LazyLock::new(|| Mutex::new(HashSet::new())); - // FIXME: we should look in the tx-pool for txs when that is ready. + + if given_txs.len() > block.transactions.len() { + return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!( + "Too many transactions given for block" + ))); + } if !block_exists(block.header.previous, blockchain_read_handle) .await @@ -90,23 +98,32 @@ pub async fn handle_incoming_block( return Ok(IncomingBlockOk::AlreadyHave); } - // TODO: remove this when we have a working tx-pool. - if given_txs.len() != block.transactions.len() { - return Err(IncomingBlockError::UnknownTransactions( - block_hash, - (0..usize_to_u64(block.transactions.len())).collect(), - )); - } + let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone())) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; - // TODO: check we actually got given the right txs. - let prepped_txs = given_txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok((tx.tx_hash, tx)) - }) - .collect::>() - .map_err(IncomingBlockError::InvalidBlock)?; + if !missing.is_empty() { + let needed_hashes = missing.iter().map(|index| block.transactions[*index]); + + for needed_hash in needed_hashes { + let Some(tx) = given_txs.remove(&needed_hash) else { + return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); + }; + + txs.insert( + needed_hash, + new_tx_verification_data(tx) + .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?, + ); + } + } let Some(incoming_block_tx) = COMMAND_TX.get() else { // We could still be starting up the blockchain manager. @@ -126,7 +143,7 @@ pub async fn handle_incoming_block( incoming_block_tx .send(BlockchainManagerCommand::AddBlock { block, - prepped_txs: todo!(), + prepped_txs: txs, response_tx, }) .await diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 118c8dd..75d15b0 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -36,6 +36,7 @@ mod commands; mod handler; pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; +use cuprate_txpool::service::TxpoolWriteHandle; /// Initialize the blockchain manager. /// @@ -45,6 +46,7 @@ pub async fn init_blockchain_manager( clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, + txpool_write_handle: TxpoolWriteHandle, mut blockchain_context_service: BlockChainContextService, block_verifier_service: ConcreteBlockVerifierService, block_downloader_config: BlockDownloaderConfig, @@ -79,6 +81,7 @@ pub async fn init_blockchain_manager( let manager = BlockchainManager { blockchain_write_handle, blockchain_read_handle, + txpool_write_handle, blockchain_context_service, cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), block_verifier_service, @@ -101,6 +104,8 @@ pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, /// A [`BlockchainReadHandle`]. blockchain_read_handle: BlockchainReadHandle, + /// A [`TxpoolWriteHandle`]. + txpool_write_handle: TxpoolWriteHandle, // TODO: Improve the API of the cache service. // TODO: rename the cache service -> `BlockchainContextService`. /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 9603bad..c3d256b 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,6 +1,7 @@ //! The blockchain manager handler functions. use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; +use monero_serai::transaction::Input; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; use std::ops::ControlFlow; @@ -17,6 +18,7 @@ use cuprate_consensus::{ }; use cuprate_helper::cast::usize_to_u64; use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; +use cuprate_txpool::service::interface::TxpoolWriteRequest; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, @@ -434,6 +436,18 @@ impl super::BlockchainManager { &mut self, verified_block: VerifiedBlockInformation, ) { + // FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate. + let spent_key_images = verified_block + .txs + .iter() + .flat_map(|tx| { + tx.tx.prefix().inputs.iter().map(|input| match input { + Input::ToKey { key_image, .. } => key_image.compress().0, + Input::Gen(_) => unreachable!(), + }) + }) + .collect::>(); + self.blockchain_context_service .ready() .await @@ -472,6 +486,14 @@ impl super::BlockchainManager { }; self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); + + self.txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::NewBlock { spent_key_images }) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); } } diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index a6f05e7..7973102 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -1,3 +1,7 @@ //! Transaction Pool //! //! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool. + +mod dandelion; +mod incoming_tx; +mod txs_being_handled; diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs new file mode 100644 index 0000000..07fd479 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use bytes::Bytes; +use cuprate_dandelion_tower::pool::DandelionPoolService; +use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph}; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; +use cuprate_wire::NetworkAddress; + +mod diffuse_service; +mod stem_service; +mod tx_store; + +#[derive(Clone)] +pub struct DandelionTx(Bytes); + +type TxId = [u8; 32]; + +const DANDELION_CONFIG: DandelionConfig = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(10 * 60), + fluff_probability: 0.12, + graph: Graph::FourRegular, +}; + +type ConcreteDandelionRouter = DandelionRouter< + stem_service::OutboundPeerStream, + diffuse_service::DiffuseService, + NetworkAddress, + stem_service::StemPeerService, + DandelionTx, +>; + +pub fn start_dandelion_pool_manager( + router: ConcreteDandelionRouter, + txpool_read_handle: TxpoolReadHandle, + txpool_write_handle: TxpoolWriteHandle, +) -> DandelionPoolService { + cuprate_dandelion_tower::pool::start_dandelion_pool_manager( + 12, + router, + tx_store::TxStoreService { + txpool_read_handle, + txpool_write_handle, + }, + DANDELION_CONFIG, + ) +} + +pub fn dandelion_router(clear_net: NetworkInterface) -> ConcreteDandelionRouter { + DandelionRouter::new( + diffuse_service::DiffuseService { + clear_net_broadcast_service: clear_net.broadcast_svc(), + }, + stem_service::OutboundPeerStream { clear_net }, + DANDELION_CONFIG, + ) +} diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs new file mode 100644 index 0000000..115799d --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs @@ -0,0 +1,45 @@ +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; + +use futures::FutureExt; +use tower::Service; + +use cuprate_dandelion_tower::traits::DiffuseRequest; +use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface}; +use cuprate_p2p_core::ClearNet; + +use super::DandelionTx; + +/// The dandelion diffusion service. +pub struct DiffuseService { + pub clear_net_broadcast_service: BroadcastSvc, +} + +impl Service> for DiffuseService { + type Response = (); + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.clear_net_broadcast_service + .poll_ready(cx) + .map_err(Into::into) + } + + fn call(&mut self, req: DiffuseRequest) -> Self::Future { + // TODO: Call `into_inner` when 1.82.0 stabilizes + self.clear_net_broadcast_service + .call(BroadcastRequest::Transaction { + tx_bytes: req.0 .0, + direction: None, + received_from: None, + }) + .now_or_never() + .unwrap() + .expect("Broadcast service is Infallible"); + + ready(Ok(())) + } +} diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs new file mode 100644 index 0000000..330c884 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -0,0 +1,67 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer}; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::{ + client::{Client, InternalPeerID}, + ClearNet, NetworkZone, PeerRequest, ProtocolRequest, +}; +use cuprate_wire::{protocol::NewTransactions, NetworkAddress}; + +use bytes::Bytes; +use futures::Stream; +use tower::Service; + +use super::DandelionTx; + +/// The dandelion outbound peer stream. +pub struct OutboundPeerStream { + pub clear_net: NetworkInterface, +} + +impl Stream for OutboundPeerStream { + type Item = Result>, tower::BoxError>; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // TODO: make the outbound peer choice random. + Poll::Ready(Some(Ok(self + .clear_net + .client_pool() + .outbound_client() + .map_or(OutboundPeer::Exhausted, |client| { + let addr = match client.info.id { + InternalPeerID::KnownAddr(addr) => addr, + InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"), + }; + + OutboundPeer::Peer(addr.into(), StemPeerService(client)) + })))) + } +} + +/// The stem service, used to send stem txs. +pub struct StemPeerService(Client); + +impl Service> for StemPeerService { + type Response = as Service>::Response; + type Error = tower::BoxError; + type Future = as Service>::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, req: StemRequest) -> Self::Future { + self.0 + .call(PeerRequest::Protocol(ProtocolRequest::NewTransactions( + NewTransactions { + txs: vec![req.0 .0], + dandelionpp_fluff: false, + padding: Bytes::new(), + }, + ))) + } +} diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs new file mode 100644 index 0000000..c8fda16 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -0,0 +1,77 @@ +use std::{ + f32::consts::E, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt}; +use tower::{util::Oneshot, Service, ServiceExt}; + +use cuprate_dandelion_tower::{ + traits::{TxStoreRequest, TxStoreResponse}, + State, +}; +use cuprate_database::RuntimeError; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + TxpoolReadHandle, TxpoolWriteHandle, +}; + +use super::{DandelionTx, TxId}; + +/// The dandelion tx-store service. +/// +/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides. +pub struct TxStoreService { + pub txpool_read_handle: TxpoolReadHandle, + pub txpool_write_handle: TxpoolWriteHandle, +} + +impl Service> for TxStoreService { + type Response = TxStoreResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: TxStoreRequest) -> Self::Future { + match req { + TxStoreRequest::Get(tx_id) => self + .txpool_read_handle + .clone() + .oneshot(TxpoolReadRequest::TxBlob(tx_id)) + .map(|res| match res { + Ok(TxpoolReadResponse::TxBlob { + tx_blob, + state_stem, + }) => { + let state = if state_stem { + State::Stem + } else { + State::Fluff + }; + + Ok(TxStoreResponse::Transaction(Some(( + DandelionTx(Bytes::from(tx_blob)), + state, + )))) + } + Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)), + Err(e) => Err(e.into()), + Ok(_) => unreachable!(), + }) + .boxed(), + TxStoreRequest::Promote(tx_id) => self + .txpool_write_handle + .clone() + .oneshot(TxpoolWriteRequest::Promote(tx_id)) + .map(|res| match res { + Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok), + Err(e) => Err(e.into()), + }) + .boxed(), + } + } +} diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs new file mode 100644 index 0000000..1ade33d --- /dev/null +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -0,0 +1,326 @@ +use std::{ + collections::HashSet, + future::ready, + sync::Arc, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use dashmap::DashSet; +use futures::{future::BoxFuture, FutureExt}; +use monero_serai::transaction::Transaction; +use sha3::{Digest, Sha3_256}; +use tower::{Service, ServiceExt}; + +use cuprate_consensus::{ + transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, ExtendedConsensusError, TxVerifierService, VerifyTxRequest, + VerifyTxResponse, +}; +use cuprate_dandelion_tower::{ + pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, + State, TxState, +}; +use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + TxpoolReadHandle, TxpoolWriteHandle, +}; +use cuprate_types::TransactionVerificationData; +use cuprate_wire::NetworkAddress; + +use crate::{ + blockchain::ConcreteTxVerifierService, + constants::PANIC_CRITICAL_SERVICE_ERROR, + signals::REORG_LOCK, + txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled}, +}; + +/// An error that can happen handling an incoming tx. +pub enum IncomingTxError { + Parse(std::io::Error), + Consensus(ExtendedConsensusError), + DuplicateTransaction, +} + +/// Incoming transactions. +pub struct IncomingTxs { + pub txs: Vec, + pub state: TxState, +} + +/// The transaction type used for dandelion++. +#[derive(Clone)] +struct DandelionTx(Bytes); + +/// A transaction ID/hash. +type TxId = [u8; 32]; + +/// The service than handles incoming transaction pool transactions. +/// +/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. +pub struct IncomingTxHandler { + /// A store of txs currently being handled in incoming tx requests. + txs_being_handled: TxsBeingHandled, + /// The blockchain context cache. + blockchain_context_cache: BlockChainContextService, + /// The dandelion txpool manager. + dandelion_pool_manager: DandelionPoolService, + /// The transaction verifier service. + tx_verifier_service: ConcreteTxVerifierService, + /// The txpool write handle. + txpool_write_handle: TxpoolWriteHandle, + /// The txpool read handle. + txpool_read_handle: TxpoolReadHandle, +} + +impl Service for IncomingTxHandler { + type Response = (); + type Error = IncomingTxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: IncomingTxs) -> Self::Future { + handle_incoming_txs( + req.txs, + req.state, + self.txs_being_handled.clone(), + self.blockchain_context_cache.clone(), + self.tx_verifier_service.clone(), + self.txpool_write_handle.clone(), + self.txpool_read_handle.clone(), + self.dandelion_pool_manager.clone(), + ) + .boxed() + } +} + +#[expect(clippy::too_many_arguments)] +async fn handle_incoming_txs( + txs: Vec, + state: TxState, + txs_being_handled: TxsBeingHandled, + mut blockchain_context_cache: BlockChainContextService, + mut tx_verifier_service: ConcreteTxVerifierService, + mut txpool_write_handle: TxpoolWriteHandle, + mut txpool_read_handle: TxpoolReadHandle, + mut dandelion_pool_manager: DandelionPoolService, +) -> Result<(), IncomingTxError> { + let reorg_guard = REORG_LOCK.read().await; + + let (txs, stem_pool_txs, txs_being_handled_guard) = + prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; + + let BlockChainContextResponse::Context(context) = blockchain_context_cache + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Context) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + tx_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await + .map_err(IncomingTxError::Consensus)?; + + for tx in txs { + handle_valid_tx( + tx, + state.clone(), + &mut txpool_write_handle, + &mut dandelion_pool_manager, + ) + .await; + } + + for stem_tx in stem_pool_txs { + rerelay_stem_tx( + &stem_tx, + state.clone(), + &mut txpool_read_handle, + &mut dandelion_pool_manager, + ) + .await; + } + + Ok(()) +} + +/// Prepares the incoming transactions for verification. +/// +/// This will filter out all transactions already in the pool or txs already being handled in another request. +async fn prepare_incoming_txs( + tx_blobs: Vec, + txs_being_handled: TxsBeingHandled, + txpool_read_handle: &mut TxpoolReadHandle, +) -> Result< + ( + Vec>, + Vec, + TxBeingHandledLocally, + ), + IncomingTxError, +> { + let mut tx_blob_hashes = HashSet::new(); + let mut txs_being_handled_locally = txs_being_handled.local_tracker(); + + // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch. + let txs = tx_blobs + .into_iter() + .filter_map(|tx_blob| { + let tx_blob_hash = tx_blob_hash(tx_blob.as_ref()); + + // If a duplicate is in here the incoming tx batch contained the same tx twice. + if !tx_blob_hashes.insert(tx_blob_hash) { + return Some(Err(IncomingTxError::DuplicateTransaction)); + } + + // If a duplicate is here it is being handled in another batch. + if !txs_being_handled_locally.try_add_tx(tx_blob_hash) { + return None; + } + + Some(Ok((tx_blob_hash, tx_blob))) + }) + .collect::, _>>()?; + + // Filter the txs already in the txpool out. + // This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue. + let TxpoolReadResponse::FilterKnownTxBlobHashes { + unknown_blob_hashes, + stem_pool_hashes, + } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + // Now prepare the txs for verification. + rayon_spawn_async(move || { + let txs = txs + .into_iter() + .filter_map(|(tx_blob_hash, tx_blob)| { + if unknown_blob_hashes.contains(&tx_blob_hash) { + Some(tx_blob) + } else { + None + } + }) + .map(|bytes| { + let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?; + + let tx = new_tx_verification_data(tx) + .map_err(|e| IncomingTxError::Consensus(e.into()))?; + + Ok(Arc::new(tx)) + }) + .collect::, IncomingTxError>>()?; + + Ok((txs, stem_pool_hashes, txs_being_handled_locally)) + }) + .await +} + +async fn handle_valid_tx( + tx: Arc, + state: TxState, + txpool_write_handle: &mut TxpoolWriteHandle, + dandelion_pool_manager: &mut DandelionPoolService, +) { + let incoming_tx = + IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash); + + let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::AddTransaction { + tx, + state_stem: state.state_stem(), + }) + .await + .expect("TODO") + else { + unreachable!() + }; + + // TODO: track double spends to quickly ignore them from their blob hash. + if let Some(tx_hash) = double_spend { + return; + }; + + // TODO: There is a race condition possible if a tx and block come in at the same time . + + let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(None) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); +} + +async fn rerelay_stem_tx( + tx_hash: &TxId, + state: TxState, + txpool_read_handle: &mut TxpoolReadHandle, + dandelion_pool_manager: &mut DandelionPoolService, +) { + let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxBlob(*tx_hash)) + .await + .expect("TODO") + else { + unreachable!() + }; + + let incoming_tx = + IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash); + + // TODO: fill this in properly. + let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(Some(State::Stem)) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); +} diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs new file mode 100644 index 0000000..8b30821 --- /dev/null +++ b/binaries/cuprated/src/txpool/txs_being_handled.rs @@ -0,0 +1,45 @@ +use dashmap::DashSet; +use sha3::{Digest, Sha3_256}; +use std::sync::Arc; + +pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] { + let mut hasher = Sha3_256::new(); + hasher.update(tx_bytes); + hasher.finalize().into() +} + +#[derive(Clone)] +pub struct TxsBeingHandled(Arc>); + +impl TxsBeingHandled { + pub fn local_tracker(&self) -> TxBeingHandledLocally { + TxBeingHandledLocally { + txs_being_handled: self.clone(), + txs: vec![], + } + } +} + +pub struct TxBeingHandledLocally { + txs_being_handled: TxsBeingHandled, + txs: Vec<[u8; 32]>, +} + +impl TxBeingHandledLocally { + pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool { + if !self.txs_being_handled.0.insert(tx_blob_hash) { + return false; + } + + self.txs.push(tx_blob_hash); + true + } +} + +impl Drop for TxBeingHandledLocally { + fn drop(&mut self) { + for hash in &self.txs { + self.txs_being_handled.0.remove(hash); + } + } +} diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index 88702be..c04dcae 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -73,6 +73,12 @@ pub enum TxState { Local, } +impl TxState { + pub const fn state_stem(&self) -> bool { + matches!(self, Self::Local | Self::Stem { .. }) + } +} + /// A request to route a transaction. pub struct DandelionRouteReq { /// The transaction. diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index fc97fc1..25dd242 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -18,7 +18,7 @@ use tracing::{Instrument, Span}; use cuprate_p2p_core::{ client::{Client, InternalPeerID}, handles::ConnectionHandle, - NetworkZone, + ConnectionDirection, NetworkZone, }; pub(crate) mod disconnect_monitor; @@ -165,6 +165,16 @@ impl ClientPool { sync_data.cumulative_difficulty() > cumulative_difficulty }) } + + pub fn outbound_client(&self) -> Option> { + let client = self + .clients + .iter() + .find(|element| element.value().info.direction == ConnectionDirection::Outbound)?; + let id = *client.key(); + + Some(self.clients.remove(&id).unwrap().1) + } } mod sealed { diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs index f75d615..607c4aa 100644 --- a/storage/service/src/service/write.rs +++ b/storage/service/src/service/write.rs @@ -30,6 +30,14 @@ pub struct DatabaseWriteHandle { crossbeam::channel::Sender<(Req, oneshot::Sender>)>, } +impl Clone for DatabaseWriteHandle { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + } + } +} + impl DatabaseWriteHandle where Req: Send + 'static, diff --git a/storage/txpool/Cargo.toml b/storage/txpool/Cargo.toml index 70211d9..923060e 100644 --- a/storage/txpool/Cargo.toml +++ b/storage/txpool/Cargo.toml @@ -29,6 +29,7 @@ bytemuck = { workspace = true, features = ["must_cast", "derive" bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] } thiserror = { workspace = true } hex = { workspace = true } +sha3 = { workspace = true, features = ["std"] } tower = { workspace = true, optional = true } rayon = { workspace = true, optional = true } diff --git a/storage/txpool/src/free.rs b/storage/txpool/src/free.rs index d394002..e04e350 100644 --- a/storage/txpool/src/free.rs +++ b/storage/txpool/src/free.rs @@ -1,9 +1,11 @@ //! General free functions (related to the tx-pool database). //---------------------------------------------------------------------------------------------------- Import +use sha3::{Digest, Sha3_256}; + use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw}; -use crate::{config::Config, tables::OpenTables}; +use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash}; //---------------------------------------------------------------------------------------------------- Free functions /// Open the txpool database using the passed [`Config`]. @@ -60,3 +62,15 @@ pub fn open(config: Config) -> Result { Ok(env) } + +/// Calculate the transaction blob hash. +/// +/// This value is supposed to be quick to compute just based of the tx-blob without needing to parse the tx. +/// +/// The exact way the hash is calculated is not stable and is subject to change, as such it should not be exposed +/// as a way to interact with Cuprate externally. +pub fn transaction_blob_hash(tx_blob: &[u8]) -> TransactionBlobHash { + let mut hasher = Sha3_256::new(); + hasher.update(tx_blob); + hasher.finalize().into() +} diff --git a/storage/txpool/src/lib.rs b/storage/txpool/src/lib.rs index 5fb3b14..8a57c72 100644 --- a/storage/txpool/src/lib.rs +++ b/storage/txpool/src/lib.rs @@ -14,7 +14,7 @@ mod tx; pub mod types; pub use config::Config; -pub use free::open; +pub use free::{open, transaction_blob_hash}; pub use tx::TxEntry; //re-exports diff --git a/storage/txpool/src/ops.rs b/storage/txpool/src/ops.rs index 50d9ea4..289a8bb 100644 --- a/storage/txpool/src/ops.rs +++ b/storage/txpool/src/ops.rs @@ -85,7 +85,7 @@ mod key_images; mod tx_read; mod tx_write; -pub use tx_read::get_transaction_verification_data; +pub use tx_read::{get_transaction_verification_data, in_stem_pool}; pub use tx_write::{add_transaction, remove_transaction}; /// An error that can occur on some tx-write ops. diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs index db89415..6b79cba 100644 --- a/storage/txpool/src/ops/tx_read.rs +++ b/storage/txpool/src/ops/tx_read.rs @@ -8,6 +8,8 @@ use monero_serai::transaction::Transaction; use cuprate_database::{DatabaseRo, RuntimeError}; use cuprate_types::{TransactionVerificationData, TxVersion}; +use crate::tables::TransactionInfos; +use crate::types::TxStateFlags; use crate::{tables::Tables, types::TransactionHash}; /// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool. @@ -34,3 +36,13 @@ pub fn get_transaction_verification_data( cached_verification_state: Mutex::new(cached_verification_state), }) } + +pub fn in_stem_pool( + tx_hash: &TransactionHash, + tx_infos: &impl DatabaseRo, +) -> Result { + Ok(tx_infos + .get(tx_hash)? + .flags + .contains(TxStateFlags::STATE_STEM)) +} diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs index 9885b9c..dc5ab46 100644 --- a/storage/txpool/src/ops/tx_write.rs +++ b/storage/txpool/src/ops/tx_write.rs @@ -8,6 +8,7 @@ use cuprate_database::{DatabaseRw, RuntimeError, StorableVec}; use cuprate_types::TransactionVerificationData; use crate::{ + free::transaction_blob_hash, ops::{ key_images::{add_tx_key_images, remove_tx_key_images}, TxPoolWriteError, @@ -56,6 +57,12 @@ pub fn add_transaction( let kis_table = tables.spent_key_images_mut(); add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?; + // Add the blob hash to table 4. + let blob_hash = transaction_blob_hash(&tx.tx_blob); + tables + .known_blob_hashes_mut() + .put(&blob_hash, &tx.tx_hash)?; + Ok(()) } @@ -79,5 +86,9 @@ pub fn remove_transaction( let kis_table = tables.spent_key_images_mut(); remove_tx_key_images(&tx.prefix().inputs, kis_table)?; + // Remove the blob hash from table 4. + let blob_hash = transaction_blob_hash(&tx_blob); + tables.known_blob_hashes_mut().delete(&blob_hash)?; + Ok(()) } diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 88dd02e..a400a21 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,14 +1,21 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. -use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use cuprate_types::TransactionVerificationData; -use crate::{tx::TxEntry, types::TransactionHash}; +use crate::{ + tx::TxEntry, + types::{KeyImage, TransactionBlobHash, TransactionHash}, +}; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. +#[derive(Clone)] pub enum TxpoolReadRequest { /// A request for the blob (raw bytes) of a transaction with the given hash. TxBlob(TransactionHash), @@ -16,6 +23,14 @@ pub enum TxpoolReadRequest { /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), + /// A request to filter (remove) all **known** transactions from the set. + /// + /// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob. + FilterKnownTxBlobHashes(HashSet), + + /// A request to pull some transactions for an incoming block. + TxsForBlock(Vec), + /// Get information on all transactions in the pool. Backlog, @@ -27,15 +42,28 @@ pub enum TxpoolReadRequest { /// The transaction pool [`tower::Service`] read response type. #[expect(clippy::large_enum_variant)] pub enum TxpoolReadResponse { - /// Response to [`TxpoolReadRequest::TxBlob`]. - /// - /// The inner value is the raw bytes of a transaction. - // TODO: use bytes::Bytes. - TxBlob(Vec), + /// A response containing the raw bytes of a transaction. + TxBlob { tx_blob: Vec, state_stem: bool }, - /// Response to [`TxpoolReadRequest::TxVerificationData`]. + /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), + /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. + FilterKnownTxBlobHashes { + /// The blob hashes that are unknown. + unknown_blob_hashes: HashSet, + /// The tx hashes of the blob hashes that were known but were in the stem pool. + stem_pool_hashes: Vec, + }, + + /// The response for [`TxpoolReadRequest::TxsForBlock`]. + TxsForBlock { + /// The txs we had in the txpool. + txs: HashMap<[u8; 32], TransactionVerificationData>, + /// The indexes of the missing txs. + missing: Vec, + }, + /// Response to [`TxpoolReadRequest::Backlog`]. /// /// The inner `Vec` contains information on all @@ -69,6 +97,18 @@ pub enum TxpoolWriteRequest { /// /// Returns [`TxpoolWriteResponse::Ok`]. RemoveTransaction(TransactionHash), + + /// Promote a transaction from the stem pool to the fluff pool. + /// If the tx is already in the fluff pool this does nothing. + /// + /// Returns [`TxpoolWriteResponse::Ok`]. + Promote(TransactionHash), + + /// Tell the tx-pool about a new block. + NewBlock { + /// The spent key images in the new block. + spent_key_images: Vec, + }, } //---------------------------------------------------------------------------------------------------- TxpoolWriteResponse diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 3135322..bdeeb74 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -4,22 +4,24 @@ clippy::unnecessary_wraps, reason = "TODO: finish implementing the signatures from " )] - -use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use rayon::ThreadPool; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner}; +use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use crate::{ - ops::get_transaction_verification_data, + ops::{get_transaction_verification_data, in_stem_pool}, service::{ interface::{TxpoolReadRequest, TxpoolReadResponse}, types::{ReadResponseResult, TxpoolReadHandle}, }, - tables::{OpenTables, TransactionBlobs}, - types::TransactionHash, + tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos}, + types::{TransactionBlobHash, TransactionHash, TxStateFlags}, }; // TODO: update the docs here @@ -57,7 +59,6 @@ fn init_read_service_with_pool(env: Arc, pool: Arc) -> /// 1. `Request` is mapped to a handler function /// 2. Handler function is called /// 3. [`TxpoolReadResponse`] is returned -#[expect(clippy::needless_pass_by_value)] fn map_request( env: &ConcreteEnv, // Access to the database request: TxpoolReadRequest, // The request we must fulfill @@ -65,6 +66,10 @@ fn map_request( match request { TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), + TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => { + filter_known_tx_blob_hashes(env, blob_hashes) + } + TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed), TxpoolReadRequest::Backlog => backlog(env), TxpoolReadRequest::Size => size(env), } @@ -94,10 +99,15 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult { let tx_ro = inner_env.tx_ro()?; let tx_blobs_table = inner_env.open_db_ro::(&tx_ro)?; + let tx_infos_table = inner_env.open_db_ro::(&tx_ro)?; - tx_blobs_table - .get(tx_hash) - .map(|blob| TxpoolReadResponse::TxBlob(blob.0)) + let tx_blob = tx_blobs_table.get(tx_hash)?.0; + let tx_info = tx_infos_table.get(tx_hash)?; + + Ok(TxpoolReadResponse::TxBlob { + tx_blob, + state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM), + }) } /// [`TxpoolReadRequest::TxVerificationData`]. @@ -111,6 +121,79 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData) } +/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. +fn filter_known_tx_blob_hashes( + env: &ConcreteEnv, + mut blob_hashes: HashSet, +) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tx_blob_hashes = inner_env.open_db_ro::(&tx_ro)?; + let tx_infos = inner_env.open_db_ro::(&tx_ro)?; + + let mut stem_pool_hashes = Vec::new(); + + // A closure that returns if a tx with a certain blob hash is unknown. + // This also fills in `stem_tx_hashes`. + let mut tx_unknown = |blob_hash| -> Result { + match tx_blob_hashes.get(&blob_hash) { + Ok(tx_hash) => { + if in_stem_pool(&tx_hash, &tx_infos)? { + stem_pool_hashes.push(tx_hash); + } + Ok(false) + } + Err(RuntimeError::KeyNotFound) => Ok(true), + Err(e) => Err(e), + } + }; + + let mut err = None; + blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) { + Ok(res) => res, + Err(e) => { + err = Some(e); + false + } + }); + + if let Some(e) = err { + return Err(e); + } + + Ok(TxpoolReadResponse::FilterKnownTxBlobHashes { + unknown_blob_hashes: blob_hashes, + stem_pool_hashes, + }) +} + +/// [`TxpoolReadRequest::TxsForBlock`]. +fn txs_for_block(env: &ConcreteEnv, txs: Vec) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tables = inner_env.open_tables(&tx_ro)?; + + let mut missing_tx_indexes = Vec::with_capacity(txs.len()); + let mut txs_verification_data = HashMap::with_capacity(txs.len()); + + for (i, tx_hash) in txs.into_iter().enumerate() { + match get_transaction_verification_data(&tx_hash, &tables) { + Ok(tx) => { + txs_verification_data.insert(tx_hash, tx); + } + Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i), + Err(e) => return Err(e), + } + } + + Ok(TxpoolReadResponse::TxsForBlock { + txs: txs_verification_data, + missing: missing_tx_indexes, + }) +} + /// [`TxpoolReadRequest::Backlog`]. #[inline] fn backlog(env: &ConcreteEnv) -> ReadResponseResult { diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs index 8a3b1bf..9d1a598 100644 --- a/storage/txpool/src/service/write.rs +++ b/storage/txpool/src/service/write.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw}; +use cuprate_database::{ConcreteEnv, DatabaseRo, DatabaseRw, Env, EnvInner, RuntimeError, TxRw}; use cuprate_database_service::DatabaseWriteHandle; use cuprate_types::TransactionVerificationData; @@ -10,8 +10,8 @@ use crate::{ interface::{TxpoolWriteRequest, TxpoolWriteResponse}, types::TxpoolWriteHandle, }, - tables::OpenTables, - types::TransactionHash, + tables::{OpenTables, Tables, TransactionInfos}, + types::{KeyImage, TransactionHash, TxStateFlags}, }; //---------------------------------------------------------------------------------------------------- init_write_service @@ -31,6 +31,8 @@ fn handle_txpool_request( add_transaction(env, tx, *state_stem) } TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash), + TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash), + TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images), } } @@ -101,3 +103,58 @@ fn remove_transaction( TxRw::commit(tx_rw)?; Ok(TxpoolWriteResponse::Ok) } + +/// [`TxpoolWriteRequest::Promote`] +fn promote( + env: &ConcreteEnv, + tx_hash: &TransactionHash, +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let mut tx_infos = env_inner.open_db_rw::(&tx_rw)?; + + tx_infos.update(tx_hash, |mut info| { + info.flags.remove(TxStateFlags::STATE_STEM); + Some(info) + })?; + + drop(tx_infos); + + TxRw::commit(tx_rw)?; + Ok(TxpoolWriteResponse::Ok) +} + +fn new_block( + env: &ConcreteEnv, + spent_key_images: &[KeyImage], +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + // FIXME: use try blocks once stable. + let result = || { + let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?; + + for key_image in spent_key_images { + match tables_mut + .spent_key_images() + .get(key_image) + .and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut)) + { + Ok(()) | Err(RuntimeError::KeyNotFound) => (), + Err(e) => return Err(e), + } + } + + Ok(()) + }; + + if let Err(e) = result() { + TxRw::abort(tx_rw)?; + return Err(e); + } + + TxRw::commit(tx_rw)?; + Ok(TxpoolWriteResponse::Ok) +} diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs index dbb686a..1f2d449 100644 --- a/storage/txpool/src/tables.rs +++ b/storage/txpool/src/tables.rs @@ -16,7 +16,9 @@ //! accessing _all_ tables defined here at once. use cuprate_database::{define_tables, StorableVec}; -use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo}; +use crate::types::{ + KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo, +}; define_tables! { /// Serialized transaction blobs. @@ -41,5 +43,9 @@ define_tables! { /// /// This table contains the spent key images from all transactions in the pool. 3 => SpentKeyImages, - KeyImage => TransactionHash + KeyImage => TransactionHash, + + /// Transaction blob hashes that are in the pool. + 4 => KnownBlobHashes, + TransactionBlobHash => TransactionHash, } diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 4da2d0f..2acb819 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -6,7 +6,6 @@ //! //! use bytemuck::{Pod, Zeroable}; - use monero_serai::transaction::Timelock; use cuprate_types::{CachedVerificationState, HardFork}; @@ -17,6 +16,9 @@ pub type KeyImage = [u8; 32]; /// A transaction hash. pub type TransactionHash = [u8; 32]; +/// A transaction blob hash. +pub type TransactionBlobHash = [u8; 32]; + bitflags::bitflags! { /// Flags representing the state of the transaction in the pool. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]