From 91a024e119694b3ac3da650865ac5a6e3adbef79 Mon Sep 17 00:00:00 2001 From: econsta <143191603+econsta@users.noreply.github.com> Date: Thu, 7 Dec 2023 18:30:11 +0400 Subject: [PATCH] coordinator/src/db.rs db macro implimentation (#431) * coordinator/src/db.rs db macro implimentation * fixed fmt errors * converted txn functions to get/set counterparts * use take_signed_transaction function * fix for two fo the tests * Misc tweaks * Minor tweaks --------- Co-authored-by: Luke Parker --- coordinator/src/db.rs | 210 +++++++++------------------ coordinator/src/main.rs | 78 +++++----- coordinator/src/substrate/mod.rs | 12 +- coordinator/src/tributary/scanner.rs | 2 +- 4 files changed, 119 insertions(+), 183 deletions(-) diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index d8ec4356..560946bc 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -1,11 +1,9 @@ -use core::marker::PhantomData; - use blake2::{ digest::{consts::U32, Digest}, Blake2b, }; -use scale::{Encode, Decode}; +use scale::Encode; use serai_client::{ primitives::NetworkId, validator_sets::primitives::{Session, ValidatorSet}, @@ -17,31 +15,30 @@ pub use serai_db::*; use ::tributary::ReadWrite; use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType}; -#[derive(Debug)] -pub struct MainDb(PhantomData); -impl MainDb { - fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { - D::key(b"coordinator_main", dst, key) +create_db!( + MainDb { + HandledMessageDb: (network: NetworkId) -> u64, + ActiveTributaryDb: () -> Vec, + RetiredTributaryDb: (set: ValidatorSet) -> (), + SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec, + FirstPreprocessDb: ( + network: NetworkId, + id_type: RecognizedIdType, + id: &[u8] + ) -> Vec>, + LastReceivedBatchDb: (network: NetworkId) -> u32, + ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32], + BatchDb: (network: NetworkId, id: u32) -> SignedBatch, + LastVerifiedBatchDb: (network: NetworkId) -> u32, + HandoverBatchDb: (set: ValidatorSet) -> u32, + LookupHandoverBatchDb: (network: NetworkId, batch: u32) -> Session, + QueuedBatchesDb: (set: ValidatorSet) -> Vec } +); - fn handled_message_key(network: NetworkId, id: u64) -> Vec { - Self::main_key(b"handled_message", (network, id).encode()) - } - pub fn save_handled_message(txn: &mut D::Transaction<'_>, network: NetworkId, id: u64) { - txn.put(Self::handled_message_key(network, id), []); - } - pub fn handled_message(getter: &G, network: NetworkId, id: u64) -> bool { - getter.get(Self::handled_message_key(network, id)).is_some() - } - - fn active_tributaries_key() -> Vec { - Self::main_key(b"active_tributaries", []) - } - fn retired_tributary_key(set: ValidatorSet) -> Vec { - Self::main_key(b"retired_tributary", set.encode()) - } +impl ActiveTributaryDb { pub fn active_tributaries(getter: &G) -> (Vec, Vec) { - let bytes = getter.get(Self::active_tributaries_key()).unwrap_or(vec![]); + let bytes = Self::get(getter).unwrap_or_default(); let mut bytes_ref: &[u8] = bytes.as_ref(); let mut tributaries = vec![]; @@ -51,9 +48,9 @@ impl MainDb { (bytes, tributaries) } - pub fn add_participating_in_tributary(txn: &mut D::Transaction<'_>, spec: &TributarySpec) { - let key = Self::active_tributaries_key(); - let (mut existing_bytes, existing) = Self::active_tributaries(txn); + + pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) { + let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn); for tributary in &existing { if tributary == spec { return; @@ -61,9 +58,10 @@ impl MainDb { } spec.write(&mut existing_bytes).unwrap(); - txn.put(key, existing_bytes); + ActiveTributaryDb::set(txn, &existing_bytes); } - pub fn retire_tributary(txn: &mut D::Transaction<'_>, set: ValidatorSet) { + + pub fn retire_tributary(txn: &mut impl DbTxn, set: ValidatorSet) { let mut active = Self::active_tributaries(txn).1; for i in 0 .. active.len() { if active[i].set() == set { @@ -76,142 +74,72 @@ impl MainDb { for active in active { active.write(&mut bytes).unwrap(); } - txn.put(Self::active_tributaries_key(), bytes); - txn.put(Self::retired_tributary_key(set), []); - } - pub fn is_tributary_retired(getter: &G, set: ValidatorSet) -> bool { - getter.get(Self::retired_tributary_key(set)).is_some() + Self::set(txn, &bytes); + RetiredTributaryDb::set(txn, set, &()); } +} - fn signed_transaction_key(nonce: u32) -> Vec { - Self::main_key(b"signed_transaction", nonce.to_le_bytes()) - } - pub fn save_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32, tx: Transaction) { - txn.put(Self::signed_transaction_key(nonce), tx.serialize()); - } - pub fn take_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32) -> Option { - let key = Self::signed_transaction_key(nonce); - let res = txn.get(&key).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); +impl SignedTransactionDb { + pub fn take_signed_transaction( + txn: &mut impl DbTxn, + order: &[u8], + nonce: u32, + ) -> Option { + let res = SignedTransactionDb::get(txn, order, nonce) + .map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap()); if res.is_some() { - txn.del(&key); + Self::del(txn, order, nonce); } res } +} - fn first_preprocess_key(network: NetworkId, id_type: RecognizedIdType, id: &[u8]) -> Vec { - Self::main_key(b"first_preprocess", (network, id_type, id).encode()) - } +impl FirstPreprocessDb { pub fn save_first_preprocess( - txn: &mut D::Transaction<'_>, + txn: &mut impl DbTxn, network: NetworkId, id_type: RecognizedIdType, id: &[u8], preprocess: Vec>, ) { - let preprocess = preprocess.encode(); - let key = Self::first_preprocess_key(network, id_type, id); - if let Some(existing) = txn.get(&key) { + if let Some(existing) = FirstPreprocessDb::get(txn, network, id_type, id) { assert_eq!(existing, preprocess, "saved a distinct first preprocess"); return; } - txn.put(key, preprocess); - } - pub fn first_preprocess( - getter: &G, - network: NetworkId, - id_type: RecognizedIdType, - id: &[u8], - ) -> Option>> { - getter - .get(Self::first_preprocess_key(network, id_type, id)) - .map(|bytes| Vec::<_>::decode(&mut bytes.as_slice()).unwrap()) + FirstPreprocessDb::set(txn, network, id_type, id, &preprocess); } +} - fn last_received_batch_key(network: NetworkId) -> Vec { - Self::main_key(b"last_received_batch", network.encode()) - } - fn expected_batch_key(network: NetworkId, id: u32) -> Vec { - Self::main_key(b"expected_batch", (network, id).encode()) - } - pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) { - txn.put(Self::last_received_batch_key(batch.network), batch.id.to_le_bytes()); - txn.put( - Self::expected_batch_key(batch.network, batch.id), - Blake2b::::digest(batch.instructions.encode()), +impl ExpectedBatchDb { + pub fn save_expected_batch(txn: &mut impl DbTxn, batch: &Batch) { + LastReceivedBatchDb::set(txn, batch.network, &batch.id); + Self::set( + txn, + batch.network, + batch.id, + &Blake2b::::digest(batch.instructions.encode()).into(), ); } - pub fn last_received_batch(getter: &G, network: NetworkId) -> Option { - getter - .get(Self::last_received_batch_key(network)) - .map(|id| u32::from_le_bytes(id.try_into().unwrap())) +} + +impl HandoverBatchDb { + pub fn set_handover_batch(txn: &mut impl DbTxn, set: ValidatorSet, batch: u32) { + Self::set(txn, set, &batch); + LookupHandoverBatchDb::set(txn, set.network, batch, &set.session); } - pub fn expected_batch(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> { - getter.get(Self::expected_batch_key(network, id)).map(|batch| batch.try_into().unwrap()) +} +impl QueuedBatchesDb { + pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: Transaction) { + let mut batches = Self::get(txn, set).unwrap_or_default(); + batch.write(&mut batches).unwrap(); + Self::set(txn, set, &batches); } - fn batch_key(network: NetworkId, id: u32) -> Vec { - Self::main_key(b"batch", (network, id).encode()) - } - pub fn save_batch(txn: &mut D::Transaction<'_>, batch: SignedBatch) { - txn.put(Self::batch_key(batch.batch.network, batch.batch.id), batch.encode()); - } - pub fn batch(getter: &G, network: NetworkId, id: u32) -> Option { - getter - .get(Self::batch_key(network, id)) - .map(|batch| SignedBatch::decode(&mut batch.as_ref()).unwrap()) - } + pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec { + let batches_vec = Self::get(txn, set).unwrap_or_default(); + txn.del(&Self::key(set)); - fn last_verified_batch_key(network: NetworkId) -> Vec { - Self::main_key(b"last_verified_batch", network.encode()) - } - pub fn save_last_verified_batch(txn: &mut D::Transaction<'_>, network: NetworkId, id: u32) { - txn.put(Self::last_verified_batch_key(network), id.to_le_bytes()); - } - pub fn last_verified_batch(getter: &G, network: NetworkId) -> Option { - getter - .get(Self::last_verified_batch_key(network)) - .map(|id| u32::from_le_bytes(id.try_into().unwrap())) - } - - fn handover_batch_key(set: ValidatorSet) -> Vec { - Self::main_key(b"handover_batch", set.encode()) - } - fn lookup_handover_batch_key(network: NetworkId, batch: u32) -> Vec { - Self::main_key(b"lookup_handover_batch", (network, batch).encode()) - } - pub fn set_handover_batch(txn: &mut D::Transaction<'_>, set: ValidatorSet, batch: u32) { - txn.put(Self::handover_batch_key(set), batch.to_le_bytes()); - txn.put(Self::lookup_handover_batch_key(set.network, batch), set.session.0.to_le_bytes()); - } - pub fn handover_batch(getter: &G, set: ValidatorSet) -> Option { - getter.get(Self::handover_batch_key(set)).map(|id| u32::from_le_bytes(id.try_into().unwrap())) - } - pub fn is_handover_batch( - getter: &G, - network: NetworkId, - batch: u32, - ) -> Option { - getter.get(Self::lookup_handover_batch_key(network, batch)).map(|session| ValidatorSet { - network, - session: Session(u32::from_le_bytes(session.try_into().unwrap())), - }) - } - - fn queued_batches_key(set: ValidatorSet) -> Vec { - Self::main_key(b"queued_batches", set.encode()) - } - pub fn queue_batch(txn: &mut D::Transaction<'_>, set: ValidatorSet, batch: Transaction) { - let key = Self::queued_batches_key(set); - let mut batches = txn.get(&key).unwrap_or(vec![]); - batches.extend(batch.serialize()); - txn.put(&key, batches); - } - pub fn take_queued_batches(txn: &mut D::Transaction<'_>, set: ValidatorSet) -> Vec { - let key = Self::queued_batches_key(set); - let batches_vec = txn.get(&key).unwrap_or(vec![]); - txn.del(&key); let mut batches: &[u8] = &batches_vec; - let mut res = vec![]; while !batches.is_empty() { res.push(Transaction::read(&mut batches).unwrap()); diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 4e3c8a9f..708ec9d0 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -39,7 +39,7 @@ mod tributary; use crate::tributary::{TributarySpec, SignData, Transaction, scanner::RecognizedIdType, PlanIds}; mod db; -use db::MainDb; +use db::*; mod p2p; pub use p2p::*; @@ -83,7 +83,7 @@ async fn add_tributary( tributaries: &broadcast::Sender>, spec: TributarySpec, ) { - if MainDb::::is_tributary_retired(&db, spec.set()) { + if RetiredTributaryDb::get(&db, spec.set()).is_some() { log::info!("not adding tributary {:?} since it's been retired", spec.set()); } @@ -138,7 +138,7 @@ async fn publish_signed_transaction( // Safe as we should deterministically create transactions, meaning if this is already on-disk, // it's what we're saving now - MainDb::::save_signed_transaction(txn, signed.nonce, tx); + SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize()); (order, signer) } else { @@ -147,8 +147,9 @@ async fn publish_signed_transaction( // If we're trying to publish 5, when the last transaction published was 3, this will delay // publication until the point in time we publish 4 - while let Some(tx) = MainDb::::take_signed_transaction( + while let Some(tx) = SignedTransactionDb::take_signed_transaction( txn, + &order, tributary .next_nonce(&signer, &order) .await @@ -181,8 +182,13 @@ async fn handle_processor_message( network: NetworkId, msg: &processors::Message, ) -> bool { - if MainDb::::handled_message(db, msg.network, msg.id) { - return true; + #[allow(clippy::nonminimal_bool)] + if let Some(already_handled) = HandledMessageDb::get(db, msg.network) { + assert!(!(already_handled > msg.id)); + assert!((already_handled == msg.id) || (already_handled == msg.id - 1)); + if already_handled == msg.id { + return true; + } } let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await; @@ -219,7 +225,7 @@ async fn handle_processor_message( .iter() .map(|plan| plan.session) .filter(|session| { - !MainDb::::is_tributary_retired(&txn, ValidatorSet { network, session: *session }) + RetiredTributaryDb::get(&txn, ValidatorSet { network, session: *session }).is_none() }) .collect::>(); @@ -293,7 +299,7 @@ async fn handle_processor_message( batch.network, msg.network, "processor sent us a batch for a different network than it was for", ); - MainDb::::save_expected_batch(&mut txn, batch); + ExpectedBatchDb::save_expected_batch(&mut txn, batch); None } // If this is a new Batch, immediately publish it (if we can) @@ -306,7 +312,7 @@ async fn handle_processor_message( log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id); // Save this batch to the disk - MainDb::::save_batch(&mut txn, batch.clone()); + BatchDb::set(&mut txn, batch.batch.network, batch.batch.id, &batch.clone()); // Get the next-to-execute batch ID let mut next = substrate::get_expected_next_batch(serai, network).await; @@ -314,7 +320,7 @@ async fn handle_processor_message( // Since we have a new batch, publish all batches yet to be published to Serai // This handles the edge-case where batch n+1 is signed before batch n is let mut batches = VecDeque::new(); - while let Some(batch) = MainDb::::batch(&txn, network, next) { + while let Some(batch) = BatchDb::get(&txn, network, next) { batches.push_back(batch); next += 1; } @@ -359,10 +365,12 @@ async fn handle_processor_message( // If we have a relevant Tributary, check it's actually still relevant and has yet to be retired if let Some(relevant_tributary_value) = relevant_tributary { - if MainDb::::is_tributary_retired( + if RetiredTributaryDb::get( &txn, ValidatorSet { network: msg.network, session: relevant_tributary_value }, - ) { + ) + .is_some() + { relevant_tributary = None; } } @@ -491,7 +499,7 @@ async fn handle_processor_message( } sign::ProcessorMessage::Preprocess { id, preprocesses } => { if id.attempt == 0 { - MainDb::::save_first_preprocess( + FirstPreprocessDb::save_first_preprocess( &mut txn, network, RecognizedIdType::Plan, @@ -563,7 +571,7 @@ async fn handle_processor_message( // If this is the first attempt instance, wait until we synchronize around the batch // first if id.attempt == 0 { - MainDb::::save_first_preprocess( + FirstPreprocessDb::save_first_preprocess( &mut txn, spec.set().network, RecognizedIdType::Batch, @@ -588,8 +596,8 @@ async fn handle_processor_message( // all prior published `Batch`s // TODO: This assumes BatchPreprocess is immediately after Batch // Ensure that assumption - let last_received = MainDb::::last_received_batch(&txn, msg.network).unwrap(); - let handover_batch = MainDb::::handover_batch(&txn, spec.set()); + let last_received = LastReceivedBatchDb::get(&txn, msg.network).unwrap(); + let handover_batch = HandoverBatchDb::get(&txn, spec.set()); let mut queue = false; if let Some(handover_batch) = handover_batch { // There is a race condition here. We may verify all `Batch`s from the prior set, @@ -604,7 +612,7 @@ async fn handle_processor_message( // To fix this, if this is after the handover `Batch` and we have yet to verify // publication of the handover `Batch`, don't yet yield the provided. if last_received > handover_batch { - if let Some(last_verified) = MainDb::::last_verified_batch(&txn, msg.network) { + if let Some(last_verified) = LastVerifiedBatchDb::get(&txn, msg.network) { if last_verified < handover_batch { queue = true; } @@ -613,11 +621,11 @@ async fn handle_processor_message( } } } else { - MainDb::::set_handover_batch(&mut txn, spec.set(), last_received); + HandoverBatchDb::set_handover_batch(&mut txn, spec.set(), last_received); // If this isn't the first batch, meaning we do have to verify all prior batches, and // the prior Batch hasn't been verified yet... if (last_received != 0) && - MainDb::::last_verified_batch(&txn, msg.network) + LastVerifiedBatchDb::get(&txn, msg.network) .map(|last_verified| last_verified < (last_received - 1)) .unwrap_or(true) { @@ -627,14 +635,14 @@ async fn handle_processor_message( } if queue { - MainDb::::queue_batch(&mut txn, spec.set(), intended); + QueuedBatchesDb::queue(&mut txn, spec.set(), intended); vec![] } else { // Because this is post-verification of the handover batch, take all queued `Batch`s // now to ensure we don't provide this before an already queued Batch // This *may* be an unreachable case due to how last_verified_batch is set, yet it // doesn't hurt to have as a defensive pattern - let mut res = MainDb::::take_queued_batches(&mut txn, spec.set()); + let mut res = QueuedBatchesDb::take(&mut txn, spec.set()); res.push(intended); res } @@ -702,7 +710,7 @@ async fn handle_processor_message( } } - MainDb::::save_handled_message(&mut txn, msg.network, msg.id); + HandledMessageDb::set(&mut txn, msg.network, &msg.id); txn.commit(); true @@ -828,7 +836,7 @@ async fn handle_cosigns_and_batch_publication( let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await; let mut txn = db.txn(); let mut to_publish = vec![]; - let start_id = MainDb::::last_verified_batch(&txn, network) + let start_id = LastVerifiedBatchDb::get(&txn, network) .map(|already_verified| already_verified + 1) .unwrap_or(0); if let Some(last_id) = @@ -838,9 +846,10 @@ async fn handle_cosigns_and_batch_publication( // `Batch` // If so, we need to publish queued provided `Batch` transactions for batch in start_id ..= last_id { - let is_pre_handover = MainDb::::is_handover_batch(&txn, network, batch + 1); - if let Some(set) = is_pre_handover { - let mut queued = MainDb::::take_queued_batches(&mut txn, set); + let is_pre_handover = LookupHandoverBatchDb::get(&txn, network, batch + 1); + if let Some(session) = is_pre_handover { + let set = ValidatorSet { network, session }; + let mut queued = QueuedBatchesDb::take(&mut txn, set); // is_handover_batch is only set for handover `Batch`s we're participating in, making // this safe if queued.is_empty() { @@ -851,14 +860,14 @@ async fn handle_cosigns_and_batch_publication( to_publish.push((set.session, queued.remove(0))); // Re-queue the remaining batches for remaining in queued { - MainDb::::queue_batch(&mut txn, set, remaining); + QueuedBatchesDb::queue(&mut txn, set, remaining); } } - let is_handover = MainDb::::is_handover_batch(&txn, network, batch); - if let Some(set) = is_handover { - for queued in MainDb::::take_queued_batches(&mut txn, set) { - to_publish.push((set.session, queued)); + let is_handover = LookupHandoverBatchDb::get(&txn, network, batch); + if let Some(session) = is_handover { + for queued in QueuedBatchesDb::take(&mut txn, ValidatorSet { network, session }) { + to_publish.push((session, queued)); } } } @@ -952,7 +961,7 @@ pub async fn run( let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel(); // Reload active tributaries from the database - for spec in MainDb::::active_tributaries(&raw_db).1 { + for spec in ActiveTributaryDb::active_tributaries(&raw_db).1 { new_tributary_spec_send.send(spec).unwrap(); } @@ -1058,8 +1067,7 @@ pub async fn run( // This waits until the necessary preprocess is available 0, let get_preprocess = |raw_db, id_type, id| async move { loop { - let Some(preprocess) = MainDb::::first_preprocess(raw_db, set.network, id_type, id) - else { + let Some(preprocess) = FirstPreprocessDb::get(raw_db, set.network, id_type, id) else { log::warn!("waiting for preprocess for recognized ID"); sleep(Duration::from_millis(100)).await; continue; @@ -1096,7 +1104,7 @@ pub async fn run( let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { // If we don't have this Tributary because it's retired, break and move on - if MainDb::::is_tributary_retired(&raw_db, set) { + if RetiredTributaryDb::get(&raw_db, set).is_some() { break; } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index b6861928..2d33a9a2 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -105,7 +105,7 @@ async fn handle_new_set( // If this txn doesn't finish, this will be re-fired // If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the // prior fired event may have not been received yet - crate::MainDb::::add_participating_in_tributary(txn, &spec); + crate::ActiveTributaryDb::add_participating_in_tributary(txn, &spec); new_tributary_spec.send(spec).unwrap(); } else { @@ -306,7 +306,7 @@ async fn handle_block( if !SubstrateDb::::handled_event(&db.0, hash, event_id) { log::info!("found fresh set retired event {:?}", retired_set); let mut txn = db.0.txn(); - crate::MainDb::::retire_tributary(&mut txn, set); + crate::ActiveTributaryDb::retire_tributary(&mut txn, set); tributary_retired.send(set).unwrap(); SubstrateDb::::handle_event(&mut txn, hash, event_id); txn.commit(); @@ -678,12 +678,12 @@ pub(crate) async fn verify_published_batches( optimistic_up_to: u32, ) -> Option { // TODO: Localize from MainDb to SubstrateDb - let last = crate::MainDb::::last_verified_batch(txn, network); + let last = crate::LastVerifiedBatchDb::get(txn, network); for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to { let Some(on_chain) = SubstrateDb::::batch_instructions_hash(txn, network, id) else { break; }; - let off_chain = crate::MainDb::::expected_batch(txn, network, id).unwrap(); + let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap(); if on_chain != off_chain { // Halt operations on this network and spin, as this is a critical fault loop { @@ -698,8 +698,8 @@ pub(crate) async fn verify_published_batches( sleep(Duration::from_secs(60)).await; } } - crate::MainDb::::save_last_verified_batch(txn, network, id); + crate::LastVerifiedBatchDb::set(txn, network, &id); } - crate::MainDb::::last_verified_batch(txn, network) + crate::LastVerifiedBatchDb::get(txn, network) } diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs index d196c8f0..c127bdfa 100644 --- a/coordinator/src/tributary/scanner.rs +++ b/coordinator/src/tributary/scanner.rs @@ -232,7 +232,7 @@ pub(crate) async fn scan_tributaries_task< let mut tributary_db = raw_db.clone(); loop { // Check if the set was retired, and if so, don't further operate - if crate::MainDb::::is_tributary_retired(&raw_db, spec.set()) { + if crate::db::RetiredTributaryDb::get(&raw_db, spec.set()).is_some() { break; }