From 291ffe324d37c76f5dcf14be6709e183da60830b Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 15 Sep 2024 01:59:05 +0100 Subject: [PATCH] fix new block handling --- binaries/cuprated/src/blockchain/free.rs | 18 ++++++++++---- binaries/cuprated/src/p2p/request_handler.rs | 25 +++++++++++++------- p2p/p2p-core/src/protocol/try_from.rs | 4 +++- storage/blockchain/src/service/read.rs | 14 ++++++----- 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/binaries/cuprated/src/blockchain/free.rs b/binaries/cuprated/src/blockchain/free.rs index 6f44572c..b32bb733 100644 --- a/binaries/cuprated/src/blockchain/free.rs +++ b/binaries/cuprated/src/blockchain/free.rs @@ -7,13 +7,15 @@ use cuprate_types::Chain; use monero_serai::block::Block; use monero_serai::transaction::Transaction; use rayon::prelude::*; -use std::collections::HashMap; -use std::sync::OnceLock; +use std::collections::{HashMap, HashSet}; +use std::sync::{Mutex, OnceLock}; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; pub static INCOMING_BLOCK_TX: OnceLock> = OnceLock::new(); +pub static BLOCKS_BEING_HANDLED: OnceLock>> = OnceLock::new(); + #[derive(Debug, thiserror::Error)] pub enum IncomingBlockError { #[error("Unknown transactions in block.")] @@ -62,6 +64,10 @@ pub async fn handle_incoming_block( .collect::>() .map_err(IncomingBlockError::InvalidBlock)?; + if !BLOCKS_BEING_HANDLED.get_or_init(|| Mutex::new(HashSet::new())).lock().unwrap().insert(block_hash) { + return Ok(false); + } + let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else { return Ok(false); }; @@ -77,10 +83,14 @@ pub async fn handle_incoming_block( .await .expect("TODO: don't actually panic here"); - response_rx + let res =response_rx .await .unwrap() - .map_err(IncomingBlockError::InvalidBlock) + .map_err(IncomingBlockError::InvalidBlock); + + BLOCKS_BEING_HANDLED.get().unwrap().lock().unwrap().remove(&block_hash); + + res } async fn block_exists( diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index ee219392..e3245e7e 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -19,7 +19,10 @@ use cuprate_helper::map::split_u128_into_low_high_bits; use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::BlockCompleteEntry; -use cuprate_wire::protocol::{ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, GetObjectsResponse, NewFluffyBlock}; +use cuprate_wire::protocol::{ + ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, + GetObjectsResponse, NewFluffyBlock, +}; #[derive(Clone)] pub struct P2pProtocolRequestHandler { @@ -46,7 +49,9 @@ impl Service for P2pProtocolRequestHandler { ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), - ProtocolRequest::NewFluffyBlock(block) => new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed(), + ProtocolRequest::NewFluffyBlock(block) => { + new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed() + } ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(), } } @@ -158,14 +163,16 @@ async fn new_fluffy_block( .await?; let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await; - - match res { + + match res { Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => { - return Ok(ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest{ - block_hash: ByteArray::from(block_hash), - current_blockchain_height: peer_blockchain_height, - missing_tx_indices: tx_indexes, - })) + return Ok(ProtocolResponse::FluffyMissingTxs( + FluffyMissingTransactionsRequest { + block_hash: ByteArray::from(block_hash), + current_blockchain_height: peer_blockchain_height, + missing_tx_indices: tx_indexes, + }, + )) } Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?, Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA), diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index b3c5203d..ae21a07d 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -75,7 +75,9 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val), - ProtocolResponse::FluffyMissingTxs(val) => ProtocolMessage::FluffyMissingTransactionsRequest(val), + ProtocolResponse::FluffyMissingTxs(val) => { + ProtocolMessage::FluffyMissingTransactionsRequest(val) + } ProtocolResponse::NA => return Err(MessageConversionError), }) } diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index e502c9f1..89f92424 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -268,12 +268,14 @@ fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult { let table_alt_block_heights = env_inner.open_db_ro::(&tx_ro)?; - let height = table_alt_block_heights.get(&block_hash)?; - - Ok(BlockchainResponse::FindBlock(Some(( - Chain::Alt(height.chain_id.into()), - height.height, - )))) + match table_alt_block_heights.get(&block_hash) { + Ok(height) => Ok(BlockchainResponse::FindBlock(Some(( + Chain::Alt(height.chain_id.into()), + height.height, + )))), + Err(RuntimeError::KeyNotFound) => Ok(BlockchainResponse::FindBlock(None)), + Err(e) => return Err(e), + } } /// [`BlockchainReadRequest::FilterUnknownHashes`].