This commit is contained in:
Someone Else 2023-05-07 23:22:59 +02:00
parent 8050606464
commit 95b3d0417f
10 changed files with 125 additions and 55 deletions

View file

@ -6,59 +6,41 @@ pub mod message;
pub mod reactor;
pub mod thread;
use std::{pin::Pin, thread::JoinHandle, sync::Arc, time::Duration};
use std::{pin::Pin, thread::JoinHandle, sync::Arc};
use cuprate_database::error::DBException;
use futures::{channel::{oneshot, mpsc}, Future, FutureExt};
use message::{DatabaseRequest, DatabaseResponse, DatabaseClientRequest};
use tower::Service;
#[derive(Debug, Clone)]
/// `Databaseclient` is a struct shared across the daemon to interact with database reactor, and therefore the underlying database
pub struct DatabaseClient {
/// The channel used to send request to the reactor
db: mpsc::Sender<DatabaseClientRequest>,
/// A shared pointer to the reactor thread. Used to check if the reactor shutdowned properly.
reactor_thread: Arc<JoinHandle<()>>,
/// Shared handle to the reactor thread to check if the thread is stopped
reactor_thread: Arc<JoinHandle<()>>
}
impl DatabaseClient {
/// This function send a message to stop the reactor, and check if it shutdowned properly.
pub async fn shutdown(mut self) -> Result<(), ()> {
if let DatabaseResponse::Shutdowned = self
.call(DatabaseRequest::Shutdown)
.await
.map_err(|err| {})?
{
// A small delay is placed here to let the OS thread shutdown. The upper response is sent just before the end of the thread.
std::thread::sleep(Duration::from_millis(200));
if self.reactor_thread.is_finished() {
return Ok(())
}
}
Err(())
}
}
/// Implementing Tower service for the database client
impl tower::Service<DatabaseRequest> for DatabaseClient {
type Response = DatabaseResponse;
type Error = DBException;
type Error = DBException; // The reactor can sent back to the caller database errors, such as NotFound for example
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
/// `poll_ready` check if the channel is sempty and therefore is waiting to process a request
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.db
.poll_ready(cx)
.map_err(|_| DBException::Other("closed"))
}
/// `call` to send a request to the database
fn call(&mut self, req: DatabaseRequest) -> Self::Future {
// Generating result oneshot::channel
let (tx, rx) = oneshot::channel::<Result<Self::Response, Self::Error>>();
// get the callers span
let span = tracing::span::Span::current();
let req = DatabaseClientRequest { req, tx, span };
match self.db.try_send(req) {

View file

@ -21,7 +21,6 @@ pub enum DatabaseRequest {
Chain,
BlockHeight(Hash),
BlockKnown(Hash),
Shutdown,
}
/// `DatabaseResponse` is an enum listing all the response sent from the reactor, to answer their corresponding Request.
@ -32,7 +31,6 @@ pub enum DatabaseResponse {
Chain(Vec<Hash>),
BlockHeight(Option<u64>),
BlockKnown(BlockKnown),
Shutdowned
}
// Temporary

View file

@ -1,7 +1,7 @@
use std::{thread::JoinHandle, path::PathBuf, sync::{RwLock, Arc}};
use futures::{channel::{oneshot, mpsc::{self, Receiver}}, Future, FutureExt};
use cuprate_database::{database::{Database, Interface}, error::DB_FAILURES};
use cuprate_database::{database::{Database, Interface}, error::DBException};
use libmdbx::{NoWriteMap, WriteMap};
use tracing::{span, Level, event, Span};
use crate::{message::{DatabaseRequest, DatabaseResponse, DatabaseClientRequest}, DatabaseClient, thread::{WriteThread, ReadThread}};
@ -15,8 +15,6 @@ pub struct DatabaseReactor {
write_thread: Option<WriteThread>,
/// Vector of ReadThread
read_threads: Vec<ReadThread>,
/// The (theorical) on-going memory changes in the database
mm_size: u64,
/// The number of write being performed in the database
write_count: u64,
}

View file

@ -5,8 +5,8 @@
use std::{fmt::Debug, io::Write};
use monero::{Hash, PublicKey, util::ringct::Key, consensus::{Encodable, Decodable}};
use crate::{types::{BlockMetadata, TransactionPruned, OutputMetadata, TxIndex, AltBlock, TxOutputIdx}, BINCODE_CONFIG};
use super::{Encode, Error, Decode, compat::Compat, Buffer};
use crate::{types::{BlockMetadata, TransactionPruned, OutputMetadata, TxIndex, AltBlock, TxOutputIdx}};
use super::{Encode, Error, Decode, compat::Compat, Buffer, BINCODE_CONFIG};
/// A macro for idiomatic implementation of encode|decode for integer primitives
macro_rules! impl_encode_decode_integer {
@ -141,7 +141,7 @@ impl Encode for BlockMetadata {
buf[8..16].copy_from_slice(&self.total_coins_generated.to_le_bytes());
buf[16..24].copy_from_slice(&self.weight.to_le_bytes());
buf[24..40].copy_from_slice(&self.cumulative_difficulty.to_le_bytes());
buf[40..72].copy_from_slice(&self.block_hash.0.0); // < When Hash will be implemented pls remove the Compat this
buf[40..72].copy_from_slice(&self.block_hash.0);
buf[72..80].copy_from_slice(&self.cum_rct.to_le_bytes());
buf[80..88].copy_from_slice(&self.long_term_block_weight.to_le_bytes());
Ok(buf)
@ -159,7 +159,7 @@ impl Decode for BlockMetadata {
total_coins_generated: u64::from_le_bytes(src[8..16].try_into().map_err(Error::TryInto)?),
weight: u64::from_le_bytes(src[16..24].try_into().map_err(Error::TryInto)?),
cumulative_difficulty: u128::from_le_bytes(src[24..40].try_into().map_err(Error::TryInto)?),
block_hash: Compat(Hash::from_slice(src[40..72].try_into().map_err(Error::Infallible)?)), // < When Hash will be implemented pls remove the Compat this
block_hash: Hash::from_slice(src[40..72].try_into().map_err(Error::Infallible)?), // < When Hash will be implemented pls remove the Compat this
cum_rct: u64::from_le_bytes(src[72..80].try_into().map_err(Error::TryInto)?),
long_term_block_weight: u64::from_le_bytes(src[80..88].try_into().map_err(Error::TryInto)?),
}
@ -395,11 +395,11 @@ mod tests {
total_coins_generated: 17,
weight: 4300,
cumulative_difficulty: 965904,
block_hash: Compat(Hash([
block_hash: Hash([
0x5c, 0x5e, 0x69, 0xd8, 0xfc, 0x0d, 0x22, 0x6a, 0x60, 0x91, 0x47, 0xda, 0x98, 0x36,
0x06, 0x00, 0xf4, 0xea, 0x49, 0xcc, 0x49, 0x45, 0x2c, 0x5e, 0xf8, 0xba, 0x20, 0xf5,
0x93, 0xd4, 0x80, 0x7d,
])),
]),
cum_rct: 7,
long_term_block_weight: 30
};

View file

@ -7,6 +7,11 @@
use std::{fmt::Debug, array::TryFromSliceError, convert::Infallible, io};
use crate::table::Table;
const BINCODE_CONFIG: bincode::config::Configuration<
bincode::config::LittleEndian,
bincode::config::Fixint,
> = bincode::config::standard().with_fixed_int_encoding();
pub mod compat;
pub mod buffer;
pub mod implementation;
@ -55,7 +60,7 @@ pub enum Value<T: Table> {
/// variant of Value you should receive. But be careful on your code, or it might crash.
impl<'a, T: Table> Value<T> {
fn as_type(&'a self) -> &'a <T as Table>::Value {
pub fn as_type(&'a self) -> &'a <T as Table>::Value {
assert!(matches!(self, Value::Type(_))); // Hint for the compiler to check the boundaries
if let Value::Type(value) = self {
value
@ -64,7 +69,7 @@ impl<'a, T: Table> Value<T> {
}
}
fn as_raw(&'a self) -> &'a <<T as Table>::Value as Encode>::Output {
pub fn as_raw(&'a self) -> &'a <<T as Table>::Value as Encode>::Output {
assert!(matches!(self, Value::Raw(_))); // Hint for the compiler to check the boundaries
if let Value::Raw(raw) = self {
raw

View file

@ -20,60 +20,143 @@ impl<'thread, D: Database<'thread>> ReadInterface<'thread> for Interface<'thread
fn block_exists(&'thread self, hash: &Hash) -> Result<bool, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockhash = ro_tx.cursor_dup::<table::blockhash>()?;
let mut cursor_blockhash = ro_tx.cursor_dup::<table::blockhash>()?;
Ok(cursor_blockhash.get_dup::<true>(&(), hash)?.is_some())
}
fn get_block_hash(&'thread self, height: &u64) -> Result<Hash, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().block_hash)
}
fn get_block_height(&'thread self, hash: &Hash) -> Result<u64, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockhash = ro_tx.cursor_dup::<table::blockhash>()?;
cursor_blockhash
.get_dup::<false>(&(), hash)?
.ok_or(DBException::NotFound(format!("Failed to find height of block: {}", hash)))
.map(|res| *res.as_type())
}
fn get_block_weight(&'thread self, height: &u64) -> Result<u64, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().weight)
}
fn get_block_already_generated_coins(&'thread self, height: &u64) -> Result<u64, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().total_coins_generated)
}
fn get_block_long_term_weight(&'thread self, height: &u64) -> Result<u64, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().long_term_block_weight)
}
fn get_block_timestamp(&'thread self, height: &u64) -> Result<u64, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().timestamp)
}
fn get_block_cumulative_rct_outputs(&'thread self, height: &u64) -> Result<u64, DBException> {
todo!()
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().cum_rct)
}
fn get_block_cumulative_difficulty(&'thread self, height: &u64) -> Result<u128, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let metadata = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?;
Ok(metadata.as_type().cumulative_difficulty)
}
fn get_block_difficulty(&'thread self, height: &u64) -> Result<u128, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
let mut cursor_blockmetadata = ro_tx.cursor_dup::<table::blockmetadata>()?;
let diff1 = cursor_blockmetadata
.get_dup::<false>(&(), height)?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?
.as_type()
.cumulative_difficulty;
let diff2 = cursor_blockmetadata
.get_dup::<false>(&(), &(height-1))?
.ok_or(DBException::NotFound(format!("Failed to find block's metadata at height : {}", height)))?
.as_type()
.cumulative_difficulty;
Ok(diff1-diff2)
}
fn get_block<const B: bool>(&'thread self, hash: &Hash) -> Result<Block, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
todo!()
}
fn get_block_from_height<const B: bool>(&'thread self, height: &u64) -> Result<Block, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
todo!()
}
fn get_block_header(&'thread self, hash: &Hash) -> Result<BlockHeader, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
todo!()
}
fn get_block_header_from_height(&'thread self, height: &u64) -> Result<BlockHeader, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
todo!()
}
fn get_top_block(&'thread self) -> Result<Block, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
todo!()
}
fn get_top_block_hash(&'thread self) -> Result<Hash, DBException> {
let ro_tx = self.db.tx().map_err(Into::into)?;
todo!()
}

View file

@ -38,6 +38,12 @@ pub trait ReadInterface<'thread> {
/// `get_block_cumulative_rct_outputs` fetch the amount of RingCT outputs at the given block's height
fn get_block_cumulative_rct_outputs(&'thread self, height: &u64) -> Result<u64, DBException>;
/// `get_block_cumulative_difficulty` fetch the requested block's cumulative difficulty (with its given height)
fn get_block_cumulative_difficulty(&'thread self, height: &u64) -> Result<u128, DBException>;
/// `get_block_difficulty` fetch the requested block's cumulative difficulty (with its given height)
fn get_block_difficulty(&'thread self, height: &u64) -> Result<u128, DBException>;
/// `get_block` fetch the requested block (with its given hash)
fn get_block<const B: bool>(&'thread self, hash: &Hash) -> Result<Block, DBException>;

View file

@ -14,7 +14,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! The cuprate-db crate implement (as its name suggests) the relations between the blockchain/txpool objects and their databases.
//! `lib.rs` contains all the generics, trait and specification for interfaces between blockchain and a backend-agnostic database
//! `lib.rs` contains all the generics, trait and specification for interfaces between the reactor and a database
//! Every other files in this folder are implementation of these traits/methods to real storage engine.
//!
//! At the moment, the only storage engine available is MDBX.
@ -40,10 +40,7 @@ pub mod types;
const DEFAULT_BLOCKCHAIN_DATABASE_DIRECTORY: &str = "blockchain";
const DEFAULT_TXPOOL_DATABASE_DIRECTORY: &str = "txpool_mem";
const BINCODE_CONFIG: bincode::config::Configuration<
bincode::config::LittleEndian,
bincode::config::Fixint,
> = bincode::config::standard().with_fixed_int_encoding();
// ------------------------------------------| Errors |------------------------------------------

View file

@ -26,6 +26,7 @@ pub trait Table: Send + Sync + 'static + Clone {
pub trait DupTable: Table {
// Subkey of the table (prefix of the data)
// Warning: it should be a type with constant-size
type SubKey: Encode + Decode + Ord;
}

View file

@ -15,7 +15,7 @@ use monero::{
// ---- BLOCKS ----
#[derive(Clone, Debug, Encode, Decode, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
/// [`BlockMetadata`] is a struct containing metadata of a block such as the block's `timestamp`, the `total_coins_generated` at this height, its `weight`, its difficulty (`diff_lo`)
/// and cumulative difficulty (`diff_hi`), the `block_hash`, the cumulative RingCT (`cum_rct`) and its long term weight (`long_term_block_weight`). The monerod's struct equivalent is `mdb_block_info_4`
/// This struct is used in [`crate::table::blockmetadata`] table.
@ -29,7 +29,7 @@ pub struct BlockMetadata {
/// Block's cumulative_difficulty. In monerod this field would have been split into two `u64`, since cpp don't support *natively* `uint128_t`/`u128`
pub cumulative_difficulty: u128,
/// Block's hash
pub block_hash: Compat<Hash>,
pub block_hash: Hash,
/// Cumulative number of RingCT outputs up to this block
pub cum_rct: u64,
/// Block's long term weight