Properly diversify ReqResMessageKind/GossipMessageKind

This commit is contained in:
Luke Parker 2024-04-23 06:37:41 -04:00
parent 8cef9eff6f
commit 023275bcb6
No known key found for this signature in database
5 changed files with 181 additions and 137 deletions

View file

@ -22,7 +22,7 @@ use serai_db::{Get, DbTxn, Db, create_db};
use processor_messages::coordinator::cosign_block_msg;
use crate::{
p2p::{CosignedBlock, P2pMessageKind, P2p},
p2p::{CosignedBlock, GossipMessageKind, P2p},
substrate::LatestCosignedBlock,
};
@ -323,7 +323,7 @@ impl<D: Db> CosignEvaluator<D> {
for cosign in cosigns {
let mut buf = vec![];
cosign.serialize(&mut buf).unwrap();
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await;
}
sleep(Duration::from_secs(60)).await;
}

View file

@ -260,7 +260,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
cosign_channel.send(cosigned_block).unwrap();
let mut buf = vec![];
cosigned_block.serialize(&mut buf).unwrap();
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
P2p::broadcast(p2p, GossipMessageKind::CosignedBlock, buf).await;
None
}
// This causes an action on Substrate yet not on any Tributary

View file

@ -55,71 +55,112 @@ pub struct CosignedBlock {
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
pub enum ReqResMessageKind {
KeepAlive,
Tributary([u8; 32]),
Heartbeat([u8; 32]),
Block([u8; 32]),
}
impl ReqResMessageKind {
pub fn read<R: Read>(reader: &mut R) -> Option<ReqResMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some(ReqResMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
ReqResMessageKind::Heartbeat(genesis)
}),
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
ReqResMessageKind::Block(genesis)
}),
_ => None,
}
}
pub fn serialize(&self) -> Vec<u8> {
match self {
ReqResMessageKind::KeepAlive => vec![0],
ReqResMessageKind::Heartbeat(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
ReqResMessageKind::Block(genesis) => {
let mut res = vec![2];
res.extend(genesis);
res
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum GossipMessageKind {
Tributary([u8; 32]),
CosignedBlock,
}
impl GossipMessageKind {
pub fn read<R: Read>(reader: &mut R) -> Option<GossipMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
GossipMessageKind::Tributary(genesis)
}),
1 => Some(GossipMessageKind::CosignedBlock),
_ => None,
}
}
pub fn serialize(&self) -> Vec<u8> {
match self {
GossipMessageKind::Tributary(genesis) => {
let mut res = vec![0];
res.extend(genesis);
res
}
GossipMessageKind::CosignedBlock => {
vec![1]
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
ReqRes(ReqResMessageKind),
Gossip(GossipMessageKind),
}
impl P2pMessageKind {
fn genesis(&self) -> Option<[u8; 32]> {
match self {
P2pMessageKind::KeepAlive | P2pMessageKind::CosignedBlock => None,
P2pMessageKind::Tributary(genesis) |
P2pMessageKind::Heartbeat(genesis) |
P2pMessageKind::Block(genesis) => Some(*genesis),
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) |
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => None,
P2pMessageKind::ReqRes(
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
) |
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => Some(*genesis),
}
}
}
fn serialize(&self) -> Vec<u8> {
match self {
P2pMessageKind::KeepAlive => vec![0],
P2pMessageKind::Tributary(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![2];
res.extend(genesis);
res
}
P2pMessageKind::Block(genesis) => {
let mut res = vec![3];
res.extend(genesis);
res
}
P2pMessageKind::CosignedBlock => {
vec![4]
}
}
impl From<ReqResMessageKind> for P2pMessageKind {
fn from(kind: ReqResMessageKind) -> P2pMessageKind {
P2pMessageKind::ReqRes(kind)
}
}
fn read<R: Read>(reader: &mut R) -> Option<P2pMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some(P2pMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis)
}),
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Heartbeat(genesis)
}),
3 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Block(genesis)
}),
4 => Some(P2pMessageKind::CosignedBlock),
_ => None,
}
impl From<GossipMessageKind> for P2pMessageKind {
fn from(kind: GossipMessageKind) -> P2pMessageKind {
P2pMessageKind::Gossip(kind)
}
}
@ -139,15 +180,19 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>);
async fn receive_raw(&self) -> (Self::Id, Vec<u8>);
async fn receive(&self) -> Message<Self>;
async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) {
async fn send(&self, to: Self::Id, kind: ReqResMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
actual_msg.extend(msg);
self.send_raw(to, actual_msg).await;
}
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
async fn broadcast(&self, kind: impl Send + Into<P2pMessageKind>, msg: Vec<u8>) {
let kind = kind.into();
let mut actual_msg = match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
};
actual_msg.extend(msg);
/*
log::trace!(
@ -163,35 +208,6 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
*/
self.broadcast_raw(kind, actual_msg).await;
}
async fn receive(&self) -> Message<Self> {
let (sender, kind, msg) = loop {
let (sender, msg) = self.receive_raw().await;
if msg.is_empty() {
log::error!("empty p2p message from {sender:?}");
continue;
}
let mut msg_ref = msg.as_ref();
let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else {
log::error!("invalid p2p message kind from {sender:?}");
continue;
};
break (sender, kind, msg_ref.to_vec());
};
/*
log::trace!(
"received p2p message (kind {})",
match kind {
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(),
}
);
*/
Message { sender, kind, msg }
}
}
#[derive(NetworkBehaviour)]
@ -206,7 +222,7 @@ pub struct LibP2p {
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
send: Arc<Mutex<mpsc::UnboundedSender<(PeerId, Vec<u8>)>>>,
broadcast: Arc<Mutex<mpsc::UnboundedSender<(P2pMessageKind, Vec<u8>)>>>,
receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
receive: Arc<Mutex<mpsc::UnboundedReceiver<Message<Self>>>>,
}
impl fmt::Debug for LibP2p {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -502,8 +518,7 @@ impl LibP2p {
msg = broadcast_recv.recv() => {
let (kind, msg): (P2pMessageKind, Vec<u8>) =
msg.expect("broadcast_recv closed. are we shutting down?");
if matches!(kind, P2pMessageKind::KeepAlive) ||
matches!(kind, P2pMessageKind::Heartbeat(_)) {
if matches!(kind, P2pMessageKind::ReqRes(_)) {
// Use request/response
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
@ -600,17 +615,27 @@ impl LibP2p {
RrMessage::Request { request, .. } => request,
RrMessage::Response { response, .. } => response,
};
receive_send
.send((peer, message))
.expect("receive_send closed. are we shutting down?");
let mut msg_ref = message.as_slice();
let Some(kind) = ReqResMessageKind::read(&mut msg_ref) else { continue };
let message = Message {
sender: peer,
kind: P2pMessageKind::ReqRes(kind),
msg: msg_ref.to_vec(),
};
receive_send.send(message).expect("receive_send closed. are we shutting down?");
}
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. },
))) => {
// TODO: Ban Heartbeat/Blocks received over gossipsub
receive_send
.send((propagation_source, message.data))
.expect("receive_send closed. are we shutting down?");
let mut msg_ref = message.data.as_slice();
let Some(kind) = GossipMessageKind::read(&mut msg_ref) else { continue };
let message = Message {
sender: propagation_source,
kind: P2pMessageKind::Gossip(kind),
msg: msg_ref.to_vec(),
};
receive_send.send(message).expect("receive_send closed. are we shutting down?");
}
_ => {}
}
@ -642,12 +667,13 @@ impl LibP2p {
// (where a finalized block only occurs due to network activity), meaning this won't be
// run
() = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
broadcast_raw(
&mut swarm,
&mut time_of_last_p2p_message,
None,
P2pMessageKind::KeepAlive.serialize()
);
time_of_last_p2p_message = Instant::now();
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
swarm
.behaviour_mut()
.reqres
.send_request(&peer_id, ReqResMessageKind::KeepAlive.serialize());
}
}
}
}
@ -700,7 +726,7 @@ impl P2p for LibP2p {
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
// lock acquisition?
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
async fn receive(&self) -> Message<Self> {
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
}
}
@ -708,7 +734,7 @@ impl P2p for LibP2p {
#[async_trait]
impl TributaryP2p for LibP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
<Self as P2p>::broadcast(self, GossipMessageKind::Tributary(genesis), msg).await
}
}
@ -751,7 +777,7 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
.expect("system clock is wrong")
.as_secs();
msg.extend(time.to_le_bytes());
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
P2p::broadcast(&p2p, ReqResMessageKind::Heartbeat(tributary.genesis()), msg).await;
}
}
@ -795,20 +821,12 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
break;
};
match msg.kind {
P2pMessageKind::KeepAlive => {}
P2pMessageKind::Tributary(msg_genesis) => {
assert_eq!(msg_genesis, genesis);
log::trace!("handling message for tributary {:?}", spec_set);
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
// TODO: Slash on Heartbeat which justifies a response, since the node
// obviously was offline and we must now use our bandwidth to compensate for
// them?
P2pMessageKind::Heartbeat(msg_genesis) => {
P2pMessageKind::ReqRes(ReqResMessageKind::Heartbeat(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
if msg.msg.len() != 40 {
log::error!("validator sent invalid heartbeat");
@ -848,13 +866,13 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
res.extend(reader.commit(&next).unwrap());
// Also include the timestamp used within the Heartbeat
res.extend(&msg.msg[32 .. 40]);
p2p.send(msg.sender, P2pMessageKind::Block(genesis), res).await;
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
}
}
});
}
P2pMessageKind::Block(msg_genesis) => {
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
@ -873,7 +891,15 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
);
}
P2pMessageKind::CosignedBlock => unreachable!(),
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
log::trace!("handling message for tributary {:?}", spec_set);
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => unreachable!(),
}
}
}
@ -893,15 +919,16 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
loop {
let msg = p2p.receive().await;
match msg.kind {
P2pMessageKind::KeepAlive => {}
P2pMessageKind::Tributary(genesis) |
P2pMessageKind::Heartbeat(genesis) |
P2pMessageKind::Block(genesis) => {
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) |
P2pMessageKind::ReqRes(
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
) => {
if let Some(channel) = channels.read().await.get(&genesis) {
channel.send(msg).unwrap();
}
}
P2pMessageKind::CosignedBlock => {
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => {
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
log::error!("received CosignedBlock message with invalidly serialized contents");
continue;

View file

@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use crate::{
processors::{Message, Processors},
TributaryP2p, P2pMessageKind, P2p,
TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p,
};
pub mod tributary;
@ -45,7 +45,10 @@ impl Processors for MemProcessors {
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>);
pub struct LocalP2p(
usize,
pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, P2pMessageKind, Vec<u8>)>>)>>,
);
impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> {
@ -66,10 +69,12 @@ impl P2p for LocalP2p {
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
self.1.write().await.1[to].push_back((self.0, msg));
let mut msg_ref = msg.as_slice();
let kind = ReqResMessageKind::read(&mut msg_ref).unwrap();
self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec()));
}
async fn broadcast_raw(&self, _kind: P2pMessageKind, msg: Vec<u8>) {
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
// Content-based deduplication
let mut lock = self.1.write().await;
{
@ -81,19 +86,26 @@ impl P2p for LocalP2p {
}
let queues = &mut lock.1;
let kind_len = (match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
})
.len();
let msg = msg[kind_len ..].to_vec();
for (i, msg_queue) in queues.iter_mut().enumerate() {
if i == self.0 {
continue;
}
msg_queue.push_back((self.0, msg.clone()));
msg_queue.push_back((self.0, kind, msg.clone()));
}
}
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
async fn receive(&self) -> P2pMessage<Self> {
// This is a cursed way to implement an async read from a Vec
loop {
if let Some(res) = self.1.write().await.1[self.0].pop_front() {
return res;
if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() {
return P2pMessage { sender, kind, msg };
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
@ -103,6 +115,11 @@ impl P2p for LocalP2p {
#[async_trait]
impl TributaryP2p for LocalP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
<Self as P2p>::broadcast(
self,
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)),
msg,
)
.await
}
}

View file

@ -26,7 +26,7 @@ use serai_db::MemDb;
use tributary::Tributary;
use crate::{
P2pMessageKind, P2p,
GossipMessageKind, P2pMessageKind, P2p,
tributary::{Transaction, TributarySpec},
tests::LocalP2p,
};
@ -98,7 +98,7 @@ pub async fn run_tributaries(
for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis());
if tributary.handle_message(&msg.msg).await {
p2p.broadcast(msg.kind, msg.msg).await;
@ -173,7 +173,7 @@ async fn tributary_test() {
for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await;
}
@ -199,7 +199,7 @@ async fn tributary_test() {
for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await;
}