give the protocol handler access to the peer info

This commit is contained in:
Boog900 2024-09-27 20:36:07 +01:00
parent 88605b081f
commit 75c70eb720
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
8 changed files with 174 additions and 78 deletions

69
Cargo.lock generated
View file

@ -129,9 +129,9 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper 1.0.1", "sync_wrapper 1.0.1",
"tokio", "tokio",
"tower", "tower 0.4.13",
"tower-layer", "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service", "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing", "tracing",
] ]
@ -151,8 +151,8 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"rustversion", "rustversion",
"sync_wrapper 0.1.2", "sync_wrapper 0.1.2",
"tower-layer", "tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service", "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing", "tracing",
] ]
@ -507,7 +507,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tower", "tower 0.5.1",
"tracing", "tracing",
] ]
@ -545,7 +545,7 @@ dependencies = [
"tempfile", "tempfile",
"thread_local", "thread_local",
"tokio", "tokio",
"tower", "tower 0.5.1",
] ]
[[package]] [[package]]
@ -572,7 +572,7 @@ dependencies = [
"tokio", "tokio",
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"tower", "tower 0.5.1",
"tracing", "tracing",
] ]
@ -618,7 +618,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tower", "tower 0.5.1",
"tracing", "tracing",
] ]
@ -648,7 +648,7 @@ dependencies = [
"futures", "futures",
"rayon", "rayon",
"serde", "serde",
"tower", "tower 0.5.1",
] ]
[[package]] [[package]]
@ -680,7 +680,7 @@ dependencies = [
"sha3", "sha3",
"thiserror", "thiserror",
"tokio", "tokio",
"tower", "tower 0.5.1",
] ]
[[package]] [[package]]
@ -766,7 +766,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"tower", "tower 0.5.1",
"tracing", "tracing",
] ]
@ -789,7 +789,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-test", "tokio-test",
"tokio-util", "tokio-util",
"tower", "tower 0.5.1",
"tracing", "tracing",
] ]
@ -817,7 +817,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tower", "tower 0.5.1",
"ureq", "ureq",
] ]
@ -878,7 +878,7 @@ dependencies = [
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
"tower", "tower 0.5.1",
] ]
[[package]] [[package]]
@ -972,7 +972,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower 0.5.1",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -1471,7 +1471,7 @@ dependencies = [
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tower-service", "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -1489,8 +1489,8 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"socket2", "socket2",
"tokio", "tokio",
"tower", "tower 0.4.13",
"tower-service", "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing", "tracing",
] ]
@ -2519,7 +2519,7 @@ dependencies = [
"hyper-rustls", "hyper-rustls",
"hyper-util", "hyper-util",
"tokio", "tokio",
"tower-service", "tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@ -2801,9 +2801,24 @@ dependencies = [
"pin-project", "pin-project",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing",
]
[[package]]
name = "tower"
version = "0.5.1"
source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper 0.1.2",
"tokio",
"tokio-util", "tokio-util",
"tower-layer", "tower-layer 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)",
"tower-service", "tower-service 0.3.3 (git+https://github.com/Boog900/tower.git?rev=6c7faf0)",
"tracing", "tracing",
] ]
@ -2813,12 +2828,22 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
[[package]]
name = "tower-layer"
version = "0.3.3"
source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
[[package]] [[package]]
name = "tower-service" name = "tower-service"
version = "0.3.3" version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tower-service"
version = "0.3.3"
source = "git+https://github.com/Boog900/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.40" version = "0.1.40"

View file

@ -81,7 +81,7 @@ thread_local = { version = "1.1.8", default-features = false }
tokio-util = { version = "0.7.12", default-features = false } tokio-util = { version = "0.7.12", default-features = false }
tokio-stream = { version = "0.1.16", default-features = false } tokio-stream = { version = "0.1.16", default-features = false }
tokio = { version = "1.40.0", default-features = false } tokio = { version = "1.40.0", default-features = false }
tower = { version = "0.4.13", default-features = false } tower = { git = "https://github.com/Boog900/tower.git", rev = "6c7faf0", default-features = false }
tracing-subscriber = { version = "0.3.18", default-features = false } tracing-subscriber = { version = "0.3.18", default-features = false }
tracing = { version = "0.1.40", default-features = false } tracing = { version = "0.1.40", default-features = false }

View file

