mirror of
https://github.com/hinto-janai/cuprate.git
synced 2025-01-18 16:54:32 +00:00
consensus: add a tx pool trait.
This means we don't have to rely on people giving the verifier the correct txs for a block. Also allows some speedup as we can put the fetching of outputs on a different task.
This commit is contained in:
parent
34bb293f95
commit
fc7b676f7b
10 changed files with 640 additions and 434 deletions
|
@ -1,28 +1,29 @@
|
||||||
#![cfg(feature = "binaries")]
|
#![cfg(feature = "binaries")]
|
||||||
|
|
||||||
use std::path::Path;
|
|
||||||
use std::{
|
use std::{
|
||||||
ops::Range,
|
ops::Range,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, RwLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
use monero_serai::{block::Block, transaction::Transaction};
|
use futures::{channel::mpsc, SinkExt, StreamExt};
|
||||||
use tower::ServiceExt;
|
use monero_serai::block::Block;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
use tracing::level_filters::LevelFilter;
|
use tracing::level_filters::LevelFilter;
|
||||||
|
|
||||||
use cuprate_common::Network;
|
use cuprate_common::Network;
|
||||||
|
|
||||||
use monero_consensus::{
|
use monero_consensus::{
|
||||||
context::{ContextConfig, UpdateBlockchainCacheRequest},
|
context::{ContextConfig, UpdateBlockchainCacheRequest},
|
||||||
initialize_verifier,
|
initialize_blockchain_context, initialize_verifier,
|
||||||
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
|
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
|
||||||
transactions::VerifyTxRequest,
|
Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest,
|
||||||
ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork,
|
|
||||||
VerifiedBlockInformation, VerifyBlockRequest, VerifyTxResponse,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const MAX_BLOCKS_IN_RANGE: u64 = 300;
|
mod tx_pool;
|
||||||
|
|
||||||
|
const MAX_BLOCKS_IN_RANGE: u64 = 1000;
|
||||||
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250;
|
const MAX_BLOCKS_HEADERS_IN_RANGE: u64 = 250;
|
||||||
|
|
||||||
/// Calls for a batch of blocks, returning the response and the time it took.
|
/// Calls for a batch of blocks, returning the response and the time it took.
|
||||||
|
@ -35,24 +36,6 @@ async fn call_batch<D: Database>(
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn simple_get_hf(height: u64) -> HardFork {
|
|
||||||
match height {
|
|
||||||
0..=1009826 => HardFork::V1,
|
|
||||||
1009827..=1141316 => HardFork::V2,
|
|
||||||
1141317..=1220515 => HardFork::V3,
|
|
||||||
_ => todo!("rules past v3"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_hf_height(hf: &HardFork) -> u64 {
|
|
||||||
match hf {
|
|
||||||
HardFork::V1 => 0,
|
|
||||||
HardFork::V2 => 1009827,
|
|
||||||
HardFork::V3 => 1141317,
|
|
||||||
_ => todo!("rules past v3"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_cache_and_context<Ctx>(
|
async fn update_cache_and_context<Ctx>(
|
||||||
cache: &RwLock<ScanningCache>,
|
cache: &RwLock<ScanningCache>,
|
||||||
context_updater: &mut Ctx,
|
context_updater: &mut Ctx,
|
||||||
|
@ -86,77 +69,54 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Batches all transactions together when getting outs
|
async fn call_blocks<D>(
|
||||||
///
|
mut new_tx_chan: tx_pool::NewTxChanSen,
|
||||||
/// TODO: reduce the amount of parameters of this function
|
mut block_chan: mpsc::Sender<Vec<Block>>,
|
||||||
#[allow(clippy::too_many_arguments)]
|
start_height: u64,
|
||||||
async fn batch_txs_verify_blocks<Tx, Blk, Ctx>(
|
chain_height: u64,
|
||||||
cache: &RwLock<ScanningCache>,
|
database: D,
|
||||||
save_file: &Path,
|
|
||||||
txs: Vec<Vec<Transaction>>,
|
|
||||||
blocks: Vec<Block>,
|
|
||||||
tx_verifier: &mut Tx,
|
|
||||||
block_verifier: &mut Blk,
|
|
||||||
context_updater: &mut Ctx,
|
|
||||||
current_height: u64,
|
|
||||||
hf: HardFork,
|
|
||||||
) -> Result<(), tower::BoxError>
|
) -> Result<(), tower::BoxError>
|
||||||
where
|
where
|
||||||
Blk: tower::Service<
|
D: Database + Clone + Send + Sync + 'static,
|
||||||
VerifyBlockRequest,
|
D::Future: Send + 'static,
|
||||||
Response = VerifiedBlockInformation,
|
|
||||||
Error = ConsensusError,
|
|
||||||
>,
|
|
||||||
Tx: tower::Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
|
|
||||||
Ctx: tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
|
|
||||||
{
|
{
|
||||||
// This is an optimisation, we batch ALL the transactions together to get their outputs, saving a
|
let mut next_fut = tokio::spawn(call_batch(
|
||||||
// massive amount of time at the cost of inaccurate data, specifically the only thing that's inaccurate
|
start_height..(start_height + MAX_BLOCKS_IN_RANGE).min(chain_height),
|
||||||
// is the amount of outputs at a certain time and as this would be lower (so more strict) than the true value
|
database.clone(),
|
||||||
// this will fail when this is an issue.
|
));
|
||||||
let mut txs_per_block = [0; (MAX_BLOCKS_IN_RANGE * 3) as usize];
|
|
||||||
let txs = txs
|
|
||||||
.into_iter()
|
|
||||||
.enumerate()
|
|
||||||
.flat_map(|(block_id, block_batch_txs)| {
|
|
||||||
// block id is just this blocks position in the batch.
|
|
||||||
txs_per_block[block_id] = block_batch_txs.len();
|
|
||||||
block_batch_txs
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
|
for next_batch_start in (start_height..chain_height)
|
||||||
.ready()
|
.step_by(MAX_BLOCKS_IN_RANGE as usize)
|
||||||
.await?
|
.skip(1)
|
||||||
.call(VerifyTxRequest::BatchSetup { txs, hf })
|
{
|
||||||
.await?
|
// Call the next batch while we handle this batch.
|
||||||
else {
|
let current_fut = std::mem::replace(
|
||||||
panic!("tx verifier returned incorrect response");
|
&mut next_fut,
|
||||||
};
|
tokio::spawn(call_batch(
|
||||||
|
next_batch_start..(next_batch_start + MAX_BLOCKS_IN_RANGE).min(chain_height),
|
||||||
let mut done_txs = 0;
|
database.clone(),
|
||||||
for (block_id, block) in blocks.into_iter().enumerate() {
|
)),
|
||||||
// block id is just this blocks position in the batch.
|
|
||||||
let txs = &txs[done_txs..done_txs + txs_per_block[block_id]];
|
|
||||||
done_txs += txs_per_block[block_id];
|
|
||||||
|
|
||||||
let verified_block_info: VerifiedBlockInformation = block_verifier
|
|
||||||
.ready()
|
|
||||||
.await?
|
|
||||||
.call(VerifyBlockRequest::MainChain(block, txs.into()))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
tracing::info!(
|
|
||||||
"verified block: {}",
|
|
||||||
current_height + u64::try_from(block_id).unwrap()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
update_cache_and_context(cache, context_updater, verified_block_info).await?;
|
let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
|
||||||
|
panic!("Database sent incorrect response!");
|
||||||
|
};
|
||||||
|
|
||||||
if (current_height + u64::try_from(block_id).unwrap()) % 25000 == 0 {
|
tracing::info!(
|
||||||
tracing::info!("Saving cache to: {}", save_file.display());
|
"Handling batch: {:?}, chain height: {}",
|
||||||
cache.read().unwrap().save(save_file)?;
|
(next_batch_start - MAX_BLOCKS_IN_RANGE)..(next_batch_start),
|
||||||
}
|
chain_height
|
||||||
|
);
|
||||||
|
|
||||||
|
let (blocks, txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
new_tx_chan
|
||||||
|
.send((txs.into_iter().flatten().collect(), tx))
|
||||||
|
.await?;
|
||||||
|
rx.await??;
|
||||||
|
|
||||||
|
block_chan.send(blocks).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -164,8 +124,8 @@ where
|
||||||
|
|
||||||
async fn scan_chain<D>(
|
async fn scan_chain<D>(
|
||||||
cache: Arc<RwLock<ScanningCache>>,
|
cache: Arc<RwLock<ScanningCache>>,
|
||||||
save_file: PathBuf,
|
_save_file: PathBuf,
|
||||||
rpc_config: Arc<RwLock<RpcConfig>>,
|
_rpc_config: Arc<RwLock<RpcConfig>>,
|
||||||
database: D,
|
database: D,
|
||||||
) -> Result<(), tower::BoxError>
|
) -> Result<(), tower::BoxError>
|
||||||
where
|
where
|
||||||
|
@ -181,109 +141,37 @@ where
|
||||||
|
|
||||||
let config = ContextConfig::main_net();
|
let config = ContextConfig::main_net();
|
||||||
|
|
||||||
let (mut block_verifier, mut transaction_verifier, mut context_updater) =
|
let (ctx_svc, mut context_updater) =
|
||||||
initialize_verifier(database.clone(), config).await?;
|
initialize_blockchain_context(config, database.clone()).await?;
|
||||||
|
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
|
||||||
|
let (tx_pool_svc, new_tx_chan) = tx_pool::TxPool::spawn(rx, ctx_svc.clone()).await?;
|
||||||
|
|
||||||
|
let (mut block_verifier, transaction_verifier) =
|
||||||
|
initialize_verifier(database.clone(), tx_pool_svc, ctx_svc).await?;
|
||||||
|
|
||||||
|
tx.send(transaction_verifier).map_err(|_| "").unwrap();
|
||||||
|
|
||||||
let batch_size = rpc_config.read().unwrap().block_batch_size();
|
|
||||||
let start_height = cache.read().unwrap().height;
|
let start_height = cache.read().unwrap().height;
|
||||||
|
|
||||||
tracing::info!(
|
let (block_tx, mut incoming_blocks) = mpsc::channel(3);
|
||||||
"Initialised verifier, beginning scan from {} to {}",
|
|
||||||
start_height,
|
|
||||||
chain_height
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut next_fut = tokio::spawn(call_batch(
|
tokio::spawn(async move {
|
||||||
start_height..(start_height + batch_size).min(chain_height),
|
call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await
|
||||||
database.clone(),
|
});
|
||||||
));
|
|
||||||
|
|
||||||
let mut current_height = start_height;
|
while let Some(blocks) = incoming_blocks.next().await {
|
||||||
let mut next_batch_start_height = start_height + batch_size;
|
for block in blocks {
|
||||||
|
let verified_block_info = block_verifier
|
||||||
while next_batch_start_height < chain_height {
|
.ready()
|
||||||
// TODO: utilize dynamic batch sizes
|
.await?
|
||||||
let next_batch_size = rpc_config.read().unwrap().block_batch_size();
|
.call(VerifyBlockRequest::MainChain(block))
|
||||||
|
|
||||||
// Call the next batch while we handle this batch.
|
|
||||||
let current_fut = std::mem::replace(
|
|
||||||
&mut next_fut,
|
|
||||||
tokio::spawn(call_batch(
|
|
||||||
next_batch_start_height
|
|
||||||
..(next_batch_start_height + next_batch_size).min(chain_height),
|
|
||||||
database.clone(),
|
|
||||||
)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let DatabaseResponse::BlockBatchInRange(blocks) = current_fut.await?? else {
|
|
||||||
panic!("Database sent incorrect response!");
|
|
||||||
};
|
|
||||||
|
|
||||||
tracing::info!(
|
|
||||||
"Handling batch: {:?}, chain height: {}",
|
|
||||||
current_height..(current_height + blocks.len() as u64),
|
|
||||||
chain_height
|
|
||||||
);
|
|
||||||
|
|
||||||
let (mut blocks, mut txs): (Vec<_>, Vec<_>) = blocks.into_iter().unzip();
|
|
||||||
let batch_len = u64::try_from(blocks.len()).unwrap();
|
|
||||||
|
|
||||||
let hf_start_batch = simple_get_hf(current_height);
|
|
||||||
let hf_end_batch = simple_get_hf(current_height + batch_len);
|
|
||||||
|
|
||||||
if hf_start_batch == hf_end_batch {
|
|
||||||
// we can only batch transactions on the same hard fork
|
|
||||||
batch_txs_verify_blocks(
|
|
||||||
&cache,
|
|
||||||
&save_file,
|
|
||||||
txs,
|
|
||||||
blocks,
|
|
||||||
&mut transaction_verifier,
|
|
||||||
&mut block_verifier,
|
|
||||||
&mut context_updater,
|
|
||||||
current_height,
|
|
||||||
hf_start_batch,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
current_height += batch_len;
|
|
||||||
next_batch_start_height += batch_len;
|
|
||||||
} else {
|
|
||||||
let end_hf_start = get_hf_height(&hf_end_batch);
|
|
||||||
let height_diff = (end_hf_start - current_height) as usize;
|
|
||||||
|
|
||||||
batch_txs_verify_blocks(
|
|
||||||
&cache,
|
|
||||||
&save_file,
|
|
||||||
txs.drain(0..height_diff).collect(),
|
|
||||||
blocks.drain(0..height_diff).collect(),
|
|
||||||
&mut transaction_verifier,
|
|
||||||
&mut block_verifier,
|
|
||||||
&mut context_updater,
|
|
||||||
current_height,
|
|
||||||
hf_start_batch,
|
|
||||||
)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
current_height += height_diff as u64;
|
tracing::info!("verified block: {}", verified_block_info.height);
|
||||||
next_batch_start_height += height_diff as u64;
|
|
||||||
|
|
||||||
tracing::info!("Hard fork activating: {:?}", hf_end_batch);
|
update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?;
|
||||||
|
|
||||||
batch_txs_verify_blocks(
|
|
||||||
&cache,
|
|
||||||
&save_file,
|
|
||||||
txs,
|
|
||||||
blocks,
|
|
||||||
&mut transaction_verifier,
|
|
||||||
&mut block_verifier,
|
|
||||||
&mut context_updater,
|
|
||||||
current_height,
|
|
||||||
hf_end_batch,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
current_height += batch_len - height_diff as u64;
|
|
||||||
next_batch_start_height += batch_len - height_diff as u64;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
241
consensus/src/bin/tx_pool.rs
Normal file
241
consensus/src/bin/tx_pool.rs
Normal file
|
@ -0,0 +1,241 @@
|
||||||
|
#![cfg(feature = "binaries")]
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
future::Future,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{channel::mpsc, FutureExt, StreamExt};
|
||||||
|
use monero_serai::transaction::Transaction;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
|
use monero_consensus::{
|
||||||
|
context::{BlockChainContext, BlockChainContextRequest, RawBlockChainContext},
|
||||||
|
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
|
||||||
|
ConsensusError, TxNotInPool, TxPoolRequest, TxPoolResponse,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TxPoolHandle {
|
||||||
|
tx_pool_task: std::sync::Arc<tokio::task::JoinHandle<()>>,
|
||||||
|
tx_pool_chan: mpsc::Sender<(
|
||||||
|
TxPoolRequest,
|
||||||
|
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
|
||||||
|
)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl tower::Service<TxPoolRequest> for TxPoolHandle {
|
||||||
|
type Response = TxPoolResponse;
|
||||||
|
type Error = TxNotInPool;
|
||||||
|
type Future =
|
||||||
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
if self.tx_pool_task.is_finished() {
|
||||||
|
panic!("Tx pool task finished before it was supposed to!");
|
||||||
|
};
|
||||||
|
|
||||||
|
self.tx_pool_chan
|
||||||
|
.poll_ready(cx)
|
||||||
|
.map_err(|_| panic!("Tx pool channel closed before it was supposed to"))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: TxPoolRequest) -> Self::Future {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.tx_pool_chan
|
||||||
|
.try_send((req, tx))
|
||||||
|
.expect("You need to use `poll_ready` to check capacity!");
|
||||||
|
|
||||||
|
async move {
|
||||||
|
rx.await
|
||||||
|
.expect("Tx pool will always respond without dropping the sender")
|
||||||
|
}
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type NewTxChanRec = mpsc::Receiver<(
|
||||||
|
Vec<Transaction>,
|
||||||
|
oneshot::Sender<Result<(), tower::BoxError>>,
|
||||||
|
)>;
|
||||||
|
|
||||||
|
pub type NewTxChanSen = mpsc::Sender<(
|
||||||
|
Vec<Transaction>,
|
||||||
|
oneshot::Sender<Result<(), tower::BoxError>>,
|
||||||
|
)>;
|
||||||
|
|
||||||
|
pub struct TxPool<TxV, Ctx> {
|
||||||
|
txs: Arc<Mutex<HashMap<[u8; 32], Arc<TransactionVerificationData>>>>,
|
||||||
|
current_ctx: BlockChainContext,
|
||||||
|
tx_verifier: Option<TxV>,
|
||||||
|
tx_verifier_chan: Option<oneshot::Receiver<TxV>>,
|
||||||
|
ctx_svc: Ctx,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TxV, Ctx> TxPool<TxV, Ctx>
|
||||||
|
where
|
||||||
|
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
TxV::Future: Send + 'static,
|
||||||
|
Ctx: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
Ctx::Future: Send + 'static,
|
||||||
|
{
|
||||||
|
pub async fn spawn(
|
||||||
|
tx_verifier_chan: oneshot::Receiver<TxV>,
|
||||||
|
mut ctx_svc: Ctx,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
TxPoolHandle,
|
||||||
|
mpsc::Sender<(
|
||||||
|
Vec<Transaction>,
|
||||||
|
oneshot::Sender<Result<(), tower::BoxError>>,
|
||||||
|
)>,
|
||||||
|
),
|
||||||
|
tower::BoxError,
|
||||||
|
> {
|
||||||
|
let current_ctx = ctx_svc
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockChainContextRequest)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let tx_pool = TxPool {
|
||||||
|
txs: Default::default(),
|
||||||
|
current_ctx,
|
||||||
|
tx_verifier: None,
|
||||||
|
tx_verifier_chan: Some(tx_verifier_chan),
|
||||||
|
ctx_svc,
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx_pool_tx, tx_pool_rx) = mpsc::channel(3);
|
||||||
|
let (new_tx_tx, new_tx_rx) = mpsc::channel(3);
|
||||||
|
|
||||||
|
let tx_pool_task = tokio::spawn(tx_pool.run(tx_pool_rx, new_tx_rx));
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
TxPoolHandle {
|
||||||
|
tx_pool_task: tx_pool_task.into(),
|
||||||
|
tx_pool_chan: tx_pool_tx,
|
||||||
|
},
|
||||||
|
new_tx_tx,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_or_update_ctx(&mut self) -> Result<RawBlockChainContext, tower::BoxError> {
|
||||||
|
if let Ok(current_ctx) = self.current_ctx.blockchain_context().cloned() {
|
||||||
|
Ok(current_ctx)
|
||||||
|
} else {
|
||||||
|
self.current_ctx = self
|
||||||
|
.ctx_svc
|
||||||
|
.ready()
|
||||||
|
.await?
|
||||||
|
.call(BlockChainContextRequest)
|
||||||
|
.await?;
|
||||||
|
self.current_ctx
|
||||||
|
.blockchain_context()
|
||||||
|
.map_err(Into::into)
|
||||||
|
.cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_txs_req(
|
||||||
|
&self,
|
||||||
|
req: TxPoolRequest,
|
||||||
|
tx: oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
|
||||||
|
) {
|
||||||
|
let TxPoolRequest::Transactions(txs_to_get) = req;
|
||||||
|
|
||||||
|
let mut res = Vec::with_capacity(txs_to_get.len());
|
||||||
|
|
||||||
|
let mut txs = self.txs.lock().unwrap();
|
||||||
|
|
||||||
|
for tx_hash in txs_to_get {
|
||||||
|
let Some(tx) = txs.remove(&tx_hash) else {
|
||||||
|
let _ = tx.send(Err(TxNotInPool));
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
res.push(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = tx.send(Ok(TxPoolResponse::Transactions(res)));
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_new_txs(
|
||||||
|
&mut self,
|
||||||
|
new_txs: Vec<Transaction>,
|
||||||
|
res_chan: oneshot::Sender<Result<(), tower::BoxError>>,
|
||||||
|
) -> Result<(), tower::BoxError> {
|
||||||
|
if self.tx_verifier.is_none() {
|
||||||
|
self.tx_verifier = Some(self.tx_verifier_chan.take().unwrap().await?);
|
||||||
|
}
|
||||||
|
|
||||||
|
let current_ctx = self.get_or_update_ctx().await?;
|
||||||
|
|
||||||
|
let mut tx_verifier = self.tx_verifier.clone().unwrap();
|
||||||
|
let tx_pool = self.txs.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// We only batch the setup a real tx pool would also call `VerifyTxRequest::Block`
|
||||||
|
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.call(VerifyTxRequest::BatchSetup {
|
||||||
|
txs: new_txs,
|
||||||
|
hf: current_ctx.current_hard_fork,
|
||||||
|
re_org_token: current_ctx.re_org_token.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
else {
|
||||||
|
panic!("Tx verifier sent incorrect response!");
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut locked_pool = tx_pool.lock().unwrap();
|
||||||
|
|
||||||
|
for tx in txs {
|
||||||
|
locked_pool.insert(tx.tx_hash, tx);
|
||||||
|
}
|
||||||
|
res_chan.send(Ok(())).unwrap();
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(
|
||||||
|
mut self,
|
||||||
|
mut tx_pool_handle: mpsc::Receiver<(
|
||||||
|
TxPoolRequest,
|
||||||
|
oneshot::Sender<Result<TxPoolResponse, TxNotInPool>>,
|
||||||
|
)>,
|
||||||
|
mut new_tx_channel: NewTxChanRec,
|
||||||
|
) {
|
||||||
|
loop {
|
||||||
|
futures::select! {
|
||||||
|
pool_req = tx_pool_handle.next() => {
|
||||||
|
let Some((req, tx)) = pool_req else {
|
||||||
|
todo!("Shutdown txpool")
|
||||||
|
};
|
||||||
|
self.handle_txs_req(req, tx);
|
||||||
|
}
|
||||||
|
new_txs = new_tx_channel.next() => {
|
||||||
|
let Some(new_txs) = new_txs else {
|
||||||
|
todo!("Shutdown txpool")
|
||||||
|
};
|
||||||
|
|
||||||
|
self.handle_new_txs(new_txs.0, new_txs.1).await.unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn main() {}
|
|
@ -6,13 +6,13 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use monero_serai::{block::Block, transaction::Transaction};
|
use monero_serai::block::Block;
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
context::{BlockChainContext, BlockChainContextRequest},
|
context::{BlockChainContext, BlockChainContextRequest},
|
||||||
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
|
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
|
||||||
ConsensusError, HardFork,
|
ConsensusError, HardFork, TxNotInPool, TxPoolRequest, TxPoolResponse,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod checks;
|
mod checks;
|
||||||
|
@ -34,50 +34,62 @@ pub struct VerifiedBlockInformation {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum VerifyBlockRequest {
|
pub enum VerifyBlockRequest {
|
||||||
MainChainBatchSetupVerify(Block, Vec<Transaction>),
|
MainChain(Block),
|
||||||
MainChain(Block, Vec<Arc<TransactionVerificationData>>),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum VerifyBlockResponse {
|
|
||||||
MainChainBatchSetupVerify(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: it is probably a bad idea for this to derive clone, if 2 places (RPC, P2P) receive valid but different blocks
|
// TODO: it is probably a bad idea for this to derive clone, if 2 places (RPC, P2P) receive valid but different blocks
|
||||||
// then they will both get approved but only one should go to main chain.
|
// then they will both get approved but only one should go to main chain.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BlockVerifierService<C: Clone, Tx: Clone> {
|
pub struct BlockVerifierService<C: Clone, TxV: Clone, TxP: Clone> {
|
||||||
context_svc: C,
|
context_svc: C,
|
||||||
tx_verifier_svc: Tx,
|
tx_verifier_svc: TxV,
|
||||||
|
tx_pool: TxP,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C, Tx> BlockVerifierService<C, Tx>
|
impl<C, TxV, TxP> BlockVerifierService<C, TxV, TxP>
|
||||||
where
|
where
|
||||||
C: Service<BlockChainContextRequest, Response = BlockChainContext> + Clone + Send + 'static,
|
C: Service<BlockChainContextRequest, Response = BlockChainContext> + Clone + Send + 'static,
|
||||||
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
|
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
|
||||||
+ Clone
|
+ Clone
|
||||||
+ Send
|
+ Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
{
|
{
|
||||||
pub fn new(context_svc: C, tx_verifier_svc: Tx) -> BlockVerifierService<C, Tx> {
|
pub fn new(
|
||||||
|
context_svc: C,
|
||||||
|
tx_verifier_svc: TxV,
|
||||||
|
tx_pool: TxP,
|
||||||
|
) -> BlockVerifierService<C, TxV, TxP> {
|
||||||
BlockVerifierService {
|
BlockVerifierService {
|
||||||
context_svc,
|
context_svc,
|
||||||
tx_verifier_svc,
|
tx_verifier_svc,
|
||||||
|
tx_pool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C, Tx> Service<VerifyBlockRequest> for BlockVerifierService<C, Tx>
|
impl<C, TxV, TxP> Service<VerifyBlockRequest> for BlockVerifierService<C, TxV, TxP>
|
||||||
where
|
where
|
||||||
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
||||||
+ Clone
|
+ Clone
|
||||||
+ Send
|
+ Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
C::Future: Send + 'static,
|
C::Future: Send + 'static,
|
||||||
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
|
|
||||||
|
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>
|
||||||
+ Clone
|
+ Clone
|
||||||
+ Send
|
+ Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
Tx::Future: Send + 'static,
|
TxV::Future: Send + 'static,
|
||||||
|
|
||||||
|
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
TxP::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
type Response = VerifiedBlockInformation;
|
type Response = VerifiedBlockInformation;
|
||||||
type Error = ConsensusError;
|
type Error = ConsensusError;
|
||||||
|
@ -92,15 +104,12 @@ where
|
||||||
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
|
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
|
||||||
let context_svc = self.context_svc.clone();
|
let context_svc = self.context_svc.clone();
|
||||||
let tx_verifier_svc = self.tx_verifier_svc.clone();
|
let tx_verifier_svc = self.tx_verifier_svc.clone();
|
||||||
|
let tx_pool = self.tx_pool.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
match req {
|
match req {
|
||||||
VerifyBlockRequest::MainChainBatchSetupVerify(block, txs) => {
|
VerifyBlockRequest::MainChain(block) => {
|
||||||
batch_setup_verify_main_chain_block(block, txs, context_svc, tx_verifier_svc)
|
verify_main_chain_block(block, context_svc, tx_verifier_svc, tx_pool).await
|
||||||
.await
|
|
||||||
}
|
|
||||||
VerifyBlockRequest::MainChain(block, txs) => {
|
|
||||||
verify_main_chain_block(block, txs, context_svc, tx_verifier_svc).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,18 +117,22 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn verify_main_chain_block<C, Tx>(
|
async fn verify_main_chain_block<C, TxV, TxP>(
|
||||||
block: Block,
|
block: Block,
|
||||||
txs: Vec<Arc<TransactionVerificationData>>,
|
|
||||||
context_svc: C,
|
context_svc: C,
|
||||||
tx_verifier_svc: Tx,
|
tx_verifier_svc: TxV,
|
||||||
|
tx_pool: TxP,
|
||||||
) -> Result<VerifiedBlockInformation, ConsensusError>
|
) -> Result<VerifiedBlockInformation, ConsensusError>
|
||||||
where
|
where
|
||||||
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
||||||
+ Send
|
+ Send
|
||||||
+ 'static,
|
+ 'static,
|
||||||
C::Future: Send + 'static,
|
C::Future: Send + 'static,
|
||||||
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
|
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
|
||||||
|
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
{
|
{
|
||||||
tracing::debug!("getting blockchain context");
|
tracing::debug!("getting blockchain context");
|
||||||
let checked_context = context_svc
|
let checked_context = context_svc
|
||||||
|
@ -128,10 +141,14 @@ where
|
||||||
.map_err(Into::<ConsensusError>::into)?;
|
.map_err(Into::<ConsensusError>::into)?;
|
||||||
|
|
||||||
// TODO: should we unwrap here, we did just get the data so it should be ok.
|
// TODO: should we unwrap here, we did just get the data so it should be ok.
|
||||||
let context = checked_context.blockchain_context().unwrap();
|
let context = checked_context.blockchain_context().unwrap().clone();
|
||||||
|
|
||||||
tracing::debug!("got blockchain context: {:?}", context);
|
tracing::debug!("got blockchain context: {:?}", context);
|
||||||
|
|
||||||
|
let TxPoolResponse::Transactions(txs) = tx_pool
|
||||||
|
.oneshot(TxPoolRequest::Transactions(block.txs.clone()))
|
||||||
|
.await?;
|
||||||
|
|
||||||
let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
|
let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
|
||||||
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
|
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
|
||||||
|
|
||||||
|
@ -141,6 +158,7 @@ where
|
||||||
current_chain_height: context.chain_height,
|
current_chain_height: context.chain_height,
|
||||||
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
||||||
hf: context.current_hard_fork,
|
hf: context.current_hard_fork,
|
||||||
|
re_org_token: context.re_org_token.clone(),
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -196,102 +214,3 @@ where
|
||||||
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
|
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn batch_setup_verify_main_chain_block<C, Tx>(
|
|
||||||
block: Block,
|
|
||||||
txs: Vec<Transaction>,
|
|
||||||
context_svc: C,
|
|
||||||
tx_verifier_svc: Tx,
|
|
||||||
) -> Result<VerifiedBlockInformation, ConsensusError>
|
|
||||||
where
|
|
||||||
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
|
||||||
+ Send
|
|
||||||
+ 'static,
|
|
||||||
C::Future: Send + 'static,
|
|
||||||
Tx: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
|
|
||||||
{
|
|
||||||
tracing::debug!("getting blockchain context");
|
|
||||||
let checked_context = context_svc
|
|
||||||
.oneshot(BlockChainContextRequest)
|
|
||||||
.await
|
|
||||||
.map_err(Into::<ConsensusError>::into)?;
|
|
||||||
|
|
||||||
// TODO: should we unwrap here, we did just get the data so it should be ok.
|
|
||||||
let context = checked_context.blockchain_context().unwrap();
|
|
||||||
|
|
||||||
tracing::debug!("got blockchain context: {:?}", context);
|
|
||||||
|
|
||||||
// TODO: reorder these tests so we do the cheap tests first.
|
|
||||||
|
|
||||||
let txs = if !txs.is_empty() {
|
|
||||||
let VerifyTxResponse::BatchSetupOk(txs) = tx_verifier_svc
|
|
||||||
.oneshot(VerifyTxRequest::BatchSetupVerifyBlock {
|
|
||||||
txs,
|
|
||||||
current_chain_height: context.chain_height,
|
|
||||||
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
|
||||||
hf: context.current_hard_fork,
|
|
||||||
})
|
|
||||||
.await?
|
|
||||||
else {
|
|
||||||
panic!("tx verifier sent incorrect response!");
|
|
||||||
};
|
|
||||||
txs
|
|
||||||
} else {
|
|
||||||
vec![]
|
|
||||||
};
|
|
||||||
|
|
||||||
let block_weight = block.miner_tx.weight() + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
|
|
||||||
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
|
|
||||||
|
|
||||||
let generated_coins = miner_tx::check_miner_tx(
|
|
||||||
&block.miner_tx,
|
|
||||||
total_fees,
|
|
||||||
context.chain_height,
|
|
||||||
block_weight,
|
|
||||||
context.median_weight_for_block_reward,
|
|
||||||
context.already_generated_coins,
|
|
||||||
&context.current_hard_fork,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let hashing_blob = block.serialize_hashable();
|
|
||||||
|
|
||||||
checks::block_size_sanity_check(block.serialize().len(), context.effective_median_weight)?;
|
|
||||||
checks::block_weight_check(block_weight, context.median_weight_for_block_reward)?;
|
|
||||||
|
|
||||||
checks::check_amount_txs(block.txs.len())?;
|
|
||||||
checks::check_prev_id(&block, &context.top_hash)?;
|
|
||||||
if let Some(median_timestamp) = context.median_block_timestamp {
|
|
||||||
// will only be None for the first 60 blocks
|
|
||||||
checks::check_timestamp(&block, median_timestamp)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// do POW test last
|
|
||||||
let pow_hash = tokio::task::spawn_blocking(move || {
|
|
||||||
hash_worker::calculate_pow_hash(
|
|
||||||
&hashing_blob,
|
|
||||||
context.chain_height,
|
|
||||||
&context.current_hard_fork,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
|
|
||||||
checks::check_block_pow(&pow_hash, context.next_difficulty)?;
|
|
||||||
|
|
||||||
context
|
|
||||||
.current_hard_fork
|
|
||||||
.check_block_version_vote(&block.header)?;
|
|
||||||
|
|
||||||
Ok(VerifiedBlockInformation {
|
|
||||||
block_hash: block.hash(),
|
|
||||||
block,
|
|
||||||
txs,
|
|
||||||
pow_hash,
|
|
||||||
generated_coins,
|
|
||||||
weight: block_weight,
|
|
||||||
height: context.chain_height,
|
|
||||||
long_term_weight: context.next_block_long_term_weight(block_weight),
|
|
||||||
hf_vote: HardFork::V1,
|
|
||||||
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,7 +16,6 @@ use std::{
|
||||||
|
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
use crate::{helper::current_time, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
|
use crate::{helper::current_time, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
|
||||||
|
@ -27,9 +26,11 @@ mod weight;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
mod tokens;
|
||||||
|
|
||||||
pub use difficulty::DifficultyCacheConfig;
|
pub use difficulty::DifficultyCacheConfig;
|
||||||
pub use hardforks::{HardFork, HardForkConfig};
|
pub use hardforks::{HardFork, HardForkConfig};
|
||||||
|
pub use tokens::*;
|
||||||
pub use weight::BlockWeightsCacheConfig;
|
pub use weight::BlockWeightsCacheConfig;
|
||||||
|
|
||||||
const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
|
const BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW: u64 = 60;
|
||||||
|
@ -118,7 +119,8 @@ where
|
||||||
let context_svc = BlockChainContextService {
|
let context_svc = BlockChainContextService {
|
||||||
internal_blockchain_context: Arc::new(
|
internal_blockchain_context: Arc::new(
|
||||||
InternalBlockChainContext {
|
InternalBlockChainContext {
|
||||||
current_validity_token: CancellationToken::new(),
|
current_validity_token: ValidityToken::new(),
|
||||||
|
current_reorg_token: ReOrgToken::new(),
|
||||||
difficulty_cache: difficulty_cache_handle.await.unwrap()?,
|
difficulty_cache: difficulty_cache_handle.await.unwrap()?,
|
||||||
weight_cache: weight_cache_handle.await.unwrap()?,
|
weight_cache: weight_cache_handle.await.unwrap()?,
|
||||||
hardfork_state: hardfork_state_handle.await.unwrap()?,
|
hardfork_state: hardfork_state_handle.await.unwrap()?,
|
||||||
|
@ -137,7 +139,7 @@ where
|
||||||
|
|
||||||
/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep
|
/// Raw blockchain context, gotten from [`BlockChainContext`]. This data may turn invalid so is not ok to keep
|
||||||
/// around. You should keep around [`BlockChainContext`] instead.
|
/// around. You should keep around [`BlockChainContext`] instead.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RawBlockChainContext {
|
pub struct RawBlockChainContext {
|
||||||
/// The next blocks difficulty.
|
/// The next blocks difficulty.
|
||||||
pub next_difficulty: u128,
|
pub next_difficulty: u128,
|
||||||
|
@ -161,6 +163,8 @@ pub struct RawBlockChainContext {
|
||||||
pub top_hash: [u8; 32],
|
pub top_hash: [u8; 32],
|
||||||
/// The current hard fork.
|
/// The current hard fork.
|
||||||
pub current_hard_fork: HardFork,
|
pub current_hard_fork: HardFork,
|
||||||
|
/// A token which is used to signal if a reorg has happened since creating the token.
|
||||||
|
pub re_org_token: ReOrgToken,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RawBlockChainContext {
|
impl RawBlockChainContext {
|
||||||
|
@ -208,7 +212,7 @@ impl RawBlockChainContext {
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct BlockChainContext {
|
pub struct BlockChainContext {
|
||||||
/// A token representing this data's validity.
|
/// A token representing this data's validity.
|
||||||
validity_token: CancellationToken,
|
validity_token: ValidityToken,
|
||||||
/// The actual block chain context.
|
/// The actual block chain context.
|
||||||
raw: RawBlockChainContext,
|
raw: RawBlockChainContext,
|
||||||
}
|
}
|
||||||
|
@ -220,16 +224,16 @@ pub struct DataNoLongerValid;
|
||||||
impl BlockChainContext {
|
impl BlockChainContext {
|
||||||
/// Checks if the data is still valid.
|
/// Checks if the data is still valid.
|
||||||
pub fn is_still_valid(&self) -> bool {
|
pub fn is_still_valid(&self) -> bool {
|
||||||
!self.validity_token.is_cancelled()
|
self.validity_token.is_data_valid()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks if the data is valid returning an Err if not and a reference to the blockchain context if
|
/// Checks if the data is valid returning an Err if not and a reference to the blockchain context if
|
||||||
/// it is.
|
/// it is.
|
||||||
pub fn blockchain_context(&self) -> Result<RawBlockChainContext, DataNoLongerValid> {
|
pub fn blockchain_context(&self) -> Result<&RawBlockChainContext, DataNoLongerValid> {
|
||||||
if !self.is_still_valid() {
|
if !self.is_still_valid() {
|
||||||
return Err(DataNoLongerValid);
|
return Err(DataNoLongerValid);
|
||||||
}
|
}
|
||||||
Ok(self.raw)
|
Ok(&self.raw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,7 +244,9 @@ pub struct BlockChainContextRequest;
|
||||||
struct InternalBlockChainContext {
|
struct InternalBlockChainContext {
|
||||||
/// A token used to invalidate previous contexts when a new
|
/// A token used to invalidate previous contexts when a new
|
||||||
/// block is added to the chain.
|
/// block is added to the chain.
|
||||||
current_validity_token: CancellationToken,
|
current_validity_token: ValidityToken,
|
||||||
|
/// A token which is used to signal a reorg has happened.
|
||||||
|
current_reorg_token: ReOrgToken,
|
||||||
|
|
||||||
difficulty_cache: difficulty::DifficultyCache,
|
difficulty_cache: difficulty::DifficultyCache,
|
||||||
weight_cache: weight::BlockWeightsCache,
|
weight_cache: weight::BlockWeightsCache,
|
||||||
|
@ -274,6 +280,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
|
||||||
|
|
||||||
let InternalBlockChainContext {
|
let InternalBlockChainContext {
|
||||||
current_validity_token,
|
current_validity_token,
|
||||||
|
current_reorg_token,
|
||||||
difficulty_cache,
|
difficulty_cache,
|
||||||
weight_cache,
|
weight_cache,
|
||||||
hardfork_state,
|
hardfork_state,
|
||||||
|
@ -285,7 +292,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
|
||||||
let current_hf = hardfork_state.current_hardfork();
|
let current_hf = hardfork_state.current_hardfork();
|
||||||
|
|
||||||
Ok(BlockChainContext {
|
Ok(BlockChainContext {
|
||||||
validity_token: current_validity_token.child_token(),
|
validity_token: current_validity_token.clone(),
|
||||||
raw: RawBlockChainContext {
|
raw: RawBlockChainContext {
|
||||||
next_difficulty: difficulty_cache.next_difficulty(¤t_hf),
|
next_difficulty: difficulty_cache.next_difficulty(¤t_hf),
|
||||||
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
|
cumulative_difficulty: difficulty_cache.cumulative_difficulty(),
|
||||||
|
@ -302,6 +309,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
|
||||||
chain_height: *chain_height,
|
chain_height: *chain_height,
|
||||||
top_hash: *top_block_hash,
|
top_hash: *top_block_hash,
|
||||||
current_hard_fork: current_hf,
|
current_hard_fork: current_hf,
|
||||||
|
re_org_token: current_reorg_token.clone(),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -339,6 +347,7 @@ impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
|
||||||
|
|
||||||
let InternalBlockChainContext {
|
let InternalBlockChainContext {
|
||||||
current_validity_token,
|
current_validity_token,
|
||||||
|
current_reorg_token: _,
|
||||||
difficulty_cache,
|
difficulty_cache,
|
||||||
weight_cache,
|
weight_cache,
|
||||||
hardfork_state,
|
hardfork_state,
|
||||||
|
@ -348,7 +357,7 @@ impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
|
||||||
} = internal_blockchain_context_lock.deref_mut();
|
} = internal_blockchain_context_lock.deref_mut();
|
||||||
|
|
||||||
// Cancel the validity token and replace it with a new one.
|
// Cancel the validity token and replace it with a new one.
|
||||||
std::mem::replace(current_validity_token, CancellationToken::new()).cancel();
|
std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid();
|
||||||
|
|
||||||
difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty);
|
difficulty_cache.new_block(new.height, new.timestamp, new.cumulative_difficulty);
|
||||||
|
|
||||||
|
|
45
consensus/src/context/tokens.rs
Normal file
45
consensus/src/context/tokens.rs
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
|
/// A token representing if a piece of data is valid.
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct ValidityToken {
|
||||||
|
token: CancellationToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ValidityToken {
|
||||||
|
pub fn new() -> ValidityToken {
|
||||||
|
ValidityToken {
|
||||||
|
token: CancellationToken::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_data_valid(&self) -> bool {
|
||||||
|
!self.token.is_cancelled()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_data_invalid(self) {
|
||||||
|
self.token.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A token representing if a re-org has happened since it's creation.
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct ReOrgToken {
|
||||||
|
token: CancellationToken,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReOrgToken {
|
||||||
|
pub fn new() -> ReOrgToken {
|
||||||
|
ReOrgToken {
|
||||||
|
token: CancellationToken::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reorg_happened(&self) -> bool {
|
||||||
|
self.token.is_cancelled()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_reorg_happened(self) {
|
||||||
|
self.token.cancel()
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,8 @@
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
future::Future,
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
pub mod block;
|
pub mod block;
|
||||||
pub mod context;
|
pub mod context;
|
||||||
|
@ -11,12 +15,17 @@ mod test_utils;
|
||||||
pub mod transactions;
|
pub mod transactions;
|
||||||
|
|
||||||
pub use block::{VerifiedBlockInformation, VerifyBlockRequest};
|
pub use block::{VerifiedBlockInformation, VerifyBlockRequest};
|
||||||
pub use context::{ContextConfig, HardFork, UpdateBlockchainCacheRequest};
|
pub use context::{
|
||||||
|
initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, ContextConfig,
|
||||||
|
HardFork, UpdateBlockchainCacheRequest,
|
||||||
|
};
|
||||||
pub use transactions::{VerifyTxRequest, VerifyTxResponse};
|
pub use transactions::{VerifyTxRequest, VerifyTxResponse};
|
||||||
|
|
||||||
pub async fn initialize_verifier<D>(
|
// TODO: instead of (ab)using generic returns return the acc type
|
||||||
|
pub async fn initialize_verifier<D, TxP, Ctx>(
|
||||||
database: D,
|
database: D,
|
||||||
cfg: ContextConfig,
|
tx_pool: TxP,
|
||||||
|
ctx_svc: Ctx,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(
|
(
|
||||||
impl tower::Service<
|
impl tower::Service<
|
||||||
|
@ -24,20 +33,39 @@ pub async fn initialize_verifier<D>(
|
||||||
Response = VerifiedBlockInformation,
|
Response = VerifiedBlockInformation,
|
||||||
Error = ConsensusError,
|
Error = ConsensusError,
|
||||||
>,
|
>,
|
||||||
impl tower::Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
|
impl tower::Service<
|
||||||
impl tower::Service<UpdateBlockchainCacheRequest, Response = (), Error = tower::BoxError>,
|
VerifyTxRequest,
|
||||||
|
Response = VerifyTxResponse,
|
||||||
|
Error = ConsensusError,
|
||||||
|
Future = impl Future<Output = Result<VerifyTxResponse, ConsensusError>> + Send + 'static,
|
||||||
|
> + Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
),
|
),
|
||||||
ConsensusError,
|
ConsensusError,
|
||||||
>
|
>
|
||||||
where
|
where
|
||||||
D: Database + Clone + Send + Sync + 'static,
|
D: Database + Clone + Send + Sync + 'static,
|
||||||
D::Future: Send + 'static,
|
D::Future: Send + 'static,
|
||||||
|
TxP: tower::Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
TxP::Future: Send + 'static,
|
||||||
|
Ctx: tower::Service<
|
||||||
|
BlockChainContextRequest,
|
||||||
|
Response = BlockChainContext,
|
||||||
|
Error = tower::BoxError,
|
||||||
|
> + Clone
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
Ctx::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
let (context_svc, context_svc_updater) =
|
|
||||||
context::initialize_blockchain_context(cfg, database.clone()).await?;
|
|
||||||
let tx_svc = transactions::TxVerifierService::new(database);
|
let tx_svc = transactions::TxVerifierService::new(database);
|
||||||
let block_svc = block::BlockVerifierService::new(context_svc.clone(), tx_svc.clone());
|
let block_svc = block::BlockVerifierService::new(ctx_svc, tx_svc.clone(), tx_pool);
|
||||||
Ok((block_svc, tx_svc, context_svc_updater))
|
Ok((block_svc, tx_svc))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: split this enum up.
|
// TODO: split this enum up.
|
||||||
|
@ -71,6 +99,8 @@ pub enum ConsensusError {
|
||||||
InvalidHardForkVersion(&'static str),
|
InvalidHardForkVersion(&'static str),
|
||||||
#[error("The block has a different previous hash than expected")]
|
#[error("The block has a different previous hash than expected")]
|
||||||
BlockIsNotApartOfChain,
|
BlockIsNotApartOfChain,
|
||||||
|
#[error("One or more transaction is not in the transaction pool")]
|
||||||
|
TxNotInPool(#[from] TxNotInPool),
|
||||||
#[error("Database error: {0}")]
|
#[error("Database error: {0}")]
|
||||||
Database(#[from] tower::BoxError),
|
Database(#[from] tower::BoxError),
|
||||||
}
|
}
|
||||||
|
@ -137,6 +167,7 @@ pub enum DatabaseResponse {
|
||||||
Outputs(HashMap<u64, HashMap<u64, OutputOnChain>>),
|
Outputs(HashMap<u64, HashMap<u64, OutputOnChain>>),
|
||||||
NumberOutputsWithAmount(usize),
|
NumberOutputsWithAmount(usize),
|
||||||
|
|
||||||
|
/// returns true if key images are spent
|
||||||
CheckKIsNotSpent(bool),
|
CheckKIsNotSpent(bool),
|
||||||
|
|
||||||
#[cfg(feature = "binaries")]
|
#[cfg(feature = "binaries")]
|
||||||
|
@ -147,3 +178,15 @@ pub enum DatabaseResponse {
|
||||||
)>,
|
)>,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone, thiserror::Error)]
|
||||||
|
#[error("The transaction requested was not in the transaction pool")]
|
||||||
|
pub struct TxNotInPool;
|
||||||
|
|
||||||
|
pub enum TxPoolRequest {
|
||||||
|
Transactions(Vec<[u8; 32]>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum TxPoolResponse {
|
||||||
|
Transactions(Vec<Arc<transactions::TransactionVerificationData>>),
|
||||||
|
}
|
||||||
|
|
|
@ -10,14 +10,16 @@ use std::{
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use monero_serai::transaction::Transaction;
|
use monero_serai::transaction::Transaction;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use tower::Service;
|
use tower::{Service, ServiceExt};
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
|
|
||||||
use crate::{ConsensusError, Database, HardFork};
|
use crate::{
|
||||||
|
context::ReOrgToken, ConsensusError, Database, DatabaseRequest, DatabaseResponse, HardFork,
|
||||||
|
};
|
||||||
|
|
||||||
|
mod contextual_data;
|
||||||
mod inputs;
|
mod inputs;
|
||||||
pub(crate) mod outputs;
|
pub(crate) mod outputs;
|
||||||
mod ring;
|
|
||||||
mod sigs;
|
mod sigs;
|
||||||
mod time_lock;
|
mod time_lock;
|
||||||
|
|
||||||
|
@ -49,7 +51,7 @@ pub struct TransactionVerificationData {
|
||||||
pub tx_hash: [u8; 32],
|
pub tx_hash: [u8; 32],
|
||||||
/// We put this behind a mutex as the information is not constant and is based of past outputs idxs
|
/// We put this behind a mutex as the information is not constant and is based of past outputs idxs
|
||||||
/// which could change on re-orgs.
|
/// which could change on re-orgs.
|
||||||
rings_member_info: std::sync::Mutex<Option<ring::TxRingMembersInfo>>,
|
rings_member_info: std::sync::Mutex<Option<contextual_data::TxRingMembersInfo>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionVerificationData {
|
impl TransactionVerificationData {
|
||||||
|
@ -73,16 +75,14 @@ pub enum VerifyTxRequest {
|
||||||
current_chain_height: u64,
|
current_chain_height: u64,
|
||||||
time_for_time_lock: u64,
|
time_for_time_lock: u64,
|
||||||
hf: HardFork,
|
hf: HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
},
|
},
|
||||||
/// Batches the setup of [`TransactionVerificationData`].
|
/// Batches the setup of [`TransactionVerificationData`], does *minimal* verification, you need to call [`VerifyTxRequest::Block`]
|
||||||
BatchSetup { txs: Vec<Transaction>, hf: HardFork },
|
/// with the returned data.
|
||||||
/// Batches the setup of [`TransactionVerificationData`] and verifies the transactions
|
BatchSetup {
|
||||||
/// in the context of a block.
|
|
||||||
BatchSetupVerifyBlock {
|
|
||||||
txs: Vec<Transaction>,
|
txs: Vec<Transaction>,
|
||||||
current_chain_height: u64,
|
|
||||||
time_for_time_lock: u64,
|
|
||||||
hf: HardFork,
|
hf: HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,65 +129,30 @@ where
|
||||||
current_chain_height,
|
current_chain_height,
|
||||||
time_for_time_lock,
|
time_for_time_lock,
|
||||||
hf,
|
hf,
|
||||||
|
re_org_token,
|
||||||
} => verify_transactions_for_block(
|
} => verify_transactions_for_block(
|
||||||
database,
|
database,
|
||||||
txs,
|
txs,
|
||||||
current_chain_height,
|
current_chain_height,
|
||||||
time_for_time_lock,
|
time_for_time_lock,
|
||||||
hf,
|
hf,
|
||||||
|
re_org_token,
|
||||||
)
|
)
|
||||||
.boxed(),
|
.boxed(),
|
||||||
VerifyTxRequest::BatchSetup { txs, hf } => {
|
VerifyTxRequest::BatchSetup {
|
||||||
batch_setup_transactions(database, txs, hf).boxed()
|
|
||||||
}
|
|
||||||
VerifyTxRequest::BatchSetupVerifyBlock {
|
|
||||||
txs,
|
txs,
|
||||||
current_chain_height,
|
|
||||||
time_for_time_lock,
|
|
||||||
hf,
|
hf,
|
||||||
} => batch_setup_verify_transactions_for_block(
|
re_org_token,
|
||||||
database,
|
} => batch_setup_transactions(database, txs, hf, re_org_token).boxed(),
|
||||||
txs,
|
|
||||||
current_chain_height,
|
|
||||||
time_for_time_lock,
|
|
||||||
hf,
|
|
||||||
)
|
|
||||||
.boxed(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn set_missing_ring_members<D>(
|
|
||||||
database: D,
|
|
||||||
txs: &[Arc<TransactionVerificationData>],
|
|
||||||
hf: &HardFork,
|
|
||||||
) -> Result<(), ConsensusError>
|
|
||||||
where
|
|
||||||
D: Database + Clone + Sync + Send + 'static,
|
|
||||||
{
|
|
||||||
// TODO: handle re-orgs.
|
|
||||||
|
|
||||||
let txs_needing_ring_members = txs
|
|
||||||
.iter()
|
|
||||||
// Safety: we must not hold the mutex lock for long to not block the async runtime.
|
|
||||||
.filter(|tx| tx.rings_member_info.lock().unwrap().is_none())
|
|
||||||
.cloned()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
tracing::debug!(
|
|
||||||
"Retrieving ring members for {} txs",
|
|
||||||
txs_needing_ring_members.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
ring::batch_fill_ring_member_info(&txs_needing_ring_members, hf, database).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn batch_setup_transactions<D>(
|
async fn batch_setup_transactions<D>(
|
||||||
database: D,
|
database: D,
|
||||||
txs: Vec<Transaction>,
|
txs: Vec<Transaction>,
|
||||||
hf: HardFork,
|
hf: HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
) -> Result<VerifyTxResponse, ConsensusError>
|
) -> Result<VerifyTxResponse, ConsensusError>
|
||||||
where
|
where
|
||||||
D: Database + Clone + Sync + Send + 'static,
|
D: Database + Clone + Sync + Send + 'static,
|
||||||
|
@ -201,41 +166,11 @@ where
|
||||||
.await
|
.await
|
||||||
.unwrap()?;
|
.unwrap()?;
|
||||||
|
|
||||||
set_missing_ring_members(database, &txs, &hf).await?;
|
contextual_data::batch_fill_ring_member_info(&txs, &hf, re_org_token, database).await?;
|
||||||
|
|
||||||
Ok(VerifyTxResponse::BatchSetupOk(txs))
|
Ok(VerifyTxResponse::BatchSetupOk(txs))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn batch_setup_verify_transactions_for_block<D>(
|
|
||||||
database: D,
|
|
||||||
txs: Vec<Transaction>,
|
|
||||||
current_chain_height: u64,
|
|
||||||
time_for_time_lock: u64,
|
|
||||||
hf: HardFork,
|
|
||||||
) -> Result<VerifyTxResponse, ConsensusError>
|
|
||||||
where
|
|
||||||
D: Database + Clone + Sync + Send + 'static,
|
|
||||||
{
|
|
||||||
// Move out of the async runtime and use rayon to parallelize the serialisation and hashing of the txs.
|
|
||||||
let txs = tokio::task::spawn_blocking(|| {
|
|
||||||
txs.into_par_iter()
|
|
||||||
.map(|tx| Ok(Arc::new(TransactionVerificationData::new(tx)?)))
|
|
||||||
.collect::<Result<Vec<_>, ConsensusError>>()
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
|
|
||||||
verify_transactions_for_block(
|
|
||||||
database,
|
|
||||||
txs.clone(),
|
|
||||||
current_chain_height,
|
|
||||||
time_for_time_lock,
|
|
||||||
hf,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Ok(VerifyTxResponse::BatchSetupOk(txs))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(name = "verify_txs", skip_all, level = "info")]
|
#[instrument(name = "verify_txs", skip_all, level = "info")]
|
||||||
async fn verify_transactions_for_block<D>(
|
async fn verify_transactions_for_block<D>(
|
||||||
database: D,
|
database: D,
|
||||||
|
@ -243,16 +178,19 @@ async fn verify_transactions_for_block<D>(
|
||||||
current_chain_height: u64,
|
current_chain_height: u64,
|
||||||
time_for_time_lock: u64,
|
time_for_time_lock: u64,
|
||||||
hf: HardFork,
|
hf: HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
) -> Result<VerifyTxResponse, ConsensusError>
|
) -> Result<VerifyTxResponse, ConsensusError>
|
||||||
where
|
where
|
||||||
D: Database + Clone + Sync + Send + 'static,
|
D: Database + Clone + Sync + Send + 'static,
|
||||||
{
|
{
|
||||||
tracing::debug!("Verifying transactions for block, amount: {}", txs.len());
|
tracing::debug!("Verifying transactions for block, amount: {}", txs.len());
|
||||||
|
|
||||||
set_missing_ring_members(database, &txs, &hf).await?;
|
contextual_data::batch_refresh_ring_member_info(&txs, &hf, re_org_token, database.clone())
|
||||||
|
.await?;
|
||||||
|
|
||||||
let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));
|
let spent_kis = Arc::new(std::sync::Mutex::new(HashSet::new()));
|
||||||
|
|
||||||
|
let cloned_spent_kis = spent_kis.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
txs.par_iter().try_for_each(|tx| {
|
txs.par_iter().try_for_each(|tx| {
|
||||||
verify_transaction_for_block(
|
verify_transaction_for_block(
|
||||||
|
@ -260,13 +198,28 @@ where
|
||||||
current_chain_height,
|
current_chain_height,
|
||||||
time_for_time_lock,
|
time_for_time_lock,
|
||||||
hf,
|
hf,
|
||||||
spent_kis.clone(),
|
cloned_spent_kis.clone(),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap()?;
|
.unwrap()?;
|
||||||
|
|
||||||
|
let DatabaseResponse::CheckKIsNotSpent(kis_spent) = database
|
||||||
|
.oneshot(DatabaseRequest::CheckKIsNotSpent(
|
||||||
|
Arc::into_inner(spent_kis).unwrap().into_inner().unwrap(),
|
||||||
|
))
|
||||||
|
.await?
|
||||||
|
else {
|
||||||
|
panic!("Database sent incorrect response!");
|
||||||
|
};
|
||||||
|
|
||||||
|
if kis_spent {
|
||||||
|
return Err(ConsensusError::TransactionHasInvalidInput(
|
||||||
|
"One or more key image spent!",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
Ok(VerifyTxResponse::Ok)
|
Ok(VerifyTxResponse::Ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -329,7 +282,7 @@ fn verify_transaction_for_block(
|
||||||
///
|
///
|
||||||
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#version
|
/// https://cuprate.github.io/monero-book/consensus_rules/transactions.html#version
|
||||||
fn check_tx_version(
|
fn check_tx_version(
|
||||||
decoy_info: &Option<ring::DecoyInfo>,
|
decoy_info: &Option<contextual_data::DecoyInfo>,
|
||||||
version: &TxVersion,
|
version: &TxVersion,
|
||||||
hf: &HardFork,
|
hf: &HardFork,
|
||||||
) -> Result<(), ConsensusError> {
|
) -> Result<(), ConsensusError> {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
//! # Rings
|
//! # Contextual Data
|
||||||
//!
|
//!
|
||||||
//! This module contains [`TxRingMembersInfo`] which is a struct made up from blockchain information about the
|
//! This module contains [`TxRingMembersInfo`] which is a struct made up from blockchain information about the
|
||||||
//! ring members of inputs. This module does minimal consensus checks, only when needed, and should not be relied
|
//! ring members of inputs. This module does minimal consensus checks, only when needed, and should not be relied
|
||||||
|
@ -6,7 +6,12 @@
|
||||||
//!
|
//!
|
||||||
//! The data collected by this module can be used to perform consensus checks.
|
//! The data collected by this module can be used to perform consensus checks.
|
||||||
//!
|
//!
|
||||||
|
//! ## Why not use the context service?
|
||||||
|
//!
|
||||||
|
//! Because this data is unique for *every* transaction and the context service is just for blockchain state data.
|
||||||
|
//!
|
||||||
|
|
||||||
|
use std::ops::Deref;
|
||||||
use std::{
|
use std::{
|
||||||
cmp::{max, min},
|
cmp::{max, min},
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
|
@ -21,17 +26,102 @@ use monero_serai::{
|
||||||
use tower::ServiceExt;
|
use tower::ServiceExt;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
transactions::TransactionVerificationData, ConsensusError, Database, DatabaseRequest,
|
context::ReOrgToken, transactions::TransactionVerificationData, ConsensusError, Database,
|
||||||
DatabaseResponse, HardFork, OutputOnChain,
|
DatabaseRequest, DatabaseResponse, HardFork, OutputOnChain,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub async fn batch_refresh_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
|
||||||
|
txs_verification_data: &[Arc<TransactionVerificationData>],
|
||||||
|
hf: &HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
|
database: D,
|
||||||
|
) -> Result<(), ConsensusError> {
|
||||||
|
let (txs_needing_full_refresh, txs_needing_partial_refresh) =
|
||||||
|
ring_member_info_needing_refresh(txs_verification_data, hf);
|
||||||
|
|
||||||
|
batch_fill_ring_member_info(
|
||||||
|
&txs_needing_full_refresh,
|
||||||
|
hf,
|
||||||
|
re_org_token,
|
||||||
|
database.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for tx_v_data in txs_needing_partial_refresh {
|
||||||
|
let decoy_info = if hf != &HardFork::V1 {
|
||||||
|
// this data is only needed after hard-fork 1.
|
||||||
|
Some(DecoyInfo::new(&tx_v_data.tx.prefix.inputs, hf, database.clone()).await?)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
// Temporarily acquirer the mutex lock to add the ring member info.
|
||||||
|
tx_v_data
|
||||||
|
.rings_member_info
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.as_mut()
|
||||||
|
// this unwrap is safe as otherwise this would require a full refresh not a partial one.
|
||||||
|
.unwrap()
|
||||||
|
.decoy_info = decoy_info;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This function returns the transaction verification datas that need refreshing.
|
||||||
|
///
|
||||||
|
/// The first returned vec needs a full refresh.
|
||||||
|
/// The second returned vec only needs a partial refresh.
|
||||||
|
///
|
||||||
|
/// A full refresh is a refresh of all the ring members and the decoy info.
|
||||||
|
/// A partial refresh is just a refresh of the decoy info.
|
||||||
|
fn ring_member_info_needing_refresh(
|
||||||
|
txs_verification_data: &[Arc<TransactionVerificationData>],
|
||||||
|
hf: &HardFork,
|
||||||
|
) -> (
|
||||||
|
Vec<Arc<TransactionVerificationData>>,
|
||||||
|
Vec<Arc<TransactionVerificationData>>,
|
||||||
|
) {
|
||||||
|
let mut txs_needing_full_refresh = Vec::new();
|
||||||
|
let mut txs_needing_partial_refresh = Vec::new();
|
||||||
|
|
||||||
|
for tx in txs_verification_data {
|
||||||
|
let tx_ring_member_info = tx.rings_member_info.lock().unwrap();
|
||||||
|
|
||||||
|
// if we don't have ring members or if a re-org has happened or if we changed hf do a full refresh.
|
||||||
|
// doing a full refresh each hf isn't needed now but its so rare it makes sense to just do a full one.
|
||||||
|
if let Some(tx_ring_member_info) = tx_ring_member_info.deref() {
|
||||||
|
if tx_ring_member_info.re_org_token.reorg_happened() || &tx_ring_member_info.hf != hf {
|
||||||
|
txs_needing_full_refresh.push(tx.clone());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
txs_needing_full_refresh.push(tx.clone());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if any input does not have a 0 amount do a partial refresh, this is because some decoy info
|
||||||
|
// data is based on the amount of non-ringCT outputs at a certain point.
|
||||||
|
if tx.tx.prefix.inputs.iter().any(|inp| match inp {
|
||||||
|
Input::Gen(_) => false,
|
||||||
|
Input::ToKey { amount, .. } => amount.is_some(),
|
||||||
|
}) {
|
||||||
|
txs_needing_partial_refresh.push(tx.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(txs_needing_full_refresh, txs_needing_partial_refresh)
|
||||||
|
}
|
||||||
|
|
||||||
/// Fills the `rings_member_info` field on the inputted [`TransactionVerificationData`].
|
/// Fills the `rings_member_info` field on the inputted [`TransactionVerificationData`].
|
||||||
///
|
///
|
||||||
/// This function batch gets all the ring members for the inputted transactions and fills in data about
|
/// This function batch gets all the ring members for the inputted transactions and fills in data about
|
||||||
/// them, like the youngest used out and the time locks.
|
/// them.
|
||||||
pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
|
pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
|
||||||
txs_verification_data: &[Arc<TransactionVerificationData>],
|
txs_verification_data: &[Arc<TransactionVerificationData>],
|
||||||
hf: &HardFork,
|
hf: &HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
mut database: D,
|
mut database: D,
|
||||||
) -> Result<(), ConsensusError> {
|
) -> Result<(), ConsensusError> {
|
||||||
let mut output_ids = HashMap::new();
|
let mut output_ids = HashMap::new();
|
||||||
|
@ -54,6 +144,7 @@ pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'st
|
||||||
get_ring_members_for_inputs(&outputs, &tx_v_data.tx.prefix.inputs)?;
|
get_ring_members_for_inputs(&outputs, &tx_v_data.tx.prefix.inputs)?;
|
||||||
|
|
||||||
let decoy_info = if hf != &HardFork::V1 {
|
let decoy_info = if hf != &HardFork::V1 {
|
||||||
|
// this data is only needed after hard-fork 1.
|
||||||
Some(DecoyInfo::new(&tx_v_data.tx.prefix.inputs, hf, database.clone()).await?)
|
Some(DecoyInfo::new(&tx_v_data.tx.prefix.inputs, hf, database.clone()).await?)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -68,6 +159,8 @@ pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'st
|
||||||
ring_members_for_tx,
|
ring_members_for_tx,
|
||||||
decoy_info,
|
decoy_info,
|
||||||
tx_v_data.tx.rct_signatures.rct_type(),
|
tx_v_data.tx.rct_signatures.rct_type(),
|
||||||
|
*hf,
|
||||||
|
re_org_token.clone(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,6 +256,10 @@ pub struct TxRingMembersInfo {
|
||||||
pub decoy_info: Option<DecoyInfo>,
|
pub decoy_info: Option<DecoyInfo>,
|
||||||
pub youngest_used_out_height: u64,
|
pub youngest_used_out_height: u64,
|
||||||
pub time_locked_outs: Vec<Timelock>,
|
pub time_locked_outs: Vec<Timelock>,
|
||||||
|
/// A token used to check if a re org has happened since getting this data.
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
|
/// The hard-fork this data was retrived for.
|
||||||
|
hf: HardFork,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TxRingMembersInfo {
|
impl TxRingMembersInfo {
|
||||||
|
@ -173,6 +270,8 @@ impl TxRingMembersInfo {
|
||||||
used_outs: Vec<Vec<&OutputOnChain>>,
|
used_outs: Vec<Vec<&OutputOnChain>>,
|
||||||
decoy_info: Option<DecoyInfo>,
|
decoy_info: Option<DecoyInfo>,
|
||||||
rct_type: RctType,
|
rct_type: RctType,
|
||||||
|
hf: HardFork,
|
||||||
|
re_org_token: ReOrgToken,
|
||||||
) -> TxRingMembersInfo {
|
) -> TxRingMembersInfo {
|
||||||
TxRingMembersInfo {
|
TxRingMembersInfo {
|
||||||
youngest_used_out_height: used_outs
|
youngest_used_out_height: used_outs
|
||||||
|
@ -200,7 +299,9 @@ impl TxRingMembersInfo {
|
||||||
})
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
rings: Rings::new(used_outs, rct_type),
|
rings: Rings::new(used_outs, rct_type),
|
||||||
|
re_org_token,
|
||||||
decoy_info,
|
decoy_info,
|
||||||
|
hf,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -242,7 +343,13 @@ fn get_ring_members_for_inputs<'a>(
|
||||||
.collect::<Result<_, ConsensusError>>()
|
.collect::<Result<_, ConsensusError>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A struct holding information about the inputs and their decoys.
|
/// A struct holding information about the inputs and their decoys. This data can vary by block so
|
||||||
|
/// this data needs to be retrieved after every change in the blockchain.
|
||||||
|
///
|
||||||
|
/// This data *does not* need to be refreshed if one of these are true:
|
||||||
|
///
|
||||||
|
/// - The input amounts are *ALL* 0
|
||||||
|
/// - The top block hash is the same as when this data was retrieved.
|
||||||
///
|
///
|
||||||
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html
|
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -298,6 +405,7 @@ impl DecoyInfo {
|
||||||
mixable += 1;
|
mixable += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// ringCT amounts are always mixable.
|
||||||
mixable += 1;
|
mixable += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,13 +439,13 @@ impl DecoyInfo {
|
||||||
///
|
///
|
||||||
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-amount-of-decoys
|
/// https://cuprate.github.io/monero-book/consensus_rules/transactions/decoys.html#minimum-amount-of-decoys
|
||||||
pub(crate) fn minimum_decoys(hf: &HardFork) -> usize {
|
pub(crate) fn minimum_decoys(hf: &HardFork) -> usize {
|
||||||
use HardFork::*;
|
use HardFork as HF;
|
||||||
match hf {
|
match hf {
|
||||||
V1 => panic!("hard-fork 1 does not use these rules!"),
|
HF::V1 => panic!("hard-fork 1 does not use these rules!"),
|
||||||
V2 | V3 | V4 | V5 => 2,
|
HF::V2 | HF::V3 | HF::V4 | HF::V5 => 2,
|
||||||
V6 => 4,
|
HF::V6 => 4,
|
||||||
V7 => 6,
|
HF::V7 => 6,
|
||||||
V8 | V9 | V10 | V11 | V12 | V13 | V14 => 10,
|
HF::V8 | HF::V9 | HF::V10 | HF::V11 | HF::V12 | HF::V13 | HF::V14 => 10,
|
||||||
_ => 15,
|
HF::V15 | HF::V16 => 15,
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -9,7 +9,7 @@ use monero_serai::transaction::Input;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
transactions::{
|
transactions::{
|
||||||
ring::{minimum_decoys, DecoyInfo, TxRingMembersInfo},
|
contextual_data::{minimum_decoys, DecoyInfo, TxRingMembersInfo},
|
||||||
TxVersion,
|
TxVersion,
|
||||||
},
|
},
|
||||||
ConsensusError, HardFork,
|
ConsensusError, HardFork,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use monero_serai::transaction::Transaction;
|
use monero_serai::transaction::Transaction;
|
||||||
|
|
||||||
use crate::{transactions::ring::Rings, ConsensusError};
|
use crate::{transactions::contextual_data::Rings, ConsensusError};
|
||||||
|
|
||||||
mod ring_sigs;
|
mod ring_sigs;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue