diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index 44dba91..4c08eb8 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -3,6 +3,12 @@ use std::time::Duration; /// The timeout we set on handshakes. pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20); +/// The timeout we set on receiving ping requests +pub(crate) const PING_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + +/// The amount of concurrency (maximum number of simultaneous tasks) we allow for handling ping requests +pub(crate) const PING_REQUEST_CONCURRENCY: usize = 2; + /// The maximum amount of connections to make to seed nodes for when we need peers. pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; diff --git a/p2p/p2p/src/inbound_server.rs b/p2p/p2p/src/inbound_server.rs index aa971a5..80ff38e 100644 --- a/p2p/p2p/src/inbound_server.rs +++ b/p2p/p2p/src/inbound_server.rs @@ -4,9 +4,10 @@ //! them to the handshaker service and then adds them to the client pool. use std::{pin::pin, sync::Arc}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use tokio::{ sync::Semaphore, + task::JoinSet, time::{sleep, timeout}, }; use tower::{Service, ServiceExt}; @@ -17,14 +18,22 @@ use cuprate_p2p_core::{ services::{AddressBookRequest, AddressBookResponse}, AddressBook, ConnectionDirection, NetworkZone, }; +use cuprate_wire::{ + admin::{PingResponse, PING_OK_RESPONSE_STATUS_TEXT}, + AdminRequestMessage, AdminResponseMessage, Message, +}; use crate::{ client_pool::ClientPool, - constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN}, + constants::{ + HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY, + PING_REQUEST_TIMEOUT, + }, P2PConfig, }; -/// Starts the inbound server. +/// Starts the inbound server. This function will listen to all incoming connections +/// and initiate handshake if needed, after verifying the address isn't banned. #[instrument(level = "warn", skip_all)] pub async fn inbound_server( client_pool: Arc>, @@ -40,6 +49,10 @@ where HS::Future: Send + 'static, A: AddressBook, { + // Copying the peer_id before borrowing for ping responses (Make us avoid a `clone()`). + let our_peer_id = config.basic_node_data().peer_id; + + // Mandatory. Extract server config from P2PConfig let Some(server_config) = config.server_config else { tracing::warn!("No inbound server config provided, not listening for inbound connections."); return Ok(()); @@ -53,13 +66,18 @@ where let mut listener = pin!(listener); + // Create semaphore for limiting to maximum inbound connections. let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections)); + // Create ping request handling JoinSet + let mut ping_join_set = JoinSet::new(); + // Listen to incoming connections and extract necessary information. while let Some(connection) = listener.next().await { - let Ok((addr, peer_stream, peer_sink)) = connection else { + let Ok((addr, mut peer_stream, mut peer_sink)) = connection else { continue; }; + // If peer is banned, drop connection if let Some(addr) = &addr { let AddressBookResponse::IsPeerBanned(banned) = address_book .ready() @@ -75,11 +93,13 @@ where } } + // Create a new internal id for new peers let addr = match addr { Some(addr) => InternalPeerID::KnownAddr(addr), None => InternalPeerID::Unknown(rand::random()), }; + // If we're still behind our maximum limit, Initiate handshake. if let Ok(permit) = semaphore.clone().try_acquire_owned() { tracing::debug!("Permit free for incoming connection, attempting handshake."); @@ -102,8 +122,39 @@ where .instrument(Span::current()), ); } else { + // Otherwise check if the node is simply pinging us. tracing::debug!("No permit free for incoming connection."); - // TODO: listen for if the peer is just trying to ping us to see if we are reachable. + + // We only handle 2 ping request conccurently. Otherwise we drop the connection immediately. + if ping_join_set.len() < PING_REQUEST_CONCURRENCY { + ping_join_set.spawn( + async move { + // Await first message from node. If it is a ping request we respond back, otherwise we drop the connection. + let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next()); + + // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded + if let Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) = fut.await + { + let response = peer_sink + .send( + Message::Response(AdminResponseMessage::Ping(PingResponse { + status: PING_OK_RESPONSE_STATUS_TEXT, + peer_id: our_peer_id, + })) + .into(), + ) + .await; + + if let Err(err) = response { + tracing::debug!( + "Unable to respond to ping request from peer ({addr}): {err}" + ) + } + } + } + .instrument(Span::current()), + ); + } } sleep(INBOUND_CONNECTION_COOL_DOWN).await;