diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 985e60d8..7f7dfd3a 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -15,6 +15,8 @@ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; +use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; +use cuprate_txpool::service::TxpoolReadHandle; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, @@ -38,7 +40,7 @@ pub enum IncomingBlockError { /// /// The inner values are the block hash and the indexes of the missing txs in the block. #[error("Unknown transactions in block.")] - UnknownTransactions([u8; 32], Vec), + UnknownTransactions([u8; 32], Vec), /// We are missing the block's parent. #[error("The block has an unknown parent.")] Orphan, @@ -59,8 +61,9 @@ pub enum IncomingBlockError { /// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, - given_txs: Vec, + mut given_txs: HashMap<[u8; 32], Transaction>, blockchain_read_handle: &mut BlockchainReadHandle, + txpool_read_handle: &mut TxpoolReadHandle, ) -> Result { /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. /// @@ -72,7 +75,12 @@ pub async fn handle_incoming_block( /// which are also more expensive than `Mutex`s. static BLOCKS_BEING_HANDLED: LazyLock>> = LazyLock::new(|| Mutex::new(HashSet::new())); - // FIXME: we should look in the tx-pool for txs when that is ready. + + if given_txs.len() > block.transactions.len() { + return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!( + "Too many transactions given for block" + ))); + } if !block_exists(block.header.previous, blockchain_read_handle) .await @@ -90,23 +98,32 @@ pub async fn handle_incoming_block( return Ok(IncomingBlockOk::AlreadyHave); } - // TODO: remove this when we have a working tx-pool. - if given_txs.len() != block.transactions.len() { - return Err(IncomingBlockError::UnknownTransactions( - block_hash, - (0..usize_to_u64(block.transactions.len())).collect(), - )); - } + let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone())) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; - // TODO: check we actually got given the right txs. - let prepped_txs = given_txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok((tx.tx_hash, tx)) - }) - .collect::>() - .map_err(IncomingBlockError::InvalidBlock)?; + if !missing.is_empty() { + let needed_hashes = missing.iter().map(|index| block.transactions[*index]); + + for needed_hash in needed_hashes { + let Some(tx) = given_txs.remove(&needed_hash) else { + return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); + }; + + txs.insert( + needed_hash, + new_tx_verification_data(tx) + .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?, + ); + } + } let Some(incoming_block_tx) = COMMAND_TX.get() else { // We could still be starting up the blockchain manager. @@ -126,7 +143,7 @@ pub async fn handle_incoming_block( incoming_block_tx .send(BlockchainManagerCommand::AddBlock { block, - prepped_txs, + prepped_txs: txs, response_tx, }) .await diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 0e570399..8e88065a 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,9 +1,9 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. -use std::{collections::HashSet, sync::Arc}; - use cuprate_types::TransactionVerificationData; +use std::collections::HashMap; +use std::{collections::HashSet, sync::Arc}; use crate::types::{TransactionBlobHash, TransactionHash}; @@ -19,6 +19,8 @@ pub enum TxpoolReadRequest { /// /// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob. FilterKnownTxBlobHashes(HashSet), + /// A request to pull some transactions for an incoming block. + TxsForBlock(Vec), } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse @@ -36,6 +38,10 @@ pub enum TxpoolReadResponse { /// The tx hashes of the blob hashes that were known but were in the stem pool. stem_pool_hashes: Vec, }, + TxsForBlock { + txs: HashMap<[u8; 32], TransactionVerificationData>, + missing: Vec, + }, } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 9e92a0d4..47c1d657 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -1,13 +1,15 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use rayon::ThreadPool; use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; -use crate::ops::in_stem_pool; use crate::{ - ops::get_transaction_verification_data, + ops::{get_transaction_verification_data, in_stem_pool}, service::{ interface::{TxpoolReadRequest, TxpoolReadResponse}, types::{ReadResponseResult, TxpoolReadHandle}, @@ -51,7 +53,6 @@ fn init_read_service_with_pool(env: Arc, pool: Arc) -> /// 1. `Request` is mapped to a handler function /// 2. Handler function is called /// 3. [`TxpoolReadResponse`] is returned -#[expect(clippy::needless_pass_by_value)] fn map_request( env: &ConcreteEnv, // Access to the database request: TxpoolReadRequest, // The request we must fulfill @@ -62,6 +63,7 @@ fn map_request( TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => { filter_known_tx_blob_hashes(env, blob_hashes) } + TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed), } } @@ -157,3 +159,29 @@ fn filter_known_tx_blob_hashes( stem_pool_hashes, }) } + +/// [`TxpoolReadRequest::TxsForBlock`]. +fn txs_for_block(env: &ConcreteEnv, txs: Vec) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tables = inner_env.open_tables(&tx_ro)?; + + let mut missing_tx_indexes = Vec::with_capacity(txs.len()); + let mut txs_verification_data = HashMap::with_capacity(txs.len()); + + for (i, tx_hash) in txs.into_iter().enumerate() { + match get_transaction_verification_data(&tx_hash, &tables) { + Ok(tx) => { + txs_verification_data.insert(tx_hash, tx); + } + Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i), + Err(e) => return Err(e), + } + } + + Ok(TxpoolReadResponse::TxsForBlock { + txs: txs_verification_data, + missing: missing_tx_indexes, + }) +} diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 83d9e01a..2acb819e 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -6,7 +6,6 @@ //! //! use bytemuck::{Pod, Zeroable}; - use monero_serai::transaction::Timelock; use cuprate_types::{CachedVerificationState, HardFork};