P2P: Network init (#130)

* p2p changes

* clippy

* a few more docs

* init cuprate-p2p

* remove some unrelated code and add some docs

* start documenting client_pool.rs

* add more docs

* typo

* fix docs

* use JoinSet in connection maintainer

* small changes

* add peer sync state svc

* add broadcast svc

* add more docs

* add some tests

* add a test

* fix merge

* add another test

* unify PeerDisconnectFut and add more docs

* start network init

* add an inbound connection server

* remove crate doc for now

* fix address book docs

* fix leak in client pool

* correct comment

* fix merge + add some docs

* fix doc

* dandelion_tower -> dandelion-tower

* fix async-buffer builds

* check if incoming peers are banned

* add interface methods

* update docs

* use a JoinSet for background network tasks

* Apply suggestions from code review

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

* Update p2p/monero-p2p/src/services.rs

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
This commit is contained in:
Boog900 2024-06-04 17:19:25 +00:00 committed by GitHub
parent 889e15738b
commit b510739701
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 415 additions and 66 deletions

View file

@ -13,6 +13,7 @@ members = [
"p2p/cuprate-p2p", "p2p/cuprate-p2p",
"p2p/dandelion", "p2p/dandelion",
"p2p/monero-p2p", "p2p/monero-p2p",
"p2p/async-buffer",
"p2p/address-book", "p2p/address-book",
"storage/cuprate-blockchain", "storage/cuprate-blockchain",
"storage/cuprate-txpool", "storage/cuprate-txpool",

View file

@ -11,7 +11,7 @@ monero-pruning = { path = "../../pruning" }
monero-wire = { path= "../../net/monero-wire" } monero-wire = { path= "../../net/monero-wire" }
monero-p2p = { path = "../monero-p2p" } monero-p2p = { path = "../monero-p2p" }
tower = { workspace = true, features = ["util", "buffer"] } tower = { workspace = true, features = ["util"] }
tokio = { workspace = true, features = ["time", "fs", "rt"]} tokio = { workspace = true, features = ["time", "fs", "rt"]}
tokio-util = { workspace = true, features = ["time"] } tokio-util = { workspace = true, features = ["time"] }

View file

@ -409,6 +409,9 @@ impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
AddressBookRequest::GetWhitePeers(len) => { AddressBookRequest::GetWhitePeers(len) => {
Ok(AddressBookResponse::Peers(self.get_white_peers(len))) Ok(AddressBookResponse::Peers(self.get_white_peers(len)))
} }
AddressBookRequest::IsPeerBanned(addr) => Ok(AddressBookResponse::IsPeerBanned(
self.is_peer_banned(&addr),
)),
}; };
ready(response) ready(response)

View file

