Retry if initial dials fail, not just upon disconnect

This commit is contained in:
Luke Parker 2024-04-21 07:26:16 -04:00
parent be7780e69d
commit 320b5627b5
No known key found for this signature in database

View file

@ -294,6 +294,8 @@ impl LibP2p {
// The addrs we're currently dialing, and the networks associated with them // The addrs we're currently dialing, and the networks associated with them
let dialing_peers = Arc::new(RwLock::new(HashMap::new())); let dialing_peers = Arc::new(RwLock::new(HashMap::new()));
// The peers we're currently connected to, and the networks associated with them
let connected_peers = Arc::new(RwLock::new(HashMap::<Multiaddr, HashSet<NetworkId>>::new()));
// Find and connect to peers // Find and connect to peers
let (connect_to_network_send, mut connect_to_network_recv) = let (connect_to_network_send, mut connect_to_network_recv) =
@ -301,12 +303,16 @@ impl LibP2p {
let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn({ tokio::spawn({
let dialing_peers = dialing_peers.clone(); let dialing_peers = dialing_peers.clone();
let connected_peers = connected_peers.clone();
let connect_to_network_send = connect_to_network_send.clone(); let connect_to_network_send = connect_to_network_send.clone();
async move { async move {
loop { loop {
let connect = |network: NetworkId, addr: Multiaddr| { let connect = |network: NetworkId, addr: Multiaddr| {
let dialing_peers = dialing_peers.clone(); let dialing_peers = dialing_peers.clone();
let connected_peers = connected_peers.clone();
let to_dial_send = to_dial_send.clone(); let to_dial_send = to_dial_send.clone();
let connect_to_network_send = connect_to_network_send.clone();
async move { async move {
log::info!("found peer from substrate: {addr}"); log::info!("found peer from substrate: {addr}");
@ -344,15 +350,37 @@ impl LibP2p {
// within a temporal window // within a temporal window
tokio::spawn({ tokio::spawn({
let dialing_peers = dialing_peers.clone(); let dialing_peers = dialing_peers.clone();
let connected_peers = connected_peers.clone();
let connect_to_network_send = connect_to_network_send.clone();
let addr = addr.clone(); let addr = addr.clone();
async move { async move {
tokio::time::sleep(core::time::Duration::from_secs(60)).await; tokio::time::sleep(core::time::Duration::from_secs(60)).await;
let mut dialing_peers = dialing_peers.write().await; let mut dialing_peers = dialing_peers.write().await;
dialing_peers.remove(&addr); if let Some(expected_nets) = dialing_peers.remove(&addr) {
log::debug!("removed addr from dialing upon timeout: {addr}");
// TODO: De-duplicate this below instance
// If we failed to dial and haven't gotten enough actual connections, retry
let connected_peers = connected_peers.read().await;
for net in expected_nets {
let mut remaining_peers = 0;
for nets in connected_peers.values() {
if nets.contains(&net) {
remaining_peers += 1;
}
}
// If we do not, start connecting to this network again
if remaining_peers < 3 {
connect_to_network_send.send(net).expect(
"couldn't send net to connect to due to disconnects (receiver dropped?)",
);
}
}
}
} }
}); });
if !is_fresh_dial { if is_fresh_dial {
to_dial_send.send((addr, nets)).unwrap(); to_dial_send.send((addr, nets)).unwrap();
} }
} }
@ -374,8 +402,6 @@ impl LibP2p {
network, network,
nodes.len() nodes.len()
); );
// TODO: We weren't retry so long as we're told of sufficient nodes
// We should stop retrying when we actually connect to sufficient nodes
to_retry.push(network); to_retry.push(network);
for node in nodes { for node in nodes {
connect(network, node).await; connect(network, node).await;
@ -435,8 +461,7 @@ impl LibP2p {
} }
async move { async move {
// The peers we're currently connected to, and the networks associated with them let connected_peers = connected_peers.clone();
let mut connected_peers = HashMap::new();
let mut set_for_genesis = HashMap::new(); let mut set_for_genesis = HashMap::new();
loop { loop {
@ -502,20 +527,26 @@ impl LibP2p {
HashSet::new() HashSet::new()
} }
}; };
connected_peers.insert(addr.clone(), nets); {
let mut connected_peers = connected_peers.write().await;
connected_peers.insert(addr.clone(), nets);
log::debug!( log::debug!(
"connection established to peer {} in connection ID {}, connected peers: {}", "connection established to peer {} in connection ID {}, connected peers: {}",
&peer_id, &peer_id,
&connection_id, &connection_id,
connected_peers.len(), connected_peers.len(),
); );
}
} }
Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => { Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => {
let mut connected_peers = connected_peers.write().await;
let nets = let nets =
connected_peers connected_peers
.remove(endpoint.get_remote_address()) .remove(endpoint.get_remote_address())
.expect("closed connection to peer which never connected"); .expect("closed connection to peer which never connected");
// Downgrade to a read lock
let connected_peers = connected_peers.downgrade();
// For each net we lost a peer for, check if we still have sufficient peers // For each net we lost a peer for, check if we still have sufficient peers
// overall // overall
@ -558,7 +589,7 @@ impl LibP2p {
addr_and_nets.expect("received address was None (sender dropped?)"); addr_and_nets.expect("received address was None (sender dropped?)");
// If we've already dialed and connected to this address, don't further dial them // If we've already dialed and connected to this address, don't further dial them
// Just associate these networks with them // Just associate these networks with them
if let Some(existing_nets) = connected_peers.get_mut(&addr) { if let Some(existing_nets) = connected_peers.write().await.get_mut(&addr) {
for net in nets { for net in nets {
existing_nets.insert(net); existing_nets.insert(net);
} }