@ -19,7 +19,7 @@ tokio-util = { workspace = true, features = ["codec"] }
tokio-stream = { workspace = true, features = ["sync"]} tokio-stream = { workspace = true, features = ["sync"]}
futures = { workspace = true, features = ["std"] } futures = { workspace = true, features = ["std"] }
async-trait = { workspace = true } async-trait = { workspace = true }
tower = { workspace = true, features = ["util", "tracing"] } tower = { workspace = true, features = ["util", "tracing", "make"] }
cfg-if = { workspace = true } cfg-if = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }

View file

@ -12,12 +12,13 @@ use std::{
use futures::{FutureExt, Stream}; use futures::{FutureExt, Stream};
use tokio::sync::OwnedSemaphorePermit; use tokio::sync::OwnedSemaphorePermit;
use tower::{Service, ServiceExt}; use tower::{MakeService, Service, ServiceExt};
use crate::client::PeerInformation;
use crate::{ use crate::{
client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc, AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc,
ProtocolRequestHandler, ProtocolRequest, ProtocolRequestHandler, ProtocolResponse,
}; };
/// A request to connect to a peer. /// A request to connect to a peer.
@ -32,28 +33,36 @@ pub struct ConnectRequest<Z: NetworkZone> {
} }
/// The connector service, this service connects to peer and returns the [`Client`]. /// The connector service, this service connects to peer and returns the [`Client`].
pub struct Connector<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> { pub struct Connector<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr> {
handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>, handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>,
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>
Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> Connector<Z, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>
{ {
/// Create a new connector from a handshaker. /// Create a new connector from a handshaker.
pub const fn new( pub const fn new(
handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>, handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>,
) -> Self { ) -> Self {
Self { handshaker } Self { handshaker }
} }
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>
where where
AdrBook: AddressBook<Z> + Clone, AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone, CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone, PSync: PeerSyncSvc<Z> + Clone,
ProtoHdlr: ProtocolRequestHandler + Clone, ProtoHdlrMkr: MakeService<
PeerInformation<Z::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Clone
+ Send
+ 'static,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{ {

View file

@ -17,7 +17,7 @@ use tokio::{
sync::{mpsc, OwnedSemaphorePermit, Semaphore}, sync::{mpsc, OwnedSemaphorePermit, Semaphore},
time::{error::Elapsed, timeout}, time::{error::Elapsed, timeout},
}; };
use tower::{Service, ServiceExt}; use tower::{MakeService, Service, ServiceExt};
use tracing::{info_span, Instrument, Span}; use tracing::{info_span, Instrument, Span};
use cuprate_pruning::{PruningError, PruningSeed}; use cuprate_pruning::{PruningError, PruningSeed};
@ -43,7 +43,7 @@ use crate::{
services::PeerSyncRequest, services::PeerSyncRequest,
AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection,
CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone,
PeerSyncSvc, ProtocolRequestHandler, SharedError, PeerSyncSvc, ProtocolRequest, ProtocolRequestHandler, SharedError,
}; };
pub mod builder; pub mod builder;
@ -87,7 +87,7 @@ pub struct DoHandshakeRequest<Z: NetworkZone> {
/// The peer handshaking service. /// The peer handshaking service.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> { pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr> {
/// The address book service. /// The address book service.
address_book: AdrBook, address_book: AdrBook,
/// The core sync data service. /// The core sync data service.
@ -95,7 +95,7 @@ pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstSt
/// The peer sync service. /// The peer sync service.
peer_sync_svc: PSync, peer_sync_svc: PSync,
/// The protocol request handler service. /// The protocol request handler service.
protocol_request_svc: ProtoHdlr, protocol_request_svc_maker: ProtoHdlrMkr,
/// Our [`BasicNodeData`] /// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
@ -109,15 +109,15 @@ pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstSt
_zone: PhantomData<Z>, _zone: PhantomData<Z>,
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>
HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>
{ {
/// Creates a new handshaker. /// Creates a new handshaker.
const fn new( const fn new(
address_book: AdrBook, address_book: AdrBook,
peer_sync_svc: PSync, peer_sync_svc: PSync,
core_sync_svc: CSync, core_sync_svc: CSync,
protocol_request_svc: ProtoHdlr, protocol_request_svc_maker: ProtoHdlrMkr,
broadcast_stream_maker: BrdcstStrmMkr, broadcast_stream_maker: BrdcstStrmMkr,
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
connection_parent_span: Span, connection_parent_span: Span,
@ -126,7 +126,7 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book, address_book,
peer_sync_svc, peer_sync_svc,
core_sync_svc, core_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
broadcast_stream_maker, broadcast_stream_maker,
our_basic_node_data, our_basic_node_data,
connection_parent_span, connection_parent_span,
@ -135,14 +135,22 @@ impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
} }
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
Service<DoHandshakeRequest<Z>> Service<DoHandshakeRequest<Z>>
for HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> for HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr>
where where
AdrBook: AddressBook<Z> + Clone, AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone, CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone, PSync: PeerSyncSvc<Z> + Clone,
ProtoHdlr: ProtocolRequestHandler + Clone, ProtoHdlrMkr: MakeService<
PeerInformation<Z::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Clone
+ Send
+ 'static,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{ {
@ -159,7 +167,7 @@ where
let broadcast_stream_maker = self.broadcast_stream_maker.clone(); let broadcast_stream_maker = self.broadcast_stream_maker.clone();
let address_book = self.address_book.clone(); let address_book = self.address_book.clone();
let protocol_request_svc = self.protocol_request_svc.clone(); let protocol_request_svc_maker = self.protocol_request_svc_maker.clone();
let core_sync_svc = self.core_sync_svc.clone(); let core_sync_svc = self.core_sync_svc.clone();
let peer_sync_svc = self.peer_sync_svc.clone(); let peer_sync_svc = self.peer_sync_svc.clone();
let our_basic_node_data = self.our_basic_node_data.clone(); let our_basic_node_data = self.our_basic_node_data.clone();
@ -177,7 +185,7 @@ where
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
connection_parent_span, connection_parent_span,
), ),
@ -232,7 +240,7 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
/// This function completes a handshake with the requested peer. /// This function completes a handshake with the requested peer.
#[expect(clippy::too_many_arguments)] #[expect(clippy::too_many_arguments)]
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>( async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>(
req: DoHandshakeRequest<Z>, req: DoHandshakeRequest<Z>,
broadcast_stream_maker: BrdcstStrmMkr, broadcast_stream_maker: BrdcstStrmMkr,
@ -240,7 +248,7 @@ async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmM
mut address_book: AdrBook, mut address_book: AdrBook,
mut core_sync_svc: CSync, mut core_sync_svc: CSync,
mut peer_sync_svc: PSync, mut peer_sync_svc: PSync,
protocol_request_handler: ProtoHdlr, mut protocol_request_svc_maker: ProtoHdlrMkr,
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
connection_parent_span: Span, connection_parent_span: Span,
) -> Result<Client<Z>, HandshakeError> ) -> Result<Client<Z>, HandshakeError>
@ -248,7 +256,14 @@ where
AdrBook: AddressBook<Z> + Clone, AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone, CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone, PSync: PeerSyncSvc<Z> + Clone,
ProtoHdlr: ProtocolRequestHandler, ProtoHdlrMkr: MakeService<
PeerInformation<Z::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Send
+ 'static,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static,
{ {
@ -480,6 +495,13 @@ where
pruning_seed, pruning_seed,
}; };
let protocol_request_handler = protocol_request_svc_maker
.as_service()
.ready()
.await?
.call(info.clone())
.await?;
let request_handler = PeerRequestHandler { let request_handler = PeerRequestHandler {
address_book_svc: address_book.clone(), address_book_svc: address_book.clone(),
our_sync_svc: core_sync_svc.clone(), our_sync_svc: core_sync_svc.clone(),

View file

@ -1,18 +1,21 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use futures::{stream, Stream}; use futures::{stream, Stream};
use tower::MakeService;
use tracing::Span; use tracing::Span;
use cuprate_wire::BasicNodeData; use cuprate_wire::BasicNodeData;
use crate::{ use crate::{
client::{handshaker::HandShaker, InternalPeerID}, client::{handshaker::HandShaker, InternalPeerID, PeerInformation},
AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler, AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequest,
ProtocolRequestHandler,
}; };
mod dummy; mod dummy;
pub use dummy::{ pub use dummy::{
DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler, DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler,
DummyProtocolRequestHandlerMaker,
}; };
/// A [`HandShaker`] [`Service`](tower::Service) builder. /// A [`HandShaker`] [`Service`](tower::Service) builder.
@ -29,7 +32,7 @@ pub struct HandshakerBuilder<
AdrBook = DummyAddressBook, AdrBook = DummyAddressBook,
CSync = DummyCoreSyncSvc, CSync = DummyCoreSyncSvc,
PSync = DummyPeerSyncSvc, PSync = DummyPeerSyncSvc,
ProtoHdlr = DummyProtocolRequestHandler, ProtoHdlrMkr = DummyProtocolRequestHandlerMaker,
BrdcstStrmMkr = fn( BrdcstStrmMkr = fn(
InternalPeerID<<N as NetworkZone>::Addr>, InternalPeerID<<N as NetworkZone>::Addr>,
) -> stream::Pending<BroadcastMessage>, ) -> stream::Pending<BroadcastMessage>,
@ -41,7 +44,7 @@ pub struct HandshakerBuilder<
/// The peer sync service. /// The peer sync service.
peer_sync_svc: PSync, peer_sync_svc: PSync,
/// The protocol request service. /// The protocol request service.
protocol_request_svc: ProtoHdlr, protocol_request_svc_maker: ProtoHdlrMkr,
/// Our [`BasicNodeData`] /// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
/// A function that returns a stream that will give items to be broadcast by a connection. /// A function that returns a stream that will give items to be broadcast by a connection.
@ -60,7 +63,7 @@ impl<N: NetworkZone> HandshakerBuilder<N> {
address_book: DummyAddressBook, address_book: DummyAddressBook,
core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(), core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(),
peer_sync_svc: DummyPeerSyncSvc, peer_sync_svc: DummyPeerSyncSvc,
protocol_request_svc: DummyProtocolRequestHandler, protocol_request_svc_maker: DummyProtocolRequestHandlerMaker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker: |_| stream::pending(), broadcast_stream_maker: |_| stream::pending(),
connection_parent_span: None, connection_parent_span: None,
@ -90,7 +93,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
let Self { let Self {
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -101,7 +104,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book: new_address_book, address_book: new_address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -132,7 +135,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
let Self { let Self {
address_book, address_book,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -143,7 +146,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book, address_book,
core_sync_svc: new_core_sync_svc, core_sync_svc: new_core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -168,7 +171,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
let Self { let Self {
address_book, address_book,
core_sync_svc, core_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -179,7 +182,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc: new_peer_sync_svc, peer_sync_svc: new_peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -187,19 +190,28 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
} }
} }
/// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node. /// Changes the protocol request handler maker, which creates the service that handles [`ProtocolRequest`](crate::ProtocolRequest)s
/// to our node.
/// ///
/// ## Default Protocol Request Handler /// ## Default Protocol Request Handler
/// ///
/// The default protocol request handler will not respond to any protocol requests, this should not /// The default service maker will create services that will not respond to any protocol requests, this should not
/// be an issue as long as peers do not think we are ahead of them, if they do they will send requests /// be an issue as long as peers do not think we are ahead of them, if they do they will send requests
/// for our blocks, and we won't respond which will cause them to disconnect. /// for our blocks, and we won't respond which will cause them to disconnect.
pub fn with_protocol_request_handler<NProtoHdlr>( pub fn with_protocol_request_handler_maker<NProtoHdlrMkr>(
self, self,
new_protocol_handler: NProtoHdlr, new_protocol_request_svc_maker: NProtoHdlrMkr,
) -> HandshakerBuilder<N, AdrBook, CSync, PSync, NProtoHdlr, BrdcstStrmMkr> ) -> HandshakerBuilder<N, AdrBook, CSync, PSync, NProtoHdlrMkr, BrdcstStrmMkr>
where where
NProtoHdlr: ProtocolRequestHandler + Clone, NProtoHdlrMkr: MakeService<
PeerInformation<N::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Clone
+ Send
+ 'static,
{ {
let Self { let Self {
address_book, address_book,
@ -215,7 +227,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc: new_protocol_handler, protocol_request_svc_maker: new_protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker, broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -242,7 +254,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
connection_parent_span, connection_parent_span,
.. ..
@ -252,7 +264,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
protocol_request_svc, protocol_request_svc_maker,
our_basic_node_data, our_basic_node_data,
broadcast_stream_maker: new_broadcast_stream_maker, broadcast_stream_maker: new_broadcast_stream_maker,
connection_parent_span, connection_parent_span,
@ -279,7 +291,7 @@ impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
self.address_book, self.address_book,
self.peer_sync_svc, self.peer_sync_svc,
self.core_sync_svc, self.core_sync_svc,
self.protocol_request_svc, self.protocol_request_svc_maker,
self.broadcast_stream_maker, self.broadcast_stream_maker,
self.our_basic_node_data, self.our_basic_node_data,
self.connection_parent_span.unwrap_or(Span::none()), self.connection_parent_span.unwrap_or(Span::none()),

View file

@ -7,12 +7,13 @@ use tower::Service;
use cuprate_wire::CoreSyncData; use cuprate_wire::CoreSyncData;
use crate::client::PeerInformation;
use crate::{ use crate::{
services::{ services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse, AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
PeerSyncRequest, PeerSyncResponse, PeerSyncRequest, PeerSyncResponse,
}, },
NetworkZone, ProtocolRequest, ProtocolResponse, NetZoneAddress, NetworkZone, ProtocolRequest, ProtocolResponse,
}; };
/// A dummy peer sync service, that doesn't actually keep track of peers sync states. /// A dummy peer sync service, that doesn't actually keep track of peers sync states.
@ -132,6 +133,24 @@ impl<N: NetworkZone> Service<AddressBookRequest<N>> for DummyAddressBook {
} }
} }
/// A [`DummyProtocolRequestHandler`] maker.
#[derive(Debug, Clone)]
pub struct DummyProtocolRequestHandlerMaker;
impl<A: NetZoneAddress> Service<PeerInformation<A>> for DummyProtocolRequestHandlerMaker {
type Response = DummyProtocolRequestHandler;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerInformation<A>) -> Self::Future {
ready(Ok(DummyProtocolRequestHandler))
}
}
/// A dummy protocol request handler. /// A dummy protocol request handler.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct DummyProtocolRequestHandler; pub struct DummyProtocolRequestHandler;

View file

@ -10,7 +10,7 @@ use tokio::{
task::JoinSet, task::JoinSet,
}; };
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; use tower::{buffer::Buffer, util::BoxCloneService, MakeService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span}; use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::BufferStream; use cuprate_async_buffer::BufferStream;
@ -18,7 +18,7 @@ use cuprate_p2p_core::{
client::Connector, client::Connector,
client::InternalPeerID, client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
CoreSyncSvc, NetworkZone, ProtocolRequestHandler, CoreSyncSvc, NetworkZone, ProtocolRequest, ProtocolRequestHandler,
}; };
mod block_downloader; mod block_downloader;
@ -35,6 +35,7 @@ pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard; use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig; pub use config::P2PConfig;
use connection_maintainer::MakeConnectionRequest; use connection_maintainer::MakeConnectionRequest;
use cuprate_p2p_core::client::PeerInformation;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
/// ///
@ -46,14 +47,22 @@ use connection_maintainer::MakeConnectionRequest;
/// - A core sync service, which keeps track of the sync state of our node /// - A core sync service, which keeps track of the sync state of our node
#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] #[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))]
pub async fn initialize_network<N, PR, CS>( pub async fn initialize_network<N, PR, CS>(
protocol_request_handler: PR, protocol_request_handler_maker: PR,
core_sync_svc: CS, core_sync_svc: CS,
config: P2PConfig<N>, config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError> ) -> Result<NetworkInterface<N>, tower::BoxError>
where where
N: NetworkZone, N: NetworkZone,
N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
PR: ProtocolRequestHandler + Clone, PR: MakeService<
PeerInformation<N::Addr>,
ProtocolRequest,
MakeError = tower::BoxError,
Service: ProtocolRequestHandler,
Future: Send + 'static,
> + Clone
+ Send
+ 'static,
CS: CoreSyncSvc + Clone, CS: CoreSyncSvc + Clone,
{ {
let address_book = let address_book =
@ -85,7 +94,7 @@ where
.with_address_book(address_book.clone()) .with_address_book(address_book.clone())
.with_peer_sync_svc(sync_states_svc.clone()) .with_peer_sync_svc(sync_states_svc.clone())
.with_core_sync_svc(core_sync_svc) .with_core_sync_svc(core_sync_svc)
.with_protocol_request_handler(protocol_request_handler) .with_protocol_request_handler_maker(protocol_request_handler_maker)
.with_broadcast_stream_maker(outbound_mkr) .with_broadcast_stream_maker(outbound_mkr)
.with_connection_parent_span(Span::current()); .with_connection_parent_span(Span::current());
@ -160,7 +169,7 @@ pub struct NetworkInterface<N: NetworkZone> {
/// The address book service. /// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>, address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
/// The peer's sync states service. /// The peer's sync states service.
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>, sync_states_svc: Buffer<PeerSyncRequest<N>, <sync_states::PeerSyncSvc<N> as Service<PeerSyncRequest<N>>>::Future>,
/// Background tasks that will be aborted when this interface is dropped. /// Background tasks that will be aborted when this interface is dropped.
_background_tasks: Arc<JoinSet<()>>, _background_tasks: Arc<JoinSet<()>>,
} }