@ -2,7 +2,7 @@
//! //!
//! This module holds the logic for persistent peer storage. //! This module holds the logic for persistent peer storage.
//! Cuprates address book is modeled as a [`tower::Service`] //! Cuprates address book is modeled as a [`tower::Service`]
//! The request is [`AddressBookRequest`] and the response is //! The request is [`AddressBookRequest`](monero_p2p::services::AddressBookRequest) and the response is
//! [`AddressBookResponse`](monero_p2p::services::AddressBookResponse). //! [`AddressBookResponse`](monero_p2p::services::AddressBookResponse).
//! //!
//! Cuprate, like monerod, actually has multiple address books, one //! Cuprate, like monerod, actually has multiple address books, one
@ -13,9 +13,7 @@
//! //!
use std::{io::ErrorKind, path::PathBuf, time::Duration}; use std::{io::ErrorKind, path::PathBuf, time::Duration};
use tower::buffer::Buffer; use monero_p2p::NetworkZone;
use monero_p2p::{services::AddressBookRequest, NetworkZone};
mod book; mod book;
mod peer_list; mod peer_list;
@ -65,7 +63,7 @@ pub enum AddressBookError {
/// Initializes the P2P address book for a specific network zone. /// Initializes the P2P address book for a specific network zone.
pub async fn init_address_book<Z: NetworkZone>( pub async fn init_address_book<Z: NetworkZone>(
cfg: AddressBookConfig, cfg: AddressBookConfig,
) -> Result<Buffer<book::AddressBook<Z>, AddressBookRequest<Z>>, std::io::Error> { ) -> Result<book::AddressBook<Z>, std::io::Error> {
tracing::info!( tracing::info!(
"Loading peers from file: {} ", "Loading peers from file: {} ",
cfg.peer_store_file.display() cfg.peer_store_file.display()
@ -82,5 +80,5 @@ pub async fn init_address_book<Z: NetworkZone>(
let address_book = book::AddressBook::<Z>::new(cfg, white_list, gray_list, Vec::new()); let address_book = book::AddressBook::<Z>::new(cfg, white_list, gray_list, Vec::new());
Ok(Buffer::new(address_book, 150)) Ok(address_book)
} }

View file

@ -48,7 +48,7 @@ pub fn new_buffer<T>(max_item_weight: usize) -> (BufferAppender<T>, BufferStream
queue: tx, queue: tx,
sink_waker: sink_waker.clone(), sink_waker: sink_waker.clone(),
capacity: capacity_atomic.clone(), capacity: capacity_atomic.clone(),
max_item_weight: capacity, max_item_weight,
}, },
BufferStream { BufferStream {
queue: rx, queue: rx,

View file

@ -15,8 +15,8 @@ cuprate-helper = { path = "../../helper", features = ["asynch"] }
monero-serai = { workspace = true, features = ["std"] } monero-serai = { workspace = true, features = ["std"] }
tower = { workspace = true } tower = { workspace = true, features = ["buffer"] }
tokio = { workspace = true, features = ["rt"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
rayon = { workspace = true } rayon = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
tokio-stream = { workspace = true, features = ["sync", "time"] } tokio-stream = { workspace = true, features = ["sync", "time"] }
@ -32,5 +32,7 @@ rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] } tracing = { workspace = true, features = ["std", "attributes"] }
tracing-subscriber = "0.3.18"
[dev-dependencies] [dev-dependencies]
cuprate-test-utils = { path = "../../test-utils" } cuprate-test-utils = { path = "../../test-utils" }

View file

@ -151,6 +151,7 @@ pub enum BroadcastRequest<N: NetworkZone> {
}, },
} }
#[derive(Clone)]
pub struct BroadcastSvc<N: NetworkZone> { pub struct BroadcastSvc<N: NetworkZone> {
new_block_watch: watch::Sender<NewBlockInfo>, new_block_watch: watch::Sender<NewBlockInfo>,
tx_broadcast_channel_outbound: broadcast::Sender<BroadcastTxInfo<N>>, tx_broadcast_channel_outbound: broadcast::Sender<BroadcastTxInfo<N>>,

View file

@ -12,13 +12,14 @@
//! //!
use std::sync::Arc; use std::sync::Arc;
use dashmap::{DashMap, DashSet}; use dashmap::DashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{Instrument, Span};
use monero_p2p::{ use monero_p2p::{
client::{Client, InternalPeerID}, client::{Client, InternalPeerID},
handles::ConnectionHandle, handles::ConnectionHandle,
ConnectionDirection, NetworkZone, NetworkZone,
}; };
pub(crate) mod disconnect_monitor; pub(crate) mod disconnect_monitor;
@ -32,12 +33,6 @@ pub use drop_guard_client::ClientPoolDropGuard;
pub struct ClientPool<N: NetworkZone> { pub struct ClientPool<N: NetworkZone> {
/// The connected [`Client`]s. /// The connected [`Client`]s.
clients: DashMap<InternalPeerID<N::Addr>, Client<N>>, clients: DashMap<InternalPeerID<N::Addr>, Client<N>>,
/// A set of outbound clients, as these allow accesses/mutation from different threads,
/// a peer ID in here does not mean the peer is necessarily in `clients` as it could have been removed
/// by another thread. However, if the peer is in both here and `clients` it is definitely
/// an outbound peer.
outbound_clients: DashSet<InternalPeerID<N::Addr>>,
/// A channel to send new peer ids down to monitor for disconnect. /// A channel to send new peer ids down to monitor for disconnect.
new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>, new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID<N::Addr>)>,
} }
@ -49,11 +44,12 @@ impl<N: NetworkZone> ClientPool<N> {
let pool = Arc::new(ClientPool { let pool = Arc::new(ClientPool {
clients: DashMap::new(), clients: DashMap::new(),
outbound_clients: DashSet::new(),
new_connection_tx: tx, new_connection_tx: tx,
}); });
tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone())); tokio::spawn(
disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()),
);
pool pool
} }
@ -74,10 +70,6 @@ impl<N: NetworkZone> ClientPool<N> {
return; return;
} }
if client.info.direction == ConnectionDirection::OutBound {
self.outbound_clients.insert(id);
}
let res = self.clients.insert(id, client); let res = self.clients.insert(id, client);
assert!(res.is_none()); assert!(res.is_none());
@ -106,8 +98,6 @@ impl<N: NetworkZone> ClientPool<N> {
/// ///
/// [`None`] is returned if the client did not exist in the pool. /// [`None`] is returned if the client did not exist in the pool.
fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> { fn remove_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<Client<N>> {
self.outbound_clients.remove(peer);
self.clients.remove(peer).map(|(_, client)| client) self.clients.remove(peer).map(|(_, client)| client)
} }

View file

@ -24,6 +24,11 @@ pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>, mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>, client_pool: Arc<ClientPool<N>>,
) { ) {
// We need to hold a weak reference otherwise the client pool and this would hold a reference to
// each other causing the pool to be leaked.
let weak_client_pool = Arc::downgrade(&client_pool);
drop(client_pool);
tracing::info!("Starting peer disconnect monitor."); tracing::info!("Starting peer disconnect monitor.");
let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new(); let mut futs: FuturesUnordered<PeerDisconnectFut<N>> = FuturesUnordered::new();
@ -39,7 +44,13 @@ pub async fn disconnect_monitor<N: NetworkZone>(
} }
Some(peer_id) = futs.next() => { Some(peer_id) = futs.next() => {
tracing::debug!("{peer_id} has disconnected, removing from client pool."); tracing::debug!("{peer_id} has disconnected, removing from client pool.");
client_pool.remove_client(&peer_id); let Some(pool) = weak_client_pool.upgrade() else {
tracing::info!("Peer disconnect monitor shutting down.");
return;
};
pool.remove_client(&peer_id);
drop(pool);
} }
else => { else => {
tracing::info!("Peer disconnect monitor shutting down."); tracing::info!("Peer disconnect monitor shutting down.");

View file

@ -14,7 +14,7 @@ pub struct ClientPoolDropGuard<N: NetworkZone> {
/// The [`Client`]. /// The [`Client`].
/// ///
/// This is set to [`Some`] when this guard is created, then /// This is set to [`Some`] when this guard is created, then
/// ### [`take`](Option::take)n and returned to the pool when dropped. /// [`take`](Option::take)n and returned to the pool when dropped.
pub(super) client: Option<Client<N>>, pub(super) client: Option<Client<N>>,
} }

View file

@ -1,12 +1,52 @@
use cuprate_helper::network::Network;
use monero_address_book::AddressBookConfig;
use monero_p2p::NetworkZone;
use monero_wire::{common::PeerSupportFlags, BasicNodeData};
/// P2P config. /// P2P config.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct P2PConfig { pub struct P2PConfig<N: NetworkZone> {
/// The [`Network`] we should connect to.
pub network: Network,
/// The number of outbound connections to make and try keep. /// The number of outbound connections to make and try keep.
pub outbound_connections: usize, pub outbound_connections: usize,
/// The amount of extra connections we can make if we are under load from the rest of Cuprate. /// The amount of extra connections we can make if we are under load from the rest of Cuprate.
pub extra_outbound_connections: usize, pub extra_outbound_connections: usize,
/// The maximum amount of inbound connections, only relevant if [`P2PConfig::server_config`] is set to [`Some`]
pub max_inbound_connections: usize,
/// The percent of outbound peers that should be gray aka never connected to before. /// The percent of outbound peers that should be gray aka never connected to before.
/// ///
/// Only values 0..=1 are valid. /// Only values 0..=1 are valid.
pub gray_peers_percent: f64, pub gray_peers_percent: f64,
/// The inbound server configuration,
///
/// If this is [`None`] no inbound connections will be accepted.
pub server_config: Option<N::ServerCfg>,
/// The port to listen on for inbound connections, only relevant if [`P2PConfig::server_config`] is set to [`Some`].
pub p2p_port: u16,
/// The public RPC port to tell peers about so wallets can use our node. `0` if we do not have a public RPC port.
pub rpc_port: u16,
/// The [`AddressBookConfig`].
pub address_book_config: AddressBookConfig,
}
impl<N: NetworkZone> P2PConfig<N> {
/// Returns the [`BasicNodeData`] for this [`P2PConfig`].
///
/// [`BasicNodeData::peer_id`] is set to a random u64, so this function should only be called once
/// per [`NetworkZone`] per run.
pub(crate) fn basic_node_data(&self) -> BasicNodeData {
BasicNodeData {
my_port: u32::from(self.p2p_port),
network_id: self.network.network_id(),
peer_id: rand::random(),
support_flags: PeerSupportFlags::FLUFFY_BLOCKS,
rpc_port: self.rpc_port,
// We do not (and probably will never) support paying for RPC with hashes.
rpc_credits_per_hash: 0,
}
}
} }

