From 78810133809fbd7ca13659274bb7299bf82cd9ed Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Wed, 15 May 2024 01:47:44 +0100 Subject: [PATCH] add broadcast svc --- p2p/cuprate-p2p/src/broadcast.rs | 368 +++++++++++++++++++++++++++++++ p2p/cuprate-p2p/src/constants.rs | 8 + p2p/cuprate-p2p/src/lib.rs | 1 + 3 files changed, 377 insertions(+) create mode 100644 p2p/cuprate-p2p/src/broadcast.rs diff --git a/p2p/cuprate-p2p/src/broadcast.rs b/p2p/cuprate-p2p/src/broadcast.rs new file mode 100644 index 00000000..b03980f9 --- /dev/null +++ b/p2p/cuprate-p2p/src/broadcast.rs @@ -0,0 +1,368 @@ +//! # Broadcast Router +//! +//! This module handles broadcasting messages to multiple peers with the [`BroadcastSvc`]. +use std::{ + collections::HashSet, + future::{ready, Future, Ready}, + pin::{pin, Pin}, + sync::Arc, + task::{ready, Context, Poll}, + time::Duration, +}; + +use bytes::Bytes; +use futures::Stream; +use rand::prelude::*; +use rand_distr::Exp; +use tokio::{ + sync::{ + broadcast::{self, error::TryRecvError}, + watch, + }, + time::{sleep_until, Instant, Sleep}, +}; +use tokio_stream::wrappers::WatchStream; +use tower::Service; + +use monero_p2p::{client::InternalPeerID, BroadcastMessage, ConnectionDirection, NetworkZone}; +use monero_wire::{ + common::{BlockCompleteEntry, TransactionBlobs}, + protocol::{NewFluffyBlock, NewTransactions}, +}; + +use crate::constants::{ + DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND, DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND, + MAX_TXS_IN_BROADCAST_CHANNEL, SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT, +}; + +#[derive(Debug, Clone)] +pub struct BroadcastConfig { + pub diffusion_flush_average_seconds_outbound: Duration, + pub diffusion_flush_average_seconds_inbound: Duration, +} + +impl Default for BroadcastConfig { + fn default() -> Self { + Self { + diffusion_flush_average_seconds_inbound: DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND, + diffusion_flush_average_seconds_outbound: DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND, + } + } +} + +/// Initialise the [`BroadcastSvc`] and the functions to produce [`BroadcastMessageStream`]s. +/// +/// This function will return in order: +/// - The [`BroadcastSvc`] +/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **outbound** peers. +/// - A function that takes in [`InternalPeerID`]s and produces [`BroadcastMessageStream`]s to give to **inbound** peers. +pub fn init_broadcast_channels( + config: &BroadcastConfig, +) -> ( + BroadcastSvc, + impl Fn(InternalPeerID) -> BroadcastMessageStream + Clone + Send + 'static, + impl Fn(InternalPeerID) -> BroadcastMessageStream + Clone + Send + 'static, +) { + let outbound_dist = Exp::new( + 1.0 / config + .diffusion_flush_average_seconds_outbound + .as_secs_f64(), + ) + .unwrap(); + let inbound_dist = + Exp::new(1.0 / config.diffusion_flush_average_seconds_inbound.as_secs_f64()).unwrap(); + + // Set a default value for init - the broadcast streams given to the peer tasks will only broadcast from this channel when the value + // changes so no peer will get sent this. + let (block_watch_sender, block_watch_receiver) = watch::channel(NewBlockInfo { + block_bytes: Default::default(), + current_blockchain_height: 0, + }); + + // create the inbound/outbound broadcast channels. + let (tx_broadcast_channel_outbound_sender, tx_broadcast_channel_outbound_receiver) = + broadcast::channel(MAX_TXS_IN_BROADCAST_CHANNEL); + let (tx_broadcast_channel_inbound_sender, tx_broadcast_channel_inbound_receiver) = + broadcast::channel(MAX_TXS_IN_BROADCAST_CHANNEL); + + // create the broadcast service. + let broadcast_svc = BroadcastSvc { + new_block_watch: block_watch_sender, + tx_broadcast_channel_outbound: tx_broadcast_channel_outbound_sender, + tx_broadcast_channel_inbound: tx_broadcast_channel_inbound_sender, + }; + + // wrap the tx broadcast channels in a wrapper that impls Clone so the closures later on impl clone. + let tx_channel_outbound_receiver_wrapped = + CloneableBroadcastRecover(tx_broadcast_channel_outbound_receiver); + let tx_channel_inbound_receiver_wrapped = + CloneableBroadcastRecover(tx_broadcast_channel_inbound_receiver); + + // Create the closures that will be used to start the broadcast streams that the connection task will hold to listen + // for messages to broadcast. + let block_watch_receiver_cloned = block_watch_receiver.clone(); + let outbound_stream_maker = move |addr| { + BroadcastMessageStream::new( + addr, + outbound_dist, + block_watch_receiver_cloned.clone(), + tx_channel_outbound_receiver_wrapped.clone().0, + ) + }; + + let inbound_stream_maker = move |addr| { + BroadcastMessageStream::new( + addr, + inbound_dist, + block_watch_receiver.clone(), + tx_channel_inbound_receiver_wrapped.clone().0, + ) + }; + + (broadcast_svc, outbound_stream_maker, inbound_stream_maker) +} + +/// A request to broadcast some data to all connected peers or a sub-set like all inbound or all outbound. +/// +/// Only certain P2P messages are supported here: [`NewFluffyBlock`] and [`NewTransactions`]. These are the only +/// P2P messages that make sense to broadcast to multiple peers. +/// +/// [`NewBlock`](monero_wire::protocol::NewBlock) has been excluded as monerod has had fluffy blocks for a while and +/// Cuprate sets fluffy blocks as a requirement during handshakes. +pub enum BroadcastRequest { + /// Broadcast a block to the network. The block will be broadcast as a fluffy block to all peers. + Block { + /// The block. + block_bytes: Bytes, + /// The current chain height - will be 1 more than the blocks' height. + current_blockchain_height: u64, + }, + /// Broadcast transactions to the network. If a [`ConnectionDirection`] is set the transaction + /// will only be broadcast to that sub-set of peers, if it is [`None`] then the transaction will + /// be broadcast to all peers. + Transaction { + tx_bytes: Bytes, + direction: Option, + skip_peers: Arc>>>, + }, +} + +pub struct BroadcastSvc { + new_block_watch: watch::Sender, + tx_broadcast_channel_outbound: broadcast::Sender>, + tx_broadcast_channel_inbound: broadcast::Sender>, +} + +impl Service> for BroadcastSvc { + type Response = (); + type Error = std::convert::Infallible; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: BroadcastRequest) -> Self::Future { + match req { + BroadcastRequest::Block { + block_bytes, + current_blockchain_height, + } => { + tracing::debug!( + "queuing block at chain height {} for broadcast", + current_blockchain_height + ); + + self.new_block_watch.send_replace(NewBlockInfo { + block_bytes, + current_blockchain_height, + }); + } + BroadcastRequest::Transaction { + tx_bytes, + skip_peers, + direction, + } => { + let nex_tx_info = BroadcastTxInfo { + tx: tx_bytes, + skip_peers, + }; + + let _ = match direction { + Some(ConnectionDirection::InBound) => { + self.tx_broadcast_channel_inbound.send(nex_tx_info) + } + Some(ConnectionDirection::OutBound) => { + self.tx_broadcast_channel_outbound.send(nex_tx_info) + } + None => { + let _ = self.tx_broadcast_channel_outbound.send(nex_tx_info.clone()); + self.tx_broadcast_channel_inbound.send(nex_tx_info) + } + }; + } + } + + ready(Ok(())) + } +} + +/// A wrapper type that impls [`Clone`] for [`broadcast::Receiver`]. +/// +/// The clone impl just calls [`Receiver::resubscribe`](broadcast::Receiver::resubscribe), which isn't _exactly_ +/// a clone but is what we need for our use case. +struct CloneableBroadcastRecover(broadcast::Receiver); + +impl Clone for CloneableBroadcastRecover { + fn clone(&self) -> Self { + Self(self.0.resubscribe()) + } +} + +#[derive(Clone)] +struct NewBlockInfo { + /// The block. + block_bytes: Bytes, + /// The current chain height - will be 1 more than the blocks' height. + current_blockchain_height: u64, +} + +#[derive(Clone)] +struct BroadcastTxInfo { + tx: Bytes, + skip_peers: Arc>>>, +} + +#[pin_project::pin_project] +pub struct BroadcastMessageStream { + addr: InternalPeerID, + + #[pin] + new_block_watch: WatchStream, + tx_broadcast_channel: broadcast::Receiver>, + + diffusion_flush_dist: Exp, + #[pin] + next_flush: Sleep, +} + +impl BroadcastMessageStream { + fn new( + addr: InternalPeerID, + diffusion_flush_dist: Exp, + new_block_watch: watch::Receiver, + tx_broadcast_channel: broadcast::Receiver>, + ) -> Self { + let next_flush = Instant::now() + + Duration::from_secs_f64(diffusion_flush_dist.sample(&mut thread_rng())); + + Self { + addr, + new_block_watch: WatchStream::from_changes(new_block_watch), + tx_broadcast_channel, + diffusion_flush_dist, + next_flush: sleep_until(next_flush), + } + } +} + +impl Stream for BroadcastMessageStream { + type Item = BroadcastMessage; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + if let Poll::Ready(res) = this.new_block_watch.poll_next(cx) { + let Some(block) = res else { + return Poll::Ready(None); + }; + + let block_mes = NewFluffyBlock { + b: BlockCompleteEntry { + pruned: false, + block: block.block_bytes, + block_weight: 0, + txs: TransactionBlobs::None, + }, + current_blockchain_height: block.current_blockchain_height, + }; + + return Poll::Ready(Some(BroadcastMessage::NewFluffyBlock(block_mes))); + } + + ready!(this.next_flush.as_mut().poll(cx)); + + let (txs, more_available) = get_txs_to_broadcast::(this.addr, this.tx_broadcast_channel); + + let next_flush = if more_available { + // If there are more txs to broadcast then set the next flush for now so we get woken up straight away. + Instant::now() + } else { + Instant::now() + + Duration::from_secs_f64(this.diffusion_flush_dist.sample(&mut thread_rng())) + }; + + let next_flush = sleep_until(next_flush); + this.next_flush.set(next_flush); + + if let Some(txs) = txs { + tracing::debug!( + "Diffusion flush timer expired, diffusing {} txs", + txs.txs.len() + ); + // no need to poll next_flush as we are ready now. + Poll::Ready(Some(BroadcastMessage::NewTransaction(txs))) + } else { + tracing::trace!("Diffusion flush timer expired but no txs to diffuse"); + // poll next_flush now to register the waker with it + // the waker will already be registered with the block broadcast channel. + let _ = this.next_flush.poll(cx); + Poll::Pending + } + } +} + +fn get_txs_to_broadcast( + addr: &InternalPeerID, + broadcast_rx: &mut broadcast::Receiver>, +) -> (Option, bool) { + let mut new_txs = NewTransactions { + txs: vec![], + dandelionpp_fluff: true, + padding: Bytes::new(), + }; + let mut total_size = 0; + + loop { + match broadcast_rx.try_recv() { + Ok(txs) => { + if txs.skip_peers.lock().unwrap().contains(addr) { + continue; + } + + total_size += txs.tx.len(); + + new_txs.txs.push(txs.tx); + + if total_size > SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT { + return (Some(new_txs), true); + } + } + Err(e) => match e { + TryRecvError::Empty | TryRecvError::Closed => { + if new_txs.txs.is_empty() { + return (None, false); + } + return (Some(new_txs), false); + } + TryRecvError::Lagged(lag) => { + tracing::debug!( + "{} transaction broadcast messages were missed, continuing.", + lag + ); + continue; + } + }, + } + } +} diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 58e263fa..c90fe9eb 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -8,3 +8,11 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; /// The timeout for when we fail to find a peer to connect to. pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); + +pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); + +pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND: Duration = Duration::from_millis(2500); + +pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10_000_000; + +pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index 0c53b78f..0b87fb48 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -7,6 +7,7 @@ //! #![allow(dead_code)] +mod broadcast; pub mod client_pool; pub mod config; pub mod connection_maintainer;