add docs to handler functions

This commit is contained in:
Boog900 2024-09-13 03:17:30 +01:00
parent 6ec5bc37a9
commit 90beed1590
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
5 changed files with 186 additions and 65 deletions

View file

@ -22,7 +22,7 @@ use tracing::error;
pub struct IncomingBlock { pub struct IncomingBlock {
pub block: Block, pub block: Block,
pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
pub response_tx: oneshot::Sender<Result<(), anyhow::Error>>, pub response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
} }
pub struct BlockchainManager { pub struct BlockchainManager {
@ -35,7 +35,7 @@ pub struct BlockchainManager {
TxVerifierService<ConsensusBlockchainReadHandle>, TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle, ConsensusBlockchainReadHandle,
>, >,
// TODO: stop_current_block_downloader: Notify, stop_current_block_downloader: Notify,
} }
impl BlockchainManager { impl BlockchainManager {

View file

@ -1,40 +1,45 @@
use crate::blockchain::types::ConsensusBlockchainReadHandle; use std::{collections::HashMap, sync::Arc};
use crate::signals::REORG_LOCK;
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::info;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::block::PreparedBlock;
use cuprate_consensus::context::NewBlockData;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus::{ use cuprate_consensus::{
block::PreparedBlock, context::NewBlockData, transactions::new_tx_verification_data,
BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService,
ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest,
VerifyTxResponse, VerifyTxResponse,
}; };
use cuprate_p2p::block_downloader::BlockBatch; use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN};
use cuprate_types::blockchain::{
BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest,
};
use cuprate_types::{ use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
}; };
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::block::Block; use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK};
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use tower::{Service, ServiceExt};
use tracing::info;
impl super::BlockchainManager { impl super::BlockchainManager {
/// Handle an incoming [`Block`].
///
/// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow
/// the top of the main chain.
///
/// Otherwise, this function will validate and add the block to the main chain.
///
/// On success returns a [`bool`] indicating if the block was added to the main chain ([`true`])
/// of an alt-chain ([`false`]).
pub async fn handle_incoming_block( pub async fn handle_incoming_block(
&mut self, &mut self,
block: Block, block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<(), anyhow::Error> { ) -> Result<bool, anyhow::Error> {
if block.header.previous != self.cached_blockchain_context.top_hash { if block.header.previous != self.cached_blockchain_context.top_hash {
self.handle_incoming_alt_block(block, prepared_txs).await?; self.handle_incoming_alt_block(block, prepared_txs).await?;
return Ok(()); return Ok(false);
} }
let VerifyBlockResponse::MainChain(verified_block) = self let VerifyBlockResponse::MainChain(verified_block) = self
@ -53,9 +58,18 @@ impl super::BlockchainManager {
self.add_valid_block_to_main_chain(verified_block).await; self.add_valid_block_to_main_chain(verified_block).await;
Ok(()) Ok(true)
} }
/// Handle an incoming [`BlockBatch`].
///
/// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`]
/// depending on if the first block in the batch follows from the top of our chain.
///
/// # Panics
///
/// This function will panic if the batch is empty or if any internal service returns an unexpected
/// error that we cannot recover from.
pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
let (first_block, _) = batch let (first_block, _) = batch
.blocks .blocks
@ -63,26 +77,36 @@ impl super::BlockchainManager {
.expect("Block batch should not be empty"); .expect("Block batch should not be empty");
if first_block.header.previous == self.cached_blockchain_context.top_hash { if first_block.header.previous == self.cached_blockchain_context.top_hash {
self.handle_incoming_block_batch_main_chain(batch) self.handle_incoming_block_batch_main_chain(batch).await;
.await
.expect("TODO");
} else { } else {
self.handle_incoming_block_batch_alt_chain(batch) self.handle_incoming_block_batch_alt_chain(batch).await;
.await
.expect("TODO");
} }
} }
async fn handle_incoming_block_batch_main_chain( /// Handles an incoming [`BlockBatch`] that follows the main chain.
&mut self, ///
batch: BlockBatch, /// This function will handle validating the blocks in the batch and adding them to the blockchain
) -> Result<(), anyhow::Error> { /// database and context cache.
///
/// This function will also handle banning the peer and canceling the block downloader if the
/// block is invalid.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
info!( info!(
"Handling batch to main chain height: {}", "Handling batch to main chain height: {}",
batch.blocks.first().unwrap().0.number().unwrap() batch.blocks.first().unwrap().0.number().unwrap()
); );
let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self let ban_cancel_download = || {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
};
let batch_prep_res = self
.block_verifier_service .block_verifier_service
.ready() .ready()
.await .await
@ -90,21 +114,33 @@ impl super::BlockchainManager {
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks { .call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks, blocks: batch.blocks,
}) })
.await? .await;
else {
panic!("Incorrect response!"); let prepped_blocks = match batch_prep_res {
Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks,
Err(_) => {
ban_cancel_download();
return;
}
_ => panic!("Incorrect response!"),
}; };
for (block, txs) in prepped { for (block, txs) in prepped_blocks {
let VerifyBlockResponse::MainChain(verified_block) = self let verify_res = self
.block_verifier_service .block_verifier_service
.ready() .ready()
.await .await
.expect("TODO") .expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs }) .call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await? .await;
else {
panic!("Incorrect response!"); let VerifyBlockResponse::MainChain(verified_block) = match verify_res {
Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block,
Err(_) => {
ban_cancel_download();
return;
}
_ => panic!("Incorrect response!"),
}; };
self.add_valid_block_to_main_chain(verified_block).await; self.add_valid_block_to_main_chain(verified_block).await;
@ -113,25 +149,60 @@ impl super::BlockchainManager {
Ok(()) Ok(())
} }
async fn handle_incoming_block_batch_alt_chain( /// Handles an incoming [`BlockBatch`] that does not follow the main-chain.
&mut self, ///
batch: BlockBatch, /// This function will handle validating the alt-blocks to add them to our cache and reorging the
) -> Result<(), anyhow::Error> { /// chain if the alt-chain has a higher cumulative difficulty.
///
/// This function will also handle banning the peer and canceling the block downloader if the
/// alt block is invalid or if a reorg fails.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn handle_incoming_block_batch_alt_chain(&mut self, batch: BlockBatch) {
for (block, txs) in batch.blocks { for (block, txs) in batch.blocks {
let txs = txs // async blocks work as try blocks.
.into_par_iter() let res = async {
.map(|tx| { let txs = txs
let tx = new_tx_verification_data(tx)?; .into_par_iter()
Ok((tx.tx_hash, tx)) .map(|tx| {
}) let tx = new_tx_verification_data(tx)?;
.collect::<Result<_, anyhow::Error>>()?; Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()?;
self.handle_incoming_alt_block(block, txs).await?; self.handle_incoming_alt_block(block, txs).await?;
Ok(())
}
.await;
if let Err(e) = res {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
}
} }
Ok(())
} }
/// Handles an incoming alt [`Block`].
///
/// This function will do some pre-validation of the alt block, then if the cumulative difficulty
/// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
/// the alt block to the alt block cache.
///
/// # Errors
///
/// This will return an [`Err`] if:
/// - The alt block was invalid.
/// - An attempt to reorg the chain failed.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
pub async fn handle_incoming_alt_block( pub async fn handle_incoming_alt_block(
&mut self, &mut self,
block: Block, block: Block,
@ -157,8 +228,6 @@ impl super::BlockchainManager {
> self.cached_blockchain_context.cumulative_difficulty > self.cached_blockchain_context.cumulative_difficulty
{ {
self.try_do_reorg(alt_block_info).await?; self.try_do_reorg(alt_block_info).await?;
// TODO: ban the peer if the reorg failed.
return Ok(()); return Ok(());
} }
@ -172,6 +241,21 @@ impl super::BlockchainManager {
Ok(()) Ok(())
} }
/// Attempt a re-org with the given top block of the alt-chain.
///
/// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database
/// and context cache to verify the alt-chain. It will then attempt to verify and add each block
/// in the alt-chain to tha main-chain. Releasing the lock on [`REORG_LOCK`] when finished.
///
/// # Errors
///
/// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain
/// will be returned back into its state it was at when then function was called.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn try_do_reorg( async fn try_do_reorg(
&mut self, &mut self,
top_alt_block: AltBlockInformation, top_alt_block: AltBlockInformation,
@ -230,6 +314,21 @@ impl super::BlockchainManager {
} }
} }
/// Verify and add a list of [`AltBlockInformation`]s to the main-chain.
///
/// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain
/// for the blockchain database and the context cache, or in other words that the blockchain database
/// and context cache has had the top blocks popped to where the alt-chain meets the main-chain.
///
/// # Errors
///
/// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should
/// be aborted and the chain should be returned to its previous state.
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn verify_add_alt_blocks_to_main_chain( async fn verify_add_alt_blocks_to_main_chain(
&mut self, &mut self,
alt_blocks: Vec<AltBlockInformation>, alt_blocks: Vec<AltBlockInformation>,
@ -263,6 +362,15 @@ impl super::BlockchainManager {
Ok(()) Ok(())
} }
/// Adds a [`VerifiedBlockInformation`] to the main-chain.
///
/// This function will update the blockchain database and the context cache, it will also
/// update [`Self::cached_blockchain_context`].
///
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
pub async fn add_valid_block_to_main_chain( pub async fn add_valid_block_to_main_chain(
&mut self, &mut self,
verified_block: VerifiedBlockInformation, verified_block: VerifiedBlockInformation,

View file

@ -1,7 +1,11 @@
use std::pin::pin;
use std::time::Duration; use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
use tokio::{sync::mpsc, time::sleep}; use tokio::{
sync::{mpsc, Notify},
time::sleep,
};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::instrument; use tracing::instrument;
@ -27,6 +31,7 @@ pub async fn syncer<C, CN>(
our_chain: CN, our_chain: CN,
clearnet_interface: NetworkInterface<ClearNet>, clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>, incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
stop_current_block_downloader: Notify,
block_downloader_config: BlockDownloaderConfig, block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError> ) -> Result<(), SyncerError>
where where
@ -82,10 +87,18 @@ where
let mut block_batch_stream = let mut block_batch_stream =
clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config);
while let Some(batch) = block_batch_stream.next().await { loop {
tracing::debug!("Got batch, len: {}", batch.blocks.len()); tokio::select! {
if incoming_block_batch_tx.send(batch).await.is_err() { _ = stop_current_block_downloader.notified() => {
return Err(SyncerError::IncomingBlockChannelClosed); tracing::info!("Stopping block downloader");
break;
}
Some(batch) = block_batch_stream.next() => {
tracing::debug!("Got batch, len: {}", batch.blocks.len());
if incoming_block_batch_tx.send(batch).await.is_err() {
return Err(SyncerError::IncomingBlockChannelClosed);
}
}
} }
} }
} }

View file

@ -10,13 +10,13 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
/// The durations of a short ban. /// The durations of a short ban.
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); pub const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
/// The durations of a medium ban. /// The durations of a medium ban.
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); pub const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
/// The durations of a long ban. /// The durations of a long ban.
pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
/// The default amount of time between inbound diffusion flushes. /// The default amount of time between inbound diffusion flushes.
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5); pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);

Binary file not shown.