diff --git a/common/db/src/lib.rs b/common/db/src/lib.rs index 7c8bae2b..1ff5bb82 100644 --- a/common/db/src/lib.rs +++ b/common/db/src/lib.rs @@ -29,8 +29,8 @@ pub trait Db: 'static + Send + Sync + Clone + Debug + Get { } /// An atomic operation for the in-memory databae. -#[derive(Debug)] #[must_use] +#[derive(PartialEq, Eq, Debug)] pub struct MemDbTxn<'a>(&'a MemDb, HashMap, Vec>, HashSet>); impl<'a> Get for MemDbTxn<'a> { @@ -65,6 +65,13 @@ impl<'a> DbTxn for MemDbTxn<'a> { #[derive(Clone, Debug)] pub struct MemDb(Arc, Vec>>>); +impl PartialEq for MemDb { + fn eq(&self, other: &MemDb) -> bool { + *self.0.read().unwrap() == *other.0.read().unwrap() + } +} +impl Eq for MemDb {} + impl Default for MemDb { fn default() -> MemDb { MemDb(Arc::new(RwLock::new(HashMap::new()))) diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index c7d5dae9..91a8a2b3 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -19,27 +19,27 @@ pub(crate) struct Blockchain { next_nonces: HashMap<::G, u32>, provided: ProvidedTransactions, - mempool: Mempool, + mempool: Mempool, } impl Blockchain { fn tip_key(&self) -> Vec { - D::key(b"tributary", b"tip", self.genesis) + D::key(b"tributary_blockchain", b"tip", self.genesis) } fn block_number_key(&self) -> Vec { - D::key(b"tributary", b"block_number", self.genesis) + D::key(b"tributary_blockchain", b"block_number", self.genesis) } fn block_key(&self, hash: &[u8; 32]) -> Vec { // Since block hashes incorporate their parent, and the first parent is the genesis, this is // fine not incorporating the hash unless there's a hash collision - D::key(b"tributary", b"block", hash) + D::key(b"tributary_blockchain", b"block", hash) } fn commit_key(&self, hash: &[u8; 32]) -> Vec { - D::key(b"tributary", b"commit", hash) + D::key(b"tributary_blockchain", b"commit", hash) } fn next_nonce_key(&self, signer: &::G) -> Vec { D::key( - b"tributary", + b"tributary_blockchain", b"next_nonce", [self.genesis.as_ref(), signer.to_bytes().as_ref()].concat(), ) @@ -50,8 +50,6 @@ impl Blockchain { genesis: [u8; 32], participants: &[::G], ) -> Self { - // TODO: Reload mempool - let mut next_nonces = HashMap::new(); for participant in participants { next_nonces.insert(*participant, 0); @@ -65,8 +63,8 @@ impl Blockchain { tip: genesis, next_nonces, - provided: ProvidedTransactions::new(db, genesis), - mempool: Mempool::new(genesis), + provided: ProvidedTransactions::new(db.clone(), genesis), + mempool: Mempool::new(db, genesis), }; if let Some((block_number, tip)) = { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index ec386f80..f5427431 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -98,11 +98,11 @@ impl Tributary { key: Zeroizing<::F>, validators: HashMap<::G, u64>, p2p: P, - ) -> Self { + ) -> Option { let validators_vec = validators.keys().cloned().collect::>(); let signer = Arc::new(Signer::new(genesis, key)); - let validators = Arc::new(Validators::new(genesis, validators)); + let validators = Arc::new(Validators::new(genesis, validators)?); let mut blockchain = Blockchain::new(db, genesis, &validators_vec); let block_number = blockchain.block_number(); @@ -123,7 +123,7 @@ impl Tributary { TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; tokio::task::spawn(machine.run()); - Self { network, synced_block, messages } + Some(Self { network, synced_block, messages }) } pub fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> { diff --git a/coordinator/tributary/src/mempool.rs b/coordinator/tributary/src/mempool.rs index fd923293..6f67b2c5 100644 --- a/coordinator/tributary/src/mempool.rs +++ b/coordinator/tributary/src/mempool.rs @@ -2,18 +2,54 @@ use std::collections::HashMap; use ciphersuite::{Ciphersuite, Ristretto}; +use serai_db::{DbTxn, Db}; + use crate::{ACCOUNT_MEMPOOL_LIMIT, Signed, TransactionKind, Transaction, verify_transaction}; #[derive(Clone, PartialEq, Eq, Debug)] -pub(crate) struct Mempool { +pub(crate) struct Mempool { + db: D, genesis: [u8; 32], + txs: HashMap<[u8; 32], T>, next_nonces: HashMap<::G, u32>, } -impl Mempool { - pub(crate) fn new(genesis: [u8; 32]) -> Self { - Mempool { genesis, txs: HashMap::new(), next_nonces: HashMap::new() } +impl Mempool { + fn transaction_key(&self, hash: &[u8]) -> Vec { + D::key(b"tributary_mempool", b"transaction", [self.genesis.as_ref(), hash].concat()) + } + fn current_mempool_key(&self) -> Vec { + D::key(b"tributary_mempool", b"current", self.genesis) + } + + pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self { + let mut res = Mempool { db, genesis, txs: HashMap::new(), next_nonces: HashMap::new() }; + + let current_mempool = res.db.get(res.current_mempool_key()).unwrap_or(vec![]); + let mut hash = [0; 32]; + let mut i = 0; + while i < current_mempool.len() { + hash.copy_from_slice(¤t_mempool[i .. (i + 32)]); + let tx = + T::read::<&[u8]>(&mut res.db.get(res.transaction_key(&hash)).unwrap().as_ref()).unwrap(); + + match tx.kind() { + TransactionKind::Signed(Signed { signer, nonce, .. }) => { + if let Some(prev) = res.next_nonces.insert(*signer, nonce + 1) { + // These mempool additions should've been ordered + assert!(prev < *nonce); + } + } + _ => panic!("mempool database had a non-signed transaction"), + } + + debug_assert_eq!(tx.hash(), hash); + res.txs.insert(hash, tx); + i += 32; + } + + res } /// Returns true if this is a valid, new transaction. @@ -53,7 +89,20 @@ impl Mempool { } assert_eq!(self.next_nonces[signer], nonce + 1); - self.txs.insert(tx.hash(), tx); + let tx_hash = tx.hash(); + + let transaction_key = self.transaction_key(&tx_hash); + let current_mempool_key = self.current_mempool_key(); + let mut current_mempool = self.db.get(¤t_mempool_key).unwrap_or(vec![]); + + let mut txn = self.db.txn(); + txn.put(transaction_key, tx.serialize()); + current_mempool.extend(tx_hash); + txn.put(current_mempool_key, current_mempool); + txn.commit(); + + self.txs.insert(tx_hash, tx); + true } _ => false, @@ -77,7 +126,7 @@ impl Mempool { match tx.kind() { TransactionKind::Signed(Signed { signer, nonce, .. }) => { if blockchain_next_nonces[signer] > *nonce { - self.txs.remove(&hash); + self.remove(&hash); continue; } } @@ -103,6 +152,27 @@ impl Mempool { /// Remove a transaction from the mempool. pub(crate) fn remove(&mut self, tx: &[u8; 32]) { + let transaction_key = self.transaction_key(tx); + let current_mempool_key = self.current_mempool_key(); + let current_mempool = self.db.get(¤t_mempool_key).unwrap_or(vec![]); + + let mut i = 0; + while i < current_mempool.len() { + if ¤t_mempool[i .. (i + 32)] == tx { + break; + } + i += 32; + } + + // This doesn't have to be atomic with any greater operation + let mut txn = self.db.txn(); + txn.del(transaction_key); + if i != current_mempool.len() { + txn + .put(current_mempool_key, [¤t_mempool[.. i], ¤t_mempool[(i + 32) ..]].concat()); + } + txn.commit(); + self.txs.remove(tx); } diff --git a/coordinator/tributary/src/provided.rs b/coordinator/tributary/src/provided.rs index 957034f1..2a9f5602 100644 --- a/coordinator/tributary/src/provided.rs +++ b/coordinator/tributary/src/provided.rs @@ -28,22 +28,26 @@ pub struct ProvidedTransactions { } impl ProvidedTransactions { - fn provided_key(&self, hash: &[u8]) -> Vec { - D::key(b"tributary", b"provided", [self.genesis.as_ref(), hash].concat()) + fn transaction_key(&self, hash: &[u8]) -> Vec { + D::key(b"tributary_provided", b"transaction", [self.genesis.as_ref(), hash].concat()) } - fn currently_provided_key(&self) -> Vec { - D::key(b"tributary", b"currently_provided", self.genesis) + fn current_provided_key(&self) -> Vec { + D::key(b"tributary_provided", b"current", self.genesis) } pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self { let mut res = ProvidedTransactions { db, genesis, transactions: VecDeque::new() }; - let currently_provided = res.db.get(res.currently_provided_key()).unwrap_or(vec![]); + let currently_provided = res.db.get(res.current_provided_key()).unwrap_or(vec![]); let mut i = 0; while i < currently_provided.len() { res.transactions.push_back( T::read::<&[u8]>( - &mut res.db.get(res.provided_key(¤tly_provided[i .. (i + 32)])).unwrap().as_ref(), + &mut res + .db + .get(res.transaction_key(¤tly_provided[i .. (i + 32)])) + .unwrap() + .as_ref(), ) .unwrap(), ); @@ -65,18 +69,18 @@ impl ProvidedTransactions { } let tx_hash = tx.hash(); - let provided_key = self.provided_key(&tx_hash); + let provided_key = self.transaction_key(&tx_hash); if self.db.get(&provided_key).is_some() { Err(ProvidedError::AlreadyProvided)?; } - let currently_provided_key = self.currently_provided_key(); - let mut currently_provided = self.db.get(¤tly_provided_key).unwrap_or(vec![]); + let current_provided_key = self.current_provided_key(); + let mut currently_provided = self.db.get(¤t_provided_key).unwrap_or(vec![]); let mut txn = self.db.txn(); txn.put(provided_key, tx.serialize()); currently_provided.extend(tx_hash); - txn.put(currently_provided_key, currently_provided); + txn.put(current_provided_key, currently_provided); txn.commit(); self.transactions.push_back(tx); @@ -87,9 +91,9 @@ impl ProvidedTransactions { pub(crate) fn complete(&mut self, txn: &mut D::Transaction<'_>, tx: [u8; 32]) { assert_eq!(self.transactions.pop_front().unwrap().hash(), tx); - let currently_provided_key = self.currently_provided_key(); - let mut currently_provided = txn.get(¤tly_provided_key).unwrap(); + let current_provided_key = self.current_provided_key(); + let mut currently_provided = txn.get(¤t_provided_key).unwrap(); assert_eq!(¤tly_provided.drain(.. 32).collect::>(), &tx); - txn.put(currently_provided_key, currently_provided); + txn.put(current_provided_key, currently_provided); } } diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 2dad128f..81ec7cb0 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -126,7 +126,7 @@ impl Validators { pub(crate) fn new( genesis: [u8; 32], validators: HashMap<::G, u64>, - ) -> Validators { + ) -> Option { let mut total_weight = 0; let mut weights = HashMap::new(); @@ -134,8 +134,9 @@ impl Validators { let mut robin = vec![]; for (validator, weight) in validators { let validator = validator.to_bytes(); - // TODO: Make an error out of this - assert!(weight != 0); + if weight == 0 { + return None; + } total_weight += weight; weights.insert(validator, weight); @@ -145,7 +146,7 @@ impl Validators { } robin.shuffle(&mut ChaCha12Rng::from_seed(transcript.rng_seed(b"robin"))); - Validators { genesis, total_weight, weights, robin } + Some(Validators { genesis, total_weight, weights, robin }) } } diff --git a/coordinator/tributary/src/tests/mempool.rs b/coordinator/tributary/src/tests/mempool.rs index e361db50..3105e4cc 100644 --- a/coordinator/tributary/src/tests/mempool.rs +++ b/coordinator/tributary/src/tests/mempool.rs @@ -5,20 +5,23 @@ use rand::{RngCore, rngs::OsRng}; use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; +use serai_db::MemDb; + use crate::{ ACCOUNT_MEMPOOL_LIMIT, Transaction, Mempool, tests::{SignedTransaction, signed_transaction}, }; -fn new_mempool() -> ([u8; 32], Mempool) { +fn new_mempool() -> ([u8; 32], MemDb, Mempool) { let mut genesis = [0; 32]; OsRng.fill_bytes(&mut genesis); - (genesis, Mempool::new(genesis)) + let db = MemDb::new(); + (genesis, db.clone(), Mempool::new(db, genesis)) } #[test] fn mempool_addition() { - let (genesis, mut mempool) = new_mempool::(); + let (genesis, db, mut mempool) = new_mempool::(); let key = Zeroizing::new(::F::random(&mut OsRng)); @@ -31,6 +34,9 @@ fn mempool_addition() { assert!(mempool.add(&blockchain_next_nonces, true, first_tx.clone())); assert_eq!(mempool.next_nonce(&signer), Some(1)); + // Test reloading works + assert_eq!(mempool, Mempool::new(db, genesis)); + // Adding it again should fail assert!(!mempool.add(&blockchain_next_nonces, true, first_tx.clone())); @@ -67,7 +73,7 @@ fn mempool_addition() { #[test] fn too_many_mempool() { - let (genesis, mut mempool) = new_mempool::(); + let (genesis, _, mut mempool) = new_mempool::(); let key = Zeroizing::new(::F::random(&mut OsRng)); let signer = signed_transaction(&mut OsRng, genesis, &key, 0).1.signer;