mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-10 21:05:01 +00:00
init dandelion integration
This commit is contained in:
parent
8be369846e
commit
adf592e530
16 changed files with 504 additions and 3 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1008,6 +1008,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"sha3",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"thread_local",
|
"thread_local",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -78,6 +78,7 @@ rayon = { version = "1.10.0", default-features = false }
|
||||||
serde_bytes = { version = "0.11.15", default-features = false }
|
serde_bytes = { version = "0.11.15", default-features = false }
|
||||||
serde_json = { version = "1.0.128", default-features = false }
|
serde_json = { version = "1.0.128", default-features = false }
|
||||||
serde = { version = "1.0.210", default-features = false }
|
serde = { version = "1.0.210", default-features = false }
|
||||||
|
sha3 = { version = "0.10.8", default-features = false }
|
||||||
thiserror = { version = "1.0.63", default-features = false }
|
thiserror = { version = "1.0.63", default-features = false }
|
||||||
thread_local = { version = "1.1.8", default-features = false }
|
thread_local = { version = "1.1.8", default-features = false }
|
||||||
tokio-util = { version = "0.7.12", default-features = false }
|
tokio-util = { version = "0.7.12", default-features = false }
|
||||||
|
|
|
@ -64,6 +64,7 @@ rayon = { workspace = true }
|
||||||
serde_bytes = { workspace = true }
|
serde_bytes = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
sha3 = { workspace = true, features = ["std"] }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
thread_local = { workspace = true }
|
thread_local = { workspace = true }
|
||||||
tokio-util = { workspace = true }
|
tokio-util = { workspace = true }
|
||||||
|
|
|
@ -25,7 +25,7 @@ mod manager;
|
||||||
mod syncer;
|
mod syncer;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
use types::{
|
pub use types::{
|
||||||
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
|
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,8 @@
|
||||||
//! Transaction Pool
|
//! Transaction Pool
|
||||||
//!
|
//!
|
||||||
//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
|
//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
|
||||||
|
|
||||||
|
mod dandelion;
|
||||||
|
mod incoming_tx;
|
||||||
|
mod manager;
|
||||||
|
mod txs_being_handled;
|
||||||
|
|
38
binaries/cuprated/src/txpool/dandelion.rs
Normal file
38
binaries/cuprated/src/txpool/dandelion.rs
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
use bytes::Bytes;
|
||||||
|
use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter};
|
||||||
|
use cuprate_p2p::NetworkInterface;
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
use cuprate_wire::NetworkAddress;
|
||||||
|
|
||||||
|
mod diffuse_service;
|
||||||
|
mod stem_service;
|
||||||
|
mod tx_store;
|
||||||
|
|
||||||
|
struct DandelionTx(Bytes);
|
||||||
|
|
||||||
|
type TxId = [u8; 32];
|
||||||
|
|
||||||
|
pub fn start_dandelion_router(
|
||||||
|
clear_net: NetworkInterface<ClearNet>,
|
||||||
|
) -> DandelionRouter<
|
||||||
|
stem_service::OutboundPeerStream,
|
||||||
|
diffuse_service::DiffuseService,
|
||||||
|
NetworkAddress,
|
||||||
|
stem_service::StemPeerService<ClearNet>,
|
||||||
|
DandelionTx,
|
||||||
|
> {
|
||||||
|
DandelionRouter::new(
|
||||||
|
diffuse_service::DiffuseService {
|
||||||
|
clear_net_broadcast_service: clear_net.broadcast_svc(),
|
||||||
|
},
|
||||||
|
stem_service::OutboundPeerStream {
|
||||||
|
clear_net: clear_net.clone(),
|
||||||
|
},
|
||||||
|
DandelionConfig {
|
||||||
|
time_between_hop: Default::default(),
|
||||||
|
epoch_duration: Default::default(),
|
||||||
|
fluff_probability: 0.0,
|
||||||
|
graph: Default::default(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
32
binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
Normal file
32
binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
use crate::txpool::dandelion::DandelionTx;
|
||||||
|
use cuprate_dandelion_tower::traits::DiffuseRequest;
|
||||||
|
use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface};
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
|
||||||
|
pub struct DiffuseService {
|
||||||
|
pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
|
||||||
|
type Response = BroadcastSvc::Response;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = BroadcastSvc::Future;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.clear_net_broadcast_service
|
||||||
|
.poll_ready(cx)
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
|
||||||
|
self.clear_net_broadcast_service
|
||||||
|
.call(BroadcastRequest::Transaction {
|
||||||
|
tx_bytes: req.0 .0,
|
||||||
|
direction: None,
|
||||||
|
received_from: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
54
binaries/cuprated/src/txpool/dandelion/stem_service.rs
Normal file
54
binaries/cuprated/src/txpool/dandelion/stem_service.rs
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
use super::DandelionTx;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use cuprate_dandelion_tower::traits::StemRequest;
|
||||||
|
use cuprate_dandelion_tower::OutboundPeer;
|
||||||
|
use cuprate_p2p::NetworkInterface;
|
||||||
|
use cuprate_p2p_core::client::Client;
|
||||||
|
use cuprate_p2p_core::{ClearNet, NetworkZone, PeerRequest, ProtocolRequest};
|
||||||
|
use cuprate_wire::protocol::NewTransactions;
|
||||||
|
use cuprate_wire::NetworkAddress;
|
||||||
|
use futures::Stream;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
pub struct OutboundPeerStream {
|
||||||
|
pub clear_net: NetworkInterface<ClearNet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for OutboundPeerStream {
|
||||||
|
type Item = Result<OutboundPeer<NetworkAddress, StemPeerService<ClearNet>>, tower::BoxError>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
Poll::Ready(Some(Ok(self
|
||||||
|
.clear_net
|
||||||
|
.client_pool()
|
||||||
|
.outbound_client()
|
||||||
|
.map_or(OutboundPeer::Exhausted, |client| {
|
||||||
|
OutboundPeer::Peer(client.info.id.into(), StemPeerService(client))
|
||||||
|
}))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StemPeerService<N>(Client<N>);
|
||||||
|
|
||||||
|
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
|
||||||
|
type Response = ();
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = Client::Future;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.0.poll_ready(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
|
||||||
|
self.0
|
||||||
|
.call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
|
||||||
|
NewTransactions {
|
||||||
|
txs: vec![req.0 .0],
|
||||||
|
dandelionpp_fluff: false,
|
||||||
|
padding: Bytes::new(),
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
47
binaries/cuprated/src/txpool/dandelion/tx_store.rs
Normal file
47
binaries/cuprated/src/txpool/dandelion/tx_store.rs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
use crate::txpool::dandelion::{DandelionTx, TxId};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use cuprate_dandelion_tower::traits::{TxStoreRequest, TxStoreResponse};
|
||||||
|
use cuprate_database::RuntimeError;
|
||||||
|
use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse};
|
||||||
|
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
use futures::{FutureExt, StreamExt, TryFutureExt};
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tower::util::Oneshot;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
pub struct TxStoreService {
|
||||||
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<TxStoreRequest<TxId>> for TxStoreService {
|
||||||
|
type Response = TxStoreResponse<DandelionTx>;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
|
||||||
|
match req {
|
||||||
|
TxStoreRequest::Get(tx_id) => self
|
||||||
|
.txpool_read_handle
|
||||||
|
.clone()
|
||||||
|
.oneshot(TxpoolReadRequest::TxBlob(tx_id))
|
||||||
|
.map(|res| match res {
|
||||||
|
Ok(TxpoolReadResponse::TxBlob(blob)) => Ok(TxStoreResponse::Transaction(Some(
|
||||||
|
(DandelionTx(Bytes::from(blob)), todo!()),
|
||||||
|
))),
|
||||||
|
Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
Ok(_) => unreachable!(),
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
TxStoreRequest::Promote(tx_id) => {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
249
binaries/cuprated/src/txpool/incoming_tx.rs
Normal file
249
binaries/cuprated/src/txpool/incoming_tx.rs
Normal file
|
@ -0,0 +1,249 @@
|
||||||
|
use std::collections::HashSet;
|
||||||
|
use std::future::ready;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use crate::blockchain::ConcreteTxVerifierService;
|
||||||
|
use crate::txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use cuprate_consensus::transactions::new_tx_verification_data;
|
||||||
|
use cuprate_consensus::{
|
||||||
|
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
|
||||||
|
ExtendedConsensusError, TxVerifierService, VerifyTxRequest, VerifyTxResponse,
|
||||||
|
};
|
||||||
|
use cuprate_dandelion_tower::pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder};
|
||||||
|
use cuprate_dandelion_tower::TxState;
|
||||||
|
use cuprate_helper::asynch::rayon_spawn_async;
|
||||||
|
use cuprate_txpool::service::interface::{
|
||||||
|
TxpoolReadRequest, TxpoolWriteRequest, TxpoolWriteResponse,
|
||||||
|
};
|
||||||
|
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
|
||||||
|
use cuprate_wire::NetworkAddress;
|
||||||
|
use dashmap::DashSet;
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
use futures::FutureExt;
|
||||||
|
use monero_serai::transaction::Transaction;
|
||||||
|
use sha3::{Digest, Sha3_256};
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
pub enum IncomingTxError {
|
||||||
|
Parse(std::io::Error),
|
||||||
|
Consensus(ExtendedConsensusError),
|
||||||
|
DuplicateTransaction,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum IncomingTxs {
|
||||||
|
Bytes {
|
||||||
|
txs: Vec<Bytes>,
|
||||||
|
state: TxState<NetworkAddress>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
struct DandelionTx(Bytes);
|
||||||
|
|
||||||
|
type TxId = [u8; 32];
|
||||||
|
|
||||||
|
pub struct IncomingTxHandler {
|
||||||
|
txs_being_added: Arc<TxsBeingHandled>,
|
||||||
|
|
||||||
|
blockchain_context_cache: BlockChainContextService,
|
||||||
|
|
||||||
|
dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
|
||||||
|
tx_verifier_service: ConcreteTxVerifierService,
|
||||||
|
|
||||||
|
txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
|
||||||
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<IncomingTxs> for IncomingTxHandler {
|
||||||
|
type Response = ();
|
||||||
|
type Error = IncomingTxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: IncomingTxs) -> Self::Future {
|
||||||
|
let IncomingTxs::Bytes { mut txs, state } = req;
|
||||||
|
|
||||||
|
let mut local_tracker = self.txs_being_added.local_tracker();
|
||||||
|
|
||||||
|
txs.retain(|bytes| local_tracker.try_add_tx(bytes.as_ref()));
|
||||||
|
|
||||||
|
if txs.is_empty() {
|
||||||
|
return ready(Ok(())).boxed();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut blockchain_context_cache = self.blockchain_context_cache.clone();
|
||||||
|
let mut tx_verifier_service = self.tx_verifier_service.clone();
|
||||||
|
let mut txpool_write_handle = self.txpool_write_handle.clone();
|
||||||
|
|
||||||
|
async move {
|
||||||
|
let txs = rayon_spawn_async(move || {
|
||||||
|
txs.into_iter()
|
||||||
|
.map(|bytes| {
|
||||||
|
let tx = Transaction::read(&mut bytes.as_ref())
|
||||||
|
.map_err(IncomingTxError::Parse)?;
|
||||||
|
|
||||||
|
let tx = new_tx_verification_data(tx)
|
||||||
|
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
|
||||||
|
|
||||||
|
Ok(Arc::new(tx))
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, IncomingTxError>>()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let BlockChainContextResponse::Context(context) = blockchain_context_cache
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockChainContextRequest::GetContext)
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let context = context.unchecked_blockchain_context();
|
||||||
|
|
||||||
|
tx_verifier_service
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(VerifyTxRequest::Prepped {
|
||||||
|
txs: txs.clone(),
|
||||||
|
current_chain_height: context.chain_height,
|
||||||
|
top_hash: context.top_hash,
|
||||||
|
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
||||||
|
hf: context.current_hf,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
txpool_write_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(TxpoolWriteRequest::AddTransaction {
|
||||||
|
tx,
|
||||||
|
state_stem: state.state_stem(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_incoming_txs(
|
||||||
|
txs: Vec<Bytes>,
|
||||||
|
state: TxState<NetworkAddress>,
|
||||||
|
tx_being_handled_locally: TxBeingHandledLocally,
|
||||||
|
mut blockchain_context_cache: BlockChainContextService,
|
||||||
|
mut tx_verifier_service: ConcreteTxVerifierService,
|
||||||
|
mut txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
mut txpool_read_handle: TxpoolReadHandle,
|
||||||
|
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
|
||||||
|
) -> Result<(), IncomingTxError> {
|
||||||
|
let mut tx_blob_hashes = HashSet::new();
|
||||||
|
|
||||||
|
let txs = txs
|
||||||
|
.into_iter()
|
||||||
|
.map(|tx_blob| {
|
||||||
|
let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());
|
||||||
|
if !tx_blob_hashes.insert(tx_blob_hash) {
|
||||||
|
return Err(IncomingTxError::DuplicateTransaction);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((tx_blob_hash, tx_blob))
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
let TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let txs = rayon_spawn_async(move || {
|
||||||
|
txs.into_iter()
|
||||||
|
.filter_map(|(tx_blob_hash, tx_blob)| {
|
||||||
|
if tx_blob_hashes.contains(&tx_blob_hash) {
|
||||||
|
Some(tx_blob)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map(|bytes| {
|
||||||
|
let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
|
||||||
|
|
||||||
|
let tx = new_tx_verification_data(tx)
|
||||||
|
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
|
||||||
|
|
||||||
|
Ok(Arc::new(tx))
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, IncomingTxError>>()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let BlockChainContextResponse::Context(context) = blockchain_context_cache
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockChainContextRequest::GetContext)
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let context = context.unchecked_blockchain_context();
|
||||||
|
|
||||||
|
tx_verifier_service
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(VerifyTxRequest::Prepped {
|
||||||
|
txs: txs.clone(),
|
||||||
|
current_chain_height: context.chain_height,
|
||||||
|
top_hash: context.top_hash,
|
||||||
|
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
||||||
|
hf: context.current_hf,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for tx in txs {
|
||||||
|
let incoming_tx = IncomingTxBuilder::new(Bytes::copy_from_slice(&tx.tx_blob), tx.tx_hash);
|
||||||
|
|
||||||
|
let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(TxpoolWriteRequest::AddTransaction {
|
||||||
|
tx,
|
||||||
|
state_stem: state.state_stem(),
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: track double spends to quickly ignore them from their blob hash.
|
||||||
|
if let Some(tx_hash) = double_spend {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: check blockchain for double spends to prevent a race condition.
|
||||||
|
|
||||||
|
// TODO: fill this in properly.
|
||||||
|
let incoming_tx = incoming_tx
|
||||||
|
.with_routing_state(state.clone())
|
||||||
|
.with_state_in_db(None)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
dandelion_pool_manager
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(incoming_tx)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
1
binaries/cuprated/src/txpool/manager.rs
Normal file
1
binaries/cuprated/src/txpool/manager.rs
Normal file
|
@ -0,0 +1 @@
|
||||||
|
|
48
binaries/cuprated/src/txpool/txs_being_handled.rs
Normal file
48
binaries/cuprated/src/txpool/txs_being_handled.rs
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
use dashmap::DashSet;
|
||||||
|
use sha3::{Digest, Sha3_256};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] {
|
||||||
|
let mut hasher = Sha3_256::new();
|
||||||
|
hasher.update(tx_bytes);
|
||||||
|
hasher.finalize().into()
|
||||||
|
}
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
|
||||||
|
|
||||||
|
impl TxsBeingHandled {
|
||||||
|
pub fn local_tracker(&self) -> TxBeingHandledLocally {
|
||||||
|
TxBeingHandledLocally {
|
||||||
|
txs_being_handled: self.clone(),
|
||||||
|
txs: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TxBeingHandledLocally {
|
||||||
|
txs_being_handled: TxsBeingHandled,
|
||||||
|
txs: Vec<[u8; 32]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxBeingHandledLocally {
|
||||||
|
pub fn try_add_tx(&mut self, tx_bytes: &[u8]) -> bool {
|
||||||
|
let mut hasher = Sha3_256::new();
|
||||||
|
hasher.update(tx_bytes);
|
||||||
|
let tx_blob_hash = hasher.finalize().into();
|
||||||
|
|
||||||
|
if !self.txs_being_handled.0.insert(tx_blob_hash) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.txs.push(tx_blob_hash);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TxBeingHandledLocally {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
for hash in &self.txs {
|
||||||
|
self.txs_being_handled.0.remove(hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -73,6 +73,12 @@ pub enum TxState<Id> {
|
||||||
Local,
|
Local,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Id> TxState<Id> {
|
||||||
|
pub const fn state_stem(&self) -> bool {
|
||||||
|
matches!(self, Self::Local | Self::Stem { .. })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A request to route a transaction.
|
/// A request to route a transaction.
|
||||||
pub struct DandelionRouteReq<Tx, Id> {
|
pub struct DandelionRouteReq<Tx, Id> {
|
||||||
/// The transaction.
|
/// The transaction.
|
||||||
|
|
|
@ -18,7 +18,7 @@ use tracing::{Instrument, Span};
|
||||||
use cuprate_p2p_core::{
|
use cuprate_p2p_core::{
|
||||||
client::{Client, InternalPeerID},
|
client::{Client, InternalPeerID},
|
||||||
handles::ConnectionHandle,
|
handles::ConnectionHandle,
|
||||||
NetworkZone,
|
ConnectionDirection, NetworkZone,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) mod disconnect_monitor;
|
pub(crate) mod disconnect_monitor;
|
||||||
|
@ -165,6 +165,16 @@ impl<N: NetworkZone> ClientPool<N> {
|
||||||
sync_data.cumulative_difficulty() > cumulative_difficulty
|
sync_data.cumulative_difficulty() > cumulative_difficulty
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn outbound_client(&self) -> Option<Client<N>> {
|
||||||
|
let client = self
|
||||||
|
.clients
|
||||||
|
.iter()
|
||||||
|
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
|
||||||
|
let id = *client.key();
|
||||||
|
|
||||||
|
Some(self.clients.remove(&id).unwrap().1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mod sealed {
|
mod sealed {
|
||||||
|
|
|
@ -21,7 +21,7 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter");
|
||||||
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
|
/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`]
|
||||||
/// will return an `async`hronous channel that can be `.await`ed upon
|
/// will return an `async`hronous channel that can be `.await`ed upon
|
||||||
/// to receive the corresponding response.
|
/// to receive the corresponding response.
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct DatabaseWriteHandle<Req, Res> {
|
pub struct DatabaseWriteHandle<Req, Res> {
|
||||||
/// Sender channel to the database write thread-pool.
|
/// Sender channel to the database write thread-pool.
|
||||||
///
|
///
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
//! Tx-pool [`service`](super) interface.
|
//! Tx-pool [`service`](super) interface.
|
||||||
//!
|
//!
|
||||||
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
|
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use cuprate_types::TransactionVerificationData;
|
use cuprate_types::TransactionVerificationData;
|
||||||
|
@ -9,11 +11,14 @@ use crate::types::TransactionHash;
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- TxpoolReadRequest
|
//---------------------------------------------------------------------------------------------------- TxpoolReadRequest
|
||||||
/// The transaction pool [`tower::Service`] read request type.
|
/// The transaction pool [`tower::Service`] read request type.
|
||||||
|
#[derive(Clone)]
|
||||||
pub enum TxpoolReadRequest {
|
pub enum TxpoolReadRequest {
|
||||||
/// A request for the blob (raw bytes) of a transaction with the given hash.
|
/// A request for the blob (raw bytes) of a transaction with the given hash.
|
||||||
TxBlob(TransactionHash),
|
TxBlob(TransactionHash),
|
||||||
/// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
|
/// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
|
||||||
TxVerificationData(TransactionHash),
|
TxVerificationData(TransactionHash),
|
||||||
|
|
||||||
|
FilterKnownTxBlobHashes(HashSet<TransactionHash>),
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- TxpoolReadResponse
|
//---------------------------------------------------------------------------------------------------- TxpoolReadResponse
|
||||||
|
@ -25,10 +30,13 @@ pub enum TxpoolReadResponse {
|
||||||
TxBlob(Vec<u8>),
|
TxBlob(Vec<u8>),
|
||||||
/// A response of [`TransactionVerificationData`].
|
/// A response of [`TransactionVerificationData`].
|
||||||
TxVerificationData(TransactionVerificationData),
|
TxVerificationData(TransactionVerificationData),
|
||||||
|
|
||||||
|
FilterKnownTxBlobHashes(HashSet<TransactionHash>),
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- TxpoolWriteRequest
|
//---------------------------------------------------------------------------------------------------- TxpoolWriteRequest
|
||||||
/// The transaction pool [`tower::Service`] write request type.
|
/// The transaction pool [`tower::Service`] write request type.
|
||||||
|
#[derive(Clone)]
|
||||||
pub enum TxpoolWriteRequest {
|
pub enum TxpoolWriteRequest {
|
||||||
/// Add a transaction to the pool.
|
/// Add a transaction to the pool.
|
||||||
///
|
///
|
||||||
|
|
Loading…
Reference in a new issue