Merge branch 'main' into cuprated
Some checks failed
Audit / audit (push) Has been cancelled
Deny / audit (push) Has been cancelled

This commit is contained in:
Boog900 2024-07-05 17:12:43 +01:00
commit 74b63d6c47
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
46 changed files with 1644 additions and 776 deletions

74
.github/workflows/doc.yml vendored Normal file
View file

@ -0,0 +1,74 @@
# This builds `cargo doc` and uploads it to the repo's GitHub Pages.
name: Doc
on:
push:
branches: [ "main" ] # Only deploy if `main` changes.
workflow_dispatch:
env:
# Show colored output in CI.
CARGO_TERM_COLOR: always
# Generate an index page.
RUSTDOCFLAGS: '--cfg docsrs --show-type-layout --enable-index-page -Zunstable-options'
jobs:
# Build documentation.
build:
# FIXME: how to build and merge Windows + macOS docs
# with Linux's? Similar to the OS toggle on docs.rs.
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Install Rust
uses: dtolnay/rust-toolchain@master
with:
# Nightly required for some `cargo doc` settings.
toolchain: nightly
- name: Cache
uses: actions/cache@v4
with:
# Don't cache actual doc files, just build files.
# This is so that removed crates don't show up.
path: target/debug
key: doc
# Packages other than `Boost` used by `Monero` are listed here.
# https://github.com/monero-project/monero/blob/c444a7e002036e834bfb4c68f04a121ce1af5825/.github/workflows/build.yml#L71
- name: Install dependencies (Linux)
run: sudo apt install -y libboost-dev
- name: Documentation
run: cargo +nightly doc --workspace --all-features
- name: Upload documentation
uses: actions/upload-pages-artifact@v3
with:
path: target/doc/
# Deployment job.
deploy:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
needs: build
# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
permissions:
contents: read
pages: write
id-token: write
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View file

@ -216,9 +216,9 @@ The description of pull requests should generally follow the template laid out i
If your pull request is long and/or has sections that need clarifying, consider leaving a review on your own PR with comments explaining the changes. If your pull request is long and/or has sections that need clarifying, consider leaving a review on your own PR with comments explaining the changes.
## 5. Documentation ## 5. Documentation
Cuprate's crates (libraries) have inline documentation. Cuprate's crates (libraries) have inline documentation, they are published from the `main` branch at https://doc.cuprate.org.
These can be built and viewed using the `cargo` tool. For example, to build and view a specific crate's documentation, run the following command at the repository's root: Documentation can be built and viewed using the `cargo` tool. For example, to build and view a specific crate's documentation, run the following command at the repository's root:
```bash ```bash
cargo doc --open --package $CRATE cargo doc --open --package $CRATE
``` ```

4
Cargo.lock generated
View file

@ -508,6 +508,7 @@ dependencies = [
"pretty_assertions", "pretty_assertions",
"proptest", "proptest",
"rayon", "rayon",
"serde",
"tempfile", "tempfile",
"thread_local", "thread_local",
"tokio", "tokio",
@ -687,6 +688,7 @@ dependencies = [
name = "cuprate-p2p" name = "cuprate-p2p"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"borsh",
"bytes", "bytes",
"cuprate-address-book", "cuprate-address-book",
"cuprate-async-buffer", "cuprate-async-buffer",
@ -727,9 +729,11 @@ dependencies = [
"cuprate-wire", "cuprate-wire",
"futures", "futures",
"hex", "hex",
"hex-literal",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-test",
"tokio-util", "tokio-util",
"tower", "tower",
"tracing", "tracing",

View file

@ -49,7 +49,7 @@ Cuprate maintains various documentation books:
| [Monero's protocol book](https://monero-book.cuprate.org) | Documents the Monero protocol | | [Monero's protocol book](https://monero-book.cuprate.org) | Documents the Monero protocol |
| [Cuprate's user book](https://user.cuprate.org) | Practical user-guide for using `cuprated` | | [Cuprate's user book](https://user.cuprate.org) | Practical user-guide for using `cuprated` |
For crate (library) documentation, see the `Documentation` section in [`CONTRIBUTING.md`](CONTRIBUTING.md). For crate (library) documentation, see: https://doc.cuprate.org. This site holds documentation for Cuprate's crates and all dependencies. All Cuprate crates start with `cuprate_`, for example: [`cuprate_database`](https://doc.cuprate.org/cuprate_database).
## Contributing ## Contributing

View file

@ -13,10 +13,10 @@
// copies or substantial portions of the Software. // copies or substantial portions of the Software.
// //
//! # Monero Wire //! # Cuprate Wire
//! //!
//! A crate defining Monero network messages and network addresses, //! A crate defining Monero network messages and network addresses,
//! built on top of the levin-cuprate crate. //! built on top of the [`cuprate_levin`] crate.
//! //!
//! ## License //! ## License
//! //!

View file

@ -177,6 +177,7 @@ fn build_message<T: cuprate_epee_encoding::EpeeObject>(
Ok(()) Ok(())
} }
#[derive(Debug, Clone)]
pub enum ProtocolMessage { pub enum ProtocolMessage {
NewBlock(NewBlock), NewBlock(NewBlock),
NewFluffyBlock(NewFluffyBlock), NewFluffyBlock(NewFluffyBlock),
@ -255,22 +256,23 @@ impl ProtocolMessage {
} }
} }
pub enum RequestMessage { #[derive(Debug, Clone)]
pub enum AdminRequestMessage {
Handshake(HandshakeRequest), Handshake(HandshakeRequest),
Ping, Ping,
SupportFlags, SupportFlags,
TimedSync(TimedSyncRequest), TimedSync(TimedSyncRequest),
} }
impl RequestMessage { impl AdminRequestMessage {
pub fn command(&self) -> LevinCommand { pub fn command(&self) -> LevinCommand {
use LevinCommand as C; use LevinCommand as C;
match self { match self {
RequestMessage::Handshake(_) => C::Handshake, AdminRequestMessage::Handshake(_) => C::Handshake,
RequestMessage::Ping => C::Ping, AdminRequestMessage::Ping => C::Ping,
RequestMessage::SupportFlags => C::SupportFlags, AdminRequestMessage::SupportFlags => C::SupportFlags,
RequestMessage::TimedSync(_) => C::TimedSync, AdminRequestMessage::TimedSync(_) => C::TimedSync,
} }
} }
@ -278,19 +280,19 @@ impl RequestMessage {
use LevinCommand as C; use LevinCommand as C;
Ok(match command { Ok(match command {
C::Handshake => decode_message(RequestMessage::Handshake, buf)?, C::Handshake => decode_message(AdminRequestMessage::Handshake, buf)?,
C::TimedSync => decode_message(RequestMessage::TimedSync, buf)?, C::TimedSync => decode_message(AdminRequestMessage::TimedSync, buf)?,
C::Ping => { C::Ping => {
cuprate_epee_encoding::from_bytes::<EmptyMessage, _>(buf) cuprate_epee_encoding::from_bytes::<EmptyMessage, _>(buf)
.map_err(|e| BucketError::BodyDecodingError(e.into()))?; .map_err(|e| BucketError::BodyDecodingError(e.into()))?;
RequestMessage::Ping AdminRequestMessage::Ping
} }
C::SupportFlags => { C::SupportFlags => {
cuprate_epee_encoding::from_bytes::<EmptyMessage, _>(buf) cuprate_epee_encoding::from_bytes::<EmptyMessage, _>(buf)
.map_err(|e| BucketError::BodyDecodingError(e.into()))?; .map_err(|e| BucketError::BodyDecodingError(e.into()))?;
RequestMessage::SupportFlags AdminRequestMessage::SupportFlags
} }
_ => return Err(BucketError::UnknownCommand), _ => return Err(BucketError::UnknownCommand),
}) })
@ -300,31 +302,34 @@ impl RequestMessage {
use LevinCommand as C; use LevinCommand as C;
match self { match self {
RequestMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, AdminRequestMessage::Handshake(val) => build_message(C::Handshake, val, builder)?,
RequestMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, AdminRequestMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?,
RequestMessage::Ping => build_message(C::Ping, EmptyMessage, builder)?, AdminRequestMessage::Ping => build_message(C::Ping, EmptyMessage, builder)?,
RequestMessage::SupportFlags => build_message(C::SupportFlags, EmptyMessage, builder)?, AdminRequestMessage::SupportFlags => {
build_message(C::SupportFlags, EmptyMessage, builder)?
}
} }
Ok(()) Ok(())
} }
} }
pub enum ResponseMessage { #[derive(Debug, Clone)]
pub enum AdminResponseMessage {
Handshake(HandshakeResponse), Handshake(HandshakeResponse),
Ping(PingResponse), Ping(PingResponse),
SupportFlags(SupportFlagsResponse), SupportFlags(SupportFlagsResponse),
TimedSync(TimedSyncResponse), TimedSync(TimedSyncResponse),
} }
impl ResponseMessage { impl AdminResponseMessage {
pub fn command(&self) -> LevinCommand { pub fn command(&self) -> LevinCommand {
use LevinCommand as C; use LevinCommand as C;
match self { match self {
ResponseMessage::Handshake(_) => C::Handshake, AdminResponseMessage::Handshake(_) => C::Handshake,
ResponseMessage::Ping(_) => C::Ping, AdminResponseMessage::Ping(_) => C::Ping,
ResponseMessage::SupportFlags(_) => C::SupportFlags, AdminResponseMessage::SupportFlags(_) => C::SupportFlags,
ResponseMessage::TimedSync(_) => C::TimedSync, AdminResponseMessage::TimedSync(_) => C::TimedSync,
} }
} }
@ -332,10 +337,10 @@ impl ResponseMessage {
use LevinCommand as C; use LevinCommand as C;
Ok(match command { Ok(match command {
C::Handshake => decode_message(ResponseMessage::Handshake, buf)?, C::Handshake => decode_message(AdminResponseMessage::Handshake, buf)?,
C::TimedSync => decode_message(ResponseMessage::TimedSync, buf)?, C::TimedSync => decode_message(AdminResponseMessage::TimedSync, buf)?,
C::Ping => decode_message(ResponseMessage::Ping, buf)?, C::Ping => decode_message(AdminResponseMessage::Ping, buf)?,
C::SupportFlags => decode_message(ResponseMessage::SupportFlags, buf)?, C::SupportFlags => decode_message(AdminResponseMessage::SupportFlags, buf)?,
_ => return Err(BucketError::UnknownCommand), _ => return Err(BucketError::UnknownCommand),
}) })
} }
@ -344,18 +349,21 @@ impl ResponseMessage {
use LevinCommand as C; use LevinCommand as C;
match self { match self {
ResponseMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, AdminResponseMessage::Handshake(val) => build_message(C::Handshake, val, builder)?,
ResponseMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, AdminResponseMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?,
ResponseMessage::Ping(val) => build_message(C::Ping, val, builder)?, AdminResponseMessage::Ping(val) => build_message(C::Ping, val, builder)?,
ResponseMessage::SupportFlags(val) => build_message(C::SupportFlags, val, builder)?, AdminResponseMessage::SupportFlags(val) => {
build_message(C::SupportFlags, val, builder)?
}
} }
Ok(()) Ok(())
} }
} }
#[derive(Debug, Clone)]
pub enum Message { pub enum Message {
Request(RequestMessage), Request(AdminRequestMessage),
Response(ResponseMessage), Response(AdminResponseMessage),
Protocol(ProtocolMessage), Protocol(ProtocolMessage),
} }
@ -390,8 +398,10 @@ impl LevinBody for Message {
command: LevinCommand, command: LevinCommand,
) -> Result<Self, BucketError> { ) -> Result<Self, BucketError> {
Ok(match typ { Ok(match typ {
MessageType::Request => Message::Request(RequestMessage::decode(body, command)?), MessageType::Request => Message::Request(AdminRequestMessage::decode(body, command)?),
MessageType::Response => Message::Response(ResponseMessage::decode(body, command)?), MessageType::Response => {
Message::Response(AdminResponseMessage::decode(body, command)?)
}
MessageType::Notification => Message::Protocol(ProtocolMessage::decode(body, command)?), MessageType::Notification => Message::Protocol(ProtocolMessage::decode(body, command)?),
}) })
} }

View file