View file

@ -12,7 +12,7 @@ use tokio::{
time::{sleep, timeout}, time::{sleep, timeout},
}; };
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::instrument; use tracing::{instrument, Instrument, Span};
use monero_p2p::{ use monero_p2p::{
client::{Client, ConnectRequest, HandshakeError}, client::{Client, ConnectRequest, HandshakeError},
@ -60,7 +60,7 @@ pub struct OutboundConnectionKeeper<N: NetworkZone, A, C> {
/// we add a permit to the semaphore and keep track here, upto a value in config. /// we add a permit to the semaphore and keep track here, upto a value in config.
pub extra_peers: usize, pub extra_peers: usize,
/// The p2p config. /// The p2p config.
pub config: P2PConfig, pub config: P2PConfig<N>,
/// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or
/// false if we should connect to a white peer. /// false if we should connect to a white peer.
/// ///
@ -76,7 +76,7 @@ where
C::Future: Send + 'static, C::Future: Send + 'static,
{ {
pub fn new( pub fn new(
config: P2PConfig, config: P2PConfig<N>,
client_pool: Arc<ClientPool<N>>, client_pool: Arc<ClientPool<N>>,
make_connection_rx: mpsc::Receiver<MakeConnectionRequest>, make_connection_rx: mpsc::Receiver<MakeConnectionRequest>,
address_book_svc: A, address_book_svc: A,
@ -149,7 +149,7 @@ where
} }
/// Connects to a given outbound peer. /// Connects to a given outbound peer.
#[instrument(level = "info", skip(self, permit), fields(%addr))] #[instrument(level = "info", skip_all)]
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
let client_pool = self.client_pool.clone(); let client_pool = self.client_pool.clone();
let connection_fut = self let connection_fut = self
@ -159,11 +159,14 @@ where
.expect("Connector had an error in `poll_ready`") .expect("Connector had an error in `poll_ready`")
.call(ConnectRequest { addr, permit }); .call(ConnectRequest { addr, permit });
tokio::spawn(async move { tokio::spawn(
async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer); client_pool.add_new_client(peer);
} }
}); }
.instrument(Span::current()),
);
} }
/// Handles a request from the peer set for more peers. /// Handles a request from the peer set for more peers.

