diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 1a9c0b7c..a06f3fa7 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -19,6 +19,7 @@ use cuprate_types::{ use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; +mod chain_service; pub mod interface; mod manager; mod syncer; diff --git a/binaries/cuprated/src/blockchain/chain_service.rs b/binaries/cuprated/src/blockchain/chain_service.rs new file mode 100644 index 00000000..af862d1d --- /dev/null +++ b/binaries/cuprated/src/blockchain/chain_service.rs @@ -0,0 +1,72 @@ +use std::task::{Context, Poll}; + +use futures::{future::BoxFuture, FutureExt, TryFutureExt}; +use tower::Service; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; + +/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out +/// what blocks we need. +/// +/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier. +#[derive(Clone)] +pub struct ChainService(pub BlockchainReadHandle); + +impl Service for ChainService { + type Response = ChainSvcResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: ChainSvcRequest) -> Self::Future { + let map_res = |res: BlockchainResponse| match res { + BlockchainResponse::CompactChainHistory { + block_ids, + cumulative_difficulty, + } => ChainSvcResponse::CompactHistory { + block_ids, + cumulative_difficulty, + }, + BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res), + _ => unreachable!(), + }; + + match req { + ChainSvcRequest::CompactHistory => self + .0 + .call(BlockchainReadRequest::CompactChainHistory) + .map_ok(map_res) + .map_err(Into::into) + .boxed(), + ChainSvcRequest::FindFirstUnknown(req) => self + .0 + .call(BlockchainReadRequest::FindFirstUnknown(req)) + .map_ok(map_res) + .map_err(Into::into) + .boxed(), + ChainSvcRequest::CumulativeDifficulty => self + .0 + .call(BlockchainReadRequest::CompactChainHistory) + .map_ok(|res| { + // TODO create a custom request instead of hijacking this one. + // TODO: use the context cache. + let BlockchainResponse::CompactChainHistory { + cumulative_difficulty, + .. + } = res + else { + unreachable!() + }; + + ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty) + }) + .map_err(Into::into) + .boxed(), + } + } +} diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index b36a31ff..c371d43c 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -1,10 +1,10 @@ -//! The blockchain manger interface. +//! The blockchain manager interface. //! //! This module contains all the functions to mutate the blockchain's state in any way, through the -//! blockchain manger. +//! blockchain manager. use std::{ collections::{HashMap, HashSet}, - sync::{Mutex, OnceLock}, + sync::{LazyLock, Mutex, OnceLock}, }; use monero_serai::{block::Block, transaction::Transaction}; @@ -21,18 +21,15 @@ use cuprate_types::{ }; use crate::{ - blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR, + blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, + constants::PANIC_CRITICAL_SERVICE_ERROR, }; -/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manger. -pub static COMMAND_TX: OnceLock> = OnceLock::new(); - -/// A [`HashSet`] of block hashes that the blockchain manager is currently handling. +/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager. /// -/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of -/// time for new blocks so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` -/// which are also more expensive than `Mutex`s. -pub static BLOCKS_BEING_HANDLED: OnceLock>> = OnceLock::new(); +/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions +/// in this file document what happens if this is not initialized when they are called. +pub(super) static COMMAND_TX: OnceLock> = OnceLock::new(); /// An error that can be returned from [`handle_incoming_block`]. #[derive(Debug, thiserror::Error)] @@ -52,10 +49,7 @@ pub enum IncomingBlockError { /// Try to add a new block to the blockchain. /// -/// This returns a [`bool`] indicating if the block was added to the main-chain ([`true`]) or an alt-chain -/// ([`false`]). -/// -/// If we already knew about this block or the blockchain manger is not setup yet `Ok(false)` is returned. +/// On success returns [`IncomingBlockOk`]. /// /// # Errors /// @@ -67,7 +61,17 @@ pub async fn handle_incoming_block( block: Block, given_txs: HashMap<[u8; 32], TransactionVerificationData>, blockchain_read_handle: &mut BlockchainReadHandle, -) -> Result { +) -> Result { + /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. + /// + /// This lock prevents sending the same block to the blockchain manager from multiple connections + /// before one of them actually gets added to the chain, allowing peers to do other things. + /// + /// This is used over something like a dashmap as we expect a lot of collisions in a short amount of + /// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` + /// which are also more expensive than `Mutex`s. + static BLOCKS_BEING_HANDLED: LazyLock>> = + LazyLock::new(|| Mutex::new(HashSet::new())); // FIXME: we should look in the tx-pool for txs when that is ready. if !block_exists(block.header.previous, blockchain_read_handle) @@ -83,7 +87,7 @@ pub async fn handle_incoming_block( .await .expect(PANIC_CRITICAL_SERVICE_ERROR) { - return Ok(false); + return Ok(IncomingBlockOk::AlreadyHave); } // TODO: remove this when we have a working tx-pool. @@ -97,20 +101,14 @@ pub async fn handle_incoming_block( // TODO: check we actually got given the right txs. let Some(incoming_block_tx) = COMMAND_TX.get() else { - // We could still be starting up the blockchain manger, so just return this as there is nothing - // else we can do. - return Ok(false); + // We could still be starting up the blockchain manager. + return Ok(IncomingBlockOk::NotReady); }; // Add the blocks hash to the blocks being handled. - if !BLOCKS_BEING_HANDLED - .get_or_init(|| Mutex::new(HashSet::new())) - .lock() - .unwrap() - .insert(block_hash) - { + if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) { // If another place is already adding this block then we can stop. - return Ok(false); + return Ok(IncomingBlockOk::AlreadyHave); } // From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`. @@ -132,12 +130,7 @@ pub async fn handle_incoming_block( .map_err(IncomingBlockError::InvalidBlock); // Remove the block hash from the blocks being handled. - BLOCKS_BEING_HANDLED - .get() - .unwrap() - .lock() - .unwrap() - .remove(&block_hash); + BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash); res } @@ -153,7 +146,7 @@ async fn block_exists( .call(BlockchainReadRequest::FindBlock(block_hash)) .await? else { - panic!("Invalid blockchain response!"); + unreachable!(); }; Ok(chain.is_some()) diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 8725aeca..568ed572 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -24,9 +24,9 @@ use cuprate_types::{ use crate::{ blockchain::{ + chain_service::ChainService, interface::COMMAND_TX, syncer, - types::ChainService, types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle}, }, constants::PANIC_CRITICAL_SERVICE_ERROR, @@ -35,13 +35,13 @@ use crate::{ mod commands; mod handler; -pub use commands::BlockchainManagerCommand; +pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; -/// Initialize the blockchain manger. +/// Initialize the blockchain manager. /// /// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface) /// can be called. -pub async fn init_blockchain_manger( +pub async fn init_blockchain_manager( clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, @@ -73,10 +73,10 @@ pub async fn init_blockchain_manger( .await .expect(PANIC_CRITICAL_SERVICE_ERROR) else { - panic!("Blockchain context service returned wrong response!"); + unreachable!() }; - let manger = BlockchainManager { + let manager = BlockchainManager { blockchain_write_handle, blockchain_read_handle, blockchain_context_service, @@ -86,7 +86,7 @@ pub async fn init_blockchain_manger( broadcast_svc: clearnet_interface.broadcast_svc(), }; - tokio::spawn(manger.run(batch_rx, command_rx)); + tokio::spawn(manager.run(batch_rx, command_rx)); } /// The blockchain manager. diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs index a8d84733..643ed88c 100644 --- a/binaries/cuprated/src/blockchain/manager/commands.rs +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -1,4 +1,4 @@ -//! This module contains the commands for th blockchain manager. +//! This module contains the commands for the blockchain manager. use std::collections::HashMap; use monero_serai::block::Block; @@ -15,6 +15,18 @@ pub enum BlockchainManagerCommand { /// All the transactions defined in [`Block::transactions`]. prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, /// The channel to send the response down. - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>, }, } + +/// The [`Ok`] response for an incoming block. +pub enum IncomingBlockOk { + /// The block was added to the main-chain. + AddedToMainChain, + /// The blockchain manager is not ready yet. + NotReady, + /// The block was added to an alt-chain. + AddedToAltChain, + /// We already have the block. + AlreadyHave, +} diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 14073062..e9e6c3c3 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,10 +1,10 @@ -//! The blockchain manger handler functions. -use std::{collections::HashMap, sync::Arc}; - +//! The blockchain manager handler functions. use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; +use std::ops::ControlFlow; +use std::{collections::HashMap, sync::Arc}; use tower::{Service, ServiceExt}; use tracing::info; @@ -22,6 +22,7 @@ use cuprate_types::{ AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; +use crate::blockchain::manager::commands::IncomingBlockOk; use crate::{ blockchain::{ manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle, @@ -32,6 +33,11 @@ use crate::{ impl super::BlockchainManager { /// Handle an incoming command from another part of Cuprate. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { match command { BlockchainManagerCommand::AddBlock { @@ -67,17 +73,18 @@ impl super::BlockchainManager { /// /// Otherwise, this function will validate and add the block to the main chain. /// - /// On success returns a [`bool`] indicating if the block was added to the main chain ([`true`]) - /// or an alt-chain ([`false`]). + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. pub async fn handle_incoming_block( &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result { + ) -> Result { if block.header.previous != self.cached_blockchain_context.top_hash { self.handle_incoming_alt_block(block, prepared_txs).await?; - - return Ok(false); + return Ok(IncomingBlockOk::AddedToAltChain); } let VerifyBlockResponse::MainChain(verified_block) = self @@ -91,7 +98,7 @@ impl super::BlockchainManager { }) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; let block_blob = Bytes::copy_from_slice(&verified_block.block_blob); @@ -100,7 +107,7 @@ impl super::BlockchainManager { self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height) .await; - Ok(true) + Ok(IncomingBlockOk::AddedToMainChain) } /// Handle an incoming [`BlockBatch`]. @@ -160,7 +167,7 @@ impl super::BlockchainManager { self.stop_current_block_downloader.notify_one(); return; } - _ => panic!("Incorrect response!"), + _ => unreachable!(), }; for (block, txs) in prepped_blocks { @@ -179,7 +186,7 @@ impl super::BlockchainManager { self.stop_current_block_downloader.notify_one(); return; } - _ => panic!("Incorrect response!"), + _ => unreachable!(), }; self.add_valid_block_to_main_chain(verified_block).await; @@ -226,16 +233,14 @@ impl super::BlockchainManager { self.stop_current_block_downloader.notify_one(); return; } - // the chain was reorged - Ok(true) => { + Ok(AddAltBlock::Reorged) => { // Collect the remaining blocks and add them to the main chain instead. batch.blocks = blocks.collect(); self.handle_incoming_block_batch_main_chain(batch).await; - return; } // continue adding alt blocks. - Ok(false) => (), + Ok(AddAltBlock::Cached) => (), } } } @@ -246,8 +251,6 @@ impl super::BlockchainManager { /// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add /// the alt block to the alt block cache. /// - /// This function returns a [`bool`] indicating if the chain was reorganised ([`true`]) or not ([`false`]). - /// /// # Errors /// /// This will return an [`Err`] if: @@ -258,11 +261,11 @@ impl super::BlockchainManager { /// /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from. - pub async fn handle_incoming_alt_block( + async fn handle_incoming_alt_block( &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result { + ) -> Result { let VerifyBlockResponse::AltChain(alt_block_info) = self .block_verifier_service .ready() @@ -274,7 +277,7 @@ impl super::BlockchainManager { }) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; // TODO: check in consensus crate if alt block with this hash already exists. @@ -284,7 +287,7 @@ impl super::BlockchainManager { > self.cached_blockchain_context.cumulative_difficulty { self.try_do_reorg(alt_block_info).await?; - return Ok(true); + return Ok(AddAltBlock::Reorged); } self.blockchain_write_handle @@ -294,7 +297,7 @@ impl super::BlockchainManager { .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info)) .await?; - Ok(false) + Ok(AddAltBlock::Cached) } /// Attempt a re-org with the given top block of the alt-chain. @@ -328,7 +331,7 @@ impl super::BlockchainManager { )) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; alt_blocks.push(top_alt_block); @@ -347,7 +350,7 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR) else { - panic!("Incorrect response!"); + unreachable!(); }; self.blockchain_context_service @@ -409,7 +412,7 @@ impl super::BlockchainManager { }) .await? else { - panic!("Incorrect response!"); + unreachable!(); }; self.add_valid_block_to_main_chain(verified_block).await; @@ -465,9 +468,17 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR) else { - panic!("Incorrect response!"); + unreachable!(); }; self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); } } + +/// The result from successfully adding an alt-block. +enum AddAltBlock { + /// The alt-block was cached. + Cached, + /// The chain was reorged. + Reorged, +} diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 8c58c54e..7e72c36c 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -65,7 +65,7 @@ where .call(BlockChainContextRequest::GetContext) .await? else { - panic!("Blockchain context service returned wrong response!"); + unreachable!(); }; let client_pool = clearnet_interface.client_pool(); @@ -130,7 +130,7 @@ where .oneshot(BlockChainContextRequest::GetContext) .await? else { - panic!("Blockchain context service returned wrong response!"); + unreachable!(); }; *old_context = ctx; diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs index 1bf921ec..e3ee62b3 100644 --- a/binaries/cuprated/src/blockchain/types.rs +++ b/binaries/cuprated/src/blockchain/types.rs @@ -22,67 +22,3 @@ pub type ConcreteTxVerifierService = TxVerifierService tower::BoxError>; - -/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out -/// what blocks we need. -/// -/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier. -#[derive(Clone)] -pub struct ChainService(pub BlockchainReadHandle); - -impl Service for ChainService { - type Response = ChainSvcResponse; - type Error = tower::BoxError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: ChainSvcRequest) -> Self::Future { - let map_res = |res: BlockchainResponse| match res { - BlockchainResponse::CompactChainHistory { - block_ids, - cumulative_difficulty, - } => ChainSvcResponse::CompactHistory { - block_ids, - cumulative_difficulty, - }, - BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res), - _ => panic!("Blockchain returned wrong response"), - }; - - match req { - ChainSvcRequest::CompactHistory => self - .0 - .call(BlockchainReadRequest::CompactChainHistory) - .map_ok(map_res) - .map_err(Into::into) - .boxed(), - ChainSvcRequest::FindFirstUnknown(req) => self - .0 - .call(BlockchainReadRequest::FindFirstUnknown(req)) - .map_ok(map_res) - .map_err(Into::into) - .boxed(), - ChainSvcRequest::CumulativeDifficulty => self - .0 - .call(BlockchainReadRequest::CompactChainHistory) - .map_ok(|res| { - // TODO create a custom request instead of hijacking this one. - // TODO: use the context cache. - let BlockchainResponse::CompactChainHistory { - cumulative_difficulty, - .. - } = res - else { - panic!("Blockchain returned wrong response"); - }; - - ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty) - }) - .map_err(Into::into) - .boxed(), - } - } -} diff --git a/binaries/cuprated/src/constants.rs b/binaries/cuprated/src/constants.rs index d4dfc1ad..2f3c7bb6 100644 --- a/binaries/cuprated/src/constants.rs +++ b/binaries/cuprated/src/constants.rs @@ -14,6 +14,7 @@ pub const VERSION_BUILD: &str = if cfg!(debug_assertions) { formatcp!("{VERSION}-release") }; +/// The panic message used when cuprated encounters a critical service error. pub const PANIC_CRITICAL_SERVICE_ERROR: &str = "A service critical to Cuprate's function returned an unexpected error."; diff --git a/binaries/cuprated/src/signals.rs b/binaries/cuprated/src/signals.rs index 54467559..8502679c 100644 --- a/binaries/cuprated/src/signals.rs +++ b/binaries/cuprated/src/signals.rs @@ -6,4 +6,7 @@ use tokio::sync::RwLock; /// /// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken /// for any operation which must complete without a reorg happening. +/// +/// Currently, the only operation that needs to take a read lock is adding txs to the tx-pool, +/// this can potentially be removed in the future, see: TODO pub static REORG_LOCK: RwLock<()> = RwLock::const_new(());