Mirror of https://github.com/Cuprate/cuprate.git (synced 2024-11-16 15:58:17 +00:00)
dandelion++ lib (#111)
* init D++
* init D++ router
* working D++ router
* add test
* D++ tx pool
* add more txpool docs
* add a txpool builder
* add tracing
* add more docs
* fix doc
* reduce test epoch (windows CI fail)
* generate first state in config: Windows seems to not allow taking a big value from an instant
* extend tests
* clippy
* review comments + more docs
* Apply suggestions from code review (Co-authored-by: hinto-janai <hinto.janai@protonmail.com>)
* update Cargo.lock
* rename txpool.rs -> pool.rs
* review comments
* Update p2p/dandelion/src/tests/router.rs (Co-authored-by: hinto-janai <hinto.janai@protonmail.com>)
* Update p2p/dandelion/src/router.rs (Co-authored-by: hinto-janai <hinto.janai@protonmail.com>)

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
Parent: 10d327ca13
Commit: 75306babf8
11 changed files with 1579 additions and 0 deletions
Cargo.lock (generated, 15 additions)
@@ -656,6 +656,21 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "dandelion_tower"
+version = "0.1.0"
+dependencies = [
+ "futures",
+ "proptest",
+ "rand",
+ "rand_distr",
+ "thiserror",
+ "tokio",
+ "tokio-util",
+ "tower",
+ "tracing",
+]
+
 [[package]]
 name = "diff"
 version = "0.1.13"
Cargo.toml (2 additions)

@@ -11,6 +11,7 @@ members = [
     "net/fixed-bytes",
     "net/levin",
     "net/monero-wire",
+    "p2p/dandelion",
     "p2p/monero-p2p",
     "p2p/address-book",
     "pruning",
@@ -60,6 +61,7 @@ paste = { version = "1.0.14", default-features = false }
 pin-project = { version = "1.1.3", default-features = false }
 randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
 rand = { version = "0.8.5", default-features = false }
+rand_distr = { version = "0.4.3", default-features = false }
 rayon = { version = "1.9.0", default-features = false }
 serde_bytes = { version = "0.11.12", default-features = false }
 serde_json = { version = "1.0.108", default-features = false }
p2p/dandelion/Cargo.toml (new file, 27 additions)
@@ -0,0 +1,27 @@
[package]
name = "dandelion_tower"
version = "0.1.0"
edition = "2021"
license = "MIT"
authors = ["Boog900"]

[features]
default = ["txpool"]
txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"]

[dependencies]
tower = { workspace = true, features = ["discover", "util"] }
tracing = { workspace = true, features = ["std"] }

futures = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["rt", "sync", "macros"], optional = true }
tokio-util = { workspace = true, features = ["time"], optional = true }

rand = { workspace = true, features = ["std", "std_rng"] }
rand_distr = { workspace = true, features = ["std"], optional = true }

thiserror = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync"] }
proptest = { workspace = true, features = ["default"] }
p2p/dandelion/src/config.rs (new file, 149 additions)
@@ -0,0 +1,149 @@
use std::{
    ops::{Mul, Neg},
    time::Duration,
};

/// When calculating the embargo timeout using the formula: `(-k*(k-1)*hop)/(2*log(1-ep))`
///
/// (1 - ep) is the probability that a transaction travels for `k` hops before any node's embargo timeout fires; this constant is (1 - ep).
const EMBARGO_FULL_TRAVEL_PROBABILITY: f64 = 0.90;

/// The graph type to use for dandelion routing, the dandelion paper recommends [Graph::FourRegular].
///
/// The decision between line graphs and 4-regular graphs depends on the priorities of the system: if
/// linkability of transactions is a first-order concern then line graphs may be better, however 4-regular graphs
/// can give constant-order privacy benefits against adversaries with knowledge of the graph.
///
/// See appendix C of the dandelion++ paper.
#[derive(Default, Debug, Copy, Clone)]
pub enum Graph {
    /// Line graph.
    ///
    /// When this is selected one peer will be chosen from the outbound peers each epoch to route transactions
    /// to.
    ///
    /// In general this is not recommended over [`Graph::FourRegular`] but may be better for certain systems.
    Line,
    /// Quasi-4-Regular.
    ///
    /// When this is selected two peers will be chosen from the outbound peers each epoch, each stem transaction
    /// received will then be sent to one of these two peers. Transactions from the same node will always go to the
    /// same peer.
    #[default]
    FourRegular,
}

/// The config used to initialize dandelion.
///
/// One notable missing item from the config is `Tbase` AKA the timeout parameter to prevent black hole
/// attacks. This is removed from the config for simplicity; `Tbase` is calculated using the formula provided
/// in the D++ paper:
///
/// `(-k*(k-1)*hop)/(2*log(1-ep))`
///
/// Where `k` is calculated from the fluff probability, `hop` is `time_between_hop` and `ep` is fixed at `0.1`.
#[derive(Debug, Clone, Copy)]
pub struct DandelionConfig {
    /// The time it takes for a stem transaction to pass through a node, including network latency.
    ///
    /// It's better to be safe and put a slightly higher value than lower.
    pub time_between_hop: Duration,
    /// The duration of an epoch.
    pub epoch_duration: Duration,
    /// `q` in the dandelion paper, this is the probability that a node will be in the fluff state for
    /// a certain epoch.
    ///
    /// The dandelion paper recommends making this value small, but the smaller this value, the higher
    /// the broadcast latency.
    ///
    /// It is recommended for this value to be <= `0.2`; this value *MUST* be in range `0.0..=1.0`.
    pub fluff_probability: f64,
    /// The graph type.
    pub graph: Graph,
}

impl DandelionConfig {
    /// Returns the number of outbound peers to use to stem transactions.
    ///
    /// This value depends on the [`Graph`] chosen.
    pub fn number_of_stems(&self) -> usize {
        match self.graph {
            Graph::Line => 1,
            Graph::FourRegular => 2,
        }
    }

    /// Returns the average embargo timeout, `Tbase` in the dandelion++ paper.
    ///
    /// This is the average embargo timeout _only including this node_; with `k` nodes also putting an embargo timeout
    /// using the exponential distribution, the average until one of them fluffs is `Tbase / k`.
    pub fn average_embargo_timeout(&self) -> Duration {
        // we set k equal to the expected stem length with this fluff probability.
        let k = self.expected_stem_length();
        let time_between_hop = self.time_between_hop.as_secs_f64();

        Duration::from_secs_f64(
            // (-k*(k-1)*hop)/(2*ln(1-ep))
            ((k.neg() * (k - 1.0) * time_between_hop)
                / EMBARGO_FULL_TRAVEL_PROBABILITY.ln().mul(2.0))
            .ceil(),
        )
    }

    /// Returns the expected length of a stem.
    pub fn expected_stem_length(&self) -> f64 {
        self.fluff_probability.recip()
    }
}

#[cfg(test)]
mod tests {
    use std::{
        f64::consts::E,
        ops::{Mul, Neg},
        time::Duration,
    };

    use proptest::{prop_assert, proptest};

    use super::*;

    #[test]
    fn monerod_average_embargo_timeout() {
        let cfg = DandelionConfig {
            time_between_hop: Duration::from_millis(175),
            epoch_duration: Default::default(),
            fluff_probability: 0.125,
            graph: Default::default(),
        };

        assert_eq!(cfg.average_embargo_timeout(), Duration::from_secs(47));
    }

    proptest! {
        #[test]
        fn embargo_full_travel_probability_correct(time_between_hop in 1_u64..1_000_000, fluff_probability in 0.000001..1.0) {
            let cfg = DandelionConfig {
                time_between_hop: Duration::from_millis(time_between_hop),
                epoch_duration: Default::default(),
                fluff_probability,
                graph: Default::default(),
            };

            // assert that the `average_embargo_timeout` is high enough that the probability of `k` nodes
            // not diffusing before expected diffusion is greater than or equal to `EMBARGO_FULL_TRAVEL_PROBABILITY`
            //
            // using the formula from appendix B.5
            let k = cfg.expected_stem_length();
            let time_between_hop = cfg.time_between_hop.as_secs_f64();

            let average_embargo_timeout = cfg.average_embargo_timeout().as_secs_f64();

            let probability =
                E.powf((k.neg() * (k - 1.0) * time_between_hop) / average_embargo_timeout.mul(2.0));

            prop_assert!(probability >= EMBARGO_FULL_TRAVEL_PROBABILITY, "probability = {probability}, average_embargo_timeout = {average_embargo_timeout}");
        }
    }
}
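As a sanity check, the `monerod_average_embargo_timeout` test above can be reproduced by hand. With hop = 0.175 s and fluff probability q = 0.125:

    k     = 1 / q = 1 / 0.125 = 8
    Tbase = (-k * (k - 1) * hop) / (2 * ln(0.90))
          = (-8 * 7 * 0.175) / (2 * -0.10536)
          = -9.8 / -0.21072
          ≈ 46.5, which `.ceil()` rounds up to 47 seconds.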
p2p/dandelion/src/lib.rs (new file, 70 additions)
@@ -0,0 +1,70 @@
//! # Dandelion Tower
//!
//! This crate implements [dandelion++](https://arxiv.org/pdf/1805.11060.pdf), using [`tower`].
//!
//! This crate provides 2 [`tower::Service`]s, a [`DandelionRouter`] and a [`DandelionPool`](pool::DandelionPool).
//! The router is pretty minimal and only handles the absolute necessary data to route transactions, whereas the
//! pool keeps track of all data necessary for dandelion++ but requires you to provide a backing tx-pool.
//!
//! This split was done not because the [`DandelionPool`](pool::DandelionPool) is unnecessary but because it is hard
//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPool`](pool::DandelionPool)
//! requires you to implement part of the paper yourself.
//!
//! # Features
//!
//! This crate only has one feature `txpool` which enables [`DandelionPool`](pool::DandelionPool).
//!
//! # Needed Services
//!
//! To use this crate you need to provide a few types.
//!
//! ## Diffuse Service
//!
//! This service should implement diffusion, which is sending the transaction to every peer, with each peer
//! having a timer using the exponential distribution and batch sending all txs that were queued in that time.
//!
//! The diffuse service should have a request of [`DiffuseRequest`](traits::DiffuseRequest) and its error
//! should be [`tower::BoxError`].
//!
//! ## Outbound Peer Discoverer
//!
//! The outbound peer [`Discover`](tower::discover::Discover) should provide a stream of randomly selected outbound
//! peers; these peers will then be used to route stem txs to.
//!
//! The peers will not be returned anywhere, so it is recommended to wrap them in some sort of drop guard that returns
//! them back to a peer set.
//!
//! ## Peer Service
//!
//! This service represents a connection to an individual peer and should be returned from the Outbound Peer
//! Discoverer. It should immediately send the transaction to the peer when requested, i.e. it should _not_ set
//! a timer.
//!
//! The peer service should have a request of [`StemRequest`](traits::StemRequest) and its error
//! should be [`tower::BoxError`].
//!
//! ## Backing Pool
//!
//! ([`DandelionPool`](pool::DandelionPool) only)
//!
//! This service is a backing tx-pool, in memory or on disk.
//! The backing pool should have a request of [`TxStoreRequest`](traits::TxStoreRequest) and a response of
//! [`TxStoreResponse`](traits::TxStoreResponse), with an error of [`tower::BoxError`].
//!
//! Users should keep a handle to the backing pool to request data from it. When requesting data you _must_
//! make sure you only look in the public pool if you are going to be giving data to peers, as stem transactions
//! must stay private.
//!
//! When removing data, for example because of a new block, you can remove from both pools provided it doesn't leak
//! any data about stem transactions. You will probably want to set up a task that monitors the tx pool for stuck transactions,
//! transactions that slipped in just as one was removed, etc; this crate does not handle that.
mod config;
#[cfg(feature = "txpool")]
pub mod pool;
mod router;
#[cfg(test)]
mod tests;
pub mod traits;

pub use config::*;
pub use router::*;
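As a rough sketch of the service shapes described above, a diffuse service can be built with `tower::util::service_fn`, in the same spirit as the mocks in this crate's tests. `MyTx` and `broadcast_to_all_peers` are hypothetical stand-ins for a user's own types, not part of this crate:

    use dandelion_tower::traits::DiffuseRequest;
    use tower::util::service_fn;

    // A minimal diffuse-service sketch. A real implementation would queue txs and
    // batch-send them on an exponentially distributed timer, as described above.
    let diffuse_svc = service_fn(|DiffuseRequest(tx): DiffuseRequest<MyTx>| async move {
        broadcast_to_all_peers(tx).await; // hypothetical helper
        Ok::<_, tower::BoxError>(())
    });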
p2p/dandelion/src/pool.rs (new file, 510 additions)
@@ -0,0 +1,510 @@
//! # Dandelion++ Pool
//!
//! This module contains [`DandelionPool`], a thin wrapper around a backing transaction store
//! that fully implements the dandelion++ protocol.
//!
//! ### How To Get Txs From [`DandelionPool`].
//!
//! [`DandelionPool`] does not provide a full tx-pool API. You cannot retrieve transactions from it or
//! check what transactions are in it; to do this you must keep a handle to the backing transaction store
//! yourself.
//!
//! The reason for this is that the [`DandelionPool`] would itself only be passing these requests on to the
//! backing pool, so it makes sense to remove the "middle man".
//!
//! ### Keep Stem Transactions Hidden
//!
//! When using your handle to the backing store, remember to keep transactions in the stem pool hidden:
//! handle any requests to the tx-pool as if the stem side of the pool does not exist.
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    hash::Hash,
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use futures::{FutureExt, StreamExt};
use rand::prelude::*;
use rand_distr::Exp;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};
use tokio_util::{sync::PollSender, time::DelayQueue};
use tower::{Service, ServiceExt};
use tracing::Instrument;

use crate::{
    traits::{TxStoreRequest, TxStoreResponse},
    DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState,
};

/// Start the [`DandelionPool`].
///
/// This function spawns the [`DandelionPool`] and returns [`DandelionPoolService`] which can be used to send
/// requests to the pool.
///
/// ### Args
///
/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPool`].
/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow
/// users to customise routing functionality.
/// - `backing_pool` is the backing transaction storage service.
/// - `config` is [`DandelionConfig`].
pub fn start_dandelion_pool<P, R, Tx, TxID, PID>(
    buffer_size: usize,
    dandelion_router: R,
    backing_pool: P,
    config: DandelionConfig,
) -> DandelionPoolService<Tx, TxID, PID>
where
    Tx: Clone + Send + 'static,
    TxID: Hash + Eq + Clone + Send + 'static,
    PID: Hash + Eq + Clone + Send + 'static,
    P: Service<
            TxStoreRequest<Tx, TxID>,
            Response = TxStoreResponse<Tx, TxID>,
            Error = tower::BoxError,
        > + Send
        + 'static,
    P::Future: Send + 'static,
    R: Service<DandelionRouteReq<Tx, PID>, Response = State, Error = DandelionRouterError>
        + Send
        + 'static,
    R::Future: Send + 'static,
{
    let (tx, rx) = mpsc::channel(buffer_size);

    let pool = DandelionPool {
        dandelion_router,
        backing_pool,
        routing_set: JoinSet::new(),
        stem_origins: HashMap::new(),
        embargo_timers: DelayQueue::new(),
        embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
        config,
        _tx: PhantomData,
    };

    let span = tracing::debug_span!("dandelion_pool");

    tokio::spawn(pool.run(rx).instrument(span));

    DandelionPoolService {
        tx: PollSender::new(tx),
    }
}

#[derive(Copy, Clone, Debug, thiserror::Error)]
#[error("The dandelion pool was shutdown")]
pub struct DandelionPoolShutDown;

/// An incoming transaction for the [`DandelionPool`] to handle.
///
/// Users may notice there is no way to check if the dandelion-pool wants a tx according to an inventory message like seen
/// in Bitcoin, only having a request for a full tx. Users should look in the *public* backing pool to handle inv messages,
/// and request txs even if they are in the stem pool.
pub struct IncomingTx<Tx, TxID, PID> {
    /// The transaction.
    ///
    /// It is recommended to put this in an [`Arc`](std::sync::Arc) as it needs to be cloned to send to the backing
    /// tx pool and [`DandelionRouter`](crate::DandelionRouter).
    pub tx: Tx,
    /// The transaction ID.
    pub tx_id: TxID,
    /// The routing state of this transaction.
    pub tx_state: TxState<PID>,
}

/// The dandelion tx pool service.
#[derive(Clone)]
pub struct DandelionPoolService<Tx, TxID, PID> {
    /// The channel to [`DandelionPool`].
    tx: PollSender<(IncomingTx<Tx, TxID, PID>, oneshot::Sender<()>)>,
}

impl<Tx, TxID, PID> Service<IncomingTx<Tx, TxID, PID>> for DandelionPoolService<Tx, TxID, PID>
where
    Tx: Clone + Send,
    TxID: Hash + Eq + Clone + Send + 'static,
    PID: Hash + Eq + Clone + Send + 'static,
{
    type Response = ();
    type Error = DandelionPoolShutDown;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
    }

    fn call(&mut self, req: IncomingTx<Tx, TxID, PID>) -> Self::Future {
        // although the channel isn't sending anything back we want to wait for the request to be handled before continuing.
        let (tx, rx) = oneshot::channel();

        let res = self
            .tx
            .send_item((req, tx))
            .map_err(|_| DandelionPoolShutDown);

        async move {
            res?;
            rx.await.expect("Oneshot dropped before response!");

            Ok(())
        }
        .boxed()
    }
}

/// The dandelion++ tx pool.
///
/// See the [module docs](self) for more.
pub struct DandelionPool<P, R, Tx, TxID, PID> {
    /// The dandelion++ router.
    dandelion_router: R,
    /// The backing tx storage.
    backing_pool: P,
    /// The set of tasks that are running the future returned from `dandelion_router`.
    routing_set: JoinSet<(TxID, Result<State, TxState<PID>>)>,

    /// The origin of stem transactions.
    stem_origins: HashMap<TxID, HashSet<PID>>,

    /// Current stem pool embargo timers.
    embargo_timers: DelayQueue<TxID>,
    /// The distribution to sample to get embargo timers.
    embargo_dist: Exp<f64>,

    /// The d++ config.
    config: DandelionConfig,

    _tx: PhantomData<Tx>,
}

impl<P, R, Tx, TxID, PID> DandelionPool<P, R, Tx, TxID, PID>
where
    Tx: Clone + Send,
    TxID: Hash + Eq + Clone + Send + 'static,
    PID: Hash + Eq + Clone + Send + 'static,
    P: Service<
        TxStoreRequest<Tx, TxID>,
        Response = TxStoreResponse<Tx, TxID>,
        Error = tower::BoxError,
    >,
    P::Future: Send + 'static,
    R: Service<DandelionRouteReq<Tx, PID>, Response = State, Error = DandelionRouterError>,
    R::Future: Send + 'static,
{
    /// Stores the tx in the backing pool's stem pool, setting the embargo timer and stem origin, and stemming the tx.
    async fn store_tx_and_stem(
        &mut self,
        tx: Tx,
        tx_id: TxID,
        from: Option<PID>,
    ) -> Result<(), tower::BoxError> {
        self.backing_pool
            .ready()
            .await?
            .call(TxStoreRequest::Store(
                tx.clone(),
                tx_id.clone(),
                State::Stem,
            ))
            .await?;

        let embargo_timer = self.embargo_dist.sample(&mut thread_rng());
        tracing::debug!(
            "Setting embargo timer for stem tx: {} seconds.",
            embargo_timer
        );
        self.embargo_timers
            .insert(tx_id.clone(), Duration::from_secs_f64(embargo_timer));

        self.stem_tx(tx, tx_id, from).await
    }

    /// Stems the tx, setting the stem origin, if it wasn't already set.
    ///
    /// This function does not add the tx to the backing pool.
    async fn stem_tx(
        &mut self,
        tx: Tx,
        tx_id: TxID,
        from: Option<PID>,
    ) -> Result<(), tower::BoxError> {
        if let Some(peer) = &from {
            self.stem_origins
                .entry(tx_id.clone())
                .or_default()
                .insert(peer.clone());
        }

        let state = from
            .map(|from| TxState::Stem { from })
            .unwrap_or(TxState::Local);

        let fut = self
            .dandelion_router
            .ready()
            .await?
            .call(DandelionRouteReq {
                tx,
                state: state.clone(),
            });

        self.routing_set
            .spawn(fut.map(|res| (tx_id, res.map_err(|_| state))));
        Ok(())
    }

    /// Stores the tx in the backing pool and fluffs the tx, removing the stem data for this tx.
    async fn store_and_fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> {
        // fluff the tx first to prevent timing attacks where we could fluff at different average times
        // depending on if the tx was in the stem pool already or not.
        // Massively overkill but this is a minimal change.
        self.fluff_tx(tx.clone(), tx_id.clone()).await?;

        // Remove the tx from the maps used during the stem phase.
        self.stem_origins.remove(&tx_id);

        self.backing_pool
            .ready()
            .await?
            .call(TxStoreRequest::Store(tx, tx_id, State::Fluff))
            .await?;

        // The key for this is *not* the tx_id, it is given on insert, so just keep the timer in the
        // map. These timers should be relatively short, so it shouldn't be a problem.
        //self.embargo_timers.try_remove(&tx_id);

        Ok(())
    }

    /// Fluffs a tx, does not add the tx to the tx pool.
    async fn fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> {
        let fut = self
            .dandelion_router
            .ready()
            .await?
            .call(DandelionRouteReq {
                tx,
                state: TxState::Fluff,
            });

        self.routing_set
            .spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff))));
        Ok(())
    }

    /// Function to handle an incoming [`IncomingTx`].
    async fn handle_incoming_tx(
        &mut self,
        tx: Tx,
        tx_state: TxState<PID>,
        tx_id: TxID,
    ) -> Result<(), tower::BoxError> {
        let TxStoreResponse::Contains(have_tx) = self
            .backing_pool
            .ready()
            .await?
            .call(TxStoreRequest::Contains(tx_id.clone()))
            .await?
        else {
            panic!("Backing tx pool responded with wrong response for request.");
        };
        // If we have already fluffed this tx then we don't need to do anything.
        if have_tx == Some(State::Fluff) {
            tracing::debug!("Already fluffed incoming tx, ignoring.");
            return Ok(());
        }

        match tx_state {
            TxState::Stem { from } => {
                if self
                    .stem_origins
                    .get(&tx_id)
                    .is_some_and(|peers| peers.contains(&from))
                {
                    tracing::debug!("Received stem tx twice from same peer, fluffing it");
                    // The same peer sent us a tx twice, fluff it.
                    self.promote_and_fluff_tx(tx_id).await
                } else {
                    // This could be a new tx or it could have already been stemmed, but we still stem it again
                    // unless the same peer sends us a tx twice.
                    tracing::debug!("Stemming incoming tx");
                    self.store_tx_and_stem(tx, tx_id, Some(from)).await
                }
            }
            TxState::Fluff => {
                tracing::debug!("Fluffing incoming tx");
                self.store_and_fluff_tx(tx, tx_id).await
            }
            TxState::Local => {
                // If we have already stemmed this tx then there is nothing to do.
                if have_tx.is_some() {
                    tracing::debug!("Received a local tx that we already have, skipping");
                    return Ok(());
                }
                tracing::debug!("Stemming local transaction");
                self.store_tx_and_stem(tx, tx_id, None).await
            }
        }
    }

    /// Promotes a tx to the clear pool.
    async fn promote_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> {
        // Remove the tx from the maps used during the stem phase.
        self.stem_origins.remove(&tx_id);

        // The key for this is *not* the tx_id, it is given on insert, so just keep the timer in the
        // map. These timers should be relatively short, so it shouldn't be a problem.
        //self.embargo_timers.try_remove(&tx_id);

        self.backing_pool
            .ready()
            .await?
            .call(TxStoreRequest::Promote(tx_id))
            .await?;

        Ok(())
    }

    /// Promotes a tx to the public fluff pool and fluffs the tx.
    async fn promote_and_fluff_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> {
        tracing::debug!("Promoting transaction to public pool and fluffing it.");

        let TxStoreResponse::Transaction(tx) = self
            .backing_pool
            .ready()
            .await?
            .call(TxStoreRequest::Get(tx_id.clone()))
            .await?
        else {
            panic!("Backing tx pool responded with wrong response for request.");
        };

        let Some((tx, state)) = tx else {
            tracing::debug!("Could not find tx, skipping.");
            return Ok(());
        };

        if state == State::Fluff {
            tracing::debug!("Transaction already fluffed, skipping.");
            return Ok(());
        }

        self.promote_tx(tx_id.clone()).await?;
        self.fluff_tx(tx, tx_id).await
    }

    /// Returns a tx stored in the fluff _OR_ stem pool.
    async fn get_tx_from_pool(&mut self, tx_id: TxID) -> Result<Option<Tx>, tower::BoxError> {
        let TxStoreResponse::Transaction(tx) = self
            .backing_pool
            .ready()
            .await?
            .call(TxStoreRequest::Get(tx_id))
            .await?
        else {
            panic!("Backing tx pool responded with wrong response for request.");
        };

        Ok(tx.map(|tx| tx.0))
    }

    /// Starts the [`DandelionPool`].
    async fn run(
        mut self,
        mut rx: mpsc::Receiver<(IncomingTx<Tx, TxID, PID>, oneshot::Sender<()>)>,
    ) {
        tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config);

        // On start up we just fluff all txs left in the stem pool.
        let Ok(TxStoreResponse::IDs(ids)) = (&mut self.backing_pool)
            .oneshot(TxStoreRequest::IDsInStemPool)
            .await
        else {
            tracing::error!("Failed to get transactions in stem pool.");
            return;
        };

        tracing::debug!(
            "Fluffing {} txs that are currently in the stem pool",
            ids.len()
        );

        for id in ids {
            if let Err(e) = self.promote_and_fluff_tx(id).await {
                tracing::error!("Failed to fluff tx in the stem pool at start up, {e}.");
                return;
            }
        }

        loop {
            tracing::trace!("Waiting for next event.");
            tokio::select! {
                // biased to handle current txs before routing new ones.
                biased;
                Some(fired) = self.embargo_timers.next() => {
                    tracing::debug!("Embargo timer fired, did not see stem tx in time.");

                    let tx_id = fired.into_inner();
                    if let Err(e) = self.promote_and_fluff_tx(tx_id).await {
                        tracing::error!("Error handling fired embargo timer: {e}");
                        return;
                    }
                }
                Some(Ok((tx_id, res))) = self.routing_set.join_next() => {
                    tracing::trace!("Received d++ routing result.");

                    let res = match res {
                        Ok(State::Fluff) => {
                            tracing::debug!("Transaction was fluffed, upgrading it to the public pool.");
                            self.promote_tx(tx_id).await
                        }
                        Err(tx_state) => {
                            tracing::debug!("Error routing transaction, trying again.");

                            match self.get_tx_from_pool(tx_id.clone()).await {
                                Ok(Some(tx)) => match tx_state {
                                    TxState::Fluff => self.fluff_tx(tx, tx_id).await,
                                    TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await,
                                    TxState::Local => self.stem_tx(tx, tx_id, None).await,
                                }
                                Err(e) => Err(e),
                                _ => continue,
                            }
                        }
                        Ok(State::Stem) => continue,
                    };

                    if let Err(e) = res {
                        tracing::error!("Error handling transaction routing return: {e}");
                        return;
                    }
                }
                req = rx.recv() => {
                    tracing::debug!("Received new tx to route.");

                    let Some((IncomingTx { tx, tx_state, tx_id }, res_tx)) = req else {
                        return;
                    };

                    if let Err(e) = self.handle_incoming_tx(tx, tx_state, tx_id).await {
                        let _ = res_tx.send(());

                        tracing::error!("Error handling transaction in dandelion pool: {e}");
                        return;
                    }
                    let _ = res_tx.send(());
                }
            }
        }
    }
}
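Following the warning in the module docs above, a user's handle to the backing store must never reveal stem transactions to peers. A hedged sketch of a peer-facing lookup (the function name and the generic `Tx`/`TxID` parameters are illustrative, not part of this crate):

    use dandelion_tower::{
        traits::{TxStoreRequest, TxStoreResponse},
        State,
    };
    use tower::{Service, ServiceExt};

    /// Fetches a tx for a peer, returning `None` unless it is in the *public* pool.
    async fn get_tx_for_peer<Tx, TxID, S>(
        handle: &mut S,
        tx_id: TxID,
    ) -> Result<Option<Tx>, tower::BoxError>
    where
        S: Service<TxStoreRequest<Tx, TxID>, Response = TxStoreResponse<Tx, TxID>, Error = tower::BoxError>,
    {
        let TxStoreResponse::Transaction(tx) =
            handle.ready().await?.call(TxStoreRequest::Get(tx_id)).await?
        else {
            return Err("backing pool returned wrong response".into());
        };
        // Stem txs must stay hidden: only hand out txs in the fluff pool.
        Ok(tx.and_then(|(tx, state)| (state == State::Fluff).then_some(tx)))
    }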
p2p/dandelion/src/router.rs (new file, 348 additions)
@@ -0,0 +1,348 @@
//! # Dandelion++ Router
//!
//! This module contains [`DandelionRouter`], a [`Service`] that handles keeping the
//! current dandelion++ [`State`] and deciding where to send transactions based on their [`TxState`].
//!
//! ### What The Router Does Not Do
//!
//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling
//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool).
use std::{
    collections::HashMap,
    future::Future,
    hash::Hash,
    marker::PhantomData,
    pin::Pin,
    task::{ready, Context, Poll},
    time::Instant,
};

use futures::TryFutureExt;
use rand::{distributions::Bernoulli, prelude::*, thread_rng};
use tower::{
    discover::{Change, Discover},
    Service,
};

use crate::{
    traits::{DiffuseRequest, StemRequest},
    DandelionConfig,
};

/// An error returned from the [`DandelionRouter`].
#[derive(thiserror::Error, Debug)]
pub enum DandelionRouterError {
    /// This error is probably recoverable so the request should be retried.
    #[error("Peer chosen to route stem txs to had an err: {0}.")]
    PeerError(tower::BoxError),
    /// The broadcast service returned an error.
    #[error("Broadcast service returned an err: {0}.")]
    BroadcastError(tower::BoxError),
    /// The outbound peer discoverer returned an error, this is critical.
    #[error("The outbound peer discoverer returned an err: {0}.")]
    OutboundPeerDiscoverError(tower::BoxError),
    /// The outbound peer discoverer returned [`None`].
    #[error("The outbound peer discoverer exited.")]
    OutboundPeerDiscoverExited,
}

/// The dandelion++ state.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum State {
    /// Fluff state, in this state we are diffusing stem transactions to all peers.
    Fluff,
    /// Stem state, in this state we are stemming stem transactions to a single outbound peer.
    Stem,
}

/// The routing state of a transaction.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum TxState<ID> {
    /// Fluff state.
    Fluff,
    /// Stem state.
    Stem {
        /// The ID of the peer who sent us this transaction.
        from: ID,
    },
    /// Local - the transaction originated from our node.
    Local,
}

/// A request to route a transaction.
pub struct DandelionRouteReq<Tx, ID> {
    /// The transaction.
    pub tx: Tx,
    /// The transaction state.
    pub state: TxState<ID>,
}

/// The dandelion router service.
pub struct DandelionRouter<P, B, ID, S, Tx> {
    // pub(crate) is for tests
    /// A [`Discover`] where we can get outbound peers from.
    outbound_peer_discover: Pin<Box<P>>,
    /// A [`Service`] which handles broadcasting (diffusing) transactions.
    broadcast_svc: B,

    /// The current state.
    current_state: State,
    /// The time at which this epoch started.
    epoch_start: Instant,

    /// The stem route our local transactions will be sent to.
    local_route: Option<ID>,
    /// A [`HashMap`] linking peers' IDs to IDs in `stem_peers`.
    stem_routes: HashMap<ID, ID>,
    /// Peers we are using for stemming.
    ///
    /// This will contain peers, even in [`State::Fluff`], to allow us to stem [`TxState::Local`]
    /// transactions.
    pub(crate) stem_peers: HashMap<ID, S>,

    /// The distribution to sample to get the [`State`], true is [`State::Fluff`].
    state_dist: Bernoulli,

    /// The config.
    config: DandelionConfig,

    /// The router's tracing span.
    span: tracing::Span,

    _tx: PhantomData<Tx>,
}

impl<Tx, ID, P, B, S> DandelionRouter<P, B, ID, S, Tx>
where
    ID: Hash + Eq + Clone,
    P: Discover<Key = ID, Service = S, Error = tower::BoxError>,
    B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
    S: Service<StemRequest<Tx>, Error = tower::BoxError>,
{
    /// Creates a new [`DandelionRouter`], with the provided services and config.
    ///
    /// # Panics
    /// This function panics if [`DandelionConfig::fluff_probability`] is not `0.0..=1.0`.
    pub fn new(broadcast_svc: B, outbound_peer_discover: P, config: DandelionConfig) -> Self {
        // get the current state
        let state_dist = Bernoulli::new(config.fluff_probability)
            .expect("Fluff probability was not between 0 and 1");

        let current_state = if state_dist.sample(&mut thread_rng()) {
            State::Fluff
        } else {
            State::Stem
        };

        DandelionRouter {
            outbound_peer_discover: Box::pin(outbound_peer_discover),
            broadcast_svc,
            current_state,
            epoch_start: Instant::now(),
            local_route: None,
            stem_routes: HashMap::new(),
            stem_peers: HashMap::new(),
            state_dist,
            config,
            span: tracing::debug_span!("dandelion_router", state = ?current_state),
            _tx: PhantomData,
        }
    }

    /// This function polls the [`Discover`] until we have the number of outbound peers required for the selected [`Graph`](crate::Graph).
    fn poll_prepare_graph(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), DandelionRouterError>> {
        let peers_needed = match self.current_state {
            State::Stem => self.config.number_of_stems(),
            // When in the fluff state we only need one peer, the one for our txs.
            State::Fluff => 1,
        };

        while self.stem_peers.len() < peers_needed {
            match ready!(self
                .outbound_peer_discover
                .as_mut()
                .poll_discover(cx)
                .map_err(DandelionRouterError::OutboundPeerDiscoverError))
            .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)??
            {
                Change::Insert(key, svc) => {
                    self.stem_peers.insert(key, svc);
                }
                Change::Remove(key) => {
                    self.stem_peers.remove(&key);
                }
            }
        }

        Poll::Ready(Ok(()))
    }

    fn fluff_tx(&mut self, tx: Tx) -> B::Future {
        self.broadcast_svc.call(DiffuseRequest(tx))
    }

    fn stem_tx(&mut self, tx: Tx, from: ID) -> S::Future {
        loop {
            let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| {
                self.stem_peers
                    .iter()
                    .choose(&mut thread_rng())
                    .expect("No peers in `stem_peers`, was poll_ready called?")
                    .0
                    .clone()
            });

            let Some(peer) = self.stem_peers.get_mut(stem_route) else {
                self.stem_routes.remove(&from);
                continue;
            };

            return peer.call(StemRequest(tx));
        }
    }

    fn stem_local_tx(&mut self, tx: Tx) -> S::Future {
        loop {
            let stem_route = self.local_route.get_or_insert_with(|| {
                self.stem_peers
                    .iter()
                    .choose(&mut thread_rng())
                    .expect("No peers in `stem_peers`, was poll_ready called?")
                    .0
                    .clone()
            });

            let Some(peer) = self.stem_peers.get_mut(stem_route) else {
                self.local_route.take();
                continue;
            };

            return peer.call(StemRequest(tx));
        }
    }
}

/*
## Generics ##

Tx: The tx type
ID: Peer Id type - unique identifier for nodes.
P:  Peer Set discover - where we can get outbound peers from.
B:  Broadcast service - where we send txs to get diffused.
S:  The Peer service - handles routing messages to a single node.
*/
impl<Tx, ID, P, B, S> Service<DandelionRouteReq<Tx, ID>> for DandelionRouter<P, B, ID, S, Tx>
where
    ID: Hash + Eq + Clone,
    P: Discover<Key = ID, Service = S, Error = tower::BoxError>,
    B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
    B::Future: Send + 'static,
    S: Service<StemRequest<Tx>, Error = tower::BoxError>,
    S::Future: Send + 'static,
{
    type Response = State;
    type Error = DandelionRouterError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if self.epoch_start.elapsed() > self.config.epoch_duration {
            // clear all the stem routing data.
            self.stem_peers.clear();
            self.stem_routes.clear();
            self.local_route.take();

            self.current_state = if self.state_dist.sample(&mut thread_rng()) {
                State::Fluff
            } else {
                State::Stem
            };

            self.span
                .record("state", format!("{:?}", self.current_state));
            tracing::debug!(parent: &self.span, "Starting new d++ epoch");

            self.epoch_start = Instant::now();
        }

        let mut peers_pending = false;

        let span = &self.span;

        self.stem_peers
            .retain(|_, peer_svc| match peer_svc.poll_ready(cx) {
                Poll::Ready(res) => res
                    .inspect_err(|e| {
                        tracing::debug!(
                            parent: span,
                            "Peer returned an error on `poll_ready`: {e}, removing from router.",
                        )
                    })
                    .is_ok(),
                Poll::Pending => {
                    // Pending peers should be kept - they have not errored yet.
                    peers_pending = true;
                    true
                }
            });

        if peers_pending {
            return Poll::Pending;
        }

        // now we have removed the failed peers check if we still have enough for the graph chosen.
        ready!(self.poll_prepare_graph(cx)?);

        ready!(self
            .broadcast_svc
            .poll_ready(cx)
            .map_err(DandelionRouterError::BroadcastError)?);

        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: DandelionRouteReq<Tx, ID>) -> Self::Future {
        tracing::trace!(parent: &self.span, "Handling route request.");

        match req.state {
            TxState::Fluff => Box::pin(
                self.fluff_tx(req.tx)
                    .map_ok(|_| State::Fluff)
                    .map_err(DandelionRouterError::BroadcastError),
            ),
            TxState::Stem { from } => match self.current_state {
                State::Fluff => {
                    tracing::debug!(parent: &self.span, "Fluffing stem tx.");

                    Box::pin(
                        self.fluff_tx(req.tx)
                            .map_ok(|_| State::Fluff)
                            .map_err(DandelionRouterError::BroadcastError),
                    )
                }
                State::Stem => {
                    tracing::trace!(parent: &self.span, "Stemming transaction");

                    Box::pin(
                        self.stem_tx(req.tx, from)
                            .map_ok(|_| State::Stem)
                            .map_err(DandelionRouterError::PeerError),
                    )
                }
            },
            TxState::Local => {
                tracing::debug!(parent: &self.span, "Stemming local tx.");

                Box::pin(
                    self.stem_local_tx(req.tx)
                        .map_ok(|_| State::Stem)
                        .map_err(DandelionRouterError::PeerError),
                )
            }
        }
    }
}
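A hedged construction sketch for the router. `my_broadcast_svc` and `my_outbound_peer_discover` stand in for user-provided services satisfying the bounds above; the fluff probability mirrors the monerod-style value used in the `config.rs` tests, while the epoch duration is an arbitrary illustration, not a recommendation from this crate:

    use std::time::Duration;
    use dandelion_tower::{DandelionConfig, DandelionRouter, Graph};

    let config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(10 * 60), // assumption, pick per deployment
        fluff_probability: 0.125,
        graph: Graph::FourRegular,
    };
    let router = DandelionRouter::new(my_broadcast_svc, my_outbound_peer_discover, config);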
p2p/dandelion/src/tests/mod.rs (new file, 130 additions)
@@ -0,0 +1,130 @@
mod pool;
mod router;

use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc};

use futures::TryStreamExt;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tower::{
    discover::{Discover, ServiceList},
    util::service_fn,
    Service, ServiceExt,
};

use crate::{
    traits::{TxStoreRequest, TxStoreResponse},
    State,
};

pub fn mock_discover_svc<Req: Send + 'static>() -> (
    impl Discover<
        Key = usize,
        Service = impl Service<
            Req,
            Future = impl Future<Output = Result<(), tower::BoxError>> + Send + 'static,
            Error = tower::BoxError,
        > + Send
            + 'static,
        Error = tower::BoxError,
    >,
    UnboundedReceiver<(u64, Req)>,
) {
    let (tx, rx) = mpsc::unbounded_channel();

    let discover = ServiceList::new((0..).map(move |i| {
        let tx_2 = tx.clone();

        service_fn(move |req| {
            tx_2.send((i, req)).unwrap();

            async move { Ok::<(), tower::BoxError>(()) }
        })
    }))
    .map_err(Into::into);

    (discover, rx)
}

pub fn mock_broadcast_svc<Req: Send + 'static>() -> (
    impl Service<
        Req,
        Future = impl Future<Output = Result<(), tower::BoxError>> + Send + 'static,
        Error = tower::BoxError,
    > + Send
        + 'static,
    UnboundedReceiver<Req>,
) {
    let (tx, rx) = mpsc::unbounded_channel();

    (
        service_fn(move |req| {
            tx.send(req).unwrap();

            async move { Ok::<(), tower::BoxError>(()) }
        }),
        rx,
    )
}

#[allow(clippy::type_complexity)] // just test code.
pub fn mock_in_memory_backing_pool<
    Tx: Clone + Send + 'static,
    TxID: Clone + Hash + Eq + Send + 'static,
>() -> (
    impl Service<
        TxStoreRequest<Tx, TxID>,
        Response = TxStoreResponse<Tx, TxID>,
        Future = impl Future<Output = Result<TxStoreResponse<Tx, TxID>, tower::BoxError>>
            + Send
            + 'static,
        Error = tower::BoxError,
    > + Send
        + 'static,
    Arc<std::sync::Mutex<HashMap<TxID, (Tx, State)>>>,
) {
    let txs = Arc::new(std::sync::Mutex::new(HashMap::new()));
    let txs_2 = txs.clone();

    (
        service_fn(move |req: TxStoreRequest<Tx, TxID>| {
            let txs = txs.clone();
            async move {
                match req {
                    TxStoreRequest::Store(tx, tx_id, state) => {
                        txs.lock().unwrap().insert(tx_id, (tx, state));
                        Ok(TxStoreResponse::Ok)
                    }
                    TxStoreRequest::Get(tx_id) => {
                        let tx_state = txs.lock().unwrap().get(&tx_id).cloned();
                        Ok(TxStoreResponse::Transaction(tx_state))
                    }
                    TxStoreRequest::Contains(tx_id) => Ok(TxStoreResponse::Contains(
                        txs.lock().unwrap().get(&tx_id).map(|res| res.1),
                    )),
                    TxStoreRequest::IDsInStemPool => {
                        // horribly inefficient, but it's test code :)
                        let ids = txs
                            .lock()
                            .unwrap()
                            .iter()
                            .filter(|(_, (_, state))| matches!(state, State::Stem))
                            .map(|tx| tx.0.clone())
                            .collect::<Vec<_>>();

                        Ok(TxStoreResponse::IDs(ids))
                    }
                    TxStoreRequest::Promote(tx_id) => {
                        let _ = txs
                            .lock()
                            .unwrap()
                            .get_mut(&tx_id)
                            .map(|tx| tx.1 = State::Fluff);

                        Ok(TxStoreResponse::Ok)
                    }
                }
            }
        }),
        txs_2,
    )
}
p2p/dandelion/src/tests/pool.rs (new file, 42 additions)
@@ -0,0 +1,42 @@
use std::time::Duration;

use crate::{
    pool::{start_dandelion_pool, IncomingTx},
    DandelionConfig, DandelionRouter, Graph, TxState,
};

use super::*;

#[tokio::test]
async fn basic_functionality() {
    let config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(0), // make every poll ready change state
        fluff_probability: 0.2,
        graph: Graph::FourRegular,
    };

    let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, _outbound_rx) = mock_discover_svc();

    let router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    let (pool_svc, pool) = mock_in_memory_backing_pool();

    let mut pool_svc = start_dandelion_pool(15, router, pool_svc, config);

    pool_svc
        .ready()
        .await
        .unwrap()
        .call(IncomingTx {
            tx: 0_usize,
            tx_id: 1_usize,
            tx_state: TxState::Fluff,
        })
        .await
        .unwrap();

    assert!(pool.lock().unwrap().contains_key(&1));
    assert!(broadcast_rx.try_recv().is_ok())
}
p2p/dandelion/src/tests/router.rs (new file, 237 additions)
@@ -0,0 +1,237 @@
use std::time::Duration;

use tower::{Service, ServiceExt};

use crate::{DandelionConfig, DandelionRouteReq, DandelionRouter, Graph, TxState};

use super::*;

/// make sure the number of stem peers is correct.
#[tokio::test]
async fn number_stems_correct() {
    let mut config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(60_000),
        fluff_probability: 0.0, // we want to be in stem state
        graph: Graph::FourRegular,
    };

    let (broadcast_svc, _broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, _outbound_rx) = mock_discover_svc();

    let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    const FROM_PEER: usize = 20;

    // send a request to make the generic bound inference work, without specifying types.
    router
        .ready()
        .await
        .unwrap()
        .call(DandelionRouteReq {
            tx: 0_usize,
            state: TxState::Stem { from: FROM_PEER },
        })
        .await
        .unwrap();

    assert_eq!(router.stem_peers.len(), 2); // Graph::FourRegular

    config.graph = Graph::Line;

    let (broadcast_svc, _broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, _outbound_rx) = mock_discover_svc();

    let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    // send a request to make the generic bound inference work, without specifying types.
    router
        .ready()
        .await
        .unwrap()
        .call(DandelionRouteReq {
            tx: 0_usize,
            state: TxState::Stem { from: FROM_PEER },
        })
        .await
        .unwrap();

    assert_eq!(router.stem_peers.len(), 1); // Graph::Line
}

/// make sure a tx from the same peer goes to the same peer.
#[tokio::test]
async fn routes_consistent() {
    let config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(60_000),
        fluff_probability: 0.0, // we want this test to always stem
        graph: Graph::FourRegular,
    };

    let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc();

    let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    const FROM_PEER: usize = 20;

    // The router will panic if it attempts to fluff.
    broadcast_rx.close();

    for _ in 0..30 {
        router
            .ready()
            .await
            .unwrap()
            .call(DandelionRouteReq {
                tx: 0_usize,
                state: TxState::Stem { from: FROM_PEER },
            })
            .await
            .unwrap();
    }

    let mut stem_peer = None;
    let mut total_txs = 0;

    while let Ok((peer_id, _)) = outbound_rx.try_recv() {
        let stem_peer = stem_peer.get_or_insert(peer_id);
        // make sure all peer ids are the same (so the same svc got all txs).
        assert_eq!(*stem_peer, peer_id);

        total_txs += 1;
    }

    assert_eq!(total_txs, 30);
}

/// make sure local txs always stem - even in fluff state.
#[tokio::test]
async fn local_always_stem() {
    let config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(60_000),
        fluff_probability: 1.0, // we want this test to always fluff
        graph: Graph::FourRegular,
    };

    let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc();

    let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    // The router will panic if it attempts to fluff.
    broadcast_rx.close();

    for _ in 0..30 {
        router
            .ready()
            .await
            .unwrap()
            .call(DandelionRouteReq {
                tx: 0_usize,
                state: TxState::Local,
            })
            .await
            .unwrap();
    }

    let mut stem_peer = None;
    let mut total_txs = 0;

    while let Ok((peer_id, _)) = outbound_rx.try_recv() {
        let stem_peer = stem_peer.get_or_insert(peer_id);
        // make sure all peer ids are the same (so the same svc got all txs).
        assert_eq!(*stem_peer, peer_id);

        total_txs += 1;
    }

    assert_eq!(total_txs, 30);
}

/// make sure stem txs fluff when the router is in fluff state.
#[tokio::test]
async fn stem_txs_fluff_in_state_fluff() {
    let config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(60_000),
        fluff_probability: 1.0, // we want this test to always fluff
        graph: Graph::FourRegular,
    };

    let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc();

    let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    const FROM_PEER: usize = 20;

    // The router will panic if it attempts to stem.
    outbound_rx.close();

    for _ in 0..30 {
        router
            .ready()
            .await
            .unwrap()
            .call(DandelionRouteReq {
                tx: 0_usize,
                state: TxState::Stem { from: FROM_PEER },
            })
            .await
            .unwrap();
    }

    let mut total_txs = 0;

    while broadcast_rx.try_recv().is_ok() {
        total_txs += 1;
    }

    assert_eq!(total_txs, 30);
}

/// make sure we get all txs sent to the router out in a stem or a fluff.
#[tokio::test]
async fn random_routing() {
    let config = DandelionConfig {
        time_between_hop: Duration::from_millis(175),
        epoch_duration: Duration::from_secs(0), // make every poll ready change state
        fluff_probability: 0.2,
        graph: Graph::FourRegular,
    };

    let (broadcast_svc, mut broadcast_rx) = mock_broadcast_svc();
    let (outbound_peer_svc, mut outbound_rx) = mock_discover_svc();

    let mut router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);

    for _ in 0..3000 {
        router
            .ready()
            .await
            .unwrap()
            .call(DandelionRouteReq {
                tx: 0_usize,
                state: TxState::Stem {
                    from: rand::random(),
                },
            })
            .await
            .unwrap();
    }

    let mut total_txs = 0;

    while broadcast_rx.try_recv().is_ok() {
        total_txs += 1;
    }

    while outbound_rx.try_recv().is_ok() {
        total_txs += 1;
    }

    assert_eq!(total_txs, 3000);
}
p2p/dandelion/src/traits.rs (new file, 49 additions)
@@ -0,0 +1,49 @@
/// A request to diffuse a transaction to all connected peers.
///
/// This crate does not handle diffusion; it is left to implementers.
pub struct DiffuseRequest<Tx>(pub Tx);

/// A request sent to a single peer to stem this transaction.
pub struct StemRequest<Tx>(pub Tx);

#[cfg(feature = "txpool")]
/// A request sent to the backing transaction pool storage.
pub enum TxStoreRequest<Tx, TxID> {
    /// A request to store a transaction with the ID to store it under and the pool to store it in.
    ///
    /// If the tx is already in the pool then do nothing, unless the tx is in the stem pool; then move it
    /// to the fluff pool, _if this request state is fluff_.
    Store(Tx, TxID, crate::State),
    /// A request to retrieve a `Tx` with the given ID from the pool; this should not remove the tx from the pool.
    ///
    /// Must return [`TxStoreResponse::Transaction`]
    Get(TxID),
    /// Promote a transaction from the stem pool to the public pool.
    ///
    /// If the tx is already in the fluff pool do nothing.
    ///
    /// This should not error if the tx isn't in the pool at all.
    Promote(TxID),
    /// A request to check if a transaction is in the pool.
    ///
    /// Must return [`TxStoreResponse::Contains`]
    Contains(TxID),
    /// Returns the IDs of all the transactions in the stem pool.
    ///
    /// Must return [`TxStoreResponse::IDs`]
    IDsInStemPool,
}

#[cfg(feature = "txpool")]
/// A response sent back from the backing transaction pool.
pub enum TxStoreResponse<Tx, TxID> {
    /// A response containing an [`Option`]: `Some` (with the pool the tx is in) if the transaction is in the pool, `None` if not.
    Contains(Option<crate::State>),
    /// A response containing a requested transaction.
    Transaction(Option<(Tx, crate::State)>),
    /// A list of transaction IDs.
    IDs(Vec<TxID>),
    /// A generic ok response.
    Ok,
}
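Putting the pieces together, a minimal end-to-end sketch mirroring `tests/pool.rs` above. It assumes an async context plus user-provided `broadcast_svc`, `outbound_peer_svc`, and `backing_pool_svc` services, and reuses a `DandelionConfig` like the one shown earlier; the buffer size of 15 simply copies the test:

    use dandelion_tower::{
        pool::{start_dandelion_pool, IncomingTx},
        DandelionRouter, TxState,
    };
    use tower::{Service, ServiceExt};

    let router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);
    let mut pool_handle = start_dandelion_pool(15, router, backing_pool_svc, config);

    pool_handle
        .ready()
        .await?
        .call(IncomingTx {
            tx,
            tx_id,
            tx_state: TxState::Local, // a locally created tx is always stemmed first
        })
        .await?;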