View file

@ -1,7 +1,7 @@
use std::time::Duration; use std::time::Duration;
/// The timeout we set on handshakes. /// The timeout we set on handshakes.
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);
/// The maximum amount of connections to make to seed nodes for when we need peers. /// The maximum amount of connections to make to seed nodes for when we need peers.
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
@ -28,6 +28,12 @@ pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10 * 1024 * 1024;
/// 50 more transactions after it are added to the queue. /// 50 more transactions after it are added to the queue.
pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
/// The time to sleep after an inbound connection comes in.
///
/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections.
/// TODO: it might be a good idea to make this configurable.
pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500);
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -0,0 +1,113 @@
//! # Inbound Server
//!
//! This module contains the inbound connection server, which listens for inbound connections, gives
//! them to the handshaker service and then adds them to the client pool.
use std::{pin::pin, sync::Arc};
use futures::StreamExt;
use tokio::{
sync::Semaphore,
time::{sleep, timeout},
};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use monero_p2p::{
client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
services::{AddressBookRequest, AddressBookResponse},
AddressBook, ConnectionDirection, NetworkZone,
};
use crate::{
client_pool::ClientPool,
constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN},
P2PConfig,
};
/// Starts the inbound server.
#[instrument(level = "warn", skip_all)]
pub async fn inbound_server<N, HS, A>(
client_pool: Arc<ClientPool<N>>,
mut handshaker: HS,
mut address_book: A,
config: P2PConfig<N>,
) -> Result<(), tower::BoxError>
where
N: NetworkZone,
HS: Service<DoHandshakeRequest<N>, Response = Client<N>, Error = HandshakeError>
+ Send
+ 'static,
HS::Future: Send + 'static,
A: AddressBook<N>,
{
let Some(server_config) = config.server_config else {
tracing::warn!("No inbound server config provided, not listening for inbound connections.");
return Ok(());
};
tracing::info!("Starting inbound connection server");
let listener = N::incoming_connection_listener(server_config, config.p2p_port)
.await
.inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?;
let mut listener = pin!(listener);
let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections));
while let Some(connection) = listener.next().await {
let Ok((addr, peer_stream, peer_sink)) = connection else {
continue;
};
if let Some(addr) = &addr {
let AddressBookResponse::IsPeerBanned(banned) = address_book
.ready()
.await?
.call(AddressBookRequest::IsPeerBanned(*addr))
.await?
else {
panic!("Address book returned incorrect response!");
};
if banned {
continue;
}
}
let addr = match addr {
Some(addr) => InternalPeerID::KnownAddr(addr),
None => InternalPeerID::Unknown(rand::random()),
};
if let Ok(permit) = semaphore.clone().try_acquire_owned() {
tracing::debug!("Permit free for incoming connection, attempting handshake.");
let fut = handshaker.ready().await?.call(DoHandshakeRequest {
addr,
peer_stream,
peer_sink,
direction: ConnectionDirection::InBound,
permit,
});
let cloned_pool = client_pool.clone();
tokio::spawn(
async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
cloned_pool.add_new_client(peer);
}
}
.instrument(Span::current()),
);
} else {
tracing::debug!("No permit free for incoming connection.");
// TODO: listen for if the peer is just trying to ping us to see if we are reachable.
}
sleep(INBOUND_CONNECTION_COOL_DOWN).await;
}
Ok(())
}

