Use OptimisticTransactionDb

Exposes flush calls.

Adds safety, at the cost of a panic risk, as multiple TXNs simultaneously
writing to a key will now cause a panic. This should be fine and the safety is
appreciated.
This commit is contained in:
Luke Parker 2024-03-20 21:44:53 -04:00
parent 1f2b9376f9
commit c706d8664a

View file

@ -1,44 +1,51 @@
use std::sync::Arc; use std::sync::Arc;
use rocksdb::{ use rocksdb::{
DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, Transaction, Options, DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions,
TransactionDB, Transaction as RocksTransaction, Options, OptimisticTransactionDB,
}; };
use crate::*; use crate::*;
impl<T: ThreadMode> Get for Transaction<'_, TransactionDB<T>> { pub struct Transaction<'a, T: ThreadMode>(
RocksTransaction<'a, OptimisticTransactionDB<T>>,
&'a OptimisticTransactionDB<T>,
);
impl<T: ThreadMode> Get for Transaction<'_, T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> { fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.get(key).expect("couldn't read from RocksDB via transaction") self.0.get(key).expect("couldn't read from RocksDB via transaction")
} }
} }
impl<T: ThreadMode> DbTxn for Transaction<'_, TransactionDB<T>> { impl<T: ThreadMode> DbTxn for Transaction<'_, T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
Transaction::put(self, key, value).expect("couldn't write to RocksDB via transaction") self.0.put(key, value).expect("couldn't write to RocksDB via transaction")
} }
fn del(&mut self, key: impl AsRef<[u8]>) { fn del(&mut self, key: impl AsRef<[u8]>) {
self.delete(key).expect("couldn't delete from RocksDB via transaction") self.0.delete(key).expect("couldn't delete from RocksDB via transaction")
} }
fn commit(self) { fn commit(self) {
Transaction::commit(self).expect("couldn't commit to RocksDB via transaction") self.0.commit().expect("couldn't commit to RocksDB via transaction");
self.1.flush_wal(true).expect("couldn't flush RocksDB WAL");
self.1.flush().expect("couldn't flush RocksDB");
} }
} }
impl<T: ThreadMode> Get for Arc<TransactionDB<T>> { impl<T: ThreadMode> Get for Arc<OptimisticTransactionDB<T>> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> { fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
TransactionDB::get(self, key).expect("couldn't read from RocksDB") OptimisticTransactionDB::get(self, key).expect("couldn't read from RocksDB")
} }
} }
impl<T: ThreadMode + 'static> Db for Arc<TransactionDB<T>> { impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
type Transaction<'a> = Transaction<'a, TransactionDB<T>>; type Transaction<'a> = Transaction<'a, T>;
fn txn(&mut self) -> Self::Transaction<'_> { fn txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default(); let mut opts = WriteOptions::default();
opts.set_sync(true); opts.set_sync(true);
self.transaction_opt(&opts, &Default::default()) Transaction(self.transaction_opt(&opts, &Default::default()), &**self)
} }
} }
pub type RocksDB = Arc<TransactionDB<SingleThreaded>>; pub type RocksDB = Arc<OptimisticTransactionDB<SingleThreaded>>;
pub fn new_rocksdb(path: &str) -> RocksDB { pub fn new_rocksdb(path: &str) -> RocksDB {
let mut options = Options::default(); let mut options = Options::default();
options.create_if_missing(true); options.create_if_missing(true);
@ -54,5 +61,5 @@ pub fn new_rocksdb(path: &str) -> RocksDB {
options.set_max_log_file_size(1024 * 1024); options.set_max_log_file_size(1024 * 1024);
options.set_recycle_log_file_num(1); options.set_recycle_log_file_num(1);
Arc::new(TransactionDB::open(&options, &Default::default(), path).unwrap()) Arc::new(OptimisticTransactionDB::open(&options, path).unwrap())
} }