From 2fd74dec14d272fa30474d305a8b2db4fceb5953 Mon Sep 17 00:00:00 2001
From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com>
Date: Sun, 19 May 2024 18:35:07 +0100
Subject: [PATCH] add an inbound connection server

---
 p2p/address-book/src/lib.rs                  |  4 +-
 p2p/cuprate-p2p/src/constants.rs             |  6 ++
 p2p/cuprate-p2p/src/inbound_server.rs        | 84 ++++++++++++++++++++
 p2p/cuprate-p2p/src/lib.rs                   | 69 +++++++++++++---
 p2p/monero-p2p/src/lib.rs                    |  9 ++-
 p2p/monero-p2p/src/network_zones/clear.rs    |  6 +-
 p2p/monero-p2p/tests/fragmented_handshake.rs |  7 +-
 p2p/monero-p2p/tests/handshake.rs            |  4 +-
 test-utils/src/test_netzone.rs               | 10 ++-
 9 files changed, 169 insertions(+), 30 deletions(-)
 create mode 100644 p2p/cuprate-p2p/src/inbound_server.rs

diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs
index c63c7691..afe48cb0 100644
--- a/p2p/address-book/src/lib.rs
+++ b/p2p/address-book/src/lib.rs
@@ -13,9 +13,7 @@
 //!
 use std::{io::ErrorKind, path::PathBuf, time::Duration};
 
-use tower::buffer::Buffer;
-
-use monero_p2p::{services::AddressBookRequest, NetworkZone};
+use monero_p2p::NetworkZone;
 
 mod book;
 mod peer_list;
diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs
index a402f03b..927a4fe7 100644
--- a/p2p/cuprate-p2p/src/constants.rs
+++ b/p2p/cuprate-p2p/src/constants.rs
@@ -29,3 +29,9 @@ pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
 
 /// The durations of a short ban.
 pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
+
+/// The time to sleep after an inbound connection comes in.
+///
+/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections.
+/// TODO: it might be a good idea to make this configurable.
+pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(750);
diff --git a/p2p/cuprate-p2p/src/inbound_server.rs b/p2p/cuprate-p2p/src/inbound_server.rs
new file mode 100644
index 00000000..11b3cb11
--- /dev/null
+++ b/p2p/cuprate-p2p/src/inbound_server.rs
@@ -0,0 +1,84 @@
+//! # Inbound Server
+//!
+//! This module contains the inbound connection server, which listens for inbound connections, gives
+//! them to the handshake service and then adds them to the client pool.
+use std::{pin::pin, sync::Arc};
+
+use futures::StreamExt;
+use monero_p2p::{
+    client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
+    ConnectionDirection, NetworkZone,
+};
+use tokio::time::sleep;
+use tokio::{sync::Semaphore, time::timeout};
+use tower::{Service, ServiceExt};
+use tracing::instrument;
+
+use crate::constants::INBOUND_CONNECTION_COOL_DOWN;
+use crate::{client_pool::ClientPool, constants::HANDSHAKE_TIMEOUT, P2PConfig};
+
+#[instrument(level = "info", skip_all)]
+pub async fn inbound_server<N: NetworkZone, HS>(
+    client_pool: Arc<ClientPool<N>>,
+    mut handshaker: HS,
+    config: P2PConfig<N>,
+) -> Result<(), tower::BoxError>
+where
+    HS: Service<DoHandshakeRequest<N>, Response = Client<N>, Error = HandshakeError>
+        + Send
+        + 'static,
+    HS::Future: Send + 'static,
+{
+    let Some(server_config) = config.server_config else {
+        tracing::warn!("No inbound server config provided, not listening for inbound connections.");
+        return Ok(());
+    };
+
+    tracing::info!("Starting inbound connection server");
+
+    let listener = N::incoming_connection_listener(server_config, config.p2p_port)
+        .await
+        .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
+
+    let mut listener = pin!(listener);
+
+    let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
+
+    while let Some(connection) = listener.next().await {
+        let Ok((addr, peer_stream, peer_sink)) = connection else {
+            continue;
+        };
+
+        let addr = match addr {
+            Some(addr) => InternalPeerID::KnownAddr(addr),
+            None => InternalPeerID::Unknown(rand::random()),
+        };
+
+        if let Ok(permit) = semaphore.clone().try_acquire_owned() {
+            tracing::debug!("Permit free for incoming connection, attempting handshake.");
+
+            let fut = handshaker.ready().await?.call(DoHandshakeRequest {
+                addr,
+                peer_stream,
+                peer_sink,
+                direction: ConnectionDirection::InBound,
+                permit,
+            });
+
+            let cloned_pool = client_pool.clone();
+
+            tokio::spawn(async move {
+                if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
+                    cloned_pool.add_new_client(peer);
+                }
+            });
+        } else {
+            tracing::debug!("No permit free for incoming connection.");
+            // TODO: listen for if the peer is just trying to ping us to see if we are reachable.
+        }
+
+        sleep(INBOUND_CONNECTION_COOL_DOWN).await;
+    }
+
+    Ok(())
+}
diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs
index 6a9f72b1..d83e270e 100644
--- a/p2p/cuprate-p2p/src/lib.rs
+++ b/p2p/cuprate-p2p/src/lib.rs
@@ -8,20 +8,25 @@
 #![allow(dead_code)]
 
 use std::sync::Arc;
