diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 80d10bf2..52043a06 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -1,11 +1,11 @@ //! Blockchain //! //! Will contain the chain manager and syncer. +use std::sync::Arc; use futures::FutureExt; -use std::sync::Arc; use tokio::sync::{mpsc, Notify}; -use tower::{Service, ServiceExt}; +use tower::{BoxError, Service, ServiceExt}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig}; @@ -22,11 +22,8 @@ mod manager; mod syncer; mod types; -use crate::blockchain::interface::INCOMING_BLOCK_TX; -use manager::BlockchainManager; use types::{ - ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, - ConsensusBlockchainReadHandle, + ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; pub use interface::{handle_incoming_block, IncomingBlockError}; @@ -51,6 +48,9 @@ pub async fn check_add_genesis( let genesis = generate_genesis_block(network); + assert_eq!(genesis.miner_transaction.prefix().outputs.len(), 1); + assert!(genesis.transactions.is_empty()); + blockchain_write_handle .ready() .await @@ -87,16 +87,14 @@ pub async fn init_consensus( ), tower::BoxError, > { - let ctx_service = cuprate_consensus::initialize_blockchain_context( - context_config, - ConsensusBlockchainReadHandle(blockchain_read_handle.clone()), - ) - .await?; + let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from); - let (block_verifier_svc, tx_verifier_svc) = cuprate_consensus::initialize_verifier( - ConsensusBlockchainReadHandle(blockchain_read_handle), - ctx_service.clone(), - ); + let ctx_service = + cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone()) + .await?; + + let (block_verifier_svc, tx_verifier_svc) = + cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone()); Ok((block_verifier_svc, tx_verifier_svc, ctx_service)) } diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 56c68edf..18376f39 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -1,39 +1,74 @@ -use crate::blockchain::manager::commands::BlockchainManagerCommand; -use cuprate_blockchain::service::BlockchainReadHandle; -use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_helper::cast::usize_to_u64; -use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_types::Chain; -use monero_serai::block::Block; -use monero_serai::transaction::Transaction; +//! The blockchain manger interface. +//! +//! This module contains all the functions to mutate the blockchains state in any way, through the +//! blockchain manger. +use std::{ + collections::{HashMap, HashSet}, + sync::{Mutex, OnceLock}, +}; + +use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; -use std::collections::{HashMap, HashSet}; -use std::sync::{Mutex, OnceLock}; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; -pub static INCOMING_BLOCK_TX: OnceLock> = OnceLock::new(); +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_helper::cast::usize_to_u64; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, +}; +use crate::{ + blockchain::manager::BlockchainManagerCommand, constants::PANIC_CRITICAL_SERVICE_ERROR, +}; + +/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manger. +pub static COMMAND_TX: OnceLock> = OnceLock::new(); + +/// A [`HashSet`] of block hashes that the blockchain manager is currently handling. pub static BLOCKS_BEING_HANDLED: OnceLock>> = OnceLock::new(); +/// An error that can be returned from [`handle_incoming_block`]. #[derive(Debug, thiserror::Error)] pub enum IncomingBlockError { + /// Some transactions in the block were unknown. + /// + /// The inner values are the block hash and the indexes of the missing txs in the block. #[error("Unknown transactions in block.")] UnknownTransactions([u8; 32], Vec), + /// We are missing the block's parent. #[error("The block has an unknown parent.")] Orphan, + /// The block was invalid. #[error(transparent)] InvalidBlock(anyhow::Error), } +/// Try to add a new block to the blockchain. +/// +/// This returns a [`bool`] indicating if the block was added to the main-chain ([`true`]) or an alt-chain +/// ([`false`]). +/// +/// If we already knew about this block or the blockchain manger is not setup yet `Ok(false)` is returned. +/// +/// # Errors +/// +/// This function will return an error if: +/// - the block was invalid +/// - we are missing transactions +/// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, given_txs: Vec, blockchain_read_handle: &mut BlockchainReadHandle, ) -> Result { + // FIXME: we should look in the tx-pool for txs when that is ready. + if !block_exists(block.header.previous, blockchain_read_handle) .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) { return Err(IncomingBlockError::Orphan); } @@ -42,12 +77,12 @@ pub async fn handle_incoming_block( if block_exists(block_hash, blockchain_read_handle) .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) { return Ok(false); } - // TODO: Get transactions from the tx pool first. + // TODO: remove this when we have a working tx-pool. if given_txs.len() != block.transactions.len() { return Err(IncomingBlockError::UnknownTransactions( block_hash, @@ -55,6 +90,7 @@ pub async fn handle_incoming_block( )); } + // TODO: check we actually go given the right txs. let prepped_txs = given_txs .into_par_iter() .map(|tx| { @@ -64,19 +100,25 @@ pub async fn handle_incoming_block( .collect::>() .map_err(IncomingBlockError::InvalidBlock)?; - let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else { + let Some(incoming_block_tx) = COMMAND_TX.get() else { + // We could still be starting up the blockchain manger, so just return this as there is nothing + // else we can do. return Ok(false); }; + // Add the blocks hash to the blocks being handled. if !BLOCKS_BEING_HANDLED .get_or_init(|| Mutex::new(HashSet::new())) .lock() .unwrap() .insert(block_hash) { + // If another place is already adding this block then we can stop. return Ok(false); } + // From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`. + let (response_tx, response_rx) = oneshot::channel(); incoming_block_tx @@ -86,13 +128,14 @@ pub async fn handle_incoming_block( response_tx, }) .await - .expect("TODO: don't actually panic here"); + .expect("TODO: don't actually panic here, an err means we are shutting down"); let res = response_rx .await - .unwrap() + .expect("The blockchain manager will always respond") .map_err(IncomingBlockError::InvalidBlock); + // Remove the block hash from the blocks being handled. BLOCKS_BEING_HANDLED .get() .unwrap() @@ -103,6 +146,7 @@ pub async fn handle_incoming_block( res } +/// Check if we have a block with the given hash. async fn block_exists( block_hash: [u8; 32], blockchain_read_handle: &mut BlockchainReadHandle, diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 314519d0..2d3c1798 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,35 +1,46 @@ -pub(super) mod commands; -mod handler; +use std::{collections::HashMap, sync::Arc}; -use crate::blockchain::interface::INCOMING_BLOCK_TX; -use crate::blockchain::manager::commands::BlockchainManagerCommand; -use crate::blockchain::types::ChainService; -use crate::blockchain::{ - syncer, - types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle}, -}; -use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::context::RawBlockChainContext; -use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, - BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, - VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, -}; -use cuprate_p2p::block_downloader::{BlockBatch, BlockDownloaderConfig}; -use cuprate_p2p::{BroadcastSvc, NetworkInterface}; -use cuprate_p2p_core::ClearNet; -use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_types::{Chain, TransactionVerificationData}; use futures::StreamExt; use monero_serai::block::Block; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::sync::{oneshot, Notify}; +use tokio::sync::{mpsc, oneshot, Notify}; use tower::{Service, ServiceExt}; use tracing::error; -use tracing_subscriber::fmt::time::FormatTime; +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::{ + context::RawBlockChainContext, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, BlockVerifierService, ExtendedConsensusError, TxVerifierService, + VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, +}; +use cuprate_p2p::{ + block_downloader::{BlockBatch, BlockDownloaderConfig}, + BroadcastSvc, NetworkInterface, +}; +use cuprate_p2p_core::ClearNet; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + Chain, TransactionVerificationData, +}; + +use crate::{ + blockchain::{ + interface::COMMAND_TX, + syncer, + types::ChainService, + types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle}, + }, + constants::PANIC_CRITICAL_SERVICE_ERROR, +}; + +mod commands; +mod handler; + +pub use commands::BlockchainManagerCommand; + +/// Initialize the blockchain manger. +/// +/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface) +/// can be called. pub async fn init_blockchain_manger( clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, @@ -42,7 +53,7 @@ pub async fn init_blockchain_manger( let stop_current_block_downloader = Arc::new(Notify::new()); let (command_tx, command_rx) = mpsc::channel(1); - INCOMING_BLOCK_TX.set(command_tx).unwrap(); + COMMAND_TX.set(command_tx).unwrap(); tokio::spawn(syncer::syncer( blockchain_context_service.clone(), @@ -56,10 +67,10 @@ pub async fn init_blockchain_manger( let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockChainContextRequest::GetContext) .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) else { panic!("Blockchain context service returned wrong response!"); }; @@ -71,7 +82,7 @@ pub async fn init_blockchain_manger( cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), block_verifier_service, stop_current_block_downloader, - broadcast_svc, + broadcast_svc: clearnet_interface.broadcast_svc(), }; tokio::spawn(manger.run(batch_rx, command_rx)); @@ -97,11 +108,7 @@ pub struct BlockchainManager { /// A cached context representing the current state. cached_blockchain_context: RawBlockChainContext, /// The block verifier service, to verify incoming blocks. - block_verifier_service: BlockVerifierService< - BlockChainContextService, - TxVerifierService, - ConsensusBlockchainReadHandle, - >, + block_verifier_service: ConcreteBlockVerifierService, /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download /// attempt. stop_current_block_downloader: Arc, @@ -110,6 +117,7 @@ pub struct BlockchainManager { } impl BlockchainManager { + /// The [`BlockchainManager`] task. pub async fn run( mut self, mut block_batch_rx: mpsc::Receiver, diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 7d4c81c4..62b54d69 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,8 +1,9 @@ +use std::{collections::HashMap, sync::Arc}; + use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; -use std::{collections::HashMap, sync::Arc}; use tower::{Service, ServiceExt}; use tracing::info; @@ -20,11 +21,16 @@ use cuprate_types::{ AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, }; -use crate::blockchain::manager::commands::BlockchainManagerCommand; -use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; -use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; +use crate::{ + blockchain::{ + manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle, + }, + constants::PANIC_CRITICAL_SERVICE_ERROR, + signals::REORG_LOCK, +}; impl super::BlockchainManager { + /// Handle an incoming command from another part of Cuprate. pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { match command { BlockchainManagerCommand::AddBlock { @@ -39,6 +45,7 @@ impl super::BlockchainManager { } } + /// Broadcast a valid block to the network. async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) { self.broadcast_svc .ready() @@ -191,7 +198,7 @@ impl super::BlockchainManager { /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from. async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) { - // TODO: this needs testing (this whole section does but this specifically). + // TODO: this needs testing (this whole section does but alt-blocks specifically). let mut blocks = batch.blocks.into_iter(); @@ -394,7 +401,7 @@ impl super::BlockchainManager { .block_verifier_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(VerifyBlockRequest::MainChainPrepped { block: prepped_block, txs: prepped_txs, @@ -426,7 +433,7 @@ impl super::BlockchainManager { self.blockchain_context_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockChainContextRequest::Update(NewBlockData { block_hash: verified_block.block_hash, height: verified_block.height, @@ -438,24 +445,24 @@ impl super::BlockchainManager { cumulative_difficulty: verified_block.cumulative_difficulty, })) .await - .expect("TODO"); + .expect(PANIC_CRITICAL_SERVICE_ERROR); self.blockchain_write_handle .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockchainWriteRequest::WriteBlock(verified_block)) .await - .expect("TODO"); + .expect(PANIC_CRITICAL_SERVICE_ERROR); let BlockChainContextResponse::Context(blockchain_context) = self .blockchain_context_service .ready() .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(BlockChainContextRequest::GetContext) .await - .expect("TODO") + .expect(PANIC_CRITICAL_SERVICE_ERROR) else { panic!("Incorrect response!"); }; diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 4b286ceb..c772b128 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1,6 +1,4 @@ -use std::pin::pin; -use std::sync::Arc; -use std::time::Duration; +use std::{pin::pin, sync::Arc, time::Duration}; use futures::StreamExt; use tokio::time::interval; @@ -18,6 +16,7 @@ use cuprate_p2p::{ }; use cuprate_p2p_core::ClearNet; +// FIXME: This whole module is not great and should be rewritten when the PeerSet is made. const CHECK_SYNC_FREQUENCY: Duration = Duration::from_secs(30); /// An error returned from the [`syncer`]. diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs index 46576a46..1bf921ec 100644 --- a/binaries/cuprated/src/blockchain/types.rs +++ b/binaries/cuprated/src/blockchain/types.rs @@ -1,41 +1,32 @@ -use cuprate_blockchain::cuprate_database::RuntimeError; -use cuprate_blockchain::service::BlockchainReadHandle; +use std::task::{Context, Poll}; + +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use tower::{util::MapErr, Service}; + +use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle}; use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use futures::future::{BoxFuture, MapErr}; -use futures::{FutureExt, TryFutureExt}; -use std::task::{Context, Poll}; -use tower::Service; +/// The [`BlockVerifierService`] with all generic types defined. pub type ConcreteBlockVerifierService = BlockVerifierService< BlockChainContextService, - TxVerifierService, + ConcreteTxVerifierService, ConsensusBlockchainReadHandle, >; +/// The [`TxVerifierService`] with all generic types defined. pub type ConcreteTxVerifierService = TxVerifierService; -#[derive(Clone)] -pub struct ConsensusBlockchainReadHandle(pub BlockchainReadHandle); - -impl Service for ConsensusBlockchainReadHandle { - type Response = BlockchainResponse; - type Error = tower::BoxError; - type Future = MapErr< - >::Future, - fn(RuntimeError) -> tower::BoxError, - >; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: BlockchainReadRequest) -> Self::Future { - self.0.call(req).map_err(Into::into) - } -} +/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires. +pub type ConsensusBlockchainReadHandle = + MapErr tower::BoxError>; +/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out +/// what blocks we need. +/// +/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier. #[derive(Clone)] pub struct ChainService(pub BlockchainReadHandle); @@ -79,6 +70,7 @@ impl Service for ChainService { .call(BlockchainReadRequest::CompactChainHistory) .map_ok(|res| { // TODO create a custom request instead of hijacking this one. + // TODO: use the context cache. let BlockchainResponse::CompactChainHistory { cumulative_difficulty, .. diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 86507f1f..419c3b42 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -90,7 +90,7 @@ async fn get_objects( // de-allocate the backing [`Bytes`] drop(req); - return Ok(ProtocolResponse::NA); + Ok(ProtocolResponse::NA) /* let res = blockchain_read_handle @@ -122,7 +122,8 @@ async fn get_chain( if req.block_ids.is_empty() { Err("No block hashes sent in a `ChainRequest`")?; } - return Ok(ProtocolResponse::NA); + + Ok(ProtocolResponse::NA) /* if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN { @@ -191,15 +192,13 @@ async fn new_fluffy_block( let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await; match res { - Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => { - return Ok(ProtocolResponse::FluffyMissingTxs( - FluffyMissingTransactionsRequest { - block_hash: ByteArray::from(block_hash), - current_blockchain_height: peer_blockchain_height, - missing_tx_indices: tx_indexes, - }, - )) - } + Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => Ok( + ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest { + block_hash: ByteArray::from(block_hash), + current_blockchain_height: peer_blockchain_height, + missing_tx_indices: tx_indexes, + }), + ), Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?, Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA), } diff --git a/binaries/cuprated/src/signals.rs b/binaries/cuprated/src/signals.rs index cafd8cdb..54467559 100644 --- a/binaries/cuprated/src/signals.rs +++ b/binaries/cuprated/src/signals.rs @@ -1,3 +1,9 @@ +//! Signals for Cuprate state used throughout the binary. + use tokio::sync::RwLock; +/// Reorg lock. +/// +/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken +/// for any operation which must complete without a reorg happening. pub static REORG_LOCK: RwLock<()> = RwLock::const_new(()); diff --git a/consensus/src/block.rs b/consensus/src/block.rs index 08e6cef0..53eb1469 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -8,7 +8,6 @@ use std::{ }; use futures::FutureExt; -use monero_serai::generators::H; use monero_serai::{ block::Block, transaction::{Input, Transaction}, @@ -124,10 +123,7 @@ impl PreparedBlock { /// /// The randomX VM must be Some if RX is needed or this will panic. /// The randomX VM must also be initialised with the correct seed. - pub fn new( - block: Block, - randomx_vm: Option<&R>, - ) -> Result { + pub fn new(block: Block, randomx_vm: Option<&R>) -> Result { let (hf_version, hf_vote) = HardFork::from_block_header(&block.header) .map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?; @@ -185,8 +181,8 @@ impl PreparedBlock { }) } - pub fn new_alt_block(block: AltBlockInformation) -> Result { - Ok(PreparedBlock { + pub fn new_alt_block(block: AltBlockInformation) -> Result { + Ok(Self { block_blob: block.block_blob, hf_vote: HardFork::from_version(block.block.header.hardfork_version) .map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?, diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 1e473158..7280f2ff 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -65,7 +65,6 @@ pub enum ExtendedConsensusError { } /// Initialize the 2 verifier [`tower::Service`]s (block and transaction). -#[expect(clippy::type_complexity)] pub fn initialize_verifier( database: D, ctx_svc: Ctx, diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index 735be76d..99981f0c 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -158,13 +158,10 @@ impl ClientPool { &self, cumulative_difficulty: u128, ) -> bool { - self.clients - .iter() - .find(|element| { - let sync_data = element.value().info.core_sync_data.lock().unwrap(); - sync_data.cumulative_difficulty() > cumulative_difficulty - }) - .is_some() + self.clients.iter().any(|element| { + let sync_data = element.value().info.core_sync_data.lock().unwrap(); + sync_data.cumulative_difficulty() > cumulative_difficulty + }) } } diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 37416b2d..ad082fc6 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -12,7 +12,6 @@ use tracing::{instrument, Instrument, Span}; use cuprate_async_buffer::BufferStream; use cuprate_p2p_core::{ client::Connector, - client::InternalPeerID, services::{AddressBookRequest, AddressBookResponse}, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker, }; @@ -27,7 +26,6 @@ mod inbound_server; use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}; pub use broadcast::{BroadcastRequest, BroadcastSvc}; -use client_pool::ClientPoolDropGuard; pub use config::{AddressBookConfig, P2PConfig}; use connection_maintainer::MakeConnectionRequest; @@ -175,7 +173,7 @@ impl NetworkInterface { } /// TODO - pub fn client_pool(&self) -> &Arc> { + pub const fn client_pool(&self) -> &Arc> { &self.pool } }