Merge branch 'main' into init

This commit is contained in:
Boog900 2024-11-20 19:29:24 +00:00
commit 720b7afcd1
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
19 changed files with 602 additions and 418 deletions

1
Cargo.lock generated
View file

@ -840,7 +840,6 @@ dependencies = [
"cuprate-test-utils", "cuprate-test-utils",
"cuprate-types", "cuprate-types",
"cuprate-wire", "cuprate-wire",
"dashmap",
"futures", "futures",
"indexmap", "indexmap",
"monero-serai", "monero-serai",

View file

@ -12,7 +12,7 @@ use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_p2p::{ use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface, NetworkInterface, PeerSetRequest, PeerSetResponse,
}; };
use cuprate_p2p_core::ClearNet; use cuprate_p2p_core::ClearNet;
@ -28,15 +28,11 @@ pub enum SyncerError {
} }
/// The syncer tasks that makes sure we are fully synchronised with our connected peers. /// The syncer tasks that makes sure we are fully synchronised with our connected peers.
#[expect(
clippy::significant_drop_tightening,
reason = "Client pool which will be removed"
)]
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
pub async fn syncer<C, CN>( pub async fn syncer<C, CN>(
mut context_svc: C, mut context_svc: C,
our_chain: CN, our_chain: CN,
clearnet_interface: NetworkInterface<ClearNet>, mut clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>, incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
stop_current_block_downloader: Arc<Notify>, stop_current_block_downloader: Arc<Notify>,
block_downloader_config: BlockDownloaderConfig, block_downloader_config: BlockDownloaderConfig,
@ -67,8 +63,6 @@ where
unreachable!(); unreachable!();
}; };
let client_pool = clearnet_interface.client_pool();
tracing::debug!("Waiting for new sync info in top sync channel"); tracing::debug!("Waiting for new sync info in top sync channel");
loop { loop {
@ -79,9 +73,20 @@ where
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
if !client_pool.contains_client_with_more_cumulative_difficulty( let PeerSetResponse::MostPoWSeen {
raw_blockchain_context.cumulative_difficulty, cumulative_difficulty,
) { ..
} = clearnet_interface
.peer_set()
.ready()
.await?
.call(PeerSetRequest::MostPoWSeen)
.await?
else {
unreachable!();
};
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
continue; continue;
} }

View file

@ -59,7 +59,7 @@ pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandel
diffuse_service::DiffuseService { diffuse_service::DiffuseService {
clear_net_broadcast_service: clear_net.broadcast_svc(), clear_net_broadcast_service: clear_net.broadcast_svc(),
}, },
stem_service::OutboundPeerStream { clear_net }, stem_service::OutboundPeerStream::new(clear_net),
DANDELION_CONFIG, DANDELION_CONFIG,
) )
} }

View file

@ -1,14 +1,15 @@
use std::{ use std::{
future::Future,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{ready, Context, Poll},
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::{future::BoxFuture, FutureExt, Stream};
use tower::Service; use tower::Service;
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer}; use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface}; use cuprate_p2p::{ClientDropGuard, NetworkInterface, PeerSetRequest, PeerSetResponse};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::{Client, InternalPeerID}, client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest, ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
@ -19,7 +20,17 @@ use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
/// The dandelion outbound peer stream. /// The dandelion outbound peer stream.
pub struct OutboundPeerStream { pub struct OutboundPeerStream {
pub clear_net: NetworkInterface<ClearNet>, clear_net: NetworkInterface<ClearNet>,
state: OutboundPeerStreamState,
}
impl OutboundPeerStream {
pub const fn new(clear_net: NetworkInterface<ClearNet>) -> Self {
Self {
clear_net,
state: OutboundPeerStreamState::Standby,
}
}
} }
impl Stream for OutboundPeerStream { impl Stream for OutboundPeerStream {
@ -28,23 +39,49 @@ impl Stream for OutboundPeerStream {
tower::BoxError, tower::BoxError,
>; >;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: make the outbound peer choice random. loop {
Poll::Ready(Some(Ok(self match &mut self.state {
.clear_net OutboundPeerStreamState::Standby => {
.client_pool() let peer_set = self.clear_net.peer_set();
.outbound_client() let res = ready!(peer_set.poll_ready(cx));
.map_or(OutboundPeer::Exhausted, |client| {
OutboundPeer::Peer( self.state = OutboundPeerStreamState::AwaitingPeer(
CrossNetworkInternalPeerId::ClearNet(client.info.id), peer_set.call(PeerSetRequest::StemPeer).boxed(),
StemPeerService(client), );
) }
})))) OutboundPeerStreamState::AwaitingPeer(fut) => {
let res = ready!(fut.poll_unpin(cx));
return Poll::Ready(Some(res.map(|res| {
let PeerSetResponse::StemPeer(stem_peer) = res else {
unreachable!()
};
match stem_peer {
Some(peer) => OutboundPeer::Peer(
CrossNetworkInternalPeerId::ClearNet(peer.info.id),
StemPeerService(peer),
),
None => OutboundPeer::Exhausted,
}
})));
}
}
}
} }
} }
/// The state of the [`OutboundPeerStream`].
enum OutboundPeerStreamState {
/// Standby state.
Standby,
/// Awaiting a response from the peer-set.
AwaitingPeer(BoxFuture<'static, Result<PeerSetResponse<ClearNet>, tower::BoxError>>),
}
/// The stem service, used to send stem txs. /// The stem service, used to send stem txs.
pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>); pub struct StemPeerService<N: NetworkZone>(ClientDropGuard<N>);
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> { impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
type Response = <Client<N> as Service<PeerRequest>>::Response; type Response = <Client<N> as Service<PeerRequest>>::Response;

View file

@ -27,9 +27,11 @@ mod connector;
pub mod handshaker; pub mod handshaker;
mod request_handler; mod request_handler;
mod timeout_monitor; mod timeout_monitor;
mod weak;
pub use connector::{ConnectRequest, Connector}; pub use connector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder}; pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder};
pub use weak::WeakClient;
/// An internal identifier for a given peer, will be their address if known /// An internal identifier for a given peer, will be their address if known
/// or a random u128 if not. /// or a random u128 if not.
@ -128,6 +130,17 @@ impl<Z: NetworkZone> Client<Z> {
} }
.into() .into()
} }
/// Create a [`WeakClient`] for this [`Client`].
pub fn downgrade(&self) -> WeakClient<Z> {
WeakClient {
info: self.info.clone(),
connection_tx: self.connection_tx.downgrade(),
semaphore: self.semaphore.clone(),
permit: None,
error: self.error.clone(),
}
}
} }
impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> { impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {

View file

@ -0,0 +1,114 @@
use std::task::{ready, Context, Poll};
use futures::channel::oneshot;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use crate::{
client::{connection, PeerInformation},
NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
};
/// A weak handle to a [`Client`](super::Client).
///
/// When this is dropped the peer will not be disconnected.
pub struct WeakClient<N: NetworkZone> {
/// Information on the connected peer.
pub info: PeerInformation<N::Addr>,
/// The channel to the [`Connection`](connection::Connection) task.
pub(super) connection_tx: mpsc::WeakSender<connection::ConnectionTaskRequest>,
/// The semaphore that limits the requests sent to the peer.
pub(super) semaphore: PollSemaphore,
/// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready.
pub(super) permit: Option<OwnedSemaphorePermit>,
/// The error slot shared between the [`Client`] and [`Connection`](connection::Connection).
pub(super) error: SharedError<PeerError>,
}
impl<N: NetworkZone> WeakClient<N> {
/// Internal function to set an error on the [`SharedError`].
fn set_err(&self, err: PeerError) -> tower::BoxError {
let err_str = err.to_string();
match self.error.try_insert_err(err) {
Ok(()) => err_str,
Err(e) => e.to_string(),
}
.into()
}
}
impl<Z: NetworkZone> Service<PeerRequest> for WeakClient<Z> {
type Response = PeerResponse;
type Error = tower::BoxError;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(err) = self.error.try_get_err() {
return Poll::Ready(Err(err.to_string().into()));
}
if self.connection_tx.strong_count() == 0 {
let err = self.set_err(PeerError::ClientChannelClosed);
return Poll::Ready(Err(err));
}
if self.permit.is_some() {
return Poll::Ready(Ok(()));
}
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("Client semaphore should not be closed!");
self.permit = Some(permit);
Poll::Ready(Ok(()))
}
#[expect(clippy::significant_drop_tightening)]
fn call(&mut self, request: PeerRequest) -> Self::Future {
let permit = self
.permit
.take()
.expect("poll_ready did not return ready before call to call");
let (tx, rx) = oneshot::channel();
let req = connection::ConnectionTaskRequest {
response_channel: tx,
request,
permit: Some(permit),
};
match self.connection_tx.upgrade() {
None => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
Some(sender) => {
if let Err(e) = sender.try_send(req) {
// The connection task could have closed between a call to `poll_ready` and the call to
// `call`, which means if we don't handle the error here the receiver would panic.
use mpsc::error::TrySendError;
match e {
TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed);
let resp = Err(PeerError::ClientChannelClosed.into());
drop(req.response_channel.send(resp));
}
}
}
}
}
rx.into()
}
}

