p2p-core: enable workspace lints

This commit is contained in:
hinto.janai 2024-09-17 20:34:05 -04:00
parent 90027143f0
commit 0f4939a318
No known key found for this signature in database
GPG key ID: D47CE05FA175A499
22 changed files with 150 additions and 138 deletions

2
Cargo.lock generated
View file

@ -786,11 +786,9 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-test",
"tokio-util", "tokio-util",
"tower", "tower",
"tracing", "tracing",
"tracing-subscriber",
] ]
[[package]] [[package]]

View file

@ -32,5 +32,8 @@ cuprate-test-utils = {path = "../../test-utils"}
hex = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]} tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]}
tokio-test = { workspace = true } # tokio-test = { workspace = true }
tracing-subscriber = { workspace = true } # tracing-subscriber = { workspace = true }
[lints]
workspace = true

View file

@ -43,8 +43,8 @@ pub enum InternalPeerID<A> {
impl<A: Display> Display for InternalPeerID<A> { impl<A: Display> Display for InternalPeerID<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self { match self {
InternalPeerID::KnownAddr(addr) => addr.fmt(f), Self::KnownAddr(addr) => addr.fmt(f),
InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")), Self::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")),
} }
} }
} }
@ -113,7 +113,7 @@ impl<Z: NetworkZone> Client<Z> {
fn set_err(&self, err: PeerError) -> tower::BoxError { fn set_err(&self, err: PeerError) -> tower::BoxError {
let err_str = err.to_string(); let err_str = err.to_string();
match self.error.try_insert_err(err) { match self.error.try_insert_err(err) {
Ok(_) => err_str, Ok(()) => err_str,
Err(e) => e.to_string(), Err(e) => e.to_string(),
} }
.into() .into()
@ -169,7 +169,7 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
TrySendError::Closed(req) | TrySendError::Full(req) => { TrySendError::Closed(req) | TrySendError::Full(req) => {
self.set_err(PeerError::ClientChannelClosed); self.set_err(PeerError::ClientChannelClosed);
let _ = req let _unused = req
.response_channel .response_channel
.send(Err(PeerError::ClientChannelClosed.into())); .send(Err(PeerError::ClientChannelClosed.into()));
} }
@ -216,7 +216,7 @@ where
tracing::debug!("Sending back response"); tracing::debug!("Sending back response");
let _ = req.response_channel.send(Ok(res)); let _unused = req.response_channel.send(Ok(res));
} }
} }
.instrument(task_span), .instrument(task_span),

View file

