From a7c9c1ef55d23391452b82fcec8241a0937231e7 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 18 Jul 2023 01:53:51 -0400 Subject: [PATCH] Integrate coordinator with MessageQueue and RocksDB Also resolves a couple TODOs. --- Cargo.lock | 11 +- coordinator/Cargo.toml | 5 +- coordinator/src/main.rs | 63 +++++---- coordinator/src/processors.rs | 48 +++---- coordinator/src/tests/mod.rs | 37 ++++++ coordinator/src/tests/tributary/dkg.rs | 6 +- message-queue/Cargo.toml | 7 +- message-queue/src/client.rs | 169 +++++++++++++++++++++++++ message-queue/src/lib.rs | 2 + processor/Cargo.toml | 2 - processor/src/coordinator.rs | 167 ++---------------------- processor/src/main.rs | 29 +---- 12 files changed, 309 insertions(+), 237 deletions(-) create mode 100644 message-queue/src/client.rs diff --git a/Cargo.lock b/Cargo.lock index 79f41f25..ac13305a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8572,11 +8572,14 @@ dependencies = [ "modular-frost", "parity-scale-codec", "rand_core 0.6.4", + "rocksdb", "schnorr-signatures", - "schnorrkel", "serai-client", "serai-db", + "serai-env", + "serai-message-queue", "serai-processor-messages", + "serde_json", "sp-application-crypto", "tokio", "tributary-chain", @@ -8636,7 +8639,8 @@ dependencies = [ "hex", "jsonrpsee", "lazy_static", - "log", + "rand_core 0.6.4", + "reqwest", "rocksdb", "schnorr-signatures", "serai-db", @@ -8645,6 +8649,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "zeroize", ] [[package]] @@ -8738,9 +8743,7 @@ dependencies = [ "parity-scale-codec", "rand_chacha 0.3.1", "rand_core 0.6.4", - "reqwest", "rocksdb", - "schnorr-signatures", "secp256k1", "serai-client", "serai-db", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index ab0a08da..0e32dbe4 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -29,17 +29,20 @@ frost = { package = "modular-frost", path = "../crypto/frost" } scale = { package = "parity-scale-codec", version = "3", features = ["derive"] } -schnorrkel = "0.10" sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false } +rocksdb = "0.21" serai-db = { path = "../common/db" } +serai-env = { path = "../common/env" } processor-messages = { package = "serai-processor-messages", path = "../processor/messages" } +message-queue = { package = "serai-message-queue", path = "../message-queue" } tributary = { package = "tributary-chain", path = "./tributary" } serai-client = { path = "../substrate/client", features = ["serai"] } hex = "0.4" +serde_json = "1" log = "0.4" tokio = { version = "1", features = ["full"] } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 5197b986..0f75c733 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -14,10 +14,13 @@ use rand_core::OsRng; use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; -use serai_db::{DbTxn, Db, MemDb}; +use serai_db::{DbTxn, Db}; +use serai_env as env; use serai_client::{Public, Signature, Serai}; +use message_queue::{Service, client::MessageQueue}; + use tokio::{ sync::{ mpsc::{self, UnboundedSender}, @@ -322,7 +325,7 @@ pub async fn handle_p2p( // connection // In order to reduce congestion though, we should at least check if we take value from // this message before running spawn - // TODO + // TODO2 tokio::spawn({ let tributaries = tributaries.clone(); async move { @@ -398,7 +401,7 @@ pub async fn handle_processors( key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, coin_key } => { assert_eq!( id.set.network, msg.network, - "processor 
claimed to be a different network than it was for SubstrateBlockAck", + "processor claimed to be a different network than it was for GeneratedKeyPair", ); // TODO: Also check the other KeyGenId fields @@ -415,13 +418,15 @@ pub async fn handle_processors( Signature([0; 64]), // TODO ); - match serai.publish(&tx).await { - Ok(hash) => { - log::info!("voted on key pair for {:?} in TX {}", id.set, hex::encode(hash)) - } - Err(e) => { - log::error!("couldn't connect to Serai node to publish vote TX: {:?}", e); - todo!(); // TODO + loop { + match serai.publish(&tx).await { + Ok(hash) => { + log::info!("voted on key pair for {:?} in TX {}", id.set, hex::encode(hash)) + } + Err(e) => { + log::error!("couldn't connect to Serai node to publish vote TX: {:?}", e); + tokio::time::sleep(Duration::from_secs(10)).await; + } } } @@ -507,19 +512,22 @@ pub async fn handle_processors( // TODO: Check this key's key pair's substrate key is authorized to publish batches // TODO: Check the batch ID is an atomic increment - match serai.publish(&Serai::execute_batch(batch.clone())).await { - Ok(hash) => { - log::info!( - "executed batch {:?} {} (block {}) in TX {}", - batch.batch.network, - batch.batch.id, - hex::encode(batch.batch.block), - hex::encode(hash), - ) - } - Err(e) => { - log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e); - todo!(); // TODO + loop { + match serai.publish(&Serai::execute_batch(batch.clone())).await { + Ok(hash) => { + log::info!( + "executed batch {:?} {} (block {}) in TX {}", + batch.batch.network, + batch.batch.id, + hex::encode(batch.batch.block), + hex::encode(hash), + ); + break; + } + Err(e) => { + log::error!("couldn't connect to Serai node to publish batch TX: {:?}", e); + tokio::time::sleep(Duration::from_secs(10)).await; + } } } @@ -661,12 +669,17 @@ pub async fn run( #[tokio::main] async fn main() { - let db = MemDb::new(); // TODO + let db = Arc::new( + rocksdb::TransactionDB::::open_default( + env::var("DB_PATH").expect("path to DB wasn't specified"), + ) + .unwrap(), + ); let key = Zeroizing::new(::F::ZERO); // TODO let p2p = LocalP2p::new(1).swap_remove(0); // TODO - let processors = processors::MemProcessors::new(); // TODO + let processors = Arc::new(MessageQueue::new(Service::Coordinator)); let serai = || async { loop { diff --git a/coordinator/src/processors.rs b/coordinator/src/processors.rs index 3b03cb1d..1b5573bd 100644 --- a/coordinator/src/processors.rs +++ b/coordinator/src/processors.rs @@ -1,14 +1,10 @@ -use std::{ - sync::Arc, - collections::{VecDeque, HashMap}, -}; - -use tokio::sync::RwLock; +use std::sync::Arc; use serai_client::primitives::NetworkId; - use processor_messages::{ProcessorMessage, CoordinatorMessage}; +use message_queue::{Service, Metadata, client::MessageQueue}; + #[derive(Clone, PartialEq, Eq, Debug)] pub struct Message { pub id: u64, @@ -23,27 +19,31 @@ pub trait Processors: 'static + Send + Sync + Clone { async fn ack(&mut self, msg: Message); } -// TODO: Move this to tests -#[derive(Clone)] -pub struct MemProcessors(pub Arc>>>); -impl MemProcessors { - #[allow(clippy::new_without_default)] - pub fn new() -> MemProcessors { - MemProcessors(Arc::new(RwLock::new(HashMap::new()))) - } -} - #[async_trait::async_trait] -impl Processors for MemProcessors { +impl Processors for Arc { async fn send(&self, network: NetworkId, msg: CoordinatorMessage) { - let mut processors = self.0.write().await; - let processor = processors.entry(network).or_insert_with(VecDeque::new); - processor.push_back(msg); + let metadata = + 
Metadata { from: self.service, to: Service::Processor(network), intent: msg.intent() }; + let msg = serde_json::to_string(&msg).unwrap(); + self.queue(metadata, msg.into_bytes()).await; } async fn recv(&mut self) -> Message { - todo!() + // TODO: Use a proper expected next ID + let msg = self.next(0).await; + + let network = match msg.from { + Service::Processor(network) => network, + Service::Coordinator => panic!("coordinator sent coordinator message"), + }; + let id = msg.id; + + // Deserialize it into a ProcessorMessage + let msg: ProcessorMessage = + serde_json::from_slice(&msg.msg).expect("message wasn't a JSON-encoded ProcessorMessage"); + + return Message { id, network, msg }; } - async fn ack(&mut self, _: Message) { - todo!() + async fn ack(&mut self, msg: Message) { + MessageQueue::ack(self, msg.id).await } } diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index e263b4da..d715b7cd 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -1 +1,38 @@ +use std::{ + sync::Arc, + collections::{VecDeque, HashMap}, +}; + +use serai_client::primitives::NetworkId; + +use processor_messages::CoordinatorMessage; + +use tokio::sync::RwLock; + +use crate::processors::{Message, Processors}; + pub mod tributary; + +#[derive(Clone)] +pub struct MemProcessors(pub Arc>>>); +impl MemProcessors { + #[allow(clippy::new_without_default)] + pub fn new() -> MemProcessors { + MemProcessors(Arc::new(RwLock::new(HashMap::new()))) + } +} + +#[async_trait::async_trait] +impl Processors for MemProcessors { + async fn send(&self, network: NetworkId, msg: CoordinatorMessage) { + let mut processors = self.0.write().await; + let processor = processors.entry(network).or_insert_with(VecDeque::new); + processor.push_back(msg); + } + async fn recv(&mut self) -> Message { + todo!() + } + async fn ack(&mut self, _: Message) { + todo!() + } +} diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index 85c085cd..f824dced 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -19,10 +19,12 @@ use processor_messages::{ use tributary::{Transaction as TransactionTrait, Tributary}; use crate::{ - processors::MemProcessors, LocalP2p, tributary::{TributaryDb, Transaction, TributarySpec, scanner::handle_new_blocks}, - tests::tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion}, + tests::{ + MemProcessors, + tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion}, + }, }; #[tokio::test] diff --git a/message-queue/Cargo.toml b/message-queue/Cargo.toml index 3d6d36be..54fea1ea 100644 --- a/message-queue/Cargo.toml +++ b/message-queue/Cargo.toml @@ -14,6 +14,7 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] + # Macros lazy_static = "1" serde = { version = "1", features = ["derive"] } @@ -23,13 +24,16 @@ hex = "0.4" bincode = "1" serde_json = "1" +# Libs +zeroize = "1" +rand_core = "0.6" + # Cryptography transcript = { package = "flexible-transcript", path = "../crypto/transcript", features = ["recommended"] } ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] } schnorr-signatures = { path = "../crypto/schnorr" } # Application -log = "0.4" tokio = { version = "1", features = ["full"] } serai-db = { path = "../common/db", features = ["rocksdb"] } @@ -40,3 +44,4 @@ serai-env = { path = "../common/env" } serai-primitives = { path = "../substrate/primitives" } jsonrpsee = { version = "0.16", 
features = ["server"] } +reqwest = { version = "0.11", features = ["json"] } diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs new file mode 100644 index 00000000..5345bb99 --- /dev/null +++ b/message-queue/src/client.rs @@ -0,0 +1,169 @@ +use core::ops::Deref; + +use zeroize::{Zeroize, Zeroizing}; +use rand_core::OsRng; + +use ciphersuite::{ + group::ff::{Field, PrimeField}, + Ciphersuite, Ristretto, +}; +use schnorr_signatures::SchnorrSignature; + +use serde::{Serialize, Deserialize}; + +use reqwest::Client; + +use serai_env as env; + +use crate::{Service, Metadata, QueuedMessage, message_challenge, ack_challenge}; + +pub struct MessageQueue { + pub service: Service, + priv_key: Zeroizing<::F>, + pub_key: ::G, + client: Client, + url: String, +} + +impl MessageQueue { + pub fn new(service: Service) -> MessageQueue { + let url = env::var("MESSAGE_QUEUE_RPC").expect("message-queue RPC wasn't specified"); + + let priv_key: Zeroizing<::F> = { + let key_str = + Zeroizing::new(env::var("MESSAGE_QUEUE_KEY").expect("message-queue key wasn't specified")); + let key_bytes = Zeroizing::new( + hex::decode(&key_str).expect("invalid message-queue key specified (wasn't hex)"), + ); + let mut bytes = <::F as PrimeField>::Repr::default(); + bytes.copy_from_slice(&key_bytes); + let key = Zeroizing::new( + Option::from(<::F as PrimeField>::from_repr(bytes)) + .expect("invalid message-queue key specified"), + ); + bytes.zeroize(); + key + }; + + MessageQueue { + service, + pub_key: Ristretto::generator() * priv_key.deref(), + priv_key, + client: Client::new(), + url, + } + } + + async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value { + #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] + struct JsonRpcRequest { + version: &'static str, + method: &'static str, + params: serde_json::Value, + id: u64, + } + + let res = loop { + // Make the request + if let Ok(req) = self + .client + .post(&self.url) + .json(&JsonRpcRequest { version: "2.0", method, params: params.clone(), id: 0 }) + .send() + .await + { + // Get the response + if let Ok(res) = req.text().await { + break res; + } + } + + // Sleep 5s before trying again + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + }; + + let json = + serde_json::from_str::(&res).expect("message-queue returned invalid JSON"); + if json.get("result").is_none() { + panic!("call failed: {json}"); + } + json + } + + pub async fn queue(&self, metadata: Metadata, msg: Vec) { + // TODO: Should this use OsRng? Deterministic or deterministic + random may be better. 
+    let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
+    let nonce_pub = Ristretto::generator() * nonce.deref();
+    let sig = SchnorrSignature::<Ristretto>::sign(
+      &self.priv_key,
+      nonce,
+      message_challenge(
+        metadata.from,
+        self.pub_key,
+        metadata.to,
+        &metadata.intent,
+        &msg,
+        nonce_pub,
+      ),
+    )
+    .serialize();
+
+    let json = self.json_call("queue", serde_json::json!([metadata, msg, sig])).await;
+    if json.get("result") != Some(&serde_json::Value::Bool(true)) {
+      panic!("failed to queue message: {json}");
+    }
+  }
+
+  pub async fn next(&self, expected: u64) -> QueuedMessage {
+    loop {
+      let json = self.json_call("next", serde_json::json!([self.service, expected])).await;
+
+      // Convert from a Value to a type via reserialization
+      let msg: Option<QueuedMessage> = serde_json::from_str(
+        &serde_json::to_string(
+          &json.get("result").expect("successful JSON RPC call didn't have result"),
+        )
+        .unwrap(),
+      )
+      .expect("next didn't return an Option");
+
+      // If there wasn't a message, check again in 5s
+      let Some(msg) = msg else {
+        tokio::time::sleep(core::time::Duration::from_secs(5)).await;
+        continue;
+      };
+
+      // Verify the message
+      // Verify the sender is sane
+      if matches!(self.service, Service::Processor(_)) {
+        assert_eq!(msg.from, Service::Coordinator, "non-coordinator sent processor message");
+      } else {
+        assert!(
+          matches!(msg.from, Service::Processor(_)),
+          "non-processor sent coordinator message"
+        );
+      }
+      // TODO: Verify the sender's signature
+      // TODO: Check the ID is sane
+
+      return msg;
+    }
+  }
+
+  pub async fn ack(&self, id: u64) {
+    // TODO: Should this use OsRng? Deterministic or deterministic + random may be better.
+    let nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
+    let nonce_pub = Ristretto::generator() * nonce.deref();
+    let sig = SchnorrSignature::<Ristretto>::sign(
+      &self.priv_key,
+      nonce,
+      ack_challenge(self.service, self.pub_key, id, nonce_pub),
+    )
+    .serialize();
+
+    let json = self.json_call("ack", serde_json::json!([id, sig])).await;
+    if json.get("result") != Some(&serde_json::Value::Bool(true)) {
+      panic!("failed to ack message {id}: {json}");
+    }
+  }
+}
diff --git a/message-queue/src/lib.rs b/message-queue/src/lib.rs
index 8bcb12a2..83c788ac 100644
--- a/message-queue/src/lib.rs
+++ b/message-queue/src/lib.rs
@@ -1,2 +1,4 @@
 mod messages;
 pub use messages::*;
+
+pub mod client;
diff --git a/processor/Cargo.toml b/processor/Cargo.toml
index 5b1f0a84..e4968a32 100644
--- a/processor/Cargo.toml
+++ b/processor/Cargo.toml
@@ -33,7 +33,6 @@ serde_json = "1"
 
 # Cryptography
 ciphersuite = { path = "../crypto/ciphersuite", features = ["ristretto"] }
-schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr" }
 transcript = { package = "flexible-transcript", path = "../crypto/transcript" }
 frost = { package = "modular-frost", path = "../crypto/frost", features = ["ristretto"] }
 
@@ -62,7 +61,6 @@ serai-client = { path = "../substrate/client", default-features = false }
 
 messages = { package = "serai-processor-messages", path = "./messages" }
 
-reqwest = "0.11"
 message-queue = { package = "serai-message-queue", path = "../message-queue" }
 
 [dev-dependencies]
diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs
index 882d39cf..056cf646 100644
--- a/processor/src/coordinator.rs
+++ b/processor/src/coordinator.rs
@@ -1,18 +1,6 @@
-use core::ops::Deref;
-
-use zeroize::Zeroizing;
-use rand_core::OsRng;
-
-use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
-use schnorr::SchnorrSignature;
-
-use serde::{Serialize, Deserialize};
-
 use 
messages::{ProcessorMessage, CoordinatorMessage}; -use serai_client::primitives::NetworkId; -use message_queue::{Service, Metadata, QueuedMessage, message_challenge, ack_challenge}; -use reqwest::Client; +use message_queue::{Service, Metadata, client::MessageQueue}; #[derive(Clone, PartialEq, Eq, Debug)] pub struct Message { @@ -27,156 +15,29 @@ pub trait Coordinator { async fn ack(&mut self, msg: Message); } -pub struct MessageQueue { - network: NetworkId, - priv_key: Zeroizing<::F>, - pub_key: ::G, - client: Client, - message_queue_url: String, -} - -impl MessageQueue { - pub fn new( - message_queue_url: String, - network: NetworkId, - priv_key: Zeroizing<::F>, - ) -> MessageQueue { - MessageQueue { - network, - pub_key: Ristretto::generator() * priv_key.deref(), - priv_key, - client: Client::new(), - message_queue_url, - } - } - - async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value { - #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] - struct JsonRpcRequest { - version: &'static str, - method: &'static str, - params: serde_json::Value, - id: u64, - } - - let res = loop { - // Make the request - if let Ok(req) = self - .client - .post(&self.message_queue_url) - .json(&JsonRpcRequest { version: "2.0", method, params: params.clone(), id: 0 }) - .send() - .await - { - // Get the response - if let Ok(res) = req.text().await { - break res; - } - } - - // Sleep 5s before trying again - tokio::time::sleep(core::time::Duration::from_secs(5)).await; - }; - - let json = - serde_json::from_str::(&res).expect("message-queue returned invalid JSON"); - if json.get("result").is_none() { - panic!("call failed: {json}"); - } - json - } - - async fn queue(&self, metadata: Metadata, msg: Vec, sig: Vec) { - let json = self.json_call("queue", serde_json::json!([metadata, msg, sig])).await; - if json.get("result") != Some(&serde_json::Value::Bool(true)) { - panic!("failed to queue message: {json}"); - } - } - - async fn next(&self) -> Message { - loop { - // TODO: Use a proper expected next ID - let json = - self.json_call("next", serde_json::json!([Service::Processor(self.network), 0])).await; - - // Convert from a Value to a type via reserialization - let msg: Option = serde_json::from_str( - &serde_json::to_string( - &json.get("result").expect("successful JSON RPC call didn't have result"), - ) - .unwrap(), - ) - .expect("next didn't return an Option"); - - // If there wasn't a message, check again in 5s - let Some(msg) = msg else { - tokio::time::sleep(core::time::Duration::from_secs(5)).await; - continue; - }; - - // Verify the message - assert_eq!(msg.from, Service::Coordinator, "non-coordinator sent us message"); - // TODO: Verify the coordinator's signature - // TODO: Check the ID is sane - let id = msg.id; - - // Deserialize it into a CoordinatorMessage - let msg: CoordinatorMessage = - serde_json::from_slice(&msg.msg).expect("message wasn't a JSON-encoded CoordinatorMessage"); - return Message { id, msg }; - } - } - - async fn ack(&self, id: u64, sig: Vec) { - let json = self.json_call("ack", serde_json::json!([id, sig])).await; - if json.get("result") != Some(&serde_json::Value::Bool(true)) { - panic!("failed to ack message {id}: {json}"); - } - } -} - #[async_trait::async_trait] impl Coordinator for MessageQueue { async fn send(&mut self, msg: ProcessorMessage) { - let metadata = Metadata { - from: Service::Processor(self.network), - to: Service::Coordinator, - intent: msg.intent(), - }; + let metadata = Metadata { from: self.service, to: 
Service::Coordinator, intent: msg.intent() }; let msg = serde_json::to_string(&msg).unwrap(); - // TODO: Should this use OsRng? Deterministic or deterministic + random may be better. - let nonce = Zeroizing::new(::F::random(&mut OsRng)); - let nonce_pub = Ristretto::generator() * nonce.deref(); - let sig = SchnorrSignature::::sign( - &self.priv_key, - nonce, - message_challenge( - metadata.from, - self.pub_key, - metadata.to, - &metadata.intent, - msg.as_bytes(), - nonce_pub, - ), - ); - self.queue(metadata, msg.into_bytes(), sig.serialize()).await; + self.queue(metadata, msg.into_bytes()).await; } async fn recv(&mut self) -> Message { - self.next().await + // TODO: Use a proper expected next ID + let msg = self.next(0).await; + + let id = msg.id; + + // Deserialize it into a CoordinatorMessage + let msg: CoordinatorMessage = + serde_json::from_slice(&msg.msg).expect("message wasn't a JSON-encoded CoordinatorMessage"); + + return Message { id, msg }; } async fn ack(&mut self, msg: Message) { - // TODO: Should this use OsRng? Deterministic or deterministic + random may be better. - let nonce = Zeroizing::new(::F::random(&mut OsRng)); - let nonce_pub = Ristretto::generator() * nonce.deref(); - let sig = SchnorrSignature::::sign( - &self.priv_key, - nonce, - ack_challenge(Service::Processor(self.network), self.pub_key, msg.id, nonce_pub), - ); - - MessageQueue::ack(self, msg.id, sig.serialize()).await + MessageQueue::ack(self, msg.id).await } } diff --git a/processor/src/main.rs b/processor/src/main.rs index 06621108..c36be56c 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -7,10 +7,7 @@ use std::{ use zeroize::{Zeroize, Zeroizing}; use transcript::{Transcript, RecommendedTranscript}; -use ciphersuite::{ - group::{ff::PrimeField, GroupEncoding}, - Ristretto, -}; +use ciphersuite::group::GroupEncoding; use frost::{curve::Ciphersuite, ThresholdKeys}; use log::{info, warn, error}; @@ -30,6 +27,8 @@ use messages::{SubstrateContext, CoordinatorMessage, ProcessorMessage}; use serai_env as env; +use message_queue::{Service, client::MessageQueue}; + mod plan; pub use plan::*; @@ -733,27 +732,7 @@ async fn main() { _ => panic!("unrecognized network"), }; - // Coordinator configuration - let priv_key = { - let key_str = - Zeroizing::new(env::var("MESSAGE_QUEUE_KEY").expect("message-queue key wasn't specified")); - let key_bytes = Zeroizing::new( - hex::decode(&key_str).expect("invalid message-queue key specified (wasn't hex)"), - ); - let mut bytes = <::F as PrimeField>::Repr::default(); - bytes.copy_from_slice(&key_bytes); - let key = Zeroizing::new( - Option::from(<::F as PrimeField>::from_repr(bytes)) - .expect("invalid message-queue key specified"), - ); - bytes.zeroize(); - key - }; - let coordinator = MessageQueue::new( - env::var("MESSAGE_QUEUE_RPC").expect("message-queue RPC wasn't specified"), - network_id, - priv_key, - ); + let coordinator = MessageQueue::new(Service::Processor(network_id)); match network_id { #[cfg(feature = "bitcoin")]