cuprate/consensus/src/transactions.rs

268 lines
7.6 KiB
Rust
Raw Normal View History

2023-10-23 18:14:40 +00:00
use std::{
collections::HashSet,
future::Future,
ops::Deref,
2023-10-23 18:14:40 +00:00
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::FutureExt;
2023-10-20 00:04:26 +00:00
use monero_serai::transaction::Transaction;
2023-10-23 18:14:40 +00:00
use rayon::prelude::*;
use tower::{Service, ServiceExt};
2023-10-23 18:14:40 +00:00
use tracing::instrument;
2023-10-20 00:04:26 +00:00
use monero_consensus::{
transactions::{
check_transaction_contextual, check_transaction_semantic, RingCTError, TransactionError,
TxRingMembersInfo,
},
ConsensusError, HardFork, TxVersion,
};
use crate::{
batch_verifier::MultiThreadedBatchVerifier, context::ReOrgToken, helper::rayon_spawn_async,
Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError,
};
2023-10-20 00:04:26 +00:00
mod contextual_data;
2023-10-20 00:04:26 +00:00
/// Data needed to verify a transaction.
///
2023-10-23 18:14:40 +00:00
#[derive(Debug)]
2023-10-20 00:04:26 +00:00
pub struct TransactionVerificationData {
2023-10-23 18:14:40 +00:00
pub tx: Transaction,
pub version: TxVersion,
pub tx_blob: Vec<u8>,
pub tx_weight: usize,
pub fee: u64,
pub tx_hash: [u8; 32],
/// We put this behind a mutex as the information is not constant and is based of past outputs idxs
/// which could change on re-orgs.
rings_member_info: std::sync::Mutex<Option<(TxRingMembersInfo, ReOrgToken)>>,
2023-10-20 00:04:26 +00:00
}
impl TransactionVerificationData {
pub fn new(
tx: Transaction,
hf: &HardFork,
verifier: Arc<MultiThreadedBatchVerifier>,
) -> Result<TransactionVerificationData, ConsensusError> {
let tx_hash = tx.hash();
let fee = verifier.queue_statement(|verifier| {
check_transaction_semantic(&tx, &tx_hash, hf, verifier)
.map_err(ConsensusError::Transaction)
})?;
2023-10-20 00:04:26 +00:00
Ok(TransactionVerificationData {
tx_hash,
2023-10-23 18:14:40 +00:00
tx_blob: tx.serialize(),
2023-10-20 00:04:26 +00:00
tx_weight: tx.weight(),
fee,
2023-10-23 18:14:40 +00:00
rings_member_info: std::sync::Mutex::new(None),
version: TxVersion::from_raw(tx.prefix.version)
.ok_or(TransactionError::TransactionVersionInvalid)?,
2023-10-20 00:04:26 +00:00
tx,
})
}
2023-10-23 18:14:40 +00:00
}
2023-10-20 00:04:26 +00:00
2023-10-23 18:14:40 +00:00
pub enum VerifyTxRequest {
/// Verifies transactions in the context of a block.
Block {
txs: Vec<Arc<TransactionVerificationData>>,
current_chain_height: u64,
time_for_time_lock: u64,
2023-10-23 18:14:40 +00:00
hf: HardFork,
re_org_token: ReOrgToken,
2023-10-23 18:14:40 +00:00
},
/// Batches the setup of [`TransactionVerificationData`], does *some* verification, you need to call [`VerifyTxRequest::Block`]
/// with the returned data.
BatchSetup {
2023-10-20 00:04:26 +00:00
txs: Vec<Transaction>,
2023-10-23 18:14:40 +00:00
hf: HardFork,
re_org_token: ReOrgToken,
2023-10-23 18:14:40 +00:00
},
}
pub enum VerifyTxResponse {
BatchSetupOk(Vec<Arc<TransactionVerificationData>>),
Ok,
}
#[derive(Clone)]
pub struct TxVerifierService<D: Clone> {
database: D,
}
impl<D> TxVerifierService<D>
where
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
pub fn new(database: D) -> TxVerifierService<D> {
TxVerifierService { database }
}
}
impl<D> Service<VerifyTxRequest> for TxVerifierService<D>
where
D: Database + Clone + Send + Sync + 'static,
D::Future: Send + 'static,
{
type Response = VerifyTxResponse;
type Error = ExtendedConsensusError;
2023-10-23 18:14:40 +00:00
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.database.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: VerifyTxRequest) -> Self::Future {
let database = self.database.clone();
match req {
VerifyTxRequest::Block {
txs,
current_chain_height,
time_for_time_lock,
2023-10-23 18:14:40 +00:00
hf,
re_org_token,
} => verify_transactions_for_block(
database,
txs,
current_chain_height,
time_for_time_lock,
hf,
re_org_token,
)
.boxed(),
VerifyTxRequest::BatchSetup {
2023-10-23 18:14:40 +00:00
txs,
hf,
re_org_token,
} => batch_setup_transactions(database, txs, hf, re_org_token).boxed(),
2023-10-23 18:14:40 +00:00
}
}
}
2023-10-26 02:16:03 +00:00
async fn batch_setup_transactions<D>(
database: D,
txs: Vec<Transaction>,
hf: HardFork,
re_org_token: ReOrgToken,
) -> Result<VerifyTxResponse, ExtendedConsensusError>
where
D: Database + Clone + Sync + Send + 'static,
2023-10-26 02:16:03 +00:00
{
let batch_verifier = Arc::new(MultiThreadedBatchVerifier::new(rayon::current_num_threads()));
let cloned_verifier = batch_verifier.clone();
2023-10-26 02:16:03 +00:00
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
let txs = rayon_spawn_async(move || {
2023-10-26 02:16:03 +00:00
txs.into_par_iter()
.map(|tx| {
Ok(Arc::new(TransactionVerificationData::new(
tx,
&hf,
cloned_verifier.clone(),
)?))
})
2023-10-26 02:16:03 +00:00
.collect::<Result<Vec<_>, ConsensusError>>()
})
.await?;
2023-10-26 02:16:03 +00:00
if !Arc::into_inner(batch_verifier).unwrap().verify() {
Err(ConsensusError::Transaction(TransactionError::RingCTError(
RingCTError::BulletproofsRangeInvalid,
)))?
}
contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?;
2023-10-23 18:14:40 +00:00
Ok(VerifyTxResponse::BatchSetupOk(txs))
}
#[instrument(name = "verify_txs", skip_all, level = "info")]
async fn verify_transactions_for_block<D>(
database: D,
txs: Vec<Arc<TransactionVerificationData>>,
current_chain_height: u64,
time_for_time_lock: u64,
2023-10-23 18:14:40 +00:00
hf: HardFork,
re_org_token: ReOrgToken,
) -> Result<VerifyTxResponse, ExtendedConsensusError>
2023-10-23 18:14:40 +00:00
where
D: Database + Clone + Sync + Send + 'static,
{
2023-10-24 01:25:11 +00:00
tracing::debug!("Verifying transactions for block, amount: {}", txs.len());
2023-10-23 18:14:40 +00:00
contextual_data::batch_refresh_ring_member_info(&txs, &hf, re_org_token, database.clone())
.await?;
2023-10-23 18:14:40 +00:00
let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));
let cloned_spent_kis = spent_kis.clone();
rayon_spawn_async(move || {
2023-10-23 18:14:40 +00:00
txs.par_iter().try_for_each(|tx| {
verify_transaction_for_block(
tx,
current_chain_height,
time_for_time_lock,
hf,
cloned_spent_kis.clone(),
)
2023-10-23 18:14:40 +00:00
})
})
.await?;
2023-10-23 18:14:40 +00:00
let DatabaseResponse::CheckKIsNotSpent(kis_spent) = database
.oneshot(DatabaseRequest::CheckKIsNotSpent(
Arc::into_inner(spent_kis).unwrap().into_inner().unwrap(),
))
.await?
else {
panic!("Database sent incorrect response!");
};
if kis_spent {
Err(ConsensusError::Transaction(TransactionError::KeyImageSpent))?;
}
2023-10-23 18:14:40 +00:00
Ok(VerifyTxResponse::Ok)
}
fn verify_transaction_for_block(
tx_verification_data: &TransactionVerificationData,
current_chain_height: u64,
time_for_time_lock: u64,
2023-10-23 18:14:40 +00:00
hf: HardFork,
spent_kis: Arc<std::sync::Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), ConsensusError> {
tracing::debug!(
2023-10-23 18:14:40 +00:00
"Verifying transaction: {}",
hex::encode(tx_verification_data.tx_hash)
);
let rings_member_info_lock = tx_verification_data.rings_member_info.lock().unwrap();
let rings_member_info = match rings_member_info_lock.deref() {
Some(rings_member_info) => rings_member_info,
None => panic!("rings_member_info needs to be set to be able to verify!"),
};
check_transaction_contextual(
&tx_verification_data.tx,
&rings_member_info.0,
2023-10-23 18:14:40 +00:00
current_chain_height,
time_for_time_lock,
2023-10-23 18:14:40 +00:00
&hf,
spent_kis,
)?;
Ok(())
}