From 6267acf3df876643b3ee7a267c7a8a934a5dae59 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 1 Jul 2023 08:53:46 -0400 Subject: [PATCH] Add a message queue This is intended to be a reliable transport between the processors and coordinator. Since it'll be intranet only, it's written as never fail. Primarily needs testing and a proper ID. --- Cargo.lock | 18 ++++ Cargo.toml | 2 + crypto/ciphersuite/Cargo.toml | 2 +- crypto/schnorr/Cargo.toml | 2 +- message-queue/Cargo.toml | 38 +++++++++ message-queue/LICENSE | 15 ++++ message-queue/README.md | 14 ++++ message-queue/src/lib.rs | 2 + message-queue/src/main.rs | 152 ++++++++++++++++++++++++++++++++++ message-queue/src/messages.rs | 40 +++++++++ message-queue/src/queue.rs | 57 +++++++++++++ 11 files changed, 340 insertions(+), 2 deletions(-) create mode 100644 message-queue/Cargo.toml create mode 100644 message-queue/LICENSE create mode 100644 message-queue/README.md create mode 100644 message-queue/src/lib.rs create mode 100644 message-queue/src/main.rs create mode 100644 message-queue/src/messages.rs create mode 100644 message-queue/src/queue.rs diff --git a/Cargo.lock b/Cargo.lock index fc7bd417..8e88fcbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8653,6 +8653,24 @@ dependencies = [ name = "serai-db" version = "0.1.0" +[[package]] +name = "serai-message-queue" +version = "0.1.0" +dependencies = [ + "ciphersuite", + "flexible-transcript", + "hex", + "jsonrpsee", + "lazy_static", + "log", + "schnorr-signatures", + "serai-db", + "serai-primitives", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "serai-no-std" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 614426d2..8b859d90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,8 @@ members = [ "coins/monero/generators", "coins/monero", + "message-queue", + "processor/messages", "processor", diff --git a/crypto/ciphersuite/Cargo.toml b/crypto/ciphersuite/Cargo.toml index 538e1fa0..d6dfdbfe 100644 --- a/crypto/ciphersuite/Cargo.toml +++ b/crypto/ciphersuite/Cargo.toml @@ -45,7 +45,7 @@ ff-group-tests = { version = "0.13", path = "../ff-group-tests" } [features] alloc = ["std-shims"] -std = ["std-shims/std"] +std = ["zeroize/std", "std-shims/std"] dalek = ["sha2", "dalek-ff-group"] ed25519 = ["dalek"] diff --git a/crypto/schnorr/Cargo.toml b/crypto/schnorr/Cargo.toml index 06d34158..f687ad5a 100644 --- a/crypto/schnorr/Cargo.toml +++ b/crypto/schnorr/Cargo.toml @@ -35,5 +35,5 @@ dalek-ff-group = { path = "../dalek-ff-group", version = "0.3" } ciphersuite = { path = "../ciphersuite", version = "0.3", features = ["ed25519"] } [features] -std = ["std-shims/std", "ciphersuite/std"] +std = ["std-shims/std", "ciphersuite/std", "multiexp/std"] default = ["std"] diff --git a/message-queue/Cargo.toml b/message-queue/Cargo.toml new file mode 100644 index 00000000..cadefb37 --- /dev/null +++ b/message-queue/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "serai-message-queue" +version = "0.1.0" +description = "A message queue focused on safety" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/message-log" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[dependencies] +# Macros +lazy_static = "1" +serde = { version = "1", features = ["derive"] } + +# Encoders +hex = "0.4" +serde_json = "1" + +# Cryptography +transcript = { package = "flexible-transcript", path = "../crypto/transcript", features = ["recommended"] } +ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] } +schnorr-signatures = { path = "../crypto/schnorr" } + +# Application +log = "0.4" +tokio = { version = "1", features = ["full"] } + +serai-db = { path = "../common/db" } + +serai-primitives = { path = "../substrate/primitives" } + +jsonrpsee = { version = "0.16", features = ["server"] } diff --git a/message-queue/LICENSE b/message-queue/LICENSE new file mode 100644 index 00000000..f684d027 --- /dev/null +++ b/message-queue/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2023 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/message-queue/README.md b/message-queue/README.md new file mode 100644 index 00000000..7f4b02e5 --- /dev/null +++ b/message-queue/README.md @@ -0,0 +1,14 @@ +# Message Log + +A message log for various services to communicate over. + +Each message is checked to be of the claimed origin. Then, it's added to the +recipient's message queue. This queue is sequentially handled, FIFO, only +dropping messages once the recipient acknowledges it's been handled. + +A client which publishes an event specifies its own ID for the publication. If +multiple publications with the same ID occur, they are assumed repeats and +dropped. + +This library always panics as its error-cases should be unreachable, given its +intranet status. diff --git a/message-queue/src/lib.rs b/message-queue/src/lib.rs new file mode 100644 index 00000000..8bcb12a2 --- /dev/null +++ b/message-queue/src/lib.rs @@ -0,0 +1,2 @@ +mod messages; +pub use messages::*; diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs new file mode 100644 index 00000000..a94fc5d2 --- /dev/null +++ b/message-queue/src/main.rs @@ -0,0 +1,152 @@ +use std::{ + sync::{Arc, RwLock}, + collections::HashMap, +}; + +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; +use schnorr_signatures::SchnorrSignature; + +use serai_primitives::NetworkId; + +use jsonrpsee::{RpcModule, server::ServerBuilder}; + +mod messages; +use messages::*; + +mod queue; +use queue::Queue; + +lazy_static::lazy_static! { + static ref KEYS: Arc::G>>> = + Arc::new(RwLock::new(HashMap::new())); + static ref QUEUES: Arc>>>> = + Arc::new(RwLock::new(HashMap::new())); +} + +// queue RPC method +fn queue_message(meta: Metadata, msg: Vec, sig: SchnorrSignature) { + { + let from = (*KEYS).read().unwrap()[&meta.from]; + assert!(sig.verify(from, message_challenge(from, &msg, sig.R))); + } + + // Assert one, and only one of these, is the coordinator + assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); + + // TODO: Verify the from_id hasn't been prior seen + + // Queue it + (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { + from: meta.from, + msg, + sig: sig.serialize(), + }); +} + +// get RPC method +fn get_next_message( + service: Service, + _expected: u64, + _signature: SchnorrSignature, +) -> Option { + // TODO: Verify the signature + + // TODO: Verify the expected next message ID matches + + let queue_outer = (*QUEUES).read().unwrap(); + let queue = queue_outer[&service].read().unwrap(); + let next = queue.last_acknowledged().map(|i| i + 1).unwrap_or(0); + queue.get_message(next) +} + +// ack RPC method +fn ack_message(service: Service, id: u64, _signature: SchnorrSignature) { + // TODO: Verify the signature + + // Is it: + // The acknowledged message should be > last acknowledged OR + // The acknowledged message should be >= + // It's the first if we save messages as acknowledged before acknowledging them + // It's the second if we acknowledge messages before saving them as acknowledged + // TODO: Check only a proper message is being acked + + (*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id) +} + +#[tokio::main] +async fn main() { + // Open the DB + // TODO + let db = serai_db::MemDb::new(); + + let read_key = |str| { + let Ok(key) = std::env::var(str) else { None? }; + + let mut repr = <::G as GroupEncoding>::Repr::default(); + repr.as_mut().copy_from_slice(&hex::decode(key).unwrap()); + Some(::G::from_bytes(&repr).unwrap()) + }; + + let register_service = |service, key| { + (*KEYS).write().unwrap().insert(service, key); + (*QUEUES).write().unwrap().insert(service, RwLock::new(Queue(db.clone(), service))); + }; + + // Make queues for each NetworkId, other than Serai + for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] { + // Use a match so we error if the list of NetworkIds changes + let Some(key) = read_key(match network { + NetworkId::Serai => unreachable!(), + NetworkId::Bitcoin => "BITCOIN_KEY", + NetworkId::Ethereum => "ETHEREUM_KEY", + NetworkId::Monero => "MONERO_KEY", + }) else { continue }; + + register_service(Service::Processor(network), key); + } + + // And the coordinator's + register_service(Service::Coordinator, read_key("COORDINATOR_KEY").unwrap()); + + // Start server + let builder = ServerBuilder::new(); + // TODO: Set max request/response size + let listen_on: &[std::net::SocketAddr] = &["0.0.0.0".parse().unwrap()]; + let server = builder.build(listen_on).await.unwrap(); + + let mut module = RpcModule::new(()); + module + .register_method("queue", |args, _| { + let args = args.parse::<(Metadata, Vec, Vec)>().unwrap(); + queue_message( + args.0, + args.1, + SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), + ); + Ok(()) + }) + .unwrap(); + module + .register_method("next", |args, _| { + let args = args.parse::<(Service, u64, Vec)>().unwrap(); + get_next_message( + args.0, + args.1, + SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), + ); + Ok(()) + }) + .unwrap(); + module + .register_method("ack", |args, _| { + let args = args.parse::<(Service, u64, Vec)>().unwrap(); + ack_message( + args.0, + args.1, + SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), + ); + Ok(()) + }) + .unwrap(); + server.start(module).unwrap(); +} diff --git a/message-queue/src/messages.rs b/message-queue/src/messages.rs new file mode 100644 index 00000000..87bb54c1 --- /dev/null +++ b/message-queue/src/messages.rs @@ -0,0 +1,40 @@ +use transcript::{Transcript, RecommendedTranscript}; +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; + +use serde::{Serialize, Deserialize}; + +use serai_primitives::NetworkId; + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)] +pub enum Service { + Processor(NetworkId), + Coordinator, +} + +#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct QueuedMessage { + pub from: Service, + pub msg: Vec, + pub sig: Vec, +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct Metadata { + pub from: Service, + pub to: Service, + pub from_id: u64, +} + +pub fn message_challenge( + from: ::G, + msg: &[u8], + nonce: ::G, +) -> ::F { + let mut transcript = RecommendedTranscript::new(b"Serai Message Queue v0.1"); + transcript.domain_separate(b"message"); + transcript.append_message(b"from", from.to_bytes()); + transcript.append_message(b"msg", msg); + transcript.domain_separate(b"signature"); + transcript.append_message(b"nonce", nonce.to_bytes()); + ::hash_to_F(b"challenge", &transcript.challenge(b"challenge")) +} diff --git a/message-queue/src/queue.rs b/message-queue/src/queue.rs new file mode 100644 index 00000000..76599446 --- /dev/null +++ b/message-queue/src/queue.rs @@ -0,0 +1,57 @@ +use serai_db::{DbTxn, Db}; + +use crate::messages::*; + +#[derive(Clone, Debug)] +pub(crate) struct Queue(pub(crate) D, pub(crate) Service); +impl Queue { + fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() + } + + fn message_count_key(&self) -> Vec { + Self::key(b"message_count", serde_json::to_vec(&self.1).unwrap()) + } + pub(crate) fn message_count(&self) -> u64 { + self + .0 + .get(self.message_count_key()) + .map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap())) + .unwrap_or(0) + } + + fn last_acknowledged_key(&self) -> Vec { + Self::key(b"last_acknowledged", serde_json::to_vec(&self.1).unwrap()) + } + pub(crate) fn last_acknowledged(&self) -> Option { + self + .0 + .get(self.last_acknowledged_key()) + .map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap())) + } + + fn message_key(&self, id: u64) -> Vec { + Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) + } + pub(crate) fn queue_message(&mut self, msg: QueuedMessage) { + let id = self.message_count(); + let msg_key = self.message_key(id); + let msg_count_key = self.message_count_key(); + + let mut txn = self.0.txn(); + txn.put(msg_key, serde_json::to_vec(&msg).unwrap()); + txn.put(msg_count_key, (id + 1).to_le_bytes()); + txn.commit(); + } + + pub(crate) fn get_message(&self, id: u64) -> Option { + self.0.get(self.message_key(id)).map(|bytes| serde_json::from_slice(&bytes).unwrap()) + } + + pub(crate) fn ack_message(&mut self, id: u64) { + let ack_key = self.last_acknowledged_key(); + let mut txn = self.0.txn(); + txn.put(ack_key, id.to_le_bytes()); + txn.commit(); + } +}