add peer sync state svc

This commit is contained in:
Boog900 2024-05-15 01:33:05 +01:00
parent cbb629d431
commit 020fd56673
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
4 changed files with 270 additions and 0 deletions

View file

@ -8,3 +8,6 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
/// The timeout for when we fail to find a peer to connect to.
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
/// The durations of a short ban.
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);

View file

@ -11,5 +11,6 @@ pub mod client_pool;
pub mod config;
pub mod connection_maintainer;
mod constants;
mod sync_states;
pub use config::P2PConfig;

View file

@ -0,0 +1,247 @@
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet},
future::{ready, Future, Ready},
pin::Pin,
task::{Context, Poll},
};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::watch;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tower::Service;
use monero_p2p::{
client::InternalPeerID,
handles::ConnectionHandle,
services::{PeerSyncRequest, PeerSyncResponse},
NetworkZone,
};
use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
use monero_wire::CoreSyncData;
use crate::constants::SHORT_BAN;
#[derive(Debug)]
pub struct NewSyncInfo {
chain_height: u64,
top_hash: [u8; 32],
cumulative_difficulty: u128,
}
/// A service that keeps track of our peers blockchains.
///
/// This is the service that handles finding out if we need to sync and giving the peers that should
/// be synced from to the requester.
pub struct PeerSyncSvc<N: NetworkZone> {
/// A map of cumulative difficulties to peers.
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
/// A map of peers to cumulative difficulties.
peers: HashMap<InternalPeerID<N::Addr>, (u128, PruningSeed)>,
/// A watch channel for *a* top synced peer info.
///
/// This is guaranteed to hold the sync info of a peer with the highest cumulative difficulty seen,
/// this makes no guarantees about which peer will be chosen in case of a tie.
new_height_watcher: watch::Sender<NewSyncInfo>,
/// The handle to the peer that has data in `new_height_watcher`.
last_peer_in_watcher_handle: Option<ConnectionHandle>,
/// A [`FuturesUnordered`] that resolves when a peer disconnects.
closed_connections: FuturesUnordered<PeerDisconnectFut<N>>,
}
impl<N: NetworkZone> PeerSyncSvc<N> {
pub fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
chain_height: 0,
top_hash: [0; 32],
cumulative_difficulty: 0,
});
watch_rx.mark_unchanged();
(
Self {
cumulative_difficulties: BTreeMap::new(),
peers: HashMap::new(),
new_height_watcher: watch_tx,
last_peer_in_watcher_handle: None,
closed_connections: FuturesUnordered::new(),
},
watch_rx,
)
}
fn poll_disconnected(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) {
tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service.");
let (peer_cum_diff, _) = self.peers.remove(&peer_id).unwrap();
let cum_dif_peers = self
.cumulative_difficulties
.get_mut(&peer_cum_diff)
.unwrap();
cum_dif_peers.remove(&peer_id);
if cum_dif_peers.is_empty() {
// If this was the last peer remove the whole entry for this cumulative difficulty.
self.cumulative_difficulties.remove(&peer_cum_diff);
}
}
}
/// Returns a list of peers that claim to have a higher cumulative difficulty than `current_cum_dif`.
fn peers_to_sync_from(
&self,
current_cum_dif: u128,
block_needed: Option<u64>,
) -> Vec<InternalPeerID<N::Addr>> {
self.cumulative_difficulties
.range((current_cum_dif + 1)..)
.flat_map(|(_, peers)| peers)
.filter(|peer| {
if let Some(block_needed) = block_needed {
// we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means
// we don't take into account the tip blocks which are not pruned.
self.peers
.get(peer)
.unwrap()
.1
.has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT)
} else {
true
}
})
.copied()
.collect()
}
/// Updates a peers sync state.
fn update_peer_sync_info(
&mut self,
peer_id: InternalPeerID<N::Addr>,
handle: ConnectionHandle,
core_sync_data: CoreSyncData,
) -> Result<(), tower::BoxError> {
tracing::trace!(
"Received new core sync data from peer, top hash: {}",
hex::encode(core_sync_data.top_id)
);
let new_cumulative_difficulty = core_sync_data.cumulative_difficulty();
if let Some((old_cum_dif, _)) = self.peers.get_mut(&peer_id) {
match (*old_cum_dif).cmp(&new_cumulative_difficulty) {
Ordering::Equal => {
// If the cumulative difficulty of the peers chain hasn't changed then no need to update anything.
return Ok(());
}
Ordering::Greater => {
// This will only happen if a peer lowers its cumulative difficulty during the connection.
// This won't happen if a peer re-syncs their blockchain as then the connection would have closed.
tracing::debug!(
"Peer's claimed cumulative difficulty has dropped, closing connection and banning peer for: {} seconds.", SHORT_BAN.as_secs()
);
handle.ban_peer(SHORT_BAN);
return Err("Peers cumulative difficulty dropped".into());
}
Ordering::Less => (),
}
// Remove the old cumulative difficulty entry for this peer
let old_cum_dif_peers = self.cumulative_difficulties.get_mut(old_cum_dif).unwrap();
old_cum_dif_peers.remove(&peer_id);
if old_cum_dif_peers.is_empty() {
// If this was the last peer remove the whole entry for this cumulative difficulty.
self.cumulative_difficulties.remove(old_cum_dif);
}
// update the cumulative difficulty
*old_cum_dif = new_cumulative_difficulty;
} else {
// The peer is new so add it the list of peers.
self.peers.insert(
peer_id,
(
new_cumulative_difficulty,
PruningSeed::decompress_p2p_rules(core_sync_data.pruning_seed)?,
),
);
// add it to the list of peers to watch for disconnection.
self.closed_connections.push(PeerDisconnectFut {
closed_fut: handle.closed(),
peer_id: Some(peer_id),
})
}
self.cumulative_difficulties
.entry(new_cumulative_difficulty)
.or_default()
.insert(peer_id);
if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty
|| !self
.last_peer_in_watcher_handle
.as_ref()
.is_some_and(|handle| !handle.is_closed())
{
tracing::debug!(
"Updating sync watcher channel with new highest seen cumulative difficulty."
);
let _ = self.new_height_watcher.send(NewSyncInfo {
top_hash: core_sync_data.top_id,
chain_height: core_sync_data.current_height,
cumulative_difficulty: new_cumulative_difficulty,
});
self.last_peer_in_watcher_handle.replace(handle);
}
Ok(())
}
}
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
type Response = PeerSyncResponse<N>;
type Error = tower::BoxError;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_disconnected(cx);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: PeerSyncRequest<N>) -> Self::Future {
let res = match req {
PeerSyncRequest::PeersToSyncFrom {
current_cumulative_difficulty,
block_needed,
} => Ok(PeerSyncResponse::PeersToSyncFrom(self.peers_to_sync_from(
current_cumulative_difficulty,
block_needed,
))),
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
.update_peer_sync_info(peer_id, handle, sync_data)
.map(|_| PeerSyncResponse::Ok),
};
ready(res)
}
}
#[pin_project::pin_project]
struct PeerDisconnectFut<N: NetworkZone> {
#[pin]
closed_fut: WaitForCancellationFutureOwned,
peer_id: Option<InternalPeerID<N::Addr>>,
}
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
type Output = InternalPeerID<N::Addr>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.closed_fut
.poll(cx)
.map(|_| this.peer_id.take().unwrap())
}
}

View file

@ -126,6 +126,14 @@ impl PruningSeed {
}
}
/// Returns if a peer with this pruning seed should have a non-pruned version of a block.
pub fn has_full_block(&self, height: u64, blockchain_height: u64) -> bool {
match self {
PruningSeed::NotPruned => true,
PruningSeed::Pruned(seed) => seed.has_full_block(height, blockchain_height),
}
}
/// Gets the next pruned block for a given `block_height` and `blockchain_height`
///
/// Each seed will store, in a cyclic manner, a portion of blocks while discarding
@ -303,6 +311,17 @@ impl DecompressedPruningSeed {
| ((self.stripe - 1) << PRUNING_SEED_STRIPE_SHIFT)
}
/// Returns if a peer with this pruning seed should have a non-pruned version of a block.
pub fn has_full_block(&self, height: u64, blockchain_height: u64) -> bool {
let Some(block_stripe) =
get_block_pruning_stripe(height, blockchain_height, self.log_stripes)
else {
return true;
};
self.stripe == block_stripe
}
/// Gets the next unpruned block for a given `block_height` and `blockchain_height`
///
/// Each seed will store, in a cyclic manner, a portion of blocks while discarding