diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index ba84a8b8..5e56073a 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -36,7 +36,7 @@ use libp2p::{ IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError, Behaviour as GsBehavior, }, - swarm::{NetworkBehaviour, SwarmEvent, Swarm}, + swarm::{NetworkBehaviour, SwarmEvent}, SwarmBuilder, }; @@ -456,32 +456,6 @@ impl LibP2p { tokio::spawn({ let mut time_of_last_p2p_message = Instant::now(); - #[allow(clippy::needless_pass_by_ref_mut)] // False positive - fn broadcast_raw( - p2p: &mut Swarm, - time_of_last_p2p_message: &mut Instant, - set: Option, - msg: Vec, - ) { - // Update the time of last message - *time_of_last_p2p_message = Instant::now(); - - let topic = - if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) }; - - match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) { - Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"), - Err(PublishError::InsufficientPeers) => { - log::warn!("failed to send p2p message due to insufficient peers") - } - Err(PublishError::MessageTooLarge) => { - panic!("tried to send a too large message: {}", hex::encode(msg)) - } - Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"), - Err(PublishError::Duplicate) | Ok(_) => {} - } - } - async move { let connected_peers = connected_peers.clone(); @@ -516,22 +490,41 @@ impl LibP2p { // Handle any queued outbound messages msg = broadcast_recv.recv() => { + // Update the time of last message + time_of_last_p2p_message = Instant::now(); + let (kind, msg): (P2pMessageKind, Vec) = msg.expect("broadcast_recv closed. are we shutting down?"); + if matches!(kind, P2pMessageKind::ReqRes(_)) { - // Use request/response + // Use request/response, yet send to all connected peers for peer_id in swarm.connected_peers().copied().collect::>() { swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone()); } } else { // Use gossipsub - let set = kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied()); - broadcast_raw( - &mut swarm, - &mut time_of_last_p2p_message, - set, - msg, - ); + + let set = + kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied()); + let topic = if let Some(set) = set { + topic_for_set(set) + } else { + IdentTopic::new(LIBP2P_TOPIC) + }; + + match swarm.behaviour_mut().gossipsub.publish(topic, msg.clone()) { + Err(PublishError::SigningError(e)) => { + panic!("signing error when broadcasting: {e}") + }, + Err(PublishError::InsufficientPeers) => { + log::warn!("failed to send p2p message due to insufficient peers") + } + Err(PublishError::MessageTooLarge) => { + panic!("tried to send a too large message: {}", hex::encode(msg)) + } + Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"), + Err(PublishError::Duplicate) | Ok(_) => {} + } } }