Add a message-queue connection to processor

Still needs love, yet should get us closer to starting testing.
This commit is contained in:
Luke Parker 2023-07-17 15:49:15 -04:00
parent 56f7037084
commit 6ccac2d0ab
No known key found for this signature in database
17 changed files with 184 additions and 81 deletions

68
Cargo.lock generated
View file

@ -3279,26 +3279,6 @@ dependencies = [
"regex", "regex",
] ]
[[package]]
name = "gloo-net"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9902a044653b26b99f7e3693a42f171312d9be8b26b5697bd1e43ad1f8a35e10"
dependencies = [
"futures-channel",
"futures-core",
"futures-sink",
"gloo-utils",
"js-sys",
"pin-project",
"serde",
"serde_json",
"thiserror",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]] [[package]]
name = "gloo-timers" name = "gloo-timers"
version = "0.2.6" version = "0.2.6"
@ -3311,19 +3291,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "gloo-utils"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "037fcb07216cb3a30f7292bd0176b050b7b9a052ba830ef7d5d65f6dc64ba58e"
dependencies = [
"js-sys",
"serde",
"serde_json",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "group" name = "group"
version = "0.12.1" version = "0.12.1"
@ -3954,8 +3921,6 @@ dependencies = [
"jsonrpsee-proc-macros", "jsonrpsee-proc-macros",
"jsonrpsee-server", "jsonrpsee-server",
"jsonrpsee-types", "jsonrpsee-types",
"jsonrpsee-wasm-client",
"jsonrpsee-ws-client",
"tracing", "tracing",
] ]
@ -3965,11 +3930,7 @@ version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "965de52763f2004bc91ac5bcec504192440f0b568a5d621c59d9dbd6f886c3fb" checksum = "965de52763f2004bc91ac5bcec504192440f0b568a5d621c59d9dbd6f886c3fb"
dependencies = [ dependencies = [
"anyhow",
"futures-channel",
"futures-timer",
"futures-util", "futures-util",
"gloo-net",
"http", "http",
"jsonrpsee-core", "jsonrpsee-core",
"jsonrpsee-types", "jsonrpsee-types",
@ -4010,7 +3971,6 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
"wasm-bindgen-futures",
] ]
[[package]] [[package]]
@ -4081,29 +4041,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "jsonrpsee-wasm-client"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a77310456f43c6c89bcba1f6b2fc2a28300da7c341f320f5128f8c83cc63232d"
dependencies = [
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-types",
]
[[package]]
name = "jsonrpsee-ws-client"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b83daeecfc6517cfe210df24e570fb06213533dfb990318fae781f4c7119dd9"
dependencies = [
"http",
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-types",
]
[[package]] [[package]]
name = "k256" name = "k256"
version = "0.13.1" version = "0.13.1"
@ -8786,14 +8723,13 @@ dependencies = [
"async-trait", "async-trait",
"bincode", "bincode",
"bitcoin-serai", "bitcoin-serai",
"ciphersuite",
"dalek-ff-group", "dalek-ff-group",
"env_logger", "env_logger",
"flexible-transcript", "flexible-transcript",
"frost-schnorrkel", "frost-schnorrkel",
"futures", "futures",
"group 0.13.0",
"hex", "hex",
"jsonrpsee",
"k256", "k256",
"lazy_static", "lazy_static",
"log", "log",
@ -8802,6 +8738,8 @@ dependencies = [
"parity-scale-codec", "parity-scale-codec",
"rand_chacha 0.3.1", "rand_chacha 0.3.1",
"rand_core 0.6.4", "rand_core 0.6.4",
"reqwest",
"schnorr-signatures",
"secp256k1", "secp256k1",
"serai-client", "serai-client",
"serai-db", "serai-db",

View file

@ -53,6 +53,8 @@ fn queue_message(meta: Metadata, msg: Vec<u8>, sig: SchnorrSignature<Ristretto>)
// Queue it // Queue it
(*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage { (*QUEUES).read().unwrap()[&meta.to].write().unwrap().queue_message(QueuedMessage {
from: meta.from, from: meta.from,
// Temporary value which queue_message will override
id: u64::MAX,
msg, msg,
sig: sig.serialize(), sig: sig.serialize(),
}); });
@ -133,6 +135,8 @@ async fn main() {
// Start server // Start server
let builder = ServerBuilder::new(); let builder = ServerBuilder::new();
// TODO: Add middleware to check some key is present in the header, making this an authed
// connection
// TODO: Set max request/response size // TODO: Set max request/response size
// 5132 ^ ((b'M' << 8) | b'Q') // 5132 ^ ((b'M' << 8) | b'Q')
let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()]; let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()];
@ -152,7 +156,7 @@ async fn main() {
.unwrap(); .unwrap();
module module
.register_method("next", |args, _| { .register_method("next", |args, _| {
let args = args.parse::<(Service, u64, Vec<u8>)>().unwrap(); let args = args.parse::<(Service, u64)>().unwrap();
get_next_message(args.0, args.1); get_next_message(args.0, args.1);
Ok(()) Ok(())
}) })

View file

@ -14,6 +14,7 @@ pub enum Service {
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct QueuedMessage { pub struct QueuedMessage {
pub from: Service, pub from: Service,
pub id: u64,
pub msg: Vec<u8>, pub msg: Vec<u8>,
pub sig: Vec<u8>, pub sig: Vec<u8>,
} }

View file

@ -33,8 +33,9 @@ impl<D: Db> Queue<D> {
fn message_key(&self, id: u64) -> Vec<u8> { fn message_key(&self, id: u64) -> Vec<u8> {
Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap()) Self::key(b"message", serde_json::to_vec(&(self.1, id)).unwrap())
} }
pub(crate) fn queue_message(&mut self, msg: QueuedMessage) { pub(crate) fn queue_message(&mut self, mut msg: QueuedMessage) {
let id = self.message_count(); let id = self.message_count();
msg.id = id;
let msg_key = self.message_key(id); let msg_key = self.message_key(id);
let msg_count_key = self.message_count_key(); let msg_count_key = self.message_count_key();
@ -45,7 +46,11 @@ impl<D: Db> Queue<D> {
} }
pub(crate) fn get_message(&self, id: u64) -> Option<QueuedMessage> { pub(crate) fn get_message(&self, id: u64) -> Option<QueuedMessage> {
self.0.get(self.message_key(id)).map(|bytes| serde_json::from_slice(&bytes).unwrap()) let msg = self.0.get(self.message_key(id)).map(|bytes| serde_json::from_slice(&bytes).unwrap());
if let Some(msg) = msg.as_ref() {
assert_eq!(msg.id, id, "message stored at {id} has ID {}", msg.id);
}
msg
} }
pub(crate) fn ack_message(&mut self, id: u64) { pub(crate) fn ack_message(&mut self, id: u64) {

View file

@ -32,7 +32,8 @@ bincode = "1"
serde_json = "1" serde_json = "1"
# Cryptography # Cryptography
group = "0.13" ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] }
schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr" }
transcript = { package = "flexible-transcript", path = "../crypto/transcript" } transcript = { package = "flexible-transcript", path = "../crypto/transcript" }
frost = { package = "modular-frost", path = "../crypto/frost", features = ["ristretto"] } frost = { package = "modular-frost", path = "../crypto/frost", features = ["ristretto"] }
@ -60,7 +61,7 @@ serai-client = { path = "../substrate/client", default-features = false }
messages = { package = "serai-processor-messages", path = "./messages" } messages = { package = "serai-processor-messages", path = "./messages" }
jsonrpsee = { version = "0.16", features = ["client"] } reqwest = "0.11"
message-queue = { package = "serai-message-queue", path = "../message-queue" } message-queue = { package = "serai-message-queue", path = "../message-queue" }
[dev-dependencies] [dev-dependencies]

View file

@ -3,7 +3,7 @@ use std::{time::Duration, io, collections::HashMap};
use async_trait::async_trait; use async_trait::async_trait;
use transcript::RecommendedTranscript; use transcript::RecommendedTranscript;
use group::ff::PrimeField; use ciphersuite::group::ff::PrimeField;
use k256::{ProjectivePoint, Scalar}; use k256::{ProjectivePoint, Scalar};
use frost::{ use frost::{
curve::{Curve, Secp256k1}, curve::{Curve, Secp256k1},

View file

@ -6,7 +6,7 @@ use zeroize::Zeroizing;
use transcript::RecommendedTranscript; use transcript::RecommendedTranscript;
use group::{ff::Field, Group}; use ciphersuite::group::{ff::Field, Group};
use dalek_ff_group::{Scalar, EdwardsPoint}; use dalek_ff_group::{Scalar, EdwardsPoint};
use frost::{curve::Ed25519, ThresholdKeys}; use frost::{curve::Ed25519, ThresholdKeys};

View file

@ -1,10 +1,23 @@
use core::ops::Deref;
use std::{ use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
collections::VecDeque, collections::VecDeque,
}; };
use zeroize::Zeroizing;
use rand_core::OsRng;
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use schnorr::SchnorrSignature;
use serde::{Serialize, Deserialize};
use messages::{ProcessorMessage, CoordinatorMessage}; use messages::{ProcessorMessage, CoordinatorMessage};
use serai_client::primitives::NetworkId;
use message_queue::{Service, Metadata, QueuedMessage, message_challenge};
use reqwest::Client;
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
pub struct Message { pub struct Message {
pub id: u64, pub id: u64,
@ -18,6 +31,147 @@ pub trait Coordinator {
async fn ack(&mut self, msg: Message); async fn ack(&mut self, msg: Message);
} }
pub struct MessageQueue {
network: NetworkId,
priv_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
pub_key: <Ristretto as Ciphersuite>::G,
client: Client,
message_queue_url: String,
}
impl MessageQueue {
pub fn new(
message_queue_url: String,
network: NetworkId,
priv_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) -> MessageQueue {
MessageQueue {
network,
pub_key: Ristretto::generator() * priv_key.deref(),
priv_key,
client: Client::new(),
message_queue_url,
}
}
async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value {
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
struct JsonRpcRequest {
version: &'static str,
method: &'static str,
params: serde_json::Value,
id: u64,
}
let res = loop {
// Make the request
if let Ok(req) = self
.client
.post(&self.message_queue_url)
.json(&JsonRpcRequest { version: "2.0", method, params: params.clone(), id: 0 })
.send()
.await
{
// Get the response
if let Ok(res) = req.text().await {
break res;
}
}
// Sleep 5s before trying again
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
};
let json =
serde_json::from_str::<serde_json::Value>(&res).expect("message-queue returned invalid JSON");
if json.get("result").is_none() {
panic!("call failed: {json}");
}
json
}
async fn queue(&self, metadata: Metadata, msg: Vec<u8>, sig: Vec<u8>) {
let json = self.json_call("queue", serde_json::json!([metadata, msg, sig])).await;
if json.get("result") != Some(&serde_json::Value::Bool(true)) {
panic!("failed to queue message: {json}");
}
}
async fn next(&self) -> Message {
loop {
// TODO: Use a proper expected next ID
let json =
self.json_call("next", serde_json::json!([Service::Processor(self.network), 0])).await;
// Convert from a Value to a type via reserialization
let msg: Option<QueuedMessage> = serde_json::from_str(
&serde_json::to_string(
&json.get("result").expect("successful JSON RPC call didn't have result"),
)
.unwrap(),
)
.expect("next didn't return an Option<QueuedMessage>");
// If there wasn't a message, check again in 5s
let Some(msg) = msg else {
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
continue;
};
// Verify the message
assert_eq!(msg.from, Service::Coordinator, "non-coordinator sent us message");
// TODO: Verify the coordinator's signature
// TODO: Check the ID is sane
let id = msg.id;
// Deserialize it into a CoordinatorMessage
let msg: CoordinatorMessage = serde_json::from_str(
&String::from_utf8(msg.msg).expect("msg wasn't valid UTF-8 (not JSON?)"),
)
.expect("message wasn't a JSON-encoded CoordinatorMessage");
return Message { id, msg };
}
}
async fn ack(&self, id: u64, sig: Vec<u8>) {
let json = self.json_call("ack", serde_json::json!([id, sig])).await;
if json.get("result") != Some(&serde_json::Value::Bool(true)) {
panic!("failed to ack message {id}: {json}");
}
}
}
#[async_trait::async_trait]
impl Coordinator for MessageQueue {
async fn send(&mut self, msg: ProcessorMessage) {
let metadata = Metadata {
from: Service::Processor(self.network),
to: Service::Coordinator,
intent: msg.intent(),
};
let msg = serde_json::to_string(&msg).unwrap();
// TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let nonce_pub = Ristretto::generator() * nonce.deref();
let sig = SchnorrSignature::<Ristretto>::sign(
&self.priv_key,
nonce,
message_challenge(self.pub_key, metadata.to, &metadata.intent, msg.as_bytes(), nonce_pub),
);
self.queue(metadata, msg.into_bytes(), sig.serialize()).await;
}
async fn recv(&mut self) -> Message {
self.next().await
}
async fn ack(&mut self, msg: Message) {
// TODO: Use a proper signature once message-queue checks ack signatures
MessageQueue::ack(self, msg.id, vec![0; 64]).await
}
}
// TODO: Move this to tests // TODO: Move this to tests
pub struct MemCoordinator(Arc<RwLock<VecDeque<Message>>>); pub struct MemCoordinator(Arc<RwLock<VecDeque<Message>>>);
impl MemCoordinator { impl MemCoordinator {

View file

@ -7,7 +7,7 @@ use rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng; use rand_chacha::ChaCha20Rng;
use transcript::{Transcript, RecommendedTranscript}; use transcript::{Transcript, RecommendedTranscript};
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{ use frost::{
curve::{Ciphersuite, Ristretto}, curve::{Ciphersuite, Ristretto},
dkg::{Participant, ThresholdParams, ThresholdCore, ThresholdKeys, encryption::*, frost::*}, dkg::{Participant, ThresholdParams, ThresholdCore, ThresholdKeys, encryption::*, frost::*},

View file

@ -6,7 +6,7 @@ use std::{
use zeroize::{Zeroize, Zeroizing}; use zeroize::{Zeroize, Zeroizing};
use transcript::{Transcript, RecommendedTranscript}; use transcript::{Transcript, RecommendedTranscript};
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{curve::Ciphersuite, ThresholdKeys}; use frost::{curve::Ciphersuite, ThresholdKeys};
use log::{info, warn, error}; use log::{info, warn, error};

View file

@ -1,7 +1,7 @@
use std::io; use std::io;
use transcript::{Transcript, RecommendedTranscript}; use transcript::{Transcript, RecommendedTranscript};
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::curve::Ciphersuite; use frost::curve::Ciphersuite;
use crate::coins::{Output, Coin}; use crate::coins::{Output, Coin};

View file

@ -5,7 +5,7 @@ use std::{
collections::{HashSet, HashMap}, collections::{HashSet, HashMap},
}; };
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::curve::Ciphersuite; use frost::curve::Ciphersuite;
use log::{info, debug, warn}; use log::{info, debug, warn};

View file

@ -3,7 +3,7 @@ use std::collections::{VecDeque, HashMap};
use rand_core::OsRng; use rand_core::OsRng;
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{ use frost::{
ThresholdKeys, ThresholdKeys,
sign::{Writable, PreprocessMachine, SignMachine, SignatureMachine}, sign::{Writable, PreprocessMachine, SignMachine, SignatureMachine},

View file

@ -5,7 +5,7 @@ use rand_core::OsRng;
use scale::Encode; use scale::Encode;
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{ use frost::{
curve::Ristretto, curve::Ristretto,
ThresholdKeys, ThresholdKeys,

View file

@ -4,7 +4,7 @@ use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, OsRng};
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{Participant, ThresholdParams, tests::clone_without}; use frost::{Participant, ThresholdParams, tests::clone_without};
use serai_db::{DbTxn, Db, MemDb}; use serai_db::{DbTxn, Db, MemDb};

View file

@ -2,7 +2,7 @@ use std::collections::HashMap;
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, OsRng};
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{ use frost::{
Participant, ThresholdKeys, Participant, ThresholdKeys,
dkg::tests::{key_gen, clone_without}, dkg::tests::{key_gen, clone_without},

View file

@ -2,7 +2,7 @@ use std::collections::HashMap;
use rand_core::{RngCore, OsRng}; use rand_core::{RngCore, OsRng};
use group::GroupEncoding; use ciphersuite::group::GroupEncoding;
use frost::{ use frost::{
curve::Ristretto, curve::Ristretto,
Participant, Participant,