keep peers that don't have the currently needed data

This commit is contained in:
Boog900 2024-06-10 19:17:09 +01:00
parent 6776264dc6
commit 78e63e2cbf
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
4 changed files with 124 additions and 39 deletions

View file

@ -160,7 +160,7 @@ async fn main() {
BlockDownloaderConfig {
buffer_size: 50_000_000,
in_progress_queue_size: 30_000_000,
check_client_pool_interval: Duration::from_secs(10),
check_client_pool_interval: Duration::from_secs(30),
target_batch_size: 5_000_000,
initial_batch_size: 10,
},

View file

@ -1,8 +1,10 @@
//! # Block Downloader
//!
use indexmap::IndexSet;
use std::{
cmp::{max, min, Ordering, Reverse},
collections::{BTreeMap, BinaryHeap, HashSet},
mem,
sync::Arc,
time::Duration,
};
@ -37,7 +39,7 @@ use crate::{
mod chain_tracker;
use crate::constants::{
CHAIN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN,
MAX_TRANSACTION_BLOB_SIZE,
MAX_DOWNLOAD_FAILURES, MAX_TRANSACTION_BLOB_SIZE,
};
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
@ -186,11 +188,10 @@ impl Ord for ReadyQueueBatch {
/// # Block Downloader
///
/// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the
/// downloaded blocks to a [`async_buffer`].
/// downloaded blocks to an [`async_buffer`].
struct BlockDownloader<N: NetworkZone, S, C> {
/// The client pool.
client_pool: Arc<ClientPool<N>>,
/// Peers that have pruned the blocks we are currently downloading.
///
/// These peers are ready for when we reach blocks that they have not pruned.
@ -206,6 +207,11 @@ struct BlockDownloader<N: NetworkZone, S, C> {
/// The height at which `amount_of_blocks_to_request` was updated.
amount_of_blocks_to_request_updated_at: u64,
/// The amount of consecutive empty chain entries we received.
///
/// An empty chain entry means we reached the peer's chain tip.
amount_of_empty_chain_entries: usize,
/// The running block download tasks.
///
/// Returns:
@ -268,6 +274,7 @@ where
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size,
amount_of_blocks_to_request_updated_at: 0,
amount_of_empty_chain_entries: 0,
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
@ -279,7 +286,29 @@ where
}
}
async fn request_inflight_batch_again(&mut self, client: ClientPoolDropGuard<N>) {
/// Tries to hand out work to the peers parked in `pending_peers`.
///
/// Peers are grouped by pruning seed (see where they are inserted with
/// `self.pending_peers.entry(client.info.pruning_seed)`). For each group we feed peers to
/// `try_handle_free_client` until one is handed back, which means that group's pruning seed
/// cannot serve the data we currently need — so we stop trying that group.
async fn check_pending_peers(&mut self, chain_tracker: &mut ChainTracker<N>) {
// HACK: The borrow checker doesn't like the following code, if we don't do this.
// Taking the map out of `self` lets us call `&mut self` methods while iterating it.
let mut pending_peers = mem::take(&mut self.pending_peers);
for (_, peers) in pending_peers.iter_mut() {
while let Some(peer) = peers.pop() {
if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
// This peer is ok however it does not have the data we currently need.
// Put it back and skip the rest of this pruning-seed group.
peers.push(peer);
break;
}
}
}
// Make sure the calls to `try_handle_free_client` did not add peers to this.
// If they had, the assignment below would silently drop them.
assert!(self.pending_peers.is_empty());
self.pending_peers = pending_peers;
}
async fn request_inflight_batch_again(
&mut self,
client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> {
if self.inflight_requests.is_empty() {
panic!("We need requests inflight to be able to send the request again")
}
@ -303,11 +332,21 @@ where
{
let mut first_batch = self.inflight_requests.first_entry().unwrap();
first_batch.get_mut().requests_sent += 1;
let first_batch_mut = first_batch.get_mut();
if !client_has_block_in_range(
&client.info.pruning_seed,
first_batch_mut.start_height,
first_batch_mut.ids.len(),
) {
return Some(client);
}
first_batch_mut.requests_sent += 1;
// They should have the blocks so send the re-request to this peer.
let ids = first_batch.get().ids.clone();
let start_height = first_batch.get().start_height;
let ids = first_batch_mut.ids.clone();
let start_height = first_batch_mut.start_height;
self.block_download_tasks.spawn(async move {
(
@ -316,7 +355,7 @@ where
)
});
return;
return None;
}
let next_batch = self
@ -326,6 +365,14 @@ where
.unwrap()
.1;
if !client_has_block_in_range(
&client.info.pruning_seed,
next_batch.start_height,
next_batch.ids.len(),
) {
return Some(client);
}
next_batch.requests_sent += 1;
// They should have the blocks so send the re-request to this peer.
@ -338,6 +385,8 @@ where
request_batch_from_peer(client, ids, start_height).await,
)
});
None
}
/// Spawns a task to request blocks from the given peer.
@ -345,7 +394,7 @@ where
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) {
) -> Option<ClientPoolDropGuard<N>> {
// First look to see if we have any failed requests.
while let Some(failed_request) = self.failed_batches.peek() {
// Check if we still have the request that failed - another peer could have completed it after
@ -374,7 +423,7 @@ where
// Remove the failure, we have just handled it.
self.failed_batches.pop();
return;
return None;
}
break;
@ -385,8 +434,7 @@ where
}
if self.ready_batches_size >= self.config.in_progress_queue_size {
self.request_inflight_batch_again(client).await;
return;
return self.request_inflight_batch_again(client).await;
}
// No failed requests that we can handle, request some new blocks.
@ -394,12 +442,7 @@ where
let Some(block_entry_to_get) = chain_tracker
.blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request)
else {
self.pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
return;
return Some(client);
};
self.inflight_requests
@ -416,14 +459,17 @@ where
.await,
)
});
None
}
async fn handle_free_client(
async fn try_handle_free_client(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) {
) -> Option<ClientPoolDropGuard<N>> {
if self.chain_entry_task.len() < 2
&& self.amount_of_empty_chain_entries < 2
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
@ -438,10 +484,10 @@ where
.map_err(|_| BlockDownloadError::TimedOut)?
});
return;
return None;
}
self.request_block_batch(chain_tracker, client).await;
self.request_block_batch(chain_tracker, client).await
}
async fn check_for_free_clients(
@ -476,14 +522,15 @@ where
tracing::debug!("Response received from peer sync service");
// Rust borrow rules mean we have to build a vec here.
let mut clients = Vec::with_capacity(peers.len());
clients.extend(self.client_pool.borrow_clients(&peers));
for peer in clients {
self.handle_free_client(chain_tracker, peer).await;
for client in self.client_pool.borrow_clients(&peers) {
self.pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
}
self.check_pending_peers(chain_tracker).await;
Ok(())
}
@ -532,10 +579,9 @@ where
) -> Result<(), BlockDownloadError> {
match res {
Err(e) => {
// TODO: (IMPORTANT) check if this failure is from the peer who told us about the batch, if so ban them.
if matches!(e, BlockDownloadError::ChainInvalid) {
// If the chain was invalid ban the peer who told us about it.
// If the chain was invalid ban the peer who told us about it and error here to stop the
// block downloader.
self.inflight_requests
.get(&start_height)
.inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN));
@ -544,7 +590,12 @@ where
}
// Add the request to the failed list.
if self.inflight_requests.contains_key(&start_height) {
if let Some(batch) = self.inflight_requests.get_mut(&start_height) {
batch.failures += 1;
if batch.failures > MAX_DOWNLOAD_FAILURES {
return Err(BlockDownloadError::TimedOut);
}
self.failed_batches.push(Reverse(start_height))
}
@ -555,7 +606,13 @@ where
if self.inflight_requests.remove(&start_height).is_none() {
// If it was already retrieved then there is nothing else to do.
// TODO: should we drop this peer for being slow?
self.handle_free_client(chain_tracker, client).await;
self.pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(chain_tracker).await;
return Ok(());
};
@ -581,8 +638,13 @@ where
// Attempt to push new batches to the buffer.
self.push_new_blocks().await?;
// Give more work to this client.
self.handle_free_client(chain_tracker, client).await;
self.pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(chain_tracker).await;
Ok(())
}
}
@ -614,10 +676,20 @@ where
Some(Ok(res)) = self.chain_entry_task.join_next() => {
match res {
Ok((client, entry)) => {
if chain_tracker.add_entry(entry).is_ok() {
if entry.ids.len() == 1 {
self.amount_of_empty_chain_entries += 1;
}
self.handle_free_client(&mut chain_tracker, client).await;
if chain_tracker.add_entry(entry).is_ok() {
self.amount_of_empty_chain_entries = 0;
}
self.pending_peers
.entry(client.info.pruning_seed)
.or_default()
.push(client);
self.check_pending_peers(&mut chain_tracker).await;
}
Err(_) => {}
}
@ -630,6 +702,14 @@ where
}
}
/// Returns `true` if a peer with this pruning seed has un-pruned (full) blocks covering the
/// batch starting at `start_height` with `length` block IDs.
///
/// NOTE(review): only the two endpoints of the range are checked — this is sufficient only if a
/// batch can never span a pruned gap between those endpoints; confirm against the batch-size
/// limits. Also, the last block actually requested is `start_height + length - 1`, so checking
/// `start_height + length` is one block conservative — TODO confirm this is intentional.
fn client_has_block_in_range(pruning_seed: &PruningSeed, start_height: u64, length: usize) -> bool {
pruning_seed.has_full_block(start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
&& pruning_seed.has_full_block(
start_height + u64::try_from(length).unwrap(),
CRYPTONOTE_MAX_BLOCK_HEIGHT,
)
}
/// Calculates the next amount of blocks to request in a batch.
///
/// Parameters:

View file

@ -31,6 +31,8 @@ pub struct BlocksToRetrieve<N: NetworkZone> {
pub peer_who_told_us_handle: ConnectionHandle,
/// The number of requests sent for this batch.
pub requests_sent: usize,
/// The number of times this batch has been requested from a peer and failed.
pub failures: usize,
}
pub enum ChainTrackerError {
@ -173,6 +175,7 @@ impl<N: NetworkZone> ChainTracker<N> {
peer_who_told_us: entry.peer,
peer_who_told_us_handle: entry.handle.clone(),
requests_sent: 0,
failures: 0,
};
self.first_height += u64::try_from(end_idx).unwrap();

View file

@ -63,6 +63,8 @@ pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
// TODO: link to the protocol book when this section is added.
pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
/// The maximum number of failed download attempts for a single block batch; once a batch's
/// failure count exceeds this, the block downloader errors with `BlockDownloadError::TimedOut`.
pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 3;
#[cfg(test)]
mod tests {
use super::*;