Create a Snapshot within RocksDB when starting a transaction

This ensures if a TXN is reading a value, and another TXN mutates it, the TXN's
actions are atomic to the first value (and not affected by the inconsistency).

Because we can't replicate this with parity-db, I'm not sure this is worth
committing.
This commit is contained in:
Luke Parker 2024-03-31 10:20:59 -04:00
parent 93be7a3067
commit 8404844c4e
No known key found for this signature in database

View file

@ -1,33 +1,40 @@
use std::sync::Arc; use std::{sync::Arc, collections::HashSet};
use rocksdb::{ use rocksdb::{
DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions,
Transaction as RocksTransaction, Options, OptimisticTransactionDB, Transaction as RocksTransaction, Options, OptimisticTransactionDB, SnapshotWithThreadMode,
}; };
use crate::*; use crate::*;
pub struct Transaction<'a, T: ThreadMode>( pub struct Transaction<'a, T: ThreadMode> {
RocksTransaction<'a, OptimisticTransactionDB<T>>, dirtied_keys: HashSet<Vec<u8>>,
&'a OptimisticTransactionDB<T>, txn: RocksTransaction<'a, OptimisticTransactionDB<T>>,
); snapshot: SnapshotWithThreadMode<'a, OptimisticTransactionDB<T>>,
db: &'a OptimisticTransactionDB<T>,
}
impl<T: ThreadMode> Get for Transaction<'_, T> { impl<T: ThreadMode> Get for Transaction<'_, T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> { fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.get(key).expect("couldn't read from RocksDB via transaction") if self.dirtied_keys.contains(key.as_ref()) {
return self.txn.get(key).expect("couldn't read from RocksDB via transaction");
}
self.snapshot.get(key).expect("couldn't read from RocksDB via snapshot")
} }
} }
impl<T: ThreadMode> DbTxn for Transaction<'_, T> { impl<T: ThreadMode> DbTxn for Transaction<'_, T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.0.put(key, value).expect("couldn't write to RocksDB via transaction") self.dirtied_keys.insert(key.as_ref().to_vec());
self.txn.put(key, value).expect("couldn't write to RocksDB via transaction")
} }
fn del(&mut self, key: impl AsRef<[u8]>) { fn del(&mut self, key: impl AsRef<[u8]>) {
self.0.delete(key).expect("couldn't delete from RocksDB via transaction") self.dirtied_keys.insert(key.as_ref().to_vec());
self.txn.delete(key).expect("couldn't delete from RocksDB via transaction")
} }
fn commit(self) { fn commit(self) {
self.0.commit().expect("couldn't commit to RocksDB via transaction"); self.txn.commit().expect("couldn't commit to RocksDB via transaction");
self.1.flush_wal(true).expect("couldn't flush RocksDB WAL"); self.db.flush_wal(true).expect("couldn't flush RocksDB WAL");
self.1.flush().expect("couldn't flush RocksDB"); self.db.flush().expect("couldn't flush RocksDB");
} }
} }
@ -41,7 +48,12 @@ impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
fn txn(&mut self) -> Self::Transaction<'_> { fn txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default(); let mut opts = WriteOptions::default();
opts.set_sync(true); opts.set_sync(true);
Transaction(self.transaction_opt(&opts, &Default::default()), &**self) Transaction {
dirtied_keys: HashSet::new(),
txn: self.transaction_opt(&opts, &Default::default()),
snapshot: self.snapshot(),
db: &**self,
}
} }
} }