From d2ab8e20c45cea6cb040c2e16ef792f47c5b1682 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 15 Sep 2024 01:20:24 +0100 Subject: [PATCH] add fluffy block handler --- binaries/cuprated/src/blockchain.rs | 4 +- binaries/cuprated/src/blockchain/free.rs | 17 +++++-- binaries/cuprated/src/blockchain/manager.rs | 4 +- .../src/blockchain/manager/commands.rs | 1 - .../src/blockchain/manager/handler.rs | 6 +-- binaries/cuprated/src/p2p/request_handler.rs | 50 ++++++++++++++++++- p2p/p2p-core/src/protocol.rs | 2 + p2p/p2p-core/src/protocol/try_from.rs | 1 + 8 files changed, 71 insertions(+), 14 deletions(-) diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index f05878ac..2668dd8f 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -22,12 +22,14 @@ mod manager; mod syncer; mod types; +use crate::blockchain::free::INCOMING_BLOCK_TX; use manager::BlockchainManager; use types::{ ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; -use crate::blockchain::free::INCOMING_BLOCK_TX; + +pub use free::{handle_incoming_block, IncomingBlockError}; /// Checks if the genesis block is in the blockchain and adds it if not. pub async fn check_add_genesis( diff --git a/binaries/cuprated/src/blockchain/free.rs b/binaries/cuprated/src/blockchain/free.rs index eb80a31d..6f44572c 100644 --- a/binaries/cuprated/src/blockchain/free.rs +++ b/binaries/cuprated/src/blockchain/free.rs @@ -1,3 +1,4 @@ +use crate::blockchain::manager::commands::BlockchainManagerCommand; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; @@ -10,14 +11,13 @@ use std::collections::HashMap; use std::sync::OnceLock; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; -use crate::blockchain::manager::commands::BlockchainManagerCommand; pub static INCOMING_BLOCK_TX: OnceLock> = OnceLock::new(); #[derive(Debug, thiserror::Error)] pub enum IncomingBlockError { #[error("Unknown transactions in block.")] - UnknownTransactions(Vec), + UnknownTransactions([u8; 32], Vec), #[error("The block has an unknown parent.")] Orphan, #[error(transparent)] @@ -29,7 +29,10 @@ pub async fn handle_incoming_block( given_txs: Vec, blockchain_read_handle: &mut BlockchainReadHandle, ) -> Result { - if !block_exists(block.header.previous, blockchain_read_handle).await.expect("TODO") { + if !block_exists(block.header.previous, blockchain_read_handle) + .await + .expect("TODO") + { return Err(IncomingBlockError::Orphan); } @@ -45,6 +48,7 @@ pub async fn handle_incoming_block( // TODO: Get transactions from the tx pool first. if given_txs.len() != block.transactions.len() { return Err(IncomingBlockError::UnknownTransactions( + block_hash, (0..usize_to_u64(block.transactions.len())).collect(), )); } @@ -65,7 +69,7 @@ pub async fn handle_incoming_block( let (response_tx, response_rx) = oneshot::channel(); incoming_block_tx - .send( BlockchainManagerCommand::AddBlock { + .send(BlockchainManagerCommand::AddBlock { block, prepped_txs, response_tx, @@ -73,7 +77,10 @@ pub async fn handle_incoming_block( .await .expect("TODO: don't actually panic here"); - response_rx.await.unwrap().map_err(IncomingBlockError::InvalidBlock) + response_rx + .await + .unwrap() + .map_err(IncomingBlockError::InvalidBlock) } async fn block_exists( diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index b208436c..d5a7ff9a 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,6 +1,7 @@ -mod handler; pub(super) mod commands; +mod handler; +use crate::blockchain::manager::commands::BlockchainManagerCommand; use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::context::RawBlockChainContext; @@ -23,7 +24,6 @@ use tokio::sync::{oneshot, Notify}; use tower::{Service, ServiceExt}; use tracing::error; use tracing_subscriber::fmt::time::FormatTime; -use crate::blockchain::manager::commands::BlockchainManagerCommand; pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs index 1b6f4a48..c60c7ef0 100644 --- a/binaries/cuprated/src/blockchain/manager/commands.rs +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -14,4 +14,3 @@ pub enum BlockchainManagerCommand { PopBlocks, } - diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index a925e710..50268761 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -20,8 +20,8 @@ use cuprate_types::{ AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; -use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; use crate::blockchain::manager::commands::BlockchainManagerCommand; +use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; impl super::BlockchainManager { pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { @@ -29,13 +29,13 @@ impl super::BlockchainManager { BlockchainManagerCommand::AddBlock { block, prepped_txs, - response_tx + response_tx, } => { let res = self.handle_incoming_block(block, prepped_txs).await; drop(response_tx.send(res)); } - BlockchainManagerCommand::PopBlocks => todo!() + BlockchainManagerCommand::PopBlocks => todo!(), } } diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 0a4412fa..ee219392 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -2,17 +2,24 @@ use bytes::Bytes; use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse}; use futures::future::BoxFuture; use futures::FutureExt; +use monero_serai::block::Block; +use monero_serai::transaction::Transaction; +use rayon::prelude::*; use std::task::{Context, Poll}; use tower::{Service, ServiceExt}; use tracing::trace; +use crate::blockchain::{handle_incoming_block, IncomingBlockError}; use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_fixed_bytes::ByteArray; +use cuprate_helper::asynch::rayon_spawn_async; use cuprate_helper::cast::usize_to_u64; use cuprate_helper::map::split_u128_into_low_high_bits; use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::BlockCompleteEntry; -use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; +use cuprate_wire::protocol::{ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, GetObjectsResponse, NewFluffyBlock}; #[derive(Clone)] pub struct P2pProtocolRequestHandler { @@ -39,7 +46,7 @@ impl Service for P2pProtocolRequestHandler { ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), - ProtocolRequest::NewFluffyBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), + ProtocolRequest::NewFluffyBlock(block) => new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed(), ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(), } } @@ -125,3 +132,42 @@ async fn get_chain( first_block: first_missing_block.map_or(Bytes::new(), Bytes::from), })) } + +async fn new_fluffy_block( + mut blockchain_read_handle: BlockchainReadHandle, + incoming_block: NewFluffyBlock, +) -> Result { + let peer_blockchain_height = incoming_block.current_blockchain_height; + + let (block, txs) = rayon_spawn_async(move || { + let block = Block::read(&mut incoming_block.b.block.as_ref())?; + let txs = incoming_block + .b + .txs + .take_normal() + .expect("TODO") + .into_par_iter() + .map(|tx| { + let tx = Transaction::read(&mut tx.as_ref())?; + Ok(tx) + }) + .collect::>()?; + + Ok::<_, tower::BoxError>((block, txs)) + }) + .await?; + + let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await; + + match res { + Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => { + return Ok(ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest{ + block_hash: ByteArray::from(block_hash), + current_blockchain_height: peer_blockchain_height, + missing_tx_indices: tx_indexes, + })) + } + Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?, + Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA), + } +} diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 5e4f4d7e..fc3cb7c8 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -116,6 +116,7 @@ pub enum ProtocolResponse { GetChain(ChainResponse), NewFluffyBlock(NewFluffyBlock), NewTransactions(NewTransactions), + FluffyMissingTxs(FluffyMissingTransactionsRequest), NA, } @@ -139,6 +140,7 @@ impl PeerResponse { ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, + ProtocolResponse::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, ProtocolResponse::NA => return None, }, diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index 8a0b67d2..b3c5203d 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -75,6 +75,7 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val), + ProtocolResponse::FluffyMissingTxs(val) => ProtocolMessage::FluffyMissingTransactionsRequest(val), ProtocolResponse::NA => return Err(MessageConversionError), }) }