clean up API and add more docs

Boog900 2024-06-11 02:39:53 +01:00
parent 7d33ab25b4
commit e81593beff
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
5 changed files with 170 additions and 97 deletions

View file

@ -153,9 +153,7 @@ async fn main() {
sleep(Duration::from_secs(15)).await;
let mut buffer = download_blocks(
net.pool.clone(),
net.sync_states_svc.clone(),
let mut buffer = net.block_downloader(
OurChainSvc,
BlockDownloaderConfig {
buffer_size: 50_000_000,
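
For reference, a hedged sketch of how the full new call site might read; any `BlockDownloaderConfig` fields not shown in this diff are elided, and the consuming loop assumes `futures::StreamExt` is in scope for `.next()`:

```rust
// A sketch of the new API from the caller's side; field values are illustrative.
let mut buffer = net.block_downloader(
    OurChainSvc,
    BlockDownloaderConfig {
        buffer_size: 50_000_000,            // bytes buffered for the consumer
        in_progress_queue_size: 50_000_000, // bytes of ready batches before pausing new requests
        check_client_pool_interval: Duration::from_secs(30),
    },
);

// The returned `BufferStream` yields downloaded `BlockBatch`es in height order.
while let Some(batch) = buffer.next().await {
    // hand the batch off to block verification here
}
```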

View file

@ -1,5 +1,9 @@
//! # Block Downloader
//!
//! This module contains the block downloader, which finds a chain to download from our connected peers
//! and downloads it.
//!
//! The block downloader is started by [`download_blocks`].
use std::{
cmp::{max, min, Ordering, Reverse},
collections::{BTreeMap, BinaryHeap, HashSet},
@ -11,10 +15,9 @@ use std::{
use monero_serai::{block::Block, transaction::Transaction};
use rand::prelude::*;
use rayon::prelude::*;
use tokio::time::timeout;
use tokio::{
task::JoinSet,
time::{interval, MissedTickBehavior},
time::{interval, timeout, MissedTickBehavior},
};
use tower::{Service, ServiceExt};
@ -32,14 +35,15 @@ use monero_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest};
use crate::{
client_pool::{ClientPool, ClientPoolDropGuard},
constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, LONG_BAN, MEDIUM_BAN},
constants::{
CHAIN_ENTRY_REQUEST_TIMEOUT, INITIAL_CHAIN_REQUESTS_TO_SEND, LONG_BAN,
MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_DOWNLOAD_FAILURES,
MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN,
},
};
mod chain_tracker;
use crate::constants::{
CHAIN_ENTRY_REQUEST_TIMEOUT, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN,
MAX_DOWNLOAD_FAILURES, MAX_TRANSACTION_BLOB_SIZE,
};
use crate::constants::EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED;
use chain_tracker::{BlocksToRetrieve, ChainEntry, ChainTracker};
/// A downloaded batch of blocks.
@ -59,7 +63,7 @@ pub struct BlockDownloaderConfig {
/// The size of the buffer between the block downloader and the place which
/// is consuming the downloaded blocks.
pub buffer_size: usize,
/// The size of the in progress queue at which we stop requesting more blocks.
/// The size of the in progress queue (in bytes) at which we stop requesting more blocks.
pub in_progress_queue_size: usize,
/// The [`Duration`] between checking the client pool for free peers.
pub check_client_pool_interval: Duration,
@ -110,8 +114,8 @@ pub enum ChainSvcResponse {
cumulative_difficulty: u128,
},
/// The response for [`ChainSvcRequest::FindFirstUnknown`], contains the index of the first unknown
/// block.
FindFirstUnknown(usize),
/// block and its expected height.
FindFirstUnknown(usize, u64),
/// The current cumulative difficulty of our chain.
CumulativeDifficulty(u128),
}
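
Since `FindFirstUnknown` now carries the expected height as well as the index, here is a minimal sketch of how a chain service might compute both values; the `heights` map is a hypothetical stand-in for whatever index the real service keeps:

```rust
use std::collections::HashMap;

// Hypothetical helper for a chain service implementation: `heights` maps
// block hashes we know to their chain heights.
fn find_first_unknown(
    heights: &HashMap<[u8; 32], u64>,
    hashes: &[[u8; 32]],
) -> ChainSvcResponse {
    let first_unknown = hashes
        .iter()
        .position(|hash| !heights.contains_key(hash))
        .unwrap_or(hashes.len());

    // The expected height of the first unknown block is one above the height
    // of the last block we do know (the downloader bans peers that send no
    // overlapping blocks, so `first_unknown == 0` is rejected by the caller).
    let expected_height = if first_unknown == 0 {
        0
    } else {
        heights[&hashes[first_unknown - 1]] + 1
    };

    ChainSvcResponse::FindFirstUnknown(first_unknown, expected_height)
}
```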
@ -179,7 +183,7 @@ impl PartialOrd<Self> for ReadyQueueBatch {
impl Ord for ReadyQueueBatch {
fn cmp(&self, other: &Self) -> Ordering {
// reverse the ordering so earlier blocks come first in a [`BinaryHeap`]
// reverse the ordering so older blocks (lower height) come first in a [`BinaryHeap`]
self.start_height.cmp(&other.start_height).reverse()
}
}
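
The reversal matters because [`BinaryHeap`] is a max-heap; flipping `cmp` makes `pop` yield the batch with the lowest `start_height` first. A self-contained sketch of the effect:

```rust
use std::{cmp::Ordering, collections::BinaryHeap};

// Stand-in for ReadyQueueBatch, keeping only the ordering logic.
#[derive(Eq, PartialEq)]
struct Batch {
    start_height: u64,
}

impl Ord for Batch {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed, so the max-heap behaves like a min-heap on height.
        self.start_height.cmp(&other.start_height).reverse()
    }
}

impl PartialOrd for Batch {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(Batch { start_height: 30 });
    heap.push(Batch { start_height: 10 });
    heap.push(Batch { start_height: 20 });

    // The oldest (lowest height) batch comes out first.
    assert_eq!(heap.pop().unwrap().start_height, 10);
}
```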
@ -188,12 +192,31 @@ impl Ord for ReadyQueueBatch {
///
/// This is the block downloader, which finds a chain to follow and attempts to follow it, adding the
/// downloaded blocks to an [`async_buffer`].
///
/// ## Implementation Details
///
/// The first step to downloading blocks is to find a chain to follow; this is done by [`initial_chain_search`],
/// see that function's docs for details on how this is done.
///
/// With an initial list of block IDs to follow, the block downloader will then look for available peers
/// to download blocks from.
///
/// For each peer we will then allocate a batch of blocks for it to retrieve. As these blocks come in
/// we add them to a queue for pushing into the [`async_buffer`]; once we have the oldest blocks downloaded
/// we send them into the buffer, repeating until the oldest blocks are ones still being downloaded.
///
/// When a peer has finished downloading blocks we add it to our list of ready peers, so it can be used to
/// request more data.
///
/// Ready peers will either:
/// - download the next batch of blocks
/// - request the next chain entry
/// - download an already requested batch of blocks; this might happen due to an error in the previous request
///   or because the queue of ready blocks is too large, so we need the oldest block to clear it.
struct BlockDownloader<N: NetworkZone, S, C> {
/// The client pool.
client_pool: Arc<ClientPool<N>>,
/// Peers that have pruned the blocks we are currently downloading.
///
/// These peers are ready for when we reach blocks that they have not pruned.
/// Peers that are ready to handle requests.
pending_peers: BTreeMap<PruningSeed, Vec<ClientPoolDropGuard<N>>>,
/// The service that holds the peers sync states.
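
As a rough companion to the "Ready peers will either" list above, a condensed decision sketch; the names and ordering are assumptions distilled from `try_handle_free_client` and `request_block_batch` below, not the real API:

```rust
// Illustrative only: what a ready peer gets asked to do, in priority order.
enum ReadyPeerWork {
    NextChainEntry,    // extend our list of block IDs to fetch
    RetryFailedBatch,  // failed batches are prioritised
    ReRequestInflight, // ready queue too large, duplicate the oldest in-flight batch
    NextBatch,         // otherwise, fetch the next fresh batch
}

fn decide(
    want_chain_entry: bool,
    have_failed_batches: bool,
    ready_queue_bytes: usize,
    queue_limit_bytes: usize,
) -> ReadyPeerWork {
    if want_chain_entry {
        ReadyPeerWork::NextChainEntry
    } else if have_failed_batches {
        ReadyPeerWork::RetryFailedBatch
    } else if ready_queue_bytes >= queue_limit_bytes {
        ReadyPeerWork::ReRequestInflight
    } else {
        ReadyPeerWork::NextBatch
    }
}
```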
@ -285,14 +308,16 @@ where
}
}
/// Checks if we can make use of any peers that are currently pending requests.
async fn check_pending_peers(&mut self, chain_tracker: &mut ChainTracker<N>) {
// HACK: The borrow checker doesn't like the following code, if we don't do this.
// HACK: The borrow checker doesn't like the following code if we don't do this.
let mut pending_peers = mem::take(&mut self.pending_peers);
for (_, peers) in pending_peers.iter_mut() {
while let Some(peer) = peers.pop() {
if let Some(peer) = self.try_handle_free_client(chain_tracker, peer).await {
// This peer is ok however it does not have the data we currently need.
// This peer is OK, however it does not have the data we currently need. This will only happen
// because of its pruning seed, so just skip over all peers with this pruning seed.
peers.push(peer);
break;
}
@ -304,6 +329,12 @@ where
self.pending_peers = pending_peers;
}
/// Attempts to send another request for an inflight batch.
///
/// This function will find the oldest batch that has had the fewest requests sent for it
/// and will then attempt to send another request for it.
///
/// Returns the [`ClientPoolDropGuard`] back if the peer doesn't have the batch according to its pruning seed.
async fn request_inflight_batch_again(
&mut self,
client: ClientPoolDropGuard<N>,
@ -312,8 +343,33 @@ where
panic!("We need requests inflight to be able to send the request again")
}
// TODO: check pruning seeds.
// The closure that checks if a peer has a batch according to its pruning seed and either requests it or
// returns the peer.
let try_request_batch = |batch: &mut BlocksToRetrieve<N>| {
if !client_has_block_in_range(
&client.info.pruning_seed,
batch.start_height,
batch.ids.len(),
) {
return Some(client);
}
batch.requests_sent += 1;
let ids = batch.ids.clone();
let start_height = batch.start_height;
self.block_download_tasks.spawn(async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
});
None
};
// The number of requests sent for the oldest (lowest height) batch.
let first_batch_requests_sent = self
.inflight_requests
.first_key_value()
@ -321,6 +377,8 @@ where
.1
.requests_sent;
// If the first batch has the same number of requests sent as the batch with the fewest, then send another
// request for the first batch.
if first_batch_requests_sent
== self
.inflight_requests
@ -333,30 +391,10 @@ where
let first_batch_mut = first_batch.get_mut();
if !client_has_block_in_range(
&client.info.pruning_seed,
first_batch_mut.start_height,
first_batch_mut.ids.len(),
) {
return Some(client);
}
first_batch_mut.requests_sent += 1;
// They should have the blocks so send the re-request to this peer.
let ids = first_batch_mut.ids.clone();
let start_height = first_batch_mut.start_height;
self.block_download_tasks.spawn(async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
});
return None;
return try_request_batch(first_batch_mut);
}
// Otherwise find the point where the split in the number of requests sent happens.
let next_batch = self
.inflight_requests
.iter_mut()
@ -364,31 +402,15 @@ where
.unwrap()
.1;
if !client_has_block_in_range(
&client.info.pruning_seed,
next_batch.start_height,
next_batch.ids.len(),
) {
return Some(client);
}
next_batch.requests_sent += 1;
// They should have the blocks so send the re-request to this peer.
let ids = next_batch.ids.clone();
let start_height = next_batch.start_height;
self.block_download_tasks.spawn(async move {
(
start_height,
request_batch_from_peer(client, ids, start_height).await,
)
});
None
try_request_batch(next_batch)
}
/// Spawns a task to request blocks from the given peer.
///
/// The batch requested will depend on our current state, failed batches will be prioritised.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed.
async fn request_block_batch(
&mut self,
chain_tracker: &mut ChainTracker<N>,
@ -400,15 +422,11 @@ where
// failure.
if let Some(request) = self.inflight_requests.get(&failed_request.0) {
// Check if this peer has the blocks according to their pruning seed.
if client
.info
.pruning_seed
.has_full_block(request.start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
&& client.info.pruning_seed.has_full_block(
request.start_height + u64::try_from(request.ids.len()).unwrap(),
CRYPTONOTE_MAX_BLOCK_HEIGHT,
)
{
if client_has_block_in_range(
&client.info.pruning_seed,
request.start_height,
request.ids.len(),
) {
// They should have the blocks so send the re-request to this peer.
let ids = request.ids.clone();
let start_height = request.start_height;
@ -432,6 +450,7 @@ where
}
}
// If our ready queue is too large, send duplicate requests for the blocks we are waiting on.
if self.ready_batches_size >= self.config.in_progress_queue_size {
return self.request_inflight_batch_again(client).await;
}
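
The `client_has_block_in_range` helper is not itself shown in this diff; judging from the inline checks it replaces just above, its shape is presumably:

```rust
// Assumed shape, reconstructed from the inline check this commit removes.
fn client_has_block_in_range(seed: &PruningSeed, start_height: u64, length: usize) -> bool {
    seed.has_full_block(start_height, CRYPTONOTE_MAX_BLOCK_HEIGHT)
        && seed.has_full_block(
            start_height + u64::try_from(length).unwrap(),
            CRYPTONOTE_MAX_BLOCK_HEIGHT,
        )
}
```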
@ -462,14 +481,26 @@ where
None
}
/// Attempts to give work to a free client.
///
/// This function will use our current state to decide if we should send a request for a chain entry
/// or if we should request a batch of blocks.
///
/// Returns the [`ClientPoolDropGuard`] back if it doesn't have the data we currently need according
/// to its pruning seed.
async fn try_handle_free_client(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) -> Option<ClientPoolDropGuard<N>> {
// We send 2 requests, so if one of them is slow or doesn't have the next chain entry we still have a backup.
if self.chain_entry_task.len() < 2
&& self.amount_of_empty_chain_entries <= 5
// If we have had too many empty chain entries then assume the tip has been found, so don't request more chain entries.
&& self.amount_of_empty_chain_entries <= EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED
// Check that we have a nice buffer of pending block IDs to retrieve; we don't want to be waiting around
// for a chain entry.
&& chain_tracker.block_requests_queued(self.amount_of_blocks_to_request) < 500
// Make sure this peer actually has the chain.
&& chain_tracker.should_ask_for_next_chain_entry(&client.info.pruning_seed)
{
let history = chain_tracker.get_simple_history();
@ -486,9 +517,11 @@ where
return None;
}
// Request a batch of blocks instead.
self.request_block_batch(chain_tracker, client).await
}
/// Checks the [`ClientPool`] for free peers.
async fn check_for_free_clients(
&mut self,
chain_tracker: &mut ChainTracker<N>,
@ -667,7 +700,7 @@ where
_ = check_client_pool_interval.tick() => {
self.check_for_free_clients(&mut chain_tracker).await?;
if self.block_download_tasks.is_empty() && self.amount_of_empty_chain_entries > 5 {
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED {
return Ok(())
}
}
@ -676,7 +709,7 @@ where
self.handle_download_batch_res(start_height, res, &mut chain_tracker).await?;
if self.block_download_tasks.is_empty() && self.amount_of_empty_chain_entries > 5 {
if self.inflight_requests.is_empty() && self.amount_of_empty_chain_entries >= EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED {
return Ok(())
}
}
@ -687,7 +720,7 @@ where
self.amount_of_empty_chain_entries += 1;
}
if chain_tracker.add_entry(entry).is_ok() {
if chain_tracker.add_entry(entry).is_ok() {
self.amount_of_empty_chain_entries = 0;
}
@ -791,6 +824,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
}
// Check the height lines up as expected.
// This must happen after the hash check.
if !block
.number()
.is_some_and(|height| height == expected_height)
@ -908,6 +942,10 @@ async fn request_chain_entry_from_peer<N: NetworkZone>(
Ok((client, entry))
}
/// Initial chain search; this function pulls [`INITIAL_CHAIN_REQUESTS_TO_SEND`] peers from the [`ClientPool`]
/// and sends chain requests to all of them.
///
/// We then wait for their responses and choose the peer that claims the highest cumulative difficulty.
async fn initial_chain_search<N: NetworkZone, S, C>(
client_pool: &Arc<ClientPool<N>>,
mut peer_sync_svc: S,
@ -917,6 +955,7 @@ where
S: PeerSyncSvc<N>,
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>,
{
// Get our history.
let ChainSvcResponse::CompactHistory {
block_ids,
cumulative_difficulty,
@ -943,6 +982,7 @@ where
panic!("peer sync service sent wrong response.");
};
// Shuffle the list to remove any possibility of peers being able to prioritize getting picked.
peers.shuffle(&mut thread_rng());
let mut peers = client_pool.borrow_clients(&peers);
@ -954,10 +994,12 @@ where
prune: false,
});
// Send the requests.
while futs.len() < INITIAL_CHAIN_REQUESTS_TO_SEND {
let Some(mut next_peer) = peers.next() else {
break;
};
let cloned_req = req.clone();
futs.spawn(timeout(CHAIN_ENTRY_REQUEST_TIMEOUT, async move {
let PeerResponse::GetChain(chain_res) =
@ -972,6 +1014,7 @@ where
let mut res: Option<(ChainResponse, InternalPeerID<_>, ConnectionHandle)> = None;
// Wait for the peers' responses.
while let Some(task_res) = futs.join_next().await {
let Ok(Ok(task_res)) = task_res.unwrap() else {
continue;
@ -979,12 +1022,14 @@ where
match &mut res {
Some(res) => {
// res has already been set, replace it if this peer claims higher cumulative difficulty
if res.0.cumulative_difficulty() < task_res.0.cumulative_difficulty() {
let _ = std::mem::replace(res, task_res);
let _ = mem::replace(res, task_res);
}
}
None => {
let _ = std::mem::replace(&mut res, Some(task_res));
// res has not been set, set it now.
res = Some(task_res);
}
}
}
@ -994,11 +1039,11 @@ where
};
let hashes: Vec<[u8; 32]> = (&chain_res.m_block_ids).into();
let start_height = chain_res.start_height;
// drop this to deallocate the [`Bytes`].
drop(chain_res);
let ChainSvcResponse::FindFirstUnknown(first_unknown) = our_chain_svc
// Find the first unknown block in the batch.
let ChainSvcResponse::FindFirstUnknown(first_unknown, expected_height) = our_chain_svc
.ready()
.await?
.call(ChainSvcRequest::FindFirstUnknown(hashes.clone()))
@ -1007,22 +1052,25 @@ where
panic!("chain service sent wrong response.");
};
// The peer must send at least one block we already know.
if first_unknown == 0 {
peer_handle.ban_peer(MEDIUM_BAN);
return Err(BlockDownloadError::PeerSentNoOverlappingBlocks);
}
// We know all the blocks already
// TODO: The peer could still be on a different chain, however the chain might just be too far split.
if first_unknown == hashes.len() {
return Err(BlockDownloadError::FailedToFindAChainToFollow);
}
let first_entry = ChainEntry {
ids: hashes[first_unknown..].to_vec(),
peer: peer_id,
handle: peer_handle,
};
let tracker = ChainTracker::new(
first_entry,
start_height + u64::try_from(first_unknown).unwrap(),
our_genesis,
);
let tracker = ChainTracker::new(first_entry, expected_height, our_genesis);
Ok(tracker)
}

View file

@ -1,6 +1,7 @@
use fixed_bytes::ByteArrayVec;
use std::{cmp::min, collections::VecDeque};
use fixed_bytes::ByteArrayVec;
use monero_p2p::{client::InternalPeerID, handles::ConnectionHandle, NetworkZone};
use monero_pruning::{PruningSeed, CRYPTONOTE_MAX_BLOCK_HEIGHT};
@ -55,6 +56,7 @@ pub struct ChainTracker<N: NetworkZone> {
}
impl<N: NetworkZone> ChainTracker<N> {
/// Creates a new chain tracker.
pub fn new(new_entry: ChainEntry<N>, first_height: u64, our_genesis: [u8; 32]) -> Self {
let top_seen_hash = *new_entry.ids.last().unwrap();
let mut entries = VecDeque::with_capacity(1);
@ -96,8 +98,8 @@ impl<N: NetworkZone> ChainTracker<N> {
.sum()
}
/// Attempts to add an incoming [`ChainEntry`] to the chain tracker.
pub fn add_entry(&mut self, mut chain_entry: ChainEntry<N>) -> Result<(), ChainTrackerError> {
// TODO: check chain entries length.
if chain_entry.ids.is_empty() {
// The peer must send at least one overlapping block.
chain_entry.handle.ban_peer(MEDIUM_BAN);
@ -105,7 +107,6 @@ impl<N: NetworkZone> ChainTracker<N> {
}
if chain_entry.ids.len() == 1 {
// TODO: properly handle this
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
@ -117,8 +118,6 @@ impl<N: NetworkZone> ChainTracker<N> {
return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
}
tracing::warn!("len: {}", chain_entry.ids.len());
let new_entry = ChainEntry {
// ignore the first block - we already know it.
ids: chain_entry.ids.split_off(1),
@ -133,6 +132,9 @@ impl<N: NetworkZone> ChainTracker<N> {
Ok(())
}
/// Returns a batch of blocks to request.
///
/// The returned batch's length will be less than or equal to `max_blocks`.
pub fn blocks_to_get(
&mut self,
pruning_seed: &PruningSeed,
@ -142,8 +144,6 @@ impl<N: NetworkZone> ChainTracker<N> {
return None;
}
// TODO: make sure max block height is enforced.
let entry = self.entries.front_mut()?;
// Calculate the ending index for us to get in this batch, will be the smallest out of `max_blocks`, the length of the batch or
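
The comment above is cut off by the diff hunk; based on the factors it names, the cap is presumably along these lines (a sketch, with the peer's pruning boundary as an assumed third factor):

```rust
// Sketch of the cap: the smallest of `max_blocks` and the entry's length
// (the third factor is cut off above; the peer's pruning boundary is a
// plausible candidate).
let end_index = min(max_blocks, entry.ids.len());
```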

View file

@ -65,6 +65,9 @@ pub(crate) const MAX_BLOCKS_IDS_IN_CHAIN_ENTRY: usize = 25_000;
pub(crate) const MAX_DOWNLOAD_FAILURES: usize = 3;
/// The number of empty chain entries to receive before we assume we have found the top of the chain.
pub(crate) const EMPTY_CHAIN_ENTIES_BEFORE_TOP_ASSUMED: usize = 5;
#[cfg(test)]
mod tests {
use super::*;

View file

@ -4,13 +4,14 @@
//! a certain [`NetworkZone`]
use std::sync::Arc;
use async_buffer::BufferStream;
use futures::FutureExt;
use tokio::{
sync::{mpsc, watch},
task::JoinSet,
};
use tokio_stream::wrappers::WatchStream;
use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt};
use tower::{buffer::Buffer, util::BoxCloneService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
use monero_p2p::{
@ -29,6 +30,9 @@ mod constants;
mod inbound_server;
mod sync_states;
use crate::block_downloader::{
BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse,
};
pub use broadcast::{BroadcastRequest, BroadcastSvc};
use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig;
@ -146,8 +150,7 @@ where
#[derive(Clone)]
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
// TODO: remove pub
pub pool: Arc<client_pool::ClientPool<N>>,
pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`] that allows broadcasting to all connected peers.
broadcast_svc: BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
@ -158,7 +161,8 @@ pub struct NetworkInterface<N: NetworkZone> {
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
/// The address book service.
address_book: BoxCloneService<AddressBookRequest<N>, AddressBookResponse<N>, tower::BoxError>,
pub sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
/// The peers sync states service.
sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
/// Background tasks that will be aborted when this interface is dropped.
_background_tasks: Arc<JoinSet<()>>,
}
@ -169,6 +173,26 @@ impl<N: NetworkZone> NetworkInterface<N> {
self.broadcast_svc.clone()
}
/// Starts the block downloader and returns a stream that will yield sequentially downloaded blocks.
pub fn block_downloader<C>(
&self,
our_chain_service: C,
config: BlockDownloaderConfig,
) -> BufferStream<BlockBatch>
where
C: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
{
block_downloader::download_blocks(
self.pool.clone(),
self.sync_states_svc.clone(),
our_chain_service,
config,
)
}
/// Returns a stream which yields the highest seen sync state from a connected peer.
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone())