diff --git a/Cargo.lock b/Cargo.lock index ebf29b5d..b3553dab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,12 @@ version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.9" @@ -646,6 +652,7 @@ dependencies = [ name = "cuprate-consensus-context" version = "0.1.0" dependencies = [ + "arc-swap", "cuprate-consensus-rules", "cuprate-helper", "cuprate-types", diff --git a/Cargo.toml b/Cargo.toml index 8460f504..86470f65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ cuprate-zmq-types = { path = "zmq/types", default-featur # External dependencies anyhow = { version = "1", default-features = false } +arc-swap = { version = "1", default-features = false } arrayvec = { version = "0.7", default-features = false } async-trait = { version = "0.1", default-features = false } bitflags = { version = "2", default-features = false } @@ -153,7 +154,6 @@ tokio-test = { version = "0.4" } ## TODO: ## Potential dependencies. -# arc-swap = { version = "1.6.0" } # Atomically swappable Arc<T> | https://github.com/vorner/arc-swap # itoa = { version = "1.0.9" } # Fast integer to string formatting | https://github.com/dtolnay/itoa # notify = { version = "6.1.1" } # Filesystem watching | https://github.com/notify-rs/notify # once_cell = { version = "1.18.0" } # Lazy/one-time initialization | https://github.com/matklad/once_cell diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 63bbded3..34317084 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -8,7 +8,7 @@ use tokio::sync::{mpsc, Notify}; use tower::{BoxError, Service, ServiceExt}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig}; +use cuprate_consensus::{generate_genesis_block, BlockchainContextService, ContextConfig}; use cuprate_cryptonight::cryptonight_hash_v0; use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface}; use cuprate_p2p_core::{ClearNet, Network}; @@ -26,9 +26,7 @@ mod syncer; mod types; pub use manager::init_blockchain_manager; -pub use types::{ - ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, -}; +pub use types::ConsensusBlockchainReadHandle; /// Checks if the genesis block is in the blockchain and adds it if not. pub async fn check_add_genesis( @@ -81,22 +79,12 @@ pub async fn check_add_genesis( pub async fn init_consensus( blockchain_read_handle: BlockchainReadHandle, context_config: ContextConfig, -) -> Result< - ( - ConcreteBlockVerifierService, - ConcreteTxVerifierService, - BlockChainContextService, - ), - BoxError, -> { +) -> Result<BlockchainContextService, BoxError> { let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from); let ctx_service = cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone()) .await?; - let (block_verifier_svc, tx_verifier_svc) = - cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone()); - - Ok((block_verifier_svc, tx_verifier_svc, ctx_service)) + Ok(ctx_service) } diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 2166795e..8dde7ac5 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -3,16 +3,14 @@ use std::{collections::HashMap, sync::Arc}; use futures::StreamExt; use monero_serai::block::Block; use tokio::sync::{mpsc, oneshot, Notify}; -use tower::{Service, ServiceExt}; +use tower::{BoxError, Service, ServiceExt}; use tracing::error; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, - BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, - VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, + ExtendedConsensusError, }; -use cuprate_consensus_context::RawBlockChainContext; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig}, BroadcastSvc, NetworkInterface, @@ -26,10 +24,8 @@ use cuprate_types::{ use crate::{ blockchain::{ - chain_service::ChainService, - interface::COMMAND_TX, - syncer, - types::{ConcreteBlockVerifierService, ConsensusBlockchainReadHandle}, + chain_service::ChainService, interface::COMMAND_TX, syncer, + types::ConsensusBlockchainReadHandle, }, constants::PANIC_CRITICAL_SERVICE_ERROR, }; @@ -48,8 +44,7 @@ pub async fn init_blockchain_manager( blockchain_write_handle: BlockchainWriteHandle, blockchain_read_handle: BlockchainReadHandle, txpool_write_handle: TxpoolWriteHandle, - mut blockchain_context_service: BlockChainContextService, - block_verifier_service: ConcreteBlockVerifierService, + mut blockchain_context_service: BlockchainContextService, block_downloader_config: BlockDownloaderConfig, ) { // TODO: find good values for these size limits @@ -68,24 +63,14 @@ pub async fn init_blockchain_manager( block_downloader_config, )); - let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!() - }; - let manager = BlockchainManager { blockchain_write_handle, - blockchain_read_handle, + blockchain_read_handle: ConsensusBlockchainReadHandle::new( + blockchain_read_handle, + BoxError::from, + ), txpool_write_handle, blockchain_context_service, - cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), - block_verifier_service, stop_current_block_downloader, broadcast_svc: clearnet_interface.broadcast_svc(), }; @@ -104,18 +89,12 @@ pub struct BlockchainManager { /// is held. blockchain_write_handle: BlockchainWriteHandle, /// A [`BlockchainReadHandle`]. - blockchain_read_handle: BlockchainReadHandle, + blockchain_read_handle: ConsensusBlockchainReadHandle, /// A [`TxpoolWriteHandle`]. txpool_write_handle: TxpoolWriteHandle, - // TODO: Improve the API of the cache service. - // TODO: rename the cache service -> `BlockchainContextService`. /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve /// values without needing to go to a [`BlockchainReadHandle`]. - blockchain_context_service: BlockChainContextService, - /// A cached context representing the current state. - cached_blockchain_context: RawBlockChainContext, - /// The block verifier service, to verify incoming blocks. - block_verifier_service: ConcreteBlockVerifierService, + blockchain_context_service: BlockchainContextService, /// A [`Notify`] to tell the [syncer](syncer::syncer) that we want to cancel this current download /// attempt. stop_current_block_downloader: Arc<Notify>, diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 382a0ff1..c73d86fb 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -13,9 +13,12 @@ use tracing::{info, instrument}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ - block::PreparedBlock, transactions::new_tx_verification_data, BlockChainContextRequest, - BlockChainContextResponse, BlockVerifierService, ExtendedConsensusError, VerifyBlockRequest, - VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, + block::{ + batch_prepare_main_chain_blocks, sanity_check_alt_block, verify_main_chain_block, + verify_prepped_main_chain_block, PreparedBlock, + }, + transactions::new_tx_verification_data, + BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, }; use cuprate_consensus_context::NewBlockData; use cuprate_helper::cast::usize_to_u64; @@ -83,30 +86,33 @@ impl super::BlockchainManager { block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, ) -> Result<IncomingBlockOk, anyhow::Error> { - if block.header.previous != self.cached_blockchain_context.top_hash { + if block.header.previous + != self + .blockchain_context_service + .blockchain_context() + .top_hash + { self.handle_incoming_alt_block(block, prepared_txs).await?; return Ok(IncomingBlockOk::AddedToAltChain); } - let VerifyBlockResponse::MainChain(verified_block) = self - .block_verifier_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(VerifyBlockRequest::MainChain { - block, - prepared_txs, - }) - .await? - else { - unreachable!(); - }; + let verified_block = verify_main_chain_block( + block, + prepared_txs, + &mut self.blockchain_context_service, + self.blockchain_read_handle.clone(), + ) + .await?; let block_blob = Bytes::copy_from_slice(&verified_block.block_blob); self.add_valid_block_to_main_chain(verified_block).await; - self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height) - .await; + let chain_height = self + .blockchain_context_service + .blockchain_context() + .chain_height; + + self.broadcast_block(block_blob, chain_height).await; Ok(IncomingBlockOk::AddedToMainChain) } @@ -135,7 +141,12 @@ impl super::BlockchainManager { .first() .expect("Block batch should not be empty"); - if first_block.header.previous == self.cached_blockchain_context.top_hash { + if first_block.header.previous + == self + .blockchain_context_service + .blockchain_context() + .top_hash + { self.handle_incoming_block_batch_main_chain(batch).await; } else { self.handle_incoming_block_batch_alt_chain(batch).await; @@ -155,43 +166,27 @@ impl super::BlockchainManager { /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from or if the incoming batch contains no blocks. async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { - let batch_prep_res = self - .block_verifier_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(VerifyBlockRequest::MainChainBatchPrepareBlocks { - blocks: batch.blocks, - }) - .await; - - let prepped_blocks = match batch_prep_res { - Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks, - Err(_) => { - batch.peer_handle.ban_peer(LONG_BAN); - self.stop_current_block_downloader.notify_one(); - return; - } - _ => unreachable!(), + let Ok(prepped_blocks) = + batch_prepare_main_chain_blocks(batch.blocks, &mut self.blockchain_context_service) + .await + else { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; }; for (block, txs) in prepped_blocks { - let verify_res = self - .block_verifier_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(VerifyBlockRequest::MainChainPrepped { block, txs }) - .await; - - let verified_block = match verify_res { - Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block, - Err(_) => { - batch.peer_handle.ban_peer(LONG_BAN); - self.stop_current_block_downloader.notify_one(); - return; - } - _ => unreachable!(), + let Ok(verified_block) = verify_prepped_main_chain_block( + block, + txs, + &mut self.blockchain_context_service, + self.blockchain_read_handle.clone(), + ) + .await + else { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_one(); + return; }; self.add_valid_block_to_main_chain(verified_block).await; @@ -272,25 +267,18 @@ impl super::BlockchainManager { block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, ) -> Result<AddAltBlock, anyhow::Error> { - let VerifyBlockResponse::AltChain(alt_block_info) = self - .block_verifier_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(VerifyBlockRequest::AltChain { - block, - prepared_txs, - }) - .await? - else { - unreachable!(); - }; + let alt_block_info = + sanity_check_alt_block(block, prepared_txs, self.blockchain_context_service.clone()) + .await?; // TODO: check in consensus crate if alt block with this hash already exists. - // If this alt chain + // If this alt chain has more cumulative difficulty, reorg. if alt_block_info.cumulative_difficulty - > self.cached_blockchain_context.cumulative_difficulty + > self + .blockchain_context_service + .blockchain_context() + .cumulative_difficulty { self.try_do_reorg(alt_block_info).await?; return Ok(AddAltBlock::Reorged); @@ -335,7 +323,8 @@ impl super::BlockchainManager { .call(BlockchainReadRequest::AltBlocksInChain( top_alt_block.chain_id, )) - .await? + .await + .map_err(|e| anyhow::anyhow!(e))? else { unreachable!(); }; @@ -343,7 +332,10 @@ impl super::BlockchainManager { alt_blocks.push(top_alt_block); let split_height = alt_blocks[0].height; - let current_main_chain_height = self.cached_blockchain_context.chain_height; + let current_main_chain_height = self + .blockchain_context_service + .blockchain_context() + .chain_height; let BlockchainResponse::PopBlocks(old_main_chain_id) = self .blockchain_write_handle @@ -402,24 +394,18 @@ impl super::BlockchainManager { let prepped_txs = alt_block .txs .drain(..) - .map(|tx| Ok(Arc::new(tx.try_into()?))) + .map(|tx| Ok(tx.try_into()?)) .collect::<Result<_, anyhow::Error>>()?; let prepped_block = PreparedBlock::new_alt_block(alt_block)?; - let VerifyBlockResponse::MainChain(verified_block) = self - .block_verifier_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(VerifyBlockRequest::MainChainPrepped { - block: prepped_block, - txs: prepped_txs, - }) - .await? - else { - unreachable!(); - }; + let verified_block = verify_prepped_main_chain_block( + prepped_block, + prepped_txs, + &mut self.blockchain_context_service, + self.blockchain_read_handle.clone(), + ) + .await?; self.add_valid_block_to_main_chain(verified_block).await; } @@ -429,8 +415,7 @@ impl super::BlockchainManager { /// Adds a [`VerifiedBlockInformation`] to the main-chain. /// - /// This function will update the blockchain database and the context cache, it will also - /// update [`Self::cached_blockchain_context`]. + /// This function will update the blockchain database and the context cache. /// /// # Panics /// @@ -477,20 +462,6 @@ impl super::BlockchainManager { .await .expect(PANIC_CRITICAL_SERVICE_ERROR); - let BlockChainContextResponse::Context(blockchain_context) = self - .blockchain_context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!(); - }; - - self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone(); - self.txpool_write_handle .ready() .await diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 7733ca3d..1da5812d 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -9,8 +9,8 @@ use tokio::{ use tower::{Service, ServiceExt}; use tracing::instrument; -use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; -use cuprate_consensus_context::RawBlockChainContext; +use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext}; +use cuprate_consensus_context::BlockchainContextService; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, NetworkInterface, PeerSetRequest, PeerSetResponse, @@ -30,8 +30,8 @@ pub enum SyncerError { /// The syncer tasks that makes sure we are fully synchronised with our connected peers. #[instrument(level = "debug", skip_all)] -pub async fn syncer<C, CN>( - mut context_svc: C, +pub async fn syncer<CN>( + mut context_svc: BlockchainContextService, our_chain: CN, mut clearnet_interface: NetworkInterface<ClearNet>, incoming_block_batch_tx: mpsc::Sender<BlockBatch>, @@ -39,12 +39,6 @@ pub async fn syncer<C, CN>( block_downloader_config: BlockDownloaderConfig, ) -> Result<(), SyncerError> where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - >, - C::Future: Send + 'static, CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError> + Clone + Send @@ -55,15 +49,6 @@ where let mut check_sync_interval = interval(CHECK_SYNC_FREQUENCY); - let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - unreachable!(); - }; - tracing::debug!("Waiting for new sync info in top sync channel"); loop { @@ -71,10 +56,9 @@ where tracing::trace!("Checking connected peers to see if we are behind",); - check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; - let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + let blockchain_context = context_svc.blockchain_context(); - if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? { + if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? { continue; } @@ -92,10 +76,9 @@ where } batch = block_batch_stream.next() => { let Some(batch) = batch else { - check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; - let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + let blockchain_context = context_svc.blockchain_context(); - if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? { + if !check_behind_peers(blockchain_context, &mut clearnet_interface).await? { tracing::info!("Synchronised with the network."); } @@ -114,7 +97,7 @@ where /// Returns `true` if we are behind the current connected network peers. async fn check_behind_peers( - raw_blockchain_context: &RawBlockChainContext, + blockchain_context: &BlockchainContext, mut clearnet_interface: &mut NetworkInterface<ClearNet>, ) -> Result<bool, tower::BoxError> { let PeerSetResponse::MostPoWSeen { @@ -130,38 +113,9 @@ async fn check_behind_peers( unreachable!(); }; - if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { + if cumulative_difficulty <= blockchain_context.cumulative_difficulty { return Ok(false); } Ok(true) } - -/// Checks if we should update the given [`BlockChainContext`] and updates it if needed. -async fn check_update_blockchain_context<C>( - context_svc: C, - old_context: &mut BlockChainContext, -) -> Result<(), tower::BoxError> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - >, - C::Future: Send + 'static, -{ - if old_context.blockchain_context().is_ok() { - return Ok(()); - } - - let BlockChainContextResponse::Context(ctx) = context_svc - .oneshot(BlockChainContextRequest::Context) - .await? - else { - unreachable!(); - }; - - *old_context = ctx; - - Ok(()) -} diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs index 54e46621..6bad0fdc 100644 --- a/binaries/cuprated/src/blockchain/types.rs +++ b/binaries/cuprated/src/blockchain/types.rs @@ -1,17 +1,6 @@ use tower::util::MapErr; use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle}; -use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService}; - -/// The [`BlockVerifierService`] with all generic types defined. -pub type ConcreteBlockVerifierService = BlockVerifierService< - BlockChainContextService, - ConcreteTxVerifierService, - ConsensusBlockchainReadHandle, ->; - -/// The [`TxVerifierService`] with all generic types defined. -pub type ConcreteTxVerifierService = TxVerifierService<ConsensusBlockchainReadHandle>; /// The [`BlockchainReadHandle`] with the [`tower::Service::Error`] mapped to conform to what the consensus crate requires. pub type ConsensusBlockchainReadHandle = diff --git a/binaries/cuprated/src/commands.rs b/binaries/cuprated/src/commands.rs index 5958036c..e99282be 100644 --- a/binaries/cuprated/src/commands.rs +++ b/binaries/cuprated/src/commands.rs @@ -9,7 +9,7 @@ use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; use cuprate_consensus_context::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, }; use cuprate_helper::time::secs_to_hms; @@ -87,7 +87,7 @@ pub fn command_listener(incoming_commands: mpsc::Sender<Command>) -> ! { /// The [`Command`] handler loop. pub async fn io_loop( mut incoming_commands: mpsc::Receiver<Command>, - mut context_service: BlockChainContextService, + mut context_service: BlockchainContextService, ) { loop { let Some(command) = incoming_commands.recv().await else { @@ -113,17 +113,8 @@ pub async fn io_loop( } } Command::Status => { - let BlockChainContextResponse::Context(blockchain_context) = context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!(); - }; - let context = blockchain_context.unchecked_blockchain_context(); + let context = context_service.blockchain_context(); + let uptime = statics::START_INSTANT.elapsed().unwrap_or_default(); let (h, m, s) = secs_to_hms(uptime.as_secs()); diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 69b1bc95..21eb07d5 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -24,7 +24,7 @@ use tracing::level_filters::LevelFilter; use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry}; use cuprate_consensus_context::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, }; use cuprate_helper::time::secs_to_hms; @@ -90,7 +90,7 @@ fn main() { .await; // Start the context service and the block/tx verifier. - let (block_verifier, tx_verifier, context_svc) = + let context_svc = blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) .await .unwrap(); @@ -111,7 +111,7 @@ fn main() { txpool_write_handle.clone(), txpool_read_handle, context_svc.clone(), - tx_verifier, + blockchain_read_handle.clone(), ); if incoming_tx_handler_tx.send(tx_handler).is_err() { unreachable!() @@ -124,7 +124,6 @@ fn main() { blockchain_read_handle, txpool_write_handle, context_svc.clone(), - block_verifier, config.block_downloader_config(), ) .await; diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index aeb98b60..c5afcd50 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -6,7 +6,7 @@ use tokio::sync::oneshot; use tower::ServiceExt; use cuprate_blockchain::service::BlockchainReadHandle; -use cuprate_consensus::BlockChainContextService; +use cuprate_consensus::BlockchainContextService; use cuprate_p2p::{NetworkInterface, P2PConfig}; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::TxpoolReadHandle; @@ -25,7 +25,7 @@ pub use network_address::CrossNetworkInternalPeerId; /// handshakes can not be completed. pub async fn start_clearnet_p2p( blockchain_read_handle: BlockchainReadHandle, - blockchain_context_service: BlockChainContextService, + blockchain_context_service: BlockchainContextService, txpool_read_handle: TxpoolReadHandle, config: P2PConfig<ClearNet>, ) -> Result< diff --git a/binaries/cuprated/src/p2p/core_sync_service.rs b/binaries/cuprated/src/p2p/core_sync_service.rs index d3c3ca1e..55975d52 100644 --- a/binaries/cuprated/src/p2p/core_sync_service.rs +++ b/binaries/cuprated/src/p2p/core_sync_service.rs @@ -1,10 +1,13 @@ -use std::task::{Context, Poll}; +use std::{ + future::{ready, Ready}, + task::{Context, Poll}, +}; use futures::{future::BoxFuture, FutureExt, TryFutureExt}; use tower::Service; use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, }; use cuprate_helper::{cast::usize_to_u64, map::split_u128_into_low_high_bits}; use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse}; @@ -12,38 +15,30 @@ use cuprate_wire::CoreSyncData; /// The core sync service. #[derive(Clone)] -pub struct CoreSyncService(pub BlockChainContextService); +pub struct CoreSyncService(pub BlockchainContextService); impl Service<CoreSyncDataRequest> for CoreSyncService { type Response = CoreSyncDataResponse; type Error = tower::BoxError; - type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; + type Future = Ready<Result<Self::Response, Self::Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.0.poll_ready(cx) + Poll::Ready(Ok(())) } fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { - self.0 - .call(BlockChainContextRequest::Context) - .map_ok(|res| { - let BlockChainContextResponse::Context(context) = res else { - unreachable!() - }; + let context = self.0.blockchain_context(); - let context = context.unchecked_blockchain_context(); - let (cumulative_difficulty, cumulative_difficulty_top64) = - split_u128_into_low_high_bits(context.cumulative_difficulty); + let (cumulative_difficulty, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(context.cumulative_difficulty); - CoreSyncDataResponse(CoreSyncData { - cumulative_difficulty, - cumulative_difficulty_top64, - current_height: usize_to_u64(context.chain_height), - pruning_seed: 0, - top_id: context.top_hash, - top_version: context.current_hf.as_u8(), - }) - }) - .boxed() + ready(Ok(CoreSyncDataResponse(CoreSyncData { + cumulative_difficulty, + cumulative_difficulty_top64, + current_height: usize_to_u64(context.chain_height), + pruning_seed: 0, + top_id: context.top_hash, + top_version: context.current_hf.as_u8(), + }))) } } diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 620c9df3..b7c9a930 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -18,7 +18,7 @@ use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::{ transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, + BlockchainContextService, }; use cuprate_dandelion_tower::TxState; use cuprate_fixed_bytes::ByteArrayVec; @@ -56,7 +56,7 @@ use crate::{ #[derive(Clone)] pub struct P2pProtocolRequestHandlerMaker { pub blockchain_read_handle: BlockchainReadHandle, - pub blockchain_context_service: BlockChainContextService, + pub blockchain_context_service: BlockchainContextService, pub txpool_read_handle: TxpoolReadHandle, /// The [`IncomingTxHandler`], wrapped in an [`Option`] as there is a cyclic reference between [`P2pProtocolRequestHandlerMaker`] @@ -114,7 +114,7 @@ where pub struct P2pProtocolRequestHandler<N: NetZoneAddress> { peer_information: PeerInformation<N>, blockchain_read_handle: BlockchainReadHandle, - blockchain_context_service: BlockChainContextService, + blockchain_context_service: BlockchainContextService, txpool_read_handle: TxpoolReadHandle, incoming_tx_handler: IncomingTxHandler, } @@ -366,25 +366,14 @@ async fn new_fluffy_block<A: NetZoneAddress>( async fn new_transactions<A>( peer_information: PeerInformation<A>, request: NewTransactions, - mut blockchain_context_service: BlockChainContextService, + mut blockchain_context_service: BlockchainContextService, mut incoming_tx_handler: IncomingTxHandler, ) -> anyhow::Result<ProtocolResponse> where A: NetZoneAddress, InternalPeerID<A>: Into<CrossNetworkInternalPeerId>, { - let BlockChainContextResponse::Context(context) = blockchain_context_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!() - }; - - let context = context.unchecked_blockchain_context(); + let context = blockchain_context_service.blockchain_context(); // If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing. if usize_to_u64(context.chain_height + 2) diff --git a/binaries/cuprated/src/rpc/handler.rs b/binaries/cuprated/src/rpc/handler.rs index 1f73403b..c16248d9 100644 --- a/binaries/cuprated/src/rpc/handler.rs +++ b/binaries/cuprated/src/rpc/handler.rs @@ -8,7 +8,7 @@ use monero_serai::block::Block; use tower::Service; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::BlockChainContextService; +use cuprate_consensus::BlockchainContextService; use cuprate_pruning::PruningSeed; use cuprate_rpc_interface::RpcHandler; use cuprate_rpc_types::{ @@ -148,7 +148,7 @@ pub struct CupratedRpcHandler { pub blockchain_read: BlockchainReadHandle, /// Handle to the blockchain context service. - pub blockchain_context: BlockChainContextService, + pub blockchain_context: BlockchainContextService, /// Handle to the blockchain manager. pub blockchain_manager: BlockchainManagerHandle, @@ -165,7 +165,7 @@ impl CupratedRpcHandler { pub const fn new( restricted: bool, blockchain_read: BlockchainReadHandle, - blockchain_context: BlockChainContextService, + blockchain_context: BlockchainContextService, blockchain_manager: BlockchainManagerHandle, txpool_read: TxpoolReadHandle, txpool_manager: std::convert::Infallible, diff --git a/binaries/cuprated/src/rpc/request/blockchain_context.rs b/binaries/cuprated/src/rpc/request/blockchain_context.rs index c6f0f225..c34019fd 100644 --- a/binaries/cuprated/src/rpc/request/blockchain_context.rs +++ b/binaries/cuprated/src/rpc/request/blockchain_context.rs @@ -7,35 +7,26 @@ use monero_serai::block::Block; use tower::{Service, ServiceExt}; use cuprate_consensus_context::{ - BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContext, + BlockchainContextService, }; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{FeeEstimate, HardFork, HardForkInfo}; // FIXME: use `anyhow::Error` over `tower::BoxError` in blockchain context. -/// [`BlockChainContextRequest::Context`]. pub(crate) async fn context( - blockchain_context: &mut BlockChainContextService, -) -> Result<BlockChainContext, Error> { - let BlockChainContextResponse::Context(context) = blockchain_context - .ready() - .await - .map_err(|e| anyhow!(e))? - .call(BlockChainContextRequest::Context) - .await - .map_err(|e| anyhow!(e))? - else { - unreachable!(); - }; + blockchain_context: &mut BlockchainContextService, +) -> Result<BlockchainContext, Error> { + // TODO: Remove this whole function just call directly in all usages. + let context = blockchain_context.blockchain_context().clone(); Ok(context) } /// [`BlockChainContextRequest::HardForkInfo`]. pub(crate) async fn hard_fork_info( - blockchain_context: &mut BlockChainContextService, + blockchain_context: &mut BlockchainContextService, hard_fork: HardFork, ) -> Result<HardForkInfo, Error> { let BlockChainContextResponse::HardForkInfo(hf_info) = blockchain_context @@ -54,7 +45,7 @@ pub(crate) async fn hard_fork_info( /// [`BlockChainContextRequest::FeeEstimate`]. pub(crate) async fn fee_estimate( - blockchain_context: &mut BlockChainContextService, + blockchain_context: &mut BlockchainContextService, grace_blocks: u64, ) -> Result<FeeEstimate, Error> { let BlockChainContextResponse::FeeEstimate(fee) = blockchain_context @@ -73,7 +64,7 @@ pub(crate) async fn fee_estimate( /// [`BlockChainContextRequest::CalculatePow`] pub(crate) async fn calculate_pow( - blockchain_context: &mut BlockChainContextService, + blockchain_context: &mut BlockchainContextService, hardfork: HardFork, height: u64, block: Box<Block>, diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 20769567..53fd733f 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -1,13 +1,11 @@ //! Transaction Pool //! //! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool. -use cuprate_consensus::BlockChainContextService; +use cuprate_consensus::BlockchainContextService; use cuprate_p2p::NetworkInterface; use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; -use crate::blockchain::ConcreteTxVerifierService; - mod dandelion; mod incoming_tx; mod txs_being_handled; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index bf7684e4..93d47f19 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -7,11 +7,13 @@ use std::{ use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use monero_serai::transaction::Transaction; -use tower::{Service, ServiceExt}; +use tower::{BoxError, Service, ServiceExt}; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::transactions::{start_tx_verification, PrepTransactions}; use cuprate_consensus::{ transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, ExtendedConsensusError, VerifyTxRequest, + BlockchainContextService, ExtendedConsensusError, }; use cuprate_dandelion_tower::{ pool::{DandelionPoolService, IncomingTxBuilder}, @@ -32,7 +34,7 @@ use cuprate_txpool::{ use cuprate_types::TransactionVerificationData; use crate::{ - blockchain::ConcreteTxVerifierService, + blockchain::ConsensusBlockchainReadHandle, constants::PANIC_CRITICAL_SERVICE_ERROR, p2p::CrossNetworkInternalPeerId, signals::REORG_LOCK, @@ -76,16 +78,16 @@ pub struct IncomingTxHandler { /// A store of txs currently being handled in incoming tx requests. pub(super) txs_being_handled: TxsBeingHandled, /// The blockchain context cache. - pub(super) blockchain_context_cache: BlockChainContextService, + pub(super) blockchain_context_cache: BlockchainContextService, /// The dandelion txpool manager. pub(super) dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>, - /// The transaction verifier service. - pub(super) tx_verifier_service: ConcreteTxVerifierService, /// The txpool write handle. pub(super) txpool_write_handle: TxpoolWriteHandle, /// The txpool read handle. pub(super) txpool_read_handle: TxpoolReadHandle, + /// The blockchain read handle. + pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle, } impl IncomingTxHandler { @@ -95,8 +97,8 @@ impl IncomingTxHandler { clear_net: NetworkInterface<ClearNet>, txpool_write_handle: TxpoolWriteHandle, txpool_read_handle: TxpoolReadHandle, - blockchain_context_cache: BlockChainContextService, - tx_verifier_service: ConcreteTxVerifierService, + blockchain_context_cache: BlockchainContextService, + blockchain_read_handle: BlockchainReadHandle, ) -> Self { let dandelion_router = dandelion::dandelion_router(clear_net); @@ -110,9 +112,12 @@ impl IncomingTxHandler { txs_being_handled: TxsBeingHandled::new(), blockchain_context_cache, dandelion_pool_manager, - tx_verifier_service, txpool_write_handle, txpool_read_handle, + blockchain_read_handle: ConsensusBlockchainReadHandle::new( + blockchain_read_handle, + BoxError::from, + ), } } } @@ -131,7 +136,7 @@ impl Service<IncomingTxs> for IncomingTxHandler { req, self.txs_being_handled.clone(), self.blockchain_context_cache.clone(), - self.tx_verifier_service.clone(), + self.blockchain_read_handle.clone(), self.txpool_write_handle.clone(), self.txpool_read_handle.clone(), self.dandelion_pool_manager.clone(), @@ -144,8 +149,8 @@ impl Service<IncomingTxs> for IncomingTxHandler { async fn handle_incoming_txs( IncomingTxs { txs, state }: IncomingTxs, txs_being_handled: TxsBeingHandled, - mut blockchain_context_cache: BlockChainContextService, - mut tx_verifier_service: ConcreteTxVerifierService, + mut blockchain_context_cache: BlockchainContextService, + blockchain_read_handle: ConsensusBlockchainReadHandle, mut txpool_write_handle: TxpoolWriteHandle, mut txpool_read_handle: TxpoolReadHandle, mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>, @@ -155,30 +160,20 @@ async fn handle_incoming_txs( let (txs, stem_pool_txs, txs_being_handled_guard) = prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; - let BlockChainContextResponse::Context(context) = blockchain_context_cache - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockChainContextRequest::Context) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!() - }; + let context = blockchain_context_cache.blockchain_context(); - let context = context.unchecked_blockchain_context(); - - tx_verifier_service - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(VerifyTxRequest::Prepped { - txs: txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) + let txs = start_tx_verification() + .append_prepped_txs(txs) + .prepare() + .map_err(|e| IncomingTxError::Consensus(e.into()))? + .full( + context.chain_height, + context.top_hash, + context.current_adjusted_timestamp_for_time_lock(), + context.current_hf, + blockchain_read_handle, + ) + .verify() .await .map_err(IncomingTxError::Consensus)?; @@ -220,7 +215,7 @@ async fn prepare_incoming_txs( txpool_read_handle: &mut TxpoolReadHandle, ) -> Result< ( - Vec<Arc<TransactionVerificationData>>, + Vec<TransactionVerificationData>, Vec<TxId>, TxsBeingHandledLocally, ), @@ -282,7 +277,7 @@ async fn prepare_incoming_txs( let tx = new_tx_verification_data(tx) .map_err(|e| IncomingTxError::Consensus(e.into()))?; - Ok(Arc::new(tx)) + Ok(tx) }) .collect::<Result<Vec<_>, IncomingTxError>>()?; @@ -295,7 +290,7 @@ async fn prepare_incoming_txs( /// /// This will add the tx to the txpool and route it to the network. async fn handle_valid_tx( - tx: Arc<TransactionVerificationData>, + tx: TransactionVerificationData, state: TxState<CrossNetworkInternalPeerId>, txpool_write_handle: &mut TxpoolWriteHandle, dandelion_pool_manager: &mut DandelionPoolService< @@ -312,7 +307,7 @@ async fn handle_valid_tx( .await .expect(PANIC_CRITICAL_SERVICE_ERROR) .call(TxpoolWriteRequest::AddTransaction { - tx, + tx: Box::new(tx), state_stem: state.is_stem_stage(), }) .await diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 8b732a07..076e0ad5 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -17,7 +17,6 @@ cfg-if = { workspace = true } thiserror = { workspace = true } tower = { workspace = true, features = ["util"] } tracing = { workspace = true, features = ["std", "attributes"] } -futures = { workspace = true, features = ["std", "async-await"] } monero-serai = { workspace = true, features = ["std"] } @@ -35,6 +34,7 @@ hex-literal = { workspace = true } curve25519-dalek = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} +futures = { workspace = true, features = ["std", "async-await"] } tokio-test = { workspace = true } proptest = { workspace = true } proptest-derive = { workspace = true } diff --git a/consensus/context/Cargo.toml b/consensus/context/Cargo.toml index 76790464..fb5d8a8a 100644 --- a/consensus/context/Cargo.toml +++ b/consensus/context/Cargo.toml @@ -10,6 +10,7 @@ cuprate-consensus-rules = { workspace = true, features = ["proptest"]} cuprate-helper = { workspace = true, default-features = false, features = ["std", "cast", "num", "asynch"] } cuprate-types = { workspace = true, default-features = false, features = ["blockchain"] } +arc-swap = { workspace = true } futures = { workspace = true, features = ["std", "async-await"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} tokio-util = { workspace = true } diff --git a/consensus/context/src/lib.rs b/consensus/context/src/lib.rs index acc4d23d..adb33383 100644 --- a/consensus/context/src/lib.rs +++ b/consensus/context/src/lib.rs @@ -1,6 +1,6 @@ //! # Blockchain Context //! -//! This crate contains a service to get cached context from the blockchain: [`BlockChainContext`]. +//! This crate contains a service to get cached context from the blockchain: [`BlockchainContext`]. //! This is used during contextual validation, this does not have all the data for contextual validation //! (outputs) for that you will need a [`Database`]. @@ -17,6 +17,7 @@ use std::{ task::{Context, Poll}, }; +use arc_swap::Cache; use futures::{channel::oneshot, FutureExt}; use monero_serai::block::Block; use tokio::sync::mpsc; @@ -34,7 +35,6 @@ pub mod weight; mod alt_chains; mod task; -mod tokens; use cuprate_types::{Chain, ChainInfo, FeeEstimate, HardForkInfo}; use difficulty::DifficultyCache; @@ -44,7 +44,6 @@ use weight::BlockWeightsCache; pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache}; pub use difficulty::DifficultyCacheConfig; pub use hardforks::HardForkConfig; -pub use tokens::*; pub use weight::BlockWeightsCacheConfig; pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60; @@ -96,27 +95,29 @@ impl ContextConfig { pub async fn initialize_blockchain_context<D>( cfg: ContextConfig, database: D, -) -> Result<BlockChainContextService, ContextCacheError> +) -> Result<BlockchainContextService, ContextCacheError> where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, { - let context_task = task::ContextTask::init_context(cfg, database).await?; + let (context_task, context_cache) = task::ContextTask::init_context(cfg, database).await?; // TODO: make buffer size configurable. let (tx, rx) = mpsc::channel(15); tokio::spawn(context_task.run(rx)); - Ok(BlockChainContextService { + Ok(BlockchainContextService { + cached_context: Cache::new(context_cache), + channel: PollSender::new(tx), }) } -/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep -/// around. You should keep around [`BlockChainContext`] instead. -#[derive(Debug, Clone)] -pub struct RawBlockChainContext { +/// Raw blockchain context, gotten from [`BlockchainContext`]. This data may turn invalid so is not ok to keep +/// around. You should keep around [`BlockchainContext`] instead. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct BlockchainContext { /// The current cumulative difficulty. pub cumulative_difficulty: u128, /// Context to verify a block, as needed by [`cuprate-consensus-rules`] @@ -127,14 +128,14 @@ pub struct RawBlockChainContext { top_block_timestamp: Option<u64>, } -impl std::ops::Deref for RawBlockChainContext { +impl std::ops::Deref for BlockchainContext { type Target = ContextToVerifyBlock; fn deref(&self) -> &Self::Target { &self.context_to_verify_block } } -impl RawBlockChainContext { +impl BlockchainContext { /// Returns the timestamp the should be used when checking locked outputs. /// /// ref: <https://cuprate.github.io/monero-book/consensus_rules/transactions/unlock_time.html#getting-the-current-time> @@ -167,40 +168,6 @@ impl RawBlockChainContext { } } -/// Blockchain context which keeps a token of validity so users will know when the data is no longer valid. -#[derive(Debug, Clone)] -pub struct BlockChainContext { - /// A token representing this data's validity. - validity_token: ValidityToken, - /// The actual block chain context. - raw: RawBlockChainContext, -} - -#[derive(Debug, Clone, Copy, thiserror::Error)] -#[error("data is no longer valid")] -pub struct DataNoLongerValid; - -impl BlockChainContext { - /// Checks if the data is still valid. - pub fn is_still_valid(&self) -> bool { - self.validity_token.is_data_valid() - } - - /// Checks if the data is valid returning an Err if not and a reference to the blockchain context if - /// it is. - pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> { - if !self.is_still_valid() { - return Err(DataNoLongerValid); - } - Ok(&self.raw) - } - - /// Returns the blockchain context without checking the validity token. - pub const fn unchecked_blockchain_context(&self) -> &RawBlockChainContext { - &self.raw - } -} - /// Data needed from a new block to add it to the context cache. #[derive(Debug, Clone)] pub struct NewBlockData { @@ -225,9 +192,6 @@ pub struct NewBlockData { /// A request to the blockchain context cache. #[derive(Debug, Clone)] pub enum BlockChainContextRequest { - /// Get the current blockchain context. - Context, - /// Gets all the current `RandomX` VMs. CurrentRxVms, @@ -363,9 +327,6 @@ pub enum BlockChainContextResponse { /// - [`BlockChainContextRequest::AddAltChainContextCache`] Ok, - /// Response to [`BlockChainContextRequest::Context`] - Context(BlockChainContext), - /// Response to [`BlockChainContextRequest::CurrentRxVms`] /// /// A map of seed height to `RandomX` VMs. @@ -403,11 +364,20 @@ pub enum BlockChainContextResponse { /// The blockchain context service. #[derive(Clone)] -pub struct BlockChainContextService { +pub struct BlockchainContextService { + cached_context: Cache<Arc<arc_swap::ArcSwap<BlockchainContext>>, Arc<BlockchainContext>>, + channel: PollSender<task::ContextTaskRequest>, } -impl Service<BlockChainContextRequest> for BlockChainContextService { +impl BlockchainContextService { + /// Get the current [`BlockchainContext`] from the cache. + pub fn blockchain_context(&mut self) -> &BlockchainContext { + self.cached_context.load() + } +} + +impl Service<BlockChainContextRequest> for BlockchainContextService { type Response = BlockChainContextResponse; type Error = tower::BoxError; type Future = diff --git a/consensus/context/src/task.rs b/consensus/context/src/task.rs index b0759952..327a3e06 100644 --- a/consensus/context/src/task.rs +++ b/consensus/context/src/task.rs @@ -3,6 +3,9 @@ //! This module contains the async task that handles keeping track of blockchain context. //! It holds all the context caches and handles [`tower::Service`] requests. //! +use std::sync::Arc; + +use arc_swap::ArcSwap; use futures::channel::oneshot; use tokio::sync::mpsc; use tower::ServiceExt; @@ -12,14 +15,17 @@ use cuprate_consensus_rules::blocks::ContextToVerifyBlock; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, + Chain, HardFork, }; use crate::{ alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap}, - difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest, - BlockChainContextResponse, ContextCacheError, ContextConfig, Database, RawBlockChainContext, - ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, + difficulty::DifficultyCache, + hardforks::HardForkState, + rx_vms, + weight::BlockWeightsCache, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContext, ContextCacheError, + ContextConfig, Database, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, }; /// A request from the context service to the context task. @@ -34,18 +40,16 @@ pub(super) struct ContextTaskRequest { /// The Context task that keeps the blockchain context and handles requests. pub(crate) struct ContextTask<D: Database> { - /// A token used to invalidate previous contexts when a new - /// block is added to the chain. - current_validity_token: ValidityToken, + context_cache: Arc<ArcSwap<BlockchainContext>>, /// The difficulty cache. - difficulty_cache: difficulty::DifficultyCache, + difficulty_cache: DifficultyCache, /// The weight cache. - weight_cache: weight::BlockWeightsCache, + weight_cache: BlockWeightsCache, /// The RX VM cache. rx_vm_cache: rx_vms::RandomXVmCache, /// The hard-fork state cache. - hardfork_state: hardforks::HardForkState, + hardfork_state: HardForkState, alt_chain_cache_map: AltChainMap, @@ -65,7 +69,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { pub(crate) async fn init_context( cfg: ContextConfig, mut database: D, - ) -> Result<Self, ContextCacheError> { + ) -> Result<(Self, Arc<ArcSwap<BlockchainContext>>), ContextCacheError> { let ContextConfig { difficulty_cfg, weights_config, @@ -94,29 +98,19 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { let db = database.clone(); let hardfork_state_handle = tokio::spawn(async move { - hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await + HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await }); let db = database.clone(); let difficulty_cache_handle = tokio::spawn(async move { - difficulty::DifficultyCache::init_from_chain_height( - chain_height, - difficulty_cfg, - db, - Chain::Main, - ) - .await + DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db, Chain::Main) + .await }); let db = database.clone(); let weight_cache_handle = tokio::spawn(async move { - weight::BlockWeightsCache::init_from_chain_height( - chain_height, - weights_config, - db, - Chain::Main, - ) - .await + BlockWeightsCache::init_from_chain_height(chain_height, weights_config, db, Chain::Main) + .await }); // Wait for the hardfork state to finish first as we need it to start the randomX VM cache. @@ -128,10 +122,24 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { rx_vms::RandomXVmCache::init_from_chain_height(chain_height, ¤t_hf, db).await }); + let difficulty_cache = difficulty_cache_handle.await.unwrap()?; + let weight_cache = weight_cache_handle.await.unwrap()?; + + let blockchain_context = blockchain_context( + &weight_cache, + &difficulty_cache, + current_hf, + top_block_hash, + chain_height, + already_generated_coins, + ); + + let context_cache = Arc::new(ArcSwap::from_pointee(blockchain_context)); + let context_svc = Self { - current_validity_token: ValidityToken::new(), - difficulty_cache: difficulty_cache_handle.await.unwrap()?, - weight_cache: weight_cache_handle.await.unwrap()?, + context_cache: Arc::clone(&context_cache), + difficulty_cache, + weight_cache, rx_vm_cache: rx_seed_handle.await.unwrap()?, hardfork_state, alt_chain_cache_map: AltChainMap::new(), @@ -141,7 +149,20 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { database, }; - Ok(context_svc) + Ok((context_svc, context_cache)) + } + + fn update_blockchain_context(&self) { + let context = blockchain_context( + &self.weight_cache, + &self.difficulty_cache, + self.hardfork_state.current_hardfork(), + self.top_block_hash, + self.chain_height, + self.already_generated_coins, + ); + + self.context_cache.store(Arc::new(context)); } /// Handles a [`BlockChainContextRequest`] and returns a [`BlockChainContextResponse`]. @@ -150,36 +171,6 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { req: BlockChainContextRequest, ) -> Result<BlockChainContextResponse, tower::BoxError> { Ok(match req { - BlockChainContextRequest::Context => { - tracing::debug!("Getting blockchain context"); - - let current_hf = self.hardfork_state.current_hardfork(); - - BlockChainContextResponse::Context(BlockChainContext { - validity_token: self.current_validity_token.clone(), - raw: RawBlockChainContext { - context_to_verify_block: ContextToVerifyBlock { - median_weight_for_block_reward: self - .weight_cache - .median_for_block_reward(current_hf), - effective_median_weight: self - .weight_cache - .effective_median_block_weight(current_hf), - top_hash: self.top_block_hash, - median_block_timestamp: self - .difficulty_cache - .median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)), - chain_height: self.chain_height, - current_hf, - next_difficulty: self.difficulty_cache.next_difficulty(current_hf), - already_generated_coins: self.already_generated_coins, - }, - cumulative_difficulty: self.difficulty_cache.cumulative_difficulty(), - median_long_term_weight: self.weight_cache.median_long_term_weight(), - top_block_timestamp: self.difficulty_cache.top_block_timestamp(), - }, - }) - } BlockChainContextRequest::CurrentRxVms => { BlockChainContextResponse::RxVms(self.rx_vm_cache.get_vms().await) } @@ -202,9 +193,6 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { "Updating blockchain cache with new block, height: {}", new.height ); - // Cancel the validity token and replace it with a new one. - std::mem::replace(&mut self.current_validity_token, ValidityToken::new()) - .set_data_invalid(); self.difficulty_cache.new_block( new.height, @@ -225,6 +213,8 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { .already_generated_coins .saturating_add(new.generated_coins); + self.update_blockchain_context(); + BlockChainContextResponse::Ok } BlockChainContextRequest::PopBlocks { numb_blocks } => { @@ -272,8 +262,7 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { self.already_generated_coins = already_generated_coins; self.top_block_hash = top_block_hash; - std::mem::replace(&mut self.current_validity_token, ValidityToken::new()) - .set_data_invalid(); + self.update_blockchain_context(); BlockChainContextResponse::Ok } @@ -342,3 +331,30 @@ impl<D: Database + Clone + Send + 'static> ContextTask<D> { tracing::info!("Shutting down blockchain context task."); } } + +fn blockchain_context( + weight_cache: &BlockWeightsCache, + difficulty_cache: &DifficultyCache, + + current_hf: HardFork, + top_hash: [u8; 32], + chain_height: usize, + already_generated_coins: u64, +) -> BlockchainContext { + BlockchainContext { + context_to_verify_block: ContextToVerifyBlock { + median_weight_for_block_reward: weight_cache.median_for_block_reward(current_hf), + effective_median_weight: weight_cache.effective_median_block_weight(current_hf), + top_hash, + median_block_timestamp: difficulty_cache + .median_timestamp(u64_to_usize(BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW)), + chain_height, + current_hf, + next_difficulty: difficulty_cache.next_difficulty(current_hf), + already_generated_coins, + }, + cumulative_difficulty: difficulty_cache.cumulative_difficulty(), + median_long_term_weight: weight_cache.median_long_term_weight(), + top_block_timestamp: difficulty_cache.top_block_timestamp(), + } +} diff --git a/consensus/context/src/tokens.rs b/consensus/context/src/tokens.rs deleted file mode 100644 index d2223039..00000000 --- a/consensus/context/src/tokens.rs +++ /dev/null @@ -1,33 +0,0 @@ -//! Tokens -//! -//! This module contains tokens which keep track of the validity of certain data. -//! Currently, there is 1 token: -//! - [`ValidityToken`] -//! - -use tokio_util::sync::CancellationToken; - -/// A token representing if a piece of data is valid. -#[derive(Debug, Clone, Default)] -pub struct ValidityToken { - token: CancellationToken, -} - -impl ValidityToken { - /// Creates a new [`ValidityToken`] - pub fn new() -> Self { - Self { - token: CancellationToken::new(), - } - } - - /// Returns `true` if the data is still valid. - pub fn is_data_valid(&self) -> bool { - !self.token.is_cancelled() - } - - /// Sets the data to invalid. - pub fn set_data_invalid(self) { - self.token.cancel(); - } -} diff --git a/consensus/fast-sync/src/fast_sync.rs b/consensus/fast-sync/src/fast_sync.rs index 3764e217..6016bb0c 100644 --- a/consensus/fast-sync/src/fast_sync.rs +++ b/consensus/fast-sync/src/fast_sync.rs @@ -10,10 +10,10 @@ use monero_serai::{ block::Block, transaction::{Input, Transaction}, }; -use tower::{Service, ServiceExt}; +use tower::Service; use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_consensus_context::{BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_consensus_context::BlockchainContextService; use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError}; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; @@ -110,37 +110,18 @@ impl From<tower::BoxError> for FastSyncError { } } -pub struct FastSyncService<C> { - context_svc: C, +pub struct FastSyncService { + context_svc: BlockchainContextService, } -impl<C> FastSyncService<C> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, -{ +impl FastSyncService { #[expect(dead_code)] - pub(crate) const fn new(context_svc: C) -> Self { + pub(crate) const fn new(context_svc: BlockchainContextService) -> Self { Self { context_svc } } } -impl<C> Service<FastSyncRequest> for FastSyncService<C> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, -{ +impl Service<FastSyncRequest> for FastSyncService { type Response = FastSyncResponse; type Error = FastSyncError; type Future = @@ -151,7 +132,7 @@ where } fn call(&mut self, req: FastSyncRequest) -> Self::Future { - let context_svc = self.context_svc.clone(); + let mut context_svc = self.context_svc.clone(); Box::pin(async move { match req { @@ -160,7 +141,7 @@ where block_ids, } => validate_hashes(start_height, &block_ids), FastSyncRequest::ValidateBlock { block, txs, token } => { - validate_block(context_svc, block, txs, token).await + validate_block(&mut context_svc, block, txs, &token) } } }) @@ -210,31 +191,13 @@ fn validate_hashes( }) } -async fn validate_block<C>( - mut context_svc: C, +fn validate_block( + context_svc: &mut BlockchainContextService, block: Block, mut txs: HashMap<[u8; 32], Transaction>, - token: ValidBlockId, -) -> Result<FastSyncResponse, FastSyncError> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, -{ - let BlockChainContextResponse::Context(checked_context) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - - let block_chain_ctx = checked_context.unchecked_blockchain_context().clone(); + token: &ValidBlockId, +) -> Result<FastSyncResponse, FastSyncError> { + let block_chain_ctx = context_svc.blockchain_context().clone(); let block_hash = block.hash(); if block_hash != token.0 { diff --git a/consensus/rules/src/blocks.rs b/consensus/rules/src/blocks.rs index 5e55ce2a..b9f5683f 100644 --- a/consensus/rules/src/blocks.rs +++ b/consensus/rules/src/blocks.rs @@ -210,7 +210,7 @@ fn check_txs_unique(txs: &[[u8; 32]]) -> Result<(), BlockError> { /// This struct contains the data needed to verify a block, implementers MUST make sure /// the data in this struct is calculated correctly. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] pub struct ContextToVerifyBlock { /// ref: <https://monero-book.cuprate.org/consensus_rules/blocks/weights.html#median-weight-for-coinbase-checks> pub median_weight_for_block_reward: usize, diff --git a/consensus/src/block.rs b/consensus/src/block.rs index 3f5d749e..b94be0f4 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -1,21 +1,18 @@ -//! Block Verifier Service. -use std::{ - collections::HashMap, - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; +//! Block Verification. +//! +//! This module contains functions for verifying blocks: +//! - [`verify_main_chain_block`] +//! - [`batch_prepare_main_chain_blocks`] +//! - [`verify_prepped_main_chain_block`] +//! - [`sanity_check_alt_block`] +//! +use std::{collections::HashMap, mem}; -use futures::FutureExt; -use monero_serai::{ - block::Block, - transaction::{Input, Transaction}, -}; +use monero_serai::{block::Block, transaction::Input}; use tower::{Service, ServiceExt}; use cuprate_consensus_context::{ - BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext, + BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, }; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{ @@ -32,17 +29,14 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, }; -use crate::{ - transactions::{VerifyTxRequest, VerifyTxResponse}, - Database, ExtendedConsensusError, -}; +use crate::{transactions::start_tx_verification, Database, ExtendedConsensusError}; mod alt_block; mod batch_prepare; mod free; -use alt_block::sanity_check_alt_block; -use batch_prepare::batch_prepare_main_chain_block; +pub use alt_block::sanity_check_alt_block; +pub use batch_prepare::batch_prepare_main_chain_blocks; use free::pull_ordered_transactions; /// A pre-prepared block with all data needed to verify it, except the block's proof of work. @@ -198,168 +192,17 @@ impl PreparedBlock { } } -/// A request to verify a block. -pub enum VerifyBlockRequest { - /// A request to verify a block. - MainChain { - block: Block, - prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - }, - /// Verifies a prepared block. - MainChainPrepped { - /// The already prepared block. - block: PreparedBlock, - /// The full list of transactions for this block, in the order given in `block`. - // TODO: Remove the Arc here - txs: Vec<Arc<TransactionVerificationData>>, - }, - /// Batch prepares a list of blocks and transactions for verification. - MainChainBatchPrepareBlocks { - /// The list of blocks and their transactions (not necessarily in the order given in the block). - blocks: Vec<(Block, Vec<Transaction>)>, - }, - /// A request to sanity check an alt block, also returning the cumulative difficulty of the alt chain. - /// - /// Unlike requests to verify main chain blocks, you do not need to add the returned block to the context - /// service, you will still have to add it to the database though. - AltChain { - /// The alt block to sanity check. - block: Block, - /// The alt transactions. - prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - }, -} - -/// A response from a verify block request. -pub enum VerifyBlockResponse { - /// This block is valid. - MainChain(VerifiedBlockInformation), - /// The sanity checked alt block. - AltChain(AltBlockInformation), - /// A list of prepared blocks for verification, you should call [`VerifyBlockRequest::MainChainPrepped`] on each of the returned - /// blocks to fully verify them. - MainChainBatchPrepped(Vec<(PreparedBlock, Vec<Arc<TransactionVerificationData>>)>), -} - -/// The block verifier service. -pub struct BlockVerifierService<C, TxV, D> { - /// The context service. - context_svc: C, - /// The tx verifier service. - tx_verifier_svc: TxV, - /// The database. - // Not use yet but will be. - _database: D, -} - -impl<C, TxV, D> BlockVerifierService<C, TxV, D> -where - C: Service<BlockChainContextRequest, Response = BlockChainContextResponse> - + Clone - + Send - + 'static, - TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError> - + Clone - + Send - + 'static, - D: Database + Clone + Send + 'static, - D::Future: Send + 'static, -{ - /// Creates a new block verifier. - pub(crate) const fn new(context_svc: C, tx_verifier_svc: TxV, database: D) -> Self { - Self { - context_svc, - tx_verifier_svc, - _database: database, - } - } -} - -impl<C, TxV, D> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, D> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - C::Future: Send + 'static, - - TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError> - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, - - D: Database + Clone + Send + 'static, - D::Future: Send + 'static, -{ - type Response = VerifyBlockResponse; - type Error = ExtendedConsensusError; - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: VerifyBlockRequest) -> Self::Future { - let context_svc = self.context_svc.clone(); - let tx_verifier_svc = self.tx_verifier_svc.clone(); - - async move { - match req { - VerifyBlockRequest::MainChain { - block, - prepared_txs, - } => { - verify_main_chain_block(block, prepared_txs, context_svc, tx_verifier_svc).await - } - VerifyBlockRequest::MainChainBatchPrepareBlocks { blocks } => { - batch_prepare_main_chain_block(blocks, context_svc).await - } - VerifyBlockRequest::MainChainPrepped { block, txs } => { - verify_prepped_main_chain_block(block, txs, context_svc, tx_verifier_svc, None) - .await - } - VerifyBlockRequest::AltChain { - block, - prepared_txs, - } => sanity_check_alt_block(block, prepared_txs, context_svc).await, - } - } - .boxed() - } -} - -/// Verifies a prepared block. -async fn verify_main_chain_block<C, TxV>( +/// Fully verify a block and all its transactions. +pub async fn verify_main_chain_block<D>( block: Block, txs: HashMap<[u8; 32], TransactionVerificationData>, - mut context_svc: C, - tx_verifier_svc: TxV, -) -> Result<VerifyBlockResponse, ExtendedConsensusError> + context_svc: &mut BlockchainContextService, + database: D, +) -> Result<VerifiedBlockInformation, ExtendedConsensusError> where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, - TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>, + D: Database + Clone + Send + 'static, { - let BlockChainContextResponse::Context(checked_context) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - - let context = checked_context.unchecked_blockchain_context().clone(); + let context = context_svc.blockchain_context().clone(); tracing::debug!("got blockchain context: {:?}", context); tracing::debug!( @@ -398,55 +241,22 @@ where .map_err(ConsensusError::Block)?; // Check that the txs included are what we need and that there are not any extra. - // TODO: Remove the Arc here - let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)? - .into_iter() - .map(Arc::new) - .collect(); + let ordered_txs = pull_ordered_transactions(&prepped_block.block, txs)?; - verify_prepped_main_chain_block( - prepped_block, - ordered_txs, - context_svc, - tx_verifier_svc, - Some(context), - ) - .await + verify_prepped_main_chain_block(prepped_block, ordered_txs, context_svc, database).await } -async fn verify_prepped_main_chain_block<C, TxV>( +/// Fully verify a block that has already been prepared using [`batch_prepare_main_chain_blocks`]. +pub async fn verify_prepped_main_chain_block<D>( prepped_block: PreparedBlock, - txs: Vec<Arc<TransactionVerificationData>>, - context_svc: C, - tx_verifier_svc: TxV, - cached_context: Option<RawBlockChainContext>, -) -> Result<VerifyBlockResponse, ExtendedConsensusError> + mut txs: Vec<TransactionVerificationData>, + context_svc: &mut BlockchainContextService, + database: D, +) -> Result<VerifiedBlockInformation, ExtendedConsensusError> where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, - TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>, + D: Database + Clone + Send + 'static, { - let context = if let Some(context) = cached_context { - context - } else { - let BlockChainContextResponse::Context(checked_context) = context_svc - .oneshot(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - - let context = checked_context.unchecked_blockchain_context().clone(); - - tracing::debug!("got blockchain context: {context:?}"); - - context - }; + let context = context_svc.blockchain_context(); tracing::debug!("verifying block: {}", hex::encode(prepped_block.block_hash)); @@ -464,15 +274,20 @@ where } } - tx_verifier_svc - .oneshot(VerifyTxRequest::Prepped { - txs: txs.clone(), - current_chain_height: context.chain_height, - top_hash: context.top_hash, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hf, - }) + let temp = start_tx_verification() + .append_prepped_txs(mem::take(&mut txs)) + .prepare()? + .full( + context.chain_height, + context.top_hash, + context.current_adjusted_timestamp_for_time_lock(), + context.current_hf, + database, + ) + .verify() .await?; + + txs = temp; } let block_weight = @@ -489,26 +304,18 @@ where ) .map_err(ConsensusError::Block)?; - Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation { + Ok(VerifiedBlockInformation { block_hash: prepped_block.block_hash, block: prepped_block.block, block_blob: prepped_block.block_blob, txs: txs .into_iter() - .map(|tx| { - // Note: it would be possible for the transaction verification service to hold onto the tx after the call - // if one of txs was invalid and the rest are still in rayon threads. - let tx = Arc::into_inner(tx).expect( - "Transaction verification service should not hold onto valid transactions.", - ); - - VerifiedTransactionInformation { - tx_blob: tx.tx_blob, - tx_weight: tx.tx_weight, - fee: tx.fee, - tx_hash: tx.tx_hash, - tx: tx.tx, - } + .map(|tx| VerifiedTransactionInformation { + tx_blob: tx.tx_blob, + tx_weight: tx.tx_weight, + fee: tx.fee, + tx_hash: tx.tx_hash, + tx: tx.tx, }) .collect(), pow_hash: prepped_block.pow_hash, @@ -517,5 +324,5 @@ where height: context.chain_height, long_term_weight: context.next_block_long_term_weight(block_weight), cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, - })) + }) } diff --git a/consensus/src/block/alt_block.rs b/consensus/src/block/alt_block.rs index 18c27345..1d0641b1 100644 --- a/consensus/src/block/alt_block.rs +++ b/consensus/src/block/alt_block.rs @@ -29,7 +29,6 @@ use cuprate_types::{ use crate::{ block::{free::pull_ordered_transactions, PreparedBlock}, BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, - VerifyBlockResponse, }; /// This function sanity checks an alt-block. @@ -37,11 +36,11 @@ use crate::{ /// Returns [`AltBlockInformation`], which contains the cumulative difficulty of the alt chain. /// /// This function only checks the block's proof-of-work and its weight. -pub(crate) async fn sanity_check_alt_block<C>( +pub async fn sanity_check_alt_block<C>( block: Block, txs: HashMap<[u8; 32], TransactionVerificationData>, mut context_svc: C, -) -> Result<VerifyBlockResponse, ExtendedConsensusError> +) -> Result<AltBlockInformation, ExtendedConsensusError> where C: Service< BlockChainContextRequest, @@ -185,7 +184,7 @@ where }) .await?; - Ok(VerifyBlockResponse::AltChain(block_info)) + Ok(block_info) } /// Retrieves the alt RX VM for the chosen block height. diff --git a/consensus/src/block/batch_prepare.rs b/consensus/src/block/batch_prepare.rs index ef384f5d..54cb3fea 100644 --- a/consensus/src/block/batch_prepare.rs +++ b/consensus/src/block/batch_prepare.rs @@ -5,7 +5,7 @@ use rayon::prelude::*; use tower::{Service, ServiceExt}; use tracing::instrument; -use cuprate_consensus_context::rx_vms::RandomXVm; +use cuprate_consensus_context::{rx_vms::RandomXVm, BlockchainContextService}; use cuprate_consensus_rules::{ blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError}, hard_forks::HardForkError, @@ -13,29 +13,21 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, }; use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_types::TransactionVerificationData; use crate::{ - block::{free::pull_ordered_transactions, PreparedBlock, PreparedBlockExPow}, - transactions::new_tx_verification_data, + batch_verifier::MultiThreadedBatchVerifier, + block::{free::order_transactions, PreparedBlock, PreparedBlockExPow}, + transactions::start_tx_verification, BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, - VerifyBlockResponse, }; /// Batch prepares a list of blocks for verification. #[instrument(level = "debug", name = "batch_prep_blocks", skip_all, fields(amt = blocks.len()))] -pub(crate) async fn batch_prepare_main_chain_block<C>( +pub async fn batch_prepare_main_chain_blocks( blocks: Vec<(Block, Vec<Transaction>)>, - mut context_svc: C, -) -> Result<VerifyBlockResponse, ExtendedConsensusError> -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - C::Future: Send + 'static, -{ + context_svc: &mut BlockchainContextService, +) -> Result<Vec<(PreparedBlock, Vec<TransactionVerificationData>)>, ExtendedConsensusError> { let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); tracing::debug!("Calculating block hashes."); @@ -89,16 +81,6 @@ where timestamps_hfs.push((block_0.block.header.timestamp, block_0.hf_version)); } - // Get the current blockchain context. - let BlockChainContextResponse::Context(checked_context) = context_svc - .ready() - .await? - .call(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - // Calculate the expected difficulties for each block in the batch. let BlockChainContextResponse::BatchDifficulties(difficulties) = context_svc .ready() @@ -111,7 +93,8 @@ where panic!("Context service returned wrong response!"); }; - let context = checked_context.unchecked_blockchain_context().clone(); + // Get the current blockchain context. + let context = context_svc.blockchain_context(); // Make sure the blocks follow the main chain. @@ -168,7 +151,9 @@ where tracing::debug!("Calculating PoW and prepping transaction"); let blocks = rayon_spawn_async(move || { - blocks + let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads()); + + let res = blocks .into_par_iter() .zip(difficulties) .zip(txs) @@ -183,27 +168,26 @@ where // Check the PoW check_block_pow(&block.pow_hash, difficultly).map_err(ConsensusError::Block)?; - // Now setup the txs. - let txs = txs - .into_par_iter() - .map(|tx| { - let tx = new_tx_verification_data(tx)?; - Ok::<_, ConsensusError>((tx.tx_hash, tx)) - }) - .collect::<Result<HashMap<_, _>, _>>()?; + let mut txs = start_tx_verification() + .append_txs(txs) + .prepare()? + .only_semantic(block.hf_version) + .queue(&batch_verifier)?; // Order the txs correctly. - // TODO: Remove the Arc here - let ordered_txs = pull_ordered_transactions(&block.block, txs)? - .into_iter() - .map(Arc::new) - .collect(); + order_transactions(&block.block, &mut txs)?; - Ok((block, ordered_txs)) + Ok((block, txs)) }) - .collect::<Result<Vec<_>, ExtendedConsensusError>>() + .collect::<Result<Vec<_>, ExtendedConsensusError>>()?; + + if !batch_verifier.verify() { + return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid); + } + + Ok(res) }) .await?; - Ok(VerifyBlockResponse::MainChainBatchPrepped(blocks)) + Ok(blocks) } diff --git a/consensus/src/block/free.rs b/consensus/src/block/free.rs index e122374d..9da0af5d 100644 --- a/consensus/src/block/free.rs +++ b/consensus/src/block/free.rs @@ -7,6 +7,36 @@ use cuprate_types::TransactionVerificationData; use crate::ExtendedConsensusError; +/// Orders the [`TransactionVerificationData`] list the same as it appears in [`Block::transactions`] +pub(crate) fn order_transactions( + block: &Block, + txs: &mut [TransactionVerificationData], +) -> Result<(), ExtendedConsensusError> { + if block.transactions.len() != txs.len() { + return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + } + + for (i, tx_hash) in block.transactions.iter().enumerate() { + if tx_hash != &txs[i].tx_hash { + let at_index = txs[i..] + .iter() + .position(|tx| &tx.tx_hash == tx_hash) + .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; + + // The above `position` will give an index from inside its view of the slice so we need to add the difference. + txs.swap(i, i + at_index); + } + } + + debug_assert!(block + .transactions + .iter() + .zip(txs.iter()) + .all(|(tx_hash, tx)| tx_hash == &tx.tx_hash)); + + Ok(()) +} + /// Returns a list of transactions, pulled from `txs` in the order they are in the [`Block`]. /// /// Will error if a tx need is not in `txs` or if `txs` contain more txs than needed. diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index f21d00b2..3d8169c4 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,10 +1,6 @@ //! Cuprate Consensus //! -//! This crate contains 3 [`tower::Service`]s that implement Monero's consensus rules: -//! -//! - [`BlockChainContextService`] Which handles keeping the current state of the blockchain. -//! - [`BlockVerifierService`] Which handles block verification. -//! - [`TxVerifierService`] Which handles transaction verification. +//! This crate contains Monero [`block`] and [`transactions`] verification functionality. //! //! This crate is generic over the database which is implemented as a [`tower::Service`]. To //! implement a database you need to have a service which accepts [`BlockchainReadRequest`] and responds @@ -17,23 +13,22 @@ cfg_if::cfg_if! { use cuprate_test_utils as _; use curve25519_dalek as _; use hex_literal as _; + use futures as _; } } use cuprate_consensus_rules::ConsensusError; -mod batch_verifier; +pub mod batch_verifier; pub mod block; #[cfg(test)] mod tests; pub mod transactions; -pub use block::{BlockVerifierService, VerifyBlockRequest, VerifyBlockResponse}; pub use cuprate_consensus_context::{ - initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, - BlockChainContextResponse, BlockChainContextService, ContextConfig, + initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse, + BlockchainContext, BlockchainContextService, ContextConfig, }; -pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; // re-export. pub use cuprate_consensus_rules::genesis::generate_genesis_block; @@ -63,37 +58,9 @@ pub enum ExtendedConsensusError { NoBlocksToVerify, } -/// Initialize the 2 verifier [`tower::Service`]s (block and transaction). -pub fn initialize_verifier<D, Ctx>( - database: D, - ctx_svc: Ctx, -) -> ( - BlockVerifierService<Ctx, TxVerifierService<D>, D>, - TxVerifierService<D>, -) -where - D: Database + Clone + Send + Sync + 'static, - D::Future: Send + 'static, - Ctx: tower::Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Clone - + Send - + Sync - + 'static, - Ctx::Future: Send + 'static, -{ - let tx_svc = TxVerifierService::new(database.clone()); - let block_svc = BlockVerifierService::new(ctx_svc, tx_svc.clone(), database); - (block_svc, tx_svc) -} - use __private::Database; pub mod __private { - use std::future::Future; - use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; /// A type alias trait used to represent a database, so we don't have to write [`tower::Service`] bounds @@ -108,10 +75,9 @@ pub mod __private { BlockchainReadRequest, Response = BlockchainResponse, Error = tower::BoxError, - Future = Self::Future2, + Future: Send + 'static, > { - type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static; } impl< @@ -119,11 +85,9 @@ pub mod __private { BlockchainReadRequest, Response = BlockchainResponse, Error = tower::BoxError, + Future: Send + 'static, >, > Database for T - where - T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static, { - type Future2 = T::Future; } } diff --git a/consensus/src/tests/context.rs b/consensus/src/tests/context.rs index b9c52177..ecb1464a 100644 --- a/consensus/src/tests/context.rs +++ b/consensus/src/tests/context.rs @@ -3,8 +3,7 @@ use proptest::{strategy::Strategy, test_runner::TestRunner}; use tower::ServiceExt; use cuprate_consensus_context::{ - initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse, - ContextConfig, NewBlockData, + initialize_blockchain_context, BlockChainContextRequest, ContextConfig, NewBlockData, }; use crate::{tests::mock_db::*, HardFork}; @@ -35,21 +34,12 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> { .unwrap() .current(); - let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; + let mut ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; - let BlockChainContextResponse::Context(context) = ctx_svc - .clone() - .oneshot(BlockChainContextRequest::Context) - .await? - else { - panic!("Context service returned wrong response!"); - }; - - assert!(context.is_still_valid()); - assert!(context.is_still_valid()); - assert!(context.is_still_valid()); + let context = ctx_svc.blockchain_context().clone(); ctx_svc + .clone() .oneshot(BlockChainContextRequest::Update(NewBlockData { block_hash: [0; 32], height: BLOCKCHAIN_HEIGHT, @@ -62,7 +52,7 @@ async fn context_invalidated_on_new_block() -> Result<(), tower::BoxError> { })) .await?; - assert!(!context.is_still_valid()); + assert_ne!(&context, ctx_svc.blockchain_context()); Ok(()) } @@ -77,18 +67,11 @@ async fn context_height_correct() -> Result<(), tower::BoxError> { .unwrap() .current(); - let ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; + let mut ctx_svc = initialize_blockchain_context(TEST_CONTEXT_CONFIG, db).await?; - let BlockChainContextResponse::Context(context) = - ctx_svc.oneshot(BlockChainContextRequest::Context).await? - else { - panic!("context service returned incorrect response!") - }; + let context = ctx_svc.blockchain_context(); - assert_eq!( - context.blockchain_context().unwrap().chain_height, - BLOCKCHAIN_HEIGHT - ); + assert_eq!(context.chain_height, BLOCKCHAIN_HEIGHT); Ok(()) } diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index f29c852f..f776247f 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -1,20 +1,35 @@ //! # Transaction Verifier Service. //! -//! This module contains the [`TxVerifierService`] which handles consensus validation of transactions. +//! This module contains the transaction validation interface, which can be accessed with [`start_tx_verification`]. //! -use std::{ - collections::HashSet, - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; +//! Transaction verification states will be cached to prevent doing the expensive checks multiple times. +//! +//! ## Example Semantic Verification +//! +//! ```rust +//! # use cuprate_test_utils::data::TX_E2D393; +//! # use monero_serai::transaction::Transaction; +//! use cuprate_consensus::{transactions::start_tx_verification, HardFork, batch_verifier::MultiThreadedBatchVerifier}; +//! +//! # fn main() -> Result<(), tower::BoxError> { +//! # let tx = Transaction::read(&mut TX_E2D393).unwrap(); +//! let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads()); +//! +//! let tx = start_tx_verification() +//! .append_txs(vec![tx]) +//! .prepare()? +//! .only_semantic(HardFork::V9) +//! .queue(&batch_verifier)?; +//! +//! assert!(batch_verifier.verify()); +//! Ok(()) +//! # } +//! ``` +use std::collections::HashSet; -use futures::FutureExt; use monero_serai::transaction::{Input, Timelock, Transaction}; use rayon::prelude::*; -use tower::{Service, ServiceExt}; -use tracing::instrument; +use tower::ServiceExt; use cuprate_consensus_rules::{ transactions::{ @@ -40,183 +55,223 @@ mod free; pub use free::new_tx_verification_data; -/// A struct representing the type of validation that needs to be completed for this transaction. +/// An enum representing the type of validation that needs to be completed for this transaction. #[derive(Debug, Copy, Clone, Eq, PartialEq)] enum VerificationNeeded { + /// Decoy check on a v1 transaction. + V1DecoyCheck, /// Both semantic validation and contextual validation are needed. SemanticAndContextual, /// Only contextual validation is needed. Contextual, + /// No verification needed. + None, } -/// A request to verify a transaction. -pub enum VerifyTxRequest { - /// Verifies a batch of prepared txs. - Prepped { - /// The transactions to verify. - // TODO: Can we use references to remove the Vec? wont play nicely with Service though - txs: Vec<Arc<TransactionVerificationData>>, - /// The current chain height. - current_chain_height: usize, - /// The top block hash. - top_hash: [u8; 32], - /// The value for time to use to check time locked outputs. - time_for_time_lock: u64, - /// The current [`HardFork`] - hf: HardFork, - }, - /// Verifies a batch of new txs. - /// Returning [`VerifyTxResponse::OkPrepped`] - New { - /// The transactions to verify. - txs: Vec<Transaction>, - /// The current chain height. - current_chain_height: usize, - /// The top block hash. - top_hash: [u8; 32], - /// The value for time to use to check time locked outputs. - time_for_time_lock: u64, - /// The current [`HardFork`] - hf: HardFork, - }, -} - -/// A response from a verify transaction request. -#[derive(Debug)] -pub enum VerifyTxResponse { - OkPrepped(Vec<Arc<TransactionVerificationData>>), - Ok, -} - -/// The transaction verifier service. -#[derive(Clone)] -pub struct TxVerifierService<D> { - /// The database. - database: D, -} - -impl<D> TxVerifierService<D> -where - D: Database + Clone + Send + 'static, - D::Future: Send + 'static, -{ - /// Creates a new [`TxVerifierService`]. - pub const fn new(database: D) -> Self { - Self { database } +/// Start the transaction verification process. +pub const fn start_tx_verification() -> PrepTransactions { + PrepTransactions { + txs: vec![], + prepped_txs: vec![], } } -impl<D> Service<VerifyTxRequest> for TxVerifierService<D> -where - D: Database + Clone + Send + Sync + 'static, - D::Future: Send + 'static, -{ - type Response = VerifyTxResponse; - type Error = ExtendedConsensusError; - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - self.database.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: VerifyTxRequest) -> Self::Future { - let database = self.database.clone(); - - async move { - match req { - VerifyTxRequest::New { - txs, - current_chain_height, - top_hash, - time_for_time_lock, - hf, - } => { - prep_and_verify_transactions( - database, - txs, - current_chain_height, - top_hash, - time_for_time_lock, - hf, - ) - .await - } - - VerifyTxRequest::Prepped { - txs, - current_chain_height, - top_hash, - time_for_time_lock, - hf, - } => { - verify_prepped_transactions( - database, - &txs, - current_chain_height, - top_hash, - time_for_time_lock, - hf, - ) - .await - } - } - } - .boxed() - } -} - -/// Prepares transactions for verification, then verifies them. -async fn prep_and_verify_transactions<D>( - database: D, +/// The preparation phase of transaction verification. +/// +/// The order of transactions will be kept throughout the verification process, transactions +/// inserted with [`PrepTransactions::append_prepped_txs`] will be put before transactions given +/// in [`PrepTransactions::append_txs`] +pub struct PrepTransactions { + prepped_txs: Vec<TransactionVerificationData>, txs: Vec<Transaction>, - current_chain_height: usize, - top_hash: [u8; 32], - time_for_time_lock: u64, - hf: HardFork, -) -> Result<VerifyTxResponse, ExtendedConsensusError> -where - D: Database + Clone + Sync + Send + 'static, -{ - let span = tracing::info_span!("prep_txs", amt = txs.len()); - - tracing::debug!(parent: &span, "prepping transactions for verification."); - let txs = rayon_spawn_async(|| { - txs.into_par_iter() - .map(|tx| new_tx_verification_data(tx).map(Arc::new)) - .collect::<Result<Vec<_>, _>>() - }) - .await?; - - verify_prepped_transactions( - database, - &txs, - current_chain_height, - top_hash, - time_for_time_lock, - hf, - ) - .await?; - - Ok(VerifyTxResponse::OkPrepped(txs)) } -#[instrument(name = "verify_txs", skip_all, fields(amt = txs.len()) level = "info")] -async fn verify_prepped_transactions<D>( - mut database: D, - txs: &[Arc<TransactionVerificationData>], +impl PrepTransactions { + /// Append some new transactions to prepare. + #[must_use] + pub fn append_txs(mut self, mut txs: Vec<Transaction>) -> Self { + self.txs.append(&mut txs); + + self + } + + /// Append some already prepped transactions. + #[must_use] + pub fn append_prepped_txs(mut self, mut txs: Vec<TransactionVerificationData>) -> Self { + self.prepped_txs.append(&mut txs); + + self + } + + /// Prepare the transactions and advance to the next step: [`VerificationWanted`]. + /// + /// # [`rayon`] + /// + /// This function will use [`rayon`] to parallelize the preparation process, so should not be called + /// in an async function, unless all the transactions given were already prepared, i.e. [`Self::append_prepped_txs`]. + pub fn prepare(mut self) -> Result<VerificationWanted, ConsensusError> { + if !self.txs.is_empty() { + self.prepped_txs.append( + &mut self + .txs + .into_par_iter() + .map(new_tx_verification_data) + .collect::<Result<_, _>>()?, + ); + } + + Ok(VerificationWanted { + prepped_txs: self.prepped_txs, + }) + } +} + +/// The step where the type of verification is decided. +pub struct VerificationWanted { + prepped_txs: Vec<TransactionVerificationData>, +} + +impl VerificationWanted { + /// Only semantic verification. + /// + /// Semantic verification is verification that can done without other blockchain data. The [`HardFork`] + /// is technically other blockchain data but excluding it reduces the amount of things that can be checked + /// significantly, and it is easy to get compared to other blockchain data needed for contextual validation. + pub fn only_semantic(self, hf: HardFork) -> SemanticVerification { + SemanticVerification { + prepped_txs: self.prepped_txs, + hf, + } + } + + /// Full verification. + /// + /// Fully verify the transactions, all checks will be performed, if they were already performed then they + /// won't be done again unless necessary. + pub fn full<D: Database>( + self, + current_chain_height: usize, + top_hash: [u8; 32], + time_for_time_lock: u64, + hf: HardFork, + database: D, + ) -> FullVerification<D> { + FullVerification { + prepped_txs: self.prepped_txs, + current_chain_height, + top_hash, + time_for_time_lock, + hf, + database, + } + } +} + +/// Semantic transaction verification. +/// +/// [`VerificationWanted::only_semantic`] +pub struct SemanticVerification { + prepped_txs: Vec<TransactionVerificationData>, + hf: HardFork, +} + +impl SemanticVerification { + /// Perform the semantic checks and queue any checks that can be batched into the batch verifier. + /// + /// If this function returns [`Ok`] the transaction(s) could still be semantically invalid, + /// [`MultiThreadedBatchVerifier::verify`] must be called on the `batch_verifier` after. + pub fn queue( + mut self, + batch_verifier: &MultiThreadedBatchVerifier, + ) -> Result<Vec<TransactionVerificationData>, ConsensusError> { + self.prepped_txs.par_iter_mut().try_for_each(|tx| { + let fee = check_transaction_semantic( + &tx.tx, + tx.tx_blob.len(), + tx.tx_weight, + &tx.tx_hash, + self.hf, + batch_verifier, + )?; + // make sure we calculated the right fee. + assert_eq!(fee, tx.fee); + + tx.cached_verification_state = CachedVerificationState::OnlySemantic(self.hf); + + Ok::<_, ConsensusError>(()) + })?; + + Ok(self.prepped_txs) + } +} + +/// Full transaction verification. +/// +/// [`VerificationWanted::full`] +pub struct FullVerification<D> { + prepped_txs: Vec<TransactionVerificationData>, + current_chain_height: usize, top_hash: [u8; 32], time_for_time_lock: u64, hf: HardFork, -) -> Result<VerifyTxResponse, ExtendedConsensusError> -where - D: Database + Clone + Sync + Send + 'static, -{ - tracing::debug!("Verifying transactions"); + database: D, +} - tracing::trace!("Checking for duplicate key images"); +impl<D: Database + Clone> FullVerification<D> { + /// Fully verify each transaction. + pub async fn verify( + mut self, + ) -> Result<Vec<TransactionVerificationData>, ExtendedConsensusError> { + check_kis_unique(&self.prepped_txs, &mut self.database).await?; + let hashes_in_main_chain = + hashes_referenced_in_main_chain(&self.prepped_txs, &mut self.database).await?; + + let (verification_needed, any_v1_decoy_check_needed) = verification_needed( + &self.prepped_txs, + &hashes_in_main_chain, + self.hf, + self.current_chain_height, + self.time_for_time_lock, + )?; + + if any_v1_decoy_check_needed { + verify_transactions_decoy_info( + self.prepped_txs + .iter() + .zip(verification_needed.iter()) + .filter_map(|(tx, needed)| { + if *needed == VerificationNeeded::V1DecoyCheck { + Some(tx) + } else { + None + } + }), + self.hf, + self.database.clone(), + ) + .await?; + } + + verify_transactions( + self.prepped_txs, + verification_needed, + self.current_chain_height, + self.top_hash, + self.time_for_time_lock, + self.hf, + self.database, + ) + .await + } +} + +/// Check that each key image used in each transaction is unique in the whole chain. +async fn check_kis_unique<D: Database>( + txs: &[TransactionVerificationData], + database: &mut D, +) -> Result<(), ExtendedConsensusError> { let mut spent_kis = HashSet::with_capacity(txs.len()); txs.iter().try_for_each(|tx| { @@ -246,14 +301,18 @@ where return Err(ConsensusError::Transaction(TransactionError::KeyImageSpent).into()); } + Ok(()) +} + +/// Returns a [`HashSet`] of all the hashes referenced in each transaction's [`CachedVerificationState`], that +/// are also in the main chain. +async fn hashes_referenced_in_main_chain<D: Database>( + txs: &[TransactionVerificationData], + database: &mut D, +) -> Result<HashSet<[u8; 32]>, ExtendedConsensusError> { let mut verified_at_block_hashes = txs .iter() - .filter_map(|txs| { - txs.cached_verification_state - .lock() - .unwrap() - .verified_at_block_hash() - }) + .filter_map(|txs| txs.cached_verification_state.verified_at_block_hash()) .collect::<HashSet<_>>(); tracing::trace!( @@ -277,74 +336,53 @@ where verified_at_block_hashes = known_hashes; } - let (txs_needing_full_verification, txs_needing_partial_verification) = - transactions_needing_verification( - txs, - &verified_at_block_hashes, - hf, - current_chain_height, - time_for_time_lock, - )?; - - futures::try_join!( - verify_transactions_decoy_info(txs_needing_partial_verification, hf, database.clone()), - verify_transactions( - txs_needing_full_verification, - current_chain_height, - top_hash, - time_for_time_lock, - hf, - database - ) - )?; - - Ok(VerifyTxResponse::Ok) + Ok(verified_at_block_hashes) } -#[expect( - clippy::type_complexity, - reason = "I don't think the return is too complex" -)] -fn transactions_needing_verification( - txs: &[Arc<TransactionVerificationData>], +/// Returns a list of [`VerificationNeeded`] for each transaction passed in. The returned +/// [`Vec`] will be the same length as the inputted transactions. +/// +/// A [`bool`] is also returned, which will be true if any transactions need [`VerificationNeeded::V1DecoyCheck`]. +fn verification_needed( + txs: &[TransactionVerificationData], hashes_in_main_chain: &HashSet<[u8; 32]>, current_hf: HardFork, current_chain_height: usize, time_for_time_lock: u64, -) -> Result< - ( - Vec<(Arc<TransactionVerificationData>, VerificationNeeded)>, - Vec<Arc<TransactionVerificationData>>, - ), - ConsensusError, -> { +) -> Result<(Vec<VerificationNeeded>, bool), ConsensusError> { // txs needing full validation: semantic and/or contextual - let mut full_validation_transactions = Vec::new(); - // txs needing partial _contextual_ validation, not semantic. - let mut partial_validation_transactions = Vec::new(); + let mut verification_needed = Vec::with_capacity(txs.len()); + + let mut any_v1_decoy_checks = false; for tx in txs { - let guard = tx.cached_verification_state.lock().unwrap(); - - match &*guard { + match &tx.cached_verification_state { CachedVerificationState::NotVerified => { - drop(guard); - full_validation_transactions - .push((Arc::clone(tx), VerificationNeeded::SemanticAndContextual)); + // Tx not verified at all need all checks. + verification_needed.push(VerificationNeeded::SemanticAndContextual); + continue; + } + CachedVerificationState::OnlySemantic(hf) => { + if current_hf != *hf { + // HF changed must do semantic checks again. + verification_needed.push(VerificationNeeded::SemanticAndContextual); + continue; + } + // Tx already semantically valid for this HF only contextual checks needed. + verification_needed.push(VerificationNeeded::Contextual); continue; } CachedVerificationState::ValidAtHashAndHF { block_hash, hf } => { if current_hf != *hf { - drop(guard); - full_validation_transactions - .push((Arc::clone(tx), VerificationNeeded::SemanticAndContextual)); + // HF changed must do all checks again. + verification_needed.push(VerificationNeeded::SemanticAndContextual); continue; } if !hashes_in_main_chain.contains(block_hash) { - drop(guard); - full_validation_transactions - .push((Arc::clone(tx), VerificationNeeded::Contextual)); + // The block we know this transaction was valid at is no longer in the chain do + // contextual checks again. + verification_needed.push(VerificationNeeded::Contextual); continue; } } @@ -354,20 +392,20 @@ fn transactions_needing_verification( time_lock, } => { if current_hf != *hf { - drop(guard); - full_validation_transactions - .push((Arc::clone(tx), VerificationNeeded::SemanticAndContextual)); + // HF changed must do all checks again. + verification_needed.push(VerificationNeeded::SemanticAndContextual); continue; } if !hashes_in_main_chain.contains(block_hash) { - drop(guard); - full_validation_transactions - .push((Arc::clone(tx), VerificationNeeded::Contextual)); + // The block we know this transaction was valid at is no longer in the chain do + // contextual checks again. + verification_needed.push(VerificationNeeded::Contextual); continue; } // If the time lock is still locked then the transaction is invalid. + // Time is not monotonic in Monero so these can become invalid with new blocks. if !output_unlocked(time_lock, current_chain_height, time_for_time_lock, *hf) { return Err(ConsensusError::Transaction( TransactionError::OneOrMoreRingMembersLocked, @@ -377,57 +415,80 @@ fn transactions_needing_verification( } if tx.version == TxVersion::RingSignatures { - drop(guard); - partial_validation_transactions.push(Arc::clone(tx)); + // v1 txs always need at least decoy checks as they can become invalid with new blocks. + verification_needed.push(VerificationNeeded::V1DecoyCheck); + any_v1_decoy_checks = true; continue; } + + verification_needed.push(VerificationNeeded::None); } - Ok(( - full_validation_transactions, - partial_validation_transactions, - )) + Ok((verification_needed, any_v1_decoy_checks)) } -async fn verify_transactions_decoy_info<D>( - txs: Vec<Arc<TransactionVerificationData>>, +/// Do [`VerificationNeeded::V1DecoyCheck`] on each tx passed in. +async fn verify_transactions_decoy_info<D: Database>( + txs: impl Iterator<Item = &TransactionVerificationData> + Clone, hf: HardFork, database: D, -) -> Result<(), ExtendedConsensusError> -where - D: Database + Clone + Sync + Send + 'static, -{ +) -> Result<(), ExtendedConsensusError> { // Decoy info is not validated for V1 txs. - if hf == HardFork::V1 || txs.is_empty() { + if hf == HardFork::V1 { return Ok(()); } - batch_get_decoy_info(&txs, hf, database) + batch_get_decoy_info(txs, hf, database) .await? .try_for_each(|decoy_info| decoy_info.and_then(|di| Ok(check_decoy_info(&di, hf)?)))?; Ok(()) } +/// Do [`VerificationNeeded::Contextual`] or [`VerificationNeeded::SemanticAndContextual`]. +/// +/// The inputs to this function are the txs wanted to be verified and a list of [`VerificationNeeded`], +/// if any other [`VerificationNeeded`] is specified other than [`VerificationNeeded::Contextual`] or +/// [`VerificationNeeded::SemanticAndContextual`], nothing will be verified for that tx. async fn verify_transactions<D>( - txs: Vec<(Arc<TransactionVerificationData>, VerificationNeeded)>, + mut txs: Vec<TransactionVerificationData>, + verification_needed: Vec<VerificationNeeded>, current_chain_height: usize, top_hash: [u8; 32], current_time_lock_timestamp: u64, hf: HardFork, database: D, -) -> Result<(), ExtendedConsensusError> +) -> Result<Vec<TransactionVerificationData>, ExtendedConsensusError> where - D: Database + Clone + Sync + Send + 'static, + D: Database, { - let txs_ring_member_info = - batch_get_ring_member_info(txs.iter().map(|(tx, _)| tx), hf, database).await?; + /// A filter each tx not [`VerificationNeeded::Contextual`] or + /// [`VerificationNeeded::SemanticAndContextual`] + const fn tx_filter<T>((_, needed): &(T, &VerificationNeeded)) -> bool { + matches!( + needed, + VerificationNeeded::Contextual | VerificationNeeded::SemanticAndContextual + ) + } + + let txs_ring_member_info = batch_get_ring_member_info( + txs.iter() + .zip(verification_needed.iter()) + .filter(tx_filter) + .map(|(tx, _)| tx), + hf, + database, + ) + .await?; rayon_spawn_async(move || { let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads()); - txs.par_iter() - .zip(txs_ring_member_info.par_iter()) + txs.iter() + .zip(verification_needed.iter()) + .filter(tx_filter) + .zip(txs_ring_member_info.iter()) + .par_bridge() .try_for_each(|((tx, verification_needed), ring)| { // do semantic validation if needed. if *verification_needed == VerificationNeeded::SemanticAndContextual { @@ -459,11 +520,12 @@ where return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid); } - txs.iter() + txs.iter_mut() + .zip(verification_needed.iter()) + .filter(tx_filter) .zip(txs_ring_member_info) .for_each(|((tx, _), ring)| { - *tx.cached_verification_state.lock().unwrap() = if ring.time_locked_outs.is_empty() - { + tx.cached_verification_state = if ring.time_locked_outs.is_empty() { // no outputs with time-locks used. CachedVerificationState::ValidAtHashAndHF { block_hash: top_hash, @@ -475,7 +537,7 @@ where .time_locked_outs .iter() .filter_map(|lock| match lock { - Timelock::Time(time) => Some(*time), + Timelock::Time(time) => Some(time), _ => None, }) .min(); @@ -485,7 +547,7 @@ where CachedVerificationState::ValidAtHashAndHFWithTimeBasedLock { block_hash: top_hash, hf, - time_lock: Timelock::Time(time), + time_lock: Timelock::Time(*time), } } else { // no time-based locked output was used. @@ -497,9 +559,7 @@ where } }); - Ok(()) + Ok(txs) }) - .await?; - - Ok(()) + .await } diff --git a/consensus/src/transactions/contextual_data.rs b/consensus/src/transactions/contextual_data.rs index 66c53b34..f9e3115a 100644 --- a/consensus/src/transactions/contextual_data.rs +++ b/consensus/src/transactions/contextual_data.rs @@ -10,10 +10,7 @@ //! //! Because this data is unique for *every* transaction and the context service is just for blockchain state data. //! -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet}; use monero_serai::transaction::{Input, Timelock}; use tower::ServiceExt; @@ -142,7 +139,7 @@ fn new_rings( /// This function batch gets all the ring members for the inputted transactions and fills in data about /// them. pub async fn batch_get_ring_member_info<D: Database>( - txs_verification_data: impl Iterator<Item = &Arc<TransactionVerificationData>> + Clone, + txs_verification_data: impl Iterator<Item = &TransactionVerificationData> + Clone, hf: HardFork, mut database: D, ) -> Result<Vec<TxRingMembersInfo>, ExtendedConsensusError> { @@ -205,22 +202,20 @@ pub async fn batch_get_ring_member_info<D: Database>( /// This functions panics if `hf == HardFork::V1` as decoy info /// should not be needed for V1. #[instrument(level = "debug", skip_all)] -pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>( - txs_verification_data: &'a [Arc<TransactionVerificationData>], +pub async fn batch_get_decoy_info<'a, D: Database>( + txs_verification_data: impl Iterator<Item = &'a TransactionVerificationData> + Clone, hf: HardFork, mut database: D, -) -> Result<impl Iterator<Item = Result<DecoyInfo, ConsensusError>> + 'a, ExtendedConsensusError> { +) -> Result< + impl Iterator<Item = Result<DecoyInfo, ConsensusError>> + sealed::Captures<&'a ()>, + ExtendedConsensusError, +> { // decoy info is not needed for V1. assert_ne!(hf, HardFork::V1); - tracing::debug!( - "Retrieving decoy info for {} txs.", - txs_verification_data.len() - ); - // Get all the different input amounts. let unique_input_amounts = txs_verification_data - .iter() + .clone() .flat_map(|tx_info| { tx_info.tx.prefix().inputs.iter().map(|input| match input { Input::ToKey { amount, .. } => amount.unwrap_or(0), @@ -245,7 +240,7 @@ pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>( panic!("Database sent incorrect response!") }; - Ok(txs_verification_data.iter().map(move |tx_v_data| { + Ok(txs_verification_data.map(move |tx_v_data| { DecoyInfo::new( &tx_v_data.tx.prefix().inputs, |amt| outputs_with_amount.get(&amt).copied().unwrap_or(0), @@ -254,3 +249,11 @@ pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>( .map_err(ConsensusError::Transaction) })) } + +mod sealed { + /// TODO: Remove me when 2024 Rust + /// + /// <https://rust-lang.github.io/rfcs/3498-lifetime-capture-rules-2024.html#the-captures-trick> + pub trait Captures<U> {} + impl<T: ?Sized, U> Captures<U> for T {} +} diff --git a/consensus/src/transactions/free.rs b/consensus/src/transactions/free.rs index 3613f292..0d642123 100644 --- a/consensus/src/transactions/free.rs +++ b/consensus/src/transactions/free.rs @@ -1,5 +1,3 @@ -use std::sync::Mutex as StdMutex; - use monero_serai::{ ringct::{bulletproofs::Bulletproof, RctType}, transaction::{Input, Transaction}, @@ -31,7 +29,7 @@ pub fn new_tx_verification_data( tx_blob, tx_weight, fee, - cached_verification_state: StdMutex::new(CachedVerificationState::NotVerified), + cached_verification_state: CachedVerificationState::NotVerified, tx, }) } diff --git a/consensus/tests/verify_correct_txs.rs b/consensus/tests/verify_correct_txs.rs index 4d6c1793..54158497 100644 --- a/consensus/tests/verify_correct_txs.rs +++ b/consensus/tests/verify_correct_txs.rs @@ -9,11 +9,9 @@ use std::{ use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY}; use monero_serai::transaction::{Timelock, Transaction}; -use tower::{service_fn, Service, ServiceExt}; +use tower::service_fn; -use cuprate_consensus::{ - TxVerifierService, VerifyTxRequest, VerifyTxResponse, __private::Database, -}; +use cuprate_consensus::{__private::Database, transactions::start_tx_verification}; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse}, OutputOnChain, @@ -82,17 +80,17 @@ macro_rules! test_verify_valid_v2_tx { let map = BTreeMap::from_iter(members); let database = dummy_database(map); - let mut tx_verifier = TxVerifierService::new(database); - - assert!(matches!(tx_verifier.ready().await.unwrap().call( - VerifyTxRequest::New { - txs: vec![Transaction::read(&mut $tx).unwrap()].into(), - current_chain_height: 10, - top_hash: [0; 32], - hf: HardFork::$hf, - time_for_time_lock: u64::MAX - } - ).await.unwrap(), VerifyTxResponse::OkPrepped(_))); + assert!( + start_tx_verification() + .append_txs( + vec![Transaction::read(&mut $tx).unwrap()] + ) + .prepare() + .unwrap() + .full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone()) + .verify() + .await.is_ok() + ); // Check verification fails if we put random ring members @@ -111,17 +109,17 @@ macro_rules! test_verify_valid_v2_tx { let map = BTreeMap::from_iter(members); let database = dummy_database(map); - let mut tx_verifier = TxVerifierService::new(database); - - assert!(tx_verifier.ready().await.unwrap().call( - VerifyTxRequest::New { - txs: vec![Transaction::read(&mut $tx).unwrap()].into(), - current_chain_height: 10, - top_hash: [0; 32], - hf: HardFork::$hf, - time_for_time_lock: u64::MAX - } - ).await.is_err()); + assert!( + start_tx_verification() + .append_txs( + vec![Transaction::read(&mut $tx).unwrap()] + ) + .prepare() + .unwrap() + .full(10, [0; 32], u64::MAX, HardFork::$hf, database.clone()) + .verify() + .await.is_err() + ); } }; diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs index 24101f77..cad886b3 100644 --- a/storage/txpool/src/ops/tx_read.rs +++ b/storage/txpool/src/ops/tx_read.rs @@ -1,8 +1,6 @@ //! Transaction read ops. //! //! This module handles reading full transaction data, like getting a transaction from the pool. -use std::sync::Mutex; - use monero_serai::transaction::Transaction; use cuprate_database::{DatabaseRo, DbResult}; @@ -34,7 +32,7 @@ pub fn get_transaction_verification_data( tx_weight: tx_info.weight, fee: tx_info.fee, tx_hash: *tx_hash, - cached_verification_state: Mutex::new(cached_verification_state), + cached_verification_state, }) } diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs index 8f426fb7..f1f43b2d 100644 --- a/storage/txpool/src/ops/tx_write.rs +++ b/storage/txpool/src/ops/tx_write.rs @@ -48,7 +48,7 @@ pub fn add_transaction( )?; // Add the cached verification state to table 2. - let cached_verification_state = (*tx.cached_verification_state.lock().unwrap()).into(); + let cached_verification_state = tx.cached_verification_state.into(); tables .cached_verification_state_mut() .put(&tx.tx_hash, &cached_verification_state)?; diff --git a/storage/txpool/src/service.rs b/storage/txpool/src/service.rs index 08981a40..2a13f1c6 100644 --- a/storage/txpool/src/service.rs +++ b/storage/txpool/src/service.rs @@ -92,7 +92,7 @@ //! // Prepare a request to write block. //! let tx = TX_V1_SIG2.clone(); //! let request = TxpoolWriteRequest::AddTransaction { -//! tx: Arc::new(tx.try_into().unwrap()), +//! tx: Box::new(tx.try_into().unwrap()), //! state_stem: false, //! }; //! diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs index a27c6309..e83fd429 100644 --- a/storage/txpool/src/service/interface.rs +++ b/storage/txpool/src/service/interface.rs @@ -1,10 +1,7 @@ //! Tx-pool [`service`](super) interface. //! //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums. -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet}; use cuprate_types::TransactionVerificationData; @@ -90,7 +87,7 @@ pub enum TxpoolWriteRequest { /// Returns [`TxpoolWriteResponse::AddTransaction`]. AddTransaction { /// The tx to add. - tx: Arc<TransactionVerificationData>, + tx: Box<TransactionVerificationData>, /// A [`bool`] denoting the routing state of this tx. /// /// [`true`] if this tx is in the stem state. diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs index 2acb819e..b6478b4b 100644 --- a/storage/txpool/src/types.rs +++ b/storage/txpool/src/types.rs @@ -71,6 +71,13 @@ impl From<RawCachedVerificationState> for CachedVerificationState { fn from(value: RawCachedVerificationState) -> Self { // if the hash is all `0`s then there is no hash this is valid at. if value.raw_valid_at_hash == [0; 32] { + if value.raw_hf != 0 { + return Self::OnlySemantic( + HardFork::from_version(value.raw_hf) + .expect("hard-fork values stored in the DB should always be valid"), + ); + } + return Self::NotVerified; } @@ -103,6 +110,11 @@ impl From<CachedVerificationState> for RawCachedVerificationState { raw_hf: 0, raw_valid_past_timestamp: [0; 8], }, + CachedVerificationState::OnlySemantic(hf) => Self { + raw_valid_at_hash: [0; 32], + raw_hf: hf.as_u8(), + raw_valid_past_timestamp: [0; 8], + }, CachedVerificationState::ValidAtHashAndHF { block_hash, hf } => Self { raw_valid_at_hash: block_hash, raw_hf: hf.as_u8(), diff --git a/types/src/transaction_verification_data.rs b/types/src/transaction_verification_data.rs index 3dfe5fdf..89c1fdf3 100644 --- a/types/src/transaction_verification_data.rs +++ b/types/src/transaction_verification_data.rs @@ -1,7 +1,5 @@ //! Contains [`TransactionVerificationData`] and the related types. -use std::sync::Mutex; - use monero_serai::transaction::{Timelock, Transaction}; use crate::{HardFork, VerifiedTransactionInformation}; @@ -37,6 +35,8 @@ impl TxVersion { pub enum CachedVerificationState { /// The transaction has not been validated. NotVerified, + /// The transaction was only validated semantically. + OnlySemantic(HardFork), /// The transaction is valid* if the block represented by this hash is in the blockchain and the [`HardFork`] /// is the same. /// @@ -67,7 +67,7 @@ impl CachedVerificationState { /// Returns the block hash this is valid for if in state [`CachedVerificationState::ValidAtHashAndHF`] or [`CachedVerificationState::ValidAtHashAndHFWithTimeBasedLock`]. pub const fn verified_at_block_hash(&self) -> Option<[u8; 32]> { match self { - Self::NotVerified => None, + Self::NotVerified | Self::OnlySemantic(_) => None, Self::ValidAtHashAndHF { block_hash, .. } | Self::ValidAtHashAndHFWithTimeBasedLock { block_hash, .. } => Some(*block_hash), } @@ -75,7 +75,7 @@ impl CachedVerificationState { } /// Data needed to verify a transaction. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TransactionVerificationData { /// The transaction we are verifying pub tx: Transaction, @@ -90,7 +90,7 @@ pub struct TransactionVerificationData { /// The hash of this transaction. pub tx_hash: [u8; 32], /// The verification state of this transaction. - pub cached_verification_state: Mutex<CachedVerificationState>, + pub cached_verification_state: CachedVerificationState, } #[derive(Debug, Copy, Clone, thiserror::Error)] @@ -108,7 +108,7 @@ impl TryFrom<VerifiedTransactionInformation> for TransactionVerificationData { tx_weight: value.tx_weight, fee: value.fee, tx_hash: value.tx_hash, - cached_verification_state: Mutex::new(CachedVerificationState::NotVerified), + cached_verification_state: CachedVerificationState::NotVerified, }) } }