diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary/tendermint/src/block.rs index 71dfb3cc..6dfacfdb 100644 --- a/coordinator/tributary/tendermint/src/block.rs +++ b/coordinator/tributary/tendermint/src/block.rs @@ -139,10 +139,8 @@ impl BlockData { // 27, 33, 41, 46, 60, 64 self.round_mut().step = data.step(); - // Only return a message to if we're actually a current validator and haven't prior posted a - // message + // Only return a message to if we're actually a current validator let round_number = self.round().number; - let step = data.step(); let res = self.validator_id.map(|validator_id| Message { sender: validator_id, block: self.number, @@ -150,21 +148,59 @@ impl BlockData { data, }); - if res.is_some() { + if let Some(res) = res.as_ref() { + const LATEST_BLOCK_KEY: &[u8] = b"tendermint-machine-sent_block"; + const LATEST_ROUND_KEY: &[u8] = b"tendermint-machine-sent_round"; + const PROPOSE_KEY: &[u8] = b"tendermint-machine-sent_propose"; + const PEVOTE_KEY: &[u8] = b"tendermint-machine-sent_prevote"; + const PRECOMMIT_KEY: &[u8] = b"tendermint-machine-sent_commit"; + + let genesis = self.genesis; + let key = |prefix: &[u8]| [prefix, &genesis].concat(); + let mut txn = self.db.txn(); - let key = [ - b"tendermint-machine_already_sent_message".as_ref(), - &self.genesis, - &self.number.0.to_le_bytes(), - &round_number.0.to_le_bytes(), - &step.encode(), - ] - .concat(); - // If we've already sent a message, return - if txn.get(&key).is_some() { + + // Ensure we haven't prior sent a message for a future block/round + let last_block_or_round = |txn: &mut ::Transaction<'_>, prefix, current| { + let key = key(prefix); + let latest = + u64::from_le_bytes(txn.get(key.as_slice()).unwrap_or(vec![0; 8]).try_into().unwrap()); + if latest > current { + None?; + } + if current > latest { + txn.put(&key, current.to_le_bytes()); + return Some(true); + } + Some(false) + }; + let new_block = last_block_or_round(&mut txn, LATEST_BLOCK_KEY, self.number.0)?; + if new_block { + // Delete the latest round key + txn.del(&key(LATEST_ROUND_KEY)); + } + let new_round = last_block_or_round(&mut txn, LATEST_ROUND_KEY, round_number.0.into())?; + if new_block || new_round { + // Delete the messages for the old round + txn.del(&key(PROPOSE_KEY)); + txn.del(&key(PEVOTE_KEY)); + txn.del(&key(PRECOMMIT_KEY)); + } + + // Check we haven't sent this message within this round + let msg_key = key(match res.data.step() { + Step::Propose => PROPOSE_KEY, + Step::Prevote => PEVOTE_KEY, + Step::Precommit => PRECOMMIT_KEY, + }); + if txn.get(&msg_key).is_some() { + assert!(!new_block); + assert!(!new_round); None?; } - txn.put(&key, []); + // Put this message to the DB + txn.put(&msg_key, res.encode()); + txn.commit(); }