From f158f869b453b4403805a9186d88cc45a21195d9 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 4 Oct 2024 17:21:57 +0100 Subject: [PATCH] add `get_chain` handler --- binaries/cuprated/src/p2p/request_handler.rs | 59 ++++++++++- storage/blockchain/src/ops/block.rs | 16 ++- storage/blockchain/src/ops/blockchain.rs | 41 ++++++++ storage/blockchain/src/service/read.rs | 105 ++++++++++++++----- types/src/blockchain.rs | 22 ++++ 5 files changed, 205 insertions(+), 38 deletions(-) diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 818700d7..a109ae1b 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -10,10 +10,12 @@ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::BlockChainContextService; use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::cast::usize_to_u64; +use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}; use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN; use cuprate_p2p_core::{client::PeerInformation, NetworkZone, ProtocolRequest, ProtocolResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; +use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { @@ -82,7 +84,7 @@ async fn get_objects( anyhow::bail!("Peer requested more blocks than allowed.") } - let block_ids: Vec<[u8; 32]> = (&request.blocks).into(); + let block_hashes: Vec<[u8; 32]> = (&request.blocks).into(); // de-allocate the backing `Bytes`. drop(request); @@ -93,7 +95,7 @@ async fn get_objects( } = blockchain_read_handle .ready() .await? - .call(BlockchainReadRequest::BlockCompleteEntries(block_ids)) + .call(BlockchainReadRequest::BlockCompleteEntries(block_hashes)) .await? else { panic!("blockchain returned wrong response!"); @@ -102,6 +104,55 @@ async fn get_objects( Ok(ProtocolResponse::GetObjects(GetObjectsResponse { blocks, missed_ids: ByteArrayVec::from(missing_hashes), - current_blockchain_height, + current_blockchain_height: usize_to_u64(blockchain_height), + })) +} + +/// [`ProtocolRequest::GetChain`] +async fn get_chain( + request: ChainRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.block_ids.len() > 25_000 { + anyhow::bail!("Peer sent too many block hashes in chain request.") + } + + let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into(); + let want_pruned_data = request.prune; + // de-allocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::NextChainEntry { + start_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty, + first_block_blob, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000)) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(cumulative_difficulty); + + Ok(ProtocolResponse::GetChain(ChainResponse { + start_height: usize_to_u64(start_height), + total_height: usize_to_u64(chain_height), + cumulative_difficulty_low64, + cumulative_difficulty_top64, + m_block_ids: ByteArrayVec::from(block_ids), + first_block: Default::default(), + // only needed when + m_block_weights: if want_pruned_data { + block_weights.into_iter().map(usize_to_u64).collect() + } else { + vec![] + }, })) } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index d038d1a1..070a413c 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -1,16 +1,15 @@ //! Block functions. -use std::slice; //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use monero_serai::{ block::{Block, BlockHeader}, transaction::Transaction, }; use cuprate_database::{ - RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, + RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw}, }; use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ @@ -22,7 +21,6 @@ use cuprate_types::{ TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; -use crate::tables::TablesIter; use crate::{ ops::{ alt_block, @@ -31,7 +29,7 @@ use crate::{ output::get_rct_num_outputs, tx::{add_tx, remove_tx}, }, - tables::{BlockHeights, BlockInfos, Tables, TablesMut}, + tables::{BlockHeights, BlockInfos, Tables, TablesIter, TablesMut}, types::{BlockHash, BlockHeight, BlockInfo}, }; @@ -235,14 +233,14 @@ pub fn get_block_blob_with_tx_indexes( ) -> Result<(Vec, u64, usize), RuntimeError> { use monero_serai::io::write_varint; - let block_info = tables.block_infos().get(&block_height)?; + let block_info = tables.block_infos().get(block_height)?; let miner_tx_idx = block_info.mining_tx_index; - let mut block_txs = tables.block_txs_hashes().get(&block_height)?.0; + let block_txs = tables.block_txs_hashes().get(block_height)?.0; let numb_txs = block_txs.len(); // Get the block header - let mut block = tables.block_header_blobs().get(&block_height)?.0; + let mut block = tables.block_header_blobs().get(block_height)?.0; // Add the miner tx to the blob. let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0; @@ -273,7 +271,7 @@ pub fn get_block_complete_entry( .tx_blobs_iter() .get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))? .map(|tx_blob| Ok(Bytes::from(tx_blob?.0))) - .collect::>()?; + .collect::>()?; Ok(BlockCompleteEntry { block: Bytes::from(block_blob), diff --git a/storage/blockchain/src/ops/blockchain.rs b/storage/blockchain/src/ops/blockchain.rs index 04f8b26d..664c0d0c 100644 --- a/storage/blockchain/src/ops/blockchain.rs +++ b/storage/blockchain/src/ops/blockchain.rs @@ -3,6 +3,8 @@ //---------------------------------------------------------------------------------------------------- Import use cuprate_database::{DatabaseRo, RuntimeError}; +use crate::ops::block::block_exists; +use crate::types::BlockHash; use crate::{ ops::macros::doc_error, tables::{BlockHeights, BlockInfos}, @@ -78,6 +80,45 @@ pub fn cumulative_generated_coins( } } +/// Find the split point between our chain and a list of [`BlockHash`]s from another chain. +/// +/// This function can be used accepts chains in chronological and reverse chronological order, however +/// if the wrong order is specified the return value is meaningless. +/// +/// For chronologically ordered chains this will return the index of the first unknown, for reverse +/// chronologically ordered chains this will return the index of the fist known. +/// +/// If all blocks are known for chronologically ordered chains or unknown for reverse chronologically +/// ordered chains then the length of the chain will be returned. +#[doc = doc_error!()] +#[inline] +pub fn find_split_point( + block_ids: &[BlockHash], + chronological_order: bool, + table_block_heights: &impl DatabaseRo, +) -> Result { + let mut err = None; + + // Do a binary search to find the first unknown block in the batch. + let idx = + block_ids.partition_point( + |block_id| match block_exists(block_id, table_block_heights) { + Ok(exists) => exists & chronological_order, + Err(e) => { + err.get_or_insert(e); + // if this happens the search is scrapped, just return `false` back. + false + } + }, + ); + + if let Some(e) = err { + return Err(e); + } + + Ok(idx) +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test { diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 17151351..10bafc29 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -2,26 +2,26 @@ //---------------------------------------------------------------------------------------------------- Import use std::{ + cmp::min, collections::{HashMap, HashSet}, sync::Arc, }; use rayon::{ - iter::{IntoParallelIterator, ParallelIterator, Either}, + iter::{Either, IntoParallelIterator, ParallelIterator}, prelude::*, ThreadPool, }; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; +use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - BlockCompleteEntry, Chain, ChainId, ExtendedBlockHeader, OutputOnChain, + Chain, ChainId, ExtendedBlockHeader, OutputOnChain, }; -use crate::ops::block::get_block_complete_entry; use crate::{ ops::{ alt_block::{ @@ -29,9 +29,10 @@ use crate::{ get_alt_chain_history_ranges, }, block::{ - block_exists, get_block_extended_header_from_height, get_block_height, get_block_info, + block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry, + get_block_extended_header_from_height, get_block_height, get_block_info, }, - blockchain::{cumulative_generated_coins, top_block_height}, + blockchain::{cumulative_generated_coins, find_split_point, top_block_height}, key_image::key_image_exists, output::id_to_output_on_chain, }, @@ -39,7 +40,7 @@ use crate::{ free::{compact_history_genesis_not_included, compact_history_index_to_height_offset}, types::{BlockchainReadHandle, ResponseResult}, }, - tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables}, + tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter}, types::{ AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId, }, @@ -107,6 +108,7 @@ fn map_request( R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec), R::KeyImagesSpent(set) => key_images_spent(env, set), R::CompactChainHistory => compact_chain_history(env), + R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), } @@ -552,6 +554,76 @@ fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { }) } +/// [`BlockchainReadRequest::NextChainEntry`] +/// +/// # Invariant +/// `block_ids` must be sorted in reverse chronological block order, or else +/// the returned result is unspecified and meaningless, as this function +/// performs a binary search. +fn next_chain_entry( + env: &ConcreteEnv, + block_ids: &[BlockHash], + next_entry_size: usize, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let tables = env_inner.open_tables(&tx_ro)?; + let table_block_heights = tables.block_heights(); + let table_block_infos = tables.block_infos_iter(); + + let idx = find_split_point(block_ids, false, table_block_heights)?; + + // This will happen if we have a different genesis block. + if idx == block_ids.len() { + return Ok(BlockchainResponse::NextChainEntry { + start_height: 0, + chain_height: 0, + block_ids: vec![], + block_weights: vec![], + cumulative_difficulty: 0, + first_block_blob: None, + }); + } + + // The returned chain entry must overlap with one of the blocks we were told about. + let first_known_block_hash = block_ids[idx]; + let first_known_height = table_block_heights.get(&first_known_block_hash)?; + + let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?; + let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height); + + let (block_ids, block_weights) = table_block_infos + .get_range(first_known_height..last_height_in_chain_entry)? + .map(|block_info| { + let block_info = block_info?; + + Ok((block_info.block_hash, block_info.weight)) + }) + .collect::, Vec<_>), RuntimeError>>()?; + + let top_block_info = table_block_infos.get(&(chain_height - 1))?; + + let first_block_blob = if block_ids.len() >= 2 { + Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0) + } else { + None + }; + + Ok(BlockchainResponse::NextChainEntry { + start_height: first_known_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty: combine_low_high_bits_to_u128( + top_block_info.cumulative_difficulty_low, + top_block_info.cumulative_difficulty_high, + ), + first_block_blob, + }) +} + /// [`BlockchainReadRequest::FindFirstUnknown`] /// /// # Invariant @@ -564,24 +636,7 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; - let mut err = None; - - // Do a binary search to find the first unknown block in the batch. - let idx = - block_ids.partition_point( - |block_id| match block_exists(block_id, &table_block_heights) { - Ok(exists) => exists, - Err(e) => { - err.get_or_insert(e); - // if this happens the search is scrapped, just return `false` back. - false - } - }, - ); - - if let Some(e) = err { - return Err(e); - } + let idx = find_split_point(block_ids, true, &table_block_heights)?; Ok(if idx == block_ids.len() { BlockchainResponse::FindFirstUnknown(None) diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 11127057..526dc431 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -95,6 +95,16 @@ pub enum BlockchainReadRequest { /// A request for the compact chain history. CompactChainHistory, + /// A request for the next chain entry. + /// + /// Input is a list of block hashes and the amount of block hashes to return in the next chain entry. + /// + /// # Invariant + /// The [`Vec`] containing the block IDs must be sorted in reverse chronological block + /// order, or else the returned response is unspecified and meaningless, + /// as this request performs a binary search + NextChainEntry(Vec<[u8; 32]>, usize), + /// A request to find the first unknown block ID in a list of block IDs. /// /// # Invariant @@ -223,6 +233,18 @@ pub enum BlockchainResponse { cumulative_difficulty: u128, }, + /// Response to [`BlockchainReadRequest::NextChainEntry`]. + /// + /// If all blocks were unknown `start_height` will be `0`, the other fields will be meaningless. + NextChainEntry { + start_height: usize, + chain_height: usize, + block_ids: Vec<[u8; 32]>, + block_weights: Vec, + cumulative_difficulty: u128, + first_block_blob: Option>, + }, + /// The response for [`BlockchainReadRequest::FindFirstUnknown`]. /// /// Contains the index of the first unknown block and its expected height.