@ -26,7 +26,7 @@ use crate::{
}; };
/// A request to the connection task from a [`Client`](crate::client::Client). /// A request to the connection task from a [`Client`](crate::client::Client).
pub struct ConnectionTaskRequest { pub(crate) struct ConnectionTaskRequest {
/// The request. /// The request.
pub request: PeerRequest, pub request: PeerRequest,
/// The response channel. /// The response channel.
@ -36,7 +36,7 @@ pub struct ConnectionTaskRequest {
} }
/// The connection state. /// The connection state.
pub enum State { pub(crate) enum State {
/// Waiting for a request from Cuprate or the connected peer. /// Waiting for a request from Cuprate or the connected peer.
WaitingForRequest, WaitingForRequest,
/// Waiting for a response from the peer. /// Waiting for a response from the peer.
@ -53,7 +53,7 @@ pub enum State {
/// Returns if the [`LevinCommand`] is the correct response message for our request. /// Returns if the [`LevinCommand`] is the correct response message for our request.
/// ///
/// e.g. that we didn't get a block for a txs request. /// e.g. that we didn't get a block for a txs request.
fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool { const fn levin_command_response(message_id: MessageID, command: LevinCommand) -> bool {
matches!( matches!(
(message_id, command), (message_id, command),
(MessageID::Handshake, LevinCommand::Handshake) (MessageID::Handshake, LevinCommand::Handshake)
@ -71,7 +71,7 @@ fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool
} }
/// This represents a connection to a peer. /// This represents a connection to a peer.
pub struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> { pub(crate) struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
/// The peer sink - where we send messages to the peer. /// The peer sink - where we send messages to the peer.
peer_sink: Z::Sink, peer_sink: Z::Sink,
@ -104,15 +104,15 @@ where
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
{ {
/// Create a new connection struct. /// Create a new connection struct.
pub fn new( pub(crate) fn new(
peer_sink: Z::Sink, peer_sink: Z::Sink,
client_rx: mpsc::Receiver<ConnectionTaskRequest>, client_rx: mpsc::Receiver<ConnectionTaskRequest>,
broadcast_stream: BrdcstStrm, broadcast_stream: BrdcstStrm,
peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>, peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
connection_guard: ConnectionGuard, connection_guard: ConnectionGuard,
error: SharedError<PeerError>, error: SharedError<PeerError>,
) -> Connection<Z, A, CS, PS, PR, BrdcstStrm> { ) -> Self {
Connection { Self {
peer_sink, peer_sink,
state: State::WaitingForRequest, state: State::WaitingForRequest,
request_timeout: None, request_timeout: None,
@ -174,14 +174,14 @@ where
if let Err(e) = res { if let Err(e) = res {
// can't clone the error so turn it to a string first, hacky but oh well. // can't clone the error so turn it to a string first, hacky but oh well.
let err_str = e.to_string(); let err_str = e.to_string();
let _ = req.response_channel.send(Err(err_str.clone().into())); let _unused = req.response_channel.send(Err(err_str.into()));
return Err(e); return Err(e);
} else { }
// We still need to respond even if the response is this. // We still need to respond even if the response is this.
let _ = req let _unused = req
.response_channel .response_channel
.send(Ok(PeerResponse::Protocol(ProtocolResponse::NA))); .send(Ok(PeerResponse::Protocol(ProtocolResponse::NA)));
}
Ok(()) Ok(())
} }
@ -215,7 +215,7 @@ where
}; };
// Check if the message is a response to our request. // Check if the message is a response to our request.
if levin_command_response(request_id, mes.command()) { if levin_command_response(*request_id, mes.command()) {
// TODO: Do more checks before returning response. // TODO: Do more checks before returning response.
let State::WaitingForResponse { tx, .. } = let State::WaitingForResponse { tx, .. } =
@ -224,7 +224,7 @@ where
panic!("Not in correct state, can't receive response!") panic!("Not in correct state, can't receive response!")
}; };
let _ = tx.send(Ok(mes let _unused = tx.send(Ok(mes
.try_into() .try_into()
.map_err(|_| PeerError::PeerSentInvalidMessage)?)); .map_err(|_| PeerError::PeerSentInvalidMessage)?));
@ -282,7 +282,7 @@ where
tokio::select! { tokio::select! {
biased; biased;
_ = self.request_timeout.as_mut().expect("Request timeout was not set!") => { () = self.request_timeout.as_mut().expect("Request timeout was not set!") => {
Err(PeerError::ClientChannelClosed) Err(PeerError::ClientChannelClosed)
} }
broadcast_req = self.broadcast_stream.next() => { broadcast_req = self.broadcast_stream.next() => {
@ -306,8 +306,11 @@ where
/// Runs the Connection handler logic, this should be put in a separate task. /// Runs the Connection handler logic, this should be put in a separate task.
/// ///
/// `eager_protocol_messages` are protocol messages that we received during a handshake. /// `eager_protocol_messages` are protocol messages that we received during a handshake.
pub async fn run<Str>(mut self, mut stream: Str, eager_protocol_messages: Vec<ProtocolMessage>) pub(crate) async fn run<Str>(
where mut self,
mut stream: Str,
eager_protocol_messages: Vec<ProtocolMessage>,
) where
Str: FusedStream<Item = Result<Message, cuprate_wire::BucketError>> + Unpin, Str: FusedStream<Item = Result<Message, cuprate_wire::BucketError>> + Unpin,
{ {
tracing::debug!( tracing::debug!(
@ -348,6 +351,7 @@ where
/// Shutdowns the connection, flushing pending requests and setting the error slot, if it hasn't been /// Shutdowns the connection, flushing pending requests and setting the error slot, if it hasn't been
/// set already. /// set already.
#[expect(clippy::significant_drop_tightening)]
fn shutdown(mut self, err: PeerError) { fn shutdown(mut self, err: PeerError) {
tracing::debug!("Connection task shutting down: {}", err); tracing::debug!("Connection task shutting down: {}", err);
@ -362,11 +366,11 @@ where
if let State::WaitingForResponse { tx, .. } = if let State::WaitingForResponse { tx, .. } =
std::mem::replace(&mut self.state, State::WaitingForRequest) std::mem::replace(&mut self.state, State::WaitingForRequest)
{ {
let _ = tx.send(Err(err_str.clone().into())); let _unused = tx.send(Err(err_str.clone().into()));
} }
while let Ok(req) = client_rx.try_recv() { while let Ok(req) = client_rx.try_recv() {
let _ = req.response_channel.send(Err(err_str.clone().into())); let _unused = req.response_channel.send(Err(err_str.clone().into()));
} }
self.connection_guard.connection_closed(); self.connection_guard.connection_closed();

View file

@ -40,7 +40,7 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
{ {
/// Create a new connector from a handshaker. /// Create a new connector from a handshaker.
pub fn new(handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>) -> Self { pub const fn new(handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>) -> Self {
Self { handshaker } Self { handshaker }
} }
} }

View file

@ -113,7 +113,7 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
{ {
/// Creates a new handshaker. /// Creates a new handshaker.
fn new( const fn new(
address_book: AdrBook, address_book: AdrBook,
peer_sync_svc: PSync, peer_sync_svc: PSync,
core_sync_svc: CSync, core_sync_svc: CSync,
@ -226,7 +226,8 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
Err(BucketError::IO(std::io::Error::new( Err(BucketError::IO(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted, std::io::ErrorKind::ConnectionAborted,
"The peer stream returned None", "The peer stream returned None",
)))? ))
.into())
} }
/// This function completes a handshake with the requested peer. /// This function completes a handshake with the requested peer.
@ -403,7 +404,10 @@ where
break 'check_out_addr None; break 'check_out_addr None;
}; };
// u32 does not make sense as a port so just truncate it. #[expect(
clippy::cast_possible_truncation,
reason = "u32 does not make sense as a port so just truncate it."
)]
outbound_address.set_port(peer_node_data.my_port as u16); outbound_address.set_port(peer_node_data.my_port as u16);
let Ok(Ok(ping_peer_id)) = timeout( let Ok(Ok(ping_peer_id)) = timeout(
@ -508,7 +512,7 @@ where
info.id, info.id,
info.handle.clone(), info.handle.clone(),
connection_tx.clone(), connection_tx.clone(),
semaphore.clone(), Arc::clone(&semaphore),
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
@ -671,7 +675,7 @@ async fn wait_for_message<Z: NetworkZone>(
_ => { _ => {
return Err(HandshakeError::PeerSentInvalidMessage( return Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent an admin request before responding to the handshake", "Peer sent an admin request before responding to the handshake",
)) ));
} }
} }
} }
@ -686,16 +690,17 @@ async fn wait_for_message<Z: NetworkZone>(
)); ));
} }
_ => Err(HandshakeError::PeerSentInvalidMessage( Message::Response(_) => Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent an incorrect message", "Peer sent an incorrect message",
)), )),
}? }?;
} }
Err(BucketError::IO(std::io::Error::new( Err(BucketError::IO(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted, std::io::ErrorKind::ConnectionAborted,
"The peer stream returned None", "The peer stream returned None",
)))? ))
.into())
} }
/// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink. /// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink.

