diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 69de3399..ae5a1d3d 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -22,7 +22,7 @@ use tracing::error; pub struct IncomingBlock { pub block: Block, pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, - pub response_tx: oneshot::Sender>, + pub response_tx: oneshot::Sender>, } pub struct BlockchainManager { @@ -35,7 +35,7 @@ pub struct BlockchainManager { TxVerifierService, ConsensusBlockchainReadHandle, >, - // TODO: stop_current_block_downloader: Notify, + stop_current_block_downloader: Notify, } impl BlockchainManager { diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index f9f6ce80..1bdae16c 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,40 +1,45 @@ -use crate::blockchain::types::ConsensusBlockchainReadHandle; -use crate::signals::REORG_LOCK; +use std::{collections::HashMap, sync::Arc}; + +use futures::{TryFutureExt, TryStreamExt}; +use monero_serai::{block::Block, transaction::Transaction}; +use rayon::prelude::*; +use tower::{Service, ServiceExt}; +use tracing::info; + use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::block::PreparedBlock; -use cuprate_consensus::context::NewBlockData; -use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::{ + block::PreparedBlock, context::NewBlockData, transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; -use cuprate_p2p::block_downloader::BlockBatch; -use cuprate_types::blockchain::{ - BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest, -}; +use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN}; use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; -use futures::{TryFutureExt, TryStreamExt}; -use monero_serai::block::Block; -use monero_serai::transaction::Transaction; -use rayon::prelude::*; -use std::collections::HashMap; -use std::sync::Arc; -use tower::{Service, ServiceExt}; -use tracing::info; + +use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; impl super::BlockchainManager { + /// Handle an incoming [`Block`]. + /// + /// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow + /// the top of the main chain. + /// + /// Otherwise, this function will validate and add the block to the main chain. + /// + /// On success returns a [`bool`] indicating if the block was added to the main chain ([`true`]) + /// of an alt-chain ([`false`]). pub async fn handle_incoming_block( &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result<(), anyhow::Error> { + ) -> Result { if block.header.previous != self.cached_blockchain_context.top_hash { self.handle_incoming_alt_block(block, prepared_txs).await?; - return Ok(()); + return Ok(false); } let VerifyBlockResponse::MainChain(verified_block) = self @@ -53,9 +58,18 @@ impl super::BlockchainManager { self.add_valid_block_to_main_chain(verified_block).await; - Ok(()) + Ok(true) } + /// Handle an incoming [`BlockBatch`]. + /// + /// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`] + /// depending on if the first block in the batch follows from the top of our chain. + /// + /// # Panics + /// + /// This function will panic if the batch is empty or if any internal service returns an unexpected + /// error that we cannot recover from. pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { let (first_block, _) = batch .blocks @@ -63,26 +77,36 @@ impl super::BlockchainManager { .expect("Block batch should not be empty"); if first_block.header.previous == self.cached_blockchain_context.top_hash { - self.handle_incoming_block_batch_main_chain(batch) - .await - .expect("TODO"); + self.handle_incoming_block_batch_main_chain(batch).await; } else { - self.handle_incoming_block_batch_alt_chain(batch) - .await - .expect("TODO"); + self.handle_incoming_block_batch_alt_chain(batch).await; } } - async fn handle_incoming_block_batch_main_chain( - &mut self, - batch: BlockBatch, - ) -> Result<(), anyhow::Error> { + /// Handles an incoming [`BlockBatch`] that follows the main chain. + /// + /// This function will handle validating the blocks in the batch and adding them to the blockchain + /// database and context cache. + /// + /// This function will also handle banning the peer and canceling the block downloader if the + /// block is invalid. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { info!( "Handling batch to main chain height: {}", batch.blocks.first().unwrap().0.number().unwrap() ); - let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self + let ban_cancel_download = || { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + }; + + let batch_prep_res = self .block_verifier_service .ready() .await @@ -90,21 +114,33 @@ impl super::BlockchainManager { .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks: batch.blocks, }) - .await? - else { - panic!("Incorrect response!"); + .await; + + let prepped_blocks = match batch_prep_res { + Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks, + Err(_) => { + ban_cancel_download(); + return; + } + _ => panic!("Incorrect response!"), }; - for (block, txs) in prepped { - let VerifyBlockResponse::MainChain(verified_block) = self + for (block, txs) in prepped_blocks { + let verify_res = self .block_verifier_service .ready() .await .expect("TODO") .call(VerifyBlockRequest::MainChainPrepped { block, txs }) - .await? - else { - panic!("Incorrect response!"); + .await; + + let VerifyBlockResponse::MainChain(verified_block) = match verify_res { + Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block, + Err(_) => { + ban_cancel_download(); + return; + } + _ => panic!("Incorrect response!"), }; self.add_valid_block_to_main_chain(verified_block).await; @@ -113,25 +149,60 @@ impl super::BlockchainManager { Ok(()) } - async fn handle_incoming_block_batch_alt_chain( - &mut self, - batch: BlockBatch, - ) -> Result<(), anyhow::Error> { + /// Handles an incoming [`BlockBatch`] that does not follow the main-chain. + /// + /// This function will handle validating the alt-blocks to add them to our cache and reorging the + /// chain if the alt-chain has a higher cumulative difficulty. + /// + /// This function will also handle banning the peer and canceling the block downloader if the + /// alt block is invalid or if a reorg fails. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. + async fn handle_incoming_block_batch_alt_chain(&mut self, batch: BlockBatch) { for (block, txs) in batch.blocks { - let txs = txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok((tx.tx_hash, tx)) - }) - .collect::>()?; + // async blocks work as try blocks. + let res = async { + let txs = txs + .into_par_iter() + .map(|tx| { + let tx = new_tx_verification_data(tx)?; + Ok((tx.tx_hash, tx)) + }) + .collect::>()?; - self.handle_incoming_alt_block(block, txs).await?; + self.handle_incoming_alt_block(block, txs).await?; + + Ok(()) + } + .await; + + if let Err(e) = res { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; + } } - - Ok(()) } + /// Handles an incoming alt [`Block`]. + /// + /// This function will do some pre-validation of the alt block, then if the cumulative difficulty + /// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add + /// the alt block to the alt block cache. + /// + /// # Errors + /// + /// This will return an [`Err`] if: + /// - The alt block was invalid. + /// - An attempt to reorg the chain failed. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. pub async fn handle_incoming_alt_block( &mut self, block: Block, @@ -157,8 +228,6 @@ impl super::BlockchainManager { > self.cached_blockchain_context.cumulative_difficulty { self.try_do_reorg(alt_block_info).await?; - // TODO: ban the peer if the reorg failed. - return Ok(()); } @@ -172,6 +241,21 @@ impl super::BlockchainManager { Ok(()) } + /// Attempt a re-org with the given top block of the alt-chain. + /// + /// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database + /// and context cache to verify the alt-chain. It will then attempt to verify and add each block + /// in the alt-chain to tha main-chain. Releasing the lock on [`REORG_LOCK`] when finished. + /// + /// # Errors + /// + /// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain + /// will be returned back into its state it was at when then function was called. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. async fn try_do_reorg( &mut self, top_alt_block: AltBlockInformation, @@ -230,6 +314,21 @@ impl super::BlockchainManager { } } + /// Verify and add a list of [`AltBlockInformation`]s to the main-chain. + /// + /// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain + /// for the blockchain database and the context cache, or in other words that the blockchain database + /// and context cache has had the top blocks popped to where the alt-chain meets the main-chain. + /// + /// # Errors + /// + /// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should + /// be aborted and the chain should be returned to its previous state. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. async fn verify_add_alt_blocks_to_main_chain( &mut self, alt_blocks: Vec, @@ -263,6 +362,15 @@ impl super::BlockchainManager { Ok(()) } + /// Adds a [`VerifiedBlockInformation`] to the main-chain. + /// + /// This function will update the blockchain database and the context cache, it will also + /// update [`Self::cached_blockchain_context`]. + /// + /// # Panics + /// + /// This function will panic if any internal service returns an unexpected error that we cannot + /// recover from. pub async fn add_valid_block_to_main_chain( &mut self, verified_block: VerifiedBlockInformation, diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index dc738123..fbf4a88f 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1,7 +1,11 @@ +use std::pin::pin; use std::time::Duration; use futures::StreamExt; -use tokio::{sync::mpsc, time::sleep}; +use tokio::{ + sync::{mpsc, Notify}, + time::sleep, +}; use tower::{Service, ServiceExt}; use tracing::instrument; @@ -27,6 +31,7 @@ pub async fn syncer( our_chain: CN, clearnet_interface: NetworkInterface, incoming_block_batch_tx: mpsc::Sender, + stop_current_block_downloader: Notify, block_downloader_config: BlockDownloaderConfig, ) -> Result<(), SyncerError> where @@ -82,10 +87,18 @@ where let mut block_batch_stream = clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); - while let Some(batch) = block_batch_stream.next().await { - tracing::debug!("Got batch, len: {}", batch.blocks.len()); - if incoming_block_batch_tx.send(batch).await.is_err() { - return Err(SyncerError::IncomingBlockChannelClosed); + loop { + tokio::select! { + _ = stop_current_block_downloader.notified() => { + tracing::info!("Stopping block downloader"); + break; + } + Some(batch) = block_batch_stream.next() => { + tracing::debug!("Got batch, len: {}", batch.blocks.len()); + if incoming_block_batch_tx.send(batch).await.is_err() { + return Err(SyncerError::IncomingBlockChannelClosed); + } + } } } } diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index 4e6daa73..0dbd188e 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -10,13 +10,13 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); /// The durations of a short ban. -pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); +pub const SHORT_BAN: Duration = Duration::from_secs(60 * 10); /// The durations of a medium ban. -pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); +pub const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); /// The durations of a long ban. -pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); +pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); /// The default amount of time between inbound diffusion flushes. pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); diff --git a/p2p_state.bin b/p2p_state.bin index fc17e050..9faaaff0 100644 Binary files a/p2p_state.bin and b/p2p_state.bin differ