diff --git a/Cargo.lock b/Cargo.lock index b7eb7a4..4ccf365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -580,6 +580,7 @@ version = "0.0.0" dependencies = [ "bitflags 2.6.0", "bytemuck", + "bytes", "cuprate-constants", "cuprate-database", "cuprate-database-service", diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index cdf1cef..aeb98b6 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -1,8 +1,57 @@ //! P2P //! //! Will handle initiating the P2P and contains a protocol request handler. +use futures::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; +use tower::ServiceExt; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::BlockChainContextService; +use cuprate_p2p::{NetworkInterface, P2PConfig}; +use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::TxpoolReadHandle; + +use crate::txpool::IncomingTxHandler; + +mod core_sync_service; mod network_address; pub mod request_handler; pub use network_address::CrossNetworkInternalPeerId; + +/// Starts the P2P clearnet network, returning a [`NetworkInterface`] to interact with it. +/// +/// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network +/// handshakes can not be completed. +pub async fn start_clearnet_p2p( + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + txpool_read_handle: TxpoolReadHandle, + config: P2PConfig, +) -> Result< + ( + NetworkInterface, + oneshot::Sender, + ), + tower::BoxError, +> { + let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel(); + + let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker { + blockchain_read_handle, + blockchain_context_service: blockchain_context_service.clone(), + txpool_read_handle, + incoming_tx_handler: None, + incoming_tx_handler_fut: incoming_tx_handler_rx.shared(), + }; + + Ok(( + cuprate_p2p::initialize_network( + request_handler_maker.map_response(|s| s.map_err(Into::into)), + core_sync_service::CoreSyncService(blockchain_context_service), + config, + ) + .await?, + incoming_tx_handler_tx, + )) +} diff --git a/binaries/cuprated/src/p2p/core_sync_service.rs b/binaries/cuprated/src/p2p/core_sync_service.rs new file mode 100644 index 0000000..d3c3ca1 --- /dev/null +++ b/binaries/cuprated/src/p2p/core_sync_service.rs @@ -0,0 +1,49 @@ +use std::task::{Context, Poll}; + +use futures::{future::BoxFuture, FutureExt, TryFutureExt}; +use tower::Service; + +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, +}; +use cuprate_helper::{cast::usize_to_u64, map::split_u128_into_low_high_bits}; +use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse}; +use cuprate_wire::CoreSyncData; + +/// The core sync service. +#[derive(Clone)] +pub struct CoreSyncService(pub BlockChainContextService); + +impl Service for CoreSyncService { + type Response = CoreSyncDataResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { + self.0 + .call(BlockChainContextRequest::Context) + .map_ok(|res| { + let BlockChainContextResponse::Context(context) = res else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + let (cumulative_difficulty, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(context.cumulative_difficulty); + + CoreSyncDataResponse(CoreSyncData { + cumulative_difficulty, + cumulative_difficulty_top64, + current_height: usize_to_u64(context.chain_height), + pruning_seed: 0, + top_id: context.top_hash, + top_version: context.current_hf.as_u8(), + }) + }) + .boxed() + } +} diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 8b13789..cb0757c 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -1 +1,432 @@ +use std::{ + collections::HashSet, + future::{ready, Ready}, + hash::Hash, + task::{Context, Poll}, +}; +use bytes::Bytes; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, +}; +use monero_serai::{block::Block, transaction::Transaction}; +use tokio::sync::{broadcast, oneshot, watch}; +use tokio_stream::wrappers::WatchStream; +use tower::{Service, ServiceExt}; + +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::{ + transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, +}; +use cuprate_dandelion_tower::TxState; +use cuprate_fixed_bytes::ByteArrayVec; +use cuprate_helper::cast::u64_to_usize; +use cuprate_helper::{ + asynch::rayon_spawn_async, + cast::usize_to_u64, + map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, +}; +use cuprate_p2p::constants::{ + MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN, +}; +use cuprate_p2p_core::{ + client::{InternalPeerID, PeerInformation}, + NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse, +}; +use cuprate_txpool::service::TxpoolReadHandle; +use cuprate_types::{ + blockchain::{BlockchainReadRequest, BlockchainResponse}, + BlockCompleteEntry, MissingTxsInBlock, TransactionBlobs, +}; +use cuprate_wire::protocol::{ + ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, + GetObjectsResponse, NewFluffyBlock, NewTransactions, +}; + +use crate::{ + blockchain::interface::{self as blockchain_interface, IncomingBlockError}, + constants::PANIC_CRITICAL_SERVICE_ERROR, + p2p::CrossNetworkInternalPeerId, + txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs}, +}; + +/// The P2P protocol request handler [`MakeService`](tower::MakeService). +#[derive(Clone)] +pub struct P2pProtocolRequestHandlerMaker { + pub blockchain_read_handle: BlockchainReadHandle, + + pub blockchain_context_service: BlockChainContextService, + + pub txpool_read_handle: TxpoolReadHandle, + + /// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`] + /// and the [`IncomingTxHandler`]. + pub incoming_tx_handler: Option, + + /// A [`Future`](std::future::Future) that produces the [`IncomingTxHandler`]. + pub incoming_tx_handler_fut: Shared>, +} + +impl Service> for P2pProtocolRequestHandlerMaker +where + InternalPeerID: Into, +{ + type Response = P2pProtocolRequestHandler; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.incoming_tx_handler.is_none() { + return self + .incoming_tx_handler_fut + .poll_unpin(cx) + .map(|incoming_tx_handler| { + self.incoming_tx_handler = Some(incoming_tx_handler?); + Ok(()) + }); + } + + Poll::Ready(Ok(())) + } + + fn call(&mut self, peer_information: PeerInformation) -> Self::Future { + let Some(incoming_tx_handler) = self.incoming_tx_handler.clone() else { + panic!("poll_ready was not called or did not return `Poll::Ready`") + }; + + // TODO: check sync info? + + let blockchain_read_handle = self.blockchain_read_handle.clone(); + let txpool_read_handle = self.txpool_read_handle.clone(); + + ready(Ok(P2pProtocolRequestHandler { + peer_information, + blockchain_read_handle, + blockchain_context_service: self.blockchain_context_service.clone(), + txpool_read_handle, + incoming_tx_handler, + })) + } +} + +/// The P2P protocol request handler. +#[derive(Clone)] +pub struct P2pProtocolRequestHandler { + peer_information: PeerInformation, + + blockchain_read_handle: BlockchainReadHandle, + + blockchain_context_service: BlockChainContextService, + + txpool_read_handle: TxpoolReadHandle, + + incoming_tx_handler: IncomingTxHandler, +} + +impl Service for P2pProtocolRequestHandler +where + InternalPeerID: Into, +{ + type Response = ProtocolResponse; + type Error = anyhow::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: ProtocolRequest) -> Self::Future { + match request { + ProtocolRequest::GetObjects(r) => { + get_objects(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::GetChain(r) => { + get_chain(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::FluffyMissingTxs(r) => { + fluffy_missing_txs(r, self.blockchain_read_handle.clone()).boxed() + } + ProtocolRequest::NewBlock(_) => ready(Err(anyhow::anyhow!( + "Peer sent a full block when we support fluffy blocks" + ))) + .boxed(), + ProtocolRequest::NewFluffyBlock(r) => new_fluffy_block( + self.peer_information.clone(), + r, + self.blockchain_read_handle.clone(), + self.txpool_read_handle.clone(), + ) + .boxed(), + ProtocolRequest::NewTransactions(r) => new_transactions( + self.peer_information.clone(), + r, + self.blockchain_context_service.clone(), + self.incoming_tx_handler.clone(), + ) + .boxed(), + ProtocolRequest::GetTxPoolCompliment(_) => ready(Ok(ProtocolResponse::NA)).boxed(), // TODO: should we support this? + } + } +} + +//---------------------------------------------------------------------------------------------------- Handler functions + +/// [`ProtocolRequest::GetObjects`] +async fn get_objects( + request: GetObjectsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.blocks.len() > MAX_BLOCK_BATCH_LEN { + anyhow::bail!("Peer requested more blocks than allowed.") + } + + let block_hashes: Vec<[u8; 32]> = (&request.blocks).into(); + // deallocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::BlockCompleteEntries(block_hashes)) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + Ok(ProtocolResponse::GetObjects(GetObjectsResponse { + blocks, + missed_ids: ByteArrayVec::from(missing_hashes), + current_blockchain_height: usize_to_u64(blockchain_height), + })) +} + +/// [`ProtocolRequest::GetChain`] +async fn get_chain( + request: ChainRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + if request.block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY { + anyhow::bail!("Peer sent too many block hashes in chain request.") + } + + let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into(); + let want_pruned_data = request.prune; + // deallocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::NextChainEntry { + start_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty, + first_block_blob, + } = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::NextChainEntry(block_hashes, 10_000)) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + if start_height == 0 { + anyhow::bail!("The peers chain has a different genesis block than ours."); + } + + let (cumulative_difficulty_low64, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(cumulative_difficulty); + + Ok(ProtocolResponse::GetChain(ChainResponse { + start_height: usize_to_u64(start_height), + total_height: usize_to_u64(chain_height), + cumulative_difficulty_low64, + cumulative_difficulty_top64, + m_block_ids: ByteArrayVec::from(block_ids), + first_block: first_block_blob.map_or(Bytes::new(), Bytes::from), + // only needed when pruned + m_block_weights: if want_pruned_data { + block_weights.into_iter().map(usize_to_u64).collect() + } else { + vec![] + }, + })) +} + +/// [`ProtocolRequest::FluffyMissingTxs`] +async fn fluffy_missing_txs( + mut request: FluffyMissingTransactionsRequest, + mut blockchain_read_handle: BlockchainReadHandle, +) -> anyhow::Result { + let tx_indexes = std::mem::take(&mut request.missing_tx_indices); + let block_hash: [u8; 32] = *request.block_hash; + let current_blockchain_height = request.current_blockchain_height; + + // deallocate the backing `Bytes`. + drop(request); + + let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle + .ready() + .await? + .call(BlockchainReadRequest::MissingTxsInBlock { + block_hash, + tx_indexes, + }) + .await? + else { + panic!("blockchain returned wrong response!"); + }; + + let Some(MissingTxsInBlock { block, txs }) = res else { + anyhow::bail!("The peer requested txs out of range."); + }; + + Ok(ProtocolResponse::NewFluffyBlock(NewFluffyBlock { + b: BlockCompleteEntry { + block: Bytes::from(block), + txs: TransactionBlobs::Normal(txs.into_iter().map(Bytes::from).collect()), + pruned: false, + // only needed for pruned blocks. + block_weight: 0, + }, + current_blockchain_height, + })) +} + +/// [`ProtocolRequest::NewFluffyBlock`] +async fn new_fluffy_block( + peer_information: PeerInformation, + request: NewFluffyBlock, + mut blockchain_read_handle: BlockchainReadHandle, + mut txpool_read_handle: TxpoolReadHandle, +) -> anyhow::Result { + // TODO: check context service here and ignore the block? + let current_blockchain_height = request.current_blockchain_height; + + peer_information + .core_sync_data + .lock() + .unwrap() + .current_height = current_blockchain_height; + + let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> { + let block = Block::read(&mut request.b.block.as_ref())?; + + let tx_blobs = request + .b + .txs + .take_normal() + .ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?; + + let txs = tx_blobs + .into_iter() + .map(|tx_blob| { + if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE { + anyhow::bail!("Peer sent a transaction over the size limit."); + } + + let tx = Transaction::read(&mut tx_blob.as_ref())?; + + Ok((tx.hash(), tx)) + }) + .collect::>()?; + + // The backing `Bytes` will be deallocated when this closure returns. + + Ok((block, txs)) + }) + .await?; + + let res = blockchain_interface::handle_incoming_block( + block, + txs, + &mut blockchain_read_handle, + &mut txpool_read_handle, + ) + .await; + + match res { + Ok(_) => Ok(ProtocolResponse::NA), + Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok( + ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest { + block_hash: block_hash.into(), + current_blockchain_height, + missing_tx_indices: missing_tx_indices.into_iter().map(usize_to_u64).collect(), + }), + ), + Err(IncomingBlockError::Orphan) => { + // Block's parent was unknown, could be syncing? + Ok(ProtocolResponse::NA) + } + Err(e) => Err(e.into()), + } +} + +/// [`ProtocolRequest::NewTransactions`] +async fn new_transactions( + peer_information: PeerInformation, + request: NewTransactions, + mut blockchain_context_service: BlockChainContextService, + mut incoming_tx_handler: IncomingTxHandler, +) -> anyhow::Result +where + A: NetZoneAddress, + InternalPeerID: Into, +{ + let BlockChainContextResponse::Context(context) = blockchain_context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Context) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + // If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing. + if usize_to_u64(context.chain_height + 2) + < peer_information + .core_sync_data + .lock() + .unwrap() + .current_height + { + return Ok(ProtocolResponse::NA); + } + + let state = if request.dandelionpp_fluff { + TxState::Fluff + } else { + TxState::Stem { + from: peer_information.id.into(), + } + }; + + // Drop all the data except the stuff we still need. + let NewTransactions { + txs, + dandelionpp_fluff: _, + padding: _, + } = request; + + let res = incoming_tx_handler + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(IncomingTxs { txs, state }) + .await; + + match res { + Ok(()) => Ok(ProtocolResponse::NA), + Err(e) => Err(e.into()), + } +} diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 9592c2b..2076956 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -12,4 +12,4 @@ mod dandelion; mod incoming_tx; mod txs_being_handled; -pub use incoming_tx::IncomingTxHandler; +pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index e204159..bf7684e 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -43,9 +43,13 @@ use crate::{ }; /// An error that can happen handling an incoming tx. +#[derive(Debug, thiserror::Error)] pub enum IncomingTxError { + #[error("Error parsing tx: {0}")] Parse(std::io::Error), + #[error(transparent)] Consensus(ExtendedConsensusError), + #[error("Duplicate tx in message")] DuplicateTransaction, } @@ -67,6 +71,7 @@ pub(super) type TxId = [u8; 32]; /// The service than handles incoming transaction pool transactions. /// /// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. +#[derive(Clone)] pub struct IncomingTxHandler { /// A store of txs currently being handled in incoming tx requests. pub(super) txs_being_handled: TxsBeingHandled, diff --git a/net/wire/src/p2p/protocol.rs b/net/wire/src/p2p/protocol.rs index 1d1d45a..cc4b49d 100644 --- a/net/wire/src/p2p/protocol.rs +++ b/net/wire/src/p2p/protocol.rs @@ -159,7 +159,7 @@ epee_object!( current_blockchain_height: u64, ); -/// A request for Txs we are missing from our `TxPool` +/// A request for txs we are missing from an incoming block. #[derive(Debug, Clone, PartialEq, Eq)] pub struct FluffyMissingTransactionsRequest { /// The Block we are missing the Txs in diff --git a/p2p/p2p-core/src/protocol.rs b/p2p/p2p-core/src/protocol.rs index 7d8d431..82aac82 100644 --- a/p2p/p2p-core/src/protocol.rs +++ b/p2p/p2p-core/src/protocol.rs @@ -116,6 +116,7 @@ pub enum ProtocolResponse { GetChain(ChainResponse), NewFluffyBlock(NewFluffyBlock), NewTransactions(NewTransactions), + FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest), NA, } @@ -139,6 +140,9 @@ impl PeerResponse { ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, + ProtocolResponse::FluffyMissingTransactionsRequest(_) => { + MessageID::FluffyMissingTxs + } ProtocolResponse::NA => return None, }, diff --git a/p2p/p2p-core/src/protocol/try_from.rs b/p2p/p2p-core/src/protocol/try_from.rs index d3a7260..2dfc41d 100644 --- a/p2p/p2p-core/src/protocol/try_from.rs +++ b/p2p/p2p-core/src/protocol/try_from.rs @@ -71,6 +71,9 @@ impl TryFrom for ProtocolMessage { ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val), ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val), ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val), + ProtocolResponse::FluffyMissingTransactionsRequest(val) => { + Self::FluffyMissingTransactionsRequest(val) + } ProtocolResponse::NA => return Err(MessageConversionError), }) } diff --git a/p2p/p2p/src/constants.rs b/p2p/p2p/src/constants.rs index f70d64c..a81557c 100644 --- a/p2p/p2p/src/constants.rs +++ b/p2p/p2p/src/constants.rs @@ -52,7 +52,7 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; /// The enforced maximum amount of blocks to request in a batch. /// /// Requesting more than this will cause the peer to disconnect and potentially lead to bans. -pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100; +pub const MAX_BLOCK_BATCH_LEN: usize = 100; /// The timeout that the block downloader will use for requests. pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); @@ -61,13 +61,13 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec /// be less than. /// /// ref: -pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; +pub const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; /// The maximum amount of block IDs allowed in a chain entry response. /// /// ref: // TODO: link to the protocol book when this section is added. -pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000; +pub const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000; /// The amount of failures downloading a specific batch before we stop attempting to download it. pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 5; diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 92b4374..4625054 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -34,6 +34,7 @@ serde = { workspace = true, optional = true } tower = { workspace = true } thread_local = { workspace = true } rayon = { workspace = true } +bytes = "1.7.2" [dev-dependencies] cuprate-constants = { workspace = true } diff --git a/storage/blockchain/src/ops/block.rs b/storage/blockchain/src/ops/block.rs index 6d32fd8..94c1fc6 100644 --- a/storage/blockchain/src/ops/block.rs +++ b/storage/blockchain/src/ops/block.rs @@ -2,21 +2,23 @@ //---------------------------------------------------------------------------------------------------- Import use bytemuck::TransparentWrapper; +use bytes::Bytes; use monero_serai::{ block::{Block, BlockHeader}, transaction::Transaction, }; use cuprate_database::{ - RuntimeError, StorableVec, {DatabaseRo, DatabaseRw}, + RuntimeError, StorableVec, {DatabaseIter, DatabaseRo, DatabaseRw}, }; +use cuprate_helper::cast::usize_to_u64; use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, tx::tx_fee, }; use cuprate_types::{ - AltBlockInformation, ChainId, ExtendedBlockHeader, HardFork, VerifiedBlockInformation, - VerifiedTransactionInformation, + AltBlockInformation, BlockCompleteEntry, ChainId, ExtendedBlockHeader, HardFork, + TransactionBlobs, VerifiedBlockInformation, VerifiedTransactionInformation, }; use crate::{ @@ -27,7 +29,7 @@ use crate::{ output::get_rct_num_outputs, tx::{add_tx, remove_tx}, }, - tables::{BlockHeights, BlockInfos, Tables, TablesMut}, + tables::{BlockHeights, BlockInfos, Tables, TablesIter, TablesMut}, types::{BlockHash, BlockHeight, BlockInfo}, }; @@ -224,6 +226,66 @@ pub fn pop_block( Ok((block_height, block_info.block_hash, block)) } +//---------------------------------------------------------------------------------------------------- `get_block_blob_with_tx_indexes` +/// Retrieve a block's raw bytes, the index of the miner transaction and the number of non miner-txs in the block. +/// +#[doc = doc_error!()] +pub fn get_block_blob_with_tx_indexes( + block_height: &BlockHeight, + tables: &impl Tables, +) -> Result<(Vec, u64, usize), RuntimeError> { + use monero_serai::io::write_varint; + + let block_info = tables.block_infos().get(block_height)?; + + let miner_tx_idx = block_info.mining_tx_index; + let block_txs = tables.block_txs_hashes().get(block_height)?.0; + let numb_txs = block_txs.len(); + + // Get the block header + let mut block = tables.block_header_blobs().get(block_height)?.0; + + // Add the miner tx to the blob. + let mut miner_tx_blob = tables.tx_blobs().get(&miner_tx_idx)?.0; + block.append(&mut miner_tx_blob); + + // Add the blocks tx hashes. + write_varint(&block_txs.len(), &mut block) + .expect("The number of txs per block will not exceed u64::MAX"); + + let block_txs_bytes = bytemuck::cast_slice(&block_txs); + block.extend_from_slice(block_txs_bytes); + + Ok((block, miner_tx_idx, numb_txs)) +} + +//---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` +/// Retrieve a [`BlockCompleteEntry`] from the database. +/// +#[doc = doc_error!()] +pub fn get_block_complete_entry( + block_hash: &BlockHash, + tables: &impl TablesIter, +) -> Result { + let block_height = tables.block_heights().get(block_hash)?; + let (block_blob, miner_tx_idx, numb_non_miner_txs) = + get_block_blob_with_tx_indexes(&block_height, tables)?; + + let first_tx_idx = miner_tx_idx + 1; + + let tx_blobs = tables + .tx_blobs_iter() + .get_range(first_tx_idx..=usize_to_u64(numb_non_miner_txs))? + .map(|tx_blob| Ok(Bytes::from(tx_blob?.0))) + .collect::>()?; + + Ok(BlockCompleteEntry { + block: Bytes::from(block_blob), + txs: TransactionBlobs::Normal(tx_blobs), + pruned: false, + block_weight: 0, + }) +} //---------------------------------------------------------------------------------------------------- `get_block_extended_header_*` /// Retrieve a [`ExtendedBlockHeader`] from the database. diff --git a/storage/blockchain/src/ops/blockchain.rs b/storage/blockchain/src/ops/blockchain.rs index 04f8b26..c6cd404 100644 --- a/storage/blockchain/src/ops/blockchain.rs +++ b/storage/blockchain/src/ops/blockchain.rs @@ -4,9 +4,9 @@ use cuprate_database::{DatabaseRo, RuntimeError}; use crate::{ - ops::macros::doc_error, + ops::{block::block_exists, macros::doc_error}, tables::{BlockHeights, BlockInfos}, - types::BlockHeight, + types::{BlockHash, BlockHeight}, }; //---------------------------------------------------------------------------------------------------- Free Functions @@ -78,6 +78,45 @@ pub fn cumulative_generated_coins( } } +/// Find the split point between our chain and a list of [`BlockHash`]s from another chain. +/// +/// This function accepts chains in chronological and reverse chronological order, however +/// if the wrong order is specified the return value is meaningless. +/// +/// For chronologically ordered chains this will return the index of the first unknown, for reverse +/// chronologically ordered chains this will return the index of the fist known. +/// +/// If all blocks are known for chronologically ordered chains or unknown for reverse chronologically +/// ordered chains then the length of the chain will be returned. +#[doc = doc_error!()] +#[inline] +pub fn find_split_point( + block_ids: &[BlockHash], + chronological_order: bool, + table_block_heights: &impl DatabaseRo, +) -> Result { + let mut err = None; + + // Do a binary search to find the first unknown/known block in the batch. + let idx = + block_ids.partition_point( + |block_id| match block_exists(block_id, table_block_heights) { + Ok(exists) => exists & chronological_order, + Err(e) => { + err.get_or_insert(e); + // if this happens the search is scrapped, just return `false` back. + false + } + }, + ); + + if let Some(e) = err { + return Err(e); + } + + Ok(idx) +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test { diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index e3c0180..01fc92d 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -10,23 +10,24 @@ //---------------------------------------------------------------------------------------------------- Import use std::{ + cmp::min, collections::{HashMap, HashSet}, sync::Arc, }; use rayon::{ - iter::{IntoParallelIterator, ParallelIterator}, + iter::{Either, IntoParallelIterator, ParallelIterator}, prelude::*, ThreadPool, }; use thread_local::ThreadLocal; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; +use cuprate_database::{ConcreteEnv, DatabaseIter, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; use cuprate_helper::map::combine_low_high_bits_to_u128; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, ChainId, ExtendedBlockHeader, OutputHistogramInput, OutputOnChain, + Chain, ChainId, ExtendedBlockHeader, MissingTxsInBlock, OutputHistogramInput, OutputOnChain, }; use crate::{ @@ -36,9 +37,10 @@ use crate::{ get_alt_chain_history_ranges, }, block::{ - block_exists, get_block_extended_header_from_height, get_block_height, get_block_info, + block_exists, get_block_blob_with_tx_indexes, get_block_complete_entry, + get_block_extended_header_from_height, get_block_height, get_block_info, }, - blockchain::{cumulative_generated_coins, top_block_height}, + blockchain::{cumulative_generated_coins, find_split_point, top_block_height}, key_image::key_image_exists, output::id_to_output_on_chain, }, @@ -46,7 +48,7 @@ use crate::{ free::{compact_history_genesis_not_included, compact_history_index_to_height_offset}, types::{BlockchainReadHandle, ResponseResult}, }, - tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables}, + tables::{AltBlockHeights, BlockHeights, BlockInfos, OpenTables, Tables, TablesIter}, types::{ AltBlockHeight, Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId, }, @@ -100,6 +102,7 @@ fn map_request( /* SOMEDAY: pre-request handling, run some code for each request? */ match request { + R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::FindBlock(block_hash) => find_block(env, block_hash), @@ -113,7 +116,12 @@ fn map_request( R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec), R::KeyImagesSpent(set) => key_images_spent(env, set), R::CompactChainHistory => compact_chain_history(env), + R::NextChainEntry(block_hashes, amount) => next_chain_entry(env, &block_hashes, amount), R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), + R::MissingTxsInBlock { + block_hash, + tx_indexes, + } => missing_txs_in_block(env, block_hash, tx_indexes), R::AltBlocksInChain(chain_id) => alt_blocks_in_chain(env, chain_id), R::Block { height } => block(env, height), R::BlockByHash(hash) => block_by_hash(env, hash), @@ -198,6 +206,38 @@ macro_rules! get_tables { // TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal // amount of parallelism. +/// [`BlockchainReadRequest::BlockCompleteEntries`]. +fn block_complete_entries(env: &ConcreteEnv, block_hashes: Vec) -> ResponseResult { + // Prepare tx/tables in `ThreadLocal`. + let env_inner = env.env_inner(); + let tx_ro = thread_local(env); + let tables = thread_local(env); + + let (missing_hashes, blocks) = block_hashes + .into_par_iter() + .map(|block_hash| { + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + match get_block_complete_entry(&block_hash, tables) { + Err(RuntimeError::KeyNotFound) => Ok(Either::Left(block_hash)), + res => res.map(Either::Right), + } + }) + .collect::>()?; + + let tx_ro = tx_ro.get_or_try(|| env_inner.tx_ro())?; + let tables = get_tables!(env_inner, tx_ro, tables)?.as_ref(); + + let blockchain_height = crate::ops::blockchain::chain_height(tables.block_heights())?; + + Ok(BlockchainResponse::BlockCompleteEntries { + blocks, + missing_hashes, + blockchain_height, + }) +} + /// [`BlockchainReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { @@ -534,6 +574,76 @@ fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { }) } +/// [`BlockchainReadRequest::NextChainEntry`] +/// +/// # Invariant +/// `block_ids` must be sorted in reverse chronological block order, or else +/// the returned result is unspecified and meaningless, as this function +/// performs a binary search. +fn next_chain_entry( + env: &ConcreteEnv, + block_ids: &[BlockHash], + next_entry_size: usize, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let tables = env_inner.open_tables(&tx_ro)?; + let table_block_heights = tables.block_heights(); + let table_block_infos = tables.block_infos_iter(); + + let idx = find_split_point(block_ids, false, table_block_heights)?; + + // This will happen if we have a different genesis block. + if idx == block_ids.len() { + return Ok(BlockchainResponse::NextChainEntry { + start_height: 0, + chain_height: 0, + block_ids: vec![], + block_weights: vec![], + cumulative_difficulty: 0, + first_block_blob: None, + }); + } + + // The returned chain entry must overlap with one of the blocks we were told about. + let first_known_block_hash = block_ids[idx]; + let first_known_height = table_block_heights.get(&first_known_block_hash)?; + + let chain_height = crate::ops::blockchain::chain_height(table_block_heights)?; + let last_height_in_chain_entry = min(first_known_height + next_entry_size, chain_height); + + let (block_ids, block_weights) = table_block_infos + .get_range(first_known_height..last_height_in_chain_entry)? + .map(|block_info| { + let block_info = block_info?; + + Ok((block_info.block_hash, block_info.weight)) + }) + .collect::, Vec<_>), RuntimeError>>()?; + + let top_block_info = table_block_infos.get(&(chain_height - 1))?; + + let first_block_blob = if block_ids.len() >= 2 { + Some(get_block_blob_with_tx_indexes(&(first_known_height + 1), &tables)?.0) + } else { + None + }; + + Ok(BlockchainResponse::NextChainEntry { + start_height: first_known_height, + chain_height, + block_ids, + block_weights, + cumulative_difficulty: combine_low_high_bits_to_u128( + top_block_info.cumulative_difficulty_low, + top_block_info.cumulative_difficulty_high, + ), + first_block_blob, + }) +} + /// [`BlockchainReadRequest::FindFirstUnknown`] /// /// # Invariant @@ -546,24 +656,7 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; - let mut err = None; - - // Do a binary search to find the first unknown block in the batch. - let idx = - block_ids.partition_point( - |block_id| match block_exists(block_id, &table_block_heights) { - Ok(exists) => exists, - Err(e) => { - err.get_or_insert(e); - // if this happens the search is scrapped, just return `false` back. - false - } - }, - ); - - if let Some(e) = err { - return Err(e); - } + let idx = find_split_point(block_ids, true, &table_block_heights)?; Ok(if idx == block_ids.len() { BlockchainResponse::FindFirstUnknown(None) @@ -576,6 +669,36 @@ fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseRes }) } +/// [`BlockchainReadRequest::MissingTxsInBlock`] +fn missing_txs_in_block( + env: &ConcreteEnv, + block_hash: [u8; 32], + missing_txs: Vec, +) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + let tables = env_inner.open_tables(&tx_ro)?; + + let block_height = tables.block_heights().get(&block_hash)?; + + let (block, miner_tx_index, numb_txs) = get_block_blob_with_tx_indexes(&block_height, &tables)?; + let first_tx_index = miner_tx_index + 1; + + if numb_txs < missing_txs.len() { + return Ok(BlockchainResponse::MissingTxsInBlock(None)); + } + + let txs = missing_txs + .into_iter() + .map(|index_offset| Ok(tables.tx_blobs().get(&(first_tx_index + index_offset))?.0)) + .collect::>()?; + + Ok(BlockchainResponse::MissingTxsInBlock(Some( + MissingTxsInBlock { block, txs }, + ))) +} + /// [`BlockchainReadRequest::AltBlocksInChain`] fn alt_blocks_in_chain(env: &ConcreteEnv, chain_id: ChainId) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index c39c0bd..258d526 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -11,9 +11,11 @@ use std::{ use monero_serai::block::Block; use crate::{ - types::{Chain, ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}, - AltBlockInformation, ChainId, ChainInfo, CoinbaseTxSum, OutputHistogramEntry, - OutputHistogramInput, + types::{ + Chain, ExtendedBlockHeader, MissingTxsInBlock, OutputOnChain, VerifiedBlockInformation, + }, + AltBlockInformation, BlockCompleteEntry, ChainId, ChainInfo, CoinbaseTxSum, + OutputHistogramEntry, OutputHistogramInput, }; //---------------------------------------------------------------------------------------------------- ReadRequest @@ -27,6 +29,11 @@ use crate::{ /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockchainReadRequest { + /// Request [`BlockCompleteEntry`]s. + /// + /// The input is the block hashes. + BlockCompleteEntries(Vec<[u8; 32]>), + /// Request a block's extended header. /// /// The input is the block's height. @@ -96,6 +103,16 @@ pub enum BlockchainReadRequest { /// A request for the compact chain history. CompactChainHistory, + /// A request for the next chain entry. + /// + /// Input is a list of block hashes and the amount of block hashes to return in the next chain entry. + /// + /// # Invariant + /// The [`Vec`] containing the block IDs must be sorted in reverse chronological block + /// order, or else the returned response is unspecified and meaningless, + /// as this request performs a binary search + NextChainEntry(Vec<[u8; 32]>, usize), + /// A request to find the first unknown block ID in a list of block IDs. /// /// # Invariant @@ -104,6 +121,16 @@ pub enum BlockchainReadRequest { /// as this request performs a binary search. FindFirstUnknown(Vec<[u8; 32]>), + /// A request for transactions from a specific block. + MissingTxsInBlock { + /// The block to get transactions from. + block_hash: [u8; 32], + /// The indexes of the transactions from the block. + /// This is not the global index of the txs, instead it is the local index as they appear in + /// the block. + tx_indexes: Vec, + }, + /// A request for all alt blocks in the chain with the given [`ChainId`]. AltBlocksInChain(ChainId), @@ -182,6 +209,16 @@ pub enum BlockchainWriteRequest { #[expect(clippy::large_enum_variant)] pub enum BlockchainResponse { //------------------------------------------------------ Reads + /// Response to [`BlockchainReadRequest::BlockCompleteEntries`]. + BlockCompleteEntries { + /// The [`BlockCompleteEntry`]s that we had. + blocks: Vec, + /// The hashes of blocks that were requested, but we don't have. + missing_hashes: Vec<[u8; 32]>, + /// Our blockchain height. + blockchain_height: usize, + }, + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. @@ -248,6 +285,24 @@ pub enum BlockchainResponse { cumulative_difficulty: u128, }, + /// Response to [`BlockchainReadRequest::NextChainEntry`]. + /// + /// If all blocks were unknown `start_height` will be `0`, the other fields will be meaningless. + NextChainEntry { + /// The start height of this entry, `0` if we could not find the split point. + start_height: usize, + /// The current chain height. + chain_height: usize, + /// The next block hashes in the entry. + block_ids: Vec<[u8; 32]>, + /// The block weights of the next blocks. + block_weights: Vec, + /// The current cumulative difficulty of our chain. + cumulative_difficulty: u128, + /// The block blob of the 2nd block in `block_ids`, if there is one. + first_block_blob: Option>, + }, + /// Response to [`BlockchainReadRequest::FindFirstUnknown`]. /// /// Contains the index of the first unknown block and its expected height. @@ -255,7 +310,12 @@ pub enum BlockchainResponse { /// This will be [`None`] if all blocks were known. FindFirstUnknown(Option<(usize, usize)>), - /// Response to [`BlockchainReadRequest::AltBlocksInChain`]. + /// The response for [`BlockchainReadRequest::MissingTxsInBlock`]. + /// + /// Will return [`None`] if the request contained an index out of range. + MissingTxsInBlock(Option), + + /// The response for [`BlockchainReadRequest::AltBlocksInChain`]. /// /// Contains all the alt blocks in the alt-chain in chronological order. AltBlocksInChain(Vec), diff --git a/types/src/lib.rs b/types/src/lib.rs index a5a04f9..51d37d6 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -26,8 +26,8 @@ pub use transaction_verification_data::{ pub use types::{ AddAuxPow, AltBlockInformation, AuxPow, Chain, ChainId, ChainInfo, CoinbaseTxSum, ExtendedBlockHeader, FeeEstimate, HardForkInfo, MinerData, MinerDataTxBacklogEntry, - OutputHistogramEntry, OutputHistogramInput, OutputOnChain, VerifiedBlockInformation, - VerifiedTransactionInformation, + MissingTxsInBlock, OutputHistogramEntry, OutputHistogramInput, OutputOnChain, + VerifiedBlockInformation, VerifiedTransactionInformation, }; //---------------------------------------------------------------------------------------------------- Feature-gated diff --git a/types/src/types.rs b/types/src/types.rs index 720ad0a..ebb02c5 100644 --- a/types/src/types.rs +++ b/types/src/types.rs @@ -259,6 +259,13 @@ pub struct AddAuxPow { pub aux_pow: Vec, } +/// The inner response for a request for missing txs. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct MissingTxsInBlock { + pub block: Vec, + pub txs: Vec>, +} + //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] mod test {