Support multiple RPCs in the reserialize_chain bin

This commit is contained in:
Luke Parker 2023-07-05 19:49:01 -04:00
parent 8ce8657d34
commit 249f7b904f
No known key found for this signature in database

View file

@ -1,4 +1,4 @@
use std::{sync::Arc, collections::VecDeque}; use std::sync::Arc;
use serde::Deserialize; use serde::Deserialize;
use serde_json::json; use serde_json::json;
@ -90,35 +90,51 @@ async fn main() {
let async_parallelism: usize = let async_parallelism: usize =
args.get(2).unwrap_or(&"8".to_string()).parse::<usize>().expect("invalid parallelism argument"); args.get(2).unwrap_or(&"8".to_string()).parse::<usize>().expect("invalid parallelism argument");
// Read URL as the second arg // Read further args as RPC URLs
let default_node = "http://xmr-node.cakewallet.com:18081".to_string(); let default_nodes = vec![
let url = args.get(3).unwrap_or(&default_node); "http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
];
let mut specified_nodes = vec![];
{
let mut i = 0;
loop {
let Some(node) = args.get(3 + i) else { break };
specified_nodes.push(node.clone());
i += 1;
}
}
let nodes = if specified_nodes.is_empty() { default_nodes } else { specified_nodes };
let rpc = let rpc = |url: String| {
|| HttpRpc::new(url.clone()).expect(&format!("couldn't create HttpRpc connected to {url}")); HttpRpc::new(url.clone())
let main_rpc = rpc(); .unwrap_or_else(|_| panic!("couldn't create HttpRpc connected to {url}"))
};
let main_rpc = rpc(nodes[0].clone());
let mut rpcs = vec![]; let mut rpcs = vec![];
for _ in 0 .. async_parallelism { for i in 0 .. async_parallelism {
rpcs.push(Arc::new(rpc())); rpcs.push(Arc::new(rpc(nodes[i % nodes.len()].clone())));
} }
let mut rpc_i = 0; let mut rpc_i = 0;
let mut handles: VecDeque<JoinHandle<()>> = VecDeque::new(); let mut handles: Vec<JoinHandle<()>> = vec![];
while block_i < main_rpc.get_height().await.expect("couldn't call get_height") { while block_i < main_rpc.get_height().await.expect("couldn't call get_height") {
if handles.len() >= async_parallelism { if handles.len() >= async_parallelism {
// Guarantee one handle is complete // Guarantee one handle is complete
handles.pop_front().unwrap().await.unwrap(); handles.swap_remove(0).await.unwrap();
// Remove all of the leading handles // Remove all of the finished handles
// This ensures, if 100 is taking a while, we don't print we finished anything beyond 107 let mut i = 0;
while let Some(handle) = handles.get(0) { while i < handles.len() {
if handle.is_finished() { if handles[i].is_finished() {
handles.pop_front().unwrap().await.unwrap(); handles.swap_remove(i).await.unwrap();
continue;
} }
i += 1;
} }
} }
handles.push_back(tokio::spawn(check_block(rpcs[rpc_i].clone(), block_i))); handles.push(tokio::spawn(check_block(rpcs[rpc_i].clone(), block_i)));
rpc_i = (rpc_i + 1) % rpcs.len(); rpc_i = (rpc_i + 1) % rpcs.len();
block_i += 1; block_i += 1;
} }