P2P Address book & Handshake changes (#89)

* use tokio's delay queue for bans

* document handles

* remove peers from address book when retrieving

* ping inbound peers during handshakes

* support receiving pings during handshakes

* add peer to anchor before reducing whit list

* clippy

* comment handshakes

* typos

* sort `use`

* use `rand::prelude::*`

* review comments

* update macro
This commit is contained in:
Boog900 2024-03-20 20:58:12 +00:00 committed by GitHub
parent de931f8630
commit 93372fa4b5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 642 additions and 535 deletions

13
Cargo.lock generated
View file

@ -1051,7 +1051,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.11",
"indexmap 2.2.4",
"indexmap 2.2.5",
"slab",
"tokio",
"tokio-util",
@ -1362,9 +1362,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.2.4"
version = "2.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "967d6dd42f16dbf0eb8040cb9e477933562684d3918f7d253f2ff9087fb3e7a3"
checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4"
dependencies = [
"equivalent",
"hashbrown 0.14.3",
@ -1545,17 +1545,17 @@ dependencies = [
name = "monero-address-book"
version = "0.1.0"
dependencies = [
"async-trait",
"borsh",
"cuprate-test-utils",
"futures",
"indexmap 2.2.5",
"monero-p2p",
"monero-pruning",
"monero-wire",
"pin-project",
"rand",
"thiserror",
"tokio",
"tokio-util",
"tower",
"tracing",
]
@ -2864,6 +2864,7 @@ dependencies = [
"futures-core",
"futures-sink",
"pin-project-lite",
"slab",
"tokio",
"tracing",
]
@ -2880,7 +2881,7 @@ version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [
"indexmap 2.2.4",
"indexmap 2.2.5",
"toml_datetime",
"winnow",
]

View file

@ -51,6 +51,7 @@ dirs = { version = "5.0.1", default-features = false }
futures = { version = "0.3.29", default-features = false }
hex = { version = "0.4.3", default-features = false }
hex-literal = { version = "0.4", default-features = false }
indexmap = { version = "2.2.5", default-features = false }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false }
paste = { version = "1.0.14", default-features = false }

View file

@ -1,202 +1,6 @@
# Epee Encoding
- [What](#what)
- [Features](#features)
- [Usage](#usage)
- [Derive Attributes](#derive-attributes)
- [No std](#no-std)
- [Options](#options)
## What
This crate implements the epee binary format found in Monero; unlike other crates,
this one does not use serde, this is not because serde is bad but its to reduce the
load on maintainers as all the traits in this lib are specific to epee instead of
general purpose.
## Features
### Default
The default feature enables the [derive](#derive) feature.
### Derive
This feature enables the derive macro for creating epee objects for example:
```rust
use epee_encoding::EpeeObject;
#[derive(EpeeObject)]
struct Test {
val: u8
}
```
## Usage
### example without derive:
```rust
use epee_encoding::{EpeeObject, EpeeObjectBuilder, read_epee_value, write_field, to_bytes, from_bytes};
use epee_encoding::io::{Read, Write};
pub struct Test {
val: u64
}
#[derive(Default)]
pub struct __TestEpeeBuilder {
val: Option<u64>,
}
impl EpeeObjectBuilder<Test> for __TestEpeeBuilder {
fn add_field<R: Read>(&mut self, name: &str, r: &mut R) -> epee_encoding::error::Result<bool> {
match name {
"val" => {self.val = Some(read_epee_value(r)?);}
_ => return Ok(false),
}
Ok(true)
}
fn finish(self) -> epee_encoding::error::Result<Test> {
Ok(
Test {
val: self.val.ok_or_else(|| epee_encoding::error::Error::Format("Required field was not found!"))?
}
)
}
}
impl EpeeObject for Test {
type Builder = __TestEpeeBuilder;
fn number_of_fields(&self) -> u64 {
1
}
fn write_fields<W: Write>(&self, w: &mut W) -> epee_encoding::error::Result<()> {
// write the fields
write_field(&self.val, "val", w)
}
}
let data = [1, 17, 1, 1, 1, 1, 2, 1, 1, 4, 3, 118, 97, 108, 5, 4, 0, 0, 0, 0, 0, 0, 0]; // the data to decode;
let val: Test = from_bytes(&data).unwrap();
let data = to_bytes(&val).unwrap();
```
### example with derive:
```rust
use epee_encoding::{EpeeObject, from_bytes, to_bytes};
#[derive(EpeeObject)]
struct Test {
val: u64
}
let data = [1, 17, 1, 1, 1, 1, 2, 1, 1, 4, 3, 118, 97, 108, 5, 4, 0, 0, 0, 0, 0, 0, 0]; // the data to decode;
let val: Test = from_bytes(&data).unwrap();
let data = to_bytes(&val).unwrap();
```
## Derive Attributes
The `EpeeObject` derive macro has a few attributes which correspond to specific C/C++ macro fields.
- [epee_flatten](#epeeflatten)
- [epee_alt_name](#epeealtname)
- [epee_default](#epeedefault)
### epee_flatten
This is equivalent to `KV_SERIALIZE_PARENT`, it flattens all the fields in the object into the parent object.
so this in C/C++:
```cpp
struct request_t: public rpc_request_base
{
uint8_t major_version;
BEGIN_KV_SERIALIZE_MAP()
KV_SERIALIZE_PARENT(rpc_request_base)
KV_SERIALIZE(major_version)
END_KV_SERIALIZE_MAP()
};
```
Would look like this in Rust:
```rust
#[derive(EpeeObject)]
struct RequestT {
#[epee_flatten]
rpc_request_base: RequestBase,
major_version: u8,
}
```
### epee_alt_name
This allows you to re-name a field for when its encoded, although this isn't related to a specific macro in
C/C++ this was included because Monero has [some odd names](https://github.com/monero-project/monero/blob/0a1eaf26f9dd6b762c2582ee12603b2a4671c735/src/cryptonote_protocol/cryptonote_protocol_defs.h#L199).
example:
```rust
#[derive(EpeeObject)]
pub struct HandshakeR {
#[epee_alt_name("node_data")]
pub node_data: BasicNodeData,
}
```
### epee_default
This is equivalent to `KV_SERIALIZE_OPT` and allows you to specify a default value for a field, when a default value
is specified the value will be used if it is not contained in the data and the field will not be encoded if the value is
the default value.
so this in C/C++:
```cpp
struct request_t
{
std::vector<blobdata> txs;
std::string _; // padding
bool dandelionpp_fluff; //zero initialization defaults to stem mode
BEGIN_KV_SERIALIZE_MAP()
KV_SERIALIZE(txs)
KV_SERIALIZE(_)
KV_SERIALIZE_OPT(dandelionpp_fluff, true) // backwards compatible mode is fluff
END_KV_SERIALIZE_MAP()
};
```
would look like this in Rust:
```rust
#[derive(EpeeObject)]
struct RequestT {
txs: Vec<Vec<u8>>,
#[epee_alt_name("_")]
padding: Vec<u8>,
#[epee_default(true)]
dandelionpp_fluff: bool,
}
```
## No std
This crate is no-std.
## Options
To have an optional field, you should wrap the type in `Option` and use the `epee_default` attribute.
So it would look like this:
```rust
#[derive(EpeeObject)]
struct T {
#[epee_default(None)]
val: Option<u8>,
}
```

View file

@ -4,6 +4,8 @@
//! This library contains the Epee binary format found in Monero, unlike other
//! crates this crate does not use serde.
//!
//! See [`epee_object`] for how to easily implement [`EpeeObject`] for your types.
//!
//! example without macro:
//! ```rust
//! # use epee_encoding::{EpeeObject, EpeeObjectBuilder, read_epee_value, write_field, to_bytes, from_bytes};
@ -56,33 +58,6 @@
//!
//!
//! ```
//!
//! example with macro:
//! ```rust
//! use epee_encoding::{from_bytes, to_bytes};
//!
//! // TODO: open an issue documenting why you need to do this here
//! // like this: https://github.com/Boog900/epee-encoding/issues/1
//! mod i_64079 {
//! use epee_encoding::epee_object;
//!
//! pub struct Test2 {
//! val: u64
//! }
//!
//! epee_object!(
//! Test2,
//! val: u64,
//! );
//! }
//! use i_64079::*;
//!
//!
//! let data = [1, 17, 1, 1, 1, 1, 2, 1, 1, 4, 3, 118, 97, 108, 5, 4, 0, 0, 0, 0, 0, 0, 0]; // the data to decode;
//! let val: Test2 = from_bytes(&mut data.as_slice()).unwrap();
//! let data = to_bytes(val).unwrap();
//!
//! ```
extern crate alloc;

View file

@ -1,42 +1,129 @@
pub use bytes;
pub use paste::paste;
#[macro_export]
macro_rules! field_name {
($field: tt, $alt_name: tt) => {
$alt_name
};
($field: ident,) => {
stringify!($field)
};
}
#[macro_export]
macro_rules! field_ty {
($ty:ty, $ty_as:ty) => {
$ty_as
};
($ty:ty,) => {
$ty
};
}
#[macro_export]
macro_rules! try_right_then_left {
($a:expr, $b:expr) => {
$b
};
($a:expr,) => {
$a
};
}
/// Macro to derive [`EpeeObject`](crate::EpeeObject) for structs.
///
/// ### Basic Usage:
///
/// ```rust
/// // mod visibility is here because of Rust visibility weirdness, you shouldn't need this unless defined in a function.
/// // see: <https://github.com/rust-lang/rust/issues/64079>
/// mod visibility {
///
/// use epee_encoding::epee_object;
///
/// struct Example {
/// a: u8
/// }
///
/// epee_object!(
/// Example,
/// a: u8,
/// );
/// }
/// ```
///
/// ### Advanced Usage:
///
/// ```rust
/// // mod visibility is here because of Rust visibility weirdness, you shouldn't need this unless defined in a function.
/// // see: <https://github.com/rust-lang/rust/issues/64079>
/// mod visibility {
///
/// use epee_encoding::epee_object;
///
/// struct Example {
/// a: u8,
/// b: u8,
/// c: u8,
/// d: u8,
/// e_f: Example2
/// }
///
/// struct Example2 {
/// e: u8
/// }
///
/// epee_object!(
/// Example2,
/// e: u8,
/// );
///
/// epee_object!(
/// Example,
/// // `("ALT-NAME")` changes the name of the field in the encoded data.
/// a("A"): u8,
/// // `= VALUE` sets a default value that this field will be set to if not in the data
/// // when encoding this field will be skipped if equal to the default.
/// b: u8 = 0,
/// // `as ALT-TYPE` encodes the data using the alt type, the alt type must impl Into<Type> and From<&Type>
/// c: u8 as u8,
/// // `=> read_fn, write_fn, should_write_fn,` allows you to specify alt field encoding functions.
/// // for the required args see the default functions, which are used here:
/// d: u8 => epee_encoding::read_epee_value, epee_encoding::write_field, <u8 as epee_encoding::EpeeValue>::should_write,
/// // `!flatten` can be used on fields which are epee objects, and it flattens the fields of that object into this object.
/// // So for this example `e_f` will not appear in the data but e will.
/// // You can't use the other options with this.
/// !flatten: e_f: Example2,
/// );
/// }
/// ```
///
///
#[macro_export]
macro_rules! epee_object {
// ------------------------------------------------------------------------ internal_try_right_then_left
// All this does is return the second (right) arg if present otherwise the left is returned.
(
@internal_try_right_then_left
$a:expr, $b:expr
) => {
$b
};
(
@internal_try_right_then_left
$a:expr,
) => {
$a
};
// ------------------------------------------------------------------------ internal_field_name
// Returns the alt_name if present otherwise stringifies the field ident.
(
@internal_field_name
$field: tt, $alt_name: tt
) => {
$alt_name
};
(
@internal_field_name
$field: ident,
) => {
stringify!($field)
};
// ------------------------------------------------------------------------ internal_field_type
// All this does is return the second (right) arg if present otherwise the left is returned.
(
@internal_field_type
$ty:ty, $ty_as:ty
) => {
$ty_as
};
(
@internal_field_type
$ty:ty,
) => {
$ty
};
// ------------------------------------------------------------------------ Entry Point
(
$obj:ident,
$($field: ident $(($alt_name: literal))?: $ty:ty $(as $ty_as:ty )? $(= $default:expr)? $(=> $read_fn:expr, $write_fn:expr, $should_write_fn:expr)?, )+
$(!flatten: $($flat_field: ident: $flat_ty:ty ,)+)?
$($field: ident $(($alt_name: literal))?: $ty:ty $(as $ty_as:ty )? $(= $default:expr)? $(=> $read_fn:expr, $write_fn:expr, $should_write_fn:expr)?, )*
$(!flatten: $flat_field: ident: $flat_ty:ty ,)*
) => {
epee_encoding::macros::paste!(
@ -46,27 +133,26 @@ macro_rules! epee_object {
#[derive(Default)]
pub struct [<__Builder $obj>] {
$($field: Option<epee_encoding::field_ty!($ty, $($ty_as)?)>,)+
$($($flat_field: <$flat_ty as epee_encoding::EpeeObject>::Builder,)+)?
$($field: Option<epee_encoding::epee_object!(@internal_field_type $ty, $($ty_as)?)>,)*
$($flat_field: <$flat_ty as epee_encoding::EpeeObject>::Builder,)*
}
impl epee_encoding::EpeeObjectBuilder<$obj> for [<__Builder $obj>] {
fn add_field<B: epee_encoding::macros::bytes::Buf>(&mut self, name: &str, b: &mut B) -> epee_encoding::error::Result<bool> {
match name {
$(epee_encoding::field_name!($field, $($alt_name)?) => {
$(epee_encoding::epee_object!(@internal_field_name $field, $($alt_name)?) => {
if core::mem::replace(&mut self.$field, Some(
epee_encoding::try_right_then_left!(epee_encoding::read_epee_value(b)?, $($read_fn(b)?)?)
epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::read_epee_value(b)?, $($read_fn(b)?)?)
)).is_some() {
Err(epee_encoding::error::Error::Value(format!("Duplicate field in data: {}", epee_encoding::field_name!($field, $($alt_name)?))))?;
Err(epee_encoding::error::Error::Value(format!("Duplicate field in data: {}", epee_encoding::epee_object!(@internal_field_name$field, $($alt_name)?))))?;
}
Ok(true)
},)+
},)*
_ => {
$(
$(if self.$flat_field.add_field(name, b)? {
return Ok(true);
})+
)?
})*
Ok(false)
}
@ -78,7 +164,7 @@ macro_rules! epee_object {
$obj {
$(
$field: {
let epee_default_value = epee_encoding::try_right_then_left!(epee_encoding::EpeeValue::epee_default_value(), $({
let epee_default_value = epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::EpeeValue::epee_default_value(), $({
let _ = $should_write_fn;
None
})?);
@ -87,15 +173,14 @@ macro_rules! epee_object {
$(.or(Some($default)))?
.or(epee_default_value)
$(.map(<$ty_as>::into))?
.ok_or(epee_encoding::error::Error::Value(format!("Missing field in data: {}", epee_encoding::field_name!($field, $($alt_name)?))))?
.ok_or(epee_encoding::error::Error::Value(format!("Missing field in data: {}", epee_encoding::epee_object!(@internal_field_name$field, $($alt_name)?))))?
},
)+
)*
$(
$(
$flat_field: self.$flat_field.finish()?,
)+
)?
)*
}
)
}
@ -109,38 +194,34 @@ macro_rules! epee_object {
let mut fields = 0;
$(
let field = epee_encoding::try_right_then_left!(&self.$field, $(<&$ty_as>::from(&self.$field))? );
let field = epee_encoding::epee_object!(@internal_try_right_then_left &self.$field, $(<&$ty_as>::from(&self.$field))? );
if $((field) != &$default &&)? epee_encoding::try_right_then_left!(epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(field )
if $((field) != &$default &&)? epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(field )
{
fields += 1;
}
)+
)*
$(
$(
fields += self.$flat_field.number_of_fields();
)+
)?
)*
fields
}
fn write_fields<B: epee_encoding::macros::bytes::BufMut>(self, w: &mut B) -> epee_encoding::error::Result<()> {
$(
let field = epee_encoding::try_right_then_left!(self.$field, $(<$ty_as>::from(self.$field))? );
let field = epee_encoding::epee_object!(@internal_try_right_then_left self.$field, $(<$ty_as>::from(self.$field))? );
if $(field != $default &&)? epee_encoding::try_right_then_left!(epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(&field )
if $(field != $default &&)? epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(&field )
{
epee_encoding::try_right_then_left!(epee_encoding::write_field, $($write_fn)?)((field), epee_encoding::field_name!($field, $($alt_name)?), w)?;
epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::write_field, $($write_fn)?)((field), epee_encoding::epee_object!(@internal_field_name$field, $($alt_name)?), w)?;
}
)+
)*
$(
$(
self.$flat_field.write_fields(w)?;
)+
)?
)*
Ok(())
}

View file

@ -86,9 +86,8 @@ struct Parent12 {
epee_object!(
Parent12,
h: f64,
!flatten:
child1: Child1,
child2: Child2,
!flatten: child1: Child1,
!flatten: child2: Child2,
);
#[test]

View file

@ -106,7 +106,7 @@ pub fn make_fragmented_messages<T: LevinBody>(
new_body.resize(fragment_size - HEADER_SIZE, 0);
bucket.body = new_body.freeze();
bucket.header.size = fragment_size
bucket.header.size = (fragment_size - HEADER_SIZE)
.try_into()
.expect("Bucket size does not fit into u64");
}

View file

@ -124,7 +124,8 @@ proptest! {
let len = fragments.len();
for (i, fragment) in fragments.into_iter().enumerate() {
prop_assert_eq!(fragment.body.len() + 33, fragment_size, "numb_fragments:{}, index: {}", len, i)
prop_assert_eq!(fragment.body.len() + 33, fragment_size, "numb_fragments:{}, index: {}", len, i);
prop_assert_eq!(fragment.header.size + 33, fragment_size as u64);
}
}

View file

@ -16,11 +16,14 @@
//! This module defines a Monero `Message` enum which contains
//! every possible Monero network message (levin body)
use bytes::{Buf, Bytes, BytesMut};
use std::fmt::Formatter;
use bytes::{Buf, BytesMut};
use epee_encoding::epee_object;
use levin_cuprate::{
BucketBuilder, BucketError, LevinBody, LevinCommand as LevinCommandTrait, MessageType,
};
use std::fmt::Formatter;
pub mod admin;
pub mod common;
@ -276,8 +279,18 @@ impl RequestMessage {
Ok(match command {
C::Handshake => decode_message(RequestMessage::Handshake, buf)?,
C::TimedSync => decode_message(RequestMessage::TimedSync, buf)?,
C::Ping => RequestMessage::Ping,
C::SupportFlags => RequestMessage::SupportFlags,
C::Ping => {
epee_encoding::from_bytes::<EmptyMessage, _>(buf)
.map_err(|e| BucketError::BodyDecodingError(e.into()))?;
RequestMessage::Ping
}
C::SupportFlags => {
epee_encoding::from_bytes::<EmptyMessage, _>(buf)
.map_err(|e| BucketError::BodyDecodingError(e.into()))?;
RequestMessage::SupportFlags
}
_ => return Err(BucketError::UnknownCommand),
})
}
@ -288,14 +301,8 @@ impl RequestMessage {
match self {
RequestMessage::Handshake(val) => build_message(C::Handshake, val, builder)?,
RequestMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?,
RequestMessage::Ping => {
builder.set_command(C::Ping);
builder.set_body(Bytes::new());
}
RequestMessage::SupportFlags => {
builder.set_command(C::SupportFlags);
builder.set_body(Bytes::new());
}
RequestMessage::Ping => build_message(C::Ping, EmptyMessage, builder)?,
RequestMessage::SupportFlags => build_message(C::SupportFlags, EmptyMessage, builder)?,
}
Ok(())
}
@ -408,3 +415,13 @@ impl LevinBody for Message {
}
}
}
/// An internal empty message.
///
/// This represents P2P messages that have no fields as epee's binary format will still add a header
/// for these objects, so we need to decode/encode a message.
struct EmptyMessage;
epee_object! {
EmptyMessage,
}

View file

@ -84,7 +84,7 @@ epee_object!(
);
/// The status field of an okay ping response
pub static PING_OK_RESPONSE_STATUS_TEXT: Bytes = Bytes::from_static("OK".as_bytes());
pub const PING_OK_RESPONSE_STATUS_TEXT: Bytes = Bytes::from_static("OK".as_bytes());
/// A Ping Response
#[derive(Debug, Clone, PartialEq, Eq)]

View file

@ -13,13 +13,13 @@ monero-p2p = { path = "../monero-p2p" }
tower = { workspace = true, features = ["util", "buffer"] }
tokio = { workspace = true, features = ["time", "fs", "rt"]}
tokio-util = { workspace = true, features = ["time"] }
futures = { workspace = true, features = ["std"] }
pin-project = { workspace = true }
async-trait = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
indexmap = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std", "std_rng"] }

View file

@ -1,22 +1,22 @@
//! The address book service.
//!
//! This module holds the address book service for a specific network zone.
use std::{
collections::{HashMap, HashSet},
future::Future,
panic,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{
future::{ready, Ready},
stream::FuturesUnordered,
FutureExt, StreamExt,
FutureExt,
};
use pin_project::pin_project;
use tokio::{
task::JoinHandle,
time::{interval, sleep, Interval, MissedTickBehavior, Sleep},
time::{interval, Instant, Interval, MissedTickBehavior},
};
use tokio_util::time::DelayQueue;
use tower::Service;
use monero_p2p::{
@ -27,7 +27,7 @@ use monero_p2p::{
};
use monero_pruning::PruningSeed;
use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookError, Config};
use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError};
#[cfg(test)]
mod tests;
@ -45,21 +45,6 @@ pub struct ConnectionPeerEntry<Z: NetworkZone> {
rpc_credits_per_hash: u32,
}
/// A future that resolves when a peer is unbanned.
#[pin_project(project = EnumProj)]
pub struct BanedPeerFut<Addr: NetZoneAddress>(Addr::BanID, #[pin] Sleep);
impl<Addr: NetZoneAddress> Future for BanedPeerFut<Addr> {
type Output = Addr::BanID;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match this.1.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(*this.0),
}
}
}
pub struct AddressBook<Z: NetworkZone> {
/// Our white peers - the peers we have previously connected to.
white_list: PeerList<Z>,
@ -71,20 +56,19 @@ pub struct AddressBook<Z: NetworkZone> {
/// The currently connected peers.
connected_peers: HashMap<InternalPeerID<Z::Addr>, ConnectionPeerEntry<Z>>,
connected_peers_ban_id: HashMap<<Z::Addr as NetZoneAddress>::BanID, HashSet<Z::Addr>>,
/// The currently banned peers
banned_peers: HashSet<<Z::Addr as NetZoneAddress>::BanID>,
banned_peers_fut: FuturesUnordered<BanedPeerFut<Z::Addr>>,
banned_peers: HashMap<<Z::Addr as NetZoneAddress>::BanID, Instant>,
banned_peers_queue: DelayQueue<<Z::Addr as NetZoneAddress>::BanID>,
peer_save_task_handle: Option<JoinHandle<std::io::Result<()>>>,
peer_save_interval: Interval,
cfg: Config,
cfg: AddressBookConfig,
}
impl<Z: NetworkZone> AddressBook<Z> {
pub fn new(
cfg: Config,
cfg: AddressBookConfig,
white_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
gray_peers: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,
anchor_peers: Vec<Z::Addr>,
@ -94,8 +78,9 @@ impl<Z: NetworkZone> AddressBook<Z> {
let anchor_list = HashSet::from_iter(anchor_peers);
// TODO: persist banned peers
let banned_peers = HashSet::new();
let banned_peers_fut = FuturesUnordered::new();
let banned_peers = HashMap::new();
let banned_peers_queue = DelayQueue::new();
let connected_peers = HashMap::new();
let mut peer_save_interval = interval(cfg.peer_save_period);
@ -108,7 +93,7 @@ impl<Z: NetworkZone> AddressBook<Z> {
connected_peers,
connected_peers_ban_id: HashMap::new(),
banned_peers,
banned_peers_fut,
banned_peers_queue,
peer_save_task_handle: None,
peer_save_interval,
cfg,
@ -146,8 +131,9 @@ impl<Z: NetworkZone> AddressBook<Z> {
}
fn poll_unban_peers(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(ban_id)) = self.banned_peers_fut.poll_next_unpin(cx) {
self.banned_peers.remove(&ban_id);
while let Poll::Ready(Some(ban_id)) = self.banned_peers_queue.poll_expired(cx) {
tracing::debug!("Host {:?} is unbanned, ban has expired.", ban_id.get_ref(),);
self.banned_peers.remove(ban_id.get_ref());
}
}
@ -198,8 +184,8 @@ impl<Z: NetworkZone> AddressBook<Z> {
}
fn ban_peer(&mut self, addr: Z::Addr, time: Duration) {
if self.banned_peers.contains(&addr.ban_id()) {
return;
if self.banned_peers.contains_key(&addr.ban_id()) {
tracing::error!("Tried to ban peer twice, this shouldn't happen.")
}
if let Some(connected_peers_with_ban_id) = self.connected_peers_ban_id.get(&addr.ban_id()) {
@ -220,17 +206,20 @@ impl<Z: NetworkZone> AddressBook<Z> {
self.white_list.remove_peers_with_ban_id(&addr.ban_id());
self.gray_list.remove_peers_with_ban_id(&addr.ban_id());
self.banned_peers.insert(addr.ban_id());
self.banned_peers_fut
.push(BanedPeerFut(addr.ban_id(), sleep(time)))
let unban_at = Instant::now() + time;
self.banned_peers_queue.insert_at(addr.ban_id(), unban_at);
self.banned_peers.insert(addr.ban_id(), unban_at);
}
/// adds a peer to the gray list.
fn add_peer_to_gray_list(&mut self, mut peer: ZoneSpecificPeerListEntryBase<Z::Addr>) {
if self.white_list.contains_peer(&peer.adr) {
tracing::trace!("Peer {} is already in white list skipping.", peer.adr);
return;
};
if !self.gray_list.contains_peer(&peer.adr) {
tracing::trace!("Adding peer {} to gray list.", peer.adr);
peer.last_seen = 0;
self.gray_list.add_new_peer(peer);
}
@ -238,7 +227,7 @@ impl<Z: NetworkZone> AddressBook<Z> {
/// Checks if a peer is banned.
fn is_peer_banned(&self, peer: &Z::Addr) -> bool {
self.banned_peers.contains(&peer.ban_id())
self.banned_peers.contains_key(&peer.ban_id())
}
fn handle_incoming_peer_list(
@ -264,24 +253,22 @@ impl<Z: NetworkZone> AddressBook<Z> {
.reduce_list(&HashSet::new(), self.cfg.max_gray_list_length);
}
fn get_random_white_peer(
&self,
fn take_random_white_peer(
&mut self,
block_needed: Option<u64>,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
tracing::debug!("Retrieving random white peer");
self.white_list
.get_random_peer(&mut rand::thread_rng(), block_needed)
.copied()
.take_random_peer(&mut rand::thread_rng(), block_needed)
}
fn get_random_gray_peer(
&self,
fn take_random_gray_peer(
&mut self,
block_needed: Option<u64>,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
tracing::debug!("Retrieving random gray peer");
self.gray_list
.get_random_peer(&mut rand::thread_rng(), block_needed)
.copied()
.take_random_peer(&mut rand::thread_rng(), block_needed)
}
fn get_white_peers(&self, len: usize) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
@ -341,7 +328,7 @@ impl<Z: NetworkZone> AddressBook<Z> {
if self.is_peer_banned(addr) {
return Err(AddressBookError::PeerIsBanned);
}
// although the peer may not be readable still add it to the connected peers with ban ID.
// although the peer may not be reachable still add it to the connected peers with ban ID.
self.connected_peers_ban_id
.entry(addr.ban_id())
.or_default()
@ -350,13 +337,11 @@ impl<Z: NetworkZone> AddressBook<Z> {
// if the address is Some that means we can reach it from our node.
if let Some(addr) = peer.addr {
// remove the peer from the gray list as we know it's active.
self.gray_list.remove_peer(&addr);
// The peer is reachable, update our white list and add it to the anchor connections.
self.update_white_list_peer_entry(&peer)?;
self.anchor_list.insert(addr);
self.white_list
.reduce_list(&self.anchor_list, self.cfg.max_white_list_length);
self.anchor_list.insert(addr);
}
self.connected_peers.insert(internal_peer_id, peer);
@ -382,8 +367,8 @@ impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
let response = match req {
AddressBookRequest::NewConnection {
addr,
internal_peer_id,
public_address,
handle,
id,
pruning_seed,
@ -393,7 +378,7 @@ impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
.handle_new_connection(
internal_peer_id,
ConnectionPeerEntry {
addr,
addr: public_address,
id,
handle,
pruning_seed,
@ -402,20 +387,21 @@ impl<Z: NetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
},
)
.map(|_| AddressBookResponse::Ok),
AddressBookRequest::BanPeer(addr, time) => {
self.ban_peer(addr, time);
Ok(AddressBookResponse::Ok)
}
AddressBookRequest::IncomingPeerList(peer_list) => {
self.handle_incoming_peer_list(peer_list);
Ok(AddressBookResponse::Ok)
}
AddressBookRequest::GetRandomWhitePeer { height } => self
.get_random_white_peer(height)
AddressBookRequest::TakeRandomWhitePeer { height } => self
.take_random_white_peer(height)
.map(AddressBookResponse::Peer)
.ok_or(AddressBookError::PeerNotFound),
AddressBookRequest::GetRandomGrayPeer { height } => self
.get_random_gray_peer(height)
AddressBookRequest::TakeRandomGrayPeer { height } => self
.take_random_gray_peer(height)
.map(AddressBookResponse::Peer)
.ok_or(AddressBookError::PeerNotFound),
AddressBookRequest::TakeRandomPeer { height } => self
.take_random_white_peer(height)
.or_else(|| self.take_random_gray_peer(height))
.map(AddressBookResponse::Peer)
.ok_or(AddressBookError::PeerNotFound),
AddressBookRequest::GetWhitePeers(len) => {

View file

@ -8,12 +8,12 @@ use monero_p2p::handles::HandleBuilder;
use monero_pruning::PruningSeed;
use super::{AddressBook, ConnectionPeerEntry, InternalPeerID};
use crate::{peer_list::tests::make_fake_peer_list, AddressBookError, Config};
use crate::{peer_list::tests::make_fake_peer_list, AddressBookConfig, AddressBookError};
use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr};
fn test_cfg() -> Config {
Config {
fn test_cfg() -> AddressBookConfig {
AddressBookConfig {
max_white_list_length: 100,
max_gray_list_length: 500,
peer_store_file: PathBuf::new(),
@ -35,7 +35,7 @@ fn make_fake_address_book(
connected_peers: Default::default(),
connected_peers_ban_id: Default::default(),
banned_peers: Default::default(),
banned_peers_fut: Default::default(),
banned_peers_queue: Default::default(),
peer_save_task_handle: None,
peer_save_interval: interval(Duration::from_secs(60)),
cfg: test_cfg(),
@ -43,15 +43,15 @@ fn make_fake_address_book(
}
#[tokio::test]
async fn get_random_peers() {
let address_book = make_fake_address_book(50, 250);
let peer = address_book.get_random_white_peer(None).unwrap();
assert!(address_book.white_list.contains_peer(&peer.adr));
async fn take_random_peers() {
let mut address_book = make_fake_address_book(50, 250);
let peer = address_book.take_random_white_peer(None).unwrap();
assert!(!address_book.white_list.contains_peer(&peer.adr));
assert!(!address_book.gray_list.contains_peer(&peer.adr));
let peer = address_book.get_random_gray_peer(None).unwrap();
let peer = address_book.take_random_gray_peer(None).unwrap();
assert!(!address_book.white_list.contains_peer(&peer.adr));
assert!(address_book.gray_list.contains_peer(&peer.adr));
assert!(!address_book.gray_list.contains_peer(&peer.adr));
}
#[tokio::test]
@ -81,7 +81,7 @@ async fn add_new_peer_already_connected() {
let semaphore = Arc::new(Semaphore::new(10));
let (_, handle, _) = HandleBuilder::default()
let (_, handle) = HandleBuilder::default()
.with_permit(semaphore.clone().try_acquire_owned().unwrap())
.build();
@ -99,7 +99,7 @@ async fn add_new_peer_already_connected() {
)
.unwrap();
let (_, handle, _) = HandleBuilder::default()
let (_, handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
@ -143,7 +143,12 @@ async fn banned_peer_removed_from_peer_lists() {
assert_eq!(address_book.white_list.len(), 98);
assert_eq!(
address_book.banned_peers_fut.next().await.unwrap(),
address_book
.banned_peers_queue
.next()
.await
.unwrap()
.into_inner(),
TestNetZoneAddr(1)
)
}

View file

@ -3,7 +3,7 @@
//! This module holds the logic for persistent peer storage.
//! Cuprates address book is modeled as a [`tower::Service`]
//! The request is [`AddressBookRequest`] and the response is
//! [`AddressBookResponse`].
//! [`AddressBookResponse`](monero_p2p::services::AddressBookResponse).
//!
//! Cuprate, like monerod, actually has multiple address books, one
//! for each [`NetworkZone`]. This is to reduce the possibility of
@ -11,24 +11,31 @@
//! and so peers will only get told about peers they can
//! connect to.
//!
use std::{io::ErrorKind, path::PathBuf, time::Duration};
use std::{path::PathBuf, time::Duration};
use tower::buffer::Buffer;
use monero_p2p::{
services::{AddressBookRequest, AddressBookResponse},
NetworkZone,
};
use monero_p2p::{services::AddressBookRequest, NetworkZone};
mod book;
mod peer_list;
mod store;
/// The address book config.
#[derive(Debug, Clone)]
pub struct Config {
max_white_list_length: usize,
max_gray_list_length: usize,
peer_store_file: PathBuf,
peer_save_period: Duration,
pub struct AddressBookConfig {
/// The maximum number of white peers in the peer list.
///
/// White peers are peers we have connected to before.
pub max_white_list_length: usize,
/// The maximum number of gray peers in the peer list.
///
/// Gray peers are peers we are yet to make a connection to.
pub max_gray_list_length: usize,
/// The location to store the address book.
pub peer_store_file: PathBuf,
/// The amount of time between saving the address book to disk.
pub peer_save_period: Duration,
}
/// Possible errors when dealing with the address book.
@ -41,9 +48,6 @@ pub enum AddressBookError {
/// The peer is not in the address book for this zone.
#[error("Peer was not found in book")]
PeerNotFound,
/// The peer list is empty.
#[error("The peer list is empty")]
PeerListEmpty,
/// Immutable peer data was changed.
#[error("Immutable peer data was changed: {0}")]
PeersDataChanged(&'static str),
@ -58,19 +62,25 @@ pub enum AddressBookError {
AddressBookTaskExited,
}
/// Initializes the P2P address book for a specific network zone.
pub async fn init_address_book<Z: NetworkZone>(
cfg: Config,
) -> Result<
impl tower::Service<
AddressBookRequest<Z>,
Response = AddressBookResponse<Z>,
Error = tower::BoxError,
>,
std::io::Error,
> {
let (white_list, gray_list) = store::read_peers_from_disk::<Z>(&cfg).await?;
cfg: AddressBookConfig,
) -> Result<Buffer<book::AddressBook<Z>, AddressBookRequest<Z>>, std::io::Error> {
tracing::info!(
"Loading peers from file: {} ",
cfg.peer_store_file.display()
);
let (white_list, gray_list) = match store::read_peers_from_disk::<Z>(&cfg).await {
Ok(res) => res,
Err(e) if e.kind() == ErrorKind::NotFound => (vec![], vec![]),
Err(e) => {
tracing::error!("Failed to open peer list, {}", e);
panic!("{e}");
}
};
let address_book = book::AddressBook::<Z>::new(cfg, white_list, gray_list, Vec::new());
Ok(tower::buffer::Buffer::new(address_book, 15))
Ok(Buffer::new(address_book, 15))
}

View file

@ -1,6 +1,7 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use rand::{seq::SliceRandom, Rng};
use indexmap::IndexMap;
use rand::prelude::*;
use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone};
use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
@ -14,7 +15,7 @@ pub mod tests;
#[derive(Debug)]
pub struct PeerList<Z: NetworkZone> {
/// The peers with their peer data.
pub peers: HashMap<Z::Addr, ZoneSpecificPeerListEntryBase<Z::Addr>>,
pub peers: IndexMap<Z::Addr, ZoneSpecificPeerListEntryBase<Z::Addr>>,
/// An index of Pruning seed to address, so can quickly grab peers with the blocks
/// we want.
///
@ -31,7 +32,7 @@ pub struct PeerList<Z: NetworkZone> {
impl<Z: NetworkZone> PeerList<Z> {
/// Creates a new peer list.
pub fn new(list: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>) -> PeerList<Z> {
let mut peers = HashMap::with_capacity(list.len());
let mut peers = IndexMap::with_capacity(list.len());
let mut pruning_seeds = BTreeMap::new();
let mut ban_ids = HashMap::with_capacity(list.len());
@ -78,29 +79,27 @@ impl<Z: NetworkZone> PeerList<Z> {
}
}
/// Gets a reference to a peer
pub fn get_peer(&self, peer: &Z::Addr) -> Option<&ZoneSpecificPeerListEntryBase<Z::Addr>> {
self.peers.get(peer)
}
/// Returns a random peer.
/// If the pruning seed is specified then we will get a random peer with
/// that pruning seed otherwise we will just get a random peer in the whole
/// list.
pub fn get_random_peer<R: Rng>(
&self,
///
/// The given peer will be removed from the peer list.
pub fn take_random_peer<R: Rng>(
&mut self,
r: &mut R,
block_needed: Option<u64>,
) -> Option<&ZoneSpecificPeerListEntryBase<Z::Addr>> {
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
if let Some(needed_height) = block_needed {
let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| {
// TODO: factor in peer blockchain height?
seed.get_next_unpruned_block(needed_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
.expect("Explain")
.expect("Block needed is higher than max block allowed.")
== needed_height
})?;
let n = r.gen_range(0..addresses_with_block.len());
self.get_peer(&addresses_with_block[n])
let peer = addresses_with_block[n];
self.remove_peer(&peer)
} else {
let len = self.len();
if len == 0 {
@ -108,7 +107,8 @@ impl<Z: NetworkZone> PeerList<Z> {
} else {
let n = r.gen_range(0..len);
self.peers.values().nth(n)
let (&key, _) = self.peers.get_index(n).unwrap();
self.remove_peer(&key)
}
}
}
@ -118,7 +118,10 @@ impl<Z: NetworkZone> PeerList<Z> {
r: &mut R,
len: usize,
) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
let mut peers = self.peers.values().copied().collect::<Vec<_>>();
let mut peers = self.peers.values().copied().choose_multiple(r, len);
// Order of the returned peers is not random, I am unsure of the impact of this, potentially allowing someone to make guesses about which peers
// were connected first.
// So to mitigate this shuffle the result.
peers.shuffle(r);
peers.drain(len.min(peers.len())..peers.len());
peers
@ -180,7 +183,7 @@ impl<Z: NetworkZone> PeerList<Z> {
&mut self,
peer: &Z::Addr,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
let peer_eb = self.peers.remove(peer)?;
let peer_eb = self.peers.swap_remove(peer)?;
self.remove_peer_from_all_idxs(&peer_eb);
Some(peer_eb)
}

View file

@ -86,12 +86,10 @@ fn peer_list_reduce_length_with_peers_we_need() {
fn peer_list_remove_specific_peer() {
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let peer = *peer_list
.get_random_peer(&mut rand::thread_rng(), None)
let peer = peer_list
.take_random_peer(&mut rand::thread_rng(), None)
.unwrap();
assert!(peer_list.remove_peer(&peer.adr).is_some());
let pruning_idxs = peer_list.pruning_seeds;
let peers = peer_list.peers;
@ -125,7 +123,7 @@ fn peer_list_add_new_peer() {
peer_list.add_new_peer(new_peer);
assert_eq!(peer_list.len(), 11);
assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer));
assert_eq!(peer_list.peers.get(&new_peer.adr), Some(&new_peer));
assert!(peer_list
.pruning_seeds
.get(&new_peer.pruning_seed)
@ -136,19 +134,22 @@ fn peer_list_add_new_peer() {
#[test]
fn peer_list_add_existing_peer() {
let mut peer_list = make_fake_peer_list(0, 10);
let existing_peer = *peer_list.get_peer(&TestNetZoneAddr(0)).unwrap();
let existing_peer = *peer_list.peers.get(&TestNetZoneAddr(0)).unwrap();
peer_list.add_new_peer(existing_peer);
assert_eq!(peer_list.len(), 10);
assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer));
assert_eq!(
peer_list.peers.get(&existing_peer.adr),
Some(&existing_peer)
);
}
#[test]
fn peer_list_get_non_existent_peer() {
let peer_list = make_fake_peer_list(0, 10);
let non_existent_peer = TestNetZoneAddr(50);
assert_eq!(peer_list.get_peer(&non_existent_peer), None);
assert_eq!(peer_list.peers.get(&non_existent_peer), None);
}
#[test]
@ -159,7 +160,7 @@ fn peer_list_get_peer_with_block() {
peer_list.add_new_peer(make_fake_peer(101, Some(384)));
let peer = peer_list
.get_random_peer(&mut r, Some(1))
.take_random_peer(&mut r, Some(1))
.expect("We just added a peer with the correct seed");
assert!(peer
@ -172,13 +173,10 @@ fn peer_list_get_peer_with_block() {
fn peer_list_ban_peers() {
let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100);
let peer = peer_list
.get_random_peer(&mut rand::thread_rng(), None)
.take_random_peer(&mut rand::thread_rng(), None)
.unwrap();
let ban_id = peer.adr.ban_id();
assert!(peer_list.contains_peer(&peer.adr));
assert_ne!(peer_list.ban_ids.get(&ban_id).unwrap().len(), 0);
peer_list.remove_peers_with_ban_id(&ban_id);
assert_eq!(peer_list.ban_ids.get(&ban_id), None);
for (addr, _) in peer_list.peers {
assert_ne!(addr.ban_id(), ban_id);

View file

@ -5,7 +5,7 @@ use tokio::task::{spawn_blocking, JoinHandle};
use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone};
use crate::{peer_list::PeerList, Config};
use crate::{peer_list::PeerList, AddressBookConfig};
// TODO: store anchor and ban list.
@ -22,7 +22,7 @@ struct DeserPeerDataV1<A: NetZoneAddress> {
}
pub fn save_peers_to_disk<Z: NetworkZone>(
cfg: &Config,
cfg: &AddressBookConfig,
white_list: &PeerList<Z>,
gray_list: &PeerList<Z>,
) -> JoinHandle<std::io::Result<()>> {
@ -39,7 +39,7 @@ pub fn save_peers_to_disk<Z: NetworkZone>(
}
pub async fn read_peers_from_disk<Z: NetworkZone>(
cfg: &Config,
cfg: &AddressBookConfig,
) -> Result<
(
Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>,

View file

@ -22,7 +22,7 @@ async-trait = { workspace = true }
tower = { workspace = true, features = ["util"] }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["std"] }
tracing = { workspace = true, features = ["std", "attributes"] }
borsh = { workspace = true, default-features = false, features = ["derive", "std"], optional = true }

View file

@ -1,3 +1,9 @@
//! Handshake Module
//!
//! This module contains a [`HandShaker`] which is a [`Service`] that takes an open connection and attempts
//! to complete a handshake with them.
//!
//! This module also contains a [`ping`] function that can be used to check if an address is reachable.
use std::{
future::Future,
marker::PhantomData,
@ -13,8 +19,9 @@ use tokio::{
time::{error::Elapsed, timeout},
};
use tower::{Service, ServiceExt};
use tracing::Instrument;
use tracing::{info_span, instrument, Instrument};
use monero_pruning::{PruningError, PruningSeed};
use monero_wire::{
admin::{
HandshakeRequest, HandshakeResponse, PingResponse, SupportFlagsResponse,
@ -29,13 +36,32 @@ use crate::{
client::{connection::Connection, Client, InternalPeerID},
handles::HandleBuilder,
AddressBook, AddressBookRequest, AddressBookResponse, ConnectionDirection, CoreSyncDataRequest,
CoreSyncDataResponse, CoreSyncSvc, MessageID, NetworkZone, PeerBroadcast, PeerRequestHandler,
SharedError, MAX_PEERS_IN_PEER_LIST_MESSAGE,
CoreSyncDataResponse, CoreSyncSvc, MessageID, NetZoneAddress, NetworkZone, PeerBroadcast,
PeerRequestHandler, SharedError, MAX_PEERS_IN_PEER_LIST_MESSAGE,
};
const MAX_EAGER_PROTOCOL_MESSAGES: usize = 2;
/// This is a Cuprate specific constant.
///
/// When completing a handshake monerod might send protocol messages before the handshake is actually
/// complete, this is a problem for Cuprate as we must complete the handshake before responding to any
/// protocol requests. So when we receive a protocol message during a handshake we keep them around to handle
/// after the handshake.
///
/// Because we use the [bytes crate](https://crates.io/crates/bytes) in monero-wire for zero-copy parsing
/// it is not safe to keep too many of these messages around for long.
const MAX_EAGER_PROTOCOL_MESSAGES: usize = 1;
/// The time given to complete a handshake before the handshake fails.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(120);
/// A timeout put on pings during handshakes.
///
/// When we receive an inbound connection we open an outbound connection to the node and send a ping message
/// to see if we can reach the node, so we can add it to our address book.
///
/// This timeout must be significantly shorter than [`HANDSHAKE_TIMEOUT`] so we don't drop inbound connections that
/// don't have ports open.
const PING_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, thiserror::Error)]
pub enum HandshakeError {
#[error("The handshake timed out")]
@ -48,6 +74,8 @@ pub enum HandshakeError {
PeerSentIncorrectPeerList(#[from] crate::services::PeerListConversionError),
#[error("Peer sent invalid message: {0}")]
PeerSentInvalidMessage(&'static str),
#[error("The peers pruning seed is invalid.")]
InvalidPruningSeed(#[from] PruningError),
#[error("Levin bucket error: {0}")]
LevinBucketError(#[from] BucketError),
#[error("Internal service error: {0}")]
@ -56,28 +84,42 @@ pub enum HandshakeError {
IO(#[from] std::io::Error),
}
/// A request to complete a handshake.
pub struct DoHandshakeRequest<Z: NetworkZone> {
/// The [`InternalPeerID`] of the peer we are handshaking with.
pub addr: InternalPeerID<Z::Addr>,
/// The receiving side of the connection.
pub peer_stream: Z::Stream,
/// The sending side of the connection.
pub peer_sink: Z::Sink,
/// The direction of the connection.
pub direction: ConnectionDirection,
/// A permit for this connection.
pub permit: OwnedSemaphorePermit,
}
/// The peer handshaking service.
#[derive(Debug, Clone)]
pub struct HandShaker<Z: NetworkZone, AdrBook, CSync, ReqHdlr> {
/// The address book service.
address_book: AdrBook,
/// The core sync data service.
core_sync_svc: CSync,
/// The peer request handler service.
peer_request_svc: ReqHdlr,
/// Our [`BasicNodeData`]
our_basic_node_data: BasicNodeData,
/// The channel to broadcast messages to all peers created with this handshaker.
broadcast_tx: broadcast::Sender<Arc<PeerBroadcast>>,
/// The network zone.
_zone: PhantomData<Z>,
}
impl<Z: NetworkZone, AdrBook, CSync, ReqHdlr> HandShaker<Z, AdrBook, CSync, ReqHdlr> {
/// Creates a new handshaker.
pub fn new(
address_book: AdrBook,
core_sync_svc: CSync,
@ -122,7 +164,7 @@ where
let core_sync_svc = self.core_sync_svc.clone();
let our_basic_node_data = self.our_basic_node_data.clone();
let span = tracing::info_span!(parent: &tracing::Span::current(), "handshaker", %req.addr);
let span = info_span!(parent: &tracing::Span::current(), "handshaker", addr=%req.addr);
async move {
timeout(
@ -143,6 +185,46 @@ where
}
}
/// Send a ping to the requested peer and wait for a response, returning the `peer_id`.
///
/// This function does not put a timeout on the ping.
pub async fn ping<N: NetworkZone>(addr: N::Addr) -> Result<u64, HandshakeError> {
tracing::debug!("Sending Ping to peer");
let (mut peer_stream, mut peer_sink) = N::connect_to_peer(addr).await?;
tracing::debug!("Made outbound connection to peer, sending ping.");
peer_sink
.send(Message::Request(RequestMessage::Ping).into())
.await?;
if let Some(res) = peer_stream.next().await {
if let Message::Response(ResponseMessage::Ping(ping)) = res? {
if ping.status == PING_OK_RESPONSE_STATUS_TEXT {
tracing::debug!("Ping successful.");
return Ok(ping.peer_id);
}
tracing::debug!("Peer's ping response was not `OK`.");
return Err(HandshakeError::PeerSentInvalidMessage(
"Ping response was not `OK`",
));
}
tracing::debug!("Peer sent invalid response to ping.");
return Err(HandshakeError::PeerSentInvalidMessage(
"Peer did not send correct response for ping.",
));
}
tracing::debug!("Connection closed before ping response.");
Err(BucketError::IO(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
"The peer stream returned None",
)))?
}
/// This function completes a handshake with the requested peer.
async fn handshake<Z: NetworkZone, AdrBook, CSync, ReqHdlr>(
req: DoHandshakeRequest<Z>,
@ -167,11 +249,13 @@ where
permit,
} = req;
// A list of protocol messages the peer has sent during the handshake for us to handle after the handshake.
// see: [`MAX_EAGER_PROTOCOL_MESSAGES`]
let mut eager_protocol_messages = Vec::new();
let mut allow_support_flag_req = true;
let (peer_core_sync, mut peer_node_data) = match direction {
ConnectionDirection::InBound => {
// Inbound handshake the peer sends the request.
tracing::debug!("waiting for handshake request.");
let Message::Request(RequestMessage::Handshake(handshake_req)) = wait_for_message::<Z>(
@ -180,8 +264,7 @@ where
&mut peer_sink,
&mut peer_stream,
&mut eager_protocol_messages,
&mut allow_support_flag_req,
our_basic_node_data.support_flags,
&our_basic_node_data,
)
.await?
else {
@ -189,10 +272,11 @@ where
};
tracing::debug!("Received handshake request.");
// We will respond to the handshake request later.
(handshake_req.payload_data, handshake_req.node_data)
}
ConnectionDirection::OutBound => {
// Outbound handshake, we send the request.
send_hs_request::<Z, _>(
&mut peer_sink,
&mut core_sync_svc,
@ -200,6 +284,7 @@ where
)
.await?;
// Wait for the handshake response.
let Message::Response(ResponseMessage::Handshake(handshake_res)) =
wait_for_message::<Z>(
LevinCommand::Handshake,
@ -207,8 +292,7 @@ where
&mut peer_sink,
&mut peer_stream,
&mut eager_protocol_messages,
&mut allow_support_flag_req,
our_basic_node_data.support_flags,
&our_basic_node_data,
)
.await?
else {
@ -228,6 +312,7 @@ where
handshake_res.local_peerlist_new.len()
);
// Tell our address book about the new peers.
address_book
.ready()
.await?
@ -252,6 +337,10 @@ where
return Err(HandshakeError::PeerHasSameNodeID);
}
/*
// monerod sends a request for support flags if the peer doesn't specify any but this seems unnecessary
// as the peer should specify them in the handshake.
if peer_node_data.support_flags.is_empty() {
tracing::debug!(
"Peer didn't send support flags or has no features, sending request to make sure."
@ -267,8 +356,7 @@ where
&mut peer_sink,
&mut peer_stream,
&mut eager_protocol_messages,
&mut allow_support_flag_req,
our_basic_node_data.support_flags,
&our_basic_node_data,
)
.await?
else {
@ -279,7 +367,16 @@ where
peer_node_data.support_flags = support_flags_res.support_flags;
}
if direction == ConnectionDirection::InBound {
*/
// Make sure the pruning seed is valid.
let pruning_seed = PruningSeed::decompress_p2p_rules(peer_core_sync.pruning_seed)?;
// public_address, if Some, is the reachable address of the node.
let public_address = 'check_out_addr: {
match direction {
ConnectionDirection::InBound => {
// First send the handshake response.
send_hs_response::<Z, _, _>(
&mut peer_sink,
&mut core_sync_svc,
@ -287,20 +384,58 @@ where
our_basic_node_data,
)
.await?;
}
// Now if the peer specifies a reachable port, open a connection and ping them to check.
if peer_node_data.my_port != 0 {
let InternalPeerID::KnownAddr(mut outbound_address) = addr else {
// Anonymity network, we don't know the inbound address.
break 'check_out_addr None;
};
// u32 does not make sense as a port so just truncate it.
outbound_address.set_port(peer_node_data.my_port as u16);
let Ok(Ok(ping_peer_id)) = timeout(
PING_TIMEOUT,
ping::<Z>(outbound_address).instrument(info_span!("ping")),
)
.await
else {
// The ping was not successful.
break 'check_out_addr None;
};
// Make sure we are talking to the right node.
if ping_peer_id == peer_node_data.peer_id {
break 'check_out_addr Some(outbound_address);
}
}
// The peer did not specify a reachable port or the ping was not successful.
None
}
ConnectionDirection::OutBound => {
let InternalPeerID::KnownAddr(outbound_addr) = addr else {
unreachable!("How could we make an outbound connection to an unknown address");
};
// This is an outbound connection, this address is obviously reachable.
Some(outbound_addr)
}
}
};
// Tell the core sync service about the new peer.
core_sync_svc
.ready()
.await?
.call(CoreSyncDataRequest::HandleIncoming(peer_core_sync))
.call(CoreSyncDataRequest::HandleIncoming(peer_core_sync.clone()))
.await?;
tracing::debug!("Handshake complete.");
// Set up the connection data.
let error_slot = SharedError::new();
let (connection_guard, handle, _) = HandleBuilder::new().with_permit(permit).build();
let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build();
let (connection_tx, client_rx) = mpsc::channel(3);
let connection = Connection::<Z, _>::new(
@ -315,12 +450,33 @@ where
let connection_handle =
tokio::spawn(connection.run(peer_stream.fuse(), eager_protocol_messages));
let client = Client::<Z>::new(addr, handle, connection_tx, connection_handle, error_slot);
let client = Client::<Z>::new(
addr,
handle.clone(),
connection_tx,
connection_handle,
error_slot,
);
// Tell the address book about the new connection.
address_book
.ready()
.await?
.call(AddressBookRequest::NewConnection {
internal_peer_id: addr,
public_address,
handle,
id: peer_node_data.peer_id,
pruning_seed,
rpc_port: peer_node_data.rpc_port,
rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash,
})
.await?;
Ok(client)
}
/// Sends a [`HandshakeRequest`] to the peer.
/// Sends a [`RequestMessage::Handshake`] down the peer sink.
async fn send_hs_request<Z: NetworkZone, CSync>(
peer_sink: &mut Z::Sink,
core_sync_svc: &mut CSync,
@ -352,6 +508,7 @@ where
Ok(())
}
/// Sends a [`ResponseMessage::Handshake`] down the peer sink.
async fn send_hs_response<Z: NetworkZone, CSync, AdrBook>(
peer_sink: &mut Z::Sink,
core_sync_svc: &mut CSync,
@ -397,15 +554,25 @@ where
Ok(())
}
/// Waits for a message with a specific [`LevinCommand`].
///
/// The message needed must not be a protocol message, only request/ response "admin" messages are allowed.
///
/// `levin_command` is the [`LevinCommand`] you need and `request` is for if the message is a request.
async fn wait_for_message<Z: NetworkZone>(
levin_command: LevinCommand,
request: bool,
peer_sink: &mut Z::Sink,
peer_stream: &mut Z::Stream,
eager_protocol_messages: &mut Vec<monero_wire::ProtocolMessage>,
allow_support_flag_req: &mut bool,
support_flags: PeerSupportFlags,
our_basic_node_data: &BasicNodeData,
) -> Result<Message, HandshakeError> {
let mut allow_support_flag_req = true;
let mut allow_ping = true;
while let Some(message) = peer_stream.next().await {
let message = message?;
@ -431,21 +598,38 @@ async fn wait_for_message<Z: NetworkZone>(
return Ok(Message::Request(req_message));
}
if matches!(req_message, RequestMessage::SupportFlags) {
if !*allow_support_flag_req {
match req_message {
RequestMessage::SupportFlags => {
if !allow_support_flag_req {
return Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent 2 support flag requests",
));
}
send_support_flags::<Z>(peer_sink, support_flags).await?;
send_support_flags::<Z>(peer_sink, our_basic_node_data.support_flags)
.await?;
// don't let the peer send more after the first request.
*allow_support_flag_req = false;
allow_support_flag_req = false;
continue;
}
RequestMessage::Ping => {
if !allow_support_flag_req {
return Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent 2 ping requests",
));
}
send_ping_response::<Z>(peer_sink, our_basic_node_data.peer_id).await?;
// don't let the peer send more after the first request.
allow_ping = false;
continue;
}
_ => {
return Err(HandshakeError::PeerSentInvalidMessage(
"Peer sent an admin request before responding to the handshake",
));
))
}
}
}
Message::Response(res_message) if !request => {
if res_message.command() == levin_command {
@ -470,6 +654,7 @@ async fn wait_for_message<Z: NetworkZone>(
)))?
}
/// Sends a [`ResponseMessage::SupportFlags`] down the peer sink.
async fn send_support_flags<Z: NetworkZone>(
peer_sink: &mut Z::Sink,
support_flags: PeerSupportFlags,
@ -484,3 +669,20 @@ async fn send_support_flags<Z: NetworkZone>(
)
.await?)
}
/// Sends a [`ResponseMessage::Ping`] down the peer sink.
async fn send_ping_response<Z: NetworkZone>(
peer_sink: &mut Z::Sink,
peer_id: u64,
) -> Result<(), HandshakeError> {
tracing::debug!("Sending ping response.");
Ok(peer_sink
.send(
Message::Response(ResponseMessage::Ping(PingResponse {
status: PING_OK_RESPONSE_STATUS_TEXT,
peer_id,
}))
.into(),
)
.await?)
}

View file

@ -1,28 +1,41 @@
//! Connection Handles.
//!
use std::time::Duration;
//! This module contains the [`ConnectionHandle`] which allows banning a peer, disconnecting a peer and
//! checking if the peer is still connected.
use std::{
sync::{Arc, OnceLock},
time::Duration,
};
use futures::{channel::mpsc, SinkExt};
use futures::SinkExt;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
/// A [`ConnectionHandle`] builder.
#[derive(Default, Debug)]
pub struct HandleBuilder {
permit: Option<OwnedSemaphorePermit>,
}
impl HandleBuilder {
/// Create a new builder.
pub fn new() -> Self {
Self { permit: None }
}
/// Sets the permit for this connection.
///
/// This must be called at least once.
pub fn with_permit(mut self, permit: OwnedSemaphorePermit) -> Self {
self.permit = Some(permit);
self
}
pub fn build(self) -> (ConnectionGuard, ConnectionHandle, PeerHandle) {
/// Builds the [`ConnectionGuard`] which should be handed to the connection task and the [`ConnectionHandle`].
///
/// This will panic if a permit was not set [`HandleBuilder::with_permit`]
pub fn build(self) -> (ConnectionGuard, ConnectionHandle) {
let token = CancellationToken::new();
let (tx, rx) = mpsc::channel(0);
(
ConnectionGuard {
@ -31,13 +44,14 @@ impl HandleBuilder {
},
ConnectionHandle {
token: token.clone(),
ban: rx,
ban: Arc::new(OnceLock::new()),
},
PeerHandle { ban: tx, token },
)
}
}
/// A struct representing the time a peer should be banned for.
#[derive(Debug, Copy, Clone)]
pub struct BanPeer(pub Duration);
/// A struct given to the connection task.
@ -47,9 +61,13 @@ pub struct ConnectionGuard {
}
impl ConnectionGuard {
/// Checks if we should close the connection.
pub fn should_shutdown(&self) -> bool {
self.token.is_cancelled()
}
/// Tell the corresponding [`ConnectionHandle`]s that this connection is closed.
///
/// This will be called on [`Drop::drop`].
pub fn connection_closed(&self) {
self.token.cancel()
}
@ -61,39 +79,29 @@ impl Drop for ConnectionGuard {
}
}
/// A handle given to a task that needs to close this connection and find out if the connection has
/// been banned.
/// A handle given to a task that needs to ban, disconnect, check if the peer should be banned or check
/// the peer is still connected.
#[derive(Debug, Clone)]
pub struct ConnectionHandle {
token: CancellationToken,
ban: mpsc::Receiver<BanPeer>,
ban: Arc<OnceLock<BanPeer>>,
}
impl ConnectionHandle {
/// Bans the peer for the given `duration`.
pub fn ban_peer(&self, duration: Duration) {
let _ = self.ban.set(BanPeer(duration));
}
/// Checks if this connection is closed.
pub fn is_closed(&self) -> bool {
self.token.is_cancelled()
}
/// Returns if this peer has been banned and the [`Duration`] of that ban.
pub fn check_should_ban(&mut self) -> Option<BanPeer> {
match self.ban.try_next() {
Ok(res) => res,
Err(_) => None,
}
self.ban.get().copied()
}
/// Sends the signal to the connection task to disconnect.
pub fn send_close_signal(&self) {
self.token.cancel()
}
}
/// A handle given to a task that needs to be able to ban a peer.
#[derive(Clone)]
pub struct PeerHandle {
token: CancellationToken,
ban: mpsc::Sender<BanPeer>,
}
impl PeerHandle {
pub fn ban_peer(&mut self, duration: Duration) {
// This channel won't be dropped and if it's full the peer has already been banned.
let _ = self.ban.try_send(BanPeer(duration));
self.token.cancel()
}
}

View file

@ -74,6 +74,9 @@ pub trait NetZoneAddress:
/// TODO: IP zone banning?
type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static;
/// Changes the port of this address to `port`.
fn set_port(&mut self, port: u16);
fn ban_id(&self) -> Self::BanID;
fn should_add_to_peer_list(&self) -> bool;

View file

@ -16,6 +16,10 @@ use crate::{NetZoneAddress, NetworkZone};
impl NetZoneAddress for SocketAddr {
type BanID = IpAddr;
fn set_port(&mut self, port: u16) {
SocketAddr::set_port(self, port)
}
fn ban_id(&self) -> Self::BanID {
self.ip()
}

View file

@ -69,34 +69,41 @@ impl<A: NetZoneAddress> TryFrom<monero_wire::PeerListEntryBase>
}
pub enum AddressBookRequest<Z: NetworkZone> {
/// Tells the address book that we have connected or received a connection from a peer.
NewConnection {
addr: Option<Z::Addr>,
/// The [`InternalPeerID`] of this connection.
internal_peer_id: InternalPeerID<Z::Addr>,
/// The public address of the peer, if this peer has a reachable public address.
public_address: Option<Z::Addr>,
/// The [`ConnectionHandle`] to this peer.
handle: ConnectionHandle,
/// An ID the peer assigned itself.
id: u64,
/// The peers [`PruningSeed`].
pruning_seed: PruningSeed,
/// The peers port.
/// The peers rpc port.
rpc_port: u16,
/// The peers rpc credits per hash
rpc_credits_per_hash: u32,
},
/// Bans a peer for the specified duration. This request
/// will send disconnect signals to all peers with the same
/// address.
BanPeer(Z::Addr, std::time::Duration),
/// Tells the address book about a peer list received from a peer.
IncomingPeerList(Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>),
/// Gets a random white peer from the peer list. If height is specified
/// Takes a random white peer from the peer list. If height is specified
/// then the peer list should retrieve a peer that should have a full
/// block at that height according to it's pruning seed
GetRandomWhitePeer {
height: Option<u64>,
},
/// Gets a random gray peer from the peer list. If height is specified
TakeRandomWhitePeer { height: Option<u64> },
/// Takes a random gray peer from the peer list. If height is specified
/// then the peer list should retrieve a peer that should have a full
/// block at that height according to it's pruning seed
GetRandomGrayPeer {
height: Option<u64>,
},
TakeRandomGrayPeer { height: Option<u64> },
/// Takes a random peer from the peer list. If height is specified
/// then the peer list should retrieve a peer that should have a full
/// block at that height according to it's pruning seed.
///
/// The address book will look in the white peer list first, then the gray
/// one if no peer is found.
TakeRandomPeer { height: Option<u64> },
/// Gets the specified number of white peers, or less if we don't have enough.
GetWhitePeers(usize),
}

View file

@ -7,11 +7,11 @@ use monero_p2p::handles::HandleBuilder;
#[test]
fn send_ban_signal() {
let semaphore = Arc::new(Semaphore::new(5));
let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default()
let (guard, mut connection_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
peer_handle.ban_peer(Duration::from_secs(300));
connection_handle.ban_peer(Duration::from_secs(300));
let Some(ban_time) = connection_handle.check_should_ban() else {
panic!("ban signal not received!");
@ -29,13 +29,13 @@ fn send_ban_signal() {
#[test]
fn multiple_ban_signals() {
let semaphore = Arc::new(Semaphore::new(5));
let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default()
let (guard, mut connection_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();
peer_handle.ban_peer(Duration::from_secs(300));
peer_handle.ban_peer(Duration::from_secs(301));
peer_handle.ban_peer(Duration::from_secs(302));
connection_handle.ban_peer(Duration::from_secs(300));
connection_handle.ban_peer(Duration::from_secs(301));
connection_handle.ban_peer(Duration::from_secs(302));
let Some(ban_time) = connection_handle.check_should_ban() else {
panic!("ban signal not received!");
@ -54,7 +54,7 @@ fn multiple_ban_signals() {
#[test]
fn dropped_guard_sends_disconnect_signal() {
let semaphore = Arc::new(Semaphore::new(5));
let (guard, connection_handle, _) = HandleBuilder::default()
let (guard, connection_handle) = HandleBuilder::default()
.with_permit(semaphore.try_acquire_owned().unwrap())
.build();

View file

@ -29,6 +29,8 @@ pub struct TestNetZoneAddr(pub u32);
impl NetZoneAddress for TestNetZoneAddr {
type BanID = Self;
fn set_port(&mut self, _: u16) {}
fn ban_id(&self) -> Self::BanID {
*self
}