From 859d67d8b6945fe1e92395d2e4c248017ce0f6ae Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 7 May 2024 01:07:18 +0100 Subject: [PATCH] add more docs --- p2p/cuprate-p2p/src/client_pool.rs | 43 +++++++++++++------ .../src/client_pool/disconnect_monitor.rs | 12 +++++- .../src/client_pool/drop_guard_client.rs | 6 ++- p2p/cuprate-p2p/src/connection_maintainer.rs | 14 +++--- p2p/cuprate-p2p/src/constants.rs | 7 +-- p2p/cuprate-p2p/src/lib.rs | 5 +-- 6 files changed, 55 insertions(+), 32 deletions(-) diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs index 97487c3b..66feeb92 100644 --- a/p2p/cuprate-p2p/src/client_pool.rs +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -42,6 +42,7 @@ pub struct ClientPool { } impl ClientPool { + /// Returns a new [`ClientPool`] wrapped in an [`Arc`]. pub fn new() -> Arc> { let (tx, rx) = mpsc::unbounded_channel(); @@ -56,10 +57,15 @@ impl ClientPool { pool } + /// Adds a [`Client`] to the pool, the client must have previously been taken from the + /// pool. + /// + /// See [`ClientPool::add_new_client`] to add a [`Client`] which was not taken from the pool before. fn add_client(&self, client: Client) { let handle = client.info.handle.clone(); let id = client.info.id; + // Fast path: if the client is disconnected don't add it to the peer set. if handle.is_closed() { return; } @@ -69,14 +75,19 @@ impl ClientPool { } let res = self.clients.insert(id, client); - debug_assert!(res.is_none()); + assert!(res.is_none()); - // TODO: document how this prevents a race condition. + // We have to check this again otherwise we could have a race condition where a + // peer is disconnected after the first check, the disconnect monitor trys to remove it, + // and then it is added to the pool. if handle.is_closed() { self.remove_client(&id); } } + /// Adds a _new_ [`Client`] to the pool, this client should be a new connection, + /// + /// See [`ClientPool::add_client`] to add a [`Client`] which was removed from the pool. pub fn add_new_client(&self, client: Client) { self.new_connection_tx .send((client.info.handle.clone(), client.info.id)) @@ -85,31 +96,39 @@ impl ClientPool { self.add_client(client); } + /// Remove a [`Client`] from the pool. fn remove_client(&self, peer: &InternalPeerID) -> Option> { self.outbound_clients.remove(peer); self.clients.remove(peer).map(|(_, client)| client) } + /// Borrows a [`Client`] from the pool, the [`Client`] is wrapped in [`ClientPoolDropGuard`] which + /// will return the client to the pool when it's dropped. pub fn borrow_client( self: &Arc, peer: &InternalPeerID, ) -> Option> { - self.outbound_clients.remove(peer); - self.remove_client(peer).map(|client| ClientPoolDropGuard { pool: Arc::clone(self), client: Some(client), }) } - pub fn borrow_clients( - self: &Arc, - peers: &[InternalPeerID], - ) -> Vec> { - peers - .iter() - .filter_map(|peer| self.borrow_client(peer)) - .collect() + /// Borrows multiple [`Client`]s from the pool. + /// + /// The returned iterator is not guaranteed to contain ever peer asked for, + #[allow(private_interfaces)] // TODO: Remove me when 2024 Rust + pub fn borrow_clients<'a, 'b>( + self: &'a Arc, + peers: &'b [InternalPeerID], + ) -> impl Iterator> + Captures<(&'a (), &'b ())> { + peers.iter().filter_map(|peer| self.borrow_client(peer)) } } + +/// TODO: Remove me when 2024 Rust +/// +/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick +trait Captures {} +impl Captures for T {} diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs index 71a8e69c..532c2121 100644 --- a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -1,3 +1,7 @@ +//! # Disconnect Monitor +//! +//! This module contains the [`disconnect_monitor`] task, which monitors connected peers for disconnection +//! and the removes them from the [`ClientPool`] if they do. use std::{ future::Future, pin::Pin, @@ -14,7 +18,8 @@ use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone} use super::ClientPool; -#[instrument(level="info", skip_all, fields(network=N::NAME))] +/// The disconnect monitor task. +#[instrument(level = "info", skip_all)] pub async fn disconnect_monitor( mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID)>, client_pool: Arc>, @@ -33,7 +38,7 @@ pub async fn disconnect_monitor( }); } Some(peer_id) = futs.next() => { - tracing::debug!("{peer_id} has disconnecting, removing from peer_set."); + tracing::debug!("{peer_id} has disconnected, removing from client pool."); client_pool.remove_client(&peer_id); } else => { @@ -44,10 +49,13 @@ pub async fn disconnect_monitor( } } +/// A [`Future`] that resolves when a peer disconnects. #[pin_project::pin_project] struct PeerDisconnectFut { + /// The inner [`Future`] that resolves when a peer disconnects. #[pin] closed_fut: WaitForCancellationFutureOwned, + /// The peers ID. peer_id: Option>, } diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs index cf100e8e..d6b6f7d4 100644 --- a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -1,15 +1,17 @@ -use monero_p2p::client::Client; use std::{ ops::{Deref, DerefMut}, sync::Arc, }; -use monero_p2p::NetworkZone; +use monero_p2p::{client::Client, NetworkZone}; use crate::client_pool::ClientPool; +/// A wrapper around [`Client`] which returns the client to the [`ClientPool`] when dropped. pub struct ClientPoolDropGuard { + /// The [`ClientPool`] to return the peer to. pub(super) pool: Arc>, + /// The [`Client`]. pub(super) client: Option>, } diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs index 0b7a492f..d56e5094 100644 --- a/p2p/cuprate-p2p/src/connection_maintainer.rs +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -23,9 +23,7 @@ use monero_p2p::{ use crate::{ client_pool::ClientPool, config::P2PConfig, - constants::{ - HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_TIMEOUT, PEER_FIND_TIMEOUT, - }, + constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT}, }; enum OutboundConnectorError { @@ -271,13 +269,11 @@ where biased; peer_req = self.make_connection_rx.recv() => { let Some(peer_req) = peer_req else { - info!("Shutting down outbound connector, make_connection_rx closed."); + info!("Shutting down outbound connector, make connection channel closed."); return; }; - while self.handle_peer_request(&peer_req).await.is_err() { - warn!("Failed to find peer to connect to or to add a permit, trying again in {} seconds", PEER_FIND_TIMEOUT.as_secs()); - sleep(PEER_FIND_TIMEOUT).await; - } + // We can't really do much about errors in this function. + let _ = self.handle_peer_request(&peer_req).await; }, // This future is not cancellation safe as you will lose your space in the queue but as we are the only place // that actually requires permits that should be ok. @@ -285,7 +281,7 @@ where if self.handle_free_permit(permit).await.is_err() { // if we got an error then we still have a permit free so to prevent this from just looping // uncontrollably add a timeout. - sleep(OUTBOUND_CONNECTION_TIMEOUT).await; + sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await; } } } diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 8a5fe397..58e263fa 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -1,9 +1,10 @@ use std::time::Duration; +/// The timeout we set on handshakes. pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); +/// The maximum amount of connections to make to seed nodes for when we need peers. pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; -pub(crate) const PEER_FIND_TIMEOUT: Duration = Duration::from_secs(30); - -pub(crate) const OUTBOUND_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); +/// The timeout for when we fail to find a peer to connect to. +pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index b9492f9e..7186e877 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -1,13 +1,10 @@ //! Cuprate's P2P Crate. //! -//! This crate contains a `PeerSet` which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). -//! The `PeerSet` has methods to get peers by direction (inbound/outbound) or by a custom method like a load balancing -//! algorithm. +//! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). //! //! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, //! or send to a single peer. //! - #![allow(dead_code)] mod client_pool;