diff --git a/Cargo.toml b/Cargo.toml index 52aa2a4..757ed53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,17 +9,20 @@ members = [ # "database", "net/levin", "net/monero-wire", - "p2p/monero-peer", + "p2p/monero-p2p", + "p2p/address-book", "test-utils" ] [profile.release] +panic = 'abort' lto = true # Build with LTO strip = "none" # Keep panic stack traces codegen-units = 1 # Optimize for binary speed over compile times opt-level = 3 [profile.dev] +panic = 'abort' lto = false strip = "none" # Not much slower compile times than opt-level 0, but much faster code. diff --git a/common/Cargo.toml b/common/Cargo.toml index 919b36d..905c0b9 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -5,10 +5,15 @@ edition = "2021" license = "AGPL-3.0-only" authors = ["Boog900"] +[features] +default = [] +borsh = ["dep:borsh"] [dependencies] chrono = "0.4.24" thiserror = "1" hex = "0.4" -futures = "0.3.29" \ No newline at end of file +futures = "0.3.29" + +borsh = {version = "1.2.1", default-features = false, features = ["derive",], optional = true } \ No newline at end of file diff --git a/common/src/pruning.rs b/common/src/pruning.rs index b78cde8..43a8dd5 100644 --- a/common/src/pruning.rs +++ b/common/src/pruning.rs @@ -18,6 +18,8 @@ //! ``` //! +use std::cmp::Ordering; + use thiserror::Error; use super::{ @@ -50,9 +52,33 @@ pub enum PruningError { /// // Internally we use an Option to represent if a pruning seed is 0 (None)which means // no pruning will take place. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +#[cfg_attr( + feature = "borsh", + derive(borsh::BorshSerialize, borsh::BorshDeserialize) +)] pub struct PruningSeed(Option); +impl PartialOrd for PruningSeed { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PruningSeed { + fn cmp(&self, other: &Self) -> Ordering { + match (self.get_log_stripes(), other.get_log_stripes()) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Greater, + (Some(_), None) => Ordering::Less, + (Some(stripe_s), Some(stripe_o)) => match stripe_s.cmp(&stripe_o) { + Ordering::Equal => self.get_stripe().unwrap().cmp(&other.get_stripe().unwrap()), + ordering => ordering, + }, + } + } +} + impl PruningSeed { /// Creates a new pruning seed from a `stripe` and `log_stripes` /// @@ -240,6 +266,12 @@ impl TryFrom for PruningSeed { } } +impl From for u32 { + fn from(value: PruningSeed) -> Self { + value.0.unwrap_or(0) + } +} + fn get_block_pruning_stripe( block_height: u64, blockchain_height: u64, diff --git a/p2p/address-book/Cargo.toml b/p2p/address-book/Cargo.toml new file mode 100644 index 0000000..f7f656c --- /dev/null +++ b/p2p/address-book/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "monero-address-book" +version = "0.1.0" +edition = "2021" +license = "MIT" +authors = ["Boog900"] + + +[dependencies] +cuprate-common = {path = "../../common"} +monero-wire = {path= "../../net/monero-wire"} +monero-p2p = {path = "../monero-p2p" } + +tower = { version= "0.4.13", features = ["util", "buffer"] } +tokio = {version = "1.34.0", default-features = false, features = ["time", "fs", "rt"]} + +futures = "0.3.29" +pin-project = "1.1.3" +async-trait = "0.1.74" + +thiserror = "1.0.50" +tracing = "0.1.40" + +rand = "0.8.5" + +borsh = {version = "1.2.1", features = ["derive"]} + +[dev-dependencies] +tokio = {version = "1.34.0", features = ["rt-multi-thread", "macros"]} +cuprate-test-utils = {path = "../../test-utils"} diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs new file mode 100644 index 0000000..703ae0f --- /dev/null +++ b/p2p/address-book/src/book.rs @@ -0,0 +1,424 @@ +use std::{ + collections::{HashMap, HashSet}, + future::Future, + panic, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use pin_project::pin_project; +use tokio::{ + task::JoinHandle, + time::{interval, sleep, Interval, MissedTickBehavior, Sleep}, +}; +use tower::Service; + +use cuprate_common::{tower_utils::InstaFuture, PruningSeed}; +use monero_p2p::{ + client::InternalPeerID, + handles::ConnectionHandle, + services::{AddressBookRequest, AddressBookResponse, ZoneSpecificPeerListEntryBase}, + NetZoneAddress, NetworkZone, +}; + +use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookError, Config}; + +#[cfg(test)] +mod tests; + +/// An entry in the connected list. +pub struct ConnectionPeerEntry { + addr: Option, + id: u64, + handle: ConnectionHandle, + /// The peers pruning seed + pruning_seed: PruningSeed, + /// The peers port. + rpc_port: u16, + /// The peers rpc credits per hash + rpc_credits_per_hash: u32, +} + +/// A future that resolves when a peer is unbanned. +#[pin_project(project = EnumProj)] +pub struct BanedPeerFut(Addr::BanID, #[pin] Sleep); + +impl Future for BanedPeerFut { + type Output = Addr::BanID; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + match this.1.poll_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => Poll::Ready(*this.0), + } + } +} + +pub struct AddressBook { + /// Our white peers - the peers we have previously connected to. + white_list: PeerList, + /// Our gray peers - the peers we have been told about but haven't connected to. + gray_list: PeerList, + /// Our anchor peers - on start up will contain a list of peers we were connected to before shutting down + /// after that will contain a list of peers currently connected to that we can reach. + anchor_list: HashSet, + /// The currently connected peers. + connected_peers: HashMap, ConnectionPeerEntry>, + connected_peers_ban_id: HashMap<::BanID, HashSet>, + /// The currently banned peers + banned_peers: HashSet<::BanID>, + + banned_peers_fut: FuturesUnordered>, + + peer_save_task_handle: Option>>, + peer_save_interval: Interval, + + cfg: Config, +} + +impl AddressBook { + pub fn new( + cfg: Config, + white_peers: Vec>, + gray_peers: Vec>, + anchor_peers: Vec, + ) -> Self { + let white_list = PeerList::new(white_peers); + let gray_list = PeerList::new(gray_peers); + let anchor_list = HashSet::from_iter(anchor_peers); + + // TODO: persist banned peers + let banned_peers = HashSet::new(); + let banned_peers_fut = FuturesUnordered::new(); + let connected_peers = HashMap::new(); + + let mut peer_save_interval = interval(cfg.peer_save_period); + peer_save_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + Self { + white_list, + gray_list, + anchor_list, + connected_peers, + connected_peers_ban_id: HashMap::new(), + banned_peers, + banned_peers_fut, + peer_save_task_handle: None, + peer_save_interval, + cfg, + } + } + + fn poll_save_to_disk(&mut self, cx: &mut Context<'_>) { + if let Some(handle) = &mut self.peer_save_task_handle { + // if we have already spawned a task to save the peer list wait for that to complete. + match handle.poll_unpin(cx) { + Poll::Pending => return, + Poll::Ready(Ok(Err(e))) => { + tracing::error!("Could not save peer list to disk, got error: {}", e) + } + Poll::Ready(Err(e)) => { + if e.is_panic() { + panic::resume_unwind(e.into_panic()) + } + } + _ => (), + } + } + // the task is finished. + self.peer_save_task_handle = None; + + let Poll::Ready(_) = self.peer_save_interval.poll_tick(cx) else { + return; + }; + + self.peer_save_task_handle = Some(save_peers_to_disk( + &self.cfg, + &self.white_list, + &self.gray_list, + )); + } + + fn poll_unban_peers(&mut self, cx: &mut Context<'_>) { + while let Poll::Ready(Some(ban_id)) = self.banned_peers_fut.poll_next_unpin(cx) { + self.banned_peers.remove(&ban_id); + } + } + + fn poll_connected_peers(&mut self) { + let mut internal_addr_disconnected = Vec::new(); + let mut addrs_to_ban = Vec::new(); + + for (internal_addr, peer) in &mut self.connected_peers { + if let Some(time) = peer.handle.check_should_ban() { + match internal_addr { + InternalPeerID::KnownAddr(addr) => addrs_to_ban.push((*addr, time.0)), + // If we don't know the peers address all we can do is disconnect. + InternalPeerID::Unknown(_) => peer.handle.send_close_signal(), + } + } + + if peer.handle.is_closed() { + internal_addr_disconnected.push(*internal_addr); + } + } + + for (addr, time) in addrs_to_ban.into_iter() { + self.ban_peer(addr, time); + } + + for disconnected_addr in internal_addr_disconnected { + self.connected_peers.remove(&disconnected_addr); + if let InternalPeerID::KnownAddr(addr) = disconnected_addr { + // remove the peer from the connected peers with this ban ID. + self.connected_peers_ban_id + .get_mut(&addr.ban_id()) + .unwrap() + .remove(&addr); + + // If the amount of peers with this ban id is 0 remove the whole set. + if self + .connected_peers_ban_id + .get(&addr.ban_id()) + .unwrap() + .is_empty() + { + self.connected_peers_ban_id.remove(&addr.ban_id()); + } + // remove the peer from the anchor list. + self.anchor_list.remove(&addr); + } + } + } + + fn ban_peer(&mut self, addr: Z::Addr, time: Duration) { + if self.banned_peers.contains(&addr.ban_id()) { + return; + } + + if let Some(connected_peers_with_ban_id) = self.connected_peers_ban_id.get(&addr.ban_id()) { + for peer in connected_peers_with_ban_id.iter().map(|addr| { + tracing::debug!("Banning peer: {}, for: {:?}", addr, time); + + self.connected_peers + .get(&InternalPeerID::KnownAddr(*addr)) + .expect("Peer must be in connected list if in connected_peers_with_ban_id") + }) { + // The peer will get removed from our connected list once we disconnect + peer.handle.send_close_signal(); + // Remove the peer now from anchors so we don't accidentally persist a bad anchor peer to disk. + self.anchor_list.remove(&addr); + } + } + + self.white_list.remove_peers_with_ban_id(&addr.ban_id()); + self.gray_list.remove_peers_with_ban_id(&addr.ban_id()); + + self.banned_peers.insert(addr.ban_id()); + self.banned_peers_fut + .push(BanedPeerFut(addr.ban_id(), sleep(time))) + } + + /// adds a peer to the gray list. + fn add_peer_to_gray_list(&mut self, mut peer: ZoneSpecificPeerListEntryBase) { + if self.white_list.contains_peer(&peer.adr) { + return; + }; + if !self.gray_list.contains_peer(&peer.adr) { + peer.last_seen = 0; + self.gray_list.add_new_peer(peer); + } + } + + /// Checks if a peer is banned. + fn is_peer_banned(&self, peer: &Z::Addr) -> bool { + self.banned_peers.contains(&peer.ban_id()) + } + + fn handle_incoming_peer_list( + &mut self, + mut peer_list: Vec>, + ) { + tracing::debug!("Received new peer list, length: {}", peer_list.len()); + + peer_list.retain(|peer| { + if !peer.adr.should_add_to_peer_list() { + false + } else { + !self.is_peer_banned(&peer.adr) + } + // TODO: check rpc/ p2p ports not the same + }); + + for peer in peer_list { + self.add_peer_to_gray_list(peer); + } + // The gray list has no peers we need to keep in the list so just pass an empty HashSet. + self.gray_list + .reduce_list(&HashSet::new(), self.cfg.max_gray_list_length); + } + + fn get_random_white_peer( + &self, + block_needed: Option, + ) -> Option> { + tracing::debug!("Retrieving random white peer"); + self.white_list + .get_random_peer(&mut rand::thread_rng(), block_needed) + .copied() + } + + fn get_random_gray_peer( + &self, + block_needed: Option, + ) -> Option> { + tracing::debug!("Retrieving random gray peer"); + self.gray_list + .get_random_peer(&mut rand::thread_rng(), block_needed) + .copied() + } + + fn get_white_peers(&self, len: usize) -> Vec> { + tracing::debug!("Retrieving white peers, maximum: {}", len); + self.white_list + .get_random_peers(&mut rand::thread_rng(), len) + } + + /// Updates an entry in the white list, if the peer is not found then + /// the peer will be added to the white list. + fn update_white_list_peer_entry( + &mut self, + peer: &ConnectionPeerEntry, + ) -> Result<(), AddressBookError> { + let Some(addr) = &peer.addr else { + // If the peer isn't reachable we shouldn't add it too our address book. + return Ok(()); + }; + + if let Some(peb) = self.white_list.get_peer_mut(addr) { + if peb.pruning_seed != peer.pruning_seed { + return Err(AddressBookError::PeersDataChanged("Pruning seed")); + } + if Z::CHECK_NODE_ID && peb.id != peer.id { + return Err(AddressBookError::PeersDataChanged("peer ID")); + } + // TODO: cuprate doesn't need last seen timestamps but should we have them anyway? + peb.last_seen = 0; + peb.rpc_port = peer.rpc_port; + peb.rpc_credits_per_hash = peer.rpc_credits_per_hash; + } else { + // if the peer is reachable add it to our white list + let peb = ZoneSpecificPeerListEntryBase { + id: peer.id, + adr: *addr, + last_seen: 0, + rpc_port: peer.rpc_port, + rpc_credits_per_hash: peer.rpc_credits_per_hash, + pruning_seed: peer.pruning_seed, + }; + self.white_list.add_new_peer(peb); + } + Ok(()) + } + + fn handle_new_connection( + &mut self, + internal_peer_id: InternalPeerID, + peer: ConnectionPeerEntry, + ) -> Result<(), AddressBookError> { + if self.connected_peers.contains_key(&internal_peer_id) { + return Err(AddressBookError::PeerAlreadyConnected); + } + + // If we know the address then check if it's banned. + if let InternalPeerID::KnownAddr(addr) = &internal_peer_id { + if self.is_peer_banned(addr) { + return Err(AddressBookError::PeerIsBanned); + } + // although the peer may not be readable still add it to the connected peers with ban ID. + self.connected_peers_ban_id + .entry(addr.ban_id()) + .or_default() + .insert(*addr); + } + + // if the address is Some that means we can reach it from our node. + if let Some(addr) = peer.addr { + // remove the peer from the gray list as we know it's active. + self.gray_list.remove_peer(&addr); + // The peer is reachable, update our white list and add it to the anchor connections. + self.update_white_list_peer_entry(&peer)?; + self.white_list + .reduce_list(&self.anchor_list, self.cfg.max_white_list_length); + self.anchor_list.insert(addr); + } + + self.connected_peers.insert(internal_peer_id, peer); + Ok(()) + } +} + +impl Service> for AddressBook { + type Response = AddressBookResponse; + type Error = AddressBookError; + type Future = InstaFuture>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_unban_peers(cx); + self.poll_save_to_disk(cx); + self.poll_connected_peers(); + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: AddressBookRequest) -> Self::Future { + let span = tracing::info_span!("AddressBook"); + let _guard = span.enter(); + + let response = match req { + AddressBookRequest::NewConnection { + addr, + internal_peer_id, + handle, + id, + pruning_seed, + rpc_port, + rpc_credits_per_hash, + } => self + .handle_new_connection( + internal_peer_id, + ConnectionPeerEntry { + addr, + id, + handle, + pruning_seed, + rpc_port, + rpc_credits_per_hash, + }, + ) + .map(|_| AddressBookResponse::Ok), + AddressBookRequest::BanPeer(addr, time) => { + self.ban_peer(addr, time); + Ok(AddressBookResponse::Ok) + } + AddressBookRequest::IncomingPeerList(peer_list) => { + self.handle_incoming_peer_list(peer_list); + Ok(AddressBookResponse::Ok) + } + AddressBookRequest::GetRandomWhitePeer { height } => self + .get_random_white_peer(height) + .map(AddressBookResponse::Peer) + .ok_or(AddressBookError::PeerNotFound), + AddressBookRequest::GetRandomGrayPeer { height } => self + .get_random_gray_peer(height) + .map(AddressBookResponse::Peer) + .ok_or(AddressBookError::PeerNotFound), + AddressBookRequest::GetWhitePeers(len) => { + Ok(AddressBookResponse::Peers(self.get_white_peers(len))) + } + }; + + InstaFuture::from(response) + } +} diff --git a/p2p/address-book/src/book/tests.rs b/p2p/address-book/src/book/tests.rs new file mode 100644 index 0000000..d6df95b --- /dev/null +++ b/p2p/address-book/src/book/tests.rs @@ -0,0 +1,149 @@ +use std::{path::PathBuf, sync::Arc, time::Duration}; + +use futures::StreamExt; +use tokio::sync::Semaphore; +use tokio::time::interval; + +use cuprate_common::PruningSeed; +use monero_p2p::handles::HandleBuilder; + +use super::{AddressBook, ConnectionPeerEntry, InternalPeerID}; +use crate::{peer_list::tests::make_fake_peer_list, AddressBookError, Config}; + +use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr}; + +fn test_cfg() -> Config { + Config { + max_white_list_length: 100, + max_gray_list_length: 500, + peer_store_file: PathBuf::new(), + peer_save_period: Duration::from_secs(60), + } +} + +fn make_fake_address_book( + numb_white: u32, + numb_gray: u32, +) -> AddressBook> { + let white_list = make_fake_peer_list(0, numb_white); + let gray_list = make_fake_peer_list(numb_white, numb_gray); + + AddressBook { + white_list, + gray_list, + anchor_list: Default::default(), + connected_peers: Default::default(), + connected_peers_ban_id: Default::default(), + banned_peers: Default::default(), + banned_peers_fut: Default::default(), + peer_save_task_handle: None, + peer_save_interval: interval(Duration::from_secs(60)), + cfg: test_cfg(), + } +} + +#[tokio::test] +async fn get_random_peers() { + let address_book = make_fake_address_book(50, 250); + let peer = address_book.get_random_white_peer(None).unwrap(); + assert!(address_book.white_list.contains_peer(&peer.adr)); + assert!(!address_book.gray_list.contains_peer(&peer.adr)); + + let peer = address_book.get_random_gray_peer(None).unwrap(); + assert!(!address_book.white_list.contains_peer(&peer.adr)); + assert!(address_book.gray_list.contains_peer(&peer.adr)); +} + +#[tokio::test] +async fn get_white_peers() { + let address_book = make_fake_address_book(100, 0); + let peers = address_book.get_white_peers(50); + assert_eq!(peers.len(), 50); + let peers = address_book.get_white_peers(60); + assert_eq!(peers.len(), 60); + for window in peers.windows(2) { + assert_ne!(window[0], window[1]); + } + + let address_book = make_fake_address_book(45, 0); + let peers = address_book.get_white_peers(50); + assert_eq!(peers.len(), 45); + let peers = address_book.get_white_peers(60); + assert_eq!(peers.len(), 45); + for window in peers.windows(2) { + assert_ne!(window[0], window[1]); + } +} + +#[tokio::test] +async fn add_new_peer_already_connected() { + let mut address_book = make_fake_address_book(0, 0); + + let semaphore = Arc::new(Semaphore::new(10)); + + let (_, handle, _) = HandleBuilder::default() + .with_permit(semaphore.clone().try_acquire_owned().unwrap()) + .build(); + + address_book + .handle_new_connection( + InternalPeerID::KnownAddr(TestNetZoneAddr(1)), + ConnectionPeerEntry { + addr: None, + id: 0, + handle, + pruning_seed: PruningSeed::try_from(385).unwrap(), + rpc_port: 0, + rpc_credits_per_hash: 0, + }, + ) + .unwrap(); + + let (_, handle, _) = HandleBuilder::default() + .with_permit(semaphore.try_acquire_owned().unwrap()) + .build(); + + assert_eq!( + address_book.handle_new_connection( + InternalPeerID::KnownAddr(TestNetZoneAddr(1)), + ConnectionPeerEntry { + addr: None, + id: 0, + handle, + pruning_seed: PruningSeed::try_from(385).unwrap(), + rpc_port: 0, + rpc_credits_per_hash: 0, + }, + ), + Err(AddressBookError::PeerAlreadyConnected) + ) +} + +#[tokio::test] +async fn banned_peer_removed_from_peer_lists() { + let mut address_book = make_fake_address_book(100, 0); + + assert_eq!(address_book.banned_peers.len(), 0); + assert_eq!(address_book.white_list.len(), 100); + + address_book.ban_peer(TestNetZoneAddr(1), Duration::from_secs(1)); + assert_eq!(address_book.banned_peers.len(), 1); + assert_eq!(address_book.white_list.len(), 99); + + address_book.ban_peer(TestNetZoneAddr(1), Duration::from_secs(1)); + assert_eq!(address_book.banned_peers.len(), 1); + assert_eq!(address_book.white_list.len(), 99); + + address_book.ban_peer(TestNetZoneAddr(1), Duration::from_secs(1)); + assert_eq!(address_book.banned_peers.len(), 1); + assert_eq!(address_book.white_list.len(), 99); + + address_book.ban_peer(TestNetZoneAddr(5), Duration::from_secs(100)); + assert_eq!(address_book.banned_peers.len(), 2); + assert_eq!(address_book.white_list.len(), 98); + + assert_eq!( + address_book.banned_peers_fut.next().await.unwrap(), + TestNetZoneAddr(1) + ) +} diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs new file mode 100644 index 0000000..cc2dc38 --- /dev/null +++ b/p2p/address-book/src/lib.rs @@ -0,0 +1,76 @@ +//! Cuprate Address Book +//! +//! This module holds the logic for persistent peer storage. +//! Cuprates address book is modeled as a [`tower::Service`] +//! The request is [`AddressBookRequest`] and the response is +//! [`AddressBookResponse`]. +//! +//! Cuprate, like monerod, actually has multiple address books, one +//! for each [`NetworkZone`]. This is to reduce the possibility of +//! clear net peers getting linked to their dark counterparts +//! and so peers will only get told about peers they can +//! connect to. +//! + +use std::{path::PathBuf, time::Duration}; + +use monero_p2p::{ + services::{AddressBookRequest, AddressBookResponse}, + NetworkZone, +}; + +mod book; +mod peer_list; +mod store; + +#[derive(Debug, Clone)] +pub struct Config { + max_white_list_length: usize, + max_gray_list_length: usize, + peer_store_file: PathBuf, + peer_save_period: Duration, +} + +/// Possible errors when dealing with the address book. +/// This is boxed when returning an error in the [`tower::Service`]. +#[derive(Debug, thiserror::Error, Eq, PartialEq)] +pub enum AddressBookError { + /// The peer is already connected. + #[error("Peer is already connected")] + PeerAlreadyConnected, + /// The peer is not in the address book for this zone. + #[error("Peer was not found in book")] + PeerNotFound, + /// The peer list is empty. + #[error("The peer list is empty")] + PeerListEmpty, + /// Immutable peer data was changed. + #[error("Immutable peer data was changed: {0}")] + PeersDataChanged(&'static str), + /// The peer is banned. + #[error("The peer is banned")] + PeerIsBanned, + /// The channel to the address book has closed unexpectedly. + #[error("The address books channel has closed.")] + AddressBooksChannelClosed, + /// The address book task has exited. + #[error("The address book task has exited.")] + AddressBookTaskExited, +} + +pub async fn init_address_book( + cfg: Config, +) -> Result< + impl tower::Service< + AddressBookRequest, + Response = AddressBookResponse, + Error = tower::BoxError, + >, + std::io::Error, +> { + let (white_list, gray_list) = store::read_peers_from_disk::(&cfg).await?; + + let address_book = book::AddressBook::::new(cfg, white_list, gray_list, Vec::new()); + + Ok(tower::buffer::Buffer::new(address_book, 15)) +} diff --git a/p2p/address-book/src/peer_list.rs b/p2p/address-book/src/peer_list.rs new file mode 100644 index 0000000..d48ed35 --- /dev/null +++ b/p2p/address-book/src/peer_list.rs @@ -0,0 +1,240 @@ +use std::collections::{BTreeMap, HashMap, HashSet}; + +use rand::{seq::SliceRandom, Rng}; + +use cuprate_common::{PruningSeed, CRYPTONOTE_MAX_BLOCK_NUMBER}; +use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone}; + +#[cfg(test)] +pub mod tests; + +/// A Peer list in the address book. +/// +/// This could either be the white list or gray list. +#[derive(Debug)] +pub struct PeerList { + /// The peers with their peer data. + pub peers: HashMap>, + /// An index of Pruning seed to address, so can quickly grab peers with the blocks + /// we want. + /// + /// Pruning seeds are sorted by first their log_stripes and then their stripe. + /// This means the first peers in this list will store more blocks than peers + /// later on. So when we need a peer with a certain block we look at the peers + /// storing more blocks first then work our way to the peers storing less. + /// + pruning_seeds: BTreeMap>, + /// A hashmap linking ban_ids to addresses. + ban_ids: HashMap<::BanID, Vec>, +} + +impl PeerList { + /// Creates a new peer list. + pub fn new(list: Vec>) -> PeerList { + let mut peers = HashMap::with_capacity(list.len()); + let mut pruning_seeds = BTreeMap::new(); + let mut ban_ids = HashMap::with_capacity(list.len()); + + for peer in list { + pruning_seeds + .entry(peer.pruning_seed) + .or_insert_with(Vec::new) + .push(peer.adr); + + ban_ids + .entry(peer.adr.ban_id()) + .or_insert_with(Vec::new) + .push(peer.adr); + + peers.insert(peer.adr, peer); + } + PeerList { + peers, + pruning_seeds, + ban_ids, + } + } + + /// Gets the length of the peer list + pub fn len(&self) -> usize { + self.peers.len() + } + + /// Adds a new peer to the peer list + pub fn add_new_peer(&mut self, peer: ZoneSpecificPeerListEntryBase) { + if self.peers.insert(peer.adr, peer).is_none() { + // It's more clear with this + #[allow(clippy::unwrap_or_default)] + self.pruning_seeds + .entry(peer.pruning_seed) + .or_insert_with(Vec::new) + .push(peer.adr); + + #[allow(clippy::unwrap_or_default)] + self.ban_ids + .entry(peer.adr.ban_id()) + .or_insert_with(Vec::new) + .push(peer.adr); + } + } + + /// Gets a reference to a peer + pub fn get_peer(&self, peer: &Z::Addr) -> Option<&ZoneSpecificPeerListEntryBase> { + self.peers.get(peer) + } + + /// Returns a random peer. + /// If the pruning seed is specified then we will get a random peer with + /// that pruning seed otherwise we will just get a random peer in the whole + /// list. + pub fn get_random_peer( + &self, + r: &mut R, + block_needed: Option, + ) -> Option<&ZoneSpecificPeerListEntryBase> { + if let Some(needed_height) = block_needed { + let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| { + // TODO: factor in peer blockchain height? + seed.get_next_unpruned_block(needed_height, CRYPTONOTE_MAX_BLOCK_NUMBER) + .expect("Explain") + == needed_height + })?; + let n = r.gen_range(0..addresses_with_block.len()); + self.get_peer(&addresses_with_block[n]) + } else { + let len = self.len(); + if len == 0 { + None + } else { + let n = r.gen_range(0..len); + + self.peers.values().nth(n) + } + } + } + + pub fn get_random_peers( + &self, + r: &mut R, + len: usize, + ) -> Vec> { + let mut peers = self.peers.values().copied().collect::>(); + peers.shuffle(r); + peers.drain(len.min(peers.len())..peers.len()); + peers + } + + /// Returns a mutable reference to a peer. + pub fn get_peer_mut( + &mut self, + peer: &Z::Addr, + ) -> Option<&mut ZoneSpecificPeerListEntryBase> { + self.peers.get_mut(peer) + } + + /// Returns true if the list contains this peer. + pub fn contains_peer(&self, peer: &Z::Addr) -> bool { + self.peers.contains_key(peer) + } + + /// Removes a peer from the pruning idx + /// + /// MUST NOT BE USED ALONE + fn remove_peer_pruning_idx(&mut self, peer: &ZoneSpecificPeerListEntryBase) { + remove_peer_idx::(self.pruning_seeds.get_mut(&peer.pruning_seed), &peer.adr); + if self + .pruning_seeds + .get(&peer.pruning_seed) + .expect("There must be a peer with this id") + .is_empty() + { + self.pruning_seeds.remove(&peer.pruning_seed); + } + } + + /// Removes a peer from the ban idx + /// + /// MUST NOT BE USED ALONE + fn remove_peer_ban_idx(&mut self, peer: &ZoneSpecificPeerListEntryBase) { + remove_peer_idx::(self.ban_ids.get_mut(&peer.adr.ban_id()), &peer.adr); + if self + .ban_ids + .get(&peer.adr.ban_id()) + .expect("There must be a peer with this id") + .is_empty() + { + self.ban_ids.remove(&peer.adr.ban_id()); + } + } + + /// Removes a peer from all the indexes + /// + /// MUST NOT BE USED ALONE + fn remove_peer_from_all_idxs(&mut self, peer: &ZoneSpecificPeerListEntryBase) { + self.remove_peer_pruning_idx(peer); + self.remove_peer_ban_idx(peer) + } + + /// Removes a peer from the peer list + pub fn remove_peer( + &mut self, + peer: &Z::Addr, + ) -> Option> { + let peer_eb = self.peers.remove(peer)?; + self.remove_peer_from_all_idxs(&peer_eb); + Some(peer_eb) + } + + /// Removes all peers with a specific ban id. + pub fn remove_peers_with_ban_id(&mut self, ban_id: &::BanID) { + let Some(addresses) = self.ban_ids.get(ban_id) else { + // No peers to ban + return; + }; + + for addr in addresses.clone() { + self.remove_peer(&addr); + } + } + + /// Tries to reduce the peer list to `new_len`. + /// + /// This function could keep the list bigger than `new_len` if `must_keep_peers`s length + /// is larger than new_len, in that case we will remove as much as we can. + pub fn reduce_list(&mut self, must_keep_peers: &HashSet, new_len: usize) { + if new_len >= self.len() { + return; + } + + let target_removed = self.len() - new_len; + let mut removed_count = 0; + let mut peers_to_remove: Vec = Vec::with_capacity(target_removed); + + for peer_adr in self.peers.keys() { + if removed_count >= target_removed { + break; + } + if !must_keep_peers.contains(peer_adr) { + peers_to_remove.push(*peer_adr); + removed_count += 1; + } + } + + for peer_adr in peers_to_remove { + let _ = self.remove_peer(&peer_adr); + } + } +} + +/// Remove a peer from an index. +fn remove_peer_idx(peer_list: Option<&mut Vec>, addr: &Z::Addr) { + if let Some(peer_list) = peer_list { + if let Some(idx) = peer_list.iter().position(|peer_adr| peer_adr == addr) { + peer_list.swap_remove(idx); + } else { + unreachable!("This function will only be called when the peer exists."); + } + } else { + unreachable!("Index must exist if a peer has that index"); + } +} diff --git a/p2p/address-book/src/peer_list/tests.rs b/p2p/address-book/src/peer_list/tests.rs new file mode 100644 index 0000000..e732246 --- /dev/null +++ b/p2p/address-book/src/peer_list/tests.rs @@ -0,0 +1,186 @@ +use std::collections::HashSet; + +use rand::Rng; + +use cuprate_common::PruningSeed; +use monero_p2p::services::ZoneSpecificPeerListEntryBase; + +use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr}; +use monero_p2p::NetZoneAddress; + +use super::PeerList; + +fn make_fake_peer( + id: u32, + pruning_seed: Option, +) -> ZoneSpecificPeerListEntryBase { + ZoneSpecificPeerListEntryBase { + adr: TestNetZoneAddr(id), + id: id as u64, + last_seen: 0, + pruning_seed: PruningSeed::try_from(pruning_seed.unwrap_or(0)).unwrap(), + rpc_port: 0, + rpc_credits_per_hash: 0, + } +} + +pub fn make_fake_peer_list( + start_idx: u32, + numb_o_peers: u32, +) -> PeerList> { + let mut peer_list = Vec::with_capacity(numb_o_peers as usize); + + for idx in start_idx..(start_idx + numb_o_peers) { + peer_list.push(make_fake_peer(idx, None)) + } + + PeerList::new(peer_list) +} + +fn make_fake_peer_list_with_random_pruning_seeds( + numb_o_peers: u32, +) -> PeerList> { + let mut r = rand::thread_rng(); + + let mut peer_list = Vec::with_capacity(numb_o_peers as usize); + + for idx in 0..numb_o_peers { + peer_list.push(make_fake_peer( + idx, + Some(if r.gen_bool(0.4) { + 0 + } else { + r.gen_range(384..=391) + }), + )) + } + PeerList::new(peer_list) +} + +#[test] +fn peer_list_reduce_length() { + let mut peer_list = make_fake_peer_list(0, 2090); + let must_keep_peers = HashSet::new(); + + let target_len = 2000; + + peer_list.reduce_list(&must_keep_peers, target_len); + + assert_eq!(peer_list.len(), target_len); +} + +#[test] +fn peer_list_reduce_length_with_peers_we_need() { + let mut peer_list = make_fake_peer_list(0, 500); + let must_keep_peers = HashSet::from_iter(peer_list.peers.keys().copied()); + + let target_len = 49; + + peer_list.reduce_list(&must_keep_peers, target_len); + + // we can't remove any of the peers we said we need them all + assert_eq!(peer_list.len(), 500); +} + +#[test] +fn peer_list_remove_specific_peer() { + let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); + + let peer = *peer_list + .get_random_peer(&mut rand::thread_rng(), None) + .unwrap(); + + assert!(peer_list.remove_peer(&peer.adr).is_some()); + + let pruning_idxs = peer_list.pruning_seeds; + let peers = peer_list.peers; + + for (_, addrs) in pruning_idxs { + addrs.iter().for_each(|adr| assert_ne!(adr, &peer.adr)) + } + + assert!(!peers.contains_key(&peer.adr)); +} + +#[test] +fn peer_list_pruning_idxs_are_correct() { + let peer_list = make_fake_peer_list_with_random_pruning_seeds(100); + let mut total_len = 0; + + for (seed, list) in peer_list.pruning_seeds { + for peer in list.iter() { + assert_eq!(peer_list.peers.get(peer).unwrap().pruning_seed, seed); + total_len += 1; + } + } + + assert_eq!(total_len, peer_list.peers.len()) +} + +#[test] +fn peer_list_add_new_peer() { + let mut peer_list = make_fake_peer_list(0, 10); + let new_peer = make_fake_peer(50, None); + + peer_list.add_new_peer(new_peer); + + assert_eq!(peer_list.len(), 11); + assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer)); + assert!(peer_list + .pruning_seeds + .get(&new_peer.pruning_seed) + .unwrap() + .contains(&new_peer.adr)); +} + +#[test] +fn peer_list_add_existing_peer() { + let mut peer_list = make_fake_peer_list(0, 10); + let existing_peer = *peer_list.get_peer(&TestNetZoneAddr(0)).unwrap(); + + peer_list.add_new_peer(existing_peer); + + assert_eq!(peer_list.len(), 10); + assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer)); +} + +#[test] +fn peer_list_get_non_existent_peer() { + let peer_list = make_fake_peer_list(0, 10); + let non_existent_peer = TestNetZoneAddr(50); + assert_eq!(peer_list.get_peer(&non_existent_peer), None); +} + +#[test] +fn peer_list_get_peer_with_block() { + let mut r = rand::thread_rng(); + + let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); + peer_list.add_new_peer(make_fake_peer(101, Some(384))); + + let peer = peer_list + .get_random_peer(&mut r, Some(1)) + .expect("We just added a peer with the correct seed"); + + assert!(peer + .pruning_seed + .get_next_unpruned_block(1, 1_000_000) + .is_ok()) +} + +#[test] +fn peer_list_ban_peers() { + let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); + let peer = peer_list + .get_random_peer(&mut rand::thread_rng(), None) + .unwrap(); + let ban_id = peer.adr.ban_id(); + + assert!(peer_list.contains_peer(&peer.adr)); + assert_ne!(peer_list.ban_ids.get(&ban_id).unwrap().len(), 0); + peer_list.remove_peers_with_ban_id(&ban_id); + assert_eq!(peer_list.ban_ids.get(&ban_id), None); + for (addr, _) in peer_list.peers { + assert_ne!(addr.ban_id(), ban_id); + } +} diff --git a/p2p/address-book/src/store.rs b/p2p/address-book/src/store.rs new file mode 100644 index 0000000..690bcfa --- /dev/null +++ b/p2p/address-book/src/store.rs @@ -0,0 +1,92 @@ +use std::fs; + +use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize}; +use tokio::task::{spawn_blocking, JoinHandle}; + +use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone}; + +use crate::{peer_list::PeerList, Config}; + +// TODO: store anchor and ban list. + +#[derive(BorshSerialize)] +struct SerPeerDataV1<'a, A: NetZoneAddress> { + white_list: Vec<&'a ZoneSpecificPeerListEntryBase>, + gray_list: Vec<&'a ZoneSpecificPeerListEntryBase>, +} + +#[derive(BorshDeserialize)] +struct DeserPeerDataV1 { + white_list: Vec>, + gray_list: Vec>, +} + +pub fn save_peers_to_disk( + cfg: &Config, + white_list: &PeerList, + gray_list: &PeerList, +) -> JoinHandle> { + // maybe move this to another thread but that would require cloning the data ... this + // happens so infrequently that it's probably not worth it. + let data = to_vec(&SerPeerDataV1 { + white_list: white_list.peers.values().collect::>(), + gray_list: gray_list.peers.values().collect::>(), + }) + .unwrap(); + + let file = cfg.peer_store_file.clone(); + spawn_blocking(move || fs::write(&file, &data)) +} + +pub async fn read_peers_from_disk( + cfg: &Config, +) -> Result< + ( + Vec>, + Vec>, + ), + std::io::Error, +> { + let file = cfg.peer_store_file.clone(); + let data = spawn_blocking(move || fs::read(file)).await.unwrap()?; + + let de_ser: DeserPeerDataV1 = from_slice(&data)?; + Ok((de_ser.white_list, de_ser.gray_list)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::peer_list::{tests::make_fake_peer_list, PeerList}; + + use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr}; + + #[test] + fn ser_deser_peer_list() { + let white_list = make_fake_peer_list(0, 50); + let gray_list = make_fake_peer_list(50, 100); + + let data = to_vec(&SerPeerDataV1 { + white_list: white_list.peers.values().collect::>(), + gray_list: gray_list.peers.values().collect::>(), + }) + .unwrap(); + + let de_ser: DeserPeerDataV1 = from_slice(&data).unwrap(); + + let white_list_2: PeerList> = + PeerList::new(de_ser.white_list); + let gray_list_2: PeerList> = PeerList::new(de_ser.gray_list); + + assert_eq!(white_list.peers.len(), white_list_2.peers.len()); + assert_eq!(gray_list.peers.len(), gray_list_2.peers.len()); + + for addr in white_list.peers.keys() { + assert!(white_list_2.contains_peer(addr)); + } + + for addr in gray_list.peers.keys() { + assert!(gray_list_2.contains_peer(addr)); + } + } +} diff --git a/p2p/monero-peer/Cargo.toml b/p2p/monero-p2p/Cargo.toml similarity index 70% rename from p2p/monero-peer/Cargo.toml rename to p2p/monero-p2p/Cargo.toml index 03ef392..b9a8090 100644 --- a/p2p/monero-peer/Cargo.toml +++ b/p2p/monero-p2p/Cargo.toml @@ -1,25 +1,29 @@ [package] -name = "monero-peer" +name = "monero-p2p" version = "0.1.0" edition = "2021" +license = "MIT" +authors = ["Boog900"] [features] -default = [] +default = ["borsh"] +borsh = ["dep:borsh"] [dependencies] monero-wire = {path= "../../net/monero-wire"} -cuprate-common = {path = "../../common"} +cuprate-common = {path = "../../common", features = ["borsh"]} tokio = {version= "1.34.0", default-features = false, features = ["net"]} tokio-util = { version = "0.7.10", default-features = false, features = ["codec"] } futures = "0.3.29" async-trait = "0.1.74" - tower = { version= "0.4.13", features = ["util"] } -thiserror = "1.0.50" +thiserror = "1.0.50" tracing = "0.1.40" +borsh = {version = "1.2.1", default-features = false, features = ["derive", "std"], optional = true } + [dev-dependencies] cuprate-test-utils = {path = "../../test-utils"} diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs new file mode 100644 index 0000000..4416702 --- /dev/null +++ b/p2p/monero-p2p/src/client.rs @@ -0,0 +1,14 @@ +mod conector; +mod connection; +pub mod handshaker; + +pub use conector::{ConnectRequest, Connector}; +pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; + +/// An internal identifier for a given peer, will be their address if known +/// or a random u64 if not. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub enum InternalPeerID { + KnownAddr(A), + Unknown(u64), +} diff --git a/p2p/monero-peer/src/client/conector.rs b/p2p/monero-p2p/src/client/conector.rs similarity index 98% rename from p2p/monero-peer/src/client/conector.rs rename to p2p/monero-p2p/src/client/conector.rs index 2a0e151..5ee69f0 100644 --- a/p2p/monero-peer/src/client/conector.rs +++ b/p2p/monero-p2p/src/client/conector.rs @@ -47,7 +47,7 @@ where let mut handshaker = self.handshaker.clone(); async move { - let (peer_stream, peer_sink) = Z::connect_to_peer(req.addr.clone()).await?; + let (peer_stream, peer_sink) = Z::connect_to_peer(req.addr).await?; let req = DoHandshakeRequest { addr: req.addr, peer_stream, diff --git a/p2p/monero-peer/src/client/connection.rs b/p2p/monero-p2p/src/client/connection.rs similarity index 98% rename from p2p/monero-peer/src/client/connection.rs rename to p2p/monero-p2p/src/client/connection.rs index e8de44d..bb445ba 100644 --- a/p2p/monero-peer/src/client/connection.rs +++ b/p2p/monero-p2p/src/client/connection.rs @@ -41,7 +41,7 @@ impl State { LevinCommand::NewTransactions ) ), - _ => false, + _ => panic!("We are not in a state to be checking responses!"), } } } diff --git a/p2p/monero-peer/src/client/handshaker.rs b/p2p/monero-p2p/src/client/handshaker.rs similarity index 98% rename from p2p/monero-peer/src/client/handshaker.rs rename to p2p/monero-p2p/src/client/handshaker.rs index 4482cd3..d72e7ed 100644 --- a/p2p/monero-peer/src/client/handshaker.rs +++ b/p2p/monero-p2p/src/client/handshaker.rs @@ -31,7 +31,7 @@ pub enum HandshakeError { #[error("peer is on a different network")] IncorrectNetwork, #[error("peer sent a peer list with peers from different zones")] - PeerSentIncorrectZonePeerList(#[from] crate::NetworkAddressIncorrectZone), + PeerSentIncorrectPeerList(#[from] crate::services::PeerListConversionError), #[error("peer sent invalid message: {0}")] PeerSentInvalidMessage(&'static str), #[error("Levin bucket error: {0}")] @@ -234,7 +234,9 @@ where .address_book .ready() .await? - .call(AddressBookRequest::GetPeers(MAX_PEERS_IN_PEER_LIST_MESSAGE)) + .call(AddressBookRequest::GetWhitePeers( + MAX_PEERS_IN_PEER_LIST_MESSAGE, + )) .await? else { panic!("Address book sent incorrect response"); diff --git a/p2p/monero-peer/src/error.rs b/p2p/monero-p2p/src/error.rs similarity index 100% rename from p2p/monero-peer/src/error.rs rename to p2p/monero-p2p/src/error.rs diff --git a/p2p/monero-p2p/src/handles.rs b/p2p/monero-p2p/src/handles.rs new file mode 100644 index 0000000..912726e --- /dev/null +++ b/p2p/monero-p2p/src/handles.rs @@ -0,0 +1,95 @@ +//! +use std::time::Duration; + +use futures::{channel::mpsc, SinkExt}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio_util::sync::CancellationToken; + +#[derive(Default, Debug)] +pub struct HandleBuilder { + permit: Option, +} + +impl HandleBuilder { + pub fn with_permit(mut self, permit: OwnedSemaphorePermit) -> Self { + self.permit = Some(permit); + self + } + + pub fn build(self) -> (ConnectionGuard, ConnectionHandle, PeerHandle) { + let token = CancellationToken::new(); + let (tx, rx) = mpsc::channel(0); + + ( + ConnectionGuard { + token: token.clone(), + permit: self.permit.expect("connection permit was not set!"), + }, + ConnectionHandle { + token: token.clone(), + ban: rx, + }, + PeerHandle { ban: tx, token }, + ) + } +} + +pub struct BanPeer(pub Duration); + +/// A struct given to the connection task. +pub struct ConnectionGuard { + token: CancellationToken, + permit: OwnedSemaphorePermit, +} + +impl ConnectionGuard { + pub fn should_shutdown(&self) -> bool { + self.token.is_cancelled() + } + pub fn connection_closed(&self) { + self.token.cancel() + } +} + +impl Drop for ConnectionGuard { + fn drop(&mut self) { + self.token.cancel() + } +} + +/// A handle given to a task that needs to close this connection and find out if the connection has +/// been banned. +pub struct ConnectionHandle { + token: CancellationToken, + ban: mpsc::Receiver, +} + +impl ConnectionHandle { + pub fn is_closed(&self) -> bool { + self.token.is_cancelled() + } + pub fn check_should_ban(&mut self) -> Option { + match self.ban.try_next() { + Ok(res) => res, + Err(_) => None, + } + } + pub fn send_close_signal(&self) { + self.token.cancel() + } +} + +/// A handle given to a task that needs to be able to ban a peer. +#[derive(Clone)] +pub struct PeerHandle { + token: CancellationToken, + ban: mpsc::Sender, +} + +impl PeerHandle { + pub fn ban_peer(&mut self, duration: Duration) { + // This channel won't be dropped and if it's full the peer has already been banned. + let _ = self.ban.try_send(BanPeer(duration)); + self.token.cancel() + } +} diff --git a/p2p/monero-peer/src/lib.rs b/p2p/monero-p2p/src/lib.rs similarity index 60% rename from p2p/monero-peer/src/lib.rs rename to p2p/monero-p2p/src/lib.rs index 25fe966..201a1b5 100644 --- a/p2p/monero-peer/src/lib.rs +++ b/p2p/monero-p2p/src/lib.rs @@ -1,6 +1,6 @@ #![allow(unused)] -use std::{future::Future, pin::Pin}; +use std::{fmt::Debug, future::Future, hash::Hash, pin::Pin}; use futures::{Sink, Stream}; @@ -10,6 +10,7 @@ use monero_wire::{ pub mod client; pub mod error; +pub mod handles; pub mod network_zones; pub mod protocol; pub mod services; @@ -26,9 +27,63 @@ pub enum ConnectionDirection { OutBound, } +#[cfg(not(feature = "borsh"))] +pub trait NetZoneAddress: + TryFrom + + Into + + std::fmt::Display + + Hash + + Eq + + Clone + + Copy + + Send + + Unpin + + 'static +{ + /// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as + /// that include the port, to be able to facilitate this network addresses must have a ban ID + /// which for hidden services could just be the address it self but for clear net addresses will + /// be the IP address. + /// TODO: IP zone banning? + type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; + + fn ban_id(&self) -> Self::BanID; + + fn should_add_to_peer_list(&self) -> bool; +} + +#[cfg(feature = "borsh")] +pub trait NetZoneAddress: + TryFrom + + Into + + std::fmt::Display + + borsh::BorshSerialize + + borsh::BorshDeserialize + + Hash + + Eq + + Clone + + Copy + + Send + + Unpin + + 'static +{ + /// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as + /// that include the port, to be able to facilitate this network addresses must have a ban ID + /// which for hidden services could just be the address it self but for clear net addresses will + /// be the IP address. + /// TODO: IP zone banning? + type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; + + fn ban_id(&self) -> Self::BanID; + + fn should_add_to_peer_list(&self) -> bool; +} + /// An abstraction over a network zone (tor/i2p/clear) #[async_trait::async_trait] -pub trait NetworkZone: Clone + Send + 'static { +pub trait NetworkZone: Clone + Copy + Send + 'static { + /// The network name. + const NAME: &'static str; /// Allow syncing over this network. /// /// Not recommended for anonymity networks. @@ -44,12 +99,8 @@ pub trait NetworkZone: Clone + Send + 'static { const CHECK_NODE_ID: bool; /// The address type of this network. - type Addr: TryFrom - + Into - + std::fmt::Display - + Clone - + Send - + 'static; + type Addr: NetZoneAddress; + /// The stream (incoming data) type for this network. type Stream: Stream> + Unpin + Send + 'static; /// The sink (outgoing data) type for this network. @@ -69,33 +120,26 @@ pub(crate) trait AddressBook: AddressBookRequest, Response = AddressBookResponse, Error = tower::BoxError, - Future = Pin< - Box< - dyn Future, tower::BoxError>> - + Send - + 'static, - >, - >, + Future = Self::Future2, > + Send + 'static { + // This allows us to put more restrictive bounds on the future without defining the future here + // explicitly. + type Future2: Future> + Send + 'static; } -impl AddressBook for T where +impl AddressBook for T +where T: tower::Service< AddressBookRequest, Response = AddressBookResponse, Error = tower::BoxError, - Future = Pin< - Box< - dyn Future, tower::BoxError>> - + Send - + 'static, - >, - >, > + Send - + 'static + + 'static, + T::Future: Future> + Send + 'static, { + type Future2 = T::Future; } pub(crate) trait CoreSyncSvc: diff --git a/p2p/monero-peer/src/network_zones.rs b/p2p/monero-p2p/src/network_zones.rs similarity index 100% rename from p2p/monero-peer/src/network_zones.rs rename to p2p/monero-p2p/src/network_zones.rs diff --git a/p2p/monero-peer/src/network_zones/clear.rs b/p2p/monero-p2p/src/network_zones/clear.rs similarity index 74% rename from p2p/monero-peer/src/network_zones/clear.rs rename to p2p/monero-p2p/src/network_zones/clear.rs index cc35285..5f4f3f1 100644 --- a/p2p/monero-peer/src/network_zones/clear.rs +++ b/p2p/monero-p2p/src/network_zones/clear.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use monero_wire::MoneroWireCodec; @@ -8,15 +8,28 @@ use tokio::net::{ }; use tokio_util::codec::{FramedRead, FramedWrite}; -use crate::NetworkZone; +use crate::{NetZoneAddress, NetworkZone}; -#[derive(Clone)] +impl NetZoneAddress for SocketAddr { + type BanID = IpAddr; + + fn ban_id(&self) -> Self::BanID { + self.ip() + } + + fn should_add_to_peer_list(&self) -> bool { + todo!() + } +} + +#[derive(Clone, Copy)] pub struct ClearNet; pub struct ClearNetServerCfg {} #[async_trait::async_trait] impl NetworkZone for ClearNet { + const NAME: &'static str = "ClearNet"; const ALLOW_SYNC: bool = true; const DANDELION_PP: bool = true; const CHECK_NODE_ID: bool = true; diff --git a/p2p/monero-peer/src/protocol.rs b/p2p/monero-p2p/src/protocol.rs similarity index 100% rename from p2p/monero-peer/src/protocol.rs rename to p2p/monero-p2p/src/protocol.rs diff --git a/p2p/monero-peer/src/protocol/try_from.rs b/p2p/monero-p2p/src/protocol/try_from.rs similarity index 100% rename from p2p/monero-peer/src/protocol/try_from.rs rename to p2p/monero-p2p/src/protocol/try_from.rs diff --git a/p2p/monero-p2p/src/services.rs b/p2p/monero-p2p/src/services.rs new file mode 100644 index 0000000..9a1a254 --- /dev/null +++ b/p2p/monero-p2p/src/services.rs @@ -0,0 +1,107 @@ +use cuprate_common::{PruningError, PruningSeed}; +use monero_wire::{NetZone, NetworkAddress, PeerListEntryBase}; + +use crate::{ + client::InternalPeerID, handles::ConnectionHandle, NetZoneAddress, NetworkAddressIncorrectZone, + NetworkZone, +}; + +pub enum CoreSyncDataRequest { + Ours, + HandleIncoming(monero_wire::CoreSyncData), +} + +pub enum CoreSyncDataResponse { + Ours(monero_wire::CoreSyncData), + Ok, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[cfg_attr( + feature = "borsh", + derive(borsh::BorshSerialize, borsh::BorshDeserialize) +)] +pub struct ZoneSpecificPeerListEntryBase { + pub adr: A, + pub id: u64, + pub last_seen: i64, + pub pruning_seed: PruningSeed, + pub rpc_port: u16, + pub rpc_credits_per_hash: u32, +} + +impl From> for monero_wire::PeerListEntryBase { + fn from(value: ZoneSpecificPeerListEntryBase) -> Self { + Self { + adr: value.adr.into(), + id: value.id, + last_seen: value.last_seen, + pruning_seed: value.pruning_seed.into(), + rpc_port: value.rpc_port, + rpc_credits_per_hash: value.rpc_credits_per_hash, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum PeerListConversionError { + #[error("Address is in incorrect zone")] + Address(#[from] NetworkAddressIncorrectZone), + #[error("Pruning seed error: {0}")] + PruningSeed(#[from] PruningError), +} + +impl TryFrom + for ZoneSpecificPeerListEntryBase +{ + type Error = PeerListConversionError; + + fn try_from(value: PeerListEntryBase) -> Result { + Ok(Self { + adr: value.adr.try_into()?, + id: value.id, + last_seen: value.last_seen, + pruning_seed: PruningSeed::try_from(value.pruning_seed)?, + rpc_port: value.rpc_port, + rpc_credits_per_hash: value.rpc_credits_per_hash, + }) + } +} + +pub enum AddressBookRequest { + NewConnection { + addr: Option, + internal_peer_id: InternalPeerID, + handle: ConnectionHandle, + id: u64, + pruning_seed: PruningSeed, + /// The peers port. + rpc_port: u16, + /// The peers rpc credits per hash + rpc_credits_per_hash: u32, + }, + /// Bans a peer for the specified duration. This request + /// will send disconnect signals to all peers with the same + /// address. + BanPeer(Z::Addr, std::time::Duration), + IncomingPeerList(Vec>), + /// Gets a random white peer from the peer list. If height is specified + /// then the peer list should retrieve a peer that should have a full + /// block at that height according to it's pruning seed + GetRandomWhitePeer { + height: Option, + }, + /// Gets a random gray peer from the peer list. If height is specified + /// then the peer list should retrieve a peer that should have a full + /// block at that height according to it's pruning seed + GetRandomGrayPeer { + height: Option, + }, + GetWhitePeers(usize), +} + +pub enum AddressBookResponse { + Ok, + Peer(ZoneSpecificPeerListEntryBase), + Peers(Vec>), +} diff --git a/p2p/monero-p2p/tests/handles.rs b/p2p/monero-p2p/tests/handles.rs new file mode 100644 index 0000000..be62db6 --- /dev/null +++ b/p2p/monero-p2p/tests/handles.rs @@ -0,0 +1,64 @@ +use std::{sync::Arc, time::Duration}; + +use tokio::sync::Semaphore; + +use monero_p2p::handles::HandleBuilder; + +#[test] +fn send_ban_signal() { + let semaphore = Arc::new(Semaphore::new(5)); + let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default() + .with_permit(semaphore.try_acquire_owned().unwrap()) + .build(); + + peer_handle.ban_peer(Duration::from_secs(300)); + + let Some(ban_time) = connection_handle.check_should_ban() else { + panic!("ban signal not received!"); + }; + + assert_eq!(ban_time.0, Duration::from_secs(300)); + + connection_handle.send_close_signal(); + assert!(guard.should_shutdown()); + + guard.connection_closed(); + assert!(connection_handle.is_closed()); +} + +#[test] +fn multiple_ban_signals() { + let semaphore = Arc::new(Semaphore::new(5)); + let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default() + .with_permit(semaphore.try_acquire_owned().unwrap()) + .build(); + + peer_handle.ban_peer(Duration::from_secs(300)); + peer_handle.ban_peer(Duration::from_secs(301)); + peer_handle.ban_peer(Duration::from_secs(302)); + + let Some(ban_time) = connection_handle.check_should_ban() else { + panic!("ban signal not received!"); + }; + + // only the first will be seen + assert_eq!(ban_time.0, Duration::from_secs(300)); + + connection_handle.send_close_signal(); + assert!(guard.should_shutdown()); + + guard.connection_closed(); + assert!(connection_handle.is_closed()); +} + +#[test] +fn dropped_guard_sends_disconnect_signal() { + let semaphore = Arc::new(Semaphore::new(5)); + let (guard, connection_handle, _) = HandleBuilder::default() + .with_permit(semaphore.try_acquire_owned().unwrap()) + .build(); + + assert!(!connection_handle.is_closed()); + drop(guard); + assert!(connection_handle.is_closed()); +} diff --git a/p2p/monero-peer/tests/handshake.rs b/p2p/monero-p2p/tests/handshake.rs similarity index 99% rename from p2p/monero-peer/tests/handshake.rs rename to p2p/monero-p2p/tests/handshake.rs index 846ae86..8385c55 100644 --- a/p2p/monero-peer/tests/handshake.rs +++ b/p2p/monero-p2p/tests/handshake.rs @@ -6,7 +6,7 @@ use tower::{Service, ServiceExt}; use cuprate_common::Network; use monero_wire::{common::PeerSupportFlags, BasicNodeData}; -use monero_peer::{ +use monero_p2p::{ client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker}, network_zones::ClearNet, ConnectionDirection, diff --git a/p2p/monero-peer/tests/utils.rs b/p2p/monero-p2p/tests/utils.rs similarity index 96% rename from p2p/monero-peer/tests/utils.rs rename to p2p/monero-p2p/tests/utils.rs index da52480..372b7ea 100644 --- a/p2p/monero-peer/tests/utils.rs +++ b/p2p/monero-p2p/tests/utils.rs @@ -7,7 +7,7 @@ use std::{ use futures::FutureExt; use tower::Service; -use monero_peer::{ +use monero_p2p::{ services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, }, @@ -30,7 +30,7 @@ impl Service> for DummyAddressBook { fn call(&mut self, req: AddressBookRequest) -> Self::Future { async move { Ok(match req { - AddressBookRequest::GetPeers(_) => AddressBookResponse::Peers(vec![]), + AddressBookRequest::GetWhitePeers(_) => AddressBookResponse::Peers(vec![]), _ => AddressBookResponse::Ok, }) } diff --git a/p2p/monero-peer/src/client.rs b/p2p/monero-peer/src/client.rs deleted file mode 100644 index 87d63a3..0000000 --- a/p2p/monero-peer/src/client.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod conector; -mod connection; -pub mod handshaker; - -pub use conector::{ConnectRequest, Connector}; -pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; diff --git a/p2p/monero-peer/src/services.rs b/p2p/monero-peer/src/services.rs deleted file mode 100644 index db1187f..0000000 --- a/p2p/monero-peer/src/services.rs +++ /dev/null @@ -1,61 +0,0 @@ -use monero_wire::PeerListEntryBase; - -use crate::{NetworkAddressIncorrectZone, NetworkZone}; - -pub enum CoreSyncDataRequest { - Ours, - HandleIncoming(monero_wire::CoreSyncData), -} - -pub enum CoreSyncDataResponse { - Ours(monero_wire::CoreSyncData), - Ok, -} - -pub struct ZoneSpecificPeerListEntryBase { - pub adr: Z::Addr, - pub id: u64, - pub last_seen: i64, - pub pruning_seed: u32, - pub rpc_port: u16, - pub rpc_credits_per_hash: u32, -} - -impl From> for monero_wire::PeerListEntryBase { - fn from(value: ZoneSpecificPeerListEntryBase) -> Self { - Self { - adr: value.adr.into(), - id: value.id, - last_seen: value.last_seen, - pruning_seed: value.pruning_seed, - rpc_port: value.rpc_port, - rpc_credits_per_hash: value.rpc_credits_per_hash, - } - } -} - -impl TryFrom for ZoneSpecificPeerListEntryBase { - type Error = NetworkAddressIncorrectZone; - - fn try_from(value: PeerListEntryBase) -> Result { - Ok(Self { - adr: value.adr.try_into()?, - id: value.id, - last_seen: value.last_seen, - pruning_seed: value.pruning_seed, - rpc_port: value.rpc_port, - rpc_credits_per_hash: value.rpc_credits_per_hash, - }) - } -} - -pub enum AddressBookRequest { - NewConnection(Z::Addr, ZoneSpecificPeerListEntryBase), - IncomingPeerList(Vec>), - GetPeers(usize), -} - -pub enum AddressBookResponse { - Ok, - Peers(Vec>), -} diff --git a/p2p/src/address_book/address_book/peer_list.rs b/p2p/src/address_book/address_book/peer_list.rs index de8a17e..e852100 100644 --- a/p2p/src/address_book/address_book/peer_list.rs +++ b/p2p/src/address_book/address_book/peer_list.rs @@ -62,15 +62,7 @@ impl PeerList { pub fn len(&self) -> usize { self.peers.len() } - - /// Gets the amount of peers with a specific seed. - pub fn len_by_seed(&self, pruning_seed: &u32) -> usize { - self.pruning_idxs - .get(pruning_seed) - .map(|indexes| indexes.len()) - .unwrap_or(0) - } - + /// Adds a new peer to the peer list pub fn add_new_peer(&mut self, peer: PeerListEntryBase) { if let None = self.peers.insert(peer.adr, peer) { diff --git a/p2p/src/connection_counter.rs b/p2p/src/connection_counter.rs index 922b47f..c8cc840 100644 --- a/p2p/src/connection_counter.rs +++ b/p2p/src/connection_counter.rs @@ -1,8 +1,3 @@ -//! Counting active connections used by Cuprate. -//! -//! These types can be used to count any kind of active resource. -//! But they are currently used to track the number of open connections. - use std::{fmt, sync::Arc}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; @@ -127,4 +122,4 @@ impl Drop for ConnectionTracker { ); // the permit is automatically dropped } -} \ No newline at end of file +} diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 8096b09..30ef638 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -5,7 +5,9 @@ edition = "2021" [dependencies] monero-wire = {path = "../net/monero-wire"} -monero-peer = {path = "../p2p/monero-peer"} +monero-p2p = {path = "../p2p/monero-p2p" } futures = "0.3.29" -async-trait = "0.1.74" \ No newline at end of file +async-trait = "0.1.74" + +borsh = {version = "1.2.1", features = ["derive"]} \ No newline at end of file diff --git a/test-utils/src/test_netzone.rs b/test-utils/src/test_netzone.rs index 34e3b56..2ee6d34 100644 --- a/test-utils/src/test_netzone.rs +++ b/test-utils/src/test_netzone.rs @@ -6,6 +6,7 @@ use std::{ task::{Context, Poll}, }; +use borsh::{BorshDeserialize, BorshSerialize}; use futures::{channel::mpsc::Sender as InnerSender, stream::BoxStream, Sink}; use monero_wire::{ @@ -13,11 +14,23 @@ use monero_wire::{ BucketError, Message, }; -use monero_peer::NetworkZone; +use monero_p2p::{NetZoneAddress, NetworkZone}; -#[derive(Clone)] +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, BorshSerialize, BorshDeserialize)] pub struct TestNetZoneAddr(pub u32); +impl NetZoneAddress for TestNetZoneAddr { + type BanID = Self; + + fn ban_id(&self) -> Self::BanID { + *self + } + + fn should_add_to_peer_list(&self) -> bool { + true + } +} + impl std::fmt::Display for TestNetZoneAddr { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str(format!("test client, id: {}", self.0).as_str()) @@ -83,13 +96,14 @@ impl Sink for Sender { } } -#[derive(Clone)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct TestNetZone; #[async_trait::async_trait] impl NetworkZone for TestNetZone { + const NAME: &'static str = "Testing"; const ALLOW_SYNC: bool = ALLOW_SYNC; const DANDELION_PP: bool = DANDELION_PP; const CHECK_NODE_ID: bool = CHECK_NODE_ID;