clean up dependencies

This commit is contained in:
Boog900 2024-01-21 14:46:03 +00:00
parent 50894bef89
commit b9334b6a90
No known key found for this signature in database
GPG key ID: 5401367FB7302004
11 changed files with 776 additions and 838 deletions

91
Cargo.lock generated
View file

@ -285,9 +285,7 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.48.5",
]
@ -408,7 +406,6 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76"
dependencies = [
"rand_core",
"subtle",
]
@ -448,7 +445,6 @@ version = "0.1.0"
dependencies = [
"borsh",
"clap",
"crypto-bigint",
"cuprate-common",
"curve25519-dalek",
"dalek-ff-group",
@ -701,7 +697,6 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -724,34 +719,12 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -773,7 +746,6 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -1063,16 +1035,6 @@ version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]]
name = "lock_api"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.20"
@ -1166,7 +1128,6 @@ dependencies = [
"thiserror",
"tokio",
"tracing",
"zeroize",
]
[[package]]
@ -1349,29 +1310,6 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.48.5",
]
[[package]]
name = "password-hash"
version = "0.5.0"
@ -1483,9 +1421,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.76"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c"
checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
dependencies = [
"unicode-ident",
]
@ -1594,7 +1532,7 @@ dependencies = [
[[package]]
name = "randomx-rs"
version = "1.3.0"
source = "git+https://github.com/Cuprate/randomx-rs.git#6496a61208852a020575dafc160080cf50bda67f"
source = "git+https://github.com/Cuprate/randomx-rs.git?rev=6496a61#6496a61208852a020575dafc160080cf50bda67f"
dependencies = [
"bitflags 1.3.2",
"libc",
@ -1764,12 +1702,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sct"
version = "0.7.1"
@ -1879,15 +1811,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "simple-request"
version = "0.1.0"
@ -1909,9 +1832,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.12.0"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "socket2"
@ -2055,13 +1978,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
@ -2169,7 +2089,6 @@ version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",

View file

@ -35,32 +35,36 @@ opt-level = 1
opt-level = 3
[workspace.dependencies]
async-trait = { version = "0.1.74" }
borsh = { version = "1.2.1", features = ["derive"] }
bytes = { version = "1.5.0" }
clap = { version = "4.4.7" }
chrono = { version = "0.4.31" }
crypto-bigint = { version = "0.5.3" }
curve25519-dalek = { version = "4.1.1" }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" }
dirs = { version = "5.0.1" }
futures = { version = "0.3.29" }
hex = { version = "0.4.3" }
monero-epee-bin-serde = { git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a" }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" }
randomx-rs = { version = "1.2.1" }
rand = { version = "0.8.5" }
rayon = { version = "1.8.0" }
serde_bytes = { version = "0.11.12" }
serde_json = { version = "1.0.108" }
serde = { version = "1.0.190", features = ["derive"] }
thiserror = { version = "1.0.50" }
tokio-util = { version = "0.7.10", features = ["codec"]}
tokio = { version = "1.33.0", features = ["full"] }
tower = { version = "0.4.13", features = ["util", "steer"] }
tracing-subscriber = { version = "0.3.17" }
tracing = { version = "0.1.40" }
async-trait = { version = "0.1.74", default-features = false }
borsh = { version = "1.2.1", default-features = false }
bytes = { version = "1.5.0", default-features = false }
clap = { version = "4.4.7", default-features = false }
chrono = { version = "0.4.31", default-features = false }
primitive-types = { version = "0.12.2", default-features = false }
curve25519-dalek = { version = "4.1.1", default-features = false }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b", default-features = false }
dirs = { version = "5.0.1", default-features = false }
futures = { version = "0.3.29", default-features = false }
hex = { version = "0.4.3", default-features = false }
hex-literal = { version = "0.4", default-features = false }
monero-epee-bin-serde = { git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a", default-features = false }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b", default-features = false }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b", default-features = false }
pin-project = { version = "1.1.3", default-features = false }
randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "6496a61", default-features = false }
rand = { version = "0.8.5", default-features = false }
rayon = { version = "1.8.0", default-features = false }
serde_bytes = { version = "0.11.12", default-features = false }
serde_json = { version = "1.0.108", default-features = false }
serde = { version = "1.0.190", default-features = false }
thiserror = { version = "1.0.50", default-features = false }
thread_local = { version = "1.1.7", default-features = false }
tokio-util = { version = "0.7.10", default-features = false }
tokio-stream = { version = "0.1.14", default-features = false }
tokio = { version = "1.33.0", default-features = false }
tower = { version = "0.4.13", default-features = false }
tracing-subscriber = { version = "0.3.17", default-features = false }
tracing = { version = "0.1.40", default-features = false }
## workspace.dev-dependencies
proptest = { version = "1" }

View file

@ -10,10 +10,10 @@ default = []
borsh = ["dep:borsh"]
[dependencies]
chrono = "0.4.24"
thiserror = "1"
hex = "0.4"
chrono = { workspace = true }
thiserror = { workspace = true }
hex = { workspace = true, features = ["std"] }
futures = "0.3.29"
futures = { workspace = true, features = ["std"] }
borsh = {version = "1.2.1", default-features = false, features = ["derive",], optional = true }
borsh = { workspace = true, features = ["derive", "std"], optional = true }

View file

@ -8,15 +8,17 @@ authors = ["Boog900"]
repository = "https://github.com/Cuprate/cuprate/tree/main/consensus"
[features]
default = ["binaries"]
default = []
binaries = [
"tokio/rt-multi-thread",
"tokio/macros",
"dep:tracing-subscriber",
"tower/retry",
"tower/balance",
"tower/buffer",
"tower/timeout",
"monero-serai/http-rpc",
"dep:curve25519-dalek",
"dep:tracing-subscriber",
"dep:serde_json",
"dep:serde",
"dep:monero-epee-bin-serde",
@ -27,27 +29,23 @@ binaries = [
]
[dependencies]
cuprate-common = {path = "../common"}
monero-consensus = {path = "./rules", features = ["rayon"]}
thiserror = "1"
tower = {version = "0.4", features = ["util"]}
tracing = "0.1"
futures = "0.3"
thiserror = { workspace = true }
tower = { workspace = true, features = ["util"] }
tracing = { workspace = true, features = ["std", "attributes"] }
futures = { workspace = true, features = ["std"] }
crypto-bigint = "0.5"
curve25519-dalek = "4"
randomx-rs = {git = "https://github.com/Cuprate/randomx-rs.git"}
monero-serai = { workspace = true }
randomx-rs = { workspace = true }
monero-serai = { workspace = true, features = ["std"] }
multiexp = { workspace = true }
dalek-ff-group = { workspace = true }
cuprate-common = {path = "../common"}
rayon = "1"
thread_local = "1.1.7"
tokio = "1"
tokio-util = "0.7"
rayon = { workspace = true }
thread_local = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
hex = "0.4"
@ -60,6 +58,7 @@ tracing-subscriber = {version = "0.3", optional = true}
borsh = { workspace = true, optional = true}
dirs = {version="5.0", optional = true}
clap = { version = "4.4.8", optional = true, features = ["derive"] }
curve25519-dalek = { workspace = true, optional = true }
# here to help cargo to pick a version - remove me
syn = "2.0.37"
@ -67,6 +66,6 @@ syn = "2.0.37"
[dev-dependencies]
monero-consensus = {path = "./rules", features = ["proptest"]}
tokio = {version = "1", features = ["rt-multi-thread", "macros"]}
proptest = "1"
proptest-derive = "0.4.0"
tokio = { workspace = true, features = ["rt-multi-thread", "macros"]}
proptest = { workspace = true }
proptest-derive = { workspace = true }

View file

@ -12,19 +12,18 @@ rayon = ["dep:rayon"]
cryptonight-cuprate = {path = "../../cryptonight"}
cuprate-common = {path = "../../common"}
monero-serai = { workspace = true }
multiexp = { workspace = true }
dalek-ff-group = { workspace = true }
zeroize = "1.7"
curve25519-dalek = { workspace = true }
monero-serai = { workspace = true, features = ["std"] }
multiexp = { workspace = true, features = ["std", "batch"] }
dalek-ff-group = { workspace = true, features = ["std"] }
curve25519-dalek = { workspace = true, features = ["alloc", "zeroize", "precomputed-tables"] }
rand = { workspace = true }
rand = { workspace = true, features = ["std"] }
hex = "0.4"
hex-literal = "0.4"
primitive-types = { version = "0.12.2", default-features = false }
hex = { workspace = true, features = ["std"] }
hex-literal = { workspace = true }
primitive-types = { workspace = true } # enabiling std pulls in 9 extra crates!
tracing = { workspace = true }
tracing = { workspace = true, features = ["std"] }
thiserror = { workspace = true }
rayon = { workspace = true, optional = true }

View file

@ -1,506 +1,518 @@
#![cfg(feature = "binaries")]
use std::{
collections::{HashMap, HashSet},
ops::Range,
path::PathBuf,
sync::Arc,
};
use clap::Parser;
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use monero_serai::{block::Block, transaction::Transaction};
use tokio::sync::RwLock;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
use cuprate_common::Network;
use cuprate_consensus::{
block::PrePreparedBlockExPOW,
context::{
BlockChainContextRequest, BlockChainContextResponse, ContextConfig,
UpdateBlockchainCacheData,
},
initialize_blockchain_context, initialize_verifier,
randomx::RandomXVM,
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation,
VerifyBlockRequest, VerifyBlockResponse,
};
use monero_consensus::{blocks::randomx_seed_height, HardFork};
mod tx_pool;
const MAX_BLOCKS_IN_RANGE: u64 = 500;
const BATCHES_IN_REQUEST: u64 = 3;
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000;
#[cfg(feature = "binaries")]
mod bin {
use std::{
collections::{HashMap, HashSet},
ops::Range,
path::PathBuf,
sync::Arc,
};
/// Calls for a batch of blocks, returning the response and the time it took.
async fn call_batch<D: Database>(
range: Range<u64>,
database: D,
) -> Result<DatabaseResponse, tower::BoxError> {
database
.oneshot(DatabaseRequest::BlockBatchInRange(range))
.await
}
use clap::Parser;
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use monero_serai::{block::Block, transaction::Transaction};
use tokio::sync::RwLock;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
async fn update_cache_and_context<Ctx>(
cache: &RwLock<ScanningCache>,
context_updater: &mut Ctx,
verified_block_info: VerifiedBlockInformation,
) -> Result<(), tower::BoxError>
where
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
{
// add the new block to the cache
cache.write().await.add_new_block_data(
verified_block_info.generated_coins,
&verified_block_info.block.miner_tx,
&verified_block_info.txs,
);
// update the chain context svc with the new block
context_updater
.ready()
.await?
.call(BlockChainContextRequest::Update(
UpdateBlockchainCacheData {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
cumulative_difficulty: verified_block_info.cumulative_difficulty,
},
))
.await?;
use cuprate_common::Network;
Ok(())
}
use cuprate_consensus::{
block::PrePreparedBlockExPOW,
context::{
BlockChainContextRequest, BlockChainContextResponse, ContextConfig,
UpdateBlockchainCacheData,
},
initialize_blockchain_context, initialize_verifier,
randomx::RandomXVM,
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation,
VerifyBlockRequest, VerifyBlockResponse,
};
async fn call_blocks<D>(
mut new_tx_chan: tx_pool::NewTxChanSen,
mut block_chan: mpsc::Sender<Vec<Block>>,
start_height: u64,
chain_height: u64,
database: D,
) -> Result<(), tower::BoxError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let mut next_fut = tokio::spawn(call_batch(
start_height..(start_height + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST)).min(chain_height),
database.clone(),
));
use monero_consensus::{blocks::randomx_seed_height, HardFork};
for next_batch_start in (start_height..chain_height)
.step_by((MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST) as usize)
.skip(1)
use super::tx_pool;
const MAX_BLOCKS_IN_RANGE: u64 = 500;
const BATCHES_IN_REQUEST: u64 = 3;
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000;
/// Calls for a batch of blocks, returning the response and the time it took.
async fn call_batch<D: Database>(
range: Range<u64>,
database: D,
) -> Result<DatabaseResponse, tower::BoxError> {
database
.oneshot(DatabaseRequest::BlockBatchInRange(range))
.await
}
async fn update_cache_and_context<Ctx>(
cache: &RwLock<ScanningCache>,
context_updater: &mut Ctx,
verified_block_info: VerifiedBlockInformation,
) -> Result<(), tower::BoxError>
where
Ctx: tower::Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
{
// Call the next batch while we handle this batch.
let current_fut = std::mem::replace(
&mut next_fut,
tokio::spawn(call_batch(
next_batch_start
..(next_batch_start + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST))
.min(chain_height),
database.clone(),
)),
// add the new block to the cache
cache.write().await.add_new_block_data(
verified_block_info.generated_coins,
&verified_block_info.block.miner_tx,
&verified_block_info.txs,
);
// update the chain context svc with the new block
context_updater
.ready()
.await?
.call(BlockChainContextRequest::Update(
UpdateBlockchainCacheData {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
cumulative_difficulty: verified_block_info.cumulative_difficulty,
},
))
.await?;
let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
panic!("Database sent incorrect response!");
};
Ok(())
}
tracing::info!(
"Got batch: {:?}, chain height: {}",
(next_batch_start - (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST))..(next_batch_start),
chain_height
);
async fn call_blocks<D>(
mut new_tx_chan: tx_pool::NewTxChanSen,
mut block_chan: mpsc::Sender<Vec<Block>>,
start_height: u64,
chain_height: u64,
database: D,
) -> Result<(), tower::BoxError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let mut next_fut = tokio::spawn(call_batch(
start_height
..(start_height + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST)).min(chain_height),
database.clone(),
));
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
for next_batch_start in (start_height..chain_height)
.step_by((MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST) as usize)
.skip(1)
{
// Call the next batch while we handle this batch.
let current_fut = std::mem::replace(
&mut next_fut,
tokio::spawn(call_batch(
next_batch_start
..(next_batch_start + (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST))
.min(chain_height),
database.clone(),
)),
);
let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap();
let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
panic!("Database sent incorrect response!");
};
let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) {
vec![(
txs.into_iter().flatten().collect::<Vec<_>>(),
blocks.first().map(hf).unwrap(),
)]
} else {
let mut txs_hfs: Vec<(Vec<Transaction>, HardFork)> = Vec::new();
let mut last_hf = blocks.first().map(hf).unwrap();
tracing::info!(
"Got batch: {:?}, chain height: {}",
(next_batch_start - (MAX_BLOCKS_IN_RANGE * BATCHES_IN_REQUEST))..(next_batch_start),
chain_height
);
txs_hfs.push((vec![], last_hf));
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) {
if current_hf == last_hf {
assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf);
txs_hfs.last_mut().unwrap().0.append(&mut txs);
} else {
txs_hfs.push((txs, current_hf));
last_hf = current_hf;
let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap();
let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) {
vec![(
txs.into_iter().flatten().collect::<Vec<_>>(),
blocks.first().map(hf).unwrap(),
)]
} else {
let mut txs_hfs: Vec<(Vec<Transaction>, HardFork)> = Vec::new();
let mut last_hf = blocks.first().map(hf).unwrap();
txs_hfs.push((vec![], last_hf));
for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) {
if current_hf == last_hf {
assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf);
txs_hfs.last_mut().unwrap().0.append(&mut txs);
} else {
txs_hfs.push((txs, current_hf));
last_hf = current_hf;
}
}
}
txs_hfs
txs_hfs
};
let (tx, rx) = oneshot::channel();
new_tx_chan.send((txs_hf, tx)).await?;
rx.await.unwrap().unwrap();
block_chan.send(blocks).await?;
}
Ok(())
}
async fn scan_chain<D>(
cache: Arc<RwLock<ScanningCache>>,
save_file: PathBuf,
_rpc_config: Arc<std::sync::RwLock<RpcConfig>>,
database: D,
net: Network,
) -> Result<(), tower::BoxError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
tracing::info!("Beginning chain scan");
// TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs.
let chain_height = 3_152_725;
tracing::info!("scanning to chain height: {}", chain_height);
let config = match net {
Network::Mainnet => ContextConfig::main_net(),
Network::Stagenet => ContextConfig::stage_net(),
Network::Testnet => ContextConfig::test_net(),
};
let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?;
let (tx, rx) = oneshot::channel();
new_tx_chan.send((txs_hf, tx)).await?;
rx.await.unwrap().unwrap();
block_chan.send(blocks).await?;
}
let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?;
Ok(())
}
let (mut block_verifier, transaction_verifier) =
initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?;
async fn scan_chain<D>(
cache: Arc<RwLock<ScanningCache>>,
save_file: PathBuf,
_rpc_config: Arc<std::sync::RwLock<RpcConfig>>,
database: D,
net: Network,
) -> Result<(), tower::BoxError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
tracing::info!("Beginning chain scan");
tx.send(transaction_verifier).map_err(|_| "").unwrap();
// TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs.
let chain_height = 3_152_725;
let start_height = cache.read().await.height;
tracing::info!("scanning to chain height: {}", chain_height);
let (block_tx, mut incoming_blocks) = mpsc::channel(3);
let config = match net {
Network::Mainnet => ContextConfig::main_net(),
Network::Stagenet => ContextConfig::stage_net(),
Network::Testnet => ContextConfig::test_net(),
};
let (mut prepped_blocks_tx, mut prepped_blocks_rx) = mpsc::channel(3);
let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?;
tokio::spawn(async move {
call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await
});
let (tx, rx) = oneshot::channel();
let BlockChainContextResponse::Context(ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("ctx svc sent wrong response!");
};
let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
let mut rx_seed_cache_initiated = false;
let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?;
let mut randomx_vms: Option<HashMap<u64, RandomXVM>> = Some(HashMap::new());
let (mut block_verifier, transaction_verifier) =
initialize_verifier(database.clone(), tx_pool_svc, ctx_svc.clone()).await?;
let mut cloned_ctx_svc = ctx_svc.clone();
tokio::spawn(async move {
while let Some(blocks) = incoming_blocks.next().await {
if blocks.last().unwrap().header.major_version >= 12 {
if !rx_seed_cache_initiated {
let BlockChainContextResponse::Context(ctx) = cloned_ctx_svc
.ready()
.await
.unwrap()
.call(BlockChainContextRequest::Get)
.await
.unwrap()
else {
panic!("ctx svc sent wrong response!");
};
rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
rx_seed_cache_initiated = true;
}
tx.send(transaction_verifier).map_err(|_| "").unwrap();
let unwrapped_rx_vms = randomx_vms.as_mut().unwrap();
let start_height = cache.read().await.height;
let (block_tx, mut incoming_blocks) = mpsc::channel(3);
let (mut prepped_blocks_tx, mut prepped_blocks_rx) = mpsc::channel(3);
tokio::spawn(async move {
call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await
});
let BlockChainContextResponse::Context(ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("ctx svc sent wrong response!");
};
let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
let mut rx_seed_cache_initiated = false;
let mut randomx_vms: Option<HashMap<u64, RandomXVM>> = Some(HashMap::new());
let mut cloned_ctx_svc = ctx_svc.clone();
tokio::spawn(async move {
while let Some(blocks) = incoming_blocks.next().await {
if blocks.last().unwrap().header.major_version >= 12 {
if !rx_seed_cache_initiated {
let BlockChainContextResponse::Context(ctx) = cloned_ctx_svc
.ready()
.await
.unwrap()
.call(BlockChainContextRequest::Get)
.await
.unwrap()
else {
panic!("ctx svc sent wrong response!");
};
rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
rx_seed_cache_initiated = true;
}
let unwrapped_rx_vms = randomx_vms.as_mut().unwrap();
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| PrePreparedBlockExPOW::new(block).unwrap())
.collect::<Vec<_>>()
})
.await;
let seeds_needed = blocks
.iter()
.map(|block| {
rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash);
randomx_seed_height(block.block.number() as u64)
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| PrePreparedBlockExPOW::new(block).unwrap())
.collect::<Vec<_>>()
})
.collect::<HashSet<_>>();
.await;
unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height));
let seeds_needed = blocks
.iter()
.map(|block| {
rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash);
randomx_seed_height(block.block.number() as u64)
})
.collect::<HashSet<_>>();
for seed_height in seeds_needed {
unwrapped_rx_vms.entry(seed_height).or_insert_with(|| {
RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap()
});
unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height));
for seed_height in seeds_needed {
unwrapped_rx_vms.entry(seed_height).or_insert_with(|| {
RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap()
});
}
let arc_rx_vms = Arc::new(randomx_vms.take().unwrap());
let cloned_arc_rx_vms = arc_rx_vms.clone();
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| {
let rx_vm = arc_rx_vms
.get(&randomx_seed_height(block.block.number() as u64))
.unwrap();
PrePreparedBlock::new_rx(block, rx_vm).unwrap()
})
.collect::<Vec<_>>()
})
.await;
randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap());
prepped_blocks_tx.send(blocks).await.unwrap();
} else {
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| PrePreparedBlock::new(block).unwrap())
.collect::<Vec<_>>()
})
.await;
prepped_blocks_tx.send(blocks).await.unwrap();
}
}
});
while let Some(incoming_blocks) = prepped_blocks_rx.next().await {
let mut height;
for block in incoming_blocks {
let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChainPrepared(block))
.await?;
height = verified_block_info.height;
if verified_block_info.height % 5000 == 0 {
tracing::info!("saving cache to: {}", save_file.display());
cache.write().await.save(&save_file).unwrap();
}
let arc_rx_vms = Arc::new(randomx_vms.take().unwrap());
let cloned_arc_rx_vms = arc_rx_vms.clone();
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| {
let rx_vm = arc_rx_vms
.get(&randomx_seed_height(block.block.number() as u64))
.unwrap();
PrePreparedBlock::new_rx(block, rx_vm).unwrap()
})
.collect::<Vec<_>>()
})
.await;
update_cache_and_context(&cache, &mut ctx_svc, verified_block_info).await?;
randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap());
prepped_blocks_tx.send(blocks).await.unwrap();
} else {
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| PrePreparedBlock::new(block).unwrap())
.collect::<Vec<_>>()
})
.await;
prepped_blocks_tx.send(blocks).await.unwrap();
if height % 200 == 0 {
tracing::info!(
"verified blocks: {:?}, chain height: {}",
0..height,
chain_height
);
}
}
}
});
while let Some(incoming_blocks) = prepped_blocks_rx.next().await {
let mut height;
for block in incoming_blocks {
let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChainPrepared(block))
.await?;
height = verified_block_info.height;
if verified_block_info.height % 5000 == 0 {
tracing::info!("saving cache to: {}", save_file.display());
cache.write().await.save(&save_file).unwrap();
}
update_cache_and_context(&cache, &mut ctx_svc, verified_block_info).await?;
if height % 200 == 0 {
tracing::info!(
"verified blocks: {:?}, chain height: {}",
0..height,
chain_height
);
}
}
Ok(())
}
Ok(())
}
#[derive(Parser)]
struct Args {
/// The log level, valid values:
/// "off", "error", "warn", "info", "debug", "trace", or a number 0-5.
#[arg(short, long, default_value = "info")]
log_level: LevelFilter,
/// The network we should scan, valid values:
/// "mainnet", "testnet", "stagenet".
#[arg(short, long, default_value = "mainnet")]
network: String,
/// A list of RPC nodes we should use.
/// Example: http://xmr-node.cakewallet.com:18081
#[arg(long)]
rpc_nodes: Vec<String>,
/// Stops the scanner from including the default list of nodes, this is not
/// recommended unless you have sufficient self defined nodes with `rpc_nodes`
#[arg(long)]
dont_use_default_nodes: bool,
/// The directory/ folder to save the scanning cache in.
/// This will default to your user cache directory.
#[arg(long)]
cache_dir: Option<PathBuf>,
async fn rayon_spawn_async<F, R>(f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(|| {
let _ = tx.send(f());
});
rx.await.expect("The sender must not be dropped")
}
#[derive(Parser)]
struct Args {
/// The log level, valid values:
/// "off", "error", "warn", "info", "debug", "trace", or a number 0-5.
#[arg(short, long, default_value = "info")]
log_level: LevelFilter,
/// The network we should scan, valid values:
/// "mainnet", "testnet", "stagenet".
#[arg(short, long, default_value = "mainnet")]
network: String,
/// A list of RPC nodes we should use.
/// Example: http://xmr-node.cakewallet.com:18081
#[arg(long)]
rpc_nodes: Vec<String>,
/// Stops the scanner from including the default list of nodes, this is not
/// recommended unless you have sufficient self defined nodes with `rpc_nodes`
#[arg(long)]
dont_use_default_nodes: bool,
/// The directory/ folder to save the scanning cache in.
/// This will default to your user cache directory.
#[arg(long)]
cache_dir: Option<PathBuf>,
}
pub async fn run() {
let args = Args::parse();
if args.dont_use_default_nodes & args.rpc_nodes.is_empty() {
panic!("Can't run scanner with no RPC nodes, see `--help` ")
}
tracing_subscriber::fmt()
.with_max_level(args.log_level)
.init();
let network = match args.network.as_str() {
"mainnet" => Network::Mainnet,
"testnet" => Network::Testnet,
"stagenet" => Network::Stagenet,
_ => panic!("Invalid network, scanner currently only supports mainnet"),
};
let mut file_for_cache = match args.cache_dir {
Some(dir) => dir,
None => dirs::cache_dir().unwrap(),
};
match network {
Network::Mainnet => file_for_cache.push("cuprate_rpc_scanning_cache.bin"),
Network::Stagenet => file_for_cache.push("cuprate_rpc_scanning_cache_stage_net.bin"),
Network::Testnet => file_for_cache.push("cuprate_rpc_scanning_cache_test_net.bin"),
}
let mut urls = if args.dont_use_default_nodes {
vec![]
} else {
match network {
Network::Mainnet => vec![
"http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
// "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
//
"http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
// "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
],
Network::Testnet => vec![
"http://testnet.xmr-tw.org:28081".to_string(),
"http://node3.monerodevs.org:28089".to_string(),
"http://node.monerodevs.org:28089".to_string(),
"http://125.229.105.12:28081".to_string(),
"http://node2.monerodevs.org:28089".to_string(),
"https://testnet.xmr.ditatompel.com".to_string(),
"http://singapore.node.xmr.pm:28081".to_string(),
//
"http://testnet.xmr-tw.org:28081".to_string(),
"http://node3.monerodevs.org:28089".to_string(),
"http://node.monerodevs.org:28089".to_string(),
"http://125.229.105.12:28081".to_string(),
"http://node2.monerodevs.org:28089".to_string(),
"https://testnet.xmr.ditatompel.com".to_string(),
"http://singapore.node.xmr.pm:28081".to_string(),
],
Network::Stagenet => vec![
"http://125.229.105.12:38081".to_string(),
"http://90.189.159.23:38089".to_string(),
"http://stagenet.xmr-tw.org:38081".to_string(),
"http://node.monerodevs.org:38089".to_string(),
"http://stagenet.community.rino.io:38081".to_string(),
"http://node2.monerodevs.org:38089".to_string(),
"http://node3.monerodevs.org:38089".to_string(),
"http://singapore.node.xmr.pm:38081".to_string(),
"https://stagenet.xmr.ditatompel.com".to_string(),
"http://3.10.182.182:38081".to_string(),
//
"http://125.229.105.12:38081".to_string(),
"http://90.189.159.23:38089".to_string(),
"http://stagenet.xmr-tw.org:38081".to_string(),
"http://node.monerodevs.org:38089".to_string(),
"http://stagenet.community.rino.io:38081".to_string(),
"http://node2.monerodevs.org:38089".to_string(),
"http://node3.monerodevs.org:38089".to_string(),
"http://singapore.node.xmr.pm:38081".to_string(),
"https://stagenet.xmr.ditatompel.com".to_string(),
"http://3.10.182.182:38081".to_string(),
],
}
};
urls.extend(args.rpc_nodes.into_iter());
let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE);
let rpc_config = Arc::new(std::sync::RwLock::new(rpc_config));
tracing::info!("Attempting to open cache at: {}", file_for_cache.display());
let cache = match ScanningCache::load(&file_for_cache) {
Ok(cache) => {
tracing::info!("Reloaded from cache, chain height: {}", cache.height);
Arc::new(RwLock::new(cache))
}
Err(_) => {
tracing::warn!("Couldn't load from cache starting from scratch");
let mut cache = ScanningCache::default();
let genesis = monero_consensus::genesis::generate_genesis_block(&network);
let total_outs = genesis
.miner_tx
.prefix
.outputs
.iter()
.map(|out| out.amount.unwrap_or(0))
.sum::<u64>();
cache.add_new_block_data(total_outs, &genesis.miner_tx, &[]);
Arc::new(RwLock::new(cache))
}
};
let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone());
scan_chain(cache, file_for_cache, rpc_config, rpc, network)
.await
.unwrap();
}
}
#[cfg(feature = "binaries")]
#[tokio::main]
async fn main() {
let args = Args::parse();
if args.dont_use_default_nodes & args.rpc_nodes.is_empty() {
panic!("Can't run scanner with no RPC nodes, see `--help` ")
}
tracing_subscriber::fmt()
.with_max_level(args.log_level)
.init();
let network = match args.network.as_str() {
"mainnet" => Network::Mainnet,
"testnet" => Network::Testnet,
"stagenet" => Network::Stagenet,
_ => panic!("Invalid network, scanner currently only supports mainnet"),
};
let mut file_for_cache = match args.cache_dir {
Some(dir) => dir,
None => dirs::cache_dir().unwrap(),
};
match network {
Network::Mainnet => file_for_cache.push("cuprate_rpc_scanning_cache.bin"),
Network::Stagenet => file_for_cache.push("cuprate_rpc_scanning_cache_stage_net.bin"),
Network::Testnet => file_for_cache.push("cuprate_rpc_scanning_cache_test_net.bin"),
}
let mut urls = if args.dont_use_default_nodes {
vec![]
} else {
match network {
Network::Mainnet => vec![
"http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
// "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
//
"http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
// "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
],
Network::Testnet => vec![
"http://testnet.xmr-tw.org:28081".to_string(),
"http://node3.monerodevs.org:28089".to_string(),
"http://node.monerodevs.org:28089".to_string(),
"http://125.229.105.12:28081".to_string(),
"http://node2.monerodevs.org:28089".to_string(),
"https://testnet.xmr.ditatompel.com".to_string(),
"http://singapore.node.xmr.pm:28081".to_string(),
//
"http://testnet.xmr-tw.org:28081".to_string(),
"http://node3.monerodevs.org:28089".to_string(),
"http://node.monerodevs.org:28089".to_string(),
"http://125.229.105.12:28081".to_string(),
"http://node2.monerodevs.org:28089".to_string(),
"https://testnet.xmr.ditatompel.com".to_string(),
"http://singapore.node.xmr.pm:28081".to_string(),
],
Network::Stagenet => vec![
"http://125.229.105.12:38081".to_string(),
"http://90.189.159.23:38089".to_string(),
"http://stagenet.xmr-tw.org:38081".to_string(),
"http://node.monerodevs.org:38089".to_string(),
"http://stagenet.community.rino.io:38081".to_string(),
"http://node2.monerodevs.org:38089".to_string(),
"http://node3.monerodevs.org:38089".to_string(),
"http://singapore.node.xmr.pm:38081".to_string(),
"https://stagenet.xmr.ditatompel.com".to_string(),
"http://3.10.182.182:38081".to_string(),
//
"http://125.229.105.12:38081".to_string(),
"http://90.189.159.23:38089".to_string(),
"http://stagenet.xmr-tw.org:38081".to_string(),
"http://node.monerodevs.org:38089".to_string(),
"http://stagenet.community.rino.io:38081".to_string(),
"http://node2.monerodevs.org:38089".to_string(),
"http://node3.monerodevs.org:38089".to_string(),
"http://singapore.node.xmr.pm:38081".to_string(),
"https://stagenet.xmr.ditatompel.com".to_string(),
"http://3.10.182.182:38081".to_string(),
],
}
};
urls.extend(args.rpc_nodes.into_iter());
let rpc_config = RpcConfig::new(MAX_BLOCKS_IN_RANGE, MAX_BLOCKS_HEADERS_IN_RANGE);
let rpc_config = Arc::new(std::sync::RwLock::new(rpc_config));
tracing::info!("Attempting to open cache at: {}", file_for_cache.display());
let cache = match ScanningCache::load(&file_for_cache) {
Ok(cache) => {
tracing::info!("Reloaded from cache, chain height: {}", cache.height);
Arc::new(RwLock::new(cache))
}
Err(_) => {
tracing::warn!("Couldn't load from cache starting from scratch");
let mut cache = ScanningCache::default();
let genesis = monero_consensus::genesis::generate_genesis_block(&network);
let total_outs = genesis
.miner_tx
.prefix
.outputs
.iter()
.map(|out| out.amount.unwrap_or(0))
.sum::<u64>();
cache.add_new_block_data(total_outs, &genesis.miner_tx, &[]);
Arc::new(RwLock::new(cache))
}
};
let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone());
scan_chain(cache, file_for_cache, rpc_config, rpc, network)
.await
.unwrap();
bin::run().await
}
async fn rayon_spawn_async<F, R>(f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
rayon::spawn(|| {
let _ = tx.send(f());
});
rx.await.expect("The sender must not be dropped")
}
#[cfg(not(feature = "binaries"))]
fn main() {}

