diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs
index 11db03b0..67cb2005 100644
--- a/coordinator/src/p2p.rs
+++ b/coordinator/src/p2p.rs
@@ -1,9 +1,9 @@
-use core::{time::Duration, fmt, task::Poll};
-use std::{sync::Arc, time::Instant, collections::VecDeque, io::Read};
+use core::{time::Duration, fmt};
+use std::{sync::Arc, time::Instant, io::Read};
 
 use async_trait::async_trait;
 
-use tokio::{sync::Mutex, time::sleep};
+use tokio::sync::{mpsc, Mutex};
 
 use libp2p::{
   futures::StreamExt,
@@ -155,7 +155,10 @@ lazy_static::lazy_static! {
 
 #[allow(clippy::type_complexity)]
 #[derive(Clone)]
-pub struct LibP2p(Arc<Mutex<Swarm<Behavior>>>, Arc<Mutex<VecDeque<(PeerId, Vec<u8>)>>>);
+pub struct LibP2p(
+  Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
+  Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
+);
 impl fmt::Debug for LibP2p {
   fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
     fmt.debug_struct("LibP2p").finish_non_exhaustive()
@@ -232,70 +235,96 @@ impl LibP2p {
     const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
     swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
 
-    let res = LibP2p(Arc::new(Mutex::new(swarm)), Arc::new(Mutex::new(VecDeque::new())));
+    let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
+    let (receive_send, receive_recv) = mpsc::unbounded_channel();
+
     tokio::spawn({
-      let p2p = res.clone();
+      #[allow(clippy::needless_pass_by_ref_mut)] // False positive
+      async fn broadcast_raw(p2p: &mut Swarm<Behavior>, msg: Vec<u8>) {
+        // Update the time of last message
+        *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
+
+        match p2p.behaviour_mut().gossipsub.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) {
+          Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
+          Err(PublishError::InsufficientPeers) => {
+            log::warn!("failed to send p2p message due to insufficient peers")
+          }
+          Err(PublishError::MessageTooLarge) => {
+            panic!("tried to send a too large message: {}", hex::encode(msg))
+          }
+          Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
+          Err(PublishError::Duplicate) | Ok(_) => {}
+        }
+      }
+
       async move {
         // Run this task ad-infinitum
         loop {
-          // If it's been >80s since we've published a message, publish a KeepAlive since we're
-          // still an active service
-          // This is useful when we have no active tributaries and accordingly aren't sending
-          // heartbeats
-          // If we are sending heartbeats, we should've sent one after 60s of no finalized blocks
-          // (where a finalized block only occurs due to network activity), meaning this won't be
-          // run
           let time_since_last =
             Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await);
-          if time_since_last > Duration::from_secs(80) {
-            p2p.broadcast_raw(P2pMessageKind::KeepAlive.serialize()).await;
-          }
+          tokio::select! {
+            biased;
 
-          // Maintain this lock until it's out of events
-          // TODO: Is there a less contentious way to run this poll?
-          let mut p2p_lock = p2p.0.lock().await;
-          loop {
-            match futures::poll!(p2p_lock.next()) {
-              //#[cfg(debug_assertions)]
-              Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
-                libp2p::mdns::Event::Discovered(list),
-              )))) => {
-                for (peer, mut addr) in list {
-                  // Check the port is as expected to prevent trying to peer with Substrate nodes
-                  if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
-                    log::info!("found peer via mdns");
-                    p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer);
+            // Handle any queued outbound messages
+            msg = broadcast_recv.recv() => {
+              broadcast_raw(
+                &mut swarm,
+                msg.expect("broadcast_recv closed. are we shutting down?")
+              ).await;
+            }
+
+            // Handle new incoming messages
+            event = swarm.next() => {
+              match event {
+                //#[cfg(debug_assertions)]
+                Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
+                  libp2p::mdns::Event::Discovered(list),
+                ))) => {
+                  for (peer, mut addr) in list {
+                    // Check the port is as expected to prevent trying to peer with Substrate nodes
+                    if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
+                      log::info!("found peer via mdns");
+                      swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
+                    }
                   }
                 }
-              }
-              //#[cfg(debug_assertions)]
-              Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
-                libp2p::mdns::Event::Expired(list),
-              )))) => {
-                for (peer, _) in list {
-                  log::info!("disconnecting peer due to mdns");
-                  p2p_lock.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
+                //#[cfg(debug_assertions)]
+                Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
+                  libp2p::mdns::Event::Expired(list),
+                ))) => {
+                  for (peer, _) in list {
+                    log::info!("disconnecting peer due to mdns");
+                    swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
+                  }
                 }
-              }
-              Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
-                GsEvent::Message { propagation_source, message, .. },
-              )))) => {
-                *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
-                p2p.1.lock().await.push_back((propagation_source, message.data));
-              }
-              Poll::Ready(Some(_)) => {}
-              _ => {
-                drop(p2p_lock);
-                sleep(Duration::from_millis(100)).await;
-                break;
+                Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
+                  GsEvent::Message { propagation_source, message, .. },
+                ))) => {
+                  receive_send
+                    .send((propagation_source, message.data))
+                    .expect("receive_send closed. are we shutting down?");
+                }
+                _ => {}
               }
             }
+
+            // If it's been >80s since we've published a message, publish a KeepAlive since we're
+            // still an active service
+            // This is useful when we have no active tributaries and accordingly aren't sending
+            // heartbeats
+            // If we are sending heartbeats, we should've sent one after 60s of no finalized blocks
+            // (where a finalized block only occurs due to network activity), meaning this won't be
+            // run
+            _ = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
+              broadcast_raw(&mut swarm, P2pMessageKind::KeepAlive.serialize()).await;
+            }
           }
         }
       }
     });
-    res
+
+    LibP2p(Arc::new(Mutex::new(broadcast_send)), Arc::new(Mutex::new(receive_recv)))
   }
 }
 
@@ -308,36 +337,11 @@ impl P2p for LibP2p {
   }
 
   async fn broadcast_raw(&self, msg: Vec<u8>) {
-    // Update the time of last message
-    *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
-
-    match self
-      .0
-      .lock()
-      .await
-      .behaviour_mut()
-      .gossipsub
-      .publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone())
-    {
-      Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
-      Err(PublishError::InsufficientPeers) => {
-        log::warn!("failed to send p2p message due to insufficient peers")
-      }
-      Err(PublishError::MessageTooLarge) => {
-        panic!("tried to send a too large message: {}", hex::encode(msg))
-      }
-      Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
-      Err(PublishError::Duplicate) | Ok(_) => {}
-    };
+    self.0.lock().await.send(msg).expect("broadcast_send closed. are we shutting down?");
   }
 
   async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
-    loop {
-      if let Some(res) = self.1.lock().await.pop_front() {
-        return res;
-      }
-      sleep(Duration::from_millis(100)).await;
-    }
+    self.1.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
  }
 }