From 020fd56673bce370ae6c4e229c8ad833712d4dc6 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Wed, 15 May 2024 01:33:05 +0100 Subject: [PATCH] add peer sync state svc --- p2p/cuprate-p2p/src/constants.rs | 3 + p2p/cuprate-p2p/src/lib.rs | 1 + p2p/cuprate-p2p/src/sync_states.rs | 247 +++++++++++++++++++++++++++++ pruning/src/lib.rs | 19 +++ 4 files changed, 270 insertions(+) create mode 100644 p2p/cuprate-p2p/src/sync_states.rs diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 58e263fa..1a75b067 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -8,3 +8,6 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; /// The timeout for when we fail to find a peer to connect to. pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5); + +/// The duration of a short ban. +pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index 0c53b78f..441ba584 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -11,5 +11,6 @@ pub mod client_pool; pub mod config; pub mod connection_maintainer; mod constants; +mod sync_states; pub use config::P2PConfig; diff --git a/p2p/cuprate-p2p/src/sync_states.rs b/p2p/cuprate-p2p/src/sync_states.rs new file mode 100644 index 00000000..1e97271b --- /dev/null +++ b/p2p/cuprate-p2p/src/sync_states.rs @@ -0,0 +1,247 @@ +use std::{ + cmp::Ordering, + collections::{BTreeMap, HashMap, HashSet}, + future::{ready, Future, Ready}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{stream::FuturesUnordered, StreamExt}; +use tokio::sync::watch; +use tokio_util::sync::WaitForCancellationFutureOwned; +use tower::Service; + +use monero_p2p::{ + client::InternalPeerID, + handles::ConnectionHandle, + services::{PeerSyncRequest, PeerSyncResponse}, + NetworkZone, +}; +use monero_pruning::{PruningSeed, 
CRYPTONOTE_MAX_BLOCK_HEIGHT}; +use monero_wire::CoreSyncData; + +use crate::constants::SHORT_BAN; + +#[derive(Debug)] +pub struct NewSyncInfo { + chain_height: u64, + top_hash: [u8; 32], + cumulative_difficulty: u128, +} + +/// A service that keeps track of our peers' blockchains. +/// +/// This is the service that handles finding out if we need to sync and giving the peers that should +/// be synced from to the requester. +pub struct PeerSyncSvc { + /// A map of cumulative difficulties to peers. + cumulative_difficulties: BTreeMap>>, + /// A map of peers to cumulative difficulties. + peers: HashMap, (u128, PruningSeed)>, + /// A watch channel for *a* top synced peer info. + /// + /// This is guaranteed to hold the sync info of a peer with the highest cumulative difficulty seen, + /// this makes no guarantees about which peer will be chosen in case of a tie. + new_height_watcher: watch::Sender, + /// The handle to the peer that has data in `new_height_watcher`. + last_peer_in_watcher_handle: Option, + /// A [`FuturesUnordered`] that resolves when a peer disconnects. 
+ closed_connections: FuturesUnordered>, +} + +impl PeerSyncSvc { + pub fn new() -> (Self, watch::Receiver) { + let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo { + chain_height: 0, + top_hash: [0; 32], + cumulative_difficulty: 0, + }); + + watch_rx.mark_unchanged(); + + ( + Self { + cumulative_difficulties: BTreeMap::new(), + peers: HashMap::new(), + new_height_watcher: watch_tx, + last_peer_in_watcher_handle: None, + closed_connections: FuturesUnordered::new(), + }, + watch_rx, + ) + } + + fn poll_disconnected(&mut self, cx: &mut Context<'_>) { + while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) { + tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service."); + let (peer_cum_diff, _) = self.peers.remove(&peer_id).unwrap(); + + let cum_dif_peers = self + .cumulative_difficulties + .get_mut(&peer_cum_diff) + .unwrap(); + cum_dif_peers.remove(&peer_id); + if cum_dif_peers.is_empty() { + // If this was the last peer remove the whole entry for this cumulative difficulty. + self.cumulative_difficulties.remove(&peer_cum_diff); + } + } + } + + /// Returns a list of peers that claim to have a higher cumulative difficulty than `current_cum_dif`. + fn peers_to_sync_from( + &self, + current_cum_dif: u128, + block_needed: Option, + ) -> Vec> { + self.cumulative_difficulties + .range((current_cum_dif + 1)..) + .flat_map(|(_, peers)| peers) + .filter(|peer| { + if let Some(block_needed) = block_needed { + // we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means + // we don't take into account the tip blocks which are not pruned. + self.peers + .get(peer) + .unwrap() + .1 + .has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT) + } else { + true + } + }) + .copied() + .collect() + } + + /// Updates a peer's sync state. 
+ fn update_peer_sync_info( + &mut self, + peer_id: InternalPeerID, + handle: ConnectionHandle, + core_sync_data: CoreSyncData, + ) -> Result<(), tower::BoxError> { + tracing::trace!( + "Received new core sync data from peer, top hash: {}", + hex::encode(core_sync_data.top_id) + ); + + let new_cumulative_difficulty = core_sync_data.cumulative_difficulty(); + + if let Some((old_cum_dif, _)) = self.peers.get_mut(&peer_id) { + match (*old_cum_dif).cmp(&new_cumulative_difficulty) { + Ordering::Equal => { + // If the cumulative difficulty of the peers chain hasn't changed then no need to update anything. + return Ok(()); + } + Ordering::Greater => { + // This will only happen if a peer lowers its cumulative difficulty during the connection. + // This won't happen if a peer re-syncs their blockchain as then the connection would have closed. + tracing::debug!( + "Peer's claimed cumulative difficulty has dropped, closing connection and banning peer for: {} seconds.", SHORT_BAN.as_secs() + ); + handle.ban_peer(SHORT_BAN); + return Err("Peers cumulative difficulty dropped".into()); + } + Ordering::Less => (), + } + + // Remove the old cumulative difficulty entry for this peer + let old_cum_dif_peers = self.cumulative_difficulties.get_mut(old_cum_dif).unwrap(); + old_cum_dif_peers.remove(&peer_id); + if old_cum_dif_peers.is_empty() { + // If this was the last peer remove the whole entry for this cumulative difficulty. + self.cumulative_difficulties.remove(old_cum_dif); + } + // update the cumulative difficulty + *old_cum_dif = new_cumulative_difficulty; + } else { + // The peer is new so add it to the list of peers. + self.peers.insert( + peer_id, + ( + new_cumulative_difficulty, + PruningSeed::decompress_p2p_rules(core_sync_data.pruning_seed)?, + ), + ); + + // add it to the list of peers to watch for disconnection. 
+ self.closed_connections.push(PeerDisconnectFut { + closed_fut: handle.closed(), + peer_id: Some(peer_id), + }) + } + + self.cumulative_difficulties + .entry(new_cumulative_difficulty) + .or_default() + .insert(peer_id); + + if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty + || !self + .last_peer_in_watcher_handle + .as_ref() + .is_some_and(|handle| !handle.is_closed()) + { + tracing::debug!( + "Updating sync watcher channel with new highest seen cumulative difficulty." + ); + let _ = self.new_height_watcher.send(NewSyncInfo { + top_hash: core_sync_data.top_id, + chain_height: core_sync_data.current_height, + cumulative_difficulty: new_cumulative_difficulty, + }); + self.last_peer_in_watcher_handle.replace(handle); + } + + Ok(()) + } +} + +impl Service> for PeerSyncSvc { + type Response = PeerSyncResponse; + type Error = tower::BoxError; + type Future = Ready>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_disconnected(cx); + + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: PeerSyncRequest) -> Self::Future { + let res = match req { + PeerSyncRequest::PeersToSyncFrom { + current_cumulative_difficulty, + block_needed, + } => Ok(PeerSyncResponse::PeersToSyncFrom(self.peers_to_sync_from( + current_cumulative_difficulty, + block_needed, + ))), + PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self + .update_peer_sync_info(peer_id, handle, sync_data) + .map(|_| PeerSyncResponse::Ok), + }; + + ready(res) + } +} + +#[pin_project::pin_project] +struct PeerDisconnectFut { + #[pin] + closed_fut: WaitForCancellationFutureOwned, + peer_id: Option>, +} + +impl Future for PeerDisconnectFut { + type Output = InternalPeerID; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + this.closed_fut + .poll(cx) + .map(|_| this.peer_id.take().unwrap()) + } +} diff --git a/pruning/src/lib.rs b/pruning/src/lib.rs index 9cff0824..b354064f 100644 --- 
a/pruning/src/lib.rs +++ b/pruning/src/lib.rs @@ -126,6 +126,14 @@ impl PruningSeed { } } + /// Returns if a peer with this pruning seed should have a non-pruned version of a block. + pub fn has_full_block(&self, height: u64, blockchain_height: u64) -> bool { + match self { + PruningSeed::NotPruned => true, + PruningSeed::Pruned(seed) => seed.has_full_block(height, blockchain_height), + } + } + /// Gets the next pruned block for a given `block_height` and `blockchain_height` /// /// Each seed will store, in a cyclic manner, a portion of blocks while discarding @@ -303,6 +311,17 @@ impl DecompressedPruningSeed { | ((self.stripe - 1) << PRUNING_SEED_STRIPE_SHIFT) } + /// Returns if a peer with this pruning seed should have a non-pruned version of a block. + pub fn has_full_block(&self, height: u64, blockchain_height: u64) -> bool { + let Some(block_stripe) = + get_block_pruning_stripe(height, blockchain_height, self.log_stripes) + else { + return true; + }; + + self.stripe == block_stripe + } + /// Gets the next unpruned block for a given `block_height` and `blockchain_height` /// /// Each seed will store, in a cyclic manner, a portion of blocks while discarding