check txpool on incoming block

This commit is contained in:
Boog900 2024-10-13 22:43:53 +01:00
parent 93fb3c657c
commit 5ab5b062fd
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
4 changed files with 77 additions and 27 deletions

View file

@ -15,6 +15,8 @@ use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse};
use cuprate_txpool::service::TxpoolReadHandle;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
@ -38,7 +40,7 @@ pub enum IncomingBlockError {
///
/// The inner values are the block hash and the indexes of the missing txs in the block.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<u64>),
UnknownTransactions([u8; 32], Vec<usize>),
/// We are missing the block's parent.
#[error("The block has an unknown parent.")]
Orphan,
@ -59,8 +61,9 @@ pub enum IncomingBlockError {
/// - the block's parent is unknown
pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
mut given_txs: HashMap<[u8; 32], Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle,
txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<IncomingBlockOk, IncomingBlockError> {
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
///
@ -72,7 +75,12 @@ pub async fn handle_incoming_block(
/// which are also more expensive than `Mutex`s.
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
LazyLock::new(|| Mutex::new(HashSet::new()));
// FIXME: we should look in the tx-pool for txs when that is ready.
if given_txs.len() > block.transactions.len() {
return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
"Too many transactions given for block"
)));
}
if !block_exists(block.header.previous, blockchain_read_handle)
.await
@ -90,23 +98,32 @@ pub async fn handle_incoming_block(
return Ok(IncomingBlockOk::AlreadyHave);
}
// TODO: remove this when we have a working tx-pool.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
block_hash,
(0..usize_to_u64(block.transactions.len())).collect(),
));
}
let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
// TODO: check we actually got given the right txs.
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?;
if !missing.is_empty() {
let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
for needed_hash in needed_hashes {
let Some(tx) = given_txs.remove(&needed_hash) else {
return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
};
txs.insert(
needed_hash,
new_tx_verification_data(tx)
.map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
);
}
}
let Some(incoming_block_tx) = COMMAND_TX.get() else {
// We could still be starting up the blockchain manager.
@ -126,7 +143,7 @@ pub async fn handle_incoming_block(
incoming_block_tx
.send(BlockchainManagerCommand::AddBlock {
block,
prepped_txs,
prepped_txs: txs,
response_tx,
})
.await

View file

@ -1,9 +1,9 @@
//! Tx-pool [`service`](super) interface.
//!
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
use std::{collections::HashSet, sync::Arc};
use cuprate_types::TransactionVerificationData;
use std::collections::HashMap;
use std::{collections::HashSet, sync::Arc};
use crate::types::{TransactionBlobHash, TransactionHash};
@ -19,6 +19,8 @@ pub enum TxpoolReadRequest {
///
/// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob.
FilterKnownTxBlobHashes(HashSet<TransactionBlobHash>),
/// A request to pull some transactions for an incoming block.
TxsForBlock(Vec<TransactionHash>),
}
//---------------------------------------------------------------------------------------------------- TxpoolReadResponse
@ -36,6 +38,10 @@ pub enum TxpoolReadResponse {
/// The tx hashes of the blob hashes that were known but were in the stem pool.
stem_pool_hashes: Vec<TransactionHash>,
},
TxsForBlock {
txs: HashMap<[u8; 32], TransactionVerificationData>,
missing: Vec<usize>,
},
}
//---------------------------------------------------------------------------------------------------- TxpoolWriteRequest

View file

@ -1,13 +1,15 @@
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use rayon::ThreadPool;
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use crate::ops::in_stem_pool;
use crate::{
ops::get_transaction_verification_data,
ops::{get_transaction_verification_data, in_stem_pool},
service::{
interface::{TxpoolReadRequest, TxpoolReadResponse},
types::{ReadResponseResult, TxpoolReadHandle},
@ -51,7 +53,6 @@ fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) ->
/// 1. `Request` is mapped to a handler function
/// 2. Handler function is called
/// 3. [`TxpoolReadResponse`] is returned
#[expect(clippy::needless_pass_by_value)]
fn map_request(
env: &ConcreteEnv, // Access to the database
request: TxpoolReadRequest, // The request we must fulfill
@ -62,6 +63,7 @@ fn map_request(
TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
filter_known_tx_blob_hashes(env, blob_hashes)
}
TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
}
}
@ -157,3 +159,29 @@ fn filter_known_tx_blob_hashes(
stem_pool_hashes,
})
}
/// [`TxpoolReadRequest::TxsForBlock`].
fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tables = inner_env.open_tables(&tx_ro)?;
let mut missing_tx_indexes = Vec::with_capacity(txs.len());
let mut txs_verification_data = HashMap::with_capacity(txs.len());
for (i, tx_hash) in txs.into_iter().enumerate() {
match get_transaction_verification_data(&tx_hash, &tables) {
Ok(tx) => {
txs_verification_data.insert(tx_hash, tx);
}
Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
Err(e) => return Err(e),
}
}
Ok(TxpoolReadResponse::TxsForBlock {
txs: txs_verification_data,
missing: missing_tx_indexes,
})
}

View file

@ -6,7 +6,6 @@
//!
//! <!-- FIXME: Add schema here or a link to it when complete -->
use bytemuck::{Pod, Zeroable};
use monero_serai::transaction::Timelock;
use cuprate_types::{CachedVerificationState, HardFork};