more checks on incoming data

This commit is contained in:
Boog900 2024-06-04 02:42:53 +01:00
parent 51e67163a8
commit fd8885174b
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
4 changed files with 53 additions and 17 deletions

View file

@ -132,7 +132,7 @@ async fn main() {
let config = P2PConfig::<ClearNet> {
network: Default::default(),
outbound_connections: 32,
outbound_connections: 64,
extra_outbound_connections: 32,
max_inbound_connections: 128,
gray_peers_percent: 0.7,
@ -143,7 +143,7 @@ async fn main() {
max_white_list_length: 1000,
max_gray_list_length: 5000,
peer_store_file: PathBuf::from_str("p2p_store").unwrap(),
peer_save_period: Duration::from_secs(30),
peer_save_period: Duration::from_secs(300),
},
};
@ -151,7 +151,7 @@ async fn main() {
.await
.unwrap();
sleep(Duration::from_secs(17)).await;
sleep(Duration::from_secs(15)).await;
loop {
let mut buffer = download_blocks(
@ -161,8 +161,8 @@ async fn main() {
BlockDownloaderConfig {
buffer_size: 50_000_000,
in_progress_queue_size: 30_000_000,
check_client_pool_interval: Duration::from_secs(20),
target_batch_size: 2_000_000,
check_client_pool_interval: Duration::from_secs(10),
target_batch_size: 5_000_000,
initial_batch_size: 10,
},
);

View file

@ -3,6 +3,7 @@
mod chain_tracker;
use futures::FutureExt;
use std::cmp::{max, min, Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque};
use std::sync::Arc;
@ -29,7 +30,7 @@ use monero_pruning::CRYPTONOTE_MAX_BLOCK_HEIGHT;
use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest};
use crate::client_pool::{ClientPool, ClientPoolDropGuard};
use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, MEDIUM_BAN};
use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, LONG_BAN, MEDIUM_BAN};
/// A downloaded batch of blocks.
#[derive(Debug)]
@ -66,6 +67,8 @@ pub enum BlockDownloadError {
PeerDidNotHaveRequestedData,
#[error("The peers response to a request was invalid.")]
PeersResponseWasInvalid,
#[error("The chain we are following is invalid.")]
ChainInvalid,
#[error("Failed to find a more advanced chain to follow")]
FailedToFindAChainToFollow,
#[error("The peer did not send any overlapping blocks, unknown start height.")]
@ -381,6 +384,8 @@ where
&mut self,
chain_tracker: &mut ChainTracker<N>,
) -> Result<(), BlockDownloadError> {
tracing::debug!("Checking for free peers");
// This value might be slightly behind but thats ok.
let ChainSvcResponse::CumulativeDifficulty(current_cumulative_difficulty) = self
.our_chain_svc
@ -405,6 +410,8 @@ where
panic!("Chain service returned ")
};
tracing::debug!("Response received from peer sync service");
// Rust borrow rules mean we have to build a vec here.
let mut clients = Vec::with_capacity(peers.len());
clients.extend(self.client_pool.borrow_clients(&peers));
@ -449,7 +456,15 @@ where
chain_tracker: &mut ChainTracker<N>,
) -> Result<(), BlockDownloadError> {
match res {
Err(_) => {
Err(e) => {
if matches!(e, BlockDownloadError::ChainInvalid) {
self.inflight_requests
.get(&start_height)
.inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN));
return Err(e);
}
if self.inflight_requests.contains_key(&start_height) {
self.failed_batches.push(Reverse(start_height))
}
@ -512,13 +527,11 @@ where
)
.await?;
// The request ID for which we updated `self.amount_of_blocks_to_request`
// `amount_of_blocks_to_request` will update for every new batch of blocks that come in.
let mut amount_of_blocks_to_request_updated_at = 0;
let mut check_client_pool_interval = interval(self.config.check_client_pool_interval);
check_client_pool_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
self.check_for_free_clients(&mut chain_tracker).await?;
loop {
tokio::select! {
_ = check_client_pool_interval.tick() => {
@ -559,7 +572,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
.ready()
.await?
.call(PeerRequest::GetObjects(GetObjectsRequest {
blocks: ids,
blocks: ids.clone(),
pruned: false,
}))
.await?
@ -582,16 +595,28 @@ async fn request_batch_from_peer<N: NetworkZone>(
let blocks = blocks_response
.blocks
.into_par_iter()
.map(|block_entry| {
.enumerate()
.map(|(i, block)| (i, u64::try_from(i).unwrap() + expected_start_height, block))
.map(|(i, expected_height, block_entry)| {
let mut size = block_entry.block.len();
let block = Block::read(&mut block_entry.block.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
if block.txs.len() != block_entry.txs.len() {
if ids[i] != block.hash() || block.txs.len() != block_entry.txs.len() {
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
if !block
.number()
.is_some_and(|height| height == expected_height)
{
panic!("{} {}", expected_height, block.number().unwrap());
// This peer probably did nothing wrong, it was the peer who told us this blockID which
// is misbehaving.
return Err(BlockDownloadError::ChainInvalid);
}
let txs = block_entry
.txs
.take_normal()
@ -651,7 +676,7 @@ async fn request_chain_entry_from_peer<N: NetworkZone>(
.await?
.call(PeerRequest::GetChain(ChainRequest {
block_ids: short_history.into(),
prune: false,
prune: true,
}))
.await?
else {
@ -793,7 +818,11 @@ where
handle: peer_handle,
};
let tracker = ChainTracker::new(first_entry, start_height, our_genesis);
let tracker = ChainTracker::new(
first_entry,
start_height + u64::try_from(first_unknown).unwrap(),
our_genesis,
);
Ok(tracker)
}

View file

@ -36,6 +36,10 @@ impl<N: NetworkZone> Drop for ClientPoolDropGuard<N> {
fn drop(&mut self) {
let client = self.client.take().unwrap();
if !client.info.handle.is_closed() {
tracing::warn!("peer dropped");
}
self.pool.add_client(client);
}
}

View file

@ -13,7 +13,10 @@ pub(crate) const OUTBOUND_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_
pub(crate) const SHORT_BAN: Duration = Duration::from_secs(60 * 10);
/// The durations of a medium ban.
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 10 * 24);
pub(crate) const MEDIUM_BAN: Duration = Duration::from_secs(60 * 60 * 24);
/// The durations of a long ban.
pub(crate) const LONG_BAN: Duration = Duration::from_secs(60 * 60 * 24 * 7);
/// The default amount of time between inbound diffusion flushes.
pub(crate) const DIFFUSION_FLUSH_AVERAGE_SECONDS_INBOUND: Duration = Duration::from_secs(5);