p2p: remove old code (#74)

This commit is contained in:
Boog900 2024-02-25 13:42:27 +00:00 committed by GitHub
parent 083c0515d2
commit cf0fcfb6c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 0 additions and 3648 deletions

View file

@ -1,23 +0,0 @@
[package]
name = "cuprate-p2p"
version = "0.1.0"
edition = "2021"
license = "AGPL-3.0-only"
authors = ["Boog900"]
[dependencies]
chrono = "0.4.24"
thiserror = "1.0.39"
cuprate-common = {path = "../common"}
monero-wire = {path= "../net/monero-wire"}
futures = "0.3.26"
tower = {version = "0.4.13", features = ["util", "steer", "load", "discover", "load-shed", "buffer", "timeout"]}
tokio = {version= "1.27", features=["rt", "time", "net"]}
tokio-util = {version = "0.7.8", features=["codec"]}
tokio-stream = {version="0.1.14", features=["time"]}
async-trait = "0.1.68"
tracing = "0.1.37"
tracing-error = "0.2.0"
rand = "0.8.5"
pin-project = "1.0.12"

View file

@ -1,14 +0,0 @@
Copyright (C) 2023 Cuprate Contributors
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.

View file

@ -1,157 +0,0 @@
//! Cuprate Address Book
//!
//! This module holds the logic for persistent peer storage.
//! Cuprates address book is modeled as a [`tower::Service`]
//! The request is [`AddressBookRequest`] and the response is
//! [`AddressBookResponse`].
//!
//! Cuprate, like monerod, actually has 3 address books, one
//! for each [`NetZone`]. This is to reduce the possibility of
//! clear net peers getting linked to their dark counterparts
//! and so peers will only get told about peers they can
//! connect to.
//!
mod addr_book_client;
mod address_book;
pub mod connection_handle;
use cuprate_common::PruningSeed;
use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress, PeerID};
use connection_handle::ConnectionAddressBookHandle;
pub use addr_book_client::start_address_book;
/// Possible errors when dealing with the address book.
/// This is boxed when returning an error in the [`tower::Service`].
#[derive(Debug, thiserror::Error)]
pub enum AddressBookError {
/// The peer is not in the address book for this zone.
#[error("Peer was not found in book")]
PeerNotFound,
/// The peer list is empty.
#[error("The peer list is empty")]
PeerListEmpty,
/// The peers pruning seed has changed.
#[error("The peers pruning seed has changed")]
PeersPruningSeedChanged,
/// The peer is banned.
#[error("The peer is banned")]
PeerIsBanned,
/// When handling a received peer list, the list contains
/// a peer in a different [`NetZone`]
#[error("Peer sent an address out of it's net-zone")]
PeerSentAnAddressOutOfZone,
/// The channel to the address book has closed unexpectedly.
#[error("The address books channel has closed.")]
AddressBooksChannelClosed,
/// The address book task has exited.
#[error("The address book task has exited.")]
AddressBookTaskExited,
/// The peer file store has failed.
#[error("Peer Store Error: {0}")]
PeerStoreError(&'static str),
}
/// A message sent to tell the address book that a peer has disconnected.
pub struct PeerConnectionClosed;
/// A request to the address book.
#[derive(Debug)]
pub enum AddressBookRequest {
/// A request to handle an incoming peer list.
HandleNewPeerList(Vec<PeerListEntryBase>, NetZone),
/// Updates the `last_seen` timestamp of this peer.
SetPeerSeen(PeerID, chrono::NaiveDateTime, NetZone),
/// Bans a peer for the specified duration. This request
/// will send disconnect signals to all peers with the same
/// [`ban_identifier`](NetworkAddress::ban_identifier).
BanPeer(PeerID, std::time::Duration, NetZone),
/// Adds a peer to the connected list
ConnectedToPeer {
/// The net zone of this connection.
zone: NetZone,
/// A handle between the connection and address book.
connection_handle: ConnectionAddressBookHandle,
/// The connection addr, None if the peer is using a
/// hidden network.
addr: Option<NetworkAddress>,
/// The peers id.
id: PeerID,
/// If the peer is reachable by our node.
reachable: bool,
/// The last seen timestamp, note: Cuprate may skip updating this
/// field on some inbound messages
last_seen: chrono::NaiveDateTime,
/// The peers pruning seed
pruning_seed: PruningSeed,
/// The peers port.
rpc_port: u16,
/// The peers rpc credits per hash
rpc_credits_per_hash: u32,
},
/// A request to get and eempty the anchor list,
/// used when starting the node.
GetAndEmptyAnchorList(NetZone),
/// Get a random Gray peer from the peer list
/// If a pruning seed is given we will select from
/// peers with that seed and peers that dont prune.
GetRandomGrayPeer(NetZone, Option<PruningSeed>),
/// Get a random White peer from the peer list
/// If a pruning seed is given we will select from
/// peers with that seed and peers that dont prune.
GetRandomWhitePeer(NetZone, Option<PruningSeed>),
/// Get a list of random peers from the white list,
/// The list will be less than or equal to the provided
/// len.
GetRandomWhitePeers(NetZone, usize),
}
impl std::fmt::Display for AddressBookRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::HandleNewPeerList(..) => f.write_str("HandleNewPeerList"),
Self::SetPeerSeen(..) => f.write_str("SetPeerSeen"),
Self::BanPeer(..) => f.write_str("BanPeer"),
Self::ConnectedToPeer { .. } => f.write_str("ConnectedToPeer"),
Self::GetAndEmptyAnchorList(_) => f.write_str("GetAndEmptyAnchorList"),
Self::GetRandomGrayPeer(..) => f.write_str("GetRandomGrayPeer"),
Self::GetRandomWhitePeer(..) => f.write_str("GetRandomWhitePeer"),
Self::GetRandomWhitePeers(_, len) => {
f.write_str(&format!("GetRandomWhitePeers, len: {len}"))
}
}
}
}
impl AddressBookRequest {
/// Gets the [`NetZone`] for this request so we can
/// route it to the required address book.
pub fn get_zone(&self) -> NetZone {
match self {
Self::HandleNewPeerList(_, zone) => *zone,
Self::SetPeerSeen(.., zone) => *zone,
Self::BanPeer(.., zone) => *zone,
Self::ConnectedToPeer { zone, .. } => *zone,
Self::GetAndEmptyAnchorList(zone) => *zone,
Self::GetRandomGrayPeer(zone, _) => *zone,
Self::GetRandomWhitePeer(zone, _) => *zone,
Self::GetRandomWhitePeers(zone, _) => *zone,
}
}
}
/// A response from the AddressBook.
#[derive(Debug)]
pub enum AddressBookResponse {
/// The request was handled ok.
Ok,
/// A peer.
Peer(PeerListEntryBase),
/// A list of peers.
Peers(Vec<PeerListEntryBase>),
}

View file

@ -1,166 +0,0 @@
//! This module holds the address books client and [`tower::Service`].
//!
//! To start the address book use [`start_address_book`].
// TODO: Store banned peers persistently.
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use futures::channel::{mpsc, oneshot};
use futures::FutureExt;
use tokio::task::{spawn, JoinHandle};
use tower::steer::Steer;
use tower::BoxError;
use tracing::Instrument;
use monero_wire::network_address::NetZone;
use crate::{Config, P2PStore};
use super::address_book::{AddressBook, AddressBookClientRequest};
use super::{AddressBookError, AddressBookRequest, AddressBookResponse};
/// Start the address book.
/// Under the hood this function spawns 3 address books
/// for the 3 [`NetZone`] and combines them into a [`tower::Steer`](Steer).
pub async fn start_address_book<S>(
peer_store: S,
config: Config,
) -> Result<
impl tower::Service<
AddressBookRequest,
Response = AddressBookResponse,
Error = BoxError,
Future = Pin<
Box<dyn Future<Output = Result<AddressBookResponse, BoxError>> + Send + 'static>,
>,
>,
BoxError,
>
where
S: P2PStore,
{
let mut builder = AddressBookBuilder::new(peer_store, config);
let public = builder.build(NetZone::Public).await?;
let tor = builder.build(NetZone::Tor).await?;
let i2p = builder.build(NetZone::I2p).await?;
// This list MUST be in the same order as closuer in the `Steer` func
let books = vec![public, tor, i2p];
Ok(Steer::new(
books,
|req: &AddressBookRequest, _: &[_]| match req.get_zone() {
// This:
NetZone::Public => 0,
NetZone::Tor => 1,
NetZone::I2p => 2,
},
))
}
/// An address book builder.
/// This:
/// - starts the address book
/// - creates and returns the `AddressBookClient`
struct AddressBookBuilder<S> {
peer_store: S,
config: Config,
}
impl<S> AddressBookBuilder<S>
where
S: P2PStore,
{
fn new(peer_store: S, config: Config) -> Self {
AddressBookBuilder { peer_store, config }
}
/// Builds the address book for a specific [`NetZone`]
async fn build(&mut self, zone: NetZone) -> Result<AddressBookClient, AddressBookError> {
let (white, gray, anchor) = self
.peer_store
.load_peers(zone)
.await
.map_err(|e| AddressBookError::PeerStoreError(e))?;
let book = AddressBook::new(
self.config.clone(),
zone,
white,
gray,
anchor,
vec![],
self.peer_store.clone(),
);
let (tx, rx) = mpsc::channel(0);
let book_span = tracing::info_span!("AddressBook", book = book.book_name());
let book_handle = spawn(book.run(rx).instrument(book_span));
Ok(AddressBookClient {
book: tx,
book_handle,
})
}
}
/// The Client for an individual address book.
#[derive(Debug)]
struct AddressBookClient {
/// The channel to pass requests to the address book.
book: mpsc::Sender<AddressBookClientRequest>,
/// The address book task handle.
book_handle: JoinHandle<()>,
}
impl tower::Service<AddressBookRequest> for AddressBookClient {
type Response = AddressBookResponse;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// Check the channel
match self.book.poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(_)) => {
return Poll::Ready(Err(AddressBookError::AddressBooksChannelClosed.into()))
}
}
// Check the address book task is still running
match self.book_handle.poll_unpin(cx) {
// The address book is still running
Poll::Pending => Poll::Ready(Ok(())),
// The address book task has exited
Poll::Ready(_) => Err(AddressBookError::AddressBookTaskExited)?,
}
}
fn call(&mut self, req: AddressBookRequest) -> Self::Future {
let (tx, rx) = oneshot::channel();
// get the callers span
let span = tracing::debug_span!(parent: &tracing::span::Span::current(), "AddressBook");
let req = AddressBookClientRequest { req, tx, span };
match self.book.try_send(req) {
Err(_e) => {
// I'm assuming all callers will call `poll_ready` first (which they are supposed to)
futures::future::ready(Err(AddressBookError::AddressBooksChannelClosed.into()))
.boxed()
}
Ok(()) => async move {
rx.await
.expect("Address Book will not drop requests until completed")
.map_err(Into::into)
}
.boxed(),
}
}
}

View file

