mirror of
https://github.com/hinto-janai/cuprate.git
synced 2024-12-23 03:59:37 +00:00
Compare commits
2 commits
5fd4718103
...
aecbdf5728
Author | SHA1 | Date | |
---|---|---|---|
|
aecbdf5728 | ||
b57ee2f4cf |
34 changed files with 1143 additions and 90 deletions
32
Cargo.lock
generated
32
Cargo.lock
generated
|
@ -56,6 +56,18 @@ version = "1.0.89"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
|
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "arrayref"
|
||||||
|
version = "0.3.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "arrayvec"
|
||||||
|
version = "0.7.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-stream"
|
name = "async-stream"
|
||||||
version = "0.3.5"
|
version = "0.3.5"
|
||||||
|
@ -238,6 +250,19 @@ dependencies = [
|
||||||
"digest",
|
"digest",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "blake3"
|
||||||
|
version = "1.5.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7"
|
||||||
|
dependencies = [
|
||||||
|
"arrayref",
|
||||||
|
"arrayvec",
|
||||||
|
"cc",
|
||||||
|
"cfg-if",
|
||||||
|
"constant_time_eq",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "block-buffer"
|
name = "block-buffer"
|
||||||
version = "0.10.4"
|
version = "0.10.4"
|
||||||
|
@ -403,6 +428,12 @@ dependencies = [
|
||||||
"unicode-xid",
|
"unicode-xid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "constant_time_eq"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "core-foundation"
|
name = "core-foundation"
|
||||||
version = "0.9.4"
|
version = "0.9.4"
|
||||||
|
@ -920,6 +951,7 @@ name = "cuprate-txpool"
|
||||||
version = "0.0.0"
|
version = "0.0.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags 2.6.0",
|
"bitflags 2.6.0",
|
||||||
|
"blake3",
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
"cuprate-database",
|
"cuprate-database",
|
||||||
"cuprate-database-service",
|
"cuprate-database-service",
|
||||||
|
|
|
@ -82,6 +82,7 @@ cuprate-rpc-interface = { path = "rpc/interface" ,default-feature
|
||||||
anyhow = { version = "1.0.89", default-features = false }
|
anyhow = { version = "1.0.89", default-features = false }
|
||||||
async-trait = { version = "0.1.82", default-features = false }
|
async-trait = { version = "0.1.82", default-features = false }
|
||||||
bitflags = { version = "2.6.0", default-features = false }
|
bitflags = { version = "2.6.0", default-features = false }
|
||||||
|
blake3 = { version = "1", default-features = false }
|
||||||
borsh = { version = "1.5.1", default-features = false }
|
borsh = { version = "1.5.1", default-features = false }
|
||||||
bytemuck = { version = "1.18.0", default-features = false }
|
bytemuck = { version = "1.18.0", default-features = false }
|
||||||
bytes = { version = "1.7.2", default-features = false }
|
bytes = { version = "1.7.2", default-features = false }
|
||||||
|
|
|
@ -22,7 +22,7 @@ cuprate-levin = { workspace = true }
|
||||||
cuprate-wire = { workspace = true }
|
cuprate-wire = { workspace = true }
|
||||||
cuprate-p2p = { workspace = true }
|
cuprate-p2p = { workspace = true }
|
||||||
cuprate-p2p-core = { workspace = true }
|
cuprate-p2p-core = { workspace = true }
|
||||||
cuprate-dandelion-tower = { workspace = true }
|
cuprate-dandelion-tower = { workspace = true, features = ["txpool"] }
|
||||||
cuprate-async-buffer = { workspace = true }
|
cuprate-async-buffer = { workspace = true }
|
||||||
cuprate-address-book = { workspace = true }
|
cuprate-address-book = { workspace = true }
|
||||||
cuprate-blockchain = { workspace = true, features = ["service"] }
|
cuprate-blockchain = { workspace = true, features = ["service"] }
|
||||||
|
|
|
@ -25,7 +25,7 @@ mod manager;
|
||||||
mod syncer;
|
mod syncer;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
use types::{
|
pub use types::{
|
||||||
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
|
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -8,17 +8,16 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use monero_serai::{block::Block, transaction::Transaction};
|
use monero_serai::{block::Block, transaction::Transaction};
|
||||||
use rayon::prelude::*;
|
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||||
use cuprate_consensus::transactions::new_tx_verification_data;
|
use cuprate_consensus::transactions::new_tx_verification_data;
|
||||||
use cuprate_helper::cast::usize_to_u64;
|
use cuprate_txpool::service::{
|
||||||
use cuprate_types::{
|
interface::{TxpoolReadRequest, TxpoolReadResponse},
|
||||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
TxpoolReadHandle,
|
||||||
Chain,
|
|
||||||
};
|
};
|
||||||
|
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
|
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
|
||||||
|
@ -38,7 +37,7 @@ pub enum IncomingBlockError {
|
||||||
///
|
///
|
||||||
/// The inner values are the block hash and the indexes of the missing txs in the block.
|
/// The inner values are the block hash and the indexes of the missing txs in the block.
|
||||||
#[error("Unknown transactions in block.")]
|
#[error("Unknown transactions in block.")]
|
||||||
UnknownTransactions([u8; 32], Vec<u64>),
|
UnknownTransactions([u8; 32], Vec<usize>),
|
||||||
/// We are missing the block's parent.
|
/// We are missing the block's parent.
|
||||||
#[error("The block has an unknown parent.")]
|
#[error("The block has an unknown parent.")]
|
||||||
Orphan,
|
Orphan,
|
||||||
|
@ -59,8 +58,9 @@ pub enum IncomingBlockError {
|
||||||
/// - the block's parent is unknown
|
/// - the block's parent is unknown
|
||||||
pub async fn handle_incoming_block(
|
pub async fn handle_incoming_block(
|
||||||
block: Block,
|
block: Block,
|
||||||
given_txs: Vec<Transaction>,
|
mut given_txs: HashMap<[u8; 32], Transaction>,
|
||||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||||
|
txpool_read_handle: &mut TxpoolReadHandle,
|
||||||
) -> Result<IncomingBlockOk, IncomingBlockError> {
|
) -> Result<IncomingBlockOk, IncomingBlockError> {
|
||||||
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
|
||||||
///
|
///
|
||||||
|
@ -72,7 +72,12 @@ pub async fn handle_incoming_block(
|
||||||
/// which are also more expensive than `Mutex`s.
|
/// which are also more expensive than `Mutex`s.
|
||||||
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
|
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
|
||||||
LazyLock::new(|| Mutex::new(HashSet::new()));
|
LazyLock::new(|| Mutex::new(HashSet::new()));
|
||||||
// FIXME: we should look in the tx-pool for txs when that is ready.
|
|
||||||
|
if given_txs.len() > block.transactions.len() {
|
||||||
|
return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
|
||||||
|
"Too many transactions given for block"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
if !block_exists(block.header.previous, blockchain_read_handle)
|
if !block_exists(block.header.previous, blockchain_read_handle)
|
||||||
.await
|
.await
|
||||||
|
@ -90,23 +95,36 @@ pub async fn handle_incoming_block(
|
||||||
return Ok(IncomingBlockOk::AlreadyHave);
|
return Ok(IncomingBlockOk::AlreadyHave);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove this when we have a working tx-pool.
|
let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
|
||||||
if given_txs.len() != block.transactions.len() {
|
.ready()
|
||||||
return Err(IncomingBlockError::UnknownTransactions(
|
.await
|
||||||
block_hash,
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
(0..usize_to_u64(block.transactions.len())).collect(),
|
.call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
|
||||||
));
|
.await
|
||||||
}
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
// TODO: check we actually got given the right txs.
|
if !missing.is_empty() {
|
||||||
let prepped_txs = given_txs
|
let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
|
||||||
.into_par_iter()
|
|
||||||
.map(|tx| {
|
for needed_hash in needed_hashes {
|
||||||
let tx = new_tx_verification_data(tx)?;
|
let Some(tx) = given_txs.remove(&needed_hash) else {
|
||||||
Ok((tx.tx_hash, tx))
|
// We return back the indexes of all txs missing from our pool, not taking into account the txs
|
||||||
})
|
// that were given with the block, as these txs will be dropped. It is not worth it to try to add
|
||||||
.collect::<Result<_, anyhow::Error>>()
|
// these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches
|
||||||
.map_err(IncomingBlockError::InvalidBlock)?;
|
// the size limit.
|
||||||
|
return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
|
||||||
|
};
|
||||||
|
|
||||||
|
txs.insert(
|
||||||
|
needed_hash,
|
||||||
|
new_tx_verification_data(tx)
|
||||||
|
.map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
||||||
// We could still be starting up the blockchain manager.
|
// We could still be starting up the blockchain manager.
|
||||||
|
@ -119,28 +137,37 @@ pub async fn handle_incoming_block(
|
||||||
return Ok(IncomingBlockOk::AlreadyHave);
|
return Ok(IncomingBlockOk::AlreadyHave);
|
||||||
}
|
}
|
||||||
|
|
||||||
// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
|
// We must remove the block hash from `BLOCKS_BEING_HANDLED`.
|
||||||
|
let _guard = {
|
||||||
|
struct RemoveFromBlocksBeingHandled {
|
||||||
|
block_hash: [u8; 32],
|
||||||
|
}
|
||||||
|
impl Drop for RemoveFromBlocksBeingHandled {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
BLOCKS_BEING_HANDLED
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.remove(&self.block_hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RemoveFromBlocksBeingHandled { block_hash }
|
||||||
|
};
|
||||||
|
|
||||||
let (response_tx, response_rx) = oneshot::channel();
|
let (response_tx, response_rx) = oneshot::channel();
|
||||||
|
|
||||||
incoming_block_tx
|
incoming_block_tx
|
||||||
.send(BlockchainManagerCommand::AddBlock {
|
.send(BlockchainManagerCommand::AddBlock {
|
||||||
block,
|
block,
|
||||||
prepped_txs,
|
prepped_txs: txs,
|
||||||
response_tx,
|
response_tx,
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.expect("TODO: don't actually panic here, an err means we are shutting down");
|
.expect("TODO: don't actually panic here, an err means we are shutting down");
|
||||||
|
|
||||||
let res = response_rx
|
response_rx
|
||||||
.await
|
.await
|
||||||
.expect("The blockchain manager will always respond")
|
.expect("The blockchain manager will always respond")
|
||||||
.map_err(IncomingBlockError::InvalidBlock);
|
.map_err(IncomingBlockError::InvalidBlock)
|
||||||
|
|
||||||
// Remove the block hash from the blocks being handled.
|
|
||||||
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);
|
|
||||||
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if we have a block with the given hash.
|
/// Check if we have a block with the given hash.
|
||||||
|
|
|
@ -18,6 +18,7 @@ use cuprate_p2p::{
|
||||||
BroadcastSvc, NetworkInterface,
|
BroadcastSvc, NetworkInterface,
|
||||||
};
|
};
|
||||||
use cuprate_p2p_core::ClearNet;
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
use cuprate_txpool::service::TxpoolWriteHandle;
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||||
Chain, TransactionVerificationData,
|
Chain, TransactionVerificationData,
|
||||||
|
@ -46,6 +47,7 @@ pub async fn init_blockchain_manager(
|
||||||
clearnet_interface: NetworkInterface<ClearNet>,
|
clearnet_interface: NetworkInterface<ClearNet>,
|
||||||
blockchain_write_handle: BlockchainWriteHandle,
|
blockchain_write_handle: BlockchainWriteHandle,
|
||||||
blockchain_read_handle: BlockchainReadHandle,
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
txpool_write_handle: TxpoolWriteHandle,
|
||||||
mut blockchain_context_service: BlockChainContextService,
|
mut blockchain_context_service: BlockChainContextService,
|
||||||
block_verifier_service: ConcreteBlockVerifierService,
|
block_verifier_service: ConcreteBlockVerifierService,
|
||||||
block_downloader_config: BlockDownloaderConfig,
|
block_downloader_config: BlockDownloaderConfig,
|
||||||
|
@ -80,6 +82,7 @@ pub async fn init_blockchain_manager(
|
||||||
let manager = BlockchainManager {
|
let manager = BlockchainManager {
|
||||||
blockchain_write_handle,
|
blockchain_write_handle,
|
||||||
blockchain_read_handle,
|
blockchain_read_handle,
|
||||||
|
txpool_write_handle,
|
||||||
blockchain_context_service,
|
blockchain_context_service,
|
||||||
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
|
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
|
||||||
block_verifier_service,
|
block_verifier_service,
|
||||||
|
@ -102,6 +105,8 @@ pub struct BlockchainManager {
|
||||||
blockchain_write_handle: BlockchainWriteHandle,
|
blockchain_write_handle: BlockchainWriteHandle,
|
||||||
/// A [`BlockchainReadHandle`].
|
/// A [`BlockchainReadHandle`].
|
||||||
blockchain_read_handle: BlockchainReadHandle,
|
blockchain_read_handle: BlockchainReadHandle,
|
||||||
|
/// A [`TxpoolWriteHandle`].
|
||||||
|
txpool_write_handle: TxpoolWriteHandle,
|
||||||
// TODO: Improve the API of the cache service.
|
// TODO: Improve the API of the cache service.
|
||||||
// TODO: rename the cache service -> `BlockchainContextService`.
|
// TODO: rename the cache service -> `BlockchainContextService`.
|
||||||
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
|
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
//! The blockchain manager handler functions.
|
//! The blockchain manager handler functions.
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::{TryFutureExt, TryStreamExt};
|
use futures::{TryFutureExt, TryStreamExt};
|
||||||
use monero_serai::{block::Block, transaction::Transaction};
|
use monero_serai::{
|
||||||
|
block::Block,
|
||||||
|
transaction::{Input, Transaction},
|
||||||
|
};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::ops::ControlFlow;
|
use std::ops::ControlFlow;
|
||||||
use std::{collections::HashMap, sync::Arc};
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
@ -17,16 +20,14 @@ use cuprate_consensus::{
|
||||||
use cuprate_consensus_context::NewBlockData;
|
use cuprate_consensus_context::NewBlockData;
|
||||||
use cuprate_helper::cast::usize_to_u64;
|
use cuprate_helper::cast::usize_to_u64;
|
||||||
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
|
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
|
||||||
|
use cuprate_txpool::service::interface::TxpoolWriteRequest;
|
||||||
use cuprate_types::{
|
use cuprate_types::{
|
||||||
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
|
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
|
||||||
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::blockchain::manager::commands::IncomingBlockOk;
|
|
||||||
use crate::{
|
use crate::{
|
||||||
blockchain::{
|
blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
|
||||||
manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
|
|
||||||
},
|
|
||||||
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||||
signals::REORG_LOCK,
|
signals::REORG_LOCK,
|
||||||
};
|
};
|
||||||
|
@ -434,6 +435,18 @@ impl super::BlockchainManager {
|
||||||
&mut self,
|
&mut self,
|
||||||
verified_block: VerifiedBlockInformation,
|
verified_block: VerifiedBlockInformation,
|
||||||
) {
|
) {
|
||||||
|
// FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate.
|
||||||
|
let spent_key_images = verified_block
|
||||||
|
.txs
|
||||||
|
.iter()
|
||||||
|
.flat_map(|tx| {
|
||||||
|
tx.tx.prefix().inputs.iter().map(|input| match input {
|
||||||
|
Input::ToKey { key_image, .. } => key_image.compress().0,
|
||||||
|
Input::Gen(_) => unreachable!(),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Vec<[u8; 32]>>();
|
||||||
|
|
||||||
self.blockchain_context_service
|
self.blockchain_context_service
|
||||||
.ready()
|
.ready()
|
||||||
.await
|
.await
|
||||||
|
@ -472,6 +485,14 @@ impl super::BlockchainManager {
|
||||||
};
|
};
|
||||||
|
|
||||||
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
|
||||||
|
|
||||||
|
self.txpool_write_handle
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(TxpoolWriteRequest::NewBlock { spent_key_images })
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,10 @@
|
||||||
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
|
// FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
|
||||||
use std::{pin::pin, sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use tokio::time::interval;
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{mpsc, Notify},
|
sync::{mpsc, Notify},
|
||||||
time::sleep,
|
time::interval,
|
||||||
};
|
};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
|
|
|
@ -1,13 +1,7 @@
|
||||||
use std::task::{Context, Poll};
|
use tower::util::MapErr;
|
||||||
|
|
||||||
use futures::future::BoxFuture;
|
|
||||||
use futures::{FutureExt, TryFutureExt};
|
|
||||||
use tower::{util::MapErr, Service};
|
|
||||||
|
|
||||||
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
|
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
|
||||||
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
|
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
|
||||||
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
|
|
||||||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
|
||||||
|
|
||||||
/// The [`BlockVerifierService`] with all generic types defined.
|
/// The [`BlockVerifierService`] with all generic types defined.
|
||||||
pub type ConcreteBlockVerifierService = BlockVerifierService<
|
pub type ConcreteBlockVerifierService = BlockVerifierService<
|
||||||
|
|
|
@ -2,4 +2,7 @@
|
||||||
//!
|
//!
|
||||||
//! Will handle initiating the P2P and contains a protocol request handler.
|
//! Will handle initiating the P2P and contains a protocol request handler.
|
||||||
|
|
||||||
|
mod network_address;
|
||||||
pub mod request_handler;
|
pub mod request_handler;
|
||||||
|
|
||||||
|
pub use network_address::CrossNetworkInternalPeerId;
|
||||||
|
|
16
binaries/cuprated/src/p2p/network_address.rs
Normal file
16
binaries/cuprated/src/p2p/network_address.rs
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone};
|
||||||
|
|
||||||
|
/// An identifier for a P2P peer on any network.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub enum CrossNetworkInternalPeerId {
|
||||||
|
/// A clear-net peer.
|
||||||
|
ClearNet(InternalPeerID<<ClearNet as NetworkZone>::Addr>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<InternalPeerID<<ClearNet as NetworkZone>::Addr>> for CrossNetworkInternalPeerId {
|
||||||
|
fn from(addr: InternalPeerID<<ClearNet as NetworkZone>::Addr>) -> Self {
|
||||||
|
Self::ClearNet(addr)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,7 +1,7 @@
|
||||||
//! Global `static`s used throughout `cuprated`.
|
//! Global `static`s used throughout `cuprated`.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
sync::{atomic::AtomicU64, LazyLock},
|
sync::LazyLock,
|
||||||
time::{SystemTime, UNIX_EPOCH},
|
time::{SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,15 @@
|
||||||
//! Transaction Pool
|
//! Transaction Pool
|
||||||
//!
|
//!
|
||||||
//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
|
//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool.
|
||||||
|
use cuprate_consensus::BlockChainContextService;
|
||||||
|
use cuprate_p2p::NetworkInterface;
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
|
||||||
|
|
||||||
|
use crate::blockchain::ConcreteTxVerifierService;
|
||||||
|
|
||||||
|
mod dandelion;
|
||||||
|
mod incoming_tx;
|
||||||
|
mod txs_being_handled;
|
||||||
|
|
||||||
|
pub use incoming_tx::IncomingTxHandler;
|
||||||
|
|
65
binaries/cuprated/src/txpool/dandelion.rs
Normal file
65
binaries/cuprated/src/txpool/dandelion.rs
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use cuprate_dandelion_tower::{
|
||||||
|
pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
|
||||||
|
};
|
||||||
|
use cuprate_p2p::NetworkInterface;
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
p2p::CrossNetworkInternalPeerId,
|
||||||
|
txpool::incoming_tx::{DandelionTx, TxId},
|
||||||
|
};
|
||||||
|
|
||||||
|
mod diffuse_service;
|
||||||
|
mod stem_service;
|
||||||
|
mod tx_store;
|
||||||
|
|
||||||
|
/// The configuration used for [`cuprate_dandelion_tower`].
|
||||||
|
///
|
||||||
|
/// TODO: should we expose this to users of cuprated? probably not.
|
||||||
|
const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
|
||||||
|
time_between_hop: Duration::from_millis(175),
|
||||||
|
epoch_duration: Duration::from_secs(10 * 60),
|
||||||
|
fluff_probability: 0.12,
|
||||||
|
graph: Graph::FourRegular,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A [`DandelionRouter`] with all generic types defined.
|
||||||
|
type ConcreteDandelionRouter = DandelionRouter<
|
||||||
|
stem_service::OutboundPeerStream,
|
||||||
|
diffuse_service::DiffuseService,
|
||||||
|
CrossNetworkInternalPeerId,
|
||||||
|
stem_service::StemPeerService<ClearNet>,
|
||||||
|
DandelionTx,
|
||||||
|
>;
|
||||||
|
|
||||||
|
/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast.
|
||||||
|
pub fn start_dandelion_pool_manager(
|
||||||
|
router: ConcreteDandelionRouter,
|
||||||
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
|
||||||
|
cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
|
||||||
|
// TODO: make this constant configurable?
|
||||||
|
32,
|
||||||
|
router,
|
||||||
|
tx_store::TxStoreService {
|
||||||
|
txpool_read_handle,
|
||||||
|
txpool_write_handle,
|
||||||
|
},
|
||||||
|
DANDELION_CONFIG,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a [`DandelionRouter`] from a [`NetworkInterface`].
|
||||||
|
pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandelionRouter {
|
||||||
|
DandelionRouter::new(
|
||||||
|
diffuse_service::DiffuseService {
|
||||||
|
clear_net_broadcast_service: clear_net.broadcast_svc(),
|
||||||
|
},
|
||||||
|
stem_service::OutboundPeerStream { clear_net },
|
||||||
|
DANDELION_CONFIG,
|
||||||
|
)
|
||||||
|
}
|
44
binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
Normal file
44
binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
use std::{
|
||||||
|
future::{ready, Ready},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::FutureExt;
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
use cuprate_dandelion_tower::traits::DiffuseRequest;
|
||||||
|
use cuprate_p2p::{BroadcastRequest, BroadcastSvc};
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
|
||||||
|
use crate::txpool::dandelion::DandelionTx;
|
||||||
|
|
||||||
|
/// The dandelion diffusion service.
|
||||||
|
pub struct DiffuseService {
|
||||||
|
pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
|
||||||
|
type Response = ();
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.clear_net_broadcast_service
|
||||||
|
.poll_ready(cx)
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
|
||||||
|
// TODO: the dandelion crate should pass along where we got the tx from.
|
||||||
|
let Ok(()) = self
|
||||||
|
.clear_net_broadcast_service
|
||||||
|
.call(BroadcastRequest::Transaction {
|
||||||
|
tx_bytes: req.0 .0,
|
||||||
|
direction: None,
|
||||||
|
received_from: None,
|
||||||
|
})
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
68
binaries/cuprated/src/txpool/dandelion/stem_service.rs
Normal file
68
binaries/cuprated/src/txpool/dandelion/stem_service.rs
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
use std::{
|
||||||
|
pin::Pin,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::Stream;
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
|
||||||
|
use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
|
||||||
|
use cuprate_p2p_core::{
|
||||||
|
client::{Client, InternalPeerID},
|
||||||
|
ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
|
||||||
|
};
|
||||||
|
use cuprate_wire::protocol::NewTransactions;
|
||||||
|
|
||||||
|
use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
|
||||||
|
|
||||||
|
/// The dandelion outbound peer stream.
|
||||||
|
pub struct OutboundPeerStream {
|
||||||
|
pub clear_net: NetworkInterface<ClearNet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for OutboundPeerStream {
|
||||||
|
type Item = Result<
|
||||||
|
OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
|
||||||
|
tower::BoxError,
|
||||||
|
>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
// TODO: make the outbound peer choice random.
|
||||||
|
Poll::Ready(Some(Ok(self
|
||||||
|
.clear_net
|
||||||
|
.client_pool()
|
||||||
|
.outbound_client()
|
||||||
|
.map_or(OutboundPeer::Exhausted, |client| {
|
||||||
|
OutboundPeer::Peer(
|
||||||
|
CrossNetworkInternalPeerId::ClearNet(client.info.id),
|
||||||
|
StemPeerService(client),
|
||||||
|
)
|
||||||
|
}))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The stem service, used to send stem txs.
|
||||||
|
pub struct StemPeerService<N: NetworkZone>(ClientPoolDropGuard<N>);
|
||||||
|
|
||||||
|
impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
|
||||||
|
type Response = <Client<N> as Service<PeerRequest>>::Response;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = <Client<N> as Service<PeerRequest>>::Future;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.0.poll_ready(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
|
||||||
|
self.0
|
||||||
|
.call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
|
||||||
|
NewTransactions {
|
||||||
|
txs: vec![req.0 .0],
|
||||||
|
dandelionpp_fluff: false,
|
||||||
|
padding: Bytes::new(),
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
74
binaries/cuprated/src/txpool/dandelion/tx_store.rs
Normal file
74
binaries/cuprated/src/txpool/dandelion/tx_store.rs
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use cuprate_dandelion_tower::{
|
||||||
|
traits::{TxStoreRequest, TxStoreResponse},
|
||||||
|
State,
|
||||||
|
};
|
||||||
|
use cuprate_database::RuntimeError;
|
||||||
|
use cuprate_txpool::service::{
|
||||||
|
interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
|
||||||
|
TxpoolReadHandle, TxpoolWriteHandle,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{DandelionTx, TxId};
|
||||||
|
|
||||||
|
/// The dandelion tx-store service.
|
||||||
|
///
|
||||||
|
/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
|
||||||
|
pub struct TxStoreService {
|
||||||
|
pub txpool_read_handle: TxpoolReadHandle,
|
||||||
|
pub txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<TxStoreRequest<TxId>> for TxStoreService {
|
||||||
|
type Response = TxStoreResponse<DandelionTx>;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
|
||||||
|
match req {
|
||||||
|
TxStoreRequest::Get(tx_id) => self
|
||||||
|
.txpool_read_handle
|
||||||
|
.clone()
|
||||||
|
.oneshot(TxpoolReadRequest::TxBlob(tx_id))
|
||||||
|
.map(|res| match res {
|
||||||
|
Ok(TxpoolReadResponse::TxBlob {
|
||||||
|
tx_blob,
|
||||||
|
state_stem,
|
||||||
|
}) => {
|
||||||
|
let state = if state_stem {
|
||||||
|
State::Stem
|
||||||
|
} else {
|
||||||
|
State::Fluff
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(TxStoreResponse::Transaction(Some((
|
||||||
|
DandelionTx(Bytes::from(tx_blob)),
|
||||||
|
state,
|
||||||
|
))))
|
||||||
|
}
|
||||||
|
Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
Ok(_) => unreachable!(),
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
TxStoreRequest::Promote(tx_id) => self
|
||||||
|
.txpool_write_handle
|
||||||
|
.clone()
|
||||||
|
.oneshot(TxpoolWriteRequest::Promote(tx_id))
|
||||||
|
.map(|res| match res {
|
||||||
|
Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
|
||||||
|
Err(e) => Err(e.into()),
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
379
binaries/cuprated/src/txpool/incoming_tx.rs
Normal file
379
binaries/cuprated/src/txpool/incoming_tx.rs
Normal file
|
@ -0,0 +1,379 @@
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
sync::Arc,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
|
use monero_serai::transaction::Transaction;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use cuprate_consensus::{
|
||||||
|
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
|
||||||
|
BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
|
||||||
|
};
|
||||||
|
use cuprate_dandelion_tower::{
|
||||||
|
pool::{DandelionPoolService, IncomingTxBuilder},
|
||||||
|
State, TxState,
|
||||||
|
};
|
||||||
|
use cuprate_helper::asynch::rayon_spawn_async;
|
||||||
|
use cuprate_p2p::NetworkInterface;
|
||||||
|
use cuprate_p2p_core::ClearNet;
|
||||||
|
use cuprate_txpool::{
|
||||||
|
service::{
|
||||||
|
interface::{
|
||||||
|
TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
|
||||||
|
},
|
||||||
|
TxpoolReadHandle, TxpoolWriteHandle,
|
||||||
|
},
|
||||||
|
transaction_blob_hash,
|
||||||
|
};
|
||||||
|
use cuprate_types::TransactionVerificationData;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
blockchain::ConcreteTxVerifierService,
|
||||||
|
constants::PANIC_CRITICAL_SERVICE_ERROR,
|
||||||
|
p2p::CrossNetworkInternalPeerId,
|
||||||
|
signals::REORG_LOCK,
|
||||||
|
txpool::{
|
||||||
|
dandelion,
|
||||||
|
txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// An error that can happen handling an incoming tx.
|
||||||
|
pub enum IncomingTxError {
|
||||||
|
Parse(std::io::Error),
|
||||||
|
Consensus(ExtendedConsensusError),
|
||||||
|
DuplicateTransaction,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Incoming transactions.
|
||||||
|
pub struct IncomingTxs {
|
||||||
|
/// The raw bytes of the transactions.
|
||||||
|
pub txs: Vec<Bytes>,
|
||||||
|
/// The routing state of the transactions.
|
||||||
|
pub state: TxState<CrossNetworkInternalPeerId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The transaction type used for dandelion++.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DandelionTx(pub Bytes);
|
||||||
|
|
||||||
|
/// A transaction ID/hash.
|
||||||
|
pub(super) type TxId = [u8; 32];
|
||||||
|
|
||||||
|
/// The service than handles incoming transaction pool transactions.
|
||||||
|
///
|
||||||
|
/// This service handles everything including verifying the tx, adding it to the pool and routing it to other nodes.
|
||||||
|
pub struct IncomingTxHandler {
|
||||||
|
/// A store of txs currently being handled in incoming tx requests.
|
||||||
|
pub(super) txs_being_handled: TxsBeingHandled,
|
||||||
|
/// The blockchain context cache.
|
||||||
|
pub(super) blockchain_context_cache: BlockChainContextService,
|
||||||
|
/// The dandelion txpool manager.
|
||||||
|
pub(super) dandelion_pool_manager:
|
||||||
|
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
|
||||||
|
/// The transaction verifier service.
|
||||||
|
pub(super) tx_verifier_service: ConcreteTxVerifierService,
|
||||||
|
/// The txpool write handle.
|
||||||
|
pub(super) txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
/// The txpool read handle.
|
||||||
|
pub(super) txpool_read_handle: TxpoolReadHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IncomingTxHandler {
|
||||||
|
/// Initialize the [`IncomingTxHandler`].
|
||||||
|
#[expect(clippy::significant_drop_tightening)]
|
||||||
|
pub fn init(
|
||||||
|
clear_net: NetworkInterface<ClearNet>,
|
||||||
|
txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
txpool_read_handle: TxpoolReadHandle,
|
||||||
|
blockchain_context_cache: BlockChainContextService,
|
||||||
|
tx_verifier_service: ConcreteTxVerifierService,
|
||||||
|
) -> Self {
|
||||||
|
let dandelion_router = dandelion::dandelion_router(clear_net);
|
||||||
|
|
||||||
|
let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
|
||||||
|
dandelion_router,
|
||||||
|
txpool_read_handle.clone(),
|
||||||
|
txpool_write_handle.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
Self {
|
||||||
|
txs_being_handled: TxsBeingHandled::new(),
|
||||||
|
blockchain_context_cache,
|
||||||
|
dandelion_pool_manager,
|
||||||
|
tx_verifier_service,
|
||||||
|
txpool_write_handle,
|
||||||
|
txpool_read_handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service<IncomingTxs> for IncomingTxHandler {
|
||||||
|
type Response = ();
|
||||||
|
type Error = IncomingTxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: IncomingTxs) -> Self::Future {
|
||||||
|
handle_incoming_txs(
|
||||||
|
req,
|
||||||
|
self.txs_being_handled.clone(),
|
||||||
|
self.blockchain_context_cache.clone(),
|
||||||
|
self.tx_verifier_service.clone(),
|
||||||
|
self.txpool_write_handle.clone(),
|
||||||
|
self.txpool_read_handle.clone(),
|
||||||
|
self.dandelion_pool_manager.clone(),
|
||||||
|
)
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles the incoming txs.
|
||||||
|
async fn handle_incoming_txs(
|
||||||
|
IncomingTxs { txs, state }: IncomingTxs,
|
||||||
|
txs_being_handled: TxsBeingHandled,
|
||||||
|
mut blockchain_context_cache: BlockChainContextService,
|
||||||
|
mut tx_verifier_service: ConcreteTxVerifierService,
|
||||||
|
mut txpool_write_handle: TxpoolWriteHandle,
|
||||||
|
mut txpool_read_handle: TxpoolReadHandle,
|
||||||
|
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
|
||||||
|
) -> Result<(), IncomingTxError> {
|
||||||
|
let _reorg_guard = REORG_LOCK.read().await;
|
||||||
|
|
||||||
|
let (txs, stem_pool_txs, txs_being_handled_guard) =
|
||||||
|
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
|
||||||
|
|
||||||
|
let BlockChainContextResponse::Context(context) = blockchain_context_cache
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(BlockChainContextRequest::Context)
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
let context = context.unchecked_blockchain_context();
|
||||||
|
|
||||||
|
tx_verifier_service
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(VerifyTxRequest::Prepped {
|
||||||
|
txs: txs.clone(),
|
||||||
|
current_chain_height: context.chain_height,
|
||||||
|
top_hash: context.top_hash,
|
||||||
|
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
||||||
|
hf: context.current_hf,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(IncomingTxError::Consensus)?;
|
||||||
|
|
||||||
|
for tx in txs {
|
||||||
|
handle_valid_tx(
|
||||||
|
tx,
|
||||||
|
state.clone(),
|
||||||
|
&mut txpool_write_handle,
|
||||||
|
&mut dandelion_pool_manager,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-relay any txs we got in the block that were already in our stem pool.
|
||||||
|
for stem_tx in stem_pool_txs {
|
||||||
|
rerelay_stem_tx(
|
||||||
|
&stem_tx,
|
||||||
|
state.clone(),
|
||||||
|
&mut txpool_read_handle,
|
||||||
|
&mut dandelion_pool_manager,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prepares the incoming transactions for verification.
|
||||||
|
///
|
||||||
|
/// This will filter out all transactions already in the pool or txs already being handled in another request.
|
||||||
|
///
|
||||||
|
/// Returns in order:
|
||||||
|
/// - The [`TransactionVerificationData`] for all the txs we did not already have
|
||||||
|
/// - The Ids of the transactions in the incoming message that are in our stem-pool
|
||||||
|
/// - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across 2 tasks.
|
||||||
|
async fn prepare_incoming_txs(
|
||||||
|
tx_blobs: Vec<Bytes>,
|
||||||
|
txs_being_handled: TxsBeingHandled,
|
||||||
|
txpool_read_handle: &mut TxpoolReadHandle,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
Vec<Arc<TransactionVerificationData>>,
|
||||||
|
Vec<TxId>,
|
||||||
|
TxsBeingHandledLocally,
|
||||||
|
),
|
||||||
|
IncomingTxError,
|
||||||
|
> {
|
||||||
|
let mut tx_blob_hashes = HashSet::new();
|
||||||
|
let mut txs_being_handled_locally = txs_being_handled.local_tracker();
|
||||||
|
|
||||||
|
// Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
|
||||||
|
let txs = tx_blobs
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|tx_blob| {
|
||||||
|
let tx_blob_hash = transaction_blob_hash(&tx_blob);
|
||||||
|
|
||||||
|
// If a duplicate is in here the incoming tx batch contained the same tx twice.
|
||||||
|
if !tx_blob_hashes.insert(tx_blob_hash) {
|
||||||
|
return Some(Err(IncomingTxError::DuplicateTransaction));
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a duplicate is here it is being handled in another batch.
|
||||||
|
if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(Ok((tx_blob_hash, tx_blob)))
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
// Filter the txs already in the txpool out.
|
||||||
|
// This will leave the txs already in the pool in [`TxBeingHandledLocally`] but that shouldn't be an issue.
|
||||||
|
let TxpoolReadResponse::FilterKnownTxBlobHashes {
|
||||||
|
unknown_blob_hashes,
|
||||||
|
stem_pool_hashes,
|
||||||
|
} = txpool_read_handle
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Now prepare the txs for verification.
|
||||||
|
rayon_spawn_async(move || {
|
||||||
|
let txs = txs
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(tx_blob_hash, tx_blob)| {
|
||||||
|
if unknown_blob_hashes.contains(&tx_blob_hash) {
|
||||||
|
Some(tx_blob)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.map(|bytes| {
|
||||||
|
let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
|
||||||
|
|
||||||
|
let tx = new_tx_verification_data(tx)
|
||||||
|
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
|
||||||
|
|
||||||
|
Ok(Arc::new(tx))
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, IncomingTxError>>()?;
|
||||||
|
|
||||||
|
Ok((txs, stem_pool_hashes, txs_being_handled_locally))
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a verified tx.
|
||||||
|
///
|
||||||
|
/// This will add the tx to the txpool and route it to the network.
|
||||||
|
async fn handle_valid_tx(
|
||||||
|
tx: Arc<TransactionVerificationData>,
|
||||||
|
state: TxState<CrossNetworkInternalPeerId>,
|
||||||
|
txpool_write_handle: &mut TxpoolWriteHandle,
|
||||||
|
dandelion_pool_manager: &mut DandelionPoolService<
|
||||||
|
DandelionTx,
|
||||||
|
TxId,
|
||||||
|
CrossNetworkInternalPeerId,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
let incoming_tx =
|
||||||
|
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
|
||||||
|
|
||||||
|
let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(TxpoolWriteRequest::AddTransaction {
|
||||||
|
tx,
|
||||||
|
state_stem: state.is_stem_stage(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("TODO")
|
||||||
|
else {
|
||||||
|
unreachable!()
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: track double spends to quickly ignore them from their blob hash.
|
||||||
|
if let Some(tx_hash) = double_spend {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: There is a race condition possible if a tx and block come in at the same time: <https://github.com/Cuprate/cuprate/issues/314>.
|
||||||
|
|
||||||
|
let incoming_tx = incoming_tx
|
||||||
|
.with_routing_state(state)
|
||||||
|
.with_state_in_db(None)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
dandelion_pool_manager
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(incoming_tx)
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-relay a tx that was already in our stem pool.
|
||||||
|
async fn rerelay_stem_tx(
|
||||||
|
tx_hash: &TxId,
|
||||||
|
state: TxState<CrossNetworkInternalPeerId>,
|
||||||
|
txpool_read_handle: &mut TxpoolReadHandle,
|
||||||
|
dandelion_pool_manager: &mut DandelionPoolService<
|
||||||
|
DandelionTx,
|
||||||
|
TxId,
|
||||||
|
CrossNetworkInternalPeerId,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(TxpoolReadRequest::TxBlob(*tx_hash))
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
// The tx could have been dropped from the pool.
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let incoming_tx =
|
||||||
|
IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
|
||||||
|
|
||||||
|
let incoming_tx = incoming_tx
|
||||||
|
.with_routing_state(state)
|
||||||
|
.with_state_in_db(Some(State::Stem))
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
dandelion_pool_manager
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR)
|
||||||
|
.call(incoming_tx)
|
||||||
|
.await
|
||||||
|
.expect(PANIC_CRITICAL_SERVICE_ERROR);
|
||||||
|
}
|
53
binaries/cuprated/src/txpool/txs_being_handled.rs
Normal file
53
binaries/cuprated/src/txpool/txs_being_handled.rs
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use dashmap::DashSet;
|
||||||
|
|
||||||
|
/// A set of txs currently being handled, shared between instances of the incoming tx handler.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
|
||||||
|
|
||||||
|
impl TxsBeingHandled {
|
||||||
|
/// Create a new [`TxsBeingHandled`]
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self(Arc::new(DashSet::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new [`TxsBeingHandledLocally`] that will keep track of txs being handled in a request.
|
||||||
|
pub fn local_tracker(&self) -> TxsBeingHandledLocally {
|
||||||
|
TxsBeingHandledLocally {
|
||||||
|
txs_being_handled: self.clone(),
|
||||||
|
txs: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A tracker of txs being handled in a single request. This will add the txs to the global [`TxsBeingHandled`]
|
||||||
|
/// tracker as well.
|
||||||
|
///
|
||||||
|
/// When this is dropped the txs will be removed from [`TxsBeingHandled`].
|
||||||
|
pub struct TxsBeingHandledLocally {
|
||||||
|
txs_being_handled: TxsBeingHandled,
|
||||||
|
txs: Vec<[u8; 32]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxsBeingHandledLocally {
|
||||||
|
/// Try add a tx to the map from its [`transaction_blob_hash`](cuprate_txpool::transaction_blob_hash).
|
||||||
|
///
|
||||||
|
/// Returns `true` if the tx was added and `false` if another task is already handling this tx.
|
||||||
|
pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
|
||||||
|
if !self.txs_being_handled.0.insert(tx_blob_hash) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.txs.push(tx_blob_hash);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TxsBeingHandledLocally {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
for hash in &self.txs {
|
||||||
|
self.txs_being_handled.0.remove(hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -73,6 +73,15 @@ pub enum TxState<Id> {
|
||||||
Local,
|
Local,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Id> TxState<Id> {
|
||||||
|
/// Returns `true` if the tx is in the stem stage.
|
||||||
|
///
|
||||||
|
/// [`TxState::Local`] & [`TxState::Stem`] are the 2 stem stage states.
|
||||||
|
pub const fn is_stem_stage(&self) -> bool {
|
||||||
|
matches!(self, Self::Local | Self::Stem { .. })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A request to route a transaction.
|
/// A request to route a transaction.
|
||||||
pub struct DandelionRouteReq<Tx, Id> {
|
pub struct DandelionRouteReq<Tx, Id> {
|
||||||
/// The transaction.
|
/// The transaction.
|
||||||
|
|
|
@ -18,13 +18,13 @@ use tracing::{Instrument, Span};
|
||||||
use cuprate_p2p_core::{
|
use cuprate_p2p_core::{
|
||||||
client::{Client, InternalPeerID},
|
client::{Client, InternalPeerID},
|
||||||
handles::ConnectionHandle,
|
handles::ConnectionHandle,
|
||||||
NetworkZone,
|
ConnectionDirection, NetworkZone,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(crate) mod disconnect_monitor;
|
pub(crate) mod disconnect_monitor;
|
||||||
mod drop_guard_client;
|
mod drop_guard_client;
|
||||||
|
|
||||||
pub(crate) use drop_guard_client::ClientPoolDropGuard;
|
pub use drop_guard_client::ClientPoolDropGuard;
|
||||||
|
|
||||||
/// The client pool, which holds currently connected free peers.
|
/// The client pool, which holds currently connected free peers.
|
||||||
///
|
///
|
||||||
|
@ -165,6 +165,17 @@ impl<N: NetworkZone> ClientPool<N> {
|
||||||
sync_data.cumulative_difficulty() > cumulative_difficulty
|
sync_data.cumulative_difficulty() > cumulative_difficulty
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the first outbound peer when iterating over the peers.
|
||||||
|
pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
|
||||||
|
let client = self
|
||||||
|
.clients
|
||||||
|
.iter()
|
||||||
|
.find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
|
||||||
|
let id = *client.key();
|
||||||
|
|
||||||
|
Some(self.borrow_client(&id).unwrap())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mod sealed {
|
mod sealed {
|
||||||
|
|
|
@ -18,7 +18,7 @@ use cuprate_p2p_core::{
|
||||||
|
|
||||||
pub mod block_downloader;
|
pub mod block_downloader;
|
||||||
mod broadcast;
|
mod broadcast;
|
||||||
mod client_pool;
|
pub mod client_pool;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod connection_maintainer;
|
pub mod connection_maintainer;
|
||||||
pub mod constants;
|
pub mod constants;
|
||||||
|
@ -26,6 +26,7 @@ mod inbound_server;
|
||||||
|
|
||||||
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
|
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
|
||||||
pub use broadcast::{BroadcastRequest, BroadcastSvc};
|
pub use broadcast::{BroadcastRequest, BroadcastSvc};
|
||||||
|
pub use client_pool::{ClientPool, ClientPoolDropGuard};
|
||||||
pub use config::{AddressBookConfig, P2PConfig};
|
pub use config::{AddressBookConfig, P2PConfig};
|
||||||
use connection_maintainer::MakeConnectionRequest;
|
use connection_maintainer::MakeConnectionRequest;
|
||||||
|
|
||||||
|
@ -82,7 +83,7 @@ where
|
||||||
|
|
||||||
let outbound_handshaker = outbound_handshaker_builder.build();
|
let outbound_handshaker = outbound_handshaker_builder.build();
|
||||||
|
|
||||||
let client_pool = client_pool::ClientPool::new();
|
let client_pool = ClientPool::new();
|
||||||
|
|
||||||
let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
|
let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
|
||||||
|
|
||||||
|
@ -132,7 +133,7 @@ where
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct NetworkInterface<N: NetworkZone> {
|
pub struct NetworkInterface<N: NetworkZone> {
|
||||||
/// A pool of free connected peers.
|
/// A pool of free connected peers.
|
||||||
pool: Arc<client_pool::ClientPool<N>>,
|
pool: Arc<ClientPool<N>>,
|
||||||
/// A [`Service`] that allows broadcasting to all connected peers.
|
/// A [`Service`] that allows broadcasting to all connected peers.
|
||||||
broadcast_svc: BroadcastSvc<N>,
|
broadcast_svc: BroadcastSvc<N>,
|
||||||
/// A channel to request extra connections.
|
/// A channel to request extra connections.
|
||||||
|
@ -173,7 +174,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Borrows the `ClientPool`, for access to connected peers.
|
/// Borrows the `ClientPool`, for access to connected peers.
|
||||||
pub const fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
|
pub const fn client_pool(&self) -> &Arc<ClientPool<N>> {
|
||||||
&self.pool
|
&self.pool
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,14 @@ pub struct DatabaseWriteHandle<Req, Res> {
|
||||||
crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
|
crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
sender: self.sender.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<Req, Res> DatabaseWriteHandle<Req, Res>
|
impl<Req, Res> DatabaseWriteHandle<Req, Res>
|
||||||
where
|
where
|
||||||
Req: Send + 'static,
|
Req: Send + 'static,
|
||||||
|
|
|
@ -29,6 +29,7 @@ bytemuck = { workspace = true, features = ["must_cast", "derive"
|
||||||
bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] }
|
bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
hex = { workspace = true }
|
hex = { workspace = true }
|
||||||
|
blake3 = { workspace = true, features = ["std"] }
|
||||||
|
|
||||||
tower = { workspace = true, optional = true }
|
tower = { workspace = true, optional = true }
|
||||||
rayon = { workspace = true, optional = true }
|
rayon = { workspace = true, optional = true }
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
//---------------------------------------------------------------------------------------------------- Import
|
//---------------------------------------------------------------------------------------------------- Import
|
||||||
use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw};
|
use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw};
|
||||||
|
|
||||||
use crate::{config::Config, tables::OpenTables};
|
use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- Free functions
|
//---------------------------------------------------------------------------------------------------- Free functions
|
||||||
/// Open the txpool database using the passed [`Config`].
|
/// Open the txpool database using the passed [`Config`].
|
||||||
|
@ -60,3 +60,13 @@ pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
|
||||||
|
|
||||||
Ok(env)
|
Ok(env)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Calculate the transaction blob hash.
|
||||||
|
///
|
||||||
|
/// This value is supposed to be quick to compute just based of the tx-blob without needing to parse the tx.
|
||||||
|
///
|
||||||
|
/// The exact way the hash is calculated is not stable and is subject to change, as such it should not be exposed
|
||||||
|
/// as a way to interact with Cuprate externally.
|
||||||
|
pub fn transaction_blob_hash(tx_blob: &[u8]) -> TransactionBlobHash {
|
||||||
|
blake3::hash(tx_blob).into()
|
||||||
|
}
|
||||||
|
|
|
@ -14,7 +14,7 @@ mod tx;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
|
|
||||||
pub use config::Config;
|
pub use config::Config;
|
||||||
pub use free::open;
|
pub use free::{open, transaction_blob_hash};
|
||||||
pub use tx::{BlockTemplateTxEntry, TxEntry};
|
pub use tx::{BlockTemplateTxEntry, TxEntry};
|
||||||
|
|
||||||
//re-exports
|
//re-exports
|
||||||
|
|
|
@ -85,7 +85,7 @@ mod key_images;
|
||||||
mod tx_read;
|
mod tx_read;
|
||||||
mod tx_write;
|
mod tx_write;
|
||||||
|
|
||||||
pub use tx_read::get_transaction_verification_data;
|
pub use tx_read::{get_transaction_verification_data, in_stem_pool};
|
||||||
pub use tx_write::{add_transaction, remove_transaction};
|
pub use tx_write::{add_transaction, remove_transaction};
|
||||||
|
|
||||||
/// An error that can occur on some tx-write ops.
|
/// An error that can occur on some tx-write ops.
|
||||||
|
|
|
@ -8,7 +8,10 @@ use monero_serai::transaction::Transaction;
|
||||||
use cuprate_database::{DatabaseRo, RuntimeError};
|
use cuprate_database::{DatabaseRo, RuntimeError};
|
||||||
use cuprate_types::{TransactionVerificationData, TxVersion};
|
use cuprate_types::{TransactionVerificationData, TxVersion};
|
||||||
|
|
||||||
use crate::{tables::Tables, types::TransactionHash};
|
use crate::{
|
||||||
|
tables::{Tables, TransactionInfos},
|
||||||
|
types::{TransactionHash, TxStateFlags},
|
||||||
|
};
|
||||||
|
|
||||||
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
|
/// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
|
||||||
pub fn get_transaction_verification_data(
|
pub fn get_transaction_verification_data(
|
||||||
|
@ -34,3 +37,17 @@ pub fn get_transaction_verification_data(
|
||||||
cached_verification_state: Mutex::new(cached_verification_state),
|
cached_verification_state: Mutex::new(cached_verification_state),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if the transaction with the given hash is in the stem pool.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// This will return an [`Err`] if the transaction is not in the pool.
|
||||||
|
pub fn in_stem_pool(
|
||||||
|
tx_hash: &TransactionHash,
|
||||||
|
tx_infos: &impl DatabaseRo<TransactionInfos>,
|
||||||
|
) -> Result<bool, RuntimeError> {
|
||||||
|
Ok(tx_infos
|
||||||
|
.get(tx_hash)?
|
||||||
|
.flags
|
||||||
|
.contains(TxStateFlags::STATE_STEM))
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ use cuprate_database::{DatabaseRw, RuntimeError, StorableVec};
|
||||||
use cuprate_types::TransactionVerificationData;
|
use cuprate_types::TransactionVerificationData;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
free::transaction_blob_hash,
|
||||||
ops::{
|
ops::{
|
||||||
key_images::{add_tx_key_images, remove_tx_key_images},
|
key_images::{add_tx_key_images, remove_tx_key_images},
|
||||||
TxPoolWriteError,
|
TxPoolWriteError,
|
||||||
|
@ -56,6 +57,12 @@ pub fn add_transaction(
|
||||||
let kis_table = tables.spent_key_images_mut();
|
let kis_table = tables.spent_key_images_mut();
|
||||||
add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?;
|
add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?;
|
||||||
|
|
||||||
|
// Add the blob hash to table 4.
|
||||||
|
let blob_hash = transaction_blob_hash(&tx.tx_blob);
|
||||||
|
tables
|
||||||
|
.known_blob_hashes_mut()
|
||||||
|
.put(&blob_hash, &tx.tx_hash)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,5 +86,9 @@ pub fn remove_transaction(
|
||||||
let kis_table = tables.spent_key_images_mut();
|
let kis_table = tables.spent_key_images_mut();
|
||||||
remove_tx_key_images(&tx.prefix().inputs, kis_table)?;
|
remove_tx_key_images(&tx.prefix().inputs, kis_table)?;
|
||||||
|
|
||||||
|
// Remove the blob hash from table 4.
|
||||||
|
let blob_hash = transaction_blob_hash(&tx_blob);
|
||||||
|
tables.known_blob_hashes_mut().delete(&blob_hash)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,24 +1,36 @@
|
||||||
//! Tx-pool [`service`](super) interface.
|
//! Tx-pool [`service`](super) interface.
|
||||||
//!
|
//!
|
||||||
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
|
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
|
||||||
use std::sync::Arc;
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use cuprate_types::TransactionVerificationData;
|
use cuprate_types::TransactionVerificationData;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
tx::{BlockTemplateTxEntry, TxEntry},
|
tx::{BlockTemplateTxEntry, TxEntry},
|
||||||
types::TransactionHash,
|
types::{KeyImage, TransactionBlobHash, TransactionHash},
|
||||||
};
|
};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- TxpoolReadRequest
|
//---------------------------------------------------------------------------------------------------- TxpoolReadRequest
|
||||||
/// The transaction pool [`tower::Service`] read request type.
|
/// The transaction pool [`tower::Service`] read request type.
|
||||||
|
#[derive(Clone)]
|
||||||
pub enum TxpoolReadRequest {
|
pub enum TxpoolReadRequest {
|
||||||
/// A request for the blob (raw bytes) of a transaction with the given hash.
|
/// Get the blob (raw bytes) of a transaction with the given hash.
|
||||||
TxBlob(TransactionHash),
|
TxBlob(TransactionHash),
|
||||||
|
|
||||||
/// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
|
/// Get the [`TransactionVerificationData`] of a transaction in the tx pool.
|
||||||
TxVerificationData(TransactionHash),
|
TxVerificationData(TransactionHash),
|
||||||
|
|
||||||
|
/// Filter (remove) all **known** transactions from the set.
|
||||||
|
///
|
||||||
|
/// The hash is **not** the transaction hash, it is the hash of the serialized tx-blob.
|
||||||
|
FilterKnownTxBlobHashes(HashSet<TransactionBlobHash>),
|
||||||
|
|
||||||
|
/// Get some transactions for an incoming block.
|
||||||
|
TxsForBlock(Vec<TransactionHash>),
|
||||||
|
|
||||||
/// Get information on all transactions in the pool.
|
/// Get information on all transactions in the pool.
|
||||||
Backlog,
|
Backlog,
|
||||||
|
|
||||||
|
@ -40,15 +52,28 @@ pub enum TxpoolReadRequest {
|
||||||
/// The transaction pool [`tower::Service`] read response type.
|
/// The transaction pool [`tower::Service`] read response type.
|
||||||
#[expect(clippy::large_enum_variant)]
|
#[expect(clippy::large_enum_variant)]
|
||||||
pub enum TxpoolReadResponse {
|
pub enum TxpoolReadResponse {
|
||||||
/// Response to [`TxpoolReadRequest::TxBlob`].
|
/// The response for [`TxpoolReadRequest::TxBlob`].
|
||||||
///
|
TxBlob { tx_blob: Vec<u8>, state_stem: bool },
|
||||||
/// The inner value is the raw bytes of a transaction.
|
|
||||||
// TODO: use bytes::Bytes.
|
|
||||||
TxBlob(Vec<u8>),
|
|
||||||
|
|
||||||
/// Response to [`TxpoolReadRequest::TxVerificationData`].
|
/// The response for [`TxpoolReadRequest::TxVerificationData`].
|
||||||
TxVerificationData(TransactionVerificationData),
|
TxVerificationData(TransactionVerificationData),
|
||||||
|
|
||||||
|
/// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
|
||||||
|
FilterKnownTxBlobHashes {
|
||||||
|
/// The blob hashes that are unknown.
|
||||||
|
unknown_blob_hashes: HashSet<TransactionBlobHash>,
|
||||||
|
/// The tx hashes of the blob hashes that were known but were in the stem pool.
|
||||||
|
stem_pool_hashes: Vec<TransactionHash>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// The response for [`TxpoolReadRequest::TxsForBlock`].
|
||||||
|
TxsForBlock {
|
||||||
|
/// The txs we had in the txpool.
|
||||||
|
txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||||
|
/// The indexes of the missing txs.
|
||||||
|
missing: Vec<usize>,
|
||||||
|
},
|
||||||
|
|
||||||
/// Response to [`TxpoolReadRequest::Backlog`].
|
/// Response to [`TxpoolReadRequest::Backlog`].
|
||||||
///
|
///
|
||||||
/// The inner [`Vec`] contains information on all
|
/// The inner [`Vec`] contains information on all
|
||||||
|
@ -84,9 +109,17 @@ pub enum TxpoolWriteRequest {
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Remove a transaction with the given hash from the pool.
|
/// Remove a transaction with the given hash from the pool.
|
||||||
///
|
|
||||||
/// Returns [`TxpoolWriteResponse::Ok`].
|
|
||||||
RemoveTransaction(TransactionHash),
|
RemoveTransaction(TransactionHash),
|
||||||
|
|
||||||
|
/// Promote a transaction from the stem pool to the fluff pool.
|
||||||
|
/// If the tx is already in the fluff pool this does nothing.
|
||||||
|
Promote(TransactionHash),
|
||||||
|
|
||||||
|
/// Tell the tx-pool about a new block.
|
||||||
|
NewBlock {
|
||||||
|
/// The spent key images in the new block.
|
||||||
|
spent_key_images: Vec<KeyImage>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- TxpoolWriteResponse
|
//---------------------------------------------------------------------------------------------------- TxpoolWriteResponse
|
||||||
|
@ -95,6 +128,8 @@ pub enum TxpoolWriteRequest {
|
||||||
pub enum TxpoolWriteResponse {
|
pub enum TxpoolWriteResponse {
|
||||||
/// Response to:
|
/// Response to:
|
||||||
/// - [`TxpoolWriteRequest::RemoveTransaction`]
|
/// - [`TxpoolWriteRequest::RemoveTransaction`]
|
||||||
|
/// - [`TxpoolWriteRequest::Promote`]
|
||||||
|
/// - [`TxpoolWriteRequest::NewBlock`]
|
||||||
Ok,
|
Ok,
|
||||||
|
|
||||||
/// Response to [`TxpoolWriteRequest::AddTransaction`].
|
/// Response to [`TxpoolWriteRequest::AddTransaction`].
|
||||||
|
|
|
@ -4,22 +4,24 @@
|
||||||
clippy::unnecessary_wraps,
|
clippy::unnecessary_wraps,
|
||||||
reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
|
reason = "TODO: finish implementing the signatures from <https://github.com/Cuprate/cuprate/pull/297>"
|
||||||
)]
|
)]
|
||||||
|
use std::{
|
||||||
use std::sync::Arc;
|
collections::{HashMap, HashSet},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
|
|
||||||
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner};
|
use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
|
||||||
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
|
use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
ops::get_transaction_verification_data,
|
ops::{get_transaction_verification_data, in_stem_pool},
|
||||||
service::{
|
service::{
|
||||||
interface::{TxpoolReadRequest, TxpoolReadResponse},
|
interface::{TxpoolReadRequest, TxpoolReadResponse},
|
||||||
types::{ReadResponseResult, TxpoolReadHandle},
|
types::{ReadResponseResult, TxpoolReadHandle},
|
||||||
},
|
},
|
||||||
tables::{OpenTables, TransactionBlobs},
|
tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
|
||||||
types::TransactionHash,
|
types::{TransactionBlobHash, TransactionHash},
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: update the docs here
|
// TODO: update the docs here
|
||||||
|
@ -57,7 +59,6 @@ fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) ->
|
||||||
/// 1. `Request` is mapped to a handler function
|
/// 1. `Request` is mapped to a handler function
|
||||||
/// 2. Handler function is called
|
/// 2. Handler function is called
|
||||||
/// 3. [`TxpoolReadResponse`] is returned
|
/// 3. [`TxpoolReadResponse`] is returned
|
||||||
#[expect(clippy::needless_pass_by_value)]
|
|
||||||
fn map_request(
|
fn map_request(
|
||||||
env: &ConcreteEnv, // Access to the database
|
env: &ConcreteEnv, // Access to the database
|
||||||
request: TxpoolReadRequest, // The request we must fulfill
|
request: TxpoolReadRequest, // The request we must fulfill
|
||||||
|
@ -65,6 +66,10 @@ fn map_request(
|
||||||
match request {
|
match request {
|
||||||
TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
|
TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
|
||||||
TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
|
TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
|
||||||
|
TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
|
||||||
|
filter_known_tx_blob_hashes(env, blob_hashes)
|
||||||
|
}
|
||||||
|
TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
|
||||||
TxpoolReadRequest::Backlog => backlog(env),
|
TxpoolReadRequest::Backlog => backlog(env),
|
||||||
TxpoolReadRequest::BlockTemplateBacklog => block_template_backlog(env),
|
TxpoolReadRequest::BlockTemplateBacklog => block_template_backlog(env),
|
||||||
TxpoolReadRequest::Size {
|
TxpoolReadRequest::Size {
|
||||||
|
@ -97,10 +102,14 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult {
|
||||||
let tx_ro = inner_env.tx_ro()?;
|
let tx_ro = inner_env.tx_ro()?;
|
||||||
|
|
||||||
let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
|
let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
|
||||||
|
let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
|
||||||
|
|
||||||
tx_blobs_table
|
let tx_blob = tx_blobs_table.get(tx_hash)?.0;
|
||||||
.get(tx_hash)
|
|
||||||
.map(|blob| TxpoolReadResponse::TxBlob(blob.0))
|
Ok(TxpoolReadResponse::TxBlob {
|
||||||
|
tx_blob,
|
||||||
|
state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// [`TxpoolReadRequest::TxVerificationData`].
|
/// [`TxpoolReadRequest::TxVerificationData`].
|
||||||
|
@ -114,6 +123,79 @@ fn tx_verification_data(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadRes
|
||||||
get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
|
get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
|
||||||
|
fn filter_known_tx_blob_hashes(
|
||||||
|
env: &ConcreteEnv,
|
||||||
|
mut blob_hashes: HashSet<TransactionBlobHash>,
|
||||||
|
) -> ReadResponseResult {
|
||||||
|
let inner_env = env.env_inner();
|
||||||
|
let tx_ro = inner_env.tx_ro()?;
|
||||||
|
|
||||||
|
let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
|
||||||
|
let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
|
||||||
|
|
||||||
|
let mut stem_pool_hashes = Vec::new();
|
||||||
|
|
||||||
|
// A closure that returns `true` if a tx with a certain blob hash is unknown.
|
||||||
|
// This also fills in `stem_tx_hashes`.
|
||||||
|
let mut tx_unknown = |blob_hash| -> Result<bool, RuntimeError> {
|
||||||
|
match tx_blob_hashes.get(&blob_hash) {
|
||||||
|
Ok(tx_hash) => {
|
||||||
|
if in_stem_pool(&tx_hash, &tx_infos)? {
|
||||||
|
stem_pool_hashes.push(tx_hash);
|
||||||
|
}
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
Err(RuntimeError::KeyNotFound) => Ok(true),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut err = None;
|
||||||
|
blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(e) => {
|
||||||
|
err = Some(e);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Some(e) = err {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
|
||||||
|
unknown_blob_hashes: blob_hashes,
|
||||||
|
stem_pool_hashes,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`TxpoolReadRequest::TxsForBlock`].
|
||||||
|
fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
|
||||||
|
let inner_env = env.env_inner();
|
||||||
|
let tx_ro = inner_env.tx_ro()?;
|
||||||
|
|
||||||
|
let tables = inner_env.open_tables(&tx_ro)?;
|
||||||
|
|
||||||
|
let mut missing_tx_indexes = Vec::with_capacity(txs.len());
|
||||||
|
let mut txs_verification_data = HashMap::with_capacity(txs.len());
|
||||||
|
|
||||||
|
for (i, tx_hash) in txs.into_iter().enumerate() {
|
||||||
|
match get_transaction_verification_data(&tx_hash, &tables) {
|
||||||
|
Ok(tx) => {
|
||||||
|
txs_verification_data.insert(tx_hash, tx);
|
||||||
|
}
|
||||||
|
Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(TxpoolReadResponse::TxsForBlock {
|
||||||
|
txs: txs_verification_data,
|
||||||
|
missing: missing_tx_indexes,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// [`TxpoolReadRequest::Backlog`].
|
/// [`TxpoolReadRequest::Backlog`].
|
||||||
#[inline]
|
#[inline]
|
||||||
fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
|
fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
|
use cuprate_database::{ConcreteEnv, DatabaseRo, DatabaseRw, Env, EnvInner, RuntimeError, TxRw};
|
||||||
use cuprate_database_service::DatabaseWriteHandle;
|
use cuprate_database_service::DatabaseWriteHandle;
|
||||||
use cuprate_types::TransactionVerificationData;
|
use cuprate_types::TransactionVerificationData;
|
||||||
|
|
||||||
|
@ -10,8 +10,8 @@ use crate::{
|
||||||
interface::{TxpoolWriteRequest, TxpoolWriteResponse},
|
interface::{TxpoolWriteRequest, TxpoolWriteResponse},
|
||||||
types::TxpoolWriteHandle,
|
types::TxpoolWriteHandle,
|
||||||
},
|
},
|
||||||
tables::OpenTables,
|
tables::{OpenTables, Tables, TransactionInfos},
|
||||||
types::TransactionHash,
|
types::{KeyImage, TransactionHash, TxStateFlags},
|
||||||
};
|
};
|
||||||
|
|
||||||
//---------------------------------------------------------------------------------------------------- init_write_service
|
//---------------------------------------------------------------------------------------------------- init_write_service
|
||||||
|
@ -31,6 +31,8 @@ fn handle_txpool_request(
|
||||||
add_transaction(env, tx, *state_stem)
|
add_transaction(env, tx, *state_stem)
|
||||||
}
|
}
|
||||||
TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
|
TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
|
||||||
|
TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
|
||||||
|
TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,3 +103,68 @@ fn remove_transaction(
|
||||||
TxRw::commit(tx_rw)?;
|
TxRw::commit(tx_rw)?;
|
||||||
Ok(TxpoolWriteResponse::Ok)
|
Ok(TxpoolWriteResponse::Ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// [`TxpoolWriteRequest::Promote`]
|
||||||
|
fn promote(
|
||||||
|
env: &ConcreteEnv,
|
||||||
|
tx_hash: &TransactionHash,
|
||||||
|
) -> Result<TxpoolWriteResponse, RuntimeError> {
|
||||||
|
let env_inner = env.env_inner();
|
||||||
|
let tx_rw = env_inner.tx_rw()?;
|
||||||
|
|
||||||
|
let res = || {
|
||||||
|
let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
|
||||||
|
|
||||||
|
tx_infos.update(tx_hash, |mut info| {
|
||||||
|
info.flags.remove(TxStateFlags::STATE_STEM);
|
||||||
|
Some(info)
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = res() {
|
||||||
|
// error promoting the tx, abort the DB transaction.
|
||||||
|
TxRw::abort(tx_rw)
|
||||||
|
.expect("could not maintain database atomicity by aborting write transaction");
|
||||||
|
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
TxRw::commit(tx_rw)?;
|
||||||
|
Ok(TxpoolWriteResponse::Ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`TxpoolWriteRequest::NewBlock`]
|
||||||
|
fn new_block(
|
||||||
|
env: &ConcreteEnv,
|
||||||
|
spent_key_images: &[KeyImage],
|
||||||
|
) -> Result<TxpoolWriteResponse, RuntimeError> {
|
||||||
|
let env_inner = env.env_inner();
|
||||||
|
let tx_rw = env_inner.tx_rw()?;
|
||||||
|
|
||||||
|
// FIXME: use try blocks once stable.
|
||||||
|
let result = || {
|
||||||
|
let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
|
||||||
|
|
||||||
|
// Remove all txs which spend key images that were spent in the new block.
|
||||||
|
for key_image in spent_key_images {
|
||||||
|
match tables_mut
|
||||||
|
.spent_key_images()
|
||||||
|
.get(key_image)
|
||||||
|
.and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
|
||||||
|
{
|
||||||
|
Ok(()) | Err(RuntimeError::KeyNotFound) => (),
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = result() {
|
||||||
|
TxRw::abort(tx_rw)?;
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
TxRw::commit(tx_rw)?;
|
||||||
|
Ok(TxpoolWriteResponse::Ok)
|
||||||
|
}
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
//! accessing _all_ tables defined here at once.
|
//! accessing _all_ tables defined here at once.
|
||||||
use cuprate_database::{define_tables, StorableVec};
|
use cuprate_database::{define_tables, StorableVec};
|
||||||
|
|
||||||
use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo};
|
use crate::types::{
|
||||||
|
KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo,
|
||||||
|
};
|
||||||
|
|
||||||
define_tables! {
|
define_tables! {
|
||||||
/// Serialized transaction blobs.
|
/// Serialized transaction blobs.
|
||||||
|
@ -41,5 +43,9 @@ define_tables! {
|
||||||
///
|
///
|
||||||
/// This table contains the spent key images from all transactions in the pool.
|
/// This table contains the spent key images from all transactions in the pool.
|
||||||
3 => SpentKeyImages,
|
3 => SpentKeyImages,
|
||||||
KeyImage => TransactionHash
|
KeyImage => TransactionHash,
|
||||||
|
|
||||||
|
/// Transaction blob hashes that are in the pool.
|
||||||
|
4 => KnownBlobHashes,
|
||||||
|
TransactionBlobHash => TransactionHash,
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@
|
||||||
//!
|
//!
|
||||||
//! <!-- FIXME: Add schema here or a link to it when complete -->
|
//! <!-- FIXME: Add schema here or a link to it when complete -->
|
||||||
use bytemuck::{Pod, Zeroable};
|
use bytemuck::{Pod, Zeroable};
|
||||||
|
|
||||||
use monero_serai::transaction::Timelock;
|
use monero_serai::transaction::Timelock;
|
||||||
|
|
||||||
use cuprate_types::{CachedVerificationState, HardFork};
|
use cuprate_types::{CachedVerificationState, HardFork};
|
||||||
|
@ -17,6 +16,9 @@ pub type KeyImage = [u8; 32];
|
||||||
/// A transaction hash.
|
/// A transaction hash.
|
||||||
pub type TransactionHash = [u8; 32];
|
pub type TransactionHash = [u8; 32];
|
||||||
|
|
||||||
|
/// A transaction blob hash.
|
||||||
|
pub type TransactionBlobHash = [u8; 32];
|
||||||
|
|
||||||
bitflags::bitflags! {
|
bitflags::bitflags! {
|
||||||
/// Flags representing the state of the transaction in the pool.
|
/// Flags representing the state of the transaction in the pool.
|
||||||
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]
|
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]
|
||||||
|
|
Loading…
Reference in a new issue