From caaeceda2ee22f712d9f88c73b0aafa4a537fab5 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 3 Oct 2024 03:36:33 +0100 Subject: [PATCH] clean up the blockchain manger --- binaries/cuprated/src/blockchain.rs | 43 +------ .../src/blockchain/{free.rs => interface.rs} | 0 binaries/cuprated/src/blockchain/manager.rs | 112 ++++++++++++------ .../src/blockchain/manager/handler.rs | 72 +++++++---- binaries/cuprated/src/constants.rs | 3 + 5 files changed, 126 insertions(+), 104 deletions(-) rename binaries/cuprated/src/blockchain/{free.rs => interface.rs} (100%) diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 649dcef0..80d10bf2 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -17,19 +17,19 @@ use cuprate_types::{ VerifiedBlockInformation, }; -mod free; +mod interface; mod manager; mod syncer; mod types; -use crate::blockchain::free::INCOMING_BLOCK_TX; +use crate::blockchain::interface::INCOMING_BLOCK_TX; use manager::BlockchainManager; use types::{ ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; -pub use free::{handle_incoming_block, IncomingBlockError}; +pub use interface::{handle_incoming_block, IncomingBlockError}; /// Checks if the genesis block is in the blockchain and adds it if not. pub async fn check_add_genesis( @@ -100,40 +100,3 @@ pub async fn init_consensus( Ok((block_verifier_svc, tx_verifier_svc, ctx_service)) } - -/// Initializes the blockchain manager task and syncer. -pub async fn init_blockchain_manager( - clearnet_interface: NetworkInterface, - blockchain_write_handle: BlockchainWriteHandle, - blockchain_read_handle: BlockchainReadHandle, - blockchain_context_service: BlockChainContextService, - block_verifier_service: ConcreteBlockVerifierService, - block_downloader_config: BlockDownloaderConfig, -) { - let (batch_tx, batch_rx) = mpsc::channel(1); - let stop_current_block_downloader = Arc::new(Notify::new()); - let (command_tx, command_rx) = mpsc::channel(1); - - INCOMING_BLOCK_TX.set(command_tx).unwrap(); - - tokio::spawn(syncer::syncer( - blockchain_context_service.clone(), - ChainService(blockchain_read_handle.clone()), - clearnet_interface.clone(), - batch_tx, - stop_current_block_downloader.clone(), - block_downloader_config, - )); - - let manager = BlockchainManager::new( - blockchain_write_handle, - blockchain_read_handle, - blockchain_context_service, - block_verifier_service, - stop_current_block_downloader, - clearnet_interface.broadcast_svc(), - ) - .await; - - tokio::spawn(manager.run(batch_rx, command_rx)); -} diff --git a/binaries/cuprated/src/blockchain/free.rs b/binaries/cuprated/src/blockchain/interface.rs similarity index 100% rename from binaries/cuprated/src/blockchain/free.rs rename to binaries/cuprated/src/blockchain/interface.rs diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index d5a7ff9a..314519d0 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,8 +1,13 @@ pub(super) mod commands; mod handler; +use crate::blockchain::interface::INCOMING_BLOCK_TX; use crate::blockchain::manager::commands::BlockchainManagerCommand; -use crate::blockchain::types::ConsensusBlockchainReadHandle; +use crate::blockchain::types::ChainService; +use crate::blockchain::{ + syncer, + types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle}, +}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::context::RawBlockChainContext; use cuprate_consensus::{ @@ -10,8 +15,8 @@ use cuprate_consensus::{ BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; -use cuprate_p2p::block_downloader::BlockBatch; -use cuprate_p2p::BroadcastSvc; +use cuprate_p2p::block_downloader::{BlockBatch, BlockDownloaderConfig}; +use cuprate_p2p::{BroadcastSvc, NetworkInterface}; use cuprate_p2p_core::ClearNet; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::{Chain, TransactionVerificationData}; @@ -25,55 +30,86 @@ use tower::{Service, ServiceExt}; use tracing::error; use tracing_subscriber::fmt::time::FormatTime; -pub struct BlockchainManager { +pub async fn init_blockchain_manger( + clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, + mut blockchain_context_service: BlockChainContextService, + block_verifier_service: ConcreteBlockVerifierService, + block_downloader_config: BlockDownloaderConfig, +) { + let (batch_tx, batch_rx) = mpsc::channel(1); + let stop_current_block_downloader = Arc::new(Notify::new()); + let (command_tx, command_rx) = mpsc::channel(1); + + INCOMING_BLOCK_TX.set(command_tx).unwrap(); + + tokio::spawn(syncer::syncer( + blockchain_context_service.clone(), + ChainService(blockchain_read_handle.clone()), + clearnet_interface.clone(), + batch_tx, + stop_current_block_downloader.clone(), + block_downloader_config, + )); + + let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::GetContext) + .await + .expect("TODO") + else { + panic!("Blockchain context service returned wrong response!"); + }; + + let manger = BlockchainManager { + blockchain_write_handle, + blockchain_read_handle, + blockchain_context_service, + cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), + block_verifier_service, + stop_current_block_downloader, + broadcast_svc, + }; + + tokio::spawn(manger.run(batch_rx, command_rx)); +} + +/// The blockchain manager. +/// +/// This handles all mutation of the blockchain, anything that changes the state of the blockchain must +/// go through this. +/// +/// Other parts of Cuprate can interface with this by using the functions in [`interface`](super::interface). +pub struct BlockchainManager { + /// The [`BlockchainWriteHandle`], this is the _only_ part of Cuprate where a [`BlockchainWriteHandle`] + /// is held. + blockchain_write_handle: BlockchainWriteHandle, + /// A [`BlockchainReadHandle`]. + blockchain_read_handle: BlockchainReadHandle, + // TODO: Improve the API of the cache service. + // TODO: rename the cache service -> `BlockchainContextService`. + /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve + /// values without needing to go to a [`BlockchainReadHandle`]. blockchain_context_service: BlockChainContextService, + /// A cached context representing the current state. cached_blockchain_context: RawBlockChainContext, + /// The block verifier service, to verify incoming blocks. block_verifier_service: BlockVerifierService< BlockChainContextService, TxVerifierService, ConsensusBlockchainReadHandle, >, + /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download + /// attempt. stop_current_block_downloader: Arc, + /// The broadcast service, to broadcast new blocks. broadcast_svc: BroadcastSvc, } impl BlockchainManager { - pub async fn new( - blockchain_write_handle: BlockchainWriteHandle, - blockchain_read_handle: BlockchainReadHandle, - mut blockchain_context_service: BlockChainContextService, - block_verifier_service: BlockVerifierService< - BlockChainContextService, - TxVerifierService, - ConsensusBlockchainReadHandle, - >, - stop_current_block_downloader: Arc, - broadcast_svc: BroadcastSvc, - ) -> Self { - let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service - .ready() - .await - .expect("TODO") - .call(BlockChainContextRequest::GetContext) - .await - .expect("TODO") - else { - panic!("Blockchain context service returned wrong response!"); - }; - - Self { - blockchain_write_handle, - blockchain_read_handle, - blockchain_context_service, - cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), - block_verifier_service, - stop_current_block_downloader, - broadcast_svc, - } - } - pub async fn run( mut self, mut block_batch_rx: mpsc::Receiver, diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 6a92ba73..7d4c81c4 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -21,6 +21,7 @@ use cuprate_types::{ }; use crate::blockchain::manager::commands::BlockchainManagerCommand; +use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; impl super::BlockchainManager { @@ -42,13 +43,13 @@ impl super::BlockchainManager { self.broadcast_svc .ready() .await - .expect("TODO") + .expect("Broadcast service cannot error.") .call(BroadcastRequest::Block { block_bytes, current_blockchain_height: usize_to_u64(blockchain_height), }) .await - .expect("TODO"); + .expect("Broadcast service cannot error."); } /// Handle an incoming [`Block`]. @@ -75,7 +76,7 @@ impl super::BlockchainManager { .block_verifier_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(VerifyBlockRequest::MainChain { block, prepared_txs, @@ -102,7 +103,7 @@ impl super::BlockchainManager { /// # Panics /// /// This function will panic if the batch is empty or if any internal service returns an unexpected - /// error that we cannot recover from. + /// error that we cannot recover from or if the incoming batch contains no blocks. pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { let (first_block, _) = batch .blocks @@ -127,7 +128,7 @@ impl super::BlockchainManager { /// # Panics /// /// This function will panic if any internal service returns an unexpected error that we cannot - /// recover from. + /// recover from or if the incoming batch contains no blocks. async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { info!( "Handling batch to main chain height: {}", @@ -138,7 +139,7 @@ impl super::BlockchainManager { .block_verifier_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks: batch.blocks, }) @@ -159,7 +160,7 @@ impl super::BlockchainManager { .block_verifier_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(VerifyBlockRequest::MainChainPrepped { block, txs }) .await; @@ -189,8 +190,12 @@ impl super::BlockchainManager { /// /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from. - async fn handle_incoming_block_batch_alt_chain(&mut self, batch: BlockBatch) { - for (block, txs) in batch.blocks { + async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) { + // TODO: this needs testing (this whole section does but this specifically). + + let mut blocks = batch.blocks.into_iter(); + + while let Some((block, txs)) = blocks.next() { // async blocks work as try blocks. let res = async { let txs = txs @@ -201,16 +206,28 @@ impl super::BlockchainManager { }) .collect::>()?; - self.handle_incoming_alt_block(block, txs).await?; + let reorged = self.handle_incoming_alt_block(block, txs).await?; - Ok::<_, anyhow::Error>(()) + Ok::<_, anyhow::Error>(reorged) } .await; - if let Err(e) = res { - batch.peer_handle.ban_peer(LONG_BAN); - self.stop_current_block_downloader.notify_one(); - return; + match res { + Err(e) => { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; + } + // the chain was reorged + Ok(true) => { + // Collect the remaining blocks and add them to the main chain instead. + batch.blocks = blocks.collect(); + self.handle_incoming_block_batch_main_chain(batch).await; + + return; + } + // continue adding alt blocks. + Ok(false) => (), } } } @@ -221,6 +238,8 @@ impl super::BlockchainManager { /// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add /// the alt block to the alt block cache. /// + /// This function returns a [`bool`] indicating if the chain was reorganised ([`true`]) or not ([`false`]). + /// /// # Errors /// /// This will return an [`Err`] if: @@ -235,12 +254,12 @@ impl super::BlockchainManager { &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result<(), anyhow::Error> { + ) -> Result { let VerifyBlockResponse::AltChain(alt_block_info) = self .block_verifier_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(VerifyBlockRequest::AltChain { block, prepared_txs, @@ -250,23 +269,24 @@ impl super::BlockchainManager { panic!("Incorrect response!"); }; - // TODO: check in consensus crate if alt block already exists. + // TODO: check in consensus crate if alt block with this hash already exists. + // If this alt chain if alt_block_info.cumulative_difficulty > self.cached_blockchain_context.cumulative_difficulty { self.try_do_reorg(alt_block_info).await?; - return Ok(()); + return Ok(true); } self.blockchain_write_handle .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info)) .await?; - Ok(()) + Ok(false) } /// Attempt a re-org with the given top block of the alt-chain. @@ -294,7 +314,7 @@ impl super::BlockchainManager { .blockchain_read_handle .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockchainReadRequest::AltBlocksInChain( top_alt_block.chain_id, )) @@ -312,12 +332,12 @@ impl super::BlockchainManager { .blockchain_write_handle .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockchainWriteRequest::PopBlocks( current_main_chain_height - split_height + 1, )) .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) else { panic!("Incorrect response!"); }; @@ -325,12 +345,12 @@ impl super::BlockchainManager { self.blockchain_context_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockChainContextRequest::PopBlocks { numb_blocks: current_main_chain_height - split_height + 1, }) .await - .expect("TODO"); + .expect(PANIC_CRITICAL_SERVICE_ERROR); let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await; diff --git a/binaries/cuprated/src/constants.rs b/binaries/cuprated/src/constants.rs index 9463d476..d4dfc1ad 100644 --- a/binaries/cuprated/src/constants.rs +++ b/binaries/cuprated/src/constants.rs @@ -14,6 +14,9 @@ pub const VERSION_BUILD: &str = if cfg!(debug_assertions) { formatcp!("{VERSION}-release") }; +pub const PANIC_CRITICAL_SERVICE_ERROR: &str = + "A service critical to Cuprate's function returned an unexpected error."; + #[cfg(test)] mod test { use super::*;