View file

@ -1,17 +1,186 @@
//! Cuprate's P2P Crate. //! Cuprate's P2P Crate.
//! //!
//! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). //! This crate contains a [`NetworkInterface`] which allows interacting with the Monero P2P network on
//! //! a certain [`NetworkZone`]
//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, use std::sync::Arc;
//! or send to a single peer.
//! use futures::FutureExt;
#![allow(dead_code)] use tokio::{
sync::{mpsc, watch},
task::JoinSet,
};
use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt};
use tracing::{instrument, Instrument, Span};
use monero_p2p::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse},
CoreSyncSvc, NetworkZone, PeerRequestHandler,
};
mod broadcast; mod broadcast;
pub mod client_pool; mod client_pool;
pub mod config; pub mod config;
pub mod connection_maintainer; pub mod connection_maintainer;
mod constants; mod constants;
mod inbound_server;
mod sync_states; mod sync_states;
pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig; pub use config::P2PConfig;
use connection_maintainer::MakeConnectionRequest;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
///
/// This function starts all the tasks to maintain/accept/make connections.
///
/// # Usage
/// You must provide:
/// - A peer request handler, which is given to each connection
/// - A core sync service, which keeps track of the sync state of our node
#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))]
pub async fn initialize_network<N, R, CS>(
peer_req_handler: R,
core_sync_svc: CS,
config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError>
where
N: NetworkZone,
R: PeerRequestHandler + Clone,
CS: CoreSyncSvc + Clone,
{
let address_book =
monero_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new(
address_book,
config.max_inbound_connections + config.outbound_connections,
);
let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
let sync_states_svc = Buffer::new(
sync_states_svc,
config.max_inbound_connections + config.outbound_connections,
);
// Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
// this.
let (broadcast_svc, outbound_mkr, inbound_mkr) =
broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
let mut basic_node_data = config.basic_node_data();
if !N::CHECK_NODE_ID {
basic_node_data.peer_id = 1;
}
let outbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc.clone(),
core_sync_svc.clone(),
peer_req_handler.clone(),
outbound_mkr,
basic_node_data.clone(),
);
let inbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc,
core_sync_svc.clone(),
peer_req_handler,
inbound_mkr,
basic_node_data,
);
let client_pool = client_pool::ClientPool::new();
let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(),
client_pool.clone(),
make_connection_rx,
address_book.clone(),
outbound_connector,
);
let mut background_tasks = JoinSet::new();
background_tasks.spawn(
outbound_connection_maintainer
.run()
.instrument(Span::current()),
);
background_tasks.spawn(
inbound_server::inbound_server(
client_pool.clone(),
inbound_handshaker,
address_book.clone(),
config,
)
.map(|res| {
if let Err(e) = res {
tracing::error!("Error in inbound connection listener: {e}")
}
tracing::info!("Inbound connection listener shutdown")
})
.instrument(Span::current()),
);
Ok(NetworkInterface {
pool: client_pool,
broadcast_svc,
top_block_watch,
make_connection_tx,
address_book: address_book.boxed_clone(),
_background_tasks: Arc::new(background_tasks),
})
}
/// The interface to Monero's P2P network on a certain [`NetworkZone`].
#[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
/// on that claimed chain.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections.
#[allow(dead_code)] // will be used eventually
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
/// Background tasks that will be aborted when this interface is dropped.
_background_tasks: Arc<JoinSet<()>>,
}
impl<N: NetworkZone> NetworkInterface<N> {
/// Returns a service which allows broadcasting messages to all the connected peers in a specific [`NetworkZone`].
pub fn broadcast_svc(&self) -> BroadcastSvc<N> {
self.broadcast_svc.clone()
}
/// Returns a stream which yields the highest seen sync state from a connected peer.
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone())
}
/// Returns the address book service.
pub fn address_book(
&self,
) -> BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError> {
self.address_book.clone()
}
/// Pulls a client from the client pool, returning it in a guard that will return it there when it's
/// dropped.
pub fn borrow_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<ClientPoolDropGuard<N>> {
self.pool.borrow_client(peer)
}
}

