handle duplicate stem txs

This commit is contained in:
Boog900 2024-10-13 18:35:58 +01:00
parent 2d5567e47b
commit 93fb3c657c
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
7 changed files with 112 additions and 15 deletions

View file

@ -19,7 +19,7 @@ use cuprate_consensus::{
}; };
use cuprate_dandelion_tower::{ use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
TxState, State, TxState,
}; };
use cuprate_helper::asynch::rayon_spawn_async; use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_txpool::service::{ use cuprate_txpool::service::{
@ -110,7 +110,7 @@ async fn handle_incoming_txs(
) -> Result<(), IncomingTxError> { ) -> Result<(), IncomingTxError> {
let reorg_guard = REORG_LOCK.read().await; let reorg_guard = REORG_LOCK.read().await;
let (txs, txs_being_handled_guard) = let (txs, stem_pool_txs, txs_being_handled_guard) =
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
let BlockChainContextResponse::Context(context) = blockchain_context_cache let BlockChainContextResponse::Context(context) = blockchain_context_cache
@ -150,6 +150,16 @@ async fn handle_incoming_txs(
.await .await
} }
for stem_tx in stem_pool_txs {
rerelay_stem_tx(
&stem_tx,
state.clone(),
&mut txpool_read_handle,
&mut dandelion_pool_manager,
)
.await;
}
Ok(()) Ok(())
} }
@ -160,7 +170,14 @@ async fn prepare_incoming_txs(
tx_blobs: Vec<Bytes>, tx_blobs: Vec<Bytes>,
txs_being_handled: TxsBeingHandled, txs_being_handled: TxsBeingHandled,
txpool_read_handle: &mut TxpoolReadHandle, txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<(Vec<Arc<TransactionVerificationData>>, TxBeingHandledLocally), IncomingTxError> { ) -> Result<
(
Vec<Arc<TransactionVerificationData>>,
Vec<TxId>,
TxBeingHandledLocally,
),
IncomingTxError,
> {
let mut tx_blob_hashes = HashSet::new(); let mut tx_blob_hashes = HashSet::new();
let mut txs_being_handled_loacally = txs_being_handled.local_tracker(); let mut txs_being_handled_loacally = txs_being_handled.local_tracker();
@ -186,7 +203,10 @@ async fn prepare_incoming_txs(
// Filter the txs already in the txpool out. // Filter the txs already in the txpool out.
// This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue. // This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue.
let TxpoolReadResponse::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle let TxpoolReadResponse::FilterKnownTxBlobHashes {
unknown_blob_hashes,
stem_pool_hashes,
} = txpool_read_handle
.ready() .ready()
.await .await
.expect(PANIC_CRITICAL_SERVICE_ERROR) .expect(PANIC_CRITICAL_SERVICE_ERROR)
@ -202,7 +222,7 @@ async fn prepare_incoming_txs(
let txs = txs let txs = txs
.into_iter() .into_iter()
.filter_map(|(tx_blob_hash, tx_blob)| { .filter_map(|(tx_blob_hash, tx_blob)| {
if tx_blob_hashes.contains(&tx_blob_hash) { if unknown_blob_hashes.contains(&tx_blob_hash) {
Some(tx_blob) Some(tx_blob)
} else { } else {
None None
@ -218,7 +238,7 @@ async fn prepare_incoming_txs(
}) })
.collect::<Result<Vec<_>, IncomingTxError>>()?; .collect::<Result<Vec<_>, IncomingTxError>>()?;
Ok((txs, txs_being_handled_loacally)) Ok((txs, stem_pool_hashes, txs_being_handled_loacally))
}) })
.await .await
} }
@ -268,3 +288,39 @@ async fn handle_valid_tx(
.await .await
.expect(PANIC_CRITICAL_SERVICE_ERROR); .expect(PANIC_CRITICAL_SERVICE_ERROR);
} }
async fn rerelay_stem_tx(
tx_hash: &TxId,
state: TxState<NetworkAddress>,
txpool_read_handle: &mut TxpoolReadHandle,
dandelion_pool_manager: &mut DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
) {
let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxBlob(*tx_hash))
.await
.expect("TODO")
else {
unreachable!()
};
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
// TODO: fill this in properly.
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(Some(State::Stem))
.build()
.unwrap();
dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}

View file

@ -85,7 +85,7 @@ mod key_images;
mod tx_read; mod tx_read;
mod tx_write; mod tx_write;
pub use tx_read::get_transaction_verification_data; pub use tx_read::{get_transaction_verification_data, in_stem_pool};
pub use tx_write::{add_transaction, remove_transaction}; pub use tx_write::{add_transaction, remove_transaction};
/// An error that can occur on some tx-write ops. /// An error that can occur on some tx-write ops.

