mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-10 12:54:47 +00:00
handle incoming txs in p2p request handler
This commit is contained in:
parent
511639d1b4
commit
54957c1033
3 changed files with 104 additions and 16 deletions
|
@ -10,7 +10,11 @@ use std::{
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||||
use cuprate_consensus::{transactions::new_tx_verification_data, BlockChainContextService};
|
use cuprate_consensus::{
|
||||||
|
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
|
||||||
|
BlockChainContextService,
|
||||||
|
};
|
||||||
|
use cuprate_dandelion_tower::TxState;
|
||||||
use cuprate_fixed_bytes::ByteArrayVec;
|
use cuprate_fixed_bytes::ByteArrayVec;
|
||||||
use cuprate_helper::cast::u64_to_usize;
|
use cuprate_helper::cast::u64_to_usize;
|
||||||
use cuprate_helper::{
|
use cuprate_helper::{
|
||||||
|
@ -19,7 +23,10 @@ use cuprate_helper::{
|
||||||
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
|
||||||
};
|
};
|
||||||
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
|
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
|
||||||
use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse};
|
use cuprate_p2p_core::client::InternalPeerID;
|
||||||
|
use cuprate_p2p_core::{
|
||||||
|
client::PeerInformation, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
|
||||||
|
};
|
||||||
use cuprate_txpool::service::TxpoolReadHandle;
|
use cuprate_txpool::service::TxpoolReadHandle;
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||||
|
@ -27,21 +34,28 @@ use cuprate_types::{
|
||||||
};
|
};
|
||||||
use cuprate_wire::protocol::{
|
use cuprate_wire::protocol::{
|
||||||
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
|
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
|
||||||
GetObjectsResponse, NewFluffyBlock,
|
GetObjectsResponse, NewFluffyBlock, NewTransactions,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError};
|
use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError};
|
||||||
|
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
|
||||||
|
use crate::txpool::{IncomingTxHandler, IncomingTxs};
|
||||||
|
|
||||||
/// The P2P protocol request handler [`MakeService`](tower::MakeService).
|
/// The P2P protocol request handler [`MakeService`](tower::MakeService).
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct P2pProtocolRequestHandlerMaker {
|
pub struct P2pProtocolRequestHandlerMaker {
|
||||||
/// The [`BlockchainReadHandle`]
|
/// The [`BlockchainReadHandle`]
|
||||||
pub blockchain_read_handle: BlockchainReadHandle,
|
pub blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
|
||||||
|
pub blockchain_context_service: BlockChainContextService,
|
||||||
|
|
||||||
/// The [`TxpoolReadHandle`].
|
/// The [`TxpoolReadHandle`].
|
||||||
pub txpool_read_handle: TxpoolReadHandle,
|
pub txpool_read_handle: TxpoolReadHandle,
|
||||||
|
|
||||||
|
pub incoming_tx_handler: IncomingTxHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N: NetworkZone> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
|
impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
|
||||||
type Response = P2pProtocolRequestHandler<N>;
|
type Response = P2pProtocolRequestHandler<N>;
|
||||||
type Error = tower::BoxError;
|
type Error = tower::BoxError;
|
||||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||||
|
@ -59,23 +73,29 @@ impl<N: NetworkZone> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMa
|
||||||
ready(Ok(P2pProtocolRequestHandler {
|
ready(Ok(P2pProtocolRequestHandler {
|
||||||
peer_information,
|
peer_information,
|
||||||
blockchain_read_handle,
|
blockchain_read_handle,
|
||||||
|
blockchain_context_service: self.blockchain_context_service.clone(),
|
||||||
txpool_read_handle,
|
txpool_read_handle,
|
||||||
|
incoming_tx_handler: self.incoming_tx_handler.clone(),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The P2P protocol request handler.
|
/// The P2P protocol request handler.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct P2pProtocolRequestHandler<N: NetworkZone> {
|
pub struct P2pProtocolRequestHandler<N: NetZoneAddress> {
|
||||||
/// The [`PeerInformation`] for this peer.
|
/// The [`PeerInformation`] for this peer.
|
||||||
peer_information: PeerInformation<N>,
|
peer_information: PeerInformation<N>,
|
||||||
/// The [`BlockchainReadHandle`].
|
/// The [`BlockchainReadHandle`].
|
||||||
blockchain_read_handle: BlockchainReadHandle,
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
|
||||||
|
blockchain_context_service: BlockChainContextService,
|
||||||
/// The [`TxpoolReadHandle`].
|
/// The [`TxpoolReadHandle`].
|
||||||
txpool_read_handle: TxpoolReadHandle,
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
|
||||||
|
incoming_tx_handler: IncomingTxHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
|
impl<Z: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
|
||||||
type Response = ProtocolResponse;
|
type Response = ProtocolResponse;
|
||||||
type Error = anyhow::Error;
|
type Error = anyhow::Error;
|
||||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
@ -105,9 +125,14 @@ impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
|
||||||
self.txpool_read_handle.clone(),
|
self.txpool_read_handle.clone(),
|
||||||
)
|
)
|
||||||
.boxed(),
|
.boxed(),
|
||||||
ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => {
|
ProtocolRequest::NewTransactions(r) => new_transactions(
|
||||||
ready(Ok(ProtocolResponse::NA)).boxed()
|
self.peer_information.clone(),
|
||||||
} // TODO: tx-pool
|
r,
|
||||||
|
self.blockchain_context_service.clone(),
|
||||||
|
self.incoming_tx_handler.clone(),
|
||||||
|
)
|
||||||
|
.boxed(),
|
||||||
|
ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: tx-pool
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -297,3 +322,59 @@ async fn new_fluffy_block(
|
||||||
Err(e) => Err(e.into()),
|
Err(e) => Err(e.into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn new_transactions<A: NetZoneAddress>(
|
||||||
|
peer_information: PeerInformation<A>,
|
||||||
|
request: NewTransactions,
|
||||||
|
mut blockchain_context_service: BlockChainContextService,
|
||||||
|
mut incoming_tx_handler: IncomingTxHandler,
|
||||||
|
) -> anyhow::Result<ProtocolResponse> {
|
||||||
|
let BlockChainContextResponse::Context(context) = blockchain_context_service
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(BlockChainContextRequest::Context)
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let context = context.unchecked_blockchain_context();
|
||||||
|
|
||||||
|
if usize_to_u64(context.chain_height + 2)
|
||||||
|
< peer_information
|
||||||
|
.core_sync_data
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.current_height
|
||||||
|
{
|
||||||
|
return Ok(ProtocolResponse::NA);
|
||||||
|
}
|
||||||
|
|
||||||
|
let state = if request.dandelionpp_fluff {
|
||||||
|
TxState::Fluff
|
||||||
|
} else {
|
||||||
|
let InternalPeerID::KnownAddr(addr) = peer_information.id else {
|
||||||
|
todo!("Anonymity networks")
|
||||||
|
};
|
||||||
|
|
||||||
|
TxState::Stem { from: addr.into() }
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(request.padding);
|
||||||
|
let res = incoming_tx_handler
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(IncomingTxs {
|
||||||
|
txs: request.txs,
|
||||||
|
state,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(()) => Ok(ProtocolResponse::NA),
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -5,3 +5,5 @@
|
||||||
mod dandelion;
|
mod dandelion;
|
||||||
mod incoming_tx;
|
mod incoming_tx;
|
||||||
mod txs_being_handled;
|
mod txs_being_handled;
|
||||||
|
|
||||||
|
pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs};
|
||||||
|
|
|
@ -37,9 +37,13 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
/// An error that can happen handling an incoming tx.
|
/// An error that can happen handling an incoming tx.
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum IncomingTxError {
|
pub enum IncomingTxError {
|
||||||
|
#[error("parse error: {0}")]
|
||||||
Parse(std::io::Error),
|
Parse(std::io::Error),
|
||||||
|
#[error("consensus error: {0}")]
|
||||||
Consensus(ExtendedConsensusError),
|
Consensus(ExtendedConsensusError),
|
||||||
|
#[error("Duplicate tx sent in message")]
|
||||||
DuplicateTransaction,
|
DuplicateTransaction,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,7 +55,7 @@ pub struct IncomingTxs {
|
||||||
|
|
||||||
/// The transaction type used for dandelion++.
|
/// The transaction type used for dandelion++.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct DandelionTx(Bytes);
|
pub struct DandelionTx(Bytes);
|
||||||
|
|
||||||
/// A transaction ID/hash.
|
/// A transaction ID/hash.
|
||||||
type TxId = [u8; 32];
|
type TxId = [u8; 32];
|
||||||
|
@ -59,19 +63,20 @@ type TxId = [u8; 32];
|
||||||
/// The service than handles incoming transaction pool transactions.
|
/// The service than handles incoming transaction pool transactions.
|
||||||
///
|
///
|
||||||
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
|
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct IncomingTxHandler {
|
pub struct IncomingTxHandler {
|
||||||
/// A store of txs currently being handled in incoming tx requests.
|
/// A store of txs currently being handled in incoming tx requests.
|
||||||
txs_being_handled: TxsBeingHandled,
|
pub txs_being_handled: TxsBeingHandled,
|
||||||
/// The blockchain context cache.
|
/// The blockchain context cache.
|
||||||
blockchain_context_cache: BlockChainContextService,
|
pub blockchain_context_cache: BlockChainContextService,
|
||||||
/// The dandelion txpool manager.
|
/// The dandelion txpool manager.
|
||||||
dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
|
pub dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
|
||||||
/// The transaction verifier service.
|
/// The transaction verifier service.
|
||||||
tx_verifier_service: ConcreteTxVerifierService,
|
pub tx_verifier_service: ConcreteTxVerifierService,
|
||||||
/// The txpool write handle.
|
/// The txpool write handle.
|
||||||
txpool_write_handle: TxpoolWriteHandle,
|
pub txpool_write_handle: TxpoolWriteHandle,
|
||||||
/// The txpool read handle.
|
/// The txpool read handle.
|
||||||
txpool_read_handle: TxpoolReadHandle,
|
pub txpool_read_handle: TxpoolReadHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service<IncomingTxs> for IncomingTxHandler {
|
impl Service<IncomingTxs> for IncomingTxHandler {
|
||||||
|
|
Loading…
Reference in a new issue