add interface methods

This commit is contained in:
Boog900 2024-05-30 17:02:38 +01:00
parent fd7225e673
commit d84e8a48f8
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
6 changed files with 49 additions and 29 deletions

View file

@ -151,6 +151,7 @@ pub enum BroadcastRequest<N: NetworkZone> {
},
}
#[derive(Clone)]
pub struct BroadcastSvc<N: NetworkZone> {
new_block_watch: watch::Sender<NewBlockInfo>,
tx_broadcast_channel_outbound: broadcast::Sender<BroadcastTxInfo<N>>,

View file

@ -12,14 +12,14 @@
//!
use std::sync::Arc;
use dashmap::{DashMap, DashSet};
use dashmap::DashMap;
use tokio::sync::mpsc;
use tracing::{Instrument, Span};
use monero_p2p::{
client::{Client, InternalPeerID},
handles::ConnectionHandle,
ConnectionDirection, NetworkZone,
NetworkZone,
};
pub(crate) mod disconnect_monitor;
@ -33,11 +33,6 @@ pub use drop_guard_client::ClientPoolDropGuard;
pub struct ClientPool<N: NetworkZone> {
/// The connected [`Client`]s.
clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
/// A set of outbound clients, as these allow accesses/mutation from different threads,
/// a peer ID in here does not mean the peer is necessarily in `clients` as it could have been removed
/// by another thread. However, if the peer is in both here and `clients` it is definitely
/// an outbound peer.
outbound_clients: DashSet<InternalPeerID<N::Addr>>,
/// A channel to send new peer ids down to monitor for disconnect.
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
}
@ -49,7 +44,6 @@ impl<N: NetworkZone> ClientPool<N> {
let pool = Arc::new(ClientPool {
clients: DashMap::new(),
outbound_clients: DashSet::new(),
new_connection_tx: tx,
});
@ -76,10 +70,6 @@ impl<N: NetworkZone> ClientPool<N> {
return;
}
if client.info.direction == ConnectionDirection::OutBound {
self.outbound_clients.insert(id);
}
let res = self.clients.insert(id, client);
assert!(res.is_none());
@ -108,8 +98,6 @@ impl<N: NetworkZone> ClientPool<N> {
///
/// [`None`] is returned if the client did not exist in the pool.
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
self.outbound_clients.remove(peer);
self.clients.remove(peer).map(|(_, client)| client)
}

View file

@ -24,8 +24,8 @@ pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>,
) {
// We need to hold a weak reference otherwise the client pool and this would hold a reference to each
// other causing the pool to be leaked.
// We need to hold a weak reference otherwise the client pool and this would hold a reference to
// each other causing the pool to be leaked.
let weak_client_pool = Arc::downgrade(&client_pool);
drop(client_pool);

View file

@ -12,9 +12,9 @@ use tokio::{
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use monero_p2p::services::{AddressBookRequest, AddressBookResponse};
use monero_p2p::{
client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
services::{AddressBookRequest, AddressBookResponse},
AddressBook, ConnectionDirection, NetworkZone,
};

View file

@ -5,14 +5,20 @@
//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all,
//! or send to a single peer.
//!
#![allow(dead_code)]
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tower::buffer::Buffer;
use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt};
use tracing::{instrument, Instrument, Span};
use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler};
use monero_p2p::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse},
CoreSyncSvc, NetworkZone, PeerRequestHandler,
};
mod broadcast;
mod client_pool;
@ -22,9 +28,10 @@ mod constants;
mod inbound_server;
mod sync_states;
use crate::connection_maintainer::MakeConnectionRequest;
pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig;
use monero_p2p::client::Connector;
use connection_maintainer::MakeConnectionRequest;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
///
@ -108,7 +115,7 @@ where
inbound_server::inbound_server(
client_pool.clone(),
inbound_handshaker,
address_book,
address_book.clone(),
config,
)
.instrument(Span::current()),
@ -119,6 +126,7 @@ where
broadcast_svc,
top_block_watch,
make_connection_tx,
address_book: address_book.boxed_clone(),
})
}
@ -126,11 +134,34 @@ where
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`](tower::Service) that allows broadcasting to all connected peers.
broadcast_svc: broadcast::BroadcastSvc<N>,
/// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
/// on that claimed chain.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections.
#[allow(dead_code)] // will be used eventually
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
}
impl<N: NetworkZone> NetworkInterface<N> {
pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
self.broadcast_svc.clone()
}
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone())
}
pub fn address_book(
&self,
) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
self.address_book.clone()
}
pub fn borrow_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<ClientPoolDropGuard<N>> {
self.pool.borrow_client(peer)
}
}

View file

@ -25,14 +25,14 @@ use monero_wire::CoreSyncData;
use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN};
/// The highest claimed sync info from our connected peers.
#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct NewSyncInfo {
/// The peers chain height.
chain_height: u64,
pub chain_height: u64,
/// The peers top block's hash.
top_hash: [u8; 32],
pub top_hash: [u8; 32],
/// The peers cumulative difficulty.
cumulative_difficulty: u128,
pub cumulative_difficulty: u128,
}
/// A service that keeps track of our peers blockchains.