Merge branch 'cuprated-txpool' into p2p-request-handler

Boog900 2024-10-14 21:03:15 +01:00
commit 5ba2bf020a
28 changed files with 972 additions and 51 deletions

Cargo.lock (generated)

@@ -911,6 +911,7 @@ dependencies = [
"monero-serai",
"rayon",
"serde",
"sha3",
"tempfile",
"thiserror",
"tokio",
@@ -1010,6 +1011,7 @@ dependencies = [
"serde",
"serde_bytes",
"serde_json",
"sha3",
"thiserror",
"thread_local",
"tokio",

@@ -78,6 +78,7 @@ rayon = { version = "1.10.0", default-features = false }
serde_bytes = { version = "0.11.15", default-features = false }
serde_json = { version = "1.0.128", default-features = false }
serde = { version = "1.0.210", default-features = false }
sha3 = { version = "0.10.8", default-features = false }
strum = { version = "0.26.3", default-features = false }
thiserror = { version = "1.0.63", default-features = false }
thread_local = { version = "1.1.8", default-features = false }

@@ -20,7 +20,7 @@ cuprate-levin = { path = "../../net/levin" }
cuprate-wire = { path = "../../net/wire" }
cuprate-p2p = { path = "../../p2p/p2p" }
cuprate-p2p-core = { path = "../../p2p/p2p-core" }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower", features = ["txpool"] }
cuprate-async-buffer = { path = "../../p2p/async-buffer" }
cuprate-address-book = { path = "../../p2p/address-book" }
cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] }
@@ -64,6 +64,7 @@ rayon = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
sha3 = { workspace = true, features = ["std"] }
thiserror = { workspace = true }
thread_local = { workspace = true }
tokio-util = { workspace = true }

@@ -25,7 +25,7 @@ mod manager;
mod syncer;
mod types;
use types::{
pub use types::{
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
};

@@ -15,9 +15,11 @@ use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse};
use cuprate_txpool::service::TxpoolReadHandle;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain, TransactionVerificationData,
Chain,
};
use crate::{
@@ -38,7 +40,7 @@ pub enum IncomingBlockError {
///
/// The inner values are the block hash and the indexes of the missing txs in the block.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<u64>),
UnknownTransactions([u8; 32], Vec<usize>),
/// We are missing the block's parent.
#[error("The block has an unknown parent.")]
Orphan,
@@ -59,8 +61,9 @@ pub enum IncomingBlockError {
/// - the block's parent is unknown
pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
mut given_txs: HashMap<[u8; 32], Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle,
txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<IncomingBlockOk, IncomingBlockError> {
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
///
@@ -72,7 +75,12 @@ pub async fn handle_incoming_block(
/// which are also more expensive than `Mutex`s.
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
LazyLock::new(|| Mutex::new(HashSet::new()));
// FIXME: we should look in the tx-pool for txs when that is ready.
if given_txs.len() > block.transactions.len() {
return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
"Too many transactions given for block"
)));
}
if !block_exists(block.header.previous, blockchain_read_handle)
.await
@@ -90,23 +98,32 @@ pub async fn handle_incoming_block(
return Ok(IncomingBlockOk::AlreadyHave);
}
// TODO: remove this when we have a working tx-pool.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
block_hash,
(0..usize_to_u64(block.transactions.len())).collect(),
));
}
let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
// TODO: check we actually got given the right txs.
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?;
if !missing.is_empty() {
let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
for needed_hash in needed_hashes {
let Some(tx) = given_txs.remove(&needed_hash) else {
return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
};
txs.insert(
needed_hash,
new_tx_verification_data(tx)
.map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
);
}
}
let Some(incoming_block_tx) = COMMAND_TX.get() else {
// We could still be starting up the blockchain manager.
@@ -126,7 +143,7 @@ pub async fn handle_incoming_block(
incoming_block_tx
.send(BlockchainManagerCommand::AddBlock {
block,
prepped_txs: todo!(),
prepped_txs: txs,
response_tx,
})
.await
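The new signature keys the given txs by tx hash rather than taking a bare Vec, so the handler can cheaply look up whatever the pool is missing. A minimal sketch of how a caller might build that map, assuming monero-serai's `Transaction::hash` returns the `[u8; 32]` tx hash:

use std::collections::HashMap;

use monero_serai::transaction::Transaction;

// Key incoming txs by their hash for `handle_incoming_block`; a sketch,
// not the committed call site.
fn index_txs_by_hash(txs: Vec<Transaction>) -> HashMap<[u8; 32], Transaction> {
    txs.into_iter().map(|tx| (tx.hash(), tx)).collect()
}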

@@ -36,6 +36,7 @@ mod commands;
mod handler;
pub use commands::{BlockchainManagerCommand, IncomingBlockOk};
use cuprate_txpool::service::TxpoolWriteHandle;
/// Initialize the blockchain manager.
///
@@ -45,6 +46,7 @@ pub async fn init_blockchain_manager(
clearnet_interface: NetworkInterface<ClearNet>,
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
mut blockchain_context_service: BlockChainContextService,
block_verifier_service: ConcreteBlockVerifierService,
block_downloader_config: BlockDownloaderConfig,
@@ -79,6 +81,7 @@ pub async fn init_blockchain_manager(
let manager = BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
txpool_write_handle,
blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
@@ -101,6 +104,8 @@ pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle,
/// A [`BlockchainReadHandle`].
blockchain_read_handle: BlockchainReadHandle,
/// A [`TxpoolWriteHandle`].
txpool_write_handle: TxpoolWriteHandle,
// TODO: Improve the API of the cache service.
// TODO: rename the cache service -> `BlockchainContextService`.
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve

@@ -1,6 +1,7 @@
//! The blockchain manager handler functions.
use bytes::Bytes;
use futures::{TryFutureExt, TryStreamExt};
use monero_serai::transaction::Input;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use std::ops::ControlFlow;
@@ -17,6 +18,7 @@ use cuprate_consensus::{
};
use cuprate_helper::cast::usize_to_u64;
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
use cuprate_txpool::service::interface::TxpoolWriteRequest;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
@@ -434,6 +436,18 @@ impl super::BlockchainManager {
&mut self,
verified_block: VerifiedBlockInformation,
) {
// FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate.
let spent_key_images = verified_block
.txs
.iter()
.flat_map(|tx| {
tx.tx.prefix().inputs.iter().map(|input| match input {
Input::ToKey { key_image, .. } => key_image.compress().0,
Input::Gen(_) => unreachable!(),
})
})
.collect::<Vec<[u8; 32]>>();
self.blockchain_context_service
.ready()
.await
@@ -472,6 +486,14 @@ impl super::BlockchainManager {
};
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
self.txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::NewBlock { spent_key_images })
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
}
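The key-image extraction the FIXME refers to, pulled out as a standalone helper to show its shape; a sketch, not the committed API:

use monero_serai::transaction::{Input, Transaction};

// Collect the key images spent by a set of verified (non-miner) txs.
// Miner txs only ever contain `Input::Gen`, which is why the match
// treats it as unreachable here.
fn spent_key_images(txs: &[Transaction]) -> Vec<[u8; 32]> {
    txs.iter()
        .flat_map(|tx| {
            tx.prefix().inputs.iter().map(|input| match input {
                Input::ToKey { key_image, .. } => key_image.compress().0,
                Input::Gen(_) => unreachable!("miner txs are not in this list"),
            })
        })
        .collect()
}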

@@ -1,3 +1,7 @@
//! Transaction Pool
//!
//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
mod dandelion;
mod incoming_tx;
mod txs_being_handled;

@@ -0,0 +1,59 @@
use std::time::Duration;
use bytes::Bytes;
use cuprate_dandelion_tower::pool::DandelionPoolService;
use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use cuprate_wire::NetworkAddress;
mod diffuse_service;
mod stem_service;
mod tx_store;
#[derive(Clone)]
pub struct DandelionTx(Bytes);
type TxId = [u8; 32];
const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
time_between_hop: Duration::from_millis(175),
epoch_duration: Duration::from_secs(10 * 60),
fluff_probability: 0.12,
graph: Graph::FourRegular,
};
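For intuition (not stated in the source): each stem hop independently fluffs with `fluff_probability`, so the stem length is geometric and a tx stems for about 1 / 0.12 ≈ 8.3 hops on average before diffusing. A quick check:

/// Expected stem hops before a fluff, when each hop independently
/// fluffs with probability `p` (geometric stem length).
fn expected_stem_hops(p: f64) -> f64 {
    1.0 / p
}

#[test]
fn stem_length_intuition() {
    // With the 0.12 fluff probability above: ~8.3 hops on average.
    assert!((expected_stem_hops(0.12) - 8.33).abs() < 0.01);
}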
type ConcreteDandelionRouter = DandelionRouter<
stem_service::OutboundPeerStream,
diffuse_service::DiffuseService,
NetworkAddress,
stem_service::StemPeerService<ClearNet>,
DandelionTx,
>;
pub fn start_dandelion_pool_manager(
router: ConcreteDandelionRouter,
txpool_read_handle: TxpoolReadHandle,
txpool_write_handle: TxpoolWriteHandle,
) -> DandelionPoolService<DandelionTx, TxId, NetworkAddress> {
cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
12,
router,
tx_store::TxStoreService {
txpool_read_handle,
txpool_write_handle,
},
DANDELION_CONFIG,
)
}
pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandelionRouter {
DandelionRouter::new(
diffuse_service::DiffuseService {
clear_net_broadcast_service: clear_net.broadcast_svc(),
},
stem_service::OutboundPeerStream { clear_net },
DANDELION_CONFIG,
)
}
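How the two constructors above fit together at start-up; a hypothetical wiring sketch, the real call site lives elsewhere in the binary:

use cuprate_dandelion_tower::pool::DandelionPoolService;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use cuprate_wire::NetworkAddress;

// Hypothetical start-up wiring for the constructors above.
fn init_dandelion(
    clear_net: NetworkInterface<ClearNet>,
    txpool_read_handle: TxpoolReadHandle,
    txpool_write_handle: TxpoolWriteHandle,
) -> DandelionPoolService<DandelionTx, TxId, NetworkAddress> {
    // The router decides, per epoch, whether a tx is stemmed to a single
    // outbound peer or fluffed (diffused) to all peers.
    let router = dandelion_router(clear_net);

    // The pool manager drives the router and tracks embargoed stem txs.
    start_dandelion_pool_manager(router, txpool_read_handle, txpool_write_handle)
}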

@@ -0,0 +1,45 @@
use std::{
future::{ready, Ready},
task::{Context, Poll},
};
use futures::FutureExt;
use tower::Service;
use cuprate_dandelion_tower::traits::DiffuseRequest;
use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface};
use cuprate_p2p_core::ClearNet;
use super::DandelionTx;
/// The dandelion diffusion service.
pub struct DiffuseService {
pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
}
impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
type Response = ();
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.clear_net_broadcast_service
.poll_ready(cx)
.map_err(Into::into)
}
fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
// TODO: Call `into_inner` when 1.82.0 stabilizes
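// `BroadcastSvc::call` returns an always-ready future (hence the
// `into_inner` TODO above), so `now_or_never` resolves it synchronously.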
self.clear_net_broadcast_service
.call(BroadcastRequest::Transaction {
tx_bytes: req.0 .0,
direction: None,
received_from: None,
})
.now_or_never()
.unwrap()
.expect("Broadcast service is Infallible");
ready(Ok(()))
}
}

@@ -0,0 +1,67 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
};
use cuprate_wire::{protocol::NewTransactions, NetworkAddress};
use bytes::Bytes;
use futures::Stream;
use tower::Service;
use super::DandelionTx;
/// The dandelion outbound peer stream.
pub struct OutboundPeerStream {
pub clear_net: NetworkInterface<ClearNet>,
}
impl Stream for OutboundPeerStream {
type Item = Result<OutboundPeer<NetworkAddress, StemPeerService<ClearNet>>, tower::BoxError>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// TODO: make the outbound peer choice random.
Poll::Ready(Some(Ok(self
.clear_net
.client_pool()
.outbound_client()
.map_or(OutboundPeer::Exhausted, |client| {
let addr = match client.info.id {
InternalPeerID::KnownAddr(addr) => addr,
InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"),
};
OutboundPeer::Peer(addr.into(), StemPeerService(client))
}))))
}
}
/// The stem service, used to send stem txs.
pub struct StemPeerService<N: NetworkZone>(Client<N>);
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
type Response = <Client<N> as Service<PeerRequest>>::Response;
type Error = tower::BoxError;
type Future = <Client<N> as Service<PeerRequest>>::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
self.0
.call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
NewTransactions {
txs: vec![req.0 .0],
dandelionpp_fluff: false,
padding: Bytes::new(),
},
)))
}
}

@@ -0,0 +1,77 @@
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt};
use tower::{util::Oneshot, Service, ServiceExt};
use cuprate_dandelion_tower::{
traits::{TxStoreRequest, TxStoreResponse},
State,
};
use cuprate_database::RuntimeError;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
TxpoolReadHandle, TxpoolWriteHandle,
};
use super::{DandelionTx, TxId};
/// The dandelion tx-store service.
///
/// This simply maps the interface [`cuprate_dandelion_tower`] expects onto what [`cuprate_txpool`] provides.
pub struct TxStoreService {
pub txpool_read_handle: TxpoolReadHandle,
pub txpool_write_handle: TxpoolWriteHandle,
}
impl Service<TxStoreRequest<TxId>> for TxStoreService {
type Response = TxStoreResponse<DandelionTx>;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
match req {
TxStoreRequest::Get(tx_id) => self
.txpool_read_handle
.clone()
.oneshot(TxpoolReadRequest::TxBlob(tx_id))
.map(|res| match res {
Ok(TxpoolReadResponse::TxBlob {
tx_blob,
state_stem,
}) => {
let state = if state_stem {
State::Stem
} else {
State::Fluff
};
Ok(TxStoreResponse::Transaction(Some((
DandelionTx(Bytes::from(tx_blob)),
state,
))))
}
Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
Err(e) => Err(e.into()),
Ok(_) => unreachable!(),
})
.boxed(),
TxStoreRequest::Promote(tx_id) => self
.txpool_write_handle
.clone()
.oneshot(TxpoolWriteRequest::Promote(tx_id))
.map(|res| match res {
Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
Err(e) => Err(e.into()),
})
.boxed(),
}
}
}

@@ -0,0 +1,326 @@
use std::{
collections::HashSet,
future::ready,
sync::Arc,
task::{Context, Poll},
};
use bytes::Bytes;
use dashmap::DashSet;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use sha3::{Digest, Sha3_256};
use tower::{Service, ServiceExt};
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService, ExtendedConsensusError, TxVerifierService, VerifyTxRequest,
VerifyTxResponse,
};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder},
State, TxState,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse},
TxpoolReadHandle, TxpoolWriteHandle,
};
use cuprate_types::TransactionVerificationData;
use cuprate_wire::NetworkAddress;
use crate::{
blockchain::ConcreteTxVerifierService,
constants::PANIC_CRITICAL_SERVICE_ERROR,
signals::REORG_LOCK,
txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled},
};
/// An error that can happen handling an incoming tx.
pub enum IncomingTxError {
Parse(std::io::Error),
Consensus(ExtendedConsensusError),
DuplicateTransaction,
}
/// Incoming transactions.
pub struct IncomingTxs {
pub txs: Vec<Bytes>,
pub state: TxState<NetworkAddress>,
}
/// The transaction type used for dandelion++.
#[derive(Clone)]
struct DandelionTx(Bytes);
/// A transaction ID/hash.
type TxId = [u8; 32];
/// The service that handles incoming transaction pool transactions.
///
/// This service handles everything, including verifying the tx, adding it to the pool, and routing it to other nodes.
pub struct IncomingTxHandler {
/// A store of txs currently being handled in incoming tx requests.
txs_being_handled: TxsBeingHandled,
/// The blockchain context cache.
blockchain_context_cache: BlockChainContextService,
/// The dandelion txpool manager.
dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
/// The transaction verifier service.
tx_verifier_service: ConcreteTxVerifierService,
/// The txpool write handle.
txpool_write_handle: TxpoolWriteHandle,
/// The txpool read handle.
txpool_read_handle: TxpoolReadHandle,
}
impl Service<IncomingTxs> for IncomingTxHandler {
type Response = ();
type Error = IncomingTxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: IncomingTxs) -> Self::Future {
handle_incoming_txs(
req.txs,
req.state,
self.txs_being_handled.clone(),
self.blockchain_context_cache.clone(),
self.tx_verifier_service.clone(),
self.txpool_write_handle.clone(),
self.txpool_read_handle.clone(),
self.dandelion_pool_manager.clone(),
)
.boxed()
}
}
#[expect(clippy::too_many_arguments)]
async fn handle_incoming_txs(
txs: Vec<Bytes>,
state: TxState<NetworkAddress>,
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockChainContextService,
mut tx_verifier_service: ConcreteTxVerifierService,
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
) -> Result<(), IncomingTxError> {
let reorg_guard = REORG_LOCK.read().await;
let (txs, stem_pool_txs, txs_being_handled_guard) =
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
let BlockChainContextResponse::Context(context) = blockchain_context_cache
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let context = context.unchecked_blockchain_context();
tx_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyTxRequest::Prepped {
txs: txs.clone(),
current_chain_height: context.chain_height,
top_hash: context.top_hash,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hf,
})
.await
.map_err(IncomingTxError::Consensus)?;
for tx in txs {
handle_valid_tx(
tx,
state.clone(),
&mut txpool_write_handle,
&mut dandelion_pool_manager,
)
.await;
}
for stem_tx in stem_pool_txs {
rerelay_stem_tx(
&stem_tx,
state.clone(),
&mut txpool_read_handle,
&mut dandelion_pool_manager,
)
.await;
}
Ok(())
}
/// Prepares the incoming transactions for verification.
///
/// This will filter out all transactions already in the pool, or already being handled by another request.
async fn prepare_incoming_txs(
tx_blobs: Vec<Bytes>,
txs_being_handled: TxsBeingHandled,
txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
(
Vec<Arc<TransactionVerificationData>>,
Vec<TxId>,
TxBeingHandledLocally,
),
IncomingTxError,
> {
let mut tx_blob_hashes = HashSet::new();
let mut txs_being_handled_locally = txs_being_handled.local_tracker();
// Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
let txs = tx_blobs
.into_iter()
.filter_map(|tx_blob| {
let tx_blob_hash = tx_blob_hash(tx_blob.as_ref());
// If a duplicate is in here the incoming tx batch contained the same tx twice.
if !tx_blob_hashes.insert(tx_blob_hash) {
return Some(Err(IncomingTxError::DuplicateTransaction));
}
// If a duplicate is here it is being handled in another batch.
if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
return None;
}
Some(Ok((tx_blob_hash, tx_blob)))
})
.collect::<Result<Vec<_>, _>>()?;
// Filter the txs already in the txpool out.
// This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue.
let TxpoolReadResponse::FilterKnownTxBlobHashes {
unknown_blob_hashes,
stem_pool_hashes,
} = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
// Now prepare the txs for verification.
rayon_spawn_async(move || {
let txs = txs
.into_iter()
.filter_map(|(tx_blob_hash, tx_blob)| {
if unknown_blob_hashes.contains(&tx_blob_hash) {
Some(tx_blob)
} else {
None
}
})
.map(|bytes| {
let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
let tx = new_tx_verification_data(tx)
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
Ok(Arc::new(tx))
})
.collect::<Result<Vec<_>, IncomingTxError>>()?;
Ok((txs, stem_pool_hashes, txs_being_handled_locally))
})
.await
}
async fn handle_valid_tx(
tx: Arc<TransactionVerificationData>,
state: TxState<NetworkAddress>,
txpool_write_handle: &mut TxpoolWriteHandle,
dandelion_pool_manager: &mut DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
) {
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::AddTransaction {
tx,
state_stem: state.state_stem(),
})
.await
.expect("TODO")
else {
unreachable!()
};
// TODO: track double spends to quickly ignore them from their blob hash.
if let Some(tx_hash) = double_spend {
return;
};
// TODO: There is a race condition possible if a tx and block come in at the same time <https://github.com/Cuprate/cuprate/issues/314>.
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(None)
.build()
.unwrap();
dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}
async fn rerelay_stem_tx(
tx_hash: &TxId,
state: TxState<NetworkAddress>,
txpool_read_handle: &mut TxpoolReadHandle,
dandelion_pool_manager: &mut DandelionPoolService<DandelionTx, TxId, NetworkAddress>,
) {
let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolReadRequest::TxBlob(*tx_hash))
.await
.expect("TODO")
else {
unreachable!()
};
let incoming_tx =
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
// TODO: fill this in properly.
let incoming_tx = incoming_tx
.with_routing_state(state)
.with_state_in_db(Some(State::Stem))
.build()
.unwrap();
dandelion_pool_manager
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(incoming_tx)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}

@@ -0,0 +1,45 @@
use dashmap::DashSet;
use sha3::{Digest, Sha3_256};
use std::sync::Arc;
pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] {
let mut hasher = Sha3_256::new();
hasher.update(tx_bytes);
hasher.finalize().into()
}
#[derive(Clone)]
pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
impl TxsBeingHandled {
pub fn local_tracker(&self) -> TxBeingHandledLocally {
TxBeingHandledLocally {
txs_being_handled: self.clone(),
txs: vec![],
}
}
}
pub struct TxBeingHandledLocally {
txs_being_handled: TxsBeingHandled,
txs: Vec<[u8; 32]>,
}
impl TxBeingHandledLocally {
pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
if !self.txs_being_handled.0.insert(tx_blob_hash) {
return false;
}
self.txs.push(tx_blob_hash);
true
}
}
impl Drop for TxBeingHandledLocally {
fn drop(&mut self) {
for hash in &self.txs {
self.txs_being_handled.0.remove(hash);
}
}
}
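A sketch of the tracker's intended semantics (the constructor here is assumed for illustration; within this module the tuple field is directly accessible):

use std::sync::Arc;

use dashmap::DashSet;

#[test]
fn tracker_demo() {
    // Assumed construction for illustration.
    let shared = TxsBeingHandled(Arc::new(DashSet::new()));

    let mut batch_a = shared.local_tracker();
    let mut batch_b = shared.local_tracker();

    let hash = tx_blob_hash(b"example tx blob");

    assert!(batch_a.try_add_tx(hash)); // first batch claims the tx
    assert!(!batch_b.try_add_tx(hash)); // a concurrent batch sees it as taken

    drop(batch_a); // `Drop` releases every hash this tracker claimed
    assert!(batch_b.try_add_tx(hash)); // so it can be claimed again
}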

@@ -73,6 +73,12 @@ pub enum TxState<Id> {
Local,
}
impl<Id> TxState<Id> {
pub const fn state_stem(&self) -> bool {
matches!(self, Self::Local | Self::Stem { .. })
}
}
/// A request to route a transaction.
pub struct DandelionRouteReq<Tx, Id> {
/// The transaction.

@@ -18,7 +18,7 @@ use tracing::{Instrument, Span};
use cuprate_p2p_core::{
client::{Client, InternalPeerID},
handles::ConnectionHandle,
NetworkZone,
ConnectionDirection, NetworkZone,
};
pub(crate) mod disconnect_monitor;
@@ -165,6 +165,16 @@ impl<N: NetworkZone> ClientPool<N> {
sync_data.cumulative_difficulty() > cumulative_difficulty
})
}
/// Returns the first outbound client found, removing it from the pool.
pub fn outbound_client(&self) -> Option<Client<N>> {
// Copy the key out before calling `remove`: holding a `DashMap` ref
// across `remove` on the same shard can deadlock.
let id = *self
.clients
.iter()
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?
.key();
Some(self.clients.remove(&id).unwrap().1)
}
}
mod sealed {

@@ -30,6 +30,14 @@ pub struct DatabaseWriteHandle<Req, Res> {
crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
}
impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
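The hand-written `Clone` avoids `#[derive(Clone)]`'s implicit `Req: Clone, Res: Clone` bounds: only the channel sender needs cloning. The same gotcha in miniature, with hypothetical names:

use std::marker::PhantomData;

// `#[derive(Clone)]` would demand `T: Clone` even though only the marker
// (standing in for a cheaply-clonable sender) is actually duplicated.
struct Handle<T> {
    sender: PhantomData<T>,
}

impl<T> Clone for Handle<T> {
    fn clone(&self) -> Self {
        Self { sender: PhantomData }
    }
}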
impl<Req, Res> DatabaseWriteHandle<Req, Res>
where
Req: Send + 'static,

@@ -29,6 +29,7 @@ bytemuck = { workspace = true, features = ["must_cast", "derive"
bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] }
thiserror = { workspace = true }
hex = { workspace = true }
sha3 = { workspace = true, features = ["std"] }
tower = { workspace = true, optional = true }
rayon = { workspace = true, optional = true }

@@ -1,9 +1,11 @@
//! General free functions (related to the tx-pool database).
//---------------------------------------------------------------------------------------------------- Import
use sha3::{Digest, Sha3_256};
use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw};
use crate::{config::Config, tables::OpenTables};
use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash};
//---------------------------------------------------------------------------------------------------- Free functions
/// Open the txpool database using the passed [`Config`].
@@ -60,3 +62,15 @@ pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
Ok(env)
}
/// Calculate the transaction blob hash.
///
/// This value is supposed to be quick to compute just based off the tx-blob, without needing to parse the tx.
///
/// The exact way the hash is calculated is not stable and is subject to change; as such, it should not be exposed
/// as a way to interact with Cuprate externally.
pub fn transaction_blob_hash(tx_blob: &[u8]) -> TransactionBlobHash {
let mut hasher = Sha3_256::new();
hasher.update(tx_blob);
hasher.finalize().into()
}

@@ -14,7 +14,7 @@ mod tx;
pub mod types;
pub use config::Config;
pub use free::open;
pub use free::{open, transaction_blob_hash};
pub use tx::TxEntry;
//re-exports

@@ -85,7 +85,7 @@ mod key_images;
mod tx_read;
mod tx_write;
pub use tx_read::get_transaction_verification_data;
pub use tx_read::{get_transaction_verification_data, in_stem_pool};
pub use tx_write::{add_transaction, remove_transaction};
/// An error that can occur on some tx-write ops.

@@ -8,6 +8,8 @@ use monero_serai::transaction::Transaction;
use cuprate_database::{DatabaseRo, RuntimeError};
use cuprate_types::{TransactionVerificationData, TxVersion};
use crate::tables::TransactionInfos;
use crate::types::TxStateFlags;
use crate::{tables::Tables, types::TransactionHash};
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
@@ -34,3 +36,13 @@ pub fn get_transaction_verification_data(
cached_verification_state: Mutex::new(cached_verification_state),
})
}
pub fn in_stem_pool(
tx_hash: &TransactionHash,
tx_infos: &impl DatabaseRo<TransactionInfos>,
) -> Result<bool, RuntimeError> {
Ok(tx_infos
.get(tx_hash)?
.flags
.contains(TxStateFlags::STATE_STEM))
}

@@ -8,6 +8,7 @@ use cuprate_database::{DatabaseRw, RuntimeError, StorableVec};
use cuprate_types::TransactionVerificationData;
use crate::{
free::transaction_blob_hash,
ops::{
key_images::{add_tx_key_images, remove_tx_key_images},
TxPoolWriteError,
@@ -56,6 +57,12 @@ pub fn add_transaction(
let kis_table = tables.spent_key_images_mut();
add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?;
// Add the blob hash to table 4.
let blob_hash = transaction_blob_hash(&tx.tx_blob);
tables
.known_blob_hashes_mut()
.put(&blob_hash, &tx.tx_hash)?;
Ok(())
}
@@ -79,5 +86,9 @@ pub fn remove_transaction(
let kis_table = tables.spent_key_images_mut();
remove_tx_key_images(&tx.prefix().inputs, kis_table)?;
// Remove the blob hash from table 4.
let blob_hash = transaction_blob_hash(&tx_blob);
tables.known_blob_hashes_mut().delete(&blob_hash)?;
Ok(())
}

@@ -1,14 +1,21 @@
//! Tx-pool [`service`](super) interface.
//!
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use cuprate_types::TransactionVerificationData;
use crate::{tx::TxEntry, types::TransactionHash};
use crate::{
tx::TxEntry,
types::{KeyImage, TransactionBlobHash, TransactionHash},
};
//---------------------------------------------------------------------------------------------------- TxpoolReadRequest
/// The transaction pool [`tower::Service`] read request type.
#[derive(Clone)]
pub enum TxpoolReadRequest {
/// A request for the blob (raw bytes) of a transaction with the given hash.
TxBlob(TransactionHash),
@@ -16,6 +23,14 @@ pub enum TxpoolReadRequest {
/// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
TxVerificationData(TransactionHash),
/// A request to filter (remove) all **known** transactions from the set.
///
/// The hash is **not** the transaction hash; it is the hash of the serialized tx-blob.
FilterKnownTxBlobHashes(HashSet<TransactionBlobHash>),
/// A request to pull some transactions for an incoming block.
TxsForBlock(Vec<TransactionHash>),
/// Get information on all transactions in the pool.
Backlog,
@@ -27,15 +42,28 @@ pub enum TxpoolReadRequest {
/// The transaction pool [`tower::Service`] read response type.
#[expect(clippy::large_enum_variant)]
pub enum TxpoolReadResponse {
/// Response to [`TxpoolReadRequest::TxBlob`].
///
/// The inner value is the raw bytes of a transaction.
// TODO: use bytes::Bytes.
TxBlob(Vec<u8>),
/// A response containing the raw bytes of a transaction.
TxBlob { tx_blob: Vec<u8>, state_stem: bool },
/// Response to [`TxpoolReadRequest::TxVerificationData`].
/// A response of [`TransactionVerificationData`].
TxVerificationData(TransactionVerificationData),
/// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
FilterKnownTxBlobHashes {
/// The blob hashes that are unknown.
unknown_blob_hashes: HashSet<TransactionBlobHash>,
/// The tx hashes corresponding to the known blob hashes that are in the stem pool.
stem_pool_hashes: Vec<TransactionHash>,
},
/// The response for [`TxpoolReadRequest::TxsForBlock`].
TxsForBlock {
/// The txs we had in the txpool.
txs: HashMap<[u8; 32], TransactionVerificationData>,
/// The indexes of the missing txs.
missing: Vec<usize>,
},
/// Response to [`TxpoolReadRequest::Backlog`].
///
/// The inner `Vec` contains information on all
@@ -69,6 +97,18 @@ pub enum TxpoolWriteRequest {
///
/// Returns [`TxpoolWriteResponse::Ok`].
RemoveTransaction(TransactionHash),
/// Promote a transaction from the stem pool to the fluff pool.
/// If the tx is already in the fluff pool, this does nothing.
///
/// Returns [`TxpoolWriteResponse::Ok`].
Promote(TransactionHash),
/// Tell the tx-pool about a new block.
NewBlock {
/// The spent key images in the new block.
spent_key_images: Vec<KeyImage>,
},
}
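A sketch of driving the new `TxsForBlock` request through a read handle, mirroring what the blockchain manager does in this commit (service errors unwrapped for brevity):

use std::collections::HashMap;

use tower::{Service, ServiceExt};

use cuprate_txpool::service::{
    interface::{TxpoolReadRequest, TxpoolReadResponse},
    TxpoolReadHandle,
};
use cuprate_types::TransactionVerificationData;

// Pull a block's txs out of the pool; `missing` holds the indexes of
// tx hashes the pool did not have.
async fn txs_for_block_example(
    handle: &mut TxpoolReadHandle,
    tx_hashes: Vec<[u8; 32]>,
) -> (HashMap<[u8; 32], TransactionVerificationData>, Vec<usize>) {
    let TxpoolReadResponse::TxsForBlock { txs, missing } = handle
        .ready()
        .await
        .unwrap()
        .call(TxpoolReadRequest::TxsForBlock(tx_hashes))
        .await
        .unwrap()
    else {
        // Requests are always answered with their matching response variant.
        unreachable!()
    };

    (txs, missing)
}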
//---------------------------------------------------------------------------------------------------- TxpoolWriteResponse

@@ -4,22 +4,24 @@
clippy::unnecessary_wraps,
reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
)]
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use rayon::ThreadPool;
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner};
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
use crate::{
ops::get_transaction_verification_data,
ops::{get_transaction_verification_data, in_stem_pool},
service::{
interface::{TxpoolReadRequest, TxpoolReadResponse},
types::{ReadResponseResult, TxpoolReadHandle},
},
tables::{OpenTables, TransactionBlobs},
types::TransactionHash,
tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
types::{TransactionBlobHash, TransactionHash, TxStateFlags},
};
// TODO: update the docs here
@@ -57,7 +59,6 @@ fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) ->
/// 1. `Request` is mapped to a handler function
/// 2. Handler function is called
/// 3. [`TxpoolReadResponse`] is returned
#[expect(clippy::needless_pass_by_value)]
fn map_request(
env: &ConcreteEnv, // Access to the database
request: TxpoolReadRequest, // The request we must fulfill
@@ -65,6 +66,10 @@ fn map_request(
match request {
TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
filter_known_tx_blob_hashes(env, blob_hashes)
}
TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
TxpoolReadRequest::Backlog => backlog(env),
TxpoolReadRequest::Size => size(env),
}
@@ -94,10 +99,15 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
let tx_ro = inner_env.tx_ro()?;
let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
tx_blobs_table
.get(tx_hash)
.map(|blob| TxpoolReadResponse::TxBlob(blob.0))
let tx_blob = tx_blobs_table.get(tx_hash)?.0;
let tx_info = tx_infos_table.get(tx_hash)?;
Ok(TxpoolReadResponse::TxBlob {
tx_blob,
state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM),
})
}
/// [`TxpoolReadRequest::TxVerificationData`].
@@ -111,6 +121,79 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes
get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
}
/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
fn filter_known_tx_blob_hashes(
env: &ConcreteEnv,
mut blob_hashes: HashSet<TransactionBlobHash>,
) -> ReadResponseResult {
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
let mut stem_pool_hashes = Vec::new();
// A closure that returns whether a tx with a certain blob hash is unknown.
// This also fills in `stem_pool_hashes`.
let mut tx_unknown = |blob_hash| -> Result<bool, RuntimeError> {
match tx_blob_hashes.get(&blob_hash) {
Ok(tx_hash) => {
if in_stem_pool(&tx_hash, &tx_infos)? {
stem_pool_hashes.push(tx_hash);
}
Ok(false)
}
Err(RuntimeError::KeyNotFound) => Ok(true),
Err(e) => Err(e),
}
};
let mut err = None;
blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
Ok(res) => res,
Err(e) => {
err = Some(e);
false
}
});
if let Some(e) = err {
return Err(e);
}
Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
unknown_blob_hashes: blob_hashes,
stem_pool_hashes,
})
}
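The error-stashing `retain` above is an idiom worth noting: `retain` cannot return early, so the first failure is recorded and surfaced after the pass. The same pattern generalized, as a standalone sketch:

use std::collections::HashSet;
use std::hash::Hash;

// Fallible `retain`: keep items for which `keep` returns `Ok(true)`,
// stash the first error, and surface it once iteration finishes.
fn retain_fallible<T: Copy + Eq + Hash, E>(
    set: &mut HashSet<T>,
    mut keep: impl FnMut(T) -> Result<bool, E>,
) -> Result<(), E> {
    let mut err = None;
    set.retain(|item| match keep(*item) {
        Ok(keep_it) => keep_it,
        Err(e) => {
            // Record the first error; drop the item so the pass can finish.
            if err.is_none() {
                err = Some(e);
            }
            false
        }
    });
    err.map_or(Ok(()), Err)
}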
/// [`TxpoolReadRequest::TxsForBlock`].
fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
let inner_env = env.env_inner();
let tx_ro = inner_env.tx_ro()?;
let tables = inner_env.open_tables(&tx_ro)?;
let mut missing_tx_indexes = Vec::with_capacity(txs.len());
let mut txs_verification_data = HashMap::with_capacity(txs.len());
for (i, tx_hash) in txs.into_iter().enumerate() {
match get_transaction_verification_data(&tx_hash, &tables) {
Ok(tx) => {
txs_verification_data.insert(tx_hash, tx);
}
Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
Err(e) => return Err(e),
}
}
Ok(TxpoolReadResponse::TxsForBlock {
txs: txs_verification_data,
missing: missing_tx_indexes,
})
}
/// [`TxpoolReadRequest::Backlog`].
#[inline]
fn backlog(env: &ConcreteEnv) -> ReadResponseResult {

@@ -1,6 +1,6 @@
use std::sync::Arc;
use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
use cuprate_database::{ConcreteEnv, DatabaseRo, DatabaseRw, Env, EnvInner, RuntimeError, TxRw};
use cuprate_database_service::DatabaseWriteHandle;
use cuprate_types::TransactionVerificationData;
@@ -10,8 +10,8 @@ use crate::{
interface::{TxpoolWriteRequest, TxpoolWriteResponse},
types::TxpoolWriteHandle,
},
tables::OpenTables,
types::TransactionHash,
tables::{OpenTables, Tables, TransactionInfos},
types::{KeyImage, TransactionHash, TxStateFlags},
};
//---------------------------------------------------------------------------------------------------- init_write_service
@@ -31,6 +31,8 @@ fn handle_txpool_request(
add_transaction(env, tx, *state_stem)
}
TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
}
}
@@ -101,3 +103,58 @@ fn remove_transaction(
TxRw::commit(tx_rw)?;
Ok(TxpoolWriteResponse::Ok)
}
/// [`TxpoolWriteRequest::Promote`]
fn promote(
env: &ConcreteEnv,
tx_hash: &TransactionHash,
) -> Result<TxpoolWriteResponse, RuntimeError> {
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
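// Clearing the STATE_STEM flag promotes the tx to the fluff pool;
// returning `Some(info)` keeps the updated entry in the table.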
tx_infos.update(tx_hash, |mut info| {
info.flags.remove(TxStateFlags::STATE_STEM);
Some(info)
})?;
drop(tx_infos);
TxRw::commit(tx_rw)?;
Ok(TxpoolWriteResponse::Ok)
}
fn new_block(
env: &ConcreteEnv,
spent_key_images: &[KeyImage],
) -> Result<TxpoolWriteResponse, RuntimeError> {
let env_inner = env.env_inner();
let tx_rw = env_inner.tx_rw()?;
// FIXME: use try blocks once stable.
let result = || {
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
for key_image in spent_key_images {
match tables_mut
.spent_key_images()
.get(key_image)
.and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
{
Ok(()) | Err(RuntimeError::KeyNotFound) => (),
Err(e) => return Err(e),
}
}
Ok(())
};
if let Err(e) = result() {
TxRw::abort(tx_rw)?;
return Err(e);
}
TxRw::commit(tx_rw)?;
Ok(TxpoolWriteResponse::Ok)
}
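The `result` closure above emulates the unstable `try` block the FIXME mentions. Reduced to its essentials (illustrative, not from the source):

fn demo() -> Result<(), &'static str> {
    // Stable Rust: an immediately-defined closure gives `?` a scope whose
    // error can be inspected before choosing to abort or commit.
    let result = || -> Result<(), &'static str> {
        fallible_step()?;
        Ok(())
    };

    if let Err(e) = result() {
        // abort the db transaction here
        return Err(e);
    }
    // commit the db transaction here
    Ok(())
}

fn fallible_step() -> Result<(), &'static str> {
    Ok(())
}

// With nightly `#![feature(try_blocks)]` this would read:
//     let result: Result<(), _> = try { fallible_step()? };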

@@ -16,7 +16,9 @@
//! accessing _all_ tables defined here at once.
use cuprate_database::{define_tables, StorableVec};
use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo};
use crate::types::{
KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo,
};
define_tables! {
/// Serialized transaction blobs.
@@ -41,5 +43,9 @@ define_tables! {
///
/// This table contains the spent key images from all transactions in the pool.
3 => SpentKeyImages,
KeyImage => TransactionHash
KeyImage => TransactionHash,
/// Transaction blob hashes that are in the pool.
4 => KnownBlobHashes,
TransactionBlobHash => TransactionHash,
}

@@ -6,7 +6,6 @@
//!
//! <!-- FIXME: Add schema here or a link to it when complete -->
use bytemuck::{Pod, Zeroable};
use monero_serai::transaction::Timelock;
use cuprate_types::{CachedVerificationState, HardFork};
@@ -17,6 +16,9 @@ pub type KeyImage = [u8; 32];
/// A transaction hash.
pub type TransactionHash = [u8; 32];
/// A transaction blob hash.
pub type TransactionBlobHash = [u8; 32];
bitflags::bitflags! {
/// Flags representing the state of the transaction in the pool.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]