diff --git a/Cargo.lock b/Cargo.lock index 4bb47a9..5c96bc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1051,7 +1051,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.11", - "indexmap 2.2.4", + "indexmap 2.2.5", "slab", "tokio", "tokio-util", @@ -1362,9 +1362,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.4" +version = "2.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "967d6dd42f16dbf0eb8040cb9e477933562684d3918f7d253f2ff9087fb3e7a3" +checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -1545,17 +1545,17 @@ dependencies = [ name = "monero-address-book" version = "0.1.0" dependencies = [ - "async-trait", "borsh", "cuprate-test-utils", "futures", + "indexmap 2.2.5", "monero-p2p", "monero-pruning", "monero-wire", - "pin-project", "rand", "thiserror", "tokio", + "tokio-util", "tower", "tracing", ] @@ -2864,6 +2864,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", "tracing", ] @@ -2880,7 +2881,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.4", + "indexmap 2.2.5", "toml_datetime", "winnow", ] diff --git a/Cargo.toml b/Cargo.toml index 73a1068..7968c16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ dirs = { version = "5.0.1", default-features = false } futures = { version = "0.3.29", default-features = false } hex = { version = "0.4.3", default-features = false } hex-literal = { version = "0.4", default-features = false } +indexmap = { version = "2.2.5", default-features = false } monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false } multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "347d4cf", default-features = false } paste = { version = "1.0.14", default-features = false } diff --git a/net/epee-encoding/README.md b/net/epee-encoding/README.md index 89c336d..00502a7 100644 --- a/net/epee-encoding/README.md +++ b/net/epee-encoding/README.md @@ -1,202 +1,6 @@ # Epee Encoding -- [What](#what) -- [Features](#features) -- [Usage](#usage) -- [Derive Attributes](#derive-attributes) -- [No std](#no-std) -- [Options](#options) - -## What This crate implements the epee binary format found in Monero; unlike other crates, this one does not use serde, this is not because serde is bad but its to reduce the load on maintainers as all the traits in this lib are specific to epee instead of general purpose. - -## Features - -### Default - -The default feature enables the [derive](#derive) feature. - -### Derive - -This feature enables the derive macro for creating epee objects for example: - -```rust -use epee_encoding::EpeeObject; -#[derive(EpeeObject)] -struct Test { - val: u8 -} -``` - -## Usage - -### example without derive: -```rust -use epee_encoding::{EpeeObject, EpeeObjectBuilder, read_epee_value, write_field, to_bytes, from_bytes}; -use epee_encoding::io::{Read, Write}; - -pub struct Test { - val: u64 -} - -#[derive(Default)] -pub struct __TestEpeeBuilder { - val: Option, -} - -impl EpeeObjectBuilder for __TestEpeeBuilder { - fn add_field(&mut self, name: &str, r: &mut R) -> epee_encoding::error::Result { - match name { - "val" => {self.val = Some(read_epee_value(r)?);} - _ => return Ok(false), - } - Ok(true) - } - - fn finish(self) -> epee_encoding::error::Result { - Ok( - Test { - val: self.val.ok_or_else(|| epee_encoding::error::Error::Format("Required field was not found!"))? - } - ) - } -} - -impl EpeeObject for Test { - type Builder = __TestEpeeBuilder; - - fn number_of_fields(&self) -> u64 { - 1 - } - - fn write_fields(&self, w: &mut W) -> epee_encoding::error::Result<()> { - // write the fields - write_field(&self.val, "val", w) - } -} - - -let data = [1, 17, 1, 1, 1, 1, 2, 1, 1, 4, 3, 118, 97, 108, 5, 4, 0, 0, 0, 0, 0, 0, 0]; // the data to decode; -let val: Test = from_bytes(&data).unwrap(); -let data = to_bytes(&val).unwrap(); - - -``` - -### example with derive: -```rust -use epee_encoding::{EpeeObject, from_bytes, to_bytes}; - -#[derive(EpeeObject)] -struct Test { - val: u64 -} - - -let data = [1, 17, 1, 1, 1, 1, 2, 1, 1, 4, 3, 118, 97, 108, 5, 4, 0, 0, 0, 0, 0, 0, 0]; // the data to decode; -let val: Test = from_bytes(&data).unwrap(); -let data = to_bytes(&val).unwrap(); - -``` - -## Derive Attributes - -The `EpeeObject` derive macro has a few attributes which correspond to specific C/C++ macro fields. - -- [epee_flatten](#epeeflatten) -- [epee_alt_name](#epeealtname) -- [epee_default](#epeedefault) - -### epee_flatten - -This is equivalent to `KV_SERIALIZE_PARENT`, it flattens all the fields in the object into the parent object. - -so this in C/C++: -```cpp -struct request_t: public rpc_request_base - { - uint8_t major_version; - - BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE_PARENT(rpc_request_base) - KV_SERIALIZE(major_version) - END_KV_SERIALIZE_MAP() - }; -``` -Would look like this in Rust: -```rust -#[derive(EpeeObject)] -struct RequestT { - #[epee_flatten] - rpc_request_base: RequestBase, - major_version: u8, -} -``` - -### epee_alt_name - -This allows you to re-name a field for when its encoded, although this isn't related to a specific macro in -C/C++ this was included because Monero has [some odd names](https://github.com/monero-project/monero/blob/0a1eaf26f9dd6b762c2582ee12603b2a4671c735/src/cryptonote_protocol/cryptonote_protocol_defs.h#L199). - -example: -```rust -#[derive(EpeeObject)] -pub struct HandshakeR { - #[epee_alt_name("node_data")] - pub node_data: BasicNodeData, -} -``` - -### epee_default - -This is equivalent to `KV_SERIALIZE_OPT` and allows you to specify a default value for a field, when a default value -is specified the value will be used if it is not contained in the data and the field will not be encoded if the value is -the default value. - -so this in C/C++: -```cpp -struct request_t -{ - std::vector txs; - std::string _; // padding - bool dandelionpp_fluff; //zero initialization defaults to stem mode - - BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE(txs) - KV_SERIALIZE(_) - KV_SERIALIZE_OPT(dandelionpp_fluff, true) // backwards compatible mode is fluff - END_KV_SERIALIZE_MAP() -}; -``` - -would look like this in Rust: -```rust -#[derive(EpeeObject)] -struct RequestT { - txs: Vec>, - #[epee_alt_name("_")] - padding: Vec, - #[epee_default(true)] - dandelionpp_fluff: bool, -} -``` - -## No std - -This crate is no-std. - -## Options - -To have an optional field, you should wrap the type in `Option` and use the `epee_default` attribute. -So it would look like this: - -```rust -#[derive(EpeeObject)] -struct T { - #[epee_default(None)] - val: Option, -} -``` \ No newline at end of file diff --git a/net/epee-encoding/src/lib.rs b/net/epee-encoding/src/lib.rs index 1960f28..ffb0a1e 100644 --- a/net/epee-encoding/src/lib.rs +++ b/net/epee-encoding/src/lib.rs @@ -4,6 +4,8 @@ //! This library contains the Epee binary format found in Monero, unlike other //! crates this crate does not use serde. //! +//! See [`epee_object`] for how to easily implement [`EpeeObject`] for your types. +//! //! example without macro: //! ```rust //! # use epee_encoding::{EpeeObject, EpeeObjectBuilder, read_epee_value, write_field, to_bytes, from_bytes}; @@ -56,33 +58,6 @@ //! //! //! ``` -//! -//! example with macro: -//! ```rust -//! use epee_encoding::{from_bytes, to_bytes}; -//! -//! // TODO: open an issue documenting why you need to do this here -//! // like this: https://github.com/Boog900/epee-encoding/issues/1 -//! mod i_64079 { -//! use epee_encoding::epee_object; -//! -//! pub struct Test2 { -//! val: u64 -//! } -//! -//! epee_object!( -//! Test2, -//! val: u64, -//! ); -//! } -//! use i_64079::*; -//! -//! -//! let data = [1, 17, 1, 1, 1, 1, 2, 1, 1, 4, 3, 118, 97, 108, 5, 4, 0, 0, 0, 0, 0, 0, 0]; // the data to decode; -//! let val: Test2 = from_bytes(&mut data.as_slice()).unwrap(); -//! let data = to_bytes(val).unwrap(); -//! -//! ``` extern crate alloc; diff --git a/net/epee-encoding/src/macros.rs b/net/epee-encoding/src/macros.rs index de89fc3..1ec0446 100644 --- a/net/epee-encoding/src/macros.rs +++ b/net/epee-encoding/src/macros.rs @@ -1,42 +1,129 @@ pub use bytes; pub use paste::paste; -#[macro_export] -macro_rules! field_name { - ($field: tt, $alt_name: tt) => { - $alt_name - }; - ($field: ident,) => { - stringify!($field) - }; -} - -#[macro_export] -macro_rules! field_ty { - ($ty:ty, $ty_as:ty) => { - $ty_as - }; - ($ty:ty,) => { - $ty - }; -} - -#[macro_export] -macro_rules! try_right_then_left { - ($a:expr, $b:expr) => { - $b - }; - ($a:expr,) => { - $a - }; -} - +/// Macro to derive [`EpeeObject`](crate::EpeeObject) for structs. +/// +/// ### Basic Usage: +/// +/// ```rust +/// // mod visibility is here because of Rust visibility weirdness, you shouldn't need this unless defined in a function. +/// // see: +/// mod visibility { +/// +/// use epee_encoding::epee_object; +/// +/// struct Example { +/// a: u8 +/// } +/// +/// epee_object!( +/// Example, +/// a: u8, +/// ); +/// } +/// ``` +/// +/// ### Advanced Usage: +/// +/// ```rust +/// // mod visibility is here because of Rust visibility weirdness, you shouldn't need this unless defined in a function. +/// // see: +/// mod visibility { +/// +/// use epee_encoding::epee_object; +/// +/// struct Example { +/// a: u8, +/// b: u8, +/// c: u8, +/// d: u8, +/// e_f: Example2 +/// } +/// +/// struct Example2 { +/// e: u8 +/// } +/// +/// epee_object!( +/// Example2, +/// e: u8, +/// ); +/// +/// epee_object!( +/// Example, +/// // `("ALT-NAME")` changes the name of the field in the encoded data. +/// a("A"): u8, +/// // `= VALUE` sets a default value that this field will be set to if not in the data +/// // when encoding this field will be skipped if equal to the default. +/// b: u8 = 0, +/// // `as ALT-TYPE` encodes the data using the alt type, the alt type must impl Into and From<&Type> +/// c: u8 as u8, +/// // `=> read_fn, write_fn, should_write_fn,` allows you to specify alt field encoding functions. +/// // for the required args see the default functions, which are used here: +/// d: u8 => epee_encoding::read_epee_value, epee_encoding::write_field, ::should_write, +/// // `!flatten` can be used on fields which are epee objects, and it flattens the fields of that object into this object. +/// // So for this example `e_f` will not appear in the data but e will. +/// // You can't use the other options with this. +/// !flatten: e_f: Example2, +/// ); +/// } +/// ``` +/// +/// #[macro_export] macro_rules! epee_object { + // ------------------------------------------------------------------------ internal_try_right_then_left + // All this does is return the second (right) arg if present otherwise the left is returned. + ( + @internal_try_right_then_left + $a:expr, $b:expr + ) => { + $b + }; + + ( + @internal_try_right_then_left + $a:expr, + ) => { + $a + }; + + // ------------------------------------------------------------------------ internal_field_name + // Returns the alt_name if present otherwise stringifies the field ident. + ( + @internal_field_name + $field: tt, $alt_name: tt + ) => { + $alt_name + }; + + ( + @internal_field_name + $field: ident, + ) => { + stringify!($field) + }; + + // ------------------------------------------------------------------------ internal_field_type + // All this does is return the second (right) arg if present otherwise the left is returned. + ( + @internal_field_type + $ty:ty, $ty_as:ty + ) => { + $ty_as + }; + ( + @internal_field_type + $ty:ty, + ) => { + $ty + }; + + // ------------------------------------------------------------------------ Entry Point ( $obj:ident, - $($field: ident $(($alt_name: literal))?: $ty:ty $(as $ty_as:ty )? $(= $default:expr)? $(=> $read_fn:expr, $write_fn:expr, $should_write_fn:expr)?, )+ - $(!flatten: $($flat_field: ident: $flat_ty:ty ,)+)? + $($field: ident $(($alt_name: literal))?: $ty:ty $(as $ty_as:ty )? $(= $default:expr)? $(=> $read_fn:expr, $write_fn:expr, $should_write_fn:expr)?, )* + $(!flatten: $flat_field: ident: $flat_ty:ty ,)* ) => { epee_encoding::macros::paste!( @@ -46,27 +133,26 @@ macro_rules! epee_object { #[derive(Default)] pub struct [<__Builder $obj>] { - $($field: Option,)+ - $($($flat_field: <$flat_ty as epee_encoding::EpeeObject>::Builder,)+)? + $($field: Option,)* + $($flat_field: <$flat_ty as epee_encoding::EpeeObject>::Builder,)* } impl epee_encoding::EpeeObjectBuilder<$obj> for [<__Builder $obj>] { fn add_field(&mut self, name: &str, b: &mut B) -> epee_encoding::error::Result { match name { - $(epee_encoding::field_name!($field, $($alt_name)?) => { + $(epee_encoding::epee_object!(@internal_field_name $field, $($alt_name)?) => { if core::mem::replace(&mut self.$field, Some( - epee_encoding::try_right_then_left!(epee_encoding::read_epee_value(b)?, $($read_fn(b)?)?) + epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::read_epee_value(b)?, $($read_fn(b)?)?) )).is_some() { - Err(epee_encoding::error::Error::Value(format!("Duplicate field in data: {}", epee_encoding::field_name!($field, $($alt_name)?))))?; + Err(epee_encoding::error::Error::Value(format!("Duplicate field in data: {}", epee_encoding::epee_object!(@internal_field_name$field, $($alt_name)?))))?; } Ok(true) - },)+ + },)* _ => { - $( - $( if self.$flat_field.add_field(name, b)? { - return Ok(true); - })+ - )? + + $(if self.$flat_field.add_field(name, b)? { + return Ok(true); + })* Ok(false) } @@ -78,7 +164,7 @@ macro_rules! epee_object { $obj { $( $field: { - let epee_default_value = epee_encoding::try_right_then_left!(epee_encoding::EpeeValue::epee_default_value(), $({ + let epee_default_value = epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::EpeeValue::epee_default_value(), $({ let _ = $should_write_fn; None })?); @@ -87,15 +173,14 @@ macro_rules! epee_object { $(.or(Some($default)))? .or(epee_default_value) $(.map(<$ty_as>::into))? - .ok_or(epee_encoding::error::Error::Value(format!("Missing field in data: {}", epee_encoding::field_name!($field, $($alt_name)?))))? + .ok_or(epee_encoding::error::Error::Value(format!("Missing field in data: {}", epee_encoding::epee_object!(@internal_field_name$field, $($alt_name)?))))? }, - )+ + )* $( - $( $flat_field: self.$flat_field.finish()?, - )+ - )? + )* + } ) } @@ -109,38 +194,34 @@ macro_rules! epee_object { let mut fields = 0; $( - let field = epee_encoding::try_right_then_left!(&self.$field, $(<&$ty_as>::from(&self.$field))? ); + let field = epee_encoding::epee_object!(@internal_try_right_then_left &self.$field, $(<&$ty_as>::from(&self.$field))? ); - if $((field) != &$default &&)? epee_encoding::try_right_then_left!(epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(field ) + if $((field) != &$default &&)? epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(field ) { fields += 1; } - )+ + )* $( - $( - fields += self.$flat_field.number_of_fields(); - )+ - )? + fields += self.$flat_field.number_of_fields(); + )* fields } fn write_fields(self, w: &mut B) -> epee_encoding::error::Result<()> { $( - let field = epee_encoding::try_right_then_left!(self.$field, $(<$ty_as>::from(self.$field))? ); + let field = epee_encoding::epee_object!(@internal_try_right_then_left self.$field, $(<$ty_as>::from(self.$field))? ); - if $(field != $default &&)? epee_encoding::try_right_then_left!(epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(&field ) + if $(field != $default &&)? epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::EpeeValue::should_write, $($should_write_fn)?)(&field ) { - epee_encoding::try_right_then_left!(epee_encoding::write_field, $($write_fn)?)((field), epee_encoding::field_name!($field, $($alt_name)?), w)?; + epee_encoding::epee_object!(@internal_try_right_then_left epee_encoding::write_field, $($write_fn)?)((field), epee_encoding::epee_object!(@internal_field_name$field, $($alt_name)?), w)?; } - )+ + )* $( - $( self.$flat_field.write_fields(w)?; - )+ - )? + )* Ok(()) } diff --git a/net/epee-encoding/tests/flattened.rs b/net/epee-encoding/tests/flattened.rs index fafcd4a..ef92e5e 100644 --- a/net/epee-encoding/tests/flattened.rs +++ b/net/epee-encoding/tests/flattened.rs @@ -86,9 +86,8 @@ struct Parent12 { epee_object!( Parent12, h: f64, - !flatten: - child1: Child1, - child2: Child2, + !flatten: child1: Child1, + !flatten: child2: Child2, ); #[test] diff --git a/net/levin/src/message.rs b/net/levin/src/message.rs index dd60fdd..af8227d 100644 --- a/net/levin/src/message.rs +++ b/net/levin/src/message.rs @@ -106,7 +106,7 @@ pub fn make_fragmented_messages( new_body.resize(fragment_size - HEADER_SIZE, 0); bucket.body = new_body.freeze(); - bucket.header.size = fragment_size + bucket.header.size = (fragment_size - HEADER_SIZE) .try_into() .expect("Bucket size does not fit into u64"); } diff --git a/net/levin/tests/fragmented_message.rs b/net/levin/tests/fragmented_message.rs index 7598e2c..45ff3a0 100644 --- a/net/levin/tests/fragmented_message.rs +++ b/net/levin/tests/fragmented_message.rs @@ -124,7 +124,8 @@ proptest! { let len = fragments.len(); for (i, fragment) in fragments.into_iter().enumerate() { - prop_assert_eq!(fragment.body.len() + 33, fragment_size, "numb_fragments:{}, index: {}", len, i) + prop_assert_eq!(fragment.body.len() + 33, fragment_size, "numb_fragments:{}, index: {}", len, i); + prop_assert_eq!(fragment.header.size + 33, fragment_size as u64); } } diff --git a/net/monero-wire/src/p2p.rs b/net/monero-wire/src/p2p.rs index 31356c5..7a2b7de 100644 --- a/net/monero-wire/src/p2p.rs +++ b/net/monero-wire/src/p2p.rs @@ -16,11 +16,14 @@ //! This module defines a Monero `Message` enum which contains //! every possible Monero network message (levin body) -use bytes::{Buf, Bytes, BytesMut}; +use std::fmt::Formatter; + +use bytes::{Buf, BytesMut}; + +use epee_encoding::epee_object; use levin_cuprate::{ BucketBuilder, BucketError, LevinBody, LevinCommand as LevinCommandTrait, MessageType, }; -use std::fmt::Formatter; pub mod admin; pub mod common; @@ -276,8 +279,18 @@ impl RequestMessage { Ok(match command { C::Handshake => decode_message(RequestMessage::Handshake, buf)?, C::TimedSync => decode_message(RequestMessage::TimedSync, buf)?, - C::Ping => RequestMessage::Ping, - C::SupportFlags => RequestMessage::SupportFlags, + C::Ping => { + epee_encoding::from_bytes::(buf) + .map_err(|e| BucketError::BodyDecodingError(e.into()))?; + + RequestMessage::Ping + } + C::SupportFlags => { + epee_encoding::from_bytes::(buf) + .map_err(|e| BucketError::BodyDecodingError(e.into()))?; + + RequestMessage::SupportFlags + } _ => return Err(BucketError::UnknownCommand), }) } @@ -288,14 +301,8 @@ impl RequestMessage { match self { RequestMessage::Handshake(val) => build_message(C::Handshake, val, builder)?, RequestMessage::TimedSync(val) => build_message(C::TimedSync, val, builder)?, - RequestMessage::Ping => { - builder.set_command(C::Ping); - builder.set_body(Bytes::new()); - } - RequestMessage::SupportFlags => { - builder.set_command(C::SupportFlags); - builder.set_body(Bytes::new()); - } + RequestMessage::Ping => build_message(C::Ping, EmptyMessage, builder)?, + RequestMessage::SupportFlags => build_message(C::SupportFlags, EmptyMessage, builder)?, } Ok(()) } @@ -408,3 +415,13 @@ impl LevinBody for Message { } } } + +/// An internal empty message. +/// +/// This represents P2P messages that have no fields as epee's binary format will still add a header +/// for these objects, so we need to decode/encode a message. +struct EmptyMessage; + +epee_object! { + EmptyMessage, +} diff --git a/net/monero-wire/src/p2p/admin.rs b/net/monero-wire/src/p2p/admin.rs index d31ffdf..95d2f1b 100644 --- a/net/monero-wire/src/p2p/admin.rs +++ b/net/monero-wire/src/p2p/admin.rs @@ -84,7 +84,7 @@ epee_object!( ); /// The status field of an okay ping response -pub static PING_OK_RESPONSE_STATUS_TEXT: Bytes = Bytes::from_static("OK".as_bytes()); +pub const PING_OK_RESPONSE_STATUS_TEXT: Bytes = Bytes::from_static("OK".as_bytes()); /// A Ping Response #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/p2p/address-book/Cargo.toml b/p2p/address-book/Cargo.toml index 9ebbf62..e60ec61 100644 --- a/p2p/address-book/Cargo.toml +++ b/p2p/address-book/Cargo.toml @@ -13,13 +13,13 @@ monero-p2p = { path = "../monero-p2p" } tower = { workspace = true, features = ["util", "buffer"] } tokio = { workspace = true, features = ["time", "fs", "rt"]} +tokio-util = { workspace = true, features = ["time"] } futures = { workspace = true, features = ["std"] } -pin-project = { workspace = true } -async-trait = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true, features = ["std", "attributes"] } +indexmap = { workspace = true, features = ["std"] } rand = { workspace = true, features = ["std", "std_rng"] } diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs index 211baca..4c3a773 100644 --- a/p2p/address-book/src/book.rs +++ b/p2p/address-book/src/book.rs @@ -1,22 +1,22 @@ +//! The address book service. +//! +//! This module holds the address book service for a specific network zone. use std::{ collections::{HashMap, HashSet}, - future::Future, panic, - pin::Pin, task::{Context, Poll}, time::Duration, }; use futures::{ future::{ready, Ready}, - stream::FuturesUnordered, - FutureExt, StreamExt, + FutureExt, }; -use pin_project::pin_project; use tokio::{ task::JoinHandle, - time::{interval, sleep, Interval, MissedTickBehavior, Sleep}, + time::{interval, Instant, Interval, MissedTickBehavior}, }; +use tokio_util::time::DelayQueue; use tower::Service; use monero_p2p::{ @@ -27,7 +27,7 @@ use monero_p2p::{ }; use monero_pruning::PruningSeed; -use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookError, Config}; +use crate::{peer_list::PeerList, store::save_peers_to_disk, AddressBookConfig, AddressBookError}; #[cfg(test)] mod tests; @@ -45,21 +45,6 @@ pub struct ConnectionPeerEntry { rpc_credits_per_hash: u32, } -/// A future that resolves when a peer is unbanned. -#[pin_project(project = EnumProj)] -pub struct BanedPeerFut(Addr::BanID, #[pin] Sleep); - -impl Future for BanedPeerFut { - type Output = Addr::BanID; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - match this.1.poll_unpin(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(*this.0), - } - } -} - pub struct AddressBook { /// Our white peers - the peers we have previously connected to. white_list: PeerList, @@ -71,20 +56,19 @@ pub struct AddressBook { /// The currently connected peers. connected_peers: HashMap, ConnectionPeerEntry>, connected_peers_ban_id: HashMap<::BanID, HashSet>, - /// The currently banned peers - banned_peers: HashSet<::BanID>, - banned_peers_fut: FuturesUnordered>, + banned_peers: HashMap<::BanID, Instant>, + banned_peers_queue: DelayQueue<::BanID>, peer_save_task_handle: Option>>, peer_save_interval: Interval, - cfg: Config, + cfg: AddressBookConfig, } impl AddressBook { pub fn new( - cfg: Config, + cfg: AddressBookConfig, white_peers: Vec>, gray_peers: Vec>, anchor_peers: Vec, @@ -94,8 +78,9 @@ impl AddressBook { let anchor_list = HashSet::from_iter(anchor_peers); // TODO: persist banned peers - let banned_peers = HashSet::new(); - let banned_peers_fut = FuturesUnordered::new(); + let banned_peers = HashMap::new(); + let banned_peers_queue = DelayQueue::new(); + let connected_peers = HashMap::new(); let mut peer_save_interval = interval(cfg.peer_save_period); @@ -108,7 +93,7 @@ impl AddressBook { connected_peers, connected_peers_ban_id: HashMap::new(), banned_peers, - banned_peers_fut, + banned_peers_queue, peer_save_task_handle: None, peer_save_interval, cfg, @@ -146,8 +131,9 @@ impl AddressBook { } fn poll_unban_peers(&mut self, cx: &mut Context<'_>) { - while let Poll::Ready(Some(ban_id)) = self.banned_peers_fut.poll_next_unpin(cx) { - self.banned_peers.remove(&ban_id); + while let Poll::Ready(Some(ban_id)) = self.banned_peers_queue.poll_expired(cx) { + tracing::debug!("Host {:?} is unbanned, ban has expired.", ban_id.get_ref(),); + self.banned_peers.remove(ban_id.get_ref()); } } @@ -198,8 +184,8 @@ impl AddressBook { } fn ban_peer(&mut self, addr: Z::Addr, time: Duration) { - if self.banned_peers.contains(&addr.ban_id()) { - return; + if self.banned_peers.contains_key(&addr.ban_id()) { + tracing::error!("Tried to ban peer twice, this shouldn't happen.") } if let Some(connected_peers_with_ban_id) = self.connected_peers_ban_id.get(&addr.ban_id()) { @@ -220,17 +206,20 @@ impl AddressBook { self.white_list.remove_peers_with_ban_id(&addr.ban_id()); self.gray_list.remove_peers_with_ban_id(&addr.ban_id()); - self.banned_peers.insert(addr.ban_id()); - self.banned_peers_fut - .push(BanedPeerFut(addr.ban_id(), sleep(time))) + let unban_at = Instant::now() + time; + + self.banned_peers_queue.insert_at(addr.ban_id(), unban_at); + self.banned_peers.insert(addr.ban_id(), unban_at); } /// adds a peer to the gray list. fn add_peer_to_gray_list(&mut self, mut peer: ZoneSpecificPeerListEntryBase) { if self.white_list.contains_peer(&peer.adr) { + tracing::trace!("Peer {} is already in white list skipping.", peer.adr); return; }; if !self.gray_list.contains_peer(&peer.adr) { + tracing::trace!("Adding peer {} to gray list.", peer.adr); peer.last_seen = 0; self.gray_list.add_new_peer(peer); } @@ -238,7 +227,7 @@ impl AddressBook { /// Checks if a peer is banned. fn is_peer_banned(&self, peer: &Z::Addr) -> bool { - self.banned_peers.contains(&peer.ban_id()) + self.banned_peers.contains_key(&peer.ban_id()) } fn handle_incoming_peer_list( @@ -264,24 +253,22 @@ impl AddressBook { .reduce_list(&HashSet::new(), self.cfg.max_gray_list_length); } - fn get_random_white_peer( - &self, + fn take_random_white_peer( + &mut self, block_needed: Option, ) -> Option> { tracing::debug!("Retrieving random white peer"); self.white_list - .get_random_peer(&mut rand::thread_rng(), block_needed) - .copied() + .take_random_peer(&mut rand::thread_rng(), block_needed) } - fn get_random_gray_peer( - &self, + fn take_random_gray_peer( + &mut self, block_needed: Option, ) -> Option> { tracing::debug!("Retrieving random gray peer"); self.gray_list - .get_random_peer(&mut rand::thread_rng(), block_needed) - .copied() + .take_random_peer(&mut rand::thread_rng(), block_needed) } fn get_white_peers(&self, len: usize) -> Vec> { @@ -341,7 +328,7 @@ impl AddressBook { if self.is_peer_banned(addr) { return Err(AddressBookError::PeerIsBanned); } - // although the peer may not be readable still add it to the connected peers with ban ID. + // although the peer may not be reachable still add it to the connected peers with ban ID. self.connected_peers_ban_id .entry(addr.ban_id()) .or_default() @@ -350,13 +337,11 @@ impl AddressBook { // if the address is Some that means we can reach it from our node. if let Some(addr) = peer.addr { - // remove the peer from the gray list as we know it's active. - self.gray_list.remove_peer(&addr); // The peer is reachable, update our white list and add it to the anchor connections. self.update_white_list_peer_entry(&peer)?; + self.anchor_list.insert(addr); self.white_list .reduce_list(&self.anchor_list, self.cfg.max_white_list_length); - self.anchor_list.insert(addr); } self.connected_peers.insert(internal_peer_id, peer); @@ -382,8 +367,8 @@ impl Service> for AddressBook { let response = match req { AddressBookRequest::NewConnection { - addr, internal_peer_id, + public_address, handle, id, pruning_seed, @@ -393,7 +378,7 @@ impl Service> for AddressBook { .handle_new_connection( internal_peer_id, ConnectionPeerEntry { - addr, + addr: public_address, id, handle, pruning_seed, @@ -402,20 +387,21 @@ impl Service> for AddressBook { }, ) .map(|_| AddressBookResponse::Ok), - AddressBookRequest::BanPeer(addr, time) => { - self.ban_peer(addr, time); - Ok(AddressBookResponse::Ok) - } AddressBookRequest::IncomingPeerList(peer_list) => { self.handle_incoming_peer_list(peer_list); Ok(AddressBookResponse::Ok) } - AddressBookRequest::GetRandomWhitePeer { height } => self - .get_random_white_peer(height) + AddressBookRequest::TakeRandomWhitePeer { height } => self + .take_random_white_peer(height) .map(AddressBookResponse::Peer) .ok_or(AddressBookError::PeerNotFound), - AddressBookRequest::GetRandomGrayPeer { height } => self - .get_random_gray_peer(height) + AddressBookRequest::TakeRandomGrayPeer { height } => self + .take_random_gray_peer(height) + .map(AddressBookResponse::Peer) + .ok_or(AddressBookError::PeerNotFound), + AddressBookRequest::TakeRandomPeer { height } => self + .take_random_white_peer(height) + .or_else(|| self.take_random_gray_peer(height)) .map(AddressBookResponse::Peer) .ok_or(AddressBookError::PeerNotFound), AddressBookRequest::GetWhitePeers(len) => { diff --git a/p2p/address-book/src/book/tests.rs b/p2p/address-book/src/book/tests.rs index e7c790d..4e1fd87 100644 --- a/p2p/address-book/src/book/tests.rs +++ b/p2p/address-book/src/book/tests.rs @@ -8,12 +8,12 @@ use monero_p2p::handles::HandleBuilder; use monero_pruning::PruningSeed; use super::{AddressBook, ConnectionPeerEntry, InternalPeerID}; -use crate::{peer_list::tests::make_fake_peer_list, AddressBookError, Config}; +use crate::{peer_list::tests::make_fake_peer_list, AddressBookConfig, AddressBookError}; use cuprate_test_utils::test_netzone::{TestNetZone, TestNetZoneAddr}; -fn test_cfg() -> Config { - Config { +fn test_cfg() -> AddressBookConfig { + AddressBookConfig { max_white_list_length: 100, max_gray_list_length: 500, peer_store_file: PathBuf::new(), @@ -35,7 +35,7 @@ fn make_fake_address_book( connected_peers: Default::default(), connected_peers_ban_id: Default::default(), banned_peers: Default::default(), - banned_peers_fut: Default::default(), + banned_peers_queue: Default::default(), peer_save_task_handle: None, peer_save_interval: interval(Duration::from_secs(60)), cfg: test_cfg(), @@ -43,15 +43,15 @@ fn make_fake_address_book( } #[tokio::test] -async fn get_random_peers() { - let address_book = make_fake_address_book(50, 250); - let peer = address_book.get_random_white_peer(None).unwrap(); - assert!(address_book.white_list.contains_peer(&peer.adr)); +async fn take_random_peers() { + let mut address_book = make_fake_address_book(50, 250); + let peer = address_book.take_random_white_peer(None).unwrap(); + assert!(!address_book.white_list.contains_peer(&peer.adr)); assert!(!address_book.gray_list.contains_peer(&peer.adr)); - let peer = address_book.get_random_gray_peer(None).unwrap(); + let peer = address_book.take_random_gray_peer(None).unwrap(); assert!(!address_book.white_list.contains_peer(&peer.adr)); - assert!(address_book.gray_list.contains_peer(&peer.adr)); + assert!(!address_book.gray_list.contains_peer(&peer.adr)); } #[tokio::test] @@ -81,7 +81,7 @@ async fn add_new_peer_already_connected() { let semaphore = Arc::new(Semaphore::new(10)); - let (_, handle, _) = HandleBuilder::default() + let (_, handle) = HandleBuilder::default() .with_permit(semaphore.clone().try_acquire_owned().unwrap()) .build(); @@ -99,7 +99,7 @@ async fn add_new_peer_already_connected() { ) .unwrap(); - let (_, handle, _) = HandleBuilder::default() + let (_, handle) = HandleBuilder::default() .with_permit(semaphore.try_acquire_owned().unwrap()) .build(); @@ -143,7 +143,12 @@ async fn banned_peer_removed_from_peer_lists() { assert_eq!(address_book.white_list.len(), 98); assert_eq!( - address_book.banned_peers_fut.next().await.unwrap(), + address_book + .banned_peers_queue + .next() + .await + .unwrap() + .into_inner(), TestNetZoneAddr(1) ) } diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index cc2dc38..ce56b4f 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -3,7 +3,7 @@ //! This module holds the logic for persistent peer storage. //! Cuprates address book is modeled as a [`tower::Service`] //! The request is [`AddressBookRequest`] and the response is -//! [`AddressBookResponse`]. +//! [`AddressBookResponse`](monero_p2p::services::AddressBookResponse). //! //! Cuprate, like monerod, actually has multiple address books, one //! for each [`NetworkZone`]. This is to reduce the possibility of @@ -11,24 +11,31 @@ //! and so peers will only get told about peers they can //! connect to. //! +use std::{io::ErrorKind, path::PathBuf, time::Duration}; -use std::{path::PathBuf, time::Duration}; +use tower::buffer::Buffer; -use monero_p2p::{ - services::{AddressBookRequest, AddressBookResponse}, - NetworkZone, -}; +use monero_p2p::{services::AddressBookRequest, NetworkZone}; mod book; mod peer_list; mod store; +/// The address book config. #[derive(Debug, Clone)] -pub struct Config { - max_white_list_length: usize, - max_gray_list_length: usize, - peer_store_file: PathBuf, - peer_save_period: Duration, +pub struct AddressBookConfig { + /// The maximum number of white peers in the peer list. + /// + /// White peers are peers we have connected to before. + pub max_white_list_length: usize, + /// The maximum number of gray peers in the peer list. + /// + /// Gray peers are peers we are yet to make a connection to. + pub max_gray_list_length: usize, + /// The location to store the address book. + pub peer_store_file: PathBuf, + /// The amount of time between saving the address book to disk. + pub peer_save_period: Duration, } /// Possible errors when dealing with the address book. @@ -41,9 +48,6 @@ pub enum AddressBookError { /// The peer is not in the address book for this zone. #[error("Peer was not found in book")] PeerNotFound, - /// The peer list is empty. - #[error("The peer list is empty")] - PeerListEmpty, /// Immutable peer data was changed. #[error("Immutable peer data was changed: {0}")] PeersDataChanged(&'static str), @@ -58,19 +62,25 @@ pub enum AddressBookError { AddressBookTaskExited, } +/// Initializes the P2P address book for a specific network zone. pub async fn init_address_book( - cfg: Config, -) -> Result< - impl tower::Service< - AddressBookRequest, - Response = AddressBookResponse, - Error = tower::BoxError, - >, - std::io::Error, -> { - let (white_list, gray_list) = store::read_peers_from_disk::(&cfg).await?; + cfg: AddressBookConfig, +) -> Result, AddressBookRequest>, std::io::Error> { + tracing::info!( + "Loading peers from file: {} ", + cfg.peer_store_file.display() + ); + + let (white_list, gray_list) = match store::read_peers_from_disk::(&cfg).await { + Ok(res) => res, + Err(e) if e.kind() == ErrorKind::NotFound => (vec![], vec![]), + Err(e) => { + tracing::error!("Failed to open peer list, {}", e); + panic!("{e}"); + } + }; let address_book = book::AddressBook::::new(cfg, white_list, gray_list, Vec::new()); - Ok(tower::buffer::Buffer::new(address_book, 15)) + Ok(Buffer::new(address_book, 15)) } diff --git a/p2p/address-book/src/peer_list.rs b/p2p/address-book/src/peer_list.rs index dfc67d0..f2c192f 100644 --- a/p2p/address-book/src/peer_list.rs +++ b/p2p/address-book/src/peer_list.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; -use rand::{seq::SliceRandom, Rng}; +use indexmap::IndexMap; +use rand::prelude::*; use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone}; use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT}; @@ -14,7 +15,7 @@ pub mod tests; #[derive(Debug)] pub struct PeerList { /// The peers with their peer data. - pub peers: HashMap>, + pub peers: IndexMap>, /// An index of Pruning seed to address, so can quickly grab peers with the blocks /// we want. /// @@ -31,7 +32,7 @@ pub struct PeerList { impl PeerList { /// Creates a new peer list. pub fn new(list: Vec>) -> PeerList { - let mut peers = HashMap::with_capacity(list.len()); + let mut peers = IndexMap::with_capacity(list.len()); let mut pruning_seeds = BTreeMap::new(); let mut ban_ids = HashMap::with_capacity(list.len()); @@ -78,29 +79,27 @@ impl PeerList { } } - /// Gets a reference to a peer - pub fn get_peer(&self, peer: &Z::Addr) -> Option<&ZoneSpecificPeerListEntryBase> { - self.peers.get(peer) - } - /// Returns a random peer. /// If the pruning seed is specified then we will get a random peer with /// that pruning seed otherwise we will just get a random peer in the whole /// list. - pub fn get_random_peer( - &self, + /// + /// The given peer will be removed from the peer list. + pub fn take_random_peer( + &mut self, r: &mut R, block_needed: Option, - ) -> Option<&ZoneSpecificPeerListEntryBase> { + ) -> Option> { if let Some(needed_height) = block_needed { let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| { // TODO: factor in peer blockchain height? seed.get_next_unpruned_block(needed_height, CRYPTONOTE_MAX_BLOCK_HEIGHT) - .expect("Explain") + .expect("Block needed is higher than max block allowed.") == needed_height })?; let n = r.gen_range(0..addresses_with_block.len()); - self.get_peer(&addresses_with_block[n]) + let peer = addresses_with_block[n]; + self.remove_peer(&peer) } else { let len = self.len(); if len == 0 { @@ -108,7 +107,8 @@ impl PeerList { } else { let n = r.gen_range(0..len); - self.peers.values().nth(n) + let (&key, _) = self.peers.get_index(n).unwrap(); + self.remove_peer(&key) } } } @@ -118,7 +118,10 @@ impl PeerList { r: &mut R, len: usize, ) -> Vec> { - let mut peers = self.peers.values().copied().collect::>(); + let mut peers = self.peers.values().copied().choose_multiple(r, len); + // Order of the returned peers is not random, I am unsure of the impact of this, potentially allowing someone to make guesses about which peers + // were connected first. + // So to mitigate this shuffle the result. peers.shuffle(r); peers.drain(len.min(peers.len())..peers.len()); peers @@ -180,7 +183,7 @@ impl PeerList { &mut self, peer: &Z::Addr, ) -> Option> { - let peer_eb = self.peers.remove(peer)?; + let peer_eb = self.peers.swap_remove(peer)?; self.remove_peer_from_all_idxs(&peer_eb); Some(peer_eb) } diff --git a/p2p/address-book/src/peer_list/tests.rs b/p2p/address-book/src/peer_list/tests.rs index c46f6ec..ed9682e 100644 --- a/p2p/address-book/src/peer_list/tests.rs +++ b/p2p/address-book/src/peer_list/tests.rs @@ -86,12 +86,10 @@ fn peer_list_reduce_length_with_peers_we_need() { fn peer_list_remove_specific_peer() { let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); - let peer = *peer_list - .get_random_peer(&mut rand::thread_rng(), None) + let peer = peer_list + .take_random_peer(&mut rand::thread_rng(), None) .unwrap(); - assert!(peer_list.remove_peer(&peer.adr).is_some()); - let pruning_idxs = peer_list.pruning_seeds; let peers = peer_list.peers; @@ -125,7 +123,7 @@ fn peer_list_add_new_peer() { peer_list.add_new_peer(new_peer); assert_eq!(peer_list.len(), 11); - assert_eq!(peer_list.get_peer(&new_peer.adr), Some(&new_peer)); + assert_eq!(peer_list.peers.get(&new_peer.adr), Some(&new_peer)); assert!(peer_list .pruning_seeds .get(&new_peer.pruning_seed) @@ -136,19 +134,22 @@ fn peer_list_add_new_peer() { #[test] fn peer_list_add_existing_peer() { let mut peer_list = make_fake_peer_list(0, 10); - let existing_peer = *peer_list.get_peer(&TestNetZoneAddr(0)).unwrap(); + let existing_peer = *peer_list.peers.get(&TestNetZoneAddr(0)).unwrap(); peer_list.add_new_peer(existing_peer); assert_eq!(peer_list.len(), 10); - assert_eq!(peer_list.get_peer(&existing_peer.adr), Some(&existing_peer)); + assert_eq!( + peer_list.peers.get(&existing_peer.adr), + Some(&existing_peer) + ); } #[test] fn peer_list_get_non_existent_peer() { let peer_list = make_fake_peer_list(0, 10); let non_existent_peer = TestNetZoneAddr(50); - assert_eq!(peer_list.get_peer(&non_existent_peer), None); + assert_eq!(peer_list.peers.get(&non_existent_peer), None); } #[test] @@ -159,7 +160,7 @@ fn peer_list_get_peer_with_block() { peer_list.add_new_peer(make_fake_peer(101, Some(384))); let peer = peer_list - .get_random_peer(&mut r, Some(1)) + .take_random_peer(&mut r, Some(1)) .expect("We just added a peer with the correct seed"); assert!(peer @@ -172,13 +173,10 @@ fn peer_list_get_peer_with_block() { fn peer_list_ban_peers() { let mut peer_list = make_fake_peer_list_with_random_pruning_seeds(100); let peer = peer_list - .get_random_peer(&mut rand::thread_rng(), None) + .take_random_peer(&mut rand::thread_rng(), None) .unwrap(); let ban_id = peer.adr.ban_id(); - assert!(peer_list.contains_peer(&peer.adr)); - assert_ne!(peer_list.ban_ids.get(&ban_id).unwrap().len(), 0); - peer_list.remove_peers_with_ban_id(&ban_id); assert_eq!(peer_list.ban_ids.get(&ban_id), None); for (addr, _) in peer_list.peers { assert_ne!(addr.ban_id(), ban_id); diff --git a/p2p/address-book/src/store.rs b/p2p/address-book/src/store.rs index 690bcfa..c15e0a7 100644 --- a/p2p/address-book/src/store.rs +++ b/p2p/address-book/src/store.rs @@ -5,7 +5,7 @@ use tokio::task::{spawn_blocking, JoinHandle}; use monero_p2p::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone}; -use crate::{peer_list::PeerList, Config}; +use crate::{peer_list::PeerList, AddressBookConfig}; // TODO: store anchor and ban list. @@ -22,7 +22,7 @@ struct DeserPeerDataV1 { } pub fn save_peers_to_disk( - cfg: &Config, + cfg: &AddressBookConfig, white_list: &PeerList, gray_list: &PeerList, ) -> JoinHandle> { @@ -39,7 +39,7 @@ pub fn save_peers_to_disk( } pub async fn read_peers_from_disk( - cfg: &Config, + cfg: &AddressBookConfig, ) -> Result< ( Vec>, diff --git a/p2p/monero-p2p/Cargo.toml b/p2p/monero-p2p/Cargo.toml index 7abd9c8..50202f8 100644 --- a/p2p/monero-p2p/Cargo.toml +++ b/p2p/monero-p2p/Cargo.toml @@ -22,7 +22,7 @@ async-trait = { workspace = true } tower = { workspace = true, features = ["util"] } thiserror = { workspace = true } -tracing = { workspace = true, features = ["std"] } +tracing = { workspace = true, features = ["std", "attributes"] } borsh = { workspace = true, default-features = false, features = ["derive", "std"], optional = true } diff --git a/p2p/monero-p2p/src/client/handshaker.rs b/p2p/monero-p2p/src/client/handshaker.rs index 52f8e2e..bad4882 100644 --- a/p2p/monero-p2p/src/client/handshaker.rs +++ b/p2p/monero-p2p/src/client/handshaker.rs @@ -1,3 +1,9 @@ +//! Handshake Module +//! +//! This module contains a [`HandShaker`] which is a [`Service`] that takes an open connection and attempts +//! to complete a handshake with them. +//! +//! This module also contains a [`ping`] function that can be used to check if an address is reachable. use std::{ future::Future, marker::PhantomData, @@ -13,8 +19,9 @@ use tokio::{ time::{error::Elapsed, timeout}, }; use tower::{Service, ServiceExt}; -use tracing::Instrument; +use tracing::{info_span, instrument, Instrument}; +use monero_pruning::{PruningError, PruningSeed}; use monero_wire::{ admin::{ HandshakeRequest, HandshakeResponse, PingResponse, SupportFlagsResponse, @@ -29,13 +36,32 @@ use crate::{ client::{connection::Connection, Client, InternalPeerID}, handles::HandleBuilder, AddressBook, AddressBookRequest, AddressBookResponse, ConnectionDirection, CoreSyncDataRequest, - CoreSyncDataResponse, CoreSyncSvc, MessageID, NetworkZone, PeerBroadcast, PeerRequestHandler, - SharedError, MAX_PEERS_IN_PEER_LIST_MESSAGE, + CoreSyncDataResponse, CoreSyncSvc, MessageID, NetZoneAddress, NetworkZone, PeerBroadcast, + PeerRequestHandler, SharedError, MAX_PEERS_IN_PEER_LIST_MESSAGE, }; -const MAX_EAGER_PROTOCOL_MESSAGES: usize = 2; +/// This is a Cuprate specific constant. +/// +/// When completing a handshake monerod might send protocol messages before the handshake is actually +/// complete, this is a problem for Cuprate as we must complete the handshake before responding to any +/// protocol requests. So when we receive a protocol message during a handshake we keep them around to handle +/// after the handshake. +/// +/// Because we use the [bytes crate](https://crates.io/crates/bytes) in monero-wire for zero-copy parsing +/// it is not safe to keep too many of these messages around for long. +const MAX_EAGER_PROTOCOL_MESSAGES: usize = 1; +/// The time given to complete a handshake before the handshake fails. const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(120); +/// A timeout put on pings during handshakes. +/// +/// When we receive an inbound connection we open an outbound connection to the node and send a ping message +/// to see if we can reach the node, so we can add it to our address book. +/// +/// This timeout must be significantly shorter than [`HANDSHAKE_TIMEOUT`] so we don't drop inbound connections that +/// don't have ports open. +const PING_TIMEOUT: Duration = Duration::from_secs(10); + #[derive(Debug, thiserror::Error)] pub enum HandshakeError { #[error("The handshake timed out")] @@ -48,6 +74,8 @@ pub enum HandshakeError { PeerSentIncorrectPeerList(#[from] crate::services::PeerListConversionError), #[error("Peer sent invalid message: {0}")] PeerSentInvalidMessage(&'static str), + #[error("The peers pruning seed is invalid.")] + InvalidPruningSeed(#[from] PruningError), #[error("Levin bucket error: {0}")] LevinBucketError(#[from] BucketError), #[error("Internal service error: {0}")] @@ -56,28 +84,42 @@ pub enum HandshakeError { IO(#[from] std::io::Error), } +/// A request to complete a handshake. pub struct DoHandshakeRequest { + /// The [`InternalPeerID`] of the peer we are handshaking with. pub addr: InternalPeerID, + /// The receiving side of the connection. pub peer_stream: Z::Stream, + /// The sending side of the connection. pub peer_sink: Z::Sink, + /// The direction of the connection. pub direction: ConnectionDirection, + /// A permit for this connection. pub permit: OwnedSemaphorePermit, } +/// The peer handshaking service. #[derive(Debug, Clone)] pub struct HandShaker { + /// The address book service. address_book: AdrBook, + /// The core sync data service. core_sync_svc: CSync, + /// The peer request handler service. peer_request_svc: ReqHdlr, + /// Our [`BasicNodeData`] our_basic_node_data: BasicNodeData, + /// The channel to broadcast messages to all peers created with this handshaker. broadcast_tx: broadcast::Sender>, + /// The network zone. _zone: PhantomData, } impl HandShaker { + /// Creates a new handshaker. pub fn new( address_book: AdrBook, core_sync_svc: CSync, @@ -122,7 +164,7 @@ where let core_sync_svc = self.core_sync_svc.clone(); let our_basic_node_data = self.our_basic_node_data.clone(); - let span = tracing::info_span!(parent: &tracing::Span::current(), "handshaker", %req.addr); + let span = info_span!(parent: &tracing::Span::current(), "handshaker", addr=%req.addr); async move { timeout( @@ -143,6 +185,46 @@ where } } +/// Send a ping to the requested peer and wait for a response, returning the `peer_id`. +/// +/// This function does not put a timeout on the ping. +pub async fn ping(addr: N::Addr) -> Result { + tracing::debug!("Sending Ping to peer"); + + let (mut peer_stream, mut peer_sink) = N::connect_to_peer(addr).await?; + + tracing::debug!("Made outbound connection to peer, sending ping."); + + peer_sink + .send(Message::Request(RequestMessage::Ping).into()) + .await?; + + if let Some(res) = peer_stream.next().await { + if let Message::Response(ResponseMessage::Ping(ping)) = res? { + if ping.status == PING_OK_RESPONSE_STATUS_TEXT { + tracing::debug!("Ping successful."); + return Ok(ping.peer_id); + } + + tracing::debug!("Peer's ping response was not `OK`."); + return Err(HandshakeError::PeerSentInvalidMessage( + "Ping response was not `OK`", + )); + } + + tracing::debug!("Peer sent invalid response to ping."); + return Err(HandshakeError::PeerSentInvalidMessage( + "Peer did not send correct response for ping.", + )); + } + + tracing::debug!("Connection closed before ping response."); + Err(BucketError::IO(std::io::Error::new( + std::io::ErrorKind::ConnectionAborted, + "The peer stream returned None", + )))? +} + /// This function completes a handshake with the requested peer. async fn handshake( req: DoHandshakeRequest, @@ -167,11 +249,13 @@ where permit, } = req; + // A list of protocol messages the peer has sent during the handshake for us to handle after the handshake. + // see: [`MAX_EAGER_PROTOCOL_MESSAGES`] let mut eager_protocol_messages = Vec::new(); - let mut allow_support_flag_req = true; let (peer_core_sync, mut peer_node_data) = match direction { ConnectionDirection::InBound => { + // Inbound handshake the peer sends the request. tracing::debug!("waiting for handshake request."); let Message::Request(RequestMessage::Handshake(handshake_req)) = wait_for_message::( @@ -180,8 +264,7 @@ where &mut peer_sink, &mut peer_stream, &mut eager_protocol_messages, - &mut allow_support_flag_req, - our_basic_node_data.support_flags, + &our_basic_node_data, ) .await? else { @@ -189,10 +272,11 @@ where }; tracing::debug!("Received handshake request."); - + // We will respond to the handshake request later. (handshake_req.payload_data, handshake_req.node_data) } ConnectionDirection::OutBound => { + // Outbound handshake, we send the request. send_hs_request::( &mut peer_sink, &mut core_sync_svc, @@ -200,6 +284,7 @@ where ) .await?; + // Wait for the handshake response. let Message::Response(ResponseMessage::Handshake(handshake_res)) = wait_for_message::( LevinCommand::Handshake, @@ -207,8 +292,7 @@ where &mut peer_sink, &mut peer_stream, &mut eager_protocol_messages, - &mut allow_support_flag_req, - our_basic_node_data.support_flags, + &our_basic_node_data, ) .await? else { @@ -228,6 +312,7 @@ where handshake_res.local_peerlist_new.len() ); + // Tell our address book about the new peers. address_book .ready() .await? @@ -252,6 +337,10 @@ where return Err(HandshakeError::PeerHasSameNodeID); } + /* + // monerod sends a request for support flags if the peer doesn't specify any but this seems unnecessary + // as the peer should specify them in the handshake. + if peer_node_data.support_flags.is_empty() { tracing::debug!( "Peer didn't send support flags or has no features, sending request to make sure." @@ -267,8 +356,7 @@ where &mut peer_sink, &mut peer_stream, &mut eager_protocol_messages, - &mut allow_support_flag_req, - our_basic_node_data.support_flags, + &our_basic_node_data, ) .await? else { @@ -279,28 +367,75 @@ where peer_node_data.support_flags = support_flags_res.support_flags; } - if direction == ConnectionDirection::InBound { - send_hs_response::( - &mut peer_sink, - &mut core_sync_svc, - &mut address_book, - our_basic_node_data, - ) - .await?; - } + */ + // Make sure the pruning seed is valid. + let pruning_seed = PruningSeed::decompress_p2p_rules(peer_core_sync.pruning_seed)?; + + // public_address, if Some, is the reachable address of the node. + let public_address = 'check_out_addr: { + match direction { + ConnectionDirection::InBound => { + // First send the handshake response. + send_hs_response::( + &mut peer_sink, + &mut core_sync_svc, + &mut address_book, + our_basic_node_data, + ) + .await?; + + // Now if the peer specifies a reachable port, open a connection and ping them to check. + if peer_node_data.my_port != 0 { + let InternalPeerID::KnownAddr(mut outbound_address) = addr else { + // Anonymity network, we don't know the inbound address. + break 'check_out_addr None; + }; + + // u32 does not make sense as a port so just truncate it. + outbound_address.set_port(peer_node_data.my_port as u16); + + let Ok(Ok(ping_peer_id)) = timeout( + PING_TIMEOUT, + ping::(outbound_address).instrument(info_span!("ping")), + ) + .await + else { + // The ping was not successful. + break 'check_out_addr None; + }; + + // Make sure we are talking to the right node. + if ping_peer_id == peer_node_data.peer_id { + break 'check_out_addr Some(outbound_address); + } + } + // The peer did not specify a reachable port or the ping was not successful. + None + } + ConnectionDirection::OutBound => { + let InternalPeerID::KnownAddr(outbound_addr) = addr else { + unreachable!("How could we make an outbound connection to an unknown address"); + }; + + // This is an outbound connection, this address is obviously reachable. + Some(outbound_addr) + } + } + }; + + // Tell the core sync service about the new peer. core_sync_svc .ready() .await? - .call(CoreSyncDataRequest::HandleIncoming(peer_core_sync)) + .call(CoreSyncDataRequest::HandleIncoming(peer_core_sync.clone())) .await?; tracing::debug!("Handshake complete."); + // Set up the connection data. let error_slot = SharedError::new(); - - let (connection_guard, handle, _) = HandleBuilder::new().with_permit(permit).build(); - + let (connection_guard, handle) = HandleBuilder::new().with_permit(permit).build(); let (connection_tx, client_rx) = mpsc::channel(3); let connection = Connection::::new( @@ -315,12 +450,33 @@ where let connection_handle = tokio::spawn(connection.run(peer_stream.fuse(), eager_protocol_messages)); - let client = Client::::new(addr, handle, connection_tx, connection_handle, error_slot); + let client = Client::::new( + addr, + handle.clone(), + connection_tx, + connection_handle, + error_slot, + ); + + // Tell the address book about the new connection. + address_book + .ready() + .await? + .call(AddressBookRequest::NewConnection { + internal_peer_id: addr, + public_address, + handle, + id: peer_node_data.peer_id, + pruning_seed, + rpc_port: peer_node_data.rpc_port, + rpc_credits_per_hash: peer_node_data.rpc_credits_per_hash, + }) + .await?; Ok(client) } -/// Sends a [`HandshakeRequest`] to the peer. +/// Sends a [`RequestMessage::Handshake`] down the peer sink. async fn send_hs_request( peer_sink: &mut Z::Sink, core_sync_svc: &mut CSync, @@ -352,6 +508,7 @@ where Ok(()) } +/// Sends a [`ResponseMessage::Handshake`] down the peer sink. async fn send_hs_response( peer_sink: &mut Z::Sink, core_sync_svc: &mut CSync, @@ -397,15 +554,25 @@ where Ok(()) } +/// Waits for a message with a specific [`LevinCommand`]. +/// +/// The message needed must not be a protocol message, only request/ response "admin" messages are allowed. +/// +/// `levin_command` is the [`LevinCommand`] you need and `request` is for if the message is a request. async fn wait_for_message( levin_command: LevinCommand, request: bool, + peer_sink: &mut Z::Sink, peer_stream: &mut Z::Stream, + eager_protocol_messages: &mut Vec, - allow_support_flag_req: &mut bool, - support_flags: PeerSupportFlags, + + our_basic_node_data: &BasicNodeData, ) -> Result { + let mut allow_support_flag_req = true; + let mut allow_ping = true; + while let Some(message) = peer_stream.next().await { let message = message?; @@ -431,21 +598,38 @@ async fn wait_for_message( return Ok(Message::Request(req_message)); } - if matches!(req_message, RequestMessage::SupportFlags) { - if !*allow_support_flag_req { - return Err(HandshakeError::PeerSentInvalidMessage( - "Peer sent 2 support flag requests", - )); + match req_message { + RequestMessage::SupportFlags => { + if !allow_support_flag_req { + return Err(HandshakeError::PeerSentInvalidMessage( + "Peer sent 2 support flag requests", + )); + } + send_support_flags::(peer_sink, our_basic_node_data.support_flags) + .await?; + // don't let the peer send more after the first request. + allow_support_flag_req = false; + continue; } - send_support_flags::(peer_sink, support_flags).await?; - // don't let the peer send more after the first request. - *allow_support_flag_req = false; - continue; - } + RequestMessage::Ping => { + if !allow_support_flag_req { + return Err(HandshakeError::PeerSentInvalidMessage( + "Peer sent 2 ping requests", + )); + } - return Err(HandshakeError::PeerSentInvalidMessage( - "Peer sent an admin request before responding to the handshake", - )); + send_ping_response::(peer_sink, our_basic_node_data.peer_id).await?; + + // don't let the peer send more after the first request. + allow_ping = false; + continue; + } + _ => { + return Err(HandshakeError::PeerSentInvalidMessage( + "Peer sent an admin request before responding to the handshake", + )) + } + } } Message::Response(res_message) if !request => { if res_message.command() == levin_command { @@ -470,6 +654,7 @@ async fn wait_for_message( )))? } +/// Sends a [`ResponseMessage::SupportFlags`] down the peer sink. async fn send_support_flags( peer_sink: &mut Z::Sink, support_flags: PeerSupportFlags, @@ -484,3 +669,20 @@ async fn send_support_flags( ) .await?) } + +/// Sends a [`ResponseMessage::Ping`] down the peer sink. +async fn send_ping_response( + peer_sink: &mut Z::Sink, + peer_id: u64, +) -> Result<(), HandshakeError> { + tracing::debug!("Sending ping response."); + Ok(peer_sink + .send( + Message::Response(ResponseMessage::Ping(PingResponse { + status: PING_OK_RESPONSE_STATUS_TEXT, + peer_id, + })) + .into(), + ) + .await?) +} diff --git a/p2p/monero-p2p/src/handles.rs b/p2p/monero-p2p/src/handles.rs index c07a76d..ed8a30d 100644 --- a/p2p/monero-p2p/src/handles.rs +++ b/p2p/monero-p2p/src/handles.rs @@ -1,28 +1,41 @@ +//! Connection Handles. //! -use std::time::Duration; +//! This module contains the [`ConnectionHandle`] which allows banning a peer, disconnecting a peer and +//! checking if the peer is still connected. +use std::{ + sync::{Arc, OnceLock}, + time::Duration, +}; -use futures::{channel::mpsc, SinkExt}; +use futures::SinkExt; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::CancellationToken; +/// A [`ConnectionHandle`] builder. #[derive(Default, Debug)] pub struct HandleBuilder { permit: Option, } impl HandleBuilder { + /// Create a new builder. pub fn new() -> Self { Self { permit: None } } + /// Sets the permit for this connection. + /// + /// This must be called at least once. pub fn with_permit(mut self, permit: OwnedSemaphorePermit) -> Self { self.permit = Some(permit); self } - pub fn build(self) -> (ConnectionGuard, ConnectionHandle, PeerHandle) { + /// Builds the [`ConnectionGuard`] which should be handed to the connection task and the [`ConnectionHandle`]. + /// + /// This will panic if a permit was not set [`HandleBuilder::with_permit`] + pub fn build(self) -> (ConnectionGuard, ConnectionHandle) { let token = CancellationToken::new(); - let (tx, rx) = mpsc::channel(0); ( ConnectionGuard { @@ -31,13 +44,14 @@ impl HandleBuilder { }, ConnectionHandle { token: token.clone(), - ban: rx, + ban: Arc::new(OnceLock::new()), }, - PeerHandle { ban: tx, token }, ) } } +/// A struct representing the time a peer should be banned for. +#[derive(Debug, Copy, Clone)] pub struct BanPeer(pub Duration); /// A struct given to the connection task. @@ -47,9 +61,13 @@ pub struct ConnectionGuard { } impl ConnectionGuard { + /// Checks if we should close the connection. pub fn should_shutdown(&self) -> bool { self.token.is_cancelled() } + /// Tell the corresponding [`ConnectionHandle`]s that this connection is closed. + /// + /// This will be called on [`Drop::drop`]. pub fn connection_closed(&self) { self.token.cancel() } @@ -61,39 +79,29 @@ impl Drop for ConnectionGuard { } } -/// A handle given to a task that needs to close this connection and find out if the connection has -/// been banned. +/// A handle given to a task that needs to ban, disconnect, check if the peer should be banned or check +/// the peer is still connected. +#[derive(Debug, Clone)] pub struct ConnectionHandle { token: CancellationToken, - ban: mpsc::Receiver, + ban: Arc>, } impl ConnectionHandle { + /// Bans the peer for the given `duration`. + pub fn ban_peer(&self, duration: Duration) { + let _ = self.ban.set(BanPeer(duration)); + } + /// Checks if this connection is closed. pub fn is_closed(&self) -> bool { self.token.is_cancelled() } + /// Returns if this peer has been banned and the [`Duration`] of that ban. pub fn check_should_ban(&mut self) -> Option { - match self.ban.try_next() { - Ok(res) => res, - Err(_) => None, - } + self.ban.get().copied() } + /// Sends the signal to the connection task to disconnect. pub fn send_close_signal(&self) { self.token.cancel() } } - -/// A handle given to a task that needs to be able to ban a peer. -#[derive(Clone)] -pub struct PeerHandle { - token: CancellationToken, - ban: mpsc::Sender, -} - -impl PeerHandle { - pub fn ban_peer(&mut self, duration: Duration) { - // This channel won't be dropped and if it's full the peer has already been banned. - let _ = self.ban.try_send(BanPeer(duration)); - self.token.cancel() - } -} diff --git a/p2p/monero-p2p/src/lib.rs b/p2p/monero-p2p/src/lib.rs index 8eb309b..0105e7e 100644 --- a/p2p/monero-p2p/src/lib.rs +++ b/p2p/monero-p2p/src/lib.rs @@ -74,6 +74,9 @@ pub trait NetZoneAddress: /// TODO: IP zone banning? type BanID: Debug + Hash + Eq + Clone + Copy + Send + 'static; + /// Changes the port of this address to `port`. + fn set_port(&mut self, port: u16); + fn ban_id(&self) -> Self::BanID; fn should_add_to_peer_list(&self) -> bool; diff --git a/p2p/monero-p2p/src/network_zones/clear.rs b/p2p/monero-p2p/src/network_zones/clear.rs index 4086b48..7c3c599 100644 --- a/p2p/monero-p2p/src/network_zones/clear.rs +++ b/p2p/monero-p2p/src/network_zones/clear.rs @@ -16,6 +16,10 @@ use crate::{NetZoneAddress, NetworkZone}; impl NetZoneAddress for SocketAddr { type BanID = IpAddr; + fn set_port(&mut self, port: u16) { + SocketAddr::set_port(self, port) + } + fn ban_id(&self) -> Self::BanID { self.ip() } diff --git a/p2p/monero-p2p/src/services.rs b/p2p/monero-p2p/src/services.rs index d4035ff..6c6df6c 100644 --- a/p2p/monero-p2p/src/services.rs +++ b/p2p/monero-p2p/src/services.rs @@ -69,34 +69,41 @@ impl TryFrom } pub enum AddressBookRequest { + /// Tells the address book that we have connected or received a connection from a peer. NewConnection { - addr: Option, + /// The [`InternalPeerID`] of this connection. internal_peer_id: InternalPeerID, + /// The public address of the peer, if this peer has a reachable public address. + public_address: Option, + /// The [`ConnectionHandle`] to this peer. handle: ConnectionHandle, + /// An ID the peer assigned itself. id: u64, + /// The peers [`PruningSeed`]. pruning_seed: PruningSeed, - /// The peers port. + /// The peers rpc port. rpc_port: u16, /// The peers rpc credits per hash rpc_credits_per_hash: u32, }, - /// Bans a peer for the specified duration. This request - /// will send disconnect signals to all peers with the same - /// address. - BanPeer(Z::Addr, std::time::Duration), + /// Tells the address book about a peer list received from a peer. IncomingPeerList(Vec>), - /// Gets a random white peer from the peer list. If height is specified + /// Takes a random white peer from the peer list. If height is specified /// then the peer list should retrieve a peer that should have a full /// block at that height according to it's pruning seed - GetRandomWhitePeer { - height: Option, - }, - /// Gets a random gray peer from the peer list. If height is specified + TakeRandomWhitePeer { height: Option }, + /// Takes a random gray peer from the peer list. If height is specified /// then the peer list should retrieve a peer that should have a full /// block at that height according to it's pruning seed - GetRandomGrayPeer { - height: Option, - }, + TakeRandomGrayPeer { height: Option }, + /// Takes a random peer from the peer list. If height is specified + /// then the peer list should retrieve a peer that should have a full + /// block at that height according to it's pruning seed. + /// + /// The address book will look in the white peer list first, then the gray + /// one if no peer is found. + TakeRandomPeer { height: Option }, + /// Gets the specified number of white peers, or less if we don't have enough. GetWhitePeers(usize), } diff --git a/p2p/monero-p2p/tests/handles.rs b/p2p/monero-p2p/tests/handles.rs index be62db6..6766f78 100644 --- a/p2p/monero-p2p/tests/handles.rs +++ b/p2p/monero-p2p/tests/handles.rs @@ -7,11 +7,11 @@ use monero_p2p::handles::HandleBuilder; #[test] fn send_ban_signal() { let semaphore = Arc::new(Semaphore::new(5)); - let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default() + let (guard, mut connection_handle) = HandleBuilder::default() .with_permit(semaphore.try_acquire_owned().unwrap()) .build(); - peer_handle.ban_peer(Duration::from_secs(300)); + connection_handle.ban_peer(Duration::from_secs(300)); let Some(ban_time) = connection_handle.check_should_ban() else { panic!("ban signal not received!"); @@ -29,13 +29,13 @@ fn send_ban_signal() { #[test] fn multiple_ban_signals() { let semaphore = Arc::new(Semaphore::new(5)); - let (guard, mut connection_handle, mut peer_handle) = HandleBuilder::default() + let (guard, mut connection_handle) = HandleBuilder::default() .with_permit(semaphore.try_acquire_owned().unwrap()) .build(); - peer_handle.ban_peer(Duration::from_secs(300)); - peer_handle.ban_peer(Duration::from_secs(301)); - peer_handle.ban_peer(Duration::from_secs(302)); + connection_handle.ban_peer(Duration::from_secs(300)); + connection_handle.ban_peer(Duration::from_secs(301)); + connection_handle.ban_peer(Duration::from_secs(302)); let Some(ban_time) = connection_handle.check_should_ban() else { panic!("ban signal not received!"); @@ -54,7 +54,7 @@ fn multiple_ban_signals() { #[test] fn dropped_guard_sends_disconnect_signal() { let semaphore = Arc::new(Semaphore::new(5)); - let (guard, connection_handle, _) = HandleBuilder::default() + let (guard, connection_handle) = HandleBuilder::default() .with_permit(semaphore.try_acquire_owned().unwrap()) .build(); diff --git a/test-utils/src/test_netzone.rs b/test-utils/src/test_netzone.rs index 709d556..b38ef96 100644 --- a/test-utils/src/test_netzone.rs +++ b/test-utils/src/test_netzone.rs @@ -29,6 +29,8 @@ pub struct TestNetZoneAddr(pub u32); impl NetZoneAddress for TestNetZoneAddr { type BanID = Self; + fn set_port(&mut self, _: u16) {} + fn ban_id(&self) -> Self::BanID { *self }