diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index f2f39017..f6e0be39 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,17 +1,18 @@ +use bytes::Bytes; +use futures::{future::BoxFuture, FutureExt}; +use monero_serai::{block::Block, transaction::Transaction}; +use std::hash::Hash; use std::{ collections::HashSet, future::{ready, Ready}, task::{Context, Poll}, }; - -use bytes::Bytes; -use futures::{future::BoxFuture, FutureExt}; -use monero_serai::{block::Block, transaction::Transaction}; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::{transactions::new_tx_verification_data, BlockChainContextService}; use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::cast::u64_to_usize; use cuprate_helper::{ asynch::rayon_spawn_async, cast::usize_to_u64, @@ -19,6 +20,7 @@ use cuprate_helper::{ }; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; +use cuprate_txpool::service::TxpoolReadHandle; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs, @@ -35,6 +37,8 @@ use crate::blockchain::interface::{self as blockchain_interface, IncomingBlockEr pub struct P2pProtocolRequestHandlerMaker { /// The [`BlockchainReadHandle`] pub blockchain_read_handle: BlockchainReadHandle, + /// The [`TxpoolReadHandle`]. + pub txpool_read_handle: TxpoolReadHandle, } impl Service> for P2pProtocolRequestHandlerMaker { @@ -50,10 +54,12 @@ impl Service> for P2pProtocolRequestHandlerMa // TODO: check sync info? let blockchain_read_handle = self.blockchain_read_handle.clone(); + let txpool_read_handle = self.txpool_read_handle.clone(); ready(Ok(P2pProtocolRequestHandler { peer_information, blockchain_read_handle, + txpool_read_handle, })) } } @@ -63,8 +69,10 @@ impl Service> for P2pProtocolRequestHandlerMa pub struct P2pProtocolRequestHandler { /// The [`PeerInformation`] for this peer. peer_information: PeerInformation, - /// The [`BlockchainReadHandle`] + /// The [`BlockchainReadHandle`]. blockchain_read_handle: BlockchainReadHandle, + /// The [`TxpoolReadHandle`]. + txpool_read_handle: TxpoolReadHandle, } impl Service for P2pProtocolRequestHandler { @@ -91,9 +99,12 @@ impl Service for P2pProtocolRequestHandler { "Peer sent a full block when we support fluffy blocks" ))) .boxed(), - ProtocolRequest::NewFluffyBlock(r) => { - new_fluffy_block(r, self.blockchain_read_handle.clone()).boxed() - } + ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block( + r, + self.blockchain_read_handle.clone(), + self.txpool_read_handle.clone(), + ) + .boxed(), ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => { ready(Ok(ProtocolResponse::NA)).boxed() } // TODO: tx-pool @@ -233,6 +244,7 @@ async fn fluffy_missing_txs( async fn new_fluffy_block( request: NewFluffyBlock, mut blockchain_read_handle: BlockchainReadHandle, + mut txpool_read_handle: TxpoolReadHandle, ) -> anyhow::Result { let current_blockchain_height = request.current_blockchain_height; @@ -251,7 +263,7 @@ async fn new_fluffy_block( .map(|tx_blob| { let tx = Transaction::read(&mut tx_blob.as_ref())?; - Ok(tx) + Ok((tx.hash(), tx)) }) .collect::>()?; @@ -261,8 +273,13 @@ async fn new_fluffy_block( }) .await?; - let res = - blockchain_interface::handle_incoming_block(block, txs, &mut blockchain_read_handle).await; + let res = blockchain_interface::handle_incoming_block( + block, + txs, + &mut blockchain_read_handle, + &mut txpool_read_handle, + ) + .await; match res { Ok(_) => Ok(ProtocolResponse::NA), @@ -270,7 +287,7 @@ async fn new_fluffy_block( ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest { block_hash: block_hash.into(), current_blockchain_height, - missing_tx_indices, + missing_tx_indices: missing_tx_indices.into_iter().map(usize_to_u64).collect(), }), ), Err(IncomingBlockError::Orphan) => {