diff --git a/Cargo.lock b/Cargo.lock index de685b5f..79f41f25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8739,6 +8739,7 @@ dependencies = [ "rand_chacha 0.3.1", "rand_core 0.6.4", "reqwest", + "rocksdb", "schnorr-signatures", "secp256k1", "serai-client", diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index 17499f8e..dae4b53e 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -106,8 +106,12 @@ fn ack_message(service: Service, id: u64, sig: SchnorrSignature<Ristretto>) { #[tokio::main] async fn main() { // Open the DB - let db = - Arc::new(rocksdb::TransactionDB::open_default(serai_env::var("DB_PATH").unwrap()).unwrap()); + let db = Arc::new( + rocksdb::TransactionDB::open_default( + serai_env::var("DB_PATH").expect("path to DB wasn't specified"), + ) + .unwrap(), + ); let read_key = |str| { let key = serai_env::var(str)?; diff --git a/processor/Cargo.toml b/processor/Cargo.toml index 2d58939a..5b1f0a84 100644 --- a/processor/Cargo.toml +++ b/processor/Cargo.toml @@ -55,6 +55,7 @@ monero-serai = { path = "../coins/monero", features = ["multisig"], optional = t log = "0.4" tokio = { version = "1", features = ["full"] } +rocksdb = "0.21" serai-db = { path = "../common/db", default-features = false } serai-env = { path = "../common/env" } serai-client = { path = "../substrate/client", default-features = false } diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs index 56767b72..882d39cf 100644 --- a/processor/src/coordinator.rs +++ b/processor/src/coordinator.rs @@ -1,8 +1,4 @@ use core::ops::Deref; -use std::{ - sync::{Arc, RwLock}, - collections::VecDeque, -}; use zeroize::Zeroizing; use rand_core::OsRng; @@ -184,25 +180,3 @@ impl Coordinator for MessageQueue { MessageQueue::ack(self, msg.id, sig.serialize()).await } } - -// TODO: Move this to tests -pub struct MemCoordinator(Arc<RwLock<VecDeque<Message>>>); -impl MemCoordinator { - #[allow(clippy::new_without_default)] - pub fn new() -> MemCoordinator { - MemCoordinator(Arc::new(RwLock::new(VecDeque::new()))) - } -} - -#[async_trait::async_trait] -impl Coordinator for MemCoordinator { - async fn send(&mut self, _: ProcessorMessage) { - todo!() - } - async fn recv(&mut self) -> Message { - todo!() - } - async fn ack(&mut self, _: Message) { - todo!() - } -} diff --git a/processor/src/main.rs b/processor/src/main.rs index fac8fc84..8d5379f2 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -1,12 +1,16 @@ use std::{ time::Duration, + sync::Arc, collections::{VecDeque, HashMap}, }; use zeroize::{Zeroize, Zeroizing}; use transcript::{Transcript, RecommendedTranscript}; -use ciphersuite::group::GroupEncoding; +use ciphersuite::{ + group::{ff::PrimeField, GroupEncoding}, + Ristretto, +}; use frost::{curve::Ciphersuite, ThresholdKeys}; use log::{info, warn, error}; @@ -15,7 +19,7 @@ use tokio::time::sleep; use scale::Decode; use serai_client::{ - primitives::{MAX_DATA_LEN, BlockHash}, + primitives::{MAX_DATA_LEN, BlockHash, NetworkId}, tokens::primitives::{OutInstruction, OutInstructionWithBalance}, in_instructions::primitives::{ Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, @@ -460,8 +464,7 @@ async fn boot<C: Coin, D: Db>( coin: &C, ) -> (MainDb<C, D>, TributaryMutable<C, D>, SubstrateMutable<C, D>) { let mut entropy_transcript = { - let entropy = - Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't provided as an env var")); + let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified")); if entropy.len() != 64 { panic!("entropy isn't the right length"); } @@ -715,14 +718,48 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(mut raw_db: D, coin: C, mut coordi #[tokio::main] async fn main() { - let db = MemDb::new(); // TODO - let coordinator = MemCoordinator::new(); // TODO - let url = env::var("COIN_RPC").expect("coin rpc wasn't specified as an env var"); - match env::var("COIN").expect("coin wasn't specified as an env var").as_str() { + let db = Arc::new( + rocksdb::TransactionDB::<rocksdb::SingleThreaded>::open_default( + env::var("DB_PATH").expect("path to DB wasn't specified"), + ) + .unwrap(), + ); + + // Network configuration + let url = env::var("NETWORK_RPC").expect("network RPC wasn't specified"); + let network_id = match env::var("NETWORK").expect("network wasn't specified").as_str() { + "bitcoin" => NetworkId::Bitcoin, + "monero" => NetworkId::Monero, + _ => panic!("unrecognized network"), + }; + + // Coordinator configuration + let priv_key = { + let key_str = + Zeroizing::new(env::var("MESSAGE_QUEUE_KEY").expect("message-queue key wasn't specified")); + let key_bytes = Zeroizing::new( + hex::decode(&key_str).expect("invalid message-queue key specified (wasn't hex)"), + ); + let mut bytes = <<Ristretto as Ciphersuite>::F as PrimeField>::Repr::default(); + bytes.copy_from_slice(&key_bytes); + let key = Zeroizing::new( + Option::from(<<Ristretto as Ciphersuite>::F as PrimeField>::from_repr(bytes)) + .expect("invalid message-queue key specified"), + ); + bytes.zeroize(); + key + }; + let coordinator = MessageQueue::new( + env::var("MESSAGE_QUEUE_RPC").expect("message-queue RPC wasn't specified"), + network_id, + priv_key, + ); + + match network_id { #[cfg(feature = "bitcoin")] - "bitcoin" => run(db, Bitcoin::new(url).await, coordinator).await, + NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await, #[cfg(feature = "monero")] - "monero" => run(db, Monero::new(url), coordinator).await, - _ => panic!("unrecognized coin"), + NetworkId::Monero => run(db, Monero::new(url), coordinator).await, + _ => panic!("spawning a processor for an unsupported network"), } }