From 008e46c6833563c9ff1a17153cdcca25f3c4c757 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 14 Jun 2024 22:24:08 +0100 Subject: [PATCH] add syncer --- Cargo.lock | 3 + binaries/cuprated/Cargo.toml | 3 + binaries/cuprated/src/blockchain.rs | 63 +--------- binaries/cuprated/src/blockchain/syncer.rs | 122 +++++++++++++++++++ binaries/cuprated/src/config.rs | 0 binaries/cuprated/src/main.rs | 5 +- binaries/cuprated/src/network.rs | 26 ---- binaries/cuprated/src/p2p_request_handler.rs | 7 -- binaries/cuprated/src/tx_pool.rs | 2 - p2p/cuprate-p2p/src/lib.rs | 3 +- 10 files changed, 135 insertions(+), 99 deletions(-) create mode 100644 binaries/cuprated/src/blockchain/syncer.rs create mode 100644 binaries/cuprated/src/config.rs delete mode 100644 binaries/cuprated/src/network.rs delete mode 100644 binaries/cuprated/src/p2p_request_handler.rs delete mode 100644 binaries/cuprated/src/tx_pool.rs diff --git a/Cargo.lock b/Cargo.lock index b896eed3..081a07e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -670,10 +670,13 @@ dependencies = [ "cuprate-p2p", "dandelion-tower", "futures", + "hex", "monero-p2p", "monero-serai", + "thiserror", "tokio", "tower", + "tracing", ] [[package]] diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index be5276d9..dd68730a 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -25,4 +25,7 @@ tower = { workspace = true } # Utils bytes = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +hex = { workspace = true } diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index 4f950e04..938efd74 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -1,62 +1 @@ -//! # The Syncer -//! -//! The syncer is the part of Cuprate that handles keeping the blockchain state, it handles syncing if -//! we have fallen behind, and it handles incoming blocks. -use cuprate_blockchain::service::DatabaseWriteHandle; -use monero_serai::{block::Block, transaction::Transaction}; -use tokio::sync::mpsc; -use tower::Service; - -use cuprate_consensus::{ - BlockChainContextRequest, BlockChainContextResponse, ExtendedConsensusError, - VerifyBlockRequest, VerifyBlockResponse, -}; -use monero_p2p::handles::ConnectionHandle; - -pub struct IncomingBlock { - block: Block, - included_txs: Vec, - peer_handle: ConnectionHandle, -} - -/// A response to an [`IncomingBlock`] -pub enum IncomingBlockResponse { - /// We are missing these transactions from the block. - MissingTransactions(Vec<[u8; 32]>), - /// A generic ok response. - Ok, -} - -struct BlockBatch; - -/// The blockchain. -/// -/// This struct represents the task that syncs and maintains Cuprate's blockchain state. -pub struct Blockchain { - /// The blockchain context service. - /// - /// This service handles keeping all the data needed to verify new blocks. - context_svc: C, - /// The block verifier service, handles block verification. - block_verifier_svc: BV, - - /// The blockchain database write handle. - database_svc: DatabaseWriteHandle, - - incoming_block_rx: mpsc::Receiver, - - incoming_block_batch_rx: mpsc::Receiver, -} - -impl Blockchain -where - C: Service< - BlockChainContextRequest, - Response = BlockChainContextResponse, - Error = tower::BoxError, - >, - C::Future: Send + 'static, - BV: Service, - BV::Future: Send + 'static, -{ -} +mod syncer; diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs new file mode 100644 index 00000000..9c608372 --- /dev/null +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -0,0 +1,122 @@ +use std::time::Duration; + +use futures::StreamExt; +use tokio::{sync::mpsc, time::sleep}; +use tower::{Service, ServiceExt}; +use tracing::instrument; + +use cuprate_consensus::{BlockChainContext, BlockChainContextRequest, BlockChainContextResponse}; +use cuprate_p2p::{ + block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, + NetworkInterface, +}; +use monero_p2p::network_zones::ClearNet; + +/// An error returned from the [`syncer`]. +#[derive(Debug, thiserror::Error)] +enum SyncerError { + #[error("Incoming block channel closed.")] + IncomingBlockChannelClosed, + #[error("One of our services returned an error: {0}.")] + ServiceError(#[from] tower::BoxError), +} + +#[instrument(level = "debug", skip_all)] +async fn syncer( + mut context_svc: C, + our_chain: CN, + clearnet_interface: NetworkInterface, + incoming_block_batch_tx: mpsc::Sender, + block_downloader_config: BlockDownloaderConfig, +) -> Result<(), SyncerError> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, + C::Future: Send + 'static, + CN: Service + + Clone + + Send + + 'static, + CN::Future: Send + 'static, +{ + tracing::info!("Starting blockchain syncer"); + + let BlockChainContextResponse::Context(mut blockchain_ctx) = context_svc + .ready() + .await? + .call(BlockChainContextRequest::GetContext) + .await? + else { + panic!("Blockchain context service returned wrong response!"); + }; + + let mut peer_sync_watch = clearnet_interface.top_sync_stream(); + + tracing::debug!("Waiting for new sync info in top sync channel"); + + while let Some(top_sync_info) = peer_sync_watch.next().await { + tracing::debug!( + "New sync info seen, top height: {}, top block hash: {}", + top_sync_info.chain_height, + hex::encode(top_sync_info.top_hash) + ); + + // The new info could be from a peer giving us a block, so wait a couple seconds to allow the block to + // be added to our blockchain. + sleep(Duration::from_secs(2)).await; + + check_update_blockchain_context(&mut context_svc, &mut blockchain_ctx).await?; + let raw_blockchain_context = blockchain_ctx.unchecked_blockchain_context(); + + if top_sync_info.cumulative_difficulty <= raw_blockchain_context.cumulative_difficulty { + tracing::debug!("New peer sync info is not ahead, nothing to do."); + continue; + } + + tracing::debug!( + "We are behind peers claimed cumulative difficulty, starting block downloader" + ); + let mut block_batch_stream = + clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); + + while let Some(batch) = block_batch_stream.next().await { + tracing::debug!("Got batch, len: {}", batch.blocks.len()); + if incoming_block_batch_tx.send(batch).await.is_err() { + return Err(SyncerError::IncomingBlockChannelClosed); + } + } + } + + Ok(()) +} + +async fn check_update_blockchain_context( + context_svc: C, + old_context: &mut BlockChainContext, +) -> Result<(), tower::BoxError> +where + C: Service< + BlockChainContextRequest, + Response = BlockChainContextResponse, + Error = tower::BoxError, + >, + C::Future: Send + 'static, +{ + if old_context.blockchain_context().is_ok() { + return Ok(()); + } + + let BlockChainContextResponse::Context(ctx) = context_svc + .oneshot(BlockChainContextRequest::GetContext) + .await? + else { + panic!("Blockchain context service returned wrong response!"); + }; + + *old_context = ctx; + + Ok(()) +} diff --git a/binaries/cuprated/src/config.rs b/binaries/cuprated/src/config.rs new file mode 100644 index 00000000..e69de29b diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index f75c1712..038cc633 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -1,8 +1,11 @@ +#![allow(dead_code)] + mod blockchain; +mod config; mod network; mod p2p_request_handler; mod tx_pool; fn main() { - println!("Hello, world!"); + todo!(); } diff --git a/binaries/cuprated/src/network.rs b/binaries/cuprated/src/network.rs deleted file mode 100644 index a4a25a28..00000000 --- a/binaries/cuprated/src/network.rs +++ /dev/null @@ -1,26 +0,0 @@ -use bytes::Bytes; -use dandelion_tower::TxState; -use futures::Stream; - -/// A trait representing the whole P2P network, including all network zones. -/// -/// [`cuprate_p2p`] provides a per [`NetworkZone`](monero_p2p::NetworkZone) abstraction in [`TODO`], this trait -/// provides a full abstraction, just exposing a minimal interface for Cuprate to interact with. -/// -/// It's methods will handle routing to the different [`NetworkZone`](monero_p2p::NetworkZone)s when required. -/// -/// This is kept generic for testing purposes. -trait P2PNetwork: Clone { - /// An identifier for a node on any [`NetworkZone`](monero_p2p::NetworkZone) - type PeerID; - /// The block downloader stream. - type BlockDownloader: Stream; - - /// Broadcasts a block to the network. - fn broadcast_block(&mut self, block_bytes: Bytes, chain_height: u64); - - /// Broadcasts a transaction to the network. - fn broadcast_transaction(&mut self, tx_bytes: Bytes, state: TxState); - - fn block_downloader(&mut self) -> Self::BlockDownloader; -} diff --git a/binaries/cuprated/src/p2p_request_handler.rs b/binaries/cuprated/src/p2p_request_handler.rs deleted file mode 100644 index 4a0e269a..00000000 --- a/binaries/cuprated/src/p2p_request_handler.rs +++ /dev/null @@ -1,7 +0,0 @@ -use cuprate_blockchain::service::DatabaseReadHandle; - -pub struct P2PRequestHandler { - database: DatabaseReadHandle, - - txpool: (), -} diff --git a/binaries/cuprated/src/tx_pool.rs b/binaries/cuprated/src/tx_pool.rs deleted file mode 100644 index 317b0c8f..00000000 --- a/binaries/cuprated/src/tx_pool.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! # Transaction Pool -//! diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index f15574ef..59de45f6 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -36,6 +36,7 @@ use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; use connection_maintainer::MakeConnectionRequest; use monero_p2p::services::PeerSyncRequest; +pub use sync_states::NewSyncInfo; /// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. /// @@ -194,7 +195,7 @@ impl NetworkInterface { } /// Returns a stream which yields the highest seen sync state from a connected peer. - pub fn top_sync_stream(&self) -> WatchStream { + pub fn top_sync_stream(&self) -> WatchStream { WatchStream::from_changes(self.top_block_watch.clone()) }