diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 17745580..b95a7df1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -870,6 +870,7 @@ pub async fn handle_processors( // Alternatively, a peek method with local delineation of handled messages would work. let msg = processors.recv().await; + // TODO: Check this ID is sane (last handled ID or expected next ID) if last_msg == Some(msg.id) { sleep(Duration::from_secs(1)).await; continue; diff --git a/coordinator/src/processors.rs b/coordinator/src/processors.rs index 3dc0f0a6..7b6c6146 100644 --- a/coordinator/src/processors.rs +++ b/coordinator/src/processors.rs @@ -28,8 +28,7 @@ impl Processors for Arc { self.queue(metadata, msg.into_bytes()).await; } async fn recv(&mut self) -> Message { - // TODO: Use a proper expected next ID - let msg = self.next(0).await; + let msg = self.next().await; let network = match msg.from { Service::Processor(network) => network, diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index 928c2735..f0f26330 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -140,9 +140,9 @@ impl MessageQueue { } } - pub async fn next(&self, expected: u64) -> QueuedMessage { + pub async fn next(&self) -> QueuedMessage { loop { - let json = self.json_call("next", serde_json::json!([self.service, expected])).await; + let json = self.json_call("next", serde_json::json!([self.service])).await; // Convert from a Value to a type via reserialization let msg: Option = serde_json::from_str( @@ -174,7 +174,6 @@ impl MessageQueue { ); } // TODO: Verify the sender's signature - // TODO: Check the ID is sane return msg; } diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index c59abe5b..dcf86225 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -107,16 +107,11 @@ mod binaries { /* Gets the next message in queue for this service. - This is not authenticated due to the fact every nonce would have to be saved to prevent replays, - or a challenge-response protocol implemented. Neither are worth doing when there should be no - sensitive data on this server. - - The expected index is used to ensure a service didn't fall out of sync with this service. It - should always be either the next message's ID or *TODO*. + This is not authenticated due to the fact every nonce would have to be saved to prevent + replays, or a challenge-response protocol implemented. Neither are worth doing when there + should be no sensitive data on this server. */ - pub(crate) fn get_next_message(service: Service, _expected: u64) -> Option { - // TODO: Verify the expected next message ID matches - + pub(crate) fn get_next_message(service: Service) -> Option { let queue_outer = (*QUEUES).read().unwrap(); let queue = queue_outer[&service].read().unwrap(); let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); @@ -229,8 +224,8 @@ async fn main() { .unwrap(); module .register_method("next", |args, _| { - let args = args.parse::<(Service, u64)>().unwrap(); - Ok(get_next_message(args.0, args.1)) + let args = args.parse::().unwrap(); + Ok(get_next_message(args)) }) .unwrap(); module diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs index b9c10a16..fb8e20f1 100644 --- a/processor/src/coordinator.rs +++ b/processor/src/coordinator.rs @@ -25,8 +25,7 @@ impl Coordinator for MessageQueue { } async fn recv(&mut self) -> Message { - // TODO2: Use a proper expected next ID - let msg = self.next(0).await; + let msg = self.next().await; let id = msg.id; diff --git a/processor/src/main.rs b/processor/src/main.rs index 523fa3b1..b4d006e3 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -471,6 +471,7 @@ async fn run(mut raw_db: D, network: N, mut let (main_db, mut tributary_mutable, mut substrate_mutable) = boot(&mut raw_db, &network).await; // We can't load this from the DB as we can't guarantee atomic increments with the ack function + // TODO: Load with a slight tolerance let mut last_coordinator_msg = None; loop {