View file

@ -25,14 +25,14 @@ use monero_wire::CoreSyncData;
use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN}; use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN};
/// The highest claimed sync info from our connected peers. /// The highest claimed sync info from our connected peers.
#[derive(Debug)] #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NewSyncInfo { pub struct NewSyncInfo {
/// The peers chain height. /// The peers chain height.
chain_height: u64, pub chain_height: u64,
/// The peers top block's hash. /// The peers top block's hash.
top_hash: [u8; 32], pub top_hash: [u8; 32],
/// The peers cumulative difficulty. /// The peers cumulative difficulty.
cumulative_difficulty: u128, pub cumulative_difficulty: u128,
} }
/// A service that keeps track of our peers blockchains. /// A service that keeps track of our peers blockchains.

View file

@ -1,5 +1,5 @@
[package] [package]
name = "dandelion_tower" name = "dandelion-tower"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"

View file

@ -29,13 +29,13 @@ pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};
use monero_pruning::PruningSeed; use monero_pruning::PruningSeed;
/// An internal identifier for a given peer, will be their address if known /// An internal identifier for a given peer, will be their address if known
/// or a random u64 if not. /// or a random u128 if not.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum InternalPeerID<A> { pub enum InternalPeerID<A> {
/// A known address. /// A known address.
KnownAddr(A), KnownAddr(A),
/// An unknown address (probably an inbound anonymity network connection). /// An unknown address (probably an inbound anonymity network connection).
Unknown(u64), Unknown(u128),
} }
impl<A: Display> Display for InternalPeerID<A> { impl<A: Display> Display for InternalPeerID<A> {

View file

@ -130,11 +130,11 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
/// The sink (outgoing data) type for this network. /// The sink (outgoing data) type for this network.
type Sink: Sink<LevinMessage<Message>, Error = BucketError> + Unpin + Send + 'static; type Sink: Sink<LevinMessage<Message>, Error = BucketError> + Unpin + Send + 'static;
/// The inbound connection listener for this network. /// The inbound connection listener for this network.
type Listener: Stream< type Listener: Stream<Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>>
Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>, + Send
>; + 'static;
/// Config used to start a server which listens for incoming connections. /// Config used to start a server which listens for incoming connections.
type ServerCfg; type ServerCfg: Clone + Debug + Send + 'static;
async fn connect_to_peer( async fn connect_to_peer(
addr: Self::Addr, addr: Self::Addr,
@ -142,6 +142,7 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
async fn incoming_connection_listener( async fn incoming_connection_listener(
config: Self::ServerCfg, config: Self::ServerCfg,
port: u16,
) -> Result<Self::Listener, std::io::Error>; ) -> Result<Self::Listener, std::io::Error>;
} }

View file

@ -37,8 +37,9 @@ impl NetZoneAddress for SocketAddr {
} }
} }
#[derive(Debug, Clone)]
pub struct ClearNetServerCfg { pub struct ClearNetServerCfg {
pub addr: SocketAddr, pub ip: IpAddr,
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -80,8 +81,9 @@ impl NetworkZone for ClearNet {
async fn incoming_connection_listener( async fn incoming_connection_listener(
config: Self::ServerCfg, config: Self::ServerCfg,
port: u16,
) -> Result<Self::Listener, std::io::Error> { ) -> Result<Self::Listener, std::io::Error> {
let listener = TcpListener::bind(config.addr).await?; let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?;
Ok(InBoundStream { listener }) Ok(InBoundStream { listener })
} }
} }

View file

@ -119,10 +119,14 @@ pub enum AddressBookRequest<Z: NetworkZone> {
TakeRandomPeer { height: Option<u64> }, TakeRandomPeer { height: Option<u64> },
/// Gets the specified number of white peers, or less if we don't have enough. /// Gets the specified number of white peers, or less if we don't have enough.
GetWhitePeers(usize), GetWhitePeers(usize),
/// Checks if the given peer is banned.
IsPeerBanned(Z::Addr),
} }
pub enum AddressBookResponse<Z: NetworkZone> { pub enum AddressBookResponse<Z: NetworkZone> {
Ok, Ok,
Peer(ZoneSpecificPeerListEntryBase<Z::Addr>), Peer(ZoneSpecificPeerListEntryBase<Z::Addr>),
Peers(Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>), Peers(Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>),
/// Contains `true` if the peer is banned.
IsPeerBanned(bool),
} }