@ -1,594 +0,0 @@
//! This module contains the actual address book logic.
//!
//! The address book is split into multiple [`PeerList`]:
//!
//! - A White list: For peers we have connected to ourselves.
//!
//! - A Gray list: For Peers we have been told about but
//! haven't connected to ourselves.
//!
//! - An Anchor list: This holds peers we are currently
//! connected to that are reachable if we were to
//! connect to them again. For example an inbound proxy
//! connection would not get added to this list as we cant
//! connect to this peer ourselves. Behind the scenes we
//! are just storing the key to a peer in the white list.
//!
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::FuturesUnordered;
use futures::{
channel::{mpsc, oneshot},
FutureExt, Stream, StreamExt,
};
use pin_project::pin_project;
use rand::prelude::SliceRandom;
use cuprate_common::shutdown::is_shutting_down;
use cuprate_common::PruningSeed;
use monero_wire::{messages::PeerListEntryBase, network_address::NetZone, NetworkAddress, PeerID};
use super::{AddressBookError, AddressBookRequest, AddressBookResponse};
use crate::address_book::connection_handle::ConnectionAddressBookHandle;
use crate::{constants::ADDRESS_BOOK_SAVE_INTERVAL, Config, P2PStore};
mod peer_list;
use peer_list::PeerList;
#[cfg(test)]
mod tests;
/// A request sent to the address book task.
pub(crate) struct AddressBookClientRequest {
/// The request
pub req: AddressBookRequest,
/// A oneshot to send the result down
pub tx: oneshot::Sender<Result<AddressBookResponse, AddressBookError>>,
/// The tracing span to keep the context of the request
pub span: tracing::Span,
}
/// An entry in the connected list.
pub struct ConnectionPeerEntry {
/// A oneshot sent from the Connection when it has finished.
connection_handle: ConnectionAddressBookHandle,
/// The connection addr, None if the peer is connected through
/// a hidden network.
addr: Option<NetworkAddress>,
/// If the peer is reachable by our node.
reachable: bool,
/// The last seen timestamp, note: Cuprate may skip updating this
/// field on some inbound messages
last_seen: chrono::NaiveDateTime,
/// The peers pruning seed
pruning_seed: PruningSeed,
/// The peers port.
rpc_port: u16,
/// The peers rpc credits per hash
rpc_credits_per_hash: u32,
}
/// A future that resolves when a peer is unbanned.
#[pin_project(project = EnumProj)]
pub struct BanedPeerFut(Vec<u8>, #[pin] tokio::time::Sleep);
impl Future for BanedPeerFut {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match this.1.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(this.0.clone()),
}
}
}
/// The address book for a specific [`NetZone`]
pub struct AddressBook<PeerStore> {
/// The [`NetZone`] of this address book.
zone: NetZone,
/// A copy of the nodes configuration.
config: Config,
/// The Address books white list.
white_list: PeerList,
/// The Address books gray list.
gray_list: PeerList,
/// The Address books anchor list.
anchor_list: HashSet<NetworkAddress>,
/// The Currently connected peers.
connected_peers: HashMap<PeerID, ConnectionPeerEntry>,
/// A tuple of:
/// - A hashset of [`ban_identifier`](NetworkAddress::ban_identifier)
/// - A [`FuturesUnordered`] which contains futures for every ban_id
/// that will resolve when the ban_id should be un banned.
baned_peers: (HashSet<Vec<u8>>, FuturesUnordered<BanedPeerFut>),
/// The peer store to save the peers to persistent storage
p2p_store: PeerStore,
}
impl<PeerStore: P2PStore> AddressBook<PeerStore> {
/// Creates a new address book for a given [`NetZone`]
pub fn new(
config: Config,
zone: NetZone,
white_peers: Vec<PeerListEntryBase>,
gray_peers: Vec<PeerListEntryBase>,
anchor_peers: Vec<NetworkAddress>,
baned_peers: Vec<(NetworkAddress, chrono::NaiveDateTime)>,
p2p_store: PeerStore,
) -> Self {
let white_list = PeerList::new(white_peers);
let gray_list = PeerList::new(gray_peers);
let anchor_list = HashSet::from_iter(anchor_peers);
let baned_peers = (HashSet::new(), FuturesUnordered::new());
let connected_peers = HashMap::new();
AddressBook {
zone,
config,
white_list,
gray_list,
anchor_list,
connected_peers,
baned_peers,
p2p_store,
}
}
/// Returns the books name (Based on the [`NetZone`])
pub const fn book_name(&self) -> &'static str {
match self.zone {
NetZone::Public => "PublicAddressBook",
NetZone::Tor => "TorAddressBook",
NetZone::I2p => "I2pAddressBook",
}
}
/// Returns the length of the white list
fn len_white_list(&self) -> usize {
self.white_list.len()
}
/// Returns the length of the gray list
fn len_gray_list(&self) -> usize {
self.gray_list.len()
}
/// Returns the length of the anchor list
fn len_anchor_list(&self) -> usize {
self.anchor_list.len()
}
/// Returns the length of the banned list
fn len_banned_list(&self) -> usize {
self.baned_peers.0.len()
}
/// Returns the maximum length of the white list
/// *note this list can grow bigger if we are connected to more
/// than this amount.
fn max_white_peers(&self) -> usize {
self.config.max_white_peers()
}
/// Returns the maximum length of the gray list
fn max_gray_peers(&self) -> usize {
self.config.max_gray_peers()
}
/// Checks if a peer is banned.
fn is_peer_banned(&self, peer: &NetworkAddress) -> bool {
self.baned_peers.0.contains(&peer.ban_identifier())
}
/// Checks if banned peers should be unbanned as the duration has elapsed
fn check_unban_peers(&mut self) {
while let Some(Some(addr)) = Pin::new(&mut self.baned_peers.1).next().now_or_never() {
tracing::debug!("Unbanning peer: {addr:?}");
self.baned_peers.0.remove(&addr);
}
}
/// Checks if peers have disconnected, if they have removing them from the
/// connected and anchor list.
fn check_connected_peers(&mut self) {
let mut remove_from_anchor = vec![];
// We dont have to worry about updating our white list with the information
// before we remove the peers as that happens on every save.
self.connected_peers.retain(|_, peer| {
if !peer.connection_handle.connection_closed() {
// add the peer to the list to get removed from the anchor
if let Some(addr) = peer.addr {
remove_from_anchor.push(addr)
}
false
} else {
true
}
});
// If we are shutting down we want to keep our anchor peers for
// the next time we boot up so we dont remove disconnecting peers
// from the anchor list if we are shutting down.
if !is_shutting_down() {
for peer in remove_from_anchor {
self.anchor_list.remove(&peer);
}
}
}
// Bans the peer and tells the connection tasks of peers with the same ban id to shutdown.
fn ban_peer(
&mut self,
peer: PeerID,
time: std::time::Duration,
) -> Result<(), AddressBookError> {
tracing::debug!("Banning peer: {peer:?} for: {time:?}");
let Some(conn_entry) = self.connected_peers.get(&peer) else {
tracing::debug!("Peer is not in connected list");
return Err(AddressBookError::PeerNotFound);
};
// tell the connection task to finish.
conn_entry.connection_handle.kill_connection();
// try find the NetworkAddress of the peer
let Some(addr) = conn_entry.addr else {
tracing::debug!("Peer does not have an address we can ban");
return Ok(());
};
let ban_id = addr.ban_identifier();
self.white_list.remove_peers_with_ban_id(&ban_id);
self.gray_list.remove_peers_with_ban_id(&ban_id);
// Dont remove from anchor list or connection list as this will happen when
// the connection is closed.
// tell the connection task of peers with the same ban id to shutdown.
for conn in self.connected_peers.values() {
if let Some(addr) = conn.addr {
if addr.ban_identifier() == ban_id {
conn.connection_handle.kill_connection()
}
}
}
// add the ban identifier to the ban list
self.baned_peers.0.insert(ban_id.clone());
self.baned_peers
.1
.push(BanedPeerFut(ban_id, tokio::time::sleep(time)));
Ok(())
}
/// Update the last seen timestamp of a connected peer.
fn update_last_seen(
&mut self,
peer: PeerID,
last_seen: chrono::NaiveDateTime,
) -> Result<(), AddressBookError> {
if let Some(mut peer) = self.connected_peers.get_mut(&peer) {
peer.last_seen = last_seen;
Ok(())
} else {
Err(AddressBookError::PeerNotFound)
}
}
/// adds a peer to the gray list.
fn add_peer_to_gray_list(&mut self, mut peer: PeerListEntryBase) {
if self.white_list.contains_peer(&peer.adr) {
return;
};
if !self.gray_list.contains_peer(&peer.adr) {
peer.last_seen = 0;
self.gray_list.add_new_peer(peer);
}
}
/// handles an incoming peer list,
/// dose some basic validation on the addresses
/// appends the good peers to our book.
fn handle_new_peerlist(
&mut self,
mut peers: Vec<PeerListEntryBase>,
) -> Result<(), AddressBookError> {
let length = peers.len();
tracing::debug!("Received new peer list, length: {length}");
let mut err = None;
peers.retain(|peer| {
if err.is_some() {
false
} else if peer.adr.is_local() || peer.adr.is_loopback() {
false
} else if peer.adr.port() == peer.rpc_port {
false
} else if PruningSeed::try_from(peer.pruning_seed).is_err() {
false
} else if peer.adr.get_zone() != self.zone {
tracing::info!("Received an address from a different network zone, ignoring list.");
err = Some(AddressBookError::PeerSentAnAddressOutOfZone);
false
} else if self.is_peer_banned(&peer.adr) {
false
} else {
true
}
});
if let Some(e) = err {
return Err(e);
} else {
for peer in peers {
self.add_peer_to_gray_list(peer);
}
self.gray_list
.reduce_list(&HashSet::new(), self.max_gray_peers());
Ok(())
}
}
/// Gets a random peer from our gray list.
/// If pruning seed is set we will get a peer with that pruning seed.
fn get_random_gray_peer(
&mut self,
pruning_seed: Option<PruningSeed>,
) -> Option<PeerListEntryBase> {
self.gray_list
.get_random_peer(&mut rand::thread_rng(), pruning_seed.map(Into::into))
.map(|p| *p)
}
/// Gets a random peer from our white list.
/// If pruning seed is set we will get a peer with that pruning seed.
fn get_random_white_peer(
&mut self,
pruning_seed: Option<PruningSeed>,
) -> Option<PeerListEntryBase> {
self.white_list
.get_random_peer(&mut rand::thread_rng(), pruning_seed.map(Into::into))
.map(|p| *p)
}
/// Gets random peers from our white list.
/// will be less than or equal to `len`.
fn get_random_white_peers(&mut self, len: usize) -> Vec<PeerListEntryBase> {
let white_len = self.white_list.len();
let len = if len < white_len { len } else { white_len };
let mut white_peers: Vec<&PeerListEntryBase> = self.white_list.iter_all_peers().collect();
white_peers.shuffle(&mut rand::thread_rng());
white_peers[0..len].iter().map(|peb| **peb).collect()
}
/// Updates an entry in the white list, if the peer is not found and `reachable` is true then
/// the peer will be added to the white list.
fn update_white_list_peer_entry(
&mut self,
addr: &NetworkAddress,
id: PeerID,
conn_entry: &ConnectionPeerEntry,
) -> Result<(), AddressBookError> {
if let Some(peb) = self.white_list.get_peer_mut(addr) {
if peb.pruning_seed == conn_entry.pruning_seed.into() {
return Err(AddressBookError::PeersPruningSeedChanged);
}
peb.id = id;
peb.last_seen = conn_entry.last_seen.timestamp();
peb.rpc_port = conn_entry.rpc_port;
peb.rpc_credits_per_hash = conn_entry.rpc_credits_per_hash;
peb.pruning_seed = conn_entry.pruning_seed.into();
} else if conn_entry.reachable {
// if the peer is reachable add it to our white list
let peb = PeerListEntryBase {
id,
adr: *addr,
last_seen: conn_entry.last_seen.timestamp(),
rpc_port: conn_entry.rpc_port,
rpc_credits_per_hash: conn_entry.rpc_credits_per_hash,
pruning_seed: conn_entry.pruning_seed.into(),
};
self.white_list.add_new_peer(peb);
}
Ok(())
}
/// Handles a new connection, adding it to the white list if the
/// peer is reachable by our node.
fn handle_new_connection(
&mut self,
connection_handle: ConnectionAddressBookHandle,
addr: Option<NetworkAddress>,
id: PeerID,
reachable: bool,
last_seen: chrono::NaiveDateTime,
pruning_seed: PruningSeed,
rpc_port: u16,
rpc_credits_per_hash: u32,
) -> Result<(), AddressBookError> {
let connection_entry = ConnectionPeerEntry {
connection_handle,
addr,
reachable,
last_seen,
pruning_seed,
rpc_port,
rpc_credits_per_hash,
};
if let Some(addr) = addr {
if self.baned_peers.0.contains(&addr.ban_identifier()) {
return Err(AddressBookError::PeerIsBanned);
}
// remove the peer from the gray list as we know it's active.
let _ = self.gray_list.remove_peer(&addr);
if !reachable {
// If we can't reach the peer remove it from the white list as well
let _ = self.white_list.remove_peer(&addr);
} else {
// The peer is reachable, update our white list and add it to the anchor connections.
self.update_white_list_peer_entry(&addr, id, &connection_entry)?;
self.anchor_list.insert(addr);
}
}
self.connected_peers.insert(id, connection_entry);
self.white_list
.reduce_list(&self.anchor_list, self.max_white_peers());
Ok(())
}
/// Get and empties the anchor list, used at startup to
/// connect to some peers we were previously connected to.
fn get_and_empty_anchor_list(&mut self) -> Vec<PeerListEntryBase> {
self.anchor_list
.drain()
.map(|addr| {
self.white_list
.get_peer(&addr)
.expect("If peer is in anchor it must be in white list")
.clone()
})
.collect()
}
/// Handles an [`AddressBookClientRequest`] to the address book.
async fn handle_request(&mut self, req: AddressBookClientRequest) {
let _guard = req.span.enter();
tracing::trace!("received request: {}", req.req);
let res = match req.req {
AddressBookRequest::HandleNewPeerList(new_peers, _) => self
.handle_new_peerlist(new_peers)
.map(|_| AddressBookResponse::Ok),
AddressBookRequest::SetPeerSeen(peer, last_seen, _) => self
.update_last_seen(peer, last_seen)
.map(|_| AddressBookResponse::Ok),
AddressBookRequest::BanPeer(peer, time, _) => {
self.ban_peer(peer, time).map(|_| AddressBookResponse::Ok)
}
AddressBookRequest::ConnectedToPeer {
zone: _,
connection_handle,
addr,
id,
reachable,
last_seen,
pruning_seed,
rpc_port,
rpc_credits_per_hash,
} => self
.handle_new_connection(
connection_handle,
addr,
id,
reachable,
last_seen,
pruning_seed,
rpc_port,
rpc_credits_per_hash,
)
.map(|_| AddressBookResponse::Ok),
AddressBookRequest::GetAndEmptyAnchorList(_) => {
Ok(AddressBookResponse::Peers(self.get_and_empty_anchor_list()))
}
AddressBookRequest::GetRandomGrayPeer(_, pruning_seed) => {
match self.get_random_gray_peer(pruning_seed) {
Some(peer) => Ok(AddressBookResponse::Peer(peer)),
None => Err(AddressBookError::PeerListEmpty),
}
}
AddressBookRequest::GetRandomWhitePeer(_, pruning_seed) => {
match self.get_random_white_peer(pruning_seed) {
Some(peer) => Ok(AddressBookResponse::Peer(peer)),
None => Err(AddressBookError::PeerListEmpty),
}
}
AddressBookRequest::GetRandomWhitePeers(_, len) => {
Ok(AddressBookResponse::Peers(self.get_random_white_peers(len)))
}
};
if let Err(e) = &res {
tracing::debug!("Error when handling request, err: {e}")
}
let _ = req.tx.send(res);
}
/// Updates the white list with the information in the `connected_peers` list.
/// This only updates the `last_seen` timestamp as that's the only thing that should
/// change during connections.
fn update_white_list_with_conn_list(&mut self) {
for (_, peer) in self.connected_peers.iter() {
if peer.reachable {
if let Some(peer_eb) = self.white_list.get_peer_mut(&peer.addr.unwrap()) {
peer_eb.last_seen = peer.last_seen.timestamp();
}
}
}
}
/// Saves the address book to persistent storage.
/// TODO: save the banned peer list.
#[tracing::instrument(level="trace", skip(self), fields(name = self.book_name()) )]
async fn save(&mut self) {
self.update_white_list_with_conn_list();
tracing::trace!(
"white_len: {}, gray_len: {}, anchor_len: {}, banned_len: {}",
self.len_white_list(),
self.len_gray_list(),
self.len_anchor_list(),
self.len_banned_list()
);
let res = self
.p2p_store
.save_peers(
self.zone,
(&self.white_list).into(),
(&self.gray_list).into(),
self.anchor_list.iter().collect(),
)
.await;
match res {
Ok(()) => tracing::trace!("Complete"),
Err(e) => tracing::error!("Error saving address book: {e}"),
}
}
/// Runs the address book task
/// Should be spawned in a task.
pub(crate) async fn run(mut self, mut rx: mpsc::Receiver<AddressBookClientRequest>) {
let mut save_interval = {
let mut interval = tokio::time::interval(ADDRESS_BOOK_SAVE_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// Interval ticks at 0, interval, 2 interval, ...
// this is just to ignore the first tick
interval.tick().await;
tokio_stream::wrappers::IntervalStream::new(interval).fuse()
};
loop {
self.check_unban_peers();
self.check_connected_peers();
futures::select! {
req = rx.next() => {
if let Some(req) = req {
self.handle_request(req).await
} else {
tracing::debug!("{} req channel closed, saving and shutting down book", self.book_name());
self.save().await;
return;
}
}
_ = save_interval.next() => self.save().await
}
}
}
}

