Use multiple LibP2P topics

We still need a peering protocol... Hopefully, we can read peers off of the
Substrate node's DHT.
This commit is contained in:
Luke Parker 2023-11-18 20:37:53 -05:00
parent 25066437da
commit be48dcc4a4
No known key found for this signature in database
2 changed files with 100 additions and 20 deletions

View file

@ -58,6 +58,16 @@ pub enum P2pMessageKind {
}
impl P2pMessageKind {
fn genesis(&self) -> Option<[u8; 32]> {
match self {
P2pMessageKind::KeepAlive => None,
P2pMessageKind::Tributary(genesis) => Some(*genesis),
P2pMessageKind::Heartbeat(genesis) => Some(*genesis),
P2pMessageKind::Block(genesis) => Some(*genesis),
P2pMessageKind::CosignedBlock => None,
}
}
fn serialize(&self) -> Vec<u8> {
match self {
P2pMessageKind::KeepAlive => vec![0],
@ -119,14 +129,17 @@ pub struct Message<P: P2p> {
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
type Id: Send + Sync + Clone + Copy + fmt::Debug;
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
async fn broadcast_raw(&self, msg: Vec<u8>);
async fn subscribe(&self, genesis: [u8; 32]);
async fn unsubscribe(&self, genesis: [u8; 32]);
async fn send_raw(&self, to: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>);
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>);
async fn receive_raw(&self) -> (Self::Id, Vec<u8>);
async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
actual_msg.extend(msg);
self.send_raw(to, actual_msg).await;
self.send_raw(to, kind.genesis(), actual_msg).await;
}
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize();
@ -143,7 +156,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
}
);
*/
self.broadcast_raw(actual_msg).await;
self.broadcast_raw(kind.genesis(), actual_msg).await;
}
async fn receive(&self) -> Message<Self> {
let (sender, kind, msg) = loop {
@ -185,10 +198,11 @@ struct Behavior {
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct LibP2p(
Arc<Mutex<mpsc::UnboundedSender<Vec<u8>>>>,
Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
);
pub struct LibP2p {
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, [u8; 32])>>>,
broadcast: Arc<Mutex<mpsc::UnboundedSender<(Option<[u8; 32]>, Vec<u8>)>>>,
receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
}
impl fmt::Debug for LibP2p {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("LibP2p").finish_non_exhaustive()
@ -240,8 +254,7 @@ impl LibP2p {
)
.unwrap();
// Uses a single topic to prevent being a BTC validator only connected to ETH validators,
// unable to communicate with other BTC validators
// Subscribe to the base topic
let topic = IdentTopic::new(LIBP2P_TOPIC);
gossipsub.subscribe(&topic).unwrap();
@ -274,6 +287,11 @@ impl LibP2p {
let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
let (receive_send, receive_recv) = mpsc::unbounded_channel();
let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel();
fn topic_for_genesis(genesis: [u8; 32]) -> IdentTopic {
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(genesis)))
}
tokio::spawn({
let mut time_of_last_p2p_message = Instant::now();
@ -282,12 +300,19 @@ impl LibP2p {
async fn broadcast_raw(
p2p: &mut Swarm<Behavior>,
time_of_last_p2p_message: &mut Instant,
genesis: Option<[u8; 32]>,
msg: Vec<u8>,
) {
// Update the time of last message
*time_of_last_p2p_message = Instant::now();
match p2p.behaviour_mut().gossipsub.publish(IdentTopic::new(LIBP2P_TOPIC), msg.clone()) {
let topic = if let Some(genesis) = genesis {
topic_for_genesis(genesis)
} else {
IdentTopic::new(LIBP2P_TOPIC)
};
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
@ -307,12 +332,32 @@ impl LibP2p {
tokio::select! {
biased;
// Subscribe to any new topics
topic = subscribe_recv.recv() => {
let (subscribe, topic) = topic.expect("subscribe_recv closed. are we shutting down?");
if subscribe {
swarm
.behaviour_mut()
.gossipsub
.subscribe(&topic_for_genesis(topic))
.unwrap();
} else {
swarm
.behaviour_mut()
.gossipsub
.unsubscribe(&topic_for_genesis(topic))
.unwrap();
}
}
// Handle any queued outbound messages
msg = broadcast_recv.recv() => {
let (genesis, msg) = msg.expect("broadcast_recv closed. are we shutting down?");
broadcast_raw(
&mut swarm,
&mut time_of_last_p2p_message,
msg.expect("broadcast_recv closed. are we shutting down?")
genesis,
msg,
).await;
}
@ -363,6 +408,7 @@ impl LibP2p {
broadcast_raw(
&mut swarm,
&mut time_of_last_p2p_message,
None,
P2pMessageKind::KeepAlive.serialize()
).await;
}
@ -371,7 +417,11 @@ impl LibP2p {
}
});
LibP2p(Arc::new(Mutex::new(broadcast_send)), Arc::new(Mutex::new(receive_recv)))
LibP2p {
subscribe: Arc::new(Mutex::new(subscribe_send)),
broadcast: Arc::new(Mutex::new(broadcast_send)),
receive: Arc::new(Mutex::new(receive_recv)),
}
}
}
@ -379,18 +429,41 @@ impl LibP2p {
impl P2p for LibP2p {
type Id = PeerId;
async fn send_raw(&self, _: Self::Id, msg: Vec<u8>) {
self.broadcast_raw(msg).await;
async fn subscribe(&self, genesis: [u8; 32]) {
self
.subscribe
.lock()
.await
.send((true, genesis))
.expect("subscribe_send closed. are we shutting down?");
}
async fn broadcast_raw(&self, msg: Vec<u8>) {
self.0.lock().await.send(msg).expect("broadcast_send closed. are we shutting down?");
async fn unsubscribe(&self, genesis: [u8; 32]) {
self
.subscribe
.lock()
.await
.send((false, genesis))
.expect("subscribe_send closed. are we shutting down?");
}
async fn send_raw(&self, _: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>) {
self.broadcast_raw(genesis, msg).await;
}
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>) {
self
.broadcast
.lock()
.await
.send((genesis, msg))
.expect("broadcast_send closed. are we shutting down?");
}
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
// lock acquisition?
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
self.1.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
}
}
@ -473,6 +546,9 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
let (send, mut recv) = mpsc::unbounded_channel();
channels.write().await.insert(genesis, send);
// Subscribe to the topic for this tributary
p2p.subscribe(genesis).await;
// Per-Tributary P2P message handler
tokio::spawn({
let p2p = p2p.clone();
@ -595,6 +671,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
TributaryEvent::TributaryRetired(set) => {
if let Some(genesis) = set_to_genesis.remove(&set) {
channels.write().await.remove(&genesis);
p2p.unsubscribe(genesis).await;
}
}
}

View file

@ -62,11 +62,14 @@ impl LocalP2p {
impl P2p for LocalP2p {
type Id = usize;
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
async fn subscribe(&self, _genesis: [u8; 32]) {}
async fn unsubscribe(&self, _genesis: [u8; 32]) {}
async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
self.1.write().await.1[to].push_back((self.0, msg));
}
async fn broadcast_raw(&self, msg: Vec<u8>) {
async fn broadcast_raw(&self, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
// Content-based deduplication
let mut lock = self.1.write().await;
{