verify all txs in blocks

This commit is contained in:
hinto.janai 2024-12-12 17:49:25 -05:00
parent 1d1cba4f63
commit 0a44af4b8c
No known key found for this signature in database
GPG key ID: D47CE05FA175A499
4 changed files with 183 additions and 73 deletions

View file

@ -1,4 +1,5 @@
use std::{
io::Write,
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, Instant},
};
@ -6,21 +7,21 @@ use std::{
mod rpc;
pub static TESTED_BLOCK_COUNT: AtomicUsize = AtomicUsize::new(0);
pub static TESTED_TX_COUNT: AtomicUsize = AtomicUsize::new(0);
#[tokio::main]
async fn main() {
let now = Instant::now();
let rpc_node_url = if let Ok(url) = std::env::var("RPC_NODE_URL") {
let rpc_url = if let Ok(url) = std::env::var("RPC_URL") {
url
} else {
"http://127.0.0.1:18081/json_rpc".to_string()
"http://127.0.0.1:18081".to_string()
};
println!("rpc_node_url: {rpc_node_url}");
println!("rpc_url: {rpc_url}");
let top_height = rpc::RpcClient::top_height(rpc_node_url.clone()).await;
println!("top_height: {top_height}");
assert!(top_height > 3301441, "node is behind");
let client = rpc::RpcClient::new(rpc_url).await;
let top_height = client.top_height;
let ranges = (0..top_height)
.collect::<Vec<usize>>()
@ -33,10 +34,8 @@ async fn main() {
println!("[{}..{}]", range.first().unwrap(), range.last().unwrap());
}
let rpc_client = rpc::RpcClient::new(rpc_node_url);
let iter = ranges.into_iter().map(move |range| {
let c = rpc_client.clone();
let c = client.clone();
async move {
tokio::task::spawn_blocking(move || async move {
c.get_block_test_batch(range.into_iter().collect()).await;
@ -47,26 +46,20 @@ async fn main() {
}
});
std::thread::spawn(move || {
let mut count = 0;
futures::future::join_all(iter).await;
#[expect(clippy::cast_precision_loss)]
while count != top_height {
let c = TESTED_BLOCK_COUNT.load(Ordering::Acquire);
count = c;
loop {
let block_count = TESTED_BLOCK_COUNT.load(Ordering::Acquire);
let tx_count = TESTED_TX_COUNT.load(Ordering::Acquire);
if top_height == block_count {
println!(
"blocks processed ... {c}/{top_height} ({:.2}%)",
(c as f64 / top_height as f64) * 100.0
"finished processing: blocks: {block_count}/{top_height}, txs: {tx_count}, took {}s",
now.elapsed().as_secs()
);
std::thread::sleep(Duration::from_millis(250));
std::process::exit(0);
}
println!("finished all blocks, took: {}s", now.elapsed().as_secs());
std::process::exit(0);
});
futures::future::join_all(iter).await;
std::thread::park();
std::thread::sleep(Duration::from_secs(1));
}
}

View file

@ -1,7 +1,8 @@
use std::{collections::BTreeSet, sync::atomic::Ordering};
use hex::serde::deserialize;
use monero_serai::block::Block;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use reqwest::{
header::{HeaderMap, HeaderValue},
Client, ClientBuilder,
@ -9,7 +10,7 @@ use reqwest::{
use serde::Deserialize;
use serde_json::json;
use crate::TESTED_BLOCK_COUNT;
use crate::{TESTED_BLOCK_COUNT, TESTED_TX_COUNT};
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct BlockHeader {
@ -33,11 +34,12 @@ pub(crate) struct BlockHeader {
#[derive(Debug, Clone)]
pub(crate) struct RpcClient {
client: Client,
rpc_node_url: String,
rpc_url: String,
pub top_height: usize,
}
impl RpcClient {
pub(crate) fn new(rpc_node_url: String) -> Self {
pub(crate) async fn new(rpc_url: String) -> Self {
let headers = {
let mut h = HeaderMap::new();
h.insert("Content-Type", HeaderValue::from_static("application/json"));
@ -49,13 +51,6 @@ impl RpcClient {
.build()
.unwrap();
Self {
client,
rpc_node_url,
}
}
pub(crate) async fn top_height(rpc_node_url: String) -> usize {
#[derive(Debug, Clone, Deserialize)]
struct JsonRpcResponse {
result: GetLastBlockHeaderResponse,
@ -66,8 +61,6 @@ impl RpcClient {
pub block_header: BlockHeader,
}
let this = Self::new(rpc_node_url);
let request = json!({
"jsonrpc": "2.0",
"id": 0,
@ -75,8 +68,8 @@ impl RpcClient {
"params": {}
});
this.client
.get(this.rpc_node_url)
let top_height = client
.get(format!("{rpc_url}/json_rpc"))
.json(&request)
.send()
.await
@ -86,7 +79,67 @@ impl RpcClient {
.unwrap()
.result
.block_header
.height
.height;
println!("top_height: {top_height}");
assert!(top_height > 3301441, "node is behind");
Self {
client,
rpc_url,
top_height,
}
}
async fn get_transactions(&self, tx_hashes: Vec<[u8; 32]>) -> Vec<(Transaction, Vec<u8>)> {
assert!(!tx_hashes.is_empty());
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct GetTransactionsResponse {
pub txs: Vec<Tx>,
}
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct Tx {
pub as_hex: String,
pub pruned_as_hex: String,
}
let url = format!("{}/get_transactions", self.rpc_url);
let txs_hashes = tx_hashes
.into_iter()
.map(hex::encode)
.collect::<Vec<String>>();
let request = json!({
"txs_hashes": txs_hashes,
});
let txs = self
.client
.get(&url)
.json(&request)
.send()
.await
.unwrap()
.json::<GetTransactionsResponse>()
.await
.unwrap()
.txs;
txs.into_par_iter()
.map(|r| {
let blob = hex::decode(if r.as_hex.is_empty() {
r.pruned_as_hex
} else {
r.as_hex
})
.unwrap();
(Transaction::read(&mut blob.as_slice()).unwrap(), blob)
})
.collect()
}
pub(crate) async fn get_block_test_batch(&self, heights: BTreeSet<usize>) {
@ -103,6 +156,8 @@ impl RpcClient {
}
let tasks = heights.into_iter().map(|height| {
let json_rpc_url = format!("{}/json_rpc", self.rpc_url);
let request = json!({
"jsonrpc": "2.0",
"id": 0,
@ -110,8 +165,7 @@ impl RpcClient {
"params": {"height": height}
});
let task =
tokio::task::spawn(self.client.get(&self.rpc_node_url).json(&request).send());
let task = tokio::task::spawn(self.client.get(&json_rpc_url).json(&request).send());
(height, task)
});
@ -126,15 +180,24 @@ impl RpcClient {
.unwrap()
.result;
let info = format!("\nheight: {height}\nresponse: {resp:#?}");
// Test block deserialization.
let block = match Block::read(&mut resp.blob.as_slice()) {
Ok(b) => b,
Err(e) => panic!("{e:?}\n{info}"),
};
// Fetch all transactions.
let mut tx_hashes = vec![block.miner_transaction.hash()];
tx_hashes.extend(block.transactions.iter());
let txs = self.get_transactions(tx_hashes.clone()).await;
assert_eq!(tx_hashes.len(), txs.len());
let top_height = self.top_height;
#[expect(clippy::cast_precision_loss)]
rayon::spawn(move || {
let info = format!("\nheight: {height}\nresponse: {resp:#?}");
// Test block deserialization.
let block = match Block::read(&mut resp.blob.as_slice()) {
Ok(b) => b,
Err(e) => panic!("{e:?}\n{info}"),
};
// Test block properties.
assert_eq!(resp.blob, block.serialize(), "{info}");
@ -167,8 +230,21 @@ impl RpcClient {
timestamp,
} = resp.block_header;
assert_ne!(block_weight, 0, "{info}"); // TODO: test this
assert_ne!(block.miner_transaction.weight(), 0, "{info}"); // TODO: test this
let total_block_weight = txs.iter().map(|(tx, _)| tx.weight()).sum::<usize>();
// Test transaction properties.
txs.into_par_iter()
.zip(tx_hashes)
.for_each(|((tx, blob), hash)| {
assert_eq!(hash, tx.hash(), "{info}, tx: {tx:#?}");
assert_ne!(tx.weight(), 0, "{info}, tx: {tx:#?}");
assert!(!tx.prefix().inputs.is_empty(), "{info}, tx: {tx:#?}");
assert_eq!(blob, tx.serialize(), "{info}, tx: {tx:#?}");
assert!(matches!(tx.version(), 1 | 2), "{info}, tx: {tx:#?}");
});
assert_eq!(block_weight, total_block_weight, "{info}");
assert_ne!(block.miner_transaction.weight(), 0, "{info}");
assert_eq!(hash, block.hash(), "{info}");
assert_eq!(height, block.number().unwrap(), "{info}");
assert_eq!(major_version, block.header.hardfork_version, "{info}");
@ -180,7 +256,38 @@ impl RpcClient {
assert_eq!(reward, block_reward, "{info}");
assert_eq!(timestamp, block.header.timestamp, "{info}");
TESTED_BLOCK_COUNT.fetch_add(1, Ordering::Release);
let block_count = TESTED_BLOCK_COUNT.fetch_add(1, Ordering::Release) + 1;
let tx_count = TESTED_TX_COUNT.fetch_add(num_txes, Ordering::Release) + 1;
let percent = (block_count as f64 / top_height as f64) * 100.0;
println!(
"block_count | {block_count}/{top_height} ({percent:.2}%)
tx_count | {tx_count}
hash | {}
miner_tx_hash | {}
prev_hash | {}
reward | {}
timestamp | {}
nonce | {}
height | {}
block_weight | {}
miner_tx_weight | {}
major_version | {}
minor_version | {}
num_txes | {}\n",
hex::encode(hash),
hex::encode(miner_tx_hash),
hex::encode(prev_hash),
reward,
timestamp,
nonce,
height,
block_weight,
block.miner_transaction.weight(),
major_version,
minor_version,
num_txes,
);
});
}
}

View file

@ -1,32 +1,42 @@
mod rpc;
use std::{
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, Instant},
};
mod rpc;
pub static TESTED_BLOCK_COUNT: AtomicUsize = AtomicUsize::new(0);
#[tokio::main]
async fn main() {
let now = Instant::now();
let rpc_node_url = if let Ok(url) = std::env::var("RPC_NODE_URL") {
let rpc_url = if let Ok(url) = std::env::var("RPC_URL") {
url
} else {
"http://127.0.0.1:18081/json_rpc".to_string()
"http://127.0.0.1:18081".to_string()
};
println!("rpc_node_url: {rpc_node_url}");
println!("rpc_url: {rpc_url}");
let rpc_client = rpc::RpcClient::new(rpc_node_url).await;
let client = rpc::RpcClient::new(rpc_url).await;
let top_height = client.top_height;
tokio::join!(
rpc_client.cryptonight_v0(),
rpc_client.cryptonight_v1(),
rpc_client.cryptonight_v2(),
rpc_client.cryptonight_r(),
rpc_client.randomx(),
client.cryptonight_v0(),
client.cryptonight_v1(),
client.cryptonight_v2(),
client.cryptonight_r(),
client.randomx(),
);
println!("finished all PoW, took: {}s", now.elapsed().as_secs());
loop {
let count = TESTED_BLOCK_COUNT.load(Ordering::Acquire);
if top_height == count {
println!("finished all PoW, took {}s", now.elapsed().as_secs());
std::process::exit(0);
}
std::thread::sleep(Duration::from_secs(1));
}
}

View file

@ -41,12 +41,12 @@ struct BlockHeader {
#[derive(Debug, Clone)]
pub(crate) struct RpcClient {
client: Client,
rpc_node_url: String,
top_height: usize,
rpc_url: String,
pub top_height: usize,
}
impl RpcClient {
pub(crate) async fn new(rpc_node_url: String) -> Self {
pub(crate) async fn new(rpc_url: String) -> Self {
let headers = {
let mut h = HeaderMap::new();
h.insert("Content-Type", HeaderValue::from_static("application/json"));
@ -66,7 +66,7 @@ impl RpcClient {
});
let top_height = client
.get(&rpc_node_url)
.get(&rpc_url)
.json(&request)
.send()
.await
@ -90,7 +90,7 @@ impl RpcClient {
Self {
client,
rpc_node_url,
rpc_url,
top_height,
}
}
@ -103,7 +103,7 @@ impl RpcClient {
"params": {"height": height, "fill_pow_hash": true}
});
tokio::task::spawn(self.client.get(&self.rpc_node_url).json(&request).send())
tokio::task::spawn(self.client.get(&self.rpc_url).json(&request).send())
.await
.unwrap()
.unwrap()
@ -167,7 +167,7 @@ impl RpcClient {
"\nheight: {height}\nheader: {header:#?}\nblock: {block:#?}"
);
let count = TESTED_BLOCK_COUNT.fetch_add(1, Ordering::Release);
let count = TESTED_BLOCK_COUNT.fetch_add(1, Ordering::Release) + 1;
let hex_header = hex::encode(header.pow_hash);
let hex_hash = hex::encode(pow_hash);