View file

@ -20,12 +20,12 @@ monero-serai = { workspace = true, features = ["std"] }
tower = { workspace = true, features = ["buffer"] } tower = { workspace = true, features = ["buffer"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
rayon = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
rayon = { workspace = true }
tokio-stream = { workspace = true, features = ["sync", "time"] } tokio-stream = { workspace = true, features = ["sync", "time"] }
futures = { workspace = true, features = ["std"] } futures = { workspace = true, features = ["std"] }
pin-project = { workspace = true } pin-project = { workspace = true }
dashmap = { workspace = true } indexmap = { workspace = true, features = ["std"] }
thiserror = { workspace = true } thiserror = { workspace = true }
bytes = { workspace = true, features = ["std"] } bytes = { workspace = true, features = ["std"] }

View file

@ -8,7 +8,6 @@
use std::{ use std::{
cmp::{max, min, Reverse}, cmp::{max, min, Reverse},
collections::{BTreeMap, BinaryHeap}, collections::{BTreeMap, BinaryHeap},
sync::Arc,
time::Duration, time::Duration,
}; };
@ -18,7 +17,7 @@ use tokio::{
task::JoinSet, task::JoinSet,
time::{interval, timeout, MissedTickBehavior}, time::{interval, timeout, MissedTickBehavior},
}; };
use tower::{Service, ServiceExt}; use tower::{util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span}; use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::{BufferAppender, BufferStream}; use cuprate_async_buffer::{BufferAppender, BufferStream};
@ -27,11 +26,11 @@ use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone};
use cuprate_pruning::PruningSeed; use cuprate_pruning::PruningSeed;
use crate::{ use crate::{
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{ constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN, BLOCK_DOWNLOADER_REQUEST_TIMEOUT, EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED, LONG_BAN,
MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES, MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES,
}, },
peer_set::ClientDropGuard,
}; };
mod block_queue; mod block_queue;
@ -41,6 +40,7 @@ mod request_chain;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use crate::peer_set::{PeerSetRequest, PeerSetResponse};
use block_queue::{BlockQueue, ReadyQueueBatch}; use block_queue::{BlockQueue, ReadyQueueBatch};
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
use download_batch::download_batch_task; use download_batch::download_batch_task;
@ -135,7 +135,7 @@ pub enum ChainSvcResponse {
/// call this function again, so it can start the search again. /// call this function again, so it can start the search again.
#[instrument(level = "error", skip_all, name = "block_downloader")] #[instrument(level = "error", skip_all, name = "block_downloader")]
pub fn download_blocks<N: NetworkZone, C>( pub fn download_blocks<N: NetworkZone, C>(
client_pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
our_chain_svc: C, our_chain_svc: C,
config: BlockDownloaderConfig, config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch> ) -> BufferStream<BlockBatch>
@ -147,8 +147,7 @@ where
{ {
let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size); let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size);
let block_downloader = let block_downloader = BlockDownloader::new(peer_set, our_chain_svc, buffer_appender, config);
BlockDownloader::new(client_pool, our_chain_svc, buffer_appender, config);
tokio::spawn( tokio::spawn(
block_downloader block_downloader
@ -186,8 +185,8 @@ where
/// - download an already requested batch of blocks (this might happen due to an error in the previous request /// - download an already requested batch of blocks (this might happen due to an error in the previous request
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it). /// or because the queue of ready blocks is too large, so we need the oldest block to clear it).
struct BlockDownloader<N: NetworkZone, C> { struct BlockDownloader<N: NetworkZone, C> {
/// The client pool. /// The peer set.
client_pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
/// The service that holds our current chain state. /// The service that holds our current chain state.
our_chain_svc: C, our_chain_svc: C,
@ -208,7 +207,7 @@ struct BlockDownloader<N: NetworkZone, C> {
/// ///
/// Returns a result of the chain entry or an error. /// Returns a result of the chain entry or an error.
#[expect(clippy::type_complexity)] #[expect(clippy::type_complexity)]
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>, chain_entry_task: JoinSet<Result<(ClientDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
/// The current inflight requests. /// The current inflight requests.
/// ///
@ -235,13 +234,13 @@ where
{ {
/// Creates a new [`BlockDownloader`] /// Creates a new [`BlockDownloader`]
fn new( fn new(
client_pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
our_chain_svc: C, our_chain_svc: C,
buffer_appender: BufferAppender<BlockBatch>, buffer_appender: BufferAppender<BlockBatch>,
config: BlockDownloaderConfig, config: BlockDownloaderConfig,
) -> Self { ) -> Self {
Self { Self {
client_pool, peer_set,
our_chain_svc, our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_len, amount_of_blocks_to_request: config.initial_batch_len,
amount_of_blocks_to_request_updated_at: 0, amount_of_blocks_to_request_updated_at: 0,
@ -259,7 +258,7 @@ where
fn check_pending_peers( fn check_pending_peers(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
) { ) {
tracing::debug!("Checking if we can give any work to pending peers."); tracing::debug!("Checking if we can give any work to pending peers.");
@ -286,11 +285,11 @@ where
/// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request /// This function will find the batch(es) that we are waiting on to clear our ready queue and sends another request
/// for them. /// for them.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed. /// Returns the [`ClientDropGuard`] back if it doesn't have the batch according to its pruning seed.
fn request_inflight_batch_again( fn request_inflight_batch_again(
&mut self, &mut self,
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientDropGuard<N>> {
tracing::debug!( tracing::debug!(
"Requesting an inflight batch, current ready queue size: {}", "Requesting an inflight batch, current ready queue size: {}",
self.block_queue.size() self.block_queue.size()
@ -336,13 +335,13 @@ where
/// ///
/// The batch requested will depend on our current state, failed batches will be prioritised. /// The batch requested will depend on our current state, failed batches will be prioritised.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// Returns the [`ClientDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed. /// to its pruning seed.
fn request_block_batch( fn request_block_batch(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientDropGuard<N>> {
tracing::trace!("Using peer to request a batch of blocks."); tracing::trace!("Using peer to request a batch of blocks.");
// First look to see if we have any failed requests. // First look to see if we have any failed requests.
while let Some(failed_request) = self.failed_batches.peek() { while let Some(failed_request) = self.failed_batches.peek() {
@ -416,13 +415,13 @@ where
/// This function will use our current state to decide if we should send a request for a chain entry /// This function will use our current state to decide if we should send a request for a chain entry
/// or if we should request a batch of blocks. /// or if we should request a batch of blocks.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// Returns the [`ClientDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed. /// to its pruning seed.
fn try_handle_free_client( fn try_handle_free_client(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientDropGuard<N>> {
// We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup. // We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup.
if self.chain_entry_task.len() < 2 if self.chain_entry_task.len() < 2
// If we have had too many failures then assume the tip has been found so no more chain entries. // If we have had too many failures then assume the tip has been found so no more chain entries.
@ -463,7 +462,7 @@ where
async fn check_for_free_clients( async fn check_for_free_clients(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
) -> Result<(), BlockDownloadError> { ) -> Result<(), BlockDownloadError> {
tracing::debug!("Checking for free peers"); tracing::debug!("Checking for free peers");
@ -478,10 +477,19 @@ where
panic!("Chain service returned wrong response."); panic!("Chain service returned wrong response.");
}; };
for client in self let PeerSetResponse::PeersWithMorePoW(clients) = self
.client_pool .peer_set
.clients_with_more_cumulative_difficulty(current_cumulative_difficulty) .ready()
{ .await?
.call(PeerSetRequest::PeersWithMorePoW(
current_cumulative_difficulty,
))
.await?
else {
unreachable!();
};
for client in clients {
pending_peers pending_peers
.entry(client.info.pruning_seed) .entry(client.info.pruning_seed)
.or_default() .or_default()
@ -497,9 +505,9 @@ where
async fn handle_download_batch_res( async fn handle_download_batch_res(
&mut self, &mut self,
start_height: usize, start_height: usize,
res: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>, res: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientDropGuard<N>>>,
) -> Result<(), BlockDownloadError> { ) -> Result<(), BlockDownloadError> {
tracing::debug!("Handling block download response"); tracing::debug!("Handling block download response");
@ -593,7 +601,7 @@ where
/// Starts the main loop of the block downloader. /// Starts the main loop of the block downloader.
async fn run(mut self) -> Result<(), BlockDownloadError> { async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker = let mut chain_tracker =
initial_chain_search(&self.client_pool, &mut self.our_chain_svc).await?; initial_chain_search(&mut self.peer_set, &mut self.our_chain_svc).await?;
let mut pending_peers = BTreeMap::new(); let mut pending_peers = BTreeMap::new();
@ -662,7 +670,7 @@ struct BlockDownloadTaskResponse<N: NetworkZone> {
/// The start height of the batch. /// The start height of the batch.
start_height: usize, start_height: usize,
/// A result containing the batch or an error. /// A result containing the batch or an error.
result: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>, result: Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError>,
} }
/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`]. /// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].

View file

@ -16,8 +16,8 @@ use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
use crate::{ use crate::{
block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse}, block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
client_pool::ClientPoolDropGuard,
constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN}, constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
peer_set::ClientDropGuard,
}; };
/// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`]. /// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`].
@ -32,7 +32,7 @@ use crate::{
)] )]
#[expect(clippy::used_underscore_binding)] #[expect(clippy::used_underscore_binding)]
pub async fn download_batch_task<N: NetworkZone>( pub async fn download_batch_task<N: NetworkZone>(
client: ClientPoolDropGuard<N>, client: ClientDropGuard<N>,
ids: ByteArrayVec<32>, ids: ByteArrayVec<32>,
previous_id: [u8; 32], previous_id: [u8; 32],
expected_start_height: usize, expected_start_height: usize,
@ -49,11 +49,11 @@ pub async fn download_batch_task<N: NetworkZone>(
/// This function will validate the blocks that were downloaded were the ones asked for and that they match /// This function will validate the blocks that were downloaded were the ones asked for and that they match
/// the expected height. /// the expected height.
async fn request_batch_from_peer<N: NetworkZone>( async fn request_batch_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientDropGuard<N>,
ids: ByteArrayVec<32>, ids: ByteArrayVec<32>,
previous_id: [u8; 32], previous_id: [u8; 32],
expected_start_height: usize, expected_start_height: usize,
) -> Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError> { ) -> Result<(ClientDropGuard<N>, BlockBatch), BlockDownloadError> {
let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest { let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest {
blocks: ids.clone(), blocks: ids.clone(),
pruned: false, pruned: false,

View file

@ -1,7 +1,7 @@
use std::{mem, sync::Arc}; use std::mem;
use tokio::{task::JoinSet, time::timeout}; use tokio::{task::JoinSet, time::timeout};
use tower::{Service, ServiceExt}; use tower::{util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span}; use tracing::{instrument, Instrument, Span};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
@ -15,11 +15,11 @@ use crate::{
chain_tracker::{ChainEntry, ChainTracker}, chain_tracker::{ChainEntry, ChainTracker},
BlockDownloadError, ChainSvcRequest, ChainSvcResponse, BlockDownloadError, ChainSvcRequest, ChainSvcResponse,
}, },
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{ constants::{
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND, BLOCK_DOWNLOADER_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND,
MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MEDIUM_BAN, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MEDIUM_BAN,
}, },
peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse},
}; };
/// Request a chain entry from a peer. /// Request a chain entry from a peer.
@ -27,9 +27,9 @@ use crate::{
/// Because the block downloader only follows and downloads one chain we only have to send the block hash of /// Because the block downloader only follows and downloads one chain we only have to send the block hash of
/// top block we have found and the genesis block, this is then called `short_history`. /// top block we have found and the genesis block, this is then called `short_history`.
pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>( pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientDropGuard<N>,
short_history: [[u8; 32]; 2], short_history: [[u8; 32]; 2],
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> { ) -> Result<(ClientDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = client let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = client
.ready() .ready()
.await? .await?
@ -80,7 +80,7 @@ pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
/// We then wait for their response and choose the peer who claims the highest cumulative difficulty. /// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
#[instrument(level = "error", skip_all)] #[instrument(level = "error", skip_all)]
pub async fn initial_chain_search<N: NetworkZone, C>( pub async fn initial_chain_search<N: NetworkZone, C>(
client_pool: &Arc<ClientPool<N>>, peer_set: &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
mut our_chain_svc: C, mut our_chain_svc: C,
) -> Result<ChainTracker<N>, BlockDownloadError> ) -> Result<ChainTracker<N>, BlockDownloadError>
where where
@ -102,9 +102,15 @@ where
let our_genesis = *block_ids.last().expect("Blockchain had no genesis block."); let our_genesis = *block_ids.last().expect("Blockchain had no genesis block.");
let mut peers = client_pool let PeerSetResponse::PeersWithMorePoW(clients) = peer_set
.clients_with_more_cumulative_difficulty(cumulative_difficulty) .ready()
.into_iter(); .await?
.call(PeerSetRequest::PeersWithMorePoW(cumulative_difficulty))
.await?
else {
unreachable!();
};
let mut peers = clients.into_iter();
let mut futs = JoinSet::new(); let mut futs = JoinSet::new();

View file

@ -14,8 +14,8 @@ use monero_serai::{
transaction::{Input, Timelock, Transaction, TransactionPrefix}, transaction::{Input, Timelock, Transaction, TransactionPrefix},
}; };
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use tokio::time::timeout; use tokio::{sync::mpsc, time::timeout};
use tower::{service_fn, Service}; use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
use cuprate_fixed_bytes::ByteArrayVec; use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_p2p_core::{ use cuprate_p2p_core::{
@ -31,7 +31,7 @@ use cuprate_wire::{
use crate::{ use crate::{
block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
client_pool::ClientPool, peer_set::PeerSet,
}; };
proptest! { proptest! {
@ -48,19 +48,20 @@ proptest! {
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
#[expect(clippy::significant_drop_tightening)]
tokio_pool.block_on(async move { tokio_pool.block_on(async move {
timeout(Duration::from_secs(600), async move { timeout(Duration::from_secs(600), async move {
let client_pool = ClientPool::new(); let (new_connection_tx, new_connection_rx) = mpsc::channel(peers);
let peer_set = PeerSet::new(new_connection_rx);
for _ in 0..peers { for _ in 0..peers {
let client = mock_block_downloader_client(Arc::clone(&blockchain)); let client = mock_block_downloader_client(Arc::clone(&blockchain));
client_pool.add_new_client(client); new_connection_tx.try_send(client).unwrap();
} }
let stream = download_blocks( let stream = download_blocks(
client_pool, Buffer::new(peer_set, 10).boxed_clone(),
OurChainSvc { OurChainSvc {
genesis: *blockchain.blocks.first().unwrap().0 genesis: *blockchain.blocks.first().unwrap().0
}, },

View file

@ -1,188 +0,0 @@
//! # Client Pool.
//!
//! The [`ClientPool`], is a pool of currently connected peers that can be pulled from.
//! It does _not_ necessarily contain every connected peer as another place could have
//! taken a peer from the pool.
//!
//! When taking peers from the pool they are wrapped in [`ClientPoolDropGuard`], which
//! returns the peer to the pool when it is dropped.
//!
//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
//! as internally this uses blocking `RwLock`s.
use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tracing::{Instrument, Span};
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
handles::ConnectionHandle,
ConnectionDirection, NetworkZone,
};
pub(crate) mod disconnect_monitor;
mod drop_guard_client;
pub use drop_guard_client::ClientPoolDropGuard;
/// The client pool, which holds currently connected free peers.
///
/// See the [module docs](self) for more.
pub struct ClientPool<N: NetworkZone> {
/// The connected [`Client`]s.
clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
/// A channel to send new peer ids down to monitor for disconnect.
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
}
impl<N: NetworkZone> ClientPool<N> {
/// Returns a new [`ClientPool`] wrapped in an [`Arc`].
pub fn new() -> Arc<Self> {
let (tx, rx) = mpsc::unbounded_channel();
let pool = Arc::new(Self {
clients: DashMap::new(),
new_connection_tx: tx,
});
tokio::spawn(
disconnect_monitor::disconnect_monitor(rx, Arc::clone(&pool))
.instrument(Span::current()),
);
pool
}
/// Adds a [`Client`] to the pool, the client must have previously been taken from the
/// pool.
///
/// See [`ClientPool::add_new_client`] to add a [`Client`] which was not taken from the pool before.
///
/// # Panics
/// This function panics if `client` already exists in the pool.
fn add_client(&self, client: Client<N>) {
let handle = client.info.handle.clone();
let id = client.info.id;
// Fast path: if the client is disconnected don't add it to the peer set.
if handle.is_closed() {
return;
}
assert!(self.clients.insert(id, client).is_none());
// We have to check this again otherwise we could have a race condition where a
// peer is disconnected after the first check, the disconnect monitor tries to remove it,
// and then it is added to the pool.
if handle.is_closed() {
self.remove_client(&id);
}
}
/// Adds a _new_ [`Client`] to the pool, this client should be a new connection, and not already
/// from the pool.
///
/// # Panics
/// This function panics if `client` already exists in the pool.
pub fn add_new_client(&self, client: Client<N>) {
self.new_connection_tx
.send((client.info.handle.clone(), client.info.id))
.unwrap();
self.add_client(client);
}
/// Remove a [`Client`] from the pool.
///
/// [`None`] is returned if the client did not exist in the pool.
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
self.clients.remove(peer).map(|(_, client)| client)
}
/// Borrows a [`Client`] from the pool.
///
/// The [`Client`] is wrapped in [`ClientPoolDropGuard`] which
/// will return the client to the pool when it's dropped.
///
/// See [`Self::borrow_clients`] for borrowing multiple clients.
pub fn borrow_client(
self: &Arc<Self>,
peer: &InternalPeerID<N::Addr>,
) -> Option<ClientPoolDropGuard<N>> {
self.remove_client(peer).map(|client| ClientPoolDropGuard {
pool: Arc::clone(self),
client: Some(client),
})
}
/// Borrows multiple [`Client`]s from the pool.
///
/// Note that the returned iterator is not guaranteed to contain every peer asked for.
///
/// See [`Self::borrow_client`] for borrowing a single client.
pub fn borrow_clients<'a, 'b>(
self: &'a Arc<Self>,
peers: &'b [InternalPeerID<N::Addr>],
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + sealed::Captures<(&'a (), &'b ())> {
peers.iter().filter_map(|peer| self.borrow_client(peer))
}
/// Borrows all [`Client`]s from the pool that have claimed a higher cumulative difficulty than
/// the amount passed in.
///
/// The [`Client`]s are wrapped in [`ClientPoolDropGuard`] which
/// will return the clients to the pool when they are dropped.
pub fn clients_with_more_cumulative_difficulty(
self: &Arc<Self>,
cumulative_difficulty: u128,
) -> Vec<ClientPoolDropGuard<N>> {
let peers = self
.clients
.iter()
.filter_map(|element| {
let peer_sync_info = element.value().info.core_sync_data.lock().unwrap();
if peer_sync_info.cumulative_difficulty() > cumulative_difficulty {
Some(*element.key())
} else {
None
}
})
.collect::<Vec<_>>();
self.borrow_clients(&peers).collect()
}
/// Checks all clients in the pool checking if any claim a higher cumulative difficulty than the
/// amount specified.
pub fn contains_client_with_more_cumulative_difficulty(
&self,
cumulative_difficulty: u128,
) -> bool {
self.clients.iter().any(|element| {
let sync_data = element.value().info.core_sync_data.lock().unwrap();
sync_data.cumulative_difficulty() > cumulative_difficulty
})
}
/// Returns the first outbound peer when iterating over the peers.
pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
let client = self
.clients
.iter()
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
let id = *client.key();
Some(self.borrow_client(&id).unwrap())
}
}
mod sealed {
/// TODO: Remove me when 2024 Rust
///
/// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick>
pub trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}
}

View file

@ -1,83 +0,0 @@
//! # Disconnect Monitor
//!
//! This module contains the [`disconnect_monitor`] task, which monitors connected peers for disconnection
//! and then removes them from the [`ClientPool`] if they do.
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::mpsc;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tracing::instrument;
use cuprate_p2p_core::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
use super::ClientPool;
/// The disconnect monitor task.
#[instrument(level = "info", skip_all)]
pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>,
) {
// We need to hold a weak reference otherwise the client pool and this would hold a reference to
// each other causing the pool to be leaked.
let weak_client_pool = Arc::downgrade(&client_pool);
drop(client_pool);
tracing::info!("Starting peer disconnect monitor.");
let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
loop {
tokio::select! {
Some((con_handle, peer_id)) = new_connection_rx.recv() => {
tracing::debug!("Monitoring {peer_id} for disconnect");
futs.push(PeerDisconnectFut {
closed_fut: con_handle.closed(),
peer_id: Some(peer_id),
});
}
Some(peer_id) = futs.next() => {
tracing::debug!("{peer_id} has disconnected, removing from client pool.");
let Some(pool) = weak_client_pool.upgrade() else {
tracing::info!("Peer disconnect monitor shutting down.");
return;
};
pool.remove_client(&peer_id);
drop(pool);
}
else => {
tracing::info!("Peer disconnect monitor shutting down.");
return;
}
}
}
}
/// A [`Future`] that resolves when a peer disconnects.
#[pin_project::pin_project]
pub(crate) struct PeerDisconnectFut<N: NetworkZone> {
/// The inner [`Future`] that resolves when a peer disconnects.
#[pin]
pub(crate) closed_fut: WaitForCancellationFutureOwned,
/// The peers ID.
pub(crate) peer_id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.closed_fut
.poll(cx)
.map(|()| this.peer_id.take().unwrap())
}
}

View file

@ -1,41 +0,0 @@
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use cuprate_p2p_core::{client::Client, NetworkZone};
use crate::client_pool::ClientPool;
/// A wrapper around [`Client`] which returns the client to the [`ClientPool`] when dropped.
pub struct ClientPoolDropGuard<N: NetworkZone> {
/// The [`ClientPool`] to return the peer to.
pub(super) pool: Arc<ClientPool<N>>,
/// The [`Client`].
///
/// This is set to [`Some`] when this guard is created, then
/// [`take`](Option::take)n and returned to the pool when dropped.
pub(super) client: Option<Client<N>>,
}
impl<N: NetworkZone> Deref for ClientPoolDropGuard<N> {
type Target = Client<N>;
fn deref(&self) -> &Self::Target {
self.client.as_ref().unwrap()
}
}
impl<N: NetworkZone> DerefMut for ClientPoolDropGuard<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.client.as_mut().unwrap()
}
}
impl<N: NetworkZone> Drop for ClientPoolDropGuard<N> {
fn drop(&mut self) {
let client = self.client.take().unwrap();
self.pool.add_client(client);
}
}

View file

@ -21,7 +21,6 @@ use cuprate_p2p_core::{
}; };
use crate::{ use crate::{
client_pool::ClientPool,
config::P2PConfig, config::P2PConfig,
constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT}, constants::{HANDSHAKE_TIMEOUT, MAX_SEED_CONNECTIONS, OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT},
}; };
@ -46,7 +45,7 @@ pub struct MakeConnectionRequest {
/// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum. /// This handles maintaining a minimum number of connections and making extra connections when needed, upto a maximum.
pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> { pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
/// The pool of currently connected peers. /// The pool of currently connected peers.
pub client_pool: Arc<ClientPool<N>>, pub new_peers_tx: mpsc::Sender<Client<N>>,
/// The channel that tells us to make new _extra_ outbound connections. /// The channel that tells us to make new _extra_ outbound connections.
pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>, pub make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
/// The address book service /// The address book service
@ -77,7 +76,7 @@ where
{ {
pub fn new( pub fn new(
config: P2PConfig<N>, config: P2PConfig<N>,
client_pool: Arc<ClientPool<N>>, new_peers_tx: mpsc::Sender<Client<N>>,
make_connection_rx: mpsc::Receiver<MakeConnectionRequest>, make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
address_book_svc: A, address_book_svc: A,
connector_svc: C, connector_svc: C,
@ -86,7 +85,7 @@ where
.expect("Gray peer percent is incorrect should be 0..=1"); .expect("Gray peer percent is incorrect should be 0..=1");
Self { Self {
client_pool, new_peers_tx,
make_connection_rx, make_connection_rx,
address_book_svc, address_book_svc,
connector_svc, connector_svc,
@ -149,7 +148,7 @@ where
/// Connects to a given outbound peer. /// Connects to a given outbound peer.
#[instrument(level = "info", skip_all)] #[instrument(level = "info", skip_all)]
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
let client_pool = Arc::clone(&self.client_pool); let new_peers_tx = self.new_peers_tx.clone();
let connection_fut = self let connection_fut = self
.connector_svc .connector_svc
.ready() .ready()
@ -164,7 +163,7 @@ where
async move { async move {
#[expect(clippy::significant_drop_in_scrutinee)] #[expect(clippy::significant_drop_in_scrutinee)]
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer); drop(new_peers_tx.send(peer).await);
} }
} }
.instrument(Span::current()), .instrument(Span::current()),

View file

@ -6,7 +6,7 @@ use std::{pin::pin, sync::Arc};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::{ use tokio::{
sync::Semaphore, sync::{mpsc, Semaphore},
task::JoinSet, task::JoinSet,
time::{sleep, timeout}, time::{sleep, timeout},
}; };
@ -24,7 +24,6 @@ use cuprate_wire::{
}; };
use crate::{ use crate::{
client_pool::ClientPool,
constants::{ constants::{
HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY, HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN, PING_REQUEST_CONCURRENCY,
PING_REQUEST_TIMEOUT, PING_REQUEST_TIMEOUT,
@ -36,7 +35,7 @@ use crate::{
/// and initiate handshake if needed, after verifying the address isn't banned. /// and initiate handshake if needed, after verifying the address isn't banned.
#[instrument(level = "warn", skip_all)] #[instrument(level = "warn", skip_all)]
pub async fn inbound_server<N, HS, A>( pub async fn inbound_server<N, HS, A>(
client_pool: Arc<ClientPool<N>>, new_connection_tx: mpsc::Sender<Client<N>>,
mut handshaker: HS, mut handshaker: HS,
mut address_book: A, mut address_book: A,
config: P2PConfig<N>, config: P2PConfig<N>,
@ -111,13 +110,13 @@ where
permit: Some(permit), permit: Some(permit),
}); });
let cloned_pool = Arc::clone(&client_pool); let new_connection_tx = new_connection_tx.clone();
tokio::spawn( tokio::spawn(
async move { async move {
let client = timeout(HANDSHAKE_TIMEOUT, fut).await; let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
if let Ok(Ok(peer)) = client { if let Ok(Ok(peer)) = client {
cloned_pool.add_new_client(peer); drop(new_connection_tx.send(peer).await);
} }
} }
.instrument(Span::current()), .instrument(Span::current()),

View file

@ -18,17 +18,18 @@ use cuprate_p2p_core::{
pub mod block_downloader; pub mod block_downloader;
mod broadcast; mod broadcast;
pub mod client_pool;
pub mod config; pub mod config;
pub mod connection_maintainer; pub mod connection_maintainer;
pub mod constants; pub mod constants;
mod inbound_server; mod inbound_server;
mod peer_set;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}; use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc}; pub use broadcast::{BroadcastRequest, BroadcastSvc};
pub use client_pool::{ClientPool, ClientPoolDropGuard};
pub use config::{AddressBookConfig, P2PConfig}; pub use config::{AddressBookConfig, P2PConfig};
use connection_maintainer::MakeConnectionRequest; use connection_maintainer::MakeConnectionRequest;
use peer_set::PeerSet;
pub use peer_set::{ClientDropGuard, PeerSetRequest, PeerSetResponse};
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
/// ///
@ -54,7 +55,10 @@ where
cuprate_address_book::init_address_book(config.address_book_config.clone()).await?; cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new( let address_book = Buffer::new(
address_book, address_book,
config.max_inbound_connections + config.outbound_connections, config
.max_inbound_connections
.checked_add(config.outbound_connections)
.unwrap(),
); );
// Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
@ -83,19 +87,25 @@ where
let outbound_handshaker = outbound_handshaker_builder.build(); let outbound_handshaker = outbound_handshaker_builder.build();
let client_pool = ClientPool::new(); let (new_connection_tx, new_connection_rx) = mpsc::channel(
config
.outbound_connections
.checked_add(config.max_inbound_connections)
.unwrap(),
);
let (make_connection_tx, make_connection_rx) = mpsc::channel(3); let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
let outbound_connector = Connector::new(outbound_handshaker); let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(), config.clone(),
Arc::clone(&client_pool), new_connection_tx.clone(),
make_connection_rx, make_connection_rx,
address_book.clone(), address_book.clone(),
outbound_connector, outbound_connector,
); );
let peer_set = PeerSet::new(new_connection_rx);
let mut background_tasks = JoinSet::new(); let mut background_tasks = JoinSet::new();
background_tasks.spawn( background_tasks.spawn(
@ -105,7 +115,7 @@ where
); );
background_tasks.spawn( background_tasks.spawn(
inbound_server::inbound_server( inbound_server::inbound_server(
Arc::clone(&client_pool), new_connection_tx,
inbound_handshaker, inbound_handshaker,
address_book.clone(), address_book.clone(),
config, config,
@ -121,7 +131,7 @@ where
); );
Ok(NetworkInterface { Ok(NetworkInterface {
pool: client_pool, peer_set: Buffer::new(peer_set, 10).boxed_clone(),
broadcast_svc, broadcast_svc,
make_connection_tx, make_connection_tx,
address_book: address_book.boxed_clone(), address_book: address_book.boxed_clone(),
@ -133,7 +143,7 @@ where
#[derive(Clone)] #[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> { pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers. /// A pool of free connected peers.
pool: Arc<ClientPool<N>>, peer_set: BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError>,
/// A [`Service`] that allows broadcasting to all connected peers. /// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>, broadcast_svc: BroadcastSvc<N>,
/// A channel to request extra connections. /// A channel to request extra connections.
@ -163,7 +173,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
+ 'static, + 'static,
C::Future: Send + 'static, C::Future: Send + 'static,
{ {
block_downloader::download_blocks(Arc::clone(&self.pool), our_chain_service, config) block_downloader::download_blocks(self.peer_set.clone(), our_chain_service, config)
} }
/// Returns the address book service. /// Returns the address book service.
@ -173,8 +183,10 @@ impl<N: NetworkZone> NetworkInterface<N> {
self.address_book.clone() self.address_book.clone()
} }
/// Borrows the `ClientPool`, for access to connected peers. /// Borrows the `PeerSet`, for access to connected peers.
pub const fn client_pool(&self) -> &Arc<ClientPool<N>> { pub fn peer_set(
&self.pool &mut self,
) -> &mut BoxCloneService<PeerSetRequest, PeerSetResponse<N>, tower::BoxError> {
&mut self.peer_set
} }
} }

217
p2p/p2p/src/peer_set.rs Normal file
View file

@ -0,0 +1,217 @@
use std::{
future::{ready, Future, Ready},
pin::{pin, Pin},
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
use rand::{seq::index::sample, thread_rng};
use tokio::sync::mpsc::Receiver;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tower::Service;
use cuprate_helper::cast::u64_to_usize;
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ConnectionDirection, NetworkZone,
};
mod client_wrappers;
pub use client_wrappers::ClientDropGuard;
use client_wrappers::StoredClient;
/// A request to the peer-set.
pub enum PeerSetRequest {
/// The most claimed proof-of-work from a peer in the peer-set.
MostPoWSeen,
/// Peers with more cumulative difficulty than the given cumulative difficulty.
///
/// Returned peers will be remembered and won't be returned from subsequent calls until the guard is dropped.
PeersWithMorePoW(u128),
/// A random outbound peer.
///
/// The returned peer will be remembered and won't be returned from subsequent calls until the guard is dropped.
StemPeer,
}
/// A response from the peer-set.
pub enum PeerSetResponse<N: NetworkZone> {
/// [`PeerSetRequest::MostPoWSeen`]
MostPoWSeen {
/// The cumulative difficulty claimed.
cumulative_difficulty: u128,
/// The height claimed.
height: usize,
/// The claimed hash of the top block.
top_hash: [u8; 32],
},
/// [`PeerSetRequest::PeersWithMorePoW`]
///
/// Returned peers will be remembered and won't be returned from subsequent calls until the guard is dropped.
PeersWithMorePoW(Vec<ClientDropGuard<N>>),
/// [`PeerSetRequest::StemPeer`]
///
/// The returned peer will be remembered and won't be returned from subsequent calls until the guard is dropped.
StemPeer(Option<ClientDropGuard<N>>),
}
/// A [`Future`] that completes when a peer disconnects.
#[pin_project::pin_project]
struct ClosedConnectionFuture<N: NetworkZone> {
#[pin]
fut: WaitForCancellationFutureOwned,
id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for ClosedConnectionFuture<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.fut.poll(cx).map(|()| this.id.take().unwrap())
}
}
/// A collection of all connected peers on a [`NetworkZone`].
pub(crate) struct PeerSet<N: NetworkZone> {
/// The connected peers.
peers: IndexMap<InternalPeerID<N::Addr>, StoredClient<N>>,
/// A [`FuturesUnordered`] that resolves when a peer disconnects.
closed_connections: FuturesUnordered<ClosedConnectionFuture<N>>,
/// The [`InternalPeerID`]s of all outbound peers.
outbound_peers: IndexSet<InternalPeerID<N::Addr>>,
/// A channel of new peers from the inbound server or outbound connector.
new_peers: Receiver<Client<N>>,
}
impl<N: NetworkZone> PeerSet<N> {
pub(crate) fn new(new_peers: Receiver<Client<N>>) -> Self {
Self {
peers: IndexMap::new(),
closed_connections: FuturesUnordered::new(),
outbound_peers: IndexSet::new(),
new_peers,
}
}
/// Polls the new peers channel for newly connected peers.
fn poll_new_peers(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(new_peer)) = self.new_peers.poll_recv(cx) {
if new_peer.info.direction == ConnectionDirection::Outbound {
self.outbound_peers.insert(new_peer.info.id);
}
self.closed_connections.push(ClosedConnectionFuture {
fut: new_peer.info.handle.closed(),
id: Some(new_peer.info.id),
});
self.peers
.insert(new_peer.info.id, StoredClient::new(new_peer));
}
}
/// Remove disconnected peers from the peer set.
fn remove_dead_peers(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(dead_peer)) = self.closed_connections.poll_next_unpin(cx) {
let Some(peer) = self.peers.swap_remove(&dead_peer) else {
continue;
};
if peer.client.info.direction == ConnectionDirection::Outbound {
self.outbound_peers.swap_remove(&peer.client.info.id);
}
self.peers.swap_remove(&dead_peer);
}
}
/// [`PeerSetRequest::MostPoWSeen`]
fn most_pow_seen(&self) -> PeerSetResponse<N> {
let most_pow_chain = self
.peers
.values()
.map(|peer| {
let core_sync_data = peer.client.info.core_sync_data.lock().unwrap();
(
core_sync_data.cumulative_difficulty(),
u64_to_usize(core_sync_data.current_height),
core_sync_data.top_id,
)
})
.max_by_key(|(cumulative_difficulty, ..)| *cumulative_difficulty)
.unwrap_or_default();
PeerSetResponse::MostPoWSeen {
cumulative_difficulty: most_pow_chain.0,
height: most_pow_chain.1,
top_hash: most_pow_chain.2,
}
}
/// [`PeerSetRequest::PeersWithMorePoW`]
fn peers_with_more_pow(&self, cumulative_difficulty: u128) -> PeerSetResponse<N> {
PeerSetResponse::PeersWithMorePoW(
self.peers
.values()
.filter(|&client| {
!client.is_downloading_blocks()
&& client
.client
.info
.core_sync_data
.lock()
.unwrap()
.cumulative_difficulty()
> cumulative_difficulty
})
.map(StoredClient::downloading_blocks_guard)
.collect(),
)
}
/// [`PeerSetRequest::StemPeer`]
fn random_peer_for_stem(&self) -> PeerSetResponse<N> {
PeerSetResponse::StemPeer(
sample(
&mut thread_rng(),
self.outbound_peers.len(),
self.outbound_peers.len(),
)
.into_iter()
.find_map(|i| {
let peer = self.outbound_peers.get_index(i).unwrap();
let client = self.peers.get(peer).unwrap();
(!client.is_a_stem_peer()).then(|| client.stem_peer_guard())
}),
)
}
}
impl<N: NetworkZone> Service<PeerSetRequest> for PeerSet<N> {
type Response = PeerSetResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_new_peers(cx);
self.remove_dead_peers(cx);
// TODO: should we return `Pending` if we don't have any peers?
Poll::Ready(Ok(()))
}
fn call(&mut self, req: PeerSetRequest) -> Self::Future {
ready(match req {
PeerSetRequest::MostPoWSeen => Ok(self.most_pow_seen()),
PeerSetRequest::PeersWithMorePoW(cumulative_difficulty) => {
Ok(self.peers_with_more_pow(cumulative_difficulty))
}
PeerSetRequest::StemPeer => Ok(self.random_peer_for_stem()),
})
}
}

View file

@ -0,0 +1,86 @@
use std::{
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use cuprate_p2p_core::{
client::{Client, WeakClient},
NetworkZone,
};
/// A client stored in the peer-set.
pub(super) struct StoredClient<N: NetworkZone> {
pub client: Client<N>,
/// An [`AtomicBool`] for if the peer is currently downloading blocks.
downloading_blocks: Arc<AtomicBool>,
/// An [`AtomicBool`] for if the peer is currently being used to stem txs.
stem_peer: Arc<AtomicBool>,
}
impl<N: NetworkZone> StoredClient<N> {
pub(super) fn new(client: Client<N>) -> Self {
Self {
client,
downloading_blocks: Arc::new(AtomicBool::new(false)),
stem_peer: Arc::new(AtomicBool::new(false)),
}
}
/// Returns [`true`] if the [`StoredClient`] is currently downloading blocks.
pub(super) fn is_downloading_blocks(&self) -> bool {
self.downloading_blocks.load(Ordering::Relaxed)
}
/// Returns [`true`] if the [`StoredClient`] is currently being used to stem txs.
pub(super) fn is_a_stem_peer(&self) -> bool {
self.stem_peer.load(Ordering::Relaxed)
}
/// Returns a [`ClientDropGuard`] that while it is alive keeps the [`StoredClient`] in the downloading blocks state.
pub(super) fn downloading_blocks_guard(&self) -> ClientDropGuard<N> {
self.downloading_blocks.store(true, Ordering::Relaxed);
ClientDropGuard {
client: self.client.downgrade(),
bool: Arc::clone(&self.downloading_blocks),
}
}
/// Returns a [`ClientDropGuard`] that while it is alive keeps the [`StoredClient`] in the stemming peers state.
pub(super) fn stem_peer_guard(&self) -> ClientDropGuard<N> {
self.stem_peer.store(true, Ordering::Relaxed);
ClientDropGuard {
client: self.client.downgrade(),
bool: Arc::clone(&self.stem_peer),
}
}
}
/// A [`Drop`] guard for a client returned from the peer-set.
pub struct ClientDropGuard<N: NetworkZone> {
client: WeakClient<N>,
bool: Arc<AtomicBool>,
}
impl<N: NetworkZone> Deref for ClientDropGuard<N> {
type Target = WeakClient<N>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl<N: NetworkZone> DerefMut for ClientDropGuard<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
impl<N: NetworkZone> Drop for ClientDropGuard<N> {
fn drop(&mut self) {
self.bool.store(false, Ordering::Relaxed);
}
}