diff --git a/common/db/src/rocks.rs b/common/db/src/rocks.rs index 6a724563..9948ed3b 100644 --- a/common/db/src/rocks.rs +++ b/common/db/src/rocks.rs @@ -1,33 +1,40 @@ -use std::sync::Arc; +use std::{sync::Arc, collections::HashSet}; use rocksdb::{ DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, - Transaction as RocksTransaction, Options, OptimisticTransactionDB, + Transaction as RocksTransaction, Options, OptimisticTransactionDB, SnapshotWithThreadMode, }; use crate::*; -pub struct Transaction<'a, T: ThreadMode>( - RocksTransaction<'a, OptimisticTransactionDB<T>>, - &'a OptimisticTransactionDB<T>, -); +pub struct Transaction<'a, T: ThreadMode> { + dirtied_keys: HashSet<Vec<u8>>, + txn: RocksTransaction<'a, OptimisticTransactionDB<T>>, + snapshot: SnapshotWithThreadMode<'a, OptimisticTransactionDB<T>>, + db: &'a OptimisticTransactionDB<T>, +} impl<T: ThreadMode> Get for Transaction<'_, T> { fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> { - self.0.get(key).expect("couldn't read from RocksDB via transaction") + if self.dirtied_keys.contains(key.as_ref()) { + return self.txn.get(key).expect("couldn't read from RocksDB via transaction"); + } + self.snapshot.get(key).expect("couldn't read from RocksDB via snapshot") } } impl<T: ThreadMode> DbTxn for Transaction<'_, T> { fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { - self.0.put(key, value).expect("couldn't write to RocksDB via transaction") + self.dirtied_keys.insert(key.as_ref().to_vec()); + self.txn.put(key, value).expect("couldn't write to RocksDB via transaction") } fn del(&mut self, key: impl AsRef<[u8]>) { - self.0.delete(key).expect("couldn't delete from RocksDB via transaction") + self.dirtied_keys.insert(key.as_ref().to_vec()); + self.txn.delete(key).expect("couldn't delete from RocksDB via transaction") } fn commit(self) { - self.0.commit().expect("couldn't commit to RocksDB via transaction"); - self.1.flush_wal(true).expect("couldn't flush RocksDB WAL"); - self.1.flush().expect("couldn't flush RocksDB"); + self.txn.commit().expect("couldn't commit to RocksDB via transaction"); + self.db.flush_wal(true).expect("couldn't flush RocksDB WAL"); + self.db.flush().expect("couldn't flush RocksDB"); } } @@ -41,7 +48,12 @@ impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> { fn txn(&mut self) -> Self::Transaction<'_> { let mut opts = WriteOptions::default(); opts.set_sync(true); - Transaction(self.transaction_opt(&opts, &Default::default()), &**self) + Transaction { + dirtied_keys: HashSet::new(), + txn: self.transaction_opt(&opts, &Default::default()), + snapshot: self.snapshot(), + db: &**self, + } } }