review fixes

Boog900 2024-06-18 00:43:39 +01:00
parent cfe5bea453
commit f0900f7594
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
6 changed files with 129 additions and 113 deletions

View file

@ -1,7 +1,8 @@
//! # Block Downloader
//!
//! This module contains the block downloader, which finds a chain to download from our connected peers
//! and downloads it.
//! This module contains the [`BlockDownloader`], which finds a chain to
//! download from our connected peers and downloads it. See the actual
//! `struct` documentation for implementation details.
//!
//! The block downloader is started by [`download_blocks`].
use std::{
@ -138,7 +139,6 @@ pub fn download_blocks<N: NetworkZone, S, C>(
client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S,
our_chain_svc: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
@ -168,8 +168,14 @@ where
buffer_stream
}
/// A batch of blocks in the ready queue, waiting for previous blocks to come in so they can
/// A batch of blocks in the ready queue, waiting for previous blocks to come in, so they can
/// be passed into the buffer.
///
/// The [`Eq`] and [`Ord`] impls on this type only take into account the `start_height`; this
/// is because the block downloader only downloads one chain at a time, so no two batches can
/// have the same `start_height`.
///
/// Also, the [`Ord`] impl is reversed so older blocks (lower height) come first in a [`BinaryHeap`].
#[derive(Debug)]
struct ReadyQueueBatch {
/// The start height of the batch.
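Because [`BinaryHeap`] is a max-heap, reversing the comparison makes the lowest `start_height` surface first. A minimal standalone sketch of that trick (the `QueuedBatch` type below is illustrative, not the real `ReadyQueueBatch`):

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Debug, PartialEq, Eq)]
struct QueuedBatch {
    start_height: u64,
}

impl PartialOrd for QueuedBatch {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for QueuedBatch {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed on purpose: a lower `start_height` compares as "greater",
        // so the max-heap hands back the oldest batch first.
        other.start_height.cmp(&self.start_height)
    }
}

fn main() {
    let mut queue = BinaryHeap::from([
        QueuedBatch { start_height: 200 },
        QueuedBatch { start_height: 0 },
        QueuedBatch { start_height: 100 },
    ]);

    // The oldest (lowest) batch comes out first.
    assert_eq!(queue.pop().unwrap().start_height, 0);
}
```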
@ -222,15 +228,13 @@ impl Ord for ReadyQueueBatch {
/// Ready peers will either:
/// - download the next batch of blocks
/// - request the next chain entry
/// - download an already requested batch of blocks this might happen due to an error in the previous request
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it.
/// - download an already requested batch of blocks (this might happen due to an error in the previous request
/// or because the queue of ready blocks is too large, so we need the oldest block to clear it).
struct BlockDownloader<N: NetworkZone, S, C> {
/// The client pool.
client_pool: Arc<ClientPool<N>>,
/// Peers that are ready to handle requests.
pending_peers: BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
/// The service that holds the peers sync states.
/// The service that holds the peers' sync states.
peer_sync_svc: S,
/// The service that holds our current chain state.
our_chain_svc: C,
@ -242,19 +246,15 @@ struct BlockDownloader<N: NetworkZone, S, C> {
/// The amount of consecutive empty chain entries we received.
///
/// An empty chain entry means we reached the peers chain tip.
/// An empty chain entry means we reached the peer's chain tip.
amount_of_empty_chain_entries: usize,
/// The running block download tasks.
///
/// Returns:
/// - The start height of the batch
/// - A result contains the batch or an error.
#[allow(clippy::type_complexity)]
block_download_tasks: JoinSet<(
u64,
Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
)>,
/// - A result containing the batch or an error.
block_download_tasks: JoinSet<BlockDownloadTaskResponse<N>>,
/// The running chain entry tasks.
///
/// Returns a result of the chain entry or an error.
@ -263,7 +263,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
/// The current inflight requests.
///
/// This a map of batch start heights to block ids and related information of the batch.
/// This is a map of batch start heights to block IDs and related information of the batch.
inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>,
/// A queue of ready batches.
@ -271,7 +271,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
/// The size, in bytes, of all the batches in `ready_batches`.
ready_batches_size: usize,
/// A queue of failed batches start height's that should be retried.
/// A queue of start heights from failed batches that should be retried.
///
/// Wrapped in [`Reverse`] so we prioritize early batches.
failed_batches: BinaryHeap<Reverse<u64>>,
@ -301,9 +301,8 @@ where
config: BlockDownloaderConfig,
) -> Self {
BlockDownloader {
Self {
client_pool,
pending_peers: BTreeMap::new(),
peer_sync_svc,
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size,
@ -321,12 +320,13 @@ where
}
/// Checks if we can make use of any peers that are currently pending requests.
async fn check_pending_peers(&mut self, chain_tracker: &mut ChainTracker<N>) {
async fn check_pending_peers(
&mut self,
chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
) {
tracing::debug!("Checking if we can give any work to pending peers.");
// HACK: The borrow checker doesn't like the following code if we don't do this.
let mut pending_peers = mem::take(&mut self.pending_peers);
for (_, peers) in pending_peers.iter_mut() {
while let Some(peer) = peers.pop() {
if peer.info.handle.is_closed() {
@ -336,16 +336,12 @@ where
if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
// This peer is OK; however, it does not have the data we currently need. This will only happen
// because of it's pruning seed so just skip over all peers with this pruning seed.
// because of its pruning seed so just skip over all peers with this pruning seed.
peers.push(peer);
break;
}
}
}
// Make sure the calls to `try_handle_free_client` did not add peers to this.
assert!(self.pending_peers.is_empty());
self.pending_peers = pending_peers;
}
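The new signature threads `pending_peers` through as a parameter instead of storing it on `self`, which is what makes the `mem::take` hack unnecessary. A small sketch of the underlying borrow issue, using hypothetical types: a map passed in as a separate `&mut` argument can be iterated while `&mut self` methods are still callable, because the two mutable borrows are disjoint.

```rust
use std::collections::BTreeMap;

struct Downloader {
    handled: usize,
}

impl Downloader {
    // `pending_peers` is not a field of `self`, so mutably iterating it does
    // not conflict with the `&mut self` call inside the loop.
    fn check_pending_peers(&mut self, pending_peers: &mut BTreeMap<u8, Vec<&'static str>>) {
        for peers in pending_peers.values_mut() {
            while let Some(peer) = peers.pop() {
                if self.try_handle_free_client(peer).is_some() {
                    break;
                }
            }
        }
    }

    fn try_handle_free_client(&mut self, _peer: &str) -> Option<&'static str> {
        self.handled += 1;
        None
    }
}

fn main() {
    let mut downloader = Downloader { handled: 0 };
    let mut pending = BTreeMap::from([(0_u8, vec!["peer-a", "peer-b"])]);
    downloader.check_pending_peers(&mut pending);
    assert_eq!(downloader.handled, 2);
}
```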
/// Attempts to send another request for an inflight batch
@ -353,7 +349,7 @@ where
/// This function will find the batch(es) that we are waiting on to clear our ready queue and send another request
/// for them.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to it's pruning seed.
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the batch according to its pruning seed.
async fn request_inflight_batch_again(
&mut self,
client: ClientPoolDropGuard<N>,
@ -364,7 +360,7 @@ where
);
if self.inflight_requests.is_empty() {
panic!("We need requests inflight to be able to send the request again")
panic!("We need requests inflight to be able to send the request again");
}
let oldest_ready_batch = self.ready_batches.peek().unwrap().start_height;
@ -394,10 +390,10 @@ where
self.block_download_tasks.spawn(
async move {
(
BlockDownloadTaskResponse {
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
result: request_batch_from_peer(client, ids, start_height).await,
}
}
.instrument(tracing::debug_span!(
"download_batch",
@ -419,7 +415,7 @@ where
/// The batch requested will depend on our current state, failed batches will be prioritised.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to it's pruning seed.
/// to its pruning seed.
async fn request_block_batch(
&mut self,
chain_tracker: &mut ChainTracker<N>,
@ -430,45 +426,45 @@ where
while let Some(failed_request) = self.failed_batches.peek() {
// Check if we still have the request that failed - another peer could have completed it after
// failure.
if let Some(request) = self.inflight_requests.get_mut(&failed_request.0) {
// Check if this peer has the blocks according to their pruning seed.
if client_has_block_in_range(
&client.info.pruning_seed,
request.start_height,
request.ids.len(),
) {
tracing::debug!("Using peer to request a failed batch");
// They should have the blocks so send the re-request to this peer.
let ids = request.ids.clone();
let start_height = request.start_height;
request.requests_sent += 1;
self.block_download_tasks.spawn(
async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
}
.instrument(tracing::debug_span!(
"download_batch",
start_height,
attempt = request.requests_sent
)),
);
// Remove the failure, we have just handled it.
self.failed_batches.pop();
return None;
}
break;
} else {
let Some(request) = self.inflight_requests.get_mut(&failed_request.0) else {
// We don't have the request in flight so remove the failure.
self.failed_batches.pop();
continue;
};
// Check if this peer has the blocks according to their pruning seed.
if client_has_block_in_range(
&client.info.pruning_seed,
request.start_height,
request.ids.len(),
) {
tracing::debug!("Using peer to request a failed batch");
// They should have the blocks so send the re-request to this peer.
let ids = request.ids.clone();
let start_height = request.start_height;
request.requests_sent += 1;
self.block_download_tasks.spawn(
async move {
BlockDownloadTaskResponse {
start_height,
result: request_batch_from_peer(client, ids, start_height).await,
}
}
.instrument(tracing::debug_span!(
"download_batch",
start_height,
attempt = request.requests_sent
)),
);
// Remove the failure, we have just handled it.
self.failed_batches.pop();
return None;
}
// The peer doesn't have the batch according to its pruning seed.
break;
}
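The rewritten loop uses `let ... else` to handle the already-completed batch early and keep the retry path un-nested. A condensed, runnable sketch of that control flow with stand-in types (strings in place of real batch data):

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

fn main() {
    // `Reverse` makes the max-heap yield the lowest start height first.
    let mut failed_batches = BinaryHeap::from([Reverse(10_u64), Reverse(20)]);
    let mut inflight_requests = BTreeMap::from([(20_u64, "batch at height 20")]);

    while let Some(failed) = failed_batches.peek() {
        let Some(request) = inflight_requests.get_mut(&failed.0) else {
            // Another peer already completed this batch, so drop the stale failure.
            failed_batches.pop();
            continue;
        };

        // Happy path stays flat: re-request the batch, then clear the failure.
        println!("re-requesting {request}");
        failed_batches.pop();
    }
}
```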
// If our ready queue is too large send duplicate requests for the blocks we are waiting on.
@ -492,15 +488,15 @@ where
self.block_download_tasks.spawn(
async move {
(
block_entry_to_get.start_height,
request_batch_from_peer(
BlockDownloadTaskResponse {
start_height: block_entry_to_get.start_height,
result: request_batch_from_peer(
client,
block_entry_to_get.ids,
block_entry_to_get.start_height,
)
.await,
)
}
}
.instrument(tracing::debug_span!(
"download_batch",
@ -518,13 +514,13 @@ where
/// or if we should request a batch of blocks.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to it's pruning seed.
/// to its pruning seed.
async fn try_handle_free_client(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> {
// We send 2 requests, so if one of them is slow/ doesn't have the next chain we still have a backup.
// We send 2 requests, so if one of them is slow or doesn't have the next chain, we still have a backup.
if self.chain_entry_task.len() < 2
// If we have had too many failures then assume the tip has been found so no more chain entries.
&& self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED
@ -564,6 +560,7 @@ where
async fn check_for_free_clients(
&mut self,
chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
) -> Result<(), BlockDownloadError> {
tracing::debug!("Checking for free peers");
@ -575,7 +572,7 @@ where
.call(ChainSvcRequest::CumulativeDifficulty)
.await?
else {
panic!("Chain service returned ")
panic!("Chain service returned wrong response.");
};
let PeerSyncResponse::PeersToSyncFrom(peers) = self
@ -588,19 +585,19 @@ where
})
.await?
else {
panic!("Chain service returned ")
panic!("Peer sync service returned wrong response.");
};
tracing::debug!("Response received from peer sync service");
for client in self.client_pool.borrow_clients(&peers) {
self.pending_peers
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
}
self.check_pending_peers(chain_tracker).await;
self.check_pending_peers(chain_tracker, pending_peers).await;
Ok(())
}
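Free peers are grouped by pruning seed in a `BTreeMap`, so every peer with an unusable seed can be skipped in one go. A tiny sketch of that grouping pattern with stand-in types (integers for seeds, strings for clients):

```rust
use std::collections::BTreeMap;

fn main() {
    // (pruning seed, peer) pairs; the real code uses `PruningSeed` and
    // `ClientPoolDropGuard` instead of these stand-ins.
    let clients = [(0_u32, "peer-a"), (1, "peer-b"), (0, "peer-c")];

    let mut pending_peers: BTreeMap<u32, Vec<&str>> = BTreeMap::new();

    for (seed, client) in clients {
        // `entry().or_default()` creates the bucket for a seed on first use.
        pending_peers.entry(seed).or_default().push(client);
    }

    assert_eq!(pending_peers[&0], ["peer-a", "peer-c"]);
    assert_eq!(pending_peers[&1], ["peer-b"]);
}
```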
@ -652,6 +649,7 @@ where
start_height: u64,
res: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
chain_tracker: &mut ChainTracker<N>,
pending_peers: &mut BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
) -> Result<(), BlockDownloadError> {
tracing::debug!("Handling block download response");
@ -665,7 +663,7 @@ where
"Received an invalid chain from peer: {}, exiting block downloader (it should be restarted).",
entry.peer_who_told_us
);
entry.peer_who_told_us_handle.ban_peer(LONG_BAN)
entry.peer_who_told_us_handle.ban_peer(LONG_BAN);
});
return Err(e);
@ -683,7 +681,7 @@ where
return Err(BlockDownloadError::TimedOut);
}
self.failed_batches.push(Reverse(start_height))
self.failed_batches.push(Reverse(start_height));
}
Ok(())
@ -693,12 +691,12 @@ where
if self.inflight_requests.remove(&start_height).is_none() {
tracing::debug!("Already retrieved batch");
// If it was already retrieved then there is nothing else to do.
self.pending_peers
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(chain_tracker).await;
self.check_pending_peers(chain_tracker, pending_peers).await;
return Ok(());
};
@ -730,12 +728,12 @@ where
// Attempt to push new batches to the buffer.
self.push_new_blocks().await?;
self.pending_peers
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(chain_tracker).await;
self.check_pending_peers(chain_tracker, pending_peers).await;
Ok(())
}
@ -751,34 +749,40 @@ where
)
.await?;
let mut pending_peers = BTreeMap::new();
tracing::info!("Attempting to download blocks from peers, this may take a while.");
let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
self.check_for_free_clients(&mut chain_tracker).await?;
self.check_for_free_clients(&mut chain_tracker, &mut pending_peers)
.await?;
loop {
tokio::select! {
_ = check_client_pool_interval.tick() => {
tracing::debug!("Checking client pool for free peers, timer fired.");
self.check_for_free_clients(&mut chain_tracker).await?;
self.check_for_free_clients(&mut chain_tracker, &mut pending_peers).await?;
// If we have no inflight requests and we have had too many empty chain entries in a row, assume the top has been found.
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
tracing::debug!("Failed to find any more chain entries, probably found the top");
return Ok(())
return Ok(());
}
}
Some(res) = self.block_download_tasks.join_next() => {
let (start_height, res) = res.expect("Download batch future panicked");
let BlockDownloadTaskResponse {
start_height,
result
} = res.expect("Download batch future panicked");
self.handle_download_batch_res(start_height, res, &mut chain_tracker).await?;
self.handle_download_batch_res(start_height, result, &mut chain_tracker, &mut pending_peers).await?;
// If we have no inflight requests and we have had too many empty chain entries in a row, assume the top has been found.
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTRIES_BEFORE_TOP_ASSUMED {
tracing::debug!("Failed to find any more chain entries, probably found the top");
return Ok(())
return Ok(());
}
}
Some(Ok(res)) = self.chain_entry_task.join_next() => {
@ -792,12 +796,12 @@ where
self.amount_of_empty_chain_entries += 1;
}
self.pending_peers
pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(&mut chain_tracker).await;
self.check_pending_peers(&mut chain_tracker, &mut pending_peers).await;
}
Err(_) => self.amount_of_empty_chain_entries += 1
}
@ -807,6 +811,15 @@ where
}
}
/// The return value from the block download tasks.
struct BlockDownloadTaskResponse<N: NetworkZone> {
/// The start height of the batch.
start_height: u64,
/// A result containing the batch or an error.
result: Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
}
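`BlockDownloadTaskResponse` replaces the bare `(u64, Result<...>)` tuple the download tasks used to return. A runnable sketch of the same pattern with stand-in types, assuming the `tokio` runtime and macro features are enabled: tasks on a [`JoinSet`] return a named struct, and the receiver destructures it by field name instead of by tuple position.

```rust
use tokio::task::JoinSet;

struct TaskResponse {
    start_height: u64,
    result: Result<&'static str, &'static str>,
}

#[tokio::main]
async fn main() {
    let mut tasks = JoinSet::new();

    for start_height in [0_u64, 100, 200] {
        tasks.spawn(async move {
            TaskResponse {
                start_height,
                result: Ok("batch downloaded"),
            }
        });
    }

    while let Some(res) = tasks.join_next().await {
        // Named fields are harder to mix up than `.0` and `.1`.
        let TaskResponse { start_height, result } = res.expect("download task panicked");
        println!("batch starting at {start_height}: {result:?}");
    }
}
```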
/// Returns if a peer has all the blocks in a range, according to its [`PruningSeed`].
fn client_has_block_in_range(pruning_seed: &PruningSeed, start_height: u64, length: usize) -> bool {
pruning_seed.has_full_block(start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
&& pruning_seed.has_full_block(
@ -818,9 +831,9 @@ fn client_has_block_in_range(pruning_seed: &PruningSeed, start_height: u64, leng
/// Calculates the next amount of blocks to request in a batch.
///
/// Parameters:
/// `previous_batch_size` is the size, in bytes, of the last batch,
/// `previous_batch_len` is the amount of blocks in the last batch,
/// `target_batch_size` is the target size, in bytes, of a batch.
/// - `previous_batch_size` is the size, in bytes, of the last batch
/// - `previous_batch_len` is the amount of blocks in the last batch
/// - `target_batch_size` is the target size, in bytes, of a batch
fn calculate_next_block_batch_size(
previous_batch_size: usize,
previous_batch_len: usize,
@ -833,7 +846,7 @@ fn calculate_next_block_batch_size(
// Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
// Cap the amount of growth to 1.5x the previous batch len, to prevent a small block casing us to request
// Cap the amount of growth to 1.5x the previous batch len, to prevent a small block causing us to request
// a huge amount of blocks.
let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
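A simplified, runnable sketch of the sizing logic above; it uses the plain average block size from the previous batch, whereas the real function adjusts the average before dividing.

```rust
use std::cmp::{max, min};

/// Simplified batch sizing: target a byte budget, capped at 1.5x the last batch length.
fn next_batch_len(previous_batch_size: usize, previous_batch_len: usize, target_batch_size: usize) -> usize {
    // Average size of a block in the previous batch (assumes a non-empty batch).
    let average_block_size = max(previous_batch_size / previous_batch_len, 1);

    // Aim for `target_batch_size` bytes, but always request at least one block.
    let next_len = max(target_batch_size / average_block_size, 1);

    // Cap growth at 1.5x the previous length so one tiny block can't make us
    // request a huge batch.
    min(next_len, (previous_batch_len * 3).div_ceil(2))
}

fn main() {
    // 100 blocks averaging 10_000 bytes, targeting 5_000_000 bytes:
    // the raw target would be 500 blocks, but growth is capped at 150.
    assert_eq!(next_batch_len(1_000_000, 100, 5_000_000), 150);
}
```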

View file

@ -35,8 +35,12 @@ pub struct BlocksToRetrieve<N: NetworkZone> {
pub failures: usize,
}
/// An error returned from the [`ChainTracker`].
#[derive(Debug, Clone)]
pub enum ChainTrackerError {
/// The new chain entry is invalid.
NewEntryIsInvalid,
/// The new chain entry does not follow from the top of our chain tracker.
NewEntryDoesNotFollowChain,
}
@ -155,8 +159,7 @@ impl<N: NetworkZone> ChainTracker<N> {
usize::try_from(
pruning_seed
.get_next_pruned_block(self.first_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
// We check the first height is less than CRYPTONOTE_MAX_BLOCK_HEIGHT in response task.
.unwrap()
.expect("We use local values to calculate height which should be below the sanity limit")
// Use a big value as a fallback if the seed does no pruning.
.unwrap_or(CRYPTONOTE_MAX_BLOCK_HEIGHT)
- self.first_height,

View file

@ -36,10 +36,6 @@ impl<N: NetworkZone> Drop for ClientPoolDropGuard<N> {
fn drop(&mut self) {
let client = self.client.take().unwrap();
if !client.info.handle.is_closed() {
tracing::warn!("peer dropped");
}
self.pool.add_client(client);
}
}
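The `Drop` impl above is a drop guard: the client is held in an `Option` so `drop` can move it out of `&mut self` and return it to the pool. A generic, self-contained sketch of that pattern with a made-up pool type:

```rust
use std::sync::{Arc, Mutex};

struct Pool {
    clients: Mutex<Vec<String>>,
}

struct PoolDropGuard {
    pool: Arc<Pool>,
    // `Option` so `drop` can move the client out of `&mut self`.
    client: Option<String>,
}

impl Drop for PoolDropGuard {
    fn drop(&mut self) {
        let client = self.client.take().unwrap();
        self.pool.clients.lock().unwrap().push(client);
    }
}

fn main() {
    let pool = Arc::new(Pool { clients: Mutex::new(vec![]) });

    {
        let _guard = PoolDropGuard {
            pool: Arc::clone(&pool),
            client: Some("peer-1".to_string()),
        };
        // Use the client here; it is returned to the pool at the end of scope.
    }

    assert_eq!(pool.clients.lock().unwrap().len(), 1);
}
```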

View file

@ -57,7 +57,7 @@ pub(crate) const BLOCK_DOWNLOADER_REQUEST_TIMEOUT: Duration = Duration::from_sec
/// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
/// The maximum amount of block IDS allowed in a chain entry response.
/// The maximum amount of block IDs allowed in a chain entry response.
///
/// ref: <https://github.com/monero-project/monero/blob/cc73fe71162d564ffda8e549b79a350bca53c454/src/cryptonote_config.h#L97>
// TODO: link to the protocol book when this section is added.
@ -79,4 +79,10 @@ mod tests {
fn outbound_diffusion_flush_shorter_than_inbound() {
assert!(DIFFUSION_FLUSH_AVERAGE_SECONDS_OUTBOUND < DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND);
}
/// Checks that the ban time increases from short to long.
#[test]
fn ban_times_sanity_check() {
assert!(SHORT_BAN < MEDIUM_BAN && MEDIUM_BAN < LONG_BAN);
}
}

View file

@ -17,7 +17,7 @@ use tracing::{instrument, Instrument, Span};
use monero_p2p::{
client::Connector,
client::InternalPeerID,
services::{AddressBookRequest, AddressBookResponse},
services::{AddressBookRequest, AddressBookResponse, PeerSyncRequest},
CoreSyncSvc, NetworkZone, PeerRequestHandler,
};
@ -35,7 +35,6 @@ pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig;
use connection_maintainer::MakeConnectionRequest;
use monero_p2p::services::PeerSyncRequest;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
///
@ -161,7 +160,7 @@ pub struct NetworkInterface<N: NetworkZone> {
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
/// The peers sync states service.
/// The peers' sync states service.
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
/// Background tasks that will be aborted when this interface is dropped.
_background_tasks: Arc<JoinSet<()>>,

View file

@ -14,10 +14,11 @@ use tower::{Service, ServiceExt};
use tracing::Instrument;
use cuprate_helper::asynch::InfallibleOneshotReceiver;
use monero_pruning::PruningSeed;
use crate::{
handles::ConnectionHandle, ConnectionDirection, NetworkZone, PeerError, PeerRequest,
PeerResponse, SharedError,
handles::{ConnectionGuard, ConnectionHandle},
ConnectionDirection, NetworkZone, PeerError, PeerRequest, PeerResponse, SharedError,
};
mod connection;
@ -25,10 +26,8 @@ mod connector;
pub mod handshaker;
mod timeout_monitor;
use crate::handles::ConnectionGuard;
pub use connector::{ConnectRequest, Connector};
pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError};
use monero_pruning::PruningSeed;
/// An internal identifier for a given peer; it will be their address if known
/// or a random u128 if not.
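A rough sketch of what such an identifier could look like; the variant and type names below are illustrative, not necessarily the crate's actual definition.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum InternalPeerId<Addr> {
    /// The peer's address, when it is known.
    KnownAddress(Addr),
    /// A random identifier for peers whose address we do not know.
    Unknown(u128),
}

fn main() {
    let known = InternalPeerId::KnownAddress("203.0.113.1:18080");
    let unknown: InternalPeerId<&str> = InternalPeerId::Unknown(42);
    println!("{known:?} / {unknown:?}");
}
```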