p2p: enable workspace lints (#289)

* p2p: enable workspace lints

* fmt

* fixes

* fixes

* fixes

* review fixes
This commit is contained in:
hinto-janai 2024-09-20 20:36:39 -04:00 committed by GitHub
parent c840053854
commit f4c88b6f05
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 116 additions and 91 deletions

View file

@ -39,3 +39,6 @@ cuprate-test-utils = { path = "../../test-utils" }
indexmap = { workspace = true } indexmap = { workspace = true }
proptest = { workspace = true } proptest = { workspace = true }
tokio-test = { workspace = true } tokio-test = { workspace = true }
[lints]
workspace = true

View file

@ -78,7 +78,7 @@ pub struct BlockDownloaderConfig {
/// An error that occurred in the [`BlockDownloader`]. /// An error that occurred in the [`BlockDownloader`].
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum BlockDownloadError { pub(crate) enum BlockDownloadError {
#[error("A request to a peer timed out.")] #[error("A request to a peer timed out.")]
TimedOut, TimedOut,
#[error("The block buffer was closed.")] #[error("The block buffer was closed.")]
@ -219,7 +219,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
/// The running chain entry tasks. /// The running chain entry tasks.
/// ///
/// Returns a result of the chain entry or an error. /// Returns a result of the chain entry or an error.
#[allow(clippy::type_complexity)] #[expect(clippy::type_complexity)]
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>, chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
/// The current inflight requests. /// The current inflight requests.
@ -273,7 +273,7 @@ where
} }
/// Checks if we can make use of any peers that are currently pending requests. /// Checks if we can make use of any peers that are currently pending requests.
async fn check_pending_peers( fn check_pending_peers(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>, pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
@ -287,7 +287,8 @@ where
continue; continue;
} }
if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await { let client = self.try_handle_free_client(chain_tracker, peer);
if let Some(peer) = client {
// This peer is ok however it does not have the data we currently need, this will only happen // This peer is ok however it does not have the data we currently need, this will only happen
// because of its pruning seed so just skip over all peers with this pruning seed. // because of its pruning seed so just skip over all peers with this pruning seed.
peers.push(peer); peers.push(peer);
@ -303,7 +304,7 @@ where
/// for them. /// for them.
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed. /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed.
async fn request_inflight_batch_again( fn request_inflight_batch_again(
&mut self, &mut self,
client: ClientPoolDropGuard<N>, client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> { ) -> Option<ClientPoolDropGuard<N>> {
@ -354,7 +355,7 @@ where
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed. /// to its pruning seed.
async fn request_block_batch( fn request_block_batch(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>, client: ClientPoolDropGuard<N>,
@ -399,7 +400,7 @@ where
// If our ready queue is too large send duplicate requests for the blocks we are waiting on. // If our ready queue is too large send duplicate requests for the blocks we are waiting on.
if self.block_queue.size() >= self.config.in_progress_queue_size { if self.block_queue.size() >= self.config.in_progress_queue_size {
return self.request_inflight_batch_again(client).await; return self.request_inflight_batch_again(client);
} }
// No failed requests that we can handle, request some new blocks. // No failed requests that we can handle, request some new blocks.
@ -434,7 +435,7 @@ where
/// ///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according /// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed. /// to its pruning seed.
async fn try_handle_free_client( fn try_handle_free_client(
&mut self, &mut self,
chain_tracker: &mut ChainTracker<N>, chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>, client: ClientPoolDropGuard<N>,
@ -472,7 +473,7 @@ where
} }
// Request a batch of blocks instead. // Request a batch of blocks instead.
self.request_block_batch(chain_tracker, client).await self.request_block_batch(chain_tracker, client)
} }
/// Checks the [`ClientPool`] for free peers. /// Checks the [`ClientPool`] for free peers.
@ -516,7 +517,7 @@ where
.push(client); .push(client);
} }
self.check_pending_peers(chain_tracker, pending_peers).await; self.check_pending_peers(chain_tracker, pending_peers);
Ok(()) Ok(())
} }
@ -574,7 +575,7 @@ where
.or_default() .or_default()
.push(client); .push(client);
self.check_pending_peers(chain_tracker, pending_peers).await; self.check_pending_peers(chain_tracker, pending_peers);
return Ok(()); return Ok(());
}; };
@ -611,7 +612,7 @@ where
.or_default() .or_default()
.push(client); .push(client);
self.check_pending_peers(chain_tracker, pending_peers).await; self.check_pending_peers(chain_tracker, pending_peers);
Ok(()) Ok(())
} }
@ -679,7 +680,7 @@ where
.or_default() .or_default()
.push(client); .push(client);
self.check_pending_peers(&mut chain_tracker, &mut pending_peers).await; self.check_pending_peers(&mut chain_tracker, &mut pending_peers);
} }
Err(_) => self.amount_of_empty_chain_entries += 1 Err(_) => self.amount_of_empty_chain_entries += 1
} }
@ -698,7 +699,7 @@ struct BlockDownloadTaskResponse<N: NetworkZone> {
} }
/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`]. /// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].
fn client_has_block_in_range( const fn client_has_block_in_range(
pruning_seed: &PruningSeed, pruning_seed: &PruningSeed,
start_height: usize, start_height: usize,
length: usize, length: usize,

View file

@ -13,7 +13,7 @@ use super::{BlockBatch, BlockDownloadError};
/// ///
/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`]. /// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ReadyQueueBatch { pub(crate) struct ReadyQueueBatch {
/// The start height of the batch. /// The start height of the batch.
pub start_height: usize, pub start_height: usize,
/// The batch of blocks. /// The batch of blocks.
@ -43,7 +43,7 @@ impl Ord for ReadyQueueBatch {
/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the /// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
/// oldest batch has been downloaded. /// oldest batch has been downloaded.
pub struct BlockQueue { pub(crate) struct BlockQueue {
/// A queue of ready batches. /// A queue of ready batches.
ready_batches: BinaryHeap<ReadyQueueBatch>, ready_batches: BinaryHeap<ReadyQueueBatch>,
/// The size, in bytes, of all the batches in [`Self::ready_batches`]. /// The size, in bytes, of all the batches in [`Self::ready_batches`].
@ -55,8 +55,8 @@ pub struct BlockQueue {
impl BlockQueue { impl BlockQueue {
/// Creates a new [`BlockQueue`]. /// Creates a new [`BlockQueue`].
pub fn new(buffer_appender: BufferAppender<BlockBatch>) -> BlockQueue { pub(crate) const fn new(buffer_appender: BufferAppender<BlockBatch>) -> Self {
BlockQueue { Self {
ready_batches: BinaryHeap::new(), ready_batches: BinaryHeap::new(),
ready_batches_size: 0, ready_batches_size: 0,
buffer_appender, buffer_appender,
@ -64,12 +64,12 @@ impl BlockQueue {
} }
/// Returns the oldest batch that has not been put in the [`async_buffer`] yet. /// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
pub fn oldest_ready_batch(&self) -> Option<usize> { pub(crate) fn oldest_ready_batch(&self) -> Option<usize> {
self.ready_batches.peek().map(|batch| batch.start_height) self.ready_batches.peek().map(|batch| batch.start_height)
} }
/// Returns the size of all the batches that have not been put into the [`async_buffer`] yet. /// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
pub fn size(&self) -> usize { pub(crate) const fn size(&self) -> usize {
self.ready_batches_size self.ready_batches_size
} }
@ -77,7 +77,7 @@ impl BlockQueue {
/// ///
/// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if /// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
/// there are no batches inflight then this should be [`None`]. /// there are no batches inflight then this should be [`None`].
pub async fn add_incoming_batch( pub(crate) async fn add_incoming_batch(
&mut self, &mut self,
new_batch: ReadyQueueBatch, new_batch: ReadyQueueBatch,
oldest_in_flight_start_height: Option<usize>, oldest_in_flight_start_height: Option<usize>,

View file

@ -20,7 +20,7 @@ pub(crate) struct ChainEntry<N: NetworkZone> {
/// A batch of blocks to retrieve. /// A batch of blocks to retrieve.
#[derive(Clone)] #[derive(Clone)]
pub struct BlocksToRetrieve<N: NetworkZone> { pub(crate) struct BlocksToRetrieve<N: NetworkZone> {
/// The block IDs to get. /// The block IDs to get.
pub ids: ByteArrayVec<32>, pub ids: ByteArrayVec<32>,
/// The hash of the last block before this batch. /// The hash of the last block before this batch.
@ -39,7 +39,7 @@ pub struct BlocksToRetrieve<N: NetworkZone> {
/// An error returned from the [`ChainTracker`]. /// An error returned from the [`ChainTracker`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum ChainTrackerError { pub(crate) enum ChainTrackerError {
/// The new chain entry is invalid. /// The new chain entry is invalid.
NewEntryIsInvalid, NewEntryIsInvalid,
/// The new chain entry does not follow from the top of our chain tracker. /// The new chain entry does not follow from the top of our chain tracker.
@ -50,7 +50,7 @@ pub enum ChainTrackerError {
/// ///
/// This struct allows following a single chain. It takes in [`ChainEntry`]s and /// This struct allows following a single chain. It takes in [`ChainEntry`]s and
/// allows getting [`BlocksToRetrieve`]. /// allows getting [`BlocksToRetrieve`].
pub struct ChainTracker<N: NetworkZone> { pub(crate) struct ChainTracker<N: NetworkZone> {
/// A list of [`ChainEntry`]s, in order. /// A list of [`ChainEntry`]s, in order.
entries: VecDeque<ChainEntry<N>>, entries: VecDeque<ChainEntry<N>>,
/// The height of the first block, in the first entry in [`Self::entries`]. /// The height of the first block, in the first entry in [`Self::entries`].
@ -65,7 +65,7 @@ pub struct ChainTracker<N: NetworkZone> {
impl<N: NetworkZone> ChainTracker<N> { impl<N: NetworkZone> ChainTracker<N> {
/// Creates a new chain tracker. /// Creates a new chain tracker.
pub fn new( pub(crate) fn new(
new_entry: ChainEntry<N>, new_entry: ChainEntry<N>,
first_height: usize, first_height: usize,
our_genesis: [u8; 32], our_genesis: [u8; 32],
@ -76,9 +76,9 @@ impl<N: NetworkZone> ChainTracker<N> {
entries.push_back(new_entry); entries.push_back(new_entry);
Self { Self {
top_seen_hash,
entries, entries,
first_height, first_height,
top_seen_hash,
previous_hash, previous_hash,
our_genesis, our_genesis,
} }
@ -86,17 +86,17 @@ impl<N: NetworkZone> ChainTracker<N> {
/// Returns `true` if the peer is expected to have the next block after our highest seen block /// Returns `true` if the peer is expected to have the next block after our highest seen block
/// according to their pruning seed. /// according to their pruning seed.
pub fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool { pub(crate) fn should_ask_for_next_chain_entry(&self, seed: &PruningSeed) -> bool {
seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT) seed.has_full_block(self.top_height(), CRYPTONOTE_MAX_BLOCK_HEIGHT)
} }
/// Returns the simple history, the highest seen block and the genesis block. /// Returns the simple history, the highest seen block and the genesis block.
pub fn get_simple_history(&self) -> [[u8; 32]; 2] { pub(crate) const fn get_simple_history(&self) -> [[u8; 32]; 2] {
[self.top_seen_hash, self.our_genesis] [self.top_seen_hash, self.our_genesis]
} }
/// Returns the height of the highest block we are tracking. /// Returns the height of the highest block we are tracking.
pub fn top_height(&self) -> usize { pub(crate) fn top_height(&self) -> usize {
let top_block_idx = self let top_block_idx = self
.entries .entries
.iter() .iter()
@ -110,7 +110,7 @@ impl<N: NetworkZone> ChainTracker<N> {
/// ///
/// # Panics /// # Panics
/// This function panics if `batch_size` is `0`. /// This function panics if `batch_size` is `0`.
pub fn block_requests_queued(&self, batch_size: usize) -> usize { pub(crate) fn block_requests_queued(&self, batch_size: usize) -> usize {
self.entries self.entries
.iter() .iter()
.map(|entry| entry.ids.len().div_ceil(batch_size)) .map(|entry| entry.ids.len().div_ceil(batch_size))
@ -118,7 +118,10 @@ impl<N: NetworkZone> ChainTracker<N> {
} }
/// Attempts to add an incoming [`ChainEntry`] to the chain tracker. /// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> { pub(crate) fn add_entry(
&mut self,
mut chain_entry: ChainEntry<N>,
) -> Result<(), ChainTrackerError> {
if chain_entry.ids.is_empty() { if chain_entry.ids.is_empty() {
// The peer must send at lest one overlapping block. // The peer must send at lest one overlapping block.
chain_entry.handle.ban_peer(MEDIUM_BAN); chain_entry.handle.ban_peer(MEDIUM_BAN);
@ -154,7 +157,7 @@ impl<N: NetworkZone> ChainTracker<N> {
/// Returns a batch of blocks to request. /// Returns a batch of blocks to request.
/// ///
/// The returned batches length will be less than or equal to `max_blocks` /// The returned batches length will be less than or equal to `max_blocks`
pub fn blocks_to_get( pub(crate) fn blocks_to_get(
&mut self, &mut self,
pruning_seed: &PruningSeed, pruning_seed: &PruningSeed,
max_blocks: usize, max_blocks: usize,

View file

@ -30,6 +30,7 @@ use crate::{
attempt = _attempt attempt = _attempt
) )
)] )]
#[expect(clippy::used_underscore_binding)]
pub async fn download_batch_task<N: NetworkZone>( pub async fn download_batch_task<N: NetworkZone>(
client: ClientPoolDropGuard<N>, client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>, ids: ByteArrayVec<32>,
@ -103,6 +104,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
Ok((client, batch)) Ok((client, batch))
} }
#[expect(clippy::needless_pass_by_value)]
fn deserialize_batch( fn deserialize_batch(
blocks_response: GetObjectsResponse, blocks_response: GetObjectsResponse,
expected_start_height: usize, expected_start_height: usize,

View file

@ -30,7 +30,7 @@ use crate::{
/// ///
/// Because the block downloader only follows and downloads one chain we only have to send the block hash of /// Because the block downloader only follows and downloads one chain we only have to send the block hash of
/// top block we have found and the genesis block, this is then called `short_history`. /// top block we have found and the genesis block, this is then called `short_history`.
pub async fn request_chain_entry_from_peer<N: NetworkZone>( pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientPoolDropGuard<N>,
short_history: [[u8; 32]; 2], short_history: [[u8; 32]; 2],
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> { ) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
@ -179,7 +179,7 @@ where
Some(res) => { Some(res) => {
// res has already been set, replace it if this peer claims higher cumulative difficulty // res has already been set, replace it if this peer claims higher cumulative difficulty
if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() { if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
let _ = mem::replace(res, task_res); drop(mem::replace(res, task_res));
} }
} }
None => { None => {

View file

@ -47,6 +47,7 @@ proptest! {
let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); let tokio_pool = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
#[expect(clippy::significant_drop_tightening)]
tokio_pool.block_on(async move { tokio_pool.block_on(async move {
timeout(Duration::from_secs(600), async move { timeout(Duration::from_secs(600), async move {
let client_pool = ClientPool::new(); let client_pool = ClientPool::new();
@ -54,7 +55,7 @@ proptest! {
let mut peer_ids = Vec::with_capacity(peers); let mut peer_ids = Vec::with_capacity(peers);
for _ in 0..peers { for _ in 0..peers {
let client = mock_block_downloader_client(blockchain.clone()); let client = mock_block_downloader_client(Arc::clone(&blockchain));
peer_ids.push(client.info.id); peer_ids.push(client.info.id);
@ -156,7 +157,7 @@ prop_compose! {
for (height, mut block) in blocks.into_iter().enumerate() { for (height, mut block) in blocks.into_iter().enumerate() {
if let Some(last) = blockchain.last() { if let Some(last) = blockchain.last() {
block.0.header.previous = *last.0; block.0.header.previous = *last.0;
block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)] block.0.miner_transaction.prefix_mut().inputs = vec![Input::Gen(height)];
} }
blockchain.insert(block.0.hash(), block); blockchain.insert(block.0.hash(), block);
@ -173,7 +174,7 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
cuprate_p2p_core::handles::HandleBuilder::new().build(); cuprate_p2p_core::handles::HandleBuilder::new().build();
let request_handler = service_fn(move |req: PeerRequest| { let request_handler = service_fn(move |req: PeerRequest| {
let bc = blockchain.clone(); let bc = Arc::clone(&blockchain);
async move { async move {
match req { match req {

View file

@ -35,7 +35,7 @@ use crate::constants::{
/// The configuration for the [`BroadcastSvc`]. /// The configuration for the [`BroadcastSvc`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct BroadcastConfig { pub(crate) struct BroadcastConfig {
/// The average number of seconds between diffusion flushes for outbound connections. /// The average number of seconds between diffusion flushes for outbound connections.
pub diffusion_flush_average_seconds_outbound: Duration, pub diffusion_flush_average_seconds_outbound: Duration,
/// The average number of seconds between diffusion flushes for inbound connections. /// The average number of seconds between diffusion flushes for inbound connections.
@ -57,7 +57,7 @@ impl Default for BroadcastConfig {
/// - The [`BroadcastSvc`] /// - The [`BroadcastSvc`]
/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **outbound** peers. /// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **outbound** peers.
/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **inbound** peers. /// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **inbound** peers.
pub fn init_broadcast_channels<N: NetworkZone>( pub(crate) fn init_broadcast_channels<N: NetworkZone>(
config: BroadcastConfig, config: BroadcastConfig,
) -> ( ) -> (
BroadcastSvc<N>, BroadcastSvc<N>,
@ -193,7 +193,7 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
}; };
// An error here means _all_ receivers were dropped which we assume will never happen. // An error here means _all_ receivers were dropped which we assume will never happen.
let _ = match direction { drop(match direction {
Some(ConnectionDirection::Inbound) => { Some(ConnectionDirection::Inbound) => {
self.tx_broadcast_channel_inbound.send(nex_tx_info) self.tx_broadcast_channel_inbound.send(nex_tx_info)
} }
@ -201,10 +201,10 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
self.tx_broadcast_channel_outbound.send(nex_tx_info) self.tx_broadcast_channel_outbound.send(nex_tx_info)
} }
None => { None => {
let _ = self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()); drop(self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()));
self.tx_broadcast_channel_inbound.send(nex_tx_info) self.tx_broadcast_channel_inbound.send(nex_tx_info)
} }
}; });
} }
} }
@ -246,7 +246,7 @@ struct BroadcastTxInfo<N: NetworkZone> {
/// ///
/// This is given to the connection task to await on for broadcast messages. /// This is given to the connection task to await on for broadcast messages.
#[pin_project::pin_project] #[pin_project::pin_project]
pub struct BroadcastMessageStream<N: NetworkZone> { pub(crate) struct BroadcastMessageStream<N: NetworkZone> {
/// The peer that is holding this stream. /// The peer that is holding this stream.
addr: InternalPeerID<N::Addr>, addr: InternalPeerID<N::Addr>,
@ -336,8 +336,9 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
Poll::Ready(Some(BroadcastMessage::NewTransaction(txs))) Poll::Ready(Some(BroadcastMessage::NewTransaction(txs)))
} else { } else {
tracing::trace!("Diffusion flush timer expired but no txs to diffuse"); tracing::trace!("Diffusion flush timer expired but no txs to diffuse");
// poll next_flush now to register the waker with it // poll next_flush now to register the waker with it.
// the waker will already be registered with the block broadcast channel. // the waker will already be registered with the block broadcast channel.
#[expect(clippy::let_underscore_must_use)]
let _ = this.next_flush.poll(cx); let _ = this.next_flush.poll(cx);
Poll::Pending Poll::Pending
} }
@ -458,7 +459,7 @@ mod tests {
let match_tx = |mes, txs| match mes { let match_tx = |mes, txs| match mes {
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs), BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
_ => panic!("Block broadcast?"), BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
}; };
let next = outbound_stream.next().await.unwrap(); let next = outbound_stream.next().await.unwrap();
@ -520,7 +521,7 @@ mod tests {
let match_tx = |mes, txs| match mes { let match_tx = |mes, txs| match mes {
BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs), BroadcastMessage::NewTransaction(tx) => assert_eq!(tx.txs.as_slice(), txs),
_ => panic!("Block broadcast?"), BroadcastMessage::NewFluffyBlock(_) => panic!("Block broadcast?"),
}; };
let next = outbound_stream.next().await.unwrap(); let next = outbound_stream.next().await.unwrap();
@ -536,6 +537,6 @@ mod tests {
futures::future::select(inbound_stream_from.next(), outbound_stream_from.next()) futures::future::select(inbound_stream_from.next(), outbound_stream_from.next())
) )
.await .await
.is_err()) .is_err());
} }
} }

View file

@ -8,7 +8,7 @@
//! returns the peer to the pool when it is dropped. //! returns the peer to the pool when it is dropped.
//! //!
//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code //! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
//! as internally this uses blocking RwLocks. //! as internally this uses blocking `RwLock`s.
use std::sync::Arc; use std::sync::Arc;
use dashmap::DashMap; use dashmap::DashMap;
@ -24,7 +24,7 @@ use cuprate_p2p_core::{
pub(crate) mod disconnect_monitor; pub(crate) mod disconnect_monitor;
mod drop_guard_client; mod drop_guard_client;
pub use drop_guard_client::ClientPoolDropGuard; pub(crate) use drop_guard_client::ClientPoolDropGuard;
/// The client pool, which holds currently connected free peers. /// The client pool, which holds currently connected free peers.
/// ///
@ -38,16 +38,17 @@ pub struct ClientPool<N: NetworkZone> {
impl<N: NetworkZone> ClientPool<N> { impl<N: NetworkZone> ClientPool<N> {
/// Returns a new [`ClientPool`] wrapped in an [`Arc`]. /// Returns a new [`ClientPool`] wrapped in an [`Arc`].
pub fn new() -> Arc<ClientPool<N>> { pub fn new() -> Arc<Self> {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let pool = Arc::new(ClientPool { let pool = Arc::new(Self {
clients: DashMap::new(), clients: DashMap::new(),
new_connection_tx: tx, new_connection_tx: tx,
}); });
tokio::spawn( tokio::spawn(
disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()), disconnect_monitor::disconnect_monitor(rx, Arc::clone(&pool))
.instrument(Span::current()),
); );
pool pool
@ -69,8 +70,7 @@ impl<N: NetworkZone> ClientPool<N> {
return; return;
} }
let res = self.clients.insert(id, client); assert!(self.clients.insert(id, client).is_none());
assert!(res.is_none());
// We have to check this again otherwise we could have a race condition where a // We have to check this again otherwise we could have a race condition where a
// peer is disconnected after the first check, the disconnect monitor tries to remove it, // peer is disconnected after the first check, the disconnect monitor tries to remove it,
@ -121,7 +121,6 @@ impl<N: NetworkZone> ClientPool<N> {
/// Note that the returned iterator is not guaranteed to contain every peer asked for. /// Note that the returned iterator is not guaranteed to contain every peer asked for.
/// ///
/// See [`Self::borrow_client`] for borrowing a single client. /// See [`Self::borrow_client`] for borrowing a single client.
#[allow(private_interfaces)] // TODO: Remove me when 2024 Rust
pub fn borrow_clients<'a, 'b>( pub fn borrow_clients<'a, 'b>(
self: &'a Arc<Self>, self: &'a Arc<Self>,
peers: &'b [InternalPeerID<N::Addr>], peers: &'b [InternalPeerID<N::Addr>],
@ -133,7 +132,7 @@ impl<N: NetworkZone> ClientPool<N> {
mod sealed { mod sealed {
/// TODO: Remove me when 2024 Rust /// TODO: Remove me when 2024 Rust
/// ///
/// https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick /// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick>
pub trait Captures<U> {} pub trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {} impl<T: ?Sized, U> Captures<U> for T {}

View file

@ -78,6 +78,6 @@ impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
this.closed_fut this.closed_fut
.poll(cx) .poll(cx)
.map(|_| this.peer_id.take().unwrap()) .map(|()| this.peer_id.take().unwrap())
} }
} }

View file

@ -99,12 +99,17 @@ where
/// Connects to random seeds to get peers and immediately disconnects /// Connects to random seeds to get peers and immediately disconnects
#[instrument(level = "info", skip(self))] #[instrument(level = "info", skip(self))]
#[expect(
clippy::significant_drop_in_scrutinee,
clippy::significant_drop_tightening
)]
async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> { async fn connect_to_random_seeds(&mut self) -> Result<(), OutboundConnectorError> {
let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS); let seeds = N::SEEDS.choose_multiple(&mut thread_rng(), MAX_SEED_CONNECTIONS);
if seeds.len() == 0 { assert!(
panic!("No seed nodes available to get peers from"); seeds.len() != 0,
} "No seed nodes available to get peers from"
);
let mut allowed_errors = seeds.len(); let mut allowed_errors = seeds.len();
@ -129,7 +134,7 @@ where
} }
while let Some(res) = handshake_futs.join_next().await { while let Some(res) = handshake_futs.join_next().await {
if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) { if matches!(res, Err(_) | Ok(Err(_) | Ok(Err(_)))) {
allowed_errors -= 1; allowed_errors -= 1;
} }
} }
@ -144,7 +149,7 @@ where
/// Connects to a given outbound peer. /// Connects to a given outbound peer.
#[instrument(level = "info", skip_all)] #[instrument(level = "info", skip_all)]
async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) {
let client_pool = self.client_pool.clone(); let client_pool = Arc::clone(&self.client_pool);
let connection_fut = self let connection_fut = self
.connector_svc .connector_svc
.ready() .ready()
@ -157,6 +162,7 @@ where
tokio::spawn( tokio::spawn(
async move { async move {
#[expect(clippy::significant_drop_in_scrutinee)]
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await {
client_pool.add_new_client(peer); client_pool.add_new_client(peer);
} }
@ -166,14 +172,16 @@ where
} }
/// Handles a request from the peer set for more peers. /// Handles a request from the peer set for more peers.
#[expect(
clippy::significant_drop_tightening,
reason = "we need to hold onto a permit"
)]
async fn handle_peer_request( async fn handle_peer_request(
&mut self, &mut self,
req: &MakeConnectionRequest, req: &MakeConnectionRequest,
) -> Result<(), OutboundConnectorError> { ) -> Result<(), OutboundConnectorError> {
// try to get a permit. // try to get a permit.
let permit = self let permit = Arc::clone(&self.outbound_semaphore)
.outbound_semaphore
.clone()
.try_acquire_owned() .try_acquire_owned()
.or_else(|_| { .or_else(|_| {
// if we can't get a permit add one if we are below the max number of connections. // if we can't get a permit add one if we are below the max number of connections.
@ -183,7 +191,9 @@ where
} else { } else {
self.outbound_semaphore.add_permits(1); self.outbound_semaphore.add_permits(1);
self.extra_peers += 1; self.extra_peers += 1;
Ok(self.outbound_semaphore.clone().try_acquire_owned().unwrap()) Ok(Arc::clone(&self.outbound_semaphore)
.try_acquire_owned()
.unwrap())
} }
})?; })?;
@ -272,12 +282,12 @@ where
tracing::info!("Shutting down outbound connector, make connection channel closed."); tracing::info!("Shutting down outbound connector, make connection channel closed.");
return; return;
}; };
// We can't really do much about errors in this function. #[expect(clippy::let_underscore_must_use, reason = "We can't really do much about errors in this function.")]
let _ = self.handle_peer_request(&peer_req).await; let _ = self.handle_peer_request(&peer_req).await;
}, },
// This future is not cancellation safe as you will lose your space in the queue but as we are the only place // This future is not cancellation safe as you will lose your space in the queue but as we are the only place
// that actually requires permits that should be ok. // that actually requires permits that should be ok.
Ok(permit) = self.outbound_semaphore.clone().acquire_owned() => { Ok(permit) = Arc::clone(&self.outbound_semaphore).acquire_owned() => {
if self.handle_free_permit(permit).await.is_err() { if self.handle_free_permit(permit).await.is_err() {
// if we got an error then we still have a permit free so to prevent this from just looping // if we got an error then we still have a permit free so to prevent this from just looping
// uncontrollably add a timeout. // uncontrollably add a timeout.

View file

@ -100,7 +100,7 @@ where
}; };
// If we're still behind our maximum limit, Initiate handshake. // If we're still behind our maximum limit, Initiate handshake.
if let Ok(permit) = semaphore.clone().try_acquire_owned() { if let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() {
tracing::debug!("Permit free for incoming connection, attempting handshake."); tracing::debug!("Permit free for incoming connection, attempting handshake.");
let fut = handshaker.ready().await?.call(DoHandshakeRequest { let fut = handshaker.ready().await?.call(DoHandshakeRequest {
@ -111,11 +111,12 @@ where
permit: Some(permit), permit: Some(permit),
}); });
let cloned_pool = client_pool.clone(); let cloned_pool = Arc::clone(&client_pool);
tokio::spawn( tokio::spawn(
async move { async move {
if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await { let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
if let Ok(Ok(peer)) = client {
cloned_pool.add_new_client(peer); cloned_pool.add_new_client(peer);
} }
} }
@ -133,8 +134,10 @@ where
let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next()); let fut = timeout(PING_REQUEST_TIMEOUT, peer_stream.next());
// Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded // Ok if timeout did not elapsed -> Some if there is a message -> Ok if it has been decoded
if let Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping)))) = fut.await if matches!(
{ fut.await,
Ok(Some(Ok(Message::Request(AdminRequestMessage::Ping))))
) {
let response = peer_sink let response = peer_sink
.send( .send(
Message::Response(AdminResponseMessage::Ping(PingResponse { Message::Response(AdminResponseMessage::Ping(PingResponse {
@ -148,7 +151,7 @@ where
if let Err(err) = response { if let Err(err) = response {
tracing::debug!( tracing::debug!(
"Unable to respond to ping request from peer ({addr}): {err}" "Unable to respond to ping request from peer ({addr}): {err}"
) );
} }
} }
} }

View file

@ -103,7 +103,7 @@ where
let outbound_connector = Connector::new(outbound_handshaker); let outbound_connector = Connector::new(outbound_handshaker);
let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new(
config.clone(), config.clone(),
client_pool.clone(), Arc::clone(&client_pool),
make_connection_rx, make_connection_rx,
address_book.clone(), address_book.clone(),
outbound_connector, outbound_connector,
@ -118,17 +118,17 @@ where
); );
background_tasks.spawn( background_tasks.spawn(
inbound_server::inbound_server( inbound_server::inbound_server(
client_pool.clone(), Arc::clone(&client_pool),
inbound_handshaker, inbound_handshaker,
address_book.clone(), address_book.clone(),
config, config,
) )
.map(|res| { .map(|res| {
if let Err(e) = res { if let Err(e) = res {
tracing::error!("Error in inbound connection listener: {e}") tracing::error!("Error in inbound connection listener: {e}");
} }
tracing::info!("Inbound connection listener shutdown") tracing::info!("Inbound connection listener shutdown");
}) })
.instrument(Span::current()), .instrument(Span::current()),
); );
@ -155,7 +155,7 @@ pub struct NetworkInterface<N: NetworkZone> {
/// on that claimed chain. /// on that claimed chain.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>, top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections. /// A channel to request extra connections.
#[allow(dead_code)] // will be used eventually #[expect(dead_code, reason = "will be used eventually")]
make_connection_tx: mpsc::Sender<MakeConnectionRequest>, make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service. /// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>, address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
@ -184,7 +184,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
C::Future: Send + 'static, C::Future: Send + 'static,
{ {
block_downloader::download_blocks( block_downloader::download_blocks(
self.pool.clone(), Arc::clone(&self.pool),
self.sync_states_svc.clone(), self.sync_states_svc.clone(),
our_chain_service, our_chain_service,
config, config,

View file

@ -40,7 +40,7 @@ pub struct NewSyncInfo {
/// This is the service that handles: /// This is the service that handles:
/// 1. Finding out if we need to sync /// 1. Finding out if we need to sync
/// 1. Giving the peers that should be synced _from_, to the requester /// 1. Giving the peers that should be synced _from_, to the requester
pub struct PeerSyncSvc<N: NetworkZone> { pub(crate) struct PeerSyncSvc<N: NetworkZone> {
/// A map of cumulative difficulties to peers. /// A map of cumulative difficulties to peers.
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>, cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
/// A map of peers to cumulative difficulties. /// A map of peers to cumulative difficulties.
@ -56,7 +56,7 @@ pub struct PeerSyncSvc<N: NetworkZone> {
impl<N: NetworkZone> PeerSyncSvc<N> { impl<N: NetworkZone> PeerSyncSvc<N> {
/// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with /// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with
/// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie. /// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie.
pub fn new() -> (Self, watch::Receiver<NewSyncInfo>) { pub(crate) fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo { let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
chain_height: 0, chain_height: 0,
top_hash: [0; 32], top_hash: [0; 32],
@ -108,9 +108,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
if let Some(block_needed) = block_needed { if let Some(block_needed) = block_needed {
// we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means // we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means
// we don't take into account the tip blocks which are not pruned. // we don't take into account the tip blocks which are not pruned.
self.peers self.peers[peer]
.get(peer)
.unwrap()
.1 .1
.has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT) .has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT)
} else { } else {
@ -126,7 +124,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
&mut self, &mut self,
peer_id: InternalPeerID<N::Addr>, peer_id: InternalPeerID<N::Addr>,
handle: ConnectionHandle, handle: ConnectionHandle,
core_sync_data: CoreSyncData, core_sync_data: &CoreSyncData,
) -> Result<(), tower::BoxError> { ) -> Result<(), tower::BoxError> {
tracing::trace!( tracing::trace!(
"Received new core sync data from peer, top hash: {}", "Received new core sync data from peer, top hash: {}",
@ -176,7 +174,7 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
self.closed_connections.push(PeerDisconnectFut { self.closed_connections.push(PeerDisconnectFut {
closed_fut: handle.closed(), closed_fut: handle.closed(),
peer_id: Some(peer_id), peer_id: Some(peer_id),
}) });
} }
self.cumulative_difficulties self.cumulative_difficulties
@ -190,11 +188,15 @@ impl<N: NetworkZone> PeerSyncSvc<N> {
|| self || self
.last_peer_in_watcher_handle .last_peer_in_watcher_handle
.as_ref() .as_ref()
.is_some_and(|handle| handle.is_closed()) .is_some_and(ConnectionHandle::is_closed)
{ {
tracing::debug!( tracing::debug!(
"Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}" "Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}"
); );
#[expect(
clippy::let_underscore_must_use,
reason = "dropped receivers can be ignored"
)]
let _ = self.new_height_watcher.send(NewSyncInfo { let _ = self.new_height_watcher.send(NewSyncInfo {
top_hash: core_sync_data.top_id, top_hash: core_sync_data.top_id,
chain_height: core_sync_data.current_height, chain_height: core_sync_data.current_height,
@ -228,8 +230,8 @@ impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
block_needed, block_needed,
))), ))),
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
.update_peer_sync_info(peer_id, handle, sync_data) .update_peer_sync_info(peer_id, handle, &sync_data)
.map(|_| PeerSyncResponse::Ok), .map(|()| PeerSyncResponse::Ok),
}; };
ready(res) ready(res)
@ -413,6 +415,6 @@ mod tests {
assert!( assert!(
peers.contains(&InternalPeerID::Unknown(0)) peers.contains(&InternalPeerID::Unknown(0))
&& peers.contains(&InternalPeerID::Unknown(1)) && peers.contains(&InternalPeerID::Unknown(1))
) );
} }
} }