Consensus: Improve API ()

* add specific method for context

* add new statemachine for tx verification

* fix consensus crates build

* working builds

* fix CI

* add docs

* fix CI

* fix docs

* fix clippy

* cleanup

* add docs to `blockchain_context`

* fix doc tests

* review fixes

* fix cargo doc
This commit is contained in:
Boog900 2025-01-28 22:25:20 +00:00 committed by GitHub
parent 9842535e86
commit d8d1e34266
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 822 additions and 1226 deletions

7
Cargo.lock generated
View file

@ -56,6 +56,12 @@ version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "arrayref"
version = "0.3.9"
@ -646,6 +652,7 @@ dependencies = [
name = "cuprate-consensus-context"
version = "0.1.0"
dependencies = [
"arc-swap",
"cuprate-consensus-rules",
"cuprate-helper",
"cuprate-types",

View file

@ -100,6 +100,7 @@ cuprate-zmq-types = { path = "zmq/types", default-featur
# External dependencies
anyhow = { version = "1", default-features = false }
arc-swap = { version = "1", default-features = false }
arrayvec = { version = "0.7", default-features = false }
async-trait = { version = "0.1", default-features = false }
bitflags = { version = "2", default-features = false }
@ -153,7 +154,6 @@ tokio-test = { version = "0.4" }
## TODO:
## Potential dependencies.
# arc-swap = { version = "1.6.0" } # Atomically swappable Arc<T> | https://github.com/vorner/arc-swap
# itoa = { version = "1.0.9" } # Fast integer to string formatting | https://github.com/dtolnay/itoa
# notify = { version = "6.1.1" } # Filesystem watching | https://github.com/notify-rs/notify
# once_cell = { version = "1.18.0" } # Lazy/one-time initialization | https://github.com/matklad/once_cell

View file

@ -8,7 +8,7 @@ use tokio::sync::{mpsc, Notify};
use tower::{BoxError, Service, ServiceExt};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig};
use cuprate_consensus::{generate_genesis_block, BlockchainContextService, ContextConfig};
use cuprate_cryptonight::cryptonight_hash_v0;
use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface};
use cuprate_p2p_core::{ClearNet, Network};
@ -26,9 +26,7 @@ mod syncer;
mod types;
pub use manager::init_blockchain_manager;
pub use types::{
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
};
pub use types::ConsensusBlockchainReadHandle;
/// Checks if the genesis block is in the blockchain and adds it if not.
pub async fn check_add_genesis(
@ -81,22 +79,12 @@ pub async fn check_add_genesis(
pub async fn init_consensus(
blockchain_read_handle: BlockchainReadHandle,
context_config: ContextConfig,
) -> Result<
(
ConcreteBlockVerifierService,
ConcreteTxVerifierService,
BlockChainContextService,
),
BoxError,
> {
) -> Result<BlockchainContextService, BoxError> {
let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from);
let ctx_service =
cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone())
.await?;
let (block_verifier_svc, tx_verifier_svc) =
cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone());
Ok((block_verifier_svc, tx_verifier_svc, ctx_service))
Ok(ctx_service)
}

View file

