From b9caee9335b073235f5fef1b0a805360b06c0976 Mon Sep 17 00:00:00 2001
From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com>
Date: Tue, 30 Apr 2024 02:02:12 +0100
Subject: [PATCH] init cuprate-p2p

---
 Cargo.lock                                    |  43 +++
 Cargo.toml                                    |   3 +
 p2p/cuprate-p2p/Cargo.toml                    |  38 +++
 p2p/cuprate-p2p/src/config.rs                 |  42 +++
 p2p/cuprate-p2p/src/connection_maintainer.rs  | 294 ++++++++++++++++++
 p2p/cuprate-p2p/src/constants.rs              |  45 +++
 p2p/cuprate-p2p/src/lib.rs                    |  18 ++
 p2p/cuprate-p2p/src/peer_set.rs               |  97 ++++++
 .../src/peer_set/disconnect_monitor.rs        |  64 ++++
 .../src/peer_set/drop_guard_client.rs         |  36 +++
 10 files changed, 680 insertions(+)
 create mode 100644 p2p/cuprate-p2p/Cargo.toml
 create mode 100644 p2p/cuprate-p2p/src/config.rs
 create mode 100644 p2p/cuprate-p2p/src/connection_maintainer.rs
 create mode 100644 p2p/cuprate-p2p/src/constants.rs
 create mode 100644 p2p/cuprate-p2p/src/lib.rs
 create mode 100644 p2p/cuprate-p2p/src/peer_set.rs
 create mode 100644 p2p/cuprate-p2p/src/peer_set/disconnect_monitor.rs
 create mode 100644 p2p/cuprate-p2p/src/peer_set/drop_guard_client.rs

diff --git a/Cargo.lock b/Cargo.lock
index 18c9074d..e987cfb3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -642,6 +642,36 @@ dependencies = [
  "windows",
 ]
 
