message-queue: remove (*)

This commit is contained in:
hinto.janai 2023-12-08 08:32:15 -05:00 committed by Luke Parker
parent 7122e0faf4
commit 32bea92742

View file

@ -59,7 +59,7 @@ pub(crate) fn queue_message(
sig: SchnorrSignature<Ristretto>, sig: SchnorrSignature<Ristretto>,
) { ) {
{ {
let from = (*KEYS).read().unwrap()[&meta.from]; let from = KEYS.read().unwrap()[&meta.from];
assert!( assert!(
sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R)) sig.verify(from, message_challenge(meta.from, from, meta.to, &meta.intent, &msg, sig.R))
); );
@ -89,7 +89,7 @@ pub(crate) fn queue_message(
DbTxn::put(&mut txn, intent_key, []); DbTxn::put(&mut txn, intent_key, []);
// Queue it // Queue it
let id = (*QUEUES).read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message( let id = QUEUES.read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message(
&mut txn, &mut txn,
QueuedMessage { QueuedMessage {
from: meta.from, from: meta.from,
@ -113,7 +113,7 @@ pub(crate) fn queue_message(
should be no sensitive data on this server. should be no sensitive data on this server.
*/ */
pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> { pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessage> {
let queue_outer = (*QUEUES).read().unwrap(); let queue_outer = QUEUES.read().unwrap();
let queue = queue_outer[&(from, to)].read().unwrap(); let queue = queue_outer[&(from, to)].read().unwrap();
let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0);
queue.get_message(next) queue.get_message(next)
@ -126,7 +126,7 @@ pub(crate) fn get_next_message(from: Service, to: Service) -> Option<QueuedMessa
*/ */
pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) { pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
{ {
let to_key = (*KEYS).read().unwrap()[&to]; let to_key = KEYS.read().unwrap()[&to];
assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R))); assert!(sig.verify(to_key, ack_challenge(to, to_key, from, id, sig.R)));
} }
@ -139,7 +139,7 @@ pub(crate) fn ack_message(from: Service, to: Service, id: u64, sig: SchnorrSigna
log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id); log::info!("Acknowledging From: {:?} To: {:?} ID: {}", from, to, id);
(*QUEUES).read().unwrap()[&(from, to)].write().unwrap().ack_message(id) QUEUES.read().unwrap()[&(from, to)].write().unwrap().ack_message(id)
} }
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
@ -186,8 +186,8 @@ async fn main() {
}; };
let register_service = |service, key| { let register_service = |service, key| {
(*KEYS).write().unwrap().insert(service, key); KEYS.write().unwrap().insert(service, key);
let mut queues = (*QUEUES).write().unwrap(); let mut queues = QUEUES.write().unwrap();
if service == Service::Coordinator { if service == Service::Coordinator {
for network in serai_primitives::NETWORKS { for network in serai_primitives::NETWORKS {
if network == NetworkId::Serai { if network == NetworkId::Serai {