first draft

This commit is contained in:
SyntheticBird45 2024-11-10 17:05:49 +01:00
parent 525e20e841
commit b3b6eafcc7
Signed by untrusted user who does not match committer: SyntheticBird
GPG key ID: C76973F748521E11
16 changed files with 341 additions and 90 deletions

4
Cargo.lock generated
View file

@ -550,11 +550,11 @@ version = "0.1.0"
dependencies = [
"borsh",
"cuprate-constants",
"cuprate-p2p-bucket",
"cuprate-p2p-core",
"cuprate-pruning",
"cuprate-test-utils",
"futures",
"indexmap",
"rand",
"thiserror",
"tokio",
@ -871,6 +871,7 @@ dependencies = [
"borsh",
"cfg-if",
"cuprate-helper",
"cuprate-p2p-bucket",
"cuprate-pruning",
"cuprate-test-utils",
"cuprate-types",
@ -936,6 +937,7 @@ dependencies = [
"async-trait",
"borsh",
"cuprate-helper",
"cuprate-p2p-bucket",
"cuprate-p2p-core",
"cuprate-types",
"cuprate-wire",

View file

@ -65,7 +65,7 @@ cuprate-levin = { path = "net/levin" ,default-feature
cuprate-wire = { path = "net/wire" ,default-features = false}
cuprate-p2p = { path = "p2p/p2p" ,default-features = false}
cuprate-p2p-core = { path = "p2p/p2p-core" ,default-features = false}
cuprate-p2p-bucket = { path = "p2p/p2p-bucket" ,default-features = false}
cuprate-p2p-bucket = { path = "p2p/bucket" ,default-features = false}
cuprate-dandelion-tower = { path = "p2p/dandelion-tower" ,default-features = false}
cuprate-async-buffer = { path = "p2p/async-buffer" ,default-features = false}
cuprate-address-book = { path = "p2p/address-book" ,default-features = false}

View file

@ -10,6 +10,7 @@ authors = ["Boog900"]
cuprate-constants = { workspace = true }
cuprate-pruning = { workspace = true }
cuprate-p2p-core = { workspace = true, features = ["borsh"] }
cuprate-p2p-bucket = { workspace = true }
tower = { workspace = true, features = ["util"] }
tokio = { workspace = true, features = ["time", "fs", "rt"]}
@ -19,7 +20,7 @@ futures = { workspace = true, features = ["std"] }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["std", "attributes"] }
indexmap = { workspace = true, features = ["std"] }
#indexmap = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std", "std_rng"] }

View file

@ -357,6 +357,13 @@ impl<Z: BorshNetworkZone> AddressBook<Z> {
self.connected_peers.insert(internal_peer_id, peer);
Ok(())
}
fn add_white_peer(
&mut self,
white_peer: ZoneSpecificPeerListEntryBase<Z::Addr>
) {
self.white_list.add_new_peer(white_peer);
}
}
impl<Z: BorshNetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
@ -401,6 +408,10 @@ impl<Z: BorshNetworkZone> Service<AddressBookRequest<Z>> for AddressBook<Z> {
self.handle_incoming_peer_list(peer_list);
Ok(AddressBookResponse::Ok)
}
AddressBookRequest::AddWhitePeer(white_peer) =>{
self.add_white_peer(white_peer);
Ok(AddressBookResponse::Ok)
}
AddressBookRequest::TakeRandomWhitePeer { height } => self
.take_random_white_peer(height)
.map(AddressBookResponse::Peer)

View file

@ -1,11 +1,11 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use indexmap::IndexMap;
use rand::prelude::*;
use cuprate_constants::block::MAX_BLOCK_HEIGHT_USIZE;
use cuprate_p2p_core::{services::ZoneSpecificPeerListEntryBase, NetZoneAddress, NetworkZone};
use cuprate_pruning::PruningSeed;
use cuprate_p2p_bucket::BucketMap;
#[cfg(test)]
pub(crate) mod tests;
@ -16,7 +16,7 @@ pub(crate) mod tests;
#[derive(Debug)]
pub(crate) struct PeerList<Z: NetworkZone> {
/// The peers with their peer data.
pub peers: IndexMap<Z::Addr, ZoneSpecificPeerListEntryBase<Z::Addr>>,
pub peers: BucketMap<ZoneSpecificPeerListEntryBase<Z::Addr>, 8>,
/// An index of Pruning seed to address, so can quickly grab peers with the blocks
/// we want.
///
@ -33,7 +33,7 @@ pub(crate) struct PeerList<Z: NetworkZone> {
impl<Z: NetworkZone> PeerList<Z> {
/// Creates a new peer list.
pub(crate) fn new(list: Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>) -> Self {
let mut peers = IndexMap::with_capacity(list.len());
let mut peers = BucketMap::<_, 8>::new();
let mut pruning_seeds = BTreeMap::new();
let mut ban_ids = HashMap::with_capacity(list.len());
@ -48,7 +48,7 @@ impl<Z: NetworkZone> PeerList<Z> {
.or_insert_with(Vec::new)
.push(peer.adr);
peers.insert(peer.adr, peer);
peers.push(peer);
}
Self {
peers,
@ -64,7 +64,7 @@ impl<Z: NetworkZone> PeerList<Z> {
/// Adds a new peer to the peer list
pub(crate) fn add_new_peer(&mut self, peer: ZoneSpecificPeerListEntryBase<Z::Addr>) {
if self.peers.insert(peer.adr, peer).is_none() {
if self.peers.push(peer).is_none() {
#[expect(clippy::unwrap_or_default, reason = "It's more clear with this")]
self.pruning_seeds
.entry(peer.pruning_seed)
@ -94,7 +94,14 @@ impl<Z: NetworkZone> PeerList<Z> {
// Take a random peer and see if it's in the list of must_keep_peers, if it is try again.
// TODO: improve this
// Return early if no items
if self.peers.is_empty() {
return None
}
for _ in 0..3 {
// In case we require a specific height
if let Some(needed_height) = block_needed {
let (_, addresses_with_block) = self.pruning_seeds.iter().find(|(seed, _)| {
// TODO: factor in peer blockchain height?
@ -109,18 +116,15 @@ impl<Z: NetworkZone> PeerList<Z> {
}
return self.remove_peer(&peer);
}
let len = self.len();
if len == 0 {
return None;
}
let n = r.gen_range(0..len);
let (&key, _) = self.peers.get_index(n).unwrap();
if !must_keep_peers.contains(&key) {
return self.remove_peer(&key);
}
// Straightforward if we don't need a specific height.
let random_peer = self.peers.get_random(r);
if let Some(peer) = random_peer {
if !must_keep_peers.contains(&peer.adr) {
return self.remove_peer(&peer.adr.clone())
}
}
}
@ -132,26 +136,22 @@ impl<Z: NetworkZone> PeerList<Z> {
r: &mut R,
len: usize,
) -> Vec<ZoneSpecificPeerListEntryBase<Z::Addr>> {
let mut peers = self.peers.values().copied().choose_multiple(r, len);
// Order of the returned peers is not random, I am unsure of the impact of this, potentially allowing someone to make guesses about which peers
// were connected first.
// So to mitigate this shuffle the result.
peers.shuffle(r);
peers.drain(len.min(peers.len())..peers.len());
peers
(0..len)
.filter_map(|_| self.peers.get_random(r).copied())
.collect::<Vec<_>>()
}
/// Returns a mutable reference to a peer.
pub(crate) fn get_peer_mut(
&mut self,
peer: &Z::Addr,
peer_addr: &Z::Addr,
) -> Option<&mut ZoneSpecificPeerListEntryBase<Z::Addr>> {
self.peers.get_mut(peer)
self.peers.get_mut(peer_addr)
}
/// Returns true if the list contains this peer.
pub(crate) fn contains_peer(&self, peer: &Z::Addr) -> bool {
self.peers.contains_key(peer)
self.peers.contains(peer)
}
/// Removes a peer from the pruning idx
@ -197,7 +197,7 @@ impl<Z: NetworkZone> PeerList<Z> {
&mut self,
peer: &Z::Addr,
) -> Option<ZoneSpecificPeerListEntryBase<Z::Addr>> {
let peer_eb = self.peers.swap_remove(peer)?;
let peer_eb = self.peers.remove(peer)?;
self.remove_peer_from_all_idxs(&peer_eb);
Some(peer_eb)
}
@ -226,13 +226,13 @@ impl<Z: NetworkZone> PeerList<Z> {
let target_removed = self.len() - new_len;
let mut removed_count = 0;
let mut peers_to_remove: Vec<Z::Addr> = Vec::with_capacity(target_removed);
for peer_adr in self.peers.keys() {
if removed_count >= target_removed {
break;
}
if !must_keep_peers.contains(peer_adr) {
peers_to_remove.push(*peer_adr);
if !must_keep_peers.contains(&peer_adr) {
peers_to_remove.push(peer_adr);
removed_count += 1;
}
}

View file

@ -68,7 +68,7 @@ fn peer_list_reduce_length() {
#[test]
fn peer_list_reduce_length_with_peers_we_need() {
let mut peer_list = make_fake_peer_list(0, 500);
let must_keep_peers = peer_list.peers.keys().copied().collect::<HashSet<_>>();
let must_keep_peers = peer_list.peers.keys().collect::<HashSet<_>>();
let target_len = 49;
@ -93,7 +93,7 @@ fn peer_list_remove_specific_peer() {
addrs.iter().for_each(|adr| assert_ne!(adr, &peer.adr));
}
assert!(!peers.contains_key(&peer.adr));
assert!(!peers.contains(&peer.adr));
}
#[test]
@ -170,7 +170,7 @@ fn peer_list_ban_peers() {
let ban_id = peer.adr.ban_id();
assert_eq!(peer_list.ban_ids.get(&ban_id), None);
for (addr, _) in peer_list.peers {
for addr in peer_list.peers.keys() {
assert_ne!(addr.ban_id(), ban_id);
}
}

View file

@ -86,11 +86,11 @@ mod tests {
assert_eq!(gray_list.peers.len(), gray_list_2.peers.len());
for addr in white_list.peers.keys() {
assert!(white_list_2.contains_peer(addr));
assert!(white_list_2.contains_peer(&addr));
}
for addr in gray_list.peers.keys() {
assert!(gray_list_2.contains_peer(addr));
assert!(gray_list_2.contains_peer(&addr));
}
}
}

View file

@ -1,10 +1,10 @@
//! Bucket data structure
//! `BucketMap` data structure
//!
//! A collection data structure that discriminates its unique items and place them into "buckets".
//!
//! The item must implement the [`Bucketable`] trait that defines how to create the discriminant
//! from the item type. The data structure will internally contain any item into "buckets" or vectors
//! of sized capacity `N` that regroup all the stored items with this specific discriminant.
//! from the item type and its discriminated field. The data structure will internally contain any item into
//! "buckets" or vectors of sized capacity `N` that regroup all the stored items with this specific discriminant.
//!
//! A practical example of this data structure is for storing `N` amount of IP discriminated by their subnets.
//! You can store in each "buckets" corresponding to a `/16` subnet up to `N` IPs of that subnet.
@ -12,11 +12,11 @@
//! # Example
//!
//! ```
//! use cuprate_p2p_bucket::Bucket;
//! use cuprate_p2p_bucket::BucketMap;
//! use std::net::Ipv4Addr;
//!
//! // Create a new bucket that can store at most 2 IPs in a particular `/16` subnet.
//! let mut bucket = Bucket::<2,Ipv4Addr>::new();
//! let mut bucket = BucketMap::<2,Ipv4Addr>::new();
//!
//! // Fulfill the `96.96.0.0/16` bucket.
//! bucket.push("96.96.0.1".parse().unwrap());
@ -39,28 +39,47 @@
//! ```
use arrayvec::{ArrayVec, CapacityError};
use rand::random;
use rand::Rng;
use std::{collections::BTreeMap, net::Ipv4Addr};
use std::{
collections::BTreeMap,
fmt::Debug,
net::{IpAddr, SocketAddr}
};
/// A discriminant that can be computed from the type.
/// This trait defines an item key and discriminant in
/// a `BucketMap` and method to compute the discriminant
/// from the key
pub trait Bucketable: Sized + Eq + Clone {
/// The type of the discriminant being used in the Binary tree.
type Discriminant: Ord + AsRef<[u8]>;
/// The type of the key being used to lookup for a specific element.
/// It is also the type from which the discriminant is computed.
type Key: Sized + Eq + Clone;
/// Method that can compute the discriminant from the item.
fn discriminant(&self) -> Self::Discriminant;
/// The type of discriminant used to discriminate items together.
type Discriminant: Ord + Debug + Send;
/// Method that give return the key from the item.
fn key(&self) -> Self::Key;
/// Method for computing a discriminant from the key of the item.
fn compute_discriminant(key: &Self::Key) -> Self::Discriminant;
/// Method that can compute the discriminant directly from the item.
fn discriminant(&self) -> Self::Discriminant {
Self::compute_discriminant(&self.key())
}
}
/// A collection data structure discriminating its unique items
/// with a specified method. Limiting the amount of items stored
/// with that discriminant to the const `N`.
pub struct Bucket<const N: usize, I: Bucketable> {
#[derive(Debug)]
pub struct BucketMap<I: Bucketable, const N: usize> {
/// The storage of the bucket
storage: BTreeMap<I::Discriminant, ArrayVec<I, N>>,
}
impl<const N: usize, I: Bucketable> Bucket<N, I> {
impl<I: Bucketable, const N: usize> BucketMap<I, N> {
/// Create a new Bucket
pub const fn new() -> Self {
Self {
@ -81,10 +100,10 @@ impl<const N: usize, I: Bucketable> Bucket<N, I> {
/// # Example
///
/// ```
/// use cuprate_p2p_bucket::Bucket;
/// use cuprate_p2p_bucket::BucketMap;
/// use std::net::Ipv4Addr;
///
/// let mut bucket = Bucket::<8,Ipv4Addr>::new();
/// let mut bucket = BucketMap::<8,Ipv4Addr>::new();
///
/// // Push a first IP address.
/// bucket.push("127.0.0.1".parse().unwrap());
@ -111,18 +130,64 @@ impl<const N: usize, I: Bucketable> Bucket<N, I> {
None
}
/// Will attempt to remove an item from the bucket.
pub fn remove(&mut self, item: &I) -> Option<I> {
self.storage.get_mut(&item.discriminant()).and_then(|vec| {
/// Get a reference to an item stored in the `BucketMap` with
/// the specified item key.
pub fn get(&self, key: &I::Key) -> Option<&I> {
self.storage.get(&I::compute_discriminant(key)).and_then(|vec| {
vec.iter()
.enumerate()
.find_map(|(i, v)| (item == v).then_some(i))
.find_map(|(i, v)| (*key == v.key()).then_some(i))
.and_then(|index| vec.get(index))
})
}
/// Get a mutable reference to an item stored in the `BucketMap`
/// with the specified item key.
pub fn get_mut(&mut self, key: &I::Key) -> Option<&mut I> {
self.storage.get_mut(&I::compute_discriminant(key)).and_then(|vec| {
vec.iter()
.enumerate()
.find_map(|(i, v)| (*key == v.key()).then_some(i))
.and_then(|index| vec.get_mut(index))
})
}
/// Return a reference to an item chosen at random.
///
/// Repeated use of this function will provide a normal distribution of
/// items based on their discriminants.
pub fn get_random<R: Rng>(&self, r: &mut R) -> Option<&I> {
// Get the total amount of discriminants to explore.
let len = self.storage.len();
// Get a random bucket.
let (_, vec) = self.storage.iter().nth(r.r#gen::<usize>() / len).unwrap();
// Return a reference chose at random.
vec.get(r.r#gen::<usize>() / vec.len())
}
/// Will attempt to remove an item from the bucket.
///
/// Internally use `swap_remove` which breaks the order in which
/// elements in a bucket were inserted.
pub fn remove(&mut self, key: &I::Key) -> Option<I> {
self.storage.get_mut(&I::compute_discriminant(key)).and_then(|vec| {
vec.iter()
.enumerate()
.find_map(|(i, v)| (*key == v.key()).then_some(i))
.map(|index| vec.swap_remove(index))
})
}
/// Return `true` if the `BucketMap` contains an item
/// with the key `key`. Return `false` otherwise.
pub fn contains(&self, key: &I::Key) -> bool {
self.get(key).is_some()
}
/// Return the number of item stored within the storage
/// Return the number of item stored within the storage.
pub fn len(&self) -> usize {
self.storage.values().map(ArrayVec::len).sum()
}
@ -135,38 +200,82 @@ impl<const N: usize, I: Bucketable> Bucket<N, I> {
self.storage.get(discriminant).map(ArrayVec::len)
}
/// Return `true` if the storage contains no items
/// Return `true` if the storage contains no items.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Return a reference to an item chosen at random.
///
/// Repeated use of this function will provide a normal distribution of
/// items based on their discriminants.
pub fn get_random(&mut self) -> Option<&I> {
// Get the total amount of discriminants to explore.
let len = self.storage.len();
// Get a random bucket.
let (_, vec) = self.storage.iter().nth(random::<usize>() / len).unwrap();
// Return a reference chose at random.
vec.get(random::<usize>() / vec.len())
/// Return an iterator of the keys of all items contained in
/// the `BucketMap`.
pub fn keys(&self) -> impl Iterator<Item = I::Key> + '_ {
self.storage
.iter()
.flat_map(|bucket|
bucket.1
.iter()
.map(I::key)
)
}
/// Return an iterator of all items contained in
/// the `BucketMap`.
pub fn values(&self) -> impl Iterator<Item = &I> + '_ {
self.storage
.iter()
.flat_map(|bucket|
bucket.1
.iter()
)
}
}
impl<const N: usize, I: Bucketable> Default for Bucket<N, I> {
impl<I: Bucketable, const N: usize> Default for BucketMap<I, N> {
fn default() -> Self {
Self::new()
}
}
impl Bucketable for Ipv4Addr {
/// We are discriminating by `/16` subnets.
type Discriminant = [u8; 2];
/// A discriminant type for `SocketAddr`.
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
pub enum IpSubnet {
/// The discriminant for IPv4 subnets is the 16 bytes prefix (therefore discriminating `/16` subnets)
V4([u8; 2]),
/// The equivalent of a `/16` IPv4 subnet for IPv6 is a `/32` prefix
V6([u8; 4])
}
fn discriminant(&self) -> Self::Discriminant {
[self.octets()[0], self.octets()[1]]
impl Bucketable for SocketAddr {
/// The key is the IP address itself.
type Key = IpAddr;
/// The discriminant is either a truncated `/16` IPv4 or a `/32` IPv6 prefix.
type Discriminant = IpSubnet;
#[inline]
fn key(&self) -> Self::Key {
self.ip()
}
#[inline]
fn compute_discriminant(key: &Self::Key) -> Self::Discriminant {
match key {
IpAddr::V4(ip_addr_v4) => {
let bytes = ip_addr_v4.octets();
IpSubnet::V4([bytes[0],bytes[1]])
},
IpAddr::V6(ip_addr_v6) => {
let bytes = ip_addr_v6.octets();
IpSubnet::V6([bytes[0],bytes[1],bytes[2],bytes[3]])
}
}
}
}
#[cfg(test)]
mod test {
}

View file

@ -10,10 +10,11 @@ default = ["borsh"]
borsh = ["dep:borsh", "cuprate-pruning/borsh"]
[dependencies]
cuprate-helper = { workspace = true, features = ["asynch"], default-features = false }
cuprate-wire = { workspace = true, features = ["tracing"] }
cuprate-pruning = { workspace = true }
cuprate-types = { workspace = true }
cuprate-helper = { workspace = true, features = ["asynch"], default-features = false }
cuprate-wire = { workspace = true, features = ["tracing"] }
cuprate-pruning = { workspace = true }
cuprate-types = { workspace = true }
cuprate-p2p-bucket = { workspace = true }
tokio = { workspace = true, features = ["net", "sync", "macros", "time", "rt", "rt-multi-thread"]}
tokio-util = { workspace = true, features = ["codec"] }

View file

@ -102,7 +102,9 @@ impl<N: NetworkZone> Service<AddressBookRequest<N>> for DummyAddressBook {
| AddressBookRequest::TakeRandomWhitePeer { .. } => {
return ready(Err("dummy address book does not hold peers".into()));
}
AddressBookRequest::NewConnection { .. } | AddressBookRequest::IncomingPeerList(_) => {
AddressBookRequest::NewConnection { .. }
| AddressBookRequest::AddWhitePeer(_)
| AddressBookRequest::IncomingPeerList(_) => {
AddressBookResponse::Ok
}
AddressBookRequest::GetBan(_) => AddressBookResponse::GetBan {

View file

@ -68,6 +68,7 @@ cfg_if::cfg_if! {
use std::{fmt::Debug, hash::Hash};
use cuprate_p2p_bucket::Bucketable;
use futures::{Sink, Stream};
use cuprate_wire::{
@ -105,6 +106,7 @@ pub enum ConnectionDirection {
pub trait NetZoneAddress:
TryFrom<NetworkAddress, Error = NetworkAddressIncorrectZone>
+ Into<NetworkAddress>
+ Bucketable
+ std::fmt::Display
+ Hash
+ Eq

View file

@ -1,5 +1,6 @@
use std::time::Instant;
use cuprate_p2p_bucket::Bucketable;
use cuprate_pruning::{PruningError, PruningSeed};
use cuprate_wire::{CoreSyncData, PeerListEntryBase};
@ -34,6 +35,23 @@ pub struct ZoneSpecificPeerListEntryBase<A: NetZoneAddress> {
pub rpc_credits_per_hash: u32,
}
impl<A: NetZoneAddress> Bucketable for ZoneSpecificPeerListEntryBase<A>
{
type Key = A;
type Discriminant = A::Discriminant;
#[inline]
fn key(&self) -> Self::Key {
self.adr
}
#[inline]
fn compute_discriminant(key: &Self::Key) -> Self::Discriminant {
A::compute_discriminant(&key.key())
}
}
impl<A: NetZoneAddress> From<ZoneSpecificPeerListEntryBase<A>> for PeerListEntryBase {
fn from(value: ZoneSpecificPeerListEntryBase<A>) -> Self {
Self {
@ -93,6 +111,9 @@ pub enum AddressBookRequest<Z: NetworkZone> {
/// Tells the address book about a peer list received from a peer.
IncomingPeerList(Vec<ZoneSpecificPeerListEntryBase<Z::Addr>>),
/// Add a new specific white peer to the peer list.
AddWhitePeer(ZoneSpecificPeerListEntryBase<Z::Addr>),
/// Takes a random white peer from the peer list. If height is specified
/// then the peer list should retrieve a peer that should have a full
@ -141,6 +162,7 @@ pub enum AddressBookResponse<Z: NetworkZone> {
/// Response to:
/// - [`AddressBookRequest::NewConnection`]
/// - [`AddressBookRequest::IncomingPeerList`]
/// - [`AddressBookRequest::AddWhitePeer`]
Ok,
/// Response to:

View file

@ -2,9 +2,10 @@
//!
//! This crate contains a [`NetworkInterface`] which allows interacting with the Monero P2P network on
//! a certain [`NetworkZone`]
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use futures::FutureExt;
use peer_pinger::PeerPinger;
use tokio::{sync::mpsc, task::JoinSet};
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
@ -23,6 +24,7 @@ pub mod config;
pub mod connection_maintainer;
pub mod constants;
mod inbound_server;
mod peer_pinger;
use block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
@ -54,7 +56,7 @@ where
cuprate_address_book::init_address_book(config.address_book_config.clone()).await?;
let address_book = Buffer::new(
address_book,
config.max_inbound_connections + config.outbound_connections,
config.max_inbound_connections + config.outbound_connections + 1,
);
// Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing
@ -95,6 +97,12 @@ where
address_book.clone(),
outbound_connector,
);
// TODO: Export duration into P2PConfig ?
let peer_pinger = PeerPinger::new(
address_book.clone(),
Duration::from_secs(30)
);
let mut background_tasks = JoinSet::new();
@ -103,6 +111,11 @@ where
.run()
.instrument(Span::current()),
);
background_tasks.spawn(
peer_pinger
.run()
.instrument(Span::current()),
);
background_tasks.spawn(
inbound_server::inbound_server(
Arc::clone(&client_pool),

View file

@ -0,0 +1,72 @@
//! Peer Pinger.
//!
//! This module handles the connectivity check to the peers contain in our address book.
//! It make sure to periodically remove any peers that are unreachable.
use std::{marker::PhantomData, time::Duration};
use cuprate_p2p_core::{client::handshaker::ping, services::{AddressBookRequest, AddressBookResponse}, AddressBook, NetworkZone};
use tokio::time::{sleep, timeout};
use tracing::warn;
use crate::constants::PING_REQUEST_TIMEOUT;
pub(crate) struct PeerPinger<N: NetworkZone, A: AddressBook<N>> {
address_book_svc: A,
delay_btw_ping: Duration,
_network: PhantomData<N>
}
impl<
N: NetworkZone,
A: AddressBook<N>
> PeerPinger<N,A> {
pub(crate) const fn new(address_book_svc: A, delay_btw_ping: Duration) -> Self {
Self {
address_book_svc,
delay_btw_ping,
_network: PhantomData,
}
}
/// Will ping a random white and gray peer every `self.delay_btw_ping`.
/// Only replace the peer back in the list if it has been reached.
pub(crate) async fn run(mut self) {
loop {
sleep(self.delay_btw_ping).await;
// First ping a white peer
let Ok(AddressBookResponse::Peer(peer)) = self.address_book_svc.call(AddressBookRequest::TakeRandomWhitePeer { height: None }).await else {
warn!("AddressBook unavailable.");
return
};
let response = timeout(PING_REQUEST_TIMEOUT, ping::<N>(peer.adr)).await;
if let Ok(Ok(peer_id)) = response {
if peer_id == peer.id {
let Ok(AddressBookResponse::Ok) = self.address_book_svc.call(AddressBookRequest::AddWhitePeer(peer)).await else {
warn!("AddressBook unavailable.");
return
};
}
}
// Then ping a gray peer
let Ok(AddressBookResponse::Peer(peer)) = self.address_book_svc.call(AddressBookRequest::TakeRandomGrayPeer { height: None }).await else {
warn!("AddressBook unavailable.");
return
};
let response = timeout(PING_REQUEST_TIMEOUT, ping::<N>(peer.adr)).await;
if let Ok(Ok(peer_id)) = response {
if peer_id == peer.id {
let Ok(AddressBookResponse::Ok) = self.address_book_svc.call(AddressBookRequest::IncomingPeerList(vec![peer])).await else {
warn!("AddressBook unavailable.");
return
};
}
}
}
}
}

View file

@ -10,6 +10,7 @@ cuprate-types = { workspace = true }
cuprate-helper = { workspace = true, features = ["map", "tx"] }
cuprate-wire = { workspace = true }
cuprate-p2p-core = { workspace = true, features = ["borsh"] }
cuprate-p2p-bucket = { workspace = true }
hex = { workspace = true }
hex-literal = { workspace = true }

View file

@ -11,6 +11,7 @@ use std::{
};
use borsh::{BorshDeserialize, BorshSerialize};
use cuprate_p2p_bucket::Bucketable;
use futures::Stream;
use tokio::io::{DuplexStream, ReadHalf, WriteHalf};
use tokio_util::codec::{FramedRead, FramedWrite};
@ -23,9 +24,23 @@ use cuprate_wire::{
use cuprate_p2p_core::{NetZoneAddress, NetworkZone};
/// An address on the test network
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, BorshSerialize, BorshDeserialize)]
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, PartialOrd, Ord, BorshSerialize, BorshDeserialize)]
pub struct TestNetZoneAddr(pub u32);
impl Bucketable for TestNetZoneAddr {
type Key = Self;
type Discriminant = Self;
fn key(&self) -> Self::Key {
*self
}
fn compute_discriminant(key: &Self::Key) -> Self::Discriminant {
*key
}
}
impl NetZoneAddress for TestNetZoneAddr {
type BanID = Self;