diff --git a/Cargo.lock b/Cargo.lock index 713ada03..d1ad93ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -285,9 +285,7 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", - "js-sys", "num-traits", - "wasm-bindgen", "windows-targets 0.48.5", ] @@ -408,7 +406,6 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" dependencies = [ - "rand_core", "subtle", ] @@ -448,7 +445,6 @@ version = "0.1.0" dependencies = [ "borsh", "clap", - "crypto-bigint", "cuprate-common", "curve25519-dalek", "dalek-ff-group", @@ -701,7 +697,6 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -724,34 +719,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" -[[package]] -name = "futures-executor" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - [[package]] name = "futures-io" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" -[[package]] -name = "futures-macro" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.48", -] - [[package]] name = "futures-sink" version = "0.3.30" @@ -773,7 +746,6 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", - "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1063,16 +1035,6 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" -[[package]] -name = "lock_api" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.20" @@ -1166,7 +1128,6 @@ dependencies = [ "thiserror", "tokio", "tracing", - "zeroize", ] [[package]] @@ -1349,29 +1310,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets 0.48.5", -] - [[package]] name = "password-hash" version = "0.5.0" @@ -1483,9 +1421,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -1594,7 +1532,7 @@ dependencies = [ [[package]] name = "randomx-rs" version = "1.3.0" -source = "git+https://github.com/Cuprate/randomx-rs.git#6496a61208852a020575dafc160080cf50bda67f" +source = "git+https://github.com/Cuprate/randomx-rs.git?rev=6496a61#6496a61208852a020575dafc160080cf50bda67f" dependencies = [ "bitflags 1.3.2", "libc", @@ -1764,12 +1702,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "sct" version = "0.7.1" @@ -1879,15 +1811,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "signal-hook-registry" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" -dependencies = [ - "libc", -] - [[package]] name = "simple-request" version = "0.1.0" @@ -1909,9 +1832,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.12.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2" @@ -2055,13 +1978,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", - "bytes", "libc", "mio", "num_cpus", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", @@ -2169,7 +2089,6 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index f42ca820..c70d105f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,32 +35,36 @@ opt-level = 1 opt-level = 3 [workspace.dependencies] -async-trait = { version = "0.1.74" } -borsh = { version = "1.2.1", features = ["derive"] } -bytes = { version = "1.5.0" } -clap = { version = "4.4.7" } -chrono = { version = "0.4.31" } -crypto-bigint = { version = "0.5.3" } -curve25519-dalek = { version = "4.1.1" } -dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" } -dirs = { version = "5.0.1" } -futures = { version = "0.3.29" } -hex = { version = "0.4.3" } -monero-epee-bin-serde = { git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a" } -monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" } -multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" } -randomx-rs = { version = "1.2.1" } -rand = { version = "0.8.5" } -rayon = { version = "1.8.0" } -serde_bytes = { version = "0.11.12" } -serde_json = { version = "1.0.108" } -serde = { version = "1.0.190", features = ["derive"] } -thiserror = { version = "1.0.50" } -tokio-util = { version = "0.7.10", features = ["codec"]} -tokio = { version = "1.33.0", features = ["full"] } -tower = { version = "0.4.13", features = ["util", "steer"] } -tracing-subscriber = { version = "0.3.17" } -tracing = { version = "0.1.40" } +async-trait = { version = "0.1.74", default-features = false } +borsh = { version = "1.2.1", default-features = false } +bytes = { version = "1.5.0", default-features = false } +clap = { version = "4.4.7", default-features = false } +chrono = { version = "0.4.31", default-features = false } +primitive-types = { version = "0.12.2", default-features = false } +curve25519-dalek = { version = "4.1.1", default-features = false } +dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b", default-features = false } +dirs = { version = "5.0.1", default-features = false } +futures = { version = "0.3.29", default-features = false } +hex = { version = "0.4.3", default-features = false } +hex-literal = { version = "0.4", default-features = false } +monero-epee-bin-serde = { git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a", default-features = false } +monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b", default-features = false } +multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b", default-features = false } +pin-project = { version = "1.1.3", default-features = false } +randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "6496a61", default-features = false } +rand = { version = "0.8.5", default-features = false } +rayon = { version = "1.8.0", default-features = false } +serde_bytes = { version = "0.11.12", default-features = false } +serde_json = { version = "1.0.108", default-features = false } +serde = { version = "1.0.190", default-features = false } +thiserror = { version = "1.0.50", default-features = false } +thread_local = { version = "1.1.7", default-features = false } +tokio-util = { version = "0.7.10", default-features = false } +tokio-stream = { version = "0.1.14", default-features = false } +tokio = { version = "1.33.0", default-features = false } +tower = { version = "0.4.13", default-features = false } +tracing-subscriber = { version = "0.3.17", default-features = false } +tracing = { version = "0.1.40", default-features = false } ## workspace.dev-dependencies proptest = { version = "1" } diff --git a/common/Cargo.toml b/common/Cargo.toml index 905c0b9d..d20d73a7 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -10,10 +10,10 @@ default = [] borsh = ["dep:borsh"] [dependencies] -chrono = "0.4.24" -thiserror = "1" -hex = "0.4" +chrono = { workspace = true } +thiserror = { workspace = true } +hex = { workspace = true, features = ["std"] } -futures = "0.3.29" +futures = { workspace = true, features = ["std"] } -borsh = {version = "1.2.1", default-features = false, features = ["derive",], optional = true } \ No newline at end of file +borsh = { workspace = true, features = ["derive", "std"], optional = true } \ No newline at end of file diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 882ed11a..64d74d65 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -8,15 +8,17 @@ authors = ["Boog900"] repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [features] -default = ["binaries"] +default = [] binaries = [ "tokio/rt-multi-thread", "tokio/macros", - "dep:tracing-subscriber", "tower/retry", "tower/balance", "tower/buffer", "tower/timeout", + "monero-serai/http-rpc", + "dep:curve25519-dalek", + "dep:tracing-subscriber", "dep:serde_json", "dep:serde", "dep:monero-epee-bin-serde", @@ -27,27 +29,23 @@ binaries = [ ] [dependencies] +cuprate-common = {path = "../common"} monero-consensus = {path = "./rules", features = ["rayon"]} -thiserror = "1" -tower = {version = "0.4", features = ["util"]} -tracing = "0.1" -futures = "0.3" +thiserror = { workspace = true } +tower = { workspace = true, features = ["util"] } +tracing = { workspace = true, features = ["std", "attributes"] } +futures = { workspace = true, features = ["std"] } -crypto-bigint = "0.5" -curve25519-dalek = "4" - -randomx-rs = {git = "https://github.com/Cuprate/randomx-rs.git"} -monero-serai = { workspace = true } +randomx-rs = { workspace = true } +monero-serai = { workspace = true, features = ["std"] } multiexp = { workspace = true } dalek-ff-group = { workspace = true } -cuprate-common = {path = "../common"} - -rayon = "1" -thread_local = "1.1.7" -tokio = "1" -tokio-util = "0.7" +rayon = { workspace = true } +thread_local = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } hex = "0.4" @@ -60,6 +58,7 @@ tracing-subscriber = {version = "0.3", optional = true} borsh = { workspace = true, optional = true} dirs = {version="5.0", optional = true} clap = { version = "4.4.8", optional = true, features = ["derive"] } +curve25519-dalek = { workspace = true, optional = true } # here to help cargo to pick a version - remove me syn = "2.0.37" @@ -67,6 +66,6 @@ syn = "2.0.37" [dev-dependencies] monero-consensus = {path = "./rules", features = ["proptest"]} -tokio = {version = "1", features = ["rt-multi-thread", "macros"]} -proptest = "1" -proptest-derive = "0.4.0" \ No newline at end of file +tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} +proptest = { workspace = true } +proptest-derive = { workspace = true } \ No newline at end of file diff --git a/consensus/rules/Cargo.toml b/consensus/rules/Cargo.toml index 27d25bad..7f19e9e4 100644 --- a/consensus/rules/Cargo.toml +++ b/consensus/rules/Cargo.toml @@ -12,19 +12,18 @@ rayon = ["dep:rayon"] cryptonight-cuprate = {path = "../../cryptonight"} cuprate-common = {path = "../../common"} -monero-serai = { workspace = true } -multiexp = { workspace = true } -dalek-ff-group = { workspace = true } -zeroize = "1.7" -curve25519-dalek = { workspace = true } +monero-serai = { workspace = true, features = ["std"] } +multiexp = { workspace = true, features = ["std", "batch"] } +dalek-ff-group = { workspace = true, features = ["std"] } +curve25519-dalek = { workspace = true, features = ["alloc", "zeroize", "precomputed-tables"] } -rand = { workspace = true } +rand = { workspace = true, features = ["std"] } -hex = "0.4" -hex-literal = "0.4" -primitive-types = { version = "0.12.2", default-features = false } +hex = { workspace = true, features = ["std"] } +hex-literal = { workspace = true } +primitive-types = { workspace = true } # enabiling std pulls in 9 extra crates! -tracing = { workspace = true } +tracing = { workspace = true, features = ["std"] } thiserror = { workspace = true } rayon = { workspace = true, optional = true } diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 2898a0f0..f8178771 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,506 +1,518 @@ -#![cfg(feature = "binaries")] - -use std::{ - collections::{HashMap, HashSet}, - ops::Range, - path::PathBuf, - sync::Arc, -}; - -use clap::Parser; -use futures::{ - channel::{mpsc, oneshot}, - SinkExt, StreamExt, -}; -use monero_serai::{block::Block, transaction::Transaction}; -use tokio::sync::RwLock; -use tower::{Service, ServiceExt}; -use tracing::level_filters::LevelFilter; - -use cuprate_common::Network; - -use cuprate_consensus::{ - block::PrePreparedBlockExPOW, - context::{ - BlockChainContextRequest, BlockChainContextResponse, ContextConfig, - UpdateBlockchainCacheData, - }, - initialize_blockchain_context, initialize_verifier, - randomx::RandomXVM, - rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, - Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation, - VerifyBlockRequest, VerifyBlockResponse, -}; - -use monero_consensus::{blocks::randomx_seed_height, HardFork}; - mod tx_pool; -const MAX_BLOCKS_IN_RANGE: u64 = 500; -const BATCHES_IN_REQUEST: u64 = 3; -const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000; +#[cfg(feature = "binaries")] +mod bin { + use std::{ + collections::{HashMap, HashSet}, + ops::Range, + path::PathBuf, + sync::Arc, + }; -/// Calls for a batch of blocks, returning the response and the time it took. -async fn call_batch( - range: Range, - database: D, -) -> Result { - database - .oneshot(DatabaseRequest::BlockBatchInRange(range)) - .await -} + use clap::Parser; + use futures::{ + channel::{mpsc, oneshot}, + SinkExt, StreamExt, + }; + use monero_serai::{block::Block, transaction::Transaction}; + use tokio::sync::RwLock; + use tower::{Service, ServiceExt}; + use tracing::level_filters::LevelFilter; -async fn update_cache_and_context( - cache: &RwLock, - context_updater: &mut Ctx, - verified_block_info: VerifiedBlockInformation, -) -> Result<(), tower::BoxError> -where - Ctx: tower::Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - >, -{ - // add the new block to the cache - cache.write().await.add_new_block_data( - verified_block_info.generated_coins, - &verified_block_info.block.miner_tx, - &verified_block_info.txs, - ); - // update the chain context svc with the new block - context_updater - .ready() - .await? - .call(BlockChainContextRequest::Update( - UpdateBlockchainCacheData { - new_top_hash: verified_block_info.block_hash, - height: verified_block_info.height, - timestamp: verified_block_info.block.header.timestamp, - weight: verified_block_info.weight, - long_term_weight: verified_block_info.long_term_weight, - vote: verified_block_info.hf_vote, - generated_coins: verified_block_info.generated_coins, - cumulative_difficulty: verified_block_info.cumulative_difficulty, - }, - )) - .await?; + use cuprate_common::Network; - Ok(()) -} + use cuprate_consensus::{ + block::PrePreparedBlockExPOW, + context::{ + BlockChainContextRequest, BlockChainContextResponse, ContextConfig, + UpdateBlockchainCacheData, + }, + initialize_blockchain_context, initialize_verifier, + randomx::RandomXVM, + rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, + Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation, + VerifyBlockRequest, VerifyBlockResponse, + }; -async fn call_blocks( - mut new_tx_chan: tx_pool::NewTxChanSen, - mut block_chan: mpsc::Sender>, - start_height: u64, - chain_height: u64, - database: D, -) -> Result<(), tower::BoxError> -where - D: Database + Clone + Send + Sync + 'static, - D::Future: Send + 'static, -{ - let mut next_fut = tokio::spawn(call_batch( - start_height..(start_height + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST)).min(chain_height), - database.clone(), - )); + use monero_consensus::{blocks::randomx_seed_height, HardFork}; - for next_batch_start in (start_height..chain_height) - .step_by((MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST) as usize) - .skip(1) + use super::tx_pool; + + const MAX_BLOCKS_IN_RANGE: u64 = 500; + const BATCHES_IN_REQUEST: u64 = 3; + const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000; + + /// Calls for a batch of blocks, returning the response and the time it took. + async fn call_batch( + range: Range, + database: D, + ) -> Result { + database + .oneshot(DatabaseRequest::BlockBatchInRange(range)) + .await + } + + async fn update_cache_and_context( + cache: &RwLock, + context_updater: &mut Ctx, + verified_block_info: VerifiedBlockInformation, + ) -> Result<(), tower::BoxError> + where + Ctx: tower::Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, { - // Call the next batch while we handle this batch. - let current_fut = std::mem::replace( - &mut next_fut, - tokio::spawn(call_batch( - next_batch_start - ..(next_batch_start + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST)) - .min(chain_height), - database.clone(), - )), + // add the new block to the cache + cache.write().await.add_new_block_data( + verified_block_info.generated_coins, + &verified_block_info.block.miner_tx, + &verified_block_info.txs, ); + // update the chain context svc with the new block + context_updater + .ready() + .await? + .call(BlockChainContextRequest::Update( + UpdateBlockchainCacheData { + new_top_hash: verified_block_info.block_hash, + height: verified_block_info.height, + timestamp: verified_block_info.block.header.timestamp, + weight: verified_block_info.weight, + long_term_weight: verified_block_info.long_term_weight, + vote: verified_block_info.hf_vote, + generated_coins: verified_block_info.generated_coins, + cumulative_difficulty: verified_block_info.cumulative_difficulty, + }, + )) + .await?; - let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else { - panic!("Database sent incorrect response!"); - }; + Ok(()) + } - tracing::info!( - "Got batch: {:?}, chain height: {}", - (next_batch_start - (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST))..(next_batch_start), - chain_height - ); + async fn call_blocks( + mut new_tx_chan: tx_pool::NewTxChanSen, + mut block_chan: mpsc::Sender>, + start_height: u64, + chain_height: u64, + database: D, + ) -> Result<(), tower::BoxError> + where + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, + { + let mut next_fut = tokio::spawn(call_batch( + start_height + ..(start_height + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST)).min(chain_height), + database.clone(), + )); - let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + for next_batch_start in (start_height..chain_height) + .step_by((MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST) as usize) + .skip(1) + { + // Call the next batch while we handle this batch. + let current_fut = std::mem::replace( + &mut next_fut, + tokio::spawn(call_batch( + next_batch_start + ..(next_batch_start + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST)) + .min(chain_height), + database.clone(), + )), + ); - let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap(); + let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else { + panic!("Database sent incorrect response!"); + }; - let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) { - vec![( - txs.into_iter().flatten().collect::>(), - blocks.first().map(hf).unwrap(), - )] - } else { - let mut txs_hfs: Vec<(Vec, HardFork)> = Vec::new(); - let mut last_hf = blocks.first().map(hf).unwrap(); + tracing::info!( + "Got batch: {:?}, chain height: {}", + (next_batch_start - (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST))..(next_batch_start), + chain_height + ); - txs_hfs.push((vec![], last_hf)); + let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); - for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) { - if current_hf == last_hf { - assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf); - txs_hfs.last_mut().unwrap().0.append(&mut txs); - } else { - txs_hfs.push((txs, current_hf)); - last_hf = current_hf; + let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap(); + + let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) { + vec![( + txs.into_iter().flatten().collect::>(), + blocks.first().map(hf).unwrap(), + )] + } else { + let mut txs_hfs: Vec<(Vec, HardFork)> = Vec::new(); + let mut last_hf = blocks.first().map(hf).unwrap(); + + txs_hfs.push((vec![], last_hf)); + + for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) { + if current_hf == last_hf { + assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf); + txs_hfs.last_mut().unwrap().0.append(&mut txs); + } else { + txs_hfs.push((txs, current_hf)); + last_hf = current_hf; + } } - } - txs_hfs + txs_hfs + }; + + let (tx, rx) = oneshot::channel(); + new_tx_chan.send((txs_hf, tx)).await?; + rx.await.unwrap().unwrap(); + + block_chan.send(blocks).await?; + } + + Ok(()) + } + + async fn scan_chain( + cache: Arc>, + save_file: PathBuf, + _rpc_config: Arc>, + database: D, + net: Network, + ) -> Result<(), tower::BoxError> + where + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, + { + tracing::info!("Beginning chain scan"); + + // TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs. + let chain_height = 3_152_725; + + tracing::info!("scanning to chain height: {}", chain_height); + + let config = match net { + Network::Mainnet => ContextConfig::main_net(), + Network::Stagenet => ContextConfig::stage_net(), + Network::Testnet => ContextConfig::test_net(), }; + let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?; + let (tx, rx) = oneshot::channel(); - new_tx_chan.send((txs_hf, tx)).await?; - rx.await.unwrap().unwrap(); - block_chan.send(blocks).await?; - } + let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?; - Ok(()) -} + let (mut block_verifier, transaction_verifier) = + initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?; -async fn scan_chain( - cache: Arc>, - save_file: PathBuf, - _rpc_config: Arc>, - database: D, - net: Network, -) -> Result<(), tower::BoxError> -where - D: Database + Clone + Send + Sync + 'static, - D::Future: Send + 'static, -{ - tracing::info!("Beginning chain scan"); + tx.send(transaction_verifier).map_err(|_| "").unwrap(); - // TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs. - let chain_height = 3_152_725; + let start_height = cache.read().await.height; - tracing::info!("scanning to chain height: {}", chain_height); + let (block_tx, mut incoming_blocks) = mpsc::channel(3); - let config = match net { - Network::Mainnet => ContextConfig::main_net(), - Network::Stagenet => ContextConfig::stage_net(), - Network::Testnet => ContextConfig::test_net(), - }; + let (mut prepped_blocks_tx, mut prepped_blocks_rx) = mpsc::channel(3); - let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?; + tokio::spawn(async move { + call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await + }); - let (tx, rx) = oneshot::channel(); + let BlockChainContextResponse::Context(ctx) = ctx_svc + .ready() + .await? + .call(BlockChainContextRequest::Get) + .await? + else { + panic!("ctx svc sent wrong response!"); + }; + let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); + let mut rx_seed_cache_initiated = false; - let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?; + let mut randomx_vms: Option> = Some(HashMap::new()); - let (mut block_verifier, transaction_verifier) = - initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?; + let mut cloned_ctx_svc = ctx_svc.clone(); + tokio::spawn(async move { + while let Some(blocks) = incoming_blocks.next().await { + if blocks.last().unwrap().header.major_version >= 12 { + if !rx_seed_cache_initiated { + let BlockChainContextResponse::Context(ctx) = cloned_ctx_svc + .ready() + .await + .unwrap() + .call(BlockChainContextRequest::Get) + .await + .unwrap() + else { + panic!("ctx svc sent wrong response!"); + }; + rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); + rx_seed_cache_initiated = true; + } - tx.send(transaction_verifier).map_err(|_| "").unwrap(); + let unwrapped_rx_vms = randomx_vms.as_mut().unwrap(); - let start_height = cache.read().await.height; - - let (block_tx, mut incoming_blocks) = mpsc::channel(3); - - let (mut prepped_blocks_tx, mut prepped_blocks_rx) = mpsc::channel(3); - - tokio::spawn(async move { - call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await - }); - - let BlockChainContextResponse::Context(ctx) = ctx_svc - .ready() - .await? - .call(BlockChainContextRequest::Get) - .await? - else { - panic!("ctx svc sent wrong response!"); - }; - let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); - let mut rx_seed_cache_initiated = false; - - let mut randomx_vms: Option> = Some(HashMap::new()); - - let mut cloned_ctx_svc = ctx_svc.clone(); - tokio::spawn(async move { - while let Some(blocks) = incoming_blocks.next().await { - if blocks.last().unwrap().header.major_version >= 12 { - if !rx_seed_cache_initiated { - let BlockChainContextResponse::Context(ctx) = cloned_ctx_svc - .ready() - .await - .unwrap() - .call(BlockChainContextRequest::Get) - .await - .unwrap() - else { - panic!("ctx svc sent wrong response!"); - }; - rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); - rx_seed_cache_initiated = true; - } - - let unwrapped_rx_vms = randomx_vms.as_mut().unwrap(); - - let blocks = rayon_spawn_async(move || { - blocks - .into_iter() - .map(move |block| PrePreparedBlockExPOW::new(block).unwrap()) - .collect::>() - }) - .await; - - let seeds_needed = blocks - .iter() - .map(|block| { - rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash); - randomx_seed_height(block.block.number() as u64) + let blocks = rayon_spawn_async(move || { + blocks + .into_iter() + .map(move |block| PrePreparedBlockExPOW::new(block).unwrap()) + .collect::>() }) - .collect::>(); + .await; - unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height)); + let seeds_needed = blocks + .iter() + .map(|block| { + rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash); + randomx_seed_height(block.block.number() as u64) + }) + .collect::>(); - for seed_height in seeds_needed { - unwrapped_rx_vms.entry(seed_height).or_insert_with(|| { - RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap() - }); + unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height)); + + for seed_height in seeds_needed { + unwrapped_rx_vms.entry(seed_height).or_insert_with(|| { + RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap() + }); + } + + let arc_rx_vms = Arc::new(randomx_vms.take().unwrap()); + let cloned_arc_rx_vms = arc_rx_vms.clone(); + let blocks = rayon_spawn_async(move || { + blocks + .into_iter() + .map(move |block| { + let rx_vm = arc_rx_vms + .get(&randomx_seed_height(block.block.number() as u64)) + .unwrap(); + PrePreparedBlock::new_rx(block, rx_vm).unwrap() + }) + .collect::>() + }) + .await; + + randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap()); + + prepped_blocks_tx.send(blocks).await.unwrap(); + } else { + let blocks = rayon_spawn_async(move || { + blocks + .into_iter() + .map(move |block| PrePreparedBlock::new(block).unwrap()) + .collect::>() + }) + .await; + + prepped_blocks_tx.send(blocks).await.unwrap(); + } + } + }); + + while let Some(incoming_blocks) = prepped_blocks_rx.next().await { + let mut height; + for block in incoming_blocks { + let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier + .ready() + .await? + .call(VerifyBlockRequest::MainChainPrepared(block)) + .await?; + + height = verified_block_info.height; + + if verified_block_info.height % 5000 == 0 { + tracing::info!("saving cache to: {}", save_file.display()); + cache.write().await.save(&save_file).unwrap(); } - let arc_rx_vms = Arc::new(randomx_vms.take().unwrap()); - let cloned_arc_rx_vms = arc_rx_vms.clone(); - let blocks = rayon_spawn_async(move || { - blocks - .into_iter() - .map(move |block| { - let rx_vm = arc_rx_vms - .get(&randomx_seed_height(block.block.number() as u64)) - .unwrap(); - PrePreparedBlock::new_rx(block, rx_vm).unwrap() - }) - .collect::>() - }) - .await; + update_cache_and_context(&cache, &mut ctx_svc, verified_block_info).await?; - randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap()); - - prepped_blocks_tx.send(blocks).await.unwrap(); - } else { - let blocks = rayon_spawn_async(move || { - blocks - .into_iter() - .map(move |block| PrePreparedBlock::new(block).unwrap()) - .collect::>() - }) - .await; - - prepped_blocks_tx.send(blocks).await.unwrap(); + if height % 200 == 0 { + tracing::info!( + "verified blocks: {:?}, chain height: {}", + 0..height, + chain_height + ); + } } } - }); - while let Some(incoming_blocks) = prepped_blocks_rx.next().await { - let mut height; - for block in incoming_blocks { - let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier - .ready() - .await? - .call(VerifyBlockRequest::MainChainPrepared(block)) - .await?; - - height = verified_block_info.height; - - if verified_block_info.height % 5000 == 0 { - tracing::info!("saving cache to: {}", save_file.display()); - cache.write().await.save(&save_file).unwrap(); - } - - update_cache_and_context(&cache, &mut ctx_svc, verified_block_info).await?; - - if height % 200 == 0 { - tracing::info!( - "verified blocks: {:?}, chain height: {}", - 0..height, - chain_height - ); - } - } + Ok(()) } - Ok(()) -} - -#[derive(Parser)] -struct Args { - /// The log level, valid values: - /// "off", "error", "warn", "info", "debug", "trace", or a number 0-5. - #[arg(short, long, default_value = "info")] - log_level: LevelFilter, - /// The network we should scan, valid values: - /// "mainnet", "testnet", "stagenet". - #[arg(short, long, default_value = "mainnet")] - network: String, - /// A list of RPC nodes we should use. - /// Example: http://xmr-node.cakewallet.com:18081 - #[arg(long)] - rpc_nodes: Vec, - /// Stops the scanner from including the default list of nodes, this is not - /// recommended unless you have sufficient self defined nodes with `rpc_nodes` - #[arg(long)] - dont_use_default_nodes: bool, - /// The directory/ folder to save the scanning cache in. - /// This will default to your user cache directory. - #[arg(long)] - cache_dir: Option, + async fn rayon_spawn_async(f: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = tokio::sync::oneshot::channel(); + rayon::spawn(|| { + let _ = tx.send(f()); + }); + rx.await.expect("The sender must not be dropped") + } + + #[derive(Parser)] + struct Args { + /// The log level, valid values: + /// "off", "error", "warn", "info", "debug", "trace", or a number 0-5. + #[arg(short, long, default_value = "info")] + log_level: LevelFilter, + /// The network we should scan, valid values: + /// "mainnet", "testnet", "stagenet". + #[arg(short, long, default_value = "mainnet")] + network: String, + /// A list of RPC nodes we should use. + /// Example: http://xmr-node.cakewallet.com:18081 + #[arg(long)] + rpc_nodes: Vec, + /// Stops the scanner from including the default list of nodes, this is not + /// recommended unless you have sufficient self defined nodes with `rpc_nodes` + #[arg(long)] + dont_use_default_nodes: bool, + /// The directory/ folder to save the scanning cache in. + /// This will default to your user cache directory. + #[arg(long)] + cache_dir: Option, + } + + pub async fn run() { + let args = Args::parse(); + + if args.dont_use_default_nodes & args.rpc_nodes.is_empty() { + panic!("Can't run scanner with no RPC nodes, see `--help` ") + } + + tracing_subscriber::fmt() + .with_max_level(args.log_level) + .init(); + + let network = match args.network.as_str() { + "mainnet" => Network::Mainnet, + "testnet" => Network::Testnet, + "stagenet" => Network::Stagenet, + _ => panic!("Invalid network, scanner currently only supports mainnet"), + }; + + let mut file_for_cache = match args.cache_dir { + Some(dir) => dir, + None => dirs::cache_dir().unwrap(), + }; + + match network { + Network::Mainnet => file_for_cache.push("cuprate_rpc_scanning_cache.bin"), + Network::Stagenet => file_for_cache.push("cuprate_rpc_scanning_cache_stage_net.bin"), + Network::Testnet => file_for_cache.push("cuprate_rpc_scanning_cache_test_net.bin"), + } + + let mut urls = if args.dont_use_default_nodes { + vec![] + } else { + match network { + Network::Mainnet => vec![ + "http://xmr-node.cakewallet.com:18081".to_string(), + "https://node.sethforprivacy.com".to_string(), + // "http://nodex.monerujo.io:18081".to_string(), + "http://nodes.hashvault.pro:18081".to_string(), + "http://node.c3pool.com:18081".to_string(), + "http://node.trocador.app:18089".to_string(), + "http://xmr.lukas.services:18089".to_string(), + "http://xmr-node-eu.cakewallet.com:18081".to_string(), + "http://68.118.241.70:18089".to_string(), + "http://145.239.97.211:18089".to_string(), + // + "http://xmr-node.cakewallet.com:18081".to_string(), + "https://node.sethforprivacy.com".to_string(), + // "http://nodex.monerujo.io:18081".to_string(), + "http://nodes.hashvault.pro:18081".to_string(), + "http://node.c3pool.com:18081".to_string(), + "http://node.trocador.app:18089".to_string(), + "http://xmr.lukas.services:18089".to_string(), + "http://xmr-node-eu.cakewallet.com:18081".to_string(), + "http://68.118.241.70:18089".to_string(), + "http://145.239.97.211:18089".to_string(), + ], + Network::Testnet => vec![ + "http://testnet.xmr-tw.org:28081".to_string(), + "http://node3.monerodevs.org:28089".to_string(), + "http://node.monerodevs.org:28089".to_string(), + "http://125.229.105.12:28081".to_string(), + "http://node2.monerodevs.org:28089".to_string(), + "https://testnet.xmr.ditatompel.com".to_string(), + "http://singapore.node.xmr.pm:28081".to_string(), + // + "http://testnet.xmr-tw.org:28081".to_string(), + "http://node3.monerodevs.org:28089".to_string(), + "http://node.monerodevs.org:28089".to_string(), + "http://125.229.105.12:28081".to_string(), + "http://node2.monerodevs.org:28089".to_string(), + "https://testnet.xmr.ditatompel.com".to_string(), + "http://singapore.node.xmr.pm:28081".to_string(), + ], + Network::Stagenet => vec![ + "http://125.229.105.12:38081".to_string(), + "http://90.189.159.23:38089".to_string(), + "http://stagenet.xmr-tw.org:38081".to_string(), + "http://node.monerodevs.org:38089".to_string(), + "http://stagenet.community.rino.io:38081".to_string(), + "http://node2.monerodevs.org:38089".to_string(), + "http://node3.monerodevs.org:38089".to_string(), + "http://singapore.node.xmr.pm:38081".to_string(), + "https://stagenet.xmr.ditatompel.com".to_string(), + "http://3.10.182.182:38081".to_string(), + // + "http://125.229.105.12:38081".to_string(), + "http://90.189.159.23:38089".to_string(), + "http://stagenet.xmr-tw.org:38081".to_string(), + "http://node.monerodevs.org:38089".to_string(), + "http://stagenet.community.rino.io:38081".to_string(), + "http://node2.monerodevs.org:38089".to_string(), + "http://node3.monerodevs.org:38089".to_string(), + "http://singapore.node.xmr.pm:38081".to_string(), + "https://stagenet.xmr.ditatompel.com".to_string(), + "http://3.10.182.182:38081".to_string(), + ], + } + }; + + urls.extend(args.rpc_nodes.into_iter()); + + let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE); + let rpc_config = Arc::new(std::sync::RwLock::new(rpc_config)); + + tracing::info!("Attempting to open cache at: {}", file_for_cache.display()); + let cache = match ScanningCache::load(&file_for_cache) { + Ok(cache) => { + tracing::info!("Reloaded from cache, chain height: {}", cache.height); + Arc::new(RwLock::new(cache)) + } + Err(_) => { + tracing::warn!("Couldn't load from cache starting from scratch"); + let mut cache = ScanningCache::default(); + let genesis = monero_consensus::genesis::generate_genesis_block(&network); + + let total_outs = genesis + .miner_tx + .prefix + .outputs + .iter() + .map(|out| out.amount.unwrap_or(0)) + .sum::(); + + cache.add_new_block_data(total_outs, &genesis.miner_tx, &[]); + Arc::new(RwLock::new(cache)) + } + }; + + let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone()); + + scan_chain(cache, file_for_cache, rpc_config, rpc, network) + .await + .unwrap(); + } } +#[cfg(feature = "binaries")] #[tokio::main] async fn main() { - let args = Args::parse(); - - if args.dont_use_default_nodes & args.rpc_nodes.is_empty() { - panic!("Can't run scanner with no RPC nodes, see `--help` ") - } - - tracing_subscriber::fmt() - .with_max_level(args.log_level) - .init(); - - let network = match args.network.as_str() { - "mainnet" => Network::Mainnet, - "testnet" => Network::Testnet, - "stagenet" => Network::Stagenet, - _ => panic!("Invalid network, scanner currently only supports mainnet"), - }; - - let mut file_for_cache = match args.cache_dir { - Some(dir) => dir, - None => dirs::cache_dir().unwrap(), - }; - - match network { - Network::Mainnet => file_for_cache.push("cuprate_rpc_scanning_cache.bin"), - Network::Stagenet => file_for_cache.push("cuprate_rpc_scanning_cache_stage_net.bin"), - Network::Testnet => file_for_cache.push("cuprate_rpc_scanning_cache_test_net.bin"), - } - - let mut urls = if args.dont_use_default_nodes { - vec![] - } else { - match network { - Network::Mainnet => vec![ - "http://xmr-node.cakewallet.com:18081".to_string(), - "https://node.sethforprivacy.com".to_string(), - // "http://nodex.monerujo.io:18081".to_string(), - "http://nodes.hashvault.pro:18081".to_string(), - "http://node.c3pool.com:18081".to_string(), - "http://node.trocador.app:18089".to_string(), - "http://xmr.lukas.services:18089".to_string(), - "http://xmr-node-eu.cakewallet.com:18081".to_string(), - "http://68.118.241.70:18089".to_string(), - "http://145.239.97.211:18089".to_string(), - // - "http://xmr-node.cakewallet.com:18081".to_string(), - "https://node.sethforprivacy.com".to_string(), - // "http://nodex.monerujo.io:18081".to_string(), - "http://nodes.hashvault.pro:18081".to_string(), - "http://node.c3pool.com:18081".to_string(), - "http://node.trocador.app:18089".to_string(), - "http://xmr.lukas.services:18089".to_string(), - "http://xmr-node-eu.cakewallet.com:18081".to_string(), - "http://68.118.241.70:18089".to_string(), - "http://145.239.97.211:18089".to_string(), - ], - Network::Testnet => vec![ - "http://testnet.xmr-tw.org:28081".to_string(), - "http://node3.monerodevs.org:28089".to_string(), - "http://node.monerodevs.org:28089".to_string(), - "http://125.229.105.12:28081".to_string(), - "http://node2.monerodevs.org:28089".to_string(), - "https://testnet.xmr.ditatompel.com".to_string(), - "http://singapore.node.xmr.pm:28081".to_string(), - // - "http://testnet.xmr-tw.org:28081".to_string(), - "http://node3.monerodevs.org:28089".to_string(), - "http://node.monerodevs.org:28089".to_string(), - "http://125.229.105.12:28081".to_string(), - "http://node2.monerodevs.org:28089".to_string(), - "https://testnet.xmr.ditatompel.com".to_string(), - "http://singapore.node.xmr.pm:28081".to_string(), - ], - Network::Stagenet => vec![ - "http://125.229.105.12:38081".to_string(), - "http://90.189.159.23:38089".to_string(), - "http://stagenet.xmr-tw.org:38081".to_string(), - "http://node.monerodevs.org:38089".to_string(), - "http://stagenet.community.rino.io:38081".to_string(), - "http://node2.monerodevs.org:38089".to_string(), - "http://node3.monerodevs.org:38089".to_string(), - "http://singapore.node.xmr.pm:38081".to_string(), - "https://stagenet.xmr.ditatompel.com".to_string(), - "http://3.10.182.182:38081".to_string(), - // - "http://125.229.105.12:38081".to_string(), - "http://90.189.159.23:38089".to_string(), - "http://stagenet.xmr-tw.org:38081".to_string(), - "http://node.monerodevs.org:38089".to_string(), - "http://stagenet.community.rino.io:38081".to_string(), - "http://node2.monerodevs.org:38089".to_string(), - "http://node3.monerodevs.org:38089".to_string(), - "http://singapore.node.xmr.pm:38081".to_string(), - "https://stagenet.xmr.ditatompel.com".to_string(), - "http://3.10.182.182:38081".to_string(), - ], - } - }; - - urls.extend(args.rpc_nodes.into_iter()); - - let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE); - let rpc_config = Arc::new(std::sync::RwLock::new(rpc_config)); - - tracing::info!("Attempting to open cache at: {}", file_for_cache.display()); - let cache = match ScanningCache::load(&file_for_cache) { - Ok(cache) => { - tracing::info!("Reloaded from cache, chain height: {}", cache.height); - Arc::new(RwLock::new(cache)) - } - Err(_) => { - tracing::warn!("Couldn't load from cache starting from scratch"); - let mut cache = ScanningCache::default(); - let genesis = monero_consensus::genesis::generate_genesis_block(&network); - - let total_outs = genesis - .miner_tx - .prefix - .outputs - .iter() - .map(|out| out.amount.unwrap_or(0)) - .sum::(); - - cache.add_new_block_data(total_outs, &genesis.miner_tx, &[]); - Arc::new(RwLock::new(cache)) - } - }; - - let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone()); - - scan_chain(cache, file_for_cache, rpc_config, rpc, network) - .await - .unwrap(); + bin::run().await } -async fn rayon_spawn_async(f: F) -> R -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let (tx, rx) = tokio::sync::oneshot::channel(); - rayon::spawn(|| { - let _ = tx.send(f()); - }); - rx.await.expect("The sender must not be dropped") -} +#[cfg(not(feature = "binaries"))] +fn main() {} diff --git a/consensus/src/bin/tx_pool.rs b/consensus/src/bin/tx_pool.rs index 905a58c6..c27c5cf8 100644 --- a/consensus/src/bin/tx_pool.rs +++ b/consensus/src/bin/tx_pool.rs @@ -1,138 +1,102 @@ -#![cfg(feature = "binaries")] +#[cfg(feature = "binaries")] +mod bin { + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + task::{Context, Poll}, + }; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - task::{Context, Poll}, -}; + use futures::{ + channel::{mpsc, oneshot}, + StreamExt, + }; + use monero_serai::transaction::Transaction; + use tower::{Service, ServiceExt}; -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; -use monero_serai::transaction::Transaction; -use tower::{Service, ServiceExt}; + use cuprate_common::tower_utils::InfallibleOneshotReceiver; -use cuprate_common::tower_utils::InfallibleOneshotReceiver; + use cuprate_consensus::{ + context::{ + BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, + RawBlockChainContext, + }, + transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, + ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, + }; + use monero_consensus::HardFork; -use cuprate_consensus::{ - context::{ - BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, - RawBlockChainContext, - }, - transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, - ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, -}; -use monero_consensus::HardFork; - -#[derive(Clone)] -pub struct TxPoolHandle { - tx_pool_task: std::sync::Arc>, - tx_pool_chan: mpsc::Sender<( - TxPoolRequest, - oneshot::Sender>, - )>, -} - -impl tower::Service for TxPoolHandle { - type Response = TxPoolResponse; - type Error = TxNotInPool; - type Future = InfallibleOneshotReceiver>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.tx_pool_task.is_finished() { - panic!("Tx pool task finished before it was supposed to!"); - }; - - self.tx_pool_chan - .poll_ready(cx) - .map_err(|_| panic!("Tx pool channel closed before it was supposed to")) + #[derive(Clone)] + pub struct TxPoolHandle { + tx_pool_task: std::sync::Arc>, + tx_pool_chan: mpsc::Sender<( + TxPoolRequest, + oneshot::Sender>, + )>, } - fn call(&mut self, req: TxPoolRequest) -> Self::Future { - let (tx, rx) = oneshot::channel(); - self.tx_pool_chan - .try_send((req, tx)) - .expect("You need to use `poll_ready` to check capacity!"); + impl tower::Service for TxPoolHandle { + type Response = TxPoolResponse; + type Error = TxNotInPool; + type Future = InfallibleOneshotReceiver>; - rx.into() - } -} + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.tx_pool_task.is_finished() { + panic!("Tx pool task finished before it was supposed to!"); + }; -pub type NewTxChanRec = mpsc::Receiver<( - Vec<(Vec, HardFork)>, - oneshot::Sender>, -)>; + self.tx_pool_chan + .poll_ready(cx) + .map_err(|_| panic!("Tx pool channel closed before it was supposed to")) + } -pub type NewTxChanSen = mpsc::Sender<( - Vec<(Vec, HardFork)>, - oneshot::Sender>, -)>; + fn call(&mut self, req: TxPoolRequest) -> Self::Future { + let (tx, rx) = oneshot::channel(); + self.tx_pool_chan + .try_send((req, tx)) + .expect("You need to use `poll_ready` to check capacity!"); -pub struct TxPool { - txs: Arc>>>, - current_ctx: BlockChainContext, - tx_verifier: Option, - tx_verifier_chan: Option>, - ctx_svc: Ctx, -} - -impl TxPool -where - TxV: Service - + Clone - + Send - + 'static, - TxV::Future: Send + 'static, - Ctx: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - > + Send - + 'static, - Ctx::Future: Send + 'static, -{ - pub async fn spawn( - tx_verifier_chan: oneshot::Receiver, - mut ctx_svc: Ctx, - ) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> { - let BlockChainContextResponse::Context(current_ctx) = ctx_svc - .ready() - .await? - .call(BlockChainContextRequest::Get) - .await? - else { - panic!("Context service service returned wrong response!") - }; - - let tx_pool = TxPool { - txs: Default::default(), - current_ctx, - tx_verifier: None, - tx_verifier_chan: Some(tx_verifier_chan), - ctx_svc, - }; - - let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3); - let (new_tx_tx, new_tx_rx) = mpsc::channel(3); - - let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx)); - - Ok(( - TxPoolHandle { - tx_pool_task: tx_pool_task.into(), - tx_pool_chan: tx_pool_tx, - }, - new_tx_tx, - )) + rx.into() + } } - async fn get_or_update_ctx(&mut self) -> Result { - if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() { - Ok(current_ctx) - } else { - let BlockChainContextResponse::Context(current_ctx) = self - .ctx_svc + pub type NewTxChanRec = mpsc::Receiver<( + Vec<(Vec, HardFork)>, + oneshot::Sender>, + )>; + + pub type NewTxChanSen = mpsc::Sender<( + Vec<(Vec, HardFork)>, + oneshot::Sender>, + )>; + + pub struct TxPool { + txs: Arc>>>, + current_ctx: BlockChainContext, + tx_verifier: Option, + tx_verifier_chan: Option>, + ctx_svc: Ctx, + } + + impl TxPool + where + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, + Ctx: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + > + Send + + 'static, + Ctx::Future: Send + 'static, + { + pub async fn spawn( + tx_verifier_chan: oneshot::Receiver, + mut ctx_svc: Ctx, + ) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> { + let BlockChainContextResponse::Context(current_ctx) = ctx_svc .ready() .await? .call(BlockChainContextRequest::Get) @@ -141,107 +105,147 @@ where panic!("Context service service returned wrong response!") }; - self.current_ctx = current_ctx; - - Ok(self.current_ctx.unchecked_blockchain_context().clone()) - } - } - - async fn handle_txs_req( - &mut self, - req: TxPoolRequest, - tx: oneshot::Sender>, - ) { - let TxPoolRequest::Transactions(txs_to_get) = req; - - let mut res = Vec::with_capacity(txs_to_get.len()); - - for tx_hash in txs_to_get { - let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else { - tracing::debug!("tx not in pool: {}", hex::encode(tx_hash)); - let _ = tx.send(Err(TxNotInPool)); - return; + let tx_pool = TxPool { + txs: Default::default(), + current_ctx, + tx_verifier: None, + tx_verifier_chan: Some(tx_verifier_chan), + ctx_svc, }; - res.push(tx) + + let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3); + let (new_tx_tx, new_tx_rx) = mpsc::channel(3); + + let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx)); + + Ok(( + TxPoolHandle { + tx_pool_task: tx_pool_task.into(), + tx_pool_chan: tx_pool_tx, + }, + new_tx_tx, + )) } - let _ = tx.send(Ok(TxPoolResponse::Transactions(res))); - } - - async fn handle_new_txs( - &mut self, - new_txs: Vec<(Vec, HardFork)>, - res_chan: oneshot::Sender>, - ) -> Result<(), tower::BoxError> { - if self.tx_verifier.is_none() { - self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?); - } - - let current_ctx = self.get_or_update_ctx().await?; - - let mut tx_verifier = self.tx_verifier.clone().unwrap(); - let tx_pool = self.txs.clone(); - - tokio::spawn(async move { - for (txs, hf) in new_txs { - // We only batch the setup a real tx pool would also call `VerifyTxRequest::Block` - let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier + async fn get_or_update_ctx(&mut self) -> Result { + if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() { + Ok(current_ctx) + } else { + let BlockChainContextResponse::Context(current_ctx) = self + .ctx_svc .ready() - .await - .unwrap() - .call(VerifyTxRequest::BatchSetup { - txs, - hf, - re_org_token: current_ctx.re_org_token.clone(), - }) - .await - .unwrap() + .await? + .call(BlockChainContextRequest::Get) + .await? else { - panic!("Tx verifier sent incorrect response!"); + panic!("Context service service returned wrong response!") }; - let mut locked_pool = tx_pool.lock().unwrap(); + self.current_ctx = current_ctx; - for tx in txs { - let tx_hash = tx.tx_hash; - if locked_pool.insert(tx_hash, tx).is_some() { - panic!("added same tx to pool twice: {}", hex::encode(tx_hash)) + Ok(self.current_ctx.unchecked_blockchain_context().clone()) + } + } + + async fn handle_txs_req( + &mut self, + req: TxPoolRequest, + tx: oneshot::Sender>, + ) { + let TxPoolRequest::Transactions(txs_to_get) = req; + + let mut res = Vec::with_capacity(txs_to_get.len()); + + for tx_hash in txs_to_get { + let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else { + tracing::debug!("tx not in pool: {}", hex::encode(tx_hash)); + let _ = tx.send(Err(TxNotInPool)); + return; + }; + res.push(tx) + } + + let _ = tx.send(Ok(TxPoolResponse::Transactions(res))); + } + + async fn handle_new_txs( + &mut self, + new_txs: Vec<(Vec, HardFork)>, + res_chan: oneshot::Sender>, + ) -> Result<(), tower::BoxError> { + if self.tx_verifier.is_none() { + self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?); + } + + let current_ctx = self.get_or_update_ctx().await?; + + let mut tx_verifier = self.tx_verifier.clone().unwrap(); + let tx_pool = self.txs.clone(); + + tokio::spawn(async move { + for (txs, hf) in new_txs { + // We only batch the setup a real tx pool would also call `VerifyTxRequest::Block` + let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier + .ready() + .await + .unwrap() + .call(VerifyTxRequest::BatchSetup { + txs, + hf, + re_org_token: current_ctx.re_org_token.clone(), + }) + .await + .unwrap() + else { + panic!("Tx verifier sent incorrect response!"); + }; + + let mut locked_pool = tx_pool.lock().unwrap(); + + for tx in txs { + let tx_hash = tx.tx_hash; + if locked_pool.insert(tx_hash, tx).is_some() { + panic!("added same tx to pool twice: {}", hex::encode(tx_hash)) + } } } - } - res_chan.send(Ok(())).unwrap(); - }); - Ok(()) - } + res_chan.send(Ok(())).unwrap(); + }); + Ok(()) + } - pub async fn run( - mut self, - mut tx_pool_handle: mpsc::Receiver<( - TxPoolRequest, - oneshot::Sender>, - )>, - mut new_tx_channel: NewTxChanRec, - ) { - loop { - tokio::select! { - biased; - new_txs = new_tx_channel.next() => { - let Some(new_txs) = new_txs else { - todo!("Shutdown txpool") - }; + pub async fn run( + mut self, + mut tx_pool_handle: mpsc::Receiver<( + TxPoolRequest, + oneshot::Sender>, + )>, + mut new_tx_channel: NewTxChanRec, + ) { + loop { + tokio::select! { + biased; + new_txs = new_tx_channel.next() => { + let Some(new_txs) = new_txs else { + todo!("Shutdown txpool") + }; - self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap() - } - pool_req = tx_pool_handle.next() => { - let Some((req, tx)) = pool_req else { - todo!("Shutdown txpool") - }; - self.handle_txs_req(req, tx).await; + self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap() + } + pool_req = tx_pool_handle.next() => { + let Some((req, tx)) = pool_req else { + todo!("Shutdown txpool") + }; + self.handle_txs_req(req, tx).await; + } } } } } } +#[cfg(feature = "binaries")] +pub use bin::*; + #[allow(dead_code)] fn main() {} diff --git a/helper/Cargo.toml b/helper/Cargo.toml index 5520ad8d..465e9518 100644 --- a/helper/Cargo.toml +++ b/helper/Cargo.toml @@ -14,8 +14,8 @@ time = ["dep:chrono", "std"] thread = ["std", "dep:target_os_lib"] [dependencies] -chrono = { workspace = true, optional = true } -futures = { workspace = true, optional = true } +chrono = { workspace = true, optional = true, features = ["std", "clock"] } +futures = { workspace = true, optional = true, features = ["std"] } rayon = { workspace = true, optional = true } # This is kinda a stupid work around. diff --git a/p2p/address-book/Cargo.toml b/p2p/address-book/Cargo.toml index f7f656c6..70ffcf06 100644 --- a/p2p/address-book/Cargo.toml +++ b/p2p/address-book/Cargo.toml @@ -11,20 +11,21 @@ cuprate-common = {path = "../../common"} monero-wire = {path= "../../net/monero-wire"} monero-p2p = {path = "../monero-p2p" } -tower = { version= "0.4.13", features = ["util", "buffer"] } -tokio = {version = "1.34.0", default-features = false, features = ["time", "fs", "rt"]} +tower = { workspace = true, features = ["util", "buffer"] } +tokio = { workspace = true, features = ["time", "fs", "rt"]} -futures = "0.3.29" -pin-project = "1.1.3" -async-trait = "0.1.74" +futures = { workspace = true, features = ["std"] } +pin-project = { workspace = true } +async-trait = { workspace = true } -thiserror = "1.0.50" -tracing = "0.1.40" +thiserror = { workspace = true } +tracing = { workspace = true, features = ["std", "attributes"] } -rand = "0.8.5" +rand = { workspace = true, features = ["std", "std_rng"] } -borsh = {version = "1.2.1", features = ["derive"]} +borsh = { workspace = true, features = ["derive", "std"]} [dev-dependencies] -tokio = {version = "1.34.0", features = ["rt-multi-thread", "macros"]} cuprate-test-utils = {path = "../../test-utils"} + +tokio = { workspace = true, features = ["rt-multi-thread", "macros"]} diff --git a/p2p/monero-p2p/Cargo.toml b/p2p/monero-p2p/Cargo.toml index 1566c537..378e4ae2 100644 --- a/p2p/monero-p2p/Cargo.toml +++ b/p2p/monero-p2p/Cargo.toml @@ -13,21 +13,21 @@ borsh = ["dep:borsh"] monero-wire = {path= "../../net/monero-wire"} cuprate-common = {path = "../../common", features = ["borsh"]} -tokio = {version= "1.34.0", default-features = false, features = ["net", "sync"]} -tokio-util = { version = "0.7.10", default-features = false, features = ["codec"] } -tokio-stream = {version = "0.1.14", default-features = false, features = ["sync"]} -futures = "0.3.29" -async-trait = "0.1.74" -tower = { version= "0.4.13", features = ["util"] } +tokio = { workspace = true, features = ["net", "sync", "macros"]} +tokio-util = { workspace = true, features = ["codec"] } +tokio-stream = { workspace = true, features = ["sync"]} +futures = { workspace = true, features = ["std"] } +async-trait = { workspace = true } +tower = { workspace = true, features = ["util"] } -thiserror = "1.0.50" -tracing = "0.1.40" +thiserror = { workspace = true } +tracing = { workspace = true, features = ["std"] } -borsh = {version = "1.2.1", default-features = false, features = ["derive", "std"], optional = true } +borsh = { workspace = true, default-features = false, features = ["derive", "std"], optional = true } [dev-dependencies] cuprate-test-utils = {path = "../../test-utils"} -hex = "0.4.3" -tokio = {version= "1.34.0", default-features = false, features = ["net", "rt-multi-thread", "rt", "macros"]} -tracing-subscriber = "0.3" +hex = { workspace = true } +tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]} +tracing-subscriber = { workspace = true } diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 30ef638b..e485781f 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" monero-wire = {path = "../net/monero-wire"} monero-p2p = {path = "../p2p/monero-p2p" } -futures = "0.3.29" -async-trait = "0.1.74" +futures = { workspace = true, features = ["std"] } +async-trait = { workspace = true } -borsh = {version = "1.2.1", features = ["derive"]} \ No newline at end of file +borsh = { workspace = true, features = ["derive"]} \ No newline at end of file