mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-12-23 03:59:31 +00:00
enums instead of bools
This commit is contained in:
parent
375a1e1826
commit
f50d921459
4 changed files with 54 additions and 35 deletions
|
@ -4,13 +4,13 @@
|
|||
//! blockchain manager.
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{Mutex, OnceLock, LazyLock},
|
||||
sync::{LazyLock, Mutex, OnceLock},
|
||||
};
|
||||
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tower::{Service, ServiceExt};
|
||||
use monero_serai::{block::Block, transaction::Transaction};
|
||||
use rayon::prelude::*;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||
use cuprate_consensus::transactions::new_tx_verification_data;
|
||||
|
@ -21,12 +21,14 @@ use cuprate_types::{
|
|||
};
|
||||
|
||||
use crate::{
|
||||
blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
|
||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
};
|
||||
|
||||
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
|
||||
///
|
||||
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager).
|
||||
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
|
||||
/// in this file document what happens if this is not initialized when they are called.
|
||||
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
|
||||
|
||||
/// An error that can be returned from [`handle_incoming_block`].
|
||||
|
@ -62,7 +64,7 @@ pub async fn handle_incoming_block(
|
|||
block: Block,
|
||||
given_txs: Vec<Transaction>,
|
||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||
) -> Result<bool, IncomingBlockError> {
|
||||
) -> Result<IncomingBlockOk, IncomingBlockError> {
|
||||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
||||
///
|
||||
/// This lock prevents sending the same block to the blockchain manager from multiple connections
|
||||
|
@ -88,7 +90,7 @@ pub async fn handle_incoming_block(
|
|||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
{
|
||||
return Ok(false);
|
||||
return Ok(IncomingBlockOk::AlreadyHave);
|
||||
}
|
||||
|
||||
// TODO: remove this when we have a working tx-pool.
|
||||
|
@ -110,15 +112,14 @@ pub async fn handle_incoming_block(
|
|||
.map_err(IncomingBlockError::InvalidBlock)?;
|
||||
|
||||
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
||||
// We could still be starting up the blockchain manager, so just return this as there is nothing
|
||||
// else we can do.
|
||||
return Ok(false);
|
||||
// We could still be starting up the blockchain manager.
|
||||
return Ok(IncomingBlockOk::NotReady);
|
||||
};
|
||||
|
||||
// Add the blocks hash to the blocks being handled.
|
||||
if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
|
||||
// If another place is already adding this block then we can stop.
|
||||
return Ok(false);
|
||||
return Ok(IncomingBlockOk::AlreadyHave);
|
||||
}
|
||||
|
||||
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
|
||||
|
|
|
@ -35,7 +35,7 @@ use crate::{
|
|||
mod commands;
|
||||
mod handler;
|
||||
|
||||
pub use commands::BlockchainManagerCommand;
|
||||
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
|
||||
|
||||
/// Initialize the blockchain manager.
|
||||
///
|
||||
|
|
|
@ -15,6 +15,18 @@ pub enum BlockchainManagerCommand {
|
|||
/// All the transactions defined in [`Block::transactions`].
|
||||
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
/// The channel to send the response down.
|
||||
response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
|
||||
response_tx: oneshot::Sender<Result<IncomingBlockOk, anyhow::Error>>,
|
||||
},
|
||||
}
|
||||
|
||||
/// The [`Ok`] response for an incoming block.
|
||||
pub enum IncomingBlockOk {
|
||||
/// The block was added to the main-chain.
|
||||
AddedToMainChain,
|
||||
/// The blockchain manager is not ready yet.
|
||||
NotReady,
|
||||
/// The block was added to an alt-chain.
|
||||
AddedToAltChain,
|
||||
/// We already have the block.
|
||||
AlreadyHave,
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
//! The blockchain manager handler functions.
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{TryFutureExt, TryStreamExt};
|
||||
use monero_serai::{block::Block, transaction::Transaction};
|
||||
use rayon::prelude::*;
|
||||
use std::ops::ControlFlow;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::info;
|
||||
|
||||
|
@ -22,6 +22,7 @@ use cuprate_types::{
|
|||
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
||||
};
|
||||
|
||||
use crate::blockchain::manager::commands::IncomingBlockOk;
|
||||
use crate::{
|
||||
blockchain::{
|
||||
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
|
||||
|
@ -73,11 +74,10 @@ impl super::BlockchainManager {
|
|||
&mut self,
|
||||
block: Block,
|
||||
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
) -> Result<bool, anyhow::Error> {
|
||||
) -> Result<IncomingBlockOk, anyhow::Error> {
|
||||
if block.header.previous != self.cached_blockchain_context.top_hash {
|
||||
self.handle_incoming_alt_block(block, prepared_txs).await?;
|
||||
|
||||
return Ok(false);
|
||||
return Ok(IncomingBlockOk::AddedToAltChain);
|
||||
}
|
||||
|
||||
let VerifyBlockResponse::MainChain(verified_block) = self
|
||||
|
@ -91,7 +91,7 @@ impl super::BlockchainManager {
|
|||
})
|
||||
.await?
|
||||
else {
|
||||
panic!("Incorrect response!");
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
|
||||
|
@ -100,7 +100,7 @@ impl super::BlockchainManager {
|
|||
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
|
||||
.await;
|
||||
|
||||
Ok(true)
|
||||
Ok(IncomingBlockOk::AddedToMainChain)
|
||||
}
|
||||
|
||||
/// Handle an incoming [`BlockBatch`].
|
||||
|
@ -160,7 +160,7 @@ impl super::BlockchainManager {
|
|||
self.stop_current_block_downloader.notify_one();
|
||||
return;
|
||||
}
|
||||
_ => panic!("Incorrect response!"),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
for (block, txs) in prepped_blocks {
|
||||
|
@ -179,7 +179,7 @@ impl super::BlockchainManager {
|
|||
self.stop_current_block_downloader.notify_one();
|
||||
return;
|
||||
}
|
||||
_ => panic!("Incorrect response!"),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
self.add_valid_block_to_main_chain(verified_block).await;
|
||||
|
@ -226,16 +226,14 @@ impl super::BlockchainManager {
|
|||
self.stop_current_block_downloader.notify_one();
|
||||
return;
|
||||
}
|
||||
// the chain was reorged
|
||||
Ok(true) => {
|
||||
Ok(AddAltBlock::Reorged) => {
|
||||
// Collect the remaining blocks and add them to the main chain instead.
|
||||
batch.blocks = blocks.collect();
|
||||
self.handle_incoming_block_batch_main_chain(batch).await;
|
||||
|
||||
return;
|
||||
}
|
||||
// continue adding alt blocks.
|
||||
Ok(false) => (),
|
||||
Ok(AddAltBlock::Cached) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -258,11 +256,11 @@ impl super::BlockchainManager {
|
|||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
pub async fn handle_incoming_alt_block(
|
||||
async fn handle_incoming_alt_block(
|
||||
&mut self,
|
||||
block: Block,
|
||||
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
) -> Result<bool, anyhow::Error> {
|
||||
) -> Result<AddAltBlock, anyhow::Error> {
|
||||
let VerifyBlockResponse::AltChain(alt_block_info) = self
|
||||
.block_verifier_service
|
||||
.ready()
|
||||
|
@ -274,7 +272,7 @@ impl super::BlockchainManager {
|
|||
})
|
||||
.await?
|
||||
else {
|
||||
panic!("Incorrect response!");
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
// TODO: check in consensus crate if alt block with this hash already exists.
|
||||
|
@ -284,7 +282,7 @@ impl super::BlockchainManager {
|
|||
> self.cached_blockchain_context.cumulative_difficulty
|
||||
{
|
||||
self.try_do_reorg(alt_block_info).await?;
|
||||
return Ok(true);
|
||||
return Ok(AddAltBlock::Reorged);
|
||||
}
|
||||
|
||||
self.blockchain_write_handle
|
||||
|
@ -294,7 +292,7 @@ impl super::BlockchainManager {
|
|||
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
|
||||
.await?;
|
||||
|
||||
Ok(false)
|
||||
Ok(AddAltBlock::Cached)
|
||||
}
|
||||
|
||||
/// Attempt a re-org with the given top block of the alt-chain.
|
||||
|
@ -328,7 +326,7 @@ impl super::BlockchainManager {
|
|||
))
|
||||
.await?
|
||||
else {
|
||||
panic!("Incorrect response!");
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
alt_blocks.push(top_alt_block);
|
||||
|
@ -347,7 +345,7 @@ impl super::BlockchainManager {
|
|||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
panic!("Incorrect response!");
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
self.blockchain_context_service
|
||||
|
@ -409,7 +407,7 @@ impl super::BlockchainManager {
|
|||
})
|
||||
.await?
|
||||
else {
|
||||
panic!("Incorrect response!");
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
self.add_valid_block_to_main_chain(verified_block).await;
|
||||
|
@ -465,9 +463,17 @@ impl super::BlockchainManager {
|
|||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
panic!("Incorrect response!");
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
||||
}
|
||||
}
|
||||
|
||||
/// The result from successfully adding an alt-block.
|
||||
enum AddAltBlock {
|
||||
/// The alt-block was cached.
|
||||
Cached,
|
||||
/// The chain was reorged.
|
||||
Reorged,
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue