remove unrelated changes
Some checks failed
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled

This commit is contained in:
Boog900 2024-10-03 22:52:47 +01:00
parent 6702dbee11
commit 403964bc22
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
19 changed files with 43 additions and 378 deletions

View file

@ -73,8 +73,8 @@ tower = { workspace = true }
tracing-subscriber = { workspace = true, features = ["std", "fmt", "default"] }
tracing = { workspace = true }
#[lints]
#workspace = true
[lints]
workspace = true
[profile.dev]
panic = "abort"

View file

@ -1,6 +1,6 @@
//! Blockchain
//!
//! Will contain the chain manager and syncer.
//! Contains the blockchain manager, syncer and an interface to mutate the blockchain.
use std::sync::Arc;
use futures::FutureExt;
@ -17,7 +17,9 @@ use cuprate_types::{
VerifiedBlockInformation,
};
mod interface;
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;
pub mod interface;
mod manager;
mod syncer;
mod types;
@ -26,8 +28,6 @@ use types::{
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
};
pub use interface::{handle_incoming_block, IncomingBlockError};
/// Checks if the genesis block is in the blockchain and adds it if not.
pub async fn check_add_genesis(
blockchain_read_handle: &mut BlockchainReadHandle,
@ -38,7 +38,7 @@ pub async fn check_add_genesis(
if blockchain_read_handle
.ready()
.await
.unwrap()
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainReadRequest::ChainHeight)
.await
.is_ok()
@ -54,7 +54,7 @@ pub async fn check_add_genesis(
blockchain_write_handle
.ready()
.await
.unwrap()
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::WriteBlock(
VerifiedBlockInformation {
block_blob: genesis.serialize(),
@ -72,7 +72,7 @@ pub async fn check_add_genesis(
},
))
.await
.unwrap();
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
/// Initializes the consensus services.
@ -85,7 +85,7 @@ pub async fn init_consensus(
ConcreteTxVerifierService,
BlockChainContextService,
),
tower::BoxError,
BoxError,
> {
let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from);

View file

@ -1,6 +1,6 @@
//! The blockchain manger interface.
//!
//! This module contains all the functions to mutate the blockchains state in any way, through the
//! This module contains all the functions to mutate the blockchain's state in any way, through the
//! blockchain manger.
use std::{
collections::{HashMap, HashSet},
@ -28,6 +28,10 @@ use crate::{
pub static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
///
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
/// time for new blocks so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
/// which are also more expensive than `Mutex`s.
pub static BLOCKS_BEING_HANDLED: OnceLock<Mutex<HashSet<[u8; 32]>>> = OnceLock::new();
/// An error that can be returned from [`handle_incoming_block`].
@ -90,7 +94,7 @@ pub async fn handle_incoming_block(
));
}
// TODO: check we actually go given the right txs.
// TODO: check we actually got given the right txs.
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {

View file

@ -49,9 +49,10 @@ pub async fn init_blockchain_manger(
block_verifier_service: ConcreteBlockVerifierService,
block_downloader_config: BlockDownloaderConfig,
) {
// TODO: find good values for these size limits
let (batch_tx, batch_rx) = mpsc::channel(1);
let stop_current_block_downloader = Arc::new(Notify::new());
let (command_tx, command_rx) = mpsc::channel(1);
let (command_tx, command_rx) = mpsc::channel(3);
COMMAND_TX.set(command_tx).unwrap();
@ -60,7 +61,7 @@ pub async fn init_blockchain_manger(
ChainService(blockchain_read_handle.clone()),
clearnet_interface.clone(),
batch_tx,
stop_current_block_downloader.clone(),
Arc::clone(&stop_current_block_downloader),
block_downloader_config,
));

View file

@ -1,3 +1,4 @@
//! This module contains the commands for th blockchain manager.
use std::collections::HashMap;
use monero_serai::block::Block;
@ -5,10 +6,15 @@ use tokio::sync::oneshot;
use cuprate_types::TransactionVerificationData;
/// The blockchain manager commands.
pub enum BlockchainManagerCommand {
/// Attempt to add a new block to the blockchain.
AddBlock {
/// The [`Block`] to add.
block: Block,
/// All the transactions defined in [`Block::transactions`].
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
/// The channel to send the response down.
response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
},
}

View file

@ -1,3 +1,4 @@
//! The blockchain manger handler functions.
use std::{collections::HashMap, sync::Arc};
use bytes::Bytes;
@ -67,7 +68,7 @@ impl super::BlockchainManager {
/// Otherwise, this function will validate and add the block to the main chain.
///
/// On success returns a [`bool`] indicating if the block was added to the main chain ([`true`])
/// of an alt-chain ([`false`]).
/// or an alt-chain ([`false`]).
pub async fn handle_incoming_block(
&mut self,
block: Block,
@ -373,7 +374,7 @@ impl super::BlockchainManager {
///
/// This function assumes the first [`AltBlockInformation`] is the next block in the blockchain
/// for the blockchain database and the context cache, or in other words that the blockchain database
/// and context cache has had the top blocks popped to where the alt-chain meets the main-chain.
/// and context cache have already had the top blocks popped to where the alt-chain meets the main-chain.
///
/// # Errors
///

View file

@ -1,3 +1,4 @@
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
use std::{pin::pin, sync::Arc, time::Duration};
use futures::StreamExt;
@ -16,7 +17,6 @@ use cuprate_p2p::{
};
use cuprate_p2p_core::ClearNet;
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30);
/// An error returned from the [`syncer`].
@ -28,6 +28,11 @@ pub enum SyncerError {
ServiceError(#[from] tower::BoxError),
}
/// The syncer tasks that makes sure we are fully synchronised with our connected peers.
#[expect(
clippy::significant_drop_tightening,
reason = "Client pool which will be removed"
)]
#[instrument(level = "debug", skip_all)]
pub async fn syncer<C, CN>(
mut context_svc: C,
@ -89,7 +94,7 @@ where
loop {
tokio::select! {
_ = stop_current_block_downloader.notified() => {
() = stop_current_block_downloader.notified() => {
tracing::info!("Stopping block downloader");
break;
}
@ -104,6 +109,7 @@ where
}
}
/// Checks if we should update the given [`BlockChainContext`] and updates it if needed.
async fn check_update_blockchain_context<C>(
context_svc: C,
old_context: &mut BlockChainContext,

View file

@ -1,62 +1 @@
//! cuprated config
use std::time::Duration;
use cuprate_blockchain::config::{
Config as BlockchainConfig, ConfigBuilder as BlockchainConfigBuilder,
};
use cuprate_consensus::ContextConfig;
use cuprate_p2p::{block_downloader::BlockDownloaderConfig, AddressBookConfig, P2PConfig};
use cuprate_p2p_core::{ClearNet, Network};
pub fn config() -> CupratedConfig {
// TODO: read config options from the conf files & cli args.
CupratedConfig {}
}
pub struct CupratedConfig {
// TODO: expose config options we want to allow changing.
}
impl CupratedConfig {
pub fn blockchain_config(&self) -> BlockchainConfig {
BlockchainConfigBuilder::new().fast().build()
}
pub fn clearnet_config(&self) -> P2PConfig<ClearNet> {
P2PConfig {
network: Network::Mainnet,
outbound_connections: 64,
extra_outbound_connections: 0,
max_inbound_connections: 0,
gray_peers_percent: 0.7,
server_config: None,
p2p_port: 0,
rpc_port: 0,
address_book_config: AddressBookConfig {
max_white_list_length: 1000,
max_gray_list_length: 5000,
peer_store_file: "p2p_state.bin".into(),
peer_save_period: Duration::from_secs(60),
},
}
}
pub fn block_downloader_config(&self) -> BlockDownloaderConfig {
BlockDownloaderConfig {
buffer_size: 50_000_000,
in_progress_queue_size: 50_000_000,
check_client_pool_interval: Duration::from_secs(45),
target_batch_size: 10_000_000,
initial_batch_size: 1,
}
}
pub fn network(&self) -> Network {
Network::Mainnet
}
pub fn context_config(&self) -> ContextConfig {
ContextConfig::main_net()
}
}

View file

@ -2,5 +2,4 @@
//!
//! Will handle initiating the P2P and contains a protocol request handler.
pub mod core_sync_svc;
pub mod request_handler;

View file

@ -1,51 +0,0 @@
use cuprate_blockchain::cuprate_database::RuntimeError;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
};
use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse};
use cuprate_p2p_core::CoreSyncData;
use cuprate_types::blockchain::BlockchainReadRequest;
use futures::future::{BoxFuture, MapErr, MapOk};
use futures::{FutureExt, TryFutureExt};
use std::task::{Context, Poll};
use tower::Service;
#[derive(Clone)]
pub struct CoreSyncService(pub BlockChainContextService);
impl Service<CoreSyncDataRequest> for CoreSyncService {
type Response = CoreSyncDataResponse;
type Error = tower::BoxError;
type Future = MapOk<
<BlockChainContextService as Service<BlockChainContextRequest>>::Future,
fn(BlockChainContextResponse) -> CoreSyncDataResponse,
>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future {
self.0
.call(BlockChainContextRequest::GetContext)
.map_ok(|res| {
let BlockChainContextResponse::Context(ctx) = res else {
panic!("blockchain context service returned wrong response.");
};
let raw_ctx = ctx.unchecked_blockchain_context();
// TODO: the hardfork here should be the version of the top block not the current HF,
// on HF boundaries these will be different.
CoreSyncDataResponse(CoreSyncData::new(
raw_ctx.cumulative_difficulty,
// TODO:
raw_ctx.chain_height as u64,
0,
raw_ctx.top_hash,
raw_ctx.current_hf.as_u8(),
))
})
}
}

View file

@ -1,205 +1 @@
use bytes::Bytes;
use cuprate_p2p_core::{NetworkZone, ProtocolRequest, ProtocolResponse};
use futures::future::BoxFuture;
use futures::FutureExt;
use monero_serai::block::Block;
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use std::task::{Context, Poll};
use tower::{Service, ServiceExt};
use tracing::trace;
use crate::blockchain::{handle_incoming_block, IncomingBlockError};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_fixed_bytes::ByteArray;
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_helper::cast::usize_to_u64;
use cuprate_helper::map::split_u128_into_low_high_bits;
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
use cuprate_p2p_core::client::PeerInformation;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::BlockCompleteEntry;
use cuprate_wire::protocol::{
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
GetObjectsResponse, NewFluffyBlock,
};
#[derive(Clone)]
pub struct P2pProtocolRequestHandlerMaker {
pub blockchain_read_handle: BlockchainReadHandle,
}
impl<N: NetworkZone> Service<PeerInformation<N>> for P2pProtocolRequestHandlerMaker {
type Response = P2pProtocolRequestHandler<N>;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, peer_information: PeerInformation<N>) -> Self::Future {
// TODO: check peer info.
let blockchain_read_handle = self.blockchain_read_handle.clone();
async {
Ok(P2pProtocolRequestHandler {
peer_information,
blockchain_read_handle,
})
}
.boxed()
}
}
#[derive(Clone)]
pub struct P2pProtocolRequestHandler<N: NetworkZone> {
peer_information: PeerInformation<N>,
blockchain_read_handle: BlockchainReadHandle,
}
impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
type Response = ProtocolResponse;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ProtocolRequest) -> Self::Future {
async { Ok(ProtocolResponse::NA) }.boxed()
}
}
async fn get_objects(
blockchain_read_handle: BlockchainReadHandle,
req: GetObjectsRequest,
) -> Result<ProtocolResponse, tower::BoxError> {
if req.blocks.is_empty() {
Err("No blocks requested in a GetObjectsRequest")?;
}
if req.blocks.len() > MAX_BLOCK_BATCH_LEN {
Err("Too many blocks requested in a GetObjectsRequest")?;
}
let block_ids: Vec<[u8; 32]> = (&req.blocks).into();
// de-allocate the backing [`Bytes`]
drop(req);
Ok(ProtocolResponse::NA)
/*
let res = blockchain_read_handle
.oneshot(BlockchainReadRequest::BlockCompleteEntries(block_ids))
.await?;
let BlockchainResponse::BlockCompleteEntries {
blocks,
missed_ids,
current_blockchain_height,
} = res
else {
panic!("Blockchain service returned wrong response!");
};
Ok(ProtocolResponse::GetObjects(GetObjectsResponse {
blocks,
missed_ids: missed_ids.into(),
current_blockchain_height: usize_to_u64(current_blockchain_height),
}))
*/
}
async fn get_chain(
blockchain_read_handle: BlockchainReadHandle,
req: ChainRequest,
) -> Result<ProtocolResponse, tower::BoxError> {
if req.block_ids.is_empty() {
Err("No block hashes sent in a `ChainRequest`")?;
}
Ok(ProtocolResponse::NA)
/*
if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN {
Err("Too many block hashes in a `ChainRequest`")?;
}
let block_ids: Vec<[u8; 32]> = (&req.block_ids).into();
// de-allocate the backing [`Bytes`]
drop(req);
let res = blockchain_read_handle
.oneshot(BlockchainReadRequest::NextMissingChainEntry(block_ids))
.await?;
let BlockchainResponse::NextMissingChainEntry {
next_entry,
first_missing_block,
start_height,
chain_height,
cumulative_difficulty,
} = res
else {
panic!("Blockchain service returned wrong response!");
};
let (cumulative_difficulty_low64, cumulative_difficulty_top64) =
split_u128_into_low_high_bits(cumulative_difficulty);
Ok(ProtocolResponse::GetChain(ChainResponse {
start_height: usize_to_u64(start_height),
total_height: usize_to_u64(chain_height),
cumulative_difficulty_low64,
cumulative_difficulty_top64,
m_block_ids: next_entry.into(),
m_block_weights: vec![],
first_block: first_missing_block.map_or(Bytes::new(), Bytes::from),
}))
*/
}
async fn new_fluffy_block(
mut blockchain_read_handle: BlockchainReadHandle,
incoming_block: NewFluffyBlock,
) -> Result<ProtocolResponse, tower::BoxError> {
let peer_blockchain_height = incoming_block.current_blockchain_height;
let (block, txs) = rayon_spawn_async(move || {
let block = Block::read(&mut incoming_block.b.block.as_ref())?;
let txs = incoming_block
.b
.txs
.take_normal()
.expect("TODO")
.into_par_iter()
.map(|tx| {
let tx = Transaction::read(&mut tx.as_ref())?;
Ok(tx)
})
.collect::<Result<_, tower::BoxError>>()?;
Ok::<_, tower::BoxError>((block, txs))
})
.await?;
let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await;
match res {
Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => Ok(
ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest {
block_hash: ByteArray::from(block_hash),
current_blockchain_height: peer_blockchain_height,
missing_tx_indices: tx_indexes,
}),
),
Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?,
Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA),
}
}