View file

@ -1,231 +0,0 @@
//! This module contains the individual address books peer lists.
//!
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use cuprate_common::CRYPTONOTE_PRUNING_LOG_STRIPES;
use monero_wire::{messages::PeerListEntryBase, NetworkAddress};
use rand::Rng;
#[cfg(test)]
mod tests;
/// A Peer list in the address book.
///
/// This could either be the white list or gray list.
pub struct PeerList {
/// The peers with their peer data.
peers: HashMap<NetworkAddress, PeerListEntryBase>,
/// An index of Pruning seed to address, so
/// can quickly grab peers with the pruning seed
/// we want.
pruning_idxs: HashMap<u32, Vec<NetworkAddress>>,
/// An index of [`ban_identifier`](NetworkAddress::ban_identifier) to Address
/// to allow us to quickly remove baned peers.
ban_id_idxs: HashMap<Vec<u8>, Vec<NetworkAddress>>,
}
impl<'a> Into<Vec<&'a PeerListEntryBase>> for &'a PeerList {
fn into(self) -> Vec<&'a PeerListEntryBase> {
self.peers.iter().map(|(_, peb)| peb).collect()
}
}
impl PeerList {
/// Creates a new peer list.
pub fn new(list: Vec<PeerListEntryBase>) -> PeerList {
let mut peers = HashMap::with_capacity(list.len());
let mut pruning_idxs = HashMap::with_capacity(2 << CRYPTONOTE_PRUNING_LOG_STRIPES);
let mut ban_id_idxs = HashMap::with_capacity(list.len()); // worse case, every peer has a different NetworkAddress and ban id
for peer in list {
peers.insert(peer.adr, peer);
pruning_idxs
.entry(peer.pruning_seed)
.or_insert_with(Vec::new)
.push(peer.adr);
ban_id_idxs
.entry(peer.adr.ban_identifier())
.or_insert_with(Vec::new)
.push(peer.adr);
}
PeerList {
peers,
pruning_idxs,
ban_id_idxs,
}
}
/// Gets the length of the peer list
pub fn len(&self) -> usize {
self.peers.len()
}
/// Adds a new peer to the peer list
pub fn add_new_peer(&mut self, peer: PeerListEntryBase) {
if let None = self.peers.insert(peer.adr, peer) {
self.pruning_idxs
.entry(peer.pruning_seed)
.or_insert_with(Vec::new)
.push(peer.adr);
self.ban_id_idxs
.entry(peer.adr.ban_identifier())
.or_insert_with(Vec::new)
.push(peer.adr);
}
}
/// Gets a reference to a peer
pub fn get_peer(&self, peer: &NetworkAddress) -> Option<&PeerListEntryBase> {
self.peers.get(peer)
}
/// Returns an iterator over every peer in this peer list
pub fn iter_all_peers(&self) -> impl Iterator<Item = &PeerListEntryBase> {
self.peers.values()
}
/// Returns a random peer.
/// If the pruning seed is specified then we will get a random peer with
/// that pruning seed otherwise we will just get a random peer in the whole
/// list.
pub fn get_random_peer<R: Rng>(
&self,
r: &mut R,
pruning_seed: Option<u32>,
) -> Option<&PeerListEntryBase> {
if let Some(seed) = pruning_seed {
let mut peers = self.get_peers_with_pruning(&seed)?;
let len = self.len_by_seed(&seed);
if len == 0 {
None
} else {
let n = r.gen_range(0..len);
peers.nth(n)
}
} else {
let mut peers = self.iter_all_peers();
let len = self.len();
if len == 0 {
None
} else {
let n = r.gen_range(0..len);
peers.nth(n)
}
}
}
/// Returns a mutable reference to a peer.
pub fn get_peer_mut(&mut self, peer: &NetworkAddress) -> Option<&mut PeerListEntryBase> {
self.peers.get_mut(peer)
}
/// Returns true if the list contains this peer.
pub fn contains_peer(&self, peer: &NetworkAddress) -> bool {
self.peers.contains_key(peer)
}
/// Returns an iterator of peer info of peers with a specific pruning seed.
fn get_peers_with_pruning(
&self,
seed: &u32,
) -> Option<impl Iterator<Item = &PeerListEntryBase>> {
let addrs = self.pruning_idxs.get(seed)?;
Some(addrs.iter().map(move |addr| {
self.peers
.get(addr)
.expect("Address must be in peer list if we have an idx for it")
}))
}
/// Removes a peer from the pruning idx
///
/// MUST NOT BE USED ALONE
fn remove_peer_pruning_idx(&mut self, peer: &PeerListEntryBase) {
remove_peer_idx(&mut self.pruning_idxs, &peer.pruning_seed, &peer.adr)
}
/// Removes a peer from the ban idx
///
/// MUST NOT BE USED ALONE
fn remove_peer_ban_idx(&mut self, peer: &PeerListEntryBase) {
remove_peer_idx(&mut self.ban_id_idxs, &peer.adr.ban_identifier(), &peer.adr)
}
/// Removes a peer from all the indexes
///
/// MUST NOT BE USED ALONE
fn remove_peer_from_all_idxs(&mut self, peer: &PeerListEntryBase) {
self.remove_peer_ban_idx(peer);
self.remove_peer_pruning_idx(peer);
}
/// Removes a peer from the peer list
pub fn remove_peer(&mut self, peer: &NetworkAddress) -> Option<PeerListEntryBase> {
let peer_eb = self.peers.remove(peer)?;
self.remove_peer_from_all_idxs(&peer_eb);
Some(peer_eb)
}
/// Removes all peers with a specific ban id.
pub fn remove_peers_with_ban_id(&mut self, ban_id: &Vec<u8>) {
let Some(addresses) = self.ban_id_idxs.get(ban_id) else {
// No peers to ban
return;
};
for addr in addresses.clone() {
self.remove_peer(&addr);
}
}
/// Tries to reduce the peer list to `new_len`.
///
/// This function could keep the list bigger than `new_len` if `must_keep_peers`s length
/// is larger than new_len, in that case we will remove as much as we can.
pub fn reduce_list(&mut self, must_keep_peers: &HashSet<NetworkAddress>, new_len: usize) {
if new_len >= self.len() {
return;
}
let target_removed = self.len() - new_len;
let mut removed_count = 0;
let mut peers_to_remove: Vec<NetworkAddress> = Vec::with_capacity(target_removed);
for (peer_adr, _) in &self.peers {
if removed_count >= target_removed {
break;
}
if !must_keep_peers.contains(peer_adr) {
peers_to_remove.push(*peer_adr);
removed_count += 1;
}
}
for peer_adr in peers_to_remove {
let _ = self.remove_peer(&peer_adr);
}
}
}
/// Remove a peer from an index.
fn remove_peer_idx<T: Hash + Eq + PartialEq>(
idx_map: &mut HashMap<T, Vec<NetworkAddress>>,
idx: &T,
addr: &NetworkAddress,
) {
if let Some(peer_list) = idx_map.get_mut(idx) {
if let Some(idx) = peer_list.iter().position(|peer_adr| peer_adr == addr) {
peer_list.swap_remove(idx);
} else {
unreachable!("This function will only be called when the peer exists.");
}
} else {
unreachable!("Index must exist if a peer has that index");
}
}

View file

@ -1,176 +0,0 @@
use std::{collections::HashSet, vec};
use monero_wire::{messages::PeerListEntryBase, NetworkAddress};
use rand::Rng;
use super::PeerList;
fn make_fake_peer_list(numb_o_peers: usize) -> PeerList {
let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers];
for (idx, peer) in peer_list.iter_mut().enumerate() {
let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")};
ip.m_ip += idx as u32;
}
PeerList::new(peer_list)
}
fn make_fake_peer_list_with_random_pruning_seeds(numb_o_peers: usize) -> PeerList {
let mut r = rand::thread_rng();
let mut peer_list = vec![PeerListEntryBase::default(); numb_o_peers];
for (idx, peer) in peer_list.iter_mut().enumerate() {
let NetworkAddress::IPv4(ip) = &mut peer.adr else {panic!("this test requires default to be ipv4")};
ip.m_ip += idx as u32;
ip.m_port += r.gen_range(0..15);
peer.pruning_seed = if r.gen_bool(0.4) {
0
} else {
r.gen_range(384..=391)
};
}
PeerList::new(peer_list)
}
#[test]
fn peer_list_reduce_length() {
let mut peer_list = make_fake_peer_list(2090);
let must_keep_peers = HashSet::new();
let target_len = 2000;
peer_list.reduce_list(&must_keep_peers, target_len);
assert_eq!(peer_list.len(), target_len);
}
#[test]
fn peer_list_reduce_length_with_peers_we_need() {
let mut peer_list = make_fake_peer_list(500);
let must_keep_peers = HashSet::from_iter(peer_list.peers.iter().map(|(adr, _)| *adr));
let target_len = 49;
peer_list.reduce_list(&must_keep_peers, target_len);
// we can't remove any of the peers we said we need them all
assert_eq!(peer_list.len(), 500);
}
#[test]
fn peer_list_get_peers_by_pruning_seed() {
let mut r = rand::thread_rng();
let peer_list = make_fake_peer_list_with_random_pruning_seeds(1000);
let seed = if r.gen_bool(0.4) {
0
} else {
r.gen_range(384..=391)
};
let peers_with_seed = peer_list
.get_peers_with_pruning(&seed)
.expect("If you hit this buy a lottery ticket");
for peer in peers_with_seed {
assert_eq!(peer.pruning_seed, seed);
}
assert_eq!(peer_list.len(), 1000);
}
#[test]
fn peer_list_remove_specific_peer() {
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let peer = peer_list
.get_random_peer(&mut rand::thread_rng(), None)
.unwrap()
.clone();
assert!(peer_list.remove_peer(&peer.adr).is_some());
let pruning_idxs = peer_list.pruning_idxs;
let peers = peer_list.peers;
for (_, addrs) in pruning_idxs {
addrs.iter().for_each(|adr| assert_ne!(adr, &peer.adr))
}
assert!(!peers.contains_key(&peer.adr));
}
#[test]
fn peer_list_pruning_idxs_are_correct() {
let peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let mut total_len = 0;
for (seed, list) in peer_list.pruning_idxs {
for peer in list.iter() {
assert_eq!(peer_list.peers.get(peer).unwrap().pruning_seed, seed);
total_len += 1;
}
}
assert_eq!(total_len, peer_list.peers.len())
}
#[test]
fn peer_list_add_new_peer() {
let mut peer_list = make_fake_peer_list(10);
let mut new_peer = PeerListEntryBase::default();
let NetworkAddress::IPv4(ip) = &mut new_peer.adr else {panic!("this test requires default to be ipv4")};
ip.m_ip += 50;
peer_list.add_new_peer(new_peer.clone());
assert_eq!(peer_list.len(), 11);
assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer));
assert!(peer_list
.pruning_idxs
.get(&new_peer.pruning_seed)
.unwrap()
.contains(&new_peer.adr));
}
#[test]
fn peer_list_add_existing_peer() {
let mut peer_list = make_fake_peer_list(10);
let existing_peer = peer_list
.get_peer(&NetworkAddress::default())
.unwrap()
.clone();
peer_list.add_new_peer(existing_peer.clone());
assert_eq!(peer_list.len(), 10);
assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer));
}
#[test]
fn peer_list_get_non_existent_peer() {
let peer_list = make_fake_peer_list(10);
let mut non_existent_peer = NetworkAddress::default();
let NetworkAddress::IPv4(ip) = &mut non_existent_peer else {panic!("this test requires default to be ipv4")};
ip.m_ip += 50;
assert_eq!(peer_list.get_peer(&non_existent_peer), None);
}
#[test]
fn peer_list_ban_peers() {
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let peer = peer_list
.get_random_peer(&mut rand::thread_rng(), None)
.unwrap();
let ban_id = peer.adr.ban_identifier();
assert!(peer_list.contains_peer(&peer.adr));
assert_ne!(peer_list.ban_id_idxs.get(&ban_id).unwrap().len(), 0);
peer_list.remove_peers_with_ban_id(&ban_id);
assert_eq!(peer_list.ban_id_idxs.get(&ban_id).unwrap().len(), 0);
for (addr, _) in peer_list.peers {
assert_ne!(addr.ban_identifier(), ban_id);
}
}

