cleanup main + logging

This commit is contained in:
Boog900 2024-12-21 03:34:04 +00:00
parent 17a8065286
commit ed822eb165
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
6 changed files with 82 additions and 44 deletions

View file

@ -86,7 +86,7 @@ pub struct Config {
p2p: P2PConfig, p2p: P2PConfig,
/// The storage config. /// The storage config.
storage: StorageConfig, pub storage: StorageConfig,
pub fs: FileSystemConfig, pub fs: FileSystemConfig,
} }

View file

@ -21,7 +21,10 @@ pub struct StorageConfig {
impl Default for StorageConfig { impl Default for StorageConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
reader_threads: std::thread::available_parallelism().unwrap().get().div_ceil(4), reader_threads: std::thread::available_parallelism()
.unwrap()
.get()
.div_ceil(4),
txpool: Default::default(), txpool: Default::default(),
blockchain: Default::default(), blockchain: Default::default(),
} }

View file

@ -1,28 +1,35 @@
use crate::config::Config; use std::{mem::forget, sync::OnceLock};
use cuprate_helper::fs::logs_path;
use std::mem::forget;
use std::sync::OnceLock;
use tracing::instrument::WithSubscriber;
use tracing::level_filters::LevelFilter;
use tracing::subscriber::Interest;
use tracing::Metadata;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::rolling::Rotation;
use tracing_subscriber::filter::Filtered;
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::layer::{Context, Layered, SubscriberExt};
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{
fmt::Layer as FmtLayer,
layer::Filter,
reload::{Handle, Layer as ReloadLayer},
Layer,
};
use tracing_subscriber::{reload, Registry};
use cuprate_helper::fs::logs_path;
use tracing::{
instrument::WithSubscriber, level_filters::LevelFilter, subscriber::Interest, Metadata,
};
use tracing_appender::{non_blocking::NonBlocking, rolling::Rotation};
use tracing_subscriber::{
filter::Filtered,
fmt::{
self,
format::{DefaultFields, Format},
Layer as FmtLayer,
},
layer::{Context, Filter, Layered, SubscriberExt},
reload::{Handle, Layer as ReloadLayer},
util::SubscriberInitExt,
Layer, Registry,
};
use crate::config::Config;
/// A [`OnceLock`] which holds the [`Handle`] to update the file logging output.
///
/// Initialized in [`init_logging`].
static FILE_WRITER_FILTER_HANDLE: OnceLock<Handle<CupratedTracingFilter, Registry>> = static FILE_WRITER_FILTER_HANDLE: OnceLock<Handle<CupratedTracingFilter, Registry>> =
OnceLock::new(); OnceLock::new();
/// A [`OnceLock`] which holds the [`Handle`] to update the stdout logging output.
///
/// Initialized in [`init_logging`].
static STDOUT_FILTER_HANDLE: OnceLock< static STDOUT_FILTER_HANDLE: OnceLock<
Handle< Handle<
CupratedTracingFilter, CupratedTracingFilter,
@ -38,6 +45,7 @@ static STDOUT_FILTER_HANDLE: OnceLock<
>, >,
> = OnceLock::new(); > = OnceLock::new();
/// The [`Filter`] used to alter cuprated's log output.
pub struct CupratedTracingFilter { pub struct CupratedTracingFilter {
pub level: LevelFilter, pub level: LevelFilter,
} }
@ -56,18 +64,20 @@ impl<S> Filter<S> for CupratedTracingFilter {
} }
} }
/// Initialize [`tracing`] for logging to stdout and to a file.
pub fn init_logging(config: &Config) { pub fn init_logging(config: &Config) {
use tracing_subscriber::{fmt, Layer}; // initialize the stdout filter, set `STDOUT_FILTER_HANDLE` and create the layer.
let (stdout_filter, stdout_handle) = ReloadLayer::new(CupratedTracingFilter {
let (stdout_filter, stdout_handle) = reload::Layer::new(CupratedTracingFilter {
level: config.tracing.stdout.level, level: config.tracing.stdout.level,
}); });
drop(STDOUT_FILTER_HANDLE.set(stdout_handle)); drop(STDOUT_FILTER_HANDLE.set(stdout_handle));
let stdout_layer = fmt::Layer::default() let stdout_layer = FmtLayer::default()
.with_target(false) .with_target(false)
.with_filter(stdout_filter); .with_filter(stdout_filter);
// create the tracing appender.
let appender_config = &config.tracing.file; let appender_config = &config.tracing.file;
let (appender, guard) = tracing_appender::non_blocking( let (appender, guard) = tracing_appender::non_blocking(
tracing_appender::rolling::Builder::new() tracing_appender::rolling::Builder::new()
@ -77,9 +87,11 @@ pub fn init_logging(config: &Config) {
.unwrap(), .unwrap(),
); );
// TODO: drop this when we shutdown.
forget(guard); forget(guard);
let (appender_filter, appender_handle) = reload::Layer::new(CupratedTracingFilter { // initialize the appender filter, set `FILE_WRITER_FILTER_HANDLE` and create the layer.
let (appender_filter, appender_handle) = ReloadLayer::new(CupratedTracingFilter {
level: appender_config.level, level: appender_config.level,
}); });
drop(FILE_WRITER_FILTER_HANDLE.set(appender_handle)); drop(FILE_WRITER_FILTER_HANDLE.set(appender_handle));
@ -90,16 +102,23 @@ pub fn init_logging(config: &Config) {
.with_writer(appender) .with_writer(appender)
.with_filter(appender_filter); .with_filter(appender_filter);
// initialize tracing with the 2 layers.
tracing_subscriber::registry() tracing_subscriber::registry()
.with(appender_layer) .with(appender_layer)
.with(stdout_layer) .with(stdout_layer)
.init(); .init();
} }
/// Modify the stdout [`CupratedTracingFilter`].
///
/// Must only be called after [`init_logging`].
pub fn modify_stdout_output(f: impl FnOnce(&mut CupratedTracingFilter)) { pub fn modify_stdout_output(f: impl FnOnce(&mut CupratedTracingFilter)) {
STDOUT_FILTER_HANDLE.get().unwrap().modify(f).unwrap(); STDOUT_FILTER_HANDLE.get().unwrap().modify(f).unwrap();
} }
/// Modify the file appender [`CupratedTracingFilter`].
///
/// Must only be called after [`init_logging`].
pub fn modify_file_output(f: impl FnOnce(&mut CupratedTracingFilter)) { pub fn modify_file_output(f: impl FnOnce(&mut CupratedTracingFilter)) {
FILE_WRITER_FILTER_HANDLE.get().unwrap().modify(f).unwrap(); FILE_WRITER_FILTER_HANDLE.get().unwrap().modify(f).unwrap();
} }

View file

@ -16,20 +16,17 @@
reason = "TODO: remove after v1.0.0" reason = "TODO: remove after v1.0.0"
)] )]
use crate::commands::Command; use tokio::sync::mpsc;
use crate::config::Config; use tower::{Service, ServiceExt};
use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; use tracing::level_filters::LevelFilter;
use tracing_subscriber::{Registry, util::SubscriberInitExt, reload::Handle, layer::SubscriberExt};
use cuprate_consensus_context::{ use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
}; };
use cuprate_helper::time::secs_to_hms; use cuprate_helper::time::secs_to_hms;
use tokio::sync::mpsc;
use tower::{Service, ServiceExt}; use crate::{commands::Command, config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::reload::Handle;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Registry;
mod blockchain; mod blockchain;
mod commands; mod commands;
@ -48,16 +45,21 @@ fn main() {
let config = config::read_config_and_args(); let config = config::read_config_and_args();
init_global_rayon_pool(&config); // Initialize logging.
logging::init_logging(&config); logging::init_logging(&config);
// Initialize the thread-pools
init_global_rayon_pool(&config);
let rt = init_tokio_rt(&config); let rt = init_tokio_rt(&config);
let db_thread_pool = cuprate_database_service::init_thread_pool( let db_thread_pool = cuprate_database_service::init_thread_pool(
cuprate_database_service::ReaderThreads::Percent(0.3), cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads),
); );
// Start the blockchain & tx-pool databases.
let (mut blockchain_read_handle, mut blockchain_write_handle, _) = let (mut blockchain_read_handle, mut blockchain_write_handle, _) =
cuprate_blockchain::service::init_with_pool( cuprate_blockchain::service::init_with_pool(
config.blockchain_config(), config.blockchain_config(),
@ -67,7 +69,10 @@ fn main() {
let (txpool_read_handle, txpool_write_handle, _) = let (txpool_read_handle, txpool_write_handle, _) =
cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool).unwrap(); cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool).unwrap();
// Initialize async tasks.
rt.block_on(async move { rt.block_on(async move {
// Check add the genesis block to the blockchain.
blockchain::check_add_genesis( blockchain::check_add_genesis(
&mut blockchain_read_handle, &mut blockchain_read_handle,
&mut blockchain_write_handle, &mut blockchain_write_handle,
@ -75,11 +80,13 @@ fn main() {
) )
.await; .await;
// Start the context service and the block/tx verifier.
let (block_verifier, tx_verifier, context_svc) = let (block_verifier, tx_verifier, context_svc) =
blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config())
.await .await
.unwrap(); .unwrap();
// Start clearnet P2P.
let (clearnet, incoming_tx_handler_tx) = p2p::start_clearnet_p2p( let (clearnet, incoming_tx_handler_tx) = p2p::start_clearnet_p2p(
blockchain_read_handle.clone(), blockchain_read_handle.clone(),
context_svc.clone(), context_svc.clone(),
@ -89,6 +96,7 @@ fn main() {
.await .await
.unwrap(); .unwrap();
// Create the incoming tx handler service.
let tx_handler = txpool::IncomingTxHandler::init( let tx_handler = txpool::IncomingTxHandler::init(
clearnet.clone(), clearnet.clone(),
txpool_write_handle.clone(), txpool_write_handle.clone(),
@ -100,6 +108,7 @@ fn main() {
unreachable!() unreachable!()
} }
// Initialize the blockchain manager.
blockchain::init_blockchain_manager( blockchain::init_blockchain_manager(
clearnet, clearnet,
blockchain_write_handle, blockchain_write_handle,
@ -111,13 +120,18 @@ fn main() {
) )
.await; .await;
// Start the command listener.
let (command_tx, command_rx) = mpsc::channel(1); let (command_tx, command_rx) = mpsc::channel(1);
std::thread::spawn(|| commands::command_listener(command_tx)); std::thread::spawn(|| commands::command_listener(command_tx));
io_loop(command_rx, context_svc).await; // Wait on the io_loop, spawned on a separate task as this improves performance.
tokio::spawn(io_loop(command_rx, context_svc))
.await
.unwrap();
}); });
} }
/// Initialize the [`tokio`] runtime.
fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime { fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.tokio.threads) .worker_threads(config.tokio.threads)
@ -126,6 +140,7 @@ fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime {
.unwrap() .unwrap()
} }
/// Initialize the global [`rayon`] thread-pool.
fn init_global_rayon_pool(config: &Config) { fn init_global_rayon_pool(config: &Config) {
rayon::ThreadPoolBuilder::new() rayon::ThreadPoolBuilder::new()
.num_threads(config.rayon.threads) .num_threads(config.rayon.threads)
@ -133,6 +148,7 @@ fn init_global_rayon_pool(config: &Config) {
.unwrap() .unwrap()
} }
/// The [`Command`] handler loop.
async fn io_loop( async fn io_loop(
mut incoming_commands: mpsc::Receiver<Command>, mut incoming_commands: mpsc::Receiver<Command>,
mut context_service: BlockChainContextService, mut context_service: BlockChainContextService,

View file

@ -119,7 +119,7 @@ where
match client { match client {
Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await), Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
Err(_) => tracing::debug!("Timed out"), Err(_) => tracing::debug!("Timed out"),
Ok(Err(e)) => tracing::debug!("error: {e:?}") Ok(Err(e)) => tracing::debug!("error: {e:?}"),
} }
} }
.instrument(Span::current()), .instrument(Span::current()),