From 82a2133a8b7513b227cf47d9eca80014dcb852d1 Mon Sep 17 00:00:00 2001
From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com>
Date: Sun, 27 Oct 2024 20:00:49 +0000
Subject: [PATCH] Allow `IncomingTxHandler` to be given later

---
 binaries/cuprated/src/p2p/request_handler.rs | 46 +++++++++++++++++---
 binaries/cuprated/src/txpool/incoming_tx.rs  |  7 +++
 consensus/rules/src/miner_tx.rs              |  2 +-
 consensus/src/context/difficulty.rs          |  2 +-
 p2p/p2p/src/constants.rs                     |  2 +-
 5 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs
index b21e066d..be88a049 100644
--- a/binaries/cuprated/src/p2p/request_handler.rs
+++ b/binaries/cuprated/src/p2p/request_handler.rs
@@ -1,4 +1,5 @@
 use bytes::Bytes;
+use futures::future::Shared;
 use futures::{future::BoxFuture, FutureExt};
 use monero_serai::{block::Block, transaction::Transaction};
 use std::hash::Hash;
@@ -7,6 +8,8 @@ use std::{
     future::{ready, Ready},
     task::{Context, Poll},
 };
+use tokio::sync::{broadcast, oneshot, watch};
+use tokio_stream::wrappers::WatchStream;
 use tower::{Service, ServiceExt};
 
 use cuprate_blockchain::service::BlockchainReadHandle;
@@ -22,7 +25,7 @@ use cuprate_helper::{
     cast::usize_to_u64,
     map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
 };
-use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
+use cuprate_p2p::constants::{MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN};
 use cuprate_p2p_core::client::InternalPeerID;
 use cuprate_p2p_core::{
     client::PeerInformation, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
@@ -52,7 +55,9 @@ pub struct P2pProtocolRequestHandlerMaker {
     /// The [`TxpoolReadHandle`].
     pub txpool_read_handle: TxpoolReadHandle,
 
-    pub incoming_tx_handler: IncomingTxHandler,
+    pub incoming_tx_handler: Option<IncomingTxHandler>,
+
+    pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>,
 }
 
 impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
@@ -60,11 +65,25 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle
     type Error = tower::BoxError;
     type Future = Ready<Result<Self::Response, Self::Error>>;
 
-    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if self.incoming_tx_handler.is_none() {
+            return self
+                .incoming_tx_handler_fut
+                .poll_unpin(cx)
+                .map(|incoming_tx_handler| {
+                    self.incoming_tx_handler = Some(incoming_tx_handler?);
+                    Ok(())
+                });
+        }
+
         Poll::Ready(Ok(()))
     }
 
     fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
+        let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else {
+            panic!("poll_ready was not called or did not return `Poll::Ready`")
+        };
+
         // TODO: check sync info?
 
         let blockchain_read_handle = self.blockchain_read_handle.clone();
@@ -75,7 +94,7 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle
             blockchain_read_handle,
             blockchain_context_service: self.blockchain_context_service.clone(),
             txpool_read_handle,
-            incoming_tx_handler: self.incoming_tx_handler.clone(),
+            incoming_tx_handler,
         }))
     }
 }
