diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index f55d41d..7f13c3f 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -2,4 +2,48 @@ //! //! Will handle initiating the P2P and contains a protocol request handler. +use crate::txpool::IncomingTxHandler; +use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_consensus::BlockChainContextService; +use cuprate_p2p::{NetworkInterface, P2PConfig}; +use cuprate_p2p_core::ClearNet; +use cuprate_txpool::service::TxpoolReadHandle; +use futures::{FutureExt, TryFutureExt}; +use tokio::sync::oneshot; +use tower::ServiceExt; + +mod core_sync_service; pub mod request_handler; + +pub async fn start_clearnet_p2p( + blockchain_read_handle: BlockchainReadHandle, + blockchain_context_service: BlockChainContextService, + txpool_read_handle: TxpoolReadHandle, + config: P2PConfig, +) -> Result< + ( + NetworkInterface, + oneshot::Sender, + ), + tower::BoxError, +> { + let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel(); + + let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker { + blockchain_read_handle, + blockchain_context_service: blockchain_context_service.clone(), + txpool_read_handle, + incoming_tx_handler: None, + incoming_tx_handler_fut: incoming_tx_handler_rx.shared(), + }; + + Ok(( + cuprate_p2p::initialize_network( + request_handler_maker.map_response(|s| s.map_err(Into::into)), + core_sync_service::CoreSyncService(blockchain_context_service), + config, + ) + .await?, + incoming_tx_handler_tx, + )) +} diff --git a/binaries/cuprated/src/p2p/core_sync_service.rs b/binaries/cuprated/src/p2p/core_sync_service.rs new file mode 100644 index 0000000..04f2004 --- /dev/null +++ b/binaries/cuprated/src/p2p/core_sync_service.rs @@ -0,0 +1,48 @@ +use cuprate_consensus::{ + BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, +}; +use cuprate_helper::cast::usize_to_u64; +use cuprate_helper::map::split_u128_into_low_high_bits; +use cuprate_p2p_core::services::{CoreSyncDataRequest, CoreSyncDataResponse}; +use cuprate_wire::CoreSyncData; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; +use std::task::{Context, Poll}; +use tower::Service; + +#[derive(Clone)] +pub struct CoreSyncService(pub BlockChainContextService); + +impl Service for CoreSyncService { + type Response = CoreSyncDataResponse; + type Error = tower::BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, _: CoreSyncDataRequest) -> Self::Future { + self.0 + .call(BlockChainContextRequest::Context) + .map_ok(|res| { + let BlockChainContextResponse::Context(context) = res else { + unreachable!() + }; + + let context = context.unchecked_blockchain_context(); + let (cumulative_difficulty, cumulative_difficulty_top64) = + split_u128_into_low_high_bits(context.cumulative_difficulty); + + CoreSyncDataResponse(CoreSyncData { + cumulative_difficulty, + cumulative_difficulty_top64, + current_height: usize_to_u64(context.chain_height), + pruning_seed: 0, + top_id: context.top_hash, + top_version: context.current_hf.as_u8(), + }) + }) + .boxed() + } +}