#![allow(unused_variables)] #![allow(unreachable_code)] #![allow(clippy::diverging_sub_expression)] use core::{ops::Deref, future::Future}; use std::{ sync::Arc, time::{SystemTime, Duration}, collections::{VecDeque, HashMap}, }; use zeroize::{Zeroize, Zeroizing}; use rand_core::OsRng; use ciphersuite::{ group::ff::{Field, PrimeField}, Ciphersuite, Ristretto, }; use schnorr::SchnorrSignature; use frost::Participant; use serai_db::{DbTxn, Db}; use serai_env as env; use serai_client::{primitives::NetworkId, Public, Serai}; use message_queue::{Service, client::MessageQueue}; use futures::stream::StreamExt; use tokio::{sync::RwLock, time::sleep}; use ::tributary::{ ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader, }; mod tributary; #[rustfmt::skip] use crate::tributary::{TributarySpec, SignData, Transaction, TributaryDb, scanner::RecognizedIdType}; mod db; use db::MainDb; mod p2p; pub use p2p::*; use processor_messages::{key_gen, sign, coordinator, ProcessorMessage}; pub mod processors; use processors::Processors; mod substrate; #[cfg(test)] pub mod tests; lazy_static::lazy_static! { // This is a static to satisfy lifetime expectations static ref NEW_TRIBUTARIES: RwLock> = RwLock::new(VecDeque::new()); } pub struct ActiveTributary { pub spec: TributarySpec, pub tributary: Arc>>, } type Tributaries = HashMap<[u8; 32], ActiveTributary>; // Adds a tributary into the specified HahMap async fn add_tributary( db: D, key: Zeroizing<::F>, p2p: P, tributaries: &mut Tributaries, spec: TributarySpec, ) -> TributaryReader { log::info!("adding tributary {:?}", spec.set()); let tributary = Tributary::<_, Transaction, _>::new( // TODO2: Use a db on a distinct volume to protect against DoS attacks db, spec.genesis(), spec.start_time(), key, spec.validators(), p2p, ) .await .unwrap(); let reader = tributary.reader(); tributaries.insert( tributary.genesis(), ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) }, ); reader } pub async fn scan_substrate( db: D, key: Zeroizing<::F>, processors: Pro, serai: Arc, ) { log::info!("scanning substrate"); let mut db = substrate::SubstrateDb::new(db); let mut next_substrate_block = db.next_block(); let new_substrate_block_notifier = { let serai = &serai; move || async move { loop { match serai.newly_finalized_block().await { Ok(sub) => return sub, Err(e) => { log::error!("couldn't communicate with serai node: {e}"); sleep(Duration::from_secs(5)).await; } } } } }; let mut substrate_block_notifier = new_substrate_block_notifier().await; loop { // await the next block, yet if our notifier had an error, re-create it { let Ok(next_block) = tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await else { // Timed out, which may be because Serai isn't finalizing or may be some issue with the // notifier if serai.get_latest_block().await.map(|block| block.number()).ok() == Some(next_substrate_block.saturating_sub(1)) { log::info!("serai hasn't finalized a block in the last 60s..."); } else { substrate_block_notifier = new_substrate_block_notifier().await; } continue; }; // next_block is a Option if next_block.and_then(Result::ok).is_none() { substrate_block_notifier = new_substrate_block_notifier().await; continue; } } match substrate::handle_new_blocks( &mut db, &key, |db: &mut D, spec: TributarySpec| { log::info!("creating new tributary for {:?}", spec.set()); // Save it to the database MainDb::new(db).add_active_tributary(&spec); // Add it to the queue // If we reboot before this is read from the queue, the fact it was saved to the database // means it'll be handled on reboot async { NEW_TRIBUTARIES.write().await.push_back(spec); } }, &processors, &serai, &mut next_substrate_block, ) .await { Ok(()) => {} Err(e) => { log::error!("couldn't communicate with serai node: {e}"); sleep(Duration::from_secs(5)).await; } } } } #[allow(clippy::type_complexity)] pub async fn scan_tributaries< D: Db, Pro: Processors, P: P2p, FRid: Future, RID: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32]) -> FRid, >( raw_db: D, key: Zeroizing<::F>, recognized_id: RID, p2p: P, processors: Pro, serai: Arc, tributaries: Arc>>, ) { log::info!("scanning tributaries"); let mut tributary_readers = vec![]; for ActiveTributary { spec, tributary } in tributaries.read().await.values() { tributary_readers.push((spec.clone(), tributary.read().await.reader())); } // Handle new Tributary blocks let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); loop { // The following handle_new_blocks function may take an arbitrary amount of time // Accordingly, it may take a long time to acquire a write lock on the tributaries table // By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately, // meaning the Substrate scanner won't become blocked on this { let mut new_tributaries = NEW_TRIBUTARIES.write().await; while let Some(spec) = new_tributaries.pop_front() { let reader = add_tributary( raw_db.clone(), key.clone(), p2p.clone(), // This is a short-lived write acquisition, which is why it should be fine &mut *tributaries.write().await, spec.clone(), ) .await; // Trigger a DKG for the newly added Tributary let set = spec.set(); processors .send( set.network, processor_messages::CoordinatorMessage::KeyGen( processor_messages::key_gen::CoordinatorMessage::GenerateKey { id: processor_messages::key_gen::KeyGenId { set, attempt: 0 }, params: frost::ThresholdParams::new( spec.t(), spec.n(), spec .i(Ristretto::generator() * key.deref()) .expect("adding a tribuary for a set we aren't in set for"), ) .unwrap(), }, ), ) .await; tributary_readers.push((spec, reader)); } } for (spec, reader) in &tributary_readers { tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>( &mut tributary_db, &key, recognized_id.clone(), &processors, |set, tx| { let serai = serai.clone(); async move { loop { match serai.publish(&tx).await { Ok(_) => { log::info!("set key pair for {set:?}"); break; } // This is assumed to be some ephemeral error due to the assumed fault-free // creation // TODO2: Differentiate connection errors from invariants Err(e) => { // Check if this failed because the keys were already set by someone else if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) { log::info!("another coordinator set key pair for {:?}", set); break; } log::error!("couldn't connect to Serai node to publish set_keys TX: {:?}", e); tokio::time::sleep(Duration::from_secs(10)).await; } } } } }, spec, reader, ) .await; } // Sleep for half the block time // TODO2: Define a notification system for when a new block occurs sleep(Duration::from_secs((Tributary::::block_time() / 2).into())).await; } } pub async fn heartbeat_tributaries( p2p: P, tributaries: Arc>>, ) { let ten_blocks_of_time = Duration::from_secs((10 * Tributary::::block_time()).into()); loop { for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() { let tributary = tributary.read().await; let tip = tributary.tip().await; let block_time = SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.reader().time_of_block(&tip).unwrap_or(0)); // Only trigger syncing if the block is more than a minute behind if SystemTime::now() > (block_time + Duration::from_secs(60)) { log::warn!("last known tributary block was over a minute ago"); let mut msg = tip.to_vec(); // Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating let timestamp = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("system clock is wrong") .as_secs(); // Divide by the block time so if multiple parties send a Heartbeat, they're more likely to // overlap let time_unit = timestamp / u64::from(Tributary::::block_time()); msg.extend(time_unit.to_le_bytes()); P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await; } } // Only check once every 10 blocks of time sleep(ten_blocks_of_time).await; } } pub async fn handle_p2p( our_key: ::G, p2p: P, tributaries: Arc>>, ) { loop { let mut msg = p2p.receive().await; // Spawn a dedicated task to handle this message, ensuring any singularly latent message // doesn't hold everything up // TODO2: Move to one task per tributary (or two. One for Tendermint, one for Tributary) tokio::spawn({ let p2p = p2p.clone(); let tributaries = tributaries.clone(); async move { match msg.kind { P2pMessageKind::KeepAlive => {} P2pMessageKind::Tributary(genesis) => { let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { log::debug!("received p2p message for unknown network"); return; }; log::trace!("handling message for tributary {:?}", tributary.spec.set()); if tributary.tributary.read().await.handle_message(&msg.msg).await { P2p::broadcast(&p2p, msg.kind, msg.msg).await; } } // TODO2: Rate limit this per timestamp // And/or slash on Heartbeat which justifies a response, since the node obviously was // offline and we must now use our bandwidth to compensate for them? P2pMessageKind::Heartbeat(genesis) => { if msg.msg.len() != 40 { log::error!("validator sent invalid heartbeat"); return; } let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { log::debug!("received heartbeat message for unknown network"); return; }; let tributary_read = tributary.tributary.read().await; /* // Have sqrt(n) nodes reply with the blocks let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; // Try to have at least 3 responders if responders < 3 { responders = tributary.spec.n().min(3).into(); } */ // Have up to three nodes respond let responders = u64::from(tributary.spec.n().min(3)); // Decide which nodes will respond by using the latest block's hash as a mutually agreed // upon entropy source // This isn't a secure source of entropy, yet it's fine for this let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 let start = usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)).unwrap(); let mut selected = false; for validator in &tributary.spec.validators()[start .. (start + usize::try_from(responders).unwrap())] { if our_key == validator.0 { selected = true; break; } } if !selected { log::debug!("received heartbeat and not selected to respond"); return; } log::debug!("received heartbeat and selected to respond"); let reader = tributary_read.reader(); drop(tributary_read); let mut latest = msg.msg[.. 32].try_into().unwrap(); while let Some(next) = reader.block_after(&latest) { let mut res = reader.block(&next).unwrap().serialize(); res.extend(reader.commit(&next).unwrap()); // Also include the timestamp used within the Heartbeat res.extend(&msg.msg[32 .. 40]); p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; latest = next; } } P2pMessageKind::Block(genesis) => { let mut msg_ref: &[u8] = msg.msg.as_ref(); let Ok(block) = Block::::read(&mut msg_ref) else { log::error!("received block message with an invalidly serialized block"); return; }; // Get just the commit msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); msg.msg.drain((msg.msg.len() - 8) ..); let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { log::debug!("received block message for unknown network"); return; }; let res = tributary.tributary.read().await.sync_block(block, msg.msg).await; log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); } } } }); } } pub async fn publish_transaction( tributary: &Tributary, tx: Transaction, ) { log::debug!("publishing transaction {}", hex::encode(tx.hash())); if let TransactionKind::Signed(signed) = tx.kind() { if tributary .next_nonce(signed.signer) .await .expect("we don't have a nonce, meaning we aren't a participant on this tributary") > signed.nonce { log::warn!("we've already published this transaction. this should only appear on reboot"); } else { // We should've created a valid transaction assert!(tributary.add_transaction(tx).await, "created an invalid transaction"); } } else { panic!("non-signed transaction passed to publish_transaction"); } } pub async fn handle_processors( mut db: D, key: Zeroizing<::F>, serai: Arc, mut processors: Pro, tributaries: Arc>>, ) { let pub_key = Ristretto::generator() * key.deref(); loop { // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one // processor from holding up all the others. This would require a peek method be added to the // message-queue (to view multiple future messages at once) // TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it? // Does the processor? let msg = processors.recv().await; // TODO2: This is slow, and only works as long as a network only has a single Tributary // (which means there's a lack of multisig rotation) let spec = { let mut spec = None; for tributary in tributaries.read().await.values() { if tributary.spec.set().network == msg.network { spec = Some(tributary.spec.clone()); break; } } spec.expect("received message from processor we don't have a tributary for") }; let genesis = spec.genesis(); // TODO: We probably want to NOP here, not panic? let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in"); let tx = match msg.msg.clone() { ProcessorMessage::KeyGen(inner_msg) => match inner_msg { key_gen::ProcessorMessage::Commitments { id, commitments } => { Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed())) } key_gen::ProcessorMessage::Shares { id, mut shares } => { // Create a MuSig-based machine to inform Substrate of this key generation let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt); let mut tx_shares = Vec::with_capacity(shares.len()); for i in 1 ..= spec.n() { let i = Participant::new(i).unwrap(); if i == my_i { continue; } tx_shares .push(shares.remove(&i).expect("processor didn't send share for another validator")); } Some(Transaction::DkgShares { attempt: id.attempt, shares: tx_shares, confirmation_nonces: nonces, signed: Transaction::empty_signed(), }) } key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => { assert_eq!( id.set.network, msg.network, "processor claimed to be a different network than it was for GeneratedKeyPair", ); // TODO: Also check the other KeyGenId fields // Tell the Tributary the key pair, get back the share for the MuSig signature let mut txn = db.txn(); let share = crate::tributary::generated_key_pair::( &mut txn, &key, &spec, &(Public(substrate_key), network_key.try_into().unwrap()), id.attempt, ); txn.commit(); match share { Ok(share) => { Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed())) } Err(p) => todo!("participant {p:?} sent invalid DKG confirmation preprocesses"), } } }, ProcessorMessage::Sign(msg) => match msg { sign::ProcessorMessage::Preprocess { id, preprocess } => { if id.attempt == 0 { let mut txn = db.txn(); MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); txn.commit(); None } else { Some(Transaction::SignPreprocess(SignData { plan: id.id, attempt: id.attempt, data: preprocess, signed: Transaction::empty_signed(), })) } } sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData { plan: id.id, attempt: id.attempt, data: share, signed: Transaction::empty_signed(), })), sign::ProcessorMessage::Completed { key: _, id, tx } => { let r = Zeroizing::new(::F::random(&mut OsRng)); #[allow(non_snake_case)] let R = ::generator() * r.deref(); let mut tx = Transaction::SignCompleted { plan: id, tx_hash: tx, first_signer: pub_key, signature: SchnorrSignature { R, s: ::F::ZERO }, }; let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge()); match &mut tx { Transaction::SignCompleted { signature, .. } => { *signature = signed; } _ => unreachable!(), } Some(tx) } }, ProcessorMessage::Coordinator(inner_msg) => match inner_msg { coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { assert_eq!( network, msg.network, "processor claimed to be a different network than it was for SubstrateBlockAck", ); // Safe to use its own txn since this is static and just needs to be written before we // provide SubstrateBlock let mut txn = db.txn(); TributaryDb::::set_plan_ids(&mut txn, genesis, block, &plans); txn.commit(); Some(Transaction::SubstrateBlock(block)) } coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => { log::info!( "informed of batch (sign ID {}, attempt {}) for block {}", hex::encode(id.id), id.attempt, hex::encode(block), ); // If this is the first attempt instance, wait until we synchronize around the batch // first if id.attempt == 0 { // Save the preprocess to disk so we can publish it later // This is fine to use its own TX since it's static and just needs to be written // before this message finishes it handling (or with this message's finished handling) let mut txn = db.txn(); MainDb::::save_first_preprocess(&mut txn, id.id, preprocess); txn.commit(); Some(Transaction::Batch(block.0, id.id)) } else { Some(Transaction::BatchPreprocess(SignData { plan: id.id, attempt: id.attempt, data: preprocess, signed: Transaction::empty_signed(), })) } } coordinator::ProcessorMessage::BatchShare { id, share } => { Some(Transaction::BatchShare(SignData { plan: id.id, attempt: id.attempt, data: share.to_vec(), signed: Transaction::empty_signed(), })) } }, ProcessorMessage::Substrate(inner_msg) => match inner_msg { processor_messages::substrate::ProcessorMessage::Update { batch } => { assert_eq!( batch.batch.network, msg.network, "processor sent us a batch for a different network than it was for", ); // TODO: Check this key's key pair's substrate key is authorized to publish batches // Save this batch to the disk MainDb::new(&mut db).save_batch(batch); /* Use a dedicated task to publish batches due to the latency potentially incurred. This does not guarantee the batch has actually been published when the message is `ack`ed to message-queue. Accordingly, if we reboot, these batches would be dropped (as we wouldn't see the `Update` again, triggering our re-attempt to publish). The solution to this is to have the task try not to publish the batch which caused it to be spawned, yet all saved batches which have yet to published. This does risk having multiple tasks trying to publish all pending batches, yet these aren't notably complex. */ tokio::spawn({ let mut db = db.clone(); let serai = serai.clone(); let network = msg.network; async move { // Since we have a new batch, publish all batches yet to be published to Serai // This handles the edge-case where batch n+1 is signed before batch n is while let Some(batch) = { // Get the next-to-execute batch ID let next = { let mut first = true; loop { if !first { log::error!( "couldn't connect to Serai node to get the next batch ID for {network:?}", ); tokio::time::sleep(Duration::from_secs(5)).await; } first = false; let Ok(latest_block) = serai.get_latest_block().await else { continue }; let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await else { continue; }; break if let Some(last) = last { last + 1 } else { 0 }; } }; // If we have this batch, attempt to publish it MainDb::new(&mut db).batch(network, next) } { let id = batch.batch.id; let block = batch.batch.block; let tx = Serai::execute_batch(batch); // This publish may fail if this transactions already exists in the mempool, which // is possible, or if this batch was already executed on-chain // Either case will have eventual resolution and be handled by the above check on // if this block should execute if serai.publish(&tx).await.is_ok() { log::info!("published batch {network:?} {id} (block {})", hex::encode(block)); } } } }); None } }, }; // If this created a transaction, publish it if let Some(mut tx) = tx { log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); let tributaries = tributaries.read().await; log::trace!("read global tributaries"); let Some(tributary) = tributaries.get(&genesis) else { // TODO: This can happen since Substrate tells the Processor to generate commitments // at the same time it tells the Tributary to be created // There's no guarantee the Tributary will have been created though panic!("processor is operating on tributary we don't have"); }; let tributary = tributary.tributary.read().await; log::trace!("read specific tributary"); match tx.kind() { TransactionKind::Provided(_) => { log::trace!("providing transaction {}", hex::encode(tx.hash())); let res = tributary.provide_transaction(tx).await; if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) { panic!("provided an invalid transaction: {res:?}"); } } TransactionKind::Unsigned => { log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash())); // Ignores the result since we can't differentiate already in-mempool from already // on-chain from invalid // TODO: Don't ignore the result tributary.add_transaction(tx).await; } TransactionKind::Signed(_) => { // Get the next nonce // TODO: This should be deterministic, not just DB-backed, to allow rebuilding validators // without the prior instance's DB // let mut txn = db.txn(); // let nonce = MainDb::tx_nonce(&mut txn, msg.id, tributary); // TODO: This isn't deterministic, or at least DB-backed, and accordingly is unsafe log::trace!("getting next nonce for Tributary TX in response to processor message"); let nonce = tributary .next_nonce(Ristretto::generator() * key.deref()) .await .expect("publishing a TX to a tributary we aren't in"); tx.sign(&mut OsRng, genesis, &key, nonce); publish_transaction(&tributary, tx).await; // txn.commit(); } } } processors.ack(msg).await; } } pub async fn run( mut raw_db: D, key: Zeroizing<::F>, p2p: P, processors: Pro, serai: Serai, ) { let serai = Arc::new(serai); // Handle new Substrate blocks tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processors.clone(), serai.clone())); // Handle the Tributaries // Arc so this can be shared between the Tributary scanner task and the P2P task // Write locks on this may take a while to acquire let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary>::new())); // Reload active tributaries from the database for spec in MainDb::new(&mut raw_db).active_tributaries().1 { let _ = add_tributary( raw_db.clone(), key.clone(), p2p.clone(), &mut *tributaries.write().await, spec, ) .await; } // When we reach synchrony on an event requiring signing, send our preprocess for it let recognized_id = { let raw_db = raw_db.clone(); let key = key.clone(); let tributaries = tributaries.clone(); move |network, genesis, id_type, id| { let raw_db = raw_db.clone(); let key = key.clone(); let tributaries = tributaries.clone(); async move { // SubstrateBlockAck is fired before Preprocess, creating a race between Tributary ack // of the SubstrateBlock and the sending of all Preprocesses // This waits until the necessary preprocess is available let get_preprocess = |raw_db, id| async move { loop { let Some(preprocess) = MainDb::::first_preprocess(raw_db, id) else { assert_eq!(id_type, RecognizedIdType::Plan); sleep(Duration::from_millis(100)).await; continue; }; return preprocess; } }; let mut tx = match id_type { RecognizedIdType::Batch => Transaction::BatchPreprocess(SignData { plan: id, attempt: 0, data: get_preprocess(&raw_db, id).await, signed: Transaction::empty_signed(), }), RecognizedIdType::Plan => Transaction::SignPreprocess(SignData { plan: id, attempt: 0, data: get_preprocess(&raw_db, id).await, signed: Transaction::empty_signed(), }), }; let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { panic!("tributary we don't have came to consensus on an Batch"); }; let tributary = tributary.tributary.read().await; // TODO: Same note as prior nonce acquisition log::trace!("getting next nonce for Tributary TX containing Batch signing data"); let nonce = tributary .next_nonce(Ristretto::generator() * key.deref()) .await .expect("publishing a TX to a tributary we aren't in"); tx.sign(&mut OsRng, genesis, &key, nonce); publish_transaction(&tributary, tx).await; } } }; // Handle new blocks for each Tributary { let raw_db = raw_db.clone(); tokio::spawn(scan_tributaries( raw_db, key.clone(), recognized_id, p2p.clone(), processors.clone(), serai.clone(), tributaries.clone(), )); } // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block // in a while (presumably because we're behind) tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone())); // Handle P2P messages tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone())); // Handle all messages from processors handle_processors(raw_db, key, serai, processors, tributaries).await; } #[tokio::main] async fn main() { // Override the panic handler with one which will panic if any tokio task panics { let existing = std::panic::take_hook(); std::panic::set_hook(Box::new(move |panic| { existing(panic); const MSG: &str = "exiting the process due to a task panicking"; println!("{MSG}"); log::error!("{MSG}"); std::process::exit(1); })); } if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string())); } env_logger::init(); log::info!("starting coordinator service..."); let db = serai_db::new_rocksdb(&env::var("DB_PATH").expect("path to DB wasn't specified")); let key = { let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided"); let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded"); key_hex.zeroize(); if key_vec.len() != 32 { key_vec.zeroize(); panic!("Serai key had an invalid length"); } let mut key_bytes = [0; 32]; key_bytes.copy_from_slice(&key_vec); key_vec.zeroize(); let key = Zeroizing::new(::F::from_repr(key_bytes).unwrap()); key_bytes.zeroize(); key }; let p2p = LibP2p::new(); let processors = Arc::new(MessageQueue::from_env(Service::Coordinator)); let serai = || async { loop { let Ok(serai) = Serai::new(&format!( "ws://{}:9944", serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided") )) .await else { log::error!("couldn't connect to the Serai node"); sleep(Duration::from_secs(5)).await; continue; }; log::info!("made initial connection to Serai node"); return serai; } }; run(db, key, p2p, processors, serai().await).await }