View file

@ -1,81 +0,0 @@
use super::*;
use crate::NetZoneBasicNodeData;
use monero_wire::network_address::IPv4Address;
use rand::Rng;
fn create_random_net_address<R: Rng>(r: &mut R) -> NetworkAddress {
NetworkAddress::IPv4(IPv4Address {
m_ip: r.gen(),
m_port: r.gen(),
})
}
fn create_random_net_addr_vec<R: Rng>(r: &mut R, len: usize) -> Vec<NetworkAddress> {
let mut ret = Vec::with_capacity(len);
for i in 0..len {
ret.push(create_random_net_address(r));
}
ret
}
fn create_random_peer<R: Rng>(r: &mut R) -> PeerListEntryBase {
PeerListEntryBase {
adr: create_random_net_address(r),
pruning_seed: r.gen_range(384..=391),
id: PeerID(r.gen()),
last_seen: r.gen(),
rpc_port: r.gen(),
rpc_credits_per_hash: r.gen(),
}
}
fn create_random_peer_vec<R: Rng>(r: &mut R, len: usize) -> Vec<PeerListEntryBase> {
let mut ret = Vec::with_capacity(len);
for i in 0..len {
ret.push(create_random_peer(r));
}
ret
}
#[derive(Clone)]
pub struct MockPeerStore;
#[async_trait::async_trait]
impl P2PStore for MockPeerStore {
async fn basic_node_data(&mut self) -> Result<Option<NetZoneBasicNodeData>, &'static str> {
unimplemented!()
}
async fn save_basic_node_data(
&mut self,
node_id: &NetZoneBasicNodeData,
) -> Result<(), &'static str> {
unimplemented!()
}
async fn load_peers(
&mut self,
zone: NetZone,
) -> Result<
(
Vec<PeerListEntryBase>,
Vec<PeerListEntryBase>,
Vec<NetworkAddress>,
),
&'static str,
> {
let mut r = rand::thread_rng();
Ok((
create_random_peer_vec(&mut r, 300),
create_random_peer_vec(&mut r, 1500),
create_random_net_addr_vec(&mut r, 50),
))
}
async fn save_peers(
&mut self,
zone: NetZone,
white: Vec<&PeerListEntryBase>,
gray: Vec<&PeerListEntryBase>,
anchor: Vec<&NetworkAddress>,
) -> Result<(), &'static str> {
todo!()
}
}

View file

@ -1,110 +0,0 @@
//! This module contains the address book [`Connection`](crate::peer::connection::Connection) handle
//!
//! # Why do we need a handle between the address book and connection task
//!
//! When banning a peer we need to tell the connection task to close and
//! when we close a connection we need to remove it from our connection
//! and anchor list.
//!
//!
use futures::channel::oneshot;
use tokio_util::sync::CancellationToken;
/// A message sent to tell the address book that a peer has disconnected.
pub struct PeerConnectionClosed;
/// The connection side of the address book to connection
/// communication.
#[derive(Debug)]
pub struct AddressBookConnectionHandle {
connection_closed: Option<oneshot::Sender<PeerConnectionClosed>>,
close: CancellationToken,
}
impl AddressBookConnectionHandle {
/// Returns true if the address book has told us to kill the
/// connection.
pub fn is_canceled(&self) -> bool {
self.close.is_cancelled()
}
}
impl Drop for AddressBookConnectionHandle {
fn drop(&mut self) {
let connection_closed = std::mem::replace(&mut self.connection_closed, None).unwrap();
let _ = connection_closed.send(PeerConnectionClosed);
}
}
/// The address book side of the address book to connection
/// communication.
#[derive(Debug)]
pub struct ConnectionAddressBookHandle {
connection_closed: oneshot::Receiver<PeerConnectionClosed>,
killer: CancellationToken,
}
impl ConnectionAddressBookHandle {
/// Checks if the connection task has closed, returns
/// true if the task has closed
pub fn connection_closed(&mut self) -> bool {
let Ok(mes) = self.connection_closed.try_recv() else {
panic!("This must not be called again after returning true and the connection task must tell us if a connection is closed")
};
match mes {
None => false,
Some(_) => true,
}
}
/// Ends the connection task, the caller of this function should
/// wait to be told the connection has closed by [`check_if_connection_closed`](Self::check_if_connection_closed)
/// before acting on the closed connection.
pub fn kill_connection(&self) {
self.killer.cancel()
}
}
/// Creates a new handle pair that can be given to the connection task and
/// address book respectively.
pub fn new_address_book_connection_handle(
) -> (AddressBookConnectionHandle, ConnectionAddressBookHandle) {
let (tx, rx) = oneshot::channel();
let token = CancellationToken::new();
let ab_c_h = AddressBookConnectionHandle {
connection_closed: Some(tx),
close: token.clone(),
};
let c_ab_h = ConnectionAddressBookHandle {
connection_closed: rx,
killer: token,
};
(ab_c_h, c_ab_h)
}
#[cfg(test)]
mod tests {
use crate::address_book::connection_handle::new_address_book_connection_handle;
#[test]
fn close_connection_from_address_book() {
let (conn_side, mut addr_side) = new_address_book_connection_handle();
assert!(!conn_side.is_canceled());
assert!(!addr_side.connection_closed());
addr_side.kill_connection();
assert!(conn_side.is_canceled());
}
#[test]
fn close_connection_from_connection() {
let (conn_side, mut addr_side) = new_address_book_connection_handle();
assert!(!conn_side.is_canceled());
assert!(!addr_side.connection_closed());
drop(conn_side);
assert!(addr_side.connection_closed());
}
}

View file

@ -1,78 +0,0 @@
use cuprate_common::Network;
use monero_wire::messages::{common::PeerSupportFlags, BasicNodeData, PeerID};
use crate::{
constants::{
CUPRATE_SUPPORT_FLAGS, DEFAULT_IN_PEERS, DEFAULT_LOAD_OUT_PEERS_MULTIPLIER,
DEFAULT_TARGET_OUT_PEERS, MAX_GRAY_LIST_PEERS, MAX_WHITE_LIST_PEERS,
},
NodeID,
};
#[derive(Debug, Clone, Copy)]
pub struct Config {
/// Port
my_port: u32,
/// The Network
network: Network,
/// RPC Port
rpc_port: u16,
target_out_peers: usize,
out_peers_load_multiplier: usize,
max_in_peers: usize,
max_white_peers: usize,
max_gray_peers: usize,
}
impl Default for Config {
fn default() -> Self {
Config {
my_port: 18080,
network: Network::MainNet,
rpc_port: 18081,
target_out_peers: DEFAULT_TARGET_OUT_PEERS,
out_peers_load_multiplier: DEFAULT_LOAD_OUT_PEERS_MULTIPLIER,
max_in_peers: DEFAULT_IN_PEERS,
max_white_peers: MAX_WHITE_LIST_PEERS,
max_gray_peers: MAX_GRAY_LIST_PEERS,
}
}
}
impl Config {
pub fn basic_node_data(&self, peer_id: PeerID) -> BasicNodeData {
BasicNodeData {
my_port: self.my_port,
network_id: self.network.network_id(),
peer_id,
support_flags: CUPRATE_SUPPORT_FLAGS,
rpc_port: self.rpc_port,
rpc_credits_per_hash: 0,
}
}
pub fn peerset_total_connection_limit(&self) -> usize {
self.target_out_peers * self.out_peers_load_multiplier + self.max_in_peers
}
pub fn network(&self) -> Network {
self.network
}
pub fn max_white_peers(&self) -> usize {
self.max_white_peers
}
pub fn max_gray_peers(&self) -> usize {
self.max_gray_peers
}
pub fn public_port(&self) -> u32 {
self.my_port
}
pub fn public_rpc_port(&self) -> u16 {
self.rpc_port
}
}

View file

@ -1,125 +0,0 @@
use std::{fmt, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
/// A counter for active connections.
///
/// Creates a [`ConnectionTracker`] to track each active connection.
/// When these trackers are dropped, the counter gets notified.
pub struct ActiveConnectionCounter {
/// The limit for this type of connection, for diagnostics only.
/// The caller must enforce the limit by ignoring, delaying, or dropping connections.
limit: usize,
/// The label for this connection counter, typically its type.
label: Arc<str>,
semaphore: Arc<Semaphore>,
}
impl fmt::Debug for ActiveConnectionCounter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ActiveConnectionCounter")
.field("label", &self.label)
.field("count", &self.count())
.field("limit", &self.limit)
.finish()
}
}
impl ActiveConnectionCounter {
/// Create and return a new active connection counter.
pub fn new_counter() -> Self {
Self::new_counter_with(Semaphore::MAX_PERMITS, "Active Connections")
}
/// Create and return a new active connection counter with `limit` and `label`.
/// The caller must check and enforce limits using [`update_count()`](Self::update_count).
pub fn new_counter_with<S: ToString>(limit: usize, label: S) -> Self {
let label = label.to_string();
Self {
limit,
label: label.into(),
semaphore: Arc::new(Semaphore::new(limit)),
}
}
/// Create and return a new [`ConnectionTracker`], using a permit from the semaphore,
/// SAFETY:
/// This function will panic if the semaphore doesn't have anymore permits.
pub fn track_connection(&mut self) -> ConnectionTracker {
ConnectionTracker::new(self)
}
pub fn count(&self) -> usize {
let count = self
.limit
.checked_sub(self.semaphore.available_permits())
.expect("Limit is less than available connection permits");
tracing::trace!(
open_connections = ?count,
limit = ?self.limit,
label = ?self.label,
);
count
}
pub fn available_permits(&self) -> usize {
self.semaphore.available_permits()
}
}
/// A per-connection tracker.
///
/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
pub struct ConnectionTracker {
/// The permit for this connection, updates the semaphore when dropped.
permit: OwnedSemaphorePermit,
/// The label for this connection counter, typically its type.
label: Arc<str>,
}
impl fmt::Debug for ConnectionTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ConnectionTracker")
.field(&self.label)
.finish()
}
}
impl ConnectionTracker {
/// Create and return a new active connection tracker, and add 1 to `counter`.
/// All connection trackers share a label with their connection counter.
///
/// When the returned tracker is dropped, `counter` will be notified.
///
/// SAFETY:
/// This function will panic if the [`ActiveConnectionCounter`] doesn't have anymore permits.
fn new(counter: &mut ActiveConnectionCounter) -> Self {
tracing::debug!(
open_connections = ?counter.count(),
limit = ?counter.limit,
label = ?counter.label,
"opening a new peer connection",
);
Self {
permit: counter.semaphore.clone().try_acquire_owned().unwrap(),
label: counter.label.clone(),
}
}
}
impl Drop for ConnectionTracker {
fn drop(&mut self) {
tracing::debug!(
label = ?self.label,
"A peer connection has closed",
);
// the permit is automatically dropped
}
}

View file

@ -1,98 +0,0 @@
//!
//! # Why do we need a handle between the address book and connection task
//!
//! When banning a peer we need to tell the connection task to close and
//! when we close a connection we need to tell the address book.
//!
//!
use std::time::Duration;
use futures::channel::mpsc;
use futures::SinkExt;
use tokio_util::sync::CancellationToken;
use crate::connection_counter::ConnectionTracker;
#[derive(Default, Debug)]
pub struct HandleBuilder {
tracker: Option<ConnectionTracker>,
}
impl HandleBuilder {
pub fn set_tracker(&mut self, tracker: ConnectionTracker) {
self.tracker = Some(tracker)
}
pub fn build(self) -> (DisconnectSignal, ConnectionHandle, PeerHandle) {
let token = CancellationToken::new();
let (tx, rx) = mpsc::channel(0);
(
DisconnectSignal {
token: token.clone(),
tracker: self.tracker.expect("Tracker was not set!"),
},
ConnectionHandle {
token: token.clone(),
ban: rx,
},
PeerHandle { ban: tx },
)
}
}
pub struct BanPeer(pub Duration);
/// A struct given to the connection task.
pub struct DisconnectSignal {
token: CancellationToken,
tracker: ConnectionTracker,
}
impl DisconnectSignal {
pub fn should_shutdown(&self) -> bool {
self.token.is_cancelled()
}
pub fn connection_closed(&self) {
self.token.cancel()
}
}
impl Drop for DisconnectSignal {
fn drop(&mut self) {
self.token.cancel()
}
}
/// A handle given to a task that needs to cancel this connection.
pub struct ConnectionHandle {
token: CancellationToken,
ban: mpsc::Receiver<BanPeer>,
}
impl ConnectionHandle {
pub fn is_closed(&self) -> bool {
self.token.is_cancelled()
}
pub fn check_should_ban(&mut self) -> Option<BanPeer> {
match self.ban.try_next() {
Ok(res) => res,
Err(_) => None,
}
}
pub fn send_close_signal(&self) {
self.token.cancel()
}
}
/// A handle given to a task that needs to be able to ban a connection.
#[derive(Clone)]
pub struct PeerHandle {
ban: mpsc::Sender<BanPeer>,
}
impl PeerHandle {
pub fn ban_peer(&mut self, duration: Duration) {
let _ = self.ban.send(BanPeer(duration));
}
}

