move address book to separate crate.

Also changes the address book to use the network zone trait
This commit is contained in:
Boog900 2023-12-08 15:03:01 +00:00
parent 34dd105a0c
commit 2c4cc1fb93
No known key found for this signature in database
GPG key ID: 5401367FB7302004
32 changed files with 1645 additions and 129 deletions

View file

@ -9,17 +9,20 @@ members = [
# "database", # "database",
"net/levin", "net/levin",
"net/monero-wire", "net/monero-wire",
"p2p/monero-peer", "p2p/monero-p2p",
"p2p/address-book",
"test-utils" "test-utils"
] ]
[profile.release] [profile.release]
panic = 'abort'
lto = true # Build with LTO lto = true # Build with LTO
strip = "none" # Keep panic stack traces strip = "none" # Keep panic stack traces
codegen-units = 1 # Optimize for binary speed over compile times codegen-units = 1 # Optimize for binary speed over compile times
opt-level = 3 opt-level = 3
[profile.dev] [profile.dev]
panic = 'abort'
lto = false lto = false
strip = "none" strip = "none"
# Not much slower compile times than opt-level 0, but much faster code. # Not much slower compile times than opt-level 0, but much faster code.

View file

@ -5,6 +5,9 @@ edition = "2021"
license = "AGPL-3.0-only" license = "AGPL-3.0-only"
authors = ["Boog900"] authors = ["Boog900"]
[features]
default = []
borsh = ["dep:borsh"]
[dependencies] [dependencies]
chrono = "0.4.24" chrono = "0.4.24"
@ -12,3 +15,5 @@ thiserror = "1"
hex = "0.4" hex = "0.4"
futures = "0.3.29" futures = "0.3.29"
borsh = {version = "1.2.1", default-features = false, features = ["derive",], optional = true }

View file

