Merge branch 'main' into p2p-request-handler
Some checks failed
Deny / audit (push) Has been cancelled

This commit is contained in:
Boog900 2024-11-20 19:23:25 +00:00
commit a3c9b4e994
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
22 changed files with 668 additions and 418 deletions

1
Cargo.lock generated
View file

@ -838,7 +838,6 @@ dependencies = [
"cuprate-test-utils", "cuprate-test-utils",
"cuprate-types", "cuprate-types",
"cuprate-wire", "cuprate-wire",
"dashmap",
"futures", "futures",
"indexmap", "indexmap",
"monero-serai", "monero-serai",

View file

@ -12,7 +12,7 @@ use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_p2p::{ use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface, NetworkInterface, PeerSetRequest, PeerSetResponse,
}; };
use cuprate_p2p_core::ClearNet; use cuprate_p2p_core::ClearNet;
@ -28,15 +28,11 @@ pub enum SyncerError {
} }
/// The syncer tasks that makes sure we are fully synchronised with our connected peers. /// The syncer tasks that makes sure we are fully synchronised with our connected peers.
#[expect(
clippy::significant_drop_tightening,
reason = "Client pool which will be removed"
)]
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
pub async fn syncer<C, CN>( pub async fn syncer<C, CN>(
mut context_svc: C, mut context_svc: C,
our_chain: CN, our_chain: CN,
clearnet_interface: NetworkInterface<ClearNet>, mut clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>, incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
stop_current_block_downloader: Arc<Notify>, stop_current_block_downloader: Arc<Notify>,
block_downloader_config: BlockDownloaderConfig, block_downloader_config: BlockDownloaderConfig,
@ -67,8 +63,6 @@ where
unreachable!(); unreachable!();
}; };
let client_pool = clearnet_interface.client_pool();
tracing::debug!("Waiting for new sync info in top sync channel"); tracing::debug!("Waiting for new sync info in top sync channel");
loop { loop {
@ -79,9 +73,20 @@ where
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
if !client_pool.contains_client_with_more_cumulative_difficulty( let PeerSetResponse::MostPoWSeen {
raw_blockchain_context.cumulative_difficulty, cumulative_difficulty,
) { ..
} = clearnet_interface
.peer_set()
.ready()
.await?
.call(PeerSetRequest::MostPoWSeen)
.await?
else {
unreachable!();
};
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
continue; continue;
} }

View file

@ -59,7 +59,7 @@ pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandel
diffuse_service::DiffuseService { diffuse_service::DiffuseService {
clear_net_broadcast_service: clear_net.broadcast_svc(), clear_net_broadcast_service: clear_net.broadcast_svc(),
}, },
stem_service::OutboundPeerStream { clear_net }, stem_service::OutboundPeerStream::new(clear_net),
DANDELION_CONFIG, DANDELION_CONFIG,
) )
} }

View file

