From ed822eb165c30fbceea6c9b07b571313ed8ba866 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sat, 21 Dec 2024 03:34:04 +0000 Subject: [PATCH] cleanup main + logging --- binaries/cuprated/src/config.rs | 2 +- binaries/cuprated/src/config/p2p.rs | 2 +- binaries/cuprated/src/config/storage.rs | 5 +- binaries/cuprated/src/logging.rs | 71 ++++++++++++++++--------- binaries/cuprated/src/main.rs | 44 ++++++++++----- p2p/p2p/src/inbound_server.rs | 2 +- 6 files changed, 82 insertions(+), 44 deletions(-) diff --git a/binaries/cuprated/src/config.rs b/binaries/cuprated/src/config.rs index f5343597..f3f55bf5 100644 --- a/binaries/cuprated/src/config.rs +++ b/binaries/cuprated/src/config.rs @@ -86,7 +86,7 @@ pub struct Config { p2p: P2PConfig, /// The storage config. - storage: StorageConfig, + pub storage: StorageConfig, pub fs: FileSystemConfig, } diff --git a/binaries/cuprated/src/config/p2p.rs b/binaries/cuprated/src/config/p2p.rs index cc6b5ab9..47dd2654 100644 --- a/binaries/cuprated/src/config/p2p.rs +++ b/binaries/cuprated/src/config/p2p.rs @@ -56,7 +56,7 @@ impl Default for BlockDownloaderConfig { } /// The config values for P2P clear-net. -#[derive(Debug,Deserialize, Serialize, PartialEq)] +#[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(deny_unknown_fields, default)] pub struct ClearNetConfig { /// The server config. diff --git a/binaries/cuprated/src/config/storage.rs b/binaries/cuprated/src/config/storage.rs index cd330804..b09dd142 100644 --- a/binaries/cuprated/src/config/storage.rs +++ b/binaries/cuprated/src/config/storage.rs @@ -21,7 +21,10 @@ pub struct StorageConfig { impl Default for StorageConfig { fn default() -> Self { Self { - reader_threads: std::thread::available_parallelism().unwrap().get().div_ceil(4), + reader_threads: std::thread::available_parallelism() + .unwrap() + .get() + .div_ceil(4), txpool: Default::default(), blockchain: Default::default(), } diff --git a/binaries/cuprated/src/logging.rs b/binaries/cuprated/src/logging.rs index b91bfad8..12079bc7 100644 --- a/binaries/cuprated/src/logging.rs +++ b/binaries/cuprated/src/logging.rs @@ -1,28 +1,35 @@ -use crate::config::Config; -use cuprate_helper::fs::logs_path; -use std::mem::forget; -use std::sync::OnceLock; -use tracing::instrument::WithSubscriber; -use tracing::level_filters::LevelFilter; -use tracing::subscriber::Interest; -use tracing::Metadata; -use tracing_appender::non_blocking::NonBlocking; -use tracing_appender::rolling::Rotation; -use tracing_subscriber::filter::Filtered; -use tracing_subscriber::fmt::format::{DefaultFields, Format}; -use tracing_subscriber::layer::{Context, Layered, SubscriberExt}; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::{ - fmt::Layer as FmtLayer, - layer::Filter, - reload::{Handle, Layer as ReloadLayer}, - Layer, -}; -use tracing_subscriber::{reload, Registry}; +use std::{mem::forget, sync::OnceLock}; +use cuprate_helper::fs::logs_path; + +use tracing::{ + instrument::WithSubscriber, level_filters::LevelFilter, subscriber::Interest, Metadata, +}; +use tracing_appender::{non_blocking::NonBlocking, rolling::Rotation}; +use tracing_subscriber::{ + filter::Filtered, + fmt::{ + self, + format::{DefaultFields, Format}, + Layer as FmtLayer, + }, + layer::{Context, Filter, Layered, SubscriberExt}, + reload::{Handle, Layer as ReloadLayer}, + util::SubscriberInitExt, + Layer, Registry, +}; + +use crate::config::Config; + +/// A [`OnceLock`] which holds the [`Handle`] to update the file logging output. +/// +/// Initialized in [`init_logging`]. static FILE_WRITER_FILTER_HANDLE: OnceLock> = OnceLock::new(); +/// A [`OnceLock`] which holds the [`Handle`] to update the stdout logging output. +/// +/// Initialized in [`init_logging`]. static STDOUT_FILTER_HANDLE: OnceLock< Handle< CupratedTracingFilter, @@ -38,6 +45,7 @@ static STDOUT_FILTER_HANDLE: OnceLock< >, > = OnceLock::new(); +/// The [`Filter`] used to alter cuprated's log output. pub struct CupratedTracingFilter { pub level: LevelFilter, } @@ -56,18 +64,20 @@ impl Filter for CupratedTracingFilter { } } +/// Initialize [`tracing`] for logging to stdout and to a file. pub fn init_logging(config: &Config) { - use tracing_subscriber::{fmt, Layer}; - - let (stdout_filter, stdout_handle) = reload::Layer::new(CupratedTracingFilter { + // initialize the stdout filter, set `STDOUT_FILTER_HANDLE` and create the layer. + let (stdout_filter, stdout_handle) = ReloadLayer::new(CupratedTracingFilter { level: config.tracing.stdout.level, }); + drop(STDOUT_FILTER_HANDLE.set(stdout_handle)); - let stdout_layer = fmt::Layer::default() + let stdout_layer = FmtLayer::default() .with_target(false) .with_filter(stdout_filter); + // create the tracing appender. let appender_config = &config.tracing.file; let (appender, guard) = tracing_appender::non_blocking( tracing_appender::rolling::Builder::new() @@ -77,9 +87,11 @@ pub fn init_logging(config: &Config) { .unwrap(), ); + // TODO: drop this when we shutdown. forget(guard); - let (appender_filter, appender_handle) = reload::Layer::new(CupratedTracingFilter { + // initialize the appender filter, set `FILE_WRITER_FILTER_HANDLE` and create the layer. + let (appender_filter, appender_handle) = ReloadLayer::new(CupratedTracingFilter { level: appender_config.level, }); drop(FILE_WRITER_FILTER_HANDLE.set(appender_handle)); @@ -90,16 +102,23 @@ pub fn init_logging(config: &Config) { .with_writer(appender) .with_filter(appender_filter); + // initialize tracing with the 2 layers. tracing_subscriber::registry() .with(appender_layer) .with(stdout_layer) .init(); } +/// Modify the stdout [`CupratedTracingFilter`]. +/// +/// Must only be called after [`init_logging`]. pub fn modify_stdout_output(f: impl FnOnce(&mut CupratedTracingFilter)) { STDOUT_FILTER_HANDLE.get().unwrap().modify(f).unwrap(); } +/// Modify the file appender [`CupratedTracingFilter`]. +/// +/// Must only be called after [`init_logging`]. pub fn modify_file_output(f: impl FnOnce(&mut CupratedTracingFilter)) { FILE_WRITER_FILTER_HANDLE.get().unwrap().modify(f).unwrap(); } diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index fd07d9aa..52bde99c 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -16,20 +16,17 @@ reason = "TODO: remove after v1.0.0" )] -use crate::commands::Command; -use crate::config::Config; -use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; +use tokio::sync::mpsc; +use tower::{Service, ServiceExt}; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::{Registry, util::SubscriberInitExt, reload::Handle, layer::SubscriberExt}; + use cuprate_consensus_context::{ BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, }; use cuprate_helper::time::secs_to_hms; -use tokio::sync::mpsc; -use tower::{Service, ServiceExt}; -use tracing::level_filters::LevelFilter; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::reload::Handle; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::Registry; + +use crate::{commands::Command, config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR}; mod blockchain; mod commands; @@ -48,16 +45,21 @@ fn main() { let config = config::read_config_and_args(); - init_global_rayon_pool(&config); - + // Initialize logging. logging::init_logging(&config); + // Initialize the thread-pools + + init_global_rayon_pool(&config); + let rt = init_tokio_rt(&config); let db_thread_pool = cuprate_database_service::init_thread_pool( - cuprate_database_service::ReaderThreads::Percent(0.3), + cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads), ); + // Start the blockchain & tx-pool databases. + let (mut blockchain_read_handle, mut blockchain_write_handle, _) = cuprate_blockchain::service::init_with_pool( config.blockchain_config(), @@ -67,7 +69,10 @@ fn main() { let (txpool_read_handle, txpool_write_handle, _) = cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool).unwrap(); + // Initialize async tasks. + rt.block_on(async move { + // Check add the genesis block to the blockchain. blockchain::check_add_genesis( &mut blockchain_read_handle, &mut blockchain_write_handle, @@ -75,11 +80,13 @@ fn main() { ) .await; + // Start the context service and the block/tx verifier. let (block_verifier, tx_verifier, context_svc) = blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) .await .unwrap(); + // Start clearnet P2P. let (clearnet, incoming_tx_handler_tx) = p2p::start_clearnet_p2p( blockchain_read_handle.clone(), context_svc.clone(), @@ -89,6 +96,7 @@ fn main() { .await .unwrap(); + // Create the incoming tx handler service. let tx_handler = txpool::IncomingTxHandler::init( clearnet.clone(), txpool_write_handle.clone(), @@ -100,6 +108,7 @@ fn main() { unreachable!() } + // Initialize the blockchain manager. blockchain::init_blockchain_manager( clearnet, blockchain_write_handle, @@ -111,13 +120,18 @@ fn main() { ) .await; + // Start the command listener. let (command_tx, command_rx) = mpsc::channel(1); std::thread::spawn(|| commands::command_listener(command_tx)); - io_loop(command_rx, context_svc).await; + // Wait on the io_loop, spawned on a separate task as this improves performance. + tokio::spawn(io_loop(command_rx, context_svc)) + .await + .unwrap(); }); } +/// Initialize the [`tokio`] runtime. fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime { tokio::runtime::Builder::new_multi_thread() .worker_threads(config.tokio.threads) @@ -126,6 +140,7 @@ fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime { .unwrap() } +/// Initialize the global [`rayon`] thread-pool. fn init_global_rayon_pool(config: &Config) { rayon::ThreadPoolBuilder::new() .num_threads(config.rayon.threads) @@ -133,6 +148,7 @@ fn init_global_rayon_pool(config: &Config) { .unwrap() } +/// The [`Command`] handler loop. async fn io_loop( mut incoming_commands: mpsc::Receiver, mut context_service: BlockChainContextService, diff --git a/p2p/p2p/src/inbound_server.rs b/p2p/p2p/src/inbound_server.rs index 3669dfd2..a6ed9b40 100644 --- a/p2p/p2p/src/inbound_server.rs +++ b/p2p/p2p/src/inbound_server.rs @@ -119,7 +119,7 @@ where match client { Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await), Err(_) => tracing::debug!("Timed out"), - Ok(Err(e)) => tracing::debug!("error: {e:?}") + Ok(Err(e)) => tracing::debug!("error: {e:?}"), } } .instrument(Span::current()),