diff --git a/coordinator/src/cosign_evaluator.rs b/coordinator/src/cosign_evaluator.rs index 4ce7faf7..29d9cc4b 100644 --- a/coordinator/src/cosign_evaluator.rs +++ b/coordinator/src/cosign_evaluator.rs @@ -22,7 +22,7 @@ use serai_db::{Get, DbTxn, Db, create_db}; use processor_messages::coordinator::cosign_block_msg; use crate::{ - p2p::{CosignedBlock, P2pMessageKind, P2p}, + p2p::{CosignedBlock, GossipMessageKind, P2p}, substrate::LatestCosignedBlock, }; @@ -323,7 +323,7 @@ impl CosignEvaluator { for cosign in cosigns { let mut buf = vec![]; cosign.serialize(&mut buf).unwrap(); - P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await; + P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await; } sleep(Duration::from_secs(60)).await; } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 4de23ae0..58de348d 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -260,7 +260,7 @@ async fn handle_processor_message( cosign_channel.send(cosigned_block).unwrap(); let mut buf = vec![]; cosigned_block.serialize(&mut buf).unwrap(); - P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await; + P2p::broadcast(p2p, GossipMessageKind::CosignedBlock, buf).await; None } // This causes an action on Substrate yet not on any Tributary diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 4e476c36..ba84a8b8 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -55,71 +55,112 @@ pub struct CosignedBlock { } #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub enum P2pMessageKind { +pub enum ReqResMessageKind { KeepAlive, - Tributary([u8; 32]), Heartbeat([u8; 32]), Block([u8; 32]), +} + +impl ReqResMessageKind { + pub fn read(reader: &mut R) -> Option { + let mut kind = [0; 1]; + reader.read_exact(&mut kind).ok()?; + match kind[0] { + 0 => Some(ReqResMessageKind::KeepAlive), + 1 => Some({ + let mut genesis = [0; 32]; + reader.read_exact(&mut genesis).ok()?; + ReqResMessageKind::Heartbeat(genesis) + }), + 2 => Some({ + let mut genesis = [0; 32]; + reader.read_exact(&mut genesis).ok()?; + ReqResMessageKind::Block(genesis) + }), + _ => None, + } + } + + pub fn serialize(&self) -> Vec { + match self { + ReqResMessageKind::KeepAlive => vec![0], + ReqResMessageKind::Heartbeat(genesis) => { + let mut res = vec![1]; + res.extend(genesis); + res + } + ReqResMessageKind::Block(genesis) => { + let mut res = vec![2]; + res.extend(genesis); + res + } + } + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub enum GossipMessageKind { + Tributary([u8; 32]), CosignedBlock, } +impl GossipMessageKind { + pub fn read(reader: &mut R) -> Option { + let mut kind = [0; 1]; + reader.read_exact(&mut kind).ok()?; + match kind[0] { + 0 => Some({ + let mut genesis = [0; 32]; + reader.read_exact(&mut genesis).ok()?; + GossipMessageKind::Tributary(genesis) + }), + 1 => Some(GossipMessageKind::CosignedBlock), + _ => None, + } + } + + pub fn serialize(&self) -> Vec { + match self { + GossipMessageKind::Tributary(genesis) => { + let mut res = vec![0]; + res.extend(genesis); + res + } + GossipMessageKind::CosignedBlock => { + vec![1] + } + } + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub enum P2pMessageKind { + ReqRes(ReqResMessageKind), + Gossip(GossipMessageKind), +} + impl P2pMessageKind { fn genesis(&self) -> Option<[u8; 32]> { match self { - P2pMessageKind::KeepAlive | P2pMessageKind::CosignedBlock => None, - P2pMessageKind::Tributary(genesis) | - P2pMessageKind::Heartbeat(genesis) | - P2pMessageKind::Block(genesis) => Some(*genesis), + P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) | + P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => None, + P2pMessageKind::ReqRes( + ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis), + ) | + P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => Some(*genesis), } } +} - fn serialize(&self) -> Vec { - match self { - P2pMessageKind::KeepAlive => vec![0], - P2pMessageKind::Tributary(genesis) => { - let mut res = vec![1]; - res.extend(genesis); - res - } - P2pMessageKind::Heartbeat(genesis) => { - let mut res = vec![2]; - res.extend(genesis); - res - } - P2pMessageKind::Block(genesis) => { - let mut res = vec![3]; - res.extend(genesis); - res - } - P2pMessageKind::CosignedBlock => { - vec![4] - } - } +impl From for P2pMessageKind { + fn from(kind: ReqResMessageKind) -> P2pMessageKind { + P2pMessageKind::ReqRes(kind) } +} - fn read(reader: &mut R) -> Option { - let mut kind = [0; 1]; - reader.read_exact(&mut kind).ok()?; - match kind[0] { - 0 => Some(P2pMessageKind::KeepAlive), - 1 => Some({ - let mut genesis = [0; 32]; - reader.read_exact(&mut genesis).ok()?; - P2pMessageKind::Tributary(genesis) - }), - 2 => Some({ - let mut genesis = [0; 32]; - reader.read_exact(&mut genesis).ok()?; - P2pMessageKind::Heartbeat(genesis) - }), - 3 => Some({ - let mut genesis = [0; 32]; - reader.read_exact(&mut genesis).ok()?; - P2pMessageKind::Block(genesis) - }), - 4 => Some(P2pMessageKind::CosignedBlock), - _ => None, - } +impl From for P2pMessageKind { + fn from(kind: GossipMessageKind) -> P2pMessageKind { + P2pMessageKind::Gossip(kind) } } @@ -139,15 +180,19 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { async fn send_raw(&self, to: Self::Id, msg: Vec); async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec); - async fn receive_raw(&self) -> (Self::Id, Vec); + async fn receive(&self) -> Message; - async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec) { + async fn send(&self, to: Self::Id, kind: ReqResMessageKind, msg: Vec) { let mut actual_msg = kind.serialize(); actual_msg.extend(msg); self.send_raw(to, actual_msg).await; } - async fn broadcast(&self, kind: P2pMessageKind, msg: Vec) { - let mut actual_msg = kind.serialize(); + async fn broadcast(&self, kind: impl Send + Into, msg: Vec) { + let kind = kind.into(); + let mut actual_msg = match kind { + P2pMessageKind::ReqRes(kind) => kind.serialize(), + P2pMessageKind::Gossip(kind) => kind.serialize(), + }; actual_msg.extend(msg); /* log::trace!( @@ -163,35 +208,6 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { */ self.broadcast_raw(kind, actual_msg).await; } - async fn receive(&self) -> Message { - let (sender, kind, msg) = loop { - let (sender, msg) = self.receive_raw().await; - if msg.is_empty() { - log::error!("empty p2p message from {sender:?}"); - continue; - } - - let mut msg_ref = msg.as_ref(); - let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else { - log::error!("invalid p2p message kind from {sender:?}"); - continue; - }; - break (sender, kind, msg_ref.to_vec()); - }; - /* - log::trace!( - "received p2p message (kind {})", - match kind { - P2pMessageKind::KeepAlive => "KeepAlive".to_string(), - P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), - P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), - P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), - P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(), - } - ); - */ - Message { sender, kind, msg } - } } #[derive(NetworkBehaviour)] @@ -206,7 +222,7 @@ pub struct LibP2p { subscribe: Arc>>, send: Arc)>>>, broadcast: Arc)>>>, - receive: Arc)>>>, + receive: Arc>>>, } impl fmt::Debug for LibP2p { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -502,8 +518,7 @@ impl LibP2p { msg = broadcast_recv.recv() => { let (kind, msg): (P2pMessageKind, Vec) = msg.expect("broadcast_recv closed. are we shutting down?"); - if matches!(kind, P2pMessageKind::KeepAlive) || - matches!(kind, P2pMessageKind::Heartbeat(_)) { + if matches!(kind, P2pMessageKind::ReqRes(_)) { // Use request/response for peer_id in swarm.connected_peers().copied().collect::>() { swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone()); @@ -600,17 +615,27 @@ impl LibP2p { RrMessage::Request { request, .. } => request, RrMessage::Response { response, .. } => response, }; - receive_send - .send((peer, message)) - .expect("receive_send closed. are we shutting down?"); + + let mut msg_ref = message.as_slice(); + let Some(kind) = ReqResMessageKind::read(&mut msg_ref) else { continue }; + let message = Message { + sender: peer, + kind: P2pMessageKind::ReqRes(kind), + msg: msg_ref.to_vec(), + }; + receive_send.send(message).expect("receive_send closed. are we shutting down?"); } Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( GsEvent::Message { propagation_source, message, .. }, ))) => { - // TODO: Ban Heartbeat/Blocks received over gossipsub - receive_send - .send((propagation_source, message.data)) - .expect("receive_send closed. are we shutting down?"); + let mut msg_ref = message.data.as_slice(); + let Some(kind) = GossipMessageKind::read(&mut msg_ref) else { continue }; + let message = Message { + sender: propagation_source, + kind: P2pMessageKind::Gossip(kind), + msg: msg_ref.to_vec(), + }; + receive_send.send(message).expect("receive_send closed. are we shutting down?"); } _ => {} } @@ -642,12 +667,13 @@ impl LibP2p { // (where a finalized block only occurs due to network activity), meaning this won't be // run () = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => { - broadcast_raw( - &mut swarm, - &mut time_of_last_p2p_message, - None, - P2pMessageKind::KeepAlive.serialize() - ); + time_of_last_p2p_message = Instant::now(); + for peer_id in swarm.connected_peers().copied().collect::>() { + swarm + .behaviour_mut() + .reqres + .send_request(&peer_id, ReqResMessageKind::KeepAlive.serialize()); + } } } } @@ -700,7 +726,7 @@ impl P2p for LibP2p { // TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant // lock acquisition? - async fn receive_raw(&self) -> (Self::Id, Vec) { + async fn receive(&self) -> Message { self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?") } } @@ -708,7 +734,7 @@ impl P2p for LibP2p { #[async_trait] impl TributaryP2p for LibP2p { async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { - ::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await + ::broadcast(self, GossipMessageKind::Tributary(genesis), msg).await } } @@ -751,7 +777,7 @@ pub async fn heartbeat_tributaries_task( .expect("system clock is wrong") .as_secs(); msg.extend(time.to_le_bytes()); - P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await; + P2p::broadcast(&p2p, ReqResMessageKind::Heartbeat(tributary.genesis()), msg).await; } } @@ -795,20 +821,12 @@ pub async fn handle_p2p_task( break; }; match msg.kind { - P2pMessageKind::KeepAlive => {} - - P2pMessageKind::Tributary(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - log::trace!("handling message for tributary {:?}", spec_set); - if tributary.tributary.handle_message(&msg.msg).await { - P2p::broadcast(&p2p, msg.kind, msg.msg).await; - } - } + P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {} // TODO: Slash on Heartbeat which justifies a response, since the node // obviously was offline and we must now use our bandwidth to compensate for // them? - P2pMessageKind::Heartbeat(msg_genesis) => { + P2pMessageKind::ReqRes(ReqResMessageKind::Heartbeat(msg_genesis)) => { assert_eq!(msg_genesis, genesis); if msg.msg.len() != 40 { log::error!("validator sent invalid heartbeat"); @@ -848,13 +866,13 @@ pub async fn handle_p2p_task( res.extend(reader.commit(&next).unwrap()); // Also include the timestamp used within the Heartbeat res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(genesis), res).await; + p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await; } } }); } - P2pMessageKind::Block(msg_genesis) => { + P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => { assert_eq!(msg_genesis, genesis); let mut msg_ref: &[u8] = msg.msg.as_ref(); let Ok(block) = Block::::read(&mut msg_ref) else { @@ -873,7 +891,15 @@ pub async fn handle_p2p_task( ); } - P2pMessageKind::CosignedBlock => unreachable!(), + P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => { + assert_eq!(msg_genesis, genesis); + log::trace!("handling message for tributary {:?}", spec_set); + if tributary.tributary.handle_message(&msg.msg).await { + P2p::broadcast(&p2p, msg.kind, msg.msg).await; + } + } + + P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => unreachable!(), } } } @@ -893,15 +919,16 @@ pub async fn handle_p2p_task( loop { let msg = p2p.receive().await; match msg.kind { - P2pMessageKind::KeepAlive => {} - P2pMessageKind::Tributary(genesis) | - P2pMessageKind::Heartbeat(genesis) | - P2pMessageKind::Block(genesis) => { + P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {} + P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) | + P2pMessageKind::ReqRes( + ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis), + ) => { if let Some(channel) = channels.read().await.get(&genesis) { channel.send(msg).unwrap(); } } - P2pMessageKind::CosignedBlock => { + P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => { let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else { log::error!("received CosignedBlock message with invalidly serialized contents"); continue; diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 55b6c99f..db4c158f 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::RwLock; use crate::{ processors::{Message, Processors}, - TributaryP2p, P2pMessageKind, P2p, + TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p, }; pub mod tributary; @@ -45,7 +45,10 @@ impl Processors for MemProcessors { #[allow(clippy::type_complexity)] #[derive(Clone, Debug)] -pub struct LocalP2p(usize, pub Arc>, Vec)>>)>>); +pub struct LocalP2p( + usize, + pub Arc>, Vec)>>)>>, +); impl LocalP2p { pub fn new(validators: usize) -> Vec { @@ -66,10 +69,12 @@ impl P2p for LocalP2p { async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {} async fn send_raw(&self, to: Self::Id, msg: Vec) { - self.1.write().await.1[to].push_back((self.0, msg)); + let mut msg_ref = msg.as_slice(); + let kind = ReqResMessageKind::read(&mut msg_ref).unwrap(); + self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec())); } - async fn broadcast_raw(&self, _kind: P2pMessageKind, msg: Vec) { + async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec) { // Content-based deduplication let mut lock = self.1.write().await; { @@ -81,19 +86,26 @@ impl P2p for LocalP2p { } let queues = &mut lock.1; + let kind_len = (match kind { + P2pMessageKind::ReqRes(kind) => kind.serialize(), + P2pMessageKind::Gossip(kind) => kind.serialize(), + }) + .len(); + let msg = msg[kind_len ..].to_vec(); + for (i, msg_queue) in queues.iter_mut().enumerate() { if i == self.0 { continue; } - msg_queue.push_back((self.0, msg.clone())); + msg_queue.push_back((self.0, kind, msg.clone())); } } - async fn receive_raw(&self) -> (Self::Id, Vec) { + async fn receive(&self) -> P2pMessage { // This is a cursed way to implement an async read from a Vec loop { - if let Some(res) = self.1.write().await.1[self.0].pop_front() { - return res; + if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() { + return P2pMessage { sender, kind, msg }; } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } @@ -103,6 +115,11 @@ impl P2p for LocalP2p { #[async_trait] impl TributaryP2p for LocalP2p { async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { - ::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await + ::broadcast( + self, + P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)), + msg, + ) + .await } } diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index 360af7ec..7fc6a064 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -26,7 +26,7 @@ use serai_db::MemDb; use tributary::Tributary; use crate::{ - P2pMessageKind, P2p, + GossipMessageKind, P2pMessageKind, P2p, tributary::{Transaction, TributarySpec}, tests::LocalP2p, }; @@ -98,7 +98,7 @@ pub async fn run_tributaries( for (p2p, tributary) in &mut tributaries { while let Poll::Ready(msg) = poll!(p2p.receive()) { match msg.kind { - P2pMessageKind::Tributary(genesis) => { + P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => { assert_eq!(genesis, tributary.genesis()); if tributary.handle_message(&msg.msg).await { p2p.broadcast(msg.kind, msg.msg).await; @@ -173,7 +173,7 @@ async fn tributary_test() { for (p2p, tributary) in &mut tributaries { while let Poll::Ready(msg) = poll!(p2p.receive()) { match msg.kind { - P2pMessageKind::Tributary(genesis) => { + P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => { assert_eq!(genesis, tributary.genesis()); tributary.handle_message(&msg.msg).await; } @@ -199,7 +199,7 @@ async fn tributary_test() { for (p2p, tributary) in &mut tributaries { while let Poll::Ready(msg) = poll!(p2p.receive()) { match msg.kind { - P2pMessageKind::Tributary(genesis) => { + P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => { assert_eq!(genesis, tributary.genesis()); tributary.handle_message(&msg.msg).await; }