@ -1,14 +1,15 @@
use std::{ use std::{
future::Future,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{ready, Context, Poll},
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::{future::BoxFuture, FutureExt, Stream};
use tower::Service; use tower::Service;
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer}; use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface}; use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::{Client, InternalPeerID}, client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest, ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
@ -19,7 +20,17 @@ use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
/// The dandelion outbound peer stream. /// The dandelion outbound peer stream.
pub struct OutboundPeerStream { pub struct OutboundPeerStream {
pub clear_net: NetworkInterface<ClearNet>, clear_net: NetworkInterface<ClearNet>,
state: OutboundPeerStreamState,
}
impl OutboundPeerStream {
pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
Self {
clear_net,
state: OutboundPeerStreamState::Standby,
}
}
} }
impl Stream for OutboundPeerStream { impl Stream for OutboundPeerStream {
@ -28,23 +39,49 @@ impl Stream for OutboundPeerStream {
tower::BoxError, tower::BoxError,
>; >;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: make the outbound peer choice random. loop {
Poll::Ready(Some(Ok(self match &mut self.state {
.clear_net OutboundPeerStreamState::Standby => {
.client_pool() let peer_set = self.clear_net.peer_set();
.outbound_client() let res = ready!(peer_set.poll_ready(cx));
.map_or(OutboundPeer::Exhausted, |client| {
OutboundPeer::Peer( self.state = OutboundPeerStreamState::AwaitingPeer(
CrossNetworkInternalPeerId::ClearNet(client.info.id), peer_set.call(PeerSetRequest::StemPeer).boxed(),
StemPeerService(client), );
) }
})))) OutboundPeerStreamState::AwaitingPeer(fut) => {
let res = ready!(fut.poll_unpin(cx));
return Poll::Ready(Some(res.map(|res| {
let PeerSetResponse::StemPeer(stem_peer) = res else {
unreachable!()
};
match stem_peer {
Some(peer) => OutboundPeer::Peer(
CrossNetworkInternalPeerId::ClearNet(peer.info.id),
StemPeerService(peer),
),
None => OutboundPeer::Exhausted,
}
})));
}
}
}
} }
} }
/// The state of the [`OutboundPeerStream`].
enum OutboundPeerStreamState {
/// Standby state.
Standby,
/// Awaiting a response from the peer-set.
AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
}
/// The stem service, used to send stem txs. /// The stem service, used to send stem txs.
pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>); pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> { impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
type Response = <Client<N> as Service<PeerRequest>>::Response; type Response = <Client<N> as Service<PeerRequest>>::Response;

View file

@ -157,6 +157,11 @@
--- ---
- [🟢 Monero oddities](oddities/intro.md)
- [🟡 Little-endian IPv4 addresses](oddities/le-ipv4.md)
---
- [⚪️ Appendix](appendix/intro.md) - [⚪️ Appendix](appendix/intro.md)
- [🟢 Crates](appendix/crates.md) - [🟢 Crates](appendix/crates.md)
- [🔴 Contributing](appendix/contributing.md) - [🔴 Contributing](appendix/contributing.md)

View file

@ -0,0 +1,37 @@
# Monero oddities
This section is a list of any peculiar, interesting,
or non-standard behavior that Monero has that is not
planned on being changed or deprecated.
This section exists to hold all the small yet noteworthy knowledge in one place,
instead of in any single contributor's mind.
These are usually behaviors stemming from implementation rather than protocol/cryptography.
## Formatting
This is the markdown formatting for each entry in this section.
If applicable, consider using this formatting when adding to this section.
```md
# <concise_title_of_the_behavior>
## What
A detailed description of the behavior.
## Expected
The norm or standard behavior that is usually expected.
## Why
The reasoning behind why this behavior exists and/or
any links to more detailed discussion on the behavior.
## Affects
A (potentially non-exhaustive) list of places that this behavior can/does affect.
## Example
An example link or section of code where the behavior occurs.
## Source
A link to original `monerod` code that defines the behavior.
```

View file

@ -0,0 +1,24 @@
# Little-endian IPv4 addresses
## What
Monero encodes IPv4 addresses in [little-endian](https://en.wikipedia.org/wiki/Endianness) byte order.
## Expected
In general, [networking-related protocols/code use _networking order_ (big-endian)](https://en.wikipedia.org/wiki/Endianness#Networking).
## Why
TODO
- <https://github.com/monero-project/monero/issues/3826>
- <https://github.com/monero-project/monero/pull/5544>
## Affects
Any representation and (de)serialization of IPv4 addresses must keep little
endian in-mind, e.g. the P2P wire format or `int` encoded IPv4 addresses in RPC.
For example, [the `ip` field in `set_bans`](https://www.getmonero.org/resources/developer-guides/daemon-rpc.html#set_bans).
For Cuprate, this means Rust's [`Ipv4Addr::from_bits/from`](https://doc.rust-lang.org/1.82.0/src/core/net/ip_addr.rs.html#1182) cannot be used in these cases as [it assumes big-endian encoding](https://doc.rust-lang.org/1.82.0/src/core/net/ip_addr.rs.html#540).
## Source
- <https://github.com/monero-project/monero/blob/893916ad091a92e765ce3241b94e706ad012b62a/contrib/epee/include/net/net_utils_base.h#L97>

View file

@ -27,9 +27,11 @@ mod connector;
pub mod handshaker; pub mod handshaker;
mod request_handler; mod request_handler;
mod timeout_monitor; mod timeout_monitor;
mod weak;
pub use connector::{ConnectRequest, Connector}; pub use connector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder}; pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder};
pub use weak::WeakClient;
/// An internal identifier for a given peer, will be their address if known /// An internal identifier for a given peer, will be their address if known
/// or a random u128 if not. /// or a random u128 if not.
@ -128,6 +130,17 @@ impl<Z: NetworkZone> Client<Z> {
} }
.into() .into()
} }
/// Create a [`WeakClient`] for this [`Client`].
pub fn downgrade(&self) -> WeakClient<Z> {
WeakClient {
info: self.info.clone(),
connection_tx: self.connection_tx.downgrade(),
semaphore: self.semaphore.clone(),
permit: None,
error: self.error.clone(),
}
}
} }
impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> { impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {

View file

@ -0,0 +1,114 @@
use std::task::{ready, Context, Poll};
use futures::channel::oneshot;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use crate::{
client::{connection, PeerInformation},
NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
};
/// A weak handle to a [`Client`](super::Client).
///
/// When this is dropped the peer will not be disconnected.
pub struct WeakClient<N: NetworkZone> {
/// Information on the connected peer.
pub info: PeerInformation<N::Addr>,
/// The channel to the [`Connection`](connection::Connection) task.
pub(super) connection_tx: mpsc::WeakSender<connection::ConnectionTaskRequest>,
/// The semaphore that limits the requests sent to the peer.
pub(super) semaphore: PollSemaphore,
/// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready.
pub(super) permit: Option<OwnedSemaphorePermit>,
/// The error slot shared between the [`Client`] and [`Connection`](connection::Connection).
pub(super) error: SharedError<PeerError>,
}
impl<N: NetworkZone> WeakClient<N> {
/// Internal function to set an error on the [`SharedError`].
fn set_err(&self, err: PeerError) -> tower::BoxError {
let err_str = err.to_string();
match self.error.try_insert_err(err) {
Ok(()) => err_str,
Err(e) => e.to_string(),
}
.into()
}
}
impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
type Response = PeerResponse;
type Error = tower::BoxError;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(err) = self.error.try_get_err() {
return Poll::Ready(Err(err.to_string().into()));
}
if self.connection_tx.strong_count() == 0 {
let err = self.set_err(PeerError::ClientChannelClosed);
return Poll::Ready(Err(err));
}
if self.permit.is_some() {
return Poll::Ready(Ok(()));
}
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
Poll::Ready(Ok(()))
}
#[expect(clippy::significant_drop_tightening)]
fn call(&mut self, request: PeerRequest) -> Self::Future {
let permit = self
.permit
.take()
.expect("poll_ready did not return ready before call to call");
let (tx, rx) = oneshot::channel();
let req = connection::ConnectionTaskRequest {
response_channel: tx,
request,
permit: Some(permit),
};
match self.connection_tx.upgrade() {
None => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
Some(sender) => {
if let Err(e) = sender.try_send(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
use mpsc::error::TrySendError;
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
}
}
}
}
rx.into()
}
}

View file

