Restart coordinator peer finding upon disconnections

This commit is contained in:
Luke Parker 2024-04-21 07:02:49 -04:00
parent 0ddbaefb38
commit be7780e69d
No known key found for this signature in database

View file

@ -2,7 +2,7 @@ use core::{time::Duration, fmt};
use std::{ use std::{
sync::Arc, sync::Arc,
io::Read, io::Read,
collections::HashMap, collections::{HashSet, HashMap},
time::{SystemTime, Instant}, time::{SystemTime, Instant},
}; };
@ -292,17 +292,22 @@ impl LibP2p {
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode()))) IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode())))
} }
// The addrs we're currently dialing, and the networks associated with them
let dialing_peers = Arc::new(RwLock::new(HashMap::new()));
// Find and connect to peers // Find and connect to peers
let (pending_p2p_connections_send, mut pending_p2p_connections_recv) = let (connect_to_network_send, mut connect_to_network_recv) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn({ tokio::spawn({
let pending_p2p_connections_send = pending_p2p_connections_send.clone(); let dialing_peers = dialing_peers.clone();
let connect_to_network_send = connect_to_network_send.clone();
async move { async move {
loop { loop {
// TODO: Add better peer management logic? let connect = |network: NetworkId, addr: Multiaddr| {
{ let dialing_peers = dialing_peers.clone();
let connect = |addr: Multiaddr| { let to_dial_send = to_dial_send.clone();
async move {
log::info!("found peer from substrate: {addr}"); log::info!("found peer from substrate: {addr}");
let protocols = addr.iter().filter_map(|piece| match piece { let protocols = addr.iter().filter_map(|piece| match piece {
@ -320,14 +325,46 @@ impl LibP2p {
let addr = new_addr; let addr = new_addr;
log::debug!("transformed found peer: {addr}"); log::debug!("transformed found peer: {addr}");
// TODO: Check this isn't a duplicate let (is_fresh_dial, nets) = {
to_dial_send.send(addr).unwrap(); let mut dialing_peers = dialing_peers.write().await;
let is_fresh_dial = dialing_peers.contains_key(&addr);
if !is_fresh_dial {
dialing_peers.insert(addr.clone(), HashSet::new());
}
// Associate this network with this peer
dialing_peers.get_mut(&addr).unwrap().insert(network);
let nets = dialing_peers.get(&addr).unwrap().clone();
(is_fresh_dial, nets)
};
// Spawn a task to remove this peer from 'dialing' in sixty seconds, in case dialing
// fails
// This performs cleanup and bounds the size of the map to whatever growth occurs
// within a temporal window
tokio::spawn({
let dialing_peers = dialing_peers.clone();
let addr = addr.clone();
async move {
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
let mut dialing_peers = dialing_peers.write().await;
dialing_peers.remove(&addr);
}
});
if !is_fresh_dial {
to_dial_send.send((addr, nets)).unwrap();
}
}
}; };
// TODO: We should also connect to random peers from random nets as needed for // TODO: We should also connect to random peers from random nets as needed for
// cosigning // cosigning
// Define a buffer, `to_retry`, so we can exhaust this channel before sending more down
// it
let mut to_retry = vec![]; let mut to_retry = vec![];
while let Some(network) = pending_p2p_connections_recv.recv().await { while let Some(network) = connect_to_network_recv.recv().await {
if let Ok(mut nodes) = serai.p2p_validators(network).await { if let Ok(mut nodes) = serai.p2p_validators(network).await {
// If there's an insufficient amount of nodes known, connect to all yet add it // If there's an insufficient amount of nodes known, connect to all yet add it
// back and break // back and break
@ -337,9 +374,11 @@ impl LibP2p {
network, network,
nodes.len() nodes.len()
); );
// TODO: We weren't retry so long as we're told of sufficient nodes
// We should stop retrying when we actually connect to sufficient nodes
to_retry.push(network); to_retry.push(network);
for node in nodes { for node in nodes {
connect(node); connect(network, node).await;
} }
continue; continue;
} }
@ -351,14 +390,13 @@ impl LibP2p {
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
.unwrap(), .unwrap(),
); );
connect(to_connect); connect(network, to_connect).await;
} }
} }
} }
} }
for to_retry in to_retry { for to_retry in to_retry {
pending_p2p_connections_send.send(to_retry).unwrap(); connect_to_network_send.send(to_retry).unwrap();
}
} }
// Sleep 60 seconds before moving to the next iteration // Sleep 60 seconds before moving to the next iteration
tokio::time::sleep(core::time::Duration::from_secs(60)).await; tokio::time::sleep(core::time::Duration::from_secs(60)).await;
@ -397,8 +435,10 @@ impl LibP2p {
} }
async move { async move {
// The peers we're currently connected to, and the networks associated with them
let mut connected_peers = HashMap::new();
let mut set_for_genesis = HashMap::new(); let mut set_for_genesis = HashMap::new();
let mut connected_peers = 0;
loop { loop {
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message); let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
tokio::select! { tokio::select! {
@ -411,7 +451,7 @@ impl LibP2p {
let topic = topic_for_set(set); let topic = topic_for_set(set);
if subscribe { if subscribe {
log::info!("subscribing to p2p messages for {set:?}"); log::info!("subscribing to p2p messages for {set:?}");
pending_p2p_connections_send.send(set.network).unwrap(); connect_to_network_send.send(set.network).unwrap();
set_for_genesis.insert(genesis, set); set_for_genesis.insert(genesis, set);
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
} else { } else {
@ -440,26 +480,65 @@ impl LibP2p {
Some(SwarmEvent::Dialing { connection_id, .. }) => { Some(SwarmEvent::Dialing { connection_id, .. }) => {
log::debug!("dialing to peer in connection ID {}", &connection_id); log::debug!("dialing to peer in connection ID {}", &connection_id);
} }
Some(SwarmEvent::ConnectionEstablished { peer_id, connection_id, .. }) => { Some(SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
..
}) => {
if &peer_id == swarm.local_peer_id() { if &peer_id == swarm.local_peer_id() {
log::warn!("established a libp2p connection to ourselves"); log::warn!("established a libp2p connection to ourselves");
swarm.close_connection(connection_id); swarm.close_connection(connection_id);
continue; continue;
} }
connected_peers += 1; let addr = endpoint.get_remote_address();
let nets = {
let mut dialing_peers = dialing_peers.write().await;
if let Some(nets) = dialing_peers.remove(addr) {
nets
} else {
log::debug!("connected to a peer who we didn't have within dialing");
HashSet::new()
}
};
connected_peers.insert(addr.clone(), nets);
log::debug!( log::debug!(
"connection established to peer {} in connection ID {}, connected peers: {}", "connection established to peer {} in connection ID {}, connected peers: {}",
&peer_id, &peer_id,
&connection_id, &connection_id,
connected_peers, connected_peers.len(),
); );
} }
Some(SwarmEvent::ConnectionClosed { peer_id, .. }) => { Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => {
connected_peers -= 1; let nets =
connected_peers
.remove(endpoint.get_remote_address())
.expect("closed connection to peer which never connected");
// For each net we lost a peer for, check if we still have sufficient peers
// overall
for net in nets {
let mut remaining_peers = 0;
for nets in connected_peers.values() {
if nets.contains(&net) {
remaining_peers += 1;
}
}
// If we do not, start connecting to this network again
if remaining_peers < 3 {
connect_to_network_send
.send(net)
.expect(
"couldn't send net to connect to due to disconnects (receiver dropped?)"
);
}
}
log::debug!( log::debug!(
"connection with peer {peer_id} closed, connected peers: {}", "connection with peer {peer_id} closed, connected peers: {}",
connected_peers, connected_peers.len(),
); );
} }
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
@ -474,8 +553,17 @@ impl LibP2p {
} }
// Handle peers to dial // Handle peers to dial
addr = to_dial_recv.recv() => { addr_and_nets = to_dial_recv.recv() => {
let addr = addr.expect("received address was None (sender dropped?)"); let (addr, nets) =
addr_and_nets.expect("received address was None (sender dropped?)");
// If we've already dialed and connected to this address, don't further dial them
// Just associate these networks with them
if let Some(existing_nets) = connected_peers.get_mut(&addr) {
for net in nets {
existing_nets.insert(net);
}
}
if let Err(e) = swarm.dial(addr) { if let Err(e) = swarm.dial(addr) {
log::warn!("dialing peer failed: {e:?}"); log::warn!("dialing peer failed: {e:?}");
} }