diff --git a/p2p/dandelion-tower/Cargo.toml b/p2p/dandelion-tower/Cargo.toml index 5e2fec53..976dad60 100644 --- a/p2p/dandelion-tower/Cargo.toml +++ b/p2p/dandelion-tower/Cargo.toml @@ -10,7 +10,7 @@ default = ["txpool"] txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"] [dependencies] -tower = { workspace = true, features = ["discover", "util"] } +tower = { workspace = true, features = ["util"] } tracing = { workspace = true, features = ["std"] } futures = { workspace = true, features = ["std"] } diff --git a/p2p/dandelion-tower/src/lib.rs b/p2p/dandelion-tower/src/lib.rs index f162724f..aa622f30 100644 --- a/p2p/dandelion-tower/src/lib.rs +++ b/p2p/dandelion-tower/src/lib.rs @@ -26,9 +26,9 @@ //! The diffuse service should have a request of [`DiffuseRequest`](traits::DiffuseRequest) and it's error //! should be [`tower::BoxError`]. //! -//! ## Outbound Peer Discoverer +//! ## Outbound Peer TryStream //! -//! The outbound peer [`Discover`](tower::discover::Discover) should provide a stream of randomly selected outbound +//! The outbound peer [`TryStream`](futures::TryStream) should provide a stream of randomly selected outbound //! peers, these peers will then be used to route stem txs to. //! //! The peers will not be returned anywhere, so it is recommended to wrap them in some sort of drop guard that returns @@ -37,10 +37,10 @@ //! ## Peer Service //! //! This service represents a connection to an individual peer, this should be returned from the Outbound Peer -//! Discover. This should immediately send the transaction to the peer when requested, i.e. it should _not_ set +//! TryStream. This should immediately send the transaction to the peer when requested, it should _not_ set //! a timer. //! -//! The diffuse service should have a request of [`StemRequest`](traits::StemRequest) and it's error +//! The peer service should have a request of [`StemRequest`](traits::StemRequest) and its error //! should be [`tower::BoxError`]. //! //! ## Backing Pool diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index 61e962c3..a64819a7 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -6,11 +6,10 @@ //! ### What The Router Does Not Do //! //! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling -//! loops in the stem. It is up to implementers to do this if they decide not top use [`DandelionPool`](crate::pool::DandelionPool) +//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool) //! use std::{ collections::HashMap, - future::Future, hash::Hash, marker::PhantomData, pin::Pin, @@ -18,12 +17,9 @@ use std::{ time::Instant, }; -use futures::TryFutureExt; +use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStream}; use rand::{distributions::Bernoulli, prelude::*, thread_rng}; -use tower::{ - discover::{Change, Discover}, - Service, -}; +use tower::Service; use crate::{ traits::{DiffuseRequest, StemRequest}, @@ -39,14 +35,22 @@ pub enum DandelionRouterError { /// The broadcast service returned an error. #[error("Broadcast service returned an err: {0}.")] BroadcastError(tower::BoxError), - /// The outbound peer discoverer returned an error, this is critical. - #[error("The outbound peer discoverer returned an err: {0}.")] - OutboundPeerDiscoverError(tower::BoxError), + /// The outbound peer stream returned an error, this is critical. + #[error("The outbound peer stream returned an err: {0}.")] + OutboundPeerStreamError(tower::BoxError), /// The outbound peer discoverer returned [`None`]. #[error("The outbound peer discoverer exited.")] OutboundPeerDiscoverExited, } +/// A response from an attempt to retrieve an outbound peer. +pub enum OutboundPeer { + /// A peer. + Peer(ID, T), + /// The peer store is exhausted and has no more to return. + Exhausted, +} + /// The dandelion++ state. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum State { @@ -116,9 +120,11 @@ pub struct DandelionRouter { impl DandelionRouter where ID: Hash + Eq + Clone, - P: Discover, + P: TryStream, Error = tower::BoxError>, B: Service, Error = tower::BoxError>, + B::Future: Send + 'static, S: Service, Error = tower::BoxError>, + S::Future: Send + 'static, { /// Creates a new [`DandelionRouter`], with the provided services and config. /// @@ -165,15 +171,16 @@ where match ready!(self .outbound_peer_discover .as_mut() - .poll_discover(cx) - .map_err(DandelionRouterError::OutboundPeerDiscoverError)) + .try_poll_next(cx) + .map_err(DandelionRouterError::OutboundPeerStreamError)) .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)?? { - Change::Insert(key, svc) => { + OutboundPeer::Peer(key, svc) => { self.stem_peers.insert(key, svc); } - Change::Remove(key) => { - self.stem_peers.remove(&key); + OutboundPeer::Exhausted => { + tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded."); + return Poll::Ready(Ok(())); } } } @@ -181,11 +188,24 @@ where Poll::Ready(Ok(())) } - fn fluff_tx(&mut self, tx: Tx) -> B::Future { - self.broadcast_svc.call(DiffuseRequest(tx)) + fn fluff_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result> { + self.broadcast_svc + .call(DiffuseRequest(tx)) + .map_ok(|_| State::Fluff) + .map_err(DandelionRouterError::BroadcastError) + .boxed() } - fn stem_tx(&mut self, tx: Tx, from: ID) -> S::Future { + fn stem_tx( + &mut self, + tx: Tx, + from: ID, + ) -> BoxFuture<'static, Result> { + if self.stem_peers.is_empty() { + tracing::debug!("Stem peers are empty, fluffing stem transaction."); + return self.fluff_tx(tx); + } + loop { let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| { self.stem_peers @@ -201,11 +221,20 @@ where continue; }; - return peer.call(StemRequest(tx)); + return peer + .call(StemRequest(tx)) + .map_ok(|_| State::Stem) + .map_err(DandelionRouterError::PeerError) + .boxed(); } } - fn stem_local_tx(&mut self, tx: Tx) -> S::Future { + fn stem_local_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result> { + if self.stem_peers.is_empty() { + tracing::warn!("Stem peers are empty, no outbound connections to stem local tx to, fluffing instead, privacy will be degraded."); + return self.fluff_tx(tx); + } + loop { let stem_route = self.local_route.get_or_insert_with(|| { self.stem_peers @@ -221,7 +250,11 @@ where continue; }; - return peer.call(StemRequest(tx)); + return peer + .call(StemRequest(tx)) + .map_ok(|_| State::Stem) + .map_err(DandelionRouterError::PeerError) + .boxed(); } } } @@ -238,7 +271,7 @@ S: The Peer service - handles routing messages to a single node. impl Service> for DandelionRouter where ID: Hash + Eq + Clone, - P: Discover, + P: TryStream, Error = tower::BoxError>, B: Service, Error = tower::BoxError>, B::Future: Send + 'static, S: Service, Error = tower::BoxError>, @@ -246,8 +279,7 @@ where { type Response = State; type Error = DandelionRouterError; - type Future = - Pin> + Send + 'static>>; + type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.epoch_start.elapsed() > self.config.epoch_duration { @@ -309,39 +341,23 @@ where tracing::trace!(parent: &self.span, "Handling route request."); match req.state { - TxState::Fluff => Box::pin( - self.fluff_tx(req.tx) - .map_ok(|_| State::Fluff) - .map_err(DandelionRouterError::BroadcastError), - ), + TxState::Fluff => self.fluff_tx(req.tx), TxState::Stem { from } => match self.current_state { State::Fluff => { tracing::debug!(parent: &self.span, "Fluffing stem tx."); - Box::pin( - self.fluff_tx(req.tx) - .map_ok(|_| State::Fluff) - .map_err(DandelionRouterError::BroadcastError), - ) + self.fluff_tx(req.tx) } State::Stem => { tracing::trace!(parent: &self.span, "Steming transaction"); - Box::pin( - self.stem_tx(req.tx, from) - .map_ok(|_| State::Stem) - .map_err(DandelionRouterError::PeerError), - ) + self.stem_tx(req.tx, from) } }, TxState::Local => { tracing::debug!(parent: &self.span, "Steming local tx."); - Box::pin( - self.stem_local_tx(req.tx) - .map_ok(|_| State::Stem) - .map_err(DandelionRouterError::PeerError), - ) + self.stem_local_tx(req.tx) } } } diff --git a/p2p/dandelion-tower/src/tests/mod.rs b/p2p/dandelion-tower/src/tests/mod.rs index 1f3ba3e8..d868a991 100644 --- a/p2p/dandelion-tower/src/tests/mod.rs +++ b/p2p/dandelion-tower/src/tests/mod.rs @@ -3,44 +3,48 @@ mod router; use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc}; -use futures::TryStreamExt; +use futures::{Stream, StreamExt, TryStreamExt}; use tokio::sync::mpsc::{self, UnboundedReceiver}; -use tower::{ - discover::{Discover, ServiceList}, - util::service_fn, - Service, ServiceExt, -}; +use tower::{util::service_fn, Service, ServiceExt}; use crate::{ traits::{TxStoreRequest, TxStoreResponse}, - State, + OutboundPeer, State, }; pub fn mock_discover_svc() -> ( - impl Discover< - Key = usize, - Service = impl Service< - Req, - Future = impl Future> + Send + 'static, - Error = tower::BoxError, - > + Send - + 'static, - Error = tower::BoxError, + impl Stream< + Item = Result< + OutboundPeer< + usize, + impl Service< + Req, + Future = impl Future> + Send + 'static, + Error = tower::BoxError, + > + Send + + 'static, + >, + tower::BoxError, + >, >, - UnboundedReceiver<(u64, Req)>, + UnboundedReceiver<(usize, Req)>, ) { let (tx, rx) = mpsc::unbounded_channel(); - let discover = ServiceList::new((0..).map(move |i| { - let tx_2 = tx.clone(); + let discover = futures::stream::iter(0_usize..1_000_000) + .map(move |i| { + let tx_2 = tx.clone(); - service_fn(move |req| { - tx_2.send((i, req)).unwrap(); + Ok::<_, tower::BoxError>(OutboundPeer::Peer( + i, + service_fn(move |req| { + tx_2.send((i, req)).unwrap(); - async move { Ok::<(), tower::BoxError>(()) } + async move { Ok::<(), tower::BoxError>(()) } + }), + )) }) - })) - .map_err(Into::into); + .map_err(Into::into); (discover, rx) }