unify PeerDisconnectFut and add more docs

This commit is contained in:
Boog900 2024-05-18 13:52:21 +01:00
parent f4ef14ca18
commit 41970d748a
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
3 changed files with 15 additions and 29 deletions

View file

@ -21,7 +21,7 @@ use monero_p2p::{
ConnectionDirection, NetworkZone, ConnectionDirection, NetworkZone,
}; };
mod disconnect_monitor; pub(crate) mod disconnect_monitor;
mod drop_guard_client; mod drop_guard_client;
pub use drop_guard_client::ClientPoolDropGuard; pub use drop_guard_client::ClientPoolDropGuard;

View file

@ -51,12 +51,12 @@ pub async fn disconnect_monitor<N: NetworkZone>(
/// A [`Future`] that resolves when a peer disconnects. /// A [`Future`] that resolves when a peer disconnects.
#[pin_project::pin_project] #[pin_project::pin_project]
struct PeerDisconnectFut<N: NetworkZone> { pub(crate) struct PeerDisconnectFut<N: NetworkZone> {
/// The inner [`Future`] that resolves when a peer disconnects. /// The inner [`Future`] that resolves when a peer disconnects.
#[pin] #[pin]
closed_fut: WaitForCancellationFutureOwned, pub(crate) closed_fut: WaitForCancellationFutureOwned,
/// The peers ID. /// The peers ID.
peer_id: Option<InternalPeerID<N::Addr>>, pub(crate) peer_id: Option<InternalPeerID<N::Addr>>,
} }
impl<N: NetworkZone> Future for PeerDisconnectFut<N> { impl<N: NetworkZone> Future for PeerDisconnectFut<N> {

View file

@ -1,14 +1,16 @@
//! # Sync States
//!
//! This module contains a [`PeerSyncSvc`] which keeps track of connected peers claimed chain states,
//! to allow checking if we are behind and getting a list of peers who claim they are ahead.
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
future::{ready, Future, Ready}, future::{ready, Ready},
pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::watch; use tokio::sync::watch;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tower::Service; use tower::Service;
use monero_p2p::{ use monero_p2p::{
@ -20,7 +22,7 @@ use monero_p2p::{
use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT}; use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
use monero_wire::CoreSyncData; use monero_wire::CoreSyncData;
use crate::constants::SHORT_BAN; use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN};
/// The highest claimed sync info from our connected peers. /// The highest claimed sync info from our connected peers.
#[derive(Debug)] #[derive(Debug)]
@ -77,6 +79,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
) )
} }
/// This function checks if any peers have disconnected, removing them if they have.
fn poll_disconnected(&mut self, cx: &mut Context<'_>) { fn poll_disconnected(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) { while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) {
tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service."); tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service.");
@ -183,11 +186,13 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
.or_default() .or_default()
.insert(peer_id); .insert(peer_id);
// If the claimed cumulative difficulty is higher than the current one in the watcher
// or if the peer in the watch has disconnected, update it.
if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty
|| !self || self
.last_peer_in_watcher_handle .last_peer_in_watcher_handle
.as_ref() .as_ref()
.is_some_and(|handle| !handle.is_closed()) .is_some_and(|handle| handle.is_closed())
{ {
tracing::debug!( tracing::debug!(
"Updating sync watcher channel with new highest seen cumulative difficulty." "Updating sync watcher channel with new highest seen cumulative difficulty."
@ -233,25 +238,6 @@ impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
} }
} }
#[pin_project::pin_project]
struct PeerDisconnectFut<N: NetworkZone> {
#[pin]
closed_fut: WaitForCancellationFutureOwned,
peer_id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.closed_fut
.poll(cx)
.map(|_| this.peer_id.take().unwrap())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;