@ -18,6 +18,8 @@
//! ``` //! ```
//! //!
use std::cmp::Ordering;
use thiserror::Error; use thiserror::Error;
use super::{ use super::{
@ -50,9 +52,33 @@ pub enum PruningError {
/// ///
// Internally we use an Option<u32> to represent if a pruning seed is 0 (None)which means // Internally we use an Option<u32> to represent if a pruning seed is 0 (None)which means
// no pruning will take place. // no pruning will take place.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
#[cfg_attr(
feature = "borsh",
derive(borsh::BorshSerialize, borsh::BorshDeserialize)
)]
pub struct PruningSeed(Option<u32>); pub struct PruningSeed(Option<u32>);
impl PartialOrd for PruningSeed {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PruningSeed {
fn cmp(&self, other: &Self) -> Ordering {
match (self.get_log_stripes(), other.get_log_stripes()) {
(None, None) => Ordering::Equal,
(None, Some(_)) => Ordering::Greater,
(Some(_), None) => Ordering::Less,
(Some(stripe_s), Some(stripe_o)) => match stripe_s.cmp(&stripe_o) {
Ordering::Equal => self.get_stripe().unwrap().cmp(&other.get_stripe().unwrap()),
ordering => ordering,
},
}
}
}
impl PruningSeed { impl PruningSeed {
/// Creates a new pruning seed from a `stripe` and `log_stripes` /// Creates a new pruning seed from a `stripe` and `log_stripes`
/// ///
@ -240,6 +266,12 @@ impl TryFrom<u32> for PruningSeed {
} }
} }
impl From<PruningSeed> for u32 {
fn from(value: PruningSeed) -> Self {
value.0.unwrap_or(0)
}
}
fn get_block_pruning_stripe( fn get_block_pruning_stripe(
block_height: u64, block_height: u64,
blockchain_height: u64, blockchain_height: u64,

View file

@ -0,0 +1,30 @@
[package]
name = "monero-address-book"
version = "0.1.0"
edition = "2021"
license = "MIT"
authors = ["Boog900"]
[dependencies]
cuprate-common = {path = "../../common"}
monero-wire = {path= "../../net/monero-wire"}
monero-p2p = {path = "../monero-p2p" }
tower = { version= "0.4.13", features = ["util", "buffer"] }
tokio = {version = "1.34.0", default-features = false, features = ["time", "fs", "rt"]}
futures = "0.3.29"
pin-project = "1.1.3"
async-trait = "0.1.74"
thiserror = "1.0.50"
tracing = "0.1.40"
rand = "0.8.5"
borsh = {version = "1.2.1", features = ["derive"]}
[dev-dependencies]
tokio = {version = "1.34.0", features = ["rt-multi-thread", "macros"]}
cuprate-test-utils = {path = "../../test-utils"}

View file

@ -0,0 +1,424 @@
use std::{
collections::{HashMap, HashSet},
future::Future,
panic,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use pin_project::pin_project;
use tokio::{
task::JoinHandle,
time::{interval, sleep, Interval, MissedTickBehavior, Sleep},
};
use tower::Service;
use cuprate_common::{tower_utils::InstaFuture, PruningSeed};
use monero_p2p::{
client::InternalPeerID,
handles::ConnectionHandle,
services::{AddressBookRequest, AddressBookResponse, ZoneSpecificPeerListEntryBase},
NetZoneAddress, NetworkZone,
};
use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookError, Config};
#[cfg(test)]
mod tests;
/// An entry in the connected list.
pub struct ConnectionPeerEntry<Z: NetworkZone> {
addr: Option<Z::Addr>,
id: u64,
handle: ConnectionHandle,
/// The peers pruning seed
pruning_seed: PruningSeed,
/// The peers port.
rpc_port: u16,
/// The peers rpc credits per hash
rpc_credits_per_hash: u32,
}
/// A future that resolves when a peer is unbanned.
#[pin_project(project = EnumProj)]
pub struct BanedPeerFut<Addr: NetZoneAddress>(Addr::BanID, #[pin] Sleep);
impl<Addr: NetZoneAddress> Future for BanedPeerFut<Addr> {
type Output = Addr::BanID;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match this.1.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(*this.0),
}
}
}
pub struct AddressBook<Z: NetworkZone> {
/// Our white peers - the peers we have previously connected to.
white_list: PeerList<Z>,
/// Our gray peers - the peers we have been told about but haven't connected to.
gray_list: PeerList<Z>,
/// Our anchor peers - on start up will contain a list of peers we were connected to before shutting down
/// after that will contain a list of peers currently connected to that we can reach.
anchor_list: HashSet<Z::Addr>,
/// The currently connected peers.
connected_peers: HashMap<InternalPeerID<Z::Addr>, ConnectionPeerEntry<Z>>,
connected_peers_ban_id: HashMap<<Z::Addr as NetZoneAddress>::BanID, HashSet<Z::Addr>>,
/// The currently banned peers
banned_peers: HashSet<<Z::Addr as NetZoneAddress>::BanID>,
banned_peers_fut: FuturesUnordered<BanedPeerFut<Z::Addr>>,
peer_save_task_handle: Option<JoinHandle<std::io::Result<()>>>,
peer_save_interval: Interval,
cfg: Config,
}
impl<Z: NetworkZone> AddressBook<Z> {
pub fn new(
cfg: Config,
white_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
gray_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
anchor_peers: Vec<Z::Addr>,
) -> Self {
let white_list = PeerList::new(white_peers);
let gray_list = PeerList::new(gray_peers);
let anchor_list = HashSet::from_iter(anchor_peers);
// TODO: persist banned peers
let banned_peers = HashSet::new();
let banned_peers_fut = FuturesUnordered::new();
let connected_peers = HashMap::new();
let mut peer_save_interval = interval(cfg.peer_save_period);
peer_save_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
Self {
white_list,
gray_list,
anchor_list,
connected_peers,
connected_peers_ban_id: HashMap::new(),
banned_peers,
banned_peers_fut,
peer_save_task_handle: None,
peer_save_interval,
cfg,
}
}
fn poll_save_to_disk(&mut self, cx: &mut Context<'_>) {
if let Some(handle) = &mut self.peer_save_task_handle {
// if we have already spawned a task to save the peer list wait for that to complete.
match handle.poll_unpin(cx) {
Poll::Pending => return,
Poll::Ready(Ok(Err(e))) => {
tracing::error!("Could not save peer list to disk, got error: {}", e)
}
Poll::Ready(Err(e)) => {
if e.is_panic() {
panic::resume_unwind(e.into_panic())
}
}
_ => (),
}
}
// the task is finished.
self.peer_save_task_handle = None;
let Poll::Ready(_) = self.peer_save_interval.poll_tick(cx) else {
return;
};
self.peer_save_task_handle = Some(save_peers_to_disk(
&self.cfg,
&self.white_list,
&self.gray_list,
));
}
fn poll_unban_peers(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(ban_id)) = self.banned_peers_fut.poll_next_unpin(cx) {
self.banned_peers.remove(&ban_id);
}
}
fn poll_connected_peers(&mut self) {
let mut internal_addr_disconnected = Vec::new();
let mut addrs_to_ban = Vec::new();
for (internal_addr, peer) in &mut self.connected_peers {
if let Some(time) = peer.handle.check_should_ban() {
match internal_addr {
InternalPeerID::KnownAddr(addr) => addrs_to_ban.push((*addr, time.0)),
// If we don't know the peers address all we can do is disconnect.
InternalPeerID::Unknown(_) => peer.handle.send_close_signal(),
}
}
if peer.handle.is_closed() {
internal_addr_disconnected.push(*internal_addr);
}
}
for (addr, time) in addrs_to_ban.into_iter() {
self.ban_peer(addr, time);
}
for disconnected_addr in internal_addr_disconnected {
self.connected_peers.remove(&disconnected_addr);
if let InternalPeerID::KnownAddr(addr) = disconnected_addr {
// remove the peer from the connected peers with this ban ID.
self.connected_peers_ban_id
.get_mut(&addr.ban_id())
.unwrap()
.remove(&addr);
// If the amount of peers with this ban id is 0 remove the whole set.
if self
.connected_peers_ban_id
.get(&addr.ban_id())
.unwrap()
.is_empty()
{
self.connected_peers_ban_id.remove(&addr.ban_id());
}
// remove the peer from the anchor list.
self.anchor_list.remove(&addr);
}
}
}
fn ban_peer(&mut self, addr: Z::Addr, time: Duration) {
if self.banned_peers.contains(&addr.ban_id()) {
return;
}
if let Some(connected_peers_with_ban_id) = self.connected_peers_ban_id.get(&addr.ban_id()) {
for peer in connected_peers_with_ban_id.iter().map(|addr| {
tracing::debug!("Banning peer: {}, for: {:?}", addr, time);
self.connected_peers
.get(&InternalPeerID::KnownAddr(*addr))
.expect("Peer must be in connected list if in connected_peers_with_ban_id")
}) {
// The peer will get removed from our connected list once we disconnect
peer.handle.send_close_signal();
// Remove the peer now from anchors so we don't accidentally persist a bad anchor peer to disk.
self.anchor_list.remove(&addr);
}
}
self.white_list.remove_peers_with_ban_id(&addr.ban_id());
self.gray_list.remove_peers_with_ban_id(&addr.ban_id());
self.banned_peers.insert(addr.ban_id());
self.banned_peers_fut
.push(BanedPeerFut(addr.ban_id(), sleep(time)))
}
/// adds a peer to the gray list.
fn add_peer_to_gray_list(&mut self, mut peer: ZoneSpecificPeerListEntryBase<Z::Addr>) {
if self.white_list.contains_peer(&peer.adr) {
return;
};
if !self.gray_list.contains_peer(&peer.adr) {
peer.last_seen = 0;
self.gray_list.add_new_peer(peer);
}
}
/// Checks if a peer is banned.
fn is_peer_banned(&self, peer: &Z::Addr) -> bool {
self.banned_peers.contains(&peer.ban_id())
}
fn handle_incoming_peer_list(
&mut self,
mut peer_list: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
) {
tracing::debug!("Received new peer list, length: {}", peer_list.len());
peer_list.retain(|peer| {
if !peer.adr.should_add_to_peer_list() {
false
} else {
!self.is_peer_banned(&peer.adr)
}
// TODO: check rpc/ p2p ports not the same
});
for peer in peer_list {
self.add_peer_to_gray_list(peer);
}
// The gray list has no peers we need to keep in the list so just pass an empty HashSet.
self.gray_list
.reduce_list(&HashSet::new(), self.cfg.max_gray_list_length);
}
fn get_random_white_peer(
&self,
block_needed: Option<u64>,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
tracing::debug!("Retrieving random white peer");
self.white_list
.get_random_peer(&mut rand::thread_rng(), block_needed)
.copied()
}
fn get_random_gray_peer(
&self,
block_needed: Option<u64>,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
tracing::debug!("Retrieving random gray peer");
self.gray_list
.get_random_peer(&mut rand::thread_rng(), block_needed)
.copied()
}
fn get_white_peers(&self, len: usize) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
tracing::debug!("Retrieving white peers, maximum: {}", len);
self.white_list
.get_random_peers(&mut rand::thread_rng(), len)
}
/// Updates an entry in the white list, if the peer is not found then
/// the peer will be added to the white list.
fn update_white_list_peer_entry(
&mut self,
peer: &ConnectionPeerEntry<Z>,
) -> Result<(), AddressBookError> {
let Some(addr) = &peer.addr else {
// If the peer isn't reachable we shouldn't add it too our address book.
return Ok(());
};
if let Some(peb) = self.white_list.get_peer_mut(addr) {
if peb.pruning_seed != peer.pruning_seed {
return Err(AddressBookError::PeersDataChanged("Pruning seed"));
}
if Z::CHECK_NODE_ID && peb.id != peer.id {
return Err(AddressBookError::PeersDataChanged("peer ID"));
}
// TODO: cuprate doesn't need last seen timestamps but should we have them anyway?
peb.last_seen = 0;
peb.rpc_port = peer.rpc_port;
peb.rpc_credits_per_hash = peer.rpc_credits_per_hash;
} else {
// if the peer is reachable add it to our white list
let peb = ZoneSpecificPeerListEntryBase {
id: peer.id,
adr: *addr,
last_seen: 0,
rpc_port: peer.rpc_port,
rpc_credits_per_hash: peer.rpc_credits_per_hash,
pruning_seed: peer.pruning_seed,
};
self.white_list.add_new_peer(peb);
}
Ok(())
}
fn handle_new_connection(
&mut self,
internal_peer_id: InternalPeerID<Z::Addr>,
peer: ConnectionPeerEntry<Z>,
) -> Result<(), AddressBookError> {
if self.connected_peers.contains_key(&internal_peer_id) {
return Err(AddressBookError::PeerAlreadyConnected);
}
// If we know the address then check if it's banned.
if let InternalPeerID::KnownAddr(addr) = &internal_peer_id {
if self.is_peer_banned(addr) {
return Err(AddressBookError::PeerIsBanned);
}
// although the peer may not be readable still add it to the connected peers with ban ID.
self.connected_peers_ban_id
.entry(addr.ban_id())
.or_default()
.insert(*addr);
}
// if the address is Some that means we can reach it from our node.
if let Some(addr) = peer.addr {
// remove the peer from the gray list as we know it's active.
self.gray_list.remove_peer(&addr);
// The peer is reachable, update our white list and add it to the anchor connections.
self.update_white_list_peer_entry(&peer)?;
self.white_list
.reduce_list(&self.anchor_list, self.cfg.max_white_list_length);
self.anchor_list.insert(addr);
}
self.connected_peers.insert(internal_peer_id, peer);
Ok(())
}
}
impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
type Response = AddressBookResponse<Z>;
type Error = AddressBookError;
type Future = InstaFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_unban_peers(cx);
self.poll_save_to_disk(cx);
self.poll_connected_peers();
Poll::Ready(Ok(()))
}
fn call(&mut self, req: AddressBookRequest<Z>) -> Self::Future {
let span = tracing::info_span!("AddressBook");
let _guard = span.enter();
let response = match req {
AddressBookRequest::NewConnection {
addr,
internal_peer_id,
handle,
id,
pruning_seed,
rpc_port,
rpc_credits_per_hash,
} => self
.handle_new_connection(
internal_peer_id,
ConnectionPeerEntry {
addr,
id,
handle,
pruning_seed,
rpc_port,
rpc_credits_per_hash,
},
)
.map(|_| AddressBookResponse::Ok),
AddressBookRequest::BanPeer(addr, time) => {
self.ban_peer(addr, time);
Ok(AddressBookResponse::Ok)
}
AddressBookRequest::IncomingPeerList(peer_list) => {
self.handle_incoming_peer_list(peer_list);
Ok(AddressBookResponse::Ok)
}
AddressBookRequest::GetRandomWhitePeer { height } => self
.get_random_white_peer(height)
.map(AddressBookResponse::Peer)
.ok_or(AddressBookError::PeerNotFound),
AddressBookRequest::GetRandomGrayPeer { height } => self
.get_random_gray_peer(height)
.map(AddressBookResponse::Peer)
.ok_or(AddressBookError::PeerNotFound),
AddressBookRequest::GetWhitePeers(len) => {
Ok(AddressBookResponse::Peers(self.get_white_peers(len)))
}
};
InstaFuture::from(response)
}
}

