From 20f9a063ea183dc7cd38bab3486a802c8fc580b6 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Wed, 22 May 2024 21:43:45 +0100 Subject: [PATCH] fix merge + add some docs --- .../src/client_pool/disconnect_monitor.rs | 4 +- .../src/client_pool/drop_guard_client.rs | 2 +- p2p/cuprate-p2p/src/config.rs | 7 +- p2p/cuprate-p2p/src/constants.rs | 8 +- p2p/cuprate-p2p/src/inbound_server.rs | 24 ++-- p2p/cuprate-p2p/src/lib.rs | 116 +++++++++++++++++- p2p/monero-p2p/src/client.rs | 4 +- 7 files changed, 148 insertions(+), 17 deletions(-) diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs index 782e88cf..977e46e8 100644 --- a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -24,8 +24,8 @@ pub async fn disconnect_monitor<N: NetworkZone>( mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>, client_pool: Arc<ClientPool<N>>, ) { - // We need to hold a weak reference otherwise the client pool and this would be a circular reference - // which means the pool would be leaked. + // We need to hold a weak reference otherwise the client pool and this would hold a reference to each + // other causing the pool to be leaked. let weak_client_pool = Arc::downgrade(&client_pool); drop(client_pool); diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs index 5555d713..d8c20c6e 100644 --- a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -14,7 +14,7 @@ pub struct ClientPoolDropGuard<N: NetworkZone> { /// The [`Client`]. /// /// This is set to [`Some`] when this guard is created, then - /// ### [`take`](Option::take)n and returned to the pool when dropped. + /// [`take`](Option::take)n and returned to the pool when dropped. pub(super) client: Option<Client<N>>, } diff --git a/p2p/cuprate-p2p/src/config.rs b/p2p/cuprate-p2p/src/config.rs index bae2a663..336c9eaf 100644 --- a/p2p/cuprate-p2p/src/config.rs +++ b/p2p/cuprate-p2p/src/config.rs @@ -1,12 +1,12 @@ use cuprate_helper::network::Network; use monero_address_book::AddressBookConfig; use monero_p2p::NetworkZone; -use monero_wire::common::PeerSupportFlags; -use monero_wire::BasicNodeData; +use monero_wire::{common::PeerSupportFlags, BasicNodeData}; /// P2P config. #[derive(Clone, Debug)] pub struct P2PConfig<N: NetworkZone> { + /// The [`Network`] we should connect to. pub network: Network, /// The number of outbound connections to make and try keep. @@ -29,6 +29,7 @@ pub struct P2PConfig<N: NetworkZone> { /// The public RPC port to tell peers about so wallets can use our node. `0` if we do not have a public RPC port. pub rpc_port: u16, + /// The [`AddressBookConfig`]. pub address_book_config: AddressBookConfig, } @@ -36,7 +37,7 @@ impl<N: NetworkZone> P2PConfig<N> { /// Returns the [`BasicNodeData`] for this [`P2PConfig`]. /// /// [`BasicNodeData::peer_id`] is set to a random u64, so this function should only be called once - /// per [`NetworkZone`]. + /// per [`NetworkZone`] per run. pub(crate) fn basic_node_data(&self) -> BasicNodeData { BasicNodeData { my_port: self.p2p_port as u32, diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 8ec02759..0c65386b 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -1,7 +1,7 @@ use std::time::Duration; /// The timeout we set on handshakes. -pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); +pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20); /// The maximum amount of connections to make to seed nodes for when we need peers. pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; @@ -28,6 +28,12 @@ pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10 * 1024 * 1024; /// 50 more transactions after it are added to the queue. pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; +/// The time to sleep after an inbound connection comes in. +/// +/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections. +/// TODO: it might be a good idea to make this configurable. +pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500); + #[cfg(test)] mod tests { use super::*; diff --git a/p2p/cuprate-p2p/src/inbound_server.rs b/p2p/cuprate-p2p/src/inbound_server.rs index cccdf371..b56903a9 100644 --- a/p2p/cuprate-p2p/src/inbound_server.rs +++ b/p2p/cuprate-p2p/src/inbound_server.rs @@ -5,19 +5,26 @@ use std::{pin::pin, sync::Arc}; use futures::StreamExt; +use tokio::{ + sync::Semaphore, + time::{sleep, timeout}, +}; +use tower::{Service, ServiceExt}; +use tracing::{instrument, Instrument, Span}; + use monero_p2p::{ client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, ConnectionDirection, NetworkZone, }; -use tokio::time::sleep; -use tokio::{sync::Semaphore, time::timeout}; -use tower::{Service, ServiceExt}; -use tracing::{instrument, Instrument, Span}; -use crate::constants::INBOUND_CONNECTION_COOL_DOWN; -use crate::{client_pool::ClientPool, constants::HANDSHAKE_TIMEOUT, P2PConfig}; +use crate::{ + client_pool::ClientPool, + constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN}, + P2PConfig, +}; -#[instrument(level = "info", skip_all)] +/// The inbound server. +#[instrument(level = "warn", skip_all)] pub async fn inbound_server<N, HS>( client_pool: Arc<ClientPool<N>>, mut handshaker: HS, @@ -35,6 +42,9 @@ where return Ok(()); }; + // TODO: take in the address book and check if incoming peers are banned before adding them to our + // connections. + tracing::info!("Starting inbound connection server"); let listener = N::incoming_connection_listener(server_config, config.p2p_port) diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index afa4c93a..b694bc4c 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -7,11 +7,125 @@ //! #![allow(dead_code)] +use std::sync::Arc; +use tokio::sync::{mpsc, watch}; +use tower::buffer::Buffer; +use tracing::{instrument, Instrument, Span}; + +use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler}; + mod broadcast; -pub mod client_pool; +mod client_pool; pub mod config; pub mod connection_maintainer; mod constants; +mod inbound_server; mod sync_states; +use crate::connection_maintainer::MakeConnectionRequest; pub use config::P2PConfig; +use monero_p2p::client::Connector; + +/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. +/// +/// This function starts all the tasks to maintain connections/ accept connections/ make connections. +/// +/// To use you must provide, a peer request handler, which is given to each connection and a core sync service +/// which keeps track of the sync state of our node. +#[instrument(level="debug", name="net", skip_all, fields(zone=N::NAME))] +pub async fn initialize_network<N, R, CS>( + peer_req_handler: R, + core_sync_svc: CS, + config: P2PConfig<N>, +) -> Result<NetworkInterface<N>, tower::BoxError> +where + N: NetworkZone, + R: PeerRequestHandler + Clone, + CS: CoreSyncSvc + Clone, +{ + let address_book = + monero_address_book::init_address_book(config.address_book_config.clone()).await?; + let address_book = Buffer::new( + address_book, + config.max_inbound_connections + config.outbound_connections, + ); + + let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new(); + let sync_states_svc = Buffer::new( + sync_states_svc, + config.max_inbound_connections + config.outbound_connections, + ); + + // Use the default config. Changing the defaults affects tx fluff times, which could effect D++ so for now don't allow changing + // this. + let (broadcast_svc, outbound_mkr, inbound_mkr) = + broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default()); + + let mut basic_node_data = config.basic_node_data(); + + if !N::CHECK_NODE_ID { + // TODO: make sure this is the value monerod sets for anon networks. + basic_node_data.peer_id = 1; + } + + let outbound_handshaker = monero_p2p::client::HandShaker::new( + address_book.clone(), + sync_states_svc.clone(), + core_sync_svc.clone(), + peer_req_handler.clone(), + outbound_mkr, + basic_node_data.clone(), + ); + + let inbound_handshaker = monero_p2p::client::HandShaker::new( + address_book.clone(), + sync_states_svc, + core_sync_svc.clone(), + peer_req_handler, + inbound_mkr, + basic_node_data, + ); + + let client_pool = client_pool::ClientPool::new(); + + let (make_connection_tx, make_connection_rx) = mpsc::channel(3); + + let outbound_connector = Connector::new(outbound_handshaker); + let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( + config.clone(), + client_pool.clone(), + make_connection_rx, + address_book.clone(), + outbound_connector, + ); + + tokio::spawn( + outbound_connection_maintainer + .run() + .instrument(Span::current()), + ); + tokio::spawn( + inbound_server::inbound_server(client_pool.clone(), inbound_handshaker, config) + .instrument(Span::current()), + ); + + Ok(NetworkInterface { + pool: client_pool, + broadcast_svc, + top_block_watch, + make_connection_tx, + }) +} + +/// The interface to Monero's P2P network on a certain [`NetworkZone`]. +pub struct NetworkInterface<N: NetworkZone> { + /// A pool of free connected peers. + pool: Arc<client_pool::ClientPool<N>>, + /// A [`Service`](tower::Service) that allows broadcasting to all connected peers. + broadcast_svc: broadcast::BroadcastSvc<N>, + /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info + /// on that claimed chain. + top_block_watch: watch::Receiver<sync_states::NewSyncInfo>, + /// A channel to request extra connections. + make_connection_tx: mpsc::Sender<MakeConnectionRequest>, +} diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs index 8aab306d..02deae51 100644 --- a/p2p/monero-p2p/src/client.rs +++ b/p2p/monero-p2p/src/client.rs @@ -29,13 +29,13 @@ pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; use monero_pruning::PruningSeed; /// An internal identifier for a given peer, will be their address if known -/// or a random u64 if not. +/// or a random u128 if not. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum InternalPeerID<A> { /// A known address. KnownAddr(A), /// An unknown address (probably an inbound anonymity network connection). - Unknown(u64), + Unknown(u128), } impl<A: Display> Display for InternalPeerID<A> {