diff --git a/Cargo.lock b/Cargo.lock
index 0750040..b08aee5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -656,6 +656,21 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "dandelion_tower"
+version = "0.1.0"
+dependencies = [
+ "futures",
+ "proptest",
+ "rand",
+ "rand_distr",
+ "thiserror",
+ "tokio",
+ "tokio-util",
+ "tower",
+ "tracing",
+]
+
[[package]]
name = "diff"
version = "0.1.13"
diff --git a/Cargo.toml b/Cargo.toml
index ee2ddc1..48bad34 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ members = [
"net/fixed-bytes",
"net/levin",
"net/monero-wire",
+ "p2p/dandelion",
"p2p/monero-p2p",
"p2p/address-book",
"pruning",
@@ -60,6 +61,7 @@ paste = { version = "1.0.14", default-features = false }
pin-project = { version = "1.1.3", default-features = false }
randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
rand = { version = "0.8.5", default-features = false }
+rand_distr = { version = "0.4.3", default-features = false }
rayon = { version = "1.9.0", default-features = false }
serde_bytes = { version = "0.11.12", default-features = false }
serde_json = { version = "1.0.108", default-features = false }
diff --git a/p2p/dandelion/Cargo.toml b/p2p/dandelion/Cargo.toml
new file mode 100644
index 0000000..a8a0469
--- /dev/null
+++ b/p2p/dandelion/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "dandelion_tower"
+version = "0.1.0"
+edition = "2021"
+license = "MIT"
+authors = ["Boog900"]
+
+[features]
+default = ["txpool"]
+txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"]
+
+[dependencies]
+tower = { workspace = true, features = ["discover", "util"] }
+tracing = { workspace = true, features = ["std"] }
+
+futures = { workspace = true, features = ["std"] }
+tokio = { workspace = true, features = ["rt", "sync", "macros"], optional = true }
+tokio-util = { workspace = true, features = ["time"], optional = true }
+
+rand = { workspace = true, features = ["std", "std_rng"] }
+rand_distr = { workspace = true, features = ["std"], optional = true }
+
+thiserror = { workspace = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync"] }
+proptest = { workspace = true, features = ["default"] }
\ No newline at end of file
diff --git a/p2p/dandelion/src/config.rs b/p2p/dandelion/src/config.rs
new file mode 100644
index 0000000..71a4e5b
--- /dev/null
+++ b/p2p/dandelion/src/config.rs
@@ -0,0 +1,149 @@
+use std::{
+ ops::{Mul, Neg},
+ time::Duration,
+};
+
+/// The probability that a transaction travels for `k` hops before any node's embargo timeout fires,
+/// i.e. the `(1 - ep)` term when calculating the embargo timeout with the formula:
+/// `(-k*(k-1)*hop)/(2*ln(1-ep))`.
+const EMBARGO_FULL_TRAVEL_PROBABILITY: f64 = 0.90;
+
+/// The graph type to use for dandelion++ routing; the dandelion++ paper recommends [`Graph::FourRegular`].
+///
+/// The decision between line graphs and 4-regular graphs depends on the priorities of the system: if
+/// linkability of transactions is a first-order concern then line graphs may be better, however 4-regular graphs
+/// can give constant-order privacy benefits against adversaries with knowledge of the graph.
+///
+/// See appendix C of the dandelion++ paper.
+#[derive(Default, Debug, Copy, Clone)]
+pub enum Graph {
+ /// Line graph.
+ ///
+ /// When this is selected one peer will be chosen from the outbound peers each epoch to route transactions
+ /// to.
+ ///
+ /// In general this is not recommended over [`Graph::FourRegular`] but may be better for certain systems.
+ Line,
+ /// Quasi-4-Regular.
+ ///
+ /// When this is selected two peers will be chosen from the outbound peers each epoch, each stem transaction
+ /// received will then be sent to one of these two peers. Transactions from the same node will always go to the
+ /// same peer.
+ #[default]
+ FourRegular,
+}
+
+/// The config used to initialize dandelion.
+///
+/// One notable missing item from the config is `Tbase`, aka the timeout parameter used to prevent black hole
+/// attacks. It is left out of the config for simplicity; `Tbase` is calculated using the formula provided
+/// in the D++ paper:
+///
+/// `(-k*(k-1)*hop)/(2*ln(1-ep))`
+///
+/// Where `k` is calculated from the fluff probability, `hop` is `time_between_hop` and `ep` is fixed at `0.1`.
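+///
+/// For example, with `time_between_hop = 175ms` and `fluff_probability = 0.125` (so `k = 1 / 0.125 = 8`):
+///
+/// `Tbase = ceil((8 * 7 * 0.175) / (2 * -ln(0.9))) = 47` seconds,
+///
+/// which is exactly what the `monerod_average_embargo_timeout` test below asserts.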
+///
+#[derive(Debug, Clone, Copy)]
+pub struct DandelionConfig {
+ /// The time it takes for a stem transaction to pass through a node, including network latency.
+ ///
+ /// It's better to err on the side of a slightly higher value than a lower one.
+ pub time_between_hop: Duration,
+ /// The duration of an epoch.
+ pub epoch_duration: Duration,
+ /// `q` in the dandelion++ paper: the probability that a node will be in the fluff state for
+ /// a given epoch.
+ ///
+ /// The dandelion++ paper recommends keeping this value small, but the smaller it is, the higher
+ /// the broadcast latency.
+ ///
+ /// It is recommended to keep this value <= `0.2`; it *MUST* be in the range `0.0..=1.0`.
+ pub fluff_probability: f64,
+ /// The graph type.
+ pub graph: Graph,
+}
+
+impl DandelionConfig {
+ /// Returns the number of outbound peers to use to stem transactions.
+ ///
+ /// This value depends on the [`Graph`] chosen.
+ pub fn number_of_stems(&self) -> usize {
+ match self.graph {
+ Graph::Line => 1,
+ Graph::FourRegular => 2,
+ }
+ }
+
+ /// Returns the average embargo timeout, `Tbase` in the dandelion++ paper.
+ ///
+ /// This is the average embargo timeout for _this node alone_; with `k` nodes each setting an embargo timeout
+ /// sampled from the exponential distribution, the average time until one of them fluffs is `Tbase / k`.
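+ ///
+ /// For example, with `k = 8` and `Tbase = 47s`, the average time until one of the `k` nodes fluffs is `47 / 8 ≈ 5.9s`.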
+ pub fn average_embargo_timeout(&self) -> Duration {
+ // we set k equal to the expected stem length with this fluff probability.
+ let k = self.expected_stem_length();
+ let time_between_hop = self.time_between_hop.as_secs_f64();
+
+ Duration::from_secs_f64(
+ // (-k*(k-1)*hop)/(2*ln(1-ep))
+ ((k.neg() * (k - 1.0) * time_between_hop)
+ / EMBARGO_FULL_TRAVEL_PROBABILITY.ln().mul(2.0))
+ .ceil(),
+ )
+ }
+
+ /// Returns the expected length of a stem.
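+ ///
+ /// For example, `fluff_probability = 0.125` gives an expected stem length of `1 / 0.125 = 8` hops.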
+ pub fn expected_stem_length(&self) -> f64 {
+ self.fluff_probability.recip()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::{
+ f64::consts::E,
+ ops::{Mul, Neg},
+ time::Duration,
+ };
+
+ use proptest::{prop_assert, proptest};
+
+ use super::*;
+
+ #[test]
+ fn monerod_average_embargo_timeout() {
+ let cfg = DandelionConfig {
+ time_between_hop: Duration::from_millis(175),
+ epoch_duration: Default::default(),
+ fluff_probability: 0.125,
+ graph: Default::default(),
+ };
+
+ assert_eq!(cfg.average_embargo_timeout(), Duration::from_secs(47));
+ }
+
+ proptest! {
+ #[test]
+ fn embargo_full_travel_probability_correct(time_between_hop in 1_u64..1_000_000, fluff_probability in 0.000001..1.0) {
+ let cfg = DandelionConfig {
+ time_between_hop: Duration::from_millis(time_between_hop),
+ epoch_duration: Default::default(),
+ fluff_probability,
+ graph: Default::default(),
+ };
+
+ // Assert that `average_embargo_timeout` is high enough that the probability of a tx travelling `k` hops
+ // before any of the `k` nodes' embargo timers fire is greater than or equal to `EMBARGO_FULL_TRAVEL_PROBABILITY`,
+ //
+ // using the formula from appendix B.5.
+ let k = cfg.expected_stem_length();
+ let time_between_hop = cfg.time_between_hop.as_secs_f64();
+
+ let average_embargo_timeout = cfg.average_embargo_timeout().as_secs_f64();
+
+ let probability =
+ E.powf((k.neg() * (k - 1.0) * time_between_hop) / average_embargo_timeout.mul(2.0));
+
+ prop_assert!(probability >= EMBARGO_FULL_TRAVEL_PROBABILITY, "probability = {probability}, average_embargo_timeout = {average_embargo_timeout}");
+ }
+ }
+}
diff --git a/p2p/dandelion/src/lib.rs b/p2p/dandelion/src/lib.rs
new file mode 100644
index 0000000..f162724
--- /dev/null
+++ b/p2p/dandelion/src/lib.rs
@@ -0,0 +1,70 @@
+//! # Dandelion Tower
+//!
+//! This crate implements [dandelion++](https://arxiv.org/pdf/1805.11060.pdf), using [`tower`].
+//!
+//! This crate provides two [`tower::Service`]s: a [`DandelionRouter`] and a [`DandelionPool`](pool::DandelionPool).
+//! The router is pretty minimal and handles only the data necessary to route transactions, whereas the
+//! pool keeps track of all data necessary for dandelion++ but requires you to provide a backing tx-pool.
+//!
+//! This split was done not because the [`DandelionPool`](pool::DandelionPool) is unnecessary but because it is hard
+//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPool`](pool::DandelionPool)
+//! requires you to implement part of the paper yourself.
+//!
+//! # Features
+//!
+//! This crate has only one feature, `txpool`, which enables [`DandelionPool`](pool::DandelionPool).
+//!
+//! # Needed Services
+//!
+//! To use this crate you need to provide a few types.
+//!
+//! ## Diffuse Service
+//!
+//! This service should implement diffusion: sending the transaction to every peer, with each peer
+//! having a timer sampled from the exponential distribution, and batch-sending all txs that were queued in that time.
+//!
+//! The diffuse service should have a request type of [`DiffuseRequest`](traits::DiffuseRequest) and its error
+//! should be [`tower::BoxError`].
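+//!
+//! A rough skeleton is sketched below; the `MyDiffuseService` type, its internals, and the `()` response
+//! type are illustrative assumptions, not part of this crate:
+//!
+//! ```ignore
+//! use std::task::{Context, Poll};
+//!
+//! use dandelion_tower::traits::DiffuseRequest;
+//!
+//! struct MyDiffuseService {
+//!     // Handles to peers, per-peer exponential timers, queued txs, etc.
+//! }
+//!
+//! impl<Tx: Clone + Send + 'static> tower::Service<DiffuseRequest<Tx>> for MyDiffuseService {
+//!     type Response = ();
+//!     type Error = tower::BoxError;
+//!     type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
+//!
+//!     fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+//!         Poll::Ready(Ok(()))
+//!     }
+//!
+//!     fn call(&mut self, req: DiffuseRequest<Tx>) -> Self::Future {
+//!         // Queue the tx in `req` for broadcast to every peer, each behind its own
+//!         // exponentially distributed delay, then resolve immediately.
+//!         futures::future::ready(Ok(()))
+//!     }
+//! }
+//! ```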
+//!
+//! ## Outbound Peer Discoverer
+//!
+//! The outbound peer [`Discover`](tower::discover::Discover) should provide a stream of randomly selected outbound
+//! peers; these peers will then be used to route stem txs to.
+//!
+//! The peers will not be returned anywhere, so it is recommended to wrap them in some sort of drop guard that returns
+//! them back to a peer set.
+//!
+//! ## Peer Service
+//!
+//! This service represents a connection to an individual peer; it should be returned from the outbound peer
+//! [`Discover`](tower::discover::Discover). It should immediately send the transaction to the peer when requested,
+//! i.e. it should _not_ set a timer.
+//!
+//! The peer service should have a request type of [`StemRequest`](traits::StemRequest) and its error
+//! should be [`tower::BoxError`].
+//!
+//! ## Backing Pool
+//!
+//! ([`DandelionPool`](pool::DandelionPool) only)
+//!
+//! This service is a backing tx-pool, in memory or on disk.
+//! The backing pool should have a request of [`TxStoreRequest`](traits::TxStoreRequest) and a response of
+//! [`TxStoreResponse`](traits::TxStoreResponse), with an error of [`tower::BoxError`].
+//!
+//! Users should keep a handle to the backing pool to request data from it. When requesting data you _must_
+//! make sure you only look in the public pool if you are going to be giving data to peers, as stem transactions
+//! must stay private.
+//!
+//! When removing data, for example because of a new block, you can remove from both pools provided it doesn't leak
+//! any data about stem transactions. You will probably want to set up a task that monitors the tx-pool for stuck
+//! transactions, transactions that slipped in just as one was removed, etc.; this crate does not handle that.
+mod config;
+#[cfg(feature = "txpool")]
+pub mod pool;
+mod router;
+#[cfg(test)]
+mod tests;
+pub mod traits;
+
+pub use config::*;
+pub use router::*;
diff --git a/p2p/dandelion/src/pool.rs b/p2p/dandelion/src/pool.rs
new file mode 100644
index 0000000..eddcc67
--- /dev/null
+++ b/p2p/dandelion/src/pool.rs
@@ -0,0 +1,510 @@
+//! # Dandelion++ Pool
+//!
+//! This module contains [`DandelionPool`], a thin wrapper around a backing transaction store
+//! that fully implements the dandelion++ protocol.
+//!
+//! ### How To Get Txs From [`DandelionPool`].
+//!
+//! [`DandelionPool`] does not provide a full tx-pool API. You cannot retrieve transactions from it or
+//! check what transactions are in it; to do this you must keep a handle to the backing transaction store
+//! yourself.
+//!
+//! The reason for this is that the [`DandelionPool`] would only be passing these requests on to the backing
+//! pool anyway, so it makes sense to remove the middle man.
+//!
+//! ### Keep Stem Transactions Hidden
+//!
+//! When using your handle to the backing store, remember to keep transactions in the stem pool hidden:
+//! handle any requests to the tx-pool as if the stem side of the pool did not exist.
+//!
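+//! A minimal sketch of the latter, serving a tx lookup from the backing-pool handle while hiding stem txs
+//! (`handle` here is an illustrative clone of the backing-pool service, not something this crate provides):
+//!
+//! ```ignore
+//! use tower::ServiceExt;
+//!
+//! use dandelion_tower::{
+//!     traits::{TxStoreRequest, TxStoreResponse},
+//!     State,
+//! };
+//!
+//! let TxStoreResponse::Transaction(tx) =
+//!     handle.ready().await?.call(TxStoreRequest::Get(tx_id)).await?
+//! else {
+//!     unreachable!("the backing pool must answer `Get` with `Transaction`")
+//! };
+//!
+//! // Only hand out txs that are in the public (fluff) pool; a stem tx must look absent to peers.
+//! let visible_tx = tx.and_then(|(tx, state)| (state == State::Fluff).then_some(tx));
+//! ```
+//!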
+use std::{
+ collections::{HashMap, HashSet},
+ future::Future,
+ hash::Hash,
+ marker::PhantomData,
+ pin::Pin,
+ task::{Context, Poll},
+ time::Duration,
+};
+
+use futures::{FutureExt, StreamExt};
+use rand::prelude::*;
+use rand_distr::Exp;
+use tokio::{
+ sync::{mpsc, oneshot},
+ task::JoinSet,
+};
+use tokio_util::{sync::PollSender, time::DelayQueue};
+use tower::{Service, ServiceExt};
+use tracing::Instrument;
+
+use crate::{
+ traits::{TxStoreRequest, TxStoreResponse},
+ DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState,
+};
+
+/// Start the [`DandelionPool`].
+///
+/// This function spawns the [`DandelionPool`] and returns [`DandelionPoolService`] which can be used to send
+/// requests to the pool.
+///
+/// ### Args
+///
+/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPool`].
+/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow
+/// the user to customise routing functionality.
+/// - `backing_pool` is the backing transaction storage service.
+/// - `config` is [`DandelionConfig`].
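+///
+/// ### Example (illustrative)
+///
+/// A rough usage sketch; `my_router`, `my_backing_pool`, `tx` and `tx_id` stand in for user-provided values:
+///
+/// ```ignore
+/// use tower::ServiceExt;
+///
+/// use dandelion_tower::{pool::IncomingTx, TxState};
+///
+/// let mut pool_svc = start_dandelion_pool(32, my_router, my_backing_pool, config);
+///
+/// // Hand a locally created tx to the pool; it will be stored in the stem pool and stemmed.
+/// pool_svc
+///     .ready()
+///     .await?
+///     .call(IncomingTx { tx, tx_id, tx_state: TxState::Local })
+///     .await?;
+/// ```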
+pub fn start_dandelion_pool<P, R, Tx, TxID, PID>(
+ buffer_size: usize,
+ dandelion_router: R,
+ backing_pool: P,
+ config: DandelionConfig,
+) -> DandelionPoolService<Tx, TxID, PID>
+where
+ Tx: Clone + Send + 'static,
+ TxID: Hash + Eq + Clone + Send + 'static,
+ PID: Hash + Eq + Clone + Send + 'static,
+ P: Service<
+ TxStoreRequest<Tx, TxID>,
+ Response = TxStoreResponse<Tx>,
+ Error = tower::BoxError,
+ > + Send
+ + 'static,
+ P::Future: Send + 'static,
+ R: Service<DandelionRouteReq<Tx, PID>, Response = State, Error = DandelionRouterError>
+ + Send
+ + 'static,
+ R::Future: Send + 'static,
+{
+ let (tx, rx) = mpsc::channel(buffer_size);
+
+ let pool = DandelionPool {
+ dandelion_router,
+ backing_pool,
+ routing_set: JoinSet::new(),
+ stem_origins: HashMap::new(),
+ embargo_timers: DelayQueue::new(),
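+ // `Exp::new` takes the rate parameter (λ = 1 / mean), so sampled embargo
+ // timers will average `Tbase` seconds.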
+ embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
+ config,
+ _tx: PhantomData,
+ };
+
+ let span = tracing::debug_span!("dandelion_pool");
+
+ tokio::spawn(pool.run(rx).instrument(span));
+
+ DandelionPoolService {
+ tx: PollSender::new(tx),
+ }
+}
+
+#[derive(Copy, Clone, Debug, thiserror::Error)]
+#[error("The dandelion pool was shutdown")]
+pub struct DandelionPoolShutDown;
+
+/// An incoming transaction for the [`DandelionPool`] to handle.
+///
+/// Users may notice there is no way to check whether the dandelion pool wants a tx according to an inventory
+/// message like those seen in Bitcoin; there is only a request for a full tx. Users should look in the *public*
+/// backing pool to handle inv messages, and request txs even if they are in the stem pool.
+pub struct IncomingTx<Tx, TxID, PID> {
+ /// The transaction.
+ ///
+ /// It is recommended to put this in an [`Arc`](std::sync::Arc) as it needs to be cloned to send to the backing
+ /// tx pool and [`DandelionRouter`](crate::DandelionRouter).
+ pub tx: Tx,
+ /// The transaction ID.
+ pub tx_id: TxID,
+ /// The routing state of this transaction.
+ pub tx_state: TxState<PID>,
+}
+
+/// The dandelion tx pool service.
+#[derive(Clone)]
+pub struct DandelionPoolService<Tx, TxID, PID> {
+ /// The channel to [`DandelionPool`].
+ tx: PollSender<(IncomingTx<Tx, TxID, PID>, oneshot::Sender<()>)>,
+}
+
+impl<Tx, TxID, PID> Service<IncomingTx<Tx, TxID, PID>> for DandelionPoolService<Tx, TxID, PID>
+where
+ Tx: Clone + Send,
+ TxID: Hash + Eq + Clone + Send + 'static,
+ PID: Hash + Eq + Clone + Send + 'static,
+{
+ type Response = ();
+ type Error = DandelionPoolShutDown;
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
+
+ fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
+ }
+
+ fn call(&mut self, req: IncomingTx<Tx, TxID, PID>) -> Self::Future {
+ // Although the channel isn't sending anything, we want to wait for the request to be handled before continuing.
+ let (tx, rx) = oneshot::channel();
+
+ let res = self
+ .tx
+ .send_item((req, tx))
+ .map_err(|_| DandelionPoolShutDown);
+
+ async move {
+ res?;
+ rx.await.expect("Oneshot dropped before response!");
+
+ Ok(())
+ }
+ .boxed()
+ }
+}
+
+/// The dandelion++ tx pool.
+///
+/// See the [module docs](self) for more.
+pub struct DandelionPool<P, R, Tx, TxID, PID> {
+ /// The dandelion++ router
+ dandelion_router: R,
+ /// The backing tx storage.
+ backing_pool: P,
+ /// The set of tasks running the futures returned from `dandelion_router`.
+ routing_set: JoinSet<(TxID, Result<State, TxState<PID>>)>,
+
+ /// The origin of stem transactions.
+ stem_origins: HashMap<TxID, HashSet<PID>>,
+
+ /// Current stem pool embargo timers.
+ embargo_timers: DelayQueue<TxID>,
+ /// The distribution to sample to get embargo timers.
+ embargo_dist: Exp<f64>,
+
+ /// The d++ config.
+ config: DandelionConfig,
+
+ _tx: PhantomData<Tx>,
+}
+
+impl<P, R, Tx, TxID, PID> DandelionPool<P, R, Tx, TxID, PID>
+where
+ Tx: Clone + Send,
+ TxID: Hash + Eq + Clone + Send + 'static,
+ PID: Hash + Eq + Clone + Send + 'static,
+ P: Service<
+ TxStoreRequest<Tx, TxID>,
+ Response = TxStoreResponse<Tx>,
+ Error = tower::BoxError,
+ >,
+ P::Future: Send + 'static,
+ R: Service<DandelionRouteReq<Tx, PID>, Response = State, Error = DandelionRouterError>,
+ R::Future: Send + 'static,
+{
+ /// Stores the tx in the backing pool's stem pool, setting the embargo timer and stem origin, and stemming the tx.
+ async fn store_tx_and_stem(
+ &mut self,
+ tx: Tx,
+ tx_id: TxID,
+ from: Option<PID>,
+ ) -> Result<(), tower::BoxError> {
+ self.backing_pool
+ .ready()
+ .await?
+ .call(TxStoreRequest::Store(
+ tx.clone(),
+ tx_id.clone(),
+ State::Stem,
+ ))
+ .await?;
+
+ let embargo_timer = self.embargo_dist.sample(&mut thread_rng());
+ tracing::debug!(
+ "Setting embargo timer for stem tx: {} seconds.",
+ embargo_timer
+ );
+ self.embargo_timers
+ .insert(tx_id.clone(), Duration::from_secs_f64(embargo_timer));
+
+ self.stem_tx(tx, tx_id, from).await
+ }
+
+ /// Stems the tx, setting the stem origin if it wasn't already set.
+ ///
+ /// This function does not add the tx to the backing pool.
+ async fn stem_tx(
+ &mut self,
+ tx: Tx,
+ tx_id: TxID,
+ from: Option<PID>,
+ ) -> Result<(), tower::BoxError> {
+ if let Some(peer) = &from {
+ self.stem_origins
+ .entry(tx_id.clone())
+ .or_default()
+ .insert(peer.clone());
+ }
+
+ let state = from
+ .map(|from| TxState::Stem { from })
+ .unwrap_or(TxState::Local);
+
+ let fut = self
+ .dandelion_router
+ .ready()
+ .await?
+ .call(DandelionRouteReq {
+ tx,
+ state: state.clone(),
+ });
+
+ self.routing_set
+ .spawn(fut.map(|res| (tx_id, res.map_err(|_| state))));
+ Ok(())
+ }
+
+ /// Stores the tx in the backing pool and fluffs the tx, removing the stem data for this tx.
+ async fn store_and_fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> {
+ // Fluff the tx first to prevent timing attacks where we could fluff at different average times
+ // depending on whether the tx was already in the stem pool or not.
+ // Massively overkill, but this is a minimal change.
+ self.fluff_tx(tx.clone(), tx_id.clone()).await?;
+
+ // Remove the tx from the maps used during the stem phase.
+ self.stem_origins.remove(&tx_id);
+
+ self.backing_pool
+ .ready()
+ .await?
+ .call(TxStoreRequest::Store(tx, tx_id, State::Fluff))
+ .await?;
+
+ // The `DelayQueue` key for this is *not* the tx_id, it is returned on insert, so just leave the timer in the
+ // queue. These timers should be relatively short, so it shouldn't be a problem.
+ //self.embargo_timers.try_remove(&tx_id);
+
+ Ok(())
+ }
+
+ /// Fluffs a tx, does not add the tx to the tx pool.
+ async fn fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> {
+ let fut = self
+ .dandelion_router
+ .ready()
+ .await?
+ .call(DandelionRouteReq {
+ tx,
+ state: TxState::Fluff,
+ });
+
+ self.routing_set
+ .spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff))));
+ Ok(())
+ }
+
+ /// Function to handle an incoming [`IncomingTx`].
+ async fn handle_incoming_tx(
+ &mut self,
+ tx: Tx,
+ tx_state: TxState<PID>,
+ tx_id: TxID,
+ ) -> Result<(), tower::BoxError> {
+ let TxStoreResponse::Contains(have_tx) = self
+ .backing_pool
+ .ready()
+ .await?
+ .call(TxStoreRequest::Contains(tx_id.clone()))
+ .await?
+ else {
+ panic!("Backing tx pool responded with wrong response for request.");
+ };
+ // If we have already fluffed this tx then we don't need to do anything.
+ if have_tx == Some(State::Fluff) {
+ tracing::debug!("Already fluffed incoming tx, ignoring.");
+ return Ok(());
+ }
+
+ match tx_state {
+ TxState::Stem { from } => {
+ if self
+ .stem_origins
+ .get(&tx_id)
+ .is_some_and(|peers| peers.contains(&from))
+ {
+ tracing::debug!("Received stem tx twice from same peer, fluffing it");
+ // The same peer sent us a tx twice, fluff it.
+ self.promote_and_fluff_tx(tx_id).await
+ } else {
+ // This could be a new tx or it could have already been stemmed, but we still stem it again
+ // unless the same peer sends us a tx twice.
+ tracing::debug!("Stemming incoming tx");
+ self.store_tx_and_stem(tx, tx_id, Some(from)).await
+ }
+ }
+ TxState::Fluff => {
+ tracing::debug!("Fluffing incoming tx");
+ self.store_and_fluff_tx(tx, tx_id).await
+ }
+ TxState::Local => {
+ // If we have already stemmed this tx then there is nothing to do.
+ if have_tx.is_some() {
+ tracing::debug!("Received a local tx that we already have, skipping");
+ return Ok(());
+ }
+ tracing::debug!("Steming local transaction");
+ self.store_tx_and_stem(tx, tx_id, None).await
+ }
+ }
+ }
+
+ /// Promotes a tx to the clear pool.
+ async fn promote_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> {
+ // Remove the tx from the maps used during the stem phase.
+ self.stem_origins.remove(&tx_id);
+
+ // The `DelayQueue` key for this is *not* the tx_id, it is returned on insert, so just leave the timer in the
+ // queue. These timers should be relatively short, so it shouldn't be a problem.
+ //self.embargo_timers.try_remove(&tx_id);
+
+ self.backing_pool
+ .ready()
+ .await?
+ .call(TxStoreRequest::Promote(tx_id))
+ .await?;
+
+ Ok(())
+ }
+
+ /// Promotes a tx to the public fluff pool and fluffs the tx.
+ async fn promote_and_fluff_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> {
+ tracing::debug!("Promoting transaction to public pool and fluffing it.");
+
+ let TxStoreResponse::Transaction(tx) = self
+ .backing_pool
+ .ready()
+ .await?
+ .call(TxStoreRequest::Get(tx_id.clone()))
+ .await?
+ else {
+ panic!("Backing tx pool responded with wrong response for request.");
+ };
+
+ let Some((tx, state)) = tx else {
+ tracing::debug!("Could not find tx, skipping.");
+ return Ok(());
+ };
+
+ if state == State::Fluff {
+ tracing::debug!("Transaction already fluffed, skipping.");
+ return Ok(());
+ }
+
+ self.promote_tx(tx_id.clone()).await?;
+ self.fluff_tx(tx, tx_id).await
+ }
+
+ /// Returns a tx stored in the fluff _OR_ stem pool.
+ async fn get_tx_from_pool(&mut self, tx_id: TxID) -> Result