View file

@ -0,0 +1,149 @@
use std::{path::PathBuf, sync::Arc, time::Duration};
use futures::StreamExt;
use tokio::sync::Semaphore;
use tokio::time::interval;
use cuprate_common::PruningSeed;
use monero_p2p::handles::HandleBuilder;
use super::{AddressBook, ConnectionPeerEntry, InternalPeerID};
use crate::{peer_list::tests::make_fake_peer_list, AddressBookError, Config};
use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr};
fn test_cfg() -> Config {
Config {
max_white_list_length: 100,
max_gray_list_length: 500,
peer_store_file: PathBuf::new(),
peer_save_period: Duration::from_secs(60),
}
}
fn make_fake_address_book(
numb_white: u32,
numb_gray: u32,
) -> AddressBook<TestNetZone<true, true, true>> {
let white_list = make_fake_peer_list(0, numb_white);
let gray_list = make_fake_peer_list(numb_white, numb_gray);
AddressBook {
white_list,
gray_list,
anchor_list: Default::default(),
connected_peers: Default::default(),
connected_peers_ban_id: Default::default(),
banned_peers: Default::default(),
banned_peers_fut: Default::default(),
peer_save_task_handle: None,
peer_save_interval: interval(Duration::from_secs(60)),
cfg: test_cfg(),
}
}
#[tokio::test]
async fn get_random_peers() {
let address_book = make_fake_address_book(50, 250);
let peer = address_book.get_random_white_peer(None).unwrap();
assert!(address_book.white_list.contains_peer(&peer.adr));
assert!(!address_book.gray_list.contains_peer(&peer.adr));
let peer = address_book.get_random_gray_peer(None).unwrap();
assert!(!address_book.white_list.contains_peer(&peer.adr));
assert!(address_book.gray_list.contains_peer(&peer.adr));
}
#[tokio::test]
async fn get_white_peers() {
let address_book = make_fake_address_book(100, 0);
let peers = address_book.get_white_peers(50);
assert_eq!(peers.len(), 50);
let peers = address_book.get_white_peers(60);
assert_eq!(peers.len(), 60);
for window in peers.windows(2) {
assert_ne!(window[0], window[1]);
}
let address_book = make_fake_address_book(45, 0);
let peers = address_book.get_white_peers(50);
assert_eq!(peers.len(), 45);
let peers = address_book.get_white_peers(60);
assert_eq!(peers.len(), 45);
for window in peers.windows(2) {
assert_ne!(window[0], window[1]);
}
}
#[tokio::test]
async fn add_new_peer_already_connected() {
let mut address_book = make_fake_address_book(0, 0);
let semaphore = Arc::new(Semaphore::new(10));
let (_, handle, _) = HandleBuilder::default()
.with_permit(semaphore.clone().try_acquire_owned().unwrap())
.build();
address_book
.handle_new_connection(
InternalPeerID::KnownAddr(TestNetZoneAddr(1)),
ConnectionPeerEntry {
addr: None,
id: 0,
handle,
pruning_seed: PruningSeed::try_from(385).unwrap(),
rpc_port: 0,
rpc_credits_per_hash: 0,
},
)
.unwrap();
let (_, handle, _) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
assert_eq!(
address_book.handle_new_connection(
InternalPeerID::KnownAddr(TestNetZoneAddr(1)),
ConnectionPeerEntry {
addr: None,
id: 0,
handle,
pruning_seed: PruningSeed::try_from(385).unwrap(),
rpc_port: 0,
rpc_credits_per_hash: 0,
},
),
Err(AddressBookError::PeerAlreadyConnected)
)
}
#[tokio::test]
async fn banned_peer_removed_from_peer_lists() {
let mut address_book = make_fake_address_book(100, 0);
assert_eq!(address_book.banned_peers.len(), 0);
assert_eq!(address_book.white_list.len(), 100);
address_book.ban_peer(TestNetZoneAddr(1), Duration::from_secs(1));
assert_eq!(address_book.banned_peers.len(), 1);
assert_eq!(address_book.white_list.len(), 99);
address_book.ban_peer(TestNetZoneAddr(1), Duration::from_secs(1));
assert_eq!(address_book.banned_peers.len(), 1);
assert_eq!(address_book.white_list.len(), 99);
address_book.ban_peer(TestNetZoneAddr(1), Duration::from_secs(1));
assert_eq!(address_book.banned_peers.len(), 1);
assert_eq!(address_book.white_list.len(), 99);
address_book.ban_peer(TestNetZoneAddr(5), Duration::from_secs(100));
assert_eq!(address_book.banned_peers.len(), 2);
assert_eq!(address_book.white_list.len(), 98);
assert_eq!(
address_book.banned_peers_fut.next().await.unwrap(),
TestNetZoneAddr(1)
)
}

View file

