add test bin

This commit is contained in:
Boog900 2024-05-29 02:05:04 +01:00
parent 759369dd0a
commit 6979dba92d
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
3 changed files with 169 additions and 11 deletions

View file

@ -0,0 +1,154 @@
use cuprate_p2p::block_downloader::{
download_blocks, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse,
};
use cuprate_p2p::{initialize_network, P2PConfig};
use futures::{FutureExt, StreamExt};
use monero_address_book::AddressBookConfig;
use monero_p2p::network_zones::ClearNet;
use monero_p2p::services::{CoreSyncDataRequest, CoreSyncDataResponse};
use monero_p2p::{PeerRequest, PeerResponse};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::sleep;
use tower::Service;
use tracing::Level;
#[derive(Clone)]
pub struct DummyCoreSyncSvc;
impl Service<CoreSyncDataRequest> for DummyCoreSyncSvc {
type Response = CoreSyncDataResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future {
async move {
Ok(CoreSyncDataResponse(monero_wire::CoreSyncData {
cumulative_difficulty: 1,
cumulative_difficulty_top64: 0,
current_height: 1,
pruning_seed: 0,
top_id: hex::decode(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3",
)
.unwrap()
.try_into()
.unwrap(),
top_version: 1,
}))
}
.boxed()
}
}
#[derive(Clone)]
pub struct DummyPeerRequestHandlerSvc;
impl Service<PeerRequest> for DummyPeerRequestHandlerSvc {
type Response = PeerResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: PeerRequest) -> Self::Future {
async move { Ok(PeerResponse::NA) }.boxed()
}
}
pub struct OurChainSvc;
impl Service<ChainSvcRequest> for OurChainSvc {
type Response = ChainSvcResponse;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
async move {
Ok(match req {
ChainSvcRequest::CompactHistory => ChainSvcResponse::CompactHistory {
block_ids: vec![hex::decode(
"418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3",
)
.unwrap()
.try_into()
.unwrap()],
cumulative_difficulty: 1,
},
ChainSvcRequest::FindFirstUnknown(_) => ChainSvcResponse::FindFirstUnknown(1),
ChainSvcRequest::CumulativeDifficulty => ChainSvcResponse::CumulativeDifficulty(1),
})
}
.boxed()
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();
let config = P2PConfig::<ClearNet> {
network: Default::default(),
outbound_connections: 64,
extra_outbound_connections: 32,
max_inbound_connections: 128,
gray_peers_percent: 0.7,
server_config: None,
p2p_port: 18081,
rpc_port: 0,
address_book_config: AddressBookConfig {
max_white_list_length: 1000,
max_gray_list_length: 5000,
peer_store_file: PathBuf::from_str("p2p_store").unwrap(),
peer_save_period: Duration::from_secs(30),
},
};
let net = initialize_network(DummyPeerRequestHandlerSvc, DummyCoreSyncSvc, config)
.await
.unwrap();
sleep(Duration::from_secs(5)).await;
let mut buffer = download_blocks(
net.pool.clone(),
net.sync_states_svc.clone(),
OurChainSvc,
BlockDownloaderConfig {
buffer_size: 50_000_000,
in_progress_queue_size: 5_000_000,
check_client_pool_interval: Duration::from_secs(30),
target_batch_size: 100_000,
initial_batch_size: 100,
},
);
while let Some(entry) = buffer.next().await {
tracing::info!(
"height: {}, amount{}",
entry.blocks[0].0.number().unwrap(),
entry.blocks.len()
)
}
sleep(Duration::from_secs(999999999)).await;
}

View file

@ -34,11 +34,11 @@ use crate::constants::{INITIAL_CHAIN_REQUESTS_TO_SEND, MEDIUM_BAN};
#[derive(Debug)]
pub struct BlockBatch {
/// The blocks.
blocks: Vec<(Block, Vec<Transaction>)>,
pub blocks: Vec<(Block, Vec<Transaction>)>,
/// The size of this batch in bytes.
size: usize,
pub size: usize,
/// The peer that gave us this block.
peer_handle: ConnectionHandle,
pub peer_handle: ConnectionHandle,
}
/// The block downloader config.
@ -46,15 +46,15 @@ pub struct BlockBatch {
pub struct BlockDownloaderConfig {
/// The size of the buffer between the block downloader and the place which
/// is consuming the downloaded blocks.
buffer_size: usize,
pub buffer_size: usize,
/// The size of the in progress queue at which we stop requesting more blocks.
in_progress_queue_size: usize,
pub in_progress_queue_size: usize,
/// The [`Duration`] between checking the client pool for free peers.
check_client_pool_interval: Duration,
pub check_client_pool_interval: Duration,
/// The target size of a single batch of blocks (in bytes).
target_batch_size: usize,
pub target_batch_size: usize,
/// The initial amount of blocks to request (in number of blocks)
initial_batch_size: usize,
pub initial_batch_size: usize,
}
#[derive(Debug, thiserror::Error)]

View file

@ -14,7 +14,7 @@ use tracing::{instrument, Instrument, Span};
use monero_p2p::{CoreSyncSvc, NetworkZone, PeerRequestHandler};
mod block_downloader;
pub mod block_downloader;
mod broadcast;
mod client_pool;
pub mod config;
@ -26,6 +26,7 @@ mod sync_states;
use crate::connection_maintainer::MakeConnectionRequest;
pub use config::P2PConfig;
use monero_p2p::client::Connector;
use monero_p2p::services::PeerSyncRequest;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
///
@ -80,7 +81,7 @@ where
let inbound_handshaker = monero_p2p::client::HandShaker::new(
address_book.clone(),
sync_states_svc,
sync_states_svc.clone(),
core_sync_svc.clone(),
peer_req_handler,
inbound_mkr,
@ -115,13 +116,14 @@ where
broadcast_svc,
top_block_watch,
make_connection_tx,
sync_states_svc,
})
}
/// The interface to Monero's P2P network on a certain [`NetworkZone`].
pub struct NetworkInterface<N: NetworkZone> {
/// A pool of free connected peers.
pool: Arc<client_pool::ClientPool<N>>,
pub pool: Arc<client_pool::ClientPool<N>>,
/// A [`Service`](tower::Service) that allows broadcasting to all connected peers.
broadcast_svc: broadcast::BroadcastSvc<N>,
/// A [`watch`] channel that contains the highest seen cumulative difficulty and other info
@ -129,4 +131,6 @@ pub struct NetworkInterface<N: NetworkZone> {
top_block_watch: watch::Receiver<sync_states::NewSyncInfo>,
/// A channel to request extra connections.
make_connection_tx: mpsc::Sender<MakeConnectionRequest>,
pub sync_states_svc: Buffer<sync_states::PeerSyncSvc<N>, PeerSyncRequest<N>>,
}