diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 189828e9..b36a31ff 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -17,7 +17,7 @@ use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, + Chain, TransactionVerificationData, }; use crate::{ @@ -65,7 +65,7 @@ pub enum IncomingBlockError { /// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, - given_txs: Vec, + given_txs: HashMap<[u8; 32], TransactionVerificationData>, blockchain_read_handle: &mut BlockchainReadHandle, ) -> Result { // FIXME: we should look in the tx-pool for txs when that is ready. @@ -95,14 +95,6 @@ pub async fn handle_incoming_block( } // TODO: check we actually got given the right txs. - let prepped_txs = given_txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok((tx.tx_hash, tx)) - }) - .collect::>() - .map_err(IncomingBlockError::InvalidBlock)?; let Some(incoming_block_tx) = COMMAND_TX.get() else { // We could still be starting up the blockchain manger, so just return this as there is nothing @@ -128,7 +120,7 @@ pub async fn handle_incoming_block( incoming_block_tx .send(BlockchainManagerCommand::AddBlock { block, - prepped_txs, + prepped_txs: given_txs, response_tx, }) .await diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 3b94770a..c868de94 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,6 +1,9 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::FutureExt; +use monero_serai::block::Block; +use monero_serai::transaction::Transaction; +use std::collections::HashSet; use std::{ future::{ready, Ready}, task::{Context, Poll}, @@ -8,8 +11,10 @@ use std::{ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::BlockChainContextService; use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::asynch::rayon_spawn_async; use cuprate_helper::cast::usize_to_u64; use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; @@ -21,6 +26,9 @@ use cuprate_wire::protocol::{ GetObjectsResponse, NewFluffyBlock, }; +use crate::blockchain::interface as blockchain_interface; +use crate::blockchain::interface::IncomingBlockError; + #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { pub blockchain_read_handle: BlockchainReadHandle, @@ -77,7 +85,9 @@ impl Service for P2pProtocolRequestHandler { "Peer sent a full block when we support fluffy blocks" ))) .boxed(), - ProtocolRequest::NewFluffyBlock(_) => todo!(), + ProtocolRequest::NewFluffyBlock(r) => { + new_fluffy_block(r, self.blockchain_read_handle.clone()).boxed() + } ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => { ready(Ok(ProtocolResponse::NA)).boxed() } // TODO: tx-pool @@ -97,7 +107,7 @@ async fn get_objects( } let block_hashes: Vec<[u8; 32]> = (&request.blocks).into(); - // de-allocate the backing `Bytes`. + // deallocate the backing `Bytes`. drop(request); let BlockchainResponse::BlockCompleteEntries { @@ -131,7 +141,7 @@ async fn get_chain( let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into(); let want_pruned_data = request.prune; - // de-allocate the backing `Bytes`. + // deallocate the backing `Bytes`. drop(request); let BlockchainResponse::NextChainEntry { @@ -182,7 +192,7 @@ async fn fluffy_missing_txs( let block_hash: [u8; 32] = *request.block_hash; let current_blockchain_height = request.current_blockchain_height; - // de-allocate the backing `Bytes`. + // deallocate the backing `Bytes`. drop(request); let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle @@ -212,3 +222,63 @@ async fn fluffy_missing_txs( current_blockchain_height, })) } + +/// [`ProtocolRequest::NewFluffyBlock`] +async fn new_fluffy_block( + request: NewFluffyBlock, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + let current_blockchain_height = request.current_blockchain_height; + + let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> { + let block = Block::read(&mut request.b.block.as_ref())?; + + let tx_blobs = request + .b + .txs + .take_normal() + .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?; + + let mut txs_in_block = block.transactions.iter().copied().collect::>(); + + // TODO: size check these tx blobs + let txs = tx_blobs + .into_iter() + .map(|tx_blob| { + let tx = Transaction::read(&mut tx_blob.as_ref())?; + + let tx = new_tx_verification_data(tx)?; + + if !txs_in_block.remove(&tx.tx_hash) { + anyhow::bail!("Peer sent tx in fluffy block that wasn't actually in block") + } + + Ok((tx.tx_hash, tx)) + }) + .collect::>()?; + + // The backing `Bytes` will be deallocated when this closure returns. + + Ok((block, txs)) + }) + .await?; + + let res = + blockchain_interface::handle_incoming_block(block, txs, &mut blockchain_read_handle).await; + + match res { + Ok(_) => Ok(ProtocolResponse::NA), + Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok( + ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest { + block_hash: block_hash.into(), + current_blockchain_height, + missing_tx_indices, + }), + ), + Err(IncomingBlockError::Orphan) => { + // Block's parent was unknown, could be syncing? + Ok(ProtocolResponse::NA) + } + Err(e) => Err(e.into()), + } +} diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 7d8d431b..82aac824 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -116,6 +116,7 @@ pub enum ProtocolResponse { GetChain(ChainResponse), NewFluffyBlock(NewFluffyBlock), NewTransactions(NewTransactions), + FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest), NA, } @@ -139,6 +140,9 @@ impl PeerResponse { ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, + ProtocolResponse::FluffyMissingTransactionsRequest(_) => { + MessageID::FluffyMissingTxs + } ProtocolResponse::NA => return None, }, diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index d3a7260f..2dfc41db 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -71,6 +71,9 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), + ProtocolResponse::FluffyMissingTransactionsRequest(val) => { + Self::FluffyMissingTransactionsRequest(val) + } ProtocolResponse::NA => return Err(MessageConversionError), }) }