From 41970d748ae4c063354cb141a4df57f940077d3f Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 18 May 2024 13:52:21 +0100 Subject: [PATCH] unify PeerDisconnectFut and add more docs --- p2p/cuprate-p2p/src/client_pool.rs | 2 +- .../src/client_pool/disconnect_monitor.rs | 6 ++-- p2p/cuprate-p2p/src/sync_states.rs | 36 ++++++------------- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs index 15969392..5dc7d1b9 100644 --- a/p2p/cuprate-p2p/src/client_pool.rs +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -21,7 +21,7 @@ use monero_p2p::{ ConnectionDirection, NetworkZone, }; -mod disconnect_monitor; +pub(crate) mod disconnect_monitor; mod drop_guard_client; pub use drop_guard_client::ClientPoolDropGuard; diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs index 01297352..4e5ec081 100644 --- a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -51,12 +51,12 @@ pub async fn disconnect_monitor<N: NetworkZone>( /// A [`Future`] that resolves when a peer disconnects. #[pin_project::pin_project] -struct PeerDisconnectFut<N: NetworkZone> { +pub(crate) struct PeerDisconnectFut<N: NetworkZone> { /// The inner [`Future`] that resolves when a peer disconnects. #[pin] - closed_fut: WaitForCancellationFutureOwned, + pub(crate) closed_fut: WaitForCancellationFutureOwned, /// The peers ID. - peer_id: Option<InternalPeerID<N::Addr>>, + pub(crate) peer_id: Option<InternalPeerID<N::Addr>>, } impl<N: NetworkZone> Future for PeerDisconnectFut<N> { diff --git a/p2p/cuprate-p2p/src/sync_states.rs b/p2p/cuprate-p2p/src/sync_states.rs index 1a44c291..39aabf66 100644 --- a/p2p/cuprate-p2p/src/sync_states.rs +++ b/p2p/cuprate-p2p/src/sync_states.rs @@ -1,14 +1,16 @@ +//! # Sync States +//! +//! This module contains a [`PeerSyncSvc`] which keeps track of connected peers claimed chain states, +//! to allow checking if we are behind and getting a list of peers who claim they are ahead. use std::{ cmp::Ordering, collections::{BTreeMap, HashMap, HashSet}, - future::{ready, Future, Ready}, - pin::Pin, + future::{ready, Ready}, task::{Context, Poll}, }; use futures::{stream::FuturesUnordered, StreamExt}; use tokio::sync::watch; -use tokio_util::sync::WaitForCancellationFutureOwned; use tower::Service; use monero_p2p::{ @@ -20,7 +22,7 @@ use monero_p2p::{ use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT}; use monero_wire::CoreSyncData; -use crate::constants::SHORT_BAN; +use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN}; /// The highest claimed sync info from our connected peers. #[derive(Debug)] @@ -77,6 +79,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> { ) } + /// This function checks if any peers have disconnected, removing them if they have. fn poll_disconnected(&mut self, cx: &mut Context<'_>) { while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) { tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service."); @@ -183,11 +186,13 @@ impl<N: NetworkZone> PeerSyncSvc<N> { .or_default() .insert(peer_id); + // If the claimed cumulative difficulty is higher than the current one in the watcher + // or if the peer in the watch has disconnected, update it. if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty - || !self + || self .last_peer_in_watcher_handle .as_ref() - .is_some_and(|handle| !handle.is_closed()) + .is_some_and(|handle| handle.is_closed()) { tracing::debug!( "Updating sync watcher channel with new highest seen cumulative difficulty." @@ -233,25 +238,6 @@ impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> { } } -#[pin_project::pin_project] -struct PeerDisconnectFut<N: NetworkZone> { - #[pin] - closed_fut: WaitForCancellationFutureOwned, - peer_id: Option<InternalPeerID<N::Addr>>, -} - -impl<N: NetworkZone> Future for PeerDisconnectFut<N> { - type Output = InternalPeerID<N::Addr>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let this = self.project(); - - this.closed_fut - .poll(cx) - .map(|_| this.peer_id.take().unwrap()) - } -} - #[cfg(test)] mod tests { use std::sync::Arc;