From adf592e5309eba8740356541ac8dd43bd59849da Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 10 Oct 2024 20:12:18 +0100 Subject: [PATCH 01/12] init dandelion integration --- Cargo.lock | 1 + Cargo.toml | 1 + binaries/cuprated/Cargo.toml | 1 + binaries/cuprated/src/blockchain.rs | 2 +- binaries/cuprated/src/txpool.rs | 5 + binaries/cuprated/src/txpool/dandelion.rs | 38 +++ .../src/txpool/dandelion/diffuse_service.rs | 32 +++ .../src/txpool/dandelion/stem_service.rs | 54 ++++ .../cuprated/src/txpool/dandelion/tx_store.rs | 47 ++++ binaries/cuprated/src/txpool/incoming_tx.rs | 249 ++++++++++++++++++ binaries/cuprated/src/txpool/manager.rs | 1 + .../cuprated/src/txpool/txs_being_handled.rs | 48 ++++ p2p/dandelion-tower/src/router.rs | 6 + p2p/p2p/src/client_pool.rs | 12 +- storage/service/src/service/write.rs | 2 +- storage/txpool/src/service/interface.rs | 8 + 16 files changed, 504 insertions(+), 3 deletions(-) create mode 100644 binaries/cuprated/src/txpool/dandelion.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/diffuse_service.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/stem_service.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/tx_store.rs create mode 100644 binaries/cuprated/src/txpool/incoming_tx.rs create mode 100644 binaries/cuprated/src/txpool/manager.rs create mode 100644 binaries/cuprated/src/txpool/txs_being_handled.rs diff --git a/Cargo.lock b/Cargo.lock index 5fea18d..5e597f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1008,6 +1008,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", + "sha3", "thiserror", "thread_local", "tokio", diff --git a/Cargo.toml b/Cargo.toml index fa348cc..1de62ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ rayon = { version = "1.10.0", default-features = false } serde_bytes = { version = "0.11.15", default-features = false } serde_json = { version = "1.0.128", default-features = false } serde = { version = "1.0.210", default-features = false } +sha3 = { version = "0.10.8", default-features = false } thiserror = { version = "1.0.63", default-features = false } thread_local = { version = "1.1.8", default-features = false } tokio-util = { version = "0.7.12", default-features = false } diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 325406b..62a1032 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -64,6 +64,7 @@ rayon = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } +sha3 = { workspace = true, features = ["std"] } thiserror = { workspace = true } thread_local = { workspace = true } tokio-util = { workspace = true } diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index a06f3fa..c4b75e4 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -25,7 +25,7 @@ mod manager; mod syncer; mod types; -use types::{ +pub use types::{ ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index a6f05e7..af5a420 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -1,3 +1,8 @@ //! Transaction Pool //! //! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool. + +mod dandelion; +mod incoming_tx; +mod manager; +mod txs_being_handled; diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs new file mode 100644 index 0000000..47d1ca3 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -0,0 +1,38 @@ +use bytes::Bytes; +use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter}; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::ClearNet; +use cuprate_wire::NetworkAddress; + +mod diffuse_service; +mod stem_service; +mod tx_store; + +struct DandelionTx(Bytes); + +type TxId = [u8; 32]; + +pub fn start_dandelion_router( + clear_net: NetworkInterface, +) -> DandelionRouter< + stem_service::OutboundPeerStream, + diffuse_service::DiffuseService, + NetworkAddress, + stem_service::StemPeerService, + DandelionTx, +> { + DandelionRouter::new( + diffuse_service::DiffuseService { + clear_net_broadcast_service: clear_net.broadcast_svc(), + }, + stem_service::OutboundPeerStream { + clear_net: clear_net.clone(), + }, + DandelionConfig { + time_between_hop: Default::default(), + epoch_duration: Default::default(), + fluff_probability: 0.0, + graph: Default::default(), + }, + ) +} diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs new file mode 100644 index 0000000..57b7d29 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs @@ -0,0 +1,32 @@ +use std::task::{Context, Poll}; +use tower::Service; + +use crate::txpool::dandelion::DandelionTx; +use cuprate_dandelion_tower::traits::DiffuseRequest; +use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface}; +use cuprate_p2p_core::ClearNet; + +pub struct DiffuseService { + pub clear_net_broadcast_service: BroadcastSvc, +} + +impl Service> for DiffuseService { + type Response = BroadcastSvc::Response; + type Error = tower::BoxError; + type Future = BroadcastSvc::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.clear_net_broadcast_service + .poll_ready(cx) + .map_err(Into::into) + } + + fn call(&mut self, req: DiffuseRequest) -> Self::Future { + self.clear_net_broadcast_service + .call(BroadcastRequest::Transaction { + tx_bytes: req.0 .0, + direction: None, + received_from: None, + }) + } +} diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs new file mode 100644 index 0000000..6970ebf --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -0,0 +1,54 @@ +use super::DandelionTx; +use bytes::Bytes; +use cuprate_dandelion_tower::traits::StemRequest; +use cuprate_dandelion_tower::OutboundPeer; +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::client::Client; +use cuprate_p2p_core::{ClearNet, NetworkZone, PeerRequest, ProtocolRequest}; +use cuprate_wire::protocol::NewTransactions; +use cuprate_wire::NetworkAddress; +use futures::Stream; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; + +pub struct OutboundPeerStream { + pub clear_net: NetworkInterface, +} + +impl Stream for OutboundPeerStream { + type Item = Result>, tower::BoxError>; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Some(Ok(self + .clear_net + .client_pool() + .outbound_client() + .map_or(OutboundPeer::Exhausted, |client| { + OutboundPeer::Peer(client.info.id.into(), StemPeerService(client)) + })))) + } +} + +pub struct StemPeerService(Client); + +impl Service> for StemPeerService { + type Response = (); + type Error = tower::BoxError; + type Future = Client::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, req: StemRequest) -> Self::Future { + self.0 + .call(PeerRequest::Protocol(ProtocolRequest::NewTransactions( + NewTransactions { + txs: vec![req.0 .0], + dandelionpp_fluff: false, + padding: Bytes::new(), + }, + ))) + } +} diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs new file mode 100644 index 0000000..98de764 --- /dev/null +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -0,0 +1,47 @@ +use crate::txpool::dandelion::{DandelionTx, TxId}; +use bytes::Bytes; +use cuprate_dandelion_tower::traits::{TxStoreRequest, TxStoreResponse}; +use cuprate_database::RuntimeError; +use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; +use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use std::task::{Context, Poll}; +use tower::util::Oneshot; +use tower::{Service, ServiceExt}; + +pub struct TxStoreService { + txpool_read_handle: TxpoolReadHandle, + txpool_write_handle: TxpoolWriteHandle, +} + +impl Service> for TxStoreService { + type Response = TxStoreResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: TxStoreRequest) -> Self::Future { + match req { + TxStoreRequest::Get(tx_id) => self + .txpool_read_handle + .clone() + .oneshot(TxpoolReadRequest::TxBlob(tx_id)) + .map(|res| match res { + Ok(TxpoolReadResponse::TxBlob(blob)) => Ok(TxStoreResponse::Transaction(Some( + (DandelionTx(Bytes::from(blob)), todo!()), + ))), + Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)), + Err(e) => Err(e.into()), + Ok(_) => unreachable!(), + }) + .boxed(), + TxStoreRequest::Promote(tx_id) => { + todo!() + } + } + } +} diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs new file mode 100644 index 0000000..37202b0 --- /dev/null +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -0,0 +1,249 @@ +use std::collections::HashSet; +use std::future::ready; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::blockchain::ConcreteTxVerifierService; +use crate::txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled}; +use bytes::Bytes; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + ExtendedConsensusError, TxVerifierService, VerifyTxRequest, VerifyTxResponse, +}; +use cuprate_dandelion_tower::pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}; +use cuprate_dandelion_tower::TxState; +use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_txpool::service::interface::{ + TxpoolReadRequest, TxpoolWriteRequest, TxpoolWriteResponse, +}; +use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; +use cuprate_wire::NetworkAddress; +use dashmap::DashSet; +use futures::future::BoxFuture; +use futures::FutureExt; +use monero_serai::transaction::Transaction; +use sha3::{Digest, Sha3_256}; +use tower::{Service, ServiceExt}; + +pub enum IncomingTxError { + Parse(std::io::Error), + Consensus(ExtendedConsensusError), + DuplicateTransaction, +} + +pub enum IncomingTxs { + Bytes { + txs: Vec, + state: TxState, + }, +} + +struct DandelionTx(Bytes); + +type TxId = [u8; 32]; + +pub struct IncomingTxHandler { + txs_being_added: Arc, + + blockchain_context_cache: BlockChainContextService, + + dandelion_pool_manager: DandelionPoolService, + tx_verifier_service: ConcreteTxVerifierService, + + txpool_write_handle: TxpoolWriteHandle, + + txpool_read_handle: TxpoolReadHandle, +} + +impl Service for IncomingTxHandler { + type Response = (); + type Error = IncomingTxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: IncomingTxs) -> Self::Future { + let IncomingTxs::Bytes { mut txs, state } = req; + + let mut local_tracker = self.txs_being_added.local_tracker(); + + txs.retain(|bytes| local_tracker.try_add_tx(bytes.as_ref())); + + if txs.is_empty() { + return ready(Ok(())).boxed(); + } + + let mut blockchain_context_cache = self.blockchain_context_cache.clone(); + let mut tx_verifier_service = self.tx_verifier_service.clone(); + let mut txpool_write_handle = self.txpool_write_handle.clone(); + + async move { + let txs = rayon_spawn_async(move || { + txs.into_iter() + .map(|bytes| { + let tx = Transaction::read(&mut bytes.as_ref()) + .map_err(IncomingTxError::Parse)?; + + let tx = new_tx_verification_data(tx) + .map_err(|e| IncomingTxError::Consensus(e.into()))?; + + Ok(Arc::new(tx)) + }) + .collect::, IncomingTxError>>() + }) + .await?; + + let BlockChainContextResponse::Context(context) = blockchain_context_cache + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + tx_verifier_service + .ready() + .await? + .call(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await?; + + txpool_write_handle + .ready() + .await? + .call(TxpoolWriteRequest::AddTransaction { + tx, + state_stem: state.state_stem(), + }) + .await; + + todo!() + } + .boxed() + } +} + +async fn handle_incoming_txs( + txs: Vec, + state: TxState, + tx_being_handled_locally: TxBeingHandledLocally, + mut blockchain_context_cache: BlockChainContextService, + mut tx_verifier_service: ConcreteTxVerifierService, + mut txpool_write_handle: TxpoolWriteHandle, + mut txpool_read_handle: TxpoolReadHandle, + mut dandelion_pool_manager: DandelionPoolService, +) -> Result<(), IncomingTxError> { + let mut tx_blob_hashes = HashSet::new(); + + let txs = txs + .into_iter() + .map(|tx_blob| { + let tx_blob_hash = tx_blob_hash(tx_blob.as_ref()); + if !tx_blob_hashes.insert(tx_blob_hash) { + return Err(IncomingTxError::DuplicateTransaction); + } + + Ok((tx_blob_hash, tx_blob)) + }) + .collect::, _>>()?; + + let TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle + .ready() + .await? + .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes)) + .await? + else { + unreachable!() + }; + + let txs = rayon_spawn_async(move || { + txs.into_iter() + .filter_map(|(tx_blob_hash, tx_blob)| { + if tx_blob_hashes.contains(&tx_blob_hash) { + Some(tx_blob) + } else { + None + } + }) + .map(|bytes| { + let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?; + + let tx = new_tx_verification_data(tx) + .map_err(|e| IncomingTxError::Consensus(e.into()))?; + + Ok(Arc::new(tx)) + }) + .collect::, IncomingTxError>>() + }) + .await?; + + let BlockChainContextResponse::Context(context) = blockchain_context_cache + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + + tx_verifier_service + .ready() + .await? + .call(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await?; + + for tx in txs { + let incoming_tx = IncomingTxBuilder::new(Bytes::copy_from_slice(&tx.tx_blob), tx.tx_hash); + + let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle + .ready() + .await? + .call(TxpoolWriteRequest::AddTransaction { + tx, + state_stem: state.state_stem(), + }) + .await? + else { + unreachable!() + }; + + // TODO: track double spends to quickly ignore them from their blob hash. + if let Some(tx_hash) = double_spend { + continue; + }; + + // TODO: check blockchain for double spends to prevent a race condition. + + // TODO: fill this in properly. + let incoming_tx = incoming_tx + .with_routing_state(state.clone()) + .with_state_in_db(None) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await? + .call(incoming_tx) + .await?; + } +} diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/binaries/cuprated/src/txpool/manager.rs @@ -0,0 +1 @@ + diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs new file mode 100644 index 0000000..3973e7a --- /dev/null +++ b/binaries/cuprated/src/txpool/txs_being_handled.rs @@ -0,0 +1,48 @@ +use dashmap::DashSet; +use sha3::{Digest, Sha3_256}; +use std::sync::Arc; + +pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] { + let mut hasher = Sha3_256::new(); + hasher.update(tx_bytes); + hasher.finalize().into() +} +#[derive(Clone)] +pub struct TxsBeingHandled(Arc>); + +impl TxsBeingHandled { + pub fn local_tracker(&self) -> TxBeingHandledLocally { + TxBeingHandledLocally { + txs_being_handled: self.clone(), + txs: vec![], + } + } +} + +pub struct TxBeingHandledLocally { + txs_being_handled: TxsBeingHandled, + txs: Vec<[u8; 32]>, +} + +impl TxBeingHandledLocally { + pub fn try_add_tx(&mut self, tx_bytes: &[u8]) -> bool { + let mut hasher = Sha3_256::new(); + hasher.update(tx_bytes); + let tx_blob_hash = hasher.finalize().into(); + + if !self.txs_being_handled.0.insert(tx_blob_hash) { + return false; + } + + self.txs.push(tx_blob_hash); + true + } +} + +impl Drop for TxBeingHandledLocally { + fn drop(&mut self) { + for hash in &self.txs { + self.txs_being_handled.0.remove(hash); + } + } +} diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index 88702be..c04dcae 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -73,6 +73,12 @@ pub enum TxState { Local, } +impl TxState { + pub const fn state_stem(&self) -> bool { + matches!(self, Self::Local | Self::Stem { .. }) + } +} + /// A request to route a transaction. pub struct DandelionRouteReq { /// The transaction. diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs index fc97fc1..25dd242 100644 --- a/p2p/p2p/src/client_pool.rs +++ b/p2p/p2p/src/client_pool.rs @@ -18,7 +18,7 @@ use tracing::{Instrument, Span}; use cuprate_p2p_core::{ client::{Client, InternalPeerID}, handles::ConnectionHandle, - NetworkZone, + ConnectionDirection, NetworkZone, }; pub(crate) mod disconnect_monitor; @@ -165,6 +165,16 @@ impl ClientPool { sync_data.cumulative_difficulty() > cumulative_difficulty }) } + + pub fn outbound_client(&self) -> Option> { + let client = self + .clients + .iter() + .find(|element| element.value().info.direction == ConnectionDirection::Outbound)?; + let id = *client.key(); + + Some(self.clients.remove(&id).unwrap().1) + } } mod sealed { diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs index f75d615..3914f22 100644 --- a/storage/service/src/service/write.rs +++ b/storage/service/src/service/write.rs @@ -21,7 +21,7 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); /// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] /// will return an `async`hronous channel that can be `.await`ed upon /// to receive the corresponding response. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseWriteHandle { /// Sender channel to the database write thread-pool. /// diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 450b28d..22a09cb 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,6 +1,8 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. + +use std::collections::HashSet; use std::sync::Arc; use cuprate_types::TransactionVerificationData; @@ -9,11 +11,14 @@ use crate::types::TransactionHash; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. +#[derive(Clone)] pub enum TxpoolReadRequest { /// A request for the blob (raw bytes) of a transaction with the given hash. TxBlob(TransactionHash), /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), + + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse @@ -25,10 +30,13 @@ pub enum TxpoolReadResponse { TxBlob(Vec), /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), + + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest /// The transaction pool [`tower::Service`] write request type. +#[derive(Clone)] pub enum TxpoolWriteRequest { /// Add a transaction to the pool. /// From d2c7e49e8014e34c4971d69d1ad29c59f1818cb1 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Thu, 10 Oct 2024 21:14:22 +0100 Subject: [PATCH 02/12] add dandelion start function --- binaries/cuprated/src/txpool/dandelion.rs | 32 ++++++++++++++++--- .../cuprated/src/txpool/dandelion/tx_store.rs | 4 +-- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs index 47d1ca3..27f1f79 100644 --- a/binaries/cuprated/src/txpool/dandelion.rs +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -1,26 +1,50 @@ use bytes::Bytes; +use cuprate_dandelion_tower::pool::DandelionPoolService; use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter}; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; use cuprate_wire::NetworkAddress; mod diffuse_service; mod stem_service; mod tx_store; +#[derive(Clone)] struct DandelionTx(Bytes); type TxId = [u8; 32]; -pub fn start_dandelion_router( - clear_net: NetworkInterface, -) -> DandelionRouter< +type ConcreteDandelionRouter = DandelionRouter< stem_service::OutboundPeerStream, diffuse_service::DiffuseService, NetworkAddress, stem_service::StemPeerService, DandelionTx, -> { +>; + +pub fn start_dandelion_pool_manager( + router: ConcreteDandelionRouter, + txpool_read_handle: TxpoolReadHandle, + txpool_write_handle: TxpoolWriteHandle, +) -> DandelionPoolService { + cuprate_dandelion_tower::pool::start_dandelion_pool_manager( + 12, + router, + tx_store::TxStoreService { + txpool_read_handle, + txpool_write_handle, + }, + DandelionConfig { + time_between_hop: Default::default(), + epoch_duration: Default::default(), + fluff_probability: 0.0, + graph: Default::default(), + }, + ) +} + +pub fn dandelion_router(clear_net: NetworkInterface) -> ConcreteDandelionRouter { DandelionRouter::new( diffuse_service::DiffuseService { clear_net_broadcast_service: clear_net.broadcast_svc(), diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index 98de764..29f719e 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -11,8 +11,8 @@ use tower::util::Oneshot; use tower::{Service, ServiceExt}; pub struct TxStoreService { - txpool_read_handle: TxpoolReadHandle, - txpool_write_handle: TxpoolWriteHandle, + pub txpool_read_handle: TxpoolReadHandle, + pub txpool_write_handle: TxpoolWriteHandle, } impl Service> for TxStoreService { From b6d94cf78017109d55f670ddc198f925947da75d Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 13 Oct 2024 01:22:20 +0100 Subject: [PATCH 03/12] finish incoming tx handler --- binaries/cuprated/Cargo.toml | 2 +- binaries/cuprated/src/txpool.rs | 1 - binaries/cuprated/src/txpool/dandelion.rs | 31 +- .../src/txpool/dandelion/diffuse_service.rs | 21 +- .../src/txpool/dandelion/stem_service.rs | 41 +- .../cuprated/src/txpool/dandelion/tx_store.rs | 63 +++- binaries/cuprated/src/txpool/incoming_tx.rs | 357 ++++++++++-------- binaries/cuprated/src/txpool/manager.rs | 1 - .../cuprated/src/txpool/txs_being_handled.rs | 7 +- p2p/dandelion-tower/src/router.rs | 2 +- storage/service/src/service/write.rs | 10 +- storage/txpool/src/service/interface.rs | 27 +- storage/txpool/src/service/read.rs | 1 + storage/txpool/src/types.rs | 3 + 14 files changed, 328 insertions(+), 239 deletions(-) delete mode 100644 binaries/cuprated/src/txpool/manager.rs diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 62a1032..c8a4247 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -20,7 +20,7 @@ cuprate-levin = { path = "../../net/levin" } cuprate-wire = { path = "../../net/wire" } cuprate-p2p = { path = "../../p2p/p2p" } cuprate-p2p-core = { path = "../../p2p/p2p-core" } -cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } +cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower", features = ["txpool"] } cuprate-async-buffer = { path = "../../p2p/async-buffer" } cuprate-address-book = { path = "../../p2p/address-book" } cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] } diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index af5a420..7973102 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -4,5 +4,4 @@ mod dandelion; mod incoming_tx; -mod manager; mod txs_being_handled; diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs index 27f1f79..07fd479 100644 --- a/binaries/cuprated/src/txpool/dandelion.rs +++ b/binaries/cuprated/src/txpool/dandelion.rs @@ -1,6 +1,8 @@ +use std::time::Duration; + use bytes::Bytes; use cuprate_dandelion_tower::pool::DandelionPoolService; -use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter}; +use cuprate_dandelion_tower::{DandelionConfig, DandelionRouter, Graph}; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; @@ -11,10 +13,17 @@ mod stem_service; mod tx_store; #[derive(Clone)] -struct DandelionTx(Bytes); +pub struct DandelionTx(Bytes); type TxId = [u8; 32]; +const DANDELION_CONFIG: DandelionConfig = DandelionConfig { + time_between_hop: Duration::from_millis(175), + epoch_duration: Duration::from_secs(10 * 60), + fluff_probability: 0.12, + graph: Graph::FourRegular, +}; + type ConcreteDandelionRouter = DandelionRouter< stem_service::OutboundPeerStream, diffuse_service::DiffuseService, @@ -35,12 +44,7 @@ pub fn start_dandelion_pool_manager( txpool_read_handle, txpool_write_handle, }, - DandelionConfig { - time_between_hop: Default::default(), - epoch_duration: Default::default(), - fluff_probability: 0.0, - graph: Default::default(), - }, + DANDELION_CONFIG, ) } @@ -49,14 +53,7 @@ pub fn dandelion_router(clear_net: NetworkInterface) -> ConcreteDandel diffuse_service::DiffuseService { clear_net_broadcast_service: clear_net.broadcast_svc(), }, - stem_service::OutboundPeerStream { - clear_net: clear_net.clone(), - }, - DandelionConfig { - time_between_hop: Default::default(), - epoch_duration: Default::default(), - fluff_probability: 0.0, - graph: Default::default(), - }, + stem_service::OutboundPeerStream { clear_net }, + DANDELION_CONFIG, ) } diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs index 57b7d29..115799d 100644 --- a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs @@ -1,19 +1,26 @@ -use std::task::{Context, Poll}; +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; + +use futures::FutureExt; use tower::Service; -use crate::txpool::dandelion::DandelionTx; use cuprate_dandelion_tower::traits::DiffuseRequest; use cuprate_p2p::{BroadcastRequest, BroadcastSvc, NetworkInterface}; use cuprate_p2p_core::ClearNet; +use super::DandelionTx; + +/// The dandelion diffusion service. pub struct DiffuseService { pub clear_net_broadcast_service: BroadcastSvc, } impl Service> for DiffuseService { - type Response = BroadcastSvc::Response; + type Response = (); type Error = tower::BoxError; - type Future = BroadcastSvc::Future; + type Future = Ready>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.clear_net_broadcast_service @@ -22,11 +29,17 @@ impl Service> for DiffuseService { } fn call(&mut self, req: DiffuseRequest) -> Self::Future { + // TODO: Call `into_inner` when 1.82.0 stabilizes self.clear_net_broadcast_service .call(BroadcastRequest::Transaction { tx_bytes: req.0 .0, direction: None, received_from: None, }) + .now_or_never() + .unwrap() + .expect("Broadcast service is Infallible"); + + ready(Ok(())) } } diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs index 6970ebf..330c884 100644 --- a/binaries/cuprated/src/txpool/dandelion/stem_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -1,17 +1,23 @@ -use super::DandelionTx; -use bytes::Bytes; -use cuprate_dandelion_tower::traits::StemRequest; -use cuprate_dandelion_tower::OutboundPeer; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer}; use cuprate_p2p::NetworkInterface; -use cuprate_p2p_core::client::Client; -use cuprate_p2p_core::{ClearNet, NetworkZone, PeerRequest, ProtocolRequest}; -use cuprate_wire::protocol::NewTransactions; -use cuprate_wire::NetworkAddress; +use cuprate_p2p_core::{ + client::{Client, InternalPeerID}, + ClearNet, NetworkZone, PeerRequest, ProtocolRequest, +}; +use cuprate_wire::{protocol::NewTransactions, NetworkAddress}; + +use bytes::Bytes; use futures::Stream; -use std::pin::Pin; -use std::task::{Context, Poll}; use tower::Service; +use super::DandelionTx; + +/// The dandelion outbound peer stream. pub struct OutboundPeerStream { pub clear_net: NetworkInterface, } @@ -20,22 +26,29 @@ impl Stream for OutboundPeerStream { type Item = Result>, tower::BoxError>; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // TODO: make the outbound peer choice random. Poll::Ready(Some(Ok(self .clear_net .client_pool() .outbound_client() .map_or(OutboundPeer::Exhausted, |client| { - OutboundPeer::Peer(client.info.id.into(), StemPeerService(client)) + let addr = match client.info.id { + InternalPeerID::KnownAddr(addr) => addr, + InternalPeerID::Unknown(_) => panic!("Outbound peer had an unknown address"), + }; + + OutboundPeer::Peer(addr.into(), StemPeerService(client)) })))) } } -pub struct StemPeerService(Client); +/// The stem service, used to send stem txs. +pub struct StemPeerService(Client); impl Service> for StemPeerService { - type Response = (); + type Response = as Service>::Response; type Error = tower::BoxError; - type Future = Client::Future; + type Future = as Service>::Future; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index 29f719e..f13a646 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -1,15 +1,27 @@ -use crate::txpool::dandelion::{DandelionTx, TxId}; -use bytes::Bytes; -use cuprate_dandelion_tower::traits::{TxStoreRequest, TxStoreResponse}; -use cuprate_database::RuntimeError; -use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; -use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; -use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt}; -use std::task::{Context, Poll}; -use tower::util::Oneshot; -use tower::{Service, ServiceExt}; +use std::{ + f32::consts::E, + task::{Context, Poll}, +}; +use bytes::Bytes; +use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt}; +use tower::{util::Oneshot, Service, ServiceExt}; + +use cuprate_dandelion_tower::{ + traits::{TxStoreRequest, TxStoreResponse}, + State, +}; +use cuprate_database::RuntimeError; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + TxpoolReadHandle, TxpoolWriteHandle, +}; + +use super::{DandelionTx, TxId}; + +/// The dandelion tx-store service. +/// +/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides. pub struct TxStoreService { pub txpool_read_handle: TxpoolReadHandle, pub txpool_write_handle: TxpoolWriteHandle, @@ -31,17 +43,34 @@ impl Service> for TxStoreService { .clone() .oneshot(TxpoolReadRequest::TxBlob(tx_id)) .map(|res| match res { - Ok(TxpoolReadResponse::TxBlob(blob)) => Ok(TxStoreResponse::Transaction(Some( - (DandelionTx(Bytes::from(blob)), todo!()), - ))), + Ok(TxpoolReadResponse::TxBlob { + tx_blob, + state_stem, + }) => { + let state = if state_stem { + State::Stem + } else { + State::Fluff + }; + + Ok(TxStoreResponse::Transaction(Some(( + DandelionTx(Bytes::from(tx_blob)), + state, + )))) + } Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)), Err(e) => Err(e.into()), Ok(_) => unreachable!(), }) .boxed(), - TxStoreRequest::Promote(tx_id) => { - todo!() - } + TxStoreRequest::Promote(tx_id) => self + .txpool_write_handle + .oneshot(TxpoolWriteRequest::Promote(tx_id)) + .map(|res| match res { + Ok(_) | Err(RuntimeError::KeyNotFound) => TxStoreResponse::Ok, + Err(e) => Err(e.into()), + }) + .boxed(), } } } diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 37202b0..abac69d 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -1,58 +1,76 @@ -use std::collections::HashSet; -use std::future::ready; -use std::sync::Arc; -use std::task::{Context, Poll}; +use std::{ + collections::HashSet, + future::ready, + sync::Arc, + task::{Context, Poll}, +}; -use crate::blockchain::ConcreteTxVerifierService; -use crate::txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled}; use bytes::Bytes; -use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, - ExtendedConsensusError, TxVerifierService, VerifyTxRequest, VerifyTxResponse, -}; -use cuprate_dandelion_tower::pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}; -use cuprate_dandelion_tower::TxState; -use cuprate_helper::asynch::rayon_spawn_async; -use cuprate_txpool::service::interface::{ - TxpoolReadRequest, TxpoolWriteRequest, TxpoolWriteResponse, -}; -use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; -use cuprate_wire::NetworkAddress; use dashmap::DashSet; -use futures::future::BoxFuture; -use futures::FutureExt; +use futures::{future::BoxFuture, FutureExt}; use monero_serai::transaction::Transaction; use sha3::{Digest, Sha3_256}; use tower::{Service, ServiceExt}; +use cuprate_consensus::{ + transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, + BlockChainContextService, ExtendedConsensusError, TxVerifierService, VerifyTxRequest, + VerifyTxResponse, +}; +use cuprate_dandelion_tower::{ + pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, + TxState, +}; +use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, + TxpoolReadHandle, TxpoolWriteHandle, +}; +use cuprate_types::TransactionVerificationData; +use cuprate_wire::NetworkAddress; + +use crate::{ + blockchain::ConcreteTxVerifierService, + constants::PANIC_CRITICAL_SERVICE_ERROR, + signals::REORG_LOCK, + txpool::txs_being_handled::{tx_blob_hash, TxBeingHandledLocally, TxsBeingHandled}, +}; + +/// An error that can happen handling an incoming tx. pub enum IncomingTxError { Parse(std::io::Error), Consensus(ExtendedConsensusError), DuplicateTransaction, } -pub enum IncomingTxs { - Bytes { - txs: Vec, - state: TxState, - }, +/// Incoming transactions. +pub struct IncomingTxs { + pub txs: Vec, + pub state: TxState, } +/// The transaction type used for dandelion++. +#[derive(Clone)] struct DandelionTx(Bytes); +/// A transaction ID/hash. type TxId = [u8; 32]; +/// The service than handles incoming transaction pool transactions. +/// +/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes. pub struct IncomingTxHandler { - txs_being_added: Arc, - + /// A store of txs currently being handled in incoming tx requests. + txs_being_handled: TxsBeingHandled, + /// The blockchain context cache. blockchain_context_cache: BlockChainContextService, - + /// The dandelion txpool manager. dandelion_pool_manager: DandelionPoolService, + /// The transaction verifier service. tx_verifier_service: ConcreteTxVerifierService, - + /// The txpool write handle. txpool_write_handle: TxpoolWriteHandle, - + /// The txpool read handle. txpool_read_handle: TxpoolReadHandle, } @@ -66,70 +84,18 @@ impl Service for IncomingTxHandler { } fn call(&mut self, req: IncomingTxs) -> Self::Future { - let IncomingTxs::Bytes { mut txs, state } = req; + let IncomingTxs::Bytes { txs, state } = req; - let mut local_tracker = self.txs_being_added.local_tracker(); - - txs.retain(|bytes| local_tracker.try_add_tx(bytes.as_ref())); - - if txs.is_empty() { - return ready(Ok(())).boxed(); - } - - let mut blockchain_context_cache = self.blockchain_context_cache.clone(); - let mut tx_verifier_service = self.tx_verifier_service.clone(); - let mut txpool_write_handle = self.txpool_write_handle.clone(); - - async move { - let txs = rayon_spawn_async(move || { - txs.into_iter() - .map(|bytes| { - let tx = Transaction::read(&mut bytes.as_ref()) - .map_err(IncomingTxError::Parse)?; - - let tx = new_tx_verification_data(tx) - .map_err(|e| IncomingTxError::Consensus(e.into()))?; - - Ok(Arc::new(tx)) - }) - .collect::, IncomingTxError>>() - }) - .await?; - - let BlockChainContextResponse::Context(context) = blockchain_context_cache - .ready() - .await? - .call(BlockChainContextRequest::GetContext) - .await? - else { - unreachable!() - }; - - let context = context.unchecked_blockchain_context(); - - tx_verifier_service - .ready() - .await? - .call(VerifyTxRequest::Prepped { - txs: txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) - .await?; - - txpool_write_handle - .ready() - .await? - .call(TxpoolWriteRequest::AddTransaction { - tx, - state_stem: state.state_stem(), - }) - .await; - - todo!() - } + handle_incoming_txs( + txs, + state, + self.txs_being_handled.clone(), + self.blockchain_context_cache.clone(), + self.tx_verifier_service.clone(), + self.txpool_write_handle.clone(), + self.txpool_read_handle.clone(), + self.dandelion_pool_manager.clone(), + ) .boxed() } } @@ -137,38 +103,106 @@ impl Service for IncomingTxHandler { async fn handle_incoming_txs( txs: Vec, state: TxState, - tx_being_handled_locally: TxBeingHandledLocally, + txs_being_handled: TxsBeingHandled, mut blockchain_context_cache: BlockChainContextService, mut tx_verifier_service: ConcreteTxVerifierService, mut txpool_write_handle: TxpoolWriteHandle, mut txpool_read_handle: TxpoolReadHandle, mut dandelion_pool_manager: DandelionPoolService, ) -> Result<(), IncomingTxError> { - let mut tx_blob_hashes = HashSet::new(); + let reorg_guard = REORG_LOCK.read().await; - let txs = txs - .into_iter() - .map(|tx_blob| { - let tx_blob_hash = tx_blob_hash(tx_blob.as_ref()); - if !tx_blob_hashes.insert(tx_blob_hash) { - return Err(IncomingTxError::DuplicateTransaction); - } + let (txs, txs_being_handled_guard) = + prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; - Ok((tx_blob_hash, tx_blob)) - }) - .collect::, _>>()?; - - let TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle + let BlockChainContextResponse::Context(context) = blockchain_context_cache .ready() - .await? - .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes)) - .await? + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::GetContext) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) else { unreachable!() }; - let txs = rayon_spawn_async(move || { - txs.into_iter() + let context = context.unchecked_blockchain_context(); + + tx_verifier_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(VerifyTxRequest::Prepped { + txs: txs.clone(), + current_chain_height: context.chain_height, + top_hash: context.top_hash, + time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), + hf: context.current_hf, + }) + .await + .map_err(IncomingTxError::Consensus)?; + + for tx in txs { + handle_valid_tx( + tx, + state.clone(), + &mut txpool_write_handle, + &mut dandelion_pool_manager, + ) + .await + } + + Ok(()) +} + +/// Prepares the incoming transactions for verification. +/// +/// This will filter out all transactions already in the pool or txs already being handled in another request. +async fn prepare_incoming_txs( + tx_blobs: Vec, + txs_being_handled: TxsBeingHandled, + txpool_read_handle: &mut TxpoolReadHandle, +) -> Result<(Vec>, TxBeingHandledLocally), IncomingTxError> { + let mut tx_blob_hashes = HashSet::new(); + let mut txs_being_handled_loacally = txs_being_handled.local_tracker(); + + // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch. + let txs = tx_blobs + .into_iter() + .filter_map(|tx_blob| { + let tx_blob_hash = tx_blob_hash(tx_blob.as_ref()); + + // If a duplicate is in here the incoming tx batch contained the same tx twice. + if !tx_blob_hashes.insert(tx_blob_hash) { + return Some(Err(IncomingTxError::DuplicateTransaction)); + } + + // If a duplicate is here it is being handled in another batch. + if !txs_being_handled_loacally.try_add_tx(tx_blob_hash) { + return None; + } + + Some(Ok((tx_blob_hash, tx_blob))) + }) + .collect::, _>>()?; + + // Filter the txs already in the txpool out. + // This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue. + let TxpoolReadResponse::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes)) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + // Now prepare the txs for verification. + rayon_spawn_async(move || { + let txs = txs + .into_iter() .filter_map(|(tx_blob_hash, tx_blob)| { if tx_blob_hashes.contains(&tx_blob_hash) { Some(tx_blob) @@ -184,66 +218,55 @@ async fn handle_incoming_txs( Ok(Arc::new(tx)) }) - .collect::, IncomingTxError>>() - }) - .await?; + .collect::, IncomingTxError>>()?; - let BlockChainContextResponse::Context(context) = blockchain_context_cache + Ok((txs, txs_being_handled_loacally)) + }) + .await +} + +async fn handle_valid_tx( + tx: Arc, + state: TxState, + txpool_write_handle: &mut TxpoolWriteHandle, + dandelion_pool_manager: &mut DandelionPoolService, +) { + let incoming_tx = + IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash); + + let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle .ready() - .await? - .call(BlockChainContextRequest::GetContext) - .await? + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::AddTransaction { + tx, + state_stem: state.state_stem(), + }) + .await + .expect("TODO") else { unreachable!() }; - let context = context.unchecked_blockchain_context(); + // TODO: track double spends to quickly ignore them from their blob hash. + if let Some(tx_hash) = double_spend { + return; + }; - tx_verifier_service + // TODO: check blockchain for double spends to prevent a race condition. + + // TODO: fill this in properly. + let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(None) + .build() + .unwrap(); + + dandelion_pool_manager .ready() - .await? - .call(VerifyTxRequest::Prepped { - txs: txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) - .await?; - - for tx in txs { - let incoming_tx = IncomingTxBuilder::new(Bytes::copy_from_slice(&tx.tx_blob), tx.tx_hash); - - let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle - .ready() - .await? - .call(TxpoolWriteRequest::AddTransaction { - tx, - state_stem: state.state_stem(), - }) - .await? - else { - unreachable!() - }; - - // TODO: track double spends to quickly ignore them from their blob hash. - if let Some(tx_hash) = double_spend { - continue; - }; - - // TODO: check blockchain for double spends to prevent a race condition. - - // TODO: fill this in properly. - let incoming_tx = incoming_tx - .with_routing_state(state.clone()) - .with_state_in_db(None) - .build() - .unwrap(); - - dandelion_pool_manager - .ready() - .await? - .call(incoming_tx) - .await?; - } + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); } diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs deleted file mode 100644 index 8b13789..0000000 --- a/binaries/cuprated/src/txpool/manager.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs index 3973e7a..8b30821 100644 --- a/binaries/cuprated/src/txpool/txs_being_handled.rs +++ b/binaries/cuprated/src/txpool/txs_being_handled.rs @@ -7,6 +7,7 @@ pub fn tx_blob_hash(tx_bytes: &[u8]) -> [u8; 32] { hasher.update(tx_bytes); hasher.finalize().into() } + #[derive(Clone)] pub struct TxsBeingHandled(Arc>); @@ -25,11 +26,7 @@ pub struct TxBeingHandledLocally { } impl TxBeingHandledLocally { - pub fn try_add_tx(&mut self, tx_bytes: &[u8]) -> bool { - let mut hasher = Sha3_256::new(); - hasher.update(tx_bytes); - let tx_blob_hash = hasher.finalize().into(); - + pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool { if !self.txs_being_handled.0.insert(tx_blob_hash) { return false; } diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index c04dcae..899b123 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -74,7 +74,7 @@ pub enum TxState { } impl TxState { - pub const fn state_stem(&self) -> bool { + pub fn state_stem(&self) -> bool { matches!(self, Self::Local | Self::Stem { .. }) } } diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs index 3914f22..607c4aa 100644 --- a/storage/service/src/service/write.rs +++ b/storage/service/src/service/write.rs @@ -21,7 +21,7 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); /// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] /// will return an `async`hronous channel that can be `.await`ed upon /// to receive the corresponding response. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct DatabaseWriteHandle { /// Sender channel to the database write thread-pool. /// @@ -30,6 +30,14 @@ pub struct DatabaseWriteHandle { crossbeam::channel::Sender<(Req, oneshot::Sender>)>, } +impl Clone for DatabaseWriteHandle { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + } + } +} + impl DatabaseWriteHandle where Req: Send + 'static, diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 22a09cb..cfcd8da 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,13 +1,11 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. - -use std::collections::HashSet; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use cuprate_types::TransactionVerificationData; -use crate::types::TransactionHash; +use crate::types::{TransactionBlobHash, TransactionHash}; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. @@ -17,8 +15,10 @@ pub enum TxpoolReadRequest { TxBlob(TransactionHash), /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), - - FilterKnownTxBlobHashes(HashSet), + /// A request to filter (remove) all **known** transactions from the set. + /// + /// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob. + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse @@ -26,12 +26,14 @@ pub enum TxpoolReadRequest { #[expect(clippy::large_enum_variant)] pub enum TxpoolReadResponse { /// A response containing the raw bytes of a transaction. - // TODO: use bytes::Bytes. - TxBlob(Vec), + TxBlob { + tx_blob: Vec, + state_stem: bool, + }, /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), - - FilterKnownTxBlobHashes(HashSet), + /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. + FilterKnownTxBlobHashes(HashSet), } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest @@ -53,6 +55,11 @@ pub enum TxpoolWriteRequest { /// /// Returns [`TxpoolWriteResponse::Ok`]. RemoveTransaction(TransactionHash), + /// Promote a transaction from the stem pool to the fluff pool. + /// If the tx is already in the fluff pool this does nothing. + /// + /// Returns [`TxpoolWriteResponse::Ok`]. + Promote(TransactionHash), } //---------------------------------------------------------------------------------------------------- TxpoolWriteResponse diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index f006813..20a4bcd 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -58,6 +58,7 @@ fn map_request( match request { TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), + _ => todo!(), } } diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 4da2d0f..83d9e01 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -17,6 +17,9 @@ pub type KeyImage = [u8; 32]; /// A transaction hash. pub type TransactionHash = [u8; 32]; +/// A transaction blob hash. +pub type TransactionBlobHash = [u8; 32]; + bitflags::bitflags! { /// Flags representing the state of the transaction in the pool. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] From 4ba94f4bb7f58e30cb64d51baa9581b4985ce639 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 13 Oct 2024 01:50:48 +0100 Subject: [PATCH 04/12] Add tx blob hash table --- Cargo.lock | 1 + storage/txpool/Cargo.toml | 1 + storage/txpool/src/free.rs | 16 +++++++++++++++- storage/txpool/src/ops/tx_write.rs | 9 +++++++++ storage/txpool/src/service/interface.rs | 7 ++----- storage/txpool/src/tables.rs | 10 ++++++++-- 6 files changed, 36 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e597f8..6742abe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -910,6 +910,7 @@ dependencies = [ "monero-serai", "rayon", "serde", + "sha3", "tempfile", "thiserror", "tokio", diff --git a/storage/txpool/Cargo.toml b/storage/txpool/Cargo.toml index 70211d9..923060e 100644 --- a/storage/txpool/Cargo.toml +++ b/storage/txpool/Cargo.toml @@ -29,6 +29,7 @@ bytemuck = { workspace = true, features = ["must_cast", "derive" bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] } thiserror = { workspace = true } hex = { workspace = true } +sha3 = { workspace = true, features = ["std"] } tower = { workspace = true, optional = true } rayon = { workspace = true, optional = true } diff --git a/storage/txpool/src/free.rs b/storage/txpool/src/free.rs index d394002..e04e350 100644 --- a/storage/txpool/src/free.rs +++ b/storage/txpool/src/free.rs @@ -1,9 +1,11 @@ //! General free functions (related to the tx-pool database). //---------------------------------------------------------------------------------------------------- Import +use sha3::{Digest, Sha3_256}; + use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw}; -use crate::{config::Config, tables::OpenTables}; +use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash}; //---------------------------------------------------------------------------------------------------- Free functions /// Open the txpool database using the passed [`Config`]. @@ -60,3 +62,15 @@ pub fn open(config: Config) -> Result { Ok(env) } + +/// Calculate the transaction blob hash. +/// +/// This value is supposed to be quick to compute just based of the tx-blob without needing to parse the tx. +/// +/// The exact way the hash is calculated is not stable and is subject to change, as such it should not be exposed +/// as a way to interact with Cuprate externally. +pub fn transaction_blob_hash(tx_blob: &[u8]) -> TransactionBlobHash { + let mut hasher = Sha3_256::new(); + hasher.update(tx_blob); + hasher.finalize().into() +} diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs index 9885b9c..7b3b9a6 100644 --- a/storage/txpool/src/ops/tx_write.rs +++ b/storage/txpool/src/ops/tx_write.rs @@ -8,6 +8,7 @@ use cuprate_database::{DatabaseRw, RuntimeError, StorableVec}; use cuprate_types::TransactionVerificationData; use crate::{ + free::transaction_blob_hash, ops::{ key_images::{add_tx_key_images, remove_tx_key_images}, TxPoolWriteError, @@ -56,6 +57,10 @@ pub fn add_transaction( let kis_table = tables.spent_key_images_mut(); add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?; + // Add the blob hash to table 4. + let blob_hash = transaction_blob_hash(&tx.tx_blob); + tables.known_blob_hashes_mut().put(&blob_hash, &())?; + Ok(()) } @@ -79,5 +84,9 @@ pub fn remove_transaction( let kis_table = tables.spent_key_images_mut(); remove_tx_key_images(&tx.prefix().inputs, kis_table)?; + // Remove the blob hash from table 4. + let blob_hash = transaction_blob_hash(&tx_blob); + tables.known_blob_hashes_mut().delete(&blob_hash)?; + Ok(()) } diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index cfcd8da..a5d6634 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -16,7 +16,7 @@ pub enum TxpoolReadRequest { /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool. TxVerificationData(TransactionHash), /// A request to filter (remove) all **known** transactions from the set. - /// + /// /// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob. FilterKnownTxBlobHashes(HashSet), } @@ -26,10 +26,7 @@ pub enum TxpoolReadRequest { #[expect(clippy::large_enum_variant)] pub enum TxpoolReadResponse { /// A response containing the raw bytes of a transaction. - TxBlob { - tx_blob: Vec, - state_stem: bool, - }, + TxBlob { tx_blob: Vec, state_stem: bool }, /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs index dbb686a..2ab3262 100644 --- a/storage/txpool/src/tables.rs +++ b/storage/txpool/src/tables.rs @@ -16,7 +16,9 @@ //! accessing _all_ tables defined here at once. use cuprate_database::{define_tables, StorableVec}; -use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo}; +use crate::types::{ + KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo, +}; define_tables! { /// Serialized transaction blobs. @@ -41,5 +43,9 @@ define_tables! { /// /// This table contains the spent key images from all transactions in the pool. 3 => SpentKeyImages, - KeyImage => TransactionHash + KeyImage => TransactionHash, + + /// Transaction blob hashes that are in the pool. + 4 => KnownBlobHashes, + TransactionBlobHash => (), } From 2d5567e47bd532fda832c163cfb9438f11f3e9db Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 13 Oct 2024 02:20:45 +0100 Subject: [PATCH 05/12] Add missing txpool requests --- .../cuprated/src/txpool/dandelion/tx_store.rs | 3 +- binaries/cuprated/src/txpool/incoming_tx.rs | 6 +-- storage/txpool/src/lib.rs | 2 +- storage/txpool/src/service/read.rs | 47 ++++++++++++++++--- storage/txpool/src/service/write.rs | 25 ++++++++-- 5 files changed, 67 insertions(+), 16 deletions(-) diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs index f13a646..c8fda16 100644 --- a/binaries/cuprated/src/txpool/dandelion/tx_store.rs +++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs @@ -65,9 +65,10 @@ impl Service> for TxStoreService { .boxed(), TxStoreRequest::Promote(tx_id) => self .txpool_write_handle + .clone() .oneshot(TxpoolWriteRequest::Promote(tx_id)) .map(|res| match res { - Ok(_) | Err(RuntimeError::KeyNotFound) => TxStoreResponse::Ok, + Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok), Err(e) => Err(e.into()), }) .boxed(), diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index abac69d..addcad5 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -84,11 +84,9 @@ impl Service for IncomingTxHandler { } fn call(&mut self, req: IncomingTxs) -> Self::Future { - let IncomingTxs::Bytes { txs, state } = req; - handle_incoming_txs( - txs, - state, + req.txs, + req.state, self.txs_being_handled.clone(), self.blockchain_context_cache.clone(), self.tx_verifier_service.clone(), diff --git a/storage/txpool/src/lib.rs b/storage/txpool/src/lib.rs index 243dc4d..04d7bb9 100644 --- a/storage/txpool/src/lib.rs +++ b/storage/txpool/src/lib.rs @@ -13,7 +13,7 @@ pub mod tables; pub mod types; pub use config::Config; -pub use free::open; +pub use free::{open, transaction_blob_hash}; //re-exports pub use cuprate_database; diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 20a4bcd..4bcb32a 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use rayon::ThreadPool; @@ -11,8 +11,8 @@ use crate::{ interface::{TxpoolReadRequest, TxpoolReadResponse}, types::{ReadResponseResult, TxpoolReadHandle}, }, - tables::{OpenTables, TransactionBlobs}, - types::TransactionHash, + tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos}, + types::{TransactionBlobHash, TransactionHash, TxStateFlags}, }; // TODO: update the docs here @@ -58,7 +58,9 @@ fn map_request( match request { TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash), TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash), - _ => todo!(), + TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => { + filter_known_tx_blob_hashes(env, blob_hashes) + } } } @@ -86,10 +88,15 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult { let tx_ro = inner_env.tx_ro()?; let tx_blobs_table = inner_env.open_db_ro::(&tx_ro)?; + let tx_infos_table = inner_env.open_db_ro::(&tx_ro)?; - tx_blobs_table - .get(tx_hash) - .map(|blob| TxpoolReadResponse::TxBlob(blob.0)) + let tx_blob = tx_blobs_table.get(tx_hash)?.0; + let tx_info = tx_infos_table.get(tx_hash)?; + + Ok(TxpoolReadResponse::TxBlob { + tx_blob, + state_stem: tx_info.flags.contains(TxStateFlags::STATE_STEM), + }) } /// [`TxpoolReadRequest::TxVerificationData`]. @@ -102,3 +109,29 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData) } + +/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. +fn filter_known_tx_blob_hashes( + env: &ConcreteEnv, + mut blob_hashes: HashSet, +) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tx_blob_hashes = inner_env.open_db_ro::(&tx_ro)?; + + let mut err = None; + blob_hashes.retain(|blob_hash| match tx_blob_hashes.contains(blob_hash) { + Ok(exists) => !exists, + Err(e) => { + err.get_or_insert(e); + false + } + }); + + if let Some(e) = err { + return Err(e); + } + + Ok(TxpoolReadResponse::FilterKnownTxBlobHashes(blob_hashes)) +} diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs index 8a3b1bf..84916c8 100644 --- a/storage/txpool/src/service/write.rs +++ b/storage/txpool/src/service/write.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw}; +use cuprate_database::{ConcreteEnv, DatabaseRw, Env, EnvInner, RuntimeError, TxRw}; use cuprate_database_service::DatabaseWriteHandle; use cuprate_types::TransactionVerificationData; @@ -10,8 +10,8 @@ use crate::{ interface::{TxpoolWriteRequest, TxpoolWriteResponse}, types::TxpoolWriteHandle, }, - tables::OpenTables, - types::TransactionHash, + tables::{OpenTables, TransactionInfos}, + types::{TransactionHash, TxStateFlags}, }; //---------------------------------------------------------------------------------------------------- init_write_service @@ -31,6 +31,7 @@ fn handle_txpool_request( add_transaction(env, tx, *state_stem) } TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash), + TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash), } } @@ -101,3 +102,21 @@ fn remove_transaction( TxRw::commit(tx_rw)?; Ok(TxpoolWriteResponse::Ok) } + +/// [`TxpoolWriteRequest::Promote`] +fn promote( + env: &ConcreteEnv, + tx_hash: &TransactionHash, +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + let mut tx_infos = env_inner.open_db_rw::(&tx_rw)?; + + tx_infos.update(tx_hash, |mut info| { + info.flags.remove(TxStateFlags::STATE_STEM); + Some(info) + })?; + + Ok(TxpoolWriteResponse::Ok) +} From 93fb3c657c256e7bd67d28afffd70b955bd46f8d Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 13 Oct 2024 18:35:58 +0100 Subject: [PATCH 06/12] handle duplicate stem txs --- binaries/cuprated/src/txpool/incoming_tx.rs | 68 +++++++++++++++++++-- storage/txpool/src/ops.rs | 2 +- storage/txpool/src/ops/tx_read.rs | 12 ++++ storage/txpool/src/ops/tx_write.rs | 4 +- storage/txpool/src/service/interface.rs | 7 ++- storage/txpool/src/service/read.rs | 32 ++++++++-- storage/txpool/src/tables.rs | 2 +- 7 files changed, 112 insertions(+), 15 deletions(-) diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index addcad5..f0404b6 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -19,7 +19,7 @@ use cuprate_consensus::{ }; use cuprate_dandelion_tower::{ pool::{DandelionPoolService, IncomingTx, IncomingTxBuilder}, - TxState, + State, TxState, }; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_txpool::service::{ @@ -110,7 +110,7 @@ async fn handle_incoming_txs( ) -> Result<(), IncomingTxError> { let reorg_guard = REORG_LOCK.read().await; - let (txs, txs_being_handled_guard) = + let (txs, stem_pool_txs, txs_being_handled_guard) = prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; let BlockChainContextResponse::Context(context) = blockchain_context_cache @@ -150,6 +150,16 @@ async fn handle_incoming_txs( .await } + for stem_tx in stem_pool_txs { + rerelay_stem_tx( + &stem_tx, + state.clone(), + &mut txpool_read_handle, + &mut dandelion_pool_manager, + ) + .await; + } + Ok(()) } @@ -160,7 +170,14 @@ async fn prepare_incoming_txs( tx_blobs: Vec, txs_being_handled: TxsBeingHandled, txpool_read_handle: &mut TxpoolReadHandle, -) -> Result<(Vec>, TxBeingHandledLocally), IncomingTxError> { +) -> Result< + ( + Vec>, + Vec, + TxBeingHandledLocally, + ), + IncomingTxError, +> { let mut tx_blob_hashes = HashSet::new(); let mut txs_being_handled_loacally = txs_being_handled.local_tracker(); @@ -186,7 +203,10 @@ async fn prepare_incoming_txs( // Filter the txs already in the txpool out. // This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue. - let TxpoolReadResponse::FilterKnownTxBlobHashes(tx_blob_hashes) = txpool_read_handle + let TxpoolReadResponse::FilterKnownTxBlobHashes { + unknown_blob_hashes, + stem_pool_hashes, + } = txpool_read_handle .ready() .await .expect(PANIC_CRITICAL_SERVICE_ERROR) @@ -202,7 +222,7 @@ async fn prepare_incoming_txs( let txs = txs .into_iter() .filter_map(|(tx_blob_hash, tx_blob)| { - if tx_blob_hashes.contains(&tx_blob_hash) { + if unknown_blob_hashes.contains(&tx_blob_hash) { Some(tx_blob) } else { None @@ -218,7 +238,7 @@ async fn prepare_incoming_txs( }) .collect::, IncomingTxError>>()?; - Ok((txs, txs_being_handled_loacally)) + Ok((txs, stem_pool_hashes, txs_being_handled_loacally)) }) .await } @@ -268,3 +288,39 @@ async fn handle_valid_tx( .await .expect(PANIC_CRITICAL_SERVICE_ERROR); } + +async fn rerelay_stem_tx( + tx_hash: &TxId, + state: TxState, + txpool_read_handle: &mut TxpoolReadHandle, + dandelion_pool_manager: &mut DandelionPoolService, +) { + let TxpoolReadResponse::TxBlob { tx_blob, .. } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxBlob(*tx_hash)) + .await + .expect("TODO") + else { + unreachable!() + }; + + let incoming_tx = + IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash); + + // TODO: fill this in properly. + let incoming_tx = incoming_tx + .with_routing_state(state) + .with_state_in_db(Some(State::Stem)) + .build() + .unwrap(); + + dandelion_pool_manager + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(incoming_tx) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); +} diff --git a/storage/txpool/src/ops.rs b/storage/txpool/src/ops.rs index 50d9ea4..289a8bb 100644 --- a/storage/txpool/src/ops.rs +++ b/storage/txpool/src/ops.rs @@ -85,7 +85,7 @@ mod key_images; mod tx_read; mod tx_write; -pub use tx_read::get_transaction_verification_data; +pub use tx_read::{get_transaction_verification_data, in_stem_pool}; pub use tx_write::{add_transaction, remove_transaction}; /// An error that can occur on some tx-write ops. diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs index db89415..6b79cba 100644 --- a/storage/txpool/src/ops/tx_read.rs +++ b/storage/txpool/src/ops/tx_read.rs @@ -8,6 +8,8 @@ use monero_serai::transaction::Transaction; use cuprate_database::{DatabaseRo, RuntimeError}; use cuprate_types::{TransactionVerificationData, TxVersion}; +use crate::tables::TransactionInfos; +use crate::types::TxStateFlags; use crate::{tables::Tables, types::TransactionHash}; /// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool. @@ -34,3 +36,13 @@ pub fn get_transaction_verification_data( cached_verification_state: Mutex::new(cached_verification_state), }) } + +pub fn in_stem_pool( + tx_hash: &TransactionHash, + tx_infos: &impl DatabaseRo, +) -> Result { + Ok(tx_infos + .get(tx_hash)? + .flags + .contains(TxStateFlags::STATE_STEM)) +} diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs index 7b3b9a6..dc5ab46 100644 --- a/storage/txpool/src/ops/tx_write.rs +++ b/storage/txpool/src/ops/tx_write.rs @@ -59,7 +59,9 @@ pub fn add_transaction( // Add the blob hash to table 4. let blob_hash = transaction_blob_hash(&tx.tx_blob); - tables.known_blob_hashes_mut().put(&blob_hash, &())?; + tables + .known_blob_hashes_mut() + .put(&blob_hash, &tx.tx_hash)?; Ok(()) } diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index a5d6634..0e57039 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -30,7 +30,12 @@ pub enum TxpoolReadResponse { /// A response of [`TransactionVerificationData`]. TxVerificationData(TransactionVerificationData), /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`]. - FilterKnownTxBlobHashes(HashSet), + FilterKnownTxBlobHashes { + /// The blob hashes that are unknown. + unknown_blob_hashes: HashSet, + /// The tx hashes of the blob hashes that were known but were in the stem pool. + stem_pool_hashes: Vec, + }, } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 4bcb32a..9e92a0d 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -2,9 +2,10 @@ use std::{collections::HashSet, sync::Arc}; use rayon::ThreadPool; -use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner}; +use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; +use crate::ops::in_stem_pool; use crate::{ ops::get_transaction_verification_data, service::{ @@ -119,12 +120,30 @@ fn filter_known_tx_blob_hashes( let tx_ro = inner_env.tx_ro()?; let tx_blob_hashes = inner_env.open_db_ro::(&tx_ro)?; + let tx_infos = inner_env.open_db_ro::(&tx_ro)?; + + let mut stem_pool_hashes = Vec::new(); + + // A closure that returns if a tx with a certain blob hash is unknown. + // This also fills in `stem_tx_hashes`. + let mut tx_unknown = |blob_hash| -> Result { + match tx_blob_hashes.get(&blob_hash) { + Ok(tx_hash) => { + if in_stem_pool(&tx_hash, &tx_infos)? { + stem_pool_hashes.push(tx_hash); + } + Ok(false) + } + Err(RuntimeError::KeyNotFound) => Ok(true), + Err(e) => Err(e), + } + }; let mut err = None; - blob_hashes.retain(|blob_hash| match tx_blob_hashes.contains(blob_hash) { - Ok(exists) => !exists, + blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) { + Ok(res) => res, Err(e) => { - err.get_or_insert(e); + err = Some(e); false } }); @@ -133,5 +152,8 @@ fn filter_known_tx_blob_hashes( return Err(e); } - Ok(TxpoolReadResponse::FilterKnownTxBlobHashes(blob_hashes)) + Ok(TxpoolReadResponse::FilterKnownTxBlobHashes { + unknown_blob_hashes: blob_hashes, + stem_pool_hashes, + }) } diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs index 2ab3262..1f2d449 100644 --- a/storage/txpool/src/tables.rs +++ b/storage/txpool/src/tables.rs @@ -47,5 +47,5 @@ define_tables! { /// Transaction blob hashes that are in the pool. 4 => KnownBlobHashes, - TransactionBlobHash => (), + TransactionBlobHash => TransactionHash, } From 5ab5b062fdd3cbdcfa5c065d6b45d31998d4bd23 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 13 Oct 2024 22:43:53 +0100 Subject: [PATCH 07/12] check txpool on incoming block --- binaries/cuprated/src/blockchain/interface.rs | 57 ++++++++++++------- storage/txpool/src/service/interface.rs | 10 +++- storage/txpool/src/service/read.rs | 36 ++++++++++-- storage/txpool/src/types.rs | 1 - 4 files changed, 77 insertions(+), 27 deletions(-) diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 985e60d..7f7dfd3 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -15,6 +15,8 @@ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_helper::cast::usize_to_u64; +use cuprate_txpool::service::interface::{TxpoolReadRequest, TxpoolReadResponse}; +use cuprate_txpool::service::TxpoolReadHandle; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, Chain, @@ -38,7 +40,7 @@ pub enum IncomingBlockError { /// /// The inner values are the block hash and the indexes of the missing txs in the block. #[error("Unknown transactions in block.")] - UnknownTransactions([u8; 32], Vec), + UnknownTransactions([u8; 32], Vec), /// We are missing the block's parent. #[error("The block has an unknown parent.")] Orphan, @@ -59,8 +61,9 @@ pub enum IncomingBlockError { /// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, - given_txs: Vec, + mut given_txs: HashMap<[u8; 32], Transaction>, blockchain_read_handle: &mut BlockchainReadHandle, + txpool_read_handle: &mut TxpoolReadHandle, ) -> Result { /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. /// @@ -72,7 +75,12 @@ pub async fn handle_incoming_block( /// which are also more expensive than `Mutex`s. static BLOCKS_BEING_HANDLED: LazyLock>> = LazyLock::new(|| Mutex::new(HashSet::new())); - // FIXME: we should look in the tx-pool for txs when that is ready. + + if given_txs.len() > block.transactions.len() { + return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!( + "Too many transactions given for block" + ))); + } if !block_exists(block.header.previous, blockchain_read_handle) .await @@ -90,23 +98,32 @@ pub async fn handle_incoming_block( return Ok(IncomingBlockOk::AlreadyHave); } - // TODO: remove this when we have a working tx-pool. - if given_txs.len() != block.transactions.len() { - return Err(IncomingBlockError::UnknownTransactions( - block_hash, - (0..usize_to_u64(block.transactions.len())).collect(), - )); - } + let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone())) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; - // TODO: check we actually got given the right txs. - let prepped_txs = given_txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok((tx.tx_hash, tx)) - }) - .collect::>() - .map_err(IncomingBlockError::InvalidBlock)?; + if !missing.is_empty() { + let needed_hashes = missing.iter().map(|index| block.transactions[*index]); + + for needed_hash in needed_hashes { + let Some(tx) = given_txs.remove(&needed_hash) else { + return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); + }; + + txs.insert( + needed_hash, + new_tx_verification_data(tx) + .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?, + ); + } + } let Some(incoming_block_tx) = COMMAND_TX.get() else { // We could still be starting up the blockchain manager. @@ -126,7 +143,7 @@ pub async fn handle_incoming_block( incoming_block_tx .send(BlockchainManagerCommand::AddBlock { block, - prepped_txs, + prepped_txs: txs, response_tx, }) .await diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 0e57039..8e88065 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,9 +1,9 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. -use std::{collections::HashSet, sync::Arc}; - use cuprate_types::TransactionVerificationData; +use std::collections::HashMap; +use std::{collections::HashSet, sync::Arc}; use crate::types::{TransactionBlobHash, TransactionHash}; @@ -19,6 +19,8 @@ pub enum TxpoolReadRequest { /// /// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob. FilterKnownTxBlobHashes(HashSet), + /// A request to pull some transactions for an incoming block. + TxsForBlock(Vec), } //---------------------------------------------------------------------------------------------------- TxpoolReadResponse @@ -36,6 +38,10 @@ pub enum TxpoolReadResponse { /// The tx hashes of the blob hashes that were known but were in the stem pool. stem_pool_hashes: Vec, }, + TxsForBlock { + txs: HashMap<[u8; 32], TransactionVerificationData>, + missing: Vec, + }, } //---------------------------------------------------------------------------------------------------- TxpoolWriteRequest diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 9e92a0d..47c1d65 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -1,13 +1,15 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use rayon::ThreadPool; use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads}; -use crate::ops::in_stem_pool; use crate::{ - ops::get_transaction_verification_data, + ops::{get_transaction_verification_data, in_stem_pool}, service::{ interface::{TxpoolReadRequest, TxpoolReadResponse}, types::{ReadResponseResult, TxpoolReadHandle}, @@ -51,7 +53,6 @@ fn init_read_service_with_pool(env: Arc, pool: Arc) -> /// 1. `Request` is mapped to a handler function /// 2. Handler function is called /// 3. [`TxpoolReadResponse`] is returned -#[expect(clippy::needless_pass_by_value)] fn map_request( env: &ConcreteEnv, // Access to the database request: TxpoolReadRequest, // The request we must fulfill @@ -62,6 +63,7 @@ fn map_request( TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => { filter_known_tx_blob_hashes(env, blob_hashes) } + TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed), } } @@ -157,3 +159,29 @@ fn filter_known_tx_blob_hashes( stem_pool_hashes, }) } + +/// [`TxpoolReadRequest::TxsForBlock`]. +fn txs_for_block(env: &ConcreteEnv, txs: Vec) -> ReadResponseResult { + let inner_env = env.env_inner(); + let tx_ro = inner_env.tx_ro()?; + + let tables = inner_env.open_tables(&tx_ro)?; + + let mut missing_tx_indexes = Vec::with_capacity(txs.len()); + let mut txs_verification_data = HashMap::with_capacity(txs.len()); + + for (i, tx_hash) in txs.into_iter().enumerate() { + match get_transaction_verification_data(&tx_hash, &tables) { + Ok(tx) => { + txs_verification_data.insert(tx_hash, tx); + } + Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i), + Err(e) => return Err(e), + } + } + + Ok(TxpoolReadResponse::TxsForBlock { + txs: txs_verification_data, + missing: missing_tx_indexes, + }) +} diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 83d9e01..2acb819 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -6,7 +6,6 @@ //! //! use bytemuck::{Pod, Zeroable}; - use monero_serai::transaction::Timelock; use cuprate_types::{CachedVerificationState, HardFork}; From 74a07cb8baf565defcaf1c62679a711b65a3e9ba Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 14 Oct 2024 01:00:02 +0100 Subject: [PATCH 08/12] add request to remove tx in new blocks from the pool --- storage/txpool/src/service/interface.rs | 7 ++- storage/txpool/src/service/write.rs | 65 +++++++++++++++++++++++-- storage/txpool/src/tables.rs | 6 ++- storage/txpool/src/types.rs | 6 +++ 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 8e88065..7dc52f5 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -5,7 +5,7 @@ use cuprate_types::TransactionVerificationData; use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; -use crate::types::{TransactionBlobHash, TransactionHash}; +use crate::types::{KeyImage, TransactionBlobHash, TransactionHash}; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. @@ -68,6 +68,11 @@ pub enum TxpoolWriteRequest { /// /// Returns [`TxpoolWriteResponse::Ok`]. Promote(TransactionHash), + + NewBlock { + blockchain_height: usize, + spent_key_images: Vec, + }, } //---------------------------------------------------------------------------------------------------- TxpoolWriteResponse diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs index 84916c8..2a4a5bb 100644 --- a/storage/txpool/src/service/write.rs +++ b/storage/txpool/src/service/write.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use cuprate_database::{ConcreteEnv, DatabaseRw, Env, EnvInner, RuntimeError, TxRw}; +use cuprate_database::{ConcreteEnv, DatabaseRo, DatabaseRw, Env, EnvInner, RuntimeError, TxRw}; use cuprate_database_service::DatabaseWriteHandle; use cuprate_types::TransactionVerificationData; @@ -10,8 +10,8 @@ use crate::{ interface::{TxpoolWriteRequest, TxpoolWriteResponse}, types::TxpoolWriteHandle, }, - tables::{OpenTables, TransactionInfos}, - types::{TransactionHash, TxStateFlags}, + tables::{OpenTables, Tables, TablesMut, TransactionInfos}, + types::{KeyImage, PoolInfo, TransactionHash, TxStateFlags}, }; //---------------------------------------------------------------------------------------------------- init_write_service @@ -32,6 +32,10 @@ fn handle_txpool_request( } TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash), TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash), + TxpoolWriteRequest::NewBlock { + blockchain_height, + spent_key_images, + } => new_block(env, *blockchain_height, spent_key_images), } } @@ -118,5 +122,60 @@ fn promote( Some(info) })?; + drop(tx_infos); + + TxRw::commit(tx_rw)?; + Ok(TxpoolWriteResponse::Ok) +} + +fn new_block( + env: &ConcreteEnv, + blockchain_height: usize, + spent_key_images: &[KeyImage], +) -> Result { + let env_inner = env.env_inner(); + let tx_rw = env_inner.tx_rw()?; + + // FIXME: use try blocks once stable. + let result = || { + let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?; + + for key_image in spent_key_images { + match tables_mut + .spent_key_images() + .get(key_image) + .and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut)) + { + Ok(()) | Err(RuntimeError::KeyNotFound) => (), + Err(e) => return Err(e), + } + } + + let res = tables_mut.pool_stats_mut().update(&(), |mut info| { + info.last_known_blockchain_height = blockchain_height; + + Some(info) + }); + + match res { + Ok(()) => (), + Err(RuntimeError::KeyNotFound) => tables_mut.pool_stats_mut().put( + &(), + &PoolInfo { + last_known_blockchain_height: blockchain_height, + }, + )?, + Err(e) => return Err(e), + } + + Ok(()) + }; + + if let Err(e) = result() { + TxRw::abort(tx_rw)?; + return Err(e); + } + + TxRw::commit(tx_rw)?; Ok(TxpoolWriteResponse::Ok) } diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs index 1f2d449..d1d4468 100644 --- a/storage/txpool/src/tables.rs +++ b/storage/txpool/src/tables.rs @@ -17,7 +17,8 @@ use cuprate_database::{define_tables, StorableVec}; use crate::types::{ - KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo, + KeyImage, PoolInfo, RawCachedVerificationState, TransactionBlobHash, TransactionHash, + TransactionInfo, }; define_tables! { @@ -48,4 +49,7 @@ define_tables! { /// Transaction blob hashes that are in the pool. 4 => KnownBlobHashes, TransactionBlobHash => TransactionHash, + + 5 => PoolStats, + () => PoolInfo, } diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 2acb819..f1c1d0f 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -19,6 +19,12 @@ pub type TransactionHash = [u8; 32]; /// A transaction blob hash. pub type TransactionBlobHash = [u8; 32]; +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] +#[repr(C)] +pub struct PoolInfo { + pub last_known_blockchain_height: usize, +} + bitflags::bitflags! { /// Flags representing the state of the transaction in the pool. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] From fb29f3237560c54430e2927e55bc9b8ef75305b7 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 14 Oct 2024 18:15:43 +0100 Subject: [PATCH 09/12] tell the txpool about incoming blocks --- binaries/cuprated/src/blockchain/manager.rs | 5 ++++ .../src/blockchain/manager/handler.rs | 26 +++++++++++++++++++ binaries/cuprated/src/txpool/incoming_tx.rs | 3 +-- storage/txpool/src/service/interface.rs | 14 +++++++--- 4 files changed, 43 insertions(+), 5 deletions(-) diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 568ed57..13f7871 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -36,6 +36,7 @@ mod commands; mod handler; pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; +use cuprate_txpool::service::TxpoolWriteHandle; /// Initialize the blockchain manager. /// @@ -45,6 +46,7 @@ pub async fn init_blockchain_manager( clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, + txpool_write_handle: TxpoolWriteHandle, mut blockchain_context_service: BlockChainContextService, block_verifier_service: ConcreteBlockVerifierService, block_downloader_config: BlockDownloaderConfig, @@ -79,6 +81,7 @@ pub async fn init_blockchain_manager( let manager = BlockchainManager { blockchain_write_handle, blockchain_read_handle, + txpool_write_handle, blockchain_context_service, cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), block_verifier_service, @@ -101,6 +104,8 @@ pub struct BlockchainManager { blockchain_write_handle: BlockchainWriteHandle, /// A [`BlockchainReadHandle`]. blockchain_read_handle: BlockchainReadHandle, + /// A [`TxpoolWriteHandle`]. + txpool_write_handle: TxpoolWriteHandle, // TODO: Improve the API of the cache service. // TODO: rename the cache service -> `BlockchainContextService`. /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 303e2e4..b341ca6 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -1,6 +1,7 @@ //! The blockchain manager handler functions. use bytes::Bytes; use futures::{TryFutureExt, TryStreamExt}; +use monero_serai::transaction::Input; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; use std::ops::ControlFlow; @@ -17,6 +18,7 @@ use cuprate_consensus::{ }; use cuprate_helper::cast::usize_to_u64; use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; +use cuprate_txpool::service::interface::TxpoolWriteRequest; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, @@ -434,6 +436,19 @@ impl super::BlockchainManager { &mut self, verified_block: VerifiedBlockInformation, ) { + // FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate. + let spent_key_images = verified_block + .txs + .iter() + .flat_map(|tx| { + tx.tx.prefix().inputs.iter().map(|input| match input { + Input::ToKey { key_image, .. } => key_image.compress().0, + Input::Gen(_) => unreachable!(), + }) + }) + .collect::>(); + let blockchain_height = verified_block.height + 1; + self.blockchain_context_service .ready() .await @@ -472,6 +487,17 @@ impl super::BlockchainManager { }; self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); + + self.txpool_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolWriteRequest::NewBlock { + spent_key_images, + blockchain_height, + }) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); } } diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index f0404b6..14e98e0 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -271,9 +271,8 @@ async fn handle_valid_tx( return; }; - // TODO: check blockchain for double spends to prevent a race condition. + // TODO: There is a race condition possible if a tx and block come in at the same time . - // TODO: fill this in properly. let incoming_tx = incoming_tx .with_routing_state(state) .with_state_in_db(None) diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 7dc52f5..8a527ab 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,9 +1,12 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + use cuprate_types::TransactionVerificationData; -use std::collections::HashMap; -use std::{collections::HashSet, sync::Arc}; use crate::types::{KeyImage, TransactionBlobHash, TransactionHash}; @@ -38,8 +41,11 @@ pub enum TxpoolReadResponse { /// The tx hashes of the blob hashes that were known but were in the stem pool. stem_pool_hashes: Vec, }, + /// The response for [`TxpoolReadRequest::TxsForBlock`]. TxsForBlock { + /// The txs we had in the txpool. txs: HashMap<[u8; 32], TransactionVerificationData>, + /// The indexes of the missing txs. missing: Vec, }, } @@ -68,9 +74,11 @@ pub enum TxpoolWriteRequest { /// /// Returns [`TxpoolWriteResponse::Ok`]. Promote(TransactionHash), - + /// Tell the tx-pool about a new block. NewBlock { + /// The new blockchain height. blockchain_height: usize, + /// The spent key images in the new block. spent_key_images: Vec, }, } From b8e96cffcb0b852fccf676f3752154ee0e8f95e7 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 14 Oct 2024 19:25:00 +0100 Subject: [PATCH 10/12] fix merge --- binaries/cuprated/src/txpool/incoming_tx.rs | 5 +++-- p2p/dandelion-tower/src/router.rs | 2 +- storage/txpool/src/service/interface.rs | 5 ++++- storage/txpool/src/service/read.rs | 1 - storage/txpool/src/types.rs | 1 + 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 14e98e0..a486738 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -98,6 +98,7 @@ impl Service for IncomingTxHandler { } } +#[expect(clippy::too_many_arguments)] async fn handle_incoming_txs( txs: Vec, state: TxState, @@ -117,7 +118,7 @@ async fn handle_incoming_txs( .ready() .await .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::GetContext) + .call(BlockChainContextRequest::Context) .await .expect(PANIC_CRITICAL_SERVICE_ERROR) else { @@ -147,7 +148,7 @@ async fn handle_incoming_txs( &mut txpool_write_handle, &mut dandelion_pool_manager, ) - .await + .await; } for stem_tx in stem_pool_txs { diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index 899b123..c04dcae 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -74,7 +74,7 @@ pub enum TxState { } impl TxState { - pub fn state_stem(&self) -> bool { + pub const fn state_stem(&self) -> bool { matches!(self, Self::Local | Self::Stem { .. }) } } diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index e1eb050..0aa9049 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -8,7 +8,10 @@ use std::{ use cuprate_types::TransactionVerificationData; -use crate::{tx::TxEntry, types::{KeyImage, TransactionBlobHash, TransactionHash}}; +use crate::{ + tx::TxEntry, + types::{KeyImage, TransactionBlobHash, TransactionHash}, +}; //---------------------------------------------------------------------------------------------------- TxpoolReadRequest /// The transaction pool [`tower::Service`] read request type. diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 41ee019..bdeeb74 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -4,7 +4,6 @@ clippy::unnecessary_wraps, reason = "TODO: finish implementing the signatures from " )] - use std::{ collections::{HashMap, HashSet}, sync::Arc, diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index f1c1d0f..f507bba 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -19,6 +19,7 @@ pub type TransactionHash = [u8; 32]; /// A transaction blob hash. pub type TransactionBlobHash = [u8; 32]; +/// Information on the tx-pool. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] #[repr(C)] pub struct PoolInfo { From bfb4cbf831205dde14b87e5da3b72353dbb0a371 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 14 Oct 2024 19:31:17 +0100 Subject: [PATCH 11/12] typos --- binaries/cuprated/src/txpool/incoming_tx.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index a486738..1ade33d 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -180,7 +180,7 @@ async fn prepare_incoming_txs( IncomingTxError, > { let mut tx_blob_hashes = HashSet::new(); - let mut txs_being_handled_loacally = txs_being_handled.local_tracker(); + let mut txs_being_handled_locally = txs_being_handled.local_tracker(); // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch. let txs = tx_blobs @@ -194,7 +194,7 @@ async fn prepare_incoming_txs( } // If a duplicate is here it is being handled in another batch. - if !txs_being_handled_loacally.try_add_tx(tx_blob_hash) { + if !txs_being_handled_locally.try_add_tx(tx_blob_hash) { return None; } @@ -239,7 +239,7 @@ async fn prepare_incoming_txs( }) .collect::, IncomingTxError>>()?; - Ok((txs, stem_pool_hashes, txs_being_handled_loacally)) + Ok((txs, stem_pool_hashes, txs_being_handled_locally)) }) .await } From 6c1f871d81f848e9694444755f45902929aee227 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Mon, 14 Oct 2024 20:24:12 +0100 Subject: [PATCH 12/12] remove blockchain height from txpool --- .../src/blockchain/manager/handler.rs | 6 +---- storage/txpool/src/service/interface.rs | 2 -- storage/txpool/src/service/write.rs | 27 +++---------------- storage/txpool/src/tables.rs | 6 +---- storage/txpool/src/types.rs | 7 ----- 5 files changed, 5 insertions(+), 43 deletions(-) diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index ef537fc..c3d256b 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -447,7 +447,6 @@ impl super::BlockchainManager { }) }) .collect::>(); - let blockchain_height = verified_block.height + 1; self.blockchain_context_service .ready() @@ -492,10 +491,7 @@ impl super::BlockchainManager { .ready() .await .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolWriteRequest::NewBlock { - spent_key_images, - blockchain_height, - }) + .call(TxpoolWriteRequest::NewBlock { spent_key_images }) .await .expect(PANIC_CRITICAL_SERVICE_ERROR); } diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index 0aa9049..a400a21 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -106,8 +106,6 @@ pub enum TxpoolWriteRequest { /// Tell the tx-pool about a new block. NewBlock { - /// The new blockchain height. - blockchain_height: usize, /// The spent key images in the new block. spent_key_images: Vec, }, diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs index 2a4a5bb..9d1a598 100644 --- a/storage/txpool/src/service/write.rs +++ b/storage/txpool/src/service/write.rs @@ -10,8 +10,8 @@ use crate::{ interface::{TxpoolWriteRequest, TxpoolWriteResponse}, types::TxpoolWriteHandle, }, - tables::{OpenTables, Tables, TablesMut, TransactionInfos}, - types::{KeyImage, PoolInfo, TransactionHash, TxStateFlags}, + tables::{OpenTables, Tables, TransactionInfos}, + types::{KeyImage, TransactionHash, TxStateFlags}, }; //---------------------------------------------------------------------------------------------------- init_write_service @@ -32,10 +32,7 @@ fn handle_txpool_request( } TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash), TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash), - TxpoolWriteRequest::NewBlock { - blockchain_height, - spent_key_images, - } => new_block(env, *blockchain_height, spent_key_images), + TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images), } } @@ -130,7 +127,6 @@ fn promote( fn new_block( env: &ConcreteEnv, - blockchain_height: usize, spent_key_images: &[KeyImage], ) -> Result { let env_inner = env.env_inner(); @@ -151,23 +147,6 @@ fn new_block( } } - let res = tables_mut.pool_stats_mut().update(&(), |mut info| { - info.last_known_blockchain_height = blockchain_height; - - Some(info) - }); - - match res { - Ok(()) => (), - Err(RuntimeError::KeyNotFound) => tables_mut.pool_stats_mut().put( - &(), - &PoolInfo { - last_known_blockchain_height: blockchain_height, - }, - )?, - Err(e) => return Err(e), - } - Ok(()) }; diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs index d1d4468..1f2d449 100644 --- a/storage/txpool/src/tables.rs +++ b/storage/txpool/src/tables.rs @@ -17,8 +17,7 @@ use cuprate_database::{define_tables, StorableVec}; use crate::types::{ - KeyImage, PoolInfo, RawCachedVerificationState, TransactionBlobHash, TransactionHash, - TransactionInfo, + KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo, }; define_tables! { @@ -49,7 +48,4 @@ define_tables! { /// Transaction blob hashes that are in the pool. 4 => KnownBlobHashes, TransactionBlobHash => TransactionHash, - - 5 => PoolStats, - () => PoolInfo, } diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index f507bba..2acb819 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -19,13 +19,6 @@ pub type TransactionHash = [u8; 32]; /// A transaction blob hash. pub type TransactionBlobHash = [u8; 32]; -/// Information on the tx-pool. -#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)] -#[repr(C)] -pub struct PoolInfo { - pub last_known_blockchain_height: usize, -} - bitflags::bitflags! { /// Flags representing the state of the transaction in the pool. #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]