2024-03-20 20:58:12 +00:00
|
|
|
//! Handshake Module
|
|
|
|
//!
|
|
|
|
//! This module contains a [`HandShaker`], which is a [`Service`] that takes an open connection and attempts
//! to complete a handshake with the peer on the other end of it.
|
|
|
|
//!
|
|
|
|
//! This module also contains a [`ping`] function that can be used to check if an address is reachable.
|
2023-11-30 18:09:05 +00:00
|
|
|
use std::{
|
|
|
|
future::Future,
|
|
|
|
marker::PhantomData,
|
|
|
|
pin::Pin,
|
2024-01-13 00:07:35 +00:00
|
|
|
sync::Arc,
|
2023-11-30 18:09:05 +00:00
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
use futures::{FutureExt, SinkExt, Stream, StreamExt};
|
2024-01-22 18:18:15 +00:00
|
|
|
use tokio::{
|
2024-05-02 22:58:22 +00:00
|
|
|
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
|
2024-01-22 18:18:15 +00:00
|
|
|
time::{error::Elapsed, timeout},
|
|
|
|
};
|
2023-11-30 18:09:05 +00:00
|
|
|
use tower::{Service, ServiceExt};
|
2024-05-02 22:58:22 +00:00
|
|
|
use tracing::{info_span, Instrument};
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
use monero_pruning::{PruningError, PruningSeed};
|
2023-11-30 18:09:05 +00:00
|
|
|
use monero_wire::{
|
|
|
|
admin::{
|
|
|
|
HandshakeRequest, HandshakeResponse, PingResponse, SupportFlagsResponse,
|
|
|
|
PING_OK_RESPONSE_STATUS_TEXT,
|
|
|
|
},
|
|
|
|
common::PeerSupportFlags,
|
2024-05-02 22:58:22 +00:00
|
|
|
BasicNodeData, BucketError, LevinCommand, Message, RequestMessage, ResponseMessage,
|
2023-11-30 18:09:05 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
use crate::{
|
2024-05-02 22:58:22 +00:00
|
|
|
client::{
|
|
|
|
connection::Connection, timeout_monitor::connection_timeout_monitor_task, Client,
|
|
|
|
InternalPeerID, PeerInformation,
|
|
|
|
},
|
|
|
|
constants::{
|
|
|
|
HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE,
|
|
|
|
PING_TIMEOUT,
|
|
|
|
},
|
2024-01-13 00:07:35 +00:00
|
|
|
handles::HandleBuilder,
|
2024-05-02 22:58:22 +00:00
|
|
|
services::PeerSyncRequest,
|
|
|
|
AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection,
|
|
|
|
CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone,
|
|
|
|
PeerRequestHandler, PeerSyncSvc, SharedError,
|
2023-11-30 18:09:05 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
|
|
pub enum HandshakeError {
|
2024-01-22 18:18:15 +00:00
|
|
|
#[error("The handshake timed out")]
|
|
|
|
TimedOut(#[from] Elapsed),
|
|
|
|
#[error("Peer has the same node ID as us")]
|
2023-11-30 18:09:05 +00:00
|
|
|
PeerHasSameNodeID,
|
2024-01-22 18:18:15 +00:00
|
|
|
#[error("Peer is on a different network")]
|
2023-11-30 18:09:05 +00:00
|
|
|
IncorrectNetwork,
|
2024-01-22 18:18:15 +00:00
|
|
|
#[error("Peer sent a peer list with peers from different zones")]
|
2023-12-08 15:03:01 +00:00
|
|
|
PeerSentIncorrectPeerList(#[from] crate::services::PeerListConversionError),
|
2024-01-22 18:18:15 +00:00
|
|
|
#[error("Peer sent invalid message: {0}")]
|
2023-11-30 18:09:05 +00:00
|
|
|
PeerSentInvalidMessage(&'static str),
|
2024-03-20 20:58:12 +00:00
|
|
|
#[error("The peers pruning seed is invalid.")]
|
|
|
|
InvalidPruningSeed(#[from] PruningError),
|
2023-11-30 18:09:05 +00:00
|
|
|
#[error("Levin bucket error: {0}")]
|
|
|
|
LevinBucketError(#[from] BucketError),
|
|
|
|
#[error("Internal service error: {0}")]
|
|
|
|
InternalSvcErr(#[from] tower::BoxError),
|
2024-01-22 18:18:15 +00:00
|
|
|
#[error("I/O error: {0}")]
|
2023-11-30 18:09:05 +00:00
|
|
|
IO(#[from] std::io::Error),
|
|
|
|
}
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// A request to complete a handshake.
|
2023-11-30 18:09:05 +00:00
|
|
|
pub struct DoHandshakeRequest<Z: NetworkZone> {
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The [`InternalPeerID`] of the peer we are handshaking with.
|
2024-01-13 00:07:35 +00:00
|
|
|
pub addr: InternalPeerID<Z::Addr>,
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The receiving side of the connection.
|
2023-11-30 18:09:05 +00:00
|
|
|
pub peer_stream: Z::Stream,
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The sending side of the connection.
|
2023-11-30 18:09:05 +00:00
|
|
|
pub peer_sink: Z::Sink,
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The direction of the connection.
|
2023-11-30 18:09:05 +00:00
|
|
|
pub direction: ConnectionDirection,
|
2024-03-20 20:58:12 +00:00
|
|
|
/// A permit for this connection.
|
2024-01-13 00:07:35 +00:00
|
|
|
pub permit: OwnedSemaphorePermit,
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The peer handshaking service.
|
2023-11-30 18:09:05 +00:00
|
|
|
#[derive(Debug, Clone)]
|
2024-05-02 22:58:22 +00:00
|
|
|
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> {
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The address book service.
|
2023-11-30 18:09:05 +00:00
|
|
|
address_book: AdrBook,
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The core sync data service.
|
2023-11-30 18:09:05 +00:00
|
|
|
core_sync_svc: CSync,
|
2024-05-02 22:58:22 +00:00
|
|
|
/// The peer sync service.
|
|
|
|
peer_sync_svc: PSync,
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The peer request handler service.
|
2023-11-30 18:09:05 +00:00
|
|
|
peer_request_svc: ReqHdlr,
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Our [`BasicNodeData`]
|
2023-11-30 18:09:05 +00:00
|
|
|
our_basic_node_data: BasicNodeData,
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
/// A function that returns a stream that will give items to be broadcast by a connection.
|
|
|
|
broadcast_stream_maker: BrdcstStrmMkr,
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// The network zone.
|
2023-11-30 18:09:05 +00:00
|
|
|
_zone: PhantomData<Z>,
|
|
|
|
}
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
impl<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr>
|
|
|
|
HandShaker<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr>
|
|
|
|
{
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Creates a new handshaker.
|
2023-11-30 18:09:05 +00:00
|
|
|
pub fn new(
|
|
|
|
address_book: AdrBook,
|
2024-05-02 22:58:22 +00:00
|
|
|
peer_sync_svc: PSync,
|
2023-11-30 18:09:05 +00:00
|
|
|
core_sync_svc: CSync,
|
|
|
|
peer_request_svc: ReqHdlr,
|
2024-05-02 22:58:22 +00:00
|
|
|
broadcast_stream_maker: BrdcstStrmMkr,
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2023-11-30 18:09:05 +00:00
|
|
|
our_basic_node_data: BasicNodeData,
|
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
address_book,
|
2024-05-02 22:58:22 +00:00
|
|
|
peer_sync_svc,
|
2023-11-30 18:09:05 +00:00
|
|
|
core_sync_svc,
|
|
|
|
peer_request_svc,
|
2024-05-02 22:58:22 +00:00
|
|
|
broadcast_stream_maker,
|
2023-11-30 18:09:05 +00:00
|
|
|
our_basic_node_data,
|
|
|
|
_zone: PhantomData,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
impl<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr, BrdcstStrm>
|
|
|
|
Service<DoHandshakeRequest<Z>> for HandShaker<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr>
|
2023-11-30 18:09:05 +00:00
|
|
|
where
|
|
|
|
AdrBook: AddressBook<Z> + Clone,
|
|
|
|
CSync: CoreSyncSvc + Clone,
|
2024-05-02 22:58:22 +00:00
|
|
|
PSync: PeerSyncSvc<Z> + Clone,
|
2023-11-30 18:09:05 +00:00
|
|
|
ReqHdlr: PeerRequestHandler + Clone,
|
2024-05-02 22:58:22 +00:00
|
|
|
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
|
|
|
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
|
2023-11-30 18:09:05 +00:00
|
|
|
{
|
2024-01-13 00:07:35 +00:00
|
|
|
type Response = Client<Z>;
|
2023-11-30 18:09:05 +00:00
|
|
|
type Error = HandshakeError;
|
|
|
|
type Future =
|
|
|
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
|
|
|
|
|
|
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, req: DoHandshakeRequest<Z>) -> Self::Future {
|
2024-05-02 22:58:22 +00:00
|
|
|
let broadcast_stream_maker = self.broadcast_stream_maker.clone();
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2023-11-30 18:09:05 +00:00
|
|
|
let address_book = self.address_book.clone();
|
|
|
|
let peer_request_svc = self.peer_request_svc.clone();
|
|
|
|
let core_sync_svc = self.core_sync_svc.clone();
|
2024-05-02 22:58:22 +00:00
|
|
|
let peer_sync_svc = self.peer_sync_svc.clone();
|
2023-11-30 18:09:05 +00:00
|
|
|
let our_basic_node_data = self.our_basic_node_data.clone();
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
let span = info_span!(parent: &tracing::Span::current(), "handshaker", addr=%req.addr);
|
2023-11-30 18:09:05 +00:00
|
|
|
|
|
|
|
async move {
|
2024-01-22 18:18:15 +00:00
|
|
|
timeout(
|
|
|
|
HANDSHAKE_TIMEOUT,
|
|
|
|
handshake(
|
|
|
|
req,
|
2024-05-02 22:58:22 +00:00
|
|
|
broadcast_stream_maker,
|
2024-01-22 18:18:15 +00:00
|
|
|
address_book,
|
|
|
|
core_sync_svc,
|
2024-05-02 22:58:22 +00:00
|
|
|
peer_sync_svc,
|
2024-01-22 18:18:15 +00:00
|
|
|
peer_request_svc,
|
|
|
|
our_basic_node_data,
|
|
|
|
),
|
2024-01-12 00:02:25 +00:00
|
|
|
)
|
2024-01-22 18:18:15 +00:00
|
|
|
.await?
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
|
|
|
.instrument(span)
|
|
|
|
.boxed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Send a ping to the requested peer and wait for a response, returning the `peer_id`.
|
|
|
|
///
|
|
|
|
/// This function does not put a timeout on the ping.
|
|
|
|
pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError> {
|
|
|
|
tracing::debug!("Sending Ping to peer");
|
|
|
|
|
|
|
|
let (mut peer_stream, mut peer_sink) = N::connect_to_peer(addr).await?;
|
|
|
|
|
|
|
|
tracing::debug!("Made outbound connection to peer, sending ping.");
|
|
|
|
|
|
|
|
peer_sink
|
|
|
|
.send(Message::Request(RequestMessage::Ping).into())
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
if let Some(res) = peer_stream.next().await {
|
|
|
|
if let Message::Response(ResponseMessage::Ping(ping)) = res? {
|
|
|
|
if ping.status == PING_OK_RESPONSE_STATUS_TEXT {
|
|
|
|
tracing::debug!("Ping successful.");
|
|
|
|
return Ok(ping.peer_id);
|
|
|
|
}
|
|
|
|
|
|
|
|
tracing::debug!("Peer's ping response was not `OK`.");
|
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Ping response was not `OK`",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
tracing::debug!("Peer sent invalid response to ping.");
|
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Peer did not send correct response for ping.",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
tracing::debug!("Connection closed before ping response.");
|
|
|
|
Err(BucketError::IO(std::io::Error::new(
|
|
|
|
std::io::ErrorKind::ConnectionAborted,
|
|
|
|
"The peer stream returned None",
|
|
|
|
)))?
|
|
|
|
}
|
|
|
|
|
2024-01-22 18:18:15 +00:00
|
|
|
/// This function completes a handshake with the requested peer.
|
2024-05-02 22:58:22 +00:00
|
|
|
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr, BrdcstStrm>(
|
2024-01-22 18:18:15 +00:00
|
|
|
req: DoHandshakeRequest<Z>,
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
broadcast_stream_maker: BrdcstStrmMkr,
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
mut address_book: AdrBook,
|
|
|
|
mut core_sync_svc: CSync,
|
2024-05-02 22:58:22 +00:00
|
|
|
mut peer_sync_svc: PSync,
|
2023-11-30 18:09:05 +00:00
|
|
|
peer_request_svc: ReqHdlr,
|
|
|
|
our_basic_node_data: BasicNodeData,
|
2024-01-13 00:07:35 +00:00
|
|
|
) -> Result<Client<Z>, HandshakeError>
|
2023-11-30 18:09:05 +00:00
|
|
|
where
|
|
|
|
AdrBook: AddressBook<Z>,
|
|
|
|
CSync: CoreSyncSvc,
|
2024-05-02 22:58:22 +00:00
|
|
|
PSync: PeerSyncSvc<Z>,
|
2023-11-30 18:09:05 +00:00
|
|
|
ReqHdlr: PeerRequestHandler,
|
2024-05-02 22:58:22 +00:00
|
|
|
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
|
|
|
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static,
|
2023-11-30 18:09:05 +00:00
|
|
|
{
|
2024-01-22 18:18:15 +00:00
|
|
|
let DoHandshakeRequest {
|
|
|
|
addr,
|
|
|
|
mut peer_stream,
|
|
|
|
mut peer_sink,
|
|
|
|
direction,
|
|
|
|
permit,
|
|
|
|
} = req;
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
// A list of protocol messages the peer has sent during the handshake for us to handle after the handshake.
|
|
|
|
// see: [`MAX_EAGER_PROTOCOL_MESSAGES`]
|
2024-01-12 00:02:25 +00:00
|
|
|
let mut eager_protocol_messages = Vec::new();
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
let (peer_core_sync, peer_node_data) = match direction {
|
2024-01-12 00:02:25 +00:00
|
|
|
ConnectionDirection::InBound => {
|
2024-03-20 20:58:12 +00:00
|
|
|
// Inbound handshake the peer sends the request.
|
2024-01-12 00:02:25 +00:00
|
|
|
tracing::debug!("waiting for handshake request.");
|
|
|
|
|
|
|
|
let Message::Request(RequestMessage::Handshake(handshake_req)) = wait_for_message::<Z>(
|
|
|
|
LevinCommand::Handshake,
|
|
|
|
true,
|
|
|
|
&mut peer_sink,
|
|
|
|
&mut peer_stream,
|
|
|
|
&mut eager_protocol_messages,
|
2024-03-20 20:58:12 +00:00
|
|
|
&our_basic_node_data,
|
2024-01-12 00:02:25 +00:00
|
|
|
)
|
2023-11-30 18:09:05 +00:00
|
|
|
.await?
|
2024-01-12 00:02:25 +00:00
|
|
|
else {
|
|
|
|
panic!("wait_for_message returned ok with wrong message.");
|
|
|
|
};
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
tracing::debug!("Received handshake request.");
|
2024-03-20 20:58:12 +00:00
|
|
|
// We will respond to the handshake request later.
|
2024-01-12 00:02:25 +00:00
|
|
|
(handshake_req.payload_data, handshake_req.node_data)
|
|
|
|
}
|
|
|
|
ConnectionDirection::OutBound => {
|
2024-03-20 20:58:12 +00:00
|
|
|
// Outbound handshake, we send the request.
|
2024-01-12 00:02:25 +00:00
|
|
|
send_hs_request::<Z, _>(
|
|
|
|
&mut peer_sink,
|
|
|
|
&mut core_sync_svc,
|
|
|
|
our_basic_node_data.clone(),
|
|
|
|
)
|
2023-11-30 18:09:05 +00:00
|
|
|
.await?;
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
// Wait for the handshake response.
|
2024-01-12 00:02:25 +00:00
|
|
|
let Message::Response(ResponseMessage::Handshake(handshake_res)) =
|
|
|
|
wait_for_message::<Z>(
|
|
|
|
LevinCommand::Handshake,
|
|
|
|
false,
|
|
|
|
&mut peer_sink,
|
|
|
|
&mut peer_stream,
|
|
|
|
&mut eager_protocol_messages,
|
2024-03-20 20:58:12 +00:00
|
|
|
&our_basic_node_data,
|
2024-01-12 00:02:25 +00:00
|
|
|
)
|
|
|
|
.await?
|
|
|
|
else {
|
|
|
|
panic!("wait_for_message returned ok with wrong message.");
|
|
|
|
};
|
|
|
|
|
|
|
|
if handshake_res.local_peerlist_new.len() > MAX_PEERS_IN_PEER_LIST_MESSAGE {
|
|
|
|
tracing::debug!("peer sent too many peers in response, cancelling handshake");
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Too many peers in peer list message (>250)",
|
|
|
|
));
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
|
|
|
tracing::debug!(
|
2024-01-12 00:02:25 +00:00
|
|
|
"Telling address book about new peers, len: {}",
|
|
|
|
handshake_res.local_peerlist_new.len()
|
2023-11-30 18:09:05 +00:00
|
|
|
);
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
// Tell our address book about the new peers.
|
2024-01-12 00:02:25 +00:00
|
|
|
address_book
|
|
|
|
.ready()
|
|
|
|
.await?
|
|
|
|
.call(AddressBookRequest::IncomingPeerList(
|
|
|
|
handshake_res
|
|
|
|
.local_peerlist_new
|
|
|
|
.into_iter()
|
|
|
|
.map(TryInto::try_into)
|
|
|
|
.collect::<Result<_, _>>()?,
|
|
|
|
))
|
|
|
|
.await?;
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
(handshake_res.payload_data, handshake_res.node_data)
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
2024-01-12 00:02:25 +00:00
|
|
|
};
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
if peer_node_data.network_id != our_basic_node_data.network_id {
|
|
|
|
return Err(HandshakeError::IncorrectNetwork);
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
if Z::CHECK_NODE_ID && peer_node_data.peer_id == our_basic_node_data.peer_id {
|
|
|
|
return Err(HandshakeError::PeerHasSameNodeID);
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/*
|
|
|
|
// monerod sends a request for support flags if the peer doesn't specify any but this seems unnecessary
|
|
|
|
// as the peer should specify them in the handshake.
|
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
if peer_node_data.support_flags.is_empty() {
|
2023-11-30 18:09:05 +00:00
|
|
|
tracing::debug!(
|
2024-01-12 00:02:25 +00:00
|
|
|
"Peer didn't send support flags or has no features, sending request to make sure."
|
2023-11-30 18:09:05 +00:00
|
|
|
);
|
2024-01-12 00:02:25 +00:00
|
|
|
peer_sink
|
2024-03-05 01:29:57 +00:00
|
|
|
.send(Message::Request(RequestMessage::SupportFlags).into())
|
2023-11-30 18:09:05 +00:00
|
|
|
.await?;
|
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
let Message::Response(ResponseMessage::SupportFlags(support_flags_res)) =
|
|
|
|
wait_for_message::<Z>(
|
|
|
|
LevinCommand::SupportFlags,
|
|
|
|
false,
|
|
|
|
&mut peer_sink,
|
|
|
|
&mut peer_stream,
|
|
|
|
&mut eager_protocol_messages,
|
2024-03-20 20:58:12 +00:00
|
|
|
&our_basic_node_data,
|
2024-01-12 00:02:25 +00:00
|
|
|
)
|
2023-11-30 18:09:05 +00:00
|
|
|
.await?
|
2024-01-12 00:02:25 +00:00
|
|
|
else {
|
|
|
|
panic!("wait_for_message returned ok with wrong message.");
|
|
|
|
};
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
tracing::debug!("Received support flag response.");
|
|
|
|
peer_node_data.support_flags = support_flags_res.support_flags;
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
*/
|
|
|
|
|
|
|
|
// Make sure the pruning seed is valid.
|
|
|
|
let pruning_seed = PruningSeed::decompress_p2p_rules(peer_core_sync.pruning_seed)?;
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
// public_address, if Some, is the reachable address of the node.
|
|
|
|
let public_address = 'check_out_addr: {
|
|
|
|
match direction {
|
|
|
|
ConnectionDirection::InBound => {
|
|
|
|
// First send the handshake response.
|
|
|
|
send_hs_response::<Z, _, _>(
|
|
|
|
&mut peer_sink,
|
|
|
|
&mut core_sync_svc,
|
|
|
|
&mut address_book,
|
|
|
|
our_basic_node_data,
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
// Now if the peer specifies a reachable port, open a connection and ping them to check.
|
|
|
|
if peer_node_data.my_port != 0 {
|
|
|
|
let InternalPeerID::KnownAddr(mut outbound_address) = addr else {
|
|
|
|
// Anonymity network, we don't know the inbound address.
|
|
|
|
break 'check_out_addr None;
|
|
|
|
};
|
|
|
|
|
|
|
|
// u32 does not make sense as a port so just truncate it.
|
|
|
|
outbound_address.set_port(peer_node_data.my_port as u16);
|
|
|
|
|
|
|
|
let Ok(Ok(ping_peer_id)) = timeout(
|
|
|
|
PING_TIMEOUT,
|
|
|
|
ping::<Z>(outbound_address).instrument(info_span!("ping")),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
else {
|
|
|
|
// The ping was not successful.
|
|
|
|
break 'check_out_addr None;
|
|
|
|
};
|
|
|
|
|
|
|
|
// Make sure we are talking to the right node.
|
|
|
|
if ping_peer_id == peer_node_data.peer_id {
|
|
|
|
break 'check_out_addr Some(outbound_address);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// The peer did not specify a reachable port or the ping was not successful.
|
|
|
|
None
|
|
|
|
}
|
|
|
|
ConnectionDirection::OutBound => {
|
|
|
|
let InternalPeerID::KnownAddr(outbound_addr) = addr else {
|
|
|
|
unreachable!("How could we make an outbound connection to an unknown address");
|
|
|
|
};
|
|
|
|
|
|
|
|
// This is an outbound connection, this address is obviously reachable.
|
|
|
|
Some(outbound_addr)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
tracing::debug!("Handshake complete.");
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
// Set up the connection data.
|
2024-01-13 00:07:35 +00:00
|
|
|
let error_slot = SharedError::new();
|
2024-03-20 20:58:12 +00:00
|
|
|
let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build();
|
2024-05-02 22:58:22 +00:00
|
|
|
let (connection_tx, client_rx) = mpsc::channel(1);
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
let connection = Connection::<Z, _, _>::new(
|
2024-01-13 00:07:35 +00:00
|
|
|
peer_sink,
|
|
|
|
client_rx,
|
2024-05-02 22:58:22 +00:00
|
|
|
broadcast_stream_maker(addr),
|
2024-01-13 00:07:35 +00:00
|
|
|
peer_request_svc,
|
|
|
|
connection_guard,
|
|
|
|
error_slot.clone(),
|
|
|
|
);
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
let connection_span = tracing::error_span!(parent: &tracing::Span::none(), "connection", %addr);
|
|
|
|
let connection_handle = tokio::spawn(
|
|
|
|
connection
|
|
|
|
.run(peer_stream.fuse(), eager_protocol_messages)
|
|
|
|
.instrument(connection_span),
|
2024-03-20 20:58:12 +00:00
|
|
|
);
|
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
// Tell the core sync service about the new peer.
|
|
|
|
peer_sync_svc
|
|
|
|
.ready()
|
|
|
|
.await?
|
|
|
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
|
|
|
addr,
|
|
|
|
handle.clone(),
|
|
|
|
peer_core_sync,
|
|
|
|
))
|
|
|
|
.await?;
|
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
// Tell the address book about the new connection.
|
|
|
|
address_book
|
|
|
|
.ready()
|
|
|
|
.await?
|
|
|
|
.call(AddressBookRequest::NewConnection {
|
|
|
|
internal_peer_id: addr,
|
|
|
|
public_address,
|
2024-05-02 22:58:22 +00:00
|
|
|
handle: handle.clone(),
|
2024-03-20 20:58:12 +00:00
|
|
|
id: peer_node_data.peer_id,
|
|
|
|
pruning_seed,
|
|
|
|
rpc_port: peer_node_data.rpc_port,
|
|
|
|
rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash,
|
|
|
|
})
|
|
|
|
.await?;
|
2024-01-13 00:07:35 +00:00
|
|
|
|
2024-05-02 22:58:22 +00:00
|
|
|
let info = PeerInformation {
|
|
|
|
id: addr,
|
|
|
|
handle,
|
|
|
|
direction,
|
|
|
|
pruning_seed,
|
|
|
|
};
|
|
|
|
|
|
|
|
let semaphore = Arc::new(Semaphore::new(1));
|
|
|
|
|
|
|
|
let timeout_handle = tokio::spawn(connection_timeout_monitor_task(
|
|
|
|
info.id,
|
|
|
|
info.handle.clone(),
|
|
|
|
connection_tx.clone(),
|
|
|
|
semaphore.clone(),
|
|
|
|
address_book,
|
|
|
|
core_sync_svc,
|
|
|
|
peer_sync_svc,
|
|
|
|
));
|
|
|
|
|
|
|
|
let client = Client::<Z>::new(
|
|
|
|
info,
|
|
|
|
connection_tx,
|
|
|
|
connection_handle,
|
|
|
|
timeout_handle,
|
|
|
|
semaphore,
|
|
|
|
error_slot,
|
|
|
|
);
|
|
|
|
|
2024-01-13 00:07:35 +00:00
|
|
|
Ok(client)
|
2024-01-12 00:02:25 +00:00
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Sends a [`RequestMessage::Handshake`] down the peer sink.
|
2024-01-12 00:02:25 +00:00
|
|
|
async fn send_hs_request<Z: NetworkZone, CSync>(
|
|
|
|
peer_sink: &mut Z::Sink,
|
|
|
|
core_sync_svc: &mut CSync,
|
|
|
|
our_basic_node_data: BasicNodeData,
|
|
|
|
) -> Result<(), HandshakeError>
|
|
|
|
where
|
|
|
|
CSync: CoreSyncSvc,
|
|
|
|
{
|
2024-05-02 22:58:22 +00:00
|
|
|
let CoreSyncDataResponse(our_core_sync_data) = core_sync_svc
|
2024-01-12 00:02:25 +00:00
|
|
|
.ready()
|
|
|
|
.await?
|
2024-05-02 22:58:22 +00:00
|
|
|
.call(CoreSyncDataRequest)
|
|
|
|
.await?;
|
2024-01-12 00:02:25 +00:00
|
|
|
|
|
|
|
let req = HandshakeRequest {
|
|
|
|
node_data: our_basic_node_data,
|
|
|
|
payload_data: our_core_sync_data,
|
|
|
|
};
|
|
|
|
|
|
|
|
tracing::debug!("Sending handshake request.");
|
|
|
|
|
|
|
|
peer_sink
|
2024-03-05 01:29:57 +00:00
|
|
|
.send(Message::Request(RequestMessage::Handshake(req)).into())
|
2024-01-12 00:02:25 +00:00
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Sends a [`ResponseMessage::Handshake`] down the peer sink.
|
2024-01-12 00:02:25 +00:00
|
|
|
async fn send_hs_response<Z: NetworkZone, CSync, AdrBook>(
|
|
|
|
peer_sink: &mut Z::Sink,
|
|
|
|
core_sync_svc: &mut CSync,
|
|
|
|
address_book: &mut AdrBook,
|
|
|
|
our_basic_node_data: BasicNodeData,
|
|
|
|
) -> Result<(), HandshakeError>
|
|
|
|
where
|
|
|
|
AdrBook: AddressBook<Z>,
|
|
|
|
CSync: CoreSyncSvc,
|
|
|
|
{
|
2024-05-02 22:58:22 +00:00
|
|
|
let CoreSyncDataResponse(our_core_sync_data) = core_sync_svc
|
2024-01-12 00:02:25 +00:00
|
|
|
.ready()
|
|
|
|
.await?
|
2024-05-02 22:58:22 +00:00
|
|
|
.call(CoreSyncDataRequest)
|
|
|
|
.await?;
|
2024-01-12 00:02:25 +00:00
|
|
|
|
|
|
|
let AddressBookResponse::Peers(our_peer_list) = address_book
|
|
|
|
.ready()
|
|
|
|
.await?
|
|
|
|
.call(AddressBookRequest::GetWhitePeers(
|
|
|
|
MAX_PEERS_IN_PEER_LIST_MESSAGE,
|
|
|
|
))
|
|
|
|
.await?
|
|
|
|
else {
|
|
|
|
panic!("Address book sent incorrect response");
|
|
|
|
};
|
|
|
|
|
|
|
|
let res = HandshakeResponse {
|
|
|
|
node_data: our_basic_node_data,
|
|
|
|
payload_data: our_core_sync_data,
|
|
|
|
local_peerlist_new: our_peer_list.into_iter().map(Into::into).collect(),
|
|
|
|
};
|
|
|
|
|
|
|
|
tracing::debug!("Sending handshake response.");
|
|
|
|
|
|
|
|
peer_sink
|
2024-03-05 01:29:57 +00:00
|
|
|
.send(Message::Response(ResponseMessage::Handshake(res)).into())
|
2024-01-12 00:02:25 +00:00
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Waits for a message with a specific [`LevinCommand`].
|
|
|
|
///
|
|
|
|
/// The message needed must not be a protocol message, only request/ response "admin" messages are allowed.
|
|
|
|
///
|
|
|
|
/// `levin_command` is the [`LevinCommand`] you need and `request` is for if the message is a request.
|
2024-01-12 00:02:25 +00:00
|
|
|
async fn wait_for_message<Z: NetworkZone>(
|
|
|
|
levin_command: LevinCommand,
|
|
|
|
request: bool,
|
2024-03-20 20:58:12 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
peer_sink: &mut Z::Sink,
|
|
|
|
peer_stream: &mut Z::Stream,
|
2024-03-20 20:58:12 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
eager_protocol_messages: &mut Vec<monero_wire::ProtocolMessage>,
|
2024-03-20 20:58:12 +00:00
|
|
|
|
|
|
|
our_basic_node_data: &BasicNodeData,
|
2024-01-12 00:02:25 +00:00
|
|
|
) -> Result<Message, HandshakeError> {
|
2024-03-20 20:58:12 +00:00
|
|
|
let mut allow_support_flag_req = true;
|
|
|
|
let mut allow_ping = true;
|
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
while let Some(message) = peer_stream.next().await {
|
|
|
|
let message = message?;
|
|
|
|
|
|
|
|
match message {
|
|
|
|
Message::Protocol(protocol_message) => {
|
|
|
|
tracing::debug!(
|
|
|
|
"Received eager protocol message with ID: {}, adding to queue",
|
|
|
|
protocol_message.command()
|
|
|
|
);
|
|
|
|
eager_protocol_messages.push(protocol_message);
|
|
|
|
if eager_protocol_messages.len() > MAX_EAGER_PROTOCOL_MESSAGES {
|
|
|
|
tracing::debug!(
|
2024-02-15 16:03:04 +00:00
|
|
|
"Peer sent too many protocol messages before a handshake response."
|
2024-01-12 00:02:25 +00:00
|
|
|
);
|
2023-11-30 18:09:05 +00:00
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
2024-01-12 00:02:25 +00:00
|
|
|
"Peer sent too many protocol messages",
|
2023-11-30 18:09:05 +00:00
|
|
|
));
|
2024-01-12 00:02:25 +00:00
|
|
|
}
|
|
|
|
continue;
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
2024-01-12 00:02:25 +00:00
|
|
|
Message::Request(req_message) => {
|
|
|
|
if req_message.command() == levin_command && request {
|
|
|
|
return Ok(Message::Request(req_message));
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
match req_message {
|
|
|
|
RequestMessage::SupportFlags => {
|
|
|
|
if !allow_support_flag_req {
|
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Peer sent 2 support flag requests",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
send_support_flags::<Z>(peer_sink, our_basic_node_data.support_flags)
|
|
|
|
.await?;
|
|
|
|
// don't let the peer send more after the first request.
|
|
|
|
allow_support_flag_req = false;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
RequestMessage::Ping => {
|
2024-05-02 22:58:22 +00:00
|
|
|
if !allow_ping {
|
2024-03-20 20:58:12 +00:00
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Peer sent 2 ping requests",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
send_ping_response::<Z>(peer_sink, our_basic_node_data.peer_id).await?;
|
|
|
|
|
|
|
|
// don't let the peer send more after the first request.
|
|
|
|
allow_ping = false;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
_ => {
|
2024-01-12 00:02:25 +00:00
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
2024-03-20 20:58:12 +00:00
|
|
|
"Peer sent an admin request before responding to the handshake",
|
|
|
|
))
|
2024-01-12 00:02:25 +00:00
|
|
|
}
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
2024-01-12 00:02:25 +00:00
|
|
|
Message::Response(res_message) if !request => {
|
|
|
|
if res_message.command() == levin_command {
|
|
|
|
return Ok(Message::Response(res_message));
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
tracing::debug!("Received unexpected response: {}", res_message.command());
|
|
|
|
return Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Peer sent an incorrect response",
|
|
|
|
));
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
_ => Err(HandshakeError::PeerSentInvalidMessage(
|
|
|
|
"Peer sent an incorrect message",
|
|
|
|
)),
|
|
|
|
}?
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
|
|
|
|
2024-01-12 00:02:25 +00:00
|
|
|
Err(BucketError::IO(std::io::Error::new(
|
|
|
|
std::io::ErrorKind::ConnectionAborted,
|
|
|
|
"The peer stream returned None",
|
|
|
|
)))?
|
|
|
|
}
|
2023-11-30 18:09:05 +00:00
|
|
|
|
2024-03-20 20:58:12 +00:00
|
|
|
/// Sends a [`ResponseMessage::SupportFlags`] down the peer sink.
|
2024-01-12 00:02:25 +00:00
|
|
|
async fn send_support_flags<Z: NetworkZone>(
|
|
|
|
peer_sink: &mut Z::Sink,
|
|
|
|
support_flags: PeerSupportFlags,
|
|
|
|
) -> Result<(), HandshakeError> {
|
|
|
|
tracing::debug!("Sending support flag response.");
|
|
|
|
Ok(peer_sink
|
2024-03-05 01:29:57 +00:00
|
|
|
.send(
|
|
|
|
Message::Response(ResponseMessage::SupportFlags(SupportFlagsResponse {
|
|
|
|
support_flags,
|
|
|
|
}))
|
|
|
|
.into(),
|
|
|
|
)
|
2024-01-12 00:02:25 +00:00
|
|
|
.await?)
|
2023-11-30 18:09:05 +00:00
|
|
|
}
|
2024-03-20 20:58:12 +00:00
|
|
|
|
|
|
|
/// Sends a [`ResponseMessage::Ping`] down the peer sink.
|
|
|
|
async fn send_ping_response<Z: NetworkZone>(
|
|
|
|
peer_sink: &mut Z::Sink,
|
|
|
|
peer_id: u64,
|
|
|
|
) -> Result<(), HandshakeError> {
|
|
|
|
tracing::debug!("Sending ping response.");
|
|
|
|
Ok(peer_sink
|
|
|
|
.send(
|
|
|
|
Message::Response(ResponseMessage::Ping(PingResponse {
|
|
|
|
status: PING_OK_RESPONSE_STATUS_TEXT,
|
|
|
|
peer_id,
|
|
|
|
}))
|
|
|
|
.into(),
|
|
|
|
)
|
|
|
|
.await?)
|
|
|
|
}
|