View file

@ -1,58 +0,0 @@
use core::time::Duration;
use monero_wire::messages::common::PeerSupportFlags;
pub const CUPRATE_SUPPORT_FLAGS: PeerSupportFlags =
PeerSupportFlags::get_support_flag_fluffy_blocks();
pub const CUPRATE_MINIMUM_SUPPORT_FLAGS: PeerSupportFlags =
PeerSupportFlags::get_support_flag_fluffy_blocks();
pub const DEFAULT_TARGET_OUT_PEERS: usize = 20;
pub const DEFAULT_LOAD_OUT_PEERS_MULTIPLIER: usize = 3;
pub const DEFAULT_IN_PEERS: usize = 20;
pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5);
pub const ADDRESS_BOOK_SAVE_INTERVAL: Duration = Duration::from_secs(60);
pub const ADDRESS_BOOK_BUFFER_SIZE: usize = 3;
pub const PEERSET_BUFFER_SIZE: usize = 3;
/// The maximum size of the address books white list.
/// This number is copied from monerod.
pub const MAX_WHITE_LIST_PEERS: usize = 1000;
/// The maximum size of the address books gray list.
/// This number is copied from monerod.
pub const MAX_GRAY_LIST_PEERS: usize = 5000;
/// The max amount of peers that can be sent in one
/// message.
pub const P2P_MAX_PEERS_IN_HANDSHAKE: usize = 250;
/// The timeout for sending a message to a remote peer,
/// and receiving a response from a remote peer.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
/// The default RTT estimate for peer responses.
///
/// We choose a high value for the default RTT, so that new peers must prove they
/// are fast, before we prefer them to other peers. This is particularly
/// important on testnet, which has a small number of peers, which are often
/// slow.
///
/// Make the default RTT slightly higher than the request timeout.
pub const EWMA_DEFAULT_RTT: Duration = Duration::from_secs(REQUEST_TIMEOUT.as_secs() + 1);
/// The decay time for the EWMA response time metric used for load balancing.
///
/// This should be much larger than the `SYNC_RESTART_TIMEOUT`, so we choose
/// better peers when we restart the sync.
pub const EWMA_DECAY_TIME_NANOS: f64 = 200.0 * NANOS_PER_SECOND;
/// The number of nanoseconds in one second.
const NANOS_PER_SECOND: f64 = 1_000_000_000.0;

View file

@ -1,81 +0,0 @@
pub mod address_book;
pub mod config;
pub mod connection_counter;
mod connection_handle;
mod constants;
pub mod peer;
mod protocol;
pub use config::Config;
use rand::Rng;
#[derive(Debug, Clone)]
pub struct NetZoneBasicNodeData {
public: monero_wire::BasicNodeData,
tor: monero_wire::BasicNodeData,
i2p: monero_wire::BasicNodeData,
}
impl NetZoneBasicNodeData {
pub fn basic_node_data(&self, net_zone: &monero_wire::NetZone) -> monero_wire::BasicNodeData {
match net_zone {
monero_wire::NetZone::Public => self.public.clone(),
_ => todo!(),
}
}
pub fn new(config: &Config, node_id: &NodeID) -> Self {
let bnd = monero_wire::BasicNodeData {
my_port: config.public_port(),
network_id: config.network().network_id(),
peer_id: node_id.public,
support_flags: constants::CUPRATE_SUPPORT_FLAGS,
rpc_port: config.public_rpc_port(),
rpc_credits_per_hash: 0,
};
// obviously this is wrong, i will change when i add tor support
NetZoneBasicNodeData {
public: bnd.clone(),
tor: bnd.clone(),
i2p: bnd,
}
}
}
#[async_trait::async_trait]
pub trait P2PStore: Clone + Send + 'static {
/// Loads the peers from the peer store.
/// returns (in order):
/// the white list,
/// the gray list,
/// the anchor list,
/// the ban list
async fn load_peers(
&mut self,
zone: monero_wire::NetZone,
) -> Result<
(
Vec<monero_wire::PeerListEntryBase>, // white list
Vec<monero_wire::PeerListEntryBase>, // gray list
Vec<monero_wire::NetworkAddress>, // anchor list
// Vec<(monero_wire::NetworkAddress, chrono::NaiveDateTime)>, // ban list
),
&'static str,
>;
async fn save_peers(
&mut self,
zone: monero_wire::NetZone,
white: Vec<&monero_wire::PeerListEntryBase>,
gray: Vec<&monero_wire::PeerListEntryBase>,
anchor: Vec<&monero_wire::NetworkAddress>,
// bans: Vec<(&monero_wire::NetworkAddress, &chrono::NaiveDateTime)>, // ban lists
) -> Result<(), &'static str>;
async fn basic_node_data(&mut self) -> Result<Option<NetZoneBasicNodeData>, &'static str>;
async fn save_basic_node_data(
&mut self,
node_id: &NetZoneBasicNodeData,
) -> Result<(), &'static str>;
}

View file

@ -1,16 +0,0 @@
pub mod client;
pub mod connection;
pub mod connector;
pub mod handshaker;
pub mod load_tracked_client;
mod error;
#[cfg(test)]
mod tests;
pub use client::Client;
pub use client::ConnectionInfo;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use handshaker::Handshaker;
pub use load_tracked_client::LoadTrackedClient;

View file

@ -1,176 +0,0 @@
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::task::{Context, Poll};
use std::{future::Future, sync::Arc};
use futures::{
channel::{mpsc, oneshot},
FutureExt,
};
use tokio::task::JoinHandle;
use tower::BoxError;
use cuprate_common::PruningSeed;
use monero_wire::{messages::common::PeerSupportFlags, NetworkAddress};
use super::{
connection::ClientRequest,
error::{ErrorSlot, PeerError, SharedPeerError},
PeerError,
};
use crate::connection_handle::PeerHandle;
use crate::protocol::{InternalMessageRequest, InternalMessageResponse};
pub struct ConnectionInfo {
pub support_flags: PeerSupportFlags,
pub pruning_seed: PruningSeed,
pub handle: PeerHandle,
pub rpc_port: u16,
pub rpc_credits_per_hash: u32,
}
pub struct Client {
pub connection_info: Arc<ConnectionInfo>,
/// Used to shut down the corresponding heartbeat.
/// This is always Some except when we take it on drop.
heartbeat_shutdown_tx: Option<oneshot::Sender<()>>,
server_tx: mpsc::Sender<ClientRequest>,
connection_task: JoinHandle<()>,
heartbeat_task: JoinHandle<()>,
error_slot: ErrorSlot,
}
impl Client {
pub fn new(
connection_info: Arc<ConnectionInfo>,
heartbeat_shutdown_tx: oneshot::Sender<()>,
server_tx: mpsc::Sender<ClientRequest>,
connection_task: JoinHandle<()>,
heartbeat_task: JoinHandle<()>,
error_slot: ErrorSlot,
) -> Self {
Client {
connection_info,
heartbeat_shutdown_tx: Some(heartbeat_shutdown_tx),
server_tx,
connection_task,
heartbeat_task,
error_slot,
}
}
/// Check if this connection's heartbeat task has exited.
#[allow(clippy::unwrap_in_result)]
fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> {
let is_canceled = self
.heartbeat_shutdown_tx
.as_mut()
.expect("only taken on drop")
.poll_canceled(cx)
.is_ready();
if is_canceled {
return self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
);
}
match self.heartbeat_task.poll_unpin(cx) {
Poll::Pending => {
// Heartbeat task is still running.
Ok(())
}
Poll::Ready(Ok(Ok(_))) => {
// Heartbeat task stopped unexpectedly, without panic or error.
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited(
"Heartbeat task stopped unexpectedly".to_string(),
),
)
}
Poll::Ready(Ok(Err(error))) => {
// Heartbeat task stopped unexpectedly, with error.
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited(error.to_string()),
)
}
Poll::Ready(Err(error)) => {
// Heartbeat task was cancelled.
if error.is_cancelled() {
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
)
}
// Heartbeat task stopped with panic.
else if error.is_panic() {
panic!("heartbeat task has panicked: {error}");
}
// Heartbeat task stopped with error.
else {
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited(error.to_string()),
)
}
}
}
}
/// Check if the connection's task has exited.
fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), PeerError> {
match self.connection_task.poll_unpin(context) {
Poll::Pending => {
// Connection task is still running.
Ok(())
}
Poll::Ready(Ok(())) => {
// Connection task stopped unexpectedly, without panicking.
return Err(PeerError::ConnectionTaskClosed);
}
Poll::Ready(Err(error)) => {
// Connection task stopped unexpectedly with a panic. shut the node down.
tracing::error!("Peer Connection task panicked: {error}, shutting the node down!");
set_shutting_down();
return Err(PeerError::ConnectionTaskClosed);
}
}
}
}
impl tower::Service<InternalMessageRequest> for Client {
type Response = InternalMessageResponse;
type Error = SharedPeerError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.server_tx
.poll_ready(cx)
.map_err(|e| PeerError::ClientChannelClosed.into())
}
fn call(&mut self, req: InternalMessageRequest) -> Self::Future {
let (tx, rx) = oneshot::channel();
match self.server_tx.try_send(ClientRequest { req, tx }) {
Ok(()) => rx
.map(|recv_result| {
recv_result
.expect("ClientRequest oneshot sender must not be dropped before send")
.map_err(|e| e.into())
})
.boxed(),
Err(_) => {
// TODO: better error handling
futures::future::ready(Err(PeerError::ClientChannelClosed.into())).boxed()
}
}
}
}

View file

@ -1,169 +0,0 @@
use futures::channel::{mpsc, oneshot};
use futures::stream::FusedStream;
use futures::{Sink, SinkExt, Stream, StreamExt};
use monero_wire::{Message, BucketError};
use tower::{BoxError, Service};
use crate::connection_handle::DisconnectSignal;
use crate::peer::error::{ErrorSlot, PeerError, SharedPeerError};
use crate::peer::handshaker::ConnectionAddr;
use crate::protocol::internal_network::{MessageID, Request, Response};
pub struct ClientRequest {
pub req: Request,
pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
}
pub enum State {
WaitingForRequest,
WaitingForResponse {
request_id: MessageID,
tx: oneshot::Sender<Result<Response, SharedPeerError>>,
},
}
pub struct Connection<Svc, Snk> {
address: ConnectionAddr,
state: State,
sink: Snk,
client_rx: mpsc::Receiver<ClientRequest>,
error_slot: ErrorSlot,
/// # Security
///
/// If this connection tracker or `Connection`s are leaked,
/// the number of active connections will appear higher than it actually is.
/// If enough connections leak, Cuprate will stop making new connections.
connection_tracker: DisconnectSignal,
svc: Svc,
}
impl<Svc, Snk> Connection<Svc, Snk>
where
Svc: Service<Request, Response = Response, Error = BoxError>,
Snk: Sink<Message, Error = BucketError> + Unpin,
{
pub fn new(
address: ConnectionAddr,
sink: Snk,
client_rx: mpsc::Receiver<ClientRequest>,
error_slot: ErrorSlot,
connection_tracker: DisconnectSignal,
svc: Svc,
) -> Connection<Svc, Snk> {
Connection {
address,
state: State::WaitingForRequest,
sink,
client_rx,
error_slot,
connection_tracker,
svc,
}
}
async fn handle_response(&mut self, res: Response) -> Result<(), PeerError> {
let state = std::mem::replace(&mut self.state, State::WaitingForRequest);
if let State::WaitingForResponse { request_id, tx } = state {
if request_id != res.id() {
// TODO: Fail here
return Err(PeerError::PeerSentIncorrectResponse);
}
// response passed our tests we can send it to the requester
let _ = tx.send(Ok(res));
Ok(())
} else {
unreachable!("This will only be called when in state WaitingForResponse");
}
}
async fn send_message_to_peer(&mut self, mes: impl Into<Message>) -> Result<(), PeerError> {
Ok(self.sink.send(mes.into()).await?)
}
async fn handle_peer_request(&mut self, req: Request) -> Result<(), PeerError> {
// we should check contents of peer requests for obvious errors like we do with responses
todo!()
/*
let ready_svc = self.svc.ready().await?;
let res = ready_svc.call(req).await?;
self.send_message_to_peer(res).await
*/
}
async fn handle_client_request(&mut self, req: ClientRequest) -> Result<(), PeerError> {
if req.req.needs_response() {
self.state = State::WaitingForResponse {
request_id: req.req.id(),
tx: req.tx,
};
}
// TODO: send NA response to requester
self.send_message_to_peer(req.req).await
}
async fn state_waiting_for_request<Str>(&mut self, stream: &mut Str) -> Result<(), PeerError>
where
Str: FusedStream<Item = Result<Message, BucketError>> + Unpin,
{
futures::select! {
peer_message = stream.next() => {
match peer_message.expect("MessageStream will never return None") {
Ok(message) => {
self.handle_peer_request(message.try_into().map_err(|_| PeerError::ResponseError(""))?).await
},
Err(e) => Err(e.into()),
}
},
client_req = self.client_rx.next() => {
self.handle_client_request(client_req.ok_or(PeerError::ClientChannelClosed)?).await
},
}
}
async fn state_waiting_for_response<Str>(&mut self, stream: &mut Str) -> Result<(), PeerError>
where
Str: FusedStream<Item = Result<Message, BucketError>> + Unpin,
{
// put a timeout on this
let peer_message = stream
.next()
.await
.expect("MessageStream will never return None")?;
if !peer_message.is_request()
&& self.state.expected_response_id() == Some(peer_message.id())
{
if let Ok(res) = peer_message.try_into() {
Ok(self.handle_response(res).await?)
} else {
// im almost certain this is impossible to hit, but im not certain enough to use unreachable!()
Err(PeerError::ResponseError("Peer sent incorrect response"))
}
} else {
if let Ok(req) = peer_message.try_into() {
self.handle_peer_request(req).await
} else {
// this can be hit if the peer sends a protocol response with the wrong id
Err(PeerError::ResponseError("Peer sent incorrect response"))
}
}
}
pub async fn run<Str>(mut self, mut stream: Str)
where
Str: FusedStream<Item = Result<Message, BucketError>> + Unpin,
{
loop {
let _res = match self.state {
State::WaitingForRequest => self.state_waiting_for_request(&mut stream).await,
State::WaitingForResponse { .. } => {
self.state_waiting_for_response(&mut stream).await
}
};
}
}
}

