From b510739701790f7b047be05801103b296d374b98 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 4 Jun 2024 17:19:25 +0000 Subject: [PATCH] P2P: Network init (#130) * p2p changes * clippy * a few more docs * init cuprate-p2p * remove some unrelated code and add some docs * start documenting client_pool.rs * add more docs * typo * fix docs * use JoinSet in connection maintainer * small changes * add peer sync state svc * add broadcast svc * add more docs * add some tests * add a test * fix merge * add another test * unify PeerDisconnectFut and add more docs * start network init * add an inbound connection server * remove crate doc for now * fix address book docs * fix leak in client pool * correct comment * fix merge + add some docs * fix doc * dandelion_tower -> dandelion-tower * fix async-buffer builds * check if incoming peers are banned * add interface methods * update docs * use a JoinSet for background network tasks * Apply suggestions from code review Co-authored-by: hinto-janai * Update p2p/monero-p2p/src/services.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- Cargo.toml | 1 + p2p/address-book/Cargo.toml | 2 +- p2p/address-book/src/book.rs | 3 + p2p/address-book/src/lib.rs | 10 +- p2p/async-buffer/src/lib.rs | 2 +- p2p/cuprate-p2p/Cargo.toml | 6 +- p2p/cuprate-p2p/src/broadcast.rs | 1 + p2p/cuprate-p2p/src/client_pool.rs | 22 +-- .../src/client_pool/disconnect_monitor.rs | 13 +- .../src/client_pool/drop_guard_client.rs | 2 +- p2p/cuprate-p2p/src/config.rs | 42 +++- p2p/cuprate-p2p/src/connection_maintainer.rs | 19 +- p2p/cuprate-p2p/src/constants.rs | 8 +- p2p/cuprate-p2p/src/inbound_server.rs | 113 +++++++++++ p2p/cuprate-p2p/src/lib.rs | 183 +++++++++++++++++- p2p/cuprate-p2p/src/sync_states.rs | 8 +- p2p/dandelion/Cargo.toml | 2 +- p2p/monero-p2p/src/client.rs | 4 +- p2p/monero-p2p/src/lib.rs | 9 +- p2p/monero-p2p/src/network_zones/clear.rs | 6 +- p2p/monero-p2p/src/services.rs | 4 + p2p/monero-p2p/tests/fragmented_handshake.rs | 7 +- p2p/monero-p2p/tests/handshake.rs | 4 +- test-utils/src/test_netzone.rs | 10 +- 24 files changed, 415 insertions(+), 66 deletions(-) create mode 100644 p2p/cuprate-p2p/src/inbound_server.rs diff --git a/Cargo.toml b/Cargo.toml index d07f7bf4..4b9bd637 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "p2p/cuprate-p2p", "p2p/dandelion", "p2p/monero-p2p", + "p2p/async-buffer", "p2p/address-book", "storage/cuprate-blockchain", "storage/cuprate-txpool", diff --git a/p2p/address-book/Cargo.toml b/p2p/address-book/Cargo.toml index e60ec611..9b24c022 100644 --- a/p2p/address-book/Cargo.toml +++ b/p2p/address-book/Cargo.toml @@ -11,7 +11,7 @@ monero-pruning = { path = "../../pruning" } monero-wire = { path= "../../net/monero-wire" } monero-p2p = { path = "../monero-p2p" } -tower = { workspace = true, features = ["util", "buffer"] } +tower = { workspace = true, features = ["util"] } tokio = { workspace = true, features = ["time", "fs", "rt"]} tokio-util = { workspace = true, features = ["time"] } diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs index 3a49c6be..2f0617e9 100644 --- a/p2p/address-book/src/book.rs +++ b/p2p/address-book/src/book.rs @@ -409,6 +409,9 @@ impl Service> for AddressBook { AddressBookRequest::GetWhitePeers(len) => { Ok(AddressBookResponse::Peers(self.get_white_peers(len))) } + AddressBookRequest::IsPeerBanned(addr) => Ok(AddressBookResponse::IsPeerBanned( + self.is_peer_banned(&addr), + )), }; ready(response) diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index a3dc0543..51f83ddc 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -2,7 +2,7 @@ //! //! This module holds the logic for persistent peer storage. //! Cuprates address book is modeled as a [`tower::Service`] -//! The request is [`AddressBookRequest`] and the response is +//! The request is [`AddressBookRequest`](monero_p2p::services::AddressBookRequest) and the response is //! [`AddressBookResponse`](monero_p2p::services::AddressBookResponse). //! //! Cuprate, like monerod, actually has multiple address books, one @@ -13,9 +13,7 @@ //! use std::{io::ErrorKind, path::PathBuf, time::Duration}; -use tower::buffer::Buffer; - -use monero_p2p::{services::AddressBookRequest, NetworkZone}; +use monero_p2p::NetworkZone; mod book; mod peer_list; @@ -65,7 +63,7 @@ pub enum AddressBookError { /// Initializes the P2P address book for a specific network zone. pub async fn init_address_book( cfg: AddressBookConfig, -) -> Result, AddressBookRequest>, std::io::Error> { +) -> Result, std::io::Error> { tracing::info!( "Loading peers from file: {} ", cfg.peer_store_file.display() @@ -82,5 +80,5 @@ pub async fn init_address_book( let address_book = book::AddressBook::::new(cfg, white_list, gray_list, Vec::new()); - Ok(Buffer::new(address_book, 150)) + Ok(address_book) } diff --git a/p2p/async-buffer/src/lib.rs b/p2p/async-buffer/src/lib.rs index ded8c6a9..0e2ced24 100644 --- a/p2p/async-buffer/src/lib.rs +++ b/p2p/async-buffer/src/lib.rs @@ -48,7 +48,7 @@ pub fn new_buffer(max_item_weight: usize) -> (BufferAppender, BufferStream queue: tx, sink_waker: sink_waker.clone(), capacity: capacity_atomic.clone(), - max_item_weight: capacity, + max_item_weight, }, BufferStream { queue: rx, diff --git a/p2p/cuprate-p2p/Cargo.toml b/p2p/cuprate-p2p/Cargo.toml index d73684af..687493a0 100644 --- a/p2p/cuprate-p2p/Cargo.toml +++ b/p2p/cuprate-p2p/Cargo.toml @@ -15,8 +15,8 @@ cuprate-helper = { path = "../../helper", features = ["asynch"] } monero-serai = { workspace = true, features = ["std"] } -tower = { workspace = true } -tokio = { workspace = true, features = ["rt"] } +tower = { workspace = true, features = ["buffer"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } rayon = { workspace = true } tokio-util = { workspace = true } tokio-stream = { workspace = true, features = ["sync", "time"] } @@ -32,5 +32,7 @@ rand_distr = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] } tracing = { workspace = true, features = ["std", "attributes"] } +tracing-subscriber = "0.3.18" + [dev-dependencies] cuprate-test-utils = { path = "../../test-utils" } diff --git a/p2p/cuprate-p2p/src/broadcast.rs b/p2p/cuprate-p2p/src/broadcast.rs index cc8a3fd6..b6e5e807 100644 --- a/p2p/cuprate-p2p/src/broadcast.rs +++ b/p2p/cuprate-p2p/src/broadcast.rs @@ -151,6 +151,7 @@ pub enum BroadcastRequest { }, } +#[derive(Clone)] pub struct BroadcastSvc { new_block_watch: watch::Sender, tx_broadcast_channel_outbound: broadcast::Sender>, diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs index 5dc7d1b9..8b77f423 100644 --- a/p2p/cuprate-p2p/src/client_pool.rs +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -12,13 +12,14 @@ //! use std::sync::Arc; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use tokio::sync::mpsc; +use tracing::{Instrument, Span}; use monero_p2p::{ client::{Client, InternalPeerID}, handles::ConnectionHandle, - ConnectionDirection, NetworkZone, + NetworkZone, }; pub(crate) mod disconnect_monitor; @@ -32,12 +33,6 @@ pub use drop_guard_client::ClientPoolDropGuard; pub struct ClientPool { /// The connected [`Client`]s. clients: DashMap, Client>, - /// A set of outbound clients, as these allow accesses/mutation from different threads, - /// a peer ID in here does not mean the peer is necessarily in `clients` as it could have been removed - /// by another thread. However, if the peer is in both here and `clients` it is definitely - /// an outbound peer. - outbound_clients: DashSet>, - /// A channel to send new peer ids down to monitor for disconnect. new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID)>, } @@ -49,11 +44,12 @@ impl ClientPool { let pool = Arc::new(ClientPool { clients: DashMap::new(), - outbound_clients: DashSet::new(), new_connection_tx: tx, }); - tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone())); + tokio::spawn( + disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()), + ); pool } @@ -74,10 +70,6 @@ impl ClientPool { return; } - if client.info.direction == ConnectionDirection::OutBound { - self.outbound_clients.insert(id); - } - let res = self.clients.insert(id, client); assert!(res.is_none()); @@ -106,8 +98,6 @@ impl ClientPool { /// /// [`None`] is returned if the client did not exist in the pool. fn remove_client(&self, peer: &InternalPeerID) -> Option> { - self.outbound_clients.remove(peer); - self.clients.remove(peer).map(|(_, client)| client) } diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs index 4e5ec081..e83fa325 100644 --- a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -24,6 +24,11 @@ pub async fn disconnect_monitor( mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID)>, client_pool: Arc>, ) { + // We need to hold a weak reference otherwise the client pool and this would hold a reference to + // each other causing the pool to be leaked. + let weak_client_pool = Arc::downgrade(&client_pool); + drop(client_pool); + tracing::info!("Starting peer disconnect monitor."); let mut futs: FuturesUnordered> = FuturesUnordered::new(); @@ -39,7 +44,13 @@ pub async fn disconnect_monitor( } Some(peer_id) = futs.next() => { tracing::debug!("{peer_id} has disconnected, removing from client pool."); - client_pool.remove_client(&peer_id); + let Some(pool) = weak_client_pool.upgrade() else { + tracing::info!("Peer disconnect monitor shutting down."); + return; + }; + + pool.remove_client(&peer_id); + drop(pool); } else => { tracing::info!("Peer disconnect monitor shutting down."); diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs index 5555d713..d8c20c6e 100644 --- a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -14,7 +14,7 @@ pub struct ClientPoolDropGuard { /// The [`Client`]. /// /// This is set to [`Some`] when this guard is created, then - /// ### [`take`](Option::take)n and returned to the pool when dropped. + /// [`take`](Option::take)n and returned to the pool when dropped. pub(super) client: Option>, } diff --git a/p2p/cuprate-p2p/src/config.rs b/p2p/cuprate-p2p/src/config.rs index 31b5ab12..a92ad9a2 100644 --- a/p2p/cuprate-p2p/src/config.rs +++ b/p2p/cuprate-p2p/src/config.rs @@ -1,12 +1,52 @@ +use cuprate_helper::network::Network; +use monero_address_book::AddressBookConfig; +use monero_p2p::NetworkZone; +use monero_wire::{common::PeerSupportFlags, BasicNodeData}; + /// P2P config. #[derive(Clone, Debug)] -pub struct P2PConfig { +pub struct P2PConfig { + /// The [`Network`] we should connect to. + pub network: Network, + /// The number of outbound connections to make and try keep. pub outbound_connections: usize, /// The amount of extra connections we can make if we are under load from the rest of Cuprate. pub extra_outbound_connections: usize, + /// The maximum amount of inbound connections, only relevant if [`P2PConfig::server_config`] is set to [`Some`] + pub max_inbound_connections: usize, /// The percent of outbound peers that should be gray aka never connected to before. /// /// Only values 0..=1 are valid. pub gray_peers_percent: f64, + /// The inbound server configuration, + /// + /// If this is [`None`] no inbound connections will be accepted. + pub server_config: Option, + + /// The port to listen on for inbound connections, only relevant if [`P2PConfig::server_config`] is set to [`Some`]. + pub p2p_port: u16, + /// The public RPC port to tell peers about so wallets can use our node. `0` if we do not have a public RPC port. + pub rpc_port: u16, + + /// The [`AddressBookConfig`]. + pub address_book_config: AddressBookConfig, +} + +impl P2PConfig { + /// Returns the [`BasicNodeData`] for this [`P2PConfig`]. + /// + /// [`BasicNodeData::peer_id`] is set to a random u64, so this function should only be called once + /// per [`NetworkZone`] per run. + pub(crate) fn basic_node_data(&self) -> BasicNodeData { + BasicNodeData { + my_port: u32::from(self.p2p_port), + network_id: self.network.network_id(), + peer_id: rand::random(), + support_flags: PeerSupportFlags::FLUFFY_BLOCKS, + rpc_port: self.rpc_port, + // We do not (and probably will never) support paying for RPC with hashes. + rpc_credits_per_hash: 0, + } + } } diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs index bff4b9d5..4ec66950 100644 --- a/p2p/cuprate-p2p/src/connection_maintainer.rs +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -12,7 +12,7 @@ use tokio::{ time::{sleep, timeout}, }; use tower::{Service, ServiceExt}; -use tracing::instrument; +use tracing::{instrument, Instrument, Span}; use monero_p2p::{ client::{Client, ConnectRequest, HandshakeError}, @@ -60,7 +60,7 @@ pub struct OutboundConnectionKeeper { /// we add a permit to the semaphore and keep track here, upto a value in config. pub extra_peers: usize, /// The p2p config. - pub config: P2PConfig, + pub config: P2PConfig, /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or /// false if we should connect to a white peer. /// @@ -76,7 +76,7 @@ where C::Future: Send + 'static, { pub fn new( - config: P2PConfig, + config: P2PConfig, client_pool: Arc>, make_connection_rx: mpsc::Receiver, address_book_svc: A, @@ -149,7 +149,7 @@ where } /// Connects to a given outbound peer. - #[instrument(level = "info", skip(self, permit), fields(%addr))] + #[instrument(level = "info", skip_all)] async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { let client_pool = self.client_pool.clone(); let connection_fut = self @@ -159,11 +159,14 @@ where .expect("Connector had an error in `poll_ready`") .call(ConnectRequest { addr, permit }); - tokio::spawn(async move { - if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { - client_pool.add_new_client(peer); + tokio::spawn( + async move { + if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { + client_pool.add_new_client(peer); + } } - }); + .instrument(Span::current()), + ); } /// Handles a request from the peer set for more peers. diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 8ec02759..0c65386b 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -1,7 +1,7 @@ use std::time::Duration; /// The timeout we set on handshakes. -pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); +pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20); /// The maximum amount of connections to make to seed nodes for when we need peers. pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; @@ -28,6 +28,12 @@ pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10 * 1024 * 1024; /// 50 more transactions after it are added to the queue. pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; +/// The time to sleep after an inbound connection comes in. +/// +/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections. +/// TODO: it might be a good idea to make this configurable. +pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500); + #[cfg(test)] mod tests { use super::*; diff --git a/p2p/cuprate-p2p/src/inbound_server.rs b/p2p/cuprate-p2p/src/inbound_server.rs new file mode 100644 index 00000000..d8389e79 --- /dev/null +++ b/p2p/cuprate-p2p/src/inbound_server.rs @@ -0,0 +1,113 @@ +//! # Inbound Server +//! +//! This module contains the inbound connection server, which listens for inbound connections, gives +//! them to the handshaker service and then adds them to the client pool. +use std::{pin::pin, sync::Arc}; + +use futures::StreamExt; +use tokio::{ + sync::Semaphore, + time::{sleep, timeout}, +}; +use tower::{Service, ServiceExt}; +use tracing::{instrument, Instrument, Span}; + +use monero_p2p::{ + client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, + services::{AddressBookRequest, AddressBookResponse}, + AddressBook, ConnectionDirection, NetworkZone, +}; + +use crate::{ + client_pool::ClientPool, + constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN}, + P2PConfig, +}; + +/// Starts the inbound server. +#[instrument(level = "warn", skip_all)] +pub async fn inbound_server( + client_pool: Arc>, + mut handshaker: HS, + mut address_book: A, + config: P2PConfig, +) -> Result<(), tower::BoxError> +where + N: NetworkZone, + HS: Service, Response = Client, Error = HandshakeError> + + Send + + 'static, + HS::Future: Send + 'static, + A: AddressBook, +{ + let Some(server_config) = config.server_config else { + tracing::warn!("No inbound server config provided, not listening for inbound connections."); + return Ok(()); + }; + + tracing::info!("Starting inbound connection server"); + + let listener = N::incoming_connection_listener(server_config, config.p2p_port) + .await + .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?; + + let mut listener = pin!(listener); + + let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections)); + + while let Some(connection) = listener.next().await { + let Ok((addr, peer_stream, peer_sink)) = connection else { + continue; + }; + + if let Some(addr) = &addr { + let AddressBookResponse::IsPeerBanned(banned) = address_book + .ready() + .await? + .call(AddressBookRequest::IsPeerBanned(*addr)) + .await? + else { + panic!("Address book returned incorrect response!"); + }; + + if banned { + continue; + } + } + + let addr = match addr { + Some(addr) => InternalPeerID::KnownAddr(addr), + None => InternalPeerID::Unknown(rand::random()), + }; + + if let Ok(permit) = semaphore.clone().try_acquire_owned() { + tracing::debug!("Permit free for incoming connection, attempting handshake."); + + let fut = handshaker.ready().await?.call(DoHandshakeRequest { + addr, + peer_stream, + peer_sink, + direction: ConnectionDirection::InBound, + permit, + }); + + let cloned_pool = client_pool.clone(); + + tokio::spawn( + async move { + if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await { + cloned_pool.add_new_client(peer); + } + } + .instrument(Span::current()), + ); + } else { + tracing::debug!("No permit free for incoming connection."); + // TODO: listen for if the peer is just trying to ping us to see if we are reachable. + } + + sleep(INBOUND_CONNECTION_COOL_DOWN).await; + } + + Ok(()) +} diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index afa4c93a..37ea32a3 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -1,17 +1,186 @@ //! Cuprate's P2P Crate. //! -//! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). -//! -//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, -//! or send to a single peer. -//! -#![allow(dead_code)] +//! This crate contains a [`NetworkInterface`] which allows interacting with the Monero P2P network on +//! a certain [`NetworkZone`] +use std::sync::Arc; + +use futures::FutureExt; +use tokio::{ + sync::{mpsc, watch}, + task::JoinSet, +}; +use tokio_stream::wrappers::WatchStream; +use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt}; +use tracing::{instrument, Instrument, Span}; + +use monero_p2p::{ + client::Connector, + client::InternalPeerID, + services::{AddressBookRequest, AddressBookResponse}, + CoreSyncSvc, NetworkZone, PeerRequestHandler, +}; mod broadcast; -pub mod client_pool; +mod client_pool; pub mod config; pub mod connection_maintainer; mod constants; +mod inbound_server; mod sync_states; +pub use broadcast::{BroadcastRequest, BroadcastSvc}; +use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; +use connection_maintainer::MakeConnectionRequest; + +/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. +/// +/// This function starts all the tasks to maintain/accept/make connections. +/// +/// # Usage +/// You must provide: +/// - A peer request handler, which is given to each connection +/// - A core sync service, which keeps track of the sync state of our node +#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] +pub async fn initialize_network( + peer_req_handler: R, + core_sync_svc: CS, + config: P2PConfig, +) -> Result, tower::BoxError> +where + N: NetworkZone, + R: PeerRequestHandler + Clone, + CS: CoreSyncSvc + Clone, +{ + let address_book = + monero_address_book::init_address_book(config.address_book_config.clone()).await?; + let address_book = Buffer::new( + address_book, + config.max_inbound_connections + config.outbound_connections, + ); + + let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new(); + let sync_states_svc = Buffer::new( + sync_states_svc, + config.max_inbound_connections + config.outbound_connections, + ); + + // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing + // this. + let (broadcast_svc, outbound_mkr, inbound_mkr) = + broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default()); + + let mut basic_node_data = config.basic_node_data(); + + if !N::CHECK_NODE_ID { + basic_node_data.peer_id = 1; + } + + let outbound_handshaker = monero_p2p::client::HandShaker::new( + address_book.clone(), + sync_states_svc.clone(), + core_sync_svc.clone(), + peer_req_handler.clone(), + outbound_mkr, + basic_node_data.clone(), + ); + + let inbound_handshaker = monero_p2p::client::HandShaker::new( + address_book.clone(), + sync_states_svc, + core_sync_svc.clone(), + peer_req_handler, + inbound_mkr, + basic_node_data, + ); + + let client_pool = client_pool::ClientPool::new(); + + let (make_connection_tx, make_connection_rx) = mpsc::channel(3); + + let outbound_connector = Connector::new(outbound_handshaker); + let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( + config.clone(), + client_pool.clone(), + make_connection_rx, + address_book.clone(), + outbound_connector, + ); + + let mut background_tasks = JoinSet::new(); + + background_tasks.spawn( + outbound_connection_maintainer + .run() + .instrument(Span::current()), + ); + background_tasks.spawn( + inbound_server::inbound_server( + client_pool.clone(), + inbound_handshaker, + address_book.clone(), + config, + ) + .map(|res| { + if let Err(e) = res { + tracing::error!("Error in inbound connection listener: {e}") + } + + tracing::info!("Inbound connection listener shutdown") + }) + .instrument(Span::current()), + ); + + Ok(NetworkInterface { + pool: client_pool, + broadcast_svc, + top_block_watch, + make_connection_tx, + address_book: address_book.boxed_clone(), + _background_tasks: Arc::new(background_tasks), + }) +} + +/// The interface to Monero's P2P network on a certain [`NetworkZone`]. +#[derive(Clone)] +pub struct NetworkInterface { + /// A pool of free connected peers. + pool: Arc>, + /// A [`Service`] that allows broadcasting to all connected peers. + broadcast_svc: BroadcastSvc, + /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info + /// on that claimed chain. + top_block_watch: watch::Receiver, + /// A channel to request extra connections. + #[allow(dead_code)] // will be used eventually + make_connection_tx: mpsc::Sender, + /// The address book service. + address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, + /// Background tasks that will be aborted when this interface is dropped. + _background_tasks: Arc>, +} + +impl NetworkInterface { + /// Returns a service which allows broadcasting messages to all the connected peers in a specific [`NetworkZone`]. + pub fn broadcast_svc(&self) -> BroadcastSvc { + self.broadcast_svc.clone() + } + + /// Returns a stream which yields the highest seen sync state from a connected peer. + pub fn top_sync_stream(&self) -> WatchStream { + WatchStream::from_changes(self.top_block_watch.clone()) + } + + /// Returns the address book service. + pub fn address_book( + &self, + ) -> BoxCloneService, AddressBookResponse, tower::BoxError> { + self.address_book.clone() + } + + /// Pulls a client from the client pool, returning it in a guard that will return it there when it's + /// dropped. + pub fn borrow_client(&self, peer: &InternalPeerID) -> Option> { + self.pool.borrow_client(peer) + } +} diff --git a/p2p/cuprate-p2p/src/sync_states.rs b/p2p/cuprate-p2p/src/sync_states.rs index 9b8b3bd2..127b8d7e 100644 --- a/p2p/cuprate-p2p/src/sync_states.rs +++ b/p2p/cuprate-p2p/src/sync_states.rs @@ -25,14 +25,14 @@ use monero_wire::CoreSyncData; use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN}; /// The highest claimed sync info from our connected peers. -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NewSyncInfo { /// The peers chain height. - chain_height: u64, + pub chain_height: u64, /// The peers top block's hash. - top_hash: [u8; 32], + pub top_hash: [u8; 32], /// The peers cumulative difficulty. - cumulative_difficulty: u128, + pub cumulative_difficulty: u128, } /// A service that keeps track of our peers blockchains. diff --git a/p2p/dandelion/Cargo.toml b/p2p/dandelion/Cargo.toml index a8a04691..e5d7e340 100644 --- a/p2p/dandelion/Cargo.toml +++ b/p2p/dandelion/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dandelion_tower" +name = "dandelion-tower" version = "0.1.0" edition = "2021" license = "MIT" diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs index 8aab306d..02deae51 100644 --- a/p2p/monero-p2p/src/client.rs +++ b/p2p/monero-p2p/src/client.rs @@ -29,13 +29,13 @@ pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; use monero_pruning::PruningSeed; /// An internal identifier for a given peer, will be their address if known -/// or a random u64 if not. +/// or a random u128 if not. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum InternalPeerID { /// A known address. KnownAddr(A), /// An unknown address (probably an inbound anonymity network connection). - Unknown(u64), + Unknown(u128), } impl Display for InternalPeerID { diff --git a/p2p/monero-p2p/src/lib.rs b/p2p/monero-p2p/src/lib.rs index 9c171320..13ecf4aa 100644 --- a/p2p/monero-p2p/src/lib.rs +++ b/p2p/monero-p2p/src/lib.rs @@ -130,11 +130,11 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { /// The sink (outgoing data) type for this network. type Sink: Sink, Error = BucketError> + Unpin + Send + 'static; /// The inbound connection listener for this network. - type Listener: Stream< - Item = Result<(Option, Self::Stream, Self::Sink), std::io::Error>, - >; + type Listener: Stream, Self::Stream, Self::Sink), std::io::Error>> + + Send + + 'static; /// Config used to start a server which listens for incoming connections. - type ServerCfg; + type ServerCfg: Clone + Debug + Send + 'static; async fn connect_to_peer( addr: Self::Addr, @@ -142,6 +142,7 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { async fn incoming_connection_listener( config: Self::ServerCfg, + port: u16, ) -> Result; } diff --git a/p2p/monero-p2p/src/network_zones/clear.rs b/p2p/monero-p2p/src/network_zones/clear.rs index 5141a069..c77f1333 100644 --- a/p2p/monero-p2p/src/network_zones/clear.rs +++ b/p2p/monero-p2p/src/network_zones/clear.rs @@ -37,8 +37,9 @@ impl NetZoneAddress for SocketAddr { } } +#[derive(Debug, Clone)] pub struct ClearNetServerCfg { - pub addr: SocketAddr, + pub ip: IpAddr, } #[derive(Clone, Copy)] @@ -80,8 +81,9 @@ impl NetworkZone for ClearNet { async fn incoming_connection_listener( config: Self::ServerCfg, + port: u16, ) -> Result { - let listener = TcpListener::bind(config.addr).await?; + let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?; Ok(InBoundStream { listener }) } } diff --git a/p2p/monero-p2p/src/services.rs b/p2p/monero-p2p/src/services.rs index e86e2776..a0ea2e7a 100644 --- a/p2p/monero-p2p/src/services.rs +++ b/p2p/monero-p2p/src/services.rs @@ -119,10 +119,14 @@ pub enum AddressBookRequest { TakeRandomPeer { height: Option }, /// Gets the specified number of white peers, or less if we don't have enough. GetWhitePeers(usize), + /// Checks if the given peer is banned. + IsPeerBanned(Z::Addr), } pub enum AddressBookResponse { Ok, Peer(ZoneSpecificPeerListEntryBase), Peers(Vec>), + /// Contains `true` if the peer is banned. + IsPeerBanned(bool), } diff --git a/p2p/monero-p2p/tests/fragmented_handshake.rs b/p2p/monero-p2p/tests/fragmented_handshake.rs index 60d490f8..e9833cf3 100644 --- a/p2p/monero-p2p/tests/fragmented_handshake.rs +++ b/p2p/monero-p2p/tests/fragmented_handshake.rs @@ -71,8 +71,9 @@ impl NetworkZone for FragNet { async fn incoming_connection_listener( config: Self::ServerCfg, + port: u16, ) -> Result { - let listener = TcpListener::bind(config.addr).await?; + let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?; Ok(InBoundStream { listener }) } } @@ -194,9 +195,9 @@ async fn fragmented_handshake_monerod_to_cuprate() { our_basic_node_data, ); - let addr = "127.0.0.1:18081".parse().unwrap(); + let ip = "127.0.0.1".parse().unwrap(); - let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { addr }) + let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081) .await .unwrap(); diff --git a/p2p/monero-p2p/tests/handshake.rs b/p2p/monero-p2p/tests/handshake.rs index 1d8b649c..b63a221b 100644 --- a/p2p/monero-p2p/tests/handshake.rs +++ b/p2p/monero-p2p/tests/handshake.rs @@ -174,9 +174,9 @@ async fn handshake_monerod_to_cuprate() { our_basic_node_data, ); - let addr = "127.0.0.1:18081".parse().unwrap(); + let ip = "127.0.0.1".parse().unwrap(); - let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { addr }) + let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081) .await .unwrap(); diff --git a/test-utils/src/test_netzone.rs b/test-utils/src/test_netzone.rs index 0a534164..e82e5532 100644 --- a/test-utils/src/test_netzone.rs +++ b/test-utils/src/test_netzone.rs @@ -87,8 +87,9 @@ impl, Self::Stream, Self::Sink), std::io::Error>, - >, + Item = Result<(Option, Self::Stream, Self::Sink), std::io::Error>, + > + Send + + 'static, >, >; type ServerCfg = (); @@ -97,7 +98,10 @@ impl Result { + async fn incoming_connection_listener( + _: Self::ServerCfg, + _: u16, + ) -> Result { unimplemented!() } }