add more docs + cleanup imports

This commit is contained in:
Boog900 2024-10-03 21:35:42 +01:00
parent caaeceda2e
commit 8cff481227
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
12 changed files with 180 additions and 137 deletions

View file

@ -1,11 +1,11 @@
//! Blockchain
//!
//! Will contain the chain manager and syncer.
use std::sync::Arc;
use futures::FutureExt;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tower::{Service, ServiceExt};
use tower::{BoxError, Service, ServiceExt};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig};
@ -22,11 +22,8 @@ mod manager;
mod syncer;
mod types;
use crate::blockchain::interface::INCOMING_BLOCK_TX;
use manager::BlockchainManager;
use types::{
ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService,
ConsensusBlockchainReadHandle,
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
};
pub use interface::{handle_incoming_block, IncomingBlockError};
@ -51,6 +48,9 @@ pub async fn check_add_genesis(
let genesis = generate_genesis_block(network);
assert_eq!(genesis.miner_transaction.prefix().outputs.len(), 1);
assert!(genesis.transactions.is_empty());
blockchain_write_handle
.ready()
.await
@ -87,16 +87,14 @@ pub async fn init_consensus(
),
tower::BoxError,
> {
let ctx_service = cuprate_consensus::initialize_blockchain_context(
context_config,
ConsensusBlockchainReadHandle(blockchain_read_handle.clone()),
)
let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from);
let ctx_service =
cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone())
.await?;
let (block_verifier_svc, tx_verifier_svc) = cuprate_consensus::initialize_verifier(
ConsensusBlockchainReadHandle(blockchain_read_handle),
ctx_service.clone(),
);
let (block_verifier_svc, tx_verifier_svc) =
cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone());
Ok((block_verifier_svc, tx_verifier_svc, ctx_service))
}

View file

