mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-09 04:20:01 +00:00
fix new block handling
This commit is contained in:
parent
d2ab8e20c4
commit
291ffe324d
4 changed files with 41 additions and 20 deletions
|
@ -7,13 +7,15 @@ use cuprate_types::Chain;
|
||||||
use monero_serai::block::Block;
|
use monero_serai::block::Block;
|
||||||
use monero_serai::transaction::Transaction;
|
use monero_serai::transaction::Transaction;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::OnceLock;
|
use std::sync::{Mutex, OnceLock};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use tower::{Service, ServiceExt};
|
use tower::{Service, ServiceExt};
|
||||||
|
|
||||||
pub static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
|
pub static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
|
||||||
|
|
||||||
|
pub static BLOCKS_BEING_HANDLED: OnceLock<Mutex<HashSet<[u8; 32]>>> = OnceLock::new();
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum IncomingBlockError {
|
pub enum IncomingBlockError {
|
||||||
#[error("Unknown transactions in block.")]
|
#[error("Unknown transactions in block.")]
|
||||||
|
@ -62,6 +64,10 @@ pub async fn handle_incoming_block(
|
||||||
.collect::<Result<_, anyhow::Error>>()
|
.collect::<Result<_, anyhow::Error>>()
|
||||||
.map_err(IncomingBlockError::InvalidBlock)?;
|
.map_err(IncomingBlockError::InvalidBlock)?;
|
||||||
|
|
||||||
|
if !BLOCKS_BEING_HANDLED.get_or_init(|| Mutex::new(HashSet::new())).lock().unwrap().insert(block_hash) {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else {
|
let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else {
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
};
|
};
|
||||||
|
@ -77,10 +83,14 @@ pub async fn handle_incoming_block(
|
||||||
.await
|
.await
|
||||||
.expect("TODO: don't actually panic here");
|
.expect("TODO: don't actually panic here");
|
||||||
|
|
||||||
response_rx
|
let res =response_rx
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.map_err(IncomingBlockError::InvalidBlock)
|
.map_err(IncomingBlockError::InvalidBlock);
|
||||||
|
|
||||||
|
BLOCKS_BEING_HANDLED.get().unwrap().lock().unwrap().remove(&block_hash);
|
||||||
|
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn block_exists(
|
async fn block_exists(
|
||||||
|
|
|
@ -19,7 +19,10 @@ use cuprate_helper::map::split_u128_into_low_high_bits;
|
||||||
use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN};
|
use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN};
|
||||||
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
|
||||||
use cuprate_types::BlockCompleteEntry;
|
use cuprate_types::BlockCompleteEntry;
|
||||||
use cuprate_wire::protocol::{ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, GetObjectsResponse, NewFluffyBlock};
|
use cuprate_wire::protocol::{
|
||||||
|
ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest,
|
||||||
|
GetObjectsResponse, NewFluffyBlock,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct P2pProtocolRequestHandler {
|
pub struct P2pProtocolRequestHandler {
|
||||||
|
@ -46,7 +49,9 @@ impl Service<ProtocolRequest> for P2pProtocolRequestHandler {
|
||||||
ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
||||||
ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
||||||
ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
||||||
ProtocolRequest::NewFluffyBlock(block) => new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed(),
|
ProtocolRequest::NewFluffyBlock(block) => {
|
||||||
|
new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed()
|
||||||
|
}
|
||||||
ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,14 +163,16 @@ async fn new_fluffy_block(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await;
|
let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await;
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => {
|
Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => {
|
||||||
return Ok(ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest{
|
return Ok(ProtocolResponse::FluffyMissingTxs(
|
||||||
block_hash: ByteArray::from(block_hash),
|
FluffyMissingTransactionsRequest {
|
||||||
current_blockchain_height: peer_blockchain_height,
|
block_hash: ByteArray::from(block_hash),
|
||||||
missing_tx_indices: tx_indexes,
|
current_blockchain_height: peer_blockchain_height,
|
||||||
}))
|
missing_tx_indices: tx_indexes,
|
||||||
|
},
|
||||||
|
))
|
||||||
}
|
}
|
||||||
Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?,
|
Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?,
|
||||||
Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA),
|
Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA),
|
||||||
|
|
|
@ -75,7 +75,9 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
|
||||||
ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
|
ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
|
||||||
ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val),
|
ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val),
|
||||||
ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val),
|
ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val),
|
||||||
ProtocolResponse::FluffyMissingTxs(val) => ProtocolMessage::FluffyMissingTransactionsRequest(val),
|
ProtocolResponse::FluffyMissingTxs(val) => {
|
||||||
|
ProtocolMessage::FluffyMissingTransactionsRequest(val)
|
||||||
|
}
|
||||||
ProtocolResponse::NA => return Err(MessageConversionError),
|
ProtocolResponse::NA => return Err(MessageConversionError),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -268,12 +268,14 @@ fn find_block(env: &ConcreteEnv, block_hash: BlockHash) -> ResponseResult {
|
||||||
|
|
||||||
let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
|
let table_alt_block_heights = env_inner.open_db_ro::<AltBlockHeights>(&tx_ro)?;
|
||||||
|
|
||||||
let height = table_alt_block_heights.get(&block_hash)?;
|
match table_alt_block_heights.get(&block_hash) {
|
||||||
|
Ok(height) => Ok(BlockchainResponse::FindBlock(Some((
|
||||||
Ok(BlockchainResponse::FindBlock(Some((
|
Chain::Alt(height.chain_id.into()),
|
||||||
Chain::Alt(height.chain_id.into()),
|
height.height,
|
||||||
height.height,
|
)))),
|
||||||
))))
|
Err(RuntimeError::KeyNotFound) => Ok(BlockchainResponse::FindBlock(None)),
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// [`BlockchainReadRequest::FilterUnknownHashes`].
|
/// [`BlockchainReadRequest::FilterUnknownHashes`].
|
||||||
|
|
Loading…
Reference in a new issue