diff --git a/Cargo.lock b/Cargo.lock index 08edc496..57e438de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1314,15 +1314,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "cbor4ii" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b4c883b9cc4757b061600d39001d4d0232bece4a3174696cf8f58a14db107d" -dependencies = [ - "serde", -] - [[package]] name = "cc" version = "1.0.88" @@ -4129,7 +4120,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8e3b4d67870478db72bac87bfc260ee6641d0734e0e3e275798f089c3fecfd4" dependencies = [ "async-trait", - "cbor4ii", "futures", "instant", "libp2p-core", @@ -4137,7 +4127,6 @@ dependencies = [ "libp2p-swarm", "log", "rand", - "serde", "smallvec", "void", ] diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index d12c3933..ae4e2be7 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -51,7 +51,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim futures-util = { version = "0.3", default-features = false, features = ["std"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } -libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "cbor", "request-response", "gossipsub", "macros"] } +libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] } [dev-dependencies] tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] } diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 5e56073a..d147b588 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,7 +1,7 @@ use core::{time::Duration, fmt}; use std::{ sync::Arc, - io::Read, + io::{self, Read}, collections::{HashSet, HashMap}, time::{SystemTime, Instant}, }; @@ -15,13 +15,12 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorS use serai_db::Db; -use futures_util::StreamExt; +use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt}; use tokio::{ sync::{Mutex, RwLock, mpsc, broadcast}, time::sleep, }; -// TODO: Remove cbor use libp2p::{ core::multiaddr::{Protocol, Multiaddr}, identity::Keypair, @@ -29,7 +28,8 @@ use libp2p::{ tcp::Config as TcpConfig, noise, yamux, request_response::{ - Config as RrConfig, Message as RrMessage, Event as RrEvent, cbor::Behaviour as RrBehavior, + Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig, + Behaviour as RrBehavior, }, gossipsub::{ IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, @@ -44,6 +44,8 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p}; use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; +// Block size limit + 1 KB of space for signatures/metadata +const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; const LIBP2P_TOPIC: &str = "serai-coordinator"; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] @@ -210,9 +212,66 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { } } +#[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] +struct RrCodec; +#[async_trait] +impl RrCodecTrait for RrCodec { + type Protocol = &'static str; + type Request = Vec; + type Response = Vec; + + async fn read_request( + &mut self, + _: &Self::Protocol, + io: &mut R, + ) -> io::Result> { + let mut len = [0; 4]; + io.read_exact(&mut len).await?; + let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?"); + if len > MAX_LIBP2P_MESSAGE_SIZE { + Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?; + } + // This may be a non-trivial allocation easily causable + // While we could chunk the read, meaning we only perform the allocation as bandwidth is used, + // the max message size should be sufficiently sane + let mut buf = vec![0; len]; + io.read_exact(&mut buf).await?; + Ok(buf) + } + async fn read_response( + &mut self, + proto: &Self::Protocol, + io: &mut R, + ) -> io::Result> { + self.read_request(proto, io).await + } + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut W, + req: Vec, + ) -> io::Result<()> { + io.write_all( + &u32::try_from(req.len()) + .map_err(|_| io::Error::other("request length exceeded 2**32"))? + .to_le_bytes(), + ) + .await?; + io.write_all(&req).await + } + async fn write_response( + &mut self, + proto: &Self::Protocol, + io: &mut W, + res: Vec, + ) -> io::Result<()> { + self.write_request(proto, io, res).await + } +} + #[derive(NetworkBehaviour)] struct Behavior { - reqres: RrBehavior, Vec>, + reqres: RrBehavior, gossipsub: GsBehavior, } @@ -233,9 +292,6 @@ impl fmt::Debug for LibP2p { impl LibP2p { #[allow(clippy::new_without_default)] pub fn new(serai: Arc) -> Self { - // Block size limit + 1 KB of space for signatures/metadata - const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024; - log::info!("creating a libp2p instance"); let throwaway_key_pair = Keypair::generate_ed25519();