diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs index 67566e53..43bd4cb0 100644 --- a/coordinator/src/db.rs +++ b/coordinator/src/db.rs @@ -16,6 +16,16 @@ impl<'a, D: Db> MainDb<'a, D> { D::key(b"coordinator_main", dst, key) } + fn handled_message_key(id: u64) -> Vec { + Self::main_key(b"handled_message", id.to_le_bytes()) + } + pub fn save_handled_message(txn: &mut D::Transaction<'_>, id: u64) { + txn.put(Self::handled_message_key(id), []); + } + pub fn handled_message(getter: &G, id: u64) -> bool { + getter.get(Self::handled_message_key(id)).is_some() + } + fn acive_tributaries_key() -> Vec { Self::main_key(b"active_tributaries", []) } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 0a75c16d..d08a43d1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -524,6 +524,7 @@ pub async fn handle_processors( let channels = Arc::new(RwLock::new(HashMap::new())); tokio::spawn({ + let db = db.clone(); let processors = processors.clone(); let channels = channels.clone(); async move { @@ -849,6 +850,11 @@ pub async fn handle_processors( } } + // TODO: Consider a global txn for this message? + let mut txn = db.txn(); + MainDb::<'static, D>::save_handled_message(&mut txn, msg.id); + txn.commit(); + processors.ack(msg).await; } } @@ -864,9 +870,11 @@ pub async fn handle_processors( // Modify message-queue to offer per-sender queues, not per-receiver. // Alternatively, a peek method with local delineation of handled messages would work. - // TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it? - // Does the processor? let msg = processors.recv().await; + if MainDb::<'static, D>::handled_message(&db, msg.id) { + processors.ack(msg).await; + continue; + } if last_msg == Some(msg.id) { sleep(Duration::from_secs(1)).await; continue;