diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index addcad5f..f0404b6b 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -19,7 +19,7 @@ use cuprate_consensus::{ }; use cuprate_dandelion_tower::{ pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, - TxState, + State, TxState, }; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_txpool::service::{ @@ -110,7 +110,7 @@ async fn handle_incoming_txs( ) -> Result<(), IncomingTxError> { let reorg_guard = REORG_LOCK.read().await; - let (txs, txs_being_handled_guard) = + let (txs, stem_pool_txs, txs_being_handled_guard) = prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; let BlockChainContextResponse::Context(context) = blockchain_context_cache @@ -150,6 +150,16 @@ async fn handle_incoming_txs( .await } + for stem_tx in stem_pool_txs { + rerelay_stem_tx( + &stem_tx, + state.clone(), + &mut txpool_read_handle, + &mut dandelion_pool_manager, + ) + .await; + } + Ok(()) } @@ -160,7 +170,14 @@ async fn prepare_incoming_txs( tx_blobs: Vec, txs_being_handled: TxsBeingHandled, txpool_read_handle: &mut TxpoolReadHandle, -) -> Result<(Vec>, TxBeingHandledLocally), IncomingTxError> { +) -> Result< + ( + Vec>, + Vec, + TxBeingHandledLocally, + ), + IncomingTxError, +> { let mut tx_blob_hashes = HashSet::new(); let mut txs_being_handled_loacally = txs_being_handled.local_tracker(); @@ -186,7 +203,10 @@ async fn prepare_incoming_txs( // Filter the txs already in the txpool out. // This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue. - let TxpoolReadResponse::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle + let TxpoolReadResponse::FilterKnownTxBlobHashes { + unknown_blob_hashes, + stem_pool_hashes, + } = txpool_read_handle .ready() .await .expect(PANIC_CRITICAL_SERVICE_ERROR) @@ -202,7 +222,7 @@ async fn prepare_incoming_txs( let txs = txs .into_iter() .filter_map(|(tx_blob_hash, tx_blob)| { - if tx_blob_hashes.contains(&tx_blob_hash) { + if unknown_blob_hashes.contains(&tx_blob_hash) { Some(tx_blob) } else { None @@ -218,7 +238,7 @@ async fn prepare_incoming_txs( }) .collect::, IncomingTxError>>()?; - Ok((txs, txs_being_handled_loacally)) + Ok((txs, stem_pool_hashes, txs_being_handled_loacally)) }) .await } @@ -268,3 +288,39 @@ async fn handle_valid_tx( .await .expect(PANIC_CRITICAL_SERVICE_ERROR); } + +async fn rerelay_stem_tx( + tx_hash: &TxId, + state: TxState, + txpool_read_handle: &mut TxpoolReadHandle, + dandelion_pool_manager: &mut DandelionPoolService, +) { + let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxBlob(*tx_hash)) + .await + .expect("TODO") + else { + unreachable!() + }; + + let incoming_tx = + IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash); + + // TODO: fill this in properly. + let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(Some(State::Stem)) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); +} diff --git a/storage/txpool/src/ops.rs b/storage/txpool/src/ops.rs index 50d9ea4a..289a8bbf 100644 --- a/storage/txpool/src/ops.rs +++ b/storage/txpool/src/ops.rs @@ -85,7 +85,7 @@ mod key_images; mod tx_read; mod tx_write; -pub use tx_read::get_transaction_verification_data; +pub use tx_read::{get_transaction_verification_data, in_stem_pool}; pub use tx_write::{add_transaction, remove_transaction}; /// An error that can occur on some tx-write ops. diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs index db894151..6b79cba8 100644 --- a/storage/txpool/src/ops/tx_read.rs +++ b/storage/txpool/src/ops/tx_read.rs @@ -8,6 +8,8 @@ use monero_serai::transaction::Transaction; use cuprate_database::{DatabaseRo, RuntimeError}; use cuprate_types::{TransactionVerificationData, TxVersion}; +use crate::tables::TransactionInfos; +use crate::types::TxStateFlags; use crate::{tables::Tables, types::TransactionHash}; /// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool. @@ -34,3 +36,13 @@ pub fn get_transaction_verification_data( cached_verification_state: Mutex::new(cached_verification_state), }) } + +pub fn in_stem_pool( + tx_hash: &TransactionHash, + tx_infos: &impl DatabaseRo, +) -> Result { + Ok(tx_infos + .get(tx_hash)? + .flags + .contains(TxStateFlags::STATE_STEM)) +} diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs index 7b3b9a63..dc5ab463 100644 --- a/storage/txpool/src/ops/tx_write.rs +++ b/storage/txpool/src/ops/tx_write.rs @@ -59,7 +59,9 @@ pub fn add_transaction( // Add the blob hash to table 4. let blob_hash = transaction_blob_hash(&tx.tx_blob); - tables.known_blob_hashes_mut().put(&blob_hash, &())?; + tables + .known_blob_hashes_mut() + .put(&blob_hash, &tx.tx_hash)?; Ok(()) } diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index a5d6634e..0e570399 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -30,7 +30,12 @@ pub enum TxpoolReadResponse { /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. - FilterKnownTxBlobHashes(HashSet), + FilterKnownTxBlobHashes { + /// The blob hashes that are unknown. + unknown_blob_hashes: HashSet, + /// The tx hashes of the blob hashes that were known but were in the stem pool. + stem_pool_hashes: Vec, + }, } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 4bcb32a1..9e92a0d4 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -2,9 +2,10 @@ use std::{collections::HashSet, sync::Arc}; use rayon::ThreadPool; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner}; +use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; +use crate::ops::in_stem_pool; use crate::{ ops::get_transaction_verification_data, service::{ @@ -119,12 +120,30 @@ fn filter_known_tx_blob_hashes( let tx_ro = inner_env.tx_ro()?; let tx_blob_hashes = inner_env.open_db_ro::(&tx_ro)?; + let tx_infos = inner_env.open_db_ro::(&tx_ro)?; + + let mut stem_pool_hashes = Vec::new(); + + // A closure that returns if a tx with a certain blob hash is unknown. + // This also fills in `stem_tx_hashes`. + let mut tx_unknown = |blob_hash| -> Result { + match tx_blob_hashes.get(&blob_hash) { + Ok(tx_hash) => { + if in_stem_pool(&tx_hash, &tx_infos)? { + stem_pool_hashes.push(tx_hash); + } + Ok(false) + } + Err(RuntimeError::KeyNotFound) => Ok(true), + Err(e) => Err(e), + } + }; let mut err = None; - blob_hashes.retain(|blob_hash| match tx_blob_hashes.contains(blob_hash) { - Ok(exists) => !exists, + blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) { + Ok(res) => res, Err(e) => { - err.get_or_insert(e); + err = Some(e); false } }); @@ -133,5 +152,8 @@ fn filter_known_tx_blob_hashes( return Err(e); } - Ok(TxpoolReadResponse::FilterKnownTxBlobHashes(blob_hashes)) + Ok(TxpoolReadResponse::FilterKnownTxBlobHashes { + unknown_blob_hashes: blob_hashes, + stem_pool_hashes, + }) } diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs index 2ab32629..1f2d4490 100644 --- a/storage/txpool/src/tables.rs +++ b/storage/txpool/src/tables.rs @@ -47,5 +47,5 @@ define_tables! { /// Transaction blob hashes that are in the pool. 4 => KnownBlobHashes, - TransactionBlobHash => (), + TransactionBlobHash => TransactionHash, }