diff --git a/common/db/src/rocks.rs b/common/db/src/rocks.rs index 20d39666..6a724563 100644 --- a/common/db/src/rocks.rs +++ b/common/db/src/rocks.rs @@ -1,44 +1,51 @@ use std::sync::Arc; use rocksdb::{ - DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, Transaction, Options, - TransactionDB, + DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, + Transaction as RocksTransaction, Options, OptimisticTransactionDB, }; use crate::*; -impl Get for Transaction<'_, TransactionDB> { +pub struct Transaction<'a, T: ThreadMode>( + RocksTransaction<'a, OptimisticTransactionDB>, + &'a OptimisticTransactionDB, +); + +impl Get for Transaction<'_, T> { fn get(&self, key: impl AsRef<[u8]>) -> Option> { - self.get(key).expect("couldn't read from RocksDB via transaction") + self.0.get(key).expect("couldn't read from RocksDB via transaction") } } -impl DbTxn for Transaction<'_, TransactionDB> { +impl DbTxn for Transaction<'_, T> { fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { - Transaction::put(self, key, value).expect("couldn't write to RocksDB via transaction") + self.0.put(key, value).expect("couldn't write to RocksDB via transaction") } fn del(&mut self, key: impl AsRef<[u8]>) { - self.delete(key).expect("couldn't delete from RocksDB via transaction") + self.0.delete(key).expect("couldn't delete from RocksDB via transaction") } fn commit(self) { - Transaction::commit(self).expect("couldn't commit to RocksDB via transaction") + self.0.commit().expect("couldn't commit to RocksDB via transaction"); + self.1.flush_wal(true).expect("couldn't flush RocksDB WAL"); + self.1.flush().expect("couldn't flush RocksDB"); } } -impl Get for Arc> { +impl Get for Arc> { fn get(&self, key: impl AsRef<[u8]>) -> Option> { - TransactionDB::get(self, key).expect("couldn't read from RocksDB") + OptimisticTransactionDB::get(self, key).expect("couldn't read from RocksDB") } } -impl Db for Arc> { - type Transaction<'a> = Transaction<'a, TransactionDB>; +impl Db for Arc> { + type Transaction<'a> = Transaction<'a, T>; fn txn(&mut self) -> Self::Transaction<'_> { let mut opts = WriteOptions::default(); opts.set_sync(true); - self.transaction_opt(&opts, &Default::default()) + Transaction(self.transaction_opt(&opts, &Default::default()), &**self) } } -pub type RocksDB = Arc>; +pub type RocksDB = Arc>; pub fn new_rocksdb(path: &str) -> RocksDB { let mut options = Options::default(); options.create_if_missing(true); @@ -54,5 +61,5 @@ pub fn new_rocksdb(path: &str) -> RocksDB { options.set_max_log_file_size(1024 * 1024); options.set_recycle_log_file_num(1); - Arc::new(TransactionDB::open(&options, &Default::default(), path).unwrap()) + Arc::new(OptimisticTransactionDB::open(&options, path).unwrap()) }