diff --git a/Cargo.lock b/Cargo.lock index 647c0353..000a9885 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,7 +527,7 @@ dependencies = [ [[package]] name = "dalek-ff-group" version = "0.4.1" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "crypto-bigint", "curve25519-dalek", @@ -588,7 +588,7 @@ dependencies = [ [[package]] name = "dleq" version = "0.4.1" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "digest", "ff", @@ -678,7 +678,7 @@ dependencies = [ [[package]] name = "flexible-transcript" version = "0.3.2" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "blake2", "digest", @@ -1175,7 +1175,7 @@ dependencies = [ [[package]] name = "monero-generators" version = "0.4.0" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "curve25519-dalek", "dalek-ff-group", @@ -1207,7 +1207,7 @@ dependencies = [ [[package]] name = "monero-serai" version = "0.1.4-alpha" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "async-lock", "async-trait", @@ -1253,7 +1253,7 @@ dependencies = [ [[package]] name = "multiexp" version = "0.4.0" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "ff", "group", @@ -1555,8 +1555,7 @@ dependencies = [ [[package]] name = "randomx-rs" version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14fb999f322669968fd0e80aeca5cb91e7a817a94ebf2b0fcd345a4a7c695203" +source = "git+https://github.com/Cuprate/randomx-rs.git#6496a61208852a020575dafc160080cf50bda67f" dependencies = [ "bitflags 1.3.2", "libc", @@ -1838,7 +1837,7 @@ dependencies = [ [[package]] name = "simple-request" version = "0.1.0" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "hyper", "hyper-rustls", @@ -1895,7 +1894,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "std-shims" version = "0.1.1" -source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f" +source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb" dependencies = [ "hashbrown 0.14.3", "spin", diff --git a/Cargo.toml b/Cargo.toml index fcc9639c..b50328c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,19 +35,19 @@ opt-level = 3 [workspace.dependencies] async-trait = { version = "0.1.74" } -borsh = { version = "1.2.1" } +borsh = { version = "1.2.1", features = ["derive"] } bytes = { version = "1.5.0" } clap = { version = "4.4.7" } chrono = { version = "0.4.31" } crypto-bigint = { version = "0.5.3" } curve25519-dalek = { version = "4.1.1" } -dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "77edd00" } +dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" } dirs = { version = "5.0.1" } futures = { version = "0.3.29" } hex = { version = "0.4.3" } monero-epee-bin-serde = { git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a" } -monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "77edd00" } -multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "77edd00" } +monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" } +multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" } randomx-rs = { version = "1.2.1" } rand = { version = "0.8.5" } rayon = { version = "1.8.0" } diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 30e091a5..882ed11a 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -37,7 +37,7 @@ futures = "0.3" crypto-bigint = "0.5" curve25519-dalek = "4" -randomx-rs = "1" +randomx-rs = {git = "https://github.com/Cuprate/randomx-rs.git"} monero-serai = { workspace = true } multiexp = { workspace = true } dalek-ff-group = { workspace = true } diff --git a/consensus/rules/src/blocks.rs b/consensus/rules/src/blocks.rs index 6ffc58c1..bc5d584e 100644 --- a/consensus/rules/src/blocks.rs +++ b/consensus/rules/src/blocks.rs @@ -12,6 +12,11 @@ use crate::{ const BLOCK_SIZE_SANITY_LEEWAY: usize = 100; const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2; +const BLOCK_202612_POW_HASH: [u8; 32] = + hex_literal::hex!("84f64766475d51837ac9efbef1926486e58563c95a19fef4aec3254f03000000"); + +const RX_SEEDHASH_EPOCH_BLOCKS: u64 = 2048; +const RX_SEEDHASH_EPOCH_LAG: u64 = 64; #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] pub enum BlockError { @@ -31,15 +36,33 @@ pub enum BlockError { MinerTxError(#[from] MinerTxError), } +pub trait RandomX { + type Error; + + fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error>; +} + +pub fn is_randomx_seed_height(height: u64) -> bool { + height % RX_SEEDHASH_EPOCH_BLOCKS == 0 +} + +pub fn randomx_seed_height(height: u64) -> u64 { + if height <= RX_SEEDHASH_EPOCH_BLOCKS + RX_SEEDHASH_EPOCH_LAG { + 0 + } else { + (height - RX_SEEDHASH_EPOCH_LAG - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1) + } +} + /// Calculates the POW hash of this block. -pub fn calculate_pow_hash(buf: &[u8], height: u64, hf: &HardFork) -> Result<[u8; 32], BlockError> { +pub fn calculate_pow_hash( + randomx_vm: &R, + buf: &[u8], + height: u64, + hf: &HardFork, +) -> Result<[u8; 32], BlockError> { if height == 202612 { - return Ok( - hex::decode("84f64766475d51837ac9efbef1926486e58563c95a19fef4aec3254f03000000") - .unwrap() - .try_into() - .unwrap(), - ); + return Ok(BLOCK_202612_POW_HASH); } Ok(if hf < &HardFork::V7 { @@ -51,7 +74,9 @@ pub fn calculate_pow_hash(buf: &[u8], height: u64, hf: &HardFork) -> Result<[u8; } else if hf < &HardFork::V12 { cryptonight_hash_r(buf, height) } else { - todo!("RandomX") + randomx_vm + .calculate_hash(buf) + .map_err(|_| BlockError::POWInvalid)? }) } @@ -64,6 +89,11 @@ pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> Result<(), BlockErr let difficulty = U256::from(difficulty); if int_hash.checked_mul(difficulty).is_none() { + tracing::debug!( + "Invalid POW: {}, difficulty: {}", + hex::encode(hash), + difficulty + ); Err(BlockError::POWInvalid) } else { Ok(()) diff --git a/consensus/rules/src/decomposed_amount.rs b/consensus/rules/src/decomposed_amount.rs index 860a5947..6ad6eda8 100644 --- a/consensus/rules/src/decomposed_amount.rs +++ b/consensus/rules/src/decomposed_amount.rs @@ -27,8 +27,8 @@ pub fn decomposed_amounts() -> &'static [u64; 172] { 10000000000000000, 20000000000000000, 30000000000000000, 40000000000000000, 50000000000000000, 60000000000000000, 70000000000000000, 80000000000000000, 90000000000000000, 100000000000000000, 200000000000000000, 300000000000000000, 400000000000000000, 500000000000000000, 600000000000000000, 700000000000000000, 800000000000000000, 900000000000000000, 1000000000000000000, 2000000000000000000, 3000000000000000000, 4000000000000000000, 5000000000000000000, 6000000000000000000, 7000000000000000000, 8000000000000000000, 9000000000000000000, - 10000000000000000000] - + 10000000000000000000 + ] }) } @@ -55,7 +55,7 @@ mod tests { } #[test] - fn decomposed_amounts_return_not_decomposed() { + fn non_decomposed_amounts_return_not_decomposed() { assert!(!is_decomposed_amount(&21)); assert!(!is_decomposed_amount(&345431)); assert!(!is_decomposed_amount(&20000001)); diff --git a/consensus/rules/src/genesis.rs b/consensus/rules/src/genesis.rs index 731eb03c..f77d4621 100644 --- a/consensus/rules/src/genesis.rs +++ b/consensus/rules/src/genesis.rs @@ -8,7 +8,7 @@ use monero_serai::{ use cuprate_common::Network; -fn genesis_nonce(network: &Network) -> u32 { +const fn genesis_nonce(network: &Network) -> u32 { match network { Network::Mainnet => 10000, Network::Testnet => 10001, diff --git a/consensus/rules/src/hard_forks.rs b/consensus/rules/src/hard_forks.rs index 54071169..d9ac4fae 100644 --- a/consensus/rules/src/hard_forks.rs +++ b/consensus/rules/src/hard_forks.rs @@ -81,6 +81,48 @@ impl HFsInfo { HFInfo::new(2689608, 0), ]) } + + pub const fn test_net() -> HFsInfo { + Self([ + HFInfo::new(0, 0), + HFInfo::new(624634, 0), + HFInfo::new(800500, 0), + HFInfo::new(801219, 0), + HFInfo::new(802660, 0), + HFInfo::new(971400, 0), + HFInfo::new(1057027, 0), + HFInfo::new(1057058, 0), + HFInfo::new(1057778, 0), + HFInfo::new(1154318, 0), + HFInfo::new(1155038, 0), + HFInfo::new(1308737, 0), + HFInfo::new(1543939, 0), + HFInfo::new(1544659, 0), + HFInfo::new(1982800, 0), + HFInfo::new(1983520, 0), + ]) + } + + pub const fn stage_net() -> HFsInfo { + Self([ + HFInfo::new(0, 0), + HFInfo::new(32000, 0), + HFInfo::new(33000, 0), + HFInfo::new(34000, 0), + HFInfo::new(35000, 0), + HFInfo::new(36000, 0), + HFInfo::new(37000, 0), + HFInfo::new(176456, 0), + HFInfo::new(177176, 0), + HFInfo::new(269000, 0), + HFInfo::new(269720, 0), + HFInfo::new(454721, 0), + HFInfo::new(675405, 0), + HFInfo::new(676125, 0), + HFInfo::new(1151000, 0), + HFInfo::new(1151720, 0), + ]) + } } /// An identifier for every hard-fork Monero has had. @@ -177,7 +219,7 @@ impl HardFork { if self != version { Err(HardForkError::VersionIncorrect)?; } - if self < vote { + if self > vote { Err(HardForkError::VoteTooLow)?; } diff --git a/consensus/rules/src/transactions.rs b/consensus/rules/src/transactions.rs index 4ae1b53d..37a56482 100644 --- a/consensus/rules/src/transactions.rs +++ b/consensus/rules/src/transactions.rs @@ -470,7 +470,7 @@ fn check_tx_version( // TODO: Doc is wrong here let min = min_tx_version(hf); - if version < &min && decoy_info.not_mixable != 0 { + if version < &min && decoy_info.not_mixable == 0 { return Err(TransactionError::TransactionVersionInvalid); } } else { diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 980a947e..7250ab5c 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,13 +1,16 @@ #![cfg(feature = "binaries")] +use std::collections::{HashMap, HashSet}; +use std::ops::Deref; +use std::time::Duration; use std::{ops::Range, path::PathBuf, sync::Arc}; use clap::Parser; use futures::{ channel::{mpsc, oneshot}, - SinkExt, StreamExt, + SinkExt, StreamExt, TryFutureExt, }; -use monero_serai::block::Block; +use monero_serai::{block::Block, transaction::Transaction}; use rayon::prelude::*; use tokio::sync::RwLock; use tower::{Service, ServiceExt}; @@ -16,19 +19,23 @@ use tracing::level_filters::LevelFilter; use cuprate_common::Network; use cuprate_consensus::{ + block::PrePreparedBlockExPOW, context::{ BlockChainContextRequest, BlockChainContextResponse, ContextConfig, UpdateBlockchainCacheData, }, initialize_blockchain_context, initialize_verifier, + randomx::RandomXVM, rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation, VerifyBlockRequest, VerifyBlockResponse, }; +use monero_consensus::{blocks::randomx_seed_height, HardFork}; + mod tx_pool; -const MAX_BLOCKS_IN_RANGE: u64 = 500; +const MAX_BLOCKS_IN_RANGE: u64 = 200; const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000; /// Calls for a batch of blocks, returning the response and the time it took. @@ -92,19 +99,19 @@ where D::Future: Send + 'static, { let mut next_fut = tokio::spawn(call_batch( - start_height..(start_height + (MAX_BLOCKS_IN_RANGE * 3)).min(chain_height), + start_height..(start_height + (MAX_BLOCKS_IN_RANGE * 4)).min(chain_height), database.clone(), )); for next_batch_start in (start_height..chain_height) - .step_by((MAX_BLOCKS_IN_RANGE * 3) as usize) + .step_by((MAX_BLOCKS_IN_RANGE * 4) as usize) .skip(1) { // Call the next batch while we handle this batch. let current_fut = std::mem::replace( &mut next_fut, tokio::spawn(call_batch( - next_batch_start..(next_batch_start + (MAX_BLOCKS_IN_RANGE * 3)).min(chain_height), + next_batch_start..(next_batch_start + (MAX_BLOCKS_IN_RANGE * 4)).min(chain_height), database.clone(), )), ); @@ -114,18 +121,41 @@ where }; tracing::info!( - "Retrived batch: {:?}, chain height: {}", - (next_batch_start - (MAX_BLOCKS_IN_RANGE * 3))..(next_batch_start), + "Got batch: {:?}, chain height: {}", + (next_batch_start - (MAX_BLOCKS_IN_RANGE * 4))..(next_batch_start), chain_height ); let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap(); + + let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) { + vec![( + txs.into_iter().flatten().collect::>(), + blocks.first().map(hf).unwrap(), + )] + } else { + let mut txs_hfs: Vec<(Vec, HardFork)> = Vec::new(); + let mut last_hf = blocks.first().map(hf).unwrap(); + + txs_hfs.push((vec![], last_hf)); + + for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) { + if current_hf == last_hf { + assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf); + txs_hfs.last_mut().unwrap().0.append(&mut txs); + } else { + txs_hfs.push((txs, current_hf)); + last_hf = current_hf; + } + } + txs_hfs + }; + let (tx, rx) = oneshot::channel(); - new_tx_chan - .send((txs.into_iter().flatten().collect(), tx)) - .await?; - rx.await??; + new_tx_chan.send((txs_hf, tx)).await?; + rx.await.unwrap().unwrap(); block_chan.send(blocks).await?; } @@ -138,6 +168,7 @@ async fn scan_chain( save_file: PathBuf, _rpc_config: Arc>, database: D, + net: Network, ) -> Result<(), tower::BoxError> where D: Database + Clone + Send + Sync + 'static, @@ -146,11 +177,15 @@ where tracing::info!("Beginning chain scan"); // TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs. - let chain_height = 3_000_000; + let chain_height = 3_152_725; tracing::info!("scanning to chain height: {}", chain_height); - let config = ContextConfig::main_net(); + let config = match net { + Network::Mainnet => ContextConfig::main_net(), + Network::Stagenet => ContextConfig::stage_net(), + Network::Testnet => ContextConfig::test_net(), + }; let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?; @@ -173,15 +208,66 @@ where call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await }); + let BlockChainContextResponse::Context(ctx) = ctx_svc + .ready() + .await? + .call(BlockChainContextRequest::Get) + .await? + else { + panic!("ctx svc sent wrong response!"); + }; + let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone(); + + let mut randomx_vms: Option> = Some(HashMap::new()); + tokio::spawn(async move { - while let Some(blocks) = incoming_blocks.next().await { - let blocks = rayon_spawn_async(|| { + while let Some(mut blocks) = incoming_blocks.next().await { + let unwrapped_rx_vms = randomx_vms.as_mut().unwrap(); + + let blocks = rayon_spawn_async(move || { blocks - .into_par_iter() - .map(|block| PrePreparedBlock::new(block).unwrap()) + .into_iter() + .map(move |block| PrePreparedBlockExPOW::new(block).unwrap()) .collect::>() }) .await; + + let seeds_needed = blocks + .iter() + .map(|block| { + rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash); + randomx_seed_height(block.block.number() as u64) + }) + .collect::>(); + + unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height)); + + for seed_height in seeds_needed { + if !unwrapped_rx_vms.contains_key(&seed_height) { + unwrapped_rx_vms.insert( + seed_height, + RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap(), + ); + } + } + + let arc_rx_vms = Arc::new(randomx_vms.take().unwrap()); + let cloned_arc_rx_vms = arc_rx_vms.clone(); + let blocks = rayon_spawn_async(move || { + blocks + .into_iter() + .map(move |block| { + let rx_vm = arc_rx_vms + .get(&randomx_seed_height(block.block.number() as u64)) + .unwrap(); + PrePreparedBlock::new(block, rx_vm).unwrap() + }) + .collect::>() + }) + .await; + + randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap()); + prepped_blocks_tx.send(blocks).await.unwrap(); } }); @@ -255,6 +341,8 @@ async fn main() { let network = match args.network.as_str() { "mainnet" => Network::Mainnet, + "testnet" => Network::Testnet, + "stagenet" => Network::Stagenet, _ => panic!("Invalid network, scanner currently only supports mainnet"), }; @@ -262,36 +350,81 @@ async fn main() { Some(dir) => dir, None => dirs::cache_dir().unwrap(), }; - file_for_cache.push("cuprate_rpc_scanning_cache.bin"); + + match network { + Network::Mainnet => file_for_cache.push("cuprate_rpc_scanning_cache.bin"), + Network::Stagenet => file_for_cache.push("cuprate_rpc_scanning_cache_stage_net.bin"), + Network::Testnet => file_for_cache.push("cuprate_rpc_scanning_cache_test_net.bin"), + } let mut urls = if args.dont_use_default_nodes { vec![] } else { - vec![ - "http://xmr-node.cakewallet.com:18081".to_string(), - "https://node.sethforprivacy.com".to_string(), - "http://nodex.monerujo.io:18081".to_string(), - "http://nodes.hashvault.pro:18081".to_string(), - "http://node.c3pool.com:18081".to_string(), - "http://node.trocador.app:18089".to_string(), - "http://xmr.lukas.services:18089".to_string(), - "http://xmr-node-eu.cakewallet.com:18081".to_string(), - "http://38.105.209.54:18089".to_string(), - "http://68.118.241.70:18089".to_string(), - "http://145.239.97.211:18089".to_string(), - // - "http://xmr-node.cakewallet.com:18081".to_string(), - "https://node.sethforprivacy.com".to_string(), - "http://nodex.monerujo.io:18081".to_string(), - "http://nodes.hashvault.pro:18081".to_string(), - "http://node.c3pool.com:18081".to_string(), - "http://node.trocador.app:18089".to_string(), - "http://xmr.lukas.services:18089".to_string(), - "http://xmr-node-eu.cakewallet.com:18081".to_string(), - "http://38.105.209.54:18089".to_string(), - "http://68.118.241.70:18089".to_string(), - "http://145.239.97.211:18089".to_string(), - ] + match network { + Network::Mainnet => vec![ + "http://xmr-node.cakewallet.com:18081".to_string(), + "https://node.sethforprivacy.com".to_string(), + // "http://nodex.monerujo.io:18081".to_string(), + "http://nodes.hashvault.pro:18081".to_string(), + "http://node.c3pool.com:18081".to_string(), + "http://node.trocador.app:18089".to_string(), + "http://xmr.lukas.services:18089".to_string(), + "http://xmr-node-eu.cakewallet.com:18081".to_string(), + "http://68.118.241.70:18089".to_string(), + "http://145.239.97.211:18089".to_string(), + // + "http://xmr-node.cakewallet.com:18081".to_string(), + "https://node.sethforprivacy.com".to_string(), + // "http://nodex.monerujo.io:18081".to_string(), + "http://nodes.hashvault.pro:18081".to_string(), + "http://node.c3pool.com:18081".to_string(), + "http://node.trocador.app:18089".to_string(), + "http://xmr.lukas.services:18089".to_string(), + "http://xmr-node-eu.cakewallet.com:18081".to_string(), + "http://68.118.241.70:18089".to_string(), + "http://145.239.97.211:18089".to_string(), + ], + Network::Testnet => vec![ + "http://testnet.xmr-tw.org:28081".to_string(), + "http://node3.monerodevs.org:28089".to_string(), + "http://node.monerodevs.org:28089".to_string(), + "http://125.229.105.12:28081".to_string(), + "http://node2.monerodevs.org:28089".to_string(), + "https://testnet.xmr.ditatompel.com".to_string(), + "http://singapore.node.xmr.pm:28081".to_string(), + // + "http://testnet.xmr-tw.org:28081".to_string(), + "http://node3.monerodevs.org:28089".to_string(), + "http://node.monerodevs.org:28089".to_string(), + "http://125.229.105.12:28081".to_string(), + "http://node2.monerodevs.org:28089".to_string(), + "https://testnet.xmr.ditatompel.com".to_string(), + "http://singapore.node.xmr.pm:28081".to_string(), + ], + Network::Stagenet => vec![ + "http://125.229.105.12:38081".to_string(), + "http://90.189.159.23:38089".to_string(), + "http://stagenet.xmr-tw.org:38081".to_string(), + "http://node.monerodevs.org:38089".to_string(), + "http://stagenet.community.rino.io:38081".to_string(), + "http://node2.monerodevs.org:38089".to_string(), + "http://node3.monerodevs.org:38089".to_string(), + "http://singapore.node.xmr.pm:38081".to_string(), + "https://stagenet.xmr.ditatompel.com".to_string(), + "http://3.10.182.182:38081".to_string(), + // + "http://125.229.105.12:38081".to_string(), + "http://90.189.159.23:38089".to_string(), + "http://stagenet.xmr-tw.org:38081".to_string(), + "http://node.monerodevs.org:38089".to_string(), + "http://stagenet.community.rino.io:38081".to_string(), + "http://node2.monerodevs.org:38089".to_string(), + "http://node3.monerodevs.org:38089".to_string(), + "http://singapore.node.xmr.pm:38081".to_string(), + "https://stagenet.xmr.ditatompel.com".to_string(), + "http://3.10.182.182:38081".to_string(), + ], + } }; urls.extend(args.rpc_nodes.into_iter()); @@ -325,7 +458,7 @@ async fn main() { let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone()); - scan_chain(cache, file_for_cache, rpc_config, rpc) + scan_chain(cache, file_for_cache, rpc_config, rpc, network) .await .unwrap(); } diff --git a/consensus/src/bin/tx_pool.rs b/consensus/src/bin/tx_pool.rs index 15efb881..905a58c6 100644 --- a/consensus/src/bin/tx_pool.rs +++ b/consensus/src/bin/tx_pool.rs @@ -23,6 +23,7 @@ use cuprate_consensus::{ transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, }; +use monero_consensus::HardFork; #[derive(Clone)] pub struct TxPoolHandle { @@ -59,12 +60,12 @@ impl tower::Service for TxPoolHandle { } pub type NewTxChanRec = mpsc::Receiver<( - Vec, + Vec<(Vec, HardFork)>, oneshot::Sender>, )>; pub type NewTxChanSen = mpsc::Sender<( - Vec, + Vec<(Vec, HardFork)>, oneshot::Sender>, )>; @@ -94,16 +95,7 @@ where pub async fn spawn( tx_verifier_chan: oneshot::Receiver, mut ctx_svc: Ctx, - ) -> Result< - ( - TxPoolHandle, - mpsc::Sender<( - Vec, - oneshot::Sender>, - )>, - ), - tower::BoxError, - > { + ) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> { let BlockChainContextResponse::Context(current_ctx) = ctx_svc .ready() .await? @@ -155,8 +147,8 @@ where } } - fn handle_txs_req( - &self, + async fn handle_txs_req( + &mut self, req: TxPoolRequest, tx: oneshot::Sender>, ) { @@ -164,10 +156,9 @@ where let mut res = Vec::with_capacity(txs_to_get.len()); - let mut txs = self.txs.lock().unwrap(); - for tx_hash in txs_to_get { - let Some(tx) = txs.remove(&tx_hash) else { + let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else { + tracing::debug!("tx not in pool: {}", hex::encode(tx_hash)); let _ = tx.send(Err(TxNotInPool)); return; }; @@ -179,7 +170,7 @@ where async fn handle_new_txs( &mut self, - new_txs: Vec, + new_txs: Vec<(Vec, HardFork)>, res_chan: oneshot::Sender>, ) -> Result<(), tower::BoxError> { if self.tx_verifier.is_none() { @@ -192,26 +183,31 @@ where let tx_pool = self.txs.clone(); tokio::spawn(async move { - // We only batch the setup a real tx pool would also call `VerifyTxRequest::Block` - let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier - .ready() - .await - .unwrap() - .call(VerifyTxRequest::BatchSetup { - txs: new_txs, - hf: current_ctx.current_hf, - re_org_token: current_ctx.re_org_token.clone(), - }) - .await - .unwrap() - else { - panic!("Tx verifier sent incorrect response!"); - }; + for (txs, hf) in new_txs { + // We only batch the setup a real tx pool would also call `VerifyTxRequest::Block` + let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier + .ready() + .await + .unwrap() + .call(VerifyTxRequest::BatchSetup { + txs, + hf, + re_org_token: current_ctx.re_org_token.clone(), + }) + .await + .unwrap() + else { + panic!("Tx verifier sent incorrect response!"); + }; - let mut locked_pool = tx_pool.lock().unwrap(); + let mut locked_pool = tx_pool.lock().unwrap(); - for tx in txs { - locked_pool.insert(tx.tx_hash, tx); + for tx in txs { + let tx_hash = tx.tx_hash; + if locked_pool.insert(tx_hash, tx).is_some() { + panic!("added same tx to pool twice: {}", hex::encode(tx_hash)) + } + } } res_chan.send(Ok(())).unwrap(); }); @@ -227,13 +223,8 @@ where mut new_tx_channel: NewTxChanRec, ) { loop { - futures::select! { - pool_req = tx_pool_handle.next() => { - let Some((req, tx)) = pool_req else { - todo!("Shutdown txpool") - }; - self.handle_txs_req(req, tx); - } + tokio::select! { + biased; new_txs = new_tx_channel.next() => { let Some(new_txs) = new_txs else { todo!("Shutdown txpool") @@ -241,6 +232,12 @@ where self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap() } + pool_req = tx_pool_handle.next() => { + let Some((req, tx)) = pool_req else { + todo!("Shutdown txpool") + }; + self.handle_txs_req(req, tx).await; + } } } } diff --git a/consensus/src/block.rs b/consensus/src/block.rs index fb96690e..ae27f710 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -10,7 +10,7 @@ use monero_serai::block::Block; use monero_serai::transaction::Input; use tower::{Service, ServiceExt}; -use monero_consensus::blocks::BlockError; +use monero_consensus::blocks::{BlockError, RandomX}; use monero_consensus::miner_tx::MinerTxError; use monero_consensus::{ blocks::{calculate_pow_hash, check_block, check_block_pow}, @@ -24,6 +24,37 @@ use crate::{ ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, }; +#[derive(Debug)] +pub struct PrePreparedBlockExPOW { + pub block: Block, + pub block_blob: Vec, + + pub hf_vote: HardFork, + pub hf_version: HardFork, + + pub block_hash: [u8; 32], + + pub miner_tx_weight: usize, +} + +impl PrePreparedBlockExPOW { + pub fn new(block: Block) -> Result { + let (hf_version, hf_vote) = + HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?; + + Ok(PrePreparedBlockExPOW { + block_blob: block.serialize(), + hf_vote, + hf_version, + + block_hash: block.hash(), + + miner_tx_weight: block.miner_tx.weight(), + block, + }) + } +} + #[derive(Debug)] pub struct PrePreparedBlock { pub block: Block, @@ -39,26 +70,31 @@ pub struct PrePreparedBlock { } impl PrePreparedBlock { - pub fn new(block: Block) -> Result { - let (hf_version, hf_vote) = - HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?; - - let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else { + pub fn new( + block: PrePreparedBlockExPOW, + randomx_vm: &R, + ) -> Result { + let Some(Input::Gen(height)) = block.block.miner_tx.prefix.inputs.first() else { Err(ConsensusError::Block(BlockError::MinerTxError( MinerTxError::InputNotOfTypeGen, )))? }; Ok(PrePreparedBlock { - block_blob: block.serialize(), - hf_vote, - hf_version, + block_blob: block.block_blob, + hf_vote: block.hf_vote, + hf_version: block.hf_version, - block_hash: block.hash(), - pow_hash: calculate_pow_hash(&block.serialize_hashable(), *height, &hf_vote)?, + block_hash: block.block_hash, + pow_hash: calculate_pow_hash( + randomx_vm, + &block.block.serialize_hashable(), + *height, + &block.hf_version, + )?, - miner_tx_weight: block.miner_tx.weight(), - block, + miner_tx_weight: block.block.miner_tx.weight(), + block: block.block, }) } } @@ -321,10 +357,13 @@ where // do POW test last let chain_height = context.chain_height; let current_hf = context.current_hf; - let pow_hash = - rayon_spawn_async(move || calculate_pow_hash(&hashing_blob, chain_height, ¤t_hf)) - .await - .map_err(ConsensusError::Block)?; + let pow_hash = todo!(); + /* + rayon_spawn_async(move || calculate_pow_hash(, &hashing_blob, chain_height, ¤t_hf)) + .await + .map_err(ConsensusError::Block)?; + + */ check_block_pow(&pow_hash, context.next_difficulty).map_err(ConsensusError::Block)?; diff --git a/consensus/src/context.rs b/consensus/src/context.rs index a856e42a..2e48ac3d 100644 --- a/consensus/src/context.rs +++ b/consensus/src/context.rs @@ -28,6 +28,7 @@ mod difficulty; mod hardforks; mod weight; +mod rx_seed; #[cfg(test)] mod tests; mod tokens; @@ -53,6 +54,22 @@ impl ContextConfig { weights_config: BlockWeightsCacheConfig::main_net(), } } + + pub fn stage_net() -> ContextConfig { + ContextConfig { + hard_fork_cfg: HardForkConfig::stage_net(), + difficulty_cfg: DifficultyCacheConfig::main_net(), + weights_config: BlockWeightsCacheConfig::main_net(), + } + } + + pub fn test_net() -> ContextConfig { + ContextConfig { + hard_fork_cfg: HardForkConfig::test_net(), + difficulty_cfg: DifficultyCacheConfig::main_net(), + weights_config: BlockWeightsCacheConfig::main_net(), + } + } } pub async fn initialize_blockchain_context( @@ -117,6 +134,11 @@ where hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await }); + let db = database.clone(); + let rx_seed_handle = tokio::spawn(async move { + rx_seed::RandomXSeed::init_from_chain_height(chain_height, db).await + }); + let context_svc = BlockChainContextService { internal_blockchain_context: Arc::new( InternalBlockChainContext { @@ -124,6 +146,7 @@ where current_reorg_token: ReOrgToken::new(), difficulty_cache: difficulty_cache_handle.await.unwrap()?, weight_cache: weight_cache_handle.await.unwrap()?, + rx_seed_cache: rx_seed_handle.await.unwrap()?, hardfork_state: hardfork_state_handle.await.unwrap()?, chain_height, already_generated_coins, @@ -145,6 +168,7 @@ pub struct RawBlockChainContext { pub cumulative_difficulty: u128, /// A token which is used to signal if a reorg has happened since creating the token. pub re_org_token: ReOrgToken, + pub rx_seed_cache: rx_seed::RandomXSeed, pub context_to_verify_block: ContextToVerifyBlock, /// The median long term block weight. median_long_term_weight: usize, @@ -254,8 +278,6 @@ pub enum BlockChainContextResponse { Context(BlockChainContext), Ok, } - -#[derive(Clone)] struct InternalBlockChainContext { /// A token used to invalidate previous contexts when a new /// block is added to the chain. @@ -265,6 +287,7 @@ struct InternalBlockChainContext { difficulty_cache: difficulty::DifficultyCache, weight_cache: weight::BlockWeightsCache, + rx_seed_cache: rx_seed::RandomXSeed, hardfork_state: hardforks::HardForkState, chain_height: u64, @@ -324,6 +347,7 @@ impl Service for BlockChainContextService { current_reorg_token, difficulty_cache, weight_cache, + rx_seed_cache, hardfork_state, chain_height, top_block_hash, @@ -351,6 +375,7 @@ impl Service for BlockChainContextService { next_difficulty: difficulty_cache.next_difficulty(¤t_hf), already_generated_coins: *already_generated_coins, }, + rx_seed_cache: rx_seed_cache.clone(), cumulative_difficulty: difficulty_cache.cumulative_difficulty(), median_long_term_weight: weight_cache.median_long_term_weight(), top_block_timestamp: difficulty_cache.top_block_timestamp(), @@ -368,6 +393,8 @@ impl Service for BlockChainContextService { hardfork_state.new_block(new.vote, new.height); + rx_seed_cache.new_block(new.height, &new.new_top_hash); + *chain_height = new.height + 1; *top_block_hash = new.new_top_hash; *already_generated_coins = diff --git a/consensus/src/context/hardforks.rs b/consensus/src/context/hardforks.rs index a5a41d2c..7d107d35 100644 --- a/consensus/src/context/hardforks.rs +++ b/consensus/src/context/hardforks.rs @@ -30,6 +30,20 @@ impl HardForkConfig { window: DEFAULT_WINDOW_SIZE, } } + + pub const fn stage_net() -> HardForkConfig { + Self { + info: HFsInfo::stage_net(), + window: DEFAULT_WINDOW_SIZE, + } + } + + pub const fn test_net() -> HardForkConfig { + Self { + info: HFsInfo::test_net(), + window: DEFAULT_WINDOW_SIZE, + } + } } /// A struct that keeps track of the current hard-fork and current votes. diff --git a/consensus/src/context/rx_seed.rs b/consensus/src/context/rx_seed.rs new file mode 100644 index 00000000..c9d95f98 --- /dev/null +++ b/consensus/src/context/rx_seed.rs @@ -0,0 +1,115 @@ +use std::collections::VecDeque; + +use futures::{stream::FuturesOrdered, StreamExt}; +use tower::ServiceExt; + +use monero_consensus::blocks::{is_randomx_seed_height, randomx_seed_height}; + +use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError}; + +const RX_SEEDS_CACHED: usize = 3; + +#[derive(Clone, Debug)] +pub struct RandomXSeed { + seeds: VecDeque<(u64, [u8; 32])>, +} + +impl RandomXSeed { + pub async fn init_from_chain_height( + chain_height: u64, + database: D, + ) -> Result { + let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED); + let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?; + + Ok(RandomXSeed { + seeds: seed_heights.into_iter().zip(seed_hashes).collect(), + }) + } + + pub fn get_seeds_hash(&self, seed_height: u64) -> [u8; 32] { + for (height, seed) in self.seeds.iter() { + if seed_height == *height { + return *seed; + } + } + + tracing::error!( + "Current seeds: {:?}, asked for: {}", + self.seeds, + seed_height + ); + panic!("RX seed cache was not updated or was asked for a block too old.") + } + + pub fn get_rx_seed(&self, height: u64) -> [u8; 32] { + let seed_height = randomx_seed_height(height); + tracing::warn!( + "Current seeds: {:?}, asked for: {}", + self.seeds, + seed_height + ); + + self.get_seeds_hash(seed_height) + } + + pub fn new_block(&mut self, height: u64, hash: &[u8; 32]) { + if is_randomx_seed_height(height) { + for (got_height, _) in self.seeds.iter() { + if *got_height == height { + return; + } + } + + self.seeds.pop_back(); + self.seeds.push_front((height, *hash)); + } + } +} + +fn get_last_rx_seed_heights(mut last_height: u64, mut amount: usize) -> Vec { + let mut seeds = Vec::with_capacity(amount); + if is_randomx_seed_height(last_height) { + seeds.push(last_height); + amount -= 1; + } + + for _ in 0..amount { + if last_height == 0 { + return seeds; + } + + let seed_height = randomx_seed_height(last_height); + seeds.push(seed_height); + last_height = seed_height + } + + seeds +} + +async fn get_block_hashes( + heights: Vec, + database: D, +) -> Result, ExtendedConsensusError> { + let mut fut = FuturesOrdered::new(); + + for height in heights { + let db = database.clone(); + fut.push_back(async move { + let DatabaseResponse::BlockHash(hash) = db + .clone() + .oneshot(DatabaseRequest::BlockHash(height)) + .await? + else { + panic!("Database sent incorrect response!"); + }; + Result::<_, ExtendedConsensusError>::Ok(hash) + }); + } + + let mut res = Vec::new(); + while let Some(hash) = fut.next().await { + res.push(hash?); + } + Ok(res) +} diff --git a/consensus/src/context/weight.rs b/consensus/src/context/weight.rs index 5af1796d..c8c0d5ee 100644 --- a/consensus/src/context/weight.rs +++ b/consensus/src/context/weight.rs @@ -168,18 +168,19 @@ impl BlockWeightsCache { .expect("long term window can't be negative"); match self.cached_sorted_long_term_weights.binary_search(&val) { - Ok(idx) | Err(idx) => self.cached_sorted_long_term_weights.remove(idx), + Ok(idx) => self.cached_sorted_long_term_weights.remove(idx), + Err(_) => panic!("Long term cache has incorrect values!"), }; } self.short_term_block_weights.push_back(block_weight); match self .cached_sorted_short_term_weights - .binary_search(&long_term_weight) + .binary_search(&block_weight) { Ok(idx) | Err(idx) => self .cached_sorted_short_term_weights - .insert(idx, long_term_weight), + .insert(idx, block_weight), } if u64::try_from(self.short_term_block_weights.len()).unwrap() @@ -191,7 +192,8 @@ impl BlockWeightsCache { .expect("short term window can't be negative"); match self.cached_sorted_short_term_weights.binary_search(&val) { - Ok(idx) | Err(idx) => self.cached_sorted_short_term_weights.remove(idx), + Ok(idx) => self.cached_sorted_short_term_weights.remove(idx), + Err(_) => panic!("Short term cache has incorrect values"), }; } diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index b41f7edd..936d30f9 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -11,6 +11,7 @@ mod batch_verifier; pub mod block; pub mod context; mod helper; +pub mod randomx; #[cfg(feature = "binaries")] pub mod rpc; #[cfg(test)] diff --git a/consensus/src/randomx.rs b/consensus/src/randomx.rs new file mode 100644 index 00000000..db1c06be --- /dev/null +++ b/consensus/src/randomx.rs @@ -0,0 +1,37 @@ +use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VMInner}; +use thread_local::ThreadLocal; + +use monero_consensus::blocks::RandomX; + +pub struct RandomXVM { + vms: ThreadLocal, + cache: RandomXCache, + flags: RandomXFlag, + seed: [u8; 32], +} + +impl RandomXVM { + pub fn new(seed: [u8; 32]) -> Result { + let flags = RandomXFlag::get_recommended_flags(); + + let cache = RandomXCache::new(flags, &seed)?; + + Ok(RandomXVM { + vms: ThreadLocal::new(), + cache, + flags, + seed, + }) + } +} + +impl RandomX for RandomXVM { + type Error = RandomXError; + + fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> { + self.vms + .get_or_try(|| VMInner::new(self.flags, Some(self.cache.clone()), None))? + .calculate_hash(buf) + .map(|out| out.try_into().unwrap()) + } +} diff --git a/consensus/src/rpc/cache.rs b/consensus/src/rpc/cache.rs index e808e08f..a5597e94 100644 --- a/consensus/src/rpc/cache.rs +++ b/consensus/src/rpc/cache.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "binaries")] + use std::{ collections::HashMap, collections::HashSet, diff --git a/consensus/src/rpc/connection.rs b/consensus/src/rpc/connection.rs index d310def7..e0460c9a 100644 --- a/consensus/src/rpc/connection.rs +++ b/consensus/src/rpc/connection.rs @@ -35,7 +35,7 @@ use crate::{ OutputOnChain, }; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); -const OUTPUTS_TIMEOUT: Duration = Duration::from_secs(20); +const OUTPUTS_TIMEOUT: Duration = Duration::from_secs(50); pub struct RpcConnectionSvc { pub(crate) address: String, @@ -209,26 +209,34 @@ impl RpcConnection { ) .await?; - rayon_spawn_async(|| { + let address = self.address.clone(); + rayon_spawn_async(move || { let blocks: Response = monero_epee_bin_serde::from_bytes(res)?; blocks .blocks .into_par_iter() .map(|b| { - Ok(( - Block::read(&mut b.block.as_slice())?, - match b.txs { - TransactionBlobs::Pruned(_) => { - return Err("node sent pruned txs!".into()) - } - TransactionBlobs::Normal(txs) => txs - .into_par_iter() - .map(|tx| Transaction::read(&mut tx.as_slice())) - .collect::>()?, - TransactionBlobs::None => vec![], - }, - )) + let block = Block::read(&mut b.block.as_slice())?; + + let txs = match b.txs { + TransactionBlobs::Pruned(_) => return Err("node sent pruned txs!".into()), + TransactionBlobs::Normal(txs) => txs + .into_par_iter() + .map(|tx| Transaction::read(&mut tx.as_slice())) + .collect::>()?, + TransactionBlobs::None => vec![], + }; + + assert_eq!( + block.txs.len(), + txs.len(), + "node: {}, height: {}, node is pruned, which is not supported!", + address, + block.number(), + ); + + Ok((block, txs)) }) .collect::>() })