@ -3,16 +3,14 @@ use std::{collections::HashMap, sync::Arc};
use futures::StreamExt;
use monero_serai::block::Block;
use tokio::sync::{mpsc, oneshot, Notify};
use tower::{Service, ServiceExt};
use tower::{BoxError, Service, ServiceExt};
use tracing::error;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
ExtendedConsensusError,
};
use cuprate_consensus_context::RawBlockChainContext;
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig},
BroadcastSvc, NetworkInterface,
@ -26,10 +24,8 @@ use cuprate_types::{
use crate::{
blockchain::{
chain_service::ChainService,
interface::COMMAND_TX,
syncer,
types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle},
chain_service::ChainService, interface::COMMAND_TX, syncer,
types::ConsensusBlockchainReadHandle,
},
constants::PANIC_CRITICAL_SERVICE_ERROR,
};
@ -48,8 +44,7 @@ pub async fn init_blockchain_manager(
blockchain_write_handle: BlockchainWriteHandle,
blockchain_read_handle: BlockchainReadHandle,
txpool_write_handle: TxpoolWriteHandle,
mut blockchain_context_service: BlockChainContextService,
block_verifier_service: ConcreteBlockVerifierService,
mut blockchain_context_service: BlockchainContextService,
block_downloader_config: BlockDownloaderConfig,
) {
// TODO: find good values for these size limits
@ -68,24 +63,14 @@ pub async fn init_blockchain_manager(
block_downloader_config,
));
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let manager = BlockchainManager {
blockchain_write_handle,
blockchain_read_handle,
blockchain_read_handle: ConsensusBlockchainReadHandle::new(
blockchain_read_handle,
BoxError::from,
),
txpool_write_handle,
blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service,
stop_current_block_downloader,
broadcast_svc: clearnet_interface.broadcast_svc(),
};
@ -104,18 +89,12 @@ pub struct BlockchainManager {
/// is held.
blockchain_write_handle: BlockchainWriteHandle,
/// A [`BlockchainReadHandle`].
blockchain_read_handle: BlockchainReadHandle,
blockchain_read_handle: ConsensusBlockchainReadHandle,
/// A [`TxpoolWriteHandle`].
txpool_write_handle: TxpoolWriteHandle,
// TODO: Improve the API of the cache service.
// TODO: rename the cache service -> `BlockchainContextService`.
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
/// values without needing to go to a [`BlockchainReadHandle`].
blockchain_context_service: BlockChainContextService,
/// A cached context representing the current state.
cached_blockchain_context: RawBlockChainContext,
/// The block verifier service, to verify incoming blocks.
block_verifier_service: ConcreteBlockVerifierService,
blockchain_context_service: BlockchainContextService,
/// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download
/// attempt.
stop_current_block_downloader: Arc<Notify>,

View file

@ -13,9 +13,12 @@ use tracing::{info, instrument};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
block::PreparedBlock, transactions::new_tx_verification_data, BlockChainContextRequest,
BlockChainContextResponse, BlockVerifierService, ExtendedConsensusError, VerifyBlockRequest,
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
block::{
batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block,
verify_prepped_main_chain_block, PreparedBlock,
},
transactions::new_tx_verification_data,
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
};
use cuprate_consensus_context::NewBlockData;
use cuprate_helper::cast::usize_to_u64;
@ -83,30 +86,33 @@ impl super::BlockchainManager {
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<IncomingBlockOk, anyhow::Error> {
if block.header.previous != self.cached_blockchain_context.top_hash {
if block.header.previous
!= self
.blockchain_context_service
.blockchain_context()
.top_hash
{
self.handle_incoming_alt_block(block, prepared_txs).await?;
return Ok(IncomingBlockOk::AddedToAltChain);
}
let VerifyBlockResponse::MainChain(verified_block) = self
.block_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChain {
block,
prepared_txs,
})
.await?
else {
unreachable!();
};
let verified_block = verify_main_chain_block(
block,
prepared_txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
)
.await?;
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
self.add_valid_block_to_main_chain(verified_block).await;
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
.await;
let chain_height = self
.blockchain_context_service
.blockchain_context()
.chain_height;
self.broadcast_block(block_blob, chain_height).await;
Ok(IncomingBlockOk::AddedToMainChain)
}
@ -135,7 +141,12 @@ impl super::BlockchainManager {
.first()
.expect("Block batch should not be empty");
if first_block.header.previous == self.cached_blockchain_context.top_hash {
if first_block.header.previous
== self
.blockchain_context_service
.blockchain_context()
.top_hash
{
self.handle_incoming_block_batch_main_chain(batch).await;
} else {
self.handle_incoming_block_batch_alt_chain(batch).await;
@ -155,43 +166,27 @@ impl super::BlockchainManager {
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from or if the incoming batch contains no blocks.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
let batch_prep_res = self
.block_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
.await;
let prepped_blocks = match batch_prep_res {
Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks,
Err(_) => {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
}
_ => unreachable!(),
let Ok(prepped_blocks) =
batch_prepare_main_chain_blocks(batch.blocks, &mut self.blockchain_context_service)
.await
else {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
};
for (block, txs) in prepped_blocks {
let verify_res = self
.block_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await;
let verified_block = match verify_res {
Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block,
Err(_) => {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
}
_ => unreachable!(),
let Ok(verified_block) = verify_prepped_main_chain_block(
block,
txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
)
.await
else {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return;
};
self.add_valid_block_to_main_chain(verified_block).await;
@ -272,25 +267,18 @@ impl super::BlockchainManager {
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
) -> Result<AddAltBlock, anyhow::Error> {
let VerifyBlockResponse::AltChain(alt_block_info) = self
.block_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::AltChain {
block,
prepared_txs,
})
.await?
else {
unreachable!();
};
let alt_block_info =
sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone())
.await?;
// TODO: check in consensus crate if alt block with this hash already exists.
// If this alt chain
// If this alt chain has more cumulative difficulty, reorg.
if alt_block_info.cumulative_difficulty
> self.cached_blockchain_context.cumulative_difficulty
> self
.blockchain_context_service
.blockchain_context()
.cumulative_difficulty
{
self.try_do_reorg(alt_block_info).await?;
return Ok(AddAltBlock::Reorged);
@ -335,7 +323,8 @@ impl super::BlockchainManager {
.call(BlockchainReadRequest::AltBlocksInChain(
top_alt_block.chain_id,
))
.await?
.await
.map_err(|e| anyhow::anyhow!(e))?
else {
unreachable!();
};
@ -343,7 +332,10 @@ impl super::BlockchainManager {
alt_blocks.push(top_alt_block);
let split_height = alt_blocks[0].height;
let current_main_chain_height = self.cached_blockchain_context.chain_height;
let current_main_chain_height = self
.blockchain_context_service
.blockchain_context()
.chain_height;
let BlockchainResponse::PopBlocks(old_main_chain_id) = self
.blockchain_write_handle
@ -402,24 +394,18 @@ impl super::BlockchainManager {
let prepped_txs = alt_block
.txs
.drain(..)
.map(|tx| Ok(Arc::new(tx.try_into()?)))
.map(|tx| Ok(tx.try_into()?))
.collect::<Result<_, anyhow::Error>>()?;
let prepped_block = PreparedBlock::new_alt_block(alt_block)?;
let VerifyBlockResponse::MainChain(verified_block) = self
.block_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyBlockRequest::MainChainPrepped {
block: prepped_block,
txs: prepped_txs,
})
.await?
else {
unreachable!();
};
let verified_block = verify_prepped_main_chain_block(
prepped_block,
prepped_txs,
&mut self.blockchain_context_service,
self.blockchain_read_handle.clone(),
)
.await?;
self.add_valid_block_to_main_chain(verified_block).await;
}
@ -429,8 +415,7 @@ impl super::BlockchainManager {
/// Adds a [`VerifiedBlockInformation`] to the main-chain.
///
/// This function will update the blockchain database and the context cache, it will also
/// update [`Self::cached_blockchain_context`].
/// This function will update the blockchain database and the context cache.
///
/// # Panics
///
@ -477,20 +462,6 @@ impl super::BlockchainManager {
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
let BlockChainContextResponse::Context(blockchain_context) = self
.blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!();
};
self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
self.txpool_write_handle
.ready()
.await

View file

@ -9,8 +9,8 @@ use tokio::{
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_consensus_context::RawBlockChainContext;
use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext};
use cuprate_consensus_context::BlockchainContextService;
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface, PeerSetRequest, PeerSetResponse,
@ -30,8 +30,8 @@ pub enum SyncerError {
/// The syncer tasks that makes sure we are fully synchronised with our connected peers.
#[instrument(level = "debug", skip_all)]
pub async fn syncer<C, CN>(
mut context_svc: C,
pub async fn syncer<CN>(
mut context_svc: BlockchainContextService,
our_chain: CN,
mut clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
@ -39,12 +39,6 @@ pub async fn syncer<C, CN>(
block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Clone
+ Send
@ -55,15 +49,6 @@ where
let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY);
let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
unreachable!();
};
tracing::debug!("Waiting for new sync info in top sync channel");
loop {
@ -71,10 +56,9 @@ where
tracing::trace!("Checking connected peers to see if we are behind",);
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
let blockchain_context = context_svc.blockchain_context();
if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? {
if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
continue;
}
@ -92,10 +76,9 @@ where
}
batch = block_batch_stream.next() => {
let Some(batch) = batch else {
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
let blockchain_context = context_svc.blockchain_context();
if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? {
if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? {
tracing::info!("Synchronised with the network.");
}
@ -114,7 +97,7 @@ where
/// Returns `true` if we are behind the current connected network peers.
async fn check_behind_peers(
raw_blockchain_context: &RawBlockChainContext,
blockchain_context: &BlockchainContext,
mut clearnet_interface: &mut NetworkInterface<ClearNet>,
) -> Result<bool, tower::BoxError> {
let PeerSetResponse::MostPoWSeen {
@ -130,38 +113,9 @@ async fn check_behind_peers(
unreachable!();
};
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
if cumulative_difficulty <= blockchain_context.cumulative_difficulty {
return Ok(false);
}
Ok(true)
}
/// Checks if we should update the given [`BlockChainContext`] and updates it if needed.
async fn check_update_blockchain_context<C>(
context_svc: C,
old_context: &mut BlockChainContext,
) -> Result<(), tower::BoxError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
{
if old_context.blockchain_context().is_ok() {
return Ok(());
}
let BlockChainContextResponse::Context(ctx) = context_svc
.oneshot(BlockChainContextRequest::Context)
.await?
else {
unreachable!();
};
*old_context = ctx;
Ok(())
}

View file

@ -1,17 +1,6 @@
use tower::util::MapErr;
use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
/// The [`BlockVerifierService`] with all generic types defined.
pub type ConcreteBlockVerifierService = BlockVerifierService<
BlockChainContextService,
ConcreteTxVerifierService,
ConsensusBlockchainReadHandle,
>;
/// The [`TxVerifierService`] with all generic types defined.
pub type ConcreteTxVerifierService = TxVerifierService<ConsensusBlockchainReadHandle>;
/// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires.
pub type ConsensusBlockchainReadHandle =

View file

@ -9,7 +9,7 @@ use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::time::secs_to_hms;
@ -87,7 +87,7 @@ pub fn command_listener(incoming_commands: mpsc::Sender<Command>) -> ! {
/// The [`Command`] handler loop.
pub async fn io_loop(
mut incoming_commands: mpsc::Receiver<Command>,
mut context_service: BlockChainContextService,
mut context_service: BlockchainContextService,
) {
loop {
let Some(command) = incoming_commands.recv().await else {
@ -113,17 +113,8 @@ pub async fn io_loop(
}
}
Command::Status => {
let BlockChainContextResponse::Context(blockchain_context) = context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!();
};
let context = blockchain_context.unchecked_blockchain_context();
let context = context_service.blockchain_context();
let uptime = statics::START_INSTANT.elapsed().unwrap_or_default();
let (h, m, s) = secs_to_hms(uptime.as_secs());

View file

@ -24,7 +24,7 @@ use tracing::level_filters::LevelFilter;
use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry};
use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::time::secs_to_hms;
@ -90,7 +90,7 @@ fn main() {
.await;
// Start the context service and the block/tx verifier.
let (block_verifier, tx_verifier, context_svc) =
let context_svc =
blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config())
.await
.unwrap();
@ -111,7 +111,7 @@ fn main() {
txpool_write_handle.clone(),
txpool_read_handle,
context_svc.clone(),
tx_verifier,
blockchain_read_handle.clone(),
);
if incoming_tx_handler_tx.send(tx_handler).is_err() {
unreachable!()
@ -124,7 +124,6 @@ fn main() {
blockchain_read_handle,
txpool_write_handle,
context_svc.clone(),
block_verifier,
config.block_downloader_config(),
)
.await;

View file

@ -6,7 +6,7 @@ use tokio::sync::oneshot;
use tower::ServiceExt;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::BlockChainContextService;
use cuprate_consensus::BlockchainContextService;
use cuprate_p2p::{NetworkInterface, P2PConfig};
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::TxpoolReadHandle;
@ -25,7 +25,7 @@ pub use network_address::CrossNetworkInternalPeerId;
/// handshakes can not be completed.
pub async fn start_clearnet_p2p(
blockchain_read_handle: BlockchainReadHandle,
blockchain_context_service: BlockChainContextService,
blockchain_context_service: BlockchainContextService,
txpool_read_handle: TxpoolReadHandle,
config: P2PConfig<ClearNet>,
) -> Result<

View file

@ -1,10 +1,13 @@
use std::task::{Context, Poll};
use std::{
future::{ready, Ready},
task::{Context, Poll},
};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use tower::Service;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::{cast::usize_to_u64, map::split_u128_into_low_high_bits};
use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse};
@ -12,38 +15,30 @@ use cuprate_wire::CoreSyncData;
/// The core sync service.
#[derive(Clone)]
pub struct CoreSyncService(pub BlockChainContextService);
pub struct CoreSyncService(pub BlockchainContextService);
impl Service<CoreSyncDataRequest> for CoreSyncService {
type Response = CoreSyncDataResponse;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
Poll::Ready(Ok(()))
}
fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future {
self.0
.call(BlockChainContextRequest::Context)
.map_ok(|res| {
let BlockChainContextResponse::Context(context) = res else {
unreachable!()
};
let context = self.0.blockchain_context();
let context = context.unchecked_blockchain_context();
let (cumulative_difficulty, cumulative_difficulty_top64) =
split_u128_into_low_high_bits(context.cumulative_difficulty);
let (cumulative_difficulty, cumulative_difficulty_top64) =
split_u128_into_low_high_bits(context.cumulative_difficulty);
CoreSyncDataResponse(CoreSyncData {
cumulative_difficulty,
cumulative_difficulty_top64,
current_height: usize_to_u64(context.chain_height),
pruning_seed: 0,
top_id: context.top_hash,
top_version: context.current_hf.as_u8(),
})
})
.boxed()
ready(Ok(CoreSyncDataResponse(CoreSyncData {
cumulative_difficulty,
cumulative_difficulty_top64,
current_height: usize_to_u64(context.chain_height),
pruning_seed: 0,
top_id: context.top_hash,
top_version: context.current_hf.as_u8(),
})))
}
}

View file

@ -18,7 +18,7 @@ use tower::{Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService,
BlockchainContextService,
};
use cuprate_dandelion_tower::TxState;
use cuprate_fixed_bytes::ByteArrayVec;
@ -56,7 +56,7 @@ use crate::{
#[derive(Clone)]
pub struct P2pProtocolRequestHandlerMaker {
pub blockchain_read_handle: BlockchainReadHandle,
pub blockchain_context_service: BlockChainContextService,
pub blockchain_context_service: BlockchainContextService,
pub txpool_read_handle: TxpoolReadHandle,
/// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`]
@ -114,7 +114,7 @@ where
pub struct P2pProtocolRequestHandler<N: NetZoneAddress> {
peer_information: PeerInformation<N>,
blockchain_read_handle: BlockchainReadHandle,
blockchain_context_service: BlockChainContextService,
blockchain_context_service: BlockchainContextService,
txpool_read_handle: TxpoolReadHandle,
incoming_tx_handler: IncomingTxHandler,
}
@ -366,25 +366,14 @@ async fn new_fluffy_block<A: NetZoneAddress>(
async fn new_transactions<A>(
peer_information: PeerInformation<A>,
request: NewTransactions,
mut blockchain_context_service: BlockChainContextService,
mut blockchain_context_service: BlockchainContextService,
mut incoming_tx_handler: IncomingTxHandler,
) -> anyhow::Result<ProtocolResponse>
where
A: NetZoneAddress,
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
{
let BlockChainContextResponse::Context(context) = blockchain_context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let context = context.unchecked_blockchain_context();
let context = blockchain_context_service.blockchain_context();
// If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing.
if usize_to_u64(context.chain_height + 2)

View file

@ -8,7 +8,7 @@ use monero_serai::block::Block;
use tower::Service;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::BlockChainContextService;
use cuprate_consensus::BlockchainContextService;
use cuprate_pruning::PruningSeed;
use cuprate_rpc_interface::RpcHandler;
use cuprate_rpc_types::{
@ -148,7 +148,7 @@ pub struct CupratedRpcHandler {
pub blockchain_read: BlockchainReadHandle,
/// Handle to the blockchain context service.
pub blockchain_context: BlockChainContextService,
pub blockchain_context: BlockchainContextService,
/// Handle to the blockchain manager.
pub blockchain_manager: BlockchainManagerHandle,
@ -165,7 +165,7 @@ impl CupratedRpcHandler {
pub const fn new(
restricted: bool,
blockchain_read: BlockchainReadHandle,
blockchain_context: BlockChainContextService,
blockchain_context: BlockchainContextService,
blockchain_manager: BlockchainManagerHandle,
txpool_read: TxpoolReadHandle,
txpool_manager: std::convert::Infallible,

View file

@ -7,35 +7,26 @@ use monero_serai::block::Block;
use tower::{Service, ServiceExt};
use cuprate_consensus_context::{
BlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContext,
BlockchainContextService,
};
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{FeeEstimate, HardFork, HardForkInfo};
// FIXME: use `anyhow::Error` over `tower::BoxError` in blockchain context.
/// [`BlockChainContextRequest::Context`].
pub(crate) async fn context(
blockchain_context: &mut BlockChainContextService,
) -> Result<BlockChainContext, Error> {
let BlockChainContextResponse::Context(context) = blockchain_context
.ready()
.await
.map_err(|e| anyhow!(e))?
.call(BlockChainContextRequest::Context)
.await
.map_err(|e| anyhow!(e))?
else {
unreachable!();
};
blockchain_context: &mut BlockchainContextService,
) -> Result<BlockchainContext, Error> {
// TODO: Remove this whole function just call directly in all usages.
let context = blockchain_context.blockchain_context().clone();
Ok(context)
}
/// [`BlockChainContextRequest::HardForkInfo`].
pub(crate) async fn hard_fork_info(
blockchain_context: &mut BlockChainContextService,
blockchain_context: &mut BlockchainContextService,
hard_fork: HardFork,
) -> Result<HardForkInfo, Error> {
let BlockChainContextResponse::HardForkInfo(hf_info) = blockchain_context
@ -54,7 +45,7 @@ pub(crate) async fn hard_fork_info(
/// [`BlockChainContextRequest::FeeEstimate`].
pub(crate) async fn fee_estimate(
blockchain_context: &mut BlockChainContextService,
blockchain_context: &mut BlockchainContextService,
grace_blocks: u64,
) -> Result<FeeEstimate, Error> {
let BlockChainContextResponse::FeeEstimate(fee) = blockchain_context
@ -73,7 +64,7 @@ pub(crate) async fn fee_estimate(
/// [`BlockChainContextRequest::CalculatePow`]
pub(crate) async fn calculate_pow(
blockchain_context: &mut BlockChainContextService,
blockchain_context: &mut BlockchainContextService,
hardfork: HardFork,
height: u64,
block: Box<Block>,

View file

@ -1,13 +1,11 @@
//! Transaction Pool
//!
//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool.
use cuprate_consensus::BlockChainContextService;
use cuprate_consensus::BlockchainContextService;
use cuprate_p2p::NetworkInterface;
use cuprate_p2p_core::ClearNet;
use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
use crate::blockchain::ConcreteTxVerifierService;
mod dandelion;
mod incoming_tx;
mod txs_being_handled;

View file

@ -7,11 +7,13 @@ use std::{
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use monero_serai::transaction::Transaction;
use tower::{Service, ServiceExt};
use tower::{BoxError, Service, ServiceExt};
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions};
use cuprate_consensus::{
transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
BlockchainContextService, ExtendedConsensusError,
};
use cuprate_dandelion_tower::{
pool::{DandelionPoolService, IncomingTxBuilder},
@ -32,7 +34,7 @@ use cuprate_txpool::{
use cuprate_types::TransactionVerificationData;
use crate::{
blockchain::ConcreteTxVerifierService,
blockchain::ConsensusBlockchainReadHandle,
constants::PANIC_CRITICAL_SERVICE_ERROR,
p2p::CrossNetworkInternalPeerId,
signals::REORG_LOCK,
@ -76,16 +78,16 @@ pub struct IncomingTxHandler {
/// A store of txs currently being handled in incoming tx requests.
pub(super) txs_being_handled: TxsBeingHandled,
/// The blockchain context cache.
pub(super) blockchain_context_cache: BlockChainContextService,
pub(super) blockchain_context_cache: BlockchainContextService,
/// The dandelion txpool manager.
pub(super) dandelion_pool_manager:
DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
/// The transaction verifier service.
pub(super) tx_verifier_service: ConcreteTxVerifierService,
/// The txpool write handle.
pub(super) txpool_write_handle: TxpoolWriteHandle,
/// The txpool read handle.
pub(super) txpool_read_handle: TxpoolReadHandle,
/// The blockchain read handle.
pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle,
}
impl IncomingTxHandler {
@ -95,8 +97,8 @@ impl IncomingTxHandler {
clear_net: NetworkInterface<ClearNet>,
txpool_write_handle: TxpoolWriteHandle,
txpool_read_handle: TxpoolReadHandle,
blockchain_context_cache: BlockChainContextService,
tx_verifier_service: ConcreteTxVerifierService,
blockchain_context_cache: BlockchainContextService,
blockchain_read_handle: BlockchainReadHandle,
) -> Self {
let dandelion_router = dandelion::dandelion_router(clear_net);
@ -110,9 +112,12 @@ impl IncomingTxHandler {
txs_being_handled: TxsBeingHandled::new(),
blockchain_context_cache,
dandelion_pool_manager,
tx_verifier_service,
txpool_write_handle,
txpool_read_handle,
blockchain_read_handle: ConsensusBlockchainReadHandle::new(
blockchain_read_handle,
BoxError::from,
),
}
}
}
@ -131,7 +136,7 @@ impl Service<IncomingTxs> for IncomingTxHandler {
req,
self.txs_being_handled.clone(),
self.blockchain_context_cache.clone(),
self.tx_verifier_service.clone(),
self.blockchain_read_handle.clone(),
self.txpool_write_handle.clone(),
self.txpool_read_handle.clone(),
self.dandelion_pool_manager.clone(),
@ -144,8 +149,8 @@ impl Service<IncomingTxs> for IncomingTxHandler {
async fn handle_incoming_txs(
IncomingTxs { txs, state }: IncomingTxs,
txs_being_handled: TxsBeingHandled,
mut blockchain_context_cache: BlockChainContextService,
mut tx_verifier_service: ConcreteTxVerifierService,
mut blockchain_context_cache: BlockchainContextService,
blockchain_read_handle: ConsensusBlockchainReadHandle,
mut txpool_write_handle: TxpoolWriteHandle,
mut txpool_read_handle: TxpoolReadHandle,
mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
@ -155,30 +160,20 @@ async fn handle_incoming_txs(
let (txs, stem_pool_txs, txs_being_handled_guard) =
prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
let BlockChainContextResponse::Context(context) = blockchain_context_cache
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!()
};
let context = blockchain_context_cache.blockchain_context();
let context = context.unchecked_blockchain_context();
tx_verifier_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(VerifyTxRequest::Prepped {
txs: txs.clone(),
current_chain_height: context.chain_height,
top_hash: context.top_hash,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hf,
})
let txs = start_tx_verification()
.append_prepped_txs(txs)
.prepare()
.map_err(|e| IncomingTxError::Consensus(e.into()))?
.full(
context.chain_height,
context.top_hash,
context.current_adjusted_timestamp_for_time_lock(),
context.current_hf,
blockchain_read_handle,
)
.verify()
.await
.map_err(IncomingTxError::Consensus)?;
@ -220,7 +215,7 @@ async fn prepare_incoming_txs(
txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<
(
Vec<Arc<TransactionVerificationData>>,
Vec<TransactionVerificationData>,
Vec<TxId>,
TxsBeingHandledLocally,
),
@ -282,7 +277,7 @@ async fn prepare_incoming_txs(
let tx = new_tx_verification_data(tx)
.map_err(|e| IncomingTxError::Consensus(e.into()))?;
Ok(Arc::new(tx))
Ok(tx)
})
.collect::<Result<Vec<_>, IncomingTxError>>()?;
@ -295,7 +290,7 @@ async fn prepare_incoming_txs(
///
/// This will add the tx to the txpool and route it to the network.
async fn handle_valid_tx(
tx: Arc<TransactionVerificationData>,
tx: TransactionVerificationData,
state: TxState<CrossNetworkInternalPeerId>,
txpool_write_handle: &mut TxpoolWriteHandle,
dandelion_pool_manager: &mut DandelionPoolService<
@ -312,7 +307,7 @@ async fn handle_valid_tx(
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(TxpoolWriteRequest::AddTransaction {
tx,
tx: Box::new(tx),
state_stem: state.is_stem_stage(),
})
.await

View file

@ -17,7 +17,6 @@ cfg-if = { workspace = true }
thiserror = { workspace = true }
tower = { workspace = true, features = ["util"] }
tracing = { workspace = true, features = ["std", "attributes"] }
futures = { workspace = true, features = ["std", "async-await"] }
monero-serai = { workspace = true, features = ["std"] }
@ -35,6 +34,7 @@ hex-literal = { workspace = true }
curve25519-dalek = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"]}
futures = { workspace = true, features = ["std", "async-await"] }
tokio-test = { workspace = true }
proptest = { workspace = true }
proptest-derive = { workspace = true }

View file

@ -10,6 +10,7 @@ cuprate-consensus-rules = { workspace = true, features = ["proptest"]}
cuprate-helper = { workspace = true, default-features = false, features = ["std", "cast", "num", "asynch"] }
cuprate-types = { workspace = true, default-features = false, features = ["blockchain"] }
arc-swap = { workspace = true }
futures = { workspace = true, features = ["std", "async-await"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"]}
tokio-util = { workspace = true }

View file

@ -1,6 +1,6 @@
//! # Blockchain Context
//!
//! This crate contains a service to get cached context from the blockchain: [`BlockChainContext`].
//! This crate contains a service to get cached context from the blockchain: [`BlockchainContext`].
//! This is used during contextual validation, this does not have all the data for contextual validation
//! (outputs) for that you will need a [`Database`].
@ -17,6 +17,7 @@ use std::{
task::{Context, Poll},
};
use arc_swap::Cache;
use futures::{channel::oneshot, FutureExt};
use monero_serai::block::Block;
use tokio::sync::mpsc;
@ -34,7 +35,6 @@ pub mod weight;
mod alt_chains;
mod task;
mod tokens;
use cuprate_types::{Chain, ChainInfo, FeeEstimate, HardForkInfo};
use difficulty::DifficultyCache;
@ -44,7 +44,6 @@ use weight::BlockWeightsCache;
pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache};
pub use difficulty::DifficultyCacheConfig;
pub use hardforks::HardForkConfig;
pub use tokens::*;
pub use weight::BlockWeightsCacheConfig;
pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
@ -96,27 +95,29 @@ impl ContextConfig {
pub async fn initialize_blockchain_context<D>(
cfg: ContextConfig,
database: D,
) -> Result<BlockChainContextService, ContextCacheError>
) -> Result<BlockchainContextService, ContextCacheError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let context_task = task::ContextTask::init_context(cfg, database).await?;
let (context_task, context_cache) = task::ContextTask::init_context(cfg, database).await?;
// TODO: make buffer size configurable.
let (tx, rx) = mpsc::channel(15);
tokio::spawn(context_task.run(rx));
Ok(BlockChainContextService {
Ok(BlockchainContextService {
cached_context: Cache::new(context_cache),
channel: PollSender::new(tx),
})
}
/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep
/// around. You should keep around [`BlockChainContext`] instead.
#[derive(Debug, Clone)]
pub struct RawBlockChainContext {
/// Raw blockchain context, gotten from [`BlockchainContext`]. This data may turn invalid so is not ok to keep
/// around. You should keep around [`BlockchainContext`] instead.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BlockchainContext {
/// The current cumulative difficulty.
pub cumulative_difficulty: u128,
/// Context to verify a block, as needed by [`cuprate-consensus-rules`]
@ -127,14 +128,14 @@ pub struct RawBlockChainContext {
top_block_timestamp: Option<u64>,
}
impl std::ops::Deref for RawBlockChainContext {
impl std::ops::Deref for BlockchainContext {
type Target = ContextToVerifyBlock;
fn deref(&self) -> &Self::Target {
&self.context_to_verify_block
}
}
impl RawBlockChainContext {
impl BlockchainContext {
/// Returns the timestamp the should be used when checking locked outputs.
///
/// ref: <https://cuprate.github.io/monero-book/consensus_rules/transactions/unlock_time.html#getting-the-current-time>
@ -167,40 +168,6 @@ impl RawBlockChainContext {
}
}
/// Blockchain context which keeps a token of validity so users will know when the data is no longer valid.
#[derive(Debug, Clone)]
pub struct BlockChainContext {
/// A token representing this data's validity.
validity_token: ValidityToken,
/// The actual block chain context.
raw: RawBlockChainContext,
}
#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("data is no longer valid")]
pub struct DataNoLongerValid;
impl BlockChainContext {
/// Checks if the data is still valid.
pub fn is_still_valid(&self) -> bool {
self.validity_token.is_data_valid()
}
/// Checks if the data is valid returning an Err if not and a reference to the blockchain context if
/// it is.
pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> {
if !self.is_still_valid() {
return Err(DataNoLongerValid);
}
Ok(&self.raw)
}
/// Returns the blockchain context without checking the validity token.
pub const fn unchecked_blockchain_context(&self) -> &RawBlockChainContext {
&self.raw
}
}
/// Data needed from a new block to add it to the context cache.
#[derive(Debug, Clone)]
pub struct NewBlockData {
@ -225,9 +192,6 @@ pub struct NewBlockData {
/// A request to the blockchain context cache.
#[derive(Debug, Clone)]
pub enum BlockChainContextRequest {
/// Get the current blockchain context.
Context,
/// Gets all the current `RandomX` VMs.
CurrentRxVms,
@ -363,9 +327,6 @@ pub enum BlockChainContextResponse {
/// - [`BlockChainContextRequest::AddAltChainContextCache`]
Ok,
/// Response to [`BlockChainContextRequest::Context`]
Context(BlockChainContext),
/// Response to [`BlockChainContextRequest::CurrentRxVms`]
///
/// A map of seed height to `RandomX` VMs.
@ -403,11 +364,20 @@ pub enum BlockChainContextResponse {
/// The blockchain context service.
#[derive(Clone)]
pub struct BlockChainContextService {
pub struct BlockchainContextService {
cached_context: Cache<Arc<arc_swap::ArcSwap<BlockchainContext>>, Arc<BlockchainContext>>,
channel: PollSender<task::ContextTaskRequest>,
}
impl Service<BlockChainContextRequest> for BlockChainContextService {
impl BlockchainContextService {
/// Get the current [`BlockchainContext`] from the cache.
pub fn blockchain_context(&mut self) -> &BlockchainContext {
self.cached_context.load()
}
}
impl Service<BlockChainContextRequest> for BlockchainContextService {
type Response = BlockChainContextResponse;
type Error = tower::BoxError;
type Future =

View file

@ -3,6 +3,9 @@
//! This module contains the async task that handles keeping track of blockchain context.
//! It holds all the context caches and handles [`tower::Service`] requests.
//!
use std::sync::Arc;
use arc_swap::ArcSwap;
use futures::channel::oneshot;
use tokio::sync::mpsc;
use tower::ServiceExt;
@ -12,14 +15,17 @@ use cuprate_consensus_rules::blocks::ContextToVerifyBlock;
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
Chain, HardFork,
};
use crate::{
alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap},
difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, ContextCacheError, ContextConfig, Database, RawBlockChainContext,
ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
difficulty::DifficultyCache,
hardforks::HardForkState,
rx_vms,
weight::BlockWeightsCache,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContext, ContextCacheError,
ContextConfig, Database, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW,
};
/// A request from the context service to the context task.
@ -34,18 +40,16 @@ pub(super) struct ContextTaskRequest {
/// The Context task that keeps the blockchain context and handles requests.
pub(crate) struct ContextTask<D: Database> {
/// A token used to invalidate previous contexts when a new
/// block is added to the chain.
current_validity_token: ValidityToken,
context_cache: Arc<ArcSwap<BlockchainContext>>,
/// The difficulty cache.
difficulty_cache: difficulty::DifficultyCache,
difficulty_cache: DifficultyCache,
/// The weight cache.
weight_cache: weight::BlockWeightsCache,
weight_cache: BlockWeightsCache,
/// The RX VM cache.
rx_vm_cache: rx_vms::RandomXVmCache,
/// The hard-fork state cache.
hardfork_state: hardforks::HardForkState,
hardfork_state: HardForkState,
alt_chain_cache_map: AltChainMap,
@ -65,7 +69,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
pub(crate) async fn init_context(
cfg: ContextConfig,
mut database: D,
) -> Result<Self, ContextCacheError> {
) -> Result<(Self, Arc<ArcSwap<BlockchainContext>>), ContextCacheError> {
let ContextConfig {
difficulty_cfg,
weights_config,
@ -94,29 +98,19 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
let db = database.clone();
let hardfork_state_handle = tokio::spawn(async move {
hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
});
let db = database.clone();
let difficulty_cache_handle = tokio::spawn(async move {
difficulty::DifficultyCache::init_from_chain_height(
chain_height,
difficulty_cfg,
db,
Chain::Main,
)
.await
DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db, Chain::Main)
.await
});
let db = database.clone();
let weight_cache_handle = tokio::spawn(async move {
weight::BlockWeightsCache::init_from_chain_height(
chain_height,
weights_config,
db,
Chain::Main,
)
.await
BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db, Chain::Main)
.await
});
// Wait for the hardfork state to finish first as we need it to start the randomX VM cache.
@ -128,10 +122,24 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
rx_vms::RandomXVmCache::init_from_chain_height(chain_height, &current_hf, db).await
});
let difficulty_cache = difficulty_cache_handle.await.unwrap()?;
let weight_cache = weight_cache_handle.await.unwrap()?;
let blockchain_context = blockchain_context(
&weight_cache,
&difficulty_cache,
current_hf,
top_block_hash,
chain_height,
already_generated_coins,
);
let context_cache = Arc::new(ArcSwap::from_pointee(blockchain_context));
let context_svc = Self {
current_validity_token: ValidityToken::new(),
difficulty_cache: difficulty_cache_handle.await.unwrap()?,
weight_cache: weight_cache_handle.await.unwrap()?,
context_cache: Arc::clone(&context_cache),
difficulty_cache,
weight_cache,
rx_vm_cache: rx_seed_handle.await.unwrap()?,
hardfork_state,
alt_chain_cache_map: AltChainMap::new(),
@ -141,7 +149,20 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
database,
};
Ok(context_svc)
Ok((context_svc, context_cache))
}
fn update_blockchain_context(&self) {
let context = blockchain_context(
&self.weight_cache,
&self.difficulty_cache,
self.hardfork_state.current_hardfork(),
self.top_block_hash,
self.chain_height,
self.already_generated_coins,
);
self.context_cache.store(Arc::new(context));
}
/// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`].
@ -150,36 +171,6 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
req: BlockChainContextRequest,
) -> Result<BlockChainContextResponse, tower::BoxError> {
Ok(match req {
BlockChainContextRequest::Context => {
tracing::debug!("Getting blockchain context");
let current_hf = self.hardfork_state.current_hardfork();
BlockChainContextResponse::Context(BlockChainContext {
validity_token: self.current_validity_token.clone(),
raw: RawBlockChainContext {
context_to_verify_block: ContextToVerifyBlock {
median_weight_for_block_reward: self
.weight_cache
.median_for_block_reward(current_hf),
effective_median_weight: self
.weight_cache
.effective_median_block_weight(current_hf),
top_hash: self.top_block_hash,
median_block_timestamp: self
.difficulty_cache
.median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
chain_height: self.chain_height,
current_hf,
next_difficulty: self.difficulty_cache.next_difficulty(current_hf),
already_generated_coins: self.already_generated_coins,
},
cumulative_difficulty: self.difficulty_cache.cumulative_difficulty(),
median_long_term_weight: self.weight_cache.median_long_term_weight(),
top_block_timestamp: self.difficulty_cache.top_block_timestamp(),
},
})
}
BlockChainContextRequest::CurrentRxVms => {
BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await)
}
@ -202,9 +193,6 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
"Updating blockchain cache with new block, height: {}",
new.height
);
// Cancel the validity token and replace it with a new one.
std::mem::replace(&mut self.current_validity_token, ValidityToken::new())
.set_data_invalid();
self.difficulty_cache.new_block(
new.height,
@ -225,6 +213,8 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
.already_generated_coins
.saturating_add(new.generated_coins);
self.update_blockchain_context();
BlockChainContextResponse::Ok
}
BlockChainContextRequest::PopBlocks { numb_blocks } => {
@ -272,8 +262,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
self.already_generated_coins = already_generated_coins;
self.top_block_hash = top_block_hash;
std::mem::replace(&mut self.current_validity_token, ValidityToken::new())
.set_data_invalid();
self.update_blockchain_context();
BlockChainContextResponse::Ok
}
@ -342,3 +331,30 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> {
tracing::info!("Shutting down blockchain context task.");
}
}
fn blockchain_context(
weight_cache: &BlockWeightsCache,
difficulty_cache: &DifficultyCache,
current_hf: HardFork,
top_hash: [u8; 32],
chain_height: usize,
already_generated_coins: u64,
) -> BlockchainContext {
BlockchainContext {
context_to_verify_block: ContextToVerifyBlock {
median_weight_for_block_reward: weight_cache.median_for_block_reward(current_hf),
effective_median_weight: weight_cache.effective_median_block_weight(current_hf),
top_hash,
median_block_timestamp: difficulty_cache
.median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)),
chain_height,
current_hf,
next_difficulty: difficulty_cache.next_difficulty(current_hf),
already_generated_coins,
},
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
median_long_term_weight: weight_cache.median_long_term_weight(),
top_block_timestamp: difficulty_cache.top_block_timestamp(),
}
}

View file

@ -1,33 +0,0 @@
//! Tokens
//!
//! This module contains tokens which keep track of the validity of certain data.
//! Currently, there is 1 token:
//! - [`ValidityToken`]
//!
use tokio_util::sync::CancellationToken;
/// A token representing if a piece of data is valid.
#[derive(Debug, Clone, Default)]
pub struct ValidityToken {
token: CancellationToken,
}
impl ValidityToken {
/// Creates a new [`ValidityToken`]
pub fn new() -> Self {
Self {
token: CancellationToken::new(),
}
}
/// Returns `true` if the data is still valid.
pub fn is_data_valid(&self) -> bool {
!self.token.is_cancelled()
}
/// Sets the data to invalid.
pub fn set_data_invalid(self) {
self.token.cancel();
}
}

View file

@ -10,10 +10,10 @@ use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use tower::{Service, ServiceExt};
use tower::Service;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_consensus_context::{BlockChainContextRequest, BlockChainContextResponse};
use cuprate_consensus_context::BlockchainContextService;
use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError};
use cuprate_helper::cast::u64_to_usize;
use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation};
@ -110,37 +110,18 @@ impl From<tower::BoxError> for FastSyncError {
}
}
pub struct FastSyncService<C> {
context_svc: C,
pub struct FastSyncService {
context_svc: BlockchainContextService,
}
impl<C> FastSyncService<C>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
{
impl FastSyncService {
#[expect(dead_code)]
pub(crate) const fn new(context_svc: C) -> Self {
pub(crate) const fn new(context_svc: BlockchainContextService) -> Self {
Self { context_svc }
}
}
impl<C> Service<FastSyncRequest> for FastSyncService<C>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
{
impl Service<FastSyncRequest> for FastSyncService {
type Response = FastSyncResponse;
type Error = FastSyncError;
type Future =
@ -151,7 +132,7 @@ where
}
fn call(&mut self, req: FastSyncRequest) -> Self::Future {
let context_svc = self.context_svc.clone();
let mut context_svc = self.context_svc.clone();
Box::pin(async move {
match req {
@ -160,7 +141,7 @@ where
block_ids,
} => validate_hashes(start_height, &block_ids),
FastSyncRequest::ValidateBlock { block, txs, token } => {
validate_block(context_svc, block, txs, token).await
validate_block(&mut context_svc, block, txs, &token)
}
}
})
@ -210,31 +191,13 @@ fn validate_hashes(
})
}
async fn validate_block<C>(
mut context_svc: C,
fn validate_block(
context_svc: &mut BlockchainContextService,
block: Block,
mut txs: HashMap<[u8; 32], Transaction>,
token: ValidBlockId,
) -> Result<FastSyncResponse, FastSyncError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
let block_chain_ctx = checked_context.unchecked_blockchain_context().clone();
token: &ValidBlockId,
) -> Result<FastSyncResponse, FastSyncError> {
let block_chain_ctx = context_svc.blockchain_context().clone();
let block_hash = block.hash();
if block_hash != token.0 {

View file

@ -210,7 +210,7 @@ fn check_txs_unique(txs: &[[u8; 32]]) -> Result<(), BlockError> {
/// This struct contains the data needed to verify a block, implementers MUST make sure
/// the data in this struct is calculated correctly.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ContextToVerifyBlock {
/// ref: <https://monero-book.cuprate.org/consensus_rules/blocks/weights.html#median-weight-for-coinbase-checks>
pub median_weight_for_block_reward: usize,

View file

@ -1,21 +1,18 @@
//! Block Verifier Service.
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
//! Block Verification.
//!
//! This module contains functions for verifying blocks:
//! - [`verify_main_chain_block`]
//! - [`batch_prepare_main_chain_blocks`]
//! - [`verify_prepped_main_chain_block`]
//! - [`sanity_check_alt_block`]
//!
use std::{collections::HashMap, mem};
use futures::FutureExt;
use monero_serai::{
block::Block,
transaction::{Input, Transaction},
};
use monero_serai::{block::Block, transaction::Input};
use tower::{Service, ServiceExt};
use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext,
BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::{
@ -32,17 +29,14 @@ use cuprate_consensus_rules::{
ConsensusError, HardFork,
};
use crate::{
transactions::{VerifyTxRequest, VerifyTxResponse},
Database, ExtendedConsensusError,
};
use crate::{transactions::start_tx_verification, Database, ExtendedConsensusError};
mod alt_block;
mod batch_prepare;
mod free;
use alt_block::sanity_check_alt_block;
use batch_prepare::batch_prepare_main_chain_block;
pub use alt_block::sanity_check_alt_block;
pub use batch_prepare::batch_prepare_main_chain_blocks;
use free::pull_ordered_transactions;
/// A pre-prepared block with all data needed to verify it, except the block's proof of work.
@ -198,168 +192,17 @@ impl PreparedBlock {
}
}
/// A request to verify a block.
pub enum VerifyBlockRequest {
/// A request to verify a block.
MainChain {
block: Block,
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
},
/// Verifies a prepared block.
MainChainPrepped {
/// The already prepared block.
block: PreparedBlock,
/// The full list of transactions for this block, in the order given in `block`.
// TODO: Remove the Arc here
txs: Vec<Arc<TransactionVerificationData>>,
},
/// Batch prepares a list of blocks and transactions for verification.
MainChainBatchPrepareBlocks {
/// The list of blocks and their transactions (not necessarily in the order given in the block).
blocks: Vec<(Block, Vec<Transaction>)>,
},
/// A request to sanity check an alt block, also returning the cumulative difficulty of the alt chain.
///
/// Unlike requests to verify main chain blocks, you do not need to add the returned block to the context
/// service, you will still have to add it to the database though.
AltChain {
/// The alt block to sanity check.
block: Block,
/// The alt transactions.
prepared_txs: HashMap<[u8; 32], TransactionVerificationData>,
},
}
/// A response from a verify block request.
pub enum VerifyBlockResponse {
/// This block is valid.
MainChain(VerifiedBlockInformation),
/// The sanity checked alt block.
AltChain(AltBlockInformation),
/// A list of prepared blocks for verification, you should call [`VerifyBlockRequest::MainChainPrepped`] on each of the returned
/// blocks to fully verify them.
MainChainBatchPrepped(Vec<(PreparedBlock, Vec<Arc<TransactionVerificationData>>)>),
}
/// The block verifier service.
pub struct BlockVerifierService<C, TxV, D> {
/// The context service.
context_svc: C,
/// The tx verifier service.
tx_verifier_svc: TxV,
/// The database.
// Not use yet but will be.
_database: D,
}
impl<C, TxV, D> BlockVerifierService<C, TxV, D>
where
C: Service<BlockChainContextRequest, Response = BlockChainContextResponse>
+ Clone
+ Send
+ 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
/// Creates a new block verifier.
pub(crate) const fn new(context_svc: C, tx_verifier_svc: TxV, database: D) -> Self {
Self {
context_svc,
tx_verifier_svc,
_database: database,
}
}
}
impl<C, TxV, D> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, D>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
type Response = VerifyBlockResponse;
type Error = ExtendedConsensusError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
let context_svc = self.context_svc.clone();
let tx_verifier_svc = self.tx_verifier_svc.clone();
async move {
match req {
VerifyBlockRequest::MainChain {
block,
prepared_txs,
} => {
verify_main_chain_block(block, prepared_txs, context_svc, tx_verifier_svc).await
}
VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks } => {
batch_prepare_main_chain_block(blocks, context_svc).await
}
VerifyBlockRequest::MainChainPrepped { block, txs } => {
verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None)
.await
}
VerifyBlockRequest::AltChain {
block,
prepared_txs,
} => sanity_check_alt_block(block, prepared_txs, context_svc).await,
}
}
.boxed()
}
}
/// Verifies a prepared block.
async fn verify_main_chain_block<C, TxV>(
/// Fully verify a block and all its transactions.
pub async fn verify_main_chain_block<D>(
block: Block,
txs: HashMap<[u8; 32], TransactionVerificationData>,
mut context_svc: C,
tx_verifier_svc: TxV,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
context_svc: &mut BlockchainContextService,
database: D,
) -> Result<VerifiedBlockInformation, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
D: Database + Clone + Send + 'static,
{
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
let context = context_svc.blockchain_context().clone();
tracing::debug!("got blockchain context: {:?}", context);
tracing::debug!(
@ -398,55 +241,22 @@ where
.map_err(ConsensusError::Block)?;
// Check that the txs included are what we need and that there are not any extra.
// TODO: Remove the Arc here
let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?
.into_iter()
.map(Arc::new)
.collect();
let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?;
verify_prepped_main_chain_block(
prepped_block,
ordered_txs,
context_svc,
tx_verifier_svc,
Some(context),
)
.await
verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database).await
}
async fn verify_prepped_main_chain_block<C, TxV>(
/// Fully verify a block that has already been prepared using [`batch_prepare_main_chain_blocks`].
pub async fn verify_prepped_main_chain_block<D>(
prepped_block: PreparedBlock,
txs: Vec<Arc<TransactionVerificationData>>,
context_svc: C,
tx_verifier_svc: TxV,
cached_context: Option<RawBlockChainContext>,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
mut txs: Vec<TransactionVerificationData>,
context_svc: &mut BlockchainContextService,
database: D,
) -> Result<VerifiedBlockInformation, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>,
D: Database + Clone + Send + 'static,
{
let context = if let Some(context) = cached_context {
context
} else {
let BlockChainContextResponse::Context(checked_context) = context_svc
.oneshot(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
tracing::debug!("got blockchain context: {context:?}");
context
};
let context = context_svc.blockchain_context();
tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash));
@ -464,15 +274,20 @@ where
}
}
tx_verifier_svc
.oneshot(VerifyTxRequest::Prepped {
txs: txs.clone(),
current_chain_height: context.chain_height,
top_hash: context.top_hash,
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
hf: context.current_hf,
})
let temp = start_tx_verification()
.append_prepped_txs(mem::take(&mut txs))
.prepare()?
.full(
context.chain_height,
context.top_hash,
context.current_adjusted_timestamp_for_time_lock(),
context.current_hf,
database,
)
.verify()
.await?;
txs = temp;
}
let block_weight =
@ -489,26 +304,18 @@ where
)
.map_err(ConsensusError::Block)?;
Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation {
Ok(VerifiedBlockInformation {
block_hash: prepped_block.block_hash,
block: prepped_block.block,
block_blob: prepped_block.block_blob,
txs: txs
.into_iter()
.map(|tx| {
// Note: it would be possible for the transaction verification service to hold onto the tx after the call
// if one of txs was invalid and the rest are still in rayon threads.
let tx = Arc::into_inner(tx).expect(
"Transaction verification service should not hold onto valid transactions.",
);
VerifiedTransactionInformation {
tx_blob: tx.tx_blob,
tx_weight: tx.tx_weight,
fee: tx.fee,
tx_hash: tx.tx_hash,
tx: tx.tx,
}
.map(|tx| VerifiedTransactionInformation {
tx_blob: tx.tx_blob,
tx_weight: tx.tx_weight,
fee: tx.fee,
tx_hash: tx.tx_hash,
tx: tx.tx,
})
.collect(),
pow_hash: prepped_block.pow_hash,
@ -517,5 +324,5 @@ where
height: context.chain_height,
long_term_weight: context.next_block_long_term_weight(block_weight),
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
}))
})
}

View file

@ -29,7 +29,6 @@ use cuprate_types::{
use crate::{
block::{free::pull_ordered_transactions, PreparedBlock},
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
VerifyBlockResponse,
};
/// This function sanity checks an alt-block.
@ -37,11 +36,11 @@ use crate::{
/// Returns [`AltBlockInformation`], which contains the cumulative difficulty of the alt chain.
///
/// This function only checks the block's proof-of-work and its weight.
pub(crate) async fn sanity_check_alt_block<C>(
pub async fn sanity_check_alt_block<C>(
block: Block,
txs: HashMap<[u8; 32], TransactionVerificationData>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
) -> Result<AltBlockInformation, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
@ -185,7 +184,7 @@ where
})
.await?;
Ok(VerifyBlockResponse::AltChain(block_info))
Ok(block_info)
}
/// Retrieves the alt RX VM for the chosen block height.

View file

@ -5,7 +5,7 @@ use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus_context::rx_vms::RandomXVm;
use cuprate_consensus_context::{rx_vms::RandomXVm, BlockchainContextService};
use cuprate_consensus_rules::{
blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError},
hard_forks::HardForkError,
@ -13,29 +13,21 @@ use cuprate_consensus_rules::{
ConsensusError, HardFork,
};
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_types::TransactionVerificationData;
use crate::{
block::{free::pull_ordered_transactions, PreparedBlock, PreparedBlockExPow},
transactions::new_tx_verification_data,
batch_verifier::MultiThreadedBatchVerifier,
block::{free::order_transactions, PreparedBlock, PreparedBlockExPow},
transactions::start_tx_verification,
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
VerifyBlockResponse,
};
/// Batch prepares a list of blocks for verification.
#[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))]
pub(crate) async fn batch_prepare_main_chain_block<C>(
pub async fn batch_prepare_main_chain_blocks(
blocks: Vec<(Block, Vec<Transaction>)>,
mut context_svc: C,
) -> Result<VerifyBlockResponse, ExtendedConsensusError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
C::Future: Send + 'static,
{
context_svc: &mut BlockchainContextService,
) -> Result<Vec<(PreparedBlock, Vec<TransactionVerificationData>)>, ExtendedConsensusError> {
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
tracing::debug!("Calculating block hashes.");
@ -89,16 +81,6 @@ where
timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version));
}
// Get the current blockchain context.
let BlockChainContextResponse::Context(checked_context) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
// Calculate the expected difficulties for each block in the batch.
let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc
.ready()
@ -111,7 +93,8 @@ where
panic!("Context service returned wrong response!");
};
let context = checked_context.unchecked_blockchain_context().clone();
// Get the current blockchain context.
let context = context_svc.blockchain_context();
// Make sure the blocks follow the main chain.
@ -168,7 +151,9 @@ where
tracing::debug!("Calculating PoW and prepping transaction");
let blocks = rayon_spawn_async(move || {
blocks
let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads());
let res = blocks
.into_par_iter()
.zip(difficulties)
.zip(txs)
@ -183,27 +168,26 @@ where
// Check the PoW
check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?;
// Now setup the txs.
let txs = txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok::<_, ConsensusError>((tx.tx_hash, tx))
})
.collect::<Result<HashMap<_, _>, _>>()?;
let mut txs = start_tx_verification()
.append_txs(txs)
.prepare()?
.only_semantic(block.hf_version)
.queue(&batch_verifier)?;
// Order the txs correctly.
// TODO: Remove the Arc here
let ordered_txs = pull_ordered_transactions(&block.block, txs)?
.into_iter()
.map(Arc::new)
.collect();
order_transactions(&block.block, &mut txs)?;
Ok((block, ordered_txs))
Ok((block, txs))
})
.collect::<Result<Vec<_>, ExtendedConsensusError>>()
.collect::<Result<Vec<_>, ExtendedConsensusError>>()?;
if !batch_verifier.verify() {
return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid);
}
Ok(res)
})
.await?;
Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks))
Ok(blocks)
}

View file

@ -7,6 +7,36 @@ use cuprate_types::TransactionVerificationData;
use crate::ExtendedConsensusError;
/// Orders the [`TransactionVerificationData`] list the same as it appears in [`Block::transactions`]
pub(crate) fn order_transactions(
block: &Block,
txs: &mut [TransactionVerificationData],
) -> Result<(), ExtendedConsensusError> {
if block.transactions.len() != txs.len() {
return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect);
}
for (i, tx_hash) in block.transactions.iter().enumerate() {
if tx_hash != &txs[i].tx_hash {
let at_index = txs[i..]
.iter()
.position(|tx| &tx.tx_hash == tx_hash)
.ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?;
// The above `position` will give an index from inside its view of the slice so we need to add the difference.
txs.swap(i, i + at_index);
}
}
debug_assert!(block
.transactions
.iter()
.zip(txs.iter())
.all(|(tx_hash, tx)| tx_hash == &tx.tx_hash));
Ok(())
}
/// Returns a list of transactions, pulled from `txs` in the order they are in the [`Block`].
///
/// Will error if a tx need is not in `txs` or if `txs` contain more txs than needed.

View file

@ -1,10 +1,6 @@
//! Cuprate Consensus
//!
//! This crate contains 3 [`tower::Service`]s that implement Monero's consensus rules:
//!
//! - [`BlockChainContextService`] Which handles keeping the current state of the blockchain.
//! - [`BlockVerifierService`] Which handles block verification.
//! - [`TxVerifierService`] Which handles transaction verification.
//! This crate contains Monero [`block`] and [`transactions`] verification functionality.
//!
//! This crate is generic over the database which is implemented as a [`tower::Service`]. To
//! implement a database you need to have a service which accepts [`BlockchainReadRequest`] and responds
@ -17,23 +13,22 @@ cfg_if::cfg_if! {
use cuprate_test_utils as _;
use curve25519_dalek as _;
use hex_literal as _;
use futures as _;
}
}
use cuprate_consensus_rules::ConsensusError;
mod batch_verifier;
pub mod batch_verifier;
pub mod block;
#[cfg(test)]
mod tests;
pub mod transactions;
pub use block::{BlockVerifierService, VerifyBlockRequest, VerifyBlockResponse};
pub use cuprate_consensus_context::{
initialize_blockchain_context, BlockChainContext, BlockChainContextRequest,
BlockChainContextResponse, BlockChainContextService, ContextConfig,
initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse,
BlockchainContext, BlockchainContextService, ContextConfig,
};
pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse};
// re-export.
pub use cuprate_consensus_rules::genesis::generate_genesis_block;
@ -63,37 +58,9 @@ pub enum ExtendedConsensusError {
NoBlocksToVerify,
}
/// Initialize the 2 verifier [`tower::Service`]s (block and transaction).
pub fn initialize_verifier<D, Ctx>(
database: D,
ctx_svc: Ctx,
) -> (
BlockVerifierService<Ctx, TxVerifierService<D>, D>,
TxVerifierService<D>,
)
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Ctx::Future: Send + 'static,
{
let tx_svc = TxVerifierService::new(database.clone());
let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database);
(block_svc, tx_svc)
}
use __private::Database;
pub mod __private {
use std::future::Future;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
/// A type alias trait used to represent a database, so we don't have to write [`tower::Service`] bounds
@ -108,10 +75,9 @@ pub mod __private {
BlockchainReadRequest,
Response = BlockchainResponse,
Error = tower::BoxError,
Future = Self::Future2,
Future: Send + 'static,
>
{
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
}
impl<
@ -119,11 +85,9 @@ pub mod __private {
BlockchainReadRequest,
Response = BlockchainResponse,
Error = tower::BoxError,
Future: Send + 'static,
>,
> Database for T
where
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
{
type Future2 = T::Future;
}
}

View file

@ -3,8 +3,7 @@ use proptest::{strategy::Strategy, test_runner::TestRunner};
use tower::ServiceExt;
use cuprate_consensus_context::{
initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse,
ContextConfig, NewBlockData,
initialize_blockchain_context, BlockChainContextRequest, ContextConfig, NewBlockData,
};
use crate::{tests::mock_db::*, HardFork};
@ -35,21 +34,12 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> {
.unwrap()
.current();
let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let mut ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let BlockChainContextResponse::Context(context) = ctx_svc
.clone()
.oneshot(BlockChainContextRequest::Context)
.await?
else {
panic!("Context service returned wrong response!");
};
assert!(context.is_still_valid());
assert!(context.is_still_valid());
assert!(context.is_still_valid());
let context = ctx_svc.blockchain_context().clone();
ctx_svc
.clone()
.oneshot(BlockChainContextRequest::Update(NewBlockData {
block_hash: [0; 32],
height: BLOCKCHAIN_HEIGHT,
@ -62,7 +52,7 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> {
}))
.await?;
assert!(!context.is_still_valid());
assert_ne!(&context, ctx_svc.blockchain_context());
Ok(())
}
@ -77,18 +67,11 @@ async fn context_height_correct() -> Result<(), tower::BoxError> {
.unwrap()
.current();
let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let mut ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?;
let BlockChainContextResponse::Context(context) =
ctx_svc.oneshot(BlockChainContextRequest::Context).await?
else {
panic!("context service returned incorrect response!")
};
let context = ctx_svc.blockchain_context();
assert_eq!(
context.blockchain_context().unwrap().chain_height,
BLOCKCHAIN_HEIGHT
);
assert_eq!(context.chain_height, BLOCKCHAIN_HEIGHT);
Ok(())
}

View file

@ -1,20 +1,35 @@
//! # Transaction Verifier Service.
//!
//! This module contains the [`TxVerifierService`] which handles consensus validation of transactions.
//! This module contains the transaction validation interface, which can be accessed with [`start_tx_verification`].
//!
use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
//! Transaction verification states will be cached to prevent doing the expensive checks multiple times.
//!
//! ## Example Semantic Verification
//!
//! ```rust
//! # use cuprate_test_utils::data::TX_E2D393;
//! # use monero_serai::transaction::Transaction;
//! use cuprate_consensus::{transactions::start_tx_verification, HardFork, batch_verifier::MultiThreadedBatchVerifier};
//!
//! # fn main() -> Result<(), tower::BoxError> {
//! # let tx = Transaction::read(&mut TX_E2D393).unwrap();
//! let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads());
//!
//! let tx = start_tx_verification()
//! .append_txs(vec![tx])
//! .prepare()?
//! .only_semantic(HardFork::V9)
//! .queue(&batch_verifier)?;
//!
//! assert!(batch_verifier.verify());
//! Ok(())
//! # }
//! ```
use std::collections::HashSet;
use futures::FutureExt;
use monero_serai::transaction::{Input, Timelock, Transaction};
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use tower::ServiceExt;
use cuprate_consensus_rules::{
transactions::{
@ -40,183 +55,223 @@ mod free;
pub use free::new_tx_verification_data;
/// A struct representing the type of validation that needs to be completed for this transaction.
/// An enum representing the type of validation that needs to be completed for this transaction.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum VerificationNeeded {
/// Decoy check on a v1 transaction.
V1DecoyCheck,
/// Both semantic validation and contextual validation are needed.
SemanticAndContextual,
/// Only contextual validation is needed.
Contextual,
/// No verification needed.
None,
}
/// A request to verify a transaction.
pub enum VerifyTxRequest {
/// Verifies a batch of prepared txs.
Prepped {
/// The transactions to verify.
// TODO: Can we use references to remove the Vec? wont play nicely with Service though
txs: Vec<Arc<TransactionVerificationData>>,
/// The current chain height.
current_chain_height: usize,
/// The top block hash.
top_hash: [u8; 32],
/// The value for time to use to check time locked outputs.
time_for_time_lock: u64,
/// The current [`HardFork`]
hf: HardFork,
},
/// Verifies a batch of new txs.
/// Returning [`VerifyTxResponse::OkPrepped`]
New {
/// The transactions to verify.
txs: Vec<Transaction>,
/// The current chain height.
current_chain_height: usize,
/// The top block hash.
top_hash: [u8; 32],
/// The value for time to use to check time locked outputs.
time_for_time_lock: u64,
/// The current [`HardFork`]
hf: HardFork,
},
}
/// A response from a verify transaction request.
#[derive(Debug)]
pub enum VerifyTxResponse {
OkPrepped(Vec<Arc<TransactionVerificationData>>),
Ok,
}
/// The transaction verifier service.
#[derive(Clone)]
pub struct TxVerifierService<D> {
/// The database.
database: D,
}
impl<D> TxVerifierService<D>
where
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
/// Creates a new [`TxVerifierService`].
pub const fn new(database: D) -> Self {
Self { database }
/// Start the transaction verification process.
pub const fn start_tx_verification() -> PrepTransactions {
PrepTransactions {
txs: vec![],
prepped_txs: vec![],
}
}
impl<D> Service<VerifyTxRequest> for TxVerifierService<D>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
type Response = VerifyTxResponse;
type Error = ExtendedConsensusError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.database.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: VerifyTxRequest) -> Self::Future {
let database = self.database.clone();
async move {
match req {
VerifyTxRequest::New {
txs,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
} => {
prep_and_verify_transactions(
database,
txs,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
)
.await
}
VerifyTxRequest::Prepped {
txs,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
} => {
verify_prepped_transactions(
database,
&txs,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
)
.await
}
}
}
.boxed()
}
}
/// Prepares transactions for verification, then verifies them.
async fn prep_and_verify_transactions<D>(
database: D,
/// The preparation phase of transaction verification.
///
/// The order of transactions will be kept throughout the verification process, transactions
/// inserted with [`PrepTransactions::append_prepped_txs`] will be put before transactions given
/// in [`PrepTransactions::append_txs`]
pub struct PrepTransactions {
prepped_txs: Vec<TransactionVerificationData>,
txs: Vec<Transaction>,
current_chain_height: usize,
top_hash: [u8; 32],
time_for_time_lock: u64,
hf: HardFork,
) -> Result<VerifyTxResponse, ExtendedConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
let span = tracing::info_span!("prep_txs", amt = txs.len());
tracing::debug!(parent: &span, "prepping transactions for verification.");
let txs = rayon_spawn_async(|| {
txs.into_par_iter()
.map(|tx| new_tx_verification_data(tx).map(Arc::new))
.collect::<Result<Vec<_>, _>>()
})
.await?;
verify_prepped_transactions(
database,
&txs,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
)
.await?;
Ok(VerifyTxResponse::OkPrepped(txs))
}
#[instrument(name = "verify_txs", skip_all, fields(amt = txs.len()) level = "info")]
async fn verify_prepped_transactions<D>(
mut database: D,
txs: &[Arc<TransactionVerificationData>],
impl PrepTransactions {
/// Append some new transactions to prepare.
#[must_use]
pub fn append_txs(mut self, mut txs: Vec<Transaction>) -> Self {
self.txs.append(&mut txs);
self
}
/// Append some already prepped transactions.
#[must_use]
pub fn append_prepped_txs(mut self, mut txs: Vec<TransactionVerificationData>) -> Self {
self.prepped_txs.append(&mut txs);
self
}
/// Prepare the transactions and advance to the next step: [`VerificationWanted`].
///
/// # [`rayon`]
///
/// This function will use [`rayon`] to parallelize the preparation process, so should not be called
/// in an async function, unless all the transactions given were already prepared, i.e. [`Self::append_prepped_txs`].
pub fn prepare(mut self) -> Result<VerificationWanted, ConsensusError> {
if !self.txs.is_empty() {
self.prepped_txs.append(
&mut self
.txs
.into_par_iter()
.map(new_tx_verification_data)
.collect::<Result<_, _>>()?,
);
}
Ok(VerificationWanted {
prepped_txs: self.prepped_txs,
})
}
}
/// The step where the type of verification is decided.
pub struct VerificationWanted {
prepped_txs: Vec<TransactionVerificationData>,
}
impl VerificationWanted {
/// Only semantic verification.
///
/// Semantic verification is verification that can done without other blockchain data. The [`HardFork`]
/// is technically other blockchain data but excluding it reduces the amount of things that can be checked
/// significantly, and it is easy to get compared to other blockchain data needed for contextual validation.
pub fn only_semantic(self, hf: HardFork) -> SemanticVerification {
SemanticVerification {
prepped_txs: self.prepped_txs,
hf,
}
}
/// Full verification.
///
/// Fully verify the transactions, all checks will be performed, if they were already performed then they
/// won't be done again unless necessary.
pub fn full<D: Database>(
self,
current_chain_height: usize,
top_hash: [u8; 32],
time_for_time_lock: u64,
hf: HardFork,
database: D,
) -> FullVerification<D> {
FullVerification {
prepped_txs: self.prepped_txs,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
database,
}
}
}
/// Semantic transaction verification.
///
/// [`VerificationWanted::only_semantic`]
pub struct SemanticVerification {
prepped_txs: Vec<TransactionVerificationData>,
hf: HardFork,
}
impl SemanticVerification {
/// Perform the semantic checks and queue any checks that can be batched into the batch verifier.
///
/// If this function returns [`Ok`] the transaction(s) could still be semantically invalid,
/// [`MultiThreadedBatchVerifier::verify`] must be called on the `batch_verifier` after.
pub fn queue(
mut self,
batch_verifier: &MultiThreadedBatchVerifier,
) -> Result<Vec<TransactionVerificationData>, ConsensusError> {
self.prepped_txs.par_iter_mut().try_for_each(|tx| {
let fee = check_transaction_semantic(
&tx.tx,
tx.tx_blob.len(),
tx.tx_weight,
&tx.tx_hash,
self.hf,
batch_verifier,
)?;
// make sure we calculated the right fee.
assert_eq!(fee, tx.fee);
tx.cached_verification_state = CachedVerificationState::OnlySemantic(self.hf);
Ok::<_, ConsensusError>(())
})?;
Ok(self.prepped_txs)
}
}
/// Full transaction verification.
///
/// [`VerificationWanted::full`]
pub struct FullVerification<D> {
prepped_txs: Vec<TransactionVerificationData>,
current_chain_height: usize,
top_hash: [u8; 32],
time_for_time_lock: u64,
hf: HardFork,
) -> Result<VerifyTxResponse, ExtendedConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
tracing::debug!("Verifying transactions");
database: D,
}
tracing::trace!("Checking for duplicate key images");
impl<D: Database + Clone> FullVerification<D> {
/// Fully verify each transaction.
pub async fn verify(
mut self,
) -> Result<Vec<TransactionVerificationData>, ExtendedConsensusError> {
check_kis_unique(&self.prepped_txs, &mut self.database).await?;
let hashes_in_main_chain =
hashes_referenced_in_main_chain(&self.prepped_txs, &mut self.database).await?;
let (verification_needed, any_v1_decoy_check_needed) = verification_needed(
&self.prepped_txs,
&hashes_in_main_chain,
self.hf,
self.current_chain_height,
self.time_for_time_lock,
)?;
if any_v1_decoy_check_needed {
verify_transactions_decoy_info(
self.prepped_txs
.iter()
.zip(verification_needed.iter())
.filter_map(|(tx, needed)| {
if *needed == VerificationNeeded::V1DecoyCheck {
Some(tx)
} else {
None
}
}),
self.hf,
self.database.clone(),
)
.await?;
}
verify_transactions(
self.prepped_txs,
verification_needed,
self.current_chain_height,
self.top_hash,
self.time_for_time_lock,
self.hf,
self.database,
)
.await
}
}
/// Check that each key image used in each transaction is unique in the whole chain.
async fn check_kis_unique<D: Database>(
txs: &[TransactionVerificationData],
database: &mut D,
) -> Result<(), ExtendedConsensusError> {
let mut spent_kis = HashSet::with_capacity(txs.len());
txs.iter().try_for_each(|tx| {
@ -246,14 +301,18 @@ where
return Err(ConsensusError::Transaction(TransactionError::KeyImageSpent).into());
}
Ok(())
}
/// Returns a [`HashSet`] of all the hashes referenced in each transaction's [`CachedVerificationState`], that
/// are also in the main chain.
async fn hashes_referenced_in_main_chain<D: Database>(
txs: &[TransactionVerificationData],
database: &mut D,
) -> Result<HashSet<[u8; 32]>, ExtendedConsensusError> {
let mut verified_at_block_hashes = txs
.iter()
.filter_map(|txs| {
txs.cached_verification_state
.lock()
.unwrap()
.verified_at_block_hash()
})
.filter_map(|txs| txs.cached_verification_state.verified_at_block_hash())
.collect::<HashSet<_>>();
tracing::trace!(
@ -277,74 +336,53 @@ where
verified_at_block_hashes = known_hashes;
}
let (txs_needing_full_verification, txs_needing_partial_verification) =
transactions_needing_verification(
txs,
&verified_at_block_hashes,
hf,
current_chain_height,
time_for_time_lock,
)?;
futures::try_join!(
verify_transactions_decoy_info(txs_needing_partial_verification, hf, database.clone()),
verify_transactions(
txs_needing_full_verification,
current_chain_height,
top_hash,
time_for_time_lock,
hf,
database
)
)?;
Ok(VerifyTxResponse::Ok)
Ok(verified_at_block_hashes)
}
#[expect(
clippy::type_complexity,
reason = "I don't think the return is too complex"
)]
fn transactions_needing_verification(
txs: &[Arc<TransactionVerificationData>],
/// Returns a list of [`VerificationNeeded`] for each transaction passed in. The returned
/// [`Vec`] will be the same length as the inputted transactions.
///
/// A [`bool`] is also returned, which will be true if any transactions need [`VerificationNeeded::V1DecoyCheck`].
fn verification_needed(
txs: &[TransactionVerificationData],
hashes_in_main_chain: &HashSet<[u8; 32]>,
current_hf: HardFork,
current_chain_height: usize,
time_for_time_lock: u64,
) -> Result<
(
Vec<(Arc<TransactionVerificationData>, VerificationNeeded)>,
Vec<Arc<TransactionVerificationData>>,
),
ConsensusError,
> {
) -> Result<(Vec<VerificationNeeded>, bool), ConsensusError> {
// txs needing full validation: semantic and/or contextual
let mut full_validation_transactions = Vec::new();
// txs needing partial _contextual_ validation, not semantic.
let mut partial_validation_transactions = Vec::new();
let mut verification_needed = Vec::with_capacity(txs.len());
let mut any_v1_decoy_checks = false;
for tx in txs {
let guard = tx.cached_verification_state.lock().unwrap();
match &*guard {
match &tx.cached_verification_state {
CachedVerificationState::NotVerified => {
drop(guard);
full_validation_transactions
.push((Arc::clone(tx), VerificationNeeded::SemanticAndContextual));
// Tx not verified at all need all checks.
verification_needed.push(VerificationNeeded::SemanticAndContextual);
continue;
}
CachedVerificationState::OnlySemantic(hf) => {
if current_hf != *hf {
// HF changed must do semantic checks again.
verification_needed.push(VerificationNeeded::SemanticAndContextual);
continue;
}
// Tx already semantically valid for this HF only contextual checks needed.
verification_needed.push(VerificationNeeded::Contextual);
continue;
}
CachedVerificationState::ValidAtHashAndHF { block_hash, hf } => {
if current_hf != *hf {
drop(guard);
full_validation_transactions
.push((Arc::clone(tx), VerificationNeeded::SemanticAndContextual));
// HF changed must do all checks again.
verification_needed.push(VerificationNeeded::SemanticAndContextual);
continue;
}
if !hashes_in_main_chain.contains(block_hash) {
drop(guard);
full_validation_transactions
.push((Arc::clone(tx), VerificationNeeded::Contextual));
// The block we know this transaction was valid at is no longer in the chain do
// contextual checks again.
verification_needed.push(VerificationNeeded::Contextual);
continue;
}
}
@ -354,20 +392,20 @@ fn transactions_needing_verification(
time_lock,
} => {
if current_hf != *hf {
drop(guard);
full_validation_transactions
.push((Arc::clone(tx), VerificationNeeded::SemanticAndContextual));
// HF changed must do all checks again.
verification_needed.push(VerificationNeeded::SemanticAndContextual);
continue;
}
if !hashes_in_main_chain.contains(block_hash) {
drop(guard);
full_validation_transactions
.push((Arc::clone(tx), VerificationNeeded::Contextual));
// The block we know this transaction was valid at is no longer in the chain do
// contextual checks again.
verification_needed.push(VerificationNeeded::Contextual);
continue;
}
// If the time lock is still locked then the transaction is invalid.
// Time is not monotonic in Monero so these can become invalid with new blocks.
if !output_unlocked(time_lock, current_chain_height, time_for_time_lock, *hf) {
return Err(ConsensusError::Transaction(
TransactionError::OneOrMoreRingMembersLocked,
@ -377,57 +415,80 @@ fn transactions_needing_verification(
}
if tx.version == TxVersion::RingSignatures {
drop(guard);
partial_validation_transactions.push(Arc::clone(tx));
// v1 txs always need at least decoy checks as they can become invalid with new blocks.
verification_needed.push(VerificationNeeded::V1DecoyCheck);
any_v1_decoy_checks = true;
continue;
}
verification_needed.push(VerificationNeeded::None);
}
Ok((
full_validation_transactions,
partial_validation_transactions,
))
Ok((verification_needed, any_v1_decoy_checks))
}
async fn verify_transactions_decoy_info<D>(
txs: Vec<Arc<TransactionVerificationData>>,
/// Do [`VerificationNeeded::V1DecoyCheck`] on each tx passed in.
async fn verify_transactions_decoy_info<D: Database>(
txs: impl Iterator<Item = &TransactionVerificationData> + Clone,
hf: HardFork,
database: D,
) -> Result<(), ExtendedConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
) -> Result<(), ExtendedConsensusError> {
// Decoy info is not validated for V1 txs.
if hf == HardFork::V1 || txs.is_empty() {
if hf == HardFork::V1 {
return Ok(());
}
batch_get_decoy_info(&txs, hf, database)
batch_get_decoy_info(txs, hf, database)
.await?
.try_for_each(|decoy_info| decoy_info.and_then(|di| Ok(check_decoy_info(&di, hf)?)))?;
Ok(())
}
/// Do [`VerificationNeeded::Contextual`] or [`VerificationNeeded::SemanticAndContextual`].
///
/// The inputs to this function are the txs wanted to be verified and a list of [`VerificationNeeded`],
/// if any other [`VerificationNeeded`] is specified other than [`VerificationNeeded::Contextual`] or
/// [`VerificationNeeded::SemanticAndContextual`], nothing will be verified for that tx.
async fn verify_transactions<D>(
txs: Vec<(Arc<TransactionVerificationData>, VerificationNeeded)>,
mut txs: Vec<TransactionVerificationData>,
verification_needed: Vec<VerificationNeeded>,
current_chain_height: usize,
top_hash: [u8; 32],
current_time_lock_timestamp: u64,
hf: HardFork,
database: D,
) -> Result<(), ExtendedConsensusError>
) -> Result<Vec<TransactionVerificationData>, ExtendedConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
D: Database,
{
let txs_ring_member_info =
batch_get_ring_member_info(txs.iter().map(|(tx, _)| tx), hf, database).await?;
/// A filter each tx not [`VerificationNeeded::Contextual`] or
/// [`VerificationNeeded::SemanticAndContextual`]
const fn tx_filter<T>((_, needed): &(T, &VerificationNeeded)) -> bool {
matches!(
needed,
VerificationNeeded::Contextual | VerificationNeeded::SemanticAndContextual
)
}
let txs_ring_member_info = batch_get_ring_member_info(
txs.iter()
.zip(verification_needed.iter())
.filter(tx_filter)
.map(|(tx, _)| tx),
hf,
database,
)
.await?;
rayon_spawn_async(move || {
let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads());
txs.par_iter()
.zip(txs_ring_member_info.par_iter())
txs.iter()
.zip(verification_needed.iter())
.filter(tx_filter)
.zip(txs_ring_member_info.iter())
.par_bridge()
.try_for_each(|((tx, verification_needed), ring)| {
// do semantic validation if needed.
if *verification_needed == VerificationNeeded::SemanticAndContextual {
@ -459,11 +520,12 @@ where
return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid);
}
txs.iter()
txs.iter_mut()
.zip(verification_needed.iter())
.filter(tx_filter)
.zip(txs_ring_member_info)
.for_each(|((tx, _), ring)| {
*tx.cached_verification_state.lock().unwrap() = if ring.time_locked_outs.is_empty()
{
tx.cached_verification_state = if ring.time_locked_outs.is_empty() {
// no outputs with time-locks used.
CachedVerificationState::ValidAtHashAndHF {
block_hash: top_hash,
@ -475,7 +537,7 @@ where
.time_locked_outs
.iter()
.filter_map(|lock| match lock {
Timelock::Time(time) => Some(*time),
Timelock::Time(time) => Some(time),
_ => None,
})
.min();
@ -485,7 +547,7 @@ where
CachedVerificationState::ValidAtHashAndHFWithTimeBasedLock {
block_hash: top_hash,
hf,
time_lock: Timelock::Time(time),
time_lock: Timelock::Time(*time),
}
} else {
// no time-based locked output was used.
@ -497,9 +559,7 @@ where
}
});
Ok(())
Ok(txs)
})
.await?;
Ok(())
.await
}

View file

@ -10,10 +10,7 @@
//!
//! Because this data is unique for *every* transaction and the context service is just for blockchain state data.
//!
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::collections::{HashMap, HashSet};
use monero_serai::transaction::{Input, Timelock};
use tower::ServiceExt;
@ -142,7 +139,7 @@ fn new_rings(
/// This function batch gets all the ring members for the inputted transactions and fills in data about
/// them.
pub async fn batch_get_ring_member_info<D: Database>(
txs_verification_data: impl Iterator<Item = &Arc<TransactionVerificationData>> + Clone,
txs_verification_data: impl Iterator<Item = &TransactionVerificationData> + Clone,
hf: HardFork,
mut database: D,
) -> Result<Vec<TxRingMembersInfo>, ExtendedConsensusError> {
@ -205,22 +202,20 @@ pub async fn batch_get_ring_member_info<D: Database>(
/// This functions panics if `hf == HardFork::V1` as decoy info
/// should not be needed for V1.
#[instrument(level = "debug", skip_all)]
pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>(
txs_verification_data: &'a [Arc<TransactionVerificationData>],
pub async fn batch_get_decoy_info<'a, D: Database>(
txs_verification_data: impl Iterator<Item = &'a TransactionVerificationData> + Clone,
hf: HardFork,
mut database: D,
) -> Result<impl Iterator<Item = Result<DecoyInfo, ConsensusError>> + 'a, ExtendedConsensusError> {
) -> Result<
impl Iterator<Item = Result<DecoyInfo, ConsensusError>> + sealed::Captures<&'a ()>,
ExtendedConsensusError,
> {
// decoy info is not needed for V1.
assert_ne!(hf, HardFork::V1);
tracing::debug!(
"Retrieving decoy info for {} txs.",
txs_verification_data.len()
);
// Get all the different input amounts.
let unique_input_amounts = txs_verification_data
.iter()
.clone()
.flat_map(|tx_info| {
tx_info.tx.prefix().inputs.iter().map(|input| match input {
Input::ToKey { amount, .. } => amount.unwrap_or(0),
@ -245,7 +240,7 @@ pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>(
panic!("Database sent incorrect response!")
};
Ok(txs_verification_data.iter().map(move |tx_v_data| {
Ok(txs_verification_data.map(move |tx_v_data| {
DecoyInfo::new(
&tx_v_data.tx.prefix().inputs,
|amt| outputs_with_amount.get(&amt).copied().unwrap_or(0),
@ -254,3 +249,11 @@ pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>(
.map_err(ConsensusError::Transaction)
}))
}
mod sealed {
/// TODO: Remove me when 2024 Rust
///
/// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick>
pub trait Captures<U> {}
impl<T: ?Sized, U> Captures<U> for T {}
}

View file

@ -1,5 +1,3 @@
use std::sync::Mutex as StdMutex;
use monero_serai::{
ringct::{bulletproofs::Bulletproof, RctType},
transaction::{Input, Transaction},
@ -31,7 +29,7 @@ pub fn new_tx_verification_data(
tx_blob,
tx_weight,
fee,
cached_verification_state: StdMutex::new(CachedVerificationState::NotVerified),
cached_verification_state: CachedVerificationState::NotVerified,
tx,
})
}

View file

@ -9,11 +9,9 @@ use std::{
use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY};
use monero_serai::transaction::{Timelock, Transaction};
use tower::{service_fn, Service, ServiceExt};
use tower::service_fn;
use cuprate_consensus::{
TxVerifierService, VerifyTxRequest, VerifyTxResponse, __private::Database,
};
use cuprate_consensus::{__private::Database, transactions::start_tx_verification};
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
OutputOnChain,
@ -82,17 +80,17 @@ macro_rules! test_verify_valid_v2_tx {
let map = BTreeMap::from_iter(members);
let database = dummy_database(map);
let mut tx_verifier = TxVerifierService::new(database);
assert!(matches!(tx_verifier.ready().await.unwrap().call(
VerifyTxRequest::New {
txs: vec![Transaction::read(&mut $tx).unwrap()].into(),
current_chain_height: 10,
top_hash: [0; 32],
hf: HardFork::$hf,
time_for_time_lock: u64::MAX
}
).await.unwrap(), VerifyTxResponse::OkPrepped(_)));
assert!(
start_tx_verification()
.append_txs(
vec![Transaction::read(&mut $tx).unwrap()]
)
.prepare()
.unwrap()
.full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone())
.verify()
.await.is_ok()
);
// Check verification fails if we put random ring members
@ -111,17 +109,17 @@ macro_rules! test_verify_valid_v2_tx {
let map = BTreeMap::from_iter(members);
let database = dummy_database(map);
let mut tx_verifier = TxVerifierService::new(database);
assert!(tx_verifier.ready().await.unwrap().call(
VerifyTxRequest::New {
txs: vec![Transaction::read(&mut $tx).unwrap()].into(),
current_chain_height: 10,
top_hash: [0; 32],
hf: HardFork::$hf,
time_for_time_lock: u64::MAX
}
).await.is_err());
assert!(
start_tx_verification()
.append_txs(
vec![Transaction::read(&mut $tx).unwrap()]
)
.prepare()
.unwrap()
.full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone())
.verify()
.await.is_err()
);
}
};

View file

@ -1,8 +1,6 @@
//! Transaction read ops.
//!
//! This module handles reading full transaction data, like getting a transaction from the pool.
use std::sync::Mutex;
use monero_serai::transaction::Transaction;
use cuprate_database::{DatabaseRo, DbResult};
@ -34,7 +32,7 @@ pub fn get_transaction_verification_data(
tx_weight: tx_info.weight,
fee: tx_info.fee,
tx_hash: *tx_hash,
cached_verification_state: Mutex::new(cached_verification_state),
cached_verification_state,
})
}

View file

@ -48,7 +48,7 @@ pub fn add_transaction(
)?;
// Add the cached verification state to table 2.
let cached_verification_state = (*tx.cached_verification_state.lock().unwrap()).into();
let cached_verification_state = tx.cached_verification_state.into();
tables
.cached_verification_state_mut()
.put(&tx.tx_hash, &cached_verification_state)?;

View file

@ -92,7 +92,7 @@
//! // Prepare a request to write block.
//! let tx = TX_V1_SIG2.clone();
//! let request = TxpoolWriteRequest::AddTransaction {
//! tx: Arc::new(tx.try_into().unwrap()),
//! tx: Box::new(tx.try_into().unwrap()),
//! state_stem: false,
//! };
//!

View file

@ -1,10 +1,7 @@
//! Tx-pool [`service`](super) interface.
//!
//! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::collections::{HashMap, HashSet};
use cuprate_types::TransactionVerificationData;
@ -90,7 +87,7 @@ pub enum TxpoolWriteRequest {
/// Returns [`TxpoolWriteResponse::AddTransaction`].
AddTransaction {
/// The tx to add.
tx: Arc<TransactionVerificationData>,
tx: Box<TransactionVerificationData>,
/// A [`bool`] denoting the routing state of this tx.
///
/// [`true`] if this tx is in the stem state.

View file

@ -71,6 +71,13 @@ impl From<RawCachedVerificationState> for CachedVerificationState {
fn from(value: RawCachedVerificationState) -> Self {
// if the hash is all `0`s then there is no hash this is valid at.
if value.raw_valid_at_hash == [0; 32] {
if value.raw_hf != 0 {
return Self::OnlySemantic(
HardFork::from_version(value.raw_hf)
.expect("hard-fork values stored in the DB should always be valid"),
);
}
return Self::NotVerified;
}
@ -103,6 +110,11 @@ impl From<CachedVerificationState> for RawCachedVerificationState {
raw_hf: 0,
raw_valid_past_timestamp: [0; 8],
},
CachedVerificationState::OnlySemantic(hf) => Self {
raw_valid_at_hash: [0; 32],
raw_hf: hf.as_u8(),
raw_valid_past_timestamp: [0; 8],
},
CachedVerificationState::ValidAtHashAndHF { block_hash, hf } => Self {
raw_valid_at_hash: block_hash,
raw_hf: hf.as_u8(),

View file

@ -1,7 +1,5 @@
//! Contains [`TransactionVerificationData`] and the related types.
use std::sync::Mutex;
use monero_serai::transaction::{Timelock, Transaction};
use crate::{HardFork, VerifiedTransactionInformation};
@ -37,6 +35,8 @@ impl TxVersion {
pub enum CachedVerificationState {
/// The transaction has not been validated.
NotVerified,
/// The transaction was only validated semantically.
OnlySemantic(HardFork),
/// The transaction is valid* if the block represented by this hash is in the blockchain and the [`HardFork`]
/// is the same.
///
@ -67,7 +67,7 @@ impl CachedVerificationState {
/// Returns the block hash this is valid for if in state [`CachedVerificationState::ValidAtHashAndHF`] or [`CachedVerificationState::ValidAtHashAndHFWithTimeBasedLock`].
pub const fn verified_at_block_hash(&self) -> Option<[u8; 32]> {
match self {
Self::NotVerified => None,
Self::NotVerified | Self::OnlySemantic(_) => None,
Self::ValidAtHashAndHF { block_hash, .. }
| Self::ValidAtHashAndHFWithTimeBasedLock { block_hash, .. } => Some(*block_hash),
}
@ -75,7 +75,7 @@ impl CachedVerificationState {
}
/// Data needed to verify a transaction.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TransactionVerificationData {
/// The transaction we are verifying
pub tx: Transaction,
@ -90,7 +90,7 @@ pub struct TransactionVerificationData {
/// The hash of this transaction.
pub tx_hash: [u8; 32],
/// The verification state of this transaction.
pub cached_verification_state: Mutex<CachedVerificationState>,
pub cached_verification_state: CachedVerificationState,
}
#[derive(Debug, Copy, Clone, thiserror::Error)]
@ -108,7 +108,7 @@ impl TryFrom<VerifiedTransactionInformation> for TransactionVerificationData {
tx_weight: value.tx_weight,
fee: value.fee,
tx_hash: value.tx_hash,
cached_verification_state: Mutex::new(CachedVerificationState::NotVerified),
cached_verification_state: CachedVerificationState::NotVerified,
})
}
}