add more docs

This commit is contained in:
Boog900 2024-05-07 01:07:18 +01:00
parent 1b01336294
commit 859d67d8b6
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
6 changed files with 55 additions and 32 deletions

View file

@ -42,6 +42,7 @@ pub struct ClientPool<N: NetworkZone> {
} }
impl<N: NetworkZone> ClientPool<N> { impl<N: NetworkZone> ClientPool<N> {
/// Returns a new [`ClientPool`] wrapped in an [`Arc`].
pub fn new() -> Arc<ClientPool<N>> { pub fn new() -> Arc<ClientPool<N>> {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
@ -56,10 +57,15 @@ impl<N: NetworkZone> ClientPool<N> {
pool pool
} }
/// Adds a [`Client`] to the pool, the client must have previously been taken from the
/// pool.
///
/// See [`ClientPool::add_new_client`] to add a [`Client`] which was not taken from the pool before.
fn add_client(&self, client: Client<N>) { fn add_client(&self, client: Client<N>) {
let handle = client.info.handle.clone(); let handle = client.info.handle.clone();
let id = client.info.id; let id = client.info.id;
// Fast path: if the client is disconnected don't add it to the peer set.
if handle.is_closed() { if handle.is_closed() {
return; return;
} }
@ -69,14 +75,19 @@ impl<N: NetworkZone> ClientPool<N> {
} }
let res = self.clients.insert(id, client); let res = self.clients.insert(id, client);
debug_assert!(res.is_none()); assert!(res.is_none());
// TODO: document how this prevents a race condition. // We have to check this again otherwise we could have a race condition where a
// peer is disconnected after the first check, the disconnect monitor trys to remove it,
// and then it is added to the pool.
if handle.is_closed() { if handle.is_closed() {
self.remove_client(&id); self.remove_client(&id);
} }
} }
/// Adds a _new_ [`Client`] to the pool, this client should be a new connection,
///
/// See [`ClientPool::add_client`] to add a [`Client`] which was removed from the pool.
pub fn add_new_client(&self, client: Client<N>) { pub fn add_new_client(&self, client: Client<N>) {
self.new_connection_tx self.new_connection_tx
.send((client.info.handle.clone(), client.info.id)) .send((client.info.handle.clone(), client.info.id))
@ -85,31 +96,39 @@ impl<N: NetworkZone> ClientPool<N> {
self.add_client(client); self.add_client(client);
} }
/// Remove a [`Client`] from the pool.
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> { fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
self.outbound_clients.remove(peer); self.outbound_clients.remove(peer);
self.clients.remove(peer).map(|(_, client)| client) self.clients.remove(peer).map(|(_, client)| client)
} }
/// Borrows a [`Client`] from the pool, the [`Client`] is wrapped in [`ClientPoolDropGuard`] which
/// will return the client to the pool when it's dropped.
pub fn borrow_client( pub fn borrow_client(
self: &Arc<Self>, self: &Arc<Self>,
peer: &InternalPeerID<N::Addr>, peer: &InternalPeerID<N::Addr>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientPoolDropGuard<N>> {
self.outbound_clients.remove(peer);
self.remove_client(peer).map(|client| ClientPoolDropGuard { self.remove_client(peer).map(|client| ClientPoolDropGuard {
pool: Arc::clone(self), pool: Arc::clone(self),
client: Some(client), client: Some(client),
}) })
} }
pub fn borrow_clients( /// Borrows multiple [`Client`]s from the pool.
self: &Arc<Self>, ///
peers: &[InternalPeerID<N::Addr>], /// The returned iterator is not guaranteed to contain ever peer asked for,
) -> Vec<ClientPoolDropGuard<N>> { #[allow(private_interfaces)] // TODO: Remove me when 2024 Rust
peers pub fn borrow_clients<'a, 'b>(
.iter() self: &'a Arc<Self>,
.filter_map(|peer| self.borrow_client(peer)) peers: &'b [InternalPeerID<N::Addr>],
.collect() ) -> impl Iterator<Item = ClientPoolDropGuard<N>> + Captures<(&'a (), &'b ())> {
peers.iter().filter_map(|peer| self.borrow_client(peer))
} }
} }
/// TODO: Remove me when 2024 Rust
///
/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick
trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}

View file

