From d84e8a48f86924dc18b97264c83ffa56012fa87b Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 30 May 2024 17:02:38 +0100 Subject: [PATCH] add interface methods --- p2p/cuprate-p2p/src/broadcast.rs | 1 + p2p/cuprate-p2p/src/client_pool.rs | 16 +------ .../src/client_pool/disconnect_monitor.rs | 4 +- p2p/cuprate-p2p/src/inbound_server.rs | 2 +- p2p/cuprate-p2p/src/lib.rs | 47 +++++++++++++++---- p2p/cuprate-p2p/src/sync_states.rs | 8 ++-- 6 files changed, 49 insertions(+), 29 deletions(-) diff --git a/p2p/cuprate-p2p/src/broadcast.rs b/p2p/cuprate-p2p/src/broadcast.rs index cc8a3fd6..b6e5e807 100644 --- a/p2p/cuprate-p2p/src/broadcast.rs +++ b/p2p/cuprate-p2p/src/broadcast.rs @@ -151,6 +151,7 @@ pub enum BroadcastRequest<N: NetworkZone> { }, } +#[derive(Clone)] pub struct BroadcastSvc<N: NetworkZone> { new_block_watch: watch::Sender<NewBlockInfo>, tx_broadcast_channel_outbound: broadcast::Sender<BroadcastTxInfo<N>>, diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs index ed547dde..8b77f423 100644 --- a/p2p/cuprate-p2p/src/client_pool.rs +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -12,14 +12,14 @@ //! use std::sync::Arc; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use tokio::sync::mpsc; use tracing::{Instrument, Span}; use monero_p2p::{ client::{Client, InternalPeerID}, handles::ConnectionHandle, - ConnectionDirection, NetworkZone, + NetworkZone, }; pub(crate) mod disconnect_monitor; @@ -33,11 +33,6 @@ pub use drop_guard_client::ClientPoolDropGuard; pub struct ClientPool<N: NetworkZone> { /// The connected [`Client`]s. clients: DashMap<InternalPeerID<N::Addr>, Client<N>>, - /// A set of outbound clients, as these allow accesses/mutation from different threads, - /// a peer ID in here does not mean the peer is necessarily in `clients` as it could have been removed - /// by another thread. However, if the peer is in both here and `clients` it is definitely - /// an outbound peer. - outbound_clients: DashSet<InternalPeerID<N::Addr>>, /// A channel to send new peer ids down to monitor for disconnect. new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>, } @@ -49,7 +44,6 @@ impl<N: NetworkZone> ClientPool<N> { let pool = Arc::new(ClientPool { clients: DashMap::new(), - outbound_clients: DashSet::new(), new_connection_tx: tx, }); @@ -76,10 +70,6 @@ impl<N: NetworkZone> ClientPool<N> { return; } - if client.info.direction == ConnectionDirection::OutBound { - self.outbound_clients.insert(id); - } - let res = self.clients.insert(id, client); assert!(res.is_none()); @@ -108,8 +98,6 @@ impl<N: NetworkZone> ClientPool<N> { /// /// [`None`] is returned if the client did not exist in the pool. fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> { - self.outbound_clients.remove(peer); - self.clients.remove(peer).map(|(_, client)| client) } diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs index 977e46e8..e83fa325 100644 --- a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -24,8 +24,8 @@ pub async fn disconnect_monitor<N: NetworkZone>( mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>, client_pool: Arc<ClientPool<N>>, ) { - // We need to hold a weak reference otherwise the client pool and this would hold a reference to each - // other causing the pool to be leaked. + // We need to hold a weak reference otherwise the client pool and this would hold a reference to + // each other causing the pool to be leaked. let weak_client_pool = Arc::downgrade(&client_pool); drop(client_pool); diff --git a/p2p/cuprate-p2p/src/inbound_server.rs b/p2p/cuprate-p2p/src/inbound_server.rs index df15515e..0eb4772c 100644 --- a/p2p/cuprate-p2p/src/inbound_server.rs +++ b/p2p/cuprate-p2p/src/inbound_server.rs @@ -12,9 +12,9 @@ use tokio::{ use tower::{Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; -use monero_p2p::services::{AddressBookRequest, AddressBookResponse}; use monero_p2p::{ client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, + services::{AddressBookRequest, AddressBookResponse}, AddressBook, ConnectionDirection, NetworkZone, }; diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index 500eeb09..b244c57a 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -5,14 +5,20 @@ //! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, //! or send to a single peer. //! -#![allow(dead_code)] use std::sync::Arc; + use tokio::sync::{mpsc, watch}; -use tower::buffer::Buffer; +use tokio_stream::wrappers::WatchStream; +use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt}; use tracing::{instrument, Instrument, Span}; -use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler}; +use monero_p2p::{ + client::Connector, + client::InternalPeerID, + services::{AddressBookRequest, AddressBookResponse}, + CoreSyncSvc, NetworkZone, PeerRequestHandler, +}; mod broadcast; mod client_pool; @@ -22,9 +28,10 @@ mod constants; mod inbound_server; mod sync_states; -use crate::connection_maintainer::MakeConnectionRequest; +pub use broadcast::{BroadcastRequest, BroadcastSvc}; +use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; -use monero_p2p::client::Connector; +use connection_maintainer::MakeConnectionRequest; /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// @@ -108,7 +115,7 @@ where inbound_server::inbound_server( client_pool.clone(), inbound_handshaker, - address_book, + address_book.clone(), config, ) .instrument(Span::current()), @@ -119,6 +126,7 @@ where broadcast_svc, top_block_watch, make_connection_tx, + address_book: address_book.boxed_clone(), }) } @@ -126,11 +134,34 @@ where pub struct NetworkInterface<N: NetworkZone> { /// A pool of free connected peers. pool: Arc<client_pool::ClientPool<N>>, - /// A [`Service`](tower::Service) that allows broadcasting to all connected peers. - broadcast_svc: broadcast::BroadcastSvc<N>, + /// A [`Service`] that allows broadcasting to all connected peers. + broadcast_svc: BroadcastSvc<N>, /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info /// on that claimed chain. top_block_watch: watch::Receiver<sync_states::NewSyncInfo>, /// A channel to request extra connections. + #[allow(dead_code)] // will be used eventually make_connection_tx: mpsc::Sender<MakeConnectionRequest>, + /// The address book service. + address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>, +} + +impl<N: NetworkZone> NetworkInterface<N> { + pub fn broadcast_svc(&self) -> BroadcastSvc<N> { + self.broadcast_svc.clone() + } + + pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> { + WatchStream::from_changes(self.top_block_watch.clone()) + } + + pub fn address_book( + &self, + ) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> { + self.address_book.clone() + } + + pub fn borrow_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<ClientPoolDropGuard<N>> { + self.pool.borrow_client(peer) + } } diff --git a/p2p/cuprate-p2p/src/sync_states.rs b/p2p/cuprate-p2p/src/sync_states.rs index 9b8b3bd2..2321692a 100644 --- a/p2p/cuprate-p2p/src/sync_states.rs +++ b/p2p/cuprate-p2p/src/sync_states.rs @@ -25,14 +25,14 @@ use monero_wire::CoreSyncData; use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN}; /// The highest claimed sync info from our connected peers. -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub struct NewSyncInfo { /// The peers chain height. - chain_height: u64, + pub chain_height: u64, /// The peers top block's hash. - top_hash: [u8; 32], + pub top_hash: [u8; 32], /// The peers cumulative difficulty. - cumulative_difficulty: u128, + pub cumulative_difficulty: u128, } /// A service that keeps track of our peers blockchains.