@ -0,0 +1,76 @@
//! Cuprate Address Book
//!
//! This module holds the logic for persistent peer storage.
//! Cuprates address book is modeled as a [`tower::Service`]
//! The request is [`AddressBookRequest`] and the response is
//! [`AddressBookResponse`].
//!
//! Cuprate, like monerod, actually has multiple address books, one
//! for each [`NetworkZone`]. This is to reduce the possibility of
//! clear net peers getting linked to their dark counterparts
//! and so peers will only get told about peers they can
//! connect to.
//!
use std::{path::PathBuf, time::Duration};
use monero_p2p::{
services::{AddressBookRequest, AddressBookResponse},
NetworkZone,
};
mod book;
mod peer_list;
mod store;
#[derive(Debug, Clone)]
pub struct Config {
max_white_list_length: usize,
max_gray_list_length: usize,
peer_store_file: PathBuf,
peer_save_period: Duration,
}
/// Possible errors when dealing with the address book.
/// This is boxed when returning an error in the [`tower::Service`].
#[derive(Debug, thiserror::Error, Eq, PartialEq)]
pub enum AddressBookError {
/// The peer is already connected.
#[error("Peer is already connected")]
PeerAlreadyConnected,
/// The peer is not in the address book for this zone.
#[error("Peer was not found in book")]
PeerNotFound,
/// The peer list is empty.
#[error("The peer list is empty")]
PeerListEmpty,
/// Immutable peer data was changed.
#[error("Immutable peer data was changed: {0}")]
PeersDataChanged(&'static str),
/// The peer is banned.
#[error("The peer is banned")]
PeerIsBanned,
/// The channel to the address book has closed unexpectedly.
#[error("The address books channel has closed.")]
AddressBooksChannelClosed,
/// The address book task has exited.
#[error("The address book task has exited.")]
AddressBookTaskExited,
}
pub async fn init_address_book<Z: NetworkZone>(
cfg: Config,
) -> Result<
impl tower::Service<
AddressBookRequest<Z>,
Response = AddressBookResponse<Z>,
Error = tower::BoxError,
>,
std::io::Error,
> {
let (white_list, gray_list) = store::read_peers_from_disk::<Z>(&cfg).await?;
let address_book = book::AddressBook::<Z>::new(cfg, white_list, gray_list, Vec::new());
Ok(tower::buffer::Buffer::new(address_book, 15))
}

View file

@ -0,0 +1,240 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use rand::{seq::SliceRandom, Rng};
use cuprate_common::{PruningSeed, CRYPTONOTE_MAX_BLOCK_NUMBER};
use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone};
#[cfg(test)]
pub mod tests;
/// A Peer list in the address book.
///
/// This could either be the white list or gray list.
#[derive(Debug)]
pub struct PeerList<Z: NetworkZone> {
/// The peers with their peer data.
pub peers: HashMap<Z::Addr, ZoneSpecificPeerListEntryBase<Z::Addr>>,
/// An index of Pruning seed to address, so can quickly grab peers with the blocks
/// we want.
///
/// Pruning seeds are sorted by first their log_stripes and then their stripe.
/// This means the first peers in this list will store more blocks than peers
/// later on. So when we need a peer with a certain block we look at the peers
/// storing more blocks first then work our way to the peers storing less.
///
pruning_seeds: BTreeMap<PruningSeed, Vec<Z::Addr>>,
/// A hashmap linking ban_ids to addresses.
ban_ids: HashMap<<Z::Addr as NetZoneAddress>::BanID, Vec<Z::Addr>>,
}
impl<Z: NetworkZone> PeerList<Z> {
/// Creates a new peer list.
pub fn new(list: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>) -> PeerList<Z> {
let mut peers = HashMap::with_capacity(list.len());
let mut pruning_seeds = BTreeMap::new();
let mut ban_ids = HashMap::with_capacity(list.len());
for peer in list {
pruning_seeds
.entry(peer.pruning_seed)
.or_insert_with(Vec::new)
.push(peer.adr);
ban_ids
.entry(peer.adr.ban_id())
.or_insert_with(Vec::new)
.push(peer.adr);
peers.insert(peer.adr, peer);
}
PeerList {
peers,
pruning_seeds,
ban_ids,
}
}
/// Gets the length of the peer list
pub fn len(&self) -> usize {
self.peers.len()
}
/// Adds a new peer to the peer list
pub fn add_new_peer(&mut self, peer: ZoneSpecificPeerListEntryBase<Z::Addr>) {
if self.peers.insert(peer.adr, peer).is_none() {
// It's more clear with this
#[allow(clippy::unwrap_or_default)]
self.pruning_seeds
.entry(peer.pruning_seed)
.or_insert_with(Vec::new)
.push(peer.adr);
#[allow(clippy::unwrap_or_default)]
self.ban_ids
.entry(peer.adr.ban_id())
.or_insert_with(Vec::new)
.push(peer.adr);
}
}
/// Gets a reference to a peer
pub fn get_peer(&self, peer: &Z::Addr) -> Option<&ZoneSpecificPeerListEntryBase<Z::Addr>> {
self.peers.get(peer)
}
/// Returns a random peer.
/// If the pruning seed is specified then we will get a random peer with
/// that pruning seed otherwise we will just get a random peer in the whole
/// list.
pub fn get_random_peer<R: Rng>(
&self,
r: &mut R,
block_needed: Option<u64>,
) -> Option<&ZoneSpecificPeerListEntryBase<Z::Addr>> {
if let Some(needed_height) = block_needed {
let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| {
// TODO: factor in peer blockchain height?
seed.get_next_unpruned_block(needed_height, CRYPTONOTE_MAX_BLOCK_NUMBER)
.expect("Explain")
== needed_height
})?;
let n = r.gen_range(0..addresses_with_block.len());
self.get_peer(&addresses_with_block[n])
} else {
let len = self.len();
if len == 0 {
None
} else {
let n = r.gen_range(0..len);
self.peers.values().nth(n)
}
}
}
pub fn get_random_peers<R: Rng>(
&self,
r: &mut R,
len: usize,
) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
let mut peers = self.peers.values().copied().collect::<Vec<_>>();
peers.shuffle(r);
peers.drain(len.min(peers.len())..peers.len());
peers
}
/// Returns a mutable reference to a peer.
pub fn get_peer_mut(
&mut self,
peer: &Z::Addr,
) -> Option<&mut ZoneSpecificPeerListEntryBase<Z::Addr>> {
self.peers.get_mut(peer)
}
/// Returns true if the list contains this peer.
pub fn contains_peer(&self, peer: &Z::Addr) -> bool {
self.peers.contains_key(peer)
}
/// Removes a peer from the pruning idx
///
/// MUST NOT BE USED ALONE
fn remove_peer_pruning_idx(&mut self, peer: &ZoneSpecificPeerListEntryBase<Z::Addr>) {
remove_peer_idx::<Z>(self.pruning_seeds.get_mut(&peer.pruning_seed), &peer.adr);
if self
.pruning_seeds
.get(&peer.pruning_seed)
.expect("There must be a peer with this id")
.is_empty()
{
self.pruning_seeds.remove(&peer.pruning_seed);
}
}
/// Removes a peer from the ban idx
///
/// MUST NOT BE USED ALONE
fn remove_peer_ban_idx(&mut self, peer: &ZoneSpecificPeerListEntryBase<Z::Addr>) {
remove_peer_idx::<Z>(self.ban_ids.get_mut(&peer.adr.ban_id()), &peer.adr);
if self
.ban_ids
.get(&peer.adr.ban_id())
.expect("There must be a peer with this id")
.is_empty()
{
self.ban_ids.remove(&peer.adr.ban_id());
}
}
/// Removes a peer from all the indexes
///
/// MUST NOT BE USED ALONE
fn remove_peer_from_all_idxs(&mut self, peer: &ZoneSpecificPeerListEntryBase<Z::Addr>) {
self.remove_peer_pruning_idx(peer);
self.remove_peer_ban_idx(peer)
}
/// Removes a peer from the peer list
pub fn remove_peer(
&mut self,
peer: &Z::Addr,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
let peer_eb = self.peers.remove(peer)?;
self.remove_peer_from_all_idxs(&peer_eb);
Some(peer_eb)
}
/// Removes all peers with a specific ban id.
pub fn remove_peers_with_ban_id(&mut self, ban_id: &<Z::Addr as NetZoneAddress>::BanID) {
let Some(addresses) = self.ban_ids.get(ban_id) else {
// No peers to ban
return;
};
for addr in addresses.clone() {
self.remove_peer(&addr);
}
}
/// Tries to reduce the peer list to `new_len`.
///
/// This function could keep the list bigger than `new_len` if `must_keep_peers`s length
/// is larger than new_len, in that case we will remove as much as we can.
pub fn reduce_list(&mut self, must_keep_peers: &HashSet<Z::Addr>, new_len: usize) {
if new_len >= self.len() {
return;
}
let target_removed = self.len() - new_len;
let mut removed_count = 0;
let mut peers_to_remove: Vec<Z::Addr> = Vec::with_capacity(target_removed);
for peer_adr in self.peers.keys() {
if removed_count >= target_removed {
break;
}
if !must_keep_peers.contains(peer_adr) {
peers_to_remove.push(*peer_adr);
removed_count += 1;
}
}
for peer_adr in peers_to_remove {
let _ = self.remove_peer(&peer_adr);
}
}
}
/// Remove a peer from an index.
fn remove_peer_idx<Z: NetworkZone>(peer_list: Option<&mut Vec<Z::Addr>>, addr: &Z::Addr) {
if let Some(peer_list) = peer_list {
if let Some(idx) = peer_list.iter().position(|peer_adr| peer_adr == addr) {
peer_list.swap_remove(idx);
} else {
unreachable!("This function will only be called when the peer exists.");
}
} else {
unreachable!("Index must exist if a peer has that index");
}
}

