broadcast new blocks + add commands

This commit is contained in:
Boog900 2024-09-14 13:49:41 +01:00
parent 90beed1590
commit a16381ea61
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
7 changed files with 105 additions and 51 deletions

View file

@ -3,7 +3,8 @@
//! Will contain the chain manager and syncer. //! Will contain the chain manager and syncer.
use futures::FutureExt; use futures::FutureExt;
use tokio::sync::mpsc; use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
@ -26,6 +27,7 @@ use types::{
ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService, ChainService, ConcreteBlockVerifierService, ConcreteTxVerifierService,
ConsensusBlockchainReadHandle, ConsensusBlockchainReadHandle,
}; };
use crate::blockchain::free::INCOMING_BLOCK_TX;
/// Checks if the genesis block is in the blockchain and adds it if not. /// Checks if the genesis block is in the blockchain and adds it if not.
pub async fn check_add_genesis( pub async fn check_add_genesis(
@ -107,12 +109,17 @@ pub async fn init_blockchain_manager(
block_downloader_config: BlockDownloaderConfig, block_downloader_config: BlockDownloaderConfig,
) { ) {
let (batch_tx, batch_rx) = mpsc::channel(1); let (batch_tx, batch_rx) = mpsc::channel(1);
let stop_current_block_downloader = Arc::new(Notify::new());
let (command_tx, command_rx) = mpsc::channel(1);
INCOMING_BLOCK_TX.set(command_tx).unwrap();
tokio::spawn(syncer::syncer( tokio::spawn(syncer::syncer(
blockchain_context_service.clone(), blockchain_context_service.clone(),
ChainService(blockchain_read_handle.clone()), ChainService(blockchain_read_handle.clone()),
clearnet_interface, clearnet_interface.clone(),
batch_tx, batch_tx,
stop_current_block_downloader.clone(),
block_downloader_config, block_downloader_config,
)); ));
@ -121,8 +128,10 @@ pub async fn init_blockchain_manager(
blockchain_read_handle, blockchain_read_handle,
blockchain_context_service, blockchain_context_service,
block_verifier_service, block_verifier_service,
stop_current_block_downloader,
clearnet_interface.broadcast_svc(),
) )
.await; .await;
tokio::spawn(manager.run(batch_rx)); tokio::spawn(manager.run(batch_rx, command_rx));
} }

View file

