diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml
deleted file mode 100644
index b90ea4b..0000000
--- a/p2p/Cargo.toml
+++ /dev/null
@@ -1,23 +0,0 @@
-[package]
-name = "cuprate-p2p"
-version = "0.1.0"
-edition = "2021"
-license = "AGPL-3.0-only"
-authors = ["Boog900"]
-
-
-[dependencies]
-chrono = "0.4.24"
-thiserror = "1.0.39"
-cuprate-common = {path = "../common"}
-monero-wire = {path= "../net/monero-wire"}
-futures = "0.3.26"
-tower = {version = "0.4.13", features = ["util", "steer", "load", "discover", "load-shed", "buffer", "timeout"]}
-tokio = {version= "1.27", features=["rt", "time", "net"]}
-tokio-util = {version = "0.7.8", features=["codec"]}
-tokio-stream = {version="0.1.14", features=["time"]}
-async-trait = "0.1.68"
-tracing = "0.1.37"
-tracing-error = "0.2.0"
-rand = "0.8.5"
-pin-project = "1.0.12"
diff --git a/p2p/LICENSE b/p2p/LICENSE
deleted file mode 100644
index e19903e..0000000
--- a/p2p/LICENSE
+++ /dev/null
@@ -1,14 +0,0 @@
- Copyright (C) 2023 Cuprate Contributors
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see .
\ No newline at end of file
diff --git a/p2p/src/address_book.rs b/p2p/src/address_book.rs
deleted file mode 100644
index 6865137..0000000
--- a/p2p/src/address_book.rs
+++ /dev/null
@@ -1,157 +0,0 @@
-//! Cuprate Address Book
-//!
-//! This module holds the logic for persistent peer storage.
-//! Cuprates address book is modeled as a [`tower::Service`]
-//! The request is [`AddressBookRequest`] and the response is
-//! [`AddressBookResponse`].
-//!
-//! Cuprate, like monerod, actually has 3 address books, one
-//! for each [`NetZone`]. This is to reduce the possibility of
-//! clear net peers getting linked to their dark counterparts
-//! and so peers will only get told about peers they can
-//! connect to.
-//!
-
-mod addr_book_client;
-mod address_book;
-pub mod connection_handle;
-
-use cuprate_common::PruningSeed;
-use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress, PeerID};
-
-use connection_handle::ConnectionAddressBookHandle;
-
-pub use addr_book_client::start_address_book;
-
-/// Possible errors when dealing with the address book.
-/// This is boxed when returning an error in the [`tower::Service`].
-#[derive(Debug, thiserror::Error)]
-pub enum AddressBookError {
- /// The peer is not in the address book for this zone.
- #[error("Peer was not found in book")]
- PeerNotFound,
- /// The peer list is empty.
- #[error("The peer list is empty")]
- PeerListEmpty,
- /// The peers pruning seed has changed.
- #[error("The peers pruning seed has changed")]
- PeersPruningSeedChanged,
- /// The peer is banned.
- #[error("The peer is banned")]
- PeerIsBanned,
- /// When handling a received peer list, the list contains
- /// a peer in a different [`NetZone`]
- #[error("Peer sent an address out of it's net-zone")]
- PeerSentAnAddressOutOfZone,
- /// The channel to the address book has closed unexpectedly.
- #[error("The address books channel has closed.")]
- AddressBooksChannelClosed,
- /// The address book task has exited.
- #[error("The address book task has exited.")]
- AddressBookTaskExited,
- /// The peer file store has failed.
- #[error("Peer Store Error: {0}")]
- PeerStoreError(&'static str),
-}
-
-/// A message sent to tell the address book that a peer has disconnected.
-pub struct PeerConnectionClosed;
-
-/// A request to the address book.
-#[derive(Debug)]
-pub enum AddressBookRequest {
- /// A request to handle an incoming peer list.
- HandleNewPeerList(Vec, NetZone),
- /// Updates the `last_seen` timestamp of this peer.
- SetPeerSeen(PeerID, chrono::NaiveDateTime, NetZone),
- /// Bans a peer for the specified duration. This request
- /// will send disconnect signals to all peers with the same
- /// [`ban_identifier`](NetworkAddress::ban_identifier).
- BanPeer(PeerID, std::time::Duration, NetZone),
- /// Adds a peer to the connected list
- ConnectedToPeer {
- /// The net zone of this connection.
- zone: NetZone,
- /// A handle between the connection and address book.
- connection_handle: ConnectionAddressBookHandle,
- /// The connection addr, None if the peer is using a
- /// hidden network.
- addr: Option,
- /// The peers id.
- id: PeerID,
- /// If the peer is reachable by our node.
- reachable: bool,
- /// The last seen timestamp, note: Cuprate may skip updating this
- /// field on some inbound messages
- last_seen: chrono::NaiveDateTime,
- /// The peers pruning seed
- pruning_seed: PruningSeed,
- /// The peers port.
- rpc_port: u16,
- /// The peers rpc credits per hash
- rpc_credits_per_hash: u32,
- },
-
- /// A request to get and eempty the anchor list,
- /// used when starting the node.
- GetAndEmptyAnchorList(NetZone),
- /// Get a random Gray peer from the peer list
- /// If a pruning seed is given we will select from
- /// peers with that seed and peers that dont prune.
- GetRandomGrayPeer(NetZone, Option),
- /// Get a random White peer from the peer list
- /// If a pruning seed is given we will select from
- /// peers with that seed and peers that dont prune.
- GetRandomWhitePeer(NetZone, Option),
- /// Get a list of random peers from the white list,
- /// The list will be less than or equal to the provided
- /// len.
- GetRandomWhitePeers(NetZone, usize),
-}
-
-impl std::fmt::Display for AddressBookRequest {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Self::HandleNewPeerList(..) => f.write_str("HandleNewPeerList"),
- Self::SetPeerSeen(..) => f.write_str("SetPeerSeen"),
- Self::BanPeer(..) => f.write_str("BanPeer"),
- Self::ConnectedToPeer { .. } => f.write_str("ConnectedToPeer"),
-
- Self::GetAndEmptyAnchorList(_) => f.write_str("GetAndEmptyAnchorList"),
- Self::GetRandomGrayPeer(..) => f.write_str("GetRandomGrayPeer"),
- Self::GetRandomWhitePeer(..) => f.write_str("GetRandomWhitePeer"),
- Self::GetRandomWhitePeers(_, len) => {
- f.write_str(&format!("GetRandomWhitePeers, len: {len}"))
- }
- }
- }
-}
-
-impl AddressBookRequest {
- /// Gets the [`NetZone`] for this request so we can
- /// route it to the required address book.
- pub fn get_zone(&self) -> NetZone {
- match self {
- Self::HandleNewPeerList(_, zone) => *zone,
- Self::SetPeerSeen(.., zone) => *zone,
- Self::BanPeer(.., zone) => *zone,
- Self::ConnectedToPeer { zone, .. } => *zone,
-
- Self::GetAndEmptyAnchorList(zone) => *zone,
- Self::GetRandomGrayPeer(zone, _) => *zone,
- Self::GetRandomWhitePeer(zone, _) => *zone,
- Self::GetRandomWhitePeers(zone, _) => *zone,
- }
- }
-}
-
-/// A response from the AddressBook.
-#[derive(Debug)]
-pub enum AddressBookResponse {
- /// The request was handled ok.
- Ok,
- /// A peer.
- Peer(PeerListEntryBase),
- /// A list of peers.
- Peers(Vec),
-}
diff --git a/p2p/src/address_book/addr_book_client.rs b/p2p/src/address_book/addr_book_client.rs
deleted file mode 100644
index f35d265..0000000
--- a/p2p/src/address_book/addr_book_client.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-//! This module holds the address books client and [`tower::Service`].
-//!
-//! To start the address book use [`start_address_book`].
-// TODO: Store banned peers persistently.
-use std::future::Future;
-use std::pin::Pin;
-use std::task::Poll;
-
-use futures::channel::{mpsc, oneshot};
-use futures::FutureExt;
-use tokio::task::{spawn, JoinHandle};
-use tower::steer::Steer;
-use tower::BoxError;
-use tracing::Instrument;
-
-use monero_wire::network_address::NetZone;
-
-use crate::{Config, P2PStore};
-
-use super::address_book::{AddressBook, AddressBookClientRequest};
-use super::{AddressBookError, AddressBookRequest, AddressBookResponse};
-
-/// Start the address book.
-/// Under the hood this function spawns 3 address books
-/// for the 3 [`NetZone`] and combines them into a [`tower::Steer`](Steer).
-pub async fn start_address_book(
- peer_store: S,
- config: Config,
-) -> Result<
- impl tower::Service<
- AddressBookRequest,
- Response = AddressBookResponse,
- Error = BoxError,
- Future = Pin<
- Box> + Send + 'static>,
- >,
- >,
- BoxError,
->
-where
- S: P2PStore,
-{
- let mut builder = AddressBookBuilder::new(peer_store, config);
-
- let public = builder.build(NetZone::Public).await?;
- let tor = builder.build(NetZone::Tor).await?;
- let i2p = builder.build(NetZone::I2p).await?;
-
- // This list MUST be in the same order as closuer in the `Steer` func
- let books = vec![public, tor, i2p];
-
- Ok(Steer::new(
- books,
- |req: &AddressBookRequest, _: &[_]| match req.get_zone() {
- // This:
- NetZone::Public => 0,
- NetZone::Tor => 1,
- NetZone::I2p => 2,
- },
- ))
-}
-
-/// An address book builder.
-/// This:
-/// - starts the address book
-/// - creates and returns the `AddressBookClient`
-struct AddressBookBuilder {
- peer_store: S,
- config: Config,
-}
-
-impl AddressBookBuilder
-where
- S: P2PStore,
-{
- fn new(peer_store: S, config: Config) -> Self {
- AddressBookBuilder { peer_store, config }
- }
-
- /// Builds the address book for a specific [`NetZone`]
- async fn build(&mut self, zone: NetZone) -> Result {
- let (white, gray, anchor) = self
- .peer_store
- .load_peers(zone)
- .await
- .map_err(|e| AddressBookError::PeerStoreError(e))?;
-
- let book = AddressBook::new(
- self.config.clone(),
- zone,
- white,
- gray,
- anchor,
- vec![],
- self.peer_store.clone(),
- );
-
- let (tx, rx) = mpsc::channel(0);
-
- let book_span = tracing::info_span!("AddressBook", book = book.book_name());
-
- let book_handle = spawn(book.run(rx).instrument(book_span));
-
- Ok(AddressBookClient {
- book: tx,
- book_handle,
- })
- }
-}
-
-/// The Client for an individual address book.
-#[derive(Debug)]
-struct AddressBookClient {
- /// The channel to pass requests to the address book.
- book: mpsc::Sender,
- /// The address book task handle.
- book_handle: JoinHandle<()>,
-}
-
-impl tower::Service for AddressBookClient {
- type Response = AddressBookResponse;
- type Error = BoxError;
- type Future =
- Pin> + Send + 'static>>;
-
- fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> {
- // Check the channel
- match self.book.poll_ready(cx) {
- Poll::Pending => return Poll::Pending,
- Poll::Ready(Ok(())) => (),
- Poll::Ready(Err(_)) => {
- return Poll::Ready(Err(AddressBookError::AddressBooksChannelClosed.into()))
- }
- }
-
- // Check the address book task is still running
- match self.book_handle.poll_unpin(cx) {
- // The address book is still running
- Poll::Pending => Poll::Ready(Ok(())),
- // The address book task has exited
- Poll::Ready(_) => Err(AddressBookError::AddressBookTaskExited)?,
- }
- }
-
- fn call(&mut self, req: AddressBookRequest) -> Self::Future {
- let (tx, rx) = oneshot::channel();
- // get the callers span
- let span = tracing::debug_span!(parent: &tracing::span::Span::current(), "AddressBook");
-
- let req = AddressBookClientRequest { req, tx, span };
-
- match self.book.try_send(req) {
- Err(_e) => {
- // I'm assuming all callers will call `poll_ready` first (which they are supposed to)
- futures::future::ready(Err(AddressBookError::AddressBooksChannelClosed.into()))
- .boxed()
- }
- Ok(()) => async move {
- rx.await
- .expect("Address Book will not drop requests until completed")
- .map_err(Into::into)
- }
- .boxed(),
- }
- }
-}
diff --git a/p2p/src/address_book/address_book.rs b/p2p/src/address_book/address_book.rs
deleted file mode 100644
index 84d149b..0000000
--- a/p2p/src/address_book/address_book.rs
+++ /dev/null
@@ -1,594 +0,0 @@
-//! This module contains the actual address book logic.
-//!
-//! The address book is split into multiple [`PeerList`]:
-//!
-//! - A White list: For peers we have connected to ourselves.
-//!
-//! - A Gray list: For Peers we have been told about but
-//! haven't connected to ourselves.
-//!
-//! - An Anchor list: This holds peers we are currently
-//! connected to that are reachable if we were to
-//! connect to them again. For example an inbound proxy
-//! connection would not get added to this list as we cant
-//! connect to this peer ourselves. Behind the scenes we
-//! are just storing the key to a peer in the white list.
-//!
-use std::collections::{HashMap, HashSet};
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use futures::stream::FuturesUnordered;
-use futures::{
- channel::{mpsc, oneshot},
- FutureExt, Stream, StreamExt,
-};
-use pin_project::pin_project;
-use rand::prelude::SliceRandom;
-
-use cuprate_common::shutdown::is_shutting_down;
-use cuprate_common::PruningSeed;
-use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress, PeerID};
-
-use super::{AddressBookError, AddressBookRequest, AddressBookResponse};
-use crate::address_book::connection_handle::ConnectionAddressBookHandle;
-use crate::{constants::ADDRESS_BOOK_SAVE_INTERVAL, Config, P2PStore};
-
-mod peer_list;
-use peer_list::PeerList;
-
-#[cfg(test)]
-mod tests;
-
-/// A request sent to the address book task.
-pub(crate) struct AddressBookClientRequest {
- /// The request
- pub req: AddressBookRequest,
- /// A oneshot to send the result down
- pub tx: oneshot::Sender>,
- /// The tracing span to keep the context of the request
- pub span: tracing::Span,
-}
-
-/// An entry in the connected list.
-pub struct ConnectionPeerEntry {
- /// A oneshot sent from the Connection when it has finished.
- connection_handle: ConnectionAddressBookHandle,
- /// The connection addr, None if the peer is connected through
- /// a hidden network.
- addr: Option,
- /// If the peer is reachable by our node.
- reachable: bool,
- /// The last seen timestamp, note: Cuprate may skip updating this
- /// field on some inbound messages
- last_seen: chrono::NaiveDateTime,
- /// The peers pruning seed
- pruning_seed: PruningSeed,
- /// The peers port.
- rpc_port: u16,
- /// The peers rpc credits per hash
- rpc_credits_per_hash: u32,
-}
-
-/// A future that resolves when a peer is unbanned.
-#[pin_project(project = EnumProj)]
-pub struct BanedPeerFut(Vec, #[pin] tokio::time::Sleep);
-
-impl Future for BanedPeerFut {
- type Output = Vec;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
- let mut this = self.project();
- match this.1.poll_unpin(cx) {
- Poll::Pending => Poll::Pending,
- Poll::Ready(_) => Poll::Ready(this.0.clone()),
- }
- }
-}
-
-/// The address book for a specific [`NetZone`]
-pub struct AddressBook {
- /// The [`NetZone`] of this address book.
- zone: NetZone,
- /// A copy of the nodes configuration.
- config: Config,
- /// The Address books white list.
- white_list: PeerList,
- /// The Address books gray list.
- gray_list: PeerList,
- /// The Address books anchor list.
- anchor_list: HashSet,
- /// The Currently connected peers.
- connected_peers: HashMap,
- /// A tuple of:
- /// - A hashset of [`ban_identifier`](NetworkAddress::ban_identifier)
- /// - A [`FuturesUnordered`] which contains futures for every ban_id
- /// that will resolve when the ban_id should be un banned.
- baned_peers: (HashSet>, FuturesUnordered),
- /// The peer store to save the peers to persistent storage
- p2p_store: PeerStore,
-}
-
-impl AddressBook {
- /// Creates a new address book for a given [`NetZone`]
- pub fn new(
- config: Config,
- zone: NetZone,
- white_peers: Vec,
- gray_peers: Vec,
- anchor_peers: Vec,
- baned_peers: Vec<(NetworkAddress, chrono::NaiveDateTime)>,
- p2p_store: PeerStore,
- ) -> Self {
- let white_list = PeerList::new(white_peers);
- let gray_list = PeerList::new(gray_peers);
- let anchor_list = HashSet::from_iter(anchor_peers);
- let baned_peers = (HashSet::new(), FuturesUnordered::new());
-
- let connected_peers = HashMap::new();
-
- AddressBook {
- zone,
- config,
- white_list,
- gray_list,
- anchor_list,
- connected_peers,
- baned_peers,
- p2p_store,
- }
- }
-
- /// Returns the books name (Based on the [`NetZone`])
- pub const fn book_name(&self) -> &'static str {
- match self.zone {
- NetZone::Public => "PublicAddressBook",
- NetZone::Tor => "TorAddressBook",
- NetZone::I2p => "I2pAddressBook",
- }
- }
-
- /// Returns the length of the white list
- fn len_white_list(&self) -> usize {
- self.white_list.len()
- }
-
- /// Returns the length of the gray list
- fn len_gray_list(&self) -> usize {
- self.gray_list.len()
- }
-
- /// Returns the length of the anchor list
- fn len_anchor_list(&self) -> usize {
- self.anchor_list.len()
- }
-
- /// Returns the length of the banned list
- fn len_banned_list(&self) -> usize {
- self.baned_peers.0.len()
- }
-
- /// Returns the maximum length of the white list
- /// *note this list can grow bigger if we are connected to more
- /// than this amount.
- fn max_white_peers(&self) -> usize {
- self.config.max_white_peers()
- }
-
- /// Returns the maximum length of the gray list
- fn max_gray_peers(&self) -> usize {
- self.config.max_gray_peers()
- }
-
- /// Checks if a peer is banned.
- fn is_peer_banned(&self, peer: &NetworkAddress) -> bool {
- self.baned_peers.0.contains(&peer.ban_identifier())
- }
-
- /// Checks if banned peers should be unbanned as the duration has elapsed
- fn check_unban_peers(&mut self) {
- while let Some(Some(addr)) = Pin::new(&mut self.baned_peers.1).next().now_or_never() {
- tracing::debug!("Unbanning peer: {addr:?}");
- self.baned_peers.0.remove(&addr);
- }
- }
-
- /// Checks if peers have disconnected, if they have removing them from the
- /// connected and anchor list.
- fn check_connected_peers(&mut self) {
- let mut remove_from_anchor = vec![];
- // We dont have to worry about updating our white list with the information
- // before we remove the peers as that happens on every save.
- self.connected_peers.retain(|_, peer| {
- if !peer.connection_handle.connection_closed() {
- // add the peer to the list to get removed from the anchor
- if let Some(addr) = peer.addr {
- remove_from_anchor.push(addr)
- }
- false
- } else {
- true
- }
- });
- // If we are shutting down we want to keep our anchor peers for
- // the next time we boot up so we dont remove disconnecting peers
- // from the anchor list if we are shutting down.
- if !is_shutting_down() {
- for peer in remove_from_anchor {
- self.anchor_list.remove(&peer);
- }
- }
- }
-
- // Bans the peer and tells the connection tasks of peers with the same ban id to shutdown.
- fn ban_peer(
- &mut self,
- peer: PeerID,
- time: std::time::Duration,
- ) -> Result<(), AddressBookError> {
- tracing::debug!("Banning peer: {peer:?} for: {time:?}");
-
- let Some(conn_entry) = self.connected_peers.get(&peer) else {
- tracing::debug!("Peer is not in connected list");
- return Err(AddressBookError::PeerNotFound);
- };
- // tell the connection task to finish.
- conn_entry.connection_handle.kill_connection();
- // try find the NetworkAddress of the peer
- let Some(addr) = conn_entry.addr else {
- tracing::debug!("Peer does not have an address we can ban");
- return Ok(());
- };
-
- let ban_id = addr.ban_identifier();
-
- self.white_list.remove_peers_with_ban_id(&ban_id);
- self.gray_list.remove_peers_with_ban_id(&ban_id);
- // Dont remove from anchor list or connection list as this will happen when
- // the connection is closed.
-
- // tell the connection task of peers with the same ban id to shutdown.
- for conn in self.connected_peers.values() {
- if let Some(addr) = conn.addr {
- if addr.ban_identifier() == ban_id {
- conn.connection_handle.kill_connection()
- }
- }
- }
-
- // add the ban identifier to the ban list
- self.baned_peers.0.insert(ban_id.clone());
- self.baned_peers
- .1
- .push(BanedPeerFut(ban_id, tokio::time::sleep(time)));
- Ok(())
- }
-
- /// Update the last seen timestamp of a connected peer.
- fn update_last_seen(
- &mut self,
- peer: PeerID,
- last_seen: chrono::NaiveDateTime,
- ) -> Result<(), AddressBookError> {
- if let Some(mut peer) = self.connected_peers.get_mut(&peer) {
- peer.last_seen = last_seen;
- Ok(())
- } else {
- Err(AddressBookError::PeerNotFound)
- }
- }
-
- /// adds a peer to the gray list.
- fn add_peer_to_gray_list(&mut self, mut peer: PeerListEntryBase) {
- if self.white_list.contains_peer(&peer.adr) {
- return;
- };
- if !self.gray_list.contains_peer(&peer.adr) {
- peer.last_seen = 0;
- self.gray_list.add_new_peer(peer);
- }
- }
-
- /// handles an incoming peer list,
- /// dose some basic validation on the addresses
- /// appends the good peers to our book.
- fn handle_new_peerlist(
- &mut self,
- mut peers: Vec,
- ) -> Result<(), AddressBookError> {
- let length = peers.len();
-
- tracing::debug!("Received new peer list, length: {length}");
-
- let mut err = None;
- peers.retain(|peer| {
- if err.is_some() {
- false
- } else if peer.adr.is_local() || peer.adr.is_loopback() {
- false
- } else if peer.adr.port() == peer.rpc_port {
- false
- } else if PruningSeed::try_from(peer.pruning_seed).is_err() {
- false
- } else if peer.adr.get_zone() != self.zone {
- tracing::info!("Received an address from a different network zone, ignoring list.");
- err = Some(AddressBookError::PeerSentAnAddressOutOfZone);
- false
- } else if self.is_peer_banned(&peer.adr) {
- false
- } else {
- true
- }
- });
-
- if let Some(e) = err {
- return Err(e);
- } else {
- for peer in peers {
- self.add_peer_to_gray_list(peer);
- }
- self.gray_list
- .reduce_list(&HashSet::new(), self.max_gray_peers());
- Ok(())
- }
- }
-
- /// Gets a random peer from our gray list.
- /// If pruning seed is set we will get a peer with that pruning seed.
- fn get_random_gray_peer(
- &mut self,
- pruning_seed: Option,
- ) -> Option {
- self.gray_list
- .get_random_peer(&mut rand::thread_rng(), pruning_seed.map(Into::into))
- .map(|p| *p)
- }
-
- /// Gets a random peer from our white list.
- /// If pruning seed is set we will get a peer with that pruning seed.
- fn get_random_white_peer(
- &mut self,
- pruning_seed: Option,
- ) -> Option {
- self.white_list
- .get_random_peer(&mut rand::thread_rng(), pruning_seed.map(Into::into))
- .map(|p| *p)
- }
-
- /// Gets random peers from our white list.
- /// will be less than or equal to `len`.
- fn get_random_white_peers(&mut self, len: usize) -> Vec {
- let white_len = self.white_list.len();
- let len = if len < white_len { len } else { white_len };
- let mut white_peers: Vec<&PeerListEntryBase> = self.white_list.iter_all_peers().collect();
- white_peers.shuffle(&mut rand::thread_rng());
- white_peers[0..len].iter().map(|peb| **peb).collect()
- }
-
- /// Updates an entry in the white list, if the peer is not found and `reachable` is true then
- /// the peer will be added to the white list.
- fn update_white_list_peer_entry(
- &mut self,
- addr: &NetworkAddress,
- id: PeerID,
- conn_entry: &ConnectionPeerEntry,
- ) -> Result<(), AddressBookError> {
- if let Some(peb) = self.white_list.get_peer_mut(addr) {
- if peb.pruning_seed == conn_entry.pruning_seed.into() {
- return Err(AddressBookError::PeersPruningSeedChanged);
- }
- peb.id = id;
- peb.last_seen = conn_entry.last_seen.timestamp();
- peb.rpc_port = conn_entry.rpc_port;
- peb.rpc_credits_per_hash = conn_entry.rpc_credits_per_hash;
- peb.pruning_seed = conn_entry.pruning_seed.into();
- } else if conn_entry.reachable {
- // if the peer is reachable add it to our white list
- let peb = PeerListEntryBase {
- id,
- adr: *addr,
- last_seen: conn_entry.last_seen.timestamp(),
- rpc_port: conn_entry.rpc_port,
- rpc_credits_per_hash: conn_entry.rpc_credits_per_hash,
- pruning_seed: conn_entry.pruning_seed.into(),
- };
- self.white_list.add_new_peer(peb);
- }
- Ok(())
- }
-
- /// Handles a new connection, adding it to the white list if the
- /// peer is reachable by our node.
- fn handle_new_connection(
- &mut self,
- connection_handle: ConnectionAddressBookHandle,
- addr: Option,
- id: PeerID,
- reachable: bool,
- last_seen: chrono::NaiveDateTime,
- pruning_seed: PruningSeed,
- rpc_port: u16,
- rpc_credits_per_hash: u32,
- ) -> Result<(), AddressBookError> {
- let connection_entry = ConnectionPeerEntry {
- connection_handle,
- addr,
- reachable,
- last_seen,
- pruning_seed,
- rpc_port,
- rpc_credits_per_hash,
- };
- if let Some(addr) = addr {
- if self.baned_peers.0.contains(&addr.ban_identifier()) {
- return Err(AddressBookError::PeerIsBanned);
- }
- // remove the peer from the gray list as we know it's active.
- let _ = self.gray_list.remove_peer(&addr);
- if !reachable {
- // If we can't reach the peer remove it from the white list as well
- let _ = self.white_list.remove_peer(&addr);
- } else {
- // The peer is reachable, update our white list and add it to the anchor connections.
- self.update_white_list_peer_entry(&addr, id, &connection_entry)?;
- self.anchor_list.insert(addr);
- }
- }
-
- self.connected_peers.insert(id, connection_entry);
- self.white_list
- .reduce_list(&self.anchor_list, self.max_white_peers());
- Ok(())
- }
-
- /// Get and empties the anchor list, used at startup to
- /// connect to some peers we were previously connected to.
- fn get_and_empty_anchor_list(&mut self) -> Vec {
- self.anchor_list
- .drain()
- .map(|addr| {
- self.white_list
- .get_peer(&addr)
- .expect("If peer is in anchor it must be in white list")
- .clone()
- })
- .collect()
- }
-
- /// Handles an [`AddressBookClientRequest`] to the address book.
- async fn handle_request(&mut self, req: AddressBookClientRequest) {
- let _guard = req.span.enter();
-
- tracing::trace!("received request: {}", req.req);
-
- let res = match req.req {
- AddressBookRequest::HandleNewPeerList(new_peers, _) => self
- .handle_new_peerlist(new_peers)
- .map(|_| AddressBookResponse::Ok),
- AddressBookRequest::SetPeerSeen(peer, last_seen, _) => self
- .update_last_seen(peer, last_seen)
- .map(|_| AddressBookResponse::Ok),
- AddressBookRequest::BanPeer(peer, time, _) => {
- self.ban_peer(peer, time).map(|_| AddressBookResponse::Ok)
- }
- AddressBookRequest::ConnectedToPeer {
- zone: _,
- connection_handle,
- addr,
- id,
- reachable,
- last_seen,
- pruning_seed,
- rpc_port,
- rpc_credits_per_hash,
- } => self
- .handle_new_connection(
- connection_handle,
- addr,
- id,
- reachable,
- last_seen,
- pruning_seed,
- rpc_port,
- rpc_credits_per_hash,
- )
- .map(|_| AddressBookResponse::Ok),
-
- AddressBookRequest::GetAndEmptyAnchorList(_) => {
- Ok(AddressBookResponse::Peers(self.get_and_empty_anchor_list()))
- }
-
- AddressBookRequest::GetRandomGrayPeer(_, pruning_seed) => {
- match self.get_random_gray_peer(pruning_seed) {
- Some(peer) => Ok(AddressBookResponse::Peer(peer)),
- None => Err(AddressBookError::PeerListEmpty),
- }
- }
- AddressBookRequest::GetRandomWhitePeer(_, pruning_seed) => {
- match self.get_random_white_peer(pruning_seed) {
- Some(peer) => Ok(AddressBookResponse::Peer(peer)),
- None => Err(AddressBookError::PeerListEmpty),
- }
- }
- AddressBookRequest::GetRandomWhitePeers(_, len) => {
- Ok(AddressBookResponse::Peers(self.get_random_white_peers(len)))
- }
- };
-
- if let Err(e) = &res {
- tracing::debug!("Error when handling request, err: {e}")
- }
-
- let _ = req.tx.send(res);
- }
-
- /// Updates the white list with the information in the `connected_peers` list.
- /// This only updates the `last_seen` timestamp as that's the only thing that should
- /// change during connections.
- fn update_white_list_with_conn_list(&mut self) {
- for (_, peer) in self.connected_peers.iter() {
- if peer.reachable {
- if let Some(peer_eb) = self.white_list.get_peer_mut(&peer.addr.unwrap()) {
- peer_eb.last_seen = peer.last_seen.timestamp();
- }
- }
- }
- }
-
- /// Saves the address book to persistent storage.
- /// TODO: save the banned peer list.
- #[tracing::instrument(level="trace", skip(self), fields(name = self.book_name()) )]
- async fn save(&mut self) {
- self.update_white_list_with_conn_list();
- tracing::trace!(
- "white_len: {}, gray_len: {}, anchor_len: {}, banned_len: {}",
- self.len_white_list(),
- self.len_gray_list(),
- self.len_anchor_list(),
- self.len_banned_list()
- );
- let res = self
- .p2p_store
- .save_peers(
- self.zone,
- (&self.white_list).into(),
- (&self.gray_list).into(),
- self.anchor_list.iter().collect(),
- )
- .await;
- match res {
- Ok(()) => tracing::trace!("Complete"),
- Err(e) => tracing::error!("Error saving address book: {e}"),
- }
- }
-
- /// Runs the address book task
- /// Should be spawned in a task.
- pub(crate) async fn run(mut self, mut rx: mpsc::Receiver) {
- let mut save_interval = {
- let mut interval = tokio::time::interval(ADDRESS_BOOK_SAVE_INTERVAL);
- interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- // Interval ticks at 0, interval, 2 interval, ...
- // this is just to ignore the first tick
- interval.tick().await;
- tokio_stream::wrappers::IntervalStream::new(interval).fuse()
- };
-
- loop {
- self.check_unban_peers();
- self.check_connected_peers();
- futures::select! {
- req = rx.next() => {
- if let Some(req) = req {
- self.handle_request(req).await
- } else {
- tracing::debug!("{} req channel closed, saving and shutting down book", self.book_name());
- self.save().await;
- return;
- }
- }
- _ = save_interval.next() => self.save().await
- }
- }
- }
-}
diff --git a/p2p/src/address_book/address_book/peer_list.rs b/p2p/src/address_book/address_book/peer_list.rs
deleted file mode 100644
index e852100..0000000
--- a/p2p/src/address_book/address_book/peer_list.rs
+++ /dev/null
@@ -1,231 +0,0 @@
-//! This module contains the individual address books peer lists.
-//!
-use std::collections::{HashMap, HashSet};
-use std::hash::Hash;
-
-use cuprate_common::CRYPTONOTE_PRUNING_LOG_STRIPES;
-use monero_wire::{messages::PeerListEntryBase, NetworkAddress};
-use rand::Rng;
-
-#[cfg(test)]
-mod tests;
-
-/// A Peer list in the address book.
-///
-/// This could either be the white list or gray list.
-pub struct PeerList {
- /// The peers with their peer data.
- peers: HashMap,
- /// An index of Pruning seed to address, so
- /// can quickly grab peers with the pruning seed
- /// we want.
- pruning_idxs: HashMap>,
- /// An index of [`ban_identifier`](NetworkAddress::ban_identifier) to Address
- /// to allow us to quickly remove baned peers.
- ban_id_idxs: HashMap, Vec>,
-}
-
-impl<'a> Into> for &'a PeerList {
- fn into(self) -> Vec<&'a PeerListEntryBase> {
- self.peers.iter().map(|(_, peb)| peb).collect()
- }
-}
-
-impl PeerList {
- /// Creates a new peer list.
- pub fn new(list: Vec) -> PeerList {
- let mut peers = HashMap::with_capacity(list.len());
- let mut pruning_idxs = HashMap::with_capacity(2 << CRYPTONOTE_PRUNING_LOG_STRIPES);
- let mut ban_id_idxs = HashMap::with_capacity(list.len()); // worse case, every peer has a different NetworkAddress and ban id
-
- for peer in list {
- peers.insert(peer.adr, peer);
-
- pruning_idxs
- .entry(peer.pruning_seed)
- .or_insert_with(Vec::new)
- .push(peer.adr);
-
- ban_id_idxs
- .entry(peer.adr.ban_identifier())
- .or_insert_with(Vec::new)
- .push(peer.adr);
- }
- PeerList {
- peers,
- pruning_idxs,
- ban_id_idxs,
- }
- }
-
- /// Gets the length of the peer list
- pub fn len(&self) -> usize {
- self.peers.len()
- }
-
- /// Adds a new peer to the peer list
- pub fn add_new_peer(&mut self, peer: PeerListEntryBase) {
- if let None = self.peers.insert(peer.adr, peer) {
- self.pruning_idxs
- .entry(peer.pruning_seed)
- .or_insert_with(Vec::new)
- .push(peer.adr);
-
- self.ban_id_idxs
- .entry(peer.adr.ban_identifier())
- .or_insert_with(Vec::new)
- .push(peer.adr);
- }
- }
-
- /// Gets a reference to a peer
- pub fn get_peer(&self, peer: &NetworkAddress) -> Option<&PeerListEntryBase> {
- self.peers.get(peer)
- }
-
- /// Returns an iterator over every peer in this peer list
- pub fn iter_all_peers(&self) -> impl Iterator {
- self.peers.values()
- }
-
- /// Returns a random peer.
- /// If the pruning seed is specified then we will get a random peer with
- /// that pruning seed otherwise we will just get a random peer in the whole
- /// list.
- pub fn get_random_peer(
- &self,
- r: &mut R,
- pruning_seed: Option,
- ) -> Option<&PeerListEntryBase> {
- if let Some(seed) = pruning_seed {
- let mut peers = self.get_peers_with_pruning(&seed)?;
- let len = self.len_by_seed(&seed);
- if len == 0 {
- None
- } else {
- let n = r.gen_range(0..len);
-
- peers.nth(n)
- }
- } else {
- let mut peers = self.iter_all_peers();
- let len = self.len();
- if len == 0 {
- None
- } else {
- let n = r.gen_range(0..len);
-
- peers.nth(n)
- }
- }
- }
-
- /// Returns a mutable reference to a peer.
- pub fn get_peer_mut(&mut self, peer: &NetworkAddress) -> Option<&mut PeerListEntryBase> {
- self.peers.get_mut(peer)
- }
-
- /// Returns true if the list contains this peer.
- pub fn contains_peer(&self, peer: &NetworkAddress) -> bool {
- self.peers.contains_key(peer)
- }
-
- /// Returns an iterator of peer info of peers with a specific pruning seed.
- fn get_peers_with_pruning(
- &self,
- seed: &u32,
- ) -> Option> {
- let addrs = self.pruning_idxs.get(seed)?;
-
- Some(addrs.iter().map(move |addr| {
- self.peers
- .get(addr)
- .expect("Address must be in peer list if we have an idx for it")
- }))
- }
-
- /// Removes a peer from the pruning idx
- ///
- /// MUST NOT BE USED ALONE
- fn remove_peer_pruning_idx(&mut self, peer: &PeerListEntryBase) {
- remove_peer_idx(&mut self.pruning_idxs, &peer.pruning_seed, &peer.adr)
- }
-
- /// Removes a peer from the ban idx
- ///
- /// MUST NOT BE USED ALONE
- fn remove_peer_ban_idx(&mut self, peer: &PeerListEntryBase) {
- remove_peer_idx(&mut self.ban_id_idxs, &peer.adr.ban_identifier(), &peer.adr)
- }
-
- /// Removes a peer from all the indexes
- ///
- /// MUST NOT BE USED ALONE
- fn remove_peer_from_all_idxs(&mut self, peer: &PeerListEntryBase) {
- self.remove_peer_ban_idx(peer);
- self.remove_peer_pruning_idx(peer);
- }
-
- /// Removes a peer from the peer list
- pub fn remove_peer(&mut self, peer: &NetworkAddress) -> Option {
- let peer_eb = self.peers.remove(peer)?;
- self.remove_peer_from_all_idxs(&peer_eb);
- Some(peer_eb)
- }
-
- /// Removes all peers with a specific ban id.
- pub fn remove_peers_with_ban_id(&mut self, ban_id: &Vec) {
- let Some(addresses) = self.ban_id_idxs.get(ban_id) else {
- // No peers to ban
- return;
- };
- for addr in addresses.clone() {
- self.remove_peer(&addr);
- }
- }
-
- /// Tries to reduce the peer list to `new_len`.
- ///
- /// This function could keep the list bigger than `new_len` if `must_keep_peers`s length
- /// is larger than new_len, in that case we will remove as much as we can.
- pub fn reduce_list(&mut self, must_keep_peers: &HashSet, new_len: usize) {
- if new_len >= self.len() {
- return;
- }
-
- let target_removed = self.len() - new_len;
- let mut removed_count = 0;
- let mut peers_to_remove: Vec = Vec::with_capacity(target_removed);
-
- for (peer_adr, _) in &self.peers {
- if removed_count >= target_removed {
- break;
- }
- if !must_keep_peers.contains(peer_adr) {
- peers_to_remove.push(*peer_adr);
- removed_count += 1;
- }
- }
-
- for peer_adr in peers_to_remove {
- let _ = self.remove_peer(&peer_adr);
- }
- }
-}
-
-/// Remove a peer from an index.
-fn remove_peer_idx(
- idx_map: &mut HashMap>,
- idx: &T,
- addr: &NetworkAddress,
-) {
- if let Some(peer_list) = idx_map.get_mut(idx) {
- if let Some(idx) = peer_list.iter().position(|peer_adr| peer_adr == addr) {
- peer_list.swap_remove(idx);
- } else {
- unreachable!("This function will only be called when the peer exists.");
- }
- } else {
- unreachable!("Index must exist if a peer has that index");
- }
-}
diff --git a/p2p/src/address_book/address_book/peer_list/tests.rs b/p2p/src/address_book/address_book/peer_list/tests.rs
deleted file mode 100644
index 00ca37c..0000000
--- a/p2p/src/address_book/address_book/peer_list/tests.rs
+++ /dev/null
@@ -1,176 +0,0 @@
-use std::{collections::HashSet, vec};
-
-use monero_wire::{messages::PeerListEntryBase, NetworkAddress};
-use rand::Rng;
-
-use super::PeerList;
-
-fn make_fake_peer_list(numb_o_peers: usize) -> PeerList {
- let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers];
- for (idx, peer) in peer_list.iter_mut().enumerate() {
- let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")};
- ip.m_ip += idx as u32;
- }
-
- PeerList::new(peer_list)
-}
-
-fn make_fake_peer_list_with_random_pruning_seeds(numb_o_peers: usize) -> PeerList {
- let mut r = rand::thread_rng();
-
- let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers];
- for (idx, peer) in peer_list.iter_mut().enumerate() {
- let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")};
- ip.m_ip += idx as u32;
- ip.m_port += r.gen_range(0..15);
-
- peer.pruning_seed = if r.gen_bool(0.4) {
- 0
- } else {
- r.gen_range(384..=391)
- };
- }
-
- PeerList::new(peer_list)
-}
-
-#[test]
-fn peer_list_reduce_length() {
- let mut peer_list = make_fake_peer_list(2090);
- let must_keep_peers = HashSet::new();
-
- let target_len = 2000;
-
- peer_list.reduce_list(&must_keep_peers, target_len);
-
- assert_eq!(peer_list.len(), target_len);
-}
-
-#[test]
-fn peer_list_reduce_length_with_peers_we_need() {
- let mut peer_list = make_fake_peer_list(500);
- let must_keep_peers = HashSet::from_iter(peer_list.peers.iter().map(|(adr, _)| *adr));
-
- let target_len = 49;
-
- peer_list.reduce_list(&must_keep_peers, target_len);
-
- // we can't remove any of the peers we said we need them all
- assert_eq!(peer_list.len(), 500);
-}
-
-#[test]
-fn peer_list_get_peers_by_pruning_seed() {
- let mut r = rand::thread_rng();
-
- let peer_list = make_fake_peer_list_with_random_pruning_seeds(1000);
- let seed = if r.gen_bool(0.4) {
- 0
- } else {
- r.gen_range(384..=391)
- };
-
- let peers_with_seed = peer_list
- .get_peers_with_pruning(&seed)
- .expect("If you hit this buy a lottery ticket");
-
- for peer in peers_with_seed {
- assert_eq!(peer.pruning_seed, seed);
- }
-
- assert_eq!(peer_list.len(), 1000);
-}
-
-#[test]
-fn peer_list_remove_specific_peer() {
- let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
-
- let peer = peer_list
- .get_random_peer(&mut rand::thread_rng(), None)
- .unwrap()
- .clone();
-
- assert!(peer_list.remove_peer(&peer.adr).is_some());
-
- let pruning_idxs = peer_list.pruning_idxs;
- let peers = peer_list.peers;
-
- for (_, addrs) in pruning_idxs {
- addrs.iter().for_each(|adr| assert_ne!(adr, &peer.adr))
- }
-
- assert!(!peers.contains_key(&peer.adr));
-}
-
-#[test]
-fn peer_list_pruning_idxs_are_correct() {
- let peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
- let mut total_len = 0;
-
- for (seed, list) in peer_list.pruning_idxs {
- for peer in list.iter() {
- assert_eq!(peer_list.peers.get(peer).unwrap().pruning_seed, seed);
- total_len += 1;
- }
- }
-
- assert_eq!(total_len, peer_list.peers.len())
-}
-
-#[test]
-fn peer_list_add_new_peer() {
- let mut peer_list = make_fake_peer_list(10);
- let mut new_peer = PeerListEntryBase::default();
- let NetworkAddress::IPv4(ip) = &mut new_peer.adr else {panic!("this test requires default to be ipv4")};
- ip.m_ip += 50;
-
- peer_list.add_new_peer(new_peer.clone());
-
- assert_eq!(peer_list.len(), 11);
- assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer));
- assert!(peer_list
- .pruning_idxs
- .get(&new_peer.pruning_seed)
- .unwrap()
- .contains(&new_peer.adr));
-}
-
-#[test]
-fn peer_list_add_existing_peer() {
- let mut peer_list = make_fake_peer_list(10);
- let existing_peer = peer_list
- .get_peer(&NetworkAddress::default())
- .unwrap()
- .clone();
-
- peer_list.add_new_peer(existing_peer.clone());
-
- assert_eq!(peer_list.len(), 10);
- assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer));
-}
-
-#[test]
-fn peer_list_get_non_existent_peer() {
- let peer_list = make_fake_peer_list(10);
- let mut non_existent_peer = NetworkAddress::default();
- let NetworkAddress::IPv4(ip) = &mut non_existent_peer else {panic!("this test requires default to be ipv4")};
- ip.m_ip += 50;
-
- assert_eq!(peer_list.get_peer(&non_existent_peer), None);
-}
-
-#[test]
-fn peer_list_ban_peers() {
- let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
- let peer = peer_list
- .get_random_peer(&mut rand::thread_rng(), None)
- .unwrap();
- let ban_id = peer.adr.ban_identifier();
- assert!(peer_list.contains_peer(&peer.adr));
- assert_ne!(peer_list.ban_id_idxs.get(&ban_id).unwrap().len(), 0);
- peer_list.remove_peers_with_ban_id(&ban_id);
- assert_eq!(peer_list.ban_id_idxs.get(&ban_id).unwrap().len(), 0);
- for (addr, _) in peer_list.peers {
- assert_ne!(addr.ban_identifier(), ban_id);
- }
-}
diff --git a/p2p/src/address_book/address_book/tests.rs b/p2p/src/address_book/address_book/tests.rs
deleted file mode 100644
index acf7460..0000000
--- a/p2p/src/address_book/address_book/tests.rs
+++ /dev/null
@@ -1,81 +0,0 @@
-use super::*;
-use crate::NetZoneBasicNodeData;
-use monero_wire::network_address::IPv4Address;
-use rand::Rng;
-
-fn create_random_net_address(r: &mut R) -> NetworkAddress {
- NetworkAddress::IPv4(IPv4Address {
- m_ip: r.gen(),
- m_port: r.gen(),
- })
-}
-
-fn create_random_net_addr_vec(r: &mut R, len: usize) -> Vec {
- let mut ret = Vec::with_capacity(len);
- for i in 0..len {
- ret.push(create_random_net_address(r));
- }
- ret
-}
-
-fn create_random_peer(r: &mut R) -> PeerListEntryBase {
- PeerListEntryBase {
- adr: create_random_net_address(r),
- pruning_seed: r.gen_range(384..=391),
- id: PeerID(r.gen()),
- last_seen: r.gen(),
- rpc_port: r.gen(),
- rpc_credits_per_hash: r.gen(),
- }
-}
-
-fn create_random_peer_vec(r: &mut R, len: usize) -> Vec {
- let mut ret = Vec::with_capacity(len);
- for i in 0..len {
- ret.push(create_random_peer(r));
- }
- ret
-}
-
-#[derive(Clone)]
-pub struct MockPeerStore;
-
-#[async_trait::async_trait]
-impl P2PStore for MockPeerStore {
- async fn basic_node_data(&mut self) -> Result