diff --git a/.github/workflows/message-queue-tests.yml b/.github/workflows/message-queue-tests.yml new file mode 100644 index 00000000..2819971c --- /dev/null +++ b/.github/workflows/message-queue-tests.yml @@ -0,0 +1,25 @@ +name: Message Queue Tests + +on: + push: + branches: + - develop + paths: + - "message-queue/**" + - "tests/message-queue/**" + pull_request: + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Install Build Dependencies + uses: ./.github/actions/build-dependencies + with: + github-token: ${{ inputs.github-token }} + + - name: Run message-queue Docker tests + run: cd tests/message-queue && cargo test diff --git a/Cargo.lock b/Cargo.lock index a5b547e4..f8259006 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -788,6 +788,44 @@ dependencies = [ "subtle", ] +[[package]] +name = "bollard" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82e7850583ead5f8bbef247e2a3c37a19bd576e8420cd262a6711921827e1e5" +dependencies = [ + "base64 0.13.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "http", + "hyper", + "hyperlocal", + "log", + "pin-project-lite 0.2.10", + "serde", + "serde_derive", + "serde_json", + "serde_urlencoded", + "thiserror", + "tokio", + "tokio-util", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.42.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864" +dependencies = [ + "serde", + "serde_with", +] + [[package]] name = "bounded-collections" version = "0.1.8" @@ -1638,6 +1676,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + [[package]] name = "darling" version = "0.14.4" @@ -1658,6 +1706,20 @@ dependencies = [ "darling_macro 0.20.3", ] +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 1.0.109", +] + [[package]] name = "darling_core" version = "0.14.4" @@ -1686,6 +1748,17 @@ dependencies = [ "syn 2.0.26", ] +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core 0.13.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.14.4" @@ -1991,6 +2064,28 @@ dependencies = [ "zeroize", ] +[[package]] +name = "dockertest" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce5e89cd7c59faf3cf0e31369fce2382807dd794d4fcce6380adcefdf5987796" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.13.1", + "bollard", + "dyn-clone", + "futures", + "lazy_static", + "rand 0.8.5", + "secrecy", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "downcast" version = "0.11.0" @@ -3672,6 +3767,19 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyperlocal" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c" +dependencies = [ + "futures-util", + "hex", + "hyper", + "pin-project", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.57" @@ -8670,10 +8778,12 @@ version = "0.1.0" dependencies = [ "bincode", "ciphersuite", + "env_logger", "flexible-transcript", "hex", "jsonrpsee", "lazy_static", + "log", "rand_core 0.6.4", "reqwest", "rocksdb", @@ -8687,6 +8797,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "serai-message-queue-tests" +version = "0.1.0" +dependencies = [ + "ciphersuite", + "dockertest", + "hex", + "rand_core 0.6.4", + "serai-message-queue", + "serai-primitives", + "tokio", +] + [[package]] name = "serai-no-std-tests" version = "0.1.0" @@ -8975,6 +9098,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling 0.13.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "sha-1" version = "0.9.8" diff --git a/Cargo.toml b/Cargo.toml index 9574483a..c5a566da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "substrate/client", "tests/no-std", + "tests/message-queue", ] # Always compile Monero (and a variety of dependencies) with optimizations due diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 0e32dbe4..7165cc85 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -32,7 +32,7 @@ scale = { package = "parity-scale-codec", version = "3", features = ["derive"] } sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false } rocksdb = "0.21" -serai-db = { path = "../common/db" } +serai-db = { path = "../common/db", features = ["rocksdb"] } serai-env = { path = "../common/env" } processor-messages = { package = "serai-processor-messages", path = "../processor/messages" } diff --git a/deploy/message-queue/Dockerfile b/deploy/message-queue/Dockerfile index 6d77f179..a5281f4e 100644 --- a/deploy/message-queue/Dockerfile +++ b/deploy/message-queue/Dockerfile @@ -45,7 +45,7 @@ RUN --mount=type=cache,target=/root/.local/ \ --mount=type=cache,target=/serai/target/release/incremental \ --mount=type=cache,target=/serai/target/release/wbuild \ --mount=type=cache,target=/serai/target/release/lib* \ - cd message-queue && cargo build --release + cd message-queue && cargo build --release --all-features # Prepare Image FROM ubuntu:latest as image diff --git a/deploy/processor/Dockerfile b/deploy/processor/Dockerfile index 50ff0f67..8fd36894 100644 --- a/deploy/processor/Dockerfile +++ b/deploy/processor/Dockerfile @@ -45,7 +45,7 @@ RUN --mount=type=cache,target=/root/.local/ \ --mount=type=cache,target=/serai/target/release/incremental \ --mount=type=cache,target=/serai/target/release/wbuild \ --mount=type=cache,target=/serai/target/release/lib* \ - cd processor && cargo build --release + cd processor && cargo build --release --all-features # Prepare Image FROM ubuntu:latest as image diff --git a/message-queue/Cargo.toml b/message-queue/Cargo.toml index 54fea1ea..f3a7bb0d 100644 --- a/message-queue/Cargo.toml +++ b/message-queue/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "serai-message-queue" version = "0.1.0" -description = "A message queue focused on safety" +description = "A message queue for Serai focused on consistency" license = "AGPL-3.0-only" -repository = "https://github.com/serai-dex/serai/tree/develop/message-log" +repository = "https://github.com/serai-dex/serai/tree/develop/message-queue" authors = ["Luke Parker "] keywords = [] edition = "2021" @@ -14,7 +14,6 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] - # Macros lazy_static = "1" serde = { version = "1", features = ["derive"] } @@ -34,14 +33,20 @@ ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] } schnorr-signatures = { path = "../crypto/schnorr" } # Application +log = "0.4" +env_logger = "0.10" + tokio = { version = "1", features = ["full"] } -serai-db = { path = "../common/db", features = ["rocksdb"] } -rocksdb = "0.21" +serai-db = { path = "../common/db", features = ["rocksdb"], optional = true } +rocksdb = { version = "0.21", optional = true } serai-env = { path = "../common/env" } serai-primitives = { path = "../substrate/primitives" } -jsonrpsee = { version = "0.16", features = ["server"] } +jsonrpsee = { version = "0.16", features = ["server"], optional = true } reqwest = { version = "0.11", features = ["json"] } + +[features] +binaries = ["serai-db", "rocksdb", "jsonrpsee"] diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index 5345bb99..ea20d92d 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -57,7 +57,7 @@ impl MessageQueue { async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value { #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] struct JsonRpcRequest { - version: &'static str, + jsonrpc: &'static str, method: &'static str, params: serde_json::Value, id: u64, @@ -65,21 +65,29 @@ impl MessageQueue { let res = loop { // Make the request - if let Ok(req) = self + match self .client .post(&self.url) - .json(&JsonRpcRequest { version: "2.0", method, params: params.clone(), id: 0 }) + .json(&JsonRpcRequest { jsonrpc: "2.0", method, params: params.clone(), id: 0 }) .send() .await { - // Get the response - if let Ok(res) = req.text().await { - break res; + Ok(req) => { + // Get the response + match req.text().await { + Ok(res) => break res, + Err(e) => { + dbg!(e); + } + } + } + Err(e) => { + dbg!(e); } } - // Sleep 5s before trying again - tokio::time::sleep(core::time::Duration::from_secs(5)).await; + // Sleep for a second before trying again + tokio::time::sleep(core::time::Duration::from_secs(1)).await; }; let json = @@ -161,7 +169,7 @@ impl MessageQueue { ) .serialize(); - let json = self.json_call("ack", serde_json::json!([id, sig])).await; + let json = self.json_call("ack", serde_json::json!([self.service, id, sig])).await; if json.get("result") != Some(&serde_json::Value::Bool(true)) { panic!("failed to ack message {id}: {json}"); } diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index dae4b53e..d56cf552 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -53,13 +53,15 @@ fn queue_message(meta: Metadata, msg: Vec, sig: SchnorrSignature) // TODO: Verify (from, intent) hasn't been prior seen // Queue it - (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { + let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { from: meta.from, // Temporary value which queue_message will override id: u64::MAX, msg, sig: sig.serialize(), }); + + log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to); } // next RPC method @@ -100,11 +102,20 @@ fn ack_message(service: Service, id: u64, sig: SchnorrSignature) { // It's the second if we acknowledge messages before saving them as acknowledged // TODO: Check only a proper message is being acked + log::info!("{:?} is acknowledging {}", service, id); + (*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id) } #[tokio::main] async fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info"); + } + env_logger::init(); + + log::info!("Starting message-queue service..."); + // Open the DB let db = Arc::new( rocksdb::TransactionDB::open_default( @@ -160,14 +171,13 @@ async fn main() { args.1, SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), ); - Ok(()) + Ok(true) }) .unwrap(); module .register_method("next", |args, _| { let args = args.parse::<(Service, u64)>().unwrap(); - get_next_message(args.0, args.1); - Ok(()) + Ok(get_next_message(args.0, args.1)) }) .unwrap(); module @@ -178,7 +188,7 @@ async fn main() { args.1, SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), ); - Ok(()) + Ok(true) }) .unwrap(); diff --git a/message-queue/src/queue.rs b/message-queue/src/queue.rs index 60359fd1..ab9dfc28 100644 --- a/message-queue/src/queue.rs +++ b/message-queue/src/queue.rs @@ -33,7 +33,7 @@ impl Queue { fn message_key(&self, id: u64) -> Vec { Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) } - pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) { + pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) -> u64 { let id = self.message_count(); msg.id = id; let msg_key = self.message_key(id); @@ -43,6 +43,8 @@ impl Queue { txn.put(msg_key, serde_json::to_vec(&msg).unwrap()); txn.put(msg_count_key, (id + 1).to_le_bytes()); txn.commit(); + + id } pub(crate) fn get_message(&self, id: u64) -> Option { diff --git a/processor/Cargo.toml b/processor/Cargo.toml index e4968a32..2bcce6ac 100644 --- a/processor/Cargo.toml +++ b/processor/Cargo.toml @@ -55,7 +55,7 @@ log = "0.4" tokio = { version = "1", features = ["full"] } rocksdb = "0.21" -serai-db = { path = "../common/db", default-features = false } +serai-db = { path = "../common/db", default-features = false, features = ["rocksdb"] } serai-env = { path = "../common/env" } serai-client = { path = "../substrate/client", default-features = false } diff --git a/tests/message-queue/Cargo.toml b/tests/message-queue/Cargo.toml new file mode 100644 index 00000000..2163abe4 --- /dev/null +++ b/tests/message-queue/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "serai-message-queue-tests" +version = "0.1.0" +description = "Tests for Serai's message-queue" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/tests/message-queue" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[dependencies] +hex = "0.4" + +rand_core = "0.6" + +ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] } + +serai-primitives = { path = "../../substrate/primitives" } +serai-message-queue = { path = "../../message-queue" } + +tokio = { version = "1", features = ["full"] } +dockertest = "0.3" diff --git a/tests/message-queue/LICENSE b/tests/message-queue/LICENSE new file mode 100644 index 00000000..f684d027 --- /dev/null +++ b/tests/message-queue/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2023 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/tests/message-queue/src/lib.rs b/tests/message-queue/src/lib.rs new file mode 100644 index 00000000..ee20d002 --- /dev/null +++ b/tests/message-queue/src/lib.rs @@ -0,0 +1,176 @@ +#[cfg(test)] +mod tests { + use std::{ + sync::{Mutex, OnceLock}, + collections::HashMap, + env, + }; + + use rand_core::OsRng; + + use ciphersuite::{ + group::{ + ff::{Field, PrimeField}, + GroupEncoding, + }, + Ciphersuite, Ristretto, + }; + + use serai_primitives::NetworkId; + use serai_message_queue::{Service, Metadata, client::MessageQueue}; + + use dockertest::{ + PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, Composition, DockerTest, + }; + + static BUILT: OnceLock> = OnceLock::new(); + fn build() { + let built = BUILT.get_or_init(|| Mutex::new(false)); + // Only one call to build will acquire this lock + let mut built_lock = built.lock().unwrap(); + if *built_lock { + // If it was built, return + return; + } + + // Else, hold the lock while we build + let mut path = env::current_exe().unwrap(); + path.pop(); + assert!(path.as_path().ends_with("deps")); + path.pop(); + assert!(path.as_path().ends_with("debug")); + path.pop(); + assert!(path.as_path().ends_with("target")); + path.pop(); + path.push("deploy"); + + println!("Building message-queue..."); + + assert!(std::process::Command::new("docker") + .current_dir(path) + .arg("compose") + .arg("build") + .arg("message-queue") + .spawn() + .unwrap() + .wait() + .unwrap() + .success()); + + println!("Built!"); + + // Set built + *built_lock = true; + } + + type PrivateKey = ::F; + fn instance() -> (PrivateKey, HashMap, Composition) { + build(); + + let coord_key = ::F::random(&mut OsRng); + let priv_keys = HashMap::from([ + (NetworkId::Bitcoin, ::F::random(&mut OsRng)), + (NetworkId::Ethereum, ::F::random(&mut OsRng)), + (NetworkId::Monero, ::F::random(&mut OsRng)), + ]); + + let mut composition = Composition::with_image( + Image::with_repository("serai-dev-message-queue").pull_policy(PullPolicy::Never), + ) + .with_log_options(Some(LogOptions { + action: LogAction::Forward, + policy: LogPolicy::Always, + source: LogSource::Both, + })) + .with_env( + [ + ( + "COORDINATOR_KEY".to_string(), + hex::encode((Ristretto::generator() * coord_key).to_bytes()), + ), + ( + "BITCOIN_KEY".to_string(), + hex::encode((Ristretto::generator() * priv_keys[&NetworkId::Bitcoin]).to_bytes()), + ), + ( + "ETHEREUM_KEY".to_string(), + hex::encode((Ristretto::generator() * priv_keys[&NetworkId::Ethereum]).to_bytes()), + ), + ( + "MONERO_KEY".to_string(), + hex::encode((Ristretto::generator() * priv_keys[&NetworkId::Monero]).to_bytes()), + ), + ("DB_PATH".to_string(), "./message-queue-db".to_string()), + ] + .into(), + ); + composition.publish_all_ports(); + + (coord_key, priv_keys, composition) + } + + #[test] + fn basic_functionality() { + let mut test = DockerTest::new(); + let (coord_key, priv_keys, composition) = instance(); + test.add_composition(composition); + test.run(|ops| async move { + // Sleep for a second for the message-queue to boot + // It isn't an error to start immediately, it just silences an error + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + + let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap(); + // TODO: MessageQueue directly read from env to remove this boilerplate from all binaries, + // yet it's now annoying as hell to parameterize. Split into new/from_env? + env::set_var( + "MESSAGE_QUEUE_RPC", + "http://".to_string() + &rpc.0.to_string() + ":" + &rpc.1.to_string(), + ); + env::set_var("MESSAGE_QUEUE_KEY", hex::encode(coord_key.to_repr())); + + // Queue some messagse + let coordinator = MessageQueue::new(Service::Coordinator); + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Bitcoin), + intent: b"intent".to_vec(), + }, + b"Hello, World!".to_vec(), + ) + .await; + + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Bitcoin), + intent: b"intent 2".to_vec(), + }, + b"Hello, World, again!".to_vec(), + ) + .await; + + // Successfully get it + env::set_var("MESSAGE_QUEUE_KEY", hex::encode(priv_keys[&NetworkId::Bitcoin].to_repr())); + let bitcoin = MessageQueue::new(Service::Processor(NetworkId::Bitcoin)); + let msg = bitcoin.next(0).await; + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, 0); + assert_eq!(&msg.msg, b"Hello, World!"); + + // If we don't ack it, it should continue to be returned + assert_eq!(msg, bitcoin.next(0).await); + + // Acknowledging it should yield the next message + bitcoin.ack(0).await; + + let next_msg = bitcoin.next(1).await; + assert!(msg != next_msg); + assert_eq!(next_msg.from, Service::Coordinator); + assert_eq!(next_msg.id, 1); + assert_eq!(&next_msg.msg, b"Hello, World, again!"); + }); + } +}