View file

@ -181,6 +181,7 @@ impl PreparedBlock {
})
}
/// Creates a new [`PreparedBlock`] from an [`AltBlockInformation`].
pub fn new_alt_block(block: AltBlockInformation) -> Result<Self, ConsensusError> {
Ok(Self {
block_blob: block.block_blob,

View file

@ -1,34 +0,0 @@
//! Utils for working with [`Transaction`]
use monero_serai::transaction::{Input, Transaction};
/// Calculates the fee of the [`Transaction`].
///
/// # Panics
/// This will panic if the inputs overflow or the transaction outputs too much, so should only
/// be used on known to be valid txs.
pub fn tx_fee(tx: &Transaction) -> u64 {
let mut fee = 0_u64;
match &tx {
Transaction::V1 { prefix, .. } => {
for input in &prefix.inputs {
match input {
Input::Gen(_) => return 0,
Input::ToKey { amount, .. } => {
fee = fee.checked_add(amount.unwrap_or(0)).unwrap();
}
}
}
for output in &prefix.outputs {
fee.checked_sub(output.amount.unwrap_or(0)).unwrap();
}
}
Transaction::V2 { proofs, .. } => {
fee = proofs.as_ref().unwrap().base.fee;
}
};
fee
}

View file

@ -116,7 +116,6 @@ pub enum ProtocolResponse {
GetChain(ChainResponse),
NewFluffyBlock(NewFluffyBlock),
NewTransactions(NewTransactions),
FluffyMissingTxs(FluffyMissingTransactionsRequest),
NA,
}
@ -140,7 +139,6 @@ impl PeerResponse {
ProtocolResponse::GetChain(_) => MessageID::GetChain,
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
ProtocolResponse::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
ProtocolResponse::NA => return None,
},

View file

@ -71,7 +71,6 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
ProtocolResponse::FluffyMissingTxs(val) => Self::FluffyMissingTransactionsRequest(val),
ProtocolResponse::NA => return Err(MessageConversionError),
})
}

View file

@ -154,6 +154,8 @@ impl<N: NetworkZone> ClientPool<N> {
self.borrow_clients(&peers).collect()
}
/// Checks all clients in the pool checking if any claim a higher cumulative difficulty than the
/// amount specified.
pub fn contains_client_with_more_cumulative_difficulty(
&self,
cumulative_difficulty: u128,

View file

@ -16,11 +16,10 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
/// The durations of a short ban.
#[cfg_attr(not(test), expect(dead_code))]
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
pub const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
/// The durations of a medium ban.
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
pub const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
/// The durations of a long ban.
pub const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
@ -53,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
/// The enforced maximum amount of blocks to request in a batch.
///
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
pub const MAX_BLOCK_BATCH_LEN: usize = 100;
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
/// The timeout that the block downloader will use for requests.
pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

View file

@ -172,7 +172,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
self.address_book.clone()
}
/// TODO
/// Borrows the `ClientPool`, for access to connected peers.
pub const fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
&self.pool
}