From 978d72b6c14f27bd61836d90f304d766cc7a93f2 Mon Sep 17 00:00:00 2001 From: SyntheticBird <118022351+SyntheticBird45@users.noreply.github.com> Date: Wed, 16 Oct 2024 23:17:58 +0000 Subject: [PATCH 1/5] Move consensus context service into a subcrate. (#318) Co-authored-by: Boog900 --- Cargo.lock | 23 +++++- Cargo.toml | 3 +- binaries/cuprated/Cargo.toml | 1 + binaries/cuprated/src/blockchain/manager.rs | 7 +- .../src/blockchain/manager/handler.rs | 8 +-- .../src/rpc/request/blockchain_context.rs | 2 +- books/architecture/src/appendix/crates.md | 3 +- consensus/Cargo.toml | 6 +- consensus/context/Cargo.toml | 24 +++++++ .../context => context/src}/alt_chains.rs | 13 ++-- .../context => context/src}/difficulty.rs | 23 +++--- .../{src/context => context/src}/hardforks.rs | 28 ++++---- .../{src/context.rs => context/src/lib.rs} | 71 ++++++++++++++++--- .../{src/context => context/src}/rx_vms.rs | 32 ++++----- .../{src/context => context/src}/task.rs | 13 ++-- .../{src/context => context/src}/tokens.rs | 0 .../{src/context => context/src}/weight.rs | 16 ++--- consensus/fast-sync/Cargo.toml | 13 ++-- consensus/fast-sync/src/fast_sync.rs | 6 +- consensus/src/block.rs | 4 +- consensus/src/block/alt_block.rs | 12 ++-- consensus/src/block/batch_prepare.rs | 2 +- consensus/src/lib.rs | 3 +- consensus/src/tests/context.rs | 12 ++-- consensus/src/tests/context/difficulty.rs | 2 +- consensus/src/tests/context/hardforks.rs | 10 ++- consensus/src/tests/context/rx_vms.rs | 6 +- consensus/src/tests/context/weight.rs | 8 +-- 28 files changed, 218 insertions(+), 133 deletions(-) create mode 100644 consensus/context/Cargo.toml rename consensus/{src/context => context/src}/alt_chains.rs (94%) rename consensus/{src/context => context/src}/difficulty.rs (95%) rename consensus/{src/context => context/src}/hardforks.rs (90%) rename consensus/{src/context.rs => context/src/lib.rs} (88%) rename consensus/{src/context => context/src}/rx_vms.rs (90%) rename consensus/{src/context => context/src}/task.rs (96%) rename consensus/{src/context => context/src}/tokens.rs (100%) rename consensus/{src/context => context/src}/weight.rs (96%) diff --git a/Cargo.lock b/Cargo.lock index 1b05efa..ca0174b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -575,6 +575,7 @@ name = "cuprate-consensus" version = "0.1.0" dependencies = [ "cfg-if", + "cuprate-consensus-context", "cuprate-consensus-rules", "cuprate-helper", "cuprate-test-utils", @@ -587,12 +588,30 @@ dependencies = [ "proptest", "proptest-derive", "rand", - "randomx-rs", "rayon", "thiserror", "thread_local", "tokio", "tokio-test", + "tower 0.5.1", + "tracing", +] + +[[package]] +name = "cuprate-consensus-context" +version = "0.1.0" +dependencies = [ + "cuprate-consensus-rules", + "cuprate-helper", + "cuprate-types", + "futures", + "hex", + "monero-serai", + "randomx-rs", + "rayon", + "thiserror", + "thread_local", + "tokio", "tokio-util", "tower 0.5.1", "tracing", @@ -704,6 +723,7 @@ dependencies = [ "clap", "cuprate-blockchain", "cuprate-consensus", + "cuprate-consensus-context", "cuprate-consensus-rules", "cuprate-helper", "cuprate-types", @@ -972,6 +992,7 @@ dependencies = [ "cuprate-async-buffer", "cuprate-blockchain", "cuprate-consensus", + "cuprate-consensus-context", "cuprate-consensus-rules", "cuprate-cryptonight", "cuprate-dandelion-tower", diff --git a/Cargo.toml b/Cargo.toml index 6c322fb..2ef99d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "binaries/cuprated", "constants", "consensus", + "consensus/context", "consensus/fast-sync", 
"consensus/rules", "cryptonight", @@ -322,4 +323,4 @@ non_camel_case_types = "deny" # unused_results = "deny" # non_exhaustive_omitted_patterns = "deny" # missing_docs = "deny" -# missing_copy_implementations = "deny" \ No newline at end of file +# missing_copy_implementations = "deny" diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 325406b..59fa978 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/binaries/cuprated" # TODO: after v1.0.0, remove unneeded dependencies. cuprate-consensus = { path = "../../consensus" } cuprate-fast-sync = { path = "../../consensus/fast-sync" } +cuprate-consensus-context = { path = "../../consensus/context" } cuprate-consensus-rules = { path = "../../consensus/rules" } cuprate-cryptonight = { path = "../../cryptonight" } cuprate-helper = { path = "../../helper" } diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 118c8dd..8e613bc 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -8,10 +8,11 @@ use tracing::error; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ - context::RawBlockChainContext, BlockChainContextRequest, BlockChainContextResponse, - BlockChainContextService, BlockVerifierService, ExtendedConsensusError, TxVerifierService, - VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, + BlockVerifierService, ExtendedConsensusError, TxVerifierService, VerifyBlockRequest, + VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; +use cuprate_consensus_context::RawBlockChainContext; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig}, BroadcastSvc, NetworkInterface, diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 9603bad..e9805cd 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -10,11 +10,11 @@ use tracing::info; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ - block::PreparedBlock, context::NewBlockData, transactions::new_tx_verification_data, - BlockChainContextRequest, BlockChainContextResponse, BlockVerifierService, - ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, - VerifyTxResponse, + block::PreparedBlock, transactions::new_tx_verification_data, BlockChainContextRequest, + BlockChainContextResponse, BlockVerifierService, ExtendedConsensusError, VerifyBlockRequest, + VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, }; +use cuprate_consensus_context::NewBlockData; use cuprate_helper::cast::usize_to_u64; use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; use cuprate_types::{ diff --git a/binaries/cuprated/src/rpc/request/blockchain_context.rs b/binaries/cuprated/src/rpc/request/blockchain_context.rs index b616593..2b14d46 100644 --- a/binaries/cuprated/src/rpc/request/blockchain_context.rs +++ b/binaries/cuprated/src/rpc/request/blockchain_context.rs @@ -5,7 +5,7 @@ use std::convert::Infallible; use anyhow::Error; use tower::{Service, ServiceExt}; -use cuprate_consensus::context::{ +use cuprate_consensus_context::{ BlockChainContext, 
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, }; diff --git a/books/architecture/src/appendix/crates.md b/books/architecture/src/appendix/crates.md index 1993c47..fe8f1f0 100644 --- a/books/architecture/src/appendix/crates.md +++ b/books/architecture/src/appendix/crates.md @@ -16,7 +16,8 @@ cargo doc --open --package cuprate-blockchain | Crate | In-tree path | Purpose | |-------|--------------|---------| | [`cuprate-consensus`](https://doc.cuprate.org/cuprate_consensus) | [`consensus/`](https://github.com/Cuprate/cuprate/tree/main/consensus) | TODO -| [`cuprate-consensus-rules`](https://doc.cuprate.org/cuprate_consensus_rules) | [`consensus/rules/`](https://github.com/Cuprate/cuprate/tree/main/consensus-rules) | TODO +| [`cuprate-consensus-context`](https://doc.cuprate.org/cuprate_consensus_context) | [`consensus/context/`](https://github.com/Cuprate/cuprate/tree/main/consensus/context) | TODO +| [`cuprate-consensus-rules`](https://doc.cuprate.org/cuprate_consensus_rules) | [`consensus/rules/`](https://github.com/Cuprate/cuprate/tree/main/consensus/rules) | TODO | [`cuprate-fast-sync`](https://doc.cuprate.org/cuprate_fast_sync) | [`consensus/fast-sync/`](https://github.com/Cuprate/cuprate/tree/main/consensus/fast-sync) | Fast block synchronization ## Networking diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 12d97ee..1fdee89 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" cuprate-helper = { path = "../helper", default-features = false, features = ["std", "asynch", "num"] } cuprate-consensus-rules = { path = "./rules", features = ["rayon"] } cuprate-types = { path = "../types" } +cuprate-consensus-context = { path = "./context" } cfg-if = { workspace = true } thiserror = { workspace = true } @@ -18,13 +19,10 @@ tower = { workspace = true, features = ["util"] } tracing = { workspace = true, features = ["std", "attributes"] } futures = { workspace = true, features = ["std", "async-await"] } -randomx-rs = { workspace = true } monero-serai = { workspace = true, features = ["std"] } rayon = { workspace = true } thread_local = { workspace = true } -tokio = { workspace = true, features = ["rt"] } -tokio-util = { workspace = true } hex = { workspace = true } rand = { workspace = true } @@ -42,4 +40,4 @@ proptest = { workspace = true } proptest-derive = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/consensus/context/Cargo.toml b/consensus/context/Cargo.toml new file mode 100644 index 0000000..0080420 --- /dev/null +++ b/consensus/context/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "cuprate-consensus-context" +version = "0.1.0" +edition = "2021" +license = "MIT" +authors = ["SyntheticBird","Boog900"] + +[dependencies] +cuprate-consensus-rules = { path = "../rules", features = ["proptest"]} +cuprate-helper = { path = "../../helper", default-features = false, features = ["std", "cast"] } +cuprate-types = { path = "../../types", default-features = false } + +futures = { workspace = true, features = ["std", "async-await"] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} +tokio-util = { workspace = true } +tower = { workspace = true, features = ["util"] } +tracing = { workspace = true, features = ["std", "attributes"] } +thiserror = { workspace = true } + +monero-serai = { workspace = true, features = ["std"] } +randomx-rs = { workspace = true } +rayon = { workspace = true 
} +thread_local = { workspace = true } +hex = { workspace = true } diff --git a/consensus/src/context/alt_chains.rs b/consensus/context/src/alt_chains.rs similarity index 94% rename from consensus/src/context/alt_chains.rs rename to consensus/context/src/alt_chains.rs index cd945c8..df82ef3 100644 --- a/consensus/src/context/alt_chains.rs +++ b/consensus/context/src/alt_chains.rs @@ -9,9 +9,8 @@ use cuprate_types::{ }; use crate::{ - ExtendedConsensusError, - __private::Database, - context::{difficulty::DifficultyCache, rx_vms::RandomXVm, weight::BlockWeightsCache}, + ContextCacheError, __private::Database, difficulty::DifficultyCache, rx_vms::RandomXVm, + weight::BlockWeightsCache, }; pub(crate) mod sealed { @@ -38,7 +37,7 @@ pub struct AltChainContextCache { pub chain_height: usize, /// The top hash of the alt chain. pub top_hash: [u8; 32], - /// The [`ChainID`] of the alt chain. + /// The [`ChainId`] of the alt chain. pub chain_id: Option, /// The parent [`Chain`] of this alt chain. pub parent_chain: Chain, @@ -98,7 +97,7 @@ impl AltChainMap { &mut self, prev_id: [u8; 32], database: D, - ) -> Result, ExtendedConsensusError> { + ) -> Result, ContextCacheError> { if let Some(cache) = self.alt_cache_map.remove(&prev_id) { return Ok(cache); } @@ -133,7 +132,7 @@ pub(crate) async fn get_alt_chain_difficulty_cache( prev_id: [u8; 32], main_chain_difficulty_cache: &DifficultyCache, mut database: D, -) -> Result { +) -> Result { // find the block with hash == prev_id. let BlockchainResponse::FindBlock(res) = database .ready() @@ -180,7 +179,7 @@ pub(crate) async fn get_alt_chain_weight_cache( prev_id: [u8; 32], main_chain_weight_cache: &BlockWeightsCache, mut database: D, -) -> Result { +) -> Result { // find the block with hash == prev_id. let BlockchainResponse::FindBlock(res) = database .ready() diff --git a/consensus/src/context/difficulty.rs b/consensus/context/src/difficulty.rs similarity index 95% rename from consensus/src/context/difficulty.rs rename to consensus/context/src/difficulty.rs index 9316dc5..e3f558a 100644 --- a/consensus/src/context/difficulty.rs +++ b/consensus/context/src/difficulty.rs @@ -17,7 +17,7 @@ use cuprate_types::{ Chain, }; -use crate::{Database, ExtendedConsensusError, HardFork}; +use crate::{ContextCacheError, Database, HardFork}; /// The amount of blocks we account for to calculate difficulty const DIFFICULTY_WINDOW: usize = 720; @@ -33,9 +33,9 @@ const DIFFICULTY_LAG: usize = 15; /// #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct DifficultyCacheConfig { - pub(crate) window: usize, - pub(crate) cut: usize, - pub(crate) lag: usize, + pub window: usize, + pub cut: usize, + pub lag: usize, } impl DifficultyCacheConfig { @@ -73,14 +73,13 @@ impl DifficultyCacheConfig { #[derive(Debug, Clone, Eq, PartialEq)] pub struct DifficultyCache { /// The list of timestamps in the window. - /// len <= [`DIFFICULTY_BLOCKS_COUNT`] - pub(crate) timestamps: VecDeque, + pub timestamps: VecDeque, /// The current cumulative difficulty of the chain. - pub(crate) cumulative_difficulties: VecDeque, + pub cumulative_difficulties: VecDeque, /// The last height we accounted for. 
- pub(crate) last_accounted_height: usize, + pub last_accounted_height: usize, /// The config - pub(crate) config: DifficultyCacheConfig, + pub config: DifficultyCacheConfig, } impl DifficultyCache { @@ -91,7 +90,7 @@ impl DifficultyCache { config: DifficultyCacheConfig, database: D, chain: Chain, - ) -> Result { + ) -> Result { tracing::info!("Initializing difficulty cache this may take a while."); let mut block_start = chain_height.saturating_sub(config.total_block_count()); @@ -134,7 +133,7 @@ impl DifficultyCache { &mut self, numb_blocks: usize, database: D, - ) -> Result<(), ExtendedConsensusError> { + ) -> Result<(), ContextCacheError> { let Some(retained_blocks) = self.timestamps.len().checked_sub(numb_blocks) else { // More blocks to pop than we have in the cache, so just restart a new cache. *self = Self::init_from_chain_height( @@ -361,7 +360,7 @@ async fn get_blocks_in_pow_info( database: D, block_heights: Range, chain: Chain, -) -> Result<(VecDeque, VecDeque), ExtendedConsensusError> { +) -> Result<(VecDeque, VecDeque), ContextCacheError> { tracing::info!("Getting blocks timestamps"); let BlockchainResponse::BlockExtendedHeaderInRange(ext_header) = database diff --git a/consensus/src/context/hardforks.rs b/consensus/context/src/hardforks.rs similarity index 90% rename from consensus/src/context/hardforks.rs rename to consensus/context/src/hardforks.rs index 16ae763..e6af492 100644 --- a/consensus/src/context/hardforks.rs +++ b/consensus/context/src/hardforks.rs @@ -9,7 +9,7 @@ use cuprate_types::{ Chain, }; -use crate::{Database, ExtendedConsensusError}; +use crate::{ContextCacheError, Database}; /// The default amount of hard-fork votes to track to decide on activation of a hard-fork. /// @@ -21,9 +21,9 @@ const DEFAULT_WINDOW_SIZE: usize = 10080; // supermajority window check length - #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct HardForkConfig { /// The network we are on. - pub(crate) info: HFsInfo, + pub info: HFsInfo, /// The amount of votes we are taking into account to decide on a fork activation. - pub(crate) window: usize, + pub window: usize, } impl HardForkConfig { @@ -54,17 +54,17 @@ impl HardForkConfig { /// A struct that keeps track of the current hard-fork and current votes. #[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) struct HardForkState { +pub struct HardForkState { /// The current active hard-fork. - pub(crate) current_hardfork: HardFork, + pub current_hardfork: HardFork, /// The hard-fork config. - pub(crate) config: HardForkConfig, + pub config: HardForkConfig, /// The votes in the current window. - pub(crate) votes: HFVotes, + pub votes: HFVotes, /// The last block height accounted for. - pub(crate) last_height: usize, + pub last_height: usize, } impl HardForkState { @@ -74,7 +74,7 @@ impl HardForkState { chain_height: usize, config: HardForkConfig, mut database: D, - ) -> Result { + ) -> Result { tracing::info!("Initializing hard-fork state this may take a while."); let block_start = chain_height.saturating_sub(config.window); @@ -122,11 +122,11 @@ impl HardForkState { /// # Invariant /// /// This _must_ only be used on a main-chain cache. 
- pub(crate) async fn pop_blocks_main_chain( + pub async fn pop_blocks_main_chain( &mut self, numb_blocks: usize, database: D, - ) -> Result<(), ExtendedConsensusError> { + ) -> Result<(), ContextCacheError> { let Some(retained_blocks) = self.votes.total_votes().checked_sub(self.config.window) else { *self = Self::init_from_chain_height( self.last_height + 1 - numb_blocks, @@ -159,7 +159,7 @@ impl HardForkState { } /// Add a new block to the cache. - pub(crate) fn new_block(&mut self, vote: HardFork, height: usize) { + pub fn new_block(&mut self, vote: HardFork, height: usize) { // We don't _need_ to take in `height` but it's for safety, so we don't silently loose track // of blocks. assert_eq!(self.last_height + 1, height); @@ -194,7 +194,7 @@ impl HardForkState { } /// Returns the current hard-fork. - pub(crate) const fn current_hardfork(&self) -> HardFork { + pub const fn current_hardfork(&self) -> HardFork { self.current_hardfork } } @@ -205,7 +205,7 @@ async fn get_votes_in_range( database: D, block_heights: Range, window_size: usize, -) -> Result { +) -> Result { let mut votes = HFVotes::new(window_size); let BlockchainResponse::BlockExtendedHeaderInRange(vote_list) = database diff --git a/consensus/src/context.rs b/consensus/context/src/lib.rs similarity index 88% rename from consensus/src/context.rs rename to consensus/context/src/lib.rs index 3c944a9..82e601d 100644 --- a/consensus/src/context.rs +++ b/consensus/context/src/lib.rs @@ -1,6 +1,6 @@ //! # Blockchain Context //! -//! This module contains a service to get cached context from the blockchain: [`BlockChainContext`]. +//! This crate contains a service to get cached context from the blockchain: [`BlockChainContext`]. //! This is used during contextual validation, this does not have all the data for contextual validation //! (outputs) for that you will need a [`Database`]. //! @@ -18,14 +18,14 @@ use tokio::sync::mpsc; use tokio_util::sync::PollSender; use tower::Service; -use cuprate_consensus_rules::{blocks::ContextToVerifyBlock, current_unix_timestamp, HardFork}; +use cuprate_consensus_rules::{ + blocks::ContextToVerifyBlock, current_unix_timestamp, ConsensusError, HardFork, +}; -use crate::{Database, ExtendedConsensusError}; - -pub(crate) mod difficulty; -pub(crate) mod hardforks; -pub(crate) mod rx_vms; -pub(crate) mod weight; +pub mod difficulty; +pub mod hardforks; +pub mod rx_vms; +pub mod weight; mod alt_chains; mod task; @@ -36,13 +36,13 @@ use difficulty::DifficultyCache; use rx_vms::RandomXVm; use weight::BlockWeightsCache; -pub(crate) use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache}; +pub use alt_chains::{sealed::AltChainRequestToken, AltChainContextCache}; pub use difficulty::DifficultyCacheConfig; pub use hardforks::HardForkConfig; pub use tokens::*; pub use weight::BlockWeightsCacheConfig; -pub(crate) const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60; +pub const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60; /// Config for the context service. pub struct ContextConfig { @@ -91,7 +91,7 @@ impl ContextConfig { pub async fn initialize_blockchain_context( cfg: ContextConfig, database: D, -) -> Result +) -> Result where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, @@ -414,3 +414,52 @@ impl Service for BlockChainContextService { .boxed() } } + +#[derive(Debug, thiserror::Error)] +pub enum ContextCacheError { + /// A consensus error. + #[error("{0}")] + ConErr(#[from] ConsensusError), + /// A database error. 
+ #[error("Database error: {0}")] + DBErr(#[from] tower::BoxError), +} + +use __private::Database; + +pub mod __private { + use std::future::Future; + + use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; + + /// A type alias trait used to represent a database, so we don't have to write [`tower::Service`] bounds + /// everywhere. + /// + /// Automatically implemented for: + /// ```ignore + /// tower::Service + /// ``` + pub trait Database: + tower::Service< + BlockchainReadRequest, + Response = BlockchainResponse, + Error = tower::BoxError, + Future = Self::Future2, + > + { + type Future2: Future> + Send + 'static; + } + + impl< + T: tower::Service< + BlockchainReadRequest, + Response = BlockchainResponse, + Error = tower::BoxError, + >, + > Database for T + where + T::Future: Future> + Send + 'static, + { + type Future2 = T::Future; + } +} diff --git a/consensus/src/context/rx_vms.rs b/consensus/context/src/rx_vms.rs similarity index 90% rename from consensus/src/context/rx_vms.rs rename to consensus/context/src/rx_vms.rs index c6375fc..803bb32 100644 --- a/consensus/src/context/rx_vms.rs +++ b/consensus/context/src/rx_vms.rs @@ -26,10 +26,10 @@ use cuprate_types::{ Chain, }; -use crate::{Database, ExtendedConsensusError}; +use crate::{ContextCacheError, Database}; /// The amount of randomX VMs to keep in the cache. -const RX_SEEDS_CACHED: usize = 2; +pub const RX_SEEDS_CACHED: usize = 2; /// A multithreaded randomX VM. #[derive(Debug)] @@ -72,14 +72,14 @@ impl RandomX for RandomXVm { /// The randomX VMs cache, keeps the VM needed to calculate the current block's proof-of-work hash (if a VM is needed) and a /// couple more around this VM. #[derive(Clone, Debug)] -pub(crate) struct RandomXVmCache { +pub struct RandomXVmCache { /// The top [`RX_SEEDS_CACHED`] RX seeds. - pub(crate) seeds: VecDeque<(usize, [u8; 32])>, + pub seeds: VecDeque<(usize, [u8; 32])>, /// The VMs for `seeds` (if after hf 12, otherwise this will be empty). - pub(crate) vms: HashMap>, + pub vms: HashMap>, /// A single cached VM that was given to us from a part of Cuprate. - pub(crate) cached_vm: Option<([u8; 32], Arc)>, + pub cached_vm: Option<([u8; 32], Arc)>, } impl RandomXVmCache { @@ -88,7 +88,7 @@ impl RandomXVmCache { chain_height: usize, hf: &HardFork, database: D, - ) -> Result { + ) -> Result { let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED); let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?; @@ -125,18 +125,18 @@ impl RandomXVmCache { } /// Add a randomX VM to the cache, with the seed it was created with. - pub(crate) fn add_vm(&mut self, vm: ([u8; 32], Arc)) { + pub fn add_vm(&mut self, vm: ([u8; 32], Arc)) { self.cached_vm.replace(vm); } /// Creates a RX VM for an alt chain, looking at the main chain RX VMs to see if we can use one /// of them first. - pub(crate) async fn get_alt_vm( + pub async fn get_alt_vm( &self, height: usize, chain: Chain, database: D, - ) -> Result, ExtendedConsensusError> { + ) -> Result, ContextCacheError> { let seed_height = randomx_seed_height(height); let BlockchainResponse::BlockHash(seed_hash) = database @@ -162,7 +162,7 @@ impl RandomXVmCache { } /// Get the main-chain `RandomX` VMs. - pub(crate) async fn get_vms(&mut self) -> HashMap> { + pub async fn get_vms(&mut self) -> HashMap> { match self.seeds.len().checked_sub(self.vms.len()) { // No difference in the amount of seeds to VMs. Some(0) => (), @@ -214,7 +214,7 @@ impl RandomXVmCache { } /// Removes all the `RandomX` VMs above the `new_height`. 
- pub(crate) fn pop_blocks_main_chain(&mut self, new_height: usize) { + pub fn pop_blocks_main_chain(&mut self, new_height: usize) { self.seeds.retain(|(height, _)| *height < new_height); self.vms.retain(|height, _| *height < new_height); } @@ -222,7 +222,7 @@ impl RandomXVmCache { /// Add a new block to the VM cache. /// /// hash is the block hash not the blocks proof-of-work hash. - pub(crate) fn new_block(&mut self, height: usize, hash: &[u8; 32]) { + pub fn new_block(&mut self, height: usize, hash: &[u8; 32]) { if is_randomx_seed_height(height) { tracing::debug!("Block {height} is a randomX seed height, adding it to the cache.",); @@ -243,7 +243,7 @@ impl RandomXVmCache { /// Get the last `amount` of RX seeds, the top height returned here will not necessarily be the RX VM for the top block /// in the chain as VMs include some lag before a seed activates. -pub(crate) fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize) -> Vec { +pub fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize) -> Vec { let mut seeds = Vec::with_capacity(amount); if is_randomx_seed_height(last_height) { seeds.push(last_height); @@ -268,7 +268,7 @@ pub(crate) fn get_last_rx_seed_heights(mut last_height: usize, mut amount: usize async fn get_block_hashes( heights: Vec, database: D, -) -> Result, ExtendedConsensusError> { +) -> Result, ContextCacheError> { let mut fut = FuturesOrdered::new(); for height in heights { @@ -281,7 +281,7 @@ async fn get_block_hashes( else { panic!("Database sent incorrect response!"); }; - Result::<_, ExtendedConsensusError>::Ok(hash) + Result::<_, ContextCacheError>::Ok(hash) }); } diff --git a/consensus/src/context/task.rs b/consensus/context/src/task.rs similarity index 96% rename from consensus/src/context/task.rs rename to consensus/context/src/task.rs index c51c795..65cfea9 100644 --- a/consensus/src/context/task.rs +++ b/consensus/context/src/task.rs @@ -16,13 +16,10 @@ use cuprate_types::{ }; use crate::{ - context::{ - alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap}, - difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest, - BlockChainContextResponse, ContextConfig, RawBlockChainContext, ValidityToken, - BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, - }, - Database, ExtendedConsensusError, + alt_chains::{get_alt_chain_difficulty_cache, get_alt_chain_weight_cache, AltChainMap}, + difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest, + BlockChainContextResponse, ContextCacheError, ContextConfig, Database, RawBlockChainContext, + ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, }; /// A request from the context service to the context task. 
@@ -68,7 +65,7 @@ impl ContextTask { pub(crate) async fn init_context( cfg: ContextConfig, mut database: D, - ) -> Result { + ) -> Result { let ContextConfig { difficulty_cfg, weights_config, diff --git a/consensus/src/context/tokens.rs b/consensus/context/src/tokens.rs similarity index 100% rename from consensus/src/context/tokens.rs rename to consensus/context/src/tokens.rs diff --git a/consensus/src/context/weight.rs b/consensus/context/src/weight.rs similarity index 96% rename from consensus/src/context/weight.rs rename to consensus/context/src/weight.rs index e95ae60..7f72599 100644 --- a/consensus/src/context/weight.rs +++ b/consensus/context/src/weight.rs @@ -21,12 +21,12 @@ use cuprate_types::{ Chain, }; -use crate::{Database, ExtendedConsensusError, HardFork}; +use crate::{ContextCacheError, Database, HardFork}; /// The short term block weight window. -const SHORT_TERM_WINDOW: usize = 100; +pub const SHORT_TERM_WINDOW: usize = 100; /// The long term block weight window. -const LONG_TERM_WINDOW: usize = 100000; +pub const LONG_TERM_WINDOW: usize = 100000; /// Configuration for the block weight cache. /// @@ -80,7 +80,7 @@ impl BlockWeightsCache { config: BlockWeightsCacheConfig, database: D, chain: Chain, - ) -> Result { + ) -> Result { tracing::info!("Initializing weight cache this may take a while."); let long_term_weights = get_long_term_weight_in_range( @@ -121,7 +121,7 @@ impl BlockWeightsCache { &mut self, numb_blocks: usize, database: D, - ) -> Result<(), ExtendedConsensusError> { + ) -> Result<(), ContextCacheError> { if self.long_term_weights.window_len() <= numb_blocks { // More blocks to pop than we have in the cache, so just restart a new cache. *self = Self::init_from_chain_height( @@ -258,7 +258,7 @@ fn calculate_effective_median_block_weight( } /// Calculates a blocks long term weight. -pub(crate) fn calculate_block_long_term_weight( +pub fn calculate_block_long_term_weight( hf: HardFork, block_weight: usize, long_term_median: usize, @@ -287,7 +287,7 @@ async fn get_blocks_weight_in_range( range: Range, database: D, chain: Chain, -) -> Result, ExtendedConsensusError> { +) -> Result, ContextCacheError> { tracing::info!("getting block weights."); let BlockchainResponse::BlockExtendedHeaderInRange(ext_headers) = database @@ -311,7 +311,7 @@ async fn get_long_term_weight_in_range( range: Range, database: D, chain: Chain, -) -> Result, ExtendedConsensusError> { +) -> Result, ContextCacheError> { tracing::info!("getting block long term weights."); let BlockchainResponse::BlockExtendedHeaderInRange(ext_headers) = database diff --git a/consensus/fast-sync/Cargo.toml b/consensus/fast-sync/Cargo.toml index 1d7d97b..aa9c8d2 100644 --- a/consensus/fast-sync/Cargo.toml +++ b/consensus/fast-sync/Cargo.toml @@ -9,11 +9,12 @@ name = "cuprate-fast-sync-create-hashes" path = "src/create.rs" [dependencies] -cuprate-blockchain = { path = "../../storage/blockchain" } -cuprate-consensus = { path = ".." } -cuprate-consensus-rules = { path = "../rules" } -cuprate-types = { path = "../../types" } -cuprate-helper = { path = "../../helper", features = ["cast"] } +cuprate-blockchain = { path = "../../storage/blockchain" } +cuprate-consensus = { path = ".." 
} +cuprate-consensus-rules = { path = "../rules" } +cuprate-consensus-context = { path = "../context" } +cuprate-types = { path = "../../types" } +cuprate-helper = { path = "../../helper", features = ["cast"] } clap = { workspace = true, features = ["derive", "std"] } hex = { workspace = true } @@ -27,4 +28,4 @@ tower = { workspace = true } [dev-dependencies] [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/consensus/fast-sync/src/fast_sync.rs b/consensus/fast-sync/src/fast_sync.rs index ec4ea29..3764e21 100644 --- a/consensus/fast-sync/src/fast_sync.rs +++ b/consensus/fast-sync/src/fast_sync.rs @@ -12,10 +12,8 @@ use monero_serai::{ }; use tower::{Service, ServiceExt}; -use cuprate_consensus::{ - context::{BlockChainContextRequest, BlockChainContextResponse}, - transactions::new_tx_verification_data, -}; +use cuprate_consensus::transactions::new_tx_verification_data; +use cuprate_consensus_context::{BlockChainContextRequest, BlockChainContextResponse}; use cuprate_consensus_rules::{miner_tx::MinerTxError, ConsensusError}; use cuprate_helper::cast::u64_to_usize; use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; diff --git a/consensus/src/block.rs b/consensus/src/block.rs index ceb2cba..3f5d749 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -14,6 +14,9 @@ use monero_serai::{ }; use tower::{Service, ServiceExt}; +use cuprate_consensus_context::{ + BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext, +}; use cuprate_helper::asynch::rayon_spawn_async; use cuprate_types::{ AltBlockInformation, TransactionVerificationData, VerifiedBlockInformation, @@ -30,7 +33,6 @@ use cuprate_consensus_rules::{ }; use crate::{ - context::{BlockChainContextRequest, BlockChainContextResponse, RawBlockChainContext}, transactions::{VerifyTxRequest, VerifyTxResponse}, Database, ExtendedConsensusError, }; diff --git a/consensus/src/block/alt_block.rs b/consensus/src/block/alt_block.rs index 3a5ea7c..18c2734 100644 --- a/consensus/src/block/alt_block.rs +++ b/consensus/src/block/alt_block.rs @@ -7,6 +7,12 @@ use std::{collections::HashMap, sync::Arc}; use monero_serai::{block::Block, transaction::Input}; use tower::{Service, ServiceExt}; +use cuprate_consensus_context::{ + difficulty::DifficultyCache, + rx_vms::RandomXVm, + weight::{self, BlockWeightsCache}, + AltChainContextCache, AltChainRequestToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, +}; use cuprate_consensus_rules::{ blocks::{ check_block_pow, check_block_weight, check_timestamp, randomx_seed_height, BlockError, @@ -22,12 +28,6 @@ use cuprate_types::{ use crate::{ block::{free::pull_ordered_transactions, PreparedBlock}, - context::{ - difficulty::DifficultyCache, - rx_vms::RandomXVm, - weight::{self, BlockWeightsCache}, - AltChainContextCache, AltChainRequestToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, - }, BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, VerifyBlockResponse, }; diff --git a/consensus/src/block/batch_prepare.rs b/consensus/src/block/batch_prepare.rs index 029a5ae..ef384f5 100644 --- a/consensus/src/block/batch_prepare.rs +++ b/consensus/src/block/batch_prepare.rs @@ -5,6 +5,7 @@ use rayon::prelude::*; use tower::{Service, ServiceExt}; use tracing::instrument; +use cuprate_consensus_context::rx_vms::RandomXVm; use cuprate_consensus_rules::{ blocks::{check_block_pow, is_randomx_seed_height, randomx_seed_height, BlockError}, hard_forks::HardForkError, @@ -15,7 +16,6 @@ use cuprate_helper::asynch::rayon_spawn_async; 
use crate::{ block::{free::pull_ordered_transactions, PreparedBlock, PreparedBlockExPow}, - context::rx_vms::RandomXVm, transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, VerifyBlockResponse, diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 7280f2f..f21d00b 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -24,13 +24,12 @@ use cuprate_consensus_rules::ConsensusError; mod batch_verifier; pub mod block; -pub mod context; #[cfg(test)] mod tests; pub mod transactions; pub use block::{BlockVerifierService, VerifyBlockRequest, VerifyBlockResponse}; -pub use context::{ +pub use cuprate_consensus_context::{ initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, ContextConfig, }; diff --git a/consensus/src/tests/context.rs b/consensus/src/tests/context.rs index fdef0ac..b9c5217 100644 --- a/consensus/src/tests/context.rs +++ b/consensus/src/tests/context.rs @@ -2,15 +2,13 @@ use proptest::strategy::ValueTree; use proptest::{strategy::Strategy, test_runner::TestRunner}; use tower::ServiceExt; -use crate::{ - context::{ - initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse, - ContextConfig, NewBlockData, - }, - tests::mock_db::*, - HardFork, +use cuprate_consensus_context::{ + initialize_blockchain_context, BlockChainContextRequest, BlockChainContextResponse, + ContextConfig, NewBlockData, }; +use crate::{tests::mock_db::*, HardFork}; + pub(crate) mod data; mod difficulty; mod hardforks; diff --git a/consensus/src/tests/context/difficulty.rs b/consensus/src/tests/context/difficulty.rs index d5027f5..f1c0fd9 100644 --- a/consensus/src/tests/context/difficulty.rs +++ b/consensus/src/tests/context/difficulty.rs @@ -4,10 +4,10 @@ use proptest::collection::{size_range, vec}; use proptest::{prelude::*, prop_assert_eq, prop_compose, proptest}; use crate::{ - context::difficulty::*, tests::{context::data::DIF_3000000_3002000, mock_db::*}, HardFork, }; +use cuprate_consensus_context::difficulty::*; use cuprate_helper::num::median; use cuprate_types::Chain; diff --git a/consensus/src/tests/context/hardforks.rs b/consensus/src/tests/context/hardforks.rs index 17bd47f..f080023 100644 --- a/consensus/src/tests/context/hardforks.rs +++ b/consensus/src/tests/context/hardforks.rs @@ -1,13 +1,11 @@ use proptest::{collection::vec, prelude::*}; +use cuprate_consensus_context::{hardforks::HardForkState, HardForkConfig}; use cuprate_consensus_rules::hard_forks::{HFInfo, HFsInfo, HardFork, NUMB_OF_HARD_FORKS}; -use crate::{ - context::{hardforks::HardForkState, HardForkConfig}, - tests::{ - context::data::{HFS_2678808_2688888, HFS_2688888_2689608}, - mock_db::*, - }, +use crate::tests::{ + context::data::{HFS_2678808_2688888, HFS_2688888_2689608}, + mock_db::*, }; const TEST_WINDOW_SIZE: usize = 25; diff --git a/consensus/src/tests/context/rx_vms.rs b/consensus/src/tests/context/rx_vms.rs index b1eba8e..41c6279 100644 --- a/consensus/src/tests/context/rx_vms.rs +++ b/consensus/src/tests/context/rx_vms.rs @@ -3,15 +3,13 @@ use std::collections::VecDeque; use proptest::prelude::*; use tokio::runtime::Builder; +use cuprate_consensus_context::rx_vms::{get_last_rx_seed_heights, RandomXVmCache}; use cuprate_consensus_rules::{ blocks::{is_randomx_seed_height, randomx_seed_height}, HardFork, }; -use crate::{ - context::rx_vms::{get_last_rx_seed_heights, RandomXVmCache}, - tests::mock_db::*, -}; +use crate::tests::mock_db::*; #[test] 
fn rx_heights_consistent() { diff --git a/consensus/src/tests/context/weight.rs b/consensus/src/tests/context/weight.rs index b23f8f8..dab3979 100644 --- a/consensus/src/tests/context/weight.rs +++ b/consensus/src/tests/context/weight.rs @@ -1,11 +1,11 @@ use crate::{ - context::{ - weight::{calculate_block_long_term_weight, BlockWeightsCache}, - BlockWeightsCacheConfig, - }, tests::{context::data::BW_2850000_3050000, mock_db::*}, HardFork, }; +use cuprate_consensus_context::{ + weight::{calculate_block_long_term_weight, BlockWeightsCache}, + BlockWeightsCacheConfig, +}; use cuprate_types::Chain; pub(crate) const TEST_WEIGHT_CONFIG: BlockWeightsCacheConfig = From 4b350e897d76abfc4fdccd152ff02b966d549031 Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Tue, 22 Oct 2024 12:35:54 -0400 Subject: [PATCH 2/5] consensus-context: enable workspace lints (#321) enable lints, fix 1.82 clippy --- consensus/context/Cargo.toml | 7 +++++-- consensus/context/src/difficulty.rs | 2 +- consensus/context/src/lib.rs | 6 +++++- consensus/rules/Cargo.toml | 2 +- consensus/rules/src/lib.rs | 4 ++-- consensus/rules/src/miner_tx.rs | 2 +- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/consensus/context/Cargo.toml b/consensus/context/Cargo.toml index 0080420..f7642e8 100644 --- a/consensus/context/Cargo.toml +++ b/consensus/context/Cargo.toml @@ -7,8 +7,8 @@ authors = ["SyntheticBird","Boog900"] [dependencies] cuprate-consensus-rules = { path = "../rules", features = ["proptest"]} -cuprate-helper = { path = "../../helper", default-features = false, features = ["std", "cast"] } -cuprate-types = { path = "../../types", default-features = false } +cuprate-helper = { path = "../../helper", default-features = false, features = ["std", "cast", "num", "asynch"] } +cuprate-types = { path = "../../types", default-features = false, features = ["blockchain"] } futures = { workspace = true, features = ["std", "async-await"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} @@ -22,3 +22,6 @@ randomx-rs = { workspace = true } rayon = { workspace = true } thread_local = { workspace = true } hex = { workspace = true } + +[lints] +workspace = true \ No newline at end of file diff --git a/consensus/context/src/difficulty.rs b/consensus/context/src/difficulty.rs index e3f558a..1b61eb9 100644 --- a/consensus/context/src/difficulty.rs +++ b/consensus/context/src/difficulty.rs @@ -329,7 +329,7 @@ fn next_difficulty( } // TODO: do checked operations here and unwrap so we don't silently overflow? - (windowed_work * hf.block_time().as_secs() as u128 + time_span - 1) / time_span + (windowed_work * u128::from(hf.block_time().as_secs()) + time_span - 1) / time_span } /// Get the start and end of the window to calculate difficulty. diff --git a/consensus/context/src/lib.rs b/consensus/context/src/lib.rs index 82e601d..198d5a1 100644 --- a/consensus/context/src/lib.rs +++ b/consensus/context/src/lib.rs @@ -3,7 +3,11 @@ //! This crate contains a service to get cached context from the blockchain: [`BlockChainContext`]. //! This is used during contextual validation, this does not have all the data for contextual validation //! (outputs) for that you will need a [`Database`]. -//! + +// Used in documentation references for [`BlockChainContextRequest`] +// FIXME: should we pull in a dependency just to link docs? 
+use monero_serai as _;
+
 use std::{
     cmp::min,
     collections::HashMap,
diff --git a/consensus/rules/Cargo.toml b/consensus/rules/Cargo.toml
index 50117ac..fac22bc 100644
--- a/consensus/rules/Cargo.toml
+++ b/consensus/rules/Cargo.toml
@@ -11,7 +11,7 @@ proptest = ["cuprate-types/proptest"]
 rayon = ["dep:rayon"]
 
 [dependencies]
-cuprate-constants = { path = "../../constants", default-features = false }
+cuprate-constants = { path = "../../constants", default-features = false, features = ["block"] }
 cuprate-helper = { path = "../../helper", default-features = false, features = ["std", "cast"] }
 cuprate-types = { path = "../../types", default-features = false }
 cuprate-cryptonight = {path = "../../cryptonight"}
diff --git a/consensus/rules/src/lib.rs b/consensus/rules/src/lib.rs
index 876e2f7..eef20c1 100644
--- a/consensus/rules/src/lib.rs
+++ b/consensus/rules/src/lib.rs
@@ -63,9 +63,9 @@ where
 /// An internal function that returns an iterator or a parallel iterator if the
 /// `rayon` feature is enabled.
 #[cfg(not(feature = "rayon"))]
-fn try_par_iter<T>(t: T) -> impl std::iter::Iterator
+fn try_par_iter<T>(t: T) -> impl Iterator
 where
-    T: std::iter::IntoIterator,
+    T: IntoIterator,
 {
     t.into_iter()
 }
diff --git a/consensus/rules/src/miner_tx.rs b/consensus/rules/src/miner_tx.rs
index 5221ee5..bb3b004 100644
--- a/consensus/rules/src/miner_tx.rs
+++ b/consensus/rules/src/miner_tx.rs
@@ -68,7 +68,7 @@ pub fn calculate_block_reward(
         .unwrap();
     let effective_median_bw: u128 = median_bw.try_into().unwrap();
 
-    (((base_reward as u128 * multiplicand) / effective_median_bw) / effective_median_bw)
+    (((u128::from(base_reward) * multiplicand) / effective_median_bw) / effective_median_bw)
         .try_into()
         .unwrap()
 }

From b8e2d00af492a5262185d2dda6a42f6bd3b6a7ea Mon Sep 17 00:00:00 2001
From: SyntheticBird <118022351+SyntheticBird45@users.noreply.github.com>
Date: Thu, 24 Oct 2024 23:10:33 +0200
Subject: [PATCH 3/5] storage: Add common amounts commitment lookup table
 (#323)

Add common amounts commitment lookup table

- Implements `compute_zero_commitment` function in `cuprate-helper::crypto` module.
- Added a test that compares the function output with the correct calculation.
- Use of a constant-time algorithm for the lookup table.
- Added corresponding documentation
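For reviewers, a minimal sketch of the identity the new table caches.
`zero_commitment_reference` is a hypothetical name used for illustration only
(it is not part of this patch), assuming the same `curve25519-dalek` and
`monero-serai` APIs the diff below already uses:

    use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, EdwardsPoint, Scalar};
    use monero_serai::generators::H;

    // A zero commitment for `amount` is the Pedersen commitment `G + amount * H`,
    // i.e. a commitment to `amount` with a blinding factor of one. This is what
    // both the cached table entries and the fallback path below evaluate.
    fn zero_commitment_reference(amount: u64) -> EdwardsPoint {
        ED25519_BASEPOINT_POINT + *H * Scalar::from(amount)
    }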
---
 constants/Cargo.toml | 2 +-
 helper/Cargo.toml | 14 +--
 helper/src/crypto.rs | 122 +++++++++++++++++++++++++++
 helper/src/lib.rs | 3 +
 storage/blockchain/Cargo.toml | 2 +-
 storage/blockchain/src/ops/output.rs | 9 +-
 storage/blockchain/src/ops/tx.rs | 9 +-
 7 files changed, 142 insertions(+), 19 deletions(-)
 create mode 100644 helper/src/crypto.rs

diff --git a/constants/Cargo.toml b/constants/Cargo.toml
index 6d3e031..5ce3732 100644
--- a/constants/Cargo.toml
+++ b/constants/Cargo.toml
@@ -19,4 +19,4 @@ rpc = []
 [dev-dependencies]
 
 [lints]
-workspace = true
\ No newline at end of file
+workspace = true
diff --git a/helper/Cargo.toml b/helper/Cargo.toml
index 111c6f0..c70efb0 100644
--- a/helper/Cargo.toml
+++ b/helper/Cargo.toml
@@ -16,6 +16,7 @@ atomic = ["dep:crossbeam"]
 asynch = ["dep:futures", "dep:rayon"]
 cast = []
 constants = []
+crypto = ["dep:curve25519-dalek", "dep:monero-serai", "std"]
 fs = ["dep:dirs"]
 num = []
 map = ["cast", "dep:monero-serai", "dep:cuprate-constants"]
@@ -26,12 +27,13 @@ tx = ["dep:monero-serai"]
 [dependencies]
 cuprate-constants = { path = "../constants", optional = true, features = ["block"] }
 
-crossbeam = { workspace = true, optional = true }
-chrono = { workspace = true, optional = true, features = ["std", "clock"] }
-dirs = { workspace = true, optional = true }
-futures = { workspace = true, optional = true, features = ["std"] }
-monero-serai = { workspace = true, optional = true }
-rayon = { workspace = true, optional = true }
+chrono = { workspace = true, optional = true, features = ["std", "clock"] }
+crossbeam = { workspace = true, optional = true }
+curve25519-dalek = { workspace = true, optional = true }
+dirs = { workspace = true, optional = true }
+futures = { workspace = true, optional = true, features = ["std"] }
+monero-serai = { workspace = true, optional = true }
+rayon = { workspace = true, optional = true }
 
 # This is kinda a stupid work around.
 # [thread] needs to activate one of these libs (windows|libc)
diff --git a/helper/src/crypto.rs b/helper/src/crypto.rs
new file mode 100644
index 0000000..1a27cd3
--- /dev/null
+++ b/helper/src/crypto.rs
@@ -0,0 +1,122 @@
+//! Crypto-related functions and runtime-initialized constants.
+
+//---------------------------------------------------------------------------------------------------- Use
+use std::sync::LazyLock;
+
+use curve25519_dalek::{
+    constants::ED25519_BASEPOINT_POINT, edwards::VartimeEdwardsPrecomputation,
+    traits::VartimePrecomputedMultiscalarMul, EdwardsPoint, Scalar,
+};
+use monero_serai::generators::H;
+
+//---------------------------------------------------------------------------------------------------- Pre-computation
+
+/// This is the decomposed amount table containing the mandatory Pre-RCT amounts. It is used to pre-compute
+/// zero commitments at runtime.
+///
+/// Defined at:
+/// - 
+#[rustfmt::skip]
+pub const ZERO_COMMITMENT_DECOMPOSED_AMOUNT: [u64; 172] = [
+    1, 2, 3, 4, 5, 6, 7, 8, 9,
+    10, 20, 30, 40, 50, 60, 70, 80, 90,
+    100, 200, 300, 400, 500, 600, 700, 800, 900,
+    1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000,
+    10000, 20000, 30000, 40000, 50000, 60000, 70000, 80000, 90000,
+    100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000,
+    1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000,
+    10000000, 20000000, 30000000, 40000000, 50000000, 60000000, 70000000, 80000000, 90000000,
+    100000000, 200000000, 300000000, 400000000, 500000000, 600000000, 700000000, 800000000, 900000000,
+    1000000000, 2000000000, 3000000000, 4000000000, 5000000000, 6000000000, 7000000000, 8000000000, 9000000000,
+    10000000000, 20000000000, 30000000000, 40000000000, 50000000000, 60000000000, 70000000000, 80000000000, 90000000000,
+    100000000000, 200000000000, 300000000000, 400000000000, 500000000000, 600000000000, 700000000000, 800000000000, 900000000000,
+    1000000000000, 2000000000000, 3000000000000, 4000000000000, 5000000000000, 6000000000000, 7000000000000, 8000000000000, 9000000000000,
+    10000000000000, 20000000000000, 30000000000000, 40000000000000, 50000000000000, 60000000000000, 70000000000000, 80000000000000, 90000000000000,
+    100000000000000, 200000000000000, 300000000000000, 400000000000000, 500000000000000, 600000000000000, 700000000000000, 800000000000000, 900000000000000,
+    1000000000000000, 2000000000000000, 3000000000000000, 4000000000000000, 5000000000000000, 6000000000000000, 7000000000000000, 8000000000000000, 9000000000000000,
+    10000000000000000, 20000000000000000, 30000000000000000, 40000000000000000, 50000000000000000, 60000000000000000, 70000000000000000, 80000000000000000, 90000000000000000,
+    100000000000000000, 200000000000000000, 300000000000000000, 400000000000000000, 500000000000000000, 600000000000000000, 700000000000000000, 800000000000000000, 900000000000000000,
+    1000000000000000000, 2000000000000000000, 3000000000000000000, 4000000000000000000, 5000000000000000000, 6000000000000000000, 7000000000000000000, 8000000000000000000, 9000000000000000000,
+    10000000000000000000
+];
+
+/// Runtime-initialized [`H`] generator.
+static H_PRECOMP: LazyLock<VartimeEdwardsPrecomputation> =
+    LazyLock::new(|| VartimeEdwardsPrecomputation::new([*H, ED25519_BASEPOINT_POINT]));
+
+/// Runtime-initialized zero commitment lookup table.
+///
+/// # Invariant
+/// This lookup table assumes that [`ZERO_COMMITMENT_DECOMPOSED_AMOUNT`]
+/// is sorted.
+pub static ZERO_COMMITMENT_LOOKUP_TABLE: LazyLock<[EdwardsPoint; 172]> = LazyLock::new(|| {
+    let mut lookup_table: [EdwardsPoint; 172] = [ED25519_BASEPOINT_POINT; 172];
+
+    for (i, amount) in ZERO_COMMITMENT_DECOMPOSED_AMOUNT.into_iter().enumerate() {
+        lookup_table[i] = ED25519_BASEPOINT_POINT + *H * Scalar::from(amount);
+    }
+
+    lookup_table
+});
+
+//---------------------------------------------------------------------------------------------------- Free functions
+
+/// This function computes the zero commitment given a specific amount.
+///
+/// It will first attempt a lookup in the table of known Pre-RCT values,
+/// and computes the commitment otherwise.
+#[expect(clippy::cast_possible_truncation)]
+pub fn compute_zero_commitment(amount: u64) -> EdwardsPoint {
+    // OPTIMIZATION: Unlike monerod, which executes a linear search across its
+    // lookup table (O(n)), Cuprate makes use of an arithmetic-based, constant-time
+    // version (O(1)). It has been benchmarked in both hit and miss scenarios against
+    // a binary search lookup (O(log2(n))). To understand the following algorithm, it
+    // is important to observe the pattern in the values of
+    // [`ZERO_COMMITMENT_DECOMPOSED_AMOUNT`].
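+    //
+    // As a worked example of the index arithmetic below: `amount = 3000`
+    // gives `log = 3`, `div = 1000` and `most_significant_digit = 3`, so
+    // `row_start = 27` and `index = 29`, the position of 3000 in
+    // [`ZERO_COMMITMENT_DECOMPOSED_AMOUNT`].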
+
+    // First obtain the base-10 logarithm of the amount, then raise it back to
+    // obtain the power of ten at the amount's most significant digit.
+    let Some(log) = amount.checked_ilog10() else {
+        // amount = 0 so the H component is 0.
+        return ED25519_BASEPOINT_POINT;
+    };
+    let div = 10_u64.pow(log);
+
+    // Extract the most significant digit.
+    let most_significant_digit = amount / div;
+
+    // If the *rounded* version differs from the exact amount, then the digits
+    // behind the most significant one are not all zeroes: the amount is not
+    // part of the table and must be computed separately.
+    if most_significant_digit * div != amount {
+        return H_PRECOMP.vartime_multiscalar_mul([Scalar::from(amount), Scalar::ONE]);
+    }
+
+    // Calculate the index by progressing through the powers of 10.
+    // The index of the first value in the cached amount's row.
+    let row_start = u64::from(log) * 9;
+    // The index of the cached amount.
+    let index = (most_significant_digit - 1 + row_start) as usize;
+
+    ZERO_COMMITMENT_LOOKUP_TABLE[index]
+}
+
+//---------------------------------------------------------------------------------------------------- Tests
+#[cfg(test)]
+mod test {
+    use curve25519_dalek::{traits::VartimePrecomputedMultiscalarMul, Scalar};
+
+    use crate::crypto::{compute_zero_commitment, H_PRECOMP, ZERO_COMMITMENT_DECOMPOSED_AMOUNT};
+
+    #[test]
+    /// Compare the output of `compute_zero_commitment` for all
+    /// Pre-RCT decomposed amounts against their actual computation.
+    ///
+    /// Assert that the lookup table returns the correct commitments.
+    fn compare_lookup_with_computation() {
+        for amount in ZERO_COMMITMENT_DECOMPOSED_AMOUNT {
+            let commitment = H_PRECOMP.vartime_multiscalar_mul([Scalar::from(amount), Scalar::ONE]);
+            assert!(commitment == compute_zero_commitment(amount));
+        }
+    }
+}
diff --git a/helper/src/lib.rs b/helper/src/lib.rs
index bfd2fd6..47d47a2 100644
--- a/helper/src/lib.rs
+++ b/helper/src/lib.rs
@@ -30,6 +30,9 @@ pub mod time;
 #[cfg(feature = "tx")]
 pub mod tx;
+
+#[cfg(feature = "crypto")]
+pub mod crypto;
 //---------------------------------------------------------------------------------------------------- Private Usage
 
 //----------------------------------------------------------------------------------------------------
diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml
index 0057911..414b784 100644
--- a/storage/blockchain/Cargo.toml
+++ b/storage/blockchain/Cargo.toml
@@ -20,7 +20,7 @@ service = ["dep:thread_local", "dep:rayon", "cuprate-helper/thread"]
 [dependencies]
 cuprate-database = { path = "../database" }
 cuprate-database-service = { path = "../service" }
-cuprate-helper = { path = "../../helper", features = ["fs", "map"] }
+cuprate-helper = { path = "../../helper", features = ["fs", "map", "crypto"] }
 cuprate-types = { path = "../../types", features = ["blockchain"] }
 cuprate-pruning = { path = "../../pruning" }
 
diff --git a/storage/blockchain/src/ops/output.rs b/storage/blockchain/src/ops/output.rs
index 1c7c1d7..14c209a 100644
--- a/storage/blockchain/src/ops/output.rs
+++ b/storage/blockchain/src/ops/output.rs
@@ -1,12 +1,13 @@
 //! Output functions.
 //---------------------------------------------------------------------------------------------------- Import
-use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, edwards::CompressedEdwardsY, Scalar};
-use monero_serai::{generators::H, transaction::Timelock};
+use curve25519_dalek::edwards::CompressedEdwardsY;
+use monero_serai::transaction::Timelock;
 
 use cuprate_database::{
     RuntimeError, {DatabaseRo, DatabaseRw},
 };
+use cuprate_helper::crypto::compute_zero_commitment;
 use cuprate_helper::map::u64_to_timelock;
 use cuprate_types::OutputOnChain;
 
@@ -155,9 +156,7 @@ pub fn output_to_output_on_chain(
     amount: Amount,
     table_tx_unlock_time: &impl DatabaseRo,
 ) -> Result {
-    // FIXME: implement lookup table for common values:
-    // 
-    let commitment = ED25519_BASEPOINT_POINT + *H * Scalar::from(amount);
+    let commitment = compute_zero_commitment(amount);
 
     let time_lock = if output
         .output_flags
diff --git a/storage/blockchain/src/ops/tx.rs b/storage/blockchain/src/ops/tx.rs
index c9799a2..5a60ad5 100644
--- a/storage/blockchain/src/ops/tx.rs
+++ b/storage/blockchain/src/ops/tx.rs
@@ -2,10 +2,10 @@
 //---------------------------------------------------------------------------------------------------- Import
 use bytemuck::TransparentWrapper;
-use curve25519_dalek::{constants::ED25519_BASEPOINT_POINT, Scalar};
 use monero_serai::transaction::{Input, Timelock, Transaction};
 
 use cuprate_database::{DatabaseRo, DatabaseRw, RuntimeError, StorableVec};
+use cuprate_helper::crypto::compute_zero_commitment;
 
 use crate::{
     ops::{
@@ -136,12 +136,9 @@ pub fn add_tx(
         .enumerate()
         .map(|(i, output)| {
             // Create commitment.
-            //
-            // FIXME: implement lookup table for common values:
-            // 
+
             let commitment = if miner_tx {
-                ED25519_BASEPOINT_POINT
-                    + *monero_serai::generators::H * Scalar::from(output.amount.unwrap_or(0))
+                compute_zero_commitment(output.amount.unwrap_or(0))
             } else {
                 proofs
                     .as_ref()

From 63216aecaead915550be93236faf6aeeebf0e04f Mon Sep 17 00:00:00 2001
From: SyntheticBird <118022351+SyntheticBird45@users.noreply.github.com>
Date: Fri, 25 Oct 2024 00:12:30 +0200
Subject: [PATCH 4/5] workspace: Defines cuprate members as workspace
 dependencies (#326)

Defines cuprate members as workspace dependencies

- Changed all `path` imports into `workspace = true`

Co-authored-by: Boog900 
---
 Cargo.toml | 29 ++++++++++++++++++++
 binaries/cuprated/Cargo.toml | 50 +++++++++++++++++-----------------
 consensus/Cargo.toml | 12 ++++----
 consensus/context/Cargo.toml | 8 +++---
 consensus/fast-sync/Cargo.toml | 12 ++++----
 consensus/rules/Cargo.toml | 8 +++---
 helper/Cargo.toml | 2 +-
 net/epee-encoding/Cargo.toml | 6 ++--
 net/levin/Cargo.toml | 4 +--
 net/wire/Cargo.toml | 10 +++----
 p2p/address-book/Cargo.toml | 10 +++----
 p2p/p2p-core/Cargo.toml | 10 +++----
 p2p/p2p/Cargo.toml | 22 +++++++-------
 pruning/Cargo.toml | 4 +--
 rpc/interface/Cargo.toml | 10 +++----
 rpc/types/Cargo.toml | 10 +++----
 storage/blockchain/Cargo.toml | 16 +++++------
 storage/service/Cargo.toml | 4 +--
 storage/txpool/Cargo.toml | 12 ++++----
 test-utils/Cargo.toml | 10 +++----
 types/Cargo.toml | 8 +++---
 21 files changed, 143 insertions(+), 114 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2ef99d6..3865863 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,35 @@ opt-level = 1
 opt-level = 3
 
 [workspace.dependencies]
+# Cuprate members
+cuprate-fast-sync = { path = "consensus/fast-sync" ,default-features = false}
+cuprate-consensus-rules = { path = "consensus/rules" ,default-features =
false} +cuprate-constants = { path = "constants" ,default-features = false} +cuprate-consensus = { path = "consensus" ,default-features = false} +cuprate-consensus-context = { path = "consensus/context" ,default-features = false} +cuprate-cryptonight = { path = "cryptonight" ,default-features = false} +cuprate-helper = { path = "helper" ,default-features = false} +cuprate-epee-encoding = { path = "net/epee-encoding" ,default-features = false} +cuprate-fixed-bytes = { path = "net/fixed-bytes" ,default-features = false} +cuprate-levin = { path = "net/levin" ,default-features = false} +cuprate-wire = { path = "net/wire" ,default-features = false} +cuprate-p2p = { path = "p2p/p2p" ,default-features = false} +cuprate-p2p-core = { path = "p2p/p2p-core" ,default-features = false} +cuprate-dandelion-tower = { path = "p2p/dandelion-tower" ,default-features = false} +cuprate-async-buffer = { path = "p2p/async-buffer" ,default-features = false} +cuprate-address-book = { path = "p2p/address-book" ,default-features = false} +cuprate-blockchain = { path = "storage/blockchain" ,default-features = false} +cuprate-database = { path = "storage/database" ,default-features = false} +cuprate-database-service = { path = "storage/service" ,default-features = false} +cuprate-txpool = { path = "storage/txpool" ,default-features = false} +cuprate-pruning = { path = "pruning" ,default-features = false} +cuprate-test-utils = { path = "test-utils" ,default-features = false} +cuprate-types = { path = "types" ,default-features = false} +cuprate-json-rpc = { path = "rpc/json-rpc" ,default-features = false} +cuprate-rpc-types = { path = "rpc/types" ,default-features = false} +cuprate-rpc-interface = { path = "rpc/interface" ,default-features = false} + +# External dependencies anyhow = { version = "1.0.89", default-features = false } async-trait = { version = "0.1.82", default-features = false } bitflags = { version = "2.6.0", default-features = false } diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 59fa978..2f22be0 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -9,31 +9,31 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/binaries/cuprated" [dependencies] # TODO: after v1.0.0, remove unneeded dependencies. 
-cuprate-consensus = { path = "../../consensus" } -cuprate-fast-sync = { path = "../../consensus/fast-sync" } -cuprate-consensus-context = { path = "../../consensus/context" } -cuprate-consensus-rules = { path = "../../consensus/rules" } -cuprate-cryptonight = { path = "../../cryptonight" } -cuprate-helper = { path = "../../helper" } -cuprate-epee-encoding = { path = "../../net/epee-encoding" } -cuprate-fixed-bytes = { path = "../../net/fixed-bytes" } -cuprate-levin = { path = "../../net/levin" } -cuprate-wire = { path = "../../net/wire" } -cuprate-p2p = { path = "../../p2p/p2p" } -cuprate-p2p-core = { path = "../../p2p/p2p-core" } -cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" } -cuprate-async-buffer = { path = "../../p2p/async-buffer" } -cuprate-address-book = { path = "../../p2p/address-book" } -cuprate-blockchain = { path = "../../storage/blockchain", features = ["service"] } -cuprate-database-service = { path = "../../storage/service" } -cuprate-txpool = { path = "../../storage/txpool" } -cuprate-database = { path = "../../storage/database" } -cuprate-pruning = { path = "../../pruning" } -cuprate-test-utils = { path = "../../test-utils" } -cuprate-types = { path = "../../types" } -cuprate-json-rpc = { path = "../../rpc/json-rpc" } -cuprate-rpc-interface = { path = "../../rpc/interface" } -cuprate-rpc-types = { path = "../../rpc/types" } +cuprate-consensus = { workspace = true } +cuprate-fast-sync = { workspace = true } +cuprate-consensus-context = { workspace = true } +cuprate-consensus-rules = { workspace = true } +cuprate-cryptonight = { workspace = true } +cuprate-helper = { workspace = true } +cuprate-epee-encoding = { workspace = true } +cuprate-fixed-bytes = { workspace = true } +cuprate-levin = { workspace = true } +cuprate-wire = { workspace = true } +cuprate-p2p = { workspace = true } +cuprate-p2p-core = { workspace = true } +cuprate-dandelion-tower = { workspace = true } +cuprate-async-buffer = { workspace = true } +cuprate-address-book = { workspace = true } +cuprate-blockchain = { workspace = true, features = ["service"] } +cuprate-database-service = { workspace = true } +cuprate-txpool = { workspace = true } +cuprate-database = { workspace = true } +cuprate-pruning = { workspace = true } +cuprate-test-utils = { workspace = true } +cuprate-types = { workspace = true } +cuprate-json-rpc = { workspace = true } +cuprate-rpc-interface = { workspace = true } +cuprate-rpc-types = { workspace = true } # TODO: after v1.0.0, remove unneeded dependencies. 
anyhow = { workspace = true } diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 1fdee89..8b732a0 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -8,10 +8,10 @@ authors = ["Boog900"] repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [dependencies] -cuprate-helper = { path = "../helper", default-features = false, features = ["std", "asynch", "num"] } -cuprate-consensus-rules = { path = "./rules", features = ["rayon"] } -cuprate-types = { path = "../types" } -cuprate-consensus-context = { path = "./context" } +cuprate-helper = { workspace = true, default-features = false, features = ["std", "asynch", "num"] } +cuprate-consensus-rules = { workspace = true, features = ["rayon"] } +cuprate-types = { workspace = true } +cuprate-consensus-context = { workspace = true } cfg-if = { workspace = true } thiserror = { workspace = true } @@ -28,8 +28,8 @@ hex = { workspace = true } rand = { workspace = true } [dev-dependencies] -cuprate-test-utils = { path = "../test-utils" } -cuprate-consensus-rules = {path = "./rules", features = ["proptest"]} +cuprate-test-utils = { workspace = true } +cuprate-consensus-rules = { workspace = true, features = ["proptest"]} hex-literal = { workspace = true } curve25519-dalek = { workspace = true } diff --git a/consensus/context/Cargo.toml b/consensus/context/Cargo.toml index f7642e8..7679046 100644 --- a/consensus/context/Cargo.toml +++ b/consensus/context/Cargo.toml @@ -6,9 +6,9 @@ license = "MIT" authors = ["SyntheticBird","Boog900"] [dependencies] -cuprate-consensus-rules = { path = "../rules", features = ["proptest"]} -cuprate-helper = { path = "../../helper", default-features = false, features = ["std", "cast", "num", "asynch"] } -cuprate-types = { path = "../../types", default-features = false, features = ["blockchain"] } +cuprate-consensus-rules = { workspace = true, features = ["proptest"]} +cuprate-helper = { workspace = true, default-features = false, features = ["std", "cast", "num", "asynch"] } +cuprate-types = { workspace = true, default-features = false, features = ["blockchain"] } futures = { workspace = true, features = ["std", "async-await"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} @@ -24,4 +24,4 @@ thread_local = { workspace = true } hex = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/consensus/fast-sync/Cargo.toml b/consensus/fast-sync/Cargo.toml index aa9c8d2..8e732a6 100644 --- a/consensus/fast-sync/Cargo.toml +++ b/consensus/fast-sync/Cargo.toml @@ -9,12 +9,12 @@ name = "cuprate-fast-sync-create-hashes" path = "src/create.rs" [dependencies] -cuprate-blockchain = { path = "../../storage/blockchain" } -cuprate-consensus = { path = ".." 
} -cuprate-consensus-rules = { path = "../rules" } -cuprate-consensus-context = { path = "../context" } -cuprate-types = { path = "../../types" } -cuprate-helper = { path = "../../helper", features = ["cast"] } +cuprate-blockchain = { workspace = true } +cuprate-consensus = { workspace = true } +cuprate-consensus-rules = { workspace = true } +cuprate-consensus-context = { workspace = true } +cuprate-types = { workspace = true } +cuprate-helper = { workspace = true, features = ["cast"] } clap = { workspace = true, features = ["derive", "std"] } hex = { workspace = true } diff --git a/consensus/rules/Cargo.toml b/consensus/rules/Cargo.toml index fac22bc..8999cbc 100644 --- a/consensus/rules/Cargo.toml +++ b/consensus/rules/Cargo.toml @@ -11,10 +11,10 @@ proptest = ["cuprate-types/proptest"] rayon = ["dep:rayon"] [dependencies] -cuprate-constants = { path = "../../constants", default-features = false, features = ["block"] } -cuprate-helper = { path = "../../helper", default-features = false, features = ["std", "cast"] } -cuprate-types = { path = "../../types", default-features = false } -cuprate-cryptonight = {path = "../../cryptonight"} +cuprate-constants = { workspace = true, default-features = false, features = ["block"] } +cuprate-helper = { workspace = true, default-features = false, features = ["std", "cast"] } +cuprate-types = { workspace = true, default-features = false } +cuprate-cryptonight = { workspace = true } monero-serai = { workspace = true, features = ["std"] } curve25519-dalek = { workspace = true, features = ["alloc", "zeroize", "precomputed-tables"] } diff --git a/helper/Cargo.toml b/helper/Cargo.toml index c70efb0..ad78a44 100644 --- a/helper/Cargo.toml +++ b/helper/Cargo.toml @@ -25,7 +25,7 @@ thread = ["std", "dep:target_os_lib"] tx = ["dep:monero-serai"] [dependencies] -cuprate-constants = { path = "../constants", optional = true, features = ["block"] } +cuprate-constants = { workspace = true, optional = true, features = ["block"] } chrono = { workspace = true, optional = true, features = ["std", "clock"] } crossbeam = { workspace = true, optional = true } diff --git a/net/epee-encoding/Cargo.toml b/net/epee-encoding/Cargo.toml index c021e42..4724e2d 100644 --- a/net/epee-encoding/Cargo.toml +++ b/net/epee-encoding/Cargo.toml @@ -15,8 +15,8 @@ default = ["std"] std = ["dep:thiserror", "bytes/std", "cuprate-fixed-bytes/std"] [dependencies] -cuprate-helper = { path = "../../helper", default-features = false, features = ["cast"] } -cuprate-fixed-bytes = { path = "../fixed-bytes", default-features = false } +cuprate-helper = { workspace = true, default-features = false, features = ["cast"] } +cuprate-fixed-bytes = { workspace = true, default-features = false } paste = "1.0.15" ref-cast = "1.0.23" @@ -27,4 +27,4 @@ thiserror = { workspace = true, optional = true} hex = { workspace = true, features = ["default"] } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/net/levin/Cargo.toml b/net/levin/Cargo.toml index 68c32e5..a9f3c1f 100644 --- a/net/levin/Cargo.toml +++ b/net/levin/Cargo.toml @@ -12,7 +12,7 @@ default = [] tracing = ["dep:tracing", "tokio-util/tracing"] [dependencies] -cuprate-helper = { path = "../../helper", default-features = false, features = ["cast"] } +cuprate-helper = { workspace = true, default-features = false, features = ["cast"] } cfg-if = { workspace = true } thiserror = { workspace = true } @@ -30,4 +30,4 @@ tokio = { workspace = true, features = ["full"] } futures = { workspace = true, features = ["std"] } [lints] 
-workspace = true \ No newline at end of file +workspace = true diff --git a/net/wire/Cargo.toml b/net/wire/Cargo.toml index 0b77cf1..b500a28 100644 --- a/net/wire/Cargo.toml +++ b/net/wire/Cargo.toml @@ -11,11 +11,11 @@ default = [] tracing = ["cuprate-levin/tracing"] [dependencies] -cuprate-levin = { path = "../levin" } -cuprate-epee-encoding = { path = "../epee-encoding" } -cuprate-fixed-bytes = { path = "../fixed-bytes" } -cuprate-types = { path = "../../types", default-features = false, features = ["epee"] } -cuprate-helper = { path = "../../helper", default-features = false, features = ["map"] } +cuprate-levin = { workspace = true } +cuprate-epee-encoding = { workspace = true } +cuprate-fixed-bytes = { workspace = true } +cuprate-types = { workspace = true, default-features = false, features = ["epee"] } +cuprate-helper = { workspace = true, default-features = false, features = ["map"] } bitflags = { workspace = true, features = ["std"] } bytes = { workspace = true, features = ["std"] } diff --git a/p2p/address-book/Cargo.toml b/p2p/address-book/Cargo.toml index 9afc255..9cbba71 100644 --- a/p2p/address-book/Cargo.toml +++ b/p2p/address-book/Cargo.toml @@ -7,9 +7,9 @@ authors = ["Boog900"] [dependencies] -cuprate-constants = { path = "../../constants" } -cuprate-pruning = { path = "../../pruning" } -cuprate-p2p-core = { path = "../p2p-core" } +cuprate-constants = { workspace = true } +cuprate-pruning = { workspace = true } +cuprate-p2p-core = { workspace = true } tower = { workspace = true, features = ["util"] } tokio = { workspace = true, features = ["time", "fs", "rt"]} @@ -26,9 +26,9 @@ rand = { workspace = true, features = ["std", "std_rng"] } borsh = { workspace = true, features = ["derive", "std"]} [dev-dependencies] -cuprate-test-utils = {path = "../../test-utils"} +cuprate-test-utils = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/p2p/p2p-core/Cargo.toml b/p2p/p2p-core/Cargo.toml index a30590f..0a6aaf3 100644 --- a/p2p/p2p-core/Cargo.toml +++ b/p2p/p2p-core/Cargo.toml @@ -10,9 +10,9 @@ default = ["borsh"] borsh = ["dep:borsh", "cuprate-pruning/borsh"] [dependencies] -cuprate-helper = { path = "../../helper", features = ["asynch"], default-features = false } -cuprate-wire = { path = "../../net/wire", features = ["tracing"] } -cuprate-pruning = { path = "../../pruning" } +cuprate-helper = { workspace = true, features = ["asynch"], default-features = false } +cuprate-wire = { workspace = true, features = ["tracing"] } +cuprate-pruning = { workspace = true } tokio = { workspace = true, features = ["net", "sync", "macros", "time", "rt", "rt-multi-thread"]} tokio-util = { workspace = true, features = ["codec"] } @@ -29,10 +29,10 @@ hex-literal = { workspace = true } borsh = { workspace = true, features = ["derive", "std"], optional = true } [dev-dependencies] -cuprate-test-utils = { path = "../../test-utils" } +cuprate-test-utils = { workspace = true } hex = { workspace = true, features = ["std"] } tokio-test = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/p2p/p2p/Cargo.toml b/p2p/p2p/Cargo.toml index 3444b5e..866fb91 100644 --- a/p2p/p2p/Cargo.toml +++ b/p2p/p2p/Cargo.toml @@ -6,15 +6,15 @@ license = "MIT" authors = ["Boog900"] [dependencies] -cuprate-constants = { path = "../../constants" } -cuprate-fixed-bytes = { path = "../../net/fixed-bytes" } -cuprate-wire = { path = "../../net/wire" } 
-cuprate-p2p-core = { path = "../p2p-core", features = ["borsh"] } -cuprate-address-book = { path = "../address-book" } -cuprate-pruning = { path = "../../pruning" } -cuprate-helper = { path = "../../helper", features = ["asynch"], default-features = false } -cuprate-async-buffer = { path = "../async-buffer" } -cuprate-types = { path = "../../types", default-features = false } +cuprate-constants = { workspace = true } +cuprate-fixed-bytes = { workspace = true } +cuprate-wire = { workspace = true } +cuprate-p2p-core = { workspace = true, features = ["borsh"] } +cuprate-address-book = { workspace = true } +cuprate-pruning = { workspace = true } +cuprate-helper = { workspace = true, features = ["asynch"], default-features = false } +cuprate-async-buffer = { workspace = true } +cuprate-types = { workspace = true, default-features = false } monero-serai = { workspace = true, features = ["std"] } @@ -35,10 +35,10 @@ tracing = { workspace = true, features = ["std", "attributes"] } borsh = { workspace = true, features = ["derive", "std"] } [dev-dependencies] -cuprate-test-utils = { path = "../../test-utils" } +cuprate-test-utils = { workspace = true } indexmap = { workspace = true } proptest = { workspace = true } tokio-test = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/pruning/Cargo.toml b/pruning/Cargo.toml index e898fd5..6fcc74e 100644 --- a/pruning/Cargo.toml +++ b/pruning/Cargo.toml @@ -10,11 +10,11 @@ default = [] borsh = ["dep:borsh"] [dependencies] -cuprate-constants = { path = "../constants" } +cuprate-constants = { workspace = true } thiserror = { workspace = true } borsh = { workspace = true, features = ["derive", "std"], optional = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/rpc/interface/Cargo.toml b/rpc/interface/Cargo.toml index 00f7a22..ef62d34 100644 --- a/rpc/interface/Cargo.toml +++ b/rpc/interface/Cargo.toml @@ -13,10 +13,10 @@ default = ["dummy", "serde"] dummy = [] [dependencies] -cuprate-epee-encoding = { path = "../../net/epee-encoding", default-features = false } -cuprate-json-rpc = { path = "../json-rpc", default-features = false } -cuprate-rpc-types = { path = "../types", features = ["serde", "epee"], default-features = false } -cuprate-helper = { path = "../../helper", features = ["asynch"], default-features = false } +cuprate-epee-encoding = { workspace = true, default-features = false } +cuprate-json-rpc = { workspace = true, default-features = false } +cuprate-rpc-types = { workspace = true, features = ["serde", "epee"], default-features = false } +cuprate-helper = { workspace = true, features = ["asynch"], default-features = false } anyhow = { workspace = true } axum = { version = "0.7.5", features = ["json"], default-features = false } @@ -26,7 +26,7 @@ paste = { workspace = true } futures = { workspace = true } [dev-dependencies] -cuprate-test-utils = { path = "../../test-utils" } +cuprate-test-utils = { workspace = true } axum = { version = "0.7.5", features = ["json", "tokio", "http2"] } serde_json = { workspace = true, features = ["std"] } diff --git a/rpc/types/Cargo.toml b/rpc/types/Cargo.toml index cfe7e47..e9ca529 100644 --- a/rpc/types/Cargo.toml +++ b/rpc/types/Cargo.toml @@ -14,18 +14,18 @@ serde = ["dep:serde", "cuprate-fixed-bytes/serde"] epee = ["dep:cuprate-epee-encoding"] [dependencies] -cuprate-epee-encoding = { path = "../../net/epee-encoding", optional = true } -cuprate-fixed-bytes = { path = "../../net/fixed-bytes" } -cuprate-types = { 
path = "../../types", default-features = false, features = ["epee", "serde"] } +cuprate-epee-encoding = { workspace = true, optional = true } +cuprate-fixed-bytes = { workspace = true } +cuprate-types = { workspace = true, default-features = false, features = ["epee", "serde"] } paste = { workspace = true } serde = { workspace = true, optional = true } [dev-dependencies] -cuprate-test-utils = { path = "../../test-utils" } +cuprate-test-utils = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index 414b784..d0a43b3 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -18,11 +18,11 @@ redb-memory = ["cuprate-database/redb-memory"] service = ["dep:thread_local", "dep:rayon", "cuprate-helper/thread"] [dependencies] -cuprate-database = { path = "../database" } -cuprate-database-service = { path = "../service" } -cuprate-helper = { path = "../../helper", features = ["fs", "map", "crypto"] } -cuprate-types = { path = "../../types", features = ["blockchain"] } -cuprate-pruning = { path = "../../pruning" } +cuprate-database = { workspace = true } +cuprate-database-service = { workspace = true } +cuprate-helper = { workspace = true, features = ["fs", "map", "crypto"] } +cuprate-types = { workspace = true, features = ["blockchain"] } +cuprate-pruning = { workspace = true } bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] } bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } @@ -37,9 +37,9 @@ thread_local = { workspace = true, optional = true } rayon = { workspace = true, optional = true } [dev-dependencies] -cuprate-constants = { path = "../../constants" } -cuprate-helper = { path = "../../helper", features = ["thread", "cast"] } -cuprate-test-utils = { path = "../../test-utils" } +cuprate-constants = { workspace = true } +cuprate-helper = { workspace = true, features = ["thread", "cast"] } +cuprate-test-utils = { workspace = true } tokio = { workspace = true, features = ["full"] } tempfile = { workspace = true } diff --git a/storage/service/Cargo.toml b/storage/service/Cargo.toml index ed46b35..fa6971c 100644 --- a/storage/service/Cargo.toml +++ b/storage/service/Cargo.toml @@ -9,8 +9,8 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/storage/service" keywords = ["cuprate", "service", "database"] [dependencies] -cuprate-database = { path = "../database" } -cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } +cuprate-database = { workspace = true } +cuprate-helper = { workspace = true, features = ["fs", "thread", "map"] } serde = { workspace = true, optional = true } rayon = { workspace = true } diff --git a/storage/txpool/Cargo.toml b/storage/txpool/Cargo.toml index 70211d9..b9d4218 100644 --- a/storage/txpool/Cargo.toml +++ b/storage/txpool/Cargo.toml @@ -19,10 +19,10 @@ service = ["dep:tower", "dep:rayon", "dep:cuprate-database-service"] serde = ["dep:serde", "cuprate-database/serde", "cuprate-database-service/serde"] [dependencies] -cuprate-database = { path = "../database", features = ["heed"] } -cuprate-database-service = { path = "../service", optional = true } -cuprate-types = { path = "../../types" } -cuprate-helper = { path = "../../helper", default-features = false, features = ["constants"] } +cuprate-database = { workspace = true, features = ["heed"] } 
+cuprate-database-service = { workspace = true, optional = true } +cuprate-types = { workspace = true } +cuprate-helper = { workspace = true, default-features = false, features = ["constants"] } monero-serai = { workspace = true, features = ["std"] } bytemuck = { workspace = true, features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } @@ -36,11 +36,11 @@ rayon = { workspace = true, optional = true } serde = { workspace = true, optional = true } [dev-dependencies] -cuprate-test-utils = { path = "../../test-utils" } +cuprate-test-utils = { workspace = true } tokio = { workspace = true } tempfile = { workspace = true } hex-literal = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index abf7ee4..4eb5684 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -6,10 +6,10 @@ license = "MIT" authors = ["Boog900", "hinto-janai"] [dependencies] -cuprate-types = { path = "../types" } -cuprate-helper = { path = "../helper", features = ["map", "tx"] } -cuprate-wire = { path = "../net/wire" } -cuprate-p2p-core = { path = "../p2p/p2p-core", features = ["borsh"] } +cuprate-types = { workspace = true } +cuprate-helper = { workspace = true, features = ["map", "tx"] } +cuprate-wire = { workspace = true } +cuprate-p2p-core = { workspace = true, features = ["borsh"] } hex = { workspace = true } hex-literal = { workspace = true } @@ -31,4 +31,4 @@ hex = { workspace = true } pretty_assertions = { workspace = true } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/types/Cargo.toml b/types/Cargo.toml index 8ac6b25..29887bd 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -18,9 +18,9 @@ json = ["hex", "dep:cuprate-helper"] hex = ["dep:hex"] [dependencies] -cuprate-epee-encoding = { path = "../net/epee-encoding", optional = true } -cuprate-helper = { path = "../helper", optional = true, features = ["cast"] } -cuprate-fixed-bytes = { path = "../net/fixed-bytes" } +cuprate-epee-encoding = { workspace = true, optional = true, features = ["std"] } +cuprate-helper = { workspace = true, optional = true, features = ["cast"] } +cuprate-fixed-bytes = { workspace = true } bytes = { workspace = true } curve25519-dalek = { workspace = true } @@ -39,4 +39,4 @@ pretty_assertions = { workspace = true } serde_json = { workspace = true, features = ["std"] } [lints] -workspace = true \ No newline at end of file +workspace = true From b57ee2f4cf83dcb5fc63626a0cefe426d2b7a418 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 29 Oct 2024 15:30:51 +0000 Subject: [PATCH 5/5] cuprated: txpool (#312) * init dandelion integration * add dandelion start function * finish incoming tx handler * Add tx blob hash table * Add missing txpool requests * handle duplicate stem txs * check txpool on incoming block * add request to remove tx in new blocks from the pool * tell the txpool about incoming blocks * fix merge * typos * remove blockchain height from txpool * add function to start the pool * add cross network address * pre-review changes * fix CI * review fixes * review fixes * abort on DB error * fix clippy --- Cargo.lock | 32 ++ Cargo.toml | 1 + binaries/cuprated/Cargo.toml | 2 +- binaries/cuprated/src/blockchain.rs | 2 +- binaries/cuprated/src/blockchain/interface.rs | 93 +++-- binaries/cuprated/src/blockchain/manager.rs | 5 + .../src/blockchain/manager/handler.rs | 31 +- binaries/cuprated/src/blockchain/syncer.rs | 5 +- binaries/cuprated/src/blockchain/types.rs 
| 8 +- binaries/cuprated/src/p2p.rs | 3 + binaries/cuprated/src/p2p/network_address.rs | 16 + binaries/cuprated/src/statics.rs | 2 +- binaries/cuprated/src/txpool.rs | 14 +- binaries/cuprated/src/txpool/dandelion.rs | 65 +++ .../src/txpool/dandelion/diffuse_service.rs | 44 ++ .../src/txpool/dandelion/stem_service.rs | 68 ++++ .../cuprated/src/txpool/dandelion/tx_store.rs | 74 ++++ binaries/cuprated/src/txpool/incoming_tx.rs | 379 ++++++++++++++++++ .../cuprated/src/txpool/txs_being_handled.rs | 53 +++ p2p/dandelion-tower/src/router.rs | 9 + p2p/p2p/src/client_pool.rs | 15 +- p2p/p2p/src/lib.rs | 9 +- storage/service/src/service/write.rs | 8 + storage/txpool/Cargo.toml | 1 + storage/txpool/src/free.rs | 12 +- storage/txpool/src/lib.rs | 2 +- storage/txpool/src/ops.rs | 2 +- storage/txpool/src/ops/tx_read.rs | 19 +- storage/txpool/src/ops/tx_write.rs | 11 + storage/txpool/src/service/interface.rs | 62 ++- storage/txpool/src/service/read.rs | 102 ++++- storage/txpool/src/service/write.rs | 73 +++- storage/txpool/src/tables.rs | 10 +- storage/txpool/src/types.rs | 4 +- 34 files changed, 1146 insertions(+), 90 deletions(-) create mode 100644 binaries/cuprated/src/p2p/network_address.rs create mode 100644 binaries/cuprated/src/txpool/dandelion.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/diffuse_service.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/stem_service.rs create mode 100644 binaries/cuprated/src/txpool/dandelion/tx_store.rs create mode 100644 binaries/cuprated/src/txpool/incoming_tx.rs create mode 100644 binaries/cuprated/src/txpool/txs_being_handled.rs diff --git a/Cargo.lock b/Cargo.lock index ca0174b..0f851dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,18 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-stream" version = "0.3.5" @@ -238,6 +250,19 @@ dependencies = [ "digest", ] +[[package]] +name = "blake3" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -403,6 +428,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + [[package]] name = "core-foundation" version = "0.9.4" @@ -919,6 +950,7 @@ name = "cuprate-txpool" version = "0.0.0" dependencies = [ "bitflags 2.6.0", + "blake3", "bytemuck", "cuprate-database", "cuprate-database-service", diff --git a/Cargo.toml b/Cargo.toml index 3865863..d5aca71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ cuprate-rpc-interface = { path = "rpc/interface" ,default-feature anyhow = { version = "1.0.89", default-features = false } async-trait = { version = "0.1.82", default-features = false } bitflags = { version = 
"2.6.0", default-features = false } +blake3 = { version = "1", default-features = false } borsh = { version = "1.5.1", default-features = false } bytemuck = { version = "1.18.0", default-features = false } bytes = { version = "1.7.2", default-features = false } diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index 2f22be0..880c205 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -21,7 +21,7 @@ cuprate-levin = { workspace = true } cuprate-wire = { workspace = true } cuprate-p2p = { workspace = true } cuprate-p2p-core = { workspace = true } -cuprate-dandelion-tower = { workspace = true } +cuprate-dandelion-tower = { workspace = true, features = ["txpool"] } cuprate-async-buffer = { workspace = true } cuprate-address-book = { workspace = true } cuprate-blockchain = { workspace = true, features = ["service"] } diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index a06f3fa..c4b75e4 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -25,7 +25,7 @@ mod manager; mod syncer; mod types; -use types::{ +pub use types::{ ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index 985e60d..2482784 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -8,17 +8,16 @@ use std::{ }; use monero_serai::{block::Block, transaction::Transaction}; -use rayon::prelude::*; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::transactions::new_tx_verification_data; -use cuprate_helper::cast::usize_to_u64; -use cuprate_types::{ - blockchain::{BlockchainReadRequest, BlockchainResponse}, - Chain, +use cuprate_txpool::service::{ + interface::{TxpoolReadRequest, TxpoolReadResponse}, + TxpoolReadHandle, }; +use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use crate::{ blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, @@ -38,7 +37,7 @@ pub enum IncomingBlockError { /// /// The inner values are the block hash and the indexes of the missing txs in the block. #[error("Unknown transactions in block.")] - UnknownTransactions([u8; 32], Vec), + UnknownTransactions([u8; 32], Vec), /// We are missing the block's parent. #[error("The block has an unknown parent.")] Orphan, @@ -59,8 +58,9 @@ pub enum IncomingBlockError { /// - the block's parent is unknown pub async fn handle_incoming_block( block: Block, - given_txs: Vec, + mut given_txs: HashMap<[u8; 32], Transaction>, blockchain_read_handle: &mut BlockchainReadHandle, + txpool_read_handle: &mut TxpoolReadHandle, ) -> Result { /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. /// @@ -72,7 +72,12 @@ pub async fn handle_incoming_block( /// which are also more expensive than `Mutex`s. static BLOCKS_BEING_HANDLED: LazyLock>> = LazyLock::new(|| Mutex::new(HashSet::new())); - // FIXME: we should look in the tx-pool for txs when that is ready. 
+
+    if given_txs.len() > block.transactions.len() {
+        return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
+            "Too many transactions given for block"
+        )));
+    }
 
     if !block_exists(block.header.previous, blockchain_read_handle)
         .await
@@ -90,23 +95,36 @@
         return Ok(IncomingBlockOk::AlreadyHave);
     }
 
-    // TODO: remove this when we have a working tx-pool.
-    if given_txs.len() != block.transactions.len() {
-        return Err(IncomingBlockError::UnknownTransactions(
-            block_hash,
-            (0..usize_to_u64(block.transactions.len())).collect(),
-        ));
-    }
+    let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+    else {
+        unreachable!()
+    };
 
-    // TODO: check we actually got given the right txs.
-    let prepped_txs = given_txs
-        .into_par_iter()
-        .map(|tx| {
-            let tx = new_tx_verification_data(tx)?;
-            Ok((tx.tx_hash, tx))
-        })
-        .collect::<Result<_, anyhow::Error>>()
-        .map_err(IncomingBlockError::InvalidBlock)?;
+    if !missing.is_empty() {
+        let needed_hashes = missing.iter().map(|index| block.transactions[*index]);
+
+        for needed_hash in needed_hashes {
+            let Some(tx) = given_txs.remove(&needed_hash) else {
+                // We return the indexes of all txs missing from our pool, not taking into account
+                // the txs that were given with the block, as these txs will be dropped. It is not
+                // worth trying to add these txs to the pool, as this will only happen with a
+                // misbehaving peer or if the txpool reaches the size limit.
+                return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
+            };
+
+            txs.insert(
+                needed_hash,
+                new_tx_verification_data(tx)
+                    .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
+            );
+        }
+    }
 
     let Some(incoming_block_tx) = COMMAND_TX.get() else {
         // We could still be starting up the blockchain manager.
@@ -119,28 +137,37 @@
         return Ok(IncomingBlockOk::AlreadyHave);
     }
 
-    // From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.
+    // We must remove the block hash from `BLOCKS_BEING_HANDLED` on every exit path,
+    // so tie the removal to a drop guard.
+    let _guard = {
+        struct RemoveFromBlocksBeingHandled {
+            block_hash: [u8; 32],
+        }
+        impl Drop for RemoveFromBlocksBeingHandled {
+            fn drop(&mut self) {
+                BLOCKS_BEING_HANDLED
+                    .lock()
+                    .unwrap()
+                    .remove(&self.block_hash);
+            }
+        }
+        RemoveFromBlocksBeingHandled { block_hash }
+    };
 
     let (response_tx, response_rx) = oneshot::channel();
 
     incoming_block_tx
         .send(BlockchainManagerCommand::AddBlock {
             block,
-            prepped_txs,
+            prepped_txs: txs,
             response_tx,
         })
         .await
         .expect("TODO: don't actually panic here, an err means we are shutting down");
 
-    let res = response_rx
+    response_rx
         .await
         .expect("The blockchain manager will always respond")
-        .map_err(IncomingBlockError::InvalidBlock);
-
-    // Remove the block hash from the blocks being handled.
-    BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);
-
-    res
+        .map_err(IncomingBlockError::InvalidBlock)
 }
 
 /// Check if we have a block with the given hash.
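The `_guard` refactor above is the heart of this hunk: removal from `BLOCKS_BEING_HANDLED` used to happen once, at the end, so any early return leaked the hash into the set. Tying the removal to `Drop` makes it run on every exit path. A minimal self-contained sketch of the same pattern (illustrative names, not code from the patch):

    use std::collections::HashSet;
    use std::sync::{LazyLock, Mutex};

    static IN_FLIGHT: LazyLock<Mutex<HashSet<[u8; 32]>>> =
        LazyLock::new(|| Mutex::new(HashSet::new()));

    /// Removes the hash from `IN_FLIGHT` when dropped.
    struct InFlightGuard {
        hash: [u8; 32],
    }

    impl Drop for InFlightGuard {
        fn drop(&mut self) {
            IN_FLIGHT.lock().unwrap().remove(&self.hash);
        }
    }

    fn handle(hash: [u8; 32]) -> Result<(), ()> {
        if !IN_FLIGHT.lock().unwrap().insert(hash) {
            return Ok(()); // another task already has it
        }
        // Bind the guard to a named variable; a bare `let _ = ...` would drop it immediately.
        let _guard = InFlightGuard { hash };

        // Any early return (or panic) from here on still runs the destructor,
        // so the hash can never be leaked into the set.
        fallible_step()?;
        Ok(())
    }

    fn fallible_step() -> Result<(), ()> {
        Ok(())
    }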
diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs
index 8e613bc..2166795 100644
--- a/binaries/cuprated/src/blockchain/manager.rs
+++ b/binaries/cuprated/src/blockchain/manager.rs
@@ -18,6 +18,7 @@ use cuprate_p2p::{
     BroadcastSvc, NetworkInterface,
 };
 use cuprate_p2p_core::ClearNet;
+use cuprate_txpool::service::TxpoolWriteHandle;
 use cuprate_types::{
     blockchain::{BlockchainReadRequest, BlockchainResponse},
     Chain, TransactionVerificationData,
@@ -46,6 +47,7 @@ pub async fn init_blockchain_manager(
     clearnet_interface: NetworkInterface<ClearNet>,
     blockchain_write_handle: BlockchainWriteHandle,
     blockchain_read_handle: BlockchainReadHandle,
+    txpool_write_handle: TxpoolWriteHandle,
     mut blockchain_context_service: BlockChainContextService,
     block_verifier_service: ConcreteBlockVerifierService,
     block_downloader_config: BlockDownloaderConfig,
@@ -80,6 +82,7 @@ pub async fn init_blockchain_manager(
     let manager = BlockchainManager {
         blockchain_write_handle,
         blockchain_read_handle,
+        txpool_write_handle,
         blockchain_context_service,
         cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
         block_verifier_service,
@@ -102,6 +105,8 @@ pub struct BlockchainManager {
     blockchain_write_handle: BlockchainWriteHandle,
     /// A [`BlockchainReadHandle`].
     blockchain_read_handle: BlockchainReadHandle,
+    /// A [`TxpoolWriteHandle`].
+    txpool_write_handle: TxpoolWriteHandle,
     // TODO: Improve the API of the cache service.
     // TODO: rename the cache service -> `BlockchainContextService`.
     /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs
index e9805cd..5d1cd2d 100644
--- a/binaries/cuprated/src/blockchain/manager/handler.rs
+++ b/binaries/cuprated/src/blockchain/manager/handler.rs
@@ -1,7 +1,10 @@
 //! The blockchain manager handler functions.
 use bytes::Bytes;
 use futures::{TryFutureExt, TryStreamExt};
-use monero_serai::{block::Block, transaction::Transaction};
+use monero_serai::{
+    block::Block,
+    transaction::{Input, Transaction},
+};
 use rayon::prelude::*;
 use std::ops::ControlFlow;
 use std::{collections::HashMap, sync::Arc};
@@ -17,16 +20,14 @@ use cuprate_consensus::{
 use cuprate_consensus_context::NewBlockData;
 use cuprate_helper::cast::usize_to_u64;
 use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
+use cuprate_txpool::service::interface::TxpoolWriteRequest;
 use cuprate_types::{
     blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
     AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
 };
 
-use crate::blockchain::manager::commands::IncomingBlockOk;
 use crate::{
-    blockchain::{
-        manager::commands::BlockchainManagerCommand, types::ConsensusBlockchainReadHandle,
-    },
+    blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk},
     constants::PANIC_CRITICAL_SERVICE_ERROR,
     signals::REORG_LOCK,
 };
@@ -434,6 +435,18 @@ impl super::BlockchainManager {
         &mut self,
         verified_block: VerifiedBlockInformation,
     ) {
+        // FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate.
+        let spent_key_images = verified_block
+            .txs
+            .iter()
+            .flat_map(|tx| {
+                tx.tx.prefix().inputs.iter().map(|input| match input {
+                    Input::ToKey { key_image, .. } => key_image.compress().0,
+                    Input::Gen(_) => unreachable!(),
+                })
+            })
+            .collect::<Vec<[u8; 32]>>();
+
         self.blockchain_context_service
             .ready()
             .await
@@ -472,6 +485,14 @@
         };
 
         self.cached_blockchain_context = blockchain_context.unchecked_blockchain_context().clone();
+
+        self.txpool_write_handle
+            .ready()
+            .await
+            .expect(PANIC_CRITICAL_SERVICE_ERROR)
+            .call(TxpoolWriteRequest::NewBlock { spent_key_images })
+            .await
+            .expect(PANIC_CRITICAL_SERVICE_ERROR);
     }
 }
 
diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs
index 7d6874e..913c983 100644
--- a/binaries/cuprated/src/blockchain/syncer.rs
+++ b/binaries/cuprated/src/blockchain/syncer.rs
@@ -1,11 +1,10 @@
 // FIXME: This whole module is not great and should be rewritten when the PeerSet is made.
-use std::{pin::pin, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
 
 use futures::StreamExt;
-use tokio::time::interval;
 use tokio::{
     sync::{mpsc, Notify},
-    time::sleep,
+    time::interval,
 };
 use tower::{Service, ServiceExt};
 use tracing::instrument;
diff --git a/binaries/cuprated/src/blockchain/types.rs b/binaries/cuprated/src/blockchain/types.rs
index e3ee62b..54e4662 100644
--- a/binaries/cuprated/src/blockchain/types.rs
+++ b/binaries/cuprated/src/blockchain/types.rs
@@ -1,13 +1,7 @@
-use std::task::{Context, Poll};
-
-use futures::future::BoxFuture;
-use futures::{FutureExt, TryFutureExt};
-use tower::{util::MapErr, Service};
+use tower::util::MapErr;
 
 use cuprate_blockchain::{cuprate_database::RuntimeError, service::BlockchainReadHandle};
 use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
-use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
-use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
 
 /// The [`BlockVerifierService`] with all generic types defined.
 pub type ConcreteBlockVerifierService = BlockVerifierService<
diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs
index f55d41d..cdf1cef 100644
--- a/binaries/cuprated/src/p2p.rs
+++ b/binaries/cuprated/src/p2p.rs
@@ -2,4 +2,7 @@
 //!
 //! Will handle initiating the P2P and contains a protocol request handler.
 
+mod network_address;
 pub mod request_handler;
+
+pub use network_address::CrossNetworkInternalPeerId;
diff --git a/binaries/cuprated/src/p2p/network_address.rs b/binaries/cuprated/src/p2p/network_address.rs
new file mode 100644
index 0000000..7fa8e86
--- /dev/null
+++ b/binaries/cuprated/src/p2p/network_address.rs
@@ -0,0 +1,16 @@
+use std::net::SocketAddr;
+
+use cuprate_p2p_core::{client::InternalPeerID, ClearNet, NetworkZone};
+
+/// An identifier for a P2P peer on any network.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum CrossNetworkInternalPeerId {
+    /// A clear-net peer.
+    ClearNet(InternalPeerID<<ClearNet as NetworkZone>::Addr>),
+}
+
+impl From<InternalPeerID<<ClearNet as NetworkZone>::Addr>> for CrossNetworkInternalPeerId {
+    fn from(addr: InternalPeerID<<ClearNet as NetworkZone>::Addr>) -> Self {
+        Self::ClearNet(addr)
+    }
+}
diff --git a/binaries/cuprated/src/statics.rs b/binaries/cuprated/src/statics.rs
index 8aab1c9..9839608 100644
--- a/binaries/cuprated/src/statics.rs
+++ b/binaries/cuprated/src/statics.rs
@@ -1,7 +1,7 @@
 //! Global `static`s used throughout `cuprated`.
 use std::{
-    sync::{atomic::AtomicU64, LazyLock},
+    sync::LazyLock,
     time::{SystemTime, UNIX_EPOCH},
 };
 
diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs
index a6f05e7..9592c2b 100644
--- a/binaries/cuprated/src/txpool.rs
+++ b/binaries/cuprated/src/txpool.rs
@@ -1,3 +1,15 @@
 //! Transaction Pool
 //!
-//! Will handle initiating the tx-pool, providing the preprocessor required for the dandelion pool.
+//! Handles initiating the tx-pool, providing the preprocessor required for the dandelion pool.
+use cuprate_consensus::BlockChainContextService;
+use cuprate_p2p::NetworkInterface;
+use cuprate_p2p_core::ClearNet;
+use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
+
+use crate::blockchain::ConcreteTxVerifierService;
+
+mod dandelion;
+mod incoming_tx;
+mod txs_being_handled;
+
+pub use incoming_tx::IncomingTxHandler;
diff --git a/binaries/cuprated/src/txpool/dandelion.rs b/binaries/cuprated/src/txpool/dandelion.rs
new file mode 100644
index 0000000..d791b62
--- /dev/null
+++ b/binaries/cuprated/src/txpool/dandelion.rs
@@ -0,0 +1,65 @@
+use std::time::Duration;
+
+use cuprate_dandelion_tower::{
+    pool::DandelionPoolService, DandelionConfig, DandelionRouter, Graph,
+};
+use cuprate_p2p::NetworkInterface;
+use cuprate_p2p_core::ClearNet;
+use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
+
+use crate::{
+    p2p::CrossNetworkInternalPeerId,
+    txpool::incoming_tx::{DandelionTx, TxId},
+};
+
+mod diffuse_service;
+mod stem_service;
+mod tx_store;
+
+/// The configuration used for [`cuprate_dandelion_tower`].
+///
+/// TODO: should we expose this to users of cuprated? probably not.
+const DANDELION_CONFIG: DandelionConfig = DandelionConfig {
+    time_between_hop: Duration::from_millis(175),
+    epoch_duration: Duration::from_secs(10 * 60),
+    fluff_probability: 0.12,
+    graph: Graph::FourRegular,
+};
+
+/// A [`DandelionRouter`] with all generic types defined.
+type ConcreteDandelionRouter = DandelionRouter<
+    stem_service::OutboundPeerStream,
+    diffuse_service::DiffuseService,
+    CrossNetworkInternalPeerId,
+    stem_service::StemPeerService<ClearNet>,
+    DandelionTx,
+>;
+
+/// Starts the dandelion pool manager task and returns a handle to send txs to broadcast.
+pub fn start_dandelion_pool_manager(
+    router: ConcreteDandelionRouter,
+    txpool_read_handle: TxpoolReadHandle,
+    txpool_write_handle: TxpoolWriteHandle,
+) -> DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId> {
+    cuprate_dandelion_tower::pool::start_dandelion_pool_manager(
+        // TODO: make this constant configurable?
+        32,
+        router,
+        tx_store::TxStoreService {
+            txpool_read_handle,
+            txpool_write_handle,
+        },
+        DANDELION_CONFIG,
+    )
+}
+
+/// Creates a [`DandelionRouter`] from a [`NetworkInterface`].
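+///
+/// Fluff/diffuse txs are broadcast through [`diffuse_service::DiffuseService`];
+/// stem txs are sent to a single outbound peer chosen by [`stem_service::OutboundPeerStream`].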
+pub fn dandelion_router(clear_net: NetworkInterface<ClearNet>) -> ConcreteDandelionRouter {
+    DandelionRouter::new(
+        diffuse_service::DiffuseService {
+            clear_net_broadcast_service: clear_net.broadcast_svc(),
+        },
+        stem_service::OutboundPeerStream { clear_net },
+        DANDELION_CONFIG,
+    )
+}
diff --git a/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
new file mode 100644
index 0000000..621503f
--- /dev/null
+++ b/binaries/cuprated/src/txpool/dandelion/diffuse_service.rs
@@ -0,0 +1,44 @@
+use std::{
+    future::{ready, Ready},
+    task::{Context, Poll},
+};
+
+use futures::FutureExt;
+use tower::Service;
+
+use cuprate_dandelion_tower::traits::DiffuseRequest;
+use cuprate_p2p::{BroadcastRequest, BroadcastSvc};
+use cuprate_p2p_core::ClearNet;
+
+use crate::txpool::dandelion::DandelionTx;
+
+/// The dandelion diffusion service.
+pub struct DiffuseService {
+    pub clear_net_broadcast_service: BroadcastSvc<ClearNet>,
+}
+
+impl Service<DiffuseRequest<DandelionTx>> for DiffuseService {
+    type Response = ();
+    type Error = tower::BoxError;
+    type Future = Ready<Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.clear_net_broadcast_service
+            .poll_ready(cx)
+            .map_err(Into::into)
+    }
+
+    fn call(&mut self, req: DiffuseRequest<DandelionTx>) -> Self::Future {
+        // TODO: the dandelion crate should pass along where we got the tx from.
+        let Ok(()) = self
+            .clear_net_broadcast_service
+            .call(BroadcastRequest::Transaction {
+                tx_bytes: req.0 .0,
+                direction: None,
+                received_from: None,
+            })
+            .into_inner();
+
+        ready(Ok(()))
+    }
+}
diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs
new file mode 100644
index 0000000..5c0ba65
--- /dev/null
+++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs
@@ -0,0 +1,68 @@
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use bytes::Bytes;
+use futures::Stream;
+use tower::Service;
+
+use cuprate_dandelion_tower::{traits::StemRequest, OutboundPeer};
+use cuprate_p2p::{ClientPoolDropGuard, NetworkInterface};
+use cuprate_p2p_core::{
+    client::{Client, InternalPeerID},
+    ClearNet, NetworkZone, PeerRequest, ProtocolRequest,
+};
+use cuprate_wire::protocol::NewTransactions;
+
+use crate::{p2p::CrossNetworkInternalPeerId, txpool::dandelion::DandelionTx};
+
+/// The dandelion outbound peer stream.
+pub struct OutboundPeerStream {
+    pub clear_net: NetworkInterface<ClearNet>,
+}
+
+impl Stream for OutboundPeerStream {
+    type Item = Result<
+        OutboundPeer<CrossNetworkInternalPeerId, StemPeerService<ClearNet>>,
+        tower::BoxError,
+    >;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // TODO: make the outbound peer choice random.
+        Poll::Ready(Some(Ok(self
+            .clear_net
+            .client_pool()
+            .outbound_client()
+            .map_or(OutboundPeer::Exhausted, |client| {
+                OutboundPeer::Peer(
+                    CrossNetworkInternalPeerId::ClearNet(client.info.id),
+                    StemPeerService(client),
+                )
+            })))))
+    }
+}
+
+/// The stem service, used to send stem txs.
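+///
+/// This wraps a [`ClientPoolDropGuard`], so the borrowed peer is handed back to
+/// the client pool when the service is dropped.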
+pub struct StemPeerService<N>(ClientPoolDropGuard<N>);
+
+impl<N: NetworkZone> Service<StemRequest<DandelionTx>> for StemPeerService<N> {
+    type Response = <Client<N> as Service<PeerRequest>>::Response;
+    type Error = tower::BoxError;
+    type Future = <Client<N> as Service<PeerRequest>>::Future;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.0.poll_ready(cx)
+    }
+
+    fn call(&mut self, req: StemRequest<DandelionTx>) -> Self::Future {
+        self.0
+            .call(PeerRequest::Protocol(ProtocolRequest::NewTransactions(
+                NewTransactions {
+                    txs: vec![req.0 .0],
+                    dandelionpp_fluff: false,
+                    padding: Bytes::new(),
+                },
+            )))
+    }
+}
diff --git a/binaries/cuprated/src/txpool/dandelion/tx_store.rs b/binaries/cuprated/src/txpool/dandelion/tx_store.rs
new file mode 100644
index 0000000..b890ffd
--- /dev/null
+++ b/binaries/cuprated/src/txpool/dandelion/tx_store.rs
@@ -0,0 +1,74 @@
+use std::task::{Context, Poll};
+
+use bytes::Bytes;
+use futures::{future::BoxFuture, FutureExt};
+use tower::{Service, ServiceExt};
+
+use cuprate_dandelion_tower::{
+    traits::{TxStoreRequest, TxStoreResponse},
+    State,
+};
+use cuprate_database::RuntimeError;
+use cuprate_txpool::service::{
+    interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest},
+    TxpoolReadHandle, TxpoolWriteHandle,
+};
+
+use super::{DandelionTx, TxId};
+
+/// The dandelion tx-store service.
+///
+/// This is just mapping the interface [`cuprate_dandelion_tower`] wants to what [`cuprate_txpool`] provides.
+pub struct TxStoreService {
+    pub txpool_read_handle: TxpoolReadHandle,
+    pub txpool_write_handle: TxpoolWriteHandle,
+}
+
+impl Service<TxStoreRequest<TxId>> for TxStoreService {
+    type Response = TxStoreResponse<DandelionTx>;
+    type Error = tower::BoxError;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: TxStoreRequest<TxId>) -> Self::Future {
+        match req {
+            TxStoreRequest::Get(tx_id) => self
+                .txpool_read_handle
+                .clone()
+                .oneshot(TxpoolReadRequest::TxBlob(tx_id))
+                .map(|res| match res {
+                    Ok(TxpoolReadResponse::TxBlob {
+                        tx_blob,
+                        state_stem,
+                    }) => {
+                        let state = if state_stem {
+                            State::Stem
+                        } else {
+                            State::Fluff
+                        };
+
+                        Ok(TxStoreResponse::Transaction(Some((
+                            DandelionTx(Bytes::from(tx_blob)),
+                            state,
+                        ))))
+                    }
+                    Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Transaction(None)),
+                    Err(e) => Err(e.into()),
+                    Ok(_) => unreachable!(),
+                })
+                .boxed(),
+            TxStoreRequest::Promote(tx_id) => self
+                .txpool_write_handle
+                .clone()
+                .oneshot(TxpoolWriteRequest::Promote(tx_id))
+                .map(|res| match res {
+                    Ok(_) | Err(RuntimeError::KeyNotFound) => Ok(TxStoreResponse::Ok),
+                    Err(e) => Err(e.into()),
+                })
+                .boxed(),
+        }
+    }
+}
diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs
new file mode 100644
index 0000000..e204159
--- /dev/null
+++ b/binaries/cuprated/src/txpool/incoming_tx.rs
@@ -0,0 +1,379 @@
+use std::{
+    collections::HashSet,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use bytes::Bytes;
+use futures::{future::BoxFuture, FutureExt};
+use monero_serai::transaction::Transaction;
+use tower::{Service, ServiceExt};
+
+use cuprate_consensus::{
+    transactions::new_tx_verification_data, BlockChainContextRequest, BlockChainContextResponse,
+    BlockChainContextService, ExtendedConsensusError, VerifyTxRequest,
+};
+use cuprate_dandelion_tower::{
+    pool::{DandelionPoolService, IncomingTxBuilder},
+    State, TxState,
+};
+use cuprate_helper::asynch::rayon_spawn_async;
+use cuprate_p2p::NetworkInterface;
+use cuprate_p2p_core::ClearNet;
+use cuprate_txpool::{
+    service::{
+        interface::{
+            TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse,
+        },
+        TxpoolReadHandle, TxpoolWriteHandle,
+    },
+    transaction_blob_hash,
+};
+use cuprate_types::TransactionVerificationData;
+
+use crate::{
+    blockchain::ConcreteTxVerifierService,
+    constants::PANIC_CRITICAL_SERVICE_ERROR,
+    p2p::CrossNetworkInternalPeerId,
+    signals::REORG_LOCK,
+    txpool::{
+        dandelion,
+        txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally},
+    },
+};
+
+/// An error that can happen handling an incoming tx.
+pub enum IncomingTxError {
+    Parse(std::io::Error),
+    Consensus(ExtendedConsensusError),
+    DuplicateTransaction,
+}
+
+/// Incoming transactions.
+pub struct IncomingTxs {
+    /// The raw bytes of the transactions.
+    pub txs: Vec<Bytes>,
+    /// The routing state of the transactions.
+    pub state: TxState<CrossNetworkInternalPeerId>,
+}
+
+/// The transaction type used for dandelion++.
+#[derive(Clone)]
+pub struct DandelionTx(pub Bytes);
+
+/// A transaction ID/hash.
+pub(super) type TxId = [u8; 32];
+
+/// The service that handles incoming transaction pool transactions.
+///
+/// This service handles everything, including verifying the tx, adding it to the pool and routing it to other nodes.
+pub struct IncomingTxHandler {
+    /// A store of txs currently being handled in incoming tx requests.
+    pub(super) txs_being_handled: TxsBeingHandled,
+    /// The blockchain context cache.
+    pub(super) blockchain_context_cache: BlockChainContextService,
+    /// The dandelion txpool manager.
+    pub(super) dandelion_pool_manager:
+        DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
+    /// The transaction verifier service.
+    pub(super) tx_verifier_service: ConcreteTxVerifierService,
+    /// The txpool write handle.
+    pub(super) txpool_write_handle: TxpoolWriteHandle,
+    /// The txpool read handle.
+    pub(super) txpool_read_handle: TxpoolReadHandle,
+}
+
+impl IncomingTxHandler {
+    /// Initialize the [`IncomingTxHandler`].
+    #[expect(clippy::significant_drop_tightening)]
+    pub fn init(
+        clear_net: NetworkInterface<ClearNet>,
+        txpool_write_handle: TxpoolWriteHandle,
+        txpool_read_handle: TxpoolReadHandle,
+        blockchain_context_cache: BlockChainContextService,
+        tx_verifier_service: ConcreteTxVerifierService,
+    ) -> Self {
+        let dandelion_router = dandelion::dandelion_router(clear_net);
+
+        let dandelion_pool_manager = dandelion::start_dandelion_pool_manager(
+            dandelion_router,
+            txpool_read_handle.clone(),
+            txpool_write_handle.clone(),
+        );
+
+        Self {
+            txs_being_handled: TxsBeingHandled::new(),
+            blockchain_context_cache,
+            dandelion_pool_manager,
+            tx_verifier_service,
+            txpool_write_handle,
+            txpool_read_handle,
+        }
+    }
+}
+
+impl Service<IncomingTxs> for IncomingTxHandler {
+    type Response = ();
+    type Error = IncomingTxError;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, req: IncomingTxs) -> Self::Future {
+        handle_incoming_txs(
+            req,
+            self.txs_being_handled.clone(),
+            self.blockchain_context_cache.clone(),
+            self.tx_verifier_service.clone(),
+            self.txpool_write_handle.clone(),
+            self.txpool_read_handle.clone(),
+            self.dandelion_pool_manager.clone(),
+        )
+        .boxed()
+    }
+}
+
+/// Handles the incoming txs.
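+///
+/// This filters out txs we already have, verifies the rest against the current chain
+/// state, adds them to the pool and hands them to the dandelion pool manager for routing.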
+async fn handle_incoming_txs(
+    IncomingTxs { txs, state }: IncomingTxs,
+    txs_being_handled: TxsBeingHandled,
+    mut blockchain_context_cache: BlockChainContextService,
+    mut tx_verifier_service: ConcreteTxVerifierService,
+    mut txpool_write_handle: TxpoolWriteHandle,
+    mut txpool_read_handle: TxpoolReadHandle,
+    mut dandelion_pool_manager: DandelionPoolService<DandelionTx, TxId, CrossNetworkInternalPeerId>,
+) -> Result<(), IncomingTxError> {
+    let _reorg_guard = REORG_LOCK.read().await;
+
+    let (txs, stem_pool_txs, txs_being_handled_guard) =
+        prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?;
+
+    let BlockChainContextResponse::Context(context) = blockchain_context_cache
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(BlockChainContextRequest::Context)
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+    else {
+        unreachable!()
+    };
+
+    let context = context.unchecked_blockchain_context();
+
+    tx_verifier_service
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(VerifyTxRequest::Prepped {
+            txs: txs.clone(),
+            current_chain_height: context.chain_height,
+            top_hash: context.top_hash,
+            time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
+            hf: context.current_hf,
+        })
+        .await
+        .map_err(IncomingTxError::Consensus)?;
+
+    for tx in txs {
+        handle_valid_tx(
+            tx,
+            state.clone(),
+            &mut txpool_write_handle,
+            &mut dandelion_pool_manager,
+        )
+        .await;
+    }
+
+    // Re-relay any txs we got in the message that were already in our stem pool.
+    for stem_tx in stem_pool_txs {
+        rerelay_stem_tx(
+            &stem_tx,
+            state.clone(),
+            &mut txpool_read_handle,
+            &mut dandelion_pool_manager,
+        )
+        .await;
+    }
+
+    Ok(())
+}
+
+/// Prepares the incoming transactions for verification.
+///
+/// This will filter out all transactions already in the pool or txs already being handled in another request.
+///
+/// Returns in order:
+/// - The [`TransactionVerificationData`] for all the txs we did not already have
+/// - The IDs of the transactions in the incoming message that are in our stem-pool
+/// - A [`TxsBeingHandledLocally`] guard that prevents verifying the same tx at the same time across two tasks
+async fn prepare_incoming_txs(
+    tx_blobs: Vec<Bytes>,
+    txs_being_handled: TxsBeingHandled,
+    txpool_read_handle: &mut TxpoolReadHandle,
+) -> Result<
+    (
+        Vec<Arc<TransactionVerificationData>>,
+        Vec<TxId>,
+        TxsBeingHandledLocally,
+    ),
+    IncomingTxError,
+> {
+    let mut tx_blob_hashes = HashSet::new();
+    let mut txs_being_handled_locally = txs_being_handled.local_tracker();
+
+    // Compute the blob hash for each tx and filter out the txs currently being handled by another incoming tx batch.
+    let txs = tx_blobs
+        .into_iter()
+        .filter_map(|tx_blob| {
+            let tx_blob_hash = transaction_blob_hash(&tx_blob);
+
+            // If a duplicate is in here the incoming tx batch contained the same tx twice.
+            if !tx_blob_hashes.insert(tx_blob_hash) {
+                return Some(Err(IncomingTxError::DuplicateTransaction));
+            }
+
+            // If a duplicate is here it is being handled in another batch.
+            if !txs_being_handled_locally.try_add_tx(tx_blob_hash) {
+                return None;
+            }
+
+            Some(Ok((tx_blob_hash, tx_blob)))
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    // Filter the txs already in the txpool out.
+    // This will leave the txs already in the pool in [`TxsBeingHandledLocally`] but that shouldn't be an issue.
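+    // The read request below splits the hashes into ones we have never seen, which
+    // continue to verification, and ones already in our stem pool, which the caller
+    // re-relays.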
+    let TxpoolReadResponse::FilterKnownTxBlobHashes {
+        unknown_blob_hashes,
+        stem_pool_hashes,
+    } = txpool_read_handle
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes))
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+    else {
+        unreachable!()
+    };
+
+    // Now prepare the txs for verification.
+    rayon_spawn_async(move || {
+        let txs = txs
+            .into_iter()
+            .filter_map(|(tx_blob_hash, tx_blob)| {
+                if unknown_blob_hashes.contains(&tx_blob_hash) {
+                    Some(tx_blob)
+                } else {
+                    None
+                }
+            })
+            .map(|bytes| {
+                let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?;
+
+                let tx = new_tx_verification_data(tx)
+                    .map_err(|e| IncomingTxError::Consensus(e.into()))?;
+
+                Ok(Arc::new(tx))
+            })
+            .collect::<Result<Vec<_>, IncomingTxError>>()?;
+
+        Ok((txs, stem_pool_hashes, txs_being_handled_locally))
+    })
+    .await
+}
+
+/// Handle a verified tx.
+///
+/// This will add the tx to the txpool and route it to the network.
+async fn handle_valid_tx(
+    tx: Arc<TransactionVerificationData>,
+    state: TxState<CrossNetworkInternalPeerId>,
+    txpool_write_handle: &mut TxpoolWriteHandle,
+    dandelion_pool_manager: &mut DandelionPoolService<
+        DandelionTx,
+        TxId,
+        CrossNetworkInternalPeerId,
+    >,
+) {
+    let incoming_tx =
+        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx.tx_blob)), tx.tx_hash);
+
+    let TxpoolWriteResponse::AddTransaction(double_spend) = txpool_write_handle
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(TxpoolWriteRequest::AddTransaction {
+            tx,
+            state_stem: state.is_stem_stage(),
+        })
+        .await
+        .expect("TODO")
+    else {
+        unreachable!()
+    };
+
+    // TODO: track double spends to quickly ignore them from their blob hash.
+    if let Some(tx_hash) = double_spend {
+        return;
+    };
+
+    // TODO: There is a race condition possible if a tx and block come in at the same time: .
+
+    let incoming_tx = incoming_tx
+        .with_routing_state(state)
+        .with_state_in_db(None)
+        .build()
+        .unwrap();
+
+    dandelion_pool_manager
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(incoming_tx)
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR);
+}
+
+/// Re-relay a tx that was already in our stem pool.
+async fn rerelay_stem_tx(
+    tx_hash: &TxId,
+    state: TxState<CrossNetworkInternalPeerId>,
+    txpool_read_handle: &mut TxpoolReadHandle,
+    dandelion_pool_manager: &mut DandelionPoolService<
+        DandelionTx,
+        TxId,
+        CrossNetworkInternalPeerId,
+    >,
+) {
+    let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(TxpoolReadRequest::TxBlob(*tx_hash))
+        .await
+    else {
+        // The tx could have been dropped from the pool.
+        return;
+    };
+
+    let incoming_tx =
+        IncomingTxBuilder::new(DandelionTx(Bytes::copy_from_slice(&tx_blob)), *tx_hash);
+
+    let incoming_tx = incoming_tx
+        .with_routing_state(state)
+        .with_state_in_db(Some(State::Stem))
+        .build()
+        .unwrap();
+
+    dandelion_pool_manager
+        .ready()
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR)
+        .call(incoming_tx)
+        .await
+        .expect(PANIC_CRITICAL_SERVICE_ERROR);
+}
diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs
new file mode 100644
index 0000000..122b8ac
--- /dev/null
+++ b/binaries/cuprated/src/txpool/txs_being_handled.rs
@@ -0,0 +1,53 @@
+use std::sync::Arc;
+
+use dashmap::DashSet;
+
+/// A set of txs currently being handled, shared between instances of the incoming tx handler.
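+///
+/// Clones are cheap and all share the same underlying [`DashSet`] through an [`Arc`].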
diff --git a/binaries/cuprated/src/txpool/txs_being_handled.rs b/binaries/cuprated/src/txpool/txs_being_handled.rs
new file mode 100644
index 0000000..122b8ac
--- /dev/null
+++ b/binaries/cuprated/src/txpool/txs_being_handled.rs
@@ -0,0 +1,53 @@
+use std::sync::Arc;
+
+use dashmap::DashSet;
+
+/// A set of txs currently being handled, shared between instances of the incoming tx handler.
+#[derive(Clone)]
+pub struct TxsBeingHandled(Arc<DashSet<[u8; 32]>>);
+
+impl TxsBeingHandled {
+    /// Create a new [`TxsBeingHandled`].
+    pub fn new() -> Self {
+        Self(Arc::new(DashSet::new()))
+    }
+
+    /// Create a new [`TxsBeingHandledLocally`] that will keep track of txs being handled in a request.
+    pub fn local_tracker(&self) -> TxsBeingHandledLocally {
+        TxsBeingHandledLocally {
+            txs_being_handled: self.clone(),
+            txs: vec![],
+        }
+    }
+}
+
+/// A tracker of txs being handled in a single request. This will add the txs to the global [`TxsBeingHandled`]
+/// tracker as well.
+///
+/// When this is dropped the txs will be removed from [`TxsBeingHandled`].
+pub struct TxsBeingHandledLocally {
+    txs_being_handled: TxsBeingHandled,
+    txs: Vec<[u8; 32]>,
+}
+
+impl TxsBeingHandledLocally {
+    /// Try to add a tx to the set from its [`transaction_blob_hash`](cuprate_txpool::transaction_blob_hash).
+    ///
+    /// Returns `true` if the tx was added and `false` if another task is already handling this tx.
+    pub fn try_add_tx(&mut self, tx_blob_hash: [u8; 32]) -> bool {
+        if !self.txs_being_handled.0.insert(tx_blob_hash) {
+            return false;
+        }
+
+        self.txs.push(tx_blob_hash);
+        true
+    }
+}
+
+impl Drop for TxsBeingHandledLocally {
+    fn drop(&mut self) {
+        for hash in &self.txs {
+            self.txs_being_handled.0.remove(hash);
+        }
+    }
+}
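How this guard behaves across two concurrent batches, as a usage sketch (it assumes the two types defined above are in scope; this code is not part of the patch):

    fn main() {
        let global = TxsBeingHandled::new();

        let mut batch_a = global.local_tracker();
        let mut batch_b = global.local_tracker();

        let hash = [1_u8; 32];
        assert!(batch_a.try_add_tx(hash)); // batch A claims the tx.
        assert!(!batch_b.try_add_tx(hash)); // batch B must skip it.

        // Dropping A's tracker releases its hashes...
        drop(batch_a);
        assert!(batch_b.try_add_tx(hash)); // ...so B may now claim it.
    }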
diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs
index 88702be..7ca0598 100644
--- a/p2p/dandelion-tower/src/router.rs
+++ b/p2p/dandelion-tower/src/router.rs
@@ -73,6 +73,15 @@ pub enum TxState<Id> {
     Local,
 }
 
+impl<Id> TxState<Id> {
+    /// Returns `true` if the tx is in the stem stage.
+    ///
+    /// [`TxState::Local`] & [`TxState::Stem`] are the two stem-stage states.
+    pub const fn is_stem_stage(&self) -> bool {
+        matches!(self, Self::Local | Self::Stem { .. })
+    }
+}
+
 /// A request to route a transaction.
 pub struct DandelionRouteReq<Tx, Id> {
     /// The transaction.
diff --git a/p2p/p2p/src/client_pool.rs b/p2p/p2p/src/client_pool.rs
index fc97fc1..67c8f11 100644
--- a/p2p/p2p/src/client_pool.rs
+++ b/p2p/p2p/src/client_pool.rs
@@ -18,13 +18,13 @@ use tracing::{Instrument, Span};
 use cuprate_p2p_core::{
     client::{Client, InternalPeerID},
     handles::ConnectionHandle,
-    NetworkZone,
+    ConnectionDirection, NetworkZone,
 };
 
 pub(crate) mod disconnect_monitor;
 mod drop_guard_client;
 
-pub(crate) use drop_guard_client::ClientPoolDropGuard;
+pub use drop_guard_client::ClientPoolDropGuard;
 
 /// The client pool, which holds currently connected free peers.
 ///
@@ -165,6 +165,17 @@ impl<N: NetworkZone> ClientPool<N> {
             sync_data.cumulative_difficulty() > cumulative_difficulty
         })
     }
+
+    /// Returns the first outbound peer when iterating over the peers.
+    pub fn outbound_client(self: &Arc<Self>) -> Option<ClientPoolDropGuard<N>> {
+        let client = self
+            .clients
+            .iter()
+            .find(|element| element.value().info.direction == ConnectionDirection::Outbound)?;
+        let id = *client.key();
+
+        Some(self.borrow_client(&id).unwrap())
+    }
 }
 
 mod sealed {
diff --git a/p2p/p2p/src/lib.rs b/p2p/p2p/src/lib.rs
index b3577a7..541784c 100644
--- a/p2p/p2p/src/lib.rs
+++ b/p2p/p2p/src/lib.rs
@@ -18,7 +18,7 @@ use cuprate_p2p_core::{
 
 pub mod block_downloader;
 mod broadcast;
-mod client_pool;
+pub mod client_pool;
 pub mod config;
 pub mod connection_maintainer;
 pub mod constants;
@@ -26,6 +26,7 @@ mod inbound_server;
 
 use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
 pub use broadcast::{BroadcastRequest, BroadcastSvc};
+pub use client_pool::{ClientPool, ClientPoolDropGuard};
 pub use config::{AddressBookConfig, P2PConfig};
 use connection_maintainer::MakeConnectionRequest;
 
@@ -82,7 +83,7 @@ where
 
     let outbound_handshaker = outbound_handshaker_builder.build();
 
-    let client_pool = client_pool::ClientPool::new();
+    let client_pool = ClientPool::new();
 
     let (make_connection_tx, make_connection_rx) = mpsc::channel(3);
 
@@ -132,7 +133,7 @@ where
 #[derive(Clone)]
 pub struct NetworkInterface<N: NetworkZone> {
     /// A pool of free connected peers.
-    pool: Arc<client_pool::ClientPool<N>>,
+    pool: Arc<ClientPool<N>>,
     /// A [`Service`] that allows broadcasting to all connected peers.
     broadcast_svc: BroadcastSvc<N>,
     /// A channel to request extra connections.
@@ -173,7 +174,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
     }
 
     /// Borrows the `ClientPool`, for access to connected peers.
-    pub const fn client_pool(&self) -> &Arc<client_pool::ClientPool<N>> {
+    pub const fn client_pool(&self) -> &Arc<ClientPool<N>> {
         &self.pool
     }
 }
diff --git a/storage/service/src/service/write.rs b/storage/service/src/service/write.rs
index f75d615..607c4aa 100644
--- a/storage/service/src/service/write.rs
+++ b/storage/service/src/service/write.rs
@@ -30,6 +30,14 @@ pub struct DatabaseWriteHandle<Req, Res> {
         crossbeam::channel::Sender<(Req, oneshot::Sender<Result<Res, RuntimeError>>)>,
 }
 
+impl<Req, Res> Clone for DatabaseWriteHandle<Req, Res> {
+    fn clone(&self) -> Self {
+        Self {
+            sender: self.sender.clone(),
+        }
+    }
+}
+
 impl<Req, Res> DatabaseWriteHandle<Req, Res>
 where
     Req: Send + 'static,
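The hand-written `Clone` above avoids the bound that `#[derive(Clone)]` would add (`Req: Clone`), since only the channel sender is actually cloned. A self-contained illustration with toy types (not the real handle):

    use std::sync::mpsc::Sender;

    struct Handle<Req> {
        sender: Sender<Req>,
    }

    // `derive(Clone)` would require `Req: Clone`; the manual impl does not,
    // because `Sender<T>` is `Clone` for any `T`.
    impl<Req> Clone for Handle<Req> {
        fn clone(&self) -> Self {
            Self {
                sender: self.sender.clone(),
            }
        }
    }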
diff --git a/storage/txpool/Cargo.toml b/storage/txpool/Cargo.toml
index b9d4218..c301166 100644
--- a/storage/txpool/Cargo.toml
+++ b/storage/txpool/Cargo.toml
@@ -29,6 +29,7 @@ bytemuck = { workspace = true, features = ["must_cast", "derive"
 bitflags = { workspace = true, features = ["std", "serde", "bytemuck"] }
 thiserror = { workspace = true }
 hex = { workspace = true }
+blake3 = { workspace = true, features = ["std"] }
 
 tower = { workspace = true, optional = true }
 rayon = { workspace = true, optional = true }
diff --git a/storage/txpool/src/free.rs b/storage/txpool/src/free.rs
index d394002..d0f9a31 100644
--- a/storage/txpool/src/free.rs
+++ b/storage/txpool/src/free.rs
@@ -3,7 +3,7 @@
 //---------------------------------------------------------------------------------------------------- Import
 use cuprate_database::{ConcreteEnv, Env, EnvInner, InitError, RuntimeError, TxRw};
 
-use crate::{config::Config, tables::OpenTables};
+use crate::{config::Config, tables::OpenTables, types::TransactionBlobHash};
 
 //---------------------------------------------------------------------------------------------------- Free functions
 /// Open the txpool database using the passed [`Config`].
@@ -60,3 +60,13 @@ pub fn open(config: Config) -> Result<ConcreteEnv, InitError> {
 
     Ok(env)
 }
+
+/// Calculate the transaction blob hash.
+///
+/// This value is supposed to be quick to compute based on just the tx-blob, without needing to parse the tx.
+///
+/// The exact way the hash is calculated is not stable and is subject to change; as such, it should not be exposed
+/// as a way to interact with Cuprate externally.
+pub fn transaction_blob_hash(tx_blob: &[u8]) -> TransactionBlobHash {
+    blake3::hash(tx_blob).into()
+}
diff --git a/storage/txpool/src/lib.rs b/storage/txpool/src/lib.rs
index 5fb3b14..8a57c72 100644
--- a/storage/txpool/src/lib.rs
+++ b/storage/txpool/src/lib.rs
@@ -14,7 +14,7 @@ mod tx;
 pub mod types;
 
 pub use config::Config;
-pub use free::open;
+pub use free::{open, transaction_blob_hash};
 pub use tx::TxEntry;
 
 //re-exports
diff --git a/storage/txpool/src/ops.rs b/storage/txpool/src/ops.rs
index 50d9ea4..289a8bb 100644
--- a/storage/txpool/src/ops.rs
+++ b/storage/txpool/src/ops.rs
@@ -85,7 +85,7 @@ mod key_images;
 mod tx_read;
 mod tx_write;
 
-pub use tx_read::get_transaction_verification_data;
+pub use tx_read::{get_transaction_verification_data, in_stem_pool};
 pub use tx_write::{add_transaction, remove_transaction};
 
 /// An error that can occur on some tx-write ops.
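With the re-export in place, downstream crates can derive the dedup key directly from raw bytes. A usage sketch (the blob literal is arbitrary placeholder data, not a real transaction):

    use cuprate_txpool::transaction_blob_hash;

    fn main() {
        let tx_blob: Vec<u8> = vec![0x01, 0x02, 0x03];

        // A cheap identity for dedup: blake3 over the raw bytes, no parsing.
        let blob_hash: [u8; 32] = transaction_blob_hash(&tx_blob);

        // Deterministic: the same bytes always map to the same hash.
        assert_eq!(blob_hash, transaction_blob_hash(&tx_blob));
    }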
diff --git a/storage/txpool/src/ops/tx_read.rs b/storage/txpool/src/ops/tx_read.rs
index db89415..5569075 100644
--- a/storage/txpool/src/ops/tx_read.rs
+++ b/storage/txpool/src/ops/tx_read.rs
@@ -8,7 +8,10 @@ use monero_serai::transaction::Transaction;
 use cuprate_database::{DatabaseRo, RuntimeError};
 use cuprate_types::{TransactionVerificationData, TxVersion};
 
-use crate::{tables::Tables, types::TransactionHash};
+use crate::{
+    tables::{Tables, TransactionInfos},
+    types::{TransactionHash, TxStateFlags},
+};
 
 /// Gets the [`TransactionVerificationData`] of a transaction in the tx-pool, leaving the tx in the pool.
 pub fn get_transaction_verification_data(
@@ -34,3 +37,17 @@ pub fn get_transaction_verification_data(
         cached_verification_state: Mutex::new(cached_verification_state),
     })
 }
+
+/// Returns `true` if the transaction with the given hash is in the stem pool.
+///
+/// # Errors
+/// This will return an [`Err`] if the transaction is not in the pool.
+pub fn in_stem_pool(
+    tx_hash: &TransactionHash,
+    tx_infos: &impl DatabaseRo<TransactionInfos>,
+) -> Result<bool, RuntimeError> {
+    Ok(tx_infos
+        .get(tx_hash)?
+        .flags
+        .contains(TxStateFlags::STATE_STEM))
+}
diff --git a/storage/txpool/src/ops/tx_write.rs b/storage/txpool/src/ops/tx_write.rs
index 9885b9c..dc5ab46 100644
--- a/storage/txpool/src/ops/tx_write.rs
+++ b/storage/txpool/src/ops/tx_write.rs
@@ -8,6 +8,7 @@ use cuprate_database::{DatabaseRw, RuntimeError, StorableVec};
 use cuprate_types::TransactionVerificationData;
 
 use crate::{
+    free::transaction_blob_hash,
     ops::{
         key_images::{add_tx_key_images, remove_tx_key_images},
         TxPoolWriteError,
@@ -56,6 +57,12 @@ pub fn add_transaction(
     let kis_table = tables.spent_key_images_mut();
     add_tx_key_images(&tx.tx.prefix().inputs, &tx.tx_hash, kis_table)?;
 
+    // Add the blob hash to table 4.
+    let blob_hash = transaction_blob_hash(&tx.tx_blob);
+    tables
+        .known_blob_hashes_mut()
+        .put(&blob_hash, &tx.tx_hash)?;
+
     Ok(())
 }
 
@@ -79,5 +86,9 @@ pub fn remove_transaction(
     let kis_table = tables.spent_key_images_mut();
     remove_tx_key_images(&tx.prefix().inputs, kis_table)?;
 
+    // Remove the blob hash from table 4.
+    let blob_hash = transaction_blob_hash(&tx_blob);
+    tables.known_blob_hashes_mut().delete(&blob_hash)?;
+
     Ok(())
 }
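`in_stem_pool` and the `promote` write handler further down both hinge on a single bit in the tx's flags. A reduced model of that check (the real `TxStateFlags` lives in `types.rs`; the width and bit value here are assumptions, not the actual layout):

    bitflags::bitflags! {
        /// Stand-in for `cuprate_txpool::types::TxStateFlags`.
        #[derive(Copy, Clone)]
        struct TxStateFlags: u8 {
            const STATE_STEM = 1 << 0;
        }
    }

    fn main() {
        let mut flags = TxStateFlags::STATE_STEM;
        assert!(flags.contains(TxStateFlags::STATE_STEM)); // `in_stem_pool` == true

        // What `promote` does: clear the bit, moving the tx to the fluff pool.
        flags.remove(TxStateFlags::STATE_STEM);
        assert!(!flags.contains(TxStateFlags::STATE_STEM));
    }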
diff --git a/storage/txpool/src/service/interface.rs b/storage/txpool/src/service/interface.rs
index 88dd02e..5cd518f 100644
--- a/storage/txpool/src/service/interface.rs
+++ b/storage/txpool/src/service/interface.rs
@@ -1,21 +1,36 @@
 //! Tx-pool [`service`](super) interface.
 //!
 //! This module contains `cuprate_txpool`'s [`tower::Service`] request and response enums.
-use std::sync::Arc;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use cuprate_types::TransactionVerificationData;
 
-use crate::{tx::TxEntry, types::TransactionHash};
+use crate::{
+    tx::TxEntry,
+    types::{KeyImage, TransactionBlobHash, TransactionHash},
+};
 
 //---------------------------------------------------------------------------------------------------- TxpoolReadRequest
 /// The transaction pool [`tower::Service`] read request type.
+#[derive(Clone)]
 pub enum TxpoolReadRequest {
-    /// A request for the blob (raw bytes) of a transaction with the given hash.
+    /// Get the blob (raw bytes) of a transaction with the given hash.
     TxBlob(TransactionHash),
 
-    /// A request for the [`TransactionVerificationData`] of a transaction in the tx pool.
+    /// Get the [`TransactionVerificationData`] of a transaction in the tx pool.
     TxVerificationData(TransactionHash),
 
+    /// Filter (remove) all **known** transactions from the set.
+    ///
+    /// The hash is **not** the transaction hash; it is the hash of the serialized tx-blob.
+    FilterKnownTxBlobHashes(HashSet<TransactionBlobHash>),
+
+    /// Get some transactions for an incoming block.
+    TxsForBlock(Vec<TransactionHash>),
+
     /// Get information on all transactions in the pool.
     Backlog,
 
@@ -27,15 +42,28 @@
 /// The transaction pool [`tower::Service`] read response type.
 #[expect(clippy::large_enum_variant)]
 pub enum TxpoolReadResponse {
-    /// Response to [`TxpoolReadRequest::TxBlob`].
-    ///
-    /// The inner value is the raw bytes of a transaction.
-    // TODO: use bytes::Bytes.
-    TxBlob(Vec<u8>),
+    /// The response for [`TxpoolReadRequest::TxBlob`].
+    TxBlob { tx_blob: Vec<u8>, state_stem: bool },
 
-    /// Response to [`TxpoolReadRequest::TxVerificationData`].
+    /// The response for [`TxpoolReadRequest::TxVerificationData`].
     TxVerificationData(TransactionVerificationData),
 
+    /// The response for [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
+    FilterKnownTxBlobHashes {
+        /// The blob hashes that are unknown.
+        unknown_blob_hashes: HashSet<TransactionBlobHash>,
+        /// The tx hashes of the blob hashes that were known but were in the stem pool.
+        stem_pool_hashes: Vec<TransactionHash>,
+    },
+
+    /// The response for [`TxpoolReadRequest::TxsForBlock`].
+    TxsForBlock {
+        /// The txs we had in the txpool.
+        txs: HashMap<[u8; 32], TransactionVerificationData>,
+        /// The indexes of the missing txs.
+        missing: Vec<usize>,
+    },
+
     /// Response to [`TxpoolReadRequest::Backlog`].
     ///
     /// The inner `Vec` contains information on all
@@ -66,9 +94,17 @@ pub enum TxpoolWriteRequest {
     },
 
     /// Remove a transaction with the given hash from the pool.
-    ///
-    /// Returns [`TxpoolWriteResponse::Ok`].
     RemoveTransaction(TransactionHash),
+
+    /// Promote a transaction from the stem pool to the fluff pool.
+    /// If the tx is already in the fluff pool this does nothing.
+    Promote(TransactionHash),
+
+    /// Tell the tx-pool about a new block.
+    NewBlock {
+        /// The spent key images in the new block.
+        spent_key_images: Vec<KeyImage>,
+    },
 }
 
 //---------------------------------------------------------------------------------------------------- TxpoolWriteResponse
@@ -77,6 +113,8 @@ pub enum TxpoolWriteRequest {
 pub enum TxpoolWriteResponse {
     /// Response to:
     /// - [`TxpoolWriteRequest::RemoveTransaction`]
+    /// - [`TxpoolWriteRequest::Promote`]
+    /// - [`TxpoolWriteRequest::NewBlock`]
     Ok,
 
     /// Response to [`TxpoolWriteRequest::AddTransaction`].
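Callers destructure these one-of responses with `let ... else`. A hedged sketch of the calling side (`read_tx_blob` is a hypothetical helper; the real call sites live in `cuprated` and use their own error handling):

    use tower::{Service, ServiceExt};

    /// Fetch a tx blob plus its stem/fluff state from any service speaking
    /// the txpool read protocol.
    async fn read_tx_blob<S>(handle: &mut S, tx_hash: [u8; 32]) -> Option<(Vec<u8>, bool)>
    where
        S: Service<TxpoolReadRequest, Response = TxpoolReadResponse>,
    {
        let response = handle
            .ready()
            .await
            .ok()?
            .call(TxpoolReadRequest::TxBlob(tx_hash))
            .await
            .ok()?;

        // Each request maps to exactly one response variant, so any other
        // variant here would be a bug in the service, not a runtime condition.
        let TxpoolReadResponse::TxBlob { tx_blob, state_stem } = response else {
            unreachable!()
        };

        Some((tx_blob, state_stem))
    }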
diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs
index 3135322..257fe8e 100644
--- a/storage/txpool/src/service/read.rs
+++ b/storage/txpool/src/service/read.rs
@@ -4,22 +4,24 @@
     clippy::unnecessary_wraps,
     reason = "TODO: finish implementing the signatures from "
 )]
-
-use std::sync::Arc;
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use rayon::ThreadPool;
 
-use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner};
+use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError};
 use cuprate_database_service::{init_thread_pool, DatabaseReadService, ReaderThreads};
 
 use crate::{
-    ops::get_transaction_verification_data,
+    ops::{get_transaction_verification_data, in_stem_pool},
     service::{
         interface::{TxpoolReadRequest, TxpoolReadResponse},
         types::{ReadResponseResult, TxpoolReadHandle},
     },
-    tables::{OpenTables, TransactionBlobs},
-    types::TransactionHash,
+    tables::{KnownBlobHashes, OpenTables, TransactionBlobs, TransactionInfos},
+    types::{TransactionBlobHash, TransactionHash},
 };
 
 // TODO: update the docs here
@@ -57,7 +59,6 @@ fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) ->
 /// 1. `Request` is mapped to a handler function
 /// 2. Handler function is called
 /// 3. [`TxpoolReadResponse`] is returned
-#[expect(clippy::needless_pass_by_value)]
 fn map_request(
     env: &ConcreteEnv,          // Access to the database
     request: TxpoolReadRequest, // The request we must fulfill
@@ -65,6 +66,10 @@
     match request {
         TxpoolReadRequest::TxBlob(tx_hash) => tx_blob(env, &tx_hash),
         TxpoolReadRequest::TxVerificationData(tx_hash) => tx_verification_data(env, &tx_hash),
+        TxpoolReadRequest::FilterKnownTxBlobHashes(blob_hashes) => {
+            filter_known_tx_blob_hashes(env, blob_hashes)
+        }
+        TxpoolReadRequest::TxsForBlock(txs_needed) => txs_for_block(env, txs_needed),
         TxpoolReadRequest::Backlog => backlog(env),
         TxpoolReadRequest::Size => size(env),
     }
@@ -94,10 +99,14 @@ fn tx_blob(env: &ConcreteEnv, tx_hash: &TransactionHash) -> ReadResponseResult
     let tx_ro = inner_env.tx_ro()?;
 
     let tx_blobs_table = inner_env.open_db_ro::<TransactionBlobs>(&tx_ro)?;
+    let tx_infos_table = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
 
-    tx_blobs_table
-        .get(tx_hash)
-        .map(|blob| TxpoolReadResponse::TxBlob(blob.0))
+    let tx_blob = tx_blobs_table.get(tx_hash)?.0;
+
+    Ok(TxpoolReadResponse::TxBlob {
+        tx_blob,
+        state_stem: in_stem_pool(tx_hash, &tx_infos_table)?,
+    })
 }
 
 /// [`TxpoolReadRequest::TxVerificationData`].
@@ -111,6 +120,79 @@
     get_transaction_verification_data(tx_hash, &tables).map(TxpoolReadResponse::TxVerificationData)
 }
 
+/// [`TxpoolReadRequest::FilterKnownTxBlobHashes`].
+fn filter_known_tx_blob_hashes(
+    env: &ConcreteEnv,
+    mut blob_hashes: HashSet<TransactionBlobHash>,
+) -> ReadResponseResult {
+    let inner_env = env.env_inner();
+    let tx_ro = inner_env.tx_ro()?;
+
+    let tx_blob_hashes = inner_env.open_db_ro::<KnownBlobHashes>(&tx_ro)?;
+    let tx_infos = inner_env.open_db_ro::<TransactionInfos>(&tx_ro)?;
+
+    let mut stem_pool_hashes = Vec::new();
+
+    // A closure that returns `true` if a tx with a certain blob hash is unknown.
+    // This also fills in `stem_pool_hashes`.
+    let mut tx_unknown = |blob_hash| -> Result<bool, RuntimeError> {
+        match tx_blob_hashes.get(&blob_hash) {
+            Ok(tx_hash) => {
+                if in_stem_pool(&tx_hash, &tx_infos)? {
+                    stem_pool_hashes.push(tx_hash);
+                }
+                Ok(false)
+            }
+            Err(RuntimeError::KeyNotFound) => Ok(true),
+            Err(e) => Err(e),
+        }
+    };
+
+    let mut err = None;
+    blob_hashes.retain(|blob_hash| match tx_unknown(*blob_hash) {
+        Ok(res) => res,
+        Err(e) => {
+            err = Some(e);
+            false
+        }
+    });
+
+    if let Some(e) = err {
+        return Err(e);
+    }
+
+    Ok(TxpoolReadResponse::FilterKnownTxBlobHashes {
+        unknown_blob_hashes: blob_hashes,
+        stem_pool_hashes,
+    })
+}
+
+/// [`TxpoolReadRequest::TxsForBlock`].
+fn txs_for_block(env: &ConcreteEnv, txs: Vec<TransactionHash>) -> ReadResponseResult {
+    let inner_env = env.env_inner();
+    let tx_ro = inner_env.tx_ro()?;
+
+    let tables = inner_env.open_tables(&tx_ro)?;
+
+    let mut missing_tx_indexes = Vec::with_capacity(txs.len());
+    let mut txs_verification_data = HashMap::with_capacity(txs.len());
+
+    for (i, tx_hash) in txs.into_iter().enumerate() {
+        match get_transaction_verification_data(&tx_hash, &tables) {
+            Ok(tx) => {
+                txs_verification_data.insert(tx_hash, tx);
+            }
+            Err(RuntimeError::KeyNotFound) => missing_tx_indexes.push(i),
+            Err(e) => return Err(e),
+        }
+    }
+
+    Ok(TxpoolReadResponse::TxsForBlock {
+        txs: txs_verification_data,
+        missing: missing_tx_indexes,
+    })
+}
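The `retain`-plus-stashed-error dance above exists because `HashSet::retain` only accepts an infallible predicate. The same idea as a standalone, reusable sketch (`retain_fallible` is a hypothetical helper, not part of the patch):

    use std::collections::HashSet;
    use std::hash::Hash;

    /// Retain elements for which `pred` returns `Ok(true)`, bailing out
    /// with the first error after the pass completes.
    fn retain_fallible<T: Eq + Hash, E>(
        set: &mut HashSet<T>,
        mut pred: impl FnMut(&T) -> Result<bool, E>,
    ) -> Result<(), E> {
        let mut err = None;

        set.retain(|item| match pred(item) {
            Ok(keep) => keep,
            Err(e) => {
                // Stash only the first error; the set's final contents are
                // irrelevant once we are going to return `Err`.
                if err.is_none() {
                    err = Some(e);
                }
                false
            }
        });

        err.map_or(Ok(()), Err)
    }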
+/// [`TxpoolReadRequest::Backlog`].
 #[inline]
 fn backlog(env: &ConcreteEnv) -> ReadResponseResult {
diff --git a/storage/txpool/src/service/write.rs b/storage/txpool/src/service/write.rs
index 8a3b1bf..13ab81f 100644
--- a/storage/txpool/src/service/write.rs
+++ b/storage/txpool/src/service/write.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use cuprate_database::{ConcreteEnv, Env, EnvInner, RuntimeError, TxRw};
+use cuprate_database::{ConcreteEnv, DatabaseRo, DatabaseRw, Env, EnvInner, RuntimeError, TxRw};
 use cuprate_database_service::DatabaseWriteHandle;
 use cuprate_types::TransactionVerificationData;
 
@@ -10,8 +10,8 @@ use crate::{
         interface::{TxpoolWriteRequest, TxpoolWriteResponse},
         types::TxpoolWriteHandle,
     },
-    tables::OpenTables,
-    types::TransactionHash,
+    tables::{OpenTables, Tables, TransactionInfos},
+    types::{KeyImage, TransactionHash, TxStateFlags},
 };
 
 //---------------------------------------------------------------------------------------------------- init_write_service
@@ -31,6 +31,8 @@ fn handle_txpool_request(
             add_transaction(env, tx, *state_stem)
         }
         TxpoolWriteRequest::RemoveTransaction(tx_hash) => remove_transaction(env, tx_hash),
+        TxpoolWriteRequest::Promote(tx_hash) => promote(env, tx_hash),
+        TxpoolWriteRequest::NewBlock { spent_key_images } => new_block(env, spent_key_images),
     }
 }
 
@@ -101,3 +103,68 @@ fn remove_transaction(
     TxRw::commit(tx_rw)?;
     Ok(TxpoolWriteResponse::Ok)
 }
+
+/// [`TxpoolWriteRequest::Promote`]
+fn promote(
+    env: &ConcreteEnv,
+    tx_hash: &TransactionHash,
+) -> Result<TxpoolWriteResponse, RuntimeError> {
+    let env_inner = env.env_inner();
+    let tx_rw = env_inner.tx_rw()?;
+
+    let res = || {
+        let mut tx_infos = env_inner.open_db_rw::<TransactionInfos>(&tx_rw)?;
+
+        tx_infos.update(tx_hash, |mut info| {
+            info.flags.remove(TxStateFlags::STATE_STEM);
+            Some(info)
+        })
+    };
+
+    if let Err(e) = res() {
+        // Error promoting the tx; abort the DB transaction.
+        TxRw::abort(tx_rw)
+            .expect("could not maintain database atomicity by aborting write transaction");
+
+        return Err(e);
+    }
+
+    TxRw::commit(tx_rw)?;
+    Ok(TxpoolWriteResponse::Ok)
+}
+
+/// [`TxpoolWriteRequest::NewBlock`]
+fn new_block(
+    env: &ConcreteEnv,
+    spent_key_images: &[KeyImage],
+) -> Result<TxpoolWriteResponse, RuntimeError> {
+    let env_inner = env.env_inner();
+    let tx_rw = env_inner.tx_rw()?;
+
+    // FIXME: use try blocks once stable.
+    let result = || {
+        let mut tables_mut = env_inner.open_tables_mut(&tx_rw)?;
+
+        // Remove all txs which spend key images that were spent in the new block.
+        for key_image in spent_key_images {
+            match tables_mut
+                .spent_key_images()
+                .get(key_image)
+                .and_then(|tx_hash| ops::remove_transaction(&tx_hash, &mut tables_mut))
+            {
+                Ok(()) | Err(RuntimeError::KeyNotFound) => (),
+                Err(e) => return Err(e),
+            }
+        }
+
+        Ok(())
+    };
+
+    if let Err(e) = result() {
+        TxRw::abort(tx_rw)?;
+        return Err(e);
+    }
+
+    TxRw::commit(tx_rw)?;
+    Ok(TxpoolWriteResponse::Ok)
+}
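Both write handlers above follow the same abort-or-commit shape. A reduced model of that control flow (toy types only; this is not the `cuprate_database` API):

    /// Toy stand-in for a read-write DB transaction.
    struct TxRw;

    impl TxRw {
        fn commit(self) -> Result<(), String> {
            Ok(())
        }
        fn abort(self) -> Result<(), String> {
            Ok(())
        }
    }

    /// Run all fallible work in a closure; abort on error so nothing
    /// becomes visible, commit on success so everything does.
    fn write_atomically<T>(
        tx_rw: TxRw,
        work: impl FnOnce() -> Result<T, String>,
    ) -> Result<T, String> {
        match work() {
            Ok(value) => {
                tx_rw.commit()?;
                Ok(value)
            }
            Err(e) => {
                tx_rw.abort()?;
                Err(e)
            }
        }
    }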
diff --git a/storage/txpool/src/tables.rs b/storage/txpool/src/tables.rs
index dbb686a..1f2d449 100644
--- a/storage/txpool/src/tables.rs
+++ b/storage/txpool/src/tables.rs
@@ -16,7 +16,9 @@
 //! accessing _all_ tables defined here at once.
 use cuprate_database::{define_tables, StorableVec};
 
-use crate::types::{KeyImage, RawCachedVerificationState, TransactionHash, TransactionInfo};
+use crate::types::{
+    KeyImage, RawCachedVerificationState, TransactionBlobHash, TransactionHash, TransactionInfo,
+};
 
 define_tables! {
     /// Serialized transaction blobs.
@@ -41,5 +43,9 @@
 
     /// This table contains the spent key images from all transactions in the pool.
     3 => SpentKeyImages,
-    KeyImage => TransactionHash
+    KeyImage => TransactionHash,
+
+    /// Transaction blob hashes that are in the pool.
+    4 => KnownBlobHashes,
+    TransactionBlobHash => TransactionHash,
 }
diff --git a/storage/txpool/src/types.rs b/storage/txpool/src/types.rs
index 4da2d0f..2acb819 100644
--- a/storage/txpool/src/types.rs
+++ b/storage/txpool/src/types.rs
@@ -6,7 +6,6 @@
 //!
 //!
 use bytemuck::{Pod, Zeroable};
-
 use monero_serai::transaction::Timelock;
 
 use cuprate_types::{CachedVerificationState, HardFork};
@@ -17,6 +16,9 @@ pub type KeyImage = [u8; 32];
 
 /// A transaction hash.
 pub type TransactionHash = [u8; 32];
 
+/// A transaction blob hash.
+pub type TransactionBlobHash = [u8; 32];
+
 bitflags::bitflags! {
     /// Flags representing the state of the transaction in the pool.
     #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Pod, Zeroable)]