diff --git a/Cargo.lock b/Cargo.lock index 809bdf28..d3f45031 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,7 @@ dependencies = [ name = "cuprate-p2p" version = "0.1.0" dependencies = [ + "borsh", "bytes", "cuprate-address-book", "cuprate-async-buffer", @@ -728,9 +729,11 @@ dependencies = [ "cuprate-wire", "futures", "hex", + "hex-literal", "thiserror", "tokio", "tokio-stream", + "tokio-test", "tokio-util", "tower", "tracing", diff --git a/net/wire/src/lib.rs b/net/wire/src/lib.rs index 45a2405c..674a2e91 100644 --- a/net/wire/src/lib.rs +++ b/net/wire/src/lib.rs @@ -13,10 +13,10 @@ // copies or substantial portions of the Software. // -//! # Monero Wire +//! # Cuprate Wire //! //! A crate defining Monero network messages and network addresses, -//! built on top of the levin-cuprate crate. +//! built on top of the [`cuprate_levin`] crate. //! //! ## License //! diff --git a/net/wire/src/p2p.rs b/net/wire/src/p2p.rs index 0d448e4f..97431099 100644 --- a/net/wire/src/p2p.rs +++ b/net/wire/src/p2p.rs @@ -177,6 +177,7 @@ fn build_message( Ok(()) } +#[derive(Debug, Clone)] pub enum ProtocolMessage { NewBlock(NewBlock), NewFluffyBlock(NewFluffyBlock), @@ -255,22 +256,23 @@ impl ProtocolMessage { } } -pub enum RequestMessage { +#[derive(Debug, Clone)] +pub enum AdminRequestMessage { Handshake(HandshakeRequest), Ping, SupportFlags, TimedSync(TimedSyncRequest), } -impl RequestMessage { +impl AdminRequestMessage { pub fn command(&self) -> LevinCommand { use LevinCommand as C; match self { - RequestMessage::Handshake(_) => C::Handshake, - RequestMessage::Ping => C::Ping, - RequestMessage::SupportFlags => C::SupportFlags, - RequestMessage::TimedSync(_) => C::TimedSync, + AdminRequestMessage::Handshake(_) => C::Handshake, + AdminRequestMessage::Ping => C::Ping, + AdminRequestMessage::SupportFlags => C::SupportFlags, + AdminRequestMessage::TimedSync(_) => C::TimedSync, } } @@ -278,19 +280,19 @@ impl RequestMessage { use LevinCommand as C; Ok(match command { - C::Handshake => decode_message(RequestMessage::Handshake, buf)?, - C::TimedSync => decode_message(RequestMessage::TimedSync, buf)?, + C::Handshake => decode_message(AdminRequestMessage::Handshake, buf)?, + C::TimedSync => decode_message(AdminRequestMessage::TimedSync, buf)?, C::Ping => { cuprate_epee_encoding::from_bytes::(buf) .map_err(|e| BucketError::BodyDecodingError(e.into()))?; - RequestMessage::Ping + AdminRequestMessage::Ping } C::SupportFlags => { cuprate_epee_encoding::from_bytes::(buf) .map_err(|e| BucketError::BodyDecodingError(e.into()))?; - RequestMessage::SupportFlags + AdminRequestMessage::SupportFlags } _ => return Err(BucketError::UnknownCommand), }) @@ -300,31 +302,34 @@ impl RequestMessage { use LevinCommand as C; match self { - RequestMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, - RequestMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, - RequestMessage::Ping => build_message(C::Ping, EmptyMessage, builder)?, - RequestMessage::SupportFlags => build_message(C::SupportFlags, EmptyMessage, builder)?, + AdminRequestMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, + AdminRequestMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, + AdminRequestMessage::Ping => build_message(C::Ping, EmptyMessage, builder)?, + AdminRequestMessage::SupportFlags => { + build_message(C::SupportFlags, EmptyMessage, builder)? + } } Ok(()) } } -pub enum ResponseMessage { +#[derive(Debug, Clone)] +pub enum AdminResponseMessage { Handshake(HandshakeResponse), Ping(PingResponse), SupportFlags(SupportFlagsResponse), TimedSync(TimedSyncResponse), } -impl ResponseMessage { +impl AdminResponseMessage { pub fn command(&self) -> LevinCommand { use LevinCommand as C; match self { - ResponseMessage::Handshake(_) => C::Handshake, - ResponseMessage::Ping(_) => C::Ping, - ResponseMessage::SupportFlags(_) => C::SupportFlags, - ResponseMessage::TimedSync(_) => C::TimedSync, + AdminResponseMessage::Handshake(_) => C::Handshake, + AdminResponseMessage::Ping(_) => C::Ping, + AdminResponseMessage::SupportFlags(_) => C::SupportFlags, + AdminResponseMessage::TimedSync(_) => C::TimedSync, } } @@ -332,10 +337,10 @@ impl ResponseMessage { use LevinCommand as C; Ok(match command { - C::Handshake => decode_message(ResponseMessage::Handshake, buf)?, - C::TimedSync => decode_message(ResponseMessage::TimedSync, buf)?, - C::Ping => decode_message(ResponseMessage::Ping, buf)?, - C::SupportFlags => decode_message(ResponseMessage::SupportFlags, buf)?, + C::Handshake => decode_message(AdminResponseMessage::Handshake, buf)?, + C::TimedSync => decode_message(AdminResponseMessage::TimedSync, buf)?, + C::Ping => decode_message(AdminResponseMessage::Ping, buf)?, + C::SupportFlags => decode_message(AdminResponseMessage::SupportFlags, buf)?, _ => return Err(BucketError::UnknownCommand), }) } @@ -344,18 +349,21 @@ impl ResponseMessage { use LevinCommand as C; match self { - ResponseMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, - ResponseMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, - ResponseMessage::Ping(val) => build_message(C::Ping, val, builder)?, - ResponseMessage::SupportFlags(val) => build_message(C::SupportFlags, val, builder)?, + AdminResponseMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, + AdminResponseMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, + AdminResponseMessage::Ping(val) => build_message(C::Ping, val, builder)?, + AdminResponseMessage::SupportFlags(val) => { + build_message(C::SupportFlags, val, builder)? + } } Ok(()) } } +#[derive(Debug, Clone)] pub enum Message { - Request(RequestMessage), - Response(ResponseMessage), + Request(AdminRequestMessage), + Response(AdminResponseMessage), Protocol(ProtocolMessage), } @@ -390,8 +398,10 @@ impl LevinBody for Message { command: LevinCommand, ) -> Result { Ok(match typ { - MessageType::Request => Message::Request(RequestMessage::decode(body, command)?), - MessageType::Response => Message::Response(ResponseMessage::decode(body, command)?), + MessageType::Request => Message::Request(AdminRequestMessage::decode(body, command)?), + MessageType::Response => { + Message::Response(AdminResponseMessage::decode(body, command)?) + } MessageType::Notification => Message::Protocol(ProtocolMessage::decode(body, command)?), }) } diff --git a/net/wire/src/p2p/protocol.rs b/net/wire/src/p2p/protocol.rs index 5e95a4f8..a385099b 100644 --- a/net/wire/src/p2p/protocol.rs +++ b/net/wire/src/p2p/protocol.rs @@ -61,7 +61,7 @@ epee_object!( /// A Request For Blocks #[derive(Debug, Clone, PartialEq, Eq)] pub struct GetObjectsRequest { - /// Block hashes we want + /// Block hashes wanted. pub blocks: ByteArrayVec<32>, /// Pruned pub pruned: bool, diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs index b6ab07ad..4b5a1e76 100644 --- a/p2p/address-book/src/book.rs +++ b/p2p/address-book/src/book.rs @@ -27,7 +27,10 @@ use cuprate_p2p_core::{ }; use cuprate_pruning::PruningSeed; -use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError}; +use crate::{ + peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError, + BorshNetworkZone, +}; #[cfg(test)] mod tests; @@ -45,7 +48,7 @@ pub struct ConnectionPeerEntry { rpc_credits_per_hash: u32, } -pub struct AddressBook { +pub struct AddressBook { /// Our white peers - the peers we have previously connected to. white_list: PeerList, /// Our gray peers - the peers we have been told about but haven't connected to. @@ -66,7 +69,7 @@ pub struct AddressBook { cfg: AddressBookConfig, } -impl AddressBook { +impl AddressBook { pub fn new( cfg: AddressBookConfig, white_peers: Vec>, @@ -351,7 +354,7 @@ impl AddressBook { } } -impl Service> for AddressBook { +impl Service> for AddressBook { type Response = AddressBookResponse; type Error = AddressBookError; type Future = Ready>; diff --git a/p2p/address-book/src/book/tests.rs b/p2p/address-book/src/book/tests.rs index 11f31868..1abea043 100644 --- a/p2p/address-book/src/book/tests.rs +++ b/p2p/address-book/src/book/tests.rs @@ -1,7 +1,7 @@ -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::{path::PathBuf, time::Duration}; use futures::StreamExt; -use tokio::{sync::Semaphore, time::interval}; +use tokio::time::interval; use cuprate_p2p_core::handles::HandleBuilder; use cuprate_pruning::PruningSeed; @@ -78,11 +78,7 @@ async fn get_white_peers() { async fn add_new_peer_already_connected() { let mut address_book = make_fake_address_book(0, 0); - let semaphore = Arc::new(Semaphore::new(10)); - - let (_, handle) = HandleBuilder::default() - .with_permit(semaphore.clone().try_acquire_owned().unwrap()) - .build(); + let (_, handle) = HandleBuilder::default().build(); address_book .handle_new_connection( @@ -98,9 +94,7 @@ async fn add_new_peer_already_connected() { ) .unwrap(); - let (_, handle) = HandleBuilder::default() - .with_permit(semaphore.try_acquire_owned().unwrap()) - .build(); + let (_, handle) = HandleBuilder::default().build(); assert_eq!( address_book.handle_new_connection( diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index 1ce659f1..c0903485 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -10,10 +10,9 @@ //! clear net peers getting linked to their dark counterparts //! and so peers will only get told about peers they can //! connect to. -//! use std::{io::ErrorKind, path::PathBuf, time::Duration}; -use cuprate_p2p_core::NetworkZone; +use cuprate_p2p_core::{NetZoneAddress, NetworkZone}; mod book; mod peer_list; @@ -61,7 +60,7 @@ pub enum AddressBookError { } /// Initializes the P2P address book for a specific network zone. -pub async fn init_address_book( +pub async fn init_address_book( cfg: AddressBookConfig, ) -> Result, std::io::Error> { tracing::info!( @@ -82,3 +81,21 @@ pub async fn init_address_book( Ok(address_book) } + +use sealed::BorshNetworkZone; +mod sealed { + use super::*; + + /// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits + /// onto the network address. + pub trait BorshNetworkZone: NetworkZone { + type BorshAddr: NetZoneAddress + borsh::BorshDeserialize + borsh::BorshSerialize; + } + + impl BorshNetworkZone for T + where + T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, + { + type BorshAddr = T::Addr; + } +} diff --git a/p2p/address-book/src/store.rs b/p2p/address-book/src/store.rs index 94b0ec24..abc42d69 100644 --- a/p2p/address-book/src/store.rs +++ b/p2p/address-book/src/store.rs @@ -3,9 +3,9 @@ use std::fs; use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize}; use tokio::task::{spawn_blocking, JoinHandle}; -use cuprate_p2p_core::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone}; +use cuprate_p2p_core::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress}; -use crate::{peer_list::PeerList, AddressBookConfig}; +use crate::{peer_list::PeerList, AddressBookConfig, BorshNetworkZone}; // TODO: store anchor and ban list. @@ -21,7 +21,7 @@ struct DeserPeerDataV1 { gray_list: Vec>, } -pub fn save_peers_to_disk( +pub fn save_peers_to_disk( cfg: &AddressBookConfig, white_list: &PeerList, gray_list: &PeerList, @@ -38,7 +38,7 @@ pub fn save_peers_to_disk( spawn_blocking(move || fs::write(&file, &data)) } -pub async fn read_peers_from_disk( +pub async fn read_peers_from_disk( cfg: &AddressBookConfig, ) -> Result< ( diff --git a/p2p/dandelion-tower/src/config.rs b/p2p/dandelion-tower/src/config.rs index 71a4e5b2..6266d60a 100644 --- a/p2p/dandelion-tower/src/config.rs +++ b/p2p/dandelion-tower/src/config.rs @@ -42,7 +42,6 @@ pub enum Graph { /// `(-k*(k-1)*hop)/(2*log(1-ep))` /// /// Where `k` is calculated from the fluff probability, `hop` is `time_between_hop` and `ep` is fixed at `0.1`. -/// #[derive(Debug, Clone, Copy)] pub struct DandelionConfig { /// The time it takes for a stem transaction to pass through a node, including network latency. diff --git a/p2p/dandelion-tower/src/pool.rs b/p2p/dandelion-tower/src/pool.rs index eddcc670..68f79451 100644 --- a/p2p/dandelion-tower/src/pool.rs +++ b/p2p/dandelion-tower/src/pool.rs @@ -16,7 +16,6 @@ //! //! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden. //! So handle any requests to the tx-pool like the stem side of the pool does not exist. -//! use std::{ collections::{HashMap, HashSet}, future::Future, diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index a64819a7..c118c0b7 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -7,7 +7,6 @@ //! //! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling //! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool) -//! use std::{ collections::HashMap, hash::Hash, diff --git a/p2p/p2p-core/Cargo.toml b/p2p/p2p-core/Cargo.toml index f434d51a..9ef8e249 100644 --- a/p2p/p2p-core/Cargo.toml +++ b/p2p/p2p-core/Cargo.toml @@ -23,6 +23,7 @@ tower = { workspace = true, features = ["util", "tracing"] } thiserror = { workspace = true } tracing = { workspace = true, features = ["std", "attributes"] } +hex-literal = { workspace = true } borsh = { workspace = true, features = ["derive", "std"], optional = true } @@ -31,4 +32,5 @@ cuprate-test-utils = {path = "../../test-utils"} hex = { workspace = true, features = ["std"] } tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]} +tokio-test = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/p2p/p2p-core/src/client.rs b/p2p/p2p-core/src/client.rs index 0e81d964..662a8eee 100644 --- a/p2p/p2p-core/src/client.rs +++ b/p2p/p2p-core/src/client.rs @@ -24,10 +24,11 @@ use crate::{ mod connection; mod connector; pub mod handshaker; +mod request_handler; mod timeout_monitor; pub use connector::{ConnectRequest, Connector}; -pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; +pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder}; /// An internal identifier for a given peer, will be their address if known /// or a random u128 if not. @@ -188,7 +189,8 @@ pub fn mock_client( mut request_handler: S, ) -> Client where - S: crate::PeerRequestHandler, + S: Service + Send + 'static, + S::Future: Send + 'static, { let (tx, mut rx) = mpsc::channel(1); diff --git a/p2p/p2p-core/src/client/connection.rs b/p2p/p2p-core/src/client/connection.rs index 341d8c09..f3f3f6be 100644 --- a/p2p/p2p-core/src/client/connection.rs +++ b/p2p/p2p-core/src/client/connection.rs @@ -2,7 +2,6 @@ //! //! This module handles routing requests from a [`Client`](crate::client::Client) or a broadcast channel to //! a peer. This module also handles routing requests from the connected peer to a request handler. -//! use std::pin::Pin; use futures::{ @@ -15,15 +14,15 @@ use tokio::{ time::{sleep, timeout, Sleep}, }; use tokio_stream::wrappers::ReceiverStream; -use tower::ServiceExt; use cuprate_wire::{LevinCommand, Message, ProtocolMessage}; +use crate::client::request_handler::PeerRequestHandler; use crate::{ constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT}, handles::ConnectionGuard, - BroadcastMessage, MessageID, NetworkZone, PeerError, PeerRequest, PeerRequestHandler, - PeerResponse, SharedError, + AddressBook, BroadcastMessage, CoreSyncSvc, MessageID, NetworkZone, PeerError, PeerRequest, + PeerResponse, PeerSyncSvc, ProtocolRequestHandler, ProtocolResponse, SharedError, }; /// A request to the connection task from a [`Client`](crate::client::Client). @@ -72,7 +71,7 @@ fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool } /// This represents a connection to a peer. -pub struct Connection { +pub struct Connection { /// The peer sink - where we send messages to the peer. peer_sink: Z::Sink, @@ -87,7 +86,7 @@ pub struct Connection { broadcast_stream: Pin>, /// The inner handler for any requests that come from the requested peer. - peer_request_handler: ReqHndlr, + peer_request_handler: PeerRequestHandler, /// The connection guard which will send signals to other parts of Cuprate when this connection is dropped. connection_guard: ConnectionGuard, @@ -95,9 +94,13 @@ pub struct Connection { error: SharedError, } -impl Connection +impl Connection where - ReqHndlr: PeerRequestHandler, + Z: NetworkZone, + A: AddressBook, + CS: CoreSyncSvc, + PS: PeerSyncSvc, + PR: ProtocolRequestHandler, BrdcstStrm: Stream + Send + 'static, { /// Create a new connection struct. @@ -105,10 +108,10 @@ where peer_sink: Z::Sink, client_rx: mpsc::Receiver, broadcast_stream: BrdcstStrm, - peer_request_handler: ReqHndlr, + peer_request_handler: PeerRequestHandler, connection_guard: ConnectionGuard, error: SharedError, - ) -> Connection { + ) -> Connection { Connection { peer_sink, state: State::WaitingForRequest, @@ -175,7 +178,9 @@ where return Err(e); } else { // We still need to respond even if the response is this. - let _ = req.response_channel.send(Ok(PeerResponse::NA)); + let _ = req + .response_channel + .send(Ok(PeerResponse::Protocol(ProtocolResponse::NA))); } Ok(()) @@ -185,17 +190,14 @@ where async fn handle_peer_request(&mut self, req: PeerRequest) -> Result<(), PeerError> { tracing::debug!("Received peer request: {:?}", req.id()); - let ready_svc = self.peer_request_handler.ready().await?; - let res = ready_svc.call(req).await?; - if matches!(res, PeerResponse::NA) { - return Ok(()); + let res = self.peer_request_handler.handle_peer_request(req).await?; + + // This will be an error if a response does not need to be sent + if let Ok(res) = res.try_into() { + self.send_message_to_peer(res).await?; } - self.send_message_to_peer( - res.try_into() - .expect("We just checked if the response was `NA`"), - ) - .await + Ok(()) } /// Handles a message from a peer when we are in [`State::WaitingForResponse`]. diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index 278d7407..d937165f 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -4,7 +4,6 @@ //! perform a handshake and create a [`Client`]. //! //! This is where outbound connections are created. -//! use std::{ future::Future, pin::Pin, @@ -16,9 +15,9 @@ use tokio::sync::OwnedSemaphorePermit; use tower::{Service, ServiceExt}; use crate::{ - client::{Client, DoHandshakeRequest, HandShaker, HandshakeError, InternalPeerID}, - AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, - PeerRequestHandler, PeerSyncSvc, + client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, + AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, + ProtocolRequestHandler, }; /// A request to connect to a peer. @@ -27,30 +26,32 @@ pub struct ConnectRequest { pub addr: Z::Addr, /// A permit which will be held be the connection allowing you to set limits on the number of /// connections. - pub permit: OwnedSemaphorePermit, + /// + /// This doesn't have to be set. + pub permit: Option, } /// The connector service, this service connects to peer and returns the [`Client`]. -pub struct Connector { - handshaker: HandShaker, +pub struct Connector { + handshaker: HandShaker, } -impl - Connector +impl + Connector { /// Create a new connector from a handshaker. - pub fn new(handshaker: HandShaker) -> Self { + pub fn new(handshaker: HandShaker) -> Self { Self { handshaker } } } -impl - Service> for Connector +impl + Service> for Connector where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ReqHdlr: PeerRequestHandler + Clone, + ProtoHdlr: ProtocolRequestHandler + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { @@ -74,7 +75,7 @@ where permit: req.permit, peer_stream, peer_sink, - direction: ConnectionDirection::OutBound, + direction: ConnectionDirection::Outbound, }; handshaker.ready().await?.call(req).await } diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index 1071b339..67a58d48 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -18,7 +18,7 @@ use tokio::{ time::{error::Elapsed, timeout}, }; use tower::{Service, ServiceExt}; -use tracing::{info_span, Instrument}; +use tracing::{info_span, Instrument, Span}; use cuprate_pruning::{PruningError, PruningSeed}; use cuprate_wire::{ @@ -27,13 +27,13 @@ use cuprate_wire::{ PING_OK_RESPONSE_STATUS_TEXT, }, common::PeerSupportFlags, - BasicNodeData, BucketError, LevinCommand, Message, RequestMessage, ResponseMessage, + AdminRequestMessage, AdminResponseMessage, BasicNodeData, BucketError, LevinCommand, Message, }; use crate::{ client::{ - connection::Connection, timeout_monitor::connection_timeout_monitor_task, Client, - InternalPeerID, PeerInformation, + connection::Connection, request_handler::PeerRequestHandler, + timeout_monitor::connection_timeout_monitor_task, Client, InternalPeerID, PeerInformation, }, constants::{ HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE, @@ -43,9 +43,12 @@ use crate::{ services::PeerSyncRequest, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, - PeerRequestHandler, PeerSyncSvc, SharedError, + PeerSyncSvc, ProtocolRequestHandler, SharedError, }; +pub mod builder; +pub use builder::HandshakerBuilder; + #[derive(Debug, thiserror::Error)] pub enum HandshakeError { #[error("The handshake timed out")] @@ -78,21 +81,21 @@ pub struct DoHandshakeRequest { pub peer_sink: Z::Sink, /// The direction of the connection. pub direction: ConnectionDirection, - /// A permit for this connection. - pub permit: OwnedSemaphorePermit, + /// An [`Option`]al permit for this connection. + pub permit: Option, } /// The peer handshaking service. #[derive(Debug, Clone)] -pub struct HandShaker { +pub struct HandShaker { /// The address book service. address_book: AdrBook, /// The core sync data service. core_sync_svc: CSync, /// The peer sync service. peer_sync_svc: PSync, - /// The peer request handler service. - peer_request_svc: ReqHdlr, + /// The protocol request handler service. + protocol_request_svc: ProtoHdlr, /// Our [`BasicNodeData`] our_basic_node_data: BasicNodeData, @@ -100,42 +103,46 @@ pub struct HandShaker, } -impl - HandShaker +impl + HandShaker { /// Creates a new handshaker. - pub fn new( + fn new( address_book: AdrBook, peer_sync_svc: PSync, core_sync_svc: CSync, - peer_request_svc: ReqHdlr, + protocol_request_svc: ProtoHdlr, broadcast_stream_maker: BrdcstStrmMkr, - our_basic_node_data: BasicNodeData, + connection_parent_span: Span, ) -> Self { Self { address_book, peer_sync_svc, core_sync_svc, - peer_request_svc, + protocol_request_svc, broadcast_stream_maker, our_basic_node_data, + connection_parent_span, _zone: PhantomData, } } } -impl - Service> for HandShaker +impl + Service> + for HandShaker where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, PSync: PeerSyncSvc + Clone, - ReqHdlr: PeerRequestHandler + Clone, + ProtoHdlr: ProtocolRequestHandler + Clone, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { @@ -152,12 +159,14 @@ where let broadcast_stream_maker = self.broadcast_stream_maker.clone(); let address_book = self.address_book.clone(); - let peer_request_svc = self.peer_request_svc.clone(); + let protocol_request_svc = self.protocol_request_svc.clone(); let core_sync_svc = self.core_sync_svc.clone(); let peer_sync_svc = self.peer_sync_svc.clone(); let our_basic_node_data = self.our_basic_node_data.clone(); - let span = info_span!(parent: &tracing::Span::current(), "handshaker", addr=%req.addr); + let connection_parent_span = self.connection_parent_span.clone(); + + let span = info_span!(parent: &Span::current(), "handshaker", addr=%req.addr); async move { timeout( @@ -168,8 +177,9 @@ where address_book, core_sync_svc, peer_sync_svc, - peer_request_svc, + protocol_request_svc, our_basic_node_data, + connection_parent_span, ), ) .await? @@ -190,11 +200,11 @@ pub async fn ping(addr: N::Addr) -> Result tracing::debug!("Made outbound connection to peer, sending ping."); peer_sink - .send(Message::Request(RequestMessage::Ping).into()) + .send(Message::Request(AdminRequestMessage::Ping).into()) .await?; if let Some(res) = peer_stream.next().await { - if let Message::Response(ResponseMessage::Ping(ping)) = res? { + if let Message::Response(AdminResponseMessage::Ping(ping)) = res? { if ping.status == PING_OK_RESPONSE_STATUS_TEXT { tracing::debug!("Ping successful."); return Ok(ping.peer_id); @@ -220,7 +230,8 @@ pub async fn ping(addr: N::Addr) -> Result } /// This function completes a handshake with the requested peer. -async fn handshake( +#[allow(clippy::too_many_arguments)] +async fn handshake( req: DoHandshakeRequest, broadcast_stream_maker: BrdcstStrmMkr, @@ -228,14 +239,15 @@ async fn handshake Result, HandshakeError> where - AdrBook: AddressBook, - CSync: CoreSyncSvc, - PSync: PeerSyncSvc, - ReqHdlr: PeerRequestHandler, + AdrBook: AddressBook + Clone, + CSync: CoreSyncSvc + Clone, + PSync: PeerSyncSvc + Clone, + ProtoHdlr: ProtocolRequestHandler, BrdcstStrm: Stream + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, { @@ -252,19 +264,20 @@ where let mut eager_protocol_messages = Vec::new(); let (peer_core_sync, peer_node_data) = match direction { - ConnectionDirection::InBound => { + ConnectionDirection::Inbound => { // Inbound handshake the peer sends the request. tracing::debug!("waiting for handshake request."); - let Message::Request(RequestMessage::Handshake(handshake_req)) = wait_for_message::( - LevinCommand::Handshake, - true, - &mut peer_sink, - &mut peer_stream, - &mut eager_protocol_messages, - &our_basic_node_data, - ) - .await? + let Message::Request(AdminRequestMessage::Handshake(handshake_req)) = + wait_for_message::( + LevinCommand::Handshake, + true, + &mut peer_sink, + &mut peer_stream, + &mut eager_protocol_messages, + &our_basic_node_data, + ) + .await? else { panic!("wait_for_message returned ok with wrong message."); }; @@ -273,7 +286,7 @@ where // We will respond to the handshake request later. (handshake_req.payload_data, handshake_req.node_data) } - ConnectionDirection::OutBound => { + ConnectionDirection::Outbound => { // Outbound handshake, we send the request. send_hs_request::( &mut peer_sink, @@ -283,7 +296,7 @@ where .await?; // Wait for the handshake response. - let Message::Response(ResponseMessage::Handshake(handshake_res)) = + let Message::Response(AdminResponseMessage::Handshake(handshake_res)) = wait_for_message::( LevinCommand::Handshake, false, @@ -373,13 +386,13 @@ where // public_address, if Some, is the reachable address of the node. let public_address = 'check_out_addr: { match direction { - ConnectionDirection::InBound => { + ConnectionDirection::Inbound => { // First send the handshake response. send_hs_response::( &mut peer_sink, &mut core_sync_svc, &mut address_book, - our_basic_node_data, + our_basic_node_data.clone(), ) .await?; @@ -411,7 +424,7 @@ where // The peer did not specify a reachable port or the ping was not successful. None } - ConnectionDirection::OutBound => { + ConnectionDirection::Outbound => { let InternalPeerID::KnownAddr(outbound_addr) = addr else { unreachable!("How could we make an outbound connection to an unknown address"); }; @@ -424,37 +437,7 @@ where tracing::debug!("Handshake complete."); - // Set up the connection data. - let error_slot = SharedError::new(); let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build(); - let (connection_tx, client_rx) = mpsc::channel(1); - - let connection = Connection::::new( - peer_sink, - client_rx, - broadcast_stream_maker(addr), - peer_request_svc, - connection_guard, - error_slot.clone(), - ); - - let connection_span = tracing::error_span!(parent: &tracing::Span::none(), "connection", %addr); - let connection_handle = tokio::spawn( - connection - .run(peer_stream.fuse(), eager_protocol_messages) - .instrument(connection_span), - ); - - // Tell the core sync service about the new peer. - peer_sync_svc - .ready() - .await? - .call(PeerSyncRequest::IncomingCoreSyncData( - addr, - handle.clone(), - peer_core_sync, - )) - .await?; // Tell the address book about the new connection. address_book @@ -471,6 +454,21 @@ where }) .await?; + // Tell the core sync service about the new peer. + peer_sync_svc + .ready() + .await? + .call(PeerSyncRequest::IncomingCoreSyncData( + addr, + handle.clone(), + peer_core_sync, + )) + .await?; + + // Set up the connection data. + let error_slot = SharedError::new(); + let (connection_tx, client_rx) = mpsc::channel(1); + let info = PeerInformation { id: addr, handle, @@ -478,6 +476,32 @@ where pruning_seed, }; + let request_handler = PeerRequestHandler { + address_book_svc: address_book.clone(), + our_sync_svc: core_sync_svc.clone(), + peer_sync_svc: peer_sync_svc.clone(), + protocol_request_handler, + our_basic_node_data, + peer_info: info.clone(), + }; + + let connection = Connection::::new( + peer_sink, + client_rx, + broadcast_stream_maker(addr), + request_handler, + connection_guard, + error_slot.clone(), + ); + + let connection_span = + tracing::error_span!(parent: &connection_parent_span, "connection", %addr); + let connection_handle = tokio::spawn( + connection + .run(peer_stream.fuse(), eager_protocol_messages) + .instrument(connection_span), + ); + let semaphore = Arc::new(Semaphore::new(1)); let timeout_handle = tokio::spawn(connection_timeout_monitor_task( @@ -502,7 +526,7 @@ where Ok(client) } -/// Sends a [`RequestMessage::Handshake`] down the peer sink. +/// Sends a [`AdminRequestMessage::Handshake`] down the peer sink. async fn send_hs_request( peer_sink: &mut Z::Sink, core_sync_svc: &mut CSync, @@ -525,13 +549,13 @@ where tracing::debug!("Sending handshake request."); peer_sink - .send(Message::Request(RequestMessage::Handshake(req)).into()) + .send(Message::Request(AdminRequestMessage::Handshake(req)).into()) .await?; Ok(()) } -/// Sends a [`ResponseMessage::Handshake`] down the peer sink. +/// Sends a [`AdminResponseMessage::Handshake`] down the peer sink. async fn send_hs_response( peer_sink: &mut Z::Sink, core_sync_svc: &mut CSync, @@ -568,7 +592,7 @@ where tracing::debug!("Sending handshake response."); peer_sink - .send(Message::Response(ResponseMessage::Handshake(res)).into()) + .send(Message::Response(AdminResponseMessage::Handshake(res)).into()) .await?; Ok(()) @@ -619,7 +643,7 @@ async fn wait_for_message( } match req_message { - RequestMessage::SupportFlags => { + AdminRequestMessage::SupportFlags => { if !allow_support_flag_req { return Err(HandshakeError::PeerSentInvalidMessage( "Peer sent 2 support flag requests", @@ -631,7 +655,7 @@ async fn wait_for_message( allow_support_flag_req = false; continue; } - RequestMessage::Ping => { + AdminRequestMessage::Ping => { if !allow_ping { return Err(HandshakeError::PeerSentInvalidMessage( "Peer sent 2 ping requests", @@ -674,7 +698,7 @@ async fn wait_for_message( )))? } -/// Sends a [`ResponseMessage::SupportFlags`] down the peer sink. +/// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink. async fn send_support_flags( peer_sink: &mut Z::Sink, support_flags: PeerSupportFlags, @@ -682,7 +706,7 @@ async fn send_support_flags( tracing::debug!("Sending support flag response."); Ok(peer_sink .send( - Message::Response(ResponseMessage::SupportFlags(SupportFlagsResponse { + Message::Response(AdminResponseMessage::SupportFlags(SupportFlagsResponse { support_flags, })) .into(), @@ -690,7 +714,7 @@ async fn send_support_flags( .await?) } -/// Sends a [`ResponseMessage::Ping`] down the peer sink. +/// Sends a [`AdminResponseMessage::Ping`] down the peer sink. async fn send_ping_response( peer_sink: &mut Z::Sink, peer_id: u64, @@ -698,7 +722,7 @@ async fn send_ping_response( tracing::debug!("Sending ping response."); Ok(peer_sink .send( - Message::Response(ResponseMessage::Ping(PingResponse { + Message::Response(AdminResponseMessage::Ping(PingResponse { status: PING_OK_RESPONSE_STATUS_TEXT, peer_id, })) diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs new file mode 100644 index 00000000..a40f3962 --- /dev/null +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -0,0 +1,292 @@ +use std::marker::PhantomData; + +use futures::{stream, Stream}; +use tracing::Span; + +use cuprate_wire::BasicNodeData; + +use crate::{ + client::{handshaker::HandShaker, InternalPeerID}, + AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler, +}; + +mod dummy; +pub use dummy::{ + DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, +}; + +/// A [`HandShaker`] [`Service`](tower::Service) builder. +/// +/// This builder applies default values to make usage easier, behaviour and drawbacks of the defaults are documented +/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler`]. +/// +/// If you want to use any network other than [`Mainnet`](crate::Network::Mainnet) +/// you will need to change the core sync service with [`HandshakerBuilder::with_core_sync_svc`], +/// see that method for details. +#[derive(Debug, Clone)] +pub struct HandshakerBuilder< + N: NetworkZone, + AdrBook = DummyAddressBook, + CSync = DummyCoreSyncSvc, + PSync = DummyPeerSyncSvc, + ProtoHdlr = DummyProtocolRequestHandler, + BrdcstStrmMkr = fn( + InternalPeerID<::Addr>, + ) -> stream::Pending, +> { + /// The address book service. + address_book: AdrBook, + /// The core sync data service. + core_sync_svc: CSync, + /// The peer sync service. + peer_sync_svc: PSync, + /// The protocol request service. + protocol_request_svc: ProtoHdlr, + /// Our [`BasicNodeData`] + our_basic_node_data: BasicNodeData, + /// A function that returns a stream that will give items to be broadcast by a connection. + broadcast_stream_maker: BrdcstStrmMkr, + /// The [`Span`] that will set as the parent to the connection [`Span`]. + connection_parent_span: Option, + + /// The network zone. + _zone: PhantomData, +} + +impl HandshakerBuilder { + /// Creates a new builder with our node's basic node data. + pub fn new(our_basic_node_data: BasicNodeData) -> Self { + Self { + address_book: DummyAddressBook, + core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), + peer_sync_svc: DummyPeerSyncSvc, + protocol_request_svc: DummyProtocolRequestHandler, + our_basic_node_data, + broadcast_stream_maker: |_| stream::pending(), + connection_parent_span: None, + _zone: PhantomData, + } + } +} + +impl + HandshakerBuilder +{ + /// Changes the address book to the provided one. + /// + /// ## Default Address Book + /// + /// The default address book is used if this function is not called. + /// + /// The default address book's only drawback is that it does not keep track of peers and therefore + /// bans. + pub fn with_address_book( + self, + new_address_book: NAdrBook, + ) -> HandshakerBuilder + where + NAdrBook: AddressBook + Clone, + { + let HandshakerBuilder { + core_sync_svc, + peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + .. + } = self; + + HandshakerBuilder { + address_book: new_address_book, + core_sync_svc, + peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + } + } + + /// Changes the core sync service to the provided one. + /// + /// The core sync service should keep track of our nodes core sync data. + /// + /// ## Default Core Sync Service + /// + /// The default core sync service is used if this method is not called. + /// + /// The default core sync service will just use the mainnet genesis block, to use other network's + /// genesis see [`DummyCoreSyncSvc::static_stagenet_genesis`] and [`DummyCoreSyncSvc::static_testnet_genesis`]. + /// The drawbacks to keeping this the default is that it will always return the mainnet genesis as our nodes + /// sync info, which means peers won't know our actual chain height, this may or may not be a problem for + /// different use cases. + pub fn with_core_sync_svc( + self, + new_core_sync_svc: NCSync, + ) -> HandshakerBuilder + where + NCSync: CoreSyncSvc + Clone, + { + let HandshakerBuilder { + address_book, + peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + .. + } = self; + + HandshakerBuilder { + address_book, + core_sync_svc: new_core_sync_svc, + peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + } + } + + /// Changes the peer sync service, which keeps track of peers sync states. + /// + /// ## Default Peer Sync Service + /// + /// The default peer sync service will be used if this method is not called. + /// + /// The default peer sync service will not keep track of peers sync states. + pub fn with_peer_sync_svc( + self, + new_peer_sync_svc: NPSync, + ) -> HandshakerBuilder + where + NPSync: PeerSyncSvc + Clone, + { + let HandshakerBuilder { + address_book, + core_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + .. + } = self; + + HandshakerBuilder { + address_book, + core_sync_svc, + peer_sync_svc: new_peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + } + } + + /// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node. + /// + /// ## Default Protocol Request Handler + /// + /// The default protocol request handler will not respond to any protocol requests, this should not + /// be an issue as long as peers do not think we are ahead of them, if they do they will send requests + /// for our blocks, and we won't respond which will cause them to disconnect. + pub fn with_protocol_request_handler( + self, + new_protocol_handler: NProtoHdlr, + ) -> HandshakerBuilder + where + NProtoHdlr: ProtocolRequestHandler + Clone, + { + let HandshakerBuilder { + address_book, + core_sync_svc, + peer_sync_svc, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + .. + } = self; + + HandshakerBuilder { + address_book, + core_sync_svc, + peer_sync_svc, + protocol_request_svc: new_protocol_handler, + our_basic_node_data, + broadcast_stream_maker, + connection_parent_span, + _zone, + } + } + + /// Changes the broadcast stream maker, which is used to create streams that yield messages to broadcast. + /// + /// ## Default Broadcast Stream Maker + /// + /// The default broadcast stream maker just returns [`stream::Pending`], i.e. the returned stream will not + /// produce any messages to broadcast, this is not a problem if your use case does not require broadcasting + /// messages. + pub fn with_broadcast_stream_maker( + self, + new_broadcast_stream_maker: NBrdcstStrmMkr, + ) -> HandshakerBuilder + where + BrdcstStrm: Stream + Send + 'static, + NBrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, + { + let HandshakerBuilder { + address_book, + core_sync_svc, + peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + connection_parent_span, + _zone, + .. + } = self; + + HandshakerBuilder { + address_book, + core_sync_svc, + peer_sync_svc, + protocol_request_svc, + our_basic_node_data, + broadcast_stream_maker: new_broadcast_stream_maker, + connection_parent_span, + _zone, + } + } + + /// Changes the parent [`Span`] of the connection task to the one provided. + /// + /// ## Default Connection Parent Span + /// + /// The default connection span will be [`Span::none`]. + pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self { + Self { + connection_parent_span: Some(connection_parent_span), + ..self + } + } + + /// Builds the [`HandShaker`]. + pub fn build(self) -> HandShaker { + HandShaker::new( + self.address_book, + self.peer_sync_svc, + self.core_sync_svc, + self.protocol_request_svc, + self.broadcast_stream_maker, + self.our_basic_node_data, + self.connection_parent_span.unwrap_or(Span::none()), + ) + } +} diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs new file mode 100644 index 00000000..ae97cdce --- /dev/null +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -0,0 +1,151 @@ +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; + +use tower::Service; + +use cuprate_wire::CoreSyncData; + +use crate::{ + services::{ + AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, + PeerSyncRequest, PeerSyncResponse, + }, + NetworkZone, ProtocolRequest, ProtocolResponse, +}; + +/// A dummy peer sync service, that doesn't actually keep track of peers sync states. +#[derive(Debug, Clone)] +pub struct DummyPeerSyncSvc; + +impl Service> for DummyPeerSyncSvc { + type Response = PeerSyncResponse; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: PeerSyncRequest) -> Self::Future { + ready(Ok(match req { + PeerSyncRequest::PeersToSyncFrom { .. } => PeerSyncResponse::PeersToSyncFrom(vec![]), + PeerSyncRequest::IncomingCoreSyncData(_, _, _) => PeerSyncResponse::Ok, + })) + } +} + +/// A dummy core sync service that just returns static [`CoreSyncData`]. +#[derive(Debug, Clone)] +pub struct DummyCoreSyncSvc(CoreSyncData); + +impl DummyCoreSyncSvc { + /// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`]. + pub fn static_mainnet_genesis() -> DummyCoreSyncSvc { + DummyCoreSyncSvc(CoreSyncData { + cumulative_difficulty: 1, + cumulative_difficulty_top64: 0, + current_height: 1, + pruning_seed: 0, + top_id: hex_literal::hex!( + "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3" + ), + top_version: 1, + }) + } + + /// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`]. + pub fn static_testnet_genesis() -> DummyCoreSyncSvc { + DummyCoreSyncSvc(CoreSyncData { + cumulative_difficulty: 1, + cumulative_difficulty_top64: 0, + current_height: 1, + pruning_seed: 0, + top_id: hex_literal::hex!( + "48ca7cd3c8de5b6a4d53d2861fbdaedca141553559f9be9520068053cda8430b" + ), + top_version: 1, + }) + } + + /// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`]. + pub fn static_stagenet_genesis() -> DummyCoreSyncSvc { + DummyCoreSyncSvc(CoreSyncData { + cumulative_difficulty: 1, + cumulative_difficulty_top64: 0, + current_height: 1, + pruning_seed: 0, + top_id: hex_literal::hex!( + "76ee3cc98646292206cd3e86f74d88b4dcc1d937088645e9b0cbca84b7ce74eb" + ), + top_version: 1, + }) + } + + /// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`]. + pub fn static_custom(data: CoreSyncData) -> DummyCoreSyncSvc { + DummyCoreSyncSvc(data) + } +} + +impl Service for DummyCoreSyncSvc { + type Response = CoreSyncDataResponse; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { + ready(Ok(CoreSyncDataResponse(self.0.clone()))) + } +} + +/// A dummy address book that doesn't actually keep track of peers. +#[derive(Debug, Clone)] +pub struct DummyAddressBook; + +impl Service> for DummyAddressBook { + type Response = AddressBookResponse; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: AddressBookRequest) -> Self::Future { + ready(Ok(match req { + AddressBookRequest::GetWhitePeers(_) => AddressBookResponse::Peers(vec![]), + AddressBookRequest::TakeRandomGrayPeer { .. } + | AddressBookRequest::TakeRandomPeer { .. } + | AddressBookRequest::TakeRandomWhitePeer { .. } => { + return ready(Err("dummy address book does not hold peers".into())); + } + AddressBookRequest::NewConnection { .. } | AddressBookRequest::IncomingPeerList(_) => { + AddressBookResponse::Ok + } + AddressBookRequest::IsPeerBanned(_) => AddressBookResponse::IsPeerBanned(false), + })) + } +} + +/// A dummy protocol request handler. +#[derive(Debug, Clone)] +pub struct DummyProtocolRequestHandler; + +impl Service for DummyProtocolRequestHandler { + type Response = ProtocolResponse; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: ProtocolRequest) -> Self::Future { + ready(Ok(ProtocolResponse::NA)) + } +} diff --git a/p2p/p2p-core/src/client/request_handler.rs b/p2p/p2p-core/src/client/request_handler.rs new file mode 100644 index 00000000..284f9545 --- /dev/null +++ b/p2p/p2p-core/src/client/request_handler.rs @@ -0,0 +1,144 @@ +use futures::TryFutureExt; +use tower::ServiceExt; + +use cuprate_wire::{ + admin::{ + PingResponse, SupportFlagsResponse, TimedSyncRequest, TimedSyncResponse, + PING_OK_RESPONSE_STATUS_TEXT, + }, + AdminRequestMessage, AdminResponseMessage, BasicNodeData, +}; + +use crate::{ + client::PeerInformation, + constants::MAX_PEERS_IN_PEER_LIST_MESSAGE, + services::{ + AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, + PeerSyncRequest, + }, + AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, + ProtocolRequestHandler, +}; + +#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)] +enum PeerRequestHandlerError { + #[error("Received a handshake request during a connection.")] + ReceivedHandshakeDuringConnection, +} + +/// The peer request handler, handles incoming [`PeerRequest`]s to our node. +#[derive(Debug, Clone)] +pub(crate) struct PeerRequestHandler { + /// The address book service. + pub address_book_svc: A, + /// Our core sync service. + pub our_sync_svc: CS, + /// The peer sync service. + pub peer_sync_svc: PS, + + /// The handler for [`ProtocolRequest`](crate::ProtocolRequest)s to our node. + pub protocol_request_handler: PR, + + /// The basic node data of our node. + pub our_basic_node_data: BasicNodeData, + + /// The information on the connected peer. + pub peer_info: PeerInformation, +} + +impl PeerRequestHandler +where + Z: NetworkZone, + A: AddressBook, + CS: CoreSyncSvc, + PS: PeerSyncSvc, + PR: ProtocolRequestHandler, +{ + /// Handles an incoming [`PeerRequest`] to our node. + pub async fn handle_peer_request( + &mut self, + req: PeerRequest, + ) -> Result { + match req { + PeerRequest::Admin(admin_req) => match admin_req { + AdminRequestMessage::Handshake(_) => { + Err(PeerRequestHandlerError::ReceivedHandshakeDuringConnection.into()) + } + AdminRequestMessage::SupportFlags => { + let support_flags = self.our_basic_node_data.support_flags; + + Ok(PeerResponse::Admin(AdminResponseMessage::SupportFlags( + SupportFlagsResponse { support_flags }, + ))) + } + AdminRequestMessage::Ping => Ok(PeerResponse::Admin(AdminResponseMessage::Ping( + PingResponse { + peer_id: self.our_basic_node_data.peer_id, + status: PING_OK_RESPONSE_STATUS_TEXT, + }, + ))), + AdminRequestMessage::TimedSync(timed_sync_req) => { + let res = self.handle_timed_sync_request(timed_sync_req).await?; + + Ok(PeerResponse::Admin(AdminResponseMessage::TimedSync(res))) + } + }, + + PeerRequest::Protocol(protocol_req) => { + // TODO: add limits here + + self.protocol_request_handler + .ready() + .await? + .call(protocol_req) + .map_ok(PeerResponse::Protocol) + .await + } + } + } + + /// Handles a [`TimedSyncRequest`] to our node. + async fn handle_timed_sync_request( + &mut self, + req: TimedSyncRequest, + ) -> Result { + // TODO: add a limit on the amount of these requests in a certain time period. + + let peer_id = self.peer_info.id; + let handle = self.peer_info.handle.clone(); + + self.peer_sync_svc + .ready() + .await? + .call(PeerSyncRequest::IncomingCoreSyncData( + peer_id, + handle, + req.payload_data, + )) + .await?; + + let AddressBookResponse::Peers(peers) = self + .address_book_svc + .ready() + .await? + .call(AddressBookRequest::GetWhitePeers( + MAX_PEERS_IN_PEER_LIST_MESSAGE, + )) + .await? + else { + panic!("Address book sent incorrect response!"); + }; + + let CoreSyncDataResponse(core_sync_data) = self + .our_sync_svc + .ready() + .await? + .call(CoreSyncDataRequest) + .await?; + + Ok(TimedSyncResponse { + payload_data: core_sync_data, + local_peerlist_new: peers.into_iter().map(Into::into).collect(), + }) + } +} diff --git a/p2p/p2p-core/src/client/timeout_monitor.rs b/p2p/p2p-core/src/client/timeout_monitor.rs index db261b4d..5228edea 100644 --- a/p2p/p2p-core/src/client/timeout_monitor.rs +++ b/p2p/p2p-core/src/client/timeout_monitor.rs @@ -12,7 +12,7 @@ use tokio::{ use tower::ServiceExt; use tracing::instrument; -use cuprate_wire::admin::TimedSyncRequest; +use cuprate_wire::{admin::TimedSyncRequest, AdminRequestMessage, AdminResponseMessage}; use crate::{ client::{connection::ConnectionTaskRequest, InternalPeerID}, @@ -87,15 +87,15 @@ where tracing::debug!(parent: &ping_span, "Sending timed sync to peer"); connection_tx .send(ConnectionTaskRequest { - request: PeerRequest::TimedSync(TimedSyncRequest { + request: PeerRequest::Admin(AdminRequestMessage::TimedSync(TimedSyncRequest { payload_data: core_sync_data, - }), + })), response_channel: tx, permit: Some(permit), }) .await?; - let PeerResponse::TimedSync(timed_sync) = rx.await?? else { + let PeerResponse::Admin(AdminResponseMessage::TimedSync(timed_sync)) = rx.await?? else { panic!("Connection task returned wrong response!"); }; diff --git a/p2p/p2p-core/src/handles.rs b/p2p/p2p-core/src/handles.rs index f3831708..da47b651 100644 --- a/p2p/p2p-core/src/handles.rs +++ b/p2p/p2p-core/src/handles.rs @@ -23,10 +23,8 @@ impl HandleBuilder { } /// Sets the permit for this connection. - /// - /// This must be called at least once. - pub fn with_permit(mut self, permit: OwnedSemaphorePermit) -> Self { - self.permit = Some(permit); + pub fn with_permit(mut self, permit: Option) -> Self { + self.permit = permit; self } @@ -39,7 +37,7 @@ impl HandleBuilder { ( ConnectionGuard { token: token.clone(), - _permit: self.permit.expect("connection permit was not set!"), + _permit: self.permit, }, ConnectionHandle { token: token.clone(), @@ -56,7 +54,7 @@ pub struct BanPeer(pub Duration); /// A struct given to the connection task. pub struct ConnectionGuard { token: CancellationToken, - _permit: OwnedSemaphorePermit, + _permit: Option, } impl ConnectionGuard { diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index 8703d59e..83cc4d2e 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -1,4 +1,4 @@ -//! # Monero P2P +//! # Cuprate P2P Core //! //! This crate is general purpose P2P networking library for working with Monero. This is a low level //! crate, which means it may seem verbose for a lot of use cases, if you want a crate that handles @@ -6,13 +6,57 @@ //! //! # Network Zones //! -//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet](network_zones::ClearNet). +//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet]. //! //! # Usage //! -//! TODO +//! ## Connecting to a peer //! -use std::{fmt::Debug, future::Future, hash::Hash, pin::Pin}; +//! ```rust +//! # use std::{net::SocketAddr, str::FromStr}; +//! # +//! # use tower::ServiceExt; +//! # +//! # use cuprate_p2p_core::{ +//! # client::{ConnectRequest, Connector, HandshakerBuilder}, +//! # ClearNet, Network, +//! # }; +//! # use cuprate_wire::{common::PeerSupportFlags, BasicNodeData}; +//! # use cuprate_test_utils::monerod::monerod; +//! # +//! # tokio_test::block_on(async move { +//! # +//! # let _monerod = monerod::<&str>([]).await; +//! # let addr = _monerod.p2p_addr(); +//! # +//! // The information about our local node. +//! let our_basic_node_data = BasicNodeData { +//! my_port: 0, +//! network_id: Network::Mainnet.network_id(), +//! peer_id: 0, +//! support_flags: PeerSupportFlags::FLUFFY_BLOCKS, +//! rpc_port: 0, +//! rpc_credits_per_hash: 0, +//! }; +//! +//! // See [`HandshakerBuilder`] for information about the default values set, they may not be +//! // appropriate for every use case. +//! let handshaker = HandshakerBuilder::::new(our_basic_node_data).build(); +//! +//! // The outbound connector. +//! let mut connector = Connector::new(handshaker); +//! +//! // The connection. +//! let connection = connector +//! .oneshot(ConnectRequest { +//! addr, +//! permit: None, +//! }) +//! .await +//! .unwrap(); +//! # }); +//! ``` +use std::{fmt::Debug, future::Future, hash::Hash}; use futures::{Sink, Stream}; @@ -25,21 +69,27 @@ pub mod client; mod constants; pub mod error; pub mod handles; -pub mod network_zones; +mod network_zones; pub mod protocol; pub mod services; pub use error::*; +pub use network_zones::{ClearNet, ClearNetServerCfg}; pub use protocol::*; use services::*; +//re-export +pub use cuprate_helper::network::Network; +/// The direction of a connection. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum ConnectionDirection { - InBound, - OutBound, + /// An inbound connection to our node. + Inbound, + /// An outbound connection from our node. + Outbound, } -#[cfg(not(feature = "borsh"))] +/// An address on a specific [`NetworkZone`]. pub trait NetZoneAddress: TryFrom + Into @@ -56,46 +106,19 @@ pub trait NetZoneAddress: /// that include the port, to be able to facilitate this network addresses must have a ban ID /// which for hidden services could just be the address it self but for clear net addresses will /// be the IP address. - /// TODO: IP zone banning? - type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; - - /// Changes the port of this address to `port`. - fn set_port(&mut self, port: u16); - - fn make_canonical(&mut self); - - fn ban_id(&self) -> Self::BanID; - - fn should_add_to_peer_list(&self) -> bool; -} - -#[cfg(feature = "borsh")] -pub trait NetZoneAddress: - TryFrom - + Into - + std::fmt::Display - + borsh::BorshSerialize - + borsh::BorshDeserialize - + Hash - + Eq - + Copy - + Send - + Sync - + Unpin - + 'static -{ - /// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as - /// that include the port, to be able to facilitate this network addresses must have a ban ID - /// which for hidden services could just be the address it self but for clear net addresses will - /// be the IP address. - /// TODO: IP zone banning? + /// + /// - TODO: IP zone banning? + /// - TODO: rename this to Host. + type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; /// Changes the port of this address to `port`. fn set_port(&mut self, port: u16); + /// Turns this address into its canonical form. fn make_canonical(&mut self); + /// Returns the [`Self::BanID`] for this address. fn ban_id(&self) -> Self::BanID; fn should_add_to_peer_list(&self) -> bool; @@ -136,6 +159,15 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { /// Config used to start a server which listens for incoming connections. type ServerCfg: Clone + Debug + Send + 'static; + /// Connects to a peer with the given address. + /// + ///
+ /// + /// This does not complete a handshake with the peer, to do that see the [crate](crate) docs. + /// + ///
+ /// + /// Returns the [`Self::Stream`] and [`Self::Sink`] to send messages to the peer. async fn connect_to_peer( addr: Self::Addr, ) -> Result<(Self::Stream, Self::Sink), std::io::Error>; @@ -206,55 +238,48 @@ pub trait CoreSyncSvc: CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = tower::BoxError, - Future = Pin< - Box< - dyn Future> + Send + 'static, - >, - >, + Future = Self::Future2, > + Send + 'static { + // This allows us to put more restrictive bounds on the future without defining the future here + // explicitly. + type Future2: Future> + Send + 'static; } -impl CoreSyncSvc for T where +impl CoreSyncSvc for T +where T: tower::Service< CoreSyncDataRequest, Response = CoreSyncDataResponse, Error = tower::BoxError, - Future = Pin< - Box< - dyn Future> - + Send - + 'static, - >, - >, > + Send - + 'static + + 'static, + T::Future: Future> + Send + 'static, { + type Future2 = T::Future; } -pub trait PeerRequestHandler: +pub trait ProtocolRequestHandler: tower::Service< - PeerRequest, - Response = PeerResponse, + ProtocolRequest, + Response = ProtocolResponse, Error = tower::BoxError, - Future = Pin< - Box> + Send + 'static>, - >, + Future = Self::Future2, > + Send + 'static { + // This allows us to put more restrictive bounds on the future without defining the future here + // explicitly. + type Future2: Future> + Send + 'static; } -impl PeerRequestHandler for T where - T: tower::Service< - PeerRequest, - Response = PeerResponse, - Error = tower::BoxError, - Future = Pin< - Box> + Send + 'static>, - >, - > + Send - + 'static +impl ProtocolRequestHandler for T +where + T: tower::Service + + Send + + 'static, + T::Future: Future> + Send + 'static, { + type Future2 = T::Future; } diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 172038f8..5e4f4d7e 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -1,13 +1,16 @@ -//! This module defines InternalRequests and InternalResponses. Cuprate's P2P works by translating network messages into an internal -//! request/ response, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications" +//! This module defines [`PeerRequest`] and [`PeerResponse`]. Cuprate's P2P crates works by translating network messages into an internal +//! request/response enums, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications" //! (protocol messages). //! -//! Some notifications are easy to translate, like `GetObjectsRequest` is obviously a request but others like `NewFluffyBlock` are a -//! bit tri cker. To translate a `NewFluffyBlock` into a request/ response we will have to look to see if we asked for `FluffyMissingTransactionsRequest` -//! if we have we interpret `NewFluffyBlock` as a response if not its a request that doesn't require a response. +//! Some notifications are easy to translate, like [`GetObjectsRequest`] is obviously a request but others like [`NewFluffyBlock`] are a +//! bit tricker. To translate a [`NewFluffyBlock`] into a request/ response we will have to look to see if we asked for [`FluffyMissingTransactionsRequest`], +//! if we have, we interpret [`NewFluffyBlock`] as a response, if not, it's a request that doesn't require a response. //! -//! Here is every P2P request/ response. *note admin messages are already request/ response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse +//! Here is every P2P request/response. //! +//! *note admin messages are already request/response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse +//! +//! ```md //! Admin: //! Handshake, //! TimedSync, @@ -21,16 +24,14 @@ //! Request: NewBlock, Response: None, //! Request: NewFluffyBlock, Response: None, //! Request: NewTransactions, Response: None +//!``` //! use cuprate_wire::{ - admin::{ - HandshakeRequest, HandshakeResponse, PingResponse, SupportFlagsResponse, TimedSyncRequest, - TimedSyncResponse, - }, protocol::{ ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, GetObjectsResponse, GetTxPoolCompliment, NewBlock, NewFluffyBlock, NewTransactions, }, + AdminRequestMessage, AdminResponseMessage, }; mod try_from; @@ -60,12 +61,7 @@ pub enum BroadcastMessage { } #[derive(Debug, Clone)] -pub enum PeerRequest { - Handshake(HandshakeRequest), - TimedSync(TimedSyncRequest), - Ping, - SupportFlags, - +pub enum ProtocolRequest { GetObjects(GetObjectsRequest), GetChain(ChainRequest), FluffyMissingTxs(FluffyMissingTransactionsRequest), @@ -75,41 +71,47 @@ pub enum PeerRequest { NewTransactions(NewTransactions), } +#[derive(Debug, Clone)] +pub enum PeerRequest { + Admin(AdminRequestMessage), + Protocol(ProtocolRequest), +} + impl PeerRequest { pub fn id(&self) -> MessageID { match self { - PeerRequest::Handshake(_) => MessageID::Handshake, - PeerRequest::TimedSync(_) => MessageID::TimedSync, - PeerRequest::Ping => MessageID::Ping, - PeerRequest::SupportFlags => MessageID::SupportFlags, - - PeerRequest::GetObjects(_) => MessageID::GetObjects, - PeerRequest::GetChain(_) => MessageID::GetChain, - PeerRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, - PeerRequest::GetTxPoolCompliment(_) => MessageID::GetTxPoolCompliment, - PeerRequest::NewBlock(_) => MessageID::NewBlock, - PeerRequest::NewFluffyBlock(_) => MessageID::NewFluffyBlock, - PeerRequest::NewTransactions(_) => MessageID::NewTransactions, + PeerRequest::Admin(admin_req) => match admin_req { + AdminRequestMessage::Handshake(_) => MessageID::Handshake, + AdminRequestMessage::TimedSync(_) => MessageID::TimedSync, + AdminRequestMessage::Ping => MessageID::Ping, + AdminRequestMessage::SupportFlags => MessageID::SupportFlags, + }, + PeerRequest::Protocol(protocol_request) => match protocol_request { + ProtocolRequest::GetObjects(_) => MessageID::GetObjects, + ProtocolRequest::GetChain(_) => MessageID::GetChain, + ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, + ProtocolRequest::GetTxPoolCompliment(_) => MessageID::GetTxPoolCompliment, + ProtocolRequest::NewBlock(_) => MessageID::NewBlock, + ProtocolRequest::NewFluffyBlock(_) => MessageID::NewFluffyBlock, + ProtocolRequest::NewTransactions(_) => MessageID::NewTransactions, + }, } } pub fn needs_response(&self) -> bool { !matches!( self, - PeerRequest::NewBlock(_) - | PeerRequest::NewFluffyBlock(_) - | PeerRequest::NewTransactions(_) + PeerRequest::Protocol( + ProtocolRequest::NewBlock(_) + | ProtocolRequest::NewFluffyBlock(_) + | ProtocolRequest::NewTransactions(_) + ) ) } } #[derive(Debug, Clone)] -pub enum PeerResponse { - Handshake(HandshakeResponse), - TimedSync(TimedSyncResponse), - Ping(PingResponse), - SupportFlags(SupportFlagsResponse), - +pub enum ProtocolResponse { GetObjects(GetObjectsResponse), GetChain(ChainResponse), NewFluffyBlock(NewFluffyBlock), @@ -117,20 +119,29 @@ pub enum PeerResponse { NA, } +#[derive(Debug, Clone)] +pub enum PeerResponse { + Admin(AdminResponseMessage), + Protocol(ProtocolResponse), +} + impl PeerResponse { - pub fn id(&self) -> MessageID { - match self { - PeerResponse::Handshake(_) => MessageID::Handshake, - PeerResponse::TimedSync(_) => MessageID::TimedSync, - PeerResponse::Ping(_) => MessageID::Ping, - PeerResponse::SupportFlags(_) => MessageID::SupportFlags, + pub fn id(&self) -> Option { + Some(match self { + PeerResponse::Admin(admin_res) => match admin_res { + AdminResponseMessage::Handshake(_) => MessageID::Handshake, + AdminResponseMessage::TimedSync(_) => MessageID::TimedSync, + AdminResponseMessage::Ping(_) => MessageID::Ping, + AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags, + }, + PeerResponse::Protocol(protocol_res) => match protocol_res { + ProtocolResponse::GetObjects(_) => MessageID::GetObjects, + ProtocolResponse::GetChain(_) => MessageID::GetChain, + ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, + ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, - PeerResponse::GetObjects(_) => MessageID::GetObjects, - PeerResponse::GetChain(_) => MessageID::GetChain, - PeerResponse::NewFluffyBlock(_) => MessageID::NewBlock, - PeerResponse::NewTransactions(_) => MessageID::NewFluffyBlock, - - PeerResponse::NA => panic!("Can't get message ID for a non existent response"), - } + ProtocolResponse::NA => return None, + }, + }) } } diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index 8e3d026a..8a0b67d2 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -1,150 +1,111 @@ //! This module contains the implementations of [`TryFrom`] and [`From`] to convert between //! [`Message`], [`PeerRequest`] and [`PeerResponse`]. -use cuprate_wire::{Message, ProtocolMessage, RequestMessage, ResponseMessage}; +use cuprate_wire::{Message, ProtocolMessage}; -use super::{PeerRequest, PeerResponse}; +use crate::{PeerRequest, PeerResponse, ProtocolRequest, ProtocolResponse}; #[derive(Debug)] pub struct MessageConversionError; -macro_rules! match_body { - (match $value: ident {$($body:tt)*} ($left:pat => $right_ty:expr) $($todo:tt)*) => { - match_body!( match $value { - $left => $right_ty, - $($body)* - } $($todo)* ) - }; - (match $value: ident {$($body:tt)*}) => { - match $value { - $($body)* - } - }; -} - -macro_rules! from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - impl From<$left_ty> for $right_ty { - fn from(value: $left_ty) -> Self { - match_body!( match value {} - $(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+ - ) +impl From for ProtocolMessage { + fn from(value: ProtocolRequest) -> Self { + match value { + ProtocolRequest::GetObjects(val) => ProtocolMessage::GetObjectsRequest(val), + ProtocolRequest::GetChain(val) => ProtocolMessage::ChainRequest(val), + ProtocolRequest::FluffyMissingTxs(val) => { + ProtocolMessage::FluffyMissingTransactionsRequest(val) } + ProtocolRequest::GetTxPoolCompliment(val) => ProtocolMessage::GetTxPoolCompliment(val), + ProtocolRequest::NewBlock(val) => ProtocolMessage::NewBlock(val), + ProtocolRequest::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), + ProtocolRequest::NewTransactions(val) => ProtocolMessage::NewTransactions(val), } - }; + } } -macro_rules! try_from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - impl TryFrom<$left_ty> for $right_ty { - type Error = MessageConversionError; - - fn try_from(value: $left_ty) -> Result { - Ok(match_body!( match value { - _ => return Err(MessageConversionError) - } - $(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+ - )) - } - } - }; -} - -macro_rules! from_try_from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+}); - from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($vall))?,)+}); - }; -} - -macro_rules! try_from_try_from { - ($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => { - try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+}); - try_from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($val))?,)+}); - }; -} - -from_try_from!(PeerRequest, RequestMessage,{ - Handshake(val) = Handshake(val), - Ping = Ping, - SupportFlags = SupportFlags, - TimedSync(val) = TimedSync(val), -}); - -try_from_try_from!(PeerRequest, ProtocolMessage,{ - NewBlock(val) = NewBlock(val), - NewFluffyBlock(val) = NewFluffyBlock(val), - GetObjects(val) = GetObjectsRequest(val), - GetChain(val) = ChainRequest(val), - NewTransactions(val) = NewTransactions(val), - FluffyMissingTxs(val) = FluffyMissingTransactionsRequest(val), - GetTxPoolCompliment(val) = GetTxPoolCompliment(val), -}); - -impl TryFrom for PeerRequest { +impl TryFrom for ProtocolRequest { type Error = MessageConversionError; - fn try_from(value: Message) -> Result { - match value { - Message::Request(req) => Ok(req.into()), - Message::Protocol(pro) => pro.try_into(), - _ => Err(MessageConversionError), - } + fn try_from(value: ProtocolMessage) -> Result { + Ok(match value { + ProtocolMessage::GetObjectsRequest(val) => ProtocolRequest::GetObjects(val), + ProtocolMessage::ChainRequest(val) => ProtocolRequest::GetChain(val), + ProtocolMessage::FluffyMissingTransactionsRequest(val) => { + ProtocolRequest::FluffyMissingTxs(val) + } + ProtocolMessage::GetTxPoolCompliment(val) => ProtocolRequest::GetTxPoolCompliment(val), + ProtocolMessage::NewBlock(val) => ProtocolRequest::NewBlock(val), + ProtocolMessage::NewFluffyBlock(val) => ProtocolRequest::NewFluffyBlock(val), + ProtocolMessage::NewTransactions(val) => ProtocolRequest::NewTransactions(val), + ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => { + return Err(MessageConversionError) + } + }) } } impl From for Message { fn from(value: PeerRequest) -> Self { match value { - PeerRequest::Handshake(val) => Message::Request(RequestMessage::Handshake(val)), - PeerRequest::Ping => Message::Request(RequestMessage::Ping), - PeerRequest::SupportFlags => Message::Request(RequestMessage::SupportFlags), - PeerRequest::TimedSync(val) => Message::Request(RequestMessage::TimedSync(val)), - - PeerRequest::NewBlock(val) => Message::Protocol(ProtocolMessage::NewBlock(val)), - PeerRequest::NewFluffyBlock(val) => { - Message::Protocol(ProtocolMessage::NewFluffyBlock(val)) - } - PeerRequest::GetObjects(val) => { - Message::Protocol(ProtocolMessage::GetObjectsRequest(val)) - } - PeerRequest::GetChain(val) => Message::Protocol(ProtocolMessage::ChainRequest(val)), - PeerRequest::NewTransactions(val) => { - Message::Protocol(ProtocolMessage::NewTransactions(val)) - } - PeerRequest::FluffyMissingTxs(val) => { - Message::Protocol(ProtocolMessage::FluffyMissingTransactionsRequest(val)) - } - PeerRequest::GetTxPoolCompliment(val) => { - Message::Protocol(ProtocolMessage::GetTxPoolCompliment(val)) - } + PeerRequest::Admin(val) => Message::Request(val), + PeerRequest::Protocol(val) => Message::Protocol(val.into()), } } } -from_try_from!(PeerResponse, ResponseMessage,{ - Handshake(val) = Handshake(val), - Ping(val) = Ping(val), - SupportFlags(val) = SupportFlags(val), - TimedSync(val) = TimedSync(val), -}); +impl TryFrom for PeerRequest { + type Error = MessageConversionError; -try_from_try_from!(PeerResponse, ProtocolMessage,{ - NewFluffyBlock(val) = NewFluffyBlock(val), - GetObjects(val) = GetObjectsResponse(val), - GetChain(val) = ChainEntryResponse(val), - NewTransactions(val) = NewTransactions(val), + fn try_from(value: Message) -> Result { + match value { + Message::Request(req) => Ok(PeerRequest::Admin(req)), + Message::Protocol(pro) => Ok(PeerRequest::Protocol(pro.try_into()?)), + Message::Response(_) => Err(MessageConversionError), + } + } +} -}); +impl TryFrom for ProtocolMessage { + type Error = MessageConversionError; + + fn try_from(value: ProtocolResponse) -> Result { + Ok(match value { + ProtocolResponse::NewTransactions(val) => ProtocolMessage::NewTransactions(val), + ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), + ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val), + ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val), + ProtocolResponse::NA => return Err(MessageConversionError), + }) + } +} + +impl TryFrom for ProtocolResponse { + type Error = MessageConversionError; + + fn try_from(value: ProtocolMessage) -> Result { + Ok(match value { + ProtocolMessage::NewTransactions(val) => ProtocolResponse::NewTransactions(val), + ProtocolMessage::NewFluffyBlock(val) => ProtocolResponse::NewFluffyBlock(val), + ProtocolMessage::ChainEntryResponse(val) => ProtocolResponse::GetChain(val), + ProtocolMessage::GetObjectsResponse(val) => ProtocolResponse::GetObjects(val), + ProtocolMessage::ChainRequest(_) + | ProtocolMessage::FluffyMissingTransactionsRequest(_) + | ProtocolMessage::GetObjectsRequest(_) + | ProtocolMessage::GetTxPoolCompliment(_) + | ProtocolMessage::NewBlock(_) => return Err(MessageConversionError), + }) + } +} impl TryFrom for PeerResponse { type Error = MessageConversionError; fn try_from(value: Message) -> Result { match value { - Message::Response(res) => Ok(res.into()), - Message::Protocol(pro) => pro.try_into(), - _ => Err(MessageConversionError), + Message::Response(res) => Ok(PeerResponse::Admin(res)), + Message::Protocol(pro) => Ok(PeerResponse::Protocol(pro.try_into()?)), + Message::Request(_) => Err(MessageConversionError), } } } @@ -154,27 +115,8 @@ impl TryFrom for Message { fn try_from(value: PeerResponse) -> Result { Ok(match value { - PeerResponse::Handshake(val) => Message::Response(ResponseMessage::Handshake(val)), - PeerResponse::Ping(val) => Message::Response(ResponseMessage::Ping(val)), - PeerResponse::SupportFlags(val) => { - Message::Response(ResponseMessage::SupportFlags(val)) - } - PeerResponse::TimedSync(val) => Message::Response(ResponseMessage::TimedSync(val)), - - PeerResponse::NewFluffyBlock(val) => { - Message::Protocol(ProtocolMessage::NewFluffyBlock(val)) - } - PeerResponse::GetObjects(val) => { - Message::Protocol(ProtocolMessage::GetObjectsResponse(val)) - } - PeerResponse::GetChain(val) => { - Message::Protocol(ProtocolMessage::ChainEntryResponse(val)) - } - PeerResponse::NewTransactions(val) => { - Message::Protocol(ProtocolMessage::NewTransactions(val)) - } - - PeerResponse::NA => return Err(MessageConversionError), + PeerResponse::Admin(val) => Message::Response(val), + PeerResponse::Protocol(val) => Message::Protocol(val.try_into()?), }) } } diff --git a/p2p/p2p-core/src/services.rs b/p2p/p2p-core/src/services.rs index 6fd6c15b..b01bde0e 100644 --- a/p2p/p2p-core/src/services.rs +++ b/p2p/p2p-core/src/services.rs @@ -6,6 +6,7 @@ use crate::{ NetworkZone, }; +/// A request to the service that keeps track of peers sync states. pub enum PeerSyncRequest { /// Request some peers to sync from. /// @@ -15,10 +16,11 @@ pub enum PeerSyncRequest { current_cumulative_difficulty: u128, block_needed: Option, }, - /// Add/update a peers core sync data to the sync state service. + /// Add/update a peer's core sync data. IncomingCoreSyncData(InternalPeerID, ConnectionHandle, CoreSyncData), } +/// A response from the service that keeps track of peers sync states. pub enum PeerSyncResponse { /// The return value of [`PeerSyncRequest::PeersToSyncFrom`]. PeersToSyncFrom(Vec>), @@ -26,10 +28,16 @@ pub enum PeerSyncResponse { Ok, } +/// A request to the core sync service for our node's [`CoreSyncData`]. pub struct CoreSyncDataRequest; +/// A response from the core sync service containing our [`CoreSyncData`]. pub struct CoreSyncDataResponse(pub CoreSyncData); +/// A [`NetworkZone`] specific [`PeerListEntryBase`]. +/// +/// Using this type instead of [`PeerListEntryBase`] in the address book makes +/// usage easier for the rest of the P2P code as we can guarantee only the correct addresses will be stored and returned. #[derive(Debug, Copy, Clone, Eq, PartialEq)] #[cfg_attr( feature = "borsh", @@ -57,6 +65,7 @@ impl From> for cuprate_wire: } } +/// An error converting a [`PeerListEntryBase`] into a [`ZoneSpecificPeerListEntryBase`]. #[derive(Debug, thiserror::Error)] pub enum PeerListConversionError { #[error("Address is in incorrect zone")] @@ -82,6 +91,7 @@ impl TryFrom } } +/// A request to the address book service. pub enum AddressBookRequest { /// Tells the address book that we have connected or received a connection from a peer. NewConnection { @@ -123,6 +133,7 @@ pub enum AddressBookRequest { IsPeerBanned(Z::Addr), } +/// A response from the address book service. pub enum AddressBookResponse { Ok, Peer(ZoneSpecificPeerListEntryBase), diff --git a/p2p/p2p-core/tests/fragmented_handshake.rs b/p2p/p2p-core/tests/fragmented_handshake.rs index 2e96574c..c19a2a63 100644 --- a/p2p/p2p-core/tests/fragmented_handshake.rs +++ b/p2p/p2p-core/tests/fragmented_handshake.rs @@ -2,7 +2,6 @@ use std::{ net::SocketAddr, pin::Pin, - sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -13,7 +12,6 @@ use tokio::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, }, - sync::Semaphore, time::timeout, }; use tokio_util::{ @@ -24,9 +22,11 @@ use tower::{Service, ServiceExt}; use cuprate_helper::network::Network; use cuprate_p2p_core::{ - client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker, InternalPeerID}, - network_zones::ClearNetServerCfg, - ConnectionDirection, NetworkZone, + client::{ + handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest, + InternalPeerID, + }, + ClearNetServerCfg, ConnectionDirection, NetworkZone, }; use cuprate_wire::{ common::PeerSupportFlags, @@ -36,9 +36,6 @@ use cuprate_wire::{ use cuprate_test_utils::monerod::monerod; -mod utils; -use utils::*; - /// A network zone equal to clear net where every message sent is turned into a fragmented message. /// Does not support sending fragmented or dummy messages manually. #[derive(Clone, Copy)] @@ -135,9 +132,6 @@ impl Encoder> for FragmentCodec { #[tokio::test] async fn fragmented_handshake_cuprate_to_monerod() { - let semaphore = Arc::new(Semaphore::new(10)); - let permit = semaphore.acquire_owned().await.unwrap(); - let monerod = monerod(["--fixed-difficulty=1", "--out-peers=0"]).await; let our_basic_node_data = BasicNodeData { @@ -149,14 +143,7 @@ async fn fragmented_handshake_cuprate_to_monerod() { rpc_credits_per_hash: 0, }; - let handshaker = HandShaker::::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data, - ); + let handshaker = HandshakerBuilder::::new(our_basic_node_data).build(); let mut connector = Connector::new(handshaker); @@ -166,7 +153,7 @@ async fn fragmented_handshake_cuprate_to_monerod() { .unwrap() .call(ConnectRequest { addr: monerod.p2p_addr(), - permit, + permit: None, }) .await .unwrap(); @@ -174,9 +161,6 @@ async fn fragmented_handshake_cuprate_to_monerod() { #[tokio::test] async fn fragmented_handshake_monerod_to_cuprate() { - let semaphore = Arc::new(Semaphore::new(10)); - let permit = semaphore.acquire_owned().await.unwrap(); - let our_basic_node_data = BasicNodeData { my_port: 18081, network_id: Network::Mainnet.network_id(), @@ -186,14 +170,7 @@ async fn fragmented_handshake_monerod_to_cuprate() { rpc_credits_per_hash: 0, }; - let mut handshaker = HandShaker::::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data, - ); + let mut handshaker = HandshakerBuilder::::new(our_basic_node_data).build(); let ip = "127.0.0.1".parse().unwrap(); @@ -215,8 +192,8 @@ async fn fragmented_handshake_monerod_to_cuprate() { addr: InternalPeerID::KnownAddr(addr.unwrap()), // This is clear net all addresses are known. peer_stream: stream, peer_sink: sink, - direction: ConnectionDirection::InBound, - permit, + direction: ConnectionDirection::Inbound, + permit: None, }) .await .unwrap(); diff --git a/p2p/p2p-core/tests/handles.rs b/p2p/p2p-core/tests/handles.rs index e98cd2d4..47d70b05 100644 --- a/p2p/p2p-core/tests/handles.rs +++ b/p2p/p2p-core/tests/handles.rs @@ -6,10 +6,7 @@ use cuprate_p2p_core::handles::HandleBuilder; #[test] fn send_ban_signal() { - let semaphore = Arc::new(Semaphore::new(5)); - let (guard, mut connection_handle) = HandleBuilder::default() - .with_permit(semaphore.try_acquire_owned().unwrap()) - .build(); + let (guard, mut connection_handle) = HandleBuilder::default().build(); connection_handle.ban_peer(Duration::from_secs(300)); @@ -28,10 +25,7 @@ fn send_ban_signal() { #[test] fn multiple_ban_signals() { - let semaphore = Arc::new(Semaphore::new(5)); - let (guard, mut connection_handle) = HandleBuilder::default() - .with_permit(semaphore.try_acquire_owned().unwrap()) - .build(); + let (guard, mut connection_handle) = HandleBuilder::default().build(); connection_handle.ban_peer(Duration::from_secs(300)); connection_handle.ban_peer(Duration::from_secs(301)); @@ -55,7 +49,7 @@ fn multiple_ban_signals() { fn dropped_guard_sends_disconnect_signal() { let semaphore = Arc::new(Semaphore::new(5)); let (guard, connection_handle) = HandleBuilder::default() - .with_permit(semaphore.try_acquire_owned().unwrap()) + .with_permit(Some(semaphore.try_acquire_owned().unwrap())) .build(); assert!(!connection_handle.is_closed()); diff --git a/p2p/p2p-core/tests/handshake.rs b/p2p/p2p-core/tests/handshake.rs index f9792488..5ce6153a 100644 --- a/p2p/p2p-core/tests/handshake.rs +++ b/p2p/p2p-core/tests/handshake.rs @@ -1,9 +1,8 @@ -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use futures::StreamExt; use tokio::{ io::{duplex, split}, - sync::Semaphore, time::timeout, }; use tokio_util::codec::{FramedRead, FramedWrite}; @@ -13,9 +12,11 @@ use cuprate_helper::network::Network; use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec}; use cuprate_p2p_core::{ - client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker, InternalPeerID}, - network_zones::{ClearNet, ClearNetServerCfg}, - ConnectionDirection, NetworkZone, + client::{ + handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest, + InternalPeerID, + }, + ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone, }; use cuprate_test_utils::{ @@ -23,18 +24,10 @@ use cuprate_test_utils::{ test_netzone::{TestNetZone, TestNetZoneAddr}, }; -mod utils; -use utils::*; - #[tokio::test] async fn handshake_cuprate_to_cuprate() { // Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to // each other. - - let semaphore = Arc::new(Semaphore::new(10)); - let permit_1 = semaphore.clone().acquire_owned().await.unwrap(); - let permit_2 = semaphore.acquire_owned().await.unwrap(); - let our_basic_node_data_1 = BasicNodeData { my_port: 0, network_id: Network::Mainnet.network_id(), @@ -48,23 +41,11 @@ async fn handshake_cuprate_to_cuprate() { let mut our_basic_node_data_2 = our_basic_node_data_1.clone(); our_basic_node_data_2.peer_id = 2344; - let mut handshaker_1 = HandShaker::, _, _, _, _, _>::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data_1, - ); + let mut handshaker_1 = + HandshakerBuilder::>::new(our_basic_node_data_1).build(); - let mut handshaker_2 = HandShaker::, _, _, _, _, _>::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data_2, - ); + let mut handshaker_2 = + HandshakerBuilder::>::new(our_basic_node_data_2).build(); let (p1, p2) = duplex(50_000); @@ -75,16 +56,16 @@ async fn handshake_cuprate_to_cuprate() { addr: InternalPeerID::KnownAddr(TestNetZoneAddr(888)), peer_stream: FramedRead::new(p2_receiver, MoneroWireCodec::default()), peer_sink: FramedWrite::new(p2_sender, MoneroWireCodec::default()), - direction: ConnectionDirection::OutBound, - permit: permit_1, + direction: ConnectionDirection::Outbound, + permit: None, }; let p2_handshake_req = DoHandshakeRequest { addr: InternalPeerID::KnownAddr(TestNetZoneAddr(444)), peer_stream: FramedRead::new(p1_receiver, MoneroWireCodec::default()), peer_sink: FramedWrite::new(p1_sender, MoneroWireCodec::default()), - direction: ConnectionDirection::InBound, - permit: permit_2, + direction: ConnectionDirection::Inbound, + permit: None, }; let p1 = tokio::spawn(async move { @@ -114,9 +95,6 @@ async fn handshake_cuprate_to_cuprate() { #[tokio::test] async fn handshake_cuprate_to_monerod() { - let semaphore = Arc::new(Semaphore::new(10)); - let permit = semaphore.acquire_owned().await.unwrap(); - let monerod = monerod(["--fixed-difficulty=1", "--out-peers=0"]).await; let our_basic_node_data = BasicNodeData { @@ -128,14 +106,7 @@ async fn handshake_cuprate_to_monerod() { rpc_credits_per_hash: 0, }; - let handshaker = HandShaker::::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data, - ); + let handshaker = HandshakerBuilder::::new(our_basic_node_data).build(); let mut connector = Connector::new(handshaker); @@ -145,7 +116,7 @@ async fn handshake_cuprate_to_monerod() { .unwrap() .call(ConnectRequest { addr: monerod.p2p_addr(), - permit, + permit: None, }) .await .unwrap(); @@ -153,9 +124,6 @@ async fn handshake_cuprate_to_monerod() { #[tokio::test] async fn handshake_monerod_to_cuprate() { - let semaphore = Arc::new(Semaphore::new(10)); - let permit = semaphore.acquire_owned().await.unwrap(); - let our_basic_node_data = BasicNodeData { my_port: 18081, network_id: Network::Mainnet.network_id(), @@ -165,14 +133,7 @@ async fn handshake_monerod_to_cuprate() { rpc_credits_per_hash: 0, }; - let mut handshaker = HandShaker::::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data, - ); + let mut handshaker = HandshakerBuilder::::new(our_basic_node_data).build(); let ip = "127.0.0.1".parse().unwrap(); @@ -194,8 +155,8 @@ async fn handshake_monerod_to_cuprate() { addr: InternalPeerID::KnownAddr(addr.unwrap()), // This is clear net all addresses are known. peer_stream: stream, peer_sink: sink, - direction: ConnectionDirection::InBound, - permit, + direction: ConnectionDirection::Inbound, + permit: None, }) .await .unwrap(); diff --git a/p2p/p2p-core/tests/sending_receiving.rs b/p2p/p2p-core/tests/sending_receiving.rs index b4c42e2c..e035daf8 100644 --- a/p2p/p2p-core/tests/sending_receiving.rs +++ b/p2p/p2p-core/tests/sending_receiving.rs @@ -1,27 +1,18 @@ -use std::sync::Arc; - -use tokio::sync::Semaphore; use tower::{Service, ServiceExt}; use cuprate_helper::network::Network; use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData}; use cuprate_p2p_core::{ - client::{ConnectRequest, Connector, HandShaker}, - network_zones::ClearNet, + client::{handshaker::HandshakerBuilder, ConnectRequest, Connector}, protocol::{PeerRequest, PeerResponse}, + ClearNet, ProtocolRequest, ProtocolResponse, }; use cuprate_test_utils::monerod::monerod; -mod utils; -use utils::*; - #[tokio::test] async fn get_single_block_from_monerod() { - let semaphore = Arc::new(Semaphore::new(10)); - let permit = semaphore.acquire_owned().await.unwrap(); - let monerod = monerod(["--out-peers=0"]).await; let our_basic_node_data = BasicNodeData { @@ -33,14 +24,7 @@ async fn get_single_block_from_monerod() { rpc_credits_per_hash: 0, }; - let handshaker = HandShaker::::new( - DummyAddressBook, - DummyPeerSyncSvc, - DummyCoreSyncSvc, - DummyPeerRequestHandlerSvc, - |_| futures::stream::pending(), - our_basic_node_data, - ); + let handshaker = HandshakerBuilder::::new(our_basic_node_data).build(); let mut connector = Connector::new(handshaker); @@ -50,22 +34,26 @@ async fn get_single_block_from_monerod() { .unwrap() .call(ConnectRequest { addr: monerod.p2p_addr(), - permit, + permit: None, }) .await .unwrap(); - let PeerResponse::GetObjects(obj) = connected_peer + let PeerResponse::Protocol(ProtocolResponse::GetObjects(obj)) = connected_peer .ready() .await .unwrap() - .call(PeerRequest::GetObjects(GetObjectsRequest { - blocks: hex::decode("418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3") + .call(PeerRequest::Protocol(ProtocolRequest::GetObjects( + GetObjectsRequest { + blocks: hex::decode( + "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3", + ) .unwrap() .try_into() .unwrap(), - pruned: false, - })) + pruned: false, + }, + ))) .await .unwrap() else { diff --git a/p2p/p2p-core/tests/utils.rs b/p2p/p2p-core/tests/utils.rs deleted file mode 100644 index 9587bb58..00000000 --- a/p2p/p2p-core/tests/utils.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::FutureExt; -use tower::Service; - -use cuprate_p2p_core::{ - services::{ - AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, - PeerSyncRequest, PeerSyncResponse, - }, - NetworkZone, PeerRequest, PeerResponse, -}; - -#[derive(Clone)] -pub struct DummyAddressBook; - -impl Service> for DummyAddressBook { - type Response = AddressBookResponse; - type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: AddressBookRequest) -> Self::Future { - async move { - Ok(match req { - AddressBookRequest::GetWhitePeers(_) => AddressBookResponse::Peers(vec![]), - _ => AddressBookResponse::Ok, - }) - } - .boxed() - } -} - -#[derive(Clone)] -pub struct DummyCoreSyncSvc; - -impl Service for DummyCoreSyncSvc { - type Response = CoreSyncDataResponse; - type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { - async move { - Ok(CoreSyncDataResponse(cuprate_wire::CoreSyncData { - cumulative_difficulty: 1, - cumulative_difficulty_top64: 0, - current_height: 1, - pruning_seed: 0, - top_id: hex::decode( - "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3", - ) - .unwrap() - .try_into() - .unwrap(), - top_version: 1, - })) - } - .boxed() - } -} - -#[derive(Clone)] -pub struct DummyPeerSyncSvc; - -impl Service> for DummyPeerSyncSvc { - type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; - - type Response = PeerSyncResponse; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: PeerSyncRequest) -> Self::Future { - async { Ok(PeerSyncResponse::Ok) }.boxed() - } -} - -#[derive(Clone)] -pub struct DummyPeerRequestHandlerSvc; - -impl Service for DummyPeerRequestHandlerSvc { - type Response = PeerResponse; - type Error = tower::BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: PeerRequest) -> Self::Future { - async move { Ok(PeerResponse::NA) }.boxed() - } -} diff --git a/p2p/p2p/Cargo.toml b/p2p/p2p/Cargo.toml index 507d3621..e9b03d2c 100644 --- a/p2p/p2p/Cargo.toml +++ b/p2p/p2p/Cargo.toml @@ -31,6 +31,7 @@ rand = { workspace = true, features = ["std", "std_rng"] } rand_distr = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] } tracing = { workspace = true, features = ["std", "attributes"] } +borsh = { workspace = true, features = ["derive", "std"] } [dev-dependencies] cuprate-test-utils = { path = "../../test-utils" } diff --git a/p2p/p2p/src/block_downloader/block_queue.rs b/p2p/p2p/src/block_downloader/block_queue.rs index b03d847d..d846c22c 100644 --- a/p2p/p2p/src/block_downloader/block_queue.rs +++ b/p2p/p2p/src/block_downloader/block_queue.rs @@ -113,11 +113,10 @@ impl BlockQueue { #[cfg(test)] mod tests { - use futures::StreamExt; - use std::{collections::BTreeSet, sync::Arc}; + use std::collections::BTreeSet; + use futures::StreamExt; use proptest::{collection::vec, prelude::*}; - use tokio::sync::Semaphore; use tokio_test::block_on; use cuprate_p2p_core::handles::HandleBuilder; @@ -126,8 +125,7 @@ mod tests { prop_compose! { fn ready_batch_strategy()(start_height in 0_u64..500_000_000) -> ReadyQueueBatch { - // TODO: The permit will not be needed here when - let (_, peer_handle) = HandleBuilder::new().with_permit(Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap()).build(); + let (_, peer_handle) = HandleBuilder::new().build(); ReadyQueueBatch { start_height, diff --git a/p2p/p2p/src/block_downloader/download_batch.rs b/p2p/p2p/src/block_downloader/download_batch.rs index e9dfcb45..fbf33b15 100644 --- a/p2p/p2p/src/block_downloader/download_batch.rs +++ b/p2p/p2p/src/block_downloader/download_batch.rs @@ -8,7 +8,10 @@ use tracing::instrument; use cuprate_fixed_bytes::ByteArrayVec; use cuprate_helper::asynch::rayon_spawn_async; -use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse}; +use cuprate_p2p_core::{ + handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest, + ProtocolResponse, +}; use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; use crate::{ @@ -50,16 +53,15 @@ async fn request_batch_from_peer( previous_id: [u8; 32], expected_start_height: u64, ) -> Result<(ClientPoolDropGuard, BlockBatch), BlockDownloadError> { - // Request the blocks. + let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest { + blocks: ids.clone(), + pruned: false, + })); + + // Request the blocks and add a timeout to the request let blocks_response = timeout(BLOCK_DOWNLOADER_REQUEST_TIMEOUT, async { - let PeerResponse::GetObjects(blocks_response) = client - .ready() - .await? - .call(PeerRequest::GetObjects(GetObjectsRequest { - blocks: ids.clone(), - pruned: false, - })) - .await? + let PeerResponse::Protocol(ProtocolResponse::GetObjects(blocks_response)) = + client.ready().await?.call(request).await? else { panic!("Connection task returned wrong response."); }; diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs index 471635bf..4b0b47e5 100644 --- a/p2p/p2p/src/block_downloader/request_chain.rs +++ b/p2p/p2p/src/block_downloader/request_chain.rs @@ -10,7 +10,7 @@ use cuprate_p2p_core::{ client::InternalPeerID, handles::ConnectionHandle, services::{PeerSyncRequest, PeerSyncResponse}, - NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, + NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, ProtocolRequest, ProtocolResponse, }; use cuprate_wire::protocol::{ChainRequest, ChainResponse}; @@ -34,13 +34,15 @@ pub async fn request_chain_entry_from_peer( mut client: ClientPoolDropGuard, short_history: [[u8; 32]; 2], ) -> Result<(ClientPoolDropGuard, ChainEntry), BlockDownloadError> { - let PeerResponse::GetChain(chain_res) = client + let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = client .ready() .await? - .call(PeerRequest::GetChain(ChainRequest { - block_ids: short_history.into(), - prune: true, - })) + .call(PeerRequest::Protocol(ProtocolRequest::GetChain( + ChainRequest { + block_ids: short_history.into(), + prune: true, + }, + ))) .await? else { panic!("Connection task returned wrong response!"); @@ -132,10 +134,10 @@ where let mut futs = JoinSet::new(); - let req = PeerRequest::GetChain(ChainRequest { + let req = PeerRequest::Protocol(ProtocolRequest::GetChain(ChainRequest { block_ids: block_ids.into(), prune: false, - }); + })); tracing::debug!("Sending requests for chain entries."); @@ -149,7 +151,7 @@ where futs.spawn(timeout( BLOCK_DOWNLOADER_REQUEST_TIMEOUT, async move { - let PeerResponse::GetChain(chain_res) = + let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = next_peer.ready().await?.call(cloned_req).await? else { panic!("connection task returned wrong response!"); diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs index bf342727..981c557f 100644 --- a/p2p/p2p/src/block_downloader/tests.rs +++ b/p2p/p2p/src/block_downloader/tests.rs @@ -15,15 +15,15 @@ use monero_serai::{ transaction::{Input, Timelock, Transaction, TransactionPrefix}, }; use proptest::{collection::vec, prelude::*}; -use tokio::{sync::Semaphore, time::timeout}; +use tokio::time::timeout; use tower::{service_fn, Service}; use cuprate_fixed_bytes::ByteArrayVec; use cuprate_p2p_core::{ client::{mock_client, Client, InternalPeerID, PeerInformation}, - network_zones::ClearNet, services::{PeerSyncRequest, PeerSyncResponse}, - ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, + ClearNet, ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest, + ProtocolResponse, }; use cuprate_pruning::PruningSeed; use cuprate_wire::{ @@ -182,18 +182,15 @@ prop_compose! { } fn mock_block_downloader_client(blockchain: Arc) -> Client { - let semaphore = Arc::new(Semaphore::new(1)); - - let (connection_guard, connection_handle) = cuprate_p2p_core::handles::HandleBuilder::new() - .with_permit(semaphore.try_acquire_owned().unwrap()) - .build(); + let (connection_guard, connection_handle) = + cuprate_p2p_core::handles::HandleBuilder::new().build(); let request_handler = service_fn(move |req: PeerRequest| { let bc = blockchain.clone(); async move { match req { - PeerRequest::GetChain(chain_req) => { + PeerRequest::Protocol(ProtocolRequest::GetChain(chain_req)) => { let mut i = 0; while !bc.blocks.contains_key(&chain_req.block_ids[i]) { i += 1; @@ -215,18 +212,20 @@ fn mock_block_downloader_client(blockchain: Arc) -> Client>(); - Ok(PeerResponse::GetChain(ChainResponse { - start_height: 0, - total_height: 0, - cumulative_difficulty_low64: 1, - cumulative_difficulty_top64: 0, - m_block_ids: block_ids.into(), - m_block_weights: vec![], - first_block: Default::default(), - })) + Ok(PeerResponse::Protocol(ProtocolResponse::GetChain( + ChainResponse { + start_height: 0, + total_height: 0, + cumulative_difficulty_low64: 1, + cumulative_difficulty_top64: 0, + m_block_ids: block_ids.into(), + m_block_weights: vec![], + first_block: Default::default(), + }, + ))) } - PeerRequest::GetObjects(obj) => { + PeerRequest::Protocol(ProtocolRequest::GetObjects(obj)) => { let mut res = Vec::with_capacity(obj.blocks.len()); for i in 0..obj.blocks.len() { @@ -249,11 +248,13 @@ fn mock_block_downloader_client(blockchain: Arc) -> Client panic!(), } @@ -264,7 +265,7 @@ fn mock_block_downloader_client(blockchain: Arc) -> Client Service> for BroadcastSvc { // An error here means _all_ receivers were dropped which we assume will never happen. let _ = match direction { - Some(ConnectionDirection::InBound) => { + Some(ConnectionDirection::Inbound) => { self.tx_broadcast_channel_inbound.send(nex_tx_info) } - Some(ConnectionDirection::OutBound) => { + Some(ConnectionDirection::Outbound) => { self.tx_broadcast_channel_outbound.send(nex_tx_info) } None => { @@ -428,7 +428,7 @@ mod tests { .unwrap() .call(BroadcastRequest::Transaction { tx_bytes: Bytes::from_static(&[1]), - direction: Some(ConnectionDirection::OutBound), + direction: Some(ConnectionDirection::Outbound), received_from: None, }) .await @@ -440,7 +440,7 @@ mod tests { .unwrap() .call(BroadcastRequest::Transaction { tx_bytes: Bytes::from_static(&[2]), - direction: Some(ConnectionDirection::InBound), + direction: Some(ConnectionDirection::Inbound), received_from: None, }) .await diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index 711491d0..51f57e9f 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -9,7 +9,6 @@ //! //! Internally the pool is a [`DashMap`] which means care should be taken in `async` code //! as internally this uses blocking RwLocks. -//! use std::sync::Arc; use dashmap::DashMap; diff --git a/p2p/p2p/src/connection_maintainer.rs b/p2p/p2p/src/connection_maintainer.rs index 8e5c9bc3..2bcf2707 100644 --- a/p2p/p2p/src/connection_maintainer.rs +++ b/p2p/p2p/src/connection_maintainer.rs @@ -106,10 +106,6 @@ where panic!("No seed nodes available to get peers from"); } - // This isn't really needed here to limit connections as the seed nodes will be dropped when we have got - // peers from them. - let semaphore = Arc::new(Semaphore::new(seeds.len())); - let mut allowed_errors = seeds.len(); let mut handshake_futs = JoinSet::new(); @@ -125,10 +121,7 @@ where .expect("Connector had an error in `poll_ready`") .call(ConnectRequest { addr: *seed, - permit: semaphore - .clone() - .try_acquire_owned() - .expect("This must have enough permits as we just set the amount."), + permit: None, }), ); // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer. @@ -157,7 +150,10 @@ where .ready() .await .expect("Connector had an error in `poll_ready`") - .call(ConnectRequest { addr, permit }); + .call(ConnectRequest { + addr, + permit: Some(permit), + }); tokio::spawn( async move { diff --git a/p2p/p2p/src/inbound_server.rs b/p2p/p2p/src/inbound_server.rs index 6bc1e6d8..aa971a51 100644 --- a/p2p/p2p/src/inbound_server.rs +++ b/p2p/p2p/src/inbound_server.rs @@ -87,8 +87,8 @@ where addr, peer_stream, peer_sink, - direction: ConnectionDirection::InBound, - permit, + direction: ConnectionDirection::Inbound, + permit: Some(permit), }); let cloned_pool = client_pool.clone(); diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 95154ec7..be18c2a3 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -4,7 +4,6 @@ //! a certain [`NetworkZone`] use std::sync::Arc; -use cuprate_async_buffer::BufferStream; use futures::FutureExt; use tokio::{ sync::{mpsc, watch}, @@ -14,11 +13,12 @@ use tokio_stream::wrappers::WatchStream; use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; +use cuprate_async_buffer::BufferStream; use cuprate_p2p_core::{ client::Connector, client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, - CoreSyncSvc, NetworkZone, PeerRequestHandler, + CoreSyncSvc, NetworkZone, ProtocolRequestHandler, }; mod block_downloader; @@ -42,17 +42,18 @@ use connection_maintainer::MakeConnectionRequest; /// /// # Usage /// You must provide: -/// - A peer request handler, which is given to each connection +/// - A protocol request handler, which is given to each connection /// - A core sync service, which keeps track of the sync state of our node #[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] -pub async fn initialize_network( - peer_req_handler: R, +pub async fn initialize_network( + protocol_request_handler: PR, core_sync_svc: CS, config: P2PConfig, ) -> Result, tower::BoxError> where N: NetworkZone, - R: PeerRequestHandler + Clone, + N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, + PR: ProtocolRequestHandler + Clone, CS: CoreSyncSvc + Clone, { let address_book = @@ -79,23 +80,21 @@ where basic_node_data.peer_id = 1; } - let outbound_handshaker = cuprate_p2p_core::client::HandShaker::new( - address_book.clone(), - sync_states_svc.clone(), - core_sync_svc.clone(), - peer_req_handler.clone(), - outbound_mkr, - basic_node_data.clone(), - ); + let outbound_handshaker_builder = + cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data) + .with_address_book(address_book.clone()) + .with_peer_sync_svc(sync_states_svc.clone()) + .with_core_sync_svc(core_sync_svc) + .with_protocol_request_handler(protocol_request_handler) + .with_broadcast_stream_maker(outbound_mkr) + .with_connection_parent_span(Span::current()); - let inbound_handshaker = cuprate_p2p_core::client::HandShaker::new( - address_book.clone(), - sync_states_svc.clone(), - core_sync_svc.clone(), - peer_req_handler, - inbound_mkr, - basic_node_data, - ); + let inbound_handshaker = outbound_handshaker_builder + .clone() + .with_broadcast_stream_maker(inbound_mkr) + .build(); + + let outbound_handshaker = outbound_handshaker_builder.build(); let client_pool = client_pool::ClientPool::new(); diff --git a/p2p/p2p/src/sync_states.rs b/p2p/p2p/src/sync_states.rs index 1b4e81ae..1484941f 100644 --- a/p2p/p2p/src/sync_states.rs +++ b/p2p/p2p/src/sync_states.rs @@ -238,9 +238,6 @@ impl Service> for PeerSyncSvc { #[cfg(test)] mod tests { - use std::sync::Arc; - - use tokio::sync::Semaphore; use tower::{Service, ServiceExt}; use cuprate_p2p_core::{ @@ -255,11 +252,7 @@ mod tests { #[tokio::test] async fn top_sync_channel_updates() { - let semaphore = Arc::new(Semaphore::new(1)); - - let (_g, handle) = HandleBuilder::new() - .with_permit(semaphore.try_acquire_owned().unwrap()) - .build(); + let (_g, handle) = HandleBuilder::new().build(); let (mut svc, mut watch) = PeerSyncSvc::>::new(); @@ -336,11 +329,7 @@ mod tests { #[tokio::test] async fn peer_sync_info_updates() { - let semaphore = Arc::new(Semaphore::new(1)); - - let (_g, handle) = HandleBuilder::new() - .with_permit(semaphore.try_acquire_owned().unwrap()) - .build(); + let (_g, handle) = HandleBuilder::new().build(); let (mut svc, _watch) = PeerSyncSvc::>::new();