mirror of
https://github.com/hinto-janai/cuprate.git
synced 2025-01-22 02:34:29 +00:00
Merge branch 'cuprated-blockchain' into p2p-request-handler
This commit is contained in:
commit
048af975f3
10 changed files with 166 additions and 137 deletions
|
@ -19,6 +19,7 @@ use cuprate_types::{
|
||||||
|
|
||||||
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
|
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
|
||||||
|
|
||||||
|
mod chain_service;
|
||||||
pub mod interface;
|
pub mod interface;
|
||||||
mod manager;
|
mod manager;
|
||||||
mod syncer;
|
mod syncer;
|
||||||
|
|
72
binaries/cuprated/src/blockchain/chain_service.rs
Normal file
72
binaries/cuprated/src/blockchain/chain_service.rs
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||||
|
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
|
||||||
|
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||||
|
|
||||||
|
/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out
|
||||||
|
/// what blocks we need.
|
||||||
|
///
|
||||||
|
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ChainService(pub BlockchainReadHandle);
|
||||||
|
|
||||||
|
impl Service<ChainSvcRequest> for ChainService {
|
||||||
|
type Response = ChainSvcResponse;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.0.poll_ready(cx).map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
|
||||||
|
let map_res = |res: BlockchainResponse| match res {
|
||||||
|
BlockchainResponse::CompactChainHistory {
|
||||||
|
block_ids,
|
||||||
|
cumulative_difficulty,
|
||||||
|
} => ChainSvcResponse::CompactHistory {
|
||||||
|
block_ids,
|
||||||
|
cumulative_difficulty,
|
||||||
|
},
|
||||||
|
BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res),
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
|
||||||
|
match req {
|
||||||
|
ChainSvcRequest::CompactHistory => self
|
||||||
|
.0
|
||||||
|
.call(BlockchainReadRequest::CompactChainHistory)
|
||||||
|
.map_ok(map_res)
|
||||||
|
.map_err(Into::into)
|
||||||
|
.boxed(),
|
||||||
|
ChainSvcRequest::FindFirstUnknown(req) => self
|
||||||
|
.0
|
||||||
|
.call(BlockchainReadRequest::FindFirstUnknown(req))
|
||||||
|
.map_ok(map_res)
|
||||||
|
.map_err(Into::into)
|
||||||
|
.boxed(),
|
||||||
|
ChainSvcRequest::CumulativeDifficulty => self
|
||||||
|
.0
|
||||||
|
.call(BlockchainReadRequest::CompactChainHistory)
|
||||||
|
.map_ok(|res| {
|
||||||
|
// TODO create a custom request instead of hijacking this one.
|
||||||
|
// TODO: use the context cache.
|
||||||
|
let BlockchainResponse::CompactChainHistory {
|
||||||
|
cumulative_difficulty,
|
||||||
|
..
|
||||||
|
} = res
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty)
|
||||||
|
})
|
||||||
|
.map_err(Into::into)
|
||||||
|
.boxed(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,10 +1,10 @@
|
||||||
//! The blockchain manger interface.
|
//! The blockchain manager interface.
|
||||||
//!
|
//!
|
||||||
//! This module contains all the functions to mutate the blockchain's state in any way, through the
|
//! This module contains all the functions to mutate the blockchain's state in any way, through the
|
||||||
//! blockchain manger.
|
//! blockchain manager.
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
sync::{Mutex, OnceLock},
|
sync::{LazyLock, Mutex, OnceLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
use monero_serai::{block::Block, transaction::Transaction};
|
use monero_serai::{block::Block, transaction::Transaction};
|
||||||
|
@ -21,18 +21,15 @@ use cuprate_types::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR,
|
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
|
||||||
|
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manger.
|
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
|
||||||
pub static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
|
|
||||||
|
|
||||||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
|
||||||
///
|
///
|
||||||
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
|
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
|
||||||
/// time for new blocks so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
|
/// in this file document what happens if this is not initialized when they are called.
|
||||||
/// which are also more expensive than `Mutex`s.
|
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
|
||||||
pub static BLOCKS_BEING_HANDLED: OnceLock<Mutex<HashSet<[u8; 32]>>> = OnceLock::new();
|
|
||||||
|
|
||||||
/// An error that can be returned from [`handle_incoming_block`].
|
/// An error that can be returned from [`handle_incoming_block`].
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
@ -52,10 +49,7 @@ pub enum IncomingBlockError {
|
||||||
|
|
||||||
/// Try to add a new block to the blockchain.
|
/// Try to add a new block to the blockchain.
|
||||||
///
|
///
|
||||||
/// This returns a [`bool`] indicating if the block was added to the main-chain ([`true`]) or an alt-chain
|
/// On success returns [`IncomingBlockOk`].
|
||||||
/// ([`false`]).
|
|
||||||
///
|
|
||||||
/// If we already knew about this block or the blockchain manger is not setup yet `Ok(false)` is returned.
|
|
||||||
///
|
///
|
||||||
/// # Errors
|
/// # Errors
|
||||||
///
|
///
|
||||||
|
@ -67,7 +61,17 @@ pub async fn handle_incoming_block(
|
||||||
block: Block,
|
block: Block,
|
||||||
given_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
given_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||||
) -> Result<bool, IncomingBlockError> {
|
) -> Result<IncomingBlockOk, IncomingBlockError> {
|
||||||
|
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
||||||
|
///
|
||||||
|
/// This lock prevents sending the same block to the blockchain manager from multiple connections
|
||||||
|
/// before one of them actually gets added to the chain, allowing peers to do other things.
|
||||||
|
///
|
||||||
|
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
|
||||||
|
/// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
|
||||||
|
/// which are also more expensive than `Mutex`s.
|
||||||
|
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
|
||||||
|
LazyLock::new(|| Mutex::new(HashSet::new()));
|
||||||
// FIXME: we should look in the tx-pool for txs when that is ready.
|
// FIXME: we should look in the tx-pool for txs when that is ready.
|
||||||
|
|
||||||
if !block_exists(block.header.previous, blockchain_read_handle)
|
if !block_exists(block.header.previous, blockchain_read_handle)
|
||||||
|
@ -83,7 +87,7 @@ pub async fn handle_incoming_block(
|
||||||
.await
|
.await
|
||||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
{
|
{
|
||||||
return Ok(false);
|
return Ok(IncomingBlockOk::AlreadyHave);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove this when we have a working tx-pool.
|
// TODO: remove this when we have a working tx-pool.
|
||||||
|
@ -97,20 +101,14 @@ pub async fn handle_incoming_block(
|
||||||
// TODO: check we actually got given the right txs.
|
// TODO: check we actually got given the right txs.
|
||||||
|
|
||||||
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
||||||
// We could still be starting up the blockchain manger, so just return this as there is nothing
|
// We could still be starting up the blockchain manager.
|
||||||
// else we can do.
|
return Ok(IncomingBlockOk::NotReady);
|
||||||
return Ok(false);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Add the blocks hash to the blocks being handled.
|
// Add the blocks hash to the blocks being handled.
|
||||||
if !BLOCKS_BEING_HANDLED
|
if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
|
||||||
.get_or_init(|| Mutex::new(HashSet::new()))
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(block_hash)
|
|
||||||
{
|
|
||||||
// If another place is already adding this block then we can stop.
|
// If another place is already adding this block then we can stop.
|
||||||
return Ok(false);
|
return Ok(IncomingBlockOk::AlreadyHave);
|
||||||
}
|
}
|
||||||
|
|
||||||
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
|
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
|
||||||
|
@ -132,12 +130,7 @@ pub async fn handle_incoming_block(
|
||||||
.map_err(IncomingBlockError::InvalidBlock);
|
.map_err(IncomingBlockError::InvalidBlock);
|
||||||
|
|
||||||
// Remove the block hash from the blocks being handled.
|
// Remove the block hash from the blocks being handled.
|
||||||
BLOCKS_BEING_HANDLED
|
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);
|
||||||
.get()
|
|
||||||
.unwrap()
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.remove(&block_hash);
|
|
||||||
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
@ -153,7 +146,7 @@ async fn block_exists(
|
||||||
.call(BlockchainReadRequest::FindBlock(block_hash))
|
.call(BlockchainReadRequest::FindBlock(block_hash))
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Invalid blockchain response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(chain.is_some())
|
Ok(chain.is_some())
|
||||||
|
|
|
@ -24,9 +24,9 @@ use cuprate_types::{
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
blockchain::{
|
blockchain::{
|
||||||
|
chain_service::ChainService,
|
||||||
interface::COMMAND_TX,
|
interface::COMMAND_TX,
|
||||||
syncer,
|
syncer,
|
||||||
types::ChainService,
|
|
||||||
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
|
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
|
||||||
},
|
},
|
||||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||||
|
@ -35,13 +35,13 @@ use crate::{
|
||||||
mod commands;
|
mod commands;
|
||||||
mod handler;
|
mod handler;
|
||||||
|
|
||||||
pub use commands::BlockchainManagerCommand;
|
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
|
||||||
|
|
||||||
/// Initialize the blockchain manger.
|
/// Initialize the blockchain manager.
|
||||||
///
|
///
|
||||||
/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
|
/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
|
||||||
/// can be called.
|
/// can be called.
|
||||||
pub async fn init_blockchain_manger(
|
pub async fn init_blockchain_manager(
|
||||||
clearnet_interface: NetworkInterface<ClearNet>,
|
clearnet_interface: NetworkInterface<ClearNet>,
|
||||||
blockchain_write_handle: BlockchainWriteHandle,
|
blockchain_write_handle: BlockchainWriteHandle,
|
||||||
blockchain_read_handle: BlockchainReadHandle,
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
@ -73,10 +73,10 @@ pub async fn init_blockchain_manger(
|
||||||
.await
|
.await
|
||||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
else {
|
else {
|
||||||
panic!("Blockchain context service returned wrong response!");
|
unreachable!()
|
||||||
};
|
};
|
||||||
|
|
||||||
let manger = BlockchainManager {
|
let manager = BlockchainManager {
|
||||||
blockchain_write_handle,
|
blockchain_write_handle,
|
||||||
blockchain_read_handle,
|
blockchain_read_handle,
|
||||||
blockchain_context_service,
|
blockchain_context_service,
|
||||||
|
@ -86,7 +86,7 @@ pub async fn init_blockchain_manger(
|
||||||
broadcast_svc: clearnet_interface.broadcast_svc(),
|
broadcast_svc: clearnet_interface.broadcast_svc(),
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn(manger.run(batch_rx, command_rx));
|
tokio::spawn(manager.run(batch_rx, command_rx));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The blockchain manager.
|
/// The blockchain manager.
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
//! This module contains the commands for th blockchain manager.
|
//! This module contains the commands for the blockchain manager.
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use monero_serai::block::Block;
|
use monero_serai::block::Block;
|
||||||
|
@ -15,6 +15,18 @@ pub enum BlockchainManagerCommand {
|
||||||
/// All the transactions defined in [`Block::transactions`].
|
/// All the transactions defined in [`Block::transactions`].
|
||||||
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||||
/// The channel to send the response down.
|
/// The channel to send the response down.
|
||||||
response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
|
response_tx: oneshot::Sender<Result<IncomingBlockOk, anyhow::Error>>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The [`Ok`] response for an incoming block.
|
||||||
|
pub enum IncomingBlockOk {
|
||||||
|
/// The block was added to the main-chain.
|
||||||
|
AddedToMainChain,
|
||||||
|
/// The blockchain manager is not ready yet.
|
||||||
|
NotReady,
|
||||||
|
/// The block was added to an alt-chain.
|
||||||
|
AddedToAltChain,
|
||||||
|
/// We already have the block.
|
||||||
|
AlreadyHave,
|
||||||
|
}
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
//! The blockchain manger handler functions.
|
//! The blockchain manager handler functions.
|
||||||
use std::{collections::HashMap, sync::Arc};
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::{TryFutureExt, TryStreamExt};
|
use futures::{TryFutureExt, TryStreamExt};
|
||||||
use monero_serai::{block::Block, transaction::Transaction};
|
use monero_serai::{block::Block, transaction::Transaction};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
|
use std::ops::ControlFlow;
|
||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ use cuprate_types::{
|
||||||
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::blockchain::manager::commands::IncomingBlockOk;
|
||||||
use crate::{
|
use crate::{
|
||||||
blockchain::{
|
blockchain::{
|
||||||
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
|
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
|
||||||
|
@ -32,6 +33,11 @@ use crate::{
|
||||||
|
|
||||||
impl super::BlockchainManager {
|
impl super::BlockchainManager {
|
||||||
/// Handle an incoming command from another part of Cuprate.
|
/// Handle an incoming command from another part of Cuprate.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||||
|
/// recover from.
|
||||||
pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
|
pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
|
||||||
match command {
|
match command {
|
||||||
BlockchainManagerCommand::AddBlock {
|
BlockchainManagerCommand::AddBlock {
|
||||||
|
@ -67,17 +73,18 @@ impl super::BlockchainManager {
|
||||||
///
|
///
|
||||||
/// Otherwise, this function will validate and add the block to the main chain.
|
/// Otherwise, this function will validate and add the block to the main chain.
|
||||||
///
|
///
|
||||||
/// On success returns a [`bool`] indicating if the block was added to the main chain ([`true`])
|
/// # Panics
|
||||||
/// or an alt-chain ([`false`]).
|
///
|
||||||
|
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||||
|
/// recover from.
|
||||||
pub async fn handle_incoming_block(
|
pub async fn handle_incoming_block(
|
||||||
&mut self,
|
&mut self,
|
||||||
block: Block,
|
block: Block,
|
||||||
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||||
) -> Result<bool, anyhow::Error> {
|
) -> Result<IncomingBlockOk, anyhow::Error> {
|
||||||
if block.header.previous != self.cached_blockchain_context.top_hash {
|
if block.header.previous != self.cached_blockchain_context.top_hash {
|
||||||
self.handle_incoming_alt_block(block, prepared_txs).await?;
|
self.handle_incoming_alt_block(block, prepared_txs).await?;
|
||||||
|
return Ok(IncomingBlockOk::AddedToAltChain);
|
||||||
return Ok(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let VerifyBlockResponse::MainChain(verified_block) = self
|
let VerifyBlockResponse::MainChain(verified_block) = self
|
||||||
|
@ -91,7 +98,7 @@ impl super::BlockchainManager {
|
||||||
})
|
})
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Incorrect response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
|
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
|
||||||
|
@ -100,7 +107,7 @@ impl super::BlockchainManager {
|
||||||
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
|
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
Ok(true)
|
Ok(IncomingBlockOk::AddedToMainChain)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle an incoming [`BlockBatch`].
|
/// Handle an incoming [`BlockBatch`].
|
||||||
|
@ -160,7 +167,7 @@ impl super::BlockchainManager {
|
||||||
self.stop_current_block_downloader.notify_one();
|
self.stop_current_block_downloader.notify_one();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_ => panic!("Incorrect response!"),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
for (block, txs) in prepped_blocks {
|
for (block, txs) in prepped_blocks {
|
||||||
|
@ -179,7 +186,7 @@ impl super::BlockchainManager {
|
||||||
self.stop_current_block_downloader.notify_one();
|
self.stop_current_block_downloader.notify_one();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_ => panic!("Incorrect response!"),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
self.add_valid_block_to_main_chain(verified_block).await;
|
self.add_valid_block_to_main_chain(verified_block).await;
|
||||||
|
@ -226,16 +233,14 @@ impl super::BlockchainManager {
|
||||||
self.stop_current_block_downloader.notify_one();
|
self.stop_current_block_downloader.notify_one();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// the chain was reorged
|
Ok(AddAltBlock::Reorged) => {
|
||||||
Ok(true) => {
|
|
||||||
// Collect the remaining blocks and add them to the main chain instead.
|
// Collect the remaining blocks and add them to the main chain instead.
|
||||||
batch.blocks = blocks.collect();
|
batch.blocks = blocks.collect();
|
||||||
self.handle_incoming_block_batch_main_chain(batch).await;
|
self.handle_incoming_block_batch_main_chain(batch).await;
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// continue adding alt blocks.
|
// continue adding alt blocks.
|
||||||
Ok(false) => (),
|
Ok(AddAltBlock::Cached) => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -246,8 +251,6 @@ impl super::BlockchainManager {
|
||||||
/// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
|
/// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
|
||||||
/// the alt block to the alt block cache.
|
/// the alt block to the alt block cache.
|
||||||
///
|
///
|
||||||
/// This function returns a [`bool`] indicating if the chain was reorganised ([`true`]) or not ([`false`]).
|
|
||||||
///
|
|
||||||
/// # Errors
|
/// # Errors
|
||||||
///
|
///
|
||||||
/// This will return an [`Err`] if:
|
/// This will return an [`Err`] if:
|
||||||
|
@ -258,11 +261,11 @@ impl super::BlockchainManager {
|
||||||
///
|
///
|
||||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||||
/// recover from.
|
/// recover from.
|
||||||
pub async fn handle_incoming_alt_block(
|
async fn handle_incoming_alt_block(
|
||||||
&mut self,
|
&mut self,
|
||||||
block: Block,
|
block: Block,
|
||||||
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||||
) -> Result<bool, anyhow::Error> {
|
) -> Result<AddAltBlock, anyhow::Error> {
|
||||||
let VerifyBlockResponse::AltChain(alt_block_info) = self
|
let VerifyBlockResponse::AltChain(alt_block_info) = self
|
||||||
.block_verifier_service
|
.block_verifier_service
|
||||||
.ready()
|
.ready()
|
||||||
|
@ -274,7 +277,7 @@ impl super::BlockchainManager {
|
||||||
})
|
})
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Incorrect response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: check in consensus crate if alt block with this hash already exists.
|
// TODO: check in consensus crate if alt block with this hash already exists.
|
||||||
|
@ -284,7 +287,7 @@ impl super::BlockchainManager {
|
||||||
> self.cached_blockchain_context.cumulative_difficulty
|
> self.cached_blockchain_context.cumulative_difficulty
|
||||||
{
|
{
|
||||||
self.try_do_reorg(alt_block_info).await?;
|
self.try_do_reorg(alt_block_info).await?;
|
||||||
return Ok(true);
|
return Ok(AddAltBlock::Reorged);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.blockchain_write_handle
|
self.blockchain_write_handle
|
||||||
|
@ -294,7 +297,7 @@ impl super::BlockchainManager {
|
||||||
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
|
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(false)
|
Ok(AddAltBlock::Cached)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt a re-org with the given top block of the alt-chain.
|
/// Attempt a re-org with the given top block of the alt-chain.
|
||||||
|
@ -328,7 +331,7 @@ impl super::BlockchainManager {
|
||||||
))
|
))
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Incorrect response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
alt_blocks.push(top_alt_block);
|
alt_blocks.push(top_alt_block);
|
||||||
|
@ -347,7 +350,7 @@ impl super::BlockchainManager {
|
||||||
.await
|
.await
|
||||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
else {
|
else {
|
||||||
panic!("Incorrect response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
self.blockchain_context_service
|
self.blockchain_context_service
|
||||||
|
@ -409,7 +412,7 @@ impl super::BlockchainManager {
|
||||||
})
|
})
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Incorrect response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
self.add_valid_block_to_main_chain(verified_block).await;
|
self.add_valid_block_to_main_chain(verified_block).await;
|
||||||
|
@ -465,9 +468,17 @@ impl super::BlockchainManager {
|
||||||
.await
|
.await
|
||||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
else {
|
else {
|
||||||
panic!("Incorrect response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The result from successfully adding an alt-block.
|
||||||
|
enum AddAltBlock {
|
||||||
|
/// The alt-block was cached.
|
||||||
|
Cached,
|
||||||
|
/// The chain was reorged.
|
||||||
|
Reorged,
|
||||||
|
}
|
||||||
|
|
|
@ -65,7 +65,7 @@ where
|
||||||
.call(BlockChainContextRequest::GetContext)
|
.call(BlockChainContextRequest::GetContext)
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Blockchain context service returned wrong response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
let client_pool = clearnet_interface.client_pool();
|
let client_pool = clearnet_interface.client_pool();
|
||||||
|
@ -130,7 +130,7 @@ where
|
||||||
.oneshot(BlockChainContextRequest::GetContext)
|
.oneshot(BlockChainContextRequest::GetContext)
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
panic!("Blockchain context service returned wrong response!");
|
unreachable!();
|
||||||
};
|
};
|
||||||
|
|
||||||
*old_context = ctx;
|
*old_context = ctx;
|
||||||
|
|
|
@ -22,67 +22,3 @@ pub type ConcreteTxVerifierService = TxVerifierService<ConsensusBlockchainReadHa
|
||||||
/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires.
|
/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires.
|
||||||
pub type ConsensusBlockchainReadHandle =
|
pub type ConsensusBlockchainReadHandle =
|
||||||
MapErr<BlockchainReadHandle, fn(RuntimeError) -> tower::BoxError>;
|
MapErr<BlockchainReadHandle, fn(RuntimeError) -> tower::BoxError>;
|
||||||
|
|
||||||
/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out
|
|
||||||
/// what blocks we need.
|
|
||||||
///
|
|
||||||
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ChainService(pub BlockchainReadHandle);
|
|
||||||
|
|
||||||
impl Service<ChainSvcRequest> for ChainService {
|
|
||||||
type Response = ChainSvcResponse;
|
|
||||||
type Error = tower::BoxError;
|
|
||||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
||||||
self.0.poll_ready(cx).map_err(Into::into)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
|
|
||||||
let map_res = |res: BlockchainResponse| match res {
|
|
||||||
BlockchainResponse::CompactChainHistory {
|
|
||||||
block_ids,
|
|
||||||
cumulative_difficulty,
|
|
||||||
} => ChainSvcResponse::CompactHistory {
|
|
||||||
block_ids,
|
|
||||||
cumulative_difficulty,
|
|
||||||
},
|
|
||||||
BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res),
|
|
||||||
_ => panic!("Blockchain returned wrong response"),
|
|
||||||
};
|
|
||||||
|
|
||||||
match req {
|
|
||||||
ChainSvcRequest::CompactHistory => self
|
|
||||||
.0
|
|
||||||
.call(BlockchainReadRequest::CompactChainHistory)
|
|
||||||
.map_ok(map_res)
|
|
||||||
.map_err(Into::into)
|
|
||||||
.boxed(),
|
|
||||||
ChainSvcRequest::FindFirstUnknown(req) => self
|
|
||||||
.0
|
|
||||||
.call(BlockchainReadRequest::FindFirstUnknown(req))
|
|
||||||
.map_ok(map_res)
|
|
||||||
.map_err(Into::into)
|
|
||||||
.boxed(),
|
|
||||||
ChainSvcRequest::CumulativeDifficulty => self
|
|
||||||
.0
|
|
||||||
.call(BlockchainReadRequest::CompactChainHistory)
|
|
||||||
.map_ok(|res| {
|
|
||||||
// TODO create a custom request instead of hijacking this one.
|
|
||||||
// TODO: use the context cache.
|
|
||||||
let BlockchainResponse::CompactChainHistory {
|
|
||||||
cumulative_difficulty,
|
|
||||||
..
|
|
||||||
} = res
|
|
||||||
else {
|
|
||||||
panic!("Blockchain returned wrong response");
|
|
||||||
};
|
|
||||||
|
|
||||||
ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty)
|
|
||||||
})
|
|
||||||
.map_err(Into::into)
|
|
||||||
.boxed(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ pub const VERSION_BUILD: &str = if cfg!(debug_assertions) {
|
||||||
formatcp!("{VERSION}-release")
|
formatcp!("{VERSION}-release")
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// The panic message used when cuprated encounters a critical service error.
|
||||||
pub const PANIC_CRITICAL_SERVICE_ERROR: &str =
|
pub const PANIC_CRITICAL_SERVICE_ERROR: &str =
|
||||||
"A service critical to Cuprate's function returned an unexpected error.";
|
"A service critical to Cuprate's function returned an unexpected error.";
|
||||||
|
|
||||||
|
|
|
@ -6,4 +6,7 @@ use tokio::sync::RwLock;
|
||||||
///
|
///
|
||||||
/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken
|
/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken
|
||||||
/// for any operation which must complete without a reorg happening.
|
/// for any operation which must complete without a reorg happening.
|
||||||
|
///
|
||||||
|
/// Currently, the only operation that needs to take a read lock is adding txs to the tx-pool,
|
||||||
|
/// this can potentially be removed in the future, see: TODO
|
||||||
pub static REORG_LOCK: RwLock<()> = RwLock::const_new(());
|
pub static REORG_LOCK: RwLock<()> = RwLock::const_new(());
|
||||||
|
|
Loading…
Reference in a new issue