View file

@ -87,14 +87,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where where
NAdrBook: AddressBook<N> + Clone, NAdrBook: AddressBook<N> + Clone,
{ {
let HandshakerBuilder { let Self {
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone,
.. ..
} = self; } = self;
@ -106,7 +105,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone, _zone: PhantomData,
} }
} }
@ -130,14 +129,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where where
NCSync: CoreSyncSvc + Clone, NCSync: CoreSyncSvc + Clone,
{ {
let HandshakerBuilder { let Self {
address_book, address_book,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone,
.. ..
} = self; } = self;
@ -149,7 +147,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone, _zone: PhantomData,
} }
} }
@ -167,14 +165,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where where
NPSync: PeerSyncSvc<N> + Clone, NPSync: PeerSyncSvc<N> + Clone,
{ {
let HandshakerBuilder { let Self {
address_book, address_book,
core_sync_svc, core_sync_svc,
protocol_request_svc, protocol_request_svc,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone,
.. ..
} = self; } = self;
@ -186,7 +183,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone, _zone: PhantomData,
} }
} }
@ -204,14 +201,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where where
NProtoHdlr: ProtocolRequestHandler + Clone, NProtoHdlr: ProtocolRequestHandler + Clone,
{ {
let HandshakerBuilder { let Self {
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone,
.. ..
} = self; } = self;
@ -223,7 +219,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone, _zone: PhantomData,
} }
} }
@ -242,14 +238,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
NBrdcstStrmMkr: Fn(InternalPeerID<N::Addr>) -> BrdcstStrm + Clone + Send + 'static, NBrdcstStrmMkr: Fn(InternalPeerID<N::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{ {
let HandshakerBuilder { let Self {
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc,
our_basic_node_data, our_basic_node_data,
connection_parent_span, connection_parent_span,
_zone,
.. ..
} = self; } = self;
@ -261,7 +256,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker: new_broadcast_stream_maker, broadcast_stream_maker: new_broadcast_stream_maker,
connection_parent_span, connection_parent_span,
_zone, _zone: PhantomData,
} }
} }
@ -270,6 +265,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
/// ## Default Connection Parent Span /// ## Default Connection Parent Span
/// ///
/// The default connection span will be [`Span::none`]. /// The default connection span will be [`Span::none`].
#[must_use]
pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self { pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self {
Self { Self {
connection_parent_span: Some(connection_parent_span), connection_parent_span: Some(connection_parent_span),

View file

@ -42,8 +42,8 @@ pub struct DummyCoreSyncSvc(CoreSyncData);
impl DummyCoreSyncSvc { impl DummyCoreSyncSvc {
/// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`]. /// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`].
pub fn static_mainnet_genesis() -> DummyCoreSyncSvc { pub const fn static_mainnet_genesis() -> Self {
DummyCoreSyncSvc(CoreSyncData { Self(CoreSyncData {
cumulative_difficulty: 1, cumulative_difficulty: 1,
cumulative_difficulty_top64: 0, cumulative_difficulty_top64: 0,
current_height: 1, current_height: 1,
@ -56,8 +56,8 @@ impl DummyCoreSyncSvc {
} }
/// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`]. /// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`].
pub fn static_testnet_genesis() -> DummyCoreSyncSvc { pub const fn static_testnet_genesis() -> Self {
DummyCoreSyncSvc(CoreSyncData { Self(CoreSyncData {
cumulative_difficulty: 1, cumulative_difficulty: 1,
cumulative_difficulty_top64: 0, cumulative_difficulty_top64: 0,
current_height: 1, current_height: 1,
@ -70,8 +70,8 @@ impl DummyCoreSyncSvc {
} }
/// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`]. /// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`].
pub fn static_stagenet_genesis() -> DummyCoreSyncSvc { pub const fn static_stagenet_genesis() -> Self {
DummyCoreSyncSvc(CoreSyncData { Self(CoreSyncData {
cumulative_difficulty: 1, cumulative_difficulty: 1,
cumulative_difficulty_top64: 0, cumulative_difficulty_top64: 0,
current_height: 1, current_height: 1,
@ -84,8 +84,8 @@ impl DummyCoreSyncSvc {
} }
/// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`]. /// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`].
pub fn static_custom(data: CoreSyncData) -> DummyCoreSyncSvc { pub const fn static_custom(data: CoreSyncData) -> Self {
DummyCoreSyncSvc(data) Self(data)
} }
} }

View file

@ -46,7 +46,7 @@ pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PS, PR> {
pub peer_info: PeerInformation<Z::Addr>, pub peer_info: PeerInformation<Z::Addr>,
} }
impl<Z: NetworkZone, A, CS, PS, PR> PeerRequestHandler<Z, A, CS, PS, PR> impl<Z, A, CS, PS, PR> PeerRequestHandler<Z, A, CS, PS, PR>
where where
Z: NetworkZone, Z: NetworkZone,
A: AddressBook<Z>, A: AddressBook<Z>,
@ -55,7 +55,7 @@ where
PR: ProtocolRequestHandler, PR: ProtocolRequestHandler,
{ {
/// Handles an incoming [`PeerRequest`] to our node. /// Handles an incoming [`PeerRequest`] to our node.
pub async fn handle_peer_request( pub(crate) async fn handle_peer_request(
&mut self, &mut self,
req: PeerRequest, req: PeerRequest,
) -> Result<PeerResponse, tower::BoxError> { ) -> Result<PeerResponse, tower::BoxError> {

View file

@ -1,6 +1,6 @@
//! Timeout Monitor //! Timeout Monitor
//! //!
//! This module holds the task that sends periodic [TimedSync](PeerRequest::TimedSync) requests to a peer to make //! This module holds the task that sends periodic [`TimedSync`](PeerRequest::TimedSync) requests to a peer to make
//! sure the connection is still active. //! sure the connection is still active.
use std::sync::Arc; use std::sync::Arc;
@ -64,7 +64,7 @@ where
return Ok(()); return Ok(());
} }
let Ok(permit) = semaphore.clone().try_acquire_owned() else { let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else {
// If we can't get a permit the connection is currently waiting for a response, so no need to // If we can't get a permit the connection is currently waiting for a response, so no need to
// do a timed sync. // do a timed sync.
continue; continue;

View file

@ -4,7 +4,7 @@ pub struct SharedError<T>(Arc<OnceLock<T>>);
impl<T> Clone for SharedError<T> { impl<T> Clone for SharedError<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self(self.0.clone()) Self(Arc::clone(&self.0))
} }
} }

View file

@ -18,11 +18,12 @@ pub struct HandleBuilder {
impl HandleBuilder { impl HandleBuilder {
/// Create a new builder. /// Create a new builder.
pub fn new() -> Self { pub const fn new() -> Self {
Self { permit: None } Self { permit: None }
} }
/// Sets the permit for this connection. /// Sets the permit for this connection.
#[must_use]
pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self { pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
self.permit = permit; self.permit = permit;
self self
@ -40,7 +41,7 @@ impl HandleBuilder {
_permit: self.permit, _permit: self.permit,
}, },
ConnectionHandle { ConnectionHandle {
token: token.clone(), token,
ban: Arc::new(OnceLock::new()), ban: Arc::new(OnceLock::new()),
}, },
) )
@ -66,13 +67,13 @@ impl ConnectionGuard {
/// ///
/// This will be called on [`Drop::drop`]. /// This will be called on [`Drop::drop`].
pub fn connection_closed(&self) { pub fn connection_closed(&self) {
self.token.cancel() self.token.cancel();
} }
} }
impl Drop for ConnectionGuard { impl Drop for ConnectionGuard {
fn drop(&mut self) { fn drop(&mut self) {
self.token.cancel() self.token.cancel();
} }
} }
@ -90,6 +91,7 @@ impl ConnectionHandle {
} }
/// Bans the peer for the given `duration`. /// Bans the peer for the given `duration`.
pub fn ban_peer(&self, duration: Duration) { pub fn ban_peer(&self, duration: Duration) {
#[expect(clippy::let_underscore_must_use, reason = "TODO: handle error")]
let _ = self.ban.set(BanPeer(duration)); let _ = self.ban.set(BanPeer(duration));
self.token.cancel(); self.token.cancel();
} }
@ -103,6 +105,6 @@ impl ConnectionHandle {
} }
/// Sends the signal to the connection task to disconnect. /// Sends the signal to the connection task to disconnect.
pub fn send_close_signal(&self) { pub fn send_close_signal(&self) {
self.token.cancel() self.token.cancel();
} }
} }

View file

@ -6,7 +6,7 @@
//! //!
//! # Network Zones //! # Network Zones
//! //!
//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet]. //! This crate abstracts over network zones, Tor/I2p/clearnet with the [`NetworkZone`] trait. Currently only clearnet is implemented: [`ClearNet`].
//! //!
//! # Usage //! # Usage
//! //!
@ -65,6 +65,9 @@ use cuprate_wire::{
NetworkAddress, NetworkAddress,
}; };
#[cfg(test)]
mod tests;
pub mod client; pub mod client;
mod constants; mod constants;
pub mod error; pub mod error;
@ -102,7 +105,7 @@ pub trait NetZoneAddress:
+ Unpin + Unpin
+ 'static + 'static
{ {
/// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as /// Cuprate needs to be able to ban peers by IP addresses and not just by `SocketAddr` as
/// that include the port, to be able to facilitate this network addresses must have a ban ID /// that include the port, to be able to facilitate this network addresses must have a ban ID
/// which for hidden services could just be the address it self but for clear net addresses will /// which for hidden services could just be the address it self but for clear net addresses will
/// be the IP address. /// be the IP address.

View file

@ -19,7 +19,7 @@ impl NetZoneAddress for SocketAddr {
type BanID = IpAddr; type BanID = IpAddr;
fn set_port(&mut self, port: u16) { fn set_port(&mut self, port: u16) {
SocketAddr::set_port(self, port) Self::set_port(self, port);
} }
fn ban_id(&self) -> Self::BanID { fn ban_id(&self) -> Self::BanID {

View file

@ -8,7 +8,7 @@
//! //!
//! Here is every P2P request/response. //! Here is every P2P request/response.
//! //!
//! *note admin messages are already request/response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse //! *note admin messages are already request/response so "Handshake" is actually made of a `HandshakeRequest` & `HandshakeResponse`
//! //!
//! ```md //! ```md
//! Admin: //! Admin:
@ -78,15 +78,15 @@ pub enum PeerRequest {
} }
impl PeerRequest { impl PeerRequest {
pub fn id(&self) -> MessageID { pub const fn id(&self) -> MessageID {
match self { match self {
PeerRequest::Admin(admin_req) => match admin_req { Self::Admin(admin_req) => match admin_req {
AdminRequestMessage::Handshake(_) => MessageID::Handshake, AdminRequestMessage::Handshake(_) => MessageID::Handshake,
AdminRequestMessage::TimedSync(_) => MessageID::TimedSync, AdminRequestMessage::TimedSync(_) => MessageID::TimedSync,
AdminRequestMessage::Ping => MessageID::Ping, AdminRequestMessage::Ping => MessageID::Ping,
AdminRequestMessage::SupportFlags => MessageID::SupportFlags, AdminRequestMessage::SupportFlags => MessageID::SupportFlags,
}, },
PeerRequest::Protocol(protocol_request) => match protocol_request { Self::Protocol(protocol_request) => match protocol_request {
ProtocolRequest::GetObjects(_) => MessageID::GetObjects, ProtocolRequest::GetObjects(_) => MessageID::GetObjects,
ProtocolRequest::GetChain(_) => MessageID::GetChain, ProtocolRequest::GetChain(_) => MessageID::GetChain,
ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
@ -98,10 +98,10 @@ impl PeerRequest {
} }
} }
pub fn needs_response(&self) -> bool { pub const fn needs_response(&self) -> bool {
!matches!( !matches!(
self, self,
PeerRequest::Protocol( Self::Protocol(
ProtocolRequest::NewBlock(_) ProtocolRequest::NewBlock(_)
| ProtocolRequest::NewFluffyBlock(_) | ProtocolRequest::NewFluffyBlock(_)
| ProtocolRequest::NewTransactions(_) | ProtocolRequest::NewTransactions(_)
@ -126,15 +126,15 @@ pub enum PeerResponse {
} }
impl PeerResponse { impl PeerResponse {
pub fn id(&self) -> Option<MessageID> { pub const fn id(&self) -> Option<MessageID> {
Some(match self { Some(match self {
PeerResponse::Admin(admin_res) => match admin_res { Self::Admin(admin_res) => match admin_res {
AdminResponseMessage::Handshake(_) => MessageID::Handshake, AdminResponseMessage::Handshake(_) => MessageID::Handshake,
AdminResponseMessage::TimedSync(_) => MessageID::TimedSync, AdminResponseMessage::TimedSync(_) => MessageID::TimedSync,
AdminResponseMessage::Ping(_) => MessageID::Ping, AdminResponseMessage::Ping(_) => MessageID::Ping,
AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags, AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags,
}, },
PeerResponse::Protocol(protocol_res) => match protocol_res { Self::Protocol(protocol_res) => match protocol_res {
ProtocolResponse::GetObjects(_) => MessageID::GetObjects, ProtocolResponse::GetObjects(_) => MessageID::GetObjects,
ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::GetChain(_) => MessageID::GetChain,
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,

View file

@ -11,15 +11,15 @@ pub struct MessageConversionError;
impl From<ProtocolRequest> for ProtocolMessage { impl From<ProtocolRequest> for ProtocolMessage {
fn from(value: ProtocolRequest) -> Self { fn from(value: ProtocolRequest) -> Self {
match value { match value {
ProtocolRequest::GetObjects(val) => ProtocolMessage::GetObjectsRequest(val), ProtocolRequest::GetObjects(val) => Self::GetObjectsRequest(val),
ProtocolRequest::GetChain(val) => ProtocolMessage::ChainRequest(val), ProtocolRequest::GetChain(val) => Self::ChainRequest(val),
ProtocolRequest::FluffyMissingTxs(val) => { ProtocolRequest::FluffyMissingTxs(val) => {
ProtocolMessage::FluffyMissingTransactionsRequest(val) Self::FluffyMissingTransactionsRequest(val)
} }
ProtocolRequest::GetTxPoolCompliment(val) => ProtocolMessage::GetTxPoolCompliment(val), ProtocolRequest::GetTxPoolCompliment(val) => Self::GetTxPoolCompliment(val),
ProtocolRequest::NewBlock(val) => ProtocolMessage::NewBlock(val), ProtocolRequest::NewBlock(val) => Self::NewBlock(val),
ProtocolRequest::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), ProtocolRequest::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolRequest::NewTransactions(val) => ProtocolMessage::NewTransactions(val), ProtocolRequest::NewTransactions(val) => Self::NewTransactions(val),
} }
} }
} }
@ -29,15 +29,15 @@ impl TryFrom<ProtocolMessage> for ProtocolRequest {
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> { fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
Ok(match value { Ok(match value {
ProtocolMessage::GetObjectsRequest(val) => ProtocolRequest::GetObjects(val), ProtocolMessage::GetObjectsRequest(val) => Self::GetObjects(val),
ProtocolMessage::ChainRequest(val) => ProtocolRequest::GetChain(val), ProtocolMessage::ChainRequest(val) => Self::GetChain(val),
ProtocolMessage::FluffyMissingTransactionsRequest(val) => { ProtocolMessage::FluffyMissingTransactionsRequest(val) => {
ProtocolRequest::FluffyMissingTxs(val) Self::FluffyMissingTxs(val)
} }
ProtocolMessage::GetTxPoolCompliment(val) => ProtocolRequest::GetTxPoolCompliment(val), ProtocolMessage::GetTxPoolCompliment(val) => Self::GetTxPoolCompliment(val),
ProtocolMessage::NewBlock(val) => ProtocolRequest::NewBlock(val), ProtocolMessage::NewBlock(val) => Self::NewBlock(val),
ProtocolMessage::NewFluffyBlock(val) => ProtocolRequest::NewFluffyBlock(val), ProtocolMessage::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolMessage::NewTransactions(val) => ProtocolRequest::NewTransactions(val), ProtocolMessage::NewTransactions(val) => Self::NewTransactions(val),
ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => { ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => {
return Err(MessageConversionError) return Err(MessageConversionError)
} }
@ -48,8 +48,8 @@ impl TryFrom<ProtocolMessage> for ProtocolRequest {
impl From<PeerRequest> for Message { impl From<PeerRequest> for Message {
fn from(value: PeerRequest) -> Self { fn from(value: PeerRequest) -> Self {
match value { match value {
PeerRequest::Admin(val) => Message::Request(val), PeerRequest::Admin(val) => Self::Request(val),
PeerRequest::Protocol(val) => Message::Protocol(val.into()), PeerRequest::Protocol(val) => Self::Protocol(val.into()),
} }
} }
} }
@ -59,8 +59,8 @@ impl TryFrom<Message> for PeerRequest {
fn try_from(value: Message) -> Result<Self, Self::Error> { fn try_from(value: Message) -> Result<Self, Self::Error> {
match value { match value {
Message::Request(req) => Ok(PeerRequest::Admin(req)), Message::Request(req) => Ok(Self::Admin(req)),
Message::Protocol(pro) => Ok(PeerRequest::Protocol(pro.try_into()?)), Message::Protocol(pro) => Ok(Self::Protocol(pro.try_into()?)),
Message::Response(_) => Err(MessageConversionError), Message::Response(_) => Err(MessageConversionError),
} }
} }
@ -71,10 +71,10 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
fn try_from(value: ProtocolResponse) -> Result<Self, Self::Error> { fn try_from(value: ProtocolResponse) -> Result<Self, Self::Error> {
Ok(match value { Ok(match value {
ProtocolResponse::NewTransactions(val) => ProtocolMessage::NewTransactions(val), ProtocolResponse::NewTransactions(val) => Self::NewTransactions(val),
ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
ProtocolResponse::NA => return Err(MessageConversionError), ProtocolResponse::NA => return Err(MessageConversionError),
}) })
} }
@ -85,10 +85,10 @@ impl TryFrom<ProtocolMessage> for ProtocolResponse {
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> { fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
Ok(match value { Ok(match value {
ProtocolMessage::NewTransactions(val) => ProtocolResponse::NewTransactions(val), ProtocolMessage::NewTransactions(val) => Self::NewTransactions(val),
ProtocolMessage::NewFluffyBlock(val) => ProtocolResponse::NewFluffyBlock(val), ProtocolMessage::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolMessage::ChainEntryResponse(val) => ProtocolResponse::GetChain(val), ProtocolMessage::ChainEntryResponse(val) => Self::GetChain(val),
ProtocolMessage::GetObjectsResponse(val) => ProtocolResponse::GetObjects(val), ProtocolMessage::GetObjectsResponse(val) => Self::GetObjects(val),
ProtocolMessage::ChainRequest(_) ProtocolMessage::ChainRequest(_)
| ProtocolMessage::FluffyMissingTransactionsRequest(_) | ProtocolMessage::FluffyMissingTransactionsRequest(_)
| ProtocolMessage::GetObjectsRequest(_) | ProtocolMessage::GetObjectsRequest(_)
@ -103,8 +103,8 @@ impl TryFrom<Message> for PeerResponse {
fn try_from(value: Message) -> Result<Self, Self::Error> { fn try_from(value: Message) -> Result<Self, Self::Error> {
match value { match value {
Message::Response(res) => Ok(PeerResponse::Admin(res)), Message::Response(res) => Ok(Self::Admin(res)),
Message::Protocol(pro) => Ok(PeerResponse::Protocol(pro.try_into()?)), Message::Protocol(pro) => Ok(Self::Protocol(pro.try_into()?)),
Message::Request(_) => Err(MessageConversionError), Message::Request(_) => Err(MessageConversionError),
} }
} }
@ -115,8 +115,8 @@ impl TryFrom<PeerResponse> for Message {
fn try_from(value: PeerResponse) -> Result<Self, Self::Error> { fn try_from(value: PeerResponse) -> Result<Self, Self::Error> {
Ok(match value { Ok(match value {
PeerResponse::Admin(val) => Message::Response(val), PeerResponse::Admin(val) => Self::Response(val),
PeerResponse::Protocol(val) => Message::Protocol(val.try_into()?), PeerResponse::Protocol(val) => Self::Protocol(val.try_into()?),
}) })
} }
} }

View file

@ -52,7 +52,7 @@ pub struct ZoneSpecificPeerListEntryBase<A: NetZoneAddress> {
pub rpc_credits_per_hash: u32, pub rpc_credits_per_hash: u32,
} }
impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for cuprate_wire::PeerListEntryBase { impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for PeerListEntryBase {
fn from(value: ZoneSpecificPeerListEntryBase<A>) -> Self { fn from(value: ZoneSpecificPeerListEntryBase<A>) -> Self {
Self { Self {
adr: value.adr.into(), adr: value.adr.into(),
@ -74,9 +74,7 @@ pub enum PeerListConversionError {
PruningSeed(#[from] PruningError), PruningSeed(#[from] PruningError),
} }
impl<A: NetZoneAddress> TryFrom<cuprate_wire::PeerListEntryBase> impl<A: NetZoneAddress> TryFrom<PeerListEntryBase> for ZoneSpecificPeerListEntryBase<A> {
for ZoneSpecificPeerListEntryBase<A>
{
type Error = PeerListConversionError; type Error = PeerListConversionError;
fn try_from(value: PeerListEntryBase) -> Result<Self, Self::Error> { fn try_from(value: PeerListEntryBase) -> Result<Self, Self::Error> {

View file

@ -0,0 +1,4 @@
mod fragmented_handshake;
mod handles;
mod handshake;
mod sending_receiving;

View file

@ -1,4 +1,5 @@
//! This file contains a test for a handshake with monerod but uses fragmented messages. //! This file contains a test for a handshake with monerod but uses fragmented messages.
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
@ -21,20 +22,20 @@ use tokio_util::{
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use cuprate_helper::network::Network; use cuprate_helper::network::Network;
use cuprate_p2p_core::{ use cuprate_test_utils::monerod::monerod;
client::{
handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
InternalPeerID,
},
ClearNetServerCfg, ConnectionDirection, NetworkZone,
};
use cuprate_wire::{ use cuprate_wire::{
common::PeerSupportFlags, common::PeerSupportFlags,
levin::{message::make_fragmented_messages, LevinMessage, Protocol}, levin::{message::make_fragmented_messages, LevinMessage, Protocol},
BasicNodeData, Message, MoneroWireCodec, BasicNodeData, Message, MoneroWireCodec,
}; };
use cuprate_test_utils::monerod::monerod; use crate::{
client::{
handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
InternalPeerID,
},
ClearNetServerCfg, ConnectionDirection, NetworkZone,
};
/// A network zone equal to clear net where every message sent is turned into a fragmented message. /// A network zone equal to clear net where every message sent is turned into a fragmented message.
/// Does not support sending fragmented or dummy messages manually. /// Does not support sending fragmented or dummy messages manually.
@ -184,7 +185,7 @@ async fn fragmented_handshake_monerod_to_cuprate() {
let next_connection_fut = timeout(Duration::from_secs(30), listener.next()); let next_connection_fut = timeout(Duration::from_secs(30), listener.next());
if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() { if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() {
let _ = handshaker let _unused = handshaker
.ready() .ready()
.await .await
.unwrap() .unwrap()

View file

@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
use cuprate_p2p_core::handles::HandleBuilder; use crate::handles::HandleBuilder;
#[test] #[test]
fn send_ban_signal() { fn send_ban_signal() {

View file

@ -9,9 +9,13 @@ use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use cuprate_helper::network::Network; use cuprate_helper::network::Network;
use cuprate_test_utils::{
monerod::monerod,
test_netzone::{TestNetZone, TestNetZoneAddr},
};
use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec}; use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec};
use cuprate_p2p_core::{ use crate::{
client::{ client::{
handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest, handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
InternalPeerID, InternalPeerID,
@ -19,11 +23,6 @@ use cuprate_p2p_core::{
ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone, ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone,
}; };
use cuprate_test_utils::{
monerod::monerod,
test_netzone::{TestNetZone, TestNetZoneAddr},
};
#[tokio::test] #[tokio::test]
async fn handshake_cuprate_to_cuprate() { async fn handshake_cuprate_to_cuprate() {
// Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to // Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to
@ -147,7 +146,7 @@ async fn handshake_monerod_to_cuprate() {
let next_connection_fut = timeout(Duration::from_secs(30), listener.next()); let next_connection_fut = timeout(Duration::from_secs(30), listener.next());
if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() { if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() {
let _ = handshaker let _unused = handshaker
.ready() .ready()
.await .await
.unwrap() .unwrap()

View file

@ -1,16 +1,15 @@
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use cuprate_helper::network::Network; use cuprate_helper::network::Network;
use cuprate_test_utils::monerod::monerod;
use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData}; use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData};
use cuprate_p2p_core::{ use crate::{
client::{handshaker::HandshakerBuilder, ConnectRequest, Connector}, client::{handshaker::HandshakerBuilder, ConnectRequest, Connector},
protocol::{PeerRequest, PeerResponse}, protocol::{PeerRequest, PeerResponse},
ClearNet, ProtocolRequest, ProtocolResponse, ClearNet, ProtocolRequest, ProtocolResponse,
}; };
use cuprate_test_utils::monerod::monerod;
#[tokio::test] #[tokio::test]
async fn get_single_block_from_monerod() { async fn get_single_block_from_monerod() {
let monerod = monerod(["--out-peers=0"]).await; let monerod = monerod(["--out-peers=0"]).await;