diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 48a23aa5..8e88f6ee 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -2,7 +2,7 @@ use core::{time::Duration, fmt}; use std::{ sync::Arc, io::Read, - collections::HashMap, + collections::{HashSet, HashMap}, time::{SystemTime, Instant}, }; @@ -292,17 +292,22 @@ impl LibP2p { IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode()))) } + // The addrs we're currently dialing, and the networks associated with them + let dialing_peers = Arc::new(RwLock::new(HashMap::new())); + // Find and connect to peers - let (pending_p2p_connections_send, mut pending_p2p_connections_recv) = + let (connect_to_network_send, mut connect_to_network_recv) = tokio::sync::mpsc::unbounded_channel(); let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); tokio::spawn({ - let pending_p2p_connections_send = pending_p2p_connections_send.clone(); + let dialing_peers = dialing_peers.clone(); + let connect_to_network_send = connect_to_network_send.clone(); async move { loop { - // TODO: Add better peer management logic? - { - let connect = |addr: Multiaddr| { + let connect = |network: NetworkId, addr: Multiaddr| { + let dialing_peers = dialing_peers.clone(); + let to_dial_send = to_dial_send.clone(); + async move { log::info!("found peer from substrate: {addr}"); let protocols = addr.iter().filter_map(|piece| match piece { @@ -320,45 +325,78 @@ impl LibP2p { let addr = new_addr; log::debug!("transformed found peer: {addr}"); - // TODO: Check this isn't a duplicate - to_dial_send.send(addr).unwrap(); - }; - - // TODO: We should also connect to random peers from random nets as needed for - // cosigning - let mut to_retry = vec![]; - while let Some(network) = pending_p2p_connections_recv.recv().await { - if let Ok(mut nodes) = serai.p2p_validators(network).await { - // If there's an insufficient amount of nodes known, connect to all yet add it - // back and break - if nodes.len() < 3 { - log::warn!( - "insufficient amount of P2P nodes known for {:?}: {}", - network, - nodes.len() - ); - to_retry.push(network); - for node in nodes { - connect(node); - } - continue; + let (is_fresh_dial, nets) = { + let mut dialing_peers = dialing_peers.write().await; + let is_fresh_dial = dialing_peers.contains_key(&addr); + if !is_fresh_dial { + dialing_peers.insert(addr.clone(), HashSet::new()); } + // Associate this network with this peer + dialing_peers.get_mut(&addr).unwrap().insert(network); - // Randomly select up to 5 - for _ in 0 .. 5 { - if !nodes.is_empty() { - let to_connect = nodes.swap_remove( - usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) - .unwrap(), - ); - connect(to_connect); - } + let nets = dialing_peers.get(&addr).unwrap().clone(); + (is_fresh_dial, nets) + }; + + // Spawn a task to remove this peer from 'dialing' in sixty seconds, in case dialing + // fails + // This performs cleanup and bounds the size of the map to whatever growth occurs + // within a temporal window + tokio::spawn({ + let dialing_peers = dialing_peers.clone(); + let addr = addr.clone(); + async move { + tokio::time::sleep(core::time::Duration::from_secs(60)).await; + let mut dialing_peers = dialing_peers.write().await; + dialing_peers.remove(&addr); + } + }); + + if !is_fresh_dial { + to_dial_send.send((addr, nets)).unwrap(); + } + } + }; + + // TODO: We should also connect to random peers from random nets as needed for + // cosigning + + // Define a buffer, `to_retry`, so we can exhaust this channel before sending more down + // it + let mut to_retry = vec![]; + while let Some(network) = connect_to_network_recv.recv().await { + if let Ok(mut nodes) = serai.p2p_validators(network).await { + // If there's an insufficient amount of nodes known, connect to all yet add it + // back and break + if nodes.len() < 3 { + log::warn!( + "insufficient amount of P2P nodes known for {:?}: {}", + network, + nodes.len() + ); + // TODO: We weren't retry so long as we're told of sufficient nodes + // We should stop retrying when we actually connect to sufficient nodes + to_retry.push(network); + for node in nodes { + connect(network, node).await; + } + continue; + } + + // Randomly select up to 5 + for _ in 0 .. 5 { + if !nodes.is_empty() { + let to_connect = nodes.swap_remove( + usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) + .unwrap(), + ); + connect(network, to_connect).await; } } } - for to_retry in to_retry { - pending_p2p_connections_send.send(to_retry).unwrap(); - } + } + for to_retry in to_retry { + connect_to_network_send.send(to_retry).unwrap(); } // Sleep 60 seconds before moving to the next iteration tokio::time::sleep(core::time::Duration::from_secs(60)).await; @@ -397,8 +435,10 @@ impl LibP2p { } async move { + // The peers we're currently connected to, and the networks associated with them + let mut connected_peers = HashMap::new(); + let mut set_for_genesis = HashMap::new(); - let mut connected_peers = 0; loop { let time_since_last = Instant::now().duration_since(time_of_last_p2p_message); tokio::select! { @@ -411,7 +451,7 @@ impl LibP2p { let topic = topic_for_set(set); if subscribe { log::info!("subscribing to p2p messages for {set:?}"); - pending_p2p_connections_send.send(set.network).unwrap(); + connect_to_network_send.send(set.network).unwrap(); set_for_genesis.insert(genesis, set); swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); } else { @@ -440,26 +480,65 @@ impl LibP2p { Some(SwarmEvent::Dialing { connection_id, .. }) => { log::debug!("dialing to peer in connection ID {}", &connection_id); } - Some(SwarmEvent::ConnectionEstablished { peer_id, connection_id, .. }) => { + Some(SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + endpoint, + .. + }) => { if &peer_id == swarm.local_peer_id() { log::warn!("established a libp2p connection to ourselves"); swarm.close_connection(connection_id); continue; } - connected_peers += 1; + let addr = endpoint.get_remote_address(); + let nets = { + let mut dialing_peers = dialing_peers.write().await; + if let Some(nets) = dialing_peers.remove(addr) { + nets + } else { + log::debug!("connected to a peer who we didn't have within dialing"); + HashSet::new() + } + }; + connected_peers.insert(addr.clone(), nets); + log::debug!( "connection established to peer {} in connection ID {}, connected peers: {}", &peer_id, &connection_id, - connected_peers, + connected_peers.len(), ); } - Some(SwarmEvent::ConnectionClosed { peer_id, .. }) => { - connected_peers -= 1; + Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => { + let nets = + connected_peers + .remove(endpoint.get_remote_address()) + .expect("closed connection to peer which never connected"); + + // For each net we lost a peer for, check if we still have sufficient peers + // overall + for net in nets { + let mut remaining_peers = 0; + for nets in connected_peers.values() { + if nets.contains(&net) { + remaining_peers += 1; + } + } + // If we do not, start connecting to this network again + if remaining_peers < 3 { + connect_to_network_send + .send(net) + .expect( + "couldn't send net to connect to due to disconnects (receiver dropped?)" + ); + } + } + log::debug!( "connection with peer {peer_id} closed, connected peers: {}", - connected_peers, + connected_peers.len(), ); } Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( @@ -474,8 +553,17 @@ impl LibP2p { } // Handle peers to dial - addr = to_dial_recv.recv() => { - let addr = addr.expect("received address was None (sender dropped?)"); + addr_and_nets = to_dial_recv.recv() => { + let (addr, nets) = + addr_and_nets.expect("received address was None (sender dropped?)"); + // If we've already dialed and connected to this address, don't further dial them + // Just associate these networks with them + if let Some(existing_nets) = connected_peers.get_mut(&addr) { + for net in nets { + existing_nets.insert(net); + } + } + if let Err(e) = swarm.dial(addr) { log::warn!("dialing peer failed: {e:?}"); }