View file

@ -0,0 +1,186 @@
use std::collections::HashSet;
use rand::Rng;
use cuprate_common::PruningSeed;
use monero_p2p::services::ZoneSpecificPeerListEntryBase;
use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr};
use monero_p2p::NetZoneAddress;
use super::PeerList;
fn make_fake_peer(
id: u32,
pruning_seed: Option<u32>,
) -> ZoneSpecificPeerListEntryBase<TestNetZoneAddr> {
ZoneSpecificPeerListEntryBase {
adr: TestNetZoneAddr(id),
id: id as u64,
last_seen: 0,
pruning_seed: PruningSeed::try_from(pruning_seed.unwrap_or(0)).unwrap(),
rpc_port: 0,
rpc_credits_per_hash: 0,
}
}
pub fn make_fake_peer_list(
start_idx: u32,
numb_o_peers: u32,
) -> PeerList<TestNetZone<true, true, true>> {
let mut peer_list = Vec::with_capacity(numb_o_peers as usize);
for idx in start_idx..(start_idx + numb_o_peers) {
peer_list.push(make_fake_peer(idx, None))
}
PeerList::new(peer_list)
}
fn make_fake_peer_list_with_random_pruning_seeds(
numb_o_peers: u32,
) -> PeerList<TestNetZone<true, true, true>> {
let mut r = rand::thread_rng();
let mut peer_list = Vec::with_capacity(numb_o_peers as usize);
for idx in 0..numb_o_peers {
peer_list.push(make_fake_peer(
idx,
Some(if r.gen_bool(0.4) {
0
} else {
r.gen_range(384..=391)
}),
))
}
PeerList::new(peer_list)
}
#[test]
fn peer_list_reduce_length() {
let mut peer_list = make_fake_peer_list(0, 2090);
let must_keep_peers = HashSet::new();
let target_len = 2000;
peer_list.reduce_list(&must_keep_peers, target_len);
assert_eq!(peer_list.len(), target_len);
}
#[test]
fn peer_list_reduce_length_with_peers_we_need() {
let mut peer_list = make_fake_peer_list(0, 500);
let must_keep_peers = HashSet::from_iter(peer_list.peers.keys().copied());
let target_len = 49;
peer_list.reduce_list(&must_keep_peers, target_len);
// we can't remove any of the peers we said we need them all
assert_eq!(peer_list.len(), 500);
}
#[test]
fn peer_list_remove_specific_peer() {
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let peer = *peer_list
.get_random_peer(&mut rand::thread_rng(), None)
.unwrap();
assert!(peer_list.remove_peer(&peer.adr).is_some());
let pruning_idxs = peer_list.pruning_seeds;
let peers = peer_list.peers;
for (_, addrs) in pruning_idxs {
addrs.iter().for_each(|adr| assert_ne!(adr, &peer.adr))
}
assert!(!peers.contains_key(&peer.adr));
}
#[test]
fn peer_list_pruning_idxs_are_correct() {
let peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let mut total_len = 0;
for (seed, list) in peer_list.pruning_seeds {
for peer in list.iter() {
assert_eq!(peer_list.peers.get(peer).unwrap().pruning_seed, seed);
total_len += 1;
}
}
assert_eq!(total_len, peer_list.peers.len())
}
#[test]
fn peer_list_add_new_peer() {
let mut peer_list = make_fake_peer_list(0, 10);
let new_peer = make_fake_peer(50, None);
peer_list.add_new_peer(new_peer);
assert_eq!(peer_list.len(), 11);
assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer));
assert!(peer_list
.pruning_seeds
.get(&new_peer.pruning_seed)
.unwrap()
.contains(&new_peer.adr));
}
#[test]
fn peer_list_add_existing_peer() {
let mut peer_list = make_fake_peer_list(0, 10);
let existing_peer = *peer_list.get_peer(&TestNetZoneAddr(0)).unwrap();
peer_list.add_new_peer(existing_peer);
assert_eq!(peer_list.len(), 10);
assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer));
}
#[test]
fn peer_list_get_non_existent_peer() {
let peer_list = make_fake_peer_list(0, 10);
let non_existent_peer = TestNetZoneAddr(50);
assert_eq!(peer_list.get_peer(&non_existent_peer), None);
}
#[test]
fn peer_list_get_peer_with_block() {
let mut r = rand::thread_rng();
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
peer_list.add_new_peer(make_fake_peer(101, Some(384)));
let peer = peer_list
.get_random_peer(&mut r, Some(1))
.expect("We just added a peer with the correct seed");
assert!(peer
.pruning_seed
.get_next_unpruned_block(1, 1_000_000)
.is_ok())
}
#[test]
fn peer_list_ban_peers() {
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let peer = peer_list
.get_random_peer(&mut rand::thread_rng(), None)
.unwrap();
let ban_id = peer.adr.ban_id();
assert!(peer_list.contains_peer(&peer.adr));
assert_ne!(peer_list.ban_ids.get(&ban_id).unwrap().len(), 0);
peer_list.remove_peers_with_ban_id(&ban_id);
assert_eq!(peer_list.ban_ids.get(&ban_id), None);
for (addr, _) in peer_list.peers {
assert_ne!(addr.ban_id(), ban_id);
}
}

View file

@ -0,0 +1,92 @@
use std::fs;
use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize};
use tokio::task::{spawn_blocking, JoinHandle};
use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone};
use crate::{peer_list::PeerList, Config};
// TODO: store anchor and ban list.
#[derive(BorshSerialize)]
struct SerPeerDataV1<'a, A: NetZoneAddress> {
white_list: Vec<&'a ZoneSpecificPeerListEntryBase<A>>,
gray_list: Vec<&'a ZoneSpecificPeerListEntryBase<A>>,
}
#[derive(BorshDeserialize)]
struct DeserPeerDataV1<A: NetZoneAddress> {
white_list: Vec<ZoneSpecificPeerListEntryBase<A>>,
gray_list: Vec<ZoneSpecificPeerListEntryBase<A>>,
}
pub fn save_peers_to_disk<Z: NetworkZone>(
cfg: &Config,
white_list: &PeerList<Z>,
gray_list: &PeerList<Z>,
) -> JoinHandle<std::io::Result<()>> {
// maybe move this to another thread but that would require cloning the data ... this
// happens so infrequently that it's probably not worth it.
let data = to_vec(&SerPeerDataV1 {
white_list: white_list.peers.values().collect::<Vec<_>>(),
gray_list: gray_list.peers.values().collect::<Vec<_>>(),
})
.unwrap();
let file = cfg.peer_store_file.clone();
spawn_blocking(move || fs::write(&file, &data))
}
pub async fn read_peers_from_disk<Z: NetworkZone>(
cfg: &Config,
) -> Result<
(
Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
),
std::io::Error,
> {
let file = cfg.peer_store_file.clone();
let data = spawn_blocking(move || fs::read(file)).await.unwrap()?;
let de_ser: DeserPeerDataV1<Z::Addr> = from_slice(&data)?;
Ok((de_ser.white_list, de_ser.gray_list))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::peer_list::{tests::make_fake_peer_list, PeerList};
use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr};
#[test]
fn ser_deser_peer_list() {
let white_list = make_fake_peer_list(0, 50);
let gray_list = make_fake_peer_list(50, 100);
let data = to_vec(&SerPeerDataV1 {
white_list: white_list.peers.values().collect::<Vec<_>>(),
gray_list: gray_list.peers.values().collect::<Vec<_>>(),
})
.unwrap();
let de_ser: DeserPeerDataV1<TestNetZoneAddr> = from_slice(&data).unwrap();
let white_list_2: PeerList<TestNetZone<true, true, true>> =
PeerList::new(de_ser.white_list);
let gray_list_2: PeerList<TestNetZone<true, true, true>> = PeerList::new(de_ser.gray_list);
assert_eq!(white_list.peers.len(), white_list_2.peers.len());
assert_eq!(gray_list.peers.len(), gray_list_2.peers.len());
for addr in white_list.peers.keys() {
assert!(white_list_2.contains_peer(addr));
}
for addr in gray_list.peers.keys() {
assert!(gray_list_2.contains_peer(addr));
}
}
}

