From d0055f16bdc73dc7cd2b465e55007899da9098ca Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 31 Oct 2024 23:32:32 +0000 Subject: [PATCH] fix build --- Cargo.lock | 2 - binaries/cuprated/src/blockchain/manager.rs | 1 - binaries/cuprated/src/p2p.rs | 43 ++++++++++++++++++++ binaries/cuprated/src/p2p/request_handler.rs | 33 ++++++++++----- binaries/cuprated/src/txpool.rs | 2 +- binaries/cuprated/src/txpool/incoming_tx.rs | 5 +++ 6 files changed, 71 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2be6d61..3d4aa692 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -963,7 +963,6 @@ dependencies = [ "monero-serai", "rayon", "serde", - "sha3", "tempfile", "thiserror", "tokio", @@ -1064,7 +1063,6 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "sha3", "thiserror", "thread_local", "tokio", diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 1dba6407..2166795e 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -38,7 +38,6 @@ mod commands; mod handler; pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; -use cuprate_txpool::service::TxpoolWriteHandle; /// Initialize the blockchain manager. /// diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index cdf1cef7..9e24b607 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -1,8 +1,51 @@ //! P2P //! //! Will handle initiating the P2P and contains a protocol request handler. +use crate::txpool::IncomingTxHandler; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::BlockChainContextService; +use cuprate_p2p::{NetworkInterface, P2PConfig}; +use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::TxpoolReadHandle; +use futures::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; +use tower::ServiceExt; +mod core_sync_service; mod network_address; pub mod request_handler; pub use network_address::CrossNetworkInternalPeerId; + +pub async fn start_clearnet_p2p( + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + txpool_read_handle: TxpoolReadHandle, + config: P2PConfig, +) -> Result< + ( + NetworkInterface, + oneshot::Sender, + ), + tower::BoxError, +> { + let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel(); + + let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker { + blockchain_read_handle, + blockchain_context_service: blockchain_context_service.clone(), + txpool_read_handle, + incoming_tx_handler: None, + incoming_tx_handler_fut: incoming_tx_handler_rx.shared(), + }; + + Ok(( + cuprate_p2p::initialize_network( + request_handler_maker.map_response(|s| s.map_err(Into::into)), + core_sync_service::CoreSyncService(blockchain_context_service), + config, + ) + .await?, + incoming_tx_handler_tx, + )) +} diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index be88a049..0f39ecec 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -42,6 +42,7 @@ use cuprate_wire::protocol::{ use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError}; use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; +use crate::p2p::CrossNetworkInternalPeerId; use crate::txpool::{IncomingTxHandler, IncomingTxs}; /// The P2P protocol request handler [`MakeService`](tower::MakeService). @@ -60,8 +61,12 @@ pub struct P2pProtocolRequestHandlerMaker { pub incoming_tx_handler_fut: Shared>, } -impl Service> for P2pProtocolRequestHandlerMaker { - type Response = P2pProtocolRequestHandler; +impl Service> for P2pProtocolRequestHandlerMaker +where + A: NetZoneAddress, + InternalPeerID: Into, +{ + type Response = P2pProtocolRequestHandler; type Error = tower::BoxError; type Future = Ready>; @@ -79,7 +84,7 @@ impl Service> for P2pProtocolRequestHandle Poll::Ready(Ok(())) } - fn call(&mut self, peer_information: PeerInformation) -> Self::Future { + fn call(&mut self, peer_information: PeerInformation) -> Self::Future { let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else { panic!("poll_ready was not called or did not return `Poll::Ready`") }; @@ -114,7 +119,11 @@ pub struct P2pProtocolRequestHandler { incoming_tx_handler: IncomingTxHandler, } -impl Service for P2pProtocolRequestHandler { +impl Service for P2pProtocolRequestHandler +where + A: NetZoneAddress, + InternalPeerID: Into, +{ type Response = ProtocolResponse; type Error = anyhow::Error; type Future = BoxFuture<'static, Result>; @@ -355,12 +364,16 @@ async fn new_fluffy_block( } /// [`ProtocolRequest::NewTransactions`] -async fn new_transactions( +async fn new_transactions( peer_information: PeerInformation, request: NewTransactions, mut blockchain_context_service: BlockChainContextService, mut incoming_tx_handler: IncomingTxHandler, -) -> anyhow::Result { +) -> anyhow::Result +where + A: NetZoneAddress, + InternalPeerID: Into, +{ let BlockChainContextResponse::Context(context) = blockchain_context_service .ready() .await @@ -387,11 +400,9 @@ async fn new_transactions( let state = if request.dandelionpp_fluff { TxState::Fluff } else { - let InternalPeerID::KnownAddr(addr) = peer_information.id else { - todo!("Anonymity networks") - }; - - TxState::Stem { from: addr.into() } + TxState::Stem { + from: peer_information.id.into(), + } }; drop(request.padding); diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 9592c2bf..20769567 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -12,4 +12,4 @@ mod dandelion; mod incoming_tx; mod txs_being_handled; -pub use incoming_tx::IncomingTxHandler; +pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index e2041598..bf7684e4 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -43,9 +43,13 @@ use crate::{ }; /// An error that can happen handling an incoming tx. +#[derive(Debug, thiserror::Error)] pub enum IncomingTxError { + #[error("Error parsing tx: {0}")] Parse(std::io::Error), + #[error(transparent)] Consensus(ExtendedConsensusError), + #[error("Duplicate tx in message")] DuplicateTransaction, } @@ -67,6 +71,7 @@ pub(super) type TxId = [u8; 32]; /// The service than handles incoming transaction pool transactions. /// /// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. +#[derive(Clone)] pub struct IncomingTxHandler { /// A store of txs currently being handled in incoming tx requests. pub(super) txs_being_handled: TxsBeingHandled,