Bind RocksDB into serai-db

This commit is contained in:
Luke Parker 2023-07-13 19:09:11 -04:00
parent c9bb284570
commit 62504b2622
No known key found for this signature in database
5 changed files with 133 additions and 83 deletions

5
Cargo.lock generated
View file

@ -4668,7 +4668,9 @@ dependencies = [
"glob", "glob",
"libc", "libc",
"libz-sys", "libz-sys",
"lz4-sys",
"tikv-jemalloc-sys", "tikv-jemalloc-sys",
"zstd-sys",
] ]
[[package]] [[package]]
@ -8633,6 +8635,9 @@ dependencies = [
[[package]] [[package]]
name = "serai-db" name = "serai-db"
version = "0.1.0" version = "0.1.0"
dependencies = [
"rocksdb",
]
[[package]] [[package]]
name = "serai-in-instructions-pallet" name = "serai-in-instructions-pallet"

View file

@ -11,3 +11,9 @@ edition = "2021"
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true
rustdoc-args = ["--cfg", "docsrs"] rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
rocksdb = { version = "0.21", optional = true }
[features]
rocksdb = ["dep:rocksdb"]

View file

@ -1,24 +1,24 @@
use core::fmt::Debug; mod mem;
use std::{ pub use mem::*;
sync::{Arc, RwLock},
collections::{HashSet, HashMap}, #[cfg(feature = "rocksdb")]
}; mod rocks;
/// An object implementing get. /// An object implementing get.
pub trait Get: Send + Sync + Debug { pub trait Get {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>>; fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>>;
} }
/// An atomic database operation. /// An atomic database operation.
#[must_use] #[must_use]
pub trait DbTxn: Send + Sync + Debug + Get { pub trait DbTxn: Send + Get {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>); fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>);
fn del(&mut self, key: impl AsRef<[u8]>); fn del(&mut self, key: impl AsRef<[u8]>);
fn commit(self); fn commit(self);
} }
/// A database supporting atomic operations. /// A database supporting atomic operations.
pub trait Db: 'static + Send + Sync + Clone + Debug + Get { pub trait Db: 'static + Send + Sync + Clone + Get {
type Transaction<'a>: DbTxn; type Transaction<'a>: DbTxn;
fn key(db_dst: &'static [u8], item_dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> { fn key(db_dst: &'static [u8], item_dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
let db_len = u8::try_from(db_dst.len()).unwrap(); let db_len = u8::try_from(db_dst.len()).unwrap();
@ -27,78 +27,3 @@ pub trait Db: 'static + Send + Sync + Clone + Debug + Get {
} }
fn txn(&mut self) -> Self::Transaction<'_>; fn txn(&mut self) -> Self::Transaction<'_>;
} }
/// An atomic operation for the in-memory databae.
#[must_use]
#[derive(PartialEq, Eq, Debug)]
pub struct MemDbTxn<'a>(&'a MemDb, HashMap<Vec<u8>, Vec<u8>>, HashSet<Vec<u8>>);
impl<'a> Get for MemDbTxn<'a> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
if self.2.contains(key.as_ref()) {
return None;
}
self
.1
.get(key.as_ref())
.cloned()
.or_else(|| self.0 .0.read().unwrap().get(key.as_ref()).cloned())
}
}
impl<'a> DbTxn for MemDbTxn<'a> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.2.remove(key.as_ref());
self.1.insert(key.as_ref().to_vec(), value.as_ref().to_vec());
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.1.remove(key.as_ref());
self.2.insert(key.as_ref().to_vec());
}
fn commit(mut self) {
let mut db = self.0 .0.write().unwrap();
for (key, value) in self.1.drain() {
db.insert(key, value);
}
for key in self.2 {
db.remove(&key);
}
}
}
/// An in-memory database.
#[derive(Clone, Debug)]
pub struct MemDb(Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>);
impl PartialEq for MemDb {
fn eq(&self, other: &MemDb) -> bool {
*self.0.read().unwrap() == *other.0.read().unwrap()
}
}
impl Eq for MemDb {}
impl Default for MemDb {
fn default() -> MemDb {
MemDb(Arc::new(RwLock::new(HashMap::new())))
}
}
impl MemDb {
/// Create a new in-memory database.
pub fn new() -> MemDb {
MemDb::default()
}
}
impl Get for MemDb {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.read().unwrap().get(key.as_ref()).cloned()
}
}
impl Db for MemDb {
type Transaction<'a> = MemDbTxn<'a>;
fn txn(&mut self) -> MemDbTxn<'_> {
MemDbTxn(self, HashMap::new(), HashSet::new())
}
}
// TODO: Also bind RocksDB

