diff --git a/Cargo.lock b/Cargo.lock index 3300a978..e6b61758 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,6 +531,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.5.0", "bytemuck", + "bytes", "cuprate-database", "cuprate-database-service", "cuprate-helper", diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 125961b4..43daabfb 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -23,7 +23,7 @@ cuprate-p2p-core = { path = "../../p2p/p2p-core" } cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } cuprate-async-buffer = { path = "../../p2p/async-buffer" } cuprate-address-book = { path = "../../p2p/address-book" } -cuprate-blockchain = { path = "../../storage/blockchain" } +cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] } cuprate-database-service = { path = "../../storage/service" } cuprate-txpool = { path = "../../storage/txpool" } cuprate-database = { path = "../../storage/database" } diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index e13c9073..eaf8669f 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -1,17 +1,27 @@ mod batch_handler; +mod handler; -use crate::blockchain::manager::batch_handler::handle_incoming_block_batch; use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; +use cuprate_consensus::context::RawBlockChainContext; +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, + VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, +}; use cuprate_p2p::block_downloader::BlockBatch; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +use cuprate_types::Chain; use futures::StreamExt; use tokio::sync::mpsc::Receiver; +use tower::{Service, ServiceExt}; +use tracing::error; pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, blockchain_context_service: BlockChainContextService, + cached_blockchain_context: RawBlockChainContext, block_verifier_service: BlockVerifierService< BlockChainContextService, TxVerifierService, @@ -34,20 +44,86 @@ impl BlockchainManager { blockchain_write_handle, blockchain_read_handle, blockchain_context_service, + cached_blockchain_context: todo!(), block_verifier_service, } } + async fn handle_incoming_main_chain_batch( + &mut self, + batch: BlockBatch, + ) -> Result<(), anyhow::Error> { + let VerifyBlockResponse::MainChainBatchPrepped(prepped) = self + .block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { + blocks: batch.blocks, + }) + .await? + else { + panic!("Incorrect response!"); + }; + + for (block, txs) in prepped { + let VerifyBlockResponse::MainChain(verified_block) = block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainPrepped { block, txs }) + .await + .unwrap() + else { + panic!("Incorrect response!"); + }; + + blockchain_context_service + .ready() + .await + .expect("TODO") + .call(BlockChainContextRequest::Update(NewBlockData { + block_hash: verified_block.block_hash, + height: verified_block.height, + timestamp: verified_block.block.header.timestamp, + weight: verified_block.weight, + long_term_weight: verified_block.long_term_weight, + generated_coins: verified_block.generated_coins, + vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), + cumulative_difficulty: verified_block.cumulative_difficulty, + })) + .await + .expect("TODO"); + + blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::WriteBlock(verified_block)) + .await + .expect("TODO"); + } + } + + async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { + let (first_block, _) = batch + .blocks + .first() + .expect("Block batch should not be empty"); + + if first_block.header.previous == self.cached_blockchain_context.top_hash { + todo!("Main chain") + } else { + todo!("Alt chain") + } + } + pub async fn run(mut self, mut batch_rx: Receiver) { loop { tokio::select! { Some(batch) = batch_rx.recv() => { - handle_incoming_block_batch( + self.handle_incoming_block_batch( batch, - &mut self.block_verifier_service, - &mut self.blockchain_context_service, - &mut self.blockchain_read_handle, - &mut self.blockchain_write_handle ).await; } else => { diff --git a/binaries/cuprated/src/blockchain/manager/batch_handler.rs b/binaries/cuprated/src/blockchain/manager/batch_handler.rs index c4a3d6e5..ea08af42 100644 --- a/binaries/cuprated/src/blockchain/manager/batch_handler.rs +++ b/binaries/cuprated/src/blockchain/manager/batch_handler.rs @@ -3,6 +3,7 @@ use crate::blockchain::types::ConsensusBlockchainReadHandle; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::context::NewBlockData; +use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::{ BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError, @@ -11,82 +12,17 @@ use cuprate_consensus::{ use cuprate_p2p::block_downloader::BlockBatch; use cuprate_types::blockchain::BlockchainWriteRequest; use cuprate_types::{Chain, HardFork}; +use rayon::prelude::*; use tower::{Service, ServiceExt}; use tracing::{debug, error, info}; -pub async fn handle_incoming_block_batch( - batch: BlockBatch, - block_verifier_service: &mut BlockVerifierService, - blockchain_context_service: &mut C, - blockchain_read_handle: &mut BlockchainReadHandle, - blockchain_write_handle: &mut BlockchainWriteHandle, -) where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, -{ - let (first_block, _) = batch - .blocks - .first() - .expect("Block batch should not be empty"); - - handle_incoming_block_batch_main_chain( - batch, - block_verifier_service, - blockchain_context_service, - blockchain_write_handle, - ) - .await; - - // TODO: alt block to the DB - /* - match blockchain_read_handle - .oneshot(BlockchainReadRequest::FindBlock( - first_block.header.previous, - )) - .await - { - Err(_) | Ok(BlockchainResponse::FindBlock(None)) => { - // The block downloader shouldn't be downloading orphan blocks - error!("Failed to find parent block for first block in batch."); - return; - } - Ok(BlockchainResponse::FindBlock(Some((chain, _)))) => match chain { - Chain::Main => { - handle_incoming_block_batch_main_chain( - batch, - block_verifier_service, - blockchain_context_service, - blockchain_write_handle, - ) - .await; - } - Chain::Alt(_) => todo!(), - }, - - Ok(_) => panic!("Blockchain service returned incorrect response"), - } - - */ -} - async fn handle_incoming_block_batch_main_chain( batch: BlockBatch, block_verifier_service: &mut BlockVerifierService, blockchain_context_service: &mut C, blockchain_write_handle: &mut BlockchainWriteHandle, -) where +) -> Result<(), anyhow::Error> +where C: Service< BlockChainContextRequest, Response = BlockChainContextResponse, @@ -114,8 +50,7 @@ async fn handle_incoming_block_batch_main_chain( .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks: batch.blocks, }) - .await - .unwrap() + .await? else { panic!("Incorrect response!"); }; @@ -126,8 +61,7 @@ async fn handle_incoming_block_batch_main_chain( .await .expect("TODO") .call(VerifyBlockRequest::MainChainPrepped { block, txs }) - .await - .unwrap() + .await? else { panic!("Incorrect response!"); }; @@ -158,3 +92,30 @@ async fn handle_incoming_block_batch_main_chain( .expect("TODO"); } } + +async fn handle_incoming_block_batch_alt_chain( + batch: BlockBatch, + block_verifier_service: &mut BlockVerifierService, + blockchain_context_service: &mut C, + blockchain_write_handle: &mut BlockchainWriteHandle, +) -> Result<(), anyhow::Error> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, + + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, +{ + for (block, txs) in batch.blocks { + alt_block_info.cumulative_difficulty + } +} diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs new file mode 100644 index 00000000..221acb81 --- /dev/null +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -0,0 +1,163 @@ +use crate::blockchain::types::ConsensusBlockchainReadHandle; +use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, + ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, + VerifyTxResponse, +}; +use cuprate_p2p::block_downloader::BlockBatch; +use cuprate_types::blockchain::{ + BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest, +}; +use cuprate_types::AltBlockInformation; +use monero_serai::block::Block; +use monero_serai::transaction::Transaction; +use rayon::prelude::*; +use tower::{Service, ServiceExt}; + +async fn handle_incoming_alt_block( + block: Block, + txs: Vec, + current_cumulative_difficulty: u128, + block_verifier_service: &mut BlockVerifierService, + blockchain_context_service: &mut C, + blockchain_write_handle: &mut BlockchainWriteHandle, +) -> Result<(), anyhow::Error> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, + + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, +{ + let prepared_txs = txs + .into_par_iter() + .map(|tx| { + let tx = new_tx_verification_data(tx)?; + (tx.tx_hash, tx) + }) + .collect::>()?; + + let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::AltChain { + block, + prepared_txs, + }) + .await? + else { + panic!("Incorrect response!"); + }; + + if alt_block_info.cumulative_difficulty > current_cumulative_difficulty { + todo!("do re-org"); + } + + blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info))?; + + Ok(()) +} + +async fn try_do_reorg( + top_alt_block: AltBlockInformation, + chain_height: usize, + block_verifier_service: &mut BlockVerifierService, + blockchain_context_service: &mut C, + blockchain_write_handle: &mut BlockchainWriteHandle, + blockchain_read_handle: &mut BlockchainReadHandle, +) -> Result<(), anyhow::Error> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, + + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, +{ + let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = blockchain_read_handle + .ready() + .await + .expect("TODO") + .call(BlockchainReadRequest::AltBlocksInChain( + top_alt_block.chain_id, + )) + .await? + else { + panic!("Incorrect response!"); + }; + + alt_blocks.push(top_alt_block); + + let split_height = alt_blocks[0].height; + + let BlockchainResponse::PopBlocks(old_main_chain_id) = blockchain_write_handle + .ready() + .await + .expect("TODO") + .call(BlockchainWriteRequest::PopBlocks( + chain_height - split_height + 1, + )) + .await? + else { + panic!("Incorrect response!"); + }; + + todo!() +} + +async fn verify_add_alt_blocks_to_main_chain( + alt_blocks: Vec, + block_verifier_service: &mut BlockVerifierService, + blockchain_context_service: &mut C, + blockchain_write_handle: &mut BlockchainWriteHandle, +) -> Result<(), anyhow::Error> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + C::Future: Send + 'static, + + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, +{ + let VerifyBlockResponse::AltChain(alt_block_info) = block_verifier_service + .ready() + .await + .expect("TODO") + .call(VerifyBlockRequest::MainChainPrepped { block, txs }) + .await? + else { + panic!("Incorrect response!"); + }; +} diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 62a7056a..0e706b93 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -42,7 +42,9 @@ fn main() { .unwrap(); let net = cuprate_p2p::initialize_network( - p2p::request_handler::P2pProtocolRequestHandler, + p2p::request_handler::P2pProtocolRequestHandler { + blockchain_read_handle: bc_read_handle.clone(), + }, p2p::core_sync_svc::CoreSyncService(context_svc.clone()), config.clearnet_config(), ) diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 76554e92..0a4412fa 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1,12 +1,23 @@ +use bytes::Bytes; use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse}; use futures::future::BoxFuture; use futures::FutureExt; use std::task::{Context, Poll}; -use tower::Service; +use tower::{Service, ServiceExt}; use tracing::trace; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_helper::cast::usize_to_u64; +use cuprate_helper::map::split_u128_into_low_high_bits; +use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN}; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; +use cuprate_types::BlockCompleteEntry; +use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; + #[derive(Clone)] -pub struct P2pProtocolRequestHandler; +pub struct P2pProtocolRequestHandler { + pub(crate) blockchain_read_handle: BlockchainReadHandle, +} impl Service for P2pProtocolRequestHandler { type Response = ProtocolResponse; @@ -19,15 +30,98 @@ impl Service for P2pProtocolRequestHandler { fn call(&mut self, req: ProtocolRequest) -> Self::Future { match req { - ProtocolRequest::GetObjects(_) => trace!("TODO: GetObjects"), - ProtocolRequest::GetChain(_) => trace!("TODO: GetChain"), - ProtocolRequest::FluffyMissingTxs(_) => trace!("TODO: FluffyMissingTxs"), - ProtocolRequest::GetTxPoolCompliment(_) => trace!("TODO: GetTxPoolCompliment"), - ProtocolRequest::NewBlock(_) => trace!("TODO: NewBlock"), - ProtocolRequest::NewFluffyBlock(_) => trace!("TODO: NewFluffyBlock"), - ProtocolRequest::NewTransactions(_) => trace!("TODO: NewTransactions"), + ProtocolRequest::GetObjects(req) => { + get_objects(self.blockchain_read_handle.clone(), req).boxed() + } + ProtocolRequest::GetChain(req) => { + get_chain(self.blockchain_read_handle.clone(), req).boxed() + } + ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(), + ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(), + ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), + ProtocolRequest::NewFluffyBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), + ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(), } - - async { Ok(ProtocolResponse::NA) }.boxed() } } + +async fn get_objects( + blockchain_read_handle: BlockchainReadHandle, + req: GetObjectsRequest, +) -> Result { + if req.blocks.is_empty() { + Err("No blocks requested in a GetObjectsRequest")?; + } + + if req.blocks.len() > MAX_BLOCK_BATCH_LEN { + Err("Too many blocks requested in a GetObjectsRequest")?; + } + + let block_ids: Vec<[u8; 32]> = (&req.blocks).into(); + // de-allocate the backing [`Bytes`] + drop(req); + + let res = blockchain_read_handle + .oneshot(BlockchainReadRequest::BlockCompleteEntries(block_ids)) + .await?; + + let BlockchainResponse::BlockCompleteEntries { + blocks, + missed_ids, + current_blockchain_height, + } = res + else { + panic!("Blockchain service returned wrong response!"); + }; + + Ok(ProtocolResponse::GetObjects(GetObjectsResponse { + blocks, + missed_ids: missed_ids.into(), + current_blockchain_height: usize_to_u64(current_blockchain_height), + })) +} + +async fn get_chain( + blockchain_read_handle: BlockchainReadHandle, + req: ChainRequest, +) -> Result { + if req.block_ids.is_empty() { + Err("No block hashes sent in a `ChainRequest`")?; + } + + if req.block_ids.len() > MAX_BLOCKCHAIN_SUPPLEMENT_LEN { + Err("Too many block hashes in a `ChainRequest`")?; + } + + let block_ids: Vec<[u8; 32]> = (&req.block_ids).into(); + // de-allocate the backing [`Bytes`] + drop(req); + + let res = blockchain_read_handle + .oneshot(BlockchainReadRequest::NextMissingChainEntry(block_ids)) + .await?; + + let BlockchainResponse::NextMissingChainEntry { + next_entry, + first_missing_block, + start_height, + chain_height, + cumulative_difficulty, + } = res + else { + panic!("Blockchain service returned wrong response!"); + }; + + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(cumulative_difficulty); + + Ok(ProtocolResponse::GetChain(ChainResponse { + start_height: usize_to_u64(start_height), + total_height: usize_to_u64(chain_height), + cumulative_difficulty_low64, + cumulative_difficulty_top64, + m_block_ids: next_entry.into(), + m_block_weights: vec![], + first_block: first_missing_block.map_or(Bytes::new(), Bytes::from), + })) +} diff --git a/consensus/src/block.rs b/consensus/src/block.rs index b33b5857..727b072b 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -8,6 +8,7 @@ use std::{ }; use futures::FutureExt; +use monero_serai::generators::H; use monero_serai::{ block::Block, transaction::{Input, Transaction}, @@ -183,6 +184,19 @@ impl PreparedBlock { block: block.block, }) } + + pub fn new_alt_block(block: AltBlockInformation) -> Result { + Ok(PreparedBlock { + block_blob: block.block_blob, + hf_vote: HardFork::from_version(block.block.header.hardfork_version) + .map_err(|_| BlockError::HardForkError(HardForkError::HardForkUnknown))?, + hf_version: HardFork::from_vote(block.block.header.hardfork_signal), + block_hash: block.block_hash, + pow_hash: block.pow_hash, + miner_tx_weight: block.block.miner_transaction.weight(), + block: block.block, + }) + } } /// A request to verify a block. diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index c0903485..8c531e75 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -88,14 +88,19 @@ mod sealed { /// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits /// onto the network address. - pub trait BorshNetworkZone: NetworkZone { - type BorshAddr: NetZoneAddress + borsh::BorshDeserialize + borsh::BorshSerialize; + pub trait BorshNetworkZone: + NetworkZone< + Addr: NetZoneAddress + + borsh::BorshDeserialize + + borsh::BorshSerialize, + > + { } impl BorshNetworkZone for T where T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, + ::BanID: borsh::BorshDeserialize + borsh::BorshSerialize, { - type BorshAddr = T::Addr; } } diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index 44dba917..4e6daa73 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -46,7 +46,12 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; /// The enforced maximum amount of blocks to request in a batch. /// /// Requesting more than this will cause the peer to disconnect and potentially lead to bans. -pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; +pub const MAX_BLOCK_BATCH_LEN: usize = 100; + +/// The enforced maximum amount of block hashes in a blockchain supplement request. +/// +/// Requesting more than this might cause the peer to disconnect and potentially lead to bans. +pub const MAX_BLOCKCHAIN_SUPPLEMENT_LEN: usize = 250; /// The timeout that the block downloader will use for requests. pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs index 6f95948f..5355df26 100644 --- a/p2p/p2p/src/lib.rs +++ b/p2p/p2p/src/lib.rs @@ -26,7 +26,7 @@ mod broadcast; mod client_pool; pub mod config; pub mod connection_maintainer; -mod constants; +pub mod constants; mod inbound_server; mod sync_states; diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 46b8414d..594521ac 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -38,6 +38,7 @@ serde = { workspace = true, optional = true } tower = { workspace = true } thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } +bytes = "1.6.0" [dev-dependencies] cuprate-helper = { path = "../../helper", features = ["thread", "cast"] } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index 8e046520..01023495 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -2,20 +2,23 @@ //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; -use monero_serai::block::{Block, BlockHeader}; +use bytes::Bytes; use cuprate_database::{ - RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, + RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw}, }; +use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, tx_utils::tx_fee, }; use cuprate_types::{ - AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation, - VerifiedTransactionInformation, + AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork, + TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; +use monero_serai::block::{Block, BlockHeader}; +use crate::tables::TablesIter; use crate::{ ops::{ alt_block, @@ -256,6 +259,40 @@ pub fn get_block_extended_header_top( Ok((header, height)) } +//---------------------------------------------------------------------------------------------------- `get_block_complete_entry` + +pub fn get_block_complete_entry( + block_hash: &BlockHash, + tables: &impl TablesIter, +) -> Result { + let height = tables.block_heights().get(block_hash)?; + + let block_blob = tables.block_blobs().get(&height)?.0; + + let block = Block::read(&mut block_blob.as_slice()).expect("Valid block failed to be read"); + + let txs = if let Some(first_tx) = block.transactions.first() { + let first_tx_idx = tables.tx_ids().get(first_tx)?; + let end_tx_idx = first_tx_idx + usize_to_u64(block.transactions.len()); + + let tx_blobs = tables.tx_blobs_iter().get_range(first_tx_idx..end_tx_idx)?; + + tx_blobs + .map(|res| Ok(Bytes::from(res?.0))) + .collect::>()? + } else { + vec![] + }; + + Ok(BlockCompleteEntry { + block: Bytes::from(block_blob), + txs: TransactionBlobs::Normal(txs), + pruned: false, + // This is only needed when pruned. + block_weight: 0, + }) +} + //---------------------------------------------------------------------------------------------------- Misc /// Retrieve a [`BlockInfo`] via its [`BlockHeight`]. #[doc = doc_error!()] diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 73b2d220..07fa8253 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -1,19 +1,20 @@ //! Database reader thread-pool definitions and logic. //---------------------------------------------------------------------------------------------------- Import +use rayon::{ + iter::{Either, IntoParallelIterator, ParallelIterator}, + prelude::*, + ThreadPool, +}; +use std::cmp::min; +use std::ops::Index; use std::{ collections::{HashMap, HashSet}, sync::Arc, }; - -use rayon::{ - iter::{IntoParallelIterator, ParallelIterator}, - prelude::*, - ThreadPool, -}; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; +use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ @@ -21,6 +22,8 @@ use cuprate_types::{ Chain, ChainId, ExtendedBlockHeader, OutputOnChain, }; +use crate::ops::block::get_block_complete_entry; +use crate::tables::{BlockBlobs, TxIds}; use crate::{ ops::{ alt_block::{ @@ -92,6 +95,7 @@ fn map_request( /* SOMEDAY: pre-request handling, run some code for each request? */ match request { + R::BlockCompleteEntries(blocks) => block_complete_entries(env, blocks), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::FindBlock(block_hash) => find_block(env, block_hash), @@ -106,6 +110,7 @@ fn map_request( R::KeyImagesSpent(set) => key_images_spent(env, set), R::CompactChainHistory => compact_chain_history(env), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), + R::NextMissingChainEntry(block_hashes) => next_missing_chain_entry(env, block_hashes), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), } @@ -180,9 +185,41 @@ macro_rules! get_tables { // FIXME: implement multi-transaction read atomicity. // . -// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal +// TODO: The overhead of parallelism may be too much for every request, performance test to find optimal // amount of parallelism. +/// [`BlockchainReadRequest::BlockCompleteEntries`]. +fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + let (missed_ids, blocks) = block_hashes + .into_par_iter() + .map(|block_hash| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + match get_block_complete_entry(&block_hash, tables) { + Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)), + res => res.map(Either::Right), + } + }) + .collect::>()?; + + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + let chain_height = crate::ops::blockchain::chain_height(tables.block_heights())?; + + Ok(BlockchainResponse::BlockCompleteEntries { + blocks, + missed_ids, + current_blockchain_height: chain_height, + }) +} + /// [`BlockchainReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { @@ -556,6 +593,62 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes }) } +/// [`BlockchainReadRequest::NextMissingChainEntry`] +fn next_missing_chain_entry(env: &ConcreteEnv, block_hashes: Vec<[u8; 32]>) -> ResponseResult { + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; + let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; + + let (top_block_height, top_block_info) = table_block_infos.last()?; + + let mut start_height = 0; + + for block_hash in block_hashes { + match table_block_heights.get(&block_hash) { + Ok(height) => { + start_height = height; + break; + } + Err(RuntimeError::KeyNotFound) => continue, + Err(e) => return Err(e), + } + } + + let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; + + const DEFAULT_CHAIN_ENTRY_SIZE: usize = 10_000; + + let end_height = min( + start_height + DEFAULT_CHAIN_ENTRY_SIZE, + top_block_height + 1, + ); + + let block_hashes: Vec<_> = table_block_infos + .get_range(start_height..end_height)? + .map(|block_info| Ok(block_info?.block_hash)) + .collect::>()?; + + let first_missing_block = if block_hashes.len() > 1 { + let table_block_blobs = env_inner.open_db_ro::(&tx_ro)?; + Some(table_block_blobs.get(&(start_height + 1))?.0) + } else { + None + }; + + Ok(BlockchainResponse::NextMissingChainEntry { + next_entry: block_hashes, + first_missing_block, + start_height, + chain_height: top_block_height + 1, + cumulative_difficulty: combine_low_high_bits_to_u128( + top_block_info.cumulative_difficulty_low, + top_block_info.cumulative_difficulty_high, + ), + }) +} + /// [`BlockchainReadRequest::AltBlocksInChain`] fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index f246e59a..1595ec60 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -11,7 +11,7 @@ use std::{ use crate::{ types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, - AltBlockInformation, ChainId, + AltBlockInformation, BlockCompleteEntry, ChainId, }; //---------------------------------------------------------------------------------------------------- ReadRequest @@ -25,6 +25,11 @@ use crate::{ /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainReadRequest { + /// Request for [`BlockCompleteEntry`]s. + /// + /// The input is the hashes of the blocks wanted. + BlockCompleteEntries(Vec<[u8; 32]>), + /// Request a block's extended header. /// /// The input is the block's height. @@ -101,6 +106,13 @@ pub enum BlockchainReadRequest { /// order, or else the returned response is unspecified and meaningless, /// as this request performs a binary search. FindFirstUnknown(Vec<[u8; 32]>), + + /// A request for the next missing chain entry. + /// + /// The input is a list of block hashes in reverse chronological order that do not necessarily + /// directly follow each other. + NextMissingChainEntry(Vec<[u8; 32]>), + /// A request for all alt blocks in the chain with the given [`ChainId`]. AltBlocksInChain(ChainId), } @@ -145,6 +157,16 @@ pub enum BlockchainWriteRequest { #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainResponse { //------------------------------------------------------ Reads + /// Response to [`BlockchainReadRequest::BlockCompleteEntries`] + BlockCompleteEntries { + /// The blocks requested that we had. + blocks: Vec, + /// The hashes of the blocks we did not have. + missed_ids: Vec<[u8; 32]>, + /// The current height of our blockchain. + current_blockchain_height: usize, + }, + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. @@ -218,6 +240,24 @@ pub enum BlockchainResponse { /// This will be [`None`] if all blocks were known. FindFirstUnknown(Option<(usize, usize)>), + /// The response for [`BlockchainReadRequest::NextMissingChainEntry`] + NextMissingChainEntry { + /// A list of block hashes that should be next from the requested chain. + /// + /// The first block hash will overlap with one of the blocks in the request. + next_entry: Vec<[u8; 32]>, + /// The block blob of the second block in `next_entry`. + /// + /// If there is only 1 block in `next_entry` then this will be [`None`]. + first_missing_block: Option>, + /// The height of the first block in `next_entry`. + start_height: usize, + /// The current height of our chain. + chain_height: usize, + /// The cumulative difficulty of our chain. + cumulative_difficulty: u128, + }, + /// The response for [`BlockchainReadRequest::AltBlocksInChain`]. /// /// Contains all the alt blocks in the alt-chain in chronological order.