add more docs

This commit is contained in:
Boog900 2024-06-09 02:38:00 +01:00
parent d7c9032dd8
commit 08f1abb9df
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
2 changed files with 120 additions and 39 deletions

View file

@ -10,6 +10,7 @@ use std::{
use monero_serai::{block::Block, transaction::Transaction}; use monero_serai::{block::Block, transaction::Transaction};
use rand::prelude::*; use rand::prelude::*;
use rayon::prelude::*; use rayon::prelude::*;
use tokio::time::timeout;
use tokio::{ use tokio::{
task::JoinSet, task::JoinSet,
time::{interval, MissedTickBehavior}, time::{interval, MissedTickBehavior},
@ -34,6 +35,7 @@ use crate::{
}; };
mod chain_tracker; mod chain_tracker;
use crate::constants::{CHIAN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCK_BATCH_LEN};
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker}; use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
/// A downloaded batch of blocks. /// A downloaded batch of blocks.
@ -65,6 +67,8 @@ pub struct BlockDownloaderConfig {
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum BlockDownloadError { pub enum BlockDownloadError {
#[error("A request to a peer timed out.")]
TimedOut,
#[error("The block buffer was closed.")] #[error("The block buffer was closed.")]
BufferWasClosed, BufferWasClosed,
#[error("The peers we requested data from did not have all the data.")] #[error("The peers we requested data from did not have all the data.")]
@ -87,7 +91,7 @@ pub enum ChainSvcRequest {
CompactHistory, CompactHistory,
/// A request to find the first unknown /// A request to find the first unknown
FindFirstUnknown(Vec<[u8; 32]>), FindFirstUnknown(Vec<[u8; 32]>),
/// A request for our current cumulative difficulty.
CumulativeDifficulty, CumulativeDifficulty,
} }
@ -95,26 +99,27 @@ pub enum ChainSvcRequest {
pub enum ChainSvcResponse { pub enum ChainSvcResponse {
/// The response for [`ChainSvcRequest::CompactHistory`] /// The response for [`ChainSvcRequest::CompactHistory`]
CompactHistory { CompactHistory {
/// A list of blocks IDs in our chain, starting with the most recent block, all the way to the genesis block.
///
/// These blocks should be in reverse chronological order and not every block is needed.
block_ids: Vec<[u8; 32]>, block_ids: Vec<[u8; 32]>,
cumulative_difficulty: u128, cumulative_difficulty: u128,
}, },
/// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown /// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown
/// block. /// block.
FindFirstUnknown(usize), FindFirstUnknown(usize),
/// The current cumulative difficulty of our chain.
CumulativeDifficulty(u128), CumulativeDifficulty(u128),
} }
/// # Block Downloader
///
/// This function starts the block downloader and returns a [`BufferStream`] that will produce /// This function starts the block downloader and returns a [`BufferStream`] that will produce
/// a sequential stream of blocks. /// a sequential stream of blocks.
/// ///
/// The block downloader will pick the longest chain and will follow it for as long as possible, /// The block downloader will pick the longest chain and will follow it for as long as possible,
/// the blocks given from the [`BufferStream`] will be in order. /// the blocks given from the [`BufferStream`] will be in order.
/// ///
/// The block downloader may fail before the whole chain is downloaded. If this is the case you are /// The block downloader may fail before the whole chain is downloaded. If this is the case you can
/// free to call this function again, so it can start the search again. /// call this function again, so it can start the search again.
pub fn download_blocks<N: NetworkZone, S, C>( pub fn download_blocks<N: NetworkZone, S, C>(
client_pool: Arc<ClientPool<N>>, client_pool: Arc<ClientPool<N>>,
peer_sync_svc: S, peer_sync_svc: S,
@ -144,9 +149,13 @@ where
buffer_stream buffer_stream
} }
/// A batch of blocks in the ready queue, waiting for previous blocks to come in so they can
/// be passed into the buffer.
#[derive(Debug)] #[derive(Debug)]
struct ReadyQueueBatch { struct ReadyQueueBatch {
/// The start height of the batch.
start_height: u64, start_height: u64,
/// The batch of blocks.
block_batch: BlockBatch, block_batch: BlockBatch,
} }
@ -166,35 +175,64 @@ impl PartialOrd<Self> for ReadyQueueBatch {
impl Ord for ReadyQueueBatch { impl Ord for ReadyQueueBatch {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
// reverse the ordering so earlier blocks come first in a [`BinaryHeap`]
self.start_height.cmp(&other.start_height).reverse() self.start_height.cmp(&other.start_height).reverse()
} }
} }
/// # Block Downloader
///
/// This is the block downloader, which finds a chain to follow and attempts to follow it adding the blocks
/// to a [`async_buffer`].
struct BlockDownloader<N: NetworkZone, S, C> { struct BlockDownloader<N: NetworkZone, S, C> {
/// The client pool.
client_pool: Arc<ClientPool<N>>, client_pool: Arc<ClientPool<N>>,
/// The service that holds the peers sync states.
peer_sync_svc: S, peer_sync_svc: S,
/// The service that holds our current chain state.
our_chain_svc: C, our_chain_svc: C,
/// The amount of blocks to request in the next batch.
amount_of_blocks_to_request: usize, amount_of_blocks_to_request: usize,
/// The height at which `amount_of_blocks_to_request` was updated.
amount_of_blocks_to_request_updated_at: u64, amount_of_blocks_to_request_updated_at: u64,
/// The running block download tasks.
///
/// Returns:
/// - The start height of the batch
/// - A result contains the batch or an error.
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
block_download_tasks: JoinSet<( block_download_tasks: JoinSet<(
u64, u64,
Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>, Result<(ClientPoolDropGuard<N>, BlockBatch), BlockDownloadError>,
)>, )>,
/// The running chain entry tasks.
///
/// Returns a result of the chain entry or an error.
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>, chain_entry_task: JoinSet<Result<(ClientPoolDropGuard<N>, ChainEntry<N>), BlockDownloadError>>,
/// The current inflight requests.
///
/// This a map of batch start heights to block ids and related information of the batch.
inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>, inflight_requests: BTreeMap<u64, BlocksToRetrieve<N>>,
ready_batches: BinaryHeap<ReadyQueueBatch>,
failed_batches: BinaryHeap<Reverse<u64>>,
/// A queue of ready batches.
ready_batches: BinaryHeap<ReadyQueueBatch>,
/// The size, in bytes, of all the batches in `ready_batches`.
ready_batches_size: usize, ready_batches_size: usize,
/// A queue of failed batches start height's that should be retried.
///
/// Wrapped in [`Reverse`] so we prioritize early batches.
failed_batches: BinaryHeap<Reverse<u64>>,
/// The [`BufferAppender`] that gives blocks to Cuprate.
buffer_appender: BufferAppender<BlockBatch>, buffer_appender: BufferAppender<BlockBatch>,
/// The [`BlockDownloaderConfig`].
config: BlockDownloaderConfig, config: BlockDownloaderConfig,
} }
@ -375,8 +413,14 @@ where
{ {
let history = chain_tracker.get_simple_history(); let history = chain_tracker.get_simple_history();
self.chain_entry_task self.chain_entry_task.spawn(async move {
.spawn(request_chain_entry_from_peer(client, history)); timeout(
CHIAN_ENTRY_REQUEST_TIMEOUT,
request_chain_entry_from_peer(client, history),
)
.await
.map_err(|_| BlockDownloadError::TimedOut)?
});
return; return;
} }
@ -427,8 +471,14 @@ where
Ok(()) Ok(())
} }
/// Checks if we have batches ready to send down the [`BufferAppender`].
///
/// We guarantee that blocks sent down the buffer are sent in the correct order.
async fn push_new_blocks(&mut self) -> Result<(), BlockDownloadError> { async fn push_new_blocks(&mut self) -> Result<(), BlockDownloadError> {
while let Some(ready_batch) = self.ready_batches.peek() { while let Some(ready_batch) = self.ready_batches.peek() {
// Check if this ready batch's start height is higher than the lowest in flight request.
// If there is a lower start height in the inflight requests then this is _not_ the next batch
// to send down the buffer.
if self if self
.inflight_requests .inflight_requests
.first_key_value() .first_key_value()
@ -439,6 +489,8 @@ where
break; break;
} }
// Our next ready batch is older (lower height) than the oldest in flight, push it down the
// buffer.
let ready_batch = self.ready_batches.pop().unwrap(); let ready_batch = self.ready_batches.pop().unwrap();
let size = ready_batch.block_batch.size; let size = ready_batch.block_batch.size;
@ -448,11 +500,14 @@ where
.send(ready_batch.block_batch, size) .send(ready_batch.block_batch, size)
.await .await
.map_err(|_| BlockDownloadError::BufferWasClosed)?; .map_err(|_| BlockDownloadError::BufferWasClosed)?;
// Loops back to check the next oldest ready batch.
} }
Ok(()) Ok(())
} }
/// Handles a response to a request to get blocks from a peer.
async fn handle_download_batch_res( async fn handle_download_batch_res(
&mut self, &mut self,
start_height: u64, start_height: u64,
@ -461,7 +516,10 @@ where
) -> Result<(), BlockDownloadError> { ) -> Result<(), BlockDownloadError> {
match res { match res {
Err(e) => { Err(e) => {
// TODO: (IMPORTANT) check if this failure is from the peer who told us about the batch, if so ban them.
if matches!(e, BlockDownloadError::ChainInvalid) { if matches!(e, BlockDownloadError::ChainInvalid) {
// If the chain was invalid ban the peer who told us about it.
self.inflight_requests self.inflight_requests
.get(&start_height) .get(&start_height)
.inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN)); .inspect(|entry| entry.peer_who_told_us_handle.ban_peer(LONG_BAN));
@ -469,6 +527,7 @@ where
return Err(e); return Err(e);
} }
// Add the request to the failed list.
if self.inflight_requests.contains_key(&start_height) { if self.inflight_requests.contains_key(&start_height) {
self.failed_batches.push(Reverse(start_height)) self.failed_batches.push(Reverse(start_height))
} }
@ -476,47 +535,37 @@ where
Ok(()) Ok(())
} }
Ok((client, block_batch)) => { Ok((client, block_batch)) => {
// Remove the batch from the inflight batches.
if self.inflight_requests.remove(&start_height).is_none() { if self.inflight_requests.remove(&start_height).is_none() {
// If it was already retrieved then there is nothing else to do.
// TODO: should we drop this peer for being slow?
self.handle_free_client(chain_tracker, client).await; self.handle_free_client(chain_tracker, client).await;
return Ok(()); return Ok(());
}; };
// If the batch is higher than the last time we updated `amount_of_blocks_to_request`, update it
// again.
if start_height > self.amount_of_blocks_to_request_updated_at { if start_height > self.amount_of_blocks_to_request_updated_at {
let old_amount_of_blocks_to_request = self.amount_of_blocks_to_request; self.amount_of_blocks_to_request = calculate_next_block_batch_size(
block_batch.size,
// The average block size of the last batch of blocks, multiplied by 2 as a safety margin for block_batch.blocks.len(),
// future blocks. self.config.target_batch_size,
let adjusted_average_block_size =
max((block_batch.size * 2) / block_batch.blocks.len(), 1);
// Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
// Capping the amount at the maximum allowed in a single request.
self.amount_of_blocks_to_request = min(
max(
self.config.target_batch_size / adjusted_average_block_size,
1,
),
100,
);
// Make sure the amount does not increase too quickly if we get some small blocks so limit the growth to 1.5x the last
// batch size.
self.amount_of_blocks_to_request = min(
self.amount_of_blocks_to_request,
(old_amount_of_blocks_to_request * 3).div_ceil(2),
); );
self.amount_of_blocks_to_request_updated_at = start_height; self.amount_of_blocks_to_request_updated_at = start_height;
} }
// Add the batch to the queue of ready batches.
self.ready_batches_size += block_batch.size; self.ready_batches_size += block_batch.size;
self.ready_batches.push(ReadyQueueBatch { self.ready_batches.push(ReadyQueueBatch {
start_height, start_height,
block_batch, block_batch,
}); });
// Attempt to push new batches to the buffer.
self.push_new_blocks().await?; self.push_new_blocks().await?;
// Give more work to this client.
self.handle_free_client(chain_tracker, client).await; self.handle_free_client(chain_tracker, client).await;
Ok(()) Ok(())
} }
@ -565,6 +614,32 @@ where
} }
} }
/// Calculates the next amount of blocks to request in a batch.
///
/// Parameters:
/// `previous_batch_size` is the size, in bytes, of the last batch,
/// `previous_batch_len` is the amount of blocks in the last batch,
/// `target_batch_size` is the target size, in bytes, of a batch.
fn calculate_next_block_batch_size(
previous_batch_size: usize,
previous_batch_len: usize,
target_batch_size: usize,
) -> usize {
// The average block size of the last batch of blocks, multiplied by 2 as a safety margin for
// future blocks.
let adjusted_average_block_size = max((previous_batch_size * 2) / previous_batch_len, 1);
// Set the amount of blocks to request equal to our target batch size divided by the adjusted_average_block_size.
let next_batch_len = max(target_batch_size / adjusted_average_block_size, 1);
// Cap the amount of growth to 1.5x the previous batch len, to prevent a small block casing us to request
// a huge amount of blocks.
let next_batch_len = min(next_batch_len, (previous_batch_len * 3).div_ceil(2));
// Cap the length to the maximum allowed.
min(next_batch_len, MAX_BLOCK_BATCH_LEN)
}
async fn request_batch_from_peer<N: NetworkZone>( async fn request_batch_from_peer<N: NetworkZone>(
mut client: ClientPoolDropGuard<N>, mut client: ClientPoolDropGuard<N>,
ids: ByteArrayVec<32>, ids: ByteArrayVec<32>,
@ -615,6 +690,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
.number() .number()
.is_some_and(|height| height == expected_height) .is_some_and(|height| height == expected_height)
{ {
// TODO: remove this panic, I have it this error though which is why it's here.
panic!("{} {}", expected_height, block.number().unwrap()); panic!("{} {}", expected_height, block.number().unwrap());
// This peer probably did nothing wrong, it was the peer who told us this blockID which // This peer probably did nothing wrong, it was the peer who told us this blockID which
// is misbehaving. // is misbehaving.
@ -760,24 +836,21 @@ where
break; break;
}; };
let cloned_req = req.clone(); let cloned_req = req.clone();
futs.spawn(async move { futs.spawn(timeout(CHIAN_ENTRY_REQUEST_TIMEOUT, async move {
let PeerResponse::GetChain(chain_res) = let PeerResponse::GetChain(chain_res) =
next_peer.ready().await?.call(cloned_req).await? next_peer.ready().await?.call(cloned_req).await?
else { else {
panic!("connection task returned wrong response!"); panic!("connection task returned wrong response!");
}; };
Ok((chain_res, next_peer.info.id, next_peer.info.handle.clone())) Ok::<_, tower::BoxError>((chain_res, next_peer.info.id, next_peer.info.handle.clone()))
}); }));
} }
let mut res: Option<(ChainResponse, InternalPeerID<_>, ConnectionHandle)> = None; let mut res: Option<(ChainResponse, InternalPeerID<_>, ConnectionHandle)> = None;
while let Some(task_res) = futs.join_next().await { while let Some(task_res) = futs.join_next().await {
let Ok(task_res): Result< let Ok(Ok(task_res)) = task_res.unwrap() else {
(ChainResponse, InternalPeerID<_>, ConnectionHandle),
tower::BoxError,
> = task_res.unwrap() else {
continue; continue;
}; };

View file

@ -43,6 +43,14 @@ pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(
/// The initial amount of chain requests to send to find the best chain to sync from. /// The initial amount of chain requests to send to find the best chain to sync from.
pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3; pub(crate) const INITIAL_CHAIN_REQUESTS_TO_SEND: usize = 3;
/// The enforced maximum amount of blocks to request in a batch.
///
/// Requesting more than this will cause the peer to disconnect and potentially lead to bans.
pub(crate) const MAX_BLOCK_BATCH_LEN: usize = 100;
/// The timeout for a chain entry request.
pub(crate) const CHIAN_ENTRY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;