View file

@ -1,25 +1,29 @@
[package] [package]
name = "monero-peer" name = "monero-p2p"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
license = "MIT"
authors = ["Boog900"]
[features] [features]
default = [] default = ["borsh"]
borsh = ["dep:borsh"]
[dependencies] [dependencies]
monero-wire = {path= "../../net/monero-wire"} monero-wire = {path= "../../net/monero-wire"}
cuprate-common = {path = "../../common"} cuprate-common = {path = "../../common", features = ["borsh"]}
tokio = {version= "1.34.0", default-features = false, features = ["net"]} tokio = {version= "1.34.0", default-features = false, features = ["net"]}
tokio-util = { version = "0.7.10", default-features = false, features = ["codec"] } tokio-util = { version = "0.7.10", default-features = false, features = ["codec"] }
futures = "0.3.29" futures = "0.3.29"
async-trait = "0.1.74" async-trait = "0.1.74"
tower = { version= "0.4.13", features = ["util"] } tower = { version= "0.4.13", features = ["util"] }
thiserror = "1.0.50"
thiserror = "1.0.50"
tracing = "0.1.40" tracing = "0.1.40"
borsh = {version = "1.2.1", default-features = false, features = ["derive", "std"], optional = true }
[dev-dependencies] [dev-dependencies]
cuprate-test-utils = {path = "../../test-utils"} cuprate-test-utils = {path = "../../test-utils"}

View file

@ -0,0 +1,14 @@
mod conector;
mod connection;
pub mod handshaker;
pub use conector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};
/// An internal identifier for a given peer, will be their address if known
/// or a random u64 if not.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum InternalPeerID<A> {
KnownAddr(A),
Unknown(u64),
}

View file

@ -47,7 +47,7 @@ where
let mut handshaker = self.handshaker.clone(); let mut handshaker = self.handshaker.clone();
async move { async move {
let (peer_stream, peer_sink) = Z::connect_to_peer(req.addr.clone()).await?; let (peer_stream, peer_sink) = Z::connect_to_peer(req.addr).await?;
let req = DoHandshakeRequest { let req = DoHandshakeRequest {
addr: req.addr, addr: req.addr,
peer_stream, peer_stream,

View file

@ -41,7 +41,7 @@ impl State {
LevinCommand::NewTransactions LevinCommand::NewTransactions
) )
), ),
_ => false, _ => panic!("We are not in a state to be checking responses!"),
} }
} }
} }

View file

