From 7b8756fa80e386fb04173d8220c15c86bf9f9888 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 3 Dec 2024 20:21:05 +0000 Subject: [PATCH] cuprated: P2P protocol request handler (#303) * add cuprated skeleton * fmt and add deny exception * add main chain batch handler * add blockchain init * very rough block manager * misc changes * move more config values * add new tables & types * add function to fully add an alt block * resolve current todo!s * add new requests * WIP: starting re-orgs * add last service request * commit Cargo.lock * add test * more docs + cleanup + alt blocks request * clippy + fmt * document types * move tx_fee to helper * more doc updates * fmt * fix imports * remove config files * fix merge errors * fix generated coins * handle more p2p requests + alt blocks * clean up handler code * add function for incoming blocks * add docs to handler functions * broadcast new blocks + add commands * add fluffy block handler * fix new block handling * small cleanup * increase outbound peer count * fix merge * clean up the blockchain manger * add more docs + cleanup imports * fix typo * fix doc * remove unrelated changes * add `get_objects` handler * add `get_chain` handler * add `fluffy_missing_txs` handler * add `new_fluffy_block` handler * improve interface globals * manger -> manager * enums instead of bools * move chain service to separate file * more review fixes * sort imports + docs * init dandelion integration * add dandelion start function * finish incoming tx handler * Add tx blob hash table * Add missing txpool requests * handle duplicate stem txs * check txpool on incoming block * add request to remove tx in new blocks from the pool * tell the txpool about incoming blocks * fix merge * typos * remove blockchain height from txpool * fix merge * fix merge * handle incoming txs in p2p request handler * Allow `IncomingTxHandler` to be given later * add p2p clearnet init * fix build * misc changes * doc updates * more doc updates * sort imports * review changes * Result -> DbResult * use `NonZero` * review fixes * remove `rust-2024-compatibility` lint --- Cargo.lock | 1 + Cargo.toml | 1 - binaries/cuprated/src/p2p.rs | 49 ++ .../cuprated/src/p2p/core_sync_service.rs | 49 ++ binaries/cuprated/src/p2p/request_handler.rs | 421 ++++++++++++++++++ binaries/cuprated/src/txpool.rs | 2 +- binaries/cuprated/src/txpool/incoming_tx.rs | 5 + net/wire/src/p2p/protocol.rs | 2 +- p2p/p2p-core/src/protocol.rs | 4 + p2p/p2p-core/src/protocol/try_from.rs | 3 + p2p/p2p/src/constants.rs | 6 +- storage/blockchain/Cargo.toml | 1 + storage/blockchain/src/ops/block.rs | 68 ++- storage/blockchain/src/ops/blockchain.rs | 42 +- storage/blockchain/src/service/read.rs | 174 ++++++-- types/src/blockchain.rs | 66 ++- types/src/lib.rs | 4 +- types/src/types.rs | 7 + 18 files changed, 861 insertions(+), 44 deletions(-) create mode 100644 binaries/cuprated/src/p2p/core_sync_service.rs diff --git a/Cargo.lock b/Cargo.lock index 0d55c8a..c8701f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -704,6 +704,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.6.0", "bytemuck", + "bytes", "cuprate-constants", "cuprate-database", "cuprate-database-service", diff --git a/Cargo.toml b/Cargo.toml index 9be1528..1813057 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -376,7 +376,6 @@ unused_lifetimes = "deny" unused_macro_rules = "deny" ambiguous_glob_imports = "deny" unused_unsafe = "deny" -rust_2024_compatibility = "deny" # Warm let_underscore = { level = "deny", priority = -1 } diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index cdf1cef..aeb98b6 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -1,8 +1,57 @@ //! P2P //! //! Will handle initiating the P2P and contains a protocol request handler. +use futures::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; +use tower::ServiceExt; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::BlockChainContextService; +use cuprate_p2p::{NetworkInterface, P2PConfig}; +use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::TxpoolReadHandle; + +use crate::txpool::IncomingTxHandler; + +mod core_sync_service; mod network_address; pub mod request_handler; pub use network_address::CrossNetworkInternalPeerId; + +/// Starts the P2P clearnet network, returning a [`NetworkInterface`] to interact with it. +/// +/// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network +/// handshakes can not be completed. +pub async fn start_clearnet_p2p( + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + txpool_read_handle: TxpoolReadHandle, + config: P2PConfig, +) -> Result< + ( + NetworkInterface, + oneshot::Sender, + ), + tower::BoxError, +> { + let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel(); + + let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker { + blockchain_read_handle, + blockchain_context_service: blockchain_context_service.clone(), + txpool_read_handle, + incoming_tx_handler: None, + incoming_tx_handler_fut: incoming_tx_handler_rx.shared(), + }; + + Ok(( + cuprate_p2p::initialize_network( + request_handler_maker.map_response(|s| s.map_err(Into::into)), + core_sync_service::CoreSyncService(blockchain_context_service), + config, + ) + .await?, + incoming_tx_handler_tx, + )) +} diff --git a/binaries/cuprated/src/p2p/core_sync_service.rs b/binaries/cuprated/src/p2p/core_sync_service.rs new file mode 100644 index 0000000..d3c3ca1 --- /dev/null +++ b/binaries/cuprated/src/p2p/core_sync_service.rs @@ -0,0 +1,49 @@ +use std::task::{Context, Poll}; + +use futures::{future::BoxFuture, FutureExt, TryFutureExt}; +use tower::Service; + +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, +}; +use cuprate_helper::{cast::usize_to_u64, map::split_u128_into_low_high_bits}; +use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse}; +use cuprate_wire::CoreSyncData; + +/// The core sync service. +#[derive(Clone)] +pub struct CoreSyncService(pub BlockChainContextService); + +impl Service for CoreSyncService { + type Response = CoreSyncDataResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { + self.0 + .call(BlockChainContextRequest::Context) + .map_ok(|res| { + let BlockChainContextResponse::Context(context) = res else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + let (cumulative_difficulty, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(context.cumulative_difficulty); + + CoreSyncDataResponse(CoreSyncData { + cumulative_difficulty, + cumulative_difficulty_top64, + current_height: usize_to_u64(context.chain_height), + pruning_seed: 0, + top_id: context.top_hash, + top_version: context.current_hf.as_u8(), + }) + }) + .boxed() + } +} diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 8b13789..7d72fa3 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1 +1,422 @@ +use std::{ + collections::HashSet, + future::{ready, Ready}, + hash::Hash, + task::{Context, Poll}, +}; +use bytes::Bytes; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, +}; +use monero_serai::{block::Block, transaction::Transaction}; +use tokio::sync::{broadcast, oneshot, watch}; +use tokio_stream::wrappers::WatchStream; +use tower::{Service, ServiceExt}; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::{ + transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, +}; +use cuprate_dandelion_tower::TxState; +use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::cast::u64_to_usize; +use cuprate_helper::{ + asynch::rayon_spawn_async, + cast::usize_to_u64, + map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, +}; +use cuprate_p2p::constants::{ + MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN, +}; +use cuprate_p2p_core::{ + client::{InternalPeerID, PeerInformation}, + NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, +}; +use cuprate_txpool::service::TxpoolReadHandle; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + BlockCompleteEntry, TransactionBlobs, TxsInBlock, +}; +use cuprate_wire::protocol::{ + ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, + GetObjectsResponse, NewFluffyBlock, NewTransactions, +}; + +use crate::{ + blockchain::interface::{self as blockchain_interface, IncomingBlockError}, + constants::PANIC_CRITICAL_SERVICE_ERROR, + p2p::CrossNetworkInternalPeerId, + txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs}, +}; + +/// The P2P protocol request handler [`MakeService`](tower::MakeService). +#[derive(Clone)] +pub struct P2pProtocolRequestHandlerMaker { + pub blockchain_read_handle: BlockchainReadHandle, + pub blockchain_context_service: BlockChainContextService, + pub txpool_read_handle: TxpoolReadHandle, + + /// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`] + /// and the [`IncomingTxHandler`]. + pub incoming_tx_handler: Option, + + /// A [`Future`](std::future::Future) that produces the [`IncomingTxHandler`]. + pub incoming_tx_handler_fut: Shared>, +} + +impl Service> for P2pProtocolRequestHandlerMaker +where + InternalPeerID: Into, +{ + type Response = P2pProtocolRequestHandler; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.incoming_tx_handler.is_none() { + return self + .incoming_tx_handler_fut + .poll_unpin(cx) + .map(|incoming_tx_handler| { + self.incoming_tx_handler = Some(incoming_tx_handler?); + Ok(()) + }); + } + + Poll::Ready(Ok(())) + } + + fn call(&mut self, peer_information: PeerInformation) -> Self::Future { + let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else { + panic!("poll_ready was not called or did not return `Poll::Ready`") + }; + + // TODO: check sync info? + + let blockchain_read_handle = self.blockchain_read_handle.clone(); + let txpool_read_handle = self.txpool_read_handle.clone(); + + ready(Ok(P2pProtocolRequestHandler { + peer_information, + blockchain_read_handle, + blockchain_context_service: self.blockchain_context_service.clone(), + txpool_read_handle, + incoming_tx_handler, + })) + } +} + +/// The P2P protocol request handler. +#[derive(Clone)] +pub struct P2pProtocolRequestHandler { + peer_information: PeerInformation, + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + txpool_read_handle: TxpoolReadHandle, + incoming_tx_handler: IncomingTxHandler, +} + +impl Service for P2pProtocolRequestHandler +where + InternalPeerID: Into, +{ + type Response = ProtocolResponse; + type Error = anyhow::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: ProtocolRequest) -> Self::Future { + match request { + ProtocolRequest::GetObjects(r) => { + get_objects(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::GetChain(r) => { + get_chain(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::FluffyMissingTxs(r) => { + fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!( + "Peer sent a full block when we support fluffy blocks" + ))) + .boxed(), + ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block( + self.peer_information.clone(), + r, + self.blockchain_read_handle.clone(), + self.txpool_read_handle.clone(), + ) + .boxed(), + ProtocolRequest::NewTransactions(r) => new_transactions( + self.peer_information.clone(), + r, + self.blockchain_context_service.clone(), + self.incoming_tx_handler.clone(), + ) + .boxed(), + ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: should we support this? + } + } +} + +//---------------------------------------------------------------------------------------------------- Handler functions + +/// [`ProtocolRequest::GetObjects`] +async fn get_objects( + request: GetObjectsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.blocks.len() > MAX_BLOCK_BATCH_LEN { + anyhow::bail!("Peer requested more blocks than allowed.") + } + + let block_hashes: Vec<[u8; 32]> = (&request.blocks).into(); + // deallocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::BlockCompleteEntries(block_hashes)) + .await? + else { + unreachable!(); + }; + + Ok(ProtocolResponse::GetObjects(GetObjectsResponse { + blocks, + missed_ids: ByteArrayVec::from(missing_hashes), + current_blockchain_height: usize_to_u64(blockchain_height), + })) +} + +/// [`ProtocolRequest::GetChain`] +async fn get_chain( + request: ChainRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY { + anyhow::bail!("Peer sent too many block hashes in chain request.") + } + + let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into(); + let want_pruned_data = request.prune; + // deallocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::NextChainEntry { + start_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty, + first_block_blob, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000)) + .await? + else { + unreachable!(); + }; + + let Some(start_height) = start_height else { + anyhow::bail!("The peers chain has a different genesis block than ours."); + }; + + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(cumulative_difficulty); + + Ok(ProtocolResponse::GetChain(ChainResponse { + start_height: usize_to_u64(std::num::NonZero::get(start_height)), + total_height: usize_to_u64(chain_height), + cumulative_difficulty_low64, + cumulative_difficulty_top64, + m_block_ids: ByteArrayVec::from(block_ids), + first_block: first_block_blob.map_or(Bytes::new(), Bytes::from), + // only needed when pruned + m_block_weights: if want_pruned_data { + block_weights.into_iter().map(usize_to_u64).collect() + } else { + vec![] + }, + })) +} + +/// [`ProtocolRequest::FluffyMissingTxs`] +async fn fluffy_missing_txs( + mut request: FluffyMissingTransactionsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + let tx_indexes = std::mem::take(&mut request.missing_tx_indices); + let block_hash: [u8; 32] = *request.block_hash; + let current_blockchain_height = request.current_blockchain_height; + + // deallocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::TxsInBlock(res) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::TxsInBlock { + block_hash, + tx_indexes, + }) + .await? + else { + unreachable!(); + }; + + let Some(TxsInBlock { block, txs }) = res else { + anyhow::bail!("The peer requested txs out of range."); + }; + + Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock { + b: BlockCompleteEntry { + block: Bytes::from(block), + txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()), + pruned: false, + // only needed for pruned blocks. + block_weight: 0, + }, + current_blockchain_height, + })) +} + +/// [`ProtocolRequest::NewFluffyBlock`] +async fn new_fluffy_block( + peer_information: PeerInformation, + request: NewFluffyBlock, + mut blockchain_read_handle: BlockchainReadHandle, + mut txpool_read_handle: TxpoolReadHandle, +) -> anyhow::Result { + // TODO: check context service here and ignore the block? + let current_blockchain_height = request.current_blockchain_height; + + peer_information + .core_sync_data + .lock() + .unwrap() + .current_height = current_blockchain_height; + + let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> { + let block = Block::read(&mut request.b.block.as_ref())?; + + let tx_blobs = request + .b + .txs + .take_normal() + .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?; + + let txs = tx_blobs + .into_iter() + .map(|tx_blob| { + if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE { + anyhow::bail!("Peer sent a transaction over the size limit."); + } + + let tx = Transaction::read(&mut tx_blob.as_ref())?; + + Ok((tx.hash(), tx)) + }) + .collect::>()?; + + // The backing `Bytes` will be deallocated when this closure returns. + + Ok((block, txs)) + }) + .await?; + + let res = blockchain_interface::handle_incoming_block( + block, + txs, + &mut blockchain_read_handle, + &mut txpool_read_handle, + ) + .await; + + match res { + Ok(_) => Ok(ProtocolResponse::NA), + Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok( + ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest { + block_hash: block_hash.into(), + current_blockchain_height, + missing_tx_indices: missing_tx_indices.into_iter().map(usize_to_u64).collect(), + }), + ), + Err(IncomingBlockError::Orphan) => { + // Block's parent was unknown, could be syncing? + Ok(ProtocolResponse::NA) + } + Err(e) => Err(e.into()), + } +} + +/// [`ProtocolRequest::NewTransactions`] +async fn new_transactions( + peer_information: PeerInformation, + request: NewTransactions, + mut blockchain_context_service: BlockChainContextService, + mut incoming_tx_handler: IncomingTxHandler, +) -> anyhow::Result +where + A: NetZoneAddress, + InternalPeerID: Into, +{ + let BlockChainContextResponse::Context(context) = blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Context) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + // If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing. + if usize_to_u64(context.chain_height + 2) + < peer_information + .core_sync_data + .lock() + .unwrap() + .current_height + { + return Ok(ProtocolResponse::NA); + } + + let state = if request.dandelionpp_fluff { + TxState::Fluff + } else { + TxState::Stem { + from: peer_information.id.into(), + } + }; + + // Drop all the data except the stuff we still need. + let NewTransactions { txs, .. } = request; + + let res = incoming_tx_handler + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(IncomingTxs { txs, state }) + .await; + + match res { + Ok(()) => Ok(ProtocolResponse::NA), + Err(e) => Err(e.into()), + } +} diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 9592c2b..2076956 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -12,4 +12,4 @@ mod dandelion; mod incoming_tx; mod txs_being_handled; -pub use incoming_tx::IncomingTxHandler; +pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index e204159..bf7684e 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -43,9 +43,13 @@ use crate::{ }; /// An error that can happen handling an incoming tx. +#[derive(Debug, thiserror::Error)] pub enum IncomingTxError { + #[error("Error parsing tx: {0}")] Parse(std::io::Error), + #[error(transparent)] Consensus(ExtendedConsensusError), + #[error("Duplicate tx in message")] DuplicateTransaction, } @@ -67,6 +71,7 @@ pub(super) type TxId = [u8; 32]; /// The service than handles incoming transaction pool transactions. /// /// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. +#[derive(Clone)] pub struct IncomingTxHandler { /// A store of txs currently being handled in incoming tx requests. pub(super) txs_being_handled: TxsBeingHandled, diff --git a/net/wire/src/p2p/protocol.rs b/net/wire/src/p2p/protocol.rs index 1d1d45a..cc4b49d 100644 --- a/net/wire/src/p2p/protocol.rs +++ b/net/wire/src/p2p/protocol.rs @@ -159,7 +159,7 @@ epee_object!( current_blockchain_height: u64, ); -/// A request for Txs we are missing from our `TxPool` +/// A request for txs we are missing from an incoming block. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FluffyMissingTransactionsRequest { /// The Block we are missing the Txs in diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 7d8d431..82aac82 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -116,6 +116,7 @@ pub enum ProtocolResponse { GetChain(ChainResponse), NewFluffyBlock(NewFluffyBlock), NewTransactions(NewTransactions), + FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest), NA, } @@ -139,6 +140,9 @@ impl PeerResponse { ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, + ProtocolResponse::FluffyMissingTransactionsRequest(_) => { + MessageID::FluffyMissingTxs + } ProtocolResponse::NA => return None, }, diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index d3a7260..2dfc41d 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -71,6 +71,9 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), + ProtocolResponse::FluffyMissingTransactionsRequest(val) => { + Self::FluffyMissingTransactionsRequest(val) + } ProtocolResponse::NA => return Err(MessageConversionError), }) } diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index f70d64c..a81557c 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; /// The enforced maximum amount of blocks to request in a batch. /// /// Requesting more than this will cause the peer to disconnect and potentially lead to bans. -pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; +pub const MAX_BLOCK_BATCH_LEN: usize = 100; /// The timeout that the block downloader will use for requests. pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); @@ -61,13 +61,13 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec /// be less than. /// /// ref: -pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; +pub const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; /// The maximum amount of block IDs allowed in a chain entry response. /// /// ref: // TODO: link to the protocol book when this section is added. -pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000; +pub const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000; /// The amount of failures downloading a specific batch before we stop attempting to download it. pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5; diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 92b4374..c935924 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -34,6 +34,7 @@ serde = { workspace = true, optional = true } tower = { workspace = true } thread_local = { workspace = true } rayon = { workspace = true } +bytes = { workspace = true } [dev-dependencies] cuprate-constants = { workspace = true } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index 5e54187..2dc88aa 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -2,21 +2,23 @@ //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; +use bytes::Bytes; use monero_serai::{ block::{Block, BlockHeader}, transaction::Transaction, }; use cuprate_database::{ - DbResult, RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, + DbResult, RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw}, }; +use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, tx::tx_fee, }; use cuprate_types::{ - AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation, - VerifiedTransactionInformation, + AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork, + TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; use crate::{ @@ -27,7 +29,7 @@ use crate::{ output::get_rct_num_outputs, tx::{add_tx, remove_tx}, }, - tables::{BlockHeights, BlockInfos, Tables, TablesMut}, + tables::{BlockHeights, BlockInfos, Tables, TablesIter, TablesMut}, types::{BlockHash, BlockHeight, BlockInfo}, }; @@ -222,6 +224,64 @@ pub fn pop_block( Ok((block_height, block_info.block_hash, block)) } +//---------------------------------------------------------------------------------------------------- `get_block_blob_with_tx_indexes` +/// Retrieve a block's raw bytes, the index of the miner transaction and the number of non miner-txs in the block. +/// +#[doc = doc_error!()] +pub fn get_block_blob_with_tx_indexes( + block_height: &BlockHeight, + tables: &impl Tables, +) -> Result<(Vec, u64, usize), RuntimeError> { + let miner_tx_idx = tables.block_infos().get(block_height)?.mining_tx_index; + + let block_txs = tables.block_txs_hashes().get(block_height)?.0; + let numb_txs = block_txs.len(); + + // Get the block header + let mut block = tables.block_header_blobs().get(block_height)?.0; + + // Add the miner tx to the blob. + let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0; + block.append(&mut miner_tx_blob); + + // Add the blocks tx hashes. + monero_serai::io::write_varint(&block_txs.len(), &mut block) + .expect("The number of txs per block will not exceed u64::MAX"); + + let block_txs_bytes = bytemuck::must_cast_slice(&block_txs); + block.extend_from_slice(block_txs_bytes); + + Ok((block, miner_tx_idx, numb_txs)) +} + +//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` +/// Retrieve a [`BlockCompleteEntry`] from the database. +/// +#[doc = doc_error!()] +pub fn get_block_complete_entry( + block_hash: &BlockHash, + tables: &impl TablesIter, +) -> Result { + let block_height = tables.block_heights().get(block_hash)?; + let (block_blob, miner_tx_idx, numb_non_miner_txs) = + get_block_blob_with_tx_indexes(&block_height, tables)?; + + let first_tx_idx = miner_tx_idx + 1; + + let tx_blobs = tables + .tx_blobs_iter() + .get_range(first_tx_idx..(usize_to_u64(numb_non_miner_txs) + first_tx_idx))? + .map(|tx_blob| Ok(Bytes::from(tx_blob?.0))) + .collect::>()?; + + Ok(BlockCompleteEntry { + block: Bytes::from(block_blob), + txs: TransactionBlobs::Normal(tx_blobs), + pruned: false, + block_weight: 0, + }) +} + //---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` /// Retrieve a [`ExtendedBlockHeader`] from the database. /// diff --git a/storage/blockchain/src/ops/blockchain.rs b/storage/blockchain/src/ops/blockchain.rs index 7163363..54dd752 100644 --- a/storage/blockchain/src/ops/blockchain.rs +++ b/storage/blockchain/src/ops/blockchain.rs @@ -4,9 +4,9 @@ use cuprate_database::{DatabaseRo, DbResult, RuntimeError}; use crate::{ - ops::macros::doc_error, + ops::{block::block_exists, macros::doc_error}, tables::{BlockHeights, BlockInfos}, - types::BlockHeight, + types::{BlockHash, BlockHeight}, }; //---------------------------------------------------------------------------------------------------- Free Functions @@ -76,6 +76,44 @@ pub fn cumulative_generated_coins( } } +/// Find the split point between our chain and a list of [`BlockHash`]s from another chain. +/// +/// This function accepts chains in chronological and reverse chronological order, however +/// if the wrong order is specified the return value is meaningless. +/// +/// For chronologically ordered chains this will return the index of the first unknown, for reverse +/// chronologically ordered chains this will return the index of the first known. +/// +/// If all blocks are known for chronologically ordered chains or unknown for reverse chronologically +/// ordered chains then the length of the chain will be returned. +#[doc = doc_error!()] +#[inline] +pub fn find_split_point( + block_ids: &[BlockHash], + chronological_order: bool, + table_block_heights: &impl DatabaseRo, +) -> Result { + let mut err = None; + + // Do a binary search to find the first unknown/known block in the batch. + let idx = block_ids.partition_point(|block_id| { + match block_exists(block_id, table_block_heights) { + Ok(exists) => exists == chronological_order, + Err(e) => { + err.get_or_insert(e); + // if this happens the search is scrapped, just return `false` back. + false + } + } + }); + + if let Some(e) = err { + return Err(e); + } + + Ok(idx) +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test { diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 7657759..84b1b92 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -10,23 +10,26 @@ //---------------------------------------------------------------------------------------------------- Import use std::{ + cmp::min, collections::{HashMap, HashSet}, sync::Arc, }; use rayon::{ - iter::{IntoParallelIterator, ParallelIterator}, + iter::{Either, IntoParallelIterator, ParallelIterator}, prelude::*, ThreadPool, }; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseRo, DbResult, Env, EnvInner, RuntimeError}; +use cuprate_database::{ + ConcreteEnv, DatabaseIter, DatabaseRo, DbResult, Env, EnvInner, RuntimeError, +}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, + Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, TxsInBlock, }; use crate::{ @@ -36,9 +39,10 @@ use crate::{ get_alt_chain_history_ranges, }, block::{ - block_exists, get_block_extended_header_from_height, get_block_height, get_block_info, + block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry, + get_block_extended_header_from_height, get_block_height, get_block_info, }, - blockchain::{cumulative_generated_coins, top_block_height}, + blockchain::{cumulative_generated_coins, find_split_point, top_block_height}, key_image::key_image_exists, output::id_to_output_on_chain, }, @@ -46,7 +50,7 @@ use crate::{ free::{compact_history_genesis_not_included, compact_history_index_to_height_offset}, types::{BlockchainReadHandle, ResponseResult}, }, - tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables}, + tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter}, types::{ AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId, }, @@ -100,6 +104,7 @@ fn map_request( /* SOMEDAY: pre-request handling, run some code for each request? */ match request { + R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::FindBlock(block_hash) => find_block(env, block_hash), @@ -113,7 +118,12 @@ fn map_request( R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec), R::KeyImagesSpent(set) => key_images_spent(env, set), R::CompactChainHistory => compact_chain_history(env), + R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), + R::TxsInBlock { + block_hash, + tx_indexes, + } => txs_in_block(env, block_hash, tx_indexes), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), R::Block { height } => block(env, height), R::BlockByHash(hash) => block_by_hash(env, hash), @@ -198,6 +208,38 @@ macro_rules! get_tables { // TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal // amount of parallelism. +/// [`BlockchainReadRequest::BlockCompleteEntries`]. +fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + let (missing_hashes, blocks) = block_hashes + .into_par_iter() + .map(|block_hash| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + match get_block_complete_entry(&block_hash, tables) { + Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)), + res => res.map(Either::Right), + } + }) + .collect::>()?; + + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?; + + Ok(BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + }) +} + /// [`BlockchainReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { @@ -335,7 +377,7 @@ fn block_extended_header_in_range( } }) }) - .collect::, _>>()? + .collect::>>()? } }; @@ -534,6 +576,76 @@ fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { }) } +/// [`BlockchainReadRequest::NextChainEntry`] +/// +/// # Invariant +/// `block_ids` must be sorted in reverse chronological block order, or else +/// the returned result is unspecified and meaningless, as this function +/// performs a binary search. +fn next_chain_entry( + env: &ConcreteEnv, + block_ids: &[BlockHash], + next_entry_size: usize, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let tables = env_inner.open_tables(&tx_ro)?; + let table_block_heights = tables.block_heights(); + let table_block_infos = tables.block_infos_iter(); + + let idx = find_split_point(block_ids, false, table_block_heights)?; + + // This will happen if we have a different genesis block. + if idx == block_ids.len() { + return Ok(BlockchainResponse::NextChainEntry { + start_height: None, + chain_height: 0, + block_ids: vec![], + block_weights: vec![], + cumulative_difficulty: 0, + first_block_blob: None, + }); + } + + // The returned chain entry must overlap with one of the blocks we were told about. + let first_known_block_hash = block_ids[idx]; + let first_known_height = table_block_heights.get(&first_known_block_hash)?; + + let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?; + let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height); + + let (block_ids, block_weights) = table_block_infos + .get_range(first_known_height..last_height_in_chain_entry)? + .map(|block_info| { + let block_info = block_info?; + + Ok((block_info.block_hash, block_info.weight)) + }) + .collect::, Vec<_>)>>()?; + + let top_block_info = table_block_infos.get(&(chain_height - 1))?; + + let first_block_blob = if block_ids.len() >= 2 { + Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0) + } else { + None + }; + + Ok(BlockchainResponse::NextChainEntry { + start_height: std::num::NonZero::new(first_known_height), + chain_height, + block_ids, + block_weights, + cumulative_difficulty: combine_low_high_bits_to_u128( + top_block_info.cumulative_difficulty_low, + top_block_info.cumulative_difficulty_high, + ), + first_block_blob, + }) +} + /// [`BlockchainReadRequest::FindFirstUnknown`] /// /// # Invariant @@ -546,24 +658,7 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; - let mut err = None; - - // Do a binary search to find the first unknown block in the batch. - let idx = - block_ids.partition_point( - |block_id| match block_exists(block_id, &table_block_heights) { - Ok(exists) => exists, - Err(e) => { - err.get_or_insert(e); - // if this happens the search is scrapped, just return `false` back. - false - } - }, - ); - - if let Some(e) = err { - return Err(e); - } + let idx = find_split_point(block_ids, true, &table_block_heights)?; Ok(if idx == block_ids.len() { BlockchainResponse::FindFirstUnknown(None) @@ -576,6 +671,33 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes }) } +/// [`BlockchainReadRequest::TxsInBlock`] +fn txs_in_block(env: &ConcreteEnv, block_hash: [u8; 32], missing_txs: Vec) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + + let block_height = tables.block_heights().get(&block_hash)?; + + let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?; + let first_tx_index = miner_tx_index + 1; + + if numb_txs < missing_txs.len() { + return Ok(BlockchainResponse::TxsInBlock(None)); + } + + let txs = missing_txs + .into_iter() + .map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0)) + .collect::>()?; + + Ok(BlockchainResponse::TxsInBlock(Some(TxsInBlock { + block, + txs, + }))) +} + /// [`BlockchainReadRequest::AltBlocksInChain`] fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. @@ -613,7 +735,7 @@ fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { ) }) }) - .collect::>()?; + .collect::>()?; Ok(BlockchainResponse::AltBlocksInChain(blocks)) } diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index c39c0bd..7518935 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -11,9 +11,9 @@ use std::{ use monero_serai::block::Block; use crate::{ - types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, - AltBlockInformation, ChainId, ChainInfo, CoinbaseTxSum, OutputHistogramEntry, - OutputHistogramInput, + types::{Chain, ExtendedBlockHeader, OutputOnChain, TxsInBlock, VerifiedBlockInformation}, + AltBlockInformation, BlockCompleteEntry, ChainId, ChainInfo, CoinbaseTxSum, + OutputHistogramEntry, OutputHistogramInput, }; //---------------------------------------------------------------------------------------------------- ReadRequest @@ -27,6 +27,11 @@ use crate::{ /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainReadRequest { + /// Request [`BlockCompleteEntry`]s. + /// + /// The input is the block hashes. + BlockCompleteEntries(Vec<[u8; 32]>), + /// Request a block's extended header. /// /// The input is the block's height. @@ -96,6 +101,16 @@ pub enum BlockchainReadRequest { /// A request for the compact chain history. CompactChainHistory, + /// A request for the next chain entry. + /// + /// Input is a list of block hashes and the amount of block hashes to return in the next chain entry. + /// + /// # Invariant + /// The [`Vec`] containing the block IDs must be sorted in reverse chronological block + /// order, or else the returned response is unspecified and meaningless, + /// as this request performs a binary search + NextChainEntry(Vec<[u8; 32]>, usize), + /// A request to find the first unknown block ID in a list of block IDs. /// /// # Invariant @@ -104,6 +119,16 @@ pub enum BlockchainReadRequest { /// as this request performs a binary search. FindFirstUnknown(Vec<[u8; 32]>), + /// A request for transactions from a specific block. + TxsInBlock { + /// The block to get transactions from. + block_hash: [u8; 32], + /// The indexes of the transactions from the block. + /// This is not the global index of the txs, instead it is the local index as they appear in + /// the block. + tx_indexes: Vec, + }, + /// A request for all alt blocks in the chain with the given [`ChainId`]. AltBlocksInChain(ChainId), @@ -182,6 +207,16 @@ pub enum BlockchainWriteRequest { #[expect(clippy::large_enum_variant)] pub enum BlockchainResponse { //------------------------------------------------------ Reads + /// Response to [`BlockchainReadRequest::BlockCompleteEntries`]. + BlockCompleteEntries { + /// The [`BlockCompleteEntry`]s that we had. + blocks: Vec, + /// The hashes of blocks that were requested, but we don't have. + missing_hashes: Vec<[u8; 32]>, + /// Our blockchain height. + blockchain_height: usize, + }, + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. @@ -248,6 +283,24 @@ pub enum BlockchainResponse { cumulative_difficulty: u128, }, + /// Response to [`BlockchainReadRequest::NextChainEntry`]. + /// + /// If all blocks were unknown `start_height` will be [`None`], the other fields will be meaningless. + NextChainEntry { + /// The start height of this entry, [`None`] if we could not find the split point. + start_height: Option>, + /// The current chain height. + chain_height: usize, + /// The next block hashes in the entry. + block_ids: Vec<[u8; 32]>, + /// The block weights of the next blocks. + block_weights: Vec, + /// The current cumulative difficulty of our chain. + cumulative_difficulty: u128, + /// The block blob of the 2nd block in `block_ids`, if there is one. + first_block_blob: Option>, + }, + /// Response to [`BlockchainReadRequest::FindFirstUnknown`]. /// /// Contains the index of the first unknown block and its expected height. @@ -255,7 +308,12 @@ pub enum BlockchainResponse { /// This will be [`None`] if all blocks were known. FindFirstUnknown(Option<(usize, usize)>), - /// Response to [`BlockchainReadRequest::AltBlocksInChain`]. + /// The response for [`BlockchainReadRequest::TxsInBlock`]. + /// + /// Will return [`None`] if the request contained an index out of range. + TxsInBlock(Option), + + /// The response for [`BlockchainReadRequest::AltBlocksInChain`]. /// /// Contains all the alt blocks in the alt-chain in chronological order. AltBlocksInChain(Vec), diff --git a/types/src/lib.rs b/types/src/lib.rs index a5a04f9..7aaf0b9 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -26,8 +26,8 @@ pub use transaction_verification_data::{ pub use types::{ AddAuxPow, AltBlockInformation, AuxPow, Chain, ChainId, ChainInfo, CoinbaseTxSum, ExtendedBlockHeader, FeeEstimate, HardForkInfo, MinerData, MinerDataTxBacklogEntry, - OutputHistogramEntry, OutputHistogramInput, OutputOnChain, VerifiedBlockInformation, - VerifiedTransactionInformation, + OutputHistogramEntry, OutputHistogramInput, OutputOnChain, TxsInBlock, + VerifiedBlockInformation, VerifiedTransactionInformation, }; //---------------------------------------------------------------------------------------------------- Feature-gated diff --git a/types/src/types.rs b/types/src/types.rs index 720ad0a..8a5b5aa 100644 --- a/types/src/types.rs +++ b/types/src/types.rs @@ -259,6 +259,13 @@ pub struct AddAuxPow { pub aux_pow: Vec, } +/// The inner response for a request for txs in a block. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TxsInBlock { + pub block: Vec, + pub txs: Vec>, +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test {