mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-12-22 11:39:26 +00:00
cuprated: Blockchain Manager (#274)
Some checks failed
CI / fmt (push) Waiting to run
CI / typo (push) Waiting to run
CI / ci (macos-latest, stable, bash) (push) Waiting to run
CI / ci (ubuntu-latest, stable, bash) (push) Waiting to run
CI / ci (windows-latest, stable-x86_64-pc-windows-gnu, msys2 {0}) (push) Waiting to run
Doc / build (push) Waiting to run
Doc / deploy (push) Blocked by required conditions
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled
Some checks failed
CI / fmt (push) Waiting to run
CI / typo (push) Waiting to run
CI / ci (macos-latest, stable, bash) (push) Waiting to run
CI / ci (ubuntu-latest, stable, bash) (push) Waiting to run
CI / ci (windows-latest, stable-x86_64-pc-windows-gnu, msys2 {0}) (push) Waiting to run
Doc / build (push) Waiting to run
Doc / deploy (push) Blocked by required conditions
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled
* add cuprated skeleton * fmt and add deny exception * add main chain batch handler * add blockchain init * very rough block manager * misc changes * move more config values * add new tables & types * add function to fully add an alt block * resolve current todo!s * add new requests * WIP: starting re-orgs * add last service request * commit Cargo.lock * add test * more docs + cleanup + alt blocks request * clippy + fmt * document types * move tx_fee to helper * more doc updates * fmt * fix imports * remove config files * fix merge errors * fix generated coins * handle more p2p requests + alt blocks * clean up handler code * add function for incoming blocks * add docs to handler functions * broadcast new blocks + add commands * add fluffy block handler * fix new block handling * small cleanup * increase outbound peer count * fix merge * clean up the blockchain manger * add more docs + cleanup imports * fix typo * fix doc * remove unrelated changes * improve interface globals * manger -> manager * enums instead of bools * move chain service to separate file * more review fixes * add link to issue * fix syncer + update comment * fmt
This commit is contained in:
parent
00bdd6ffaa
commit
8be369846e
22 changed files with 1275 additions and 31 deletions
48
Cargo.lock
generated
48
Cargo.lock
generated
|
@ -1906,6 +1906,16 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.19"
|
||||
|
@ -1943,6 +1953,12 @@ version = "0.2.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "page_size"
|
||||
version = "0.6.0"
|
||||
|
@ -2564,6 +2580,15 @@ dependencies = [
|
|||
"keccak",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sharded-slab"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "1.3.0"
|
||||
|
@ -2960,6 +2985,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2968,7 +3005,12 @@ version = "0.3.18"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
|
||||
dependencies = [
|
||||
"nu-ansi-term",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3051,6 +3093,12 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.5"
|
||||
|
|
|
@ -23,7 +23,7 @@ cuprate-p2p-core = { path = "../../p2p/p2p-core" }
|
|||
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" }
|
||||
cuprate-async-buffer = { path = "../../p2p/async-buffer" }
|
||||
cuprate-address-book = { path = "../../p2p/address-book" }
|
||||
cuprate-blockchain = { path = "../../storage/blockchain" }
|
||||
cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] }
|
||||
cuprate-database-service = { path = "../../storage/service" }
|
||||
cuprate-txpool = { path = "../../storage/txpool" }
|
||||
cuprate-database = { path = "../../storage/database" }
|
||||
|
@ -70,8 +70,14 @@ tokio-util = { workspace = true }
|
|||
tokio-stream = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tower = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["std", "fmt", "default"] }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[profile.dev]
|
||||
panic = "abort"
|
||||
|
||||
[profile.release]
|
||||
panic = "abort"
|
||||
|
|
|
@ -1,6 +1,101 @@
|
|||
//! Blockchain
|
||||
//!
|
||||
//! Will contain the chain manager and syncer.
|
||||
//! Contains the blockchain manager, syncer and an interface to mutate the blockchain.
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
use tokio::sync::{mpsc, Notify};
|
||||
use tower::{BoxError, Service, ServiceExt};
|
||||
|
||||
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
|
||||
use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig};
|
||||
use cuprate_cryptonight::cryptonight_hash_v0;
|
||||
use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface};
|
||||
use cuprate_p2p_core::{ClearNet, Network};
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainWriteRequest},
|
||||
VerifiedBlockInformation,
|
||||
};
|
||||
|
||||
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
|
||||
|
||||
mod chain_service;
|
||||
pub mod interface;
|
||||
mod manager;
|
||||
mod syncer;
|
||||
mod types;
|
||||
|
||||
use types::{
|
||||
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
|
||||
};
|
||||
|
||||
/// Checks if the genesis block is in the blockchain and adds it if not.
|
||||
pub async fn check_add_genesis(
|
||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||
blockchain_write_handle: &mut BlockchainWriteHandle,
|
||||
network: Network,
|
||||
) {
|
||||
// Try to get the chain height, will fail if the genesis block is not in the DB.
|
||||
if blockchain_read_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockchainReadRequest::ChainHeight)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let genesis = generate_genesis_block(network);
|
||||
|
||||
assert_eq!(genesis.miner_transaction.prefix().outputs.len(), 1);
|
||||
assert!(genesis.transactions.is_empty());
|
||||
|
||||
blockchain_write_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockchainWriteRequest::WriteBlock(
|
||||
VerifiedBlockInformation {
|
||||
block_blob: genesis.serialize(),
|
||||
txs: vec![],
|
||||
block_hash: genesis.hash(),
|
||||
pow_hash: cryptonight_hash_v0(&genesis.serialize_pow_hash()),
|
||||
height: 0,
|
||||
generated_coins: genesis.miner_transaction.prefix().outputs[0]
|
||||
.amount
|
||||
.unwrap(),
|
||||
weight: genesis.miner_transaction.weight(),
|
||||
long_term_weight: genesis.miner_transaction.weight(),
|
||||
cumulative_difficulty: 1,
|
||||
block: genesis,
|
||||
},
|
||||
))
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
}
|
||||
|
||||
/// Initializes the consensus services.
|
||||
pub async fn init_consensus(
|
||||
blockchain_read_handle: BlockchainReadHandle,
|
||||
context_config: ContextConfig,
|
||||
) -> Result<
|
||||
(
|
||||
ConcreteBlockVerifierService,
|
||||
ConcreteTxVerifierService,
|
||||
BlockChainContextService,
|
||||
),
|
||||
BoxError,
|
||||
> {
|
||||
let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from);
|
||||
|
||||
let ctx_service =
|
||||
cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone())
|
||||
.await?;
|
||||
|
||||
let (block_verifier_svc, tx_verifier_svc) =
|
||||
cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone());
|
||||
|
||||
Ok((block_verifier_svc, tx_verifier_svc, ctx_service))
|
||||
}
|
||||
|
|
72
binaries/cuprated/src/blockchain/chain_service.rs
Normal file
72
binaries/cuprated/src/blockchain/chain_service.rs
Normal file
|
@ -0,0 +1,72 @@
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
|
||||
use tower::Service;
|
||||
|
||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
|
||||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||
|
||||
/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out
|
||||
/// what blocks we need.
|
||||
///
|
||||
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier.
|
||||
#[derive(Clone)]
|
||||
pub struct ChainService(pub BlockchainReadHandle);
|
||||
|
||||
impl Service<ChainSvcRequest> for ChainService {
|
||||
type Response = ChainSvcResponse;
|
||||
type Error = tower::BoxError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(cx).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
|
||||
let map_res = |res: BlockchainResponse| match res {
|
||||
BlockchainResponse::CompactChainHistory {
|
||||
block_ids,
|
||||
cumulative_difficulty,
|
||||
} => ChainSvcResponse::CompactHistory {
|
||||
block_ids,
|
||||
cumulative_difficulty,
|
||||
},
|
||||
BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
match req {
|
||||
ChainSvcRequest::CompactHistory => self
|
||||
.0
|
||||
.call(BlockchainReadRequest::CompactChainHistory)
|
||||
.map_ok(map_res)
|
||||
.map_err(Into::into)
|
||||
.boxed(),
|
||||
ChainSvcRequest::FindFirstUnknown(req) => self
|
||||
.0
|
||||
.call(BlockchainReadRequest::FindFirstUnknown(req))
|
||||
.map_ok(map_res)
|
||||
.map_err(Into::into)
|
||||
.boxed(),
|
||||
ChainSvcRequest::CumulativeDifficulty => self
|
||||
.0
|
||||
.call(BlockchainReadRequest::CompactChainHistory)
|
||||
.map_ok(|res| {
|
||||
// TODO create a custom request instead of hijacking this one.
|
||||
// TODO: use the context cache.
|
||||
let BlockchainResponse::CompactChainHistory {
|
||||
cumulative_difficulty,
|
||||
..
|
||||
} = res
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty)
|
||||
})
|
||||
.map_err(Into::into)
|
||||
.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
161
binaries/cuprated/src/blockchain/interface.rs
Normal file
161
binaries/cuprated/src/blockchain/interface.rs
Normal file
|
@ -0,0 +1,161 @@
|
|||
//! The blockchain manager interface.
|
||||
//!
|
||||
//! This module contains all the functions to mutate the blockchain's state in any way, through the
|
||||
//! blockchain manager.
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::{LazyLock, Mutex, OnceLock},
|
||||
};
|
||||
|
||||
use monero_serai::{block::Block, transaction::Transaction};
|
||||
use rayon::prelude::*;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||
use cuprate_consensus::transactions::new_tx_verification_data;
|
||||
use cuprate_helper::cast::usize_to_u64;
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||
Chain,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
|
||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
};
|
||||
|
||||
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
|
||||
///
|
||||
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
|
||||
/// in this file document what happens if this is not initialized when they are called.
|
||||
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
|
||||
|
||||
/// An error that can be returned from [`handle_incoming_block`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum IncomingBlockError {
|
||||
/// Some transactions in the block were unknown.
|
||||
///
|
||||
/// The inner values are the block hash and the indexes of the missing txs in the block.
|
||||
#[error("Unknown transactions in block.")]
|
||||
UnknownTransactions([u8; 32], Vec<u64>),
|
||||
/// We are missing the block's parent.
|
||||
#[error("The block has an unknown parent.")]
|
||||
Orphan,
|
||||
/// The block was invalid.
|
||||
#[error(transparent)]
|
||||
InvalidBlock(anyhow::Error),
|
||||
}
|
||||
|
||||
/// Try to add a new block to the blockchain.
|
||||
///
|
||||
/// On success returns [`IncomingBlockOk`].
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function will return an error if:
|
||||
/// - the block was invalid
|
||||
/// - we are missing transactions
|
||||
/// - the block's parent is unknown
|
||||
pub async fn handle_incoming_block(
|
||||
block: Block,
|
||||
given_txs: Vec<Transaction>,
|
||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||
) -> Result<IncomingBlockOk, IncomingBlockError> {
|
||||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
||||
///
|
||||
/// This lock prevents sending the same block to the blockchain manager from multiple connections
|
||||
/// before one of them actually gets added to the chain, allowing peers to do other things.
|
||||
///
|
||||
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
|
||||
/// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
|
||||
/// which are also more expensive than `Mutex`s.
|
||||
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
|
||||
LazyLock::new(|| Mutex::new(HashSet::new()));
|
||||
// FIXME: we should look in the tx-pool for txs when that is ready.
|
||||
|
||||
if !block_exists(block.header.previous, blockchain_read_handle)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
{
|
||||
return Err(IncomingBlockError::Orphan);
|
||||
}
|
||||
|
||||
let block_hash = block.hash();
|
||||
|
||||
if block_exists(block_hash, blockchain_read_handle)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
{
|
||||
return Ok(IncomingBlockOk::AlreadyHave);
|
||||
}
|
||||
|
||||
// TODO: remove this when we have a working tx-pool.
|
||||
if given_txs.len() != block.transactions.len() {
|
||||
return Err(IncomingBlockError::UnknownTransactions(
|
||||
block_hash,
|
||||
(0..usize_to_u64(block.transactions.len())).collect(),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO: check we actually got given the right txs.
|
||||
let prepped_txs = given_txs
|
||||
.into_par_iter()
|
||||
.map(|tx| {
|
||||
let tx = new_tx_verification_data(tx)?;
|
||||
Ok((tx.tx_hash, tx))
|
||||
})
|
||||
.collect::<Result<_, anyhow::Error>>()
|
||||
.map_err(IncomingBlockError::InvalidBlock)?;
|
||||
|
||||
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
||||
// We could still be starting up the blockchain manager.
|
||||
return Ok(IncomingBlockOk::NotReady);
|
||||
};
|
||||
|
||||
// Add the blocks hash to the blocks being handled.
|
||||
if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
|
||||
// If another place is already adding this block then we can stop.
|
||||
return Ok(IncomingBlockOk::AlreadyHave);
|
||||
}
|
||||
|
||||
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
|
||||
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
|
||||
incoming_block_tx
|
||||
.send(BlockchainManagerCommand::AddBlock {
|
||||
block,
|
||||
prepped_txs,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.expect("TODO: don't actually panic here, an err means we are shutting down");
|
||||
|
||||
let res = response_rx
|
||||
.await
|
||||
.expect("The blockchain manager will always respond")
|
||||
.map_err(IncomingBlockError::InvalidBlock);
|
||||
|
||||
// Remove the block hash from the blocks being handled.
|
||||
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Check if we have a block with the given hash.
|
||||
async fn block_exists(
|
||||
block_hash: [u8; 32],
|
||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||
) -> Result<bool, anyhow::Error> {
|
||||
let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
|
||||
.ready()
|
||||
.await?
|
||||
.call(BlockchainReadRequest::FindBlock(block_hash))
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
Ok(chain.is_some())
|
||||
}
|
|
@ -1 +1,143 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use futures::StreamExt;
|
||||
use monero_serai::block::Block;
|
||||
use tokio::sync::{mpsc, oneshot, Notify};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::error;
|
||||
|
||||
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
|
||||
use cuprate_consensus::{
|
||||
context::RawBlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
|
||||
BlockChainContextService, BlockVerifierService, ExtendedConsensusError, TxVerifierService,
|
||||
VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
|
||||
};
|
||||
use cuprate_p2p::{
|
||||
block_downloader::{BlockBatch, BlockDownloaderConfig},
|
||||
BroadcastSvc, NetworkInterface,
|
||||
};
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||
Chain, TransactionVerificationData,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
blockchain::{
|
||||
chain_service::ChainService,
|
||||
interface::COMMAND_TX,
|
||||
syncer,
|
||||
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
|
||||
},
|
||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
};
|
||||
|
||||
mod commands;
|
||||
mod handler;
|
||||
|
||||
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
|
||||
|
||||
/// Initialize the blockchain manager.
|
||||
///
|
||||
/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
|
||||
/// can be called.
|
||||
pub async fn init_blockchain_manager(
|
||||
clearnet_interface: NetworkInterface<ClearNet>,
|
||||
blockchain_write_handle: BlockchainWriteHandle,
|
||||
blockchain_read_handle: BlockchainReadHandle,
|
||||
mut blockchain_context_service: BlockChainContextService,
|
||||
block_verifier_service: ConcreteBlockVerifierService,
|
||||
block_downloader_config: BlockDownloaderConfig,
|
||||
) {
|
||||
// TODO: find good values for these size limits
|
||||
let (batch_tx, batch_rx) = mpsc::channel(1);
|
||||
let stop_current_block_downloader = Arc::new(Notify::new());
|
||||
let (command_tx, command_rx) = mpsc::channel(3);
|
||||
|
||||
COMMAND_TX.set(command_tx).unwrap();
|
||||
|
||||
tokio::spawn(syncer::syncer(
|
||||
blockchain_context_service.clone(),
|
||||
ChainService(blockchain_read_handle.clone()),
|
||||
clearnet_interface.clone(),
|
||||
batch_tx,
|
||||
Arc::clone(&stop_current_block_downloader),
|
||||
block_downloader_config,
|
||||
));
|
||||
|
||||
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockChainContextRequest::GetContext)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let manager = BlockchainManager {
|
||||
blockchain_write_handle,
|
||||
blockchain_read_handle,
|
||||
blockchain_context_service,
|
||||
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
|
||||
block_verifier_service,
|
||||
stop_current_block_downloader,
|
||||
broadcast_svc: clearnet_interface.broadcast_svc(),
|
||||
};
|
||||
|
||||
tokio::spawn(manager.run(batch_rx, command_rx));
|
||||
}
|
||||
|
||||
/// The blockchain manager.
|
||||
///
|
||||
/// This handles all mutation of the blockchain, anything that changes the state of the blockchain must
|
||||
/// go through this.
|
||||
///
|
||||
/// Other parts of Cuprate can interface with this by using the functions in [`interface`](super::interface).
|
||||
pub struct BlockchainManager {
|
||||
/// The [`BlockchainWriteHandle`], this is the _only_ part of Cuprate where a [`BlockchainWriteHandle`]
|
||||
/// is held.
|
||||
blockchain_write_handle: BlockchainWriteHandle,
|
||||
/// A [`BlockchainReadHandle`].
|
||||
blockchain_read_handle: BlockchainReadHandle,
|
||||
// TODO: Improve the API of the cache service.
|
||||
// TODO: rename the cache service -> `BlockchainContextService`.
|
||||
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
|
||||
/// values without needing to go to a [`BlockchainReadHandle`].
|
||||
blockchain_context_service: BlockChainContextService,
|
||||
/// A cached context representing the current state.
|
||||
cached_blockchain_context: RawBlockChainContext,
|
||||
/// The block verifier service, to verify incoming blocks.
|
||||
block_verifier_service: ConcreteBlockVerifierService,
|
||||
/// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download
|
||||
/// attempt.
|
||||
stop_current_block_downloader: Arc<Notify>,
|
||||
/// The broadcast service, to broadcast new blocks.
|
||||
broadcast_svc: BroadcastSvc<ClearNet>,
|
||||
}
|
||||
|
||||
impl BlockchainManager {
|
||||
/// The [`BlockchainManager`] task.
|
||||
pub async fn run(
|
||||
mut self,
|
||||
mut block_batch_rx: mpsc::Receiver<BlockBatch>,
|
||||
mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(batch) = block_batch_rx.recv() => {
|
||||
self.handle_incoming_block_batch(
|
||||
batch,
|
||||
).await;
|
||||
}
|
||||
Some(incoming_command) = command_rx.recv() => {
|
||||
self.handle_command(incoming_command).await;
|
||||
}
|
||||
else => {
|
||||
todo!("TODO: exit the BC manager")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
32
binaries/cuprated/src/blockchain/manager/commands.rs
Normal file
32
binaries/cuprated/src/blockchain/manager/commands.rs
Normal file
|
@ -0,0 +1,32 @@
|
|||
//! This module contains the commands for the blockchain manager.
|
||||
use std::collections::HashMap;
|
||||
|
||||
use monero_serai::block::Block;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use cuprate_types::TransactionVerificationData;
|
||||
|
||||
/// The blockchain manager commands.
|
||||
pub enum BlockchainManagerCommand {
|
||||
/// Attempt to add a new block to the blockchain.
|
||||
AddBlock {
|
||||
/// The [`Block`] to add.
|
||||
block: Block,
|
||||
/// All the transactions defined in [`Block::transactions`].
|
||||
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
/// The channel to send the response down.
|
||||
response_tx: oneshot::Sender<Result<IncomingBlockOk, anyhow::Error>>,
|
||||
},
|
||||
}
|
||||
|
||||
/// The [`Ok`] response for an incoming block.
|
||||
pub enum IncomingBlockOk {
|
||||
/// The block was added to the main-chain.
|
||||
AddedToMainChain,
|
||||
/// The blockchain manager is not ready yet.
|
||||
NotReady,
|
||||
/// The block was added to an alt-chain.
|
||||
AddedToAltChain,
|
||||
/// We already have the block.
|
||||
AlreadyHave,
|
||||
}
|
484
binaries/cuprated/src/blockchain/manager/handler.rs
Normal file
484
binaries/cuprated/src/blockchain/manager/handler.rs
Normal file
|
@ -0,0 +1,484 @@
|
|||
//! The blockchain manager handler functions.
|
||||
use bytes::Bytes;
|
||||
use futures::{TryFutureExt, TryStreamExt};
|
||||
use monero_serai::{block::Block, transaction::Transaction};
|
||||
use rayon::prelude::*;
|
||||
use std::ops::ControlFlow;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::info;
|
||||
|
||||
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
|
||||
use cuprate_consensus::{
|
||||
block::PreparedBlock, context::NewBlockData, transactions::new_tx_verification_data,
|
||||
BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService,
|
||||
ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest,
|
||||
VerifyTxResponse,
|
||||
};
|
||||
use cuprate_helper::cast::usize_to_u64;
|
||||
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
|
||||
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
||||
};
|
||||
|
||||
use crate::blockchain::manager::commands::IncomingBlockOk;
|
||||
use crate::{
|
||||
blockchain::{
|
||||
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
|
||||
},
|
||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
signals::REORG_LOCK,
|
||||
};
|
||||
|
||||
impl super::BlockchainManager {
|
||||
/// Handle an incoming command from another part of Cuprate.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
|
||||
match command {
|
||||
BlockchainManagerCommand::AddBlock {
|
||||
block,
|
||||
prepped_txs,
|
||||
response_tx,
|
||||
} => {
|
||||
let res = self.handle_incoming_block(block, prepped_txs).await;
|
||||
|
||||
drop(response_tx.send(res));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast a valid block to the network.
|
||||
async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
|
||||
self.broadcast_svc
|
||||
.ready()
|
||||
.await
|
||||
.expect("Broadcast service is Infallible.")
|
||||
.call(BroadcastRequest::Block {
|
||||
block_bytes,
|
||||
current_blockchain_height: usize_to_u64(blockchain_height),
|
||||
})
|
||||
.await
|
||||
.expect("Broadcast service is Infallible.");
|
||||
}
|
||||
|
||||
/// Handle an incoming [`Block`].
|
||||
///
|
||||
/// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow
|
||||
/// the top of the main chain.
|
||||
///
|
||||
/// Otherwise, this function will validate and add the block to the main chain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
pub async fn handle_incoming_block(
|
||||
&mut self,
|
||||
block: Block,
|
||||
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
) -> Result<IncomingBlockOk, anyhow::Error> {
|
||||
if block.header.previous != self.cached_blockchain_context.top_hash {
|
||||
self.handle_incoming_alt_block(block, prepared_txs).await?;
|
||||
return Ok(IncomingBlockOk::AddedToAltChain);
|
||||
}
|
||||
|
||||
let VerifyBlockResponse::MainChain(verified_block) = self
|
||||
.block_verifier_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(VerifyBlockRequest::MainChain {
|
||||
block,
|
||||
prepared_txs,
|
||||
})
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
|
||||
self.add_valid_block_to_main_chain(verified_block).await;
|
||||
|
||||
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
|
||||
.await;
|
||||
|
||||
Ok(IncomingBlockOk::AddedToMainChain)
|
||||
}
|
||||
|
||||
/// Handle an incoming [`BlockBatch`].
|
||||
///
|
||||
/// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`]
|
||||
/// depending on if the first block in the batch follows from the top of our chain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if the batch is empty or if any internal service returns an unexpected
|
||||
/// error that we cannot recover from or if the incoming batch contains no blocks.
|
||||
pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
|
||||
let (first_block, _) = batch
|
||||
.blocks
|
||||
.first()
|
||||
.expect("Block batch should not be empty");
|
||||
|
||||
if first_block.header.previous == self.cached_blockchain_context.top_hash {
|
||||
self.handle_incoming_block_batch_main_chain(batch).await;
|
||||
} else {
|
||||
self.handle_incoming_block_batch_alt_chain(batch).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles an incoming [`BlockBatch`] that follows the main chain.
|
||||
///
|
||||
/// This function will handle validating the blocks in the batch and adding them to the blockchain
|
||||
/// database and context cache.
|
||||
///
|
||||
/// This function will also handle banning the peer and canceling the block downloader if the
|
||||
/// block is invalid.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from or if the incoming batch contains no blocks.
|
||||
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
|
||||
info!(
|
||||
"Handling batch to main chain height: {}",
|
||||
batch.blocks.first().unwrap().0.number().unwrap()
|
||||
);
|
||||
|
||||
let batch_prep_res = self
|
||||
.block_verifier_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
|
||||
blocks: batch.blocks,
|
||||
})
|
||||
.await;
|
||||
|
||||
let prepped_blocks = match batch_prep_res {
|
||||
Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks,
|
||||
Err(_) => {
|
||||
batch.peer_handle.ban_peer(LONG_BAN);
|
||||
self.stop_current_block_downloader.notify_one();
|
||||
return;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
for (block, txs) in prepped_blocks {
|
||||
let verify_res = self
|
||||
.block_verifier_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
|
||||
.await;
|
||||
|
||||
let verified_block = match verify_res {
|
||||
Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block,
|
||||
Err(_) => {
|
||||
batch.peer_handle.ban_peer(LONG_BAN);
|
||||
self.stop_current_block_downloader.notify_one();
|
||||
return;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
self.add_valid_block_to_main_chain(verified_block).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles an incoming [`BlockBatch`] that does not follow the main-chain.
|
||||
///
|
||||
/// This function will handle validating the alt-blocks to add them to our cache and reorging the
|
||||
/// chain if the alt-chain has a higher cumulative difficulty.
|
||||
///
|
||||
/// This function will also handle banning the peer and canceling the block downloader if the
|
||||
/// alt block is invalid or if a reorg fails.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
|
||||
// TODO: this needs testing (this whole section does but alt-blocks specifically).
|
||||
|
||||
let mut blocks = batch.blocks.into_iter();
|
||||
|
||||
while let Some((block, txs)) = blocks.next() {
|
||||
// async blocks work as try blocks.
|
||||
let res = async {
|
||||
let txs = txs
|
||||
.into_par_iter()
|
||||
.map(|tx| {
|
||||
let tx = new_tx_verification_data(tx)?;
|
||||
Ok((tx.tx_hash, tx))
|
||||
})
|
||||
.collect::<Result<_, anyhow::Error>>()?;
|
||||
|
||||
let reorged = self.handle_incoming_alt_block(block, txs).await?;
|
||||
|
||||
Ok::<_, anyhow::Error>(reorged)
|
||||
}
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
batch.peer_handle.ban_peer(LONG_BAN);
|
||||
self.stop_current_block_downloader.notify_one();
|
||||
return;
|
||||
}
|
||||
Ok(AddAltBlock::Reorged) => {
|
||||
// Collect the remaining blocks and add them to the main chain instead.
|
||||
batch.blocks = blocks.collect();
|
||||
self.handle_incoming_block_batch_main_chain(batch).await;
|
||||
return;
|
||||
}
|
||||
// continue adding alt blocks.
|
||||
Ok(AddAltBlock::Cached) => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles an incoming alt [`Block`].
|
||||
///
|
||||
/// This function will do some pre-validation of the alt block, then if the cumulative difficulty
|
||||
/// of the alt chain is higher than the main chain it will attempt a reorg otherwise it will add
|
||||
/// the alt block to the alt block cache.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This will return an [`Err`] if:
|
||||
/// - The alt block was invalid.
|
||||
/// - An attempt to reorg the chain failed.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
async fn handle_incoming_alt_block(
|
||||
&mut self,
|
||||
block: Block,
|
||||
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
) -> Result<AddAltBlock, anyhow::Error> {
|
||||
let VerifyBlockResponse::AltChain(alt_block_info) = self
|
||||
.block_verifier_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(VerifyBlockRequest::AltChain {
|
||||
block,
|
||||
prepared_txs,
|
||||
})
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
// TODO: check in consensus crate if alt block with this hash already exists.
|
||||
|
||||
// If this alt chain
|
||||
if alt_block_info.cumulative_difficulty
|
||||
> self.cached_blockchain_context.cumulative_difficulty
|
||||
{
|
||||
self.try_do_reorg(alt_block_info).await?;
|
||||
return Ok(AddAltBlock::Reorged);
|
||||
}
|
||||
|
||||
self.blockchain_write_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))
|
||||
.await?;
|
||||
|
||||
Ok(AddAltBlock::Cached)
|
||||
}
|
||||
|
||||
/// Attempt a re-org with the given top block of the alt-chain.
|
||||
///
|
||||
/// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database
|
||||
/// and context cache to verify the alt-chain. It will then attempt to verify and add each block
|
||||
/// in the alt-chain to the main-chain. Releasing the lock on [`REORG_LOCK`] when finished.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain
|
||||
/// will be returned back into its state it was at when then function was called.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
async fn try_do_reorg(
|
||||
&mut self,
|
||||
top_alt_block: AltBlockInformation,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let _guard = REORG_LOCK.write().await;
|
||||
|
||||
let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self
|
||||
.blockchain_read_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockchainReadRequest::AltBlocksInChain(
|
||||
top_alt_block.chain_id,
|
||||
))
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
alt_blocks.push(top_alt_block);
|
||||
|
||||
let split_height = alt_blocks[0].height;
|
||||
let current_main_chain_height = self.cached_blockchain_context.chain_height;
|
||||
|
||||
let BlockchainResponse::PopBlocks(old_main_chain_id) = self
|
||||
.blockchain_write_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockchainWriteRequest::PopBlocks(
|
||||
current_main_chain_height - split_height + 1,
|
||||
))
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
self.blockchain_context_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockChainContextRequest::PopBlocks {
|
||||
numb_blocks: current_main_chain_height - split_height + 1,
|
||||
})
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
|
||||
let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await;
|
||||
|
||||
match reorg_res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
todo!("Reverse reorg")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Verify and add a list of [`AltBlockInformation`]s to the main-chain.
|
||||
///
|
||||
/// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain
|
||||
/// for the blockchain database and the context cache, or in other words that the blockchain database
|
||||
/// and context cache have already had the top blocks popped to where the alt-chain meets the main-chain.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should
|
||||
/// be aborted and the chain should be returned to its previous state.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
async fn verify_add_alt_blocks_to_main_chain(
|
||||
&mut self,
|
||||
alt_blocks: Vec<AltBlockInformation>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
for mut alt_block in alt_blocks {
|
||||
let prepped_txs = alt_block
|
||||
.txs
|
||||
.drain(..)
|
||||
.map(|tx| Ok(Arc::new(tx.try_into()?)))
|
||||
.collect::<Result<_, anyhow::Error>>()?;
|
||||
|
||||
let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
|
||||
|
||||
let VerifyBlockResponse::MainChain(verified_block) = self
|
||||
.block_verifier_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(VerifyBlockRequest::MainChainPrepped {
|
||||
block: prepped_block,
|
||||
txs: prepped_txs,
|
||||
})
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
self.add_valid_block_to_main_chain(verified_block).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Adds a [`VerifiedBlockInformation`] to the main-chain.
|
||||
///
|
||||
/// This function will update the blockchain database and the context cache, it will also
|
||||
/// update [`Self::cached_blockchain_context`].
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This function will panic if any internal service returns an unexpected error that we cannot
|
||||
/// recover from.
|
||||
pub async fn add_valid_block_to_main_chain(
|
||||
&mut self,
|
||||
verified_block: VerifiedBlockInformation,
|
||||
) {
|
||||
self.blockchain_context_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockChainContextRequest::Update(NewBlockData {
|
||||
block_hash: verified_block.block_hash,
|
||||
height: verified_block.height,
|
||||
timestamp: verified_block.block.header.timestamp,
|
||||
weight: verified_block.weight,
|
||||
long_term_weight: verified_block.long_term_weight,
|
||||
generated_coins: verified_block.generated_coins,
|
||||
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
|
||||
cumulative_difficulty: verified_block.cumulative_difficulty,
|
||||
}))
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
|
||||
self.blockchain_write_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockchainWriteRequest::WriteBlock(verified_block))
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
|
||||
let BlockChainContextResponse::Context(blockchain_context) = self
|
||||
.blockchain_context_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockChainContextRequest::GetContext)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
||||
}
|
||||
}
|
||||
|
||||
/// The result from successfully adding an alt-block.
|
||||
enum AddAltBlock {
|
||||
/// The alt-block was cached.
|
||||
Cached,
|
||||
/// The chain was reorged.
|
||||
Reorged,
|
||||
}
|
|
@ -1 +1,143 @@
|
|||
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
|
||||
use std::{pin::pin, sync::Arc, time::Duration};
|
||||
|
||||
use futures::StreamExt;
|
||||
use tokio::time::interval;
|
||||
use tokio::{
|
||||
sync::{mpsc, Notify},
|
||||
time::sleep,
|
||||
};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::instrument;
|
||||
|
||||
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
|
||||
use cuprate_p2p::{
|
||||
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
|
||||
NetworkInterface,
|
||||
};
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
|
||||
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
|
||||
|
||||
/// An error returned from the [`syncer`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SyncerError {
|
||||
#[error("Incoming block channel closed.")]
|
||||
IncomingBlockChannelClosed,
|
||||
#[error("One of our services returned an error: {0}.")]
|
||||
ServiceError(#[from] tower::BoxError),
|
||||
}
|
||||
|
||||
/// The syncer tasks that makes sure we are fully synchronised with our connected peers.
|
||||
#[expect(
|
||||
clippy::significant_drop_tightening,
|
||||
reason = "Client pool which will be removed"
|
||||
)]
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn syncer<C, CN>(
|
||||
mut context_svc: C,
|
||||
our_chain: CN,
|
||||
clearnet_interface: NetworkInterface<ClearNet>,
|
||||
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
|
||||
stop_current_block_downloader: Arc<Notify>,
|
||||
block_downloader_config: BlockDownloaderConfig,
|
||||
) -> Result<(), SyncerError>
|
||||
where
|
||||
C: Service<
|
||||
BlockChainContextRequest,
|
||||
Response = BlockChainContextResponse,
|
||||
Error = tower::BoxError,
|
||||
>,
|
||||
C::Future: Send + 'static,
|
||||
CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
CN::Future: Send + 'static,
|
||||
{
|
||||
tracing::info!("Starting blockchain syncer");
|
||||
|
||||
let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY);
|
||||
|
||||
let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc
|
||||
.ready()
|
||||
.await?
|
||||
.call(BlockChainContextRequest::GetContext)
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
let client_pool = clearnet_interface.client_pool();
|
||||
|
||||
tracing::debug!("Waiting for new sync info in top sync channel");
|
||||
|
||||
loop {
|
||||
check_sync_interval.tick().await;
|
||||
|
||||
tracing::trace!("Checking connected peers to see if we are behind",);
|
||||
|
||||
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
|
||||
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
|
||||
|
||||
if !client_pool.contains_client_with_more_cumulative_difficulty(
|
||||
raw_blockchain_context.cumulative_difficulty,
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"We are behind peers claimed cumulative difficulty, starting block downloader"
|
||||
);
|
||||
let mut block_batch_stream =
|
||||
clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
() = stop_current_block_downloader.notified() => {
|
||||
tracing::info!("Stopping block downloader");
|
||||
break;
|
||||
}
|
||||
batch = block_batch_stream.next() => {
|
||||
let Some(batch) = batch else {
|
||||
break;
|
||||
};
|
||||
|
||||
tracing::debug!("Got batch, len: {}", batch.blocks.len());
|
||||
if incoming_block_batch_tx.send(batch).await.is_err() {
|
||||
return Err(SyncerError::IncomingBlockChannelClosed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if we should update the given [`BlockChainContext`] and updates it if needed.
|
||||
async fn check_update_blockchain_context<C>(
|
||||
context_svc: C,
|
||||
old_context: &mut BlockChainContext,
|
||||
) -> Result<(), tower::BoxError>
|
||||
where
|
||||
C: Service<
|
||||
BlockChainContextRequest,
|
||||
Response = BlockChainContextResponse,
|
||||
Error = tower::BoxError,
|
||||
>,
|
||||
C::Future: Send + 'static,
|
||||
{
|
||||
if old_context.blockchain_context().is_ok() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let BlockChainContextResponse::Context(ctx) = context_svc
|
||||
.oneshot(BlockChainContextRequest::GetContext)
|
||||
.await?
|
||||
else {
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
*old_context = ctx;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
24
binaries/cuprated/src/blockchain/types.rs
Normal file
24
binaries/cuprated/src/blockchain/types.rs
Normal file
|
@ -0,0 +1,24 @@
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use tower::{util::MapErr, Service};
|
||||
|
||||
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
|
||||
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
|
||||
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
|
||||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||
|
||||
/// The [`BlockVerifierService`] with all generic types defined.
|
||||
pub type ConcreteBlockVerifierService = BlockVerifierService<
|
||||
BlockChainContextService,
|
||||
ConcreteTxVerifierService,
|
||||
ConsensusBlockchainReadHandle,
|
||||
>;
|
||||
|
||||
/// The [`TxVerifierService`] with all generic types defined.
|
||||
pub type ConcreteTxVerifierService = TxVerifierService<ConsensusBlockchainReadHandle>;
|
||||
|
||||
/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires.
|
||||
pub type ConsensusBlockchainReadHandle =
|
||||
MapErr<BlockchainReadHandle, fn(RuntimeError) -> tower::BoxError>;
|
|
@ -14,6 +14,10 @@ pub const VERSION_BUILD: &str = if cfg!(debug_assertions) {
|
|||
formatcp!("{VERSION}-release")
|
||||
};
|
||||
|
||||
/// The panic message used when cuprated encounters a critical service error.
|
||||
pub const PANIC_CRITICAL_SERVICE_ERROR: &str =
|
||||
"A service critical to Cuprate's function returned an unexpected error.";
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
|
|
@ -16,6 +16,7 @@ mod config;
|
|||
mod constants;
|
||||
mod p2p;
|
||||
mod rpc;
|
||||
mod signals;
|
||||
mod statics;
|
||||
mod txpool;
|
||||
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
//!
|
||||
//! Will handle initiating the P2P and contains a protocol request handler.
|
||||
|
||||
mod request_handler;
|
||||
pub mod request_handler;
|
||||
|
|
12
binaries/cuprated/src/signals.rs
Normal file
12
binaries/cuprated/src/signals.rs
Normal file
|
@ -0,0 +1,12 @@
|
|||
//! Signals for Cuprate state used throughout the binary.
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
/// Reorg lock.
|
||||
///
|
||||
/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken
|
||||
/// for any operation which must complete without a reorg happening.
|
||||
///
|
||||
/// Currently, the only operation that needs to take a read lock is adding txs to the tx-pool,
|
||||
/// this can potentially be removed in the future, see: <https://github.com/Cuprate/cuprate/issues/305>
|
||||
pub static REORG_LOCK: RwLock<()> = RwLock::const_new(());
|
|
@ -123,7 +123,7 @@ impl PreparedBlock {
|
|||
///
|
||||
/// The randomX VM must be Some if RX is needed or this will panic.
|
||||
/// The randomX VM must also be initialised with the correct seed.
|
||||
fn new<R: RandomX>(block: Block, randomx_vm: Option<&R>) -> Result<Self, ConsensusError> {
|
||||
pub fn new<R: RandomX>(block: Block, randomx_vm: Option<&R>) -> Result<Self, ConsensusError> {
|
||||
let (hf_version, hf_vote) = HardFork::from_block_header(&block.header)
|
||||
.map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?;
|
||||
|
||||
|
@ -180,6 +180,20 @@ impl PreparedBlock {
|
|||
block: block.block,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new [`PreparedBlock`] from an [`AltBlockInformation`].
|
||||
pub fn new_alt_block(block: AltBlockInformation) -> Result<Self, ConsensusError> {
|
||||
Ok(Self {
|
||||
block_blob: block.block_blob,
|
||||
hf_vote: HardFork::from_version(block.block.header.hardfork_version)
|
||||
.map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?,
|
||||
hf_version: HardFork::from_vote(block.block.header.hardfork_signal),
|
||||
block_hash: block.block_hash,
|
||||
pow_hash: block.pow_hash,
|
||||
miner_tx_weight: block.block.miner_transaction.weight(),
|
||||
block: block.block,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A request to verify a block.
|
||||
|
@ -246,7 +260,7 @@ where
|
|||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
D: Database + Clone + Send + Sync + 'static,
|
||||
D: Database + Clone + Send + 'static,
|
||||
D::Future: Send + 'static,
|
||||
{
|
||||
/// Creates a new block verifier.
|
||||
|
@ -276,7 +290,7 @@ where
|
|||
+ 'static,
|
||||
TxV::Future: Send + 'static,
|
||||
|
||||
D: Database + Clone + Send + Sync + 'static,
|
||||
D: Database + Clone + Send + 'static,
|
||||
D::Future: Send + 'static,
|
||||
{
|
||||
type Response = VerifyBlockResponse;
|
||||
|
|
|
@ -37,6 +37,7 @@ pub use context::{
|
|||
pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse};
|
||||
|
||||
// re-export.
|
||||
pub use cuprate_consensus_rules::genesis::generate_genesis_block;
|
||||
pub use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||
HardFork,
|
||||
|
@ -64,17 +65,13 @@ pub enum ExtendedConsensusError {
|
|||
}
|
||||
|
||||
/// Initialize the 2 verifier [`tower::Service`]s (block and transaction).
|
||||
#[expect(clippy::type_complexity)]
|
||||
pub fn initialize_verifier<D, Ctx>(
|
||||
database: D,
|
||||
ctx_svc: Ctx,
|
||||
) -> Result<
|
||||
(
|
||||
) -> (
|
||||
BlockVerifierService<Ctx, TxVerifierService<D>, D>,
|
||||
TxVerifierService<D>,
|
||||
),
|
||||
ConsensusError,
|
||||
>
|
||||
)
|
||||
where
|
||||
D: Database + Clone + Send + Sync + 'static,
|
||||
D::Future: Send + 'static,
|
||||
|
@ -90,7 +87,7 @@ where
|
|||
{
|
||||
let tx_svc = TxVerifierService::new(database.clone());
|
||||
let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);
|
||||
Ok((block_svc, tx_svc))
|
||||
(block_svc, tx_svc)
|
||||
}
|
||||
|
||||
use __private::Database;
|
||||
|
|
|
@ -89,6 +89,7 @@ pub use protocol::*;
|
|||
use services::*;
|
||||
//re-export
|
||||
pub use cuprate_helper::network::Network;
|
||||
pub use cuprate_wire::CoreSyncData;
|
||||
|
||||
/// The direction of a connection.
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! # Block Downloader
|
||||
//!
|
||||
//! This module contains the [`BlockDownloader`], which finds a chain to
|
||||
//! This module contains the block downloader, which finds a chain to
|
||||
//! download from our connected peers and downloads it. See the actual
|
||||
//! `struct` documentation for implementation details.
|
||||
//!
|
||||
|
|
|
@ -153,6 +153,18 @@ impl<N: NetworkZone> ClientPool<N> {
|
|||
|
||||
self.borrow_clients(&peers).collect()
|
||||
}
|
||||
|
||||
/// Checks all clients in the pool checking if any claim a higher cumulative difficulty than the
|
||||
/// amount specified.
|
||||
pub fn contains_client_with_more_cumulative_difficulty(
|
||||
&self,
|
||||
cumulative_difficulty: u128,
|
||||
) -> bool {
|
||||
self.clients.iter().any(|element| {
|
||||
let sync_data = element.value().info.core_sync_data.lock().unwrap();
|
||||
sync_data.cumulative_difficulty() > cumulative_difficulty
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
mod sealed {
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use cuprate_address_book::AddressBookConfig;
|
||||
use cuprate_helper::network::Network;
|
||||
use cuprate_p2p_core::NetworkZone;
|
||||
use cuprate_wire::{common::PeerSupportFlags, BasicNodeData};
|
||||
|
||||
pub use cuprate_address_book::AddressBookConfig;
|
||||
|
||||
/// P2P config.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct P2PConfig<N: NetworkZone> {
|
||||
|
|
|
@ -16,14 +16,13 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
|
|||
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// The durations of a short ban.
|
||||
#[cfg_attr(not(test), expect(dead_code))]
|
||||
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
|
||||
pub const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
|
||||
|
||||
/// The durations of a medium ban.
|
||||
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
|
||||
pub const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
|
||||
|
||||
/// The durations of a long ban.
|
||||
pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
|
||||
pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
|
||||
|
||||
/// The default amount of time between inbound diffusion flushes.
|
||||
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
|
||||
|
|
|
@ -12,23 +12,21 @@ use tracing::{instrument, Instrument, Span};
|
|||
use cuprate_async_buffer::BufferStream;
|
||||
use cuprate_p2p_core::{
|
||||
client::Connector,
|
||||
client::InternalPeerID,
|
||||
services::{AddressBookRequest, AddressBookResponse},
|
||||
CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker,
|
||||
};
|
||||
|
||||
mod block_downloader;
|
||||
pub mod block_downloader;
|
||||
mod broadcast;
|
||||
mod client_pool;
|
||||
pub mod config;
|
||||
pub mod connection_maintainer;
|
||||
mod constants;
|
||||
pub mod constants;
|
||||
mod inbound_server;
|
||||
|
||||
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
|
||||
pub use broadcast::{BroadcastRequest, BroadcastSvc};
|
||||
use client_pool::ClientPoolDropGuard;
|
||||
pub use config::P2PConfig;
|
||||
pub use config::{AddressBookConfig, P2PConfig};
|
||||
use connection_maintainer::MakeConnectionRequest;
|
||||
|
||||
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
|
||||
|
@ -174,9 +172,8 @@ impl<N: NetworkZone> NetworkInterface<N> {
|
|||
self.address_book.clone()
|
||||
}
|
||||
|
||||
/// Pulls a client from the client pool, returning it in a guard that will return it there when it's
|
||||
/// dropped.
|
||||
pub fn borrow_client(&self, peer: &InternalPeerID<N::Addr>) -> Option<ClientPoolDropGuard<N>> {
|
||||
self.pool.borrow_client(peer)
|
||||
/// Borrows the `ClientPool`, for access to connected peers.
|
||||
pub const fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
|
||||
&self.pool
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue