add syncer

This commit is contained in:
Boog900 2024-06-14 22:24:08 +01:00
parent d1288b141a
commit 008e46c683
No known key found for this signature in database
GPG key ID: 42AB1287CB0041C2
10 changed files with 135 additions and 99 deletions

3
Cargo.lock generated
View file

@ -670,10 +670,13 @@ dependencies = [
"cuprate-p2p", "cuprate-p2p",
"dandelion-tower", "dandelion-tower",
"futures", "futures",
"hex",
"monero-p2p", "monero-p2p",
"monero-serai", "monero-serai",
"thiserror",
"tokio", "tokio",
"tower", "tower",
"tracing",
] ]
[[package]] [[package]]

View file

@ -25,4 +25,7 @@ tower = { workspace = true }
# Utils # Utils
bytes = { workspace = true } bytes = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
hex = { workspace = true }

View file

@ -1,62 +1 @@
//! # The Syncer mod syncer;
//!
//! The syncer is the part of Cuprate that handles keeping the blockchain state, it handles syncing if
//! we have fallen behind, and it handles incoming blocks.
use cuprate_blockchain::service::DatabaseWriteHandle;
use monero_serai::{block::Block, transaction::Transaction};
use tokio::sync::mpsc;
use tower::Service;
use cuprate_consensus::{
BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError,
VerifyBlockRequest, VerifyBlockResponse,
};
use monero_p2p::handles::ConnectionHandle;
pub struct IncomingBlock {
block: Block,
included_txs: Vec<Transaction>,
peer_handle: ConnectionHandle,
}
/// A response to an [`IncomingBlock`]
pub enum IncomingBlockResponse {
/// We are missing these transactions from the block.
MissingTransactions(Vec<[u8; 32]>),
/// A generic ok response.
Ok,
}
struct BlockBatch;
/// The blockchain.
///
/// This struct represents the task that syncs and maintains Cuprate's blockchain state.
pub struct Blockchain<C, BV> {
/// The blockchain context service.
///
/// This service handles keeping all the data needed to verify new blocks.
context_svc: C,
/// The block verifier service, handles block verification.
block_verifier_svc: BV,
/// The blockchain database write handle.
database_svc: DatabaseWriteHandle,
incoming_block_rx: mpsc::Receiver<IncomingBlock>,
incoming_block_batch_rx: mpsc::Receiver<BlockBatch>,
}
impl<C, BV> Blockchain<C, BV>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
BV: Service<VerifyBlockRequest, Response = VerifyBlockResponse, Error = ExtendedConsensusError>,
BV::Future: Send + 'static,
{
}

View file

@ -0,0 +1,122 @@
use std::time::Duration;
use futures::StreamExt;
use tokio::{sync::mpsc, time::sleep};
use tower::{Service, ServiceExt};
use tracing::instrument;
use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse};
use cuprate_p2p::{
block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse},
NetworkInterface,
};
use monero_p2p::network_zones::ClearNet;
/// An error returned from the [`syncer`].
#[derive(Debug, thiserror::Error)]
enum SyncerError {
#[error("Incoming block channel closed.")]
IncomingBlockChannelClosed,
#[error("One of our services returned an error: {0}.")]
ServiceError(#[from] tower::BoxError),
}
#[instrument(level = "debug", skip_all)]
async fn syncer<C, CN>(
mut context_svc: C,
our_chain: CN,
clearnet_interface: NetworkInterface<ClearNet>,
incoming_block_batch_tx: mpsc::Sender<BlockBatch>,
block_downloader_config: BlockDownloaderConfig,
) -> Result<(), SyncerError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
CN: Service<ChainSvcRequest, Response = ChainSvcResponse, Error = tower::BoxError>
+ Clone
+ Send
+ 'static,
CN::Future: Send + 'static,
{
tracing::info!("Starting blockchain syncer");
let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc
.ready()
.await?
.call(BlockChainContextRequest::GetContext)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
let mut peer_sync_watch = clearnet_interface.top_sync_stream();
tracing::debug!("Waiting for new sync info in top sync channel");
while let Some(top_sync_info) = peer_sync_watch.next().await {
tracing::debug!(
"New sync info seen, top height: {}, top block hash: {}",
top_sync_info.chain_height,
hex::encode(top_sync_info.top_hash)
);
// The new info could be from a peer giving us a block, so wait a couple seconds to allow the block to
// be added to our blockchain.
sleep(Duration::from_secs(2)).await;
check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?;
let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context();
if top_sync_info.cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty {
tracing::debug!("New peer sync info is not ahead, nothing to do.");
continue;
}
tracing::debug!(
"We are behind peers claimed cumulative difficulty, starting block downloader"
);
let mut block_batch_stream =
clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config);
while let Some(batch) = block_batch_stream.next().await {
tracing::debug!("Got batch, len: {}", batch.blocks.len());
if incoming_block_batch_tx.send(batch).await.is_err() {
return Err(SyncerError::IncomingBlockChannelClosed);
}
}
}
Ok(())
}
async fn check_update_blockchain_context<C>(
context_svc: C,
old_context: &mut BlockChainContext,
) -> Result<(), tower::BoxError>
where
C: Service<
BlockChainContextRequest,
Response = BlockChainContextResponse,
Error = tower::BoxError,
>,
C::Future: Send + 'static,
{
if old_context.blockchain_context().is_ok() {
return Ok(());
}
let BlockChainContextResponse::Context(ctx) = context_svc
.oneshot(BlockChainContextRequest::GetContext)
.await?
else {
panic!("Blockchain context service returned wrong response!");
};
*old_context = ctx;
Ok(())
}

