review fixes

This commit is contained in:
Boog900 2024-10-26 23:02:51 +01:00
parent 5cae64f214
commit 48a4fa89a9
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
7 changed files with 61 additions and 54 deletions
binaries/cuprated/src
storage/txpool/src/service

View file

@ -137,7 +137,21 @@ pub async fn handle_incoming_block(
return Ok(IncomingBlockOk::AlreadyHave);
}
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
// We must remove the block hash from `BLOCKS_BEING_HANDLED`.
let _guard = {
struct RemoveFromBlocksBeingHandled {
block_hash: [u8; 32],
}
impl Drop for RemoveFromBlocksBeingHandled {
fn drop(&mut self) {
BLOCKS_BEING_HANDLED
.lock()
.unwrap()
.remove(&self.block_hash);
}
}
RemoveFromBlocksBeingHandled { block_hash }
};
let (response_tx, response_rx) = oneshot::channel();
@ -150,15 +164,10 @@ pub async fn handle_incoming_block(
.await
.expect("TODO: don't actually panic here, an err means we are shutting down");
let res = response_rx
response_rx
.await
.expect("The blockchain manager will always respond")
.map_err(IncomingBlockError::InvalidBlock);
// Remove the block hash from the blocks being handled.
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);
res
.map_err(IncomingBlockError::InvalidBlock)
}
/// Check if we have a block with the given hash.

View file

@ -13,30 +13,3 @@ mod incoming_tx;
mod txs_being_handled;
pub use incoming_tx::IncomingTxHandler;
/// Initialize the [`IncomingTxHandler`].
#[expect(clippy::significant_drop_tightening)]
pub fn incoming_tx_handler(
clear_net: NetworkInterface<ClearNet>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
blockchain_context_cache: BlockChainContextService,
tx_verifier_service: ConcreteTxVerifierService,
) -> IncomingTxHandler {
let dandelion_router = dandelion::dandelion_router(clear_net);
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
dandelion_router,
txpool_read_handle.clone(),
txpool_write_handle.clone(),
);
IncomingTxHandler {
txs_being_handled: txs_being_handled::TxsBeingHandled::new(),
blockchain_context_cache,
dandelion_pool_manager,
tx_verifier_service,
txpool_write_handle,
txpool_read_handle,
}
}

View file

@ -18,7 +18,7 @@ mod tx_store;
/// The configuration used for [`cuprate_dandelion_tower`].
///
/// TODO: should we expose this? probably not.
/// TODO: should we expose this to users of cuprated? probably not.
const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
time_between_hop: Duration::from_millis(175),
epoch_duration: Duration::from_secs(10 * 60),

View file

@ -10,7 +10,7 @@ use cuprate_dandelion_tower::traits::DiffuseRequest;
use cuprate_p2p::{BroadcastRequest, BroadcastSvc};
use cuprate_p2p_core::ClearNet;
use super::DandelionTx;
use crate::txpool::dandelion::DandelionTx;
/// The dandelion diffusion service.
pub struct DiffuseService {
@ -30,16 +30,14 @@ impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
// TODO: the dandelion crate should pass along where we got the tx from.
// TODO: Use `into_inner` when 1.82.0 stabilizes.
self.clear_net_broadcast_service
let Ok(()) = self
.clear_net_broadcast_service
.call(BroadcastRequest::Transaction {
tx_bytes: req.0 .0,
direction: None,
received_from: None,
})
.now_or_never()
.unwrap()
.expect("Broadcast service is Infallible");
.into_inner();
ready(Ok(()))
}

View file

@ -15,8 +15,7 @@ use cuprate_p2p_core::{
};
use cuprate_wire::protocol::NewTransactions;
use super::DandelionTx;
use crate::p2p::CrossNetworkInternalPeerId;
use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
/// The dandelion outbound peer stream.
pub struct OutboundPeerStream {

View file

@ -18,6 +18,8 @@ use cuprate_dandelion_tower::{
State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::{
service::{
interface::{
@ -34,7 +36,10 @@ use crate::{
constants::PANIC_CRITICAL_SERVICE_ERROR,
p2p::CrossNetworkInternalPeerId,
signals::REORG_LOCK,
txpool::txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
txpool::{
dandelion,
txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
},
};
/// An error that can happen handling an incoming tx.
@ -78,6 +83,35 @@ pub struct IncomingTxHandler {
pub(super) txpool_read_handle: TxpoolReadHandle,
}
impl IncomingTxHandler {
/// Initialize the [`IncomingTxHandler`].
#[expect(clippy::significant_drop_tightening)]
pub fn init(
clear_net: NetworkInterface<ClearNet>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
blockchain_context_cache: BlockChainContextService,
tx_verifier_service: ConcreteTxVerifierService,
) -> Self {
let dandelion_router = dandelion::dandelion_router(clear_net);
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
dandelion_router,
txpool_read_handle.clone(),
txpool_write_handle.clone(),
);
Self {
txs_being_handled: TxsBeingHandled::new(),
blockchain_context_cache,
dandelion_pool_manager,
tx_verifier_service,
txpool_write_handle,
txpool_read_handle,
}
}
}
impl Service<IncomingTxs> for IncomingTxHandler {
type Response = ();
type Error = IncomingTxError;
@ -89,8 +123,7 @@ impl Service<IncomingTxs> for IncomingTxHandler {
fn call(&mut self, req: IncomingTxs) -> Self::Future {
handle_incoming_txs(
req.txs,
req.state,
req,
self.txs_being_handled.clone(),
self.blockchain_context_cache.clone(),
self.tx_verifier_service.clone(),
@ -105,8 +138,7 @@ impl Service<IncomingTxs> for IncomingTxHandler {
/// Handles the incoming txs.
#[expect(clippy::too_many_arguments)]
async fn handle_incoming_txs(
txs: Vec<Bytes>,
state: TxState<CrossNetworkInternalPeerId>,
IncomingTxs { txs, state }: IncomingTxs,
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockChainContextService,
mut tx_verifier_service: ConcreteTxVerifierService,
@ -197,7 +229,7 @@ async fn prepare_incoming_txs(
let txs = tx_blobs
.into_iter()
.filter_map(|tx_blob| {
let tx_blob_hash = transaction_blob_hash(tx_blob.as_ref());
let tx_blob_hash = transaction_blob_hash(&tx_blob);
// If a duplicate is in here the incoming tx batch contained the same tx twice.
if !tx_blob_hashes.insert(tx_blob_hash) {

View file

@ -94,14 +94,10 @@ pub enum TxpoolWriteRequest {
},
/// Remove a transaction with the given hash from the pool.
///
/// Returns [`TxpoolWriteResponse::Ok`].
RemoveTransaction(TransactionHash),
/// Promote a transaction from the stem pool to the fluff pool.
/// If the tx is already in the fluff pool this does nothing.
///
/// Returns [`TxpoolWriteResponse::Ok`].
Promote(TransactionHash),
/// Tell the tx-pool about a new block.