diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index f6e0be39..b21e066d 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -10,7 +10,11 @@ use std::{ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; -use cuprate_consensus::{transactions::new_tx_verification_data, BlockChainContextService}; +use cuprate_consensus::{ + transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, +}; +use cuprate_dandelion_tower::TxState; use cuprate_fixed_bytes::ByteArrayVec; use cuprate_helper::cast::u64_to_usize; use cuprate_helper::{ @@ -19,7 +23,10 @@ use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, }; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; -use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; +use cuprate_p2p_core::client::InternalPeerID; +use cuprate_p2p_core::{ + client::PeerInformation, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, +}; use cuprate_txpool::service::TxpoolReadHandle; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, @@ -27,21 +34,28 @@ use cuprate_types::{ }; use cuprate_wire::protocol::{ ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, - GetObjectsResponse, NewFluffyBlock, + GetObjectsResponse, NewFluffyBlock, NewTransactions, }; use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockError}; +use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; +use crate::txpool::{IncomingTxHandler, IncomingTxs}; /// The P2P protocol request handler [`MakeService`](tower::MakeService). #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { /// The [`BlockchainReadHandle`] pub blockchain_read_handle: BlockchainReadHandle, + + pub blockchain_context_service: BlockChainContextService, + /// The [`TxpoolReadHandle`]. pub txpool_read_handle: TxpoolReadHandle, + + pub incoming_tx_handler: IncomingTxHandler, } -impl Service> for P2pProtocolRequestHandlerMaker { +impl Service> for P2pProtocolRequestHandlerMaker { type Response = P2pProtocolRequestHandler; type Error = tower::BoxError; type Future = Ready>; @@ -59,23 +73,29 @@ impl Service> for P2pProtocolRequestHandlerMa ready(Ok(P2pProtocolRequestHandler { peer_information, blockchain_read_handle, + blockchain_context_service: self.blockchain_context_service.clone(), txpool_read_handle, + incoming_tx_handler: self.incoming_tx_handler.clone(), })) } } /// The P2P protocol request handler. #[derive(Clone)] -pub struct P2pProtocolRequestHandler { +pub struct P2pProtocolRequestHandler { /// The [`PeerInformation`] for this peer. peer_information: PeerInformation, /// The [`BlockchainReadHandle`]. blockchain_read_handle: BlockchainReadHandle, + + blockchain_context_service: BlockChainContextService, /// The [`TxpoolReadHandle`]. txpool_read_handle: TxpoolReadHandle, + + incoming_tx_handler: IncomingTxHandler, } -impl Service for P2pProtocolRequestHandler { +impl Service for P2pProtocolRequestHandler { type Response = ProtocolResponse; type Error = anyhow::Error; type Future = BoxFuture<'static, Result>; @@ -105,9 +125,14 @@ impl Service for P2pProtocolRequestHandler { self.txpool_read_handle.clone(), ) .boxed(), - ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => { - ready(Ok(ProtocolResponse::NA)).boxed() - } // TODO: tx-pool + ProtocolRequest::NewTransactions(r) => new_transactions( + self.peer_information.clone(), + r, + self.blockchain_context_service.clone(), + self.incoming_tx_handler.clone(), + ) + .boxed(), + ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: tx-pool } } } @@ -297,3 +322,59 @@ async fn new_fluffy_block( Err(e) => Err(e.into()), } } + +async fn new_transactions( + peer_information: PeerInformation, + request: NewTransactions, + mut blockchain_context_service: BlockChainContextService, + mut incoming_tx_handler: IncomingTxHandler, +) -> anyhow::Result { + let BlockChainContextResponse::Context(context) = blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Context) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + if usize_to_u64(context.chain_height + 2) + < peer_information + .core_sync_data + .lock() + .unwrap() + .current_height + { + return Ok(ProtocolResponse::NA); + } + + let state = if request.dandelionpp_fluff { + TxState::Fluff + } else { + let InternalPeerID::KnownAddr(addr) = peer_information.id else { + todo!("Anonymity networks") + }; + + TxState::Stem { from: addr.into() } + }; + + drop(request.padding); + let res = incoming_tx_handler + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(IncomingTxs { + txs: request.txs, + state, + }) + .await; + + match res { + Ok(()) => Ok(ProtocolResponse::NA), + Err(e) => Err(e.into()), + } +} diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 79731023..a44c765a 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -5,3 +5,5 @@ mod dandelion; mod incoming_tx; mod txs_being_handled; + +pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 1ade33d6..9ffa8b9d 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -37,9 +37,13 @@ use crate::{ }; /// An error that can happen handling an incoming tx. +#[derive(Debug, thiserror::Error)] pub enum IncomingTxError { + #[error("parse error: {0}")] Parse(std::io::Error), + #[error("consensus error: {0}")] Consensus(ExtendedConsensusError), + #[error("Duplicate tx sent in message")] DuplicateTransaction, } @@ -51,7 +55,7 @@ pub struct IncomingTxs { /// The transaction type used for dandelion++. #[derive(Clone)] -struct DandelionTx(Bytes); +pub struct DandelionTx(Bytes); /// A transaction ID/hash. type TxId = [u8; 32]; @@ -59,19 +63,20 @@ type TxId = [u8; 32]; /// The service than handles incoming transaction pool transactions. /// /// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. +#[derive(Clone)] pub struct IncomingTxHandler { /// A store of txs currently being handled in incoming tx requests. - txs_being_handled: TxsBeingHandled, + pub txs_being_handled: TxsBeingHandled, /// The blockchain context cache. - blockchain_context_cache: BlockChainContextService, + pub blockchain_context_cache: BlockChainContextService, /// The dandelion txpool manager. - dandelion_pool_manager: DandelionPoolService, + pub dandelion_pool_manager: DandelionPoolService, /// The transaction verifier service. - tx_verifier_service: ConcreteTxVerifierService, + pub tx_verifier_service: ConcreteTxVerifierService, /// The txpool write handle. - txpool_write_handle: TxpoolWriteHandle, + pub txpool_write_handle: TxpoolWriteHandle, /// The txpool read handle. - txpool_read_handle: TxpoolReadHandle, + pub txpool_read_handle: TxpoolReadHandle, } impl Service for IncomingTxHandler {