add main chain batch handler

This commit is contained in:
Boog900 2024-08-20 16:39:23 +01:00
parent 39d48fe9fc
commit a01846954d
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
10 changed files with 337 additions and 4 deletions

14
Cargo.lock generated
View file

@ -884,6 +884,20 @@ dependencies = [
[[package]]
name = "cuprated"
version = "0.1.0"
dependencies = [
"cuprate-blockchain",
"cuprate-consensus",
"cuprate-p2p",
"cuprate-p2p-core",
"cuprate-types",
"futures",
"hex",
"rayon",
"thiserror",
"tokio",
"tower",
"tracing",
]
[[package]]
name = "curve25519-dalek"

View file

@ -8,6 +8,19 @@ authors = ["Boog900", "hinto-janai", "SyntheticBird45"]
repository = "https://github.com/Cuprate/cuprate/tree/main/binaries/cuprated"
[dependencies]
cuprate-consensus = { path = "../../consensus" }
cuprate-blockchain = { path = "../../storage/blockchain" }
cuprate-p2p = { path = "../../p2p/p2p" }
cuprate-p2p-core = { path = "../../p2p/p2p-core" }
cuprate-types = { path = "../../types" }
rayon = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
hex = { workspace = true }
[lints]
workspace = true

View file

@ -4,3 +4,4 @@
mod manager;
mod syncer;
mod types;

View file

@ -1 +1,14 @@
mod batch_handler;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{BlockChainContextService, BlockVerifierService, TxVerifierService};
struct BlockchainManager {
blockchain_write_handle: BlockchainWriteHandle,
blockchain_context_service: BlockChainContextService,
block_verifier_service: BlockVerifierService<
BlockChainContextService,
TxVerifierService<BlockchainReadHandle>,
BlockchainReadHandle,
>,
}

View file

@ -0,0 +1,143 @@
//! Block batch handling functions.
use crate::blockchain::types::ConsensusBlockchainReadHandle;
use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::context::NewBlockData;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService,
BlockVerifierService, BlockchainReadRequest, BlockchainResponse, ExtendedConsensusError,
VerifyBlockRequest, VerifyBlockResponse, VerifyTxRequest, VerifyTxResponse,
};
use cuprate_p2p::block_downloader::BlockBatch;
use cuprate_types::blockchain::BlockchainWriteRequest;
use cuprate_types::{Chain, HardFork};
use tower::{Service, ServiceExt};
use tracing::{debug, error, info};
pub async fn handle_incoming_block_batch<C, TxV>(
batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_read_handle: &mut BlockchainReadHandle,
blockchain_write_handle: &mut BlockchainWriteHandle,
) where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let (first_block, _) = batch
.blocks
.first()
.expect("Block batch should not be empty");
match blockchain_read_handle
.oneshot(BlockchainReadRequest::FindBlock(
first_block.header.previous,
))
.await
{
Err(_) | Ok(BlockchainResponse::FindBlock(None)) => {
// The block downloader shouldn't be downloading orphan blocks
error!("Failed to find parent block for first block in batch.");
return;
}
Ok(BlockchainResponse::FindBlock(Some((chain, _)))) => match chain {
Chain::Main => {
handle_incoming_block_batch_main_chain(
batch,
block_verifier_service,
blockchain_context_service,
blockchain_write_handle,
)
.await;
}
Chain::Alt(_) => todo!(),
},
Ok(_) => panic!("Blockchain service returned incorrect response"),
}
}
async fn handle_incoming_block_batch_main_chain<C, TxV>(
batch: BlockBatch,
block_verifier_service: &mut BlockVerifierService<C, TxV, ConsensusBlockchainReadHandle>,
blockchain_context_service: &mut C,
blockchain_write_handle: &mut BlockchainWriteHandle,
) where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
> + Clone
+ Send
+ 'static,
C::Future: Send + 'static,
TxV: Service<VerifyTxRequest, Response = VerifyTxResponse, Error = ExtendedConsensusError>
+ Clone
+ Send
+ 'static,
TxV::Future: Send + 'static,
{
let Ok(VerifyBlockResponse::MainChainBatchPrepped(prepped)) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainBatchPrepareBlocks {
blocks: batch.blocks,
})
.await
else {
info!("Error verifying batch, banning peer");
todo!()
};
for (block, txs) in prepped {
let Ok(VerifyBlockResponse::MainChain(verified_block)) = block_verifier_service
.ready()
.await
.expect("TODO")
.call(VerifyBlockRequest::MainChainPrepped { block, txs })
.await
else {
info!("Error verifying batch, banning peer");
todo!()
};
blockchain_context_service
.ready()
.await
.expect("TODO")
.call(BlockChainContextRequest::Update(NewBlockData {
block_hash: verified_block.block_hash,
height: verified_block.height,
timestamp: verified_block.block.header.timestamp,
weight: verified_block.weight,
long_term_weight: verified_block.long_term_weight,
generated_coins: verified_block.generated_coins,
vote: HardFork::from_vote(verified_block.block.header.hardfork_signal),
cumulative_difficulty: verified_block.cumulative_difficulty,
}))
.await
.expect("TODO");
blockchain_write_handle
.ready()
.await
.expect("TODO")
.call(BlockchainWriteRequest::WriteBlock(verified_block))
.await
.expect("TODO");
}
}