View file

@ -1,159 +0,0 @@
//! Wrapper around handshake logic that also opens a TCP connection.
use std::{
future::Future,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use futures::{AsyncRead, AsyncWrite, FutureExt};
use monero_wire::{network_address::NetZone, NetworkAddress};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use tower::{BoxError, Service, ServiceExt};
use tracing::Instrument;
use crate::peer::handshaker::ConnectionAddr;
use crate::{
address_book::{AddressBookRequest, AddressBookResponse},
connection_counter::ConnectionTracker,
protocol::{
CoreSyncDataRequest, CoreSyncDataResponse, InternalMessageRequest, InternalMessageResponse,
},
};
use super::{
handshaker::{DoHandshakeRequest, Handshaker},
Client,
};
async fn connect(addr: &NetworkAddress) -> Result<(impl AsyncRead, impl AsyncWrite), BoxError> {
match addr.get_zone() {
NetZone::Public => {
let stream =
tokio::net::TcpStream::connect(SocketAddr::try_from(*addr).unwrap()).await?;
let (read, write) = stream.into_split();
Ok((read.compat(), write.compat_write()))
}
_ => unimplemented!(),
}
}
/// A wrapper around [`Handshake`] that opens a connection before
/// forwarding to the inner handshake service. Writing this as its own
/// [`tower::Service`] lets us apply unified timeout policies, etc.
#[derive(Debug, Clone)]
pub struct Connector<Svc, CoreSync, AdrBook>
where
CoreSync: Service<CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
CoreSync::Future: Send,
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
Svc::Future: Send,
AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
AdrBook::Future: Send,
{
handshaker: Handshaker<Svc, CoreSync, AdrBook>,
}
impl<Svc, CoreSync, AdrBook> Connector<Svc, CoreSync, AdrBook>
where
CoreSync: Service<CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
CoreSync::Future: Send,
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
Svc::Future: Send,
AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
AdrBook::Future: Send,
{
pub fn new(handshaker: Handshaker<Svc, CoreSync, AdrBook>) -> Self {
Connector { handshaker }
}
}
/// A connector request.
/// Contains the information needed to make an outbound connection to the peer.
pub struct OutboundConnectorRequest {
/// The Monero listener address of the peer.
pub addr: NetworkAddress,
/// A connection tracker that reduces the open connection count when dropped.
///
/// Used to limit the number of open connections in Cuprate.
pub connection_tracker: ConnectionTracker,
}
impl<Svc, CoreSync, AdrBook> Service<OutboundConnectorRequest> for Connector<Svc, CoreSync, AdrBook>
where
CoreSync: Service<CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
CoreSync::Future: Send,
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
Svc::Future: Send,
AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
AdrBook::Future: Send,
{
type Response = (NetworkAddress, Client);
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future {
let OutboundConnectorRequest {
addr: address,
connection_tracker,
}: OutboundConnectorRequest = req;
let hs = self.handshaker.clone();
let connector_span = tracing::info_span!("connector", peer = ?address);
async move {
let (read, write) = connect(&address).await?;
let client = hs
.oneshot(DoHandshakeRequest {
read,
write,
addr: ConnectionAddr::OutBound { address },
connection_tracker,
})
.await?;
Ok((address, client))
}
.instrument(connector_span)
.boxed()
}
}

View file

@ -1,116 +0,0 @@
use std::sync::{Arc, Mutex};
use monero_wire::BucketError;
use thiserror::Error;
use tracing_error::TracedError;
/// A wrapper around `Arc<PeerError>` that implements `Error`.
#[derive(Error, Debug, Clone)]
#[error(transparent)]
pub struct SharedPeerError(Arc<TracedError<PeerError>>);
impl<E> From<E> for SharedPeerError
where
PeerError: From<E>,
{
fn from(source: E) -> Self {
Self(Arc::new(TracedError::from(PeerError::from(source))))
}
}
impl SharedPeerError {
/// Returns a debug-formatted string describing the inner [`PeerError`].
///
/// Unfortunately, [`TracedError`] makes it impossible to get a reference to the original error.
pub fn inner_debug(&self) -> String {
format!("{:?}", self.0.as_ref())
}
}
#[derive(Debug, Error)]
pub enum PeerError {
#[error("The connection task has closed.")]
ConnectionTaskClosed,
#[error("Error with peers response: {0}.")]
ResponseError(&'static str),
#[error("The connected peer sent an an unexpected response message.")]
PeerSentUnexpectedResponse,
#[error("The connected peer sent an incorrect response.")]
BucketError(#[from] BucketError),
#[error("The channel was closed.")]
ClientChannelClosed,
}
/// A shared error slot for peer errors.
///
/// # Correctness
///
/// Error slots are shared between sync and async code. In async code, the error
/// mutex should be held for as short a time as possible. This avoids blocking
/// the async task thread on acquiring the mutex.
///
/// > If the value behind the mutex is just data, its usually appropriate to use a blocking mutex
/// > ...
/// > wrap the `Arc<Mutex<...>>` in a struct
/// > that provides non-async methods for performing operations on the data within,
/// > and only lock the mutex inside these methods
///
/// <https://docs.rs/tokio/1.15.0/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use>
#[derive(Default, Clone)]
pub struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
impl std::fmt::Debug for ErrorSlot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// don't hang if the mutex is locked
// show the panic if the mutex was poisoned
f.debug_struct("ErrorSlot")
.field("error", &self.0.try_lock())
.finish()
}
}
impl ErrorSlot {
/// Read the current error in the slot.
///
/// Returns `None` if there is no error in the slot.
///
/// # Correctness
///
/// Briefly locks the error slot's threaded `std::sync::Mutex`, to get a
/// reference to the error in the slot.
#[allow(clippy::unwrap_in_result)]
pub fn try_get_error(&self) -> Option<SharedPeerError> {
self.0
.lock()
.expect("error mutex should be unpoisoned")
.as_ref()
.cloned()
}
/// Update the current error in the slot.
///
/// Returns `Err(AlreadyErrored)` if there was already an error in the slot.
///
/// # Correctness
///
/// Briefly locks the error slot's threaded `std::sync::Mutex`, to check for
/// a previous error, then update the error in the slot.
#[allow(clippy::unwrap_in_result)]
pub fn try_update_error(&self, e: SharedPeerError) -> Result<(), AlreadyErrored> {
let mut guard = self.0.lock().expect("error mutex should be unpoisoned");
if let Some(original_error) = guard.clone() {
Err(AlreadyErrored { original_error })
} else {
*guard = Some(e);
Ok(())
}
}
}
/// Error returned when the [`ErrorSlot`] already contains an error.
#[derive(Clone, Debug)]
pub struct AlreadyErrored {
/// The original error in the error slot.
pub original_error: SharedPeerError,
}

View file

@ -1,627 +0,0 @@
/// This module contains the logic for turning [`AsyncRead`] and [`AsyncWrite`]
/// into [`Client`] and [`Connection`].
///
/// The main entry point is modeled as a [`tower::Service`] the struct being
/// [`Handshaker`]. The [`Handshaker`] accepts handshake requests: [`DoHandshakeRequest`]
/// and creates a state machine that's drives the handshake forward: [`HandshakeSM`] and
/// eventually outputs a [`Client`] and [`Connection`].
///
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use futures::{channel::mpsc, sink::Sink, SinkExt, Stream};
use futures::{FutureExt, StreamExt};
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncWrite},
time,
};
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{BoxError, Service, ServiceExt};
use tracing::Instrument;
use cuprate_common::{Network, PruningSeed};
use monero_wire::messages::admin::SupportFlagsResponse;
use monero_wire::{
messages::{
admin::{HandshakeRequest, HandshakeResponse},
common::PeerSupportFlags,
BasicNodeData, CoreSyncData, PeerID, PeerListEntryBase,
},
BucketError, Message, MoneroWireCodec, NetZone, NetworkAddress, RequestMessage,
ResponseMessage,
};
use super::{
client::{Client, ConnectionInfo},
connection::Connection,
PeerError,
};
use crate::address_book::connection_handle::new_address_book_connection_handle;
use crate::address_book::{AddressBookRequest, AddressBookResponse};
use crate::connection_counter::ConnectionTracker;
use crate::constants::{
CUPRATE_MINIMUM_SUPPORT_FLAGS, HANDSHAKE_TIMEOUT, P2P_MAX_PEERS_IN_HANDSHAKE,
};
use crate::protocol::{
CoreSyncDataRequest, CoreSyncDataResponse, Direction, InternalMessageRequest,
InternalMessageResponse,
};
use crate::NetZoneBasicNodeData;
/// Possible handshake errors
#[derive(Debug, Error)]
pub enum HandShakeError {
/// The peer did not complete the handshake fast enough.
#[error("The peer did not complete the handshake fast enough")]
PeerTimedOut,
/// The Peer has non-standard pruning.
#[error("The peer has a weird pruning scheme")]
PeerClaimedWeirdPruning,
/// The peer does not have the minimum support flags
#[error("The peer does not have the minimum support flags")]
PeerDoesNotHaveTheMinimumSupportFlags,
/// The peer is not on the network we are on (MAINNET|TESTNET|STAGENET)
#[error("The peer is on a different network")]
PeerIsOnADifferentNetwork,
/// The peer sent us too many peers, more than [`P2P_MAX_PEERS_IN_HANDSHAKE`]
#[error("The peer sent too many peers, considered spamming")]
PeerSentTooManyPeers,
/// The peer sent an incorrect response
#[error("The peer sent a wrong response to our handshake")]
PeerSentWrongResponse,
/// Error communicating with peer
#[error("Bucket error while communicating with peer: {0}")]
BucketError(#[from] BucketError),
}
/// An address used to connect to a peer.
#[derive(Debug, Copy, Clone)]
pub enum ConnectionAddr {
/// Outbound connection to another peer.
OutBound { address: NetworkAddress },
/// An inbound direct connection to our node.
InBoundDirect { transient_address: SocketAddr },
/// An inbound connection through a hidden network
/// like Tor/ I2p
InBoundProxy { net_zone: NetZone },
}
impl ConnectionAddr {
/// Gets the [`NetworkAddress`] of this connection.
pub fn get_network_address(&self, port: u16) -> Option<NetworkAddress> {
match self {
ConnectionAddr::OutBound { address } => Some(*address),
_ => None,
}
}
/// Gets the [`NetZone`] of this connection.
pub fn get_zone(&self) -> NetZone {
match self {
ConnectionAddr::OutBound { address } => address.get_zone(),
ConnectionAddr::InBoundDirect { .. } => NetZone::Public,
ConnectionAddr::InBoundProxy { net_zone } => *net_zone,
}
}
/// Gets the [`Direction`] of this connection.
pub fn direction(&self) -> Direction {
match self {
ConnectionAddr::OutBound { .. } => Direction::Outbound,
ConnectionAddr::InBoundDirect { .. } | ConnectionAddr::InBoundProxy { .. } => {
Direction::Inbound
}
}
}
}
/// A request to handshake with a peer.
pub struct DoHandshakeRequest<W, R> {
/// The read-half of the connection.
pub read: R,
/// The write-half of the connection.
pub write: W,
/// The [`ConnectionAddr`] of this connection.
pub addr: ConnectionAddr,
/// The [`ConnectionTracker`] of this connection.
pub connection_tracker: ConnectionTracker,
}
/// A [`Service`] that accepts [`DoHandshakeRequest`] and
/// produces a [`Client`] and [`Connection`].
#[derive(Debug, Clone)]
pub struct Handshaker<Svc, CoreSync, AdrBook> {
/// A collection of our [`BasicNodeData`] for each [`NetZone`]
/// for more info see: [`NetZoneBasicNodeData`]
basic_node_data: NetZoneBasicNodeData,
/// The [`Network`] our node is using
network: Network,
/// The span [`Connection`] tasks will be [`tracing::instrument`]ed with
parent_span: tracing::Span,
/// The address book [`Service`]
address_book: AdrBook,
/// A [`Service`] to handle incoming [`CoreSyncData`] and to get
/// our [`CoreSyncData`].
core_sync_svc: CoreSync,
/// A service given to the [`Connection`] task to answer incoming
/// requests to our node.
peer_request_service: Svc,
}
impl<Svc, CoreSync, AdrBook> Handshaker<Svc, CoreSync, AdrBook> {
pub fn new(
basic_node_data: NetZoneBasicNodeData,
network: Network,
address_book: AdrBook,
core_sync_svc: CoreSync,
peer_request_service: Svc,
) -> Self {
Handshaker {
basic_node_data,
network,
parent_span: tracing::Span::current(),
address_book,
core_sync_svc,
peer_request_service,
}
}
}
impl<Svc, CoreSync, AdrBook, W, R> Service<DoHandshakeRequest<W, R>>
for Handshaker<Svc, CoreSync, AdrBook>
where
CoreSync: Service<CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
CoreSync::Future: Send,
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
Svc::Future: Send,
AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
AdrBook::Future: Send,
W: AsyncWrite + Unpin + Send + 'static,
R: AsyncRead + Unpin + Send + 'static,
{
type Response = Client;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
// We are always ready.
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: DoHandshakeRequest<W, R>) -> Self::Future {
let DoHandshakeRequest {
read,
write,
addr,
connection_tracker,
} = req;
// create the levin message stream/ sink.
let peer_stream = FramedRead::new(read, MoneroWireCodec::default());
let peer_sink = FramedWrite::new(write, MoneroWireCodec::default());
// The span the handshake state machine will use
let span = tracing::debug_span!("Handshaker");
// The span the connection task will use.
let connection_span = tracing::debug_span!(parent: &self.parent_span, "Connection");
// clone the services that the handshake state machine will need.
let core_sync_svc = self.core_sync_svc.clone();
let address_book = self.address_book.clone();
let peer_request_service = self.peer_request_service.clone();
let state_machine = HandshakeSM {
peer_sink,
peer_stream,
addr,
network: self.network,
basic_node_data: self.basic_node_data.basic_node_data(&addr.get_zone()),
address_book,
core_sync_svc,
peer_request_service,
connection_span,
connection_tracker,
state: HandshakeState::Start,
};
// although callers should use a timeout do one here as well just to be safe.
let ret = time::timeout(HANDSHAKE_TIMEOUT, state_machine.do_handshake());
async move {
match ret.await {
Ok(handshake) => handshake,
Err(_) => Err(HandShakeError::PeerTimedOut.into()),
}
}
.instrument(span)
.boxed()
}
}
/// The states a handshake can be in.
enum HandshakeState {
/// The initial state.
/// if this is an inbound handshake then this state means we
/// are waiting for a [`HandshakeRequest`].
Start,
/// Waiting for a [`HandshakeResponse`].
WaitingForHandshakeResponse,
/// Waiting for a [`SupportFlagsResponse`]
/// This contains the peers node data.
WaitingForSupportFlagResponse(BasicNodeData, CoreSyncData),
/// The handshake is complete.
/// This contains the peers node data.
Complete(BasicNodeData, CoreSyncData),
}
impl HandshakeState {
/// Returns true if the handshake is completed.
pub fn is_complete(&self) -> bool {
matches!(self, Self::Complete(..))
}
/// returns the peers [`BasicNodeData`] and [`CoreSyncData`] if the peer
/// is in state [`HandshakeState::Complete`].
pub fn peer_data(self) -> Option<(BasicNodeData, CoreSyncData)> {
match self {
HandshakeState::Complete(bnd, coresync) => Some((bnd, coresync)),
_ => None,
}
}
}
/// The state machine that drives a handshake forward and
/// accepts requests (that can happen during a handshake)
/// from a peer.
struct HandshakeSM<Svc, CoreSync, AdrBook, W, R> {
/// The levin [`FramedWrite`] for the peer.
peer_sink: W,
/// The levin [`FramedRead`] for the peer.
peer_stream: R,
/// The [`ConnectionAddr`] for the peer.
addr: ConnectionAddr,
/// The [`Network`] we are on.
network: Network,
/// Our [`BasicNodeData`].
basic_node_data: BasicNodeData,
/// The address book [`Service`]
address_book: AdrBook,
/// The core sync [`Service`] to handle incoming
/// [`CoreSyncData`] and to retrieve ours.
core_sync_svc: CoreSync,
/// The [`Service`] passed to the [`Connection`]
/// task to handle incoming peer requests.
peer_request_service: Svc,
/// The [`tracing::Span`] the [`Connection`] task
/// will be [`tracing::instrument`]ed with.
connection_span: tracing::Span,
/// A connection tracker to keep track of the
/// number of connections Cuprate is making.
connection_tracker: ConnectionTracker,
state: HandshakeState,
}
impl<Svc, CoreSync, AdrBook, W, R> HandshakeSM<Svc, CoreSync, AdrBook, W, R>
where
CoreSync: Service<CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
CoreSync::Future: Send,
Svc: Service<InternalMessageRequest, Response = InternalMessageResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
Svc::Future: Send,
AdrBook: Service<AddressBookRequest, Response = AddressBookResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
AdrBook::Future: Send,
W: Sink<Message, Error = BucketError> + Unpin,
R: Stream<Item = Result<Message, BucketError>> + Unpin,
{
/// Gets our [`CoreSyncData`] from the `core_sync_svc`.
async fn get_our_core_sync(&mut self) -> Result<CoreSyncData, BoxError> {
let core_sync_svc = self.core_sync_svc.ready().await?;
let CoreSyncDataResponse::Ours(core_sync) = core_sync_svc.call(CoreSyncDataRequest::GetOurs).await? else {
unreachable!("The Service must give correct responses");
};
tracing::trace!("Got core sync data: {core_sync:?}");
Ok(core_sync)
}
/// Sends a [`HandshakeRequest`] to the peer.
async fn send_handshake_req(
&mut self,
node_data: BasicNodeData,
payload_data: CoreSyncData,
) -> Result<(), HandShakeError> {
let handshake_req = HandshakeRequest {
node_data,
payload_data,
};
tracing::trace!("Sending handshake request: {handshake_req:?}");
let message: Message = Message::Request(RequestMessage::Handshake(handshake_req));
self.peer_sink.send(message).await?;
Ok(())
}
/// Sends a [`SupportFlagsRequest`] to the peer.
/// This is done when a peer sends no support flags in their
/// [`HandshakeRequest`] or [`HandshakeResponse`].
///
/// *note because Cuprate has minimum required support flags this won't
/// happeen but is included here just in case this changes.
async fn send_support_flag_req(&mut self) -> Result<(), HandShakeError> {
tracing::trace!("Peer sent no support flags, sending request");
let message: Message = Message::Request(RequestMessage::SupportFlags);
self.peer_sink.send(message).await?;
Ok(())
}
/// Handles an incoming [`HandshakeResponse`].
async fn handle_handshake_response(&mut self, res: HandshakeResponse) -> Result<(), BoxError> {
let HandshakeResponse {
node_data: peer_node_data,
payload_data: peer_core_sync,
local_peerlist_new,
} = res;
// Check the peer is on the correct network.
if peer_node_data.network_id != self.network.network_id() {
tracing::debug!("Handshake failed: peer is on a different network");
return Err(HandShakeError::PeerIsOnADifferentNetwork.into());
}
// Check the peer meets the minimum support flags.
if !peer_node_data
.support_flags
.contains(&CUPRATE_MINIMUM_SUPPORT_FLAGS)
{
tracing::debug!("Handshake failed: peer does not have minimum required support flags");
return Err(HandShakeError::PeerDoesNotHaveTheMinimumSupportFlags.into());
}
// Check the peer didn't send too many peers.
if local_peerlist_new.len() > P2P_MAX_PEERS_IN_HANDSHAKE {
tracing::debug!("Handshake failed: peer sent too many peers in response");
return Err(HandShakeError::PeerSentTooManyPeers.into());
}
// Tell the sync mgr about the new incoming core sync data.
self.core_sync_svc
.ready()
.await?
.call(CoreSyncDataRequest::NewIncoming(peer_core_sync.clone()))
.await?;
// Tell the address book about the new peers
self.address_book
.ready()
.await?
.call(AddressBookRequest::HandleNewPeerList(
local_peerlist_new,
self.addr.get_zone(),
))
.await?;
// This won't actually happen (as long as we have a none 0 minimum support flags)
// it's just included here for completeness.
if peer_node_data.support_flags.is_empty() {
self.send_support_flag_req().await?;
self.state =
HandshakeState::WaitingForSupportFlagResponse(peer_node_data, peer_core_sync);
} else {
// this will always happen.
self.state = HandshakeState::Complete(peer_node_data, peer_core_sync);
}
Ok(())
}
/// Handles a [`MessageResponse`].
async fn handle_message_response(&mut self, response: ResponseMessage) -> Result<(), BoxError> {
// The functions called here will change the state of the HandshakeSM so `HandshakeState::Start`
// is just used as a place holder.
//
// doing this allows us to not clone the BasicNodeData and CoreSyncData for WaitingForSupportFlagResponse.
let prv_state = std::mem::replace(&mut self.state, HandshakeState::Start);
match (prv_state, response) {
(
HandshakeState::WaitingForHandshakeResponse,
ResponseMessage::Handshake(handshake),
) => self.handle_handshake_response(handshake).await,
(
HandshakeState::WaitingForSupportFlagResponse(mut bnd, coresync),
ResponseMessage::SupportFlags(support_flags),
) => {
bnd.support_flags = support_flags.support_flags;
self.state = HandshakeState::Complete(bnd, coresync);
Ok(())
}
_ => Err(HandShakeError::PeerSentWrongResponse.into()),
}
}
/// Sends our [`PeerSupportFlags`] to the peer.
async fn send_support_flags(
&mut self,
support_flags: PeerSupportFlags,
) -> Result<(), HandShakeError> {
let message = Message::Response(ResponseMessage::SupportFlags(SupportFlagsResponse {
support_flags,
}));
self.peer_sink.send(message).await?;
Ok(())
}
/// Attempts an outbound handshake with the peer.
async fn do_outbound_handshake(&mut self) -> Result<(), BoxError> {
// Get the data needed for the handshake request.
let core_sync = self.get_our_core_sync().await?;
// send the handshake request.
self.send_handshake_req(self.basic_node_data.clone(), core_sync)
.await?;
// set the state to waiting for a response.
self.state = HandshakeState::WaitingForHandshakeResponse;
while !self.state.is_complete() {
match self.peer_stream.next().await {
Some(mes) => {
let mes = mes?;
match mes {
Message::Request(RequestMessage::SupportFlags) => {
// The only request we should be getting during an outbound handshake
// is a support flag request.
self.send_support_flags(self.basic_node_data.support_flags)
.await?
}
Message::Response(response) => {
// This could be a handshake response or a support flags response.
self.handle_message_response(response).await?
}
_ => return Err(HandShakeError::PeerSentWrongResponse.into()),
}
}
None => unreachable!("peer_stream wont return None"),
}
}
Ok(())
}
/// Completes a handshake with a peer.
async fn do_handshake(mut self) -> Result<Client, BoxError> {
let mut peer_reachable = false;
match self.addr.direction() {
Direction::Outbound => {
self.do_outbound_handshake().await?;
// If this is an outbound handshake then obviously the peer
// is reachable.
peer_reachable = true
}
Direction::Inbound => todo!(),
}
let (server_tx, server_rx) = mpsc::channel(0);
let (peer_node_data, coresync) = self
.state
.peer_data()
.expect("We must be in state complete to be here");
let pruning_seed = PruningSeed::try_from(coresync.pruning_seed).map_err(|e| Box::new(e))?;
// create the handle between the Address book and the connection task to
// allow the address book to shutdown the connection task and to update
// the address book when the connection is closed.
let (book_connection_side_handle, connection_book_side_handle) =
new_address_book_connection_handle();
// tell the address book about the new connection.
self.address_book
.ready()
.await?
.call(AddressBookRequest::ConnectedToPeer {
zone: self.addr.get_zone(),
connection_handle: connection_book_side_handle,
addr: self.addr.get_network_address(
peer_node_data
.my_port
.try_into()
.map_err(|_| "Peer sent a port that does not fit into a u16")?,
),
id: peer_node_data.peer_id,
reachable: peer_reachable,
last_seen: chrono::Utc::now().naive_utc(),
pruning_seed: pruning_seed.clone(),
rpc_port: peer_node_data.rpc_port,
rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash,
})
.await?;
// This block below is for keeping the last seen times in the address book
// upto date. We only update the last seen times on timed syncs to reduce
// the load on the address book.
//
// first clone the items needed
let mut address_book = self.address_book.clone();
let peer_id = peer_node_data.peer_id;
let net_zone = self.addr.get_zone();
/*
let peer_stream = self.peer_stream.then(|mes| async move {
if let Ok(mes) = &mes {
if mes.id() == TimedSync::ID {
if let Ok(ready_book) = address_book.ready().await {
// we dont care about address book errors here, If there is a problem
// with the address book the node will get shutdown.
let _ = ready_book
.call(AddressBookRequest::SetPeerSeen(
peer_id,
chrono::Utc::now().naive_utc(),
net_zone,
))
.await;
}
}
}
// return the message
mes
});
*/
let connection = Connection::new(
self.addr,
self.peer_sink,
server_rx,
self.connection_tracker,
book_connection_side_handle,
self.peer_request_service,
);
let connection_task = tokio::task::spawn(connection.run().instrument(self.connection_span));
let connection_info = ConnectionInfo {
addr: self.addr,
support_flags: peer_node_data.support_flags,
pruning_seed,
peer_id: peer_node_data.peer_id,
rpc_port: peer_node_data.rpc_port,
rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash,
};
let client = Client::new(connection_info.into(), /* futures::futures_channel::oneshot::Sender<()> */, server_tx, connection_task, /* tokio::task::JoinHandle<()> */);
Ok(client)
}
}

