Move heartbeat_tributaries from Tributary to TributaryReader

Reduces contention.
This commit is contained in:
Luke Parker 2023-09-25 17:15:36 -04:00
parent 4babf898d7
commit 0440e60645
No known key found for this signature in database
3 changed files with 24 additions and 14 deletions

View file

@ -300,12 +300,16 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>(
let ten_blocks_of_time =
Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into());
let mut readers = vec![];
for tributary in tributaries.read().await.values() {
readers.push(tributary.tributary.read().await.reader());
}
loop {
for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
let tributary = tributary.read().await;
let tip = tributary.tip().await;
let block_time = SystemTime::UNIX_EPOCH +
Duration::from_secs(tributary.reader().time_of_block(&tip).unwrap_or(0));
for tributary in &readers {
let tip = tributary.tip();
let block_time =
SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0));
// Only trigger syncing if the block is more than a minute behind
if SystemTime::now() > (block_time + Duration::from_secs(60)) {
@ -777,10 +781,6 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
};
tx.sign(&mut OsRng, genesis, &key, nonce);
let Some(tributary) = tributaries.get(&genesis) else {
panic!("tributary we don't have came to consensus on an Batch");
};
let tributary = tributary.tributary.read().await;
publish_transaction(&tributary, tx).await;
}
}

View file

@ -27,8 +27,8 @@ pub(crate) struct Blockchain<D: Db, T: TransactionTrait> {
}
impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
fn tip_key(&self) -> Vec<u8> {
D::key(b"tributary_blockchain", b"tip", self.genesis)
fn tip_key(genesis: [u8; 32]) -> Vec<u8> {
D::key(b"tributary_blockchain", b"tip", genesis)
}
fn block_number_key(&self) -> Vec<u8> {
D::key(b"tributary_blockchain", b"block_number", self.genesis)
@ -80,7 +80,7 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
if let Some((block_number, tip)) = {
let db = res.db.as_ref().unwrap();
db.get(res.block_number_key()).map(|number| (number, db.get(res.tip_key()).unwrap()))
db.get(res.block_number_key()).map(|number| (number, db.get(Self::tip_key(genesis)).unwrap()))
} {
res.block_number = u32::from_le_bytes(block_number.try_into().unwrap());
res.tip.copy_from_slice(&tip);
@ -132,6 +132,10 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
db.get(Self::block_after_key(&genesis, block)).map(|bytes| bytes.try_into().unwrap())
}
pub(crate) fn tip_from_db(db: &D, genesis: [u8; 32]) -> [u8; 32] {
db.get(Self::tip_key(genesis)).map(|bytes| bytes.try_into().unwrap()).unwrap_or(genesis)
}
pub(crate) fn add_transaction<N: Network>(
&mut self,
internal: bool,
@ -226,7 +230,7 @@ impl<D: Db, T: TransactionTrait> Blockchain<D, T> {
let mut txn = db.txn();
self.tip = block.hash();
txn.put(self.tip_key(), self.tip);
txn.put(Self::tip_key(self.genesis), self.tip);
self.block_number += 1;
txn.put(self.block_number_key(), self.block_number.to_le_bytes());

View file

@ -344,7 +344,8 @@ impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
pub fn genesis(&self) -> [u8; 32] {
self.1
}
// Since these values are static, they can be safely read from the database without lock
// Since these values are static once set, they can be safely read from the database without lock
// acquisition
pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
Blockchain::<D, T>::block_from_db(&self.0, self.1, hash)
@ -363,4 +364,9 @@ impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
.commit(hash)
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}
// This isn't static, yet can be read with only minor discrepancy risks
pub fn tip(&self) -> [u8; 32] {
Blockchain::<D, T>::tip_from_db(&self.0, self.1)
}
}