add fluffy block handler

This commit is contained in:
Boog900 2024-09-15 01:20:24 +01:00
parent a16381ea61
commit d2ab8e20c4
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
8 changed files with 71 additions and 14 deletions

View file

@ -22,12 +22,14 @@ mod manager;
mod syncer; mod syncer;
mod types; mod types;
use crate::blockchain::free::INCOMING_BLOCK_TX;
use manager::BlockchainManager; use manager::BlockchainManager;
use types::{ use types::{
ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService,
ConsensusBlockchainReadHandle, ConsensusBlockchainReadHandle,
}; };
use crate::blockchain::free::INCOMING_BLOCK_TX;
pub use free::{handle_incoming_block, IncomingBlockError};
/// Checks if the genesis block is in the blockchain and adds it if not. /// Checks if the genesis block is in the blockchain and adds it if not.
pub async fn check_add_genesis( pub async fn check_add_genesis(

View file

@ -1,3 +1,4 @@
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64; use cuprate_helper::cast::usize_to_u64;
@ -10,14 +11,13 @@ use std::collections::HashMap;
use std::sync::OnceLock; use std::sync::OnceLock;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use crate::blockchain::manager::commands::BlockchainManagerCommand;
pub static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new(); pub static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError { pub enum IncomingBlockError {
#[error("Unknown transactions in block.")] #[error("Unknown transactions in block.")]
UnknownTransactions(Vec<u64>), UnknownTransactions([u8; 32], Vec<u64>),
#[error("The block has an unknown parent.")] #[error("The block has an unknown parent.")]
Orphan, Orphan,
#[error(transparent)] #[error(transparent)]
@ -29,7 +29,10 @@ pub async fn handle_incoming_block(
given_txs: Vec<Transaction>, given_txs: Vec<Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle, blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, IncomingBlockError> { ) -> Result<bool, IncomingBlockError> {
if !block_exists(block.header.previous, blockchain_read_handle).await.expect("TODO") { if !block_exists(block.header.previous, blockchain_read_handle)
.await
.expect("TODO")
{
return Err(IncomingBlockError::Orphan); return Err(IncomingBlockError::Orphan);
} }
@ -45,6 +48,7 @@ pub async fn handle_incoming_block(
// TODO: Get transactions from the tx pool first. // TODO: Get transactions from the tx pool first.
if given_txs.len() != block.transactions.len() { if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions( return Err(IncomingBlockError::UnknownTransactions(
block_hash,
(0..usize_to_u64(block.transactions.len())).collect(), (0..usize_to_u64(block.transactions.len())).collect(),
)); ));
} }
@ -65,7 +69,7 @@ pub async fn handle_incoming_block(
let (response_tx, response_rx) = oneshot::channel(); let (response_tx, response_rx) = oneshot::channel();
incoming_block_tx incoming_block_tx
.send( BlockchainManagerCommand::AddBlock { .send(BlockchainManagerCommand::AddBlock {
block, block,
prepped_txs, prepped_txs,
response_tx, response_tx,
@ -73,7 +77,10 @@ pub async fn handle_incoming_block(
.await .await
.expect("TODO: don't actually panic here"); .expect("TODO: don't actually panic here");
response_rx.await.unwrap().map_err(IncomingBlockError::InvalidBlock) response_rx
.await
.unwrap()
.map_err(IncomingBlockError::InvalidBlock)
} }
async fn block_exists( async fn block_exists(

View file

@ -1,6 +1,7 @@
mod handler;
pub(super) mod commands; pub(super) mod commands;
mod handler;
use crate::blockchain::manager::commands::BlockchainManagerCommand;
use crate::blockchain::types::ConsensusBlockchainReadHandle; use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::RawBlockChainContext; use cuprate_consensus::context::RawBlockChainContext;
@ -23,7 +24,6 @@ use tokio::sync::{oneshot, Notify};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::error; use tracing::error;
use tracing_subscriber::fmt::time::FormatTime; use tracing_subscriber::fmt::time::FormatTime;
use crate::blockchain::manager::commands::BlockchainManagerCommand;
pub struct BlockchainManager { pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle, blockchain_write_handle: BlockchainWriteHandle,

View file

@ -14,4 +14,3 @@ pub enum BlockchainManagerCommand {
PopBlocks, PopBlocks,
} }

View file

@ -20,8 +20,8 @@ use cuprate_types::{
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
}; };
use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK};
use crate::blockchain::manager::commands::BlockchainManagerCommand; use crate::blockchain::manager::commands::BlockchainManagerCommand;
use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK};
impl super::BlockchainManager { impl super::BlockchainManager {
pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
@ -29,13 +29,13 @@ impl super::BlockchainManager {
BlockchainManagerCommand::AddBlock { BlockchainManagerCommand::AddBlock {
block, block,
prepped_txs, prepped_txs,
response_tx response_tx,
} => { } => {
let res = self.handle_incoming_block(block, prepped_txs).await; let res = self.handle_incoming_block(block, prepped_txs).await;
drop(response_tx.send(res)); drop(response_tx.send(res));
} }
BlockchainManagerCommand::PopBlocks => todo!() BlockchainManagerCommand::PopBlocks => todo!(),
} }
} }

View file

@ -2,17 +2,24 @@ use bytes::Bytes;
use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse}; use cuprate_p2p_core::{ProtocolRequest, ProtocolResponse};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::FutureExt; use futures::FutureExt;
use monero_serai::block::Block;
use monero_serai::transaction::Transaction;
use rayon::prelude::*;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::trace; use tracing::trace;
use crate::blockchain::{handle_incoming_block, IncomingBlockError};
use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_fixed_bytes::ByteArray;
use cuprate_helper::asynch::rayon_spawn_async;
use cuprate_helper::cast::usize_to_u64; use cuprate_helper::cast::usize_to_u64;
use cuprate_helper::map::split_u128_into_low_high_bits; use cuprate_helper::map::split_u128_into_low_high_bits;
use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN}; use cuprate_p2p::constants::{MAX_BLOCKCHAIN_SUPPLEMENT_LEN, MAX_BLOCK_BATCH_LEN};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::BlockCompleteEntry; use cuprate_types::BlockCompleteEntry;
use cuprate_wire::protocol::{ChainRequest, ChainResponse, GetObjectsRequest, GetObjectsResponse}; use cuprate_wire::protocol::{ChainRequest, ChainResponse, FluffyMissingTransactionsRequest, GetObjectsRequest, GetObjectsResponse, NewFluffyBlock};
#[derive(Clone)] #[derive(Clone)]
pub struct P2pProtocolRequestHandler { pub struct P2pProtocolRequestHandler {
@ -39,7 +46,7 @@ impl Service<ProtocolRequest> for P2pProtocolRequestHandler {
ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::FluffyMissingTxs(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::GetTxPoolCompliment(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::NewBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
ProtocolRequest::NewFluffyBlock(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::NewFluffyBlock(block) => new_fluffy_block(self.blockchain_read_handle.clone(), block).boxed(),
ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(), ProtocolRequest::NewTransactions(_) => async { Ok(ProtocolResponse::NA) }.boxed(),
} }
} }
@ -125,3 +132,42 @@ async fn get_chain(
first_block: first_missing_block.map_or(Bytes::new(), Bytes::from), first_block: first_missing_block.map_or(Bytes::new(), Bytes::from),
})) }))
} }
async fn new_fluffy_block(
mut blockchain_read_handle: BlockchainReadHandle,
incoming_block: NewFluffyBlock,
) -> Result<ProtocolResponse, tower::BoxError> {
let peer_blockchain_height = incoming_block.current_blockchain_height;
let (block, txs) = rayon_spawn_async(move || {
let block = Block::read(&mut incoming_block.b.block.as_ref())?;
let txs = incoming_block
.b
.txs
.take_normal()
.expect("TODO")
.into_par_iter()
.map(|tx| {
let tx = Transaction::read(&mut tx.as_ref())?;
Ok(tx)
})
.collect::<Result<_, tower::BoxError>>()?;
Ok::<_, tower::BoxError>((block, txs))
})
.await?;
let res = handle_incoming_block(block, txs, &mut blockchain_read_handle).await;
match res {
Err(IncomingBlockError::UnknownTransactions(block_hash, tx_indexes)) => {
return Ok(ProtocolResponse::FluffyMissingTxs(FluffyMissingTransactionsRequest{
block_hash: ByteArray::from(block_hash),
current_blockchain_height: peer_blockchain_height,
missing_tx_indices: tx_indexes,
}))
}
Err(IncomingBlockError::InvalidBlock(e)) => Err(e)?,
Err(IncomingBlockError::Orphan) | Ok(_) => Ok(ProtocolResponse::NA),
}
}