@ -61,7 +61,7 @@ epee_object!(
/// A Request For Blocks /// A Request For Blocks
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct GetObjectsRequest { pub struct GetObjectsRequest {
/// Block hashes we want /// Block hashes wanted.
pub blocks: ByteArrayVec<32>, pub blocks: ByteArrayVec<32>,
/// Pruned /// Pruned
pub pruned: bool, pub pruned: bool,

View file

@ -27,7 +27,10 @@ use cuprate_p2p_core::{
}; };
use cuprate_pruning::PruningSeed; use cuprate_pruning::PruningSeed;
use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError}; use crate::{
peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError,
BorshNetworkZone,
};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
@ -45,7 +48,7 @@ pub struct ConnectionPeerEntry<Z: NetworkZone> {
rpc_credits_per_hash: u32, rpc_credits_per_hash: u32,
} }
pub struct AddressBook<Z: NetworkZone> { pub struct AddressBook<Z: BorshNetworkZone> {
/// Our white peers - the peers we have previously connected to. /// Our white peers - the peers we have previously connected to.
white_list: PeerList<Z>, white_list: PeerList<Z>,
/// Our gray peers - the peers we have been told about but haven't connected to. /// Our gray peers - the peers we have been told about but haven't connected to.
@ -66,7 +69,7 @@ pub struct AddressBook<Z: NetworkZone> {
cfg: AddressBookConfig, cfg: AddressBookConfig,
} }
impl<Z: NetworkZone> AddressBook<Z> { impl<Z: BorshNetworkZone> AddressBook<Z> {
pub fn new( pub fn new(
cfg: AddressBookConfig, cfg: AddressBookConfig,
white_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>, white_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
@ -351,7 +354,7 @@ impl<Z: NetworkZone> AddressBook<Z> {
} }
} }
impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> { impl<Z: BorshNetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
type Response = AddressBookResponse<Z>; type Response = AddressBookResponse<Z>;
type Error = AddressBookError; type Error = AddressBookError;
type Future = Ready<Result<Self::Response, Self::Error>>; type Future = Ready<Result<Self::Response, Self::Error>>;

View file

@ -1,7 +1,7 @@
use std::{path::PathBuf, sync::Arc, time::Duration}; use std::{path::PathBuf, time::Duration};
use futures::StreamExt; use futures::StreamExt;
use tokio::{sync::Semaphore, time::interval}; use tokio::time::interval;
use cuprate_p2p_core::handles::HandleBuilder; use cuprate_p2p_core::handles::HandleBuilder;
use cuprate_pruning::PruningSeed; use cuprate_pruning::PruningSeed;
@ -78,11 +78,7 @@ async fn get_white_peers() {
async fn add_new_peer_already_connected() { async fn add_new_peer_already_connected() {
let mut address_book = make_fake_address_book(0, 0); let mut address_book = make_fake_address_book(0, 0);
let semaphore = Arc::new(Semaphore::new(10)); let (_, handle) = HandleBuilder::default().build();
let (_, handle) = HandleBuilder::default()
.with_permit(semaphore.clone().try_acquire_owned().unwrap())
.build();
address_book address_book
.handle_new_connection( .handle_new_connection(
@ -98,9 +94,7 @@ async fn add_new_peer_already_connected() {
) )
.unwrap(); .unwrap();
let (_, handle) = HandleBuilder::default() let (_, handle) = HandleBuilder::default().build();
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
assert_eq!( assert_eq!(
address_book.handle_new_connection( address_book.handle_new_connection(

View file

@ -10,10 +10,9 @@
//! clear net peers getting linked to their dark counterparts //! clear net peers getting linked to their dark counterparts
//! and so peers will only get told about peers they can //! and so peers will only get told about peers they can
//! connect to. //! connect to.
//!
use std::{io::ErrorKind, path::PathBuf, time::Duration}; use std::{io::ErrorKind, path::PathBuf, time::Duration};
use cuprate_p2p_core::NetworkZone; use cuprate_p2p_core::{NetZoneAddress, NetworkZone};
mod book; mod book;
mod peer_list; mod peer_list;
@ -61,7 +60,7 @@ pub enum AddressBookError {
} }
/// Initializes the P2P address book for a specific network zone. /// Initializes the P2P address book for a specific network zone.
pub async fn init_address_book<Z: NetworkZone>( pub async fn init_address_book<Z: BorshNetworkZone>(
cfg: AddressBookConfig, cfg: AddressBookConfig,
) -> Result<book::AddressBook<Z>, std::io::Error> { ) -> Result<book::AddressBook<Z>, std::io::Error> {
tracing::info!( tracing::info!(
@ -82,3 +81,21 @@ pub async fn init_address_book<Z: NetworkZone>(
Ok(address_book) Ok(address_book)
} }
use sealed::BorshNetworkZone;
mod sealed {
use super::*;
/// An internal trait for the address book for a [`NetworkZone`] that adds the requirement of [`borsh`] traits
/// onto the network address.
pub trait BorshNetworkZone: NetworkZone<Addr = Self::BorshAddr> {
type BorshAddr: NetZoneAddress + borsh::BorshDeserialize + borsh::BorshSerialize;
}
impl<T: NetworkZone> BorshNetworkZone for T
where
T::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
{
type BorshAddr = T::Addr;
}
}

View file

@ -3,9 +3,9 @@ use std::fs;
use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize}; use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize};
use tokio::task::{spawn_blocking, JoinHandle}; use tokio::task::{spawn_blocking, JoinHandle};
use cuprate_p2p_core::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone}; use cuprate_p2p_core::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress};
use crate::{peer_list::PeerList, AddressBookConfig}; use crate::{peer_list::PeerList, AddressBookConfig, BorshNetworkZone};
// TODO: store anchor and ban list. // TODO: store anchor and ban list.
@ -21,7 +21,7 @@ struct DeserPeerDataV1<A: NetZoneAddress> {
gray_list: Vec<ZoneSpecificPeerListEntryBase<A>>, gray_list: Vec<ZoneSpecificPeerListEntryBase<A>>,
} }
pub fn save_peers_to_disk<Z: NetworkZone>( pub fn save_peers_to_disk<Z: BorshNetworkZone>(
cfg: &AddressBookConfig, cfg: &AddressBookConfig,
white_list: &PeerList<Z>, white_list: &PeerList<Z>,
gray_list: &PeerList<Z>, gray_list: &PeerList<Z>,
@ -38,7 +38,7 @@ pub fn save_peers_to_disk<Z: NetworkZone>(
spawn_blocking(move || fs::write(&file, &data)) spawn_blocking(move || fs::write(&file, &data))
} }
pub async fn read_peers_from_disk<Z: NetworkZone>( pub async fn read_peers_from_disk<Z: BorshNetworkZone>(
cfg: &AddressBookConfig, cfg: &AddressBookConfig,
) -> Result< ) -> Result<
( (

View file

@ -42,7 +42,6 @@ pub enum Graph {
/// `(-k*(k-1)*hop)/(2*log(1-ep))` /// `(-k*(k-1)*hop)/(2*log(1-ep))`
/// ///
/// Where `k` is calculated from the fluff probability, `hop` is `time_between_hop` and `ep` is fixed at `0.1`. /// Where `k` is calculated from the fluff probability, `hop` is `time_between_hop` and `ep` is fixed at `0.1`.
///
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct DandelionConfig { pub struct DandelionConfig {
/// The time it takes for a stem transaction to pass through a node, including network latency. /// The time it takes for a stem transaction to pass through a node, including network latency.

View file

@ -16,7 +16,6 @@
//! //!
//! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden. //! When using your handle to the backing store it must be remembered to keep transactions in the stem pool hidden.
//! So handle any requests to the tx-pool like the stem side of the pool does not exist. //! So handle any requests to the tx-pool like the stem side of the pool does not exist.
//!
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
future::Future, future::Future,

View file

@ -7,7 +7,6 @@
//! //!
//! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling //! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling
//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool) //! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool)
//!
use std::{ use std::{
collections::HashMap, collections::HashMap,
hash::Hash, hash::Hash,

View file

@ -23,6 +23,7 @@ tower = { workspace = true, features = ["util", "tracing"] }
thiserror = { workspace = true } thiserror = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] } tracing = { workspace = true, features = ["std", "attributes"] }
hex-literal = { workspace = true }
borsh = { workspace = true, features = ["derive", "std"], optional = true } borsh = { workspace = true, features = ["derive", "std"], optional = true }
@ -31,4 +32,5 @@ cuprate-test-utils = {path = "../../test-utils"}
hex = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]} tokio = { workspace = true, features = ["net", "rt-multi-thread", "rt", "macros"]}
tokio-test = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }

View file

@ -24,10 +24,11 @@ use crate::{
mod connection; mod connection;
mod connector; mod connector;
pub mod handshaker; pub mod handshaker;
mod request_handler;
mod timeout_monitor; mod timeout_monitor;
pub use connector::{ConnectRequest, Connector}; pub use connector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; pub use handshaker::{DoHandshakeRequest, HandshakeError, HandshakerBuilder};
/// An internal identifier for a given peer, will be their address if known /// An internal identifier for a given peer, will be their address if known
/// or a random u128 if not. /// or a random u128 if not.
@ -188,7 +189,8 @@ pub fn mock_client<Z: NetworkZone, S>(
mut request_handler: S, mut request_handler: S,
) -> Client<Z> ) -> Client<Z>
where where
S: crate::PeerRequestHandler, S: Service<PeerRequest, Response = PeerResponse, Error = tower::BoxError> + Send + 'static,
S::Future: Send + 'static,
{ {
let (tx, mut rx) = mpsc::channel(1); let (tx, mut rx) = mpsc::channel(1);

View file

@ -2,7 +2,6 @@
//! //!
//! This module handles routing requests from a [`Client`](crate::client::Client) or a broadcast channel to //! This module handles routing requests from a [`Client`](crate::client::Client) or a broadcast channel to
//! a peer. This module also handles routing requests from the connected peer to a request handler. //! a peer. This module also handles routing requests from the connected peer to a request handler.
//!
use std::pin::Pin; use std::pin::Pin;
use futures::{ use futures::{
@ -15,15 +14,15 @@ use tokio::{
time::{sleep, timeout, Sleep}, time::{sleep, timeout, Sleep},
}; };
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tower::ServiceExt;
use cuprate_wire::{LevinCommand, Message, ProtocolMessage}; use cuprate_wire::{LevinCommand, Message, ProtocolMessage};
use crate::client::request_handler::PeerRequestHandler;
use crate::{ use crate::{
constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT}, constants::{REQUEST_TIMEOUT, SENDING_TIMEOUT},
handles::ConnectionGuard, handles::ConnectionGuard,
BroadcastMessage, MessageID, NetworkZone, PeerError, PeerRequest, PeerRequestHandler, AddressBook, BroadcastMessage, CoreSyncSvc, MessageID, NetworkZone, PeerError, PeerRequest,
PeerResponse, SharedError, PeerResponse, PeerSyncSvc, ProtocolRequestHandler, ProtocolResponse, SharedError,
}; };
/// A request to the connection task from a [`Client`](crate::client::Client). /// A request to the connection task from a [`Client`](crate::client::Client).
@ -72,7 +71,7 @@ fn levin_command_response(message_id: &MessageID, command: LevinCommand) -> bool
} }
/// This represents a connection to a peer. /// This represents a connection to a peer.
pub struct Connection<Z: NetworkZone, ReqHndlr, BrdcstStrm> { pub struct Connection<Z: NetworkZone, A, CS, PS, PR, BrdcstStrm> {
/// The peer sink - where we send messages to the peer. /// The peer sink - where we send messages to the peer.
peer_sink: Z::Sink, peer_sink: Z::Sink,
@ -87,7 +86,7 @@ pub struct Connection<Z: NetworkZone, ReqHndlr, BrdcstStrm> {
broadcast_stream: Pin<Box<BrdcstStrm>>, broadcast_stream: Pin<Box<BrdcstStrm>>,
/// The inner handler for any requests that come from the requested peer. /// The inner handler for any requests that come from the requested peer.
peer_request_handler: ReqHndlr, peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
/// The connection guard which will send signals to other parts of Cuprate when this connection is dropped. /// The connection guard which will send signals to other parts of Cuprate when this connection is dropped.
connection_guard: ConnectionGuard, connection_guard: ConnectionGuard,
@ -95,9 +94,13 @@ pub struct Connection<Z: NetworkZone, ReqHndlr, BrdcstStrm> {
error: SharedError<PeerError>, error: SharedError<PeerError>,
} }
impl<Z: NetworkZone, ReqHndlr, BrdcstStrm> Connection<Z, ReqHndlr, BrdcstStrm> impl<Z, A, CS, PS, PR, BrdcstStrm> Connection<Z, A, CS, PS, PR, BrdcstStrm>
where where
ReqHndlr: PeerRequestHandler, Z: NetworkZone,
A: AddressBook<Z>,
CS: CoreSyncSvc,
PS: PeerSyncSvc<Z>,
PR: ProtocolRequestHandler,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
{ {
/// Create a new connection struct. /// Create a new connection struct.
@ -105,10 +108,10 @@ where
peer_sink: Z::Sink, peer_sink: Z::Sink,
client_rx: mpsc::Receiver<ConnectionTaskRequest>, client_rx: mpsc::Receiver<ConnectionTaskRequest>,
broadcast_stream: BrdcstStrm, broadcast_stream: BrdcstStrm,
peer_request_handler: ReqHndlr, peer_request_handler: PeerRequestHandler<Z, A, CS, PS, PR>,
connection_guard: ConnectionGuard, connection_guard: ConnectionGuard,
error: SharedError<PeerError>, error: SharedError<PeerError>,
) -> Connection<Z, ReqHndlr, BrdcstStrm> { ) -> Connection<Z, A, CS, PS, PR, BrdcstStrm> {
Connection { Connection {
peer_sink, peer_sink,
state: State::WaitingForRequest, state: State::WaitingForRequest,
@ -175,7 +178,9 @@ where
return Err(e); return Err(e);
} else { } else {
// We still need to respond even if the response is this. // We still need to respond even if the response is this.
let _ = req.response_channel.send(Ok(PeerResponse::NA)); let _ = req
.response_channel
.send(Ok(PeerResponse::Protocol(ProtocolResponse::NA)));
} }
Ok(()) Ok(())
@ -185,17 +190,14 @@ where
async fn handle_peer_request(&mut self, req: PeerRequest) -> Result<(), PeerError> { async fn handle_peer_request(&mut self, req: PeerRequest) -> Result<(), PeerError> {
tracing::debug!("Received peer request: {:?}", req.id()); tracing::debug!("Received peer request: {:?}", req.id());
let ready_svc = self.peer_request_handler.ready().await?; let res = self.peer_request_handler.handle_peer_request(req).await?;
let res = ready_svc.call(req).await?;
if matches!(res, PeerResponse::NA) { // This will be an error if a response does not need to be sent
return Ok(()); if let Ok(res) = res.try_into() {
self.send_message_to_peer(res).await?;
} }
self.send_message_to_peer( Ok(())
res.try_into()
.expect("We just checked if the response was `NA`"),
)
.await
} }
/// Handles a message from a peer when we are in [`State::WaitingForResponse`]. /// Handles a message from a peer when we are in [`State::WaitingForResponse`].

View file

@ -4,7 +4,6 @@
//! perform a handshake and create a [`Client`]. //! perform a handshake and create a [`Client`].
//! //!
//! This is where outbound connections are created. //! This is where outbound connections are created.
//!
use std::{ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -16,9 +15,9 @@ use tokio::sync::OwnedSemaphorePermit;
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use crate::{ use crate::{
client::{Client, DoHandshakeRequest, HandShaker, HandshakeError, InternalPeerID}, client::{handshaker::HandShaker, Client, DoHandshakeRequest, HandshakeError, InternalPeerID},
AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, AddressBook, BroadcastMessage, ConnectionDirection, CoreSyncSvc, NetworkZone, PeerSyncSvc,
PeerRequestHandler, PeerSyncSvc, ProtocolRequestHandler,
}; };
/// A request to connect to a peer. /// A request to connect to a peer.
@ -27,30 +26,32 @@ pub struct ConnectRequest<Z: NetworkZone> {
pub addr: Z::Addr, pub addr: Z::Addr,
/// A permit which will be held be the connection allowing you to set limits on the number of /// A permit which will be held be the connection allowing you to set limits on the number of
/// connections. /// connections.
pub permit: OwnedSemaphorePermit, ///
/// This doesn't have to be set.
pub permit: Option<OwnedSemaphorePermit>,
} }
/// The connector service, this service connects to peer and returns the [`Client`]. /// The connector service, this service connects to peer and returns the [`Client`].
pub struct Connector<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> { pub struct Connector<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> {
handshaker: HandShaker<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr>, handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>,
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
Connector<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
{ {
/// Create a new connector from a handshaker. /// Create a new connector from a handshaker.
pub fn new(handshaker: HandShaker<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr>) -> Self { pub fn new(handshaker: HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>) -> Self {
Self { handshaker } Self { handshaker }
} }
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr, BrdcstStrm> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>
Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> Service<ConnectRequest<Z>> for Connector<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where where
AdrBook: AddressBook<Z> + Clone, AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone, CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone, PSync: PeerSyncSvc<Z> + Clone,
ReqHdlr: PeerRequestHandler + Clone, ProtoHdlr: ProtocolRequestHandler + Clone,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{ {
@ -74,7 +75,7 @@ where
permit: req.permit, permit: req.permit,
peer_stream, peer_stream,
peer_sink, peer_sink,
direction: ConnectionDirection::OutBound, direction: ConnectionDirection::Outbound,
}; };
handshaker.ready().await?.call(req).await handshaker.ready().await?.call(req).await
} }

View file

@ -18,7 +18,7 @@ use tokio::{
time::{error::Elapsed, timeout}, time::{error::Elapsed, timeout},
}; };
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::{info_span, Instrument}; use tracing::{info_span, Instrument, Span};
use cuprate_pruning::{PruningError, PruningSeed}; use cuprate_pruning::{PruningError, PruningSeed};
use cuprate_wire::{ use cuprate_wire::{
@ -27,13 +27,13 @@ use cuprate_wire::{
PING_OK_RESPONSE_STATUS_TEXT, PING_OK_RESPONSE_STATUS_TEXT,
}, },
common::PeerSupportFlags, common::PeerSupportFlags,
BasicNodeData, BucketError, LevinCommand, Message, RequestMessage, ResponseMessage, AdminRequestMessage, AdminResponseMessage, BasicNodeData, BucketError, LevinCommand, Message,
}; };
use crate::{ use crate::{
client::{ client::{
connection::Connection, timeout_monitor::connection_timeout_monitor_task, Client, connection::Connection, request_handler::PeerRequestHandler,
InternalPeerID, PeerInformation, timeout_monitor::connection_timeout_monitor_task, Client, InternalPeerID, PeerInformation,
}, },
constants::{ constants::{
HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE, HANDSHAKE_TIMEOUT, MAX_EAGER_PROTOCOL_MESSAGES, MAX_PEERS_IN_PEER_LIST_MESSAGE,
@ -43,9 +43,12 @@ use crate::{
services::PeerSyncRequest, services::PeerSyncRequest,
AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection, AddressBook, AddressBookRequest, AddressBookResponse, BroadcastMessage, ConnectionDirection,
CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone, CoreSyncDataRequest, CoreSyncDataResponse, CoreSyncSvc, NetZoneAddress, NetworkZone,
PeerRequestHandler, PeerSyncSvc, SharedError, PeerSyncSvc, ProtocolRequestHandler, SharedError,
}; };
pub mod builder;
pub use builder::HandshakerBuilder;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum HandshakeError { pub enum HandshakeError {
#[error("The handshake timed out")] #[error("The handshake timed out")]
@ -78,21 +81,21 @@ pub struct DoHandshakeRequest<Z: NetworkZone> {
pub peer_sink: Z::Sink, pub peer_sink: Z::Sink,
/// The direction of the connection. /// The direction of the connection.
pub direction: ConnectionDirection, pub direction: ConnectionDirection,
/// A permit for this connection. /// An [`Option`]al permit for this connection.
pub permit: OwnedSemaphorePermit, pub permit: Option<OwnedSemaphorePermit>,
} }
/// The peer handshaking service. /// The peer handshaking service.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> { pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> {
/// The address book service. /// The address book service.
address_book: AdrBook, address_book: AdrBook,
/// The core sync data service. /// The core sync data service.
core_sync_svc: CSync, core_sync_svc: CSync,
/// The peer sync service. /// The peer sync service.
peer_sync_svc: PSync, peer_sync_svc: PSync,
/// The peer request handler service. /// The protocol request handler service.
peer_request_svc: ReqHdlr, protocol_request_svc: ProtoHdlr,
/// Our [`BasicNodeData`] /// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
@ -100,42 +103,46 @@ pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrm
/// A function that returns a stream that will give items to be broadcast by a connection. /// A function that returns a stream that will give items to be broadcast by a connection.
broadcast_stream_maker: BrdcstStrmMkr, broadcast_stream_maker: BrdcstStrmMkr,
connection_parent_span: Span,
/// The network zone. /// The network zone.
_zone: PhantomData<Z>, _zone: PhantomData<Z>,
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandShaker<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
{ {
/// Creates a new handshaker. /// Creates a new handshaker.
pub fn new( fn new(
address_book: AdrBook, address_book: AdrBook,
peer_sync_svc: PSync, peer_sync_svc: PSync,
core_sync_svc: CSync, core_sync_svc: CSync,
peer_request_svc: ReqHdlr, protocol_request_svc: ProtoHdlr,
broadcast_stream_maker: BrdcstStrmMkr, broadcast_stream_maker: BrdcstStrmMkr,
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
connection_parent_span: Span,
) -> Self { ) -> Self {
Self { Self {
address_book, address_book,
peer_sync_svc, peer_sync_svc,
core_sync_svc, core_sync_svc,
peer_request_svc, protocol_request_svc,
broadcast_stream_maker, broadcast_stream_maker,
our_basic_node_data, our_basic_node_data,
connection_parent_span,
_zone: PhantomData, _zone: PhantomData,
} }
} }
} }
impl<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr, BrdcstStrm> impl<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>
Service<DoHandshakeRequest<Z>> for HandShaker<Z, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr> Service<DoHandshakeRequest<Z>>
for HandShaker<Z, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where where
AdrBook: AddressBook<Z> + Clone, AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc + Clone, CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z> + Clone, PSync: PeerSyncSvc<Z> + Clone,
ReqHdlr: PeerRequestHandler + Clone, ProtoHdlr: ProtocolRequestHandler + Clone,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{ {
@ -152,12 +159,14 @@ where
let broadcast_stream_maker = self.broadcast_stream_maker.clone(); let broadcast_stream_maker = self.broadcast_stream_maker.clone();
let address_book = self.address_book.clone(); let address_book = self.address_book.clone();
let peer_request_svc = self.peer_request_svc.clone(); let protocol_request_svc = self.protocol_request_svc.clone();
let core_sync_svc = self.core_sync_svc.clone(); let core_sync_svc = self.core_sync_svc.clone();
let peer_sync_svc = self.peer_sync_svc.clone(); let peer_sync_svc = self.peer_sync_svc.clone();
let our_basic_node_data = self.our_basic_node_data.clone(); let our_basic_node_data = self.our_basic_node_data.clone();
let span = info_span!(parent: &tracing::Span::current(), "handshaker", addr=%req.addr); let connection_parent_span = self.connection_parent_span.clone();
let span = info_span!(parent: &Span::current(), "handshaker", addr=%req.addr);
async move { async move {
timeout( timeout(
@ -168,8 +177,9 @@ where
address_book, address_book,
core_sync_svc, core_sync_svc,
peer_sync_svc, peer_sync_svc,
peer_request_svc, protocol_request_svc,
our_basic_node_data, our_basic_node_data,
connection_parent_span,
), ),
) )
.await? .await?
@ -190,11 +200,11 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
tracing::debug!("Made outbound connection to peer, sending ping."); tracing::debug!("Made outbound connection to peer, sending ping.");
peer_sink peer_sink
.send(Message::Request(RequestMessage::Ping).into()) .send(Message::Request(AdminRequestMessage::Ping).into())
.await?; .await?;
if let Some(res) = peer_stream.next().await { if let Some(res) = peer_stream.next().await {
if let Message::Response(ResponseMessage::Ping(ping)) = res? { if let Message::Response(AdminResponseMessage::Ping(ping)) = res? {
if ping.status == PING_OK_RESPONSE_STATUS_TEXT { if ping.status == PING_OK_RESPONSE_STATUS_TEXT {
tracing::debug!("Ping successful."); tracing::debug!("Ping successful.");
return Ok(ping.peer_id); return Ok(ping.peer_id);
@ -220,7 +230,8 @@ pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError>
} }
/// This function completes a handshake with the requested peer. /// This function completes a handshake with the requested peer.
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr, BrdcstStrm>( #[allow(clippy::too_many_arguments)]
async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr, BrdcstStrm>(
req: DoHandshakeRequest<Z>, req: DoHandshakeRequest<Z>,
broadcast_stream_maker: BrdcstStrmMkr, broadcast_stream_maker: BrdcstStrmMkr,
@ -228,14 +239,15 @@ async fn handshake<Z: NetworkZone, AdrBook, CSync, PSync, ReqHdlr, BrdcstStrmMkr
mut address_book: AdrBook, mut address_book: AdrBook,
mut core_sync_svc: CSync, mut core_sync_svc: CSync,
mut peer_sync_svc: PSync, mut peer_sync_svc: PSync,
peer_request_svc: ReqHdlr, protocol_request_handler: ProtoHdlr,
our_basic_node_data: BasicNodeData, our_basic_node_data: BasicNodeData,
connection_parent_span: Span,
) -> Result<Client<Z>, HandshakeError> ) -> Result<Client<Z>, HandshakeError>
where where
AdrBook: AddressBook<Z>, AdrBook: AddressBook<Z> + Clone,
CSync: CoreSyncSvc, CSync: CoreSyncSvc + Clone,
PSync: PeerSyncSvc<Z>, PSync: PeerSyncSvc<Z> + Clone,
ReqHdlr: PeerRequestHandler, ProtoHdlr: ProtocolRequestHandler,
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static, BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static, BrdcstStrmMkr: Fn(InternalPeerID<Z::Addr>) -> BrdcstStrm + Send + 'static,
{ {
@ -252,19 +264,20 @@ where
let mut eager_protocol_messages = Vec::new(); let mut eager_protocol_messages = Vec::new();
let (peer_core_sync, peer_node_data) = match direction { let (peer_core_sync, peer_node_data) = match direction {
ConnectionDirection::InBound => { ConnectionDirection::Inbound => {
// Inbound handshake the peer sends the request. // Inbound handshake the peer sends the request.
tracing::debug!("waiting for handshake request."); tracing::debug!("waiting for handshake request.");
let Message::Request(RequestMessage::Handshake(handshake_req)) = wait_for_message::<Z>( let Message::Request(AdminRequestMessage::Handshake(handshake_req)) =
LevinCommand::Handshake, wait_for_message::<Z>(
true, LevinCommand::Handshake,
&mut peer_sink, true,
&mut peer_stream, &mut peer_sink,
&mut eager_protocol_messages, &mut peer_stream,
&our_basic_node_data, &mut eager_protocol_messages,
) &our_basic_node_data,
.await? )
.await?
else { else {
panic!("wait_for_message returned ok with wrong message."); panic!("wait_for_message returned ok with wrong message.");
}; };
@ -273,7 +286,7 @@ where
// We will respond to the handshake request later. // We will respond to the handshake request later.
(handshake_req.payload_data, handshake_req.node_data) (handshake_req.payload_data, handshake_req.node_data)
} }
ConnectionDirection::OutBound => { ConnectionDirection::Outbound => {
// Outbound handshake, we send the request. // Outbound handshake, we send the request.
send_hs_request::<Z, _>( send_hs_request::<Z, _>(
&mut peer_sink, &mut peer_sink,
@ -283,7 +296,7 @@ where
.await?; .await?;
// Wait for the handshake response. // Wait for the handshake response.
let Message::Response(ResponseMessage::Handshake(handshake_res)) = let Message::Response(AdminResponseMessage::Handshake(handshake_res)) =
wait_for_message::<Z>( wait_for_message::<Z>(
LevinCommand::Handshake, LevinCommand::Handshake,
false, false,
@ -373,13 +386,13 @@ where
// public_address, if Some, is the reachable address of the node. // public_address, if Some, is the reachable address of the node.
let public_address = 'check_out_addr: { let public_address = 'check_out_addr: {
match direction { match direction {
ConnectionDirection::InBound => { ConnectionDirection::Inbound => {
// First send the handshake response. // First send the handshake response.
send_hs_response::<Z, _, _>( send_hs_response::<Z, _, _>(
&mut peer_sink, &mut peer_sink,
&mut core_sync_svc, &mut core_sync_svc,
&mut address_book, &mut address_book,
our_basic_node_data, our_basic_node_data.clone(),
) )
.await?; .await?;
@ -411,7 +424,7 @@ where
// The peer did not specify a reachable port or the ping was not successful. // The peer did not specify a reachable port or the ping was not successful.
None None
} }
ConnectionDirection::OutBound => { ConnectionDirection::Outbound => {
let InternalPeerID::KnownAddr(outbound_addr) = addr else { let InternalPeerID::KnownAddr(outbound_addr) = addr else {
unreachable!("How could we make an outbound connection to an unknown address"); unreachable!("How could we make an outbound connection to an unknown address");
}; };
@ -424,37 +437,7 @@ where
tracing::debug!("Handshake complete."); tracing::debug!("Handshake complete.");
// Set up the connection data.
let error_slot = SharedError::new();
let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build(); let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build();
let (connection_tx, client_rx) = mpsc::channel(1);
let connection = Connection::<Z, _, _>::new(
peer_sink,
client_rx,
broadcast_stream_maker(addr),
peer_request_svc,
connection_guard,
error_slot.clone(),
);
let connection_span = tracing::error_span!(parent: &tracing::Span::none(), "connection", %addr);
let connection_handle = tokio::spawn(
connection
.run(peer_stream.fuse(), eager_protocol_messages)
.instrument(connection_span),
);
// Tell the core sync service about the new peer.
peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::IncomingCoreSyncData(
addr,
handle.clone(),
peer_core_sync,
))
.await?;
// Tell the address book about the new connection. // Tell the address book about the new connection.
address_book address_book
@ -471,6 +454,21 @@ where
}) })
.await?; .await?;
// Tell the core sync service about the new peer.
peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::IncomingCoreSyncData(
addr,
handle.clone(),
peer_core_sync,
))
.await?;
// Set up the connection data.
let error_slot = SharedError::new();
let (connection_tx, client_rx) = mpsc::channel(1);
let info = PeerInformation { let info = PeerInformation {
id: addr, id: addr,
handle, handle,
@ -478,6 +476,32 @@ where
pruning_seed, pruning_seed,
}; };
let request_handler = PeerRequestHandler {
address_book_svc: address_book.clone(),
our_sync_svc: core_sync_svc.clone(),
peer_sync_svc: peer_sync_svc.clone(),
protocol_request_handler,
our_basic_node_data,
peer_info: info.clone(),
};
let connection = Connection::<Z, _, _, _, _, _>::new(
peer_sink,
client_rx,
broadcast_stream_maker(addr),
request_handler,
connection_guard,
error_slot.clone(),
);
let connection_span =
tracing::error_span!(parent: &connection_parent_span, "connection", %addr);
let connection_handle = tokio::spawn(
connection
.run(peer_stream.fuse(), eager_protocol_messages)
.instrument(connection_span),
);
let semaphore = Arc::new(Semaphore::new(1)); let semaphore = Arc::new(Semaphore::new(1));
let timeout_handle = tokio::spawn(connection_timeout_monitor_task( let timeout_handle = tokio::spawn(connection_timeout_monitor_task(
@ -502,7 +526,7 @@ where
Ok(client) Ok(client)
} }
/// Sends a [`RequestMessage::Handshake`] down the peer sink. /// Sends a [`AdminRequestMessage::Handshake`] down the peer sink.
async fn send_hs_request<Z: NetworkZone, CSync>( async fn send_hs_request<Z: NetworkZone, CSync>(
peer_sink: &mut Z::Sink, peer_sink: &mut Z::Sink,
core_sync_svc: &mut CSync, core_sync_svc: &mut CSync,
@ -525,13 +549,13 @@ where
tracing::debug!("Sending handshake request."); tracing::debug!("Sending handshake request.");
peer_sink peer_sink
.send(Message::Request(RequestMessage::Handshake(req)).into()) .send(Message::Request(AdminRequestMessage::Handshake(req)).into())
.await?; .await?;
Ok(()) Ok(())
} }
/// Sends a [`ResponseMessage::Handshake`] down the peer sink. /// Sends a [`AdminResponseMessage::Handshake`] down the peer sink.
async fn send_hs_response<Z: NetworkZone, CSync, AdrBook>( async fn send_hs_response<Z: NetworkZone, CSync, AdrBook>(
peer_sink: &mut Z::Sink, peer_sink: &mut Z::Sink,
core_sync_svc: &mut CSync, core_sync_svc: &mut CSync,
@ -568,7 +592,7 @@ where
tracing::debug!("Sending handshake response."); tracing::debug!("Sending handshake response.");
peer_sink peer_sink
.send(Message::Response(ResponseMessage::Handshake(res)).into()) .send(Message::Response(AdminResponseMessage::Handshake(res)).into())
.await?; .await?;
Ok(()) Ok(())
@ -619,7 +643,7 @@ async fn wait_for_message<Z: NetworkZone>(
} }
match req_message { match req_message {
RequestMessage::SupportFlags => { AdminRequestMessage::SupportFlags => {
if !allow_support_flag_req { if !allow_support_flag_req {
return Err(HandshakeError::PeerSentInvalidMessage( return Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent 2 support flag requests", "Peer sent 2 support flag requests",
@ -631,7 +655,7 @@ async fn wait_for_message<Z: NetworkZone>(
allow_support_flag_req = false; allow_support_flag_req = false;
continue; continue;
} }
RequestMessage::Ping => { AdminRequestMessage::Ping => {
if !allow_ping { if !allow_ping {
return Err(HandshakeError::PeerSentInvalidMessage( return Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent 2 ping requests", "Peer sent 2 ping requests",
@ -674,7 +698,7 @@ async fn wait_for_message<Z: NetworkZone>(
)))? )))?
} }
/// Sends a [`ResponseMessage::SupportFlags`] down the peer sink. /// Sends a [`AdminResponseMessage::SupportFlags`] down the peer sink.
async fn send_support_flags<Z: NetworkZone>( async fn send_support_flags<Z: NetworkZone>(
peer_sink: &mut Z::Sink, peer_sink: &mut Z::Sink,
support_flags: PeerSupportFlags, support_flags: PeerSupportFlags,
@ -682,7 +706,7 @@ async fn send_support_flags<Z: NetworkZone>(
tracing::debug!("Sending support flag response."); tracing::debug!("Sending support flag response.");
Ok(peer_sink Ok(peer_sink
.send( .send(
Message::Response(ResponseMessage::SupportFlags(SupportFlagsResponse { Message::Response(AdminResponseMessage::SupportFlags(SupportFlagsResponse {
support_flags, support_flags,
})) }))
.into(), .into(),
@ -690,7 +714,7 @@ async fn send_support_flags<Z: NetworkZone>(
.await?) .await?)
} }
/// Sends a [`ResponseMessage::Ping`] down the peer sink. /// Sends a [`AdminResponseMessage::Ping`] down the peer sink.
async fn send_ping_response<Z: NetworkZone>( async fn send_ping_response<Z: NetworkZone>(
peer_sink: &mut Z::Sink, peer_sink: &mut Z::Sink,
peer_id: u64, peer_id: u64,
@ -698,7 +722,7 @@ async fn send_ping_response<Z: NetworkZone>(
tracing::debug!("Sending ping response."); tracing::debug!("Sending ping response.");
Ok(peer_sink Ok(peer_sink
.send( .send(
Message::Response(ResponseMessage::Ping(PingResponse { Message::Response(AdminResponseMessage::Ping(PingResponse {
status: PING_OK_RESPONSE_STATUS_TEXT, status: PING_OK_RESPONSE_STATUS_TEXT,
peer_id, peer_id,
})) }))

View file

@ -0,0 +1,292 @@
use std::marker::PhantomData;
use futures::{stream, Stream};
use tracing::Span;
use cuprate_wire::BasicNodeData;
use crate::{
client::{handshaker::HandShaker, InternalPeerID},
AddressBook, BroadcastMessage, CoreSyncSvc, NetworkZone, PeerSyncSvc, ProtocolRequestHandler,
};
mod dummy;
pub use dummy::{
DummyAddressBook, DummyCoreSyncSvc, DummyPeerSyncSvc, DummyProtocolRequestHandler,
};
/// A [`HandShaker`] [`Service`](tower::Service) builder.
///
/// This builder applies default values to make usage easier, behaviour and drawbacks of the defaults are documented
/// on the `with_*` method to change it, for example [`HandshakerBuilder::with_protocol_request_handler`].
///
/// If you want to use any network other than [`Mainnet`](crate::Network::Mainnet)
/// you will need to change the core sync service with [`HandshakerBuilder::with_core_sync_svc`],
/// see that method for details.
#[derive(Debug, Clone)]
pub struct HandshakerBuilder<
N: NetworkZone,
AdrBook = DummyAddressBook,
CSync = DummyCoreSyncSvc,
PSync = DummyPeerSyncSvc,
ProtoHdlr = DummyProtocolRequestHandler,
BrdcstStrmMkr = fn(
InternalPeerID<<N as NetworkZone>::Addr>,
) -> stream::Pending<BroadcastMessage>,
> {
/// The address book service.
address_book: AdrBook,
/// The core sync data service.
core_sync_svc: CSync,
/// The peer sync service.
peer_sync_svc: PSync,
/// The protocol request service.
protocol_request_svc: ProtoHdlr,
/// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData,
/// A function that returns a stream that will give items to be broadcast by a connection.
broadcast_stream_maker: BrdcstStrmMkr,
/// The [`Span`] that will set as the parent to the connection [`Span`].
connection_parent_span: Option<Span>,
/// The network zone.
_zone: PhantomData<N>,
}
impl<N: NetworkZone> HandshakerBuilder<N> {
/// Creates a new builder with our node's basic node data.
pub fn new(our_basic_node_data: BasicNodeData) -> Self {
Self {
address_book: DummyAddressBook,
core_sync_svc: DummyCoreSyncSvc::static_mainnet_genesis(),
peer_sync_svc: DummyPeerSyncSvc,
protocol_request_svc: DummyProtocolRequestHandler,
our_basic_node_data,
broadcast_stream_maker: |_| stream::pending(),
connection_parent_span: None,
_zone: PhantomData,
}
}
}
impl<N: NetworkZone, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
HandshakerBuilder<N, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
{
/// Changes the address book to the provided one.
///
/// ## Default Address Book
///
/// The default address book is used if this function is not called.
///
/// The default address book's only drawback is that it does not keep track of peers and therefore
/// bans.
pub fn with_address_book<NAdrBook>(
self,
new_address_book: NAdrBook,
) -> HandshakerBuilder<N, NAdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where
NAdrBook: AddressBook<N> + Clone,
{
let HandshakerBuilder {
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
..
} = self;
HandshakerBuilder {
address_book: new_address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
}
}
/// Changes the core sync service to the provided one.
///
/// The core sync service should keep track of our nodes core sync data.
///
/// ## Default Core Sync Service
///
/// The default core sync service is used if this method is not called.
///
/// The default core sync service will just use the mainnet genesis block, to use other network's
/// genesis see [`DummyCoreSyncSvc::static_stagenet_genesis`] and [`DummyCoreSyncSvc::static_testnet_genesis`].
/// The drawbacks to keeping this the default is that it will always return the mainnet genesis as our nodes
/// sync info, which means peers won't know our actual chain height, this may or may not be a problem for
/// different use cases.
pub fn with_core_sync_svc<NCSync>(
self,
new_core_sync_svc: NCSync,
) -> HandshakerBuilder<N, AdrBook, NCSync, PSync, ProtoHdlr, BrdcstStrmMkr>
where
NCSync: CoreSyncSvc + Clone,
{
let HandshakerBuilder {
address_book,
peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
..
} = self;
HandshakerBuilder {
address_book,
core_sync_svc: new_core_sync_svc,
peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
}
}
/// Changes the peer sync service, which keeps track of peers sync states.
///
/// ## Default Peer Sync Service
///
/// The default peer sync service will be used if this method is not called.
///
/// The default peer sync service will not keep track of peers sync states.
pub fn with_peer_sync_svc<NPSync>(
self,
new_peer_sync_svc: NPSync,
) -> HandshakerBuilder<N, AdrBook, CSync, NPSync, ProtoHdlr, BrdcstStrmMkr>
where
NPSync: PeerSyncSvc<N> + Clone,
{
let HandshakerBuilder {
address_book,
core_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
..
} = self;
HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc: new_peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
}
}
/// Changes the protocol request handler, which handles [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
///
/// ## Default Protocol Request Handler
///
/// The default protocol request handler will not respond to any protocol requests, this should not
/// be an issue as long as peers do not think we are ahead of them, if they do they will send requests
/// for our blocks, and we won't respond which will cause them to disconnect.
pub fn with_protocol_request_handler<NProtoHdlr>(
self,
new_protocol_handler: NProtoHdlr,
) -> HandshakerBuilder<N, AdrBook, CSync, PSync, NProtoHdlr, BrdcstStrmMkr>
where
NProtoHdlr: ProtocolRequestHandler + Clone,
{
let HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
..
} = self;
HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc: new_protocol_handler,
our_basic_node_data,
broadcast_stream_maker,
connection_parent_span,
_zone,
}
}
/// Changes the broadcast stream maker, which is used to create streams that yield messages to broadcast.
///
/// ## Default Broadcast Stream Maker
///
/// The default broadcast stream maker just returns [`stream::Pending`], i.e. the returned stream will not
/// produce any messages to broadcast, this is not a problem if your use case does not require broadcasting
/// messages.
pub fn with_broadcast_stream_maker<NBrdcstStrmMkr, BrdcstStrm>(
self,
new_broadcast_stream_maker: NBrdcstStrmMkr,
) -> HandshakerBuilder<N, AdrBook, CSync, PSync, ProtoHdlr, NBrdcstStrmMkr>
where
BrdcstStrm: Stream<Item = BroadcastMessage> + Send + 'static,
NBrdcstStrmMkr: Fn(InternalPeerID<N::Addr>) -> BrdcstStrm + Clone + Send + 'static,
{
let HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
connection_parent_span,
_zone,
..
} = self;
HandshakerBuilder {
address_book,
core_sync_svc,
peer_sync_svc,
protocol_request_svc,
our_basic_node_data,
broadcast_stream_maker: new_broadcast_stream_maker,
connection_parent_span,
_zone,
}
}
/// Changes the parent [`Span`] of the connection task to the one provided.
///
/// ## Default Connection Parent Span
///
/// The default connection span will be [`Span::none`].
pub fn with_connection_parent_span(self, connection_parent_span: Span) -> Self {
Self {
connection_parent_span: Some(connection_parent_span),
..self
}
}
/// Builds the [`HandShaker`].
pub fn build(self) -> HandShaker<N, AdrBook, CSync, PSync, ProtoHdlr, BrdcstStrmMkr> {
HandShaker::new(
self.address_book,
self.peer_sync_svc,
self.core_sync_svc,
self.protocol_request_svc,
self.broadcast_stream_maker,
self.our_basic_node_data,
self.connection_parent_span.unwrap_or(Span::none()),
)
}
}

View file

@ -0,0 +1,151 @@
use std::{
future::{ready, Ready},
task::{Context, Poll},
};
use tower::Service;
use cuprate_wire::CoreSyncData;
use crate::{
services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
PeerSyncRequest, PeerSyncResponse,
},
NetworkZone, ProtocolRequest, ProtocolResponse,
};
/// A dummy peer sync service, that doesn't actually keep track of peers sync states.
#[derive(Debug, Clone)]
pub struct DummyPeerSyncSvc;
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for DummyPeerSyncSvc {
type Response = PeerSyncResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: PeerSyncRequest<N>) -> Self::Future {
ready(Ok(match req {
PeerSyncRequest::PeersToSyncFrom { .. } => PeerSyncResponse::PeersToSyncFrom(vec![]),
PeerSyncRequest::IncomingCoreSyncData(_, _, _) => PeerSyncResponse::Ok,
}))
}
}
/// A dummy core sync service that just returns static [`CoreSyncData`].
#[derive(Debug, Clone)]
pub struct DummyCoreSyncSvc(CoreSyncData);
impl DummyCoreSyncSvc {
/// Returns a [`DummyCoreSyncSvc`] that will just return the mainnet genesis [`CoreSyncData`].
pub fn static_mainnet_genesis() -> DummyCoreSyncSvc {
DummyCoreSyncSvc(CoreSyncData {
cumulative_difficulty: 1,
cumulative_difficulty_top64: 0,
current_height: 1,
pruning_seed: 0,
top_id: hex_literal::hex!(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3"
),
top_version: 1,
})
}
/// Returns a [`DummyCoreSyncSvc`] that will just return the testnet genesis [`CoreSyncData`].
pub fn static_testnet_genesis() -> DummyCoreSyncSvc {
DummyCoreSyncSvc(CoreSyncData {
cumulative_difficulty: 1,
cumulative_difficulty_top64: 0,
current_height: 1,
pruning_seed: 0,
top_id: hex_literal::hex!(
"48ca7cd3c8de5b6a4d53d2861fbdaedca141553559f9be9520068053cda8430b"
),
top_version: 1,
})
}
/// Returns a [`DummyCoreSyncSvc`] that will just return the stagenet genesis [`CoreSyncData`].
pub fn static_stagenet_genesis() -> DummyCoreSyncSvc {
DummyCoreSyncSvc(CoreSyncData {
cumulative_difficulty: 1,
cumulative_difficulty_top64: 0,
current_height: 1,
pruning_seed: 0,
top_id: hex_literal::hex!(
"76ee3cc98646292206cd3e86f74d88b4dcc1d937088645e9b0cbca84b7ce74eb"
),
top_version: 1,
})
}
/// Returns a [`DummyCoreSyncSvc`] that will return the provided [`CoreSyncData`].
pub fn static_custom(data: CoreSyncData) -> DummyCoreSyncSvc {
DummyCoreSyncSvc(data)
}
}
impl Service<CoreSyncDataRequest> for DummyCoreSyncSvc {
type Response = CoreSyncDataResponse;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future {
ready(Ok(CoreSyncDataResponse(self.0.clone())))
}
}
/// A dummy address book that doesn't actually keep track of peers.
#[derive(Debug, Clone)]
pub struct DummyAddressBook;
impl<N: NetworkZone> Service<AddressBookRequest<N>> for DummyAddressBook {
type Response = AddressBookResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: AddressBookRequest<N>) -> Self::Future {
ready(Ok(match req {
AddressBookRequest::GetWhitePeers(_) => AddressBookResponse::Peers(vec![]),
AddressBookRequest::TakeRandomGrayPeer { .. }
| AddressBookRequest::TakeRandomPeer { .. }
| AddressBookRequest::TakeRandomWhitePeer { .. } => {
return ready(Err("dummy address book does not hold peers".into()));
}
AddressBookRequest::NewConnection { .. } | AddressBookRequest::IncomingPeerList(_) => {
AddressBookResponse::Ok
}
AddressBookRequest::IsPeerBanned(_) => AddressBookResponse::IsPeerBanned(false),
}))
}
}
/// A dummy protocol request handler.
#[derive(Debug, Clone)]
pub struct DummyProtocolRequestHandler;
impl Service<ProtocolRequest> for DummyProtocolRequestHandler {
type Response = ProtocolResponse;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ProtocolRequest) -> Self::Future {
ready(Ok(ProtocolResponse::NA))
}
}

View file

@ -0,0 +1,144 @@
use futures::TryFutureExt;
use tower::ServiceExt;
use cuprate_wire::{
admin::{
PingResponse, SupportFlagsResponse, TimedSyncRequest, TimedSyncResponse,
PING_OK_RESPONSE_STATUS_TEXT,
},
AdminRequestMessage, AdminResponseMessage, BasicNodeData,
};
use crate::{
client::PeerInformation,
constants::MAX_PEERS_IN_PEER_LIST_MESSAGE,
services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
PeerSyncRequest,
},
AddressBook, CoreSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
ProtocolRequestHandler,
};
#[derive(thiserror::Error, Debug, Copy, Clone, Eq, PartialEq)]
enum PeerRequestHandlerError {
#[error("Received a handshake request during a connection.")]
ReceivedHandshakeDuringConnection,
}
/// The peer request handler, handles incoming [`PeerRequest`]s to our node.
#[derive(Debug, Clone)]
pub(crate) struct PeerRequestHandler<Z: NetworkZone, A, CS, PS, PR> {
/// The address book service.
pub address_book_svc: A,
/// Our core sync service.
pub our_sync_svc: CS,
/// The peer sync service.
pub peer_sync_svc: PS,
/// The handler for [`ProtocolRequest`](crate::ProtocolRequest)s to our node.
pub protocol_request_handler: PR,
/// The basic node data of our node.
pub our_basic_node_data: BasicNodeData,
/// The information on the connected peer.
pub peer_info: PeerInformation<Z::Addr>,
}
impl<Z: NetworkZone, A, CS, PS, PR> PeerRequestHandler<Z, A, CS, PS, PR>
where
Z: NetworkZone,
A: AddressBook<Z>,
CS: CoreSyncSvc,
PS: PeerSyncSvc<Z>,
PR: ProtocolRequestHandler,
{
/// Handles an incoming [`PeerRequest`] to our node.
pub async fn handle_peer_request(
&mut self,
req: PeerRequest,
) -> Result<PeerResponse, tower::BoxError> {
match req {
PeerRequest::Admin(admin_req) => match admin_req {
AdminRequestMessage::Handshake(_) => {
Err(PeerRequestHandlerError::ReceivedHandshakeDuringConnection.into())
}
AdminRequestMessage::SupportFlags => {
let support_flags = self.our_basic_node_data.support_flags;
Ok(PeerResponse::Admin(AdminResponseMessage::SupportFlags(
SupportFlagsResponse { support_flags },
)))
}
AdminRequestMessage::Ping => Ok(PeerResponse::Admin(AdminResponseMessage::Ping(
PingResponse {
peer_id: self.our_basic_node_data.peer_id,
status: PING_OK_RESPONSE_STATUS_TEXT,
},
))),
AdminRequestMessage::TimedSync(timed_sync_req) => {
let res = self.handle_timed_sync_request(timed_sync_req).await?;
Ok(PeerResponse::Admin(AdminResponseMessage::TimedSync(res)))
}
},
PeerRequest::Protocol(protocol_req) => {
// TODO: add limits here
self.protocol_request_handler
.ready()
.await?
.call(protocol_req)
.map_ok(PeerResponse::Protocol)
.await
}
}
}
/// Handles a [`TimedSyncRequest`] to our node.
async fn handle_timed_sync_request(
&mut self,
req: TimedSyncRequest,
) -> Result<TimedSyncResponse, tower::BoxError> {
// TODO: add a limit on the amount of these requests in a certain time period.
let peer_id = self.peer_info.id;
let handle = self.peer_info.handle.clone();
self.peer_sync_svc
.ready()
.await?
.call(PeerSyncRequest::IncomingCoreSyncData(
peer_id,
handle,
req.payload_data,
))
.await?;
let AddressBookResponse::Peers(peers) = self
.address_book_svc
.ready()
.await?
.call(AddressBookRequest::GetWhitePeers(
MAX_PEERS_IN_PEER_LIST_MESSAGE,
))
.await?
else {
panic!("Address book sent incorrect response!");
};
let CoreSyncDataResponse(core_sync_data) = self
.our_sync_svc
.ready()
.await?
.call(CoreSyncDataRequest)
.await?;
Ok(TimedSyncResponse {
payload_data: core_sync_data,
local_peerlist_new: peers.into_iter().map(Into::into).collect(),
})
}
}

View file

@ -12,7 +12,7 @@ use tokio::{
use tower::ServiceExt; use tower::ServiceExt;
use tracing::instrument; use tracing::instrument;
use cuprate_wire::admin::TimedSyncRequest; use cuprate_wire::{admin::TimedSyncRequest, AdminRequestMessage, AdminResponseMessage};
use crate::{ use crate::{
client::{connection::ConnectionTaskRequest, InternalPeerID}, client::{connection::ConnectionTaskRequest, InternalPeerID},
@ -87,15 +87,15 @@ where
tracing::debug!(parent: &ping_span, "Sending timed sync to peer"); tracing::debug!(parent: &ping_span, "Sending timed sync to peer");
connection_tx connection_tx
.send(ConnectionTaskRequest { .send(ConnectionTaskRequest {
request: PeerRequest::TimedSync(TimedSyncRequest { request: PeerRequest::Admin(AdminRequestMessage::TimedSync(TimedSyncRequest {
payload_data: core_sync_data, payload_data: core_sync_data,
}), })),
response_channel: tx, response_channel: tx,
permit: Some(permit), permit: Some(permit),
}) })
.await?; .await?;
let PeerResponse::TimedSync(timed_sync) = rx.await?? else { let PeerResponse::Admin(AdminResponseMessage::TimedSync(timed_sync)) = rx.await?? else {
panic!("Connection task returned wrong response!"); panic!("Connection task returned wrong response!");
}; };

View file

@ -23,10 +23,8 @@ impl HandleBuilder {
} }
/// Sets the permit for this connection. /// Sets the permit for this connection.
/// pub fn with_permit(mut self, permit: Option<OwnedSemaphorePermit>) -> Self {
/// This must be called at least once. self.permit = permit;
pub fn with_permit(mut self, permit: OwnedSemaphorePermit) -> Self {
self.permit = Some(permit);
self self
} }
@ -39,7 +37,7 @@ impl HandleBuilder {
( (
ConnectionGuard { ConnectionGuard {
token: token.clone(), token: token.clone(),
_permit: self.permit.expect("connection permit was not set!"), _permit: self.permit,
}, },
ConnectionHandle { ConnectionHandle {
token: token.clone(), token: token.clone(),
@ -56,7 +54,7 @@ pub struct BanPeer(pub Duration);
/// A struct given to the connection task. /// A struct given to the connection task.
pub struct ConnectionGuard { pub struct ConnectionGuard {
token: CancellationToken, token: CancellationToken,
_permit: OwnedSemaphorePermit, _permit: Option<OwnedSemaphorePermit>,
} }
impl ConnectionGuard { impl ConnectionGuard {

View file

@ -1,4 +1,4 @@
//! # Monero P2P //! # Cuprate P2P Core
//! //!
//! This crate is general purpose P2P networking library for working with Monero. This is a low level //! This crate is general purpose P2P networking library for working with Monero. This is a low level
//! crate, which means it may seem verbose for a lot of use cases, if you want a crate that handles //! crate, which means it may seem verbose for a lot of use cases, if you want a crate that handles
@ -6,13 +6,57 @@
//! //!
//! # Network Zones //! # Network Zones
//! //!
//! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet](network_zones::ClearNet). //! This crate abstracts over network zones, Tor/I2p/clearnet with the [NetworkZone] trait. Currently only clearnet is implemented: [ClearNet].
//! //!
//! # Usage //! # Usage
//! //!
//! TODO //! ## Connecting to a peer
//! //!
use std::{fmt::Debug, future::Future, hash::Hash, pin::Pin}; //! ```rust
//! # use std::{net::SocketAddr, str::FromStr};
//! #
//! # use tower::ServiceExt;
//! #
//! # use cuprate_p2p_core::{
//! # client::{ConnectRequest, Connector, HandshakerBuilder},
//! # ClearNet, Network,
//! # };
//! # use cuprate_wire::{common::PeerSupportFlags, BasicNodeData};
//! # use cuprate_test_utils::monerod::monerod;
//! #
//! # tokio_test::block_on(async move {
//! #
//! # let _monerod = monerod::<&str>([]).await;
//! # let addr = _monerod.p2p_addr();
//! #
//! // The information about our local node.
//! let our_basic_node_data = BasicNodeData {
//! my_port: 0,
//! network_id: Network::Mainnet.network_id(),
//! peer_id: 0,
//! support_flags: PeerSupportFlags::FLUFFY_BLOCKS,
//! rpc_port: 0,
//! rpc_credits_per_hash: 0,
//! };
//!
//! // See [`HandshakerBuilder`] for information about the default values set, they may not be
//! // appropriate for every use case.
//! let handshaker = HandshakerBuilder::<ClearNet>::new(our_basic_node_data).build();
//!
//! // The outbound connector.
//! let mut connector = Connector::new(handshaker);
//!
//! // The connection.
//! let connection = connector
//! .oneshot(ConnectRequest {
//! addr,
//! permit: None,
//! })
//! .await
//! .unwrap();
//! # });
//! ```
use std::{fmt::Debug, future::Future, hash::Hash};
use futures::{Sink, Stream}; use futures::{Sink, Stream};
@ -25,21 +69,27 @@ pub mod client;
mod constants; mod constants;
pub mod error; pub mod error;
pub mod handles; pub mod handles;
pub mod network_zones; mod network_zones;
pub mod protocol; pub mod protocol;
pub mod services; pub mod services;
pub use error::*; pub use error::*;
pub use network_zones::{ClearNet, ClearNetServerCfg};
pub use protocol::*; pub use protocol::*;
use services::*; use services::*;
//re-export
pub use cuprate_helper::network::Network;
/// The direction of a connection.
#[derive(Debug, Copy, Clone, Eq, PartialEq)] #[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ConnectionDirection { pub enum ConnectionDirection {
InBound, /// An inbound connection to our node.
OutBound, Inbound,
/// An outbound connection from our node.
Outbound,
} }
#[cfg(not(feature = "borsh"))] /// An address on a specific [`NetworkZone`].
pub trait NetZoneAddress: pub trait NetZoneAddress:
TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone> TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone>
+ Into<NetworkAddress> + Into<NetworkAddress>
@ -56,46 +106,19 @@ pub trait NetZoneAddress:
/// that include the port, to be able to facilitate this network addresses must have a ban ID /// that include the port, to be able to facilitate this network addresses must have a ban ID
/// which for hidden services could just be the address it self but for clear net addresses will /// which for hidden services could just be the address it self but for clear net addresses will
/// be the IP address. /// be the IP address.
/// TODO: IP zone banning? ///
type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; /// - TODO: IP zone banning?
/// - TODO: rename this to Host.
/// Changes the port of this address to `port`.
fn set_port(&mut self, port: u16);
fn make_canonical(&mut self);
fn ban_id(&self) -> Self::BanID;
fn should_add_to_peer_list(&self) -> bool;
}
#[cfg(feature = "borsh")]
pub trait NetZoneAddress:
TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone>
+ Into<NetworkAddress>
+ std::fmt::Display
+ borsh::BorshSerialize
+ borsh::BorshDeserialize
+ Hash
+ Eq
+ Copy
+ Send
+ Sync
+ Unpin
+ 'static
{
/// Cuprate needs to be able to ban peers by IP addresses and not just by SocketAddr as
/// that include the port, to be able to facilitate this network addresses must have a ban ID
/// which for hidden services could just be the address it self but for clear net addresses will
/// be the IP address.
/// TODO: IP zone banning?
type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static;
/// Changes the port of this address to `port`. /// Changes the port of this address to `port`.
fn set_port(&mut self, port: u16); fn set_port(&mut self, port: u16);
/// Turns this address into its canonical form.
fn make_canonical(&mut self); fn make_canonical(&mut self);
/// Returns the [`Self::BanID`] for this address.
fn ban_id(&self) -> Self::BanID; fn ban_id(&self) -> Self::BanID;
fn should_add_to_peer_list(&self) -> bool; fn should_add_to_peer_list(&self) -> bool;
@ -136,6 +159,15 @@ pub trait NetworkZone: Clone + Copy + Send + 'static {
/// Config used to start a server which listens for incoming connections. /// Config used to start a server which listens for incoming connections.
type ServerCfg: Clone + Debug + Send + 'static; type ServerCfg: Clone + Debug + Send + 'static;
/// Connects to a peer with the given address.
///
/// <div class="warning">
///
/// This does not complete a handshake with the peer, to do that see the [crate](crate) docs.
///
/// </div>
///
/// Returns the [`Self::Stream`] and [`Self::Sink`] to send messages to the peer.
async fn connect_to_peer( async fn connect_to_peer(
addr: Self::Addr, addr: Self::Addr,
) -> Result<(Self::Stream, Self::Sink), std::io::Error>; ) -> Result<(Self::Stream, Self::Sink), std::io::Error>;
@ -206,55 +238,48 @@ pub trait CoreSyncSvc:
CoreSyncDataRequest, CoreSyncDataRequest,
Response = CoreSyncDataResponse, Response = CoreSyncDataResponse,
Error = tower::BoxError, Error = tower::BoxError,
Future = Pin< Future = Self::Future2,
Box<
dyn Future<Output = Result<CoreSyncDataResponse, tower::BoxError>> + Send + 'static,
>,
>,
> + Send > + Send
+ 'static + 'static
{ {
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
} }
impl<T> CoreSyncSvc for T where impl<T> CoreSyncSvc for T
where
T: tower::Service< T: tower::Service<
CoreSyncDataRequest, CoreSyncDataRequest,
Response = CoreSyncDataResponse, Response = CoreSyncDataResponse,
Error = tower::BoxError, Error = tower::BoxError,
Future = Pin<
Box<
dyn Future<Output = Result<CoreSyncDataResponse, tower::BoxError>>
+ Send
+ 'static,
>,
>,
> + Send > + Send
+ 'static + 'static,
T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
{ {
type Future2 = T::Future;
} }
pub trait PeerRequestHandler: pub trait ProtocolRequestHandler:
tower::Service< tower::Service<
PeerRequest, ProtocolRequest,
Response = PeerResponse, Response = ProtocolResponse,
Error = tower::BoxError, Error = tower::BoxError,
Future = Pin< Future = Self::Future2,
Box<dyn Future<Output = Result<PeerResponse, tower::BoxError>> + Send + 'static>,
>,
> + Send > + Send
+ 'static + 'static
{ {
// This allows us to put more restrictive bounds on the future without defining the future here
// explicitly.
type Future2: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static;
} }
impl<T> PeerRequestHandler for T where impl<T> ProtocolRequestHandler for T
T: tower::Service< where
PeerRequest, T: tower::Service<ProtocolRequest, Response = ProtocolResponse, Error = tower::BoxError>
Response = PeerResponse, + Send
Error = tower::BoxError, + 'static,
Future = Pin< T::Future: Future<Output = Result<Self::Response, Self::Error>> + Send + 'static,
Box<dyn Future<Output = Result<PeerResponse, tower::BoxError>> + Send + 'static>,
>,
> + Send
+ 'static
{ {
type Future2 = T::Future;
} }

View file

@ -1,13 +1,16 @@
//! This module defines InternalRequests and InternalResponses. Cuprate's P2P works by translating network messages into an internal //! This module defines [`PeerRequest`] and [`PeerResponse`]. Cuprate's P2P crates works by translating network messages into an internal
//! request/ response, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications" //! request/response enums, this is easy for levin "requests" and "responses" (admin messages) but takes a bit more work with "notifications"
//! (protocol messages). //! (protocol messages).
//! //!
//! Some notifications are easy to translate, like `GetObjectsRequest` is obviously a request but others like `NewFluffyBlock` are a //! Some notifications are easy to translate, like [`GetObjectsRequest`] is obviously a request but others like [`NewFluffyBlock`] are a
//! bit tri cker. To translate a `NewFluffyBlock` into a request/ response we will have to look to see if we asked for `FluffyMissingTransactionsRequest` //! bit tricker. To translate a [`NewFluffyBlock`] into a request/ response we will have to look to see if we asked for [`FluffyMissingTransactionsRequest`],
//! if we have we interpret `NewFluffyBlock` as a response if not its a request that doesn't require a response. //! if we have, we interpret [`NewFluffyBlock`] as a response, if not, it's a request that doesn't require a response.
//! //!
//! Here is every P2P request/ response. *note admin messages are already request/ response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse //! Here is every P2P request/response.
//! //!
//! *note admin messages are already request/response so "Handshake" is actually made of a HandshakeRequest & HandshakeResponse
//!
//! ```md
//! Admin: //! Admin:
//! Handshake, //! Handshake,
//! TimedSync, //! TimedSync,
@ -21,16 +24,14 @@
//! Request: NewBlock, Response: None, //! Request: NewBlock, Response: None,
//! Request: NewFluffyBlock, Response: None, //! Request: NewFluffyBlock, Response: None,
//! Request: NewTransactions, Response: None //! Request: NewTransactions, Response: None
//!```
//! //!
use cuprate_wire::{ use cuprate_wire::{
admin::{
HandshakeRequest, HandshakeResponse, PingResponse, SupportFlagsResponse, TimedSyncRequest,
TimedSyncResponse,
},
protocol::{ protocol::{
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
GetObjectsResponse, GetTxPoolCompliment, NewBlock, NewFluffyBlock, NewTransactions, GetObjectsResponse, GetTxPoolCompliment, NewBlock, NewFluffyBlock, NewTransactions,
}, },
AdminRequestMessage, AdminResponseMessage,
}; };
mod try_from; mod try_from;
@ -60,12 +61,7 @@ pub enum BroadcastMessage {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum PeerRequest { pub enum ProtocolRequest {
Handshake(HandshakeRequest),
TimedSync(TimedSyncRequest),
Ping,
SupportFlags,
GetObjects(GetObjectsRequest), GetObjects(GetObjectsRequest),
GetChain(ChainRequest), GetChain(ChainRequest),
FluffyMissingTxs(FluffyMissingTransactionsRequest), FluffyMissingTxs(FluffyMissingTransactionsRequest),
@ -75,41 +71,47 @@ pub enum PeerRequest {
NewTransactions(NewTransactions), NewTransactions(NewTransactions),
} }
#[derive(Debug, Clone)]
pub enum PeerRequest {
Admin(AdminRequestMessage),
Protocol(ProtocolRequest),
}
impl PeerRequest { impl PeerRequest {
pub fn id(&self) -> MessageID { pub fn id(&self) -> MessageID {
match self { match self {
PeerRequest::Handshake(_) => MessageID::Handshake, PeerRequest::Admin(admin_req) => match admin_req {
PeerRequest::TimedSync(_) => MessageID::TimedSync, AdminRequestMessage::Handshake(_) => MessageID::Handshake,
PeerRequest::Ping => MessageID::Ping, AdminRequestMessage::TimedSync(_) => MessageID::TimedSync,
PeerRequest::SupportFlags => MessageID::SupportFlags, AdminRequestMessage::Ping => MessageID::Ping,
AdminRequestMessage::SupportFlags => MessageID::SupportFlags,
PeerRequest::GetObjects(_) => MessageID::GetObjects, },
PeerRequest::GetChain(_) => MessageID::GetChain, PeerRequest::Protocol(protocol_request) => match protocol_request {
PeerRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs, ProtocolRequest::GetObjects(_) => MessageID::GetObjects,
PeerRequest::GetTxPoolCompliment(_) => MessageID::GetTxPoolCompliment, ProtocolRequest::GetChain(_) => MessageID::GetChain,
PeerRequest::NewBlock(_) => MessageID::NewBlock, ProtocolRequest::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
PeerRequest::NewFluffyBlock(_) => MessageID::NewFluffyBlock, ProtocolRequest::GetTxPoolCompliment(_) => MessageID::GetTxPoolCompliment,
PeerRequest::NewTransactions(_) => MessageID::NewTransactions, ProtocolRequest::NewBlock(_) => MessageID::NewBlock,
ProtocolRequest::NewFluffyBlock(_) => MessageID::NewFluffyBlock,
ProtocolRequest::NewTransactions(_) => MessageID::NewTransactions,
},
} }
} }
pub fn needs_response(&self) -> bool { pub fn needs_response(&self) -> bool {
!matches!( !matches!(
self, self,
PeerRequest::NewBlock(_) PeerRequest::Protocol(
| PeerRequest::NewFluffyBlock(_) ProtocolRequest::NewBlock(_)
| PeerRequest::NewTransactions(_) | ProtocolRequest::NewFluffyBlock(_)
| ProtocolRequest::NewTransactions(_)
)
) )
} }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum PeerResponse { pub enum ProtocolResponse {
Handshake(HandshakeResponse),
TimedSync(TimedSyncResponse),
Ping(PingResponse),
SupportFlags(SupportFlagsResponse),
GetObjects(GetObjectsResponse), GetObjects(GetObjectsResponse),
GetChain(ChainResponse), GetChain(ChainResponse),
NewFluffyBlock(NewFluffyBlock), NewFluffyBlock(NewFluffyBlock),
@ -117,20 +119,29 @@ pub enum PeerResponse {
NA, NA,
} }
#[derive(Debug, Clone)]
pub enum PeerResponse {
Admin(AdminResponseMessage),
Protocol(ProtocolResponse),
}
impl PeerResponse { impl PeerResponse {
pub fn id(&self) -> MessageID { pub fn id(&self) -> Option<MessageID> {
match self { Some(match self {
PeerResponse::Handshake(_) => MessageID::Handshake, PeerResponse::Admin(admin_res) => match admin_res {
PeerResponse::TimedSync(_) => MessageID::TimedSync, AdminResponseMessage::Handshake(_) => MessageID::Handshake,
PeerResponse::Ping(_) => MessageID::Ping, AdminResponseMessage::TimedSync(_) => MessageID::TimedSync,
PeerResponse::SupportFlags(_) => MessageID::SupportFlags, AdminResponseMessage::Ping(_) => MessageID::Ping,
AdminResponseMessage::SupportFlags(_) => MessageID::SupportFlags,
},
PeerResponse::Protocol(protocol_res) => match protocol_res {
ProtocolResponse::GetObjects(_) => MessageID::GetObjects,
ProtocolResponse::GetChain(_) => MessageID::GetChain,
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
PeerResponse::GetObjects(_) => MessageID::GetObjects, ProtocolResponse::NA => return None,
PeerResponse::GetChain(_) => MessageID::GetChain, },
PeerResponse::NewFluffyBlock(_) => MessageID::NewBlock, })
PeerResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
PeerResponse::NA => panic!("Can't get message ID for a non existent response"),
}
} }
} }

View file

@ -1,150 +1,111 @@
//! This module contains the implementations of [`TryFrom`] and [`From`] to convert between //! This module contains the implementations of [`TryFrom`] and [`From`] to convert between
//! [`Message`], [`PeerRequest`] and [`PeerResponse`]. //! [`Message`], [`PeerRequest`] and [`PeerResponse`].
use cuprate_wire::{Message, ProtocolMessage, RequestMessage, ResponseMessage}; use cuprate_wire::{Message, ProtocolMessage};
use super::{PeerRequest, PeerResponse}; use crate::{PeerRequest, PeerResponse, ProtocolRequest, ProtocolResponse};
#[derive(Debug)] #[derive(Debug)]
pub struct MessageConversionError; pub struct MessageConversionError;
macro_rules! match_body { impl From<ProtocolRequest> for ProtocolMessage {
(match $value: ident {$($body:tt)*} ($left:pat => $right_ty:expr) $($todo:tt)*) => { fn from(value: ProtocolRequest) -> Self {
match_body!( match $value { match value {
$left => $right_ty, ProtocolRequest::GetObjects(val) => ProtocolMessage::GetObjectsRequest(val),
$($body)* ProtocolRequest::GetChain(val) => ProtocolMessage::ChainRequest(val),
} $($todo)* ) ProtocolRequest::FluffyMissingTxs(val) => {
}; ProtocolMessage::FluffyMissingTransactionsRequest(val)
(match $value: ident {$($body:tt)*}) => {
match $value {
$($body)*
}
};
}
macro_rules! from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
impl From<$left_ty> for $right_ty {
fn from(value: $left_ty) -> Self {
match_body!( match value {}
$(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+
)
} }
ProtocolRequest::GetTxPoolCompliment(val) => ProtocolMessage::GetTxPoolCompliment(val),
ProtocolRequest::NewBlock(val) => ProtocolMessage::NewBlock(val),
ProtocolRequest::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
ProtocolRequest::NewTransactions(val) => ProtocolMessage::NewTransactions(val),
} }
}; }
} }
macro_rules! try_from { impl TryFrom<ProtocolMessage> for ProtocolRequest {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
impl TryFrom<$left_ty> for $right_ty {
type Error = MessageConversionError;
fn try_from(value: $left_ty) -> Result<Self, Self::Error> {
Ok(match_body!( match value {
_ => return Err(MessageConversionError)
}
$(($left_ty::$left$(($val))? => $right_ty::$right$(($vall))?))+
))
}
}
};
}
macro_rules! from_try_from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+});
from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($vall))?,)+});
};
}
macro_rules! try_from_try_from {
($left_ty:ident, $right_ty:ident, {$($left:ident $(($val: ident))? = $right:ident $(($vall: ident))?,)+}) => {
try_from!($left_ty, $right_ty, {$($left $(($val))? = $right $(($vall))?,)+});
try_from!($right_ty, $left_ty, {$($right $(($val))? = $left $(($val))?,)+});
};
}
from_try_from!(PeerRequest, RequestMessage,{
Handshake(val) = Handshake(val),
Ping = Ping,
SupportFlags = SupportFlags,
TimedSync(val) = TimedSync(val),
});
try_from_try_from!(PeerRequest, ProtocolMessage,{
NewBlock(val) = NewBlock(val),
NewFluffyBlock(val) = NewFluffyBlock(val),
GetObjects(val) = GetObjectsRequest(val),
GetChain(val) = ChainRequest(val),
NewTransactions(val) = NewTransactions(val),
FluffyMissingTxs(val) = FluffyMissingTransactionsRequest(val),
GetTxPoolCompliment(val) = GetTxPoolCompliment(val),
});
impl TryFrom<Message> for PeerRequest {
type Error = MessageConversionError; type Error = MessageConversionError;
fn try_from(value: Message) -> Result<Self, Self::Error> { fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
match value { Ok(match value {
Message::Request(req) => Ok(req.into()), ProtocolMessage::GetObjectsRequest(val) => ProtocolRequest::GetObjects(val),
Message::Protocol(pro) => pro.try_into(), ProtocolMessage::ChainRequest(val) => ProtocolRequest::GetChain(val),
_ => Err(MessageConversionError), ProtocolMessage::FluffyMissingTransactionsRequest(val) => {
} ProtocolRequest::FluffyMissingTxs(val)
}
ProtocolMessage::GetTxPoolCompliment(val) => ProtocolRequest::GetTxPoolCompliment(val),
ProtocolMessage::NewBlock(val) => ProtocolRequest::NewBlock(val),
ProtocolMessage::NewFluffyBlock(val) => ProtocolRequest::NewFluffyBlock(val),
ProtocolMessage::NewTransactions(val) => ProtocolRequest::NewTransactions(val),
ProtocolMessage::GetObjectsResponse(_) | ProtocolMessage::ChainEntryResponse(_) => {
return Err(MessageConversionError)
}
})
} }
} }
impl From<PeerRequest> for Message { impl From<PeerRequest> for Message {
fn from(value: PeerRequest) -> Self { fn from(value: PeerRequest) -> Self {
match value { match value {
PeerRequest::Handshake(val) => Message::Request(RequestMessage::Handshake(val)), PeerRequest::Admin(val) => Message::Request(val),
PeerRequest::Ping => Message::Request(RequestMessage::Ping), PeerRequest::Protocol(val) => Message::Protocol(val.into()),
PeerRequest::SupportFlags => Message::Request(RequestMessage::SupportFlags),
PeerRequest::TimedSync(val) => Message::Request(RequestMessage::TimedSync(val)),
PeerRequest::NewBlock(val) => Message::Protocol(ProtocolMessage::NewBlock(val)),
PeerRequest::NewFluffyBlock(val) => {
Message::Protocol(ProtocolMessage::NewFluffyBlock(val))
}
PeerRequest::GetObjects(val) => {
Message::Protocol(ProtocolMessage::GetObjectsRequest(val))
}
PeerRequest::GetChain(val) => Message::Protocol(ProtocolMessage::ChainRequest(val)),
PeerRequest::NewTransactions(val) => {
Message::Protocol(ProtocolMessage::NewTransactions(val))
}
PeerRequest::FluffyMissingTxs(val) => {
Message::Protocol(ProtocolMessage::FluffyMissingTransactionsRequest(val))
}
PeerRequest::GetTxPoolCompliment(val) => {
Message::Protocol(ProtocolMessage::GetTxPoolCompliment(val))
}
} }
} }
} }
from_try_from!(PeerResponse, ResponseMessage,{ impl TryFrom<Message> for PeerRequest {
Handshake(val) = Handshake(val), type Error = MessageConversionError;
Ping(val) = Ping(val),
SupportFlags(val) = SupportFlags(val),
TimedSync(val) = TimedSync(val),
});
try_from_try_from!(PeerResponse, ProtocolMessage,{ fn try_from(value: Message) -> Result<Self, Self::Error> {
NewFluffyBlock(val) = NewFluffyBlock(val), match value {
GetObjects(val) = GetObjectsResponse(val), Message::Request(req) => Ok(PeerRequest::Admin(req)),
GetChain(val) = ChainEntryResponse(val), Message::Protocol(pro) => Ok(PeerRequest::Protocol(pro.try_into()?)),
NewTransactions(val) = NewTransactions(val), Message::Response(_) => Err(MessageConversionError),
}
}
}
}); impl TryFrom<ProtocolResponse> for ProtocolMessage {
type Error = MessageConversionError;
fn try_from(value: ProtocolResponse) -> Result<Self, Self::Error> {
Ok(match value {
ProtocolResponse::NewTransactions(val) => ProtocolMessage::NewTransactions(val),
ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val),
ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val),
ProtocolResponse::NA => return Err(MessageConversionError),
})
}
}
impl TryFrom<ProtocolMessage> for ProtocolResponse {
type Error = MessageConversionError;
fn try_from(value: ProtocolMessage) -> Result<Self, Self::Error> {
Ok(match value {
ProtocolMessage::NewTransactions(val) => ProtocolResponse::NewTransactions(val),
ProtocolMessage::NewFluffyBlock(val) => ProtocolResponse::NewFluffyBlock(val),
ProtocolMessage::ChainEntryResponse(val) => ProtocolResponse::GetChain(val),
ProtocolMessage::GetObjectsResponse(val) => ProtocolResponse::GetObjects(val),
ProtocolMessage::ChainRequest(_)
| ProtocolMessage::FluffyMissingTransactionsRequest(_)
| ProtocolMessage::GetObjectsRequest(_)
| ProtocolMessage::GetTxPoolCompliment(_)
| ProtocolMessage::NewBlock(_) => return Err(MessageConversionError),
})
}
}
impl TryFrom<Message> for PeerResponse { impl TryFrom<Message> for PeerResponse {
type Error = MessageConversionError; type Error = MessageConversionError;
fn try_from(value: Message) -> Result<Self, Self::Error> { fn try_from(value: Message) -> Result<Self, Self::Error> {
match value { match value {
Message::Response(res) => Ok(res.into()), Message::Response(res) => Ok(PeerResponse::Admin(res)),
Message::Protocol(pro) => pro.try_into(), Message::Protocol(pro) => Ok(PeerResponse::Protocol(pro.try_into()?)),
_ => Err(MessageConversionError), Message::Request(_) => Err(MessageConversionError),
} }
} }
} }
@ -154,27 +115,8 @@ impl TryFrom<PeerResponse> for Message {
fn try_from(value: PeerResponse) -> Result<Self, Self::Error> { fn try_from(value: PeerResponse) -> Result<Self, Self::Error> {
Ok(match value { Ok(match value {
PeerResponse::Handshake(val) => Message::Response(ResponseMessage::Handshake(val)), PeerResponse::Admin(val) => Message::Response(val),
PeerResponse::Ping(val) => Message::Response(ResponseMessage::Ping(val)), PeerResponse::Protocol(val) => Message::Protocol(val.try_into()?),
PeerResponse::SupportFlags(val) => {
Message::Response(ResponseMessage::SupportFlags(val))
}
PeerResponse::TimedSync(val) => Message::Response(ResponseMessage::TimedSync(val)),
PeerResponse::NewFluffyBlock(val) => {
Message::Protocol(ProtocolMessage::NewFluffyBlock(val))
}
PeerResponse::GetObjects(val) => {
Message::Protocol(ProtocolMessage::GetObjectsResponse(val))
}
PeerResponse::GetChain(val) => {
Message::Protocol(ProtocolMessage::ChainEntryResponse(val))
}
PeerResponse::NewTransactions(val) => {
Message::Protocol(ProtocolMessage::NewTransactions(val))
}
PeerResponse::NA => return Err(MessageConversionError),
}) })
} }
} }

View file

@ -6,6 +6,7 @@ use crate::{
NetworkZone, NetworkZone,
}; };
/// A request to the service that keeps track of peers sync states.
pub enum PeerSyncRequest<N: NetworkZone> { pub enum PeerSyncRequest<N: NetworkZone> {
/// Request some peers to sync from. /// Request some peers to sync from.
/// ///
@ -15,10 +16,11 @@ pub enum PeerSyncRequest<N: NetworkZone> {
current_cumulative_difficulty: u128, current_cumulative_difficulty: u128,
block_needed: Option<u64>, block_needed: Option<u64>,
}, },
/// Add/update a peers core sync data to the sync state service. /// Add/update a peer's core sync data.
IncomingCoreSyncData(InternalPeerID<N::Addr>, ConnectionHandle, CoreSyncData), IncomingCoreSyncData(InternalPeerID<N::Addr>, ConnectionHandle, CoreSyncData),
} }
/// A response from the service that keeps track of peers sync states.
pub enum PeerSyncResponse<N: NetworkZone> { pub enum PeerSyncResponse<N: NetworkZone> {
/// The return value of [`PeerSyncRequest::PeersToSyncFrom`]. /// The return value of [`PeerSyncRequest::PeersToSyncFrom`].
PeersToSyncFrom(Vec<InternalPeerID<N::Addr>>), PeersToSyncFrom(Vec<InternalPeerID<N::Addr>>),
@ -26,10 +28,16 @@ pub enum PeerSyncResponse<N: NetworkZone> {
Ok, Ok,
} }
/// A request to the core sync service for our node's [`CoreSyncData`].
pub struct CoreSyncDataRequest; pub struct CoreSyncDataRequest;
/// A response from the core sync service containing our [`CoreSyncData`].
pub struct CoreSyncDataResponse(pub CoreSyncData); pub struct CoreSyncDataResponse(pub CoreSyncData);
/// A [`NetworkZone`] specific [`PeerListEntryBase`].
///
/// Using this type instead of [`PeerListEntryBase`] in the address book makes
/// usage easier for the rest of the P2P code as we can guarantee only the correct addresses will be stored and returned.
#[derive(Debug, Copy, Clone, Eq, PartialEq)] #[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr( #[cfg_attr(
feature = "borsh", feature = "borsh",
@ -57,6 +65,7 @@ impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for cuprate_wire:
} }
} }
/// An error converting a [`PeerListEntryBase`] into a [`ZoneSpecificPeerListEntryBase`].
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum PeerListConversionError { pub enum PeerListConversionError {
#[error("Address is in incorrect zone")] #[error("Address is in incorrect zone")]
@ -82,6 +91,7 @@ impl<A: NetZoneAddress> TryFrom<cuprate_wire::PeerListEntryBase>
} }
} }
/// A request to the address book service.
pub enum AddressBookRequest<Z: NetworkZone> { pub enum AddressBookRequest<Z: NetworkZone> {
/// Tells the address book that we have connected or received a connection from a peer. /// Tells the address book that we have connected or received a connection from a peer.
NewConnection { NewConnection {
@ -123,6 +133,7 @@ pub enum AddressBookRequest<Z: NetworkZone> {
IsPeerBanned(Z::Addr), IsPeerBanned(Z::Addr),
} }
/// A response from the address book service.
pub enum AddressBookResponse<Z: NetworkZone> { pub enum AddressBookResponse<Z: NetworkZone> {
Ok, Ok,
Peer(ZoneSpecificPeerListEntryBase<Z::Addr>), Peer(ZoneSpecificPeerListEntryBase<Z::Addr>),

View file

@ -2,7 +2,6 @@
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
}; };
@ -13,7 +12,6 @@ use tokio::{
tcp::{OwnedReadHalf, OwnedWriteHalf}, tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream, TcpListener, TcpStream,
}, },
sync::Semaphore,
time::timeout, time::timeout,
}; };
use tokio_util::{ use tokio_util::{
@ -24,9 +22,11 @@ use tower::{Service, ServiceExt};
use cuprate_helper::network::Network; use cuprate_helper::network::Network;
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker, InternalPeerID}, client::{
network_zones::ClearNetServerCfg, handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
ConnectionDirection, NetworkZone, InternalPeerID,
},
ClearNetServerCfg, ConnectionDirection, NetworkZone,
}; };
use cuprate_wire::{ use cuprate_wire::{
common::PeerSupportFlags, common::PeerSupportFlags,
@ -36,9 +36,6 @@ use cuprate_wire::{
use cuprate_test_utils::monerod::monerod; use cuprate_test_utils::monerod::monerod;
mod utils;
use utils::*;
/// A network zone equal to clear net where every message sent is turned into a fragmented message. /// A network zone equal to clear net where every message sent is turned into a fragmented message.
/// Does not support sending fragmented or dummy messages manually. /// Does not support sending fragmented or dummy messages manually.
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -135,9 +132,6 @@ impl Encoder<LevinMessage<Message>> for FragmentCodec {
#[tokio::test] #[tokio::test]
async fn fragmented_handshake_cuprate_to_monerod() { async fn fragmented_handshake_cuprate_to_monerod() {
let semaphore = Arc::new(Semaphore::new(10));
let permit = semaphore.acquire_owned().await.unwrap();
let monerod = monerod(["--fixed-difficulty=1", "--out-peers=0"]).await; let monerod = monerod(["--fixed-difficulty=1", "--out-peers=0"]).await;
let our_basic_node_data = BasicNodeData { let our_basic_node_data = BasicNodeData {
@ -149,14 +143,7 @@ async fn fragmented_handshake_cuprate_to_monerod() {
rpc_credits_per_hash: 0, rpc_credits_per_hash: 0,
}; };
let handshaker = HandShaker::<FragNet, _, _, _, _, _>::new( let handshaker = HandshakerBuilder::<FragNet>::new(our_basic_node_data).build();
DummyAddressBook,
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data,
);
let mut connector = Connector::new(handshaker); let mut connector = Connector::new(handshaker);
@ -166,7 +153,7 @@ async fn fragmented_handshake_cuprate_to_monerod() {
.unwrap() .unwrap()
.call(ConnectRequest { .call(ConnectRequest {
addr: monerod.p2p_addr(), addr: monerod.p2p_addr(),
permit, permit: None,
}) })
.await .await
.unwrap(); .unwrap();
@ -174,9 +161,6 @@ async fn fragmented_handshake_cuprate_to_monerod() {
#[tokio::test] #[tokio::test]
async fn fragmented_handshake_monerod_to_cuprate() { async fn fragmented_handshake_monerod_to_cuprate() {
let semaphore = Arc::new(Semaphore::new(10));
let permit = semaphore.acquire_owned().await.unwrap();
let our_basic_node_data = BasicNodeData { let our_basic_node_data = BasicNodeData {
my_port: 18081, my_port: 18081,
network_id: Network::Mainnet.network_id(), network_id: Network::Mainnet.network_id(),
@ -186,14 +170,7 @@ async fn fragmented_handshake_monerod_to_cuprate() {
rpc_credits_per_hash: 0, rpc_credits_per_hash: 0,
}; };
let mut handshaker = HandShaker::<FragNet, _, _, _, _, _>::new( let mut handshaker = HandshakerBuilder::<FragNet>::new(our_basic_node_data).build();
DummyAddressBook,
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data,
);
let ip = "127.0.0.1".parse().unwrap(); let ip = "127.0.0.1".parse().unwrap();
@ -215,8 +192,8 @@ async fn fragmented_handshake_monerod_to_cuprate() {
addr: InternalPeerID::KnownAddr(addr.unwrap()), // This is clear net all addresses are known. addr: InternalPeerID::KnownAddr(addr.unwrap()), // This is clear net all addresses are known.
peer_stream: stream, peer_stream: stream,
peer_sink: sink, peer_sink: sink,
direction: ConnectionDirection::InBound, direction: ConnectionDirection::Inbound,
permit, permit: None,
}) })
.await .await
.unwrap(); .unwrap();

View file

@ -6,10 +6,7 @@ use cuprate_p2p_core::handles::HandleBuilder;
#[test] #[test]
fn send_ban_signal() { fn send_ban_signal() {
let semaphore = Arc::new(Semaphore::new(5)); let (guard, mut connection_handle) = HandleBuilder::default().build();
let (guard, mut connection_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
connection_handle.ban_peer(Duration::from_secs(300)); connection_handle.ban_peer(Duration::from_secs(300));
@ -28,10 +25,7 @@ fn send_ban_signal() {
#[test] #[test]
fn multiple_ban_signals() { fn multiple_ban_signals() {
let semaphore = Arc::new(Semaphore::new(5)); let (guard, mut connection_handle) = HandleBuilder::default().build();
let (guard, mut connection_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
connection_handle.ban_peer(Duration::from_secs(300)); connection_handle.ban_peer(Duration::from_secs(300));
connection_handle.ban_peer(Duration::from_secs(301)); connection_handle.ban_peer(Duration::from_secs(301));
@ -55,7 +49,7 @@ fn multiple_ban_signals() {
fn dropped_guard_sends_disconnect_signal() { fn dropped_guard_sends_disconnect_signal() {
let semaphore = Arc::new(Semaphore::new(5)); let semaphore = Arc::new(Semaphore::new(5));
let (guard, connection_handle) = HandleBuilder::default() let (guard, connection_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap()) .with_permit(Some(semaphore.try_acquire_owned().unwrap()))
.build(); .build();
assert!(!connection_handle.is_closed()); assert!(!connection_handle.is_closed());

View file

@ -1,9 +1,8 @@
use std::{sync::Arc, time::Duration}; use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
use tokio::{ use tokio::{
io::{duplex, split}, io::{duplex, split},
sync::Semaphore,
time::timeout, time::timeout,
}; };
use tokio_util::codec::{FramedRead, FramedWrite}; use tokio_util::codec::{FramedRead, FramedWrite};
@ -13,9 +12,11 @@ use cuprate_helper::network::Network;
use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec}; use cuprate_wire::{common::PeerSupportFlags, BasicNodeData, MoneroWireCodec};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::{ConnectRequest, Connector, DoHandshakeRequest, HandShaker, InternalPeerID}, client::{
network_zones::{ClearNet, ClearNetServerCfg}, handshaker::HandshakerBuilder, ConnectRequest, Connector, DoHandshakeRequest,
ConnectionDirection, NetworkZone, InternalPeerID,
},
ClearNet, ClearNetServerCfg, ConnectionDirection, NetworkZone,
}; };
use cuprate_test_utils::{ use cuprate_test_utils::{
@ -23,18 +24,10 @@ use cuprate_test_utils::{
test_netzone::{TestNetZone, TestNetZoneAddr}, test_netzone::{TestNetZone, TestNetZoneAddr},
}; };
mod utils;
use utils::*;
#[tokio::test] #[tokio::test]
async fn handshake_cuprate_to_cuprate() { async fn handshake_cuprate_to_cuprate() {
// Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to // Tests a Cuprate <-> Cuprate handshake by making 2 handshake services and making them talk to
// each other. // each other.
let semaphore = Arc::new(Semaphore::new(10));
let permit_1 = semaphore.clone().acquire_owned().await.unwrap();
let permit_2 = semaphore.acquire_owned().await.unwrap();
let our_basic_node_data_1 = BasicNodeData { let our_basic_node_data_1 = BasicNodeData {
my_port: 0, my_port: 0,
network_id: Network::Mainnet.network_id(), network_id: Network::Mainnet.network_id(),
@ -48,23 +41,11 @@ async fn handshake_cuprate_to_cuprate() {
let mut our_basic_node_data_2 = our_basic_node_data_1.clone(); let mut our_basic_node_data_2 = our_basic_node_data_1.clone();
our_basic_node_data_2.peer_id = 2344; our_basic_node_data_2.peer_id = 2344;
let mut handshaker_1 = HandShaker::<TestNetZone<true, true, true>, _, _, _, _, _>::new( let mut handshaker_1 =
DummyAddressBook, HandshakerBuilder::<TestNetZone<true, true, true>>::new(our_basic_node_data_1).build();
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data_1,
);
let mut handshaker_2 = HandShaker::<TestNetZone<true, true, true>, _, _, _, _, _>::new( let mut handshaker_2 =
DummyAddressBook, HandshakerBuilder::<TestNetZone<true, true, true>>::new(our_basic_node_data_2).build();
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data_2,
);
let (p1, p2) = duplex(50_000); let (p1, p2) = duplex(50_000);
@ -75,16 +56,16 @@ async fn handshake_cuprate_to_cuprate() {
addr: InternalPeerID::KnownAddr(TestNetZoneAddr(888)), addr: InternalPeerID::KnownAddr(TestNetZoneAddr(888)),
peer_stream: FramedRead::new(p2_receiver, MoneroWireCodec::default()), peer_stream: FramedRead::new(p2_receiver, MoneroWireCodec::default()),
peer_sink: FramedWrite::new(p2_sender, MoneroWireCodec::default()), peer_sink: FramedWrite::new(p2_sender, MoneroWireCodec::default()),
direction: ConnectionDirection::OutBound, direction: ConnectionDirection::Outbound,
permit: permit_1, permit: None,
}; };
let p2_handshake_req = DoHandshakeRequest { let p2_handshake_req = DoHandshakeRequest {
addr: InternalPeerID::KnownAddr(TestNetZoneAddr(444)), addr: InternalPeerID::KnownAddr(TestNetZoneAddr(444)),
peer_stream: FramedRead::new(p1_receiver, MoneroWireCodec::default()), peer_stream: FramedRead::new(p1_receiver, MoneroWireCodec::default()),
peer_sink: FramedWrite::new(p1_sender, MoneroWireCodec::default()), peer_sink: FramedWrite::new(p1_sender, MoneroWireCodec::default()),
direction: ConnectionDirection::InBound, direction: ConnectionDirection::Inbound,
permit: permit_2, permit: None,
}; };
let p1 = tokio::spawn(async move { let p1 = tokio::spawn(async move {
@ -114,9 +95,6 @@ async fn handshake_cuprate_to_cuprate() {
#[tokio::test] #[tokio::test]
async fn handshake_cuprate_to_monerod() { async fn handshake_cuprate_to_monerod() {
let semaphore = Arc::new(Semaphore::new(10));
let permit = semaphore.acquire_owned().await.unwrap();
let monerod = monerod(["--fixed-difficulty=1", "--out-peers=0"]).await; let monerod = monerod(["--fixed-difficulty=1", "--out-peers=0"]).await;
let our_basic_node_data = BasicNodeData { let our_basic_node_data = BasicNodeData {
@ -128,14 +106,7 @@ async fn handshake_cuprate_to_monerod() {
rpc_credits_per_hash: 0, rpc_credits_per_hash: 0,
}; };
let handshaker = HandShaker::<ClearNet, _, _, _, _, _>::new( let handshaker = HandshakerBuilder::<ClearNet>::new(our_basic_node_data).build();
DummyAddressBook,
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data,
);
let mut connector = Connector::new(handshaker); let mut connector = Connector::new(handshaker);
@ -145,7 +116,7 @@ async fn handshake_cuprate_to_monerod() {
.unwrap() .unwrap()
.call(ConnectRequest { .call(ConnectRequest {
addr: monerod.p2p_addr(), addr: monerod.p2p_addr(),
permit, permit: None,
}) })
.await .await
.unwrap(); .unwrap();
@ -153,9 +124,6 @@ async fn handshake_cuprate_to_monerod() {
#[tokio::test] #[tokio::test]
async fn handshake_monerod_to_cuprate() { async fn handshake_monerod_to_cuprate() {
let semaphore = Arc::new(Semaphore::new(10));
let permit = semaphore.acquire_owned().await.unwrap();
let our_basic_node_data = BasicNodeData { let our_basic_node_data = BasicNodeData {
my_port: 18081, my_port: 18081,
network_id: Network::Mainnet.network_id(), network_id: Network::Mainnet.network_id(),
@ -165,14 +133,7 @@ async fn handshake_monerod_to_cuprate() {
rpc_credits_per_hash: 0, rpc_credits_per_hash: 0,
}; };
let mut handshaker = HandShaker::<ClearNet, _, _, _, _, _>::new( let mut handshaker = HandshakerBuilder::<ClearNet>::new(our_basic_node_data).build();
DummyAddressBook,
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data,
);
let ip = "127.0.0.1".parse().unwrap(); let ip = "127.0.0.1".parse().unwrap();
@ -194,8 +155,8 @@ async fn handshake_monerod_to_cuprate() {
addr: InternalPeerID::KnownAddr(addr.unwrap()), // This is clear net all addresses are known. addr: InternalPeerID::KnownAddr(addr.unwrap()), // This is clear net all addresses are known.
peer_stream: stream, peer_stream: stream,
peer_sink: sink, peer_sink: sink,
direction: ConnectionDirection::InBound, direction: ConnectionDirection::Inbound,
permit, permit: None,
}) })
.await .await
.unwrap(); .unwrap();

View file

@ -1,27 +1,18 @@
use std::sync::Arc;
use tokio::sync::Semaphore;
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use cuprate_helper::network::Network; use cuprate_helper::network::Network;
use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData}; use cuprate_wire::{common::PeerSupportFlags, protocol::GetObjectsRequest, BasicNodeData};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::{ConnectRequest, Connector, HandShaker}, client::{handshaker::HandshakerBuilder, ConnectRequest, Connector},
network_zones::ClearNet,
protocol::{PeerRequest, PeerResponse}, protocol::{PeerRequest, PeerResponse},
ClearNet, ProtocolRequest, ProtocolResponse,
}; };
use cuprate_test_utils::monerod::monerod; use cuprate_test_utils::monerod::monerod;
mod utils;
use utils::*;
#[tokio::test] #[tokio::test]
async fn get_single_block_from_monerod() { async fn get_single_block_from_monerod() {
let semaphore = Arc::new(Semaphore::new(10));
let permit = semaphore.acquire_owned().await.unwrap();
let monerod = monerod(["--out-peers=0"]).await; let monerod = monerod(["--out-peers=0"]).await;
let our_basic_node_data = BasicNodeData { let our_basic_node_data = BasicNodeData {
@ -33,14 +24,7 @@ async fn get_single_block_from_monerod() {
rpc_credits_per_hash: 0, rpc_credits_per_hash: 0,
}; };
let handshaker = HandShaker::<ClearNet, _, _, _, _, _>::new( let handshaker = HandshakerBuilder::<ClearNet>::new(our_basic_node_data).build();
DummyAddressBook,
DummyPeerSyncSvc,
DummyCoreSyncSvc,
DummyPeerRequestHandlerSvc,
|_| futures::stream::pending(),
our_basic_node_data,
);
let mut connector = Connector::new(handshaker); let mut connector = Connector::new(handshaker);
@ -50,22 +34,26 @@ async fn get_single_block_from_monerod() {
.unwrap() .unwrap()
.call(ConnectRequest { .call(ConnectRequest {
addr: monerod.p2p_addr(), addr: monerod.p2p_addr(),
permit, permit: None,
}) })
.await .await
.unwrap(); .unwrap();
let PeerResponse::GetObjects(obj) = connected_peer let PeerResponse::Protocol(ProtocolResponse::GetObjects(obj)) = connected_peer
.ready() .ready()
.await .await
.unwrap() .unwrap()
.call(PeerRequest::GetObjects(GetObjectsRequest { .call(PeerRequest::Protocol(ProtocolRequest::GetObjects(
blocks: hex::decode("418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3") GetObjectsRequest {
blocks: hex::decode(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3",
)
.unwrap() .unwrap()
.try_into() .try_into()
.unwrap(), .unwrap(),
pruned: false, pruned: false,
})) },
)))
.await .await
.unwrap() .unwrap()
else { else {

View file

@ -1,110 +0,0 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::FutureExt;
use tower::Service;
use cuprate_p2p_core::{
services::{
AddressBookRequest, AddressBookResponse, CoreSyncDataRequest, CoreSyncDataResponse,
PeerSyncRequest, PeerSyncResponse,
},
NetworkZone, PeerRequest, PeerResponse,
};
#[derive(Clone)]
pub struct DummyAddressBook;
impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for DummyAddressBook {
type Response = AddressBookResponse<Z>;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: AddressBookRequest<Z>) -> Self::Future {
async move {
Ok(match req {
AddressBookRequest::GetWhitePeers(_) => AddressBookResponse::Peers(vec![]),
_ => AddressBookResponse::Ok,
})
}
.boxed()
}
}
#[derive(Clone)]
pub struct DummyCoreSyncSvc;
impl Service<CoreSyncDataRequest> for DummyCoreSyncSvc {
type Response = CoreSyncDataResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future {
async move {
Ok(CoreSyncDataResponse(cuprate_wire::CoreSyncData {
cumulative_difficulty: 1,
cumulative_difficulty_top64: 0,
current_height: 1,
pruning_seed: 0,
top_id: hex::decode(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3",
)
.unwrap()
.try_into()
.unwrap(),
top_version: 1,
}))
}
.boxed()
}
}
#[derive(Clone)]
pub struct DummyPeerSyncSvc;
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for DummyPeerSyncSvc {
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
type Response = PeerSyncResponse<N>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerSyncRequest<N>) -> Self::Future {
async { Ok(PeerSyncResponse::Ok) }.boxed()
}
}
#[derive(Clone)]
pub struct DummyPeerRequestHandlerSvc;
impl Service<PeerRequest> for DummyPeerRequestHandlerSvc {
type Response = PeerResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerRequest) -> Self::Future {
async move { Ok(PeerResponse::NA) }.boxed()
}
}

View file

@ -31,6 +31,7 @@ rand = { workspace = true, features = ["std", "std_rng"] }
rand_distr = { workspace = true, features = ["std"] } rand_distr = { workspace = true, features = ["std"] }
hex = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] } tracing = { workspace = true, features = ["std", "attributes"] }
borsh = { workspace = true, features = ["derive", "std"] }
[dev-dependencies] [dev-dependencies]
cuprate-test-utils = { path = "../../test-utils" } cuprate-test-utils = { path = "../../test-utils" }

View file

@ -0,0 +1,170 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use cuprate_async_buffer::BufferAppender;
use super::{BlockBatch, BlockDownloadError};
/// A batch of blocks in the ready queue, waiting for previous blocks to come in, so they can
/// be passed into the buffer.
///
/// The [`Eq`] and [`Ord`] impl on this type will only take into account the `start_height`, this
/// is because the block downloader will only download one chain at once so no 2 batches can have
/// the same `start_height`.
///
/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
#[derive(Debug, Clone)]
pub struct ReadyQueueBatch {
/// The start height of the batch.
pub start_height: u64,
/// The batch of blocks.
pub block_batch: BlockBatch,
}
impl Eq for ReadyQueueBatch {}
impl PartialEq<Self> for ReadyQueueBatch {
fn eq(&self, other: &Self) -> bool {
self.start_height.eq(&other.start_height)
}
}
impl PartialOrd<Self> for ReadyQueueBatch {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ReadyQueueBatch {
fn cmp(&self, other: &Self) -> Ordering {
// reverse the ordering so older blocks (lower height) come first in a [`BinaryHeap`]
self.start_height.cmp(&other.start_height).reverse()
}
}
/// The block queue that holds downloaded block batches, adding them to the [`async_buffer`] when the
/// oldest batch has been downloaded.
pub struct BlockQueue {
/// A queue of ready batches.
ready_batches: BinaryHeap<ReadyQueueBatch>,
/// The size, in bytes, of all the batches in [`Self::ready_batches`].
ready_batches_size: usize,
/// The [`BufferAppender`] that gives blocks to Cuprate.
buffer_appender: BufferAppender<BlockBatch>,
}
impl BlockQueue {
/// Creates a new [`BlockQueue`].
pub fn new(buffer_appender: BufferAppender<BlockBatch>) -> BlockQueue {
BlockQueue {
ready_batches: BinaryHeap::new(),
ready_batches_size: 0,
buffer_appender,
}
}
/// Returns the oldest batch that has not been put in the [`async_buffer`] yet.
pub fn oldest_ready_batch(&self) -> Option<u64> {
self.ready_batches.peek().map(|batch| batch.start_height)
}
/// Returns the size of all the batches that have not been put into the [`async_buffer`] yet.
pub fn size(&self) -> usize {
self.ready_batches_size
}
/// Adds an incoming batch to the queue and checks if we can push any batches into the [`async_buffer`].
///
/// `oldest_in_flight_start_height` should be the start height of the oldest batch that is still inflight, if
/// there are no batches inflight then this should be [`None`].
pub async fn add_incoming_batch(
&mut self,
new_batch: ReadyQueueBatch,
oldest_in_flight_start_height: Option<u64>,
) -> Result<(), BlockDownloadError> {
self.ready_batches_size += new_batch.block_batch.size;
self.ready_batches.push(new_batch);
// The height to stop pushing batches into the buffer.
let height_to_stop_at = oldest_in_flight_start_height.unwrap_or(u64::MAX);
while self
.ready_batches
.peek()
.is_some_and(|batch| batch.start_height <= height_to_stop_at)
{
let batch = self
.ready_batches
.pop()
.expect("We just checked we have a batch in the buffer");
let batch_size = batch.block_batch.size;
self.ready_batches_size -= batch_size;
self.buffer_appender
.send(batch.block_batch, batch_size)
.await
.map_err(|_| BlockDownloadError::BufferWasClosed)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use futures::StreamExt;
use proptest::{collection::vec, prelude::*};
use tokio_test::block_on;
use cuprate_p2p_core::handles::HandleBuilder;
use super::*;
prop_compose! {
fn ready_batch_strategy()(start_height in 0_u64..500_000_000) -> ReadyQueueBatch {
let (_, peer_handle) = HandleBuilder::new().build();
ReadyQueueBatch {
start_height,
block_batch: BlockBatch {
blocks: vec![],
size: start_height as usize,
peer_handle,
},
}
}
}
proptest! {
#[test]
fn block_queue_returns_items_in_order(batches in vec(ready_batch_strategy(), 0..10_000)) {
block_on(async move {
let (buffer_tx, mut buffer_rx) = cuprate_async_buffer::new_buffer(usize::MAX);
let mut queue = BlockQueue::new(buffer_tx);
let mut sorted_batches = BTreeSet::from_iter(batches.clone());
let mut soreted_batch_2 = sorted_batches.clone();
for batch in batches {
if sorted_batches.remove(&batch) {
queue.add_incoming_batch(batch, sorted_batches.last().map(|batch| batch.start_height)).await.unwrap();
}
}
assert_eq!(queue.size(), 0);
assert!(queue.oldest_ready_batch().is_none());
drop(queue);
while let Some(batch) = buffer_rx.next().await {
let last_batch = soreted_batch_2.pop_last().unwrap();
assert_eq!(batch.size, last_batch.block_batch.size);
}
});
}
}
}

View file

@ -0,0 +1,201 @@
use std::collections::HashSet;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tokio::time::timeout;
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_p2p_core::{
handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest,
ProtocolResponse,
};
use cuprate_wire::protocol::{GetObjectsRequest, GetObjectsResponse};
use crate::{
block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse},
client_pool::ClientPoolDropGuard,
constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN},
};
/// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`].
#[instrument(
level = "debug",
name = "download_batch",
skip_all,
fields(
start_height = expected_start_height,
attempt = _attempt
)
)]
pub async fn download_batch_task<N: NetworkZone>(
client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>,
previous_id: [u8; 32],
expected_start_height: u64,
_attempt: usize,
) -> BlockDownloadTaskResponse<N> {
BlockDownloadTaskResponse {
start_height: expected_start_height,
result: request_batch_from_peer(client, ids, previous_id, expected_start_height).await,
}
}
/// Requests a sequential batch of blocks from a peer.
///
/// This function will validate the blocks that were downloaded were the ones asked for and that they match
/// the expected height.
async fn request_batch_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>,
previous_id: [u8; 32],
expected_start_height: u64,
) -> Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError> {
let request = PeerRequest::Protocol(ProtocolRequest::GetObjects(GetObjectsRequest {
blocks: ids.clone(),
pruned: false,
}));
// Request the blocks and add a timeout to the request
let blocks_response = timeout(BLOCK_DOWNLOADER_REQUEST_TIMEOUT, async {
let PeerResponse::Protocol(ProtocolResponse::GetObjects(blocks_response)) =
client.ready().await?.call(request).await?
else {
panic!("Connection task returned wrong response.");
};
Ok::<_, BlockDownloadError>(blocks_response)
})
.await
.map_err(|_| BlockDownloadError::TimedOut)??;
// Initial sanity checks
if blocks_response.blocks.len() > ids.len() {
client.info.handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
if blocks_response.blocks.len() != ids.len() {
return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
}
let peer_handle = client.info.handle.clone();
let blocks = rayon_spawn_async(move || {
deserialize_batch(
blocks_response,
expected_start_height,
ids,
previous_id,
peer_handle,
)
})
.await;
let batch = blocks.inspect_err(|e| {
// If the peers response was invalid, ban it.
if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
client.info.handle.ban_peer(MEDIUM_BAN);
}
})?;
Ok((client, batch))
}
fn deserialize_batch(
blocks_response: GetObjectsResponse,
expected_start_height: u64,
requested_ids: ByteArrayVec<32>,
previous_id: [u8; 32],
peer_handle: ConnectionHandle,
) -> Result<BlockBatch, BlockDownloadError> {
let blocks = blocks_response
.blocks
.into_par_iter()
.enumerate()
.map(|(i, block_entry)| {
let expected_height = u64::try_from(i).unwrap() + expected_start_height;
let mut size = block_entry.block.len();
let block = Block::read(&mut block_entry.block.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
let block_hash = block.hash();
// Check the block matches the one requested and the peer sent enough transactions.
if requested_ids[i] != block_hash || block.txs.len() != block_entry.txs.len() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
// Check that the previous ID is correct for the first block.
// This is to protect use against banning the wrong peer.
// This must happen after the hash check.
if i == 0 && block.header.previous != previous_id {
tracing::warn!(
"Invalid chain, peer told us a block follows the chain when it doesn't."
);
// This peer probably did nothing wrong, it was the peer who told us this blockID which
// is misbehaving.
return Err(BlockDownloadError::ChainInvalid);
}
// Check the height lines up as expected.
// This must happen after the hash check.
if !block
.number()
.is_some_and(|height| height == expected_height)
{
tracing::warn!(
"Invalid chain, expected height: {expected_height}, got height: {:?}",
block.number()
);
// This peer probably did nothing wrong, it was the peer who told us this blockID which
// is misbehaving.
return Err(BlockDownloadError::ChainInvalid);
}
// Deserialize the transactions.
let txs = block_entry
.txs
.take_normal()
.ok_or(BlockDownloadError::PeersResponseWasInvalid)?
.into_iter()
.map(|tx_blob| {
size += tx_blob.len();
if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
Transaction::read(&mut tx_blob.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)
})
.collect::<Result<Vec<_>, _>>()?;
// Make sure the transactions in the block were the ones the peer sent.
let mut expected_txs = block.txs.iter().collect::<HashSet<_>>();
for tx in &txs {
if !expected_txs.remove(&tx.hash()) {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
}
if !expected_txs.is_empty() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
Ok(((block, txs), size))
})
.collect::<Result<(Vec<_>, Vec<_>), _>>()?;
Ok(BlockBatch {
blocks: blocks.0,
size: blocks.1.into_iter().sum(),
peer_handle,
})
}

View file

@ -10,7 +10,7 @@ use cuprate_p2p_core::{
client::InternalPeerID, client::InternalPeerID,
handles::ConnectionHandle, handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse}, services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc, ProtocolRequest, ProtocolResponse,
}; };
use cuprate_wire::protocol::{ChainRequest, ChainResponse}; use cuprate_wire::protocol::{ChainRequest, ChainResponse};
@ -34,13 +34,15 @@ pub async fn request_chain_entry_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientPoolDropGuard<N>,
short_history: [[u8; 32]; 2], short_history: [[u8; 32]; 2],
) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> { ) -> Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError> {
let PeerResponse::GetChain(chain_res) = client let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) = client
.ready() .ready()
.await? .await?
.call(PeerRequest::GetChain(ChainRequest { .call(PeerRequest::Protocol(ProtocolRequest::GetChain(
block_ids: short_history.into(), ChainRequest {
prune: true, block_ids: short_history.into(),
})) prune: true,
},
)))
.await? .await?
else { else {
panic!("Connection task returned wrong response!"); panic!("Connection task returned wrong response!");
@ -132,10 +134,10 @@ where
let mut futs = JoinSet::new(); let mut futs = JoinSet::new();
let req = PeerRequest::GetChain(ChainRequest { let req = PeerRequest::Protocol(ProtocolRequest::GetChain(ChainRequest {
block_ids: block_ids.into(), block_ids: block_ids.into(),
prune: false, prune: false,
}); }));
tracing::debug!("Sending requests for chain entries."); tracing::debug!("Sending requests for chain entries.");
@ -149,7 +151,7 @@ where
futs.spawn(timeout( futs.spawn(timeout(
BLOCK_DOWNLOADER_REQUEST_TIMEOUT, BLOCK_DOWNLOADER_REQUEST_TIMEOUT,
async move { async move {
let PeerResponse::GetChain(chain_res) = let PeerResponse::Protocol(ProtocolResponse::GetChain(chain_res)) =
next_peer.ready().await?.call(cloned_req).await? next_peer.ready().await?.call(cloned_req).await?
else { else {
panic!("connection task returned wrong response!"); panic!("connection task returned wrong response!");

View file

@ -15,15 +15,15 @@ use monero_serai::{
transaction::{Input, Timelock, Transaction, TransactionPrefix}, transaction::{Input, Timelock, Transaction, TransactionPrefix},
}; };
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use tokio::{sync::Semaphore, time::timeout}; use tokio::time::timeout;
use tower::{service_fn, Service}; use tower::{service_fn, Service};
use cuprate_fixed_bytes::ByteArrayVec; use cuprate_fixed_bytes::ByteArrayVec;
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::{mock_client, Client, InternalPeerID, PeerInformation}, client::{mock_client, Client, InternalPeerID, PeerInformation},
network_zones::ClearNet,
services::{PeerSyncRequest, PeerSyncResponse}, services::{PeerSyncRequest, PeerSyncResponse},
ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, ClearNet, ConnectionDirection, NetworkZone, PeerRequest, PeerResponse, ProtocolRequest,
ProtocolResponse,
}; };
use cuprate_pruning::PruningSeed; use cuprate_pruning::PruningSeed;
use cuprate_wire::{ use cuprate_wire::{
@ -182,18 +182,15 @@ prop_compose! {
} }
fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<ClearNet> { fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<ClearNet> {
let semaphore = Arc::new(Semaphore::new(1)); let (connection_guard, connection_handle) =
cuprate_p2p_core::handles::HandleBuilder::new().build();
let (connection_guard, connection_handle) = cuprate_p2p_core::handles::HandleBuilder::new()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
let request_handler = service_fn(move |req: PeerRequest| { let request_handler = service_fn(move |req: PeerRequest| {
let bc = blockchain.clone(); let bc = blockchain.clone();
async move { async move {
match req { match req {
PeerRequest::GetChain(chain_req) => { PeerRequest::Protocol(ProtocolRequest::GetChain(chain_req)) => {
let mut i = 0; let mut i = 0;
while !bc.blocks.contains_key(&chain_req.block_ids[i]) { while !bc.blocks.contains_key(&chain_req.block_ids[i]) {
i += 1; i += 1;
@ -215,18 +212,20 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
.take(200) .take(200)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(PeerResponse::GetChain(ChainResponse { Ok(PeerResponse::Protocol(ProtocolResponse::GetChain(
start_height: 0, ChainResponse {
total_height: 0, start_height: 0,
cumulative_difficulty_low64: 1, total_height: 0,
cumulative_difficulty_top64: 0, cumulative_difficulty_low64: 1,
m_block_ids: block_ids.into(), cumulative_difficulty_top64: 0,
m_block_weights: vec![], m_block_ids: block_ids.into(),
first_block: Default::default(), m_block_weights: vec![],
})) first_block: Default::default(),
},
)))
} }
PeerRequest::GetObjects(obj) => { PeerRequest::Protocol(ProtocolRequest::GetObjects(obj)) => {
let mut res = Vec::with_capacity(obj.blocks.len()); let mut res = Vec::with_capacity(obj.blocks.len());
for i in 0..obj.blocks.len() { for i in 0..obj.blocks.len() {
@ -249,11 +248,13 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
res.push(block_entry); res.push(block_entry);
} }
Ok(PeerResponse::GetObjects(GetObjectsResponse { Ok(PeerResponse::Protocol(ProtocolResponse::GetObjects(
blocks: res, GetObjectsResponse {
missed_ids: ByteArrayVec::from([]), blocks: res,
current_blockchain_height: 0, missed_ids: ByteArrayVec::from([]),
})) current_blockchain_height: 0,
},
)))
} }
_ => panic!(), _ => panic!(),
} }
@ -264,7 +265,7 @@ fn mock_block_downloader_client(blockchain: Arc<MockBlockchain>) -> Client<Clear
let info = PeerInformation { let info = PeerInformation {
id: InternalPeerID::Unknown(rand::random()), id: InternalPeerID::Unknown(rand::random()),
handle: connection_handle, handle: connection_handle,
direction: ConnectionDirection::InBound, direction: ConnectionDirection::Inbound,
pruning_seed: PruningSeed::NotPruned, pruning_seed: PruningSeed::NotPruned,
}; };

View file

@ -196,10 +196,10 @@ impl<N: NetworkZone> Service<BroadcastRequest<N>> for BroadcastSvc<N> {
// An error here means _all_ receivers were dropped which we assume will never happen. // An error here means _all_ receivers were dropped which we assume will never happen.
let _ = match direction { let _ = match direction {
Some(ConnectionDirection::InBound) => { Some(ConnectionDirection::Inbound) => {
self.tx_broadcast_channel_inbound.send(nex_tx_info) self.tx_broadcast_channel_inbound.send(nex_tx_info)
} }
Some(ConnectionDirection::OutBound) => { Some(ConnectionDirection::Outbound) => {
self.tx_broadcast_channel_outbound.send(nex_tx_info) self.tx_broadcast_channel_outbound.send(nex_tx_info)
} }
None => { None => {
@ -428,7 +428,7 @@ mod tests {
.unwrap() .unwrap()
.call(BroadcastRequest::Transaction { .call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[1]), tx_bytes: Bytes::from_static(&[1]),
direction: Some(ConnectionDirection::OutBound), direction: Some(ConnectionDirection::Outbound),
received_from: None, received_from: None,
}) })
.await .await
@ -440,7 +440,7 @@ mod tests {
.unwrap() .unwrap()
.call(BroadcastRequest::Transaction { .call(BroadcastRequest::Transaction {
tx_bytes: Bytes::from_static(&[2]), tx_bytes: Bytes::from_static(&[2]),
direction: Some(ConnectionDirection::InBound), direction: Some(ConnectionDirection::Inbound),
received_from: None, received_from: None,
}) })
.await .await

View file

@ -9,7 +9,6 @@
//! //!
//! Internally the pool is a [`DashMap`] which means care should be taken in `async` code //! Internally the pool is a [`DashMap`] which means care should be taken in `async` code
//! as internally this uses blocking RwLocks. //! as internally this uses blocking RwLocks.
//!
use std::sync::Arc; use std::sync::Arc;
use dashmap::DashMap; use dashmap::DashMap;

View file

@ -106,10 +106,6 @@ where
panic!("No seed nodes available to get peers from"); panic!("No seed nodes available to get peers from");
} }
// This isn't really needed here to limit connections as the seed nodes will be dropped when we have got
// peers from them.
let semaphore = Arc::new(Semaphore::new(seeds.len()));
let mut allowed_errors = seeds.len(); let mut allowed_errors = seeds.len();
let mut handshake_futs = JoinSet::new(); let mut handshake_futs = JoinSet::new();
@ -125,10 +121,7 @@ where
.expect("Connector had an error in `poll_ready`") .expect("Connector had an error in `poll_ready`")
.call(ConnectRequest { .call(ConnectRequest {
addr: *seed, addr: *seed,
permit: semaphore permit: None,
.clone()
.try_acquire_owned()
.expect("This must have enough permits as we just set the amount."),
}), }),
); );
// Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer. // Spawn the handshake on a separate task with a timeout, so we don't get stuck connecting to a peer.
@ -157,7 +150,10 @@ where
.ready() .ready()
.await .await
.expect("Connector had an error in `poll_ready`") .expect("Connector had an error in `poll_ready`")
.call(ConnectRequest { addr, permit }); .call(ConnectRequest {
addr,
permit: Some(permit),
});
tokio::spawn( tokio::spawn(
async move { async move {

View file

@ -87,8 +87,8 @@ where
addr, addr,
peer_stream, peer_stream,
peer_sink, peer_sink,
direction: ConnectionDirection::InBound, direction: ConnectionDirection::Inbound,
permit, permit: Some(permit),
}); });
let cloned_pool = client_pool.clone(); let cloned_pool = client_pool.clone();

View file

@ -4,7 +4,6 @@
//! a certain [`NetworkZone`] //! a certain [`NetworkZone`]
use std::sync::Arc; use std::sync::Arc;
use cuprate_async_buffer::BufferStream;
use futures::FutureExt; use futures::FutureExt;
use tokio::{ use tokio::{
sync::{mpsc, watch}, sync::{mpsc, watch},
@ -14,11 +13,12 @@ use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt}; use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span}; use tracing::{instrument, Instrument, Span};
use cuprate_async_buffer::BufferStream;
use cuprate_p2p_core::{ use cuprate_p2p_core::{
client::Connector, client::Connector,
client::InternalPeerID, client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest}, services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
CoreSyncSvc, NetworkZone, PeerRequestHandler, CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
}; };
mod block_downloader; mod block_downloader;
@ -42,17 +42,18 @@ use connection_maintainer::MakeConnectionRequest;
/// ///
/// # Usage /// # Usage
/// You must provide: /// You must provide:
/// - A peer request handler, which is given to each connection /// - A protocol request handler, which is given to each connection
/// - A core sync service, which keeps track of the sync state of our node /// - A core sync service, which keeps track of the sync state of our node
#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] #[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))]
pub async fn initialize_network<N, R, CS>( pub async fn initialize_network<N, PR, CS>(
peer_req_handler: R, protocol_request_handler: PR,
core_sync_svc: CS, core_sync_svc: CS,
config: P2PConfig<N>, config: P2PConfig<N>,
) -> Result<NetworkInterface<N>, tower::BoxError> ) -> Result<NetworkInterface<N>, tower::BoxError>
where where
N: NetworkZone, N: NetworkZone,
R: PeerRequestHandler + Clone, N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize,
PR: ProtocolRequestHandler + Clone,
CS: CoreSyncSvc + Clone, CS: CoreSyncSvc + Clone,
{ {
let address_book = let address_book =
@ -79,23 +80,21 @@ where
basic_node_data.peer_id = 1; basic_node_data.peer_id = 1;
} }
let outbound_handshaker = cuprate_p2p_core::client::HandShaker::new( let outbound_handshaker_builder =
address_book.clone(), cuprate_p2p_core::client::HandshakerBuilder::new(basic_node_data)
sync_states_svc.clone(), .with_address_book(address_book.clone())
core_sync_svc.clone(), .with_peer_sync_svc(sync_states_svc.clone())
peer_req_handler.clone(), .with_core_sync_svc(core_sync_svc)
outbound_mkr, .with_protocol_request_handler(protocol_request_handler)
basic_node_data.clone(), .with_broadcast_stream_maker(outbound_mkr)
); .with_connection_parent_span(Span::current());
let inbound_handshaker = cuprate_p2p_core::client::HandShaker::new( let inbound_handshaker = outbound_handshaker_builder
address_book.clone(), .clone()
sync_states_svc.clone(), .with_broadcast_stream_maker(inbound_mkr)
core_sync_svc.clone(), .build();
peer_req_handler,
inbound_mkr, let outbound_handshaker = outbound_handshaker_builder.build();
basic_node_data,
);
let client_pool = client_pool::ClientPool::new(); let client_pool = client_pool::ClientPool::new();

View file

@ -238,9 +238,6 @@ impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use tokio::sync::Semaphore;
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use cuprate_p2p_core::{ use cuprate_p2p_core::{
@ -255,11 +252,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn top_sync_channel_updates() { async fn top_sync_channel_updates() {
let semaphore = Arc::new(Semaphore::new(1)); let (_g, handle) = HandleBuilder::new().build();
let (_g, handle) = HandleBuilder::new()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
let (mut svc, mut watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new(); let (mut svc, mut watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();
@ -336,11 +329,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn peer_sync_info_updates() { async fn peer_sync_info_updates() {
let semaphore = Arc::new(Semaphore::new(1)); let (_g, handle) = HandleBuilder::new().build();
let (_g, handle) = HandleBuilder::new()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
let (mut svc, _watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new(); let (mut svc, _watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();

View file

@ -31,6 +31,7 @@ curve25519-dalek = { workspace = true }
cuprate-pruning = { path = "../../pruning" } cuprate-pruning = { path = "../../pruning" }
monero-serai = { workspace = true, features = ["std"] } monero-serai = { workspace = true, features = ["std"] }
paste = { workspace = true } paste = { workspace = true }
serde = { workspace = true, optional = true }
# `service` feature. # `service` feature.
crossbeam = { workspace = true, features = ["std"], optional = true } crossbeam = { workspace = true, features = ["std"], optional = true }

View file

@ -9,11 +9,6 @@ use std::{
ops::Range, ops::Range,
}; };
#[cfg(feature = "borsh")]
use borsh::{BorshDeserialize, BorshSerialize};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use crate::types::{ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}; use crate::types::{ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation};
//---------------------------------------------------------------------------------------------------- ReadRequest //---------------------------------------------------------------------------------------------------- ReadRequest