diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index cb57ee3d..72909e0a 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -58,6 +58,16 @@ pub enum P2pMessageKind { } impl P2pMessageKind { + fn genesis(&self) -> Option<[u8; 32]> { + match self { + P2pMessageKind::KeepAlive => None, + P2pMessageKind::Tributary(genesis) => Some(*genesis), + P2pMessageKind::Heartbeat(genesis) => Some(*genesis), + P2pMessageKind::Block(genesis) => Some(*genesis), + P2pMessageKind::CosignedBlock => None, + } + } + fn serialize(&self) -> Vec { match self { P2pMessageKind::KeepAlive => vec![0], @@ -119,14 +129,17 @@ pub struct Message { pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { type Id: Send + Sync + Clone + Copy + fmt::Debug; - async fn send_raw(&self, to: Self::Id, msg: Vec); - async fn broadcast_raw(&self, msg: Vec); + async fn subscribe(&self, genesis: [u8; 32]); + async fn unsubscribe(&self, genesis: [u8; 32]); + + async fn send_raw(&self, to: Self::Id, genesis: Option<[u8; 32]>, msg: Vec); + async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec); async fn receive_raw(&self) -> (Self::Id, Vec); async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec) { let mut actual_msg = kind.serialize(); actual_msg.extend(msg); - self.send_raw(to, actual_msg).await; + self.send_raw(to, kind.genesis(), actual_msg).await; } async fn broadcast(&self, kind: P2pMessageKind, msg: Vec) { let mut actual_msg = kind.serialize(); @@ -143,7 +156,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { } ); */ - self.broadcast_raw(actual_msg).await; + self.broadcast_raw(kind.genesis(), actual_msg).await; } async fn receive(&self) -> Message { let (sender, kind, msg) = loop { @@ -185,10 +198,11 @@ struct Behavior { #[allow(clippy::type_complexity)] #[derive(Clone)] -pub struct LibP2p( - Arc>>>, - Arc)>>>, -); +pub struct LibP2p { + subscribe: Arc>>, + broadcast: Arc, Vec)>>>, + receive: Arc)>>>, +} impl fmt::Debug for LibP2p { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("LibP2p").finish_non_exhaustive() @@ -240,8 +254,7 @@ impl LibP2p { ) .unwrap(); - // Uses a single topic to prevent being a BTC validator only connected to ETH validators, - // unable to communicate with other BTC validators + // Subscribe to the base topic let topic = IdentTopic::new(LIBP2P_TOPIC); gossipsub.subscribe(&topic).unwrap(); @@ -274,6 +287,11 @@ impl LibP2p { let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel(); let (receive_send, receive_recv) = mpsc::unbounded_channel(); + let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel(); + + fn topic_for_genesis(genesis: [u8; 32]) -> IdentTopic { + IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(genesis))) + } tokio::spawn({ let mut time_of_last_p2p_message = Instant::now(); @@ -282,12 +300,19 @@ impl LibP2p { async fn broadcast_raw( p2p: &mut Swarm, time_of_last_p2p_message: &mut Instant, + genesis: Option<[u8; 32]>, msg: Vec, ) { // Update the time of last message *time_of_last_p2p_message = Instant::now(); - match p2p.behaviour_mut().gossipsub.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) { + let topic = if let Some(genesis) = genesis { + topic_for_genesis(genesis) + } else { + IdentTopic::new(LIBP2P_TOPIC) + }; + + match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) { Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"), Err(PublishError::InsufficientPeers) => { log::warn!("failed to send p2p message due to insufficient peers") @@ -307,12 +332,32 @@ impl LibP2p { tokio::select! { biased; + // Subscribe to any new topics + topic = subscribe_recv.recv() => { + let (subscribe, topic) = topic.expect("subscribe_recv closed. are we shutting down?"); + if subscribe { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&topic_for_genesis(topic)) + .unwrap(); + } else { + swarm + .behaviour_mut() + .gossipsub + .unsubscribe(&topic_for_genesis(topic)) + .unwrap(); + } + } + // Handle any queued outbound messages msg = broadcast_recv.recv() => { + let (genesis, msg) = msg.expect("broadcast_recv closed. are we shutting down?"); broadcast_raw( &mut swarm, &mut time_of_last_p2p_message, - msg.expect("broadcast_recv closed. are we shutting down?") + genesis, + msg, ).await; } @@ -363,6 +408,7 @@ impl LibP2p { broadcast_raw( &mut swarm, &mut time_of_last_p2p_message, + None, P2pMessageKind::KeepAlive.serialize() ).await; } @@ -371,7 +417,11 @@ impl LibP2p { } }); - LibP2p(Arc::new(Mutex::new(broadcast_send)), Arc::new(Mutex::new(receive_recv))) + LibP2p { + subscribe: Arc::new(Mutex::new(subscribe_send)), + broadcast: Arc::new(Mutex::new(broadcast_send)), + receive: Arc::new(Mutex::new(receive_recv)), + } } } @@ -379,18 +429,41 @@ impl LibP2p { impl P2p for LibP2p { type Id = PeerId; - async fn send_raw(&self, _: Self::Id, msg: Vec) { - self.broadcast_raw(msg).await; + async fn subscribe(&self, genesis: [u8; 32]) { + self + .subscribe + .lock() + .await + .send((true, genesis)) + .expect("subscribe_send closed. are we shutting down?"); } - async fn broadcast_raw(&self, msg: Vec) { - self.0.lock().await.send(msg).expect("broadcast_send closed. are we shutting down?"); + async fn unsubscribe(&self, genesis: [u8; 32]) { + self + .subscribe + .lock() + .await + .send((false, genesis)) + .expect("subscribe_send closed. are we shutting down?"); + } + + async fn send_raw(&self, _: Self::Id, genesis: Option<[u8; 32]>, msg: Vec) { + self.broadcast_raw(genesis, msg).await; + } + + async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec) { + self + .broadcast + .lock() + .await + .send((genesis, msg)) + .expect("broadcast_send closed. are we shutting down?"); } // TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant // lock acquisition? async fn receive_raw(&self) -> (Self::Id, Vec) { - self.1.lock().await.recv().await.expect("receive_recv closed. are we shutting down?") + self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?") } } @@ -473,6 +546,9 @@ pub async fn handle_p2p_task( let (send, mut recv) = mpsc::unbounded_channel(); channels.write().await.insert(genesis, send); + // Subscribe to the topic for this tributary + p2p.subscribe(genesis).await; + // Per-Tributary P2P message handler tokio::spawn({ let p2p = p2p.clone(); @@ -595,6 +671,7 @@ pub async fn handle_p2p_task( TributaryEvent::TributaryRetired(set) => { if let Some(genesis) = set_to_genesis.remove(&set) { channels.write().await.remove(&genesis); + p2p.unsubscribe(genesis).await; } } } diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 9a01e122..29aec904 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -62,11 +62,14 @@ impl LocalP2p { impl P2p for LocalP2p { type Id = usize; - async fn send_raw(&self, to: Self::Id, msg: Vec) { + async fn subscribe(&self, _genesis: [u8; 32]) {} + async fn unsubscribe(&self, _genesis: [u8; 32]) {} + + async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec) { self.1.write().await.1[to].push_back((self.0, msg)); } - async fn broadcast_raw(&self, msg: Vec) { + async fn broadcast_raw(&self, _genesis: Option<[u8; 32]>, msg: Vec) { // Content-based deduplication let mut lock = self.1.write().await; {