clean up the blockchain manger
Some checks are pending
Audit / audit (push) Waiting to run
Deny / audit (push) Waiting to run

This commit is contained in:
Boog900 2024-10-03 03:36:33 +01:00
parent 69f9d84ae1
commit caaeceda2e
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
5 changed files with 126 additions and 104 deletions

View file

@ -17,19 +17,19 @@ use cuprate_types::{
VerifiedBlockInformation,
};
mod free;
mod interface;
mod manager;
mod syncer;
mod types;
use crate::blockchain::free::INCOMING_BLOCK_TX;
use crate::blockchain::interface::INCOMING_BLOCK_TX;
use manager::BlockchainManager;
use types::{
ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService,
ConsensusBlockchainReadHandle,
};
pub use free::{handle_incoming_block, IncomingBlockError};
pub use interface::{handle_incoming_block, IncomingBlockError};
/// Checks if the genesis block is in the blockchain and adds it if not.
pub async fn check_add_genesis(
@ -100,40 +100,3 @@ pub async fn init_consensus(
Ok((block_verifier_svc, tx_verifier_svc, ctx_service))
}
/// Initializes the blockchain manager task and syncer.
pub async fn init_blockchain_manager(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
blockchain_context_service: BlockChainContextService,
block_verifier_service: ConcreteBlockVerifierService,
block_downloader_config: BlockDownloaderConfig,
) {
let (batch_tx, batch_rx) = mpsc::channel(1);
let stop_current_block_downloader = Arc::new(Notify::new());
let (command_tx, command_rx) = mpsc::channel(1);
INCOMING_BLOCK_TX.set(command_tx).unwrap();
tokio::spawn(syncer::syncer(
blockchain_context_service.clone(),
ChainService(blockchain_read_handle.clone()),
clearnet_interface.clone(),
batch_tx,
stop_current_block_downloader.clone(),
block_downloader_config,
));
let manager = BlockchainManager::new(
blockchain_write_handle,
blockchain_read_handle,
blockchain_context_service,
block_verifier_service,
stop_current_block_downloader,
clearnet_interface.broadcast_svc(),
)
.await;
tokio::spawn(manager.run(batch_rx, command_rx));
}

View file

@ -1,8 +1,13 @@
pub(super) mod commands;
mod handler;
use crate::blockchain::interface::INCOMING_BLOCK_TX;
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use crate::blockchain::types::ConsensusBlockchainReadHandle;
use crate::blockchain::types::ChainService;
use crate::blockchain::{
syncer,
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::RawBlockChainContext;
use cuprate_consensus::{
@ -10,8 +15,8 @@ use cuprate_consensus::{
BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_p2p::BroadcastSvc;
use cuprate_p2p::block_downloader::{BlockBatch, BlockDownloaderConfig};
use cuprate_p2p::{BroadcastSvc, NetworkInterface};
use cuprate_p2p_core::ClearNet;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::{Chain, TransactionVerificationData};
@ -25,55 +30,86 @@ use tower::{Service, ServiceExt};
use tracing::error;
use tracing_subscriber::fmt::time::FormatTime;
pub struct BlockchainManager {
pub async fn init_blockchain_manger(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
mut blockchain_context_service: BlockChainContextService,
block_verifier_service: ConcreteBlockVerifierService,
block_downloader_config: BlockDownloaderConfig,
) {
let (batch_tx, batch_rx) = mpsc::channel(1);
let stop_current_block_downloader = Arc::new(Notify::new());
let (command_tx, command_rx) = mpsc::channel(1);
INCOMING_BLOCK_TX.set(command_tx).unwrap();
tokio::spawn(syncer::syncer(
blockchain_context_service.clone(),
ChainService(blockchain_read_handle.clone()),
clearnet_interface.clone(),
batch_tx,
stop_current_block_downloader.clone(),
block_downloader_config,
));
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::GetContext)
.await
.expect("TODO")
else {
panic!("Blockchain context service returned wrong response!");
};
let manger = BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
stop_current_block_downloader,
broadcast_svc,
};
tokio::spawn(manger.run(batch_rx, command_rx));
}
/// The blockchain manager.
///
/// This handles all mutation of the blockchain, anything that changes the state of the blockchain must
/// go through this.
///
/// Other parts of Cuprate can interface with this by using the functions in [`interface`](super::interface).
pub struct BlockchainManager {
/// The [`BlockchainWriteHandle`], this is the _only_ part of Cuprate where a [`BlockchainWriteHandle`]
/// is held.
blockchain_write_handle: BlockchainWriteHandle,
/// A [`BlockchainReadHandle`].
blockchain_read_handle: BlockchainReadHandle,
// TODO: Improve the API of the cache service.
// TODO: rename the cache service -> `BlockchainContextService`.
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
/// values without needing to go to a [`BlockchainReadHandle`].
blockchain_context_service: BlockChainContextService,
/// A cached context representing the current state.
cached_blockchain_context: RawBlockChainContext,
/// The block verifier service, to verify incoming blocks.
block_verifier_service: BlockVerifierService<
BlockChainContextService,
TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle,
>,
/// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download
/// attempt.
stop_current_block_downloader: Arc<Notify>,
/// The broadcast service, to broadcast new blocks.
broadcast_svc: BroadcastSvc<ClearNet>,
}
impl BlockchainManager {
pub async fn new(
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
mut blockchain_context_service: BlockChainContextService,
block_verifier_service: BlockVerifierService<
BlockChainContextService,
TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle,
>,
stop_current_block_downloader: Arc<Notify>,
broadcast_svc: BroadcastSvc<ClearNet>,
) -> Self {
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::GetContext)
.await
.expect("TODO")
else {
panic!("Blockchain context service returned wrong response!");
};
Self {
blockchain_write_handle,
blockchain_read_handle,
blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
stop_current_block_downloader,
broadcast_svc,
}
}
pub async fn run(
mut self,
mut block_batch_rx: mpsc::Receiver<BlockBatch>,

View file

@ -21,6 +21,7 @@ use cuprate_types::{
};
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK};
impl super::BlockchainManager {
@ -42,13 +43,13 @@ impl super::BlockchainManager {
self.broadcast_svc
.ready()
.await
.expect("TODO")
.expect("Broadcast service cannot error.")
.call(BroadcastRequest::Block {
block_bytes,
current_blockchain_height: usize_to_u64(blockchain_height),
})
.await
.expect("TODO");
.expect("Broadcast service cannot error.");
}
/// Handle an incoming [`Block`].
@ -75,7 +76,7 @@ impl super::BlockchainManager {
.block_verifier_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChain {
block,
prepared_txs,
@ -102,7 +103,7 @@ impl super::BlockchainManager {
/// # Panics
///
/// This function will panic if the batch is empty or if any internal service returns an unexpected
/// error that we cannot recover from.
/// error that we cannot recover from or if the incoming batch contains no blocks.
pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
let (first_block, _) = batch
.blocks
@ -127,7 +128,7 @@ impl super::BlockchainManager {
/// # Panics
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
/// recover from or if the incoming batch contains no blocks.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
info!(
"Handling batch to main chain height: {}",
@ -138,7 +139,7 @@ impl super::BlockchainManager {
.block_verifier_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
@ -159,7 +160,7 @@ impl super::BlockchainManager {
.block_verifier_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await;
@ -189,8 +190,12 @@ impl super::BlockchainManager {
///
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn handle_incoming_block_batch_alt_chain(&mut self, batch: BlockBatch) {
for (block, txs) in batch.blocks {
async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
// TODO: this needs testing (this whole section does but this specifically).
let mut blocks = batch.blocks.into_iter();
while let Some((block, txs)) = blocks.next() {
// async blocks work as try blocks.
let res = async {
let txs = txs
@ -201,16 +206,28 @@ impl super::BlockchainManager {
})
.collect::<Result<_, anyhow::Error>>()?;
self.handle_incoming_alt_block(block, txs).await?;
let reorged = self.handle_incoming_alt_block(block, txs).await?;
Ok::<_, anyhow::Error>(())
Ok::<_, anyhow::Error>(reorged)
}
.await;
if let Err(e) = res {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
match res {
Err(e) => {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
}
// the chain was reorged
Ok(true) => {
// Collect the remaining blocks and add them to the main chain instead.
batch.blocks = blocks.collect();
self.handle_incoming_block_batch_main_chain(batch).await;
return;
}
// continue adding alt blocks.
Ok(false) => (),
}
}
}
@ -221,6 +238,8 @@ impl super::BlockchainManager {
/// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
/// the alt block to the alt block cache.
///
/// This function returns a [`bool`] indicating if the chain was reorganised ([`true`]) or not ([`false`]).
///
/// # Errors
///
/// This will return an [`Err`] if:
@ -235,12 +254,12 @@ impl super::BlockchainManager {
&mut self,
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<(), anyhow::Error> {
) -> Result<bool, anyhow::Error> {
let VerifyBlockResponse::AltChain(alt_block_info) = self
.block_verifier_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::AltChain {
block,
prepared_txs,
@ -250,23 +269,24 @@ impl super::BlockchainManager {
panic!("Incorrect response!");
};
// TODO: check in consensus crate if alt block already exists.
// TODO: check in consensus crate if alt block with this hash already exists.
// If this alt chain
if alt_block_info.cumulative_difficulty
> self.cached_blockchain_context.cumulative_difficulty
{
self.try_do_reorg(alt_block_info).await?;
return Ok(());
return Ok(true);
}
self.blockchain_write_handle
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
.await?;
Ok(())
Ok(false)
}
/// Attempt a re-org with the given top block of the alt-chain.
@ -294,7 +314,7 @@ impl super::BlockchainManager {
.blockchain_read_handle
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainReadRequest::AltBlocksInChain(
top_alt_block.chain_id,
))
@ -312,12 +332,12 @@ impl super::BlockchainManager {
.blockchain_write_handle
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::PopBlocks(
current_main_chain_height - split_height + 1,
))
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
panic!("Incorrect response!");
};
@ -325,12 +345,12 @@ impl super::BlockchainManager {
self.blockchain_context_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::PopBlocks {
numb_blocks: current_main_chain_height - split_height + 1,
})
.await
.expect("TODO");
.expect(PANIC_CRITICAL_SERVICE_ERROR);
let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;

View file

@ -14,6 +14,9 @@ pub const VERSION_BUILD: &str = if cfg!(debug_assertions) {
formatcp!("{VERSION}-release")
};
pub const PANIC_CRITICAL_SERVICE_ERROR: &str =
"A service critical to Cuprate's function returned an unexpected error.";
#[cfg(test)]
mod test {
use super::*;