mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-22 18:54:34 +00:00
use JoinSet in connection maintainer
This commit is contained in:
parent
37e7c29c48
commit
e59bb0a1f5
1 changed files with 5 additions and 5 deletions
|
@ -5,10 +5,10 @@
|
||||||
//! no connected peer has.
|
//! no connected peer has.
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::{stream::FuturesUnordered, StreamExt};
|
|
||||||
use rand::{distributions::Bernoulli, prelude::*};
|
use rand::{distributions::Bernoulli, prelude::*};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
|
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
|
||||||
|
task::JoinSet,
|
||||||
time::{sleep, timeout},
|
time::{sleep, timeout},
|
||||||
};
|
};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
@ -111,7 +111,7 @@ where
|
||||||
|
|
||||||
let mut allowed_errors = seeds.len();
|
let mut allowed_errors = seeds.len();
|
||||||
|
|
||||||
let mut handshake_futs = FuturesUnordered::new();
|
let mut handshake_futs = JoinSet::new();
|
||||||
|
|
||||||
for seed in seeds {
|
for seed in seeds {
|
||||||
info!("Getting peers from seed node: {}", seed);
|
info!("Getting peers from seed node: {}", seed);
|
||||||
|
@ -131,10 +131,10 @@ where
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
// Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
|
// Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
|
||||||
handshake_futs.push(tokio::spawn(fut));
|
handshake_futs.spawn(fut);
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(res) = handshake_futs.next().await {
|
while let Some(res) = handshake_futs.join_next().await {
|
||||||
if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
|
if matches!(res, Err(_) | Ok(Err(_)) | Ok(Ok(Err(_)))) {
|
||||||
allowed_errors -= 1;
|
allowed_errors -= 1;
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,7 @@ where
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("Permit available making outbound connection.");
|
tracing::debug!("Permit available, making outbound connection.");
|
||||||
|
|
||||||
let req = if self.peer_type_gen.sample(&mut thread_rng()) {
|
let req = if self.peer_type_gen.sample(&mut thread_rng()) {
|
||||||
AddressBookRequest::TakeRandomGrayPeer { height: None }
|
AddressBookRequest::TakeRandomGrayPeer { height: None }
|
||||||
|
|
Loading…
Reference in a new issue