From fd8885174bc3384ecc61849292dbbeb6c2715807 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Tue, 4 Jun 2024 02:42:53 +0100 Subject: [PATCH] more checks on incoming data --- p2p/cuprate-p2p/src/bin/test_init.rs | 10 ++-- p2p/cuprate-p2p/src/block_downloader.rs | 51 +++++++++++++++---- .../src/client_pool/drop_guard_client.rs | 4 ++ p2p/cuprate-p2p/src/constants.rs | 5 +- 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/p2p/cuprate-p2p/src/bin/test_init.rs b/p2p/cuprate-p2p/src/bin/test_init.rs index ffc06988..248e389e 100644 --- a/p2p/cuprate-p2p/src/bin/test_init.rs +++ b/p2p/cuprate-p2p/src/bin/test_init.rs @@ -132,7 +132,7 @@ async fn main() { let config = P2PConfig:: { network: Default::default(), - outbound_connections: 32, + outbound_connections: 64, extra_outbound_connections: 32, max_inbound_connections: 128, gray_peers_percent: 0.7, @@ -143,7 +143,7 @@ async fn main() { max_white_list_length: 1000, max_gray_list_length: 5000, peer_store_file: PathBuf::from_str("p2p_store").unwrap(), - peer_save_period: Duration::from_secs(30), + peer_save_period: Duration::from_secs(300), }, }; @@ -151,7 +151,7 @@ async fn main() { .await .unwrap(); - sleep(Duration::from_secs(17)).await; + sleep(Duration::from_secs(15)).await; loop { let mut buffer = download_blocks( @@ -161,8 +161,8 @@ async fn main() { BlockDownloaderConfig { buffer_size: 50_000_000, in_progress_queue_size: 30_000_000, - check_client_pool_interval: Duration::from_secs(20), - target_batch_size: 2_000_000, + check_client_pool_interval: Duration::from_secs(10), + target_batch_size: 5_000_000, initial_batch_size: 10, }, ); diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index 0a3f0d5e..4404bfc2 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -3,6 +3,7 @@ mod chain_tracker; +use futures::FutureExt; use std::cmp::{max, min, Ordering, Reverse}; use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque}; use std::sync::Arc; @@ -29,7 +30,7 @@ use monero_pruning::CRYPTONOTE_MAX_BLOCK_HEIGHT; use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest}; use crate::client_pool::{ClientPool, ClientPoolDropGuard}; -use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, MEDIUM_BAN}; +use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, LONG_BAN, MEDIUM_BAN}; /// A downloaded batch of blocks. #[derive(Debug)] @@ -66,6 +67,8 @@ pub enum BlockDownloadError { PeerDidNotHaveRequestedData, #[error("The peers response to a request was invalid.")] PeersResponseWasInvalid, + #[error("The chain we are following is invalid.")] + ChainInvalid, #[error("Failed to find a more advanced chain to follow")] FailedToFindAChainToFollow, #[error("The peer did not send any overlapping blocks, unknown start height.")] @@ -381,6 +384,8 @@ where &mut self, chain_tracker: &mut ChainTracker, ) -> Result<(), BlockDownloadError> { + tracing::debug!("Checking for free peers"); + // This value might be slightly behind but thats ok. let ChainSvcResponse::CumulativeDifficulty(current_cumulative_difficulty) = self .our_chain_svc @@ -405,6 +410,8 @@ where panic!("Chain service returned ") }; + tracing::debug!("Response received from peer sync service"); + // Rust borrow rules mean we have to build a vec here. let mut clients = Vec::with_capacity(peers.len()); clients.extend(self.client_pool.borrow_clients(&peers)); @@ -449,7 +456,15 @@ where chain_tracker: &mut ChainTracker, ) -> Result<(), BlockDownloadError> { match res { - Err(_) => { + Err(e) => { + if matches!(e, BlockDownloadError::ChainInvalid) { + self.inflight_requests + .get(&start_height) + .inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN)); + + return Err(e); + } + if self.inflight_requests.contains_key(&start_height) { self.failed_batches.push(Reverse(start_height)) } @@ -512,13 +527,11 @@ where ) .await?; - // The request ID for which we updated `self.amount_of_blocks_to_request` - // `amount_of_blocks_to_request` will update for every new batch of blocks that come in. - let mut amount_of_blocks_to_request_updated_at = 0; - let mut check_client_pool_interval = interval(self.config.check_client_pool_interval); check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + self.check_for_free_clients(&mut chain_tracker).await?; + loop { tokio::select! { _ = check_client_pool_interval.tick() => { @@ -559,7 +572,7 @@ async fn request_batch_from_peer( .ready() .await? .call(PeerRequest::GetObjects(GetObjectsRequest { - blocks: ids, + blocks: ids.clone(), pruned: false, })) .await? @@ -582,16 +595,28 @@ async fn request_batch_from_peer( let blocks = blocks_response .blocks .into_par_iter() - .map(|block_entry| { + .enumerate() + .map(|(i, block)| (i, u64::try_from(i).unwrap() + expected_start_height, block)) + .map(|(i, expected_height, block_entry)| { let mut size = block_entry.block.len(); let block = Block::read(&mut block_entry.block.as_ref()) .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?; - if block.txs.len() != block_entry.txs.len() { + if ids[i] != block.hash() || block.txs.len() != block_entry.txs.len() { return Err(BlockDownloadError::PeersResponseWasInvalid); } + if !block + .number() + .is_some_and(|height| height == expected_height) + { + panic!("{} {}", expected_height, block.number().unwrap()); + // This peer probably did nothing wrong, it was the peer who told us this blockID which + // is misbehaving. + return Err(BlockDownloadError::ChainInvalid); + } + let txs = block_entry .txs .take_normal() @@ -651,7 +676,7 @@ async fn request_chain_entry_from_peer( .await? .call(PeerRequest::GetChain(ChainRequest { block_ids: short_history.into(), - prune: false, + prune: true, })) .await? else { @@ -793,7 +818,11 @@ where handle: peer_handle, }; - let tracker = ChainTracker::new(first_entry, start_height, our_genesis); + let tracker = ChainTracker::new( + first_entry, + start_height + u64::try_from(first_unknown).unwrap(), + our_genesis, + ); Ok(tracker) } diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs index d8c20c6e..5d11acba 100644 --- a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -36,6 +36,10 @@ impl Drop for ClientPoolDropGuard { fn drop(&mut self) { let client = self.client.take().unwrap(); + if !client.info.handle.is_closed() { + tracing::warn!("peer dropped"); + } + self.pool.add_client(client); } } diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index df6f5ee7..6e6ce0b3 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -13,7 +13,10 @@ pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_ pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10); /// The durations of a medium ban. -pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 10 * 24); +pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24); + +/// The durations of a long ban. +pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7); /// The default amount of time between inbound diffusion flushes. pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);