mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-08 20:09:44 +00:00
dandelion-tower: improve API (#257)
* init * reduce the jobs handled by the dandelion pool * fix docs * resolve todo * review changes * Update p2p/dandelion-tower/src/pool/incoming_tx.rs Co-authored-by: hinto-janai <hinto.janai@protonmail.com> * Update p2p/dandelion-tower/src/pool/incoming_tx.rs Co-authored-by: hinto-janai <hinto.janai@protonmail.com> * `PId` -> `PeerId` --------- Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
This commit is contained in:
parent
ccff75057e
commit
8655a3f5e5
9 changed files with 595 additions and 599 deletions
|
@ -2,17 +2,17 @@
|
||||||
//!
|
//!
|
||||||
//! This crate implements [dandelion++](https://arxiv.org/pdf/1805.11060.pdf), using [`tower`].
|
//! This crate implements [dandelion++](https://arxiv.org/pdf/1805.11060.pdf), using [`tower`].
|
||||||
//!
|
//!
|
||||||
//! This crate provides 2 [`tower::Service`]s, a [`DandelionRouter`] and a [`DandelionPool`](pool::DandelionPool).
|
//! This crate provides 2 [`tower::Service`]s, a [`DandelionRouter`] and a [`DandelionPoolManager`](pool::DandelionPoolManager).
|
||||||
//! The router is pretty minimal and only handles the absolute necessary data to route transactions, whereas the
|
//! The router is pretty minimal and only handles the absolute necessary data to route transactions, whereas the
|
||||||
//! pool keeps track of all data necessary for dandelion++ but requires you to provide a backing tx-pool.
|
//! pool keeps track of all data necessary for dandelion++ but requires you to provide a backing tx-pool.
|
||||||
//!
|
//!
|
||||||
//! This split was done not because the [`DandelionPool`](pool::DandelionPool) is unnecessary but because it is hard
|
//! This split was done not because the [`DandelionPoolManager`](pool::DandelionPoolManager) is unnecessary but because it is hard
|
||||||
//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPool`](pool::DandelionPool)
|
//! to cover a wide range of projects when abstracting over the tx-pool. Not using the [`DandelionPoolManager`](pool::DandelionPoolManager)
|
||||||
//! requires you to implement part of the paper yourself.
|
//! requires you to implement part of the paper yourself.
|
||||||
//!
|
//!
|
||||||
//! # Features
|
//! # Features
|
||||||
//!
|
//!
|
||||||
//! This crate only has one feature `txpool` which enables [`DandelionPool`](pool::DandelionPool).
|
//! This crate only has one feature `txpool` which enables [`DandelionPoolManager`](pool::DandelionPoolManager).
|
||||||
//!
|
//!
|
||||||
//! # Needed Services
|
//! # Needed Services
|
||||||
//!
|
//!
|
||||||
|
@ -45,7 +45,7 @@
|
||||||
//!
|
//!
|
||||||
//! ## Backing Pool
|
//! ## Backing Pool
|
||||||
//!
|
//!
|
||||||
//! ([`DandelionPool`](pool::DandelionPool) only)
|
//! ([`DandelionPoolManager`](pool::DandelionPoolManager) only)
|
||||||
//!
|
//!
|
||||||
//! This service is a backing tx-pool, in memory or on disk.
|
//! This service is a backing tx-pool, in memory or on disk.
|
||||||
//! The backing pool should have a request of [`TxStoreRequest`](traits::TxStoreRequest) and a response of
|
//! The backing pool should have a request of [`TxStoreRequest`](traits::TxStoreRequest) and a response of
|
||||||
|
|
|
@ -1,509 +0,0 @@
|
||||||
//! # Dandelion++ Pool
|
|
||||||
//!
|
|
||||||
//! This module contains [`DandelionPool`] which is a thin wrapper around a backing transaction store,
|
|
||||||
//! which fully implements the dandelion++ protocol.
|
|
||||||
//!
|
|
||||||
//! ### How To Get Txs From [`DandelionPool`].
|
|
||||||
//!
|
|
||||||
//! [`DandelionPool`] does not provide a full tx-pool API. You cannot retrieve transactions from it or
|
|
||||||
//! check what transactions are in it, to do this you must keep a handle to the backing transaction store
|
|
||||||
//! yourself.
|
|
||||||
//!
|
|
||||||
//! The reason for this is, the [`DandelionPool`] will only itself be passing these requests onto the backing
|
|
||||||
//! pool, so it makes sense to remove the "middle man".
|
|
||||||
//!
|
|
||||||
//! ### Keep Stem Transactions Hidden
|
|
||||||
//!
|
|
||||||
//! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden.
|
|
||||||
//! So handle any requests to the tx-pool like the stem side of the pool does not exist.
|
|
||||||
use std::{
|
|
||||||
collections::{HashMap, HashSet},
|
|
||||||
future::Future,
|
|
||||||
hash::Hash,
|
|
||||||
marker::PhantomData,
|
|
||||||
pin::Pin,
|
|
||||||
task::{Context, Poll},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::{FutureExt, StreamExt};
|
|
||||||
use rand::prelude::*;
|
|
||||||
use rand_distr::Exp;
|
|
||||||
use tokio::{
|
|
||||||
sync::{mpsc, oneshot},
|
|
||||||
task::JoinSet,
|
|
||||||
};
|
|
||||||
use tokio_util::{sync::PollSender, time::DelayQueue};
|
|
||||||
use tower::{Service, ServiceExt};
|
|
||||||
use tracing::Instrument;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
traits::{TxStoreRequest, TxStoreResponse},
|
|
||||||
DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Start the [`DandelionPool`].
|
|
||||||
///
|
|
||||||
/// This function spawns the [`DandelionPool`] and returns [`DandelionPoolService`] which can be used to send
|
|
||||||
/// requests to the pool.
|
|
||||||
///
|
|
||||||
/// ### Args
|
|
||||||
///
|
|
||||||
/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPool`].
|
|
||||||
/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow
|
|
||||||
/// user to customise routing functionality.
|
|
||||||
/// - `backing_pool` is the backing transaction storage service
|
|
||||||
/// - `config` is [`DandelionConfig`].
|
|
||||||
pub fn start_dandelion_pool<P, R, Tx, TxID, PID>(
|
|
||||||
buffer_size: usize,
|
|
||||||
dandelion_router: R,
|
|
||||||
backing_pool: P,
|
|
||||||
config: DandelionConfig,
|
|
||||||
) -> DandelionPoolService<Tx, TxID, PID>
|
|
||||||
where
|
|
||||||
Tx: Clone + Send + 'static,
|
|
||||||
TxID: Hash + Eq + Clone + Send + 'static,
|
|
||||||
PID: Hash + Eq + Clone + Send + 'static,
|
|
||||||
P: Service<
|
|
||||||
TxStoreRequest<Tx, TxID>,
|
|
||||||
Response = TxStoreResponse<Tx, TxID>,
|
|
||||||
Error = tower::BoxError,
|
|
||||||
> + Send
|
|
||||||
+ 'static,
|
|
||||||
P::Future: Send + 'static,
|
|
||||||
R: Service<DandelionRouteReq<Tx, PID>, Response = State, Error = DandelionRouterError>
|
|
||||||
+ Send
|
|
||||||
+ 'static,
|
|
||||||
R::Future: Send + 'static,
|
|
||||||
{
|
|
||||||
let (tx, rx) = mpsc::channel(buffer_size);
|
|
||||||
|
|
||||||
let pool = DandelionPool {
|
|
||||||
dandelion_router,
|
|
||||||
backing_pool,
|
|
||||||
routing_set: JoinSet::new(),
|
|
||||||
stem_origins: HashMap::new(),
|
|
||||||
embargo_timers: DelayQueue::new(),
|
|
||||||
embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
|
|
||||||
config,
|
|
||||||
_tx: PhantomData,
|
|
||||||
};
|
|
||||||
|
|
||||||
let span = tracing::debug_span!("dandelion_pool");
|
|
||||||
|
|
||||||
tokio::spawn(pool.run(rx).instrument(span));
|
|
||||||
|
|
||||||
DandelionPoolService {
|
|
||||||
tx: PollSender::new(tx),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, thiserror::Error)]
|
|
||||||
#[error("The dandelion pool was shutdown")]
|
|
||||||
pub struct DandelionPoolShutDown;
|
|
||||||
|
|
||||||
/// An incoming transaction for the [`DandelionPool`] to handle.
|
|
||||||
///
|
|
||||||
/// Users may notice there is no way to check if the dandelion-pool wants a tx according to an inventory message like seen
|
|
||||||
/// in Bitcoin, only having a request for a full tx. Users should look in the *public* backing pool to handle inv messages,
|
|
||||||
/// and request txs even if they are in the stem pool.
|
|
||||||
pub struct IncomingTx<Tx, TxID, PID> {
|
|
||||||
/// The transaction.
|
|
||||||
///
|
|
||||||
/// It is recommended to put this in an [`Arc`](std::sync::Arc) as it needs to be cloned to send to the backing
|
|
||||||
/// tx pool and [`DandelionRouter`](crate::DandelionRouter)
|
|
||||||
pub tx: Tx,
|
|
||||||
/// The transaction ID.
|
|
||||||
pub tx_id: TxID,
|
|
||||||
/// The routing state of this transaction.
|
|
||||||
pub tx_state: TxState<PID>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The dandelion tx pool service.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DandelionPoolService<Tx, TxID, PID> {
|
|
||||||
/// The channel to [`DandelionPool`].
|
|
||||||
tx: PollSender<(IncomingTx<Tx, TxID, PID>, oneshot::Sender<()>)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Tx, TxID, PID> Service<IncomingTx<Tx, TxID, PID>> for DandelionPoolService<Tx, TxID, PID>
|
|
||||||
where
|
|
||||||
Tx: Clone + Send,
|
|
||||||
TxID: Hash + Eq + Clone + Send + 'static,
|
|
||||||
PID: Hash + Eq + Clone + Send + 'static,
|
|
||||||
{
|
|
||||||
type Response = ();
|
|
||||||
type Error = DandelionPoolShutDown;
|
|
||||||
type Future =
|
|
||||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
||||||
self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, req: IncomingTx<Tx, TxID, PID>) -> Self::Future {
|
|
||||||
// although the channel isn't sending anything we want to wait for the request to be handled before continuing.
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
|
|
||||||
let res = self
|
|
||||||
.tx
|
|
||||||
.send_item((req, tx))
|
|
||||||
.map_err(|_| DandelionPoolShutDown);
|
|
||||||
|
|
||||||
async move {
|
|
||||||
res?;
|
|
||||||
rx.await.expect("Oneshot dropped before response!");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
.boxed()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The dandelion++ tx pool.
|
|
||||||
///
|
|
||||||
/// See the [module docs](self) for more.
|
|
||||||
pub struct DandelionPool<P, R, Tx, TxID, PID> {
|
|
||||||
/// The dandelion++ router
|
|
||||||
dandelion_router: R,
|
|
||||||
/// The backing tx storage.
|
|
||||||
backing_pool: P,
|
|
||||||
/// The set of tasks that are running the future returned from `dandelion_router`.
|
|
||||||
routing_set: JoinSet<(TxID, Result<State, TxState<PID>>)>,
|
|
||||||
|
|
||||||
/// The origin of stem transactions.
|
|
||||||
stem_origins: HashMap<TxID, HashSet<PID>>,
|
|
||||||
|
|
||||||
/// Current stem pool embargo timers.
|
|
||||||
embargo_timers: DelayQueue<TxID>,
|
|
||||||
/// The distrobution to sample to get embargo timers.
|
|
||||||
embargo_dist: Exp<f64>,
|
|
||||||
|
|
||||||
/// The d++ config.
|
|
||||||
config: DandelionConfig,
|
|
||||||
|
|
||||||
_tx: PhantomData<Tx>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<P, R, Tx, TxID, PID> DandelionPool<P, R, Tx, TxID, PID>
|
|
||||||
where
|
|
||||||
Tx: Clone + Send,
|
|
||||||
TxID: Hash + Eq + Clone + Send + 'static,
|
|
||||||
PID: Hash + Eq + Clone + Send + 'static,
|
|
||||||
P: Service<
|
|
||||||
TxStoreRequest<Tx, TxID>,
|
|
||||||
Response = TxStoreResponse<Tx, TxID>,
|
|
||||||
Error = tower::BoxError,
|
|
||||||
>,
|
|
||||||
P::Future: Send + 'static,
|
|
||||||
R: Service<DandelionRouteReq<Tx, PID>, Response = State, Error = DandelionRouterError>,
|
|
||||||
R::Future: Send + 'static,
|
|
||||||
{
|
|
||||||
/// Stores the tx in the backing pools stem pool, setting the embargo timer, stem origin and steming the tx.
|
|
||||||
async fn store_tx_and_stem(
|
|
||||||
&mut self,
|
|
||||||
tx: Tx,
|
|
||||||
tx_id: TxID,
|
|
||||||
from: Option<PID>,
|
|
||||||
) -> Result<(), tower::BoxError> {
|
|
||||||
self.backing_pool
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(TxStoreRequest::Store(
|
|
||||||
tx.clone(),
|
|
||||||
tx_id.clone(),
|
|
||||||
State::Stem,
|
|
||||||
))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let embargo_timer = self.embargo_dist.sample(&mut thread_rng());
|
|
||||||
tracing::debug!(
|
|
||||||
"Setting embargo timer for stem tx: {} seconds.",
|
|
||||||
embargo_timer
|
|
||||||
);
|
|
||||||
self.embargo_timers
|
|
||||||
.insert(tx_id.clone(), Duration::from_secs_f64(embargo_timer));
|
|
||||||
|
|
||||||
self.stem_tx(tx, tx_id, from).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stems the tx, setting the stem origin, if it wasn't already set.
|
|
||||||
///
|
|
||||||
/// This function does not add the tx to the backing pool.
|
|
||||||
async fn stem_tx(
|
|
||||||
&mut self,
|
|
||||||
tx: Tx,
|
|
||||||
tx_id: TxID,
|
|
||||||
from: Option<PID>,
|
|
||||||
) -> Result<(), tower::BoxError> {
|
|
||||||
if let Some(peer) = &from {
|
|
||||||
self.stem_origins
|
|
||||||
.entry(tx_id.clone())
|
|
||||||
.or_default()
|
|
||||||
.insert(peer.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = from
|
|
||||||
.map(|from| TxState::Stem { from })
|
|
||||||
.unwrap_or(TxState::Local);
|
|
||||||
|
|
||||||
let fut = self
|
|
||||||
.dandelion_router
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(DandelionRouteReq {
|
|
||||||
tx,
|
|
||||||
state: state.clone(),
|
|
||||||
});
|
|
||||||
|
|
||||||
self.routing_set
|
|
||||||
.spawn(fut.map(|res| (tx_id, res.map_err(|_| state))));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stores the tx in the backing pool and fluffs the tx, removing the stem data for this tx.
|
|
||||||
async fn store_and_fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> {
|
|
||||||
// fluffs the tx first to prevent timing attacks where we could fluff at different average times
|
|
||||||
// depending on if the tx was in the stem pool already or not.
|
|
||||||
// Massively overkill but this is a minimal change.
|
|
||||||
self.fluff_tx(tx.clone(), tx_id.clone()).await?;
|
|
||||||
|
|
||||||
// Remove the tx from the maps used during the stem phase.
|
|
||||||
self.stem_origins.remove(&tx_id);
|
|
||||||
|
|
||||||
self.backing_pool
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(TxStoreRequest::Store(tx, tx_id, State::Fluff))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the
|
|
||||||
// map. These timers should be relatively short, so it shouldn't be a problem.
|
|
||||||
//self.embargo_timers.try_remove(&tx_id);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fluffs a tx, does not add the tx to the tx pool.
|
|
||||||
async fn fluff_tx(&mut self, tx: Tx, tx_id: TxID) -> Result<(), tower::BoxError> {
|
|
||||||
let fut = self
|
|
||||||
.dandelion_router
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(DandelionRouteReq {
|
|
||||||
tx,
|
|
||||||
state: TxState::Fluff,
|
|
||||||
});
|
|
||||||
|
|
||||||
self.routing_set
|
|
||||||
.spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff))));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Function to handle an incoming [`DandelionPoolRequest::IncomingTx`].
|
|
||||||
async fn handle_incoming_tx(
|
|
||||||
&mut self,
|
|
||||||
tx: Tx,
|
|
||||||
tx_state: TxState<PID>,
|
|
||||||
tx_id: TxID,
|
|
||||||
) -> Result<(), tower::BoxError> {
|
|
||||||
let TxStoreResponse::Contains(have_tx) = self
|
|
||||||
.backing_pool
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(TxStoreRequest::Contains(tx_id.clone()))
|
|
||||||
.await?
|
|
||||||
else {
|
|
||||||
panic!("Backing tx pool responded with wrong response for request.");
|
|
||||||
};
|
|
||||||
// If we have already fluffed this tx then we don't need to do anything.
|
|
||||||
if have_tx == Some(State::Fluff) {
|
|
||||||
tracing::debug!("Already fluffed incoming tx, ignoring.");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
match tx_state {
|
|
||||||
TxState::Stem { from } => {
|
|
||||||
if self
|
|
||||||
.stem_origins
|
|
||||||
.get(&tx_id)
|
|
||||||
.is_some_and(|peers| peers.contains(&from))
|
|
||||||
{
|
|
||||||
tracing::debug!("Received stem tx twice from same peer, fluffing it");
|
|
||||||
// The same peer sent us a tx twice, fluff it.
|
|
||||||
self.promote_and_fluff_tx(tx_id).await
|
|
||||||
} else {
|
|
||||||
// This could be a new tx or it could have already been stemed, but we still stem it again
|
|
||||||
// unless the same peer sends us a tx twice.
|
|
||||||
tracing::debug!("Steming incoming tx");
|
|
||||||
self.store_tx_and_stem(tx, tx_id, Some(from)).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
TxState::Fluff => {
|
|
||||||
tracing::debug!("Fluffing incoming tx");
|
|
||||||
self.store_and_fluff_tx(tx, tx_id).await
|
|
||||||
}
|
|
||||||
TxState::Local => {
|
|
||||||
// If we have already stemed this tx then nothing to do.
|
|
||||||
if have_tx.is_some() {
|
|
||||||
tracing::debug!("Received a local tx that we already have, skipping");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
tracing::debug!("Steming local transaction");
|
|
||||||
self.store_tx_and_stem(tx, tx_id, None).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Promotes a tx to the clear pool.
|
|
||||||
async fn promote_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> {
|
|
||||||
// Remove the tx from the maps used during the stem phase.
|
|
||||||
self.stem_origins.remove(&tx_id);
|
|
||||||
|
|
||||||
// The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the
|
|
||||||
// map. These timers should be relatively short, so it shouldn't be a problem.
|
|
||||||
//self.embargo_timers.try_remove(&tx_id);
|
|
||||||
|
|
||||||
self.backing_pool
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(TxStoreRequest::Promote(tx_id))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Promotes a tx to the public fluff pool and fluffs the tx.
|
|
||||||
async fn promote_and_fluff_tx(&mut self, tx_id: TxID) -> Result<(), tower::BoxError> {
|
|
||||||
tracing::debug!("Promoting transaction to public pool and fluffing it.");
|
|
||||||
|
|
||||||
let TxStoreResponse::Transaction(tx) = self
|
|
||||||
.backing_pool
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(TxStoreRequest::Get(tx_id.clone()))
|
|
||||||
.await?
|
|
||||||
else {
|
|
||||||
panic!("Backing tx pool responded with wrong response for request.");
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some((tx, state)) = tx else {
|
|
||||||
tracing::debug!("Could not find tx, skipping.");
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
if state == State::Fluff {
|
|
||||||
tracing::debug!("Transaction already fluffed, skipping.");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
self.promote_tx(tx_id.clone()).await?;
|
|
||||||
self.fluff_tx(tx, tx_id).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a tx stored in the fluff _OR_ stem pool.
|
|
||||||
async fn get_tx_from_pool(&mut self, tx_id: TxID) -> Result<Option<Tx>, tower::BoxError> {
|
|
||||||
let TxStoreResponse::Transaction(tx) = self
|
|
||||||
.backing_pool
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(TxStoreRequest::Get(tx_id))
|
|
||||||
.await?
|
|
||||||
else {
|
|
||||||
panic!("Backing tx pool responded with wrong response for request.");
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(tx.map(|tx| tx.0))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Starts the [`DandelionPool`].
|
|
||||||
async fn run(
|
|
||||||
mut self,
|
|
||||||
mut rx: mpsc::Receiver<(IncomingTx<Tx, TxID, PID>, oneshot::Sender<()>)>,
|
|
||||||
) {
|
|
||||||
tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config);
|
|
||||||
|
|
||||||
// On start up we just fluff all txs left in the stem pool.
|
|
||||||
let Ok(TxStoreResponse::IDs(ids)) = (&mut self.backing_pool)
|
|
||||||
.oneshot(TxStoreRequest::IDsInStemPool)
|
|
||||||
.await
|
|
||||||
else {
|
|
||||||
tracing::error!("Failed to get transactions in stem pool.");
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
tracing::debug!(
|
|
||||||
"Fluffing {} txs that are currently in the stem pool",
|
|
||||||
ids.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
for id in ids {
|
|
||||||
if let Err(e) = self.promote_and_fluff_tx(id).await {
|
|
||||||
tracing::error!("Failed to fluff tx in the stem pool at start up, {e}.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tracing::trace!("Waiting for next event.");
|
|
||||||
tokio::select! {
|
|
||||||
// biased to handle current txs before routing new ones.
|
|
||||||
biased;
|
|
||||||
Some(fired) = self.embargo_timers.next() => {
|
|
||||||
tracing::debug!("Embargo timer fired, did not see stem tx in time.");
|
|
||||||
|
|
||||||
let tx_id = fired.into_inner();
|
|
||||||
if let Err(e) = self.promote_and_fluff_tx(tx_id).await {
|
|
||||||
tracing::error!("Error handling fired embargo timer: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(Ok((tx_id, res))) = self.routing_set.join_next() => {
|
|
||||||
tracing::trace!("Received d++ routing result.");
|
|
||||||
|
|
||||||
let res = match res {
|
|
||||||
Ok(State::Fluff) => {
|
|
||||||
tracing::debug!("Transaction was fluffed upgrading it to the public pool.");
|
|
||||||
self.promote_tx(tx_id).await
|
|
||||||
}
|
|
||||||
Err(tx_state) => {
|
|
||||||
tracing::debug!("Error routing transaction, trying again.");
|
|
||||||
|
|
||||||
match self.get_tx_from_pool(tx_id.clone()).await {
|
|
||||||
Ok(Some(tx)) => match tx_state {
|
|
||||||
TxState::Fluff => self.fluff_tx(tx, tx_id).await,
|
|
||||||
TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await,
|
|
||||||
TxState::Local => self.stem_tx(tx, tx_id, None).await,
|
|
||||||
}
|
|
||||||
Err(e) => Err(e),
|
|
||||||
_ => continue,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(State::Stem) => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = res {
|
|
||||||
tracing::error!("Error handling transaction routing return: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
req = rx.recv() => {
|
|
||||||
tracing::debug!("Received new tx to route.");
|
|
||||||
|
|
||||||
let Some((IncomingTx { tx, tx_state, tx_id }, res_tx)) = req else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = self.handle_incoming_tx(tx, tx_state, tx_id).await {
|
|
||||||
let _ = res_tx.send(());
|
|
||||||
|
|
||||||
tracing::error!("Error handling transaction in dandelion pool: {e}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let _ = res_tx.send(());
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
113
p2p/dandelion-tower/src/pool/incoming_tx.rs
Normal file
113
p2p/dandelion-tower/src/pool/incoming_tx.rs
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
//! Contains [`IncomingTx`] and [`IncomingTxBuilder`]
|
||||||
|
use crate::{State, TxState};
|
||||||
|
|
||||||
|
/// An incoming transaction that has gone through the preprocessing stage.
|
||||||
|
pub struct IncomingTx<Tx, TxId, PeerId> {
|
||||||
|
/// The transaction.
|
||||||
|
pub(crate) tx: Tx,
|
||||||
|
/// The transaction ID.
|
||||||
|
pub(crate) tx_id: TxId,
|
||||||
|
/// The routing state of the transaction.
|
||||||
|
pub(crate) routing_state: TxState<PeerId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An [`IncomingTx`] builder.
|
||||||
|
///
|
||||||
|
/// The const generics here are used to restrict what methods can be called.
|
||||||
|
///
|
||||||
|
/// - `RS`: routing state; a `bool` for if the routing state is set
|
||||||
|
/// - `DBS`: database state; a `bool` for if the state in the DB is set
|
||||||
|
pub struct IncomingTxBuilder<const RS: bool, const DBS: bool, Tx, TxId, PeerId> {
|
||||||
|
/// The transaction.
|
||||||
|
tx: Tx,
|
||||||
|
/// The transaction ID.
|
||||||
|
tx_id: TxId,
|
||||||
|
/// The routing state of the transaction.
|
||||||
|
routing_state: Option<TxState<PeerId>>,
|
||||||
|
/// The state of this transaction in the DB.
|
||||||
|
state_in_db: Option<State>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Tx, TxId, PeerId> IncomingTxBuilder<false, false, Tx, TxId, PeerId> {
|
||||||
|
/// Creates a new [`IncomingTxBuilder`].
|
||||||
|
pub fn new(tx: Tx, tx_id: TxId) -> Self {
|
||||||
|
Self {
|
||||||
|
tx,
|
||||||
|
tx_id,
|
||||||
|
routing_state: None,
|
||||||
|
state_in_db: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<const DBS: bool, Tx, TxId, PeerId> IncomingTxBuilder<false, DBS, Tx, TxId, PeerId> {
|
||||||
|
/// Adds the routing state to the builder.
|
||||||
|
///
|
||||||
|
/// The routing state is the origin of this transaction from our perspective.
|
||||||
|
pub fn with_routing_state(
|
||||||
|
self,
|
||||||
|
state: TxState<PeerId>,
|
||||||
|
) -> IncomingTxBuilder<true, DBS, Tx, TxId, PeerId> {
|
||||||
|
IncomingTxBuilder {
|
||||||
|
tx: self.tx,
|
||||||
|
tx_id: self.tx_id,
|
||||||
|
routing_state: Some(state),
|
||||||
|
state_in_db: self.state_in_db,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<const RS: bool, Tx, TxId, PeerId> IncomingTxBuilder<RS, false, Tx, TxId, PeerId> {
|
||||||
|
/// Adds the database state to the builder.
|
||||||
|
///
|
||||||
|
/// If the transaction is not in the DB already then the state should be [`None`].
|
||||||
|
pub fn with_state_in_db(
|
||||||
|
self,
|
||||||
|
state: Option<State>,
|
||||||
|
) -> IncomingTxBuilder<RS, true, Tx, TxId, PeerId> {
|
||||||
|
IncomingTxBuilder {
|
||||||
|
tx: self.tx,
|
||||||
|
tx_id: self.tx_id,
|
||||||
|
routing_state: self.routing_state,
|
||||||
|
state_in_db: state,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Tx, TxId, PeerId> IncomingTxBuilder<true, true, Tx, TxId, PeerId> {
|
||||||
|
/// Builds the [`IncomingTx`].
|
||||||
|
///
|
||||||
|
/// If this returns [`None`] then the transaction does not need to be given to the dandelion pool
|
||||||
|
/// manager.
|
||||||
|
pub fn build(self) -> Option<IncomingTx<Tx, TxId, PeerId>> {
|
||||||
|
let routing_state = self.routing_state.unwrap();
|
||||||
|
|
||||||
|
if self.state_in_db == Some(State::Fluff) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(IncomingTx {
|
||||||
|
tx: self.tx,
|
||||||
|
tx_id: self.tx_id,
|
||||||
|
routing_state,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_builder() {
|
||||||
|
IncomingTxBuilder::new(1, 2)
|
||||||
|
.with_routing_state(TxState::Stem { from: 3 })
|
||||||
|
.with_state_in_db(None)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
IncomingTxBuilder::new(1, 2)
|
||||||
|
.with_state_in_db(None)
|
||||||
|
.with_routing_state(TxState::Stem { from: 3 })
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
294
p2p/dandelion-tower/src/pool/manager.rs
Normal file
294
p2p/dandelion-tower/src/pool/manager.rs
Normal file
|
@ -0,0 +1,294 @@
|
||||||
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
hash::Hash,
|
||||||
|
marker::PhantomData,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{FutureExt, StreamExt};
|
||||||
|
use rand::prelude::*;
|
||||||
|
use rand_distr::Exp;
|
||||||
|
use tokio::{
|
||||||
|
sync::{mpsc, oneshot},
|
||||||
|
task::JoinSet,
|
||||||
|
};
|
||||||
|
use tokio_util::time::DelayQueue;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
pool::IncomingTx,
|
||||||
|
traits::{TxStoreRequest, TxStoreResponse},
|
||||||
|
DandelionConfig, DandelionRouteReq, DandelionRouterError, State, TxState,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug, thiserror::Error)]
|
||||||
|
#[error("The dandelion pool was shutdown")]
|
||||||
|
pub struct DandelionPoolShutDown;
|
||||||
|
|
||||||
|
/// The dandelion++ pool manager.
|
||||||
|
///
|
||||||
|
/// See the [module docs](super) for more.
|
||||||
|
pub struct DandelionPoolManager<P, R, Tx, TxId, PeerId> {
|
||||||
|
/// The dandelion++ router
|
||||||
|
pub(crate) dandelion_router: R,
|
||||||
|
/// The backing tx storage.
|
||||||
|
pub(crate) backing_pool: P,
|
||||||
|
/// The set of tasks that are running the future returned from `dandelion_router`.
|
||||||
|
pub(crate) routing_set: JoinSet<(TxId, Result<State, TxState<PeerId>>)>,
|
||||||
|
|
||||||
|
/// The origin of stem transactions.
|
||||||
|
pub(crate) stem_origins: HashMap<TxId, HashSet<PeerId>>,
|
||||||
|
|
||||||
|
/// Current stem pool embargo timers.
|
||||||
|
pub(crate) embargo_timers: DelayQueue<TxId>,
|
||||||
|
/// The distrobution to sample to get embargo timers.
|
||||||
|
pub(crate) embargo_dist: Exp<f64>,
|
||||||
|
|
||||||
|
/// The d++ config.
|
||||||
|
pub(crate) config: DandelionConfig,
|
||||||
|
|
||||||
|
pub(crate) _tx: PhantomData<Tx>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<P, R, Tx, TxId, PeerId> DandelionPoolManager<P, R, Tx, TxId, PeerId>
|
||||||
|
where
|
||||||
|
Tx: Clone + Send,
|
||||||
|
TxId: Hash + Eq + Clone + Send + 'static,
|
||||||
|
PeerId: Hash + Eq + Clone + Send + 'static,
|
||||||
|
P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>,
|
||||||
|
P::Future: Send + 'static,
|
||||||
|
R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>,
|
||||||
|
R::Future: Send + 'static,
|
||||||
|
{
|
||||||
|
/// Adds a new embargo timer to the running timers, with a duration pulled from [`Self::embargo_dist`]
|
||||||
|
fn add_embargo_timer_for_tx(&mut self, tx_id: TxId) {
|
||||||
|
let embargo_timer = self.embargo_dist.sample(&mut thread_rng());
|
||||||
|
tracing::debug!(
|
||||||
|
"Setting embargo timer for stem tx: {} seconds.",
|
||||||
|
embargo_timer
|
||||||
|
);
|
||||||
|
|
||||||
|
self.embargo_timers
|
||||||
|
.insert(tx_id, Duration::from_secs_f64(embargo_timer));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stems the tx, setting the stem origin, if it wasn't already set.
|
||||||
|
///
|
||||||
|
/// This function does not add the tx to the backing pool.
|
||||||
|
async fn stem_tx(
|
||||||
|
&mut self,
|
||||||
|
tx: Tx,
|
||||||
|
tx_id: TxId,
|
||||||
|
from: Option<PeerId>,
|
||||||
|
) -> Result<(), tower::BoxError> {
|
||||||
|
if let Some(peer) = &from {
|
||||||
|
self.stem_origins
|
||||||
|
.entry(tx_id.clone())
|
||||||
|
.or_default()
|
||||||
|
.insert(peer.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = from
|
||||||
|
.map(|from| TxState::Stem { from })
|
||||||
|
.unwrap_or(TxState::Local);
|
||||||
|
|
||||||
|
let fut = self
|
||||||
|
.dandelion_router
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(DandelionRouteReq {
|
||||||
|
tx,
|
||||||
|
state: state.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
self.routing_set
|
||||||
|
.spawn(fut.map(|res| (tx_id, res.map_err(|_| state))));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fluffs a tx, does not add the tx to the tx pool.
|
||||||
|
async fn fluff_tx(&mut self, tx: Tx, tx_id: TxId) -> Result<(), tower::BoxError> {
|
||||||
|
let fut = self
|
||||||
|
.dandelion_router
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(DandelionRouteReq {
|
||||||
|
tx,
|
||||||
|
state: TxState::Fluff,
|
||||||
|
});
|
||||||
|
|
||||||
|
self.routing_set
|
||||||
|
.spawn(fut.map(|res| (tx_id, res.map_err(|_| TxState::Fluff))));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Function to handle an [`IncomingTx`].
|
||||||
|
async fn handle_incoming_tx(
|
||||||
|
&mut self,
|
||||||
|
tx: Tx,
|
||||||
|
tx_state: TxState<PeerId>,
|
||||||
|
tx_id: TxId,
|
||||||
|
) -> Result<(), tower::BoxError> {
|
||||||
|
match tx_state {
|
||||||
|
TxState::Stem { from } => {
|
||||||
|
if self
|
||||||
|
.stem_origins
|
||||||
|
.get(&tx_id)
|
||||||
|
.is_some_and(|peers| peers.contains(&from))
|
||||||
|
{
|
||||||
|
tracing::debug!("Received stem tx twice from same peer, fluffing it");
|
||||||
|
// The same peer sent us a tx twice, fluff it.
|
||||||
|
self.promote_and_fluff_tx(tx_id).await?;
|
||||||
|
} else {
|
||||||
|
// This could be a new tx or it could have already been stemed, but we still stem it again
|
||||||
|
// unless the same peer sends us a tx twice.
|
||||||
|
tracing::debug!("Steming incoming tx");
|
||||||
|
self.stem_tx(tx, tx_id.clone(), Some(from)).await?;
|
||||||
|
self.add_embargo_timer_for_tx(tx_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TxState::Fluff => {
|
||||||
|
tracing::debug!("Fluffing incoming tx");
|
||||||
|
self.fluff_tx(tx, tx_id).await?;
|
||||||
|
}
|
||||||
|
TxState::Local => {
|
||||||
|
tracing::debug!("Steming local transaction");
|
||||||
|
self.stem_tx(tx, tx_id.clone(), None).await?;
|
||||||
|
self.add_embargo_timer_for_tx(tx_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Promotes a tx to the clear pool.
|
||||||
|
async fn promote_tx(&mut self, tx_id: TxId) -> Result<(), tower::BoxError> {
|
||||||
|
// Remove the tx from the maps used during the stem phase.
|
||||||
|
self.stem_origins.remove(&tx_id);
|
||||||
|
|
||||||
|
// The key for this is *Not* the tx_id, it is given on insert, so just keep the timer in the
|
||||||
|
// map. These timers should be relatively short, so it shouldn't be a problem.
|
||||||
|
//self.embargo_timers.try_remove(&tx_id);
|
||||||
|
|
||||||
|
self.backing_pool
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(TxStoreRequest::Promote(tx_id))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Promotes a tx to the public fluff pool and fluffs the tx.
|
||||||
|
async fn promote_and_fluff_tx(&mut self, tx_id: TxId) -> Result<(), tower::BoxError> {
|
||||||
|
tracing::debug!("Promoting transaction to public pool and fluffing it.");
|
||||||
|
|
||||||
|
let TxStoreResponse::Transaction(tx) = self
|
||||||
|
.backing_pool
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(TxStoreRequest::Get(tx_id.clone()))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("Backing tx pool responded with wrong response for request.");
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some((tx, state)) = tx else {
|
||||||
|
tracing::debug!("Could not find tx, skipping.");
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
if state == State::Fluff {
|
||||||
|
tracing::debug!("Transaction already fluffed, skipping.");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.promote_tx(tx_id.clone()).await?;
|
||||||
|
self.fluff_tx(tx, tx_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a tx stored in the fluff _OR_ stem pool.
|
||||||
|
async fn get_tx_from_pool(&mut self, tx_id: TxId) -> Result<Option<Tx>, tower::BoxError> {
|
||||||
|
let TxStoreResponse::Transaction(tx) = self
|
||||||
|
.backing_pool
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(TxStoreRequest::Get(tx_id))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("Backing tx pool responded with wrong response for request.");
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(tx.map(|tx| tx.0))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Starts the [`DandelionPoolManager`].
|
||||||
|
pub(crate) async fn run(
|
||||||
|
mut self,
|
||||||
|
mut rx: mpsc::Receiver<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
|
||||||
|
) {
|
||||||
|
tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tracing::trace!("Waiting for next event.");
|
||||||
|
tokio::select! {
|
||||||
|
// biased to handle current txs before routing new ones.
|
||||||
|
biased;
|
||||||
|
Some(fired) = self.embargo_timers.next() => {
|
||||||
|
tracing::debug!("Embargo timer fired, did not see stem tx in time.");
|
||||||
|
|
||||||
|
let tx_id = fired.into_inner();
|
||||||
|
if let Err(e) = self.promote_and_fluff_tx(tx_id).await {
|
||||||
|
tracing::error!("Error handling fired embargo timer: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Ok((tx_id, res))) = self.routing_set.join_next() => {
|
||||||
|
tracing::trace!("Received d++ routing result.");
|
||||||
|
|
||||||
|
let res = match res {
|
||||||
|
Ok(State::Fluff) => {
|
||||||
|
tracing::debug!("Transaction was fluffed upgrading it to the public pool.");
|
||||||
|
self.promote_tx(tx_id).await
|
||||||
|
}
|
||||||
|
Err(tx_state) => {
|
||||||
|
tracing::debug!("Error routing transaction, trying again.");
|
||||||
|
|
||||||
|
match self.get_tx_from_pool(tx_id.clone()).await {
|
||||||
|
Ok(Some(tx)) => match tx_state {
|
||||||
|
TxState::Fluff => self.fluff_tx(tx, tx_id).await,
|
||||||
|
TxState::Stem { from } => self.stem_tx(tx, tx_id, Some(from)).await,
|
||||||
|
TxState::Local => self.stem_tx(tx, tx_id, None).await,
|
||||||
|
}
|
||||||
|
Err(e) => Err(e),
|
||||||
|
_ => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(State::Stem) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = res {
|
||||||
|
tracing::error!("Error handling transaction routing return: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
req = rx.recv() => {
|
||||||
|
tracing::debug!("Received new tx to route.");
|
||||||
|
|
||||||
|
let Some((IncomingTx { tx, tx_id, routing_state }, res_tx)) = req else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = self.handle_incoming_tx(tx, routing_state, tx_id).await {
|
||||||
|
let _ = res_tx.send(());
|
||||||
|
|
||||||
|
tracing::error!("Error handling transaction in dandelion pool: {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let _ = res_tx.send(());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
145
p2p/dandelion-tower/src/pool/mod.rs
Normal file
145
p2p/dandelion-tower/src/pool/mod.rs
Normal file
|
@ -0,0 +1,145 @@
|
||||||
|
//! # Dandelion++ Pool
|
||||||
|
//!
|
||||||
|
//! This module contains [`DandelionPoolManager`] which is a wrapper around a backing transaction store,
|
||||||
|
//! which fully implements the dandelion++ protocol.
|
||||||
|
//!
|
||||||
|
//! The [`DandelionPoolManager`] is a middle man between a [preprocessing stage](#preprocessing-stage) and a dandelion router.
|
||||||
|
//! It handles promoting transactions in the stem state to the fluff state and setting embargo timers on stem state transactions.
|
||||||
|
//!
|
||||||
|
//! ### Preprocessing stage
|
||||||
|
//!
|
||||||
|
//! The preprocessing stage (not handled in this crate) before giving the transaction to the [`DandelionPoolManager`]
|
||||||
|
//! should handle:
|
||||||
|
//!
|
||||||
|
//! - verifying the tx.
|
||||||
|
//! - checking if we have the tx in the pool already and giving that information to the [`IncomingTxBuilder`].
|
||||||
|
//! - storing the tx in the pool, if it isn't there already.
|
||||||
|
//!
|
||||||
|
//! ### Keep Stem Transactions Hidden
|
||||||
|
//!
|
||||||
|
//! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden.
|
||||||
|
//! So handle any requests to the tx-pool like the stem side of the pool does not exist.
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
hash::Hash,
|
||||||
|
marker::PhantomData,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
|
use rand_distr::Exp;
|
||||||
|
use tokio::{
|
||||||
|
sync::{mpsc, oneshot},
|
||||||
|
task::JoinSet,
|
||||||
|
};
|
||||||
|
use tokio_util::{sync::PollSender, time::DelayQueue};
|
||||||
|
use tower::Service;
|
||||||
|
use tracing::Instrument;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
pool::manager::DandelionPoolShutDown,
|
||||||
|
traits::{TxStoreRequest, TxStoreResponse},
|
||||||
|
DandelionConfig, DandelionRouteReq, DandelionRouterError, State,
|
||||||
|
};
|
||||||
|
|
||||||
|
mod incoming_tx;
|
||||||
|
mod manager;
|
||||||
|
|
||||||
|
pub use incoming_tx::{IncomingTx, IncomingTxBuilder};
|
||||||
|
pub use manager::DandelionPoolManager;
|
||||||
|
|
||||||
|
/// Start the [`DandelionPoolManager`].
|
||||||
|
///
|
||||||
|
/// This function spawns the [`DandelionPoolManager`] and returns [`DandelionPoolService`] which can be used to send
|
||||||
|
/// requests to the pool.
|
||||||
|
///
|
||||||
|
/// ### Args
|
||||||
|
///
|
||||||
|
/// - `buffer_size` is the size of the channel's buffer between the [`DandelionPoolService`] and [`DandelionPoolManager`].
|
||||||
|
/// - `dandelion_router` is the router service, kept generic instead of [`DandelionRouter`](crate::DandelionRouter) to allow
|
||||||
|
/// user to customise routing functionality.
|
||||||
|
/// - `backing_pool` is the backing transaction storage service
|
||||||
|
/// - `config` is [`DandelionConfig`].
|
||||||
|
pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PeerId>(
|
||||||
|
buffer_size: usize,
|
||||||
|
dandelion_router: R,
|
||||||
|
backing_pool: P,
|
||||||
|
config: DandelionConfig,
|
||||||
|
) -> DandelionPoolService<Tx, TxId, PeerId>
|
||||||
|
where
|
||||||
|
Tx: Clone + Send + 'static,
|
||||||
|
TxId: Hash + Eq + Clone + Send + 'static,
|
||||||
|
PeerId: Hash + Eq + Clone + Send + 'static,
|
||||||
|
P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
P::Future: Send + 'static,
|
||||||
|
R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
R::Future: Send + 'static,
|
||||||
|
{
|
||||||
|
let (tx, rx) = mpsc::channel(buffer_size);
|
||||||
|
|
||||||
|
let pool = DandelionPoolManager {
|
||||||
|
dandelion_router,
|
||||||
|
backing_pool,
|
||||||
|
routing_set: JoinSet::new(),
|
||||||
|
stem_origins: HashMap::new(),
|
||||||
|
embargo_timers: DelayQueue::new(),
|
||||||
|
embargo_dist: Exp::new(1.0 / config.average_embargo_timeout().as_secs_f64()).unwrap(),
|
||||||
|
config,
|
||||||
|
_tx: PhantomData,
|
||||||
|
};
|
||||||
|
|
||||||
|
let span = tracing::debug_span!("dandelion_pool");
|
||||||
|
|
||||||
|
tokio::spawn(pool.run(rx).instrument(span));
|
||||||
|
|
||||||
|
DandelionPoolService {
|
||||||
|
tx: PollSender::new(tx),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The dandelion pool manager service.
|
||||||
|
///
|
||||||
|
/// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`]
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DandelionPoolService<Tx, TxId, PeerId> {
|
||||||
|
/// The channel to [`DandelionPoolManager`].
|
||||||
|
tx: PollSender<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
|
||||||
|
for DandelionPoolService<Tx, TxId, PeerId>
|
||||||
|
where
|
||||||
|
Tx: Clone + Send,
|
||||||
|
TxId: Hash + Eq + Clone + Send + 'static,
|
||||||
|
PeerId: Hash + Eq + Clone + Send + 'static,
|
||||||
|
{
|
||||||
|
type Response = ();
|
||||||
|
type Error = DandelionPoolShutDown;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: IncomingTx<Tx, TxId, PeerId>) -> Self::Future {
|
||||||
|
// although the channel isn't sending anything we want to wait for the request to be handled before continuing.
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let res = self
|
||||||
|
.tx
|
||||||
|
.send_item((req, tx))
|
||||||
|
.map_err(|_| DandelionPoolShutDown);
|
||||||
|
|
||||||
|
async move {
|
||||||
|
res?;
|
||||||
|
rx.await.expect("Oneshot dropped before response!");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,7 +6,7 @@
|
||||||
//! ### What The Router Does Not Do
|
//! ### What The Router Does Not Do
|
||||||
//!
|
//!
|
||||||
//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling
|
//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling
|
||||||
//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool)
|
//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPoolManager)
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
hash::Hash,
|
hash::Hash,
|
||||||
|
@ -43,9 +43,9 @@ pub enum DandelionRouterError {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A response from an attempt to retrieve an outbound peer.
|
/// A response from an attempt to retrieve an outbound peer.
|
||||||
pub enum OutboundPeer<ID, T> {
|
pub enum OutboundPeer<Id, T> {
|
||||||
/// A peer.
|
/// A peer.
|
||||||
Peer(ID, T),
|
Peer(Id, T),
|
||||||
/// The peer store is exhausted and has no more to return.
|
/// The peer store is exhausted and has no more to return.
|
||||||
Exhausted,
|
Exhausted,
|
||||||
}
|
}
|
||||||
|
@ -61,28 +61,28 @@ pub enum State {
|
||||||
|
|
||||||
/// The routing state of a transaction.
|
/// The routing state of a transaction.
|
||||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||||
pub enum TxState<ID> {
|
pub enum TxState<Id> {
|
||||||
/// Fluff state.
|
/// Fluff state.
|
||||||
Fluff,
|
Fluff,
|
||||||
/// Stem state.
|
/// Stem state.
|
||||||
Stem {
|
Stem {
|
||||||
/// The peer who sent us this transaction's ID.
|
/// The peer who sent us this transaction's Id.
|
||||||
from: ID,
|
from: Id,
|
||||||
},
|
},
|
||||||
/// Local - the transaction originated from our node.
|
/// Local - the transaction originated from our node.
|
||||||
Local,
|
Local,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A request to route a transaction.
|
/// A request to route a transaction.
|
||||||
pub struct DandelionRouteReq<Tx, ID> {
|
pub struct DandelionRouteReq<Tx, Id> {
|
||||||
/// The transaction.
|
/// The transaction.
|
||||||
pub tx: Tx,
|
pub tx: Tx,
|
||||||
/// The transaction state.
|
/// The transaction state.
|
||||||
pub state: TxState<ID>,
|
pub state: TxState<Id>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The dandelion router service.
|
/// The dandelion router service.
|
||||||
pub struct DandelionRouter<P, B, ID, S, Tx> {
|
pub struct DandelionRouter<P, B, Id, S, Tx> {
|
||||||
// pub(crate) is for tests
|
// pub(crate) is for tests
|
||||||
/// A [`Discover`] where we can get outbound peers from.
|
/// A [`Discover`] where we can get outbound peers from.
|
||||||
outbound_peer_discover: Pin<Box<P>>,
|
outbound_peer_discover: Pin<Box<P>>,
|
||||||
|
@ -95,14 +95,14 @@ pub struct DandelionRouter<P, B, ID, S, Tx> {
|
||||||
epoch_start: Instant,
|
epoch_start: Instant,
|
||||||
|
|
||||||
/// The stem our local transactions will be sent to.
|
/// The stem our local transactions will be sent to.
|
||||||
local_route: Option<ID>,
|
local_route: Option<Id>,
|
||||||
/// A [`HashMap`] linking peer's IDs to IDs in `stem_peers`.
|
/// A [`HashMap`] linking peer's Ids to Ids in `stem_peers`.
|
||||||
stem_routes: HashMap<ID, ID>,
|
stem_routes: HashMap<Id, Id>,
|
||||||
/// Peers we are using for stemming.
|
/// Peers we are using for stemming.
|
||||||
///
|
///
|
||||||
/// This will contain peers, even in [`State::Fluff`] to allow us to stem [`TxState::Local`]
|
/// This will contain peers, even in [`State::Fluff`] to allow us to stem [`TxState::Local`]
|
||||||
/// transactions.
|
/// transactions.
|
||||||
pub(crate) stem_peers: HashMap<ID, S>,
|
pub(crate) stem_peers: HashMap<Id, S>,
|
||||||
|
|
||||||
/// The distribution to sample to get the [`State`], true is [`State::Fluff`].
|
/// The distribution to sample to get the [`State`], true is [`State::Fluff`].
|
||||||
state_dist: Bernoulli,
|
state_dist: Bernoulli,
|
||||||
|
@ -116,10 +116,10 @@ pub struct DandelionRouter<P, B, ID, S, Tx> {
|
||||||
_tx: PhantomData<Tx>,
|
_tx: PhantomData<Tx>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Tx, ID, P, B, S> DandelionRouter<P, B, ID, S, Tx>
|
impl<Tx, Id, P, B, S> DandelionRouter<P, B, Id, S, Tx>
|
||||||
where
|
where
|
||||||
ID: Hash + Eq + Clone,
|
Id: Hash + Eq + Clone,
|
||||||
P: TryStream<Ok = OutboundPeer<ID, S>, Error = tower::BoxError>,
|
P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
|
||||||
B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
|
B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
|
||||||
B::Future: Send + 'static,
|
B::Future: Send + 'static,
|
||||||
S: Service<StemRequest<Tx>, Error = tower::BoxError>,
|
S: Service<StemRequest<Tx>, Error = tower::BoxError>,
|
||||||
|
@ -198,7 +198,7 @@ where
|
||||||
fn stem_tx(
|
fn stem_tx(
|
||||||
&mut self,
|
&mut self,
|
||||||
tx: Tx,
|
tx: Tx,
|
||||||
from: ID,
|
from: Id,
|
||||||
) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
|
) -> BoxFuture<'static, Result<State, DandelionRouterError>> {
|
||||||
if self.stem_peers.is_empty() {
|
if self.stem_peers.is_empty() {
|
||||||
tracing::debug!("Stem peers are empty, fluffing stem transaction.");
|
tracing::debug!("Stem peers are empty, fluffing stem transaction.");
|
||||||
|
@ -258,19 +258,10 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
impl<Tx, Id, P, B, S> Service<DandelionRouteReq<Tx, Id>> for DandelionRouter<P, B, Id, S, Tx>
|
||||||
## Generics ##
|
|
||||||
|
|
||||||
Tx: The tx type
|
|
||||||
ID: Peer Id type - unique identifier for nodes.
|
|
||||||
P: Peer Set discover - where we can get outbound peers from
|
|
||||||
B: Broadcast service - where we send txs to get diffused.
|
|
||||||
S: The Peer service - handles routing messages to a single node.
|
|
||||||
*/
|
|
||||||
impl<Tx, ID, P, B, S> Service<DandelionRouteReq<Tx, ID>> for DandelionRouter<P, B, ID, S, Tx>
|
|
||||||
where
|
where
|
||||||
ID: Hash + Eq + Clone,
|
Id: Hash + Eq + Clone,
|
||||||
P: TryStream<Ok = OutboundPeer<ID, S>, Error = tower::BoxError>,
|
P: TryStream<Ok = OutboundPeer<Id, S>, Error = tower::BoxError>,
|
||||||
B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
|
B: Service<DiffuseRequest<Tx>, Error = tower::BoxError>,
|
||||||
B::Future: Send + 'static,
|
B::Future: Send + 'static,
|
||||||
S: Service<StemRequest<Tx>, Error = tower::BoxError>,
|
S: Service<StemRequest<Tx>, Error = tower::BoxError>,
|
||||||
|
@ -336,7 +327,7 @@ where
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: DandelionRouteReq<Tx, ID>) -> Self::Future {
|
fn call(&mut self, req: DandelionRouteReq<Tx, Id>) -> Self::Future {
|
||||||
tracing::trace!(parent: &self.span, "Handling route request.");
|
tracing::trace!(parent: &self.span, "Handling route request.");
|
||||||
|
|
||||||
match req.state {
|
match req.state {
|
||||||
|
|
|
@ -76,11 +76,9 @@ pub fn mock_in_memory_backing_pool<
|
||||||
TxID: Clone + Hash + Eq + Send + 'static,
|
TxID: Clone + Hash + Eq + Send + 'static,
|
||||||
>() -> (
|
>() -> (
|
||||||
impl Service<
|
impl Service<
|
||||||
TxStoreRequest<Tx, TxID>,
|
TxStoreRequest<TxID>,
|
||||||
Response = TxStoreResponse<Tx, TxID>,
|
Response = TxStoreResponse<Tx>,
|
||||||
Future = impl Future<Output = Result<TxStoreResponse<Tx, TxID>, tower::BoxError>>
|
Future = impl Future<Output = Result<TxStoreResponse<Tx>, tower::BoxError>> + Send + 'static,
|
||||||
+ Send
|
|
||||||
+ 'static,
|
|
||||||
Error = tower::BoxError,
|
Error = tower::BoxError,
|
||||||
> + Send
|
> + Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
|
@ -90,33 +88,14 @@ pub fn mock_in_memory_backing_pool<
|
||||||
let txs_2 = txs.clone();
|
let txs_2 = txs.clone();
|
||||||
|
|
||||||
(
|
(
|
||||||
service_fn(move |req: TxStoreRequest<Tx, TxID>| {
|
service_fn(move |req: TxStoreRequest<TxID>| {
|
||||||
let txs = txs.clone();
|
let txs = txs.clone();
|
||||||
async move {
|
async move {
|
||||||
match req {
|
match req {
|
||||||
TxStoreRequest::Store(tx, tx_id, state) => {
|
|
||||||
txs.lock().unwrap().insert(tx_id, (tx, state));
|
|
||||||
Ok(TxStoreResponse::Ok)
|
|
||||||
}
|
|
||||||
TxStoreRequest::Get(tx_id) => {
|
TxStoreRequest::Get(tx_id) => {
|
||||||
let tx_state = txs.lock().unwrap().get(&tx_id).cloned();
|
let tx_state = txs.lock().unwrap().get(&tx_id).cloned();
|
||||||
Ok(TxStoreResponse::Transaction(tx_state))
|
Ok(TxStoreResponse::Transaction(tx_state))
|
||||||
}
|
}
|
||||||
TxStoreRequest::Contains(tx_id) => Ok(TxStoreResponse::Contains(
|
|
||||||
txs.lock().unwrap().get(&tx_id).map(|res| res.1),
|
|
||||||
)),
|
|
||||||
TxStoreRequest::IDsInStemPool => {
|
|
||||||
// horribly inefficient, but it's test code :)
|
|
||||||
let ids = txs
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.iter()
|
|
||||||
.filter(|(_, (_, state))| matches!(state, State::Stem))
|
|
||||||
.map(|tx| tx.0.clone())
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
Ok(TxStoreResponse::IDs(ids))
|
|
||||||
}
|
|
||||||
TxStoreRequest::Promote(tx_id) => {
|
TxStoreRequest::Promote(tx_id) => {
|
||||||
let _ = txs
|
let _ = txs
|
||||||
.lock()
|
.lock()
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
pool::{start_dandelion_pool, IncomingTx},
|
pool::{start_dandelion_pool_manager, IncomingTx},
|
||||||
DandelionConfig, DandelionRouter, Graph, TxState,
|
DandelionConfig, DandelionRouter, Graph, TxState,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn basic_functionality() {
|
async fn basic_functionality() {
|
||||||
let config = DandelionConfig {
|
let config = DandelionConfig {
|
||||||
|
@ -21,9 +20,9 @@ async fn basic_functionality() {
|
||||||
|
|
||||||
let router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);
|
let router = DandelionRouter::new(broadcast_svc, outbound_peer_svc, config);
|
||||||
|
|
||||||
let (pool_svc, pool) = mock_in_memory_backing_pool();
|
let (pool_svc, _pool) = mock_in_memory_backing_pool();
|
||||||
|
|
||||||
let mut pool_svc = start_dandelion_pool(15, router, pool_svc, config);
|
let mut pool_svc = start_dandelion_pool_manager(15, router, pool_svc, config);
|
||||||
|
|
||||||
pool_svc
|
pool_svc
|
||||||
.ready()
|
.ready()
|
||||||
|
@ -32,11 +31,13 @@ async fn basic_functionality() {
|
||||||
.call(IncomingTx {
|
.call(IncomingTx {
|
||||||
tx: 0_usize,
|
tx: 0_usize,
|
||||||
tx_id: 1_usize,
|
tx_id: 1_usize,
|
||||||
tx_state: TxState::Fluff,
|
routing_state: TxState::Fluff,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
assert!(pool.lock().unwrap().contains_key(&1));
|
// TODO: the DandelionPoolManager doesn't handle adding txs to the pool, add more tests here to test
|
||||||
|
// all functionality.
|
||||||
|
//assert!(pool.lock().unwrap().contains_key(&1));
|
||||||
assert!(broadcast_rx.try_recv().is_ok())
|
assert!(broadcast_rx.try_recv().is_ok())
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,42 +8,24 @@ pub struct StemRequest<Tx>(pub Tx);
|
||||||
|
|
||||||
#[cfg(feature = "txpool")]
|
#[cfg(feature = "txpool")]
|
||||||
/// A request sent to the backing transaction pool storage.
|
/// A request sent to the backing transaction pool storage.
|
||||||
pub enum TxStoreRequest<Tx, TxID> {
|
pub enum TxStoreRequest<TxId> {
|
||||||
/// A request to store a transaction with the ID to store it under and the pool to store it in.
|
/// A request to retrieve a `Tx` with the given Id from the pool, should not remove that tx from the pool.
|
||||||
///
|
|
||||||
/// If the tx is already in the pool then do nothing, unless the tx is in the stem pool then move it
|
|
||||||
/// to the fluff pool, _if this request state is fluff_.
|
|
||||||
Store(Tx, TxID, crate::State),
|
|
||||||
/// A request to retrieve a `Tx` with the given ID from the pool, should not remove that tx from the pool.
|
|
||||||
///
|
///
|
||||||
/// Must return [`TxStoreResponse::Transaction`]
|
/// Must return [`TxStoreResponse::Transaction`]
|
||||||
Get(TxID),
|
Get(TxId),
|
||||||
/// Promote a transaction from the stem pool to the public pool.
|
/// Promote a transaction from the stem pool to the public pool.
|
||||||
///
|
///
|
||||||
/// If the tx is already in the fluff pool do nothing.
|
/// If the tx is already in the fluff pool do nothing.
|
||||||
///
|
///
|
||||||
/// This should not error if the tx isn't in the pool at all.
|
/// This should not error if the tx isn't in the pool at all.
|
||||||
Promote(TxID),
|
Promote(TxId),
|
||||||
/// A request to check if a translation is in the pool.
|
|
||||||
///
|
|
||||||
/// Must return [`TxStoreResponse::Contains`]
|
|
||||||
Contains(TxID),
|
|
||||||
/// Returns the IDs of all the transaction in the stem pool.
|
|
||||||
///
|
|
||||||
/// Must return [`TxStoreResponse::IDs`]
|
|
||||||
IDsInStemPool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "txpool")]
|
#[cfg(feature = "txpool")]
|
||||||
/// A response sent back from the backing transaction pool.
|
/// A response sent back from the backing transaction pool.
|
||||||
pub enum TxStoreResponse<Tx, TxID> {
|
pub enum TxStoreResponse<Tx> {
|
||||||
/// A generic ok response.
|
/// A generic ok response.
|
||||||
Ok,
|
Ok,
|
||||||
/// A response containing a [`Option`] for if the transaction is in the pool (Some) or not (None) and in which pool
|
|
||||||
/// the tx is in.
|
|
||||||
Contains(Option<crate::State>),
|
|
||||||
/// A response containing a requested transaction.
|
/// A response containing a requested transaction.
|
||||||
Transaction(Option<(Tx, crate::State)>),
|
Transaction(Option<(Tx, crate::State)>),
|
||||||
/// A list of transaction IDs.
|
|
||||||
IDs(Vec<TxID>),
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue