From f4c88b6f0538f9abe19b4384c3aabcb3fd3deb40 Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Fri, 20 Sep 2024 20:36:39 -0400 Subject: [PATCH 1/2] p2p: enable workspace lints (#289) * p2p: enable workspace lints * fmt * fixes * fixes * fixes * review fixes --- p2p/p2p/Cargo.toml | 3 ++ p2p/p2p/src/block_downloader.rs | 29 +++++++++-------- p2p/p2p/src/block_downloader/block_queue.rs | 14 ++++---- p2p/p2p/src/block_downloader/chain_tracker.rs | 25 ++++++++------- .../src/block_downloader/download_batch.rs | 2 ++ p2p/p2p/src/block_downloader/request_chain.rs | 4 +-- p2p/p2p/src/block_downloader/tests.rs | 7 ++-- p2p/p2p/src/broadcast.rs | 21 ++++++------ p2p/p2p/src/client_pool.rs | 17 +++++----- p2p/p2p/src/client_pool/disconnect_monitor.rs | 2 +- p2p/p2p/src/connection_maintainer.rs | 32 ++++++++++++------- p2p/p2p/src/inbound_server.rs | 15 +++++---- p2p/p2p/src/lib.rs | 12 +++---- p2p/p2p/src/sync_states.rs | 24 +++++++------- 14 files changed, 116 insertions(+), 91 deletions(-) diff --git a/p2p/p2p/Cargo.toml b/p2p/p2p/Cargo.toml index 7cbbdcb..ef85277 100644 --- a/p2p/p2p/Cargo.toml +++ b/p2p/p2p/Cargo.toml @@ -39,3 +39,6 @@ cuprate-test-utils = { path = "../../test-utils" } indexmap = { workspace = true } proptest = { workspace = true } tokio-test = { workspace = true } + +[lints] +workspace = true \ No newline at end of file diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs index d295016..39980a0 100644 --- a/p2p/p2p/src/block_downloader.rs +++ b/p2p/p2p/src/block_downloader.rs @@ -78,7 +78,7 @@ pub struct BlockDownloaderConfig { /// An error that occurred in the [`BlockDownloader`]. #[derive(Debug, thiserror::Error)] -pub enum BlockDownloadError { +pub(crate) enum BlockDownloadError { #[error("A request to a peer timed out.")] TimedOut, #[error("The block buffer was closed.")] @@ -219,7 +219,7 @@ struct BlockDownloader { /// The running chain entry tasks. /// /// Returns a result of the chain entry or an error. - #[allow(clippy::type_complexity)] + #[expect(clippy::type_complexity)] chain_entry_task: JoinSet, ChainEntry), BlockDownloadError>>, /// The current inflight requests. @@ -273,7 +273,7 @@ where } /// Checks if we can make use of any peers that are currently pending requests. - async fn check_pending_peers( + fn check_pending_peers( &mut self, chain_tracker: &mut ChainTracker, pending_peers: &mut BTreeMap>>, @@ -287,7 +287,8 @@ where continue; } - if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await { + let client = self.try_handle_free_client(chain_tracker, peer); + if let Some(peer) = client { // This peer is ok however it does not have the data we currently need, this will only happen // because of its pruning seed so just skip over all peers with this pruning seed. peers.push(peer); @@ -303,7 +304,7 @@ where /// for them. /// /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed. - async fn request_inflight_batch_again( + fn request_inflight_batch_again( &mut self, client: ClientPoolDropGuard, ) -> Option> { @@ -354,7 +355,7 @@ where /// /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// to its pruning seed. - async fn request_block_batch( + fn request_block_batch( &mut self, chain_tracker: &mut ChainTracker, client: ClientPoolDropGuard, @@ -399,7 +400,7 @@ where // If our ready queue is too large send duplicate requests for the blocks we are waiting on. if self.block_queue.size() >= self.config.in_progress_queue_size { - return self.request_inflight_batch_again(client).await; + return self.request_inflight_batch_again(client); } // No failed requests that we can handle, request some new blocks. @@ -434,7 +435,7 @@ where /// /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// to its pruning seed. - async fn try_handle_free_client( + fn try_handle_free_client( &mut self, chain_tracker: &mut ChainTracker, client: ClientPoolDropGuard, @@ -472,7 +473,7 @@ where } // Request a batch of blocks instead. - self.request_block_batch(chain_tracker, client).await + self.request_block_batch(chain_tracker, client) } /// Checks the [`ClientPool`] for free peers. @@ -516,7 +517,7 @@ where .push(client); } - self.check_pending_peers(chain_tracker, pending_peers).await; + self.check_pending_peers(chain_tracker, pending_peers); Ok(()) } @@ -574,7 +575,7 @@ where .or_default() .push(client); - self.check_pending_peers(chain_tracker, pending_peers).await; + self.check_pending_peers(chain_tracker, pending_peers); return Ok(()); }; @@ -611,7 +612,7 @@ where .or_default() .push(client); - self.check_pending_peers(chain_tracker, pending_peers).await; + self.check_pending_peers(chain_tracker, pending_peers); Ok(()) } @@ -679,7 +680,7 @@ where .or_default() .push(client); - self.check_pending_peers(&mut chain_tracker, &mut pending_peers).await; + self.check_pending_peers(&mut chain_tracker, &mut pending_peers); } Err(_) => self.amount_of_empty_chain_entries += 1 } @@ -698,7 +699,7 @@ struct BlockDownloadTaskResponse { } /// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`]. -fn client_has_block_in_range( +const fn client_has_block_in_range( pruning_seed: &PruningSeed, start_height: usize, length: usize, diff --git a/p2p/p2p/src/block_downloader/block_queue.rs b/p2p/p2p/src/block_downloader/block_queue.rs index 5a92f49..5dd1b0d 100644 --- a/p2p/p2p/src/block_downloader/block_queue.rs +++ b/p2p/p2p/src/block_downloader/block_queue.rs @@ -13,7 +13,7 @@ use super::{BlockBatch, BlockDownloadError}; /// /// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`]. #[derive(Debug, Clone)] -pub struct ReadyQueueBatch { +pub(crate) struct ReadyQueueBatch { /// The start height of the batch. pub start_height: usize, /// The batch of blocks. @@ -43,7 +43,7 @@ impl Ord for ReadyQueueBatch { /// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the /// oldest batch has been downloaded. -pub struct BlockQueue { +pub(crate) struct BlockQueue { /// A queue of ready batches. ready_batches: BinaryHeap, /// The size, in bytes, of all the batches in [`Self::ready_batches`]. @@ -55,8 +55,8 @@ pub struct BlockQueue { impl BlockQueue { /// Creates a new [`BlockQueue`]. - pub fn new(buffer_appender: BufferAppender) -> BlockQueue { - BlockQueue { + pub(crate) const fn new(buffer_appender: BufferAppender) -> Self { + Self { ready_batches: BinaryHeap::new(), ready_batches_size: 0, buffer_appender, @@ -64,12 +64,12 @@ impl BlockQueue { } /// Returns the oldest batch that has not been put in the [`async_buffer`] yet. - pub fn oldest_ready_batch(&self) -> Option { + pub(crate) fn oldest_ready_batch(&self) -> Option { self.ready_batches.peek().map(|batch| batch.start_height) } /// Returns the size of all the batches that have not been put into the [`async_buffer`] yet. - pub fn size(&self) -> usize { + pub(crate) const fn size(&self) -> usize { self.ready_batches_size } @@ -77,7 +77,7 @@ impl BlockQueue { /// /// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if /// there are no batches inflight then this should be [`None`]. - pub async fn add_incoming_batch( + pub(crate) async fn add_incoming_batch( &mut self, new_batch: ReadyQueueBatch, oldest_in_flight_start_height: Option, diff --git a/p2p/p2p/src/block_downloader/chain_tracker.rs b/p2p/p2p/src/block_downloader/chain_tracker.rs index aacb163..a2f03c5 100644 --- a/p2p/p2p/src/block_downloader/chain_tracker.rs +++ b/p2p/p2p/src/block_downloader/chain_tracker.rs @@ -20,7 +20,7 @@ pub(crate) struct ChainEntry { /// A batch of blocks to retrieve. #[derive(Clone)] -pub struct BlocksToRetrieve { +pub(crate) struct BlocksToRetrieve { /// The block IDs to get. pub ids: ByteArrayVec<32>, /// The hash of the last block before this batch. @@ -39,7 +39,7 @@ pub struct BlocksToRetrieve { /// An error returned from the [`ChainTracker`]. #[derive(Debug, Clone)] -pub enum ChainTrackerError { +pub(crate) enum ChainTrackerError { /// The new chain entry is invalid. NewEntryIsInvalid, /// The new chain entry does not follow from the top of our chain tracker. @@ -50,7 +50,7 @@ pub enum ChainTrackerError { /// /// This struct allows following a single chain. It takes in [`ChainEntry`]s and /// allows getting [`BlocksToRetrieve`]. -pub struct ChainTracker { +pub(crate) struct ChainTracker { /// A list of [`ChainEntry`]s, in order. entries: VecDeque>, /// The height of the first block, in the first entry in [`Self::entries`]. @@ -65,7 +65,7 @@ pub struct ChainTracker { impl ChainTracker { /// Creates a new chain tracker. - pub fn new( + pub(crate) fn new( new_entry: ChainEntry, first_height: usize, our_genesis: [u8; 32], @@ -76,9 +76,9 @@ impl ChainTracker { entries.push_back(new_entry); Self { - top_seen_hash, entries, first_height, + top_seen_hash, previous_hash, our_genesis, } @@ -86,17 +86,17 @@ impl ChainTracker { /// Returns `true` if the peer is expected to have the next block after our highest seen block /// according to their pruning seed. - pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool { + pub(crate) fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool { seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT) } /// Returns the simple history, the highest seen block and the genesis block. - pub fn get_simple_history(&self) -> [[u8; 32]; 2] { + pub(crate) const fn get_simple_history(&self) -> [[u8; 32]; 2] { [self.top_seen_hash, self.our_genesis] } /// Returns the height of the highest block we are tracking. - pub fn top_height(&self) -> usize { + pub(crate) fn top_height(&self) -> usize { let top_block_idx = self .entries .iter() @@ -110,7 +110,7 @@ impl ChainTracker { /// /// # Panics /// This function panics if `batch_size` is `0`. - pub fn block_requests_queued(&self, batch_size: usize) -> usize { + pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize { self.entries .iter() .map(|entry| entry.ids.len().div_ceil(batch_size)) @@ -118,7 +118,10 @@ impl ChainTracker { } /// Attempts to add an incoming [`ChainEntry`] to the chain tracker. - pub fn add_entry(&mut self, mut chain_entry: ChainEntry) -> Result<(), ChainTrackerError> { + pub(crate) fn add_entry( + &mut self, + mut chain_entry: ChainEntry, + ) -> Result<(), ChainTrackerError> { if chain_entry.ids.is_empty() { // The peer must send at lest one overlapping block. chain_entry.handle.ban_peer(MEDIUM_BAN); @@ -154,7 +157,7 @@ impl ChainTracker { /// Returns a batch of blocks to request. /// /// The returned batches length will be less than or equal to `max_blocks` - pub fn blocks_to_get( + pub(crate) fn blocks_to_get( &mut self, pruning_seed: &PruningSeed, max_blocks: usize, diff --git a/p2p/p2p/src/block_downloader/download_batch.rs b/p2p/p2p/src/block_downloader/download_batch.rs index ea57ead..bbb14b3 100644 --- a/p2p/p2p/src/block_downloader/download_batch.rs +++ b/p2p/p2p/src/block_downloader/download_batch.rs @@ -30,6 +30,7 @@ use crate::{ attempt = _attempt ) )] +#[expect(clippy::used_underscore_binding)] pub async fn download_batch_task( client: ClientPoolDropGuard, ids: ByteArrayVec<32>, @@ -103,6 +104,7 @@ async fn request_batch_from_peer( Ok((client, batch)) } +#[expect(clippy::needless_pass_by_value)] fn deserialize_batch( blocks_response: GetObjectsResponse, expected_start_height: usize, diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs index 4b0b47e..bde40ce 100644 --- a/p2p/p2p/src/block_downloader/request_chain.rs +++ b/p2p/p2p/src/block_downloader/request_chain.rs @@ -30,7 +30,7 @@ use crate::{ /// /// Because the block downloader only follows and downloads one chain we only have to send the block hash of /// top block we have found and the genesis block, this is then called `short_history`. -pub async fn request_chain_entry_from_peer( +pub(crate) async fn request_chain_entry_from_peer( mut client: ClientPoolDropGuard, short_history: [[u8; 32]; 2], ) -> Result<(ClientPoolDropGuard, ChainEntry), BlockDownloadError> { @@ -179,7 +179,7 @@ where Some(res) => { // res has already been set, replace it if this peer claims higher cumulative difficulty if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() { - let _ = mem::replace(res, task_res); + drop(mem::replace(res, task_res)); } } None => { diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs index 86a9a46..a5c5e92 100644 --- a/p2p/p2p/src/block_downloader/tests.rs +++ b/p2p/p2p/src/block_downloader/tests.rs @@ -47,6 +47,7 @@ proptest! { let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + #[expect(clippy::significant_drop_tightening)] tokio_pool.block_on(async move { timeout(Duration::from_secs(600), async move { let client_pool = ClientPool::new(); @@ -54,7 +55,7 @@ proptest! { let mut peer_ids = Vec::with_capacity(peers); for _ in 0..peers { - let client = mock_block_downloader_client(blockchain.clone()); + let client = mock_block_downloader_client(Arc::clone(&blockchain)); peer_ids.push(client.info.id); @@ -156,7 +157,7 @@ prop_compose! { for (height, mut block) in blocks.into_iter().enumerate() { if let Some(last) = blockchain.last() { block.0.header.previous = *last.0; - block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)] + block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)]; } blockchain.insert(block.0.hash(), block); @@ -173,7 +174,7 @@ fn mock_block_downloader_client(blockchain: Arc) -> Client( +pub(crate) fn init_broadcast_channels( config: BroadcastConfig, ) -> ( BroadcastSvc, @@ -193,7 +193,7 @@ impl Service> for BroadcastSvc { }; // An error here means _all_ receivers were dropped which we assume will never happen. - let _ = match direction { + drop(match direction { Some(ConnectionDirection::Inbound) => { self.tx_broadcast_channel_inbound.send(nex_tx_info) } @@ -201,10 +201,10 @@ impl Service> for BroadcastSvc { self.tx_broadcast_channel_outbound.send(nex_tx_info) } None => { - let _ = self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()); + drop(self.tx_broadcast_channel_outbound.send(nex_tx_info.clone())); self.tx_broadcast_channel_inbound.send(nex_tx_info) } - }; + }); } } @@ -246,7 +246,7 @@ struct BroadcastTxInfo { /// /// This is given to the connection task to await on for broadcast messages. #[pin_project::pin_project] -pub struct BroadcastMessageStream { +pub(crate) struct BroadcastMessageStream { /// The peer that is holding this stream. addr: InternalPeerID, @@ -336,8 +336,9 @@ impl Stream for BroadcastMessageStream { Poll::Ready(Some(BroadcastMessage::NewTransaction(txs))) } else { tracing::trace!("Diffusion flush timer expired but no txs to diffuse"); - // poll next_flush now to register the waker with it + // poll next_flush now to register the waker with it. // the waker will already be registered with the block broadcast channel. + #[expect(clippy::let_underscore_must_use)] let _ = this.next_flush.poll(cx); Poll::Pending } @@ -458,7 +459,7 @@ mod tests { let match_tx = |mes, txs| match mes { BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs), - _ => panic!("Block broadcast?"), + BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"), }; let next = outbound_stream.next().await.unwrap(); @@ -520,7 +521,7 @@ mod tests { let match_tx = |mes, txs| match mes { BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs), - _ => panic!("Block broadcast?"), + BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"), }; let next = outbound_stream.next().await.unwrap(); @@ -536,6 +537,6 @@ mod tests { futures::future::select(inbound_stream_from.next(), outbound_stream_from.next()) ) .await - .is_err()) + .is_err()); } } diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index 51f57e9..3405224 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -8,7 +8,7 @@ //! returns the peer to the pool when it is dropped. //! //! Internally the pool is a [`DashMap`] which means care should be taken in `async` code -//! as internally this uses blocking RwLocks. +//! as internally this uses blocking `RwLock`s. use std::sync::Arc; use dashmap::DashMap; @@ -24,7 +24,7 @@ use cuprate_p2p_core::{ pub(crate) mod disconnect_monitor; mod drop_guard_client; -pub use drop_guard_client::ClientPoolDropGuard; +pub(crate) use drop_guard_client::ClientPoolDropGuard; /// The client pool, which holds currently connected free peers. /// @@ -38,16 +38,17 @@ pub struct ClientPool { impl ClientPool { /// Returns a new [`ClientPool`] wrapped in an [`Arc`]. - pub fn new() -> Arc> { + pub fn new() -> Arc { let (tx, rx) = mpsc::unbounded_channel(); - let pool = Arc::new(ClientPool { + let pool = Arc::new(Self { clients: DashMap::new(), new_connection_tx: tx, }); tokio::spawn( - disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()), + disconnect_monitor::disconnect_monitor(rx, Arc::clone(&pool)) + .instrument(Span::current()), ); pool @@ -69,8 +70,7 @@ impl ClientPool { return; } - let res = self.clients.insert(id, client); - assert!(res.is_none()); + assert!(self.clients.insert(id, client).is_none()); // We have to check this again otherwise we could have a race condition where a // peer is disconnected after the first check, the disconnect monitor tries to remove it, @@ -121,7 +121,6 @@ impl ClientPool { /// Note that the returned iterator is not guaranteed to contain every peer asked for. /// /// See [`Self::borrow_client`] for borrowing a single client. - #[allow(private_interfaces)] // TODO: Remove me when 2024 Rust pub fn borrow_clients<'a, 'b>( self: &'a Arc, peers: &'b [InternalPeerID], @@ -133,7 +132,7 @@ impl ClientPool { mod sealed { /// TODO: Remove me when 2024 Rust /// - /// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick + /// pub trait Captures {} impl Captures for T {} diff --git a/p2p/p2p/src/client_pool/disconnect_monitor.rs b/p2p/p2p/src/client_pool/disconnect_monitor.rs index f45d5e3..f54b560 100644 --- a/p2p/p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/p2p/src/client_pool/disconnect_monitor.rs @@ -78,6 +78,6 @@ impl Future for PeerDisconnectFut { this.closed_fut .poll(cx) - .map(|_| this.peer_id.take().unwrap()) + .map(|()| this.peer_id.take().unwrap()) } } diff --git a/p2p/p2p/src/connection_maintainer.rs b/p2p/p2p/src/connection_maintainer.rs index 3dfd5e8..be89973 100644 --- a/p2p/p2p/src/connection_maintainer.rs +++ b/p2p/p2p/src/connection_maintainer.rs @@ -99,12 +99,17 @@ where /// Connects to random seeds to get peers and immediately disconnects #[instrument(level = "info", skip(self))] + #[expect( + clippy::significant_drop_in_scrutinee, + clippy::significant_drop_tightening + )] async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> { let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS); - if seeds.len() == 0 { - panic!("No seed nodes available to get peers from"); - } + assert!( + seeds.len() != 0, + "No seed nodes available to get peers from" + ); let mut allowed_errors = seeds.len(); @@ -129,7 +134,7 @@ where } while let Some(res) = handshake_futs.join_next().await { - if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) { + if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) { allowed_errors -= 1; } } @@ -144,7 +149,7 @@ where /// Connects to a given outbound peer. #[instrument(level = "info", skip_all)] async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { - let client_pool = self.client_pool.clone(); + let client_pool = Arc::clone(&self.client_pool); let connection_fut = self .connector_svc .ready() @@ -157,6 +162,7 @@ where tokio::spawn( async move { + #[expect(clippy::significant_drop_in_scrutinee)] if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { client_pool.add_new_client(peer); } @@ -166,14 +172,16 @@ where } /// Handles a request from the peer set for more peers. + #[expect( + clippy::significant_drop_tightening, + reason = "we need to hold onto a permit" + )] async fn handle_peer_request( &mut self, req: &MakeConnectionRequest, ) -> Result<(), OutboundConnectorError> { // try to get a permit. - let permit = self - .outbound_semaphore - .clone() + let permit = Arc::clone(&self.outbound_semaphore) .try_acquire_owned() .or_else(|_| { // if we can't get a permit add one if we are below the max number of connections. @@ -183,7 +191,9 @@ where } else { self.outbound_semaphore.add_permits(1); self.extra_peers += 1; - Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap()) + Ok(Arc::clone(&self.outbound_semaphore) + .try_acquire_owned() + .unwrap()) } })?; @@ -272,12 +282,12 @@ where tracing::info!("Shutting down outbound connector, make connection channel closed."); return; }; - // We can't really do much about errors in this function. + #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")] let _ = self.handle_peer_request(&peer_req).await; }, // This future is not cancellation safe as you will lose your space in the queue but as we are the only place // that actually requires permits that should be ok. - Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => { + Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => { if self.handle_free_permit(permit).await.is_err() { // if we got an error then we still have a permit free so to prevent this from just looping // uncontrollably add a timeout. diff --git a/p2p/p2p/src/inbound_server.rs b/p2p/p2p/src/inbound_server.rs index 80ff38e..0d50d54 100644 --- a/p2p/p2p/src/inbound_server.rs +++ b/p2p/p2p/src/inbound_server.rs @@ -100,7 +100,7 @@ where }; // If we're still behind our maximum limit, Initiate handshake. - if let Ok(permit) = semaphore.clone().try_acquire_owned() { + if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() { tracing::debug!("Permit free for incoming connection, attempting handshake."); let fut = handshaker.ready().await?.call(DoHandshakeRequest { @@ -111,11 +111,12 @@ where permit: Some(permit), }); - let cloned_pool = client_pool.clone(); + let cloned_pool = Arc::clone(&client_pool); tokio::spawn( async move { - if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await { + let client = timeout(HANDSHAKE_TIMEOUT, fut).await; + if let Ok(Ok(peer)) = client { cloned_pool.add_new_client(peer); } } @@ -133,8 +134,10 @@ where let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next()); // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded - if let Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) = fut.await - { + if matches!( + fut.await, + Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) + ) { let response = peer_sink .send( Message::Response(AdminResponseMessage::Ping(PingResponse { @@ -148,7 +151,7 @@ where if let Err(err) = response { tracing::debug!( "Unable to respond to ping request from peer ({addr}): {err}" - ) + ); } } } diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index be18c2a..2f51c6c 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -103,7 +103,7 @@ where let outbound_connector = Connector::new(outbound_handshaker); let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( config.clone(), - client_pool.clone(), + Arc::clone(&client_pool), make_connection_rx, address_book.clone(), outbound_connector, @@ -118,17 +118,17 @@ where ); background_tasks.spawn( inbound_server::inbound_server( - client_pool.clone(), + Arc::clone(&client_pool), inbound_handshaker, address_book.clone(), config, ) .map(|res| { if let Err(e) = res { - tracing::error!("Error in inbound connection listener: {e}") + tracing::error!("Error in inbound connection listener: {e}"); } - tracing::info!("Inbound connection listener shutdown") + tracing::info!("Inbound connection listener shutdown"); }) .instrument(Span::current()), ); @@ -155,7 +155,7 @@ pub struct NetworkInterface { /// on that claimed chain. top_block_watch: watch::Receiver, /// A channel to request extra connections. - #[allow(dead_code)] // will be used eventually + #[expect(dead_code, reason = "will be used eventually")] make_connection_tx: mpsc::Sender, /// The address book service. address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, @@ -184,7 +184,7 @@ impl NetworkInterface { C::Future: Send + 'static, { block_downloader::download_blocks( - self.pool.clone(), + Arc::clone(&self.pool), self.sync_states_svc.clone(), our_chain_service, config, diff --git a/p2p/p2p/src/sync_states.rs b/p2p/p2p/src/sync_states.rs index 70ef6ca..0c03795 100644 --- a/p2p/p2p/src/sync_states.rs +++ b/p2p/p2p/src/sync_states.rs @@ -40,7 +40,7 @@ pub struct NewSyncInfo { /// This is the service that handles: /// 1. Finding out if we need to sync /// 1. Giving the peers that should be synced _from_, to the requester -pub struct PeerSyncSvc { +pub(crate) struct PeerSyncSvc { /// A map of cumulative difficulties to peers. cumulative_difficulties: BTreeMap>>, /// A map of peers to cumulative difficulties. @@ -56,7 +56,7 @@ pub struct PeerSyncSvc { impl PeerSyncSvc { /// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with /// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie. - pub fn new() -> (Self, watch::Receiver) { + pub(crate) fn new() -> (Self, watch::Receiver) { let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo { chain_height: 0, top_hash: [0; 32], @@ -108,9 +108,7 @@ impl PeerSyncSvc { if let Some(block_needed) = block_needed { // we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means // we don't take into account the tip blocks which are not pruned. - self.peers - .get(peer) - .unwrap() + self.peers[peer] .1 .has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT) } else { @@ -126,7 +124,7 @@ impl PeerSyncSvc { &mut self, peer_id: InternalPeerID, handle: ConnectionHandle, - core_sync_data: CoreSyncData, + core_sync_data: &CoreSyncData, ) -> Result<(), tower::BoxError> { tracing::trace!( "Received new core sync data from peer, top hash: {}", @@ -176,7 +174,7 @@ impl PeerSyncSvc { self.closed_connections.push(PeerDisconnectFut { closed_fut: handle.closed(), peer_id: Some(peer_id), - }) + }); } self.cumulative_difficulties @@ -190,11 +188,15 @@ impl PeerSyncSvc { || self .last_peer_in_watcher_handle .as_ref() - .is_some_and(|handle| handle.is_closed()) + .is_some_and(ConnectionHandle::is_closed) { tracing::debug!( "Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}" ); + #[expect( + clippy::let_underscore_must_use, + reason = "dropped receivers can be ignored" + )] let _ = self.new_height_watcher.send(NewSyncInfo { top_hash: core_sync_data.top_id, chain_height: core_sync_data.current_height, @@ -228,8 +230,8 @@ impl Service> for PeerSyncSvc { block_needed, ))), PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self - .update_peer_sync_info(peer_id, handle, sync_data) - .map(|_| PeerSyncResponse::Ok), + .update_peer_sync_info(peer_id, handle, &sync_data) + .map(|()| PeerSyncResponse::Ok), }; ready(res) @@ -413,6 +415,6 @@ mod tests { assert!( peers.contains(&InternalPeerID::Unknown(0)) && peers.contains(&InternalPeerID::Unknown(1)) - ) + ); } } From 848a6a71c4164a31b46fc4e69b8d514615df1cf0 Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Fri, 20 Sep 2024 20:37:06 -0400 Subject: [PATCH 2/2] p2p/p2p-core: enable workspace lints (#288) * p2p-core: enable workspace lints * fmt * fix tests * fixes * fixes * fixes * expect reason --- Cargo.lock | 2 +- p2p/p2p-core/Cargo.toml | 10 +-- p2p/p2p-core/src/client.rs | 13 ++-- p2p/p2p-core/src/client/connection.rs | 47 ++++++++------ p2p/p2p-core/src/client/connector.rs | 4 +- p2p/p2p-core/src/client/handshaker.rs | 23 ++++--- p2p/p2p-core/src/client/handshaker/builder.rs | 26 ++++---- .../src/client/handshaker/builder/dummy.rs | 16 ++--- p2p/p2p-core/src/client/request_handler.rs | 4 +- p2p/p2p-core/src/client/timeout_monitor.rs | 4 +- p2p/p2p-core/src/error.rs | 2 +- p2p/p2p-core/src/handles.rs | 15 +++-- p2p/p2p-core/src/lib.rs | 14 +++- p2p/p2p-core/src/network_zones/clear.rs | 2 +- p2p/p2p-core/src/protocol.rs | 18 +++--- p2p/p2p-core/src/protocol/try_from.rs | 64 +++++++++---------- p2p/p2p-core/src/services.rs | 6 +- p2p/p2p-core/tests/fragmented_handshake.rs | 19 +++--- p2p/p2p-core/tests/handles.rs | 2 + p2p/p2p-core/tests/handshake.rs | 14 ++-- p2p/p2p-core/tests/sending_receiving.rs | 5 +- 21 files changed, 168 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 72325bb..5481b62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -776,6 +776,7 @@ version = "0.1.0" dependencies = [ "async-trait", "borsh", + "cfg-if", "cuprate-helper", "cuprate-pruning", "cuprate-test-utils", @@ -790,7 +791,6 @@ dependencies = [ "tokio-util", "tower", "tracing", - "tracing-subscriber", ] [[package]] diff --git a/p2p/p2p-core/Cargo.toml b/p2p/p2p-core/Cargo.toml index 9ef8e24..8341fe9 100644 --- a/p2p/p2p-core/Cargo.toml +++ b/p2p/p2p-core/Cargo.toml @@ -14,13 +14,14 @@ cuprate-helper = { path = "../../helper", features = ["asynch"], default-feature cuprate-wire = { path = "../../net/wire", features = ["tracing"] } cuprate-pruning = { path = "../../pruning" } -tokio = { workspace = true, features = ["net", "sync", "macros", "time"]} +tokio = { workspace = true, features = ["net", "sync", "macros", "time", "rt", "rt-multi-thread"]} tokio-util = { workspace = true, features = ["codec"] } tokio-stream = { workspace = true, features = ["sync"]} futures = { workspace = true, features = ["std"] } async-trait = { workspace = true } tower = { workspace = true, features = ["util", "tracing"] } +cfg-if = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true, features = ["std", "attributes"] } hex-literal = { workspace = true } @@ -28,9 +29,10 @@ hex-literal = { workspace = true } borsh = { workspace = true, features = ["derive", "std"], optional = true } [dev-dependencies] -cuprate-test-utils = {path = "../../test-utils"} +cuprate-test-utils = { path = "../../test-utils" } hex = { workspace = true, features = ["std"] } -tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]} tokio-test = { workspace = true } -tracing-subscriber = { workspace = true } + +[lints] +workspace = true \ No newline at end of file diff --git a/p2p/p2p-core/src/client.rs b/p2p/p2p-core/src/client.rs index 662a8ee..8685189 100644 --- a/p2p/p2p-core/src/client.rs +++ b/p2p/p2p-core/src/client.rs @@ -43,8 +43,8 @@ pub enum InternalPeerID { impl Display for InternalPeerID { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - InternalPeerID::KnownAddr(addr) => addr.fmt(f), - InternalPeerID::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")), + Self::KnownAddr(addr) => addr.fmt(f), + Self::Unknown(id) => f.write_str(&format!("Unknown, ID: {id}")), } } } @@ -113,7 +113,7 @@ impl Client { fn set_err(&self, err: PeerError) -> tower::BoxError { let err_str = err.to_string(); match self.error.try_insert_err(err) { - Ok(_) => err_str, + Ok(()) => err_str, Err(e) => e.to_string(), } .into() @@ -169,9 +169,8 @@ impl Service for Client { TrySendError::Closed(req) | TrySendError::Full(req) => { self.set_err(PeerError::ClientChannelClosed); - let _ = req - .response_channel - .send(Err(PeerError::ClientChannelClosed.into())); + let resp = Err(PeerError::ClientChannelClosed.into()); + drop(req.response_channel.send(resp)); } } } @@ -216,7 +215,7 @@ where tracing::debug!("Sending back response"); - let _ = req.response_channel.send(Ok(res)); + drop(req.response_channel.send(Ok(res))); } } .instrument(task_span), diff --git a/p2p/p2p-core/src/client/connection.rs b/p2p/p2p-core/src/client/connection.rs index f3f3f6b..f7b9be5 100644 --- a/p2p/p2p-core/src/client/connection.rs +++ b/p2p/p2p-core/src/client/connection.rs @@ -26,7 +26,7 @@ use crate::{ }; /// A request to the connection task from a [`Client`](crate::client::Client). -pub struct ConnectionTaskRequest { +pub(crate) struct ConnectionTaskRequest { /// The request. pub request: PeerRequest, /// The response channel. @@ -36,7 +36,7 @@ pub struct ConnectionTaskRequest { } /// The connection state. -pub enum State { +pub(crate) enum State { /// Waiting for a request from Cuprate or the connected peer. WaitingForRequest, /// Waiting for a response from the peer. @@ -53,7 +53,7 @@ pub enum State { /// Returns if the [`LevinCommand`] is the correct response message for our request. /// /// e.g. that we didn't get a block for a txs request. -fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool { +const fn levin_command_response(message_id: MessageID, command: LevinCommand) -> bool { matches!( (message_id, command), (MessageID::Handshake, LevinCommand::Handshake) @@ -71,7 +71,7 @@ fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool } /// This represents a connection to a peer. -pub struct Connection { +pub(crate) struct Connection { /// The peer sink - where we send messages to the peer. peer_sink: Z::Sink, @@ -104,15 +104,15 @@ where BrdcstStrm: Stream + Send + 'static, { /// Create a new connection struct. - pub fn new( + pub(crate) fn new( peer_sink: Z::Sink, client_rx: mpsc::Receiver, broadcast_stream: BrdcstStrm, peer_request_handler: PeerRequestHandler, connection_guard: ConnectionGuard, error: SharedError, - ) -> Connection { - Connection { + ) -> Self { + Self { peer_sink, state: State::WaitingForRequest, request_timeout: None, @@ -174,15 +174,14 @@ where if let Err(e) = res { // can't clone the error so turn it to a string first, hacky but oh well. let err_str = e.to_string(); - let _ = req.response_channel.send(Err(err_str.clone().into())); + drop(req.response_channel.send(Err(err_str.into()))); return Err(e); - } else { - // We still need to respond even if the response is this. - let _ = req - .response_channel - .send(Ok(PeerResponse::Protocol(ProtocolResponse::NA))); } + // We still need to respond even if the response is this. + let resp = Ok(PeerResponse::Protocol(ProtocolResponse::NA)); + drop(req.response_channel.send(resp)); + Ok(()) } @@ -215,7 +214,7 @@ where }; // Check if the message is a response to our request. - if levin_command_response(request_id, mes.command()) { + if levin_command_response(*request_id, mes.command()) { // TODO: Do more checks before returning response. let State::WaitingForResponse { tx, .. } = @@ -224,9 +223,11 @@ where panic!("Not in correct state, can't receive response!") }; - let _ = tx.send(Ok(mes + let resp = Ok(mes .try_into() - .map_err(|_| PeerError::PeerSentInvalidMessage)?)); + .map_err(|_| PeerError::PeerSentInvalidMessage)?); + + drop(tx.send(resp)); self.request_timeout = None; @@ -282,7 +283,7 @@ where tokio::select! { biased; - _ = self.request_timeout.as_mut().expect("Request timeout was not set!") => { + () = self.request_timeout.as_mut().expect("Request timeout was not set!") => { Err(PeerError::ClientChannelClosed) } broadcast_req = self.broadcast_stream.next() => { @@ -306,8 +307,11 @@ where /// Runs the Connection handler logic, this should be put in a separate task. /// /// `eager_protocol_messages` are protocol messages that we received during a handshake. - pub async fn run(mut self, mut stream: Str, eager_protocol_messages: Vec) - where + pub(crate) async fn run( + mut self, + mut stream: Str, + eager_protocol_messages: Vec, + ) where Str: FusedStream> + Unpin, { tracing::debug!( @@ -348,6 +352,7 @@ where /// Shutdowns the connection, flushing pending requests and setting the error slot, if it hasn't been /// set already. + #[expect(clippy::significant_drop_tightening)] fn shutdown(mut self, err: PeerError) { tracing::debug!("Connection task shutting down: {}", err); @@ -362,11 +367,11 @@ where if let State::WaitingForResponse { tx, .. } = std::mem::replace(&mut self.state, State::WaitingForRequest) { - let _ = tx.send(Err(err_str.clone().into())); + drop(tx.send(Err(err_str.clone().into()))); } while let Ok(req) = client_rx.try_recv() { - let _ = req.response_channel.send(Err(err_str.clone().into())); + drop(req.response_channel.send(Err(err_str.clone().into()))); } self.connection_guard.connection_closed(); diff --git a/p2p/p2p-core/src/client/connector.rs b/p2p/p2p-core/src/client/connector.rs index d937165..553f5a4 100644 --- a/p2p/p2p-core/src/client/connector.rs +++ b/p2p/p2p-core/src/client/connector.rs @@ -40,7 +40,9 @@ impl Connector { /// Create a new connector from a handshaker. - pub fn new(handshaker: HandShaker) -> Self { + pub const fn new( + handshaker: HandShaker, + ) -> Self { Self { handshaker } } } diff --git a/p2p/p2p-core/src/client/handshaker.rs b/p2p/p2p-core/src/client/handshaker.rs index 67a58d4..d6873a8 100644 --- a/p2p/p2p-core/src/client/handshaker.rs +++ b/p2p/p2p-core/src/client/handshaker.rs @@ -113,7 +113,7 @@ impl HandShaker { /// Creates a new handshaker. - fn new( + const fn new( address_book: AdrBook, peer_sync_svc: PSync, core_sync_svc: CSync, @@ -226,11 +226,12 @@ pub async fn ping(addr: N::Addr) -> Result Err(BucketError::IO(std::io::Error::new( std::io::ErrorKind::ConnectionAborted, "The peer stream returned None", - )))? + )) + .into()) } /// This function completes a handshake with the requested peer. -#[allow(clippy::too_many_arguments)] +#[expect(clippy::too_many_arguments)] async fn handshake( req: DoHandshakeRequest, @@ -403,7 +404,10 @@ where break 'check_out_addr None; }; - // u32 does not make sense as a port so just truncate it. + #[expect( + clippy::cast_possible_truncation, + reason = "u32 does not make sense as a port so just truncate it." + )] outbound_address.set_port(peer_node_data.my_port as u16); let Ok(Ok(ping_peer_id)) = timeout( @@ -508,7 +512,7 @@ where info.id, info.handle.clone(), connection_tx.clone(), - semaphore.clone(), + Arc::clone(&semaphore), address_book, core_sync_svc, peer_sync_svc, @@ -671,7 +675,7 @@ async fn wait_for_message( _ => { return Err(HandshakeError::PeerSentInvalidMessage( "Peer sent an admin request before responding to the handshake", - )) + )); } } } @@ -686,16 +690,17 @@ async fn wait_for_message( )); } - _ => Err(HandshakeError::PeerSentInvalidMessage( + Message::Response(_) => Err(HandshakeError::PeerSentInvalidMessage( "Peer sent an incorrect message", )), - }? + }?; } Err(BucketError::IO(std::io::Error::new( std::io::ErrorKind::ConnectionAborted, "The peer stream returned None", - )))? + )) + .into()) } /// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink. diff --git a/p2p/p2p-core/src/client/handshaker/builder.rs b/p2p/p2p-core/src/client/handshaker/builder.rs index a40f396..069811d 100644 --- a/p2p/p2p-core/src/client/handshaker/builder.rs +++ b/p2p/p2p-core/src/client/handshaker/builder.rs @@ -87,14 +87,13 @@ impl where NAdrBook: AddressBook + Clone, { - let HandshakerBuilder { + let Self { core_sync_svc, peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, .. } = self; @@ -106,7 +105,7 @@ impl our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, + _zone: PhantomData, } } @@ -130,14 +129,13 @@ impl where NCSync: CoreSyncSvc + Clone, { - let HandshakerBuilder { + let Self { address_book, peer_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, .. } = self; @@ -149,7 +147,7 @@ impl our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, + _zone: PhantomData, } } @@ -167,14 +165,13 @@ impl where NPSync: PeerSyncSvc + Clone, { - let HandshakerBuilder { + let Self { address_book, core_sync_svc, protocol_request_svc, our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, .. } = self; @@ -186,7 +183,7 @@ impl our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, + _zone: PhantomData, } } @@ -204,14 +201,13 @@ impl where NProtoHdlr: ProtocolRequestHandler + Clone, { - let HandshakerBuilder { + let Self { address_book, core_sync_svc, peer_sync_svc, our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, .. } = self; @@ -223,7 +219,7 @@ impl our_basic_node_data, broadcast_stream_maker, connection_parent_span, - _zone, + _zone: PhantomData, } } @@ -242,14 +238,13 @@ impl BrdcstStrm: Stream + Send + 'static, NBrdcstStrmMkr: Fn(InternalPeerID) -> BrdcstStrm + Clone + Send + 'static, { - let HandshakerBuilder { + let Self { address_book, core_sync_svc, peer_sync_svc, protocol_request_svc, our_basic_node_data, connection_parent_span, - _zone, .. } = self; @@ -261,7 +256,7 @@ impl our_basic_node_data, broadcast_stream_maker: new_broadcast_stream_maker, connection_parent_span, - _zone, + _zone: PhantomData, } } @@ -270,6 +265,7 @@ impl /// ## Default Connection Parent Span /// /// The default connection span will be [`Span::none`]. + #[must_use] pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self { Self { connection_parent_span: Some(connection_parent_span), diff --git a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs index ae97cdc..e3c4335 100644 --- a/p2p/p2p-core/src/client/handshaker/builder/dummy.rs +++ b/p2p/p2p-core/src/client/handshaker/builder/dummy.rs @@ -42,8 +42,8 @@ pub struct DummyCoreSyncSvc(CoreSyncData); impl DummyCoreSyncSvc { /// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`]. - pub fn static_mainnet_genesis() -> DummyCoreSyncSvc { - DummyCoreSyncSvc(CoreSyncData { + pub const fn static_mainnet_genesis() -> Self { + Self(CoreSyncData { cumulative_difficulty: 1, cumulative_difficulty_top64: 0, current_height: 1, @@ -56,8 +56,8 @@ impl DummyCoreSyncSvc { } /// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`]. - pub fn static_testnet_genesis() -> DummyCoreSyncSvc { - DummyCoreSyncSvc(CoreSyncData { + pub const fn static_testnet_genesis() -> Self { + Self(CoreSyncData { cumulative_difficulty: 1, cumulative_difficulty_top64: 0, current_height: 1, @@ -70,8 +70,8 @@ impl DummyCoreSyncSvc { } /// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`]. - pub fn static_stagenet_genesis() -> DummyCoreSyncSvc { - DummyCoreSyncSvc(CoreSyncData { + pub const fn static_stagenet_genesis() -> Self { + Self(CoreSyncData { cumulative_difficulty: 1, cumulative_difficulty_top64: 0, current_height: 1, @@ -84,8 +84,8 @@ impl DummyCoreSyncSvc { } /// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`]. - pub fn static_custom(data: CoreSyncData) -> DummyCoreSyncSvc { - DummyCoreSyncSvc(data) + pub const fn static_custom(data: CoreSyncData) -> Self { + Self(data) } } diff --git a/p2p/p2p-core/src/client/request_handler.rs b/p2p/p2p-core/src/client/request_handler.rs index 284f954..7059eed 100644 --- a/p2p/p2p-core/src/client/request_handler.rs +++ b/p2p/p2p-core/src/client/request_handler.rs @@ -46,7 +46,7 @@ pub(crate) struct PeerRequestHandler { pub peer_info: PeerInformation, } -impl PeerRequestHandler +impl PeerRequestHandler where Z: NetworkZone, A: AddressBook, @@ -55,7 +55,7 @@ where PR: ProtocolRequestHandler, { /// Handles an incoming [`PeerRequest`] to our node. - pub async fn handle_peer_request( + pub(crate) async fn handle_peer_request( &mut self, req: PeerRequest, ) -> Result { diff --git a/p2p/p2p-core/src/client/timeout_monitor.rs b/p2p/p2p-core/src/client/timeout_monitor.rs index 5228ede..6dbb4a2 100644 --- a/p2p/p2p-core/src/client/timeout_monitor.rs +++ b/p2p/p2p-core/src/client/timeout_monitor.rs @@ -1,6 +1,6 @@ //! Timeout Monitor //! -//! This module holds the task that sends periodic [TimedSync](PeerRequest::TimedSync) requests to a peer to make +//! This module holds the task that sends periodic [`TimedSync`](PeerRequest::TimedSync) requests to a peer to make //! sure the connection is still active. use std::sync::Arc; @@ -64,7 +64,7 @@ where return Ok(()); } - let Ok(permit) = semaphore.clone().try_acquire_owned() else { + let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else { // If we can't get a permit the connection is currently waiting for a response, so no need to // do a timed sync. continue; diff --git a/p2p/p2p-core/src/error.rs b/p2p/p2p-core/src/error.rs index 65303ad..d0de923 100644 --- a/p2p/p2p-core/src/error.rs +++ b/p2p/p2p-core/src/error.rs @@ -4,7 +4,7 @@ pub struct SharedError(Arc>); impl Clone for SharedError { fn clone(&self) -> Self { - Self(self.0.clone()) + Self(Arc::clone(&self.0)) } } diff --git a/p2p/p2p-core/src/handles.rs b/p2p/p2p-core/src/handles.rs index da47b65..06dc212 100644 --- a/p2p/p2p-core/src/handles.rs +++ b/p2p/p2p-core/src/handles.rs @@ -18,11 +18,12 @@ pub struct HandleBuilder { impl HandleBuilder { /// Create a new builder. - pub fn new() -> Self { + pub const fn new() -> Self { Self { permit: None } } /// Sets the permit for this connection. + #[must_use] pub fn with_permit(mut self, permit: Option) -> Self { self.permit = permit; self @@ -40,7 +41,7 @@ impl HandleBuilder { _permit: self.permit, }, ConnectionHandle { - token: token.clone(), + token, ban: Arc::new(OnceLock::new()), }, ) @@ -66,13 +67,13 @@ impl ConnectionGuard { /// /// This will be called on [`Drop::drop`]. pub fn connection_closed(&self) { - self.token.cancel() + self.token.cancel(); } } impl Drop for ConnectionGuard { fn drop(&mut self) { - self.token.cancel() + self.token.cancel(); } } @@ -90,6 +91,10 @@ impl ConnectionHandle { } /// Bans the peer for the given `duration`. pub fn ban_peer(&self, duration: Duration) { + #[expect( + clippy::let_underscore_must_use, + reason = "error means peer is already banned; fine to ignore" + )] let _ = self.ban.set(BanPeer(duration)); self.token.cancel(); } @@ -103,6 +108,6 @@ impl ConnectionHandle { } /// Sends the signal to the connection task to disconnect. pub fn send_close_signal(&self) { - self.token.cancel() + self.token.cancel(); } } diff --git a/p2p/p2p-core/src/lib.rs b/p2p/p2p-core/src/lib.rs index 83cc4d2..04e8676 100644 --- a/p2p/p2p-core/src/lib.rs +++ b/p2p/p2p-core/src/lib.rs @@ -6,7 +6,7 @@ //! //! # Network Zones //! -//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet]. +//! This crate abstracts over network zones, Tor/I2p/clearnet with the [`NetworkZone`] trait. Currently only clearnet is implemented: [`ClearNet`]. //! //! # Usage //! @@ -56,6 +56,16 @@ //! .unwrap(); //! # }); //! ``` + +cfg_if::cfg_if! { + // Used in `tests/` + if #[cfg(test)] { + use cuprate_test_utils as _; + use tokio_test as _; + use hex as _; + } +} + use std::{fmt::Debug, future::Future, hash::Hash}; use futures::{Sink, Stream}; @@ -102,7 +112,7 @@ pub trait NetZoneAddress: + Unpin + 'static { - /// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as + /// Cuprate needs to be able to ban peers by IP addresses and not just by `SocketAddr` as /// that include the port, to be able to facilitate this network addresses must have a ban ID /// which for hidden services could just be the address it self but for clear net addresses will /// be the IP address. diff --git a/p2p/p2p-core/src/network_zones/clear.rs b/p2p/p2p-core/src/network_zones/clear.rs index acde368..261d5ad 100644 --- a/p2p/p2p-core/src/network_zones/clear.rs +++ b/p2p/p2p-core/src/network_zones/clear.rs @@ -19,7 +19,7 @@ impl NetZoneAddress for SocketAddr { type BanID = IpAddr; fn set_port(&mut self, port: u16) { - SocketAddr::set_port(self, port) + Self::set_port(self, port); } fn ban_id(&self) -> Self::BanID { diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 5e4f4d7..7d8d431 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -8,7 +8,7 @@ //! //! Here is every P2P request/response. //! -//! *note admin messages are already request/response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse +//! *note admin messages are already request/response so "Handshake" is actually made of a `HandshakeRequest` & `HandshakeResponse` //! //! ```md //! Admin: @@ -78,15 +78,15 @@ pub enum PeerRequest { } impl PeerRequest { - pub fn id(&self) -> MessageID { + pub const fn id(&self) -> MessageID { match self { - PeerRequest::Admin(admin_req) => match admin_req { + Self::Admin(admin_req) => match admin_req { AdminRequestMessage::Handshake(_) => MessageID::Handshake, AdminRequestMessage::TimedSync(_) => MessageID::TimedSync, AdminRequestMessage::Ping => MessageID::Ping, AdminRequestMessage::SupportFlags => MessageID::SupportFlags, }, - PeerRequest::Protocol(protocol_request) => match protocol_request { + Self::Protocol(protocol_request) => match protocol_request { ProtocolRequest::GetObjects(_) => MessageID::GetObjects, ProtocolRequest::GetChain(_) => MessageID::GetChain, ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, @@ -98,10 +98,10 @@ impl PeerRequest { } } - pub fn needs_response(&self) -> bool { + pub const fn needs_response(&self) -> bool { !matches!( self, - PeerRequest::Protocol( + Self::Protocol( ProtocolRequest::NewBlock(_) | ProtocolRequest::NewFluffyBlock(_) | ProtocolRequest::NewTransactions(_) @@ -126,15 +126,15 @@ pub enum PeerResponse { } impl PeerResponse { - pub fn id(&self) -> Option { + pub const fn id(&self) -> Option { Some(match self { - PeerResponse::Admin(admin_res) => match admin_res { + Self::Admin(admin_res) => match admin_res { AdminResponseMessage::Handshake(_) => MessageID::Handshake, AdminResponseMessage::TimedSync(_) => MessageID::TimedSync, AdminResponseMessage::Ping(_) => MessageID::Ping, AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags, }, - PeerResponse::Protocol(protocol_res) => match protocol_res { + Self::Protocol(protocol_res) => match protocol_res { ProtocolResponse::GetObjects(_) => MessageID::GetObjects, ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index 8a0b67d..d3a7260 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -11,15 +11,13 @@ pub struct MessageConversionError; impl From for ProtocolMessage { fn from(value: ProtocolRequest) -> Self { match value { - ProtocolRequest::GetObjects(val) => ProtocolMessage::GetObjectsRequest(val), - ProtocolRequest::GetChain(val) => ProtocolMessage::ChainRequest(val), - ProtocolRequest::FluffyMissingTxs(val) => { - ProtocolMessage::FluffyMissingTransactionsRequest(val) - } - ProtocolRequest::GetTxPoolCompliment(val) => ProtocolMessage::GetTxPoolCompliment(val), - ProtocolRequest::NewBlock(val) => ProtocolMessage::NewBlock(val), - ProtocolRequest::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), - ProtocolRequest::NewTransactions(val) => ProtocolMessage::NewTransactions(val), + ProtocolRequest::GetObjects(val) => Self::GetObjectsRequest(val), + ProtocolRequest::GetChain(val) => Self::ChainRequest(val), + ProtocolRequest::FluffyMissingTxs(val) => Self::FluffyMissingTransactionsRequest(val), + ProtocolRequest::GetTxPoolCompliment(val) => Self::GetTxPoolCompliment(val), + ProtocolRequest::NewBlock(val) => Self::NewBlock(val), + ProtocolRequest::NewFluffyBlock(val) => Self::NewFluffyBlock(val), + ProtocolRequest::NewTransactions(val) => Self::NewTransactions(val), } } } @@ -29,15 +27,13 @@ impl TryFrom for ProtocolRequest { fn try_from(value: ProtocolMessage) -> Result { Ok(match value { - ProtocolMessage::GetObjectsRequest(val) => ProtocolRequest::GetObjects(val), - ProtocolMessage::ChainRequest(val) => ProtocolRequest::GetChain(val), - ProtocolMessage::FluffyMissingTransactionsRequest(val) => { - ProtocolRequest::FluffyMissingTxs(val) - } - ProtocolMessage::GetTxPoolCompliment(val) => ProtocolRequest::GetTxPoolCompliment(val), - ProtocolMessage::NewBlock(val) => ProtocolRequest::NewBlock(val), - ProtocolMessage::NewFluffyBlock(val) => ProtocolRequest::NewFluffyBlock(val), - ProtocolMessage::NewTransactions(val) => ProtocolRequest::NewTransactions(val), + ProtocolMessage::GetObjectsRequest(val) => Self::GetObjects(val), + ProtocolMessage::ChainRequest(val) => Self::GetChain(val), + ProtocolMessage::FluffyMissingTransactionsRequest(val) => Self::FluffyMissingTxs(val), + ProtocolMessage::GetTxPoolCompliment(val) => Self::GetTxPoolCompliment(val), + ProtocolMessage::NewBlock(val) => Self::NewBlock(val), + ProtocolMessage::NewFluffyBlock(val) => Self::NewFluffyBlock(val), + ProtocolMessage::NewTransactions(val) => Self::NewTransactions(val), ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => { return Err(MessageConversionError) } @@ -48,8 +44,8 @@ impl TryFrom for ProtocolRequest { impl From for Message { fn from(value: PeerRequest) -> Self { match value { - PeerRequest::Admin(val) => Message::Request(val), - PeerRequest::Protocol(val) => Message::Protocol(val.into()), + PeerRequest::Admin(val) => Self::Request(val), + PeerRequest::Protocol(val) => Self::Protocol(val.into()), } } } @@ -59,8 +55,8 @@ impl TryFrom for PeerRequest { fn try_from(value: Message) -> Result { match value { - Message::Request(req) => Ok(PeerRequest::Admin(req)), - Message::Protocol(pro) => Ok(PeerRequest::Protocol(pro.try_into()?)), + Message::Request(req) => Ok(Self::Admin(req)), + Message::Protocol(pro) => Ok(Self::Protocol(pro.try_into()?)), Message::Response(_) => Err(MessageConversionError), } } @@ -71,10 +67,10 @@ impl TryFrom for ProtocolMessage { fn try_from(value: ProtocolResponse) -> Result { Ok(match value { - ProtocolResponse::NewTransactions(val) => ProtocolMessage::NewTransactions(val), - ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), - ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val), - ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val), + ProtocolResponse::NewTransactions(val) => Self::NewTransactions(val), + ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), + ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), + ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), ProtocolResponse::NA => return Err(MessageConversionError), }) } @@ -85,10 +81,10 @@ impl TryFrom for ProtocolResponse { fn try_from(value: ProtocolMessage) -> Result { Ok(match value { - ProtocolMessage::NewTransactions(val) => ProtocolResponse::NewTransactions(val), - ProtocolMessage::NewFluffyBlock(val) => ProtocolResponse::NewFluffyBlock(val), - ProtocolMessage::ChainEntryResponse(val) => ProtocolResponse::GetChain(val), - ProtocolMessage::GetObjectsResponse(val) => ProtocolResponse::GetObjects(val), + ProtocolMessage::NewTransactions(val) => Self::NewTransactions(val), + ProtocolMessage::NewFluffyBlock(val) => Self::NewFluffyBlock(val), + ProtocolMessage::ChainEntryResponse(val) => Self::GetChain(val), + ProtocolMessage::GetObjectsResponse(val) => Self::GetObjects(val), ProtocolMessage::ChainRequest(_) | ProtocolMessage::FluffyMissingTransactionsRequest(_) | ProtocolMessage::GetObjectsRequest(_) @@ -103,8 +99,8 @@ impl TryFrom for PeerResponse { fn try_from(value: Message) -> Result { match value { - Message::Response(res) => Ok(PeerResponse::Admin(res)), - Message::Protocol(pro) => Ok(PeerResponse::Protocol(pro.try_into()?)), + Message::Response(res) => Ok(Self::Admin(res)), + Message::Protocol(pro) => Ok(Self::Protocol(pro.try_into()?)), Message::Request(_) => Err(MessageConversionError), } } @@ -115,8 +111,8 @@ impl TryFrom for Message { fn try_from(value: PeerResponse) -> Result { Ok(match value { - PeerResponse::Admin(val) => Message::Response(val), - PeerResponse::Protocol(val) => Message::Protocol(val.try_into()?), + PeerResponse::Admin(val) => Self::Response(val), + PeerResponse::Protocol(val) => Self::Protocol(val.try_into()?), }) } } diff --git a/p2p/p2p-core/src/services.rs b/p2p/p2p-core/src/services.rs index 6d66cfa..ba87684 100644 --- a/p2p/p2p-core/src/services.rs +++ b/p2p/p2p-core/src/services.rs @@ -52,7 +52,7 @@ pub struct ZoneSpecificPeerListEntryBase { pub rpc_credits_per_hash: u32, } -impl From> for cuprate_wire::PeerListEntryBase { +impl From> for PeerListEntryBase { fn from(value: ZoneSpecificPeerListEntryBase) -> Self { Self { adr: value.adr.into(), @@ -74,9 +74,7 @@ pub enum PeerListConversionError { PruningSeed(#[from] PruningError), } -impl TryFrom - for ZoneSpecificPeerListEntryBase -{ +impl TryFrom for ZoneSpecificPeerListEntryBase { type Error = PeerListConversionError; fn try_from(value: PeerListEntryBase) -> Result { diff --git a/p2p/p2p-core/tests/fragmented_handshake.rs b/p2p/p2p-core/tests/fragmented_handshake.rs index c19a2a6..1235df9 100644 --- a/p2p/p2p-core/tests/fragmented_handshake.rs +++ b/p2p/p2p-core/tests/fragmented_handshake.rs @@ -1,4 +1,7 @@ //! This file contains a test for a handshake with monerod but uses fragmented messages. + +#![expect(unused_crate_dependencies, reason = "external test module")] + use std::{ net::SocketAddr, pin::Pin, @@ -21,6 +24,13 @@ use tokio_util::{ use tower::{Service, ServiceExt}; use cuprate_helper::network::Network; +use cuprate_test_utils::monerod::monerod; +use cuprate_wire::{ + common::PeerSupportFlags, + levin::{message::make_fragmented_messages, LevinMessage, Protocol}, + BasicNodeData, Message, MoneroWireCodec, +}; + use cuprate_p2p_core::{ client::{ handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest, @@ -28,13 +38,6 @@ use cuprate_p2p_core::{ }, ClearNetServerCfg, ConnectionDirection, NetworkZone, }; -use cuprate_wire::{ - common::PeerSupportFlags, - levin::{message::make_fragmented_messages, LevinMessage, Protocol}, - BasicNodeData, Message, MoneroWireCodec, -}; - -use cuprate_test_utils::monerod::monerod; /// A network zone equal to clear net where every message sent is turned into a fragmented message. /// Does not support sending fragmented or dummy messages manually. @@ -184,7 +187,7 @@ async fn fragmented_handshake_monerod_to_cuprate() { let next_connection_fut = timeout(Duration::from_secs(30), listener.next()); if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() { - let _ = handshaker + handshaker .ready() .await .unwrap() diff --git a/p2p/p2p-core/tests/handles.rs b/p2p/p2p-core/tests/handles.rs index 47d70b0..2a2e2be 100644 --- a/p2p/p2p-core/tests/handles.rs +++ b/p2p/p2p-core/tests/handles.rs @@ -1,3 +1,5 @@ +#![expect(unused_crate_dependencies, reason = "external test module")] + use std::{sync::Arc, time::Duration}; use tokio::sync::Semaphore; diff --git a/p2p/p2p-core/tests/handshake.rs b/p2p/p2p-core/tests/handshake.rs index 5ce6153..86d62ed 100644 --- a/p2p/p2p-core/tests/handshake.rs +++ b/p2p/p2p-core/tests/handshake.rs @@ -1,3 +1,5 @@ +#![expect(unused_crate_dependencies, reason = "external test module")] + use std::time::Duration; use futures::StreamExt; @@ -9,6 +11,10 @@ use tokio_util::codec::{FramedRead, FramedWrite}; use tower::{Service, ServiceExt}; use cuprate_helper::network::Network; +use cuprate_test_utils::{ + monerod::monerod, + test_netzone::{TestNetZone, TestNetZoneAddr}, +}; use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec}; use cuprate_p2p_core::{ @@ -19,12 +25,8 @@ use cuprate_p2p_core::{ ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone, }; -use cuprate_test_utils::{ - monerod::monerod, - test_netzone::{TestNetZone, TestNetZoneAddr}, -}; - #[tokio::test] +#[expect(clippy::significant_drop_tightening)] async fn handshake_cuprate_to_cuprate() { // Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to // each other. @@ -147,7 +149,7 @@ async fn handshake_monerod_to_cuprate() { let next_connection_fut = timeout(Duration::from_secs(30), listener.next()); if let Some(Ok((addr, stream, sink))) = next_connection_fut.await.unwrap() { - let _ = handshaker + handshaker .ready() .await .unwrap() diff --git a/p2p/p2p-core/tests/sending_receiving.rs b/p2p/p2p-core/tests/sending_receiving.rs index e035daf..8c90c83 100644 --- a/p2p/p2p-core/tests/sending_receiving.rs +++ b/p2p/p2p-core/tests/sending_receiving.rs @@ -1,6 +1,9 @@ +#![expect(unused_crate_dependencies, reason = "external test module")] + use tower::{Service, ServiceExt}; use cuprate_helper::network::Network; +use cuprate_test_utils::monerod::monerod; use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData}; use cuprate_p2p_core::{ @@ -9,8 +12,6 @@ use cuprate_p2p_core::{ ClearNet, ProtocolRequest, ProtocolResponse, }; -use cuprate_test_utils::monerod::monerod; - #[tokio::test] async fn get_single_block_from_monerod() { let monerod = monerod(["--out-peers=0"]).await;