View file

@ -1 +1,122 @@
use std::time::Duration;
use futures::StreamExt;
use tokio::{sync::mpsc, time::sleep};
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface,
};
use cuprate_p2p_core::ClearNet;
/// An error returned from the [`syncer`].
#[derive(Debug, thiserror::Error)]
enum SyncerError {
#[error("Incoming block channel closed.")]
IncomingBlockChannelClosed,
#[error("One of our services returned an error: {0}.")]
ServiceError(#[from] tower::BoxError),
}
#[instrument(level = "debug", skip_all)]
pub async fn syncer<C, CN>(
mut context_svc: C,
our_chain: CN,
clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Clone
+ Send
+ 'static,
CN::Future: Send + 'static,
{
tracing::info!("Starting blockchain syncer");
let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
let mut peer_sync_watch = clearnet_interface.top_sync_stream();
tracing::debug!("Waiting for new sync info in top sync channel");
while let Some(top_sync_info) = peer_sync_watch.next().await {
tracing::debug!(
"New sync info seen, top height: {}, top block hash: {}",
top_sync_info.chain_height,
hex::encode(top_sync_info.top_hash)
);
// The new info could be from a peer giving us a block, so wait a couple seconds to allow the block to
// be added to our blockchain.
sleep(Duration::from_secs(2)).await;
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
if top_sync_info.cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
tracing::debug!("New peer sync info is not ahead, nothing to do.");
continue;
}
tracing::debug!(
"We are behind peers claimed cumulative difficulty, starting block downloader"
);
let mut block_batch_stream =
clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config);
while let Some(batch) = block_batch_stream.next().await {
tracing::debug!("Got batch, len: {}", batch.blocks.len());
if incoming_block_batch_tx.send(batch).await.is_err() {
return Err(SyncerError::IncomingBlockChannelClosed);
}
}
}
Ok(())
}
async fn check_update_blockchain_context<C>(
context_svc: C,
old_context: &mut BlockChainContext,
) -> Result<(), tower::BoxError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
{
if old_context.blockchain_context().is_ok() {
return Ok(());
}
let BlockChainContextResponse::Context(ctx) = context_svc
.oneshot(BlockChainContextRequest::GetContext)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
*old_context = ctx;
Ok(())
}

View file

@ -0,0 +1,27 @@
use cuprate_blockchain::cuprate_database::RuntimeError;
use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};
use futures::future::MapErr;
use futures::TryFutureExt;
use std::task::{Context, Poll};
use tower::Service;
#[derive(Clone)]
pub struct ConsensusBlockchainReadHandle(BlockchainReadHandle);
impl Service<BlockchainReadRequest> for ConsensusBlockchainReadHandle {
type Response = BlockchainResponse;
type Error = tower::BoxError;
type Future = MapErr<
<BlockchainReadHandle as Service<BlockchainReadRequest>>::Future,
fn(RuntimeError) -> tower::BoxError,
>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}
fn call(&mut self, req: BlockchainReadRequest) -> Self::Future {
self.0.call(req).map_err(Into::into)
}
}

View file

@ -40,11 +40,12 @@ static HASHES_OF_HASHES: &[HashOfHashes] = &[
const BATCH_SIZE: usize = 4;
#[inline]
fn max_height() -> u64 {
pub fn max_height() -> u64 {
(HASHES_OF_HASHES.len() * BATCH_SIZE) as u64
}
#[derive(Debug, PartialEq)]
#[repr(transparent)]
pub struct ValidBlockId(BlockId);
fn valid_block_ids(block_ids: &[BlockId]) -> Vec<ValidBlockId> {

View file

@ -250,7 +250,7 @@ where
+ Clone
+ Send
+ 'static,
D: Database + Clone + Send + Sync + 'static,
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
/// Creates a new block verifier.
@ -284,7 +284,7 @@ where
+ 'static,
TxV::Future: Send + 'static,
D: Database + Clone + Send + Sync + 'static,
D: Database + Clone + Send + 'static,
D::Future: Send + 'static,
{
type Response = VerifyBlockResponse;

View file

@ -21,7 +21,7 @@ use cuprate_p2p_core::{
CoreSyncSvc, NetworkZone, ProtocolRequestHandler,
};
mod block_downloader;
pub mod block_downloader;
mod broadcast;
mod client_pool;
pub mod config;