diff --git a/Cargo.lock b/Cargo.lock index e9cb1b85..0750040e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1489,6 +1489,7 @@ dependencies = [ name = "monero-wire" version = "0.1.0" dependencies = [ + "bitflags 2.5.0", "bytes", "epee-encoding", "fixed-bytes", diff --git a/net/epee-encoding/src/container_as_blob.rs b/net/epee-encoding/src/container_as_blob.rs index 164acb25..084b43bb 100644 --- a/net/epee-encoding/src/container_as_blob.rs +++ b/net/epee-encoding/src/container_as_blob.rs @@ -39,11 +39,7 @@ impl EpeeValue for ContainerAsBlob { } Ok(ContainerAsBlob( - bytes - .windows(T::SIZE) - .step_by(T::SIZE) - .map(T::from_bytes) - .collect(), + bytes.chunks(T::SIZE).map(T::from_bytes).collect(), )) } diff --git a/net/epee-encoding/src/error.rs b/net/epee-encoding/src/error.rs index e344322c..4b3c7b0d 100644 --- a/net/epee-encoding/src/error.rs +++ b/net/epee-encoding/src/error.rs @@ -1,5 +1,8 @@ -use core::fmt::{Debug, Formatter}; -use core::{num::TryFromIntError, str::Utf8Error}; +use core::{ + fmt::{Debug, Formatter}, + num::TryFromIntError, + str::Utf8Error, +}; pub type Result = core::result::Result; diff --git a/net/epee-encoding/src/value.rs b/net/epee-encoding/src/value.rs index 540b1295..ef42241b 100644 --- a/net/epee-encoding/src/value.rs +++ b/net/epee-encoding/src/value.rs @@ -308,11 +308,7 @@ impl EpeeValue for ByteArrayVec { return Err(Error::Format("Byte array exceeded max length")); } - if r.remaining() - < usize::try_from(len)? - .checked_mul(N) - .ok_or(Error::Value("Length of field is too long".to_string()))? - { + if r.remaining() < usize::try_from(len)? { return Err(Error::IO("Not enough bytes to fill object")); } diff --git a/net/fixed-bytes/src/lib.rs b/net/fixed-bytes/src/lib.rs index c7b71151..8776d309 100644 --- a/net/fixed-bytes/src/lib.rs +++ b/net/fixed-bytes/src/lib.rs @@ -1,8 +1,9 @@ -use core::ops::Deref; -use std::fmt::{Debug, Formatter}; -use std::ops::Index; +use core::{ + fmt::{Debug, Formatter}, + ops::{Deref, Index}, +}; -use bytes::Bytes; +use bytes::{BufMut, Bytes, BytesMut}; #[cfg_attr(feature = "std", derive(thiserror::Error))] pub enum FixedByteError { @@ -101,6 +102,40 @@ impl ByteArrayVec { pub fn take_bytes(self) -> Bytes { self.0 } + + /// Splits the byte array vec into two at the given index. + /// + /// Afterwards self contains elements [0, at), and the returned [`ByteArrayVec`] contains elements [at, len). + /// + /// This is an O(1) operation that just increases the reference count and sets a few indices. + /// + /// # Panics + /// Panics if at > len. + pub fn split_off(&mut self, at: usize) -> Self { + Self(self.0.split_off(at * N)) + } +} + +impl From<&ByteArrayVec> for Vec<[u8; N]> { + fn from(value: &ByteArrayVec) -> Self { + let mut out = Vec::with_capacity(value.len()); + for i in 0..value.len() { + out.push(value[i]) + } + + out + } +} + +impl From> for ByteArrayVec { + fn from(value: Vec<[u8; N]>) -> Self { + let mut bytes = BytesMut::with_capacity(N * value.len()); + for i in value.into_iter() { + bytes.extend_from_slice(&i) + } + + ByteArrayVec(bytes.freeze()) + } } impl TryFrom for ByteArrayVec { @@ -115,8 +150,38 @@ impl TryFrom for ByteArrayVec { } } +impl From<[u8; N]> for ByteArrayVec { + fn from(value: [u8; N]) -> Self { + ByteArrayVec(Bytes::copy_from_slice(value.as_slice())) + } +} + +impl From<[[u8; N]; LEN]> for ByteArrayVec { + fn from(value: [[u8; N]; LEN]) -> Self { + let mut bytes = BytesMut::with_capacity(N * LEN); + + for val in value.into_iter() { + bytes.put_slice(val.as_slice()); + } + + ByteArrayVec(bytes.freeze()) + } +} + +impl TryFrom> for ByteArrayVec { + type Error = FixedByteError; + + fn try_from(value: Vec) -> Result { + if value.len() % N != 0 { + return Err(FixedByteError::InvalidLength); + } + + Ok(ByteArrayVec(Bytes::from(value))) + } +} + impl Index for ByteArrayVec { - type Output = [u8; 32]; + type Output = [u8; N]; fn index(&self, index: usize) -> &Self::Output { if (index + 1) * N > self.0.len() { diff --git a/net/monero-wire/Cargo.toml b/net/monero-wire/Cargo.toml index 611fb080..882b3644 100644 --- a/net/monero-wire/Cargo.toml +++ b/net/monero-wire/Cargo.toml @@ -15,7 +15,8 @@ levin-cuprate = {path="../levin"} epee-encoding = { path = "../epee-encoding" } fixed-bytes = { path = "../fixed-bytes" } -bytes = { workspace = true } +bitflags = { workspace = true, features = ["std"] } +bytes = { workspace = true, features = ["std"] } thiserror = { workspace = true } [dev-dependencies] diff --git a/net/monero-wire/src/p2p/admin.rs b/net/monero-wire/src/p2p/admin.rs index 95d2f1b0..95ffef2d 100644 --- a/net/monero-wire/src/p2p/admin.rs +++ b/net/monero-wire/src/p2p/admin.rs @@ -139,8 +139,7 @@ mod tests { my_port: 0, network_id: [ 18, 48, 241, 113, 97, 4, 65, 97, 23, 49, 0, 130, 22, 161, 161, 16, - ] - .into(), + ], peer_id: 9671405426614699871, support_flags: PeerSupportFlags::from(1_u32), rpc_port: 0, @@ -945,8 +944,7 @@ mod tests { my_port: 18080, network_id: [ 18, 48, 241, 113, 97, 4, 65, 97, 23, 49, 0, 130, 22, 161, 161, 16, - ] - .into(), + ], peer_id: 6037804360359455404, support_flags: PeerSupportFlags::from(1_u32), rpc_port: 18089, diff --git a/net/monero-wire/src/p2p/common.rs b/net/monero-wire/src/p2p/common.rs index c05bfcfa..74babefe 100644 --- a/net/monero-wire/src/p2p/common.rs +++ b/net/monero-wire/src/p2p/common.rs @@ -15,7 +15,9 @@ //! Common types that are used across multiple messages. +use bitflags::bitflags; use bytes::{Buf, BufMut, Bytes}; + use epee_encoding::{epee_object, EpeeValue, InnerMarker}; use fixed_bytes::ByteArray; @@ -24,6 +26,13 @@ use crate::NetworkAddress; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct PeerSupportFlags(u32); +bitflags! { + impl PeerSupportFlags: u32 { + const FLUFFY_BLOCKS = 0b0000_0001; + const _ = !0; + } +} + impl From for PeerSupportFlags { fn from(value: u32) -> Self { PeerSupportFlags(value) @@ -42,27 +51,14 @@ impl<'a> From<&'a PeerSupportFlags> for &'a u32 { } } -impl PeerSupportFlags { - //const FLUFFY_BLOCKS: u32 = 0b0000_0001; - - pub fn is_empty(&self) -> bool { - self.0 == 0 - } -} - -impl From for PeerSupportFlags { - fn from(value: u8) -> Self { - PeerSupportFlags(value.into()) - } -} - /// Basic Node Data, information on the connected peer #[derive(Debug, Clone, PartialEq, Eq)] pub struct BasicNodeData { /// Port pub my_port: u32, /// The Network Id - pub network_id: ByteArray<16>, + // We don't use ByteArray here to allow users to keep this data long term. + pub network_id: [u8; 16], /// Peer ID pub peer_id: u64, /// The Peers Support Flags @@ -79,7 +75,7 @@ pub struct BasicNodeData { epee_object! { BasicNodeData, my_port: u32, - network_id: ByteArray<16>, + network_id: [u8; 16], peer_id: u64, support_flags: PeerSupportFlags as u32 = 0_u32, rpc_port: u16 = 0_u16, @@ -101,7 +97,8 @@ pub struct CoreSyncData { /// (If this is not in the message the default is 0) pub pruning_seed: u32, /// Hash of the top block - pub top_id: ByteArray<32>, + // We don't use ByteArray here to allow users to keep this data long term. + pub top_id: [u8; 32], /// Version of the top block pub top_version: u8, } @@ -112,7 +109,7 @@ epee_object! { cumulative_difficulty_top64: u64 = 0_u64, current_height: u64, pruning_seed: u32 = 0_u32, - top_id: ByteArray<32>, + top_id: [u8; 32], top_version: u8 = 0_u8, } @@ -131,7 +128,7 @@ impl CoreSyncData { cumulative_difficulty_top64, current_height, pruning_seed, - top_id: top_id.into(), + top_id, top_version, } } diff --git a/net/monero-wire/src/p2p/protocol.rs b/net/monero-wire/src/p2p/protocol.rs index 1362e0ad..4dc9a928 100644 --- a/net/monero-wire/src/p2p/protocol.rs +++ b/net/monero-wire/src/p2p/protocol.rs @@ -114,7 +114,7 @@ pub struct ChainResponse { /// Total Height pub total_height: u64, /// Cumulative Difficulty Low - pub cumulative_difficulty: u64, + pub cumulative_difficulty_low64: u64, /// Cumulative Difficulty High pub cumulative_difficulty_top64: u64, /// Block IDs @@ -125,11 +125,19 @@ pub struct ChainResponse { pub first_block: Bytes, } +impl ChainResponse { + #[inline] + pub fn cumulative_difficulty(&self) -> u128 { + let cumulative_difficulty = self.cumulative_difficulty_top64 as u128; + cumulative_difficulty << 64 | self.cumulative_difficulty_low64 as u128 + } +} + epee_object!( ChainResponse, start_height: u64, total_height: u64, - cumulative_difficulty: u64, + cumulative_difficulty_low64("cumulative_difficulty"): u64, cumulative_difficulty_top64: u64 = 0_u64, m_block_ids: ByteArrayVec<32>, m_block_weights: Vec as ContainerAsBlob, diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs index 4c3a773d..3a49c6be 100644 --- a/p2p/address-book/src/book.rs +++ b/p2p/address-book/src/book.rs @@ -84,7 +84,7 @@ impl AddressBook { let connected_peers = HashMap::new(); let mut peer_save_interval = interval(cfg.peer_save_period); - peer_save_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + peer_save_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); Self { white_list, @@ -236,7 +236,9 @@ impl AddressBook { ) { tracing::debug!("Received new peer list, length: {}", peer_list.len()); - peer_list.retain(|peer| { + peer_list.retain_mut(|peer| { + peer.adr.make_canonical(); + if !peer.adr.should_add_to_peer_list() { false } else { @@ -259,7 +261,7 @@ impl AddressBook { ) -> Option> { tracing::debug!("Retrieving random white peer"); self.white_list - .take_random_peer(&mut rand::thread_rng(), block_needed) + .take_random_peer(&mut rand::thread_rng(), block_needed, &self.anchor_list) } fn take_random_gray_peer( @@ -268,7 +270,7 @@ impl AddressBook { ) -> Option> { tracing::debug!("Retrieving random gray peer"); self.gray_list - .take_random_peer(&mut rand::thread_rng(), block_needed) + .take_random_peer(&mut rand::thread_rng(), block_needed, &HashSet::new()) } fn get_white_peers(&self, len: usize) -> Vec> { diff --git a/p2p/address-book/src/book/tests.rs b/p2p/address-book/src/book/tests.rs index 4e1fd877..1cb0fc85 100644 --- a/p2p/address-book/src/book/tests.rs +++ b/p2p/address-book/src/book/tests.rs @@ -1,8 +1,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use futures::StreamExt; -use tokio::sync::Semaphore; -use tokio::time::interval; +use tokio::{sync::Semaphore, time::interval}; use monero_p2p::handles::HandleBuilder; use monero_pruning::PruningSeed; diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index ce56b4f4..a3dc0543 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -82,5 +82,5 @@ pub async fn init_address_book( let address_book = book::AddressBook::::new(cfg, white_list, gray_list, Vec::new()); - Ok(Buffer::new(address_book, 15)) + Ok(Buffer::new(address_book, 150)) } diff --git a/p2p/address-book/src/peer_list.rs b/p2p/address-book/src/peer_list.rs index f2c192f0..2aaf432a 100644 --- a/p2p/address-book/src/peer_list.rs +++ b/p2p/address-book/src/peer_list.rs @@ -89,28 +89,42 @@ impl PeerList { &mut self, r: &mut R, block_needed: Option, + must_keep_peers: &HashSet, ) -> Option> { - if let Some(needed_height) = block_needed { - let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| { - // TODO: factor in peer blockchain height? - seed.get_next_unpruned_block(needed_height, CRYPTONOTE_MAX_BLOCK_HEIGHT) - .expect("Block needed is higher than max block allowed.") - == needed_height - })?; - let n = r.gen_range(0..addresses_with_block.len()); - let peer = addresses_with_block[n]; - self.remove_peer(&peer) - } else { - let len = self.len(); - if len == 0 { - None - } else { - let n = r.gen_range(0..len); + // Take a random peer and see if it's in the list of must_keep_peers, if it is try again. + // TODO: improve this - let (&key, _) = self.peers.get_index(n).unwrap(); - self.remove_peer(&key) + for _ in 0..3 { + if let Some(needed_height) = block_needed { + let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| { + // TODO: factor in peer blockchain height? + seed.get_next_unpruned_block(needed_height, CRYPTONOTE_MAX_BLOCK_HEIGHT) + .expect("Block needed is higher than max block allowed.") + == needed_height + })?; + let n = r.gen_range(0..addresses_with_block.len()); + let peer = addresses_with_block[n]; + if must_keep_peers.contains(&peer) { + continue; + } + + return self.remove_peer(&peer); + } + let len = self.len(); + + if len == 0 { + return None; + } + + let n = r.gen_range(0..len); + + let (&key, _) = self.peers.get_index(n).unwrap(); + if !must_keep_peers.contains(&key) { + return self.remove_peer(&key); } } + + None } pub fn get_random_peers( diff --git a/p2p/address-book/src/peer_list/tests.rs b/p2p/address-book/src/peer_list/tests.rs index ed9682eb..7aba0a20 100644 --- a/p2p/address-book/src/peer_list/tests.rs +++ b/p2p/address-book/src/peer_list/tests.rs @@ -87,7 +87,7 @@ fn peer_list_remove_specific_peer() { let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); let peer = peer_list - .take_random_peer(&mut rand::thread_rng(), None) + .take_random_peer(&mut rand::thread_rng(), None, &HashSet::new()) .unwrap(); let pruning_idxs = peer_list.pruning_seeds; @@ -160,7 +160,7 @@ fn peer_list_get_peer_with_block() { peer_list.add_new_peer(make_fake_peer(101, Some(384))); let peer = peer_list - .take_random_peer(&mut r, Some(1)) + .take_random_peer(&mut r, Some(1), &HashSet::new()) .expect("We just added a peer with the correct seed"); assert!(peer @@ -173,7 +173,7 @@ fn peer_list_get_peer_with_block() { fn peer_list_ban_peers() { let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); let peer = peer_list - .take_random_peer(&mut rand::thread_rng(), None) + .take_random_peer(&mut rand::thread_rng(), None, &HashSet::new()) .unwrap(); let ban_id = peer.adr.ban_id(); diff --git a/p2p/monero-p2p/Cargo.toml b/p2p/monero-p2p/Cargo.toml index 50202f82..e416fbbb 100644 --- a/p2p/monero-p2p/Cargo.toml +++ b/p2p/monero-p2p/Cargo.toml @@ -17,14 +17,14 @@ monero-pruning = { path = "../../pruning" } tokio = { workspace = true, features = ["net", "sync", "macros", "time"]} tokio-util = { workspace = true, features = ["codec"] } tokio-stream = { workspace = true, features = ["sync"]} -futures = { workspace = true, features = ["std", "async-await"] } +futures = { workspace = true, features = ["std"] } async-trait = { workspace = true } -tower = { workspace = true, features = ["util"] } +tower = { workspace = true, features = ["util", "tracing"] } thiserror = { workspace = true } tracing = { workspace = true, features = ["std", "attributes"] } -borsh = { workspace = true, default-features = false, features = ["derive", "std"], optional = true } +borsh = { workspace = true, features = ["derive", "std"], optional = true } [dev-dependencies] cuprate-test-utils = {path = "../../test-utils"} diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs index 8e3ca488..8aab306d 100644 --- a/p2p/monero-p2p/src/client.rs +++ b/p2p/monero-p2p/src/client.rs @@ -1,32 +1,40 @@ -use std::fmt::Formatter; use std::{ - fmt::{Debug, Display}, - task::{Context, Poll}, + fmt::{Debug, Display, Formatter}, + sync::Arc, + task::{ready, Context, Poll}, }; use futures::channel::oneshot; -use tokio::{sync::mpsc, task::JoinHandle}; -use tokio_util::sync::PollSender; +use tokio::{ + sync::{mpsc, OwnedSemaphorePermit, Semaphore}, + task::JoinHandle, +}; +use tokio_util::sync::PollSemaphore; use tower::Service; use cuprate_helper::asynch::InfallibleOneshotReceiver; use crate::{ - handles::ConnectionHandle, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError, + handles::ConnectionHandle, ConnectionDirection, NetworkZone, PeerError, PeerRequest, + PeerResponse, SharedError, }; mod connection; mod connector; pub mod handshaker; +mod timeout_monitor; pub use connector::{ConnectRequest, Connector}; pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; +use monero_pruning::PruningSeed; /// An internal identifier for a given peer, will be their address if known /// or a random u64 if not. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum InternalPeerID { + /// A known address. KnownAddr(A), + /// An unknown address (probably an inbound anonymity network connection). Unknown(u64), } @@ -34,38 +42,72 @@ impl Display for InternalPeerID { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { InternalPeerID::KnownAddr(addr) => addr.fmt(f), - InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown addr, ID: {}", id)), + InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")), } } } +/// Information on a connected peer. +#[derive(Debug, Clone)] +pub struct PeerInformation { + /// The internal peer ID of this peer. + pub id: InternalPeerID, + /// The [`ConnectionHandle`] for this peer, allows banning this peer and checking if it is still + /// alive. + pub handle: ConnectionHandle, + /// The direction of this connection (inbound|outbound). + pub direction: ConnectionDirection, + /// The peers pruning seed. + pub pruning_seed: PruningSeed, +} + +/// This represents a connection to a peer. +/// +/// It allows sending requests to the peer, but does only does minimal checks that the data returned +/// is the data asked for, i.e. for a certain request the only thing checked will be that the response +/// is the correct response for that request, not that the response contains the correct data. pub struct Client { - id: InternalPeerID, - handle: ConnectionHandle, + /// Information on the connected peer. + pub info: PeerInformation, - connection_tx: PollSender, + /// The channel to the [`Connection`](connection::Connection) task. + connection_tx: mpsc::Sender, + /// The [`JoinHandle`] of the spawned connection task. connection_handle: JoinHandle<()>, + /// The [`JoinHandle`] of the spawned timeout monitor task. + timeout_handle: JoinHandle>, + /// The semaphore that limits the requests sent to the peer. + semaphore: PollSemaphore, + /// A permit for the semaphore, will be [`Some`] after `poll_ready` returns ready. + permit: Option, + + /// The error slot shared between the [`Client`] and [`Connection`](connection::Connection). error: SharedError, } impl Client { - pub fn new( - id: InternalPeerID, - handle: ConnectionHandle, + /// Creates a new [`Client`]. + pub(crate) fn new( + info: PeerInformation, connection_tx: mpsc::Sender, connection_handle: JoinHandle<()>, + timeout_handle: JoinHandle>, + semaphore: Arc, error: SharedError, ) -> Self { Self { - id, - handle, - connection_tx: PollSender::new(connection_tx), + info, + connection_tx, + timeout_handle, + semaphore: PollSemaphore::new(semaphore), + permit: None, connection_handle, error, } } + /// Internal function to set an error on the [`SharedError`]. fn set_err(&self, err: PeerError) -> tower::BoxError { let err_str = err.to_string(); match self.error.try_insert_err(err) { @@ -86,25 +128,38 @@ impl Service for Client { return Poll::Ready(Err(err.to_string().into())); } - if self.connection_handle.is_finished() { + if self.connection_handle.is_finished() || self.timeout_handle.is_finished() { let err = self.set_err(PeerError::ClientChannelClosed); return Poll::Ready(Err(err)); } - self.connection_tx - .poll_reserve(cx) - .map_err(|_| PeerError::ClientChannelClosed.into()) + if self.permit.is_some() { + return Poll::Ready(Ok(())); + } + + let permit = ready!(self.semaphore.poll_acquire(cx)) + .expect("Client semaphore should not be closed!"); + + self.permit = Some(permit); + + Poll::Ready(Ok(())) } fn call(&mut self, request: PeerRequest) -> Self::Future { + let permit = self + .permit + .take() + .expect("poll_ready did not return ready before call to call"); + let (tx, rx) = oneshot::channel(); let req = connection::ConnectionTaskRequest { response_channel: tx, request, + permit: Some(permit), }; self.connection_tx - .send_item(req) + .try_send(req) .map_err(|_| ()) .expect("poll_ready should have been called"); diff --git a/p2p/monero-p2p/src/client/connection.rs b/p2p/monero-p2p/src/client/connection.rs index b458c3da..266dcf7f 100644 --- a/p2p/monero-p2p/src/client/connection.rs +++ b/p2p/monero-p2p/src/client/connection.rs @@ -1,37 +1,59 @@ -use std::sync::Arc; +//! The Connection Task +//! +//! This module handles routing requests from a [`Client`](crate::client::Client) or a broadcast channel to +//! a peer. This module also handles routing requests from the connected peer to a request handler. +//! +use std::pin::Pin; use futures::{ channel::oneshot, stream::{Fuse, FusedStream}, - SinkExt, StreamExt, + SinkExt, Stream, StreamExt, }; -use tokio::sync::{broadcast, mpsc}; -use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; +use tokio::{ + sync::{mpsc, OwnedSemaphorePermit}, + time::{sleep, timeout, Sleep}, +}; +use tokio_stream::wrappers::ReceiverStream; use tower::ServiceExt; use monero_wire::{LevinCommand, Message, ProtocolMessage}; use crate::{ - handles::ConnectionGuard, MessageID, NetworkZone, PeerBroadcast, PeerError, PeerRequest, - PeerRequestHandler, PeerResponse, SharedError, + constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT}, + handles::ConnectionGuard, + BroadcastMessage, MessageID, NetworkZone, PeerError, PeerRequest, PeerRequestHandler, + PeerResponse, SharedError, }; +/// A request to the connection task from a [`Client`](crate::client::Client). pub struct ConnectionTaskRequest { + /// The request. pub request: PeerRequest, + /// The response channel. pub response_channel: oneshot::Sender>, + /// A permit for this request + pub permit: Option, } +/// The connection state. pub enum State { + /// Waiting for a request from Cuprate or the connected peer. WaitingForRequest, + /// Waiting for a response from the peer. WaitingForResponse { + /// The requests ID. request_id: MessageID, + /// The channel to send the response down. tx: oneshot::Sender>, + /// A permit for this request. + _req_permit: Option, }, } /// Returns if the [`LevinCommand`] is the correct response message for our request. /// -/// e.g that we didn't get a block for a txs request. +/// e.g. that we didn't get a block for a txs request. fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool { matches!( (message_id, command), @@ -49,46 +71,82 @@ fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool ) } -pub struct Connection { +/// This represents a connection to a peer. +pub struct Connection { + /// The peer sink - where we send messages to the peer. peer_sink: Z::Sink, + /// The connections current state. state: State, - client_rx: Fuse>, - broadcast_rx: Fuse>>, + /// Will be [`Some`] if we are expecting a response from the peer. + request_timeout: Option>>, + /// The client channel where requests from Cuprate to this peer will come from for us to route. + client_rx: Fuse>, + /// A stream of messages to broadcast from Cuprate. + broadcast_stream: Pin>, + + /// The inner handler for any requests that come from the requested peer. peer_request_handler: ReqHndlr, + /// The connection guard which will send signals to other parts of Cuprate when this connection is dropped. connection_guard: ConnectionGuard, + /// An error slot which is shared with the client. error: SharedError, } -impl Connection +impl Connection where ReqHndlr: PeerRequestHandler, + BrdcstStrm: Stream + Send + 'static, { + /// Create a new connection struct. pub fn new( peer_sink: Z::Sink, client_rx: mpsc::Receiver, - broadcast_rx: broadcast::Receiver>, + broadcast_stream: BrdcstStrm, peer_request_handler: ReqHndlr, connection_guard: ConnectionGuard, error: SharedError, - ) -> Connection { + ) -> Connection { Connection { peer_sink, state: State::WaitingForRequest, + request_timeout: None, client_rx: ReceiverStream::new(client_rx).fuse(), - broadcast_rx: BroadcastStream::new(broadcast_rx).fuse(), + broadcast_stream: Box::pin(broadcast_stream), peer_request_handler, connection_guard, error, } } + /// Sends a message to the peer, this function implements a timeout, so we don't get stuck sending a message to the + /// peer. async fn send_message_to_peer(&mut self, mes: Message) -> Result<(), PeerError> { - Ok(self.peer_sink.send(mes.into()).await?) + tracing::debug!("Sending message: [{}] to peer", mes.command()); + + timeout(SENDING_TIMEOUT, self.peer_sink.send(mes.into())) + .await + .map_err(|_| PeerError::TimedOut) + .and_then(|res| res.map_err(PeerError::BucketError)) } + /// Handles a broadcast request from Cuprate. + async fn handle_client_broadcast(&mut self, mes: BroadcastMessage) -> Result<(), PeerError> { + match mes { + BroadcastMessage::NewFluffyBlock(block) => { + self.send_message_to_peer(Message::Protocol(ProtocolMessage::NewFluffyBlock(block))) + .await + } + BroadcastMessage::NewTransaction(txs) => { + self.send_message_to_peer(Message::Protocol(ProtocolMessage::NewTransactions(txs))) + .await + } + } + } + + /// Handles a request from Cuprate, unlike a broadcast this request will be directed specifically at this peer. async fn handle_client_request(&mut self, req: ConnectionTaskRequest) -> Result<(), PeerError> { tracing::debug!("handling client request, id: {:?}", req.request.id()); @@ -96,21 +154,34 @@ where self.state = State::WaitingForResponse { request_id: req.request.id(), tx: req.response_channel, + _req_permit: req.permit, }; + self.send_message_to_peer(req.request.into()).await?; - } else { - let res = self.send_message_to_peer(req.request.into()).await; - if let Err(e) = res { - let err_str = e.to_string(); - let _ = req.response_channel.send(Err(err_str.clone().into())); - Err(e)? - } else { - req.response_channel.send(Ok(PeerResponse::NA)); - } + // Set the timeout after sending the message, TODO: Is this a good idea. + self.request_timeout = Some(Box::pin(sleep(REQUEST_TIMEOUT))); + return Ok(()); } + + // INVARIANT: This function cannot exit early without sending a response back down the + // response channel. + let res = self.send_message_to_peer(req.request.into()).await; + + // send the response now, the request does not need a response from the peer. + if let Err(e) = res { + // can't clone the error so turn it to a string first, hacky but oh well. + let err_str = e.to_string(); + let _ = req.response_channel.send(Err(err_str.clone().into())); + return Err(e); + } else { + // We still need to respond even if the response is this. + let _ = req.response_channel.send(Ok(PeerResponse::NA)); + } + Ok(()) } + /// Handles a request from the connected peer to this node. async fn handle_peer_request(&mut self, req: PeerRequest) -> Result<(), PeerError> { tracing::debug!("Received peer request: {:?}", req.id()); @@ -120,12 +191,19 @@ where return Ok(()); } - self.send_message_to_peer(res.try_into().unwrap()).await + self.send_message_to_peer( + res.try_into() + .expect("We just checked if the response was `NA`"), + ) + .await } + /// Handles a message from a peer when we are in [`State::WaitingForResponse`]. async fn handle_potential_response(&mut self, mes: Message) -> Result<(), PeerError> { tracing::debug!("Received peer message, command: {:?}", mes.command()); + // If the message is defiantly a request then there is no way it can be a response to + // our request. if mes.is_request() { return self.handle_peer_request(mes.try_into().unwrap()).await; } @@ -134,6 +212,7 @@ where panic!("Not in correct state, can't receive response!") }; + // Check if the message is a response to our request. if levin_command_response(request_id, mes.command()) { // TODO: Do more checks before returning response. @@ -143,7 +222,12 @@ where panic!("Not in correct state, can't receive response!") }; - let _ = tx.send(Ok(mes.try_into().unwrap())); + let _ = tx.send(Ok(mes + .try_into() + .map_err(|_| PeerError::PeerSentInvalidMessage)?)); + + self.request_timeout = None; + Ok(()) } else { self.handle_peer_request( @@ -154,15 +238,21 @@ where } } + /// The main-loop for when we are in [`State::WaitingForRequest`]. async fn state_waiting_for_request(&mut self, stream: &mut Str) -> Result<(), PeerError> where Str: FusedStream> + Unpin, { tracing::debug!("waiting for peer/client request."); + tokio::select! { biased; - broadcast_req = self.broadcast_rx.next() => { - todo!() + broadcast_req = self.broadcast_stream.next() => { + if let Some(broadcast_req) = broadcast_req { + self.handle_client_broadcast(broadcast_req).await + } else { + Err(PeerError::ClientChannelClosed) + } } client_req = self.client_rx.next() => { if let Some(client_req) = client_req { @@ -181,16 +271,26 @@ where } } + /// The main-loop for when we are in [`State::WaitingForResponse`]. async fn state_waiting_for_response(&mut self, stream: &mut Str) -> Result<(), PeerError> where Str: FusedStream> + Unpin, { - tracing::debug!("waiting for peer response.."); + tracing::debug!("waiting for peer response."); + tokio::select! { biased; - broadcast_req = self.broadcast_rx.next() => { - todo!() + _ = self.request_timeout.as_mut().expect("Request timeout was not set!") => { + Err(PeerError::ClientChannelClosed) } + broadcast_req = self.broadcast_stream.next() => { + if let Some(broadcast_req) = broadcast_req { + self.handle_client_broadcast(broadcast_req).await + } else { + Err(PeerError::ClientChannelClosed) + } + } + // We don't wait for client requests as we are already handling one. peer_message = stream.next() => { if let Some(peer_message) = peer_message { self.handle_potential_response(peer_message?).await @@ -201,6 +301,9 @@ where } } + /// Runs the Connection handler logic, this should be put in a separate task. + /// + /// `eager_protocol_messages` are protocol messages that we received during a handshake. pub async fn run(mut self, mut stream: Str, eager_protocol_messages: Vec) where Str: FusedStream> + Unpin, @@ -241,8 +344,11 @@ where } } + /// Shutdowns the connection, flushing pending requests and setting the error slot, if it hasn't been + /// set already. fn shutdown(mut self, err: PeerError) { tracing::debug!("Connection task shutting down: {}", err); + let mut client_rx = self.client_rx.into_inner().into_inner(); client_rx.close(); @@ -251,6 +357,12 @@ where tracing::debug!("Shared error already contains an error: {}", err); } + if let State::WaitingForResponse { tx, .. } = + std::mem::replace(&mut self.state, State::WaitingForRequest) + { + let _ = tx.send(Err(err_str.clone().into())); + } + while let Ok(req) = client_rx.try_recv() { let _ = req.response_channel.send(Err(err_str.clone().into())); } diff --git a/p2p/monero-p2p/src/client/connector.rs b/p2p/monero-p2p/src/client/connector.rs index 3f9f6047..278d7407 100644 --- a/p2p/monero-p2p/src/client/connector.rs +++ b/p2p/monero-p2p/src/client/connector.rs @@ -1,39 +1,58 @@ +//! Connector +//! +//! This module handles connecting to peers and giving the sink/stream to the handshaker which will then +//! perform a handshake and create a [`Client`]. +//! +//! This is where outbound connections are created. +//! use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; -use futures::FutureExt; +use futures::{FutureExt, Stream}; use tokio::sync::OwnedSemaphorePermit; use tower::{Service, ServiceExt}; use crate::{ client::{Client, DoHandshakeRequest, HandShaker, HandshakeError, InternalPeerID}, - AddressBook, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerRequestHandler, + AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, + PeerRequestHandler, PeerSyncSvc, }; +/// A request to connect to a peer. pub struct ConnectRequest { + /// The peer's address. pub addr: Z::Addr, + /// A permit which will be held be the connection allowing you to set limits on the number of + /// connections. pub permit: OwnedSemaphorePermit, } -pub struct Connector { - handshaker: HandShaker, +/// The connector service, this service connects to peer and returns the [`Client`]. +pub struct Connector { + handshaker: HandShaker, } -impl Connector { - pub fn new(handshaker: HandShaker) -> Self { +impl + Connector +{ + /// Create a new connector from a handshaker. + pub fn new(handshaker: HandShaker) -> Self { Self { handshaker } } } -impl Service> - for Connector +impl + Service> for Connector where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, + PSync: PeerSyncSvc + Clone, ReqHdlr: PeerRequestHandler + Clone, + BrdcstStrm: Stream + Send + 'static, + BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { type Response = Client; type Error = HandshakeError; diff --git a/p2p/monero-p2p/src/client/handshaker.rs b/p2p/monero-p2p/src/client/handshaker.rs index bad4882b..03f3f563 100644 --- a/p2p/monero-p2p/src/client/handshaker.rs +++ b/p2p/monero-p2p/src/client/handshaker.rs @@ -10,16 +10,15 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Duration, }; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt, Stream, StreamExt}; use tokio::{ - sync::{broadcast, mpsc, OwnedSemaphorePermit}, + sync::{mpsc, OwnedSemaphorePermit, Semaphore}, time::{error::Elapsed, timeout}, }; use tower::{Service, ServiceExt}; -use tracing::{info_span, instrument, Instrument}; +use tracing::{info_span, Instrument}; use monero_pruning::{PruningError, PruningSeed}; use monero_wire::{ @@ -28,40 +27,25 @@ use monero_wire::{ PING_OK_RESPONSE_STATUS_TEXT, }, common::PeerSupportFlags, - BasicNodeData, BucketError, CoreSyncData, LevinCommand, Message, RequestMessage, - ResponseMessage, + BasicNodeData, BucketError, LevinCommand, Message, RequestMessage, ResponseMessage, }; use crate::{ - client::{connection::Connection, Client, InternalPeerID}, + client::{ + connection::Connection, timeout_monitor::connection_timeout_monitor_task, Client, + InternalPeerID, PeerInformation, + }, + constants::{ + HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE, + PING_TIMEOUT, + }, handles::HandleBuilder, - AddressBook, AddressBookRequest, AddressBookResponse, ConnectionDirection, CoreSyncDataRequest, - CoreSyncDataResponse, CoreSyncSvc, MessageID, NetZoneAddress, NetworkZone, PeerBroadcast, - PeerRequestHandler, SharedError, MAX_PEERS_IN_PEER_LIST_MESSAGE, + services::PeerSyncRequest, + AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, + CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, + PeerRequestHandler, PeerSyncSvc, SharedError, }; -/// This is a Cuprate specific constant. -/// -/// When completing a handshake monerod might send protocol messages before the handshake is actually -/// complete, this is a problem for Cuprate as we must complete the handshake before responding to any -/// protocol requests. So when we receive a protocol message during a handshake we keep them around to handle -/// after the handshake. -/// -/// Because we use the [bytes crate](https://crates.io/crates/bytes) in monero-wire for zero-copy parsing -/// it is not safe to keep too many of these messages around for long. -const MAX_EAGER_PROTOCOL_MESSAGES: usize = 1; -/// The time given to complete a handshake before the handshake fails. -const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(120); - -/// A timeout put on pings during handshakes. -/// -/// When we receive an inbound connection we open an outbound connection to the node and send a ping message -/// to see if we can reach the node, so we can add it to our address book. -/// -/// This timeout must be significantly shorter than [`HANDSHAKE_TIMEOUT`] so we don't drop inbound connections that -/// don't have ports open. -const PING_TIMEOUT: Duration = Duration::from_secs(10); - #[derive(Debug, thiserror::Error)] pub enum HandshakeError { #[error("The handshake timed out")] @@ -100,52 +84,60 @@ pub struct DoHandshakeRequest { /// The peer handshaking service. #[derive(Debug, Clone)] -pub struct HandShaker { +pub struct HandShaker { /// The address book service. address_book: AdrBook, /// The core sync data service. core_sync_svc: CSync, + /// The peer sync service. + peer_sync_svc: PSync, /// The peer request handler service. peer_request_svc: ReqHdlr, /// Our [`BasicNodeData`] our_basic_node_data: BasicNodeData, - /// The channel to broadcast messages to all peers created with this handshaker. - broadcast_tx: broadcast::Sender>, + /// A function that returns a stream that will give items to be broadcast by a connection. + broadcast_stream_maker: BrdcstStrmMkr, /// The network zone. _zone: PhantomData, } -impl HandShaker { +impl + HandShaker +{ /// Creates a new handshaker. pub fn new( address_book: AdrBook, + peer_sync_svc: PSync, core_sync_svc: CSync, peer_request_svc: ReqHdlr, - - broadcast_tx: broadcast::Sender>, + broadcast_stream_maker: BrdcstStrmMkr, our_basic_node_data: BasicNodeData, ) -> Self { Self { address_book, + peer_sync_svc, core_sync_svc, peer_request_svc, - broadcast_tx, + broadcast_stream_maker, our_basic_node_data, _zone: PhantomData, } } } -impl Service> - for HandShaker +impl + Service> for HandShaker where AdrBook: AddressBook + Clone, CSync: CoreSyncSvc + Clone, + PSync: PeerSyncSvc + Clone, ReqHdlr: PeerRequestHandler + Clone, + BrdcstStrm: Stream + Send + 'static, + BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { type Response = Client; type Error = HandshakeError; @@ -157,11 +149,12 @@ where } fn call(&mut self, req: DoHandshakeRequest) -> Self::Future { - let broadcast_rx = self.broadcast_tx.subscribe(); + let broadcast_stream_maker = self.broadcast_stream_maker.clone(); let address_book = self.address_book.clone(); let peer_request_svc = self.peer_request_svc.clone(); let core_sync_svc = self.core_sync_svc.clone(); + let peer_sync_svc = self.peer_sync_svc.clone(); let our_basic_node_data = self.our_basic_node_data.clone(); let span = info_span!(parent: &tracing::Span::current(), "handshaker", addr=%req.addr); @@ -171,9 +164,10 @@ where HANDSHAKE_TIMEOUT, handshake( req, - broadcast_rx, + broadcast_stream_maker, address_book, core_sync_svc, + peer_sync_svc, peer_request_svc, our_basic_node_data, ), @@ -226,20 +220,24 @@ pub async fn ping(addr: N::Addr) -> Result } /// This function completes a handshake with the requested peer. -async fn handshake( +async fn handshake( req: DoHandshakeRequest, - broadcast_rx: broadcast::Receiver>, + broadcast_stream_maker: BrdcstStrmMkr, mut address_book: AdrBook, mut core_sync_svc: CSync, + mut peer_sync_svc: PSync, peer_request_svc: ReqHdlr, our_basic_node_data: BasicNodeData, ) -> Result, HandshakeError> where AdrBook: AddressBook, CSync: CoreSyncSvc, + PSync: PeerSyncSvc, ReqHdlr: PeerRequestHandler, + BrdcstStrm: Stream + Send + 'static, + BrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Send + 'static, { let DoHandshakeRequest { addr, @@ -253,7 +251,7 @@ where // see: [`MAX_EAGER_PROTOCOL_MESSAGES`] let mut eager_protocol_messages = Vec::new(); - let (peer_core_sync, mut peer_node_data) = match direction { + let (peer_core_sync, peer_node_data) = match direction { ConnectionDirection::InBound => { // Inbound handshake the peer sends the request. tracing::debug!("waiting for handshake request."); @@ -424,40 +422,40 @@ where } }; - // Tell the core sync service about the new peer. - core_sync_svc - .ready() - .await? - .call(CoreSyncDataRequest::HandleIncoming(peer_core_sync.clone())) - .await?; - tracing::debug!("Handshake complete."); // Set up the connection data. let error_slot = SharedError::new(); let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build(); - let (connection_tx, client_rx) = mpsc::channel(3); + let (connection_tx, client_rx) = mpsc::channel(1); - let connection = Connection::::new( + let connection = Connection::::new( peer_sink, client_rx, - broadcast_rx, + broadcast_stream_maker(addr), peer_request_svc, connection_guard, error_slot.clone(), ); - let connection_handle = - tokio::spawn(connection.run(peer_stream.fuse(), eager_protocol_messages)); - - let client = Client::::new( - addr, - handle.clone(), - connection_tx, - connection_handle, - error_slot, + let connection_span = tracing::error_span!(parent: &tracing::Span::none(), "connection", %addr); + let connection_handle = tokio::spawn( + connection + .run(peer_stream.fuse(), eager_protocol_messages) + .instrument(connection_span), ); + // Tell the core sync service about the new peer. + peer_sync_svc + .ready() + .await? + .call(PeerSyncRequest::IncomingCoreSyncData( + addr, + handle.clone(), + peer_core_sync, + )) + .await?; + // Tell the address book about the new connection. address_book .ready() @@ -465,7 +463,7 @@ where .call(AddressBookRequest::NewConnection { internal_peer_id: addr, public_address, - handle, + handle: handle.clone(), id: peer_node_data.peer_id, pruning_seed, rpc_port: peer_node_data.rpc_port, @@ -473,6 +471,34 @@ where }) .await?; + let info = PeerInformation { + id: addr, + handle, + direction, + pruning_seed, + }; + + let semaphore = Arc::new(Semaphore::new(1)); + + let timeout_handle = tokio::spawn(connection_timeout_monitor_task( + info.id, + info.handle.clone(), + connection_tx.clone(), + semaphore.clone(), + address_book, + core_sync_svc, + peer_sync_svc, + )); + + let client = Client::::new( + info, + connection_tx, + connection_handle, + timeout_handle, + semaphore, + error_slot, + ); + Ok(client) } @@ -485,14 +511,11 @@ async fn send_hs_request( where CSync: CoreSyncSvc, { - let CoreSyncDataResponse::Ours(our_core_sync_data) = core_sync_svc + let CoreSyncDataResponse(our_core_sync_data) = core_sync_svc .ready() .await? - .call(CoreSyncDataRequest::Ours) - .await? - else { - panic!("core sync service returned wrong response!"); - }; + .call(CoreSyncDataRequest) + .await?; let req = HandshakeRequest { node_data: our_basic_node_data, @@ -519,14 +542,11 @@ where AdrBook: AddressBook, CSync: CoreSyncSvc, { - let CoreSyncDataResponse::Ours(our_core_sync_data) = core_sync_svc + let CoreSyncDataResponse(our_core_sync_data) = core_sync_svc .ready() .await? - .call(CoreSyncDataRequest::Ours) - .await? - else { - panic!("core sync service returned wrong response!"); - }; + .call(CoreSyncDataRequest) + .await?; let AddressBookResponse::Peers(our_peer_list) = address_book .ready() @@ -612,7 +632,7 @@ async fn wait_for_message( continue; } RequestMessage::Ping => { - if !allow_support_flag_req { + if !allow_ping { return Err(HandshakeError::PeerSentInvalidMessage( "Peer sent 2 ping requests", )); diff --git a/p2p/monero-p2p/src/client/timeout_monitor.rs b/p2p/monero-p2p/src/client/timeout_monitor.rs new file mode 100644 index 00000000..dcdf85d7 --- /dev/null +++ b/p2p/monero-p2p/src/client/timeout_monitor.rs @@ -0,0 +1,135 @@ +//! Timeout Monitor +//! +//! This module holds the task that sends periodic [TimedSync](PeerRequest::TimedSync) requests to a peer to make +//! sure the connection is still active. +use std::sync::Arc; + +use futures::channel::oneshot; +use monero_wire::admin::TimedSyncRequest; +use tokio::{ + sync::{mpsc, Semaphore}, + time::{interval, MissedTickBehavior}, +}; +use tower::ServiceExt; +use tracing::instrument; + +use crate::{ + client::{connection::ConnectionTaskRequest, InternalPeerID}, + constants::{MAX_PEERS_IN_PEER_LIST_MESSAGE, TIMEOUT_INTERVAL}, + handles::ConnectionHandle, + services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest}, + AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, +}; + +/// The timeout monitor task, this task will send periodic timed sync requests to the peer to make sure it is still active. +#[instrument( + name = "timeout_monitor", + level = "debug", + fields(addr = %id), + skip_all, +)] +pub async fn connection_timeout_monitor_task( + id: InternalPeerID, + handle: ConnectionHandle, + + connection_tx: mpsc::Sender, + semaphore: Arc, + + mut address_book_svc: AdrBook, + mut core_sync_svc: CSync, + mut peer_core_sync_svc: PSync, +) -> Result<(), tower::BoxError> +where + AdrBook: AddressBook, + CSync: CoreSyncSvc, + PSync: PeerSyncSvc, +{ + // Instead of tracking the time from last message from the peer and sending a timed sync if this value is too high, + // we just send a timed sync every [TIMEOUT_INTERVAL] seconds. + let mut interval = interval(TIMEOUT_INTERVAL); + + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + // The first tick ticks instantly. + interval.tick().await; + + loop { + interval.tick().await; + + tracing::trace!("timeout monitor tick."); + + if connection_tx.is_closed() { + tracing::debug!("Closing timeout monitor, connection disconnected."); + return Ok(()); + } + + let Ok(permit) = semaphore.clone().try_acquire_owned() else { + // If we can't get a permit the connection is currently waiting for a response, so no need to + // do a timed sync. + continue; + }; + + let ping_span = tracing::debug_span!("timed_sync"); + + // get our core sync data + tracing::trace!(parent: &ping_span, "Attempting to get our core sync data"); + let CoreSyncDataResponse(core_sync_data) = core_sync_svc + .ready() + .await? + .call(CoreSyncDataRequest) + .await?; + + let (tx, rx) = oneshot::channel(); + + // TODO: Instead of always sending timed syncs, send pings if we have a full peer list. + + tracing::debug!(parent: &ping_span, "Sending timed sync to peer"); + connection_tx + .send(ConnectionTaskRequest { + request: PeerRequest::TimedSync(TimedSyncRequest { + payload_data: core_sync_data, + }), + response_channel: tx, + permit: Some(permit), + }) + .await?; + + let PeerResponse::TimedSync(timed_sync) = rx.await?? else { + panic!("Connection task returned wrong response!"); + }; + + tracing::debug!( + parent: &ping_span, + "Received timed sync response, incoming peer list len: {}", + timed_sync.local_peerlist_new.len() + ); + + if timed_sync.local_peerlist_new.len() > MAX_PEERS_IN_PEER_LIST_MESSAGE { + return Err("Peer sent too many peers in peer list".into()); + } + + // Tell our address book about the new peers. + address_book_svc + .ready() + .await? + .call(AddressBookRequest::IncomingPeerList( + timed_sync + .local_peerlist_new + .into_iter() + .map(TryInto::try_into) + .collect::>()?, + )) + .await?; + + // Tell the peer sync service about the peers core sync data + peer_core_sync_svc + .ready() + .await? + .call(PeerSyncRequest::IncomingCoreSyncData( + id, + handle.clone(), + timed_sync.payload_data, + )) + .await?; + } +} diff --git a/p2p/monero-p2p/src/constants.rs b/p2p/monero-p2p/src/constants.rs new file mode 100644 index 00000000..c7b18f77 --- /dev/null +++ b/p2p/monero-p2p/src/constants.rs @@ -0,0 +1,43 @@ +//! Constants used around monero-p2p + +use std::time::Duration; + +/// The request timeout - the time we give a peer to respond to a request. +pub(crate) const REQUEST_TIMEOUT: Duration = Duration::from_secs(60); + +/// The timeout used when sending messages to a peer. +/// +/// TODO: Make this configurable? +/// TODO: Is this a good default. +pub(crate) const SENDING_TIMEOUT: Duration = Duration::from_secs(20); + +/// The interval between timed syncs. +/// +/// TODO: Make this configurable? +/// TODO: Is this a good default. +pub(crate) const TIMEOUT_INTERVAL: Duration = Duration::from_secs(61); + +/// This is a Cuprate specific constant. +/// +/// When completing a handshake monerod might send protocol messages before the handshake is actually +/// complete, this is a problem for Cuprate as we must complete the handshake before responding to any +/// protocol requests. So when we receive a protocol message during a handshake we keep them around to handle +/// after the handshake. +/// +/// Because we use the [bytes crate](https://crates.io/crates/bytes) in monero-wire for zero-copy parsing +/// it is not safe to keep too many of these messages around for long. +pub(crate) const MAX_EAGER_PROTOCOL_MESSAGES: usize = 1; + +/// A timeout put on pings during handshakes. +/// +/// When we receive an inbound connection we open an outbound connection to the node and send a ping message +/// to see if we can reach the node, so we can add it to our address book. +/// +/// This timeout must be significantly shorter than [`HANDSHAKE_TIMEOUT`] so we don't drop inbound connections that +/// don't have ports open. +pub(crate) const PING_TIMEOUT: Duration = Duration::from_secs(10); + +/// A timeout for a handshake - the handshake must complete before this. +pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(60); + +pub(crate) const MAX_PEERS_IN_PEER_LIST_MESSAGE: usize = 250; diff --git a/p2p/monero-p2p/src/error.rs b/p2p/monero-p2p/src/error.rs index 2b8ace84..e74a2bb6 100644 --- a/p2p/monero-p2p/src/error.rs +++ b/p2p/monero-p2p/src/error.rs @@ -30,6 +30,8 @@ impl SharedError { #[derive(Debug, thiserror::Error)] pub enum PeerError { + #[error("The connection timed out.")] + TimedOut, #[error("The connection was closed.")] ConnectionClosed, #[error("The connection tasks client channel was closed")] diff --git a/p2p/monero-p2p/src/handles.rs b/p2p/monero-p2p/src/handles.rs index ed8a30da..f3831708 100644 --- a/p2p/monero-p2p/src/handles.rs +++ b/p2p/monero-p2p/src/handles.rs @@ -7,9 +7,8 @@ use std::{ time::Duration, }; -use futures::SinkExt; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use tokio_util::sync::CancellationToken; +use tokio::sync::OwnedSemaphorePermit; +use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; /// A [`ConnectionHandle`] builder. #[derive(Default, Debug)] @@ -40,7 +39,7 @@ impl HandleBuilder { ( ConnectionGuard { token: token.clone(), - permit: self.permit.expect("connection permit was not set!"), + _permit: self.permit.expect("connection permit was not set!"), }, ConnectionHandle { token: token.clone(), @@ -57,7 +56,7 @@ pub struct BanPeer(pub Duration); /// A struct given to the connection task. pub struct ConnectionGuard { token: CancellationToken, - permit: OwnedSemaphorePermit, + _permit: OwnedSemaphorePermit, } impl ConnectionGuard { @@ -88,9 +87,13 @@ pub struct ConnectionHandle { } impl ConnectionHandle { + pub fn closed(&self) -> WaitForCancellationFutureOwned { + self.token.clone().cancelled_owned() + } /// Bans the peer for the given `duration`. pub fn ban_peer(&self, duration: Duration) { let _ = self.ban.set(BanPeer(duration)); + self.token.cancel(); } /// Checks if this connection is closed. pub fn is_closed(&self) -> bool { diff --git a/p2p/monero-p2p/src/lib.rs b/p2p/monero-p2p/src/lib.rs index 0105e7e1..9c171320 100644 --- a/p2p/monero-p2p/src/lib.rs +++ b/p2p/monero-p2p/src/lib.rs @@ -1,5 +1,17 @@ -#![allow(unused)] - +//! # Monero P2P +//! +//! This crate is general purpose P2P networking library for working with Monero. This is a low level +//! crate, which means it may seem verbose for a lot of use cases, if you want a crate that handles +//! more of the P2P logic have a look at `cuprate-p2p`. +//! +//! # Network Zones +//! +//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet](network_zones::ClearNet). +//! +//! # Usage +//! +//! TODO +//! use std::{fmt::Debug, future::Future, hash::Hash, pin::Pin}; use futures::{Sink, Stream}; @@ -10,6 +22,7 @@ use monero_wire::{ }; pub mod client; +mod constants; pub mod error; pub mod handles; pub mod network_zones; @@ -20,8 +33,6 @@ pub use error::*; pub use protocol::*; use services::*; -const MAX_PEERS_IN_PEER_LIST_MESSAGE: usize = 250; - #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum ConnectionDirection { InBound, @@ -35,9 +46,9 @@ pub trait NetZoneAddress: + std::fmt::Display + Hash + Eq - + Clone + Copy + Send + + Sync + Unpin + 'static { @@ -48,6 +59,11 @@ pub trait NetZoneAddress: /// TODO: IP zone banning? type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; + /// Changes the port of this address to `port`. + fn set_port(&mut self, port: u16); + + fn make_canonical(&mut self); + fn ban_id(&self) -> Self::BanID; fn should_add_to_peer_list(&self) -> bool; @@ -64,6 +80,7 @@ pub trait NetZoneAddress: + Eq + Copy + Send + + Sync + Unpin + 'static { @@ -77,6 +94,8 @@ pub trait NetZoneAddress: /// Changes the port of this address to `port`. fn set_port(&mut self, port: u16); + fn make_canonical(&mut self); + fn ban_id(&self) -> Self::BanID; fn should_add_to_peer_list(&self) -> bool; @@ -100,6 +119,8 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { /// This has privacy implications on an anonymity network if true so should be set /// to false. const CHECK_NODE_ID: bool; + /// Fixed seed nodes for this network. + const SEEDS: &'static [Self::Addr]; /// The address type of this network. type Addr: NetZoneAddress; @@ -124,7 +145,35 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { ) -> Result; } -pub(crate) trait AddressBook: +// #################################################################################### +// Below here is just helper traits, so we don't have to type out tower::Service bounds +// everywhere but still get to use tower. + +pub trait PeerSyncSvc: + tower::Service< + PeerSyncRequest, + Response = PeerSyncResponse, + Error = tower::BoxError, + Future = Self::Future2, + > + Send + + 'static +{ + // This allows us to put more restrictive bounds on the future without defining the future here + // explicitly. + type Future2: Future> + Send + 'static; +} + +impl PeerSyncSvc for T +where + T: tower::Service, Response = PeerSyncResponse, Error = tower::BoxError> + + Send + + 'static, + T::Future: Future> + Send + 'static, +{ + type Future2 = T::Future; +} + +pub trait AddressBook: tower::Service< AddressBookRequest, Response = AddressBookResponse, @@ -151,7 +200,7 @@ where type Future2 = T::Future; } -pub(crate) trait CoreSyncSvc: +pub trait CoreSyncSvc: tower::Service< CoreSyncDataRequest, Response = CoreSyncDataResponse, @@ -183,7 +232,7 @@ impl CoreSyncSvc for T where { } -pub(crate) trait PeerRequestHandler: +pub trait PeerRequestHandler: tower::Service< PeerRequest, Response = PeerResponse, diff --git a/p2p/monero-p2p/src/network_zones/clear.rs b/p2p/monero-p2p/src/network_zones/clear.rs index 7c3c599a..5141a069 100644 --- a/p2p/monero-p2p/src/network_zones/clear.rs +++ b/p2p/monero-p2p/src/network_zones/clear.rs @@ -1,8 +1,8 @@ -use std::net::{IpAddr, SocketAddr}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use monero_wire::MoneroWireCodec; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + pin::Pin, + task::{Context, Poll}, +}; use futures::Stream; use tokio::net::{ @@ -11,6 +11,8 @@ use tokio::net::{ }; use tokio_util::codec::{FramedRead, FramedWrite}; +use monero_wire::MoneroWireCodec; + use crate::{NetZoneAddress, NetworkZone}; impl NetZoneAddress for SocketAddr { @@ -24,8 +26,14 @@ impl NetZoneAddress for SocketAddr { self.ip() } + fn make_canonical(&mut self) { + let ip = self.ip().to_canonical(); + self.set_ip(ip); + } + fn should_add_to_peer_list(&self) -> bool { - todo!() + // TODO + true } } @@ -36,9 +44,19 @@ pub struct ClearNetServerCfg { #[derive(Clone, Copy)] pub enum ClearNet {} +const fn ip_v4(a: u8, b: u8, c: u8, d: u8, port: u16) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(a, b, c, d)), port) +} + #[async_trait::async_trait] impl NetworkZone for ClearNet { const NAME: &'static str = "ClearNet"; + + const SEEDS: &'static [Self::Addr] = &[ + ip_v4(37, 187, 74, 171, 18080), + ip_v4(192, 99, 8, 110, 18080), + ]; + const ALLOW_SYNC: bool = true; const DANDELION_PP: bool = true; const CHECK_NODE_ID: bool = true; @@ -85,7 +103,10 @@ impl Stream for InBoundStream { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.listener .poll_accept(cx) - .map_ok(|(stream, addr)| { + .map_ok(|(stream, mut addr)| { + let ip = addr.ip().to_canonical(); + addr.set_ip(ip); + let (read, write) = stream.into_split(); ( Some(addr), diff --git a/p2p/monero-p2p/src/protocol.rs b/p2p/monero-p2p/src/protocol.rs index a4fb6e9e..10157ae6 100644 --- a/p2p/monero-p2p/src/protocol.rs +++ b/p2p/monero-p2p/src/protocol.rs @@ -1,28 +1,27 @@ -/// This module defines InternalRequests and InternalResponses. Cuprate's P2P works by translating network messages into an internal -/// request/ response, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications" -/// (protocol messages). -/// -/// Some notifications are easy to translate, like `GetObjectsRequest` is obviously a request but others like `NewFluffyBlock` are a -/// bit tricker. To translate a `NewFluffyBlock` into a request/ response we will have to look to see if we asked for `FluffyMissingTransactionsRequest` -/// if we have we interpret `NewFluffyBlock` as a response if not its a request that doesn't require a response. -/// -/// Here is every P2P request/ response. *note admin messages are already request/ response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse -/// -/// Admin: -/// Handshake, -/// TimedSync, -/// Ping, -/// SupportFlags -/// Protocol: -/// Request: GetObjectsRequest, Response: GetObjectsResponse, -/// Request: ChainRequest, Response: ChainResponse, -/// Request: FluffyMissingTransactionsRequest, Response: NewFluffyBlock, <- these 2 could be requests or responses -/// Request: GetTxPoolCompliment, Response: NewTransactions, <- -/// Request: NewBlock, Response: None, -/// Request: NewFluffyBlock, Response: None, -/// Request: NewTransactions, Response: None -/// -/// +//! This module defines InternalRequests and InternalResponses. Cuprate's P2P works by translating network messages into an internal +//! request/ response, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications" +//! (protocol messages). +//! +//! Some notifications are easy to translate, like `GetObjectsRequest` is obviously a request but others like `NewFluffyBlock` are a +//! bit tri cker. To translate a `NewFluffyBlock` into a request/ response we will have to look to see if we asked for `FluffyMissingTransactionsRequest` +//! if we have we interpret `NewFluffyBlock` as a response if not its a request that doesn't require a response. +//! +//! Here is every P2P request/ response. *note admin messages are already request/ response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse +//! +//! Admin: +//! Handshake, +//! TimedSync, +//! Ping, +//! SupportFlags +//! Protocol: +//! Request: GetObjectsRequest, Response: GetObjectsResponse, +//! Request: ChainRequest, Response: ChainResponse, +//! Request: FluffyMissingTransactionsRequest, Response: NewFluffyBlock, <- these 2 could be requests or responses +//! Request: GetTxPoolCompliment, Response: NewTransactions, <- +//! Request: NewBlock, Response: None, +//! Request: NewFluffyBlock, Response: None, +//! Request: NewTransactions, Response: None +//! use monero_wire::{ admin::{ HandshakeRequest, HandshakeResponse, PingResponse, SupportFlagsResponse, TimedSyncRequest, @@ -55,13 +54,12 @@ pub enum MessageID { NewTransactions, } -/// This is a sub-set of [`PeerRequest`] for requests that should be sent to all nodes. -pub enum PeerBroadcast { - Transactions(NewTransactions), - NewBlock(NewBlock), +pub enum BroadcastMessage { NewFluffyBlock(NewFluffyBlock), + NewTransaction(NewTransactions), } +#[derive(Debug, Clone)] pub enum PeerRequest { Handshake(HandshakeRequest), TimedSync(TimedSyncRequest), @@ -105,6 +103,7 @@ impl PeerRequest { } } +#[derive(Debug, Clone)] pub enum PeerResponse { Handshake(HandshakeResponse), TimedSync(TimedSyncResponse), diff --git a/p2p/monero-p2p/src/services.rs b/p2p/monero-p2p/src/services.rs index 6c6df6ce..e86e2776 100644 --- a/p2p/monero-p2p/src/services.rs +++ b/p2p/monero-p2p/src/services.rs @@ -1,21 +1,35 @@ use monero_pruning::{PruningError, PruningSeed}; -use monero_wire::{NetZone, NetworkAddress, PeerListEntryBase}; +use monero_wire::{CoreSyncData, PeerListEntryBase}; use crate::{ client::InternalPeerID, handles::ConnectionHandle, NetZoneAddress, NetworkAddressIncorrectZone, NetworkZone, }; -pub enum CoreSyncDataRequest { - Ours, - HandleIncoming(monero_wire::CoreSyncData), +pub enum PeerSyncRequest { + /// Request some peers to sync from. + /// + /// This takes in the current cumulative difficulty of our chain and will return peers that + /// claim to have a higher cumulative difficulty. + PeersToSyncFrom { + current_cumulative_difficulty: u128, + block_needed: Option, + }, + /// Add/update a peers core sync data to the sync state service. + IncomingCoreSyncData(InternalPeerID, ConnectionHandle, CoreSyncData), } -pub enum CoreSyncDataResponse { - Ours(monero_wire::CoreSyncData), +pub enum PeerSyncResponse { + /// The return value of [`PeerSyncRequest::PeersToSyncFrom`]. + PeersToSyncFrom(Vec>), + /// A generic ok response. Ok, } +pub struct CoreSyncDataRequest; + +pub struct CoreSyncDataResponse(pub CoreSyncData); + #[derive(Debug, Copy, Clone, Eq, PartialEq)] #[cfg_attr( feature = "borsh", diff --git a/p2p/monero-p2p/tests/fragmented_handshake.rs b/p2p/monero-p2p/tests/fragmented_handshake.rs index fdc25193..60d490f8 100644 --- a/p2p/monero-p2p/tests/fragmented_handshake.rs +++ b/p2p/monero-p2p/tests/fragmented_handshake.rs @@ -13,7 +13,7 @@ use tokio::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, }, - sync::{broadcast, Semaphore}, + sync::Semaphore, time::timeout, }; use tokio_util::{ @@ -47,6 +47,7 @@ pub enum FragNet {} #[async_trait::async_trait] impl NetworkZone for FragNet { const NAME: &'static str = "FragNet"; + const SEEDS: &'static [Self::Addr] = &[]; const ALLOW_SYNC: bool = true; const DANDELION_PP: bool = true; const CHECK_NODE_ID: bool = true; @@ -133,7 +134,6 @@ impl Encoder> for FragmentCodec { #[tokio::test] async fn fragmented_handshake_cuprate_to_monerod() { - let (broadcast_tx, _) = broadcast::channel(1); // this isn't actually used in this test. let semaphore = Arc::new(Semaphore::new(10)); let permit = semaphore.acquire_owned().await.unwrap(); @@ -141,18 +141,19 @@ async fn fragmented_handshake_cuprate_to_monerod() { let our_basic_node_data = BasicNodeData { my_port: 0, - network_id: Network::Mainnet.network_id().into(), + network_id: Network::Mainnet.network_id(), peer_id: 87980, support_flags: PeerSupportFlags::from(1_u32), rpc_port: 0, rpc_credits_per_hash: 0, }; - let handshaker = HandShaker::::new( + let handshaker = HandShaker::::new( DummyAddressBook, + DummyPeerSyncSvc, DummyCoreSyncSvc, DummyPeerRequestHandlerSvc, - broadcast_tx, + |_| futures::stream::pending(), our_basic_node_data, ); @@ -172,24 +173,24 @@ async fn fragmented_handshake_cuprate_to_monerod() { #[tokio::test] async fn fragmented_handshake_monerod_to_cuprate() { - let (broadcast_tx, _) = broadcast::channel(1); // this isn't actually used in this test. let semaphore = Arc::new(Semaphore::new(10)); let permit = semaphore.acquire_owned().await.unwrap(); let our_basic_node_data = BasicNodeData { my_port: 18081, - network_id: Network::Mainnet.network_id().into(), + network_id: Network::Mainnet.network_id(), peer_id: 87980, support_flags: PeerSupportFlags::from(1_u32), rpc_port: 0, rpc_credits_per_hash: 0, }; - let mut handshaker = HandShaker::::new( + let mut handshaker = HandShaker::::new( DummyAddressBook, + DummyPeerSyncSvc, DummyCoreSyncSvc, DummyPeerRequestHandlerSvc, - broadcast_tx, + |_| futures::stream::pending(), our_basic_node_data, ); diff --git a/p2p/monero-p2p/tests/handshake.rs b/p2p/monero-p2p/tests/handshake.rs index 2634263d..1d8b649c 100644 --- a/p2p/monero-p2p/tests/handshake.rs +++ b/p2p/monero-p2p/tests/handshake.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use futures::StreamExt; use tokio::{ io::{duplex, split}, - sync::{broadcast, Semaphore}, + sync::Semaphore, time::timeout, }; use tokio_util::codec::{FramedRead, FramedWrite}; @@ -31,14 +31,13 @@ async fn handshake_cuprate_to_cuprate() { // Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to // each other. - let (broadcast_tx, _) = broadcast::channel(1); // this isn't actually used in this test. let semaphore = Arc::new(Semaphore::new(10)); let permit_1 = semaphore.clone().acquire_owned().await.unwrap(); let permit_2 = semaphore.acquire_owned().await.unwrap(); let our_basic_node_data_1 = BasicNodeData { my_port: 0, - network_id: Network::Mainnet.network_id().into(), + network_id: Network::Mainnet.network_id(), peer_id: 87980, // TODO: This fails if the support flags are empty (0) support_flags: PeerSupportFlags::from(1_u32), @@ -49,19 +48,21 @@ async fn handshake_cuprate_to_cuprate() { let mut our_basic_node_data_2 = our_basic_node_data_1.clone(); our_basic_node_data_2.peer_id = 2344; - let mut handshaker_1 = HandShaker::, _, _, _>::new( + let mut handshaker_1 = HandShaker::, _, _, _, _, _>::new( DummyAddressBook, + DummyPeerSyncSvc, DummyCoreSyncSvc, DummyPeerRequestHandlerSvc, - broadcast_tx.clone(), + |_| futures::stream::pending(), our_basic_node_data_1, ); - let mut handshaker_2 = HandShaker::, _, _, _>::new( + let mut handshaker_2 = HandShaker::, _, _, _, _, _>::new( DummyAddressBook, + DummyPeerSyncSvc, DummyCoreSyncSvc, DummyPeerRequestHandlerSvc, - broadcast_tx.clone(), + |_| futures::stream::pending(), our_basic_node_data_2, ); @@ -106,14 +107,13 @@ async fn handshake_cuprate_to_cuprate() { .unwrap() }); - let (res1, res2) = futures::join!(p1, p2); + let (res1, res2) = tokio::join!(p1, p2); res1.unwrap(); res2.unwrap(); } #[tokio::test] async fn handshake_cuprate_to_monerod() { - let (broadcast_tx, _) = broadcast::channel(1); // this isn't actually used in this test. let semaphore = Arc::new(Semaphore::new(10)); let permit = semaphore.acquire_owned().await.unwrap(); @@ -121,18 +121,19 @@ async fn handshake_cuprate_to_monerod() { let our_basic_node_data = BasicNodeData { my_port: 0, - network_id: Network::Mainnet.network_id().into(), + network_id: Network::Mainnet.network_id(), peer_id: 87980, support_flags: PeerSupportFlags::from(1_u32), rpc_port: 0, rpc_credits_per_hash: 0, }; - let handshaker = HandShaker::::new( + let handshaker = HandShaker::::new( DummyAddressBook, + DummyPeerSyncSvc, DummyCoreSyncSvc, DummyPeerRequestHandlerSvc, - broadcast_tx, + |_| futures::stream::pending(), our_basic_node_data, ); @@ -152,24 +153,24 @@ async fn handshake_cuprate_to_monerod() { #[tokio::test] async fn handshake_monerod_to_cuprate() { - let (broadcast_tx, _) = broadcast::channel(1); // this isn't actually used in this test. let semaphore = Arc::new(Semaphore::new(10)); let permit = semaphore.acquire_owned().await.unwrap(); let our_basic_node_data = BasicNodeData { my_port: 18081, - network_id: Network::Mainnet.network_id().into(), + network_id: Network::Mainnet.network_id(), peer_id: 87980, support_flags: PeerSupportFlags::from(1_u32), rpc_port: 0, rpc_credits_per_hash: 0, }; - let mut handshaker = HandShaker::::new( + let mut handshaker = HandShaker::::new( DummyAddressBook, + DummyPeerSyncSvc, DummyCoreSyncSvc, DummyPeerRequestHandlerSvc, - broadcast_tx, + |_| futures::stream::pending(), our_basic_node_data, ); diff --git a/p2p/monero-p2p/tests/sending_receiving.rs b/p2p/monero-p2p/tests/sending_receiving.rs new file mode 100644 index 00000000..fc5c369b --- /dev/null +++ b/p2p/monero-p2p/tests/sending_receiving.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; + +use tokio::sync::Semaphore; +use tower::{Service, ServiceExt}; + +use cuprate_helper::network::Network; +use monero_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData}; + +use monero_p2p::{ + client::{ConnectRequest, Connector, HandShaker}, + network_zones::ClearNet, + protocol::{PeerRequest, PeerResponse}, +}; + +use cuprate_test_utils::monerod::monerod; + +mod utils; +use utils::*; + +#[tokio::test] +async fn get_single_block_from_monerod() { + let semaphore = Arc::new(Semaphore::new(10)); + let permit = semaphore.acquire_owned().await.unwrap(); + + let monerod = monerod(["--out-peers=0"]).await; + + let our_basic_node_data = BasicNodeData { + my_port: 0, + network_id: Network::Mainnet.network_id(), + peer_id: 87980, + support_flags: PeerSupportFlags::FLUFFY_BLOCKS, + rpc_port: 0, + rpc_credits_per_hash: 0, + }; + + let handshaker = HandShaker::::new( + DummyAddressBook, + DummyPeerSyncSvc, + DummyCoreSyncSvc, + DummyPeerRequestHandlerSvc, + |_| futures::stream::pending(), + our_basic_node_data, + ); + + let mut connector = Connector::new(handshaker); + + let mut connected_peer = connector + .ready() + .await + .unwrap() + .call(ConnectRequest { + addr: monerod.p2p_addr(), + permit, + }) + .await + .unwrap(); + + let PeerResponse::GetObjects(obj) = connected_peer + .ready() + .await + .unwrap() + .call(PeerRequest::GetObjects(GetObjectsRequest { + blocks: hex::decode("418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3") + .unwrap() + .try_into() + .unwrap(), + pruned: false, + })) + .await + .unwrap() + else { + panic!("Client returned wrong response"); + }; + + assert_eq!(obj.blocks.len(), 1); + assert_eq!(obj.missed_ids.len(), 0); + assert_eq!(obj.current_blockchain_height, 1); +} diff --git a/p2p/monero-p2p/tests/utils.rs b/p2p/monero-p2p/tests/utils.rs index e6b457f7..9836cbfa 100644 --- a/p2p/monero-p2p/tests/utils.rs +++ b/p2p/monero-p2p/tests/utils.rs @@ -10,6 +10,7 @@ use tower::Service; use monero_p2p::{ services::{ AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, + PeerSyncRequest, PeerSyncResponse, }, NetworkZone, PeerRequest, PeerResponse, }; @@ -51,31 +52,45 @@ impl Service for DummyCoreSyncSvc { Poll::Ready(Ok(())) } - fn call(&mut self, req: CoreSyncDataRequest) -> Self::Future { + fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { async move { - match req { - CoreSyncDataRequest::Ours => { - Ok(CoreSyncDataResponse::Ours(monero_wire::CoreSyncData { - cumulative_difficulty: 1, - cumulative_difficulty_top64: 0, - current_height: 1, - pruning_seed: 0, - top_id: hex::decode( - "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3", - ) - .unwrap() - .try_into() - .unwrap(), - top_version: 1, - })) - } - CoreSyncDataRequest::HandleIncoming(_) => Ok(CoreSyncDataResponse::Ok), - } + Ok(CoreSyncDataResponse(monero_wire::CoreSyncData { + cumulative_difficulty: 1, + cumulative_difficulty_top64: 0, + current_height: 1, + pruning_seed: 0, + top_id: hex::decode( + "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3", + ) + .unwrap() + .try_into() + .unwrap(), + top_version: 1, + })) } .boxed() } } +#[derive(Clone)] +pub struct DummyPeerSyncSvc; + +impl Service> for DummyPeerSyncSvc { + type Error = tower::BoxError; + type Future = + Pin> + Send + 'static>>; + + type Response = PeerSyncResponse; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: PeerSyncRequest) -> Self::Future { + async { Ok(PeerSyncResponse::Ok) }.boxed() + } +} + #[derive(Clone)] pub struct DummyPeerRequestHandlerSvc; diff --git a/test-utils/src/test_netzone.rs b/test-utils/src/test_netzone.rs index 523f9754..0a534164 100644 --- a/test-utils/src/test_netzone.rs +++ b/test-utils/src/test_netzone.rs @@ -31,6 +31,8 @@ impl NetZoneAddress for TestNetZoneAddr { fn set_port(&mut self, _: u16) {} + fn make_canonical(&mut self) {} + fn ban_id(&self) -> Self::BanID { *self } @@ -74,6 +76,7 @@ impl { const NAME: &'static str = "Testing"; + const SEEDS: &'static [Self::Addr] = &[]; const ALLOW_SYNC: bool = ALLOW_SYNC; const DANDELION_PP: bool = DANDELION_PP; const CHECK_NODE_ID: bool = CHECK_NODE_ID;