From c0fa56ab0cd11bb9d8b25dbd4b96c7fd5b039553 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 25 Jul 2023 19:40:00 +0100 Subject: [PATCH] remove peer set for now --- net/monero-wire/Cargo.toml | 2 +- p2p/src/lib.rs | 3 +- p2p/src/peer_set.rs | 7 - p2p/src/peer_set/initialize.rs | 94 ---- p2p/src/peer_set/services.rs | 9 - .../peer_set/services/block_broadcaster.rs | 116 ---- p2p/src/peer_set/services/block_download.rs | 173 ------ p2p/src/peer_set/set.rs | 516 ------------------ p2p/src/peer_set/unready_service.rs | 92 ---- 9 files changed, 2 insertions(+), 1010 deletions(-) delete mode 100644 p2p/src/peer_set.rs delete mode 100644 p2p/src/peer_set/initialize.rs delete mode 100644 p2p/src/peer_set/services.rs delete mode 100644 p2p/src/peer_set/services/block_broadcaster.rs delete mode 100644 p2p/src/peer_set/services/block_download.rs delete mode 100644 p2p/src/peer_set/set.rs delete mode 100644 p2p/src/peer_set/unready_service.rs diff --git a/net/monero-wire/Cargo.toml b/net/monero-wire/Cargo.toml index f908f68..059012b 100644 --- a/net/monero-wire/Cargo.toml +++ b/net/monero-wire/Cargo.toml @@ -9,7 +9,7 @@ repository = "https://github.com/SyntheticBird45/cuprate/tree/main/net/monero-wi [dependencies] levin-cuprate = {path="../levin"} -epee-encoding = { git = "https://github.com/boog900/epee-encoding", rev = "b774bf7"} +epee-encoding = { version = "0.5.0"} [dev-dependencies] hex = "0.4.3" diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index b09589f..292d91f 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -1,10 +1,9 @@ pub mod address_book; pub mod config; -mod connection_handle; pub mod connection_counter; +mod connection_handle; mod constants; pub mod peer; -pub mod peer_set; mod protocol; pub use config::Config; diff --git a/p2p/src/peer_set.rs b/p2p/src/peer_set.rs deleted file mode 100644 index 8563dcc..0000000 --- a/p2p/src/peer_set.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod initialize; -pub mod services; -pub mod set; - -mod unready_service; - -use unready_service::UnreadyService; diff --git a/p2p/src/peer_set/initialize.rs b/p2p/src/peer_set/initialize.rs deleted file mode 100644 index 6bbc785..0000000 --- a/p2p/src/peer_set/initialize.rs +++ /dev/null @@ -1,94 +0,0 @@ -use futures::TryStreamExt; -use futures::{future, StreamExt}; -use tower::buffer::Buffer; -use tower::discover::Change; -use tower::util::BoxService; -use tower::{BoxError, Layer, Service}; - -use monero_wire::NetworkAddress; - -use crate::address_book::{start_address_book, AddressBookRequest, AddressBookResponse}; -use crate::constants; -use crate::protocol::{ - CoreSyncDataRequest, CoreSyncDataResponse, InternalMessageRequest, InternalMessageResponse, -}; -use crate::{peer, Config, NetZoneBasicNodeData, P2PStore}; - -use super::set::{MorePeers, PeerSet}; - -type DiscoveredPeer = Result<(NetworkAddress, peer::Client), BoxError>; -/* -pub async fn init( - config: Config, - inbound_service: Svc, - core_sync_svc: CoreSync, - mut p2p_store: P2PS, -) -> Result< - Buffer, AddressBookRequest>, - BoxError, -> -where - Svc: Service - + Clone - + Send - + 'static, - Svc::Future: Send, - - CoreSync: Service - + Clone - + Send - + 'static, - CoreSync::Future: Send, - - P2PS: P2PStore, -{ - let basic_node_data: NetZoneBasicNodeData = match p2p_store.basic_node_data().await? { - Some(bnd) => bnd, - None => { - let node_id = crate::NodeID::generate(); - let bnd = NetZoneBasicNodeData::new(&config, &node_id); - p2p_store.save_basic_node_data(&bnd).await?; - bnd - } - }; - let address_book = Buffer::new( - BoxService::new(start_address_book(p2p_store, config).await?), - constants::ADDRESS_BOOK_BUFFER_SIZE, - ); - - let outbound_connector = { - use tower::timeout::TimeoutLayer; - let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT); - let hs = peer::Handshaker::new( - basic_node_data, - config.network(), - address_book.clone(), - core_sync_svc, - inbound_service, - ); - hs_timeout.layer(hs) - }; - - let (peerset_tx, peerset_rx) = - futures::channel::mpsc::channel::(config.peerset_total_connection_limit()); - - let discovered_peers = peerset_rx - // Discover interprets an error as stream termination, - // so discard any errored connections... - .filter(|result| future::ready(result.is_ok())) - .map_ok(|(address, client)| Change::Insert(address, client.into())); - - // Create an mpsc channel for peerset demand signaling, - // based on the maximum number of outbound peers. - let (mut demand_tx, demand_rx) = - futures::channel::mpsc::channel::(config.peerset_total_connection_limit()); - - // Create a oneshot to send background task JoinHandles to the peer set - let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); - - let peer_set = PeerSet::new(&config, discovered_peers, demand_tx, handle_rx); - let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); - - Ok(address_book) -} -*/ diff --git a/p2p/src/peer_set/services.rs b/p2p/src/peer_set/services.rs deleted file mode 100644 index d24fc4f..0000000 --- a/p2p/src/peer_set/services.rs +++ /dev/null @@ -1,9 +0,0 @@ -use monero_wire::NetworkAddress; -use tower::BoxError; - -pub mod block_broadcaster; -pub mod block_download; - -pub(crate) type DiscoveredPeer = Result<(NetworkAddress, crate::peer::Client), BoxError>; - -pub use block_download::{BlockGetterRequest, BlockGetterResponse, BlockGetterService}; diff --git a/p2p/src/peer_set/services/block_broadcaster.rs b/p2p/src/peer_set/services/block_broadcaster.rs deleted file mode 100644 index 408013c..0000000 --- a/p2p/src/peer_set/services/block_broadcaster.rs +++ /dev/null @@ -1,116 +0,0 @@ -// TODO: Investigate tor/i2p block broadcasting; should we do it? randomise delay? -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::stream::FuturesOrdered; -use futures::{FutureExt, StreamExt}; -use tokio::sync::Mutex; -use tower::discover::Discover; -use tower::BoxError; - -use monero_wire::messages::common::{BlockCompleteEntry, TransactionBlobs}; -use monero_wire::messages::{NewBlock, NewFluffyBlock}; -use monero_wire::{NetworkAddress, PeerID}; - -use crate::peer::LoadTrackedClient; -use crate::peer_set::set::PeerSet; - -pub enum BlockBroadCasterRequest { - /// A request to broadcast a block to all ready peers, Cuprate - /// only supports broadcasting by fluffy blocks. - BroadCastBlock { block: Vec, block_height: u64 }, -} - -pub enum BlockBroadCasterResponse { - BlockBroadCasted, -} - -pub struct BlockBroadCaster -where - D: Discover + Unpin, - D::Error: Into, -{ - peer_set: std::sync::Arc>>, - /// The proportion of peers that need to be ready for `poll_ready` to return ready. - /// - /// monerod will remove peers that do not broadcast every block to it, this is a problem - /// for us as we need peers to be ready for us to broadcast to them so we compromise and - /// only broadcast to ready peers and take the hit on the other peers. - wanted_ready_peers: f64, -} - -impl tower::Service for BlockBroadCaster -where - D: Discover + Unpin + Send + 'static, - D::Error: Into, -{ - type Response = BlockBroadCasterResponse; - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mutex = self.peer_set.clone(); - let ret = match Box::pin(mutex.lock()).poll_unpin(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(mut peer_set) => { - peer_set.poll_all(cx)?; - peer_set.poll_ready(cx); - if self.wanted_ready_peers <= peer_set.proportion_ready() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - }; - ret - } - - fn call(&mut self, req: BlockBroadCasterRequest) -> Self::Future { - match req { - BlockBroadCasterRequest::BroadCastBlock { - block, - block_height, - } => { - let empty_txs = TransactionBlobs::new_unpruned(vec![]); - - let fluffy_complete_entry = BlockCompleteEntry { - block: block.clone(), - block_weight: 0, - txs: empty_txs, - pruned: false, - }; - - let new_fluffy_block = NewFluffyBlock { - b: fluffy_complete_entry, - current_blockchain_height: block_height + 1, - }; - - let mutex = self.peer_set.clone(); - - async move { - let mut peer_set = mutex.lock().await; - let all_ready_peers = peer_set.all_ready(); - - let mut fut = FuturesOrdered::new(); - - for (_, svc) in all_ready_peers { - if svc.supports_fluffy_blocks() { - fut.push_back(svc.call(new_fluffy_block.clone().into())); - } else { - tracing::error!( - "Peer which doesn't support fluffy blocks is in the PeerSet" - ) - } - } - peer_set.push_all_unready(); - - while let Some(_) = fut.next().await {} - Ok(BlockBroadCasterResponse::BlockBroadCasted) - } - .boxed() - } - } - } -} diff --git a/p2p/src/peer_set/services/block_download.rs b/p2p/src/peer_set/services/block_download.rs deleted file mode 100644 index f3213b1..0000000 --- a/p2p/src/peer_set/services/block_download.rs +++ /dev/null @@ -1,173 +0,0 @@ -use futures::{FutureExt, Sink}; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use tokio::sync::Mutex; -use tower::discover::Discover; -use tower::BoxError; - -use monero_wire::messages::GetObjectsRequest; -use monero_wire::messages::{GetObjectsResponse, MessageNotification}; -use monero_wire::{Message, NetworkAddress, PeerID}; - -use crate::peer::LoadTrackedClient; -use crate::peer_set::set::PeerSet; -use crate::protocol::InternalMessageResponse; - -pub enum BlockGetterRequest { - /// A request for blocks, used when syncing. - /// - /// start_height is used to determine the peer for the next request, - /// you should use [`BlockGetterRequest::SetHeight`] before calling - /// this for the first time. - GetBlocks { - blocks: Vec<[u8; 32]>, - pruned: bool, - start_height: u64, - }, - SetHeight(u64), -} - -pub enum BlockGetterResponse { - Blocks(GetObjectsResponse), - HeightSet, -} - -pub struct BlockGetterService -where - D: Discover + Unpin, - D::Error: Into, -{ - peer_set: Arc>>, - next_start_height: Option, - p2c_peer: Option<(D::Key, D::Service)>, -} - -impl tower::Service for BlockGetterService -where - D: Discover + Unpin + Send + 'static, - D::Error: Into, -{ - type Response = BlockGetterResponse; - type Error = BoxError; - type Future = - Pin> + Send + 'static>>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let span = tracing::trace_span!(parent: &tracing::span::Span::current(), "BlockGetter"); - match self.next_start_height { - // If we don't know the next batch start height we must have not received - // any requests yet. The first request has to be [`SetHeight`] so thats - // what the next request will be. - None => { - tracing::trace!(parent: &span, "next height not known"); - Poll::Ready(Ok(())) - } - Some(height) => { - tracing::trace!(parent: &span, next_height = height); - - let mut peer_no_longer_ready = false; - - if let Some((addr, svc)) = &mut self.p2c_peer { - tracing::trace!(parent: &span, preselected_peer = ?addr); - match svc.poll_ready(cx) { - Poll::Ready(Ok(())) => { - tracing::trace!( - parent: &span, - "Pre-selected peer still ready, keeping it selected" - ); - return Poll::Ready(Ok(())); - } - Poll::Pending => { - tracing::trace!( - "preselected service is no longer ready, moving to unready list" - ); - peer_no_longer_ready = true; - } - Poll::Ready(Err(e)) => { - tracing::trace!(parent: &span, %e, "preselected service failed, dropping it"); - self.p2c_peer = None; - } - }; - } - - tracing::trace!( - parent: &span, - "preselected service was not ready, preselecting another ready service" - ); - - let mutex = self.peer_set.clone(); - let ret = match Box::pin(mutex.lock()).poll_unpin(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(mut peer_set) => { - peer_set.poll_all(cx)?; - - if peer_no_longer_ready { - let (key, svc) = std::mem::replace(&mut self.p2c_peer, None) - .expect("Peer must exist for it to not be ready"); - peer_set.push_unready(key, svc); - } - - let p2c_peer = match peer_set.preselect_p2c_peer_with_full_block(height) { - None => { - tracing::trace!( - parent: &span, - "no ready services, sending demand signal" - ); - peer_set.demand_more_peers(); - return Poll::Pending; - } - Some(peer) => { - tracing::trace!(parent: &span, preselected_peer = ?peer); - peer - } - }; - self.p2c_peer = peer_set - .take_ready_service(&p2c_peer) - .and_then(|svc| Some((p2c_peer, svc))); - Poll::Ready(Ok(())) - } - }; - ret - } - } - } - - fn call(&mut self, req: BlockGetterRequest) -> Self::Future { - match req { - BlockGetterRequest::SetHeight(height) => { - self.next_start_height = Some(height); - async { Ok(BlockGetterResponse::HeightSet) }.boxed() - } - BlockGetterRequest::GetBlocks { - blocks, - pruned, - start_height, - } => { - self.next_start_height = Some(start_height + blocks.len() as u64); - let obj_req = GetObjectsRequest { blocks, pruned }; - - let peer_set = self.peer_set.clone(); - let (addr, mut svc) = std::mem::replace(&mut self.p2c_peer, None).expect( - "A peer is always selected in poll_ready and poll_ready must be called first", - ); - - async move { - let fut = svc.call(obj_req.into()); - - let mut set = peer_set.lock().await; - set.push_unready(addr, svc); - fut.await.map(|res| { - let InternalMessageResponse::GetObjectsResponse(res) = res else { - unreachable!("Peer connection must return correct response") - }; - BlockGetterResponse::Blocks(res) - }) - } - .boxed() - } - } - } -} diff --git a/p2p/src/peer_set/set.rs b/p2p/src/peer_set/set.rs deleted file mode 100644 index dd717ba..0000000 --- a/p2p/src/peer_set/set.rs +++ /dev/null @@ -1,516 +0,0 @@ -use std::future::Future; -use std::ops::Div; -use std::{ - collections::{HashMap, HashSet}, - convert, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::future::BoxFuture; -use futures::TryFutureExt; -use futures::{ - channel::{mpsc, oneshot}, - stream::FuturesUnordered, - Stream, -}; -use futures::{FutureExt, SinkExt}; -use tokio::{sync::oneshot::error::TryRecvError, task::JoinHandle}; -use tower::{ - discover::{Change, Discover}, - load::Load, - BoxError, Service, -}; - -use monero_wire::{NetworkAddress, PeerID}; - -use super::{unready_service::UnreadyError, UnreadyService}; -use crate::{ - peer::LoadTrackedClient, - protocol::{InternalMessageRequest, InternalMessageResponse}, - Config, -}; - -/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra. -/// -/// In response to this signal, the crawler tries to open more peer connections. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub struct MorePeers; - -/// A signal sent by the [`PeerSet`] to cancel a [`Client`][1]'s current request -/// or response. -/// -/// When it receives this signal, the [`Client`][1] stops processing and exits. -/// -/// [1]: crate::peer::Client -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub struct CancelClientWork; - -/// A [`tower::Service`] that abstractly represents "the rest of the network". -/// -/// # Security -/// -/// The `Discover::Key` must be the transient remote address of each peer. This -/// address may only be valid for the duration of a single connection. (For -/// example, inbound connections have an ephemeral remote port, and proxy -/// connections have an ephemeral local or proxy port.) -/// -/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state. -pub struct PeerSet -where - D: Discover + Unpin, - D::Error: Into, -{ - /// Peer Tracking: New Peers - /// - /// Provides new and deleted peer [`Change`]s to the peer set, - /// via the [`Discover`] trait implementation. - discover: D, - - /// A channel that asks the peer crawler task to connect to more peers. - demand_signal: mpsc::Sender, - - /// Peer Tracking: Ready Peers - /// - /// Connected peers that are ready to receive requests from Zebra, - /// or send requests to Zebra. - ready_services: HashMap, - - /// Peer Tracking: Busy Peers - /// - /// Connected peers that are handling a Zebra request, - /// or Zebra is handling one of their requests. - unready_services: FuturesUnordered>, - - /// Channels used to cancel the request that an unready service is doing. - cancel_handles: HashMap>, - - /// The configured limit for inbound and outbound connections. - /// - /// The peer set panics if this size is exceeded. - /// If that happens, our connection limit code has a bug. - peerset_total_connection_limit: usize, - - // Background Tasks - // - /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks - /// - /// The join handles passed into the PeerSet are used populate the `guards` member - handle_rx: tokio::sync::oneshot::Receiver>>>, - - /// Unordered set of handles to background tasks associated with the `PeerSet` - /// - /// These guards are checked for errors as part of `poll_ready` which lets - /// the `PeerSet` propagate errors from background tasks back to the user - guards: FuturesUnordered>>, -} - -impl PeerSet -where - D: Discover + Unpin, - D::Error: Into, -{ - /// Construct a peerset which uses `discover` to manage peer connections. - /// - /// Arguments: - /// - `config`: configures the peer set connection limit; - /// - `discover`: handles peer connects and disconnects; - /// - `demand_signal`: requests more peers when all peers are busy (unready); - /// - `handle_rx`: receives background task handles, - /// monitors them to make sure they're still running, - /// and shuts down all the tasks as soon as one task exits; - pub fn new( - config: &Config, - discover: D, - demand_signal: mpsc::Sender, - handle_rx: tokio::sync::oneshot::Receiver>>>, - ) -> Self { - Self { - // New peers - discover, - demand_signal, - - // Ready peers - ready_services: HashMap::new(), - - // Busy peers - unready_services: FuturesUnordered::new(), - cancel_handles: HashMap::new(), - - peerset_total_connection_limit: config.peerset_total_connection_limit(), - - // Background tasks - handle_rx, - guards: futures::stream::FuturesUnordered::new(), - } - } - - /// Receive background tasks, if they've been sent on the channel, - /// but not consumed yet. - /// - /// Returns a result representing the current task state, - /// or `None` if the background tasks should be polled to check their state. - fn receive_tasks_if_needed(&mut self) -> Option> { - if self.guards.is_empty() { - match self.handle_rx.try_recv() { - // The tasks haven't been sent yet. - Err(TryRecvError::Empty) => Some(Ok(())), - - // The tasks have been sent, but not consumed. - Ok(handles) => { - // Currently, the peer set treats an empty background task set as an error. - // - // TODO: refactor `handle_rx` and `guards` into an enum - // for the background task state: Waiting/Running/Shutdown. - assert!( - !handles.is_empty(), - "the peer set requires at least one background task" - ); - - self.guards.extend(handles); - - None - } - - // The tasks have been sent and consumed, but then they exited. - // - // Correctness: the peer set must receive at least one task. - // - // TODO: refactor `handle_rx` and `guards` into an enum - // for the background task state: Waiting/Running/Shutdown. - Err(TryRecvError::Closed) => { - Some(Err("all peer set background tasks have exited".into())) - } - } - } else { - None - } - } - - /// Check background task handles to make sure they're still running. - /// - /// If any background task exits, shuts down all other background tasks, - /// and returns an error. - fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { - if let Some(result) = self.receive_tasks_if_needed() { - return result; - } - - match Pin::new(&mut self.guards).poll_next(cx) { - // All background tasks are still running. - Poll::Pending => Ok(()), - - Poll::Ready(Some(res)) => { - tracing::info!( - background_tasks = %self.guards.len(), - "a peer set background task exited, shutting down other peer set tasks" - ); - - self.shut_down_tasks_and_channels(); - - // Flatten the join result and inner result, - // then turn Ok() task exits into errors. - res.map_err(Into::into) - // TODO: replace with Result::flatten when it stabilises (#70142) - .and_then(convert::identity) - .and(Err("a peer set background task exited".into())) - } - - Poll::Ready(None) => { - self.shut_down_tasks_and_channels(); - Err("all peer set background tasks have exited".into()) - } - } - } - - /// Shut down: - /// - services by dropping the service lists - /// - background tasks via their join handles or cancel handles - /// - channels by closing the channel - fn shut_down_tasks_and_channels(&mut self) { - // Drop services and cancel their background tasks. - self.ready_services = HashMap::new(); - - for (_peer_key, handle) in self.cancel_handles.drain() { - let _ = handle.send(CancelClientWork); - } - self.unready_services = FuturesUnordered::new(); - - // Close the MorePeers channel for all senders, - // so we don't add more peers to a shut down peer set. - self.demand_signal.close_channel(); - - // Shut down background tasks. - self.handle_rx.close(); - self.receive_tasks_if_needed(); - for guard in self.guards.iter() { - guard.abort(); - } - - // TODO: implement graceful shutdown for InventoryRegistry (#1678) - } - - /// Check busy peer services for request completion or errors. - /// - /// Move newly ready services to the ready list if they are for peers with supported protocol - /// versions, otherwise they are dropped. Also drop failed services. - fn poll_unready(&mut self, cx: &mut Context<'_>) { - loop { - match Pin::new(&mut self.unready_services).poll_next(cx) { - // No unready service changes, or empty unready services - Poll::Pending | Poll::Ready(None) => return, - - // Unready -> Ready - Poll::Ready(Some(Ok((key, svc)))) => { - tracing::trace!(?key, "service became ready"); - let cancel = self.cancel_handles.remove(&key); - assert!(cancel.is_some(), "missing cancel handle"); - } - - // Unready -> Canceled - Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => { - // A service be canceled because we've connected to the same service twice. - // In that case, there is a cancel handle for the peer address, - // but it belongs to the service for the newer connection. - tracing::trace!( - ?key, - duplicate_connection = self.cancel_handles.contains_key(&key), - "service was canceled, dropping service" - ); - } - Poll::Ready(Some(Err((key, UnreadyError::CancelHandleDropped(_))))) => { - // Similarly, services with dropped cancel handes can have duplicates. - tracing::trace!( - ?key, - duplicate_connection = self.cancel_handles.contains_key(&key), - "cancel handle was dropped, dropping service" - ); - } - - // Unready -> Errored - Poll::Ready(Some(Err((key, UnreadyError::Inner(error))))) => { - tracing::debug!(%error, "service failed while unready, dropping service"); - - let cancel = self.cancel_handles.remove(&key); - assert!(cancel.is_some(), "missing cancel handle"); - } - } - } - } - - /// Checks for newly inserted or removed services. - /// - /// Puts inserted services in the unready list. - /// Drops removed services, after cancelling any pending requests. - fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { - use futures::ready; - loop { - match ready!(Pin::new(&mut self.discover).poll_discover(cx)) - .ok_or("discovery stream closed")? - .map_err(Into::into)? - { - Change::Remove(key) => { - tracing::trace!(?key, "got Change::Remove from Discover"); - self.remove(&key); - } - Change::Insert(key, svc) => { - // We add peers as unready, so that we: - // - always do the same checks on every ready peer, and - // - check for any errors that happened right after the handshake - tracing::trace!(?key, "got Change::Insert from Discover"); - self.remove(&key); - self.push_unready(key, svc); - } - } - } - } - - /// Calls the poll functions used at the start of all `poll_ready`s - pub fn poll_all(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { - self.poll_background_errors(cx)?; - - // Update peer statuses - let _ = self.poll_discover(cx)?; - self.poll_unready(cx); - Ok(()) - } - - pub fn poll_ready(&mut self, cx: &mut Context<'_>) { - let mut ready_services = HashMap::with_capacity(self.ready_services.len()); - let mut pending_services = vec![]; - for (key, mut svc) in self.ready_services.drain() { - match svc.poll_ready(cx) { - Poll::Pending => { - pending_services.push((key, svc)); - } - Poll::Ready(Ok(())) => { - ready_services.insert(key, svc); - } - Poll::Ready(Err(e)) => { - tracing::trace!("Peer poll_ready returned error: {}", e); - // peer svc will get dropped at the start of next loop - } - } - } - for (key, svc) in pending_services { - self.push_unready(key, svc); - } - self.ready_services = ready_services; - } - - pub fn proportion_ready(&self) -> f64 { - let total_services = self.ready_services.len() + self.unready_services.len(); - - if total_services == 0 { - return 1.0; - } - - self.ready_services.len() as f64 / total_services as f64 - } - - /// Takes a ready service by key. - pub fn take_ready_service(&mut self, key: &D::Key) -> Option { - if let Some(svc) = self.ready_services.remove(key) { - assert!( - !self.cancel_handles.contains_key(key), - "cancel handles are only used for unready service work" - ); - - Some(svc) - } else { - None - } - } - - /// Remove the service corresponding to `key` from the peer set. - /// - /// Drops the service, cancelling any pending request or response to that peer. - /// If the peer does not exist, does nothing. - fn remove(&mut self, key: &D::Key) { - if let Some(ready_service) = self.take_ready_service(key) { - // A ready service has no work to cancel, so just drop it. - std::mem::drop(ready_service); - } else if let Some(handle) = self.cancel_handles.remove(key) { - // Cancel the work, implicitly dropping the cancel handle. - // The service future returns a `Canceled` error, - // making `poll_unready` drop the service. - let _ = handle.send(CancelClientWork); - } - } - - /// Adds a busy service to the unready list if it's for a peer with a supported version, - /// and adds a cancel handle for the service's current request. - /// - /// If the service is for a connection to an outdated peer, the request is cancelled and the - /// service is dropped. - pub fn push_unready(&mut self, key: D::Key, svc: D::Service) { - let (tx, rx) = oneshot::channel(); - - self.unready_services.push(UnreadyService { - key: Some(key), - service: Some(svc), - cancel: rx, - _req: PhantomData, - }); - - self.cancel_handles.insert(key, tx); - } - - pub fn preselect_p2c_peer_with_full_block(&self, block_height: u64) -> Option { - self.select_p2c_peer_from_list( - &self - .ready_services - .iter() - .filter_map(|(key, serv)| { - if serv.has_full_block(block_height) { - Some(key) - } else { - None - } - }) - .collect(), - ) - } - - /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. - pub fn preselect_p2c_peer(&self) -> Option { - self.select_p2c_peer_from_list(&self.ready_services.keys().collect()) - } - - /// Accesses a ready endpoint by `key` and returns its current load. - /// - /// Returns `None` if the service is not in the ready service list. - fn query_load(&self, key: &D::Key) -> Option<::Metric> { - let svc = self.ready_services.get(key); - svc.map(|svc| svc.load()) - } - - // Performs P2C on `ready_service_list` to randomly select a less-loaded ready service. - #[allow(clippy::unwrap_in_result)] - pub fn select_p2c_peer_from_list( - &self, - ready_service_list: &HashSet<&D::Key>, - ) -> Option { - match ready_service_list.len() { - 0 => None, - 1 => Some( - **ready_service_list - .iter() - .next() - .expect("just checked there is one service"), - ), - len => { - // If there are only 2 peers, randomise their order. - // Otherwise, choose 2 random peers in a random order. - let (a, b) = { - let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2); - let a = idxs.index(0); - let b = idxs.index(1); - - let a = **ready_service_list - .iter() - .nth(a) - .expect("sample returns valid indexes"); - let b = **ready_service_list - .iter() - .nth(b) - .expect("sample returns valid indexes"); - - (a, b) - }; - - let a_load = self.query_load(&a).expect("supplied services are ready"); - let b_load = self.query_load(&b).expect("supplied services are ready"); - - let selected = if a_load <= b_load { a } else { b }; - - tracing::trace!( - a.key = ?a, - a.load = ?a_load, - b.key = ?b, - b.load = ?b_load, - selected = ?selected, - ?len, - "selected service by p2c" - ); - - Some(selected) - } - } - } - - pub fn all_ready(&mut self) -> &mut HashMap { - &mut self.ready_services - } - - pub fn push_all_unready(&mut self) { - let all_ready: Vec<(_, _)> = self.ready_services.drain().collect(); - for (key, svc) in all_ready { - self.push_unready(key, svc) - } - } - pub fn demand_more_peers(&mut self) { - let _ = self.demand_signal.try_send(MorePeers); - } -} diff --git a/p2p/src/peer_set/unready_service.rs b/p2p/src/peer_set/unready_service.rs deleted file mode 100644 index bfd3f87..0000000 --- a/p2p/src/peer_set/unready_service.rs +++ /dev/null @@ -1,92 +0,0 @@ -/// Services that are busy or newly created. -/// -/// Adapted from zebra who themselves adapted this from tower-balance. -use std::{ - future::Future, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::{channel::oneshot, ready}; -use pin_project::pin_project; -use tower::Service; - -use crate::peer_set::set::CancelClientWork; - -/// A Future that becomes satisfied when an `S`-typed service is ready. -/// -/// May fail due to cancellation, i.e. if the service is removed from discovery. -#[pin_project] -#[derive(Debug)] -pub(super) struct UnreadyService { - /// The key used to lookup `service`. - pub(super) key: Option, - - /// A oneshot used to cancel the request the `service` is currently working on, if any. - #[pin] - pub(super) cancel: oneshot::Receiver, - - /// The `service` that is busy (or newly created). - pub(super) service: Option, - - /// Dropping `service` might drop a request. - /// This [`PhantomData`] tells the Rust compiler to do a drop check for `Req`. - pub(super) _req: PhantomData, -} - -#[derive(Debug, Eq, PartialEq)] -pub(super) enum UnreadyError { - Inner(E), - Canceled, - CancelHandleDropped(oneshot::Canceled), -} - -impl, Req> Future for UnreadyService { - type Output = Result<(K, S), (K, UnreadyError)>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if let Poll::Ready(oneshot_result) = this.cancel.poll(cx) { - let key = this.key.take().expect("polled after ready"); - - // # Correctness - // - // Return an error if the service is explicitly canceled, - // or its cancel handle is dropped, implicitly cancelling it. - match oneshot_result { - Ok(CancelClientWork) => return Poll::Ready(Err((key, UnreadyError::Canceled))), - Err(canceled_error) => { - return Poll::Ready(Err(( - key, - UnreadyError::CancelHandleDropped(canceled_error), - ))) - } - } - } - - // # Correctness - // - // The current task must be scheduled for wakeup every time we return - // `Poll::Pending`. - // - //`ready!` returns `Poll::Pending` when the service is unready, and - // the inner `poll_ready` schedules this task for wakeup. - // - // `cancel.poll` also schedules this task for wakeup if it is canceled. - let res = ready!(this - .service - .as_mut() - .expect("polled after ready") - .poll_ready(cx)); - - let key = this.key.take().expect("polled after ready"); - let svc = this.service.take().expect("polled after ready"); - - match res { - Ok(()) => Poll::Ready(Ok((key, svc))), - Err(e) => Poll::Ready(Err((key, UnreadyError::Inner(e)))), - } - } -}