mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-18 16:54:35 +00:00
add batch preparing for blocks.
This commit is contained in:
parent
f025513950
commit
484b418faf
10 changed files with 383 additions and 103 deletions
|
@ -19,6 +19,7 @@ use monero_consensus::{
|
|||
initialize_blockchain_context, initialize_verifier,
|
||||
rpc::{cache::ScanningCache, init_rpc_load_balancer, RpcConfig},
|
||||
Database, DatabaseRequest, DatabaseResponse, VerifiedBlockInformation, VerifyBlockRequest,
|
||||
VerifyBlockResponse,
|
||||
};
|
||||
|
||||
mod tx_pool;
|
||||
|
@ -81,19 +82,19 @@ where
|
|||
D::Future: Send + 'static,
|
||||
{
|
||||
let mut next_fut = tokio::spawn(call_batch(
|
||||
start_height..(start_height + MAX_BLOCKS_IN_RANGE).min(chain_height),
|
||||
start_height..(start_height + (MAX_BLOCKS_IN_RANGE * 2)).min(chain_height),
|
||||
database.clone(),
|
||||
));
|
||||
|
||||
for next_batch_start in (start_height..chain_height)
|
||||
.step_by(MAX_BLOCKS_IN_RANGE as usize)
|
||||
.step_by((MAX_BLOCKS_IN_RANGE * 2) as usize)
|
||||
.skip(1)
|
||||
{
|
||||
// Call the next batch while we handle this batch.
|
||||
let current_fut = std::mem::replace(
|
||||
&mut next_fut,
|
||||
tokio::spawn(call_batch(
|
||||
next_batch_start..(next_batch_start + MAX_BLOCKS_IN_RANGE).min(chain_height),
|
||||
next_batch_start..(next_batch_start + (MAX_BLOCKS_IN_RANGE * 2)).min(chain_height),
|
||||
database.clone(),
|
||||
)),
|
||||
);
|
||||
|
@ -103,8 +104,8 @@ where
|
|||
};
|
||||
|
||||
tracing::info!(
|
||||
"Handling batch: {:?}, chain height: {}",
|
||||
(next_batch_start - MAX_BLOCKS_IN_RANGE)..(next_batch_start),
|
||||
"Retrived batch: {:?}, chain height: {}",
|
||||
(next_batch_start - (MAX_BLOCKS_IN_RANGE * 2))..(next_batch_start),
|
||||
chain_height
|
||||
);
|
||||
|
||||
|
@ -161,15 +162,48 @@ where
|
|||
call_blocks(new_tx_chan, block_tx, start_height, chain_height, database).await
|
||||
});
|
||||
|
||||
while let Some(blocks) = incoming_blocks.next().await {
|
||||
for block in blocks {
|
||||
let verified_block_info = block_verifier
|
||||
let (mut prepared_blocks_tx, mut prepared_blocks_rx) = mpsc::channel(2);
|
||||
|
||||
let mut cloned_block_verifier = block_verifier.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(mut next_blocks) = incoming_blocks.next().await {
|
||||
while !next_blocks.is_empty() {
|
||||
tracing::info!(
|
||||
"preparing next batch, number of blocks: {}",
|
||||
next_blocks.len().min(100)
|
||||
);
|
||||
|
||||
let res = cloned_block_verifier
|
||||
.ready()
|
||||
.await?
|
||||
.call(VerifyBlockRequest::BatchSetup(
|
||||
next_blocks.drain(0..next_blocks.len().min(100)).collect(),
|
||||
))
|
||||
.await;
|
||||
|
||||
prepared_blocks_tx.send(res).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Result::<_, tower::BoxError>::Ok(())
|
||||
});
|
||||
|
||||
while let Some(prepared_blocks) = prepared_blocks_rx.next().await {
|
||||
let VerifyBlockResponse::BatchSetup(prepared_blocks) = prepared_blocks? else {
|
||||
panic!("block verifier sent incorrect response!");
|
||||
};
|
||||
let mut height = 0;
|
||||
for block in prepared_blocks {
|
||||
let VerifyBlockResponse::MainChain(verified_block_info) = block_verifier
|
||||
.ready()
|
||||
.await?
|
||||
.call(VerifyBlockRequest::MainChain(block))
|
||||
.await?;
|
||||
.call(VerifyBlockRequest::MainChainPreparedBlock(block))
|
||||
.await?
|
||||
else {
|
||||
panic!("Block verifier sent incorrect response!");
|
||||
};
|
||||
|
||||
tracing::info!("verified block: {}", verified_block_info.height);
|
||||
height = verified_block_info.height;
|
||||
|
||||
if verified_block_info.height % 5000 == 0 {
|
||||
tracing::info!("saving cache to: {}", save_file.display());
|
||||
|
@ -178,6 +212,12 @@ where
|
|||
|
||||
update_cache_and_context(&cache, &mut context_updater, verified_block_info).await?;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"verified blocks: {:?}, chain height: {}",
|
||||
0..height,
|
||||
chain_height
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -139,10 +139,8 @@ where
|
|||
.await?
|
||||
.call(BlockChainContextRequest)
|
||||
.await?;
|
||||
self.current_ctx
|
||||
.blockchain_context()
|
||||
.map_err(Into::into)
|
||||
.cloned()
|
||||
|
||||
Ok(self.current_ctx.unchecked_blockchain_context().clone())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,13 @@ use std::{
|
|||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use monero_serai::block::Block;
|
||||
use monero_serai::{block::Block, transaction::Input};
|
||||
use rayon::prelude::*;
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use crate::{
|
||||
context::{BlockChainContext, BlockChainContextRequest},
|
||||
helper::rayon_spawn_async,
|
||||
transactions::{TransactionVerificationData, VerifyTxRequest, VerifyTxResponse},
|
||||
ConsensusError, HardFork, TxNotInPool, TxPoolRequest, TxPoolResponse,
|
||||
};
|
||||
|
@ -19,6 +21,22 @@ mod checks;
|
|||
mod hash_worker;
|
||||
mod miner_tx;
|
||||
|
||||
use hash_worker::calculate_pow_hash;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PrePreparedBlock {
|
||||
pub block: Block,
|
||||
pub block_blob: Vec<u8>,
|
||||
|
||||
pub hf_vote: HardFork,
|
||||
pub hf_version: HardFork,
|
||||
|
||||
pub block_hash: [u8; 32],
|
||||
pub pow_hash: [u8; 32],
|
||||
|
||||
pub miner_tx_weight: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct VerifiedBlockInformation {
|
||||
pub block: Block,
|
||||
|
@ -35,6 +53,15 @@ pub struct VerifiedBlockInformation {
|
|||
|
||||
pub enum VerifyBlockRequest {
|
||||
MainChain(Block),
|
||||
|
||||
BatchSetup(Vec<Block>),
|
||||
MainChainPreparedBlock(PrePreparedBlock),
|
||||
}
|
||||
|
||||
pub enum VerifyBlockResponse {
|
||||
MainChain(VerifiedBlockInformation),
|
||||
|
||||
BatchSetup(Vec<PrePreparedBlock>),
|
||||
}
|
||||
|
||||
// TODO: it is probably a bad idea for this to derive clone, if 2 places (RPC, P2P) receive valid but different blocks
|
||||
|
@ -91,7 +118,7 @@ where
|
|||
+ 'static,
|
||||
TxP::Future: Send + 'static,
|
||||
{
|
||||
type Response = VerifiedBlockInformation;
|
||||
type Response = VerifyBlockResponse;
|
||||
type Error = ConsensusError;
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
@ -103,6 +130,7 @@ where
|
|||
|
||||
fn call(&mut self, req: VerifyBlockRequest) -> Self::Future {
|
||||
let context_svc = self.context_svc.clone();
|
||||
let context_svc = std::mem::replace(&mut self.context_svc, context_svc);
|
||||
let tx_verifier_svc = self.tx_verifier_svc.clone();
|
||||
let tx_pool = self.tx_pool.clone();
|
||||
|
||||
|
@ -111,18 +139,76 @@ where
|
|||
VerifyBlockRequest::MainChain(block) => {
|
||||
verify_main_chain_block(block, context_svc, tx_verifier_svc, tx_pool).await
|
||||
}
|
||||
VerifyBlockRequest::BatchSetup(blocks) => batch_prepare_block(blocks).await,
|
||||
VerifyBlockRequest::MainChainPreparedBlock(block) => {
|
||||
verify_prepared_main_chain_block(block, context_svc, tx_verifier_svc, tx_pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
async fn verify_main_chain_block<C, TxV, TxP>(
|
||||
block: Block,
|
||||
async fn batch_prepare_block(blocks: Vec<Block>) -> Result<VerifyBlockResponse, ConsensusError> {
|
||||
Ok(VerifyBlockResponse::BatchSetup(
|
||||
rayon_spawn_async(move || {
|
||||
blocks
|
||||
.into_par_iter()
|
||||
.map(prepare_block)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
})
|
||||
.await?,
|
||||
))
|
||||
}
|
||||
|
||||
fn prepare_block(block: Block) -> Result<PrePreparedBlock, ConsensusError> {
|
||||
let hf_version = HardFork::from_version(&block.header.major_version)?;
|
||||
let hf_vote = HardFork::from_vote(&block.header.major_version);
|
||||
|
||||
let height = match block.miner_tx.prefix.inputs.get(0) {
|
||||
Some(Input::Gen(height)) => *height,
|
||||
_ => {
|
||||
return Err(ConsensusError::MinerTransaction(
|
||||
"Input is not a miner input",
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let block_hashing_blob = block.serialize_hashable();
|
||||
let (pow_hash, mut prepared_block) = rayon::join(
|
||||
|| {
|
||||
// we calculate the POW hash on a different task because this takes a massive amount of time.
|
||||
calculate_pow_hash(&block_hashing_blob, height, &hf_version)
|
||||
},
|
||||
|| {
|
||||
PrePreparedBlock {
|
||||
block_blob: block.serialize(),
|
||||
block_hash: block.hash(),
|
||||
// set a dummy pow hash for now. We use u8::MAX so if something odd happens and this value isn't changed it will fail for
|
||||
// difficulties > 1.
|
||||
pow_hash: [u8::MAX; 32],
|
||||
miner_tx_weight: block.miner_tx.weight(),
|
||||
block,
|
||||
hf_vote,
|
||||
hf_version,
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
prepared_block.pow_hash = pow_hash?;
|
||||
|
||||
tracing::debug!("prepared block: {}", height);
|
||||
|
||||
Ok(prepared_block)
|
||||
}
|
||||
|
||||
async fn verify_prepared_main_chain_block<C, TxV, TxP>(
|
||||
block: PrePreparedBlock,
|
||||
context_svc: C,
|
||||
tx_verifier_svc: TxV,
|
||||
tx_pool: TxP,
|
||||
) -> Result<VerifiedBlockInformation, ConsensusError>
|
||||
) -> Result<VerifyBlockResponse, ConsensusError>
|
||||
where
|
||||
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
||||
+ Send
|
||||
|
@ -141,7 +227,92 @@ where
|
|||
.map_err(Into::<ConsensusError>::into)?;
|
||||
|
||||
// TODO: should we unwrap here, we did just get the data so it should be ok.
|
||||
let context = checked_context.blockchain_context().unwrap().clone();
|
||||
let context = checked_context.unchecked_blockchain_context().clone();
|
||||
|
||||
tracing::debug!("got blockchain context: {:?}", context);
|
||||
|
||||
let TxPoolResponse::Transactions(txs) = tx_pool
|
||||
.oneshot(TxPoolRequest::Transactions(block.block.txs.clone()))
|
||||
.await?;
|
||||
|
||||
let block_weight = block.miner_tx_weight + txs.iter().map(|tx| tx.tx_weight).sum::<usize>();
|
||||
let total_fees = txs.iter().map(|tx| tx.fee).sum::<u64>();
|
||||
|
||||
tx_verifier_svc
|
||||
.oneshot(VerifyTxRequest::Block {
|
||||
txs: txs.clone(),
|
||||
current_chain_height: context.chain_height,
|
||||
time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(),
|
||||
hf: context.current_hard_fork,
|
||||
re_org_token: context.re_org_token.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
let generated_coins = miner_tx::check_miner_tx(
|
||||
&block.block.miner_tx,
|
||||
total_fees,
|
||||
context.chain_height,
|
||||
block_weight,
|
||||
context.median_weight_for_block_reward,
|
||||
context.already_generated_coins,
|
||||
&context.current_hard_fork,
|
||||
)?;
|
||||
|
||||
checks::block_size_sanity_check(block.block_blob.len(), context.effective_median_weight)?;
|
||||
checks::block_weight_check(block_weight, context.median_weight_for_block_reward)?;
|
||||
|
||||
checks::check_amount_txs(block.block.txs.len())?;
|
||||
checks::check_prev_id(&block.block, &context.top_hash)?;
|
||||
if let Some(median_timestamp) = context.median_block_timestamp {
|
||||
// will only be None for the first 60 blocks
|
||||
checks::check_timestamp(&block.block, median_timestamp)?;
|
||||
}
|
||||
|
||||
checks::check_block_pow(&block.pow_hash, context.next_difficulty)?;
|
||||
|
||||
context
|
||||
.current_hard_fork
|
||||
.check_block_version_vote(&block.block.header)?;
|
||||
|
||||
Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation {
|
||||
block_hash: block.block_hash,
|
||||
block: block.block,
|
||||
txs,
|
||||
pow_hash: block.pow_hash,
|
||||
generated_coins,
|
||||
weight: block_weight,
|
||||
height: context.chain_height,
|
||||
long_term_weight: context.next_block_long_term_weight(block_weight),
|
||||
hf_vote: HardFork::V1,
|
||||
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn verify_main_chain_block<C, TxV, TxP>(
|
||||
block: Block,
|
||||
context_svc: C,
|
||||
tx_verifier_svc: TxV,
|
||||
tx_pool: TxP,
|
||||
) -> Result<VerifyBlockResponse, ConsensusError>
|
||||
where
|
||||
C: Service<BlockChainContextRequest, Response = BlockChainContext, Error = tower::BoxError>
|
||||
+ Send
|
||||
+ 'static,
|
||||
C::Future: Send + 'static,
|
||||
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ConsensusError>,
|
||||
TxP: Service<TxPoolRequest, Response = TxPoolResponse, Error = TxNotInPool>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
{
|
||||
tracing::debug!("getting blockchain context");
|
||||
let checked_context = context_svc
|
||||
.oneshot(BlockChainContextRequest)
|
||||
.await
|
||||
.map_err(Into::<ConsensusError>::into)?;
|
||||
|
||||
// TODO: should we unwrap here, we did just get the data so it should be ok.
|
||||
let context = checked_context.unchecked_blockchain_context().clone();
|
||||
|
||||
tracing::debug!("got blockchain context: {:?}", context);
|
||||
|
||||
|
@ -201,7 +372,7 @@ where
|
|||
.current_hard_fork
|
||||
.check_block_version_vote(&block.header)?;
|
||||
|
||||
Ok(VerifiedBlockInformation {
|
||||
Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation {
|
||||
block_hash: block.hash(),
|
||||
block,
|
||||
txs,
|
||||
|
@ -212,5 +383,5 @@ where
|
|||
long_term_weight: context.next_block_long_term_weight(block_weight),
|
||||
hf_vote: HardFork::V1,
|
||||
cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty,
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ fn check_time_lock(time_lock: &Timelock, chain_height: u64) -> Result<(), Consen
|
|||
match time_lock {
|
||||
Timelock::Block(till_height) => {
|
||||
if u64::try_from(*till_height).unwrap() != chain_height + MINER_TX_TIME_LOCKED_BLOCKS {
|
||||
tracing::warn!("{}, {}", till_height, chain_height);
|
||||
Err(ConsensusError::MinerTransaction(
|
||||
"Time lock has invalid block height",
|
||||
))
|
||||
|
|
|
@ -14,8 +14,10 @@ use std::{
|
|||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use tokio::sync::RwLock;
|
||||
use futures::{
|
||||
lock::{Mutex, OwnedMutexGuard, OwnedMutexLockFuture},
|
||||
FutureExt,
|
||||
};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
use crate::{helper::current_time, ConsensusError, Database, DatabaseRequest, DatabaseResponse};
|
||||
|
@ -130,6 +132,7 @@ where
|
|||
}
|
||||
.into(),
|
||||
),
|
||||
lock_state: MutexLockState::Locked,
|
||||
};
|
||||
|
||||
let context_svc_update = context_svc.clone();
|
||||
|
@ -235,6 +238,11 @@ impl BlockChainContext {
|
|||
}
|
||||
Ok(&self.raw)
|
||||
}
|
||||
|
||||
/// Returns the blockchain context without checking the validity token.
|
||||
pub fn unchecked_blockchain_context(&self) -> &RawBlockChainContext {
|
||||
&self.raw
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -257,9 +265,23 @@ struct InternalBlockChainContext {
|
|||
already_generated_coins: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum MutexLockState {
|
||||
Locked,
|
||||
Acquiring(OwnedMutexLockFuture<InternalBlockChainContext>),
|
||||
Acquired(OwnedMutexGuard<InternalBlockChainContext>),
|
||||
}
|
||||
pub struct BlockChainContextService {
|
||||
internal_blockchain_context: Arc<RwLock<InternalBlockChainContext>>,
|
||||
internal_blockchain_context: Arc<Mutex<InternalBlockChainContext>>,
|
||||
lock_state: MutexLockState,
|
||||
}
|
||||
|
||||
impl Clone for BlockChainContextService {
|
||||
fn clone(&self) -> Self {
|
||||
BlockChainContextService {
|
||||
internal_blockchain_context: self.internal_blockchain_context.clone(),
|
||||
lock_state: MutexLockState::Locked,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service<BlockChainContextRequest> for BlockChainContextService {
|
||||
|
@ -268,16 +290,30 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
|
|||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
loop {
|
||||
match &mut self.lock_state {
|
||||
MutexLockState::Locked => {
|
||||
self.lock_state = MutexLockState::Acquiring(
|
||||
Arc::clone(&self.internal_blockchain_context).lock_owned(),
|
||||
)
|
||||
}
|
||||
MutexLockState::Acquiring(rpc) => {
|
||||
self.lock_state = MutexLockState::Acquired(futures::ready!(rpc.poll_unpin(cx)))
|
||||
}
|
||||
MutexLockState::Acquired(_) => return Poll::Ready(Ok(())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, _: BlockChainContextRequest) -> Self::Future {
|
||||
let internal_blockchain_context = self.internal_blockchain_context.clone();
|
||||
let MutexLockState::Acquired(internal_blockchain_context) =
|
||||
std::mem::replace(&mut self.lock_state, MutexLockState::Locked)
|
||||
else {
|
||||
panic!("poll_ready() was not called first!")
|
||||
};
|
||||
|
||||
async move {
|
||||
let internal_blockchain_context_lock = internal_blockchain_context.read().await;
|
||||
|
||||
let InternalBlockChainContext {
|
||||
current_validity_token,
|
||||
current_reorg_token,
|
||||
|
@ -287,7 +323,7 @@ impl Service<BlockChainContextRequest> for BlockChainContextService {
|
|||
chain_height,
|
||||
top_block_hash,
|
||||
already_generated_coins,
|
||||
} = internal_blockchain_context_lock.deref();
|
||||
} = internal_blockchain_context.deref();
|
||||
|
||||
let current_hf = hardfork_state.current_hardfork();
|
||||
|
||||
|
@ -335,16 +371,30 @@ impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
|
|||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
loop {
|
||||
match &mut self.lock_state {
|
||||
MutexLockState::Locked => {
|
||||
self.lock_state = MutexLockState::Acquiring(
|
||||
Arc::clone(&self.internal_blockchain_context).lock_owned(),
|
||||
)
|
||||
}
|
||||
MutexLockState::Acquiring(rpc) => {
|
||||
self.lock_state = MutexLockState::Acquired(futures::ready!(rpc.poll_unpin(cx)))
|
||||
}
|
||||
MutexLockState::Acquired(_) => return Poll::Ready(Ok(())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, new: UpdateBlockchainCacheRequest) -> Self::Future {
|
||||
let internal_blockchain_context = self.internal_blockchain_context.clone();
|
||||
let MutexLockState::Acquired(mut internal_blockchain_context) =
|
||||
std::mem::replace(&mut self.lock_state, MutexLockState::Locked)
|
||||
else {
|
||||
panic!("poll_ready() was not called first!")
|
||||
};
|
||||
|
||||
async move {
|
||||
let mut internal_blockchain_context_lock = internal_blockchain_context.write().await;
|
||||
|
||||
let InternalBlockChainContext {
|
||||
current_validity_token,
|
||||
current_reorg_token: _,
|
||||
|
@ -354,7 +404,7 @@ impl tower::Service<UpdateBlockchainCacheRequest> for BlockChainContextService {
|
|||
chain_height,
|
||||
top_block_hash,
|
||||
already_generated_coins,
|
||||
} = internal_blockchain_context_lock.deref_mut();
|
||||
} = internal_blockchain_context.deref_mut();
|
||||
|
||||
// Cancel the validity token and replace it with a new one.
|
||||
std::mem::replace(current_validity_token, ValidityToken::new()).set_data_invalid();
|
||||
|
|
|
@ -14,7 +14,9 @@ pub mod rpc;
|
|||
mod test_utils;
|
||||
pub mod transactions;
|
||||
|
||||
pub use block::{VerifiedBlockInformation, VerifyBlockRequest};
|
||||
pub use block::{
|
||||
PrePreparedBlock, VerifiedBlockInformation, VerifyBlockRequest, VerifyBlockResponse,
|
||||
};
|
||||
pub use context::{
|
||||
initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, ContextConfig,
|
||||
HardFork, UpdateBlockchainCacheRequest,
|
||||
|
@ -29,10 +31,15 @@ pub async fn initialize_verifier<D, TxP, Ctx>(
|
|||
) -> Result<
|
||||
(
|
||||
impl tower::Service<
|
||||
VerifyBlockRequest,
|
||||
Response = VerifiedBlockInformation,
|
||||
Error = ConsensusError,
|
||||
>,
|
||||
VerifyBlockRequest,
|
||||
Response = VerifyBlockResponse,
|
||||
Error = ConsensusError,
|
||||
Future = impl Future<Output = Result<VerifyBlockResponse, ConsensusError>>
|
||||
+ Send
|
||||
+ 'static,
|
||||
> + Clone
|
||||
+ Send
|
||||
+ 'static,
|
||||
impl tower::Service<
|
||||
VerifyTxRequest,
|
||||
Response = VerifyTxResponse,
|
||||
|
@ -120,7 +127,7 @@ pub struct OutputOnChain {
|
|||
height: u64,
|
||||
time_lock: monero_serai::transaction::Timelock,
|
||||
key: curve25519_dalek::EdwardsPoint,
|
||||
//mask: curve25519_dalek::EdwardsPoint,
|
||||
mask: curve25519_dalek::EdwardsPoint,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
|
|
|
@ -93,8 +93,8 @@ pub fn init_rpc_load_balancer(
|
|||
let (rpc_discoverer_tx, rpc_discoverer_rx) = futures::channel::mpsc::channel(30);
|
||||
|
||||
let rpc_balance = Balance::new(rpc_discoverer_rx.map(Result::<_, tower::BoxError>::Ok));
|
||||
let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(1200));
|
||||
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 30);
|
||||
let timeout = tower::timeout::Timeout::new(rpc_balance, Duration::from_secs(300));
|
||||
let rpc_buffer = tower::buffer::Buffer::new(BoxService::new(timeout), 50);
|
||||
let rpcs = tower::retry::Retry::new(Attempts(10), rpc_buffer);
|
||||
|
||||
let discover = discover::RPCDiscover {
|
||||
|
@ -415,7 +415,7 @@ async fn get_outputs<R: RpcConnection>(
|
|||
struct OutputRes {
|
||||
height: u64,
|
||||
key: [u8; 32],
|
||||
// mask: [u8; 32],
|
||||
mask: [u8; 32],
|
||||
txid: [u8; 32],
|
||||
}
|
||||
|
||||
|
@ -424,18 +424,21 @@ async fn get_outputs<R: RpcConnection>(
|
|||
outs: Vec<OutputRes>,
|
||||
}
|
||||
|
||||
let outputs = out_ids
|
||||
.into_iter()
|
||||
.flat_map(|(amt, amt_map)| {
|
||||
amt_map
|
||||
.into_iter()
|
||||
.map(|amt_idx| OutputID {
|
||||
amount: amt,
|
||||
index: amt_idx,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let outputs = rayon_spawn_async(|| {
|
||||
out_ids
|
||||
.into_par_iter()
|
||||
.flat_map(|(amt, amt_map)| {
|
||||
amt_map
|
||||
.into_iter()
|
||||
.map(|amt_idx| OutputID {
|
||||
amount: amt,
|
||||
index: amt_idx,
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.await;
|
||||
|
||||
let res = rpc
|
||||
.bin_call(
|
||||
|
@ -446,36 +449,36 @@ async fn get_outputs<R: RpcConnection>(
|
|||
)
|
||||
.await?;
|
||||
|
||||
let outs: Response = monero_epee_bin_serde::from_bytes(&res)?;
|
||||
rayon_spawn_async(move || {
|
||||
let outs: Response = monero_epee_bin_serde::from_bytes(&res)?;
|
||||
|
||||
tracing::info!("Got outputs len: {}", outs.outs.len());
|
||||
tracing::info!("Got outputs len: {}", outs.outs.len());
|
||||
|
||||
let mut ret = HashMap::new();
|
||||
let cache = cache.read().unwrap();
|
||||
let mut ret = HashMap::new();
|
||||
let cache = cache.read().unwrap();
|
||||
|
||||
for (out, idx) in outs.outs.iter().zip(outputs) {
|
||||
ret.entry(idx.amount).or_insert_with(HashMap::new).insert(
|
||||
idx.index,
|
||||
OutputOnChain {
|
||||
height: out.height,
|
||||
time_lock: cache.outputs_time_lock(&out.txid),
|
||||
// we unwrap these as we are checking already approved rings so if these points are bad
|
||||
// then a bad proof has been approved.
|
||||
key: CompressedEdwardsY::from_slice(&out.key)
|
||||
.unwrap()
|
||||
.decompress()
|
||||
.unwrap(),
|
||||
/*
|
||||
mask: CompressedEdwardsY::from_slice(&out.mask)
|
||||
.unwrap()
|
||||
.decompress()
|
||||
.unwrap(),
|
||||
|
||||
*/
|
||||
},
|
||||
);
|
||||
}
|
||||
Ok(DatabaseResponse::Outputs(ret))
|
||||
for (out, idx) in outs.outs.iter().zip(outputs) {
|
||||
ret.entry(idx.amount).or_insert_with(HashMap::new).insert(
|
||||
idx.index,
|
||||
OutputOnChain {
|
||||
height: out.height,
|
||||
time_lock: cache.outputs_time_lock(&out.txid),
|
||||
// we unwrap these as we are checking already approved rings so if these points are bad
|
||||
// then a bad proof has been approved.
|
||||
key: CompressedEdwardsY::from_slice(&out.key)
|
||||
.unwrap()
|
||||
.decompress()
|
||||
.unwrap(),
|
||||
mask: CompressedEdwardsY::from_slice(&out.mask)
|
||||
.unwrap()
|
||||
.decompress()
|
||||
.unwrap(),
|
||||
},
|
||||
);
|
||||
}
|
||||
Ok(DatabaseResponse::Outputs(ret))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_blocks_in_range<R: RpcConnection>(
|
||||
|
@ -566,7 +569,7 @@ async fn get_block_info_in_range<R: RpcConnection>(
|
|||
Ok(DatabaseResponse::BlockExtendedHeaderInRange(
|
||||
rayon_spawn_async(|| {
|
||||
res.headers
|
||||
.into_iter()
|
||||
.into_par_iter()
|
||||
.map(|info| ExtendedBlockHeader {
|
||||
version: HardFork::from_version(&info.major_version)
|
||||
.expect("previously checked block has incorrect version"),
|
||||
|
|
|
@ -19,10 +19,7 @@ use std::{
|
|||
};
|
||||
|
||||
use curve25519_dalek::EdwardsPoint;
|
||||
use monero_serai::{
|
||||
ringct::RctType,
|
||||
transaction::{Input, Timelock},
|
||||
};
|
||||
use monero_serai::transaction::{Input, Timelock};
|
||||
use tower::ServiceExt;
|
||||
|
||||
use crate::{
|
||||
|
@ -30,6 +27,8 @@ use crate::{
|
|||
DatabaseRequest, DatabaseResponse, HardFork, OutputOnChain,
|
||||
};
|
||||
|
||||
use super::TxVersion;
|
||||
|
||||
pub async fn batch_refresh_ring_member_info<D: Database + Clone + Send + Sync + 'static>(
|
||||
txs_verification_data: &[Arc<TransactionVerificationData>],
|
||||
hf: &HardFork,
|
||||
|
@ -158,7 +157,7 @@ pub async fn batch_fill_ring_member_info<D: Database + Clone + Send + Sync + 'st
|
|||
.insert(TxRingMembersInfo::new(
|
||||
ring_members_for_tx,
|
||||
decoy_info,
|
||||
tx_v_data.tx.rct_signatures.rct_type(),
|
||||
tx_v_data.version,
|
||||
*hf,
|
||||
re_org_token.clone(),
|
||||
));
|
||||
|
@ -229,21 +228,31 @@ fn insert_ring_member_ids(
|
|||
pub enum Rings {
|
||||
/// Legacy, pre-ringCT, rings.
|
||||
Legacy(Vec<Vec<EdwardsPoint>>),
|
||||
// TODO:
|
||||
// RingCT,
|
||||
// RingCT rings, (outkey, mask).
|
||||
RingCT(Vec<Vec<[EdwardsPoint; 2]>>),
|
||||
}
|
||||
|
||||
impl Rings {
|
||||
/// Builds the rings for the transaction inputs, from the given outputs.
|
||||
fn new(outputs: Vec<Vec<&OutputOnChain>>, rct_type: RctType) -> Rings {
|
||||
match rct_type {
|
||||
RctType::Null => Rings::Legacy(
|
||||
fn new(outputs: Vec<Vec<&OutputOnChain>>, tx_version: TxVersion) -> Rings {
|
||||
match tx_version {
|
||||
TxVersion::RingSignatures => Rings::Legacy(
|
||||
outputs
|
||||
.into_iter()
|
||||
.map(|inp_outs| inp_outs.into_iter().map(|out| out.key).collect())
|
||||
.collect(),
|
||||
),
|
||||
_ => todo!("RingCT"),
|
||||
TxVersion::RingCT => Rings::RingCT(
|
||||
outputs
|
||||
.into_iter()
|
||||
.map(|inp_outs| {
|
||||
inp_outs
|
||||
.into_iter()
|
||||
.map(|out| [out.key, out.mask])
|
||||
.collect()
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -269,7 +278,7 @@ impl TxRingMembersInfo {
|
|||
fn new(
|
||||
used_outs: Vec<Vec<&OutputOnChain>>,
|
||||
decoy_info: Option<DecoyInfo>,
|
||||
rct_type: RctType,
|
||||
tx_version: TxVersion,
|
||||
hf: HardFork,
|
||||
re_org_token: ReOrgToken,
|
||||
) -> TxRingMembersInfo {
|
||||
|
@ -298,7 +307,7 @@ impl TxRingMembersInfo {
|
|||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect(),
|
||||
rings: Rings::new(used_outs, rct_type),
|
||||
rings: Rings::new(used_outs, tx_version),
|
||||
re_org_token,
|
||||
decoy_info,
|
||||
hf,
|
||||
|
|
|
@ -12,6 +12,6 @@ pub fn verify_signatures(tx: &Transaction, rings: &Rings) -> Result<(), Consensu
|
|||
rings,
|
||||
&tx.signature_hash(),
|
||||
),
|
||||
//_ => panic!("TODO: RCT"),
|
||||
_ => panic!("TODO: RCT"),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,7 +47,8 @@ pub fn verify_inputs_signatures(
|
|||
}
|
||||
Ok(())
|
||||
})?;
|
||||
} // _ => panic!("tried to verify v1 tx with a non v1 ring"),
|
||||
},
|
||||
_ => panic!("tried to verify v1 tx with a non v1 ring"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue