From 95b3d0417f63ca1a28609c15fcb3c4d759842930 Mon Sep 17 00:00:00 2001 From: Someone Else <someoneelse.is_on.github.rio7x@simplelogin.com> Date: Sun, 7 May 2023 23:22:59 +0200 Subject: [PATCH] save --- database/reactor/src/lib.rs | 36 +++------ database/reactor/src/message.rs | 2 - database/reactor/src/reactor.rs | 4 +- database/src/encoding/implementation.rs | 12 +-- database/src/encoding/mod.rs | 9 ++- database/src/interface/default.rs | 99 +++++++++++++++++++++++-- database/src/interface/mod.rs | 6 ++ database/src/lib.rs | 7 +- database/src/table.rs | 1 + database/src/types.rs | 4 +- 10 files changed, 125 insertions(+), 55 deletions(-) diff --git a/database/reactor/src/lib.rs b/database/reactor/src/lib.rs index c5323097..355a3d21 100644 --- a/database/reactor/src/lib.rs +++ b/database/reactor/src/lib.rs @@ -6,59 +6,41 @@ pub mod message; pub mod reactor; pub mod thread; -use std::{pin::Pin, thread::JoinHandle, sync::Arc, time::Duration}; +use std::{pin::Pin, thread::JoinHandle, sync::Arc}; use cuprate_database::error::DBException; use futures::{channel::{oneshot, mpsc}, Future, FutureExt}; use message::{DatabaseRequest, DatabaseResponse, DatabaseClientRequest}; -use tower::Service; #[derive(Debug, Clone)] /// `Databaseclient` is a struct shared across the daemon to interact with database reactor, and therefore the underlying database pub struct DatabaseClient { /// The channel used to send request to the reactor db: mpsc::Sender<DatabaseClientRequest>, - /// A shared pointer to the reactor thread. Used to check if the reactor shutdowned properly. - reactor_thread: Arc<JoinHandle<()>>, + /// Shared handle to the reactor thread to check if the thread is stopped + reactor_thread: Arc<JoinHandle<()>> } -impl DatabaseClient { - - /// This function send a message to stop the reactor, and check if it shutdowned properly. - pub async fn shutdown(mut self) -> Result<(), ()> { - - if let DatabaseResponse::Shutdowned = self - .call(DatabaseRequest::Shutdown) - .await - .map_err(|err| {})? - { - // A small delay is placed here to let the OS thread shutdown. The upper response is sent just before the end of the thread. - std::thread::sleep(Duration::from_millis(200)); - if self.reactor_thread.is_finished() { - return Ok(()) - } - } - Err(()) - } -} - - +/// Implementing Tower service for the database client impl tower::Service<DatabaseRequest> for DatabaseClient { type Response = DatabaseResponse; - type Error = DBException; + type Error = DBException; // The reactor can sent back to the caller database errors, such as NotFound for example type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; + /// `poll_ready` check if the channel is sempty and therefore is waiting to process a request fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> { self.db .poll_ready(cx) .map_err(|_| DBException::Other("closed")) } + /// `call` to send a request to the database fn call(&mut self, req: DatabaseRequest) -> Self::Future { + // Generating result oneshot::channel let (tx, rx) = oneshot::channel::<Result<Self::Response, Self::Error>>(); // get the callers span let span = tracing::span::Span::current(); - + let req = DatabaseClientRequest { req, tx, span }; match self.db.try_send(req) { diff --git a/database/reactor/src/message.rs b/database/reactor/src/message.rs index 3f91ff6c..a0c3f603 100644 --- a/database/reactor/src/message.rs +++ b/database/reactor/src/message.rs @@ -21,7 +21,6 @@ pub enum DatabaseRequest { Chain, BlockHeight(Hash), BlockKnown(Hash), - Shutdown, } /// `DatabaseResponse` is an enum listing all the response sent from the reactor, to answer their corresponding Request. @@ -32,7 +31,6 @@ pub enum DatabaseResponse { Chain(Vec<Hash>), BlockHeight(Option<u64>), BlockKnown(BlockKnown), - Shutdowned } // Temporary diff --git a/database/reactor/src/reactor.rs b/database/reactor/src/reactor.rs index 4ec65488..d4bd138d 100644 --- a/database/reactor/src/reactor.rs +++ b/database/reactor/src/reactor.rs @@ -1,7 +1,7 @@ use std::{thread::JoinHandle, path::PathBuf, sync::{RwLock, Arc}}; use futures::{channel::{oneshot, mpsc::{self, Receiver}}, Future, FutureExt}; -use cuprate_database::{database::{Database, Interface}, error::DB_FAILURES}; +use cuprate_database::{database::{Database, Interface}, error::DBException}; use libmdbx::{NoWriteMap, WriteMap}; use tracing::{span, Level, event, Span}; use crate::{message::{DatabaseRequest, DatabaseResponse, DatabaseClientRequest}, DatabaseClient, thread::{WriteThread, ReadThread}}; @@ -15,8 +15,6 @@ pub struct DatabaseReactor { write_thread: Option<WriteThread>, /// Vector of ReadThread read_threads: Vec<ReadThread>, - /// The (theorical) on-going memory changes in the database - mm_size: u64, /// The number of write being performed in the database write_count: u64, } diff --git a/database/src/encoding/implementation.rs b/database/src/encoding/implementation.rs index cd017563..8837786b 100644 --- a/database/src/encoding/implementation.rs +++ b/database/src/encoding/implementation.rs @@ -5,8 +5,8 @@ use std::{fmt::Debug, io::Write}; use monero::{Hash, PublicKey, util::ringct::Key, consensus::{Encodable, Decodable}}; -use crate::{types::{BlockMetadata, TransactionPruned, OutputMetadata, TxIndex, AltBlock, TxOutputIdx}, BINCODE_CONFIG}; -use super::{Encode, Error, Decode, compat::Compat, Buffer}; +use crate::{types::{BlockMetadata, TransactionPruned, OutputMetadata, TxIndex, AltBlock, TxOutputIdx}}; +use super::{Encode, Error, Decode, compat::Compat, Buffer, BINCODE_CONFIG}; /// A macro for idiomatic implementation of encode|decode for integer primitives macro_rules! impl_encode_decode_integer { @@ -141,7 +141,7 @@ impl Encode for BlockMetadata { buf[8..16].copy_from_slice(&self.total_coins_generated.to_le_bytes()); buf[16..24].copy_from_slice(&self.weight.to_le_bytes()); buf[24..40].copy_from_slice(&self.cumulative_difficulty.to_le_bytes()); - buf[40..72].copy_from_slice(&self.block_hash.0.0); // < When Hash will be implemented pls remove the Compat this + buf[40..72].copy_from_slice(&self.block_hash.0); buf[72..80].copy_from_slice(&self.cum_rct.to_le_bytes()); buf[80..88].copy_from_slice(&self.long_term_block_weight.to_le_bytes()); Ok(buf) @@ -159,7 +159,7 @@ impl Decode for BlockMetadata { total_coins_generated: u64::from_le_bytes(src[8..16].try_into().map_err(Error::TryInto)?), weight: u64::from_le_bytes(src[16..24].try_into().map_err(Error::TryInto)?), cumulative_difficulty: u128::from_le_bytes(src[24..40].try_into().map_err(Error::TryInto)?), - block_hash: Compat(Hash::from_slice(src[40..72].try_into().map_err(Error::Infallible)?)), // < When Hash will be implemented pls remove the Compat this + block_hash: Hash::from_slice(src[40..72].try_into().map_err(Error::Infallible)?), // < When Hash will be implemented pls remove the Compat this cum_rct: u64::from_le_bytes(src[72..80].try_into().map_err(Error::TryInto)?), long_term_block_weight: u64::from_le_bytes(src[80..88].try_into().map_err(Error::TryInto)?), } @@ -395,11 +395,11 @@ mod tests { total_coins_generated: 17, weight: 4300, cumulative_difficulty: 965904, - block_hash: Compat(Hash([ + block_hash: Hash([ 0x5c, 0x5e, 0x69, 0xd8, 0xfc, 0x0d, 0x22, 0x6a, 0x60, 0x91, 0x47, 0xda, 0x98, 0x36, 0x06, 0x00, 0xf4, 0xea, 0x49, 0xcc, 0x49, 0x45, 0x2c, 0x5e, 0xf8, 0xba, 0x20, 0xf5, 0x93, 0xd4, 0x80, 0x7d, - ])), + ]), cum_rct: 7, long_term_block_weight: 30 }; diff --git a/database/src/encoding/mod.rs b/database/src/encoding/mod.rs index 72e0fe2b..fe4b3069 100644 --- a/database/src/encoding/mod.rs +++ b/database/src/encoding/mod.rs @@ -7,6 +7,11 @@ use std::{fmt::Debug, array::TryFromSliceError, convert::Infallible, io}; use crate::table::Table; +const BINCODE_CONFIG: bincode::config::Configuration< + bincode::config::LittleEndian, + bincode::config::Fixint, +> = bincode::config::standard().with_fixed_int_encoding(); + pub mod compat; pub mod buffer; pub mod implementation; @@ -55,7 +60,7 @@ pub enum Value<T: Table> { /// variant of Value you should receive. But be careful on your code, or it might crash. impl<'a, T: Table> Value<T> { - fn as_type(&'a self) -> &'a <T as Table>::Value { + pub fn as_type(&'a self) -> &'a <T as Table>::Value { assert!(matches!(self, Value::Type(_))); // Hint for the compiler to check the boundaries if let Value::Type(value) = self { value @@ -64,7 +69,7 @@ impl<'a, T: Table> Value<T> { } } - fn as_raw(&'a self) -> &'a <<T as Table>::Value as Encode>::Output { + pub fn as_raw(&'a self) -> &'a <<T as Table>::Value as Encode>::Output { assert!(matches!(self, Value::Raw(_))); // Hint for the compiler to check the boundaries if let Value::Raw(raw) = self { raw diff --git a/database/src/interface/default.rs b/database/src/interface/default.rs index 8df00526..52381ce5 100644 --- a/database/src/interface/default.rs +++ b/database/src/interface/default.rs @@ -20,60 +20,143 @@ impl<'thread, D: Database<'thread>> ReadInterface<'thread> for Interface<'thread fn block_exists(&'thread self, hash: &Hash) -> Result<bool, DBException> { let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockhash = ro_tx.cursor_dup::<table::blockhash>()?; - let mut cursor_blockhash = ro_tx.cursor_dup::<table::blockhash>()?; Ok(cursor_blockhash.get_dup::<true>(&(), hash)?.is_some()) } fn get_block_hash(&'thread self, height: &u64) -> Result<Hash, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().block_hash) } fn get_block_height(&'thread self, hash: &Hash) -> Result<u64, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockhash = ro_tx.cursor_dup::<table::blockhash>()?; + + cursor_blockhash + .get_dup::<false>(&(), hash)? + .ok_or(DBException::NotFound(format!("Failed to find height of block: {}", hash))) + .map(|res| *res.as_type()) } fn get_block_weight(&'thread self, height: &u64) -> Result<u64, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().weight) } fn get_block_already_generated_coins(&'thread self, height: &u64) -> Result<u64, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().total_coins_generated) } fn get_block_long_term_weight(&'thread self, height: &u64) -> Result<u64, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().long_term_block_weight) } fn get_block_timestamp(&'thread self, height: &u64) -> Result<u64, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().timestamp) } fn get_block_cumulative_rct_outputs(&'thread self, height: &u64) -> Result<u64, DBException> { - todo!() + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().cum_rct) } + fn get_block_cumulative_difficulty(&'thread self, height: &u64) -> Result<u128, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let metadata = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?; + + Ok(metadata.as_type().cumulative_difficulty) + } + + fn get_block_difficulty(&'thread self, height: &u64) -> Result<u128, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; + let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?; + + let diff1 = cursor_blockmetadata + .get_dup::<false>(&(), height)? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))? + .as_type() + .cumulative_difficulty; + + let diff2 = cursor_blockmetadata + .get_dup::<false>(&(), &(height-1))? + .ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))? + .as_type() + .cumulative_difficulty; + Ok(diff1-diff2) + } + fn get_block<const B: bool>(&'thread self, hash: &Hash) -> Result<Block, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; todo!() } fn get_block_from_height<const B: bool>(&'thread self, height: &u64) -> Result<Block, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; todo!() } fn get_block_header(&'thread self, hash: &Hash) -> Result<BlockHeader, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; todo!() } fn get_block_header_from_height(&'thread self, height: &u64) -> Result<BlockHeader, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; todo!() } fn get_top_block(&'thread self) -> Result<Block, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; todo!() } fn get_top_block_hash(&'thread self) -> Result<Hash, DBException> { + let ro_tx = self.db.tx().map_err(Into::into)?; todo!() } diff --git a/database/src/interface/mod.rs b/database/src/interface/mod.rs index a36dc5cf..fd76922a 100644 --- a/database/src/interface/mod.rs +++ b/database/src/interface/mod.rs @@ -38,6 +38,12 @@ pub trait ReadInterface<'thread> { /// `get_block_cumulative_rct_outputs` fetch the amount of RingCT outputs at the given block's height fn get_block_cumulative_rct_outputs(&'thread self, height: &u64) -> Result<u64, DBException>; + /// `get_block_cumulative_difficulty` fetch the requested block's cumulative difficulty (with its given height) + fn get_block_cumulative_difficulty(&'thread self, height: &u64) -> Result<u128, DBException>; + + /// `get_block_difficulty` fetch the requested block's cumulative difficulty (with its given height) + fn get_block_difficulty(&'thread self, height: &u64) -> Result<u128, DBException>; + /// `get_block` fetch the requested block (with its given hash) fn get_block<const B: bool>(&'thread self, hash: &Hash) -> Result<Block, DBException>; diff --git a/database/src/lib.rs b/database/src/lib.rs index 4c6e8bd5..bb2e2e76 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -14,7 +14,7 @@ // along with this program. If not, see <https://www.gnu.org/licenses/>. //! The cuprate-db crate implement (as its name suggests) the relations between the blockchain/txpool objects and their databases. -//! `lib.rs` contains all the generics, trait and specification for interfaces between blockchain and a backend-agnostic database +//! `lib.rs` contains all the generics, trait and specification for interfaces between the reactor and a database //! Every other files in this folder are implementation of these traits/methods to real storage engine. //! //! At the moment, the only storage engine available is MDBX. @@ -40,10 +40,7 @@ pub mod types; const DEFAULT_BLOCKCHAIN_DATABASE_DIRECTORY: &str = "blockchain"; const DEFAULT_TXPOOL_DATABASE_DIRECTORY: &str = "txpool_mem"; -const BINCODE_CONFIG: bincode::config::Configuration< - bincode::config::LittleEndian, - bincode::config::Fixint, -> = bincode::config::standard().with_fixed_int_encoding(); + // ------------------------------------------| Errors |------------------------------------------ diff --git a/database/src/table.rs b/database/src/table.rs index 21b0a2a1..984fb22b 100644 --- a/database/src/table.rs +++ b/database/src/table.rs @@ -26,6 +26,7 @@ pub trait Table: Send + Sync + 'static + Clone { pub trait DupTable: Table { // Subkey of the table (prefix of the data) + // Warning: it should be a type with constant-size type SubKey: Encode + Decode + Ord; } diff --git a/database/src/types.rs b/database/src/types.rs index 48288b5e..b44edb53 100644 --- a/database/src/types.rs +++ b/database/src/types.rs @@ -15,7 +15,7 @@ use monero::{ // ---- BLOCKS ---- -#[derive(Clone, Debug, Encode, Decode, PartialEq)] +#[derive(Clone, Debug, PartialEq)] /// [`BlockMetadata`] is a struct containing metadata of a block such as the block's `timestamp`, the `total_coins_generated` at this height, its `weight`, its difficulty (`diff_lo`) /// and cumulative difficulty (`diff_hi`), the `block_hash`, the cumulative RingCT (`cum_rct`) and its long term weight (`long_term_block_weight`). The monerod's struct equivalent is `mdb_block_info_4` /// This struct is used in [`crate::table::blockmetadata`] table. @@ -29,7 +29,7 @@ pub struct BlockMetadata { /// Block's cumulative_difficulty. In monerod this field would have been split into two `u64`, since cpp don't support *natively* `uint128_t`/`u128` pub cumulative_difficulty: u128, /// Block's hash - pub block_hash: Compat<Hash>, + pub block_hash: Hash, /// Cumulative number of RingCT outputs up to this block pub cum_rct: u64, /// Block's long term weight