From e59bb0a1f5ba83a0eb24688474d5f462b57e68a2 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 13 May 2024 23:39:36 +0100 Subject: [PATCH] use JoinSet in connection maintainer --- p2p/cuprate-p2p/src/connection_maintainer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs index d56e5094..1e6cf9ca 100644 --- a/p2p/cuprate-p2p/src/connection_maintainer.rs +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -5,10 +5,10 @@ //! no connected peer has. use std::sync::Arc; -use futures::{stream::FuturesUnordered, StreamExt}; use rand::{distributions::Bernoulli, prelude::*}; use tokio::{ sync::{mpsc, OwnedSemaphorePermit, Semaphore}, + task::JoinSet, time::{sleep, timeout}, }; use tower::{Service, ServiceExt}; @@ -111,7 +111,7 @@ where let mut allowed_errors = seeds.len(); - let mut handshake_futs = FuturesUnordered::new(); + let mut handshake_futs = JoinSet::new(); for seed in seeds { info!("Getting peers from seed node: {}", seed); @@ -131,10 +131,10 @@ where }), ); // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer. - handshake_futs.push(tokio::spawn(fut)); + handshake_futs.spawn(fut); } - while let Some(res) = handshake_futs.next().await { + while let Some(res) = handshake_futs.join_next().await { if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) { allowed_errors -= 1; } @@ -231,7 +231,7 @@ where return Ok(()); } - tracing::debug!("Permit available making outbound connection."); + tracing::debug!("Permit available, making outbound connection."); let req = if self.peer_type_gen.sample(&mut thread_rng()) { AddressBookRequest::TakeRandomGrayPeer { height: None }