diff --git a/Cargo.lock b/Cargo.lock index 1cd13f8b..6a13ba4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -445,6 +445,7 @@ checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" dependencies = [ "anstyle", "clap_lex", + "strsim", "terminal_size", ] @@ -1233,6 +1234,7 @@ dependencies = [ "toml", "tower 0.5.1", "tracing", + "tracing-appender", "tracing-subscriber", ] @@ -1268,7 +1270,7 @@ dependencies = [ [[package]] name = "dalek-ff-group" version = "0.4.1" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "crypto-bigint", "curve25519-dalek", @@ -1295,6 +1297,15 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "diff" version = "0.1.13" @@ -1424,7 +1435,7 @@ dependencies = [ [[package]] name = "flexible-transcript" version = "0.3.2" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "blake2", "digest", @@ -2174,7 +2185,7 @@ dependencies = [ [[package]] name = "monero-address" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "monero-io", @@ -2187,7 +2198,7 @@ dependencies = [ [[package]] name = "monero-borromean" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "monero-generators", @@ -2200,7 +2211,7 @@ dependencies = [ [[package]] name = "monero-bulletproofs" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "monero-generators", @@ -2215,7 +2226,7 @@ dependencies = [ [[package]] name = "monero-clsag" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "dalek-ff-group", @@ -2235,7 +2246,7 @@ dependencies = [ [[package]] name = "monero-generators" version = "0.4.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "dalek-ff-group", @@ -2249,7 +2260,7 @@ dependencies = [ [[package]] name = "monero-io" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "std-shims", @@ -2258,7 +2269,7 @@ dependencies = [ [[package]] name = "monero-mlsag" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "monero-generators", @@ -2272,7 +2283,7 @@ dependencies = [ [[package]] name = "monero-primitives" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "monero-generators", @@ -2285,9 +2296,8 @@ dependencies = [ [[package]] name = "monero-rpc" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ - "async-trait", "curve25519-dalek", "hex", "monero-address", @@ -2302,7 +2312,7 @@ dependencies = [ [[package]] name = "monero-serai" version = "0.1.4-alpha" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "curve25519-dalek", "hex-literal", @@ -2320,9 +2330,8 @@ dependencies = [ [[package]] name = "monero-simple-request-rpc" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ - "async-trait", "digest_auth", "hex", "monero-rpc", @@ -2340,6 +2349,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-traits" version = "0.2.19" @@ -2536,6 +2551,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -3087,7 +3108,7 @@ dependencies = [ [[package]] name = "simple-request" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "http-body-util", "hyper", @@ -3153,12 +3174,18 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] name = "std-shims" version = "0.1.1" -source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79" +source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66" dependencies = [ "hashbrown 0.14.5", "spin", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.26.3" @@ -3295,6 +3322,37 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a693d0c8cf16973fac5a93fbe47b8c6452e7097d4fcac49f3d7a18e39c76e62e" +[[package]] +name = "time" +version = "0.3.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -3388,6 +3446,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "slab", "tokio", @@ -3494,6 +3554,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.28" diff --git a/Cargo.toml b/Cargo.toml index 792bded5..d0ab5714 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,13 +55,14 @@ members = [ ] [profile.release] -panic = "abort" +panic = "abort" lto = true # Build with LTO strip = "none" # Keep panic stack traces codegen-units = 1 # Optimize for binary speed over compile times opt-level = 3 [profile.dev] +panic = "abort" lto = false strip = "none" # Not much slower compile times than opt-level 0, but much faster code. @@ -128,7 +129,7 @@ futures = { version = "0.3", default-features = false } hex = { version = "0.4", default-features = false } hex-literal = { version = "0.4", default-features = false } indexmap = { version = "2", default-features = false } -monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "d5205ce", default-features = false } +monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "e6fdef6", default-features = false } paste = { version = "1", default-features = false } pin-project = { version = "1", default-features = false } randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false } @@ -146,14 +147,15 @@ tokio-stream = { version = "0.1", default-features = false } tokio = { version = "1", default-features = false } tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } # toml = { version = "0.8", default-features = false } +tracing-appender = { version = "0.2", default-features = false } tracing-subscriber = { version = "0.3", default-features = false } tracing = { version = "0.1", default-features = false } ## workspace.dev-dependencies criterion = { version = "0.5" } function_name = { version = "0.3" } -monero-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "d5205ce" } -monero-simple-request-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "d5205ce" } +monero-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "e6fdef6" } +monero-simple-request-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "e6fdef6" } tempfile = { version = "3" } pretty_assertions = { version = "1" } proptest = { version = "1" } diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index acf8827b..b50091e7 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -14,7 +14,7 @@ cuprate-fast-sync = { workspace = true } cuprate-consensus-context = { workspace = true } cuprate-consensus-rules = { workspace = true } cuprate-cryptonight = { workspace = true } -cuprate-helper = { workspace = true, features = ["serde"] } +cuprate-helper = { workspace = true, features = ["std", "serde", "time"] } cuprate-epee-encoding = { workspace = true } cuprate-fixed-bytes = { workspace = true } cuprate-levin = { workspace = true } @@ -44,7 +44,7 @@ borsh = { workspace = true } bytemuck = { workspace = true } bytes = { workspace = true } cfg-if = { workspace = true } -clap = { workspace = true, features = ["cargo", "help", "wrap_help"] } +clap = { workspace = true, features = ["cargo", "help", "wrap_help", "usage", "error-context", "suggestions"] } chrono = { workspace = true } crypto-bigint = { workspace = true } crossbeam = { workspace = true } @@ -68,11 +68,12 @@ serde_json = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } thread_local = { workspace = true } -tokio-util = { workspace = true } +tokio-util = { workspace = true, features = ["rt"] } tokio-stream = { workspace = true } tokio = { workspace = true } toml = { workspace = true, features = ["parse", "display"]} tower = { workspace = true } +tracing-appender = { workspace = true } tracing-subscriber = { workspace = true, features = ["std", "fmt", "default"] } tracing = { workspace = true, features = ["default"] } diff --git a/binaries/cuprated/Cuprated.toml b/binaries/cuprated/Cuprated.toml index d248ce1f..5987813a 100644 --- a/binaries/cuprated/Cuprated.toml +++ b/binaries/cuprated/Cuprated.toml @@ -11,8 +11,10 @@ network = "Mainnet" ## Tracing config. [tracing] -## The minimum level for log events to be displayed. -level = "info" +## The stdout logging config. +stdout = { level = "info" } +## The file output logging config. +file = { level = "debug", max_log_files = 7 } ## Clear-net config. [p2p.clear_net] @@ -45,15 +47,10 @@ buffer_bytes = 50_000_000 ## The size of the queue of blocks which are waiting for a parent block to be downloaded (bytes). in_progress_queue_bytes = 50_000_000 ## The target size of a batch of blocks (bytes), must not exceed 100MB. -target_batch_bytes= 5_000_000 +target_batch_bytes = 10_000_000 ## The amount of time between checking the pool of connected peers for free peers to download blocks. check_client_pool_interval = { secs = 30, nanos = 0 } -## Storage config -[storage] -## The amount of reader threads to spawn. -reader_threads = "OnePerThread" - ## Txpool storage config. [storage.txpool] ## The database sync mode for the txpool. diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index c4b75e4e..63bbded3 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -25,6 +25,7 @@ mod manager; mod syncer; mod types; +pub use manager::init_blockchain_manager; pub use types::{ ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle, }; diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 5d1cd2d6..382a0ff1 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -9,7 +9,7 @@ use rayon::prelude::*; use std::ops::ControlFlow; use std::{collections::HashMap, sync::Arc}; use tower::{Service, ServiceExt}; -use tracing::info; +use tracing::{info, instrument}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_consensus::{ @@ -120,6 +120,15 @@ impl super::BlockchainManager { /// /// This function will panic if the batch is empty or if any internal service returns an unexpected /// error that we cannot recover from or if the incoming batch contains no blocks. + #[instrument( + name = "incoming_block_batch", + skip_all, + level = "info", + fields( + start_height = batch.blocks.first().unwrap().0.number().unwrap(), + len = batch.blocks.len() + ) + )] pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { let (first_block, _) = batch .blocks @@ -146,11 +155,6 @@ impl super::BlockchainManager { /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from or if the incoming batch contains no blocks. async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { - info!( - "Handling batch to main chain height: {}", - batch.blocks.first().unwrap().0.number().unwrap() - ); - let batch_prep_res = self .block_verifier_service .ready() @@ -192,6 +196,7 @@ impl super::BlockchainManager { self.add_valid_block_to_main_chain(verified_block).await; } + info!("Successfully added block batch"); } /// Handles an incoming [`BlockBatch`] that does not follow the main-chain. diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index 69ad3303..7733ca3d 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -10,6 +10,7 @@ use tower::{Service, ServiceExt}; use tracing::instrument; use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_consensus_context::RawBlockChainContext; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, NetworkInterface, PeerSetRequest, PeerSetResponse, @@ -73,20 +74,7 @@ where check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); - let PeerSetResponse::MostPoWSeen { - cumulative_difficulty, - .. - } = clearnet_interface - .peer_set() - .ready() - .await? - .call(PeerSetRequest::MostPoWSeen) - .await? - else { - unreachable!(); - }; - - if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { + if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? { continue; } @@ -99,11 +87,18 @@ where loop { tokio::select! { () = stop_current_block_downloader.notified() => { - tracing::info!("Stopping block downloader"); + tracing::info!("Received stop signal, stopping block downloader"); break; } batch = block_batch_stream.next() => { let Some(batch) = batch else { + check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; + let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + + if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? { + tracing::info!("Synchronised with the network."); + } + break; }; @@ -117,6 +112,31 @@ where } } +/// Returns `true` if we are behind the current connected network peers. +async fn check_behind_peers( + raw_blockchain_context: &RawBlockChainContext, + mut clearnet_interface: &mut NetworkInterface, +) -> Result { + let PeerSetResponse::MostPoWSeen { + cumulative_difficulty, + .. + } = clearnet_interface + .peer_set() + .ready() + .await? + .call(PeerSetRequest::MostPoWSeen) + .await? + else { + unreachable!(); + }; + + if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { + return Ok(false); + } + + Ok(true) +} + /// Checks if we should update the given [`BlockChainContext`] and updates it if needed. async fn check_update_blockchain_context( context_svc: C, diff --git a/binaries/cuprated/src/commands.rs b/binaries/cuprated/src/commands.rs new file mode 100644 index 00000000..5958036c --- /dev/null +++ b/binaries/cuprated/src/commands.rs @@ -0,0 +1,137 @@ +//! Commands +//! +//! `cuprated` [`Command`] definition and handling. +use std::{io, thread::sleep, time::Duration}; + +use clap::{builder::TypedValueParser, Parser, ValueEnum}; +use tokio::sync::mpsc; +use tower::{Service, ServiceExt}; +use tracing::level_filters::LevelFilter; + +use cuprate_consensus_context::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, +}; +use cuprate_helper::time::secs_to_hms; + +use crate::{ + constants::PANIC_CRITICAL_SERVICE_ERROR, + logging::{self, CupratedTracingFilter}, + statics, +}; + +/// A command received from [`io::stdin`]. +#[derive(Debug, Parser)] +#[command( + multicall = true, + subcommand_required = true, + rename_all = "snake_case", + help_template = "{all-args}", + arg_required_else_help = true, + disable_help_flag = true +)] +pub enum Command { + /// Change the log output. + #[command(arg_required_else_help = true)] + SetLog { + /// The minimum log level that will be displayed. + #[arg( + short, long, + value_parser = clap::builder::PossibleValuesParser::new(["off", "trace", "debug", "info", "warn", "error"]) + .map(|s| s.parse::().unwrap()), + )] + level: Option, + /// The logging output target to change. + #[arg(value_enum, default_value_t)] + output_target: OutputTarget, + }, + + /// Print status information on `cuprated`. + Status, +} + +/// The log output target. +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +pub enum OutputTarget { + /// The stdout logging output. + #[default] + Stdout, + /// The file appender logging output. + File, +} + +/// The [`Command`] listener loop. +pub fn command_listener(incoming_commands: mpsc::Sender) -> ! { + let mut stdin = io::stdin(); + let mut line = String::new(); + + loop { + line.clear(); + + if let Err(e) = stdin.read_line(&mut line) { + eprintln!("Failed to read from stdin: {e}"); + sleep(Duration::from_secs(1)); + continue; + } + + match Command::try_parse_from(line.split_whitespace()) { + Ok(command) => drop( + incoming_commands + .blocking_send(command) + .inspect_err(|err| eprintln!("Failed to send command: {err}")), + ), + Err(err) => err.print().unwrap(), + } + } +} + +/// The [`Command`] handler loop. +pub async fn io_loop( + mut incoming_commands: mpsc::Receiver, + mut context_service: BlockChainContextService, +) { + loop { + let Some(command) = incoming_commands.recv().await else { + tracing::warn!("Shutting down io_loop command channel closed."); + return; + }; + + match command { + Command::SetLog { + level, + output_target, + } => { + let modify_output = |filter: &mut CupratedTracingFilter| { + if let Some(level) = level { + filter.level = level; + } + println!("NEW LOG FILTER: {filter}"); + }; + + match output_target { + OutputTarget::File => logging::modify_file_output(modify_output), + OutputTarget::Stdout => logging::modify_stdout_output(modify_output), + } + } + Command::Status => { + let BlockChainContextResponse::Context(blockchain_context) = context_service + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockChainContextRequest::Context) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!(); + }; + let context = blockchain_context.unchecked_blockchain_context(); + let uptime = statics::START_INSTANT.elapsed().unwrap_or_default(); + + let (h, m, s) = secs_to_hms(uptime.as_secs()); + let height = context.chain_height; + let top_hash = hex::encode(context.top_hash); + + println!("STATUS:\n uptime: {h}h {m}m {s}s,\n height: {height},\n top_hash: {top_hash}"); + } + } + } +} diff --git a/binaries/cuprated/src/config.rs b/binaries/cuprated/src/config.rs index c6267a69..c218c1da 100644 --- a/binaries/cuprated/src/config.rs +++ b/binaries/cuprated/src/config.rs @@ -20,12 +20,16 @@ use cuprate_p2p_core::{ClearNet, ClearNetServerCfg}; mod args; mod fs; mod p2p; +mod rayon; mod storage; +mod tokio; mod tracing_config; -use crate::config::fs::FileSystemConfig; +use fs::FileSystemConfig; use p2p::P2PConfig; +use rayon::RayonConfig; use storage::StorageConfig; +use tokio::TokioConfig; use tracing_config::TracingConfig; /// Reads the args & config file, returning a [`Config`]. @@ -65,22 +69,26 @@ pub fn read_config_and_args() -> Config { } /// The config for all of Cuprate. -#[derive(Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct Config { /// The network we should run on. network: Network, /// [`tracing`] config. - tracing: TracingConfig, + pub tracing: TracingConfig, + + pub tokio: TokioConfig, + + pub rayon: RayonConfig, /// The P2P network config. p2p: P2PConfig, /// The storage config. - storage: StorageConfig, + pub storage: StorageConfig, - fs: FileSystemConfig, + pub fs: FileSystemConfig, } impl Config { @@ -152,6 +160,18 @@ impl Config { .build() } + /// The [`cuprate_txpool`] config. + pub fn txpool_config(&self) -> cuprate_txpool::config::Config { + let txpool = &self.storage.txpool; + + // We don't set reader threads as we manually make the reader threadpool. + cuprate_txpool::config::ConfigBuilder::default() + .network(self.network) + .data_directory(self.fs.data_directory.clone()) + .sync_mode(txpool.shared.sync_mode) + .build() + } + /// The [`BlockDownloaderConfig`]. pub fn block_downloader_config(&self) -> BlockDownloaderConfig { self.p2p.block_downloader.clone().into() diff --git a/binaries/cuprated/src/config/fs.rs b/binaries/cuprated/src/config/fs.rs index f8f61307..85e70f46 100644 --- a/binaries/cuprated/src/config/fs.rs +++ b/binaries/cuprated/src/config/fs.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use cuprate_helper::fs::{CUPRATE_CACHE_DIR, CUPRATE_DATA_DIR}; -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] #[serde(deny_unknown_fields, default)] pub struct FileSystemConfig { pub data_directory: PathBuf, diff --git a/binaries/cuprated/src/config/p2p.rs b/binaries/cuprated/src/config/p2p.rs index 51f8d0d6..da9c7bc2 100644 --- a/binaries/cuprated/src/config/p2p.rs +++ b/binaries/cuprated/src/config/p2p.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use cuprate_helper::{fs::address_book_path, network::Network}; /// P2P config. -#[derive(Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct P2PConfig { /// Clear-net config. @@ -18,7 +18,7 @@ pub struct P2PConfig { pub block_downloader: BlockDownloaderConfig, } -#[derive(Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct BlockDownloaderConfig { /// The size in bytes of the buffer between the block downloader and the place which @@ -50,13 +50,13 @@ impl Default for BlockDownloaderConfig { buffer_bytes: 50_000_000, in_progress_queue_bytes: 50_000_000, check_client_pool_interval: Duration::from_secs(30), - target_batch_bytes: 5_000_000, + target_batch_bytes: 10_000_000, } } } /// The config values for P2P clear-net. -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct ClearNetConfig { /// The server config. @@ -75,7 +75,7 @@ impl Default for ClearNetConfig { } /// Network config values shared between all network zones. -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct SharedNetConfig { /// The number of outbound connections to make and try keep. @@ -121,7 +121,7 @@ impl Default for SharedNetConfig { } } -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct AddressBookConfig { max_white_list_length: usize, @@ -134,7 +134,7 @@ impl Default for AddressBookConfig { Self { max_white_list_length: 1_000, max_gray_list_length: 5_000, - peer_save_period: Duration::from_secs(30), + peer_save_period: Duration::from_secs(90), } } } diff --git a/binaries/cuprated/src/config/rayon.rs b/binaries/cuprated/src/config/rayon.rs new file mode 100644 index 00000000..5e10efaa --- /dev/null +++ b/binaries/cuprated/src/config/rayon.rs @@ -0,0 +1,17 @@ +use serde::{Deserialize, Serialize}; + +/// The [`rayon`] config. +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(deny_unknown_fields, default)] +pub struct RayonConfig { + /// The number of threads to use for the [`rayon::ThreadPool`]. + pub threads: usize, +} + +impl Default for RayonConfig { + fn default() -> Self { + Self { + threads: cuprate_helper::thread::threads_75().get(), + } + } +} diff --git a/binaries/cuprated/src/config/storage.rs b/binaries/cuprated/src/config/storage.rs index b3e3c1f4..53dce35e 100644 --- a/binaries/cuprated/src/config/storage.rs +++ b/binaries/cuprated/src/config/storage.rs @@ -7,19 +7,29 @@ use cuprate_database_service::ReaderThreads; use cuprate_helper::fs::CUPRATE_DATA_DIR; /// The storage config. -#[derive(Default, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] #[serde(deny_unknown_fields, default)] pub struct StorageConfig { /// The amount of reader threads to spawn between the tx-pool and blockchain. - pub reader_threads: ReaderThreads, + pub reader_threads: usize, /// The tx-pool config. pub txpool: TxpoolConfig, /// The blockchain config. pub blockchain: BlockchainConfig, } +impl Default for StorageConfig { + fn default() -> Self { + Self { + reader_threads: cuprate_helper::thread::threads_25().get(), + txpool: Default::default(), + blockchain: Default::default(), + } + } +} + /// The blockchain config. -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] #[serde(deny_unknown_fields, default)] pub struct BlockchainConfig { #[serde(flatten)] @@ -37,7 +47,7 @@ impl Default for BlockchainConfig { } /// The tx-pool config. -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] #[serde(deny_unknown_fields, default)] pub struct TxpoolConfig { #[serde(flatten)] @@ -59,7 +69,7 @@ impl Default for TxpoolConfig { } /// Config values shared between the tx-pool and blockchain. -#[derive(Default, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, PartialEq, Eq)] #[serde(deny_unknown_fields, default)] pub struct SharedStorageConfig { /// The [`SyncMode`] of the database. diff --git a/binaries/cuprated/src/config/tokio.rs b/binaries/cuprated/src/config/tokio.rs new file mode 100644 index 00000000..9f106018 --- /dev/null +++ b/binaries/cuprated/src/config/tokio.rs @@ -0,0 +1,17 @@ +use serde::{Deserialize, Serialize}; + +/// [`tokio`] config. +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(deny_unknown_fields, default)] +pub struct TokioConfig { + /// The amount of threads to spawn for the async thread-pool + pub threads: usize, +} + +impl Default for TokioConfig { + fn default() -> Self { + Self { + threads: cuprate_helper::thread::threads_75().get(), + } + } +} diff --git a/binaries/cuprated/src/config/tracing_config.rs b/binaries/cuprated/src/config/tracing_config.rs index 859d516a..26089512 100644 --- a/binaries/cuprated/src/config/tracing_config.rs +++ b/binaries/cuprated/src/config/tracing_config.rs @@ -2,15 +2,22 @@ use serde::{Deserialize, Serialize}; use tracing::level_filters::LevelFilter; /// [`tracing`] config. -#[derive(Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize, Eq, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct TracingConfig { - /// The default minimum log level. - #[serde(with = "level_filter_serde")] - level: LevelFilter, + pub stdout: StdoutTracingConfig, + pub file: FileTracingConfig, } -impl Default for TracingConfig { +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(deny_unknown_fields, default)] +pub struct StdoutTracingConfig { + /// The default minimum log level. + #[serde(with = "level_filter_serde")] + pub level: LevelFilter, +} + +impl Default for StdoutTracingConfig { fn default() -> Self { Self { level: LevelFilter::INFO, @@ -18,6 +25,26 @@ impl Default for TracingConfig { } } +#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(deny_unknown_fields, default)] +pub struct FileTracingConfig { + /// The default minimum log level. + #[serde(with = "level_filter_serde")] + pub level: LevelFilter, + /// The maximum amount of log files to keep, once this number is passed the oldest file + /// will be deleted. + pub max_log_files: usize, +} + +impl Default for FileTracingConfig { + fn default() -> Self { + Self { + level: LevelFilter::DEBUG, + max_log_files: 7, + } + } +} + mod level_filter_serde { use std::str::FromStr; diff --git a/binaries/cuprated/src/constants.rs b/binaries/cuprated/src/constants.rs index 057e8bd0..24d7273a 100644 --- a/binaries/cuprated/src/constants.rs +++ b/binaries/cuprated/src/constants.rs @@ -42,5 +42,7 @@ mod test { #[test] fn generate_config_text_is_valid() { let config: Config = toml::from_str(EXAMPLE_CONFIG).unwrap(); + + assert_eq!(config, Config::default()); } } diff --git a/binaries/cuprated/src/logging.rs b/binaries/cuprated/src/logging.rs new file mode 100644 index 00000000..72bac7fb --- /dev/null +++ b/binaries/cuprated/src/logging.rs @@ -0,0 +1,142 @@ +//! Logging +//! +//! `cuprated` log filtering settings and related functionality. +use std::ops::BitAnd; +use std::{ + fmt::{Display, Formatter}, + mem::forget, + sync::OnceLock, +}; +use tracing::{ + instrument::WithSubscriber, level_filters::LevelFilter, subscriber::Interest, Metadata, +}; +use tracing_appender::{non_blocking::NonBlocking, rolling::Rotation}; +use tracing_subscriber::{ + filter::Filtered, + fmt::{ + self, + format::{DefaultFields, Format}, + Layer as FmtLayer, + }, + layer::{Context, Filter, Layered, SubscriberExt}, + reload::{Handle, Layer as ReloadLayer}, + util::SubscriberInitExt, + Layer, Registry, +}; + +use cuprate_helper::fs::logs_path; + +use crate::config::Config; + +/// A [`OnceLock`] which holds the [`Handle`] to update the file logging output. +/// +/// Initialized in [`init_logging`]. +static FILE_WRITER_FILTER_HANDLE: OnceLock> = + OnceLock::new(); + +/// A [`OnceLock`] which holds the [`Handle`] to update the stdout logging output. +/// +/// Initialized in [`init_logging`]. +#[expect(clippy::type_complexity)] // factoring out isn't going to help readability. +static STDOUT_FILTER_HANDLE: OnceLock< + Handle< + CupratedTracingFilter, + Layered< + Filtered< + FmtLayer, + ReloadLayer, + Registry, + >, + Registry, + Registry, + >, + >, +> = OnceLock::new(); + +/// The [`Filter`] used to alter cuprated's log output. +#[derive(Debug)] +pub struct CupratedTracingFilter { + pub level: LevelFilter, +} + +// Custom display behavior for command output. +impl Display for CupratedTracingFilter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Filter") + .field("minimum_level", &self.level.to_string()) + .finish() + } +} + +impl Filter for CupratedTracingFilter { + fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool { + Filter::::enabled(&self.level, meta, cx) + } + + fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest { + Filter::::callsite_enabled(&self.level, meta) + } + + fn max_level_hint(&self) -> Option { + Some(self.level) + } +} + +/// Initialize [`tracing`] for logging to stdout and to a file. +pub fn init_logging(config: &Config) { + // initialize the stdout filter, set `STDOUT_FILTER_HANDLE` and create the layer. + let (stdout_filter, stdout_handle) = ReloadLayer::new(CupratedTracingFilter { + level: config.tracing.stdout.level, + }); + + STDOUT_FILTER_HANDLE.set(stdout_handle).unwrap(); + + let stdout_layer = FmtLayer::default() + .with_target(false) + .with_filter(stdout_filter); + + // create the tracing appender. + let appender_config = &config.tracing.file; + let (appender, guard) = tracing_appender::non_blocking( + tracing_appender::rolling::Builder::new() + .rotation(Rotation::DAILY) + .max_log_files(appender_config.max_log_files) + .build(logs_path(&config.fs.data_directory, config.network())) + .unwrap(), + ); + + // TODO: drop this when we shutdown. + forget(guard); + + // initialize the appender filter, set `FILE_WRITER_FILTER_HANDLE` and create the layer. + let (appender_filter, appender_handle) = ReloadLayer::new(CupratedTracingFilter { + level: appender_config.level, + }); + FILE_WRITER_FILTER_HANDLE.set(appender_handle).unwrap(); + + let appender_layer = fmt::layer() + .with_target(false) + .with_ansi(false) + .with_writer(appender) + .with_filter(appender_filter); + + // initialize tracing with the 2 layers. + tracing_subscriber::registry() + .with(appender_layer) + .with(stdout_layer) + .init(); +} + +/// Modify the stdout [`CupratedTracingFilter`]. +/// +/// Must only be called after [`init_logging`]. +pub fn modify_stdout_output(f: impl FnOnce(&mut CupratedTracingFilter)) { + STDOUT_FILTER_HANDLE.get().unwrap().modify(f).unwrap(); +} + +/// Modify the file appender [`CupratedTracingFilter`]. +/// +/// Must only be called after [`init_logging`]. +pub fn modify_file_output(f: impl FnOnce(&mut CupratedTracingFilter)) { + FILE_WRITER_FILTER_HANDLE.get().unwrap().modify(f).unwrap(); +} diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 617c5b61..d5f9ec0e 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -16,9 +16,27 @@ reason = "TODO: remove after v1.0.0" )] +use std::mem; +use std::sync::Arc; +use tokio::sync::mpsc; +use tower::{Service, ServiceExt}; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry}; + +use cuprate_consensus_context::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, +}; +use cuprate_helper::time::secs_to_hms; + +use crate::{ + config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR, logging::CupratedTracingFilter, +}; + mod blockchain; +mod commands; mod config; mod constants; +mod logging; mod p2p; mod rpc; mod signals; @@ -29,8 +47,109 @@ fn main() { // Initialize global static `LazyLock` data. statics::init_lazylock_statics(); - let _config = config::read_config_and_args(); + let config = config::read_config_and_args(); - // TODO: everything else. - todo!() + // Initialize logging. + logging::init_logging(&config); + + // Initialize the thread-pools + + init_global_rayon_pool(&config); + + let rt = init_tokio_rt(&config); + + let db_thread_pool = cuprate_database_service::init_thread_pool( + cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads), + ); + + // Start the blockchain & tx-pool databases. + + let (mut blockchain_read_handle, mut blockchain_write_handle, _) = + cuprate_blockchain::service::init_with_pool( + config.blockchain_config(), + Arc::clone(&db_thread_pool), + ) + .unwrap(); + let (txpool_read_handle, txpool_write_handle, _) = + cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool).unwrap(); + + // Initialize async tasks. + + rt.block_on(async move { + // Check add the genesis block to the blockchain. + blockchain::check_add_genesis( + &mut blockchain_read_handle, + &mut blockchain_write_handle, + config.network(), + ) + .await; + + // Start the context service and the block/tx verifier. + let (block_verifier, tx_verifier, context_svc) = + blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) + .await + .unwrap(); + + // Start clearnet P2P. + let (clearnet, incoming_tx_handler_tx) = p2p::start_clearnet_p2p( + blockchain_read_handle.clone(), + context_svc.clone(), + txpool_read_handle.clone(), + config.clearnet_p2p_config(), + ) + .await + .unwrap(); + + // Create the incoming tx handler service. + let tx_handler = txpool::IncomingTxHandler::init( + clearnet.clone(), + txpool_write_handle.clone(), + txpool_read_handle, + context_svc.clone(), + tx_verifier, + ); + if incoming_tx_handler_tx.send(tx_handler).is_err() { + unreachable!() + } + + // Initialize the blockchain manager. + blockchain::init_blockchain_manager( + clearnet, + blockchain_write_handle, + blockchain_read_handle, + txpool_write_handle, + context_svc.clone(), + block_verifier, + config.block_downloader_config(), + ) + .await; + + // Start the command listener. + let (command_tx, command_rx) = mpsc::channel(1); + std::thread::spawn(|| commands::command_listener(command_tx)); + + // Wait on the io_loop, spawned on a separate task as this improves performance. + tokio::spawn(commands::io_loop(command_rx, context_svc)) + .await + .unwrap(); + }); +} + +/// Initialize the [`tokio`] runtime. +fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(config.tokio.threads) + .thread_name("cuprated-tokio") + .enable_all() + .build() + .unwrap() +} + +/// Initialize the global [`rayon`] thread-pool. +fn init_global_rayon_pool(config: &Config) { + rayon::ThreadPoolBuilder::new() + .num_threads(config.rayon.threads) + .thread_name(|index| format!("cuprated-rayon-{index}")) + .build_global() + .unwrap(); } diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 7d72fa37..620c9df3 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -238,7 +238,7 @@ async fn get_chain( split_u128_into_low_high_bits(cumulative_difficulty); Ok(ProtocolResponse::GetChain(ChainResponse { - start_height: usize_to_u64(std::num::NonZero::get(start_height)), + start_height: usize_to_u64(start_height), total_height: usize_to_u64(chain_height), cumulative_difficulty_low64, cumulative_difficulty_top64, diff --git a/binaries/cuprated/src/txpool/dandelion/stem_service.rs b/binaries/cuprated/src/txpool/dandelion/stem_service.rs index 2debfd42..1a288f3d 100644 --- a/binaries/cuprated/src/txpool/dandelion/stem_service.rs +++ b/binaries/cuprated/src/txpool/dandelion/stem_service.rs @@ -53,6 +53,8 @@ impl Stream for OutboundPeerStream { OutboundPeerStreamState::AwaitingPeer(fut) => { let res = ready!(fut.poll_unpin(cx)); + self.state = OutboundPeerStreamState::Standby; + return Poll::Ready(Some(res.map(|res| { let PeerSetResponse::StemPeer(stem_peer) = res else { unreachable!() diff --git a/helper/src/fs.rs b/helper/src/fs.rs index f694f62d..3eb168e3 100644 --- a/helper/src/fs.rs +++ b/helper/src/fs.rs @@ -200,6 +200,21 @@ pub fn txpool_path(data_dir: &Path, network: Network) -> PathBuf { path_with_network(data_dir, network).join("txpool") } +/// Cuprate's logs directory. +/// +/// This is the PATH used for all Cuprate log files. +/// +/// ```rust +/// use cuprate_helper::{network::Network, fs::{CUPRATE_DATA_DIR, logs_path}}; +/// +/// assert_eq!(logs_path(&**CUPRATE_DATA_DIR, Network::Mainnet).as_path(), CUPRATE_DATA_DIR.join("logs")); +/// assert_eq!(logs_path(&**CUPRATE_DATA_DIR, Network::Stagenet).as_path(), CUPRATE_DATA_DIR.join(Network::Stagenet.to_string()).join("logs")); +/// assert_eq!(logs_path(&**CUPRATE_DATA_DIR, Network::Testnet).as_path(), CUPRATE_DATA_DIR.join(Network::Testnet.to_string()).join("logs")); +/// ``` +pub fn logs_path(data_dir: &Path, network: Network) -> PathBuf { + path_with_network(data_dir, network).join("logs") +} + /// Cuprate's address-book directory. /// /// This is the PATH used for any Cuprate address-book files. diff --git a/helper/src/thread.rs b/helper/src/thread.rs index 8ba025de..45890d9c 100644 --- a/helper/src/thread.rs +++ b/helper/src/thread.rs @@ -36,21 +36,21 @@ macro_rules! impl_thread_percent { clippy::cast_precision_loss, reason = "we need to round integers" )] - NonZeroUsize::new(max(1, (threads().get() as f64 * $percent).floor() as usize)).unwrap() + NonZeroUsize::new(max(1, (threads().get() as f64 * $percent).ceil() as usize)).unwrap() } )* } } impl_thread_percent! { - /// Get 90% (rounded down) of available amount of system threads. + /// Get 90% (rounded up) of available amount of system threads. threads_90 => 0.90, - /// Get 75% (rounded down) of available amount of system threads. + /// Get 75% (rounded up) of available amount of system threads. threads_75 => 0.75, - /// Get 50% (rounded down) of available amount of system threads. + /// Get 50% (rounded up) of available amount of system threads. threads_50 => 0.50, - /// Get 25% (rounded down) of available amount of system threads. + /// Get 25% (rounded up) of available amount of system threads. threads_25 => 0.25, - /// Get 10% (rounded down) of available amount of system threads. + /// Get 10% (rounded up) of available amount of system threads. threads_10 => 0.10, } diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs index 3e5269f5..66811dbf 100644 --- a/p2p/address-book/src/book.rs +++ b/p2p/address-book/src/book.rs @@ -302,9 +302,6 @@ impl AddressBook { if peb.pruning_seed != peer.pruning_seed { return Err(AddressBookError::PeersDataChanged("Pruning seed")); } - if Z::CHECK_NODE_ID && peb.id != peer.id { - return Err(AddressBookError::PeersDataChanged("peer ID")); - } // TODO: cuprate doesn't need last seen timestamps but should we have them anyway? peb.last_seen = 0; peb.rpc_port = peer.rpc_port; diff --git a/p2p/address-book/src/store.rs b/p2p/address-book/src/store.rs index 47994ae5..a1233162 100644 --- a/p2p/address-book/src/store.rs +++ b/p2p/address-book/src/store.rs @@ -39,10 +39,13 @@ pub(crate) fn save_peers_to_disk( }) .unwrap(); - let file = cfg - .peer_store_directory - .join(format!("{}_p2p_state", Z::NAME)); - spawn_blocking(move || fs::write(&file, &data)) + let dir = cfg.peer_store_directory.clone(); + let file = dir.join(Z::NAME); + + spawn_blocking(move || { + fs::create_dir_all(dir)?; + fs::write(&file, &data) + }) } pub(crate) async fn read_peers_from_disk( @@ -54,9 +57,7 @@ pub(crate) async fn read_peers_from_disk( ), std::io::Error, > { - let file = cfg - .peer_store_directory - .join(format!("{}_p2p_state", Z::NAME)); + let file = cfg.peer_store_directory.join(Z::NAME); tracing::info!("Loading peers from file: {} ", file.display()); diff --git a/p2p/p2p/src/block_downloader/download_batch.rs b/p2p/p2p/src/block_downloader/download_batch.rs index 7b6e4c96..dd952185 100644 --- a/p2p/p2p/src/block_downloader/download_batch.rs +++ b/p2p/p2p/src/block_downloader/download_batch.rs @@ -2,7 +2,6 @@ use std::collections::HashSet; use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; -use tokio::time::timeout; use tower::{Service, ServiceExt}; use tracing::instrument; @@ -16,7 +15,7 @@ use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; use crate::{ block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse}, - constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN}, + constants::{MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN}, peer_set::ClientDropGuard, }; @@ -60,17 +59,15 @@ async fn request_batch_from_peer( })); // Request the blocks and add a timeout to the request - let blocks_response = timeout(BLOCK_DOWNLOADER_REQUEST_TIMEOUT, async { + let blocks_response = { let PeerResponse::Protocol(ProtocolResponse::GetObjects(blocks_response)) = client.ready().await?.call(request).await? else { panic!("Connection task returned wrong response."); }; - Ok::<_, BlockDownloadError>(blocks_response) - }) - .await - .map_err(|_| BlockDownloadError::TimedOut)??; + blocks_response + }; // Initial sanity checks if blocks_response.blocks.len() > ids.len() { diff --git a/p2p/p2p/src/inbound_server.rs b/p2p/p2p/src/inbound_server.rs index 0479560b..a6ed9b40 100644 --- a/p2p/p2p/src/inbound_server.rs +++ b/p2p/p2p/src/inbound_server.rs @@ -115,8 +115,11 @@ where tokio::spawn( async move { let client = timeout(HANDSHAKE_TIMEOUT, fut).await; - if let Ok(Ok(peer)) = client { - drop(new_connection_tx.send(peer).await); + + match client { + Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await), + Err(_) => tracing::debug!("Timed out"), + Ok(Err(e)) => tracing::debug!("error: {e:?}"), } } .instrument(Span::current()), diff --git a/storage/blockchain/src/service/free.rs b/storage/blockchain/src/service/free.rs index 7cc8da8a..8d26207b 100644 --- a/storage/blockchain/src/service/free.rs +++ b/storage/blockchain/src/service/free.rs @@ -3,13 +3,15 @@ //---------------------------------------------------------------------------------------------------- Import use std::sync::Arc; +use rayon::ThreadPool; + use cuprate_database::{ConcreteEnv, InitError}; use cuprate_types::{AltBlockInformation, VerifiedBlockInformation}; use crate::{ config::Config, service::{ - init_read_service, init_write_service, + init_read_service, init_read_service_with_pool, init_write_service, types::{BlockchainReadHandle, BlockchainWriteHandle}, }, }; @@ -46,6 +48,39 @@ pub fn init( Ok((readers, writer, db)) } +#[cold] +#[inline(never)] // Only called once (?) +/// Initialize a database, and return a read/write handle to it. +/// +/// Unlike [`init`] this will not create a thread-pool, instead using +/// the one passed in. +/// +/// Once the returned handles are [`Drop::drop`]ed, the reader +/// thread-pool and writer thread will exit automatically. +/// +/// # Errors +/// This will forward the error if [`crate::open`] failed. +pub fn init_with_pool( + config: Config, + pool: Arc, +) -> Result< + ( + BlockchainReadHandle, + BlockchainWriteHandle, + Arc, + ), + InitError, +> { + // Initialize the database itself. + let db = Arc::new(crate::open(config)?); + + // Spawn the Reader thread pool and Writer. + let readers = init_read_service_with_pool(Arc::clone(&db), pool); + let writer = init_write_service(Arc::clone(&db)); + + Ok((readers, writer, db)) +} + //---------------------------------------------------------------------------------------------------- Compact history /// Given a position in the compact history, returns the height offset that should be in that position. /// diff --git a/storage/blockchain/src/service/mod.rs b/storage/blockchain/src/service/mod.rs index d6a811bd..abe66f27 100644 --- a/storage/blockchain/src/service/mod.rs +++ b/storage/blockchain/src/service/mod.rs @@ -128,7 +128,7 @@ mod write; pub use write::init_write_service; mod free; -pub use free::init; +pub use free::{init, init_with_pool}; mod types; pub use types::{BlockchainReadHandle, BlockchainWriteHandle}; diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 863f9ab0..45c5aa6f 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -631,7 +631,7 @@ fn next_chain_entry( }; Ok(BlockchainResponse::NextChainEntry { - start_height: std::num::NonZero::new(first_known_height), + start_height: Some(first_known_height), chain_height, block_ids, block_weights, diff --git a/storage/txpool/src/service.rs b/storage/txpool/src/service.rs index 03ce2f03..08981a40 100644 --- a/storage/txpool/src/service.rs +++ b/storage/txpool/src/service.rs @@ -128,5 +128,5 @@ mod read; mod types; mod write; -pub use free::init; +pub use free::{init, init_with_pool}; pub use types::{TxpoolReadHandle, TxpoolWriteHandle}; diff --git a/storage/txpool/src/service/free.rs b/storage/txpool/src/service/free.rs index 003da552..1bb15cd0 100644 --- a/storage/txpool/src/service/free.rs +++ b/storage/txpool/src/service/free.rs @@ -1,10 +1,12 @@ use std::sync::Arc; +use rayon::ThreadPool; + use cuprate_database::{ConcreteEnv, InitError}; use crate::{ service::{ - read::init_read_service, + read::{init_read_service, init_read_service_with_pool}, types::{TxpoolReadHandle, TxpoolWriteHandle}, write::init_write_service, }, @@ -35,3 +37,29 @@ pub fn init( Ok((readers, writer, db)) } + +#[cold] +#[inline(never)] // Only called once (?) +/// Initialize a database, and return a read/write handle to it. +/// +/// Unlike [`init`] this will not create a thread-pool, instead using +/// the one passed in. +/// +/// Once the returned handles are [`Drop::drop`]ed, the reader +/// thread-pool and writer thread will exit automatically. +/// +/// # Errors +/// This will forward the error if [`crate::open`] failed. +pub fn init_with_pool( + config: Config, + pool: Arc, +) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc), InitError> { + // Initialize the database itself. + let db = Arc::new(crate::open(config)?); + + // Spawn the Reader thread pool and Writer. + let readers = init_read_service_with_pool(Arc::clone(&db), pool); + let writer = init_write_service(Arc::clone(&db)); + + Ok((readers, writer, db)) +} diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 44a29b3c..5fc20dd1 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -44,7 +44,10 @@ pub(super) fn init_read_service(env: Arc, threads: ReaderThreads) - /// Should be called _once_ per actual database. #[cold] #[inline(never)] // Only called once. -fn init_read_service_with_pool(env: Arc, pool: Arc) -> TxpoolReadHandle { +pub(super) fn init_read_service_with_pool( + env: Arc, + pool: Arc, +) -> TxpoolReadHandle { DatabaseReadService::new(env, pool, map_request) } diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 7518935d..58a23b2a 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -288,7 +288,7 @@ pub enum BlockchainResponse { /// If all blocks were unknown `start_height` will be [`None`], the other fields will be meaningless. NextChainEntry { /// The start height of this entry, [`None`] if we could not find the split point. - start_height: Option>, + start_height: Option, /// The current chain height. chain_height: usize, /// The next block hashes in the entry.