diff --git a/Cargo.lock b/Cargo.lock index b08aee58..c17cb7c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -576,6 +576,36 @@ dependencies = [ "windows", ] +[[package]] +name = "cuprate-p2p" +version = "0.1.0" +dependencies = [ + "bytes", + "cuprate-helper", + "cuprate-test-utils", + "dashmap", + "fixed-bytes", + "futures", + "hex", + "indexmap 2.2.6", + "monero-address-book", + "monero-p2p", + "monero-pruning", + "monero-serai", + "monero-wire", + "pin-project", + "rand", + "rand_distr", + "rayon", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tracing", + "tracing-subscriber", +] + [[package]] name = "cuprate-test-utils" version = "0.1.0" @@ -671,6 +701,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "diff" version = "0.1.13" diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs index 0a2397aa..97487c3b 100644 --- a/p2p/cuprate-p2p/src/client_pool.rs +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -1,4 +1,14 @@ -//! This module contains the peer set and related functionality. +//! # Client Pool. +//! +//! The [`ClientPool`], is a pool of currently connected peers that can be pulled from. +//! It does _not_ necessarily contain every connected peer as another place could have +//! taken a peer from the pool. +//! +//! When taking peers from the pool they are wrapped in [`ClientPoolDropGuard`], which +//! returns the peer to the pool when it is dropped. +//! +//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code +//! as internally this uses blocking RwLocks. //! use std::sync::Arc; @@ -13,13 +23,21 @@ use monero_p2p::{ mod disconnect_monitor; mod drop_guard_client; -pub use drop_guard_client::ClientPoolGuard; +pub use drop_guard_client::ClientPoolDropGuard; use monero_p2p::handles::ConnectionHandle; +/// The client pool, which holds currently connected free peers. +/// +/// See the [module docs](self) for more. pub struct ClientPool<N: NetworkZone> { + /// The connected [`Client`]s. clients: DashMap<InternalPeerID<N::Addr>, Client<N>>, + /// A set of outbound clients, as these allow accesses/ mutation from different threads + /// a peer ID in here does not mean the peer is definitely in `clients` , if the peer is + /// in both here and `clients` it is defiantly an outbound peer, outbound_clients: DashSet<InternalPeerID<N::Addr>>, + /// A channel to send new peer ids down to monitor for disconnect. new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>, } @@ -76,10 +94,10 @@ impl<N: NetworkZone> ClientPool<N> { pub fn borrow_client( self: &Arc<Self>, peer: &InternalPeerID<N::Addr>, - ) -> Option<ClientPoolGuard<N>> { + ) -> Option<ClientPoolDropGuard<N>> { self.outbound_clients.remove(peer); - self.remove_client(peer).map(|client| ClientPoolGuard { + self.remove_client(peer).map(|client| ClientPoolDropGuard { pool: Arc::clone(self), client: Some(client), }) @@ -88,7 +106,7 @@ impl<N: NetworkZone> ClientPool<N> { pub fn borrow_clients( self: &Arc<Self>, peers: &[InternalPeerID<N::Addr>], - ) -> Vec<ClientPoolGuard<N>> { + ) -> Vec<ClientPoolDropGuard<N>> { peers .iter() .filter_map(|peer| self.borrow_client(peer)) diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs index 8342e073..cf100e8e 100644 --- a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -8,12 +8,12 @@ use monero_p2p::NetworkZone; use crate::client_pool::ClientPool; -pub struct ClientPoolGuard<N: NetworkZone> { +pub struct ClientPoolDropGuard<N: NetworkZone> { pub(super) pool: Arc<ClientPool<N>>, pub(super) client: Option<Client<N>>, } -impl<N: NetworkZone> Deref for ClientPoolGuard<N> { +impl<N: NetworkZone> Deref for ClientPoolDropGuard<N> { type Target = Client<N>; fn deref(&self) -> &Self::Target { @@ -21,13 +21,13 @@ impl<N: NetworkZone> Deref for ClientPoolGuard<N> { } } -impl<N: NetworkZone> DerefMut for ClientPoolGuard<N> { +impl<N: NetworkZone> DerefMut for ClientPoolDropGuard<N> { fn deref_mut(&mut self) -> &mut Self::Target { self.client.as_mut().unwrap() } } -impl<N: NetworkZone> Drop for ClientPoolGuard<N> { +impl<N: NetworkZone> Drop for ClientPoolDropGuard<N> { fn drop(&mut self) { let client = self.client.take().unwrap(); diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs index 78568650..0b7a492f 100644 --- a/p2p/cuprate-p2p/src/connection_maintainer.rs +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -43,7 +43,7 @@ pub struct MakeConnectionRequest { block_needed: Option<u64>, } -/// The outbound connection (count) keeper. +/// The outbound connection count keeper. /// /// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum. pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {