From 75306babf8fd7a5fe6b158f60debd3a62d4e8d4a Mon Sep 17 00:00:00 2001 From: Boog900 Date: Sun, 5 May 2024 19:22:41 +0000 Subject: [PATCH] dandelion++ lib (#111) * init D++ * init D++ router * working D++ router * add test * D++ tx pool * add more txpool docs * add a txpool builder * add tracing * add more docs * fix doc * reduce test epoch (windows CI fail) * generate first state in config Windows seems to not allows taking a big value from an instant * extend tests * clippy * review comments + more docs * Apply suggestions from code review Co-authored-by: hinto-janai * update Cargo.lock * rename txpool.rs -> pool.rs * review comments * Update p2p/dandelion/src/tests/router.rs Co-authored-by: hinto-janai * Update p2p/dandelion/src/router.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- Cargo.lock | 15 + Cargo.toml | 2 + p2p/dandelion/Cargo.toml | 27 ++ p2p/dandelion/src/config.rs | 149 +++++++++ p2p/dandelion/src/lib.rs | 70 ++++ p2p/dandelion/src/pool.rs | 510 ++++++++++++++++++++++++++++++ p2p/dandelion/src/router.rs | 348 ++++++++++++++++++++ p2p/dandelion/src/tests/mod.rs | 130 ++++++++ p2p/dandelion/src/tests/pool.rs | 42 +++ p2p/dandelion/src/tests/router.rs | 237 ++++++++++++++ p2p/dandelion/src/traits.rs | 49 +++ 11 files changed, 1579 insertions(+) create mode 100644 p2p/dandelion/Cargo.toml create mode 100644 p2p/dandelion/src/config.rs create mode 100644 p2p/dandelion/src/lib.rs create mode 100644 p2p/dandelion/src/pool.rs create mode 100644 p2p/dandelion/src/router.rs create mode 100644 p2p/dandelion/src/tests/mod.rs create mode 100644 p2p/dandelion/src/tests/pool.rs create mode 100644 p2p/dandelion/src/tests/router.rs create mode 100644 p2p/dandelion/src/traits.rs diff --git a/Cargo.lock b/Cargo.lock index 0750040..b08aee5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -656,6 +656,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dandelion_tower" +version = "0.1.0" +dependencies = [ + "futures", + "proptest", + "rand", + "rand_distr", + "thiserror", + "tokio", + "tokio-util", + "tower", + "tracing", +] + [[package]] name = "diff" version = "0.1.13" diff --git a/Cargo.toml b/Cargo.toml index ee2ddc1..48bad34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "net/fixed-bytes", "net/levin", "net/monero-wire", + "p2p/dandelion", "p2p/monero-p2p", "p2p/address-book", "pruning", @@ -60,6 +61,7 @@ paste = { version = "1.0.14", default-features = false } pin-project = { version = "1.1.3", default-features = false } randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false } rand = { version = "0.8.5", default-features = false } +rand_distr = { version = "0.4.3", default-features = false } rayon = { version = "1.9.0", default-features = false } serde_bytes = { version = "0.11.12", default-features = false } serde_json = { version = "1.0.108", default-features = false } diff --git a/p2p/dandelion/Cargo.toml b/p2p/dandelion/Cargo.toml new file mode 100644 index 0000000..a8a0469 --- /dev/null +++ b/p2p/dandelion/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "dandelion_tower" +version = "0.1.0" +edition = "2021" +license = "MIT" +authors = ["Boog900"] + +[features] +default = ["txpool"] +txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"] + +[dependencies] +tower = { workspace = true, features = ["discover", "util"] } +tracing = { workspace = true, features = ["std"] } + +futures = { workspace = true, features = ["std"] } +tokio = { workspace = true, features = ["rt", "sync", "macros"], optional = true} +tokio-util = { workspace = true, features = ["time"], optional = true } + +rand = { workspace = true, features = ["std", "std_rng"] } +rand_distr = { workspace = true, features = ["std"], optional = true } + +thiserror = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync"] } +proptest = { workspace = true, features = ["default"] } \ No newline at end of file diff --git a/p2p/dandelion/src/config.rs b/p2p/dandelion/src/config.rs new file mode 100644 index 0000000..71a4e5b --- /dev/null +++ b/p2p/dandelion/src/config.rs @@ -0,0 +1,149 @@ +use std::{ + ops::{Mul, Neg}, + time::Duration, +}; + +/// When calculating the embargo timeout using the formula: `(-k*(k-1)*hop)/(2*log(1-ep))` +/// +/// (1 - ep) is the probability that a transaction travels for `k` hops before a nodes embargo timeout fires, this constant is (1 - ep). +const EMBARGO_FULL_TRAVEL_PROBABILITY: f64 = 0.90; + +/// The graph type to use for dandelion routing, the dandelion paper recommends [Graph::FourRegular]. +/// +/// The decision between line graphs and 4-regular graphs depend on the priorities of the system, if +/// linkability of transactions is a first order concern then line graphs may be better, however 4-regular graphs +/// can give constant-order privacy benefits against adversaries with knowledge of the graph. +/// +/// See appendix C of the dandelion++ paper. +#[derive(Default, Debug, Copy, Clone)] +pub enum Graph { + /// Line graph. + /// + /// When this is selected one peer will be chosen from the outbound peers each epoch to route transactions + /// to. + /// + /// In general this is not recommend over [`Graph::FourRegular`] but may be better for certain systems. + Line, + /// Quasi-4-Regular. + /// + /// When this is selected two peers will be chosen from the outbound peers each epoch, each stem transaction + /// received will then be sent to one of these two peers. Transactions from the same node will always go to the + /// same peer. + #[default] + FourRegular, +} + +/// The config used to initialize dandelion. +/// +/// One notable missing item from the config is `Tbase` AKA the timeout parameter to prevent black hole +/// attacks. This is removed from the config for simplicity, `Tbase` is calculated using the formula provided +/// in the D++ paper: +/// +/// `(-k*(k-1)*hop)/(2*log(1-ep))` +/// +/// Where `k` is calculated from the fluff probability, `hop` is `time_between_hop` and `ep` is fixed at `0.1`. +/// +#[derive(Debug, Clone, Copy)] +pub struct DandelionConfig { + /// The time it takes for a stem transaction to pass through a node, including network latency. + /// + /// It's better to be safe and put a slightly higher value than lower. + pub time_between_hop: Duration, + /// The duration of an epoch. + pub epoch_duration: Duration, + /// `q` in the dandelion paper, this is the probability that a node will be in the fluff state for + /// a certain epoch. + /// + /// The dandelion paper recommends to make this value small, but the smaller this value, the higher + /// the broadcast latency. + /// + /// It is recommended for this value to be <= `0.2`, this value *MUST* be in range `0.0..=1.0`. + pub fluff_probability: f64, + /// The graph type. + pub graph: Graph, +} + +impl DandelionConfig { + /// Returns the number of outbound peers to use to stem transactions. + /// + /// This value depends on the [`Graph`] chosen. + pub fn number_of_stems(&self) -> usize { + match self.graph { + Graph::Line => 1, + Graph::FourRegular => 2, + } + } + + /// Returns the average embargo timeout, `Tbase` in the dandelion++ paper. + /// + /// This is the average embargo timeout _only including this node_ with `k` nodes also putting an embargo timeout + /// using the exponential distribution, the average until one of them fluffs is `Tbase / k`. + pub fn average_embargo_timeout(&self) -> Duration { + // we set k equal to the expected stem length with this fluff probability. + let k = self.expected_stem_length(); + let time_between_hop = self.time_between_hop.as_secs_f64(); + + Duration::from_secs_f64( + // (-k*(k-1)*hop)/(2*ln(1-ep)) + ((k.neg() * (k - 1.0) * time_between_hop) + / EMBARGO_FULL_TRAVEL_PROBABILITY.ln().mul(2.0)) + .ceil(), + ) + } + + /// Returns the expected length of a stem. + pub fn expected_stem_length(&self) -> f64 { + self.fluff_probability.recip() + } +} + +#[cfg(test)] +mod tests { + use std::{ + f64::consts::E, + ops::{Mul, Neg}, + time::Duration, + }; + + use proptest::{prop_assert, proptest}; + + use super::*; + + #[test] + fn monerod_average_embargo_timeout() { + let cfg = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Default::default(), + fluff_probability: 0.125, + graph: Default::default(), + }; + + assert_eq!(cfg.average_embargo_timeout(), Duration::from_secs(47)); + } + + proptest! { + #[test] + fn embargo_full_travel_probablity_correct(time_between_hop in 1_u64..1_000_000, fluff_probability in 0.000001..1.0) { + let cfg = DandelionConfig { + time_between_hop: Duration::from_millis(time_between_hop), + epoch_duration: Default::default(), + fluff_probability, + graph: Default::default(), + }; + + // assert that the `average_embargo_timeout` is high enough that the probability of `k` nodes + // not diffusing before expected diffusion is greater than or equal to `EMBARGO_FULL_TRAVEL_PROBABLY` + // + // using the formula from in appendix B.5 + let k = cfg.expected_stem_length(); + let time_between_hop = cfg.time_between_hop.as_secs_f64(); + + let average_embargo_timeout = cfg.average_embargo_timeout().as_secs_f64(); + + let probability = + E.powf((k.neg() * (k - 1.0) * time_between_hop) / average_embargo_timeout.mul(2.0)); + + prop_assert!(probability >= EMBARGO_FULL_TRAVEL_PROBABILITY, "probability = {probability}, average_embargo_timeout = {average_embargo_timeout}"); + } + } +} diff --git a/p2p/dandelion/src/lib.rs b/p2p/dandelion/src/lib.rs new file mode 100644 index 0000000..f162724 --- /dev/null +++ b/p2p/dandelion/src/lib.rs @@ -0,0 +1,70 @@ +//! # Dandelion Tower +//! +//! This crate implements [dandelion++](https://arxiv.org/pdf/1805.11060.pdf), using [`tower`]. +//! +//! This crate provides 2 [`tower::Service`]s, a [`DandelionRouter`] and a [`DandelionPool`](pool::DandelionPool). +//! The router is pretty minimal and only handles the absolute necessary data to route transactions, whereas the +//! pool keeps track of all data necessary for dandelion++ but requires you to provide a backing tx-pool. +//! +//! This split was done not because the [`DandelionPool`](pool::DandelionPool) is unnecessary but because it is hard +//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPool`](pool::DandelionPool) +//! requires you to implement part of the paper yourself. +//! +//! # Features +//! +//! This crate only has one feature `txpool` which enables [`DandelionPool`](pool::DandelionPool). +//! +//! # Needed Services +//! +//! To use this crate you need to provide a few types. +//! +//! ## Diffuse Service +//! +//! This service should implement diffusion, which is sending the transaction to every peer, with each peer +//! having a timer using the exponential distribution and batch sending all txs that were queued in that time. +//! +//! The diffuse service should have a request of [`DiffuseRequest`](traits::DiffuseRequest) and it's error +//! should be [`tower::BoxError`]. +//! +//! ## Outbound Peer Discoverer +//! +//! The outbound peer [`Discover`](tower::discover::Discover) should provide a stream of randomly selected outbound +//! peers, these peers will then be used to route stem txs to. +//! +//! The peers will not be returned anywhere, so it is recommended to wrap them in some sort of drop guard that returns +//! them back to a peer set. +//! +//! ## Peer Service +//! +//! This service represents a connection to an individual peer, this should be returned from the Outbound Peer +//! Discover. This should immediately send the transaction to the peer when requested, i.e. it should _not_ set +//! a timer. +//! +//! The diffuse service should have a request of [`StemRequest`](traits::StemRequest) and it's error +//! should be [`tower::BoxError`]. +//! +//! ## Backing Pool +//! +//! ([`DandelionPool`](pool::DandelionPool) only) +//! +//! This service is a backing tx-pool, in memory or on disk. +//! The backing pool should have a request of [`TxStoreRequest`](traits::TxStoreRequest) and a response of +//! [`TxStoreResponse`](traits::TxStoreResponse), with an error of [`tower::BoxError`]. +//! +//! Users should keep a handle to the backing pool to request data from it, when requesting data you _must_ +//! make sure you only look in the public pool if you are going to be giving data to peers, as stem transactions +//! must stay private. +//! +//! When removing data, for example because of a new block, you can remove from both pools provided it doesn't leak +//! any data about stem transactions. You will probably want to set up a task that monitors the tx pool for stuck transactions, +//! transactions that slipped in just as one was removed etc, this crate does not handle that. +mod config; +#[cfg(feature = "txpool")] +pub mod pool; +mod router; +#[cfg(test)] +mod tests; +pub mod traits; + +pub use config::*; +pub use router::*; diff --git a/p2p/dandelion/src/pool.rs b/p2p/dandelion/src/pool.rs new file mode 100644 index 0000000..eddcc67 --- /dev/null +++ b/p2p/dandelion/src/pool.rs @@ -0,0 +1,510 @@ +//! # Dandelion++ Pool +//! +//! This module contains [`DandelionPool`] which is a thin wrapper around a backing transaction store, +//! which fully implements the dandelion++ protocol. +//! +//! ### How To Get Txs From [`DandelionPool`]. +//! +//! [`DandelionPool`] does not provide a full tx-pool API. You cannot retrieve transactions from it or +//! check what transactions are in it, to do this you must keep a handle to the backing transaction store +//! yourself. +//! +//! The reason for this is, the [`DandelionPool`] will only itself be passing these requests onto the backing +//! pool, so it makes sense to remove the "middle man". +//! +//! ### Keep Stem Transactions Hidden +//! +//! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden. +//! So handle any requests to the tx-pool like the stem side of the pool does not exist. +//! +use std::{ + collections::{HashMap, HashSet}, + future::Future, + hash::Hash, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{FutureExt, StreamExt}; +use rand::prelude::*; +use rand_distr::Exp; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinSet, +}; +use tokio_util::{sync::PollSender, time::DelayQueue}; +use tower::{Service, ServiceExt}; +use tracing::Instrument; + +use crate::{ + traits::{TxStoreRequest, TxStoreResponse}, + DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState, +}; + +/// Start the [`DandelionPool`]. +/// +/// This function spawns the [`DandelionPool`] and returns [`DandelionPoolService`] which can be used to send +/// requests to the pool. +/// +/// ### Args +/// +/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPool`]. +/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow +/// user to customise routing functionality. +/// - `backing_pool` is the backing transaction storage service +/// - `config` is [`DandelionConfig`]. +pub fn start_dandelion_pool( + buffer_size: usize, + dandelion_router: R, + backing_pool: P, + config: DandelionConfig, +) -> DandelionPoolService +where + Tx: Clone + Send + 'static, + TxID: Hash + Eq + Clone + Send + 'static, + PID: Hash + Eq + Clone + Send + 'static, + P: Service< + TxStoreRequest, + Response = TxStoreResponse, + Error = tower::BoxError, + > + Send + + 'static, + P::Future: Send + 'static, + R: Service, Response = State, Error = DandelionRouterError> + + Send + + 'static, + R::Future: Send + 'static, +{ + let (tx, rx) = mpsc::channel(buffer_size); + + let pool = DandelionPool { + dandelion_router, + backing_pool, + routing_set: JoinSet::new(), + stem_origins: HashMap::new(), + embargo_timers: DelayQueue::new(), + embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(), + config, + _tx: PhantomData, + }; + + let span = tracing::debug_span!("dandelion_pool"); + + tokio::spawn(pool.run(rx).instrument(span)); + + DandelionPoolService { + tx: PollSender::new(tx), + } +} + +#[derive(Copy, Clone, Debug, thiserror::Error)] +#[error("The dandelion pool was shutdown")] +pub struct DandelionPoolShutDown; + +/// An incoming transaction for the [`DandelionPool`] to handle. +/// +/// Users may notice there is no way to check if the dandelion-pool wants a tx according to an inventory message like seen +/// in Bitcoin, only having a request for a full tx. Users should look in the *public* backing pool to handle inv messages, +/// and request txs even if they are in the stem pool. +pub struct IncomingTx { + /// The transaction. + /// + /// It is recommended to put this in an [`Arc`](std::sync::Arc) as it needs to be cloned to send to the backing + /// tx pool and [`DandelionRouter`](crate::DandelionRouter) + pub tx: Tx, + /// The transaction ID. + pub tx_id: TxID, + /// The routing state of this transaction. + pub tx_state: TxState, +} + +/// The dandelion tx pool service. +#[derive(Clone)] +pub struct DandelionPoolService { + /// The channel to [`DandelionPool`]. + tx: PollSender<(IncomingTx, oneshot::Sender<()>)>, +} + +impl Service> for DandelionPoolService +where + Tx: Clone + Send, + TxID: Hash + Eq + Clone + Send + 'static, + PID: Hash + Eq + Clone + Send + 'static, +{ + type Response = (); + type Error = DandelionPoolShutDown; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown) + } + + fn call(&mut self, req: IncomingTx) -> Self::Future { + // although the channel isn't sending anything we want to wait for the request to be handled before continuing. + let (tx, rx) = oneshot::channel(); + + let res = self + .tx + .send_item((req, tx)) + .map_err(|_| DandelionPoolShutDown); + + async move { + res?; + rx.await.expect("Oneshot dropped before response!"); + + Ok(()) + } + .boxed() + } +} + +/// The dandelion++ tx pool. +/// +/// See the [module docs](self) for more. +pub struct DandelionPool { + /// The dandelion++ router + dandelion_router: R, + /// The backing tx storage. + backing_pool: P, + /// The set of tasks that are running the future returned from `dandelion_router`. + routing_set: JoinSet<(TxID, Result>)>, + + /// The origin of stem transactions. + stem_origins: HashMap>, + + /// Current stem pool embargo timers. + embargo_timers: DelayQueue, + /// The distrobution to sample to get embargo timers. + embargo_dist: Exp, + + /// The d++ config. + config: DandelionConfig, + + _tx: PhantomData, +} + +impl DandelionPool +where + Tx: Clone + Send, + TxID: Hash + Eq + Clone + Send + 'static, + PID: Hash + Eq + Clone + Send + 'static, + P: Service< + TxStoreRequest, + Response = TxStoreResponse, + Error = tower::BoxError, + >, + P::Future: Send + 'static, + R: Service, Response = State, Error = DandelionRouterError>, + R::Future: Send + 'static, +{ + /// Stores the tx in the backing pools stem pool, setting the embargo timer, stem origin and steming the tx. + async fn store_tx_and_stem( + &mut self, + tx: Tx, + tx_id: TxID, + from: Option, + ) -> Result<(), tower::BoxError> { + self.backing_pool + .ready() + .await? + .call(TxStoreRequest::Store( + tx.clone(), + tx_id.clone(), + State::Stem, + )) + .await?; + + let embargo_timer = self.embargo_dist.sample(&mut thread_rng()); + tracing::debug!( + "Setting embargo timer for stem tx: {} seconds.", + embargo_timer + ); + self.embargo_timers + .insert(tx_id.clone(), Duration::from_secs_f64(embargo_timer)); + + self.stem_tx(tx, tx_id, from).await + } + + /// Stems the tx, setting the stem origin, if it wasn't already set. + /// + /// This function does not add the tx to the backing pool. + async fn stem_tx( + &mut self, + tx: Tx, + tx_id: TxID, + from: Option, + ) -> Result<(), tower::BoxError> { + if let Some(peer) = &from { + self.stem_origins + .entry(tx_id.clone()) + .or_default() + .insert(peer.clone()); + } + + let state = from + .map(|from| TxState::Stem { from }) + .unwrap_or(TxState::Local); + + let fut = self + .dandelion_router + .ready() + .await? + .call(DandelionRouteReq { + tx, + state: state.clone(), + }); + + self.routing_set + .spawn(fut.map(|res| (tx_id, res.map_err(|_| state)))); + Ok(()) + } + + /// Stores the tx in the backing pool and fluffs the tx, removing the stem data for this tx. + async fn store_and_fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> { + // fluffs the tx first to prevent timing attacks where we could fluff at different average times + // depending on if the tx was in the stem pool already or not. + // Massively overkill but this is a minimal change. + self.fluff_tx(tx.clone(), tx_id.clone()).await?; + + // Remove the tx from the maps used during the stem phase. + self.stem_origins.remove(&tx_id); + + self.backing_pool + .ready() + .await? + .call(TxStoreRequest::Store(tx, tx_id, State::Fluff)) + .await?; + + // The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the + // map. These timers should be relatively short, so it shouldn't be a problem. + //self.embargo_timers.try_remove(&tx_id); + + Ok(()) + } + + /// Fluffs a tx, does not add the tx to the tx pool. + async fn fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> { + let fut = self + .dandelion_router + .ready() + .await? + .call(DandelionRouteReq { + tx, + state: TxState::Fluff, + }); + + self.routing_set + .spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff)))); + Ok(()) + } + + /// Function to handle an incoming [`DandelionPoolRequest::IncomingTx`]. + async fn handle_incoming_tx( + &mut self, + tx: Tx, + tx_state: TxState, + tx_id: TxID, + ) -> Result<(), tower::BoxError> { + let TxStoreResponse::Contains(have_tx) = self + .backing_pool + .ready() + .await? + .call(TxStoreRequest::Contains(tx_id.clone())) + .await? + else { + panic!("Backing tx pool responded with wrong response for request."); + }; + // If we have already fluffed this tx then we don't need to do anything. + if have_tx == Some(State::Fluff) { + tracing::debug!("Already fluffed incoming tx, ignoring."); + return Ok(()); + } + + match tx_state { + TxState::Stem { from } => { + if self + .stem_origins + .get(&tx_id) + .is_some_and(|peers| peers.contains(&from)) + { + tracing::debug!("Received stem tx twice from same peer, fluffing it"); + // The same peer sent us a tx twice, fluff it. + self.promote_and_fluff_tx(tx_id).await + } else { + // This could be a new tx or it could have already been stemed, but we still stem it again + // unless the same peer sends us a tx twice. + tracing::debug!("Steming incoming tx"); + self.store_tx_and_stem(tx, tx_id, Some(from)).await + } + } + TxState::Fluff => { + tracing::debug!("Fluffing incoming tx"); + self.store_and_fluff_tx(tx, tx_id).await + } + TxState::Local => { + // If we have already stemed this tx then nothing to do. + if have_tx.is_some() { + tracing::debug!("Received a local tx that we already have, skipping"); + return Ok(()); + } + tracing::debug!("Steming local transaction"); + self.store_tx_and_stem(tx, tx_id, None).await + } + } + } + + /// Promotes a tx to the clear pool. + async fn promote_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> { + // Remove the tx from the maps used during the stem phase. + self.stem_origins.remove(&tx_id); + + // The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the + // map. These timers should be relatively short, so it shouldn't be a problem. + //self.embargo_timers.try_remove(&tx_id); + + self.backing_pool + .ready() + .await? + .call(TxStoreRequest::Promote(tx_id)) + .await?; + + Ok(()) + } + + /// Promotes a tx to the public fluff pool and fluffs the tx. + async fn promote_and_fluff_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> { + tracing::debug!("Promoting transaction to public pool and fluffing it."); + + let TxStoreResponse::Transaction(tx) = self + .backing_pool + .ready() + .await? + .call(TxStoreRequest::Get(tx_id.clone())) + .await? + else { + panic!("Backing tx pool responded with wrong response for request."); + }; + + let Some((tx, state)) = tx else { + tracing::debug!("Could not find tx, skipping."); + return Ok(()); + }; + + if state == State::Fluff { + tracing::debug!("Transaction already fluffed, skipping."); + return Ok(()); + } + + self.promote_tx(tx_id.clone()).await?; + self.fluff_tx(tx, tx_id).await + } + + /// Returns a tx stored in the fluff _OR_ stem pool. + async fn get_tx_from_pool(&mut self, tx_id: TxID) -> Result, tower::BoxError> { + let TxStoreResponse::Transaction(tx) = self + .backing_pool + .ready() + .await? + .call(TxStoreRequest::Get(tx_id)) + .await? + else { + panic!("Backing tx pool responded with wrong response for request."); + }; + + Ok(tx.map(|tx| tx.0)) + } + + /// Starts the [`DandelionPool`]. + async fn run( + mut self, + mut rx: mpsc::Receiver<(IncomingTx, oneshot::Sender<()>)>, + ) { + tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config); + + // On start up we just fluff all txs left in the stem pool. + let Ok(TxStoreResponse::IDs(ids)) = (&mut self.backing_pool) + .oneshot(TxStoreRequest::IDsInStemPool) + .await + else { + tracing::error!("Failed to get transactions in stem pool."); + return; + }; + + tracing::debug!( + "Fluffing {} txs that are currently in the stem pool", + ids.len() + ); + + for id in ids { + if let Err(e) = self.promote_and_fluff_tx(id).await { + tracing::error!("Failed to fluff tx in the stem pool at start up, {e}."); + return; + } + } + + loop { + tracing::trace!("Waiting for next event."); + tokio::select! { + // biased to handle current txs before routing new ones. + biased; + Some(fired) = self.embargo_timers.next() => { + tracing::debug!("Embargo timer fired, did not see stem tx in time."); + + let tx_id = fired.into_inner(); + if let Err(e) = self.promote_and_fluff_tx(tx_id).await { + tracing::error!("Error handling fired embargo timer: {e}"); + return; + } + } + Some(Ok((tx_id, res))) = self.routing_set.join_next() => { + tracing::trace!("Received d++ routing result."); + + let res = match res { + Ok(State::Fluff) => { + tracing::debug!("Transaction was fluffed upgrading it to the public pool."); + self.promote_tx(tx_id).await + } + Err(tx_state) => { + tracing::debug!("Error routing transaction, trying again."); + + match self.get_tx_from_pool(tx_id.clone()).await { + Ok(Some(tx)) => match tx_state { + TxState::Fluff => self.fluff_tx(tx, tx_id).await, + TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await, + TxState::Local => self.stem_tx(tx, tx_id, None).await, + } + Err(e) => Err(e), + _ => continue, + } + } + Ok(State::Stem) => continue, + }; + + if let Err(e) = res { + tracing::error!("Error handling transaction routing return: {e}"); + return; + } + } + req = rx.recv() => { + tracing::debug!("Received new tx to route."); + + let Some((IncomingTx { tx, tx_state, tx_id }, res_tx)) = req else { + return; + }; + + if let Err(e) = self.handle_incoming_tx(tx, tx_state, tx_id).await { + let _ = res_tx.send(()); + + tracing::error!("Error handling transaction in dandelion pool: {e}"); + return; + } + let _ = res_tx.send(()); + + } + } + } + } +} diff --git a/p2p/dandelion/src/router.rs b/p2p/dandelion/src/router.rs new file mode 100644 index 0000000..61e962c --- /dev/null +++ b/p2p/dandelion/src/router.rs @@ -0,0 +1,348 @@ +//! # Dandelion++ Router +//! +//! This module contains [`DandelionRouter`] which is a [`Service`]. It that handles keeping the +//! current dandelion++ [`State`] and deciding where to send transactions based on their [`TxState`]. +//! +//! ### What The Router Does Not Do +//! +//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling +//! loops in the stem. It is up to implementers to do this if they decide not top use [`DandelionPool`](crate::pool::DandelionPool) +//! +use std::{ + collections::HashMap, + future::Future, + hash::Hash, + marker::PhantomData, + pin::Pin, + task::{ready, Context, Poll}, + time::Instant, +}; + +use futures::TryFutureExt; +use rand::{distributions::Bernoulli, prelude::*, thread_rng}; +use tower::{ + discover::{Change, Discover}, + Service, +}; + +use crate::{ + traits::{DiffuseRequest, StemRequest}, + DandelionConfig, +}; + +/// An error returned from the [`DandelionRouter`] +#[derive(thiserror::Error, Debug)] +pub enum DandelionRouterError { + /// This error is probably recoverable so the request should be retried. + #[error("Peer chosen to route stem txs to had an err: {0}.")] + PeerError(tower::BoxError), + /// The broadcast service returned an error. + #[error("Broadcast service returned an err: {0}.")] + BroadcastError(tower::BoxError), + /// The outbound peer discoverer returned an error, this is critical. + #[error("The outbound peer discoverer returned an err: {0}.")] + OutboundPeerDiscoverError(tower::BoxError), + /// The outbound peer discoverer returned [`None`]. + #[error("The outbound peer discoverer exited.")] + OutboundPeerDiscoverExited, +} + +/// The dandelion++ state. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum State { + /// Fluff state, in this state we are diffusing stem transactions to all peers. + Fluff, + /// Stem state, in this state we are stemming stem transactions to a single outbound peer. + Stem, +} + +/// The routing state of a transaction. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum TxState { + /// Fluff state. + Fluff, + /// Stem state. + Stem { + /// The peer who sent us this transaction's ID. + from: ID, + }, + /// Local - the transaction originated from our node. + Local, +} + +/// A request to route a transaction. +pub struct DandelionRouteReq { + /// The transaction. + pub tx: Tx, + /// The transaction state. + pub state: TxState, +} + +/// The dandelion router service. +pub struct DandelionRouter { + // pub(crate) is for tests + /// A [`Discover`] where we can get outbound peers from. + outbound_peer_discover: Pin>, + /// A [`Service`] which handle broadcasting (diffusing) transactions. + broadcast_svc: B, + + /// The current state. + current_state: State, + /// The time at which this epoch started. + epoch_start: Instant, + + /// The stem our local transactions will be sent to. + local_route: Option, + /// A [`HashMap`] linking peer's IDs to IDs in `stem_peers`. + stem_routes: HashMap, + /// Peers we are using for stemming. + /// + /// This will contain peers, even in [`State::Fluff`] to allow us to stem [`TxState::Local`] + /// transactions. + pub(crate) stem_peers: HashMap, + + /// The distribution to sample to get the [`State`], true is [`State::Fluff`]. + state_dist: Bernoulli, + + /// The config. + config: DandelionConfig, + + /// The routers tracing span. + span: tracing::Span, + + _tx: PhantomData, +} + +impl DandelionRouter +where + ID: Hash + Eq + Clone, + P: Discover, + B: Service, Error = tower::BoxError>, + S: Service, Error = tower::BoxError>, +{ + /// Creates a new [`DandelionRouter`], with the provided services and config. + /// + /// # Panics + /// This function panics if [`DandelionConfig::fluff_probability`] is not `0.0..=1.0`. + pub fn new(broadcast_svc: B, outbound_peer_discover: P, config: DandelionConfig) -> Self { + // get the current state + let state_dist = Bernoulli::new(config.fluff_probability) + .expect("Fluff probability was not between 0 and 1"); + + let current_state = if state_dist.sample(&mut thread_rng()) { + State::Fluff + } else { + State::Stem + }; + + DandelionRouter { + outbound_peer_discover: Box::pin(outbound_peer_discover), + broadcast_svc, + current_state, + epoch_start: Instant::now(), + local_route: None, + stem_routes: HashMap::new(), + stem_peers: HashMap::new(), + state_dist, + config, + span: tracing::debug_span!("dandelion_router", state = ?current_state), + _tx: PhantomData, + } + } + + /// This function gets the number of outbound peers from the [`Discover`] required for the selected [`Graph`](crate::Graph). + fn poll_prepare_graph( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + let peers_needed = match self.current_state { + State::Stem => self.config.number_of_stems(), + // When in the fluff state we only need one peer, the one for our txs. + State::Fluff => 1, + }; + + while self.stem_peers.len() < peers_needed { + match ready!(self + .outbound_peer_discover + .as_mut() + .poll_discover(cx) + .map_err(DandelionRouterError::OutboundPeerDiscoverError)) + .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)?? + { + Change::Insert(key, svc) => { + self.stem_peers.insert(key, svc); + } + Change::Remove(key) => { + self.stem_peers.remove(&key); + } + } + } + + Poll::Ready(Ok(())) + } + + fn fluff_tx(&mut self, tx: Tx) -> B::Future { + self.broadcast_svc.call(DiffuseRequest(tx)) + } + + fn stem_tx(&mut self, tx: Tx, from: ID) -> S::Future { + loop { + let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| { + self.stem_peers + .iter() + .choose(&mut thread_rng()) + .expect("No peers in `stem_peers` was poll_ready called?") + .0 + .clone() + }); + + let Some(peer) = self.stem_peers.get_mut(stem_route) else { + self.stem_routes.remove(&from); + continue; + }; + + return peer.call(StemRequest(tx)); + } + } + + fn stem_local_tx(&mut self, tx: Tx) -> S::Future { + loop { + let stem_route = self.local_route.get_or_insert_with(|| { + self.stem_peers + .iter() + .choose(&mut thread_rng()) + .expect("No peers in `stem_peers` was poll_ready called?") + .0 + .clone() + }); + + let Some(peer) = self.stem_peers.get_mut(stem_route) else { + self.local_route.take(); + continue; + }; + + return peer.call(StemRequest(tx)); + } + } +} + +/* +## Generics ## + +Tx: The tx type +ID: Peer Id type - unique identifier for nodes. +P: Peer Set discover - where we can get outbound peers from +B: Broadcast service - where we send txs to get diffused. +S: The Peer service - handles routing messages to a single node. + */ +impl Service> for DandelionRouter +where + ID: Hash + Eq + Clone, + P: Discover, + B: Service, Error = tower::BoxError>, + B::Future: Send + 'static, + S: Service, Error = tower::BoxError>, + S::Future: Send + 'static, +{ + type Response = State; + type Error = DandelionRouterError; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.epoch_start.elapsed() > self.config.epoch_duration { + // clear all the stem routing data. + self.stem_peers.clear(); + self.stem_routes.clear(); + self.local_route.take(); + + self.current_state = if self.state_dist.sample(&mut thread_rng()) { + State::Fluff + } else { + State::Stem + }; + + self.span + .record("state", format!("{:?}", self.current_state)); + tracing::debug!(parent: &self.span, "Starting new d++ epoch",); + + self.epoch_start = Instant::now(); + } + + let mut peers_pending = false; + + let span = &self.span; + + self.stem_peers + .retain(|_, peer_svc| match peer_svc.poll_ready(cx) { + Poll::Ready(res) => res + .inspect_err(|e| { + tracing::debug!( + parent: span, + "Peer returned an error on `poll_ready`: {e}, removing from router.", + ) + }) + .is_ok(), + Poll::Pending => { + // Pending peers should be kept - they have not errored yet. + peers_pending = true; + true + } + }); + + if peers_pending { + return Poll::Pending; + } + + // now we have removed the failed peers check if we still have enough for the graph chosen. + ready!(self.poll_prepare_graph(cx)?); + + ready!(self + .broadcast_svc + .poll_ready(cx) + .map_err(DandelionRouterError::BroadcastError)?); + + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: DandelionRouteReq) -> Self::Future { + tracing::trace!(parent: &self.span, "Handling route request."); + + match req.state { + TxState::Fluff => Box::pin( + self.fluff_tx(req.tx) + .map_ok(|_| State::Fluff) + .map_err(DandelionRouterError::BroadcastError), + ), + TxState::Stem { from } => match self.current_state { + State::Fluff => { + tracing::debug!(parent: &self.span, "Fluffing stem tx."); + + Box::pin( + self.fluff_tx(req.tx) + .map_ok(|_| State::Fluff) + .map_err(DandelionRouterError::BroadcastError), + ) + } + State::Stem => { + tracing::trace!(parent: &self.span, "Steming transaction"); + + Box::pin( + self.stem_tx(req.tx, from) + .map_ok(|_| State::Stem) + .map_err(DandelionRouterError::PeerError), + ) + } + }, + TxState::Local => { + tracing::debug!(parent: &self.span, "Steming local tx."); + + Box::pin( + self.stem_local_tx(req.tx) + .map_ok(|_| State::Stem) + .map_err(DandelionRouterError::PeerError), + ) + } + } + } +} diff --git a/p2p/dandelion/src/tests/mod.rs b/p2p/dandelion/src/tests/mod.rs new file mode 100644 index 0000000..1f3ba3e --- /dev/null +++ b/p2p/dandelion/src/tests/mod.rs @@ -0,0 +1,130 @@ +mod pool; +mod router; + +use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc}; + +use futures::TryStreamExt; +use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tower::{ + discover::{Discover, ServiceList}, + util::service_fn, + Service, ServiceExt, +}; + +use crate::{ + traits::{TxStoreRequest, TxStoreResponse}, + State, +}; + +pub fn mock_discover_svc() -> ( + impl Discover< + Key = usize, + Service = impl Service< + Req, + Future = impl Future> + Send + 'static, + Error = tower::BoxError, + > + Send + + 'static, + Error = tower::BoxError, + >, + UnboundedReceiver<(u64, Req)>, +) { + let (tx, rx) = mpsc::unbounded_channel(); + + let discover = ServiceList::new((0..).map(move |i| { + let tx_2 = tx.clone(); + + service_fn(move |req| { + tx_2.send((i, req)).unwrap(); + + async move { Ok::<(), tower::BoxError>(()) } + }) + })) + .map_err(Into::into); + + (discover, rx) +} + +pub fn mock_broadcast_svc() -> ( + impl Service< + Req, + Future = impl Future> + Send + 'static, + Error = tower::BoxError, + > + Send + + 'static, + UnboundedReceiver, +) { + let (tx, rx) = mpsc::unbounded_channel(); + + ( + service_fn(move |req| { + tx.send(req).unwrap(); + + async move { Ok::<(), tower::BoxError>(()) } + }), + rx, + ) +} + +#[allow(clippy::type_complexity)] // just test code. +pub fn mock_in_memory_backing_pool< + Tx: Clone + Send + 'static, + TxID: Clone + Hash + Eq + Send + 'static, +>() -> ( + impl Service< + TxStoreRequest, + Response = TxStoreResponse, + Future = impl Future, tower::BoxError>> + + Send + + 'static, + Error = tower::BoxError, + > + Send + + 'static, + Arc>>, +) { + let txs = Arc::new(std::sync::Mutex::new(HashMap::new())); + let txs_2 = txs.clone(); + + ( + service_fn(move |req: TxStoreRequest| { + let txs = txs.clone(); + async move { + match req { + TxStoreRequest::Store(tx, tx_id, state) => { + txs.lock().unwrap().insert(tx_id, (tx, state)); + Ok(TxStoreResponse::Ok) + } + TxStoreRequest::Get(tx_id) => { + let tx_state = txs.lock().unwrap().get(&tx_id).cloned(); + Ok(TxStoreResponse::Transaction(tx_state)) + } + TxStoreRequest::Contains(tx_id) => Ok(TxStoreResponse::Contains( + txs.lock().unwrap().get(&tx_id).map(|res| res.1), + )), + TxStoreRequest::IDsInStemPool => { + // horribly inefficient, but it's test code :) + let ids = txs + .lock() + .unwrap() + .iter() + .filter(|(_, (_, state))| matches!(state, State::Stem)) + .map(|tx| tx.0.clone()) + .collect::>(); + + Ok(TxStoreResponse::IDs(ids)) + } + TxStoreRequest::Promote(tx_id) => { + let _ = txs + .lock() + .unwrap() + .get_mut(&tx_id) + .map(|tx| tx.1 = State::Fluff); + + Ok(TxStoreResponse::Ok) + } + } + } + }), + txs_2, + ) +} diff --git a/p2p/dandelion/src/tests/pool.rs b/p2p/dandelion/src/tests/pool.rs new file mode 100644 index 0000000..4a7c87d --- /dev/null +++ b/p2p/dandelion/src/tests/pool.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use crate::{ + pool::{start_dandelion_pool, IncomingTx}, + DandelionConfig, DandelionRouter, Graph, TxState, +}; + +use super::*; + +#[tokio::test] +async fn basic_functionality() { + let config = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(0), // make every poll ready change state + fluff_probability: 0.2, + graph: Graph::FourRegular, + }; + + let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, _outbound_rx) = mock_discover_svc(); + + let router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + let (pool_svc, pool) = mock_in_memory_backing_pool(); + + let mut pool_svc = start_dandelion_pool(15, router, pool_svc, config); + + pool_svc + .ready() + .await + .unwrap() + .call(IncomingTx { + tx: 0_usize, + tx_id: 1_usize, + tx_state: TxState::Fluff, + }) + .await + .unwrap(); + + assert!(pool.lock().unwrap().contains_key(&1)); + assert!(broadcast_rx.try_recv().is_ok()) +} diff --git a/p2p/dandelion/src/tests/router.rs b/p2p/dandelion/src/tests/router.rs new file mode 100644 index 0000000..0170edb --- /dev/null +++ b/p2p/dandelion/src/tests/router.rs @@ -0,0 +1,237 @@ +use std::time::Duration; + +use tower::{Service, ServiceExt}; + +use crate::{DandelionConfig, DandelionRouteReq, DandelionRouter, Graph, TxState}; + +use super::*; + +/// make sure the number of stemm peers is correct. +#[tokio::test] +async fn number_stems_correct() { + let mut config = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(60_000), + fluff_probability: 0.0, // we want to be in stem state + graph: Graph::FourRegular, + }; + + let (broadcast_svc, _broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, _outbound_rx) = mock_discover_svc(); + + let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + const FROM_PEER: usize = 20; + + // send a request to make the generic bound inference work, without specifying types. + router + .ready() + .await + .unwrap() + .call(DandelionRouteReq { + tx: 0_usize, + state: TxState::Stem { from: FROM_PEER }, + }) + .await + .unwrap(); + + assert_eq!(router.stem_peers.len(), 2); // Graph::FourRegular + + config.graph = Graph::Line; + + let (broadcast_svc, _broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, _outbound_rx) = mock_discover_svc(); + + let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + // send a request to make the generic bound inference work, without specifying types. + router + .ready() + .await + .unwrap() + .call(DandelionRouteReq { + tx: 0_usize, + state: TxState::Stem { from: FROM_PEER }, + }) + .await + .unwrap(); + + assert_eq!(router.stem_peers.len(), 1); // Graph::Line +} + +/// make sure a tx from the same peer goes to the same peer. +#[tokio::test] +async fn routes_consistent() { + let config = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(60_000), + fluff_probability: 0.0, // we want this test to always stem + graph: Graph::FourRegular, + }; + + let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc(); + + let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + const FROM_PEER: usize = 20; + + // The router will panic if it attempts to flush. + broadcast_rx.close(); + + for _ in 0..30 { + router + .ready() + .await + .unwrap() + .call(DandelionRouteReq { + tx: 0_usize, + state: TxState::Stem { from: FROM_PEER }, + }) + .await + .unwrap(); + } + + let mut stem_peer = None; + let mut total_txs = 0; + + while let Ok((peer_id, _)) = outbound_rx.try_recv() { + let stem_peer = stem_peer.get_or_insert(peer_id); + // make sure all peer ids are the same (so the same svc got all txs). + assert_eq!(*stem_peer, peer_id); + + total_txs += 1; + } + + assert_eq!(total_txs, 30); +} + +/// make sure local txs always stem - even in fluff state. +#[tokio::test] +async fn local_always_stem() { + let config = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(60_000), + fluff_probability: 1.0, // we want this test to always fluff + graph: Graph::FourRegular, + }; + + let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc(); + + let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + // The router will panic if it attempts to flush. + broadcast_rx.close(); + + for _ in 0..30 { + router + .ready() + .await + .unwrap() + .call(DandelionRouteReq { + tx: 0_usize, + state: TxState::Local, + }) + .await + .unwrap(); + } + + let mut stem_peer = None; + let mut total_txs = 0; + + while let Ok((peer_id, _)) = outbound_rx.try_recv() { + let stem_peer = stem_peer.get_or_insert(peer_id); + // make sure all peer ids are the same (so the same svc got all txs). + assert_eq!(*stem_peer, peer_id); + + total_txs += 1; + } + + assert_eq!(total_txs, 30); +} + +/// make sure local txs always stem - even in fluff state. +#[tokio::test] +async fn stem_txs_fluff_in_state_fluff() { + let config = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(60_000), + fluff_probability: 1.0, // we want this test to always fluff + graph: Graph::FourRegular, + }; + + let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc(); + + let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + const FROM_PEER: usize = 20; + + // The router will panic if it attempts to stem. + outbound_rx.close(); + + for _ in 0..30 { + router + .ready() + .await + .unwrap() + .call(DandelionRouteReq { + tx: 0_usize, + state: TxState::Stem { from: FROM_PEER }, + }) + .await + .unwrap(); + } + + let mut total_txs = 0; + + while broadcast_rx.try_recv().is_ok() { + total_txs += 1; + } + + assert_eq!(total_txs, 30); +} + +/// make sure we get all txs sent to the router out in a stem or a fluff. +#[tokio::test] +async fn random_routing() { + let config = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(0), // make every poll ready change state + fluff_probability: 0.2, + graph: Graph::FourRegular, + }; + + let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc(); + let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc(); + + let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); + + for _ in 0..3000 { + router + .ready() + .await + .unwrap() + .call(DandelionRouteReq { + tx: 0_usize, + state: TxState::Stem { + from: rand::random(), + }, + }) + .await + .unwrap(); + } + + let mut total_txs = 0; + + while broadcast_rx.try_recv().is_ok() { + total_txs += 1; + } + + while outbound_rx.try_recv().is_ok() { + total_txs += 1; + } + + assert_eq!(total_txs, 3000); +} diff --git a/p2p/dandelion/src/traits.rs b/p2p/dandelion/src/traits.rs new file mode 100644 index 0000000..c84ecf0 --- /dev/null +++ b/p2p/dandelion/src/traits.rs @@ -0,0 +1,49 @@ +/// A request to diffuse a transaction to all connected peers. +/// +/// This crate does not handle diffusion it is left to implementers. +pub struct DiffuseRequest(pub Tx); + +/// A request sent to a single peer to stem this transaction. +pub struct StemRequest(pub Tx); + +#[cfg(feature = "txpool")] +/// A request sent to the backing transaction pool storage. +pub enum TxStoreRequest { + /// A request to store a transaction with the ID to store it under and the pool to store it in. + /// + /// If the tx is already in the pool then do nothing, unless the tx is in the stem pool then move it + /// to the fluff pool, _if this request state is fluff_. + Store(Tx, TxID, crate::State), + /// A request to retrieve a `Tx` with the given ID from the pool, should not remove that tx from the pool. + /// + /// Must return [`TxStoreResponse::Transaction`] + Get(TxID), + /// Promote a transaction from the stem pool to the public pool. + /// + /// If the tx is already in the fluff pool do nothing. + /// + /// This should not error if the tx isn't in the pool at all. + Promote(TxID), + /// A request to check if a translation is in the pool. + /// + /// Must return [`TxStoreResponse::Contains`] + Contains(TxID), + /// Returns the IDs of all the transaction in the stem pool. + /// + /// Must return [`TxStoreResponse::IDs`] + IDsInStemPool, +} + +#[cfg(feature = "txpool")] +/// A response sent back from the backing transaction pool. +pub enum TxStoreResponse { + /// A generic ok response. + Ok, + /// A response containing a [`Option`] for if the transaction is in the pool (Some) or not (None) and in which pool + /// the tx is in. + Contains(Option), + /// A response containing a requested transaction. + Transaction(Option<(Tx, crate::State)>), + /// A list of transaction IDs. + IDs(Vec), +}