fix merge + add some docs

This commit is contained in:
Boog900 2024-05-22 21:43:45 +01:00
parent 7f5e44f7b1
commit 20f9a063ea
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
7 changed files with 148 additions and 17 deletions

View file

@ -24,8 +24,8 @@ pub async fn disconnect_monitor<N: NetworkZone>(
mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID<N::Addr>)>,
client_pool: Arc<ClientPool<N>>,
) {
// We need to hold a weak reference otherwise the client pool and this would be a circular reference
// which means the pool would be leaked.
// We need to hold a weak reference otherwise the client pool and this would hold a reference to each
// other causing the pool to be leaked.
let weak_client_pool = Arc::downgrade(&client_pool);
drop(client_pool);

View file

@ -14,7 +14,7 @@ pub struct ClientPoolDropGuard<N: NetworkZone> {
/// The [`Client`].
///
/// This is set to [`Some`] when this guard is created, then
/// ### [`take`](Option::take)n and returned to the pool when dropped.
/// [`take`](Option::take)n and returned to the pool when dropped.
pub(super) client: Option<Client<N>>,
}

View file

@ -1,12 +1,12 @@
use cuprate_helper::network::Network;
use monero_address_book::AddressBookConfig;
use monero_p2p::NetworkZone;
use monero_wire::common::PeerSupportFlags;
use monero_wire::BasicNodeData;
use monero_wire::{common::PeerSupportFlags, BasicNodeData};
/// P2P config.
#[derive(Clone, Debug)]
pub struct P2PConfig<N: NetworkZone> {
/// The [`Network`] we should connect to.
pub network: Network,
/// The number of outbound connections to make and try keep.
@ -29,6 +29,7 @@ pub struct P2PConfig<N: NetworkZone> {
/// The public RPC port to tell peers about so wallets can use our node. `0` if we do not have a public RPC port.
pub rpc_port: u16,
/// The [`AddressBookConfig`].
pub address_book_config: AddressBookConfig,
}
@ -36,7 +37,7 @@ impl<N: NetworkZone> P2PConfig<N> {
/// Returns the [`BasicNodeData`] for this [`P2PConfig`].
///
/// [`BasicNodeData::peer_id`] is set to a random u64, so this function should only be called once
/// per [`NetworkZone`].
/// per [`NetworkZone`] per run.
pub(crate) fn basic_node_data(&self) -> BasicNodeData {
BasicNodeData {
my_port: self.p2p_port as u32,

View file

@ -1,7 +1,7 @@
use std::time::Duration;
/// The timeout we set on handshakes.
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20);
/// The maximum amount of connections to make to seed nodes for when we need peers.
pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
@ -28,6 +28,12 @@ pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10 * 1024 * 1024;
/// 50 more transactions after it are added to the queue.
pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;
/// The time to sleep after an inbound connection comes in.
///
/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections.
/// TODO: it might be a good idea to make this configurable.
pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500);
#[cfg(test)]
mod tests {
use super::*;

View file

@ -5,19 +5,26 @@
use std::{pin::pin, sync::Arc};
use futures::StreamExt;
use tokio::{
sync::Semaphore,
time::{sleep, timeout},
};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use monero_p2p::{
client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
ConnectionDirection, NetworkZone,
};
use tokio::time::sleep;
use tokio::{sync::Semaphore, time::timeout};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use crate::constants::INBOUND_CONNECTION_COOL_DOWN;
use crate::{client_pool::ClientPool, constants::HANDSHAKE_TIMEOUT, P2PConfig};
use crate::{
client_pool::ClientPool,
constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN},
P2PConfig,
};
#[instrument(level = "info", skip_all)]
/// The inbound server.
#[instrument(level = "warn", skip_all)]
pub async fn inbound_server<N, HS>(
client_pool: Arc<ClientPool<N>>,
mut handshaker: HS,
@ -35,6 +42,9 @@ where
return Ok(());
};
// TODO: take in the address book and check if incoming peers are banned before adding them to our
// connections.
tracing::info!("Starting inbound connection server");
let listener = N::incoming_connection_listener(server_config, config.p2p_port)

View file

@ -7,11 +7,125 @@
//!
#![allow(dead_code)]
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tower::buffer::Buffer;
use tracing::{instrument, Instrument, Span};
use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler};
mod broadcast;
pub mod client_pool;
mod client_pool;
pub mod config;
pub mod connection_maintainer;
mod constants;
mod inbound_server;
mod sync_states;
use crate::connection_maintainer::MakeConnectionRequest;
pub use config::P2PConfig;
use monero_p2p::client::Connector;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
///
/// This function starts all the tasks to maintain connections/ accept connections/ make connections.
///
/// To use you must provide, a peer request handler, which is given to each connection and a core sync service
/// which keeps track of the sync state of our node.
#[instrument(level="debug", name="net", skip_all, fields(zone=N::NAME))]
pub async fn initialize_network<N, R, CS>(
peer_req_handler: R,
core_sync_svc: CS,
config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError>
where
N: NetworkZone,
R: PeerRequestHandler + Clone,
CS: CoreSyncSvc + Clone,
{
let address_book =
monero_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new(
address_book,
config.max_inbound_connections + config.outbound_connections,
);
let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
let sync_states_svc = Buffer::new(
sync_states_svc,
config.max_inbound_connections + config.outbound_connections,
);
// Use the default config. Changing the defaults affects tx fluff times, which could effect D++ so for now don't allow changing
// this.
let (broadcast_svc, outbound_mkr, inbound_mkr) =
broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default());
let mut basic_node_data = config.basic_node_data();
if !N::CHECK_NODE_ID {
// TODO: make sure this is the value monerod sets for anon networks.
basic_node_data.peer_id = 1;
}
let outbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc.clone(),
core_sync_svc.clone(),
peer_req_handler.clone(),
outbound_mkr,
basic_node_data.clone(),
);
let inbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc,
core_sync_svc.clone(),
peer_req_handler,
inbound_mkr,
basic_node_data,
);
let client_pool = client_pool::ClientPool::new();
let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(),
client_pool.clone(),
make_connection_rx,
address_book.clone(),
outbound_connector,
);
tokio::spawn(
outbound_connection_maintainer
.run()
.instrument(Span::current()),
);
tokio::spawn(
inbound_server::inbound_server(client_pool.clone(), inbound_handshaker, config)
.instrument(Span::current()),
);
Ok(NetworkInterface {
pool: client_pool,
broadcast_svc,
top_block_watch,
make_connection_tx,
})
}
/// The interface to Monero's P2P network on a certain [`NetworkZone`].
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`](tower::Service) that allows broadcasting to all connected peers.
broadcast_svc: broadcast::BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
/// on that claimed chain.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections.
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
}

View file

@ -29,13 +29,13 @@ pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};
use monero_pruning::PruningSeed;
/// An internal identifier for a given peer, will be their address if known
/// or a random u64 if not.
/// or a random u128 if not.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum InternalPeerID<A> {
/// A known address.
KnownAddr(A),
/// An unknown address (probably an inbound anonymity network connection).
Unknown(u64),
Unknown(u128),
}
impl<A: Display> Display for InternalPeerID<A> {