View file

@ -116,6 +116,7 @@ pub enum ProtocolResponse {
GetChain(ChainResponse), GetChain(ChainResponse),
NewFluffyBlock(NewFluffyBlock), NewFluffyBlock(NewFluffyBlock),
NewTransactions(NewTransactions), NewTransactions(NewTransactions),
FluffyMissingTxs(FluffyMissingTransactionsRequest),
NA, NA,
} }
@ -139,6 +140,7 @@ impl PeerResponse {
ProtocolResponse::GetChain(_) => MessageID::GetChain, ProtocolResponse::GetChain(_) => MessageID::GetChain,
ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock, ProtocolResponse::NewFluffyBlock(_) => MessageID::NewBlock,
ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock, ProtocolResponse::NewTransactions(_) => MessageID::NewFluffyBlock,
ProtocolResponse::FluffyMissingTxs(_) => MessageID::FluffyMissingTxs,
ProtocolResponse::NA => return None, ProtocolResponse::NA => return None,
}, },

View file

@ -75,6 +75,7 @@ impl TryFrom<ProtocolResponse> for ProtocolMessage {
ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val), ProtocolResponse::NewFluffyBlock(val) => ProtocolMessage::NewFluffyBlock(val),
ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val), ProtocolResponse::GetChain(val) => ProtocolMessage::ChainEntryResponse(val),
ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val), ProtocolResponse::GetObjects(val) => ProtocolMessage::GetObjectsResponse(val),
ProtocolResponse::FluffyMissingTxs(val) => ProtocolMessage::FluffyMissingTransactionsRequest(val),
ProtocolResponse::NA => return Err(MessageConversionError), ProtocolResponse::NA => return Err(MessageConversionError),
}) })
} }