diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b04b5388..cca8fbc0 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -175,8 +175,8 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>( for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() { let tributary = tributary.read().await; let tip = tributary.tip().await; - let block_time = SystemTime::UNIX_EPOCH + - Duration::from_secs(tributary.time_of_block(&tip).await.unwrap_or(0)); + let block_time = + SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0)); // Only trigger syncing if the block is more than a minute behind if SystemTime::now() > (block_time + Duration::from_secs(60)) { diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index ca0ccda6..d1d61251 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -124,14 +124,14 @@ pub async fn wait_for_tx_inclusion( continue; } - let mut queue = vec![tributary.block(&tip).await.unwrap()]; + let mut queue = vec![tributary.block(&tip).unwrap()]; let mut block = None; while { let parent = queue.last().unwrap().parent(); if parent == tributary.genesis() { false } else { - block = Some(tributary.block(&parent).await.unwrap()); + block = Some(tributary.block(&parent).unwrap()); block.as_ref().unwrap().hash() != last_checked } } { diff --git a/coordinator/src/tests/tributary/tx.rs b/coordinator/src/tests/tributary/tx.rs index 059781f2..e2458a4e 100644 --- a/coordinator/src/tests/tributary/tx.rs +++ b/coordinator/src/tests/tributary/tx.rs @@ -46,7 +46,7 @@ async fn tx_test() { // All tributaries should have acknowledged this transaction in a block for (_, tributary) in tributaries { - let block = tributary.block(&included_in).await.unwrap(); + let block = tributary.block(&included_in).unwrap(); assert_eq!(block.transactions, vec![tx.clone()]); } } diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index affdd689..ccd63351 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -307,7 +307,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>( let mut blocks = VecDeque::new(); // This is a new block, as per the prior if check - blocks.push_back(tributary.block(&latest).await.unwrap()); + blocks.push_back(tributary.block(&latest).unwrap()); let mut block = None; while { @@ -317,7 +317,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>( false } else { // Get this block - block = Some(tributary.block(&parent).await.unwrap()); + block = Some(tributary.block(&parent).unwrap()); // If it's the last block we've scanned, it's the end. Else, push it block.as_ref().unwrap().hash() != last_block } diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index 8038dcd1..82874ddb 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -29,12 +29,12 @@ impl<D: Db, T: Transaction> Blockchain<D, T> { fn block_number_key(&self) -> Vec<u8> { D::key(b"tributary_blockchain", b"block_number", self.genesis) } - fn block_key(&self, hash: &[u8; 32]) -> Vec<u8> { + fn block_key(hash: &[u8; 32]) -> Vec<u8> { // Since block hashes incorporate their parent, and the first parent is the genesis, this is // fine not incorporating the hash unless there's a hash collision D::key(b"tributary_blockchain", b"block", hash) } - fn commit_key(&self, hash: &[u8; 32]) -> Vec<u8> { + fn commit_key(hash: &[u8; 32]) -> Vec<u8> { D::key(b"tributary_blockchain", b"commit", hash) } fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> { @@ -92,17 +92,17 @@ impl<D: Db, T: Transaction> Blockchain<D, T> { self.block_number } - pub(crate) fn block(&self, block: &[u8; 32]) -> Option<Block<T>> { - self - .db - .as_ref() - .unwrap() - .get(self.block_key(block)) + pub(crate) fn block_from_db(db: &D, block: &[u8; 32]) -> Option<Block<T>> { + db.get(Self::block_key(block)) .map(|bytes| Block::<T>::read::<&[u8]>(&mut bytes.as_ref()).unwrap()) } + pub(crate) fn commit_from_db(db: &D, block: &[u8; 32]) -> Option<Vec<u8>> { + db.get(Self::commit_key(block)) + } + pub(crate) fn commit(&self, block: &[u8; 32]) -> Option<Vec<u8>> { - self.db.as_ref().unwrap().get(self.commit_key(block)) + Self::commit_from_db(self.db.as_ref().unwrap(), block) } pub(crate) fn add_transaction(&mut self, internal: bool, tx: T) -> bool { @@ -155,8 +155,8 @@ impl<D: Db, T: Transaction> Blockchain<D, T> { self.block_number += 1; txn.put(self.block_number_key(), self.block_number.to_le_bytes()); - txn.put(self.block_key(&self.tip), block.serialize()); - txn.put(self.commit_key(&self.tip), commit); + txn.put(Self::block_key(&self.tip), block.serialize()); + txn.put(Self::commit_key(&self.tip), commit); for tx in &block.transactions { match tx.kind() { diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 931cdf56..0b38d1cf 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -83,6 +83,8 @@ impl<P: P2p> P2p for Arc<P> { #[derive(Clone)] pub struct Tributary<D: Db, T: Transaction, P: P2p> { + db: D, + genesis: [u8; 32], network: TendermintNetwork<D, T, P>, @@ -104,7 +106,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> { let signer = Arc::new(Signer::new(genesis, key)); let validators = Arc::new(Validators::new(genesis, validators)?); - let mut blockchain = Blockchain::new(db, genesis, &validators_vec); + let mut blockchain = Blockchain::new(db.clone(), genesis, &validators_vec); let block_number = BlockNumber(blockchain.block_number().into()); let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) { @@ -121,7 +123,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> { TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; tokio::task::spawn(machine.run()); - Some(Self { genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) }) + Some(Self { db, genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) }) } pub fn block_time() -> u32 { @@ -132,29 +134,26 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> { self.genesis } - // TODO: block, time_of_block, and commit shouldn't require acquiring the read lock - // These values can be safely read directly from the database since they're static pub async fn block_number(&self) -> u32 { self.network.blockchain.read().await.block_number() } pub async fn tip(&self) -> [u8; 32] { self.network.blockchain.read().await.tip() } - pub async fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> { - self.network.blockchain.read().await.block(hash) + + // Since these values are static, they can be safely read from the database without lock + // acquisition + pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> { + Blockchain::<D, T>::block_from_db(&self.db, hash) } - pub async fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> { + pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> { + Blockchain::<D, T>::commit_from_db(&self.db, hash) + } + pub fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> { self - .network - .blockchain - .read() - .await .commit(hash) .map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time) } - pub async fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> { - self.network.blockchain.read().await.commit(hash) - } pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> { self.network.blockchain.write().await.provide_transaction(tx)