Compare commits

...

4 commits

Author SHA1 Message Date
521bf877db
P2P: give the protocol handler access to the peer info (#302)
Some checks are pending
Audit / audit (push) Waiting to run
CI / fmt (push) Waiting to run
CI / typo (push) Waiting to run
CI / ci (macos-latest, stable, bash) (push) Waiting to run
CI / ci (ubuntu-latest, stable, bash) (push) Waiting to run
CI / ci (windows-latest, stable-x86_64-pc-windows-gnu, msys2 {0}) (push) Waiting to run
Deny / audit (push) Waiting to run
Doc / build (push) Waiting to run
Doc / deploy (push) Blocked by required conditions
* give the protocol handler access to the peer info

* add trait alias

* clippy + fmt

* update doc

* simplify trait aliases

* use tower `Shared`

* clean import

* fmt

* Update Cargo.toml

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>

* fix merge

---------

Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
2024-09-30 23:19:53 +01:00
6da9d2d734
P2P: remove peer sync service (#299)
* remove peer sync service

* change `p2p` to not use the peer sync service

* fmt & clippy

* doc updates

* review fixes

* add a little more detail to comment
2024-09-30 22:15:48 +01:00
hinto-janai
12bbadd749
cuprated: add constants & statics modules (#301)
* add modules

* docs

* test

* rename

* tabs -> spaces
2024-09-28 01:41:34 +01:00
a072d44a0d
P2P: fix connection disconnect on Client drop (#298)
fix connection disconnect on `Client` drop
2024-09-25 20:56:57 +01:00
25 changed files with 383 additions and 874 deletions

99
Cargo.lock generated
View file

@ -129,9 +129,9 @@ dependencies = [
"serde_urlencoded",
"sync_wrapper 1.0.1",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tower 0.4.13",
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing",
]
@ -151,8 +151,8 @@ dependencies = [
"pin-project-lite",
"rustversion",
"sync_wrapper 0.1.2",
"tower-layer",
"tower-service",
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing",
]
@ -383,6 +383,26 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "const_format"
version = "0.2.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50c655d81ff1114fb0dcdea9225ea9f0cc712a6f8d189378e82bdf62a473a64b"
dependencies = [
"const_format_proc_macros",
]
[[package]]
name = "const_format_proc_macros"
version = "0.2.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eff1a44b93f47b1bac19a27932f5c591e43d1ba357ee4f61526c8a25603f0eb1"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -507,7 +527,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tower",
"tower 0.5.1",
"tracing",
]
@ -545,7 +565,7 @@ dependencies = [
"tempfile",
"thread_local",
"tokio",
"tower",
"tower 0.5.1",
]
[[package]]
@ -572,7 +592,7 @@ dependencies = [
"tokio",
"tokio-test",
"tokio-util",
"tower",
"tower 0.5.1",
"tracing",
]
@ -618,7 +638,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tower",
"tower 0.5.1",
"tracing",
]
@ -648,7 +668,7 @@ dependencies = [
"futures",
"rayon",
"serde",
"tower",
"tower 0.5.1",
]
[[package]]
@ -680,7 +700,7 @@ dependencies = [
"sha3",
"thiserror",
"tokio",
"tower",
"tower 0.5.1",
]
[[package]]
@ -753,7 +773,6 @@ dependencies = [
"cuprate-wire",
"dashmap",
"futures",
"hex",
"indexmap",
"monero-serai",
"pin-project",
@ -766,7 +785,7 @@ dependencies = [
"tokio-stream",
"tokio-test",
"tokio-util",
"tower",
"tower 0.5.1",
"tracing",
]
@ -789,7 +808,7 @@ dependencies = [
"tokio-stream",
"tokio-test",
"tokio-util",
"tower",
"tower 0.5.1",
"tracing",
]
@ -817,7 +836,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tower",
"tower 0.5.1",
"ureq",
]
@ -878,7 +897,7 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tower",
"tower 0.5.1",
]
[[package]]
@ -913,7 +932,7 @@ dependencies = [
[[package]]
name = "cuprated"
version = "0.1.0"
version = "0.0.1"
dependencies = [
"anyhow",
"async-trait",
@ -924,6 +943,7 @@ dependencies = [
"cfg-if",
"chrono",
"clap",
"const_format",
"crossbeam",
"crypto-bigint",
"cuprate-address-book",
@ -972,7 +992,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower 0.5.1",
"tracing",
"tracing-subscriber",
]
@ -1471,7 +1491,7 @@ dependencies = [
"rustls-pki-types",
"tokio",
"tokio-rustls",
"tower-service",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1489,8 +1509,8 @@ dependencies = [
"pin-project-lite",
"socket2",
"tokio",
"tower",
"tower-service",
"tower 0.4.13",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing",
]
@ -2519,7 +2539,7 @@ dependencies = [
"hyper-rustls",
"hyper-util",
"tokio",
"tower-service",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -2801,9 +2821,24 @@ dependencies = [
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing",
]
[[package]]
name = "tower"
version = "0.5.1"
source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper 0.1.2",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tower-layer 0.3.3 (git+https://github.com/Cuprate/tower.git?rev=6c7faf0)",
"tower-service 0.3.3 (git+https://github.com/Cuprate/tower.git?rev=6c7faf0)",
"tracing",
]
@ -2813,12 +2848,22 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
[[package]]
name = "tracing"
version = "0.1.40"
@ -2899,6 +2944,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "untrusted"
version = "0.9.0"

View file

@ -59,6 +59,7 @@ clap = { version = "4.5.17", default-features = false }
chrono = { version = "0.4.38", default-features = false }
crypto-bigint = { version = "0.5.5", default-features = false }
crossbeam = { version = "0.8.4", default-features = false }
const_format = { version = "0.2.33", default-features = false }
curve25519-dalek = { version = "4.1.3", default-features = false }
dashmap = { version = "5.5.3", default-features = false }
dirs = { version = "5.0.1", default-features = false }
@ -81,7 +82,7 @@ thread_local = { version = "1.1.8", default-features = false }
tokio-util = { version = "0.7.12", default-features = false }
tokio-stream = { version = "0.1.16", default-features = false }
tokio = { version = "1.40.0", default-features = false }
tower = { version = "0.4.13", default-features = false }
tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } # <https://github.com/tower-rs/tower/pull/796>
tracing-subscriber = { version = "0.3.18", default-features = false }
tracing = { version = "0.1.40", default-features = false }

View file

@ -1,6 +1,6 @@
[package]
name = "cuprated"
version = "0.1.0"
version = "0.0.1"
edition = "2021"
description = "The Cuprate Monero Rust node."
license = "AGPL-3.0-only"
@ -42,11 +42,12 @@ borsh = { workspace = true }
bytemuck = { workspace = true }
bytes = { workspace = true }
cfg-if = { workspace = true }
clap = { workspace = true }
clap = { workspace = true, features = ["cargo"] }
chrono = { workspace = true }
crypto-bigint = { workspace = true }
crossbeam = { workspace = true }
curve25519-dalek = { workspace = true }
const_format = { workspace = true }
dashmap = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true }

View file

@ -0,0 +1,34 @@
//! General constants used throughout `cuprated`.
use const_format::formatcp;
/// `cuprated`'s semantic version (`MAJOR.MINOR.PATCH`) as a string.
pub const VERSION: &str = clap::crate_version!();
/// [`VERSION`] + the build type.
///
/// If a debug build, the suffix is `-debug`, else it is `-release`.
pub const VERSION_BUILD: &str = if cfg!(debug_assertions) {
formatcp!("{VERSION}-debug")
} else {
formatcp!("{VERSION}-release")
};
#[cfg(test)]
mod test {
use super::*;
// Sanity check: `VERSION` must track the crate version set in `Cargo.toml`.
// This assertion must be bumped whenever the crate version changes.
#[test]
fn version() {
assert_eq!(VERSION, "0.0.1");
}
// `VERSION_BUILD` must carry the suffix matching the compile-time profile.
#[test]
fn version_build() {
if cfg!(debug_assertions) {
assert_eq!(VERSION_BUILD, "0.0.1-debug");
} else {
assert_eq!(VERSION_BUILD, "0.0.1-release");
}
}
}

View file

@ -13,10 +13,16 @@
mod blockchain;
mod config;
mod constants;
mod p2p;
mod rpc;
mod statics;
mod txpool;
fn main() {
// Initialize global static `LazyLock` data.
statics::init_lazylock_statics();
// TODO: everything else.
todo!()
}

View file

@ -0,0 +1,53 @@
//! Global `static`s used throughout `cuprated`.
use std::{
// NOTE(review): `AtomicU64` appears unused in the visible code — confirm it is needed.
sync::{atomic::AtomicU64, LazyLock},
time::{SystemTime, UNIX_EPOCH},
};
/// Define all the `static`s that should always be initialized early on.
///
/// This wraps all `static`s inside a `LazyLock` and generates
/// a [`init_lazylock_statics`] function that must/should be
/// used by `main()` early on.
macro_rules! define_init_lazylock_statics {
($(
$( #[$attr:meta] )*
$name:ident: $t:ty = $init_fn:expr;
)*) => {
/// Initialize global static `LazyLock` data.
pub fn init_lazylock_statics() {
$(
// Force each lazy initializer to run now, rather than on first access.
LazyLock::force(&$name);
)*
}
$(
$(#[$attr])*
pub static $name: LazyLock<$t> = LazyLock::new(|| $init_fn);
)*
};
}
define_init_lazylock_statics! {
/// The start time of `cuprated`.
START_INSTANT: SystemTime = SystemTime::now();
/// Start time of `cuprated` as a UNIX timestamp.
START_INSTANT_UNIX: u64 = START_INSTANT
.duration_since(UNIX_EPOCH)
.expect("Failed to set `cuprated` startup time.")
.as_secs();
}
#[cfg(test)]
mod test {
use super::*;
/// Sanity check for startup UNIX time.
#[test]
fn start_instant_unix() {
// Fri Sep 27 01:07:13 AM UTC 2024 — startup must be strictly after this fixed point.
assert!(*START_INSTANT_UNIX > 1727399233);
}
}

View file

@ -19,7 +19,7 @@ tokio-util = { workspace = true, features = ["codec"] }
tokio-stream = { workspace = true, features = ["sync"]}
futures = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
tower = { workspace = true, features = ["util", "tracing"] }
tower = { workspace = true, features = ["util", "tracing", "make"] }
cfg-if = { workspace = true }
thiserror = { workspace = true }

View file

@ -1,6 +1,6 @@
use std::{
fmt::{Debug, Display, Formatter},
sync::Arc,
sync::{Arc, Mutex},
task::{ready, Context, Poll},
};
@ -15,6 +15,7 @@ use tracing::Instrument;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use cuprate_pruning::PruningSeed;
use cuprate_wire::CoreSyncData;
use crate::{
handles::{ConnectionGuard, ConnectionHandle},
@ -59,8 +60,17 @@ pub struct PeerInformation<A> {
pub handle: ConnectionHandle,
/// The direction of this connection (inbound|outbound).
pub direction: ConnectionDirection,
/// The peers pruning seed.
/// The peer's [`PruningSeed`].
pub pruning_seed: PruningSeed,
/// The [`CoreSyncData`] of this peer.
///
/// Data across fields are not necessarily related, so [`CoreSyncData::top_id`] is not always the
/// block hash for the block at height one below [`CoreSyncData::current_height`].
///
/// This value is behind a [`Mutex`] and is updated whenever the peer sends new information related
/// to their sync state. It is publicly accessible to anyone who has a peer's [`Client`] handle. You
/// probably should not mutate this value unless you are creating a custom [`ProtocolRequestHandler`](crate::ProtocolRequestHandler).
pub core_sync_data: Arc<Mutex<CoreSyncData>>,
}
/// This represents a connection to a peer.

View file

@ -22,7 +22,7 @@ use crate::{
constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT},
handles::ConnectionGuard,
AddressBook, BroadcastMessage, CoreSyncSvc, MessageID, NetworkZone, PeerError, PeerRequest,
PeerResponse, PeerSyncSvc, ProtocolRequestHandler, ProtocolResponse, SharedError,
PeerResponse, ProtocolRequestHandler, ProtocolResponse, SharedError,
};
/// A request to the connection task from a [`Client`](crate::client::Client).
@ -71,7 +71,7 @@ const fn levin_command_response(message_id: MessageID, command: LevinCommand) ->
}
/// This represents a connection to a peer.
pub(crate) struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
pub(crate) struct Connection<Z: NetworkZone, A, CS, PR, BrdcstStrm> {
/// The peer sink - where we send messages to the peer.
peer_sink: Z::Sink,
@ -86,7 +86,7 @@ pub(crate) struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
broadcast_stream: Pin<Box<BrdcstStrm>>,
/// The inner handler for any requests that come from the requested peer.
peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
peer_request_handler: PeerRequestHandler<Z, A, CS, PR>,
/// The connection guard which will send signals to other parts of Cuprate when this connection is dropped.
connection_guard: ConnectionGuard,
@ -94,12 +94,11 @@ pub(crate) struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
error: SharedError<PeerError>,
}
impl<Z, A, CS, PS, PR, BrdcstStrm> Connection<Z, A, CS, PS, PR, BrdcstStrm>
impl<Z, A, CS, PR, BrdcstStrm> Connection<Z, A, CS, PR, BrdcstStrm>
where
Z: NetworkZone,
A: AddressBook<Z>,
CS: CoreSyncSvc,
PS: PeerSyncSvc<Z>,
PR: ProtocolRequestHandler,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
{
@ -108,7 +107,7 @@ where
peer_sink: Z::Sink,
client_rx: mpsc::Receiver<ConnectionTaskRequest>,
broadcast_stream: BrdcstStrm,
peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
peer_request_handler: PeerRequestHandler<Z, A, CS, PR>,
connection_guard: ConnectionGuard,
error: SharedError<PeerError>,
) -> Self {

View file

@ -16,8 +16,8 @@ use tower::{Service, ServiceExt};
use crate::{
client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc,
ProtocolRequestHandler,
AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone,
ProtocolRequestHandlerMaker,
};
/// A request to connect to a peer.
@ -32,28 +32,27 @@ pub struct ConnectRequest<Z: NetworkZone> {
}
/// The connector service, this service connects to peer and returns the [`Client`].
pub struct Connector<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> {
handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>,
pub struct Connector<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr> {
handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
}
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
{
/// Create a new connector from a handshaker.
pub const fn new(
handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>,
handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
) -> Self {
Self { handshaker }
}
}
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
where
AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone,
ProtoHdlr: ProtocolRequestHandler + Clone,
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{

View file

@ -8,7 +8,7 @@ use std::{
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
sync::{Arc, Mutex},
task::{Context, Poll},
};
@ -40,10 +40,9 @@ use crate::{
PING_TIMEOUT,
},
handles::HandleBuilder,
services::PeerSyncRequest,
AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection,
CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone,
PeerSyncSvc, ProtocolRequestHandler, SharedError,
ProtocolRequestHandlerMaker, SharedError,
};
pub mod builder;
@ -87,15 +86,13 @@ pub struct DoHandshakeRequest<Z: NetworkZone> {
/// The peer handshaking service.
#[derive(Debug, Clone)]
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> {
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr> {
/// The address book service.
address_book: AdrBook,
/// The core sync data service.
core_sync_svc: CSync,
/// The peer sync service.
peer_sync_svc: PSync,
/// The protocol request handler service.
protocol_request_svc: ProtoHdlr,
protocol_request_svc_maker: ProtoHdlrMkr,
/// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData,
@ -109,24 +106,22 @@ pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstSt
_zone: PhantomData<Z>,
}
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
{
/// Creates a new handshaker.
const fn new(
address_book: AdrBook,
peer_sync_svc: PSync,
core_sync_svc: CSync,
protocol_request_svc: ProtoHdlr,
protocol_request_svc_maker: ProtoHdlrMkr,
broadcast_stream_maker: BrdcstStrmMkr,
our_basic_node_data: BasicNodeData,
connection_parent_span: Span,
) -> Self {
Self {
address_book,
peer_sync_svc,
core_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
broadcast_stream_maker,
our_basic_node_data,
connection_parent_span,
@ -135,14 +130,12 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
}
}
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>
Service<DoHandshakeRequest<Z>>
for HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
Service<DoHandshakeRequest<Z>> for HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
where
AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone,
ProtoHdlr: ProtocolRequestHandler + Clone,
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{
@ -159,9 +152,8 @@ where
let broadcast_stream_maker = self.broadcast_stream_maker.clone();
let address_book = self.address_book.clone();
let protocol_request_svc = self.protocol_request_svc.clone();
let protocol_request_svc_maker = self.protocol_request_svc_maker.clone();
let core_sync_svc = self.core_sync_svc.clone();
let peer_sync_svc = self.peer_sync_svc.clone();
let our_basic_node_data = self.our_basic_node_data.clone();
let connection_parent_span = self.connection_parent_span.clone();
@ -176,8 +168,7 @@ where
broadcast_stream_maker,
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
connection_parent_span,
),
@ -231,24 +222,21 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
}
/// This function completes a handshake with the requested peer.
#[expect(clippy::too_many_arguments)]
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>(
async fn handshake<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>(
req: DoHandshakeRequest<Z>,
broadcast_stream_maker: BrdcstStrmMkr,
mut address_book: AdrBook,
mut core_sync_svc: CSync,
mut peer_sync_svc: PSync,
protocol_request_handler: ProtoHdlr,
mut protocol_request_svc_maker: ProtoHdlrMkr,
our_basic_node_data: BasicNodeData,
connection_parent_span: Span,
) -> Result<Client<Z>, HandshakeError>
where
AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone,
ProtoHdlr: ProtocolRequestHandler,
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z>,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static,
{
@ -458,17 +446,6 @@ where
})
.await?;
// Tell the core sync service about the new peer.
peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::IncomingCoreSyncData(
addr,
handle.clone(),
peer_core_sync,
))
.await?;
// Set up the connection data.
let error_slot = SharedError::new();
let (connection_tx, client_rx) = mpsc::channel(1);
@ -478,18 +455,25 @@ where
handle,
direction,
pruning_seed,
core_sync_data: Arc::new(Mutex::new(peer_core_sync)),
};
let protocol_request_handler = protocol_request_svc_maker
.as_service()
.ready()
.await?
.call(info.clone())
.await?;
let request_handler = PeerRequestHandler {
address_book_svc: address_book.clone(),
our_sync_svc: core_sync_svc.clone(),
peer_sync_svc: peer_sync_svc.clone(),
protocol_request_handler,
our_basic_node_data,
peer_info: info.clone(),
};
let connection = Connection::<Z, _, _, _, _, _>::new(
let connection = Connection::<Z, _, _, _, _>::new(
peer_sink,
client_rx,
broadcast_stream_maker(addr),
@ -509,13 +493,11 @@ where
let semaphore = Arc::new(Semaphore::new(1));
let timeout_handle = tokio::spawn(connection_timeout_monitor_task(
info.id,
info.handle.clone(),
info.clone(),
connection_tx.clone(),
Arc::clone(&semaphore),
address_book,
core_sync_svc,
peer_sync_svc,
));
let client = Client::<Z>::new(

View file

@ -1,24 +1,23 @@
use std::marker::PhantomData;
use std::{convert::Infallible, marker::PhantomData};
use futures::{stream, Stream};
use tower::{make::Shared, util::MapErr};
use tracing::Span;
use cuprate_wire::BasicNodeData;
use crate::{
client::{handshaker::HandShaker, InternalPeerID},
AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler,
AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker,
};
mod dummy;
pub use dummy::{
DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler,
};
pub use dummy::{DummyAddressBook, DummyCoreSyncSvc, DummyProtocolRequestHandler};
/// A [`HandShaker`] [`Service`](tower::Service) builder.
///
/// This builder applies default values to make usage easier, behaviour and drawbacks of the defaults are documented
/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler`].
/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler_maker`].
///
/// If you want to use any network other than [`Mainnet`](crate::Network::Mainnet)
/// you will need to change the core sync service with [`HandshakerBuilder::with_core_sync_svc`],
@ -28,8 +27,7 @@ pub struct HandshakerBuilder<
N: NetworkZone,
AdrBook = DummyAddressBook,
CSync = DummyCoreSyncSvc,
PSync = DummyPeerSyncSvc,
ProtoHdlr = DummyProtocolRequestHandler,
ProtoHdlrMkr = MapErr<Shared<DummyProtocolRequestHandler>, fn(Infallible) -> tower::BoxError>,
BrdcstStrmMkr = fn(
InternalPeerID<<N as NetworkZone>::Addr>,
) -> stream::Pending<BroadcastMessage>,
@ -38,10 +36,8 @@ pub struct HandshakerBuilder<
address_book: AdrBook,
/// The core sync data service.
core_sync_svc: CSync,
/// The peer sync service.
peer_sync_svc: PSync,
/// The protocol request service.
protocol_request_svc: ProtoHdlr,
protocol_request_svc_maker: ProtoHdlrMkr,
/// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData,
/// A function that returns a stream that will give items to be broadcast by a connection.
@ -59,8 +55,10 @@ impl<N: NetworkZone> HandshakerBuilder<N> {
Self {
address_book: DummyAddressBook,
core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(),
peer_sync_svc: DummyPeerSyncSvc,
protocol_request_svc: DummyProtocolRequestHandler,
protocol_request_svc_maker: MapErr::new(
Shared::new(DummyProtocolRequestHandler),
tower::BoxError::from,
),
our_basic_node_data,
broadcast_stream_maker: |_| stream::pending(),
connection_parent_span: None,
@ -69,8 +67,8 @@ impl<N: NetworkZone> HandshakerBuilder<N> {
}
}
impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder<N, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder<N, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
{
/// Changes the address book to the provided one.
///
@ -83,14 +81,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
pub fn with_address_book<NAdrBook>(
self,
new_address_book: NAdrBook,
) -> HandshakerBuilder<N, NAdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
) -> HandshakerBuilder<N, NAdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
where
NAdrBook: AddressBook<N> + Clone,
{
let Self {
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
@ -100,8 +97,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder {
address_book: new_address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
@ -125,14 +121,13 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
pub fn with_core_sync_svc<NCSync>(
self,
new_core_sync_svc: NCSync,
) -> HandshakerBuilder<N, AdrBook, NCSync, PSync, ProtoHdlr, BrdcstStrmMkr>
) -> HandshakerBuilder<N, AdrBook, NCSync, ProtoHdlr, BrdcstStrmMkr>
where
NCSync: CoreSyncSvc + Clone,
{
let Self {
address_book,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
@ -142,8 +137,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder {
address_book,
core_sync_svc: new_core_sync_svc,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
@ -151,60 +145,24 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
}
}
/// Changes the peer sync service, which keeps track of peers sync states.
///
/// ## Default Peer Sync Service
///
/// The default peer sync service will be used if this method is not called.
///
/// The default peer sync service will not keep track of peers sync states.
pub fn with_peer_sync_svc<NPSync>(
self,
new_peer_sync_svc: NPSync,
) -> HandshakerBuilder<N, AdrBook, CSync, NPSync, ProtoHdlr, BrdcstStrmMkr>
where
NPSync: PeerSyncSvc<N> + Clone,
{
let Self {
address_book,
core_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
..
} = self;
HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc: new_peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone: PhantomData,
}
}
/// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
/// Changes the protocol request handler maker, which creates the service that handles [`ProtocolRequest`](crate::ProtocolRequest)s
/// to our node.
///
/// ## Default Protocol Request Handler
///
/// The default protocol request handler will not respond to any protocol requests, this should not
/// The default service maker will create services that will not respond to any protocol requests, this should not
/// be an issue as long as peers do not think we are ahead of them, if they do they will send requests
/// for our blocks, and we won't respond which will cause them to disconnect.
pub fn with_protocol_request_handler<NProtoHdlr>(
pub fn with_protocol_request_handler_maker<NProtoHdlrMkr>(
self,
new_protocol_handler: NProtoHdlr,
) -> HandshakerBuilder<N, AdrBook, CSync, PSync, NProtoHdlr, BrdcstStrmMkr>
new_protocol_request_svc_maker: NProtoHdlrMkr,
) -> HandshakerBuilder<N, AdrBook, CSync, NProtoHdlrMkr, BrdcstStrmMkr>
where
NProtoHdlr: ProtocolRequestHandler + Clone,
NProtoHdlrMkr: ProtocolRequestHandlerMaker<N> + Clone,
{
let Self {
address_book,
core_sync_svc,
peer_sync_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
@ -214,8 +172,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc: new_protocol_handler,
protocol_request_svc_maker: new_protocol_request_svc_maker,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
@ -233,7 +190,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
pub fn with_broadcast_stream_maker<NBrdcstStrmMkr, BrdcstStrm>(
self,
new_broadcast_stream_maker: NBrdcstStrmMkr,
) -> HandshakerBuilder<N, AdrBook, CSync, PSync, ProtoHdlr, NBrdcstStrmMkr>
) -> HandshakerBuilder<N, AdrBook, CSync, ProtoHdlr, NBrdcstStrmMkr>
where
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
NBrdcstStrmMkr: Fn(InternalPeerID<N::Addr>) -> BrdcstStrm + Clone + Send + 'static,
@ -241,8 +198,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
let Self {
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
connection_parent_span,
..
@ -251,8 +207,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
protocol_request_svc_maker,
our_basic_node_data,
broadcast_stream_maker: new_broadcast_stream_maker,
connection_parent_span,
@ -274,12 +229,11 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
}
/// Builds the [`HandShaker`].
pub fn build(self) -> HandShaker<N, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> {
pub fn build(self) -> HandShaker<N, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr> {
HandShaker::new(
self.address_book,
self.peer_sync_svc,
self.core_sync_svc,
self.protocol_request_svc,
self.protocol_request_svc_maker,
self.broadcast_stream_maker,
self.our_basic_node_data,
self.connection_parent_span.unwrap_or(Span::none()),

View file

@ -10,32 +10,10 @@ use cuprate_wire::CoreSyncData;
use crate::{
services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
PeerSyncRequest, PeerSyncResponse,
},
NetworkZone, ProtocolRequest, ProtocolResponse,
};
/// A dummy peer sync service, that doesn't actually keep track of peers sync states.
#[derive(Debug, Clone)]
pub struct DummyPeerSyncSvc;
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for DummyPeerSyncSvc {
type Response = PeerSyncResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: PeerSyncRequest<N>) -> Self::Future {
ready(Ok(match req {
PeerSyncRequest::PeersToSyncFrom { .. } => PeerSyncResponse::PeersToSyncFrom(vec![]),
PeerSyncRequest::IncomingCoreSyncData(_, _, _) => PeerSyncResponse::Ok,
}))
}
}
/// A dummy core sync service that just returns static [`CoreSyncData`].
#[derive(Debug, Clone)]
pub struct DummyCoreSyncSvc(CoreSyncData);

View file

@ -14,10 +14,8 @@ use crate::{
constants::MAX_PEERS_IN_PEER_LIST_MESSAGE,
services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
PeerSyncRequest,
},
AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
ProtocolRequestHandler,
AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, ProtocolRequestHandler,
};
#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
@ -28,13 +26,11 @@ enum PeerRequestHandlerError {
/// The peer request handler, handles incoming [`PeerRequest`]s to our node.
#[derive(Debug, Clone)]
pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PS, PR> {
pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PR> {
/// The address book service.
pub address_book_svc: A,
/// Our core sync service.
pub our_sync_svc: CS,
/// The peer sync service.
pub peer_sync_svc: PS,
/// The handler for [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
pub protocol_request_handler: PR,
@ -46,12 +42,11 @@ pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PS, PR> {
pub peer_info: PeerInformation<Z::Addr>,
}
impl<Z, A, CS, PS, PR> PeerRequestHandler<Z, A, CS, PS, PR>
impl<Z, A, CS, PR> PeerRequestHandler<Z, A, CS, PR>
where
Z: NetworkZone,
A: AddressBook<Z>,
CS: CoreSyncSvc,
PS: PeerSyncSvc<Z>,
PR: ProtocolRequestHandler,
{
/// Handles an incoming [`PeerRequest`] to our node.
@ -104,18 +99,7 @@ where
) -> Result<TimedSyncResponse, tower::BoxError> {
// TODO: add a limit on the amount of these requests in a certain time period.
let peer_id = self.peer_info.id;
let handle = self.peer_info.handle.clone();
self.peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::IncomingCoreSyncData(
peer_id,
handle,
req.payload_data,
))
.await?;
*self.peer_info.core_sync_data.lock().unwrap() = req.payload_data;
let AddressBookResponse::Peers(peers) = self
.address_book_svc

View file

@ -15,36 +15,35 @@ use tracing::instrument;
use cuprate_wire::{admin::TimedSyncRequest, AdminRequestMessage, AdminResponseMessage};
use crate::{
client::{connection::ConnectionTaskRequest, InternalPeerID},
client::{connection::ConnectionTaskRequest, PeerInformation},
constants::{MAX_PEERS_IN_PEER_LIST_MESSAGE, TIMEOUT_INTERVAL},
handles::ConnectionHandle,
services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse, PeerSyncRequest},
AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
services::{AddressBookRequest, CoreSyncDataRequest, CoreSyncDataResponse},
AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse,
};
/// The timeout monitor task, this task will send periodic timed sync requests to the peer to make sure it is still active.
#[instrument(
name = "timeout_monitor",
level = "debug",
fields(addr = %id),
fields(addr = %peer_information.id),
skip_all,
)]
pub async fn connection_timeout_monitor_task<N: NetworkZone, AdrBook, CSync, PSync>(
id: InternalPeerID<N::Addr>,
handle: ConnectionHandle,
pub async fn connection_timeout_monitor_task<N: NetworkZone, AdrBook, CSync>(
peer_information: PeerInformation<N::Addr>,
connection_tx: mpsc::Sender<ConnectionTaskRequest>,
semaphore: Arc<Semaphore>,
mut address_book_svc: AdrBook,
mut core_sync_svc: CSync,
mut peer_core_sync_svc: PSync,
) -> Result<(), tower::BoxError>
where
AdrBook: AddressBook<N>,
CSync: CoreSyncSvc,
PSync: PeerSyncSvc<N>,
{
let connection_tx_weak = connection_tx.downgrade();
drop(connection_tx);
// Instead of tracking the time from last message from the peer and sending a timed sync if this value is too high,
// we just send a timed sync every [TIMEOUT_INTERVAL] seconds.
let mut interval = interval(TIMEOUT_INTERVAL);
@ -59,10 +58,10 @@ where
tracing::trace!("timeout monitor tick.");
if connection_tx.is_closed() {
let Some(connection_tx) = connection_tx_weak.upgrade() else {
tracing::debug!("Closing timeout monitor, connection disconnected.");
return Ok(());
}
};
let Ok(permit) = Arc::clone(&semaphore).try_acquire_owned() else {
// If we can't get a permit the connection is currently waiting for a response, so no need to
@ -122,15 +121,6 @@ where
))
.await?;
// Tell the peer sync service about the peers core sync data
peer_core_sync_svc
.ready()
.await?
.call(PeerSyncRequest::IncomingCoreSyncData(
id,
handle.clone(),
timed_sync.payload_data,
))
.await?;
*peer_information.core_sync_data.lock().unwrap() = timed_sync.payload_data;
}
}

View file

@ -66,7 +66,7 @@ cfg_if::cfg_if! {
}
}
use std::{fmt::Debug, future::Future, hash::Hash};
use std::{fmt::Debug, hash::Hash};
use futures::{Sink, Stream};
@ -192,55 +192,26 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
// Below here is just helper traits, so we don't have to type out tower::Service bounds
// everywhere but still get to use tower.
pub trait PeerSyncSvc<Z: NetworkZone>:
tower::Service<
PeerSyncRequest<Z>,
Response = PeerSyncResponse<Z>,
Error = tower::BoxError,
Future = Self::Future2,
> + Send
+ 'static
{
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
}
impl<T, Z: NetworkZone> PeerSyncSvc<Z> for T
where
T: tower::Service<PeerSyncRequest<Z>, Response = PeerSyncResponse<Z>, Error = tower::BoxError>
+ Send
+ 'static,
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
{
type Future2 = T::Future;
}
pub trait AddressBook<Z: NetworkZone>:
tower::Service<
AddressBookRequest<Z>,
Response = AddressBookResponse<Z>,
Error = tower::BoxError,
Future = Self::Future2,
Future: Send + 'static,
> + Send
+ 'static
{
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
}
impl<T, Z: NetworkZone> AddressBook<Z> for T
where
impl<T, Z: NetworkZone> AddressBook<Z> for T where
T: tower::Service<
AddressBookRequest<Z>,
Response = AddressBookResponse<Z>,
Error = tower::BoxError,
Future: Send + 'static,
> + Send
+ 'static,
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
+ 'static
{
type Future2 = T::Future;
}
pub trait CoreSyncSvc:
@ -248,26 +219,21 @@ pub trait CoreSyncSvc:
CoreSyncDataRequest,
Response = CoreSyncDataResponse,
Error = tower::BoxError,
Future = Self::Future2,
Future: Send + 'static,
> + Send
+ 'static
{
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
}
impl<T> CoreSyncSvc for T
where
impl<T> CoreSyncSvc for T where
T: tower::Service<
CoreSyncDataRequest,
Response = CoreSyncDataResponse,
Error = tower::BoxError,
Future: Send + 'static,
> + Send
+ 'static,
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
+ 'static
{
type Future2 = T::Future;
}
pub trait ProtocolRequestHandler:
@ -275,21 +241,43 @@ pub trait ProtocolRequestHandler:
ProtocolRequest,
Response = ProtocolResponse,
Error = tower::BoxError,
Future = Self::Future2,
Future: Send + 'static,
> + Send
+ 'static
{
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
}
impl<T> ProtocolRequestHandler for T
where
T: tower::Service<ProtocolRequest, Response = ProtocolResponse, Error = tower::BoxError>
+ Send
+ 'static,
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
impl<T> ProtocolRequestHandler for T where
T: tower::Service<
ProtocolRequest,
Response = ProtocolResponse,
Error = tower::BoxError,
Future: Send + 'static,
> + Send
+ 'static
{
}
pub trait ProtocolRequestHandlerMaker<Z: NetworkZone>:
tower::MakeService<
client::PeerInformation<Z::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Send
+ 'static
{
}
impl<T, Z: NetworkZone> ProtocolRequestHandlerMaker<Z> for T where
T: tower::MakeService<
client::PeerInformation<Z::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Send
+ 'static
{
type Future2 = T::Future;
}

View file

@ -6,28 +6,6 @@ use crate::{
NetworkZone,
};
/// A request to the service that keeps track of peers sync states.
pub enum PeerSyncRequest<N: NetworkZone> {
/// Request some peers to sync from.
///
/// This takes in the current cumulative difficulty of our chain and will return peers that
/// claim to have a higher cumulative difficulty.
PeersToSyncFrom {
current_cumulative_difficulty: u128,
block_needed: Option<usize>,
},
/// Add/update a peer's core sync data.
IncomingCoreSyncData(InternalPeerID<N::Addr>, ConnectionHandle, CoreSyncData),
}
/// A response from the service that keeps track of peers sync states.
pub enum PeerSyncResponse<N: NetworkZone> {
/// The return value of [`PeerSyncRequest::PeersToSyncFrom`].
PeersToSyncFrom(Vec<InternalPeerID<N::Addr>>),
/// A generic ok response.
Ok,
}
/// A request to the core sync service for our node's [`CoreSyncData`].
pub struct CoreSyncDataRequest;

View file

@ -30,7 +30,6 @@ thiserror = { workspace = true }
bytes = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std", "std_rng"] }
rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }
borsh = { workspace = true, features = ["derive", "std"] }

View file

@ -22,11 +22,7 @@ use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::{BufferAppender, BufferStream};
use cuprate_p2p_core::{
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerSyncSvc,
};
use cuprate_p2p_core::{handles::ConnectionHandle, NetworkZone};
use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
use crate::{
@ -137,14 +133,12 @@ pub enum ChainSvcResponse {
/// The block downloader may fail before the whole chain is downloaded. If this is the case you can
/// call this function again, so it can start the search again.
#[instrument(level = "error", skip_all, name = "block_downloader")]
pub fn download_blocks<N: NetworkZone, S, C>(
pub fn download_blocks<N: NetworkZone, C>(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
our_chain_svc: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
S: PeerSyncSvc<N> + Clone,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
@ -152,13 +146,8 @@ where
{
let (buffer_appender, buffer_stream) = cuprate_async_buffer::new_buffer(config.buffer_size);
let block_downloader = BlockDownloader::new(
client_pool,
peer_sync_svc,
our_chain_svc,
buffer_appender,
config,
);
let block_downloader =
BlockDownloader::new(client_pool, our_chain_svc, buffer_appender, config);
tokio::spawn(
block_downloader
@ -195,12 +184,10 @@ where
/// - request the next chain entry
/// - download an already requested batch of blocks (this might happen due to an error in the previous request
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it).
struct BlockDownloader<N: NetworkZone, S, C> {
struct BlockDownloader<N: NetworkZone, C> {
/// The client pool.
client_pool: Arc<ClientPool<N>>,
/// The service that holds the peer's sync states.
peer_sync_svc: S,
/// The service that holds our current chain state.
our_chain_svc: C,
@ -238,9 +225,8 @@ struct BlockDownloader<N: NetworkZone, S, C> {
config: BlockDownloaderConfig,
}
impl<N: NetworkZone, S, C> BlockDownloader<N, S, C>
impl<N: NetworkZone, C> BlockDownloader<N, C>
where
S: PeerSyncSvc<N> + Clone,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
@ -249,16 +235,12 @@ where
/// Creates a new [`BlockDownloader`]
fn new(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
our_chain_svc: C,
buffer_appender: BufferAppender<BlockBatch>,
config: BlockDownloaderConfig,
) -> Self {
Self {
client_pool,
peer_sync_svc,
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size,
amount_of_blocks_to_request_updated_at: 0,
@ -495,22 +477,10 @@ where
panic!("Chain service returned wrong response.");
};
let PeerSyncResponse::PeersToSyncFrom(peers) = self
.peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
current_cumulative_difficulty,
block_needed: None,
})
.await?
else {
panic!("Peer sync service returned wrong response.");
};
tracing::debug!("Response received from peer sync service");
for client in self.client_pool.borrow_clients(&peers) {
for client in self
.client_pool
.clients_with_more_cumulative_difficulty(current_cumulative_difficulty)
{
pending_peers
.entry(client.info.pruning_seed)
.or_default()
@ -621,12 +591,8 @@ where
/// Starts the main loop of the block downloader.
async fn run(mut self) -> Result<(), BlockDownloadError> {
let mut chain_tracker = initial_chain_search(
&self.client_pool,
self.peer_sync_svc.clone(),
&mut self.our_chain_svc,
)
.await?;
let mut chain_tracker =
initial_chain_search(&self.client_pool, &mut self.our_chain_svc).await?;
let mut pending_peers = BTreeMap::new();

View file

@ -1,16 +1,12 @@
use std::{mem, sync::Arc};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use tokio::{task::JoinSet, time::timeout};
use tower::{Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use cuprate_p2p_core::{
client::InternalPeerID,
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, ProtocolRequest, ProtocolResponse,
client::InternalPeerID, handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse,
ProtocolRequest, ProtocolResponse,
};
use cuprate_wire::protocol::{ChainRequest, ChainResponse};
@ -83,13 +79,11 @@ pub(crate) async fn request_chain_entry_from_peer<N: NetworkZone>(
///
/// We then wait for their response and choose the peer who claims the highest cumulative difficulty.
#[instrument(level = "error", skip_all)]
pub async fn initial_chain_search<N: NetworkZone, S, C>(
pub async fn initial_chain_search<N: NetworkZone, C>(
client_pool: &Arc<ClientPool<N>>,
mut peer_sync_svc: S,
mut our_chain_svc: C,
) -> Result<ChainTracker<N>, BlockDownloadError>
where
S: PeerSyncSvc<N>,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>,
{
tracing::debug!("Getting our chain history");
@ -108,29 +102,9 @@ where
let our_genesis = *block_ids.last().expect("Blockchain had no genesis block.");
tracing::debug!("Getting a list of peers with higher cumulative difficulty");
let PeerSyncResponse::PeersToSyncFrom(mut peers) = peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::PeersToSyncFrom {
block_needed: None,
current_cumulative_difficulty: cumulative_difficulty,
})
.await?
else {
panic!("peer sync service sent wrong response.");
};
tracing::debug!(
"{} peers claim they have a higher cumulative difficulty",
peers.len()
);
// Shuffle the list to remove any possibility of peers being able to prioritize getting picked.
peers.shuffle(&mut thread_rng());
let mut peers = client_pool.borrow_clients(&peers);
let mut peers = client_pool
.clients_with_more_cumulative_difficulty(cumulative_difficulty)
.into_iter();
let mut futs = JoinSet::new();

View file

@ -2,7 +2,7 @@ use std::{
fmt::{Debug, Formatter},
future::Future,
pin::Pin,
sync::Arc,
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
};
@ -20,13 +20,14 @@ use tower::{service_fn, Service};
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_p2p_core::{
client::{mock_client, Client, InternalPeerID, PeerInformation},
services::{PeerSyncRequest, PeerSyncResponse},
ClearNet, ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest,
ProtocolResponse,
ClearNet, ConnectionDirection, PeerRequest, PeerResponse, ProtocolRequest, ProtocolResponse,
};
use cuprate_pruning::PruningSeed;
use cuprate_types::{BlockCompleteEntry, TransactionBlobs};
use cuprate_wire::protocol::{ChainResponse, GetObjectsResponse};
use cuprate_wire::{
protocol::{ChainResponse, GetObjectsResponse},
CoreSyncData,
};
use crate::{
block_downloader::{download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
@ -52,19 +53,14 @@ proptest! {
timeout(Duration::from_secs(600), async move {
let client_pool = ClientPool::new();
let mut peer_ids = Vec::with_capacity(peers);
for _ in 0..peers {
let client = mock_block_downloader_client(Arc::clone(&blockchain));
peer_ids.push(client.info.id);
client_pool.add_new_client(client);
}
let stream = download_blocks(
client_pool,
SyncStateSvc(peer_ids) ,
OurChainSvc {
genesis: *blockchain.blocks.first().unwrap().0
},
@ -255,31 +251,19 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
handle: connection_handle,
direction: ConnectionDirection::Inbound,
pruning_seed: PruningSeed::NotPruned,
core_sync_data: Arc::new(Mutex::new(CoreSyncData {
cumulative_difficulty: u64::MAX,
cumulative_difficulty_top64: u64::MAX,
current_height: 0,
pruning_seed: 0,
top_id: [0; 32],
top_version: 0,
})),
};
mock_client(info, connection_guard, request_handler)
}
#[derive(Clone)]
struct SyncStateSvc<Z: NetworkZone>(Vec<InternalPeerID<Z::Addr>>);
impl Service<PeerSyncRequest<ClearNet>> for SyncStateSvc<ClearNet> {
type Response = PeerSyncResponse<ClearNet>;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerSyncRequest<ClearNet>) -> Self::Future {
let peers = self.0.clone();
async move { Ok(PeerSyncResponse::PeersToSyncFrom(peers)) }.boxed()
}
}
struct OurChainSvc {
genesis: [u8; 32],
}

View file

@ -127,6 +127,32 @@ impl<N: NetworkZone> ClientPool<N> {
) -> impl Iterator<Item = ClientPoolDropGuard<N>> + sealed::Captures<(&'a (), &'b ())> {
peers.iter().filter_map(|peer| self.borrow_client(peer))
}
/// Borrows all [`Client`]s from the pool that have claimed a higher cumulative difficulty than
/// the amount passed in.
///
/// The [`Client`]s are wrapped in [`ClientPoolDropGuard`] which
/// will return the clients to the pool when they are dropped.
pub fn clients_with_more_cumulative_difficulty(
self: &Arc<Self>,
cumulative_difficulty: u128,
) -> Vec<ClientPoolDropGuard<N>> {
let peers = self
.clients
.iter()
.filter_map(|element| {
let peer_sync_info = element.value().info.core_sync_data.lock().unwrap();
if peer_sync_info.cumulative_difficulty() > cumulative_difficulty {
Some(*element.key())
} else {
None
}
})
.collect::<Vec<_>>();
self.borrow_clients(&peers).collect()
}
}
mod sealed {

View file

@ -16,6 +16,7 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
/// The durations of a short ban.
#[cfg_attr(not(test), expect(dead_code))]
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
/// The durations of a medium ban.

View file

@ -5,11 +5,7 @@
use std::sync::Arc;
use futures::FutureExt;
use tokio::{
sync::{mpsc, watch},
task::JoinSet,
};
use tokio_stream::wrappers::WatchStream;
use tokio::{sync::mpsc, task::JoinSet};
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
@ -17,8 +13,8 @@ use cuprate_async_buffer::BufferStream;
use cuprate_p2p_core::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
services::{AddressBookRequest, AddressBookResponse},
CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker,
};
mod block_downloader;
@ -28,7 +24,6 @@ pub mod config;
pub mod connection_maintainer;
mod constants;
mod inbound_server;
mod sync_states;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
@ -46,14 +41,14 @@ use connection_maintainer::MakeConnectionRequest;
/// - A core sync service, which keeps track of the sync state of our node
#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))]
pub async fn initialize_network<N, PR, CS>(
protocol_request_handler: PR,
protocol_request_handler_maker: PR,
core_sync_svc: CS,
config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError>
where
N: NetworkZone,
N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
PR: ProtocolRequestHandler + Clone,
PR: ProtocolRequestHandlerMaker<N> + Clone,
CS: CoreSyncSvc + Clone,
{
let address_book =
@ -63,12 +58,6 @@ where
config.max_inbound_connections + config.outbound_connections,
);
let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new();
let sync_states_svc = Buffer::new(
sync_states_svc,
config.max_inbound_connections + config.outbound_connections,
);
// Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
// this.
let (broadcast_svc, outbound_mkr, inbound_mkr) =
@ -83,9 +72,8 @@ where
let outbound_handshaker_builder =
cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data)
.with_address_book(address_book.clone())
.with_peer_sync_svc(sync_states_svc.clone())
.with_core_sync_svc(core_sync_svc)
.with_protocol_request_handler(protocol_request_handler)
.with_protocol_request_handler_maker(protocol_request_handler_maker)
.with_broadcast_stream_maker(outbound_mkr)
.with_connection_parent_span(Span::current());
@ -136,9 +124,7 @@ where
Ok(NetworkInterface {
pool: client_pool,
broadcast_svc,
top_block_watch,
make_connection_tx,
sync_states_svc,
address_book: address_book.boxed_clone(),
_background_tasks: Arc::new(background_tasks),
})
@ -151,16 +137,11 @@ pub struct NetworkInterface<N: NetworkZone> {
pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
/// on that claimed chain.
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections.
#[expect(dead_code, reason = "will be used eventually")]
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
/// The peer's sync states service.
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
/// Background tasks that will be aborted when this interface is dropped.
_background_tasks: Arc<JoinSet<()>>,
}
@ -183,17 +164,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
+ 'static,
C::Future: Send + 'static,
{
block_downloader::download_blocks(
Arc::clone(&self.pool),
self.sync_states_svc.clone(),
our_chain_service,
config,
)
}
/// Returns a stream which yields the highest seen sync state from a connected peer.
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone())
block_downloader::download_blocks(Arc::clone(&self.pool), our_chain_service, config)
}
/// Returns the address book service.

View file

@ -1,420 +0,0 @@
//! # Sync States
//!
//! This module contains a [`PeerSyncSvc`], which keeps track of the claimed chain states of connected peers.
//! This allows checking if we are behind and getting a list of peers who claim they are ahead.
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet},
future::{ready, Ready},
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::watch;
use tower::Service;
use cuprate_p2p_core::{
client::InternalPeerID,
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone,
};
use cuprate_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
use cuprate_wire::CoreSyncData;
use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN};
/// The highest claimed sync info from our connected peers.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NewSyncInfo {
/// The peers chain height.
pub chain_height: u64,
/// The peers top block's hash.
pub top_hash: [u8; 32],
/// The peers cumulative difficulty.
pub cumulative_difficulty: u128,
}
/// A service that keeps track of our peers blockchains.
///
/// This is the service that handles:
/// 1. Finding out if we need to sync
/// 1. Giving the peers that should be synced _from_, to the requester
pub(crate) struct PeerSyncSvc<N: NetworkZone> {
/// A map of cumulative difficulties to peers.
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
/// A map of peers to cumulative difficulties.
peers: HashMap<InternalPeerID<N::Addr>, (u128, PruningSeed)>,
/// A watch channel for *a* top synced peer info.
new_height_watcher: watch::Sender<NewSyncInfo>,
/// The handle to the peer that has data in `new_height_watcher`.
last_peer_in_watcher_handle: Option<ConnectionHandle>,
/// A [`FuturesUnordered`] that resolves when a peer disconnects.
closed_connections: FuturesUnordered<PeerDisconnectFut<N>>,
}
impl<N: NetworkZone> PeerSyncSvc<N> {
/// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with
/// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie.
pub(crate) fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
chain_height: 0,
top_hash: [0; 32],
cumulative_difficulty: 0,
});
watch_rx.mark_unchanged();
(
Self {
cumulative_difficulties: BTreeMap::new(),
peers: HashMap::new(),
new_height_watcher: watch_tx,
last_peer_in_watcher_handle: None,
closed_connections: FuturesUnordered::new(),
},
watch_rx,
)
}
/// This function checks if any peers have disconnected, removing them if they have.
fn poll_disconnected(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) {
tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service.");
let (peer_cum_diff, _) = self.peers.remove(&peer_id).unwrap();
let cum_diff_peers = self
.cumulative_difficulties
.get_mut(&peer_cum_diff)
.unwrap();
cum_diff_peers.remove(&peer_id);
if cum_diff_peers.is_empty() {
// If this was the last peer remove the whole entry for this cumulative difficulty.
self.cumulative_difficulties.remove(&peer_cum_diff);
}
}
}
/// Returns a list of peers that claim to have a higher cumulative difficulty than `current_cum_diff`.
fn peers_to_sync_from(
&self,
current_cum_diff: u128,
block_needed: Option<usize>,
) -> Vec<InternalPeerID<N::Addr>> {
self.cumulative_difficulties
.range((current_cum_diff + 1)..)
.flat_map(|(_, peers)| peers)
.filter(|peer| {
if let Some(block_needed) = block_needed {
// we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means
// we don't take into account the tip blocks which are not pruned.
self.peers[peer]
.1
.has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT)
} else {
true
}
})
.copied()
.collect()
}
/// Updates a peers sync state.
fn update_peer_sync_info(
&mut self,
peer_id: InternalPeerID<N::Addr>,
handle: ConnectionHandle,
core_sync_data: &CoreSyncData,
) -> Result<(), tower::BoxError> {
tracing::trace!(
"Received new core sync data from peer, top hash: {}",
hex::encode(core_sync_data.top_id)
);
let new_cumulative_difficulty = core_sync_data.cumulative_difficulty();
if let Some((old_cum_diff, _)) = self.peers.get_mut(&peer_id) {
match (*old_cum_diff).cmp(&new_cumulative_difficulty) {
Ordering::Equal => {
// If the cumulative difficulty of the peers chain hasn't changed then no need to update anything.
return Ok(());
}
Ordering::Greater => {
// This will only happen if a peer lowers its cumulative difficulty during the connection.
// This won't happen if a peer re-syncs their blockchain as then the connection would have closed.
tracing::debug!(
"Peer's claimed cumulative difficulty has dropped, closing connection and banning peer for: {} seconds.", SHORT_BAN.as_secs()
);
handle.ban_peer(SHORT_BAN);
return Err("Peers cumulative difficulty dropped".into());
}
Ordering::Less => (),
}
// Remove the old cumulative difficulty entry for this peer
let old_cum_diff_peers = self.cumulative_difficulties.get_mut(old_cum_diff).unwrap();
old_cum_diff_peers.remove(&peer_id);
if old_cum_diff_peers.is_empty() {
// If this was the last peer remove the whole entry for this cumulative difficulty.
self.cumulative_difficulties.remove(old_cum_diff);
}
// update the cumulative difficulty
*old_cum_diff = new_cumulative_difficulty;
} else {
// The peer is new so add it the list of peers.
self.peers.insert(
peer_id,
(
new_cumulative_difficulty,
PruningSeed::decompress_p2p_rules(core_sync_data.pruning_seed)?,
),
);
// add it to the list of peers to watch for disconnection.
self.closed_connections.push(PeerDisconnectFut {
closed_fut: handle.closed(),
peer_id: Some(peer_id),
});
}
self.cumulative_difficulties
.entry(new_cumulative_difficulty)
.or_default()
.insert(peer_id);
// If the claimed cumulative difficulty is higher than the current one in the watcher
// or if the peer in the watch has disconnected, update it.
if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty
|| self
.last_peer_in_watcher_handle
.as_ref()
.is_some_and(ConnectionHandle::is_closed)
{
tracing::debug!(
"Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}"
);
#[expect(
clippy::let_underscore_must_use,
reason = "dropped receivers can be ignored"
)]
let _ = self.new_height_watcher.send(NewSyncInfo {
top_hash: core_sync_data.top_id,
chain_height: core_sync_data.current_height,
cumulative_difficulty: new_cumulative_difficulty,
});
self.last_peer_in_watcher_handle.replace(handle);
}
Ok(())
}
}
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
type Response = PeerSyncResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_disconnected(cx);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: PeerSyncRequest<N>) -> Self::Future {
let res = match req {
PeerSyncRequest::PeersToSyncFrom {
current_cumulative_difficulty,
block_needed,
} => Ok(PeerSyncResponse::PeersToSyncFrom(self.peers_to_sync_from(
current_cumulative_difficulty,
block_needed,
))),
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
.update_peer_sync_info(peer_id, handle, &sync_data)
.map(|()| PeerSyncResponse::Ok),
};
ready(res)
}
}
#[cfg(test)]
mod tests {
    use tower::{Service, ServiceExt};

    use cuprate_p2p_core::{
        client::InternalPeerID, handles::HandleBuilder, services::PeerSyncRequest,
    };
    use cuprate_wire::CoreSyncData;

    use cuprate_p2p_core::services::PeerSyncResponse;
    use cuprate_test_utils::test_netzone::TestNetZone;

    use super::PeerSyncSvc;

    /// Builds a [`CoreSyncData`] with the given cumulative difficulty and top
    /// block hash; every other field is zeroed, which is all these tests need.
    fn core_sync_data(cumulative_difficulty: u64, top_id: [u8; 32]) -> CoreSyncData {
        CoreSyncData {
            cumulative_difficulty,
            cumulative_difficulty_top64: 0,
            current_height: 0,
            pruning_seed: 0,
            top_id,
            top_version: 0,
        }
    }

    /// The watch channel must only be updated when a peer claims a strictly
    /// higher cumulative difficulty than the current highest seen.
    #[tokio::test]
    async fn top_sync_channel_updates() {
        let (_g, handle) = HandleBuilder::new().build();
        let (mut svc, mut watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();

        // Nothing has been reported yet.
        assert!(!watch.has_changed().unwrap());

        // First peer: difficulty 1_000 — becomes the new highest.
        svc.ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::IncomingCoreSyncData(
                InternalPeerID::Unknown(0),
                handle.clone(),
                core_sync_data(1_000, [0; 32]),
            ))
            .await
            .unwrap();

        assert!(watch.has_changed().unwrap());
        assert_eq!(watch.borrow().top_hash, [0; 32]);
        assert_eq!(watch.borrow().cumulative_difficulty, 1000);
        assert_eq!(watch.borrow_and_update().chain_height, 0);

        // Second peer: equal difficulty — must NOT trigger an update.
        svc.ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::IncomingCoreSyncData(
                InternalPeerID::Unknown(1),
                handle.clone(),
                core_sync_data(1_000, [0; 32]),
            ))
            .await
            .unwrap();

        assert!(!watch.has_changed().unwrap());

        // Third peer: higher difficulty — triggers a fresh update.
        svc.ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::IncomingCoreSyncData(
                InternalPeerID::Unknown(2),
                handle.clone(),
                core_sync_data(1_001, [1; 32]),
            ))
            .await
            .unwrap();

        assert!(watch.has_changed().unwrap());
        assert_eq!(watch.borrow().top_hash, [1; 32]);
        assert_eq!(watch.borrow().cumulative_difficulty, 1001);
        assert_eq!(watch.borrow_and_update().chain_height, 0);
    }

    /// Re-reporting from the same peer must overwrite its entry rather than
    /// add a new one, and `PeersToSyncFrom` must return every peer ahead of us.
    #[tokio::test]
    async fn peer_sync_info_updates() {
        let (_g, handle) = HandleBuilder::new().build();
        let (mut svc, _watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();

        // First report from peer 0.
        svc.ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::IncomingCoreSyncData(
                InternalPeerID::Unknown(0),
                handle.clone(),
                core_sync_data(1_000, [0; 32]),
            ))
            .await
            .unwrap();

        assert_eq!(svc.peers.len(), 1);
        assert_eq!(svc.cumulative_difficulties.len(), 1);

        // Same peer again with a new difficulty: entry is replaced, not duplicated.
        svc.ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::IncomingCoreSyncData(
                InternalPeerID::Unknown(0),
                handle.clone(),
                core_sync_data(1_001, [0; 32]),
            ))
            .await
            .unwrap();

        assert_eq!(svc.peers.len(), 1);
        assert_eq!(svc.cumulative_difficulties.len(), 1);

        // A second, distinct peer adds a second entry.
        svc.ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::IncomingCoreSyncData(
                InternalPeerID::Unknown(1),
                handle.clone(),
                core_sync_data(10, [0; 32]),
            ))
            .await
            .unwrap();

        assert_eq!(svc.peers.len(), 2);
        assert_eq!(svc.cumulative_difficulties.len(), 2);

        // With our difficulty at 0, both peers are valid sync candidates.
        let PeerSyncResponse::PeersToSyncFrom(peers) = svc
            .ready()
            .await
            .unwrap()
            .call(PeerSyncRequest::PeersToSyncFrom {
                block_needed: None,
                current_cumulative_difficulty: 0,
            })
            .await
            .unwrap()
        else {
            panic!("Wrong response for request.")
        };

        assert!(
            peers.contains(&InternalPeerID::Unknown(0))
                && peers.contains(&InternalPeerID::Unknown(1))
        );
    }
}