diff --git a/p2p/dandelion-tower/src/lib.rs b/p2p/dandelion-tower/src/lib.rs index aa622f30..60b5ea5d 100644 --- a/p2p/dandelion-tower/src/lib.rs +++ b/p2p/dandelion-tower/src/lib.rs @@ -2,17 +2,17 @@ //! //! This crate implements [dandelion++](https://arxiv.org/pdf/1805.11060.pdf), using [`tower`]. //! -//! This crate provides 2 [`tower::Service`]s, a [`DandelionRouter`] and a [`DandelionPool`](pool::DandelionPool). +//! This crate provides 2 [`tower::Service`]s, a [`DandelionRouter`] and a [`DandelionPoolManager`](pool::DandelionPoolManager). //! The router is pretty minimal and only handles the data absolutely necessary to route transactions, whereas the //! pool keeps track of all data necessary for dandelion++ but requires you to provide a backing tx-pool. //! -//! This split was done not because the [`DandelionPool`](pool::DandelionPool) is unnecessary but because it is hard -//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPool`](pool::DandelionPool) +//! This split was done not because the [`DandelionPoolManager`](pool::DandelionPoolManager) is unnecessary but because it is hard +//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPoolManager`](pool::DandelionPoolManager) +//! requires you to implement part of the paper yourself. //! //! # Features //! -//! This crate only has one feature `txpool` which enables [`DandelionPool`](pool::DandelionPool). +//! This crate only has one feature, `txpool`, which enables [`DandelionPoolManager`](pool::DandelionPoolManager). //! //! # Needed Services //! @@ -45,7 +45,7 @@ //! //! ## Backing Pool //! -//! ([`DandelionPool`](pool::DandelionPool) only) +//! ([`DandelionPoolManager`](pool::DandelionPoolManager) only) //! //! This service is a backing tx-pool, in memory or on disk. //! The backing pool should have a request of [`TxStoreRequest`](traits::TxStoreRequest) and a response of diff --git a/p2p/dandelion-tower/src/pool.rs b/p2p/dandelion-tower/src/pool.rs deleted file mode 100644 index 5f4f7346..00000000 --- a/p2p/dandelion-tower/src/pool.rs +++ /dev/null @@ -1,509 +0,0 @@ -//! # Dandelion++ Pool -//! -//! This module contains [`DandelionPool`] which is a thin wrapper around a backing transaction store, -//! which fully implements the dandelion++ protocol. -//! -//! ### How To Get Txs From [`DandelionPool`]. -//! -//! [`DandelionPool`] does not provide a full tx-pool API. You cannot retrieve transactions from it or -//! check what transactions are in it, to do this you must keep a handle to the backing transaction store -//! yourself. -//! -//! The reason for this is, the [`DandelionPool`] will only itself be passing these requests onto the backing -//! pool, so it makes sense to remove the "middle man". -//! -//! ### Keep Stem Transactions Hidden -//! -//! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden. -//! So handle any requests to the tx-pool like the stem side of the pool does not exist.
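For illustration, a minimal in-memory backing pool satisfying the contract described above could look like the sketch below. It is not part of the patch: the `dandelion_tower` import path, the `Vec<u8>`/`u64` transaction and ID types, and the `in_memory_backing_pool` name are all assumptions, while the request set is the narrowed one this diff introduces (`Get` and `Promote`).

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use dandelion_tower::{
    traits::{TxStoreRequest, TxStoreResponse},
    State,
};
use tower::{service_fn, Service};

// Placeholder types standing in for a real node's transaction and ID types.
type Tx = Vec<u8>;
type TxId = u64;

/// A toy backing pool: a mutex-guarded map from tx ID to (tx, pool state).
fn in_memory_backing_pool(
) -> impl Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError> {
    let txs: Arc<Mutex<HashMap<TxId, (Tx, State)>>> = Arc::default();

    service_fn(move |req: TxStoreRequest<TxId>| {
        let txs = Arc::clone(&txs);
        async move {
            match req {
                // Return the tx and which pool (stem or fluff) it is in,
                // without removing it.
                TxStoreRequest::Get(tx_id) => Ok(TxStoreResponse::Transaction(
                    txs.lock().unwrap().get(&tx_id).cloned(),
                )),
                // Move a stem tx to the fluff pool; a no-op if the tx is
                // absent or already fluffed.
                TxStoreRequest::Promote(tx_id) => {
                    if let Some((_, state)) = txs.lock().unwrap().get_mut(&tx_id) {
                        *state = State::Fluff;
                    }
                    Ok(TxStoreResponse::Ok)
                }
            }
        }
    })
}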
-use std::{ - collections::{HashMap, HashSet}, - future::Future, - hash::Hash, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use futures::{FutureExt, StreamExt}; -use rand::prelude::*; -use rand_distr::Exp; -use tokio::{ - sync::{mpsc, oneshot}, - task::JoinSet, -}; -use tokio_util::{sync::PollSender, time::DelayQueue}; -use tower::{Service, ServiceExt}; -use tracing::Instrument; - -use crate::{ - traits::{TxStoreRequest, TxStoreResponse}, - DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState, -}; - -/// Start the [`DandelionPool`]. -/// -/// This function spawns the [`DandelionPool`] and returns [`DandelionPoolService`] which can be used to send -/// requests to the pool. -/// -/// ### Args -/// -/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPool`]. -/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow -/// user to customise routing functionality. -/// - `backing_pool` is the backing transaction storage service -/// - `config` is [`DandelionConfig`]. -pub fn start_dandelion_pool( - buffer_size: usize, - dandelion_router: R, - backing_pool: P, - config: DandelionConfig, -) -> DandelionPoolService -where - Tx: Clone + Send + 'static, - TxID: Hash + Eq + Clone + Send + 'static, - PID: Hash + Eq + Clone + Send + 'static, - P: Service< - TxStoreRequest, - Response = TxStoreResponse, - Error = tower::BoxError, - > + Send - + 'static, - P::Future: Send + 'static, - R: Service, Response = State, Error = DandelionRouterError> - + Send - + 'static, - R::Future: Send + 'static, -{ - let (tx, rx) = mpsc::channel(buffer_size); - - let pool = DandelionPool { - dandelion_router, - backing_pool, - routing_set: JoinSet::new(), - stem_origins: HashMap::new(), - embargo_timers: DelayQueue::new(), - embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(), - config, - _tx: PhantomData, - }; - - let span = tracing::debug_span!("dandelion_pool"); - - tokio::spawn(pool.run(rx).instrument(span)); - - DandelionPoolService { - tx: PollSender::new(tx), - } -} - -#[derive(Copy, Clone, Debug, thiserror::Error)] -#[error("The dandelion pool was shutdown")] -pub struct DandelionPoolShutDown; - -/// An incoming transaction for the [`DandelionPool`] to handle. -/// -/// Users may notice there is no way to check if the dandelion-pool wants a tx according to an inventory message like seen -/// in Bitcoin, only having a request for a full tx. Users should look in the *public* backing pool to handle inv messages, -/// and request txs even if they are in the stem pool. -pub struct IncomingTx { - /// The transaction. - /// - /// It is recommended to put this in an [`Arc`](std::sync::Arc) as it needs to be cloned to send to the backing - /// tx pool and [`DandelionRouter`](crate::DandelionRouter) - pub tx: Tx, - /// The transaction ID. - pub tx_id: TxID, - /// The routing state of this transaction. - pub tx_state: TxState, -} - -/// The dandelion tx pool service. -#[derive(Clone)] -pub struct DandelionPoolService { - /// The channel to [`DandelionPool`]. 
- tx: PollSender<(IncomingTx, oneshot::Sender<()>)>, -} - -impl Service> for DandelionPoolService -where - Tx: Clone + Send, - TxID: Hash + Eq + Clone + Send + 'static, - PID: Hash + Eq + Clone + Send + 'static, -{ - type Response = (); - type Error = DandelionPoolShutDown; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown) - } - - fn call(&mut self, req: IncomingTx) -> Self::Future { - // although the channel isn't sending anything we want to wait for the request to be handled before continuing. - let (tx, rx) = oneshot::channel(); - - let res = self - .tx - .send_item((req, tx)) - .map_err(|_| DandelionPoolShutDown); - - async move { - res?; - rx.await.expect("Oneshot dropped before response!"); - - Ok(()) - } - .boxed() - } -} - -/// The dandelion++ tx pool. -/// -/// See the [module docs](self) for more. -pub struct DandelionPool { - /// The dandelion++ router - dandelion_router: R, - /// The backing tx storage. - backing_pool: P, - /// The set of tasks that are running the future returned from `dandelion_router`. - routing_set: JoinSet<(TxID, Result>)>, - - /// The origin of stem transactions. - stem_origins: HashMap>, - - /// Current stem pool embargo timers. - embargo_timers: DelayQueue, - /// The distrobution to sample to get embargo timers. - embargo_dist: Exp, - - /// The d++ config. - config: DandelionConfig, - - _tx: PhantomData, -} - -impl DandelionPool -where - Tx: Clone + Send, - TxID: Hash + Eq + Clone + Send + 'static, - PID: Hash + Eq + Clone + Send + 'static, - P: Service< - TxStoreRequest, - Response = TxStoreResponse, - Error = tower::BoxError, - >, - P::Future: Send + 'static, - R: Service, Response = State, Error = DandelionRouterError>, - R::Future: Send + 'static, -{ - /// Stores the tx in the backing pools stem pool, setting the embargo timer, stem origin and steming the tx. - async fn store_tx_and_stem( - &mut self, - tx: Tx, - tx_id: TxID, - from: Option, - ) -> Result<(), tower::BoxError> { - self.backing_pool - .ready() - .await? - .call(TxStoreRequest::Store( - tx.clone(), - tx_id.clone(), - State::Stem, - )) - .await?; - - let embargo_timer = self.embargo_dist.sample(&mut thread_rng()); - tracing::debug!( - "Setting embargo timer for stem tx: {} seconds.", - embargo_timer - ); - self.embargo_timers - .insert(tx_id.clone(), Duration::from_secs_f64(embargo_timer)); - - self.stem_tx(tx, tx_id, from).await - } - - /// Stems the tx, setting the stem origin, if it wasn't already set. - /// - /// This function does not add the tx to the backing pool. - async fn stem_tx( - &mut self, - tx: Tx, - tx_id: TxID, - from: Option, - ) -> Result<(), tower::BoxError> { - if let Some(peer) = &from { - self.stem_origins - .entry(tx_id.clone()) - .or_default() - .insert(peer.clone()); - } - - let state = from - .map(|from| TxState::Stem { from }) - .unwrap_or(TxState::Local); - - let fut = self - .dandelion_router - .ready() - .await? - .call(DandelionRouteReq { - tx, - state: state.clone(), - }); - - self.routing_set - .spawn(fut.map(|res| (tx_id, res.map_err(|_| state)))); - Ok(()) - } - - /// Stores the tx in the backing pool and fluffs the tx, removing the stem data for this tx. - async fn store_and_fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> { - // fluffs the tx first to prevent timing attacks where we could fluff at different average times - // depending on if the tx was in the stem pool already or not. 
- // Massively overkill but this is a minimal change. - self.fluff_tx(tx.clone(), tx_id.clone()).await?; - - // Remove the tx from the maps used during the stem phase. - self.stem_origins.remove(&tx_id); - - self.backing_pool - .ready() - .await? - .call(TxStoreRequest::Store(tx, tx_id, State::Fluff)) - .await?; - - // The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the - // map. These timers should be relatively short, so it shouldn't be a problem. - //self.embargo_timers.try_remove(&tx_id); - - Ok(()) - } - - /// Fluffs a tx, does not add the tx to the tx pool. - async fn fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> { - let fut = self - .dandelion_router - .ready() - .await? - .call(DandelionRouteReq { - tx, - state: TxState::Fluff, - }); - - self.routing_set - .spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff)))); - Ok(()) - } - - /// Function to handle an incoming [`DandelionPoolRequest::IncomingTx`]. - async fn handle_incoming_tx( - &mut self, - tx: Tx, - tx_state: TxState, - tx_id: TxID, - ) -> Result<(), tower::BoxError> { - let TxStoreResponse::Contains(have_tx) = self - .backing_pool - .ready() - .await? - .call(TxStoreRequest::Contains(tx_id.clone())) - .await? - else { - panic!("Backing tx pool responded with wrong response for request."); - }; - // If we have already fluffed this tx then we don't need to do anything. - if have_tx == Some(State::Fluff) { - tracing::debug!("Already fluffed incoming tx, ignoring."); - return Ok(()); - } - - match tx_state { - TxState::Stem { from } => { - if self - .stem_origins - .get(&tx_id) - .is_some_and(|peers| peers.contains(&from)) - { - tracing::debug!("Received stem tx twice from same peer, fluffing it"); - // The same peer sent us a tx twice, fluff it. - self.promote_and_fluff_tx(tx_id).await - } else { - // This could be a new tx or it could have already been stemed, but we still stem it again - // unless the same peer sends us a tx twice. - tracing::debug!("Steming incoming tx"); - self.store_tx_and_stem(tx, tx_id, Some(from)).await - } - } - TxState::Fluff => { - tracing::debug!("Fluffing incoming tx"); - self.store_and_fluff_tx(tx, tx_id).await - } - TxState::Local => { - // If we have already stemed this tx then nothing to do. - if have_tx.is_some() { - tracing::debug!("Received a local tx that we already have, skipping"); - return Ok(()); - } - tracing::debug!("Steming local transaction"); - self.store_tx_and_stem(tx, tx_id, None).await - } - } - } - - /// Promotes a tx to the clear pool. - async fn promote_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> { - // Remove the tx from the maps used during the stem phase. - self.stem_origins.remove(&tx_id); - - // The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the - // map. These timers should be relatively short, so it shouldn't be a problem. - //self.embargo_timers.try_remove(&tx_id); - - self.backing_pool - .ready() - .await? - .call(TxStoreRequest::Promote(tx_id)) - .await?; - - Ok(()) - } - - /// Promotes a tx to the public fluff pool and fluffs the tx. - async fn promote_and_fluff_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> { - tracing::debug!("Promoting transaction to public pool and fluffing it."); - - let TxStoreResponse::Transaction(tx) = self - .backing_pool - .ready() - .await? - .call(TxStoreRequest::Get(tx_id.clone())) - .await? 
- else { - panic!("Backing tx pool responded with wrong response for request."); - }; - - let Some((tx, state)) = tx else { - tracing::debug!("Could not find tx, skipping."); - return Ok(()); - }; - - if state == State::Fluff { - tracing::debug!("Transaction already fluffed, skipping."); - return Ok(()); - } - - self.promote_tx(tx_id.clone()).await?; - self.fluff_tx(tx, tx_id).await - } - - /// Returns a tx stored in the fluff _OR_ stem pool. - async fn get_tx_from_pool(&mut self, tx_id: TxID) -> Result, tower::BoxError> { - let TxStoreResponse::Transaction(tx) = self - .backing_pool - .ready() - .await? - .call(TxStoreRequest::Get(tx_id)) - .await? - else { - panic!("Backing tx pool responded with wrong response for request."); - }; - - Ok(tx.map(|tx| tx.0)) - } - - /// Starts the [`DandelionPool`]. - async fn run( - mut self, - mut rx: mpsc::Receiver<(IncomingTx, oneshot::Sender<()>)>, - ) { - tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config); - - // On start up we just fluff all txs left in the stem pool. - let Ok(TxStoreResponse::IDs(ids)) = (&mut self.backing_pool) - .oneshot(TxStoreRequest::IDsInStemPool) - .await - else { - tracing::error!("Failed to get transactions in stem pool."); - return; - }; - - tracing::debug!( - "Fluffing {} txs that are currently in the stem pool", - ids.len() - ); - - for id in ids { - if let Err(e) = self.promote_and_fluff_tx(id).await { - tracing::error!("Failed to fluff tx in the stem pool at start up, {e}."); - return; - } - } - - loop { - tracing::trace!("Waiting for next event."); - tokio::select! { - // biased to handle current txs before routing new ones. - biased; - Some(fired) = self.embargo_timers.next() => { - tracing::debug!("Embargo timer fired, did not see stem tx in time."); - - let tx_id = fired.into_inner(); - if let Err(e) = self.promote_and_fluff_tx(tx_id).await { - tracing::error!("Error handling fired embargo timer: {e}"); - return; - } - } - Some(Ok((tx_id, res))) = self.routing_set.join_next() => { - tracing::trace!("Received d++ routing result."); - - let res = match res { - Ok(State::Fluff) => { - tracing::debug!("Transaction was fluffed upgrading it to the public pool."); - self.promote_tx(tx_id).await - } - Err(tx_state) => { - tracing::debug!("Error routing transaction, trying again."); - - match self.get_tx_from_pool(tx_id.clone()).await { - Ok(Some(tx)) => match tx_state { - TxState::Fluff => self.fluff_tx(tx, tx_id).await, - TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await, - TxState::Local => self.stem_tx(tx, tx_id, None).await, - } - Err(e) => Err(e), - _ => continue, - } - } - Ok(State::Stem) => continue, - }; - - if let Err(e) = res { - tracing::error!("Error handling transaction routing return: {e}"); - return; - } - } - req = rx.recv() => { - tracing::debug!("Received new tx to route."); - - let Some((IncomingTx { tx, tx_state, tx_id }, res_tx)) = req else { - return; - }; - - if let Err(e) = self.handle_incoming_tx(tx, tx_state, tx_id).await { - let _ = res_tx.send(()); - - tracing::error!("Error handling transaction in dandelion pool: {e}"); - return; - } - let _ = res_tx.send(()); - - } - } - } - } -} diff --git a/p2p/dandelion-tower/src/pool/incoming_tx.rs b/p2p/dandelion-tower/src/pool/incoming_tx.rs new file mode 100644 index 00000000..c9a30dee --- /dev/null +++ b/p2p/dandelion-tower/src/pool/incoming_tx.rs @@ -0,0 +1,113 @@ +//! 
Contains [`IncomingTx`] and [`IncomingTxBuilder`]. +use crate::{State, TxState}; + +/// An incoming transaction that has gone through the preprocessing stage. +pub struct IncomingTx { + /// The transaction. + pub(crate) tx: Tx, + /// The transaction ID. + pub(crate) tx_id: TxId, + /// The routing state of the transaction. + pub(crate) routing_state: TxState, +} + +/// An [`IncomingTx`] builder. +/// +/// The const generics here are used to restrict what methods can be called. +/// +/// - `RS`: routing state; a `bool` indicating whether the routing state is set +/// - `DBS`: database state; a `bool` indicating whether the state in the DB is set +pub struct IncomingTxBuilder { + /// The transaction. + tx: Tx, + /// The transaction ID. + tx_id: TxId, + /// The routing state of the transaction. + routing_state: Option>, + /// The state of this transaction in the DB. + state_in_db: Option, +} + +impl IncomingTxBuilder { + /// Creates a new [`IncomingTxBuilder`]. + pub fn new(tx: Tx, tx_id: TxId) -> Self { + Self { + tx, + tx_id, + routing_state: None, + state_in_db: None, + } + } +} + +impl IncomingTxBuilder { + /// Adds the routing state to the builder. + /// + /// The routing state is the origin of this transaction from our perspective. + pub fn with_routing_state( + self, + state: TxState, + ) -> IncomingTxBuilder { + IncomingTxBuilder { + tx: self.tx, + tx_id: self.tx_id, + routing_state: Some(state), + state_in_db: self.state_in_db, + } + } +} + +impl IncomingTxBuilder { + /// Adds the database state to the builder. + /// + /// If the transaction is not in the DB already then the state should be [`None`]. + pub fn with_state_in_db( + self, + state: Option, + ) -> IncomingTxBuilder { + IncomingTxBuilder { + tx: self.tx, + tx_id: self.tx_id, + routing_state: self.routing_state, + state_in_db: state, + } + } +} + +impl IncomingTxBuilder { + /// Builds the [`IncomingTx`]. + /// + /// If this returns [`None`] then the transaction does not need to be given to the dandelion pool + /// manager. + pub fn build(self) -> Option> { + let routing_state = self.routing_state.unwrap(); + + if self.state_in_db == Some(State::Fluff) { + return None; + } + + Some(IncomingTx { + tx: self.tx, + tx_id: self.tx_id, + routing_state, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_builder() { + IncomingTxBuilder::new(1, 2) + .with_routing_state(TxState::Stem { from: 3 }) + .with_state_in_db(None) + .build(); + + IncomingTxBuilder::new(1, 2) + .with_state_in_db(None) + .with_routing_state(TxState::Stem { from: 3 }) + .build(); + } +} diff --git a/p2p/dandelion-tower/src/pool/manager.rs b/p2p/dandelion-tower/src/pool/manager.rs new file mode 100644 index 00000000..9e1572e1 --- /dev/null +++ b/p2p/dandelion-tower/src/pool/manager.rs @@ -0,0 +1,294 @@ +use std::{ + collections::{HashMap, HashSet}, + hash::Hash, + marker::PhantomData, + time::Duration, +}; + +use futures::{FutureExt, StreamExt}; +use rand::prelude::*; +use rand_distr::Exp; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinSet, +}; +use tokio_util::time::DelayQueue; +use tower::{Service, ServiceExt}; + +use crate::{ + pool::IncomingTx, + traits::{TxStoreRequest, TxStoreResponse}, + DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState, +}; + +#[derive(Copy, Clone, Debug, thiserror::Error)] +#[error("The dandelion pool was shut down")] +pub struct DandelionPoolShutDown; + +/// The dandelion++ pool manager. +/// +/// See the [module docs](super) for more.
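As a usage sketch (not part of the patch), a preprocessing stage might drive this builder as below. The `Tx`/`TxId`/`PeerId` aliases and the assumption that the pool lookup returned `None` are illustrative; the setter and `build()` calls mirror the API above, and the same ordering freedom is exercised by the `test_builder` test.

use dandelion_tower::{pool::IncomingTxBuilder, TxState};

// Placeholder types; in practice these are the node's own tx, ID and peer types.
type Tx = Vec<u8>;
type TxId = u64;
type PeerId = u64;

/// Build an `IncomingTx` after verification and a backing-pool lookup have happened.
fn preprocess(tx: Tx, tx_id: TxId, from_peer: Option<PeerId>) {
    // Local txs have no originating peer; relayed stem txs do.
    let routing_state = match from_peer {
        Some(from) => TxState::Stem { from },
        None => TxState::Local,
    };

    let incoming_tx = IncomingTxBuilder::new(tx, tx_id)
        // Both setters must run (in either order) before `build()` is
        // callable; the const generics enforce this at compile time.
        .with_routing_state(routing_state)
        // `None`: the preprocessing lookup found the tx absent from the pool.
        .with_state_in_db(None)
        .build();

    if let Some(incoming_tx) = incoming_tx {
        // Hand `incoming_tx` to the `DandelionPoolService`; `None` would have
        // meant the tx was already fluffed and can be skipped.
        let _ = incoming_tx;
    }
}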
+pub struct DandelionPoolManager { + /// The dandelion++ router. + pub(crate) dandelion_router: R, + /// The backing tx storage. + pub(crate) backing_pool: P, + /// The set of tasks that are running the future returned from `dandelion_router`. + pub(crate) routing_set: JoinSet<(TxId, Result>)>, + + /// The origin of stem transactions. + pub(crate) stem_origins: HashMap>, + + /// Current stem pool embargo timers. + pub(crate) embargo_timers: DelayQueue, + /// The distribution to sample to get embargo timers. + pub(crate) embargo_dist: Exp, + + /// The d++ config. + pub(crate) config: DandelionConfig, + + pub(crate) _tx: PhantomData, +} + +impl DandelionPoolManager +where + Tx: Clone + Send, + TxId: Hash + Eq + Clone + Send + 'static, + PeerId: Hash + Eq + Clone + Send + 'static, + P: Service, Response = TxStoreResponse, Error = tower::BoxError>, + P::Future: Send + 'static, + R: Service, Response = State, Error = DandelionRouterError>, + R::Future: Send + 'static, +{ + /// Adds a new embargo timer to the running timers, with a duration pulled from [`Self::embargo_dist`]. + fn add_embargo_timer_for_tx(&mut self, tx_id: TxId) { + let embargo_timer = self.embargo_dist.sample(&mut thread_rng()); + tracing::debug!( + "Setting embargo timer for stem tx: {} seconds.", + embargo_timer + ); + + self.embargo_timers + .insert(tx_id, Duration::from_secs_f64(embargo_timer)); + } + + /// Stems the tx, setting the stem origin, if it wasn't already set. + /// + /// This function does not add the tx to the backing pool. + async fn stem_tx( + &mut self, + tx: Tx, + tx_id: TxId, + from: Option, + ) -> Result<(), tower::BoxError> { + if let Some(peer) = &from { + self.stem_origins + .entry(tx_id.clone()) + .or_default() + .insert(peer.clone()); + } + + let state = from + .map(|from| TxState::Stem { from }) + .unwrap_or(TxState::Local); + + let fut = self + .dandelion_router + .ready() + .await? + .call(DandelionRouteReq { + tx, + state: state.clone(), + }); + + self.routing_set + .spawn(fut.map(|res| (tx_id, res.map_err(|_| state)))); + Ok(()) + } + + /// Fluffs a tx, does not add the tx to the tx pool. + async fn fluff_tx(&mut self, tx: Tx, tx_id: TxId) -> Result<(), tower::BoxError> { + let fut = self + .dandelion_router + .ready() + .await? + .call(DandelionRouteReq { + tx, + state: TxState::Fluff, + }); + + self.routing_set + .spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff)))); + Ok(()) + } + + /// Function to handle an [`IncomingTx`]. + async fn handle_incoming_tx( + &mut self, + tx: Tx, + tx_state: TxState, + tx_id: TxId, + ) -> Result<(), tower::BoxError> { + match tx_state { + TxState::Stem { from } => { + if self + .stem_origins + .get(&tx_id) + .is_some_and(|peers| peers.contains(&from)) + { + tracing::debug!("Received stem tx twice from same peer, fluffing it"); + // The same peer sent us a tx twice, fluff it. + self.promote_and_fluff_tx(tx_id).await?; + } else { + // This could be a new tx or it could have already been stemmed, but we still stem it again + // unless the same peer sends us a tx twice. + tracing::debug!("Stemming incoming tx"); + self.stem_tx(tx, tx_id.clone(), Some(from)).await?; + self.add_embargo_timer_for_tx(tx_id); + } + } + TxState::Fluff => { + tracing::debug!("Fluffing incoming tx"); + self.fluff_tx(tx, tx_id).await?; + } + TxState::Local => { + tracing::debug!("Stemming local transaction"); + self.stem_tx(tx, tx_id.clone(), None).await?; + self.add_embargo_timer_for_tx(tx_id); + } + } + + Ok(()) + } + + /// Promotes a tx to the clear pool.
+ async fn promote_tx(&mut self, tx_id: TxId) -> Result<(), tower::BoxError> { + // Remove the tx from the maps used during the stem phase. + self.stem_origins.remove(&tx_id); + + // The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the + // map. These timers should be relatively short, so it shouldn't be a problem. + //self.embargo_timers.try_remove(&tx_id); + + self.backing_pool + .ready() + .await? + .call(TxStoreRequest::Promote(tx_id)) + .await?; + + Ok(()) + } + + /// Promotes a tx to the public fluff pool and fluffs the tx. + async fn promote_and_fluff_tx(&mut self, tx_id: TxId) -> Result<(), tower::BoxError> { + tracing::debug!("Promoting transaction to public pool and fluffing it."); + + let TxStoreResponse::Transaction(tx) = self + .backing_pool + .ready() + .await? + .call(TxStoreRequest::Get(tx_id.clone())) + .await? + else { + panic!("Backing tx pool responded with wrong response for request."); + }; + + let Some((tx, state)) = tx else { + tracing::debug!("Could not find tx, skipping."); + return Ok(()); + }; + + if state == State::Fluff { + tracing::debug!("Transaction already fluffed, skipping."); + return Ok(()); + } + + self.promote_tx(tx_id.clone()).await?; + self.fluff_tx(tx, tx_id).await + } + + /// Returns a tx stored in the fluff _OR_ stem pool. + async fn get_tx_from_pool(&mut self, tx_id: TxId) -> Result, tower::BoxError> { + let TxStoreResponse::Transaction(tx) = self + .backing_pool + .ready() + .await? + .call(TxStoreRequest::Get(tx_id)) + .await? + else { + panic!("Backing tx pool responded with wrong response for request."); + }; + + Ok(tx.map(|tx| tx.0)) + } + + /// Starts the [`DandelionPoolManager`]. + pub(crate) async fn run( + mut self, + mut rx: mpsc::Receiver<(IncomingTx, oneshot::Sender<()>)>, + ) { + tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config); + + loop { + tracing::trace!("Waiting for next event."); + tokio::select! { + // biased to handle current txs before routing new ones. 
+ biased; + Some(fired) = self.embargo_timers.next() => { + tracing::debug!("Embargo timer fired, did not see stem tx in time."); + + let tx_id = fired.into_inner(); + if let Err(e) = self.promote_and_fluff_tx(tx_id).await { + tracing::error!("Error handling fired embargo timer: {e}"); + return; + } + } + Some(Ok((tx_id, res))) = self.routing_set.join_next() => { + tracing::trace!("Received d++ routing result."); + + let res = match res { + Ok(State::Fluff) => { + tracing::debug!("Transaction was fluffed, upgrading it to the public pool."); + self.promote_tx(tx_id).await + } + Err(tx_state) => { + tracing::debug!("Error routing transaction, trying again."); + + match self.get_tx_from_pool(tx_id.clone()).await { + Ok(Some(tx)) => match tx_state { + TxState::Fluff => self.fluff_tx(tx, tx_id).await, + TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await, + TxState::Local => self.stem_tx(tx, tx_id, None).await, + } + Err(e) => Err(e), + _ => continue, + } + } + Ok(State::Stem) => continue, + }; + + if let Err(e) = res { + tracing::error!("Error handling transaction routing return: {e}"); + return; + } + } + req = rx.recv() => { + tracing::debug!("Received new tx to route."); + + let Some((IncomingTx { tx, tx_id, routing_state }, res_tx)) = req else { + return; + }; + + if let Err(e) = self.handle_incoming_tx(tx, routing_state, tx_id).await { + let _ = res_tx.send(()); + + tracing::error!("Error handling transaction in dandelion pool: {e}"); + return; + } + let _ = res_tx.send(()); + + } + } + } + } +} diff --git a/p2p/dandelion-tower/src/pool/mod.rs b/p2p/dandelion-tower/src/pool/mod.rs new file mode 100644 index 00000000..40a36172 --- /dev/null +++ b/p2p/dandelion-tower/src/pool/mod.rs @@ -0,0 +1,145 @@ +//! # Dandelion++ Pool +//! +//! This module contains [`DandelionPoolManager`], a wrapper around a backing transaction store +//! that fully implements the dandelion++ protocol. +//! +//! The [`DandelionPoolManager`] is a middleman between a [preprocessing stage](#preprocessing-stage) and a dandelion router. +//! It handles promoting transactions in the stem state to the fluff state and setting embargo timers on stem state transactions. +//! +//! ### Preprocessing stage +//! +//! The preprocessing stage (not handled in this crate) before giving the transaction to the [`DandelionPoolManager`] +//! should handle: +//! +//! - verifying the tx. +//! - checking if we have the tx in the pool already and giving that information to the [`IncomingTxBuilder`]. +//! - storing the tx in the pool, if it isn't there already. +//! +//! ### Keep Stem Transactions Hidden +//! +//! When using your handle to the backing store, remember to keep transactions in the stem pool hidden: +//! handle any requests to the tx-pool as if the stem side of the pool does not exist. +use std::{ + collections::HashMap, + hash::Hash, + marker::PhantomData, + task::{Context, Poll}, +}; + +use futures::{future::BoxFuture, FutureExt}; +use rand_distr::Exp; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinSet, +}; + +use tokio_util::{sync::PollSender, time::DelayQueue}; +use tower::Service; +use tracing::Instrument; + +use crate::{ + pool::manager::DandelionPoolShutDown, + traits::{TxStoreRequest, TxStoreResponse}, + DandelionConfig, DandelionRouteReq, DandelionRouterError, State, +}; + +mod incoming_tx; +mod manager; + +pub use incoming_tx::{IncomingTx, IncomingTxBuilder}; +pub use manager::DandelionPoolManager; + +/// Start the [`DandelionPoolManager`].
+/// +/// This function spawns the [`DandelionPoolManager`] and returns [`DandelionPoolService`] which can be used to send +/// requests to the pool. +/// +/// ### Args +/// +/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPoolManager`]. +/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow +/// the user to customise routing functionality. +/// - `backing_pool` is the backing transaction storage service. +/// - `config` is [`DandelionConfig`]. +pub fn start_dandelion_pool_manager( + buffer_size: usize, + dandelion_router: R, + backing_pool: P, + config: DandelionConfig, +) -> DandelionPoolService +where + Tx: Clone + Send + 'static, + TxId: Hash + Eq + Clone + Send + 'static, + PeerId: Hash + Eq + Clone + Send + 'static, + P: Service, Response = TxStoreResponse, Error = tower::BoxError> + + Send + + 'static, + P::Future: Send + 'static, + R: Service, Response = State, Error = DandelionRouterError> + + Send + + 'static, + R::Future: Send + 'static, +{ + let (tx, rx) = mpsc::channel(buffer_size); + + let pool = DandelionPoolManager { + dandelion_router, + backing_pool, + routing_set: JoinSet::new(), + stem_origins: HashMap::new(), + embargo_timers: DelayQueue::new(), + embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(), + config, + _tx: PhantomData, + }; + + let span = tracing::debug_span!("dandelion_pool"); + + tokio::spawn(pool.run(rx).instrument(span)); + + DandelionPoolService { + tx: PollSender::new(tx), + } +} + +/// The dandelion pool manager service. +/// +/// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`]. +#[derive(Clone)] +pub struct DandelionPoolService { + /// The channel to [`DandelionPoolManager`]. + tx: PollSender<(IncomingTx, oneshot::Sender<()>)>, +} + +impl Service> + for DandelionPoolService +where + Tx: Clone + Send, + TxId: Hash + Eq + Clone + Send + 'static, + PeerId: Hash + Eq + Clone + Send + 'static, +{ + type Response = (); + type Error = DandelionPoolShutDown; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown) + } + + fn call(&mut self, req: IncomingTx) -> Self::Future { + // although the channel isn't sending anything we want to wait for the request to be handled before continuing. + let (tx, rx) = oneshot::channel(); + + let res = self + .tx + .send_item((req, tx)) + .map_err(|_| DandelionPoolShutDown); + + async move { + res?; + rx.await.expect("Oneshot dropped before response!"); + + Ok(()) + } + .boxed() + } +} diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index c118c0b7..edeccaeb 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -6,7 +6,7 @@ //! ### What The Router Does Not Do //! //! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling -loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPoolManager`](crate::pool::DandelionPoolManager) use std::{ collections::HashMap, hash::Hash, @@ -43,9 +43,9 @@ pub enum DandelionRouterError { } /// A response from an attempt to retrieve an outbound peer. -pub enum OutboundPeer { /// A peer.
- Peer(ID, T), + Peer(Id, T), /// The peer store is exhausted and has no more to return. Exhausted, } @@ -61,28 +61,28 @@ pub enum State { /// The routing state of a transaction. #[derive(Debug, Clone, Eq, PartialEq)] -pub enum TxState { +pub enum TxState { /// Fluff state. Fluff, /// Stem state. Stem { - /// The peer who sent us this transaction's ID. - from: ID, + /// The peer who sent us this transaction's Id. + from: Id, }, /// Local - the transaction originated from our node. Local, } /// A request to route a transaction. -pub struct DandelionRouteReq { +pub struct DandelionRouteReq { /// The transaction. pub tx: Tx, /// The transaction state. - pub state: TxState, + pub state: TxState, } /// The dandelion router service. -pub struct DandelionRouter { +pub struct DandelionRouter { // pub(crate) is for tests /// A [`Discover`] where we can get outbound peers from. outbound_peer_discover: Pin>, @@ -95,14 +95,14 @@ pub struct DandelionRouter { epoch_start: Instant, /// The stem our local transactions will be sent to. - local_route: Option, - /// A [`HashMap`] linking peer's IDs to IDs in `stem_peers`. - stem_routes: HashMap, + local_route: Option, + /// A [`HashMap`] linking peer's Ids to Ids in `stem_peers`. + stem_routes: HashMap, /// Peers we are using for stemming. /// /// This will contain peers, even in [`State::Fluff`] to allow us to stem [`TxState::Local`] /// transactions. - pub(crate) stem_peers: HashMap, + pub(crate) stem_peers: HashMap, /// The distribution to sample to get the [`State`], true is [`State::Fluff`]. state_dist: Bernoulli, @@ -116,10 +116,10 @@ pub struct DandelionRouter { _tx: PhantomData, } -impl DandelionRouter +impl DandelionRouter where - ID: Hash + Eq + Clone, - P: TryStream, Error = tower::BoxError>, + Id: Hash + Eq + Clone, + P: TryStream, Error = tower::BoxError>, B: Service, Error = tower::BoxError>, B::Future: Send + 'static, S: Service, Error = tower::BoxError>, @@ -198,7 +198,7 @@ where fn stem_tx( &mut self, tx: Tx, - from: ID, + from: Id, ) -> BoxFuture<'static, Result> { if self.stem_peers.is_empty() { tracing::debug!("Stem peers are empty, fluffing stem transaction."); @@ -258,19 +258,10 @@ where } } -/* -## Generics ## - -Tx: The tx type -ID: Peer Id type - unique identifier for nodes. -P: Peer Set discover - where we can get outbound peers from -B: Broadcast service - where we send txs to get diffused. -S: The Peer service - handles routing messages to a single node. 
- */ -impl Service> for DandelionRouter +impl Service> for DandelionRouter where - ID: Hash + Eq + Clone, - P: TryStream, Error = tower::BoxError>, + Id: Hash + Eq + Clone, + P: TryStream, Error = tower::BoxError>, B: Service, Error = tower::BoxError>, B::Future: Send + 'static, S: Service, Error = tower::BoxError>, @@ -336,7 +327,7 @@ where Poll::Ready(Ok(())) } - fn call(&mut self, req: DandelionRouteReq) -> Self::Future { + fn call(&mut self, req: DandelionRouteReq) -> Self::Future { tracing::trace!(parent: &self.span, "Handling route request."); match req.state { diff --git a/p2p/dandelion-tower/src/tests/mod.rs b/p2p/dandelion-tower/src/tests/mod.rs index d868a991..1c6a3e05 100644 --- a/p2p/dandelion-tower/src/tests/mod.rs +++ b/p2p/dandelion-tower/src/tests/mod.rs @@ -76,11 +76,9 @@ pub fn mock_in_memory_backing_pool< TxID: Clone + Hash + Eq + Send + 'static, >() -> ( impl Service< - TxStoreRequest, - Response = TxStoreResponse, - Future = impl Future, tower::BoxError>> - + Send - + 'static, + TxStoreRequest, + Response = TxStoreResponse, + Future = impl Future, tower::BoxError>> + Send + 'static, Error = tower::BoxError, > + Send + 'static, @@ -90,33 +88,14 @@ pub fn mock_in_memory_backing_pool< let txs_2 = txs.clone(); ( - service_fn(move |req: TxStoreRequest| { + service_fn(move |req: TxStoreRequest| { let txs = txs.clone(); async move { match req { - TxStoreRequest::Store(tx, tx_id, state) => { - txs.lock().unwrap().insert(tx_id, (tx, state)); - Ok(TxStoreResponse::Ok) - } TxStoreRequest::Get(tx_id) => { let tx_state = txs.lock().unwrap().get(&tx_id).cloned(); Ok(TxStoreResponse::Transaction(tx_state)) } - TxStoreRequest::Contains(tx_id) => Ok(TxStoreResponse::Contains( - txs.lock().unwrap().get(&tx_id).map(|res| res.1), - )), - TxStoreRequest::IDsInStemPool => { - // horribly inefficient, but it's test code :) - let ids = txs - .lock() - .unwrap() - .iter() - .filter(|(_, (_, state))| matches!(state, State::Stem)) - .map(|tx| tx.0.clone()) - .collect::>(); - - Ok(TxStoreResponse::IDs(ids)) - } TxStoreRequest::Promote(tx_id) => { let _ = txs .lock() diff --git a/p2p/dandelion-tower/src/tests/pool.rs b/p2p/dandelion-tower/src/tests/pool.rs index 4a7c87dd..b7fa55eb 100644 --- a/p2p/dandelion-tower/src/tests/pool.rs +++ b/p2p/dandelion-tower/src/tests/pool.rs @@ -1,12 +1,11 @@ use std::time::Duration; +use super::*; use crate::{ - pool::{start_dandelion_pool, IncomingTx}, + pool::{start_dandelion_pool_manager, IncomingTx}, DandelionConfig, DandelionRouter, Graph, TxState, }; -use super::*; - #[tokio::test] async fn basic_functionality() { let config = DandelionConfig { @@ -21,9 +20,9 @@ async fn basic_functionality() { let router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config); - let (pool_svc, pool) = mock_in_memory_backing_pool(); + let (pool_svc, _pool) = mock_in_memory_backing_pool(); - let mut pool_svc = start_dandelion_pool(15, router, pool_svc, config); + let mut pool_svc = start_dandelion_pool_manager(15, router, pool_svc, config); pool_svc .ready() @@ -32,11 +31,13 @@ async fn basic_functionality() { .call(IncomingTx { tx: 0_usize, tx_id: 1_usize, - tx_state: TxState::Fluff, + routing_state: TxState::Fluff, }) .await .unwrap(); - assert!(pool.lock().unwrap().contains_key(&1)); + // TODO: the DandelionPoolManager doesn't handle adding txs to the pool, add more tests here to test + // all functionality. 
+ //assert!(pool.lock().unwrap().contains_key(&1)); assert!(broadcast_rx.try_recv().is_ok()) } diff --git a/p2p/dandelion-tower/src/traits.rs b/p2p/dandelion-tower/src/traits.rs index c84ecf04..bbf60863 100644 --- a/p2p/dandelion-tower/src/traits.rs +++ b/p2p/dandelion-tower/src/traits.rs @@ -8,42 +8,24 @@ pub struct StemRequest(pub Tx); #[cfg(feature = "txpool")] /// A request sent to the backing transaction pool storage. -pub enum TxStoreRequest { - /// A request to store a transaction with the ID to store it under and the pool to store it in. - /// - /// If the tx is already in the pool then do nothing, unless the tx is in the stem pool then move it - /// to the fluff pool, _if this request state is fluff_. - Store(Tx, TxID, crate::State), - /// A request to retrieve a `Tx` with the given ID from the pool, should not remove that tx from the pool. +pub enum TxStoreRequest { + /// A request to retrieve a `Tx` with the given Id from the pool, should not remove that tx from the pool. /// /// Must return [`TxStoreResponse::Transaction`] - Get(TxID), + Get(TxId), /// Promote a transaction from the stem pool to the public pool. /// /// If the tx is already in the fluff pool do nothing. /// /// This should not error if the tx isn't in the pool at all. - Promote(TxID), - /// A request to check if a translation is in the pool. - /// - /// Must return [`TxStoreResponse::Contains`] - Contains(TxID), - /// Returns the IDs of all the transaction in the stem pool. - /// - /// Must return [`TxStoreResponse::IDs`] - IDsInStemPool, + Promote(TxId), } #[cfg(feature = "txpool")] /// A response sent back from the backing transaction pool. -pub enum TxStoreResponse { +pub enum TxStoreResponse { /// A generic ok response. Ok, - /// A response containing a [`Option`] for if the transaction is in the pool (Some) or not (None) and in which pool - /// the tx is in. - Contains(Option), /// A response containing a requested transaction. Transaction(Option<(Tx, crate::State)>), - /// A list of transaction IDs. - IDs(Vec), }
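To close the loop, a caller might submit a transaction through the renamed service as in the sketch below. This is not part of the patch: the `Vec<u8>`/`u64` type parameters (in `<Tx, TxId, PeerId>` order) and the `submit_fluff_tx` helper are assumptions; the `ready()`/`call()` pattern and the builder semantics come from the code above.

use dandelion_tower::{
    pool::{DandelionPoolService, IncomingTxBuilder},
    TxState,
};
use tower::{Service, ServiceExt};

/// Route one fluff-state tx through a running pool manager.
async fn submit_fluff_tx(
    pool_svc: &mut DandelionPoolService<Vec<u8>, u64, u64>,
    tx: Vec<u8>,
    tx_id: u64,
) -> Result<(), tower::BoxError> {
    // Preprocessing is assumed done: the tx is verified, stored in the
    // backing pool, and was not already there (hence `None` below).
    let Some(incoming_tx) = IncomingTxBuilder::new(tx, tx_id)
        .with_routing_state(TxState::Fluff)
        .with_state_in_db(None)
        .build()
    else {
        // `build()` returned `None`: the tx was already fluffed, nothing to route.
        return Ok(());
    };

    // `ready()` waits for channel capacity; `call()` resolves once the
    // manager has handled the tx.
    pool_svc.ready().await?.call(incoming_tx).await?;

    Ok(())
}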