cuprated: Init (#344)

* add cuprated skeleton

* fmt and add deny exception

* add main chain batch handler

* add blockchain init

* very rough block manager

* misc changes

* move more config values

* add new tables & types

* add function to fully add an alt block

* resolve current todo!s

* add new requests

* WIP: starting re-orgs

* add last service request

* commit Cargo.lock

* add test

* more docs + cleanup + alt blocks request

* clippy + fmt

* document types

* move tx_fee to helper

* more doc updates

* fmt

* fix imports

* remove config files

* fix merge errors

* fix generated coins

* handle more p2p requests + alt blocks

* clean up handler code

* add function for incoming blocks

* add docs to handler functions

* broadcast new blocks + add commands

* add fluffy block handler

* fix new block handling

* small cleanup

* increase outbound peer count

* fix merge

* clean up the blockchain manger

* add more docs + cleanup imports

* fix typo

* fix doc

* remove unrelated changes

* add `get_objects` handler

* add `get_chain` handler

* add `fluffy_missing_txs` handler

* add `new_fluffy_block` handler

* improve interface globals

* manger -> manager

* enums instead of bools

* move chain service to separate file

* more review fixes

* sort imports + docs

* init config

* init dandelion integration

* add dandelion start function

* finish incoming tx handler

* Add tx blob hash table

* Add missing txpool requests

* handle duplicate stem txs

* check txpool on incoming block

* add request to remove tx in new blocks from the pool

* tell the txpool about incoming blocks

* fix merge

* typos

* remove blockchain height from txpool

* fix merge

* fix merge

* handle incoming txs in p2p request handler

* split sections

* finish initial config.

* fix clap

* misc changes

* fix doc

* fix test & clippy

* fix test 2

* try fix windows

* testing

* testing 2

* fix windows test

* fix windows: the remix.

* Allow `IncomingTxHandler` to be given later

* add p2p clearnet init

* fix build

* misc changes

* review comments

* fix imports

* rename & fix default config file

* fix cargo hack

* enable serde on `cuprate-helper`

* changes from matrix chats

* fix ci

* fix doc

* fix doc test

* doc updates

* more doc updates

* sort imports

* add startup code

* d -> h

* add file logging

* fix stem peer service

* todo

* remove `get_range`

* change usages of `get_range`

* clippy

* cargo update

* fix test + update comment

* manually set numb threads for each pool

* fix address book saves

* add more data to status

* fix config

* cleanup main + logging

* add more info to output when changing log level

* cleanup commands

* fix small issue in block downloader  more misc clean up

* cross block bp(+) batch verification

* add message when syncing is done

* Revert "cross block bp(+) batch verification"

This reverts commit 764c4663a0.

* fix fmt & clippy

* move `io_loop` to commands

* review fixes

* fix clippy

* review fixes
This commit is contained in:
Boog900 2025-01-17 20:24:24 +00:00 committed by GitHub
parent 6320411f60
commit 503ef11514
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 782 additions and 112 deletions

106
Cargo.lock generated
View file

@ -445,6 +445,7 @@ checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838"
dependencies = [
"anstyle",
"clap_lex",
"strsim",
"terminal_size",
]
@ -1233,6 +1234,7 @@ dependencies = [
"toml",
"tower 0.5.1",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
@ -1268,7 +1270,7 @@ dependencies = [
[[package]]
name = "dalek-ff-group"
version = "0.4.1"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"crypto-bigint",
"curve25519-dalek",
@ -1295,6 +1297,15 @@ dependencies = [
"parking_lot_core",
]
[[package]]
name = "deranged"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
dependencies = [
"powerfmt",
]
[[package]]
name = "diff"
version = "0.1.13"
@ -1424,7 +1435,7 @@ dependencies = [
[[package]]
name = "flexible-transcript"
version = "0.3.2"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"blake2",
"digest",
@ -2174,7 +2185,7 @@ dependencies = [
[[package]]
name = "monero-address"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"monero-io",
@ -2187,7 +2198,7 @@ dependencies = [
[[package]]
name = "monero-borromean"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"monero-generators",
@ -2200,7 +2211,7 @@ dependencies = [
[[package]]
name = "monero-bulletproofs"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"monero-generators",
@ -2215,7 +2226,7 @@ dependencies = [
[[package]]
name = "monero-clsag"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"dalek-ff-group",
@ -2235,7 +2246,7 @@ dependencies = [
[[package]]
name = "monero-generators"
version = "0.4.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"dalek-ff-group",
@ -2249,7 +2260,7 @@ dependencies = [
[[package]]
name = "monero-io"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"std-shims",
@ -2258,7 +2269,7 @@ dependencies = [
[[package]]
name = "monero-mlsag"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"monero-generators",
@ -2272,7 +2283,7 @@ dependencies = [
[[package]]
name = "monero-primitives"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"monero-generators",
@ -2285,9 +2296,8 @@ dependencies = [
[[package]]
name = "monero-rpc"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"async-trait",
"curve25519-dalek",
"hex",
"monero-address",
@ -2302,7 +2312,7 @@ dependencies = [
[[package]]
name = "monero-serai"
version = "0.1.4-alpha"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"curve25519-dalek",
"hex-literal",
@ -2320,9 +2330,8 @@ dependencies = [
[[package]]
name = "monero-simple-request-rpc"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"async-trait",
"digest_auth",
"hex",
"monero-rpc",
@ -2340,6 +2349,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.2.19"
@ -2536,6 +2551,12 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.20"
@ -3087,7 +3108,7 @@ dependencies = [
[[package]]
name = "simple-request"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"http-body-util",
"hyper",
@ -3153,12 +3174,18 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "std-shims"
version = "0.1.1"
source = "git+https://github.com/Cuprate/serai.git?rev=d5205ce#d5205ce2319e09414eb91d12cf38e83a08165f79"
source = "git+https://github.com/Cuprate/serai.git?rev=e6fdef6#e6fdef6d0b4481932ac9647796eb3fa56197ed66"
dependencies = [
"hashbrown 0.14.5",
"spin",
]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "strum"
version = "0.26.3"
@ -3295,6 +3322,37 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a693d0c8cf16973fac5a93fbe47b8c6452e7097d4fcac49f3d7a18e39c76e62e"
[[package]]
name = "time"
version = "0.3.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
[[package]]
name = "time-macros"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de"
dependencies = [
"num-conv",
"time-core",
]
[[package]]
name = "tinystr"
version = "0.7.6"
@ -3388,6 +3446,8 @@ dependencies = [
"bytes",
"futures-core",
"futures-sink",
"futures-util",
"hashbrown 0.14.5",
"pin-project-lite",
"slab",
"tokio",
@ -3494,6 +3554,18 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf"
dependencies = [
"crossbeam-channel",
"thiserror",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.28"

View file

@ -62,6 +62,7 @@ codegen-units = 1 # Optimize for binary speed over compile times
opt-level = 3
[profile.dev]
panic = "abort"
lto = false
strip = "none"
# Not much slower compile times than opt-level 0, but much faster code.
@ -128,7 +129,7 @@ futures = { version = "0.3", default-features = false }
hex = { version = "0.4", default-features = false }
hex-literal = { version = "0.4", default-features = false }
indexmap = { version = "2", default-features = false }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "d5205ce", default-features = false }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "e6fdef6", default-features = false }
paste = { version = "1", default-features = false }
pin-project = { version = "1", default-features = false }
randomx-rs = { git = "https://github.com/Cuprate/randomx-rs.git", rev = "0028464", default-features = false }
@ -146,14 +147,15 @@ tokio-stream = { version = "0.1", default-features = false }
tokio = { version = "1", default-features = false }
tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } # <https://github.com/tower-rs/tower/pull/796>
toml = { version = "0.8", default-features = false }
tracing-appender = { version = "0.2", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false }
tracing = { version = "0.1", default-features = false }
## workspace.dev-dependencies
criterion = { version = "0.5" }
function_name = { version = "0.3" }
monero-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "d5205ce" }
monero-simple-request-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "d5205ce" }
monero-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "e6fdef6" }
monero-simple-request-rpc = { git = "https://github.com/Cuprate/serai.git", rev = "e6fdef6" }
tempfile = { version = "3" }
pretty_assertions = { version = "1" }
proptest = { version = "1" }

View file

@ -14,7 +14,7 @@ cuprate-fast-sync = { workspace = true }
cuprate-consensus-context = { workspace = true }
cuprate-consensus-rules = { workspace = true }
cuprate-cryptonight = { workspace = true }
cuprate-helper = { workspace = true, features = ["serde"] }
cuprate-helper = { workspace = true, features = ["std", "serde", "time"] }
cuprate-epee-encoding = { workspace = true }
cuprate-fixed-bytes = { workspace = true }
cuprate-levin = { workspace = true }
@ -44,7 +44,7 @@ borsh = { workspace = true }
bytemuck = { workspace = true }
bytes = { workspace = true }
cfg-if = { workspace = true }
clap = { workspace = true, features = ["cargo", "help", "wrap_help"] }
clap = { workspace = true, features = ["cargo", "help", "wrap_help", "usage", "error-context", "suggestions"] }
chrono = { workspace = true }
crypto-bigint = { workspace = true }
crossbeam = { workspace = true }
@ -68,11 +68,12 @@ serde_json = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
thread_local = { workspace = true }
tokio-util = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
tokio-stream = { workspace = true }
tokio = { workspace = true }
toml = { workspace = true, features = ["parse", "display"]}
tower = { workspace = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["std", "fmt", "default"] }
tracing = { workspace = true, features = ["default"] }

View file

@ -11,8 +11,10 @@ network = "Mainnet"
## Tracing config.
[tracing]
## The minimum level for log events to be displayed.
level = "info"
## The stdout logging config.
stdout = { level = "info" }
## The file output logging config.
file = { level = "debug", max_log_files = 7 }
## Clear-net config.
[p2p.clear_net]
@ -45,15 +47,10 @@ buffer_bytes = 50_000_000
## The size of the queue of blocks which are waiting for a parent block to be downloaded (bytes).
in_progress_queue_bytes = 50_000_000
## The target size of a batch of blocks (bytes), must not exceed 100MB.
target_batch_bytes= 5_000_000
target_batch_bytes = 10_000_000
## The amount of time between checking the pool of connected peers for free peers to download blocks.
check_client_pool_interval = { secs = 30, nanos = 0 }
## Storage config
[storage]
## The amount of reader threads to spawn.
reader_threads = "OnePerThread"
## Txpool storage config.
[storage.txpool]
## The database sync mode for the txpool.

View file

@ -25,6 +25,7 @@ mod manager;
mod syncer;
mod types;
pub use manager::init_blockchain_manager;
pub use types::{
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
};

View file

@ -9,7 +9,7 @@ use rayon::prelude::*;
use std::ops::ControlFlow;
use std::{collections::HashMap, sync::Arc};
use tower::{Service, ServiceExt};
use tracing::info;
use tracing::{info, instrument};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{
@ -120,6 +120,15 @@ impl super::BlockchainManager {
///
/// This function will panic if the batch is empty or if any internal service returns an unexpected
/// error that we cannot recover from or if the incoming batch contains no blocks.
#[instrument(
name = "incoming_block_batch",
skip_all,
level = "info",
fields(
start_height = batch.blocks.first().unwrap().0.number().unwrap(),
len = batch.blocks.len()
)
)]
pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) {
let (first_block, _) = batch
.blocks
@ -146,11 +155,6 @@ impl super::BlockchainManager {
/// This function will panic if any internal service returns an unexpected error that we cannot
/// recover from or if the incoming batch contains no blocks.
async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) {
info!(
"Handling batch to main chain height: {}",
batch.blocks.first().unwrap().0.number().unwrap()
);
let batch_prep_res = self
.block_verifier_service
.ready()
@ -192,6 +196,7 @@ impl super::BlockchainManager {
self.add_valid_block_to_main_chain(verified_block).await;
}
info!("Successfully added block batch");
}
/// Handles an incoming [`BlockBatch`] that does not follow the main-chain.

View file

@ -10,6 +10,7 @@ use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_consensus_context::RawBlockChainContext;
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface, PeerSetRequest, PeerSetResponse,
@ -73,20 +74,7 @@ where
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
let PeerSetResponse::MostPoWSeen {
cumulative_difficulty,
..
} = clearnet_interface
.peer_set()
.ready()
.await?
.call(PeerSetRequest::MostPoWSeen)
.await?
else {
unreachable!();
};
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? {
continue;
}
@ -99,11 +87,18 @@ where
loop {
tokio::select! {
() = stop_current_block_downloader.notified() => {
tracing::info!("Stopping block downloader");
tracing::info!("Received stop signal, stopping block downloader");
break;
}
batch = block_batch_stream.next() => {
let Some(batch) = batch else {
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
if !check_behind_peers(raw_blockchain_context, &mut clearnet_interface).await? {
tracing::info!("Synchronised with the network.");
}
break;
};
@ -117,6 +112,31 @@ where
}
}
/// Returns `true` if we are behind the current connected network peers.
async fn check_behind_peers(
raw_blockchain_context: &RawBlockChainContext,
mut clearnet_interface: &mut NetworkInterface<ClearNet>,
) -> Result<bool, tower::BoxError> {
let PeerSetResponse::MostPoWSeen {
cumulative_difficulty,
..
} = clearnet_interface
.peer_set()
.ready()
.await?
.call(PeerSetRequest::MostPoWSeen)
.await?
else {
unreachable!();
};
if cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
return Ok(false);
}
Ok(true)
}
/// Checks if we should update the given [`BlockChainContext`] and updates it if needed.
async fn check_update_blockchain_context<C>(
context_svc: C,

View file

@ -0,0 +1,137 @@
//! Commands
//!
//! `cuprated` [`Command`] definition and handling.
use std::{io, thread::sleep, time::Duration};
use clap::{builder::TypedValueParser, Parser, ValueEnum};
use tokio::sync::mpsc;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
};
use cuprate_helper::time::secs_to_hms;
use crate::{
constants::PANIC_CRITICAL_SERVICE_ERROR,
logging::{self, CupratedTracingFilter},
statics,
};
/// A command received from [`io::stdin`].
#[derive(Debug, Parser)]
#[command(
multicall = true,
subcommand_required = true,
rename_all = "snake_case",
help_template = "{all-args}",
arg_required_else_help = true,
disable_help_flag = true
)]
pub enum Command {
/// Change the log output.
#[command(arg_required_else_help = true)]
SetLog {
/// The minimum log level that will be displayed.
#[arg(
short, long,
value_parser = clap::builder::PossibleValuesParser::new(["off", "trace", "debug", "info", "warn", "error"])
.map(|s| s.parse::<LevelFilter>().unwrap()),
)]
level: Option<LevelFilter>,
/// The logging output target to change.
#[arg(value_enum, default_value_t)]
output_target: OutputTarget,
},
/// Print status information on `cuprated`.
Status,
}
/// The log output target.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum OutputTarget {
/// The stdout logging output.
#[default]
Stdout,
/// The file appender logging output.
File,
}
/// The [`Command`] listener loop.
pub fn command_listener(incoming_commands: mpsc::Sender<Command>) -> ! {
let mut stdin = io::stdin();
let mut line = String::new();
loop {
line.clear();
if let Err(e) = stdin.read_line(&mut line) {
eprintln!("Failed to read from stdin: {e}");
sleep(Duration::from_secs(1));
continue;
}
match Command::try_parse_from(line.split_whitespace()) {
Ok(command) => drop(
incoming_commands
.blocking_send(command)
.inspect_err(|err| eprintln!("Failed to send command: {err}")),
),
Err(err) => err.print().unwrap(),
}
}
}
/// The [`Command`] handler loop.
pub async fn io_loop(
mut incoming_commands: mpsc::Receiver<Command>,
mut context_service: BlockChainContextService,
) {
loop {
let Some(command) = incoming_commands.recv().await else {
tracing::warn!("Shutting down io_loop command channel closed.");
return;
};
match command {
Command::SetLog {
level,
output_target,
} => {
let modify_output = |filter: &mut CupratedTracingFilter| {
if let Some(level) = level {
filter.level = level;
}
println!("NEW LOG FILTER: {filter}");
};
match output_target {
OutputTarget::File => logging::modify_file_output(modify_output),
OutputTarget::Stdout => logging::modify_stdout_output(modify_output),
}
}
Command::Status => {
let BlockChainContextResponse::Context(blockchain_context) = context_service
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockChainContextRequest::Context)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
else {
unreachable!();
};
let context = blockchain_context.unchecked_blockchain_context();
let uptime = statics::START_INSTANT.elapsed().unwrap_or_default();
let (h, m, s) = secs_to_hms(uptime.as_secs());
let height = context.chain_height;
let top_hash = hex::encode(context.top_hash);
println!("STATUS:\n uptime: {h}h {m}m {s}s,\n height: {height},\n top_hash: {top_hash}");
}
}
}
}

View file

@ -20,12 +20,16 @@ use cuprate_p2p_core::{ClearNet, ClearNetServerCfg};
mod args;
mod fs;
mod p2p;
mod rayon;
mod storage;
mod tokio;
mod tracing_config;
use crate::config::fs::FileSystemConfig;
use fs::FileSystemConfig;
use p2p::P2PConfig;
use rayon::RayonConfig;
use storage::StorageConfig;
use tokio::TokioConfig;
use tracing_config::TracingConfig;
/// Reads the args & config file, returning a [`Config`].
@ -65,22 +69,26 @@ pub fn read_config_and_args() -> Config {
}
/// The config for all of Cuprate.
#[derive(Default, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// The network we should run on.
network: Network,
/// [`tracing`] config.
tracing: TracingConfig,
pub tracing: TracingConfig,
pub tokio: TokioConfig,
pub rayon: RayonConfig,
/// The P2P network config.
p2p: P2PConfig,
/// The storage config.
storage: StorageConfig,
pub storage: StorageConfig,
fs: FileSystemConfig,
pub fs: FileSystemConfig,
}
impl Config {
@ -152,6 +160,18 @@ impl Config {
.build()
}
/// The [`cuprate_txpool`] config.
pub fn txpool_config(&self) -> cuprate_txpool::config::Config {
let txpool = &self.storage.txpool;
// We don't set reader threads as we manually make the reader threadpool.
cuprate_txpool::config::ConfigBuilder::default()
.network(self.network)
.data_directory(self.fs.data_directory.clone())
.sync_mode(txpool.shared.sync_mode)
.build()
}
/// The [`BlockDownloaderConfig`].
pub fn block_downloader_config(&self) -> BlockDownloaderConfig {
self.p2p.block_downloader.clone().into()

View file

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use cuprate_helper::fs::{CUPRATE_CACHE_DIR, CUPRATE_DATA_DIR};
#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct FileSystemConfig {
pub data_directory: PathBuf,

View file

@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use cuprate_helper::{fs::address_book_path, network::Network};
/// P2P config.
#[derive(Default, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct P2PConfig {
/// Clear-net config.
@ -18,7 +18,7 @@ pub struct P2PConfig {
pub block_downloader: BlockDownloaderConfig,
}
#[derive(Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct BlockDownloaderConfig {
/// The size in bytes of the buffer between the block downloader and the place which
@ -50,13 +50,13 @@ impl Default for BlockDownloaderConfig {
buffer_bytes: 50_000_000,
in_progress_queue_bytes: 50_000_000,
check_client_pool_interval: Duration::from_secs(30),
target_batch_bytes: 5_000_000,
target_batch_bytes: 10_000_000,
}
}
}
/// The config values for P2P clear-net.
#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct ClearNetConfig {
/// The server config.
@ -75,7 +75,7 @@ impl Default for ClearNetConfig {
}
/// Network config values shared between all network zones.
#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct SharedNetConfig {
/// The number of outbound connections to make and try keep.
@ -121,7 +121,7 @@ impl Default for SharedNetConfig {
}
}
#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct AddressBookConfig {
max_white_list_length: usize,
@ -134,7 +134,7 @@ impl Default for AddressBookConfig {
Self {
max_white_list_length: 1_000,
max_gray_list_length: 5_000,
peer_save_period: Duration::from_secs(30),
peer_save_period: Duration::from_secs(90),
}
}
}

View file

@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};
/// The [`rayon`] config.
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct RayonConfig {
/// The number of threads to use for the [`rayon::ThreadPool`].
pub threads: usize,
}
impl Default for RayonConfig {
fn default() -> Self {
Self {
threads: cuprate_helper::thread::threads_75().get(),
}
}
}

View file

@ -7,19 +7,29 @@ use cuprate_database_service::ReaderThreads;
use cuprate_helper::fs::CUPRATE_DATA_DIR;
/// The storage config.
#[derive(Default, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct StorageConfig {
/// The amount of reader threads to spawn between the tx-pool and blockchain.
pub reader_threads: ReaderThreads,
pub reader_threads: usize,
/// The tx-pool config.
pub txpool: TxpoolConfig,
/// The blockchain config.
pub blockchain: BlockchainConfig,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
reader_threads: cuprate_helper::thread::threads_25().get(),
txpool: Default::default(),
blockchain: Default::default(),
}
}
}
/// The blockchain config.
#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct BlockchainConfig {
#[serde(flatten)]
@ -37,7 +47,7 @@ impl Default for BlockchainConfig {
}
/// The tx-pool config.
#[derive(Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct TxpoolConfig {
#[serde(flatten)]
@ -59,7 +69,7 @@ impl Default for TxpoolConfig {
}
/// Config values shared between the tx-pool and blockchain.
#[derive(Default, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(deny_unknown_fields, default)]
pub struct SharedStorageConfig {
/// The [`SyncMode`] of the database.

View file

@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};
/// [`tokio`] config.
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct TokioConfig {
/// The amount of threads to spawn for the async thread-pool
pub threads: usize,
}
impl Default for TokioConfig {
fn default() -> Self {
Self {
threads: cuprate_helper::thread::threads_75().get(),
}
}
}

