mirror of
https://github.com/Cuprate/cuprate.git
synced 2024-12-22 03:29:25 +00:00
P2P: give the protocol handler access to the peer info (#302)
Some checks failed
CI / fmt (push) Has been cancelled
CI / typo (push) Has been cancelled
Audit / audit (push) Has been cancelled
CI / ci (macos-latest, stable, bash) (push) Has been cancelled
CI / ci (ubuntu-latest, stable, bash) (push) Has been cancelled
CI / ci (windows-latest, stable-x86_64-pc-windows-gnu, msys2 {0}) (push) Has been cancelled
Deny / audit (push) Has been cancelled
Doc / build (push) Has been cancelled
Doc / deploy (push) Has been cancelled
Some checks failed
CI / fmt (push) Has been cancelled
CI / typo (push) Has been cancelled
Audit / audit (push) Has been cancelled
CI / ci (macos-latest, stable, bash) (push) Has been cancelled
CI / ci (ubuntu-latest, stable, bash) (push) Has been cancelled
CI / ci (windows-latest, stable-x86_64-pc-windows-gnu, msys2 {0}) (push) Has been cancelled
Deny / audit (push) Has been cancelled
Doc / build (push) Has been cancelled
Doc / deploy (push) Has been cancelled
* give the protocol handler access to the peer info * add trait alias * clippy + fmt * update doc * simplify trait aliases * use tower `Shared` * clean import * fmt * Update Cargo.toml Co-authored-by: hinto-janai <hinto.janai@protonmail.com> * fix merge --------- Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
This commit is contained in:
parent
6da9d2d734
commit
521bf877db
8 changed files with 153 additions and 102 deletions
69
Cargo.lock
generated
69
Cargo.lock
generated
|
@ -129,9 +129,9 @@ dependencies = [
|
|||
"serde_urlencoded",
|
||||
"sync_wrapper 1.0.1",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tower 0.4.13",
|
||||
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -151,8 +151,8 @@ dependencies = [
|
|||
"pin-project-lite",
|
||||
"rustversion",
|
||||
"sync_wrapper 0.1.2",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -527,7 +527,7 @@ dependencies = [
|
|||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -565,7 +565,7 @@ dependencies = [
|
|||
"tempfile",
|
||||
"thread_local",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -592,7 +592,7 @@ dependencies = [
|
|||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -638,7 +638,7 @@ dependencies = [
|
|||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -668,7 +668,7 @@ dependencies = [
|
|||
"futures",
|
||||
"rayon",
|
||||
"serde",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -700,7 +700,7 @@ dependencies = [
|
|||
"sha3",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -785,7 +785,7 @@ dependencies = [
|
|||
"tokio-stream",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -808,7 +808,7 @@ dependencies = [
|
|||
"tokio-stream",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -836,7 +836,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"ureq",
|
||||
]
|
||||
|
||||
|
@ -897,7 +897,7 @@ dependencies = [
|
|||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -992,7 +992,7 @@ dependencies = [
|
|||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tower",
|
||||
"tower 0.5.1",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
@ -1491,7 +1491,7 @@ dependencies = [
|
|||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls",
|
||||
"tower-service",
|
||||
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1509,8 +1509,8 @@ dependencies = [
|
|||
"pin-project-lite",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-service",
|
||||
"tower 0.4.13",
|
||||
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -2539,7 +2539,7 @@ dependencies = [
|
|||
"hyper-rustls",
|
||||
"hyper-util",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2821,9 +2821,24 @@ dependencies = [
|
|||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tower-layer 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tower-service 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.5.1"
|
||||
source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"pin-project-lite",
|
||||
"sync_wrapper 0.1.2",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tower-layer 0.3.3 (git+https://github.com/Cuprate/tower.git?rev=6c7faf0)",
|
||||
"tower-service 0.3.3 (git+https://github.com/Cuprate/tower.git?rev=6c7faf0)",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -2833,12 +2848,22 @@ version = "0.3.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
|
||||
|
||||
[[package]]
|
||||
name = "tower-layer"
|
||||
version = "0.3.3"
|
||||
source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.3"
|
||||
source = "git+https://github.com/Cuprate/tower.git?rev=6c7faf0#6c7faf0e9dbc74aef5d3110313324bc7e1f997cf"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.40"
|
||||
|
|
|
@ -82,7 +82,7 @@ thread_local = { version = "1.1.8", default-features = false }
|
|||
tokio-util = { version = "0.7.12", default-features = false }
|
||||
tokio-stream = { version = "0.1.16", default-features = false }
|
||||
tokio = { version = "1.40.0", default-features = false }
|
||||
tower = { version = "0.4.13", default-features = false }
|
||||
tower = { git = "https://github.com/Cuprate/tower.git", rev = "6c7faf0", default-features = false } # <https://github.com/tower-rs/tower/pull/796>
|
||||
tracing-subscriber = { version = "0.3.18", default-features = false }
|
||||
tracing = { version = "0.1.40", default-features = false }
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ tokio-util = { workspace = true, features = ["codec"] }
|
|||
tokio-stream = { workspace = true, features = ["sync"]}
|
||||
futures = { workspace = true, features = ["std"] }
|
||||
async-trait = { workspace = true }
|
||||
tower = { workspace = true, features = ["util", "tracing"] }
|
||||
tower = { workspace = true, features = ["util", "tracing", "make"] }
|
||||
|
||||
cfg-if = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
|
|
@ -17,7 +17,7 @@ use tower::{Service, ServiceExt};
|
|||
use crate::{
|
||||
client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
|
||||
AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone,
|
||||
ProtocolRequestHandler,
|
||||
ProtocolRequestHandlerMaker,
|
||||
};
|
||||
|
||||
/// A request to connect to a peer.
|
||||
|
@ -32,25 +32,27 @@ pub struct ConnectRequest<Z: NetworkZone> {
|
|||
}
|
||||
|
||||
/// The connector service, this service connects to peer and returns the [`Client`].
|
||||
pub struct Connector<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr> {
|
||||
handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>,
|
||||
pub struct Connector<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr> {
|
||||
handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
|
||||
}
|
||||
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
||||
Connector<Z, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
|
||||
Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
|
||||
{
|
||||
/// Create a new connector from a handshaker.
|
||||
pub const fn new(handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>) -> Self {
|
||||
pub const fn new(
|
||||
handshaker: HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>,
|
||||
) -> Self {
|
||||
Self { handshaker }
|
||||
}
|
||||
}
|
||||
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>
|
||||
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
|
||||
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
|
||||
where
|
||||
AdrBook: AddressBook<Z> + Clone,
|
||||
CSync: CoreSyncSvc + Clone,
|
||||
ProtoHdlr: ProtocolRequestHandler + Clone,
|
||||
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
|
||||
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
||||
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
|
||||
{
|
||||
|
|
|
@ -42,7 +42,7 @@ use crate::{
|
|||
handles::HandleBuilder,
|
||||
AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection,
|
||||
CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone,
|
||||
ProtocolRequestHandler, SharedError,
|
||||
ProtocolRequestHandlerMaker, SharedError,
|
||||
};
|
||||
|
||||
pub mod builder;
|
||||
|
@ -86,13 +86,13 @@ pub struct DoHandshakeRequest<Z: NetworkZone> {
|
|||
|
||||
/// The peer handshaking service.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr> {
|
||||
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr> {
|
||||
/// The address book service.
|
||||
address_book: AdrBook,
|
||||
/// The core sync data service.
|
||||
core_sync_svc: CSync,
|
||||
/// The protocol request handler service.
|
||||
protocol_request_svc: ProtoHdlr,
|
||||
protocol_request_svc_maker: ProtoHdlrMkr,
|
||||
|
||||
/// Our [`BasicNodeData`]
|
||||
our_basic_node_data: BasicNodeData,
|
||||
|
@ -106,14 +106,14 @@ pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
_zone: PhantomData<Z>,
|
||||
}
|
||||
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
||||
HandShaker<Z, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
|
||||
HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
|
||||
{
|
||||
/// Creates a new handshaker.
|
||||
const fn new(
|
||||
address_book: AdrBook,
|
||||
core_sync_svc: CSync,
|
||||
protocol_request_svc: ProtoHdlr,
|
||||
protocol_request_svc_maker: ProtoHdlrMkr,
|
||||
broadcast_stream_maker: BrdcstStrmMkr,
|
||||
our_basic_node_data: BasicNodeData,
|
||||
connection_parent_span: Span,
|
||||
|
@ -121,7 +121,7 @@ impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
Self {
|
||||
address_book,
|
||||
core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
broadcast_stream_maker,
|
||||
our_basic_node_data,
|
||||
connection_parent_span,
|
||||
|
@ -130,12 +130,12 @@ impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
}
|
||||
}
|
||||
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>
|
||||
Service<DoHandshakeRequest<Z>> for HandShaker<Z, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
||||
impl<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>
|
||||
Service<DoHandshakeRequest<Z>> for HandShaker<Z, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr>
|
||||
where
|
||||
AdrBook: AddressBook<Z> + Clone,
|
||||
CSync: CoreSyncSvc + Clone,
|
||||
ProtoHdlr: ProtocolRequestHandler + Clone,
|
||||
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z> + Clone,
|
||||
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
||||
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
|
||||
{
|
||||
|
@ -152,7 +152,7 @@ where
|
|||
let broadcast_stream_maker = self.broadcast_stream_maker.clone();
|
||||
|
||||
let address_book = self.address_book.clone();
|
||||
let protocol_request_svc = self.protocol_request_svc.clone();
|
||||
let protocol_request_svc_maker = self.protocol_request_svc_maker.clone();
|
||||
let core_sync_svc = self.core_sync_svc.clone();
|
||||
let our_basic_node_data = self.our_basic_node_data.clone();
|
||||
|
||||
|
@ -168,7 +168,7 @@ where
|
|||
broadcast_stream_maker,
|
||||
address_book,
|
||||
core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
connection_parent_span,
|
||||
),
|
||||
|
@ -222,21 +222,21 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
|
|||
}
|
||||
|
||||
/// This function completes a handshake with the requested peer.
|
||||
async fn handshake<Z: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>(
|
||||
async fn handshake<Z: NetworkZone, AdrBook, CSync, ProtoHdlrMkr, BrdcstStrmMkr, BrdcstStrm>(
|
||||
req: DoHandshakeRequest<Z>,
|
||||
|
||||
broadcast_stream_maker: BrdcstStrmMkr,
|
||||
|
||||
mut address_book: AdrBook,
|
||||
mut core_sync_svc: CSync,
|
||||
protocol_request_handler: ProtoHdlr,
|
||||
mut protocol_request_svc_maker: ProtoHdlrMkr,
|
||||
our_basic_node_data: BasicNodeData,
|
||||
connection_parent_span: Span,
|
||||
) -> Result<Client<Z>, HandshakeError>
|
||||
where
|
||||
AdrBook: AddressBook<Z> + Clone,
|
||||
CSync: CoreSyncSvc + Clone,
|
||||
ProtoHdlr: ProtocolRequestHandler,
|
||||
ProtoHdlrMkr: ProtocolRequestHandlerMaker<Z>,
|
||||
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
|
||||
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static,
|
||||
{
|
||||
|
@ -458,6 +458,13 @@ where
|
|||
core_sync_data: Arc::new(Mutex::new(peer_core_sync)),
|
||||
};
|
||||
|
||||
let protocol_request_handler = protocol_request_svc_maker
|
||||
.as_service()
|
||||
.ready()
|
||||
.await?
|
||||
.call(info.clone())
|
||||
.await?;
|
||||
|
||||
let request_handler = PeerRequestHandler {
|
||||
address_book_svc: address_book.clone(),
|
||||
our_sync_svc: core_sync_svc.clone(),
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
use std::marker::PhantomData;
|
||||
use std::{convert::Infallible, marker::PhantomData};
|
||||
|
||||
use futures::{stream, Stream};
|
||||
use tower::{make::Shared, util::MapErr};
|
||||
use tracing::Span;
|
||||
|
||||
use cuprate_wire::BasicNodeData;
|
||||
|
||||
use crate::{
|
||||
client::{handshaker::HandShaker, InternalPeerID},
|
||||
AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
|
||||
AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker,
|
||||
};
|
||||
|
||||
mod dummy;
|
||||
|
@ -16,7 +17,7 @@ pub use dummy::{DummyAddressBook, DummyCoreSyncSvc, DummyProtocolRequestHandler}
|
|||
/// A [`HandShaker`] [`Service`](tower::Service) builder.
|
||||
///
|
||||
/// This builder applies default values to make usage easier, behaviour and drawbacks of the defaults are documented
|
||||
/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler`].
|
||||
/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler_maker`].
|
||||
///
|
||||
/// If you want to use any network other than [`Mainnet`](crate::Network::Mainnet)
|
||||
/// you will need to change the core sync service with [`HandshakerBuilder::with_core_sync_svc`],
|
||||
|
@ -26,7 +27,7 @@ pub struct HandshakerBuilder<
|
|||
N: NetworkZone,
|
||||
AdrBook = DummyAddressBook,
|
||||
CSync = DummyCoreSyncSvc,
|
||||
ProtoHdlr = DummyProtocolRequestHandler,
|
||||
ProtoHdlrMkr = MapErr<Shared<DummyProtocolRequestHandler>, fn(Infallible) -> tower::BoxError>,
|
||||
BrdcstStrmMkr = fn(
|
||||
InternalPeerID<<N as NetworkZone>::Addr>,
|
||||
) -> stream::Pending<BroadcastMessage>,
|
||||
|
@ -36,7 +37,7 @@ pub struct HandshakerBuilder<
|
|||
/// The core sync data service.
|
||||
core_sync_svc: CSync,
|
||||
/// The protocol request service.
|
||||
protocol_request_svc: ProtoHdlr,
|
||||
protocol_request_svc_maker: ProtoHdlrMkr,
|
||||
/// Our [`BasicNodeData`]
|
||||
our_basic_node_data: BasicNodeData,
|
||||
/// A function that returns a stream that will give items to be broadcast by a connection.
|
||||
|
@ -54,7 +55,10 @@ impl<N: NetworkZone> HandshakerBuilder<N> {
|
|||
Self {
|
||||
address_book: DummyAddressBook,
|
||||
core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(),
|
||||
protocol_request_svc: DummyProtocolRequestHandler,
|
||||
protocol_request_svc_maker: MapErr::new(
|
||||
Shared::new(DummyProtocolRequestHandler),
|
||||
tower::BoxError::from,
|
||||
),
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker: |_| stream::pending(),
|
||||
connection_parent_span: None,
|
||||
|
@ -83,7 +87,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
{
|
||||
let Self {
|
||||
core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker,
|
||||
connection_parent_span,
|
||||
|
@ -93,7 +97,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
HandshakerBuilder {
|
||||
address_book: new_address_book,
|
||||
core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker,
|
||||
connection_parent_span,
|
||||
|
@ -123,7 +127,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
{
|
||||
let Self {
|
||||
address_book,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker,
|
||||
connection_parent_span,
|
||||
|
@ -133,7 +137,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
HandshakerBuilder {
|
||||
address_book,
|
||||
core_sync_svc: new_core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker,
|
||||
connection_parent_span,
|
||||
|
@ -141,19 +145,20 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
}
|
||||
}
|
||||
|
||||
/// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
|
||||
/// Changes the protocol request handler maker, which creates the service that handles [`ProtocolRequest`](crate::ProtocolRequest)s
|
||||
/// to our node.
|
||||
///
|
||||
/// ## Default Protocol Request Handler
|
||||
///
|
||||
/// The default protocol request handler will not respond to any protocol requests, this should not
|
||||
/// The default service maker will create services that will not respond to any protocol requests, this should not
|
||||
/// be an issue as long as peers do not think we are ahead of them, if they do they will send requests
|
||||
/// for our blocks, and we won't respond which will cause them to disconnect.
|
||||
pub fn with_protocol_request_handler<NProtoHdlr>(
|
||||
pub fn with_protocol_request_handler_maker<NProtoHdlrMkr>(
|
||||
self,
|
||||
new_protocol_handler: NProtoHdlr,
|
||||
) -> HandshakerBuilder<N, AdrBook, CSync, NProtoHdlr, BrdcstStrmMkr>
|
||||
new_protocol_request_svc_maker: NProtoHdlrMkr,
|
||||
) -> HandshakerBuilder<N, AdrBook, CSync, NProtoHdlrMkr, BrdcstStrmMkr>
|
||||
where
|
||||
NProtoHdlr: ProtocolRequestHandler + Clone,
|
||||
NProtoHdlrMkr: ProtocolRequestHandlerMaker<N> + Clone,
|
||||
{
|
||||
let Self {
|
||||
address_book,
|
||||
|
@ -167,7 +172,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
HandshakerBuilder {
|
||||
address_book,
|
||||
core_sync_svc,
|
||||
protocol_request_svc: new_protocol_handler,
|
||||
protocol_request_svc_maker: new_protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker,
|
||||
connection_parent_span,
|
||||
|
@ -193,7 +198,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
let Self {
|
||||
address_book,
|
||||
core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
connection_parent_span,
|
||||
..
|
||||
|
@ -202,7 +207,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
HandshakerBuilder {
|
||||
address_book,
|
||||
core_sync_svc,
|
||||
protocol_request_svc,
|
||||
protocol_request_svc_maker,
|
||||
our_basic_node_data,
|
||||
broadcast_stream_maker: new_broadcast_stream_maker,
|
||||
connection_parent_span,
|
||||
|
@ -228,7 +233,7 @@ impl<N: NetworkZone, AdrBook, CSync, ProtoHdlr, BrdcstStrmMkr>
|
|||
HandShaker::new(
|
||||
self.address_book,
|
||||
self.core_sync_svc,
|
||||
self.protocol_request_svc,
|
||||
self.protocol_request_svc_maker,
|
||||
self.broadcast_stream_maker,
|
||||
self.our_basic_node_data,
|
||||
self.connection_parent_span.unwrap_or(Span::none()),
|
||||
|
|
|
@ -66,7 +66,7 @@ cfg_if::cfg_if! {
|
|||
}
|
||||
}
|
||||
|
||||
use std::{fmt::Debug, future::Future, hash::Hash};
|
||||
use std::{fmt::Debug, hash::Hash};
|
||||
|
||||
use futures::{Sink, Stream};
|
||||
|
||||
|
@ -197,26 +197,21 @@ pub trait AddressBook<Z: NetworkZone>:
|
|||
AddressBookRequest<Z>,
|
||||
Response = AddressBookResponse<Z>,
|
||||
Error = tower::BoxError,
|
||||
Future = Self::Future2,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static
|
||||
{
|
||||
// This allows us to put more restrictive bounds on the future without defining the future here
|
||||
// explicitly.
|
||||
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
|
||||
}
|
||||
|
||||
impl<T, Z: NetworkZone> AddressBook<Z> for T
|
||||
where
|
||||
impl<T, Z: NetworkZone> AddressBook<Z> for T where
|
||||
T: tower::Service<
|
||||
AddressBookRequest<Z>,
|
||||
Response = AddressBookResponse<Z>,
|
||||
Error = tower::BoxError,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static,
|
||||
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
|
||||
+ 'static
|
||||
{
|
||||
type Future2 = T::Future;
|
||||
}
|
||||
|
||||
pub trait CoreSyncSvc:
|
||||
|
@ -224,26 +219,21 @@ pub trait CoreSyncSvc:
|
|||
CoreSyncDataRequest,
|
||||
Response = CoreSyncDataResponse,
|
||||
Error = tower::BoxError,
|
||||
Future = Self::Future2,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static
|
||||
{
|
||||
// This allows us to put more restrictive bounds on the future without defining the future here
|
||||
// explicitly.
|
||||
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
|
||||
}
|
||||
|
||||
impl<T> CoreSyncSvc for T
|
||||
where
|
||||
impl<T> CoreSyncSvc for T where
|
||||
T: tower::Service<
|
||||
CoreSyncDataRequest,
|
||||
Response = CoreSyncDataResponse,
|
||||
Error = tower::BoxError,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static,
|
||||
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
|
||||
+ 'static
|
||||
{
|
||||
type Future2 = T::Future;
|
||||
}
|
||||
|
||||
pub trait ProtocolRequestHandler:
|
||||
|
@ -251,21 +241,43 @@ pub trait ProtocolRequestHandler:
|
|||
ProtocolRequest,
|
||||
Response = ProtocolResponse,
|
||||
Error = tower::BoxError,
|
||||
Future = Self::Future2,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static
|
||||
{
|
||||
// This allows us to put more restrictive bounds on the future without defining the future here
|
||||
// explicitly.
|
||||
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
|
||||
}
|
||||
|
||||
impl<T> ProtocolRequestHandler for T
|
||||
where
|
||||
T: tower::Service<ProtocolRequest, Response = ProtocolResponse, Error = tower::BoxError>
|
||||
+ Send
|
||||
+ 'static,
|
||||
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
|
||||
impl<T> ProtocolRequestHandler for T where
|
||||
T: tower::Service<
|
||||
ProtocolRequest,
|
||||
Response = ProtocolResponse,
|
||||
Error = tower::BoxError,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static
|
||||
{
|
||||
}
|
||||
|
||||
pub trait ProtocolRequestHandlerMaker<Z: NetworkZone>:
|
||||
tower::MakeService<
|
||||
client::PeerInformation<Z::Addr>,
|
||||
ProtocolRequest,
|
||||
MakeError = tower::BoxError,
|
||||
Service: ProtocolRequestHandler,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static
|
||||
{
|
||||
}
|
||||
|
||||
impl<T, Z: NetworkZone> ProtocolRequestHandlerMaker<Z> for T where
|
||||
T: tower::MakeService<
|
||||
client::PeerInformation<Z::Addr>,
|
||||
ProtocolRequest,
|
||||
MakeError = tower::BoxError,
|
||||
Service: ProtocolRequestHandler,
|
||||
Future: Send + 'static,
|
||||
> + Send
|
||||
+ 'static
|
||||
{
|
||||
type Future2 = T::Future;
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ use cuprate_p2p_core::{
|
|||
client::Connector,
|
||||
client::InternalPeerID,
|
||||
services::{AddressBookRequest, AddressBookResponse},
|
||||
CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
|
||||
CoreSyncSvc, NetworkZone, ProtocolRequestHandlerMaker,
|
||||
};
|
||||
|
||||
mod block_downloader;
|
||||
|
@ -41,14 +41,14 @@ use connection_maintainer::MakeConnectionRequest;
|
|||
/// - A core sync service, which keeps track of the sync state of our node
|
||||
#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))]
|
||||
pub async fn initialize_network<N, PR, CS>(
|
||||
protocol_request_handler: PR,
|
||||
protocol_request_handler_maker: PR,
|
||||
core_sync_svc: CS,
|
||||
config: P2PConfig<N>,
|
||||
) -> Result<NetworkInterface<N>, tower::BoxError>
|
||||
where
|
||||
N: NetworkZone,
|
||||
N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
|
||||
PR: ProtocolRequestHandler + Clone,
|
||||
PR: ProtocolRequestHandlerMaker<N> + Clone,
|
||||
CS: CoreSyncSvc + Clone,
|
||||
{
|
||||
let address_book =
|
||||
|
@ -73,7 +73,7 @@ where
|
|||
cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data)
|
||||
.with_address_book(address_book.clone())
|
||||
.with_core_sync_svc(core_sync_svc)
|
||||
.with_protocol_request_handler(protocol_request_handler)
|
||||
.with_protocol_request_handler_maker(protocol_request_handler_maker)
|
||||
.with_broadcast_stream_maker(outbound_mkr)
|
||||
.with_connection_parent_span(Span::current());
|
||||
|
||||
|
|
Loading…
Reference in a new issue