From c5fbbcc6e8c98e7ee4f647629d9972b3afc7ab66 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Fri, 17 May 2024 13:52:51 +0000 Subject: [PATCH] P2P Client Pool (#121) * p2p changes * clippy * a few more docs * init cuprate-p2p * remove some unrelated code and add some docs * start documenting client_pool.rs * add more docs * typo * fix docs * use JoinSet in connection maintainer * small changes * Apply suggestions from code review Co-authored-by: hinto-janai * review changes * Update p2p/cuprate-p2p/src/connection_maintainer.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- Cargo.lock | 42 +++ Cargo.toml | 2 + p2p/cuprate-p2p/Cargo.toml | 36 +++ p2p/cuprate-p2p/src/client_pool.rs | 148 +++++++++ .../src/client_pool/disconnect_monitor.rs | 72 +++++ .../src/client_pool/drop_guard_client.rs | 41 +++ p2p/cuprate-p2p/src/config.rs | 12 + p2p/cuprate-p2p/src/connection_maintainer.rs | 291 ++++++++++++++++++ p2p/cuprate-p2p/src/constants.rs | 10 + p2p/cuprate-p2p/src/lib.rs | 15 + 10 files changed, 669 insertions(+) create mode 100644 p2p/cuprate-p2p/Cargo.toml create mode 100644 p2p/cuprate-p2p/src/client_pool.rs create mode 100644 p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs create mode 100644 p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs create mode 100644 p2p/cuprate-p2p/src/config.rs create mode 100644 p2p/cuprate-p2p/src/connection_maintainer.rs create mode 100644 p2p/cuprate-p2p/src/constants.rs create mode 100644 p2p/cuprate-p2p/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b08aee58..6a081885 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -576,6 +576,35 @@ dependencies = [ "windows", ] +[[package]] +name = "cuprate-p2p" +version = "0.1.0" +dependencies = [ + "bytes", + "cuprate-helper", + "cuprate-test-utils", + "dashmap", + "fixed-bytes", + "futures", + "hex", + "indexmap 2.2.6", + "monero-address-book", + "monero-p2p", + "monero-pruning", + "monero-serai", + "monero-wire", + "pin-project", + "rand", + "rand_distr", + "rayon", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tracing", +] + [[package]] name = "cuprate-test-utils" version = "0.1.0" @@ -671,6 +700,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "diff" version = "0.1.13" diff --git a/Cargo.toml b/Cargo.toml index 48bad34b..987d93d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "net/fixed-bytes", "net/levin", "net/monero-wire", + "p2p/cuprate-p2p", "p2p/dandelion", "p2p/monero-p2p", "p2p/address-book", @@ -50,6 +51,7 @@ crypto-bigint = { version = "0.5.5", default-features = false } crossbeam = { version = "0.8.4", default-features = false } curve25519-dalek = { version = "4.1.1", default-features = false } dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false } +dashmap = { version = "5.5.3", default-features = false } dirs = { version = "5.0.1", default-features = false } futures = { version = "0.3.29", default-features = false } hex = { version = "0.4.3", default-features = false } diff --git a/p2p/cuprate-p2p/Cargo.toml b/p2p/cuprate-p2p/Cargo.toml new file mode 100644 index 00000000..d73684af --- /dev/null +++ b/p2p/cuprate-p2p/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "cuprate-p2p" +version = "0.1.0" +edition = "2021" +license = "MIT" +authors = ["Boog900"] + +[dependencies] +fixed-bytes = { path = "../../net/fixed-bytes" } +monero-wire = { path = "../../net/monero-wire" } +monero-p2p = { path = "../monero-p2p", features = ["borsh"] } +monero-address-book = { path = "../address-book" } +monero-pruning = { path = "../../pruning" } +cuprate-helper = { path = "../../helper", features = ["asynch"] } + +monero-serai = { workspace = true, features = ["std"] } + +tower = { workspace = true } +tokio = { workspace = true, features = ["rt"] } +rayon = { workspace = true } +tokio-util = { workspace = true } +tokio-stream = { workspace = true, features = ["sync", "time"] } +futures = { workspace = true, features = ["std"] } +pin-project = { workspace = true } +dashmap = { workspace = true } + +thiserror = { workspace = true } +bytes = { workspace = true, features = ["std"] } +indexmap = { workspace = true, features = ["std"] } +rand = { workspace = true, features = ["std", "std_rng"] } +rand_distr = { workspace = true, features = ["std"] } +hex = { workspace = true, features = ["std"] } +tracing = { workspace = true, features = ["std", "attributes"] } + +[dev-dependencies] +cuprate-test-utils = { path = "../../test-utils" } diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs new file mode 100644 index 00000000..15969392 --- /dev/null +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -0,0 +1,148 @@ +//! # Client Pool. +//! +//! The [`ClientPool`], is a pool of currently connected peers that can be pulled from. +//! It does _not_ necessarily contain every connected peer as another place could have +//! taken a peer from the pool. +//! +//! When taking peers from the pool they are wrapped in [`ClientPoolDropGuard`], which +//! returns the peer to the pool when it is dropped. +//! +//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code +//! as internally this uses blocking RwLocks. +//! +use std::sync::Arc; + +use dashmap::{DashMap, DashSet}; +use tokio::sync::mpsc; + +use monero_p2p::{ + client::{Client, InternalPeerID}, + handles::ConnectionHandle, + ConnectionDirection, NetworkZone, +}; + +mod disconnect_monitor; +mod drop_guard_client; + +pub use drop_guard_client::ClientPoolDropGuard; + +/// The client pool, which holds currently connected free peers. +/// +/// See the [module docs](self) for more. +pub struct ClientPool { + /// The connected [`Client`]s. + clients: DashMap, Client>, + /// A set of outbound clients, as these allow accesses/mutation from different threads, + /// a peer ID in here does not mean the peer is necessarily in `clients` as it could have been removed + /// by another thread. However, if the peer is in both here and `clients` it is definitely + /// an outbound peer. + outbound_clients: DashSet>, + + /// A channel to send new peer ids down to monitor for disconnect. + new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID)>, +} + +impl ClientPool { + /// Returns a new [`ClientPool`] wrapped in an [`Arc`]. + pub fn new() -> Arc> { + let (tx, rx) = mpsc::unbounded_channel(); + + let pool = Arc::new(ClientPool { + clients: DashMap::new(), + outbound_clients: DashSet::new(), + new_connection_tx: tx, + }); + + tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone())); + + pool + } + + /// Adds a [`Client`] to the pool, the client must have previously been taken from the + /// pool. + /// + /// See [`ClientPool::add_new_client`] to add a [`Client`] which was not taken from the pool before. + /// + /// # Panics + /// This function panics if `client` already exists in the pool. + fn add_client(&self, client: Client) { + let handle = client.info.handle.clone(); + let id = client.info.id; + + // Fast path: if the client is disconnected don't add it to the peer set. + if handle.is_closed() { + return; + } + + if client.info.direction == ConnectionDirection::OutBound { + self.outbound_clients.insert(id); + } + + let res = self.clients.insert(id, client); + assert!(res.is_none()); + + // We have to check this again otherwise we could have a race condition where a + // peer is disconnected after the first check, the disconnect monitor tries to remove it, + // and then it is added to the pool. + if handle.is_closed() { + self.remove_client(&id); + } + } + + /// Adds a _new_ [`Client`] to the pool, this client should be a new connection, and not already + /// from the pool. + /// + /// # Panics + /// This function panics if `client` already exists in the pool. + pub fn add_new_client(&self, client: Client) { + self.new_connection_tx + .send((client.info.handle.clone(), client.info.id)) + .unwrap(); + + self.add_client(client); + } + + /// Remove a [`Client`] from the pool. + /// + /// [`None`] is returned if the client did not exist in the pool. + fn remove_client(&self, peer: &InternalPeerID) -> Option> { + self.outbound_clients.remove(peer); + + self.clients.remove(peer).map(|(_, client)| client) + } + + /// Borrows a [`Client`] from the pool. + /// + /// The [`Client`] is wrapped in [`ClientPoolDropGuard`] which + /// will return the client to the pool when it's dropped. + /// + /// See [`Self::borrow_clients`] for borrowing multiple clients. + pub fn borrow_client( + self: &Arc, + peer: &InternalPeerID, + ) -> Option> { + self.remove_client(peer).map(|client| ClientPoolDropGuard { + pool: Arc::clone(self), + client: Some(client), + }) + } + + /// Borrows multiple [`Client`]s from the pool. + /// + /// Note that the returned iterator is not guaranteed to contain every peer asked for. + /// + /// See [`Self::borrow_client`] for borrowing a single client. + #[allow(private_interfaces)] // TODO: Remove me when 2024 Rust + pub fn borrow_clients<'a, 'b>( + self: &'a Arc, + peers: &'b [InternalPeerID], + ) -> impl Iterator> + Captures<(&'a (), &'b ())> { + peers.iter().filter_map(|peer| self.borrow_client(peer)) + } +} + +/// TODO: Remove me when 2024 Rust +/// +/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick +trait Captures {} +impl Captures for T {} diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs new file mode 100644 index 00000000..01297352 --- /dev/null +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -0,0 +1,72 @@ +//! # Disconnect Monitor +//! +//! This module contains the [`disconnect_monitor`] task, which monitors connected peers for disconnection +//! and then removes them from the [`ClientPool`] if they do. +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use tokio::sync::mpsc; +use tokio_util::sync::WaitForCancellationFutureOwned; +use tracing::instrument; + +use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone}; + +use super::ClientPool; + +/// The disconnect monitor task. +#[instrument(level = "info", skip_all)] +pub async fn disconnect_monitor( + mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID)>, + client_pool: Arc>, +) { + tracing::info!("Starting peer disconnect monitor."); + + let mut futs: FuturesUnordered> = FuturesUnordered::new(); + + loop { + tokio::select! { + Some((con_handle, peer_id)) = new_connection_rx.recv() => { + tracing::debug!("Monitoring {peer_id} for disconnect"); + futs.push(PeerDisconnectFut { + closed_fut: con_handle.closed(), + peer_id: Some(peer_id), + }); + } + Some(peer_id) = futs.next() => { + tracing::debug!("{peer_id} has disconnected, removing from client pool."); + client_pool.remove_client(&peer_id); + } + else => { + tracing::info!("Peer disconnect monitor shutting down."); + return; + } + } + } +} + +/// A [`Future`] that resolves when a peer disconnects. +#[pin_project::pin_project] +struct PeerDisconnectFut { + /// The inner [`Future`] that resolves when a peer disconnects. + #[pin] + closed_fut: WaitForCancellationFutureOwned, + /// The peers ID. + peer_id: Option>, +} + +impl Future for PeerDisconnectFut { + type Output = InternalPeerID; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + this.closed_fut + .poll(cx) + .map(|_| this.peer_id.take().unwrap()) + } +} diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs new file mode 100644 index 00000000..5555d713 --- /dev/null +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -0,0 +1,41 @@ +use std::{ + ops::{Deref, DerefMut}, + sync::Arc, +}; + +use monero_p2p::{client::Client, NetworkZone}; + +use crate::client_pool::ClientPool; + +/// A wrapper around [`Client`] which returns the client to the [`ClientPool`] when dropped. +pub struct ClientPoolDropGuard { + /// The [`ClientPool`] to return the peer to. + pub(super) pool: Arc>, + /// The [`Client`]. + /// + /// This is set to [`Some`] when this guard is created, then + /// ### [`take`](Option::take)n and returned to the pool when dropped. + pub(super) client: Option>, +} + +impl Deref for ClientPoolDropGuard { + type Target = Client; + + fn deref(&self) -> &Self::Target { + self.client.as_ref().unwrap() + } +} + +impl DerefMut for ClientPoolDropGuard { + fn deref_mut(&mut self) -> &mut Self::Target { + self.client.as_mut().unwrap() + } +} + +impl Drop for ClientPoolDropGuard { + fn drop(&mut self) { + let client = self.client.take().unwrap(); + + self.pool.add_client(client); + } +} diff --git a/p2p/cuprate-p2p/src/config.rs b/p2p/cuprate-p2p/src/config.rs new file mode 100644 index 00000000..31b5ab12 --- /dev/null +++ b/p2p/cuprate-p2p/src/config.rs @@ -0,0 +1,12 @@ +/// P2P config. +#[derive(Clone, Debug)] +pub struct P2PConfig { + /// The number of outbound connections to make and try keep. + pub outbound_connections: usize, + /// The amount of extra connections we can make if we are under load from the rest of Cuprate. + pub extra_outbound_connections: usize, + /// The percent of outbound peers that should be gray aka never connected to before. + /// + /// Only values 0..=1 are valid. + pub gray_peers_percent: f64, +} diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs new file mode 100644 index 00000000..bff4b9d5 --- /dev/null +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -0,0 +1,291 @@ +//! Outbound Connection Maintainer. +//! +//! This module handles maintaining the number of outbound connections defined in the [`P2PConfig`]. +//! It also handles making extra connections when the peer set is under load or when we need data that +//! no connected peer has. +use std::sync::Arc; + +use rand::{distributions::Bernoulli, prelude::*}; +use tokio::{ + sync::{mpsc, OwnedSemaphorePermit, Semaphore}, + task::JoinSet, + time::{sleep, timeout}, +}; +use tower::{Service, ServiceExt}; +use tracing::instrument; + +use monero_p2p::{ + client::{Client, ConnectRequest, HandshakeError}, + services::{AddressBookRequest, AddressBookResponse}, + AddressBook, NetworkZone, +}; + +use crate::{ + client_pool::ClientPool, + config::P2PConfig, + constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT}, +}; + +enum OutboundConnectorError { + MaxConnections, + FailedToConnectToSeeds, + NoAvailablePeers, +} + +/// A request from the peer set to make an outbound connection. +/// +/// This will only be sent when the peer set is under load from the rest of Cuprate or the peer +/// set needs specific data that none of the currently connected peers have. +pub struct MakeConnectionRequest { + /// The block needed that no connected peers have due to pruning. + block_needed: Option, +} + +/// The outbound connection count keeper. +/// +/// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum. +pub struct OutboundConnectionKeeper { + /// The pool of currently connected peers. + pub client_pool: Arc>, + /// The channel that tells us to make new _extra_ outbound connections. + pub make_connection_rx: mpsc::Receiver, + /// The address book service + pub address_book_svc: A, + /// The service to connect to a specific peer. + pub connector_svc: C, + /// A semaphore to keep the amount of outbound peers constant. + pub outbound_semaphore: Arc, + /// The amount of peers we connected to because we needed more peers. If the `outbound_semaphore` + /// is full, and we need to connect to more peers for blocks or because not enough peers are ready + /// we add a permit to the semaphore and keep track here, upto a value in config. + pub extra_peers: usize, + /// The p2p config. + pub config: P2PConfig, + /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or + /// false if we should connect to a white peer. + /// + /// This is weighted to the percentage given in `config`. + pub peer_type_gen: Bernoulli, +} + +impl OutboundConnectionKeeper +where + N: NetworkZone, + A: AddressBook, + C: Service, Response = Client, Error = HandshakeError>, + C::Future: Send + 'static, +{ + pub fn new( + config: P2PConfig, + client_pool: Arc>, + make_connection_rx: mpsc::Receiver, + address_book_svc: A, + connector_svc: C, + ) -> Self { + let peer_type_gen = Bernoulli::new(config.gray_peers_percent) + .expect("Gray peer percent is incorrect should be 0..=1"); + + Self { + client_pool, + make_connection_rx, + address_book_svc, + connector_svc, + outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)), + extra_peers: 0, + config, + peer_type_gen, + } + } + + /// Connects to random seeds to get peers and immediately disconnects + #[instrument(level = "info", skip(self))] + async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> { + let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS); + + if seeds.len() == 0 { + panic!("No seed nodes available to get peers from"); + } + + // This isn't really needed here to limit connections as the seed nodes will be dropped when we have got + // peers from them. + let semaphore = Arc::new(Semaphore::new(seeds.len())); + + let mut allowed_errors = seeds.len(); + + let mut handshake_futs = JoinSet::new(); + + for seed in seeds { + tracing::info!("Getting peers from seed node: {}", seed); + + let fut = timeout( + HANDSHAKE_TIMEOUT, + self.connector_svc + .ready() + .await + .expect("Connector had an error in `poll_ready`") + .call(ConnectRequest { + addr: *seed, + permit: semaphore + .clone() + .try_acquire_owned() + .expect("This must have enough permits as we just set the amount."), + }), + ); + // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer. + handshake_futs.spawn(fut); + } + + while let Some(res) = handshake_futs.join_next().await { + if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) { + allowed_errors -= 1; + } + } + + if allowed_errors == 0 { + Err(OutboundConnectorError::FailedToConnectToSeeds) + } else { + Ok(()) + } + } + + /// Connects to a given outbound peer. + #[instrument(level = "info", skip(self, permit), fields(%addr))] + async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { + let client_pool = self.client_pool.clone(); + let connection_fut = self + .connector_svc + .ready() + .await + .expect("Connector had an error in `poll_ready`") + .call(ConnectRequest { addr, permit }); + + tokio::spawn(async move { + if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { + client_pool.add_new_client(peer); + } + }); + } + + /// Handles a request from the peer set for more peers. + async fn handle_peer_request( + &mut self, + req: &MakeConnectionRequest, + ) -> Result<(), OutboundConnectorError> { + // try to get a permit. + let permit = self + .outbound_semaphore + .clone() + .try_acquire_owned() + .or_else(|_| { + // if we can't get a permit add one if we are below the max number of connections. + if self.extra_peers >= self.config.extra_outbound_connections { + // If we can't add a permit return an error. + Err(OutboundConnectorError::MaxConnections) + } else { + self.outbound_semaphore.add_permits(1); + self.extra_peers += 1; + Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap()) + } + })?; + + // try to get a random peer on any network zone from the address book. + let peer = self + .address_book_svc + .ready() + .await + .expect("Error in address book!") + .call(AddressBookRequest::TakeRandomPeer { + height: req.block_needed, + }) + .await; + + match peer { + Err(_) => { + // TODO: We should probably send peer requests to our connected peers rather than go to seeds. + tracing::warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes."); + + self.connect_to_random_seeds().await?; + Err(OutboundConnectorError::NoAvailablePeers) + } + + Ok(AddressBookResponse::Peer(peer)) => { + self.connect_to_outbound_peer(permit, peer.adr).await; + Ok(()) + } + Ok(_) => panic!("peer list sent incorrect response!"), + } + } + + /// Handles a free permit, by either connecting to a new peer or by removing a permit if we are above the + /// minimum number of outbound connections. + #[instrument(level = "debug", skip(self, permit))] + async fn handle_free_permit( + &mut self, + permit: OwnedSemaphorePermit, + ) -> Result<(), OutboundConnectorError> { + if self.extra_peers > 0 { + tracing::debug!( + "Permit available but we are over the minimum number of peers, forgetting permit." + ); + permit.forget(); + self.extra_peers -= 1; + return Ok(()); + } + + tracing::debug!("Permit available, making outbound connection."); + + let req = if self.peer_type_gen.sample(&mut thread_rng()) { + AddressBookRequest::TakeRandomGrayPeer { height: None } + } else { + // This will try white peers first then gray. + AddressBookRequest::TakeRandomPeer { height: None } + }; + + let Ok(AddressBookResponse::Peer(peer)) = self + .address_book_svc + .ready() + .await + .expect("Error in address book!") + .call(req) + .await + else { + tracing::warn!("No peers in peer list to make connection to."); + self.connect_to_random_seeds().await?; + return Err(OutboundConnectorError::NoAvailablePeers); + }; + + self.connect_to_outbound_peer(permit, peer.adr).await; + Ok(()) + } + + /// Runs the outbound connection count keeper. + pub async fn run(mut self) { + tracing::info!( + "Starting outbound connection maintainer, target outbound connections: {}", + self.config.outbound_connections + ); + + loop { + tokio::select! { + biased; + peer_req = self.make_connection_rx.recv() => { + let Some(peer_req) = peer_req else { + tracing::info!("Shutting down outbound connector, make connection channel closed."); + return; + }; + // We can't really do much about errors in this function. + let _ = self.handle_peer_request(&peer_req).await; + }, + // This future is not cancellation safe as you will lose your space in the queue but as we are the only place + // that actually requires permits that should be ok. + Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => { + if self.handle_free_permit(permit).await.is_err() { + // if we got an error then we still have a permit free so to prevent this from just looping + // uncontrollably add a timeout. + sleep(OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT).await; + } + } + } + } + } +} diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs new file mode 100644 index 00000000..58e263fa --- /dev/null +++ b/p2p/cuprate-p2p/src/constants.rs @@ -0,0 +1,10 @@ +use std::time::Duration; + +/// The timeout we set on handshakes. +pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); + +/// The maximum amount of connections to make to seed nodes for when we need peers. +pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; + +/// The timeout for when we fail to find a peer to connect to. +pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs new file mode 100644 index 00000000..0c53b78f --- /dev/null +++ b/p2p/cuprate-p2p/src/lib.rs @@ -0,0 +1,15 @@ +//! Cuprate's P2P Crate. +//! +//! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). +//! +//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, +//! or send to a single peer. +//! +#![allow(dead_code)] + +pub mod client_pool; +pub mod config; +pub mod connection_maintainer; +mod constants; + +pub use config::P2PConfig;