P2P Client Pool (#121)

* p2p changes

* clippy

* a few more docs

* init cuprate-p2p

* remove some unrelated code and add some docs

* start documenting client_pool.rs

* add more docs

* typo

* fix docs

* use JoinSet in connection maintainer

* small changes

* Apply suggestions from code review

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

* review changes

* Update p2p/cuprate-p2p/src/connection_maintainer.rs

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
Boog900 2024-05-17 13:52:51 +00:00 committed by GitHub
parent 38ede35468
commit c5fbbcc6e8
10 changed files with 669 additions and 0 deletions

Cargo.lock generated

@@ -576,6 +576,35 @@ dependencies = [
"windows",
]
[[package]]
name = "cuprate-p2p"
version = "0.1.0"
dependencies = [
"bytes",
"cuprate-helper",
"cuprate-test-utils",
"dashmap",
"fixed-bytes",
"futures",
"hex",
"indexmap 2.2.6",
"monero-address-book",
"monero-p2p",
"monero-pruning",
"monero-serai",
"monero-wire",
"pin-project",
"rand",
"rand_distr",
"rayon",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tracing",
]
[[package]]
name = "cuprate-test-utils"
version = "0.1.0"
@@ -671,6 +700,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "diff"
version = "0.1.13"

Cargo.toml

@@ -11,6 +11,7 @@ members = [
"net/fixed-bytes",
"net/levin",
"net/monero-wire",
"p2p/cuprate-p2p",
"p2p/dandelion",
"p2p/monero-p2p",
"p2p/address-book",
@@ -50,6 +51,7 @@ crypto-bigint = { version = "0.5.5", default-features = false }
crossbeam = { version = "0.8.4", default-features = false }
curve25519-dalek = { version = "4.1.1", default-features = false }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
dashmap = { version = "5.5.3", default-features = false }
dirs = { version = "5.0.1", default-features = false }
futures = { version = "0.3.29", default-features = false }
hex = { version = "0.4.3", default-features = false }

p2p/cuprate-p2p/Cargo.toml

@@ -0,0 +1,36 @@
[package]
name = "cuprate-p2p"
version = "0.1.0"
edition = "2021"
license = "MIT"
authors = ["Boog900"]
[dependencies]
fixed-bytes = { path = "../../net/fixed-bytes" }
monero-wire = { path = "../../net/monero-wire" }
monero-p2p = { path = "../monero-p2p", features = ["borsh"] }
monero-address-book = { path = "../address-book" }
monero-pruning = { path = "../../pruning" }
cuprate-helper = { path = "../../helper", features = ["asynch"] }
monero-serai = { workspace = true, features = ["std"] }
tower = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
rayon = { workspace = true }
tokio-util = { workspace = true }
tokio-stream = { workspace = true, features = ["sync", "time"] }
futures = { workspace = true, features = ["std"] }
pin-project = { workspace = true }
dashmap = { workspace = true }
thiserror = { workspace = true }
bytes = { workspace = true, features = ["std"] }
indexmap = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std", "std_rng"] }
rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }
[dev-dependencies]
cuprate-test-utils = { path = "../../test-utils" }

p2p/cuprate-p2p/src/client_pool.rs

@@ -0,0 +1,148 @@
//! # Client Pool.
//!
//! The [`ClientPool`] is a pool of currently connected peers that can be pulled from.
//! It does _not_ necessarily contain every connected peer, as another task could have
//! taken a peer from the pool.
//!
//! When taking peers from the pool they are wrapped in [`ClientPoolDropGuard`], which
//! returns the peer to the pool when it is dropped.
//!
//! Internally the pool is a [`DashMap`], which means care should be taken in `async` code,
//! as it internally uses blocking `RwLock`s.
//!
use std::sync::Arc;
use dashmap::{DashMap, DashSet};
use tokio::sync::mpsc;
use monero_p2p::{
client::{Client, InternalPeerID},
handles::ConnectionHandle,
ConnectionDirection, NetworkZone,
};
mod disconnect_monitor;
mod drop_guard_client;
pub use drop_guard_client::ClientPoolDropGuard;
/// The client pool, which holds currently connected free peers.
///
/// See the [module docs](self) for more.
pub struct ClientPool<N: NetworkZone> {
/// The connected [`Client`]s.
clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
/// A set holding the IDs of connected outbound clients.
///
/// As these maps allow access/mutation from different threads, a peer ID being in this set
/// does not mean the peer is necessarily in `clients`, as it could have been removed by
/// another thread. However, if a peer is in both this set and `clients`, it is definitely
/// an outbound peer.
outbound_clients: DashSet<InternalPeerID<N::Addr>>,
/// A channel used to send the IDs of new peers down to the disconnect monitor.
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
}
impl<N: NetworkZone> ClientPool<N> {
/// Returns a new [`ClientPool`] wrapped in an [`Arc`].
pub fn new() -> Arc<ClientPool<N>> {
let (tx, rx) = mpsc::unbounded_channel();
let pool = Arc::new(ClientPool {
clients: DashMap::new(),
outbound_clients: DashSet::new(),
new_connection_tx: tx,
});
tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone()));
pool
}
/// Adds a [`Client`] to the pool; the client must have previously been taken from the
/// pool.
///
/// See [`ClientPool::add_new_client`] to add a [`Client`] which was not taken from the pool before.
///
/// # Panics
/// This function panics if `client` already exists in the pool.
fn add_client(&self, client: Client<N>) {
let handle = client.info.handle.clone();
let id = client.info.id;
// Fast path: if the client is disconnected don't add it to the peer set.
if handle.is_closed() {
return;
}
if client.info.direction == ConnectionDirection::OutBound {
self.outbound_clients.insert(id);
}
let res = self.clients.insert(id, client);
assert!(res.is_none());
// We have to check this again, otherwise we could have a race condition where a
// peer is disconnected after the first check, the disconnect monitor tries to remove it,
// and then it is added to the pool.
if handle.is_closed() {
self.remove_client(&id);
}
}
/// Adds a _new_ [`Client`] to the pool; this client should be a new connection, not one
/// previously taken from the pool.
///
/// # Panics
/// This function panics if `client` already exists in the pool.
pub fn add_new_client(&self, client: Client<N>) {
self.new_connection_tx
.send((client.info.handle.clone(), client.info.id))
.unwrap();
self.add_client(client);
}
/// Removes a [`Client`] from the pool.
///
/// [`None`] is returned if the client did not exist in the pool.
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
self.outbound_clients.remove(peer);
self.clients.remove(peer).map(|(_, client)| client)
}
/// Borrows a [`Client`] from the pool.
///
/// The [`Client`] is wrapped in [`ClientPoolDropGuard`] which
/// will return the client to the pool when it's dropped.
///
/// See [`Self::borrow_clients`] for borrowing multiple clients.
pub fn borrow_client(
self: &Arc<Self>,
peer: &InternalPeerID<N::Addr>,
) -> Option<ClientPoolDropGuard<N>> {
self.remove_client(peer).map(|client| ClientPoolDropGuard {
pool: Arc::clone(self),
client: Some(client),
})
}
/// Borrows multiple [`Client`]s from the pool.
///
/// Note that the returned iterator is not guaranteed to contain every peer asked for.
///
/// See [`Self::borrow_client`] for borrowing a single client.
#[allow(private_interfaces)] // TODO: Remove me when the Rust 2024 lifetime capture rules land
pub fn borrow_clients<'a, 'b>(
self: &'a Arc<Self>,
peers: &'b [InternalPeerID<N::Addr>],
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + Captures<(&'a (), &'b ())> {
peers.iter().filter_map(|peer| self.borrow_client(peer))
}
}
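A minimal sketch of how a consumer borrows and returns a peer (an editor's illustration, not part of the diff; it assumes the crate's types above are in scope, and that `peer_id` was obtained elsewhere, e.g. from the address book):

```rust
use std::sync::Arc;

use monero_p2p::{client::InternalPeerID, NetworkZone};

// Sketch only: `pool` and `peer_id` come from the surrounding application.
async fn use_peer_example<N: NetworkZone>(
    pool: Arc<ClientPool<N>>,
    peer_id: InternalPeerID<N::Addr>,
) {
    // `borrow_client` removes the client from the pool, so no other task can
    // take the same peer while the guard is alive.
    if let Some(client) = pool.borrow_client(&peer_id) {
        // The guard derefs to `Client<N>`, so the peer can be used directly.
        tracing::debug!("Borrowed peer {}", client.info.id);
    } // Guard dropped here: the client is returned to the pool.
}
```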
/// TODO: Remove me when the Rust 2024 lifetime capture rules land.
///
/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick
trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}
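To see why the `Captures` bound is needed, here is a standalone toy (hypothetical function, unrelated to this crate) returning an `impl Trait` type that borrows from two lifetimes; before the 2024 capture rules, both lifetimes must be named in the bounds for the hidden type to capture them:

```rust
trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}

// Without the `Captures<(&'a (), &'b ())>` bound this fails to compile with
// "hidden type captures lifetime that does not appear in bounds".
fn zip_refs<'a, 'b>(
    xs: &'a [u32],
    ys: &'b [u32],
) -> impl Iterator<Item = (u32, u32)> + Captures<(&'a (), &'b ())> {
    xs.iter().zip(ys.iter()).map(|(&x, &y)| (x, y))
}
```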

p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs

@@ -0,0 +1,72 @@
//! # Disconnect Monitor
//!
//! This module contains the [`disconnect_monitor`] task, which monitors connected peers for disconnection
//! and removes them from the [`ClientPool`] when they disconnect.
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::mpsc;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tracing::instrument;
use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
use super::ClientPool;
/// The disconnect monitor task.
#[instrument(level = "info", skip_all)]
pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>,
) {
tracing::info!("Starting peer disconnect monitor.");
let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
loop {
tokio::select! {
Some((con_handle, peer_id)) = new_connection_rx.recv() => {
tracing::debug!("Monitoring {peer_id} for disconnect");
futs.push(PeerDisconnectFut {
closed_fut: con_handle.closed(),
peer_id: Some(peer_id),
});
}
Some(peer_id) = futs.next() => {
tracing::debug!("{peer_id} has disconnected, removing from client pool.");
client_pool.remove_client(&peer_id);
}
else => {
tracing::info!("Peer disconnect monitor shutting down.");
return;
}
}
}
}
/// A [`Future`] that resolves when a peer disconnects.
#[pin_project::pin_project]
struct PeerDisconnectFut<N: NetworkZone> {
/// The inner [`Future`] that resolves when a peer disconnects.
#[pin]
closed_fut: WaitForCancellationFutureOwned,
/// The peer's ID.
peer_id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.closed_fut
.poll(cx)
.map(|_| this.peer_id.take().unwrap())
}
}
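`ConnectionHandle::closed()` yields the [`WaitForCancellationFutureOwned`] wrapped above; a minimal standalone demo of how that future resolves (assuming, as in `monero-p2p`, that the handle is backed by a `tokio_util` `CancellationToken`):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    // `cancelled_owned` consumes a token clone and yields the same
    // `WaitForCancellationFutureOwned` type that `PeerDisconnectFut` wraps.
    let closed_fut = token.clone().cancelled_owned();

    // Simulate the connection task signalling a disconnect.
    tokio::spawn(async move { token.cancel() });

    closed_fut.await;
    println!("peer disconnected");
}
```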

p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs

@@ -0,0 +1,41 @@
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use monero_p2p::{client::Client, NetworkZone};
use crate::client_pool::ClientPool;
/// A wrapper around [`Client`] which returns the client to the [`ClientPool`] when dropped.
pub struct ClientPoolDropGuard<N: NetworkZone> {
/// The [`ClientPool`] to return the peer to.
pub(super) pool: Arc<ClientPool<N>>,
/// The [`Client`].
///
/// This is set to [`Some`] when this guard is created, then
/// [`take`](Option::take)n and returned to the pool when dropped.
pub(super) client: Option<Client<N>>,
}
impl<N: NetworkZone> Deref for ClientPoolDropGuard<N> {
type Target = Client<N>;
fn deref(&self) -> &Self::Target {
self.client.as_ref().unwrap()
}
}
impl<N: NetworkZone> DerefMut for ClientPoolDropGuard<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.client.as_mut().unwrap()
}
}
impl<N: NetworkZone> Drop for ClientPoolDropGuard<N> {
fn drop(&mut self) {
let client = self.client.take().unwrap();
self.pool.add_client(client);
}
}
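The same return-on-drop pattern, reduced to a self-contained toy (hypothetical names; the real pool is the `DashMap`-backed `ClientPool` above):

```rust
use std::sync::{Arc, Mutex};

struct Pool(Mutex<Vec<String>>);

struct Guard {
    pool: Arc<Pool>,
    item: Option<String>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // `take` the item so it can be moved out of `&mut self`.
        let item = self.item.take().unwrap();
        self.pool.0.lock().unwrap().push(item);
    }
}

fn main() {
    let pool = Arc::new(Pool(Mutex::new(vec!["peer".to_string()])));
    {
        let item = pool.0.lock().unwrap().pop();
        let _guard = Guard { pool: Arc::clone(&pool), item };
        // ... use the item through the guard ...
    } // `_guard` dropped here: the item is pushed back into the pool.
    assert_eq!(pool.0.lock().unwrap().len(), 1);
}
```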

p2p/cuprate-p2p/src/config.rs

@@ -0,0 +1,12 @@
/// P2P config.
#[derive(Clone, Debug)]
pub struct P2PConfig {
/// The number of outbound connections to make and try to keep.
pub outbound_connections: usize,
/// The number of extra connections we can make if we are under load from the rest of Cuprate.
pub extra_outbound_connections: usize,
/// The percentage of outbound peers that should be gray, i.e. peers we have never connected to before.
///
/// Only values in the range `0.0..=1.0` are valid.
pub gray_peers_percent: f64,
}
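How `gray_peers_percent` is consumed (mirroring the `Bernoulli` usage in the connection maintainer below; the `0.7` is a hypothetical value):

```rust
use rand::{distributions::Bernoulli, prelude::*};

fn main() {
    let gray_peers_percent = 0.7; // hypothetical: 70% gray, 30% white
    let dist = Bernoulli::new(gray_peers_percent)
        .expect("gray_peers_percent must be in 0.0..=1.0");
    // `true` means this connection attempt should go to a gray peer.
    let connect_to_gray = dist.sample(&mut thread_rng());
    println!("gray peer this attempt: {connect_to_gray}");
}
```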

p2p/cuprate-p2p/src/connection_maintainer.rs

@@ -0,0 +1,291 @@
//! Outbound Connection Maintainer.
//!
//! This module handles maintaining the number of outbound connections defined in the [`P2PConfig`].
//! It also handles making extra connections when the peer set is under load or when we need data that
//! no connected peer has.
use std::sync::Arc;
use rand::{distributions::Bernoulli, prelude::*};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
task::JoinSet,
time::{sleep, timeout},
};
use tower::{Service, ServiceExt};
use tracing::instrument;
use monero_p2p::{
client::{Client, ConnectRequest, HandshakeError},
services::{AddressBookRequest, AddressBookResponse},
AddressBook, NetworkZone,
};
use crate::{
client_pool::ClientPool,
config::P2PConfig,
constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
};
enum OutboundConnectorError {
MaxConnections,
FailedToConnectToSeeds,
NoAvailablePeers,
}
/// A request from the peer set to make an outbound connection.
///
/// This will only be sent when the peer set is under load from the rest of Cuprate or the peer
/// set needs specific data that none of the currently connected peers have.
pub struct MakeConnectionRequest {
/// The height of a block we need that no currently connected peer has, due to pruning.
block_needed: Option<u64>,
}
/// The outbound connection count keeper.
///
/// This handles maintaining a minimum number of connections and making extra connections when needed, up to a maximum.
pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
/// The pool of currently connected peers.
pub client_pool: Arc<ClientPool<N>>,
/// The channel that tells us to make new _extra_ outbound connections.
pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
/// The address book service.
pub address_book_svc: A,
/// The service to connect to a specific peer.
pub connector_svc: C,
/// A semaphore to keep the amount of outbound peers constant.
pub outbound_semaphore: Arc<Semaphore>,
/// The number of peers we have connected to because we needed more peers. If the `outbound_semaphore`
/// is full and we need to connect to more peers, for blocks or because not enough peers are ready,
/// we add a permit to the semaphore and keep track of it here, up to the limit set in the config.
pub extra_peers: usize,
/// The p2p config.
pub config: P2PConfig,
/// The [`Bernoulli`] distribution; when sampled, it returns `true` if we should connect to a gray peer or
/// `false` if we should connect to a white peer.
///
/// This is weighted to the percentage given in `config`.
pub peer_type_gen: Bernoulli,
}
impl<N, A, C> OutboundConnectionKeeper<N, A, C>
where
N: NetworkZone,
A: AddressBook<N>,
C: Service<ConnectRequest<N>, Response = Client<N>, Error = HandshakeError>,
C::Future: Send + 'static,
{
pub fn new(
config: P2PConfig,
client_pool: Arc<ClientPool<N>>,
make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
address_book_svc: A,
connector_svc: C,
) -> Self {
let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
.expect("Gray peer percent is incorrect should be 0..=1");
Self {
client_pool,
make_connection_rx,
address_book_svc,
connector_svc,
outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
extra_peers: 0,
config,
peer_type_gen,
}
}
/// Connects to random seed nodes to get peers, then immediately disconnects.
#[instrument(level = "info", skip(self))]
async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
if seeds.len() == 0 {
panic!("No seed nodes available to get peers from");
}
// This isn't really needed here to limit connections, as the seed connections will be dropped
// once we have got peers from them.
let semaphore = Arc::new(Semaphore::new(seeds.len()));
let mut allowed_errors = seeds.len();
let mut handshake_futs = JoinSet::new();
for seed in seeds {
tracing::info!("Getting peers from seed node: {}", seed);
let fut = timeout(
HANDSHAKE_TIMEOUT,
self.connector_svc
.ready()
.await
.expect("Connector had an error in `poll_ready`")
.call(ConnectRequest {
addr: *seed,
permit: semaphore
.clone()
.try_acquire_owned()
.expect("This must have enough permits as we just set the amount."),
}),
);
// Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
handshake_futs.spawn(fut);
}
while let Some(res) = handshake_futs.join_next().await {
if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
allowed_errors -= 1;
}
}
if allowed_errors == 0 {
Err(OutboundConnectorError::FailedToConnectToSeeds)
} else {
Ok(())
}
}
/// Connects to a given outbound peer.
#[instrument(level = "info", skip(self, permit), fields(%addr))]
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
let client_pool = self.client_pool.clone();
let connection_fut = self
.connector_svc
.ready()
.await
.expect("Connector had an error in `poll_ready`")
.call(ConnectRequest { addr, permit });
tokio::spawn(async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer);
}
});
}
/// Handles a request from the peer set for more peers.
async fn handle_peer_request(
&mut self,
req: &MakeConnectionRequest,
) -> Result<(), OutboundConnectorError> {
// try to get a permit.
let permit = self
.outbound_semaphore
.clone()
.try_acquire_owned()
.or_else(|_| {
// if we can't get a permit, add one if we are below the max number of connections.
if self.extra_peers >= self.config.extra_outbound_connections {
// If we can't add a permit return an error.
Err(OutboundConnectorError::MaxConnections)
} else {
self.outbound_semaphore.add_permits(1);
self.extra_peers += 1;
Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap())
}
})?;
// try to get a random peer on any network zone from the address book.
let peer = self
.address_book_svc
.ready()
.await
.expect("Error in address book!")
.call(AddressBookRequest::TakeRandomPeer {
height: req.block_needed,
})
.await;
match peer {
Err(_) => {
// TODO: We should probably send peer requests to our connected peers rather than go to seeds.
tracing::warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
self.connect_to_random_seeds().await?;
Err(OutboundConnectorError::NoAvailablePeers)
}
Ok(AddressBookResponse::Peer(peer)) => {
self.connect_to_outbound_peer(permit, peer.adr).await;
Ok(())
}
Ok(_) => panic!("peer list sent incorrect response!"),
}
}
/// Handles a free permit by either connecting to a new peer, or removing the permit if we are
/// above the minimum number of outbound connections.
#[instrument(level = "debug", skip(self, permit))]
async fn handle_free_permit(
&mut self,
permit: OwnedSemaphorePermit,
) -> Result<(), OutboundConnectorError> {
if self.extra_peers > 0 {
tracing::debug!(
"Permit available but we are over the minimum number of peers, forgetting permit."
);
permit.forget();
self.extra_peers -= 1;
return Ok(());
}
tracing::debug!("Permit available, making outbound connection.");
let req = if self.peer_type_gen.sample(&mut thread_rng()) {
AddressBookRequest::TakeRandomGrayPeer { height: None }
} else {
// This will try white peers first then gray.
AddressBookRequest::TakeRandomPeer { height: None }
};
let Ok(AddressBookResponse::Peer(peer)) = self
.address_book_svc
.ready()
.await
.expect("Error in address book!")
.call(req)
.await
else {
tracing::warn!("No peers in peer list to make connection to.");
self.connect_to_random_seeds().await?;
return Err(OutboundConnectorError::NoAvailablePeers);
};
self.connect_to_outbound_peer(permit, peer.adr).await;
Ok(())
}
/// Runs the outbound connection count keeper.
pub async fn run(mut self) {
tracing::info!(
"Starting outbound connection maintainer, target outbound connections: {}",
self.config.outbound_connections
);
loop {
tokio::select! {
biased;
peer_req = self.make_connection_rx.recv() => {
let Some(peer_req) = peer_req else {
tracing::info!("Shutting down outbound connector, make connection channel closed.");
return;
};
// We can't really do much about errors in this function.
let _ = self.handle_peer_request(&peer_req).await;
},
// This future is not cancellation safe, as you will lose your place in the queue, but since we are
// the only place that actually acquires permits that should be ok.
Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => {
if self.handle_free_permit(permit).await.is_err() {
// If we got an error then we still have a free permit, so to prevent this from just looping
// uncontrollably, add a timeout.
sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await;
}
}
}
}
}
}
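The permit accounting above, reduced to a self-contained sketch (hypothetical limits; requires tokio's `sync`, `rt`, and `macros` features):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

const TARGET_OUTBOUND: usize = 2; // stand-in for `config.outbound_connections`
const EXTRA_MAX: usize = 1; // stand-in for `config.extra_outbound_connections`

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(TARGET_OUTBOUND));
    let mut extra_peers = 0;

    // Simulate all target outbound slots being in use.
    let _held: Vec<_> = (0..TARGET_OUTBOUND)
        .map(|_| semaphore.clone().try_acquire_owned().unwrap())
        .collect();

    // Under load: grow the semaphore by one tracked extra permit.
    if extra_peers < EXTRA_MAX {
        semaphore.add_permits(1);
        extra_peers += 1;
    }
    let extra_permit = semaphore.clone().try_acquire_owned().unwrap();

    // When over the minimum, `forget()` the permit instead of returning it,
    // shrinking capacity back towards the target.
    extra_permit.forget();
    extra_peers -= 1;
    assert_eq!(extra_peers, 0);
}
```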

p2p/cuprate-p2p/src/constants.rs

@@ -0,0 +1,10 @@
use std::time::Duration;
/// The timeout we set on handshakes.
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
/// The maximum number of connections to make to seed nodes when we need peers.
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
/// How long we wait before retrying after failing to find a peer to connect to.
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);

p2p/cuprate-p2p/src/lib.rs

@@ -0,0 +1,15 @@
//! Cuprate's P2P Crate.
//!
//! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone).
//!
//! This crate also contains the different routing methods that control how messages should be sent, e.g. broadcast to all
//! peers, or sent to a single peer.
//!
#![allow(dead_code)]
pub mod client_pool;
pub mod config;
pub mod connection_maintainer;
mod constants;
pub use config::P2PConfig;