From f91be58a7f0dbf92908d8f0283a5a2fb30da6fad Mon Sep 17 00:00:00 2001 From: Boog900 Date: Mon, 1 Jul 2024 19:23:40 +0000 Subject: [PATCH 1/4] P2P: Fix freeze in D++ (#204) * Fix d++ router freeze * update docs * fix imports * fix clippy * Update p2p/dandelion-tower/src/lib.rs Co-authored-by: hinto-janai --------- Co-authored-by: hinto-janai --- p2p/dandelion-tower/Cargo.toml | 2 +- p2p/dandelion-tower/src/lib.rs | 8 +- p2p/dandelion-tower/src/router.rs | 106 +++++++++++++++------------ p2p/dandelion-tower/src/tests/mod.rs | 52 +++++++------ 4 files changed, 94 insertions(+), 74 deletions(-) diff --git a/p2p/dandelion-tower/Cargo.toml b/p2p/dandelion-tower/Cargo.toml index 5e2fec53..976dad60 100644 --- a/p2p/dandelion-tower/Cargo.toml +++ b/p2p/dandelion-tower/Cargo.toml @@ -10,7 +10,7 @@ default = ["txpool"] txpool = ["dep:rand_distr", "dep:tokio-util", "dep:tokio"] [dependencies] -tower = { workspace = true, features = ["discover", "util"] } +tower = { workspace = true, features = ["util"] } tracing = { workspace = true, features = ["std"] } futures = { workspace = true, features = ["std"] } diff --git a/p2p/dandelion-tower/src/lib.rs b/p2p/dandelion-tower/src/lib.rs index f162724f..aa622f30 100644 --- a/p2p/dandelion-tower/src/lib.rs +++ b/p2p/dandelion-tower/src/lib.rs @@ -26,9 +26,9 @@ //! The diffuse service should have a request of [`DiffuseRequest`](traits::DiffuseRequest) and it's error //! should be [`tower::BoxError`]. //! -//! ## Outbound Peer Discoverer +//! ## Outbound Peer TryStream //! -//! The outbound peer [`Discover`](tower::discover::Discover) should provide a stream of randomly selected outbound +//! The outbound peer [`TryStream`](futures::TryStream) should provide a stream of randomly selected outbound //! peers, these peers will then be used to route stem txs to. //! //! The peers will not be returned anywhere, so it is recommended to wrap them in some sort of drop guard that returns @@ -37,10 +37,10 @@ //! ## Peer Service //! //! This service represents a connection to an individual peer, this should be returned from the Outbound Peer -//! Discover. This should immediately send the transaction to the peer when requested, i.e. it should _not_ set +//! TryStream. This should immediately send the transaction to the peer when requested, it should _not_ set //! a timer. //! -//! The diffuse service should have a request of [`StemRequest`](traits::StemRequest) and it's error +//! The peer service should have a request of [`StemRequest`](traits::StemRequest) and its error //! should be [`tower::BoxError`]. //! //! ## Backing Pool diff --git a/p2p/dandelion-tower/src/router.rs b/p2p/dandelion-tower/src/router.rs index 61e962c3..a64819a7 100644 --- a/p2p/dandelion-tower/src/router.rs +++ b/p2p/dandelion-tower/src/router.rs @@ -6,11 +6,10 @@ //! ### What The Router Does Not Do //! //! It does not handle anything to do with keeping transactions long term, i.e. embargo timers and handling -//! loops in the stem. It is up to implementers to do this if they decide not top use [`DandelionPool`](crate::pool::DandelionPool) +//! loops in the stem. It is up to implementers to do this if they decide not to use [`DandelionPool`](crate::pool::DandelionPool) //! use std::{ collections::HashMap, - future::Future, hash::Hash, marker::PhantomData, pin::Pin, @@ -18,12 +17,9 @@ use std::{ time::Instant, }; -use futures::TryFutureExt; +use futures::{future::BoxFuture, FutureExt, TryFutureExt, TryStream}; use rand::{distributions::Bernoulli, prelude::*, thread_rng}; -use tower::{ - discover::{Change, Discover}, - Service, -}; +use tower::Service; use crate::{ traits::{DiffuseRequest, StemRequest}, @@ -39,14 +35,22 @@ pub enum DandelionRouterError { /// The broadcast service returned an error. #[error("Broadcast service returned an err: {0}.")] BroadcastError(tower::BoxError), - /// The outbound peer discoverer returned an error, this is critical. - #[error("The outbound peer discoverer returned an err: {0}.")] - OutboundPeerDiscoverError(tower::BoxError), + /// The outbound peer stream returned an error, this is critical. + #[error("The outbound peer stream returned an err: {0}.")] + OutboundPeerStreamError(tower::BoxError), /// The outbound peer discoverer returned [`None`]. #[error("The outbound peer discoverer exited.")] OutboundPeerDiscoverExited, } +/// A response from an attempt to retrieve an outbound peer. +pub enum OutboundPeer { + /// A peer. + Peer(ID, T), + /// The peer store is exhausted and has no more to return. + Exhausted, +} + /// The dandelion++ state. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum State { @@ -116,9 +120,11 @@ pub struct DandelionRouter { impl DandelionRouter where ID: Hash + Eq + Clone, - P: Discover, + P: TryStream, Error = tower::BoxError>, B: Service, Error = tower::BoxError>, + B::Future: Send + 'static, S: Service, Error = tower::BoxError>, + S::Future: Send + 'static, { /// Creates a new [`DandelionRouter`], with the provided services and config. /// @@ -165,15 +171,16 @@ where match ready!(self .outbound_peer_discover .as_mut() - .poll_discover(cx) - .map_err(DandelionRouterError::OutboundPeerDiscoverError)) + .try_poll_next(cx) + .map_err(DandelionRouterError::OutboundPeerStreamError)) .ok_or(DandelionRouterError::OutboundPeerDiscoverExited)?? { - Change::Insert(key, svc) => { + OutboundPeer::Peer(key, svc) => { self.stem_peers.insert(key, svc); } - Change::Remove(key) => { - self.stem_peers.remove(&key); + OutboundPeer::Exhausted => { + tracing::warn!("Failed to retrieve enough outbound peers for optimal dandelion++, privacy may be degraded."); + return Poll::Ready(Ok(())); } } } @@ -181,11 +188,24 @@ where Poll::Ready(Ok(())) } - fn fluff_tx(&mut self, tx: Tx) -> B::Future { - self.broadcast_svc.call(DiffuseRequest(tx)) + fn fluff_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result> { + self.broadcast_svc + .call(DiffuseRequest(tx)) + .map_ok(|_| State::Fluff) + .map_err(DandelionRouterError::BroadcastError) + .boxed() } - fn stem_tx(&mut self, tx: Tx, from: ID) -> S::Future { + fn stem_tx( + &mut self, + tx: Tx, + from: ID, + ) -> BoxFuture<'static, Result> { + if self.stem_peers.is_empty() { + tracing::debug!("Stem peers are empty, fluffing stem transaction."); + return self.fluff_tx(tx); + } + loop { let stem_route = self.stem_routes.entry(from.clone()).or_insert_with(|| { self.stem_peers @@ -201,11 +221,20 @@ where continue; }; - return peer.call(StemRequest(tx)); + return peer + .call(StemRequest(tx)) + .map_ok(|_| State::Stem) + .map_err(DandelionRouterError::PeerError) + .boxed(); } } - fn stem_local_tx(&mut self, tx: Tx) -> S::Future { + fn stem_local_tx(&mut self, tx: Tx) -> BoxFuture<'static, Result> { + if self.stem_peers.is_empty() { + tracing::warn!("Stem peers are empty, no outbound connections to stem local tx to, fluffing instead, privacy will be degraded."); + return self.fluff_tx(tx); + } + loop { let stem_route = self.local_route.get_or_insert_with(|| { self.stem_peers @@ -221,7 +250,11 @@ where continue; }; - return peer.call(StemRequest(tx)); + return peer + .call(StemRequest(tx)) + .map_ok(|_| State::Stem) + .map_err(DandelionRouterError::PeerError) + .boxed(); } } } @@ -238,7 +271,7 @@ S: The Peer service - handles routing messages to a single node. impl Service> for DandelionRouter where ID: Hash + Eq + Clone, - P: Discover, + P: TryStream, Error = tower::BoxError>, B: Service, Error = tower::BoxError>, B::Future: Send + 'static, S: Service, Error = tower::BoxError>, @@ -246,8 +279,7 @@ where { type Response = State; type Error = DandelionRouterError; - type Future = - Pin> + Send + 'static>>; + type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.epoch_start.elapsed() > self.config.epoch_duration { @@ -309,39 +341,23 @@ where tracing::trace!(parent: &self.span, "Handling route request."); match req.state { - TxState::Fluff => Box::pin( - self.fluff_tx(req.tx) - .map_ok(|_| State::Fluff) - .map_err(DandelionRouterError::BroadcastError), - ), + TxState::Fluff => self.fluff_tx(req.tx), TxState::Stem { from } => match self.current_state { State::Fluff => { tracing::debug!(parent: &self.span, "Fluffing stem tx."); - Box::pin( - self.fluff_tx(req.tx) - .map_ok(|_| State::Fluff) - .map_err(DandelionRouterError::BroadcastError), - ) + self.fluff_tx(req.tx) } State::Stem => { tracing::trace!(parent: &self.span, "Steming transaction"); - Box::pin( - self.stem_tx(req.tx, from) - .map_ok(|_| State::Stem) - .map_err(DandelionRouterError::PeerError), - ) + self.stem_tx(req.tx, from) } }, TxState::Local => { tracing::debug!(parent: &self.span, "Steming local tx."); - Box::pin( - self.stem_local_tx(req.tx) - .map_ok(|_| State::Stem) - .map_err(DandelionRouterError::PeerError), - ) + self.stem_local_tx(req.tx) } } } diff --git a/p2p/dandelion-tower/src/tests/mod.rs b/p2p/dandelion-tower/src/tests/mod.rs index 1f3ba3e8..d868a991 100644 --- a/p2p/dandelion-tower/src/tests/mod.rs +++ b/p2p/dandelion-tower/src/tests/mod.rs @@ -3,44 +3,48 @@ mod router; use std::{collections::HashMap, future::Future, hash::Hash, sync::Arc}; -use futures::TryStreamExt; +use futures::{Stream, StreamExt, TryStreamExt}; use tokio::sync::mpsc::{self, UnboundedReceiver}; -use tower::{ - discover::{Discover, ServiceList}, - util::service_fn, - Service, ServiceExt, -}; +use tower::{util::service_fn, Service, ServiceExt}; use crate::{ traits::{TxStoreRequest, TxStoreResponse}, - State, + OutboundPeer, State, }; pub fn mock_discover_svc() -> ( - impl Discover< - Key = usize, - Service = impl Service< - Req, - Future = impl Future> + Send + 'static, - Error = tower::BoxError, - > + Send - + 'static, - Error = tower::BoxError, + impl Stream< + Item = Result< + OutboundPeer< + usize, + impl Service< + Req, + Future = impl Future> + Send + 'static, + Error = tower::BoxError, + > + Send + + 'static, + >, + tower::BoxError, + >, >, - UnboundedReceiver<(u64, Req)>, + UnboundedReceiver<(usize, Req)>, ) { let (tx, rx) = mpsc::unbounded_channel(); - let discover = ServiceList::new((0..).map(move |i| { - let tx_2 = tx.clone(); + let discover = futures::stream::iter(0_usize..1_000_000) + .map(move |i| { + let tx_2 = tx.clone(); - service_fn(move |req| { - tx_2.send((i, req)).unwrap(); + Ok::<_, tower::BoxError>(OutboundPeer::Peer( + i, + service_fn(move |req| { + tx_2.send((i, req)).unwrap(); - async move { Ok::<(), tower::BoxError>(()) } + async move { Ok::<(), tower::BoxError>(()) } + }), + )) }) - })) - .map_err(Into::into); + .map_err(Into::into); (discover, rx) } From fb1f071faf57edb85d7bf637d533ee9ea029a3b7 Mon Sep 17 00:00:00 2001 From: Boog900 Date: Mon, 1 Jul 2024 19:23:59 +0000 Subject: [PATCH 2/4] P2P: fix block downloader test (#205) make sure timeout is non-zero --- p2p/p2p/src/block_downloader/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs index 07c30d5d..5d4225cd 100644 --- a/p2p/p2p/src/block_downloader/tests.rs +++ b/p2p/p2p/src/block_downloader/tests.rs @@ -95,7 +95,7 @@ prop_compose! { fn dummy_transaction_stragtegy(height: u64) ( extra in vec(any::(), 0..1_000), - timelock in 0_usize..50_000_000, + timelock in 1_usize..50_000_000, ) -> Transaction { Transaction { From 6ce177aeca26f4b4a5e077a998d03b0f3e9d961c Mon Sep 17 00:00:00 2001 From: hinto-janai Date: Mon, 1 Jul 2024 15:24:48 -0400 Subject: [PATCH 3/4] storage: add key sorting (#198) * database: modify `trait Key`, don't blanket impl * heed: create `KeyHeed` wrapper type * fix backend/tests * blockchain: `impl Key PreRctOutputId` * database: `StorableStr`, docs, tests * key: docs, cleanup * fixes * heed: simplify types * storable: remove doc * heed: use `INTEGER_KEY` instead of custom compare fn * add docs, tests * database: document `create_db` invariant * key: `Lexicographic` -> `Default` * redb: fix `clear_db` behavior * fix docs --- storage/blockchain/src/open_tables.rs | 5 +- storage/blockchain/src/types.rs | 4 +- storage/database/src/backend/heed/env.rs | 60 ++++-- storage/database/src/backend/heed/storable.rs | 55 +++++- storage/database/src/backend/heed/types.rs | 3 + storage/database/src/backend/redb/env.rs | 9 +- storage/database/src/backend/redb/storable.rs | 19 +- storage/database/src/backend/tests.rs | 52 +++-- storage/database/src/env.rs | 46 +++-- storage/database/src/key.rs | 177 +++++++++++++++--- storage/database/src/lib.rs | 4 +- storage/database/src/storable.rs | 65 ++++++- storage/database/src/tests.rs | 2 +- 13 files changed, 413 insertions(+), 88 deletions(-) diff --git a/storage/blockchain/src/open_tables.rs b/storage/blockchain/src/open_tables.rs index b98b86b1..4b265e8a 100644 --- a/storage/blockchain/src/open_tables.rs +++ b/storage/blockchain/src/open_tables.rs @@ -169,8 +169,9 @@ mod test { env_inner.open_tables(&tx_ro).unwrap(); } - /// Tests that directory [`cuprate_database::ConcreteEnv`] - /// usage does NOT create all tables. + /// Tests that direct usage of + /// [`cuprate_database::ConcreteEnv`] + /// does NOT create all tables. #[test] #[should_panic(expected = "`Result::unwrap()` on an `Err` value: TableNotFound")] fn test_no_tables_are_created() { diff --git a/storage/blockchain/src/types.rs b/storage/blockchain/src/types.rs index f9319442..a1f28f05 100644 --- a/storage/blockchain/src/types.rs +++ b/storage/blockchain/src/types.rs @@ -46,7 +46,7 @@ use bytemuck::{Pod, Zeroable}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use cuprate_database::StorableVec; +use cuprate_database::{Key, StorableVec}; //---------------------------------------------------------------------------------------------------- Aliases // These type aliases exist as many Monero-related types are the exact same. @@ -143,6 +143,8 @@ pub struct PreRctOutputId { pub amount_index: AmountIndex, } +impl Key for PreRctOutputId {} + //---------------------------------------------------------------------------------------------------- BlockInfoV3 /// Block information. /// diff --git a/storage/database/src/backend/heed/env.rs b/storage/database/src/backend/heed/env.rs index 14f9777d..69e3b175 100644 --- a/storage/database/src/backend/heed/env.rs +++ b/storage/database/src/backend/heed/env.rs @@ -7,26 +7,23 @@ use std::{ sync::{RwLock, RwLockReadGuard}, }; -use heed::{EnvFlags, EnvOpenOptions}; +use heed::{DatabaseFlags, EnvFlags, EnvOpenOptions}; use crate::{ backend::heed::{ database::{HeedTableRo, HeedTableRw}, + storable::StorableHeed, types::HeedDb, }, config::{Config, SyncMode}, database::{DatabaseIter, DatabaseRo, DatabaseRw}, env::{Env, EnvInner}, error::{InitError, RuntimeError}, + key::{Key, KeyCompare}, resize::ResizeAlgorithm, table::Table, }; -//---------------------------------------------------------------------------------------------------- Consts -/// Panic message when there's a table missing. -const PANIC_MSG_MISSING_TABLE: &str = - "cuprate_database::Env should uphold the invariant that all tables are already created"; - //---------------------------------------------------------------------------------------------------- ConcreteEnv /// A strongly typed, concrete database environment, backed by `heed`. pub struct ConcreteEnv { @@ -268,6 +265,10 @@ where tx_ro: &heed::RoTxn<'env>, ) -> Result + DatabaseIter, RuntimeError> { // Open up a read-only database using our table's const metadata. + // + // INVARIANT: LMDB caches the ordering / comparison function from [`EnvInner::create_db`], + // and we're relying on that since we aren't setting that here. + // Ok(HeedTableRo { db: self .open_database(tx_ro, Some(T::NAME))? @@ -282,6 +283,10 @@ where tx_rw: &RefCell>, ) -> Result, RuntimeError> { // Open up a read/write database using our table's const metadata. + // + // INVARIANT: LMDB caches the ordering / comparison function from [`EnvInner::create_db`], + // and we're relying on that since we aren't setting that here. + // Ok(HeedTableRw { db: self.create_database(&mut tx_rw.borrow_mut(), Some(T::NAME))?, tx_rw, @@ -289,8 +294,33 @@ where } fn create_db(&self, tx_rw: &RefCell>) -> Result<(), RuntimeError> { - // INVARIANT: `heed` creates tables with `open_database` if they don't exist. - self.open_db_rw::(tx_rw)?; + // Create a database using our: + // - [`Table`]'s const metadata. + // - (potentially) our [`Key`] comparison function + let mut tx_rw = tx_rw.borrow_mut(); + let mut db = self.database_options(); + db.name(T::NAME); + + // Set the key comparison behavior. + match ::KEY_COMPARE { + // Use LMDB's default comparison function. + KeyCompare::Default => { + db.create(&mut tx_rw)?; + } + + // Instead of setting a custom [`heed::Comparator`], + // use this LMDB flag; it is ~10% faster. + KeyCompare::Number => { + db.flags(DatabaseFlags::INTEGER_KEY).create(&mut tx_rw)?; + } + + // Use a custom comparison function if specified. + KeyCompare::Custom(_) => { + db.key_comparator::>() + .create(&mut tx_rw)?; + } + } + Ok(()) } @@ -301,18 +331,18 @@ where ) -> Result<(), RuntimeError> { let tx_rw = tx_rw.get_mut(); - // Open the table first... + // Open the table. We don't care about flags or key + // comparison behavior since we're clearing it anyway. let db: HeedDb = self .open_database(tx_rw, Some(T::NAME))? - .expect(PANIC_MSG_MISSING_TABLE); + .ok_or(RuntimeError::TableNotFound)?; - // ...then clear it. - Ok(db.clear(tx_rw)?) + db.clear(tx_rw)?; + + Ok(()) } } //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] -mod test { - // use super::*; -} +mod tests {} diff --git a/storage/database/src/backend/heed/storable.rs b/storage/database/src/backend/heed/storable.rs index 83442212..3566e88f 100644 --- a/storage/database/src/backend/heed/storable.rs +++ b/storage/database/src/backend/heed/storable.rs @@ -1,11 +1,11 @@ //! `cuprate_database::Storable` <-> `heed` serde trait compatibility layer. //---------------------------------------------------------------------------------------------------- Use -use std::{borrow::Cow, marker::PhantomData}; +use std::{borrow::Cow, cmp::Ordering, marker::PhantomData}; use heed::{BoxedError, BytesDecode, BytesEncode}; -use crate::storable::Storable; +use crate::{storable::Storable, Key}; //---------------------------------------------------------------------------------------------------- StorableHeed /// The glue struct that implements `heed`'s (de)serialization @@ -16,7 +16,19 @@ pub(super) struct StorableHeed(PhantomData) where T: Storable + ?Sized; -//---------------------------------------------------------------------------------------------------- BytesDecode +//---------------------------------------------------------------------------------------------------- Key +// If `Key` is also implemented, this can act as the comparison function. +impl heed::Comparator for StorableHeed +where + T: Key, +{ + #[inline] + fn compare(a: &[u8], b: &[u8]) -> Ordering { + ::KEY_COMPARE.as_compare_fn::()(a, b) + } +} + +//---------------------------------------------------------------------------------------------------- BytesDecode/Encode impl<'a, T> BytesDecode<'a> for StorableHeed where T: Storable + 'static, @@ -30,7 +42,6 @@ where } } -//---------------------------------------------------------------------------------------------------- BytesEncode impl<'a, T> BytesEncode<'a> for StorableHeed where T: Storable + ?Sized + 'a, @@ -57,6 +68,42 @@ mod test { // - simplify trait bounds // - make sure the right function is being called + #[test] + /// Assert key comparison behavior is correct. + fn compare() { + fn test(left: T, right: T, expected: Ordering) + where + T: Key + Ord + 'static, + { + println!("left: {left:?}, right: {right:?}, expected: {expected:?}"); + assert_eq!( + as heed::Comparator>::compare( + & as heed::BytesEncode>::bytes_encode(&left).unwrap(), + & as heed::BytesEncode>::bytes_encode(&right).unwrap() + ), + expected + ); + } + + // Value comparison + test::(0, 255, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + + // Byte comparison + test::<[u8; 2]>([1, 1], [1, 0], Ordering::Greater); + test::<[u8; 3]>([1, 2, 3], [1, 2, 3], Ordering::Equal); + } + #[test] /// Assert `BytesEncode::bytes_encode` is accurate. fn bytes_encode() { diff --git a/storage/database/src/backend/heed/types.rs b/storage/database/src/backend/heed/types.rs index 6a99d0df..10f57e67 100644 --- a/storage/database/src/backend/heed/types.rs +++ b/storage/database/src/backend/heed/types.rs @@ -5,4 +5,7 @@ use crate::backend::heed::storable::StorableHeed; //---------------------------------------------------------------------------------------------------- Types /// The concrete database type for `heed`, usable for reads and writes. +// +// Key type Value type +// v v pub(super) type HeedDb = heed::Database, StorableHeed>; diff --git a/storage/database/src/backend/redb/env.rs b/storage/database/src/backend/redb/env.rs index 3ff195c1..65e3e059 100644 --- a/storage/database/src/backend/redb/env.rs +++ b/storage/database/src/backend/redb/env.rs @@ -189,7 +189,10 @@ where // 3. So it's not being used to open a table since that needs `&tx_rw` // // Reader-open tables do not affect this, if they're open the below is still OK. - redb::WriteTransaction::delete_table(tx_rw, table)?; + if !redb::WriteTransaction::delete_table(tx_rw, table)? { + return Err(RuntimeError::TableNotFound); + } + // Re-create the table. // `redb` creates tables if they don't exist, so this should never panic. redb::WriteTransaction::open_table(tx_rw, table)?; @@ -200,6 +203,4 @@ where //---------------------------------------------------------------------------------------------------- Tests #[cfg(test)] -mod test { - // use super::*; -} +mod tests {} diff --git a/storage/database/src/backend/redb/storable.rs b/storage/database/src/backend/redb/storable.rs index 6735fec0..abf2e71b 100644 --- a/storage/database/src/backend/redb/storable.rs +++ b/storage/database/src/backend/redb/storable.rs @@ -25,7 +25,7 @@ where { #[inline] fn compare(left: &[u8], right: &[u8]) -> Ordering { - ::compare(left, right) + ::KEY_COMPARE.as_compare_fn::()(left, right) } } @@ -93,8 +93,21 @@ mod test { ); } - test::(-1, 2, Ordering::Greater); // bytes are greater, not the value - test::(0, 1, Ordering::Less); + // Value comparison + test::(0, 255, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(0, 256, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + test::(-1, 2, Ordering::Less); + + // Byte comparison test::<[u8; 2]>([1, 1], [1, 0], Ordering::Greater); test::<[u8; 3]>([1, 2, 3], [1, 2, 3], Ordering::Equal); } diff --git a/storage/database/src/backend/tests.rs b/storage/database/src/backend/tests.rs index df80b631..ac6b5927 100644 --- a/storage/database/src/backend/tests.rs +++ b/storage/database/src/backend/tests.rs @@ -156,6 +156,20 @@ fn non_manual_resize_2() { env.current_map_size(); } +/// Tests that [`EnvInner::clear_db`] will return +/// [`RuntimeError::TableNotFound`] if the table doesn't exist. +#[test] +fn clear_db_table_not_found() { + let (env, _tmpdir) = tmp_concrete_env(); + let env_inner = env.env_inner(); + let mut tx_rw = env_inner.tx_rw().unwrap(); + let err = env_inner.clear_db::(&mut tx_rw).unwrap_err(); + assert!(matches!(err, RuntimeError::TableNotFound)); + + env_inner.create_db::(&tx_rw).unwrap(); + env_inner.clear_db::(&mut tx_rw).unwrap(); +} + /// Test all `DatabaseR{o,w}` operations. #[test] fn db_read_write() { @@ -165,11 +179,11 @@ fn db_read_write() { let mut table = env_inner.open_db_rw::(&tx_rw).unwrap(); /// The (1st) key. - const KEY: u8 = 0; + const KEY: u32 = 0; /// The expected value. const VALUE: u64 = 0; /// How many `(key, value)` pairs will be inserted. - const N: u8 = 100; + const N: u32 = 100; /// Assert a u64 is the same as `VALUE`. fn assert_value(value: u64) { @@ -323,19 +337,35 @@ fn db_read_write() { /// Assert that `key`'s in database tables are sorted in /// an ordered B-Tree fashion, i.e. `min_value -> max_value`. +/// +/// And that it is true for integers, e.g. `0` -> `10`. #[test] fn tables_are_sorted() { let (env, _tmp) = tmp_concrete_env(); let env_inner = env.env_inner(); + + /// Range of keys to insert, `{0, 1, 2 ... 256}`. + const RANGE: std::ops::Range = 0..257; + + // Create tables and set flags / comparison flags. + { + let tx_rw = env_inner.tx_rw().unwrap(); + env_inner.create_db::(&tx_rw).unwrap(); + TxRw::commit(tx_rw).unwrap(); + } + let tx_rw = env_inner.tx_rw().unwrap(); let mut table = env_inner.open_db_rw::(&tx_rw).unwrap(); - // Insert `{5, 4, 3, 2, 1, 0}`, assert each new - // number inserted is the minimum `first()` value. - for key in (0..6).rev() { - table.put(&key, &123).unwrap(); + // Insert range, assert each new + // number inserted is the minimum `last()` value. + for key in RANGE { + table.put(&key, &0).unwrap(); + table.contains(&key).unwrap(); let (first, _) = table.first().unwrap(); - assert_eq!(first, key); + let (last, _) = table.last().unwrap(); + println!("first: {first}, last: {last}, key: {key}"); + assert_eq!(last, key); } drop(table); @@ -348,7 +378,7 @@ fn tables_are_sorted() { let table = env_inner.open_db_ro::(&tx_ro).unwrap(); let iter = table.iter().unwrap(); let keys = table.keys().unwrap(); - for ((i, iter), key) in (0..6).zip(iter).zip(keys) { + for ((i, iter), key) in RANGE.zip(iter).zip(keys) { let (iter, _) = iter.unwrap(); let key = key.unwrap(); assert_eq!(i, iter); @@ -359,14 +389,14 @@ fn tables_are_sorted() { let mut table = env_inner.open_db_rw::(&tx_rw).unwrap(); // Assert the `first()` values are the minimum, i.e. `{0, 1, 2}` - for key in 0..3 { + for key in [0, 1, 2] { let (first, _) = table.first().unwrap(); assert_eq!(first, key); table.delete(&key).unwrap(); } - // Assert the `last()` values are the maximum, i.e. `{5, 4, 3}` - for key in (3..6).rev() { + // Assert the `last()` values are the maximum, i.e. `{256, 255, 254}` + for key in [256, 255, 254] { let (last, _) = table.last().unwrap(); assert_eq!(last, key); table.delete(&key).unwrap(); diff --git a/storage/database/src/env.rs b/storage/database/src/env.rs index 8491f58c..291ac9de 100644 --- a/storage/database/src/env.rs +++ b/storage/database/src/env.rs @@ -175,18 +175,16 @@ pub trait Env: Sized { } //---------------------------------------------------------------------------------------------------- DatabaseRo -/// Document errors when opening tables in [`EnvInner`]. -macro_rules! doc_table_error { +/// Document the INVARIANT that the `heed` backend +/// must use [`EnvInner::create_db`] when initially +/// opening/creating tables. +macro_rules! doc_heed_create_db_invariant { () => { - r"# Errors -This will only return [`RuntimeError::Io`] on normal errors. + r#"The first time you open/create tables, you _must_ use [`EnvInner::create_db`] +to set the proper flags / [`Key`](crate::Key) comparison for the `heed` backend. -If the specified table is not created upon before this function is called, -this will return an error. - -Implementation detail you should NOT rely on: -- This only panics on `heed` -- `redb` will create the table if it does not exist" +Subsequent table opens will follow the flags/ordering, but only if +[`EnvInner::create_db`] was the _first_ function to open/create it."# }; } @@ -204,7 +202,13 @@ Implementation detail you should NOT rely on: /// Note that when opening tables with [`EnvInner::open_db_ro`], /// they must be created first or else it will return error. /// -/// See [`EnvInner::open_db_rw`] and [`EnvInner::create_db`] for creating tables. +/// Note that when opening tables with [`EnvInner::open_db_ro`], +/// they must be created first or else it will return error. +/// +/// See [`EnvInner::create_db`] for creating tables. +/// +/// # Invariant +#[doc = doc_heed_create_db_invariant!()] pub trait EnvInner<'env, Ro, Rw> where Self: 'env, @@ -243,6 +247,9 @@ where /// /// If the specified table is not created upon before this function is called, /// this will return [`RuntimeError::TableNotFound`]. + /// + /// # Invariant + #[doc = doc_heed_create_db_invariant!()] fn open_db_ro( &self, tx_ro: &Ro, @@ -262,18 +269,19 @@ where /// # Errors /// This will only return [`RuntimeError::Io`] on errors. /// - /// Implementation details: Both `heed` & `redb` backends create - /// the table with this function if it does not already exist. For safety and - /// clear intent, you should still consider using [`EnvInner::create_db`] instead. + /// # Invariant + #[doc = doc_heed_create_db_invariant!()] fn open_db_rw(&self, tx_rw: &Rw) -> Result, RuntimeError>; /// Create a database table. /// - /// This will create the database [`Table`] - /// passed as a generic to this function. + /// This will create the database [`Table`] passed as a generic to this function. /// /// # Errors /// This will only return [`RuntimeError::Io`] on errors. + /// + /// # Invariant + #[doc = doc_heed_create_db_invariant!()] fn create_db(&self, tx_rw: &Rw) -> Result<(), RuntimeError>; /// Clear all `(key, value)`'s from a database table. @@ -284,6 +292,10 @@ where /// Note that this operation is tied to `tx_rw`, as such this /// function's effects can be aborted using [`TxRw::abort`]. /// - #[doc = doc_table_error!()] + /// # Errors + /// This will return [`RuntimeError::Io`] on normal errors. + /// + /// If the specified table is not created upon before this function is called, + /// this will return [`RuntimeError::TableNotFound`]. fn clear_db(&self, tx_rw: &mut Rw) -> Result<(), RuntimeError>; } diff --git a/storage/database/src/key.rs b/storage/database/src/key.rs index 13f7cede..3273d4ed 100644 --- a/storage/database/src/key.rs +++ b/storage/database/src/key.rs @@ -1,54 +1,177 @@ //! Database key abstraction; `trait Key`. //---------------------------------------------------------------------------------------------------- Import -use std::cmp::Ordering; +use std::{cmp::Ordering, fmt::Debug}; -use crate::storable::Storable; +use crate::{storable::Storable, StorableBytes, StorableStr, StorableVec}; //---------------------------------------------------------------------------------------------------- Table /// Database [`Table`](crate::table::Table) key metadata. /// /// Purely compile time information for database table keys. -// -// FIXME: this doesn't need to exist right now but -// may be used if we implement getting values using ranges. -// -pub trait Key: Storable + Sized { - /// The primary key type. - type Primary: Storable; - +/// +/// ## Comparison +/// There are 2 differences between [`Key`] and [`Storable`]: +/// 1. [`Key`] must be [`Sized`] +/// 2. [`Key`] represents a [`Storable`] type that defines a comparison function +/// +/// The database backends will use [`Key::KEY_COMPARE`] +/// to sort the keys within database tables. +/// +/// [`Key::KEY_COMPARE`] is pre-implemented as a straight byte comparison. +/// +/// This default is overridden for numbers, which use a number comparison. +/// For example, [`u64`] keys are sorted as `{0, 1, 2 ... 999_998, 999_999, 1_000_000}`. +/// +/// If you would like to re-define this for number types, consider; +/// 1. Creating a wrapper type around primitives like a `struct SortU8(pub u8)` +/// 2. Implement [`Key`] on that wrapper +/// 3. Define a custom [`Key::KEY_COMPARE`] +pub trait Key: Storable + Sized + Ord { /// Compare 2 [`Key`]'s against each other. /// - /// By default, this does a straight _byte_ comparison, - /// not a comparison of the key's value. + /// # Defaults for types + /// For arrays and vectors that contain a `T: Storable`, + /// this does a straight _byte_ comparison, not a comparison of the key's value. /// + /// For [`StorableStr`], this will use [`str::cmp`], i.e. it is the same as the default behavior; it is a + /// [lexicographical comparison](https://doc.rust-lang.org/std/cmp/trait.Ord.html#lexicographical-comparison) + /// + /// For all primitive number types ([`u8`], [`i128`], etc), this will + /// convert the bytes to the number using [`Storable::from_bytes`], + /// then do a number comparison. + /// + /// # Example /// ```rust /// # use cuprate_database::*; + /// // Normal byte comparison. + /// let vec1 = StorableVec(vec![0, 1]); + /// let vec2 = StorableVec(vec![255, 0]); /// assert_eq!( - /// ::compare([0].as_slice(), [1].as_slice()), + /// as Key>::KEY_COMPARE + /// .as_compare_fn::>()(&vec1, &vec2), /// std::cmp::Ordering::Less, /// ); + /// + /// // Integer comparison. + /// let byte1 = [0, 1]; // 256 + /// let byte2 = [255, 0]; // 255 + /// let num1 = u16::from_le_bytes(byte1); + /// let num2 = u16::from_le_bytes(byte2); + /// assert_eq!(num1, 256); + /// assert_eq!(num2, 255); /// assert_eq!( - /// ::compare([1].as_slice(), [1].as_slice()), - /// std::cmp::Ordering::Equal, - /// ); - /// assert_eq!( - /// ::compare([2].as_slice(), [1].as_slice()), + /// // 256 > 255 + /// ::KEY_COMPARE.as_compare_fn::()(&byte1, &byte2), /// std::cmp::Ordering::Greater, /// ); /// ``` - #[inline] - fn compare(left: &[u8], right: &[u8]) -> Ordering { - left.cmp(right) - } + const KEY_COMPARE: KeyCompare = KeyCompare::Default; } //---------------------------------------------------------------------------------------------------- Impl -impl Key for T -where - T: Storable + Sized, -{ - type Primary = Self; +/// [`Ord`] comparison for arrays/vectors. +impl Key for [T; N] where T: Key + Storable + Sized + bytemuck::Pod {} +impl Key for StorableVec {} + +/// [`Ord`] comparison for misc types. +/// +/// This is not a blanket implementation because +/// it allows outer crates to define their own +/// comparison functions for their `T: Storable` types. +impl Key for () {} +impl Key for StorableBytes {} +impl Key for StorableStr {} + +/// Number comparison. +/// +/// # Invariant +/// This must _only_ be implemented for [`u32`], [`u64`] (and maybe [`usize`]). +/// +/// This is because: +/// 1. We use LMDB's `INTEGER_KEY` flag when this enum variant is used +/// 2. LMDB only supports these types when using that flag +/// +/// See: +/// +/// Other numbers will still have the same behavior, but they use +/// [`impl_custom_numbers_key`] and essentially pass LMDB a "custom" +/// number compare function. +macro_rules! impl_number_key { + ($($t:ident),* $(,)?) => { + $( + impl Key for $t { + const KEY_COMPARE: KeyCompare = KeyCompare::Number; + } + )* + }; +} + +impl_number_key!(u32, u64, usize); +#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))] +compile_error!("`cuprate_database`: `usize` must be equal to `u32` or `u64` for LMDB's `usize` key sorting to function correctly"); + +/// Custom number comparison for other numbers. +macro_rules! impl_custom_numbers_key { + ($($t:ident),* $(,)?) => { + $( + impl Key for $t { + // Just forward the the number comparison function. + const KEY_COMPARE: KeyCompare = KeyCompare::Custom(|left, right| { + KeyCompare::Number.as_compare_fn::<$t>()(left, right) + }); + } + )* + }; +} + +impl_custom_numbers_key!(u8, u16, u128, i8, i16, i32, i64, i128, isize); + +//---------------------------------------------------------------------------------------------------- KeyCompare +/// Comparison behavior for [`Key`]s. +/// +/// This determines how the database sorts [`Key`]s inside a database [`Table`](crate::Table). +/// +/// See [`Key`] for more info. +#[derive(Default, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum KeyCompare { + /// Use the default comparison behavior of the backend. + /// + /// Currently, both `heed` and `redb` use + /// [lexicographical comparison](https://doc.rust-lang.org/1.79.0/std/cmp/trait.Ord.html#lexicographical-comparison) + /// by default, i.e. a straight byte comparison. + #[default] + Default, + + /// A by-value number comparison, i.e. `255 < 256`. + /// + /// This _behavior_ is implemented as the default for all number primitives, + /// although some implementations on numbers use [`KeyCompare::Custom`] due + /// to internal implementation details of LMDB. + Number, + + /// A custom sorting function. + /// + /// The input of the function is 2 [`Key`]s in byte form. + Custom(fn(&[u8], &[u8]) -> Ordering), +} + +impl KeyCompare { + /// Return [`Self`] as a pure comparison function. + /// + /// The returned function expects 2 [`Key`]s in byte form as input. + #[inline] + pub const fn as_compare_fn(self) -> fn(&[u8], &[u8]) -> Ordering { + match self { + Self::Default => std::cmp::Ord::cmp, + Self::Number => |left, right| { + let left = ::from_bytes(left); + let right = ::from_bytes(right); + std::cmp::Ord::cmp(&left, &right) + }, + Self::Custom(f) => f, + } + } } //---------------------------------------------------------------------------------------------------- Tests diff --git a/storage/database/src/lib.rs b/storage/database/src/lib.rs index 1e15b584..31c5c95c 100644 --- a/storage/database/src/lib.rs +++ b/storage/database/src/lib.rs @@ -126,10 +126,10 @@ pub use error::{InitError, RuntimeError}; pub mod resize; mod key; -pub use key::Key; +pub use key::{Key, KeyCompare}; mod storable; -pub use storable::{Storable, StorableBytes, StorableVec}; +pub use storable::{Storable, StorableBytes, StorableStr, StorableVec}; mod table; pub use table::Table; diff --git a/storage/database/src/storable.rs b/storage/database/src/storable.rs index b5fa2f8a..8842af7f 100644 --- a/storage/database/src/storable.rs +++ b/storage/database/src/storable.rs @@ -1,7 +1,10 @@ //! (De)serialization for table keys & values. //---------------------------------------------------------------------------------------------------- Import -use std::{borrow::Borrow, fmt::Debug}; +use std::{ + borrow::{Borrow, Cow}, + fmt::Debug, +}; use bytemuck::Pod; use bytes::Bytes; @@ -194,6 +197,66 @@ impl Borrow<[T]> for StorableVec { } } +//---------------------------------------------------------------------------------------------------- StorableVec +/// A [`Storable`] string. +/// +/// This is a wrapper around a `Cow<'static, str>` +/// that can be stored in the database. +/// +/// # Invariant +/// [`StorableStr::from_bytes`] will panic +/// if the bytes are not UTF-8. This should normally +/// not be possible in database operations, although technically +/// you can call this function yourself and input bad data. +/// +/// # Example +/// ```rust +/// # use cuprate_database::*; +/// # use std::borrow::Cow; +/// let string: StorableStr = StorableStr(Cow::Borrowed("a")); +/// +/// // Into bytes. +/// let into = Storable::as_bytes(&string); +/// assert_eq!(into, &[97]); +/// +/// // From bytes. +/// let from: StorableStr = Storable::from_bytes(&into); +/// assert_eq!(from, string); +/// ``` +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, bytemuck::TransparentWrapper)] +#[repr(transparent)] +pub struct StorableStr(pub Cow<'static, str>); + +impl Storable for StorableStr { + const BYTE_LENGTH: Option = None; + + /// [`String::as_bytes`]. + #[inline] + fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } + + #[inline] + fn from_bytes(bytes: &[u8]) -> Self { + Self(Cow::Owned(std::str::from_utf8(bytes).unwrap().to_string())) + } +} + +impl std::ops::Deref for StorableStr { + type Target = Cow<'static, str>; + #[inline] + fn deref(&self) -> &Cow<'static, str> { + &self.0 + } +} + +impl Borrow> for StorableStr { + #[inline] + fn borrow(&self) -> &Cow<'static, str> { + &self.0 + } +} + //---------------------------------------------------------------------------------------------------- StorableBytes /// A [`Storable`] version of [`Bytes`]. /// diff --git a/storage/database/src/tests.rs b/storage/database/src/tests.rs index 81561073..9c9317d2 100644 --- a/storage/database/src/tests.rs +++ b/storage/database/src/tests.rs @@ -15,7 +15,7 @@ pub(crate) struct TestTable; impl Table for TestTable { const NAME: &'static str = "test_table"; - type Key = u8; + type Key = u32; type Value = u64; } From 7c8466f4bac8a861add219483fc8e6543867924c Mon Sep 17 00:00:00 2001 From: Boog900 Date: Tue, 2 Jul 2024 22:08:19 +0000 Subject: [PATCH 4/4] Storage: add blockchain history requests (#206) * Add database requests for chain history * misc fixes * review comments * fix clippy * add link and fix typo * Apply suggestions from code review Co-authored-by: hinto-janai * add comment --------- Co-authored-by: hinto-janai --- Cargo.lock | 1 + p2p/p2p/src/block_downloader.rs | 2 +- p2p/p2p/src/block_downloader/request_chain.rs | 14 +-- p2p/p2p/src/block_downloader/tests.rs | 4 +- storage/blockchain/Cargo.toml | 2 +- storage/blockchain/src/service/free.rs | 69 ++++++++++++- storage/blockchain/src/service/read.rs | 99 +++++++++++++++++-- types/src/blockchain.rs | 30 +++++- 8 files changed, 198 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68ccc3ae..d8628217 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,6 +506,7 @@ dependencies = [ "monero-serai", "paste", "pretty_assertions", + "proptest", "rayon", "tempfile", "thread_local", diff --git a/p2p/p2p/src/block_downloader.rs b/p2p/p2p/src/block_downloader.rs index 7d0ab7e2..81640e90 100644 --- a/p2p/p2p/src/block_downloader.rs +++ b/p2p/p2p/src/block_downloader.rs @@ -121,7 +121,7 @@ pub enum ChainSvcResponse { /// The response for [`ChainSvcRequest::FindFirstUnknown`]. /// /// Contains the index of the first unknown block and its expected height. - FindFirstUnknown(usize, u64), + FindFirstUnknown(Option<(usize, u64)>), /// The response for [`ChainSvcRequest::CumulativeDifficulty`]. /// /// The current cumulative difficulty of our chain. diff --git a/p2p/p2p/src/block_downloader/request_chain.rs b/p2p/p2p/src/block_downloader/request_chain.rs index f8b53194..471635bf 100644 --- a/p2p/p2p/src/block_downloader/request_chain.rs +++ b/p2p/p2p/src/block_downloader/request_chain.rs @@ -198,7 +198,7 @@ where tracing::debug!("Highest chin entry contained {} block Ids", hashes.len()); // Find the first unknown block in the batch. - let ChainSvcResponse::FindFirstUnknown(first_unknown, expected_height) = our_chain_svc + let ChainSvcResponse::FindFirstUnknown(first_unknown_ret) = our_chain_svc .ready() .await? .call(ChainSvcRequest::FindFirstUnknown(hashes.clone())) @@ -207,18 +207,18 @@ where panic!("chain service sent wrong response."); }; + // We know all the blocks already + // TODO: The peer could still be on a different chain, however the chain might just be too far split. + let Some((first_unknown, expected_height)) = first_unknown_ret else { + return Err(BlockDownloadError::FailedToFindAChainToFollow); + }; + // The peer must send at least one block we already know. if first_unknown == 0 { peer_handle.ban_peer(MEDIUM_BAN); return Err(BlockDownloadError::PeerSentNoOverlappingBlocks); } - // We know all the blocks already - // TODO: The peer could still be on a different chain, however the chain might just be too far split. - if first_unknown == hashes.len() { - return Err(BlockDownloadError::FailedToFindAChainToFollow); - } - let previous_id = hashes[first_unknown - 1]; let first_entry = ChainEntry { diff --git a/p2p/p2p/src/block_downloader/tests.rs b/p2p/p2p/src/block_downloader/tests.rs index 5d4225cd..bf342727 100644 --- a/p2p/p2p/src/block_downloader/tests.rs +++ b/p2p/p2p/src/block_downloader/tests.rs @@ -314,7 +314,9 @@ impl Service for OurChainSvc { block_ids: vec![genesis], cumulative_difficulty: 1, }, - ChainSvcRequest::FindFirstUnknown(_) => ChainSvcResponse::FindFirstUnknown(1, 1), + ChainSvcRequest::FindFirstUnknown(_) => { + ChainSvcResponse::FindFirstUnknown(Some((1, 1))) + } ChainSvcRequest::CumulativeDifficulty => ChainSvcResponse::CumulativeDifficulty(1), }) } diff --git a/storage/blockchain/Cargo.toml b/storage/blockchain/Cargo.toml index bab582d6..8a882142 100644 --- a/storage/blockchain/Cargo.toml +++ b/storage/blockchain/Cargo.toml @@ -45,8 +45,8 @@ rayon = { workspace = true, optional = true } cuprate-helper = { path = "../../helper", features = ["thread"] } cuprate-test-utils = { path = "../../test-utils" } -bytemuck = { version = "1.14.3", features = ["must_cast", "derive", "min_const_generics", "extern_crate_alloc"] } tempfile = { version = "3.10.0" } pretty_assertions = { workspace = true } +proptest = { workspace = true } hex = { workspace = true } hex-literal = { workspace = true } diff --git a/storage/blockchain/src/service/free.rs b/storage/blockchain/src/service/free.rs index 3ff8d6eb..3701f66f 100644 --- a/storage/blockchain/src/service/free.rs +++ b/storage/blockchain/src/service/free.rs @@ -33,8 +33,69 @@ pub fn init(config: Config) -> Result<(DatabaseReadHandle, DatabaseWriteHandle), Ok((readers, writer)) } -//---------------------------------------------------------------------------------------------------- Tests -#[cfg(test)] -mod test { - // use super::*; +//---------------------------------------------------------------------------------------------------- Compact history +/// Given a position in the compact history, returns the height offset that should be in that position. +/// +/// The height offset is the difference between the top block's height and the block height that should be in that position. +#[inline] +pub(super) const fn compact_history_index_to_height_offset( + i: u64, +) -> u64 { + // If the position is below the initial blocks just return the position back + if i <= INITIAL_BLOCKS { + i + } else { + // Otherwise we go with power of 2 offsets, the same as monerod. + // So (INITIAL_BLOCKS + 2), (INITIAL_BLOCKS + 2 + 4), (INITIAL_BLOCKS + 2 + 4 + 8) + // ref: + INITIAL_BLOCKS + (2 << (i - INITIAL_BLOCKS)) - 2 + } +} + +/// Returns if the genesis block was _NOT_ included when calculating the height offsets. +/// +/// The genesis must always be included in the compact history. +#[inline] +pub(super) const fn compact_history_genesis_not_included( + top_block_height: u64, +) -> bool { + // If the top block height is less than the initial blocks then it will always be included. + // Otherwise, we use the fact that to reach the genesis block this statement must be true (for a + // single `i`): + // + // `top_block_height - INITIAL_BLOCKS - 2^i + 2 == 0` + // which then means: + // `top_block_height - INITIAL_BLOCKS + 2 == 2^i` + // So if `top_block_height - INITIAL_BLOCKS + 2` is a power of 2 then the genesis block is in + // the compact history already. + top_block_height > INITIAL_BLOCKS && !(top_block_height - INITIAL_BLOCKS + 2).is_power_of_two() +} + +//---------------------------------------------------------------------------------------------------- Tests + +#[cfg(test)] +mod tests { + use proptest::prelude::*; + + use super::*; + + proptest! { + #[test] + fn compact_history(top_height in 0_u64..500_000_000) { + let mut heights = (0..) + .map(compact_history_index_to_height_offset::<11>) + .map_while(|i| top_height.checked_sub(i)) + .collect::>(); + + if compact_history_genesis_not_included::<11>(top_height) { + heights.push(0); + } + + // Make sure the genesis and top block are always included. + assert_eq!(*heights.last().unwrap(), 0); + assert_eq!(*heights.first().unwrap(), top_height); + + heights.windows(2).for_each(|window| assert_ne!(window[0], window[1])); + } + } } diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 20aebf9c..7f856ccd 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -14,7 +14,7 @@ use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::PollSemaphore; use cuprate_database::{ConcreteEnv, DatabaseRo, Env, EnvInner, RuntimeError}; -use cuprate_helper::asynch::InfallibleOneshotReceiver; +use cuprate_helper::{asynch::InfallibleOneshotReceiver, map::combine_low_high_bits_to_u128}; use cuprate_types::{ blockchain::{BCReadRequest, BCResponse}, ExtendedBlockHeader, OutputOnChain, @@ -23,17 +23,20 @@ use cuprate_types::{ use crate::{ config::ReaderThreads, open_tables::OpenTables, - ops::block::block_exists, ops::{ - block::{get_block_extended_header_from_height, get_block_info}, + block::{ + block_exists, get_block_extended_header_from_height, get_block_height, get_block_info, + }, blockchain::{cumulative_generated_coins, top_block_height}, key_image::key_image_exists, output::id_to_output_on_chain, }, - service::types::{ResponseReceiver, ResponseResult, ResponseSender}, + service::{ + free::{compact_history_genesis_not_included, compact_history_index_to_height_offset}, + types::{ResponseReceiver, ResponseResult, ResponseSender}, + }, tables::{BlockHeights, BlockInfos, Tables}, - types::BlockHash, - types::{Amount, AmountIndex, BlockHeight, KeyImage, PreRctOutputId}, + types::{Amount, AmountIndex, BlockHash, BlockHeight, KeyImage, PreRctOutputId}, }; //---------------------------------------------------------------------------------------------------- DatabaseReadHandle @@ -204,13 +207,15 @@ fn map_request( let response = match request { R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block) => block_hash(env, block), - R::FilterUnknownHashes(hashes) => filter_unknown_hahses(env, hashes), + R::FilterUnknownHashes(hashes) => filter_unknown_hashes(env, hashes), R::BlockExtendedHeaderInRange(range) => block_extended_header_in_range(env, range), R::ChainHeight => chain_height(env), R::GeneratedCoins => generated_coins(env), R::Outputs(map) => outputs(env, map), R::NumberOutputsWithAmount(vec) => number_outputs_with_amount(env, vec), R::KeyImagesSpent(set) => key_images_spent(env, set), + R::CompactChainHistory => compact_chain_history(env), + R::FindFirstUnknown(block_ids) => find_first_unknown(env, &block_ids), }; if let Err(e) = response_sender.send(response) { @@ -320,7 +325,7 @@ fn block_hash(env: &ConcreteEnv, block_height: BlockHeight) -> ResponseResult { /// [`BCReadRequest::FilterUnknownHashes`]. #[inline] -fn filter_unknown_hahses(env: &ConcreteEnv, mut hashes: HashSet) -> ResponseResult { +fn filter_unknown_hashes(env: &ConcreteEnv, mut hashes: HashSet) -> ResponseResult { // Single-threaded, no `ThreadLocal` required. let env_inner = env.env_inner(); let tx_ro = env_inner.tx_ro()?; @@ -525,3 +530,81 @@ fn key_images_spent(env: &ConcreteEnv, key_images: HashSet) -> Respons Some(Err(e)) => Err(e), // A database error occurred. } } + +/// [`BCReadRequest::CompactChainHistory`] +fn compact_chain_history(env: &ConcreteEnv) -> ResponseResult { + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; + let table_block_infos = env_inner.open_db_ro::(&tx_ro)?; + + let top_block_height = top_block_height(&table_block_heights)?; + + let top_block_info = get_block_info(&top_block_height, &table_block_infos)?; + let cumulative_difficulty = combine_low_high_bits_to_u128( + top_block_info.cumulative_difficulty_low, + top_block_info.cumulative_difficulty_high, + ); + + /// The amount of top block IDs in the compact chain. + const INITIAL_BLOCKS: u64 = 11; + + // rayon is not used here because the amount of block IDs is expected to be small. + let mut block_ids = (0..) + .map(compact_history_index_to_height_offset::) + .map_while(|i| top_block_height.checked_sub(i)) + .map(|height| Ok(get_block_info(&height, &table_block_infos)?.block_hash)) + .collect::, RuntimeError>>()?; + + if compact_history_genesis_not_included::(top_block_height) { + block_ids.push(get_block_info(&0, &table_block_infos)?.block_hash); + } + + Ok(BCResponse::CompactChainHistory { + cumulative_difficulty, + block_ids, + }) +} + +/// [`BCReadRequest::FindFirstUnknown`] +/// +/// # Invariant +/// `block_ids` must be sorted in chronological block order, or else +/// the returned result is unspecified and meaningless, as this function +/// performs a binary search. +fn find_first_unknown(env: &ConcreteEnv, block_ids: &[BlockHash]) -> ResponseResult { + let env_inner = env.env_inner(); + let tx_ro = env_inner.tx_ro()?; + + let table_block_heights = env_inner.open_db_ro::(&tx_ro)?; + + let mut err = None; + + // Do a binary search to find the first unknown block in the batch. + let idx = + block_ids.partition_point( + |block_id| match block_exists(block_id, &table_block_heights) { + Ok(exists) => exists, + Err(e) => { + err.get_or_insert(e); + // if this happens the search is scrapped, just return `false` back. + false + } + }, + ); + + if let Some(e) = err { + return Err(e); + } + + Ok(if idx == block_ids.len() { + BCResponse::FindFirstUnknown(None) + } else if idx == 0 { + BCResponse::FindFirstUnknown(Some((0, 0))) + } else { + let last_known_height = get_block_height(&block_ids[idx - 1], &table_block_heights)?; + + BCResponse::FindFirstUnknown(Some((idx, last_known_height + 1))) + }) +} diff --git a/types/src/blockchain.rs b/types/src/blockchain.rs index 42390f9d..5a09ca3d 100644 --- a/types/src/blockchain.rs +++ b/types/src/blockchain.rs @@ -83,10 +83,21 @@ pub enum BCReadRequest { /// The input is a list of output amounts. NumberOutputsWithAmount(Vec), - /// Check that all key images within a set arer not spent. + /// Check that all key images within a set are not spent. /// /// Input is a set of key images. KeyImagesSpent(HashSet<[u8; 32]>), + + /// A request for the compact chain history. + CompactChainHistory, + + /// A request to find the first unknown block ID in a list of block IDs. + //// + /// # Invariant + /// The [`Vec`] containing the block IDs must be sorted in chronological block + /// order, or else the returned response is unspecified and meaningless, + /// as this request performs a binary search. + FindFirstUnknown(Vec<[u8; 32]>), } //---------------------------------------------------------------------------------------------------- WriteRequest @@ -164,6 +175,23 @@ pub enum BCResponse { /// The inner value is `false` if _none_ of the key images were spent. KeyImagesSpent(bool), + /// Response to [`BCReadRequest::CompactChainHistory`]. + CompactChainHistory { + /// A list of blocks IDs in our chain, starting with the most recent block, all the way to the genesis block. + /// + /// These blocks should be in reverse chronological order, not every block is needed. + block_ids: Vec<[u8; 32]>, + /// The current cumulative difficulty of the chain. + cumulative_difficulty: u128, + }, + + /// The response for [`BCReadRequest::FindFirstUnknown`]. + /// + /// Contains the index of the first unknown block and its expected height. + /// + /// This will be [`None`] if all blocks were known. + FindFirstUnknown(Option<(usize, u64)>), + //------------------------------------------------------ Writes /// Response to [`BCWriteRequest::WriteBlock`]. ///