@ -1,3 +1,7 @@
//! # Disconnect Monitor
//!
//! This module contains the [`disconnect_monitor`] task, which monitors connected peers for disconnection
//! and the removes them from the [`ClientPool`] if they do.
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -14,7 +18,8 @@ use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone}
use super::ClientPool; use super::ClientPool;
#[instrument(level="info", skip_all, fields(network=N::NAME))] /// The disconnect monitor task.
#[instrument(level = "info", skip_all)]
pub async fn disconnect_monitor<N: NetworkZone>( pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>, mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>, client_pool: Arc<ClientPool<N>>,
@ -33,7 +38,7 @@ pub async fn disconnect_monitor<N: NetworkZone>(
}); });
} }
Some(peer_id) = futs.next() => { Some(peer_id) = futs.next() => {
tracing::debug!("{peer_id} has disconnecting, removing from peer_set."); tracing::debug!("{peer_id} has disconnected, removing from client pool.");
client_pool.remove_client(&peer_id); client_pool.remove_client(&peer_id);
} }
else => { else => {
@ -44,10 +49,13 @@ pub async fn disconnect_monitor<N: NetworkZone>(
} }
} }
/// A [`Future`] that resolves when a peer disconnects.
#[pin_project::pin_project] #[pin_project::pin_project]
struct PeerDisconnectFut<N: NetworkZone> { struct PeerDisconnectFut<N: NetworkZone> {
/// The inner [`Future`] that resolves when a peer disconnects.
#[pin] #[pin]
closed_fut: WaitForCancellationFutureOwned, closed_fut: WaitForCancellationFutureOwned,
/// The peers ID.
peer_id: Option<InternalPeerID<N::Addr>>, peer_id: Option<InternalPeerID<N::Addr>>,
} }

View file

@ -1,15 +1,17 @@
use monero_p2p::client::Client;
use std::{ use std::{
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::Arc,
}; };
use monero_p2p::NetworkZone; use monero_p2p::{client::Client, NetworkZone};
use crate::client_pool::ClientPool; use crate::client_pool::ClientPool;
/// A wrapper around [`Client`] which returns the client to the [`ClientPool`] when dropped.
pub struct ClientPoolDropGuard<N: NetworkZone> { pub struct ClientPoolDropGuard<N: NetworkZone> {
/// The [`ClientPool`] to return the peer to.
pub(super) pool: Arc<ClientPool<N>>, pub(super) pool: Arc<ClientPool<N>>,
/// The [`Client`].
pub(super) client: Option<Client<N>>, pub(super) client: Option<Client<N>>,
} }

View file

@ -23,9 +23,7 @@ use monero_p2p::{
use crate::{ use crate::{
client_pool::ClientPool, client_pool::ClientPool,
config::P2PConfig, config::P2PConfig,
constants::{ constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_TIMEOUT, PEER_FIND_TIMEOUT,
},
}; };
enum OutboundConnectorError { enum OutboundConnectorError {
@ -271,13 +269,11 @@ where
biased; biased;
peer_req = self.make_connection_rx.recv() => { peer_req = self.make_connection_rx.recv() => {
let Some(peer_req) = peer_req else { let Some(peer_req) = peer_req else {
info!("Shutting down outbound connector, make_connection_rx closed."); info!("Shutting down outbound connector, make connection channel closed.");
return; return;
}; };
while self.handle_peer_request(&peer_req).await.is_err() { // We can't really do much about errors in this function.
warn!("Failed to find peer to connect to or to add a permit, trying again in {} seconds", PEER_FIND_TIMEOUT.as_secs()); let _ = self.handle_peer_request(&peer_req).await;
sleep(PEER_FIND_TIMEOUT).await;
}
}, },
// This future is not cancellation safe as you will lose your space in the queue but as we are the only place // This future is not cancellation safe as you will lose your space in the queue but as we are the only place
// that actually requires permits that should be ok. // that actually requires permits that should be ok.
@ -285,7 +281,7 @@ where
if self.handle_free_permit(permit).await.is_err() { if self.handle_free_permit(permit).await.is_err() {
// if we got an error then we still have a permit free so to prevent this from just looping // if we got an error then we still have a permit free so to prevent this from just looping
// uncontrollably add a timeout. // uncontrollably add a timeout.
sleep(OUTBOUND_CONNECTION_TIMEOUT).await; sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await;
} }
} }
} }

View file

@ -1,9 +1,10 @@
use std::time::Duration; use std::time::Duration;
/// The timeout we set on handshakes.
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
/// The maximum amount of connections to make to seed nodes for when we need peers.
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
pub(crate) const PEER_FIND_TIMEOUT: Duration = Duration::from_secs(30); /// The timeout for when we fail to find a peer to connect to.
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const OUTBOUND_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);

View file

@ -1,13 +1,10 @@
//! Cuprate's P2P Crate. //! Cuprate's P2P Crate.
//! //!
//! This crate contains a `PeerSet` which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). //! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone).
//! The `PeerSet` has methods to get peers by direction (inbound/outbound) or by a custom method like a load balancing
//! algorithm.
//! //!
//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, //! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all,
//! or send to a single peer. //! or send to a single peer.
//! //!
#![allow(dead_code)] #![allow(dead_code)]
mod client_pool; mod client_pool;