mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-11-16 15:58:17 +00:00
PId
-> PeerId
This commit is contained in:
parent
ca8ac91f22
commit
d6122f34dd
3 changed files with 31 additions and 30 deletions
|
@ -2,13 +2,13 @@
|
||||||
use crate::{State, TxState};
|
use crate::{State, TxState};
|
||||||
|
|
||||||
/// An incoming transaction that has gone through the preprocessing stage.
|
/// An incoming transaction that has gone through the preprocessing stage.
|
||||||
pub struct IncomingTx<Tx, TxId, PId> {
|
pub struct IncomingTx<Tx, TxId, PeerId> {
|
||||||
/// The transaction.
|
/// The transaction.
|
||||||
pub(crate) tx: Tx,
|
pub(crate) tx: Tx,
|
||||||
/// The transaction ID.
|
/// The transaction ID.
|
||||||
pub(crate) tx_id: TxId,
|
pub(crate) tx_id: TxId,
|
||||||
/// The routing state of the transaction.
|
/// The routing state of the transaction.
|
||||||
pub(crate) routing_state: TxState<PId>,
|
pub(crate) routing_state: TxState<PeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An [`IncomingTx`] builder.
|
/// An [`IncomingTx`] builder.
|
||||||
|
@ -17,18 +17,18 @@ pub struct IncomingTx<Tx, TxId, PId> {
|
||||||
///
|
///
|
||||||
/// - `RS`: routing state; a `bool` for if the routing state is set
|
/// - `RS`: routing state; a `bool` for if the routing state is set
|
||||||
/// - `DBS`: database state; a `bool` for if the state in the DB is set
|
/// - `DBS`: database state; a `bool` for if the state in the DB is set
|
||||||
pub struct IncomingTxBuilder<const RS: bool, const DBS: bool, Tx, TxId, PId> {
|
pub struct IncomingTxBuilder<const RS: bool, const DBS: bool, Tx, TxId, PeerId> {
|
||||||
/// The transaction.
|
/// The transaction.
|
||||||
tx: Tx,
|
tx: Tx,
|
||||||
/// The transaction ID.
|
/// The transaction ID.
|
||||||
tx_id: TxId,
|
tx_id: TxId,
|
||||||
/// The routing state of the transaction.
|
/// The routing state of the transaction.
|
||||||
routing_state: Option<TxState<PId>>,
|
routing_state: Option<TxState<PeerId>>,
|
||||||
/// The state of this transaction in the DB.
|
/// The state of this transaction in the DB.
|
||||||
state_in_db: Option<State>,
|
state_in_db: Option<State>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Tx, TxId, PId> IncomingTxBuilder<false, false, Tx, TxId, PId> {
|
impl<Tx, TxId, PeerId> IncomingTxBuilder<false, false, Tx, TxId, PeerId> {
|
||||||
/// Creates a new [`IncomingTxBuilder`].
|
/// Creates a new [`IncomingTxBuilder`].
|
||||||
pub fn new(tx: Tx, tx_id: TxId) -> Self {
|
pub fn new(tx: Tx, tx_id: TxId) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -40,14 +40,14 @@ impl<Tx, TxId, PId> IncomingTxBuilder<false, false, Tx, TxId, PId> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<const DBS: bool, Tx, TxId, PId> IncomingTxBuilder<false, DBS, Tx, TxId, PId> {
|
impl<const DBS: bool, Tx, TxId, PeerId> IncomingTxBuilder<false, DBS, Tx, TxId, PeerId> {
|
||||||
/// Adds the routing state to the builder.
|
/// Adds the routing state to the builder.
|
||||||
///
|
///
|
||||||
/// The routing state is the origin of this transaction from our perspective.
|
/// The routing state is the origin of this transaction from our perspective.
|
||||||
pub fn with_routing_state(
|
pub fn with_routing_state(
|
||||||
self,
|
self,
|
||||||
state: TxState<PId>,
|
state: TxState<PeerId>,
|
||||||
) -> IncomingTxBuilder<true, DBS, Tx, TxId, PId> {
|
) -> IncomingTxBuilder<true, DBS, Tx, TxId, PeerId> {
|
||||||
IncomingTxBuilder {
|
IncomingTxBuilder {
|
||||||
tx: self.tx,
|
tx: self.tx,
|
||||||
tx_id: self.tx_id,
|
tx_id: self.tx_id,
|
||||||
|
@ -57,14 +57,14 @@ impl<const DBS: bool, Tx, TxId, PId> IncomingTxBuilder<false, DBS, Tx, TxId, PId
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<const RS: bool, Tx, TxId, PId> IncomingTxBuilder<RS, false, Tx, TxId, PId> {
|
impl<const RS: bool, Tx, TxId, PeerId> IncomingTxBuilder<RS, false, Tx, TxId, PeerId> {
|
||||||
/// Adds the database state to the builder.
|
/// Adds the database state to the builder.
|
||||||
///
|
///
|
||||||
/// If the transaction is not in the DB already then the state should be [`None`].
|
/// If the transaction is not in the DB already then the state should be [`None`].
|
||||||
pub fn with_state_in_db(
|
pub fn with_state_in_db(
|
||||||
self,
|
self,
|
||||||
state: Option<State>,
|
state: Option<State>,
|
||||||
) -> IncomingTxBuilder<RS, true, Tx, TxId, PId> {
|
) -> IncomingTxBuilder<RS, true, Tx, TxId, PeerId> {
|
||||||
IncomingTxBuilder {
|
IncomingTxBuilder {
|
||||||
tx: self.tx,
|
tx: self.tx,
|
||||||
tx_id: self.tx_id,
|
tx_id: self.tx_id,
|
||||||
|
@ -74,12 +74,12 @@ impl<const RS: bool, Tx, TxId, PId> IncomingTxBuilder<RS, false, Tx, TxId, PId>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Tx, TxId, PId> IncomingTxBuilder<true, true, Tx, TxId, PId> {
|
impl<Tx, TxId, PeerId> IncomingTxBuilder<true, true, Tx, TxId, PeerId> {
|
||||||
/// Builds the [`IncomingTx`].
|
/// Builds the [`IncomingTx`].
|
||||||
///
|
///
|
||||||
/// If this returns [`None`] then the transaction does not need to be given to the dandelion pool
|
/// If this returns [`None`] then the transaction does not need to be given to the dandelion pool
|
||||||
/// manager.
|
/// manager.
|
||||||
pub fn build(self) -> Option<IncomingTx<Tx, TxId, PId>> {
|
pub fn build(self) -> Option<IncomingTx<Tx, TxId, PeerId>> {
|
||||||
let routing_state = self.routing_state.unwrap();
|
let routing_state = self.routing_state.unwrap();
|
||||||
|
|
||||||
if self.state_in_db == Some(State::Fluff) {
|
if self.state_in_db == Some(State::Fluff) {
|
||||||
|
|
|
@ -28,16 +28,16 @@ pub struct DandelionPoolShutDown;
|
||||||
/// The dandelion++ pool manager.
|
/// The dandelion++ pool manager.
|
||||||
///
|
///
|
||||||
/// See the [module docs](super) for more.
|
/// See the [module docs](super) for more.
|
||||||
pub struct DandelionPoolManager<P, R, Tx, TxId, PId> {
|
pub struct DandelionPoolManager<P, R, Tx, TxId, PeerId> {
|
||||||
/// The dandelion++ router
|
/// The dandelion++ router
|
||||||
pub(crate) dandelion_router: R,
|
pub(crate) dandelion_router: R,
|
||||||
/// The backing tx storage.
|
/// The backing tx storage.
|
||||||
pub(crate) backing_pool: P,
|
pub(crate) backing_pool: P,
|
||||||
/// The set of tasks that are running the future returned from `dandelion_router`.
|
/// The set of tasks that are running the future returned from `dandelion_router`.
|
||||||
pub(crate) routing_set: JoinSet<(TxId, Result<State, TxState<PId>>)>,
|
pub(crate) routing_set: JoinSet<(TxId, Result<State, TxState<PeerId>>)>,
|
||||||
|
|
||||||
/// The origin of stem transactions.
|
/// The origin of stem transactions.
|
||||||
pub(crate) stem_origins: HashMap<TxId, HashSet<PId>>,
|
pub(crate) stem_origins: HashMap<TxId, HashSet<PeerId>>,
|
||||||
|
|
||||||
/// Current stem pool embargo timers.
|
/// Current stem pool embargo timers.
|
||||||
pub(crate) embargo_timers: DelayQueue<TxId>,
|
pub(crate) embargo_timers: DelayQueue<TxId>,
|
||||||
|
@ -50,14 +50,14 @@ pub struct DandelionPoolManager<P, R, Tx, TxId, PId> {
|
||||||
pub(crate) _tx: PhantomData<Tx>,
|
pub(crate) _tx: PhantomData<Tx>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<P, R, Tx, TxId, PId> DandelionPoolManager<P, R, Tx, TxId, PId>
|
impl<P, R, Tx, TxId, PeerId> DandelionPoolManager<P, R, Tx, TxId, PeerId>
|
||||||
where
|
where
|
||||||
Tx: Clone + Send,
|
Tx: Clone + Send,
|
||||||
TxId: Hash + Eq + Clone + Send + 'static,
|
TxId: Hash + Eq + Clone + Send + 'static,
|
||||||
PId: Hash + Eq + Clone + Send + 'static,
|
PeerId: Hash + Eq + Clone + Send + 'static,
|
||||||
P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>,
|
P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>,
|
||||||
P::Future: Send + 'static,
|
P::Future: Send + 'static,
|
||||||
R: Service<DandelionRouteReq<Tx, PId>, Response = State, Error = DandelionRouterError>,
|
R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>,
|
||||||
R::Future: Send + 'static,
|
R::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
/// Adds a new embargo timer to the running timers, with a duration pulled from [`Self::embargo_dist`]
|
/// Adds a new embargo timer to the running timers, with a duration pulled from [`Self::embargo_dist`]
|
||||||
|
@ -79,7 +79,7 @@ where
|
||||||
&mut self,
|
&mut self,
|
||||||
tx: Tx,
|
tx: Tx,
|
||||||
tx_id: TxId,
|
tx_id: TxId,
|
||||||
from: Option<PId>,
|
from: Option<PeerId>,
|
||||||
) -> Result<(), tower::BoxError> {
|
) -> Result<(), tower::BoxError> {
|
||||||
if let Some(peer) = &from {
|
if let Some(peer) = &from {
|
||||||
self.stem_origins
|
self.stem_origins
|
||||||
|
@ -126,7 +126,7 @@ where
|
||||||
async fn handle_incoming_tx(
|
async fn handle_incoming_tx(
|
||||||
&mut self,
|
&mut self,
|
||||||
tx: Tx,
|
tx: Tx,
|
||||||
tx_state: TxState<PId>,
|
tx_state: TxState<PeerId>,
|
||||||
tx_id: TxId,
|
tx_id: TxId,
|
||||||
) -> Result<(), tower::BoxError> {
|
) -> Result<(), tower::BoxError> {
|
||||||
match tx_state {
|
match tx_state {
|
||||||
|
@ -225,7 +225,7 @@ where
|
||||||
/// Starts the [`DandelionPoolManager`].
|
/// Starts the [`DandelionPoolManager`].
|
||||||
pub(crate) async fn run(
|
pub(crate) async fn run(
|
||||||
mut self,
|
mut self,
|
||||||
mut rx: mpsc::Receiver<(IncomingTx<Tx, TxId, PId>, oneshot::Sender<()>)>,
|
mut rx: mpsc::Receiver<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
|
||||||
) {
|
) {
|
||||||
tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config);
|
tracing::debug!("Starting dandelion++ tx-pool, config: {:?}", self.config);
|
||||||
|
|
||||||
|
|
|
@ -60,21 +60,21 @@ pub use manager::DandelionPoolManager;
|
||||||
/// user to customise routing functionality.
|
/// user to customise routing functionality.
|
||||||
/// - `backing_pool` is the backing transaction storage service
|
/// - `backing_pool` is the backing transaction storage service
|
||||||
/// - `config` is [`DandelionConfig`].
|
/// - `config` is [`DandelionConfig`].
|
||||||
pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PId>(
|
pub fn start_dandelion_pool_manager<P, R, Tx, TxId, PeerId>(
|
||||||
buffer_size: usize,
|
buffer_size: usize,
|
||||||
dandelion_router: R,
|
dandelion_router: R,
|
||||||
backing_pool: P,
|
backing_pool: P,
|
||||||
config: DandelionConfig,
|
config: DandelionConfig,
|
||||||
) -> DandelionPoolService<Tx, TxId, PId>
|
) -> DandelionPoolService<Tx, TxId, PeerId>
|
||||||
where
|
where
|
||||||
Tx: Clone + Send + 'static,
|
Tx: Clone + Send + 'static,
|
||||||
TxId: Hash + Eq + Clone + Send + 'static,
|
TxId: Hash + Eq + Clone + Send + 'static,
|
||||||
PId: Hash + Eq + Clone + Send + 'static,
|
PeerId: Hash + Eq + Clone + Send + 'static,
|
||||||
P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
|
P: Service<TxStoreRequest<TxId>, Response = TxStoreResponse<Tx>, Error = tower::BoxError>
|
||||||
+ Send
|
+ Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
P::Future: Send + 'static,
|
P::Future: Send + 'static,
|
||||||
R: Service<DandelionRouteReq<Tx, PId>, Response = State, Error = DandelionRouterError>
|
R: Service<DandelionRouteReq<Tx, PeerId>, Response = State, Error = DandelionRouterError>
|
||||||
+ Send
|
+ Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
R::Future: Send + 'static,
|
R::Future: Send + 'static,
|
||||||
|
@ -105,16 +105,17 @@ where
|
||||||
///
|
///
|
||||||
/// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`]
|
/// Used to send [`IncomingTx`]s to the [`DandelionPoolManager`]
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct DandelionPoolService<Tx, TxId, PId> {
|
pub struct DandelionPoolService<Tx, TxId, PeerId> {
|
||||||
/// The channel to [`DandelionPoolManager`].
|
/// The channel to [`DandelionPoolManager`].
|
||||||
tx: PollSender<(IncomingTx<Tx, TxId, PId>, oneshot::Sender<()>)>,
|
tx: PollSender<(IncomingTx<Tx, TxId, PeerId>, oneshot::Sender<()>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Tx, TxId, PId> Service<IncomingTx<Tx, TxId, PId>> for DandelionPoolService<Tx, TxId, PId>
|
impl<Tx, TxId, PeerId> Service<IncomingTx<Tx, TxId, PeerId>>
|
||||||
|
for DandelionPoolService<Tx, TxId, PeerId>
|
||||||
where
|
where
|
||||||
Tx: Clone + Send,
|
Tx: Clone + Send,
|
||||||
TxId: Hash + Eq + Clone + Send + 'static,
|
TxId: Hash + Eq + Clone + Send + 'static,
|
||||||
PId: Hash + Eq + Clone + Send + 'static,
|
PeerId: Hash + Eq + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
type Response = ();
|
type Response = ();
|
||||||
type Error = DandelionPoolShutDown;
|
type Error = DandelionPoolShutDown;
|
||||||
|
@ -124,7 +125,7 @@ where
|
||||||
self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
|
self.tx.poll_reserve(cx).map_err(|_| DandelionPoolShutDown)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: IncomingTx<Tx, TxId, PId>) -> Self::Future {
|
fn call(&mut self, req: IncomingTx<Tx, TxId, PeerId>) -> Self::Future {
|
||||||
// although the channel isn't sending anything we want to wait for the request to be handled before continuing.
|
// although the channel isn't sending anything we want to wait for the request to be handled before continuing.
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue