Use MessageQueue instead of MemCoordinator in processor

Also has it use RocksDB.
This commit is contained in:
Luke Parker 2023-07-17 18:01:56 -04:00
parent 344ac9cbfc
commit acc9495429
No known key found for this signature in database
5 changed files with 56 additions and 39 deletions

1
Cargo.lock generated
View file

@ -8739,6 +8739,7 @@ dependencies = [
"rand_chacha 0.3.1", "rand_chacha 0.3.1",
"rand_core 0.6.4", "rand_core 0.6.4",
"reqwest", "reqwest",
"rocksdb",
"schnorr-signatures", "schnorr-signatures",
"secp256k1", "secp256k1",
"serai-client", "serai-client",

View file

@ -106,8 +106,12 @@ fn ack_message(service: Service, id: u64, sig: SchnorrSignature<Ristretto>) {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// Open the DB // Open the DB
let db = let db = Arc::new(
Arc::new(rocksdb::TransactionDB::open_default(serai_env::var("DB_PATH").unwrap()).unwrap()); rocksdb::TransactionDB::open_default(
serai_env::var("DB_PATH").expect("path to DB wasn't specified"),
)
.unwrap(),
);
let read_key = |str| { let read_key = |str| {
let key = serai_env::var(str)?; let key = serai_env::var(str)?;

View file

@ -55,6 +55,7 @@ monero-serai = { path = "../coins/monero", features = ["multisig"], optional = t
log = "0.4" log = "0.4"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
rocksdb = "0.21"
serai-db = { path = "../common/db", default-features = false } serai-db = { path = "../common/db", default-features = false }
serai-env = { path = "../common/env" } serai-env = { path = "../common/env" }
serai-client = { path = "../substrate/client", default-features = false } serai-client = { path = "../substrate/client", default-features = false }

View file

@ -1,8 +1,4 @@
use core::ops::Deref; use core::ops::Deref;
use std::{
sync::{Arc, RwLock},
collections::VecDeque,
};
use zeroize::Zeroizing; use zeroize::Zeroizing;
use rand_core::OsRng; use rand_core::OsRng;
@ -184,25 +180,3 @@ impl Coordinator for MessageQueue {
MessageQueue::ack(self, msg.id, sig.serialize()).await MessageQueue::ack(self, msg.id, sig.serialize()).await
} }
} }
// TODO: Move this to tests
pub struct MemCoordinator(Arc<RwLock<VecDeque<Message>>>);
impl MemCoordinator {
#[allow(clippy::new_without_default)]
pub fn new() -> MemCoordinator {
MemCoordinator(Arc::new(RwLock::new(VecDeque::new())))
}
}
#[async_trait::async_trait]
impl Coordinator for MemCoordinator {
async fn send(&mut self, _: ProcessorMessage) {
todo!()
}
async fn recv(&mut self) -> Message {
todo!()
}
async fn ack(&mut self, _: Message) {
todo!()
}
}

View file

@ -1,12 +1,16 @@
use std::{ use std::{
time::Duration, time::Duration,
sync::Arc,
collections::{VecDeque, HashMap}, collections::{VecDeque, HashMap},
}; };
use zeroize::{Zeroize, Zeroizing}; use zeroize::{Zeroize, Zeroizing};
use transcript::{Transcript, RecommendedTranscript}; use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::group::GroupEncoding; use ciphersuite::{
group::{ff::PrimeField, GroupEncoding},
Ristretto,
};
use frost::{curve::Ciphersuite, ThresholdKeys}; use frost::{curve::Ciphersuite, ThresholdKeys};
use log::{info, warn, error}; use log::{info, warn, error};
@ -15,7 +19,7 @@ use tokio::time::sleep;
use scale::Decode; use scale::Decode;
use serai_client::{ use serai_client::{
primitives::{MAX_DATA_LEN, BlockHash}, primitives::{MAX_DATA_LEN, BlockHash, NetworkId},
tokens::primitives::{OutInstruction, OutInstructionWithBalance}, tokens::primitives::{OutInstruction, OutInstructionWithBalance},
in_instructions::primitives::{ in_instructions::primitives::{
Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch,
@ -460,8 +464,7 @@ async fn boot<C: Coin, D: Db>(
coin: &C, coin: &C,
) -> (MainDb<C, D>, TributaryMutable<C, D>, SubstrateMutable<C, D>) { ) -> (MainDb<C, D>, TributaryMutable<C, D>, SubstrateMutable<C, D>) {
let mut entropy_transcript = { let mut entropy_transcript = {
let entropy = let entropy = Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't specified"));
Zeroizing::new(env::var("ENTROPY").expect("entropy wasn't provided as an env var"));
if entropy.len() != 64 { if entropy.len() != 64 {
panic!("entropy isn't the right length"); panic!("entropy isn't the right length");
} }
@ -715,14 +718,48 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(mut raw_db: D, coin: C, mut coordi
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let db = MemDb::new(); // TODO let db = Arc::new(
let coordinator = MemCoordinator::new(); // TODO rocksdb::TransactionDB::<rocksdb::SingleThreaded>::open_default(
let url = env::var("COIN_RPC").expect("coin rpc wasn't specified as an env var"); env::var("DB_PATH").expect("path to DB wasn't specified"),
match env::var("COIN").expect("coin wasn't specified as an env var").as_str() { )
.unwrap(),
);
// Network configuration
let url = env::var("NETWORK_RPC").expect("network RPC wasn't specified");
let network_id = match env::var("NETWORK").expect("network wasn't specified").as_str() {
"bitcoin" => NetworkId::Bitcoin,
"monero" => NetworkId::Monero,
_ => panic!("unrecognized network"),
};
// Coordinator configuration
let priv_key = {
let key_str =
Zeroizing::new(env::var("MESSAGE_QUEUE_KEY").expect("message-queue key wasn't specified"));
let key_bytes = Zeroizing::new(
hex::decode(&key_str).expect("invalid message-queue key specified (wasn't hex)"),
);
let mut bytes = <<Ristretto as Ciphersuite>::F as PrimeField>::Repr::default();
bytes.copy_from_slice(&key_bytes);
let key = Zeroizing::new(
Option::from(<<Ristretto as Ciphersuite>::F as PrimeField>::from_repr(bytes))
.expect("invalid message-queue key specified"),
);
bytes.zeroize();
key
};
let coordinator = MessageQueue::new(
env::var("MESSAGE_QUEUE_RPC").expect("message-queue RPC wasn't specified"),
network_id,
priv_key,
);
match network_id {
#[cfg(feature = "bitcoin")] #[cfg(feature = "bitcoin")]
"bitcoin" => run(db, Bitcoin::new(url).await, coordinator).await, NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await,
#[cfg(feature = "monero")] #[cfg(feature = "monero")]
"monero" => run(db, Monero::new(url), coordinator).await, NetworkId::Monero => run(db, Monero::new(url), coordinator).await,
_ => panic!("unrecognized coin"), _ => panic!("spawning a processor for an unsupported network"),
} }
} }