View file

@ -1,74 +0,0 @@
//! A peer connection service wrapper type to handle load tracking and provide access to the
//! reported protocol version.
use std::sync::atomic::Ordering;
use std::{
sync::Arc,
task::{Context, Poll},
};
use cuprate_common::PruningSeed;
use tower::{
load::{Load, PeakEwma},
Service,
};
use crate::{
constants::{EWMA_DECAY_TIME_NANOS, EWMA_DEFAULT_RTT},
peer::{Client, ConnectionInfo},
};
/// A client service wrapper that keeps track of its load.
///
/// It also keeps track of the peer's reported protocol version.
pub struct LoadTrackedClient {
/// A service representing a connected peer, wrapped in a load tracker.
service: PeakEwma<Client>,
/// The metadata for the connected peer `service`.
connection_info: Arc<ConnectionInfo>,
}
/// Create a new [`LoadTrackedClient`] wrapping the provided `client` service.
impl From<Client> for LoadTrackedClient {
fn from(client: Client) -> Self {
let connection_info = client.connection_info.clone();
let service = PeakEwma::new(
client,
EWMA_DEFAULT_RTT,
EWMA_DECAY_TIME_NANOS,
tower::load::CompleteOnResponse::default(),
);
LoadTrackedClient {
service,
connection_info,
}
}
}
impl<Request> Service<Request> for LoadTrackedClient
where
Client: Service<Request>,
{
type Response = <Client as Service<Request>>::Response;
type Error = <Client as Service<Request>>::Error;
type Future = <PeakEwma<Client> as Service<Request>>::Future;
fn poll_ready(&mut self, context: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(context)
}
fn call(&mut self, request: Request) -> Self::Future {
self.service.call(request)
}
}
impl Load for LoadTrackedClient {
type Metric = <PeakEwma<Client> as Load>::Metric;
fn load(&self) -> Self::Metric {
self.service.load()
}
}