@ -20,12 +20,12 @@ monero-serai = { workspace = true, features = ["std"] }
tower = { workspace = true, features = ["buffer"] } tower = { workspace = true, features = ["buffer"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
rayon = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
rayon = { workspace = true }
tokio-stream = { workspace = true, features = ["sync", "time"] } tokio-stream = { workspace = true, features = ["sync", "time"] }
futures = { workspace = true, features = ["std"] } futures = { workspace = true, features = ["std"] }
pin-project = { workspace = true } pin-project = { workspace = true }
dashmap = { workspace = true } indexmap = { workspace = true, features = ["std"] }
thiserror = { workspace = true } thiserror = { workspace = true }
bytes = { workspace = true, features = ["std"] } bytes = { workspace = true, features = ["std"] }

View file

@ -8,7 +8,6 @@
use std::{ use std::{
cmp::{max, min, Reverse}, cmp::{max, min, Reverse},
collections::{BTreeMap, BinaryHeap}, collections::{BTreeMap, BinaryHeap},
sync::Arc,
time::Duration, time::Duration,
}; };
@ -18,7 +17,7 @@ use tokio::{
task::JoinSet, task::JoinSet,
time::{interval, timeout, MissedTickBehavior}, time::{interval, timeout, MissedTickBehavior},
}; };
use tower::{Service, ServiceExt}; use tower::{util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span}; use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::{BufferAppender, BufferStream}; use cuprate_async_buffer::{BufferAppender, BufferStream};
@ -27,11 +26,11 @@ use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone};
use cuprate_pruning::PruningSeed; use cuprate_pruning::PruningSeed;
use crate::{ use crate::{
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{ constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN, BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN,
MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES,
}, },
peer_set::ClientDropGuard,
}; };
mod block_queue; mod block_queue;
@ -41,6 +40,7 @@ mod request_chain;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use crate::peer_set::{PeerSetRequest, PeerSetResponse};
use block_queue::{BlockQueue, ReadyQueueBatch}; use block_queue::{BlockQueue, ReadyQueueBatch};
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
use download_batch::download_batch_task; use download_batch::download_batch_task;
@ -135,7 +135,7 @@ pub enum ChainSvcResponse {
/// call this function again, so it can start the search again. /// call this function again, so it can start the search again.
#[instrument(level = "error", skip_all, name = "block_downloader")] #[instrument(level = "error", skip_all, name = "block_downloader")]
pub fn download_blocks<N: NetworkZone, C>( pub fn download_blocks<N: NetworkZone, C>(
client_pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
our_chain_svc: C, our_chain_svc: C,
config: BlockDownloaderConfig, config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch> ) -> BufferStream<BlockBatch>
@ -147,8 +147,7 @@ where
{ {
let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size); let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size);
let block_downloader = let block_downloader = BlockDownloader::new(peer_set, our_chain_svc, buffer_appender, config);
BlockDownloader::new(client_pool, our_chain_svc, buffer_appender, config);
tokio::spawn( tokio::spawn(
block_downloader block_downloader
@ -186,8 +185,8 @@ where
/// - download an already requested batch of blocks (this might happen due to an error in the previous request /// - download an already requested batch of blocks (this might happen due to an error in the previous request
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it). /// or because the queue of ready blocks is too large, so we need the oldest block to clear it).
struct BlockDownloader<N: NetworkZone, C> { struct BlockDownloader<N: NetworkZone, C> {
/// The client pool. /// The peer set.
client_pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
/// The service that holds our current chain state. /// The service that holds our current chain state.
our_chain_svc: C, our_chain_svc: C,
@ -208,7 +207,7 @@ struct BlockDownloader<N: NetworkZone, C> {
/// ///
/// Returns a result of the chain entry or an error. /// Returns a result of the chain entry or an error.
#[expect(clippy::type_complexity)] #[expect(clippy::type_complexity)]
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>, chain_entry_task: JoinSet<Result<(ClientDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
/// The current inflight requests. /// The current inflight requests.
/// ///
@ -235,13 +234,13 @@ where
{ {
/// Creates a new [`BlockDownloader`] /// Creates a new [`BlockDownloader`]
fn new( fn new(
client_pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
our_chain_svc: C, our_chain_svc: C,
buffer_appender: BufferAppender<BlockBatch>, buffer_appender: BufferAppender<BlockBatch>,
config: BlockDownloaderConfig, config: BlockDownloaderConfig,
) -> Self { ) -> Self {
Self { Self {
client_pool, peer_set,
our_chain_svc, our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size, amount_of_blocks_to_request: config.initial_batch_size,
amount_of_blocks_to_request_updated_at: 0, amount_of_blocks_to_request_updated_at: 0,
@ -259,7 +258,7 @@ where
fn check_pending_peers( fn check_pending_peers(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
) { ) {
tracing::debug!("Checking if we can give any work to pending peers."); tracing::debug!("Checking if we can give any work to pending peers.");
@ -286,11 +285,11 @@ where
/// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request /// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request
/// for them. /// for them.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed. /// Returns the [`ClientDropGuard`] back if it doesn't have the batch according to its pruning seed.
fn request_inflight_batch_again( fn request_inflight_batch_again(
&mut self, &mut self,
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientDropGuard<N>> {
tracing::debug!( tracing::debug!(
"Requesting an inflight batch, current ready queue size: {}", "Requesting an inflight batch, current ready queue size: {}",
self.block_queue.size() self.block_queue.size()
@ -336,13 +335,13 @@ where
/// ///
/// The batch requested will depend on our current state, failed batches will be prioritised. /// The batch requested will depend on our current state, failed batches will be prioritised.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// Returns the [`ClientDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed. /// to its pruning seed.
fn request_block_batch( fn request_block_batch(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientDropGuard<N>> {
tracing::trace!("Using peer to request a batch of blocks."); tracing::trace!("Using peer to request a batch of blocks.");
// First look to see if we have any failed requests. // First look to see if we have any failed requests.
while let Some(failed_request) = self.failed_batches.peek() { while let Some(failed_request) = self.failed_batches.peek() {
@ -416,13 +415,13 @@ where
/// This function will use our current state to decide if we should send a request for a chain entry /// This function will use our current state to decide if we should send a request for a chain entry
/// or if we should request a batch of blocks. /// or if we should request a batch of blocks.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// Returns the [`ClientDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed. /// to its pruning seed.
fn try_handle_free_client( fn try_handle_free_client(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientDropGuard<N>> {
// We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup. // We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup.
if self.chain_entry_task.len() < 2 if self.chain_entry_task.len() < 2
// If we have had too many failures then assume the tip has been found so no more chain entries. // If we have had too many failures then assume the tip has been found so no more chain entries.
@ -463,7 +462,7 @@ where
async fn check_for_free_clients( async fn check_for_free_clients(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
) -> Result<(), BlockDownloadError> { ) -> Result<(), BlockDownloadError> {
tracing::debug!("Checking for free peers"); tracing::debug!("Checking for free peers");
@ -478,10 +477,19 @@ where
panic!("Chain service returned wrong response."); panic!("Chain service returned wrong response.");
}; };
for client in self let PeerSetResponse::PeersWithMorePoW(clients) = self
.client_pool .peer_set
.clients_with_more_cumulative_difficulty(current_cumulative_difficulty) .ready()
{ .await?
.call(PeerSetRequest::PeersWithMorePoW(
current_cumulative_difficulty,
))
.await?
else {
unreachable!();
};
for client in clients {
pending_peers pending_peers
.entry(client.info.pruning_seed) .entry(client.info.pruning_seed)
.or_default() .or_default()
@ -497,9 +505,9 @@ where
async fn handle_download_batch_res( async fn handle_download_batch_res(
&mut self, &mut self,
start_height: usize, start_height: usize,
res: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>, res: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
) -> Result<(), BlockDownloadError> { ) -> Result<(), BlockDownloadError> {
tracing::debug!("Handling block download response"); tracing::debug!("Handling block download response");
@ -593,7 +601,7 @@ where
/// Starts the main loop of the block downloader. /// Starts the main loop of the block downloader.
async fn run(mut self) -> Result<(), BlockDownloadError> { async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker = let mut chain_tracker =
initial_chain_search(&self.client_pool, &mut self.our_chain_svc).await?; initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc).await?;
let mut pending_peers = BTreeMap::new(); let mut pending_peers = BTreeMap::new();
@ -662,7 +670,7 @@ struct BlockDownloadTaskResponse<N: NetworkZone> {
/// The start height of the batch. /// The start height of the batch.
start_height: usize, start_height: usize,
/// A result containing the batch or an error. /// A result containing the batch or an error.
result: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>, result: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
} }
/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`]. /// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].

View file

@ -16,8 +16,8 @@ use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
use crate::{ use crate::{
block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse}, block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
client_pool::ClientPoolDropGuard,
constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN}, constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
peer_set::ClientDropGuard,
}; };
/// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`]. /// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`].
@ -32,7 +32,7 @@ use crate::{
)] )]
#[expect(clippy::used_underscore_binding)] #[expect(clippy::used_underscore_binding)]
pub async fn download_batch_task<N: NetworkZone>( pub async fn download_batch_task<N: NetworkZone>(
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
ids: ByteArrayVec<32>, ids: ByteArrayVec<32>,
previous_id: [u8; 32], previous_id: [u8; 32],
expected_start_height: usize, expected_start_height: usize,
@ -49,11 +49,11 @@ pub async fn download_batch_task<N: NetworkZone>(
/// This function will validate the blocks that were downloaded were the ones asked for and that they match /// This function will validate the blocks that were downloaded were the ones asked for and that they match
/// the expected height. /// the expected height.
async fn request_batch_from_peer<N: NetworkZone>( async fn request_batch_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientDropGuard<N>,
ids: ByteArrayVec<32>, ids: ByteArrayVec<32>,
previous_id: [u8; 32], previous_id: [u8; 32],
expected_start_height: usize, expected_start_height: usize,
) -> Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError> { ) -> Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError> {
let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest { let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest {
blocks: ids.clone(), blocks: ids.clone(),
pruned: false, pruned: false,

View file

@ -1,7 +1,7 @@
use std::{mem, sync::Arc}; use std::mem;
use tokio::{task::JoinSet, time::timeout}; use tokio::{task::JoinSet, time::timeout};
use tower::{Service, ServiceExt}; use tower::{util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span}; use tracing::{instrument, Instrument, Span};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
@ -15,11 +15,11 @@ use crate::{
chain_tracker::{ChainEntry, ChainTracker}, chain_tracker::{ChainEntry, ChainTracker},
BlockDownloadError, ChainSvcRequest, ChainSvcResponse, BlockDownloadError, ChainSvcRequest, ChainSvcResponse,
}, },
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{ constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND, BLOCK_DOWNLOADER_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND,
MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MEDIUM_BAN, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MEDIUM_BAN,
}, },
peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse},
}; };
/// Request a chain entry from a peer. /// Request a chain entry from a peer.
@ -27,9 +27,9 @@ use crate::{
/// Because the block downloader only follows and downloads one chain we only have to send the block hash of /// Because the block downloader only follows and downloads one chain we only have to send the block hash of
/// top block we have found and the genesis block, this is then called `short_history`. /// top block we have found and the genesis block, this is then called `short_history`.
pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>( pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientDropGuard<N>,
short_history: [[u8; 32]; 2], short_history: [[u8; 32]; 2],
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> { ) -> Result<(ClientDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = client let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = client
.ready() .ready()
.await? .await?
@ -80,7 +80,7 @@ pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
/// We then wait for their response and choose the peer who claims the highest cumulative difficulty. /// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
#[instrument(level = "error", skip_all)] #[instrument(level = "error", skip_all)]
pub async fn initial_chain_search<N: NetworkZone, C>( pub async fn initial_chain_search<N: NetworkZone, C>(
client_pool: &Arc<ClientPool<N>>, peer_set: &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
mut our_chain_svc: C, mut our_chain_svc: C,
) -> Result<ChainTracker<N>, BlockDownloadError> ) -> Result<ChainTracker<N>, BlockDownloadError>
where where
@ -102,9 +102,15 @@ where
let our_genesis = *block_ids.last().expect("Blockchain had no genesis block."); let our_genesis = *block_ids.last().expect("Blockchain had no genesis block.");
let mut peers = client_pool let PeerSetResponse::PeersWithMorePoW(clients) = peer_set
.clients_with_more_cumulative_difficulty(cumulative_difficulty) .ready()
.into_iter(); .await?
.call(PeerSetRequest::PeersWithMorePoW(cumulative_difficulty))
.await?
else {
unreachable!();
};
let mut peers = clients.into_iter();
let mut futs = JoinSet::new(); let mut futs = JoinSet::new();

View file

@ -14,8 +14,8 @@ use monero_serai::{
transaction::{Input, Timelock, Transaction, TransactionPrefix}, transaction::{Input, Timelock, Transaction, TransactionPrefix},
}; };
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use tokio::time::timeout; use tokio::{sync::mpsc, time::timeout};
use tower::{service_fn, Service}; use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
use cuprate_fixed_bytes::ByteArrayVec; use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_p2p_core::{ use cuprate_p2p_core::{
@ -31,7 +31,7 @@ use cuprate_wire::{
use crate::{ use crate::{
block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
client_pool::ClientPool, peer_set::PeerSet,
}; };
proptest! { proptest! {
@ -48,19 +48,20 @@ proptest! {
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
#[expect(clippy::significant_drop_tightening)]
tokio_pool.block_on(async move { tokio_pool.block_on(async move {
timeout(Duration::from_secs(600), async move { timeout(Duration::from_secs(600), async move {
let client_pool = ClientPool::new(); let (new_connection_tx, new_connection_rx) = mpsc::channel(peers);
let peer_set = PeerSet::new(new_connection_rx);
for _ in 0..peers { for _ in 0..peers {
let client = mock_block_downloader_client(Arc::clone(&blockchain)); let client = mock_block_downloader_client(Arc::clone(&blockchain));
client_pool.add_new_client(client); new_connection_tx.try_send(client).unwrap();
} }
let stream = download_blocks( let stream = download_blocks(
client_pool, Buffer::new(peer_set, 10).boxed_clone(),
OurChainSvc { OurChainSvc {
genesis: *blockchain.blocks.first().unwrap().0 genesis: *blockchain.blocks.first().unwrap().0
}, },

View file

@ -1,188 +0,0 @@
//! # Client Pool.
//!
//! The [`ClientPool`], is a pool of currently connected peers that can be pulled from.
//! It does _not_ necessarily contain every connected peer as another place could have
//! taken a peer from the pool.
//!
//! When taking peers from the pool they are wrapped in [`ClientPoolDropGuard`], which
//! returns the peer to the pool when it is dropped.
//!
//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
//! as internally this uses blocking `RwLock`s.
use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tracing::{Instrument, Span};
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
handles::ConnectionHandle,
ConnectionDirection, NetworkZone,
};
pub(crate) mod disconnect_monitor;
mod drop_guard_client;
pub use drop_guard_client::ClientPoolDropGuard;
/// The client pool, which holds currently connected free peers.
///
/// See the [module docs](self) for more.
pub struct ClientPool<N: NetworkZone> {
/// The connected [`Client`]s.
clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
/// A channel to send new peer ids down to monitor for disconnect.
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
}
impl<N: NetworkZone> ClientPool<N> {
/// Returns a new [`ClientPool`] wrapped in an [`Arc`].
pub fn new() -> Arc<Self> {
let (tx, rx) = mpsc::unbounded_channel();
let pool = Arc::new(Self {
clients: DashMap::new(),
new_connection_tx: tx,
});
tokio::spawn(
disconnect_monitor::disconnect_monitor(rx, Arc::clone(&pool))
.instrument(Span::current()),
);
pool
}
/// Adds a [`Client`] to the pool, the client must have previously been taken from the
/// pool.
///
/// See [`ClientPool::add_new_client`] to add a [`Client`] which was not taken from the pool before.
///
/// # Panics
/// This function panics if `client` already exists in the pool.
fn add_client(&self, client: Client<N>) {
let handle = client.info.handle.clone();
let id = client.info.id;
// Fast path: if the client is disconnected don't add it to the peer set.
if handle.is_closed() {
return;
}
assert!(self.clients.insert(id, client).is_none());
// We have to check this again otherwise we could have a race condition where a
// peer is disconnected after the first check, the disconnect monitor tries to remove it,
// and then it is added to the pool.
if handle.is_closed() {
self.remove_client(&id);
}
}
/// Adds a _new_ [`Client`] to the pool, this client should be a new connection, and not already
/// from the pool.
///
/// # Panics
/// This function panics if `client` already exists in the pool.
pub fn add_new_client(&self, client: Client<N>) {
self.new_connection_tx
.send((client.info.handle.clone(), client.info.id))
.unwrap();
self.add_client(client);
}
/// Remove a [`Client`] from the pool.
///
/// [`None`] is returned if the client did not exist in the pool.
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
self.clients.remove(peer).map(|(_, client)| client)
}
/// Borrows a [`Client`] from the pool.
///
/// The [`Client`] is wrapped in [`ClientPoolDropGuard`] which
/// will return the client to the pool when it's dropped.
///
/// See [`Self::borrow_clients`] for borrowing multiple clients.
pub fn borrow_client(
self: &Arc<Self>,
peer: &InternalPeerID<N::Addr>,
) -> Option<ClientPoolDropGuard<N>> {
self.remove_client(peer).map(|client| ClientPoolDropGuard {
pool: Arc::clone(self),
client: Some(client),
})
}
/// Borrows multiple [`Client`]s from the pool.
///
/// Note that the returned iterator is not guaranteed to contain every peer asked for.
///
/// See [`Self::borrow_client`] for borrowing a single client.
pub fn borrow_clients<'a, 'b>(
self: &'a Arc<Self>,
peers: &'b [InternalPeerID<N::Addr>],
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + sealed::Captures<(&'a (), &'b ())> {
peers.iter().filter_map(|peer| self.borrow_client(peer))
}
/// Borrows all [`Client`]s from the pool that have claimed a higher cumulative difficulty than
/// the amount passed in.
///
/// The [`Client`]s are wrapped in [`ClientPoolDropGuard`] which
/// will return the clients to the pool when they are dropped.
pub fn clients_with_more_cumulative_difficulty(
self: &Arc<Self>,
cumulative_difficulty: u128,
) -> Vec<ClientPoolDropGuard<N>> {
let peers = self
.clients
.iter()
.filter_map(|element| {
let peer_sync_info = element.value().info.core_sync_data.lock().unwrap();
if peer_sync_info.cumulative_difficulty() > cumulative_difficulty {
Some(*element.key())
} else {
None
}
})
.collect::<Vec<_>>();
self.borrow_clients(&peers).collect()
}
/// Checks all clients in the pool checking if any claim a higher cumulative difficulty than the
/// amount specified.
pub fn contains_client_with_more_cumulative_difficulty(
&self,
cumulative_difficulty: u128,
) -> bool {
self.clients.iter().any(|element| {
let sync_data = element.value().info.core_sync_data.lock().unwrap();
sync_data.cumulative_difficulty() > cumulative_difficulty
})
}
/// Returns the first outbound peer when iterating over the peers.
pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
let client = self
.clients
.iter()
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
let id = *client.key();
Some(self.borrow_client(&id).unwrap())
}
}
mod sealed {
/// TODO: Remove me when 2024 Rust
///
/// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick>
pub trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}
}

View file

@ -1,83 +0,0 @@
//! # Disconnect Monitor
//!
//! This module contains the [`disconnect_monitor`] task, which monitors connected peers for disconnection
//! and then removes them from the [`ClientPool`] if they do.
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::mpsc;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tracing::instrument;
use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
use super::ClientPool;
/// The disconnect monitor task.
#[instrument(level = "info", skip_all)]
pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>,
) {
// We need to hold a weak reference otherwise the client pool and this would hold a reference to
// each other causing the pool to be leaked.
let weak_client_pool = Arc::downgrade(&client_pool);
drop(client_pool);
tracing::info!("Starting peer disconnect monitor.");
let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
loop {
tokio::select! {
Some((con_handle, peer_id)) = new_connection_rx.recv() => {
tracing::debug!("Monitoring {peer_id} for disconnect");
futs.push(PeerDisconnectFut {
closed_fut: con_handle.closed(),
peer_id: Some(peer_id),
});
}
Some(peer_id) = futs.next() => {
tracing::debug!("{peer_id} has disconnected, removing from client pool.");
let Some(pool) = weak_client_pool.upgrade() else {
tracing::info!("Peer disconnect monitor shutting down.");
return;
};
pool.remove_client(&peer_id);
drop(pool);
}
else => {
tracing::info!("Peer disconnect monitor shutting down.");
return;
}
}
}
}
/// A [`Future`] that resolves when a peer disconnects.
#[pin_project::pin_project]
pub(crate) struct PeerDisconnectFut<N: NetworkZone> {
/// The inner [`Future`] that resolves when a peer disconnects.
#[pin]
pub(crate) closed_fut: WaitForCancellationFutureOwned,
/// The peers ID.
pub(crate) peer_id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.closed_fut
.poll(cx)
.map(|()| this.peer_id.take().unwrap())
}
}

View file

@ -1,41 +0,0 @@
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use cuprate_p2p_core::{client::Client, NetworkZone};
use crate::client_pool::ClientPool;
/// A wrapper around [`Client`] which returns the client to the [`ClientPool`] when dropped.
pub struct ClientPoolDropGuard<N: NetworkZone> {
/// The [`ClientPool`] to return the peer to.
pub(super) pool: Arc<ClientPool<N>>,
/// The [`Client`].
///
/// This is set to [`Some`] when this guard is created, then
/// [`take`](Option::take)n and returned to the pool when dropped.
pub(super) client: Option<Client<N>>,
}
impl<N: NetworkZone> Deref for ClientPoolDropGuard<N> {
type Target = Client<N>;
fn deref(&self) -> &Self::Target {
self.client.as_ref().unwrap()
}
}
impl<N: NetworkZone> DerefMut for ClientPoolDropGuard<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.client.as_mut().unwrap()
}
}
impl<N: NetworkZone> Drop for ClientPoolDropGuard<N> {
fn drop(&mut self) {
let client = self.client.take().unwrap();
self.pool.add_client(client);
}
}

View file

@ -21,7 +21,6 @@ use cuprate_p2p_core::{
}; };
use crate::{ use crate::{
client_pool::ClientPool,
config::P2PConfig, config::P2PConfig,
constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT}, constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
}; };
@ -46,7 +45,7 @@ pub struct MakeConnectionRequest {
/// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum. /// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum.
pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> { pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
/// The pool of currently connected peers. /// The pool of currently connected peers.
pub client_pool: Arc<ClientPool<N>>, pub new_peers_tx: mpsc::Sender<Client<N>>,
/// The channel that tells us to make new _extra_ outbound connections. /// The channel that tells us to make new _extra_ outbound connections.
pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>, pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
/// The address book service /// The address book service
@ -77,7 +76,7 @@ where
{ {
pub fn new( pub fn new(
config: P2PConfig<N>, config: P2PConfig<N>,
client_pool: Arc<ClientPool<N>>, new_peers_tx: mpsc::Sender<Client<N>>,
make_connection_rx: mpsc::Receiver<MakeConnectionRequest>, make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
address_book_svc: A, address_book_svc: A,
connector_svc: C, connector_svc: C,
@ -86,7 +85,7 @@ where
.expect("Gray peer percent is incorrect should be 0..=1"); .expect("Gray peer percent is incorrect should be 0..=1");
Self { Self {
client_pool, new_peers_tx,
make_connection_rx, make_connection_rx,
address_book_svc, address_book_svc,
connector_svc, connector_svc,
@ -149,7 +148,7 @@ where
/// Connects to a given outbound peer. /// Connects to a given outbound peer.
#[instrument(level = "info", skip_all)] #[instrument(level = "info", skip_all)]
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
let client_pool = Arc::clone(&self.client_pool); let new_peers_tx = self.new_peers_tx.clone();
let connection_fut = self let connection_fut = self
.connector_svc .connector_svc
.ready() .ready()
@ -164,7 +163,7 @@ where
async move { async move {
#[expect(clippy::significant_drop_in_scrutinee)] #[expect(clippy::significant_drop_in_scrutinee)]
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer); drop(new_peers_tx.send(peer).await);
} }
} }
.instrument(Span::current()), .instrument(Span::current()),

View file

@ -6,7 +6,7 @@ use std::{pin::pin, sync::Arc};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::{ use tokio::{
sync::Semaphore, sync::{mpsc, Semaphore},
task::JoinSet, task::JoinSet,
time::{sleep, timeout}, time::{sleep, timeout},
}; };
@ -24,7 +24,6 @@ use cuprate_wire::{
}; };
use crate::{ use crate::{
client_pool::ClientPool,
constants::{ constants::{
HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY, HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
PING_REQUEST_TIMEOUT, PING_REQUEST_TIMEOUT,
@ -36,7 +35,7 @@ use crate::{
/// and initiate handshake if needed, after verifying the address isn't banned. /// and initiate handshake if needed, after verifying the address isn't banned.
#[instrument(level = "warn", skip_all)] #[instrument(level = "warn", skip_all)]
pub async fn inbound_server<N, HS, A>( pub async fn inbound_server<N, HS, A>(
client_pool: Arc<ClientPool<N>>, new_connection_tx: mpsc::Sender<Client<N>>,
mut handshaker: HS, mut handshaker: HS,
mut address_book: A, mut address_book: A,
config: P2PConfig<N>, config: P2PConfig<N>,
@ -111,13 +110,13 @@ where
permit: Some(permit), permit: Some(permit),
}); });
let cloned_pool = Arc::clone(&client_pool); let new_connection_tx = new_connection_tx.clone();
tokio::spawn( tokio::spawn(
async move { async move {
let client = timeout(HANDSHAKE_TIMEOUT, fut).await; let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
if let Ok(Ok(peer)) = client { if let Ok(Ok(peer)) = client {
cloned_pool.add_new_client(peer); drop(new_connection_tx.send(peer).await);
} }
} }
.instrument(Span::current()), .instrument(Span::current()),

View file

@ -18,17 +18,18 @@ use cuprate_p2p_core::{
pub mod block_downloader; pub mod block_downloader;
mod broadcast; mod broadcast;
pub mod client_pool;
pub mod config; pub mod config;
pub mod connection_maintainer; pub mod connection_maintainer;
pub mod constants; pub mod constants;
mod inbound_server; mod inbound_server;
mod peer_set;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}; use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc}; pub use broadcast::{BroadcastRequest, BroadcastSvc};
pub use client_pool::{ClientPool, ClientPoolDropGuard};
pub use config::{AddressBookConfig, P2PConfig}; pub use config::{AddressBookConfig, P2PConfig};
use connection_maintainer::MakeConnectionRequest; use connection_maintainer::MakeConnectionRequest;
use peer_set::PeerSet;
pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
/// ///
@ -54,7 +55,10 @@ where
cuprate_address_book::init_address_book(config.address_book_config.clone()).await?; cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new( let address_book = Buffer::new(
address_book, address_book,
config.max_inbound_connections + config.outbound_connections, config
.max_inbound_connections
.checked_add(config.outbound_connections)
.unwrap(),
); );
// Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
@ -83,19 +87,25 @@ where
let outbound_handshaker = outbound_handshaker_builder.build(); let outbound_handshaker = outbound_handshaker_builder.build();
let client_pool = ClientPool::new(); let (new_connection_tx, new_connection_rx) = mpsc::channel(
config
.outbound_connections
.checked_add(config.max_inbound_connections)
.unwrap(),
);
let (make_connection_tx, make_connection_rx) = mpsc::channel(3); let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
let outbound_connector = Connector::new(outbound_handshaker); let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(), config.clone(),
Arc::clone(&client_pool), new_connection_tx.clone(),
make_connection_rx, make_connection_rx,
address_book.clone(), address_book.clone(),
outbound_connector, outbound_connector,
); );
let peer_set = PeerSet::new(new_connection_rx);
let mut background_tasks = JoinSet::new(); let mut background_tasks = JoinSet::new();
background_tasks.spawn( background_tasks.spawn(
@ -105,7 +115,7 @@ where
); );
background_tasks.spawn( background_tasks.spawn(
inbound_server::inbound_server( inbound_server::inbound_server(
Arc::clone(&client_pool), new_connection_tx,
inbound_handshaker, inbound_handshaker,
address_book.clone(), address_book.clone(),
config, config,
@ -121,7 +131,7 @@ where
); );
Ok(NetworkInterface { Ok(NetworkInterface {
pool: client_pool, peer_set: Buffer::new(peer_set, 10).boxed_clone(),
broadcast_svc, broadcast_svc,
make_connection_tx, make_connection_tx,
address_book: address_book.boxed_clone(), address_book: address_book.boxed_clone(),
@ -133,7 +143,7 @@ where
#[derive(Clone)] #[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> { pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers. /// A pool of free connected peers.
pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
/// A [`Service`] that allows broadcasting to all connected peers. /// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>, broadcast_svc: BroadcastSvc<N>,
/// A channel to request extra connections. /// A channel to request extra connections.
@ -163,7 +173,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
+ 'static, + 'static,
C::Future: Send + 'static, C::Future: Send + 'static,
{ {
block_downloader::download_blocks(Arc::clone(&self.pool), our_chain_service, config) block_downloader::download_blocks(self.peer_set.clone(), our_chain_service, config)
} }
/// Returns the address book service. /// Returns the address book service.
@ -173,8 +183,10 @@ impl<N: NetworkZone> NetworkInterface<N> {
self.address_book.clone() self.address_book.clone()
} }
/// Borrows the `ClientPool`, for access to connected peers. /// Borrows the `PeerSet`, for access to connected peers.
pub const fn client_pool(&self) -> &Arc<ClientPool<N>> { pub fn peer_set(
&self.pool &mut self,
) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
&mut self.peer_set
} }
} }

217
p2p/p2p/src/peer_set.rs Normal file
View file

@ -0,0 +1,217 @@
use std::{
future::{ready, Future, Ready},
pin::{pin, Pin},
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
use rand::{seq::index::sample, thread_rng};
use tokio::sync::mpsc::Receiver;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tower::Service;
use cuprate_helper::cast::u64_to_usize;
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ConnectionDirection, NetworkZone,
};
mod client_wrappers;
pub use client_wrappers::ClientDropGuard;
use client_wrappers::StoredClient;
/// A request to the peer-set.
pub enum PeerSetRequest {
/// The most claimed proof-of-work from a peer in the peer-set.
MostPoWSeen,
/// Peers with more cumulative difficulty than the given cumulative difficulty.
///
/// Returned peers will be remembered and won't be returned from subsequent calls until the guard is dropped.
PeersWithMorePoW(u128),
/// A random outbound peer.
///
/// The returned peer will be remembered and won't be returned from subsequent calls until the guard is dropped.
StemPeer,
}
/// A response from the peer-set.
pub enum PeerSetResponse<N: NetworkZone> {
/// [`PeerSetRequest::MostPoWSeen`]
MostPoWSeen {
/// The cumulative difficulty claimed.
cumulative_difficulty: u128,
/// The height claimed.
height: usize,
/// The claimed hash of the top block.
top_hash: [u8; 32],
},
/// [`PeerSetRequest::PeersWithMorePoW`]
///
/// Returned peers will be remembered and won't be returned from subsequent calls until the guard is dropped.
PeersWithMorePoW(Vec<ClientDropGuard<N>>),
/// [`PeerSetRequest::StemPeer`]
///
/// The returned peer will be remembered and won't be returned from subsequent calls until the guard is dropped.
StemPeer(Option<ClientDropGuard<N>>),
}
/// A [`Future`] that completes when a peer disconnects.
#[pin_project::pin_project]
struct ClosedConnectionFuture<N: NetworkZone> {
#[pin]
fut: WaitForCancellationFutureOwned,
id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for ClosedConnectionFuture<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.fut.poll(cx).map(|()| this.id.take().unwrap())
}
}
/// A collection of all connected peers on a [`NetworkZone`].
pub(crate) struct PeerSet<N: NetworkZone> {
/// The connected peers.
peers: IndexMap<InternalPeerID<N::Addr>, StoredClient<N>>,
/// A [`FuturesUnordered`] that resolves when a peer disconnects.
closed_connections: FuturesUnordered<ClosedConnectionFuture<N>>,
/// The [`InternalPeerID`]s of all outbound peers.
outbound_peers: IndexSet<InternalPeerID<N::Addr>>,
/// A channel of new peers from the inbound server or outbound connector.
new_peers: Receiver<Client<N>>,
}
impl<N: NetworkZone> PeerSet<N> {
pub(crate) fn new(new_peers: Receiver<Client<N>>) -> Self {
Self {
peers: IndexMap::new(),
closed_connections: FuturesUnordered::new(),
outbound_peers: IndexSet::new(),
new_peers,
}
}
/// Polls the new peers channel for newly connected peers.
fn poll_new_peers(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(new_peer)) = self.new_peers.poll_recv(cx) {
if new_peer.info.direction == ConnectionDirection::Outbound {
self.outbound_peers.insert(new_peer.info.id);
}
self.closed_connections.push(ClosedConnectionFuture {
fut: new_peer.info.handle.closed(),
id: Some(new_peer.info.id),
});
self.peers
.insert(new_peer.info.id, StoredClient::new(new_peer));
}
}
/// Remove disconnected peers from the peer set.
fn remove_dead_peers(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(dead_peer)) = self.closed_connections.poll_next_unpin(cx) {
let Some(peer) = self.peers.swap_remove(&dead_peer) else {
continue;
};
if peer.client.info.direction == ConnectionDirection::Outbound {
self.outbound_peers.swap_remove(&peer.client.info.id);
}
self.peers.swap_remove(&dead_peer);
}
}
/// [`PeerSetRequest::MostPoWSeen`]
fn most_pow_seen(&self) -> PeerSetResponse<N> {
let most_pow_chain = self
.peers
.values()
.map(|peer| {
let core_sync_data = peer.client.info.core_sync_data.lock().unwrap();
(
core_sync_data.cumulative_difficulty(),
u64_to_usize(core_sync_data.current_height),
core_sync_data.top_id,
)
})
.max_by_key(|(cumulative_difficulty, ..)| *cumulative_difficulty)
.unwrap_or_default();
PeerSetResponse::MostPoWSeen {
cumulative_difficulty: most_pow_chain.0,
height: most_pow_chain.1,
top_hash: most_pow_chain.2,
}
}
/// [`PeerSetRequest::PeersWithMorePoW`]
fn peers_with_more_pow(&self, cumulative_difficulty: u128) -> PeerSetResponse<N> {
PeerSetResponse::PeersWithMorePoW(
self.peers
.values()
.filter(|&client| {
!client.is_downloading_blocks()
&& client
.client
.info
.core_sync_data
.lock()
.unwrap()
.cumulative_difficulty()
> cumulative_difficulty
})
.map(StoredClient::downloading_blocks_guard)
.collect(),
)
}
/// [`PeerSetRequest::StemPeer`]
fn random_peer_for_stem(&self) -> PeerSetResponse<N> {
PeerSetResponse::StemPeer(
sample(
&mut thread_rng(),
self.outbound_peers.len(),
self.outbound_peers.len(),
)
.into_iter()
.find_map(|i| {
let peer = self.outbound_peers.get_index(i).unwrap();
let client = self.peers.get(peer).unwrap();
(!client.is_a_stem_peer()).then(|| client.stem_peer_guard())
}),
)
}
}
impl<N: NetworkZone> Service<PeerSetRequest> for PeerSet<N> {
type Response = PeerSetResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_new_peers(cx);
self.remove_dead_peers(cx);
// TODO: should we return `Pending` if we don't have any peers?
Poll::Ready(Ok(()))
}
fn call(&mut self, req: PeerSetRequest) -> Self::Future {
ready(match req {
PeerSetRequest::MostPoWSeen => Ok(self.most_pow_seen()),
PeerSetRequest::PeersWithMorePoW(cumulative_difficulty) => {
Ok(self.peers_with_more_pow(cumulative_difficulty))
}
PeerSetRequest::StemPeer => Ok(self.random_peer_for_stem()),
})
}
}

View file

@ -0,0 +1,86 @@
use std::{
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use cuprate_p2p_core::{
client::{Client, WeakClient},
NetworkZone,
};
/// A client stored in the peer-set.
pub(super) struct StoredClient<N: NetworkZone> {
pub client: Client<N>,
/// An [`AtomicBool`] for if the peer is currently downloading blocks.
downloading_blocks: Arc<AtomicBool>,
/// An [`AtomicBool`] for if the peer is currently being used to stem txs.
stem_peer: Arc<AtomicBool>,
}
impl<N: NetworkZone> StoredClient<N> {
pub(super) fn new(client: Client<N>) -> Self {
Self {
client,
downloading_blocks: Arc::new(AtomicBool::new(false)),
stem_peer: Arc::new(AtomicBool::new(false)),
}
}
/// Returns [`true`] if the [`StoredClient`] is currently downloading blocks.
pub(super) fn is_downloading_blocks(&self) -> bool {
self.downloading_blocks.load(Ordering::Relaxed)
}
/// Returns [`true`] if the [`StoredClient`] is currently being used to stem txs.
pub(super) fn is_a_stem_peer(&self) -> bool {
self.stem_peer.load(Ordering::Relaxed)
}
/// Returns a [`ClientDropGuard`] that while it is alive keeps the [`StoredClient`] in the downloading blocks state.
pub(super) fn downloading_blocks_guard(&self) -> ClientDropGuard<N> {
self.downloading_blocks.store(true, Ordering::Relaxed);
ClientDropGuard {
client: self.client.downgrade(),
bool: Arc::clone(&self.downloading_blocks),
}
}
/// Returns a [`ClientDropGuard`] that while it is alive keeps the [`StoredClient`] in the stemming peers state.
pub(super) fn stem_peer_guard(&self) -> ClientDropGuard<N> {
self.stem_peer.store(true, Ordering::Relaxed);
ClientDropGuard {
client: self.client.downgrade(),
bool: Arc::clone(&self.stem_peer),
}
}
}
/// A [`Drop`] guard for a client returned from the peer-set.
pub struct ClientDropGuard<N: NetworkZone> {
client: WeakClient<N>,
bool: Arc<AtomicBool>,
}
impl<N: NetworkZone> Deref for ClientDropGuard<N> {
type Target = WeakClient<N>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl<N: NetworkZone> DerefMut for ClientDropGuard<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
impl<N: NetworkZone> Drop for ClientDropGuard<N> {
fn drop(&mut self) {
self.bool.store(false, Ordering::Relaxed);
}
}