integrate RandomX, plus some other misc changes.

This commit is contained in:
Boog900 2024-01-05 22:36:47 +00:00
parent 5d6fb3f6b9
commit 40e64cc9c3
No known key found for this signature in database
GPG key ID: 5401367FB7302004
19 changed files with 601 additions and 155 deletions

19
Cargo.lock generated
View file

@ -527,7 +527,7 @@ dependencies = [
[[package]]
name = "dalek-ff-group"
version = "0.4.1"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"crypto-bigint",
"curve25519-dalek",
@ -588,7 +588,7 @@ dependencies = [
[[package]]
name = "dleq"
version = "0.4.1"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"digest",
"ff",
@ -678,7 +678,7 @@ dependencies = [
[[package]]
name = "flexible-transcript"
version = "0.3.2"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"blake2",
"digest",
@ -1175,7 +1175,7 @@ dependencies = [
[[package]]
name = "monero-generators"
version = "0.4.0"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"curve25519-dalek",
"dalek-ff-group",
@ -1207,7 +1207,7 @@ dependencies = [
[[package]]
name = "monero-serai"
version = "0.1.4-alpha"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"async-lock",
"async-trait",
@ -1253,7 +1253,7 @@ dependencies = [
[[package]]
name = "multiexp"
version = "0.4.0"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"ff",
"group",
@ -1555,8 +1555,7 @@ dependencies = [
[[package]]
name = "randomx-rs"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14fb999f322669968fd0e80aeca5cb91e7a817a94ebf2b0fcd345a4a7c695203"
source = "git+https://github.com/Cuprate/randomx-rs.git#6496a61208852a020575dafc160080cf50bda67f"
dependencies = [
"bitflags 1.3.2",
"libc",
@ -1838,7 +1837,7 @@ dependencies = [
[[package]]
name = "simple-request"
version = "0.1.0"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"hyper",
"hyper-rustls",
@ -1895,7 +1894,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "std-shims"
version = "0.1.1"
source = "git+https://github.com/Cuprate/serai.git?rev=77edd00#77edd007255faf256db9026850b1a31201ede22f"
source = "git+https://github.com/Cuprate/serai.git?rev=a59966b#a59966b736ca988c13bd6eb33a9f2204bdf747fb"
dependencies = [
"hashbrown 0.14.3",
"spin",

View file

@ -35,19 +35,19 @@ opt-level = 3
[workspace.dependencies]
async-trait = { version = "0.1.74" }
borsh = { version = "1.2.1" }
borsh = { version = "1.2.1", features = ["derive"] }
bytes = { version = "1.5.0" }
clap = { version = "4.4.7" }
chrono = { version = "0.4.31" }
crypto-bigint = { version = "0.5.3" }
curve25519-dalek = { version = "4.1.1" }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "77edd00" }
dalek-ff-group = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" }
dirs = { version = "5.0.1" }
futures = { version = "0.3.29" }
hex = { version = "0.4.3" }
monero-epee-bin-serde = { git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a" }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "77edd00" }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "77edd00" }
monero-serai = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" }
multiexp = { git = "https://github.com/Cuprate/serai.git", rev = "a59966b" }
randomx-rs = { version = "1.2.1" }
rand = { version = "0.8.5" }
rayon = { version = "1.8.0" }

View file

@ -37,7 +37,7 @@ futures = "0.3"
crypto-bigint = "0.5"
curve25519-dalek = "4"
randomx-rs = "1"
randomx-rs = {git = "https://github.com/Cuprate/randomx-rs.git"}
monero-serai = { workspace = true }
multiexp = { workspace = true }
dalek-ff-group = { workspace = true }

View file

@ -12,6 +12,11 @@ use crate::{
const BLOCK_SIZE_SANITY_LEEWAY: usize = 100;
const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2;
const BLOCK_202612_POW_HASH: [u8; 32] =
hex_literal::hex!("84f64766475d51837ac9efbef1926486e58563c95a19fef4aec3254f03000000");
const RX_SEEDHASH_EPOCH_BLOCKS: u64 = 2048;
const RX_SEEDHASH_EPOCH_LAG: u64 = 64;
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum BlockError {
@ -31,15 +36,33 @@ pub enum BlockError {
MinerTxError(#[from] MinerTxError),
}
pub trait RandomX {
type Error;
fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error>;
}
pub fn is_randomx_seed_height(height: u64) -> bool {
height % RX_SEEDHASH_EPOCH_BLOCKS == 0
}
pub fn randomx_seed_height(height: u64) -> u64 {
if height <= RX_SEEDHASH_EPOCH_BLOCKS + RX_SEEDHASH_EPOCH_LAG {
0
} else {
(height - RX_SEEDHASH_EPOCH_LAG - 1) & !(RX_SEEDHASH_EPOCH_BLOCKS - 1)
}
}
/// Calculates the POW hash of this block.
pub fn calculate_pow_hash(buf: &[u8], height: u64, hf: &HardFork) -> Result<[u8; 32], BlockError> {
pub fn calculate_pow_hash<R: RandomX>(
randomx_vm: &R,
buf: &[u8],
height: u64,
hf: &HardFork,
) -> Result<[u8; 32], BlockError> {
if height == 202612 {
return Ok(
hex::decode("84f64766475d51837ac9efbef1926486e58563c95a19fef4aec3254f03000000")
.unwrap()
.try_into()
.unwrap(),
);
return Ok(BLOCK_202612_POW_HASH);
}
Ok(if hf < &HardFork::V7 {
@ -51,7 +74,9 @@ pub fn calculate_pow_hash(buf: &[u8], height: u64, hf: &HardFork) -> Result<[u8;
} else if hf < &HardFork::V12 {
cryptonight_hash_r(buf, height)
} else {
todo!("RandomX")
randomx_vm
.calculate_hash(buf)
.map_err(|_| BlockError::POWInvalid)?
})
}
@ -64,6 +89,11 @@ pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> Result<(), BlockErr
let difficulty = U256::from(difficulty);
if int_hash.checked_mul(difficulty).is_none() {
tracing::debug!(
"Invalid POW: {}, difficulty: {}",
hex::encode(hash),
difficulty
);
Err(BlockError::POWInvalid)
} else {
Ok(())

View file

@ -27,8 +27,8 @@ pub fn decomposed_amounts() -> &'static [u64; 172] {
10000000000000000, 20000000000000000, 30000000000000000, 40000000000000000, 50000000000000000, 60000000000000000, 70000000000000000, 80000000000000000, 90000000000000000,
100000000000000000, 200000000000000000, 300000000000000000, 400000000000000000, 500000000000000000, 600000000000000000, 700000000000000000, 800000000000000000, 900000000000000000,
1000000000000000000, 2000000000000000000, 3000000000000000000, 4000000000000000000, 5000000000000000000, 6000000000000000000, 7000000000000000000, 8000000000000000000, 9000000000000000000,
10000000000000000000]
10000000000000000000
]
})
}
@ -55,7 +55,7 @@ mod tests {
}
#[test]
fn decomposed_amounts_return_not_decomposed() {
fn non_decomposed_amounts_return_not_decomposed() {
assert!(!is_decomposed_amount(&21));
assert!(!is_decomposed_amount(&345431));
assert!(!is_decomposed_amount(&20000001));

View file

@ -8,7 +8,7 @@ use monero_serai::{
use cuprate_common::Network;
fn genesis_nonce(network: &Network) -> u32 {
const fn genesis_nonce(network: &Network) -> u32 {
match network {
Network::Mainnet => 10000,
Network::Testnet => 10001,

View file

@ -81,6 +81,48 @@ impl HFsInfo {
HFInfo::new(2689608, 0),
])
}
pub const fn test_net() -> HFsInfo {
Self([
HFInfo::new(0, 0),
HFInfo::new(624634, 0),
HFInfo::new(800500, 0),
HFInfo::new(801219, 0),
HFInfo::new(802660, 0),
HFInfo::new(971400, 0),
HFInfo::new(1057027, 0),
HFInfo::new(1057058, 0),
HFInfo::new(1057778, 0),
HFInfo::new(1154318, 0),
HFInfo::new(1155038, 0),
HFInfo::new(1308737, 0),
HFInfo::new(1543939, 0),
HFInfo::new(1544659, 0),
HFInfo::new(1982800, 0),
HFInfo::new(1983520, 0),
])
}
pub const fn stage_net() -> HFsInfo {
Self([
HFInfo::new(0, 0),
HFInfo::new(32000, 0),
HFInfo::new(33000, 0),
HFInfo::new(34000, 0),
HFInfo::new(35000, 0),
HFInfo::new(36000, 0),
HFInfo::new(37000, 0),
HFInfo::new(176456, 0),
HFInfo::new(177176, 0),
HFInfo::new(269000, 0),
HFInfo::new(269720, 0),
HFInfo::new(454721, 0),
HFInfo::new(675405, 0),
HFInfo::new(676125, 0),
HFInfo::new(1151000, 0),
HFInfo::new(1151720, 0),
])
}
}
/// An identifier for every hard-fork Monero has had.
@ -177,7 +219,7 @@ impl HardFork {
if self != version {
Err(HardForkError::VersionIncorrect)?;
}
if self < vote {
if self > vote {
Err(HardForkError::VoteTooLow)?;
}

View file

@ -470,7 +470,7 @@ fn check_tx_version(
// TODO: Doc is wrong here
let min = min_tx_version(hf);
if version < &min && decoy_info.not_mixable != 0 {
if version < &min && decoy_info.not_mixable == 0 {
return Err(TransactionError::TransactionVersionInvalid);
}
} else {

View file

@ -1,13 +1,16 @@
#![cfg(feature = "binaries")]
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::time::Duration;
use std::{ops::Range, path::PathBuf, sync::Arc};
use clap::Parser;
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
SinkExt, StreamExt, TryFutureExt,
};
use monero_serai::block::Block;
use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tokio::sync::RwLock;
use tower::{Service, ServiceExt};
@ -16,19 +19,23 @@ use tracing::level_filters::LevelFilter;
use cuprate_common::Network;
use cuprate_consensus::{
block::PrePreparedBlockExPOW,
context::{
BlockChainContextRequest, BlockChainContextResponse, ContextConfig,
UpdateBlockchainCacheData,
},
initialize_blockchain_context, initialize_verifier,
randomx::RandomXVM,
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
Database, DatabaseRequest, DatabaseResponse, PrePreparedBlock, VerifiedBlockInformation,
VerifyBlockRequest, VerifyBlockResponse,
};
use monero_consensus::{blocks::randomx_seed_height, HardFork};
mod tx_pool;
const MAX_BLOCKS_IN_RANGE: u64 = 500;
const MAX_BLOCKS_IN_RANGE: u64 = 200;
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 1000;
/// Calls for a batch of blocks, returning the response and the time it took.
@ -92,19 +99,19 @@ where
D::Future: Send + 'static,
{
let mut next_fut = tokio::spawn(call_batch(
start_height..(start_height + (MAX_BLOCKS_IN_RANGE * 3)).min(chain_height),
start_height..(start_height + (MAX_BLOCKS_IN_RANGE * 4)).min(chain_height),
database.clone(),
));
for next_batch_start in (start_height..chain_height)
.step_by((MAX_BLOCKS_IN_RANGE * 3) as usize)
.step_by((MAX_BLOCKS_IN_RANGE * 4) as usize)
.skip(1)
{
// Call the next batch while we handle this batch.
let current_fut = std::mem::replace(
&mut next_fut,
tokio::spawn(call_batch(
next_batch_start..(next_batch_start + (MAX_BLOCKS_IN_RANGE * 3)).min(chain_height),
next_batch_start..(next_batch_start + (MAX_BLOCKS_IN_RANGE * 4)).min(chain_height),
database.clone(),
)),
);
@ -114,18 +121,41 @@ where
};
tracing::info!(
"Retrived batch: {:?}, chain height: {}",
(next_batch_start - (MAX_BLOCKS_IN_RANGE * 3))..(next_batch_start),
"Got batch: {:?}, chain height: {}",
(next_batch_start - (MAX_BLOCKS_IN_RANGE * 4))..(next_batch_start),
chain_height
);
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
let hf = |block: &Block| HardFork::from_version(block.header.major_version).unwrap();
let txs_hf = if blocks.first().map(hf) == blocks.last().map(hf) {
vec![(
txs.into_iter().flatten().collect::<Vec<_>>(),
blocks.first().map(hf).unwrap(),
)]
} else {
let mut txs_hfs: Vec<(Vec<Transaction>, HardFork)> = Vec::new();
let mut last_hf = blocks.first().map(hf).unwrap();
txs_hfs.push((vec![], last_hf));
for (mut txs, current_hf) in txs.into_iter().zip(blocks.iter().map(hf)) {
if current_hf == last_hf {
assert_eq!(txs_hfs.last_mut().unwrap().1, current_hf);
txs_hfs.last_mut().unwrap().0.append(&mut txs);
} else {
txs_hfs.push((txs, current_hf));
last_hf = current_hf;
}
}
txs_hfs
};
let (tx, rx) = oneshot::channel();
new_tx_chan
.send((txs.into_iter().flatten().collect(), tx))
.await?;
rx.await??;
new_tx_chan.send((txs_hf, tx)).await?;
rx.await.unwrap().unwrap();
block_chan.send(blocks).await?;
}
@ -138,6 +168,7 @@ async fn scan_chain<D>(
save_file: PathBuf,
_rpc_config: Arc<std::sync::RwLock<RpcConfig>>,
database: D,
net: Network,
) -> Result<(), tower::BoxError>
where
D: Database + Clone + Send + Sync + 'static,
@ -146,11 +177,15 @@ where
tracing::info!("Beginning chain scan");
// TODO: when we implement all rules use the RPCs chain height, for now we don't check v2 txs.
let chain_height = 3_000_000;
let chain_height = 3_152_725;
tracing::info!("scanning to chain height: {}", chain_height);
let config = ContextConfig::main_net();
let config = match net {
Network::Mainnet => ContextConfig::main_net(),
Network::Stagenet => ContextConfig::stage_net(),
Network::Testnet => ContextConfig::test_net(),
};
let mut ctx_svc = initialize_blockchain_context(config, database.clone()).await?;
@ -173,15 +208,66 @@ where
call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await
});
let BlockChainContextResponse::Context(ctx) = ctx_svc
.ready()
.await?
.call(BlockChainContextRequest::Get)
.await?
else {
panic!("ctx svc sent wrong response!");
};
let mut rx_seed_cache = ctx.unchecked_blockchain_context().rx_seed_cache.clone();
let mut randomx_vms: Option<HashMap<u64, RandomXVM>> = Some(HashMap::new());
tokio::spawn(async move {
while let Some(blocks) = incoming_blocks.next().await {
let blocks = rayon_spawn_async(|| {
while let Some(mut blocks) = incoming_blocks.next().await {
let unwrapped_rx_vms = randomx_vms.as_mut().unwrap();
let blocks = rayon_spawn_async(move || {
blocks
.into_par_iter()
.map(|block| PrePreparedBlock::new(block).unwrap())
.into_iter()
.map(move |block| PrePreparedBlockExPOW::new(block).unwrap())
.collect::<Vec<_>>()
})
.await;
let seeds_needed = blocks
.iter()
.map(|block| {
rx_seed_cache.new_block(block.block.number() as u64, &block.block_hash);
randomx_seed_height(block.block.number() as u64)
})
.collect::<HashSet<_>>();
unwrapped_rx_vms.retain(|seed_height, _| seeds_needed.contains(seed_height));
for seed_height in seeds_needed {
if !unwrapped_rx_vms.contains_key(&seed_height) {
unwrapped_rx_vms.insert(
seed_height,
RandomXVM::new(rx_seed_cache.get_seeds_hash(seed_height)).unwrap(),
);
}
}
let arc_rx_vms = Arc::new(randomx_vms.take().unwrap());
let cloned_arc_rx_vms = arc_rx_vms.clone();
let blocks = rayon_spawn_async(move || {
blocks
.into_iter()
.map(move |block| {
let rx_vm = arc_rx_vms
.get(&randomx_seed_height(block.block.number() as u64))
.unwrap();
PrePreparedBlock::new(block, rx_vm).unwrap()
})
.collect::<Vec<_>>()
})
.await;
randomx_vms = Some(Arc::into_inner(cloned_arc_rx_vms).unwrap());
prepped_blocks_tx.send(blocks).await.unwrap();
}
});
@ -255,6 +341,8 @@ async fn main() {
let network = match args.network.as_str() {
"mainnet" => Network::Mainnet,
"testnet" => Network::Testnet,
"stagenet" => Network::Stagenet,
_ => panic!("Invalid network, scanner currently only supports mainnet"),
};
@ -262,36 +350,81 @@ async fn main() {
Some(dir) => dir,
None => dirs::cache_dir().unwrap(),
};
file_for_cache.push("cuprate_rpc_scanning_cache.bin");
match network {
Network::Mainnet => file_for_cache.push("cuprate_rpc_scanning_cache.bin"),
Network::Stagenet => file_for_cache.push("cuprate_rpc_scanning_cache_stage_net.bin"),
Network::Testnet => file_for_cache.push("cuprate_rpc_scanning_cache_test_net.bin"),
}
let mut urls = if args.dont_use_default_nodes {
vec![]
} else {
vec![
match network {
Network::Mainnet => vec![
"http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(),
// "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://38.105.209.54:18089".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
//
"http://xmr-node.cakewallet.com:18081".to_string(),
"https://node.sethforprivacy.com".to_string(),
"http://nodex.monerujo.io:18081".to_string(),
// "http://nodex.monerujo.io:18081".to_string(),
"http://nodes.hashvault.pro:18081".to_string(),
"http://node.c3pool.com:18081".to_string(),
"http://node.trocador.app:18089".to_string(),
"http://xmr.lukas.services:18089".to_string(),
"http://xmr-node-eu.cakewallet.com:18081".to_string(),
"http://38.105.209.54:18089".to_string(),
"http://68.118.241.70:18089".to_string(),
"http://145.239.97.211:18089".to_string(),
]
],
Network::Testnet => vec![
"http://testnet.xmr-tw.org:28081".to_string(),
"http://node3.monerodevs.org:28089".to_string(),
"http://node.monerodevs.org:28089".to_string(),
"http://125.229.105.12:28081".to_string(),
"http://node2.monerodevs.org:28089".to_string(),
"https://testnet.xmr.ditatompel.com".to_string(),
"http://singapore.node.xmr.pm:28081".to_string(),
//
"http://testnet.xmr-tw.org:28081".to_string(),
"http://node3.monerodevs.org:28089".to_string(),
"http://node.monerodevs.org:28089".to_string(),
"http://125.229.105.12:28081".to_string(),
"http://node2.monerodevs.org:28089".to_string(),
"https://testnet.xmr.ditatompel.com".to_string(),
"http://singapore.node.xmr.pm:28081".to_string(),
],
Network::Stagenet => vec![
"http://125.229.105.12:38081".to_string(),
"http://90.189.159.23:38089".to_string(),
"http://stagenet.xmr-tw.org:38081".to_string(),
"http://node.monerodevs.org:38089".to_string(),
"http://stagenet.community.rino.io:38081".to_string(),
"http://node2.monerodevs.org:38089".to_string(),
"http://node3.monerodevs.org:38089".to_string(),
"http://singapore.node.xmr.pm:38081".to_string(),
"https://stagenet.xmr.ditatompel.com".to_string(),
"http://3.10.182.182:38081".to_string(),
//
"http://125.229.105.12:38081".to_string(),
"http://90.189.159.23:38089".to_string(),
"http://stagenet.xmr-tw.org:38081".to_string(),
"http://node.monerodevs.org:38089".to_string(),
"http://stagenet.community.rino.io:38081".to_string(),
"http://node2.monerodevs.org:38089".to_string(),
"http://node3.monerodevs.org:38089".to_string(),
"http://singapore.node.xmr.pm:38081".to_string(),
"https://stagenet.xmr.ditatompel.com".to_string(),
"http://3.10.182.182:38081".to_string(),
],
}
};
urls.extend(args.rpc_nodes.into_iter());
@ -325,7 +458,7 @@ async fn main() {
let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone());
scan_chain(cache, file_for_cache, rpc_config, rpc)
scan_chain(cache, file_for_cache, rpc_config, rpc, network)
.await
.unwrap();
}

View file

@ -23,6 +23,7 @@ use cuprate_consensus::{
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
};
use monero_consensus::HardFork;
#[derive(Clone)]
pub struct TxPoolHandle {
@ -59,12 +60,12 @@ impl tower::Service<TxPoolRequest> for TxPoolHandle {
}
pub type NewTxChanRec = mpsc::Receiver<(
Vec<Transaction>,
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
pub type NewTxChanSen = mpsc::Sender<(
Vec<Transaction>,
Vec<(Vec<Transaction>, HardFork)>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>;
@ -94,16 +95,7 @@ where
pub async fn spawn(
tx_verifier_chan: oneshot::Receiver<TxV>,
mut ctx_svc: Ctx,
) -> Result<
(
TxPoolHandle,
mpsc::Sender<(
Vec<Transaction>,
oneshot::Sender<Result<(), tower::BoxError>>,
)>,
),
tower::BoxError,
> {
) -> Result<(TxPoolHandle, NewTxChanSen), tower::BoxError> {
let BlockChainContextResponse::Context(current_ctx) = ctx_svc
.ready()
.await?
@ -155,8 +147,8 @@ where
}
}
fn handle_txs_req(
&self,
async fn handle_txs_req(
&mut self,
req: TxPoolRequest,
tx: oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
) {
@ -164,10 +156,9 @@ where
let mut res = Vec::with_capacity(txs_to_get.len());
let mut txs = self.txs.lock().unwrap();
for tx_hash in txs_to_get {
let Some(tx) = txs.remove(&tx_hash) else {
let Some(tx) = self.txs.lock().unwrap().remove(&tx_hash) else {
tracing::debug!("tx not in pool: {}", hex::encode(tx_hash));
let _ = tx.send(Err(TxNotInPool));
return;
};
@ -179,7 +170,7 @@ where
async fn handle_new_txs(
&mut self,
new_txs: Vec<Transaction>,
new_txs: Vec<(Vec<Transaction>, HardFork)>,
res_chan: oneshot::Sender<Result<(), tower::BoxError>>,
) -> Result<(), tower::BoxError> {
if self.tx_verifier.is_none() {
@ -192,14 +183,15 @@ where
let tx_pool = self.txs.clone();
tokio::spawn(async move {
for (txs, hf) in new_txs {
// We only batch the setup a real tx pool would also call `VerifyTxRequest::Block`
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
.ready()
.await
.unwrap()
.call(VerifyTxRequest::BatchSetup {
txs: new_txs,
hf: current_ctx.current_hf,
txs,
hf,
re_org_token: current_ctx.re_org_token.clone(),
})
.await
@ -211,7 +203,11 @@ where
let mut locked_pool = tx_pool.lock().unwrap();
for tx in txs {
locked_pool.insert(tx.tx_hash, tx);
let tx_hash = tx.tx_hash;
if locked_pool.insert(tx_hash, tx).is_some() {
panic!("added same tx to pool twice: {}", hex::encode(tx_hash))
}
}
}
res_chan.send(Ok(())).unwrap();
});
@ -227,13 +223,8 @@ where
mut new_tx_channel: NewTxChanRec,
) {
loop {
futures::select! {
pool_req = tx_pool_handle.next() => {
let Some((req, tx)) = pool_req else {
todo!("Shutdown txpool")
};
self.handle_txs_req(req, tx);
}
tokio::select! {
biased;
new_txs = new_tx_channel.next() => {
let Some(new_txs) = new_txs else {
todo!("Shutdown txpool")
@ -241,6 +232,12 @@ where
self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap()
}
pool_req = tx_pool_handle.next() => {
let Some((req, tx)) = pool_req else {
todo!("Shutdown txpool")
};
self.handle_txs_req(req, tx).await;
}
}
}
}

View file

@ -10,7 +10,7 @@ use monero_serai::block::Block;
use monero_serai::transaction::Input;
use tower::{Service, ServiceExt};
use monero_consensus::blocks::BlockError;
use monero_consensus::blocks::{BlockError, RandomX};
use monero_consensus::miner_tx::MinerTxError;
use monero_consensus::{
blocks::{calculate_pow_hash, check_block, check_block_pow},
@ -24,6 +24,37 @@ use crate::{
ExtendedConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
};
#[derive(Debug)]
pub struct PrePreparedBlockExPOW {
pub block: Block,
pub block_blob: Vec<u8>,
pub hf_vote: HardFork,
pub hf_version: HardFork,
pub block_hash: [u8; 32],
pub miner_tx_weight: usize,
}
impl PrePreparedBlockExPOW {
pub fn new(block: Block) -> Result<PrePreparedBlockExPOW, ConsensusError> {
let (hf_version, hf_vote) =
HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?;
Ok(PrePreparedBlockExPOW {
block_blob: block.serialize(),
hf_vote,
hf_version,
block_hash: block.hash(),
miner_tx_weight: block.miner_tx.weight(),
block,
})
}
}
#[derive(Debug)]
pub struct PrePreparedBlock {
pub block: Block,
@ -39,26 +70,31 @@ pub struct PrePreparedBlock {
}
impl PrePreparedBlock {
pub fn new(block: Block) -> Result<PrePreparedBlock, ConsensusError> {
let (hf_version, hf_vote) =
HardFork::from_block_header(&block.header).map_err(BlockError::HardForkError)?;
let Some(Input::Gen(height)) = block.miner_tx.prefix.inputs.first() else {
pub fn new<R: RandomX>(
block: PrePreparedBlockExPOW,
randomx_vm: &R,
) -> Result<PrePreparedBlock, ConsensusError> {
let Some(Input::Gen(height)) = block.block.miner_tx.prefix.inputs.first() else {
Err(ConsensusError::Block(BlockError::MinerTxError(
MinerTxError::InputNotOfTypeGen,
)))?
};
Ok(PrePreparedBlock {
block_blob: block.serialize(),
hf_vote,
hf_version,
block_blob: block.block_blob,
hf_vote: block.hf_vote,
hf_version: block.hf_version,
block_hash: block.hash(),
pow_hash: calculate_pow_hash(&block.serialize_hashable(), *height, &hf_vote)?,
block_hash: block.block_hash,
pow_hash: calculate_pow_hash(
randomx_vm,
&block.block.serialize_hashable(),
*height,
&block.hf_version,
)?,
miner_tx_weight: block.miner_tx.weight(),
block,
miner_tx_weight: block.block.miner_tx.weight(),
block: block.block,
})
}
}
@ -321,11 +357,14 @@ where
// do POW test last
let chain_height = context.chain_height;
let current_hf = context.current_hf;
let pow_hash =
rayon_spawn_async(move || calculate_pow_hash(&hashing_blob, chain_height, &current_hf))
let pow_hash = todo!();
/*
rayon_spawn_async(move || calculate_pow_hash(, &hashing_blob, chain_height, &current_hf))
.await
.map_err(ConsensusError::Block)?;
*/
check_block_pow(&pow_hash, context.next_difficulty).map_err(ConsensusError::Block)?;
Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation {

View file

@ -28,6 +28,7 @@ mod difficulty;
mod hardforks;
mod weight;
mod rx_seed;
#[cfg(test)]
mod tests;
mod tokens;
@ -53,6 +54,22 @@ impl ContextConfig {
weights_config: BlockWeightsCacheConfig::main_net(),
}
}
pub fn stage_net() -> ContextConfig {
ContextConfig {
hard_fork_cfg: HardForkConfig::stage_net(),
difficulty_cfg: DifficultyCacheConfig::main_net(),
weights_config: BlockWeightsCacheConfig::main_net(),
}
}
pub fn test_net() -> ContextConfig {
ContextConfig {
hard_fork_cfg: HardForkConfig::test_net(),
difficulty_cfg: DifficultyCacheConfig::main_net(),
weights_config: BlockWeightsCacheConfig::main_net(),
}
}
}
pub async fn initialize_blockchain_context<D>(
@ -117,6 +134,11 @@ where
hardforks::HardForkState::init_from_chain_height(chain_height, hard_fork_cfg, db).await
});
let db = database.clone();
let rx_seed_handle = tokio::spawn(async move {
rx_seed::RandomXSeed::init_from_chain_height(chain_height, db).await
});
let context_svc = BlockChainContextService {
internal_blockchain_context: Arc::new(
InternalBlockChainContext {
@ -124,6 +146,7 @@ where
current_reorg_token: ReOrgToken::new(),
difficulty_cache: difficulty_cache_handle.await.unwrap()?,
weight_cache: weight_cache_handle.await.unwrap()?,
rx_seed_cache: rx_seed_handle.await.unwrap()?,
hardfork_state: hardfork_state_handle.await.unwrap()?,
chain_height,
already_generated_coins,
@ -145,6 +168,7 @@ pub struct RawBlockChainContext {
pub cumulative_difficulty: u128,
/// A token which is used to signal if a reorg has happened since creating the token.
pub re_org_token: ReOrgToken,
pub rx_seed_cache: rx_seed::RandomXSeed,
pub context_to_verify_block: ContextToVerifyBlock,
/// The median long term block weight.
median_long_term_weight: usize,
@ -254,8 +278,6 @@ pub enum BlockChainContextResponse {
Context(BlockChainContext),
Ok,
}
#[derive(Clone)]
struct InternalBlockChainContext {
/// A token used to invalidate previous contexts when a new
/// block is added to the chain.
@ -265,6 +287,7 @@ struct InternalBlockChainContext {
difficulty_cache: difficulty::DifficultyCache,
weight_cache: weight::BlockWeightsCache,
rx_seed_cache: rx_seed::RandomXSeed,
hardfork_state: hardforks::HardForkState,
chain_height: u64,
@ -324,6 +347,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
current_reorg_token,
difficulty_cache,
weight_cache,
rx_seed_cache,
hardfork_state,
chain_height,
top_block_hash,
@ -351,6 +375,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
next_difficulty: difficulty_cache.next_difficulty(&current_hf),
already_generated_coins: *already_generated_coins,
},
rx_seed_cache: rx_seed_cache.clone(),
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
median_long_term_weight: weight_cache.median_long_term_weight(),
top_block_timestamp: difficulty_cache.top_block_timestamp(),
@ -368,6 +393,8 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
hardfork_state.new_block(new.vote, new.height);
rx_seed_cache.new_block(new.height, &new.new_top_hash);
*chain_height = new.height + 1;
*top_block_hash = new.new_top_hash;
*already_generated_coins =

View file

@ -30,6 +30,20 @@ impl HardForkConfig {
window: DEFAULT_WINDOW_SIZE,
}
}
pub const fn stage_net() -> HardForkConfig {
Self {
info: HFsInfo::stage_net(),
window: DEFAULT_WINDOW_SIZE,
}
}
pub const fn test_net() -> HardForkConfig {
Self {
info: HFsInfo::test_net(),
window: DEFAULT_WINDOW_SIZE,
}
}
}
/// A struct that keeps track of the current hard-fork and current votes.

View file

@ -0,0 +1,115 @@
use std::collections::VecDeque;
use futures::{stream::FuturesOrdered, StreamExt};
use tower::ServiceExt;
use monero_consensus::blocks::{is_randomx_seed_height, randomx_seed_height};
use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError};
const RX_SEEDS_CACHED: usize = 3;
#[derive(Clone, Debug)]
pub struct RandomXSeed {
seeds: VecDeque<(u64, [u8; 32])>,
}
impl RandomXSeed {
pub async fn init_from_chain_height<D: Database + Clone>(
chain_height: u64,
database: D,
) -> Result<Self, ExtendedConsensusError> {
let seed_heights = get_last_rx_seed_heights(chain_height - 1, RX_SEEDS_CACHED);
let seed_hashes = get_block_hashes(seed_heights.clone(), database).await?;
Ok(RandomXSeed {
seeds: seed_heights.into_iter().zip(seed_hashes).collect(),
})
}
pub fn get_seeds_hash(&self, seed_height: u64) -> [u8; 32] {
for (height, seed) in self.seeds.iter() {
if seed_height == *height {
return *seed;
}
}
tracing::error!(
"Current seeds: {:?}, asked for: {}",
self.seeds,
seed_height
);
panic!("RX seed cache was not updated or was asked for a block too old.")
}
pub fn get_rx_seed(&self, height: u64) -> [u8; 32] {
let seed_height = randomx_seed_height(height);
tracing::warn!(
"Current seeds: {:?}, asked for: {}",
self.seeds,
seed_height
);
self.get_seeds_hash(seed_height)
}
pub fn new_block(&mut self, height: u64, hash: &[u8; 32]) {
if is_randomx_seed_height(height) {
for (got_height, _) in self.seeds.iter() {
if *got_height == height {
return;
}
}
self.seeds.pop_back();
self.seeds.push_front((height, *hash));
}
}
}
fn get_last_rx_seed_heights(mut last_height: u64, mut amount: usize) -> Vec<u64> {
let mut seeds = Vec::with_capacity(amount);
if is_randomx_seed_height(last_height) {
seeds.push(last_height);
amount -= 1;
}
for _ in 0..amount {
if last_height == 0 {
return seeds;
}
let seed_height = randomx_seed_height(last_height);
seeds.push(seed_height);
last_height = seed_height
}
seeds
}
async fn get_block_hashes<D: Database + Clone>(
heights: Vec<u64>,
database: D,
) -> Result<Vec<[u8; 32]>, ExtendedConsensusError> {
let mut fut = FuturesOrdered::new();
for height in heights {
let db = database.clone();
fut.push_back(async move {
let DatabaseResponse::BlockHash(hash) = db
.clone()
.oneshot(DatabaseRequest::BlockHash(height))
.await?
else {
panic!("Database sent incorrect response!");
};
Result::<_, ExtendedConsensusError>::Ok(hash)
});
}
let mut res = Vec::new();
while let Some(hash) = fut.next().await {
res.push(hash?);
}
Ok(res)
}

View file

@ -168,18 +168,19 @@ impl BlockWeightsCache {
.expect("long term window can't be negative");
match self.cached_sorted_long_term_weights.binary_search(&val) {
Ok(idx) | Err(idx) => self.cached_sorted_long_term_weights.remove(idx),
Ok(idx) => self.cached_sorted_long_term_weights.remove(idx),
Err(_) => panic!("Long term cache has incorrect values!"),
};
}
self.short_term_block_weights.push_back(block_weight);
match self
.cached_sorted_short_term_weights
.binary_search(&long_term_weight)
.binary_search(&block_weight)
{
Ok(idx) | Err(idx) => self
.cached_sorted_short_term_weights
.insert(idx, long_term_weight),
.insert(idx, block_weight),
}
if u64::try_from(self.short_term_block_weights.len()).unwrap()
@ -191,7 +192,8 @@ impl BlockWeightsCache {
.expect("short term window can't be negative");
match self.cached_sorted_short_term_weights.binary_search(&val) {
Ok(idx) | Err(idx) => self.cached_sorted_short_term_weights.remove(idx),
Ok(idx) => self.cached_sorted_short_term_weights.remove(idx),
Err(_) => panic!("Short term cache has incorrect values"),
};
}

View file

@ -11,6 +11,7 @@ mod batch_verifier;
pub mod block;
pub mod context;
mod helper;
pub mod randomx;
#[cfg(feature = "binaries")]
pub mod rpc;
#[cfg(test)]

37
consensus/src/randomx.rs Normal file
View file

@ -0,0 +1,37 @@
use randomx_rs::{RandomXCache, RandomXError, RandomXFlag, RandomXVM as VMInner};
use thread_local::ThreadLocal;
use monero_consensus::blocks::RandomX;
pub struct RandomXVM {
vms: ThreadLocal<VMInner>,
cache: RandomXCache,
flags: RandomXFlag,
seed: [u8; 32],
}
impl RandomXVM {
pub fn new(seed: [u8; 32]) -> Result<Self, RandomXError> {
let flags = RandomXFlag::get_recommended_flags();
let cache = RandomXCache::new(flags, &seed)?;
Ok(RandomXVM {
vms: ThreadLocal::new(),
cache,
flags,
seed,
})
}
}
impl RandomX for RandomXVM {
type Error = RandomXError;
fn calculate_hash(&self, buf: &[u8]) -> Result<[u8; 32], Self::Error> {
self.vms
.get_or_try(|| VMInner::new(self.flags, Some(self.cache.clone()), None))?
.calculate_hash(buf)
.map(|out| out.try_into().unwrap())
}
}

View file

@ -1,3 +1,5 @@
#![cfg(feature = "binaries")]
use std::{
collections::HashMap,
collections::HashSet,

View file

@ -35,7 +35,7 @@ use crate::{
OutputOnChain,
};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
const OUTPUTS_TIMEOUT: Duration = Duration::from_secs(20);
const OUTPUTS_TIMEOUT: Duration = Duration::from_secs(50);
pub struct RpcConnectionSvc {
pub(crate) address: String,
@ -209,26 +209,34 @@ impl RpcConnection {
)
.await?;
rayon_spawn_async(|| {
let address = self.address.clone();
rayon_spawn_async(move || {
let blocks: Response = monero_epee_bin_serde::from_bytes(res)?;
blocks
.blocks
.into_par_iter()
.map(|b| {
Ok((
Block::read(&mut b.block.as_slice())?,
match b.txs {
TransactionBlobs::Pruned(_) => {
return Err("node sent pruned txs!".into())
}
let block = Block::read(&mut b.block.as_slice())?;
let txs = match b.txs {
TransactionBlobs::Pruned(_) => return Err("node sent pruned txs!".into()),
TransactionBlobs::Normal(txs) => txs
.into_par_iter()
.map(|tx| Transaction::read(&mut tx.as_slice()))
.collect::<Result<_, _>>()?,
TransactionBlobs::None => vec![],
},
))
};
assert_eq!(
block.txs.len(),
txs.len(),
"node: {}, height: {}, node is pruned, which is not supported!",
address,
block.number(),
);
Ok((block, txs))
})
.collect::<Result<_, tower::BoxError>>()
})