Add a Docker-based test for the message-queue service

This commit is contained in:
Luke Parker 2023-07-20 18:53:03 -04:00
parent ceeb57470f
commit 9effd5ccdc
No known key found for this signature in database
14 changed files with 439 additions and 25 deletions

View file

@ -0,0 +1,25 @@
name: Message Queue Tests
on:
push:
branches:
- develop
paths:
- "message-queue/**"
- "tests/message-queue/**"
pull_request:
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Build Dependencies
uses: ./.github/actions/build-dependencies
with:
github-token: ${{ inputs.github-token }}
- name: Run message-queue Docker tests
run: cd tests/message-queue && cargo test

145
Cargo.lock generated
View file

@ -788,6 +788,44 @@ dependencies = [
"subtle",
]
[[package]]
name = "bollard"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82e7850583ead5f8bbef247e2a3c37a19bd576e8420cd262a6711921827e1e5"
dependencies = [
"base64 0.13.1",
"bollard-stubs",
"bytes",
"futures-core",
"futures-util",
"hex",
"http",
"hyper",
"hyperlocal",
"log",
"pin-project-lite 0.2.10",
"serde",
"serde_derive",
"serde_json",
"serde_urlencoded",
"thiserror",
"tokio",
"tokio-util",
"url",
"winapi",
]
[[package]]
name = "bollard-stubs"
version = "1.42.0-rc.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed59b5c00048f48d7af971b71f800fdf23e858844a6f9e4d32ca72e9399e7864"
dependencies = [
"serde",
"serde_with",
]
[[package]]
name = "bounded-collections"
version = "0.1.8"
@ -1638,6 +1676,16 @@ dependencies = [
"zeroize",
]
[[package]]
name = "darling"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
dependencies = [
"darling_core 0.13.4",
"darling_macro 0.13.4",
]
[[package]]
name = "darling"
version = "0.14.4"
@ -1658,6 +1706,20 @@ dependencies = [
"darling_macro 0.20.3",
]
[[package]]
name = "darling_core"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn 1.0.109",
]
[[package]]
name = "darling_core"
version = "0.14.4"
@ -1686,6 +1748,17 @@ dependencies = [
"syn 2.0.26",
]
[[package]]
name = "darling_macro"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
dependencies = [
"darling_core 0.13.4",
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling_macro"
version = "0.14.4"
@ -1991,6 +2064,28 @@ dependencies = [
"zeroize",
]
[[package]]
name = "dockertest"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce5e89cd7c59faf3cf0e31369fce2382807dd794d4fcce6380adcefdf5987796"
dependencies = [
"anyhow",
"async-trait",
"base64 0.13.1",
"bollard",
"dyn-clone",
"futures",
"lazy_static",
"rand 0.8.5",
"secrecy",
"serde",
"serde_json",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "downcast"
version = "0.11.0"
@ -3672,6 +3767,19 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "hyperlocal"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c"
dependencies = [
"futures-util",
"hex",
"hyper",
"pin-project",
"tokio",
]
[[package]]
name = "iana-time-zone"
version = "0.1.57"
@ -8670,10 +8778,12 @@ version = "0.1.0"
dependencies = [
"bincode",
"ciphersuite",
"env_logger",
"flexible-transcript",
"hex",
"jsonrpsee",
"lazy_static",
"log",
"rand_core 0.6.4",
"reqwest",
"rocksdb",
@ -8687,6 +8797,19 @@ dependencies = [
"zeroize",
]
[[package]]
name = "serai-message-queue-tests"
version = "0.1.0"
dependencies = [
"ciphersuite",
"dockertest",
"hex",
"rand_core 0.6.4",
"serai-message-queue",
"serai-primitives",
"tokio",
]
[[package]]
name = "serai-no-std-tests"
version = "0.1.0"
@ -8975,6 +9098,28 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff"
dependencies = [
"serde",
"serde_with_macros",
]
[[package]]
name = "serde_with_macros"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082"
dependencies = [
"darling 0.13.4",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "sha-1"
version = "0.9.8"

View file

@ -51,6 +51,7 @@ members = [
"substrate/client",
"tests/no-std",
"tests/message-queue",
]
# Always compile Monero (and a variety of dependencies) with optimizations due

View file

@ -32,7 +32,7 @@ scale = { package = "parity-scale-codec", version = "3", features = ["derive"] }
sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false }
rocksdb = "0.21"
serai-db = { path = "../common/db" }
serai-db = { path = "../common/db", features = ["rocksdb"] }
serai-env = { path = "../common/env" }
processor-messages = { package = "serai-processor-messages", path = "../processor/messages" }

View file

@ -45,7 +45,7 @@ RUN --mount=type=cache,target=/root/.local/ \
--mount=type=cache,target=/serai/target/release/incremental \
--mount=type=cache,target=/serai/target/release/wbuild \
--mount=type=cache,target=/serai/target/release/lib* \
cd message-queue && cargo build --release
cd message-queue && cargo build --release --all-features
# Prepare Image
FROM ubuntu:latest as image

View file

@ -45,7 +45,7 @@ RUN --mount=type=cache,target=/root/.local/ \
--mount=type=cache,target=/serai/target/release/incremental \
--mount=type=cache,target=/serai/target/release/wbuild \
--mount=type=cache,target=/serai/target/release/lib* \
cd processor && cargo build --release
cd processor && cargo build --release --all-features
# Prepare Image
FROM ubuntu:latest as image

View file

@ -1,9 +1,9 @@
[package]
name = "serai-message-queue"
version = "0.1.0"
description = "A message queue focused on safety"
description = "A message queue for Serai focused on consistency"
license = "AGPL-3.0-only"
repository = "https://github.com/serai-dex/serai/tree/develop/message-log"
repository = "https://github.com/serai-dex/serai/tree/develop/message-queue"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
keywords = []
edition = "2021"
@ -14,7 +14,6 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
# Macros
lazy_static = "1"
serde = { version = "1", features = ["derive"] }
@ -34,14 +33,20 @@ ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] }
schnorr-signatures = { path = "../crypto/schnorr" }
# Application
log = "0.4"
env_logger = "0.10"
tokio = { version = "1", features = ["full"] }
serai-db = { path = "../common/db", features = ["rocksdb"] }
rocksdb = "0.21"
serai-db = { path = "../common/db", features = ["rocksdb"], optional = true }
rocksdb = { version = "0.21", optional = true }
serai-env = { path = "../common/env" }
serai-primitives = { path = "../substrate/primitives" }
jsonrpsee = { version = "0.16", features = ["server"] }
jsonrpsee = { version = "0.16", features = ["server"], optional = true }
reqwest = { version = "0.11", features = ["json"] }
[features]
binaries = ["serai-db", "rocksdb", "jsonrpsee"]

View file

@ -57,7 +57,7 @@ impl MessageQueue {
async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value {
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
struct JsonRpcRequest {
version: &'static str,
jsonrpc: &'static str,
method: &'static str,
params: serde_json::Value,
id: u64,
@ -65,21 +65,29 @@ impl MessageQueue {
let res = loop {
// Make the request
if let Ok(req) = self
match self
.client
.post(&self.url)
.json(&JsonRpcRequest { version: "2.0", method, params: params.clone(), id: 0 })
.json(&JsonRpcRequest { jsonrpc: "2.0", method, params: params.clone(), id: 0 })
.send()
.await
{
Ok(req) => {
// Get the response
if let Ok(res) = req.text().await {
break res;
match req.text().await {
Ok(res) => break res,
Err(e) => {
dbg!(e);
}
}
}
Err(e) => {
dbg!(e);
}
}
// Sleep 5s before trying again
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
// Sleep for a second before trying again
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
};
let json =
@ -161,7 +169,7 @@ impl MessageQueue {
)
.serialize();
let json = self.json_call("ack", serde_json::json!([id, sig])).await;
let json = self.json_call("ack", serde_json::json!([self.service, id, sig])).await;
if json.get("result") != Some(&serde_json::Value::Bool(true)) {
panic!("failed to ack message {id}: {json}");
}

View file

@ -53,13 +53,15 @@ fn queue_message(meta: Metadata, msg: Vec<u8>, sig: SchnorrSignature<Ristretto>)
// TODO: Verify (from, intent) hasn't been prior seen
// Queue it
(*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage {
let id = (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage {
from: meta.from,
// Temporary value which queue_message will override
id: u64::MAX,
msg,
sig: sig.serialize(),
});
log::info!("Queued message from {:?}. It is {:?} {id}", meta.from, meta.to);
}
// next RPC method
@ -100,11 +102,20 @@ fn ack_message(service: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
// It's the second if we acknowledge messages before saving them as acknowledged
// TODO: Check only a proper message is being acked
log::info!("{:?} is acknowledging {}", service, id);
(*QUEUES).read().unwrap()[&service].write().unwrap().ack_message(id)
}
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
env_logger::init();
log::info!("Starting message-queue service...");
// Open the DB
let db = Arc::new(
rocksdb::TransactionDB::open_default(
@ -160,14 +171,13 @@ async fn main() {
args.1,
SchnorrSignature::<Ristretto>::read(&mut args.2.as_slice()).unwrap(),
);
Ok(())
Ok(true)
})
.unwrap();
module
.register_method("next", |args, _| {
let args = args.parse::<(Service, u64)>().unwrap();
get_next_message(args.0, args.1);
Ok(())
Ok(get_next_message(args.0, args.1))
})
.unwrap();
module
@ -178,7 +188,7 @@ async fn main() {
args.1,
SchnorrSignature::<Ristretto>::read(&mut args.2.as_slice()).unwrap(),
);
Ok(())
Ok(true)
})
.unwrap();

View file

@ -33,7 +33,7 @@ impl<D: Db> Queue<D> {
fn message_key(&self, id: u64) -> Vec<u8> {
Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap())
}
pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) {
pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) -> u64 {
let id = self.message_count();
msg.id = id;
let msg_key = self.message_key(id);
@ -43,6 +43,8 @@ impl<D: Db> Queue<D> {
txn.put(msg_key, serde_json::to_vec(&msg).unwrap());
txn.put(msg_count_key, (id + 1).to_le_bytes());
txn.commit();
id
}
pub(crate) fn get_message(&self, id: u64) -> Option<QueuedMessage> {

View file

@ -55,7 +55,7 @@ log = "0.4"
tokio = { version = "1", features = ["full"] }
rocksdb = "0.21"
serai-db = { path = "../common/db", default-features = false }
serai-db = { path = "../common/db", default-features = false, features = ["rocksdb"] }
serai-env = { path = "../common/env" }
serai-client = { path = "../substrate/client", default-features = false }

View file

@ -0,0 +1,27 @@
[package]
name = "serai-message-queue-tests"
version = "0.1.0"
description = "Tests for Serai's message-queue"
license = "AGPL-3.0-only"
repository = "https://github.com/serai-dex/serai/tree/develop/tests/message-queue"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
keywords = []
edition = "2021"
publish = false
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
hex = "0.4"
rand_core = "0.6"
ciphersuite = { path = "../../crypto/ciphersuite", features = ["ristretto"] }
serai-primitives = { path = "../../substrate/primitives" }
serai-message-queue = { path = "../../message-queue" }
tokio = { version = "1", features = ["full"] }
dockertest = "0.3"

View file

@ -0,0 +1,15 @@
AGPL-3.0-only license
Copyright (c) 2023 Luke Parker
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License Version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

View file

@ -0,0 +1,176 @@
#[cfg(test)]
mod tests {
use std::{
sync::{Mutex, OnceLock},
collections::HashMap,
env,
};
use rand_core::OsRng;
use ciphersuite::{
group::{
ff::{Field, PrimeField},
GroupEncoding,
},
Ciphersuite, Ristretto,
};
use serai_primitives::NetworkId;
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, Composition, DockerTest,
};
static BUILT: OnceLock<Mutex<bool>> = OnceLock::new();
fn build() {
let built = BUILT.get_or_init(|| Mutex::new(false));
// Only one call to build will acquire this lock
let mut built_lock = built.lock().unwrap();
if *built_lock {
// If it was built, return
return;
}
// Else, hold the lock while we build
let mut path = env::current_exe().unwrap();
path.pop();
assert!(path.as_path().ends_with("deps"));
path.pop();
assert!(path.as_path().ends_with("debug"));
path.pop();
assert!(path.as_path().ends_with("target"));
path.pop();
path.push("deploy");
println!("Building message-queue...");
assert!(std::process::Command::new("docker")
.current_dir(path)
.arg("compose")
.arg("build")
.arg("message-queue")
.spawn()
.unwrap()
.wait()
.unwrap()
.success());
println!("Built!");
// Set built
*built_lock = true;
}
type PrivateKey = <Ristretto as Ciphersuite>::F;
fn instance() -> (PrivateKey, HashMap<NetworkId, PrivateKey>, Composition) {
build();
let coord_key = <Ristretto as Ciphersuite>::F::random(&mut OsRng);
let priv_keys = HashMap::from([
(NetworkId::Bitcoin, <Ristretto as Ciphersuite>::F::random(&mut OsRng)),
(NetworkId::Ethereum, <Ristretto as Ciphersuite>::F::random(&mut OsRng)),
(NetworkId::Monero, <Ristretto as Ciphersuite>::F::random(&mut OsRng)),
]);
let mut composition = Composition::with_image(
Image::with_repository("serai-dev-message-queue").pull_policy(PullPolicy::Never),
)
.with_log_options(Some(LogOptions {
action: LogAction::Forward,
policy: LogPolicy::Always,
source: LogSource::Both,
}))
.with_env(
[
(
"COORDINATOR_KEY".to_string(),
hex::encode((Ristretto::generator() * coord_key).to_bytes()),
),
(
"BITCOIN_KEY".to_string(),
hex::encode((Ristretto::generator() * priv_keys[&NetworkId::Bitcoin]).to_bytes()),
),
(
"ETHEREUM_KEY".to_string(),
hex::encode((Ristretto::generator() * priv_keys[&NetworkId::Ethereum]).to_bytes()),
),
(
"MONERO_KEY".to_string(),
hex::encode((Ristretto::generator() * priv_keys[&NetworkId::Monero]).to_bytes()),
),
("DB_PATH".to_string(), "./message-queue-db".to_string()),
]
.into(),
);
composition.publish_all_ports();
(coord_key, priv_keys, composition)
}
#[test]
fn basic_functionality() {
let mut test = DockerTest::new();
let (coord_key, priv_keys, composition) = instance();
test.add_composition(composition);
test.run(|ops| async move {
// Sleep for a second for the message-queue to boot
// It isn't an error to start immediately, it just silences an error
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap();
// TODO: MessageQueue directly read from env to remove this boilerplate from all binaries,
// yet it's now annoying as hell to parameterize. Split into new/from_env?
env::set_var(
"MESSAGE_QUEUE_RPC",
"http://".to_string() + &rpc.0.to_string() + ":" + &rpc.1.to_string(),
);
env::set_var("MESSAGE_QUEUE_KEY", hex::encode(coord_key.to_repr()));
// Queue some messagse
let coordinator = MessageQueue::new(Service::Coordinator);
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(NetworkId::Bitcoin),
intent: b"intent".to_vec(),
},
b"Hello, World!".to_vec(),
)
.await;
coordinator
.queue(
Metadata {
from: Service::Coordinator,
to: Service::Processor(NetworkId::Bitcoin),
intent: b"intent 2".to_vec(),
},
b"Hello, World, again!".to_vec(),
)
.await;
// Successfully get it
env::set_var("MESSAGE_QUEUE_KEY", hex::encode(priv_keys[&NetworkId::Bitcoin].to_repr()));
let bitcoin = MessageQueue::new(Service::Processor(NetworkId::Bitcoin));
let msg = bitcoin.next(0).await;
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, 0);
assert_eq!(&msg.msg, b"Hello, World!");
// If we don't ack it, it should continue to be returned
assert_eq!(msg, bitcoin.next(0).await);
// Acknowledging it should yield the next message
bitcoin.ack(0).await;
let next_msg = bitcoin.next(1).await;
assert!(msg != next_msg);
assert_eq!(next_msg.from, Service::Coordinator);
assert_eq!(next_msg.id, 1);
assert_eq!(&next_msg.msg, b"Hello, World, again!");
});
}
}