Split peer finding into a dedicated task

This commit is contained in:
Luke Parker 2024-03-22 23:40:15 -04:00
parent af9b1ad5f9
commit 35b58a45bd
No known key found for this signature in database

View file

@ -290,6 +290,75 @@ impl LibP2p {
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode())))
}
// Find and connect to peers
let (pending_p2p_connections_send, mut pending_p2p_connections_recv) =
tokio::sync::mpsc::unbounded_channel();
let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn({
let pending_p2p_connections_send = pending_p2p_connections_send.clone();
async move {
loop {
// TODO: Add better peer management logic?
{
let connect = |addr: Multiaddr| {
log::info!("found peer from substrate: {addr}");
let protocols = addr.iter().filter_map(|piece| match piece {
// Drop PeerIds from the Substrate P2p network
Protocol::P2p(_) => None,
// Use our own TCP port
Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)),
other => Some(other),
});
let mut new_addr = Multiaddr::empty();
for protocol in protocols {
new_addr.push(protocol);
}
let addr = new_addr;
log::debug!("transformed found peer: {addr}");
// TODO: Check this isn't a duplicate
to_dial_send.send(addr).unwrap();
};
while let Some(network) = pending_p2p_connections_recv.recv().await {
if let Ok(mut nodes) = serai.p2p_validators(network).await {
// If there's an insufficient amount of nodes known, connect to all yet add it
// back and break
if nodes.len() < 3 {
log::warn!(
"insufficient amount of P2P nodes known for {:?}: {}",
network,
nodes.len()
);
pending_p2p_connections_send.send(network).unwrap();
for node in nodes {
connect(node);
}
break;
}
// Randomly select up to 5
for _ in 0 .. 5 {
if !nodes.is_empty() {
let to_connect = nodes.swap_remove(
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
.unwrap(),
);
connect(to_connect);
}
}
}
}
}
// Sleep 60 seconds before moving to the next iteration
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
});
// Manage the actual swarm
tokio::spawn({
let mut time_of_last_p2p_message = Instant::now();
@ -321,66 +390,7 @@ impl LibP2p {
async move {
let mut set_for_genesis = HashMap::new();
let mut pending_p2p_connections = vec![];
// Run this task ad-infinitum
loop {
// Handle pending P2P connections
// TODO: Break this out onto its own task with better peer management logic?
{
let mut connect = |addr: Multiaddr| {
log::info!("found peer from substrate: {addr}");
let protocols = addr.iter().filter_map(|piece| match piece {
// Drop PeerIds from the Substrate P2p network
Protocol::P2p(_) => None,
// Use our own TCP port
Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)),
other => Some(other),
});
let mut new_addr = Multiaddr::empty();
for protocol in protocols {
new_addr.push(protocol);
}
let addr = new_addr;
log::debug!("transformed found peer: {addr}");
if let Err(e) = swarm.dial(addr) {
log::warn!("dialing peer failed: {e:?}");
}
};
while let Some(network) = pending_p2p_connections.pop() {
if let Ok(mut nodes) = serai.p2p_validators(network).await {
// If there's an insufficient amount of nodes known, connect to all yet add it back
// and break
if nodes.len() < 3 {
log::warn!(
"insufficient amount of P2P nodes known for {:?}: {}",
network,
nodes.len()
);
pending_p2p_connections.push(network);
for node in nodes {
connect(node);
}
break;
}
// Randomly select up to 5
for _ in 0 .. 5 {
if !nodes.is_empty() {
let to_connect = nodes.swap_remove(
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
.unwrap(),
);
connect(to_connect);
}
}
}
}
}
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
tokio::select! {
biased;
@ -392,7 +402,7 @@ impl LibP2p {
let topic = topic_for_set(set);
if subscribe {
log::info!("subscribing to p2p messages for {set:?}");
pending_p2p_connections.push(set.network);
pending_p2p_connections_send.send(set.network).unwrap();
set_for_genesis.insert(genesis, set);
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
} else {
@ -440,6 +450,14 @@ impl LibP2p {
}
}
// Handle peers to dial
addr = to_dial_recv.recv() => {
let addr = addr.expect("received address was None (sender dropped?)");
if let Err(e) = swarm.dial(addr) {
log::warn!("dialing peer failed: {e:?}");
}
}
// If it's been >80s since we've published a message, publish a KeepAlive since we're
// still an active service
// This is useful when we have no active tributaries and accordingly aren't sending