more cli args

This commit is contained in:
hinto.janai 2024-12-16 22:14:51 -05:00
parent b4797fa31f
commit 1bc5e4721c
No known key found for this signature in database
GPG key ID: D47CE05FA175A499
3 changed files with 26 additions and 8 deletions

View file

@ -17,15 +17,25 @@ pub struct Args {
/// Base URL to use for `monerod` RPC. /// Base URL to use for `monerod` RPC.
/// ///
/// This must be a non-restricted RPC. /// This must be a non-restricted RPC.
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:18081"))] #[arg(long, default_value_t = String::from("http://127.0.0.1:18081"))]
pub rpc_url: String, pub rpc_url: String,
/// Amount of async RPC tasks to spawn.
#[arg(long, default_value_t = NonZeroUsize::new(4).unwrap())]
pub rpc_tasks: NonZeroUsize,
/// The maximum capacity of the block buffer in-between the RPC and verifier.
///
/// `0` will cause the buffer to be unbounded.
#[arg(long, default_value_t = 1000)]
pub buffer_limit: usize,
/// Amount of verifying threads to spawn. /// Amount of verifying threads to spawn.
#[arg(short, long, default_value_t = std::thread::available_parallelism().unwrap())] #[arg(long, default_value_t = std::thread::available_parallelism().unwrap())]
pub threads: NonZeroUsize, pub threads: NonZeroUsize,
/// Print an update every `update` amount of blocks. /// Print an update every `update` amount of blocks.
#[arg(short, long, default_value_t = NonZeroU64::new(500).unwrap())] #[arg(long, default_value_t = NonZeroU64::new(500).unwrap())]
pub update: NonZeroU64, pub update: NonZeroU64,
} }

View file

@ -21,17 +21,23 @@ async fn main() {
let cli::Args { let cli::Args {
rpc_url, rpc_url,
update, update,
rpc_tasks,
buffer_limit,
threads, threads,
} = cli::Args::get(); } = cli::Args::get();
// Set-up RPC client. // Set-up RPC client.
let client = rpc::RpcClient::new(rpc_url).await; let client = rpc::RpcClient::new(rpc_url, rpc_tasks).await;
let top_height = client.top_height; let top_height = client.top_height;
println!("top_height: {top_height}"); println!("top_height: {top_height}");
println!(); println!();
// Test. // Test.
let (tx, rx) = crossbeam::channel::unbounded(); let (tx, rx) = if buffer_limit == 0 {
crossbeam::channel::unbounded()
} else {
crossbeam::channel::bounded(buffer_limit)
};
verify::spawn_verify_pool(threads, update, top_height, rx); verify::spawn_verify_pool(threads, update, top_height, rx);
client.test(top_height, tx).await; client.test(top_height, tx).await;

View file

@ -1,4 +1,4 @@
use std::time::Instant; use std::{num::NonZeroUsize, time::Instant};
use crossbeam::channel::Sender; use crossbeam::channel::Sender;
use monero_serai::{block::Block, transaction::Transaction}; use monero_serai::{block::Block, transaction::Transaction};
@ -20,11 +20,12 @@ pub struct RpcClient {
client: Client, client: Client,
json_rpc_url: String, json_rpc_url: String,
get_transactions_url: String, get_transactions_url: String,
rpc_tasks: NonZeroUsize,
pub top_height: u64, pub top_height: u64,
} }
impl RpcClient { impl RpcClient {
pub async fn new(rpc_url: String) -> Self { pub async fn new(rpc_url: String, rpc_tasks: NonZeroUsize) -> Self {
let headers = { let headers = {
let mut h = HeaderMap::new(); let mut h = HeaderMap::new();
h.insert("Content-Type", HeaderValue::from_static("application/json")); h.insert("Content-Type", HeaderValue::from_static("application/json"));
@ -70,6 +71,7 @@ impl RpcClient {
client, client,
json_rpc_url, json_rpc_url,
get_transactions_url, get_transactions_url,
rpc_tasks,
top_height, top_height,
} }
} }
@ -207,7 +209,7 @@ impl RpcClient {
}); });
futures::stream::iter(iter) futures::stream::iter(iter)
.buffer_unordered(4) // This can't be too high or else we get bottlenecked by `monerod` .buffer_unordered(self.rpc_tasks.get()) // This can't be too high or else we get bottlenecked by `monerod`
.for_each(|()| async {}) .for_each(|()| async {})
.await; .await;
} }