-use tokio::sync::watch;
+use tokio::sync::{mpsc, watch};
 use tower::buffer::Buffer;
+use tracing::instrument;
 
-use monero_p2p::NetworkZone;
+use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler};
 
 mod broadcast;
 mod client_pool;
 pub mod config;
 pub mod connection_maintainer;
 mod constants;
+mod inbound_server;
 mod sync_states;
 
+use crate::connection_maintainer::MakeConnectionRequest;
 pub use config::P2PConfig;
+use monero_p2p::client::Connector;
 
+#[instrument(level="warn", name="net", skip_all, fields(zone=N::NAME))]
 pub async fn initialize_network<N, R, CS>(
     peer_req_handler: R,
     core_sync_svc: CS,
@@ -29,14 +34,21 @@ pub async fn initialize_network<N, R, CS>(
 ) -> Result<NetworkInterface<N>, tower::BoxError>
 where
     N: NetworkZone,
+    R: PeerRequestHandler + Clone,
+    CS: CoreSyncSvc + Clone,
 {
-    let address_book = monero_address_book::init_address_book(config.address_book_config).await?;
+    let address_book =
+        monero_address_book::init_address_book(config.address_book_config.clone()).await?;
     let address_book = Buffer::new(
         address_book,
         config.max_inbound_connections + config.outbound_connections,
     );
 
-    let (sync_states_svc, top_block_watcher) = sync_states::PeerSyncSvc::new();
+    let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
+    let sync_states_svc = Buffer::new(
+        sync_states_svc,
+        config.max_inbound_connections + config.outbound_connections,
+    );
 
     // Use the default config. Changing the defaults effects tx fluff times, which could effect D++ so for now don't allow changing
     // this.
@@ -46,34 +58,65 @@ where
     let mut basic_node_data = config.basic_node_data();
 
     if !N::CHECK_NODE_ID {
-        // TODO: make sure this is the value monerod sets for anonn networks.
+        // TODO: make sure this is the value monerod sets for anon networks.
         basic_node_data.peer_id = 1;
     }
 
     let outbound_handshaker = monero_p2p::client::HandShaker::new(
         address_book.clone(),
-        sync_states_svc,
-        core_sync_svc,
-        peer_req_handler,
+        sync_states_svc.clone(),
+        core_sync_svc.clone(),
+        peer_req_handler.clone(),
         outbound_mkr,
-        basic_node_data,
+        basic_node_data.clone(),
     );
 
     let inbound_handshaker = monero_p2p::client::HandShaker::new(
         address_book.clone(),
         sync_states_svc,
-        core_sync_svc,
+        core_sync_svc.clone(),
         peer_req_handler,
         inbound_mkr,
         basic_node_data,
     );
-    
-    let outb
-    
+
+    let client_pool = client_pool::ClientPool::new();
+
+    let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
+
+    let outbound_connector = Connector::new(outbound_handshaker);
+    let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
+        config.clone(),
+        client_pool.clone(),
+        make_connection_rx,
+        address_book.clone(),
+        outbound_connector,
+    );
+
+    tokio::spawn(outbound_connection_maintainer.run());
+    tokio::spawn(inbound_server::inbound_server(
+        client_pool.clone(),
+        inbound_handshaker,
+        config,
+    ));
+
+    Ok(NetworkInterface {
+        pool: client_pool,
+        broadcast_svc,
+        top_block_watch,
+        make_connection_tx,
+    })
 }
 
+/// The interface to Monero's P2P network on a certain [`NetworkZone`].
 pub struct NetworkInterface<N: NetworkZone> {
+    /// A pool of free connected peers.
     pool: Arc<client_pool::ClientPool<N>>,
+    /// A [`Service`](tower::Service) that allows broadcasting to all connected peers.
     broadcast_svc: broadcast::BroadcastSvc<N>,
+    /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
+    /// on that claimed chain.
     top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
+    /// A channel to request extra connections.
+    make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
 }
