diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index 9e24b607..aeb98b60 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -1,15 +1,17 @@ //! P2P //! //! Will handle initiating the P2P and contains a protocol request handler. -use crate::txpool::IncomingTxHandler; +use futures::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; +use tower::ServiceExt; + use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::BlockChainContextService; use cuprate_p2p::{NetworkInterface, P2PConfig}; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::TxpoolReadHandle; -use futures::{FutureExt, TryFutureExt}; -use tokio::sync::oneshot; -use tower::ServiceExt; + +use crate::txpool::IncomingTxHandler; mod core_sync_service; mod network_address; @@ -17,6 +19,10 @@ pub mod request_handler; pub use network_address::CrossNetworkInternalPeerId; +/// Starts the P2P clearnet network, returning a [`NetworkInterface`] to interact with it. +/// +/// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network +/// handshakes can not be completed. pub async fn start_clearnet_p2p( blockchain_read_handle: BlockchainReadHandle, blockchain_context_service: BlockChainContextService, diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index c93f0580..cb0757c6 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,13 +1,16 @@ -use bytes::Bytes; -use futures::future::Shared; -use futures::{future::BoxFuture, FutureExt}; -use monero_serai::{block::Block, transaction::Transaction}; -use std::hash::Hash; use std::{ collections::HashSet, future::{ready, Ready}, + hash::Hash, task::{Context, Poll}, }; + +use bytes::Bytes; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, +}; +use monero_serai::{block::Block, transaction::Transaction}; use tokio::sync::{broadcast, oneshot, watch}; use tokio_stream::wrappers::WatchStream; use tower::{Service, ServiceExt}; @@ -28,9 +31,9 @@ use cuprate_helper::{ use cuprate_p2p::constants::{ MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN, }; -use cuprate_p2p_core::client::InternalPeerID; use cuprate_p2p_core::{ - client::PeerInformation, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, + client::{InternalPeerID, PeerInformation}, + NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, }; use cuprate_txpool::service::TxpoolReadHandle; use cuprate_types::{ @@ -42,30 +45,32 @@ use cuprate_wire::protocol::{ GetObjectsResponse, NewFluffyBlock, NewTransactions, }; -use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError}; -use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; -use crate::p2p::CrossNetworkInternalPeerId; -use crate::txpool::{IncomingTxHandler, IncomingTxs}; +use crate::{ + blockchain::interface::{self as blockchain_interface, IncomingBlockError}, + constants::PANIC_CRITICAL_SERVICE_ERROR, + p2p::CrossNetworkInternalPeerId, + txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs}, +}; /// The P2P protocol request handler [`MakeService`](tower::MakeService). #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { - /// The [`BlockchainReadHandle`] pub blockchain_read_handle: BlockchainReadHandle, pub blockchain_context_service: BlockChainContextService, - /// The [`TxpoolReadHandle`]. pub txpool_read_handle: TxpoolReadHandle, + /// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`] + /// and the [`IncomingTxHandler`]. pub incoming_tx_handler: Option, + /// A [`Future`](std::future::Future) that produces the [`IncomingTxHandler`]. pub incoming_tx_handler_fut: Shared>, } impl Service> for P2pProtocolRequestHandlerMaker where - A: NetZoneAddress, InternalPeerID: Into, { type Response = P2pProtocolRequestHandler; @@ -109,13 +114,12 @@ where /// The P2P protocol request handler. #[derive(Clone)] pub struct P2pProtocolRequestHandler { - /// The [`PeerInformation`] for this peer. peer_information: PeerInformation, - /// The [`BlockchainReadHandle`]. + blockchain_read_handle: BlockchainReadHandle, blockchain_context_service: BlockChainContextService, - /// The [`TxpoolReadHandle`]. + txpool_read_handle: TxpoolReadHandle, incoming_tx_handler: IncomingTxHandler, @@ -123,7 +127,6 @@ pub struct P2pProtocolRequestHandler { impl Service for P2pProtocolRequestHandler where - A: NetZoneAddress, InternalPeerID: Into, { type Response = ProtocolResponse; @@ -163,7 +166,7 @@ where self.incoming_tx_handler.clone(), ) .boxed(), - ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: tx-pool + ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: should we support this? } } } @@ -303,6 +306,7 @@ async fn new_fluffy_block( mut blockchain_read_handle: BlockchainReadHandle, mut txpool_read_handle: TxpoolReadHandle, ) -> anyhow::Result { + // TODO: check context service here and ignore the block? let current_blockchain_height = request.current_blockchain_height; peer_information @@ -324,7 +328,6 @@ async fn new_fluffy_block( .into_iter() .map(|tx_blob| { if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE { - peer_information.handle.ban_peer(MEDIUM_BAN); anyhow::bail!("Peer sent a transaction over the size limit."); } diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index b5da7cb7..13458bd0 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -13,9 +13,9 @@ use monero_serai::block::Block; use crate::{ types::{ Chain, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, VerifiedBlockInformation, - }, ChainInfo, - AltBlockInformation, BlockCompleteEntry, ChainId, CoinbaseTxSum, OutputHistogramEntry, - OutputHistogramInput, + }, + AltBlockInformation, BlockCompleteEntry, ChainId, ChainInfo, CoinbaseTxSum, + OutputHistogramEntry, OutputHistogramInput, }; //---------------------------------------------------------------------------------------------------- ReadRequest @@ -132,7 +132,9 @@ pub enum BlockchainReadRequest { AltBlocksInChain(ChainId), /// Get a [`Block`] by its height. - Block { height: usize }, + Block { + height: usize, + }, /// Get a [`Block`] by its hash. BlockByHash([u8; 32]), @@ -152,7 +154,10 @@ pub enum BlockchainReadRequest { /// `N` last blocks starting at particular height. /// /// TODO: document fields after impl. - CoinbaseTxSum { height: usize, count: u64 }, + CoinbaseTxSum { + height: usize, + count: u64, + }, /// Get information on all alternative chains. AltChains, diff --git a/types/src/lib.rs b/types/src/lib.rs index a5a04f9d..51d37d6c 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -26,8 +26,8 @@ pub use transaction_verification_data::{ pub use types::{ AddAuxPow, AltBlockInformation, AuxPow, Chain, ChainId, ChainInfo, CoinbaseTxSum, ExtendedBlockHeader, FeeEstimate, HardForkInfo, MinerData, MinerDataTxBacklogEntry, - OutputHistogramEntry, OutputHistogramInput, OutputOnChain, VerifiedBlockInformation, - VerifiedTransactionInformation, + MissingTxsInBlock, OutputHistogramEntry, OutputHistogramInput, OutputOnChain, + VerifiedBlockInformation, VerifiedTransactionInformation, }; //---------------------------------------------------------------------------------------------------- Feature-gated