View file

@ -2,15 +2,22 @@ use serde::{Deserialize, Serialize};
use tracing::level_filters::LevelFilter;
/// [`tracing`] config.
#[derive(Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct TracingConfig {
/// The default minimum log level.
#[serde(with = "level_filter_serde")]
level: LevelFilter,
pub stdout: StdoutTracingConfig,
pub file: FileTracingConfig,
}
impl Default for TracingConfig {
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct StdoutTracingConfig {
/// The default minimum log level.
#[serde(with = "level_filter_serde")]
pub level: LevelFilter,
}
impl Default for StdoutTracingConfig {
fn default() -> Self {
Self {
level: LevelFilter::INFO,
@ -18,6 +25,26 @@ impl Default for TracingConfig {
}
}
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub struct FileTracingConfig {
/// The default minimum log level.
#[serde(with = "level_filter_serde")]
pub level: LevelFilter,
/// The maximum amount of log files to keep, once this number is passed the oldest file
/// will be deleted.
pub max_log_files: usize,
}
impl Default for FileTracingConfig {
fn default() -> Self {
Self {
level: LevelFilter::DEBUG,
max_log_files: 7,
}
}
}
mod level_filter_serde {
use std::str::FromStr;

View file

@ -42,5 +42,7 @@ mod test {
#[test]
fn generate_config_text_is_valid() {
let config: Config = toml::from_str(EXAMPLE_CONFIG).unwrap();
assert_eq!(config, Config::default());
}
}

View file

@ -0,0 +1,142 @@
//! Logging
//!
//! `cuprated` log filtering settings and related functionality.
use std::ops::BitAnd;
use std::{
fmt::{Display, Formatter},
mem::forget,
sync::OnceLock,
};
use tracing::{
instrument::WithSubscriber, level_filters::LevelFilter, subscriber::Interest, Metadata,
};
use tracing_appender::{non_blocking::NonBlocking, rolling::Rotation};
use tracing_subscriber::{
filter::Filtered,
fmt::{
self,
format::{DefaultFields, Format},
Layer as FmtLayer,
},
layer::{Context, Filter, Layered, SubscriberExt},
reload::{Handle, Layer as ReloadLayer},
util::SubscriberInitExt,
Layer, Registry,
};
use cuprate_helper::fs::logs_path;
use crate::config::Config;
/// A [`OnceLock`] which holds the [`Handle`] to update the file logging output.
///
/// Initialized in [`init_logging`].
static FILE_WRITER_FILTER_HANDLE: OnceLock<Handle<CupratedTracingFilter, Registry>> =
OnceLock::new();
/// A [`OnceLock`] which holds the [`Handle`] to update the stdout logging output.
///
/// Initialized in [`init_logging`].
#[expect(clippy::type_complexity)] // factoring out isn't going to help readability.
static STDOUT_FILTER_HANDLE: OnceLock<
Handle<
CupratedTracingFilter,
Layered<
Filtered<
FmtLayer<Registry, DefaultFields, Format, NonBlocking>,
ReloadLayer<CupratedTracingFilter, Registry>,
Registry,
>,
Registry,
Registry,
>,
>,
> = OnceLock::new();
/// The [`Filter`] used to alter cuprated's log output.
#[derive(Debug)]
pub struct CupratedTracingFilter {
pub level: LevelFilter,
}
// Custom display behavior for command output.
impl Display for CupratedTracingFilter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Filter")
.field("minimum_level", &self.level.to_string())
.finish()
}
}
impl<S> Filter<S> for CupratedTracingFilter {
fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool {
Filter::<S>::enabled(&self.level, meta, cx)
}
fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
Filter::<S>::callsite_enabled(&self.level, meta)
}
fn max_level_hint(&self) -> Option<LevelFilter> {
Some(self.level)
}
}
/// Initialize [`tracing`] for logging to stdout and to a file.
pub fn init_logging(config: &Config) {
// initialize the stdout filter, set `STDOUT_FILTER_HANDLE` and create the layer.
let (stdout_filter, stdout_handle) = ReloadLayer::new(CupratedTracingFilter {
level: config.tracing.stdout.level,
});
STDOUT_FILTER_HANDLE.set(stdout_handle).unwrap();
let stdout_layer = FmtLayer::default()
.with_target(false)
.with_filter(stdout_filter);
// create the tracing appender.
let appender_config = &config.tracing.file;
let (appender, guard) = tracing_appender::non_blocking(
tracing_appender::rolling::Builder::new()
.rotation(Rotation::DAILY)
.max_log_files(appender_config.max_log_files)
.build(logs_path(&config.fs.data_directory, config.network()))
.unwrap(),
);
// TODO: drop this when we shutdown.
forget(guard);
// initialize the appender filter, set `FILE_WRITER_FILTER_HANDLE` and create the layer.
let (appender_filter, appender_handle) = ReloadLayer::new(CupratedTracingFilter {
level: appender_config.level,
});
FILE_WRITER_FILTER_HANDLE.set(appender_handle).unwrap();
let appender_layer = fmt::layer()
.with_target(false)
.with_ansi(false)
.with_writer(appender)
.with_filter(appender_filter);
// initialize tracing with the 2 layers.
tracing_subscriber::registry()
.with(appender_layer)
.with(stdout_layer)
.init();
}
/// Modify the stdout [`CupratedTracingFilter`].
///
/// Must only be called after [`init_logging`].
pub fn modify_stdout_output(f: impl FnOnce(&mut CupratedTracingFilter)) {
STDOUT_FILTER_HANDLE.get().unwrap().modify(f).unwrap();
}
/// Modify the file appender [`CupratedTracingFilter`].
///
/// Must only be called after [`init_logging`].
pub fn modify_file_output(f: impl FnOnce(&mut CupratedTracingFilter)) {
FILE_WRITER_FILTER_HANDLE.get().unwrap().modify(f).unwrap();
}

View file

@ -16,9 +16,27 @@
reason = "TODO: remove after v1.0.0"
)]
use std::mem;
use std::sync::Arc;
use tokio::sync::mpsc;
use tower::{Service, ServiceExt};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry};
use cuprate_consensus_context::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
};
use cuprate_helper::time::secs_to_hms;
use crate::{
config::Config, constants::PANIC_CRITICAL_SERVICE_ERROR, logging::CupratedTracingFilter,
};
mod blockchain;
mod commands;
mod config;
mod constants;
mod logging;
mod p2p;
mod rpc;
mod signals;
@ -29,8 +47,109 @@ fn main() {
// Initialize global static `LazyLock` data.
statics::init_lazylock_statics();
let _config = config::read_config_and_args();
let config = config::read_config_and_args();
// TODO: everything else.
todo!()
// Initialize logging.
logging::init_logging(&config);
// Initialize the thread-pools
init_global_rayon_pool(&config);
let rt = init_tokio_rt(&config);
let db_thread_pool = cuprate_database_service::init_thread_pool(
cuprate_database_service::ReaderThreads::Number(config.storage.reader_threads),
);
// Start the blockchain & tx-pool databases.
let (mut blockchain_read_handle, mut blockchain_write_handle, _) =
cuprate_blockchain::service::init_with_pool(
config.blockchain_config(),
Arc::clone(&db_thread_pool),
)
.unwrap();
let (txpool_read_handle, txpool_write_handle, _) =
cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool).unwrap();
// Initialize async tasks.
rt.block_on(async move {
// Check add the genesis block to the blockchain.
blockchain::check_add_genesis(
&mut blockchain_read_handle,
&mut blockchain_write_handle,
config.network(),
)
.await;
// Start the context service and the block/tx verifier.
let (block_verifier, tx_verifier, context_svc) =
blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config())
.await
.unwrap();
// Start clearnet P2P.
let (clearnet, incoming_tx_handler_tx) = p2p::start_clearnet_p2p(
blockchain_read_handle.clone(),
context_svc.clone(),
txpool_read_handle.clone(),
config.clearnet_p2p_config(),
)
.await
.unwrap();
// Create the incoming tx handler service.
let tx_handler = txpool::IncomingTxHandler::init(
clearnet.clone(),
txpool_write_handle.clone(),
txpool_read_handle,
context_svc.clone(),
tx_verifier,
);
if incoming_tx_handler_tx.send(tx_handler).is_err() {
unreachable!()
}
// Initialize the blockchain manager.
blockchain::init_blockchain_manager(
clearnet,
blockchain_write_handle,
blockchain_read_handle,
txpool_write_handle,
context_svc.clone(),
block_verifier,
config.block_downloader_config(),
)
.await;
// Start the command listener.
let (command_tx, command_rx) = mpsc::channel(1);
std::thread::spawn(|| commands::command_listener(command_tx));
// Wait on the io_loop, spawned on a separate task as this improves performance.
tokio::spawn(commands::io_loop(command_rx, context_svc))
.await
.unwrap();
});
}
/// Initialize the [`tokio`] runtime.
fn init_tokio_rt(config: &Config) -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.tokio.threads)
.thread_name("cuprated-tokio")
.enable_all()
.build()
.unwrap()
}
/// Initialize the global [`rayon`] thread-pool.
fn init_global_rayon_pool(config: &Config) {
rayon::ThreadPoolBuilder::new()
.num_threads(config.rayon.threads)
.thread_name(|index| format!("cuprated-rayon-{index}"))
.build_global()
.unwrap();
}

