dynamic batch sizes

Boog900 2024-05-29 23:47:19 +01:00
parent 7e19a59d94
commit 1ae87734fe
GPG key ID: 42AB1287CB0041C2
4 changed files with 163 additions and 46 deletions


@@ -16,6 +16,7 @@ use std::time::Duration;
use tokio::time::sleep;
use tower::Service;
use tracing::Level;
+use tracing_subscriber::fmt::time::Uptime;
#[derive(Clone)]
pub struct DummyCoreSyncSvc;
@@ -104,6 +105,7 @@ impl Service<ChainSvcRequest> for OurChainSvc {
async fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
+        .with_timer(Uptime::default())
.init();
let config = P2PConfig::<ClearNet> {
@@ -127,28 +129,28 @@ async fn main() {
.await
.unwrap();
-    sleep(Duration::from_secs(5)).await;
+    sleep(Duration::from_secs(55)).await;
-    let mut buffer = download_blocks(
-        net.pool.clone(),
-        net.sync_states_svc.clone(),
-        OurChainSvc,
-        BlockDownloaderConfig {
-            buffer_size: 50_000_000,
-            in_progress_queue_size: 5_000_000,
-            check_client_pool_interval: Duration::from_secs(30),
-            target_batch_size: 100_000,
-            initial_batch_size: 100,
-        },
-    );
+    loop {
+        let mut buffer = download_blocks(
+            net.pool.clone(),
+            net.sync_states_svc.clone(),
+            OurChainSvc,
+            BlockDownloaderConfig {
+                buffer_size: 50_000_000,
+                in_progress_queue_size: 30_000_000,
+                check_client_pool_interval: Duration::from_secs(15),
+                target_batch_size: 2_000_000,
+                initial_batch_size: 100,
+            },
+        );
-    while let Some(entry) = buffer.next().await {
-        tracing::info!(
-            "height: {}, amount{}",
-            entry.blocks[0].0.number().unwrap(),
-            entry.blocks.len()
-        )
-    }
+        while let Some(entry) = buffer.next().await {
+            tracing::info!(
+                "height: {}, amount: {}",
+                entry.blocks[0].0.number().unwrap(),
+                entry.blocks.len()
+            )
+        }
+    }
sleep(Duration::from_secs(999999999)).await;
}
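
The example now drives the downloader in an endless loop: whenever the batch stream ends (the chain tracker ran dry or an error closed the buffer), it starts a fresh download instead of exiting. A minimal, self-contained model of that pattern, using the futures crate directly; `start_download` is a hypothetical stand-in for `download_blocks`:

use futures::{Stream, StreamExt};

// Restart the download whenever the batch stream ends. The stream yields
// batch start heights here, standing in for full `BlockBatch` values.
async fn run<F, S>(mut start_download: F)
where
    F: FnMut() -> S,
    S: Stream<Item = u64> + Unpin,
{
    loop {
        let mut buffer = start_download();
        while let Some(height) = buffer.next().await {
            println!("batch starting at height {height}");
        }
        // Stream ended: fall through and start a new download.
    }
}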


@@ -3,7 +3,7 @@
mod chain_tracker;
-use std::cmp::{Ordering, Reverse};
+use std::cmp::{max, min, Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
@@ -170,6 +170,7 @@ struct BlockDownloader<N: NetworkZone, S, C> {
our_chain_svc: C,
amount_of_blocks_to_request: usize,
+    amount_of_blocks_to_request_updated_at: u64,
#[allow(clippy::type_complexity)]
block_download_tasks: JoinSet<(
@@ -183,6 +184,8 @@ struct BlockDownloader<N: NetworkZone, S, C> {
ready_batches: BinaryHeap<ReadyQueueBatch>,
failed_batches: BinaryHeap<Reverse<u64>>,
+    ready_batches_size: usize,
buffer_appender: BufferAppender<BlockBatch>,
config: BlockDownloaderConfig,
@@ -210,23 +213,89 @@
peer_sync_svc,
our_chain_svc,
amount_of_blocks_to_request: config.initial_batch_size,
+            amount_of_blocks_to_request_updated_at: 0,
block_download_tasks: JoinSet::new(),
chain_entry_task: JoinSet::new(),
inflight_requests: BTreeMap::new(),
ready_batches: BinaryHeap::new(),
+            ready_batches_size: 0,
failed_batches: BinaryHeap::new(),
buffer_appender,
config,
}
}
+    /// Re-requests a batch that is already inflight, used when a peer is free
+    /// but the in-progress queue is full.
+    async fn request_inflight_batch_again(&mut self, client: ClientPoolDropGuard<N>) {
+        if self.inflight_requests.is_empty() {
+            panic!("We need requests inflight to be able to send the request again")
+        }
+
+        let first_batch_requests_sent = self
+            .inflight_requests
+            .first_key_value()
+            .unwrap()
+            .1
+            .requests_sent;
+
+        if first_batch_requests_sent
+            == self
+                .inflight_requests
+                .last_key_value()
+                .unwrap()
+                .1
+                .requests_sent
+        {
+            let mut first_batch = self.inflight_requests.first_entry().unwrap();
+            first_batch.get_mut().requests_sent += 1;
+
+            // They should have the blocks so send the re-request to this peer.
+            let ids = first_batch.get().ids.clone();
+            let start_height = first_batch.get().start_height;
+
+            self.block_download_tasks.spawn(async move {
+                (
+                    start_height,
+                    request_batch_from_peer(client, ids, start_height).await,
+                )
+            });
+
+            return;
+        }
+
+        let next_batch = self
+            .inflight_requests
+            .iter_mut()
+            .find(|(_, next_batch)| next_batch.requests_sent != first_batch_requests_sent)
+            .unwrap()
+            .1;
+
+        next_batch.requests_sent += 1;
+
+        // They should have the blocks so send the re-request to this peer.
+        let ids = next_batch.ids.clone();
+        let start_height = next_batch.start_height;
+
+        self.block_download_tasks.spawn(async move {
+            (
+                start_height,
+                request_batch_from_peer(client, ids, start_height).await,
+            )
+        });
+    }
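
The selection rule above walks the inflight `BTreeMap` in start-height order and duplicates the first batch whose send count differs from the oldest batch's, falling back to the oldest batch itself when every count is level. A compact, self-contained model of that rule, with a plain `usize` standing in for the full `BlocksToRetrieve` entry:

use std::collections::BTreeMap;

// Keyed by start height; the value models `requests_sent`.
fn pick_rerequest(inflight: &mut BTreeMap<u64, usize>) -> u64 {
    let first_sent = *inflight.values().next().expect("needs inflight batches");

    // First batch whose send count differs from the oldest batch's;
    // if all are level, duplicate the oldest batch again.
    let key = inflight
        .iter()
        .find(|&(_, &sent)| sent != first_sent)
        .map(|(&k, _)| k)
        .unwrap_or_else(|| *inflight.keys().next().unwrap());

    *inflight.get_mut(&key).unwrap() += 1;
    key
}

fn main() {
    let mut inflight = BTreeMap::from([(0_u64, 1_usize), (100, 1), (200, 0)]);
    assert_eq!(pick_rerequest(&mut inflight), 200); // 200 is behind the oldest
    assert_eq!(pick_rerequest(&mut inflight), 0);   // all level -> oldest again
}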
/// Spawns a task to request blocks from the given peer.
async fn request_block_batch(
&mut self,
chain_tracker: &mut ChainTracker<N>,
client: ClientPoolDropGuard<N>,
) {
// First look to see if we have any failed requests.
while let Some(failed_request) = self.failed_batches.peek() {
// Check if we still have the request that failed - another peer could have completed it after
// failure.
if let Some(request) = self.inflight_requests.get(&failed_request.0) {
// Check if this peer has the blocks according to their pruning seed.
if client
.info
.pruning_seed
@@ -236,6 +305,7 @@
CRYPTONOTE_MAX_BLOCK_HEIGHT,
)
{
+                // They should have the blocks so send the re-request to this peer.
let ids = request.ids.clone();
let start_height = request.start_height;
@@ -245,6 +315,7 @@
request_batch_from_peer(client, ids, start_height).await,
)
});
+                // Remove the failure, we have just handled it.
self.failed_batches.pop();
return;
@@ -252,10 +323,18 @@
break;
} else {
+                // We don't have the request in flight so remove the failure.
self.failed_batches.pop();
}
}
+        if self.ready_batches_size >= self.config.in_progress_queue_size {
+            self.request_inflight_batch_again(client).await;
+            return;
+        }
// No failed requests that we can handle, request some new blocks.
let Some(block_entry_to_get) = chain_tracker
.blocks_to_get(&client.info.pruning_seed, self.amount_of_blocks_to_request)
else {
@@ -337,6 +416,32 @@
Ok(())
}
+    /// Pushes ready batches to the buffer, in height order, stopping at the
+    /// first batch that is still being downloaded.
+    async fn push_new_blocks(&mut self) -> Result<(), BlockDownloadError> {
+        while let Some(ready_batch) = self.ready_batches.peek() {
+            if self
+                .inflight_requests
+                .first_key_value()
+                .is_some_and(|(&lowest_start_height, _)| {
+                    ready_batch.start_height > lowest_start_height
+                })
+            {
+                break;
+            }
+
+            let ready_batch = self.ready_batches.pop().unwrap();
+            let size = ready_batch.block_batch.size;
+            self.ready_batches_size -= size;
+
+            self.buffer_appender
+                .send(ready_batch.block_batch, size)
+                .await
+                .map_err(|_| BlockDownloadError::BufferWasClosed)?;
+        }
+
+        Ok(())
+    }
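
`push_new_blocks` factors out the in-order flush that previously lived inline in `handle_download_batch_res` (see the removed block below). The invariant: a ready batch may be delivered only if no batch with a lower start height is still inflight. A stripped-down, self-contained model of that check, with `Reverse<u64>` standing in for `ReadyQueueBatch`'s min-heap ordering:

use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

fn flush_ready(
    ready: &mut BinaryHeap<Reverse<u64>>,
    inflight: &BTreeMap<u64, ()>,
    delivered: &mut Vec<u64>,
) {
    while let Some(&Reverse(height)) = ready.peek() {
        // A lower batch is still being downloaded: deliver nothing past it.
        if inflight.first_key_value().is_some_and(|(&low, _)| height > low) {
            break;
        }
        delivered.push(ready.pop().unwrap().0);
    }
}

fn main() {
    let mut ready = BinaryHeap::from([Reverse(0_u64), Reverse(200)]);
    let inflight = BTreeMap::from([(100_u64, ())]);
    let mut out = Vec::new();

    flush_ready(&mut ready, &inflight, &mut out);
    assert_eq!(out, [0]); // 200 waits until the batch at 100 completes
}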
async fn handle_download_batch_res(
&mut self,
start_height: u64,
@@ -357,28 +462,24 @@
return Ok(());
};
+        if start_height > self.amount_of_blocks_to_request_updated_at {
+            let average_block_size =
+                max((block_batch.size * 2) / block_batch.blocks.len(), 1);
+
+            self.amount_of_blocks_to_request = min(
+                max(self.config.target_batch_size / average_block_size, 1),
+                100,
+            );
+
+            self.amount_of_blocks_to_request_updated_at = start_height;
+        }
+
+        self.ready_batches_size += block_batch.size;
self.ready_batches.push(ReadyQueueBatch {
start_height,
block_batch,
});
-        while let (Some(ready_batch), Some((&lowest_start_height, _))) = (
-            self.ready_batches.peek(),
-            self.inflight_requests.first_key_value(),
-        ) {
-            if ready_batch.start_height > lowest_start_height {
-                break;
-            }
-            let ready_batch = self.ready_batches.pop().unwrap();
-            let size = ready_batch.block_batch.size;
-            self.buffer_appender
-                .send(ready_batch.block_batch, size)
-                .await
-                .map_err(|_| BlockDownloadError::BufferWasClosed)?;
-        }
+        self.push_new_blocks().await?;
self.handle_free_client(chain_tracker, client).await;
Ok(())
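
This is the heart of the commit: when a batch newer than the last measurement completes, the per-request block count is re-derived from the byte budget. Doubling the observed average biases the estimate high so requests undershoot `target_batch_size`, the clamp keeps each request between 1 and 100 blocks, and the `updated_at` guard stops older (e.g. re-queued) batches from thrashing the estimate. A worked, self-contained restatement of the arithmetic:

fn next_batch_len(target_batch_size: usize, batch_bytes: usize, batch_blocks: usize) -> usize {
    // Double the observed average block size as a safety margin.
    let average_block_size = ((batch_bytes * 2) / batch_blocks).max(1);
    // Derive the block count from the byte budget, clamped to 1..=100.
    (target_batch_size / average_block_size).clamp(1, 100)
}

fn main() {
    // With the example config (target_batch_size: 2_000_000): a batch of 20
    // blocks totalling 1_000_000 bytes gives an inflated average of 100_000
    // bytes, so the next request asks for 20 blocks.
    assert_eq!(next_batch_len(2_000_000, 1_000_000, 20), 20);
    // Tiny blocks: the clamp caps a request at 100 blocks.
    assert_eq!(next_batch_len(2_000_000, 20_000, 20), 100);
}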
@@ -415,7 +516,7 @@
match res {
Ok((client, entry)) => {
if chain_tracker.add_entry(entry).is_ok() {
+                    self.chain_entry_task.abort_all();
}
self.handle_free_client(&mut chain_tracker, client).await;
}
@@ -465,6 +566,8 @@ async fn request_batch_from_peer<N: NetworkZone>(
.blocks
.into_par_iter()
.map(|block_entry| {
+            let mut size = block_entry.block.len();
let block = Block::read(&mut block_entry.block.as_ref())
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
@@ -476,8 +579,11 @@ async fn request_batch_from_peer<N: NetworkZone>(
.txs
.take_normal()
.ok_or(BlockDownloadError::PeersResponseWasInvalid)?
-                .into_par_iter()
-                .map(|tx_blob| Transaction::read(&mut tx_blob.as_ref()))
+                .into_iter()
+                .map(|tx_blob| {
+                    size += tx_blob.len();
+                    Transaction::read(&mut tx_blob.as_ref())
+                })
.collect::<Result<Vec<_>, _>>()
.map_err(|_| BlockDownloadError::PeersResponseWasInvalid)?;
@@ -493,15 +599,15 @@ async fn request_batch_from_peer<N: NetworkZone>(
return Err(BlockDownloadError::PeersResponseWasInvalid);
}
-            Ok((block, txs))
+            Ok(((block, txs), size))
})
-        .collect::<Result<Vec<_>, _>>();
+        .collect::<Result<(Vec<_>, Vec<_>), _>>();
blocks
})
.await;
-    let blocks = blocks.inspect_err(|e| {
+    let (blocks, sizes) = blocks.inspect_err(|e| {
if matches!(e, BlockDownloadError::PeersResponseWasInvalid) {
client.info.handle.ban_peer(MEDIUM_BAN);
}
@@ -513,7 +619,7 @@ async fn request_batch_from_peer<N: NetworkZone>(
client,
BlockBatch {
blocks,
-            size: 0,
+            size: sizes.iter().sum(),
peer_handle,
},
))
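
Batch size is now measured from the raw blobs during parsing: each closure returns `((block, txs), size)`, the collect unzips the pairs into two vectors, and `sizes.iter().sum()` becomes `BlockBatch::size`. Note the inner transaction iterator also switches from `into_par_iter` to `into_iter`, since `size` is mutated from the closure and cannot be shared across rayon threads. The diff's outer collect goes through rayon's own unzip impls; on recent Rust (1.79+, to my recollection) the same unzip-collect of `Result`s works on std iterators too, e.g. with hypothetical data:

fn main() {
    let parsed: Result<(Vec<&str>, Vec<usize>), ()> = ["ab", "cde"]
        .into_iter()
        .map(|blob| Ok((blob, blob.len())))
        .collect();

    assert_eq!(parsed, Ok((vec!["ab", "cde"], vec![2, 3])));
}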


@@ -29,6 +29,8 @@ pub struct BlocksToRetrieve<N: NetworkZone> {
pub peer_who_told_us: InternalPeerID<N::Addr>,
/// The peer who told us about this batch's handle.
pub peer_who_told_us_handle: ConnectionHandle,
+    /// The number of requests sent for this batch.
+    pub requests_sent: usize,
}
pub enum ChainTrackerError {
@@ -101,6 +103,11 @@ impl<N: NetworkZone> ChainTracker<N> {
return Err(ChainTrackerError::NewEntryIsInvalid);
}
+        if chain_entry.ids.len() == 1 {
+            // TODO: properly handle this
+            return Err(ChainTrackerError::NewEntryDoesNotFollowChain);
+        }
if self
.entries
.back()
@@ -165,6 +172,7 @@ impl<N: NetworkZone> ChainTracker<N> {
start_height: self.first_height,
peer_who_told_us: entry.peer,
peer_who_told_us_handle: entry.handle.clone(),
+            requests_sent: 0,
};
self.first_height += u64::try_from(end_idx).unwrap();

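The tracker changes support the re-request logic: every batch handed out starts at `requests_sent: 0`, and the new guard rejects chain entries with a single block id. A hypothetical illustration of the guard, assuming (as the surrounding checks suggest) that the first id of an entry must overlap a block the tracker already knows, so a one-id entry carries no new blocks; proper handling remains a TODO in the diff:

fn entry_adds_blocks(ids: &[[u8; 32]]) -> bool {
    ids.len() > 1
}

fn main() {
    assert!(!entry_adds_blocks(&[[0u8; 32]]));
    assert!(entry_adds_blocks(&[[0u8; 32], [1u8; 32]]));
}
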

@@ -158,6 +158,7 @@ impl<Z: NetworkZone> Service<PeerRequest> for Client<Z> {
permit: Some(permit),
};
+        // TODO: this can panic if the channel was closed between poll_ready and this.
self.connection_tx
.try_send(req)
.map_err(|_| ())
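
The TODO records a classic tower `Service` hazard: `poll_ready` reserved capacity, but the connection task (and with it the channel receiver) can die before `call` runs, so `try_send` can still fail and must not be unwrapped blindly. A self-contained reproduction of the underlying channel behaviour, using tokio's mpsc directly rather than Cuprate's types:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(1);

    // The connection task dies between our poll_ready and call.
    drop(rx);

    // try_send now fails with a Closed error; unwrapping here is the panic
    // the TODO warns about.
    assert!(tx.try_send(1).is_err());
}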