add more docs

This commit is contained in:
Boog900 2024-05-15 16:30:46 +01:00
parent 7881013380
commit df6d6ec187
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
2 changed files with 42 additions and 6 deletions

View file

@ -35,9 +35,12 @@ use crate::constants::{
MAX_TXS_IN_BROADCAST_CHANNEL, SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT, MAX_TXS_IN_BROADCAST_CHANNEL, SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT,
}; };
/// The configuration for the [`BroadcastSvc`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct BroadcastConfig { pub struct BroadcastConfig {
/// The average number of seconds between diffusion flushes for outbound connections.
pub diffusion_flush_average_seconds_outbound: Duration, pub diffusion_flush_average_seconds_outbound: Duration,
/// The average number of seconds between diffusion flushes for inbound connections.
pub diffusion_flush_average_seconds_inbound: Duration, pub diffusion_flush_average_seconds_inbound: Duration,
} }
@ -141,9 +144,12 @@ pub enum BroadcastRequest<N: NetworkZone> {
/// will only be broadcast to that sub-set of peers, if it is [`None`] then the transaction will /// will only be broadcast to that sub-set of peers, if it is [`None`] then the transaction will
/// be broadcast to all peers. /// be broadcast to all peers.
Transaction { Transaction {
/// The serialised tx to broadcast.
tx_bytes: Bytes, tx_bytes: Bytes,
/// The direction of peers to broadcast this tx to, if [`None`] it will be sent to all peers.
direction: Option<ConnectionDirection>, direction: Option<ConnectionDirection>,
skip_peers: Arc<std::sync::Mutex<HashSet<InternalPeerID<N::Addr>>>>, /// The peer on this network that told us about the tx.
received_from: Option<InternalPeerID<N::Addr>>,
}, },
} }
@ -180,12 +186,12 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
} }
BroadcastRequest::Transaction { BroadcastRequest::Transaction {
tx_bytes, tx_bytes,
skip_peers, received_from,
direction, direction,
} => { } => {
let nex_tx_info = BroadcastTxInfo { let nex_tx_info = BroadcastTxInfo {
tx: tx_bytes, tx: tx_bytes,
skip_peers, received_from,
}; };
let _ = match direction { let _ = match direction {
@ -219,6 +225,7 @@ impl<T: Clone> Clone for CloneableBroadcastRecover<T> {
} }
} }
/// A new block to broadcast.
#[derive(Clone)] #[derive(Clone)]
struct NewBlockInfo { struct NewBlockInfo {
/// The block. /// The block.
@ -227,26 +234,39 @@ struct NewBlockInfo {
current_blockchain_height: u64, current_blockchain_height: u64,
} }
/// A new transaction to broadcast.
#[derive(Clone)] #[derive(Clone)]
struct BroadcastTxInfo<N: NetworkZone> { struct BroadcastTxInfo<N: NetworkZone> {
/// The tx.
tx: Bytes, tx: Bytes,
skip_peers: Arc<std::sync::Mutex<HashSet<InternalPeerID<N::Addr>>>>, /// The peer that sent us this tx (if the peer is on this network).
received_from: Option<InternalPeerID<N::Addr>>,
} }
/// A [`Stream`] that returns [`BroadcastMessage`] to broadcast to a peer.
///
/// This is given to the connection task to await on for broadcast messages.
#[pin_project::pin_project] #[pin_project::pin_project]
pub struct BroadcastMessageStream<N: NetworkZone> { pub struct BroadcastMessageStream<N: NetworkZone> {
/// The peer that is holding this stream.
addr: InternalPeerID<N::Addr>, addr: InternalPeerID<N::Addr>,
/// The channel where new blocks are received.
#[pin] #[pin]
new_block_watch: WatchStream<NewBlockInfo>, new_block_watch: WatchStream<NewBlockInfo>,
/// The channel where txs to broadcast are received.
tx_broadcast_channel: broadcast::Receiver<BroadcastTxInfo<N>>, tx_broadcast_channel: broadcast::Receiver<BroadcastTxInfo<N>>,
/// The distribution to generate the wait time before the next transaction
/// diffusion flush.
diffusion_flush_dist: Exp<f64>, diffusion_flush_dist: Exp<f64>,
/// A [`Sleep`] that will awake when it's time to broadcast txs.
#[pin] #[pin]
next_flush: Sleep, next_flush: Sleep,
} }
impl<N: NetworkZone> BroadcastMessageStream<N> { impl<N: NetworkZone> BroadcastMessageStream<N> {
/// Creates a new [`BroadcastMessageStream`]
fn new( fn new(
addr: InternalPeerID<N::Addr>, addr: InternalPeerID<N::Addr>,
diffusion_flush_dist: Exp<f64>, diffusion_flush_dist: Exp<f64>,
@ -258,6 +278,7 @@ impl<N: NetworkZone> BroadcastMessageStream<N> {
Self { Self {
addr, addr,
// We don't want to broadcast the message currently in the queue.
new_block_watch: WatchStream::from_changes(new_block_watch), new_block_watch: WatchStream::from_changes(new_block_watch),
tx_broadcast_channel, tx_broadcast_channel,
diffusion_flush_dist, diffusion_flush_dist,
@ -272,6 +293,7 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut this = self.project();
// Prioritise blocks.
if let Poll::Ready(res) = this.new_block_watch.poll_next(cx) { if let Poll::Ready(res) = this.new_block_watch.poll_next(cx) {
let Some(block) = res else { let Some(block) = res else {
return Poll::Ready(None); return Poll::Ready(None);
@ -281,6 +303,7 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
b: BlockCompleteEntry { b: BlockCompleteEntry {
pruned: false, pruned: false,
block: block.block_bytes, block: block.block_bytes,
// This is a full fluffy block these values do not need to be set.
block_weight: 0, block_weight: 0,
txs: TransactionBlobs::None, txs: TransactionBlobs::None,
}, },
@ -322,6 +345,8 @@ impl<N: NetworkZone> Stream for BroadcastMessageStream<N> {
} }
} }
/// Returns a list of new transactions to broadcast and a [`bool`] for if there are more txs in the queue
/// that won't fit in the current batch.
fn get_txs_to_broadcast<N: NetworkZone>( fn get_txs_to_broadcast<N: NetworkZone>(
addr: &InternalPeerID<N::Addr>, addr: &InternalPeerID<N::Addr>,
broadcast_rx: &mut broadcast::Receiver<BroadcastTxInfo<N>>, broadcast_rx: &mut broadcast::Receiver<BroadcastTxInfo<N>>,
@ -336,7 +361,8 @@ fn get_txs_to_broadcast<N: NetworkZone>(
loop { loop {
match broadcast_rx.try_recv() { match broadcast_rx.try_recv() {
Ok(txs) => { Ok(txs) => {
if txs.skip_peers.lock().unwrap().contains(addr) { if txs.received_from.is_some_and(|from| &from == addr) {
// If we are the one that sent this tx don't broadcast it back to us.
continue; continue;
} }

View file

@ -9,10 +9,20 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
/// The timeout for when we fail to find a peer to connect to. /// The timeout for when we fail to find a peer to connect to.
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
/// The default amount of time between inbound diffusion flushes.
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
/// The default amount of time between outbound diffusion flushes.
///
/// Shorter than inbound as we control these connections.
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND: Duration = Duration::from_millis(2500); pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND: Duration = Duration::from_millis(2500);
pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10_000_000; /// This size limit on [`NewTransactions`](monero_wire::protocol::NewTransactions) messages that we create.
pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10 * 1024 * 1024;
/// The amount of transactions in the broadcast queue. When this value is hit old transactions will be dropped from
/// the queue.
///
/// Because of internal implementation details this value is _always_ hit, i.e. transactions will not be dropped until
/// 50 more after it are added.
pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50;