mirror of
https://github.com/hinto-janai/cuprate.git
synced 2025-01-10 21:04:59 +00:00
Compare commits
No commits in common. "aecbdf5728e0f7c5173e83115cf385afe84d7c94" and "5fd4718103d690d5c474fbd9e8d756f0de787f1b" have entirely different histories.
aecbdf5728
...
5fd4718103
34 changed files with 90 additions and 1143 deletions
32
Cargo.lock
generated
32
Cargo.lock
generated
|
@ -56,18 +56,6 @@ version = "1.0.89"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
|
||||
|
||||
[[package]]
|
||||
name = "arrayref"
|
||||
version = "0.3.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.5"
|
||||
|
@ -250,19 +238,6 @@ dependencies = [
|
|||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "blake3"
|
||||
version = "1.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"arrayvec",
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"constant_time_eq",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "block-buffer"
|
||||
version = "0.10.4"
|
||||
|
@ -428,12 +403,6 @@ dependencies = [
|
|||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "constant_time_eq"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.9.4"
|
||||
|
@ -951,7 +920,6 @@ name = "cuprate-txpool"
|
|||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
"blake3",
|
||||
"bytemuck",
|
||||
"cuprate-database",
|
||||
"cuprate-database-service",
|
||||
|
|
|
@ -82,7 +82,6 @@ cuprate-rpc-interface = { path = "rpc/interface" ,default-feature
|
|||
anyhow = { version = "1.0.89", default-features = false }
|
||||
async-trait = { version = "0.1.82", default-features = false }
|
||||
bitflags = { version = "2.6.0", default-features = false }
|
||||
blake3 = { version = "1", default-features = false }
|
||||
borsh = { version = "1.5.1", default-features = false }
|
||||
bytemuck = { version = "1.18.0", default-features = false }
|
||||
bytes = { version = "1.7.2", default-features = false }
|
||||
|
|
|
@ -22,7 +22,7 @@ cuprate-levin = { workspace = true }
|
|||
cuprate-wire = { workspace = true }
|
||||
cuprate-p2p = { workspace = true }
|
||||
cuprate-p2p-core = { workspace = true }
|
||||
cuprate-dandelion-tower = { workspace = true, features = ["txpool"] }
|
||||
cuprate-dandelion-tower = { workspace = true }
|
||||
cuprate-async-buffer = { workspace = true }
|
||||
cuprate-address-book = { workspace = true }
|
||||
cuprate-blockchain = { workspace = true, features = ["service"] }
|
||||
|
|
|
@ -25,7 +25,7 @@ mod manager;
|
|||
mod syncer;
|
||||
mod types;
|
||||
|
||||
pub use types::{
|
||||
use types::{
|
||||
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
|
||||
};
|
||||
|
||||
|
|
|
@ -8,16 +8,17 @@ use std::{
|
|||
};
|
||||
|
||||
use monero_serai::{block::Block, transaction::Transaction};
|
||||
use rayon::prelude::*;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||
use cuprate_consensus::transactions::new_tx_verification_data;
|
||||
use cuprate_txpool::service::{
|
||||
interface::{TxpoolReadRequest, TxpoolReadResponse},
|
||||
TxpoolReadHandle,
|
||||
use cuprate_helper::cast::usize_to_u64;
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||
Chain,
|
||||
};
|
||||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||
|
||||
use crate::{
|
||||
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
|
||||
|
@ -37,7 +38,7 @@ pub enum IncomingBlockError {
|
|||
///
|
||||
/// The inner values are the block hash and the indexes of the missing txs in the block.
|
||||
#[error("Unknown transactions in block.")]
|
||||
UnknownTransactions([u8; 32], Vec<usize>),
|
||||
UnknownTransactions([u8; 32], Vec<u64>),
|
||||
/// We are missing the block's parent.
|
||||
#[error("The block has an unknown parent.")]
|
||||
Orphan,
|
||||
|
@ -58,9 +59,8 @@ pub enum IncomingBlockError {
|
|||
/// - the block's parent is unknown
|
||||
pub async fn handle_incoming_block(
|
||||
block: Block,
|
||||
mut given_txs: HashMap<[u8; 32], Transaction>,
|
||||
given_txs: Vec<Transaction>,
|
||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||
txpool_read_handle: &mut TxpoolReadHandle,
|
||||
) -> Result<IncomingBlockOk, IncomingBlockError> {
|
||||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
||||
///
|
||||
|
@ -72,12 +72,7 @@ pub async fn handle_incoming_block(
|
|||
/// which are also more expensive than `Mutex`s.
|
||||
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
|
||||
LazyLock::new(|| Mutex::new(HashSet::new()));
|
||||
|
||||
if given_txs.len() > block.transactions.len() {
|
||||
return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
|
||||
"Too many transactions given for block"
|
||||
)));
|
||||
}
|
||||
// FIXME: we should look in the tx-pool for txs when that is ready.
|
||||
|
||||
if !block_exists(block.header.previous, blockchain_read_handle)
|
||||
.await
|
||||
|
@ -95,37 +90,24 @@ pub async fn handle_incoming_block(
|
|||
return Ok(IncomingBlockOk::AlreadyHave);
|
||||
}
|
||||
|
||||
let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
if !missing.is_empty() {
|
||||
let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
|
||||
|
||||
for needed_hash in needed_hashes {
|
||||
let Some(tx) = given_txs.remove(&needed_hash) else {
|
||||
// We return back the indexes of all txs missing from our pool, not taking into account the txs
|
||||
// that were given with the block, as these txs will be dropped. It is not worth it to try to add
|
||||
// these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches
|
||||
// the size limit.
|
||||
return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
|
||||
};
|
||||
|
||||
txs.insert(
|
||||
needed_hash,
|
||||
new_tx_verification_data(tx)
|
||||
.map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
|
||||
);
|
||||
}
|
||||
// TODO: remove this when we have a working tx-pool.
|
||||
if given_txs.len() != block.transactions.len() {
|
||||
return Err(IncomingBlockError::UnknownTransactions(
|
||||
block_hash,
|
||||
(0..usize_to_u64(block.transactions.len())).collect(),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO: check we actually got given the right txs.
|
||||
let prepped_txs = given_txs
|
||||
.into_par_iter()
|
||||
.map(|tx| {
|
||||
let tx = new_tx_verification_data(tx)?;
|
||||
Ok((tx.tx_hash, tx))
|
||||
})
|
||||
.collect::<Result<_, anyhow::Error>>()
|
||||
.map_err(IncomingBlockError::InvalidBlock)?;
|
||||
|
||||
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
||||
// We could still be starting up the blockchain manager.
|
||||
return Ok(IncomingBlockOk::NotReady);
|
||||
|
@ -137,37 +119,28 @@ pub async fn handle_incoming_block(
|
|||
return Ok(IncomingBlockOk::AlreadyHave);
|
||||
}
|
||||
|
||||
// We must remove the block hash from `BLOCKS_BEING_HANDLED`.
|
||||
let _guard = {
|
||||
struct RemoveFromBlocksBeingHandled {
|
||||
block_hash: [u8; 32],
|
||||
}
|
||||
impl Drop for RemoveFromBlocksBeingHandled {
|
||||
fn drop(&mut self) {
|
||||
BLOCKS_BEING_HANDLED
|
||||
.lock()
|
||||
.unwrap()
|
||||
.remove(&self.block_hash);
|
||||
}
|
||||
}
|
||||
RemoveFromBlocksBeingHandled { block_hash }
|
||||
};
|
||||
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
|
||||
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
|
||||
incoming_block_tx
|
||||
.send(BlockchainManagerCommand::AddBlock {
|
||||
block,
|
||||
prepped_txs: txs,
|
||||
prepped_txs,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
.expect("TODO: don't actually panic here, an err means we are shutting down");
|
||||
|
||||
response_rx
|
||||
let res = response_rx
|
||||
.await
|
||||
.expect("The blockchain manager will always respond")
|
||||
.map_err(IncomingBlockError::InvalidBlock)
|
||||
.map_err(IncomingBlockError::InvalidBlock);
|
||||
|
||||
// Remove the block hash from the blocks being handled.
|
||||
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
/// Check if we have a block with the given hash.
|
||||
|
|
|
@ -18,7 +18,6 @@ use cuprate_p2p::{
|
|||
BroadcastSvc, NetworkInterface,
|
||||
};
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
use cuprate_txpool::service::TxpoolWriteHandle;
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||
Chain, TransactionVerificationData,
|
||||
|
@ -47,7 +46,6 @@ pub async fn init_blockchain_manager(
|
|||
clearnet_interface: NetworkInterface<ClearNet>,
|
||||
blockchain_write_handle: BlockchainWriteHandle,
|
||||
blockchain_read_handle: BlockchainReadHandle,
|
||||
txpool_write_handle: TxpoolWriteHandle,
|
||||
mut blockchain_context_service: BlockChainContextService,
|
||||
block_verifier_service: ConcreteBlockVerifierService,
|
||||
block_downloader_config: BlockDownloaderConfig,
|
||||
|
@ -82,7 +80,6 @@ pub async fn init_blockchain_manager(
|
|||
let manager = BlockchainManager {
|
||||
blockchain_write_handle,
|
||||
blockchain_read_handle,
|
||||
txpool_write_handle,
|
||||
blockchain_context_service,
|
||||
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
|
||||
block_verifier_service,
|
||||
|
@ -105,8 +102,6 @@ pub struct BlockchainManager {
|
|||
blockchain_write_handle: BlockchainWriteHandle,
|
||||
/// A [`BlockchainReadHandle`].
|
||||
blockchain_read_handle: BlockchainReadHandle,
|
||||
/// A [`TxpoolWriteHandle`].
|
||||
txpool_write_handle: TxpoolWriteHandle,
|
||||
// TODO: Improve the API of the cache service.
|
||||
// TODO: rename the cache service -> `BlockchainContextService`.
|
||||
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
//! The blockchain manager handler functions.
|
||||
use bytes::Bytes;
|
||||
use futures::{TryFutureExt, TryStreamExt};
|
||||
use monero_serai::{
|
||||
block::Block,
|
||||
transaction::{Input, Transaction},
|
||||
};
|
||||
use monero_serai::{block::Block, transaction::Transaction};
|
||||
use rayon::prelude::*;
|
||||
use std::ops::ControlFlow;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
@ -20,14 +17,16 @@ use cuprate_consensus::{
|
|||
use cuprate_consensus_context::NewBlockData;
|
||||
use cuprate_helper::cast::usize_to_u64;
|
||||
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
|
||||
use cuprate_txpool::service::interface::TxpoolWriteRequest;
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
|
||||
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
||||
};
|
||||
|
||||
use crate::blockchain::manager::commands::IncomingBlockOk;
|
||||
use crate::{
|
||||
blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
|
||||
blockchain::{
|
||||
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
|
||||
},
|
||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
signals::REORG_LOCK,
|
||||
};
|
||||
|
@ -435,18 +434,6 @@ impl super::BlockchainManager {
|
|||
&mut self,
|
||||
verified_block: VerifiedBlockInformation,
|
||||
) {
|
||||
// FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate.
|
||||
let spent_key_images = verified_block
|
||||
.txs
|
||||
.iter()
|
||||
.flat_map(|tx| {
|
||||
tx.tx.prefix().inputs.iter().map(|input| match input {
|
||||
Input::ToKey { key_image, .. } => key_image.compress().0,
|
||||
Input::Gen(_) => unreachable!(),
|
||||
})
|
||||
})
|
||||
.collect::<Vec<[u8; 32]>>();
|
||||
|
||||
self.blockchain_context_service
|
||||
.ready()
|
||||
.await
|
||||
|
@ -485,14 +472,6 @@ impl super::BlockchainManager {
|
|||
};
|
||||
|
||||
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
||||
|
||||
self.txpool_write_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(TxpoolWriteRequest::NewBlock { spent_key_images })
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use std::{pin::pin, sync::Arc, time::Duration};
|
||||
|
||||
use futures::StreamExt;
|
||||
use tokio::time::interval;
|
||||
use tokio::{
|
||||
sync::{mpsc, Notify},
|
||||
time::interval,
|
||||
time::sleep,
|
||||
};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing::instrument;
|
||||
|
|
|
@ -1,7 +1,13 @@
|
|||
use tower::util::MapErr;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use tower::{util::MapErr, Service};
|
||||
|
||||
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
|
||||
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
|
||||
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
|
||||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||
|
||||
/// The [`BlockVerifierService`] with all generic types defined.
|
||||
pub type ConcreteBlockVerifierService = BlockVerifierService<
|
||||
|
|
|
@ -2,7 +2,4 @@
|
|||
//!
|
||||
//! Will handle initiating the P2P and contains a protocol request handler.
|
||||
|
||||
mod network_address;
|
||||
pub mod request_handler;
|
||||
|
||||
pub use network_address::CrossNetworkInternalPeerId;
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone};
|
||||
|
||||
/// An identifier for a P2P peer on any network.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum CrossNetworkInternalPeerId {
|
||||
/// A clear-net peer.
|
||||
ClearNet(InternalPeerID<<ClearNet as NetworkZone>::Addr>),
|
||||
}
|
||||
|
||||
impl From<InternalPeerID<<ClearNet as NetworkZone>::Addr>> for CrossNetworkInternalPeerId {
|
||||
fn from(addr: InternalPeerID<<ClearNet as NetworkZone>::Addr>) -> Self {
|
||||
Self::ClearNet(addr)
|
||||
}
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
//! Global `static`s used throughout `cuprated`.
|
||||
|
||||
use std::{
|
||||
sync::LazyLock,
|
||||
sync::{atomic::AtomicU64, LazyLock},
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
|
|
|
@ -1,15 +1,3 @@
|
|||
//! Transaction Pool
|
||||
//!
|
||||
//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool.
|
||||
use cuprate_consensus::BlockChainContextService;
|
||||
use cuprate_p2p::NetworkInterface;
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
|
||||
|
||||
use crate::blockchain::ConcreteTxVerifierService;
|
||||
|
||||
mod dandelion;
|
||||
mod incoming_tx;
|
||||
mod txs_being_handled;
|
||||
|
||||
pub use incoming_tx::IncomingTxHandler;
|
||||
//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use cuprate_dandelion_tower::{
|
||||
pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
|
||||
};
|
||||
use cuprate_p2p::NetworkInterface;
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
|
||||
|
||||
use crate::{
|
||||
p2p::CrossNetworkInternalPeerId,
|
||||
txpool::incoming_tx::{DandelionTx, TxId},
|
||||
};
|
||||
|
||||
mod diffuse_service;
|
||||
mod stem_service;
|
||||
mod tx_store;
|
||||
|
||||
/// The configuration used for [`cuprate_dandelion_tower`].
|
||||
///
|
||||
/// TODO: should we expose this to users of cuprated? probably not.
|
||||
const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
|
||||
time_between_hop: Duration::from_millis(175),
|
||||
epoch_duration: Duration::from_secs(10 * 60),
|
||||
fluff_probability: 0.12,
|
||||
graph: Graph::FourRegular,
|
||||
};
|
||||
|
||||
/// A [`DandelionRouter`] with all generic types defined.
|
||||
type ConcreteDandelionRouter = DandelionRouter<
|
||||
stem_service::OutboundPeerStream,
|
||||
diffuse_service::DiffuseService,
|
||||
CrossNetworkInternalPeerId,
|
||||
stem_service::StemPeerService<ClearNet>,
|
||||
DandelionTx,
|
||||
>;
|
||||
|
||||
/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast.
|
||||
pub fn start_dandelion_pool_manager(
|
||||
router: ConcreteDandelionRouter,
|
||||
txpool_read_handle: TxpoolReadHandle,
|
||||
txpool_write_handle: TxpoolWriteHandle,
|
||||
) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
|
||||
cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
|
||||
// TODO: make this constant configurable?
|
||||
32,
|
||||
router,
|
||||
tx_store::TxStoreService {
|
||||
txpool_read_handle,
|
||||
txpool_write_handle,
|
||||
},
|
||||
DANDELION_CONFIG,
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a [`DandelionRouter`] from a [`NetworkInterface`].
|
||||
pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandelionRouter {
|
||||
DandelionRouter::new(
|
||||
diffuse_service::DiffuseService {
|
||||
clear_net_broadcast_service: clear_net.broadcast_svc(),
|
||||
},
|
||||
stem_service::OutboundPeerStream { clear_net },
|
||||
DANDELION_CONFIG,
|
||||
)
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
use std::{
|
||||
future::{ready, Ready},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use tower::Service;
|
||||
|
||||
use cuprate_dandelion_tower::traits::DiffuseRequest;
|
||||
use cuprate_p2p::{BroadcastRequest, BroadcastSvc};
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
|
||||
use crate::txpool::dandelion::DandelionTx;
|
||||
|
||||
/// The dandelion diffusion service.
|
||||
pub struct DiffuseService {
|
||||
pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
|
||||
}
|
||||
|
||||
impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
|
||||
type Response = ();
|
||||
type Error = tower::BoxError;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.clear_net_broadcast_service
|
||||
.poll_ready(cx)
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
|
||||
// TODO: the dandelion crate should pass along where we got the tx from.
|
||||
let Ok(()) = self
|
||||
.clear_net_broadcast_service
|
||||
.call(BroadcastRequest::Transaction {
|
||||
tx_bytes: req.0 .0,
|
||||
direction: None,
|
||||
received_from: None,
|
||||
})
|
||||
.into_inner();
|
||||
|
||||
ready(Ok(()))
|
||||
}
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use tower::Service;
|
||||
|
||||
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
|
||||
use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
|
||||
use cuprate_p2p_core::{
|
||||
client::{Client, InternalPeerID},
|
||||
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
|
||||
};
|
||||
use cuprate_wire::protocol::NewTransactions;
|
||||
|
||||
use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
|
||||
|
||||
/// The dandelion outbound peer stream.
|
||||
pub struct OutboundPeerStream {
|
||||
pub clear_net: NetworkInterface<ClearNet>,
|
||||
}
|
||||
|
||||
impl Stream for OutboundPeerStream {
|
||||
type Item = Result<
|
||||
OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
|
||||
tower::BoxError,
|
||||
>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
// TODO: make the outbound peer choice random.
|
||||
Poll::Ready(Some(Ok(self
|
||||
.clear_net
|
||||
.client_pool()
|
||||
.outbound_client()
|
||||
.map_or(OutboundPeer::Exhausted, |client| {
|
||||
OutboundPeer::Peer(
|
||||
CrossNetworkInternalPeerId::ClearNet(client.info.id),
|
||||
StemPeerService(client),
|
||||
)
|
||||
}))))
|
||||
}
|
||||
}
|
||||
|
||||
/// The stem service, used to send stem txs.
|
||||
pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>);
|
||||
|
||||
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
|
||||
type Response = <Client<N> as Service<PeerRequest>>::Response;
|
||||
type Error = tower::BoxError;
|
||||
type Future = <Client<N> as Service<PeerRequest>>::Future;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
|
||||
self.0
|
||||
.call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
|
||||
NewTransactions {
|
||||
txs: vec![req.0 .0],
|
||||
dandelionpp_fluff: false,
|
||||
padding: Bytes::new(),
|
||||
},
|
||||
)))
|
||||
}
|
||||
}
|
|
@ -1,74 +0,0 @@
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_dandelion_tower::{
|
||||
traits::{TxStoreRequest, TxStoreResponse},
|
||||
State,
|
||||
};
|
||||
use cuprate_database::RuntimeError;
|
||||
use cuprate_txpool::service::{
|
||||
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
|
||||
TxpoolReadHandle, TxpoolWriteHandle,
|
||||
};
|
||||
|
||||
use super::{DandelionTx, TxId};
|
||||
|
||||
/// The dandelion tx-store service.
|
||||
///
|
||||
/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
|
||||
pub struct TxStoreService {
|
||||
pub txpool_read_handle: TxpoolReadHandle,
|
||||
pub txpool_write_handle: TxpoolWriteHandle,
|
||||
}
|
||||
|
||||
impl Service<TxStoreRequest<TxId>> for TxStoreService {
|
||||
type Response = TxStoreResponse<DandelionTx>;
|
||||
type Error = tower::BoxError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
|
||||
match req {
|
||||
TxStoreRequest::Get(tx_id) => self
|
||||
.txpool_read_handle
|
||||
.clone()
|
||||
.oneshot(TxpoolReadRequest::TxBlob(tx_id))
|
||||
.map(|res| match res {
|
||||
Ok(TxpoolReadResponse::TxBlob {
|
||||
tx_blob,
|
||||
state_stem,
|
||||
}) => {
|
||||
let state = if state_stem {
|
||||
State::Stem
|
||||
} else {
|
||||
State::Fluff
|
||||
};
|
||||
|
||||
Ok(TxStoreResponse::Transaction(Some((
|
||||
DandelionTx(Bytes::from(tx_blob)),
|
||||
state,
|
||||
))))
|
||||
}
|
||||
Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
|
||||
Err(e) => Err(e.into()),
|
||||
Ok(_) => unreachable!(),
|
||||
})
|
||||
.boxed(),
|
||||
TxStoreRequest::Promote(tx_id) => self
|
||||
.txpool_write_handle
|
||||
.clone()
|
||||
.oneshot(TxpoolWriteRequest::Promote(tx_id))
|
||||
.map(|res| match res {
|
||||
Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
|
||||
Err(e) => Err(e.into()),
|
||||
})
|
||||
.boxed(),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,379 +0,0 @@
|
|||
use std::{
|
||||
collections::HashSet,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use monero_serai::transaction::Transaction;
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_consensus::{
|
||||
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
|
||||
BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
|
||||
};
|
||||
use cuprate_dandelion_tower::{
|
||||
pool::{DandelionPoolService, IncomingTxBuilder},
|
||||
State, TxState,
|
||||
};
|
||||
use cuprate_helper::asynch::rayon_spawn_async;
|
||||
use cuprate_p2p::NetworkInterface;
|
||||
use cuprate_p2p_core::ClearNet;
|
||||
use cuprate_txpool::{
|
||||
service::{
|
||||
interface::{
|
||||
TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
|
||||
},
|
||||
TxpoolReadHandle, TxpoolWriteHandle,
|
||||
},
|
||||
transaction_blob_hash,
|
||||
};
|
||||
use cuprate_types::TransactionVerificationData;
|
||||
|
||||
use crate::{
|
||||
blockchain::ConcreteTxVerifierService,
|
||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||
p2p::CrossNetworkInternalPeerId,
|
||||
signals::REORG_LOCK,
|
||||
txpool::{
|
||||
dandelion,
|
||||
txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
|
||||
},
|
||||
};
|
||||
|
||||
/// An error that can happen handling an incoming tx.
|
||||
pub enum IncomingTxError {
|
||||
Parse(std::io::Error),
|
||||
Consensus(ExtendedConsensusError),
|
||||
DuplicateTransaction,
|
||||
}
|
||||
|
||||
/// Incoming transactions.
|
||||
pub struct IncomingTxs {
|
||||
/// The raw bytes of the transactions.
|
||||
pub txs: Vec<Bytes>,
|
||||
/// The routing state of the transactions.
|
||||
pub state: TxState<CrossNetworkInternalPeerId>,
|
||||
}
|
||||
|
||||
/// The transaction type used for dandelion++.
|
||||
#[derive(Clone)]
|
||||
pub struct DandelionTx(pub Bytes);
|
||||
|
||||
/// A transaction ID/hash.
|
||||
pub(super) type TxId = [u8; 32];
|
||||
|
||||
/// The service than handles incoming transaction pool transactions.
|
||||
///
|
||||
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
|
||||
pub struct IncomingTxHandler {
|
||||
/// A store of txs currently being handled in incoming tx requests.
|
||||
pub(super) txs_being_handled: TxsBeingHandled,
|
||||
/// The blockchain context cache.
|
||||
pub(super) blockchain_context_cache: BlockChainContextService,
|
||||
/// The dandelion txpool manager.
|
||||
pub(super) dandelion_pool_manager:
|
||||
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
|
||||
/// The transaction verifier service.
|
||||
pub(super) tx_verifier_service: ConcreteTxVerifierService,
|
||||
/// The txpool write handle.
|
||||
pub(super) txpool_write_handle: TxpoolWriteHandle,
|
||||
/// The txpool read handle.
|
||||
pub(super) txpool_read_handle: TxpoolReadHandle,
|
||||
}
|
||||
|
||||
impl IncomingTxHandler {
|
||||
/// Initialize the [`IncomingTxHandler`].
|
||||
#[expect(clippy::significant_drop_tightening)]
|
||||
pub fn init(
|
||||
clear_net: NetworkInterface<ClearNet>,
|
||||
txpool_write_handle: TxpoolWriteHandle,
|
||||
txpool_read_handle: TxpoolReadHandle,
|
||||
blockchain_context_cache: BlockChainContextService,
|
||||
tx_verifier_service: ConcreteTxVerifierService,
|
||||
) -> Self {
|
||||
let dandelion_router = dandelion::dandelion_router(clear_net);
|
||||
|
||||
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
|
||||
dandelion_router,
|
||||
txpool_read_handle.clone(),
|
||||
txpool_write_handle.clone(),
|
||||
);
|
||||
|
||||
Self {
|
||||
txs_being_handled: TxsBeingHandled::new(),
|
||||
blockchain_context_cache,
|
||||
dandelion_pool_manager,
|
||||
tx_verifier_service,
|
||||
txpool_write_handle,
|
||||
txpool_read_handle,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service<IncomingTxs> for IncomingTxHandler {
|
||||
type Response = ();
|
||||
type Error = IncomingTxError;
|
||||
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: IncomingTxs) -> Self::Future {
|
||||
handle_incoming_txs(
|
||||
req,
|
||||
self.txs_being_handled.clone(),
|
||||
self.blockchain_context_cache.clone(),
|
||||
self.tx_verifier_service.clone(),
|
||||
self.txpool_write_handle.clone(),
|
||||
self.txpool_read_handle.clone(),
|
||||
self.dandelion_pool_manager.clone(),
|
||||
)
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the incoming txs.
|
||||
async fn handle_incoming_txs(
|
||||
IncomingTxs { txs, state }: IncomingTxs,
|
||||
txs_being_handled: TxsBeingHandled,
|
||||
mut blockchain_context_cache: BlockChainContextService,
|
||||
mut tx_verifier_service: ConcreteTxVerifierService,
|
||||
mut txpool_write_handle: TxpoolWriteHandle,
|
||||
mut txpool_read_handle: TxpoolReadHandle,
|
||||
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
|
||||
) -> Result<(), IncomingTxError> {
|
||||
let _reorg_guard = REORG_LOCK.read().await;
|
||||
|
||||
let (txs, stem_pool_txs, txs_being_handled_guard) =
|
||||
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
|
||||
|
||||
let BlockChainContextResponse::Context(context) = blockchain_context_cache
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(BlockChainContextRequest::Context)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let context = context.unchecked_blockchain_context();
|
||||
|
||||
tx_verifier_service
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(VerifyTxRequest::Prepped {
|
||||
txs: txs.clone(),
|
||||
current_chain_height: context.chain_height,
|
||||
top_hash: context.top_hash,
|
||||
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
||||
hf: context.current_hf,
|
||||
})
|
||||
.await
|
||||
.map_err(IncomingTxError::Consensus)?;
|
||||
|
||||
for tx in txs {
|
||||
handle_valid_tx(
|
||||
tx,
|
||||
state.clone(),
|
||||
&mut txpool_write_handle,
|
||||
&mut dandelion_pool_manager,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Re-relay any txs we got in the block that were already in our stem pool.
|
||||
for stem_tx in stem_pool_txs {
|
||||
rerelay_stem_tx(
|
||||
&stem_tx,
|
||||
state.clone(),
|
||||
&mut txpool_read_handle,
|
||||
&mut dandelion_pool_manager,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepares the incoming transactions for verification.
|
||||
///
|
||||
/// This will filter out all transactions already in the pool or txs already being handled in another request.
|
||||
///
|
||||
/// Returns in order:
|
||||
/// - The [`TransactionVerificationData`] for all the txs we did not already have
|
||||
/// - The Ids of the transactions in the incoming message that are in our stem-pool
|
||||
/// - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across 2 tasks.
|
||||
async fn prepare_incoming_txs(
|
||||
tx_blobs: Vec<Bytes>,
|
||||
txs_being_handled: TxsBeingHandled,
|
||||
txpool_read_handle: &mut TxpoolReadHandle,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<Arc<TransactionVerificationData>>,
|
||||
Vec<TxId>,
|
||||
TxsBeingHandledLocally,
|
||||
),
|
||||
IncomingTxError,
|
||||
> {
|
||||
let mut tx_blob_hashes = HashSet::new();
|
||||
let mut txs_being_handled_locally = txs_being_handled.local_tracker();
|
||||
|
||||
// Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
|
||||
let txs = tx_blobs
|
||||
.into_iter()
|
||||
.filter_map(|tx_blob| {
|
||||
let tx_blob_hash = transaction_blob_hash(&tx_blob);
|
||||
|
||||
// If a duplicate is in here the incoming tx batch contained the same tx twice.
|
||||
if !tx_blob_hashes.insert(tx_blob_hash) {
|
||||
return Some(Err(IncomingTxError::DuplicateTransaction));
|
||||
}
|
||||
|
||||
// If a duplicate is here it is being handled in another batch.
|
||||
if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(Ok((tx_blob_hash, tx_blob)))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// Filter the txs already in the txpool out.
|
||||
// This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue.
|
||||
let TxpoolReadResponse::FilterKnownTxBlobHashes {
|
||||
unknown_blob_hashes,
|
||||
stem_pool_hashes,
|
||||
} = txpool_read_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
// Now prepare the txs for verification.
|
||||
rayon_spawn_async(move || {
|
||||
let txs = txs
|
||||
.into_iter()
|
||||
.filter_map(|(tx_blob_hash, tx_blob)| {
|
||||
if unknown_blob_hashes.contains(&tx_blob_hash) {
|
||||
Some(tx_blob)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(|bytes| {
|
||||
let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
|
||||
|
||||
let tx = new_tx_verification_data(tx)
|
||||
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
|
||||
|
||||
Ok(Arc::new(tx))
|
||||
})
|
||||
.collect::<Result<Vec<_>, IncomingTxError>>()?;
|
||||
|
||||
Ok((txs, stem_pool_hashes, txs_being_handled_locally))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Handle a verified tx.
|
||||
///
|
||||
/// This will add the tx to the txpool and route it to the network.
|
||||
async fn handle_valid_tx(
|
||||
tx: Arc<TransactionVerificationData>,
|
||||
state: TxState<CrossNetworkInternalPeerId>,
|
||||
txpool_write_handle: &mut TxpoolWriteHandle,
|
||||
dandelion_pool_manager: &mut DandelionPoolService<
|
||||
DandelionTx,
|
||||
TxId,
|
||||
CrossNetworkInternalPeerId,
|
||||
>,
|
||||
) {
|
||||
let incoming_tx =
|
||||
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
|
||||
|
||||
let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(TxpoolWriteRequest::AddTransaction {
|
||||
tx,
|
||||
state_stem: state.is_stem_stage(),
|
||||
})
|
||||
.await
|
||||
.expect("TODO")
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
// TODO: track double spends to quickly ignore them from their blob hash.
|
||||
if let Some(tx_hash) = double_spend {
|
||||
return;
|
||||
};
|
||||
|
||||
// TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.
|
||||
|
||||
let incoming_tx = incoming_tx
|
||||
.with_routing_state(state)
|
||||
.with_state_in_db(None)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
dandelion_pool_manager
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(incoming_tx)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
}
|
||||
|
||||
/// Re-relay a tx that was already in our stem pool.
|
||||
async fn rerelay_stem_tx(
|
||||
tx_hash: &TxId,
|
||||
state: TxState<CrossNetworkInternalPeerId>,
|
||||
txpool_read_handle: &mut TxpoolReadHandle,
|
||||
dandelion_pool_manager: &mut DandelionPoolService<
|
||||
DandelionTx,
|
||||
TxId,
|
||||
CrossNetworkInternalPeerId,
|
||||
>,
|
||||
) {
|
||||
let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(TxpoolReadRequest::TxBlob(*tx_hash))
|
||||
.await
|
||||
else {
|
||||
// The tx could have been dropped from the pool.
|
||||
return;
|
||||
};
|
||||
|
||||
let incoming_tx =
|
||||
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
|
||||
|
||||
let incoming_tx = incoming_tx
|
||||
.with_routing_state(state)
|
||||
.with_state_in_db(Some(State::Stem))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
dandelion_pool_manager
|
||||
.ready()
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||
.call(incoming_tx)
|
||||
.await
|
||||
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||
}
|
|
@ -1,53 +0,0 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use dashmap::DashSet;
|
||||
|
||||
/// A set of txs currently being handled, shared between instances of the incoming tx handler.
|
||||
#[derive(Clone)]
|
||||
pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
|
||||
|
||||
impl TxsBeingHandled {
|
||||
/// Create a new [`TxsBeingHandled`]
|
||||
pub fn new() -> Self {
|
||||
Self(Arc::new(DashSet::new()))
|
||||
}
|
||||
|
||||
/// Create a new [`TxsBeingHandledLocally`] that will keep track of txs being handled in a request.
|
||||
pub fn local_tracker(&self) -> TxsBeingHandledLocally {
|
||||
TxsBeingHandledLocally {
|
||||
txs_being_handled: self.clone(),
|
||||
txs: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A tracker of txs being handled in a single request. This will add the txs to the global [`TxsBeingHandled`]
|
||||
/// tracker as well.
|
||||
///
|
||||
/// When this is dropped the txs will be removed from [`TxsBeingHandled`].
|
||||
pub struct TxsBeingHandledLocally {
|
||||
txs_being_handled: TxsBeingHandled,
|
||||
txs: Vec<[u8; 32]>,
|
||||
}
|
||||
|
||||
impl TxsBeingHandledLocally {
|
||||
/// Try add a tx to the map from its [`transaction_blob_hash`](cuprate_txpool::transaction_blob_hash).
|
||||
///
|
||||
/// Returns `true` if the tx was added and `false` if another task is already handling this tx.
|
||||
pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
|
||||
if !self.txs_being_handled.0.insert(tx_blob_hash) {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.txs.push(tx_blob_hash);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TxsBeingHandledLocally {
|
||||
fn drop(&mut self) {
|
||||
for hash in &self.txs {
|
||||
self.txs_being_handled.0.remove(hash);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -73,15 +73,6 @@ pub enum TxState<Id> {
|
|||
Local,
|
||||
}
|
||||
|
||||
impl<Id> TxState<Id> {
|
||||
/// Returns `true` if the tx is in the stem stage.
|
||||
///
|
||||
/// [`TxState::Local`] & [`TxState::Stem`] are the 2 stem stage states.
|
||||
pub const fn is_stem_stage(&self) -> bool {
|
||||
matches!(self, Self::Local | Self::Stem { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// A request to route a transaction.
|
||||
pub struct DandelionRouteReq<Tx, Id> {
|
||||
/// The transaction.
|
||||
|
|
|
@ -18,13 +18,13 @@ use tracing::{Instrument, Span};
|
|||
use cuprate_p2p_core::{
|
||||
client::{Client, InternalPeerID},
|
||||
handles::ConnectionHandle,
|
||||
ConnectionDirection, NetworkZone,
|
||||
NetworkZone,
|
||||
};
|
||||
|
||||
pub(crate) mod disconnect_monitor;
|
||||
mod drop_guard_client;
|
||||
|
||||
pub use drop_guard_client::ClientPoolDropGuard;
|
||||
pub(crate) use drop_guard_client::ClientPoolDropGuard;
|
||||
|
||||
/// The client pool, which holds currently connected free peers.
|
||||
///
|
||||
|
@ -165,17 +165,6 @@ impl<N: NetworkZone> ClientPool<N> {
|
|||
sync_data.cumulative_difficulty() > cumulative_difficulty
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the first outbound peer when iterating over the peers.
|
||||
pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
|
||||
let client = self
|
||||
.clients
|
||||
.iter()
|
||||
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
|
||||
let id = *client.key();
|
||||
|
||||
Some(self.borrow_client(&id).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
mod sealed {
|
||||
|
|
|
@ -18,7 +18,7 @@ use cuprate_p2p_core::{
|
|||
|
||||
pub mod block_downloader;
|
||||
mod broadcast;
|
||||
pub mod client_pool;
|
||||
mod client_pool;
|
||||
pub mod config;
|
||||
pub mod connection_maintainer;
|
||||
pub mod constants;
|
||||
|
@ -26,7 +26,6 @@ mod inbound_server;
|
|||
|
||||
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
|
||||
pub use broadcast::{BroadcastRequest, BroadcastSvc};
|
||||
pub use client_pool::{ClientPool, ClientPoolDropGuard};
|
||||
pub use config::{AddressBookConfig, P2PConfig};
|
||||
use connection_maintainer::MakeConnectionRequest;
|
||||
|
||||
|
@ -83,7 +82,7 @@ where
|
|||
|
||||
let outbound_handshaker = outbound_handshaker_builder.build();
|
||||
|
||||
let client_pool = ClientPool::new();
|
||||
let client_pool = client_pool::ClientPool::new();
|
||||
|
||||
let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
|
||||
|
||||
|
@ -133,7 +132,7 @@ where
|
|||
#[derive(Clone)]
|
||||
pub struct NetworkInterface<N: NetworkZone> {
|
||||
/// A pool of free connected peers.
|
||||
pool: Arc<ClientPool<N>>,
|
||||
pool: Arc<client_pool::ClientPool<N>>,
|
||||
/// A [`Service`] that allows broadcasting to all connected peers.
|
||||
broadcast_svc: BroadcastSvc<N>,
|
||||
/// A channel to request extra connections.
|
||||
|
@ -174,7 +173,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
|
|||
}
|
||||
|
||||
/// Borrows the `ClientPool`, for access to connected peers.
|
||||
pub const fn client_pool(&self) -> &Arc<ClientPool<N>> {
|
||||
pub const fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
|
||||
&self.pool
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,14 +30,6 @@ pub struct DatabaseWriteHandle<Req, Res> {
|
|||
crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
|
||||
}
|
||||
|
||||
impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
sender: self.sender.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Req, Res> DatabaseWriteHandle<Req, Res>
|
||||
where
|
||||
Req: Send + 'static,
|
||||
|
|
|
@ -29,7 +29,6 @@ bytemuck = { workspace = true, features = ["must_cast", "derive"
|
|||
bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] }
|
||||
thiserror = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
blake3 = { workspace = true, features = ["std"] }
|
||||
|
||||
tower = { workspace = true, optional = true }
|
||||
rayon = { workspace = true, optional = true }
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
//---------------------------------------------------------------------------------------------------- Import
|
||||
use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw};
|
||||
|
||||
use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash};
|
||||
use crate::{config::Config, tables::OpenTables};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- Free functions
|
||||
/// Open the txpool database using the passed [`Config`].
|
||||
|
@ -60,13 +60,3 @@ pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
|
|||
|
||||
Ok(env)
|
||||
}
|
||||
|
||||
/// Calculate the transaction blob hash.
|
||||
///
|
||||
/// This value is supposed to be quick to compute just based of the tx-blob without needing to parse the tx.
|
||||
///
|
||||
/// The exact way the hash is calculated is not stable and is subject to change, as such it should not be exposed
|
||||
/// as a way to interact with Cuprate externally.
|
||||
pub fn transaction_blob_hash(tx_blob: &[u8]) -> TransactionBlobHash {
|
||||
blake3::hash(tx_blob).into()
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ mod tx;
|
|||
pub mod types;
|
||||
|
||||
pub use config::Config;
|
||||
pub use free::{open, transaction_blob_hash};
|
||||
pub use free::open;
|
||||
pub use tx::{BlockTemplateTxEntry, TxEntry};
|
||||
|
||||
//re-exports
|
||||
|
|
|
@ -85,7 +85,7 @@ mod key_images;
|
|||
mod tx_read;
|
||||
mod tx_write;
|
||||
|
||||
pub use tx_read::{get_transaction_verification_data, in_stem_pool};
|
||||
pub use tx_read::get_transaction_verification_data;
|
||||
pub use tx_write::{add_transaction, remove_transaction};
|
||||
|
||||
/// An error that can occur on some tx-write ops.
|
||||
|
|
|
@ -8,10 +8,7 @@ use monero_serai::transaction::Transaction;
|
|||
use cuprate_database::{DatabaseRo, RuntimeError};
|
||||
use cuprate_types::{TransactionVerificationData, TxVersion};
|
||||
|
||||
use crate::{
|
||||
tables::{Tables, TransactionInfos},
|
||||
types::{TransactionHash, TxStateFlags},
|
||||
};
|
||||
use crate::{tables::Tables, types::TransactionHash};
|
||||
|
||||
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
|
||||
pub fn get_transaction_verification_data(
|
||||
|
@ -37,17 +34,3 @@ pub fn get_transaction_verification_data(
|
|||
cached_verification_state: Mutex::new(cached_verification_state),
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `true` if the transaction with the given hash is in the stem pool.
|
||||
///
|
||||
/// # Errors
|
||||
/// This will return an [`Err`] if the transaction is not in the pool.
|
||||
pub fn in_stem_pool(
|
||||
tx_hash: &TransactionHash,
|
||||
tx_infos: &impl DatabaseRo<TransactionInfos>,
|
||||
) -> Result<bool, RuntimeError> {
|
||||
Ok(tx_infos
|
||||
.get(tx_hash)?
|
||||
.flags
|
||||
.contains(TxStateFlags::STATE_STEM))
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ use cuprate_database::{DatabaseRw, RuntimeError, StorableVec};
|
|||
use cuprate_types::TransactionVerificationData;
|
||||
|
||||
use crate::{
|
||||
free::transaction_blob_hash,
|
||||
ops::{
|
||||
key_images::{add_tx_key_images, remove_tx_key_images},
|
||||
TxPoolWriteError,
|
||||
|
@ -57,12 +56,6 @@ pub fn add_transaction(
|
|||
let kis_table = tables.spent_key_images_mut();
|
||||
add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?;
|
||||
|
||||
// Add the blob hash to table 4.
|
||||
let blob_hash = transaction_blob_hash(&tx.tx_blob);
|
||||
tables
|
||||
.known_blob_hashes_mut()
|
||||
.put(&blob_hash, &tx.tx_hash)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -86,9 +79,5 @@ pub fn remove_transaction(
|
|||
let kis_table = tables.spent_key_images_mut();
|
||||
remove_tx_key_images(&tx.prefix().inputs, kis_table)?;
|
||||
|
||||
// Remove the blob hash from table 4.
|
||||
let blob_hash = transaction_blob_hash(&tx_blob);
|
||||
tables.known_blob_hashes_mut().delete(&blob_hash)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,36 +1,24 @@
|
|||
//! Tx-pool [`service`](super) interface.
|
||||
//!
|
||||
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
use cuprate_types::TransactionVerificationData;
|
||||
|
||||
use crate::{
|
||||
tx::{BlockTemplateTxEntry, TxEntry},
|
||||
types::{KeyImage, TransactionBlobHash, TransactionHash},
|
||||
types::TransactionHash,
|
||||
};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- TxpoolReadRequest
|
||||
/// The transaction pool [`tower::Service`] read request type.
|
||||
#[derive(Clone)]
|
||||
pub enum TxpoolReadRequest {
|
||||
/// Get the blob (raw bytes) of a transaction with the given hash.
|
||||
/// A request for the blob (raw bytes) of a transaction with the given hash.
|
||||
TxBlob(TransactionHash),
|
||||
|
||||
/// Get the [`TransactionVerificationData`] of a transaction in the tx pool.
|
||||
/// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
|
||||
TxVerificationData(TransactionHash),
|
||||
|
||||
/// Filter (remove) all **known** transactions from the set.
|
||||
///
|
||||
/// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob.
|
||||
FilterKnownTxBlobHashes(HashSet<TransactionBlobHash>),
|
||||
|
||||
/// Get some transactions for an incoming block.
|
||||
TxsForBlock(Vec<TransactionHash>),
|
||||
|
||||
/// Get information on all transactions in the pool.
|
||||
Backlog,
|
||||
|
||||
|
@ -52,28 +40,15 @@ pub enum TxpoolReadRequest {
|
|||
/// The transaction pool [`tower::Service`] read response type.
|
||||
#[expect(clippy::large_enum_variant)]
|
||||
pub enum TxpoolReadResponse {
|
||||
/// The response for [`TxpoolReadRequest::TxBlob`].
|
||||
TxBlob { tx_blob: Vec<u8>, state_stem: bool },
|
||||
/// Response to [`TxpoolReadRequest::TxBlob`].
|
||||
///
|
||||
/// The inner value is the raw bytes of a transaction.
|
||||
// TODO: use bytes::Bytes.
|
||||
TxBlob(Vec<u8>),
|
||||
|
||||
/// The response for [`TxpoolReadRequest::TxVerificationData`].
|
||||
/// Response to [`TxpoolReadRequest::TxVerificationData`].
|
||||
TxVerificationData(TransactionVerificationData),
|
||||
|
||||
/// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
|
||||
FilterKnownTxBlobHashes {
|
||||
/// The blob hashes that are unknown.
|
||||
unknown_blob_hashes: HashSet<TransactionBlobHash>,
|
||||
/// The tx hashes of the blob hashes that were known but were in the stem pool.
|
||||
stem_pool_hashes: Vec<TransactionHash>,
|
||||
},
|
||||
|
||||
/// The response for [`TxpoolReadRequest::TxsForBlock`].
|
||||
TxsForBlock {
|
||||
/// The txs we had in the txpool.
|
||||
txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
/// The indexes of the missing txs.
|
||||
missing: Vec<usize>,
|
||||
},
|
||||
|
||||
/// Response to [`TxpoolReadRequest::Backlog`].
|
||||
///
|
||||
/// The inner [`Vec`] contains information on all
|
||||
|
@ -109,17 +84,9 @@ pub enum TxpoolWriteRequest {
|
|||
},
|
||||
|
||||
/// Remove a transaction with the given hash from the pool.
|
||||
///
|
||||
/// Returns [`TxpoolWriteResponse::Ok`].
|
||||
RemoveTransaction(TransactionHash),
|
||||
|
||||
/// Promote a transaction from the stem pool to the fluff pool.
|
||||
/// If the tx is already in the fluff pool this does nothing.
|
||||
Promote(TransactionHash),
|
||||
|
||||
/// Tell the tx-pool about a new block.
|
||||
NewBlock {
|
||||
/// The spent key images in the new block.
|
||||
spent_key_images: Vec<KeyImage>,
|
||||
},
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- TxpoolWriteResponse
|
||||
|
@ -128,8 +95,6 @@ pub enum TxpoolWriteRequest {
|
|||
pub enum TxpoolWriteResponse {
|
||||
/// Response to:
|
||||
/// - [`TxpoolWriteRequest::RemoveTransaction`]
|
||||
/// - [`TxpoolWriteRequest::Promote`]
|
||||
/// - [`TxpoolWriteRequest::NewBlock`]
|
||||
Ok,
|
||||
|
||||
/// Response to [`TxpoolWriteRequest::AddTransaction`].
|
||||
|
|
|
@ -4,24 +4,22 @@
|
|||
clippy::unnecessary_wraps,
|
||||
reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
|
||||
)]
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use rayon::ThreadPool;
|
||||
|
||||
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
|
||||
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner};
|
||||
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
|
||||
|
||||
use crate::{
|
||||
ops::{get_transaction_verification_data, in_stem_pool},
|
||||
ops::get_transaction_verification_data,
|
||||
service::{
|
||||
interface::{TxpoolReadRequest, TxpoolReadResponse},
|
||||
types::{ReadResponseResult, TxpoolReadHandle},
|
||||
},
|
||||
tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
|
||||
types::{TransactionBlobHash, TransactionHash},
|
||||
tables::{OpenTables, TransactionBlobs},
|
||||
types::TransactionHash,
|
||||
};
|
||||
|
||||
// TODO: update the docs here
|
||||
|
@ -59,6 +57,7 @@ fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) ->
|
|||
/// 1. `Request` is mapped to a handler function
|
||||
/// 2. Handler function is called
|
||||
/// 3. [`TxpoolReadResponse`] is returned
|
||||
#[expect(clippy::needless_pass_by_value)]
|
||||
fn map_request(
|
||||
env: &ConcreteEnv, // Access to the database
|
||||
request: TxpoolReadRequest, // The request we must fulfill
|
||||
|
@ -66,10 +65,6 @@ fn map_request(
|
|||
match request {
|
||||
TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
|
||||
TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
|
||||
TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
|
||||
filter_known_tx_blob_hashes(env, blob_hashes)
|
||||
}
|
||||
TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
|
||||
TxpoolReadRequest::Backlog => backlog(env),
|
||||
TxpoolReadRequest::BlockTemplateBacklog => block_template_backlog(env),
|
||||
TxpoolReadRequest::Size {
|
||||
|
@ -102,14 +97,10 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
|
|||
let tx_ro = inner_env.tx_ro()?;
|
||||
|
||||
let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
|
||||
let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
|
||||
|
||||
let tx_blob = tx_blobs_table.get(tx_hash)?.0;
|
||||
|
||||
Ok(TxpoolReadResponse::TxBlob {
|
||||
tx_blob,
|
||||
state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
|
||||
})
|
||||
tx_blobs_table
|
||||
.get(tx_hash)
|
||||
.map(|blob| TxpoolReadResponse::TxBlob(blob.0))
|
||||
}
|
||||
|
||||
/// [`TxpoolReadRequest::TxVerificationData`].
|
||||
|
@ -123,79 +114,6 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes
|
|||
get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
|
||||
}
|
||||
|
||||
/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
|
||||
fn filter_known_tx_blob_hashes(
|
||||
env: &ConcreteEnv,
|
||||
mut blob_hashes: HashSet<TransactionBlobHash>,
|
||||
) -> ReadResponseResult {
|
||||
let inner_env = env.env_inner();
|
||||
let tx_ro = inner_env.tx_ro()?;
|
||||
|
||||
let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
|
||||
let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
|
||||
|
||||
let mut stem_pool_hashes = Vec::new();
|
||||
|
||||
// A closure that returns `true` if a tx with a certain blob hash is unknown.
|
||||
// This also fills in `stem_tx_hashes`.
|
||||
let mut tx_unknown = |blob_hash| -> Result<bool, RuntimeError> {
|
||||
match tx_blob_hashes.get(&blob_hash) {
|
||||
Ok(tx_hash) => {
|
||||
if in_stem_pool(&tx_hash, &tx_infos)? {
|
||||
stem_pool_hashes.push(tx_hash);
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
Err(RuntimeError::KeyNotFound) => Ok(true),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
};
|
||||
|
||||
let mut err = None;
|
||||
blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
err = Some(e);
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(e) = err {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
|
||||
unknown_blob_hashes: blob_hashes,
|
||||
stem_pool_hashes,
|
||||
})
|
||||
}
|
||||
|
||||
/// [`TxpoolReadRequest::TxsForBlock`].
|
||||
fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
|
||||
let inner_env = env.env_inner();
|
||||
let tx_ro = inner_env.tx_ro()?;
|
||||
|
||||
let tables = inner_env.open_tables(&tx_ro)?;
|
||||
|
||||
let mut missing_tx_indexes = Vec::with_capacity(txs.len());
|
||||
let mut txs_verification_data = HashMap::with_capacity(txs.len());
|
||||
|
||||
for (i, tx_hash) in txs.into_iter().enumerate() {
|
||||
match get_transaction_verification_data(&tx_hash, &tables) {
|
||||
Ok(tx) => {
|
||||
txs_verification_data.insert(tx_hash, tx);
|
||||
}
|
||||
Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(TxpoolReadResponse::TxsForBlock {
|
||||
txs: txs_verification_data,
|
||||
missing: missing_tx_indexes,
|
||||
})
|
||||
}
|
||||
|
||||
/// [`TxpoolReadRequest::Backlog`].
|
||||
#[inline]
|
||||
fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use cuprate_database::{ConcreteEnv, DatabaseRo, DatabaseRw, Env, EnvInner, RuntimeError, TxRw};
|
||||
use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
|
||||
use cuprate_database_service::DatabaseWriteHandle;
|
||||
use cuprate_types::TransactionVerificationData;
|
||||
|
||||
|
@ -10,8 +10,8 @@ use crate::{
|
|||
interface::{TxpoolWriteRequest, TxpoolWriteResponse},
|
||||
types::TxpoolWriteHandle,
|
||||
},
|
||||
tables::{OpenTables, Tables, TransactionInfos},
|
||||
types::{KeyImage, TransactionHash, TxStateFlags},
|
||||
tables::OpenTables,
|
||||
types::TransactionHash,
|
||||
};
|
||||
|
||||
//---------------------------------------------------------------------------------------------------- init_write_service
|
||||
|
@ -31,8 +31,6 @@ fn handle_txpool_request(
|
|||
add_transaction(env, tx, *state_stem)
|
||||
}
|
||||
TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
|
||||
TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
|
||||
TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,68 +101,3 @@ fn remove_transaction(
|
|||
TxRw::commit(tx_rw)?;
|
||||
Ok(TxpoolWriteResponse::Ok)
|
||||
}
|
||||
|
||||
/// [`TxpoolWriteRequest::Promote`]
|
||||
fn promote(
|
||||
env: &ConcreteEnv,
|
||||
tx_hash: &TransactionHash,
|
||||
) -> Result<TxpoolWriteResponse, RuntimeError> {
|
||||
let env_inner = env.env_inner();
|
||||
let tx_rw = env_inner.tx_rw()?;
|
||||
|
||||
let res = || {
|
||||
let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
|
||||
|
||||
tx_infos.update(tx_hash, |mut info| {
|
||||
info.flags.remove(TxStateFlags::STATE_STEM);
|
||||
Some(info)
|
||||
})
|
||||
};
|
||||
|
||||
if let Err(e) = res() {
|
||||
// error promoting the tx, abort the DB transaction.
|
||||
TxRw::abort(tx_rw)
|
||||
.expect("could not maintain database atomicity by aborting write transaction");
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
TxRw::commit(tx_rw)?;
|
||||
Ok(TxpoolWriteResponse::Ok)
|
||||
}
|
||||
|
||||
/// [`TxpoolWriteRequest::NewBlock`]
|
||||
fn new_block(
|
||||
env: &ConcreteEnv,
|
||||
spent_key_images: &[KeyImage],
|
||||
) -> Result<TxpoolWriteResponse, RuntimeError> {
|
||||
let env_inner = env.env_inner();
|
||||
let tx_rw = env_inner.tx_rw()?;
|
||||
|
||||
// FIXME: use try blocks once stable.
|
||||
let result = || {
|
||||
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
|
||||
|
||||
// Remove all txs which spend key images that were spent in the new block.
|
||||
for key_image in spent_key_images {
|
||||
match tables_mut
|
||||
.spent_key_images()
|
||||
.get(key_image)
|
||||
.and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
|
||||
{
|
||||
Ok(()) | Err(RuntimeError::KeyNotFound) => (),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if let Err(e) = result() {
|
||||
TxRw::abort(tx_rw)?;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
TxRw::commit(tx_rw)?;
|
||||
Ok(TxpoolWriteResponse::Ok)
|
||||
}
|
||||
|
|
|
@ -16,9 +16,7 @@
|
|||
//! accessing _all_ tables defined here at once.
|
||||
use cuprate_database::{define_tables, StorableVec};
|
||||
|
||||
use crate::types::{
|
||||
KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo,
|
||||
};
|
||||
use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo};
|
||||
|
||||
define_tables! {
|
||||
/// Serialized transaction blobs.
|
||||
|
@ -43,9 +41,5 @@ define_tables! {
|
|||
///
|
||||
/// This table contains the spent key images from all transactions in the pool.
|
||||
3 => SpentKeyImages,
|
||||
KeyImage => TransactionHash,
|
||||
|
||||
/// Transaction blob hashes that are in the pool.
|
||||
4 => KnownBlobHashes,
|
||||
TransactionBlobHash => TransactionHash,
|
||||
KeyImage => TransactionHash
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
//!
|
||||
//! <!-- FIXME: Add schema here or a link to it when complete -->
|
||||
use bytemuck::{Pod, Zeroable};
|
||||
|
||||
use monero_serai::transaction::Timelock;
|
||||
|
||||
use cuprate_types::{CachedVerificationState, HardFork};
|
||||
|
@ -16,9 +17,6 @@ pub type KeyImage = [u8; 32];
|
|||
/// A transaction hash.
|
||||
pub type TransactionHash = [u8; 32];
|
||||
|
||||
/// A transaction blob hash.
|
||||
pub type TransactionBlobHash = [u8; 32];
|
||||
|
||||
bitflags::bitflags! {
|
||||
/// Flags representing the state of the transaction in the pool.
|
||||
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]
|
||||
|
|
Loading…
Reference in a new issue