From cf0fcfb6c57e2a4157d9aa2b72fde6c2c54412d3 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Sun, 25 Feb 2024 13:42:27 +0000 Subject: [PATCH] p2p: remove old code (#74) --- p2p/Cargo.toml | 23 - p2p/LICENSE | 14 - p2p/src/address_book.rs | 157 ----- p2p/src/address_book/addr_book_client.rs | 166 ----- p2p/src/address_book/address_book.rs | 594 ----------------- .../address_book/address_book/peer_list.rs | 231 ------- .../address_book/peer_list/tests.rs | 176 ----- p2p/src/address_book/address_book/tests.rs | 81 --- p2p/src/address_book/connection_handle.rs | 110 --- p2p/src/config.rs | 78 --- p2p/src/connection_counter.rs | 125 ---- p2p/src/connection_handle.rs | 98 --- p2p/src/constants.rs | 58 -- p2p/src/lib.rs | 81 --- p2p/src/peer.rs | 16 - p2p/src/peer/client.rs | 176 ----- p2p/src/peer/connection.rs | 169 ----- p2p/src/peer/connector.rs | 159 ----- p2p/src/peer/error.rs | 116 ---- p2p/src/peer/handshaker.rs | 627 ------------------ p2p/src/peer/load_tracked_client.rs | 74 --- p2p/src/peer/tests.rs | 1 - p2p/src/peer/tests/handshake.rs | 1 - p2p/src/protocol.rs | 29 - p2p/src/protocol/internal_network.rs | 125 ---- p2p/src/protocol/internal_network/try_from.rs | 163 ----- 26 files changed, 3648 deletions(-) delete mode 100644 p2p/Cargo.toml delete mode 100644 p2p/LICENSE delete mode 100644 p2p/src/address_book.rs delete mode 100644 p2p/src/address_book/addr_book_client.rs delete mode 100644 p2p/src/address_book/address_book.rs delete mode 100644 p2p/src/address_book/address_book/peer_list.rs delete mode 100644 p2p/src/address_book/address_book/peer_list/tests.rs delete mode 100644 p2p/src/address_book/address_book/tests.rs delete mode 100644 p2p/src/address_book/connection_handle.rs delete mode 100644 p2p/src/config.rs delete mode 100644 p2p/src/connection_counter.rs delete mode 100644 p2p/src/connection_handle.rs delete mode 100644 p2p/src/constants.rs delete mode 100644 p2p/src/lib.rs delete mode 100644 p2p/src/peer.rs delete mode 100644 p2p/src/peer/client.rs delete mode 100644 p2p/src/peer/connection.rs delete mode 100644 p2p/src/peer/connector.rs delete mode 100644 p2p/src/peer/error.rs delete mode 100644 p2p/src/peer/handshaker.rs delete mode 100644 p2p/src/peer/load_tracked_client.rs delete mode 100644 p2p/src/peer/tests.rs delete mode 100644 p2p/src/peer/tests/handshake.rs delete mode 100644 p2p/src/protocol.rs delete mode 100644 p2p/src/protocol/internal_network.rs delete mode 100644 p2p/src/protocol/internal_network/try_from.rs diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml deleted file mode 100644 index b90ea4b..0000000 --- a/p2p/Cargo.toml +++ /dev/null @@ -1,23 +0,0 @@ -[package] -name = "cuprate-p2p" -version = "0.1.0" -edition = "2021" -license = "AGPL-3.0-only" -authors = ["Boog900"] - - -[dependencies] -chrono = "0.4.24" -thiserror = "1.0.39" -cuprate-common = {path = "../common"} -monero-wire = {path= "../net/monero-wire"} -futures = "0.3.26" -tower = {version = "0.4.13", features = ["util", "steer", "load", "discover", "load-shed", "buffer", "timeout"]} -tokio = {version= "1.27", features=["rt", "time", "net"]} -tokio-util = {version = "0.7.8", features=["codec"]} -tokio-stream = {version="0.1.14", features=["time"]} -async-trait = "0.1.68" -tracing = "0.1.37" -tracing-error = "0.2.0" -rand = "0.8.5" -pin-project = "1.0.12" diff --git a/p2p/LICENSE b/p2p/LICENSE deleted file mode 100644 index e19903e..0000000 --- a/p2p/LICENSE +++ /dev/null @@ -1,14 +0,0 @@ - Copyright (C) 2023 Cuprate Contributors - - This program is free software: you can redistribute it and/or modify 
- it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . \ No newline at end of file diff --git a/p2p/src/address_book.rs b/p2p/src/address_book.rs deleted file mode 100644 index 6865137..0000000 --- a/p2p/src/address_book.rs +++ /dev/null @@ -1,157 +0,0 @@ -//! Cuprate Address Book -//! -//! This module holds the logic for persistent peer storage. -//! Cuprates address book is modeled as a [`tower::Service`] -//! The request is [`AddressBookRequest`] and the response is -//! [`AddressBookResponse`]. -//! -//! Cuprate, like monerod, actually has 3 address books, one -//! for each [`NetZone`]. This is to reduce the possibility of -//! clear net peers getting linked to their dark counterparts -//! and so peers will only get told about peers they can -//! connect to. -//! - -mod addr_book_client; -mod address_book; -pub mod connection_handle; - -use cuprate_common::PruningSeed; -use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress, PeerID}; - -use connection_handle::ConnectionAddressBookHandle; - -pub use addr_book_client::start_address_book; - -/// Possible errors when dealing with the address book. -/// This is boxed when returning an error in the [`tower::Service`]. -#[derive(Debug, thiserror::Error)] -pub enum AddressBookError { - /// The peer is not in the address book for this zone. - #[error("Peer was not found in book")] - PeerNotFound, - /// The peer list is empty. - #[error("The peer list is empty")] - PeerListEmpty, - /// The peers pruning seed has changed. - #[error("The peers pruning seed has changed")] - PeersPruningSeedChanged, - /// The peer is banned. - #[error("The peer is banned")] - PeerIsBanned, - /// When handling a received peer list, the list contains - /// a peer in a different [`NetZone`] - #[error("Peer sent an address out of it's net-zone")] - PeerSentAnAddressOutOfZone, - /// The channel to the address book has closed unexpectedly. - #[error("The address books channel has closed.")] - AddressBooksChannelClosed, - /// The address book task has exited. - #[error("The address book task has exited.")] - AddressBookTaskExited, - /// The peer file store has failed. - #[error("Peer Store Error: {0}")] - PeerStoreError(&'static str), -} - -/// A message sent to tell the address book that a peer has disconnected. -pub struct PeerConnectionClosed; - -/// A request to the address book. -#[derive(Debug)] -pub enum AddressBookRequest { - /// A request to handle an incoming peer list. - HandleNewPeerList(Vec, NetZone), - /// Updates the `last_seen` timestamp of this peer. - SetPeerSeen(PeerID, chrono::NaiveDateTime, NetZone), - /// Bans a peer for the specified duration. This request - /// will send disconnect signals to all peers with the same - /// [`ban_identifier`](NetworkAddress::ban_identifier). - BanPeer(PeerID, std::time::Duration, NetZone), - /// Adds a peer to the connected list - ConnectedToPeer { - /// The net zone of this connection. - zone: NetZone, - /// A handle between the connection and address book. 
- connection_handle: ConnectionAddressBookHandle, - /// The connection addr, None if the peer is using a - /// hidden network. - addr: Option, - /// The peers id. - id: PeerID, - /// If the peer is reachable by our node. - reachable: bool, - /// The last seen timestamp, note: Cuprate may skip updating this - /// field on some inbound messages - last_seen: chrono::NaiveDateTime, - /// The peers pruning seed - pruning_seed: PruningSeed, - /// The peers port. - rpc_port: u16, - /// The peers rpc credits per hash - rpc_credits_per_hash: u32, - }, - - /// A request to get and eempty the anchor list, - /// used when starting the node. - GetAndEmptyAnchorList(NetZone), - /// Get a random Gray peer from the peer list - /// If a pruning seed is given we will select from - /// peers with that seed and peers that dont prune. - GetRandomGrayPeer(NetZone, Option), - /// Get a random White peer from the peer list - /// If a pruning seed is given we will select from - /// peers with that seed and peers that dont prune. - GetRandomWhitePeer(NetZone, Option), - /// Get a list of random peers from the white list, - /// The list will be less than or equal to the provided - /// len. - GetRandomWhitePeers(NetZone, usize), -} - -impl std::fmt::Display for AddressBookRequest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::HandleNewPeerList(..) => f.write_str("HandleNewPeerList"), - Self::SetPeerSeen(..) => f.write_str("SetPeerSeen"), - Self::BanPeer(..) => f.write_str("BanPeer"), - Self::ConnectedToPeer { .. } => f.write_str("ConnectedToPeer"), - - Self::GetAndEmptyAnchorList(_) => f.write_str("GetAndEmptyAnchorList"), - Self::GetRandomGrayPeer(..) => f.write_str("GetRandomGrayPeer"), - Self::GetRandomWhitePeer(..) => f.write_str("GetRandomWhitePeer"), - Self::GetRandomWhitePeers(_, len) => { - f.write_str(&format!("GetRandomWhitePeers, len: {len}")) - } - } - } -} - -impl AddressBookRequest { - /// Gets the [`NetZone`] for this request so we can - /// route it to the required address book. - pub fn get_zone(&self) -> NetZone { - match self { - Self::HandleNewPeerList(_, zone) => *zone, - Self::SetPeerSeen(.., zone) => *zone, - Self::BanPeer(.., zone) => *zone, - Self::ConnectedToPeer { zone, .. } => *zone, - - Self::GetAndEmptyAnchorList(zone) => *zone, - Self::GetRandomGrayPeer(zone, _) => *zone, - Self::GetRandomWhitePeer(zone, _) => *zone, - Self::GetRandomWhitePeers(zone, _) => *zone, - } - } -} - -/// A response from the AddressBook. -#[derive(Debug)] -pub enum AddressBookResponse { - /// The request was handled ok. - Ok, - /// A peer. - Peer(PeerListEntryBase), - /// A list of peers. - Peers(Vec), -} diff --git a/p2p/src/address_book/addr_book_client.rs b/p2p/src/address_book/addr_book_client.rs deleted file mode 100644 index f35d265..0000000 --- a/p2p/src/address_book/addr_book_client.rs +++ /dev/null @@ -1,166 +0,0 @@ -//! This module holds the address books client and [`tower::Service`]. -//! -//! To start the address book use [`start_address_book`]. -// TODO: Store banned peers persistently. 
-use std::future::Future; -use std::pin::Pin; -use std::task::Poll; - -use futures::channel::{mpsc, oneshot}; -use futures::FutureExt; -use tokio::task::{spawn, JoinHandle}; -use tower::steer::Steer; -use tower::BoxError; -use tracing::Instrument; - -use monero_wire::network_address::NetZone; - -use crate::{Config, P2PStore}; - -use super::address_book::{AddressBook, AddressBookClientRequest}; -use super::{AddressBookError, AddressBookRequest, AddressBookResponse}; - -/// Start the address book. -/// Under the hood this function spawns 3 address books -/// for the 3 [`NetZone`] and combines them into a [`tower::Steer`](Steer). -pub async fn start_address_book( - peer_store: S, - config: Config, -) -> Result< - impl tower::Service< - AddressBookRequest, - Response = AddressBookResponse, - Error = BoxError, - Future = Pin< - Box> + Send + 'static>, - >, - >, - BoxError, -> -where - S: P2PStore, -{ - let mut builder = AddressBookBuilder::new(peer_store, config); - - let public = builder.build(NetZone::Public).await?; - let tor = builder.build(NetZone::Tor).await?; - let i2p = builder.build(NetZone::I2p).await?; - - // This list MUST be in the same order as closuer in the `Steer` func - let books = vec![public, tor, i2p]; - - Ok(Steer::new( - books, - |req: &AddressBookRequest, _: &[_]| match req.get_zone() { - // This: - NetZone::Public => 0, - NetZone::Tor => 1, - NetZone::I2p => 2, - }, - )) -} - -/// An address book builder. -/// This: -/// - starts the address book -/// - creates and returns the `AddressBookClient` -struct AddressBookBuilder { - peer_store: S, - config: Config, -} - -impl AddressBookBuilder -where - S: P2PStore, -{ - fn new(peer_store: S, config: Config) -> Self { - AddressBookBuilder { peer_store, config } - } - - /// Builds the address book for a specific [`NetZone`] - async fn build(&mut self, zone: NetZone) -> Result { - let (white, gray, anchor) = self - .peer_store - .load_peers(zone) - .await - .map_err(|e| AddressBookError::PeerStoreError(e))?; - - let book = AddressBook::new( - self.config.clone(), - zone, - white, - gray, - anchor, - vec![], - self.peer_store.clone(), - ); - - let (tx, rx) = mpsc::channel(0); - - let book_span = tracing::info_span!("AddressBook", book = book.book_name()); - - let book_handle = spawn(book.run(rx).instrument(book_span)); - - Ok(AddressBookClient { - book: tx, - book_handle, - }) - } -} - -/// The Client for an individual address book. -#[derive(Debug)] -struct AddressBookClient { - /// The channel to pass requests to the address book. - book: mpsc::Sender, - /// The address book task handle. 
- book_handle: JoinHandle<()>, -} - -impl tower::Service for AddressBookClient { - type Response = AddressBookResponse; - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - // Check the channel - match self.book.poll_ready(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(_)) => { - return Poll::Ready(Err(AddressBookError::AddressBooksChannelClosed.into())) - } - } - - // Check the address book task is still running - match self.book_handle.poll_unpin(cx) { - // The address book is still running - Poll::Pending => Poll::Ready(Ok(())), - // The address book task has exited - Poll::Ready(_) => Err(AddressBookError::AddressBookTaskExited)?, - } - } - - fn call(&mut self, req: AddressBookRequest) -> Self::Future { - let (tx, rx) = oneshot::channel(); - // get the callers span - let span = tracing::debug_span!(parent: &tracing::span::Span::current(), "AddressBook"); - - let req = AddressBookClientRequest { req, tx, span }; - - match self.book.try_send(req) { - Err(_e) => { - // I'm assuming all callers will call `poll_ready` first (which they are supposed to) - futures::future::ready(Err(AddressBookError::AddressBooksChannelClosed.into())) - .boxed() - } - Ok(()) => async move { - rx.await - .expect("Address Book will not drop requests until completed") - .map_err(Into::into) - } - .boxed(), - } - } -} diff --git a/p2p/src/address_book/address_book.rs b/p2p/src/address_book/address_book.rs deleted file mode 100644 index 84d149b..0000000 --- a/p2p/src/address_book/address_book.rs +++ /dev/null @@ -1,594 +0,0 @@ -//! This module contains the actual address book logic. -//! -//! The address book is split into multiple [`PeerList`]: -//! -//! - A White list: For peers we have connected to ourselves. -//! -//! - A Gray list: For Peers we have been told about but -//! haven't connected to ourselves. -//! -//! - An Anchor list: This holds peers we are currently -//! connected to that are reachable if we were to -//! connect to them again. For example an inbound proxy -//! connection would not get added to this list as we cant -//! connect to this peer ourselves. Behind the scenes we -//! are just storing the key to a peer in the white list. -//! -use std::collections::{HashMap, HashSet}; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::stream::FuturesUnordered; -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, Stream, StreamExt, -}; -use pin_project::pin_project; -use rand::prelude::SliceRandom; - -use cuprate_common::shutdown::is_shutting_down; -use cuprate_common::PruningSeed; -use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress, PeerID}; - -use super::{AddressBookError, AddressBookRequest, AddressBookResponse}; -use crate::address_book::connection_handle::ConnectionAddressBookHandle; -use crate::{constants::ADDRESS_BOOK_SAVE_INTERVAL, Config, P2PStore}; - -mod peer_list; -use peer_list::PeerList; - -#[cfg(test)] -mod tests; - -/// A request sent to the address book task. -pub(crate) struct AddressBookClientRequest { - /// The request - pub req: AddressBookRequest, - /// A oneshot to send the result down - pub tx: oneshot::Sender>, - /// The tracing span to keep the context of the request - pub span: tracing::Span, -} - -/// An entry in the connected list. -pub struct ConnectionPeerEntry { - /// A oneshot sent from the Connection when it has finished. 
- connection_handle: ConnectionAddressBookHandle, - /// The connection addr, None if the peer is connected through - /// a hidden network. - addr: Option, - /// If the peer is reachable by our node. - reachable: bool, - /// The last seen timestamp, note: Cuprate may skip updating this - /// field on some inbound messages - last_seen: chrono::NaiveDateTime, - /// The peers pruning seed - pruning_seed: PruningSeed, - /// The peers port. - rpc_port: u16, - /// The peers rpc credits per hash - rpc_credits_per_hash: u32, -} - -/// A future that resolves when a peer is unbanned. -#[pin_project(project = EnumProj)] -pub struct BanedPeerFut(Vec, #[pin] tokio::time::Sleep); - -impl Future for BanedPeerFut { - type Output = Vec; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - match this.1.poll_unpin(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(this.0.clone()), - } - } -} - -/// The address book for a specific [`NetZone`] -pub struct AddressBook { - /// The [`NetZone`] of this address book. - zone: NetZone, - /// A copy of the nodes configuration. - config: Config, - /// The Address books white list. - white_list: PeerList, - /// The Address books gray list. - gray_list: PeerList, - /// The Address books anchor list. - anchor_list: HashSet, - /// The Currently connected peers. - connected_peers: HashMap, - /// A tuple of: - /// - A hashset of [`ban_identifier`](NetworkAddress::ban_identifier) - /// - A [`FuturesUnordered`] which contains futures for every ban_id - /// that will resolve when the ban_id should be un banned. - baned_peers: (HashSet>, FuturesUnordered), - /// The peer store to save the peers to persistent storage - p2p_store: PeerStore, -} - -impl AddressBook { - /// Creates a new address book for a given [`NetZone`] - pub fn new( - config: Config, - zone: NetZone, - white_peers: Vec, - gray_peers: Vec, - anchor_peers: Vec, - baned_peers: Vec<(NetworkAddress, chrono::NaiveDateTime)>, - p2p_store: PeerStore, - ) -> Self { - let white_list = PeerList::new(white_peers); - let gray_list = PeerList::new(gray_peers); - let anchor_list = HashSet::from_iter(anchor_peers); - let baned_peers = (HashSet::new(), FuturesUnordered::new()); - - let connected_peers = HashMap::new(); - - AddressBook { - zone, - config, - white_list, - gray_list, - anchor_list, - connected_peers, - baned_peers, - p2p_store, - } - } - - /// Returns the books name (Based on the [`NetZone`]) - pub const fn book_name(&self) -> &'static str { - match self.zone { - NetZone::Public => "PublicAddressBook", - NetZone::Tor => "TorAddressBook", - NetZone::I2p => "I2pAddressBook", - } - } - - /// Returns the length of the white list - fn len_white_list(&self) -> usize { - self.white_list.len() - } - - /// Returns the length of the gray list - fn len_gray_list(&self) -> usize { - self.gray_list.len() - } - - /// Returns the length of the anchor list - fn len_anchor_list(&self) -> usize { - self.anchor_list.len() - } - - /// Returns the length of the banned list - fn len_banned_list(&self) -> usize { - self.baned_peers.0.len() - } - - /// Returns the maximum length of the white list - /// *note this list can grow bigger if we are connected to more - /// than this amount. - fn max_white_peers(&self) -> usize { - self.config.max_white_peers() - } - - /// Returns the maximum length of the gray list - fn max_gray_peers(&self) -> usize { - self.config.max_gray_peers() - } - - /// Checks if a peer is banned. 
- fn is_peer_banned(&self, peer: &NetworkAddress) -> bool { - self.baned_peers.0.contains(&peer.ban_identifier()) - } - - /// Checks if banned peers should be unbanned as the duration has elapsed - fn check_unban_peers(&mut self) { - while let Some(Some(addr)) = Pin::new(&mut self.baned_peers.1).next().now_or_never() { - tracing::debug!("Unbanning peer: {addr:?}"); - self.baned_peers.0.remove(&addr); - } - } - - /// Checks if peers have disconnected, if they have removing them from the - /// connected and anchor list. - fn check_connected_peers(&mut self) { - let mut remove_from_anchor = vec![]; - // We dont have to worry about updating our white list with the information - // before we remove the peers as that happens on every save. - self.connected_peers.retain(|_, peer| { - if !peer.connection_handle.connection_closed() { - // add the peer to the list to get removed from the anchor - if let Some(addr) = peer.addr { - remove_from_anchor.push(addr) - } - false - } else { - true - } - }); - // If we are shutting down we want to keep our anchor peers for - // the next time we boot up so we dont remove disconnecting peers - // from the anchor list if we are shutting down. - if !is_shutting_down() { - for peer in remove_from_anchor { - self.anchor_list.remove(&peer); - } - } - } - - // Bans the peer and tells the connection tasks of peers with the same ban id to shutdown. - fn ban_peer( - &mut self, - peer: PeerID, - time: std::time::Duration, - ) -> Result<(), AddressBookError> { - tracing::debug!("Banning peer: {peer:?} for: {time:?}"); - - let Some(conn_entry) = self.connected_peers.get(&peer) else { - tracing::debug!("Peer is not in connected list"); - return Err(AddressBookError::PeerNotFound); - }; - // tell the connection task to finish. - conn_entry.connection_handle.kill_connection(); - // try find the NetworkAddress of the peer - let Some(addr) = conn_entry.addr else { - tracing::debug!("Peer does not have an address we can ban"); - return Ok(()); - }; - - let ban_id = addr.ban_identifier(); - - self.white_list.remove_peers_with_ban_id(&ban_id); - self.gray_list.remove_peers_with_ban_id(&ban_id); - // Dont remove from anchor list or connection list as this will happen when - // the connection is closed. - - // tell the connection task of peers with the same ban id to shutdown. - for conn in self.connected_peers.values() { - if let Some(addr) = conn.addr { - if addr.ban_identifier() == ban_id { - conn.connection_handle.kill_connection() - } - } - } - - // add the ban identifier to the ban list - self.baned_peers.0.insert(ban_id.clone()); - self.baned_peers - .1 - .push(BanedPeerFut(ban_id, tokio::time::sleep(time))); - Ok(()) - } - - /// Update the last seen timestamp of a connected peer. - fn update_last_seen( - &mut self, - peer: PeerID, - last_seen: chrono::NaiveDateTime, - ) -> Result<(), AddressBookError> { - if let Some(mut peer) = self.connected_peers.get_mut(&peer) { - peer.last_seen = last_seen; - Ok(()) - } else { - Err(AddressBookError::PeerNotFound) - } - } - - /// adds a peer to the gray list. - fn add_peer_to_gray_list(&mut self, mut peer: PeerListEntryBase) { - if self.white_list.contains_peer(&peer.adr) { - return; - }; - if !self.gray_list.contains_peer(&peer.adr) { - peer.last_seen = 0; - self.gray_list.add_new_peer(peer); - } - } - - /// handles an incoming peer list, - /// dose some basic validation on the addresses - /// appends the good peers to our book. 
- fn handle_new_peerlist( - &mut self, - mut peers: Vec, - ) -> Result<(), AddressBookError> { - let length = peers.len(); - - tracing::debug!("Received new peer list, length: {length}"); - - let mut err = None; - peers.retain(|peer| { - if err.is_some() { - false - } else if peer.adr.is_local() || peer.adr.is_loopback() { - false - } else if peer.adr.port() == peer.rpc_port { - false - } else if PruningSeed::try_from(peer.pruning_seed).is_err() { - false - } else if peer.adr.get_zone() != self.zone { - tracing::info!("Received an address from a different network zone, ignoring list."); - err = Some(AddressBookError::PeerSentAnAddressOutOfZone); - false - } else if self.is_peer_banned(&peer.adr) { - false - } else { - true - } - }); - - if let Some(e) = err { - return Err(e); - } else { - for peer in peers { - self.add_peer_to_gray_list(peer); - } - self.gray_list - .reduce_list(&HashSet::new(), self.max_gray_peers()); - Ok(()) - } - } - - /// Gets a random peer from our gray list. - /// If pruning seed is set we will get a peer with that pruning seed. - fn get_random_gray_peer( - &mut self, - pruning_seed: Option, - ) -> Option { - self.gray_list - .get_random_peer(&mut rand::thread_rng(), pruning_seed.map(Into::into)) - .map(|p| *p) - } - - /// Gets a random peer from our white list. - /// If pruning seed is set we will get a peer with that pruning seed. - fn get_random_white_peer( - &mut self, - pruning_seed: Option, - ) -> Option { - self.white_list - .get_random_peer(&mut rand::thread_rng(), pruning_seed.map(Into::into)) - .map(|p| *p) - } - - /// Gets random peers from our white list. - /// will be less than or equal to `len`. - fn get_random_white_peers(&mut self, len: usize) -> Vec { - let white_len = self.white_list.len(); - let len = if len < white_len { len } else { white_len }; - let mut white_peers: Vec<&PeerListEntryBase> = self.white_list.iter_all_peers().collect(); - white_peers.shuffle(&mut rand::thread_rng()); - white_peers[0..len].iter().map(|peb| **peb).collect() - } - - /// Updates an entry in the white list, if the peer is not found and `reachable` is true then - /// the peer will be added to the white list. - fn update_white_list_peer_entry( - &mut self, - addr: &NetworkAddress, - id: PeerID, - conn_entry: &ConnectionPeerEntry, - ) -> Result<(), AddressBookError> { - if let Some(peb) = self.white_list.get_peer_mut(addr) { - if peb.pruning_seed == conn_entry.pruning_seed.into() { - return Err(AddressBookError::PeersPruningSeedChanged); - } - peb.id = id; - peb.last_seen = conn_entry.last_seen.timestamp(); - peb.rpc_port = conn_entry.rpc_port; - peb.rpc_credits_per_hash = conn_entry.rpc_credits_per_hash; - peb.pruning_seed = conn_entry.pruning_seed.into(); - } else if conn_entry.reachable { - // if the peer is reachable add it to our white list - let peb = PeerListEntryBase { - id, - adr: *addr, - last_seen: conn_entry.last_seen.timestamp(), - rpc_port: conn_entry.rpc_port, - rpc_credits_per_hash: conn_entry.rpc_credits_per_hash, - pruning_seed: conn_entry.pruning_seed.into(), - }; - self.white_list.add_new_peer(peb); - } - Ok(()) - } - - /// Handles a new connection, adding it to the white list if the - /// peer is reachable by our node. 
- fn handle_new_connection( - &mut self, - connection_handle: ConnectionAddressBookHandle, - addr: Option, - id: PeerID, - reachable: bool, - last_seen: chrono::NaiveDateTime, - pruning_seed: PruningSeed, - rpc_port: u16, - rpc_credits_per_hash: u32, - ) -> Result<(), AddressBookError> { - let connection_entry = ConnectionPeerEntry { - connection_handle, - addr, - reachable, - last_seen, - pruning_seed, - rpc_port, - rpc_credits_per_hash, - }; - if let Some(addr) = addr { - if self.baned_peers.0.contains(&addr.ban_identifier()) { - return Err(AddressBookError::PeerIsBanned); - } - // remove the peer from the gray list as we know it's active. - let _ = self.gray_list.remove_peer(&addr); - if !reachable { - // If we can't reach the peer remove it from the white list as well - let _ = self.white_list.remove_peer(&addr); - } else { - // The peer is reachable, update our white list and add it to the anchor connections. - self.update_white_list_peer_entry(&addr, id, &connection_entry)?; - self.anchor_list.insert(addr); - } - } - - self.connected_peers.insert(id, connection_entry); - self.white_list - .reduce_list(&self.anchor_list, self.max_white_peers()); - Ok(()) - } - - /// Get and empties the anchor list, used at startup to - /// connect to some peers we were previously connected to. - fn get_and_empty_anchor_list(&mut self) -> Vec { - self.anchor_list - .drain() - .map(|addr| { - self.white_list - .get_peer(&addr) - .expect("If peer is in anchor it must be in white list") - .clone() - }) - .collect() - } - - /// Handles an [`AddressBookClientRequest`] to the address book. - async fn handle_request(&mut self, req: AddressBookClientRequest) { - let _guard = req.span.enter(); - - tracing::trace!("received request: {}", req.req); - - let res = match req.req { - AddressBookRequest::HandleNewPeerList(new_peers, _) => self - .handle_new_peerlist(new_peers) - .map(|_| AddressBookResponse::Ok), - AddressBookRequest::SetPeerSeen(peer, last_seen, _) => self - .update_last_seen(peer, last_seen) - .map(|_| AddressBookResponse::Ok), - AddressBookRequest::BanPeer(peer, time, _) => { - self.ban_peer(peer, time).map(|_| AddressBookResponse::Ok) - } - AddressBookRequest::ConnectedToPeer { - zone: _, - connection_handle, - addr, - id, - reachable, - last_seen, - pruning_seed, - rpc_port, - rpc_credits_per_hash, - } => self - .handle_new_connection( - connection_handle, - addr, - id, - reachable, - last_seen, - pruning_seed, - rpc_port, - rpc_credits_per_hash, - ) - .map(|_| AddressBookResponse::Ok), - - AddressBookRequest::GetAndEmptyAnchorList(_) => { - Ok(AddressBookResponse::Peers(self.get_and_empty_anchor_list())) - } - - AddressBookRequest::GetRandomGrayPeer(_, pruning_seed) => { - match self.get_random_gray_peer(pruning_seed) { - Some(peer) => Ok(AddressBookResponse::Peer(peer)), - None => Err(AddressBookError::PeerListEmpty), - } - } - AddressBookRequest::GetRandomWhitePeer(_, pruning_seed) => { - match self.get_random_white_peer(pruning_seed) { - Some(peer) => Ok(AddressBookResponse::Peer(peer)), - None => Err(AddressBookError::PeerListEmpty), - } - } - AddressBookRequest::GetRandomWhitePeers(_, len) => { - Ok(AddressBookResponse::Peers(self.get_random_white_peers(len))) - } - }; - - if let Err(e) = &res { - tracing::debug!("Error when handling request, err: {e}") - } - - let _ = req.tx.send(res); - } - - /// Updates the white list with the information in the `connected_peers` list. - /// This only updates the `last_seen` timestamp as that's the only thing that should - /// change during connections. 
- fn update_white_list_with_conn_list(&mut self) { - for (_, peer) in self.connected_peers.iter() { - if peer.reachable { - if let Some(peer_eb) = self.white_list.get_peer_mut(&peer.addr.unwrap()) { - peer_eb.last_seen = peer.last_seen.timestamp(); - } - } - } - } - - /// Saves the address book to persistent storage. - /// TODO: save the banned peer list. - #[tracing::instrument(level="trace", skip(self), fields(name = self.book_name()) )] - async fn save(&mut self) { - self.update_white_list_with_conn_list(); - tracing::trace!( - "white_len: {}, gray_len: {}, anchor_len: {}, banned_len: {}", - self.len_white_list(), - self.len_gray_list(), - self.len_anchor_list(), - self.len_banned_list() - ); - let res = self - .p2p_store - .save_peers( - self.zone, - (&self.white_list).into(), - (&self.gray_list).into(), - self.anchor_list.iter().collect(), - ) - .await; - match res { - Ok(()) => tracing::trace!("Complete"), - Err(e) => tracing::error!("Error saving address book: {e}"), - } - } - - /// Runs the address book task - /// Should be spawned in a task. - pub(crate) async fn run(mut self, mut rx: mpsc::Receiver) { - let mut save_interval = { - let mut interval = tokio::time::interval(ADDRESS_BOOK_SAVE_INTERVAL); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - // Interval ticks at 0, interval, 2 interval, ... - // this is just to ignore the first tick - interval.tick().await; - tokio_stream::wrappers::IntervalStream::new(interval).fuse() - }; - - loop { - self.check_unban_peers(); - self.check_connected_peers(); - futures::select! { - req = rx.next() => { - if let Some(req) = req { - self.handle_request(req).await - } else { - tracing::debug!("{} req channel closed, saving and shutting down book", self.book_name()); - self.save().await; - return; - } - } - _ = save_interval.next() => self.save().await - } - } - } -} diff --git a/p2p/src/address_book/address_book/peer_list.rs b/p2p/src/address_book/address_book/peer_list.rs deleted file mode 100644 index e852100..0000000 --- a/p2p/src/address_book/address_book/peer_list.rs +++ /dev/null @@ -1,231 +0,0 @@ -//! This module contains the individual address books peer lists. -//! -use std::collections::{HashMap, HashSet}; -use std::hash::Hash; - -use cuprate_common::CRYPTONOTE_PRUNING_LOG_STRIPES; -use monero_wire::{messages::PeerListEntryBase, NetworkAddress}; -use rand::Rng; - -#[cfg(test)] -mod tests; - -/// A Peer list in the address book. -/// -/// This could either be the white list or gray list. -pub struct PeerList { - /// The peers with their peer data. - peers: HashMap, - /// An index of Pruning seed to address, so - /// can quickly grab peers with the pruning seed - /// we want. - pruning_idxs: HashMap>, - /// An index of [`ban_identifier`](NetworkAddress::ban_identifier) to Address - /// to allow us to quickly remove baned peers. - ban_id_idxs: HashMap, Vec>, -} - -impl<'a> Into> for &'a PeerList { - fn into(self) -> Vec<&'a PeerListEntryBase> { - self.peers.iter().map(|(_, peb)| peb).collect() - } -} - -impl PeerList { - /// Creates a new peer list. 
- pub fn new(list: Vec) -> PeerList { - let mut peers = HashMap::with_capacity(list.len()); - let mut pruning_idxs = HashMap::with_capacity(2 << CRYPTONOTE_PRUNING_LOG_STRIPES); - let mut ban_id_idxs = HashMap::with_capacity(list.len()); // worse case, every peer has a different NetworkAddress and ban id - - for peer in list { - peers.insert(peer.adr, peer); - - pruning_idxs - .entry(peer.pruning_seed) - .or_insert_with(Vec::new) - .push(peer.adr); - - ban_id_idxs - .entry(peer.adr.ban_identifier()) - .or_insert_with(Vec::new) - .push(peer.adr); - } - PeerList { - peers, - pruning_idxs, - ban_id_idxs, - } - } - - /// Gets the length of the peer list - pub fn len(&self) -> usize { - self.peers.len() - } - - /// Adds a new peer to the peer list - pub fn add_new_peer(&mut self, peer: PeerListEntryBase) { - if let None = self.peers.insert(peer.adr, peer) { - self.pruning_idxs - .entry(peer.pruning_seed) - .or_insert_with(Vec::new) - .push(peer.adr); - - self.ban_id_idxs - .entry(peer.adr.ban_identifier()) - .or_insert_with(Vec::new) - .push(peer.adr); - } - } - - /// Gets a reference to a peer - pub fn get_peer(&self, peer: &NetworkAddress) -> Option<&PeerListEntryBase> { - self.peers.get(peer) - } - - /// Returns an iterator over every peer in this peer list - pub fn iter_all_peers(&self) -> impl Iterator { - self.peers.values() - } - - /// Returns a random peer. - /// If the pruning seed is specified then we will get a random peer with - /// that pruning seed otherwise we will just get a random peer in the whole - /// list. - pub fn get_random_peer( - &self, - r: &mut R, - pruning_seed: Option, - ) -> Option<&PeerListEntryBase> { - if let Some(seed) = pruning_seed { - let mut peers = self.get_peers_with_pruning(&seed)?; - let len = self.len_by_seed(&seed); - if len == 0 { - None - } else { - let n = r.gen_range(0..len); - - peers.nth(n) - } - } else { - let mut peers = self.iter_all_peers(); - let len = self.len(); - if len == 0 { - None - } else { - let n = r.gen_range(0..len); - - peers.nth(n) - } - } - } - - /// Returns a mutable reference to a peer. - pub fn get_peer_mut(&mut self, peer: &NetworkAddress) -> Option<&mut PeerListEntryBase> { - self.peers.get_mut(peer) - } - - /// Returns true if the list contains this peer. - pub fn contains_peer(&self, peer: &NetworkAddress) -> bool { - self.peers.contains_key(peer) - } - - /// Returns an iterator of peer info of peers with a specific pruning seed. 
- fn get_peers_with_pruning( - &self, - seed: &u32, - ) -> Option> { - let addrs = self.pruning_idxs.get(seed)?; - - Some(addrs.iter().map(move |addr| { - self.peers - .get(addr) - .expect("Address must be in peer list if we have an idx for it") - })) - } - - /// Removes a peer from the pruning idx - /// - /// MUST NOT BE USED ALONE - fn remove_peer_pruning_idx(&mut self, peer: &PeerListEntryBase) { - remove_peer_idx(&mut self.pruning_idxs, &peer.pruning_seed, &peer.adr) - } - - /// Removes a peer from the ban idx - /// - /// MUST NOT BE USED ALONE - fn remove_peer_ban_idx(&mut self, peer: &PeerListEntryBase) { - remove_peer_idx(&mut self.ban_id_idxs, &peer.adr.ban_identifier(), &peer.adr) - } - - /// Removes a peer from all the indexes - /// - /// MUST NOT BE USED ALONE - fn remove_peer_from_all_idxs(&mut self, peer: &PeerListEntryBase) { - self.remove_peer_ban_idx(peer); - self.remove_peer_pruning_idx(peer); - } - - /// Removes a peer from the peer list - pub fn remove_peer(&mut self, peer: &NetworkAddress) -> Option { - let peer_eb = self.peers.remove(peer)?; - self.remove_peer_from_all_idxs(&peer_eb); - Some(peer_eb) - } - - /// Removes all peers with a specific ban id. - pub fn remove_peers_with_ban_id(&mut self, ban_id: &Vec) { - let Some(addresses) = self.ban_id_idxs.get(ban_id) else { - // No peers to ban - return; - }; - for addr in addresses.clone() { - self.remove_peer(&addr); - } - } - - /// Tries to reduce the peer list to `new_len`. - /// - /// This function could keep the list bigger than `new_len` if `must_keep_peers`s length - /// is larger than new_len, in that case we will remove as much as we can. - pub fn reduce_list(&mut self, must_keep_peers: &HashSet, new_len: usize) { - if new_len >= self.len() { - return; - } - - let target_removed = self.len() - new_len; - let mut removed_count = 0; - let mut peers_to_remove: Vec = Vec::with_capacity(target_removed); - - for (peer_adr, _) in &self.peers { - if removed_count >= target_removed { - break; - } - if !must_keep_peers.contains(peer_adr) { - peers_to_remove.push(*peer_adr); - removed_count += 1; - } - } - - for peer_adr in peers_to_remove { - let _ = self.remove_peer(&peer_adr); - } - } -} - -/// Remove a peer from an index. 
-fn remove_peer_idx( - idx_map: &mut HashMap>, - idx: &T, - addr: &NetworkAddress, -) { - if let Some(peer_list) = idx_map.get_mut(idx) { - if let Some(idx) = peer_list.iter().position(|peer_adr| peer_adr == addr) { - peer_list.swap_remove(idx); - } else { - unreachable!("This function will only be called when the peer exists."); - } - } else { - unreachable!("Index must exist if a peer has that index"); - } -} diff --git a/p2p/src/address_book/address_book/peer_list/tests.rs b/p2p/src/address_book/address_book/peer_list/tests.rs deleted file mode 100644 index 00ca37c..0000000 --- a/p2p/src/address_book/address_book/peer_list/tests.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::{collections::HashSet, vec}; - -use monero_wire::{messages::PeerListEntryBase, NetworkAddress}; -use rand::Rng; - -use super::PeerList; - -fn make_fake_peer_list(numb_o_peers: usize) -> PeerList { - let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers]; - for (idx, peer) in peer_list.iter_mut().enumerate() { - let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")}; - ip.m_ip += idx as u32; - } - - PeerList::new(peer_list) -} - -fn make_fake_peer_list_with_random_pruning_seeds(numb_o_peers: usize) -> PeerList { - let mut r = rand::thread_rng(); - - let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers]; - for (idx, peer) in peer_list.iter_mut().enumerate() { - let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")}; - ip.m_ip += idx as u32; - ip.m_port += r.gen_range(0..15); - - peer.pruning_seed = if r.gen_bool(0.4) { - 0 - } else { - r.gen_range(384..=391) - }; - } - - PeerList::new(peer_list) -} - -#[test] -fn peer_list_reduce_length() { - let mut peer_list = make_fake_peer_list(2090); - let must_keep_peers = HashSet::new(); - - let target_len = 2000; - - peer_list.reduce_list(&must_keep_peers, target_len); - - assert_eq!(peer_list.len(), target_len); -} - -#[test] -fn peer_list_reduce_length_with_peers_we_need() { - let mut peer_list = make_fake_peer_list(500); - let must_keep_peers = HashSet::from_iter(peer_list.peers.iter().map(|(adr, _)| *adr)); - - let target_len = 49; - - peer_list.reduce_list(&must_keep_peers, target_len); - - // we can't remove any of the peers we said we need them all - assert_eq!(peer_list.len(), 500); -} - -#[test] -fn peer_list_get_peers_by_pruning_seed() { - let mut r = rand::thread_rng(); - - let peer_list = make_fake_peer_list_with_random_pruning_seeds(1000); - let seed = if r.gen_bool(0.4) { - 0 - } else { - r.gen_range(384..=391) - }; - - let peers_with_seed = peer_list - .get_peers_with_pruning(&seed) - .expect("If you hit this buy a lottery ticket"); - - for peer in peers_with_seed { - assert_eq!(peer.pruning_seed, seed); - } - - assert_eq!(peer_list.len(), 1000); -} - -#[test] -fn peer_list_remove_specific_peer() { - let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); - - let peer = peer_list - .get_random_peer(&mut rand::thread_rng(), None) - .unwrap() - .clone(); - - assert!(peer_list.remove_peer(&peer.adr).is_some()); - - let pruning_idxs = peer_list.pruning_idxs; - let peers = peer_list.peers; - - for (_, addrs) in pruning_idxs { - addrs.iter().for_each(|adr| assert_ne!(adr, &peer.adr)) - } - - assert!(!peers.contains_key(&peer.adr)); -} - -#[test] -fn peer_list_pruning_idxs_are_correct() { - let peer_list = make_fake_peer_list_with_random_pruning_seeds(100); - let mut total_len = 0; - - for (seed, list) in peer_list.pruning_idxs { - for 
peer in list.iter() { - assert_eq!(peer_list.peers.get(peer).unwrap().pruning_seed, seed); - total_len += 1; - } - } - - assert_eq!(total_len, peer_list.peers.len()) -} - -#[test] -fn peer_list_add_new_peer() { - let mut peer_list = make_fake_peer_list(10); - let mut new_peer = PeerListEntryBase::default(); - let NetworkAddress::IPv4(ip) = &mut new_peer.adr else {panic!("this test requires default to be ipv4")}; - ip.m_ip += 50; - - peer_list.add_new_peer(new_peer.clone()); - - assert_eq!(peer_list.len(), 11); - assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer)); - assert!(peer_list - .pruning_idxs - .get(&new_peer.pruning_seed) - .unwrap() - .contains(&new_peer.adr)); -} - -#[test] -fn peer_list_add_existing_peer() { - let mut peer_list = make_fake_peer_list(10); - let existing_peer = peer_list - .get_peer(&NetworkAddress::default()) - .unwrap() - .clone(); - - peer_list.add_new_peer(existing_peer.clone()); - - assert_eq!(peer_list.len(), 10); - assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer)); -} - -#[test] -fn peer_list_get_non_existent_peer() { - let peer_list = make_fake_peer_list(10); - let mut non_existent_peer = NetworkAddress::default(); - let NetworkAddress::IPv4(ip) = &mut non_existent_peer else {panic!("this test requires default to be ipv4")}; - ip.m_ip += 50; - - assert_eq!(peer_list.get_peer(&non_existent_peer), None); -} - -#[test] -fn peer_list_ban_peers() { - let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); - let peer = peer_list - .get_random_peer(&mut rand::thread_rng(), None) - .unwrap(); - let ban_id = peer.adr.ban_identifier(); - assert!(peer_list.contains_peer(&peer.adr)); - assert_ne!(peer_list.ban_id_idxs.get(&ban_id).unwrap().len(), 0); - peer_list.remove_peers_with_ban_id(&ban_id); - assert_eq!(peer_list.ban_id_idxs.get(&ban_id).unwrap().len(), 0); - for (addr, _) in peer_list.peers { - assert_ne!(addr.ban_identifier(), ban_id); - } -} diff --git a/p2p/src/address_book/address_book/tests.rs b/p2p/src/address_book/address_book/tests.rs deleted file mode 100644 index acf7460..0000000 --- a/p2p/src/address_book/address_book/tests.rs +++ /dev/null @@ -1,81 +0,0 @@ -use super::*; -use crate::NetZoneBasicNodeData; -use monero_wire::network_address::IPv4Address; -use rand::Rng; - -fn create_random_net_address(r: &mut R) -> NetworkAddress { - NetworkAddress::IPv4(IPv4Address { - m_ip: r.gen(), - m_port: r.gen(), - }) -} - -fn create_random_net_addr_vec(r: &mut R, len: usize) -> Vec { - let mut ret = Vec::with_capacity(len); - for i in 0..len { - ret.push(create_random_net_address(r)); - } - ret -} - -fn create_random_peer(r: &mut R) -> PeerListEntryBase { - PeerListEntryBase { - adr: create_random_net_address(r), - pruning_seed: r.gen_range(384..=391), - id: PeerID(r.gen()), - last_seen: r.gen(), - rpc_port: r.gen(), - rpc_credits_per_hash: r.gen(), - } -} - -fn create_random_peer_vec(r: &mut R, len: usize) -> Vec { - let mut ret = Vec::with_capacity(len); - for i in 0..len { - ret.push(create_random_peer(r)); - } - ret -} - -#[derive(Clone)] -pub struct MockPeerStore; - -#[async_trait::async_trait] -impl P2PStore for MockPeerStore { - async fn basic_node_data(&mut self) -> Result, &'static str> { - unimplemented!() - } - async fn save_basic_node_data( - &mut self, - node_id: &NetZoneBasicNodeData, - ) -> Result<(), &'static str> { - unimplemented!() - } - async fn load_peers( - &mut self, - zone: NetZone, - ) -> Result< - ( - Vec, - Vec, - Vec, - ), - &'static str, - > { - let mut r = rand::thread_rng(); - Ok(( 
- create_random_peer_vec(&mut r, 300), - create_random_peer_vec(&mut r, 1500), - create_random_net_addr_vec(&mut r, 50), - )) - } - async fn save_peers( - &mut self, - zone: NetZone, - white: Vec<&PeerListEntryBase>, - gray: Vec<&PeerListEntryBase>, - anchor: Vec<&NetworkAddress>, - ) -> Result<(), &'static str> { - todo!() - } -} diff --git a/p2p/src/address_book/connection_handle.rs b/p2p/src/address_book/connection_handle.rs deleted file mode 100644 index 1f36155..0000000 --- a/p2p/src/address_book/connection_handle.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! This module contains the address book [`Connection`](crate::peer::connection::Connection) handle -//! -//! # Why do we need a handle between the address book and connection task -//! -//! When banning a peer we need to tell the connection task to close and -//! when we close a connection we need to remove it from our connection -//! and anchor list. -//! -//! -use futures::channel::oneshot; -use tokio_util::sync::CancellationToken; - -/// A message sent to tell the address book that a peer has disconnected. -pub struct PeerConnectionClosed; - -/// The connection side of the address book to connection -/// communication. -#[derive(Debug)] -pub struct AddressBookConnectionHandle { - connection_closed: Option>, - close: CancellationToken, -} - -impl AddressBookConnectionHandle { - /// Returns true if the address book has told us to kill the - /// connection. - pub fn is_canceled(&self) -> bool { - self.close.is_cancelled() - } -} - -impl Drop for AddressBookConnectionHandle { - fn drop(&mut self) { - let connection_closed = std::mem::replace(&mut self.connection_closed, None).unwrap(); - let _ = connection_closed.send(PeerConnectionClosed); - } -} - -/// The address book side of the address book to connection -/// communication. -#[derive(Debug)] -pub struct ConnectionAddressBookHandle { - connection_closed: oneshot::Receiver, - killer: CancellationToken, -} - -impl ConnectionAddressBookHandle { - /// Checks if the connection task has closed, returns - /// true if the task has closed - pub fn connection_closed(&mut self) -> bool { - let Ok(mes) = self.connection_closed.try_recv() else { - panic!("This must not be called again after returning true and the connection task must tell us if a connection is closed") - }; - match mes { - None => false, - Some(_) => true, - } - } - - /// Ends the connection task, the caller of this function should - /// wait to be told the connection has closed by [`check_if_connection_closed`](Self::check_if_connection_closed) - /// before acting on the closed connection. - pub fn kill_connection(&self) { - self.killer.cancel() - } -} - -/// Creates a new handle pair that can be given to the connection task and -/// address book respectively. 
-pub fn new_address_book_connection_handle( -) -> (AddressBookConnectionHandle, ConnectionAddressBookHandle) { - let (tx, rx) = oneshot::channel(); - let token = CancellationToken::new(); - - let ab_c_h = AddressBookConnectionHandle { - connection_closed: Some(tx), - close: token.clone(), - }; - let c_ab_h = ConnectionAddressBookHandle { - connection_closed: rx, - killer: token, - }; - - (ab_c_h, c_ab_h) -} - -#[cfg(test)] -mod tests { - use crate::address_book::connection_handle::new_address_book_connection_handle; - - #[test] - fn close_connection_from_address_book() { - let (conn_side, mut addr_side) = new_address_book_connection_handle(); - - assert!(!conn_side.is_canceled()); - assert!(!addr_side.connection_closed()); - addr_side.kill_connection(); - assert!(conn_side.is_canceled()); - } - - #[test] - fn close_connection_from_connection() { - let (conn_side, mut addr_side) = new_address_book_connection_handle(); - - assert!(!conn_side.is_canceled()); - assert!(!addr_side.connection_closed()); - drop(conn_side); - assert!(addr_side.connection_closed()); - } -} diff --git a/p2p/src/config.rs b/p2p/src/config.rs deleted file mode 100644 index 9d8db6b..0000000 --- a/p2p/src/config.rs +++ /dev/null @@ -1,78 +0,0 @@ -use cuprate_common::Network; -use monero_wire::messages::{common::PeerSupportFlags, BasicNodeData, PeerID}; - -use crate::{ - constants::{ - CUPRATE_SUPPORT_FLAGS, DEFAULT_IN_PEERS, DEFAULT_LOAD_OUT_PEERS_MULTIPLIER, - DEFAULT_TARGET_OUT_PEERS, MAX_GRAY_LIST_PEERS, MAX_WHITE_LIST_PEERS, - }, - NodeID, -}; - -#[derive(Debug, Clone, Copy)] -pub struct Config { - /// Port - my_port: u32, - /// The Network - network: Network, - /// RPC Port - rpc_port: u16, - - target_out_peers: usize, - out_peers_load_multiplier: usize, - max_in_peers: usize, - max_white_peers: usize, - max_gray_peers: usize, -} - -impl Default for Config { - fn default() -> Self { - Config { - my_port: 18080, - network: Network::MainNet, - rpc_port: 18081, - target_out_peers: DEFAULT_TARGET_OUT_PEERS, - out_peers_load_multiplier: DEFAULT_LOAD_OUT_PEERS_MULTIPLIER, - max_in_peers: DEFAULT_IN_PEERS, - max_white_peers: MAX_WHITE_LIST_PEERS, - max_gray_peers: MAX_GRAY_LIST_PEERS, - } - } -} - -impl Config { - pub fn basic_node_data(&self, peer_id: PeerID) -> BasicNodeData { - BasicNodeData { - my_port: self.my_port, - network_id: self.network.network_id(), - peer_id, - support_flags: CUPRATE_SUPPORT_FLAGS, - rpc_port: self.rpc_port, - rpc_credits_per_hash: 0, - } - } - - pub fn peerset_total_connection_limit(&self) -> usize { - self.target_out_peers * self.out_peers_load_multiplier + self.max_in_peers - } - - pub fn network(&self) -> Network { - self.network - } - - pub fn max_white_peers(&self) -> usize { - self.max_white_peers - } - - pub fn max_gray_peers(&self) -> usize { - self.max_gray_peers - } - - pub fn public_port(&self) -> u32 { - self.my_port - } - - pub fn public_rpc_port(&self) -> u16 { - self.rpc_port - } -} diff --git a/p2p/src/connection_counter.rs b/p2p/src/connection_counter.rs deleted file mode 100644 index c8cc840..0000000 --- a/p2p/src/connection_counter.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::{fmt, sync::Arc}; - -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; - -/// A counter for active connections. -/// -/// Creates a [`ConnectionTracker`] to track each active connection. -/// When these trackers are dropped, the counter gets notified. -pub struct ActiveConnectionCounter { - /// The limit for this type of connection, for diagnostics only. 
- /// The caller must enforce the limit by ignoring, delaying, or dropping connections. - limit: usize, - - /// The label for this connection counter, typically its type. - label: Arc, - - semaphore: Arc, -} - -impl fmt::Debug for ActiveConnectionCounter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ActiveConnectionCounter") - .field("label", &self.label) - .field("count", &self.count()) - .field("limit", &self.limit) - .finish() - } -} - -impl ActiveConnectionCounter { - /// Create and return a new active connection counter. - pub fn new_counter() -> Self { - Self::new_counter_with(Semaphore::MAX_PERMITS, "Active Connections") - } - - /// Create and return a new active connection counter with `limit` and `label`. - /// The caller must check and enforce limits using [`update_count()`](Self::update_count). - pub fn new_counter_with(limit: usize, label: S) -> Self { - let label = label.to_string(); - - Self { - limit, - label: label.into(), - semaphore: Arc::new(Semaphore::new(limit)), - } - } - - /// Create and return a new [`ConnectionTracker`], using a permit from the semaphore, - /// SAFETY: - /// This function will panic if the semaphore doesn't have anymore permits. - pub fn track_connection(&mut self) -> ConnectionTracker { - ConnectionTracker::new(self) - } - - pub fn count(&self) -> usize { - let count = self - .limit - .checked_sub(self.semaphore.available_permits()) - .expect("Limit is less than available connection permits"); - - tracing::trace!( - open_connections = ?count, - limit = ?self.limit, - label = ?self.label, - ); - - count - } - - pub fn available_permits(&self) -> usize { - self.semaphore.available_permits() - } -} - -/// A per-connection tracker. -/// -/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection. -pub struct ConnectionTracker { - /// The permit for this connection, updates the semaphore when dropped. - permit: OwnedSemaphorePermit, - - /// The label for this connection counter, typically its type. - label: Arc, -} - -impl fmt::Debug for ConnectionTracker { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("ConnectionTracker") - .field(&self.label) - .finish() - } -} - -impl ConnectionTracker { - /// Create and return a new active connection tracker, and add 1 to `counter`. - /// All connection trackers share a label with their connection counter. - /// - /// When the returned tracker is dropped, `counter` will be notified. - /// - /// SAFETY: - /// This function will panic if the [`ActiveConnectionCounter`] doesn't have anymore permits. - fn new(counter: &mut ActiveConnectionCounter) -> Self { - tracing::debug!( - open_connections = ?counter.count(), - limit = ?counter.limit, - label = ?counter.label, - "opening a new peer connection", - ); - - Self { - permit: counter.semaphore.clone().try_acquire_owned().unwrap(), - label: counter.label.clone(), - } - } -} - -impl Drop for ConnectionTracker { - fn drop(&mut self) { - tracing::debug!( - label = ?self.label, - "A peer connection has closed", - ); - // the permit is automatically dropped - } -} diff --git a/p2p/src/connection_handle.rs b/p2p/src/connection_handle.rs deleted file mode 100644 index f3d3601..0000000 --- a/p2p/src/connection_handle.rs +++ /dev/null @@ -1,98 +0,0 @@ -//! -//! # Why do we need a handle between the address book and connection task -//! -//! When banning a peer we need to tell the connection task to close and -//! when we close a connection we need to tell the address book. -//! -//! 
-use std::time::Duration; - -use futures::channel::mpsc; -use futures::SinkExt; -use tokio_util::sync::CancellationToken; - -use crate::connection_counter::ConnectionTracker; - -#[derive(Default, Debug)] -pub struct HandleBuilder { - tracker: Option, -} - -impl HandleBuilder { - pub fn set_tracker(&mut self, tracker: ConnectionTracker) { - self.tracker = Some(tracker) - } - - pub fn build(self) -> (DisconnectSignal, ConnectionHandle, PeerHandle) { - let token = CancellationToken::new(); - let (tx, rx) = mpsc::channel(0); - - ( - DisconnectSignal { - token: token.clone(), - tracker: self.tracker.expect("Tracker was not set!"), - }, - ConnectionHandle { - token: token.clone(), - ban: rx, - }, - PeerHandle { ban: tx }, - ) - } -} - -pub struct BanPeer(pub Duration); - -/// A struct given to the connection task. -pub struct DisconnectSignal { - token: CancellationToken, - tracker: ConnectionTracker, -} - -impl DisconnectSignal { - pub fn should_shutdown(&self) -> bool { - self.token.is_cancelled() - } - pub fn connection_closed(&self) { - self.token.cancel() - } -} - -impl Drop for DisconnectSignal { - fn drop(&mut self) { - self.token.cancel() - } -} - -/// A handle given to a task that needs to cancel this connection. -pub struct ConnectionHandle { - token: CancellationToken, - ban: mpsc::Receiver, -} - -impl ConnectionHandle { - pub fn is_closed(&self) -> bool { - self.token.is_cancelled() - } - pub fn check_should_ban(&mut self) -> Option { - match self.ban.try_next() { - Ok(res) => res, - Err(_) => None, - } - } - pub fn send_close_signal(&self) { - self.token.cancel() - } -} - -/// A handle given to a task that needs to be able to ban a connection. -#[derive(Clone)] -pub struct PeerHandle { - ban: mpsc::Sender, -} - -impl PeerHandle { - pub fn ban_peer(&mut self, duration: Duration) { - let _ = self.ban.send(BanPeer(duration)); - } -} diff --git a/p2p/src/constants.rs b/p2p/src/constants.rs deleted file mode 100644 index 4d3c900..0000000 --- a/p2p/src/constants.rs +++ /dev/null @@ -1,58 +0,0 @@ -use core::time::Duration; - -use monero_wire::messages::common::PeerSupportFlags; - -pub const CUPRATE_SUPPORT_FLAGS: PeerSupportFlags = - PeerSupportFlags::get_support_flag_fluffy_blocks(); - -pub const CUPRATE_MINIMUM_SUPPORT_FLAGS: PeerSupportFlags = - PeerSupportFlags::get_support_flag_fluffy_blocks(); - -pub const DEFAULT_TARGET_OUT_PEERS: usize = 20; - -pub const DEFAULT_LOAD_OUT_PEERS_MULTIPLIER: usize = 3; - -pub const DEFAULT_IN_PEERS: usize = 20; - -pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); - -pub const ADDRESS_BOOK_SAVE_INTERVAL: Duration = Duration::from_secs(60); - -pub const ADDRESS_BOOK_BUFFER_SIZE: usize = 3; - -pub const PEERSET_BUFFER_SIZE: usize = 3; - -/// The maximum size of the address books white list. -/// This number is copied from monerod. -pub const MAX_WHITE_LIST_PEERS: usize = 1000; - -/// The maximum size of the address books gray list. -/// This number is copied from monerod. -pub const MAX_GRAY_LIST_PEERS: usize = 5000; - -/// The max amount of peers that can be sent in one -/// message. -pub const P2P_MAX_PEERS_IN_HANDSHAKE: usize = 250; - -/// The timeout for sending a message to a remote peer, -/// and receiving a response from a remote peer. -pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20); - -/// The default RTT estimate for peer responses. -/// -/// We choose a high value for the default RTT, so that new peers must prove they -/// are fast, before we prefer them to other peers. 
This is particularly -/// important on testnet, which has a small number of peers, which are often -/// slow. -/// -/// Make the default RTT slightly higher than the request timeout. -pub const EWMA_DEFAULT_RTT: Duration = Duration::from_secs(REQUEST_TIMEOUT.as_secs() + 1); - -/// The decay time for the EWMA response time metric used for load balancing. -/// -/// This should be much larger than the `SYNC_RESTART_TIMEOUT`, so we choose -/// better peers when we restart the sync. -pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND; - -/// The number of nanoseconds in one second. -const NANOS_PER_SECOND: f64 = 1_000_000_000.0; diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs deleted file mode 100644 index cf1fc44..0000000 --- a/p2p/src/lib.rs +++ /dev/null @@ -1,81 +0,0 @@ -pub mod address_book; -pub mod config; -pub mod connection_counter; -mod connection_handle; -mod constants; -pub mod peer; -mod protocol; - -pub use config::Config; -use rand::Rng; - -#[derive(Debug, Clone)] -pub struct NetZoneBasicNodeData { - public: monero_wire::BasicNodeData, - tor: monero_wire::BasicNodeData, - i2p: monero_wire::BasicNodeData, -} - -impl NetZoneBasicNodeData { - pub fn basic_node_data(&self, net_zone: &monero_wire::NetZone) -> monero_wire::BasicNodeData { - match net_zone { - monero_wire::NetZone::Public => self.public.clone(), - _ => todo!(), - } - } - pub fn new(config: &Config, node_id: &NodeID) -> Self { - let bnd = monero_wire::BasicNodeData { - my_port: config.public_port(), - network_id: config.network().network_id(), - peer_id: node_id.public, - support_flags: constants::CUPRATE_SUPPORT_FLAGS, - rpc_port: config.public_rpc_port(), - rpc_credits_per_hash: 0, - }; - - // obviously this is wrong, i will change when i add tor support - NetZoneBasicNodeData { - public: bnd.clone(), - tor: bnd.clone(), - i2p: bnd, - } - } -} - -#[async_trait::async_trait] -pub trait P2PStore: Clone + Send + 'static { - /// Loads the peers from the peer store. 
- /// returns (in order): - /// the white list, - /// the gray list, - /// the anchor list, - /// the ban list - async fn load_peers( - &mut self, - zone: monero_wire::NetZone, - ) -> Result< - ( - Vec, // white list - Vec, // gray list - Vec, // anchor list - // Vec<(monero_wire::NetworkAddress, chrono::NaiveDateTime)>, // ban list - ), - &'static str, - >; - - async fn save_peers( - &mut self, - zone: monero_wire::NetZone, - white: Vec<&monero_wire::PeerListEntryBase>, - gray: Vec<&monero_wire::PeerListEntryBase>, - anchor: Vec<&monero_wire::NetworkAddress>, - // bans: Vec<(&monero_wire::NetworkAddress, &chrono::NaiveDateTime)>, // ban lists - ) -> Result<(), &'static str>; - - async fn basic_node_data(&mut self) -> Result, &'static str>; - - async fn save_basic_node_data( - &mut self, - node_id: &NetZoneBasicNodeData, - ) -> Result<(), &'static str>; -} diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs deleted file mode 100644 index 5d1f2ae..0000000 --- a/p2p/src/peer.rs +++ /dev/null @@ -1,16 +0,0 @@ -pub mod client; -pub mod connection; -pub mod connector; -pub mod handshaker; -pub mod load_tracked_client; - -mod error; -#[cfg(test)] -mod tests; - -pub use client::Client; -pub use client::ConnectionInfo; -pub use connection::Connection; -pub use connector::{Connector, OutboundConnectorRequest}; -pub use handshaker::Handshaker; -pub use load_tracked_client::LoadTrackedClient; diff --git a/p2p/src/peer/client.rs b/p2p/src/peer/client.rs deleted file mode 100644 index b79a80c..0000000 --- a/p2p/src/peer/client.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::pin::Pin; -use std::sync::atomic::AtomicU64; -use std::task::{Context, Poll}; -use std::{future::Future, sync::Arc}; - -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, -}; -use tokio::task::JoinHandle; -use tower::BoxError; - -use cuprate_common::PruningSeed; -use monero_wire::{messages::common::PeerSupportFlags, NetworkAddress}; - -use super::{ - connection::ClientRequest, - error::{ErrorSlot, PeerError, SharedPeerError}, - PeerError, -}; -use crate::connection_handle::PeerHandle; -use crate::protocol::{InternalMessageRequest, InternalMessageResponse}; - -pub struct ConnectionInfo { - pub support_flags: PeerSupportFlags, - pub pruning_seed: PruningSeed, - pub handle: PeerHandle, - pub rpc_port: u16, - pub rpc_credits_per_hash: u32, -} - -pub struct Client { - pub connection_info: Arc, - /// Used to shut down the corresponding heartbeat. - /// This is always Some except when we take it on drop. - heartbeat_shutdown_tx: Option>, - server_tx: mpsc::Sender, - connection_task: JoinHandle<()>, - heartbeat_task: JoinHandle<()>, - - error_slot: ErrorSlot, -} - -impl Client { - pub fn new( - connection_info: Arc, - heartbeat_shutdown_tx: oneshot::Sender<()>, - server_tx: mpsc::Sender, - connection_task: JoinHandle<()>, - heartbeat_task: JoinHandle<()>, - error_slot: ErrorSlot, - ) -> Self { - Client { - connection_info, - heartbeat_shutdown_tx: Some(heartbeat_shutdown_tx), - server_tx, - connection_task, - heartbeat_task, - error_slot, - } - } - - /// Check if this connection's heartbeat task has exited. 
- #[allow(clippy::unwrap_in_result)] - fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> { - let is_canceled = self - .heartbeat_shutdown_tx - .as_mut() - .expect("only taken on drop") - .poll_canceled(cx) - .is_ready(); - - if is_canceled { - return self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), - ); - } - - match self.heartbeat_task.poll_unpin(cx) { - Poll::Pending => { - // Heartbeat task is still running. - Ok(()) - } - Poll::Ready(Ok(Ok(_))) => { - // Heartbeat task stopped unexpectedly, without panic or error. - self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited( - "Heartbeat task stopped unexpectedly".to_string(), - ), - ) - } - Poll::Ready(Ok(Err(error))) => { - // Heartbeat task stopped unexpectedly, with error. - self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited(error.to_string()), - ) - } - Poll::Ready(Err(error)) => { - // Heartbeat task was cancelled. - if error.is_cancelled() { - self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), - ) - } - // Heartbeat task stopped with panic. - else if error.is_panic() { - panic!("heartbeat task has panicked: {error}"); - } - // Heartbeat task stopped with error. - else { - self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited(error.to_string()), - ) - } - } - } - } - - /// Check if the connection's task has exited. - fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), PeerError> { - match self.connection_task.poll_unpin(context) { - Poll::Pending => { - // Connection task is still running. - Ok(()) - } - Poll::Ready(Ok(())) => { - // Connection task stopped unexpectedly, without panicking. - return Err(PeerError::ConnectionTaskClosed); - } - Poll::Ready(Err(error)) => { - // Connection task stopped unexpectedly with a panic. shut the node down. 
- tracing::error!("Peer Connection task panicked: {error}, shutting the node down!"); - set_shutting_down(); - return Err(PeerError::ConnectionTaskClosed); - } - } - } -} - -impl tower::Service for Client { - type Response = InternalMessageResponse; - type Error = SharedPeerError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.server_tx - .poll_ready(cx) - .map_err(|e| PeerError::ClientChannelClosed.into()) - } - fn call(&mut self, req: InternalMessageRequest) -> Self::Future { - let (tx, rx) = oneshot::channel(); - - match self.server_tx.try_send(ClientRequest { req, tx }) { - Ok(()) => rx - .map(|recv_result| { - recv_result - .expect("ClientRequest oneshot sender must not be dropped before send") - .map_err(|e| e.into()) - }) - .boxed(), - Err(_) => { - // TODO: better error handling - futures::future::ready(Err(PeerError::ClientChannelClosed.into())).boxed() - } - } - } -} diff --git a/p2p/src/peer/connection.rs b/p2p/src/peer/connection.rs deleted file mode 100644 index d518d2e..0000000 --- a/p2p/src/peer/connection.rs +++ /dev/null @@ -1,169 +0,0 @@ -use futures::channel::{mpsc, oneshot}; -use futures::stream::FusedStream; -use futures::{Sink, SinkExt, Stream, StreamExt}; - -use monero_wire::{Message, BucketError}; -use tower::{BoxError, Service}; - -use crate::connection_handle::DisconnectSignal; -use crate::peer::error::{ErrorSlot, PeerError, SharedPeerError}; -use crate::peer::handshaker::ConnectionAddr; -use crate::protocol::internal_network::{MessageID, Request, Response}; - -pub struct ClientRequest { - pub req: Request, - pub tx: oneshot::Sender>, -} - -pub enum State { - WaitingForRequest, - WaitingForResponse { - request_id: MessageID, - tx: oneshot::Sender>, - }, -} - -pub struct Connection { - address: ConnectionAddr, - state: State, - sink: Snk, - client_rx: mpsc::Receiver, - - error_slot: ErrorSlot, - - /// # Security - /// - /// If this connection tracker or `Connection`s are leaked, - /// the number of active connections will appear higher than it actually is. - /// If enough connections leak, Cuprate will stop making new connections. - connection_tracker: DisconnectSignal, - - svc: Svc, -} - -impl Connection -where - Svc: Service, - Snk: Sink + Unpin, -{ - pub fn new( - address: ConnectionAddr, - sink: Snk, - client_rx: mpsc::Receiver, - error_slot: ErrorSlot, - connection_tracker: DisconnectSignal, - svc: Svc, - ) -> Connection { - Connection { - address, - state: State::WaitingForRequest, - sink, - client_rx, - error_slot, - connection_tracker, - svc, - } - } - async fn handle_response(&mut self, res: Response) -> Result<(), PeerError> { - let state = std::mem::replace(&mut self.state, State::WaitingForRequest); - if let State::WaitingForResponse { request_id, tx } = state { - if request_id != res.id() { - // TODO: Fail here - return Err(PeerError::PeerSentIncorrectResponse); - } - - // response passed our tests we can send it to the requester - let _ = tx.send(Ok(res)); - Ok(()) - } else { - unreachable!("This will only be called when in state WaitingForResponse"); - } - } - - async fn send_message_to_peer(&mut self, mes: impl Into) -> Result<(), PeerError> { - Ok(self.sink.send(mes.into()).await?) 
- } - - async fn handle_peer_request(&mut self, req: Request) -> Result<(), PeerError> { - // we should check contents of peer requests for obvious errors like we do with responses - todo!() - /* - let ready_svc = self.svc.ready().await?; - let res = ready_svc.call(req).await?; - self.send_message_to_peer(res).await - */ - } - - async fn handle_client_request(&mut self, req: ClientRequest) -> Result<(), PeerError> { - if req.req.needs_response() { - self.state = State::WaitingForResponse { - request_id: req.req.id(), - tx: req.tx, - }; - } - // TODO: send NA response to requester - self.send_message_to_peer(req.req).await - } - - async fn state_waiting_for_request(&mut self, stream: &mut Str) -> Result<(), PeerError> - where - Str: FusedStream> + Unpin, - { - futures::select! { - peer_message = stream.next() => { - match peer_message.expect("MessageStream will never return None") { - Ok(message) => { - self.handle_peer_request(message.try_into().map_err(|_| PeerError::ResponseError(""))?).await - }, - Err(e) => Err(e.into()), - } - }, - client_req = self.client_rx.next() => { - self.handle_client_request(client_req.ok_or(PeerError::ClientChannelClosed)?).await - }, - } - } - - async fn state_waiting_for_response(&mut self, stream: &mut Str) -> Result<(), PeerError> - where - Str: FusedStream> + Unpin, - { - // put a timeout on this - let peer_message = stream - .next() - .await - .expect("MessageStream will never return None")?; - - if !peer_message.is_request() - && self.state.expected_response_id() == Some(peer_message.id()) - { - if let Ok(res) = peer_message.try_into() { - Ok(self.handle_response(res).await?) - } else { - // im almost certain this is impossible to hit, but im not certain enough to use unreachable!() - Err(PeerError::ResponseError("Peer sent incorrect response")) - } - } else { - if let Ok(req) = peer_message.try_into() { - self.handle_peer_request(req).await - } else { - // this can be hit if the peer sends a protocol response with the wrong id - Err(PeerError::ResponseError("Peer sent incorrect response")) - } - } - } - - pub async fn run(mut self, mut stream: Str) - where - Str: FusedStream> + Unpin, - { - loop { - let _res = match self.state { - State::WaitingForRequest => self.state_waiting_for_request(&mut stream).await, - State::WaitingForResponse { .. } => { - self.state_waiting_for_response(&mut stream).await - } - }; - } - } -} diff --git a/p2p/src/peer/connector.rs b/p2p/src/peer/connector.rs deleted file mode 100644 index 28f09f9..0000000 --- a/p2p/src/peer/connector.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! Wrapper around handshake logic that also opens a TCP connection. 
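
As a side note on why the connector is modelled as its own `tower::Service` (so that timeout and similar policies can be layered on uniformly, as the doc comment below notes), here is a hedged sketch of how a caller could wrap it. It is illustrative only and not part of the patch: it assumes tower's `timeout` and `util` features and the `OutboundConnectorRequest`, `NetworkAddress` and `Client` types from the removed code, and the 20-second deadline is just an example figure:

    use std::time::Duration;
    use tower::{timeout::Timeout, ServiceExt};

    async fn connect_with_timeout<C>(
        connector: C,
        req: OutboundConnectorRequest,
    ) -> Result<(NetworkAddress, Client), tower::BoxError>
    where
        C: tower::Service<
            OutboundConnectorRequest,
            Response = (NetworkAddress, Client),
            Error = tower::BoxError,
        >,
    {
        // Wrap the connector so the dial + handshake is abandoned after the deadline,
        // then drive a single request through the wrapped service.
        Timeout::new(connector, Duration::from_secs(20))
            .oneshot(req)
            .await
    }
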
- -use std::{ - future::Future, - net::SocketAddr, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::{AsyncRead, AsyncWrite, FutureExt}; -use monero_wire::{network_address::NetZone, NetworkAddress}; -use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; -use tower::{BoxError, Service, ServiceExt}; -use tracing::Instrument; - -use crate::peer::handshaker::ConnectionAddr; -use crate::{ - address_book::{AddressBookRequest, AddressBookResponse}, - connection_counter::ConnectionTracker, - protocol::{ - CoreSyncDataRequest, CoreSyncDataResponse, InternalMessageRequest, InternalMessageResponse, - }, -}; - -use super::{ - handshaker::{DoHandshakeRequest, Handshaker}, - Client, -}; - -async fn connect(addr: &NetworkAddress) -> Result<(impl AsyncRead, impl AsyncWrite), BoxError> { - match addr.get_zone() { - NetZone::Public => { - let stream = - tokio::net::TcpStream::connect(SocketAddr::try_from(*addr).unwrap()).await?; - let (read, write) = stream.into_split(); - Ok((read.compat(), write.compat_write())) - } - _ => unimplemented!(), - } -} - -/// A wrapper around [`Handshake`] that opens a connection before -/// forwarding to the inner handshake service. Writing this as its own -/// [`tower::Service`] lets us apply unified timeout policies, etc. -#[derive(Debug, Clone)] -pub struct Connector -where - CoreSync: Service - + Clone - + Send - + 'static, - CoreSync::Future: Send, - - Svc: Service - + Clone - + Send - + 'static, - Svc::Future: Send, - - AdrBook: Service - + Clone - + Send - + 'static, - AdrBook::Future: Send, -{ - handshaker: Handshaker, -} - -impl Connector -where - CoreSync: Service - + Clone - + Send - + 'static, - CoreSync::Future: Send, - - Svc: Service - + Clone - + Send - + 'static, - Svc::Future: Send, - - AdrBook: Service - + Clone - + Send - + 'static, - AdrBook::Future: Send, -{ - pub fn new(handshaker: Handshaker) -> Self { - Connector { handshaker } - } -} - -/// A connector request. -/// Contains the information needed to make an outbound connection to the peer. -pub struct OutboundConnectorRequest { - /// The Monero listener address of the peer. - pub addr: NetworkAddress, - - /// A connection tracker that reduces the open connection count when dropped. - /// - /// Used to limit the number of open connections in Cuprate. 
- pub connection_tracker: ConnectionTracker, -} - -impl Service for Connector -where - CoreSync: Service - + Clone - + Send - + 'static, - CoreSync::Future: Send, - - Svc: Service - + Clone - + Send - + 'static, - Svc::Future: Send, - - AdrBook: Service - + Clone - + Send - + 'static, - AdrBook::Future: Send, -{ - type Response = (NetworkAddress, Client); - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future { - let OutboundConnectorRequest { - addr: address, - connection_tracker, - }: OutboundConnectorRequest = req; - - let hs = self.handshaker.clone(); - let connector_span = tracing::info_span!("connector", peer = ?address); - - async move { - let (read, write) = connect(&address).await?; - let client = hs - .oneshot(DoHandshakeRequest { - read, - write, - addr: ConnectionAddr::OutBound { address }, - connection_tracker, - }) - .await?; - Ok((address, client)) - } - .instrument(connector_span) - .boxed() - } -} diff --git a/p2p/src/peer/error.rs b/p2p/src/peer/error.rs deleted file mode 100644 index bbf3650..0000000 --- a/p2p/src/peer/error.rs +++ /dev/null @@ -1,116 +0,0 @@ -use std::sync::{Arc, Mutex}; - -use monero_wire::BucketError; -use thiserror::Error; -use tracing_error::TracedError; - -/// A wrapper around `Arc` that implements `Error`. -#[derive(Error, Debug, Clone)] -#[error(transparent)] -pub struct SharedPeerError(Arc>); - -impl From for SharedPeerError -where - PeerError: From, -{ - fn from(source: E) -> Self { - Self(Arc::new(TracedError::from(PeerError::from(source)))) - } -} - -impl SharedPeerError { - /// Returns a debug-formatted string describing the inner [`PeerError`]. - /// - /// Unfortunately, [`TracedError`] makes it impossible to get a reference to the original error. - pub fn inner_debug(&self) -> String { - format!("{:?}", self.0.as_ref()) - } -} - -#[derive(Debug, Error)] -pub enum PeerError { - #[error("The connection task has closed.")] - ConnectionTaskClosed, - #[error("Error with peers response: {0}.")] - ResponseError(&'static str), - #[error("The connected peer sent an an unexpected response message.")] - PeerSentUnexpectedResponse, - #[error("The connected peer sent an incorrect response.")] - BucketError(#[from] BucketError), - #[error("The channel was closed.")] - ClientChannelClosed, -} - -/// A shared error slot for peer errors. -/// -/// # Correctness -/// -/// Error slots are shared between sync and async code. In async code, the error -/// mutex should be held for as short a time as possible. This avoids blocking -/// the async task thread on acquiring the mutex. -/// -/// > If the value behind the mutex is just data, it’s usually appropriate to use a blocking mutex -/// > ... -/// > wrap the `Arc>` in a struct -/// > that provides non-async methods for performing operations on the data within, -/// > and only lock the mutex inside these methods -/// -/// -#[derive(Default, Clone)] -pub struct ErrorSlot(Arc>>); - -impl std::fmt::Debug for ErrorSlot { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // don't hang if the mutex is locked - // show the panic if the mutex was poisoned - f.debug_struct("ErrorSlot") - .field("error", &self.0.try_lock()) - .finish() - } -} - -impl ErrorSlot { - /// Read the current error in the slot. - /// - /// Returns `None` if there is no error in the slot. 
- /// - /// # Correctness - /// - /// Briefly locks the error slot's threaded `std::sync::Mutex`, to get a - /// reference to the error in the slot. - #[allow(clippy::unwrap_in_result)] - pub fn try_get_error(&self) -> Option { - self.0 - .lock() - .expect("error mutex should be unpoisoned") - .as_ref() - .cloned() - } - - /// Update the current error in the slot. - /// - /// Returns `Err(AlreadyErrored)` if there was already an error in the slot. - /// - /// # Correctness - /// - /// Briefly locks the error slot's threaded `std::sync::Mutex`, to check for - /// a previous error, then update the error in the slot. - #[allow(clippy::unwrap_in_result)] - pub fn try_update_error(&self, e: SharedPeerError) -> Result<(), AlreadyErrored> { - let mut guard = self.0.lock().expect("error mutex should be unpoisoned"); - - if let Some(original_error) = guard.clone() { - Err(AlreadyErrored { original_error }) - } else { - *guard = Some(e); - Ok(()) - } - } -} - -/// Error returned when the [`ErrorSlot`] already contains an error. -#[derive(Clone, Debug)] -pub struct AlreadyErrored { - /// The original error in the error slot. - pub original_error: SharedPeerError, -} diff --git a/p2p/src/peer/handshaker.rs b/p2p/src/peer/handshaker.rs deleted file mode 100644 index e1b4641..0000000 --- a/p2p/src/peer/handshaker.rs +++ /dev/null @@ -1,627 +0,0 @@ -/// This module contains the logic for turning [`AsyncRead`] and [`AsyncWrite`] -/// into [`Client`] and [`Connection`]. -/// -/// The main entry point is modeled as a [`tower::Service`] the struct being -/// [`Handshaker`]. The [`Handshaker`] accepts handshake requests: [`DoHandshakeRequest`] -/// and creates a state machine that's drives the handshake forward: [`HandshakeSM`] and -/// eventually outputs a [`Client`] and [`Connection`]. -/// -use std::future::Future; -use std::net::SocketAddr; -use std::pin::Pin; - -use futures::{channel::mpsc, sink::Sink, SinkExt, Stream}; -use futures::{FutureExt, StreamExt}; -use thiserror::Error; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - time, -}; -use tokio_util::codec::{FramedRead, FramedWrite}; -use tower::{BoxError, Service, ServiceExt}; -use tracing::Instrument; - -use cuprate_common::{Network, PruningSeed}; -use monero_wire::messages::admin::SupportFlagsResponse; -use monero_wire::{ - messages::{ - admin::{HandshakeRequest, HandshakeResponse}, - common::PeerSupportFlags, - BasicNodeData, CoreSyncData, PeerID, PeerListEntryBase, - }, - BucketError, Message, MoneroWireCodec, NetZone, NetworkAddress, RequestMessage, - ResponseMessage, -}; - -use super::{ - client::{Client, ConnectionInfo}, - connection::Connection, - PeerError, -}; -use crate::address_book::connection_handle::new_address_book_connection_handle; -use crate::address_book::{AddressBookRequest, AddressBookResponse}; -use crate::connection_counter::ConnectionTracker; -use crate::constants::{ - CUPRATE_MINIMUM_SUPPORT_FLAGS, HANDSHAKE_TIMEOUT, P2P_MAX_PEERS_IN_HANDSHAKE, -}; -use crate::protocol::{ - CoreSyncDataRequest, CoreSyncDataResponse, Direction, InternalMessageRequest, - InternalMessageResponse, -}; -use crate::NetZoneBasicNodeData; - -/// Possible handshake errors -#[derive(Debug, Error)] -pub enum HandShakeError { - /// The peer did not complete the handshake fast enough. - #[error("The peer did not complete the handshake fast enough")] - PeerTimedOut, - /// The Peer has non-standard pruning. 
- #[error("The peer has a weird pruning scheme")] - PeerClaimedWeirdPruning, - /// The peer does not have the minimum support flags - #[error("The peer does not have the minimum support flags")] - PeerDoesNotHaveTheMinimumSupportFlags, - /// The peer is not on the network we are on (MAINNET|TESTNET|STAGENET) - #[error("The peer is on a different network")] - PeerIsOnADifferentNetwork, - /// The peer sent us too many peers, more than [`P2P_MAX_PEERS_IN_HANDSHAKE`] - #[error("The peer sent too many peers, considered spamming")] - PeerSentTooManyPeers, - /// The peer sent an incorrect response - #[error("The peer sent a wrong response to our handshake")] - PeerSentWrongResponse, - /// Error communicating with peer - #[error("Bucket error while communicating with peer: {0}")] - BucketError(#[from] BucketError), -} - -/// An address used to connect to a peer. -#[derive(Debug, Copy, Clone)] -pub enum ConnectionAddr { - /// Outbound connection to another peer. - OutBound { address: NetworkAddress }, - /// An inbound direct connection to our node. - InBoundDirect { transient_address: SocketAddr }, - /// An inbound connection through a hidden network - /// like Tor/ I2p - InBoundProxy { net_zone: NetZone }, -} - -impl ConnectionAddr { - /// Gets the [`NetworkAddress`] of this connection. - pub fn get_network_address(&self, port: u16) -> Option { - match self { - ConnectionAddr::OutBound { address } => Some(*address), - _ => None, - } - } - /// Gets the [`NetZone`] of this connection. - pub fn get_zone(&self) -> NetZone { - match self { - ConnectionAddr::OutBound { address } => address.get_zone(), - ConnectionAddr::InBoundDirect { .. } => NetZone::Public, - ConnectionAddr::InBoundProxy { net_zone } => *net_zone, - } - } - - /// Gets the [`Direction`] of this connection. - pub fn direction(&self) -> Direction { - match self { - ConnectionAddr::OutBound { .. } => Direction::Outbound, - ConnectionAddr::InBoundDirect { .. } | ConnectionAddr::InBoundProxy { .. } => { - Direction::Inbound - } - } - } -} - -/// A request to handshake with a peer. -pub struct DoHandshakeRequest { - /// The read-half of the connection. - pub read: R, - /// The write-half of the connection. - pub write: W, - /// The [`ConnectionAddr`] of this connection. - pub addr: ConnectionAddr, - /// The [`ConnectionTracker`] of this connection. - pub connection_tracker: ConnectionTracker, -} - -/// A [`Service`] that accepts [`DoHandshakeRequest`] and -/// produces a [`Client`] and [`Connection`]. -#[derive(Debug, Clone)] -pub struct Handshaker { - /// A collection of our [`BasicNodeData`] for each [`NetZone`] - /// for more info see: [`NetZoneBasicNodeData`] - basic_node_data: NetZoneBasicNodeData, - /// The [`Network`] our node is using - network: Network, - /// The span [`Connection`] tasks will be [`tracing::instrument`]ed with - parent_span: tracing::Span, - /// The address book [`Service`] - address_book: AdrBook, - /// A [`Service`] to handle incoming [`CoreSyncData`] and to get - /// our [`CoreSyncData`]. - core_sync_svc: CoreSync, - /// A service given to the [`Connection`] task to answer incoming - /// requests to our node. 
- peer_request_service: Svc, -} - -impl Handshaker { - pub fn new( - basic_node_data: NetZoneBasicNodeData, - network: Network, - address_book: AdrBook, - core_sync_svc: CoreSync, - peer_request_service: Svc, - ) -> Self { - Handshaker { - basic_node_data, - network, - parent_span: tracing::Span::current(), - address_book, - core_sync_svc, - peer_request_service, - } - } -} - -impl Service> - for Handshaker -where - CoreSync: Service - + Clone - + Send - + 'static, - CoreSync::Future: Send, - - Svc: Service - + Clone - + Send - + 'static, - Svc::Future: Send, - - AdrBook: Service - + Clone - + Send - + 'static, - AdrBook::Future: Send, - - W: AsyncWrite + Unpin + Send + 'static, - R: AsyncRead + Unpin + Send + 'static, -{ - type Response = Client; - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready( - &mut self, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - // We are always ready. - std::task::Poll::Ready(Ok(())) - } - - fn call(&mut self, req: DoHandshakeRequest) -> Self::Future { - let DoHandshakeRequest { - read, - write, - addr, - connection_tracker, - } = req; - - // create the levin message stream/ sink. - let peer_stream = FramedRead::new(read, MoneroWireCodec::default()); - let peer_sink = FramedWrite::new(write, MoneroWireCodec::default()); - - // The span the handshake state machine will use - let span = tracing::debug_span!("Handshaker"); - - // The span the connection task will use. - let connection_span = tracing::debug_span!(parent: &self.parent_span, "Connection"); - - // clone the services that the handshake state machine will need. - let core_sync_svc = self.core_sync_svc.clone(); - let address_book = self.address_book.clone(); - let peer_request_service = self.peer_request_service.clone(); - - let state_machine = HandshakeSM { - peer_sink, - peer_stream, - addr, - network: self.network, - basic_node_data: self.basic_node_data.basic_node_data(&addr.get_zone()), - address_book, - core_sync_svc, - peer_request_service, - connection_span, - connection_tracker, - state: HandshakeState::Start, - }; - // although callers should use a timeout do one here as well just to be safe. - let ret = time::timeout(HANDSHAKE_TIMEOUT, state_machine.do_handshake()); - - async move { - match ret.await { - Ok(handshake) => handshake, - Err(_) => Err(HandShakeError::PeerTimedOut.into()), - } - } - .instrument(span) - .boxed() - } -} - -/// The states a handshake can be in. -enum HandshakeState { - /// The initial state. - /// if this is an inbound handshake then this state means we - /// are waiting for a [`HandshakeRequest`]. - Start, - /// Waiting for a [`HandshakeResponse`]. - WaitingForHandshakeResponse, - /// Waiting for a [`SupportFlagsResponse`] - /// This contains the peers node data. - WaitingForSupportFlagResponse(BasicNodeData, CoreSyncData), - /// The handshake is complete. - /// This contains the peers node data. - Complete(BasicNodeData, CoreSyncData), -} - -impl HandshakeState { - /// Returns true if the handshake is completed. - pub fn is_complete(&self) -> bool { - matches!(self, Self::Complete(..)) - } - - /// returns the peers [`BasicNodeData`] and [`CoreSyncData`] if the peer - /// is in state [`HandshakeState::Complete`]. 
- pub fn peer_data(self) -> Option<(BasicNodeData, CoreSyncData)> { - match self { - HandshakeState::Complete(bnd, coresync) => Some((bnd, coresync)), - _ => None, - } - } -} - -/// The state machine that drives a handshake forward and -/// accepts requests (that can happen during a handshake) -/// from a peer. -struct HandshakeSM { - /// The levin [`FramedWrite`] for the peer. - peer_sink: W, - /// The levin [`FramedRead`] for the peer. - peer_stream: R, - /// The [`ConnectionAddr`] for the peer. - addr: ConnectionAddr, - /// The [`Network`] we are on. - network: Network, - - /// Our [`BasicNodeData`]. - basic_node_data: BasicNodeData, - /// The address book [`Service`] - address_book: AdrBook, - /// The core sync [`Service`] to handle incoming - /// [`CoreSyncData`] and to retrieve ours. - core_sync_svc: CoreSync, - /// The [`Service`] passed to the [`Connection`] - /// task to handle incoming peer requests. - peer_request_service: Svc, - - /// The [`tracing::Span`] the [`Connection`] task - /// will be [`tracing::instrument`]ed with. - connection_span: tracing::Span, - /// A connection tracker to keep track of the - /// number of connections Cuprate is making. - connection_tracker: ConnectionTracker, - - state: HandshakeState, -} - -impl HandshakeSM -where - CoreSync: Service - + Clone - + Send - + 'static, - CoreSync::Future: Send, - - Svc: Service - + Clone - + Send - + 'static, - Svc::Future: Send, - - AdrBook: Service - + Clone - + Send - + 'static, - AdrBook::Future: Send, - - W: Sink + Unpin, - R: Stream> + Unpin, -{ - /// Gets our [`CoreSyncData`] from the `core_sync_svc`. - async fn get_our_core_sync(&mut self) -> Result { - let core_sync_svc = self.core_sync_svc.ready().await?; - let CoreSyncDataResponse::Ours(core_sync) = core_sync_svc.call(CoreSyncDataRequest::GetOurs).await? else { - unreachable!("The Service must give correct responses"); - }; - tracing::trace!("Got core sync data: {core_sync:?}"); - Ok(core_sync) - } - - /// Sends a [`HandshakeRequest`] to the peer. - async fn send_handshake_req( - &mut self, - node_data: BasicNodeData, - payload_data: CoreSyncData, - ) -> Result<(), HandShakeError> { - let handshake_req = HandshakeRequest { - node_data, - payload_data, - }; - - tracing::trace!("Sending handshake request: {handshake_req:?}"); - - let message: Message = Message::Request(RequestMessage::Handshake(handshake_req)); - self.peer_sink.send(message).await?; - Ok(()) - } - - /// Sends a [`SupportFlagsRequest`] to the peer. - /// This is done when a peer sends no support flags in their - /// [`HandshakeRequest`] or [`HandshakeResponse`]. - /// - /// *note because Cuprate has minimum required support flags this won't - /// happeen but is included here just in case this changes. - async fn send_support_flag_req(&mut self) -> Result<(), HandShakeError> { - tracing::trace!("Peer sent no support flags, sending request"); - - let message: Message = Message::Request(RequestMessage::SupportFlags); - self.peer_sink.send(message).await?; - - Ok(()) - } - - /// Handles an incoming [`HandshakeResponse`]. - async fn handle_handshake_response(&mut self, res: HandshakeResponse) -> Result<(), BoxError> { - let HandshakeResponse { - node_data: peer_node_data, - payload_data: peer_core_sync, - local_peerlist_new, - } = res; - - // Check the peer is on the correct network. 
- if peer_node_data.network_id != self.network.network_id() { - tracing::debug!("Handshake failed: peer is on a different network"); - return Err(HandShakeError::PeerIsOnADifferentNetwork.into()); - } - - // Check the peer meets the minimum support flags. - if !peer_node_data - .support_flags - .contains(&CUPRATE_MINIMUM_SUPPORT_FLAGS) - { - tracing::debug!("Handshake failed: peer does not have minimum required support flags"); - return Err(HandShakeError::PeerDoesNotHaveTheMinimumSupportFlags.into()); - } - - // Check the peer didn't send too many peers. - if local_peerlist_new.len() > P2P_MAX_PEERS_IN_HANDSHAKE { - tracing::debug!("Handshake failed: peer sent too many peers in response"); - return Err(HandShakeError::PeerSentTooManyPeers.into()); - } - - // Tell the sync mgr about the new incoming core sync data. - self.core_sync_svc - .ready() - .await? - .call(CoreSyncDataRequest::NewIncoming(peer_core_sync.clone())) - .await?; - - // Tell the address book about the new peers - self.address_book - .ready() - .await? - .call(AddressBookRequest::HandleNewPeerList( - local_peerlist_new, - self.addr.get_zone(), - )) - .await?; - - // This won't actually happen (as long as we have a none 0 minimum support flags) - // it's just included here for completeness. - if peer_node_data.support_flags.is_empty() { - self.send_support_flag_req().await?; - self.state = - HandshakeState::WaitingForSupportFlagResponse(peer_node_data, peer_core_sync); - } else { - // this will always happen. - self.state = HandshakeState::Complete(peer_node_data, peer_core_sync); - } - - Ok(()) - } - - /// Handles a [`MessageResponse`]. - async fn handle_message_response(&mut self, response: ResponseMessage) -> Result<(), BoxError> { - // The functions called here will change the state of the HandshakeSM so `HandshakeState::Start` - // is just used as a place holder. - // - // doing this allows us to not clone the BasicNodeData and CoreSyncData for WaitingForSupportFlagResponse. - let prv_state = std::mem::replace(&mut self.state, HandshakeState::Start); - - match (prv_state, response) { - ( - HandshakeState::WaitingForHandshakeResponse, - ResponseMessage::Handshake(handshake), - ) => self.handle_handshake_response(handshake).await, - ( - HandshakeState::WaitingForSupportFlagResponse(mut bnd, coresync), - ResponseMessage::SupportFlags(support_flags), - ) => { - bnd.support_flags = support_flags.support_flags; - self.state = HandshakeState::Complete(bnd, coresync); - Ok(()) - } - _ => Err(HandShakeError::PeerSentWrongResponse.into()), - } - } - - /// Sends our [`PeerSupportFlags`] to the peer. - async fn send_support_flags( - &mut self, - support_flags: PeerSupportFlags, - ) -> Result<(), HandShakeError> { - let message = Message::Response(ResponseMessage::SupportFlags(SupportFlagsResponse { - support_flags, - })); - self.peer_sink.send(message).await?; - Ok(()) - } - - /// Attempts an outbound handshake with the peer. - async fn do_outbound_handshake(&mut self) -> Result<(), BoxError> { - // Get the data needed for the handshake request. - let core_sync = self.get_our_core_sync().await?; - // send the handshake request. - self.send_handshake_req(self.basic_node_data.clone(), core_sync) - .await?; - // set the state to waiting for a response. 
- self.state = HandshakeState::WaitingForHandshakeResponse; - - while !self.state.is_complete() { - match self.peer_stream.next().await { - Some(mes) => { - let mes = mes?; - match mes { - Message::Request(RequestMessage::SupportFlags) => { - // The only request we should be getting during an outbound handshake - // is a support flag request. - self.send_support_flags(self.basic_node_data.support_flags) - .await? - } - Message::Response(response) => { - // This could be a handshake response or a support flags response. - self.handle_message_response(response).await? - } - _ => return Err(HandShakeError::PeerSentWrongResponse.into()), - } - } - None => unreachable!("peer_stream wont return None"), - } - } - - Ok(()) - } - - /// Completes a handshake with a peer. - async fn do_handshake(mut self) -> Result { - let mut peer_reachable = false; - match self.addr.direction() { - Direction::Outbound => { - self.do_outbound_handshake().await?; - // If this is an outbound handshake then obviously the peer - // is reachable. - peer_reachable = true - } - Direction::Inbound => todo!(), - } - - let (server_tx, server_rx) = mpsc::channel(0); - - let (peer_node_data, coresync) = self - .state - .peer_data() - .expect("We must be in state complete to be here"); - - let pruning_seed = PruningSeed::try_from(coresync.pruning_seed).map_err(|e| Box::new(e))?; - - // create the handle between the Address book and the connection task to - // allow the address book to shutdown the connection task and to update - // the address book when the connection is closed. - let (book_connection_side_handle, connection_book_side_handle) = - new_address_book_connection_handle(); - - // tell the address book about the new connection. - self.address_book - .ready() - .await? - .call(AddressBookRequest::ConnectedToPeer { - zone: self.addr.get_zone(), - connection_handle: connection_book_side_handle, - addr: self.addr.get_network_address( - peer_node_data - .my_port - .try_into() - .map_err(|_| "Peer sent a port that does not fit into a u16")?, - ), - id: peer_node_data.peer_id, - reachable: peer_reachable, - last_seen: chrono::Utc::now().naive_utc(), - pruning_seed: pruning_seed.clone(), - rpc_port: peer_node_data.rpc_port, - rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash, - }) - .await?; - - // This block below is for keeping the last seen times in the address book - // upto date. We only update the last seen times on timed syncs to reduce - // the load on the address book. - // - // first clone the items needed - let mut address_book = self.address_book.clone(); - let peer_id = peer_node_data.peer_id; - let net_zone = self.addr.get_zone(); - - /* - let peer_stream = self.peer_stream.then(|mes| async move { - if let Ok(mes) = &mes { - if mes.id() == TimedSync::ID { - if let Ok(ready_book) = address_book.ready().await { - // we dont care about address book errors here, If there is a problem - // with the address book the node will get shutdown. 
- let _ = ready_book - .call(AddressBookRequest::SetPeerSeen( - peer_id, - chrono::Utc::now().naive_utc(), - net_zone, - )) - .await; - } - } - } - // return the message - mes - }); - - */ - let connection = Connection::new( - self.addr, - self.peer_sink, - server_rx, - self.connection_tracker, - book_connection_side_handle, - self.peer_request_service, - ); - - let connection_task = tokio::task::spawn(connection.run().instrument(self.connection_span)); - - let connection_info = ConnectionInfo { - addr: self.addr, - support_flags: peer_node_data.support_flags, - pruning_seed, - peer_id: peer_node_data.peer_id, - rpc_port: peer_node_data.rpc_port, - rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash, - }; - - let client = Client::new(connection_info.into(), /* futures::futures_channel::oneshot::Sender<()> */, server_tx, connection_task, /* tokio::task::JoinHandle<()> */); - - Ok(client) - } -} diff --git a/p2p/src/peer/load_tracked_client.rs b/p2p/src/peer/load_tracked_client.rs deleted file mode 100644 index 8ac5e04..0000000 --- a/p2p/src/peer/load_tracked_client.rs +++ /dev/null @@ -1,74 +0,0 @@ -//! A peer connection service wrapper type to handle load tracking and provide access to the -//! reported protocol version. - -use std::sync::atomic::Ordering; -use std::{ - sync::Arc, - task::{Context, Poll}, -}; - -use cuprate_common::PruningSeed; -use tower::{ - load::{Load, PeakEwma}, - Service, -}; - -use crate::{ - constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT}, - peer::{Client, ConnectionInfo}, -}; - -/// A client service wrapper that keeps track of its load. -/// -/// It also keeps track of the peer's reported protocol version. -pub struct LoadTrackedClient { - /// A service representing a connected peer, wrapped in a load tracker. - service: PeakEwma, - - /// The metadata for the connected peer `service`. - connection_info: Arc, -} - -/// Create a new [`LoadTrackedClient`] wrapping the provided `client` service. 
-impl From for LoadTrackedClient { - fn from(client: Client) -> Self { - let connection_info = client.connection_info.clone(); - - let service = PeakEwma::new( - client, - EWMA_DEFAULT_RTT, - EWMA_DECAY_TIME_NANOS, - tower::load::CompleteOnResponse::default(), - ); - - LoadTrackedClient { - service, - connection_info, - } - } -} - -impl Service for LoadTrackedClient -where - Client: Service, -{ - type Response = >::Response; - type Error = >::Error; - type Future = as Service>::Future; - - fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll> { - self.service.poll_ready(context) - } - - fn call(&mut self, request: Request) -> Self::Future { - self.service.call(request) - } -} - -impl Load for LoadTrackedClient { - type Metric = as Load>::Metric; - - fn load(&self) -> Self::Metric { - self.service.load() - } -} diff --git a/p2p/src/peer/tests.rs b/p2p/src/peer/tests.rs deleted file mode 100644 index ac52444..0000000 --- a/p2p/src/peer/tests.rs +++ /dev/null @@ -1 +0,0 @@ -mod handshake; diff --git a/p2p/src/peer/tests/handshake.rs b/p2p/src/peer/tests/handshake.rs deleted file mode 100644 index e7ed008..0000000 --- a/p2p/src/peer/tests/handshake.rs +++ /dev/null @@ -1 +0,0 @@ -pub use crate::peer::handshaker::Handshaker; diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs deleted file mode 100644 index 235a2b4..0000000 --- a/p2p/src/protocol.rs +++ /dev/null @@ -1,29 +0,0 @@ -pub mod internal_network; - -pub use internal_network::{InternalMessageRequest, InternalMessageResponse}; - -use monero_wire::messages::CoreSyncData; - -/// A request to a [`tower::Service`] that handles sync states. -pub enum CoreSyncDataRequest { - /// Get our [`CoreSyncData`]. - GetOurs, - /// Handle an incoming [`CoreSyncData`]. - NewIncoming(CoreSyncData), -} - -/// A response from a [`tower::Service`] that handles sync states. -pub enum CoreSyncDataResponse { - /// Our [`CoreSyncData`] - Ours(CoreSyncData), - /// The incoming [`CoreSyncData`] is ok. - Ok, -} - -/// The direction of a connection. -pub enum Direction { - /// An inbound connection. - Inbound, - /// An outbound connection. - Outbound, -} diff --git a/p2p/src/protocol/internal_network.rs b/p2p/src/protocol/internal_network.rs deleted file mode 100644 index 42a419e..0000000 --- a/p2p/src/protocol/internal_network.rs +++ /dev/null @@ -1,125 +0,0 @@ -/// This module defines InternalRequests and InternalResponses. Cuprate's P2P works by translating network messages into an internal -/// request/ response, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications" -/// (protocol messages). -/// -/// Some notifications are easy to translate, like `GetObjectsRequest` is obviously a request but others like `NewFluffyBlock` are a -/// bit tricker. To translate a `NewFluffyBlock` into a request/ response we will have to look to see if we asked for `FluffyMissingTransactionsRequest` -/// if we have we interpret `NewFluffyBlock` as a response if not its a request that doesn't require a response. -/// -/// Here is every P2P request/ response. 
*note admin messages are already request/ response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse -/// -/// Admin: -/// Handshake, -/// TimedSync, -/// Ping, -/// SupportFlags -/// Protocol: -/// Request: GetObjectsRequest, Response: GetObjectsResponse, -/// Request: ChainRequest, Response: ChainResponse, -/// Request: FluffyMissingTransactionsRequest, Response: NewFluffyBlock, <- these 2 could be requests or responses -/// Request: GetTxPoolCompliment, Response: NewTransactions, <- -/// Request: NewBlock, Response: None, -/// Request: NewFluffyBlock, Response: None, -/// Request: NewTransactions, Response: None -/// -use monero_wire::{ - ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, - GetObjectsResponse, GetTxPoolCompliment, HandshakeRequest, HandshakeResponse, Message, - NewBlock, NewFluffyBlock, NewTransactions, PingResponse, RequestMessage, SupportFlagsResponse, - TimedSyncRequest, TimedSyncResponse, -}; - -mod try_from; - -/// An enum representing a request/ response combination, so a handshake request -/// and response would have the same [`MessageID`]. This allows associating the -/// correct response to a request. -#[derive(Debug, Eq, PartialEq, Copy, Clone)] -pub enum MessageID { - Handshake, - TimedSync, - Ping, - SupportFlags, - - GetObjects, - GetChain, - FluffyMissingTxs, - GetTxPoolCompliment, - NewBlock, - NewFluffyBlock, - NewTransactions, -} - -pub enum Request { - Handshake(HandshakeRequest), - TimedSync(TimedSyncRequest), - Ping, - SupportFlags, - - GetObjects(GetObjectsRequest), - GetChain(ChainRequest), - FluffyMissingTxs(FluffyMissingTransactionsRequest), - GetTxPoolCompliment(GetTxPoolCompliment), - NewBlock(NewBlock), - NewFluffyBlock(NewFluffyBlock), - NewTransactions(NewTransactions), -} - -impl Request { - pub fn id(&self) -> MessageID { - match self { - Request::Handshake(_) => MessageID::Handshake, - Request::TimedSync(_) => MessageID::TimedSync, - Request::Ping => MessageID::Ping, - Request::SupportFlags => MessageID::SupportFlags, - - Request::GetObjects(_) => MessageID::GetObjects, - Request::GetChain(_) => MessageID::GetChain, - Request::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, - Request::GetTxPoolCompliment(_) => MessageID::GetTxPoolCompliment, - Request::NewBlock(_) => MessageID::NewBlock, - Request::NewFluffyBlock(_) => MessageID::NewFluffyBlock, - Request::NewTransactions(_) => MessageID::NewTransactions, - } - } - - pub fn needs_response(&self) -> bool { - match self { - Request::NewBlock(_) | Request::NewFluffyBlock(_) | Request::NewTransactions(_) => { - false - } - _ => true, - } - } -} - -pub enum Response { - Handshake(HandshakeResponse), - TimedSync(TimedSyncResponse), - Ping(PingResponse), - SupportFlags(SupportFlagsResponse), - - GetObjects(GetObjectsResponse), - GetChain(ChainResponse), - NewFluffyBlock(NewFluffyBlock), - NewTransactions(NewTransactions), - NA, -} - -impl Response { - pub fn id(&self) -> MessageID { - match self { - Response::Handshake(_) => MessageID::Handshake, - Response::TimedSync(_) => MessageID::TimedSync, - Response::Ping(_) => MessageID::Ping, - Response::SupportFlags(_) => MessageID::SupportFlags, - - Response::GetObjects(_) => MessageID::GetObjects, - Response::GetChain(_) => MessageID::GetChain, - Response::NewFluffyBlock(_) => MessageID::NewBlock, - Response::NewTransactions(_) => MessageID::NewFluffyBlock, - - Response::NA => panic!("Can't get message ID for a non existent response"), - } - } -} diff --git 
a/p2p/src/protocol/internal_network/try_from.rs b/p2p/src/protocol/internal_network/try_from.rs deleted file mode 100644 index c8c9ec5..0000000 --- a/p2p/src/protocol/internal_network/try_from.rs +++ /dev/null @@ -1,163 +0,0 @@ -//! This module contains the implementations of [`TryFrom`] and [`From`] to convert between -//! [`Message`], [`Request`] and [`Response`]. - -use monero_wire::messages::{Message, ProtocolMessage, RequestMessage, ResponseMessage}; - -use super::{Request, Response}; - -pub struct MessageConversionError; - - -macro_rules! match_body { - (match $value: ident {$($body:tt)*} ($left:pat => $right_ty:expr) $($todo:tt)*) => { - match_body!( match $value { - $left => $right_ty, - $($body)* - } $($todo)* ) - }; - (match $value: ident {$($body:tt)*}) => { - match $value { - $($body)* - } - }; -} - - -macro_rules! from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - impl From<$left_ty> for $right_ty { - fn from(value: $left_ty) -> Self { - match_body!( match value {} - $(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+ - ) - } - } - }; -} - -macro_rules! try_from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - impl TryFrom<$left_ty> for $right_ty { - type Error = MessageConversionError; - - fn try_from(value: $left_ty) -> Result { - Ok(match_body!( match value { - _ => return Err(MessageConversionError) - } - $(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+ - )) - } - } - }; -} - -macro_rules! from_try_from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+}); - from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($vall))?,)+}); - }; -} - -macro_rules! try_from_try_from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+}); - try_from!($right_ty, $left_ty, {$($right $(($val))? 
= $left $(($val))?,)+}); - }; -} - -from_try_from!(Request, RequestMessage,{ - Handshake(val) = Handshake(val), - Ping = Ping, - SupportFlags = SupportFlags, - TimedSync(val) = TimedSync(val), -}); - -try_from_try_from!(Request, ProtocolMessage,{ - NewBlock(val) = NewBlock(val), - NewFluffyBlock(val) = NewFluffyBlock(val), - GetObjects(val) = GetObjectsRequest(val), - GetChain(val) = ChainRequest(val), - NewTransactions(val) = NewTransactions(val), - FluffyMissingTxs(val) = FluffyMissingTransactionsRequest(val), - GetTxPoolCompliment(val) = GetTxPoolCompliment(val), -}); - - - -impl TryFrom for Request { - type Error = MessageConversionError; - - fn try_from(value: Message) -> Result { - match value { - Message::Request(req) => Ok(req.into()), - Message::Protocol(pro) => pro.try_into(), - _ => Err(MessageConversionError), - } - } -} - -impl From for Message { - fn from(value: Request) -> Self { - match value { - Request::Handshake(val) => Message::Request(RequestMessage::Handshake(val)), - Request::Ping => Message::Request(RequestMessage::Ping), - Request::SupportFlags => Message::Request(RequestMessage::SupportFlags), - Request::TimedSync(val) => Message::Request(RequestMessage::TimedSync(val)), - - Request::NewBlock(val) => Message::Protocol(ProtocolMessage::NewBlock(val)), - Request::NewFluffyBlock(val) => Message::Protocol(ProtocolMessage::NewFluffyBlock(val)), - Request::GetObjects(val) => Message::Protocol(ProtocolMessage::GetObjectsRequest(val)), - Request::GetChain(val) => Message::Protocol(ProtocolMessage::ChainRequest(val)), - Request::NewTransactions(val) => Message::Protocol(ProtocolMessage::NewTransactions(val)), - Request::FluffyMissingTxs(val) => Message::Protocol(ProtocolMessage::FluffyMissingTransactionsRequest(val)), - Request::GetTxPoolCompliment(val) => Message::Protocol(ProtocolMessage::GetTxPoolCompliment(val)), - } - } -} - -from_try_from!(Response, ResponseMessage,{ - Handshake(val) = Handshake(val), - Ping(val) = Ping(val), - SupportFlags(val) = SupportFlags(val), - TimedSync(val) = TimedSync(val), -}); - -try_from_try_from!(Response, ProtocolMessage,{ - NewFluffyBlock(val) = NewFluffyBlock(val), - GetObjects(val) = GetObjectsResponse(val), - GetChain(val) = ChainEntryResponse(val), - NewTransactions(val) = NewTransactions(val), - -}); - -impl TryFrom for Response { - type Error = MessageConversionError; - - fn try_from(value: Message) -> Result { - match value { - Message::Response(res) => Ok(res.into()), - Message::Protocol(pro) => pro.try_into(), - _ => Err(MessageConversionError), - } - } -} - -impl TryFrom for Message { - type Error = MessageConversionError; - - fn try_from(value: Response) -> Result { - Ok(match value { - Response::Handshake(val) => Message::Response(ResponseMessage::Handshake(val)), - Response::Ping(val) => Message::Response(ResponseMessage::Ping(val)), - Response::SupportFlags(val) => Message::Response(ResponseMessage::SupportFlags(val)), - Response::TimedSync(val) => Message::Response(ResponseMessage::TimedSync(val)), - - Response::NewFluffyBlock(val) => Message::Protocol(ProtocolMessage::NewFluffyBlock(val)), - Response::GetObjects(val) => Message::Protocol(ProtocolMessage::GetObjectsResponse(val)), - Response::GetChain(val) => Message::Protocol(ProtocolMessage::ChainEntryResponse(val)), - Response::NewTransactions(val) => Message::Protocol(ProtocolMessage::NewTransactions(val)), - - Response::NA => return Err(MessageConversionError), - }) - } -}
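
To round off the removed conversion code, a small, hypothetical round-trip sketch (not part of the patch; the function names are invented) showing how these `From`/`TryFrom` impls could be used: outgoing internal requests become wire `Message`s, and incoming `Message`s are classified back into internal `Request`s where possible:

    // Convert an internal request into a wire message; `Request::Ping` has no body,
    // so the conversion is infallible via `From`.
    fn send_ping_sketch() -> Message {
        Request::Ping.into()
    }

    // Classify an incoming wire message: responses (and protocol messages that only
    // map to responses) fall through to the error case.
    fn classify(msg: Message) {
        match Request::try_from(msg) {
            Ok(req) => println!("peer sent a request with id {:?}", req.id()),
            Err(MessageConversionError) => println!("not an internal request"),
        }
    }
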