diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index b21e066d..be88a049 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use futures::future::Shared; use futures::{future::BoxFuture, FutureExt}; use monero_serai::{block::Block, transaction::Transaction}; use std::hash::Hash; @@ -7,6 +8,8 @@ use std::{ future::{ready, Ready}, task::{Context, Poll}, }; +use tokio::sync::{broadcast, oneshot, watch}; +use tokio_stream::wrappers::WatchStream; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; @@ -22,7 +25,7 @@ use cuprate_helper::{ cast::usize_to_u64, map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, }; -use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; +use cuprate_p2p::constants::{MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN}; use cuprate_p2p_core::client::InternalPeerID; use cuprate_p2p_core::{ client::PeerInformation, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, @@ -52,7 +55,9 @@ pub struct P2pProtocolRequestHandlerMaker { /// The [`TxpoolReadHandle`]. pub txpool_read_handle: TxpoolReadHandle, - pub incoming_tx_handler: IncomingTxHandler, + pub incoming_tx_handler: Option<IncomingTxHandler>, + + pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>, } impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker { @@ -60,11 +65,25 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle type Error = tower::BoxError; type Future = Ready<Result<Self::Response, Self::Error>>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.incoming_tx_handler.is_none() { + return self + .incoming_tx_handler_fut + .poll_unpin(cx) + .map(|incoming_tx_handler| { + self.incoming_tx_handler = Some(incoming_tx_handler?); + Ok(()) + }); + } + Poll::Ready(Ok(())) } fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future { + let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else { + panic!("poll_ready was not called or did not return `Poll::Ready`") + }; + // TODO: check sync info? let blockchain_read_handle = self.blockchain_read_handle.clone(); @@ -75,7 +94,7 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle blockchain_read_handle, blockchain_context_service: self.blockchain_context_service.clone(), txpool_read_handle, - incoming_tx_handler: self.incoming_tx_handler.clone(), + incoming_tx_handler, })) } } @@ -120,6 +139,7 @@ impl<Z: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z ))) .boxed(), ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block( + self.peer_information.clone(), r, self.blockchain_read_handle.clone(), self.txpool_read_handle.clone(), @@ -216,7 +236,7 @@ async fn get_chain( cumulative_difficulty_top64, m_block_ids: ByteArrayVec::from(block_ids), first_block: first_block_blob.map_or(Bytes::new(), Bytes::from), - // only needed when + // only needed when pruned m_block_weights: if want_pruned_data { block_weights.into_iter().map(usize_to_u64).collect() } else { @@ -266,13 +286,20 @@ async fn fluffy_missing_txs( } /// [`ProtocolRequest::NewFluffyBlock`] -async fn new_fluffy_block( +async fn new_fluffy_block<A: NetZoneAddress>( + peer_information: PeerInformation<A>, request: NewFluffyBlock, mut blockchain_read_handle: BlockchainReadHandle, mut txpool_read_handle: TxpoolReadHandle, ) -> anyhow::Result<ProtocolResponse> { let current_blockchain_height = request.current_blockchain_height; + peer_information + .core_sync_data + .lock() + .unwrap() + .current_height = current_blockchain_height; + let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> { let block = Block::read(&mut request.b.block.as_ref())?; @@ -282,10 +309,14 @@ async fn new_fluffy_block( .take_normal() .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?; - // TODO: size check these tx blobs let txs = tx_blobs .into_iter() .map(|tx_blob| { + if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE { + peer_information.handle.ban_peer(MEDIUM_BAN); + anyhow::bail!("Peer sent a transaction over the size limit."); + } + let tx = Transaction::read(&mut tx_blob.as_ref())?; Ok((tx.hash(), tx)) @@ -323,6 +354,7 @@ async fn new_fluffy_block( } } +/// [`ProtocolRequest::NewTransactions`] async fn new_transactions<A: NetZoneAddress>( peer_information: PeerInformation<A>, request: NewTransactions, diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 9ffa8b9d..b352bfe4 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -22,6 +22,7 @@ use cuprate_dandelion_tower::{ State, TxState, }; use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_p2p::constants::MAX_TRANSACTION_BLOB_SIZE; use cuprate_txpool::service::{ interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, TxpoolReadHandle, TxpoolWriteHandle, @@ -39,6 +40,8 @@ use crate::{ /// An error that can happen handling an incoming tx. #[derive(Debug, thiserror::Error)] pub enum IncomingTxError { + #[error("Peer sent a transaction which is too big")] + TooLarge, #[error("parse error: {0}")] Parse(std::io::Error), #[error("consensus error: {0}")] @@ -191,6 +194,10 @@ async fn prepare_incoming_txs( let txs = tx_blobs .into_iter() .filter_map(|tx_blob| { + if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE { + return Some(Err(IncomingTxError::TooLarge)); + } + let tx_blob_hash = tx_blob_hash(tx_blob.as_ref()); // If a duplicate is in here the incoming tx batch contained the same tx twice. diff --git a/consensus/rules/src/miner_tx.rs b/consensus/rules/src/miner_tx.rs index 5221ee55..bb3b004a 100644 --- a/consensus/rules/src/miner_tx.rs +++ b/consensus/rules/src/miner_tx.rs @@ -68,7 +68,7 @@ pub fn calculate_block_reward( .unwrap(); let effective_median_bw: u128 = median_bw.try_into().unwrap(); - (((base_reward as u128 * multiplicand) / effective_median_bw) / effective_median_bw) + (((u128::from(base_reward) * multiplicand) / effective_median_bw) / effective_median_bw) .try_into() .unwrap() } diff --git a/consensus/src/context/difficulty.rs b/consensus/src/context/difficulty.rs index 9316dc5e..4c92b72e 100644 --- a/consensus/src/context/difficulty.rs +++ b/consensus/src/context/difficulty.rs @@ -330,7 +330,7 @@ fn next_difficulty( } // TODO: do checked operations here and unwrap so we don't silently overflow? - (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span + (windowed_work * u128::from(hf.block_time().as_secs()) + time_span - 1) / time_span } /// Get the start and end of the window to calculate difficulty. diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index d1060aea..59c2e1a3 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -61,7 +61,7 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec /// be less than. /// /// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size> -pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; +pub const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; /// The maximum amount of block IDs allowed in a chain entry response. ///