@ -1,39 +1,74 @@
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::Chain;
use monero_serai::block::Block;
use monero_serai::transaction::Transaction;
//! The blockchain manger interface.
//!
//! This module contains all the functions to mutate the blockchains state in any way, through the
//! blockchain manger.
use std::{
collections::{HashMap, HashSet},
sync::{Mutex, OnceLock},
};
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, OnceLock};
use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};
pub static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
};
use crate::{
blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR,
};
/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manger.
pub static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
pub static BLOCKS_BEING_HANDLED: OnceLock<Mutex<HashSet<[u8; 32]>>> = OnceLock::new();
/// An error that can be returned from [`handle_incoming_block`].
#[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError {
/// Some transactions in the block were unknown.
///
/// The inner values are the block hash and the indexes of the missing txs in the block.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<u64>),
/// We are missing the block's parent.
#[error("The block has an unknown parent.")]
Orphan,
/// The block was invalid.
#[error(transparent)]
InvalidBlock(anyhow::Error),
}
/// Try to add a new block to the blockchain.
///
/// This returns a [`bool`] indicating if the block was added to the main-chain ([`true`]) or an alt-chain
/// ([`false`]).
///
/// If we already knew about this block or the blockchain manger is not setup yet `Ok(false)` is returned.
///
/// # Errors
///
/// This function will return an error if:
/// - the block was invalid
/// - we are missing transactions
/// - the block's parent is unknown
pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, IncomingBlockError> {
// FIXME: we should look in the tx-pool for txs when that is ready.
if !block_exists(block.header.previous, blockchain_read_handle)
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
return Err(IncomingBlockError::Orphan);
}
@ -42,12 +77,12 @@ pub async fn handle_incoming_block(
if block_exists(block_hash, blockchain_read_handle)
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
return Ok(false);
}
// TODO: Get transactions from the tx pool first.
// TODO: remove this when we have a working tx-pool.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
block_hash,
@ -55,6 +90,7 @@ pub async fn handle_incoming_block(
));
}
// TODO: check we actually go given the right txs.
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {
@ -64,19 +100,25 @@ pub async fn handle_incoming_block(
.collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?;
let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else {
let Some(incoming_block_tx) = COMMAND_TX.get() else {
// We could still be starting up the blockchain manger, so just return this as there is nothing
// else we can do.
return Ok(false);
};
// Add the blocks hash to the blocks being handled.
if !BLOCKS_BEING_HANDLED
.get_or_init(|| Mutex::new(HashSet::new()))
.lock()
.unwrap()
.insert(block_hash)
{
// If another place is already adding this block then we can stop.
return Ok(false);
}
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
let (response_tx, response_rx) = oneshot::channel();
incoming_block_tx
@ -86,13 +128,14 @@ pub async fn handle_incoming_block(
response_tx,
})
.await
.expect("TODO: don't actually panic here");
.expect("TODO: don't actually panic here, an err means we are shutting down");
let res = response_rx
.await
.unwrap()
.expect("The blockchain manager will always respond")
.map_err(IncomingBlockError::InvalidBlock);
// Remove the block hash from the blocks being handled.
BLOCKS_BEING_HANDLED
.get()
.unwrap()
@ -103,6 +146,7 @@ pub async fn handle_incoming_block(
res
}
/// Check if we have a block with the given hash.
async fn block_exists(
block_hash: [u8; 32],
blockchain_read_handle: &mut BlockchainReadHandle,

View file

@ -1,35 +1,46 @@
pub(super) mod commands;
mod handler;
use std::{collections::HashMap, sync::Arc};
use crate::blockchain::interface::INCOMING_BLOCK_TX;
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use crate::blockchain::types::ChainService;
use crate::blockchain::{
syncer,
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::RawBlockChainContext;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_p2p::block_downloader::{BlockBatch, BlockDownloaderConfig};
use cuprate_p2p::{BroadcastSvc, NetworkInterface};
use cuprate_p2p_core::ClearNet;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::{Chain, TransactionVerificationData};
use futures::StreamExt;
use monero_serai::block::Block;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{oneshot, Notify};
use tokio::sync::{mpsc, oneshot, Notify};
use tower::{Service, ServiceExt};
use tracing::error;
use tracing_subscriber::fmt::time::FormatTime;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
context::RawBlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService, BlockVerifierService, ExtendedConsensusError, TxVerifierService,
VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig},
BroadcastSvc, NetworkInterface,
};
use cuprate_p2p_core::ClearNet;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, TransactionVerificationData,
};
use crate::{
blockchain::{
interface::COMMAND_TX,
syncer,
types::ChainService,
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
},
constants::PANIC_CRITICAL_SERVICE_ERROR,
};
mod commands;
mod handler;
pub use commands::BlockchainManagerCommand;
/// Initialize the blockchain manger.
///
/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface)
/// can be called.
pub async fn init_blockchain_manger(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
@ -42,7 +53,7 @@ pub async fn init_blockchain_manger(
let stop_current_block_downloader = Arc::new(Notify::new());
let (command_tx, command_rx) = mpsc::channel(1);
INCOMING_BLOCK_TX.set(command_tx).unwrap();
COMMAND_TX.set(command_tx).unwrap();
tokio::spawn(syncer::syncer(
blockchain_context_service.clone(),
@ -56,10 +67,10 @@ pub async fn init_blockchain_manger(
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::GetContext)
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
panic!("Blockchain context service returned wrong response!");
};
@ -71,7 +82,7 @@ pub async fn init_blockchain_manger(
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
stop_current_block_downloader,
broadcast_svc,
broadcast_svc: clearnet_interface.broadcast_svc(),
};
tokio::spawn(manger.run(batch_rx, command_rx));
@ -97,11 +108,7 @@ pub struct BlockchainManager {
/// A cached context representing the current state.
cached_blockchain_context: RawBlockChainContext,
/// The block verifier service, to verify incoming blocks.
block_verifier_service: BlockVerifierService<
BlockChainContextService,
TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle,
>,
block_verifier_service: ConcreteBlockVerifierService,
/// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download
/// attempt.
stop_current_block_downloader: Arc<Notify>,
@ -110,6 +117,7 @@ pub struct BlockchainManager {
}
impl BlockchainManager {
/// The [`BlockchainManager`] task.
pub async fn run(
mut self,
mut block_batch_rx: mpsc::Receiver<BlockBatch>,

View file

@ -1,8 +1,9 @@
use std::{collections::HashMap, sync::Arc};
use bytes::Bytes;
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use std::{collections::HashMap, sync::Arc};
use tower::{Service, ServiceExt};
use tracing::info;
@ -20,11 +21,16 @@ use cuprate_types::{
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
};
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK};
use crate::{
blockchain::{
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
},
constants::PANIC_CRITICAL_SERVICE_ERROR,
signals::REORG_LOCK,
};
impl super::BlockchainManager {
/// Handle an incoming command from another part of Cuprate.
pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
match command {
BlockchainManagerCommand::AddBlock {
@ -39,6 +45,7 @@ impl super::BlockchainManager {
}
}
/// Broadcast a valid block to the network.
async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
self.broadcast_svc
.ready()
@ -191,7 +198,7 @@ impl super::BlockchainManager {
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from.
async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) {
// TODO: this needs testing (this whole section does but this specifically).
// TODO: this needs testing (this whole section does but alt-blocks specifically).
let mut blocks = batch.blocks.into_iter();
@ -394,7 +401,7 @@ impl super::BlockchainManager {
.block_verifier_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChainPrepped {
block: prepped_block,
txs: prepped_txs,
@ -426,7 +433,7 @@ impl super::BlockchainManager {
self.blockchain_context_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
@ -438,24 +445,24 @@ impl super::BlockchainManager {
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect("TODO");
.expect(PANIC_CRITICAL_SERVICE_ERROR);
self.blockchain_write_handle
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::WriteBlock(verified_block))
.await
.expect("TODO");
.expect(PANIC_CRITICAL_SERVICE_ERROR);
let BlockChainContextResponse::Context(blockchain_context) = self
.blockchain_context_service
.ready()
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::GetContext)
.await
.expect("TODO")
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
panic!("Incorrect response!");
};

View file

@ -1,6 +1,4 @@
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use std::{pin::pin, sync::Arc, time::Duration};
use futures::StreamExt;
use tokio::time::interval;
@ -18,6 +16,7 @@ use cuprate_p2p::{
};
use cuprate_p2p_core::ClearNet;
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
/// An error returned from the [`syncer`].

View file

@ -1,41 +1,32 @@
use cuprate_blockchain::cuprate_database::RuntimeError;
use cuprate_blockchain::service::BlockchainReadHandle;
use std::task::{Context, Poll};
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use tower::{util::MapErr, Service};
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use futures::future::{BoxFuture, MapErr};
use futures::{FutureExt, TryFutureExt};
use std::task::{Context, Poll};
use tower::Service;
/// The [`BlockVerifierService`] with all generic types defined.
pub type ConcreteBlockVerifierService = BlockVerifierService<
BlockChainContextService,
TxVerifierService<ConsensusBlockchainReadHandle>,
ConcreteTxVerifierService,
ConsensusBlockchainReadHandle,
>;
/// The [`TxVerifierService`] with all generic types defined.
pub type ConcreteTxVerifierService = TxVerifierService<ConsensusBlockchainReadHandle>;
#[derive(Clone)]
pub struct ConsensusBlockchainReadHandle(pub BlockchainReadHandle);
impl Service<BlockchainReadRequest> for ConsensusBlockchainReadHandle {
type Response = BlockchainResponse;
type Error = tower::BoxError;
type Future = MapErr<
<BlockchainReadHandle as Service<BlockchainReadRequest>>::Future,
fn(RuntimeError) -> tower::BoxError,
>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: BlockchainReadRequest) -> Self::Future {
self.0.call(req).map_err(Into::into)
}
}
/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires.
pub type ConsensusBlockchainReadHandle =
MapErr<BlockchainReadHandle, fn(RuntimeError) -> tower::BoxError>;
/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out
/// what blocks we need.
///
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier.
#[derive(Clone)]
pub struct ChainService(pub BlockchainReadHandle);
@ -79,6 +70,7 @@ impl Service<ChainSvcRequest> for ChainService {
.call(BlockchainReadRequest::CompactChainHistory)
.map_ok(|res| {
// TODO create a custom request instead of hijacking this one.
// TODO: use the context cache.
let BlockchainResponse::CompactChainHistory {
cumulative_difficulty,
..

View file

@ -90,7 +90,7 @@ async fn get_objects(
// de-allocate the backing [`Bytes`]
drop(req);
return Ok(ProtocolResponse::NA);
Ok(ProtocolResponse::NA)
/*
let res = blockchain_read_handle
@ -122,7 +122,8 @@ async fn get_chain(
if req.block_ids.is_empty() {
Err("No block hashes sent in a `ChainRequest`")?;
}
return Ok(ProtocolResponse::NA);
Ok(ProtocolResponse::NA)
/*
if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN {
@ -191,15 +192,13 @@ async fn new_fluffy_block(
let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await;
match res {
Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => {
return Ok(ProtocolResponse::FluffyMissingTxs(
FluffyMissingTransactionsRequest {
Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => Ok(
ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest {
block_hash: ByteArray::from(block_hash),
current_blockchain_height: peer_blockchain_height,
missing_tx_indices: tx_indexes,
},
))
}
}),
),
Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?,
Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA),
}

View file

@ -1,3 +1,9 @@
//! Signals for Cuprate state used throughout the binary.
use tokio::sync::RwLock;
/// Reorg lock.
///
/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken
/// for any operation which must complete without a reorg happening.
pub static REORG_LOCK: RwLock<()> = RwLock::const_new(());

View file

@ -8,7 +8,6 @@ use std::{
};
use futures::FutureExt;
use monero_serai::generators::H;
use monero_serai::{
block::Block,
transaction::{Input, Transaction},
@ -124,10 +123,7 @@ impl PreparedBlock {
///
/// The randomX VM must be Some if RX is needed or this will panic.
/// The randomX VM must also be initialised with the correct seed.
pub fn new<R: RandomX>(
block: Block,
randomx_vm: Option<&R>,
) -> Result<PreparedBlock, ConsensusError> {
pub fn new<R: RandomX>(block: Block, randomx_vm: Option<&R>) -> Result<Self, ConsensusError> {
let (hf_version, hf_vote) = HardFork::from_block_header(&block.header)
.map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?;
@ -185,8 +181,8 @@ impl PreparedBlock {
})
}
pub fn new_alt_block(block: AltBlockInformation) -> Result<PreparedBlock, ConsensusError> {
Ok(PreparedBlock {
pub fn new_alt_block(block: AltBlockInformation) -> Result<Self, ConsensusError> {
Ok(Self {
block_blob: block.block_blob,
hf_vote: HardFork::from_version(block.block.header.hardfork_version)
.map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?,

View file

@ -65,7 +65,6 @@ pub enum ExtendedConsensusError {
}
/// Initialize the 2 verifier [`tower::Service`]s (block and transaction).
#[expect(clippy::type_complexity)]
pub fn initialize_verifier<D, Ctx>(
database: D,
ctx_svc: Ctx,

View file

@ -158,13 +158,10 @@ impl<N: NetworkZone> ClientPool<N> {
&self,
cumulative_difficulty: u128,
) -> bool {
self.clients
.iter()
.find(|element| {
self.clients.iter().any(|element| {
let sync_data = element.value().info.core_sync_data.lock().unwrap();
sync_data.cumulative_difficulty() > cumulative_difficulty
})
.is_some()
}
}

View file

@ -12,7 +12,6 @@ use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::BufferStream;
use cuprate_p2p_core::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse},
CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker,
};
@ -27,7 +26,6 @@ mod inbound_server;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::{AddressBookConfig, P2PConfig};
use connection_maintainer::MakeConnectionRequest;
@ -175,7 +173,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
}
/// TODO
pub fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
pub const fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
&self.pool
}
}