Remove contention between the LibP2p spawned task and its consumers by using channels

Luke Parker 2023-08-30 23:31:05 -04:00
parent 2f57a69cb6
commit 1e79de87e8

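The change replaces the shared Arc<Mutex<Swarm<Behavior>>> and message VecDeque with a pair of mpsc channels: the spawned task takes exclusive ownership of the swarm, callers of broadcast_raw push onto an outbound channel, and receive_raw pulls from an inbound channel, so neither side contends on a swarm lock. Below is a minimal sketch of that ownership pattern, using a stand-in FakeSwarm type and a hypothetical P2p struct rather than the actual LibP2p code.

// Minimal sketch of the pattern this commit adopts: the spawned task owns the
// swarm outright, and the rest of the program talks to it only through channels.
// FakeSwarm and P2p are stand-ins for the real Swarm<Behavior> and LibP2p types,
// with u64 standing in for PeerId.
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

struct FakeSwarm;
impl FakeSwarm {
    // Stand-in for publishing over gossipsub.
    fn publish(&mut self, msg: Vec<u8>) {
        println!("published {} bytes", msg.len());
    }
    // Stand-in for polling the swarm for its next event.
    async fn next_event(&mut self) -> (u64, Vec<u8>) {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        (0, b"incoming".to_vec())
    }
}

#[derive(Clone)]
struct P2p(
    Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
    Arc<Mutex<mpsc::UnboundedReceiver<(u64, Vec<u8>)>>>,
);

impl P2p {
    fn new(mut swarm: FakeSwarm) -> P2p {
        let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
        let (receive_send, receive_recv) = mpsc::unbounded_channel();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    biased;
                    // Outbound messages queued by any number of callers
                    msg = broadcast_recv.recv() => {
                        let Some(msg) = msg else { break };
                        swarm.publish(msg);
                    }
                    // Inbound events produced by the swarm itself
                    (peer, data) = swarm.next_event() => {
                        if receive_send.send((peer, data)).is_err() { break }
                    }
                }
            }
        });
        // Only the channel endpoints are shared; the swarm never leaves the task
        P2p(Arc::new(Mutex::new(broadcast_send)), Arc::new(Mutex::new(receive_recv)))
    }

    async fn broadcast_raw(&self, msg: Vec<u8>) {
        self.0.lock().await.send(msg).expect("p2p task closed");
    }

    async fn receive_raw(&self) -> (u64, Vec<u8>) {
        self.1.lock().await.recv().await.expect("p2p task closed")
    }
}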

@@ -1,9 +1,9 @@
use core::{time::Duration, fmt, task::Poll};
use std::{sync::Arc, time::Instant, collections::VecDeque, io::Read};
use core::{time::Duration, fmt};
use std::{sync::Arc, time::Instant, io::Read};
use async_trait::async_trait;
use tokio::{sync::Mutex, time::sleep};
use tokio::sync::{mpsc, Mutex};
use libp2p::{
futures::StreamExt,
@@ -155,7 +155,10 @@ lazy_static::lazy_static! {
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct LibP2p(Arc<Mutex<Swarm<Behavior>>>, Arc<Mutex<VecDeque<(PeerId, Vec<u8>)>>>);
pub struct LibP2p(
Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
);
impl fmt::Debug for LibP2p {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("LibP2p").finish_non_exhaustive()
@@ -232,70 +235,96 @@ impl LibP2p {
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
let res = LibP2p(Arc::new(Mutex::new(swarm)), Arc::new(Mutex::new(VecDeque::new())));
let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
let (receive_send, receive_recv) = mpsc::unbounded_channel();
tokio::spawn({
let p2p = res.clone();
#[allow(clippy::needless_pass_by_ref_mut)] // False positive
async fn broadcast_raw(p2p: &mut Swarm<Behavior>, msg: Vec<u8>) {
// Update the time of last message
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
match p2p.behaviour_mut().gossipsub.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) {
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
}
async move {
// Run this task ad-infinitum
loop {
// If it's been >80s since we've published a message, publish a KeepAlive since we're
// still an active service
// This is useful when we have no active tributaries and accordingly aren't sending
// heartbeats
// If we are sending heartbeats, we should've sent one after 60s of no finalized blocks
// (where a finalized block only occurs due to network activity), meaning this won't be
// run
let time_since_last =
Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await);
if time_since_last > Duration::from_secs(80) {
p2p.broadcast_raw(P2pMessageKind::KeepAlive.serialize()).await;
}
tokio::select! {
biased;
// Maintain this lock until it's out of events
// TODO: Is there a less contentious way to run this poll?
let mut p2p_lock = p2p.0.lock().await;
loop {
match futures::poll!(p2p_lock.next()) {
//#[cfg(debug_assertions)]
Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
libp2p::mdns::Event::Discovered(list),
)))) => {
for (peer, mut addr) in list {
// Check the port is as expected to prevent trying to peer with Substrate nodes
if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
log::info!("found peer via mdns");
p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer);
// Handle any queued outbound messages
msg = broadcast_recv.recv() => {
broadcast_raw(
&mut swarm,
msg.expect("broadcast_recv closed. are we shutting down?")
).await;
}
// Handle new incoming messages
event = swarm.next() => {
match event {
//#[cfg(debug_assertions)]
Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
libp2p::mdns::Event::Discovered(list),
))) => {
for (peer, mut addr) in list {
// Check the port is as expected to prevent trying to peer with Substrate nodes
if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
log::info!("found peer via mdns");
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
}
}
}
}
//#[cfg(debug_assertions)]
Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
libp2p::mdns::Event::Expired(list),
)))) => {
for (peer, _) in list {
log::info!("disconnecting peer due to mdns");
p2p_lock.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
//#[cfg(debug_assertions)]
Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
libp2p::mdns::Event::Expired(list),
))) => {
for (peer, _) in list {
log::info!("disconnecting peer due to mdns");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
}
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. },
)))) => {
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
p2p.1.lock().await.push_back((propagation_source, message.data));
}
Poll::Ready(Some(_)) => {}
_ => {
drop(p2p_lock);
sleep(Duration::from_millis(100)).await;
break;
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. },
))) => {
receive_send
.send((propagation_source, message.data))
.expect("receive_send closed. are we shutting down?");
}
_ => {}
}
}
// If it's been >80s since we've published a message, publish a KeepAlive since we're
// still an active service
// This is useful when we have no active tributaries and accordingly aren't sending
// heartbeats
// If we are sending heartbeats, we should've sent one after 60s of no finalized blocks
// (where a finalized block only occurs due to network activity), meaning this won't be
// run
_ = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
broadcast_raw(&mut swarm, P2pMessageKind::KeepAlive.serialize()).await;
}
}
}
}
});
res
LibP2p(Arc::new(Mutex::new(broadcast_send)), Arc::new(Mutex::new(receive_recv)))
}
}
@@ -308,36 +337,11 @@ impl P2p for LibP2p {
}
async fn broadcast_raw(&self, msg: Vec<u8>) {
// Update the time of last message
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
match self
.0
.lock()
.await
.behaviour_mut()
.gossipsub
.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone())
{
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
};
self.0.lock().await.send(msg).expect("broadcast_send closed. are we shutting down?");
}
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
loop {
if let Some(res) = self.1.lock().await.pop_front() {
return res;
}
sleep(Duration::from_millis(100)).await;
}
self.1.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
}
}
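
Continuing the sketch above (P2p and FakeSwarm remain hypothetical stand-ins for LibP2p and the real swarm), a consumer can now broadcast and receive concurrently without ever touching a swarm lock:

// One task broadcasts while another receives; both only touch channel endpoints,
// so neither blocks the event loop that owns the swarm.
#[tokio::main]
async fn main() {
    let p2p = P2p::new(FakeSwarm);

    let sender = p2p.clone();
    tokio::spawn(async move {
        sender.broadcast_raw(b"hello".to_vec()).await;
    });

    let (peer, msg) = p2p.receive_raw().await;
    println!("received {} bytes from peer {peer}", msg.len());
}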