View file

@ -8,6 +8,8 @@ use monero_serai::transaction::Transaction;
use cuprate_database::{DatabaseRo, RuntimeError}; use cuprate_database::{DatabaseRo, RuntimeError};
use cuprate_types::{TransactionVerificationData, TxVersion}; use cuprate_types::{TransactionVerificationData, TxVersion};
use crate::tables::TransactionInfos;
use crate::types::TxStateFlags;
use crate::{tables::Tables, types::TransactionHash}; use crate::{tables::Tables, types::TransactionHash};
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool. /// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
@ -34,3 +36,13 @@ pub fn get_transaction_verification_data(
cached_verification_state: Mutex::new(cached_verification_state), cached_verification_state: Mutex::new(cached_verification_state),
}) })
} }
pub fn in_stem_pool(
tx_hash: &TransactionHash,
tx_infos: &impl DatabaseRo<TransactionInfos>,
) -> Result<bool, RuntimeError> {
Ok(tx_infos
.get(tx_hash)?
.flags
.contains(TxStateFlags::STATE_STEM))
}

View file

@ -59,7 +59,9 @@ pub fn add_transaction(
// Add the blob hash to table 4. // Add the blob hash to table 4.
let blob_hash = transaction_blob_hash(&tx.tx_blob); let blob_hash = transaction_blob_hash(&tx.tx_blob);
tables.known_blob_hashes_mut().put(&blob_hash, &())?; tables
.known_blob_hashes_mut()
.put(&blob_hash, &tx.tx_hash)?;
Ok(()) Ok(())
} }

View file

@ -30,7 +30,12 @@ pub enum TxpoolReadResponse {
/// A response of [`TransactionVerificationData`]. /// A response of [`TransactionVerificationData`].
TxVerificationData(TransactionVerificationData), TxVerificationData(TransactionVerificationData),
/// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
FilterKnownTxBlobHashes(HashSet<TransactionBlobHash>), FilterKnownTxBlobHashes {
/// The blob hashes that are unknown.
unknown_blob_hashes: HashSet<TransactionBlobHash>,
/// The tx hashes of the blob hashes that were known but were in the stem pool.
stem_pool_hashes: Vec<TransactionHash>,
},
} }
//---------------------------------------------------------------------------------------------------- TxpoolWriteRequest //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest

View file

@ -2,9 +2,10 @@ use std::{collections::HashSet, sync::Arc};
use rayon::ThreadPool; use rayon::ThreadPool;
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner}; use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use crate::ops::in_stem_pool;
use crate::{ use crate::{
ops::get_transaction_verification_data, ops::get_transaction_verification_data,
service::{ service::{
@ -119,12 +120,30 @@ fn filter_known_tx_blob_hashes(
let tx_ro = inner_env.tx_ro()?; let tx_ro = inner_env.tx_ro()?;
let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?; let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
let mut stem_pool_hashes = Vec::new();
// A closure that returns if a tx with a certain blob hash is unknown.
// This also fills in `stem_tx_hashes`.
let mut tx_unknown = |blob_hash| -> Result<bool, RuntimeError> {
match tx_blob_hashes.get(&blob_hash) {
Ok(tx_hash) => {
if in_stem_pool(&tx_hash, &tx_infos)? {
stem_pool_hashes.push(tx_hash);
}
Ok(false)
}
Err(RuntimeError::KeyNotFound) => Ok(true),
Err(e) => Err(e),
}
};
let mut err = None; let mut err = None;
blob_hashes.retain(|blob_hash| match tx_blob_hashes.contains(blob_hash) { blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
Ok(exists) => !exists, Ok(res) => res,
Err(e) => { Err(e) => {
err.get_or_insert(e); err = Some(e);
false false
} }
}); });
@ -133,5 +152,8 @@ fn filter_known_tx_blob_hashes(
return Err(e); return Err(e);
} }
Ok(TxpoolReadResponse::FilterKnownTxBlobHashes(blob_hashes)) Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
unknown_blob_hashes: blob_hashes,
stem_pool_hashes,
})
} }

View file

@ -47,5 +47,5 @@ define_tables! {
/// Transaction blob hashes that are in the pool. /// Transaction blob hashes that are in the pool.
4 => KnownBlobHashes, 4 => KnownBlobHashes,
TransactionBlobHash => (), TransactionBlobHash => TransactionHash,
} }