View file

@ -1 +0,0 @@
mod handshake;

View file

@ -1 +0,0 @@
pub use crate::peer::handshaker::Handshaker;

View file

@ -1,29 +0,0 @@
pub mod internal_network;
pub use internal_network::{InternalMessageRequest, InternalMessageResponse};
use monero_wire::messages::CoreSyncData;
/// A request to a [`tower::Service`] that handles sync states.
pub enum CoreSyncDataRequest {
/// Get our [`CoreSyncData`].
GetOurs,
/// Handle an incoming [`CoreSyncData`].
NewIncoming(CoreSyncData),
}
/// A response from a [`tower::Service`] that handles sync states.
pub enum CoreSyncDataResponse {
/// Our [`CoreSyncData`]
Ours(CoreSyncData),
/// The incoming [`CoreSyncData`] is ok.
Ok,
}
/// The direction of a connection.
pub enum Direction {
/// An inbound connection.
Inbound,
/// An outbound connection.
Outbound,
}

View file

@ -1,125 +0,0 @@
/// This module defines InternalRequests and InternalResponses. Cuprate's P2P works by translating network messages into an internal
/// request/ response, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications"
/// (protocol messages).
///
/// Some notifications are easy to translate, like `GetObjectsRequest` is obviously a request but others like `NewFluffyBlock` are a
/// bit tricker. To translate a `NewFluffyBlock` into a request/ response we will have to look to see if we asked for `FluffyMissingTransactionsRequest`
/// if we have we interpret `NewFluffyBlock` as a response if not its a request that doesn't require a response.
///
/// Here is every P2P request/ response. *note admin messages are already request/ response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse
///
/// Admin:
/// Handshake,
/// TimedSync,
/// Ping,
/// SupportFlags
/// Protocol:
/// Request: GetObjectsRequest, Response: GetObjectsResponse,
/// Request: ChainRequest, Response: ChainResponse,
/// Request: FluffyMissingTransactionsRequest, Response: NewFluffyBlock, <- these 2 could be requests or responses
/// Request: GetTxPoolCompliment, Response: NewTransactions, <-
/// Request: NewBlock, Response: None,
/// Request: NewFluffyBlock, Response: None,
/// Request: NewTransactions, Response: None
///
use monero_wire::{
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
GetObjectsResponse, GetTxPoolCompliment, HandshakeRequest, HandshakeResponse, Message,
NewBlock, NewFluffyBlock, NewTransactions, PingResponse, RequestMessage, SupportFlagsResponse,
TimedSyncRequest, TimedSyncResponse,
};
mod try_from;
/// An enum representing a request/ response combination, so a handshake request
/// and response would have the same [`MessageID`]. This allows associating the
/// correct response to a request.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum MessageID {
Handshake,
TimedSync,
Ping,
SupportFlags,
GetObjects,
GetChain,
FluffyMissingTxs,
GetTxPoolCompliment,
NewBlock,
NewFluffyBlock,
NewTransactions,
}
pub enum Request {
Handshake(HandshakeRequest),
TimedSync(TimedSyncRequest),
Ping,
SupportFlags,
GetObjects(GetObjectsRequest),
GetChain(ChainRequest),
FluffyMissingTxs(FluffyMissingTransactionsRequest),
GetTxPoolCompliment(GetTxPoolCompliment),
NewBlock(NewBlock),
NewFluffyBlock(NewFluffyBlock),
NewTransactions(NewTransactions),
}
impl Request {
pub fn id(&self) -> MessageID {
match self {
Request::Handshake(_) => MessageID::Handshake,
Request::TimedSync(_) => MessageID::TimedSync,
Request::Ping => MessageID::Ping,
Request::SupportFlags => MessageID::SupportFlags,
Request::GetObjects(_) => MessageID::GetObjects,
Request::GetChain(_) => MessageID::GetChain,
Request::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
Request::GetTxPoolCompliment(_) => MessageID::GetTxPoolCompliment,
Request::NewBlock(_) => MessageID::NewBlock,
Request::NewFluffyBlock(_) => MessageID::NewFluffyBlock,
Request::NewTransactions(_) => MessageID::NewTransactions,
}
}
pub fn needs_response(&self) -> bool {
match self {
Request::NewBlock(_) | Request::NewFluffyBlock(_) | Request::NewTransactions(_) => {
false
}
_ => true,
}
}
}
pub enum Response {
Handshake(HandshakeResponse),
TimedSync(TimedSyncResponse),
Ping(PingResponse),
SupportFlags(SupportFlagsResponse),
GetObjects(GetObjectsResponse),
GetChain(ChainResponse),
NewFluffyBlock(NewFluffyBlock),
NewTransactions(NewTransactions),
NA,
}
impl Response {
pub fn id(&self) -> MessageID {
match self {
Response::Handshake(_) => MessageID::Handshake,
Response::TimedSync(_) => MessageID::TimedSync,
Response::Ping(_) => MessageID::Ping,
Response::SupportFlags(_) => MessageID::SupportFlags,
Response::GetObjects(_) => MessageID::GetObjects,
Response::GetChain(_) => MessageID::GetChain,
Response::NewFluffyBlock(_) => MessageID::NewBlock,
Response::NewTransactions(_) => MessageID::NewFluffyBlock,
Response::NA => panic!("Can't get message ID for a non existent response"),
}
}
}

View file

@ -1,163 +0,0 @@
//! This module contains the implementations of [`TryFrom`] and [`From`] to convert between
//! [`Message`], [`Request`] and [`Response`].
use monero_wire::messages::{Message, ProtocolMessage, RequestMessage, ResponseMessage};
use super::{Request, Response};
pub struct MessageConversionError;
macro_rules! match_body {
(match $value: ident {$($body:tt)*} ($left:pat => $right_ty:expr) $($todo:tt)*) => {
match_body!( match $value {
$left => $right_ty,
$($body)*
} $($todo)* )
};
(match $value: ident {$($body:tt)*}) => {
match $value {
$($body)*
}
};
}
macro_rules! from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
impl From<$left_ty> for $right_ty {
fn from(value: $left_ty) -> Self {
match_body!( match value {}
$(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+
)
}
}
};
}
macro_rules! try_from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
impl TryFrom<$left_ty> for $right_ty {
type Error = MessageConversionError;
fn try_from(value: $left_ty) -> Result<Self, Self::Error> {
Ok(match_body!( match value {
_ => return Err(MessageConversionError)
}
$(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+
))
}
}
};
}
macro_rules! from_try_from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+});
from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($vall))?,)+});
};
}
macro_rules! try_from_try_from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+});
try_from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($val))?,)+});
};
}
from_try_from!(Request, RequestMessage,{
Handshake(val) = Handshake(val),
Ping = Ping,
SupportFlags = SupportFlags,
TimedSync(val) = TimedSync(val),
});
try_from_try_from!(Request, ProtocolMessage,{
NewBlock(val) = NewBlock(val),
NewFluffyBlock(val) = NewFluffyBlock(val),
GetObjects(val) = GetObjectsRequest(val),
GetChain(val) = ChainRequest(val),
NewTransactions(val) = NewTransactions(val),
FluffyMissingTxs(val) = FluffyMissingTransactionsRequest(val),
GetTxPoolCompliment(val) = GetTxPoolCompliment(val),
});
impl TryFrom<Message> for Request {
type Error = MessageConversionError;
fn try_from(value: Message) -> Result<Self, Self::Error> {
match value {
Message::Request(req) => Ok(req.into()),
Message::Protocol(pro) => pro.try_into(),
_ => Err(MessageConversionError),
}
}
}
impl From<Request> for Message {
fn from(value: Request) -> Self {
match value {
Request::Handshake(val) => Message::Request(RequestMessage::Handshake(val)),
Request::Ping => Message::Request(RequestMessage::Ping),
Request::SupportFlags => Message::Request(RequestMessage::SupportFlags),
Request::TimedSync(val) => Message::Request(RequestMessage::TimedSync(val)),
Request::NewBlock(val) => Message::Protocol(ProtocolMessage::NewBlock(val)),
Request::NewFluffyBlock(val) => Message::Protocol(ProtocolMessage::NewFluffyBlock(val)),
Request::GetObjects(val) => Message::Protocol(ProtocolMessage::GetObjectsRequest(val)),
Request::GetChain(val) => Message::Protocol(ProtocolMessage::ChainRequest(val)),
Request::NewTransactions(val) => Message::Protocol(ProtocolMessage::NewTransactions(val)),
Request::FluffyMissingTxs(val) => Message::Protocol(ProtocolMessage::FluffyMissingTransactionsRequest(val)),
Request::GetTxPoolCompliment(val) => Message::Protocol(ProtocolMessage::GetTxPoolCompliment(val)),
}
}
}
from_try_from!(Response, ResponseMessage,{
Handshake(val) = Handshake(val),
Ping(val) = Ping(val),
SupportFlags(val) = SupportFlags(val),
TimedSync(val) = TimedSync(val),
});
try_from_try_from!(Response, ProtocolMessage,{
NewFluffyBlock(val) = NewFluffyBlock(val),
GetObjects(val) = GetObjectsResponse(val),
GetChain(val) = ChainEntryResponse(val),
NewTransactions(val) = NewTransactions(val),
});
impl TryFrom<Message> for Response {
type Error = MessageConversionError;
fn try_from(value: Message) -> Result<Self, Self::Error> {
match value {
Message::Response(res) => Ok(res.into()),
Message::Protocol(pro) => pro.try_into(),
_ => Err(MessageConversionError),
}
}
}
impl TryFrom<Response> for Message {
type Error = MessageConversionError;
fn try_from(value: Response) -> Result<Self, Self::Error> {
Ok(match value {
Response::Handshake(val) => Message::Response(ResponseMessage::Handshake(val)),
Response::Ping(val) => Message::Response(ResponseMessage::Ping(val)),
Response::SupportFlags(val) => Message::Response(ResponseMessage::SupportFlags(val)),
Response::TimedSync(val) => Message::Response(ResponseMessage::TimedSync(val)),
Response::NewFluffyBlock(val) => Message::Protocol(ProtocolMessage::NewFluffyBlock(val)),
Response::GetObjects(val) => Message::Protocol(ProtocolMessage::GetObjectsResponse(val)),
Response::GetChain(val) => Message::Protocol(ProtocolMessage::ChainEntryResponse(val)),
Response::NewTransactions(val) => Message::Protocol(ProtocolMessage::NewTransactions(val)),
Response::NA => return Err(MessageConversionError),
})
}
}