View file

@ -71,8 +71,9 @@ impl NetworkZone for FragNet {
async fn incoming_connection_listener( async fn incoming_connection_listener(
config: Self::ServerCfg, config: Self::ServerCfg,
port: u16,
) -> Result<Self::Listener, std::io::Error> { ) -> Result<Self::Listener, std::io::Error> {
let listener = TcpListener::bind(config.addr).await?; let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?;
Ok(InBoundStream { listener }) Ok(InBoundStream { listener })
} }
} }
@ -194,9 +195,9 @@ async fn fragmented_handshake_monerod_to_cuprate() {
our_basic_node_data, our_basic_node_data,
); );
let addr = "127.0.0.1:18081".parse().unwrap(); let ip = "127.0.0.1".parse().unwrap();
let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { addr }) let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081)
.await .await
.unwrap(); .unwrap();

View file

@ -174,9 +174,9 @@ async fn handshake_monerod_to_cuprate() {
our_basic_node_data, our_basic_node_data,
); );
let addr = "127.0.0.1:18081".parse().unwrap(); let ip = "127.0.0.1".parse().unwrap();
let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { addr }) let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081)
.await .await
.unwrap(); .unwrap();

View file

@ -88,7 +88,8 @@ impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool
Box< Box<
dyn Stream< dyn Stream<
Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>, Item = Result<(Option<Self::Addr>, Self::Stream, Self::Sink), std::io::Error>,
>, > + Send
+ 'static,
>, >,
>; >;
type ServerCfg = (); type ServerCfg = ();
@ -97,7 +98,10 @@ impl<const ALLOW_SYNC: bool, const DANDELION_PP: bool, const CHECK_NODE_ID: bool
unimplemented!() unimplemented!()
} }
async fn incoming_connection_listener(_: Self::ServerCfg) -> Result<Self::Listener, Error> { async fn incoming_connection_listener(
_: Self::ServerCfg,
_: u16,
) -> Result<Self::Listener, Error> {
unimplemented!() unimplemented!()
} }
} }