diff --git a/p2p/monero-p2p/src/lib.rs b/p2p/monero-p2p/src/lib.rs
index 9c171320..13ecf4aa 100644
--- a/p2p/monero-p2p/src/lib.rs
+++ b/p2p/monero-p2p/src/lib.rs
@@ -130,11 +130,11 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
     /// The sink (outgoing data) type for this network.
     type Sink: Sink<LevinMessage<Message>, Error = BucketError> + Unpin + Send + 'static;
     /// The inbound connection listener for this network.
-    type Listener: Stream<
-        Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
-    >;
+    type Listener: Stream<Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>>
+        + Send
+        + 'static;
     /// Config used to start a server which listens for incoming connections.
-    type ServerCfg;
+    type ServerCfg: Clone + Debug + Send + 'static;
 
     async fn connect_to_peer(
         addr: Self::Addr,
@@ -142,6 +142,7 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
 
     async fn incoming_connection_listener(
         config: Self::ServerCfg,
+        port: u16,
     ) -> Result<Self::Listener, std::io::Error>;
 }
 
diff --git a/p2p/monero-p2p/src/network_zones/clear.rs b/p2p/monero-p2p/src/network_zones/clear.rs
index 5141a069..c77f1333 100644
--- a/p2p/monero-p2p/src/network_zones/clear.rs
+++ b/p2p/monero-p2p/src/network_zones/clear.rs
@@ -37,8 +37,9 @@ impl NetZoneAddress for SocketAddr {
     }
 }
 
+#[derive(Debug, Clone)]
 pub struct ClearNetServerCfg {
-    pub addr: SocketAddr,
+    pub ip: IpAddr,
 }
 
 #[derive(Clone, Copy)]
@@ -80,8 +81,9 @@ impl NetworkZone for ClearNet {
 
     async fn incoming_connection_listener(
         config: Self::ServerCfg,
+        port: u16,
     ) -> Result<Self::Listener, std::io::Error> {
-        let listener = TcpListener::bind(config.addr).await?;
+        let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?;
         Ok(InBoundStream { listener })
     }
 }
diff --git a/p2p/monero-p2p/tests/fragmented_handshake.rs b/p2p/monero-p2p/tests/fragmented_handshake.rs
index 60d490f8..e9833cf3 100644
--- a/p2p/monero-p2p/tests/fragmented_handshake.rs
+++ b/p2p/monero-p2p/tests/fragmented_handshake.rs
@@ -71,8 +71,9 @@ impl NetworkZone for FragNet {
 
     async fn incoming_connection_listener(
         config: Self::ServerCfg,
+        port: u16,
     ) -> Result<Self::Listener, std::io::Error> {
-        let listener = TcpListener::bind(config.addr).await?;
+        let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?;
         Ok(InBoundStream { listener })
     }
 }
@@ -194,9 +195,9 @@ async fn fragmented_handshake_monerod_to_cuprate() {
         our_basic_node_data,
     );
 
-    let addr = "127.0.0.1:18081".parse().unwrap();
+    let ip = "127.0.0.1".parse().unwrap();
 
-    let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { addr })
+    let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081)
         .await
         .unwrap();
 
diff --git a/p2p/monero-p2p/tests/handshake.rs b/p2p/monero-p2p/tests/handshake.rs
index 1d8b649c..b63a221b 100644
--- a/p2p/monero-p2p/tests/handshake.rs
+++ b/p2p/monero-p2p/tests/handshake.rs
@@ -174,9 +174,9 @@ async fn handshake_monerod_to_cuprate() {
         our_basic_node_data,
     );
 
-    let addr = "127.0.0.1:18081".parse().unwrap();
+    let ip = "127.0.0.1".parse().unwrap();
 
-    let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { addr })
+    let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081)
         .await
         .unwrap();
 
diff --git a/test-utils/src/test_netzone.rs b/test-utils/src/test_netzone.rs
index 0a534164..e82e5532 100644
--- a/test-utils/src/test_netzone.rs
+++ b/test-utils/src/test_netzone.rs
@@ -87,8 +87,9 @@ impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool
     type Listener = Pin<
         Box<
             dyn Stream<
-                Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
-            >,
+                    Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
+                > + Send
+                + 'static,
         >,
     >;
     type ServerCfg = ();
@@ -97,7 +98,10 @@ impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool
         unimplemented!()
     }
 
-    async fn incoming_connection_listener(_: Self::ServerCfg) -> Result<Self::Listener, Error> {
+    async fn incoming_connection_listener(
+        _: Self::ServerCfg,
+        _: u16,
+    ) -> Result<Self::Listener, Error> {
         unimplemented!()
     }
 }