mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-02-02 03:06:36 +00:00
init cuprate-p2p
This commit is contained in:
parent
75a1db3fdc
commit
b9caee9335
10 changed files with 680 additions and 0 deletions
43
Cargo.lock
generated
43
Cargo.lock
generated
|
@ -642,6 +642,36 @@ dependencies = [
|
|||
"windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprate-p2p"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"cuprate-helper",
|
||||
"cuprate-test-utils",
|
||||
"dashmap",
|
||||
"fixed-bytes",
|
||||
"futures",
|
||||
"hex",
|
||||
"indexmap 2.2.6",
|
||||
"monero-address-book",
|
||||
"monero-p2p",
|
||||
"monero-pruning",
|
||||
"monero-serai",
|
||||
"monero-wire",
|
||||
"pin-project",
|
||||
"rand",
|
||||
"rand_distr",
|
||||
"rayon",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cuprate-test-utils"
|
||||
version = "0.1.0"
|
||||
|
@ -726,6 +756,19 @@ dependencies = [
|
|||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown 0.14.3",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "deranged"
|
||||
version = "0.3.11"
|
||||
|
|
|
@ -11,6 +11,7 @@ members = [
|
|||
"net/fixed-bytes",
|
||||
"net/levin",
|
||||
"net/monero-wire",
|
||||
"p2p/cuprate-p2p",
|
||||
"p2p/monero-p2p",
|
||||
"p2p/address-book",
|
||||
"pruning",
|
||||
|
@ -49,6 +50,7 @@ crypto-bigint = { version = "0.5.5", default-features = false }
|
|||
crossbeam = { version = "0.8.4", default-features = false }
|
||||
curve25519-dalek = { version = "4.1.1", default-features = false }
|
||||
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
|
||||
dashmap = { version = "5.5.3", default-features = false }
|
||||
dirs = { version = "5.0.1", default-features = false }
|
||||
futures = { version = "0.3.29", default-features = false }
|
||||
hex = { version = "0.4.3", default-features = false }
|
||||
|
@ -60,6 +62,7 @@ paste = { version = "1.0.14", default-features = false }
|
|||
pin-project = { version = "1.1.3", default-features = false }
|
||||
randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
|
||||
rand = { version = "0.8.5", default-features = false }
|
||||
rand_distr = { version = "0.4.3", default-features = false }
|
||||
rayon = { version = "1.9.0", default-features = false }
|
||||
serde_bytes = { version = "0.11.12", default-features = false }
|
||||
serde_json = { version = "1.0.108", default-features = false }
|
||||
|
|
38
p2p/cuprate-p2p/Cargo.toml
Normal file
38
p2p/cuprate-p2p/Cargo.toml
Normal file
|
@ -0,0 +1,38 @@
|
|||
[package]
|
||||
name = "cuprate-p2p"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MIT"
|
||||
authors = ["Boog900"]
|
||||
|
||||
[dependencies]
|
||||
fixed-bytes = { path = "../../net/fixed-bytes" }
|
||||
monero-wire = { path = "../../net/monero-wire" }
|
||||
monero-p2p = { path = "../monero-p2p", features = ["borsh"] }
|
||||
monero-address-book = { path = "../address-book" }
|
||||
monero-pruning = { path = "../../pruning" }
|
||||
cuprate-helper = { path = "../../helper", features = ["asynch"] }
|
||||
|
||||
monero-serai = { workspace = true, features = ["std"] }
|
||||
|
||||
tower = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
rayon = { workspace = true }
|
||||
tokio-util = { workspace = true }
|
||||
tokio-stream = { workspace = true, features = ["sync", "time"] }
|
||||
futures = { workspace = true, features = ["std"] }
|
||||
pin-project = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
|
||||
thiserror = { workspace = true }
|
||||
bytes = { workspace = true, features = ["std"] }
|
||||
indexmap = { workspace = true, features = ["std"] }
|
||||
rand = { workspace = true, features = ["std", "std_rng"] }
|
||||
rand_distr = { workspace = true, features = ["std"] }
|
||||
hex = { workspace = true, features = ["std"] }
|
||||
tracing = { workspace = true, features = ["std", "attributes"] }
|
||||
|
||||
[dev-dependencies]
|
||||
cuprate-test-utils = {path = "../../test-utils"}
|
||||
tracing-subscriber = { version = "0.3.18" }
|
||||
|
42
p2p/cuprate-p2p/src/config.rs
Normal file
42
p2p/cuprate-p2p/src/config.rs
Normal file
|
@ -0,0 +1,42 @@
|
|||
use cuprate_helper::network::Network;
|
||||
use monero_address_book::AddressBookConfig;
|
||||
|
||||
/// P2P config.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct P2PConfig {
|
||||
pub p2p_port: u16,
|
||||
|
||||
pub rpc_port: u16,
|
||||
|
||||
pub network: Network,
|
||||
|
||||
/// The number of outbound connections to make and try keep.
|
||||
pub outbound_connections: usize,
|
||||
/// The absolute maximum number of held outbound connections.
|
||||
///
|
||||
/// *Note:* Cuprate might make more connections than this to see if a peer is reachable or
|
||||
/// to get peers from that node, these connections are not held for long though.
|
||||
pub max_outbound_connections: usize,
|
||||
|
||||
/// The number of anchor connections to make.
|
||||
///
|
||||
/// An anchor connection is a connection which was held before last shutdown, anchor connections
|
||||
/// help to prevent certain attacks.
|
||||
pub anchor_connections: usize,
|
||||
|
||||
/// The percent of outbound peers that should be gray aka never connected to before.
|
||||
///
|
||||
/// Only values 0..=1 are valid.
|
||||
pub gray_peers_percent: f64,
|
||||
|
||||
/// The maximum amount of inbound peers
|
||||
pub max_inbound_connections: usize,
|
||||
|
||||
pub address_book_config: AddressBookConfig,
|
||||
}
|
||||
|
||||
impl P2PConfig {
|
||||
pub fn allowed_extra_connections(&self) -> usize {
|
||||
self.max_outbound_connections - self.outbound_connections
|
||||
}
|
||||
}
|
294
p2p/cuprate-p2p/src/connection_maintainer.rs
Normal file
294
p2p/cuprate-p2p/src/connection_maintainer.rs
Normal file
|
@ -0,0 +1,294 @@
|
|||
//! Outbound Connection Maintainer.
|
||||
//!
|
||||
//! This module handles maintaining the number of outbound connections defined in the [`P2PConfig`].
|
||||
//! It also handles making extra connections when the peer set is under load or when we need data that
|
||||
//! no connected peer has.
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use rand::{distributions::Bernoulli, prelude::*};
|
||||
use tokio::{
|
||||
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
|
||||
time::{sleep, timeout},
|
||||
};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::{info, instrument, warn};
|
||||
|
||||
use monero_p2p::{
|
||||
client::{Client, ConnectRequest, HandshakeError},
|
||||
services::{AddressBookRequest, AddressBookResponse},
|
||||
AddressBook, NetworkZone,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::P2PConfig,
|
||||
constants::{
|
||||
HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_TIMEOUT, PEER_FIND_TIMEOUT,
|
||||
},
|
||||
peer_set::ClientPool,
|
||||
};
|
||||
|
||||
enum OutboundConnectorError {
|
||||
MaxConnections,
|
||||
FailedToConnectToSeeds,
|
||||
NoAvailablePeers,
|
||||
}
|
||||
|
||||
/// A request from the peer set to make an outbound connection.
|
||||
///
|
||||
/// This will only be sent when the peer set is under load from the rest of Cuprate or the peer
|
||||
/// set needs specific data that none of the currently connected peers have.
|
||||
pub struct MakeConnectionRequest {
|
||||
/// The block needed that no connected peers have due to pruning.
|
||||
block_needed: Option<u64>,
|
||||
}
|
||||
|
||||
/// The outbound connection (count) keeper.
|
||||
///
|
||||
/// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum.
|
||||
pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
|
||||
/// TODO.
|
||||
pub client_pool: Arc<ClientPool<N>>,
|
||||
/// The channel that tells us to make new outbound connections
|
||||
pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
|
||||
/// The address book service
|
||||
pub address_book_svc: A,
|
||||
/// The service to connect to a specific peer.
|
||||
pub connector_svc: C,
|
||||
/// A semaphore to keep the amount of outbound peers constant.
|
||||
pub outbound_semaphore: Arc<Semaphore>,
|
||||
/// The amount of peers we connected to because we needed more peers. If the `outbound_semaphore`
|
||||
/// is full, and we need to connect to more peers for blocks ro because not enough peers are ready
|
||||
/// we add a permit to the semaphore and keep track here, upto a value in config.
|
||||
pub extra_peers: usize,
|
||||
/// The p2p config.
|
||||
pub config: P2PConfig,
|
||||
/// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or
|
||||
/// false if we should connect to a white peer.
|
||||
///
|
||||
/// This is weighted to the percentage given in `config`.
|
||||
pub peer_type_gen: Bernoulli,
|
||||
}
|
||||
|
||||
impl<N: NetworkZone, A, C> OutboundConnectionKeeper<N, A, C>
|
||||
where
|
||||
A: AddressBook<N>,
|
||||
C: Service<ConnectRequest<N>, Response = Client<N>, Error = HandshakeError>,
|
||||
C::Future: Send + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
config: &P2PConfig,
|
||||
client_pool: Arc<ClientPool<N>>,
|
||||
make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
|
||||
address_book_svc: A,
|
||||
connector_svc: C,
|
||||
) -> Self {
|
||||
let peer_type_gen = Bernoulli::new(config.gray_peers_percent)
|
||||
.expect("Gray peer percent is incorrect should be 0..=1");
|
||||
|
||||
Self {
|
||||
client_pool,
|
||||
make_connection_rx,
|
||||
address_book_svc,
|
||||
connector_svc,
|
||||
outbound_semaphore: Arc::new(Semaphore::new(config.outbound_connections)),
|
||||
extra_peers: 0,
|
||||
config: config.clone(),
|
||||
peer_type_gen,
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to random seeds to get peers and immediately disconnects
|
||||
#[instrument(level = "info", skip(self))]
|
||||
async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
|
||||
let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
|
||||
|
||||
if seeds.len() == 0 {
|
||||
panic!("No seed nodes available to get peers from");
|
||||
}
|
||||
|
||||
// This isn't really needed here to limit connections as the seed nodes will be dropped when we have got
|
||||
// peers from them.
|
||||
let semaphore = Arc::new(Semaphore::new(seeds.len()));
|
||||
|
||||
let mut allowed_errors = seeds.len();
|
||||
|
||||
let mut handshake_futs = FuturesUnordered::new();
|
||||
|
||||
for seed in seeds {
|
||||
info!("Getting peers from seed node: {}", seed);
|
||||
|
||||
let fut = timeout(
|
||||
HANDSHAKE_TIMEOUT,
|
||||
self.connector_svc
|
||||
.ready()
|
||||
.await
|
||||
.expect("Connector had an error in `poll_ready`")
|
||||
.call(ConnectRequest {
|
||||
addr: *seed,
|
||||
permit: semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.expect("This must have enough permits as we just set the amount."),
|
||||
}),
|
||||
);
|
||||
// Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
|
||||
handshake_futs.push(tokio::spawn(fut));
|
||||
}
|
||||
|
||||
while let Some(res) = handshake_futs.next().await {
|
||||
if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
|
||||
allowed_errors -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
if allowed_errors == 0 {
|
||||
return Err(OutboundConnectorError::FailedToConnectToSeeds);
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Connects to a given outbound peer.
|
||||
#[instrument(level = "info", skip(self, permit), fields(%addr))]
|
||||
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
|
||||
let client_pool = self.client_pool.clone();
|
||||
let connection_fut = self
|
||||
.connector_svc
|
||||
.ready()
|
||||
.await
|
||||
.expect("Connector had an error in `poll_ready`")
|
||||
.call(ConnectRequest { addr, permit });
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
|
||||
client_pool.add_new_client(peer);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Handles a request from the peer set for more peers.
|
||||
async fn handle_peer_request(
|
||||
&mut self,
|
||||
req: &MakeConnectionRequest,
|
||||
) -> Result<(), OutboundConnectorError> {
|
||||
// try to get a permit.
|
||||
let permit = self
|
||||
.outbound_semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.or_else(|_| {
|
||||
// if we can't get a permit add one if we are below the max number of connections.
|
||||
if self.extra_peers >= self.config.allowed_extra_connections() {
|
||||
// If we can't add a permit return an error.
|
||||
Err(OutboundConnectorError::MaxConnections)
|
||||
} else {
|
||||
self.outbound_semaphore.add_permits(1);
|
||||
self.extra_peers += 1;
|
||||
Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap())
|
||||
}
|
||||
})?;
|
||||
|
||||
// try to get a random peer on any network zone from the address book.
|
||||
let peer = self
|
||||
.address_book_svc
|
||||
.ready()
|
||||
.await
|
||||
.expect("Error in address book!")
|
||||
.call(AddressBookRequest::TakeRandomPeer {
|
||||
height: req.block_needed,
|
||||
})
|
||||
.await;
|
||||
|
||||
match peer {
|
||||
Err(_) => {
|
||||
// TODO: We should probably send peer requests to our connected peers rather than go to seeds.
|
||||
warn!("No peers in address book which are available and have the data we need. Getting peers from seed nodes.");
|
||||
|
||||
self.connect_to_random_seeds().await?;
|
||||
Err(OutboundConnectorError::NoAvailablePeers)
|
||||
}
|
||||
|
||||
Ok(AddressBookResponse::Peer(peer)) => {
|
||||
self.connect_to_outbound_peer(permit, peer.adr).await;
|
||||
Ok(())
|
||||
}
|
||||
Ok(_) => panic!("peer list sent incorrect response!"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a free permit, by either connecting to a new peer or by removing a permit if we above the
|
||||
/// minimum number of outbound connections.
|
||||
#[instrument(level = "debug", skip(self, permit))]
|
||||
async fn handle_free_permit(
|
||||
&mut self,
|
||||
permit: OwnedSemaphorePermit,
|
||||
) -> Result<(), OutboundConnectorError> {
|
||||
if self.extra_peers > 0 {
|
||||
tracing::debug!(
|
||||
"Permit available but we are over the minimum number of peers, forgetting permit."
|
||||
);
|
||||
permit.forget();
|
||||
self.extra_peers -= 1;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tracing::debug!("Permit available making outbound connection.");
|
||||
|
||||
let req = if self.peer_type_gen.sample(&mut thread_rng()) {
|
||||
AddressBookRequest::TakeRandomGrayPeer { height: None }
|
||||
} else {
|
||||
// This will try white peers first then gray.
|
||||
AddressBookRequest::TakeRandomPeer { height: None }
|
||||
};
|
||||
|
||||
let Ok(AddressBookResponse::Peer(peer)) = self
|
||||
.address_book_svc
|
||||
.ready()
|
||||
.await
|
||||
.expect("Error in address book!")
|
||||
.call(req)
|
||||
.await
|
||||
else {
|
||||
warn!("No peers in peer list to make connection to.");
|
||||
self.connect_to_random_seeds().await?;
|
||||
return Err(OutboundConnectorError::NoAvailablePeers);
|
||||
};
|
||||
|
||||
self.connect_to_outbound_peer(permit, peer.adr).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Runs the outbound connection count keeper.
|
||||
pub async fn run(mut self) {
|
||||
info!(
|
||||
"Starting outbound connection maintainer, target outbound connections: {}",
|
||||
self.config.outbound_connections
|
||||
);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
peer_req = self.make_connection_rx.recv() => {
|
||||
let Some(peer_req) = peer_req else {
|
||||
info!("Shutting down outbound connector, make_connection_rx closed.");
|
||||
return;
|
||||
};
|
||||
while self.handle_peer_request(&peer_req).await.is_err() {
|
||||
warn!("Failed to find peer to connect to or to add a permit, trying again in {} seconds", PEER_FIND_TIMEOUT.as_secs());
|
||||
sleep(PEER_FIND_TIMEOUT).await;
|
||||
}
|
||||
},
|
||||
// This future is not cancellation safe as you will lose your space in the queue but as we are the only place
|
||||
// that actually requires permits that should be ok.
|
||||
Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => {
|
||||
if self.handle_free_permit(permit).await.is_err() {
|
||||
// if we got an error then we still have a permit free so to prevent this from just looping
|
||||
// uncontrollably add a timeout.
|
||||
sleep(OUTBOUND_CONNECTION_TIMEOUT).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
45
p2p/cuprate-p2p/src/constants.rs
Normal file
45
p2p/cuprate-p2p/src/constants.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
use std::time::Duration;
|
||||
|
||||
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
pub(crate) const CHAIN_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
pub(crate) const BLOCK_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
pub(crate) const BLOCK_REQUEST_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
pub(crate) const SEED_CONNECTION_RETRY_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
pub(crate) const CONCURRENT_PEER_LIST_REQUESTS: usize = 3;
|
||||
|
||||
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
|
||||
|
||||
pub(crate) const PEER_FIND_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
pub(crate) const OUTBOUND_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// The duration of a short ban (1 hour).
|
||||
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 60);
|
||||
|
||||
/// The duration of a medium ban (24 hours).
|
||||
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
|
||||
|
||||
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
|
||||
|
||||
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND: Duration = Duration::from_millis(2500);
|
||||
|
||||
pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 1024 * 1024 * 60;
|
||||
|
||||
/// The limit on the amount of transactions kept in the broadcast channel.
|
||||
///
|
||||
/// A transaction is kept in the broadcast channel until all nodes have broadcast it.
|
||||
///
|
||||
/// Because of internal implementation details this limit will ALWAYS be hit i.e. a tx will stay in the
|
||||
/// channel until [`MAX_TXS_IN_BROADCAST_CHANNEL`] more txs are added.
|
||||
pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
|
||||
|
||||
pub(crate) const INCOMING_BLOCKS_CACHE_SIZE: usize = 10 * 1024 * 1024;
|
||||
|
||||
pub(crate) const NUMBER_OF_BLOCKS_TO_REQUEST: usize = 100;
|
||||
|
||||
pub(crate) const CHAIN_REQUESTS_TO_SEND: usize = 2;
|
18
p2p/cuprate-p2p/src/lib.rs
Normal file
18
p2p/cuprate-p2p/src/lib.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
//! Cuprate's P2P Crate.
|
||||
//!
|
||||
//! This crate contains a `PeerSet` which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone).
|
||||
//! The `PeerSet` has methods to get peers by direction (inbound/outbound) or by a custom method like a load balancing
|
||||
//! algorithm.
|
||||
//!
|
||||
//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all,
|
||||
//! or send to a single peer.
|
||||
//!
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
pub mod config;
|
||||
pub mod connection_maintainer;
|
||||
mod constants;
|
||||
mod peer_set;
|
||||
|
||||
pub use config::P2PConfig;
|
97
p2p/cuprate-p2p/src/peer_set.rs
Normal file
97
p2p/cuprate-p2p/src/peer_set.rs
Normal file
|
@ -0,0 +1,97 @@
|
|||
//! This module contains the peer set and related functionality.
|
||||
//!
|
||||
use std::sync::Arc;
|
||||
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use monero_p2p::{
|
||||
client::{Client, InternalPeerID},
|
||||
ConnectionDirection, NetworkZone,
|
||||
};
|
||||
|
||||
mod disconnect_monitor;
|
||||
mod drop_guard_client;
|
||||
|
||||
pub use drop_guard_client::ClientPoolGuard;
|
||||
use monero_p2p::handles::ConnectionHandle;
|
||||
|
||||
pub struct ClientPool<N: NetworkZone> {
|
||||
clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
|
||||
outbound_clients: DashSet<InternalPeerID<N::Addr>>,
|
||||
|
||||
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> ClientPool<N> {
|
||||
pub fn new() -> Arc<ClientPool<N>> {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let pool = Arc::new(ClientPool {
|
||||
clients: DashMap::new(),
|
||||
outbound_clients: DashSet::new(),
|
||||
new_connection_tx: tx,
|
||||
});
|
||||
|
||||
tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone()));
|
||||
|
||||
pool
|
||||
}
|
||||
|
||||
fn add_client(&self, client: Client<N>) {
|
||||
let handle = client.info.handle.clone();
|
||||
let id = client.info.id;
|
||||
|
||||
if handle.is_closed() {
|
||||
return;
|
||||
}
|
||||
|
||||
if client.info.direction == ConnectionDirection::OutBound {
|
||||
self.outbound_clients.insert(id);
|
||||
}
|
||||
|
||||
let res = self.clients.insert(id, client);
|
||||
debug_assert!(res.is_none());
|
||||
|
||||
// TODO: document how this prevents a race condition.
|
||||
if handle.is_closed() {
|
||||
self.remove_client(&id);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_new_client(&self, client: Client<N>) {
|
||||
self.new_connection_tx
|
||||
.send((client.info.handle.clone(), client.info.id))
|
||||
.unwrap();
|
||||
|
||||
self.add_client(client);
|
||||
}
|
||||
|
||||
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
|
||||
self.outbound_clients.remove(peer);
|
||||
|
||||
self.clients.remove(peer).map(|(_, client)| client)
|
||||
}
|
||||
|
||||
pub fn borrow_client(
|
||||
self: &Arc<Self>,
|
||||
peer: &InternalPeerID<N::Addr>,
|
||||
) -> Option<ClientPoolGuard<N>> {
|
||||
self.outbound_clients.remove(peer);
|
||||
|
||||
self.remove_client(peer).map(|client| ClientPoolGuard {
|
||||
pool: Arc::clone(self),
|
||||
client: Some(client),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn borrow_clients(
|
||||
self: &Arc<Self>,
|
||||
peers: &[InternalPeerID<N::Addr>],
|
||||
) -> Vec<ClientPoolGuard<N>> {
|
||||
peers
|
||||
.iter()
|
||||
.filter_map(|peer| self.borrow_client(peer))
|
||||
.collect()
|
||||
}
|
||||
}
|
64
p2p/cuprate-p2p/src/peer_set/disconnect_monitor.rs
Normal file
64
p2p/cuprate-p2p/src/peer_set/disconnect_monitor.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::WaitForCancellationFutureOwned;
|
||||
use tracing::instrument;
|
||||
|
||||
use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
|
||||
|
||||
use super::ClientPool;
|
||||
|
||||
#[instrument(level="info", skip_all, fields(network=N::NAME))]
|
||||
pub async fn disconnect_monitor<N: NetworkZone>(
|
||||
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
|
||||
client_pool: Arc<ClientPool<N>>,
|
||||
) {
|
||||
tracing::info!("Starting peer disconnect monitor.");
|
||||
|
||||
let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some((con_handle, peer_id)) = new_connection_rx.recv() => {
|
||||
tracing::debug!("Monitoring {peer_id} for disconnect");
|
||||
futs.push(PeerDisconnectFut {
|
||||
closed_fut: con_handle.closed(),
|
||||
peer_id: Some(peer_id),
|
||||
});
|
||||
}
|
||||
Some(peer_id) = futs.next() => {
|
||||
tracing::debug!("{peer_id} has disconnecting, removing from peer_set.");
|
||||
client_pool.remove_client(&peer_id);
|
||||
}
|
||||
else => {
|
||||
tracing::info!("Peer disconnect monitor shutting down.");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
struct PeerDisconnectFut<N: NetworkZone> {
|
||||
#[pin]
|
||||
closed_fut: WaitForCancellationFutureOwned,
|
||||
peer_id: Option<InternalPeerID<N::Addr>>,
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
|
||||
type Output = InternalPeerID<N::Addr>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
this.closed_fut
|
||||
.poll(cx)
|
||||
.map(|_| this.peer_id.take().unwrap())
|
||||
}
|
||||
}
|
36
p2p/cuprate-p2p/src/peer_set/drop_guard_client.rs
Normal file
36
p2p/cuprate-p2p/src/peer_set/drop_guard_client.rs
Normal file
|
@ -0,0 +1,36 @@
|
|||
use monero_p2p::client::Client;
|
||||
use std::{
|
||||
ops::{Deref, DerefMut},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use monero_p2p::NetworkZone;
|
||||
|
||||
use crate::peer_set::ClientPool;
|
||||
|
||||
pub struct ClientPoolGuard<N: NetworkZone> {
|
||||
pub(super) pool: Arc<ClientPool<N>>,
|
||||
pub(super) client: Option<Client<N>>,
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> Deref for ClientPoolGuard<N> {
|
||||
type Target = Client<N>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.client.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> DerefMut for ClientPoolGuard<N> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.client.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NetworkZone> Drop for ClientPoolGuard<N> {
|
||||
fn drop(&mut self) {
|
||||
let client = self.client.take().unwrap();
|
||||
|
||||
self.pool.add_client(client);
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue