mirror of
https://github.com/serai-dex/serai.git
synced 2024-11-17 09:27:36 +00:00
Properly diversify ReqResMessageKind/GossipMessageKind
This commit is contained in:
parent
f3ccf1cab0
commit
c3b6abf020
5 changed files with 181 additions and 137 deletions
|
@ -22,7 +22,7 @@ use serai_db::{Get, DbTxn, Db, create_db};
|
||||||
use processor_messages::coordinator::cosign_block_msg;
|
use processor_messages::coordinator::cosign_block_msg;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
p2p::{CosignedBlock, P2pMessageKind, P2p},
|
p2p::{CosignedBlock, GossipMessageKind, P2p},
|
||||||
substrate::LatestCosignedBlock,
|
substrate::LatestCosignedBlock,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -323,7 +323,7 @@ impl<D: Db> CosignEvaluator<D> {
|
||||||
for cosign in cosigns {
|
for cosign in cosigns {
|
||||||
let mut buf = vec![];
|
let mut buf = vec![];
|
||||||
cosign.serialize(&mut buf).unwrap();
|
cosign.serialize(&mut buf).unwrap();
|
||||||
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
|
P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await;
|
||||||
}
|
}
|
||||||
sleep(Duration::from_secs(60)).await;
|
sleep(Duration::from_secs(60)).await;
|
||||||
}
|
}
|
||||||
|
|
|
@ -260,7 +260,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
|
||||||
cosign_channel.send(cosigned_block).unwrap();
|
cosign_channel.send(cosigned_block).unwrap();
|
||||||
let mut buf = vec![];
|
let mut buf = vec![];
|
||||||
cosigned_block.serialize(&mut buf).unwrap();
|
cosigned_block.serialize(&mut buf).unwrap();
|
||||||
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
|
P2p::broadcast(p2p, GossipMessageKind::CosignedBlock, buf).await;
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
// This causes an action on Substrate yet not on any Tributary
|
// This causes an action on Substrate yet not on any Tributary
|
||||||
|
|
|
@ -55,71 +55,112 @@ pub struct CosignedBlock {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||||
pub enum P2pMessageKind {
|
pub enum ReqResMessageKind {
|
||||||
KeepAlive,
|
KeepAlive,
|
||||||
Tributary([u8; 32]),
|
|
||||||
Heartbeat([u8; 32]),
|
Heartbeat([u8; 32]),
|
||||||
Block([u8; 32]),
|
Block([u8; 32]),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReqResMessageKind {
|
||||||
|
pub fn read<R: Read>(reader: &mut R) -> Option<ReqResMessageKind> {
|
||||||
|
let mut kind = [0; 1];
|
||||||
|
reader.read_exact(&mut kind).ok()?;
|
||||||
|
match kind[0] {
|
||||||
|
0 => Some(ReqResMessageKind::KeepAlive),
|
||||||
|
1 => Some({
|
||||||
|
let mut genesis = [0; 32];
|
||||||
|
reader.read_exact(&mut genesis).ok()?;
|
||||||
|
ReqResMessageKind::Heartbeat(genesis)
|
||||||
|
}),
|
||||||
|
2 => Some({
|
||||||
|
let mut genesis = [0; 32];
|
||||||
|
reader.read_exact(&mut genesis).ok()?;
|
||||||
|
ReqResMessageKind::Block(genesis)
|
||||||
|
}),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(&self) -> Vec<u8> {
|
||||||
|
match self {
|
||||||
|
ReqResMessageKind::KeepAlive => vec![0],
|
||||||
|
ReqResMessageKind::Heartbeat(genesis) => {
|
||||||
|
let mut res = vec![1];
|
||||||
|
res.extend(genesis);
|
||||||
|
res
|
||||||
|
}
|
||||||
|
ReqResMessageKind::Block(genesis) => {
|
||||||
|
let mut res = vec![2];
|
||||||
|
res.extend(genesis);
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||||
|
pub enum GossipMessageKind {
|
||||||
|
Tributary([u8; 32]),
|
||||||
CosignedBlock,
|
CosignedBlock,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl GossipMessageKind {
|
||||||
|
pub fn read<R: Read>(reader: &mut R) -> Option<GossipMessageKind> {
|
||||||
|
let mut kind = [0; 1];
|
||||||
|
reader.read_exact(&mut kind).ok()?;
|
||||||
|
match kind[0] {
|
||||||
|
0 => Some({
|
||||||
|
let mut genesis = [0; 32];
|
||||||
|
reader.read_exact(&mut genesis).ok()?;
|
||||||
|
GossipMessageKind::Tributary(genesis)
|
||||||
|
}),
|
||||||
|
1 => Some(GossipMessageKind::CosignedBlock),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(&self) -> Vec<u8> {
|
||||||
|
match self {
|
||||||
|
GossipMessageKind::Tributary(genesis) => {
|
||||||
|
let mut res = vec![0];
|
||||||
|
res.extend(genesis);
|
||||||
|
res
|
||||||
|
}
|
||||||
|
GossipMessageKind::CosignedBlock => {
|
||||||
|
vec![1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||||
|
pub enum P2pMessageKind {
|
||||||
|
ReqRes(ReqResMessageKind),
|
||||||
|
Gossip(GossipMessageKind),
|
||||||
|
}
|
||||||
|
|
||||||
impl P2pMessageKind {
|
impl P2pMessageKind {
|
||||||
fn genesis(&self) -> Option<[u8; 32]> {
|
fn genesis(&self) -> Option<[u8; 32]> {
|
||||||
match self {
|
match self {
|
||||||
P2pMessageKind::KeepAlive | P2pMessageKind::CosignedBlock => None,
|
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) |
|
||||||
P2pMessageKind::Tributary(genesis) |
|
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => None,
|
||||||
P2pMessageKind::Heartbeat(genesis) |
|
P2pMessageKind::ReqRes(
|
||||||
P2pMessageKind::Block(genesis) => Some(*genesis),
|
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
|
||||||
|
) |
|
||||||
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => Some(*genesis),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn serialize(&self) -> Vec<u8> {
|
impl From<ReqResMessageKind> for P2pMessageKind {
|
||||||
match self {
|
fn from(kind: ReqResMessageKind) -> P2pMessageKind {
|
||||||
P2pMessageKind::KeepAlive => vec![0],
|
P2pMessageKind::ReqRes(kind)
|
||||||
P2pMessageKind::Tributary(genesis) => {
|
|
||||||
let mut res = vec![1];
|
|
||||||
res.extend(genesis);
|
|
||||||
res
|
|
||||||
}
|
|
||||||
P2pMessageKind::Heartbeat(genesis) => {
|
|
||||||
let mut res = vec![2];
|
|
||||||
res.extend(genesis);
|
|
||||||
res
|
|
||||||
}
|
|
||||||
P2pMessageKind::Block(genesis) => {
|
|
||||||
let mut res = vec![3];
|
|
||||||
res.extend(genesis);
|
|
||||||
res
|
|
||||||
}
|
|
||||||
P2pMessageKind::CosignedBlock => {
|
|
||||||
vec![4]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn read<R: Read>(reader: &mut R) -> Option<P2pMessageKind> {
|
impl From<GossipMessageKind> for P2pMessageKind {
|
||||||
let mut kind = [0; 1];
|
fn from(kind: GossipMessageKind) -> P2pMessageKind {
|
||||||
reader.read_exact(&mut kind).ok()?;
|
P2pMessageKind::Gossip(kind)
|
||||||
match kind[0] {
|
|
||||||
0 => Some(P2pMessageKind::KeepAlive),
|
|
||||||
1 => Some({
|
|
||||||
let mut genesis = [0; 32];
|
|
||||||
reader.read_exact(&mut genesis).ok()?;
|
|
||||||
P2pMessageKind::Tributary(genesis)
|
|
||||||
}),
|
|
||||||
2 => Some({
|
|
||||||
let mut genesis = [0; 32];
|
|
||||||
reader.read_exact(&mut genesis).ok()?;
|
|
||||||
P2pMessageKind::Heartbeat(genesis)
|
|
||||||
}),
|
|
||||||
3 => Some({
|
|
||||||
let mut genesis = [0; 32];
|
|
||||||
reader.read_exact(&mut genesis).ok()?;
|
|
||||||
P2pMessageKind::Block(genesis)
|
|
||||||
}),
|
|
||||||
4 => Some(P2pMessageKind::CosignedBlock),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,15 +180,19 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||||
|
|
||||||
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
|
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
|
||||||
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>);
|
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>);
|
||||||
async fn receive_raw(&self) -> (Self::Id, Vec<u8>);
|
async fn receive(&self) -> Message<Self>;
|
||||||
|
|
||||||
async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) {
|
async fn send(&self, to: Self::Id, kind: ReqResMessageKind, msg: Vec<u8>) {
|
||||||
let mut actual_msg = kind.serialize();
|
let mut actual_msg = kind.serialize();
|
||||||
actual_msg.extend(msg);
|
actual_msg.extend(msg);
|
||||||
self.send_raw(to, actual_msg).await;
|
self.send_raw(to, actual_msg).await;
|
||||||
}
|
}
|
||||||
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
|
async fn broadcast(&self, kind: impl Send + Into<P2pMessageKind>, msg: Vec<u8>) {
|
||||||
let mut actual_msg = kind.serialize();
|
let kind = kind.into();
|
||||||
|
let mut actual_msg = match kind {
|
||||||
|
P2pMessageKind::ReqRes(kind) => kind.serialize(),
|
||||||
|
P2pMessageKind::Gossip(kind) => kind.serialize(),
|
||||||
|
};
|
||||||
actual_msg.extend(msg);
|
actual_msg.extend(msg);
|
||||||
/*
|
/*
|
||||||
log::trace!(
|
log::trace!(
|
||||||
|
@ -163,35 +208,6 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||||
*/
|
*/
|
||||||
self.broadcast_raw(kind, actual_msg).await;
|
self.broadcast_raw(kind, actual_msg).await;
|
||||||
}
|
}
|
||||||
async fn receive(&self) -> Message<Self> {
|
|
||||||
let (sender, kind, msg) = loop {
|
|
||||||
let (sender, msg) = self.receive_raw().await;
|
|
||||||
if msg.is_empty() {
|
|
||||||
log::error!("empty p2p message from {sender:?}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut msg_ref = msg.as_ref();
|
|
||||||
let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else {
|
|
||||||
log::error!("invalid p2p message kind from {sender:?}");
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
break (sender, kind, msg_ref.to_vec());
|
|
||||||
};
|
|
||||||
/*
|
|
||||||
log::trace!(
|
|
||||||
"received p2p message (kind {})",
|
|
||||||
match kind {
|
|
||||||
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
|
|
||||||
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
|
|
||||||
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
|
|
||||||
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
|
|
||||||
P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(),
|
|
||||||
}
|
|
||||||
);
|
|
||||||
*/
|
|
||||||
Message { sender, kind, msg }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(NetworkBehaviour)]
|
#[derive(NetworkBehaviour)]
|
||||||
|
@ -206,7 +222,7 @@ pub struct LibP2p {
|
||||||
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
|
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
|
||||||
send: Arc<Mutex<mpsc::UnboundedSender<(PeerId, Vec<u8>)>>>,
|
send: Arc<Mutex<mpsc::UnboundedSender<(PeerId, Vec<u8>)>>>,
|
||||||
broadcast: Arc<Mutex<mpsc::UnboundedSender<(P2pMessageKind, Vec<u8>)>>>,
|
broadcast: Arc<Mutex<mpsc::UnboundedSender<(P2pMessageKind, Vec<u8>)>>>,
|
||||||
receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
|
receive: Arc<Mutex<mpsc::UnboundedReceiver<Message<Self>>>>,
|
||||||
}
|
}
|
||||||
impl fmt::Debug for LibP2p {
|
impl fmt::Debug for LibP2p {
|
||||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
@ -502,8 +518,7 @@ impl LibP2p {
|
||||||
msg = broadcast_recv.recv() => {
|
msg = broadcast_recv.recv() => {
|
||||||
let (kind, msg): (P2pMessageKind, Vec<u8>) =
|
let (kind, msg): (P2pMessageKind, Vec<u8>) =
|
||||||
msg.expect("broadcast_recv closed. are we shutting down?");
|
msg.expect("broadcast_recv closed. are we shutting down?");
|
||||||
if matches!(kind, P2pMessageKind::KeepAlive) ||
|
if matches!(kind, P2pMessageKind::ReqRes(_)) {
|
||||||
matches!(kind, P2pMessageKind::Heartbeat(_)) {
|
|
||||||
// Use request/response
|
// Use request/response
|
||||||
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
||||||
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
|
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
|
||||||
|
@ -600,17 +615,27 @@ impl LibP2p {
|
||||||
RrMessage::Request { request, .. } => request,
|
RrMessage::Request { request, .. } => request,
|
||||||
RrMessage::Response { response, .. } => response,
|
RrMessage::Response { response, .. } => response,
|
||||||
};
|
};
|
||||||
receive_send
|
|
||||||
.send((peer, message))
|
let mut msg_ref = message.as_slice();
|
||||||
.expect("receive_send closed. are we shutting down?");
|
let Some(kind) = ReqResMessageKind::read(&mut msg_ref) else { continue };
|
||||||
|
let message = Message {
|
||||||
|
sender: peer,
|
||||||
|
kind: P2pMessageKind::ReqRes(kind),
|
||||||
|
msg: msg_ref.to_vec(),
|
||||||
|
};
|
||||||
|
receive_send.send(message).expect("receive_send closed. are we shutting down?");
|
||||||
}
|
}
|
||||||
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
|
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
|
||||||
GsEvent::Message { propagation_source, message, .. },
|
GsEvent::Message { propagation_source, message, .. },
|
||||||
))) => {
|
))) => {
|
||||||
// TODO: Ban Heartbeat/Blocks received over gossipsub
|
let mut msg_ref = message.data.as_slice();
|
||||||
receive_send
|
let Some(kind) = GossipMessageKind::read(&mut msg_ref) else { continue };
|
||||||
.send((propagation_source, message.data))
|
let message = Message {
|
||||||
.expect("receive_send closed. are we shutting down?");
|
sender: propagation_source,
|
||||||
|
kind: P2pMessageKind::Gossip(kind),
|
||||||
|
msg: msg_ref.to_vec(),
|
||||||
|
};
|
||||||
|
receive_send.send(message).expect("receive_send closed. are we shutting down?");
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
@ -642,12 +667,13 @@ impl LibP2p {
|
||||||
// (where a finalized block only occurs due to network activity), meaning this won't be
|
// (where a finalized block only occurs due to network activity), meaning this won't be
|
||||||
// run
|
// run
|
||||||
() = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
|
() = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
|
||||||
broadcast_raw(
|
time_of_last_p2p_message = Instant::now();
|
||||||
&mut swarm,
|
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
||||||
&mut time_of_last_p2p_message,
|
swarm
|
||||||
None,
|
.behaviour_mut()
|
||||||
P2pMessageKind::KeepAlive.serialize()
|
.reqres
|
||||||
);
|
.send_request(&peer_id, ReqResMessageKind::KeepAlive.serialize());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -700,7 +726,7 @@ impl P2p for LibP2p {
|
||||||
|
|
||||||
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
|
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
|
||||||
// lock acquisition?
|
// lock acquisition?
|
||||||
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
|
async fn receive(&self) -> Message<Self> {
|
||||||
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
|
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -708,7 +734,7 @@ impl P2p for LibP2p {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TributaryP2p for LibP2p {
|
impl TributaryP2p for LibP2p {
|
||||||
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
||||||
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
|
<Self as P2p>::broadcast(self, GossipMessageKind::Tributary(genesis), msg).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -751,7 +777,7 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
|
||||||
.expect("system clock is wrong")
|
.expect("system clock is wrong")
|
||||||
.as_secs();
|
.as_secs();
|
||||||
msg.extend(time.to_le_bytes());
|
msg.extend(time.to_le_bytes());
|
||||||
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
|
P2p::broadcast(&p2p, ReqResMessageKind::Heartbeat(tributary.genesis()), msg).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -795,20 +821,12 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
match msg.kind {
|
match msg.kind {
|
||||||
P2pMessageKind::KeepAlive => {}
|
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
|
||||||
|
|
||||||
P2pMessageKind::Tributary(msg_genesis) => {
|
|
||||||
assert_eq!(msg_genesis, genesis);
|
|
||||||
log::trace!("handling message for tributary {:?}", spec_set);
|
|
||||||
if tributary.tributary.handle_message(&msg.msg).await {
|
|
||||||
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Slash on Heartbeat which justifies a response, since the node
|
// TODO: Slash on Heartbeat which justifies a response, since the node
|
||||||
// obviously was offline and we must now use our bandwidth to compensate for
|
// obviously was offline and we must now use our bandwidth to compensate for
|
||||||
// them?
|
// them?
|
||||||
P2pMessageKind::Heartbeat(msg_genesis) => {
|
P2pMessageKind::ReqRes(ReqResMessageKind::Heartbeat(msg_genesis)) => {
|
||||||
assert_eq!(msg_genesis, genesis);
|
assert_eq!(msg_genesis, genesis);
|
||||||
if msg.msg.len() != 40 {
|
if msg.msg.len() != 40 {
|
||||||
log::error!("validator sent invalid heartbeat");
|
log::error!("validator sent invalid heartbeat");
|
||||||
|
@ -848,13 +866,13 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||||
res.extend(reader.commit(&next).unwrap());
|
res.extend(reader.commit(&next).unwrap());
|
||||||
// Also include the timestamp used within the Heartbeat
|
// Also include the timestamp used within the Heartbeat
|
||||||
res.extend(&msg.msg[32 .. 40]);
|
res.extend(&msg.msg[32 .. 40]);
|
||||||
p2p.send(msg.sender, P2pMessageKind::Block(genesis), res).await;
|
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
P2pMessageKind::Block(msg_genesis) => {
|
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
|
||||||
assert_eq!(msg_genesis, genesis);
|
assert_eq!(msg_genesis, genesis);
|
||||||
let mut msg_ref: &[u8] = msg.msg.as_ref();
|
let mut msg_ref: &[u8] = msg.msg.as_ref();
|
||||||
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
|
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
|
||||||
|
@ -873,7 +891,15 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
P2pMessageKind::CosignedBlock => unreachable!(),
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
|
||||||
|
assert_eq!(msg_genesis, genesis);
|
||||||
|
log::trace!("handling message for tributary {:?}", spec_set);
|
||||||
|
if tributary.tributary.handle_message(&msg.msg).await {
|
||||||
|
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -893,15 +919,16 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||||
loop {
|
loop {
|
||||||
let msg = p2p.receive().await;
|
let msg = p2p.receive().await;
|
||||||
match msg.kind {
|
match msg.kind {
|
||||||
P2pMessageKind::KeepAlive => {}
|
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
|
||||||
P2pMessageKind::Tributary(genesis) |
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) |
|
||||||
P2pMessageKind::Heartbeat(genesis) |
|
P2pMessageKind::ReqRes(
|
||||||
P2pMessageKind::Block(genesis) => {
|
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
|
||||||
|
) => {
|
||||||
if let Some(channel) = channels.read().await.get(&genesis) {
|
if let Some(channel) = channels.read().await.get(&genesis) {
|
||||||
channel.send(msg).unwrap();
|
channel.send(msg).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
P2pMessageKind::CosignedBlock => {
|
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => {
|
||||||
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
|
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
|
||||||
log::error!("received CosignedBlock message with invalidly serialized contents");
|
log::error!("received CosignedBlock message with invalidly serialized contents");
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -14,7 +14,7 @@ use tokio::sync::RwLock;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
processors::{Message, Processors},
|
processors::{Message, Processors},
|
||||||
TributaryP2p, P2pMessageKind, P2p,
|
TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod tributary;
|
pub mod tributary;
|
||||||
|
@ -45,7 +45,10 @@ impl Processors for MemProcessors {
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>);
|
pub struct LocalP2p(
|
||||||
|
usize,
|
||||||
|
pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, P2pMessageKind, Vec<u8>)>>)>>,
|
||||||
|
);
|
||||||
|
|
||||||
impl LocalP2p {
|
impl LocalP2p {
|
||||||
pub fn new(validators: usize) -> Vec<LocalP2p> {
|
pub fn new(validators: usize) -> Vec<LocalP2p> {
|
||||||
|
@ -66,10 +69,12 @@ impl P2p for LocalP2p {
|
||||||
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
|
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
|
||||||
|
|
||||||
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
|
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
|
||||||
self.1.write().await.1[to].push_back((self.0, msg));
|
let mut msg_ref = msg.as_slice();
|
||||||
|
let kind = ReqResMessageKind::read(&mut msg_ref).unwrap();
|
||||||
|
self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec()));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn broadcast_raw(&self, _kind: P2pMessageKind, msg: Vec<u8>) {
|
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
|
||||||
// Content-based deduplication
|
// Content-based deduplication
|
||||||
let mut lock = self.1.write().await;
|
let mut lock = self.1.write().await;
|
||||||
{
|
{
|
||||||
|
@ -81,19 +86,26 @@ impl P2p for LocalP2p {
|
||||||
}
|
}
|
||||||
let queues = &mut lock.1;
|
let queues = &mut lock.1;
|
||||||
|
|
||||||
|
let kind_len = (match kind {
|
||||||
|
P2pMessageKind::ReqRes(kind) => kind.serialize(),
|
||||||
|
P2pMessageKind::Gossip(kind) => kind.serialize(),
|
||||||
|
})
|
||||||
|
.len();
|
||||||
|
let msg = msg[kind_len ..].to_vec();
|
||||||
|
|
||||||
for (i, msg_queue) in queues.iter_mut().enumerate() {
|
for (i, msg_queue) in queues.iter_mut().enumerate() {
|
||||||
if i == self.0 {
|
if i == self.0 {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
msg_queue.push_back((self.0, msg.clone()));
|
msg_queue.push_back((self.0, kind, msg.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
|
async fn receive(&self) -> P2pMessage<Self> {
|
||||||
// This is a cursed way to implement an async read from a Vec
|
// This is a cursed way to implement an async read from a Vec
|
||||||
loop {
|
loop {
|
||||||
if let Some(res) = self.1.write().await.1[self.0].pop_front() {
|
if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() {
|
||||||
return res;
|
return P2pMessage { sender, kind, msg };
|
||||||
}
|
}
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
|
@ -103,6 +115,11 @@ impl P2p for LocalP2p {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TributaryP2p for LocalP2p {
|
impl TributaryP2p for LocalP2p {
|
||||||
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
||||||
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
|
<Self as P2p>::broadcast(
|
||||||
|
self,
|
||||||
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)),
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ use serai_db::MemDb;
|
||||||
use tributary::Tributary;
|
use tributary::Tributary;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
P2pMessageKind, P2p,
|
GossipMessageKind, P2pMessageKind, P2p,
|
||||||
tributary::{Transaction, TributarySpec},
|
tributary::{Transaction, TributarySpec},
|
||||||
tests::LocalP2p,
|
tests::LocalP2p,
|
||||||
};
|
};
|
||||||
|
@ -98,7 +98,7 @@ pub async fn run_tributaries(
|
||||||
for (p2p, tributary) in &mut tributaries {
|
for (p2p, tributary) in &mut tributaries {
|
||||||
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
||||||
match msg.kind {
|
match msg.kind {
|
||||||
P2pMessageKind::Tributary(genesis) => {
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
|
||||||
assert_eq!(genesis, tributary.genesis());
|
assert_eq!(genesis, tributary.genesis());
|
||||||
if tributary.handle_message(&msg.msg).await {
|
if tributary.handle_message(&msg.msg).await {
|
||||||
p2p.broadcast(msg.kind, msg.msg).await;
|
p2p.broadcast(msg.kind, msg.msg).await;
|
||||||
|
@ -173,7 +173,7 @@ async fn tributary_test() {
|
||||||
for (p2p, tributary) in &mut tributaries {
|
for (p2p, tributary) in &mut tributaries {
|
||||||
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
||||||
match msg.kind {
|
match msg.kind {
|
||||||
P2pMessageKind::Tributary(genesis) => {
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
|
||||||
assert_eq!(genesis, tributary.genesis());
|
assert_eq!(genesis, tributary.genesis());
|
||||||
tributary.handle_message(&msg.msg).await;
|
tributary.handle_message(&msg.msg).await;
|
||||||
}
|
}
|
||||||
|
@ -199,7 +199,7 @@ async fn tributary_test() {
|
||||||
for (p2p, tributary) in &mut tributaries {
|
for (p2p, tributary) in &mut tributaries {
|
||||||
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
||||||
match msg.kind {
|
match msg.kind {
|
||||||
P2pMessageKind::Tributary(genesis) => {
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
|
||||||
assert_eq!(genesis, tributary.genesis());
|
assert_eq!(genesis, tributary.genesis());
|
||||||
tributary.handle_message(&msg.msg).await;
|
tributary.handle_message(&msg.msg).await;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue