From fc7b676f7b0310e7a03cda7166c441c93a037311 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Sun, 5 Nov 2023 18:44:41 +0000 Subject: [PATCH] consensus: add a tx pool trait. This means we don't have to rely on people giving the verifier the correct txs for a block. Also allows some speedup as we can put the fetching of outputs on a different task. --- consensus/src/bin/scan_chain.rs | 264 +++++------------- consensus/src/bin/tx_pool.rs | 241 ++++++++++++++++ consensus/src/block.rs | 171 +++--------- consensus/src/context.rs | 29 +- consensus/src/context/tokens.rs | 45 +++ consensus/src/lib.rs | 63 ++++- consensus/src/transactions.rs | 125 +++------ .../{ring.rs => contextual_data.rs} | 132 ++++++++- consensus/src/transactions/inputs.rs | 2 +- consensus/src/transactions/sigs.rs | 2 +- 10 files changed, 640 insertions(+), 434 deletions(-) create mode 100644 consensus/src/bin/tx_pool.rs create mode 100644 consensus/src/context/tokens.rs rename consensus/src/transactions/{ring.rs => contextual_data.rs} (71%) diff --git a/consensus/src/bin/scan_chain.rs b/consensus/src/bin/scan_chain.rs index 9408045..01c5ebb 100644 --- a/consensus/src/bin/scan_chain.rs +++ b/consensus/src/bin/scan_chain.rs @@ -1,28 +1,29 @@ #![cfg(feature = "binaries")] -use std::path::Path; use std::{ ops::Range, path::PathBuf, sync::{Arc, RwLock}, }; -use monero_serai::{block::Block, transaction::Transaction}; -use tower::ServiceExt; +use futures::{channel::mpsc, SinkExt, StreamExt}; +use monero_serai::block::Block; +use tokio::sync::oneshot; +use tower::{Service, ServiceExt}; use tracing::level_filters::LevelFilter; use cuprate_common::Network; use monero_consensus::{ context::{ContextConfig, UpdateBlockchainCacheRequest}, - initialize_verifier, + initialize_blockchain_context, initialize_verifier, rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig}, - transactions::VerifyTxRequest, - ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, - VerifiedBlockInformation, VerifyBlockRequest, VerifyTxResponse, + Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest, }; -const MAX_BLOCKS_IN_RANGE: u64 = 300; +mod tx_pool; + +const MAX_BLOCKS_IN_RANGE: u64 = 1000; const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250; /// Calls for a batch of blocks, returning the response and the time it took. @@ -35,24 +36,6 @@ async fn call_batch( .await } -fn simple_get_hf(height: u64) -> HardFork { - match height { - 0..=1009826 => HardFork::V1, - 1009827..=1141316 => HardFork::V2, - 1141317..=1220515 => HardFork::V3, - _ => todo!("rules past v3"), - } -} - -fn get_hf_height(hf: &HardFork) -> u64 { - match hf { - HardFork::V1 => 0, - HardFork::V2 => 1009827, - HardFork::V3 => 1141317, - _ => todo!("rules past v3"), - } -} - async fn update_cache_and_context( cache: &RwLock, context_updater: &mut Ctx, @@ -86,77 +69,54 @@ where Ok(()) } -/// Batches all transactions together when getting outs -/// -/// TODO: reduce the amount of parameters of this function -#[allow(clippy::too_many_arguments)] -async fn batch_txs_verify_blocks( - cache: &RwLock, - save_file: &Path, - txs: Vec>, - blocks: Vec, - tx_verifier: &mut Tx, - block_verifier: &mut Blk, - context_updater: &mut Ctx, - current_height: u64, - hf: HardFork, +async fn call_blocks( + mut new_tx_chan: tx_pool::NewTxChanSen, + mut block_chan: mpsc::Sender>, + start_height: u64, + chain_height: u64, + database: D, ) -> Result<(), tower::BoxError> where - Blk: tower::Service< - VerifyBlockRequest, - Response = VerifiedBlockInformation, - Error = ConsensusError, - >, - Tx: tower::Service, - Ctx: tower::Service, + D: Database + Clone + Send + Sync + 'static, + D::Future: Send + 'static, { - // This is an optimisation, we batch ALL the transactions together to get their outputs, saving a - // massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate - // is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value - // this will fail when this is an issue. - let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize]; - let txs = txs - .into_iter() - .enumerate() - .flat_map(|(block_id, block_batch_txs)| { - // block id is just this blocks position in the batch. - txs_per_block[block_id] = block_batch_txs.len(); - block_batch_txs - }) - .collect(); + let mut next_fut = tokio::spawn(call_batch( + start_height..(start_height + MAX_BLOCKS_IN_RANGE).min(chain_height), + database.clone(), + )); - let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier - .ready() - .await? - .call(VerifyTxRequest::BatchSetup { txs, hf }) - .await? - else { - panic!("tx verifier returned incorrect response"); - }; - - let mut done_txs = 0; - for (block_id, block) in blocks.into_iter().enumerate() { - // block id is just this blocks position in the batch. - let txs = &txs[done_txs..done_txs + txs_per_block[block_id]]; - done_txs += txs_per_block[block_id]; - - let verified_block_info: VerifiedBlockInformation = block_verifier - .ready() - .await? - .call(VerifyBlockRequest::MainChain(block, txs.into())) - .await?; - - tracing::info!( - "verified block: {}", - current_height + u64::try_from(block_id).unwrap() + for next_batch_start in (start_height..chain_height) + .step_by(MAX_BLOCKS_IN_RANGE as usize) + .skip(1) + { + // Call the next batch while we handle this batch. + let current_fut = std::mem::replace( + &mut next_fut, + tokio::spawn(call_batch( + next_batch_start..(next_batch_start + MAX_BLOCKS_IN_RANGE).min(chain_height), + database.clone(), + )), ); - update_cache_and_context(cache, context_updater, verified_block_info).await?; + let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else { + panic!("Database sent incorrect response!"); + }; - if (current_height + u64::try_from(block_id).unwrap()) % 25000 == 0 { - tracing::info!("Saving cache to: {}", save_file.display()); - cache.read().unwrap().save(save_file)?; - } + tracing::info!( + "Handling batch: {:?}, chain height: {}", + (next_batch_start - MAX_BLOCKS_IN_RANGE)..(next_batch_start), + chain_height + ); + + let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); + + let (tx, rx) = oneshot::channel(); + new_tx_chan + .send((txs.into_iter().flatten().collect(), tx)) + .await?; + rx.await??; + + block_chan.send(blocks).await?; } Ok(()) @@ -164,8 +124,8 @@ where async fn scan_chain( cache: Arc>, - save_file: PathBuf, - rpc_config: Arc>, + _save_file: PathBuf, + _rpc_config: Arc>, database: D, ) -> Result<(), tower::BoxError> where @@ -181,109 +141,37 @@ where let config = ContextConfig::main_net(); - let (mut block_verifier, mut transaction_verifier, mut context_updater) = - initialize_verifier(database.clone(), config).await?; + let (ctx_svc, mut context_updater) = + initialize_blockchain_context(config, database.clone()).await?; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?; + + let (mut block_verifier, transaction_verifier) = + initialize_verifier(database.clone(), tx_pool_svc, ctx_svc).await?; + + tx.send(transaction_verifier).map_err(|_| "").unwrap(); - let batch_size = rpc_config.read().unwrap().block_batch_size(); let start_height = cache.read().unwrap().height; - tracing::info!( - "Initialised verifier, beginning scan from {} to {}", - start_height, - chain_height - ); + let (block_tx, mut incoming_blocks) = mpsc::channel(3); - let mut next_fut = tokio::spawn(call_batch( - start_height..(start_height + batch_size).min(chain_height), - database.clone(), - )); + tokio::spawn(async move { + call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await + }); - let mut current_height = start_height; - let mut next_batch_start_height = start_height + batch_size; + while let Some(blocks) = incoming_blocks.next().await { + for block in blocks { + let verified_block_info = block_verifier + .ready() + .await? + .call(VerifyBlockRequest::MainChain(block)) + .await?; - while next_batch_start_height < chain_height { - // TODO: utilize dynamic batch sizes - let next_batch_size = rpc_config.read().unwrap().block_batch_size(); + tracing::info!("verified block: {}", verified_block_info.height); - // Call the next batch while we handle this batch. - let current_fut = std::mem::replace( - &mut next_fut, - tokio::spawn(call_batch( - next_batch_start_height - ..(next_batch_start_height + next_batch_size).min(chain_height), - database.clone(), - )), - ); - - let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else { - panic!("Database sent incorrect response!"); - }; - - tracing::info!( - "Handling batch: {:?}, chain height: {}", - current_height..(current_height + blocks.len() as u64), - chain_height - ); - - let (mut blocks, mut txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip(); - let batch_len = u64::try_from(blocks.len()).unwrap(); - - let hf_start_batch = simple_get_hf(current_height); - let hf_end_batch = simple_get_hf(current_height + batch_len); - - if hf_start_batch == hf_end_batch { - // we can only batch transactions on the same hard fork - batch_txs_verify_blocks( - &cache, - &save_file, - txs, - blocks, - &mut transaction_verifier, - &mut block_verifier, - &mut context_updater, - current_height, - hf_start_batch, - ) - .await?; - current_height += batch_len; - next_batch_start_height += batch_len; - } else { - let end_hf_start = get_hf_height(&hf_end_batch); - let height_diff = (end_hf_start - current_height) as usize; - - batch_txs_verify_blocks( - &cache, - &save_file, - txs.drain(0..height_diff).collect(), - blocks.drain(0..height_diff).collect(), - &mut transaction_verifier, - &mut block_verifier, - &mut context_updater, - current_height, - hf_start_batch, - ) - .await?; - - current_height += height_diff as u64; - next_batch_start_height += height_diff as u64; - - tracing::info!("Hard fork activating: {:?}", hf_end_batch); - - batch_txs_verify_blocks( - &cache, - &save_file, - txs, - blocks, - &mut transaction_verifier, - &mut block_verifier, - &mut context_updater, - current_height, - hf_end_batch, - ) - .await?; - - current_height += batch_len - height_diff as u64; - next_batch_start_height += batch_len - height_diff as u64; + update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?; } } diff --git a/consensus/src/bin/tx_pool.rs b/consensus/src/bin/tx_pool.rs new file mode 100644 index 0000000..7718f7a --- /dev/null +++ b/consensus/src/bin/tx_pool.rs @@ -0,0 +1,241 @@ +#![cfg(feature = "binaries")] + +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use futures::{channel::mpsc, FutureExt, StreamExt}; +use monero_serai::transaction::Transaction; +use tokio::sync::oneshot; +use tower::{Service, ServiceExt}; + +use monero_consensus::{ + context::{BlockChainContext, BlockChainContextRequest, RawBlockChainContext}, + transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, + ConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse, +}; + +#[derive(Clone)] +pub struct TxPoolHandle { + tx_pool_task: std::sync::Arc>, + tx_pool_chan: mpsc::Sender<( + TxPoolRequest, + oneshot::Sender>, + )>, +} + +impl tower::Service for TxPoolHandle { + type Response = TxPoolResponse; + type Error = TxNotInPool; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.tx_pool_task.is_finished() { + panic!("Tx pool task finished before it was supposed to!"); + }; + + self.tx_pool_chan + .poll_ready(cx) + .map_err(|_| panic!("Tx pool channel closed before it was supposed to")) + } + + fn call(&mut self, req: TxPoolRequest) -> Self::Future { + let (tx, rx) = oneshot::channel(); + self.tx_pool_chan + .try_send((req, tx)) + .expect("You need to use `poll_ready` to check capacity!"); + + async move { + rx.await + .expect("Tx pool will always respond without dropping the sender") + } + .boxed() + } +} + +pub type NewTxChanRec = mpsc::Receiver<( + Vec, + oneshot::Sender>, +)>; + +pub type NewTxChanSen = mpsc::Sender<( + Vec, + oneshot::Sender>, +)>; + +pub struct TxPool { + txs: Arc>>>, + current_ctx: BlockChainContext, + tx_verifier: Option, + tx_verifier_chan: Option>, + ctx_svc: Ctx, +} + +impl TxPool +where + TxV: Service + + Clone + + Send + + 'static, + TxV::Future: Send + 'static, + Ctx: Service + + Send + + 'static, + Ctx::Future: Send + 'static, +{ + pub async fn spawn( + tx_verifier_chan: oneshot::Receiver, + mut ctx_svc: Ctx, + ) -> Result< + ( + TxPoolHandle, + mpsc::Sender<( + Vec, + oneshot::Sender>, + )>, + ), + tower::BoxError, + > { + let current_ctx = ctx_svc + .ready() + .await? + .call(BlockChainContextRequest) + .await?; + + let tx_pool = TxPool { + txs: Default::default(), + current_ctx, + tx_verifier: None, + tx_verifier_chan: Some(tx_verifier_chan), + ctx_svc, + }; + + let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3); + let (new_tx_tx, new_tx_rx) = mpsc::channel(3); + + let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx)); + + Ok(( + TxPoolHandle { + tx_pool_task: tx_pool_task.into(), + tx_pool_chan: tx_pool_tx, + }, + new_tx_tx, + )) + } + + async fn get_or_update_ctx(&mut self) -> Result { + if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() { + Ok(current_ctx) + } else { + self.current_ctx = self + .ctx_svc + .ready() + .await? + .call(BlockChainContextRequest) + .await?; + self.current_ctx + .blockchain_context() + .map_err(Into::into) + .cloned() + } + } + + fn handle_txs_req( + &self, + req: TxPoolRequest, + tx: oneshot::Sender>, + ) { + let TxPoolRequest::Transactions(txs_to_get) = req; + + let mut res = Vec::with_capacity(txs_to_get.len()); + + let mut txs = self.txs.lock().unwrap(); + + for tx_hash in txs_to_get { + let Some(tx) = txs.remove(&tx_hash) else { + let _ = tx.send(Err(TxNotInPool)); + return; + }; + res.push(tx) + } + + let _ = tx.send(Ok(TxPoolResponse::Transactions(res))); + } + + async fn handle_new_txs( + &mut self, + new_txs: Vec, + res_chan: oneshot::Sender>, + ) -> Result<(), tower::BoxError> { + if self.tx_verifier.is_none() { + self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?); + } + + let current_ctx = self.get_or_update_ctx().await?; + + let mut tx_verifier = self.tx_verifier.clone().unwrap(); + let tx_pool = self.txs.clone(); + + tokio::spawn(async move { + // We only batch the setup a real tx pool would also call `VerifyTxRequest::Block` + let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier + .ready() + .await + .unwrap() + .call(VerifyTxRequest::BatchSetup { + txs: new_txs, + hf: current_ctx.current_hard_fork, + re_org_token: current_ctx.re_org_token.clone(), + }) + .await + .unwrap() + else { + panic!("Tx verifier sent incorrect response!"); + }; + + let mut locked_pool = tx_pool.lock().unwrap(); + + for tx in txs { + locked_pool.insert(tx.tx_hash, tx); + } + res_chan.send(Ok(())).unwrap(); + }); + Ok(()) + } + + pub async fn run( + mut self, + mut tx_pool_handle: mpsc::Receiver<( + TxPoolRequest, + oneshot::Sender>, + )>, + mut new_tx_channel: NewTxChanRec, + ) { + loop { + futures::select! { + pool_req = tx_pool_handle.next() => { + let Some((req, tx)) = pool_req else { + todo!("Shutdown txpool") + }; + self.handle_txs_req(req, tx); + } + new_txs = new_tx_channel.next() => { + let Some(new_txs) = new_txs else { + todo!("Shutdown txpool") + }; + + self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap() + } + } + } + } +} + +#[allow(dead_code)] +fn main() {} diff --git a/consensus/src/block.rs b/consensus/src/block.rs index bfc6961..b2bce7e 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -6,13 +6,13 @@ use std::{ }; use futures::FutureExt; -use monero_serai::{block::Block, transaction::Transaction}; +use monero_serai::block::Block; use tower::{Service, ServiceExt}; use crate::{ context::{BlockChainContext, BlockChainContextRequest}, transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse}, - ConsensusError, HardFork, + ConsensusError, HardFork, TxNotInPool, TxPoolRequest, TxPoolResponse, }; mod checks; @@ -34,50 +34,62 @@ pub struct VerifiedBlockInformation { } pub enum VerifyBlockRequest { - MainChainBatchSetupVerify(Block, Vec), - MainChain(Block, Vec>), -} - -pub enum VerifyBlockResponse { - MainChainBatchSetupVerify(), + MainChain(Block), } // TODO: it is probably a bad idea for this to derive clone, if 2 places (RPC, P2P) receive valid but different blocks // then they will both get approved but only one should go to main chain. #[derive(Clone)] -pub struct BlockVerifierService { +pub struct BlockVerifierService { context_svc: C, - tx_verifier_svc: Tx, + tx_verifier_svc: TxV, + tx_pool: TxP, } -impl BlockVerifierService +impl BlockVerifierService where C: Service + Clone + Send + 'static, - Tx: Service + TxV: Service + + Clone + + Send + + 'static, + TxP: Service + Clone + Send + 'static, { - pub fn new(context_svc: C, tx_verifier_svc: Tx) -> BlockVerifierService { + pub fn new( + context_svc: C, + tx_verifier_svc: TxV, + tx_pool: TxP, + ) -> BlockVerifierService { BlockVerifierService { context_svc, tx_verifier_svc, + tx_pool, } } } -impl Service for BlockVerifierService +impl Service for BlockVerifierService where C: Service + Clone + Send + 'static, C::Future: Send + 'static, - Tx: Service + + TxV: Service + Clone + Send + 'static, - Tx::Future: Send + 'static, + TxV::Future: Send + 'static, + + TxP: Service + + Clone + + Send + + 'static, + TxP::Future: Send + 'static, { type Response = VerifiedBlockInformation; type Error = ConsensusError; @@ -92,15 +104,12 @@ where fn call(&mut self, req: VerifyBlockRequest) -> Self::Future { let context_svc = self.context_svc.clone(); let tx_verifier_svc = self.tx_verifier_svc.clone(); + let tx_pool = self.tx_pool.clone(); async move { match req { - VerifyBlockRequest::MainChainBatchSetupVerify(block, txs) => { - batch_setup_verify_main_chain_block(block, txs, context_svc, tx_verifier_svc) - .await - } - VerifyBlockRequest::MainChain(block, txs) => { - verify_main_chain_block(block, txs, context_svc, tx_verifier_svc).await + VerifyBlockRequest::MainChain(block) => { + verify_main_chain_block(block, context_svc, tx_verifier_svc, tx_pool).await } } } @@ -108,18 +117,22 @@ where } } -async fn verify_main_chain_block( +async fn verify_main_chain_block( block: Block, - txs: Vec>, context_svc: C, - tx_verifier_svc: Tx, + tx_verifier_svc: TxV, + tx_pool: TxP, ) -> Result where C: Service + Send + 'static, C::Future: Send + 'static, - Tx: Service, + TxV: Service, + TxP: Service + + Clone + + Send + + 'static, { tracing::debug!("getting blockchain context"); let checked_context = context_svc @@ -128,10 +141,14 @@ where .map_err(Into::::into)?; // TODO: should we unwrap here, we did just get the data so it should be ok. - let context = checked_context.blockchain_context().unwrap(); + let context = checked_context.blockchain_context().unwrap().clone(); tracing::debug!("got blockchain context: {:?}", context); + let TxPoolResponse::Transactions(txs) = tx_pool + .oneshot(TxPoolRequest::Transactions(block.txs.clone())) + .await?; + let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::(); let total_fees = txs.iter().map(|tx| tx.fee).sum::(); @@ -141,6 +158,7 @@ where current_chain_height: context.chain_height, time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), hf: context.current_hard_fork, + re_org_token: context.re_org_token.clone(), }) .await?; @@ -196,102 +214,3 @@ where cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, }) } - -async fn batch_setup_verify_main_chain_block( - block: Block, - txs: Vec, - context_svc: C, - tx_verifier_svc: Tx, -) -> Result -where - C: Service - + Send - + 'static, - C::Future: Send + 'static, - Tx: Service, -{ - tracing::debug!("getting blockchain context"); - let checked_context = context_svc - .oneshot(BlockChainContextRequest) - .await - .map_err(Into::::into)?; - - // TODO: should we unwrap here, we did just get the data so it should be ok. - let context = checked_context.blockchain_context().unwrap(); - - tracing::debug!("got blockchain context: {:?}", context); - - // TODO: reorder these tests so we do the cheap tests first. - - let txs = if !txs.is_empty() { - let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier_svc - .oneshot(VerifyTxRequest::BatchSetupVerifyBlock { - txs, - current_chain_height: context.chain_height, - time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), - hf: context.current_hard_fork, - }) - .await? - else { - panic!("tx verifier sent incorrect response!"); - }; - txs - } else { - vec![] - }; - - let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::(); - let total_fees = txs.iter().map(|tx| tx.fee).sum::(); - - let generated_coins = miner_tx::check_miner_tx( - &block.miner_tx, - total_fees, - context.chain_height, - block_weight, - context.median_weight_for_block_reward, - context.already_generated_coins, - &context.current_hard_fork, - )?; - - let hashing_blob = block.serialize_hashable(); - - checks::block_size_sanity_check(block.serialize().len(), context.effective_median_weight)?; - checks::block_weight_check(block_weight, context.median_weight_for_block_reward)?; - - checks::check_amount_txs(block.txs.len())?; - checks::check_prev_id(&block, &context.top_hash)?; - if let Some(median_timestamp) = context.median_block_timestamp { - // will only be None for the first 60 blocks - checks::check_timestamp(&block, median_timestamp)?; - } - - // do POW test last - let pow_hash = tokio::task::spawn_blocking(move || { - hash_worker::calculate_pow_hash( - &hashing_blob, - context.chain_height, - &context.current_hard_fork, - ) - }) - .await - .unwrap()?; - - checks::check_block_pow(&pow_hash, context.next_difficulty)?; - - context - .current_hard_fork - .check_block_version_vote(&block.header)?; - - Ok(VerifiedBlockInformation { - block_hash: block.hash(), - block, - txs, - pow_hash, - generated_coins, - weight: block_weight, - height: context.chain_height, - long_term_weight: context.next_block_long_term_weight(block_weight), - hf_vote: HardFork::V1, - cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, - }) -} diff --git a/consensus/src/context.rs b/consensus/src/context.rs index da24874..aa58882 100644 --- a/consensus/src/context.rs +++ b/consensus/src/context.rs @@ -16,7 +16,6 @@ use std::{ use futures::FutureExt; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; use tower::{Service, ServiceExt}; use crate::{helper::current_time, ConsensusError, Database, DatabaseRequest, DatabaseResponse}; @@ -27,9 +26,11 @@ mod weight; #[cfg(test)] mod tests; +mod tokens; pub use difficulty::DifficultyCacheConfig; pub use hardforks::{HardFork, HardForkConfig}; +pub use tokens::*; pub use weight::BlockWeightsCacheConfig; const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60; @@ -118,7 +119,8 @@ where let context_svc = BlockChainContextService { internal_blockchain_context: Arc::new( InternalBlockChainContext { - current_validity_token: CancellationToken::new(), + current_validity_token: ValidityToken::new(), + current_reorg_token: ReOrgToken::new(), difficulty_cache: difficulty_cache_handle.await.unwrap()?, weight_cache: weight_cache_handle.await.unwrap()?, hardfork_state: hardfork_state_handle.await.unwrap()?, @@ -137,7 +139,7 @@ where /// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep /// around. You should keep around [`BlockChainContext`] instead. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct RawBlockChainContext { /// The next blocks difficulty. pub next_difficulty: u128, @@ -161,6 +163,8 @@ pub struct RawBlockChainContext { pub top_hash: [u8; 32], /// The current hard fork. pub current_hard_fork: HardFork, + /// A token which is used to signal if a reorg has happened since creating the token. + pub re_org_token: ReOrgToken, } impl RawBlockChainContext { @@ -208,7 +212,7 @@ impl RawBlockChainContext { #[derive(Debug, Clone)] pub struct BlockChainContext { /// A token representing this data's validity. - validity_token: CancellationToken, + validity_token: ValidityToken, /// The actual block chain context. raw: RawBlockChainContext, } @@ -220,16 +224,16 @@ pub struct DataNoLongerValid; impl BlockChainContext { /// Checks if the data is still valid. pub fn is_still_valid(&self) -> bool { - !self.validity_token.is_cancelled() + self.validity_token.is_data_valid() } /// Checks if the data is valid returning an Err if not and a reference to the blockchain context if /// it is. - pub fn blockchain_context(&self) -> Result { + pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> { if !self.is_still_valid() { return Err(DataNoLongerValid); } - Ok(self.raw) + Ok(&self.raw) } } @@ -240,7 +244,9 @@ pub struct BlockChainContextRequest; struct InternalBlockChainContext { /// A token used to invalidate previous contexts when a new /// block is added to the chain. - current_validity_token: CancellationToken, + current_validity_token: ValidityToken, + /// A token which is used to signal a reorg has happened. + current_reorg_token: ReOrgToken, difficulty_cache: difficulty::DifficultyCache, weight_cache: weight::BlockWeightsCache, @@ -274,6 +280,7 @@ impl Service for BlockChainContextService { let InternalBlockChainContext { current_validity_token, + current_reorg_token, difficulty_cache, weight_cache, hardfork_state, @@ -285,7 +292,7 @@ impl Service for BlockChainContextService { let current_hf = hardfork_state.current_hardfork(); Ok(BlockChainContext { - validity_token: current_validity_token.child_token(), + validity_token: current_validity_token.clone(), raw: RawBlockChainContext { next_difficulty: difficulty_cache.next_difficulty(¤t_hf), cumulative_difficulty: difficulty_cache.cumulative_difficulty(), @@ -302,6 +309,7 @@ impl Service for BlockChainContextService { chain_height: *chain_height, top_hash: *top_block_hash, current_hard_fork: current_hf, + re_org_token: current_reorg_token.clone(), }, }) } @@ -339,6 +347,7 @@ impl tower::Service for BlockChainContextService { let InternalBlockChainContext { current_validity_token, + current_reorg_token: _, difficulty_cache, weight_cache, hardfork_state, @@ -348,7 +357,7 @@ impl tower::Service for BlockChainContextService { } = internal_blockchain_context_lock.deref_mut(); // Cancel the validity token and replace it with a new one. - std::mem::replace(current_validity_token, CancellationToken::new()).cancel(); + std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid(); difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty); diff --git a/consensus/src/context/tokens.rs b/consensus/src/context/tokens.rs new file mode 100644 index 0000000..e86f7c8 --- /dev/null +++ b/consensus/src/context/tokens.rs @@ -0,0 +1,45 @@ +use tokio_util::sync::CancellationToken; + +/// A token representing if a piece of data is valid. +#[derive(Debug, Clone, Default)] +pub struct ValidityToken { + token: CancellationToken, +} + +impl ValidityToken { + pub fn new() -> ValidityToken { + ValidityToken { + token: CancellationToken::new(), + } + } + + pub fn is_data_valid(&self) -> bool { + !self.token.is_cancelled() + } + + pub fn set_data_invalid(self) { + self.token.cancel() + } +} + +/// A token representing if a re-org has happened since it's creation. +#[derive(Debug, Clone, Default)] +pub struct ReOrgToken { + token: CancellationToken, +} + +impl ReOrgToken { + pub fn new() -> ReOrgToken { + ReOrgToken { + token: CancellationToken::new(), + } + } + + pub fn reorg_happened(&self) -> bool { + self.token.is_cancelled() + } + + pub fn set_reorg_happened(self) { + self.token.cancel() + } +} diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 228b75f..ddc7f4e 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,4 +1,8 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + future::Future, + sync::Arc, +}; pub mod block; pub mod context; @@ -11,12 +15,17 @@ mod test_utils; pub mod transactions; pub use block::{VerifiedBlockInformation, VerifyBlockRequest}; -pub use context::{ContextConfig, HardFork, UpdateBlockchainCacheRequest}; +pub use context::{ + initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, ContextConfig, + HardFork, UpdateBlockchainCacheRequest, +}; pub use transactions::{VerifyTxRequest, VerifyTxResponse}; -pub async fn initialize_verifier( +// TODO: instead of (ab)using generic returns return the acc type +pub async fn initialize_verifier( database: D, - cfg: ContextConfig, + tx_pool: TxP, + ctx_svc: Ctx, ) -> Result< ( impl tower::Service< @@ -24,20 +33,39 @@ pub async fn initialize_verifier( Response = VerifiedBlockInformation, Error = ConsensusError, >, - impl tower::Service, - impl tower::Service, + impl tower::Service< + VerifyTxRequest, + Response = VerifyTxResponse, + Error = ConsensusError, + Future = impl Future> + Send + 'static, + > + Clone + + Send + + 'static, ), ConsensusError, > where D: Database + Clone + Send + Sync + 'static, D::Future: Send + 'static, + TxP: tower::Service + + Clone + + Send + + Sync + + 'static, + TxP::Future: Send + 'static, + Ctx: tower::Service< + BlockChainContextRequest, + Response = BlockChainContext, + Error = tower::BoxError, + > + Clone + + Send + + Sync + + 'static, + Ctx::Future: Send + 'static, { - let (context_svc, context_svc_updater) = - context::initialize_blockchain_context(cfg, database.clone()).await?; let tx_svc = transactions::TxVerifierService::new(database); - let block_svc = block::BlockVerifierService::new(context_svc.clone(), tx_svc.clone()); - Ok((block_svc, tx_svc, context_svc_updater)) + let block_svc = block::BlockVerifierService::new(ctx_svc, tx_svc.clone(), tx_pool); + Ok((block_svc, tx_svc)) } // TODO: split this enum up. @@ -71,6 +99,8 @@ pub enum ConsensusError { InvalidHardForkVersion(&'static str), #[error("The block has a different previous hash than expected")] BlockIsNotApartOfChain, + #[error("One or more transaction is not in the transaction pool")] + TxNotInPool(#[from] TxNotInPool), #[error("Database error: {0}")] Database(#[from] tower::BoxError), } @@ -137,6 +167,7 @@ pub enum DatabaseResponse { Outputs(HashMap>), NumberOutputsWithAmount(usize), + /// returns true if key images are spent CheckKIsNotSpent(bool), #[cfg(feature = "binaries")] @@ -147,3 +178,15 @@ pub enum DatabaseResponse { )>, ), } + +#[derive(Debug, Copy, Clone, thiserror::Error)] +#[error("The transaction requested was not in the transaction pool")] +pub struct TxNotInPool; + +pub enum TxPoolRequest { + Transactions(Vec<[u8; 32]>), +} + +pub enum TxPoolResponse { + Transactions(Vec>), +} diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index f48fe18..ea18ac9 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -10,14 +10,16 @@ use std::{ use futures::FutureExt; use monero_serai::transaction::Transaction; use rayon::prelude::*; -use tower::Service; +use tower::{Service, ServiceExt}; use tracing::instrument; -use crate::{ConsensusError, Database, HardFork}; +use crate::{ + context::ReOrgToken, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork, +}; +mod contextual_data; mod inputs; pub(crate) mod outputs; -mod ring; mod sigs; mod time_lock; @@ -49,7 +51,7 @@ pub struct TransactionVerificationData { pub tx_hash: [u8; 32], /// We put this behind a mutex as the information is not constant and is based of past outputs idxs /// which could change on re-orgs. - rings_member_info: std::sync::Mutex>, + rings_member_info: std::sync::Mutex>, } impl TransactionVerificationData { @@ -73,16 +75,14 @@ pub enum VerifyTxRequest { current_chain_height: u64, time_for_time_lock: u64, hf: HardFork, + re_org_token: ReOrgToken, }, - /// Batches the setup of [`TransactionVerificationData`]. - BatchSetup { txs: Vec, hf: HardFork }, - /// Batches the setup of [`TransactionVerificationData`] and verifies the transactions - /// in the context of a block. - BatchSetupVerifyBlock { + /// Batches the setup of [`TransactionVerificationData`], does *minimal* verification, you need to call [`VerifyTxRequest::Block`] + /// with the returned data. + BatchSetup { txs: Vec, - current_chain_height: u64, - time_for_time_lock: u64, hf: HardFork, + re_org_token: ReOrgToken, }, } @@ -129,65 +129,30 @@ where current_chain_height, time_for_time_lock, hf, + re_org_token, } => verify_transactions_for_block( database, txs, current_chain_height, time_for_time_lock, hf, + re_org_token, ) .boxed(), - VerifyTxRequest::BatchSetup { txs, hf } => { - batch_setup_transactions(database, txs, hf).boxed() - } - VerifyTxRequest::BatchSetupVerifyBlock { + VerifyTxRequest::BatchSetup { txs, - current_chain_height, - time_for_time_lock, hf, - } => batch_setup_verify_transactions_for_block( - database, - txs, - current_chain_height, - time_for_time_lock, - hf, - ) - .boxed(), + re_org_token, + } => batch_setup_transactions(database, txs, hf, re_org_token).boxed(), } } } -async fn set_missing_ring_members( - database: D, - txs: &[Arc], - hf: &HardFork, -) -> Result<(), ConsensusError> -where - D: Database + Clone + Sync + Send + 'static, -{ - // TODO: handle re-orgs. - - let txs_needing_ring_members = txs - .iter() - // Safety: we must not hold the mutex lock for long to not block the async runtime. - .filter(|tx| tx.rings_member_info.lock().unwrap().is_none()) - .cloned() - .collect::>(); - - tracing::debug!( - "Retrieving ring members for {} txs", - txs_needing_ring_members.len() - ); - - ring::batch_fill_ring_member_info(&txs_needing_ring_members, hf, database).await?; - - Ok(()) -} - async fn batch_setup_transactions( database: D, txs: Vec, hf: HardFork, + re_org_token: ReOrgToken, ) -> Result where D: Database + Clone + Sync + Send + 'static, @@ -201,41 +166,11 @@ where .await .unwrap()?; - set_missing_ring_members(database, &txs, &hf).await?; + contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?; Ok(VerifyTxResponse::BatchSetupOk(txs)) } -async fn batch_setup_verify_transactions_for_block( - database: D, - txs: Vec, - current_chain_height: u64, - time_for_time_lock: u64, - hf: HardFork, -) -> Result -where - D: Database + Clone + Sync + Send + 'static, -{ - // Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs. - let txs = tokio::task::spawn_blocking(|| { - txs.into_par_iter() - .map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?))) - .collect::, ConsensusError>>() - }) - .await - .unwrap()?; - - verify_transactions_for_block( - database, - txs.clone(), - current_chain_height, - time_for_time_lock, - hf, - ) - .await?; - Ok(VerifyTxResponse::BatchSetupOk(txs)) -} - #[instrument(name = "verify_txs", skip_all, level = "info")] async fn verify_transactions_for_block( database: D, @@ -243,16 +178,19 @@ async fn verify_transactions_for_block( current_chain_height: u64, time_for_time_lock: u64, hf: HardFork, + re_org_token: ReOrgToken, ) -> Result where D: Database + Clone + Sync + Send + 'static, { tracing::debug!("Verifying transactions for block, amount: {}", txs.len()); - set_missing_ring_members(database, &txs, &hf).await?; + contextual_data::batch_refresh_ring_member_info(&txs, &hf, re_org_token, database.clone()) + .await?; let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new())); + let cloned_spent_kis = spent_kis.clone(); tokio::task::spawn_blocking(move || { txs.par_iter().try_for_each(|tx| { verify_transaction_for_block( @@ -260,13 +198,28 @@ where current_chain_height, time_for_time_lock, hf, - spent_kis.clone(), + cloned_spent_kis.clone(), ) }) }) .await .unwrap()?; + let DatabaseResponse::CheckKIsNotSpent(kis_spent) = database + .oneshot(DatabaseRequest::CheckKIsNotSpent( + Arc::into_inner(spent_kis).unwrap().into_inner().unwrap(), + )) + .await? + else { + panic!("Database sent incorrect response!"); + }; + + if kis_spent { + return Err(ConsensusError::TransactionHasInvalidInput( + "One or more key image spent!", + )); + } + Ok(VerifyTxResponse::Ok) } @@ -329,7 +282,7 @@ fn verify_transaction_for_block( /// /// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#version fn check_tx_version( - decoy_info: &Option, + decoy_info: &Option, version: &TxVersion, hf: &HardFork, ) -> Result<(), ConsensusError> { diff --git a/consensus/src/transactions/ring.rs b/consensus/src/transactions/contextual_data.rs similarity index 71% rename from consensus/src/transactions/ring.rs rename to consensus/src/transactions/contextual_data.rs index 6b70b29..f272197 100644 --- a/consensus/src/transactions/ring.rs +++ b/consensus/src/transactions/contextual_data.rs @@ -1,4 +1,4 @@ -//! # Rings +//! # Contextual Data //! //! This module contains [`TxRingMembersInfo`] which is a struct made up from blockchain information about the //! ring members of inputs. This module does minimal consensus checks, only when needed, and should not be relied @@ -6,7 +6,12 @@ //! //! The data collected by this module can be used to perform consensus checks. //! +//! ## Why not use the context service? +//! +//! Because this data is unique for *every* transaction and the context service is just for blockchain state data. +//! +use std::ops::Deref; use std::{ cmp::{max, min}, collections::{HashMap, HashSet}, @@ -21,17 +26,102 @@ use monero_serai::{ use tower::ServiceExt; use crate::{ - transactions::TransactionVerificationData, ConsensusError, Database, DatabaseRequest, - DatabaseResponse, HardFork, OutputOnChain, + context::ReOrgToken, transactions::TransactionVerificationData, ConsensusError, Database, + DatabaseRequest, DatabaseResponse, HardFork, OutputOnChain, }; +pub async fn batch_refresh_ring_member_info( + txs_verification_data: &[Arc], + hf: &HardFork, + re_org_token: ReOrgToken, + database: D, +) -> Result<(), ConsensusError> { + let (txs_needing_full_refresh, txs_needing_partial_refresh) = + ring_member_info_needing_refresh(txs_verification_data, hf); + + batch_fill_ring_member_info( + &txs_needing_full_refresh, + hf, + re_org_token, + database.clone(), + ) + .await?; + + for tx_v_data in txs_needing_partial_refresh { + let decoy_info = if hf != &HardFork::V1 { + // this data is only needed after hard-fork 1. + Some(DecoyInfo::new(&tx_v_data.tx.prefix.inputs, hf, database.clone()).await?) + } else { + None + }; + + // Temporarily acquirer the mutex lock to add the ring member info. + tx_v_data + .rings_member_info + .lock() + .unwrap() + .as_mut() + // this unwrap is safe as otherwise this would require a full refresh not a partial one. + .unwrap() + .decoy_info = decoy_info; + } + + Ok(()) +} + +/// This function returns the transaction verification datas that need refreshing. +/// +/// The first returned vec needs a full refresh. +/// The second returned vec only needs a partial refresh. +/// +/// A full refresh is a refresh of all the ring members and the decoy info. +/// A partial refresh is just a refresh of the decoy info. +fn ring_member_info_needing_refresh( + txs_verification_data: &[Arc], + hf: &HardFork, +) -> ( + Vec>, + Vec>, +) { + let mut txs_needing_full_refresh = Vec::new(); + let mut txs_needing_partial_refresh = Vec::new(); + + for tx in txs_verification_data { + let tx_ring_member_info = tx.rings_member_info.lock().unwrap(); + + // if we don't have ring members or if a re-org has happened or if we changed hf do a full refresh. + // doing a full refresh each hf isn't needed now but its so rare it makes sense to just do a full one. + if let Some(tx_ring_member_info) = tx_ring_member_info.deref() { + if tx_ring_member_info.re_org_token.reorg_happened() || &tx_ring_member_info.hf != hf { + txs_needing_full_refresh.push(tx.clone()); + continue; + } + } else { + txs_needing_full_refresh.push(tx.clone()); + continue; + } + + // if any input does not have a 0 amount do a partial refresh, this is because some decoy info + // data is based on the amount of non-ringCT outputs at a certain point. + if tx.tx.prefix.inputs.iter().any(|inp| match inp { + Input::Gen(_) => false, + Input::ToKey { amount, .. } => amount.is_some(), + }) { + txs_needing_partial_refresh.push(tx.clone()); + } + } + + (txs_needing_full_refresh, txs_needing_partial_refresh) +} + /// Fills the `rings_member_info` field on the inputted [`TransactionVerificationData`]. /// /// This function batch gets all the ring members for the inputted transactions and fills in data about -/// them, like the youngest used out and the time locks. +/// them. pub async fn batch_fill_ring_member_info( txs_verification_data: &[Arc], hf: &HardFork, + re_org_token: ReOrgToken, mut database: D, ) -> Result<(), ConsensusError> { let mut output_ids = HashMap::new(); @@ -54,6 +144,7 @@ pub async fn batch_fill_ring_member_info, pub youngest_used_out_height: u64, pub time_locked_outs: Vec, + /// A token used to check if a re org has happened since getting this data. + re_org_token: ReOrgToken, + /// The hard-fork this data was retrived for. + hf: HardFork, } impl TxRingMembersInfo { @@ -173,6 +270,8 @@ impl TxRingMembersInfo { used_outs: Vec>, decoy_info: Option, rct_type: RctType, + hf: HardFork, + re_org_token: ReOrgToken, ) -> TxRingMembersInfo { TxRingMembersInfo { youngest_used_out_height: used_outs @@ -200,7 +299,9 @@ impl TxRingMembersInfo { }) .collect(), rings: Rings::new(used_outs, rct_type), + re_org_token, decoy_info, + hf, } } } @@ -242,7 +343,13 @@ fn get_ring_members_for_inputs<'a>( .collect::>() } -/// A struct holding information about the inputs and their decoys. +/// A struct holding information about the inputs and their decoys. This data can vary by block so +/// this data needs to be retrieved after every change in the blockchain. +/// +/// This data *does not* need to be refreshed if one of these are true: +/// +/// - The input amounts are *ALL* 0 +/// - The top block hash is the same as when this data was retrieved. /// /// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html #[derive(Debug)] @@ -298,6 +405,7 @@ impl DecoyInfo { mixable += 1; } } else { + // ringCT amounts are always mixable. mixable += 1; } @@ -331,13 +439,13 @@ impl DecoyInfo { /// /// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-amount-of-decoys pub(crate) fn minimum_decoys(hf: &HardFork) -> usize { - use HardFork::*; + use HardFork as HF; match hf { - V1 => panic!("hard-fork 1 does not use these rules!"), - V2 | V3 | V4 | V5 => 2, - V6 => 4, - V7 => 6, - V8 | V9 | V10 | V11 | V12 | V13 | V14 => 10, - _ => 15, + HF::V1 => panic!("hard-fork 1 does not use these rules!"), + HF::V2 | HF::V3 | HF::V4 | HF::V5 => 2, + HF::V6 => 4, + HF::V7 => 6, + HF::V8 | HF::V9 | HF::V10 | HF::V11 | HF::V12 | HF::V13 | HF::V14 => 10, + HF::V15 | HF::V16 => 15, } } diff --git a/consensus/src/transactions/inputs.rs b/consensus/src/transactions/inputs.rs index 79426c7..253eec1 100644 --- a/consensus/src/transactions/inputs.rs +++ b/consensus/src/transactions/inputs.rs @@ -9,7 +9,7 @@ use monero_serai::transaction::Input; use crate::{ transactions::{ - ring::{minimum_decoys, DecoyInfo, TxRingMembersInfo}, + contextual_data::{minimum_decoys, DecoyInfo, TxRingMembersInfo}, TxVersion, }, ConsensusError, HardFork, diff --git a/consensus/src/transactions/sigs.rs b/consensus/src/transactions/sigs.rs index 1a5b623..49e0a21 100644 --- a/consensus/src/transactions/sigs.rs +++ b/consensus/src/transactions/sigs.rs @@ -1,6 +1,6 @@ use monero_serai::transaction::Transaction; -use crate::{transactions::ring::Rings, ConsensusError}; +use crate::{transactions::contextual_data::Rings, ConsensusError}; mod ring_sigs;