View file

View file

@ -1,8 +1,11 @@
#![allow(dead_code)]
mod blockchain; mod blockchain;
mod config;
mod network; mod network;
mod p2p_request_handler; mod p2p_request_handler;
mod tx_pool; mod tx_pool;
fn main() { fn main() {
println!("Hello, world!"); todo!();
} }

View file

@ -1,26 +0,0 @@
use bytes::Bytes;
use dandelion_tower::TxState;
use futures::Stream;
/// A trait representing the whole P2P network, including all network zones.
///
/// [`cuprate_p2p`] provides a per [`NetworkZone`](monero_p2p::NetworkZone) abstraction in [`TODO`], this trait
/// provides a full abstraction, just exposing a minimal interface for Cuprate to interact with.
///
/// It's methods will handle routing to the different [`NetworkZone`](monero_p2p::NetworkZone)s when required.
///
/// This is kept generic for testing purposes.
trait P2PNetwork: Clone {
/// An identifier for a node on any [`NetworkZone`](monero_p2p::NetworkZone)
type PeerID;
/// The block downloader stream.
type BlockDownloader: Stream<Item = ()>;
/// Broadcasts a block to the network.
fn broadcast_block(&mut self, block_bytes: Bytes, chain_height: u64);
/// Broadcasts a transaction to the network.
fn broadcast_transaction(&mut self, tx_bytes: Bytes, state: TxState<Self::PeerID>);
fn block_downloader(&mut self) -> Self::BlockDownloader;
}

View file

@ -1,7 +0,0 @@
use cuprate_blockchain::service::DatabaseReadHandle;
pub struct P2PRequestHandler {
database: DatabaseReadHandle,
txpool: (),
}

View file

@ -1,2 +0,0 @@
//! # Transaction Pool
//!

View file

@ -36,6 +36,7 @@ use client_pool::ClientPoolDropGuard;
pub use config::P2PConfig; pub use config::P2PConfig;
use connection_maintainer::MakeConnectionRequest; use connection_maintainer::MakeConnectionRequest;
use monero_p2p::services::PeerSyncRequest; use monero_p2p::services::PeerSyncRequest;
pub use sync_states::NewSyncInfo;
/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`].
/// ///
@ -194,7 +195,7 @@ impl<N: NetworkZone> NetworkInterface<N> {
} }
/// Returns a stream which yields the highest seen sync state from a connected peer. /// Returns a stream which yields the highest seen sync state from a connected peer.
pub fn top_sync_stream(&self) -> WatchStream<sync_states::NewSyncInfo> { pub fn top_sync_stream(&self) -> WatchStream<NewSyncInfo> {
WatchStream::from_changes(self.top_block_watch.clone()) WatchStream::from_changes(self.top_block_watch.clone())
} }