From d1b9e6bb923357eb85e95caa001394478dfff34a Mon Sep 17 00:00:00 2001
From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com>
Date: Tue, 15 Oct 2024 17:05:15 +0100
Subject: [PATCH] pre-review changes

---
 binaries/cuprated/src/txpool/dandelion.rs     |  7 +++-
 .../src/txpool/dandelion/stem_service.rs      |  4 +-
 binaries/cuprated/src/txpool/incoming_tx.rs   | 39 ++++++++++++-------
 .../cuprated/src/txpool/txs_being_handled.rs  | 28 +++++++------
 p2p/dandelion-tower/src/router.rs             |  5 ++-
 p2p/p2p/src/client_pool.rs                    |  7 ++--
 p2p/p2p/src/lib.rs                            |  1 +
 storage/txpool/src/ops/tx_read.rs             | 10 +++--
 storage/txpool/src/service/read.rs            |  5 +--
 storage/txpool/src/service/write.rs           |  2 +
 10 files changed, 69 insertions(+), 39 deletions(-)

diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs
index dfdcdc1f..982736d6 100644
--- a/binaries/cuprated/src/txpool/dandelion.rs
+++ b/binaries/cuprated/src/txpool/dandelion.rs
@@ -1,7 +1,5 @@
 use std::time::Duration;
 
-use super::incoming_tx::{DandelionTx, TxId};
-use crate::p2p::CrossNetworkInternalPeerId;
 use cuprate_dandelion_tower::{
     pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
 };
@@ -9,6 +7,11 @@ use cuprate_p2p::NetworkInterface;
 use cuprate_p2p_core::ClearNet;
 use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
 
+use crate::{
+    p2p::CrossNetworkInternalPeerId,
+    txpool::incoming_tx::{DandelionTx, TxId},
+};
+
 mod diffuse_service;
 mod stem_service;
 mod tx_store;
diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs
index 8a50c060..73dc7578 100644
--- a/binaries/cuprated/src/txpool/dandelion/stem_service.rs
+++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs
@@ -8,7 +8,7 @@ use futures::Stream;
 use tower::Service;
 
 use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
-use cuprate_p2p::NetworkInterface;
+use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
 use cuprate_p2p_core::{
     client::{Client, InternalPeerID},
     ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
@@ -45,7 +45,7 @@ impl Stream for OutboundPeerStream {
 }
 
 /// The stem service, used to send stem txs.
-pub struct StemPeerService<N>(Client<N>);
+pub struct StemPeerService<N>(ClientPoolDropGuard<N>);
 
 impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
     type Response = <Client<N> as Service<PeerRequest>>::Response;
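Wrapping the borrowed `Client<N>` in a `ClientPoolDropGuard<N>` means the stem service no longer removes the peer from the client pool permanently: the guard hands the connection back when the service is dropped. A minimal sketch of this drop-guard pattern, using illustrative `Pool`/`Client`/`DropGuard` stand-ins rather than the real cuprate types:

```rust
use std::sync::{Arc, Mutex};

// Illustrative stand-ins for the real pool and client types.
struct Client {
    id: u64,
}

struct Pool {
    free: Mutex<Vec<Client>>,
}

/// Returns the borrowed client to the pool when dropped.
struct DropGuard {
    pool: Arc<Pool>,
    client: Option<Client>,
}

impl Drop for DropGuard {
    fn drop(&mut self) {
        // Hand the connection back so other components can reuse it.
        if let Some(client) = self.client.take() {
            self.pool.free.lock().unwrap().push(client);
        }
    }
}

fn main() {
    let pool = Arc::new(Pool {
        free: Mutex::new(vec![Client { id: 0 }]),
    });

    // Borrow the client through a guard instead of removing it for good.
    let client = pool.free.lock().unwrap().pop().unwrap();
    let guard = DropGuard {
        pool: Arc::clone(&pool),
        client: Some(client),
    };
    drop(guard);

    // The client was returned by the guard's `Drop` impl.
    assert_eq!(pool.free.lock().unwrap()[0].id, 0);
}
```

The `outbound_client` change later in this patch relies on the same mechanism, borrowing via `borrow_client` instead of removing the pool entry outright.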
diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs
index 8ec6d621..4abb7ad0 100644
--- a/binaries/cuprated/src/txpool/incoming_tx.rs
+++ b/binaries/cuprated/src/txpool/incoming_tx.rs
@@ -18,18 +18,23 @@ use cuprate_dandelion_tower::{
     State, TxState,
 };
 use cuprate_helper::asynch::rayon_spawn_async;
-use cuprate_txpool::service::{
-    interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
-    TxpoolReadHandle, TxpoolWriteHandle,
+use cuprate_txpool::{
+    service::{
+        interface::{
+            TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
+        },
+        TxpoolReadHandle, TxpoolWriteHandle,
+    },
+    transaction_blob_hash,
 };
 use cuprate_types::TransactionVerificationData;
 
-use crate::p2p::CrossNetworkInternalPeerId;
 use crate::{
     blockchain::ConcreteTxVerifierService,
     constants::PANIC_CRITICAL_SERVICE_ERROR,
+    p2p::CrossNetworkInternalPeerId,
     signals::REORG_LOCK,
-    txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled},
+    txpool::txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
 };
 
 /// An error that can happen handling an incoming tx.
@@ -151,6 +156,7 @@ async fn handle_incoming_txs(
             .await;
     }
 
+    // Re-relay any txs we got in the message that were already in our stem pool.
     for stem_tx in stem_pool_txs {
         rerelay_stem_tx(
             &stem_tx,
@@ -168,7 +174,10 @@ async fn handle_incoming_txs(
 ///
 /// This will filter out all transactions already in the pool or txs already being handled in another request.
 ///
-/// # Returns
+/// Returns in order:
+/// - The [`TransactionVerificationData`] for all the txs we did not already have
+/// - The IDs of the transactions in the incoming message that are in our stem pool
+/// - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx in two tasks at the same time
 async fn prepare_incoming_txs(
     tx_blobs: Vec<Bytes>,
     txs_being_handled: TxsBeingHandled,
@@ -177,7 +186,7 @@ async fn prepare_incoming_txs(
     (
         Vec<Arc<TransactionVerificationData>>,
         Vec<TxId>,
-        TxBeingHandledLocally,
+        TxsBeingHandledLocally,
     ),
     IncomingTxError,
 > {
@@ -188,7 +197,7 @@ async fn prepare_incoming_txs(
     let txs = tx_blobs
         .into_iter()
         .filter_map(|tx_blob| {
-            let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());
+            let tx_blob_hash = transaction_blob_hash(tx_blob.as_ref());
 
             // If a duplicate is in here the incoming tx batch contained the same tx twice.
             if !tx_blob_hashes.insert(tx_blob_hash) {
@@ -246,6 +255,9 @@ async fn prepare_incoming_txs(
         .await
 }
 
+/// Handle a verified tx.
+///
+/// This will add the tx to the txpool and route it to the network.
 async fn handle_valid_tx(
     tx: Arc<TransactionVerificationData>,
     state: TxState<CrossNetworkInternalPeerId>,
@@ -265,7 +277,7 @@ async fn handle_valid_tx(
         .expect(PANIC_CRITICAL_SERVICE_ERROR)
         .call(TxpoolWriteRequest::AddTransaction {
             tx,
-            state_stem: state.state_stem(),
+            state_stem: state.state_stage(),
         })
         .await
         .expect("TODO")
@@ -278,7 +290,7 @@ async fn handle_valid_tx(
         return;
     };
 
-    // TODO: There is a race condition possible if a tx and block come in at the same time .
+    // TODO: There is a race condition possible if a tx and block come in at the same time: .
 
     let incoming_tx = incoming_tx
         .with_routing_state(state)
@@ -295,6 +307,7 @@ async fn handle_valid_tx(
         .expect(PANIC_CRITICAL_SERVICE_ERROR);
 }
 
+/// Re-relay a tx that was already in our stem pool.
 async fn rerelay_stem_tx(
     tx_hash: &TxId,
     state: TxState<CrossNetworkInternalPeerId>,
     txpool_read_handle: TxpoolReadHandle,
@@ -305,15 +318,15 @@ async fn rerelay_stem_tx(
         CrossNetworkInternalPeerId,
     >,
 ) {
-    let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle
+    let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
         .ready()
         .await
         .expect(PANIC_CRITICAL_SERVICE_ERROR)
         .call(TxpoolReadRequest::TxBlob(*tx_hash))
         .await
-        .expect("TODO")
     else {
-        unreachable!()
+        // The tx could have been dropped from the pool.
+        return;
     };
 
     let incoming_tx =
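The `let Ok(...) ... else` rewrite in `rerelay_stem_tx` turns "the tx must still be in the pool" from an assumption enforced by `expect`/`unreachable!` into an ordinary early return. A self-contained sketch of the same pattern, with an illustrative `ReadResponse` stand-in for the real service types:

```rust
// Illustrative stand-in for the real txpool read response type.
enum ReadResponse {
    TxBlob { tx_blob: Vec<u8>, state_stem: bool },
}

fn rerelay(response: Result<ReadResponse, &'static str>) -> Option<Vec<u8>> {
    // Before: the response was unwrapped with `.expect()` and any other
    // outcome hit `unreachable!()`. After: a failed read is a normal case.
    let Ok(ReadResponse::TxBlob { tx_blob, .. }) = response else {
        // The tx could have been dropped from the pool in the meantime.
        return None;
    };

    Some(tx_blob) // would be handed to the dandelion router for re-relaying
}

fn main() {
    assert!(rerelay(Err("tx not in pool")).is_none());

    let res = ReadResponse::TxBlob {
        tx_blob: vec![0xAB],
        state_stem: true,
    };
    assert_eq!(rerelay(Ok(res)).as_deref(), Some(&[0xAB][..]));
}
```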
diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs
index 214d1cc2..122b8ac7 100644
--- a/binaries/cuprated/src/txpool/txs_being_handled.rs
+++ b/binaries/cuprated/src/txpool/txs_being_handled.rs
@@ -1,35 +1,39 @@
-use dashmap::DashSet;
-use sha3::{Digest, Sha3_256};
 use std::sync::Arc;
 
-pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] {
-    let mut hasher = Sha3_256::new();
-    hasher.update(tx_bytes);
-    hasher.finalize().into()
-}
+use dashmap::DashSet;
 
+/// A set of txs currently being handled, shared between instances of the incoming tx handler.
 #[derive(Clone)]
 pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
 
 impl TxsBeingHandled {
+    /// Create a new [`TxsBeingHandled`].
     pub fn new() -> Self {
         Self(Arc::new(DashSet::new()))
     }
 
-    pub fn local_tracker(&self) -> TxBeingHandledLocally {
-        TxBeingHandledLocally {
+    /// Create a new [`TxsBeingHandledLocally`] that will keep track of txs being handled in a request.
+    pub fn local_tracker(&self) -> TxsBeingHandledLocally {
+        TxsBeingHandledLocally {
             txs_being_handled: self.clone(),
             txs: vec![],
         }
     }
 }
 
-pub struct TxBeingHandledLocally {
+/// A tracker of txs being handled in a single request. This will add the txs to the global
+/// [`TxsBeingHandled`] tracker as well.
+///
+/// When this is dropped the txs will be removed from [`TxsBeingHandled`].
+pub struct TxsBeingHandledLocally {
     txs_being_handled: TxsBeingHandled,
     txs: Vec<[u8; 32]>,
 }
 
-impl TxBeingHandledLocally {
+impl TxsBeingHandledLocally {
+    /// Try to add a tx to the set from its [`transaction_blob_hash`](cuprate_txpool::transaction_blob_hash).
+    ///
+    /// Returns `true` if the tx was added and `false` if another task is already handling this tx.
     pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
         if !self.txs_being_handled.0.insert(tx_blob_hash) {
             return false;
@@ -40,7 +44,7 @@ impl TxBeingHandledLocally {
     }
 }
 
-impl Drop for TxBeingHandledLocally {
+impl Drop for TxsBeingHandledLocally {
     fn drop(&mut self) {
         for hash in &self.txs {
             self.txs_being_handled.0.remove(hash);
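A cut-down sketch of the renamed tracker, showing why the `Drop` impl matters: claims made through `try_add_tx` are released even if the request that made them fails midway. It uses `dashmap` as the file above does; `Shared` and `LocalTracker` are illustrative names, not the real types:

```rust
use std::sync::Arc;

use dashmap::DashSet;

// Cut-down, illustrative copies of the two types above.
#[derive(Clone)]
struct Shared(Arc<DashSet<[u8; 32]>>);

struct LocalTracker {
    shared: Shared,
    claimed: Vec<[u8; 32]>,
}

impl LocalTracker {
    /// Claim a tx for this request; `false` means another request holds it.
    fn try_add_tx(&mut self, hash: [u8; 32]) -> bool {
        if !self.shared.0.insert(hash) {
            return false;
        }
        self.claimed.push(hash);
        true
    }
}

impl Drop for LocalTracker {
    fn drop(&mut self) {
        // Release every claim, even if the request errored or panicked.
        for hash in &self.claimed {
            self.shared.0.remove(hash);
        }
    }
}

fn main() {
    let shared = Shared(Arc::new(DashSet::new()));
    let mut a = LocalTracker { shared: shared.clone(), claimed: vec![] };
    let mut b = LocalTracker { shared: shared.clone(), claimed: vec![] };

    assert!(a.try_add_tx([1; 32])); // request `a` claims the tx
    assert!(!b.try_add_tx([1; 32])); // `b` is told someone else has it
    drop(a); // `a`'s Drop impl releases the claim
    assert!(b.try_add_tx([1; 32])); // now `b` can claim it
}
```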
diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs
index c04dcaea..79eabd4e 100644
--- a/p2p/dandelion-tower/src/router.rs
+++ b/p2p/dandelion-tower/src/router.rs
@@ -74,7 +74,10 @@ pub enum TxState<Id> {
 }
 
 impl<Id> TxState<Id> {
-    pub const fn state_stem(&self) -> bool {
+    /// Returns `true` if the tx is in the stem stage.
+    ///
+    /// [`TxState::Local`] & [`TxState::Stem`] are the two stem stage states.
+    pub const fn state_stage(&self) -> bool {
         matches!(self, Self::Local | Self::Stem { .. })
     }
 }
diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs
index 25dd2420..67c8f112 100644
--- a/p2p/p2p/src/client_pool.rs
+++ b/p2p/p2p/src/client_pool.rs
@@ -24,7 +24,7 @@ use cuprate_p2p_core::{
 pub(crate) mod disconnect_monitor;
 mod drop_guard_client;
 
-pub(crate) use drop_guard_client::ClientPoolDropGuard;
+pub use drop_guard_client::ClientPoolDropGuard;
 
 /// The client pool, which holds currently connected free peers.
 ///
@@ -166,14 +166,15 @@ impl<N: NetworkZone> ClientPool<N> {
         })
     }
 
-    pub fn outbound_client(&self) -> Option<Client<N>> {
+    /// Returns the first outbound peer when iterating over the peers.
+    pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
         let client = self
             .clients
             .iter()
             .find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
         let id = *client.key();
 
-        Some(self.clients.remove(&id).unwrap().1)
+        Some(self.borrow_client(&id).unwrap())
     }
 }
diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs
index b3577a77..4c1d691c 100644
--- a/p2p/p2p/src/lib.rs
+++ b/p2p/p2p/src/lib.rs
@@ -26,6 +26,7 @@ mod inbound_server;
 
 use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
 pub use broadcast::{BroadcastRequest, BroadcastSvc};
+pub use client_pool::ClientPoolDropGuard;
 pub use config::{AddressBookConfig, P2PConfig};
 use connection_maintainer::MakeConnectionRequest;
diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs
index 6b79cba8..fa4af31a 100644
--- a/storage/txpool/src/ops/tx_read.rs
+++ b/storage/txpool/src/ops/tx_read.rs
@@ -8,9 +8,10 @@ use monero_serai::transaction::Transaction;
 use cuprate_database::{DatabaseRo, RuntimeError};
 use cuprate_types::{TransactionVerificationData, TxVersion};
 
-use crate::tables::TransactionInfos;
-use crate::types::TxStateFlags;
-use crate::{tables::Tables, types::TransactionHash};
+use crate::{
+    tables::{Tables, TransactionInfos},
+    types::{TransactionHash, TxStateFlags},
+};
 
 /// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
 pub fn get_transaction_verification_data(
@@ -37,6 +38,9 @@ pub fn get_transaction_verification_data(
     })
 }
 
+/// Returns `true` if the transaction with the given hash is in the stem pool.
+///
+/// This will return an error if the transaction is not in the pool.
 pub fn in_stem_pool(
     tx_hash: &TransactionHash,
     tx_infos: &impl DatabaseRo<TransactionInfos>,
diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs
index bdeeb74b..45cbd261 100644
--- a/storage/txpool/src/service/read.rs
+++ b/storage/txpool/src/service/read.rs
@@ -21,7 +21,7 @@ use crate::{
         types::{ReadResponseResult, TxpoolReadHandle},
     },
     tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
-    types::{TransactionBlobHash, TransactionHash, TxStateFlags},
+    types::{TransactionBlobHash, TransactionHash},
 };
 
 // TODO: update the docs here
@@ -102,11 +102,10 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
     let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
 
     let tx_blob = tx_blobs_table.get(tx_hash)?.0;
-    let tx_info = tx_infos_table.get(tx_hash)?;
 
     Ok(TxpoolReadResponse::TxBlob {
         tx_blob,
-        state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM),
+        state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
     })
 }
diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs
index 9d1a5980..3e2186a0 100644
--- a/storage/txpool/src/service/write.rs
+++ b/storage/txpool/src/service/write.rs
@@ -125,6 +125,7 @@ fn promote(
     Ok(TxpoolWriteResponse::Ok)
 }
 
+/// [`TxpoolWriteRequest::NewBlock`]
 fn new_block(
     env: &ConcreteEnv,
     spent_key_images: &[KeyImage],
@@ -136,6 +137,7 @@ fn new_block(
     let result = || {
         let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
 
+        // Remove all txs which spend key images that were spent in the new block.
         for key_image in spent_key_images {
             match tables_mut
                 .spent_key_images()
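The `in_stem_pool` helper added in `tx_read.rs` replaces the inline `tx_info.flags.contains(TxStateFlags::STATE_STEM)` check that `read.rs` previously performed, so every reader derives the stem/fluff state the same way. A minimal sketch of that check, assuming a `bitflags`-style `TxStateFlags` as the removed line suggests (the bit value of `STATE_STEM` here is an assumption for illustration):

```rust
use bitflags::bitflags;

bitflags! {
    // Illustrative; the real `TxStateFlags` lives in `storage/txpool/src/types.rs`
    // and its exact bit values are not shown in this patch.
    #[derive(Clone, Copy)]
    struct TxStateFlags: u8 {
        const STATE_STEM = 1 << 0;
    }
}

struct TransactionInfo {
    flags: TxStateFlags,
}

/// The check the new helper centralizes: derive "is this tx in the stem
/// pool?" from the stored flags in exactly one place.
fn in_stem_pool(info: &TransactionInfo) -> bool {
    info.flags.contains(TxStateFlags::STATE_STEM)
}

fn main() {
    let stem = TransactionInfo { flags: TxStateFlags::STATE_STEM };
    let fluff = TransactionInfo { flags: TxStateFlags::empty() };

    assert!(in_stem_pool(&stem));
    assert!(!in_stem_pool(&fluff));
}
```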