add block/ tx verifier svc

Boog900 2023-10-23 19:14:40 +01:00
parent 50f9458528
commit eeefe49d63
No known key found for this signature in database
GPG key ID: 5401367FB7302004
24 changed files with 1823 additions and 1104 deletions

View file

@ -10,7 +10,8 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus"
[features]
default = ["binaries"]
binaries = [
"dep:tokio",
"tokio/rt-multi-thread",
"tokio/macros",
"dep:tracing-subscriber",
"tower/retry",
"tower/balance",
@ -28,22 +29,28 @@ tower = {version = "0.4", features = ["util"]}
tracing = "0.1"
futures = "0.3"
sha3 = "0.10"
crypto-bigint = "0.5"
curve25519-dalek = "4"
randomx-rs = "1"
monero-serai = {git="https://github.com/Cuprate/serai.git", rev = "39eafae"}
multiexp = {git="https://github.com/Cuprate/serai.git", rev = "39eafae"}
dalek-ff-group = {git="https://github.com/Cuprate/serai.git", rev = "39eafae"}
cuprate-common = {path = "../common"}
cryptonight-cuprate = {path = "../cryptonight"}
rayon = "1"
tokio = "1"
# used in binaries
monero-wire = {path="../net/monero-wire", optional = true}
monero-epee-bin-serde = {git = "https://github.com/monero-rs/monero-epee-bin-serde.git", rev = "e4a585a", optional = true}
serde_json = {version = "1", optional = true}
serde = {version = "1", optional = true, features = ["derive"]}
tokio = { version = "1", features = ["rt-multi-thread", "macros"], optional = true }
tracing-subscriber = {version = "0.3", optional = true}
# here to help cargo pick a version - remove me
syn = "2.0.37"
[profile.dev]
opt-level = 3

View file

@ -1,129 +1,82 @@
#![cfg(feature = "binaries")]
use futures::Sink;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::io::Read;
use std::ops::Range;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tower::ServiceExt;
use rayon::prelude::*;
use tower::{Service, ServiceExt};
use tracing::instrument;
use tracing::level_filters::LevelFilter;
use cuprate_common::Network;
use monero_consensus::hardforks::HardFork;
use monero_consensus::rpc::{init_rpc_load_balancer, RpcConfig};
use monero_consensus::rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig};
use monero_consensus::{
verifier::{Config, Verifier},
Database, DatabaseRequest, DatabaseResponse,
context::{ContextConfig, UpdateBlockchainCacheRequest},
initialize_verifier, Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation,
VerifyBlockRequest,
};
const INITIAL_MAX_BLOCKS_IN_RANGE: u64 = 250;
const INITIAL_MAX_BLOCKS_IN_RANGE: u64 = 1000;
const MAX_BLOCKS_IN_RANGE: u64 = 1000;
const INITIAL_MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250;
/// A cache which can keep chain state while scanning.
///
/// Because we are using an RPC interface to a node, we need to keep track
/// of certain data that the node doesn't hold, like the number of outputs at
/// a certain time.
#[derive(Debug, Clone)]
struct ScanningCache {
network: Network,
numb_outs: HashMap<u64, u64>,
/// The height of the *next* block to scan.
height: u64,
/// Calls the database for a batch of blocks, returning the response and the time it took.
async fn call_batch<D: Database>(
range: Range<u64>,
database: D,
) -> Result<(DatabaseResponse, Duration), tower::BoxError> {
let now = std::time::Instant::now();
Ok((
database
.oneshot(DatabaseRequest::BlockBatchInRange(range))
.await?,
now.elapsed(),
))
}
impl Default for ScanningCache {
fn default() -> Self {
ScanningCache {
network: Default::default(),
numb_outs: Default::default(),
height: 1_000_000,
}
}
}
impl ScanningCache {
fn total_outs(&self) -> u64 {
self.numb_outs.values().sum()
}
fn numb_outs(&self, amount: u64) -> u64 {
*self.numb_outs.get(&amount).unwrap_or(&0)
}
fn add_outs(&mut self, amount: u64, count: u64) {
if let Some(numb_outs) = self.numb_outs.get_mut(&amount) {
*numb_outs += count;
} else {
self.numb_outs.insert(amount, count);
}
}
}
impl Display for ScanningCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let rct_outs = self.numb_outs(0);
let total_outs = self.total_outs();
f.debug_struct("Cache")
.field("next_block", &self.height)
.field("rct_outs", &rct_outs)
.field("total_outs", &total_outs)
.finish()
}
}
async fn scan_chain<D: Database + Clone + Send + 'static>(
cache: ScanningCache,
async fn scan_chain<D>(
cache: Arc<RwLock<ScanningCache>>,
network: Network,
rpc_config: Arc<RwLock<RpcConfig>>,
mut database: D,
) -> Result<(), tower::BoxError>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
tracing::info!("Beginning chain scan, {}", &cache);
tracing::info!("Beginning chain scan");
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response!");
};
let chain_height = 3_000_000;
tracing::info!("scanning to chain height: {}", chain_height);
let config = match network {
Network::Mainnet => Config::main_net(),
_ => todo!(),
};
let config = ContextConfig::main_net();
//let verifier = Verifier::init_at_chain_height(config, cache.height, database.clone()).await?;
let (mut block_verifier, _, mut context_updater) =
initialize_verifier(database.clone(), config).await?;
tracing::info!("Initialised verifier, begging scan");
let batch_size = rpc_config.read().unwrap().block_batch_size();
let start_height = cache.read().unwrap().height;
let mut db = database.clone();
let mut next_fut = tokio::spawn(async move {
let now = std::time::Instant::now();
(
db.ready()
.await
.unwrap()
.call(DatabaseRequest::BlockBatchInRange(
cache.height..(cache.height + batch_size).min(chain_height),
))
.await
.unwrap(),
now.elapsed(),
)
});
tracing::info!(
"Initialised verifier, begging scan from {} to {}",
start_height,
chain_height
);
let mut current_height = cache.height;
let mut next_batch_start_height = cache.height + batch_size;
let mut next_fut = tokio::spawn(call_batch(
start_height..(start_height + batch_size).min(chain_height),
database.clone(),
));
let mut current_height = start_height;
let mut next_batch_start_height = start_height + batch_size;
let mut time_to_verify_last_batch: u128 = 0;
@ -133,29 +86,17 @@ where
let next_batch_size = rpc_config.read().unwrap().block_batch_size();
// Call the next batch while we handle this batch.
let mut db = database.clone();
let current_fut = std::mem::replace(
&mut next_fut,
tokio::spawn(async move {
let now = std::time::Instant::now();
(
db.ready()
.await
.unwrap()
.call(DatabaseRequest::BlockBatchInRange(
next_batch_start_height
..(next_batch_start_height + next_batch_size).min(chain_height),
))
.await
.unwrap(),
now.elapsed(),
)
}),
tokio::spawn(call_batch(
next_batch_start_height
..(next_batch_start_height + next_batch_size).min(chain_height),
database.clone(),
)),
);
let (DatabaseResponse::BlockBatchInRange(blocks), time_to_retrieve_batch) =
current_fut.await?
current_fut.await??
else {
panic!("Database sent incorrect response!");
};
@ -163,44 +104,46 @@ where
let time_to_verify_batch = std::time::Instant::now();
let time_to_retrieve_batch = time_to_retrieve_batch.as_millis();
/*
if time_to_retrieve_batch > time_to_verify_last_batch + 2000
&& batches_till_check_batch_size == 0
{
batches_till_check_batch_size = 3;
if time_to_retrieve_batch > time_to_verify_last_batch + 2000
&& batches_till_check_batch_size == 0
{
batches_till_check_batch_size = 3;
let mut conf = rpc_config.write().unwrap();
tracing::info!(
"Decreasing batch size time to verify last batch: {}, time_to_retrieve_batch: {}",
time_to_verify_last_batch,
time_to_retrieve_batch
);
conf.max_blocks_per_node = (conf.max_blocks_per_node
* time_to_verify_last_batch as u64
/ (time_to_retrieve_batch as u64))
.max(10_u64)
.min(MAX_BLOCKS_IN_RANGE);
tracing::info!("Decreasing batch size to: {}", conf.max_blocks_per_node);
} else if time_to_retrieve_batch + 2000 < time_to_verify_last_batch
&& batches_till_check_batch_size == 0
{
batches_till_check_batch_size = 3;
let mut conf = rpc_config.write().unwrap();
tracing::info!(
"Decreasing batch size time to verify last batch: {}, time_to_retrieve_batch: {}",
time_to_verify_last_batch,
time_to_retrieve_batch
);
conf.max_blocks_per_node = (conf.max_blocks_per_node
* time_to_verify_last_batch as u64
/ (time_to_retrieve_batch as u64))
.max(10_u64)
.min(MAX_BLOCKS_IN_RANGE);
tracing::info!("Decreasing batch size to: {}", conf.max_blocks_per_node);
} else if time_to_retrieve_batch + 2000 < time_to_verify_last_batch
&& batches_till_check_batch_size == 0
{
batches_till_check_batch_size = 3;
let mut conf = rpc_config.write().unwrap();
tracing::info!(
"Increasing batch size time to verify last batch: {}, time_to_retrieve_batch: {}",
time_to_verify_last_batch,
time_to_retrieve_batch
);
conf.max_blocks_per_node = (conf.max_blocks_per_node
* (time_to_verify_last_batch as u64)
/ time_to_retrieve_batch.max(1) as u64)
.max(30_u64)
.min(MAX_BLOCKS_IN_RANGE);
tracing::info!("Increasing batch size to: {}", conf.max_blocks_per_node);
} else {
batches_till_check_batch_size = batches_till_check_batch_size.saturating_sub(1);
}
let mut conf = rpc_config.write().unwrap();
tracing::info!(
"Increasing batch size time to verify last batch: {}, time_to_retrieve_batch: {}",
time_to_verify_last_batch,
time_to_retrieve_batch
);
conf.max_blocks_per_node = (conf.max_blocks_per_node
* (time_to_verify_last_batch as u64)
/ time_to_retrieve_batch.max(1) as u64)
.max(30_u64)
.min(MAX_BLOCKS_IN_RANGE);
tracing::info!("Increasing batch size to: {}", conf.max_blocks_per_node);
} else {
batches_till_check_batch_size = batches_till_check_batch_size.saturating_sub(1);
}
*/
tracing::info!(
"Handling batch: {:?}, chain height: {}",
@ -208,18 +151,34 @@ where
chain_height
);
for (block, txs) in blocks.into_iter() {
let pow_hash = monero_consensus::block::pow::calculate_pow_hash(
&block.serialize_hashable(),
block.number() as u64,
&HardFork::V1,
);
// let block_len = blocks.len();
for (block, txs) in blocks {
let verified_block_info: VerifiedBlockInformation = block_verifier
.ready()
.await?
.call(VerifyBlockRequest::MainChainBatchSetupVerify(block, txs))
.await?;
tracing::info!(
"Verified block: {}, numb txs: {}",
current_height,
txs.len()
cache.write().unwrap().add_new_block_data(
verified_block_info.generated_coins,
&verified_block_info.block.miner_tx,
&verified_block_info.txs,
);
context_updater
.ready()
.await?
.call(UpdateBlockchainCacheRequest {
new_top_hash: verified_block_info.block_hash,
height: verified_block_info.height,
timestamp: verified_block_info.block.header.timestamp,
weight: verified_block_info.weight,
long_term_weight: verified_block_info.long_term_weight,
vote: verified_block_info.hf_vote,
generated_coins: verified_block_info.generated_coins,
})
.await?;
tracing::info!("Verified block: {}", current_height);
current_height += 1;
next_batch_start_height += 1;
@ -237,6 +196,8 @@ async fn main() {
.with_max_level(LevelFilter::INFO)
.init();
let network = Network::Mainnet;
let urls = vec![
"http://xmr-node.cakewallet.com:18081".to_string(),
"http://node.sethforprivacy.com".to_string(),
@ -269,10 +230,26 @@ async fn main() {
);
let rpc_config = Arc::new(RwLock::new(rpc_config));
let rpc = init_rpc_load_balancer(urls, rpc_config.clone());
let cache = Arc::new(RwLock::new(ScanningCache::default()));
let network = Network::Mainnet;
let cache = ScanningCache::default();
let mut cache_write = cache.write().unwrap();
if cache_write.height == 0 {
let genesis = monero_consensus::genesis::generate_genesis_block(&network);
let total_outs = genesis
.miner_tx
.prefix
.outputs
.iter()
.map(|out| out.amount.unwrap_or(0))
.sum::<u64>();
cache_write.add_new_block_data(total_outs, &genesis.miner_tx, &[]);
}
drop(cache_write);
let rpc = init_rpc_load_balancer(urls, cache.clone(), rpc_config.clone());
scan_chain(cache, network, rpc_config, rpc).await.unwrap();
}

View file

@ -1,102 +1,180 @@
use monero_serai::block::Block;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use crate::{hardforks::BlockHFInfo, helper::current_time, ConsensusError};
use futures::FutureExt;
use monero_serai::{block::Block, transaction::Transaction};
use tower::{Service, ServiceExt};
pub mod difficulty;
pub mod pow;
pub mod reward;
pub mod weight;
use crate::{
context::{BlockChainContext, BlockChainContextRequest},
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
ConsensusError, HardFork,
};
pub use difficulty::{DifficultyCache, DifficultyCacheConfig};
pub use pow::{check_block_pow, BlockPOWInfo};
pub use weight::{block_weight, BlockWeightInfo, BlockWeightsCache, BlockWeightsCacheConfig};
mod hash_worker;
mod miner_tx;
const BLOCK_SIZE_SANITY_LEEWAY: usize = 100;
const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2;
#[derive(Debug)]
pub struct VerifiedBlockInformation {
pub block: Block,
pub hf_vote: HardFork,
pub txs: Vec<Arc<TransactionVerificationData>>,
pub block_hash: [u8; 32],
pub pow_hash: [u8; 32],
pub height: u64,
pub generated_coins: u64,
pub weight: usize,
pub long_term_weight: usize,
}
pub struct BlockVerificationData {
hf: BlockHFInfo,
pow: BlockPOWInfo,
current_difficulty: u128,
weights: BlockWeightInfo,
block_blob: Vec<u8>,
pub enum VerifyBlockRequest {
MainChainBatchSetupVerify(Block, Vec<Transaction>),
MainChain(Block, Vec<Arc<TransactionVerificationData>>),
}
pub enum VerifyBlockResponse {
MainChainBatchSetupVerify(),
}
// TODO: it is probably a bad idea for this to derive Clone: if two places (RPC, P2P) receive valid but different blocks,
// then they will both get approved, but only one should go to the main chain.
#[derive(Clone)]
pub struct BlockVerifierService<C: Clone, Tx: Clone> {
context_svc: C,
tx_verifier_svc: Tx,
}
impl<C, Tx> BlockVerifierService<C, Tx>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext> + Clone + Send + 'static,
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
+ Clone
+ Send
+ 'static,
{
pub fn new(context_svc: C, tx_verifier_svc: Tx) -> BlockVerifierService<C, Tx> {
BlockVerifierService {
context_svc,
tx_verifier_svc,
}
}
}
impl<C, Tx> Service<VerifyBlockRequest> for BlockVerifierService<C, Tx>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Clone
+ Send
+ 'static,
C::Future: Send + 'static,
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
+ Clone
+ Send
+ 'static,
Tx::Future: Send + 'static,
{
type Response = VerifiedBlockInformation;
type Error = ConsensusError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.context_svc.poll_ready(cx)).map(Into::into)?;
self.tx_verifier_svc.poll_ready(cx)
}
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
let context_svc = self.context_svc.clone();
let tx_verifier_svc = self.tx_verifier_svc.clone();
async move {
match req {
VerifyBlockRequest::MainChainBatchSetupVerify(block, txs) => {
batch_setup_verify_main_chain_block(block, txs, context_svc, tx_verifier_svc)
.await
}
_ => todo!(),
}
}
.boxed()
}
}
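A hypothetical usage sketch (not part of this commit): any service satisfying these bounds can be driven with the `tower::{Service, ServiceExt}` imports already at the top of this file; the helper name `verify_one_block` is illustrative only.
async fn verify_one_block<S>(
    verifier: &mut S,
    block: Block,
    txs: Vec<Transaction>,
) -> Result<VerifiedBlockInformation, ConsensusError>
where
    S: Service<VerifyBlockRequest, Response = VerifiedBlockInformation, Error = ConsensusError>,
{
    // Wait for capacity, then submit the block and its transactions for
    // batch setup + verification, as the scanner binary does.
    verifier
        .ready()
        .await?
        .call(VerifyBlockRequest::MainChainBatchSetupVerify(block, txs))
        .await
}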
async fn batch_setup_verify_main_chain_block<C, Tx>(
block: Block,
block_hash: [u8; 32],
pow_hash: [u8; 32],
// txs: Vec<T>,
}
txs: Vec<Transaction>,
context_svc: C,
tx_verifier_svc: Tx,
) -> Result<VerifiedBlockInformation, ConsensusError>
where
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
+ Send
+ 'static,
C::Future: Send + 'static,
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
{
tracing::info!("getting blockchain context");
let context = context_svc
.oneshot(BlockChainContextRequest)
.await
.map_err(Into::<ConsensusError>::into)?;
impl BlockVerificationData {
pub fn new(
block: Block,
difficulty_cache: &DifficultyCache,
weight_cache: &BlockWeightsCache,
) -> Result<BlockVerificationData, ConsensusError> {
let hf = BlockHFInfo::from_block_header(&block.header)?;
tracing::info!("got blockchain context: {:?}", context);
let current_diff = difficulty_cache.next_difficulty(&hf.version);
let cum_diff = difficulty_cache.cumulative_difficulty() + current_diff;
todo!()
/*
Ok(BlockVerificationData {
hf: BlockHFInfo::from_block_header(&block.header)?,
pow: BlockPOWInfo::new(block.header.timestamp, cum_diff),
weights:
})
*/
}
}
/// Sanity check on the block blob size.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#block-weight-and-size
fn block_size_sanity_check(
block_blob_len: usize,
effective_median: usize,
) -> Result<(), ConsensusError> {
if block_blob_len > effective_median * 2 + BLOCK_SIZE_SANITY_LEEWAY {
Err(ConsensusError::BlockIsTooLarge)
let txs = if !txs.is_empty() {
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier_svc
.oneshot(VerifyTxRequest::BatchSetupVerifyBlock {
txs,
current_chain_height: context.chain_height,
hf: context.current_hard_fork,
})
.await?
else {
panic!("tx verifier sent incorrect response!");
};
txs
} else {
Ok(())
}
}
vec![]
};
/// Sanity check on the block weight.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#block-weight-and-size
fn block_weight_check(
block_weight: usize,
median_for_block_reward: usize,
) -> Result<(), ConsensusError> {
if block_weight > median_for_block_reward * 2 {
Err(ConsensusError::BlockIsTooLarge)
} else {
Ok(())
}
}
let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
/// Verifies the previous id is the last block's hash
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#previous-id
fn check_prev_id(block: &Block, top_hash: &[u8; 32]) -> Result<(), ConsensusError> {
if &block.header.previous != top_hash {
Err(ConsensusError::BlockIsNotApartOfChain)
} else {
Ok(())
}
}
let generated_coins = miner_tx::check_miner_tx(
&block.miner_tx,
total_fees,
context.chain_height,
block_weight,
context.median_weight_for_block_reward,
context.already_generated_coins,
&context.current_hard_fork,
)?;
/// Checks the block's timestamp is in the valid range.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#timestamp
fn check_timestamp(block: &Block, median_timestamp: u64) -> Result<(), ConsensusError> {
if block.header.timestamp < median_timestamp
|| block.header.timestamp > current_time() + BLOCK_FUTURE_TIME_LIMIT
{
Err(ConsensusError::BlockTimestampInvalid)
} else {
Ok(())
}
let hashing_blob = block.serialize_hashable();
let pow_hash = tokio::task::spawn_blocking(move || {
hash_worker::calculate_pow_hash(
&hashing_blob,
context.chain_height,
&context.current_hard_fork,
)
})
.await
.unwrap()?;
Ok(VerifiedBlockInformation {
block_hash: block.hash(),
block,
txs,
pow_hash,
generated_coins,
weight: block_weight,
height: context.chain_height,
long_term_weight: 0,
hf_vote: HardFork::V1,
})
}

View file

@ -0,0 +1,83 @@
use std::sync::{Arc, OnceLock};
use crypto_bigint::U256;
use futures::stream::{FuturesOrdered, StreamExt};
use monero_serai::{
block::Block,
transaction::{Timelock, Transaction},
};
use crate::{
helper::current_time, transactions::TransactionVerificationData, ConsensusError, Database,
HardFork,
};
mod checks;
mod hash_worker;
pub mod reward;
const BLOCK_SIZE_SANITY_LEEWAY: usize = 100;
const BLOCK_FUTURE_TIME_LIMIT: u64 = 60 * 60 * 2;
/// Returns whether the block's POW hash is valid for the current difficulty.
///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/difficulty.html#checking-a-blocks-proof-of-work
pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool {
let int_hash = U256::from_le_slice(hash);
let difficulty = U256::from_u128(difficulty);
int_hash.checked_mul(&difficulty).is_some().unwrap_u8() == 1
}
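A hypothetical test sketch (not in the commit) of this overflow trick: the rule passes exactly when the hash, read as a little-endian integer, multiplied by the difficulty still fits in 256 bits.
#[test]
fn check_block_pow_examples() {
    // An all-zero hash is the strongest possible: it passes any difficulty.
    assert!(check_block_pow(&[0; 32], u128::MAX));
    // An all-0xff hash (2^256 - 1) is the weakest possible: only difficulty 1 passes.
    assert!(check_block_pow(&[0xff; 32], 1));
    assert!(!check_block_pow(&[0xff; 32], 2));
}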
/// Sanity check on the block blob size.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#block-weight-and-size
fn block_size_sanity_check(
block_blob_len: usize,
effective_median: usize,
) -> Result<(), ConsensusError> {
if block_blob_len > effective_median * 2 + BLOCK_SIZE_SANITY_LEEWAY {
Err(ConsensusError::BlockIsTooLarge)
} else {
Ok(())
}
}
/// Sanity check on the block weight.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#block-weight-and-size
fn block_weight_check(
block_weight: usize,
median_for_block_reward: usize,
) -> Result<(), ConsensusError> {
if block_weight > median_for_block_reward * 2 {
Err(ConsensusError::BlockIsTooLarge)
} else {
Ok(())
}
}
/// Verifies the previous id is the last block's hash
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#previous-id
fn check_prev_id(block: &Block, top_hash: &[u8; 32]) -> Result<(), ConsensusError> {
if &block.header.previous != top_hash {
Err(ConsensusError::BlockIsNotApartOfChain)
} else {
Ok(())
}
}
/// Checks the block's timestamp is in the valid range.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks.html#timestamp
fn check_timestamp(block: &Block, median_timestamp: u64) -> Result<(), ConsensusError> {
if block.header.timestamp < median_timestamp
|| block.header.timestamp > current_time() + BLOCK_FUTURE_TIME_LIMIT
{
Err(ConsensusError::BlockTimestampInvalid)
} else {
Ok(())
}
}

View file

@ -1,36 +1,8 @@
use crypto_bigint::{CheckedMul, U256};
use cryptonight_cuprate::{
cryptonight_hash_r, cryptonight_hash_v0, cryptonight_hash_v1, cryptonight_hash_v2,
};
use crate::{hardforks::HardFork, ConsensusError};
#[derive(Debug)]
pub struct BlockPOWInfo {
pub timestamp: u64,
pub cumulative_difficulty: u128,
}
impl BlockPOWInfo {
pub fn new(timestamp: u64, cumulative_difficulty: u128) -> BlockPOWInfo {
BlockPOWInfo {
timestamp,
cumulative_difficulty,
}
}
}
/// Returns whether the block's POW hash is valid for the current difficulty.
///
/// See: https://cuprate.github.io/monero-book/consensus_rules/blocks/difficulty.html#checking-a-blocks-proof-of-work
pub fn check_block_pow(hash: &[u8; 32], difficulty: u128) -> bool {
let int_hash = U256::from_le_slice(hash);
let difficulty = U256::from_u128(difficulty);
int_hash.checked_mul(&difficulty).is_some().unwrap_u8() == 1
}
use crate::{ConsensusError, HardFork};
/// Calculates the POW hash of this block.
pub fn calculate_pow_hash(

View file

@ -0,0 +1,183 @@
use monero_serai::ringct::RctType;
use monero_serai::transaction::{Input, Output, Timelock, Transaction};
use crate::{
transactions::{
outputs::{check_output_types, is_decomposed_amount},
TxVersion,
},
ConsensusError, HardFork,
};
const MONEY_SUPPLY: u64 = u64::MAX;
const MINIMUM_REWARD_PER_MIN: u64 = 3 * 10_u64.pow(11);
const MINER_TX_TIME_LOCKED_BLOCKS: u64 = 60;
fn calculate_base_reward(already_generated_coins: u64, hf: &HardFork) -> u64 {
let target_mins = hf.block_time().as_secs() / 60;
let emission_speed_factor = 20 - (target_mins - 1);
((MONEY_SUPPLY - already_generated_coins) >> emission_speed_factor)
.max(MINIMUM_REWARD_PER_MIN * target_mins)
}
pub fn calculate_block_reward(
block_weight: usize,
median_bw: usize,
already_generated_coins: u64,
hf: &HardFork,
) -> u64 {
tracing::info!("bw: {} median: {}", block_weight, median_bw);
let base_reward: u128 = calculate_base_reward(already_generated_coins, hf).into();
if block_weight <= median_bw {
return base_reward.try_into().unwrap();
}
let multiplicand: u128 = ((2 * median_bw - block_weight) * block_weight)
.try_into()
.unwrap();
let effective_median_bw: u128 = median_bw.try_into().unwrap();
(((base_reward * multiplicand) / effective_median_bw) / effective_median_bw)
.try_into()
.unwrap()
}
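For median < block_weight <= 2 * median, the reward is scaled by (2m - w) * w / m^2, so a block at 1.5x the median keeps exactly 75% of the base reward. A hypothetical test sketch (not in the commit; HardFork::V12 is just a sample fork, and the 3/4 ratio holds for any base reward at these weights):
#[test]
fn reward_penalty_example() {
    let hf = HardFork::V12;
    // At or below the median weight the full base reward is paid out.
    let base = calculate_block_reward(300_000, 300_000, 0, &hf);
    // At 1.5x the median: (2m - w) * w / m^2 = 0.75, up to integer-division rounding.
    let penalised = calculate_block_reward(450_000, 300_000, 0, &hf);
    assert_eq!(penalised, base * 3 / 4);
}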
/// Checks the miner transaction's version.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#version
fn check_tx_version(tx_version: &TxVersion, hf: &HardFork) -> Result<(), ConsensusError> {
if hf >= &HardFork::V12 && tx_version != &TxVersion::RingCT {
Err(ConsensusError::MinerTransaction("Version invalid"))
} else {
Ok(())
}
}
/// Checks the miner transaction's inputs.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#input
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#height
fn check_inputs(inputs: &[Input], chain_height: u64) -> Result<(), ConsensusError> {
if inputs.len() != 1 {
return Err(ConsensusError::MinerTransaction(
"does not have exactly 1 input",
));
}
match &inputs[0] {
Input::Gen(height) => {
if height != &chain_height {
Err(ConsensusError::MinerTransaction(
"Height in input is not expected height",
))
} else {
Ok(())
}
}
_ => Err(ConsensusError::MinerTransaction("Input not of type Gen")),
}
}
/// Checks the miner transaction has a correct time lock.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#unlock-time
fn check_time_lock(time_lock: &Timelock, chain_height: u64) -> Result<(), ConsensusError> {
match time_lock {
Timelock::Block(till_height) => {
if u64::try_from(*till_height).unwrap() != chain_height + MINER_TX_TIME_LOCKED_BLOCKS {
Err(ConsensusError::MinerTransaction(
"Time lock has invalid block height",
))
} else {
Ok(())
}
}
_ => Err(ConsensusError::MinerTransaction(
"Time lock is not a block height",
)),
}
}
/// Sums the outputs, checking for overflow.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#output-amounts
fn sum_outputs(outputs: &[Output], hf: &HardFork) -> Result<u64, ConsensusError> {
let mut sum: u64 = 0;
for out in outputs {
let amt = out.amount.unwrap_or(0);
if hf == &HardFork::V3 && !is_decomposed_amount(amt) {
return Err(ConsensusError::MinerTransaction(
"output amount is not decomposed",
));
}
sum = sum
.checked_add(amt)
.ok_or(ConsensusError::MinerTransaction(
"outputs overflow when summed",
))?;
}
Ok(sum)
}
/// Checks the total output amount is correct, returning the number of coins collected by the miner.
///
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#total-outputs
fn check_total_output_amt(
total_output: u64,
reward: u64,
fees: u64,
hf: &HardFork,
) -> Result<u64, ConsensusError> {
if hf == &HardFork::V1 || hf >= &HardFork::V12 {
if total_output != reward + fees {
return Err(ConsensusError::MinerTransaction(
"miner transaction does not output correct amt",
));
}
Ok(reward)
} else {
if total_output - fees > reward {
return Err(ConsensusError::MinerTransaction(
"miner transaction does not output correct amt",
));
}
if total_output > reward + fees {
return Err(ConsensusError::MinerTransaction(
"miner transaction does not output correct amt",
));
}
Ok(total_output - fees)
}
}
pub fn check_miner_tx(
tx: &Transaction,
total_fees: u64,
chain_height: u64,
block_weight: usize,
median_bw: usize,
already_generated_coins: u64,
hf: &HardFork,
) -> Result<u64, ConsensusError> {
let tx_version = TxVersion::from_raw(tx.prefix.version)?;
check_tx_version(&tx_version, hf)?;
if hf >= &HardFork::V12 && tx.rct_signatures.rct_type() != RctType::Null {
return Err(ConsensusError::MinerTransaction("RctType is not null"));
}
check_time_lock(&tx.prefix.timelock, chain_height)?;
check_inputs(&tx.prefix.inputs, chain_height)?;
check_output_types(&tx.prefix.outputs, hf)?;
let reward = calculate_block_reward(block_weight, median_bw, already_generated_coins, hf);
let total_outs = sum_outputs(&tx.prefix.outputs, hf)?;
check_total_output_amt(total_outs, reward, total_fees, hf)
}

View file

@ -1,31 +0,0 @@
use crate::hardforks::HardFork;
const MONEY_SUPPLY: u64 = u64::MAX;
const MINIMUM_REWARD_PER_MIN: u64 = 3 * 10_u64.pow(11);
fn calculate_base_reward(already_generated_coins: u64, hf: &HardFork) -> u64 {
let target_mins = hf.block_time().as_secs() / 60;
let emission_speed_factor = 20 - (target_mins - 1);
((MONEY_SUPPLY - already_generated_coins) >> emission_speed_factor)
.max(MINIMUM_REWARD_PER_MIN * target_mins)
}
pub fn calculate_block_reward(
block_weight: u64,
effective_median_bw: u64,
already_generated_coins: u64,
hf: &HardFork,
) -> u64 {
let base_reward = calculate_base_reward(already_generated_coins, hf);
let multiplicand = (2 * effective_median_bw - block_weight) * block_weight;
let effective_median_bw: u128 = effective_median_bw.into();
((mul_128(base_reward, multiplicand) / effective_median_bw) / effective_median_bw)
.try_into()
.unwrap()
}
fn mul_128(a: u64, b: u64) -> u128 {
a as u128 * b as u128
}

View file

@ -49,15 +49,20 @@ pub async fn initialize_blockchain_context<D>(
cfg: ContextConfig,
mut database: D,
) -> Result<
impl Service<
BlockChainContextRequest,
Response = BlockChainContext,
Error = tower::BoxError,
Future = impl Future<Output = Result<BlockChainContext, tower::BoxError>> + Send + 'static,
> + Clone
+ Send
+ Sync
+ 'static,
(
impl Service<
BlockChainContextRequest,
Response = BlockChainContext,
Error = tower::BoxError,
Future = impl Future<Output = Result<BlockChainContext, tower::BoxError>>
+ Send
+ 'static,
> + Clone
+ Send
+ Sync
+ 'static,
impl Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
),
ConsensusError,
>
where
@ -81,6 +86,15 @@ where
panic!("Database sent incorrect response!");
};
let DatabaseResponse::GeneratedCoins(already_generated_coins) = database
.ready()
.await?
.call(DatabaseRequest::GeneratedCoins)
.await?
else {
panic!("Database sent incorrect response!");
};
let db = database.clone();
let difficulty_cache_handle = tokio::spawn(async move {
difficulty::DifficultyCache::init_from_chain_height(chain_height, difficulty_cfg, db).await
@ -100,14 +114,17 @@ where
difficulty_cache: Arc::new(difficulty_cache_handle.await.unwrap()?.into()),
weight_cache: Arc::new(weight_cache_handle.await.unwrap()?.into()),
hardfork_state: Arc::new(hardfork_state_handle.await.unwrap()?.into()),
chain_height,
top_block_hash,
chain_height: Arc::new(chain_height.into()),
already_generated_coins: Arc::new(already_generated_coins.into()),
top_block_hash: Arc::new(top_block_hash.into()),
database,
};
let context_svc_update = context_svc.clone();
let buffered_svc = Buffer::new(context_svc.boxed(), BUFFER_CONTEXT_CHANNEL_SIZE);
Ok(buffered_svc)
Ok((buffered_svc.clone(), context_svc_update))
}
#[derive(Debug, Clone, Copy)]
@ -121,11 +138,13 @@ pub struct BlockChainContext {
/// The median long term block weight.
median_long_term_weight: usize,
/// Median weight to use for block reward calculations.
median_weight_for_block_reward: usize,
pub median_weight_for_block_reward: usize,
/// The amount of coins minted already.
pub already_generated_coins: u64,
/// Timestamp to use to check time locked outputs.
time_lock_timestamp: u64,
/// The height of the chain.
chain_height: u64,
pub chain_height: u64,
/// The top blocks hash
top_hash: [u8; 32],
/// The current hard fork.
@ -135,13 +154,15 @@ pub struct BlockChainContext {
#[derive(Debug, Clone)]
pub struct BlockChainContextRequest;
#[derive(Clone)]
pub struct BlockChainContextService<D> {
difficulty_cache: Arc<RwLock<difficulty::DifficultyCache>>,
weight_cache: Arc<RwLock<weight::BlockWeightsCache>>,
hardfork_state: Arc<RwLock<hardforks::HardForkState>>,
chain_height: u64,
top_block_hash: [u8; 32],
chain_height: Arc<RwLock<u64>>,
top_block_hash: Arc<RwLock<[u8; 32]>>,
already_generated_coins: Arc<RwLock<u64>>,
database: D,
}
@ -161,8 +182,9 @@ impl<D> Service<BlockChainContextRequest> for BlockChainContextService<D> {
let difficulty_cache = self.difficulty_cache.clone();
let weight_cache = self.weight_cache.clone();
let chain_height = self.chain_height;
let top_hash = self.top_block_hash;
let chain_height = self.chain_height.clone();
let top_hash = self.top_block_hash.clone();
let already_generated_coins = self.already_generated_coins.clone();
async move {
let hardfork_state = hardfork_state.read().await;
@ -177,9 +199,10 @@ impl<D> Service<BlockChainContextRequest> for BlockChainContextService<D> {
effective_median_weight: weight_cache.effective_median_block_weight(&current_hf),
median_long_term_weight: weight_cache.median_long_term_weight(),
median_weight_for_block_reward: weight_cache.median_for_block_reward(&current_hf),
already_generated_coins: *already_generated_coins.read().await,
time_lock_timestamp: 0, //TODO:
chain_height,
top_hash,
chain_height: *chain_height.read().await,
top_hash: *top_hash.read().await,
current_hard_fork: current_hf,
})
}
@ -188,12 +211,13 @@ impl<D> Service<BlockChainContextRequest> for BlockChainContextService<D> {
}
pub struct UpdateBlockchainCacheRequest {
new_top_hash: [u8; 32],
height: u64,
timestamp: u64,
weight: usize,
long_term_weight: usize,
vote: HardFork,
pub new_top_hash: [u8; 32],
pub height: u64,
pub timestamp: u64,
pub weight: usize,
pub long_term_weight: usize,
pub generated_coins: u64,
pub vote: HardFork,
}
impl<D> tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService<D>
@ -202,7 +226,7 @@ where
D::Future: Send + 'static,
{
type Response = ();
type Error = ConsensusError;
type Error = tower::BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@ -215,6 +239,10 @@ where
let difficulty_cache = self.difficulty_cache.clone();
let weight_cache = self.weight_cache.clone();
let chain_height = self.chain_height.clone();
let top_hash = self.top_block_hash.clone();
let already_generated_coins = self.already_generated_coins.clone();
let database = self.database.clone();
async move {
@ -241,6 +269,11 @@ where
.new_block(new.vote, new.height, database)
.await?;
*chain_height.write().await = new.height + 1;
*top_hash.write().await = new.new_top_hash;
let mut already_generated_coins = already_generated_coins.write().await;
*already_generated_coins = already_generated_coins.saturating_add(new.generated_coins);
Ok(())
}
.boxed()

View file

@ -131,8 +131,10 @@ impl DifficultyCache {
assert_eq!(self.last_accounted_height + 1, height);
self.last_accounted_height += 1;
self.timestamps.pop_front();
self.timestamps.push_back(timestamp);
if u64::try_from(self.timestamps.len()).unwrap() > self.config.total_block_count() {
self.timestamps.pop_front();
}
self.update_windowed_work(database).await?;

View file

@ -177,24 +177,26 @@ impl BlockWeightsCache {
Ok(idx) | Err(idx) => self.long_term_weights.insert(idx, long_term_weight),
};
if let Some(height_to_remove) = block_height.checked_sub(self.config.long_term_window) {
tracing::debug!(
"Block {} is out of the long term weight window, removing it",
height_to_remove
);
let DatabaseResponse::BlockExtendedHeader(ext_header) = database
.oneshot(DatabaseRequest::BlockExtendedHeader(
height_to_remove.into(),
))
.await?
else {
panic!("Database sent incorrect response!");
};
let idx = self
.long_term_weights
.binary_search(&ext_header.long_term_weight)
.expect("Weight must be in list if in the window");
self.long_term_weights.remove(idx);
if u64::try_from(self.long_term_weights.len()).unwrap() > self.config.long_term_window {
if let Some(height_to_remove) = block_height.checked_sub(self.config.long_term_window) {
tracing::debug!(
"Block {} is out of the long term weight window, removing it",
height_to_remove
);
let DatabaseResponse::BlockExtendedHeader(ext_header) = database
.oneshot(DatabaseRequest::BlockExtendedHeader(
height_to_remove.into(),
))
.await?
else {
panic!("Database sent incorrect response!");
};
let idx = self
.long_term_weights
.binary_search(&ext_header.long_term_weight)
.expect("Weight must be in list if in the window");
self.long_term_weights.remove(idx);
}
}
self.short_term_block_weights.push_back(block_weight);
@ -244,6 +246,7 @@ impl BlockWeightsCache {
} else {
self.effective_median_block_weight(hf)
}
.max(penalty_free_zone(hf))
}
}
@ -253,7 +256,7 @@ fn calculate_effective_median_block_weight(
sorted_long_term_window: &[usize],
) -> usize {
if hf.in_range(&HardFork::V1, &HardFork::V10) {
return median(sorted_short_term_window);
return median(sorted_short_term_window).max(penalty_free_zone(hf));
}
let long_term_median = median(sorted_long_term_window).max(PENALTY_FREE_ZONE_5);

View file

@ -4,22 +4,26 @@ pub mod block;
pub mod context;
pub mod genesis;
mod helper;
pub mod miner_tx;
#[cfg(feature = "binaries")]
pub mod rpc;
pub mod transactions;
pub use block::VerifyBlockRequest;
pub use context::{ContextConfig, HardFork};
pub use transactions::VerifyTxRequest;
pub use block::{VerifiedBlockInformation, VerifyBlockRequest};
pub use context::{ContextConfig, HardFork, UpdateBlockchainCacheRequest};
pub use transactions::{VerifyTxRequest, VerifyTxResponse};
pub async fn initialize_verifier<D>(
database: D,
cfg: ContextConfig,
) -> Result<
(
impl tower::Service<VerifyBlockRequest, Response = (), Error = ConsensusError>,
impl tower::Service<VerifyTxRequest, Response = (), Error = ConsensusError>,
impl tower::Service<
VerifyBlockRequest,
Response = VerifiedBlockInformation,
Error = ConsensusError,
>,
impl tower::Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
impl tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
),
ConsensusError,
>
@ -27,14 +31,16 @@ where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
let context_svc = context::initialize_blockchain_context(cfg, database.clone()).await?;
let (context_svc, context_svc_updater) = context::initialize_blockchain_context(cfg, database.clone()).await?;
let tx_svc = transactions::TxVerifierService::new(database);
let block_svc = block::BlockVerifierService::new(context_svc, tx_svc.clone());
Ok((block_svc, tx_svc))
let block_svc = block::BlockVerifierService::new(context_svc.clone(), tx_svc.clone());
Ok((block_svc, tx_svc, context_svc_updater))
}
#[derive(Debug, thiserror::Error)]
pub enum ConsensusError {
#[error("Miner transaction invalid: {0}")]
MinerTransaction(&'static str),
#[error("Transaction sig invalid: {0}")]
TransactionSignatureInvalid(&'static str),
#[error("Transaction inputs overflow")]
@ -101,6 +107,7 @@ pub enum DatabaseRequest {
BlockExtendedHeaderInRange(std::ops::Range<u64>),
ChainHeight,
GeneratedCoins,
Outputs(HashMap<u64, HashSet<u64>>),
NumberOutputsWithAmount(u64),
@ -117,6 +124,7 @@ pub enum DatabaseResponse {
BlockExtendedHeaderInRange(Vec<ExtendedBlockHeader>),
ChainHeight(u64, [u8; 32]),
GeneratedCoins(u64),
Outputs(HashMap<u64, HashMap<u64, OutputOnChain>>),
NumberOutputsWithAmount(usize),

View file

@ -1 +0,0 @@

View file

@ -1,4 +1,6 @@
use curve25519_dalek::edwards::CompressedEdwardsY;
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
@ -8,23 +10,24 @@ use std::task::{Context, Poll};
use futures::lock::{OwnedMutexGuard, OwnedMutexLockFuture};
use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use monero_serai::rpc::{HttpRpc, RpcConnection, RpcError};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tower::balance::p2c::Balance;
use tower::util::BoxService;
use tower::ServiceExt;
use tracing::Instrument;
use tracing::{instrument, Instrument};
use cuprate_common::BlockID;
use monero_wire::common::{BlockCompleteEntry, TransactionBlobs};
use crate::block::pow::BlockPOWInfo;
use crate::block::weight::BlockWeightInfo;
use crate::hardforks::BlockHFInfo;
use crate::{DatabaseRequest, DatabaseResponse};
use crate::{DatabaseRequest, DatabaseResponse, ExtendedBlockHeader, HardFork, OutputOnChain};
pub mod cache;
mod discover;
use cache::ScanningCache;
#[derive(Debug, Copy, Clone)]
pub struct RpcConfig {
pub max_blocks_per_node: u64,
@ -68,6 +71,7 @@ impl<Req: Clone, Res, E> tower::retry::Policy<Req, Res, E> for Attempts {
pub fn init_rpc_load_balancer(
addresses: Vec<String>,
cache: Arc<RwLock<ScanningCache>>,
config: Arc<RwLock<RpcConfig>>,
) -> impl tower::Service<
DatabaseRequest,
@ -80,19 +84,23 @@ pub fn init_rpc_load_balancer(
let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30);
let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok));
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 3);
let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer);
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(rpc_balance), 30);
//let rpcs = tower::retry::Retry::new(Attempts(2), rpc_buffer);
let discover = discover::RPCDiscover {
rpc: rpcs.clone(),
rpc: rpc_buffer.clone(),
initial_list: addresses,
ok_channel: rpc_discoverer_tx,
already_connected: Default::default(),
cache,
};
tokio::spawn(discover.run());
RpcBalancer { rpcs, config }
RpcBalancer {
rpcs: rpc_buffer,
config,
}
}
#[derive(Clone)]
@ -141,9 +149,9 @@ where
config.max_blocks_per_node,
)
}
DatabaseRequest::BlockPOWInfoInRange(range) => {
DatabaseRequest::BlockExtendedHeaderInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockPOWInfoInRange(pow_info) = resp else {
let DatabaseResponse::BlockExtendedHeaderInRange(pow_info) = resp else {
panic!("Database sent incorrect response");
};
pow_info
@ -151,41 +159,8 @@ where
split_range_request(
this,
range,
DatabaseRequest::BlockPOWInfoInRange,
DatabaseResponse::BlockPOWInfoInRange,
resp_to_ret,
config.max_block_headers_per_node,
)
}
DatabaseRequest::BlockWeightsInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockWeightsInRange(weights) = resp else {
panic!("Database sent incorrect response");
};
weights
};
split_range_request(
this,
range,
DatabaseRequest::BlockWeightsInRange,
DatabaseResponse::BlockWeightsInRange,
resp_to_ret,
config.max_block_headers_per_node,
)
}
DatabaseRequest::BlockHfInfoInRange(range) => {
let resp_to_ret = |resp: DatabaseResponse| {
let DatabaseResponse::BlockHfInfoInRange(hf_info) = resp else {
panic!("Database sent incorrect response");
};
hf_info
};
split_range_request(
this,
range,
DatabaseRequest::BlockHfInfoInRange,
DatabaseResponse::BlockHfInfoInRange,
DatabaseRequest::BlockExtendedHeaderInRange,
DatabaseResponse::BlockExtendedHeaderInRange,
resp_to_ret,
config.max_block_headers_per_node,
)
@ -244,16 +219,18 @@ pub struct Rpc<R: RpcConnection> {
rpc: Arc<futures::lock::Mutex<monero_serai::rpc::Rpc<R>>>,
addr: String,
rpc_state: RpcState<R>,
cache: Arc<RwLock<ScanningCache>>,
error_slot: Arc<Mutex<Option<RpcError>>>,
}
impl Rpc<HttpRpc> {
pub fn new_http(addr: String) -> Rpc<HttpRpc> {
pub fn new_http(addr: String, cache: Arc<RwLock<ScanningCache>>) -> Rpc<HttpRpc> {
let http_rpc = HttpRpc::new(addr.clone()).unwrap();
Rpc {
rpc: Arc::new(futures::lock::Mutex::new(http_rpc)),
addr,
rpc_state: RpcState::Locked,
cache,
error_slot: Arc::new(Mutex::new(None)),
}
}
@ -288,12 +265,13 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
panic!("poll_ready was not called first!");
};
let cache = self.cache.clone();
let span = tracing::info_span!("rpc_request", addr = &self.addr);
let err_slot = self.error_slot.clone();
match req {
_ => todo!(),
DatabaseRequest::BlockHash(height) => async move {
let res: Result<_, RpcError> = rpc
.get_block_hash(height as usize)
@ -307,45 +285,134 @@ impl<R: RpcConnection + Send + Sync + 'static> tower::Service<DatabaseRequest> f
.instrument(span)
.boxed(),
DatabaseRequest::ChainHeight => async move {
let res: Result<_, RpcError> = rpc
.get_height()
.map_ok(|height| DatabaseResponse::ChainHeight(height.try_into().unwrap()))
.await;
if let Err(e) = &res {
*err_slot.lock().unwrap() = Some(e.clone());
}
res.map_err(Into::into)
let height = cache.read().unwrap().height;
let hash = rpc
.get_block_hash((height - 1) as usize)
.await
.map_err(Into::<tower::BoxError>::into)?;
Ok(DatabaseResponse::ChainHeight(height, hash))
}
.instrument(span)
.boxed(),
DatabaseRequest::BlockPOWInfo(id) => {
get_blocks_pow_info(id, rpc).instrument(span).boxed()
DatabaseRequest::GeneratedCoins => async move {
Ok(DatabaseResponse::GeneratedCoins(
cache.read().unwrap().already_generated_coins,
))
}
DatabaseRequest::BlockWeights(id) => {
get_blocks_weight_info(id, rpc).instrument(span).boxed()
.instrument(span)
.boxed(),
DatabaseRequest::BlockExtendedHeader(id) => {
get_block_info(id, rpc).instrument(span).boxed()
}
DatabaseRequest::BlockHFInfo(id) => {
get_blocks_hf_info(id, rpc).instrument(span).boxed()
DatabaseRequest::BlockExtendedHeaderInRange(range) => {
get_block_info_in_range(range, rpc).instrument(span).boxed()
}
DatabaseRequest::BlockHfInfoInRange(range) => get_blocks_hf_info_in_range(range, rpc)
.instrument(span)
.boxed(),
DatabaseRequest::BlockWeightsInRange(range) => {
get_blocks_weight_info_in_range(range, rpc)
.instrument(span)
.boxed()
}
DatabaseRequest::BlockPOWInfoInRange(range) => get_blocks_pow_info_in_range(range, rpc)
.instrument(span)
.boxed(),
DatabaseRequest::BlockBatchInRange(range) => {
get_blocks_in_range(range, rpc).instrument(span).boxed()
}
DatabaseRequest::Outputs(out_ids) => {
get_outputs(out_ids, cache, rpc).instrument(span).boxed()
}
DatabaseRequest::NumberOutputsWithAmount(amt) => async move {
Ok(DatabaseResponse::NumberOutputsWithAmount(
cache.read().unwrap().numb_outs(amt) as usize,
))
}
.boxed(),
}
}
}
#[instrument(skip_all)]
async fn get_outputs<R: RpcConnection>(
out_ids: HashMap<u64, HashSet<u64>>,
cache: Arc<RwLock<ScanningCache>>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
tracing::info!(
"Getting outputs len: {}",
out_ids.values().map(|amt_map| amt_map.len()).sum::<usize>()
);
#[derive(Serialize, Copy, Clone)]
struct OutputID {
amount: u64,
index: u64,
}
#[derive(Serialize, Clone)]
struct Request {
outputs: Vec<OutputID>,
}
#[derive(Deserialize)]
struct OutputRes {
height: u64,
key: [u8; 32],
mask: [u8; 32],
txid: [u8; 32],
}
#[derive(Deserialize)]
struct Response {
outs: Vec<OutputRes>,
}
let outputs = out_ids
.into_iter()
.flat_map(|(amt, amt_map)| {
amt_map
.into_iter()
.map(|amt_idx| OutputID {
amount: amt,
index: amt_idx,
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let res = rpc
.bin_call(
"get_outs.bin",
monero_epee_bin_serde::to_bytes(&Request {
outputs: outputs.clone(),
})?,
)
.await?;
let outs: Response = monero_epee_bin_serde::from_bytes(&res)?;
tracing::info!("Got outputs len: {}", outs.outs.len());
let mut ret = HashMap::new();
let cache = cache.read().unwrap();
for (out, idx) in outs.outs.iter().zip(outputs) {
ret.entry(idx.amount).or_insert_with(HashMap::new).insert(
idx.index,
OutputOnChain {
height: out.height,
time_lock: cache.outputs_time_lock(&out.txid),
// we unwrap these as we are checking already-approved rings, so if these points are bad
// then a bad proof has already been approved.
key: CompressedEdwardsY::from_slice(&out.key)
.unwrap()
.decompress()
.unwrap(),
mask: CompressedEdwardsY::from_slice(&out.mask)
.unwrap()
.decompress()
.unwrap(),
},
);
}
Ok(DatabaseResponse::Outputs(ret))
}
async fn get_blocks_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
@ -376,14 +443,14 @@ async fn get_blocks_in_range<R: RpcConnection>(
Ok(DatabaseResponse::BlockBatchInRange(
blocks
.blocks
.into_iter()
.into_par_iter()
.map(|b| {
Ok((
monero_serai::block::Block::read(&mut b.block.as_slice())?,
match b.txs {
TransactionBlobs::Pruned(_) => return Err("node sent pruned txs!".into()),
TransactionBlobs::Normal(txs) => txs
.into_iter()
.into_par_iter()
.map(|tx| {
monero_serai::transaction::Transaction::read(&mut tx.as_slice())
})
@ -411,7 +478,7 @@ struct BlockInfo {
async fn get_block_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<Vec<BlockInfo>, tower::BoxError> {
) -> Result<DatabaseResponse, tower::BoxError> {
#[derive(Deserialize, Debug)]
struct Response {
headers: Vec<BlockInfo>,
@ -426,48 +493,18 @@ async fn get_block_info_in_range<R: RpcConnection>(
tracing::info!("Retrieved block headers in range: {:?}", range);
Ok(res.headers)
}
async fn get_block_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<BlockInfo, tower::BoxError> {
tracing::info!("Retrieving block info with id: {}", id);
#[derive(Deserialize, Debug)]
struct Response {
block_header: BlockInfo,
}
match id {
BlockID::Height(height) => {
let res = rpc
.json_rpc_call::<Response>(
"get_block_header_by_height",
Some(json!({"height": height})),
)
.await?;
Ok(res.block_header)
}
BlockID::Hash(hash) => {
let res = rpc
.json_rpc_call::<Response>("get_block_header_by_hash", Some(json!({"hash": hash})))
.await?;
Ok(res.block_header)
}
}
}
async fn get_blocks_weight_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info_in_range(range, rpc).await?;
Ok(DatabaseResponse::BlockWeightsInRange(
info.into_iter()
.map(|info| BlockWeightInfo {
Ok(DatabaseResponse::BlockExtendedHeaderInRange(
res.headers
.into_iter()
.map(|info| ExtendedBlockHeader {
version: HardFork::from_version(&info.major_version)
.expect("previously checked block has incorrect version"),
vote: HardFork::from_vote(&info.minor_version),
timestamp: info.timestamp,
cumulative_difficulty: u128_from_low_high(
info.cumulative_difficulty,
info.cumulative_difficulty_top64,
),
block_weight: info.block_weight,
long_term_weight: info.long_term_weight,
})
@ -475,49 +512,46 @@ async fn get_blocks_weight_info_in_range<R: RpcConnection>(
))
}
async fn get_blocks_pow_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info_in_range(range, rpc).await?;
Ok(DatabaseResponse::BlockPOWInfoInRange(
info.into_iter()
.map(|info| BlockPOWInfo {
timestamp: info.timestamp,
cumulative_difficulty: u128_from_low_high(
info.cumulative_difficulty,
info.cumulative_difficulty_top64,
),
})
.collect(),
))
}
async fn get_blocks_weight_info<R: RpcConnection>(
async fn get_block_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
tracing::info!("Retrieving block info with id: {}", id);
Ok(DatabaseResponse::BlockWeights(BlockWeightInfo {
block_weight: info.block_weight,
long_term_weight: info.long_term_weight,
}))
}
#[derive(Deserialize, Debug)]
struct Response {
block_header: BlockInfo,
}
async fn get_blocks_pow_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
let info = match id {
BlockID::Height(height) => {
let res = rpc
.json_rpc_call::<Response>(
"get_block_header_by_height",
Some(json!({"height": height})),
)
.await?;
res.block_header
}
BlockID::Hash(hash) => {
let res = rpc
.json_rpc_call::<Response>("get_block_header_by_hash", Some(json!({"hash": hash})))
.await?;
res.block_header
}
};
Ok(DatabaseResponse::BlockPOWInfo(BlockPOWInfo {
Ok(DatabaseResponse::BlockExtendedHeader(ExtendedBlockHeader {
version: HardFork::from_version(&info.major_version)
.expect("previously checked block has incorrect version"),
vote: HardFork::from_vote(&info.minor_version),
timestamp: info.timestamp,
cumulative_difficulty: u128_from_low_high(
info.cumulative_difficulty,
info.cumulative_difficulty_top64,
),
block_weight: info.block_weight,
long_term_weight: info.long_term_weight,
}))
}
@ -525,43 +559,3 @@ fn u128_from_low_high(low: u64, high: u64) -> u128 {
let res: u128 = high as u128;
res << 64 | low as u128
}
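monerod's RPC splits 128-bit values (e.g. cumulative difficulty) into two 64-bit fields; a hypothetical test sketch (not in the commit) of the recombination:
#[test]
fn u128_from_low_high_examples() {
    assert_eq!(u128_from_low_high(7, 0), 7);
    assert_eq!(u128_from_low_high(0, 1), 1u128 << 64);
    assert_eq!(u128_from_low_high(u64::MAX, u64::MAX), u128::MAX);
}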
async fn get_blocks_hf_info<R: RpcConnection>(
id: BlockID,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info(id, rpc).await?;
Ok(DatabaseResponse::BlockHFInfo(
BlockHFInfo::from_major_minor(info.major_version, info.minor_version)?,
))
}
async fn get_blocks_hf_info_in_range<R: RpcConnection>(
range: Range<u64>,
rpc: OwnedMutexGuard<monero_serai::rpc::Rpc<R>>,
) -> Result<DatabaseResponse, tower::BoxError> {
let info = get_block_info_in_range(range, rpc).await?;
Ok(DatabaseResponse::BlockHfInfoInRange(
info.into_iter()
.map(|info| {
BlockHFInfo::from_major_minor(info.major_version, info.minor_version).unwrap()
})
.collect(),
))
}
#[tokio::test]
async fn t() {
let rpc = Rpc::new_http("http://node.c3pool.com:18081".to_string());
let res: serde_json::Value = rpc
.rpc
.try_lock()
.unwrap()
.json_rpc_call("get_connections", None)
.await
.unwrap();
println!("{res}");
}

consensus/src/rpc/cache.rs Normal file (101 additions)
View file

@ -0,0 +1,101 @@
use std::{
collections::HashMap,
fmt::{Display, Formatter},
sync::Arc,
};
use monero_serai::{
block::Block,
transaction::{Timelock, Transaction},
};
use crate::transactions::TransactionVerificationData;
use cuprate_common::Network;
/// A cache which can keep chain state while scanning.
///
/// Because we are using an RPC interface to a node, we need to keep track
/// of certain data that the node doesn't hold or give us, like the number
/// of outputs at a certain time.
#[derive(Debug, Default, Clone)]
pub struct ScanningCache {
network: Network,
numb_outs: HashMap<u64, u64>,
time_locked_out: HashMap<[u8; 32], Timelock>,
pub already_generated_coins: u64,
/// The height of the *next* block to scan.
pub height: u64,
}
impl ScanningCache {
pub fn add_new_block_data(
&mut self,
generated_coins: u64,
miner_tx: &Transaction,
txs: &[Arc<TransactionVerificationData>],
) {
self.add_tx_time_lock(miner_tx.hash(), miner_tx.prefix.timelock);
miner_tx
.prefix
.outputs
.iter()
.for_each(|out| self.add_outs(out.amount.unwrap_or(0), 1));
txs.iter().for_each(|tx| {
self.add_tx_time_lock(tx.tx_hash, tx.tx.prefix.timelock);
tx.tx
.prefix
.outputs
.iter()
.for_each(|out| self.add_outs(out.amount.unwrap_or(0), 1));
});
self.already_generated_coins = self.already_generated_coins.saturating_add(generated_coins);
self.height += 1;
}
pub fn outputs_time_lock(&self, tx: &[u8; 32]) -> Timelock {
self.time_locked_out
.get(tx)
.copied()
.unwrap_or(Timelock::None)
}
pub fn add_tx_time_lock(&mut self, tx: [u8; 32], time_lock: Timelock) {
match time_lock {
Timelock::None => (),
lock => {
self.time_locked_out.insert(tx, lock);
}
}
}
pub fn total_outs(&self) -> u64 {
self.numb_outs.values().sum()
}
pub fn numb_outs(&self, amount: u64) -> u64 {
*self.numb_outs.get(&amount).unwrap_or(&0)
}
pub fn add_outs(&mut self, amount: u64, count: u64) {
if let Some(numb_outs) = self.numb_outs.get_mut(&amount) {
*numb_outs += count;
} else {
self.numb_outs.insert(amount, count);
}
}
}
impl Display for ScanningCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let rct_outs = self.numb_outs(0);
let total_outs = self.total_outs();
f.debug_struct("Cache")
.field("next_block", &self.height)
.field("rct_outs", &rct_outs)
.field("total_outs", &total_outs)
.finish()
}
}
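A hypothetical usage sketch (not part of the commit) of the output-count bookkeeping; the amounts and counts are made up:
fn scanning_cache_example() {
    let mut cache = ScanningCache::default();
    // Three RingCT outputs (amount 0) and one pre-RingCT output of 0.000001 XMR.
    cache.add_outs(0, 3);
    cache.add_outs(1_000_000, 1);
    assert_eq!(cache.numb_outs(0), 3);
    assert_eq!(cache.total_outs(), 4);
}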

View file

@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use futures::channel::mpsc::SendError;
@ -11,11 +12,11 @@ use tower::load::PeakEwma;
use tower::ServiceExt;
use tracing::instrument;
use super::Rpc;
use super::{cache::ScanningCache, Rpc};
use crate::Database;
#[instrument]
async fn check_rpc(addr: String) -> Option<Rpc<HttpRpc>> {
#[instrument(skip(cache))]
async fn check_rpc(addr: String, cache: Arc<RwLock<ScanningCache>>) -> Option<Rpc<HttpRpc>> {
tracing::debug!("Sending request to node.");
let rpc = HttpRpc::new(addr.clone()).ok()?;
// make sure the RPC is actually reachable
@ -26,7 +27,7 @@ async fn check_rpc(addr: String) -> Option<Rpc<HttpRpc>> {
tracing::debug!("Node sent ok response.");
Some(Rpc::new_http(addr))
Some(Rpc::new_http(addr, cache))
}
pub(crate) struct RPCDiscover<T> {
@ -34,6 +35,7 @@ pub(crate) struct RPCDiscover<T> {
pub initial_list: Vec<String>,
pub ok_channel: mpsc::Sender<Change<usize, PeakEwma<Rpc<HttpRpc>>>>,
pub already_connected: HashSet<String>,
pub cache: Arc<RwLock<ScanningCache>>,
}
impl<T: Database> RPCDiscover<T> {
@ -63,7 +65,11 @@ impl<T: Database> RPCDiscover<T> {
pub async fn run(mut self) {
if !self.initial_list.is_empty() {
let mut fut = FuturesUnordered::from_iter(self.initial_list.drain(..).map(check_rpc));
let mut fut = FuturesUnordered::from_iter(
self.initial_list
.drain(..)
.map(|addr| check_rpc(addr, self.cache.clone())),
);
while let Some(res) = fut.next().await {
if let Some(rpc) = res {

View file

@ -1,14 +1,27 @@
use monero_serai::transaction::Transaction;
use sha3::{Digest, Keccak256};
use std::ops::Deref;
use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use crate::{hardforks::HardFork, ConsensusError, Database};
use futures::FutureExt;
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use tower::Service;
use tracing::instrument;
use crate::{ConsensusError, Database, HardFork};
mod inputs;
mod outputs;
mod signatures;
pub(crate) mod outputs;
mod ring;
mod sigs;
mod time_lock;
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum TxVersion {
RingSignatures,
RingCT,
@ -26,42 +39,255 @@ impl TxVersion {
/// Data needed to verify a transaction.
///
#[derive(Debug)]
pub struct TransactionVerificationData {
tx: Transaction,
version: TxVersion,
tx_blob: Vec<u8>,
tx_weight: usize,
tx_hash: [u8; 32],
rings: signatures::Rings,
pub tx: Transaction,
pub version: TxVersion,
pub tx_blob: Vec<u8>,
pub tx_weight: usize,
pub fee: u64,
pub tx_hash: [u8; 32],
    /// We put this behind a mutex as the information is not constant and is based on past output idxs,
    /// which could change on re-orgs.
rings_member_info: std::sync::Mutex<Option<ring::TxRingMembersInfo>>,
}
impl TransactionVerificationData {
pub fn new(
tx: Transaction,
rings: signatures::Rings,
) -> Result<TransactionVerificationData, ConsensusError> {
let tx_blob = tx.serialize();
pub fn new(tx: Transaction) -> Result<TransactionVerificationData, ConsensusError> {
Ok(TransactionVerificationData {
tx_hash: Keccak256::digest(&tx_blob).into(),
tx_blob,
tx_hash: tx.hash(),
tx_blob: tx.serialize(),
tx_weight: tx.weight(),
rings,
fee: tx.rct_signatures.base.fee,
rings_member_info: std::sync::Mutex::new(None),
version: TxVersion::from_raw(tx.prefix.version)?,
tx,
})
}
}
pub async fn batch_new<D: Database + Clone>(
pub enum VerifyTxRequest {
/// Verifies transactions in the context of a block.
Block {
txs: Vec<Arc<TransactionVerificationData>>,
current_chain_height: u64,
hf: HardFork,
},
/// Batches the setup of [`TransactionVerificationData`] and verifies the transactions
/// in the context of a block.
BatchSetupVerifyBlock {
txs: Vec<Transaction>,
hf: &HardFork,
database: D,
) -> Result<Vec<TransactionVerificationData>, ConsensusError> {
let rings = signatures::batch_get_rings(&txs, hf, database.clone()).await?;
current_chain_height: u64,
hf: HardFork,
},
}
txs.into_iter()
.zip(rings.into_iter())
.map(|(tx, ring)| TransactionVerificationData::new(tx, ring))
.collect()
pub enum VerifyTxResponse {
BatchSetupOk(Vec<Arc<TransactionVerificationData>>),
Ok,
}
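A sketch of how a caller might drive this service (not part of this commit; `database` and `txs` are assumed inputs, and `oneshot` needs `use tower::ServiceExt;`):

async fn setup_and_verify<D>(
    database: D,
    txs: Vec<Transaction>,
    current_chain_height: u64,
    hf: HardFork,
) -> Result<Vec<Arc<TransactionVerificationData>>, ConsensusError>
where
    D: Database + Clone + Send + Sync + 'static,
    D::Future: Send + 'static,
{
    let tx_verifier = TxVerifierService::new(database);
    match tx_verifier
        .oneshot(VerifyTxRequest::BatchSetupVerifyBlock {
            txs,
            current_chain_height,
            hf,
        })
        .await?
    {
        VerifyTxResponse::BatchSetupOk(txs) => Ok(txs),
        VerifyTxResponse::Ok => unreachable!("batch setup always returns the set-up txs"),
    }
}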
#[derive(Clone)]
pub struct TxVerifierService<D: Clone> {
database: D,
}
impl<D> TxVerifierService<D>
where
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
pub fn new(database: D) -> TxVerifierService<D> {
TxVerifierService { database }
}
}
impl<D> Service<VerifyTxRequest> for TxVerifierService<D>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
type Response = VerifyTxResponse;
type Error = ConsensusError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.database.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: VerifyTxRequest) -> Self::Future {
let database = self.database.clone();
match req {
VerifyTxRequest::Block {
txs,
current_chain_height,
hf,
} => verify_transactions_for_block(database, txs, current_chain_height, hf).boxed(),
VerifyTxRequest::BatchSetupVerifyBlock {
txs,
current_chain_height,
hf,
} => batch_setup_verify_transactions_for_block(database, txs, current_chain_height, hf)
.boxed(),
}
}
}
async fn set_missing_ring_members<D>(
database: D,
txs: &[Arc<TransactionVerificationData>],
hf: &HardFork,
) -> Result<(), ConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
// TODO: handle re-orgs.
let txs_needing_ring_members = txs
.iter()
        // Note: we must not hold the mutex lock for long, to avoid blocking the async runtime.
.filter(|tx| tx.rings_member_info.lock().unwrap().is_none())
.cloned()
.collect::<Vec<_>>();
tracing::debug!(
"Retrieving ring members for {} txs",
txs_needing_ring_members.len()
);
ring::batch_fill_ring_member_info(&txs_needing_ring_members, hf, database).await?;
Ok(())
}
async fn batch_setup_verify_transactions_for_block<D>(
database: D,
txs: Vec<Transaction>,
current_chain_height: u64,
hf: HardFork,
) -> Result<VerifyTxResponse, ConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
let txs = tokio::task::spawn_blocking(|| {
txs.into_par_iter()
.map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?)))
.collect::<Result<Vec<_>, ConsensusError>>()
})
.await
.unwrap()?;
verify_transactions_for_block(database, txs.clone(), current_chain_height, hf).await?;
Ok(VerifyTxResponse::BatchSetupOk(txs))
}
#[instrument(name = "verify_txs", skip_all, level = "info")]
async fn verify_transactions_for_block<D>(
database: D,
txs: Vec<Arc<TransactionVerificationData>>,
current_chain_height: u64,
hf: HardFork,
) -> Result<VerifyTxResponse, ConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
{
tracing::info!("Verifying transactions for block, amount: {}", txs.len());
set_missing_ring_members(database, &txs, &hf).await?;
let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));
    // Run the CPU-heavy verification on the rayon pool via `spawn_blocking` and
    // await it, so a failed check actually propagates instead of being dropped.
    tokio::task::spawn_blocking(move || {
        txs.par_iter().try_for_each(|tx| {
            verify_transaction_for_block(tx, current_chain_height, hf, spent_kis.clone())
        })
    })
    .await
    .unwrap()?;
Ok(VerifyTxResponse::Ok)
}
fn verify_transaction_for_block(
tx_verification_data: &TransactionVerificationData,
current_chain_height: u64,
hf: HardFork,
spent_kis: Arc<std::sync::Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), ConsensusError> {
tracing::trace!(
"Verifying transaction: {}",
hex::encode(tx_verification_data.tx_hash)
);
let tx_version = &tx_verification_data.version;
let rings_member_info_lock = tx_verification_data.rings_member_info.lock().unwrap();
let rings_member_info = match rings_member_info_lock.deref() {
Some(rings_member_info) => rings_member_info,
None => panic!("rings_member_info needs to be set to be able to verify!"),
};
    check_tx_version(&rings_member_info.decoy_info, tx_version, &hf)?;
let sum_outputs =
outputs::check_outputs(&tx_verification_data.tx.prefix.outputs, &hf, tx_version)?;
let sum_inputs = inputs::check_inputs(
&tx_verification_data.tx.prefix.inputs,
rings_member_info,
current_chain_height,
&hf,
tx_version,
spent_kis,
)?;
sigs::verify_signatures(&tx_verification_data.tx, &rings_member_info.rings)?;
Ok(())
}
/// Checks the version is in the allowed range.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#version
fn check_tx_version(
decoy_info: &Option<ring::DecoyInfo>,
version: &TxVersion,
hf: &HardFork,
) -> Result<(), ConsensusError> {
if let Some(decoy_info) = decoy_info {
let max = max_tx_version(hf);
if version > &max {
return Err(ConsensusError::TransactionVersionInvalid);
}
// TODO: Doc is wrong here
let min = min_tx_version(hf);
if version < &min && decoy_info.not_mixable != 0 {
return Err(ConsensusError::TransactionVersionInvalid);
}
} else {
// This will only happen for hard-fork 1 when only RingSignatures are allowed.
if version != &TxVersion::RingSignatures {
return Err(ConsensusError::TransactionVersionInvalid);
}
}
Ok(())
}
fn max_tx_version(hf: &HardFork) -> TxVersion {
if hf <= &HardFork::V3 {
TxVersion::RingSignatures
} else {
TxVersion::RingCT
}
}
fn min_tx_version(hf: &HardFork) -> TxVersion {
if hf >= &HardFork::V6 {
TxVersion::RingCT
} else {
TxVersion::RingSignatures
}
}
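A few sanity checks on these bounds (a sketch; it relies on the `Ord` derive on `TxVersion`, where `RingSignatures < RingCT`):

#[test]
fn tx_version_bounds() {
    // RingCT only becomes valid at v4 and is the floor from v6 (the floor is
    // only enforced for txs with unmixable inputs, see `check_tx_version`).
    assert_eq!(max_tx_version(&HardFork::V3), TxVersion::RingSignatures);
    assert_eq!(max_tx_version(&HardFork::V4), TxVersion::RingCT);
    assert_eq!(min_tx_version(&HardFork::V5), TxVersion::RingSignatures);
    assert_eq!(min_tx_version(&HardFork::V6), TxVersion::RingCT);
}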

View file

@ -1,193 +1,189 @@
use curve25519_dalek::EdwardsPoint;
use std::cmp::{max, min};
use std::collections::HashSet;
use std::{
cmp::{max, min, Ordering},
collections::HashSet,
sync::Arc,
};
use monero_serai::transaction::Input;
use tower::{Service, ServiceExt};
use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
use crate::{
transactions::{
ring::{minimum_decoys, DecoyInfo, TxRingMembersInfo},
TxVersion,
},
ConsensusError, HardFork,
};
/// A struct holding information about the inputs and their decoys.
/// Checks the decoys are allowed.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html
pub struct DecoyInfo {
/// The number of inputs that have enough outputs on the chain to mix with.
mixable: usize,
/// The number of inputs that don't have enough outputs on the chain to mix with.
not_mixable: usize,
/// The minimum amount of decoys used in the transaction.
min_decoys: usize,
/// The maximum amount of decoys used in the transaction.
max_decoys: usize,
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#minimum-decoys
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#equal-number-of-decoys
fn check_decoy_info(decoy_info: &DecoyInfo, hf: &HardFork) -> Result<(), ConsensusError> {
if hf == &HardFork::V15 {
// Hard-fork 15 allows both v14 and v16 rules
return check_decoy_info(decoy_info, &HardFork::V14)
.or_else(|_| check_decoy_info(decoy_info, &HardFork::V16));
}
let current_minimum_decoys = minimum_decoys(hf);
if decoy_info.min_decoys < current_minimum_decoys {
if decoy_info.not_mixable == 0 {
return Err(ConsensusError::TransactionHasInvalidRing(
"input does not have enough decoys",
));
}
if decoy_info.mixable > 1 {
return Err(ConsensusError::TransactionHasInvalidInput(
"more than one mixable input with unmixable inputs",
));
}
}
if hf >= &HardFork::V8 && decoy_info.min_decoys != current_minimum_decoys {
return Err(ConsensusError::TransactionHasInvalidRing(
"one ring does not have the minimum number of decoys",
));
}
if hf >= &HardFork::V12 && decoy_info.min_decoys != decoy_info.max_decoys {
return Err(ConsensusError::TransactionHasInvalidRing(
"rings do not have the same number of members",
));
}
Ok(())
}
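For illustration, a sketch of the v12 rules using the `DecoyInfo` struct from the new `ring` module (hypothetical values, not part of this commit):

#[test]
fn v12_requires_equal_fixed_size_rings() {
    let ok = DecoyInfo { mixable: 2, not_mixable: 0, min_decoys: 10, max_decoys: 10 };
    assert!(check_decoy_info(&ok, &HardFork::V12).is_ok());
    // Rings of differing sizes are rejected from v12.
    let uneven = DecoyInfo { mixable: 2, not_mixable: 0, min_decoys: 10, max_decoys: 11 };
    assert!(check_decoy_info(&uneven, &HardFork::V12).is_err());
}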
impl DecoyInfo {
/// Creates a new [`DecoyInfo`] struct relating to the passed in inputs.
///
/// Do not rely on this function to do consensus checks!
///
pub async fn new<D: Database>(
inputs: &[Input],
hf: &HardFork,
mut database: D,
) -> Result<DecoyInfo, ConsensusError> {
let mut min_decoys = usize::MAX;
let mut max_decoys = usize::MIN;
let mut mixable = 0;
let mut not_mixable = 0;
/// Checks the input's key image for torsion and for duplicates within the transaction.
///
/// The `spent_kis` parameter is not meant to be a complete list of key images, just the key images
/// of related transactions, for example the other transactions in a block. The chain will be checked for duplicates later.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#unique-key-image
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#torsion-free-key-image
pub(crate) fn check_key_images(
input: &Input,
spent_kis: &mut HashSet<[u8; 32]>,
) -> Result<(), ConsensusError> {
match input {
Input::ToKey { key_image, .. } => {
if !key_image.is_torsion_free() {
return Err(ConsensusError::TransactionHasInvalidInput(
"key image has torsion",
));
}
if !spent_kis.insert(key_image.compress().to_bytes()) {
return Err(ConsensusError::TransactionHasInvalidInput(
"key image already spent",
));
}
}
_ => {
return Err(ConsensusError::TransactionHasInvalidInput(
"Input not ToKey",
))
}
}
let minimum_decoys = minimum_decoys(hf);
Ok(())
}
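A sketch of the duplicate check in isolation (the Ed25519 basepoint stands in for a real key image; it is torsion-free, so only the duplicate rule can fire):

#[test]
fn duplicate_key_images_are_rejected() {
    use curve25519_dalek::constants::ED25519_BASEPOINT_POINT;
    let input = Input::ToKey {
        amount: None,
        key_offsets: vec![3, 1, 2],
        key_image: ED25519_BASEPOINT_POINT, // stand-in key image
    };
    let mut spent_kis = HashSet::new();
    assert!(check_key_images(&input, &mut spent_kis).is_ok()); // first spend
    assert!(check_key_images(&input, &mut spent_kis).is_err()); // double spend
}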
for inp in inputs {
match inp {
Input::ToKey {
amount,
key_offsets,
..
} => {
let DatabaseResponse::NumberOutputsWithAmount(numb_of_outs) = database
.ready()
.await?
.call(DatabaseRequest::NumberOutputsWithAmount(
amount.unwrap_or(0),
))
.await?
else {
panic!("Database sent incorrect response!");
};
/// Checks that the input is of type [`Input::ToKey`] aka txin_to_key.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#input-type
fn check_input_type(input: &Input) -> Result<(), ConsensusError> {
match input {
Input::ToKey { .. } => Ok(()),
_ => Err(ConsensusError::TransactionHasInvalidInput(
"Input not ToKey",
)),
}
}
// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#mixable-and-unmixable-inputs
if numb_of_outs <= minimum_decoys {
not_mixable += 1;
} else {
mixable += 1;
}
/// Checks that the input has decoys.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#inputs-must-have-decoys
fn check_input_has_decoys(input: &Input) -> Result<(), ConsensusError> {
match input {
Input::ToKey { key_offsets, .. } => {
if key_offsets.is_empty() {
Err(ConsensusError::TransactionHasInvalidRing("No ring members"))
} else {
Ok(())
}
}
_ => panic!("Input not ToKey"),
}
}
let numb_decoys = key_offsets
.len()
.checked_sub(1)
.ok_or(ConsensusError::TransactionHasInvalidRing("ring is empty"))?;
// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-and-maximum-decoys-used
min_decoys = min(min_decoys, numb_decoys);
max_decoys = max(max_decoys, numb_decoys);
/// Checks that the ring members for the input are unique after hard-fork 6.
///
fn check_ring_members_unique(input: &Input, hf: &HardFork) -> Result<(), ConsensusError> {
if hf >= &HardFork::V6 {
match input {
Input::ToKey { key_offsets, .. } => key_offsets.iter().skip(1).try_for_each(|offset| {
if *offset == 0 {
Err(ConsensusError::TransactionHasInvalidRing(
"duplicate ring member",
))
} else {
Ok(())
}
}),
_ => panic!("Only ToKey is allowed this should have already been checked!"),
}
} else {
Ok(())
}
}
/// Checks that the inputs are sorted by key image, in descending lexicographic order (enforced from v7).
fn check_inputs_sorted(inputs: &[Input], hf: &HardFork) -> Result<(), ConsensusError> {
let get_ki = |inp: &Input| match inp {
Input::ToKey { key_image, .. } => key_image.compress().to_bytes(),
_ => panic!("Only ToKey is allowed this should have already been checked!"),
};
if hf >= &HardFork::V7 {
for inps in inputs.windows(2) {
match get_ki(&inps[0]).cmp(&get_ki(&inps[1])) {
                // Key images must be in strictly descending lexicographic order.
                Ordering::Greater => (),
_ => {
return Err(ConsensusError::TransactionHasInvalidInput(
"input not ToKey",
"Inputs not ordered by key image!",
))
}
}
}
Ok(DecoyInfo {
mixable,
not_mixable,
min_decoys,
max_decoys,
})
}
/// Checks the decoys are allowed.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#minimum-decoys
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#equal-number-of-decoys
pub fn check_decoy_info(&self, hf: &HardFork) -> Result<(), ConsensusError> {
if hf == &HardFork::V15 {
// Hard-fork 15 allows both v14 and v16 rules
return self
.check_decoy_info(&HardFork::V14)
.or_else(|_| self.check_decoy_info(&HardFork::V16));
}
let current_minimum_decoys = minimum_decoys(hf);
if self.min_decoys < current_minimum_decoys {
if self.not_mixable == 0 {
return Err(ConsensusError::TransactionHasInvalidRing(
"input does not have enough decoys",
));
}
if self.mixable > 1 {
return Err(ConsensusError::TransactionHasInvalidInput(
"more than one mixable input with unmixable inputs",
));
}
}
if hf >= &HardFork::V8 && self.min_decoys != current_minimum_decoys {
return Err(ConsensusError::TransactionHasInvalidRing(
"one ring does not have the minimum number of decoys",
));
}
if hf >= &HardFork::V12 && self.min_decoys != self.max_decoys {
return Err(ConsensusError::TransactionHasInvalidRing(
"rings do not have the same number of members",
));
}
Ok(())
}
/// Checks the version is in the allowed range.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#version
pub fn check_tx_version(&self, version: u64, hf: &HardFork) -> Result<(), ConsensusError> {
if version == 0 {
return Err(ConsensusError::TransactionVersionInvalid);
}
let max = max_tx_version(hf);
if version > max {
return Err(ConsensusError::TransactionVersionInvalid);
}
// TODO: Doc is wrong here
let min = min_tx_version(hf);
if version < min && self.not_mixable != 0 {
return Err(ConsensusError::TransactionVersionInvalid);
}
} else {
Ok(())
}
}
fn max_tx_version(hf: &HardFork) -> u64 {
if hf <= &HardFork::V3 {
1
fn check_10_block_lock(
ring_member_info: &TxRingMembersInfo,
current_chain_height: u64,
hf: &HardFork,
) -> Result<(), ConsensusError> {
if hf >= &HardFork::V12 {
if ring_member_info.youngest_used_out_height + 10 > current_chain_height {
Err(ConsensusError::TransactionHasInvalidRing(
"tx has one ring member which is too younge",
))
} else {
Ok(())
}
} else {
2
}
}
fn min_tx_version(hf: &HardFork) -> u64 {
if hf >= &HardFork::V6 {
2
} else {
1
}
}
/// Returns the minimum amount of decoys for a hard-fork.
/// **There are exceptions to this always being the minimum decoys**
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-amount-of-decoys
fn minimum_decoys(hf: &HardFork) -> usize {
use HardFork::*;
match hf {
V1 => panic!("hard-fork 1 does not use these rules!"),
V2 | V3 | V4 | V5 => 2,
V6 => 4,
V7 => 6,
V8 | V9 | V10 | V11 | V12 | V13 | V14 => 10,
_ => 15,
Ok(())
}
}
/// Sums the inputs checking for overflow.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/pre_rct.html#inputs-and-outputs-must-not-overflow
pub(crate) fn sum_inputs_v1(inputs: &[Input]) -> Result<u64, ConsensusError> {
fn sum_inputs_v1(inputs: &[Input]) -> Result<u64, ConsensusError> {
let mut sum: u64 = 0;
for inp in inputs {
match inp {
@ -207,38 +203,38 @@ pub(crate) fn sum_inputs_v1(inputs: &[Input]) -> Result<u64, ConsensusError> {
Ok(sum)
}
/// Checks the inputs key images for torsion and for duplicates in the transaction.
///
/// The `spent_kis` parameter is not meant to be a complete list of key images, just a list of related transactions
/// key images, for example transactions in a block. The chain will be checked for duplicates later.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#unique-key-image
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#torsion-free-key-image
pub(crate) fn check_key_images(
pub fn check_inputs(
inputs: &[Input],
spent_kis: &mut HashSet<[u8; 32]>,
) -> Result<(), ConsensusError> {
for inp in inputs {
match inp {
Input::ToKey { key_image, .. } => {
if !key_image.is_torsion_free() {
return Err(ConsensusError::TransactionHasInvalidInput(
"key image has torsion",
));
}
if !spent_kis.insert(key_image.compress().to_bytes()) {
return Err(ConsensusError::TransactionHasInvalidInput(
"key image already spent",
));
}
}
_ => {
return Err(ConsensusError::TransactionHasInvalidInput(
"Input not ToKey",
))
}
}
ring_member_info: &TxRingMembersInfo,
current_chain_height: u64,
hf: &HardFork,
tx_version: &TxVersion,
spent_kis: Arc<std::sync::Mutex<HashSet<[u8; 32]>>>,
) -> Result<u64, ConsensusError> {
if inputs.is_empty() {
return Err(ConsensusError::TransactionHasInvalidInput("no inputs"));
}
Ok(())
check_10_block_lock(ring_member_info, current_chain_height, hf)?;
if let Some(decoy_info) = &ring_member_info.decoy_info {
check_decoy_info(decoy_info, hf)?;
}
for input in inputs {
check_input_type(input)?;
check_input_has_decoys(input)?;
check_ring_members_unique(input, hf)?;
let mut spent_kis_lock = spent_kis.lock().unwrap();
check_key_images(input, &mut spent_kis_lock)?;
}
check_inputs_sorted(inputs, hf)?;
match tx_version {
TxVersion::RingSignatures => sum_inputs_v1(inputs),
_ => panic!("TODO: RCT"),
}
}

View file

@ -2,8 +2,11 @@ use std::sync::OnceLock;
use monero_serai::transaction::Output;
use crate::{hardforks::HardFork, helper::check_point, transactions::TxVersion, ConsensusError};
use crate::{helper::check_point, transactions::TxVersion, ConsensusError, HardFork};
/// Decomposed amount table.
///
/// TODO: manually list each amount
static DECOMPOSED_AMOUNTS: OnceLock<[u64; 172]> = OnceLock::new();
pub(crate) fn decomposed_amounts() -> &'static [u64; 172] {
@ -23,7 +26,7 @@ pub(crate) fn decomposed_amounts() -> &'static [u64; 172] {
/// Checks the output keys are canonical points.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#output-keys-canonical
pub(crate) fn check_output_keys(outputs: &[Output]) -> Result<(), ConsensusError> {
fn check_output_keys(outputs: &[Output]) -> Result<(), ConsensusError> {
for out in outputs {
if !check_point(&out.key) {
return Err(ConsensusError::TransactionInvalidOutput(
@ -37,7 +40,10 @@ pub(crate) fn check_output_keys(outputs: &[Output]) -> Result<(), ConsensusError
/// Checks the output types are allowed.
///
/// This is also used during miner-tx verification.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#output-type
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#output-type
pub(crate) fn check_output_types(outputs: &[Output], hf: &HardFork) -> Result<(), ConsensusError> {
if hf == &HardFork::V15 {
for outs in outputs.windows(2) {
@ -66,7 +72,10 @@ pub(crate) fn check_output_types(outputs: &[Output], hf: &HardFork) -> Result<()
/// Checks that an output amount is decomposed.
///
/// This is also used during miner tx verification.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/pre_rct.html#output-amount
/// https://cuprate.github.io/monero-book/consensus_rules/blocks/miner_tx.html#output-amounts
pub(crate) fn is_decomposed_amount(amount: u64) -> bool {
decomposed_amounts().binary_search(&amount).is_ok()
}
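For example, the decomposed amounts are exactly the single-digit multiples of powers of ten:

#[test]
fn decomposed_amounts_are_single_digit_notes() {
    assert!(is_decomposed_amount(20_000_000_000)); // 2 * 10^10
    assert!(!is_decomposed_amount(21_000_000_000)); // 2.1 * 10^10 is not a valid "note"
}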
@ -96,7 +105,7 @@ fn check_output_amount_v1(amount: u64, hf: &HardFork) -> Result<(), ConsensusErr
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/pre_rct.html#inputs-and-outputs-must-not-overflow
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/pre_rct.html#output-amount
pub(crate) fn sum_outputs_v1(outputs: &[Output], hf: &HardFork) -> Result<u64, ConsensusError> {
fn sum_outputs_v1(outputs: &[Output], hf: &HardFork) -> Result<u64, ConsensusError> {
let mut sum: u64 = 0;
for out in outputs {
@ -111,3 +120,18 @@ pub(crate) fn sum_outputs_v1(outputs: &[Output], hf: &HardFork) -> Result<u64, C
Ok(sum)
}
/// Checks the outputs against all output consensus rules, returning the sum of the output amounts.
pub fn check_outputs(
outputs: &[Output],
hf: &HardFork,
tx_version: &TxVersion,
) -> Result<u64, ConsensusError> {
    check_output_types(outputs, hf)?;
    check_output_keys(outputs)?;
    match tx_version {
        TxVersion::RingSignatures => sum_outputs_v1(outputs, hf),
        _ => todo!("RingCT"),
    }
}

View file

@ -0,0 +1,331 @@
//! # Rings
//!
//! This module contains [`TxRingMembersInfo`], a struct built from blockchain information about the
//! ring members of a transaction's inputs. This module does minimal consensus checks, only when needed,
//! and should not be relied upon to do any.
//!
use std::{
cmp::{max, min},
collections::{HashMap, HashSet},
sync::Arc,
};
use curve25519_dalek::EdwardsPoint;
use monero_serai::{
ringct::{mlsag::RingMatrix, RctType},
transaction::{Input, Timelock, Transaction},
};
use tower::ServiceExt;
use crate::{
transactions::TransactionVerificationData, ConsensusError, Database, DatabaseRequest,
DatabaseResponse, HardFork, OutputOnChain,
};
/// Gets the absolute offsets from the relative offsets.
///
/// This function will return an error if the relative offsets are empty.
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#inputs-must-have-decoys
fn get_absolute_offsets(relative_offsets: &[u64]) -> Result<Vec<u64>, ConsensusError> {
if relative_offsets.is_empty() {
return Err(ConsensusError::TransactionHasInvalidRing(
"ring has no members",
));
}
let mut offsets = Vec::with_capacity(relative_offsets.len());
offsets.push(relative_offsets[0]);
for i in 1..relative_offsets.len() {
offsets.push(offsets[i - 1] + relative_offsets[i]);
}
Ok(offsets)
}
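A worked example of the decoding (each relative offset is a delta from the previous absolute index):

#[test]
fn relative_to_absolute_offsets() {
    assert_eq!(get_absolute_offsets(&[3, 1, 2]).unwrap(), vec![3, 4, 6]);
    assert!(get_absolute_offsets(&[]).is_err());
}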
/// Inserts the outputs that are needed to verify the transaction inputs into the provided HashMap.
///
/// This will error if the inputs are empty
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#no-empty-inputs
///
pub fn insert_ring_member_ids(
inputs: &[Input],
output_ids: &mut HashMap<u64, HashSet<u64>>,
) -> Result<(), ConsensusError> {
if inputs.is_empty() {
return Err(ConsensusError::TransactionHasInvalidInput(
"transaction has no inputs",
));
}
for input in inputs {
match input {
Input::ToKey {
amount,
key_offsets,
..
} => output_ids
.entry(amount.unwrap_or(0))
.or_insert_with(HashSet::new)
.extend(get_absolute_offsets(key_offsets)?),
// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#input-type
_ => {
return Err(ConsensusError::TransactionHasInvalidInput(
"input not ToKey",
))
}
}
}
Ok(())
}
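Continuing that example, a hypothetical RingCT input (no explicit amount) with offsets `[3, 1, 2]` lands its absolute indexes in the amount-0 bucket:

#[test]
fn ring_member_ids_are_grouped_by_amount() {
    use curve25519_dalek::constants::ED25519_BASEPOINT_POINT;
    let input = Input::ToKey {
        amount: None, // RingCT inputs are grouped under amount 0
        key_offsets: vec![3, 1, 2],
        key_image: ED25519_BASEPOINT_POINT, // stand-in key image
    };
    let mut output_ids = HashMap::new();
    insert_ring_member_ids(&[input], &mut output_ids).unwrap();
    assert_eq!(output_ids[&0], HashSet::from([3, 4, 6]));
}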
/// Represents the ring members of all the inputs.
#[derive(Debug)]
pub enum Rings {
/// Legacy, pre-ringCT, ring.
Legacy(Vec<Vec<EdwardsPoint>>),
/// TODO:
RingCT,
}
impl Rings {
/// Builds the rings for the transaction inputs, from the given outputs.
pub fn new(outputs: Vec<Vec<&OutputOnChain>>, rct_type: RctType) -> Rings {
match rct_type {
RctType::Null => Rings::Legacy(
outputs
.into_iter()
.map(|inp_outs| inp_outs.into_iter().map(|out| out.key).collect())
.collect(),
),
_ => todo!("RingCT"),
}
}
}
/// Information on the outputs the transaction is referencing for inputs (ring members).
#[derive(Debug)]
pub struct TxRingMembersInfo {
pub rings: Rings,
    /// Information on the structure of the decoys, will be [`None`] for txs verified under [`HardFork::V1`]
pub decoy_info: Option<DecoyInfo>,
pub youngest_used_out_height: u64,
pub time_locked_outs: Vec<Timelock>,
}
impl TxRingMembersInfo {
pub fn new(
used_outs: Vec<Vec<&OutputOnChain>>,
decoy_info: Option<DecoyInfo>,
rct_type: RctType,
) -> TxRingMembersInfo {
TxRingMembersInfo {
youngest_used_out_height: used_outs
.iter()
.map(|inp_outs| {
inp_outs
.iter()
.map(|out| out.height)
.max()
.expect("Input must have ring members")
})
.max()
.expect("Tx must have inputs"),
time_locked_outs: used_outs
.iter()
.flat_map(|inp_outs| {
inp_outs
.iter()
.filter_map(|out| match out.time_lock {
Timelock::None => None,
lock => Some(lock),
})
.collect::<Vec<_>>()
})
.collect(),
rings: Rings::new(used_outs, rct_type),
decoy_info,
}
}
}
/// Get the ring members for the inputs from the outputs on the chain.
fn get_ring_members_for_inputs<'a>(
outputs: &'a HashMap<u64, HashMap<u64, OutputOnChain>>,
inputs: &[Input],
) -> Result<Vec<Vec<&'a OutputOnChain>>, ConsensusError> {
inputs
.iter()
.map(|inp| match inp {
Input::ToKey {
amount,
key_offsets,
..
} => {
let offsets = get_absolute_offsets(key_offsets)?;
Ok(offsets
.iter()
.map(|offset| {
// get the hashmap for this amount.
outputs
.get(&amount.unwrap_or(0))
// get output at the index from the amount hashmap.
.and_then(|amount_map| amount_map.get(offset))
.ok_or(ConsensusError::TransactionHasInvalidRing(
"ring member not in database",
))
})
.collect::<Result<_, ConsensusError>>()?)
}
_ => Err(ConsensusError::TransactionHasInvalidInput(
"input not ToKey",
)),
})
.collect::<Result<_, ConsensusError>>()
}
/// Fills the `rings_member_info` field on the given [`TransactionVerificationData`]s.
pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
txs_verification_data: &[Arc<TransactionVerificationData>],
hf: &HardFork,
mut database: D,
) -> Result<(), ConsensusError> {
let mut output_ids = HashMap::new();
for tx_v_data in txs_verification_data.iter() {
insert_ring_member_ids(&tx_v_data.tx.prefix.inputs, &mut output_ids)?;
}
let DatabaseResponse::Outputs(outputs) = database
.ready()
.await?
.call(DatabaseRequest::Outputs(output_ids))
.await?
else {
panic!("Database sent incorrect response!")
};
for tx_v_data in txs_verification_data {
let ring_members_for_tx =
get_ring_members_for_inputs(&outputs, &tx_v_data.tx.prefix.inputs)?;
let decoy_info = if hf != &HardFork::V1 {
Some(DecoyInfo::new(&tx_v_data.tx.prefix.inputs, hf, database.clone()).await?)
} else {
None
};
        // Temporarily acquire the mutex lock to add the ring member info.
let _ = tx_v_data
.rings_member_info
.lock()
.unwrap()
.insert(TxRingMembersInfo::new(
ring_members_for_tx,
decoy_info,
tx_v_data.tx.rct_signatures.rct_type(),
));
}
Ok(())
}
/// A struct holding information about the inputs and their decoys.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html
#[derive(Debug)]
pub struct DecoyInfo {
/// The number of inputs that have enough outputs on the chain to mix with.
pub mixable: usize,
/// The number of inputs that don't have enough outputs on the chain to mix with.
pub not_mixable: usize,
/// The minimum amount of decoys used in the transaction.
pub min_decoys: usize,
/// The maximum amount of decoys used in the transaction.
pub max_decoys: usize,
}
impl DecoyInfo {
/// Creates a new [`DecoyInfo`] struct relating to the passed in inputs.
///
/// Do not rely on this function to do consensus checks!
///
pub async fn new<D: Database>(
inputs: &[Input],
hf: &HardFork,
mut database: D,
) -> Result<DecoyInfo, ConsensusError> {
let mut min_decoys = usize::MAX;
let mut max_decoys = usize::MIN;
let mut mixable = 0;
let mut not_mixable = 0;
let minimum_decoys = minimum_decoys(hf);
for inp in inputs {
match inp {
Input::ToKey {
amount,
key_offsets,
..
} => {
if let Some(amt) = *amount {
let DatabaseResponse::NumberOutputsWithAmount(numb_of_outs) = database
.ready()
.await?
.call(DatabaseRequest::NumberOutputsWithAmount(amt))
.await?
else {
panic!("Database sent incorrect response!");
};
// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#mixable-and-unmixable-inputs
if numb_of_outs <= minimum_decoys && amt != 0 {
not_mixable += 1;
} else {
mixable += 1;
}
} else {
mixable += 1;
}
let numb_decoys = key_offsets
.len()
.checked_sub(1)
.ok_or(ConsensusError::TransactionHasInvalidRing("ring is empty"))?;
// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-and-maximum-decoys-used
min_decoys = min(min_decoys, numb_decoys);
max_decoys = max(max_decoys, numb_decoys);
}
_ => {
return Err(ConsensusError::TransactionHasInvalidInput(
"input not ToKey",
))
}
}
}
Ok(DecoyInfo {
mixable,
not_mixable,
min_decoys,
max_decoys,
})
}
}
/// Returns the minimum amount of decoys for a hard-fork.
/// **There are exceptions to this always being the minimum decoys**
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-amount-of-decoys
pub(crate) fn minimum_decoys(hf: &HardFork) -> usize {
use HardFork::*;
match hf {
V1 => panic!("hard-fork 1 does not use these rules!"),
V2 | V3 | V4 | V5 => 2,
V6 => 4,
V7 => 6,
V8 | V9 | V10 | V11 | V12 | V13 | V14 => 10,
_ => 15,
}
}
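Quick sanity checks on the table above (a sketch; each value is the enforced ring size minus one):

#[test]
fn minimum_decoys_per_hardfork() {
    assert_eq!(minimum_decoys(&HardFork::V7), 6);
    assert_eq!(minimum_decoys(&HardFork::V14), 10);
    assert_eq!(minimum_decoys(&HardFork::V16), 15);
}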

View file

@ -1,183 +0,0 @@
use std::collections::{HashMap, HashSet};
use curve25519_dalek::EdwardsPoint;
use monero_serai::{
ringct::{mlsag::RingMatrix, RctType},
transaction::{Input, Transaction},
};
use tower::ServiceExt;
use crate::{hardforks::HardFork, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
mod ring_sigs;
pub(crate) use ring_sigs::verify_inputs_signatures;
/// Gets the absolute offsets from the relative offsets.
/// This function will return an error if the relative offsets are empty or if the hf version is 6 or higher and
/// not all the ring members are unique.
///
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#inputs-must-have-decoys
/// TODO: change the URL on this link \/
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#unique-inputs
fn get_absolute_offsets(
relative_offsets: &[u64],
hf: &HardFork,
) -> Result<Vec<u64>, ConsensusError> {
if relative_offsets.is_empty() {
return Err(ConsensusError::TransactionHasInvalidRing(
"ring has no members",
));
}
let mut offsets = Vec::with_capacity(relative_offsets.len());
offsets.push(relative_offsets[0]);
for i in 1..relative_offsets.len() {
if relative_offsets[i] == 0 && hf >= &HardFork::V6 {
// all ring members must be unique after v6
return Err(ConsensusError::TransactionHasInvalidRing(
"ring has duplicate member",
));
}
offsets.push(relative_offsets[i - 1] + relative_offsets[i]);
}
Ok(offsets)
}
/// Returns the outputs that are needed to verify the transaction inputs.
///
/// The returned value is a hashmap with:
/// keys = amount
/// values = hashset of amount idxs
///
pub fn get_ring_member_ids(
tx: &Transaction,
hf: &HardFork,
) -> Result<HashMap<u64, HashSet<u64>>, ConsensusError> {
let mut members = HashMap::with_capacity(tx.prefix.inputs.len());
for input in &tx.prefix.inputs {
match input {
Input::ToKey {
amount,
key_offsets,
..
} => members
.entry(amount.unwrap_or(0))
.or_insert_with(HashSet::new)
.extend(get_absolute_offsets(key_offsets, hf)?),
// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#input-type
_ => {
return Err(ConsensusError::TransactionHasInvalidInput(
"input not ToKey",
))
}
}
}
// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#no-empty-inputs
if members.is_empty() {
return Err(ConsensusError::TransactionHasInvalidInput(
"transaction has no inputs",
));
}
Ok(members)
}
/// Represents the ring members of the inputs.
pub enum Rings {
/// Legacy, pre-ringCT, ring.
Legacy(Vec<Vec<EdwardsPoint>>),
/// TODO:
RingCT,
}
impl Rings {
/// Builds the rings for the transaction inputs, from the outputs.
pub fn new(
outputs: &HashMap<u64, HashMap<u64, [EdwardsPoint; 2]>>,
inputs: &[Input],
rct_type: RctType,
hf: &HardFork,
) -> Result<Rings, ConsensusError> {
match rct_type {
RctType::Null => {
let legacy_ring = inputs
.iter()
.map(|inp| match inp {
Input::ToKey {
amount,
key_offsets,
..
} => {
let offsets = get_absolute_offsets(key_offsets, hf)?;
Ok(offsets
.iter()
.map(|offset| {
// get the hashmap for this amount.
outputs
.get(&amount.unwrap_or(0))
// get output at the index from the amount hashmap.
.and_then(|amount_map| amount_map.get(offset))
// this is a legacy ring we only need the one time key.
.and_then(|out| Some(out[0]))
.ok_or(ConsensusError::TransactionHasInvalidRing(
"ring member not in database",
))
})
.collect::<Result<_, ConsensusError>>()?)
}
_ => Err(ConsensusError::TransactionHasInvalidInput(
"input not ToKey",
)),
})
.collect::<Result<_, ConsensusError>>()?;
Ok(Rings::Legacy(legacy_ring))
}
_ => todo!("RingCT"),
}
}
}
/// Get [`Rings`] aka the outputs a transaction references for each transaction.
pub async fn batch_get_rings<D: Database>(
txs: &[Transaction],
hf: &HardFork,
database: D,
) -> Result<Vec<Rings>, ConsensusError> {
let mut output_ids = HashMap::new();
for tx in txs {
let mut tx_out_ids = get_ring_member_ids(tx, hf)?;
for (amount, idxs) in tx_out_ids.drain() {
output_ids
.entry(amount)
.or_insert_with(HashSet::new)
.extend(idxs);
}
}
let DatabaseResponse::Outputs(outputs) = database
.oneshot(DatabaseRequest::Outputs(output_ids))
.await?
else {
panic!("Database sent incorrect response!")
};
let mut rings = Vec::with_capacity(txs.len());
for tx in txs {
rings.push(Rings::new(
&outputs,
&tx.prefix.inputs,
tx.rct_signatures.rct_type(),
hf,
)?);
}
Ok(rings)
}

View file

@ -0,0 +1,29 @@
use std::sync::Arc;
use monero_serai::transaction::Transaction;
use multiexp::BatchVerifier as CoreBatchVerifier;
use crate::{transactions::ring::Rings, ConsensusError};
mod ring_sigs;
#[derive(Clone)]
pub struct BatchVerifier {
batch_verifier: Arc<std::sync::Mutex<CoreBatchVerifier<u64, dalek_ff_group::EdwardsPoint>>>,
}
pub struct BatchVerifierHandle {
batch_verifier: BatchVerifier,
}
pub fn verify_signatures(tx: &Transaction, rings: &Rings) -> Result<(), ConsensusError> {
match rings {
Rings::Legacy(_) => ring_sigs::verify_inputs_signatures(
&tx.prefix.inputs,
&tx.signatures,
rings,
&tx.signature_hash(),
),
_ => panic!("TODO: RCT"),
}
}

View file

@ -7,6 +7,7 @@
//! and this happens during ring signature verification in monero-serai.
//!
use monero_serai::{ring_signatures::RingSignature, transaction::Input};
use rayon::prelude::*;
use super::Rings;
use crate::ConsensusError;
@ -16,9 +17,9 @@ use crate::ConsensusError;
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/pre_rct.html#the-ring-signature-must-be-valid
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/pre_rct.html#amount-of-ring-signatures
pub fn verify_inputs_signatures(
rings: &Rings,
inputs: &[Input],
signatures: &[RingSignature],
rings: &Rings,
tx_sig_hash: &[u8; 32],
) -> Result<(), ConsensusError> {
match rings {
@ -30,17 +31,22 @@ pub fn verify_inputs_signatures(
));
}
for ((input, ring), sig) in inputs.iter().zip(rings).zip(signatures) {
let Input::ToKey { key_image, .. } = input else {
panic!("How did we build a ring with no decoys?");
};
inputs
.par_iter()
.zip(rings)
.zip(signatures)
.try_for_each(|((input, ring), sig)| {
let Input::ToKey { key_image, .. } = input else {
panic!("How did we build a ring with no decoys?");
};
if !sig.verify_ring_signature(tx_sig_hash, ring, key_image) {
return Err(ConsensusError::TransactionSignatureInvalid(
"Invalid ring signature",
));
}
}
if !sig.verify_ring_signature(tx_sig_hash, ring, key_image) {
return Err(ConsensusError::TransactionSignatureInvalid(
"Invalid ring signature",
));
}
Ok(())
})?;
}
_ => panic!("tried to verify v1 tx with a non v1 ring"),
}

View file

@ -2,7 +2,7 @@ use std::cmp::min;
use monero_serai::transaction::Timelock;
use crate::{block::DifficultyCache, hardforks::HardFork, helper::current_time};
use crate::{context::difficulty::DifficultyCache, helper::current_time, HardFork};
const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;

View file

@ -1,125 +0,0 @@
use futures::join;
use monero_serai::{block::Block, transaction::Transaction};
use tower::ServiceExt;
use tracing::instrument;
use crate::{
block::{
difficulty::{DifficultyCache, DifficultyCacheConfig},
weight::{BlockWeightsCache, BlockWeightsCacheConfig},
},
hardforks::{HardForkConfig, HardForkState},
ConsensusError, Database, DatabaseRequest, DatabaseResponse,
};
pub struct Config {
hard_fork_cfg: HardForkConfig,
difficulty_cfg: DifficultyCacheConfig,
weights_config: BlockWeightsCacheConfig,
}
impl Config {
pub fn main_net() -> Config {
Config {
hard_fork_cfg: HardForkConfig::main_net(),
difficulty_cfg: DifficultyCacheConfig::main_net(),
weights_config: BlockWeightsCacheConfig::main_net(),
}
}
}
#[derive(Clone)]
struct State {
block_weight: BlockWeightsCache,
difficulty: DifficultyCache,
hard_fork: HardForkState,
chain_height: u64,
top_hash: [u8; 32],
}
impl State {
pub async fn init<D: Database + Clone>(
config: Config,
mut database: D,
) -> Result<State, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response")
};
Self::init_at_chain_height(config, chain_height, database).await
}
pub async fn init_at_chain_height<D: Database + Clone>(
config: Config,
chain_height: u64,
mut database: D,
) -> Result<State, ConsensusError> {
let DatabaseResponse::BlockHash(top_hash) = database
.ready()
.await?
.call(DatabaseRequest::BlockHash(chain_height - 1))
.await?
else {
panic!("Database sent incorrect response")
};
let (block_weight, difficulty, hard_fork) = join!(
BlockWeightsCache::init_from_chain_height(
chain_height,
config.weights_config,
database.clone()
),
DifficultyCache::init_from_chain_height(
chain_height,
config.difficulty_cfg,
database.clone()
),
HardForkState::init_from_chain_height(config.hard_fork_cfg, chain_height, database)
);
Ok(State {
block_weight: block_weight?,
difficulty: difficulty?,
hard_fork: hard_fork?,
chain_height,
top_hash,
})
}
}
pub struct Verifier {
state: State,
}
impl Verifier {
pub async fn init<D: Database + Clone>(
config: Config,
mut database: D,
) -> Result<Verifier, ConsensusError> {
let DatabaseResponse::ChainHeight(chain_height) = database
.ready()
.await?
.call(DatabaseRequest::ChainHeight)
.await?
else {
panic!("Database sent incorrect response")
};
Self::init_at_chain_height(config, chain_height, database).await
}
pub async fn init_at_chain_height<D: Database + Clone>(
config: Config,
chain_height: u64,
database: D,
) -> Result<Verifier, ConsensusError> {
Ok(Verifier {
state: State::init_at_chain_height(config, chain_height, database).await?,
})
}
}