diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 7f7dfd3a..9df0d23c 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -8,19 +8,16 @@ use std::{ }; use monero_serai::{block::Block, transaction::Transaction}; -use rayon::prelude::*; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_helper::cast::usize_to_u64; -use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; -use cuprate_txpool::service::TxpoolReadHandle; -use cuprate_types::{ - blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse}, + TxpoolReadHandle, }; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use crate::{ blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, @@ -114,6 +111,10 @@ pub async fn handle_incoming_block( for needed_hash in needed_hashes { let Some(tx) = given_txs.remove(&needed_hash) else { + // We return back the indexes of all txs missing from our pool, not taking into account the txs + // that were given with the block, as these txs will be dropped. It is not worth it to try to add + // these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches + // the size limit. return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); }; diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 75d15b0b..999b3924 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -17,6 +17,7 @@ use cuprate_p2p::{ BroadcastSvc, NetworkInterface, }; use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::TxpoolWriteHandle; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, TransactionVerificationData, @@ -36,7 +37,6 @@ mod commands; mod handler; pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; -use cuprate_txpool::service::TxpoolWriteHandle; /// Initialize the blockchain manager. /// diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index c3d256b3..3c59bd44 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,8 +1,10 @@ //! The blockchain manager handler functions. use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; -use monero_serai::transaction::Input; -use monero_serai::{block::Block, transaction::Transaction}; +use monero_serai::{ + block::Block, + transaction::{Input, Transaction}, +}; use rayon::prelude::*; use std::ops::ControlFlow; use std::{collections::HashMap, sync::Arc}; @@ -24,11 +26,8 @@ use cuprate_types::{ AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; -use crate::blockchain::manager::commands::IncomingBlockOk; use crate::{ - blockchain::{ - manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle, - }, + blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk}, constants::PANIC_CRITICAL_SERVICE_ERROR, signals::REORG_LOCK, }; diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 7d6874e0..913c9834 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1,11 +1,10 @@ // FIXME: This whole module is not great and should be rewritten when the PeerSet is made. -use std::{pin::pin, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use futures::StreamExt; -use tokio::time::interval; use tokio::{ sync::{mpsc, Notify}, - time::sleep, + time::interval, }; use tower::{Service, ServiceExt}; use tracing::instrument; diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs index e3ee62b3..54e46621 100644 --- a/binaries/cuprated/src/blockchain/types.rs +++ b/binaries/cuprated/src/blockchain/types.rs @@ -1,13 +1,7 @@ -use std::task::{Context, Poll}; - -use futures::future::BoxFuture; -use futures::{FutureExt, TryFutureExt}; -use tower::{util::MapErr, Service}; +use tower::util::MapErr; use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle}; use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; -use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; -use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; /// The [`BlockVerifierService`] with all generic types defined. pub type ConcreteBlockVerifierService = BlockVerifierService< diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 0953f627..d3fe1f56 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -12,8 +12,6 @@ reason = "TODO: remove after v1.0.0" )] -use crate::txpool::IncomingTxHandler; - mod blockchain; mod config; mod constants; diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index f55d41db..cdf1cef7 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -2,4 +2,7 @@ //! //! Will handle initiating the P2P and contains a protocol request handler. +mod network_address; pub mod request_handler; + +pub use network_address::CrossNetworkInternalPeerId; diff --git a/binaries/cuprated/src/p2p/network_address.rs b/binaries/cuprated/src/p2p/network_address.rs new file mode 100644 index 00000000..7fa8e86c --- /dev/null +++ b/binaries/cuprated/src/p2p/network_address.rs @@ -0,0 +1,16 @@ +use std::net::SocketAddr; + +use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone}; + +/// An identifier for a P2P peer on any network. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum CrossNetworkInternalPeerId { + /// A clear-net peer. + ClearNet(InternalPeerID<::Addr>), +} + +impl From::Addr>> for CrossNetworkInternalPeerId { + fn from(addr: InternalPeerID<::Addr>) -> Self { + Self::ClearNet(addr) + } +} diff --git a/binaries/cuprated/src/statics.rs b/binaries/cuprated/src/statics.rs index 8aab1c9a..9839608f 100644 --- a/binaries/cuprated/src/statics.rs +++ b/binaries/cuprated/src/statics.rs @@ -1,7 +1,7 @@ //! Global `static`s used throughout `cuprated`. use std::{ - sync::{atomic::AtomicU64, LazyLock}, + sync::LazyLock, time::{SystemTime, UNIX_EPOCH}, }; diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 76e5ccbf..a4a47a60 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -1,7 +1,6 @@ //! Transaction Pool //! -//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool. - +//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool. use cuprate_consensus::BlockChainContextService; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; @@ -13,9 +12,11 @@ mod dandelion; mod incoming_tx; mod txs_being_handled; -pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; +pub use incoming_tx::IncomingTxHandler; -pub fn init_incoming_tx_handler( +/// Initialize the [`IncomingTxHandler`]. +#[expect(clippy::significant_drop_tightening)] +pub fn incoming_tx_handler( clear_net: NetworkInterface, txpool_write_handle: TxpoolWriteHandle, txpool_read_handle: TxpoolReadHandle, diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs index 482379bb..dfdcdc1f 100644 --- a/binaries/cuprated/src/txpool/dandelion.rs +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -1,19 +1,21 @@ use std::time::Duration; -use bytes::Bytes; -use cuprate_dandelion_tower::pool::DandelionPoolService; -use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph}; +use super::incoming_tx::{DandelionTx, TxId}; +use crate::p2p::CrossNetworkInternalPeerId; +use cuprate_dandelion_tower::{ + pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph, +}; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; -use cuprate_wire::NetworkAddress; - -use super::incoming_tx::{DandelionTx, TxId}; mod diffuse_service; mod stem_service; mod tx_store; +/// The configuration used for [`cuprate_dandelion_tower`]. +/// +/// TODO: should we expose this? probably not. const DANDELION_CONFIG: DandelionConfig = DandelionConfig { time_between_hop: Duration::from_millis(175), epoch_duration: Duration::from_secs(10 * 60), @@ -21,20 +23,23 @@ const DANDELION_CONFIG: DandelionConfig = DandelionConfig { graph: Graph::FourRegular, }; +/// A [`DandelionRouter`] with all generic types defined. type ConcreteDandelionRouter = DandelionRouter< stem_service::OutboundPeerStream, diffuse_service::DiffuseService, - NetworkAddress, + CrossNetworkInternalPeerId, stem_service::StemPeerService, DandelionTx, >; +/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast. pub fn start_dandelion_pool_manager( router: ConcreteDandelionRouter, txpool_read_handle: TxpoolReadHandle, txpool_write_handle: TxpoolWriteHandle, -) -> DandelionPoolService { +) -> DandelionPoolService { cuprate_dandelion_tower::pool::start_dandelion_pool_manager( + // TODO: make this constant configurable? 32, router, tx_store::TxStoreService { @@ -45,6 +50,7 @@ pub fn start_dandelion_pool_manager( ) } +/// Creates a [`DandelionRouter`] from a [`NetworkInterface`]. pub fn dandelion_router(clear_net: NetworkInterface) -> ConcreteDandelionRouter { DandelionRouter::new( diffuse_service::DiffuseService { diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs index 115799d0..6ba12099 100644 --- a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs @@ -7,7 +7,7 @@ use futures::FutureExt; use tower::Service; use cuprate_dandelion_tower::traits::DiffuseRequest; -use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface}; +use cuprate_p2p::{BroadcastRequest, BroadcastSvc}; use cuprate_p2p_core::ClearNet; use super::DandelionTx; @@ -29,7 +29,8 @@ impl Service> for DiffuseService { } fn call(&mut self, req: DiffuseRequest) -> Self::Future { - // TODO: Call `into_inner` when 1.82.0 stabilizes + // TODO: the dandelion crate should pass along where we got the tx from. + // TODO: Use `into_inner` when 1.82.0 stabilizes. self.clear_net_broadcast_service .call(BroadcastRequest::Transaction { tx_bytes: req.0 .0, diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs index 330c8843..8a50c060 100644 --- a/binaries/cuprated/src/txpool/dandelion/stem_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -3,19 +3,20 @@ use std::{ task::{Context, Poll}, }; +use bytes::Bytes; +use futures::Stream; +use tower::Service; + use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer}; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::{ client::{Client, InternalPeerID}, ClearNet, NetworkZone, PeerRequest, ProtocolRequest, }; -use cuprate_wire::{protocol::NewTransactions, NetworkAddress}; - -use bytes::Bytes; -use futures::Stream; -use tower::Service; +use cuprate_wire::protocol::NewTransactions; use super::DandelionTx; +use crate::p2p::CrossNetworkInternalPeerId; /// The dandelion outbound peer stream. pub struct OutboundPeerStream { @@ -23,7 +24,10 @@ pub struct OutboundPeerStream { } impl Stream for OutboundPeerStream { - type Item = Result>, tower::BoxError>; + type Item = Result< + OutboundPeer>, + tower::BoxError, + >; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { // TODO: make the outbound peer choice random. @@ -32,12 +36,10 @@ impl Stream for OutboundPeerStream { .client_pool() .outbound_client() .map_or(OutboundPeer::Exhausted, |client| { - let addr = match client.info.id { - InternalPeerID::KnownAddr(addr) => addr, - InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"), - }; - - OutboundPeer::Peer(addr.into(), StemPeerService(client)) + OutboundPeer::Peer( + CrossNetworkInternalPeerId::ClearNet(client.info.id), + StemPeerService(client), + ) })))) } } diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index c8fda16a..b890ffdc 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -1,11 +1,8 @@ -use std::{ - f32::consts::E, - task::{Context, Poll}, -}; +use std::task::{Context, Poll}; use bytes::Bytes; -use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt}; -use tower::{util::Oneshot, Service, ServiceExt}; +use futures::{future::BoxFuture, FutureExt}; +use tower::{Service, ServiceExt}; use cuprate_dandelion_tower::{ traits::{TxStoreRequest, TxStoreResponse}, @@ -13,7 +10,7 @@ use cuprate_dandelion_tower::{ }; use cuprate_database::RuntimeError; use cuprate_txpool::service::{ - interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest}, TxpoolReadHandle, TxpoolWriteHandle, }; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 588e1b05..8ec6d621 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -1,24 +1,20 @@ use std::{ collections::HashSet, - future::ready, sync::Arc, task::{Context, Poll}, }; use bytes::Bytes; -use dashmap::DashSet; use futures::{future::BoxFuture, FutureExt}; use monero_serai::transaction::Transaction; -use sha3::{Digest, Sha3_256}; use tower::{Service, ServiceExt}; use cuprate_consensus::{ transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, ExtendedConsensusError, TxVerifierService, VerifyTxRequest, - VerifyTxResponse, + BlockChainContextService, ExtendedConsensusError, VerifyTxRequest, }; use cuprate_dandelion_tower::{ - pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, + pool::{DandelionPoolService, IncomingTxBuilder}, State, TxState, }; use cuprate_helper::asynch::rayon_spawn_async; @@ -27,8 +23,8 @@ use cuprate_txpool::service::{ TxpoolReadHandle, TxpoolWriteHandle, }; use cuprate_types::TransactionVerificationData; -use cuprate_wire::NetworkAddress; +use crate::p2p::CrossNetworkInternalPeerId; use crate::{ blockchain::ConcreteTxVerifierService, constants::PANIC_CRITICAL_SERVICE_ERROR, @@ -45,8 +41,10 @@ pub enum IncomingTxError { /// Incoming transactions. pub struct IncomingTxs { + /// The raw bytes of the transactions. pub txs: Vec, - pub state: TxState, + /// The routing state of the transactions. + pub state: TxState, } /// The transaction type used for dandelion++. @@ -65,7 +63,8 @@ pub struct IncomingTxHandler { /// The blockchain context cache. pub(super) blockchain_context_cache: BlockChainContextService, /// The dandelion txpool manager. - pub(super) dandelion_pool_manager: DandelionPoolService, + pub(super) dandelion_pool_manager: + DandelionPoolService, /// The transaction verifier service. pub(super) tx_verifier_service: ConcreteTxVerifierService, /// The txpool write handle. @@ -98,18 +97,19 @@ impl Service for IncomingTxHandler { } } +/// Handles the incoming txs. #[expect(clippy::too_many_arguments)] async fn handle_incoming_txs( txs: Vec, - state: TxState, + state: TxState, txs_being_handled: TxsBeingHandled, mut blockchain_context_cache: BlockChainContextService, mut tx_verifier_service: ConcreteTxVerifierService, mut txpool_write_handle: TxpoolWriteHandle, mut txpool_read_handle: TxpoolReadHandle, - mut dandelion_pool_manager: DandelionPoolService, + mut dandelion_pool_manager: DandelionPoolService, ) -> Result<(), IncomingTxError> { - let reorg_guard = REORG_LOCK.read().await; + let _reorg_guard = REORG_LOCK.read().await; let (txs, stem_pool_txs, txs_being_handled_guard) = prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; @@ -167,6 +167,8 @@ async fn handle_incoming_txs( /// Prepares the incoming transactions for verification. /// /// This will filter out all transactions already in the pool or txs already being handled in another request. +/// +/// # Returns async fn prepare_incoming_txs( tx_blobs: Vec, txs_being_handled: TxsBeingHandled, @@ -246,9 +248,13 @@ async fn prepare_incoming_txs( async fn handle_valid_tx( tx: Arc, - state: TxState, + state: TxState, txpool_write_handle: &mut TxpoolWriteHandle, - dandelion_pool_manager: &mut DandelionPoolService, + dandelion_pool_manager: &mut DandelionPoolService< + DandelionTx, + TxId, + CrossNetworkInternalPeerId, + >, ) { let incoming_tx = IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash); @@ -291,9 +297,13 @@ async fn handle_valid_tx( async fn rerelay_stem_tx( tx_hash: &TxId, - state: TxState, + state: TxState, txpool_read_handle: &mut TxpoolReadHandle, - dandelion_pool_manager: &mut DandelionPoolService, + dandelion_pool_manager: &mut DandelionPoolService< + DandelionTx, + TxId, + CrossNetworkInternalPeerId, + >, ) { let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle .ready()