+[[package]]
+name = "cuprate-p2p"
+version = "0.1.0"
+dependencies = [
+ "bytes",
+ "cuprate-helper",
+ "cuprate-test-utils",
+ "dashmap",
+ "fixed-bytes",
+ "futures",
+ "hex",
+ "indexmap 2.2.6",
+ "monero-address-book",
+ "monero-p2p",
+ "monero-pruning",
+ "monero-serai",
+ "monero-wire",
+ "pin-project",
+ "rand",
+ "rand_distr",
+ "rayon",
+ "thiserror",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tower",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "cuprate-test-utils"
 version = "0.1.0"
@@ -726,6 +756,19 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "dashmap"
+version = "5.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
+dependencies = [
+ "cfg-if",
+ "hashbrown 0.14.3",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "deranged"
 version = "0.3.11"
diff --git a/Cargo.toml b/Cargo.toml
index ca695ac5..5143bde5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ members = [
 	"net/fixed-bytes",
 	"net/levin",
 	"net/monero-wire",
+	"p2p/cuprate-p2p",
 	"p2p/monero-p2p",
 	"p2p/address-book",
 	"pruning",
@@ -49,6 +50,7 @@ crypto-bigint         = { version = "0.5.5", default-features = false }
 crossbeam             = { version = "0.8.4", default-features = false }
 curve25519-dalek      = { version = "4.1.1", default-features = false }
 dalek-ff-group        = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
+dashmap               = { version = "5.5.3", default-features = false }
 dirs                  = { version = "5.0.1", default-features = false }
 futures               = { version = "0.3.29", default-features = false }
 hex                   = { version = "0.4.3", default-features = false }
@@ -60,6 +62,7 @@ paste                 = { version = "1.0.14", default-features = false }
 pin-project           = { version = "1.1.3", default-features = false }
 randomx-rs            = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
 rand                  = { version = "0.8.5", default-features = false }
+rand_distr            = { version = "0.4.3", default-features = false }
 rayon                 = { version = "1.9.0", default-features = false }
 serde_bytes           = { version = "0.11.12", default-features = false }
 serde_json            = { version = "1.0.108", default-features = false }
diff --git a/p2p/cuprate-p2p/Cargo.toml b/p2p/cuprate-p2p/Cargo.toml
new file mode 100644
index 00000000..17a68525
--- /dev/null
+++ b/p2p/cuprate-p2p/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "cuprate-p2p"
+version = "0.1.0"
+edition = "2021"
+license = "MIT"
+authors = ["Boog900"]
+
+[dependencies]
+fixed-bytes = { path = "../../net/fixed-bytes" }
+monero-wire = { path = "../../net/monero-wire" }
+monero-p2p = { path = "../monero-p2p", features = ["borsh"] }
+monero-address-book = { path = "../address-book" }
+monero-pruning = { path = "../../pruning" }
+cuprate-helper = { path = "../../helper", features = ["asynch"] }
+
+monero-serai = { workspace = true, features = ["std"] }
+
+tower = { workspace = true }
+tokio = { workspace = true, features = ["full"] }
+rayon = { workspace = true }
+tokio-util = { workspace = true }
+tokio-stream = { workspace = true, features = ["sync", "time"] }
+futures = { workspace = true, features = ["std"] }
+pin-project = { workspace = true }
+dashmap = { workspace = true }
+
+thiserror = { workspace = true }
+bytes = { workspace = true, features = ["std"] }
+indexmap = { workspace = true, features = ["std"] }
+rand = { workspace = true, features = ["std", "std_rng"] }
+rand_distr = { workspace = true, features = ["std"] }
+hex = { workspace = true, features = ["std"] }
+tracing = { workspace = true, features = ["std", "attributes"] }
+
+[dev-dependencies]
+cuprate-test-utils = {path = "../../test-utils"}
+tracing-subscriber = { version = "0.3.18" }
+
diff --git a/p2p/cuprate-p2p/src/config.rs b/p2p/cuprate-p2p/src/config.rs
new file mode 100644
index 00000000..39ecc0ad
--- /dev/null
+++ b/p2p/cuprate-p2p/src/config.rs
@@ -0,0 +1,42 @@
+use cuprate_helper::network::Network;
+use monero_address_book::AddressBookConfig;
+
+/// P2P config.
+#[derive(Clone, Debug)]
+pub struct P2PConfig {
+    pub p2p_port: u16,
+
+    pub rpc_port: u16,
+
+    pub network: Network,
+
+    /// The number of outbound connections to make and try keep.
+    pub outbound_connections: usize,
+    /// The absolute maximum number of held outbound connections.
+    ///
+    /// *Note:* Cuprate might make more connections than this to see if a peer is reachable or
+    /// to get peers from that node, these connections are not held for long though.
+    pub max_outbound_connections: usize,
+
+    /// The number of anchor connections to make.
+    ///
+    /// An anchor connection is a connection which was held before last shutdown, anchor connections
+    /// help to prevent certain attacks.
+    pub anchor_connections: usize,
+
+    /// The percent of outbound peers that should be gray aka never connected to before.
+    ///
+    /// Only values 0..=1 are valid.
+    pub gray_peers_percent: f64,
+
+    /// The maximum amount of inbound peers
+    pub max_inbound_connections: usize,
+
+    pub address_book_config: AddressBookConfig,
+}
+
+impl P2PConfig {
+    pub fn allowed_extra_connections(&self) -> usize {
+        self.max_outbound_connections - self.outbound_connections
+    }
+}
diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs
new file mode 100644
index 00000000..3522495b
--- /dev/null
+++ b/p2p/cuprate-p2p/src/connection_maintainer.rs
@@ -0,0 +1,294 @@
+//! Outbound Connection Maintainer.
+//!
+//! This module handles maintaining the number of outbound connections defined in the [`P2PConfig`].
+//! It also handles making extra connections when the peer set is under load or when we need data that
+//! no connected peer has.
+use std::sync::Arc;
+
+use futures::{stream::FuturesUnordered, StreamExt};
+use rand::{distributions::Bernoulli, prelude::*};
+use tokio::{
+    sync::{mpsc, OwnedSemaphorePermit, Semaphore},
+    time::{sleep, timeout},
+};
+use tower::{Service, ServiceExt};
+use tracing::{info, instrument, warn};
+
+use monero_p2p::{
+    client::{Client, ConnectRequest, HandshakeError},
+    services::{AddressBookRequest, AddressBookResponse},
+    AddressBook, NetworkZone,
+};
+
+use crate::{
+    config::P2PConfig,
+    constants::{
+        HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_TIMEOUT, PEER_FIND_TIMEOUT,
+    },
+    peer_set::ClientPool,
+};
+
+enum OutboundConnectorError {
+    MaxConnections,
+    FailedToConnectToSeeds,
+    NoAvailablePeers,
+}
+
+/// A request from the peer set to make an outbound connection.
+///
+/// This will only be sent when the peer set is under load from the rest of Cuprate or the peer
+/// set needs specific data that none of the currently connected peers have.
+pub struct MakeConnectionRequest {
+    /// The block needed that no connected peers have due to pruning.
+    block_needed: Option<u64>,
+}
+
+/// The outbound connection (count) keeper.
+///
+/// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum.
+pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
+    /// TODO.
+    pub client_pool: Arc<ClientPool<N>>,
+    /// The channel that tells us to make new outbound connections
+    pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
+    /// The address book service
+    pub address_book_svc: A,
+    /// The service to connect to a specific peer.
+    pub connector_svc: C,
+    /// A semaphore to keep the amount of outbound peers constant.
+    pub outbound_semaphore: Arc<Semaphore>,
+    /// The amount of peers we connected to because we needed more peers. If the `outbound_semaphore`
+    /// is full, and we need to connect to more peers for blocks ro because not enough peers are ready
+    /// we add a permit to the semaphore and keep track here, upto a value in config.
+    pub extra_peers: usize,
+    /// The p2p config.
+    pub config: P2PConfig,
+    /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or
+    /// false if we should connect to a white peer.
+    ///
+    /// This is weighted to the percentage given in `config`.
+    pub peer_type_gen: Bernoulli,
+}
+
+impl<N: NetworkZone, A, C> OutboundConnectionKeeper<N, A, C>
+where
+    A: AddressBook<N>,
+    C: Service<ConnectRequest<N>, Response = Client<N>, Error = HandshakeError>,
+    C::Future: Send + 'static,
+{
+    pub fn new(
+        config: &P2PConfig,
+        client_pool: Arc<ClientPool<N>>,
+        make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
+        address_book_svc: A,
+        connector_svc: C,
+    ) -> Self {
+        let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
+            .expect("Gray peer percent is incorrect should be 0..=1");
+
+        Self {
+            client_pool,
+            make_connection_rx,
+            address_book_svc,
+            connector_svc,
+            outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
+            extra_peers: 0,
+            config: config.clone(),
+            peer_type_gen,
+        }
+    }
+
+    /// Connects to random seeds to get peers and immediately disconnects
+    #[instrument(level = "info", skip(self))]
+    async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
+        let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
+
+        if seeds.len() == 0 {
+            panic!("No seed nodes available to get peers from");
+        }
+
+        // This isn't really needed here to limit connections as the seed nodes will be dropped when we have got
+        // peers from them.
+        let semaphore = Arc::new(Semaphore::new(seeds.len()));
+
+        let mut allowed_errors = seeds.len();
+
+        let mut handshake_futs = FuturesUnordered::new();
+
+        for seed in seeds {
+            info!("Getting peers from seed node: {}", seed);
+
+            let fut = timeout(
+                HANDSHAKE_TIMEOUT,
+                self.connector_svc
+                    .ready()
+                    .await
+                    .expect("Connector had an error in `poll_ready`")
+                    .call(ConnectRequest {
+                        addr: *seed,
+                        permit: semaphore
+                            .clone()
+                            .try_acquire_owned()
+                            .expect("This must have enough permits as we just set the amount."),
+                    }),
+            );
+            // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
+            handshake_futs.push(tokio::spawn(fut));
+        }
+
+        while let Some(res) = handshake_futs.next().await {
+            if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
+                allowed_errors -= 1;
+            }
+        }
+
+        if allowed_errors == 0 {
+            return Err(OutboundConnectorError::FailedToConnectToSeeds);
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Connects to a given outbound peer.
+    #[instrument(level = "info", skip(self, permit), fields(%addr))]
+    async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
+        let client_pool = self.client_pool.clone();
+        let connection_fut = self
+            .connector_svc
+            .ready()
+            .await
+            .expect("Connector had an error in `poll_ready`")
+            .call(ConnectRequest { addr, permit });
+
+        tokio::spawn(async move {
+            if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
+                client_pool.add_new_client(peer);
+            }
+        });
+    }
+
+    /// Handles a request from the peer set for more peers.
+    async fn handle_peer_request(
+        &mut self,
+        req: &MakeConnectionRequest,
+    ) -> Result<(), OutboundConnectorError> {
+        // try to get a permit.
+        let permit = self
+            .outbound_semaphore
+            .clone()
+            .try_acquire_owned()
+            .or_else(|_| {
+                // if we can't get a permit add one if we are below the max number of connections.
+                if self.extra_peers >= self.config.allowed_extra_connections() {
+                    // If we can't add a permit return an error.
+                    Err(OutboundConnectorError::MaxConnections)
+                } else {
+                    self.outbound_semaphore.add_permits(1);
+                    self.extra_peers += 1;
+                    Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap())
+                }
+            })?;
+
+        // try to get a random peer on any network zone from the address book.
+        let peer = self
+            .address_book_svc
+            .ready()
+            .await
+            .expect("Error in address book!")
+            .call(AddressBookRequest::TakeRandomPeer {
+                height: req.block_needed,
+            })
+            .await;
+
+        match peer {
+            Err(_) => {
+                // TODO: We should probably send peer requests to our connected peers rather than go to seeds.
+                warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
+
+                self.connect_to_random_seeds().await?;
+                Err(OutboundConnectorError::NoAvailablePeers)
+            }
+
+            Ok(AddressBookResponse::Peer(peer)) => {
+                self.connect_to_outbound_peer(permit, peer.adr).await;
+                Ok(())
+            }
+            Ok(_) => panic!("peer list sent incorrect response!"),
+        }
+    }
+
+    /// Handles a free permit, by either connecting to a new peer or by removing a permit if we above the
+    /// minimum number of outbound connections.
+    #[instrument(level = "debug", skip(self, permit))]
+    async fn handle_free_permit(
+        &mut self,
+        permit: OwnedSemaphorePermit,
+    ) -> Result<(), OutboundConnectorError> {
+        if self.extra_peers > 0 {
+            tracing::debug!(
+                "Permit available but we are over the minimum number of peers, forgetting permit."
+            );
+            permit.forget();
+            self.extra_peers -= 1;
+            return Ok(());
+        }
+
+        tracing::debug!("Permit available making outbound connection.");
+
+        let req = if self.peer_type_gen.sample(&mut thread_rng()) {
+            AddressBookRequest::TakeRandomGrayPeer { height: None }
+        } else {
+            // This will try white peers first then gray.
+            AddressBookRequest::TakeRandomPeer { height: None }
+        };
+
+        let Ok(AddressBookResponse::Peer(peer)) = self
+            .address_book_svc
+            .ready()
+            .await
+            .expect("Error in address book!")
+            .call(req)
+            .await
+        else {
+            warn!("No peers in peer list to make connection to.");
+            self.connect_to_random_seeds().await?;
+            return Err(OutboundConnectorError::NoAvailablePeers);
+        };
+
+        self.connect_to_outbound_peer(permit, peer.adr).await;
+        Ok(())
+    }
+
+    /// Runs the outbound connection count keeper.
+    pub async fn run(mut self) {
+        info!(
+            "Starting outbound connection maintainer, target outbound connections: {}",
+            self.config.outbound_connections
+        );
+
+        loop {
+            tokio::select! {
+                biased;
+                peer_req = self.make_connection_rx.recv() => {
+                    let Some(peer_req) = peer_req else {
+                        info!("Shutting down outbound connector, make_connection_rx closed.");
+                        return;
+                    };
+                    while self.handle_peer_request(&peer_req).await.is_err() {
+                        warn!("Failed to find peer to connect to or to add a permit, trying again in {} seconds", PEER_FIND_TIMEOUT.as_secs());
+                        sleep(PEER_FIND_TIMEOUT).await;
+                    }
+                },
+                // This future is not cancellation safe as you will lose your space in the queue but as we are the only place
+                // that actually requires permits that should be ok.
+                Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => {
+                    if self.handle_free_permit(permit).await.is_err() {
+                        // if we got an error then we still have a permit free so to prevent this from just looping
+                        // uncontrollably add a timeout.
+                        sleep(OUTBOUND_CONNECTION_TIMEOUT).await;
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs
new file mode 100644
index 00000000..82acefbc
--- /dev/null
+++ b/p2p/cuprate-p2p/src/constants.rs
@@ -0,0 +1,45 @@
+use std::time::Duration;
+
+pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub(crate) const CHAIN_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub(crate) const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
+
+pub(crate) const BLOCK_REQUEST_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
+
+pub(crate) const SEED_CONNECTION_RETRY_TIMEOUT: Duration = Duration::from_secs(60);
+
+pub(crate) const CONCURRENT_PEER_LIST_REQUESTS: usize = 3;
+
+pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
+
+pub(crate) const PEER_FIND_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub(crate) const OUTBOUND_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// The duration of a short ban (1 hour).
+pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 60);
+
+/// The duration of a medium ban (24 hours).
+pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
+
+pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
+
+pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND: Duration = Duration::from_millis(2500);
+
+pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 1024 * 1024 * 60;
+
+/// The limit on the amount of transactions kept in the broadcast channel.
+///
+/// A transaction is kept in the broadcast channel until all nodes have broadcast it.
+///
+/// Because of internal implementation details this limit will ALWAYS be hit i.e. a tx will stay in the
+/// channel until [`MAX_TXS_IN_BROADCAST_CHANNEL`] more txs are added.
+pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
+
+pub(crate) const INCOMING_BLOCKS_CACHE_SIZE: usize = 10 * 1024 * 1024;
+
+pub(crate) const NUMBER_OF_BLOCKS_TO_REQUEST: usize = 100;
+
+pub(crate) const CHAIN_REQUESTS_TO_SEND: usize = 2;
diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs
new file mode 100644
index 00000000..024b8788
--- /dev/null
+++ b/p2p/cuprate-p2p/src/lib.rs
@@ -0,0 +1,18 @@
+//! Cuprate's P2P Crate.
+//!
+//! This crate contains a `PeerSet` which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone).
+//! The `PeerSet` has methods to get peers by direction (inbound/outbound) or by a custom method like a load balancing
+//! algorithm.
+//!
+//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all,
+//! or send to a single peer.
+//!
+
+#![allow(dead_code)]
+
+pub mod config;
+pub mod connection_maintainer;
+mod constants;
+mod peer_set;
+
+pub use config::P2PConfig;
diff --git a/p2p/cuprate-p2p/src/peer_set.rs b/p2p/cuprate-p2p/src/peer_set.rs
new file mode 100644
index 00000000..0a2397aa
--- /dev/null
+++ b/p2p/cuprate-p2p/src/peer_set.rs
@@ -0,0 +1,97 @@
+//! This module contains the peer set and related functionality.
+//!
+use std::sync::Arc;
+
+use dashmap::{DashMap, DashSet};
+use tokio::sync::mpsc;
+
+use monero_p2p::{
+    client::{Client, InternalPeerID},
+    ConnectionDirection, NetworkZone,
+};
+
+mod disconnect_monitor;
+mod drop_guard_client;
+
+pub use drop_guard_client::ClientPoolGuard;
+use monero_p2p::handles::ConnectionHandle;
+
+pub struct ClientPool<N: NetworkZone> {
+    clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
+    outbound_clients: DashSet<InternalPeerID<N::Addr>>,
+
+    new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
+}
+
+impl<N: NetworkZone> ClientPool<N> {
+    pub fn new() -> Arc<ClientPool<N>> {
+        let (tx, rx) = mpsc::unbounded_channel();
+
+        let pool = Arc::new(ClientPool {
+            clients: DashMap::new(),
+            outbound_clients: DashSet::new(),
+            new_connection_tx: tx,
+        });
+
+        tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone()));
+
+        pool
+    }
+
+    fn add_client(&self, client: Client<N>) {
+        let handle = client.info.handle.clone();
+        let id = client.info.id;
+
+        if handle.is_closed() {
+            return;
+        }
+
+        if client.info.direction == ConnectionDirection::OutBound {
+            self.outbound_clients.insert(id);
+        }
+
+        let res = self.clients.insert(id, client);
+        debug_assert!(res.is_none());
+
+        // TODO: document how this prevents a race condition.
+        if handle.is_closed() {
+            self.remove_client(&id);
+        }
+    }
+
+    pub fn add_new_client(&self, client: Client<N>) {
+        self.new_connection_tx
+            .send((client.info.handle.clone(), client.info.id))
+            .unwrap();
+
+        self.add_client(client);
+    }
+
+    fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
+        self.outbound_clients.remove(peer);
+
+        self.clients.remove(peer).map(|(_, client)| client)
+    }
+
+    pub fn borrow_client(
+        self: &Arc<Self>,
+        peer: &InternalPeerID<N::Addr>,
+    ) -> Option<ClientPoolGuard<N>> {
+        self.outbound_clients.remove(peer);
+
+        self.remove_client(peer).map(|client| ClientPoolGuard {
+            pool: Arc::clone(self),
+            client: Some(client),
+        })
+    }
+
+    pub fn borrow_clients(
+        self: &Arc<Self>,
+        peers: &[InternalPeerID<N::Addr>],
+    ) -> Vec<ClientPoolGuard<N>> {
+        peers
+            .iter()
+            .filter_map(|peer| self.borrow_client(peer))
+            .collect()
+    }
+}
diff --git a/p2p/cuprate-p2p/src/peer_set/disconnect_monitor.rs b/p2p/cuprate-p2p/src/peer_set/disconnect_monitor.rs
new file mode 100644
index 00000000..71a8e69c
--- /dev/null
+++ b/p2p/cuprate-p2p/src/peer_set/disconnect_monitor.rs
@@ -0,0 +1,64 @@
+use std::{
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use futures::{stream::FuturesUnordered, StreamExt};
+use tokio::sync::mpsc;
+use tokio_util::sync::WaitForCancellationFutureOwned;
+use tracing::instrument;
+
+use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
+
+use super::ClientPool;
+
+#[instrument(level="info", skip_all, fields(network=N::NAME))]
+pub async fn disconnect_monitor<N: NetworkZone>(
+    mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
+    client_pool: Arc<ClientPool<N>>,
+) {
+    tracing::info!("Starting peer disconnect monitor.");
+
+    let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
+
+    loop {
+        tokio::select! {
+            Some((con_handle, peer_id)) = new_connection_rx.recv() => {
+                tracing::debug!("Monitoring {peer_id} for disconnect");
+                futs.push(PeerDisconnectFut {
+                    closed_fut: con_handle.closed(),
+                    peer_id: Some(peer_id),
+                });
+            }
+            Some(peer_id) = futs.next() => {
+                tracing::debug!("{peer_id} has disconnecting, removing from peer_set.");
+                client_pool.remove_client(&peer_id);
+            }
+            else => {
+                tracing::info!("Peer disconnect monitor shutting down.");
+                return;
+            }
+        }
+    }
+}
+
+#[pin_project::pin_project]
+struct PeerDisconnectFut<N: NetworkZone> {
+    #[pin]
+    closed_fut: WaitForCancellationFutureOwned,
+    peer_id: Option<InternalPeerID<N::Addr>>,
+}
+
+impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
+    type Output = InternalPeerID<N::Addr>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+
+        this.closed_fut
+            .poll(cx)
+            .map(|_| this.peer_id.take().unwrap())
+    }
+}
diff --git a/p2p/cuprate-p2p/src/peer_set/drop_guard_client.rs b/p2p/cuprate-p2p/src/peer_set/drop_guard_client.rs
new file mode 100644
index 00000000..771e6301
--- /dev/null
+++ b/p2p/cuprate-p2p/src/peer_set/drop_guard_client.rs
@@ -0,0 +1,36 @@
+use monero_p2p::client::Client;
+use std::{
+    ops::{Deref, DerefMut},
+    sync::Arc,
+};
+
+use monero_p2p::NetworkZone;
+
+use crate::peer_set::ClientPool;
+
+pub struct ClientPoolGuard<N: NetworkZone> {
+    pub(super) pool: Arc<ClientPool<N>>,
+    pub(super) client: Option<Client<N>>,
+}
+
+impl<N: NetworkZone> Deref for ClientPoolGuard<N> {
+    type Target = Client<N>;
+
+    fn deref(&self) -> &Self::Target {
+        self.client.as_ref().unwrap()
+    }
+}
+
+impl<N: NetworkZone> DerefMut for ClientPoolGuard<N> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.client.as_mut().unwrap()
+    }
+}
+
+impl<N: NetworkZone> Drop for ClientPoolGuard<N> {
+    fn drop(&mut self) {
+        let client = self.client.take().unwrap();
+
+        self.pool.add_client(client);
+    }
+}