diff --git a/p2p/cuprate-p2p/src/bin/test_init.rs b/p2p/cuprate-p2p/src/bin/test_init.rs index 181cdfca..6f9153ec 100644 --- a/p2p/cuprate-p2p/src/bin/test_init.rs +++ b/p2p/cuprate-p2p/src/bin/test_init.rs @@ -160,7 +160,7 @@ async fn main() { BlockDownloaderConfig { buffer_size: 50_000_000, in_progress_queue_size: 30_000_000, - check_client_pool_interval: Duration::from_secs(10), + check_client_pool_interval: Duration::from_secs(30), target_batch_size: 5_000_000, initial_batch_size: 10, }, diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index bf128d14..85798c2c 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -1,8 +1,10 @@ //! # Block Downloader //! +use indexmap::IndexSet; use std::{ cmp::{max, min, Ordering, Reverse}, collections::{BTreeMap, BinaryHeap, HashSet}, + mem, sync::Arc, time::Duration, }; @@ -37,7 +39,7 @@ use crate::{ mod chain_tracker; use crate::constants::{ CHAIN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, - MAX_TRANSACTION_BLOB_SIZE, + MAX_DOWNLOAD_FAILURES, MAX_TRANSACTION_BLOB_SIZE, }; use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; @@ -186,11 +188,10 @@ impl Ord for ReadyQueueBatch { /// # Block Downloader /// /// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the -/// downloaded blocks to a [`async_buffer`]. +/// downloaded blocks to an [`async_buffer`]. struct BlockDownloader<N: NetworkZone, S, C> { /// The client pool. client_pool: Arc<ClientPool<N>>, - /// Peers that have pruned the blocks we are currently downloading. /// /// These peers are ready for when we reach blocks that they have not pruned. @@ -206,6 +207,11 @@ struct BlockDownloader<N: NetworkZone, S, C> { /// The height at which `amount_of_blocks_to_request` was updated. amount_of_blocks_to_request_updated_at: u64, + /// The amount of consecutive empty chain entries we received. + /// + /// An empty chain entry means we reached the peers chain tip. + amount_of_empty_chain_entries: usize, + /// The running block download tasks. /// /// Returns: @@ -268,6 +274,7 @@ where our_chain_svc, amount_of_blocks_to_request: config.initial_batch_size, amount_of_blocks_to_request_updated_at: 0, + amount_of_empty_chain_entries: 0, block_download_tasks: JoinSet::new(), chain_entry_task: JoinSet::new(), inflight_requests: BTreeMap::new(), @@ -279,7 +286,29 @@ where } } - async fn request_inflight_batch_again(&mut self, client: ClientPoolDropGuard<N>) { + async fn check_pending_peers(&mut self, chain_tracker: &mut ChainTracker<N>) { + // HACK: The borrow checker doesn't like the following code, if we don't do this. + let mut pending_peers = mem::take(&mut self.pending_peers); + + for (_, peers) in pending_peers.iter_mut() { + while let Some(peer) = peers.pop() { + if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await { + // This peer is ok however it does not have the data we currently need. + peers.push(peer); + break; + } + } + } + // Make sure the calls to `try_handle_free_client` did not add peers to this. + assert!(self.pending_peers.is_empty()); + + self.pending_peers = pending_peers; + } + + async fn request_inflight_batch_again( + &mut self, + client: ClientPoolDropGuard<N>, + ) -> Option<ClientPoolDropGuard<N>> { if self.inflight_requests.is_empty() { panic!("We need requests inflight to be able to send the request again") } @@ -303,11 +332,21 @@ where { let mut first_batch = self.inflight_requests.first_entry().unwrap(); - first_batch.get_mut().requests_sent += 1; + let first_batch_mut = first_batch.get_mut(); + + if !client_has_block_in_range( + &client.info.pruning_seed, + first_batch_mut.start_height, + first_batch_mut.ids.len(), + ) { + return Some(client); + } + + first_batch_mut.requests_sent += 1; // They should have the blocks so send the re-request to this peer. - let ids = first_batch.get().ids.clone(); - let start_height = first_batch.get().start_height; + let ids = first_batch_mut.ids.clone(); + let start_height = first_batch_mut.start_height; self.block_download_tasks.spawn(async move { ( @@ -316,7 +355,7 @@ where ) }); - return; + return None; } let next_batch = self @@ -326,6 +365,14 @@ where .unwrap() .1; + if !client_has_block_in_range( + &client.info.pruning_seed, + next_batch.start_height, + next_batch.ids.len(), + ) { + return Some(client); + } + next_batch.requests_sent += 1; // They should have the blocks so send the re-request to this peer. @@ -338,6 +385,8 @@ where request_batch_from_peer(client, ids, start_height).await, ) }); + + None } /// Spawns a task to request blocks from the given peer. @@ -345,7 +394,7 @@ where &mut self, chain_tracker: &mut ChainTracker<N>, client: ClientPoolDropGuard<N>, - ) { + ) -> Option<ClientPoolDropGuard<N>> { // First look to see if we have any failed requests. while let Some(failed_request) = self.failed_batches.peek() { // Check if we still have the request that failed - another peer could have completed it after @@ -374,7 +423,7 @@ where // Remove the failure, we have just handled it. self.failed_batches.pop(); - return; + return None; } break; @@ -385,8 +434,7 @@ where } if self.ready_batches_size >= self.config.in_progress_queue_size { - self.request_inflight_batch_again(client).await; - return; + return self.request_inflight_batch_again(client).await; } // No failed requests that we can handle, request some new blocks. @@ -394,12 +442,7 @@ where let Some(block_entry_to_get) = chain_tracker .blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request) else { - self.pending_peers - .entry(client.info.pruning_seed) - .or_default() - .push(client); - - return; + return Some(client); }; self.inflight_requests @@ -416,14 +459,17 @@ where .await, ) }); + + None } - async fn handle_free_client( + async fn try_handle_free_client( &mut self, chain_tracker: &mut ChainTracker<N>, client: ClientPoolDropGuard<N>, - ) { + ) -> Option<ClientPoolDropGuard<N>> { if self.chain_entry_task.len() < 2 + && self.amount_of_empty_chain_entries < 2 && chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500 && chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed) { @@ -438,10 +484,10 @@ where .map_err(|_| BlockDownloadError::TimedOut)? }); - return; + return None; } - self.request_block_batch(chain_tracker, client).await; + self.request_block_batch(chain_tracker, client).await } async fn check_for_free_clients( @@ -476,14 +522,15 @@ where tracing::debug!("Response received from peer sync service"); - // Rust borrow rules mean we have to build a vec here. - let mut clients = Vec::with_capacity(peers.len()); - clients.extend(self.client_pool.borrow_clients(&peers)); - - for peer in clients { - self.handle_free_client(chain_tracker, peer).await; + for client in self.client_pool.borrow_clients(&peers) { + self.pending_peers + .entry(client.info.pruning_seed) + .or_default() + .push(client); } + self.check_pending_peers(chain_tracker).await; + Ok(()) } @@ -532,10 +579,9 @@ where ) -> Result<(), BlockDownloadError> { match res { Err(e) => { - // TODO: (IMPORTANT) check if this failure is from the peer who told us about the batch, if so ban them. - if matches!(e, BlockDownloadError::ChainInvalid) { - // If the chain was invalid ban the peer who told us about it. + // If the chain was invalid ban the peer who told us about it and error here to stop the + // block downloader. self.inflight_requests .get(&start_height) .inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN)); @@ -544,7 +590,12 @@ where } // Add the request to the failed list. - if self.inflight_requests.contains_key(&start_height) { + if let Some(batch) = self.inflight_requests.get_mut(&start_height) { + batch.failures += 1; + if batch.failures > MAX_DOWNLOAD_FAILURES { + return Err(BlockDownloadError::TimedOut); + } + self.failed_batches.push(Reverse(start_height)) } @@ -555,7 +606,13 @@ where if self.inflight_requests.remove(&start_height).is_none() { // If it was already retrieved then there is nothing else to do. // TODO: should we drop this peer for being slow? - self.handle_free_client(chain_tracker, client).await; + self.pending_peers + .entry(client.info.pruning_seed) + .or_default() + .push(client); + + self.check_pending_peers(chain_tracker).await; + return Ok(()); }; @@ -581,8 +638,13 @@ where // Attempt to push new batches to the buffer. self.push_new_blocks().await?; - // Give more work to this client. - self.handle_free_client(chain_tracker, client).await; + self.pending_peers + .entry(client.info.pruning_seed) + .or_default() + .push(client); + + self.check_pending_peers(chain_tracker).await; + Ok(()) } } @@ -614,10 +676,20 @@ where Some(Ok(res)) = self.chain_entry_task.join_next() => { match res { Ok((client, entry)) => { - if chain_tracker.add_entry(entry).is_ok() { - + if entry.ids.len() == 1 { + self.amount_of_empty_chain_entries += 1; } - self.handle_free_client(&mut chain_tracker, client).await; + + if chain_tracker.add_entry(entry).is_ok() { + self.amount_of_empty_chain_entries = 0; + } + + self.pending_peers + .entry(client.info.pruning_seed) + .or_default() + .push(client); + + self.check_pending_peers(&mut chain_tracker).await; } Err(_) => {} } @@ -630,6 +702,14 @@ where } } +fn client_has_block_in_range(pruning_seed: &PruningSeed, start_height: u64, length: usize) -> bool { + pruning_seed.has_full_block(start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT) + && pruning_seed.has_full_block( + start_height + u64::try_from(length).unwrap(), + CRYPTONOTE_MAX_BLOCK_HEIGHT, + ) +} + /// Calculates the next amount of blocks to request in a batch. /// /// Parameters: diff --git a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs index 7b9fa634..829f92f3 100644 --- a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs +++ b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs @@ -31,6 +31,8 @@ pub struct BlocksToRetrieve<N: NetworkZone> { pub peer_who_told_us_handle: ConnectionHandle, /// The number of requests sent for this batch. pub requests_sent: usize, + /// The number of times this batch has been requested from a peer and failed. + pub failures: usize, } pub enum ChainTrackerError { @@ -173,6 +175,7 @@ impl<N: NetworkZone> ChainTracker<N> { peer_who_told_us: entry.peer, peer_who_told_us_handle: entry.handle.clone(), requests_sent: 0, + failures: 0, }; self.first_height += u64::try_from(end_idx).unwrap(); diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 97d52655..883b40f1 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -63,6 +63,8 @@ pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000; // TODO: link to the protocol book when this section is added. pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000; +pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 3; + #[cfg(test)] mod tests { use super::*;