From 8404844c4e4ae2687d1bc44611f8e9f8c3ffeaed Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 31 Mar 2024 10:20:59 -0400 Subject: [PATCH] Create a Snapshot within RocksDB when starting a transaction This ensures if a TXN is reading a value, and another TXN mutates it, the TXN's actions are atomic to the first value (and not affected by the inconsistency). Because we can't replicate this with parity-db, I'm not sure this is worth committing. --- common/db/src/rocks.rs | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/common/db/src/rocks.rs b/common/db/src/rocks.rs index 6a724563..9948ed3b 100644 --- a/common/db/src/rocks.rs +++ b/common/db/src/rocks.rs @@ -1,33 +1,40 @@ -use std::sync::Arc; +use std::{sync::Arc, collections::HashSet}; use rocksdb::{ DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, - Transaction as RocksTransaction, Options, OptimisticTransactionDB, + Transaction as RocksTransaction, Options, OptimisticTransactionDB, SnapshotWithThreadMode, }; use crate::*; -pub struct Transaction<'a, T: ThreadMode>( - RocksTransaction<'a, OptimisticTransactionDB>, - &'a OptimisticTransactionDB, -); +pub struct Transaction<'a, T: ThreadMode> { + dirtied_keys: HashSet>, + txn: RocksTransaction<'a, OptimisticTransactionDB>, + snapshot: SnapshotWithThreadMode<'a, OptimisticTransactionDB>, + db: &'a OptimisticTransactionDB, +} impl Get for Transaction<'_, T> { fn get(&self, key: impl AsRef<[u8]>) -> Option> { - self.0.get(key).expect("couldn't read from RocksDB via transaction") + if self.dirtied_keys.contains(key.as_ref()) { + return self.txn.get(key).expect("couldn't read from RocksDB via transaction"); + } + self.snapshot.get(key).expect("couldn't read from RocksDB via snapshot") } } impl DbTxn for Transaction<'_, T> { fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { - self.0.put(key, value).expect("couldn't write to RocksDB via transaction") + self.dirtied_keys.insert(key.as_ref().to_vec()); + self.txn.put(key, value).expect("couldn't write to RocksDB via transaction") } fn del(&mut self, key: impl AsRef<[u8]>) { - self.0.delete(key).expect("couldn't delete from RocksDB via transaction") + self.dirtied_keys.insert(key.as_ref().to_vec()); + self.txn.delete(key).expect("couldn't delete from RocksDB via transaction") } fn commit(self) { - self.0.commit().expect("couldn't commit to RocksDB via transaction"); - self.1.flush_wal(true).expect("couldn't flush RocksDB WAL"); - self.1.flush().expect("couldn't flush RocksDB"); + self.txn.commit().expect("couldn't commit to RocksDB via transaction"); + self.db.flush_wal(true).expect("couldn't flush RocksDB WAL"); + self.db.flush().expect("couldn't flush RocksDB"); } } @@ -41,7 +48,12 @@ impl Db for Arc> { fn txn(&mut self) -> Self::Transaction<'_> { let mut opts = WriteOptions::default(); opts.set_sync(true); - Transaction(self.transaction_opt(&opts, &Default::default()), &**self) + Transaction { + dirtied_keys: HashSet::new(), + txn: self.transaction_opt(&opts, &Default::default()), + snapshot: self.snapshot(), + db: &**self, + } } }