pre-review changes

This commit is contained in:
Boog900 2024-10-15 17:05:15 +01:00
parent bffec38fbe
commit d1b9e6bb92
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
10 changed files with 69 additions and 39 deletions

View file

@ -1,7 +1,5 @@
use std::time::Duration;
use super::incoming_tx::{DandelionTx, TxId};
use crate::p2p::CrossNetworkInternalPeerId;
use cuprate_dandelion_tower::{
pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
};
@ -9,6 +7,11 @@ use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use crate::{
p2p::CrossNetworkInternalPeerId,
txpool::incoming_tx::{DandelionTx, TxId},
};
mod diffuse_service;
mod stem_service;
mod tx_store;

View file

@ -8,7 +8,7 @@ use futures::Stream;
use tower::Service;
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
@ -45,7 +45,7 @@ impl Stream for OutboundPeerStream {
}
/// The stem service, used to send stem txs.
pub struct StemPeerService<N: NetworkZone>(Client<N>);
pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>);
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
type Response = <Client<N> as Service<PeerRequest>>::Response;

View file

@ -18,18 +18,23 @@ use cuprate_dandelion_tower::{
State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
TxpoolReadHandle, TxpoolWriteHandle,
use cuprate_txpool::{
service::{
interface::{
TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
},
TxpoolReadHandle, TxpoolWriteHandle,
},
transaction_blob_hash,
};
use cuprate_types::TransactionVerificationData;
use crate::p2p::CrossNetworkInternalPeerId;
use crate::{
blockchain::ConcreteTxVerifierService,
constants::PANIC_CRITICAL_SERVICE_ERROR,
p2p::CrossNetworkInternalPeerId,
signals::REORG_LOCK,
txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled},
txpool::txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
};
/// An error that can happen handling an incoming tx.
@ -151,6 +156,7 @@ async fn handle_incoming_txs(
.await;
}
// Re-relay any txs we got in the block that were already in our stem pool.
for stem_tx in stem_pool_txs {
rerelay_stem_tx(
&stem_tx,
@ -168,7 +174,10 @@ async fn handle_incoming_txs(
///
/// This will filter out all transactions already in the pool or txs already being handled in another request.
///
/// # Returns
/// Returns in order:
/// - The [`TransactionVerificationData`] for all the txs we did not already have
/// - The Ids of the transactions in the incoming message that are in our stem-pool
/// - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across 2 tasks.
async fn prepare_incoming_txs(
tx_blobs: Vec<Bytes>,
txs_being_handled: TxsBeingHandled,
@ -177,7 +186,7 @@ async fn prepare_incoming_txs(
(
Vec<Arc<TransactionVerificationData>>,
Vec<TxId>,
TxBeingHandledLocally,
TxsBeingHandledLocally,
),
IncomingTxError,
> {
@ -188,7 +197,7 @@ async fn prepare_incoming_txs(
let txs = tx_blobs
.into_iter()
.filter_map(|tx_blob| {
let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());
let tx_blob_hash = transaction_blob_hash(tx_blob.as_ref());
// If a duplicate is in here the incoming tx batch contained the same tx twice.
if !tx_blob_hashes.insert(tx_blob_hash) {
@ -246,6 +255,9 @@ async fn prepare_incoming_txs(
.await
}
/// Handle a verified tx.
///
/// This will add the tx to the txpool and route it to the network.
async fn handle_valid_tx(
tx: Arc<TransactionVerificationData>,
state: TxState<CrossNetworkInternalPeerId>,
@ -265,7 +277,7 @@ async fn handle_valid_tx(
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::AddTransaction {
tx,
state_stem: state.state_stem(),
state_stem: state.state_stage(),
})
.await
.expect("TODO")
@ -278,7 +290,7 @@ async fn handle_valid_tx(
return;
};
// TODO: There is a race condition possible if a tx and block come in at the same time <https://github.com/Cuprate/cuprate/issues/314>.
// TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.
let incoming_tx = incoming_tx
.with_routing_state(state)
@ -295,6 +307,7 @@ async fn handle_valid_tx(
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Re-relay a tx that was already in our stem pool.
async fn rerelay_stem_tx(
tx_hash: &TxId,
state: TxState<CrossNetworkInternalPeerId>,
@ -305,15 +318,15 @@ async fn rerelay_stem_tx(
CrossNetworkInternalPeerId,
>,
) {
let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle
let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxBlob(*tx_hash))
.await
.expect("TODO")
else {
unreachable!()
// The tx could have been dropped from the pool.
return;
};
let incoming_tx =

View file

@ -1,35 +1,39 @@
use dashmap::DashSet;
use sha3::{Digest, Sha3_256};
use std::sync::Arc;
pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] {
let mut hasher = Sha3_256::new();
hasher.update(tx_bytes);
hasher.finalize().into()
}
use dashmap::DashSet;
/// A set of txs currently being handled, shared between instances of the incoming tx handler.
#[derive(Clone)]
pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
impl TxsBeingHandled {
/// Create a new [`TxsBeingHandled`]
pub fn new() -> Self {
Self(Arc::new(DashSet::new()))
}
pub fn local_tracker(&self) -> TxBeingHandledLocally {
TxBeingHandledLocally {
/// Create a new [`TxsBeingHandledLocally`] that will keep track of txs being handled in a request.
pub fn local_tracker(&self) -> TxsBeingHandledLocally {
TxsBeingHandledLocally {
txs_being_handled: self.clone(),
txs: vec![],
}
}
}
pub struct TxBeingHandledLocally {
/// A tracker of txs being handled in a single request. This will add the txs to the global [`TxsBeingHandled`]
/// tracker as well.
///
/// When this is dropped the txs will be removed from [`TxsBeingHandled`].
pub struct TxsBeingHandledLocally {
txs_being_handled: TxsBeingHandled,
txs: Vec<[u8; 32]>,
}
impl TxBeingHandledLocally {
impl TxsBeingHandledLocally {
/// Try add a tx to the map from its [`transaction_blob_hash`](cuprate_txpool::transaction_blob_hash).
///
/// Returns `true` if the tx was added and `false` if another task is already handling this tx.
pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
if !self.txs_being_handled.0.insert(tx_blob_hash) {
return false;
@ -40,7 +44,7 @@ impl TxBeingHandledLocally {
}
}
impl Drop for TxBeingHandledLocally {
impl Drop for TxsBeingHandledLocally {
fn drop(&mut self) {
for hash in &self.txs {
self.txs_being_handled.0.remove(hash);

View file

@ -74,7 +74,10 @@ pub enum TxState<Id> {
}
impl<Id> TxState<Id> {
pub const fn state_stem(&self) -> bool {
/// Returns if the tx is in the stem stage.
///
/// [`TxState::Local`] & [`TxState::Stem`] are the 2 stem stage states.
pub const fn state_stage(&self) -> bool {
matches!(self, Self::Local | Self::Stem { .. })
}
}

View file

@ -24,7 +24,7 @@ use cuprate_p2p_core::{
pub(crate) mod disconnect_monitor;
mod drop_guard_client;
pub(crate) use drop_guard_client::ClientPoolDropGuard;
pub use drop_guard_client::ClientPoolDropGuard;
/// The client pool, which holds currently connected free peers.
///
@ -166,14 +166,15 @@ impl<N: NetworkZone> ClientPool<N> {
})
}
pub fn outbound_client(&self) -> Option<Client<N>> {
/// Returns the first outbound peer when iterating over the peers.
pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
let client = self
.clients
.iter()
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
let id = *client.key();
Some(self.clients.remove(&id).unwrap().1)
Some(self.borrow_client(&id).unwrap())
}
}

View file

@ -26,6 +26,7 @@ mod inbound_server;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
pub use client_pool::ClientPoolDropGuard;
pub use config::{AddressBookConfig, P2PConfig};
use connection_maintainer::MakeConnectionRequest;

View file

@ -8,9 +8,10 @@ use monero_serai::transaction::Transaction;
use cuprate_database::{DatabaseRo, RuntimeError};
use cuprate_types::{TransactionVerificationData, TxVersion};
use crate::tables::TransactionInfos;
use crate::types::TxStateFlags;
use crate::{tables::Tables, types::TransactionHash};
use crate::{
tables::{Tables, TransactionInfos},
types::{TransactionHash, TxStateFlags},
};
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
pub fn get_transaction_verification_data(
@ -37,6 +38,9 @@ pub fn get_transaction_verification_data(
})
}
/// Returns if the transaction with the given hash is in the stem pool.
///
/// This will return an err if the transaction is not in the pool.
pub fn in_stem_pool(
tx_hash: &TransactionHash,
tx_infos: &impl DatabaseRo<TransactionInfos>,

View file

@ -21,7 +21,7 @@ use crate::{
types::{ReadResponseResult, TxpoolReadHandle},
},
tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
types::{TransactionBlobHash, TransactionHash, TxStateFlags},
types::{TransactionBlobHash, TransactionHash},
};
// TODO: update the docs here
@ -102,11 +102,10 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
let tx_blob = tx_blobs_table.get(tx_hash)?.0;
let tx_info = tx_infos_table.get(tx_hash)?;
Ok(TxpoolReadResponse::TxBlob {
tx_blob,
state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM),
state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
})
}

View file

@ -125,6 +125,7 @@ fn promote(
Ok(TxpoolWriteResponse::Ok)
}
/// [`TxpoolWriteRequest::NewBlock`]
fn new_block(
env: &ConcreteEnv,
spent_key_images: &[KeyImage],
@ -136,6 +137,7 @@ fn new_block(
let result = || {
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
// Remove all txs which spend key images that were spent in the new block.
for key_image in spent_key_images {
match tables_mut
.spent_key_images()