diff --git a/tests/compat/src/cli.rs b/tests/compat/src/cli.rs index b1763230..1ddaaaec 100644 --- a/tests/compat/src/cli.rs +++ b/tests/compat/src/cli.rs @@ -17,15 +17,25 @@ pub struct Args { /// Base URL to use for `monerod` RPC. /// /// This must be a non-restricted RPC. - #[arg(short, long, default_value_t = String::from("http://127.0.0.1:18081"))] + #[arg(long, default_value_t = String::from("http://127.0.0.1:18081"))] pub rpc_url: String, + /// Amount of async RPC tasks to spawn. + #[arg(long, default_value_t = NonZeroUsize::new(4).unwrap())] + pub rpc_tasks: NonZeroUsize, + + /// The maximum capacity of the block buffer in-between the RPC and verifier. + /// + /// `0` will cause the buffer to be unbounded. + #[arg(long, default_value_t = 1000)] + pub buffer_limit: usize, + /// Amount of verifying threads to spawn. - #[arg(short, long, default_value_t = std::thread::available_parallelism().unwrap())] + #[arg(long, default_value_t = std::thread::available_parallelism().unwrap())] pub threads: NonZeroUsize, /// Print an update every `update` amount of blocks. - #[arg(short, long, default_value_t = NonZeroU64::new(500).unwrap())] + #[arg(long, default_value_t = NonZeroU64::new(500).unwrap())] pub update: NonZeroU64, } diff --git a/tests/compat/src/main.rs b/tests/compat/src/main.rs index a0953f40..69384aa9 100644 --- a/tests/compat/src/main.rs +++ b/tests/compat/src/main.rs @@ -21,17 +21,23 @@ async fn main() { let cli::Args { rpc_url, update, + rpc_tasks, + buffer_limit, threads, } = cli::Args::get(); // Set-up RPC client. - let client = rpc::RpcClient::new(rpc_url).await; + let client = rpc::RpcClient::new(rpc_url, rpc_tasks).await; let top_height = client.top_height; println!("top_height: {top_height}"); println!(); // Test. - let (tx, rx) = crossbeam::channel::unbounded(); + let (tx, rx) = if buffer_limit == 0 { + crossbeam::channel::unbounded() + } else { + crossbeam::channel::bounded(buffer_limit) + }; verify::spawn_verify_pool(threads, update, top_height, rx); client.test(top_height, tx).await; diff --git a/tests/compat/src/rpc.rs b/tests/compat/src/rpc.rs index 9fd1b1dd..42194557 100644 --- a/tests/compat/src/rpc.rs +++ b/tests/compat/src/rpc.rs @@ -1,4 +1,4 @@ -use std::time::Instant; +use std::{num::NonZeroUsize, time::Instant}; use crossbeam::channel::Sender; use monero_serai::{block::Block, transaction::Transaction}; @@ -20,11 +20,12 @@ pub struct RpcClient { client: Client, json_rpc_url: String, get_transactions_url: String, + rpc_tasks: NonZeroUsize, pub top_height: u64, } impl RpcClient { - pub async fn new(rpc_url: String) -> Self { + pub async fn new(rpc_url: String, rpc_tasks: NonZeroUsize) -> Self { let headers = { let mut h = HeaderMap::new(); h.insert("Content-Type", HeaderValue::from_static("application/json")); @@ -70,6 +71,7 @@ impl RpcClient { client, json_rpc_url, get_transactions_url, + rpc_tasks, top_height, } } @@ -207,7 +209,7 @@ impl RpcClient { }); futures::stream::iter(iter) - .buffer_unordered(4) // This can't be too high or else we get bottlenecked by `monerod` + .buffer_unordered(self.rpc_tasks.get()) // This can't be too high or else we get bottlenecked by `monerod` .for_each(|()| async {}) .await; }