Update the coordinator to not handle a processor message multiple times

This commit is contained in:
Luke Parker 2023-09-26 23:28:05 -04:00
parent 32a9a33226
commit 2e0f8138e2
No known key found for this signature in database
2 changed files with 20 additions and 2 deletions

View file

@ -16,6 +16,16 @@ impl<'a, D: Db> MainDb<'a, D> {
D::key(b"coordinator_main", dst, key) D::key(b"coordinator_main", dst, key)
} }
fn handled_message_key(id: u64) -> Vec<u8> {
Self::main_key(b"handled_message", id.to_le_bytes())
}
pub fn save_handled_message(txn: &mut D::Transaction<'_>, id: u64) {
txn.put(Self::handled_message_key(id), []);
}
pub fn handled_message<G: Get>(getter: &G, id: u64) -> bool {
getter.get(Self::handled_message_key(id)).is_some()
}
fn acive_tributaries_key() -> Vec<u8> { fn acive_tributaries_key() -> Vec<u8> {
Self::main_key(b"active_tributaries", []) Self::main_key(b"active_tributaries", [])
} }

View file

@ -524,6 +524,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
let channels = Arc::new(RwLock::new(HashMap::new())); let channels = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({ tokio::spawn({
let db = db.clone();
let processors = processors.clone(); let processors = processors.clone();
let channels = channels.clone(); let channels = channels.clone();
async move { async move {
@ -849,6 +850,11 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
} }
} }
// TODO: Consider a global txn for this message?
let mut txn = db.txn();
MainDb::<'static, D>::save_handled_message(&mut txn, msg.id);
txn.commit();
processors.ack(msg).await; processors.ack(msg).await;
} }
} }
@ -864,9 +870,11 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
// Modify message-queue to offer per-sender queues, not per-receiver. // Modify message-queue to offer per-sender queues, not per-receiver.
// Alternatively, a peek method with local delineation of handled messages would work. // Alternatively, a peek method with local delineation of handled messages would work.
// TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it?
// Does the processor?
let msg = processors.recv().await; let msg = processors.recv().await;
if MainDb::<'static, D>::handled_message(&db, msg.id) {
processors.ack(msg).await;
continue;
}
if last_msg == Some(msg.id) { if last_msg == Some(msg.id) {
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
continue; continue;