View file

@ -1,138 +1,102 @@
#![cfg(feature = "binaries")]
#[cfg(feature = "binaries")]
mod bin {
use std::{
collections::HashMap,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use monero_serai::transaction::Transaction;
use tower::{Service, ServiceExt};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use monero_serai::transaction::Transaction;
use tower::{Service, ServiceExt};
use cuprate_common::tower_utils::InfallibleOneshotReceiver;
use cuprate_common::tower_utils::InfallibleOneshotReceiver;
use cuprate_consensus::{
context::{
BlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
};
use monero_consensus::HardFork;
use cuprate_consensus::{
context::{
BlockChainContext, BlockChainContextRequest, BlockChainContextResponse,
RawBlockChainContext,
},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
};
use monero_consensus::HardFork;
#[derive(Clone)]
pub struct TxPoolHandle {
tx_pool_task: std::sync::Arc<tokio::task::JoinHandle<()>>,
tx_pool_chan: mpsc::Sender<(
TxPoolRequest,
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
)>,
}
impl tower::Service<TxPoolRequest> for TxPoolHandle {
type Response = TxPoolResponse;
type Error = TxNotInPool;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.tx_pool_task.is_finished() {
panic!("Tx pool task finished before it was supposed to!");
};
self.tx_pool_chan
.poll_ready(cx)
.map_err(|_| panic!("Tx pool channel closed before it was supposed to"))
#[derive(Clone)]
pub struct TxPoolHandle {
tx_pool_task: std::sync::Arc<tokio::task::JoinHandle<()>>,
tx_pool_chan: mpsc::Sender<(
TxPoolRequest,
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
)>,
}
fn call(&mut self, req: TxPoolRequest) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.tx_pool_chan
.try_send((req, tx))
.expect("You need to use `poll_ready` to check capacity!");
impl tower::Service<TxPoolRequest> for TxPoolHandle {
type Response = TxPoolResponse;
type Error = TxNotInPool;
type Future = InfallibleOneshotReceiver<Result<Self::Response, Self::Error>>;
rx.into()
}
}
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.tx_pool_task.is_finished() {
panic!("Tx pool task finished before it was supposed to!");
};
pub type NewTxChanRec = mpsc::Receiver<(
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
self.tx_pool_chan
.poll_ready(cx)
.map_err(|_| panic!("Tx pool channel closed before it was supposed to"))
}
pub type NewTxChanSen = mpsc::Sender<(
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
fn call(&mut self, req: TxPoolRequest) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.tx_pool_chan
.try_send((req, tx))
.expect("You need to use `poll_ready` to check capacity!");
pub struct TxPool<TxV, Ctx> {
txs: Arc<Mutex<HashMap<[u8; 32], Arc<TransactionVerificationData>>>>,
current_ctx: BlockChainContext,
tx_verifier: Option<TxV>,
tx_verifier_chan: Option<oneshot::Receiver<TxV>>,
ctx_svc: Ctx,
}
impl<TxV, Ctx> TxPool<TxV, Ctx>
where
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
Ctx: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
Ctx::Future: Send + 'static,
{
pub async fn spawn(
tx_verifier_chan: oneshot::Receiver<TxV>,
mut ctx_svc: Ctx,
) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> {
let BlockChainContextResponse::Context(current_ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("Context service service returned wrong response!")
};
let tx_pool = TxPool {
txs: Default::default(),
current_ctx,
tx_verifier: None,
tx_verifier_chan: Some(tx_verifier_chan),
ctx_svc,
};
let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3);
let (new_tx_tx, new_tx_rx) = mpsc::channel(3);
let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx));
Ok((
TxPoolHandle {
tx_pool_task: tx_pool_task.into(),
tx_pool_chan: tx_pool_tx,
},
new_tx_tx,
))
rx.into()
}
}
async fn get_or_update_ctx(&mut self) -> Result<RawBlockChainContext, tower::BoxError> {
if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() {
Ok(current_ctx)
} else {
let BlockChainContextResponse::Context(current_ctx) = self
.ctx_svc
pub type NewTxChanRec = mpsc::Receiver<(
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
pub type NewTxChanSen = mpsc::Sender<(
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
pub struct TxPool<TxV, Ctx> {
txs: Arc<Mutex<HashMap<[u8; 32], Arc<TransactionVerificationData>>>>,
current_ctx: BlockChainContext,
tx_verifier: Option<TxV>,
tx_verifier_chan: Option<oneshot::Receiver<TxV>>,
ctx_svc: Ctx,
}
impl<TxV, Ctx> TxPool<TxV, Ctx>
where
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
Ctx: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Send
+ 'static,
Ctx::Future: Send + 'static,
{
pub async fn spawn(
tx_verifier_chan: oneshot::Receiver<TxV>,
mut ctx_svc: Ctx,
) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> {
let BlockChainContextResponse::Context(current_ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
@ -141,107 +105,147 @@ where
panic!("Context service service returned wrong response!")
};
self.current_ctx = current_ctx;
Ok(self.current_ctx.unchecked_blockchain_context().clone())
}
}
async fn handle_txs_req(
&mut self,
req: TxPoolRequest,
tx: oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
) {
let TxPoolRequest::Transactions(txs_to_get) = req;
let mut res = Vec::with_capacity(txs_to_get.len());
for tx_hash in txs_to_get {
let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else {
tracing::debug!("tx not in pool: {}", hex::encode(tx_hash));
let _ = tx.send(Err(TxNotInPool));
return;
let tx_pool = TxPool {
txs: Default::default(),
current_ctx,
tx_verifier: None,
tx_verifier_chan: Some(tx_verifier_chan),
ctx_svc,
};
res.push(tx)
let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3);
let (new_tx_tx, new_tx_rx) = mpsc::channel(3);
let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx));
Ok((
TxPoolHandle {
tx_pool_task: tx_pool_task.into(),
tx_pool_chan: tx_pool_tx,
},
new_tx_tx,
))
}
let _ = tx.send(Ok(TxPoolResponse::Transactions(res)));
}
async fn handle_new_txs(
&mut self,
new_txs: Vec<(Vec<Transaction>, HardFork)>,
res_chan: oneshot::Sender<Result<(), tower::BoxError>>,
) -> Result<(), tower::BoxError> {
if self.tx_verifier.is_none() {
self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?);
}
let current_ctx = self.get_or_update_ctx().await?;
let mut tx_verifier = self.tx_verifier.clone().unwrap();
let tx_pool = self.txs.clone();
tokio::spawn(async move {
for (txs, hf) in new_txs {
// We only batch the setup a real tx pool would also call `VerifyTxRequest::Block`
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
async fn get_or_update_ctx(&mut self) -> Result<RawBlockChainContext, tower::BoxError> {
if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() {
Ok(current_ctx)
} else {
let BlockChainContextResponse::Context(current_ctx) = self
.ctx_svc
.ready()
.await
.unwrap()
.call(VerifyTxRequest::BatchSetup {
txs,
hf,
re_org_token: current_ctx.re_org_token.clone(),
})
.await
.unwrap()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("Tx verifier sent incorrect response!");
panic!("Context service service returned wrong response!")
};
let mut locked_pool = tx_pool.lock().unwrap();
self.current_ctx = current_ctx;
for tx in txs {
let tx_hash = tx.tx_hash;
if locked_pool.insert(tx_hash, tx).is_some() {
panic!("added same tx to pool twice: {}", hex::encode(tx_hash))
Ok(self.current_ctx.unchecked_blockchain_context().clone())
}
}
async fn handle_txs_req(
&mut self,
req: TxPoolRequest,
tx: oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
) {
let TxPoolRequest::Transactions(txs_to_get) = req;
let mut res = Vec::with_capacity(txs_to_get.len());
for tx_hash in txs_to_get {
let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else {
tracing::debug!("tx not in pool: {}", hex::encode(tx_hash));
let _ = tx.send(Err(TxNotInPool));
return;
};
res.push(tx)
}
let _ = tx.send(Ok(TxPoolResponse::Transactions(res)));
}
async fn handle_new_txs(
&mut self,
new_txs: Vec<(Vec<Transaction>, HardFork)>,
res_chan: oneshot::Sender<Result<(), tower::BoxError>>,
) -> Result<(), tower::BoxError> {
if self.tx_verifier.is_none() {
self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?);
}
let current_ctx = self.get_or_update_ctx().await?;
let mut tx_verifier = self.tx_verifier.clone().unwrap();
let tx_pool = self.txs.clone();
tokio::spawn(async move {
for (txs, hf) in new_txs {
// We only batch the setup a real tx pool would also call `VerifyTxRequest::Block`
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
.ready()
.await
.unwrap()
.call(VerifyTxRequest::BatchSetup {
txs,
hf,
re_org_token: current_ctx.re_org_token.clone(),
})
.await
.unwrap()
else {
panic!("Tx verifier sent incorrect response!");
};
let mut locked_pool = tx_pool.lock().unwrap();
for tx in txs {
let tx_hash = tx.tx_hash;
if locked_pool.insert(tx_hash, tx).is_some() {
panic!("added same tx to pool twice: {}", hex::encode(tx_hash))
}
}
}
}
res_chan.send(Ok(())).unwrap();
});
Ok(())
}
res_chan.send(Ok(())).unwrap();
});
Ok(())
}
pub async fn run(
mut self,
mut tx_pool_handle: mpsc::Receiver<(
TxPoolRequest,
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
)>,
mut new_tx_channel: NewTxChanRec,
) {
loop {
tokio::select! {
biased;
new_txs = new_tx_channel.next() => {
let Some(new_txs) = new_txs else {
todo!("Shutdown txpool")
};
pub async fn run(
mut self,
mut tx_pool_handle: mpsc::Receiver<(
TxPoolRequest,
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
)>,
mut new_tx_channel: NewTxChanRec,
) {
loop {
tokio::select! {
biased;
new_txs = new_tx_channel.next() => {
let Some(new_txs) = new_txs else {
todo!("Shutdown txpool")
};
self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap()
}
pool_req = tx_pool_handle.next() => {
let Some((req, tx)) = pool_req else {
todo!("Shutdown txpool")
};
self.handle_txs_req(req, tx).await;
self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap()
}
pool_req = tx_pool_handle.next() => {
let Some((req, tx)) = pool_req else {
todo!("Shutdown txpool")
};
self.handle_txs_req(req, tx).await;
}
}
}
}
}
}
#[cfg(feature = "binaries")]
pub use bin::*;
#[allow(dead_code)]
fn main() {}

View file

@ -14,8 +14,8 @@ time = ["dep:chrono", "std"]
thread = ["std", "dep:target_os_lib"]
[dependencies]
chrono = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
chrono = { workspace = true, optional = true, features = ["std", "clock"] }
futures = { workspace = true, optional = true, features = ["std"] }
rayon = { workspace = true, optional = true }
# This is kinda a stupid work around.

View file

@ -11,20 +11,21 @@ cuprate-common = {path = "../../common"}
monero-wire = {path= "../../net/monero-wire"}
monero-p2p = {path = "../monero-p2p" }
tower = { version= "0.4.13", features = ["util", "buffer"] }
tokio = {version = "1.34.0", default-features = false, features = ["time", "fs", "rt"]}
tower = { workspace = true, features = ["util", "buffer"] }
tokio = { workspace = true, features = ["time", "fs", "rt"]}
futures = "0.3.29"
pin-project = "1.1.3"
async-trait = "0.1.74"
futures = { workspace = true, features = ["std"] }
pin-project = { workspace = true }
async-trait = { workspace = true }
thiserror = "1.0.50"
tracing = "0.1.40"
thiserror = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
rand = "0.8.5"
rand = { workspace = true, features = ["std", "std_rng"] }
borsh = {version = "1.2.1", features = ["derive"]}
borsh = { workspace = true, features = ["derive", "std"]}
[dev-dependencies]
tokio = {version = "1.34.0", features = ["rt-multi-thread", "macros"]}
cuprate-test-utils = {path = "../../test-utils"}
tokio = { workspace = true, features = ["rt-multi-thread", "macros"]}

View file

@ -13,21 +13,21 @@ borsh = ["dep:borsh"]
monero-wire = {path= "../../net/monero-wire"}
cuprate-common = {path = "../../common", features = ["borsh"]}
tokio = {version= "1.34.0", default-features = false, features = ["net", "sync"]}
tokio-util = { version = "0.7.10", default-features = false, features = ["codec"] }
tokio-stream = {version = "0.1.14", default-features = false, features = ["sync"]}
futures = "0.3.29"
async-trait = "0.1.74"
tower = { version= "0.4.13", features = ["util"] }
tokio = { workspace = true, features = ["net", "sync", "macros"]}
tokio-util = { workspace = true, features = ["codec"] }
tokio-stream = { workspace = true, features = ["sync"]}
futures = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
tower = { workspace = true, features = ["util"] }
thiserror = "1.0.50"
tracing = "0.1.40"
thiserror = { workspace = true }
tracing = { workspace = true, features = ["std"] }
borsh = {version = "1.2.1", default-features = false, features = ["derive", "std"], optional = true }
borsh = { workspace = true, default-features = false, features = ["derive", "std"], optional = true }
[dev-dependencies]
cuprate-test-utils = {path = "../../test-utils"}
hex = "0.4.3"
tokio = {version= "1.34.0", default-features = false, features = ["net", "rt-multi-thread", "rt", "macros"]}
tracing-subscriber = "0.3"
hex = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]}
tracing-subscriber = { workspace = true }

View file

@ -7,7 +7,7 @@ edition = "2021"
monero-wire = {path = "../net/monero-wire"}
monero-p2p = {path = "../p2p/monero-p2p" }
futures = "0.3.29"
async-trait = "0.1.74"
futures = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
borsh = {version = "1.2.1", features = ["derive"]}
borsh = { workspace = true, features = ["derive"]}