Allow IncomingTxHandler to be given later

Boog900 2024-10-27 20:00:49 +00:00
parent 54957c1033
commit 82a2133a8b
GPG key ID: 42AB1287CB0041C2
5 changed files with 49 additions and 10 deletions

View file

@@ -1,4 +1,5 @@
 use bytes::Bytes;
+use futures::future::Shared;
 use futures::{future::BoxFuture, FutureExt};
 use monero_serai::{block::Block, transaction::Transaction};
 use std::hash::Hash;
@@ -7,6 +8,8 @@ use std::{
     future::{ready, Ready},
     task::{Context, Poll},
 };
+use tokio::sync::{broadcast, oneshot, watch};
+use tokio_stream::wrappers::WatchStream;
 use tower::{Service, ServiceExt};

 use cuprate_blockchain::service::BlockchainReadHandle;
@@ -22,7 +25,7 @@ use cuprate_helper::{
     cast::usize_to_u64,
     map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
 };
-use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
+use cuprate_p2p::constants::{MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN};
 use cuprate_p2p_core::client::InternalPeerID;
 use cuprate_p2p_core::{
     client::PeerInformation, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
@@ -52,7 +55,9 @@ pub struct P2pProtocolRequestHandlerMaker {
     /// The [`TxpoolReadHandle`].
     pub txpool_read_handle: TxpoolReadHandle,

-    pub incoming_tx_handler: IncomingTxHandler,
+    pub incoming_tx_handler: Option<IncomingTxHandler>,
+
+    pub incoming_tx_handler_fut: Shared<oneshot::Receiver<IncomingTxHandler>>,
 }

 impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
@@ -60,11 +65,25 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle
     type Error = tower::BoxError;
     type Future = Ready<Result<Self::Response, Self::Error>>;

-    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if self.incoming_tx_handler.is_none() {
+            return self
+                .incoming_tx_handler_fut
+                .poll_unpin(cx)
+                .map(|incoming_tx_handler| {
+                    self.incoming_tx_handler = Some(incoming_tx_handler?);
+                    Ok(())
+                });
+        }
+
         Poll::Ready(Ok(()))
     }

     fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
+        let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else {
+            panic!("poll_ready was not called or did not return `Poll::Ready`")
+        };
+
         // TODO: check sync info?

         let blockchain_read_handle = self.blockchain_read_handle.clone();
@@ -75,7 +94,7 @@ impl<N: NetZoneAddress> Service<PeerInformation<N>> for P2pProtocolRequestHandle
             blockchain_read_handle,
             blockchain_context_service: self.blockchain_context_service.clone(),
             txpool_read_handle,
-            incoming_tx_handler: self.incoming_tx_handler.clone(),
+            incoming_tx_handler,
         }))
     }
 }
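For reference, the lazy-initialization pattern introduced above — hold an `Option<IncomingTxHandler>` next to a `Shared` oneshot receiver, resolve it in `poll_ready`, and treat a `None` in `call` as a caller bug — can be reproduced in isolation. The sketch below is illustrative only: `LateHandler` and `HandlerUser` are made-up stand-ins for `IncomingTxHandler` and the request-handler maker, the error type is the raw `RecvError` instead of `tower::BoxError`, and it assumes `futures` plus `tokio` (with the `sync`, `rt`, and `macros` features) as dependencies.

```rust
use futures::{future::Shared, FutureExt};
use std::task::{Context, Poll};
use tokio::sync::oneshot;

// Stand-in for `IncomingTxHandler`; must be `Clone` so the shared future's
// output can be cloned to every waiter.
#[derive(Clone)]
struct LateHandler;

struct HandlerUser {
    handler: Option<LateHandler>,
    handler_fut: Shared<oneshot::Receiver<LateHandler>>,
}

impl HandlerUser {
    /// Mirrors the `poll_ready` above: not ready until the handler arrives.
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), oneshot::error::RecvError>> {
        if self.handler.is_none() {
            return self.handler_fut.poll_unpin(cx).map(|handler| {
                self.handler = Some(handler?);
                Ok(())
            });
        }
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() {
    let (handler_tx, handler_rx) = oneshot::channel();
    let mut user = HandlerUser {
        handler: None,
        handler_fut: handler_rx.shared(),
    };

    // The handler is sent some time after the service is built.
    handler_tx.send(LateHandler).ok();

    futures::future::poll_fn(|cx| user.poll_ready(cx)).await.unwrap();
    assert!(user.handler.is_some());
}
```

`Shared` is presumably used because the maker can be cloned while a plain `oneshot::Receiver` can only be awaited once; every clone polls the same receiver and gets its own copy of the handler.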
@@ -120,6 +139,7 @@ impl<Z: NetZoneAddress> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z
             )))
             .boxed(),
             ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block(
+                self.peer_information.clone(),
                 r,
                 self.blockchain_read_handle.clone(),
                 self.txpool_read_handle.clone(),
@@ -216,7 +236,7 @@ async fn get_chain(
         cumulative_difficulty_top64,
         m_block_ids: ByteArrayVec::from(block_ids),
         first_block: first_block_blob.map_or(Bytes::new(), Bytes::from),
-        // only needed when
+        // only needed when pruned
         m_block_weights: if want_pruned_data {
             block_weights.into_iter().map(usize_to_u64).collect()
         } else {
@@ -266,13 +286,20 @@ async fn fluffy_missing_txs(
 }

 /// [`ProtocolRequest::NewFluffyBlock`]
-async fn new_fluffy_block(
+async fn new_fluffy_block<A: NetZoneAddress>(
+    peer_information: PeerInformation<A>,
     request: NewFluffyBlock,
     mut blockchain_read_handle: BlockchainReadHandle,
     mut txpool_read_handle: TxpoolReadHandle,
 ) -> anyhow::Result<ProtocolResponse> {
     let current_blockchain_height = request.current_blockchain_height;

+    peer_information
+        .core_sync_data
+        .lock()
+        .unwrap()
+        .current_height = current_blockchain_height;
+
     let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
         let block = Block::read(&mut request.b.block.as_ref())?;
@@ -282,10 +309,14 @@ async fn new_fluffy_block(
             .take_normal()
             .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;

-        // TODO: size check these tx blobs
         let txs = tx_blobs
             .into_iter()
             .map(|tx_blob| {
+                if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
+                    peer_information.handle.ban_peer(MEDIUM_BAN);
+                    anyhow::bail!("Peer sent a transaction over the size limit.");
+                }
+
                 let tx = Transaction::read(&mut tx_blob.as_ref())?;

                 Ok((tx.hash(), tx))
@@ -323,6 +354,7 @@ async fn new_fluffy_block(
     }
 }

+/// [`ProtocolRequest::NewTransactions`]
 async fn new_transactions<A: NetZoneAddress>(
     peer_information: PeerInformation<A>,
     request: NewTransactions,

View file

@@ -22,6 +22,7 @@ use cuprate_dandelion_tower::{
     State, TxState,
 };
 use cuprate_helper::asynch::rayon_spawn_async;
+use cuprate_p2p::constants::MAX_TRANSACTION_BLOB_SIZE;
 use cuprate_txpool::service::{
     interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
     TxpoolReadHandle, TxpoolWriteHandle,
@@ -39,6 +40,8 @@ use crate::{
 /// An error that can happen handling an incoming tx.
 #[derive(Debug, thiserror::Error)]
 pub enum IncomingTxError {
+    #[error("Peer sent a transaction which is too big")]
+    TooLarge,
     #[error("parse error: {0}")]
     Parse(std::io::Error),
     #[error("consensus error: {0}")]
@@ -191,6 +194,10 @@ async fn prepare_incoming_txs(
     let txs = tx_blobs
         .into_iter()
         .filter_map(|tx_blob| {
+            if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
+                return Some(Err(IncomingTxError::TooLarge));
+            }
+
             let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());

             // If a duplicate is in here the incoming tx batch contained the same tx twice.
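One step worth spelling out: returning `Some(Err(IncomingTxError::TooLarge))` from inside `filter_map` only rejects the whole batch if the iterator of `Result`s is later collected into a single `Result`. How `prepare_incoming_txs` actually collects is not visible in this hunk, so the `collect` below is an assumption; the error type and sizes are placeholders.

```rust
// Illustrative only: collecting `Result`s stops at the first `Err`,
// so one oversized blob fails the entire batch.
const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;

fn main() {
    let tx_blobs: Vec<Vec<u8>> = vec![vec![0; 100], vec![0; 2_000_000]];

    let checked: Result<Vec<&[u8]>, &str> = tx_blobs
        .iter()
        .filter_map(|blob| {
            if blob.len() > MAX_TRANSACTION_BLOB_SIZE {
                return Some(Err("transaction blob too large"));
            }
            Some(Ok(blob.as_slice()))
        })
        .collect();

    assert!(checked.is_err());
}
```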

View file

@@ -68,7 +68,7 @@ pub fn calculate_block_reward(
         .unwrap();
     let effective_median_bw: u128 = median_bw.try_into().unwrap();

-    (((base_reward as u128 * multiplicand) / effective_median_bw) / effective_median_bw)
+    (((u128::from(base_reward) * multiplicand) / effective_median_bw) / effective_median_bw)
         .try_into()
         .unwrap()
 }
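The `u128::from(base_reward)` change is about compile-time safety rather than behavior: `From` is only implemented for lossless widening conversions, while `as` accepts any numeric cast. A small illustration; the `u64` type for `base_reward` is assumed for the example, not taken from this hunk.

```rust
fn main() {
    let base_reward: u64 = 600_000_000_000;

    // `u128::from` only exists for conversions that cannot lose information,
    // so this line stops compiling if `base_reward`'s type ever changes to
    // something that does not widen losslessly into u128.
    let widened: u128 = u128::from(base_reward);
    assert_eq!(widened, 600_000_000_000_u128);

    // An `as` cast compiles for any numeric cast, including lossy ones,
    // which is why the diff swaps it out:
    // let signed: i64 = -1;
    // let oops = signed as u128;     // compiles, silently wraps to u128::MAX
    // let safe = u128::from(signed); // does not compile: no `From<i64> for u128`
}
```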

View file

@@ -330,7 +330,7 @@ fn next_difficulty(
     }

     // TODO: do checked operations here and unwrap so we don't silently overflow?
-    (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span
+    (windowed_work * u128::from(hf.block_time().as_secs()) + time_span - 1) / time_span
 }

 /// Get the start and end of the window to calculate difficulty.
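Beyond the same `as`-to-`From` cleanup, the expression above also encodes ceiling division: for positive integers, `(a + b - 1) / b` equals `a / b` rounded up, so the next difficulty rounds up rather than down. A quick check with made-up numbers (not real chain data):

```rust
fn main() {
    // Toy values: 120-second target block time over a 720-block window.
    let windowed_work: u128 = 1_000_003;
    let block_time: u128 = 120;
    let time_span: u128 = 120 * 720;

    let floor = (windowed_work * block_time) / time_span;
    let ceil = (windowed_work * block_time + time_span - 1) / time_span;

    assert_eq!(floor, 1388); // 120_000_360 / 86_400 rounded down
    assert_eq!(ceil, 1389);  // rounded up, as `next_difficulty` does
}
```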

View file

@@ -61,7 +61,7 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec
 /// be less than.
 ///
 /// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
-pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
+pub const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;

 /// The maximum amount of block IDs allowed in a chain entry response.
 ///