@@ -120,6 +139,7 @@ impl<Z: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z
             )))
             .boxed(),
             ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block(
+                self.peer_information.clone(),
                 r,
                 self.blockchain_read_handle.clone(),
                 self.txpool_read_handle.clone(),
@@ -216,7 +236,7 @@ async fn get_chain(
         cumulative_difficulty_top64,
         m_block_ids: ByteArrayVec::from(block_ids),
         first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
-        // only needed when
+        // only needed when pruned
         m_block_weights: if want_pruned_data {
             block_weights.into_iter().map(usize_to_u64).collect()
         } else {
@@ -266,13 +286,20 @@ async fn fluffy_missing_txs(
 }
 
 /// [`ProtocolRequest::NewFluffyBlock`]
-async fn new_fluffy_block(
+async fn new_fluffy_block<A: NetZoneAddress>(
+    peer_information: PeerInformation<A>,
     request: NewFluffyBlock,
     mut blockchain_read_handle: BlockchainReadHandle,
     mut txpool_read_handle: TxpoolReadHandle,
 ) -> anyhow::Result<ProtocolResponse> {
     let current_blockchain_height = request.current_blockchain_height;
 
+    peer_information
+        .core_sync_data
+        .lock()
+        .unwrap()
+        .current_height = current_blockchain_height;
+
     let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
         let block = Block::read(&mut request.b.block.as_ref())?;
 
@@ -282,10 +309,14 @@ async fn new_fluffy_block(
             .take_normal()
             .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;
 
-        // TODO: size check these tx blobs
         let txs = tx_blobs
             .into_iter()
             .map(|tx_blob| {
+                if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
+                    peer_information.handle.ban_peer(MEDIUM_BAN);
+                    anyhow::bail!("Peer sent a transaction over the size limit.");
+                }
+
                 let tx = Transaction::read(&mut tx_blob.as_ref())?;
 
                 Ok((tx.hash(), tx))
@@ -323,6 +354,7 @@ async fn new_fluffy_block(
     }
 }
 
+/// [`ProtocolRequest::NewTransactions`]
 async fn new_transactions<A: NetZoneAddress>(
     peer_information: PeerInformation<A>,
     request: NewTransactions,
diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs
index 9ffa8b9d..b352bfe4 100644
--- a/binaries/cuprated/src/txpool/incoming_tx.rs
+++ b/binaries/cuprated/src/txpool/incoming_tx.rs
@@ -22,6 +22,7 @@ use cuprate_dandelion_tower::{
     State, TxState,
 };
 use cuprate_helper::asynch::rayon_spawn_async;
+use cuprate_p2p::constants::MAX_TRANSACTION_BLOB_SIZE;
 use cuprate_txpool::service::{
     interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
     TxpoolReadHandle, TxpoolWriteHandle,
@@ -39,6 +40,8 @@ use crate::{
 /// An error that can happen handling an incoming tx.
 #[derive(Debug, thiserror::Error)]
 pub enum IncomingTxError {
+    #[error("Peer sent a transaction which is too big")]
+    TooLarge,
     #[error("parse error: {0}")]
     Parse(std::io::Error),
     #[error("consensus error: {0}")]
@@ -191,6 +194,10 @@ async fn prepare_incoming_txs(
     let txs = tx_blobs
         .into_iter()
         .filter_map(|tx_blob| {
+            if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
+                return Some(Err(IncomingTxError::TooLarge));
+            }
+
             let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());
 
             // If a duplicate is in here the incoming tx batch contained the same tx twice.
diff --git a/consensus/rules/src/miner_tx.rs b/consensus/rules/src/miner_tx.rs
index 5221ee55..bb3b004a 100644
--- a/consensus/rules/src/miner_tx.rs
+++ b/consensus/rules/src/miner_tx.rs
@@ -68,7 +68,7 @@ pub fn calculate_block_reward(
         .unwrap();
     let effective_median_bw: u128 = median_bw.try_into().unwrap();
 
-    (((base_reward as u128 * multiplicand) / effective_median_bw) / effective_median_bw)
+    (((u128::from(base_reward) * multiplicand) / effective_median_bw) / effective_median_bw)
         .try_into()
         .unwrap()
 }
diff --git a/consensus/src/context/difficulty.rs b/consensus/src/context/difficulty.rs
index 9316dc5e..4c92b72e 100644
--- a/consensus/src/context/difficulty.rs
+++ b/consensus/src/context/difficulty.rs
@@ -330,7 +330,7 @@ fn next_difficulty(
     }
 
     // TODO: do checked operations here and unwrap so we don't silently overflow?
-    (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
+    (windowed_work * u128::from(hf.block_time().as_secs()) + time_span - 1) / time_span
 }
 
 /// Get the start and end of the window to calculate difficulty.
diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs
index d1060aea..59c2e1a3 100644
--- a/p2p/p2p/src/constants.rs
+++ b/p2p/p2p/src/constants.rs
@@ -61,7 +61,7 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec
 /// be less than.
 ///
 /// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
-pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
+pub const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
 
 /// The maximum amount of block IDs allowed in a chain entry response.
 ///