View file

@ -238,7 +238,7 @@ async fn get_chain(
split_u128_into_low_high_bits(cumulative_difficulty);
Ok(ProtocolResponse::GetChain(ChainResponse {
start_height: usize_to_u64(std::num::NonZero::get(start_height)),
start_height: usize_to_u64(start_height),
total_height: usize_to_u64(chain_height),
cumulative_difficulty_low64,
cumulative_difficulty_top64,

View file

@ -53,6 +53,8 @@ impl Stream for OutboundPeerStream {
OutboundPeerStreamState::AwaitingPeer(fut) => {
let res = ready!(fut.poll_unpin(cx));
self.state = OutboundPeerStreamState::Standby;
return Poll::Ready(Some(res.map(|res| {
let PeerSetResponse::StemPeer(stem_peer) = res else {
unreachable!()

View file

@ -200,6 +200,21 @@ pub fn txpool_path(data_dir: &Path, network: Network) -> PathBuf {
path_with_network(data_dir, network).join("txpool")
}
/// Cuprate's logs directory.
///
/// This is the PATH used for all Cuprate log files.
///
/// ```rust
/// use cuprate_helper::{network::Network, fs::{CUPRATE_DATA_DIR, logs_path}};
///
/// assert_eq!(logs_path(&**CUPRATE_DATA_DIR, Network::Mainnet).as_path(), CUPRATE_DATA_DIR.join("logs"));
/// assert_eq!(logs_path(&**CUPRATE_DATA_DIR, Network::Stagenet).as_path(), CUPRATE_DATA_DIR.join(Network::Stagenet.to_string()).join("logs"));
/// assert_eq!(logs_path(&**CUPRATE_DATA_DIR, Network::Testnet).as_path(), CUPRATE_DATA_DIR.join(Network::Testnet.to_string()).join("logs"));
/// ```
pub fn logs_path(data_dir: &Path, network: Network) -> PathBuf {
path_with_network(data_dir, network).join("logs")
}
/// Cuprate's address-book directory.
///
/// This is the PATH used for any Cuprate address-book files.

View file

@ -36,21 +36,21 @@ macro_rules! impl_thread_percent {
clippy::cast_precision_loss,
reason = "we need to round integers"
)]
NonZeroUsize::new(max(1, (threads().get() as f64 * $percent).floor() as usize)).unwrap()
NonZeroUsize::new(max(1, (threads().get() as f64 * $percent).ceil() as usize)).unwrap()
}
)*
}
}
impl_thread_percent! {
/// Get 90% (rounded down) of available amount of system threads.
/// Get 90% (rounded up) of available amount of system threads.
threads_90 => 0.90,
/// Get 75% (rounded down) of available amount of system threads.
/// Get 75% (rounded up) of available amount of system threads.
threads_75 => 0.75,
/// Get 50% (rounded down) of available amount of system threads.
/// Get 50% (rounded up) of available amount of system threads.
threads_50 => 0.50,
/// Get 25% (rounded down) of available amount of system threads.
/// Get 25% (rounded up) of available amount of system threads.
threads_25 => 0.25,
/// Get 10% (rounded down) of available amount of system threads.
/// Get 10% (rounded up) of available amount of system threads.
threads_10 => 0.10,
}

View file

@ -302,9 +302,6 @@ impl<Z: BorshNetworkZone> AddressBook<Z> {
if peb.pruning_seed != peer.pruning_seed {
return Err(AddressBookError::PeersDataChanged("Pruning seed"));
}
if Z::CHECK_NODE_ID && peb.id != peer.id {
return Err(AddressBookError::PeersDataChanged("peer ID"));
}
// TODO: cuprate doesn't need last seen timestamps but should we have them anyway?
peb.last_seen = 0;
peb.rpc_port = peer.rpc_port;

View file

@ -39,10 +39,13 @@ pub(crate) fn save_peers_to_disk<Z: BorshNetworkZone>(
})
.unwrap();
let file = cfg
.peer_store_directory
.join(format!("{}_p2p_state", Z::NAME));
spawn_blocking(move || fs::write(&file, &data))
let dir = cfg.peer_store_directory.clone();
let file = dir.join(Z::NAME);
spawn_blocking(move || {
fs::create_dir_all(dir)?;
fs::write(&file, &data)
})
}
pub(crate) async fn read_peers_from_disk<Z: BorshNetworkZone>(
@ -54,9 +57,7 @@ pub(crate) async fn read_peers_from_disk<Z: BorshNetworkZone>(
),
std::io::Error,
> {
let file = cfg
.peer_store_directory
.join(format!("{}_p2p_state", Z::NAME));
let file = cfg.peer_store_directory.join(Z::NAME);
tracing::info!("Loading peers from file: {} ", file.display());

View file

@ -2,7 +2,6 @@ use std::collections::HashSet;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tokio::time::timeout;
use tower::{Service, ServiceExt};
use tracing::instrument;
@ -16,7 +15,7 @@ use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
use crate::{
block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
constants::{MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
peer_set::ClientDropGuard,
};
@ -60,17 +59,15 @@ async fn request_batch_from_peer<N: NetworkZone>(
}));
// Request the blocks and add a timeout to the request
let blocks_response = timeout(BLOCK_DOWNLOADER_REQUEST_TIMEOUT, async {
let blocks_response = {
let PeerResponse::Protocol(ProtocolResponse::GetObjects(blocks_response)) =
client.ready().await?.call(request).await?
else {
panic!("Connection task returned wrong response.");
};
Ok::<_, BlockDownloadError>(blocks_response)
})
.await
.map_err(|_| BlockDownloadError::TimedOut)??;
blocks_response
};
// Initial sanity checks
if blocks_response.blocks.len() > ids.len() {

View file

@ -115,8 +115,11 @@ where
tokio::spawn(
async move {
let client = timeout(HANDSHAKE_TIMEOUT, fut).await;
if let Ok(Ok(peer)) = client {
drop(new_connection_tx.send(peer).await);
match client {
Ok(Ok(peer)) => drop(new_connection_tx.send(peer).await),
Err(_) => tracing::debug!("Timed out"),
Ok(Err(e)) => tracing::debug!("error: {e:?}"),
}
}
.instrument(Span::current()),

View file

@ -3,13 +3,15 @@
//---------------------------------------------------------------------------------------------------- Import
use std::sync::Arc;
use rayon::ThreadPool;
use cuprate_database::{ConcreteEnv, InitError};
use cuprate_types::{AltBlockInformation, VerifiedBlockInformation};
use crate::{
config::Config,
service::{
init_read_service, init_write_service,
init_read_service, init_read_service_with_pool, init_write_service,
types::{BlockchainReadHandle, BlockchainWriteHandle},
},
};
@ -46,6 +48,39 @@ pub fn init(
Ok((readers, writer, db))
}
#[cold]
#[inline(never)] // Only called once (?)
/// Initialize a database, and return a read/write handle to it.
///
/// Unlike [`init`] this will not create a thread-pool, instead using
/// the one passed in.
///
/// Once the returned handles are [`Drop::drop`]ed, the reader
/// thread-pool and writer thread will exit automatically.
///
/// # Errors
/// This will forward the error if [`crate::open`] failed.
pub fn init_with_pool(
config: Config,
pool: Arc<ThreadPool>,
) -> Result<
(
BlockchainReadHandle,
BlockchainWriteHandle,
Arc<ConcreteEnv>,
),
InitError,
> {
// Initialize the database itself.
let db = Arc::new(crate::open(config)?);
// Spawn the Reader thread pool and Writer.
let readers = init_read_service_with_pool(Arc::clone(&db), pool);
let writer = init_write_service(Arc::clone(&db));
Ok((readers, writer, db))
}
//---------------------------------------------------------------------------------------------------- Compact history
/// Given a position in the compact history, returns the height offset that should be in that position.
///

View file

@ -128,7 +128,7 @@ mod write;
pub use write::init_write_service;
mod free;
pub use free::init;
pub use free::{init, init_with_pool};
mod types;
pub use types::{BlockchainReadHandle, BlockchainWriteHandle};

View file

@ -631,7 +631,7 @@ fn next_chain_entry(
};
Ok(BlockchainResponse::NextChainEntry {
start_height: std::num::NonZero::new(first_known_height),
start_height: Some(first_known_height),
chain_height,
block_ids,
block_weights,

View file

@ -128,5 +128,5 @@ mod read;
mod types;
mod write;
pub use free::init;
pub use free::{init, init_with_pool};
pub use types::{TxpoolReadHandle, TxpoolWriteHandle};

View file

@ -1,10 +1,12 @@
use std::sync::Arc;
use rayon::ThreadPool;
use cuprate_database::{ConcreteEnv, InitError};
use crate::{
service::{
read::init_read_service,
read::{init_read_service, init_read_service_with_pool},
types::{TxpoolReadHandle, TxpoolWriteHandle},
write::init_write_service,
},
@ -35,3 +37,29 @@ pub fn init(
Ok((readers, writer, db))
}
#[cold]
#[inline(never)] // Only called once (?)
/// Initialize a database, and return a read/write handle to it.
///
/// Unlike [`init`] this will not create a thread-pool, instead using
/// the one passed in.
///
/// Once the returned handles are [`Drop::drop`]ed, the reader
/// thread-pool and writer thread will exit automatically.
///
/// # Errors
/// This will forward the error if [`crate::open`] failed.
pub fn init_with_pool(
config: Config,
pool: Arc<ThreadPool>,
) -> Result<(TxpoolReadHandle, TxpoolWriteHandle, Arc<ConcreteEnv>), InitError> {
// Initialize the database itself.
let db = Arc::new(crate::open(config)?);
// Spawn the Reader thread pool and Writer.
let readers = init_read_service_with_pool(Arc::clone(&db), pool);
let writer = init_write_service(Arc::clone(&db));
Ok((readers, writer, db))
}

View file

@ -44,7 +44,10 @@ pub(super) fn init_read_service(env: Arc<ConcreteEnv>, threads: ReaderThreads) -
/// Should be called _once_ per actual database.
#[cold]
#[inline(never)] // Only called once.
fn init_read_service_with_pool(env: Arc<ConcreteEnv>, pool: Arc<ThreadPool>) -> TxpoolReadHandle {
pub(super) fn init_read_service_with_pool(
env: Arc<ConcreteEnv>,
pool: Arc<ThreadPool>,
) -> TxpoolReadHandle {
DatabaseReadService::new(env, pool, map_request)
}

View file

@ -288,7 +288,7 @@ pub enum BlockchainResponse {
/// If all blocks were unknown `start_height` will be [`None`], the other fields will be meaningless.
NextChainEntry {
/// The start height of this entry, [`None`] if we could not find the split point.
start_height: Option<std::num::NonZero<usize>>,
start_height: Option<usize>,
/// The current chain height.
chain_height: usize,
/// The next block hashes in the entry.