@ -31,7 +31,7 @@ pub enum HandshakeError {
#[error("peer is on a different network")] #[error("peer is on a different network")]
IncorrectNetwork, IncorrectNetwork,
#[error("peer sent a peer list with peers from different zones")] #[error("peer sent a peer list with peers from different zones")]
PeerSentIncorrectZonePeerList(#[from] crate::NetworkAddressIncorrectZone), PeerSentIncorrectPeerList(#[from] crate::services::PeerListConversionError),
#[error("peer sent invalid message: {0}")] #[error("peer sent invalid message: {0}")]
PeerSentInvalidMessage(&'static str), PeerSentInvalidMessage(&'static str),
#[error("Levin bucket error: {0}")] #[error("Levin bucket error: {0}")]
@ -234,7 +234,9 @@ where
.address_book .address_book
.ready() .ready()
.await? .await?
.call(AddressBookRequest::GetPeers(MAX_PEERS_IN_PEER_LIST_MESSAGE)) .call(AddressBookRequest::GetWhitePeers(
MAX_PEERS_IN_PEER_LIST_MESSAGE,
))
.await? .await?
else { else {
panic!("Address book sent incorrect response"); panic!("Address book sent incorrect response");

View file

@ -0,0 +1,95 @@
//!
use std::time::Duration;
use futures::{channel::mpsc, SinkExt};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
#[derive(Default, Debug)]
pub struct HandleBuilder {
permit: Option<OwnedSemaphorePermit>,
}
impl HandleBuilder {
pub fn with_permit(mut self, permit: OwnedSemaphorePermit) -> Self {
self.permit = Some(permit);
self
}
pub fn build(self) -> (ConnectionGuard, ConnectionHandle, PeerHandle) {
let token = CancellationToken::new();
let (tx, rx) = mpsc::channel(0);
(
ConnectionGuard {
token: token.clone(),
permit: self.permit.expect("connection permit was not set!"),
},
ConnectionHandle {
token: token.clone(),
ban: rx,
},
PeerHandle { ban: tx, token },
)
}
}
pub struct BanPeer(pub Duration);
/// A struct given to the connection task.
pub struct ConnectionGuard {
token: CancellationToken,
permit: OwnedSemaphorePermit,
}
impl ConnectionGuard {
pub fn should_shutdown(&self) -> bool {
self.token.is_cancelled()
}
pub fn connection_closed(&self) {
self.token.cancel()
}
}
impl Drop for ConnectionGuard {
fn drop(&mut self) {
self.token.cancel()
}
}
/// A handle given to a task that needs to close this connection and find out if the connection has
/// been banned.
pub struct ConnectionHandle {
token: CancellationToken,
ban: mpsc::Receiver<BanPeer>,
}
impl ConnectionHandle {
pub fn is_closed(&self) -> bool {
self.token.is_cancelled()
}
pub fn check_should_ban(&mut self) -> Option<BanPeer> {
match self.ban.try_next() {
Ok(res) => res,
Err(_) => None,
}
}
pub fn send_close_signal(&self) {
self.token.cancel()
}
}
/// A handle given to a task that needs to be able to ban a peer.
#[derive(Clone)]
pub struct PeerHandle {
token: CancellationToken,
ban: mpsc::Sender<BanPeer>,
}
impl PeerHandle {
pub fn ban_peer(&mut self, duration: Duration) {
// This channel won't be dropped and if it's full the peer has already been banned.
let _ = self.ban.try_send(BanPeer(duration));
self.token.cancel()
}
}

View file

@ -1,6 +1,6 @@
#![allow(unused)] #![allow(unused)]
use std::{future::Future, pin::Pin}; use std::{fmt::Debug, future::Future, hash::Hash, pin::Pin};
use futures::{Sink, Stream}; use futures::{Sink, Stream};
@ -10,6 +10,7 @@ use monero_wire::{
pub mod client; pub mod client;
pub mod error; pub mod error;
pub mod handles;
pub mod network_zones; pub mod network_zones;
pub mod protocol; pub mod protocol;
pub mod services; pub mod services;
@ -26,9 +27,63 @@ pub enum ConnectionDirection {
OutBound, OutBound,
} }
#[cfg(not(feature = "borsh"))]
pub trait NetZoneAddress:
TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone>
+ Into<NetworkAddress>
+ std::fmt::Display
+ Hash
+ Eq
+ Clone
+ Copy
+ Send
+ Unpin
+ 'static
{
/// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as
/// that include the port, to be able to facilitate this network addresses must have a ban ID
/// which for hidden services could just be the address it self but for clear net addresses will
/// be the IP address.
/// TODO: IP zone banning?
type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static;
fn ban_id(&self) -> Self::BanID;
fn should_add_to_peer_list(&self) -> bool;
}
#[cfg(feature = "borsh")]
pub trait NetZoneAddress:
TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone>
+ Into<NetworkAddress>
+ std::fmt::Display
+ borsh::BorshSerialize
+ borsh::BorshDeserialize
+ Hash
+ Eq
+ Clone
+ Copy
+ Send
+ Unpin
+ 'static
{
/// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as
/// that include the port, to be able to facilitate this network addresses must have a ban ID
/// which for hidden services could just be the address it self but for clear net addresses will
/// be the IP address.
/// TODO: IP zone banning?
type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static;
fn ban_id(&self) -> Self::BanID;
fn should_add_to_peer_list(&self) -> bool;
}
/// An abstraction over a network zone (tor/i2p/clear) /// An abstraction over a network zone (tor/i2p/clear)
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait NetworkZone: Clone + Send + 'static { pub trait NetworkZone: Clone + Copy + Send + 'static {
/// The network name.
const NAME: &'static str;
/// Allow syncing over this network. /// Allow syncing over this network.
/// ///
/// Not recommended for anonymity networks. /// Not recommended for anonymity networks.
@ -44,12 +99,8 @@ pub trait NetworkZone: Clone + Send + 'static {
const CHECK_NODE_ID: bool; const CHECK_NODE_ID: bool;
/// The address type of this network. /// The address type of this network.
type Addr: TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone> type Addr: NetZoneAddress;
+ Into<NetworkAddress>
+ std::fmt::Display
+ Clone
+ Send
+ 'static;
/// The stream (incoming data) type for this network. /// The stream (incoming data) type for this network.
type Stream: Stream<Item = Result<Message, BucketError>> + Unpin + Send + 'static; type Stream: Stream<Item = Result<Message, BucketError>> + Unpin + Send + 'static;
/// The sink (outgoing data) type for this network. /// The sink (outgoing data) type for this network.
@ -69,33 +120,26 @@ pub(crate) trait AddressBook<Z: NetworkZone>:
AddressBookRequest<Z>, AddressBookRequest<Z>,
Response = AddressBookResponse<Z>, Response = AddressBookResponse<Z>,
Error = tower::BoxError, Error = tower::BoxError,
Future = Pin< Future = Self::Future2,
Box<
dyn Future<Output = Result<AddressBookResponse<Z>, tower::BoxError>>
+ Send
+ 'static,
>,
>,
> + Send > + Send
+ 'static + 'static
{ {
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
} }
impl<T, Z: NetworkZone> AddressBook<Z> for T where impl<T, Z: NetworkZone> AddressBook<Z> for T
where
T: tower::Service< T: tower::Service<
AddressBookRequest<Z>, AddressBookRequest<Z>,
Response = AddressBookResponse<Z>, Response = AddressBookResponse<Z>,
Error = tower::BoxError, Error = tower::BoxError,
Future = Pin<
Box<
dyn Future<Output = Result<AddressBookResponse<Z>, tower::BoxError>>
+ Send
+ 'static,
>,
>,
> + Send > + Send
+ 'static + 'static,
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
{ {
type Future2 = T::Future;
} }
pub(crate) trait CoreSyncSvc: pub(crate) trait CoreSyncSvc:

View file

@ -1,4 +1,4 @@
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use monero_wire::MoneroWireCodec; use monero_wire::MoneroWireCodec;
@ -8,15 +8,28 @@ use tokio::net::{
}; };
use tokio_util::codec::{FramedRead, FramedWrite}; use tokio_util::codec::{FramedRead, FramedWrite};
use crate::NetworkZone; use crate::{NetZoneAddress, NetworkZone};
#[derive(Clone)] impl NetZoneAddress for SocketAddr {
type BanID = IpAddr;
fn ban_id(&self) -> Self::BanID {
self.ip()
}
fn should_add_to_peer_list(&self) -> bool {
todo!()
}
}
#[derive(Clone, Copy)]
pub struct ClearNet; pub struct ClearNet;
pub struct ClearNetServerCfg {} pub struct ClearNetServerCfg {}
#[async_trait::async_trait] #[async_trait::async_trait]
impl NetworkZone for ClearNet { impl NetworkZone for ClearNet {
const NAME: &'static str = "ClearNet";
const ALLOW_SYNC: bool = true; const ALLOW_SYNC: bool = true;
const DANDELION_PP: bool = true; const DANDELION_PP: bool = true;
const CHECK_NODE_ID: bool = true; const CHECK_NODE_ID: bool = true;

View file

@ -0,0 +1,107 @@
use cuprate_common::{PruningError, PruningSeed};
use monero_wire::{NetZone, NetworkAddress, PeerListEntryBase};
use crate::{
client::InternalPeerID, handles::ConnectionHandle, NetZoneAddress, NetworkAddressIncorrectZone,
NetworkZone,
};
pub enum CoreSyncDataRequest {
Ours,
HandleIncoming(monero_wire::CoreSyncData),
}
pub enum CoreSyncDataResponse {
Ours(monero_wire::CoreSyncData),
Ok,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr(
feature = "borsh",
derive(borsh::BorshSerialize, borsh::BorshDeserialize)
)]
pub struct ZoneSpecificPeerListEntryBase<A: NetZoneAddress> {
pub adr: A,
pub id: u64,
pub last_seen: i64,
pub pruning_seed: PruningSeed,
pub rpc_port: u16,
pub rpc_credits_per_hash: u32,
}
impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for monero_wire::PeerListEntryBase {
fn from(value: ZoneSpecificPeerListEntryBase<A>) -> Self {
Self {
adr: value.adr.into(),
id: value.id,
last_seen: value.last_seen,
pruning_seed: value.pruning_seed.into(),
rpc_port: value.rpc_port,
rpc_credits_per_hash: value.rpc_credits_per_hash,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum PeerListConversionError {
#[error("Address is in incorrect zone")]
Address(#[from] NetworkAddressIncorrectZone),
#[error("Pruning seed error: {0}")]
PruningSeed(#[from] PruningError),
}
impl<A: NetZoneAddress> TryFrom<monero_wire::PeerListEntryBase>
for ZoneSpecificPeerListEntryBase<A>
{
type Error = PeerListConversionError;
fn try_from(value: PeerListEntryBase) -> Result<Self, Self::Error> {
Ok(Self {
adr: value.adr.try_into()?,
id: value.id,
last_seen: value.last_seen,
pruning_seed: PruningSeed::try_from(value.pruning_seed)?,
rpc_port: value.rpc_port,
rpc_credits_per_hash: value.rpc_credits_per_hash,
})
}
}
pub enum AddressBookRequest<Z: NetworkZone> {
NewConnection {
addr: Option<Z::Addr>,
internal_peer_id: InternalPeerID<Z::Addr>,
handle: ConnectionHandle,
id: u64,
pruning_seed: PruningSeed,
/// The peers port.
rpc_port: u16,
/// The peers rpc credits per hash
rpc_credits_per_hash: u32,
},
/// Bans a peer for the specified duration. This request
/// will send disconnect signals to all peers with the same
/// address.
BanPeer(Z::Addr, std::time::Duration),
IncomingPeerList(Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>),
/// Gets a random white peer from the peer list. If height is specified
/// then the peer list should retrieve a peer that should have a full
/// block at that height according to it's pruning seed
GetRandomWhitePeer {
height: Option<u64>,
},
/// Gets a random gray peer from the peer list. If height is specified
/// then the peer list should retrieve a peer that should have a full
/// block at that height according to it's pruning seed
GetRandomGrayPeer {
height: Option<u64>,
},
GetWhitePeers(usize),
}
pub enum AddressBookResponse<Z: NetworkZone> {
Ok,
Peer(ZoneSpecificPeerListEntryBase<Z::Addr>),
Peers(Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>),
}

View file

@ -0,0 +1,64 @@
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;
use monero_p2p::handles::HandleBuilder;
#[test]
fn send_ban_signal() {
let semaphore = Arc::new(Semaphore::new(5));
let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
peer_handle.ban_peer(Duration::from_secs(300));
let Some(ban_time) = connection_handle.check_should_ban() else {
panic!("ban signal not received!");
};
assert_eq!(ban_time.0, Duration::from_secs(300));
connection_handle.send_close_signal();
assert!(guard.should_shutdown());
guard.connection_closed();
assert!(connection_handle.is_closed());
}
#[test]
fn multiple_ban_signals() {
let semaphore = Arc::new(Semaphore::new(5));
let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
peer_handle.ban_peer(Duration::from_secs(300));
peer_handle.ban_peer(Duration::from_secs(301));
peer_handle.ban_peer(Duration::from_secs(302));
let Some(ban_time) = connection_handle.check_should_ban() else {
panic!("ban signal not received!");
};
// only the first will be seen
assert_eq!(ban_time.0, Duration::from_secs(300));
connection_handle.send_close_signal();
assert!(guard.should_shutdown());
guard.connection_closed();
assert!(connection_handle.is_closed());
}
#[test]
fn dropped_guard_sends_disconnect_signal() {
let semaphore = Arc::new(Semaphore::new(5));
let (guard, connection_handle, _) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
assert!(!connection_handle.is_closed());
drop(guard);
assert!(connection_handle.is_closed());
}

View file

@ -6,7 +6,7 @@ use tower::{Service, ServiceExt};
use cuprate_common::Network; use cuprate_common::Network;
use monero_wire::{common::PeerSupportFlags, BasicNodeData}; use monero_wire::{common::PeerSupportFlags, BasicNodeData};
use monero_peer::{ use monero_p2p::{
client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker}, client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker},
network_zones::ClearNet, network_zones::ClearNet,
ConnectionDirection, ConnectionDirection,

View file

@ -7,7 +7,7 @@ use std::{
use futures::FutureExt; use futures::FutureExt;
use tower::Service; use tower::Service;
use monero_peer::{ use monero_p2p::{
services::{ services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
}, },
@ -30,7 +30,7 @@ impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for DummyAddressBook {
fn call(&mut self, req: AddressBookRequest<Z>) -> Self::Future { fn call(&mut self, req: AddressBookRequest<Z>) -> Self::Future {
async move { async move {
Ok(match req { Ok(match req {
AddressBookRequest::GetPeers(_) => AddressBookResponse::Peers(vec![]), AddressBookRequest::GetWhitePeers(_) => AddressBookResponse::Peers(vec![]),
_ => AddressBookResponse::Ok, _ => AddressBookResponse::Ok,
}) })
} }

View file

@ -1,6 +0,0 @@
mod conector;
mod connection;
pub mod handshaker;
pub use conector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};

View file

@ -1,61 +0,0 @@
use monero_wire::PeerListEntryBase;
use crate::{NetworkAddressIncorrectZone, NetworkZone};
pub enum CoreSyncDataRequest {
Ours,
HandleIncoming(monero_wire::CoreSyncData),
}
pub enum CoreSyncDataResponse {
Ours(monero_wire::CoreSyncData),
Ok,
}
pub struct ZoneSpecificPeerListEntryBase<Z: NetworkZone> {
pub adr: Z::Addr,
pub id: u64,
pub last_seen: i64,
pub pruning_seed: u32,
pub rpc_port: u16,
pub rpc_credits_per_hash: u32,
}
impl<Z: NetworkZone> From<ZoneSpecificPeerListEntryBase<Z>> for monero_wire::PeerListEntryBase {
fn from(value: ZoneSpecificPeerListEntryBase<Z>) -> Self {
Self {
adr: value.adr.into(),
id: value.id,
last_seen: value.last_seen,
pruning_seed: value.pruning_seed,
rpc_port: value.rpc_port,
rpc_credits_per_hash: value.rpc_credits_per_hash,
}
}
}
impl<Z: NetworkZone> TryFrom<monero_wire::PeerListEntryBase> for ZoneSpecificPeerListEntryBase<Z> {
type Error = NetworkAddressIncorrectZone;
fn try_from(value: PeerListEntryBase) -> Result<Self, Self::Error> {
Ok(Self {
adr: value.adr.try_into()?,
id: value.id,
last_seen: value.last_seen,
pruning_seed: value.pruning_seed,
rpc_port: value.rpc_port,
rpc_credits_per_hash: value.rpc_credits_per_hash,
})
}
}
pub enum AddressBookRequest<Z: NetworkZone> {
NewConnection(Z::Addr, ZoneSpecificPeerListEntryBase<Z>),
IncomingPeerList(Vec<ZoneSpecificPeerListEntryBase<Z>>),
GetPeers(usize),
}
pub enum AddressBookResponse<Z: NetworkZone> {
Ok,
Peers(Vec<ZoneSpecificPeerListEntryBase<Z>>),
}

View file

@ -63,14 +63,6 @@ impl PeerList {
self.peers.len() self.peers.len()
} }
/// Gets the amount of peers with a specific seed.
pub fn len_by_seed(&self, pruning_seed: &u32) -> usize {
self.pruning_idxs
.get(pruning_seed)
.map(|indexes| indexes.len())
.unwrap_or(0)
}
/// Adds a new peer to the peer list /// Adds a new peer to the peer list
pub fn add_new_peer(&mut self, peer: PeerListEntryBase) { pub fn add_new_peer(&mut self, peer: PeerListEntryBase) {
if let None = self.peers.insert(peer.adr, peer) { if let None = self.peers.insert(peer.adr, peer) {

View file

@ -1,8 +1,3 @@
//! Counting active connections used by Cuprate.
//!
//! These types can be used to count any kind of active resource.
//! But they are currently used to track the number of open connections.
use std::{fmt, sync::Arc}; use std::{fmt, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};

View file

@ -5,7 +5,9 @@ edition = "2021"
[dependencies] [dependencies]
monero-wire = {path = "../net/monero-wire"} monero-wire = {path = "../net/monero-wire"}
monero-peer = {path = "../p2p/monero-peer"} monero-p2p = {path = "../p2p/monero-p2p" }
futures = "0.3.29" futures = "0.3.29"
async-trait = "0.1.74" async-trait = "0.1.74"
borsh = {version = "1.2.1", features = ["derive"]}

View file

@ -6,6 +6,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use borsh::{BorshDeserialize, BorshSerialize};
use futures::{channel::mpsc::Sender as InnerSender, stream::BoxStream, Sink}; use futures::{channel::mpsc::Sender as InnerSender, stream::BoxStream, Sink};
use monero_wire::{ use monero_wire::{
@ -13,11 +14,23 @@ use monero_wire::{
BucketError, Message, BucketError, Message,
}; };
use monero_peer::NetworkZone; use monero_p2p::{NetZoneAddress, NetworkZone};
#[derive(Clone)] #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, BorshSerialize, BorshDeserialize)]
pub struct TestNetZoneAddr(pub u32); pub struct TestNetZoneAddr(pub u32);
impl NetZoneAddress for TestNetZoneAddr {
type BanID = Self;
fn ban_id(&self) -> Self::BanID {
*self
}
fn should_add_to_peer_list(&self) -> bool {
true
}
}
impl std::fmt::Display for TestNetZoneAddr { impl std::fmt::Display for TestNetZoneAddr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(format!("test client, id: {}", self.0).as_str()) f.write_str(format!("test client, id: {}", self.0).as_str())
@ -83,13 +96,14 @@ impl Sink<Message> for Sender {
} }
} }
#[derive(Clone)] #[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct TestNetZone<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool>; pub struct TestNetZone<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool>;
#[async_trait::async_trait] #[async_trait::async_trait]
impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool> NetworkZone impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool> NetworkZone
for TestNetZone<ALLOW_SYNC, DANDELION_PP, CHECK_NODE_ID> for TestNetZone<ALLOW_SYNC, DANDELION_PP, CHECK_NODE_ID>
{ {
const NAME: &'static str = "Testing";
const ALLOW_SYNC: bool = ALLOW_SYNC; const ALLOW_SYNC: bool = ALLOW_SYNC;
const DANDELION_PP: bool = DANDELION_PP; const DANDELION_PP: bool = DANDELION_PP;
const CHECK_NODE_ID: bool = CHECK_NODE_ID; const CHECK_NODE_ID: bool = CHECK_NODE_ID;