mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-08 20:09:44 +00:00
P2P: Peer sync state svc (#128)
* p2p changes * clippy * a few more docs * init cuprate-p2p * remove some unrelated code and add some docs * start documenting client_pool.rs * add more docs * typo * fix docs * use JoinSet in connection maintainer * small changes * add peer sync state svc * add a test * fix merge * add another test * unify PeerDisconnectFut and add more docs * Apply suggestions from code review Co-authored-by: hinto-janai <hinto.janai@protonmail.com> * dif -> diff * move comment * Update pruning/src/lib.rs Co-authored-by: hinto-janai <hinto.janai@protonmail.com> * Apply suggestions from code review Co-authored-by: hinto-janai <hinto.janai@protonmail.com> --------- Co-authored-by: hinto-janai <hinto.janai@protonmail.com>
This commit is contained in:
parent
02cd963e57
commit
0158c6671a
6 changed files with 451 additions and 4 deletions
|
@ -21,7 +21,7 @@ use monero_p2p::{
|
||||||
ConnectionDirection, NetworkZone,
|
ConnectionDirection, NetworkZone,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod disconnect_monitor;
|
pub(crate) mod disconnect_monitor;
|
||||||
mod drop_guard_client;
|
mod drop_guard_client;
|
||||||
|
|
||||||
pub use drop_guard_client::ClientPoolDropGuard;
|
pub use drop_guard_client::ClientPoolDropGuard;
|
||||||
|
|
|
@ -51,12 +51,12 @@ pub async fn disconnect_monitor<N: NetworkZone>(
|
||||||
|
|
||||||
/// A [`Future`] that resolves when a peer disconnects.
|
/// A [`Future`] that resolves when a peer disconnects.
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
struct PeerDisconnectFut<N: NetworkZone> {
|
pub(crate) struct PeerDisconnectFut<N: NetworkZone> {
|
||||||
/// The inner [`Future`] that resolves when a peer disconnects.
|
/// The inner [`Future`] that resolves when a peer disconnects.
|
||||||
#[pin]
|
#[pin]
|
||||||
closed_fut: WaitForCancellationFutureOwned,
|
pub(crate) closed_fut: WaitForCancellationFutureOwned,
|
||||||
/// The peers ID.
|
/// The peers ID.
|
||||||
peer_id: Option<InternalPeerID<N::Addr>>,
|
pub(crate) peer_id: Option<InternalPeerID<N::Addr>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
|
impl<N: NetworkZone> Future for PeerDisconnectFut<N> {
|
||||||
|
|
|
@ -9,6 +9,9 @@ pub(crate) const MAX_SEED_CONNECTIONS: usize = 3;
|
||||||
/// The timeout for when we fail to find a peer to connect to.
|
/// The timeout for when we fail to find a peer to connect to.
|
||||||
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
|
pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
|
/// The durations of a short ban.
|
||||||
|
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
|
||||||
|
|
||||||
/// The default amount of time between inbound diffusion flushes.
|
/// The default amount of time between inbound diffusion flushes.
|
||||||
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
|
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
|
|
|
@ -12,5 +12,6 @@ pub mod client_pool;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod connection_maintainer;
|
pub mod connection_maintainer;
|
||||||
mod constants;
|
mod constants;
|
||||||
|
mod sync_states;
|
||||||
|
|
||||||
pub use config::P2PConfig;
|
pub use config::P2PConfig;
|
||||||
|
|
427
p2p/cuprate-p2p/src/sync_states.rs
Normal file
427
p2p/cuprate-p2p/src/sync_states.rs
Normal file
|
@ -0,0 +1,427 @@
|
||||||
|
//! # Sync States
|
||||||
|
//!
|
||||||
|
//! This module contains a [`PeerSyncSvc`], which keeps track of the claimed chain states of connected peers.
|
||||||
|
//! This allows checking if we are behind and getting a list of peers who claim they are ahead.
|
||||||
|
use std::{
|
||||||
|
cmp::Ordering,
|
||||||
|
collections::{BTreeMap, HashMap, HashSet},
|
||||||
|
future::{ready, Ready},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{stream::FuturesUnordered, StreamExt};
|
||||||
|
use tokio::sync::watch;
|
||||||
|
use tower::Service;
|
||||||
|
|
||||||
|
use monero_p2p::{
|
||||||
|
client::InternalPeerID,
|
||||||
|
handles::ConnectionHandle,
|
||||||
|
services::{PeerSyncRequest, PeerSyncResponse},
|
||||||
|
NetworkZone,
|
||||||
|
};
|
||||||
|
use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
|
||||||
|
use monero_wire::CoreSyncData;
|
||||||
|
|
||||||
|
use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN};
|
||||||
|
|
||||||
|
/// The highest claimed sync info from our connected peers.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct NewSyncInfo {
|
||||||
|
/// The peers chain height.
|
||||||
|
chain_height: u64,
|
||||||
|
/// The peers top block's hash.
|
||||||
|
top_hash: [u8; 32],
|
||||||
|
/// The peers cumulative difficulty.
|
||||||
|
cumulative_difficulty: u128,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A service that keeps track of our peers blockchains.
|
||||||
|
///
|
||||||
|
/// This is the service that handles:
|
||||||
|
/// 1. Finding out if we need to sync
|
||||||
|
/// 1. Giving the peers that should be synced _from_, to the requester
|
||||||
|
pub struct PeerSyncSvc<N: NetworkZone> {
|
||||||
|
/// A map of cumulative difficulties to peers.
|
||||||
|
cumulative_difficulties: BTreeMap<u128, HashSet<InternalPeerID<N::Addr>>>,
|
||||||
|
/// A map of peers to cumulative difficulties.
|
||||||
|
peers: HashMap<InternalPeerID<N::Addr>, (u128, PruningSeed)>,
|
||||||
|
/// A watch channel for *a* top synced peer info.
|
||||||
|
new_height_watcher: watch::Sender<NewSyncInfo>,
|
||||||
|
/// The handle to the peer that has data in `new_height_watcher`.
|
||||||
|
last_peer_in_watcher_handle: Option<ConnectionHandle>,
|
||||||
|
/// A [`FuturesUnordered`] that resolves when a peer disconnects.
|
||||||
|
closed_connections: FuturesUnordered<PeerDisconnectFut<N>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N: NetworkZone> PeerSyncSvc<N> {
|
||||||
|
/// Creates a new [`PeerSyncSvc`] with a [`Receiver`](watch::Receiver) that will be updated with
|
||||||
|
/// the highest seen sync data, this makes no guarantees about which peer will be chosen in case of a tie.
|
||||||
|
pub fn new() -> (Self, watch::Receiver<NewSyncInfo>) {
|
||||||
|
let (watch_tx, mut watch_rx) = watch::channel(NewSyncInfo {
|
||||||
|
chain_height: 0,
|
||||||
|
top_hash: [0; 32],
|
||||||
|
cumulative_difficulty: 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
watch_rx.mark_unchanged();
|
||||||
|
|
||||||
|
(
|
||||||
|
Self {
|
||||||
|
cumulative_difficulties: BTreeMap::new(),
|
||||||
|
peers: HashMap::new(),
|
||||||
|
new_height_watcher: watch_tx,
|
||||||
|
last_peer_in_watcher_handle: None,
|
||||||
|
closed_connections: FuturesUnordered::new(),
|
||||||
|
},
|
||||||
|
watch_rx,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This function checks if any peers have disconnected, removing them if they have.
|
||||||
|
fn poll_disconnected(&mut self, cx: &mut Context<'_>) {
|
||||||
|
while let Poll::Ready(Some(peer_id)) = self.closed_connections.poll_next_unpin(cx) {
|
||||||
|
tracing::trace!("Peer {peer_id} disconnected, removing from peers sync info service.");
|
||||||
|
let (peer_cum_diff, _) = self.peers.remove(&peer_id).unwrap();
|
||||||
|
|
||||||
|
let cum_diff_peers = self
|
||||||
|
.cumulative_difficulties
|
||||||
|
.get_mut(&peer_cum_diff)
|
||||||
|
.unwrap();
|
||||||
|
cum_diff_peers.remove(&peer_id);
|
||||||
|
if cum_diff_peers.is_empty() {
|
||||||
|
// If this was the last peer remove the whole entry for this cumulative difficulty.
|
||||||
|
self.cumulative_difficulties.remove(&peer_cum_diff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a list of peers that claim to have a higher cumulative difficulty than `current_cum_diff`.
|
||||||
|
fn peers_to_sync_from(
|
||||||
|
&self,
|
||||||
|
current_cum_diff: u128,
|
||||||
|
block_needed: Option<u64>,
|
||||||
|
) -> Vec<InternalPeerID<N::Addr>> {
|
||||||
|
self.cumulative_difficulties
|
||||||
|
.range((current_cum_diff + 1)..)
|
||||||
|
.flat_map(|(_, peers)| peers)
|
||||||
|
.filter(|peer| {
|
||||||
|
if let Some(block_needed) = block_needed {
|
||||||
|
// we just use CRYPTONOTE_MAX_BLOCK_HEIGHT as the blockchain height, this only means
|
||||||
|
// we don't take into account the tip blocks which are not pruned.
|
||||||
|
self.peers
|
||||||
|
.get(peer)
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.has_full_block(block_needed, CRYPTONOTE_MAX_BLOCK_HEIGHT)
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.copied()
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Updates a peers sync state.
|
||||||
|
fn update_peer_sync_info(
|
||||||
|
&mut self,
|
||||||
|
peer_id: InternalPeerID<N::Addr>,
|
||||||
|
handle: ConnectionHandle,
|
||||||
|
core_sync_data: CoreSyncData,
|
||||||
|
) -> Result<(), tower::BoxError> {
|
||||||
|
tracing::trace!(
|
||||||
|
"Received new core sync data from peer, top hash: {}",
|
||||||
|
hex::encode(core_sync_data.top_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
let new_cumulative_difficulty = core_sync_data.cumulative_difficulty();
|
||||||
|
|
||||||
|
if let Some((old_cum_diff, _)) = self.peers.get_mut(&peer_id) {
|
||||||
|
match (*old_cum_diff).cmp(&new_cumulative_difficulty) {
|
||||||
|
Ordering::Equal => {
|
||||||
|
// If the cumulative difficulty of the peers chain hasn't changed then no need to update anything.
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Ordering::Greater => {
|
||||||
|
// This will only happen if a peer lowers its cumulative difficulty during the connection.
|
||||||
|
// This won't happen if a peer re-syncs their blockchain as then the connection would have closed.
|
||||||
|
tracing::debug!(
|
||||||
|
"Peer's claimed cumulative difficulty has dropped, closing connection and banning peer for: {} seconds.", SHORT_BAN.as_secs()
|
||||||
|
);
|
||||||
|
handle.ban_peer(SHORT_BAN);
|
||||||
|
return Err("Peers cumulative difficulty dropped".into());
|
||||||
|
}
|
||||||
|
Ordering::Less => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the old cumulative difficulty entry for this peer
|
||||||
|
let old_cum_diff_peers = self.cumulative_difficulties.get_mut(old_cum_diff).unwrap();
|
||||||
|
old_cum_diff_peers.remove(&peer_id);
|
||||||
|
if old_cum_diff_peers.is_empty() {
|
||||||
|
// If this was the last peer remove the whole entry for this cumulative difficulty.
|
||||||
|
self.cumulative_difficulties.remove(old_cum_diff);
|
||||||
|
}
|
||||||
|
// update the cumulative difficulty
|
||||||
|
*old_cum_diff = new_cumulative_difficulty;
|
||||||
|
} else {
|
||||||
|
// The peer is new so add it the list of peers.
|
||||||
|
self.peers.insert(
|
||||||
|
peer_id,
|
||||||
|
(
|
||||||
|
new_cumulative_difficulty,
|
||||||
|
PruningSeed::decompress_p2p_rules(core_sync_data.pruning_seed)?,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
// add it to the list of peers to watch for disconnection.
|
||||||
|
self.closed_connections.push(PeerDisconnectFut {
|
||||||
|
closed_fut: handle.closed(),
|
||||||
|
peer_id: Some(peer_id),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
self.cumulative_difficulties
|
||||||
|
.entry(new_cumulative_difficulty)
|
||||||
|
.or_default()
|
||||||
|
.insert(peer_id);
|
||||||
|
|
||||||
|
// If the claimed cumulative difficulty is higher than the current one in the watcher
|
||||||
|
// or if the peer in the watch has disconnected, update it.
|
||||||
|
if self.new_height_watcher.borrow().cumulative_difficulty < new_cumulative_difficulty
|
||||||
|
|| self
|
||||||
|
.last_peer_in_watcher_handle
|
||||||
|
.as_ref()
|
||||||
|
.is_some_and(|handle| handle.is_closed())
|
||||||
|
{
|
||||||
|
tracing::debug!(
|
||||||
|
"Updating sync watcher channel with new highest seen cumulative difficulty: {new_cumulative_difficulty}"
|
||||||
|
);
|
||||||
|
let _ = self.new_height_watcher.send(NewSyncInfo {
|
||||||
|
top_hash: core_sync_data.top_id,
|
||||||
|
chain_height: core_sync_data.current_height,
|
||||||
|
cumulative_difficulty: new_cumulative_difficulty,
|
||||||
|
});
|
||||||
|
self.last_peer_in_watcher_handle.replace(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N: NetworkZone> Service<PeerSyncRequest<N>> for PeerSyncSvc<N> {
|
||||||
|
type Response = PeerSyncResponse<N>;
|
||||||
|
type Error = tower::BoxError;
|
||||||
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.poll_disconnected(cx);
|
||||||
|
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: PeerSyncRequest<N>) -> Self::Future {
|
||||||
|
let res = match req {
|
||||||
|
PeerSyncRequest::PeersToSyncFrom {
|
||||||
|
current_cumulative_difficulty,
|
||||||
|
block_needed,
|
||||||
|
} => Ok(PeerSyncResponse::PeersToSyncFrom(self.peers_to_sync_from(
|
||||||
|
current_cumulative_difficulty,
|
||||||
|
block_needed,
|
||||||
|
))),
|
||||||
|
PeerSyncRequest::IncomingCoreSyncData(peer_id, handle, sync_data) => self
|
||||||
|
.update_peer_sync_info(peer_id, handle, sync_data)
|
||||||
|
.map(|_| PeerSyncResponse::Ok),
|
||||||
|
};
|
||||||
|
|
||||||
|
ready(res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Semaphore;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use monero_p2p::{client::InternalPeerID, handles::HandleBuilder, services::PeerSyncRequest};
|
||||||
|
use monero_wire::CoreSyncData;
|
||||||
|
|
||||||
|
use cuprate_test_utils::test_netzone::TestNetZone;
|
||||||
|
use monero_p2p::services::PeerSyncResponse;
|
||||||
|
|
||||||
|
use super::PeerSyncSvc;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn top_sync_channel_updates() {
|
||||||
|
let semaphore = Arc::new(Semaphore::new(1));
|
||||||
|
|
||||||
|
let (_g, handle) = HandleBuilder::new()
|
||||||
|
.with_permit(semaphore.try_acquire_owned().unwrap())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let (mut svc, mut watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();
|
||||||
|
|
||||||
|
assert!(!watch.has_changed().unwrap());
|
||||||
|
|
||||||
|
svc.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||||
|
InternalPeerID::Unknown(0),
|
||||||
|
handle.clone(),
|
||||||
|
CoreSyncData {
|
||||||
|
cumulative_difficulty: 1_000,
|
||||||
|
cumulative_difficulty_top64: 0,
|
||||||
|
current_height: 0,
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: [0; 32],
|
||||||
|
top_version: 0,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(watch.has_changed().unwrap());
|
||||||
|
|
||||||
|
assert_eq!(watch.borrow().top_hash, [0; 32]);
|
||||||
|
assert_eq!(watch.borrow().cumulative_difficulty, 1000);
|
||||||
|
assert_eq!(watch.borrow_and_update().chain_height, 0);
|
||||||
|
|
||||||
|
svc.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||||
|
InternalPeerID::Unknown(1),
|
||||||
|
handle.clone(),
|
||||||
|
CoreSyncData {
|
||||||
|
cumulative_difficulty: 1_000,
|
||||||
|
cumulative_difficulty_top64: 0,
|
||||||
|
current_height: 0,
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: [0; 32],
|
||||||
|
top_version: 0,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(!watch.has_changed().unwrap());
|
||||||
|
|
||||||
|
svc.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||||
|
InternalPeerID::Unknown(2),
|
||||||
|
handle.clone(),
|
||||||
|
CoreSyncData {
|
||||||
|
cumulative_difficulty: 1_001,
|
||||||
|
cumulative_difficulty_top64: 0,
|
||||||
|
current_height: 0,
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: [1; 32],
|
||||||
|
top_version: 0,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(watch.has_changed().unwrap());
|
||||||
|
|
||||||
|
assert_eq!(watch.borrow().top_hash, [1; 32]);
|
||||||
|
assert_eq!(watch.borrow().cumulative_difficulty, 1001);
|
||||||
|
assert_eq!(watch.borrow_and_update().chain_height, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_sync_info_updates() {
|
||||||
|
let semaphore = Arc::new(Semaphore::new(1));
|
||||||
|
|
||||||
|
let (_g, handle) = HandleBuilder::new()
|
||||||
|
.with_permit(semaphore.try_acquire_owned().unwrap())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let (mut svc, _watch) = PeerSyncSvc::<TestNetZone<true, true, true>>::new();
|
||||||
|
|
||||||
|
svc.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||||
|
InternalPeerID::Unknown(0),
|
||||||
|
handle.clone(),
|
||||||
|
CoreSyncData {
|
||||||
|
cumulative_difficulty: 1_000,
|
||||||
|
cumulative_difficulty_top64: 0,
|
||||||
|
current_height: 0,
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: [0; 32],
|
||||||
|
top_version: 0,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(svc.peers.len(), 1);
|
||||||
|
assert_eq!(svc.cumulative_difficulties.len(), 1);
|
||||||
|
|
||||||
|
svc.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||||
|
InternalPeerID::Unknown(0),
|
||||||
|
handle.clone(),
|
||||||
|
CoreSyncData {
|
||||||
|
cumulative_difficulty: 1_001,
|
||||||
|
cumulative_difficulty_top64: 0,
|
||||||
|
current_height: 0,
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: [0; 32],
|
||||||
|
top_version: 0,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(svc.peers.len(), 1);
|
||||||
|
assert_eq!(svc.cumulative_difficulties.len(), 1);
|
||||||
|
|
||||||
|
svc.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::IncomingCoreSyncData(
|
||||||
|
InternalPeerID::Unknown(1),
|
||||||
|
handle.clone(),
|
||||||
|
CoreSyncData {
|
||||||
|
cumulative_difficulty: 10,
|
||||||
|
cumulative_difficulty_top64: 0,
|
||||||
|
current_height: 0,
|
||||||
|
pruning_seed: 0,
|
||||||
|
top_id: [0; 32],
|
||||||
|
top_version: 0,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(svc.peers.len(), 2);
|
||||||
|
assert_eq!(svc.cumulative_difficulties.len(), 2);
|
||||||
|
|
||||||
|
let PeerSyncResponse::PeersToSyncFrom(peers) = svc
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(PeerSyncRequest::PeersToSyncFrom {
|
||||||
|
block_needed: None,
|
||||||
|
current_cumulative_difficulty: 0,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
else {
|
||||||
|
panic!("Wrong response for request.")
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peers.contains(&InternalPeerID::Unknown(0))
|
||||||
|
&& peers.contains(&InternalPeerID::Unknown(1))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
|
@ -126,6 +126,14 @@ impl PruningSeed {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if a peer with this pruning seed should have a non-pruned version of a block.
|
||||||
|
pub fn has_full_block(&self, height: u64, blockchain_height: u64) -> bool {
|
||||||
|
match self {
|
||||||
|
PruningSeed::NotPruned => true,
|
||||||
|
PruningSeed::Pruned(seed) => seed.has_full_block(height, blockchain_height),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Gets the next pruned block for a given `block_height` and `blockchain_height`
|
/// Gets the next pruned block for a given `block_height` and `blockchain_height`
|
||||||
///
|
///
|
||||||
/// Each seed will store, in a cyclic manner, a portion of blocks while discarding
|
/// Each seed will store, in a cyclic manner, a portion of blocks while discarding
|
||||||
|
@ -303,6 +311,14 @@ impl DecompressedPruningSeed {
|
||||||
| ((self.stripe - 1) << PRUNING_SEED_STRIPE_SHIFT)
|
| ((self.stripe - 1) << PRUNING_SEED_STRIPE_SHIFT)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if a peer with this pruning seed should have a non-pruned version of a block.
|
||||||
|
pub fn has_full_block(&self, height: u64, blockchain_height: u64) -> bool {
|
||||||
|
match get_block_pruning_stripe(height, blockchain_height, self.log_stripes) {
|
||||||
|
Some(block_stripe) => self.stripe == block_stripe,
|
||||||
|
None => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Gets the next unpruned block for a given `block_height` and `blockchain_height`
|
/// Gets the next unpruned block for a given `block_height` and `blockchain_height`
|
||||||
///
|
///
|
||||||
/// Each seed will store, in a cyclic manner, a portion of blocks while discarding
|
/// Each seed will store, in a cyclic manner, a portion of blocks while discarding
|
||||||
|
|
Loading…
Reference in a new issue