mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-03-12 09:29:11 +00:00
add some limits on messages
This commit is contained in:
parent
08f1abb9df
commit
6776264dc6
2 changed files with 73 additions and 22 deletions
|
@ -26,7 +26,7 @@ use monero_p2p::{
|
|||
services::{PeerSyncRequest, PeerSyncResponse},
|
||||
NetworkZone, PeerRequest, PeerResponse, PeerSyncSvc,
|
||||
};
|
||||
use monero_pruning::CRYPTONOTE_MAX_BLOCK_HEIGHT;
|
||||
use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
|
||||
use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest};
|
||||
|
||||
use crate::{
|
||||
|
@ -35,7 +35,10 @@ use crate::{
|
|||
};
|
||||
|
||||
mod chain_tracker;
|
||||
use crate::constants::{CHIAN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCK_BATCH_LEN};
|
||||
use crate::constants::{
|
||||
CHAIN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN,
|
||||
MAX_TRANSACTION_BLOB_SIZE,
|
||||
};
|
||||
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
|
||||
|
||||
/// A downloaded batch of blocks.
|
||||
|
@ -182,12 +185,17 @@ impl Ord for ReadyQueueBatch {
|
|||
|
||||
/// # Block Downloader
|
||||
///
|
||||
/// This is the block downloader, which finds a chain to follow and attempts to follow it adding the blocks
|
||||
/// to a [`async_buffer`].
|
||||
/// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the
|
||||
/// downloaded blocks to a [`async_buffer`].
|
||||
struct BlockDownloader<N: NetworkZone, S, C> {
|
||||
/// The client pool.
|
||||
client_pool: Arc<ClientPool<N>>,
|
||||
|
||||
/// Peers that have pruned the blocks we are currently downloading.
|
||||
///
|
||||
/// These peers are ready for when we reach blocks that they have not pruned.
|
||||
pending_peers: BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
|
||||
|
||||
/// The service that holds the peers sync states.
|
||||
peer_sync_svc: S,
|
||||
/// The service that holds our current chain state.
|
||||
|
@ -255,6 +263,7 @@ where
|
|||
) -> Self {
|
||||
BlockDownloader {
|
||||
client_pool,
|
||||
pending_peers: BTreeMap::new(),
|
||||
peer_sync_svc,
|
||||
our_chain_svc,
|
||||
amount_of_blocks_to_request: config.initial_batch_size,
|
||||
|
@ -275,6 +284,8 @@ where
|
|||
panic!("We need requests inflight to be able to send the request again")
|
||||
}
|
||||
|
||||
// TODO: check pruning seeds.
|
||||
|
||||
let first_batch_requests_sent = self
|
||||
.inflight_requests
|
||||
.first_key_value()
|
||||
|
@ -383,6 +394,11 @@ where
|
|||
let Some(block_entry_to_get) = chain_tracker
|
||||
.blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request)
|
||||
else {
|
||||
self.pending_peers
|
||||
.entry(client.info.pruning_seed)
|
||||
.or_default()
|
||||
.push(client);
|
||||
|
||||
return;
|
||||
};
|
||||
|
||||
|
@ -415,7 +431,7 @@ where
|
|||
|
||||
self.chain_entry_task.spawn(async move {
|
||||
timeout(
|
||||
CHIAN_ENTRY_REQUEST_TIMEOUT,
|
||||
CHAIN_ENTRY_REQUEST_TIMEOUT,
|
||||
request_chain_entry_from_peer(client, history),
|
||||
)
|
||||
.await
|
||||
|
@ -640,13 +656,16 @@ fn calculate_next_block_batch_size(
|
|||
min(next_batch_len, MAX_BLOCK_BATCH_LEN)
|
||||
}
|
||||
|
||||
/// Requests a sequential batch of blocks from a peer.
|
||||
///
|
||||
/// This function will validate the blocks that were downloaded were the ones asked for and that they match
|
||||
/// the expected height.
|
||||
async fn request_batch_from_peer<N: NetworkZone>(
|
||||
mut client: ClientPoolDropGuard<N>,
|
||||
ids: ByteArrayVec<32>,
|
||||
expected_start_height: u64,
|
||||
) -> Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError> {
|
||||
let numb_requested = ids.len();
|
||||
|
||||
// Request the blocks.
|
||||
let PeerResponse::GetObjects(blocks_response) = client
|
||||
.ready()
|
||||
.await?
|
||||
|
@ -659,44 +678,45 @@ async fn request_batch_from_peer<N: NetworkZone>(
|
|||
panic!("Connection task returned wrong response.");
|
||||
};
|
||||
|
||||
if blocks_response.blocks.len() > numb_requested {
|
||||
// Initial sanity checks
|
||||
if blocks_response.blocks.len() > ids.len() {
|
||||
client.info.handle.ban_peer(MEDIUM_BAN);
|
||||
return Err(BlockDownloadError::PeersResponseWasInvalid);
|
||||
}
|
||||
|
||||
if blocks_response.blocks.len() != numb_requested {
|
||||
if blocks_response.blocks.len() != ids.len() {
|
||||
return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
|
||||
}
|
||||
|
||||
let blocks = rayon_spawn_async(move || {
|
||||
// TODO: size check the incoming blocks/ txs.
|
||||
|
||||
let blocks = blocks_response
|
||||
.blocks
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(i, block)| (i, u64::try_from(i).unwrap() + expected_start_height, block))
|
||||
.map(|(i, expected_height, block_entry)| {
|
||||
.map(|(i, block_entry)| {
|
||||
let expected_height = u64::try_from(i).unwrap() + expected_start_height;
|
||||
|
||||
let mut size = block_entry.block.len();
|
||||
|
||||
let block = Block::read(&mut block_entry.block.as_ref())
|
||||
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
|
||||
|
||||
// Check the block matches the one requested and the peer sent enough transactions.
|
||||
if ids[i] != block.hash() || block.txs.len() != block_entry.txs.len() {
|
||||
return Err(BlockDownloadError::PeersResponseWasInvalid);
|
||||
}
|
||||
|
||||
// Check the height lines up as expected.
|
||||
if !block
|
||||
.number()
|
||||
.is_some_and(|height| height == expected_height)
|
||||
{
|
||||
// TODO: remove this panic, I have it this error though which is why it's here.
|
||||
panic!("{} {}", expected_height, block.number().unwrap());
|
||||
// This peer probably did nothing wrong, it was the peer who told us this blockID which
|
||||
// is misbehaving.
|
||||
return Err(BlockDownloadError::ChainInvalid);
|
||||
}
|
||||
|
||||
// Deserialize the transactions.
|
||||
let txs = block_entry
|
||||
.txs
|
||||
.take_normal()
|
||||
|
@ -704,11 +724,17 @@ async fn request_batch_from_peer<N: NetworkZone>(
|
|||
.into_iter()
|
||||
.map(|tx_blob| {
|
||||
size += tx_blob.len();
|
||||
Transaction::read(&mut tx_blob.as_ref())
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
|
||||
|
||||
if tx_blob.len() > MAX_TRANSACTION_BLOB_SIZE {
|
||||
return Err(BlockDownloadError::PeersResponseWasInvalid);
|
||||
}
|
||||
|
||||
Transaction::read(&mut tx_blob.as_ref())
|
||||
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// Make sure the transactions in the block were the ones the peer sent.
|
||||
let mut expected_txs = block.txs.iter().collect::<HashSet<_>>();
|
||||
|
||||
for tx in &txs {
|
||||
|
@ -730,6 +756,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
|
|||
.await;
|
||||
|
||||
let (blocks, sizes) = blocks.inspect_err(|e| {
|
||||
// If the peers response was invalid, ban it.
|
||||
if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
|
||||
client.info.handle.ban_peer(MEDIUM_BAN);
|
||||
}
|
||||
|
@ -747,6 +774,10 @@ async fn request_batch_from_peer<N: NetworkZone>(
|
|||
))
|
||||
}
|
||||
|
||||
/// Request a chain entry from a peer.
|
||||
///
|
||||
/// Because the block downloader only follows and downloads one chain we only have to send the block hash of
|
||||
/// top block we have found and the genesis block, this is then called `short_history`.
|
||||
async fn request_chain_entry_from_peer<N: NetworkZone>(
|
||||
mut client: ClientPoolDropGuard<N>,
|
||||
short_history: [[u8; 32]; 2],
|
||||
|
@ -763,7 +794,9 @@ async fn request_chain_entry_from_peer<N: NetworkZone>(
|
|||
panic!("Connection task returned wrong response!");
|
||||
};
|
||||
|
||||
if chain_res.m_block_ids.is_empty() {
|
||||
if chain_res.m_block_ids.is_empty()
|
||||
|| chain_res.m_block_ids.len() > MAX_BLOCKS_IDS_IN_CHAIN_ENTRY
|
||||
{
|
||||
client.info.handle.ban_peer(MEDIUM_BAN);
|
||||
return Err(BlockDownloadError::PeersResponseWasInvalid);
|
||||
}
|
||||
|
@ -776,6 +809,12 @@ async fn request_chain_entry_from_peer<N: NetworkZone>(
|
|||
return Err(BlockDownloadError::PeersResponseWasInvalid);
|
||||
}
|
||||
|
||||
// If the genesis is the overlapping block then this peer does not have our top tracked block in
|
||||
// its chain.
|
||||
if chain_res.m_block_ids[0] == short_history[1] {
|
||||
return Err(BlockDownloadError::PeerDidNotHaveRequestedData);
|
||||
}
|
||||
|
||||
let entry = ChainEntry {
|
||||
ids: (&chain_res.m_block_ids).into(),
|
||||
peer: client.info.id,
|
||||
|
@ -836,7 +875,7 @@ where
|
|||
break;
|
||||
};
|
||||
let cloned_req = req.clone();
|
||||
futs.spawn(timeout(CHIAN_ENTRY_REQUEST_TIMEOUT, async move {
|
||||
futs.spawn(timeout(CHAIN_ENTRY_REQUEST_TIMEOUT, async move {
|
||||
let PeerResponse::GetChain(chain_res) =
|
||||
next_peer.ready().await?.call(cloned_req).await?
|
||||
else {
|
||||
|
|
|
@ -49,7 +49,19 @@ pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
|
|||
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
|
||||
|
||||
/// The timeout for a chain entry request.
|
||||
pub(crate) const CHIAN_ENTRY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
pub(crate) const CHAIN_ENTRY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// The maximum size of a transaction, a sanity limit that all transactions across all hard-forks must
|
||||
/// be less than.
|
||||
///
|
||||
/// ref: <https://monero-book.cuprate.org/consensus_rules/transactions.html#transaction-size>
|
||||
pub(crate) const MAX_TRANSACTION_BLOB_SIZE: usize = 1_000_000;
|
||||
|
||||
/// The maximum amount of block IDS allowed in a chain entry response.
|
||||
///
|
||||
/// ref: <https://github.com/monero-project/monero/blob/cc73fe71162d564ffda8e549b79a350bca53c454/src/cryptonote_config.h#L97>
|
||||
// TODO: link to the protocol book when this section is added.
|
||||
pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
|
Loading…
Reference in a new issue