80
common/db/src/mem.rs Normal file
View file

@ -0,0 +1,80 @@
use core::fmt::Debug;
use std::{
sync::{Arc, RwLock},
collections::{HashSet, HashMap},
};
use crate::*;
/// An atomic operation for the in-memory databae.
#[must_use]
#[derive(PartialEq, Eq, Debug)]
pub struct MemDbTxn<'a>(&'a MemDb, HashMap<Vec<u8>, Vec<u8>>, HashSet<Vec<u8>>);
impl<'a> Get for MemDbTxn<'a> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
if self.2.contains(key.as_ref()) {
return None;
}
self
.1
.get(key.as_ref())
.cloned()
.or_else(|| self.0 .0.read().unwrap().get(key.as_ref()).cloned())
}
}
impl<'a> DbTxn for MemDbTxn<'a> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.2.remove(key.as_ref());
self.1.insert(key.as_ref().to_vec(), value.as_ref().to_vec());
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.1.remove(key.as_ref());
self.2.insert(key.as_ref().to_vec());
}
fn commit(mut self) {
let mut db = self.0 .0.write().unwrap();
for (key, value) in self.1.drain() {
db.insert(key, value);
}
for key in self.2 {
db.remove(&key);
}
}
}
/// An in-memory database.
#[derive(Clone, Debug)]
pub struct MemDb(Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>);
impl PartialEq for MemDb {
fn eq(&self, other: &MemDb) -> bool {
*self.0.read().unwrap() == *other.0.read().unwrap()
}
}
impl Eq for MemDb {}
impl Default for MemDb {
fn default() -> MemDb {
MemDb(Arc::new(RwLock::new(HashMap::new())))
}
}
impl MemDb {
/// Create a new in-memory database.
pub fn new() -> MemDb {
MemDb::default()
}
}
impl Get for MemDb {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.read().unwrap().get(key.as_ref()).cloned()
}
}
impl Db for MemDb {
type Transaction<'a> = MemDbTxn<'a>;
fn txn(&mut self) -> MemDbTxn<'_> {
MemDbTxn(self, HashMap::new(), HashSet::new())
}
}

34
common/db/src/rocks.rs Normal file
View file

@ -0,0 +1,34 @@
use std::sync::Arc;
use rocksdb::{ThreadMode, Transaction, TransactionDB};
use crate::*;
impl<T: ThreadMode> Get for Transaction<'_, TransactionDB<T>> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.get(key).expect("couldn't read from RocksDB via transaction")
}
}
impl<T: ThreadMode> DbTxn for Transaction<'_, TransactionDB<T>> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
Transaction::put(self, key, value).expect("couldn't write to RocksDB via transaction")
}
fn del(&mut self, key: impl AsRef<[u8]>) {
self.delete(key).expect("couldn't delete from RocksDB via transaction")
}
fn commit(self) {
Transaction::commit(self).expect("couldn't commit to RocksDB via transaction")
}
}
impl<T: ThreadMode> Get for Arc<TransactionDB<T>> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
TransactionDB::get(self, key).expect("couldn't read from RocksDB")
}
}
impl<T: ThreadMode + 'static> Db for Arc<TransactionDB<T>> {
type Transaction<'a> = Transaction<'a, TransactionDB<T>>;
fn txn(&mut self) -> Self::Transaction<'_> {
self.transaction()
}
}