diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index bdf1bf3..879103e 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -4,13 +4,13 @@ //! blockchain manager. use std::{ collections::{HashMap, HashSet}, - sync::{Mutex, OnceLock, LazyLock}, + sync::{LazyLock, Mutex, OnceLock}, }; -use tokio::sync::{mpsc, oneshot}; -use tower::{Service, ServiceExt}; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; +use tokio::sync::{mpsc, oneshot}; +use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; @@ -21,12 +21,14 @@ use cuprate_types::{ }; use crate::{ - blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR, + blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, + constants::PANIC_CRITICAL_SERVICE_ERROR, }; /// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager. /// -/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager). +/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions +/// in this file document what happens if this is not initialized when they are called. pub(super) static COMMAND_TX: OnceLock> = OnceLock::new(); /// An error that can be returned from [`handle_incoming_block`]. @@ -62,7 +64,7 @@ pub async fn handle_incoming_block( block: Block, given_txs: Vec, blockchain_read_handle: &mut BlockchainReadHandle, -) -> Result { +) -> Result { /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. /// /// This lock prevents sending the same block to the blockchain manager from multiple connections @@ -88,7 +90,7 @@ pub async fn handle_incoming_block( .await .expect(PANIC_CRITICAL_SERVICE_ERROR) { - return Ok(false); + return Ok(IncomingBlockOk::AlreadyHave); } // TODO: remove this when we have a working tx-pool. @@ -110,15 +112,14 @@ pub async fn handle_incoming_block( .map_err(IncomingBlockError::InvalidBlock)?; let Some(incoming_block_tx) = COMMAND_TX.get() else { - // We could still be starting up the blockchain manager, so just return this as there is nothing - // else we can do. - return Ok(false); + // We could still be starting up the blockchain manager. + return Ok(IncomingBlockOk::NotReady); }; // Add the blocks hash to the blocks being handled. if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) { // If another place is already adding this block then we can stop. - return Ok(false); + return Ok(IncomingBlockOk::AlreadyHave); } // From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`. diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index eae9252..6019592 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -35,7 +35,7 @@ use crate::{ mod commands; mod handler; -pub use commands::BlockchainManagerCommand; +pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; /// Initialize the blockchain manager. /// diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs index a8d8473..f5890a8 100644 --- a/binaries/cuprated/src/blockchain/manager/commands.rs +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -15,6 +15,18 @@ pub enum BlockchainManagerCommand { /// All the transactions defined in [`Block::transactions`]. prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, /// The channel to send the response down. - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>, }, } + +/// The [`Ok`] response for an incoming block. +pub enum IncomingBlockOk { + /// The block was added to the main-chain. + AddedToMainChain, + /// The blockchain manager is not ready yet. + NotReady, + /// The block was added to an alt-chain. + AddedToAltChain, + /// We already have the block. + AlreadyHave, +} diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 23e8295..d37811b 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,10 +1,10 @@ //! The blockchain manager handler functions. -use std::{collections::HashMap, sync::Arc}; - use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; +use std::ops::ControlFlow; +use std::{collections::HashMap, sync::Arc}; use tower::{Service, ServiceExt}; use tracing::info; @@ -22,6 +22,7 @@ use cuprate_types::{ AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; +use crate::blockchain::manager::commands::IncomingBlockOk; use crate::{ blockchain::{ manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle, @@ -73,11 +74,10 @@ impl super::BlockchainManager { &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result { + ) -> Result { if block.header.previous != self.cached_blockchain_context.top_hash { self.handle_incoming_alt_block(block, prepared_txs).await?; - - return Ok(false); + return Ok(IncomingBlockOk::AddedToAltChain); } let VerifyBlockResponse::MainChain(verified_block) = self @@ -91,7 +91,7 @@ impl super::BlockchainManager { }) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; let block_blob = Bytes::copy_from_slice(&verified_block.block_blob); @@ -100,7 +100,7 @@ impl super::BlockchainManager { self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height) .await; - Ok(true) + Ok(IncomingBlockOk::AddedToMainChain) } /// Handle an incoming [`BlockBatch`]. @@ -160,7 +160,7 @@ impl super::BlockchainManager { self.stop_current_block_downloader.notify_one(); return; } - _ => panic!("Incorrect response!"), + _ => unreachable!(), }; for (block, txs) in prepped_blocks { @@ -179,7 +179,7 @@ impl super::BlockchainManager { self.stop_current_block_downloader.notify_one(); return; } - _ => panic!("Incorrect response!"), + _ => unreachable!(), }; self.add_valid_block_to_main_chain(verified_block).await; @@ -226,16 +226,14 @@ impl super::BlockchainManager { self.stop_current_block_downloader.notify_one(); return; } - // the chain was reorged - Ok(true) => { + Ok(AddAltBlock::Reorged) => { // Collect the remaining blocks and add them to the main chain instead. batch.blocks = blocks.collect(); self.handle_incoming_block_batch_main_chain(batch).await; - return; } // continue adding alt blocks. - Ok(false) => (), + Ok(AddAltBlock::Cached) => (), } } } @@ -258,11 +256,11 @@ impl super::BlockchainManager { /// /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from. - pub async fn handle_incoming_alt_block( + async fn handle_incoming_alt_block( &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result { + ) -> Result { let VerifyBlockResponse::AltChain(alt_block_info) = self .block_verifier_service .ready() @@ -274,7 +272,7 @@ impl super::BlockchainManager { }) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; // TODO: check in consensus crate if alt block with this hash already exists. @@ -284,7 +282,7 @@ impl super::BlockchainManager { > self.cached_blockchain_context.cumulative_difficulty { self.try_do_reorg(alt_block_info).await?; - return Ok(true); + return Ok(AddAltBlock::Reorged); } self.blockchain_write_handle @@ -294,7 +292,7 @@ impl super::BlockchainManager { .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info)) .await?; - Ok(false) + Ok(AddAltBlock::Cached) } /// Attempt a re-org with the given top block of the alt-chain. @@ -328,7 +326,7 @@ impl super::BlockchainManager { )) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; alt_blocks.push(top_alt_block); @@ -347,7 +345,7 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR) else { - panic!("Incorrect response!"); + unreachable!(); }; self.blockchain_context_service @@ -409,7 +407,7 @@ impl super::BlockchainManager { }) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; self.add_valid_block_to_main_chain(verified_block).await; @@ -465,9 +463,17 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR) else { - panic!("Incorrect response!"); + unreachable!(); }; self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); } } + +/// The result from successfully adding an alt-block. +enum AddAltBlock { + /// The alt-block was cached. + Cached, + /// The chain was reorged. + Reorged, +}