From 6e8fbf04f633f4425fc606d6dc1d3c8f88410ff8 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 4 Oct 2024 02:45:42 +0100 Subject: [PATCH] add `get_objects` handler --- Cargo.lock | 1 + binaries/cuprated/src/p2p/request_handler.rs | 106 +++++++++++++++++++ p2p/p2p/src/constants.rs | 2 +- storage/blockchain/Cargo.toml | 1 + storage/blockchain/src/ops/block.rs | 62 ++++++++++- storage/blockchain/src/service/read.rs | 38 ++++++- types/src/blockchain.rs | 10 +- 7 files changed, 214 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd303bf..8ba7ed8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,6 +547,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.6.0", "bytemuck", + "bytes", "cuprate-database", "cuprate-database-service", "cuprate-helper", diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 8b13789..818700d 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1 +1,107 @@ +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; +use futures::future::BoxFuture; +use futures::FutureExt; +use tower::{Service, ServiceExt}; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::BlockChainContextService; +use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; +use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; + +#[derive(Clone)] +pub struct P2pProtocolRequestHandlerMaker { + pub blockchain_read_handle: BlockchainReadHandle, +} + +impl Service> for P2pProtocolRequestHandlerMaker { + type Response = P2pProtocolRequestHandler; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, peer_information: PeerInformation) -> Self::Future { + // TODO: check sync info? + + let blockchain_read_handle = self.blockchain_read_handle.clone(); + + ready(Ok(P2pProtocolRequestHandler { + peer_information, + blockchain_read_handle, + })) + } +} + +#[derive(Clone)] +pub struct P2pProtocolRequestHandler { + peer_information: PeerInformation, + blockchain_read_handle: BlockchainReadHandle, +} + +impl Service for P2pProtocolRequestHandler { + type Response = ProtocolResponse; + type Error = anyhow::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: ProtocolRequest) -> Self::Future { + match request { + ProtocolRequest::GetObjects(r) => { + get_objects(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::GetChain(_) => todo!(), + ProtocolRequest::FluffyMissingTxs(_) => todo!(), + ProtocolRequest::GetTxPoolCompliment(_) => todo!(), + ProtocolRequest::NewBlock(_) => todo!(), + ProtocolRequest::NewFluffyBlock(_) => todo!(), + ProtocolRequest::NewTransactions(_) => todo!(), + } + } +} + +//---------------------------------------------------------------------------------------------------- Handler functions + +/// [`ProtocolRequest::GetObjects`] +async fn get_objects( + request: GetObjectsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.blocks.len() > MAX_BLOCK_BATCH_LEN { + anyhow::bail!("Peer requested more blocks than allowed.") + } + + let block_ids: Vec<[u8; 32]> = (&request.blocks).into(); + // de-allocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::BlockCompleteEntries(block_ids)) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + Ok(ProtocolResponse::GetObjects(GetObjectsResponse { + blocks, + missed_ids: ByteArrayVec::from(missing_hashes), + current_blockchain_height, + })) +} diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index f70d64c..d1060ae 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; /// The enforced maximum amount of blocks to request in a batch. /// /// Requesting more than this will cause the peer to disconnect and potentially lead to bans. -pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; +pub const MAX_BLOCK_BATCH_LEN: usize = 100; /// The timeout that the block downloader will use for requests. pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 6eecb89..073b418 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -35,6 +35,7 @@ serde = { workspace = true, optional = true } tower = { workspace = true } thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } +bytes = "1.7.2" [dev-dependencies] cuprate-helper = { path = "../../helper", features = ["thread", "cast"] } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index 6d32fd8..d038d1a 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -1,7 +1,9 @@ //! Block functions. +use std::slice; //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; +use bytes::{Bytes, BytesMut}; use monero_serai::{ block::{Block, BlockHeader}, transaction::Transaction, @@ -10,15 +12,17 @@ use monero_serai::{ use cuprate_database::{ RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, }; +use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, tx::tx_fee, }; use cuprate_types::{ - AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation, - VerifiedTransactionInformation, + AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork, + TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; +use crate::tables::TablesIter; use crate::{ ops::{ alt_block, @@ -224,6 +228,60 @@ pub fn pop_block( Ok((block_height, block_info.block_hash, block)) } +//---------------------------------------------------------------------------------------------------- `get_block_blob_with_tx_indexes` +pub fn get_block_blob_with_tx_indexes( + block_height: &BlockHeight, + tables: &impl Tables, +) -> Result<(Vec, u64, usize), RuntimeError> { + use monero_serai::io::write_varint; + + let block_info = tables.block_infos().get(&block_height)?; + + let miner_tx_idx = block_info.mining_tx_index; + let mut block_txs = tables.block_txs_hashes().get(&block_height)?.0; + let numb_txs = block_txs.len(); + + // Get the block header + let mut block = tables.block_header_blobs().get(&block_height)?.0; + + // Add the miner tx to the blob. + let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0; + block.append(&mut miner_tx_blob); + + // Add the blocks tx hashes. + write_varint(&block_txs.len(), &mut block) + .expect("The number of txs per block will not exceed u64::MAX"); + for tx in block_txs { + block.extend_from_slice(&tx); + } + + Ok((block, miner_tx_idx, numb_txs)) +} + +//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` +pub fn get_block_complete_entry( + block_hash: &BlockHash, + tables: &impl TablesIter, +) -> Result { + let block_height = tables.block_heights().get(block_hash)?; + let (block_blob, miner_tx_idx, numb_non_miner_txs) = + get_block_blob_with_tx_indexes(&block_height, tables)?; + + let first_tx_idx = miner_tx_idx + 1; + + let tx_blobs = tables + .tx_blobs_iter() + .get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))? + .map(|tx_blob| Ok(Bytes::from(tx_blob?.0))) + .collect::>()?; + + Ok(BlockCompleteEntry { + block: Bytes::from(block_blob), + txs: TransactionBlobs::Normal(tx_blobs), + pruned: false, + block_weight: 0, + }) +} //---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` /// Retrieve a [`ExtendedBlockHeader`] from the database. diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index b0e7e04..1715135 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -7,7 +7,7 @@ use std::{ }; use rayon::{ - iter::{IntoParallelIterator, ParallelIterator}, + iter::{IntoParallelIterator, ParallelIterator, Either}, prelude::*, ThreadPool, }; @@ -18,9 +18,10 @@ use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThre use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputOnChain, + BlockCompleteEntry, Chain, ChainId, ExtendedBlockHeader, OutputOnChain, }; +use crate::ops::block::get_block_complete_entry; use crate::{ ops::{ alt_block::{ @@ -92,6 +93,7 @@ fn map_request( /* SOMEDAY: pre-request handling, run some code for each request? */ match request { + R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::FindBlock(block_hash) => find_block(env, block_hash), @@ -182,6 +184,38 @@ macro_rules! get_tables { // TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal // amount of parallelism. +/// [`BlockchainReadRequest::BlockCompleteEntries`]. +fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + let (missing_hashes, blocks) = block_hashes + .into_par_iter() + .map(|block_hash| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + match get_block_complete_entry(&block_hash, tables) { + Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)), + res => res.map(Either::Right), + } + }) + .collect::>()?; + + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?; + + Ok(BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + }) +} + /// [`BlockchainReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index f2b96db..1112705 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -10,7 +10,7 @@ use std::{ use crate::{ types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, - AltBlockInformation, ChainId, + AltBlockInformation, BlockCompleteEntry, ChainId, }; //---------------------------------------------------------------------------------------------------- ReadRequest @@ -24,6 +24,8 @@ use crate::{ /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainReadRequest { + BlockCompleteEntries(Vec<[u8; 32]>), + /// Request a block's extended header. /// /// The input is the block's height. @@ -149,6 +151,12 @@ pub enum BlockchainWriteRequest { #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainResponse { //------------------------------------------------------ Reads + BlockCompleteEntries { + blocks: Vec, + missing_hashes: Vec<[u8; 32]>, + blockchain_height: usize, + }, + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block.