diff --git a/p2p/dandelion-tower/src/pool/incoming_tx.rs b/p2p/dandelion-tower/src/pool/incoming_tx.rs index e68f8bec..c9a30dee 100644 --- a/p2p/dandelion-tower/src/pool/incoming_tx.rs +++ b/p2p/dandelion-tower/src/pool/incoming_tx.rs @@ -2,13 +2,13 @@ use crate::{State, TxState}; /// An incoming transaction that has gone through the preprocessing stage. -pub struct IncomingTx { +pub struct IncomingTx { /// The transaction. pub(crate) tx: Tx, /// The transaction ID. pub(crate) tx_id: TxId, /// The routing state of the transaction. - pub(crate) routing_state: TxState, + pub(crate) routing_state: TxState, } /// An [`IncomingTx`] builder. @@ -17,18 +17,18 @@ pub struct IncomingTx { /// /// - `RS`: routing state; a `bool` for if the routing state is set /// - `DBS`: database state; a `bool` for if the state in the DB is set -pub struct IncomingTxBuilder { +pub struct IncomingTxBuilder { /// The transaction. tx: Tx, /// The transaction ID. tx_id: TxId, /// The routing state of the transaction. - routing_state: Option>, + routing_state: Option>, /// The state of this transaction in the DB. state_in_db: Option, } -impl IncomingTxBuilder { +impl IncomingTxBuilder { /// Creates a new [`IncomingTxBuilder`]. pub fn new(tx: Tx, tx_id: TxId) -> Self { Self { @@ -40,14 +40,14 @@ impl IncomingTxBuilder { } } -impl IncomingTxBuilder { +impl IncomingTxBuilder { /// Adds the routing state to the builder. /// /// The routing state is the origin of this transaction from our perspective. pub fn with_routing_state( self, - state: TxState, - ) -> IncomingTxBuilder { + state: TxState, + ) -> IncomingTxBuilder { IncomingTxBuilder { tx: self.tx, tx_id: self.tx_id, @@ -57,14 +57,14 @@ impl IncomingTxBuilder IncomingTxBuilder { +impl IncomingTxBuilder { /// Adds the database state to the builder. /// /// If the transaction is not in the DB already then the state should be [`None`]. pub fn with_state_in_db( self, state: Option, - ) -> IncomingTxBuilder { + ) -> IncomingTxBuilder { IncomingTxBuilder { tx: self.tx, tx_id: self.tx_id, @@ -74,12 +74,12 @@ impl IncomingTxBuilder } } -impl IncomingTxBuilder { +impl IncomingTxBuilder { /// Builds the [`IncomingTx`]. /// /// If this returns [`None`] then the transaction does not need to be given to the dandelion pool /// manager. - pub fn build(self) -> Option> { + pub fn build(self) -> Option> { let routing_state = self.routing_state.unwrap(); if self.state_in_db == Some(State::Fluff) { diff --git a/p2p/dandelion-tower/src/pool/manager.rs b/p2p/dandelion-tower/src/pool/manager.rs index 34675447..9e1572e1 100644 --- a/p2p/dandelion-tower/src/pool/manager.rs +++ b/p2p/dandelion-tower/src/pool/manager.rs @@ -28,16 +28,16 @@ pub struct DandelionPoolShutDown; /// The dandelion++ pool manager. /// /// See the [module docs](super) for more. -pub struct DandelionPoolManager { +pub struct DandelionPoolManager { /// The dandelion++ router pub(crate) dandelion_router: R, /// The backing tx storage. pub(crate) backing_pool: P, /// The set of tasks that are running the future returned from `dandelion_router`. - pub(crate) routing_set: JoinSet<(TxId, Result>)>, + pub(crate) routing_set: JoinSet<(TxId, Result>)>, /// The origin of stem transactions. - pub(crate) stem_origins: HashMap>, + pub(crate) stem_origins: HashMap>, /// Current stem pool embargo timers. pub(crate) embargo_timers: DelayQueue, @@ -50,14 +50,14 @@ pub struct DandelionPoolManager { pub(crate) _tx: PhantomData, } -impl DandelionPoolManager +impl DandelionPoolManager where Tx: Clone + Send, TxId: Hash + Eq + Clone + Send + 'static, - PId: Hash + Eq + Clone + Send + 'static, + PeerId: Hash + Eq + Clone + Send + 'static, P: Service, Response = TxStoreResponse, Error = tower::BoxError>, P::Future: Send + 'static, - R: Service, Response = State, Error = DandelionRouterError>, + R: Service, Response = State, Error = DandelionRouterError>, R::Future: Send + 'static, { /// Adds a new embargo timer to the running timers, with a duration pulled from [`Self::embargo_dist`] @@ -79,7 +79,7 @@ where &mut self, tx: Tx, tx_id: TxId, - from: Option, + from: Option, ) -> Result<(), tower::BoxError> { if let Some(peer) = &from { self.stem_origins @@ -126,7 +126,7 @@ where async fn handle_incoming_tx( &mut self, tx: Tx, - tx_state: TxState, + tx_state: TxState, tx_id: TxId, ) -> Result<(), tower::BoxError> { match tx_state { @@ -225,7 +225,7 @@ where /// Starts the [`DandelionPoolManager`]. pub(crate) async fn run( mut self, - mut rx: mpsc::Receiver<(IncomingTx, oneshot::Sender<()>)>, + mut rx: mpsc::Receiver<(IncomingTx, oneshot::Sender<()>)>, ) { tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config); diff --git a/p2p/dandelion-tower/src/pool/mod.rs b/p2p/dandelion-tower/src/pool/mod.rs index a338a907..40a36172 100644 --- a/p2p/dandelion-tower/src/pool/mod.rs +++ b/p2p/dandelion-tower/src/pool/mod.rs @@ -60,21 +60,21 @@ pub use manager::DandelionPoolManager; /// user to customise routing functionality. /// - `backing_pool` is the backing transaction storage service /// - `config` is [`DandelionConfig`]. -pub fn start_dandelion_pool_manager( +pub fn start_dandelion_pool_manager( buffer_size: usize, dandelion_router: R, backing_pool: P, config: DandelionConfig, -) -> DandelionPoolService +) -> DandelionPoolService where Tx: Clone + Send + 'static, TxId: Hash + Eq + Clone + Send + 'static, - PId: Hash + Eq + Clone + Send + 'static, + PeerId: Hash + Eq + Clone + Send + 'static, P: Service, Response = TxStoreResponse, Error = tower::BoxError> + Send + 'static, P::Future: Send + 'static, - R: Service, Response = State, Error = DandelionRouterError> + R: Service, Response = State, Error = DandelionRouterError> + Send + 'static, R::Future: Send + 'static, @@ -105,16 +105,17 @@ where /// /// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`] #[derive(Clone)] -pub struct DandelionPoolService { +pub struct DandelionPoolService { /// The channel to [`DandelionPoolManager`]. - tx: PollSender<(IncomingTx, oneshot::Sender<()>)>, + tx: PollSender<(IncomingTx, oneshot::Sender<()>)>, } -impl Service> for DandelionPoolService +impl Service> + for DandelionPoolService where Tx: Clone + Send, TxId: Hash + Eq + Clone + Send + 'static, - PId: Hash + Eq + Clone + Send + 'static, + PeerId: Hash + Eq + Clone + Send + 'static, { type Response = (); type Error = DandelionPoolShutDown; @@ -124,7 +125,7 @@ where self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown) } - fn call(&mut self, req: IncomingTx) -> Self::Future { + fn call(&mut self, req: IncomingTx) -> Self::Future { // although the channel isn't sending anything we want to wait for the request to be handled before continuing. let (tx, rx) = oneshot::channel();