From b510739701790f7b047be05801103b296d374b98 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 4 Jun 2024 17:19:25 +0000 Subject: [PATCH 1/4] P2P: Network init (#130) * p2p changes * clippy * a few more docs * init cuprate-p2p * remove some unrelated code and add some docs * start documenting client_pool.rs * add more docs * typo * fix docs * use JoinSet in connection maintainer * small changes * add peer sync state svc * add broadcast svc * add more docs * add some tests * add a test * fix merge * add another test * unify PeerDisconnectFut and add more docs * start network init * add an inbound connection server * remove crate doc for now * fix address book docs * fix leak in client pool * correct comment * fix merge + add some docs * fix doc * dandelion_tower -> dandelion-tower * fix async-buffer builds * check if incoming peers are banned * add interface methods * update docs * use a JoinSet for background network tasks * Apply suggestions from code review Co-authored-by: hinto-janai * Update p2p/monero-p2p/src/services.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- Cargo.toml | 1 + p2p/address-book/Cargo.toml | 2 +- p2p/address-book/src/book.rs | 3 + p2p/address-book/src/lib.rs | 10 +- p2p/async-buffer/src/lib.rs | 2 +- p2p/cuprate-p2p/Cargo.toml | 6 +- p2p/cuprate-p2p/src/broadcast.rs | 1 + p2p/cuprate-p2p/src/client_pool.rs | 22 +-- .../src/client_pool/disconnect_monitor.rs | 13 +- .../src/client_pool/drop_guard_client.rs | 2 +- p2p/cuprate-p2p/src/config.rs | 42 +++- p2p/cuprate-p2p/src/connection_maintainer.rs | 19 +- p2p/cuprate-p2p/src/constants.rs | 8 +- p2p/cuprate-p2p/src/inbound_server.rs | 113 +++++++++++ p2p/cuprate-p2p/src/lib.rs | 183 +++++++++++++++++- p2p/cuprate-p2p/src/sync_states.rs | 8 +- p2p/dandelion/Cargo.toml | 2 +- p2p/monero-p2p/src/client.rs | 4 +- p2p/monero-p2p/src/lib.rs | 9 +- p2p/monero-p2p/src/network_zones/clear.rs | 6 +- p2p/monero-p2p/src/services.rs | 4 + p2p/monero-p2p/tests/fragmented_handshake.rs | 7 +- p2p/monero-p2p/tests/handshake.rs | 4 +- test-utils/src/test_netzone.rs | 10 +- 24 files changed, 415 insertions(+), 66 deletions(-) create mode 100644 p2p/cuprate-p2p/src/inbound_server.rs diff --git a/Cargo.toml b/Cargo.toml index d07f7bf..4b9bd63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "p2p/cuprate-p2p", "p2p/dandelion", "p2p/monero-p2p", + "p2p/async-buffer", "p2p/address-book", "storage/cuprate-blockchain", "storage/cuprate-txpool", diff --git a/p2p/address-book/Cargo.toml b/p2p/address-book/Cargo.toml index e60ec61..9b24c02 100644 --- a/p2p/address-book/Cargo.toml +++ b/p2p/address-book/Cargo.toml @@ -11,7 +11,7 @@ monero-pruning = { path = "../../pruning" } monero-wire = { path= "../../net/monero-wire" } monero-p2p = { path = "../monero-p2p" } -tower = { workspace = true, features = ["util", "buffer"] } +tower = { workspace = true, features = ["util"] } tokio = { workspace = true, features = ["time", "fs", "rt"]} tokio-util = { workspace = true, features = ["time"] } diff --git a/p2p/address-book/src/book.rs b/p2p/address-book/src/book.rs index 3a49c6b..2f0617e 100644 --- a/p2p/address-book/src/book.rs +++ b/p2p/address-book/src/book.rs @@ -409,6 +409,9 @@ impl Service> for AddressBook { AddressBookRequest::GetWhitePeers(len) => { Ok(AddressBookResponse::Peers(self.get_white_peers(len))) } + AddressBookRequest::IsPeerBanned(addr) => Ok(AddressBookResponse::IsPeerBanned( + self.is_peer_banned(&addr), + )), }; ready(response) diff --git a/p2p/address-book/src/lib.rs b/p2p/address-book/src/lib.rs index a3dc054..51f83dd 100644 --- a/p2p/address-book/src/lib.rs +++ b/p2p/address-book/src/lib.rs @@ -2,7 +2,7 @@ //! //! This module holds the logic for persistent peer storage. //! Cuprates address book is modeled as a [`tower::Service`] -//! The request is [`AddressBookRequest`] and the response is +//! The request is [`AddressBookRequest`](monero_p2p::services::AddressBookRequest) and the response is //! [`AddressBookResponse`](monero_p2p::services::AddressBookResponse). //! //! Cuprate, like monerod, actually has multiple address books, one @@ -13,9 +13,7 @@ //! use std::{io::ErrorKind, path::PathBuf, time::Duration}; -use tower::buffer::Buffer; - -use monero_p2p::{services::AddressBookRequest, NetworkZone}; +use monero_p2p::NetworkZone; mod book; mod peer_list; @@ -65,7 +63,7 @@ pub enum AddressBookError { /// Initializes the P2P address book for a specific network zone. pub async fn init_address_book( cfg: AddressBookConfig, -) -> Result, AddressBookRequest>, std::io::Error> { +) -> Result, std::io::Error> { tracing::info!( "Loading peers from file: {} ", cfg.peer_store_file.display() @@ -82,5 +80,5 @@ pub async fn init_address_book( let address_book = book::AddressBook::::new(cfg, white_list, gray_list, Vec::new()); - Ok(Buffer::new(address_book, 150)) + Ok(address_book) } diff --git a/p2p/async-buffer/src/lib.rs b/p2p/async-buffer/src/lib.rs index ded8c6a..0e2ced2 100644 --- a/p2p/async-buffer/src/lib.rs +++ b/p2p/async-buffer/src/lib.rs @@ -48,7 +48,7 @@ pub fn new_buffer(max_item_weight: usize) -> (BufferAppender, BufferStream queue: tx, sink_waker: sink_waker.clone(), capacity: capacity_atomic.clone(), - max_item_weight: capacity, + max_item_weight, }, BufferStream { queue: rx, diff --git a/p2p/cuprate-p2p/Cargo.toml b/p2p/cuprate-p2p/Cargo.toml index d73684a..687493a 100644 --- a/p2p/cuprate-p2p/Cargo.toml +++ b/p2p/cuprate-p2p/Cargo.toml @@ -15,8 +15,8 @@ cuprate-helper = { path = "../../helper", features = ["asynch"] } monero-serai = { workspace = true, features = ["std"] } -tower = { workspace = true } -tokio = { workspace = true, features = ["rt"] } +tower = { workspace = true, features = ["buffer"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } rayon = { workspace = true } tokio-util = { workspace = true } tokio-stream = { workspace = true, features = ["sync", "time"] } @@ -32,5 +32,7 @@ rand_distr = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["std"] } tracing = { workspace = true, features = ["std", "attributes"] } +tracing-subscriber = "0.3.18" + [dev-dependencies] cuprate-test-utils = { path = "../../test-utils" } diff --git a/p2p/cuprate-p2p/src/broadcast.rs b/p2p/cuprate-p2p/src/broadcast.rs index cc8a3fd..b6e5e80 100644 --- a/p2p/cuprate-p2p/src/broadcast.rs +++ b/p2p/cuprate-p2p/src/broadcast.rs @@ -151,6 +151,7 @@ pub enum BroadcastRequest { }, } +#[derive(Clone)] pub struct BroadcastSvc { new_block_watch: watch::Sender, tx_broadcast_channel_outbound: broadcast::Sender>, diff --git a/p2p/cuprate-p2p/src/client_pool.rs b/p2p/cuprate-p2p/src/client_pool.rs index 5dc7d1b..8b77f42 100644 --- a/p2p/cuprate-p2p/src/client_pool.rs +++ b/p2p/cuprate-p2p/src/client_pool.rs @@ -12,13 +12,14 @@ //! use std::sync::Arc; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use tokio::sync::mpsc; +use tracing::{Instrument, Span}; use monero_p2p::{ client::{Client, InternalPeerID}, handles::ConnectionHandle, - ConnectionDirection, NetworkZone, + NetworkZone, }; pub(crate) mod disconnect_monitor; @@ -32,12 +33,6 @@ pub use drop_guard_client::ClientPoolDropGuard; pub struct ClientPool { /// The connected [`Client`]s. clients: DashMap, Client>, - /// A set of outbound clients, as these allow accesses/mutation from different threads, - /// a peer ID in here does not mean the peer is necessarily in `clients` as it could have been removed - /// by another thread. However, if the peer is in both here and `clients` it is definitely - /// an outbound peer. - outbound_clients: DashSet>, - /// A channel to send new peer ids down to monitor for disconnect. new_connection_tx: mpsc::UnboundedSender<(ConnectionHandle, InternalPeerID)>, } @@ -49,11 +44,12 @@ impl ClientPool { let pool = Arc::new(ClientPool { clients: DashMap::new(), - outbound_clients: DashSet::new(), new_connection_tx: tx, }); - tokio::spawn(disconnect_monitor::disconnect_monitor(rx, pool.clone())); + tokio::spawn( + disconnect_monitor::disconnect_monitor(rx, pool.clone()).instrument(Span::current()), + ); pool } @@ -74,10 +70,6 @@ impl ClientPool { return; } - if client.info.direction == ConnectionDirection::OutBound { - self.outbound_clients.insert(id); - } - let res = self.clients.insert(id, client); assert!(res.is_none()); @@ -106,8 +98,6 @@ impl ClientPool { /// /// [`None`] is returned if the client did not exist in the pool. fn remove_client(&self, peer: &InternalPeerID) -> Option> { - self.outbound_clients.remove(peer); - self.clients.remove(peer).map(|(_, client)| client) } diff --git a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs index 4e5ec08..e83fa32 100644 --- a/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs +++ b/p2p/cuprate-p2p/src/client_pool/disconnect_monitor.rs @@ -24,6 +24,11 @@ pub async fn disconnect_monitor( mut new_connection_rx: mpsc::UnboundedReceiver<(ConnectionHandle, InternalPeerID)>, client_pool: Arc>, ) { + // We need to hold a weak reference otherwise the client pool and this would hold a reference to + // each other causing the pool to be leaked. + let weak_client_pool = Arc::downgrade(&client_pool); + drop(client_pool); + tracing::info!("Starting peer disconnect monitor."); let mut futs: FuturesUnordered> = FuturesUnordered::new(); @@ -39,7 +44,13 @@ pub async fn disconnect_monitor( } Some(peer_id) = futs.next() => { tracing::debug!("{peer_id} has disconnected, removing from client pool."); - client_pool.remove_client(&peer_id); + let Some(pool) = weak_client_pool.upgrade() else { + tracing::info!("Peer disconnect monitor shutting down."); + return; + }; + + pool.remove_client(&peer_id); + drop(pool); } else => { tracing::info!("Peer disconnect monitor shutting down."); diff --git a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs index 5555d71..d8c20c6 100644 --- a/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs +++ b/p2p/cuprate-p2p/src/client_pool/drop_guard_client.rs @@ -14,7 +14,7 @@ pub struct ClientPoolDropGuard { /// The [`Client`]. /// /// This is set to [`Some`] when this guard is created, then - /// ### [`take`](Option::take)n and returned to the pool when dropped. + /// [`take`](Option::take)n and returned to the pool when dropped. pub(super) client: Option>, } diff --git a/p2p/cuprate-p2p/src/config.rs b/p2p/cuprate-p2p/src/config.rs index 31b5ab1..a92ad9a 100644 --- a/p2p/cuprate-p2p/src/config.rs +++ b/p2p/cuprate-p2p/src/config.rs @@ -1,12 +1,52 @@ +use cuprate_helper::network::Network; +use monero_address_book::AddressBookConfig; +use monero_p2p::NetworkZone; +use monero_wire::{common::PeerSupportFlags, BasicNodeData}; + /// P2P config. #[derive(Clone, Debug)] -pub struct P2PConfig { +pub struct P2PConfig { + /// The [`Network`] we should connect to. + pub network: Network, + /// The number of outbound connections to make and try keep. pub outbound_connections: usize, /// The amount of extra connections we can make if we are under load from the rest of Cuprate. pub extra_outbound_connections: usize, + /// The maximum amount of inbound connections, only relevant if [`P2PConfig::server_config`] is set to [`Some`] + pub max_inbound_connections: usize, /// The percent of outbound peers that should be gray aka never connected to before. /// /// Only values 0..=1 are valid. pub gray_peers_percent: f64, + /// The inbound server configuration, + /// + /// If this is [`None`] no inbound connections will be accepted. + pub server_config: Option, + + /// The port to listen on for inbound connections, only relevant if [`P2PConfig::server_config`] is set to [`Some`]. + pub p2p_port: u16, + /// The public RPC port to tell peers about so wallets can use our node. `0` if we do not have a public RPC port. + pub rpc_port: u16, + + /// The [`AddressBookConfig`]. + pub address_book_config: AddressBookConfig, +} + +impl P2PConfig { + /// Returns the [`BasicNodeData`] for this [`P2PConfig`]. + /// + /// [`BasicNodeData::peer_id`] is set to a random u64, so this function should only be called once + /// per [`NetworkZone`] per run. + pub(crate) fn basic_node_data(&self) -> BasicNodeData { + BasicNodeData { + my_port: u32::from(self.p2p_port), + network_id: self.network.network_id(), + peer_id: rand::random(), + support_flags: PeerSupportFlags::FLUFFY_BLOCKS, + rpc_port: self.rpc_port, + // We do not (and probably will never) support paying for RPC with hashes. + rpc_credits_per_hash: 0, + } + } } diff --git a/p2p/cuprate-p2p/src/connection_maintainer.rs b/p2p/cuprate-p2p/src/connection_maintainer.rs index bff4b9d..4ec6695 100644 --- a/p2p/cuprate-p2p/src/connection_maintainer.rs +++ b/p2p/cuprate-p2p/src/connection_maintainer.rs @@ -12,7 +12,7 @@ use tokio::{ time::{sleep, timeout}, }; use tower::{Service, ServiceExt}; -use tracing::instrument; +use tracing::{instrument, Instrument, Span}; use monero_p2p::{ client::{Client, ConnectRequest, HandshakeError}, @@ -60,7 +60,7 @@ pub struct OutboundConnectionKeeper { /// we add a permit to the semaphore and keep track here, upto a value in config. pub extra_peers: usize, /// The p2p config. - pub config: P2PConfig, + pub config: P2PConfig, /// The [`Bernoulli`] distribution, when sampled will return true if we should connect to a gray peer or /// false if we should connect to a white peer. /// @@ -76,7 +76,7 @@ where C::Future: Send + 'static, { pub fn new( - config: P2PConfig, + config: P2PConfig, client_pool: Arc>, make_connection_rx: mpsc::Receiver, address_book_svc: A, @@ -149,7 +149,7 @@ where } /// Connects to a given outbound peer. - #[instrument(level = "info", skip(self, permit), fields(%addr))] + #[instrument(level = "info", skip_all)] async fn connect_to_outbound_peer(&mut self, permit: OwnedSemaphorePermit, addr: N::Addr) { let client_pool = self.client_pool.clone(); let connection_fut = self @@ -159,11 +159,14 @@ where .expect("Connector had an error in `poll_ready`") .call(ConnectRequest { addr, permit }); - tokio::spawn(async move { - if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { - client_pool.add_new_client(peer); + tokio::spawn( + async move { + if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, connection_fut).await { + client_pool.add_new_client(peer); + } } - }); + .instrument(Span::current()), + ); } /// Handles a request from the peer set for more peers. diff --git a/p2p/cuprate-p2p/src/constants.rs b/p2p/cuprate-p2p/src/constants.rs index 8ec0275..0c65386 100644 --- a/p2p/cuprate-p2p/src/constants.rs +++ b/p2p/cuprate-p2p/src/constants.rs @@ -1,7 +1,7 @@ use std::time::Duration; /// The timeout we set on handshakes. -pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); +pub(crate) const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(20); /// The maximum amount of connections to make to seed nodes for when we need peers. pub(crate) const MAX_SEED_CONNECTIONS: usize = 3; @@ -28,6 +28,12 @@ pub(crate) const SOFT_TX_MESSAGE_SIZE_SIZE_LIMIT: usize = 10 * 1024 * 1024; /// 50 more transactions after it are added to the queue. pub(crate) const MAX_TXS_IN_BROADCAST_CHANNEL: usize = 50; +/// The time to sleep after an inbound connection comes in. +/// +/// This is a safety measure to prevent Cuprate from getting spammed with a load of inbound connections. +/// TODO: it might be a good idea to make this configurable. +pub(crate) const INBOUND_CONNECTION_COOL_DOWN: Duration = Duration::from_millis(500); + #[cfg(test)] mod tests { use super::*; diff --git a/p2p/cuprate-p2p/src/inbound_server.rs b/p2p/cuprate-p2p/src/inbound_server.rs new file mode 100644 index 0000000..d8389e7 --- /dev/null +++ b/p2p/cuprate-p2p/src/inbound_server.rs @@ -0,0 +1,113 @@ +//! # Inbound Server +//! +//! This module contains the inbound connection server, which listens for inbound connections, gives +//! them to the handshaker service and then adds them to the client pool. +use std::{pin::pin, sync::Arc}; + +use futures::StreamExt; +use tokio::{ + sync::Semaphore, + time::{sleep, timeout}, +}; +use tower::{Service, ServiceExt}; +use tracing::{instrument, Instrument, Span}; + +use monero_p2p::{ + client::{Client, DoHandshakeRequest, HandshakeError, InternalPeerID}, + services::{AddressBookRequest, AddressBookResponse}, + AddressBook, ConnectionDirection, NetworkZone, +}; + +use crate::{ + client_pool::ClientPool, + constants::{HANDSHAKE_TIMEOUT, INBOUND_CONNECTION_COOL_DOWN}, + P2PConfig, +}; + +/// Starts the inbound server. +#[instrument(level = "warn", skip_all)] +pub async fn inbound_server( + client_pool: Arc>, + mut handshaker: HS, + mut address_book: A, + config: P2PConfig, +) -> Result<(), tower::BoxError> +where + N: NetworkZone, + HS: Service, Response = Client, Error = HandshakeError> + + Send + + 'static, + HS::Future: Send + 'static, + A: AddressBook, +{ + let Some(server_config) = config.server_config else { + tracing::warn!("No inbound server config provided, not listening for inbound connections."); + return Ok(()); + }; + + tracing::info!("Starting inbound connection server"); + + let listener = N::incoming_connection_listener(server_config, config.p2p_port) + .await + .inspect_err(|e| tracing::warn!("Failed to start inbound server: {e}"))?; + + let mut listener = pin!(listener); + + let semaphore = Arc::new(Semaphore::new(config.max_inbound_connections)); + + while let Some(connection) = listener.next().await { + let Ok((addr, peer_stream, peer_sink)) = connection else { + continue; + }; + + if let Some(addr) = &addr { + let AddressBookResponse::IsPeerBanned(banned) = address_book + .ready() + .await? + .call(AddressBookRequest::IsPeerBanned(*addr)) + .await? + else { + panic!("Address book returned incorrect response!"); + }; + + if banned { + continue; + } + } + + let addr = match addr { + Some(addr) => InternalPeerID::KnownAddr(addr), + None => InternalPeerID::Unknown(rand::random()), + }; + + if let Ok(permit) = semaphore.clone().try_acquire_owned() { + tracing::debug!("Permit free for incoming connection, attempting handshake."); + + let fut = handshaker.ready().await?.call(DoHandshakeRequest { + addr, + peer_stream, + peer_sink, + direction: ConnectionDirection::InBound, + permit, + }); + + let cloned_pool = client_pool.clone(); + + tokio::spawn( + async move { + if let Ok(Ok(peer)) = timeout(HANDSHAKE_TIMEOUT, fut).await { + cloned_pool.add_new_client(peer); + } + } + .instrument(Span::current()), + ); + } else { + tracing::debug!("No permit free for incoming connection."); + // TODO: listen for if the peer is just trying to ping us to see if we are reachable. + } + + sleep(INBOUND_CONNECTION_COOL_DOWN).await; + } + + Ok(()) +} diff --git a/p2p/cuprate-p2p/src/lib.rs b/p2p/cuprate-p2p/src/lib.rs index afa4c93..37ea32a 100644 --- a/p2p/cuprate-p2p/src/lib.rs +++ b/p2p/cuprate-p2p/src/lib.rs @@ -1,17 +1,186 @@ //! Cuprate's P2P Crate. //! -//! This crate contains a [`ClientPool`](client_pool::ClientPool) which holds connected peers on a single [`NetworkZone`](monero_p2p::NetworkZone). -//! -//! This crate also contains the different routing methods that control how messages should be sent, i.e. broadcast to all, -//! or send to a single peer. -//! -#![allow(dead_code)] +//! This crate contains a [`NetworkInterface`] which allows interacting with the Monero P2P network on +//! a certain [`NetworkZone`] +use std::sync::Arc; + +use futures::FutureExt; +use tokio::{ + sync::{mpsc, watch}, + task::JoinSet, +}; +use tokio_stream::wrappers::WatchStream; +use tower::{buffer::Buffer, util::BoxCloneService, ServiceExt}; +use tracing::{instrument, Instrument, Span}; + +use monero_p2p::{ + client::Connector, + client::InternalPeerID, + services::{AddressBookRequest, AddressBookResponse}, + CoreSyncSvc, NetworkZone, PeerRequestHandler, +}; mod broadcast; -pub mod client_pool; +mod client_pool; pub mod config; pub mod connection_maintainer; mod constants; +mod inbound_server; mod sync_states; +pub use broadcast::{BroadcastRequest, BroadcastSvc}; +use client_pool::ClientPoolDropGuard; pub use config::P2PConfig; +use connection_maintainer::MakeConnectionRequest; + +/// Initializes the P2P [`NetworkInterface`] for a specific [`NetworkZone`]. +/// +/// This function starts all the tasks to maintain/accept/make connections. +/// +/// # Usage +/// You must provide: +/// - A peer request handler, which is given to each connection +/// - A core sync service, which keeps track of the sync state of our node +#[instrument(level = "debug", name = "net", skip_all, fields(zone = N::NAME))] +pub async fn initialize_network( + peer_req_handler: R, + core_sync_svc: CS, + config: P2PConfig, +) -> Result, tower::BoxError> +where + N: NetworkZone, + R: PeerRequestHandler + Clone, + CS: CoreSyncSvc + Clone, +{ + let address_book = + monero_address_book::init_address_book(config.address_book_config.clone()).await?; + let address_book = Buffer::new( + address_book, + config.max_inbound_connections + config.outbound_connections, + ); + + let (sync_states_svc, top_block_watch) = sync_states::PeerSyncSvc::new(); + let sync_states_svc = Buffer::new( + sync_states_svc, + config.max_inbound_connections + config.outbound_connections, + ); + + // Use the default config. Changing the defaults affects tx fluff times, which could affect D++ so for now don't allow changing + // this. + let (broadcast_svc, outbound_mkr, inbound_mkr) = + broadcast::init_broadcast_channels(broadcast::BroadcastConfig::default()); + + let mut basic_node_data = config.basic_node_data(); + + if !N::CHECK_NODE_ID { + basic_node_data.peer_id = 1; + } + + let outbound_handshaker = monero_p2p::client::HandShaker::new( + address_book.clone(), + sync_states_svc.clone(), + core_sync_svc.clone(), + peer_req_handler.clone(), + outbound_mkr, + basic_node_data.clone(), + ); + + let inbound_handshaker = monero_p2p::client::HandShaker::new( + address_book.clone(), + sync_states_svc, + core_sync_svc.clone(), + peer_req_handler, + inbound_mkr, + basic_node_data, + ); + + let client_pool = client_pool::ClientPool::new(); + + let (make_connection_tx, make_connection_rx) = mpsc::channel(3); + + let outbound_connector = Connector::new(outbound_handshaker); + let outbound_connection_maintainer = connection_maintainer::OutboundConnectionKeeper::new( + config.clone(), + client_pool.clone(), + make_connection_rx, + address_book.clone(), + outbound_connector, + ); + + let mut background_tasks = JoinSet::new(); + + background_tasks.spawn( + outbound_connection_maintainer + .run() + .instrument(Span::current()), + ); + background_tasks.spawn( + inbound_server::inbound_server( + client_pool.clone(), + inbound_handshaker, + address_book.clone(), + config, + ) + .map(|res| { + if let Err(e) = res { + tracing::error!("Error in inbound connection listener: {e}") + } + + tracing::info!("Inbound connection listener shutdown") + }) + .instrument(Span::current()), + ); + + Ok(NetworkInterface { + pool: client_pool, + broadcast_svc, + top_block_watch, + make_connection_tx, + address_book: address_book.boxed_clone(), + _background_tasks: Arc::new(background_tasks), + }) +} + +/// The interface to Monero's P2P network on a certain [`NetworkZone`]. +#[derive(Clone)] +pub struct NetworkInterface { + /// A pool of free connected peers. + pool: Arc>, + /// A [`Service`] that allows broadcasting to all connected peers. + broadcast_svc: BroadcastSvc, + /// A [`watch`] channel that contains the highest seen cumulative difficulty and other info + /// on that claimed chain. + top_block_watch: watch::Receiver, + /// A channel to request extra connections. + #[allow(dead_code)] // will be used eventually + make_connection_tx: mpsc::Sender, + /// The address book service. + address_book: BoxCloneService, AddressBookResponse, tower::BoxError>, + /// Background tasks that will be aborted when this interface is dropped. + _background_tasks: Arc>, +} + +impl NetworkInterface { + /// Returns a service which allows broadcasting messages to all the connected peers in a specific [`NetworkZone`]. + pub fn broadcast_svc(&self) -> BroadcastSvc { + self.broadcast_svc.clone() + } + + /// Returns a stream which yields the highest seen sync state from a connected peer. + pub fn top_sync_stream(&self) -> WatchStream { + WatchStream::from_changes(self.top_block_watch.clone()) + } + + /// Returns the address book service. + pub fn address_book( + &self, + ) -> BoxCloneService, AddressBookResponse, tower::BoxError> { + self.address_book.clone() + } + + /// Pulls a client from the client pool, returning it in a guard that will return it there when it's + /// dropped. + pub fn borrow_client(&self, peer: &InternalPeerID) -> Option> { + self.pool.borrow_client(peer) + } +} diff --git a/p2p/cuprate-p2p/src/sync_states.rs b/p2p/cuprate-p2p/src/sync_states.rs index 9b8b3bd..127b8d7 100644 --- a/p2p/cuprate-p2p/src/sync_states.rs +++ b/p2p/cuprate-p2p/src/sync_states.rs @@ -25,14 +25,14 @@ use monero_wire::CoreSyncData; use crate::{client_pool::disconnect_monitor::PeerDisconnectFut, constants::SHORT_BAN}; /// The highest claimed sync info from our connected peers. -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NewSyncInfo { /// The peers chain height. - chain_height: u64, + pub chain_height: u64, /// The peers top block's hash. - top_hash: [u8; 32], + pub top_hash: [u8; 32], /// The peers cumulative difficulty. - cumulative_difficulty: u128, + pub cumulative_difficulty: u128, } /// A service that keeps track of our peers blockchains. diff --git a/p2p/dandelion/Cargo.toml b/p2p/dandelion/Cargo.toml index a8a0469..e5d7e34 100644 --- a/p2p/dandelion/Cargo.toml +++ b/p2p/dandelion/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dandelion_tower" +name = "dandelion-tower" version = "0.1.0" edition = "2021" license = "MIT" diff --git a/p2p/monero-p2p/src/client.rs b/p2p/monero-p2p/src/client.rs index 8aab306..02deae5 100644 --- a/p2p/monero-p2p/src/client.rs +++ b/p2p/monero-p2p/src/client.rs @@ -29,13 +29,13 @@ pub use handshaker::{DoHandshakeRequest, HandShaker, HandshakeError}; use monero_pruning::PruningSeed; /// An internal identifier for a given peer, will be their address if known -/// or a random u64 if not. +/// or a random u128 if not. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum InternalPeerID { /// A known address. KnownAddr(A), /// An unknown address (probably an inbound anonymity network connection). - Unknown(u64), + Unknown(u128), } impl Display for InternalPeerID { diff --git a/p2p/monero-p2p/src/lib.rs b/p2p/monero-p2p/src/lib.rs index 9c17132..13ecf4a 100644 --- a/p2p/monero-p2p/src/lib.rs +++ b/p2p/monero-p2p/src/lib.rs @@ -130,11 +130,11 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { /// The sink (outgoing data) type for this network. type Sink: Sink, Error = BucketError> + Unpin + Send + 'static; /// The inbound connection listener for this network. - type Listener: Stream< - Item = Result<(Option, Self::Stream, Self::Sink), std::io::Error>, - >; + type Listener: Stream, Self::Stream, Self::Sink), std::io::Error>> + + Send + + 'static; /// Config used to start a server which listens for incoming connections. - type ServerCfg; + type ServerCfg: Clone + Debug + Send + 'static; async fn connect_to_peer( addr: Self::Addr, @@ -142,6 +142,7 @@ pub trait NetworkZone: Clone + Copy + Send + 'static { async fn incoming_connection_listener( config: Self::ServerCfg, + port: u16, ) -> Result; } diff --git a/p2p/monero-p2p/src/network_zones/clear.rs b/p2p/monero-p2p/src/network_zones/clear.rs index 5141a06..c77f133 100644 --- a/p2p/monero-p2p/src/network_zones/clear.rs +++ b/p2p/monero-p2p/src/network_zones/clear.rs @@ -37,8 +37,9 @@ impl NetZoneAddress for SocketAddr { } } +#[derive(Debug, Clone)] pub struct ClearNetServerCfg { - pub addr: SocketAddr, + pub ip: IpAddr, } #[derive(Clone, Copy)] @@ -80,8 +81,9 @@ impl NetworkZone for ClearNet { async fn incoming_connection_listener( config: Self::ServerCfg, + port: u16, ) -> Result { - let listener = TcpListener::bind(config.addr).await?; + let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?; Ok(InBoundStream { listener }) } } diff --git a/p2p/monero-p2p/src/services.rs b/p2p/monero-p2p/src/services.rs index e86e277..a0ea2e7 100644 --- a/p2p/monero-p2p/src/services.rs +++ b/p2p/monero-p2p/src/services.rs @@ -119,10 +119,14 @@ pub enum AddressBookRequest { TakeRandomPeer { height: Option }, /// Gets the specified number of white peers, or less if we don't have enough. GetWhitePeers(usize), + /// Checks if the given peer is banned. + IsPeerBanned(Z::Addr), } pub enum AddressBookResponse { Ok, Peer(ZoneSpecificPeerListEntryBase), Peers(Vec>), + /// Contains `true` if the peer is banned. + IsPeerBanned(bool), } diff --git a/p2p/monero-p2p/tests/fragmented_handshake.rs b/p2p/monero-p2p/tests/fragmented_handshake.rs index 60d490f..e9833cf 100644 --- a/p2p/monero-p2p/tests/fragmented_handshake.rs +++ b/p2p/monero-p2p/tests/fragmented_handshake.rs @@ -71,8 +71,9 @@ impl NetworkZone for FragNet { async fn incoming_connection_listener( config: Self::ServerCfg, + port: u16, ) -> Result { - let listener = TcpListener::bind(config.addr).await?; + let listener = TcpListener::bind(SocketAddr::new(config.ip, port)).await?; Ok(InBoundStream { listener }) } } @@ -194,9 +195,9 @@ async fn fragmented_handshake_monerod_to_cuprate() { our_basic_node_data, ); - let addr = "127.0.0.1:18081".parse().unwrap(); + let ip = "127.0.0.1".parse().unwrap(); - let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { addr }) + let mut listener = FragNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081) .await .unwrap(); diff --git a/p2p/monero-p2p/tests/handshake.rs b/p2p/monero-p2p/tests/handshake.rs index 1d8b649..b63a221 100644 --- a/p2p/monero-p2p/tests/handshake.rs +++ b/p2p/monero-p2p/tests/handshake.rs @@ -174,9 +174,9 @@ async fn handshake_monerod_to_cuprate() { our_basic_node_data, ); - let addr = "127.0.0.1:18081".parse().unwrap(); + let ip = "127.0.0.1".parse().unwrap(); - let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { addr }) + let mut listener = ClearNet::incoming_connection_listener(ClearNetServerCfg { ip }, 18081) .await .unwrap(); diff --git a/test-utils/src/test_netzone.rs b/test-utils/src/test_netzone.rs index 0a53416..e82e553 100644 --- a/test-utils/src/test_netzone.rs +++ b/test-utils/src/test_netzone.rs @@ -87,8 +87,9 @@ impl, Self::Stream, Self::Sink), std::io::Error>, - >, + Item = Result<(Option, Self::Stream, Self::Sink), std::io::Error>, + > + Send + + 'static, >, >; type ServerCfg = (); @@ -97,7 +98,10 @@ impl Result { + async fn incoming_connection_listener( + _: Self::ServerCfg, + _: u16, + ) -> Result { unimplemented!() } } From 0622237d19e655fa68b3814c4e3d2ac5b3f71fb8 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 4 Jun 2024 17:19:35 +0000 Subject: [PATCH 2/4] Consensus: use `cuprate-types` types (#145) * move consensus database to /types * fix `storage` builds * unify `VerifiedBlockInformation` * fix docs * change `Database` trait wording * order imports * service -> blockchain * Apply suggestions from code review Co-authored-by: hinto-janai * fix typo * fix key_images_spent * add back todo * fix tests * service -> blockchain 2 * update docs * update docs 2 --------- Co-authored-by: hinto-janai --- Cargo.lock | 24 ++- consensus/Cargo.toml | 1 + consensus/rules/src/hard_forks.rs | 3 + .../rules/src/transactions/contextual_data.rs | 121 +------------- consensus/src/block.rs | 78 ++++----- consensus/src/context/difficulty.rs | 7 +- consensus/src/context/hardforks.rs | 16 +- consensus/src/context/rx_vms.rs | 9 +- consensus/src/context/task.rs | 11 +- consensus/src/context/weight.rs | 11 +- consensus/src/lib.rs | 152 ++++-------------- consensus/src/tests/mock_db.rs | 35 ++-- consensus/src/transactions.rs | 29 ++-- consensus/src/transactions/contextual_data.rs | 134 +++++++++++++-- consensus/tests/verify_correct_txs.rs | 21 +-- storage/cuprate-blockchain/Cargo.toml | 2 +- storage/cuprate-blockchain/src/service/mod.rs | 16 +- .../cuprate-blockchain/src/service/read.rs | 90 +++++++---- .../cuprate-blockchain/src/service/tests.rs | 60 +++---- .../cuprate-blockchain/src/service/types.rs | 6 +- .../cuprate-blockchain/src/service/write.rs | 22 +-- test-utils/src/data/free.rs | 38 ++--- test-utils/src/data/mod.rs | 4 +- test-utils/src/rpc/client.rs | 13 +- types/Cargo.toml | 7 +- types/src/{service.rs => blockchain.rs} | 69 ++++---- types/src/lib.rs | 13 +- types/src/types.rs | 32 +--- 28 files changed, 470 insertions(+), 554 deletions(-) rename types/src/{service.rs => blockchain.rs} (72%) diff --git a/Cargo.lock b/Cargo.lock index 31959a5..b02865d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,18 +217,18 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d6d68c57235a3a081186990eca2867354726650f42f7516ca50c28d6281fd15" +checksum = "78834c15cb5d5efe3452d58b1e8ba890dd62d21907f867f383358198e56ebca5" dependencies = [ "bytemuck_derive", ] [[package]] name = "bytemuck_derive" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4da9a32f3fed317401fa3c862968128267c3106685286e15d5aaa3d7389c2f60" +checksum = "1ee891b04274a59bd38b412188e24b849617b2e45a0fd8d057deb63e7403761b" dependencies = [ "proc-macro2", "quote", @@ -326,9 +326,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" dependencies = [ "crossbeam-utils", ] @@ -441,6 +441,7 @@ dependencies = [ "cuprate-consensus-rules", "cuprate-helper", "cuprate-test-utils", + "cuprate-types", "curve25519-dalek", "dalek-ff-group", "futures", @@ -557,11 +558,8 @@ version = "0.0.0" name = "cuprate-types" version = "0.0.0" dependencies = [ - "borsh", - "cfg-if", "curve25519-dalek", "monero-serai", - "serde", ] [[package]] @@ -972,9 +970,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "heed" -version = "0.20.0" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7a300b0deeb2957162d7752b0f063b3be1c88333af5bb4e7a57d8fb3716f50b" +checksum = "f60d7cff16094be9627830b399c087a25017e93fb3768b87cd656a68ccb1ebe8" dependencies = [ "bitflags 2.5.0", "byteorder", @@ -1260,9 +1258,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lmdb-master-sys" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc9048db3a58c0732d7236abc4909058f9d2708cfb6d7d047eb895fddec6419a" +checksum = "a5142795c220effa4c8f4813537bd4c88113a07e45e93100ccb2adc5cec6c7f3" dependencies = [ "cc", "doxygen-rs", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 49236d9..624eb63 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -10,6 +10,7 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/consensus" [dependencies] cuprate-helper = { path = "../helper", default-features = false, features = ["std", "asynch", "num"] } cuprate-consensus-rules = { path = "./rules", features = ["rayon"] } +cuprate-types = { path = "../types" } thiserror = { workspace = true } tower = { workspace = true, features = ["util"] } diff --git a/consensus/rules/src/hard_forks.rs b/consensus/rules/src/hard_forks.rs index 5be1f9d..b34b93d 100644 --- a/consensus/rules/src/hard_forks.rs +++ b/consensus/rules/src/hard_forks.rs @@ -163,6 +163,7 @@ impl HardFork { /// Returns the hard-fork for a blocks `major_version` field. /// /// + #[inline] pub fn from_version(version: u8) -> Result { Ok(match version { 1 => HardFork::V1, @@ -188,6 +189,7 @@ impl HardFork { /// Returns the hard-fork for a blocks `minor_version` (vote) field. /// /// + #[inline] pub fn from_vote(vote: u8) -> HardFork { if vote == 0 { // A vote of 0 is interpreted as 1 as that's what Monero used to default to. @@ -197,6 +199,7 @@ impl HardFork { Self::from_version(vote).unwrap_or(HardFork::V16) } + #[inline] pub fn from_block_header(header: &BlockHeader) -> Result<(HardFork, HardFork), HardForkError> { Ok(( HardFork::from_version(header.major_version)?, diff --git a/consensus/rules/src/transactions/contextual_data.rs b/consensus/rules/src/transactions/contextual_data.rs index 2c4943a..6af3ad3 100644 --- a/consensus/rules/src/transactions/contextual_data.rs +++ b/consensus/rules/src/transactions/contextual_data.rs @@ -6,22 +6,13 @@ use std::{ use curve25519_dalek::EdwardsPoint; use monero_serai::transaction::{Input, Timelock}; -use crate::{transactions::TransactionError, HardFork, TxVersion}; - -/// An already approved previous transaction output. -#[derive(Debug, Copy, Clone)] -pub struct OutputOnChain { - pub height: u64, - pub time_lock: Timelock, - pub key: Option, - pub commitment: EdwardsPoint, -} +use crate::{transactions::TransactionError, HardFork}; /// Gets the absolute offsets from the relative offsets. /// /// This function will return an error if the relative offsets are empty. /// -fn get_absolute_offsets(relative_offsets: &[u64]) -> Result, TransactionError> { +pub fn get_absolute_offsets(relative_offsets: &[u64]) -> Result, TransactionError> { if relative_offsets.is_empty() { return Err(TransactionError::InputDoesNotHaveExpectedNumbDecoys); } @@ -64,35 +55,6 @@ pub fn insert_ring_member_ids( Ok(()) } -/// Get the ring members for the inputs from the outputs on the chain. -/// -/// Will error if `outputs` does not contain the outputs needed. -pub fn get_ring_members_for_inputs( - get_outputs: impl Fn(u64, u64) -> Option, - inputs: &[Input], -) -> Result>, TransactionError> { - inputs - .iter() - .map(|inp| match inp { - Input::ToKey { - amount, - key_offsets, - .. - } => { - let offsets = get_absolute_offsets(key_offsets)?; - Ok(offsets - .iter() - .map(|offset| { - get_outputs(amount.unwrap_or(0), *offset) - .ok_or(TransactionError::RingMemberNotFoundOrInvalid) - }) - .collect::>()?) - } - _ => Err(TransactionError::IncorrectInputType), - }) - .collect::>() -} - /// Represents the ring members of all the inputs. #[derive(Debug)] pub enum Rings { @@ -102,45 +64,6 @@ pub enum Rings { RingCT(Vec>), } -impl Rings { - /// Builds the rings for the transaction inputs, from the given outputs. - fn new( - outputs: Vec>, - tx_version: TxVersion, - ) -> Result { - Ok(match tx_version { - TxVersion::RingSignatures => Rings::Legacy( - outputs - .into_iter() - .map(|inp_outs| { - inp_outs - .into_iter() - .map(|out| out.key.ok_or(TransactionError::RingMemberNotFoundOrInvalid)) - .collect::, TransactionError>>() - }) - .collect::, TransactionError>>()?, - ), - TxVersion::RingCT => Rings::RingCT( - outputs - .into_iter() - .map(|inp_outs| { - inp_outs - .into_iter() - .map(|out| { - Ok([ - out.key - .ok_or(TransactionError::RingMemberNotFoundOrInvalid)?, - out.commitment, - ]) - }) - .collect::>() - }) - .collect::>()?, - ), - }) - } -} - /// Information on the outputs the transaction is referencing for inputs (ring members). #[derive(Debug)] pub struct TxRingMembersInfo { @@ -151,46 +74,6 @@ pub struct TxRingMembersInfo { pub time_locked_outs: Vec, } -impl TxRingMembersInfo { - /// Construct a [`TxRingMembersInfo`] struct. - /// - /// The used outs must be all the ring members used in the transactions inputs. - pub fn new( - used_outs: Vec>, - decoy_info: Option, - tx_version: TxVersion, - ) -> Result { - Ok(TxRingMembersInfo { - youngest_used_out_height: used_outs - .iter() - .map(|inp_outs| { - inp_outs - .iter() - // the output with the highest height is the youngest - .map(|out| out.height) - .max() - .expect("Input must have ring members") - }) - .max() - .expect("Tx must have inputs"), - time_locked_outs: used_outs - .iter() - .flat_map(|inp_outs| { - inp_outs - .iter() - .filter_map(|out| match out.time_lock { - Timelock::None => None, - lock => Some(lock), - }) - .collect::>() - }) - .collect(), - rings: Rings::new(used_outs, tx_version)?, - decoy_info, - }) - } -} - /// A struct holding information about the inputs and their decoys. This data can vary by block so /// this data needs to be retrieved after every change in the blockchain. /// diff --git a/consensus/src/block.rs b/consensus/src/block.rs index fec2990..a17709c 100644 --- a/consensus/src/block.rs +++ b/consensus/src/block.rs @@ -1,6 +1,6 @@ //! Block Verifier Service. use std::{ - collections::HashSet, + collections::HashMap, future::Future, pin::Pin, sync::Arc, @@ -17,6 +17,7 @@ use cuprate_consensus_rules::{ miner_tx::MinerTxError, ConsensusError, HardFork, }; +use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; use crate::{ context::{BlockChainContextRequest, BlockChainContextResponse}, @@ -83,37 +84,12 @@ impl PrePreparedBlock { } } -/// Information about a verified block. -#[derive(Debug)] -pub struct VerifiedBlockInformation { - /// The block that has been verified. - pub block: Block, - /// The block's hard-fork vote. - pub hf_vote: HardFork, - /// The txs in this block. - pub txs: Arc<[Arc]>, - /// The blocks hash. - pub block_hash: [u8; 32], - /// the blocks POW hash. - pub pow_hash: [u8; 32], - /// The blocks height. - pub height: u64, - /// The amount of coins generated by this block. - pub generated_coins: u64, - /// This blocks wight. - pub weight: usize, - /// This blocks long term weight. - pub long_term_weight: usize, - /// The cumulative difficulty of the chain including this block. - pub cumulative_difficulty: u128, -} - /// A request to verify a block. pub enum VerifyBlockRequest { /// A request to verify a block. MainChain { block: Block, - prepared_txs: Arc<[Arc]>, + prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, }, } @@ -211,7 +187,7 @@ where /// Verifies a prepared block. async fn verify_main_chain_block( block: Block, - txs: Arc<[Arc]>, + mut txs: HashMap<[u8; 32], TransactionVerificationData>, context_svc: C, tx_verifier_svc: TxV, ) -> Result @@ -255,28 +231,23 @@ where // Check that the txs included are what we need and that there are not any extra. - let mut tx_hashes = txs.iter().map(|tx| &tx.tx_hash).collect::>(); + let mut ordered_txs = Vec::with_capacity(txs.len()); tracing::debug!("Checking we have correct transactions for block."); - if tx_hashes.len() != txs.len() { - return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); - } - for tx_hash in &prepped_block.block.txs { - if !tx_hashes.remove(tx_hash) { - return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); - } - } - if !tx_hashes.is_empty() { - return Err(ExtendedConsensusError::TxsIncludedWithBlockIncorrect); + let tx = txs + .remove(tx_hash) + .ok_or(ExtendedConsensusError::TxsIncludedWithBlockIncorrect)?; + ordered_txs.push(Arc::new(tx)); } + drop(txs); tracing::debug!("Verifying transactions for block."); tx_verifier_svc .oneshot(VerifyTxRequest::Prepped { - txs: txs.clone(), + txs: ordered_txs.clone(), current_chain_height: context.chain_height, top_hash: context.top_hash, time_for_time_lock: context.current_adjusted_timestamp_for_time_lock(), @@ -285,11 +256,11 @@ where .await?; let block_weight = - prepped_block.miner_tx_weight + txs.iter().map(|tx| tx.tx_weight).sum::(); - let total_fees = txs.iter().map(|tx| tx.fee).sum::(); + prepped_block.miner_tx_weight + ordered_txs.iter().map(|tx| tx.tx_weight).sum::(); + let total_fees = ordered_txs.iter().map(|tx| tx.fee).sum::(); tracing::debug!("Verifying block header."); - let (hf_vote, generated_coins) = check_block( + let (_, generated_coins) = check_block( &prepped_block.block, total_fees, block_weight, @@ -301,13 +272,30 @@ where Ok(VerifyBlockResponse::MainChain(VerifiedBlockInformation { block_hash: prepped_block.block_hash, block: prepped_block.block, - txs, + block_blob: prepped_block.block_blob, + txs: ordered_txs + .into_iter() + .map(|tx| { + // Note: it would be possible for the transaction verification service to hold onto the tx after the call + // if one of txs was invalid and the rest are still in rayon threads. + let tx = Arc::into_inner(tx).expect( + "Transaction verification service should not hold onto valid transactions.", + ); + + VerifiedTransactionInformation { + tx_blob: tx.tx_blob, + tx_weight: tx.tx_weight, + fee: tx.fee, + tx_hash: tx.tx_hash, + tx: tx.tx, + } + }) + .collect(), pow_hash: prepped_block.pow_hash, generated_coins, weight: block_weight, height: context.chain_height, long_term_weight: context.next_block_long_term_weight(block_weight), - hf_vote, cumulative_difficulty: context.cumulative_difficulty + context.next_difficulty, })) } diff --git a/consensus/src/context/difficulty.rs b/consensus/src/context/difficulty.rs index c7accc9..9c8321f 100644 --- a/consensus/src/context/difficulty.rs +++ b/consensus/src/context/difficulty.rs @@ -12,8 +12,9 @@ use tower::ServiceExt; use tracing::instrument; use cuprate_helper::num::median; +use cuprate_types::blockchain::{BCReadRequest, BCResponse}; -use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError, HardFork}; +use crate::{Database, ExtendedConsensusError, HardFork}; /// The amount of blocks we account for to calculate difficulty const DIFFICULTY_WINDOW: usize = 720; @@ -301,8 +302,8 @@ async fn get_blocks_in_pow_info( ) -> Result<(VecDeque, VecDeque), ExtendedConsensusError> { tracing::info!("Getting blocks timestamps"); - let DatabaseResponse::BlockExtendedHeaderInRange(ext_header) = database - .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights)) + let BCResponse::BlockExtendedHeaderInRange(ext_header) = database + .oneshot(BCReadRequest::BlockExtendedHeaderInRange(block_heights)) .await? else { panic!("Database sent incorrect response"); diff --git a/consensus/src/context/hardforks.rs b/consensus/src/context/hardforks.rs index ea5a72d..92182c7 100644 --- a/consensus/src/context/hardforks.rs +++ b/consensus/src/context/hardforks.rs @@ -4,8 +4,9 @@ use tower::ServiceExt; use tracing::instrument; use cuprate_consensus_rules::{HFVotes, HFsInfo, HardFork}; +use cuprate_types::blockchain::{BCReadRequest, BCResponse}; -use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError}; +use crate::{Database, ExtendedConsensusError}; /// The default amount of hard-fork votes to track to decide on activation of a hard-fork. /// @@ -86,16 +87,17 @@ impl HardForkState { debug_assert_eq!(votes.total_votes(), config.window) } - let DatabaseResponse::BlockExtendedHeader(ext_header) = database + let BCResponse::BlockExtendedHeader(ext_header) = database .ready() .await? - .call(DatabaseRequest::BlockExtendedHeader(chain_height - 1)) + .call(BCReadRequest::BlockExtendedHeader(chain_height - 1)) .await? else { panic!("Database sent incorrect response!"); }; - let current_hardfork = ext_header.version; + let current_hardfork = + HardFork::from_version(ext_header.version).expect("Stored block has invalid hardfork"); let mut hfs = HardForkState { config, @@ -165,15 +167,15 @@ async fn get_votes_in_range( ) -> Result { let mut votes = HFVotes::new(window_size); - let DatabaseResponse::BlockExtendedHeaderInRange(vote_list) = database - .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(block_heights)) + let BCResponse::BlockExtendedHeaderInRange(vote_list) = database + .oneshot(BCReadRequest::BlockExtendedHeaderInRange(block_heights)) .await? else { panic!("Database sent incorrect response!"); }; for hf_info in vote_list.into_iter() { - votes.add_vote_for_hf(&hf_info.vote); + votes.add_vote_for_hf(&HardFork::from_vote(hf_info.vote)); } Ok(votes) diff --git a/consensus/src/context/rx_vms.rs b/consensus/src/context/rx_vms.rs index d91c74e..87e1de6 100644 --- a/consensus/src/context/rx_vms.rs +++ b/consensus/src/context/rx_vms.rs @@ -20,8 +20,9 @@ use cuprate_consensus_rules::{ HardFork, }; use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_types::blockchain::{BCReadRequest, BCResponse}; -use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError}; +use crate::{Database, ExtendedConsensusError}; /// The amount of randomX VMs to keep in the cache. const RX_SEEDS_CACHED: usize = 2; @@ -225,10 +226,8 @@ async fn get_block_hashes( for height in heights { let db = database.clone(); fut.push_back(async move { - let DatabaseResponse::BlockHash(hash) = db - .clone() - .oneshot(DatabaseRequest::BlockHash(height)) - .await? + let BCResponse::BlockHash(hash) = + db.clone().oneshot(BCReadRequest::BlockHash(height)).await? else { panic!("Database sent incorrect response!"); }; diff --git a/consensus/src/context/task.rs b/consensus/src/context/task.rs index 39654be..90e1de6 100644 --- a/consensus/src/context/task.rs +++ b/consensus/src/context/task.rs @@ -9,13 +9,14 @@ use tower::ServiceExt; use tracing::Instrument; use cuprate_consensus_rules::blocks::ContextToVerifyBlock; +use cuprate_types::blockchain::{BCReadRequest, BCResponse}; use super::{ difficulty, hardforks, rx_vms, weight, BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, ContextConfig, RawBlockChainContext, ValidityToken, BLOCKCHAIN_TIMESTAMP_CHECK_WINDOW, }; -use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError}; +use crate::{Database, ExtendedConsensusError}; /// A request from the context service to the context task. pub(super) struct ContextTaskRequest { @@ -69,19 +70,19 @@ impl ContextTask { tracing::debug!("Initialising blockchain context"); - let DatabaseResponse::ChainHeight(chain_height, top_block_hash) = database + let BCResponse::ChainHeight(chain_height, top_block_hash) = database .ready() .await? - .call(DatabaseRequest::ChainHeight) + .call(BCReadRequest::ChainHeight) .await? else { panic!("Database sent incorrect response!"); }; - let DatabaseResponse::GeneratedCoins(already_generated_coins) = database + let BCResponse::GeneratedCoins(already_generated_coins) = database .ready() .await? - .call(DatabaseRequest::GeneratedCoins) + .call(BCReadRequest::GeneratedCoins) .await? else { panic!("Database sent incorrect response!"); diff --git a/consensus/src/context/weight.rs b/consensus/src/context/weight.rs index 9636ddb..2511c59 100644 --- a/consensus/src/context/weight.rs +++ b/consensus/src/context/weight.rs @@ -18,8 +18,9 @@ use tracing::instrument; use cuprate_consensus_rules::blocks::{penalty_free_zone, PENALTY_FREE_ZONE_5}; use cuprate_helper::{asynch::rayon_spawn_async, num::median}; +use cuprate_types::blockchain::{BCReadRequest, BCResponse}; -use crate::{Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError, HardFork}; +use crate::{Database, ExtendedConsensusError, HardFork}; /// The short term block weight window. const SHORT_TERM_WINDOW: u64 = 100; @@ -292,8 +293,8 @@ async fn get_blocks_weight_in_range( ) -> Result, ExtendedConsensusError> { tracing::info!("getting block weights."); - let DatabaseResponse::BlockExtendedHeaderInRange(ext_headers) = database - .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(range)) + let BCResponse::BlockExtendedHeaderInRange(ext_headers) = database + .oneshot(BCReadRequest::BlockExtendedHeaderInRange(range)) .await? else { panic!("Database sent incorrect response!") @@ -313,8 +314,8 @@ async fn get_long_term_weight_in_range( ) -> Result, ExtendedConsensusError> { tracing::info!("getting block long term weights."); - let DatabaseResponse::BlockExtendedHeaderInRange(ext_headers) = database - .oneshot(DatabaseRequest::BlockExtendedHeaderInRange(range)) + let BCResponse::BlockExtendedHeaderInRange(ext_headers) = database + .oneshot(BCReadRequest::BlockExtendedHeaderInRange(range)) .await? else { panic!("Database sent incorrect response!") diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index c91a780..5b38983 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -7,15 +7,10 @@ //! - [`TxVerifierService`] Which handles transaction verification. //! //! This crate is generic over the database which is implemented as a [`tower::Service`]. To -//! implement a database you need to have a service which accepts [`DatabaseRequest`] and responds -//! with [`DatabaseResponse`]. +//! implement a database you need to have a service which accepts [`BCReadRequest`] and responds +//! with [`BCResponse`]. //! -use std::{ - collections::{HashMap, HashSet}, - future::Future, -}; - -use cuprate_consensus_rules::{transactions::OutputOnChain, ConsensusError, HardFork}; +use cuprate_consensus_rules::{ConsensusError, HardFork}; mod batch_verifier; pub mod block; @@ -24,16 +19,16 @@ pub mod context; mod tests; pub mod transactions; -pub use block::{ - BlockVerifierService, PrePreparedBlock, VerifiedBlockInformation, VerifyBlockRequest, - VerifyBlockResponse, -}; +pub use block::{BlockVerifierService, VerifyBlockRequest, VerifyBlockResponse}; pub use context::{ initialize_blockchain_context, BlockChainContext, BlockChainContextRequest, BlockChainContextResponse, BlockChainContextService, ContextConfig, }; pub use transactions::{TxVerifierService, VerifyTxRequest, VerifyTxResponse}; +// re-export. +pub use cuprate_types::blockchain::{BCReadRequest, BCResponse}; + /// An Error returned from one of the consensus services. #[derive(Debug, thiserror::Error)] pub enum ExtendedConsensusError { @@ -80,115 +75,36 @@ where Ok((block_svc, tx_svc)) } -/// An internal trait used to represent a database so we don't have to write [`tower::Service`] bounds -/// everywhere. -pub trait Database: - tower::Service< - DatabaseRequest, - Response = DatabaseResponse, - Error = tower::BoxError, - Future = Self::Future2, -> -{ - type Future2: Future> + Send + 'static; -} +use __private::Database; -impl> - Database for T -where - T::Future: Future> + Send + 'static, -{ - type Future2 = T::Future; -} +pub mod __private { + use std::future::Future; -/// An extended block header. -#[derive(Debug, Copy, Clone)] -pub struct ExtendedBlockHeader { - /// The blocks major version. - pub version: HardFork, - /// The blocks vote. - pub vote: HardFork, + use cuprate_types::blockchain::{BCReadRequest, BCResponse}; - /// The blocks timestamp. - pub timestamp: u64, - /// The blocks cumulative difficulty. - pub cumulative_difficulty: u128, - - /// The blocks weight. - pub block_weight: usize, - /// The blocks long term weight. - pub long_term_weight: usize, -} - -/// A database request to the database [`tower::Service`] -#[derive(Debug, Clone)] -pub enum DatabaseRequest { - /// A block extended header request. - /// Must return: [`DatabaseResponse::BlockExtendedHeader`] - BlockExtendedHeader(u64), - /// A block hash request. - /// Must return: [`DatabaseResponse::BlockHash`] - BlockHash(u64), - - /// Removes the block hashes that are not in the _main_ chain. + /// A type alias trait used to represent a database, so we don't have to write [`tower::Service`] bounds + /// everywhere. /// - /// This should filter (remove) hashes in alt-blocks as well. - FilterUnknownHashes(HashSet<[u8; 32]>), + /// Automatically implemented for: + /// ```ignore + /// tower::Service + /// ``` + pub trait Database: + tower::Service< + BCReadRequest, + Response = BCResponse, + Error = tower::BoxError, + Future = Self::Future2, + > + { + type Future2: Future> + Send + 'static; + } - /// A request for multiple block extended headers. - /// Must return: [`DatabaseResponse::BlockExtendedHeaderInRange`] - BlockExtendedHeaderInRange(std::ops::Range), - - /// A request for the chains height. - /// Must return: [`DatabaseResponse::ChainHeight`] - ChainHeight, - /// A request for the total amount of generated coins. - /// Must return: [`DatabaseResponse::GeneratedCoins`] - GeneratedCoins, - - /// A request for transaction outputs, this contains a map of amounts to amount indexes. - /// Must return: [`DatabaseResponse::Outputs`] - Outputs(HashMap>), - /// A request for the number of outputs with these amounts. - /// Must return: [`DatabaseResponse::NumberOutputsWithAmount`] - NumberOutputsWithAmount(Vec), - - /// A request to check if these key images are in the database. - /// Must return: [`DatabaseResponse::KeyImagesSpent`] - KeyImagesSpent(HashSet<[u8; 32]>), -} - -#[derive(Debug)] -pub enum DatabaseResponse { - /// A block extended header response. - BlockExtendedHeader(ExtendedBlockHeader), - /// A block hash response. - BlockHash([u8; 32]), - - FilteredHashes(HashSet<[u8; 32]>), - - /// A batch block extended header response. - BlockExtendedHeaderInRange(Vec), - - /// A chain height response. - /// Should contains the chains height and top block hash. - ChainHeight(u64, [u8; 32]), - /// Generated coins response. - /// Should contain the total amount of coins emitted in all block rewards. - GeneratedCoins(u64), - - /// Outputs response. - /// Should contain a map of (amounts, amount_idx) -> Output. - /// If an outputs requested does not exist this should *not* be an error, the output - /// should just be omitted from the map. - Outputs(HashMap>), - /// Number of outputs response. - /// Should contain a map of amounts -> numb outs. - /// If there are no outputs with that amount then the numb outs should be zero, *no* amounts - /// requested should be omitted. - NumberOutputsWithAmount(HashMap), - - /// Key images spent response. - /// returns true if key images are spent - KeyImagesSpent(bool), + impl> + crate::Database for T + where + T::Future: Future> + Send + 'static, + { + type Future2 = T::Future; + } } diff --git a/consensus/src/tests/mock_db.rs b/consensus/src/tests/mock_db.rs index d6e86f3..d1c6255 100644 --- a/consensus/src/tests/mock_db.rs +++ b/consensus/src/tests/mock_db.rs @@ -15,7 +15,12 @@ use proptest::{ use proptest_derive::Arbitrary; use tower::{BoxError, Service}; -use crate::{DatabaseRequest, DatabaseResponse, ExtendedBlockHeader, HardFork}; +use cuprate_types::{ + blockchain::{BCReadRequest, BCResponse}, + ExtendedBlockHeader, +}; + +use crate::HardFork; prop_compose! { /// Generates an arbitrary full [`DummyDatabase`], it is not safe to do consensus checks on the returned database @@ -56,8 +61,8 @@ pub struct DummyBlockExtendedHeader { impl From for ExtendedBlockHeader { fn from(value: DummyBlockExtendedHeader) -> Self { ExtendedBlockHeader { - version: value.version.unwrap_or(HardFork::V1), - vote: value.vote.unwrap_or(HardFork::V1), + version: value.version.unwrap_or(HardFork::V1) as u8, + vote: value.vote.unwrap_or(HardFork::V1) as u8, timestamp: value.timestamp.unwrap_or_default(), cumulative_difficulty: value.cumulative_difficulty.unwrap_or_default(), block_weight: value.block_weight.unwrap_or_default(), @@ -122,8 +127,8 @@ pub struct DummyDatabase { dummy_height: Option, } -impl Service for DummyDatabase { - type Response = DatabaseResponse; +impl Service for DummyDatabase { + type Response = BCResponse; type Error = BoxError; type Future = Pin> + Send + 'static>>; @@ -132,13 +137,13 @@ impl Service for DummyDatabase { Poll::Ready(Ok(())) } - fn call(&mut self, req: DatabaseRequest) -> Self::Future { + fn call(&mut self, req: BCReadRequest) -> Self::Future { let blocks = self.blocks.clone(); let dummy_height = self.dummy_height; async move { Ok(match req { - DatabaseRequest::BlockExtendedHeader(id) => { + BCReadRequest::BlockExtendedHeader(id) => { let mut id = usize::try_from(id).unwrap(); if let Some(dummy_height) = dummy_height { let block_len = blocks.read().unwrap().len(); @@ -146,7 +151,7 @@ impl Service for DummyDatabase { id -= dummy_height - block_len; } - DatabaseResponse::BlockExtendedHeader( + BCResponse::BlockExtendedHeader( blocks .read() .unwrap() @@ -156,12 +161,12 @@ impl Service for DummyDatabase { .ok_or("block not in database!")?, ) } - DatabaseRequest::BlockHash(id) => { + BCReadRequest::BlockHash(id) => { let mut hash = [0; 32]; hash[0..8].copy_from_slice(&id.to_le_bytes()); - DatabaseResponse::BlockHash(hash) + BCResponse::BlockHash(hash) } - DatabaseRequest::BlockExtendedHeaderInRange(range) => { + BCReadRequest::BlockExtendedHeaderInRange(range) => { let mut end = usize::try_from(range.end).unwrap(); let mut start = usize::try_from(range.start).unwrap(); @@ -172,7 +177,7 @@ impl Service for DummyDatabase { start -= dummy_height - block_len; } - DatabaseResponse::BlockExtendedHeaderInRange( + BCResponse::BlockExtendedHeaderInRange( blocks .read() .unwrap() @@ -184,7 +189,7 @@ impl Service for DummyDatabase { .collect(), ) } - DatabaseRequest::ChainHeight => { + BCReadRequest::ChainHeight => { let height: u64 = dummy_height .unwrap_or(blocks.read().unwrap().len()) .try_into() @@ -193,9 +198,9 @@ impl Service for DummyDatabase { let mut top_hash = [0; 32]; top_hash[0..8].copy_from_slice(&height.to_le_bytes()); - DatabaseResponse::ChainHeight(height, top_hash) + BCResponse::ChainHeight(height, top_hash) } - DatabaseRequest::GeneratedCoins => DatabaseResponse::GeneratedCoins(0), + BCReadRequest::GeneratedCoins => BCResponse::GeneratedCoins(0), _ => unimplemented!("the context svc should not need these requests!"), }) } diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index cf1a990..eefe253 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -28,11 +28,12 @@ use cuprate_consensus_rules::{ ConsensusError, HardFork, TxVersion, }; use cuprate_helper::asynch::rayon_spawn_async; +use cuprate_types::blockchain::{BCReadRequest, BCResponse}; use crate::{ batch_verifier::MultiThreadedBatchVerifier, transactions::contextual_data::{batch_get_decoy_info, batch_get_ring_member_info}, - Database, DatabaseRequest, DatabaseResponse, ExtendedConsensusError, + Database, ExtendedConsensusError, }; pub mod contextual_data; @@ -129,8 +130,8 @@ pub enum VerifyTxRequest { /// Verifies a batch of prepared txs. Prepped { /// The transactions to verify. - // TODO: Can we use references to remove the outer `Arc`? probably wont play nicely with rayon_spawn_async though - txs: Arc<[Arc]>, + // TODO: Can we use references to remove the Vec? wont play nicely with Service though + txs: Vec>, /// The current chain height. current_chain_height: u64, /// The top block hash. @@ -159,7 +160,7 @@ pub enum VerifyTxRequest { /// A response from a verify transaction request. #[derive(Debug)] pub enum VerifyTxResponse { - OkPrepped(Arc<[Arc]>), + OkPrepped(Vec>), Ok, } @@ -227,7 +228,7 @@ where } => { verify_prepped_transactions( database, - txs, + &txs, current_chain_height, top_hash, time_for_time_lock, @@ -259,13 +260,13 @@ where let txs = rayon_spawn_async(|| { txs.into_par_iter() .map(|tx| TransactionVerificationData::new(tx).map(Arc::new)) - .collect::, _>>() + .collect::, _>>() }) .await?; verify_prepped_transactions( database, - txs.clone(), + &txs, current_chain_height, top_hash, time_for_time_lock, @@ -279,7 +280,7 @@ where #[instrument(name = "verify_txs", skip_all, fields(amt = txs.len()) level = "info")] async fn verify_prepped_transactions( mut database: D, - txs: Arc<[Arc]>, + txs: &[Arc], current_chain_height: u64, top_hash: [u8; 32], time_for_time_lock: u64, @@ -307,10 +308,10 @@ where }) })?; - let DatabaseResponse::KeyImagesSpent(kis_spent) = database + let BCResponse::KeyImagesSpent(kis_spent) = database .ready() .await? - .call(DatabaseRequest::KeyImagesSpent(spent_kis)) + .call(BCReadRequest::KeyImagesSpent(spent_kis)) .await? else { panic!("Database sent incorrect response!"); @@ -339,12 +340,10 @@ where if !verified_at_block_hashes.is_empty() { tracing::trace!("Filtering block hashes not in the main chain."); - let DatabaseResponse::FilteredHashes(known_hashes) = database + let BCResponse::FilterUnknownHashes(known_hashes) = database .ready() .await? - .call(DatabaseRequest::FilterUnknownHashes( - verified_at_block_hashes, - )) + .call(BCReadRequest::FilterUnknownHashes(verified_at_block_hashes)) .await? else { panic!("Database returned wrong response!"); @@ -378,7 +377,7 @@ where #[allow(clippy::type_complexity)] // I don't think the return is too complex fn transactions_needing_verification( - txs: Arc<[Arc]>, + txs: &[Arc], hashes_in_main_chain: HashSet<[u8; 32]>, current_hf: &HardFork, current_chain_height: u64, diff --git a/consensus/src/transactions/contextual_data.rs b/consensus/src/transactions/contextual_data.rs index 1cadca1..95e5262 100644 --- a/consensus/src/transactions/contextual_data.rs +++ b/consensus/src/transactions/contextual_data.rs @@ -15,21 +15,127 @@ use std::{ sync::Arc, }; -use monero_serai::transaction::Input; +use monero_serai::transaction::{Input, Timelock}; use tower::ServiceExt; use tracing::instrument; use cuprate_consensus_rules::{ transactions::{ - get_ring_members_for_inputs, insert_ring_member_ids, DecoyInfo, TxRingMembersInfo, + get_absolute_offsets, insert_ring_member_ids, DecoyInfo, Rings, TransactionError, + TxRingMembersInfo, }, - ConsensusError, HardFork, + ConsensusError, HardFork, TxVersion, +}; +use cuprate_types::{ + blockchain::{BCReadRequest, BCResponse}, + OutputOnChain, }; -use crate::{ - transactions::TransactionVerificationData, Database, DatabaseRequest, DatabaseResponse, - ExtendedConsensusError, -}; +use crate::{transactions::TransactionVerificationData, Database, ExtendedConsensusError}; + +/// Get the ring members for the inputs from the outputs on the chain. +/// +/// Will error if `outputs` does not contain the outputs needed. +fn get_ring_members_for_inputs( + get_outputs: impl Fn(u64, u64) -> Option, + inputs: &[Input], +) -> Result>, TransactionError> { + inputs + .iter() + .map(|inp| match inp { + Input::ToKey { + amount, + key_offsets, + .. + } => { + let offsets = get_absolute_offsets(key_offsets)?; + Ok(offsets + .iter() + .map(|offset| { + get_outputs(amount.unwrap_or(0), *offset) + .ok_or(TransactionError::RingMemberNotFoundOrInvalid) + }) + .collect::>()?) + } + _ => Err(TransactionError::IncorrectInputType), + }) + .collect::>() +} + +/// Construct a [`TxRingMembersInfo`] struct. +/// +/// The used outs must be all the ring members used in the transactions inputs. +pub fn new_ring_member_info( + used_outs: Vec>, + decoy_info: Option, + tx_version: TxVersion, +) -> Result { + Ok(TxRingMembersInfo { + youngest_used_out_height: used_outs + .iter() + .map(|inp_outs| { + inp_outs + .iter() + // the output with the highest height is the youngest + .map(|out| out.height) + .max() + .expect("Input must have ring members") + }) + .max() + .expect("Tx must have inputs"), + time_locked_outs: used_outs + .iter() + .flat_map(|inp_outs| { + inp_outs + .iter() + .filter_map(|out| match out.time_lock { + Timelock::None => None, + lock => Some(lock), + }) + .collect::>() + }) + .collect(), + rings: new_rings(used_outs, tx_version)?, + decoy_info, + }) +} + +/// Builds the [`Rings`] for the transaction inputs, from the given outputs. +fn new_rings( + outputs: Vec>, + tx_version: TxVersion, +) -> Result { + Ok(match tx_version { + TxVersion::RingSignatures => Rings::Legacy( + outputs + .into_iter() + .map(|inp_outs| { + inp_outs + .into_iter() + .map(|out| out.key.ok_or(TransactionError::RingMemberNotFoundOrInvalid)) + .collect::, TransactionError>>() + }) + .collect::, TransactionError>>()?, + ), + TxVersion::RingCT => Rings::RingCT( + outputs + .into_iter() + .map(|inp_outs| { + inp_outs + .into_iter() + .map(|out| { + Ok([ + out.key + .ok_or(TransactionError::RingMemberNotFoundOrInvalid)?, + out.commitment, + ]) + }) + .collect::>() + }) + .collect::>()?, + ), + }) +} /// Retrieves the [`TxRingMembersInfo`] for the inputted [`TransactionVerificationData`]. /// @@ -47,19 +153,19 @@ pub async fn batch_get_ring_member_info( .map_err(ConsensusError::Transaction)?; } - let DatabaseResponse::Outputs(outputs) = database + let BCResponse::Outputs(outputs) = database .ready() .await? - .call(DatabaseRequest::Outputs(output_ids)) + .call(BCReadRequest::Outputs(output_ids)) .await? else { panic!("Database sent incorrect response!") }; - let DatabaseResponse::NumberOutputsWithAmount(outputs_with_amount) = database + let BCResponse::NumberOutputsWithAmount(outputs_with_amount) = database .ready() .await? - .call(DatabaseRequest::NumberOutputsWithAmount( + .call(BCReadRequest::NumberOutputsWithAmount( outputs.keys().copied().collect(), )) .await? @@ -87,7 +193,7 @@ pub async fn batch_get_ring_member_info( None }; - TxRingMembersInfo::new(ring_members_for_tx, decoy_info, tx_v_data.version) + new_ring_member_info(ring_members_for_tx, decoy_info, tx_v_data.version) .map_err(ConsensusError::Transaction) }) .collect::>()?) @@ -128,10 +234,10 @@ pub async fn batch_get_decoy_info<'a, D: Database + Clone + Send + 'static>( unique_input_amounts.len() ); - let DatabaseResponse::NumberOutputsWithAmount(outputs_with_amount) = database + let BCResponse::NumberOutputsWithAmount(outputs_with_amount) = database .ready() .await? - .call(DatabaseRequest::NumberOutputsWithAmount( + .call(BCReadRequest::NumberOutputsWithAmount( unique_input_amounts.into_iter().collect(), )) .await? diff --git a/consensus/tests/verify_correct_txs.rs b/consensus/tests/verify_correct_txs.rs index 9640d27..b71b52d 100644 --- a/consensus/tests/verify_correct_txs.rs +++ b/consensus/tests/verify_correct_txs.rs @@ -9,23 +9,26 @@ use monero_serai::transaction::{Timelock, Transaction}; use tower::{service_fn, Service, ServiceExt}; use cuprate_consensus::{ - Database, DatabaseRequest, DatabaseResponse, TxVerifierService, VerifyTxRequest, - VerifyTxResponse, + TxVerifierService, VerifyTxRequest, VerifyTxResponse, __private::Database, +}; +use cuprate_types::{ + blockchain::{BCReadRequest, BCResponse}, + OutputOnChain, }; -use cuprate_consensus_rules::{transactions::OutputOnChain, HardFork}; +use cuprate_consensus_rules::HardFork; use cuprate_test_utils::data::TX_E2D393; fn dummy_database(outputs: BTreeMap) -> impl Database + Clone { let outputs = Arc::new(outputs); - service_fn(move |req: DatabaseRequest| { + service_fn(move |req: BCReadRequest| { ready(Ok(match req { - DatabaseRequest::NumberOutputsWithAmount(_) => { - DatabaseResponse::NumberOutputsWithAmount(HashMap::new()) + BCReadRequest::NumberOutputsWithAmount(_) => { + BCResponse::NumberOutputsWithAmount(HashMap::new()) } - DatabaseRequest::Outputs(outs) => { + BCReadRequest::Outputs(outs) => { let idxs = outs.get(&0).unwrap(); let mut ret = HashMap::new(); @@ -37,9 +40,9 @@ fn dummy_database(outputs: BTreeMap) -> impl Database + Clon .collect::>(), ); - DatabaseResponse::Outputs(ret) + BCResponse::Outputs(ret) } - DatabaseRequest::KeyImagesSpent(_) => DatabaseResponse::KeyImagesSpent(false), + BCReadRequest::KeyImagesSpent(_) => BCResponse::KeyImagesSpent(false), _ => panic!("Database request not needed for this test"), })) }) diff --git a/storage/cuprate-blockchain/Cargo.toml b/storage/cuprate-blockchain/Cargo.toml index bd61d59..e5b6bf6 100644 --- a/storage/cuprate-blockchain/Cargo.toml +++ b/storage/cuprate-blockchain/Cargo.toml @@ -26,7 +26,7 @@ cfg-if = { workspace = true } # We only need the `thread` feature if `service` is enabled. # Figure out how to enable features of an already pulled in dependency conditionally. cuprate-helper = { path = "../../helper", features = ["fs", "thread", "map"] } -cuprate-types = { path = "../../types", features = ["service"] } +cuprate-types = { path = "../../types", features = ["blockchain"] } curve25519-dalek = { workspace = true } monero-pruning = { path = "../../pruning" } monero-serai = { workspace = true, features = ["std"] } diff --git a/storage/cuprate-blockchain/src/service/mod.rs b/storage/cuprate-blockchain/src/service/mod.rs index a95276a..507f7fc 100644 --- a/storage/cuprate-blockchain/src/service/mod.rs +++ b/storage/cuprate-blockchain/src/service/mod.rs @@ -50,11 +50,11 @@ //! This channel can be `.await`ed upon to (eventually) receive //! the corresponding `Response` to your `Request`. //! -//! [req_r]: cuprate_types::service::ReadRequest +//! [req_r]: cuprate_types::blockchain::BCReadRequest //! -//! [req_w]: cuprate_types::service::WriteRequest +//! [req_w]: cuprate_types::blockchain::BCWriteRequest //! -//! [resp]: cuprate_types::service::Response +//! [resp]: cuprate_types::blockchain::BCResponse //! //! # Example //! Simple usage of `service`. @@ -63,7 +63,7 @@ //! use hex_literal::hex; //! use tower::{Service, ServiceExt}; //! -//! use cuprate_types::service::{ReadRequest, WriteRequest, Response}; +//! use cuprate_types::blockchain::{BCReadRequest, BCWriteRequest, BCResponse}; //! use cuprate_test_utils::data::block_v16_tx0; //! //! use cuprate_blockchain::{ConcreteEnv, config::ConfigBuilder, Env}; @@ -82,7 +82,7 @@ //! // Prepare a request to write block. //! let mut block = block_v16_tx0().clone(); //! # block.height = 0 as u64; // must be 0th height or panic in `add_block()` -//! let request = WriteRequest::WriteBlock(block); +//! let request = BCWriteRequest::WriteBlock(block); //! //! // Send the request. //! // We receive back an `async` channel that will @@ -92,16 +92,16 @@ //! //! // Block write was OK. //! let response = response_channel.await?; -//! assert_eq!(response, Response::WriteBlockOk); +//! assert_eq!(response, BCResponse::WriteBlockOk); //! //! // Now, let's try getting the block hash //! // of the block we just wrote. -//! let request = ReadRequest::BlockHash(0); +//! let request = BCReadRequest::BlockHash(0); //! let response_channel = read_handle.ready().await?.call(request); //! let response = response_channel.await?; //! assert_eq!( //! response, -//! Response::BlockHash( +//! BCResponse::BlockHash( //! hex!("43bd1f2b6556dcafa413d8372974af59e4e8f37dbf74dc6b2a9b7212d0577428") //! ) //! ); diff --git a/storage/cuprate-blockchain/src/service/read.rs b/storage/cuprate-blockchain/src/service/read.rs index e53c7f8..f8aafe8 100644 --- a/storage/cuprate-blockchain/src/service/read.rs +++ b/storage/cuprate-blockchain/src/service/read.rs @@ -15,13 +15,14 @@ use tokio_util::sync::PollSemaphore; use cuprate_helper::asynch::InfallibleOneshotReceiver; use cuprate_types::{ - service::{ReadRequest, Response}, + blockchain::{BCReadRequest, BCResponse}, ExtendedBlockHeader, OutputOnChain, }; use crate::{ config::ReaderThreads, error::RuntimeError, + ops::block::block_exists, ops::{ block::{get_block_extended_header_from_height, get_block_info}, blockchain::{cumulative_generated_coins, top_block_height}, @@ -30,6 +31,7 @@ use crate::{ }, service::types::{ResponseReceiver, ResponseResult, ResponseSender}, tables::{BlockHeights, BlockInfos, Tables}, + types::BlockHash, types::{Amount, AmountIndex, BlockHeight, KeyImage, PreRctOutputId}, ConcreteEnv, DatabaseRo, Env, EnvInner, }; @@ -40,9 +42,9 @@ use crate::{ /// This is cheaply [`Clone`]able handle that /// allows `async`hronously reading from the database. /// -/// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`ReadRequest`] +/// Calling [`tower::Service::call`] with a [`DatabaseReadHandle`] & [`BCReadRequest`] /// will return an `async`hronous channel that can be `.await`ed upon -/// to receive the corresponding [`Response`]. +/// to receive the corresponding [`BCResponse`]. pub struct DatabaseReadHandle { /// Handle to the custom `rayon` DB reader thread-pool. /// @@ -131,8 +133,8 @@ impl DatabaseReadHandle { } } -impl tower::Service for DatabaseReadHandle { - type Response = Response; +impl tower::Service for DatabaseReadHandle { + type Response = BCResponse; type Error = RuntimeError; type Future = ResponseReceiver; @@ -152,7 +154,7 @@ impl tower::Service for DatabaseReadHandle { } #[inline] - fn call(&mut self, request: ReadRequest) -> Self::Future { + fn call(&mut self, request: BCReadRequest) -> Self::Future { let permit = self .permit .take() @@ -189,25 +191,26 @@ impl tower::Service for DatabaseReadHandle { /// The basic structure is: /// 1. `Request` is mapped to a handler function /// 2. Handler function is called -/// 3. [`Response`] is sent +/// 3. [`BCResponse`] is sent fn map_request( env: &ConcreteEnv, // Access to the database - request: ReadRequest, // The request we must fulfill + request: BCReadRequest, // The request we must fulfill response_sender: ResponseSender, // The channel we must send the response back to ) { - use ReadRequest as R; + use BCReadRequest as R; /* SOMEDAY: pre-request handling, run some code for each request? */ let response = match request { R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block) => block_hash(env, block), + R::FilterUnknownHashes(hashes) => filter_unknown_hahses(env, hashes), R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(env, range), R::ChainHeight => chain_height(env), R::GeneratedCoins => generated_coins(env), R::Outputs(map) => outputs(env, map), R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec), - R::CheckKIsNotSpent(set) => check_k_is_not_spent(env, set), + R::KeyImagesSpent(set) => key_images_spent(env, set), }; if let Err(e) = response_sender.send(response) { @@ -286,7 +289,10 @@ macro_rules! get_tables { // FIXME: implement multi-transaction read atomicity. // . -/// [`ReadRequest::BlockExtendedHeader`]. +// TODO: The overhead of parallelism may be too much for every request, perfomace test to find optimal +// amount of parallelism. + +/// [`BCReadRequest::BlockExtendedHeader`]. #[inline] fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -294,12 +300,12 @@ fn block_extended_header(env: &ConcreteEnv, block_height: BlockHeight) -> Respon let tx_ro = env_inner.tx_ro()?; let tables = env_inner.open_tables(&tx_ro)?; - Ok(Response::BlockExtendedHeader( + Ok(BCResponse::BlockExtendedHeader( get_block_extended_header_from_height(&block_height, &tables)?, )) } -/// [`ReadRequest::BlockHash`]. +/// [`BCReadRequest::BlockHash`]. #[inline] fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -307,12 +313,40 @@ fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { let tx_ro = env_inner.tx_ro()?; let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; - Ok(Response::BlockHash( + Ok(BCResponse::BlockHash( get_block_info(&block_height, &table_block_infos)?.block_hash, )) } -/// [`ReadRequest::BlockExtendedHeaderInRange`]. +/// [`BCReadRequest::FilterUnknownHashes`]. +#[inline] +fn filter_unknown_hahses(env: &ConcreteEnv, mut hashes: HashSet) -> ResponseResult { + // Single-threaded, no `ThreadLocal` required. + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; + + let mut err = None; + + hashes.retain( + |block_hash| match block_exists(block_hash, &table_block_heights) { + Ok(exists) => exists, + Err(e) => { + err.get_or_insert(e); + false + } + }, + ); + + if let Some(e) = err { + Err(e) + } else { + Ok(BCResponse::FilterUnknownHashes(hashes)) + } +} + +/// [`BCReadRequest::BlockExtendedHeaderInRange`]. #[inline] fn block_extended_header_in_range( env: &ConcreteEnv, @@ -333,10 +367,10 @@ fn block_extended_header_in_range( }) .collect::, RuntimeError>>()?; - Ok(Response::BlockExtendedHeaderInRange(vec)) + Ok(BCResponse::BlockExtendedHeaderInRange(vec)) } -/// [`ReadRequest::ChainHeight`]. +/// [`BCReadRequest::ChainHeight`]. #[inline] fn chain_height(env: &ConcreteEnv) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -349,10 +383,10 @@ fn chain_height(env: &ConcreteEnv) -> ResponseResult { let block_hash = get_block_info(&chain_height.saturating_sub(1), &table_block_infos)?.block_hash; - Ok(Response::ChainHeight(chain_height, block_hash)) + Ok(BCResponse::ChainHeight(chain_height, block_hash)) } -/// [`ReadRequest::GeneratedCoins`]. +/// [`BCReadRequest::GeneratedCoins`]. #[inline] fn generated_coins(env: &ConcreteEnv) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. @@ -363,13 +397,13 @@ fn generated_coins(env: &ConcreteEnv) -> ResponseResult { let top_height = top_block_height(&table_block_heights)?; - Ok(Response::GeneratedCoins(cumulative_generated_coins( + Ok(BCResponse::GeneratedCoins(cumulative_generated_coins( &top_height, &table_block_infos, )?)) } -/// [`ReadRequest::Outputs`]. +/// [`BCReadRequest::Outputs`]. #[inline] fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. @@ -407,10 +441,10 @@ fn outputs(env: &ConcreteEnv, outputs: HashMap>) -> }) .collect::>, RuntimeError>>()?; - Ok(Response::Outputs(map)) + Ok(BCResponse::Outputs(map)) } -/// [`ReadRequest::NumberOutputsWithAmount`]. +/// [`BCReadRequest::NumberOutputsWithAmount`]. #[inline] fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. @@ -452,12 +486,12 @@ fn number_outputs_with_amount(env: &ConcreteEnv, amounts: Vec) -> Respon }) .collect::, RuntimeError>>()?; - Ok(Response::NumberOutputsWithAmount(map)) + Ok(BCResponse::NumberOutputsWithAmount(map)) } -/// [`ReadRequest::CheckKIsNotSpent`]. +/// [`BCReadRequest::KeyImagesSpent`]. #[inline] -fn check_k_is_not_spent(env: &ConcreteEnv, key_images: HashSet) -> ResponseResult { +fn key_images_spent(env: &ConcreteEnv, key_images: HashSet) -> ResponseResult { // Prepare tx/tables in `ThreadLocal`. let env_inner = env.env_inner(); let tx_ro = thread_local(env); @@ -486,8 +520,8 @@ fn check_k_is_not_spent(env: &ConcreteEnv, key_images: HashSet) -> Res // Else, `Ok(false)` will continue the iterator. .find_any(|result| !matches!(result, Ok(false))) { - None | Some(Ok(false)) => Ok(Response::CheckKIsNotSpent(true)), // Key image was NOT found. - Some(Ok(true)) => Ok(Response::CheckKIsNotSpent(false)), // Key image was found. + None | Some(Ok(false)) => Ok(BCResponse::KeyImagesSpent(false)), // Key image was NOT found. + Some(Ok(true)) => Ok(BCResponse::KeyImagesSpent(true)), // Key image was found. Some(Err(e)) => Err(e), // A database error occurred. } } diff --git a/storage/cuprate-blockchain/src/service/tests.rs b/storage/cuprate-blockchain/src/service/tests.rs index 77c10cd..1560dec 100644 --- a/storage/cuprate-blockchain/src/service/tests.rs +++ b/storage/cuprate-blockchain/src/service/tests.rs @@ -16,7 +16,7 @@ use tower::{Service, ServiceExt}; use cuprate_test_utils::data::{block_v16_tx0, block_v1_tx2, block_v9_tx3}; use cuprate_types::{ - service::{ReadRequest, Response, WriteRequest}, + blockchain::{BCReadRequest, BCResponse, BCWriteRequest}, OutputOnChain, VerifiedBlockInformation, }; @@ -81,10 +81,10 @@ async fn test_template( block.height = i as u64; // Request a block to be written, assert it was written. - let request = WriteRequest::WriteBlock(block); + let request = BCWriteRequest::WriteBlock(block); let response_channel = writer.call(request); let response = response_channel.await.unwrap(); - assert_eq!(response, Response::WriteBlockOk); + assert_eq!(response, BCResponse::WriteBlockOk); } //----------------------------------------------------------------------- Reset the transaction @@ -100,36 +100,36 @@ async fn test_template( // Next few lines are just for preparing the expected responses, // see further below for usage. - let extended_block_header_0 = Ok(Response::BlockExtendedHeader( + let extended_block_header_0 = Ok(BCResponse::BlockExtendedHeader( get_block_extended_header_from_height(&0, &tables).unwrap(), )); let extended_block_header_1 = if block_fns.len() > 1 { - Ok(Response::BlockExtendedHeader( + Ok(BCResponse::BlockExtendedHeader( get_block_extended_header_from_height(&1, &tables).unwrap(), )) } else { Err(RuntimeError::KeyNotFound) }; - let block_hash_0 = Ok(Response::BlockHash( + let block_hash_0 = Ok(BCResponse::BlockHash( get_block_info(&0, tables.block_infos()).unwrap().block_hash, )); let block_hash_1 = if block_fns.len() > 1 { - Ok(Response::BlockHash( + Ok(BCResponse::BlockHash( get_block_info(&1, tables.block_infos()).unwrap().block_hash, )) } else { Err(RuntimeError::KeyNotFound) }; - let range_0_1 = Ok(Response::BlockExtendedHeaderInRange(vec![ + let range_0_1 = Ok(BCResponse::BlockExtendedHeaderInRange(vec![ get_block_extended_header_from_height(&0, &tables).unwrap(), ])); let range_0_2 = if block_fns.len() >= 2 { - Ok(Response::BlockExtendedHeaderInRange(vec![ + Ok(BCResponse::BlockExtendedHeaderInRange(vec![ get_block_extended_header_from_height(&0, &tables).unwrap(), get_block_extended_header_from_height(&1, &tables).unwrap(), ])) @@ -140,10 +140,10 @@ async fn test_template( let chain_height = { let height = chain_height(tables.block_heights()).unwrap(); let block_info = get_block_info(&height.saturating_sub(1), tables.block_infos()).unwrap(); - Ok(Response::ChainHeight(height, block_info.block_hash)) + Ok(BCResponse::ChainHeight(height, block_info.block_hash)) }; - let cumulative_generated_coins = Ok(Response::GeneratedCoins(cumulative_generated_coins)); + let cumulative_generated_coins = Ok(BCResponse::GeneratedCoins(cumulative_generated_coins)); let num_req = tables .outputs_iter() @@ -153,7 +153,7 @@ async fn test_template( .map(|key| key.amount) .collect::>(); - let num_resp = Ok(Response::NumberOutputsWithAmount( + let num_resp = Ok(BCResponse::NumberOutputsWithAmount( num_req .iter() .map(|amount| match tables.num_outputs().get(amount) { @@ -168,21 +168,27 @@ async fn test_template( // Contains a fake non-spent key-image. let ki_req = HashSet::from([[0; 32]]); - let ki_resp = Ok(Response::CheckKIsNotSpent(true)); + let ki_resp = Ok(BCResponse::KeyImagesSpent(false)); //----------------------------------------------------------------------- Assert expected response // Assert read requests lead to the expected responses. for (request, expected_response) in [ - (ReadRequest::BlockExtendedHeader(0), extended_block_header_0), - (ReadRequest::BlockExtendedHeader(1), extended_block_header_1), - (ReadRequest::BlockHash(0), block_hash_0), - (ReadRequest::BlockHash(1), block_hash_1), - (ReadRequest::BlockExtendedHeaderInRange(0..1), range_0_1), - (ReadRequest::BlockExtendedHeaderInRange(0..2), range_0_2), - (ReadRequest::ChainHeight, chain_height), - (ReadRequest::GeneratedCoins, cumulative_generated_coins), - (ReadRequest::NumberOutputsWithAmount(num_req), num_resp), - (ReadRequest::CheckKIsNotSpent(ki_req), ki_resp), + ( + BCReadRequest::BlockExtendedHeader(0), + extended_block_header_0, + ), + ( + BCReadRequest::BlockExtendedHeader(1), + extended_block_header_1, + ), + (BCReadRequest::BlockHash(0), block_hash_0), + (BCReadRequest::BlockHash(1), block_hash_1), + (BCReadRequest::BlockExtendedHeaderInRange(0..1), range_0_1), + (BCReadRequest::BlockExtendedHeaderInRange(0..2), range_0_2), + (BCReadRequest::ChainHeight, chain_height), + (BCReadRequest::GeneratedCoins, cumulative_generated_coins), + (BCReadRequest::NumberOutputsWithAmount(num_req), num_resp), + (BCReadRequest::KeyImagesSpent(ki_req), ki_resp), ] { let response = reader.clone().oneshot(request).await; println!("response: {response:#?}, expected_response: {expected_response:#?}"); @@ -196,10 +202,10 @@ async fn test_template( // Assert each key image we inserted comes back as "spent". for key_image in tables.key_images_iter().keys().unwrap() { let key_image = key_image.unwrap(); - let request = ReadRequest::CheckKIsNotSpent(HashSet::from([key_image])); + let request = BCReadRequest::KeyImagesSpent(HashSet::from([key_image])); let response = reader.clone().oneshot(request).await; println!("response: {response:#?}, key_image: {key_image:#?}"); - assert_eq!(response.unwrap(), Response::CheckKIsNotSpent(false)); + assert_eq!(response.unwrap(), BCResponse::KeyImagesSpent(true)); } //----------------------------------------------------------------------- Output checks @@ -260,10 +266,10 @@ async fn test_template( .collect::>(); // Send a request for every output we inserted before. - let request = ReadRequest::Outputs(map.clone()); + let request = BCReadRequest::Outputs(map.clone()); let response = reader.clone().oneshot(request).await; println!("Response::Outputs response: {response:#?}"); - let Ok(Response::Outputs(response)) = response else { + let Ok(BCResponse::Outputs(response)) = response else { panic!("{response:#?}") }; diff --git a/storage/cuprate-blockchain/src/service/types.rs b/storage/cuprate-blockchain/src/service/types.rs index 265bf42..08bc6ac 100644 --- a/storage/cuprate-blockchain/src/service/types.rs +++ b/storage/cuprate-blockchain/src/service/types.rs @@ -6,15 +6,15 @@ use futures::channel::oneshot::Sender; use cuprate_helper::asynch::InfallibleOneshotReceiver; -use cuprate_types::service::Response; +use cuprate_types::blockchain::BCResponse; use crate::error::RuntimeError; //---------------------------------------------------------------------------------------------------- Types /// The actual type of the response. /// -/// Either our [`Response`], or a database error occurred. -pub(super) type ResponseResult = Result; +/// Either our [`BCResponse`], or a database error occurred. +pub(super) type ResponseResult = Result; /// The `Receiver` channel that receives the read response. /// diff --git a/storage/cuprate-blockchain/src/service/write.rs b/storage/cuprate-blockchain/src/service/write.rs index d6747e9..8c2cc91 100644 --- a/storage/cuprate-blockchain/src/service/write.rs +++ b/storage/cuprate-blockchain/src/service/write.rs @@ -10,7 +10,7 @@ use futures::channel::oneshot; use cuprate_helper::asynch::InfallibleOneshotReceiver; use cuprate_types::{ - service::{Response, WriteRequest}, + blockchain::{BCResponse, BCWriteRequest}, VerifiedBlockInformation, }; @@ -33,15 +33,15 @@ const WRITER_THREAD_NAME: &str = concat!(module_path!(), "::DatabaseWriter"); /// it is not [`Clone`]able as there is only ever 1 place within Cuprate /// that writes. /// -/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] & [`WriteRequest`] +/// Calling [`tower::Service::call`] with a [`DatabaseWriteHandle`] & [`BCWriteRequest`] /// will return an `async`hronous channel that can be `.await`ed upon -/// to receive the corresponding [`Response`]. +/// to receive the corresponding [`BCResponse`]. #[derive(Debug)] pub struct DatabaseWriteHandle { /// Sender channel to the database write thread-pool. /// /// We provide the response channel for the thread-pool. - pub(super) sender: crossbeam::channel::Sender<(WriteRequest, ResponseSender)>, + pub(super) sender: crossbeam::channel::Sender<(BCWriteRequest, ResponseSender)>, } impl DatabaseWriteHandle { @@ -65,8 +65,8 @@ impl DatabaseWriteHandle { } } -impl tower::Service for DatabaseWriteHandle { - type Response = Response; +impl tower::Service for DatabaseWriteHandle { + type Response = BCResponse; type Error = RuntimeError; type Future = ResponseReceiver; @@ -76,7 +76,7 @@ impl tower::Service for DatabaseWriteHandle { } #[inline] - fn call(&mut self, request: WriteRequest) -> Self::Future { + fn call(&mut self, request: BCWriteRequest) -> Self::Future { // Response channel we `.await` on. let (response_sender, receiver) = oneshot::channel(); @@ -95,7 +95,7 @@ pub(super) struct DatabaseWriter { /// Any caller can send some requests to this channel. /// They send them alongside another `Response` channel, /// which we will eventually send to. - receiver: crossbeam::channel::Receiver<(WriteRequest, ResponseSender)>, + receiver: crossbeam::channel::Receiver<(BCWriteRequest, ResponseSender)>, /// Access to the database. env: Arc, @@ -153,7 +153,7 @@ impl DatabaseWriter { // FIXME: will there be more than 1 write request? // this won't have to be an enum. let response = match &request { - WriteRequest::WriteBlock(block) => write_block(&self.env, block), + BCWriteRequest::WriteBlock(block) => write_block(&self.env, block), }; // If the database needs to resize, do so. @@ -218,7 +218,7 @@ impl DatabaseWriter { // Each function will return the [`Response`] that we // should send back to the caller in [`map_request()`]. -/// [`WriteRequest::WriteBlock`]. +/// [`BCWriteRequest::WriteBlock`]. #[inline] fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseResult { let env_inner = env.env_inner(); @@ -232,7 +232,7 @@ fn write_block(env: &ConcreteEnv, block: &VerifiedBlockInformation) -> ResponseR match result { Ok(()) => { TxRw::commit(tx_rw)?; - Ok(Response::WriteBlockOk) + Ok(BCResponse::WriteBlockOk) } Err(e) => { // INVARIANT: ensure database atomicity by aborting diff --git a/test-utils/src/data/free.rs b/test-utils/src/data/free.rs index c2c2c82..e80bdda 100644 --- a/test-utils/src/data/free.rs +++ b/test-utils/src/data/free.rs @@ -6,13 +6,13 @@ )] //---------------------------------------------------------------------------------------------------- Import -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; use hex_literal::hex; use monero_serai::{block::Block, transaction::Transaction}; use cuprate_helper::map::combine_low_high_bits_to_u128; -use cuprate_types::{TransactionVerificationData, VerifiedBlockInformation}; +use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; use crate::data::constants::{ BLOCK_43BD1F, BLOCK_5ECB7E, BLOCK_F91043, TX_2180A8, TX_3BC7FF, TX_84D48D, TX_9E3F73, @@ -20,14 +20,14 @@ use crate::data::constants::{ }; //---------------------------------------------------------------------------------------------------- Conversion -/// Converts `monero_serai`'s `Block` into a -/// `cuprate_types::VerifiedBlockInformation` (superset). +/// Converts [`monero_serai::Block`] into a +/// [`VerifiedBlockInformation`] (superset). /// /// To prevent pulling other code in order to actually calculate things /// (e.g. `pow_hash`), some information must be provided statically, /// this struct represents that data that must be provided. /// -/// Consider using `cuprate_test_utils::rpc` to get this data easily. +/// Consider using [`cuprate_test_utils::rpc`] to get this data easily. struct VerifiedBlockMap { block_blob: &'static [u8], pow_hash: [u8; 32], @@ -43,7 +43,7 @@ struct VerifiedBlockMap { } impl VerifiedBlockMap { - /// Turn the various static data bits in `self` into a `VerifiedBlockInformation`. + /// Turn the various static data bits in `self` into a [`VerifiedBlockInformation`]. /// /// Transactions are verified that they at least match the block's, /// although the correctness of data (whether this block actually existed or not) @@ -64,11 +64,7 @@ impl VerifiedBlockMap { let block_blob = block_blob.to_vec(); let block = Block::read(&mut block_blob.as_slice()).unwrap(); - let txs: Vec> = txs - .iter() - .map(to_tx_verification_data) - .map(Arc::new) - .collect(); + let txs = txs.iter().map(to_tx_verification_data).collect::>(); assert_eq!( txs.len(), @@ -101,11 +97,11 @@ impl VerifiedBlockMap { } } -// Same as [`VerifiedBlockMap`] but for [`TransactionVerificationData`]. -fn to_tx_verification_data(tx_blob: impl AsRef<[u8]>) -> TransactionVerificationData { +// Same as [`VerifiedBlockMap`] but for [`VerifiedTransactionInformation`]. +fn to_tx_verification_data(tx_blob: impl AsRef<[u8]>) -> VerifiedTransactionInformation { let tx_blob = tx_blob.as_ref().to_vec(); let tx = Transaction::read(&mut tx_blob.as_slice()).unwrap(); - TransactionVerificationData { + VerifiedTransactionInformation { tx_weight: tx.weight(), fee: tx.rct_signatures.base.fee, tx_hash: tx.hash(), @@ -239,7 +235,7 @@ verified_block_information_fn! { //---------------------------------------------------------------------------------------------------- Transactions /// Generate a transaction accessor function with this signature: -/// `fn() -> &'static TransactionVerificationData` +/// `fn() -> &'static VerifiedTransactionInformation` /// /// Same as [`verified_block_information_fn`] but for transactions. macro_rules! transaction_verification_data_fn { @@ -249,7 +245,7 @@ macro_rules! transaction_verification_data_fn { weight: $weight:literal, // Transaction weight hash: $hash:literal, // Transaction hash as a string literal ) => { - #[doc = concat!("Return [`", stringify!($tx_blob), "`] as a [`TransactionVerificationData`].")] + #[doc = concat!("Return [`", stringify!($tx_blob), "`] as a [`VerifiedTransactionInformation`].")] /// /// ```rust #[doc = "# use cuprate_test_utils::data::*;"] @@ -261,8 +257,8 @@ macro_rules! transaction_verification_data_fn { #[doc = concat!("assert_eq!(tx.tx_hash, hex!(\"", $hash, "\"));")] #[doc = "assert_eq!(tx.fee, tx.tx.rct_signatures.base.fee);"] /// ``` - pub fn $fn_name() -> &'static TransactionVerificationData { - static TX: OnceLock = OnceLock::new(); + pub fn $fn_name() -> &'static VerifiedTransactionInformation { + static TX: OnceLock = OnceLock::new(); TX.get_or_init(|| to_tx_verification_data($tx_blob)) } }; @@ -319,8 +315,8 @@ mod tests { let mut txs = [block_v1_tx2(), block_v9_tx3(), block_v16_tx0()] .into_iter() - .flat_map(|block| block.txs.iter().map(|arc| (**arc).clone())) - .collect::>(); + .flat_map(|block| block.txs.iter().cloned()) + .collect::>(); txs.extend([ tx_v1_sig0().clone(), @@ -333,7 +329,7 @@ mod tests { let tx_rpc = rpc .get_transaction_verification_data(&[tx.tx_hash]) .await - .collect::>() + .collect::>() .pop() .unwrap(); assert_eq!(tx, tx_rpc); diff --git a/test-utils/src/data/mod.rs b/test-utils/src/data/mod.rs index 03c4524..49ea89a 100644 --- a/test-utils/src/data/mod.rs +++ b/test-utils/src/data/mod.rs @@ -19,10 +19,10 @@ //! The free functions provide access to typed data found in `cuprate_types`: //! ```rust //! # use cuprate_test_utils::data::*; -//! use cuprate_types::{VerifiedBlockInformation, TransactionVerificationData}; +//! use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; //! //! let block: VerifiedBlockInformation = block_v16_tx0().clone(); -//! let tx: TransactionVerificationData = tx_v1_sig0().clone(); +//! let tx: VerifiedTransactionInformation = tx_v1_sig0().clone(); //! ``` mod constants; diff --git a/test-utils/src/rpc/client.rs b/test-utils/src/rpc/client.rs index 34d194c..22ae11f 100644 --- a/test-utils/src/rpc/client.rs +++ b/test-utils/src/rpc/client.rs @@ -1,8 +1,6 @@ //! HTTP RPC client. //---------------------------------------------------------------------------------------------------- Use -use std::sync::Arc; - use serde::Deserialize; use serde_json::json; use tokio::task::spawn_blocking; @@ -12,7 +10,7 @@ use monero_serai::{ rpc::{HttpRpc, Rpc}, }; -use cuprate_types::{TransactionVerificationData, VerifiedBlockInformation}; +use cuprate_types::{VerifiedBlockInformation, VerifiedTransactionInformation}; use crate::rpc::constants::LOCALHOST_RPC_URL; @@ -110,10 +108,9 @@ impl HttpRpcClient { .await .unwrap(); - let txs: Vec> = self + let txs: Vec = self .get_transaction_verification_data(&block.txs) .await - .map(Arc::new) .collect(); let block_header = result.block_header; @@ -152,7 +149,7 @@ impl HttpRpcClient { } } - /// Request data and map the response to a [`TransactionVerificationData`]. + /// Request data and map the response to a [`VerifiedTransactionInformation`]. /// /// # Panics /// This function will panic at any error point, e.g., @@ -160,7 +157,7 @@ impl HttpRpcClient { pub async fn get_transaction_verification_data<'a>( &self, tx_hashes: &'a [[u8; 32]], - ) -> impl Iterator + 'a { + ) -> impl Iterator + 'a { self.rpc .get_transactions(tx_hashes) .await @@ -170,7 +167,7 @@ impl HttpRpcClient { .map(|(i, tx)| { let tx_hash = tx.hash(); assert_eq!(tx_hash, tx_hashes[i]); - TransactionVerificationData { + VerifiedTransactionInformation { tx_blob: tx.serialize(), tx_weight: tx.weight(), tx_hash, diff --git a/types/Cargo.toml b/types/Cargo.toml index 8e69e8f..7f6b8f8 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -9,14 +9,11 @@ repository = "https://github.com/Cuprate/cuprate/tree/main/types" keywords = ["cuprate", "types"] [features] -default = ["service"] -service = [] +default = ["blockchain"] +blockchain = [] [dependencies] -borsh = { workspace = true, optional = true } -cfg-if = { workspace = true } curve25519-dalek = { workspace = true } monero-serai = { workspace = true } -serde = { workspace = true, optional = true } [dev-dependencies] \ No newline at end of file diff --git a/types/src/service.rs b/types/src/blockchain.rs similarity index 72% rename from types/src/service.rs rename to types/src/blockchain.rs index 078c846..42390f9 100644 --- a/types/src/service.rs +++ b/types/src/blockchain.rs @@ -1,10 +1,7 @@ -//! Database [`ReadRequest`]s, [`WriteRequest`]s, and [`Response`]s. -//! -//! See [`cuprate_database`](https://github.com/Cuprate/cuprate/blob/00c3692eac6b2669e74cfd8c9b41c7e704c779ad/database/src/service/mod.rs#L1-L59)'s -//! `service` module for more usage/documentation. +//! Database [`BCReadRequest`]s, [`BCWriteRequest`]s, and [`BCResponse`]s. //! //! Tests that assert particular requests lead to particular -//! responses are also tested in `cuprate_database`. +//! responses are also tested in Cuprate's blockchain database crate. //---------------------------------------------------------------------------------------------------- Import use std::{ @@ -20,18 +17,16 @@ use serde::{Deserialize, Serialize}; use crate::types::{ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation}; //---------------------------------------------------------------------------------------------------- ReadRequest -/// A read request to the database. +/// A read request to the blockchain database. /// -/// This pairs with [`Response`], where each variant here -/// matches in name with a `Response` variant. For example, -/// the proper response for a [`ReadRequest::BlockHash`] -/// would be a [`Response::BlockHash`]. +/// This pairs with [`BCResponse`], where each variant here +/// matches in name with a [`BCResponse`] variant. For example, +/// the proper response for a [`BCReadRequest::BlockHash`] +/// would be a [`BCResponse::BlockHash`]. /// /// See `Response` for the expected responses per `Request`. #[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] -pub enum ReadRequest { +pub enum BCReadRequest { /// Request a block's extended header. /// /// The input is the block's height. @@ -42,6 +37,11 @@ pub enum ReadRequest { /// The input is the block's height. BlockHash(u64), + /// Removes the block hashes that are not in the _main_ chain. + /// + /// This should filter (remove) hashes in alt-blocks as well. + FilterUnknownHashes(HashSet<[u8; 32]>), + /// Request a range of block extended headers. /// /// The input is a range of block heights. @@ -86,18 +86,17 @@ pub enum ReadRequest { /// Check that all key images within a set arer not spent. /// /// Input is a set of key images. - CheckKIsNotSpent(HashSet<[u8; 32]>), + KeyImagesSpent(HashSet<[u8; 32]>), } //---------------------------------------------------------------------------------------------------- WriteRequest -/// A write request to the database. +/// A write request to the blockchain database. /// /// There is currently only 1 write request to the database, -/// as such, the only valid [`Response`] to this request is -/// the proper response for a [`Response::WriteBlockOk`]. +/// as such, the only valid [`BCResponse`] to this request is +/// the proper response for a [`BCResponse::WriteBlockOk`]. #[derive(Debug, Clone, PartialEq, Eq)] -// #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] -pub enum WriteRequest { +pub enum BCWriteRequest { /// Request that a block be written to the database. /// /// Input is an already verified block. @@ -109,60 +108,64 @@ pub enum WriteRequest { /// /// These are the data types returned when using sending a `Request`. /// -/// This pairs with [`ReadRequest`] and [`WriteRequest`], +/// This pairs with [`BCReadRequest`] and [`BCWriteRequest`], /// see those two for more info. #[derive(Debug, Clone, PartialEq, Eq)] -// #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] -pub enum Response { +pub enum BCResponse { //------------------------------------------------------ Reads - /// Response to [`ReadRequest::BlockExtendedHeader`]. + /// Response to [`BCReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. BlockExtendedHeader(ExtendedBlockHeader), - /// Response to [`ReadRequest::BlockHash`]. + /// Response to [`BCReadRequest::BlockHash`]. /// /// Inner value is the hash of the requested block. BlockHash([u8; 32]), - /// Response to [`ReadRequest::BlockExtendedHeaderInRange`]. + /// Response to [`BCReadRequest::FilterUnknownHashes`]. + /// + /// Inner value is the list of hashes that were in the main chain. + FilterUnknownHashes(HashSet<[u8; 32]>), + + /// Response to [`BCReadRequest::BlockExtendedHeaderInRange`]. /// /// Inner value is the list of extended header(s) of the requested block(s). BlockExtendedHeaderInRange(Vec), - /// Response to [`ReadRequest::ChainHeight`]. + /// Response to [`BCReadRequest::ChainHeight`]. /// /// Inner value is the chain height, and the top block's hash. ChainHeight(u64, [u8; 32]), - /// Response to [`ReadRequest::GeneratedCoins`]. + /// Response to [`BCReadRequest::GeneratedCoins`]. /// /// Inner value is the total amount of generated coins so far, in atomic units. GeneratedCoins(u64), - /// Response to [`ReadRequest::Outputs`]. + /// Response to [`BCReadRequest::Outputs`]. /// /// Inner value is all the outputs requested, /// associated with their amount and amount index. Outputs(HashMap>), - /// Response to [`ReadRequest::NumberOutputsWithAmount`]. + /// Response to [`BCReadRequest::NumberOutputsWithAmount`]. /// /// Inner value is a `HashMap` of all the outputs requested where: /// - Key = output amount /// - Value = count of outputs with the same amount NumberOutputsWithAmount(HashMap), - /// Response to [`ReadRequest::CheckKIsNotSpent`]. + /// Response to [`BCReadRequest::KeyImagesSpent`]. /// /// The inner value is `true` if _any_ of the key images - /// were spent (exited in the database already). + /// were spent (existed in the database already). /// /// The inner value is `false` if _none_ of the key images were spent. - CheckKIsNotSpent(bool), + KeyImagesSpent(bool), //------------------------------------------------------ Writes - /// Response to [`WriteRequest::WriteBlock`]. + /// Response to [`BCWriteRequest::WriteBlock`]. /// /// This response indicates that the requested block has /// successfully been written to the database without error. diff --git a/types/src/lib.rs b/types/src/lib.rs index 8c07790..2d161f7 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -3,8 +3,8 @@ //! This crate is a kitchen-sink for data types that are shared across `Cuprate`. //! //! # Features flags -//! The `service` module, containing `cuprate_database` request/response -//! types, must be enabled with the `service` feature (on by default). +//! The [`blockchain`] module, containing the blockchain database request/response +//! types, must be enabled with the `blockchain` feature (on by default). //---------------------------------------------------------------------------------------------------- Lints // Forbid lints. @@ -88,14 +88,11 @@ mod types; pub use types::{ - ExtendedBlockHeader, OutputOnChain, TransactionVerificationData, VerifiedBlockInformation, + ExtendedBlockHeader, OutputOnChain, VerifiedBlockInformation, VerifiedTransactionInformation, }; //---------------------------------------------------------------------------------------------------- Feature-gated -cfg_if::cfg_if! { - if #[cfg(feature = "service")] { - pub mod service; - } -} +#[cfg(feature = "blockchain")] +pub mod blockchain; //---------------------------------------------------------------------------------------------------- Private diff --git a/types/src/types.rs b/types/src/types.rs index aeef453..76ffd57 100644 --- a/types/src/types.rs +++ b/types/src/types.rs @@ -1,28 +1,17 @@ //! Various shared data types in Cuprate. //---------------------------------------------------------------------------------------------------- Import -use std::sync::Arc; - use curve25519_dalek::edwards::EdwardsPoint; use monero_serai::{ block::Block, transaction::{Timelock, Transaction}, }; -#[cfg(feature = "borsh")] -use borsh::{BorshDeserialize, BorshSerialize}; -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - //---------------------------------------------------------------------------------------------------- ExtendedBlockHeader /// Extended header data of a block. /// /// This contains various metadata of a block, but not the block blob itself. -/// -/// For more definitions, see also: . #[derive(Copy, Clone, Default, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] pub struct ExtendedBlockHeader { /// The block's major version. /// @@ -46,15 +35,12 @@ pub struct ExtendedBlockHeader { pub long_term_weight: usize, } -//---------------------------------------------------------------------------------------------------- TransactionVerificationData -/// Data needed to verify a transaction. +//---------------------------------------------------------------------------------------------------- VerifiedTransactionInformation +/// Verified information of a transaction. /// -/// This represents data that allows verification of a transaction, -/// although it doesn't mean it _has_ been verified. +/// This represents a transaction in a valid block. #[derive(Clone, Debug, PartialEq, Eq)] -// #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] // FIXME: monero_serai -// #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] -pub struct TransactionVerificationData { +pub struct VerifiedTransactionInformation { /// The transaction itself. pub tx: Transaction, /// The serialized byte form of [`Self::tx`]. @@ -77,11 +63,7 @@ pub struct TransactionVerificationData { /// Verified information of a block. /// /// This represents a block that has already been verified to be correct. -/// -/// For more definitions, see also: . #[derive(Clone, Debug, PartialEq, Eq)] -// #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] // FIXME: monero_serai -// #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] pub struct VerifiedBlockInformation { /// The block itself. pub block: Block, @@ -90,7 +72,7 @@ pub struct VerifiedBlockInformation { /// [`Block::serialize`]. pub block_blob: Vec, /// All the transactions in the block, excluding the [`Block::miner_tx`]. - pub txs: Vec>, + pub txs: Vec, /// The block's hash. /// /// [`Block::hash`]. @@ -111,9 +93,7 @@ pub struct VerifiedBlockInformation { //---------------------------------------------------------------------------------------------------- OutputOnChain /// An already existing transaction output. -#[derive(Clone, Debug, PartialEq, Eq)] -// #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] // FIXME: monero_serai -// #[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct OutputOnChain { /// The block height this output belongs to. pub height: u64, From 6df67bb9d3e30a3f4adfe143a234d40c4c369c53 Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Wed, 5 Jun 2024 10:35:08 -0400 Subject: [PATCH 3/4] rpc: add initial crates (#146) * add `rpc/` crates * rpc: fix `monero-rpc-server` -> `monero-rpc-types` * rpc: add skeleton `rpc/json-rpc` * remove `cuprate-rpc-server`, add `json-rpc` --- Cargo.lock | 12 ++++++++++++ Cargo.toml | 3 +++ rpc/README.md | 5 ++++- rpc/cuprate-rpc-interface/Cargo.toml | 15 +++++++++++++++ rpc/cuprate-rpc-interface/src/lib.rs | 1 + rpc/json-rpc/Cargo.toml | 15 +++++++++++++++ rpc/json-rpc/src/lib.rs | 1 + rpc/monero-rpc-types/Cargo.toml | 15 +++++++++++++++ rpc/monero-rpc-types/src/lib.rs | 1 + 9 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 rpc/cuprate-rpc-interface/Cargo.toml create mode 100644 rpc/cuprate-rpc-interface/src/lib.rs create mode 100644 rpc/json-rpc/Cargo.toml create mode 100644 rpc/json-rpc/src/lib.rs create mode 100644 rpc/monero-rpc-types/Cargo.toml create mode 100644 rpc/monero-rpc-types/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b02865d..0e507c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,6 +527,10 @@ dependencies = [ "tracing", ] +[[package]] +name = "cuprate-rpc-interface" +version = "0.0.0" + [[package]] name = "cuprate-test-utils" version = "0.1.0" @@ -1198,6 +1202,10 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-rpc" +version = "0.0.0" + [[package]] name = "keccak" version = "0.1.5" @@ -1392,6 +1400,10 @@ dependencies = [ "thiserror", ] +[[package]] +name = "monero-rpc-types" +version = "0.0.0" + [[package]] name = "monero-serai" version = "0.1.4-alpha" diff --git a/Cargo.toml b/Cargo.toml index 4b9bd63..630f14d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,9 @@ members = [ "pruning", "test-utils", "types", + "rpc/json-rpc", + "rpc/monero-rpc-types", + "rpc/cuprate-rpc-interface", ] [profile.release] diff --git a/rpc/README.md b/rpc/README.md index 4640904..a93f2e0 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -1 +1,4 @@ -# TODO +# RPC +This directory contains Monero RPC types and Cuprate RPC's libraries. + + diff --git a/rpc/cuprate-rpc-interface/Cargo.toml b/rpc/cuprate-rpc-interface/Cargo.toml new file mode 100644 index 0000000..47af5cd --- /dev/null +++ b/rpc/cuprate-rpc-interface/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "cuprate-rpc-interface" +version = "0.0.0" +edition = "2021" +description = "Cuprate's RPC interface library" +license = "MIT" +authors = ["hinto-janai"] +repository = "https://github.com/Cuprate/cuprate/tree/main/rpc/cuprate-rpc-interface" +keywords = ["cuprate", "rpc", "interface"] + +[features] + +[dependencies] + +[dev-dependencies] diff --git a/rpc/cuprate-rpc-interface/src/lib.rs b/rpc/cuprate-rpc-interface/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/rpc/cuprate-rpc-interface/src/lib.rs @@ -0,0 +1 @@ + diff --git a/rpc/json-rpc/Cargo.toml b/rpc/json-rpc/Cargo.toml new file mode 100644 index 0000000..2425cc6 --- /dev/null +++ b/rpc/json-rpc/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "json-rpc" +version = "0.0.0" +edition = "2021" +description = "JSON-RPC 2.0 implementation" +license = "MIT" +authors = ["hinto-janai"] +repository = "https://github.com/Cuprate/cuprate/tree/main/rpc/json-rpc" +keywords = ["json", "rpc"] + +[features] + +[dependencies] + +[dev-dependencies] diff --git a/rpc/json-rpc/src/lib.rs b/rpc/json-rpc/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/rpc/json-rpc/src/lib.rs @@ -0,0 +1 @@ + diff --git a/rpc/monero-rpc-types/Cargo.toml b/rpc/monero-rpc-types/Cargo.toml new file mode 100644 index 0000000..a32eedb --- /dev/null +++ b/rpc/monero-rpc-types/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "monero-rpc-types" +version = "0.0.0" +edition = "2021" +description = "Monero RPC types" +license = "MIT" +authors = ["hinto-janai"] +repository = "https://github.com/Cuprate/cuprate/tree/main/rpc/monero-rpc-types" +keywords = ["monero", "rpc", "types"] + +[features] + +[dependencies] + +[dev-dependencies] diff --git a/rpc/monero-rpc-types/src/lib.rs b/rpc/monero-rpc-types/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/rpc/monero-rpc-types/src/lib.rs @@ -0,0 +1 @@ + From 07f61bdb9c365c5f51e3fd0254c5ca1b6a186c38 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Fri, 7 Jun 2024 12:34:53 +0000 Subject: [PATCH 4/4] Consensus: fix panic in batch verifier (#152) * fix panic in batch verifier * docs * review comments * Update consensus/rules/src/batch_verifier.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- Cargo.lock | 61 ++++++++++++++++++++- consensus/rules/src/batch_verifier.rs | 29 ++++++++++ consensus/rules/src/lib.rs | 1 + consensus/rules/src/transactions.rs | 6 +- consensus/rules/src/transactions/ring_ct.rs | 11 ++-- consensus/src/batch_verifier.rs | 29 +++++----- consensus/src/transactions.rs | 27 ++++----- 7 files changed, 125 insertions(+), 39 deletions(-) create mode 100644 consensus/rules/src/batch_verifier.rs diff --git a/Cargo.lock b/Cargo.lock index 0e507c4..b89874d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,16 @@ dependencies = [ "libc", ] +[[package]] +name = "async-buffer" +version = "0.1.0" +dependencies = [ + "futures", + "pin-project", + "thiserror", + "tokio", +] + [[package]] name = "async-lock" version = "3.3.0" @@ -525,6 +535,7 @@ dependencies = [ "tokio-util", "tower", "tracing", + "tracing-subscriber", ] [[package]] @@ -613,7 +624,7 @@ dependencies = [ ] [[package]] -name = "dandelion_tower" +name = "dandelion-tower" version = "0.1.0" dependencies = [ "futures", @@ -1463,6 +1474,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-traits" version = "0.2.18" @@ -1510,6 +1531,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.6.0" @@ -2136,6 +2163,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2476,6 +2512,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", ] [[package]] @@ -2484,7 +2532,12 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", "tracing-core", + "tracing-log", ] [[package]] @@ -2543,6 +2596,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.4" diff --git a/consensus/rules/src/batch_verifier.rs b/consensus/rules/src/batch_verifier.rs new file mode 100644 index 0000000..c8d3f10 --- /dev/null +++ b/consensus/rules/src/batch_verifier.rs @@ -0,0 +1,29 @@ +use multiexp::BatchVerifier as InternalBatchVerifier; + +/// This trait represents a batch verifier. +/// +/// A batch verifier is used to speed up verification by verifying multiple transactions together. +/// +/// Not all proofs can be batched and at its core it's intended to verify a series of statements are +/// each equivalent to zero. +pub trait BatchVerifier { + /// Queue a statement for batch verification. + /// + /// # Panics + /// This function may panic if `stmt` contains calls to `rayon`'s parallel iterators, e.g. `par_iter()`. + // TODO: remove the panics by adding a generic API upstream. + fn queue_statement( + &mut self, + stmt: impl FnOnce(&mut InternalBatchVerifier<(), dalek_ff_group::EdwardsPoint>) -> R, + ) -> R; +} + +// impl this for a single threaded batch verifier. +impl BatchVerifier for &'_ mut InternalBatchVerifier<(), dalek_ff_group::EdwardsPoint> { + fn queue_statement( + &mut self, + stmt: impl FnOnce(&mut InternalBatchVerifier<(), dalek_ff_group::EdwardsPoint>) -> R, + ) -> R { + stmt(self) + } +} diff --git a/consensus/rules/src/lib.rs b/consensus/rules/src/lib.rs index 4beeec9..3106cbb 100644 --- a/consensus/rules/src/lib.rs +++ b/consensus/rules/src/lib.rs @@ -1,5 +1,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; +pub mod batch_verifier; pub mod blocks; mod decomposed_amount; pub mod genesis; diff --git a/consensus/rules/src/transactions.rs b/consensus/rules/src/transactions.rs index df7c6eb..9169708 100644 --- a/consensus/rules/src/transactions.rs +++ b/consensus/rules/src/transactions.rs @@ -3,10 +3,10 @@ use std::cmp::Ordering; use monero_serai::ringct::RctType; use monero_serai::transaction::{Input, Output, Timelock, Transaction}; -use multiexp::BatchVerifier; use crate::{ - blocks::penalty_free_zone, check_point_canonically_encoded, is_decomposed_amount, HardFork, + batch_verifier::BatchVerifier, blocks::penalty_free_zone, check_point_canonically_encoded, + is_decomposed_amount, HardFork, }; mod contextual_data; @@ -606,7 +606,7 @@ pub fn check_transaction_semantic( tx_weight: usize, tx_hash: &[u8; 32], hf: &HardFork, - verifier: &mut BatchVerifier<(), dalek_ff_group::EdwardsPoint>, + verifier: impl BatchVerifier, ) -> Result { // if tx_blob_size > MAX_TX_BLOB_SIZE diff --git a/consensus/rules/src/transactions/ring_ct.rs b/consensus/rules/src/transactions/ring_ct.rs index 8b64b02..38b56eb 100644 --- a/consensus/rules/src/transactions/ring_ct.rs +++ b/consensus/rules/src/transactions/ring_ct.rs @@ -9,12 +9,11 @@ use monero_serai::{ transaction::{Input, Transaction}, H, }; -use multiexp::BatchVerifier; use rand::thread_rng; #[cfg(feature = "rayon")] use rayon::prelude::*; -use crate::{transactions::Rings, try_par_iter, HardFork}; +use crate::{batch_verifier::BatchVerifier, transactions::Rings, try_par_iter, HardFork}; /// This constant contains the IDs of 2 transactions that should be allowed after the fork the ringCT /// type they used should be banned. @@ -91,7 +90,7 @@ fn simple_type_balances(rct_sig: &RctSignatures) -> Result<(), RingCTError> { /// fn check_output_range_proofs( rct_sig: &RctSignatures, - verifier: &mut BatchVerifier<(), dalek_ff_group::EdwardsPoint>, + mut verifier: impl BatchVerifier, ) -> Result<(), RingCTError> { let commitments = &rct_sig.base.commitments; @@ -109,7 +108,9 @@ fn check_output_range_proofs( }), RctPrunable::MlsagBulletproofs { bulletproofs, .. } | RctPrunable::Clsag { bulletproofs, .. } => { - if bulletproofs.batch_verify(&mut thread_rng(), verifier, (), commitments) { + if verifier.queue_statement(|verifier| { + bulletproofs.batch_verify(&mut thread_rng(), verifier, (), commitments) + }) { Ok(()) } else { Err(RingCTError::BulletproofsRangeInvalid) @@ -121,7 +122,7 @@ fn check_output_range_proofs( pub(crate) fn ring_ct_semantic_checks( tx: &Transaction, tx_hash: &[u8; 32], - verifier: &mut BatchVerifier<(), dalek_ff_group::EdwardsPoint>, + verifier: impl BatchVerifier, hf: &HardFork, ) -> Result<(), RingCTError> { let rct_type = tx.rct_signatures.rct_type(); diff --git a/consensus/src/batch_verifier.rs b/consensus/src/batch_verifier.rs index 877f164..44493a6 100644 --- a/consensus/src/batch_verifier.rs +++ b/consensus/src/batch_verifier.rs @@ -4,8 +4,6 @@ use multiexp::BatchVerifier as InternalBatchVerifier; use rayon::prelude::*; use thread_local::ThreadLocal; -use crate::ConsensusError; - /// A multithreaded batch verifier. pub struct MultiThreadedBatchVerifier { internal: ThreadLocal>>, @@ -19,19 +17,6 @@ impl MultiThreadedBatchVerifier { } } - pub fn queue_statement( - &self, - stmt: impl FnOnce( - &mut InternalBatchVerifier<(), dalek_ff_group::EdwardsPoint>, - ) -> Result, - ) -> Result { - let verifier_cell = self - .internal - .get_or(|| RefCell::new(InternalBatchVerifier::new(8))); - // TODO: this is not ok as a rayon par_iter could be called in stmt. - stmt(verifier_cell.borrow_mut().deref_mut()) - } - pub fn verify(self) -> bool { self.internal .into_iter() @@ -41,3 +26,17 @@ impl MultiThreadedBatchVerifier { .is_none() } } + +impl cuprate_consensus_rules::batch_verifier::BatchVerifier for &'_ MultiThreadedBatchVerifier { + fn queue_statement( + &mut self, + stmt: impl FnOnce(&mut InternalBatchVerifier<(), dalek_ff_group::EdwardsPoint>) -> R, + ) -> R { + let mut verifier = self + .internal + .get_or(|| RefCell::new(InternalBatchVerifier::new(32))) + .borrow_mut(); + + stmt(verifier.deref_mut()) + } +} diff --git a/consensus/src/transactions.rs b/consensus/src/transactions.rs index eefe253..417eb48 100644 --- a/consensus/src/transactions.rs +++ b/consensus/src/transactions.rs @@ -484,26 +484,23 @@ where batch_get_ring_member_info(txs.iter().map(|(tx, _)| tx), &hf, database).await?; rayon_spawn_async(move || { - let batch_veriifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads()); + let batch_verifier = MultiThreadedBatchVerifier::new(rayon::current_num_threads()); txs.par_iter() .zip(txs_ring_member_info.par_iter()) .try_for_each(|((tx, verification_needed), ring)| { // do semantic validation if needed. if *verification_needed == VerificationNeeded::SemanticAndContextual { - batch_veriifier.queue_statement(|verifier| { - let fee = check_transaction_semantic( - &tx.tx, - tx.tx_blob.len(), - tx.tx_weight, - &tx.tx_hash, - &hf, - verifier, - )?; - // make sure monero-serai calculated the same fee. - assert_eq!(fee, tx.fee); - Ok(()) - })?; + let fee = check_transaction_semantic( + &tx.tx, + tx.tx_blob.len(), + tx.tx_weight, + &tx.tx_hash, + &hf, + &batch_verifier, + )?; + // make sure monero-serai calculated the same fee. + assert_eq!(fee, tx.fee); } // Both variants of `VerificationNeeded` require contextual validation. @@ -518,7 +515,7 @@ where Ok::<_, ConsensusError>(()) })?; - if !batch_veriifier.verify() { + if !batch_verifier.verify() { return Err(ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid); }