diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs index d56e5094..1e6cf9ca 100644 --- a/p2p/cuprate-p2p/src/connection_maintainer.rs +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -5,10 +5,10 @@ //! no connected peer has. use std::sync::Arc; -use futures::{stream::FuturesUnordered, StreamExt}; use rand::{distributions::Bernoulli, prelude::*}; use tokio::{ sync::{mpsc, OwnedSemaphorePermit, Semaphore}, + task::JoinSet, time::{sleep, timeout}, }; use tower::{Service, ServiceExt}; @@ -111,7 +111,7 @@ where let mut allowed_errors = seeds.len(); - let mut handshake_futs = FuturesUnordered::new(); + let mut handshake_futs = JoinSet::new(); for seed in seeds { info!("Getting peers from seed node: {}", seed); @@ -131,10 +131,10 @@ where }), ); // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer. - handshake_futs.push(tokio::spawn(fut)); + handshake_futs.spawn(fut); } - while let Some(res) = handshake_futs.next().await { + while let Some(res) = handshake_futs.join_next().await { if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) { allowed_errors -= 1; } @@ -231,7 +231,7 @@ where return Ok(()); } - tracing::debug!("Permit available making outbound connection."); + tracing::debug!("Permit available, making outbound connection."); let req = if self.peer_type_gen.sample(&mut thread_rng()) { AddressBookRequest::TakeRandomGrayPeer { height: None }