mirror of
https://github.com/Cuprate/cuprate.git
synced 2025-01-08 20:09:44 +00:00
add new_fluffy_block
handler
This commit is contained in:
parent
aa274ebafe
commit
37025bdfc0
4 changed files with 84 additions and 15 deletions
|
@ -17,7 +17,7 @@ use cuprate_consensus::transactions::new_tx_verification_data;
|
|||
use cuprate_helper::cast::usize_to_u64;
|
||||
use cuprate_types::{
|
||||
blockchain::{BlockchainReadRequest, BlockchainResponse},
|
||||
Chain,
|
||||
Chain, TransactionVerificationData,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
|
@ -65,7 +65,7 @@ pub enum IncomingBlockError {
|
|||
/// - the block's parent is unknown
|
||||
pub async fn handle_incoming_block(
|
||||
block: Block,
|
||||
given_txs: Vec<Transaction>,
|
||||
given_txs: HashMap<[u8; 32], TransactionVerificationData>,
|
||||
blockchain_read_handle: &mut BlockchainReadHandle,
|
||||
) -> Result<bool, IncomingBlockError> {
|
||||
// FIXME: we should look in the tx-pool for txs when that is ready.
|
||||
|
@ -95,14 +95,6 @@ pub async fn handle_incoming_block(
|
|||
}
|
||||
|
||||
// TODO: check we actually got given the right txs.
|
||||
let prepped_txs = given_txs
|
||||
.into_par_iter()
|
||||
.map(|tx| {
|
||||
let tx = new_tx_verification_data(tx)?;
|
||||
Ok((tx.tx_hash, tx))
|
||||
})
|
||||
.collect::<Result<_, anyhow::Error>>()
|
||||
.map_err(IncomingBlockError::InvalidBlock)?;
|
||||
|
||||
let Some(incoming_block_tx) = COMMAND_TX.get() else {
|
||||
// We could still be starting up the blockchain manger, so just return this as there is nothing
|
||||
|
@ -128,7 +120,7 @@ pub async fn handle_incoming_block(
|
|||
incoming_block_tx
|
||||
.send(BlockchainManagerCommand::AddBlock {
|
||||
block,
|
||||
prepped_txs,
|
||||
prepped_txs: given_txs,
|
||||
response_tx,
|
||||
})
|
||||
.await
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use bytes::Bytes;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::FutureExt;
|
||||
use monero_serai::block::Block;
|
||||
use monero_serai::transaction::Transaction;
|
||||
use std::collections::HashSet;
|
||||
use std::{
|
||||
future::{ready, Ready},
|
||||
task::{Context, Poll},
|
||||
|
@ -8,8 +11,10 @@ use std::{
|
|||
use tower::{Service, ServiceExt};
|
||||
|
||||
use cuprate_blockchain::service::BlockchainReadHandle;
|
||||
use cuprate_consensus::transactions::new_tx_verification_data;
|
||||
use cuprate_consensus::BlockChainContextService;
|
||||
use cuprate_fixed_bytes::ByteArrayVec;
|
||||
use cuprate_helper::asynch::rayon_spawn_async;
|
||||
use cuprate_helper::cast::usize_to_u64;
|
||||
use cuprate_helper::map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits};
|
||||
use cuprate_p2p::constants::MAX_BLOCK_BATCH_LEN;
|
||||
|
@ -21,6 +26,9 @@ use cuprate_wire::protocol::{
|
|||
GetObjectsResponse, NewFluffyBlock,
|
||||
};
|
||||
|
||||
use crate::blockchain::interface as blockchain_interface;
|
||||
use crate::blockchain::interface::IncomingBlockError;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct P2pProtocolRequestHandlerMaker {
|
||||
pub blockchain_read_handle: BlockchainReadHandle,
|
||||
|
@ -77,7 +85,9 @@ impl<Z: NetworkZone> Service<ProtocolRequest> for P2pProtocolRequestHandler<Z> {
|
|||
"Peer sent a full block when we support fluffy blocks"
|
||||
)))
|
||||
.boxed(),
|
||||
ProtocolRequest::NewFluffyBlock(_) => todo!(),
|
||||
ProtocolRequest::NewFluffyBlock(r) => {
|
||||
new_fluffy_block(r, self.blockchain_read_handle.clone()).boxed()
|
||||
}
|
||||
ProtocolRequest::GetTxPoolCompliment(_) | ProtocolRequest::NewTransactions(_) => {
|
||||
ready(Ok(ProtocolResponse::NA)).boxed()
|
||||
} // TODO: tx-pool
|
||||
|
@ -97,7 +107,7 @@ async fn get_objects(
|
|||
}
|
||||
|
||||
let block_hashes: Vec<[u8; 32]> = (&request.blocks).into();
|
||||
// de-allocate the backing `Bytes`.
|
||||
// deallocate the backing `Bytes`.
|
||||
drop(request);
|
||||
|
||||
let BlockchainResponse::BlockCompleteEntries {
|
||||
|
@ -131,7 +141,7 @@ async fn get_chain(
|
|||
|
||||
let block_hashes: Vec<[u8; 32]> = (&request.block_ids).into();
|
||||
let want_pruned_data = request.prune;
|
||||
// de-allocate the backing `Bytes`.
|
||||
// deallocate the backing `Bytes`.
|
||||
drop(request);
|
||||
|
||||
let BlockchainResponse::NextChainEntry {
|
||||
|
@ -182,7 +192,7 @@ async fn fluffy_missing_txs(
|
|||
let block_hash: [u8; 32] = *request.block_hash;
|
||||
let current_blockchain_height = request.current_blockchain_height;
|
||||
|
||||
// de-allocate the backing `Bytes`.
|
||||
// deallocate the backing `Bytes`.
|
||||
drop(request);
|
||||
|
||||
let BlockchainResponse::MissingTxsInBlock(res) = blockchain_read_handle
|
||||
|
@ -212,3 +222,63 @@ async fn fluffy_missing_txs(
|
|||
current_blockchain_height,
|
||||
}))
|
||||
}
|
||||
|
||||
/// [`ProtocolRequest::NewFluffyBlock`]
|
||||
async fn new_fluffy_block(
|
||||
request: NewFluffyBlock,
|
||||
mut blockchain_read_handle: BlockchainReadHandle,
|
||||
) -> anyhow::Result<ProtocolResponse> {
|
||||
let current_blockchain_height = request.current_blockchain_height;
|
||||
|
||||
let (block, txs) = rayon_spawn_async(move || -> Result<_, anyhow::Error> {
|
||||
let block = Block::read(&mut request.b.block.as_ref())?;
|
||||
|
||||
let tx_blobs = request
|
||||
.b
|
||||
.txs
|
||||
.take_normal()
|
||||
.ok_or(anyhow::anyhow!("Peer sent pruned txs in fluffy block"))?;
|
||||
|
||||
let mut txs_in_block = block.transactions.iter().copied().collect::<HashSet<_>>();
|
||||
|
||||
// TODO: size check these tx blobs
|
||||
let txs = tx_blobs
|
||||
.into_iter()
|
||||
.map(|tx_blob| {
|
||||
let tx = Transaction::read(&mut tx_blob.as_ref())?;
|
||||
|
||||
let tx = new_tx_verification_data(tx)?;
|
||||
|
||||
if !txs_in_block.remove(&tx.tx_hash) {
|
||||
anyhow::bail!("Peer sent tx in fluffy block that wasn't actually in block")
|
||||
}
|
||||
|
||||
Ok((tx.tx_hash, tx))
|
||||
})
|
||||
.collect::<Result<_, anyhow::Error>>()?;
|
||||
|
||||
// The backing `Bytes` will be deallocated when this closure returns.
|
||||
|
||||
Ok((block, txs))
|
||||
})
|
||||
.await?;
|
||||
|
||||
let res =
|
||||
blockchain_interface::handle_incoming_block(block, txs, &mut blockchain_read_handle).await;
|
||||
|
||||
match res {
|
||||
Ok(_) => Ok(ProtocolResponse::NA),
|
||||
Err(IncomingBlockError::UnknownTransactions(block_hash, missing_tx_indices)) => Ok(
|
||||
ProtocolResponse::FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest {
|
||||
block_hash: block_hash.into(),
|
||||
current_blockchain_height,
|
||||
missing_tx_indices,
|
||||
}),
|
||||
),
|
||||
Err(IncomingBlockError::Orphan) => {
|
||||
// Block's parent was unknown, could be syncing?
|
||||
Ok(ProtocolResponse::NA)
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,6 +116,7 @@ pub enum ProtocolResponse {
|
|||
GetChain(ChainResponse),
|
||||
NewFluffyBlock(NewFluffyBlock),
|
||||
NewTransactions(NewTransactions),
|
||||
FluffyMissingTransactionsRequest(FluffyMissingTransactionsRequest),
|
||||
NA,
|
||||
}
|
||||
|
||||
|
@ -139,6 +140,9 @@ impl PeerResponse {
|
|||
ProtocolResponse::GetChain(_) => MessageID::GetChain,
|
||||
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
|
||||
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
|
||||
ProtocolResponse::FluffyMissingTransactionsRequest(_) => {
|
||||
MessageID::FluffyMissingTxs
|
||||
}
|
||||
|
||||
ProtocolResponse::NA => return None,
|
||||
},
|
||||
|
|
|
@ -71,6 +71,9 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
|
|||
ProtocolResponse::NewFluffyBlock(val) => Self::NewFluffyBlock(val),
|
||||
ProtocolResponse::GetChain(val) => Self::ChainEntryResponse(val),
|
||||
ProtocolResponse::GetObjects(val) => Self::GetObjectsResponse(val),
|
||||
ProtocolResponse::FluffyMissingTransactionsRequest(val) => {
|
||||
Self::FluffyMissingTransactionsRequest(val)
|
||||
}
|
||||
ProtocolResponse::NA => return Err(MessageConversionError),
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue