diff --git a/p2p/cuprate-p2p/src/block_downloader.rs b/p2p/cuprate-p2p/src/block_downloader.rs index 289935d2..3f7f7e73 100644 --- a/p2p/cuprate-p2p/src/block_downloader.rs +++ b/p2p/cuprate-p2p/src/block_downloader.rs @@ -39,19 +39,16 @@ use crate::{ mod block_queue; mod chain_tracker; - -use crate::block_downloader::request_chain::{initial_chain_search, request_chain_entry_from_peer}; -use block_queue::{BlockQueue, ReadyQueueBatch}; -use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; -use download_batch::download_batch_task; - -// TODO: check first block in batch prev_id - mod download_batch; mod request_chain; #[cfg(test)] mod tests; +use block_queue::{BlockQueue, ReadyQueueBatch}; +use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; +use download_batch::download_batch_task; +use request_chain::{initial_chain_search, request_chain_entry_from_peer}; + /// A downloaded batch of blocks. #[derive(Debug, Clone)] pub struct BlockBatch { @@ -218,10 +215,6 @@ struct BlockDownloader { amount_of_empty_chain_entries: usize, /// The running block download tasks. - /// - /// Returns: - /// - The start height of the batch - /// - A result containing the batch or an error. block_download_tasks: JoinSet>, /// The running chain entry tasks. /// @@ -342,6 +335,7 @@ where self.block_download_tasks.spawn(download_batch_task( client, in_flight_batch.ids.clone(), + in_flight_batch.prev_id, in_flight_batch.start_height, in_flight_batch.requests_sent, )); @@ -383,15 +377,14 @@ where ) { tracing::debug!("Using peer to request a failed batch"); // They should have the blocks so send the re-request to this peer. - let ids = request.ids.clone(); - let start_height = request.start_height; request.requests_sent += 1; self.block_download_tasks.spawn(download_batch_task( client, - ids, - start_height, + request.ids.clone(), + request.prev_id, + request.start_height, request.requests_sent, )); @@ -426,6 +419,7 @@ where self.block_download_tasks.spawn(download_batch_task( client, block_entry_to_get.ids.clone(), + block_entry_to_get.prev_id, block_entry_to_get.start_height, block_entry_to_get.requests_sent, )); diff --git a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs index 018d9b6c..6e66f2a7 100644 --- a/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs +++ b/p2p/cuprate-p2p/src/block_downloader/chain_tracker.rs @@ -23,6 +23,8 @@ pub(crate) struct ChainEntry { pub struct BlocksToRetrieve { /// The block IDs to get. pub ids: ByteArrayVec<32>, + /// The hash of the last block before this batch. + pub prev_id: [u8; 32], /// The expected height of the first block in [`BlocksToRetrieve::ids`]. pub start_height: u64, /// The peer who told us about this batch. @@ -51,17 +53,24 @@ pub enum ChainTrackerError { pub struct ChainTracker { /// A list of [`ChainEntry`]s, in order. entries: VecDeque>, - /// The height of the first block, in the first entry in entries. + /// The height of the first block, in the first entry in [`Self::entries`]. first_height: u64, /// The hash of the last block in the last entry. top_seen_hash: [u8; 32], + /// The hash of the block one below [`Self::first_height`]. + previous_hash: [u8; 32], /// The hash of the genesis block. our_genesis: [u8; 32], } impl ChainTracker { /// Creates a new chain tracker. - pub fn new(new_entry: ChainEntry, first_height: u64, our_genesis: [u8; 32]) -> Self { + pub fn new( + new_entry: ChainEntry, + first_height: u64, + our_genesis: [u8; 32], + previous_hash: [u8; 32], + ) -> Self { let top_seen_hash = *new_entry.ids.last().unwrap(); let mut entries = VecDeque::with_capacity(1); entries.push_back(new_entry); @@ -70,6 +79,7 @@ impl ChainTracker { top_seen_hash, entries, first_height, + previous_hash, our_genesis, } } @@ -180,6 +190,7 @@ impl ChainTracker { let blocks = BlocksToRetrieve { ids: ids_to_get.into(), + prev_id: self.previous_hash, start_height: self.first_height, peer_who_told_us: entry.peer, peer_who_told_us_handle: entry.handle.clone(), @@ -188,6 +199,8 @@ impl ChainTracker { }; self.first_height += u64::try_from(end_idx).unwrap(); + // TODO: improve ByteArrayVec API. + self.previous_hash = blocks.ids[blocks.ids.len() - 1]; if entry.ids.is_empty() { self.entries.pop_front(); diff --git a/p2p/cuprate-p2p/src/block_downloader/download_batch.rs b/p2p/cuprate-p2p/src/block_downloader/download_batch.rs index f99714c4..8cdde41e 100644 --- a/p2p/cuprate-p2p/src/block_downloader/download_batch.rs +++ b/p2p/cuprate-p2p/src/block_downloader/download_batch.rs @@ -6,10 +6,10 @@ use tokio::time::timeout; use tower::{Service, ServiceExt}; use tracing::instrument; -use monero_p2p::{NetworkZone, PeerRequest, PeerResponse, handles::ConnectionHandle}; -use monero_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; use cuprate_helper::asynch::rayon_spawn_async; use fixed_bytes::ByteArrayVec; +use monero_p2p::{handles::ConnectionHandle, NetworkZone, PeerRequest, PeerResponse}; +use monero_wire::protocol::{GetObjectsRequest, GetObjectsResponse}; use crate::{ block_downloader::{BlockBatch, BlockDownloadError, BlockDownloadTaskResponse}, @@ -17,26 +17,26 @@ use crate::{ constants::{BLOCK_DOWNLOADER_REQUEST_TIMEOUT, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN}, }; - /// Attempts to request a batch of blocks from a peer, returning [`BlockDownloadTaskResponse`]. #[instrument( - level = "debug", - name = "download_batch", + level = "debug", + name = "download_batch", skip_all, fields( - start_height = expected_start_height, + start_height = expected_start_height, attempt = _attempt ) )] pub async fn download_batch_task( client: ClientPoolDropGuard, ids: ByteArrayVec<32>, + previous_id: [u8; 32], expected_start_height: u64, _attempt: usize, ) -> BlockDownloadTaskResponse { BlockDownloadTaskResponse { start_height: expected_start_height, - result: request_batch_from_peer(client, ids, expected_start_height).await, + result: request_batch_from_peer(client, ids, previous_id, expected_start_height).await, } } @@ -47,6 +47,7 @@ pub async fn download_batch_task( async fn request_batch_from_peer( mut client: ClientPoolDropGuard, ids: ByteArrayVec<32>, + previous_id: [u8; 32], expected_start_height: u64, ) -> Result<(ClientPoolDropGuard, BlockBatch), BlockDownloadError> { // Request the blocks. @@ -80,7 +81,13 @@ async fn request_batch_from_peer( let peer_handle = client.info.handle.clone(); let blocks = rayon_spawn_async(move || { - deserialize_batch(blocks_response, expected_start_height, ids, peer_handle) + deserialize_batch( + blocks_response, + expected_start_height, + ids, + previous_id, + peer_handle, + ) }) .await; @@ -98,6 +105,7 @@ fn deserialize_batch( blocks_response: GetObjectsResponse, expected_start_height: u64, requested_ids: ByteArrayVec<32>, + previous_id: [u8; 32], peer_handle: ConnectionHandle, ) -> Result { let blocks = blocks_response @@ -112,11 +120,26 @@ fn deserialize_batch( let block = Block::read(&mut block_entry.block.as_ref()) .map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?; + let block_hash = block.hash(); + // Check the block matches the one requested and the peer sent enough transactions. - if requested_ids[i] != block.hash() || block.txs.len() != block_entry.txs.len() { + if requested_ids[i] != block_hash || block.txs.len() != block_entry.txs.len() { return Err(BlockDownloadError::PeersResponseWasInvalid); } + // Check that the previous ID is correct for the first block. + // This is to protect use against banning the wrong peer. + // This must happen after the hash check. + if i == 0 && block.header.previous != previous_id { + tracing::warn!( + "Invalid chain, peer told us a block follows the chain when it doesn't." + ); + + // This peer probably did nothing wrong, it was the peer who told us this blockID which + // is misbehaving. + return Err(BlockDownloadError::ChainInvalid); + } + // Check the height lines up as expected. // This must happen after the hash check. if !block diff --git a/p2p/cuprate-p2p/src/block_downloader/request_chain.rs b/p2p/cuprate-p2p/src/block_downloader/request_chain.rs index 20349997..7733aef9 100644 --- a/p2p/cuprate-p2p/src/block_downloader/request_chain.rs +++ b/p2p/cuprate-p2p/src/block_downloader/request_chain.rs @@ -219,6 +219,8 @@ where return Err(BlockDownloadError::FailedToFindAChainToFollow); } + let previous_id = hashes[first_unknown - 1]; + let first_entry = ChainEntry { ids: hashes[first_unknown..].to_vec(), peer: peer_id, @@ -230,7 +232,7 @@ where first_entry.ids.len() ); - let tracker = ChainTracker::new(first_entry, expected_height, our_genesis); + let tracker = ChainTracker::new(first_entry, expected_height, our_genesis, previous_id); Ok(tracker) } diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs index 2eede923..33446819 100644 --- a/p2p/monero-p2p/src/client.rs +++ b/p2p/monero-p2p/src/client.rs @@ -160,6 +160,8 @@ impl Service for Client { }; if let Err(e) = self.connection_tx.try_send(req) { + // The connection task could have closed between a call to `poll_ready` and the call to + // `call`, which means if we don't handle the error here the receiver would panic. use mpsc::error::TrySendError; match e {