@ -1,4 +1,3 @@
use crate::blockchain::manager::IncomingBlock;
use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64; use cuprate_helper::cast::usize_to_u64;
@ -11,10 +10,11 @@ use std::collections::HashMap;
use std::sync::OnceLock; use std::sync::OnceLock;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use crate::blockchain::manager::commands::BlockchainManagerCommand;
static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<IncomingBlock>> = OnceLock::new(); pub static INCOMING_BLOCK_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();
#[derive(thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError { pub enum IncomingBlockError {
#[error("Unknown transactions in block.")] #[error("Unknown transactions in block.")]
UnknownTransactions(Vec<u64>), UnknownTransactions(Vec<u64>),
@ -28,8 +28,8 @@ pub async fn handle_incoming_block(
block: Block, block: Block,
given_txs: Vec<Transaction>, given_txs: Vec<Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle, blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<(), IncomingBlockError> { ) -> Result<bool, IncomingBlockError> {
if !block_exists(block.header.previous, blockchain_read_handle).expect("TODO") { if !block_exists(block.header.previous, blockchain_read_handle).await.expect("TODO") {
return Err(IncomingBlockError::Orphan); return Err(IncomingBlockError::Orphan);
} }
@ -39,7 +39,14 @@ pub async fn handle_incoming_block(
.await .await
.expect("TODO") .expect("TODO")
{ {
return Ok(()); return Ok(false);
}
// TODO: Get transactions from the tx pool first.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
(0..usize_to_u64(block.transactions.len())).collect(),
));
} }
let prepped_txs = given_txs let prepped_txs = given_txs
@ -51,21 +58,14 @@ pub async fn handle_incoming_block(
.collect::<Result<_, anyhow::Error>>() .collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?; .map_err(IncomingBlockError::InvalidBlock)?;
// TODO: Get transactions from the tx pool first.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
(0..usize_to_u64(block.transactions.len())).collect(),
));
}
let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else { let Some(incoming_block_tx) = INCOMING_BLOCK_TX.get() else {
return Ok(()); return Ok(false);
}; };
let (response_tx, response_rx) = oneshot::channel(); let (response_tx, response_rx) = oneshot::channel();
incoming_block_tx incoming_block_tx
.send(IncomingBlock { .send( BlockchainManagerCommand::AddBlock {
block, block,
prepped_txs, prepped_txs,
response_tx, response_tx,
@ -73,7 +73,7 @@ pub async fn handle_incoming_block(
.await .await
.expect("TODO: don't actually panic here"); .expect("TODO: don't actually panic here");
response_rx.await.map_err(IncomingBlockError::InvalidBlock) response_rx.await.unwrap().map_err(IncomingBlockError::InvalidBlock)
} }
async fn block_exists( async fn block_exists(

View file

@ -1,4 +1,5 @@
mod handler; mod handler;
pub(super) mod commands;
use crate::blockchain::types::ConsensusBlockchainReadHandle; use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
@ -9,21 +10,20 @@ use cuprate_consensus::{
VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
}; };
use cuprate_p2p::block_downloader::BlockBatch; use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_p2p::BroadcastSvc;
use cuprate_p2p_core::ClearNet;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use cuprate_types::{Chain, TransactionVerificationData}; use cuprate_types::{Chain, TransactionVerificationData};
use futures::StreamExt; use futures::StreamExt;
use monero_serai::block::Block; use monero_serai::block::Block;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::{oneshot, Notify}; use tokio::sync::{oneshot, Notify};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::error; use tracing::error;
use tracing_subscriber::fmt::time::FormatTime;
pub struct IncomingBlock { use crate::blockchain::manager::commands::BlockchainManagerCommand;
pub block: Block,
pub prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
pub response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
}
pub struct BlockchainManager { pub struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle, blockchain_write_handle: BlockchainWriteHandle,
@ -35,7 +35,8 @@ pub struct BlockchainManager {
TxVerifierService<ConsensusBlockchainReadHandle>, TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle, ConsensusBlockchainReadHandle,
>, >,
stop_current_block_downloader: Notify, stop_current_block_downloader: Arc<Notify>,
broadcast_svc: BroadcastSvc<ClearNet>,
} }
impl BlockchainManager { impl BlockchainManager {
@ -48,6 +49,8 @@ impl BlockchainManager {
TxVerifierService<ConsensusBlockchainReadHandle>, TxVerifierService<ConsensusBlockchainReadHandle>,
ConsensusBlockchainReadHandle, ConsensusBlockchainReadHandle,
>, >,
stop_current_block_downloader: Arc<Notify>,
broadcast_svc: BroadcastSvc<ClearNet>,
) -> Self { ) -> Self {
let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service let BlockChainContextResponse::Context(blockchain_context) = blockchain_context_service
.ready() .ready()
@ -66,13 +69,15 @@ impl BlockchainManager {
blockchain_context_service, blockchain_context_service,
cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(), cached_blockchain_context: blockchain_context.unchecked_blockchain_context().clone(),
block_verifier_service, block_verifier_service,
stop_current_block_downloader,
broadcast_svc,
} }
} }
pub async fn run( pub async fn run(
mut self, mut self,
mut block_batch_rx: mpsc::Receiver<BlockBatch>, mut block_batch_rx: mpsc::Receiver<BlockBatch>,
mut block_single_rx: mpsc::Receiver<IncomingBlock>, mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
) { ) {
loop { loop {
tokio::select! { tokio::select! {
@ -81,15 +86,8 @@ impl BlockchainManager {
batch, batch,
).await; ).await;
} }
Some(incoming_block) = block_single_rx.recv() => { Some(incoming_command) = command_rx.recv() => {
let IncomingBlock { self.handle_command(incoming_command).await;
block,
prepped_txs,
response_tx
} = incoming_block;
let res = self.handle_incoming_block(block, prepped_txs).await;
let _ = response_tx.send(res);
} }
else => { else => {
todo!("TODO: exit the BC manager") todo!("TODO: exit the BC manager")

View file

@ -0,0 +1,17 @@
use std::collections::HashMap;
use monero_serai::block::Block;
use tokio::sync::oneshot;
use cuprate_types::TransactionVerificationData;
pub enum BlockchainManagerCommand {
AddBlock {
block: Block,
prepped_txs: HashMap<[u8; 32], TransactionVerificationData>,
response_tx: oneshot::Sender<Result<bool, anyhow::Error>>,
},
PopBlocks,
}

View file

@ -1,8 +1,8 @@
use std::{collections::HashMap, sync::Arc}; use bytes::Bytes;
use futures::{TryFutureExt, TryStreamExt}; use futures::{TryFutureExt, TryStreamExt};
use monero_serai::{block::Block, transaction::Transaction}; use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*; use rayon::prelude::*;
use std::{collections::HashMap, sync::Arc};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::info; use tracing::info;
@ -13,15 +13,45 @@ use cuprate_consensus::{
ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, ExtendedConsensusError, VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest,
VerifyTxResponse, VerifyTxResponse,
}; };
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN}; use cuprate_helper::cast::usize_to_u64;
use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest};
use cuprate_types::{ use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest},
AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation, AltBlockInformation, HardFork, TransactionVerificationData, VerifiedBlockInformation,
}; };
use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK}; use crate::{blockchain::types::ConsensusBlockchainReadHandle, signals::REORG_LOCK};
use crate::blockchain::manager::commands::BlockchainManagerCommand;
impl super::BlockchainManager { impl super::BlockchainManager {
pub async fn handle_command(&mut self, command: BlockchainManagerCommand) {
match command {
BlockchainManagerCommand::AddBlock {
block,
prepped_txs,
response_tx
} => {
let res = self.handle_incoming_block(block, prepped_txs).await;
drop(response_tx.send(res));
}
BlockchainManagerCommand::PopBlocks => todo!()
}
}
async fn broadcast_block(&mut self, block_bytes: Bytes, blockchain_height: usize) {
self.broadcast_svc
.ready()
.await
.expect("TODO")
.call(BroadcastRequest::Block {
block_bytes,
current_blockchain_height: usize_to_u64(blockchain_height),
})
.await
.expect("TODO");
}
/// Handle an incoming [`Block`]. /// Handle an incoming [`Block`].
/// ///
/// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow /// This function will route to [`Self::handle_incoming_alt_block`] if the block does not follow
@ -56,8 +86,12 @@ impl super::BlockchainManager {
panic!("Incorrect response!"); panic!("Incorrect response!");
}; };
let block_blob = Bytes::copy_from_slice(&verified_block.block_blob);
self.add_valid_block_to_main_chain(verified_block).await; self.add_valid_block_to_main_chain(verified_block).await;
self.broadcast_block(block_blob, self.cached_blockchain_context.chain_height)
.await;
Ok(true) Ok(true)
} }
@ -101,11 +135,6 @@ impl super::BlockchainManager {
batch.blocks.first().unwrap().0.number().unwrap() batch.blocks.first().unwrap().0.number().unwrap()
); );
let ban_cancel_download = || {
batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
};
let batch_prep_res = self let batch_prep_res = self
.block_verifier_service .block_verifier_service
.ready() .ready()
@ -119,7 +148,8 @@ impl super::BlockchainManager {
let prepped_blocks = match batch_prep_res { let prepped_blocks = match batch_prep_res {
Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks, Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped_blocks)) => prepped_blocks,
Err(_) => { Err(_) => {
ban_cancel_download(); batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return; return;
} }
_ => panic!("Incorrect response!"), _ => panic!("Incorrect response!"),
@ -134,10 +164,11 @@ impl super::BlockchainManager {
.call(VerifyBlockRequest::MainChainPrepped { block, txs }) .call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await; .await;
let VerifyBlockResponse::MainChain(verified_block) = match verify_res { let verified_block = match verify_res {
Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block, Ok(VerifyBlockResponse::MainChain(verified_block)) => verified_block,
Err(_) => { Err(_) => {
ban_cancel_download(); batch.peer_handle.ban_peer(LONG_BAN);
self.stop_current_block_downloader.notify_one();
return; return;
} }
_ => panic!("Incorrect response!"), _ => panic!("Incorrect response!"),
@ -145,8 +176,6 @@ impl super::BlockchainManager {
self.add_valid_block_to_main_chain(verified_block).await; self.add_valid_block_to_main_chain(verified_block).await;
} }
Ok(())
} }
/// Handles an incoming [`BlockBatch`] that does not follow the main-chain. /// Handles an incoming [`BlockBatch`] that does not follow the main-chain.
@ -175,7 +204,7 @@ impl super::BlockchainManager {
self.handle_incoming_alt_block(block, txs).await?; self.handle_incoming_alt_block(block, txs).await?;
Ok(()) Ok::<_, anyhow::Error>(())
} }
.await; .await;

View file

@ -1,4 +1,5 @@
use std::pin::pin; use std::pin::pin;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use futures::StreamExt; use futures::StreamExt;
@ -31,7 +32,7 @@ pub async fn syncer<C, CN>(
our_chain: CN, our_chain: CN,
clearnet_interface: NetworkInterface<ClearNet>, clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>, incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
stop_current_block_downloader: Notify, stop_current_block_downloader: Arc<Notify>,
block_downloader_config: BlockDownloaderConfig, block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError> ) -> Result<(), SyncerError>
where where

Binary file not shown.