diff --git a/p2p/p2p/Cargo.toml b/p2p/p2p/Cargo.toml
index 7cbbdcb..ef85277 100644
--- a/p2p/p2p/Cargo.toml
+++ b/p2p/p2p/Cargo.toml
@@ -39,3 +39,6 @@ cuprate-test-utils = { path = "../../test-utils" }
 indexmap = { workspace = true }
 proptest = { workspace = true }
 tokio-test = { workspace = true }
+
+[lints]
+workspace = true
\ No newline at end of file
diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs
index d295016..39980a0 100644
--- a/p2p/p2p/src/block_downloader.rs
+++ b/p2p/p2p/src/block_downloader.rs
@@ -78,7 +78,7 @@ pub struct BlockDownloaderConfig {
 /// An error that occurred in the [`BlockDownloader`].
 #[derive(Debug, thiserror::Error)]
-pub enum BlockDownloadError {
+pub(crate) enum BlockDownloadError {
     #[error("A request to a peer timed out.")]
     TimedOut,
     #[error("The block buffer was closed.")]
@@ -219,7 +219,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
     /// The running chain entry tasks.
     ///
    /// Returns a result of the chain entry or an error.
-    #[allow(clippy::type_complexity)]
+    #[expect(clippy::type_complexity)]
    chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,

    /// The current inflight requests.
@@ -273,7 +273,7 @@ where
     }

     /// Checks if we can make use of any peers that are currently pending requests.
-    async fn check_pending_peers(
+    fn check_pending_peers(
         &mut self,
         chain_tracker: &mut ChainTracker<N>,
         pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
@@ -287,7 +287,8 @@ where
                 continue;
             }

-            if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
+            let client = self.try_handle_free_client(chain_tracker, peer);
+            if let Some(peer) = client {
                 // This peer is OK; however, it does not have the data we currently need. This will only happen
                 // because of its pruning seed, so just skip over all peers with this pruning seed.
                 peers.push(peer);
@@ -303,7 +304,7 @@ where
     /// for them.
     ///
     /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed.
-    async fn request_inflight_batch_again(
+    fn request_inflight_batch_again(
         &mut self,
         client: ClientPoolDropGuard<N>,
     ) -> Option<ClientPoolDropGuard<N>> {
@@ -354,7 +355,7 @@ where
     ///
     /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
     /// to its pruning seed.
-    async fn request_block_batch(
+    fn request_block_batch(
         &mut self,
         chain_tracker: &mut ChainTracker<N>,
         client: ClientPoolDropGuard<N>,
@@ -399,7 +400,7 @@ where
         // If our ready queue is too large send duplicate requests for the blocks we are waiting on.
         if self.block_queue.size() >= self.config.in_progress_queue_size {
-            return self.request_inflight_batch_again(client).await;
+            return self.request_inflight_batch_again(client);
         }

         // No failed requests that we can handle, request some new blocks.
@@ -434,7 +435,7 @@ where
     ///
     /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
     /// to its pruning seed.
-    async fn try_handle_free_client(
+    fn try_handle_free_client(
         &mut self,
         chain_tracker: &mut ChainTracker<N>,
         client: ClientPoolDropGuard<N>,
@@ -472,7 +473,7 @@ where
         }

         // Request a batch of blocks instead.
-        self.request_block_batch(chain_tracker, client).await
+        self.request_block_batch(chain_tracker, client)
     }

     /// Checks the [`ClientPool`] for free peers.
@@ -516,7 +517,7 @@ where
                 .push(client);
         }

-        self.check_pending_peers(chain_tracker, pending_peers).await;
+        self.check_pending_peers(chain_tracker, pending_peers);

         Ok(())
     }
@@ -574,7 +575,7 @@ where
                 .or_default()
                 .push(client);

-            self.check_pending_peers(chain_tracker, pending_peers).await;
+            self.check_pending_peers(chain_tracker, pending_peers);

             return Ok(());
         };
@@ -611,7 +612,7 @@ where
             .or_default()
             .push(client);

-        self.check_pending_peers(chain_tracker, pending_peers).await;
+        self.check_pending_peers(chain_tracker, pending_peers);

         Ok(())
     }
@@ -679,7 +680,7 @@ where
                     .or_default()
                     .push(client);

-                self.check_pending_peers(&mut chain_tracker, &mut pending_peers).await;
+                self.check_pending_peers(&mut chain_tracker, &mut pending_peers);
             }
             Err(_) => self.amount_of_empty_chain_entries += 1
         }
@@ -698,7 +699,7 @@ struct BlockDownloadTaskResponse<N: NetworkZone> {
 }

 /// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].
-fn client_has_block_in_range(
+const fn client_has_block_in_range(
     pruning_seed: &PruningSeed,
     start_height: usize,
     length: usize,
diff --git a/p2p/p2p/src/block_downloader/block_queue.rs b/p2p/p2p/src/block_downloader/block_queue.rs
index 5a92f49..5dd1b0d 100644
--- a/p2p/p2p/src/block_downloader/block_queue.rs
+++ b/p2p/p2p/src/block_downloader/block_queue.rs
@@ -13,7 +13,7 @@ use super::{BlockBatch, BlockDownloadError};
 ///
 /// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
 #[derive(Debug, Clone)]
-pub struct ReadyQueueBatch {
+pub(crate) struct ReadyQueueBatch {
     /// The start height of the batch.
     pub start_height: usize,
     /// The batch of blocks.
@@ -43,7 +43,7 @@ impl Ord for ReadyQueueBatch {
 /// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
 /// oldest batch has been downloaded.
-pub struct BlockQueue {
+pub(crate) struct BlockQueue {
     /// A queue of ready batches.
     ready_batches: BinaryHeap<ReadyQueueBatch>,
     /// The size, in bytes, of all the batches in [`Self::ready_batches`].
@@ -55,8 +55,8 @@ impl BlockQueue {
     /// Creates a new [`BlockQueue`].
-    pub fn new(buffer_appender: BufferAppender<BlockBatch>) -> BlockQueue {
-        BlockQueue {
+    pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
+        Self {
             ready_batches: BinaryHeap::new(),
             ready_batches_size: 0,
             buffer_appender,
@@ -64,12 +64,12 @@ impl BlockQueue {
     }

     /// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
-    pub fn oldest_ready_batch(&self) -> Option<usize> {
+    pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
         self.ready_batches.peek().map(|batch| batch.start_height)
     }

     /// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
-    pub fn size(&self) -> usize {
+    pub(crate) const fn size(&self) -> usize {
         self.ready_batches_size
     }
@@ -77,7 +77,7 @@ impl BlockQueue {
     ///
     /// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight;
     /// if there are no batches inflight then this should be [`None`].
-    pub async fn add_incoming_batch(
+    pub(crate) async fn add_incoming_batch(
         &mut self,
         new_batch: ReadyQueueBatch,
         oldest_in_flight_start_height: Option<usize>,
diff --git a/p2p/p2p/src/block_downloader/chain_tracker.rs b/p2p/p2p/src/block_downloader/chain_tracker.rs
index aacb163..a2f03c5 100644
--- a/p2p/p2p/src/block_downloader/chain_tracker.rs
+++ b/p2p/p2p/src/block_downloader/chain_tracker.rs
@@ -20,7 +20,7 @@ pub(crate) struct ChainEntry<N: NetworkZone> {
 /// A batch of blocks to retrieve.
 #[derive(Clone)]
-pub struct BlocksToRetrieve<N: NetworkZone> {
+pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
     /// The block IDs to get.
     pub ids: ByteArrayVec<32>,
     /// The hash of the last block before this batch.
@@ -39,7 +39,7 @@ pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
 /// An error returned from the [`ChainTracker`].
 #[derive(Debug, Clone)]
-pub enum ChainTrackerError {
+pub(crate) enum ChainTrackerError {
     /// The new chain entry is invalid.
     NewEntryIsInvalid,
     /// The new chain entry does not follow from the top of our chain tracker.
@@ -50,7 +50,7 @@ pub(crate) enum ChainTrackerError {
 ///
 /// This struct allows following a single chain. It takes in [`ChainEntry`]s and
 /// allows getting [`BlocksToRetrieve`].
-pub struct ChainTracker<N: NetworkZone> {
+pub(crate) struct ChainTracker<N: NetworkZone> {
     /// A list of [`ChainEntry`]s, in order.
     entries: VecDeque<ChainEntry<N>>,
     /// The height of the first block, in the first entry in [`Self::entries`].
@@ -65,7 +65,7 @@ impl<N: NetworkZone> ChainTracker<N> {
     /// Creates a new chain tracker.
-    pub fn new(
+    pub(crate) fn new(
         new_entry: ChainEntry<N>,
         first_height: usize,
         our_genesis: [u8; 32],
@@ -76,9 +76,9 @@ impl<N: NetworkZone> ChainTracker<N> {
         entries.push_back(new_entry);

         Self {
-            top_seen_hash,
             entries,
             first_height,
+            top_seen_hash,
             previous_hash,
             our_genesis,
         }
@@ -86,17 +86,17 @@ impl<N: NetworkZone> ChainTracker<N> {
     /// Returns `true` if the peer is expected to have the next block after our highest seen block
     /// according to their pruning seed.
-    pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
+    pub(crate) fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
         seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT)
     }

     /// Returns the simple history, the highest seen block and the genesis block.
-    pub fn get_simple_history(&self) -> [[u8; 32]; 2] {
+    pub(crate) const fn get_simple_history(&self) -> [[u8; 32]; 2] {
         [self.top_seen_hash, self.our_genesis]
     }

     /// Returns the height of the highest block we are tracking.
-    pub fn top_height(&self) -> usize {
+    pub(crate) fn top_height(&self) -> usize {
         let top_block_idx = self
             .entries
             .iter()
@@ -110,7 +110,7 @@ impl<N: NetworkZone> ChainTracker<N> {
     ///
     /// # Panics
     /// This function panics if `batch_size` is `0`.
-    pub fn block_requests_queued(&self, batch_size: usize) -> usize {
+    pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize {
         self.entries
             .iter()
             .map(|entry| entry.ids.len().div_ceil(batch_size))
@@ -118,7 +118,10 @@ impl<N: NetworkZone> ChainTracker<N> {
     }

     /// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
-    pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> {
+    pub(crate) fn add_entry(
+        &mut self,
+        mut chain_entry: ChainEntry<N>,
+    ) -> Result<(), ChainTrackerError> {
         if chain_entry.ids.is_empty() {
             // The peer must send at least one overlapping block.
             chain_entry.handle.ban_peer(MEDIUM_BAN);
@@ -154,7 +157,7 @@ impl<N: NetworkZone> ChainTracker<N> {
     /// Returns a batch of blocks to request.
     ///
     /// The returned batch's length will be less than or equal to `max_blocks`.
-    pub fn blocks_to_get(
+    pub(crate) fn blocks_to_get(
         &mut self,
         pruning_seed: &PruningSeed,
         max_blocks: usize,
diff --git a/p2p/p2p/src/block_downloader/download_batch.rs b/p2p/p2p/src/block_downloader/download_batch.rs
index ea57ead..bbb14b3 100644
--- a/p2p/p2p/src/block_downloader/download_batch.rs
+++ b/p2p/p2p/src/block_downloader/download_batch.rs
@@ -30,6 +30,7 @@ use crate::{
         attempt = _attempt
     )
 )]
+#[expect(clippy::used_underscore_binding)]
 pub async fn download_batch_task<N: NetworkZone>(
     client: ClientPoolDropGuard<N>,
     ids: ByteArrayVec<32>,
@@ -103,6 +104,7 @@
     Ok((client, batch))
 }

+#[expect(clippy::needless_pass_by_value)]
 fn deserialize_batch(
     blocks_response: GetObjectsResponse,
     expected_start_height: usize,
diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs
index 4b0b47e..bde40ce 100644
--- a/p2p/p2p/src/block_downloader/request_chain.rs
+++ b/p2p/p2p/src/block_downloader/request_chain.rs
@@ -30,7 +30,7 @@ use crate::{
 ///
 /// Because the block downloader only follows and downloads one chain, we only have to send the block hash of
 /// the top block we have found and the genesis block; this is then called the `short_history`.
-pub async fn request_chain_entry_from_peer<N: NetworkZone>(
+pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
     mut client: ClientPoolDropGuard<N>,
     short_history: [[u8; 32]; 2],
 ) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
@@ -179,7 +179,7 @@
         Some(res) => {
             // res has already been set, replace it if this peer claims higher cumulative difficulty
             if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
-                let _ = mem::replace(res, task_res);
+                drop(mem::replace(res, task_res));
             }
         }
         None => {
diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs
index 86a9a46..a5c5e92 100644
--- a/p2p/p2p/src/block_downloader/tests.rs
+++ b/p2p/p2p/src/block_downloader/tests.rs
@@ -47,6 +47,7 @@ proptest! {
     let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();

+    #[expect(clippy::significant_drop_tightening)]
     tokio_pool.block_on(async move {
         timeout(Duration::from_secs(600), async move {
             let client_pool = ClientPool::new();
@@ -54,7 +55,7 @@
     let mut peer_ids = Vec::with_capacity(peers);

     for _ in 0..peers {
-        let client = mock_block_downloader_client(blockchain.clone());
+        let client = mock_block_downloader_client(Arc::clone(&blockchain));

         peer_ids.push(client.info.id);
@@ -156,7 +157,7 @@ prop_compose! {
     for (height, mut block) in blocks.into_iter().enumerate() {
         if let Some(last) = blockchain.last() {
             block.0.header.previous = *last.0;
-            block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)]
+            block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)];
         }

         blockchain.insert(block.0.hash(), block);
@@ -173,7 +174,7 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<ClearNet> {
diff --git a/p2p/p2p/src/broadcast.rs b/p2p/p2p/src/broadcast.rs
--- a/p2p/p2p/src/broadcast.rs
+++ b/p2p/p2p/src/broadcast.rs
-pub fn init_broadcast_channels<N: NetworkZone>(
+pub(crate) fn init_broadcast_channels<N: NetworkZone>(
     config: BroadcastConfig,
 ) -> (
     BroadcastSvc<N>,
     impl Fn(InternalPeerID<N::Addr>) -> BroadcastMessageStream<N> + Clone + Send + 'static,
     impl Fn(InternalPeerID<N::Addr>) -> BroadcastMessageStream<N> + Clone + Send + 'static,
 )
@@ -193,7 +193,7 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
         };

         // An error here means _all_ receivers were dropped which we assume will never happen.
-        let _ = match direction {
+        drop(match direction {
             Some(ConnectionDirection::Inbound) => {
                 self.tx_broadcast_channel_inbound.send(nex_tx_info)
             }
@@ -201,10 +201,10 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
                 self.tx_broadcast_channel_outbound.send(nex_tx_info)
             }
             None => {
-                let _ = self.tx_broadcast_channel_outbound.send(nex_tx_info.clone());
+                drop(self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()));
                 self.tx_broadcast_channel_inbound.send(nex_tx_info)
             }
-        };
+        });
     }
 }
@@ -246,7 +246,7 @@ struct BroadcastTxInfo<N: NetworkZone> {
 ///
 /// This is given to the connection task to await on for broadcast messages.
 #[pin_project::pin_project]
-pub struct BroadcastMessageStream<N: NetworkZone> {
+pub(crate) struct BroadcastMessageStream<N: NetworkZone> {
     /// The peer that is holding this stream.
     addr: InternalPeerID<N::Addr>,
@@ -336,8 +336,9 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
             Poll::Ready(Some(BroadcastMessage::NewTransaction(txs)))
         } else {
             tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
-            // poll next_flush now to register the waker with it
+            // poll next_flush now to register the waker with it.
             // the waker will already be registered with the block broadcast channel.
+            #[expect(clippy::let_underscore_must_use)]
             let _ = this.next_flush.poll(cx);
             Poll::Pending
         }
@@ -458,7 +459,7 @@ mod tests {
         let match_tx = |mes, txs| match mes {
             BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
-            _ => panic!("Block broadcast?"),
+            BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
         };

         let next = outbound_stream.next().await.unwrap();
@@ -520,7 +521,7 @@ mod tests {
         let match_tx = |mes, txs| match mes {
             BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
-            _ => panic!("Block broadcast?"),
+            BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
         };

         let next = outbound_stream.next().await.unwrap();
@@ -536,6 +537,6 @@ mod tests {
             futures::future::select(inbound_stream_from.next(), outbound_stream_from.next())
         )
         .await
-        .is_err())
+        .is_err());
     }
 }
diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs
index 51f57e9..3405224 100644
--- a/p2p/p2p/src/client_pool.rs
+++ b/p2p/p2p/src/client_pool.rs
@@ -8,7 +8,7 @@
 //! returns the peer to the pool when it is dropped.
 //!
 //! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
-//! as internally this uses blocking RwLocks.
+//! as internally this uses blocking `RwLock`s.
 use std::sync::Arc;

 use dashmap::DashMap;
@@ -24,7 +24,7 @@
 pub(crate) mod disconnect_monitor;
 mod drop_guard_client;

-pub use drop_guard_client::ClientPoolDropGuard;
+pub(crate) use drop_guard_client::ClientPoolDropGuard;

 /// The client pool, which holds currently connected free peers.
 ///
@@ -38,16 +38,17 @@ pub struct ClientPool<N: NetworkZone> {
 impl<N: NetworkZone> ClientPool<N> {
     /// Returns a new [`ClientPool`] wrapped in an [`Arc`].
-    pub fn new() -> Arc<ClientPool<N>> {
+    pub fn new() -> Arc<Self> {
         let (tx, rx) = mpsc::unbounded_channel();

-        let pool = Arc::new(ClientPool {
+        let pool = Arc::new(Self {
             clients: DashMap::new(),
             new_connection_tx: tx,
         });

         tokio::spawn(
-            disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()),
+            disconnect_monitor::disconnect_monitor(rx, Arc::clone(&pool))
+                .instrument(Span::current()),
         );

         pool
@@ -69,8 +70,7 @@ impl<N: NetworkZone> ClientPool<N> {
             return;
         }

-        let res = self.clients.insert(id, client);
-        assert!(res.is_none());
+        assert!(self.clients.insert(id, client).is_none());

         // We have to check this again otherwise we could have a race condition where a
         // peer is disconnected after the first check, the disconnect monitor tries to remove it,
@@ -121,7 +121,6 @@ impl<N: NetworkZone> ClientPool<N> {
     /// Note that the returned iterator is not guaranteed to contain every peer asked for.
     ///
     /// See [`Self::borrow_client`] for borrowing a single client.
-    #[allow(private_interfaces)] // TODO: Remove me when 2024 Rust
     pub fn borrow_clients<'a, 'b>(
         self: &'a Arc<Self>,
         peers: &'b [InternalPeerID<N::Addr>],
     )
@@ -133,7 +132,7 @@
 mod sealed {
     /// TODO: Remove me when 2024 Rust
     ///
-    /// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick
+    /// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick>
     pub trait Captures<U> {}

     impl<T: ?Sized, U> Captures<U> for T {}
diff --git a/p2p/p2p/src/client_pool/disconnect_monitor.rs b/p2p/p2p/src/client_pool/disconnect_monitor.rs
index f45d5e3..f54b560 100644
--- a/p2p/p2p/src/client_pool/disconnect_monitor.rs
+++ b/p2p/p2p/src/client_pool/disconnect_monitor.rs
@@ -78,6 +78,6 @@ impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
         this.closed_fut
             .poll(cx)
-            .map(|_| this.peer_id.take().unwrap())
+            .map(|()| this.peer_id.take().unwrap())
     }
 }
diff --git a/p2p/p2p/src/connection_maintainer.rs b/p2p/p2p/src/connection_maintainer.rs
index 3dfd5e8..be89973 100644
--- a/p2p/p2p/src/connection_maintainer.rs
+++ b/p2p/p2p/src/connection_maintainer.rs
@@ -99,12 +99,17 @@ where
     /// Connects to random seeds to get peers and immediately disconnects
     #[instrument(level = "info", skip(self))]
+    #[expect(
+        clippy::significant_drop_in_scrutinee,
+        clippy::significant_drop_tightening
+    )]
     async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
         let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);

-        if seeds.len() == 0 {
-            panic!("No seed nodes available to get peers from");
-        }
+        assert!(
+            seeds.len() != 0,
+            "No seed nodes available to get peers from"
+        );

         let mut allowed_errors = seeds.len();
@@ -129,7 +134,7 @@ where
         }

         while let Some(res) = handshake_futs.join_next().await {
-            if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
+            if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
                 allowed_errors -= 1;
             }
         }
@@ -144,7 +149,7 @@ where
     /// Connects to a given outbound peer.
     #[instrument(level = "info", skip_all)]
     async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
-        let client_pool = self.client_pool.clone();
+        let client_pool = Arc::clone(&self.client_pool);
         let connection_fut = self
             .connector_svc
             .ready()
@@ -157,6 +162,7 @@ where
         tokio::spawn(
             async move {
+                #[expect(clippy::significant_drop_in_scrutinee)]
                 if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
                     client_pool.add_new_client(peer);
                 }
@@ -166,14 +172,16 @@ where
     }

     /// Handles a request from the peer set for more peers.
+    #[expect(
+        clippy::significant_drop_tightening,
+        reason = "we need to hold onto a permit"
+    )]
     async fn handle_peer_request(
         &mut self,
         req: &MakeConnectionRequest,
     ) -> Result<(), OutboundConnectorError> {
         // try to get a permit.
-        let permit = self
-            .outbound_semaphore
-            .clone()
+        let permit = Arc::clone(&self.outbound_semaphore)
             .try_acquire_owned()
             .or_else(|_| {
                 // if we can't get a permit, add one if we are below the max number of connections.
@@ -183,7 +191,9 @@ where
                 } else {
                     self.outbound_semaphore.add_permits(1);
                     self.extra_peers += 1;
-                    Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap())
+                    Ok(Arc::clone(&self.outbound_semaphore)
+                        .try_acquire_owned()
+                        .unwrap())
                 }
             })?;
@@ -272,12 +282,12 @@ where
                     tracing::info!("Shutting down outbound connector, make connection channel closed.");
                     return;
                 };
-                // We can't really do much about errors in this function.
+                #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
                 let _ = self.handle_peer_request(&peer_req).await;
             },
             // This future is not cancellation safe as you will lose your space in the queue, but as we are the only place
             // that actually requires permits that should be ok.
-            Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => {
+            Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
                 if self.handle_free_permit(permit).await.is_err() {
                     // if we got an error then we still have a permit free, so to prevent this from just looping
                     // uncontrollably add a timeout.
diff --git a/p2p/p2p/src/inbound_server.rs b/p2p/p2p/src/inbound_server.rs
index 80ff38e..0d50d54 100644
--- a/p2p/p2p/src/inbound_server.rs
+++ b/p2p/p2p/src/inbound_server.rs
@@ -100,7 +100,7 @@ where
         };

         // If we're still behind our maximum limit, initiate handshake.
-        if let Ok(permit) = semaphore.clone().try_acquire_owned() {
+        if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
             tracing::debug!("Permit free for incoming connection, attempting handshake.");

             let fut = handshaker.ready().await?.call(DoHandshakeRequest {
@@ -111,11 +112,12 @@ where
                 permit: Some(permit),
             });

-            let cloned_pool = client_pool.clone();
+            let cloned_pool = Arc::clone(&client_pool);

             tokio::spawn(
                 async move {
-                    if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await {
+                    let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
+                    if let Ok(Ok(peer)) = client {
                         cloned_pool.add_new_client(peer);
                     }
                 }
@@ -133,8 +134,10 @@ where
             let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());

             // Ok if the timeout has not elapsed -> Some if there is a message -> Ok if it has been decoded
-            if let Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) = fut.await
-            {
+            if matches!(
+                fut.await,
+                Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
+            ) {
                 let response = peer_sink
                     .send(
                         Message::Response(AdminResponseMessage::Ping(PingResponse {
@@ -148,7 +151,7 @@ where
                 if let Err(err) = response {
                     tracing::debug!(
                         "Unable to respond to ping request from peer ({addr}): {err}"
-                    )
+                    );
                 }
             }
         }
diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs
index be18c2a..2f51c6c 100644
--- a/p2p/p2p/src/lib.rs
+++ b/p2p/p2p/src/lib.rs
@@ -103,7 +103,7 @@ where
     let outbound_connector = Connector::new(outbound_handshaker);
     let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
         config.clone(),
-        client_pool.clone(),
+        Arc::clone(&client_pool),
         make_connection_rx,
         address_book.clone(),
         outbound_connector,
@@ -118,17 +118,17 @@ where
     );
     background_tasks.spawn(
         inbound_server::inbound_server(
-            client_pool.clone(),
+            Arc::clone(&client_pool),
             inbound_handshaker,
             address_book.clone(),
             config,
         )
         .map(|res| {
             if let Err(e) = res {
-                tracing::error!("Error in inbound connection listener: {e}")
+                tracing::error!("Error in inbound connection listener: {e}");
             }

-            tracing::info!("Inbound connection listener shutdown")
+            tracing::info!("Inbound connection listener shutdown");
         })
         .instrument(Span::current()),
     );
@@ -155,7 +155,7 @@ pub struct NetworkInterface<N: NetworkZone> {
     /// on that claimed chain.
     top_block_watch: watch::Receiver<NewSyncInfo>,
     /// A channel to request extra connections.
-    #[allow(dead_code)] // will be used eventually
+    #[expect(dead_code, reason = "will be used eventually")]
     make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
     /// The address book service.
     address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
@@ -184,7 +184,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
         C::Future: Send + 'static,
     {
         block_downloader::download_blocks(
-            self.pool.clone(),
+            Arc::clone(&self.pool),
             self.sync_states_svc.clone(),
             our_chain_service,
             config,
diff --git a/p2p/p2p/src/sync_states.rs b/p2p/p2p/src/sync_states.rs
index 70ef6ca..0c03795 100644
--- a/p2p/p2p/src/sync_states.rs
+++ b/p2p/p2p/src/sync_states.rs
@@ -40,7 +40,7 @@ pub struct NewSyncInfo {
 /// This is the service that handles:
 /// 1. Finding out if we need to sync
 /// 1. Giving the peers that should be synced _from_, to the requester
-pub struct PeerSyncSvc<N: NetworkZone> {
+pub(crate) struct PeerSyncSvc<N: NetworkZone> {
     /// A map of cumulative difficulties to peers.
     cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
     /// A map of peers to cumulative difficulties.
@@ -56,7 +56,7 @@ pub struct PeerSyncSvc { impl PeerSyncSvc { /// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with /// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie. - pub fn new() -> (Self, watch::Receiver) { + pub(crate) fn new() -> (Self, watch::Receiver) { let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo { chain_height: 0, top_hash: [0; 32], @@ -108,9 +108,7 @@ impl PeerSyncSvc { if let Some(block_needed) = block_needed { // we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means // we don't take into account the tip blocks which are not pruned. - self.peers - .get(peer) - .unwrap() + self.peers[peer] .1 .has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT) } else { @@ -126,7 +124,7 @@ impl PeerSyncSvc { &mut self, peer_id: InternalPeerID, handle: ConnectionHandle, - core_sync_data: CoreSyncData, + core_sync_data: &CoreSyncData, ) -> Result<(), tower::BoxError> { tracing::trace!( "Received new core sync data from peer, top hash: {}", @@ -176,7 +174,7 @@ impl PeerSyncSvc { self.closed_connections.push(PeerDisconnectFut { closed_fut: handle.closed(), peer_id: Some(peer_id), - }) + }); } self.cumulative_difficulties @@ -190,11 +188,15 @@ impl PeerSyncSvc { || self .last_peer_in_watcher_handle .as_ref() - .is_some_and(|handle| handle.is_closed()) + .is_some_and(ConnectionHandle::is_closed) { tracing::debug!( "Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}" ); + #[expect( + clippy::let_underscore_must_use, + reason = "dropped receivers can be ignored" + )] let _ = self.new_height_watcher.send(NewSyncInfo { top_hash: core_sync_data.top_id, chain_height: core_sync_data.current_height, @@ -228,8 +230,8 @@ impl Service> for PeerSyncSvc { block_needed, ))), PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self - .update_peer_sync_info(peer_id, handle, sync_data) - .map(|_| PeerSyncResponse::Ok), + .update_peer_sync_info(peer_id, handle, &sync_data) + .map(|()| PeerSyncResponse::Ok), }; ready(res) @@ -413,6 +415,6 @@ mod tests { assert!( peers.contains(&InternalPeerID::Unknown(0)) && peers.contains(&InternalPeerID::Unknown(1)) - ) + ); } }
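Some context on the patterns that recur throughout this diff follows; the sketches below are standalone illustrations and assume nothing from this repository beyond what the hunks themselves show. First, the `#[allow(...)]` to `#[expect(...)]` conversions: `#[expect]` (stabilized in Rust 1.81, together with the `reason = "..."` field used in several hunks) suppresses a lint exactly like `#[allow]`, but also raises an `unfulfilled_lint_expectations` warning once the suppressed lint stops firing, so stale suppressions get flagged automatically. A minimal sketch, with illustrative function names not taken from this codebase:

    // With `allow`, the attribute lingers silently if the lint stops firing.
    #[allow(clippy::needless_pass_by_value)]
    fn with_allow(v: Vec<u8>) -> usize {
        v.len()
    }

    // With `expect`, clippy warns (`unfulfilled_lint_expectations`) as soon
    // as the expected lint no longer triggers here, prompting cleanup.
    #[expect(clippy::needless_pass_by_value)]
    fn with_expect(v: Vec<u8>) -> usize {
        v.len()
    }

    fn main() {
        println!("{} {}", with_allow(vec![1]), with_expect(vec![2]));
    }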
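Second, the `x.clone()` to `Arc::clone(&x)` rewrites: the two forms do the same thing for an `Arc`, but the explicit form is what `clippy::clone_on_ref_ptr` asks for, because it makes obvious at the call site that only a reference count is incremented and no underlying data is copied. A small sketch:

    use std::sync::Arc;

    fn main() {
        let pool = Arc::new(vec![1u8, 2, 3]);

        // Method syntax works, but reads like it could be a deep clone.
        let a = pool.clone();

        // The form preferred by `clippy::clone_on_ref_ptr`: unambiguously a
        // cheap reference-count increment.
        let b = Arc::clone(&pool);

        // Three handles now point at the same allocation.
        assert_eq!(Arc::strong_count(&pool), 3);
        drop((a, b));
    }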
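Third, the handling of ignored `#[must_use]` results: the broadcast and sync-state hunks either wrap the discarded value in `drop(...)` or keep `let _ =` behind `#[expect(clippy::let_underscore_must_use, reason = ...)]`. Both acknowledge that a `Result` is being thrown away on purpose. A sketch of the pattern using a `tokio::sync::watch` channel, mirroring (but not copied from) the `new_height_watcher.send(..)` case:

    use tokio::sync::watch;

    fn main() {
        let (tx, rx) = watch::channel(0u64);
        drop(rx); // no receivers remain, so `send` will return `Err`

        // `send` returns a #[must_use] `Result`; wrapping the call in
        // `drop(..)` records that discarding the error is deliberate,
        // where a bare `let _ =` would trip `clippy::let_underscore_must_use`.
        drop(tx.send(42));
    }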
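Finally, the `sealed::Captures` trait touched in `client_pool.rs` is the standard pre-2024-edition workaround (the "captures trick" from the RFC linked in its doc comment) for `impl Trait` return types that must capture lifetimes they do not name. A condensed, self-contained sketch of the trick:

    // A dummy trait smuggles lifetimes into an `impl Trait` return type
    // without constraining the hidden type in any other way.
    pub trait Captures<U> {}
    impl<T: ?Sized, U> Captures<U> for T {}

    // The iterator borrows from both `a` and `b`, yet neither lifetime
    // appears in `impl Iterator<...>` itself; before the 2024 edition's
    // capture rules this only compiles because of the `Captures` bound.
    fn pairs<'a, 'b>(
        a: &'a [u32],
        b: &'b [u32],
    ) -> impl Iterator<Item = (u32, u32)> + Captures<(&'a (), &'b ())> {
        a.iter().zip(b.iter()).map(|(x, y)| (*x, *y))
    }

    fn main() {
        let (a, b) = (vec![1, 2], vec![3, 4]);
        for (x, y) in pairs(&a, &b) {
            println!("{x} {y}");
        }
    }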