diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index edd219f9..8bf4084d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -49,6 +49,7 @@ jobs: -p serai-processor-utxo-scheduler \ -p serai-processor-transaction-chaining-scheduler \ -p serai-processor-signers \ + -p serai-processor-bin \ -p serai-bitcoin-processor \ -p serai-ethereum-processor \ -p serai-monero-processor \ diff --git a/Cargo.lock b/Cargo.lock index 8c0c3dd5..7e7d78a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8131,6 +8131,7 @@ dependencies = [ "dkg", "env_logger", "flexible-transcript", + "hex", "log", "modular-frost", "parity-scale-codec", @@ -8140,6 +8141,7 @@ dependencies = [ "serai-db", "serai-env", "serai-message-queue", + "serai-processor-bin", "serai-processor-key-gen", "serai-processor-messages", "serai-processor-primitives", @@ -8150,6 +8152,7 @@ dependencies = [ "serai-processor-utxo-scheduler-primitives", "tokio", "zalloc", + "zeroize", ] [[package]] @@ -8635,6 +8638,40 @@ dependencies = [ "zeroize", ] +[[package]] +name = "serai-processor-bin" +version = "0.1.0" +dependencies = [ + "async-trait", + "bitcoin-serai", + "borsh", + "ciphersuite", + "dkg", + "env_logger", + "flexible-transcript", + "hex", + "log", + "modular-frost", + "parity-scale-codec", + "rand_core", + "secp256k1", + "serai-client", + "serai-db", + "serai-env", + "serai-message-queue", + "serai-processor-key-gen", + "serai-processor-messages", + "serai-processor-primitives", + "serai-processor-scanner", + "serai-processor-scheduler-primitives", + "serai-processor-signers", + "serai-processor-transaction-chaining-scheduler", + "serai-processor-utxo-scheduler-primitives", + "tokio", + "zalloc", + "zeroize", +] + [[package]] name = "serai-processor-frost-attempt-manager" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 25e6c25d..b35b3318 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ members = [ "processor/scheduler/utxo/transaction-chaining", "processor/signers", + "processor/bin", "processor/bitcoin", "processor/ethereum", "processor/monero", diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index c43cc3c8..03c580ce 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -72,6 +72,9 @@ pub(crate) fn queue_message( // Assert one, and only one of these, is the coordinator assert!(matches!(meta.from, Service::Coordinator) ^ matches!(meta.to, Service::Coordinator)); + // Lock the queue + let queue_lock = QUEUES.read().unwrap()[&(meta.from, meta.to)].write().unwrap(); + // Verify (from, to, intent) hasn't been prior seen fn key(domain: &'static [u8], key: impl AsRef<[u8]>) -> Vec { [&[u8::try_from(domain.len()).unwrap()], domain, key.as_ref()].concat() @@ -93,7 +96,7 @@ pub(crate) fn queue_message( DbTxn::put(&mut txn, intent_key, []); // Queue it - let id = QUEUES.read().unwrap()[&(meta.from, meta.to)].write().unwrap().queue_message( + let id = queue_lock.queue_message( &mut txn, QueuedMessage { from: meta.from, diff --git a/processor/bin/Cargo.toml b/processor/bin/Cargo.toml new file mode 100644 index 00000000..f3f3b753 --- /dev/null +++ b/processor/bin/Cargo.toml @@ -0,0 +1,60 @@ +[package] +name = "serai-processor-bin" +version = "0.1.0" +description = "Framework for Serai processor binaries" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/processor/bin" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true + +[dependencies] +async-trait = { version = "0.1", default-features = false } +zeroize = { version = "1", default-features = false, features = ["std"] } +rand_core = { version = "0.6", default-features = false } + +hex = { version = "0.4", default-features = false, features = ["std"] } +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +transcript = { package = "flexible-transcript", path = "../../crypto/transcript", default-features = false, features = ["std", "recommended"] } +ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std", "secp256k1"] } +dkg = { path = "../../crypto/dkg", default-features = false, features = ["std", "evrf-secp256k1"] } +frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false } + +secp256k1 = { version = "0.29", default-features = false, features = ["std", "global-context", "rand-std"] } +bitcoin-serai = { path = "../../networks/bitcoin", default-features = false, features = ["std"] } + +log = { version = "0.4", default-features = false, features = ["std"] } +env_logger = { version = "0.10", default-features = false, features = ["humantime"] } +tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } + +zalloc = { path = "../../common/zalloc" } +serai-db = { path = "../../common/db" } +serai-env = { path = "../../common/env" } + +serai-client = { path = "../../substrate/client", default-features = false, features = ["bitcoin"] } + +messages = { package = "serai-processor-messages", path = "../messages" } +key-gen = { package = "serai-processor-key-gen", path = "../key-gen" } + +primitives = { package = "serai-processor-primitives", path = "../primitives" } +scheduler = { package = "serai-processor-scheduler-primitives", path = "../scheduler/primitives" } +scanner = { package = "serai-processor-scanner", path = "../scanner" } +utxo-scheduler = { package = "serai-processor-utxo-scheduler-primitives", path = "../scheduler/utxo/primitives" } +transaction-chaining-scheduler = { package = "serai-processor-transaction-chaining-scheduler", path = "../scheduler/utxo/transaction-chaining" } +signers = { package = "serai-processor-signers", path = "../signers" } + +message-queue = { package = "serai-message-queue", path = "../../message-queue" } + +[features] +parity-db = ["serai-db/parity-db"] +rocksdb = ["serai-db/rocksdb"] diff --git a/processor/bin/LICENSE b/processor/bin/LICENSE new file mode 100644 index 00000000..41d5a261 --- /dev/null +++ b/processor/bin/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2022-2024 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/processor/bin/README.md b/processor/bin/README.md new file mode 100644 index 00000000..858a2925 --- /dev/null +++ b/processor/bin/README.md @@ -0,0 +1,3 @@ +# Serai Processor Bin + +The framework for Serai processor binaries, common to the Serai processors. diff --git a/processor/bin/src/coordinator.rs b/processor/bin/src/coordinator.rs new file mode 100644 index 00000000..d9d8d112 --- /dev/null +++ b/processor/bin/src/coordinator.rs @@ -0,0 +1,196 @@ +use std::sync::Arc; + +use tokio::sync::mpsc; + +use scale::Encode; +use serai_client::{ + primitives::{NetworkId, Signature}, + validator_sets::primitives::Session, + in_instructions::primitives::{Batch, SignedBatch}, +}; + +use serai_env as env; +use serai_db::{Get, DbTxn, Db, create_db, db_channel}; +use message_queue::{Service, Metadata, client::MessageQueue}; + +create_db! { + ProcessorBinCoordinator { + SavedMessages: () -> u64, + } +} + +db_channel! { + ProcessorBinCoordinator { + CoordinatorMessages: () -> Vec + } +} + +async fn send(service: Service, queue: &MessageQueue, msg: messages::ProcessorMessage) { + let metadata = Metadata { from: service, to: Service::Coordinator, intent: msg.intent() }; + let msg = borsh::to_vec(&msg).unwrap(); + queue.queue(metadata, msg).await; +} + +pub(crate) struct Coordinator { + new_message: mpsc::UnboundedReceiver<()>, + service: Service, + message_queue: Arc, +} + +pub(crate) struct CoordinatorSend(Service, Arc); + +impl Coordinator { + pub(crate) fn new(mut db: crate::Db) -> Self { + let (new_message_send, new_message_recv) = mpsc::unbounded_channel(); + + let network_id = match env::var("NETWORK").expect("network wasn't specified").as_str() { + "bitcoin" => NetworkId::Bitcoin, + "ethereum" => NetworkId::Ethereum, + "monero" => NetworkId::Monero, + _ => panic!("unrecognized network"), + }; + let service = Service::Processor(network_id); + let message_queue = Arc::new(MessageQueue::from_env(service)); + + // Spawn a task to move messages from the message-queue to our database so we can achieve + // atomicity. This is the only place we read/ack messages from + tokio::spawn({ + let message_queue = message_queue.clone(); + async move { + loop { + let msg = message_queue.next(Service::Coordinator).await; + + let prior_msg = msg.id.checked_sub(1); + let saved_messages = SavedMessages::get(&db); + /* + This should either be: + A) The message after the message we just saved (as normal) + B) The message we just saved (if we rebooted and failed to ack it) + */ + assert!((saved_messages == prior_msg) || (saved_messages == Some(msg.id))); + if saved_messages < Some(msg.id) { + let mut txn = db.txn(); + CoordinatorMessages::send(&mut txn, &msg.msg); + SavedMessages::set(&mut txn, &msg.id); + txn.commit(); + } + // Acknowledge this message + message_queue.ack(Service::Coordinator, msg.id).await; + + // Fire that there's a new message + new_message_send.send(()).expect("failed to tell the Coordinator there's a new message"); + } + } + }); + + Coordinator { new_message: new_message_recv, service, message_queue } + } + + pub(crate) fn coordinator_send(&self) -> CoordinatorSend { + CoordinatorSend(self.service, self.message_queue.clone()) + } + + /// Fetch the next message from the Coordinator. + /// + /// This message is guaranteed to have never been handled before, where handling is defined as + /// this `txn` being committed. + pub(crate) async fn next_message( + &mut self, + txn: &mut impl DbTxn, + ) -> messages::CoordinatorMessage { + loop { + match CoordinatorMessages::try_recv(txn) { + Some(msg) => { + return borsh::from_slice(&msg) + .expect("message wasn't a borsh-encoded CoordinatorMessage") + } + None => { + let _ = + tokio::time::timeout(core::time::Duration::from_secs(60), self.new_message.recv()) + .await; + } + } + } + } + + #[allow(clippy::unused_async)] + pub(crate) async fn send_message(&mut self, msg: messages::ProcessorMessage) { + send(self.service, &self.message_queue, msg).await + } +} + +#[async_trait::async_trait] +impl signers::Coordinator for CoordinatorSend { + type EphemeralError = (); + + async fn send( + &mut self, + msg: messages::sign::ProcessorMessage, + ) -> Result<(), Self::EphemeralError> { + // TODO: Use a fallible send for these methods + send(self.0, &self.1, messages::ProcessorMessage::Sign(msg)).await; + Ok(()) + } + + async fn publish_cosign( + &mut self, + block_number: u64, + block: [u8; 32], + signature: Signature, + ) -> Result<(), Self::EphemeralError> { + send( + self.0, + &self.1, + messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::CosignedBlock { + block_number, + block, + signature: signature.encode(), + }, + ), + ) + .await; + Ok(()) + } + + async fn publish_batch(&mut self, batch: Batch) -> Result<(), Self::EphemeralError> { + send( + self.0, + &self.1, + messages::ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Batch { batch }), + ) + .await; + Ok(()) + } + + async fn publish_signed_batch(&mut self, batch: SignedBatch) -> Result<(), Self::EphemeralError> { + send( + self.0, + &self.1, + messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SignedBatch { batch }, + ), + ) + .await; + Ok(()) + } + + async fn publish_slash_report_signature( + &mut self, + session: Session, + signature: Signature, + ) -> Result<(), Self::EphemeralError> { + send( + self.0, + &self.1, + messages::ProcessorMessage::Coordinator( + messages::coordinator::ProcessorMessage::SignedSlashReport { + session, + signature: signature.encode(), + }, + ), + ) + .await; + Ok(()) + } +} diff --git a/processor/bin/src/lib.rs b/processor/bin/src/lib.rs new file mode 100644 index 00000000..15873873 --- /dev/null +++ b/processor/bin/src/lib.rs @@ -0,0 +1,293 @@ +use core::cmp::Ordering; + +use zeroize::{Zeroize, Zeroizing}; + +use ciphersuite::{ + group::{ff::PrimeField, GroupEncoding}, + Ciphersuite, Ristretto, +}; +use dkg::evrf::EvrfCurve; + +use serai_client::validator_sets::primitives::Session; + +use serai_env as env; +use serai_db::{Get, DbTxn, Db as DbTrait, create_db, db_channel}; + +use primitives::EncodableG; +use ::key_gen::{KeyGenParams, KeyGen}; +use scheduler::SignableTransaction; +use scanner::{ScannerFeed, Scanner, KeyFor, Scheduler}; +use signers::{TransactionPublisher, Signers}; + +mod coordinator; +use coordinator::Coordinator; + +create_db! { + ProcessorBin { + ExternalKeyForSessionForSigners: (session: Session) -> EncodableG, + } +} + +db_channel! { + ProcessorBin { + KeyToActivate: () -> EncodableG + } +} + +/// The type used for the database. +#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] +pub type Db = serai_db::ParityDb; +/// The type used for the database. +#[cfg(feature = "rocksdb")] +pub type Db = serai_db::RocksDB; + +/// Initialize the processor. +/// +/// Yields the database. +#[allow(unused_variables, unreachable_code)] +pub fn init() -> Db { + // Override the panic handler with one which will panic if any tokio task panics + { + let existing = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic| { + existing(panic); + const MSG: &str = "exiting the process due to a task panicking"; + println!("{MSG}"); + log::error!("{MSG}"); + std::process::exit(1); + })); + } + + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string())); + } + env_logger::init(); + + #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] + let db = + serai_db::new_parity_db(&serai_env::var("DB_PATH").expect("path to DB wasn't specified")); + #[cfg(feature = "rocksdb")] + let db = serai_db::new_rocksdb(&serai_env::var("DB_PATH").expect("path to DB wasn't specified")); + db +} + +/// THe URL for the external network's node. +pub fn url() -> String { + let login = env::var("NETWORK_RPC_LOGIN").expect("network RPC login wasn't specified"); + let hostname = env::var("NETWORK_RPC_HOSTNAME").expect("network RPC hostname wasn't specified"); + let port = env::var("NETWORK_RPC_PORT").expect("network port domain wasn't specified"); + "http://".to_string() + &login + "@" + &hostname + ":" + &port +} + +fn key_gen() -> KeyGen { + fn read_key_from_env(label: &'static str) -> Zeroizing { + let key_hex = + Zeroizing::new(env::var(label).unwrap_or_else(|| panic!("{label} wasn't provided"))); + let bytes = Zeroizing::new( + hex::decode(key_hex).unwrap_or_else(|_| panic!("{label} wasn't a valid hex string")), + ); + + let mut repr = ::Repr::default(); + if repr.as_ref().len() != bytes.len() { + panic!("{label} wasn't the correct length"); + } + repr.as_mut().copy_from_slice(bytes.as_slice()); + let res = Zeroizing::new( + Option::from(::from_repr(repr)) + .unwrap_or_else(|| panic!("{label} wasn't a valid scalar")), + ); + repr.as_mut().zeroize(); + res + } + KeyGen::new( + read_key_from_env::<::EmbeddedCurve>("SUBSTRATE_EVRF_KEY"), + read_key_from_env::<::EmbeddedCurve>( + "NETWORK_EVRF_KEY", + ), + ) +} + +async fn first_block_after_time(feed: &S, serai_time: u64) -> u64 { + async fn first_block_after_time_iteration( + feed: &S, + serai_time: u64, + ) -> Result, S::EphemeralError> { + let latest = feed.latest_finalized_block_number().await?; + let latest_time = feed.time_of_block(latest).await?; + if latest_time < serai_time { + tokio::time::sleep(core::time::Duration::from_secs(serai_time - latest_time)).await; + return Ok(None); + } + + // A finalized block has a time greater than or equal to the time we want to start at + // Find the first such block with a binary search + // start_search and end_search are inclusive + let mut start_search = 0; + let mut end_search = latest; + while start_search != end_search { + // This on purposely chooses the earlier block in the case two blocks are both in the middle + let to_check = start_search + ((end_search - start_search) / 2); + let block_time = feed.time_of_block(to_check).await?; + match block_time.cmp(&serai_time) { + Ordering::Less => { + start_search = to_check + 1; + assert!(start_search <= end_search); + } + Ordering::Equal | Ordering::Greater => { + // This holds true since we pick the earlier block upon an even search distance + // If it didn't, this would cause an infinite loop + assert!(to_check < end_search); + end_search = to_check; + } + } + } + Ok(Some(start_search)) + } + loop { + match first_block_after_time_iteration(feed, serai_time).await { + Ok(Some(block)) => return block, + Ok(None) => { + log::info!("waiting for block to activate at (a block with timestamp >= {serai_time})"); + } + Err(e) => { + log::error!("couldn't find the first block Serai should scan due to an RPC error: {e:?}"); + } + } + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + } +} + +/// The main loop of a Processor, interacting with the Coordinator. +pub async fn coordinator_loop< + S: ScannerFeed, + K: KeyGenParams>>, + Sch: Scheduler< + S, + SignableTransaction: SignableTransaction, + >, + P: TransactionPublisher<::Transaction>, +>( + mut db: Db, + feed: S, + publisher: P, +) { + let mut coordinator = Coordinator::new(db.clone()); + + let mut key_gen = key_gen::(); + let mut scanner = Scanner::new::(db.clone(), feed.clone()).await; + let mut signers = + Signers::::new(db.clone(), coordinator.coordinator_send(), publisher); + + loop { + let db_clone = db.clone(); + let mut txn = db.txn(); + let msg = coordinator.next_message(&mut txn).await; + let mut txn = Some(txn); + match msg { + messages::CoordinatorMessage::KeyGen(msg) => { + let txn = txn.as_mut().unwrap(); + let mut new_key = None; + // This is a computationally expensive call yet it happens infrequently + for msg in key_gen.handle(txn, msg) { + if let messages::key_gen::ProcessorMessage::GeneratedKeyPair { session, .. } = &msg { + new_key = Some(*session) + } + coordinator.send_message(messages::ProcessorMessage::KeyGen(msg)).await; + } + + // If we were yielded a key, register it in the signers + if let Some(session) = new_key { + let (substrate_keys, network_keys) = KeyGen::::key_shares(txn, session) + .expect("generated key pair yet couldn't get key shares"); + signers.register_keys(txn, session, substrate_keys, network_keys); + } + } + + // These are cheap calls which are fine to be here in this loop + messages::CoordinatorMessage::Sign(msg) => { + let txn = txn.as_mut().unwrap(); + signers.queue_message(txn, &msg) + } + messages::CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { + session, + block_number, + block, + }, + ) => { + let txn = txn.take().unwrap(); + signers.cosign_block(txn, session, block_number, block) + } + messages::CoordinatorMessage::Coordinator( + messages::coordinator::CoordinatorMessage::SignSlashReport { session, report }, + ) => { + let txn = txn.take().unwrap(); + signers.sign_slash_report(txn, session, &report) + } + + messages::CoordinatorMessage::Substrate(msg) => match msg { + messages::substrate::CoordinatorMessage::SetKeys { serai_time, session, key_pair } => { + let txn = txn.as_mut().unwrap(); + let key = + EncodableG(K::decode_key(key_pair.1.as_ref()).expect("invalid key set on serai")); + + // Queue the key to be activated upon the next Batch + KeyToActivate::>::send(txn, &key); + + // Set the external key, as needed by the signers + ExternalKeyForSessionForSigners::>::set(txn, session, &key); + + // This is presumed extremely expensive, potentially blocking for several minutes, yet + // only happens for the very first set of keys + if session == Session(0) { + assert!(scanner.is_none()); + let start_block = first_block_after_time(&feed, serai_time).await; + scanner = + Some(Scanner::initialize::(db_clone, feed.clone(), start_block, key.0).await); + } + } + messages::substrate::CoordinatorMessage::SlashesReported { session } => { + let txn = txn.as_mut().unwrap(); + + // Since this session had its slashes reported, it has finished all its signature + // protocols and has been fully retired. We retire it from the signers accordingly + let key = ExternalKeyForSessionForSigners::>::take(txn, session).unwrap().0; + + // This is a cheap call + signers.retire_session(txn, session, &key) + } + messages::substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { + block: _, + batch_id, + in_instruction_succeededs, + burns, + } => { + let mut txn = txn.take().unwrap(); + let scanner = scanner.as_mut().unwrap(); + let key_to_activate = KeyToActivate::>::try_recv(&mut txn).map(|key| key.0); + // This is a cheap call as it internally just queues this to be done later + scanner.acknowledge_batch( + txn, + batch_id, + in_instruction_succeededs, + burns, + key_to_activate, + ) + } + messages::substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { + block: _, + burns, + } => { + let txn = txn.take().unwrap(); + let scanner = scanner.as_mut().unwrap(); + // This is a cheap call as it internally just queues this to be done later + scanner.queue_burns(txn, burns) + } + }, + }; + // If the txn wasn't already consumed and committed, commit it + if let Some(txn) = txn { + txn.commit(); + } + } +} diff --git a/processor/bitcoin/Cargo.toml b/processor/bitcoin/Cargo.toml index c92e1384..c968e36b 100644 --- a/processor/bitcoin/Cargo.toml +++ b/processor/bitcoin/Cargo.toml @@ -18,8 +18,10 @@ workspace = true [dependencies] async-trait = { version = "0.1", default-features = false } +zeroize = { version = "1", default-features = false, features = ["std"] } rand_core = { version = "0.6", default-features = false } +hex = { version = "0.4", default-features = false, features = ["std"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } @@ -51,8 +53,10 @@ utxo-scheduler = { package = "serai-processor-utxo-scheduler-primitives", path = transaction-chaining-scheduler = { package = "serai-processor-transaction-chaining-scheduler", path = "../scheduler/utxo/transaction-chaining" } signers = { package = "serai-processor-signers", path = "../signers" } +bin = { package = "serai-processor-bin", path = "../bin" } + message-queue = { package = "serai-message-queue", path = "../../message-queue" } [features] -parity-db = ["serai-db/parity-db"] -rocksdb = ["serai-db/rocksdb"] +parity-db = ["bin/parity-db"] +rocksdb = ["bin/rocksdb"] diff --git a/processor/bitcoin/src/key_gen.rs b/processor/bitcoin/src/key_gen.rs index 75944364..41544134 100644 --- a/processor/bitcoin/src/key_gen.rs +++ b/processor/bitcoin/src/key_gen.rs @@ -7,22 +7,22 @@ pub(crate) struct KeyGenParams; impl key_gen::KeyGenParams for KeyGenParams { const ID: &'static str = "Bitcoin"; - type ExternalNetworkCurve = Secp256k1; + type ExternalNetworkCiphersuite = Secp256k1; - fn tweak_keys(keys: &mut ThresholdKeys) { + fn tweak_keys(keys: &mut ThresholdKeys) { *keys = bitcoin_serai::wallet::tweak_keys(keys); // Also create a scanner to assert these keys, and all expected paths, are usable scanner(keys.group_key()); } - fn encode_key(key: ::G) -> Vec { + fn encode_key(key: ::G) -> Vec { let key = key.to_bytes(); let key: &[u8] = key.as_ref(); // Skip the parity encoding as we know this key is even key[1 ..].to_vec() } - fn decode_key(key: &[u8]) -> Option<::G> { + fn decode_key(key: &[u8]) -> Option<::G> { x_coord_to_even_point(key) } } diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index 1c07b6cd..09228d44 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -6,16 +6,9 @@ static ALLOCATOR: zalloc::ZeroizingAlloc = zalloc::ZeroizingAlloc(std::alloc::System); -use core::cmp::Ordering; +use bitcoin_serai::rpc::Rpc as BRpc; -use ciphersuite::Ciphersuite; - -use serai_client::validator_sets::primitives::Session; - -use serai_db::{DbTxn, Db}; -use ::primitives::EncodableG; -use ::key_gen::KeyGenParams as KeyGenParamsTrait; -use scanner::{ScannerFeed, Scanner}; +use ::primitives::task::{Task, ContinuallyRan}; mod primitives; pub(crate) use crate::primitives::*; @@ -34,6 +27,7 @@ use scheduler::Scheduler; // Our custom code for Bitcoin mod db; mod txindex; +use txindex::TxIndexTask; pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> [u8; 32] { use bitcoin_serai::bitcoin::hashes::Hash; @@ -43,204 +37,29 @@ pub(crate) fn hash_bytes(hash: bitcoin_serai::bitcoin::hashes::sha256d::Hash) -> res } -async fn first_block_after_time(feed: &S, serai_time: u64) -> u64 { - async fn first_block_after_time_iteration( - feed: &S, - serai_time: u64, - ) -> Result, S::EphemeralError> { - let latest = feed.latest_finalized_block_number().await?; - let latest_time = feed.time_of_block(latest).await?; - if latest_time < serai_time { - tokio::time::sleep(core::time::Duration::from_secs(serai_time - latest_time)).await; - return Ok(None); - } - - // A finalized block has a time greater than or equal to the time we want to start at - // Find the first such block with a binary search - // start_search and end_search are inclusive - let mut start_search = 0; - let mut end_search = latest; - while start_search != end_search { - // This on purposely chooses the earlier block in the case two blocks are both in the middle - let to_check = start_search + ((end_search - start_search) / 2); - let block_time = feed.time_of_block(to_check).await?; - match block_time.cmp(&serai_time) { - Ordering::Less => { - start_search = to_check + 1; - assert!(start_search <= end_search); - } - Ordering::Equal | Ordering::Greater => { - // This holds true since we pick the earlier block upon an even search distance - // If it didn't, this would cause an infinite loop - assert!(to_check < end_search); - end_search = to_check; - } - } - } - Ok(Some(start_search)) - } - loop { - match first_block_after_time_iteration(feed, serai_time).await { - Ok(Some(block)) => return block, - Ok(None) => { - log::info!("waiting for block to activate at (a block with timestamp >= {serai_time})"); - } - Err(e) => { - log::error!("couldn't find the first block Serai should scan due to an RPC error: {e:?}"); - } - } - tokio::time::sleep(core::time::Duration::from_secs(5)).await; - } -} - -/// Fetch the next message from the Coordinator. -/// -/// This message is guaranteed to have never been handled before, where handling is defined as -/// this `txn` being committed. -async fn next_message(_txn: &mut impl DbTxn) -> messages::CoordinatorMessage { - todo!("TODO") -} - -async fn send_message(_msg: messages::ProcessorMessage) { - todo!("TODO") -} - -async fn coordinator_loop( - mut db: D, - feed: Rpc, - mut key_gen: ::key_gen::KeyGen, - mut signers: signers::Signers, Scheduler, Rpc>, - mut scanner: Option>>, -) { - loop { - let db_clone = db.clone(); - let mut txn = db.txn(); - let msg = next_message(&mut txn).await; - let mut txn = Some(txn); - match msg { - messages::CoordinatorMessage::KeyGen(msg) => { - let txn = txn.as_mut().unwrap(); - let mut new_key = None; - // This is a computationally expensive call yet it happens infrequently - for msg in key_gen.handle(txn, msg) { - if let messages::key_gen::ProcessorMessage::GeneratedKeyPair { session, .. } = &msg { - new_key = Some(*session) - } - send_message(messages::ProcessorMessage::KeyGen(msg)).await; - } - - // If we were yielded a key, register it in the signers - if let Some(session) = new_key { - let (substrate_keys, network_keys) = - ::key_gen::KeyGen::::key_shares(txn, session) - .expect("generated key pair yet couldn't get key shares"); - signers.register_keys(txn, session, substrate_keys, network_keys); - } - } - - // These are cheap calls which are fine to be here in this loop - messages::CoordinatorMessage::Sign(msg) => { - let txn = txn.as_mut().unwrap(); - signers.queue_message(txn, &msg) - } - messages::CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { - session, - block_number, - block, - }, - ) => { - let txn = txn.take().unwrap(); - signers.cosign_block(txn, session, block_number, block) - } - messages::CoordinatorMessage::Coordinator( - messages::coordinator::CoordinatorMessage::SignSlashReport { session, report }, - ) => { - let txn = txn.take().unwrap(); - signers.sign_slash_report(txn, session, &report) - } - - messages::CoordinatorMessage::Substrate(msg) => match msg { - messages::substrate::CoordinatorMessage::SetKeys { serai_time, session, key_pair } => { - let txn = txn.as_mut().unwrap(); - let key = EncodableG( - KeyGenParams::decode_key(key_pair.1.as_ref()).expect("invalid key set on serai"), - ); - - // Queue the key to be activated upon the next Batch - db::KeyToActivate::< - <::ExternalNetworkCurve as Ciphersuite>::G, - >::send(txn, &key); - - // Set the external key, as needed by the signers - db::ExternalKeyForSessionForSigners::< - <::ExternalNetworkCurve as Ciphersuite>::G, - >::set(txn, session, &key); - - // This is presumed extremely expensive, potentially blocking for several minutes, yet - // only happens for the very first set of keys - if session == Session(0) { - assert!(scanner.is_none()); - let start_block = first_block_after_time(&feed, serai_time).await; - scanner = - Some(Scanner::new::>(db_clone, feed.clone(), start_block, key.0).await); - } - } - messages::substrate::CoordinatorMessage::SlashesReported { session } => { - let txn = txn.as_mut().unwrap(); - - // Since this session had its slashes reported, it has finished all its signature - // protocols and has been fully retired. We retire it from the signers accordingly - let key = db::ExternalKeyForSessionForSigners::< - <::ExternalNetworkCurve as Ciphersuite>::G, - >::take(txn, session) - .unwrap() - .0; - - // This is a cheap call - signers.retire_session(txn, session, &key) - } - messages::substrate::CoordinatorMessage::BlockWithBatchAcknowledgement { - block: _, - batch_id, - in_instruction_succeededs, - burns, - } => { - let mut txn = txn.take().unwrap(); - let scanner = scanner.as_mut().unwrap(); - let key_to_activate = db::KeyToActivate::< - <::ExternalNetworkCurve as Ciphersuite>::G, - >::try_recv(&mut txn) - .map(|key| key.0); - // This is a cheap call as it internally just queues this to be done later - scanner.acknowledge_batch( - txn, - batch_id, - in_instruction_succeededs, - burns, - key_to_activate, - ) - } - messages::substrate::CoordinatorMessage::BlockWithoutBatchAcknowledgement { - block: _, - burns, - } => { - let txn = txn.take().unwrap(); - let scanner = scanner.as_mut().unwrap(); - // This is a cheap call as it internally just queues this to be done later - scanner.queue_burns(txn, burns) - } - }, - }; - // If the txn wasn't already consumed and committed, commit it - if let Some(txn) = txn { - txn.commit(); - } - } -} - #[tokio::main] -async fn main() {} +async fn main() { + let db = bin::init(); + let feed = Rpc { + db: db.clone(), + rpc: loop { + match BRpc::new(bin::url()).await { + Ok(rpc) => break rpc, + Err(e) => { + log::error!("couldn't connect to the Bitcoin node: {e:?}"); + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + } + } + }, + }; + + let (index_task, index_handle) = Task::new(); + tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![])); + core::mem::forget(index_handle); + + bin::coordinator_loop::<_, KeyGenParams, Scheduler<_>, Rpc>(db, feed.clone(), feed) + .await; +} /* use bitcoin_serai::{ @@ -278,9 +97,6 @@ use serai_client::{ */ /* -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub(crate) struct Fee(u64); - #[async_trait] impl TransactionTrait for Transaction { #[cfg(test)] diff --git a/processor/bitcoin/src/txindex.rs b/processor/bitcoin/src/txindex.rs index 2d3f1cd6..4ed38973 100644 --- a/processor/bitcoin/src/txindex.rs +++ b/processor/bitcoin/src/txindex.rs @@ -35,7 +35,7 @@ pub(crate) fn script_pubkey_for_on_chain_output( ) } -pub(crate) struct TxIndexTask(Rpc); +pub(crate) struct TxIndexTask(pub(crate) Rpc); #[async_trait::async_trait] impl ContinuallyRan for TxIndexTask { diff --git a/processor/key-gen/src/db.rs b/processor/key-gen/src/db.rs index 676fd2aa..149fe1a2 100644 --- a/processor/key-gen/src/db.rs +++ b/processor/key-gen/src/db.rs @@ -19,7 +19,7 @@ pub(crate) struct Params { pub(crate) substrate_evrf_public_keys: Vec<<::EmbeddedCurve as Ciphersuite>::G>, pub(crate) network_evrf_public_keys: - Vec<<::EmbeddedCurve as Ciphersuite>::G>, + Vec<<::EmbeddedCurve as Ciphersuite>::G>, } #[derive(BorshSerialize, BorshDeserialize)] @@ -93,9 +93,9 @@ impl KeyGenDb

{ .network_evrf_public_keys .into_iter() .map(|key| { - <::EmbeddedCurve as Ciphersuite>::read_G::<&[u8]>( - &mut key.as_ref(), - ) + <::EmbeddedCurve as Ciphersuite>::read_G::< + &[u8], + >(&mut key.as_ref()) .unwrap() }) .collect(), @@ -118,7 +118,7 @@ impl KeyGenDb

{ txn: &mut impl DbTxn, session: Session, substrate_keys: &[ThresholdKeys], - network_keys: &[ThresholdKeys], + network_keys: &[ThresholdKeys], ) { assert_eq!(substrate_keys.len(), network_keys.len()); @@ -134,7 +134,8 @@ impl KeyGenDb

{ pub(crate) fn key_shares( getter: &impl Get, session: Session, - ) -> Option<(Vec>, Vec>)> { + ) -> Option<(Vec>, Vec>)> + { let keys = _db::KeyShares::get(getter, &session)?; let mut keys: &[u8] = keys.as_ref(); diff --git a/processor/key-gen/src/lib.rs b/processor/key-gen/src/lib.rs index cb23a740..fd847cc5 100644 --- a/processor/key-gen/src/lib.rs +++ b/processor/key-gen/src/lib.rs @@ -34,27 +34,29 @@ pub trait KeyGenParams { const ID: &'static str; /// The curve used for the external network. - type ExternalNetworkCurve: EvrfCurve< + type ExternalNetworkCiphersuite: EvrfCurve< EmbeddedCurve: Ciphersuite< - G: ec_divisors::DivisorCurve::F>, + G: ec_divisors::DivisorCurve< + FieldElement = ::F, + >, >, >; /// Tweaks keys as necessary/beneficial. - fn tweak_keys(keys: &mut ThresholdKeys); + fn tweak_keys(keys: &mut ThresholdKeys); /// Encode keys as optimal. /// /// A default implementation is provided which calls the traditional `to_bytes`. - fn encode_key(key: ::G) -> Vec { + fn encode_key(key: ::G) -> Vec { key.to_bytes().as_ref().to_vec() } /// Decode keys from their optimal encoding. /// /// A default implementation is provided which calls the traditional `from_bytes`. - fn decode_key(mut key: &[u8]) -> Option<::G> { - let res = ::read_G(&mut key).ok()?; + fn decode_key(mut key: &[u8]) -> Option<::G> { + let res = ::read_G(&mut key).ok()?; if !key.is_empty() { None?; } @@ -143,7 +145,7 @@ pub struct KeyGen { substrate_evrf_private_key: Zeroizing<<::EmbeddedCurve as Ciphersuite>::F>, network_evrf_private_key: - Zeroizing<<::EmbeddedCurve as Ciphersuite>::F>, + Zeroizing<<::EmbeddedCurve as Ciphersuite>::F>, } impl KeyGen

{ @@ -154,7 +156,7 @@ impl KeyGen

{ <::EmbeddedCurve as Ciphersuite>::F, >, network_evrf_private_key: Zeroizing< - <::EmbeddedCurve as Ciphersuite>::F, + <::EmbeddedCurve as Ciphersuite>::F, >, ) -> KeyGen

{ KeyGen { substrate_evrf_private_key, network_evrf_private_key } @@ -165,7 +167,8 @@ impl KeyGen

{ pub fn key_shares( getter: &impl Get, session: Session, - ) -> Option<(Vec>, Vec>)> { + ) -> Option<(Vec>, Vec>)> + { // This is safe, despite not having a txn, since it's a static value // It doesn't change over time/in relation to other operations // It is solely set or unset @@ -198,7 +201,7 @@ impl KeyGen

{ let network_evrf_public_keys = evrf_public_keys.into_iter().map(|(_, key)| key).collect::>(); let (network_evrf_public_keys, additional_faulty) = - coerce_keys::(&network_evrf_public_keys); + coerce_keys::(&network_evrf_public_keys); faulty.extend(additional_faulty); // Participate for both Substrate and the network @@ -228,7 +231,7 @@ impl KeyGen

{ &self.substrate_evrf_private_key, &mut participation, ); - participate::( + participate::( context::

(session, NETWORK_KEY_CONTEXT), threshold, &network_evrf_public_keys, @@ -283,7 +286,7 @@ impl KeyGen

{ }; let len_at_network_participation_start_pos = participation.len(); let Ok(network_participation) = - Participation::::read(&mut participation, n) + Participation::::read(&mut participation, n) else { return blame; }; @@ -317,7 +320,7 @@ impl KeyGen

{ } } - match EvrfDkg::::verify( + match EvrfDkg::::verify( &mut OsRng, generators(), context::

(session, NETWORK_KEY_CONTEXT), @@ -490,7 +493,7 @@ impl KeyGen

{ Err(blames) => return blames, }; - let network_dkg = match verify_dkg::( + let network_dkg = match verify_dkg::( txn, session, false, diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 107616cc..49ab1785 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -70,6 +70,8 @@ impl OutputWithInInstruction { create_db!( ScannerGlobal { + StartBlock: () -> u64, + QueuedKey: (key: K) -> (), ActiveKeys: () -> Vec>, @@ -106,8 +108,11 @@ create_db!( pub(crate) struct ScannerGlobalDb(PhantomData); impl ScannerGlobalDb { - pub(crate) fn has_any_key_been_queued(getter: &impl Get) -> bool { - ActiveKeys::>>::get(getter).is_some() + pub(crate) fn start_block(getter: &impl Get) -> Option { + StartBlock::get(getter) + } + pub(crate) fn set_start_block(txn: &mut impl DbTxn, block: u64) { + StartBlock::set(txn, &block) } /// Queue a key. diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 6ed16d74..ebd783bf 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -344,17 +344,10 @@ impl Scanner { /// Create a new scanner. /// /// This will begin its execution, spawning several asynchronous tasks. - pub async fn new>( - mut db: impl Db, - feed: S, - start_block: u64, - start_key: KeyFor, - ) -> Self { - if !ScannerGlobalDb::::has_any_key_been_queued(&db) { - let mut txn = db.txn(); - ScannerGlobalDb::::queue_key(&mut txn, start_block, start_key); - txn.commit(); - } + /// + /// This will return None if the Scanner was never initialized. + pub async fn new>(db: impl Db, feed: S) -> Option { + let start_block = ScannerGlobalDb::::start_block(&db)?; let index_task = index::IndexTask::new(db.clone(), feed.clone(), start_block).await; let scan_task = scan::ScanTask::new(db.clone(), feed.clone(), start_block); @@ -381,7 +374,28 @@ impl Scanner { // window its allowed to scan tokio::spawn(eventuality_task.continually_run(eventuality_task_def, vec![scan_handle])); - Self { substrate_handle, _S: PhantomData } + Some(Self { substrate_handle, _S: PhantomData }) + } + + /// Initialize the scanner. + /// + /// This will begin its execution, spawning several asynchronous tasks. + /// + /// This passes through to `Scanner::new` if prior called. + pub async fn initialize>( + mut db: impl Db, + feed: S, + start_block: u64, + start_key: KeyFor, + ) -> Self { + if ScannerGlobalDb::::start_block(&db).is_none() { + let mut txn = db.txn(); + ScannerGlobalDb::::set_start_block(&mut txn, start_block); + ScannerGlobalDb::::queue_key(&mut txn, start_block, start_key); + txn.commit(); + } + + Self::new::(db, feed).await.unwrap() } /// Acknowledge a Batch having been published on Serai. diff --git a/processor/signers/src/coordinator/mod.rs b/processor/signers/src/coordinator/mod.rs index a3163922..e749f841 100644 --- a/processor/signers/src/coordinator/mod.rs +++ b/processor/signers/src/coordinator/mod.rs @@ -114,6 +114,7 @@ impl ContinuallyRan for CoordinatorTask { self .coordinator .publish_slash_report_signature( + session, <_>::decode(&mut slash_report_signature.as_slice()).unwrap(), ) .await diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index 881205f8..e06dd07f 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -71,6 +71,7 @@ pub trait Coordinator: 'static + Send + Sync { /// Publish a slash report's signature. async fn publish_slash_report_signature( &mut self, + session: Session, signature: Signature, ) -> Result<(), Self::EphemeralError>; } diff --git a/processor/src/coordinator.rs b/processor/src/coordinator.rs deleted file mode 100644 index 26786e30..00000000 --- a/processor/src/coordinator.rs +++ /dev/null @@ -1,43 +0,0 @@ -use messages::{ProcessorMessage, CoordinatorMessage}; - -use message_queue::{Service, Metadata, client::MessageQueue}; - -#[derive(Clone, PartialEq, Eq, Debug)] -pub struct Message { - pub id: u64, - pub msg: CoordinatorMessage, -} - -#[async_trait::async_trait] -pub trait Coordinator { - async fn send(&mut self, msg: impl Send + Into); - async fn recv(&mut self) -> Message; - async fn ack(&mut self, msg: Message); -} - -#[async_trait::async_trait] -impl Coordinator for MessageQueue { - async fn send(&mut self, msg: impl Send + Into) { - let msg: ProcessorMessage = msg.into(); - let metadata = Metadata { from: self.service, to: Service::Coordinator, intent: msg.intent() }; - let msg = borsh::to_vec(&msg).unwrap(); - - self.queue(metadata, msg).await; - } - - async fn recv(&mut self) -> Message { - let msg = self.next(Service::Coordinator).await; - - let id = msg.id; - - // Deserialize it into a CoordinatorMessage - let msg: CoordinatorMessage = - borsh::from_slice(&msg.msg).expect("message wasn't a borsh-encoded CoordinatorMessage"); - - return Message { id, msg }; - } - - async fn ack(&mut self, msg: Message) { - MessageQueue::ack(self, Service::Coordinator, msg.id).await - } -} diff --git a/processor/src/db.rs b/processor/src/db.rs deleted file mode 100644 index ffd7c43a..00000000 --- a/processor/src/db.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::io::Read; - -use scale::{Encode, Decode}; -use serai_client::validator_sets::primitives::{Session, KeyPair}; - -pub use serai_db::*; - -use crate::networks::{Block, Network}; - -create_db!( - MainDb { - HandledMessageDb: (id: u64) -> (), - PendingActivationsDb: () -> Vec - } -); - -impl PendingActivationsDb { - pub fn pending_activation( - getter: &impl Get, - ) -> Option<(>::Id, Session, KeyPair)> { - if let Some(bytes) = Self::get(getter) { - if !bytes.is_empty() { - let mut slice = bytes.as_slice(); - let (session, key_pair) = <(Session, KeyPair)>::decode(&mut slice).unwrap(); - let mut block_before_queue_block = >::Id::default(); - slice.read_exact(block_before_queue_block.as_mut()).unwrap(); - assert!(slice.is_empty()); - return Some((block_before_queue_block, session, key_pair)); - } - } - None - } - pub fn set_pending_activation( - txn: &mut impl DbTxn, - block_before_queue_block: &>::Id, - session: Session, - key_pair: KeyPair, - ) { - let mut buf = (session, key_pair).encode(); - buf.extend(block_before_queue_block.as_ref()); - Self::set(txn, &buf); - } -} diff --git a/processor/src/main.rs b/processor/src/main.rs index 65e74f55..b4a5053a 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -60,263 +60,9 @@ async fn handle_coordinator_msg( } } -async fn boot( - raw_db: &mut D, - network: &N, - coordinator: &mut Co, -) -> (D, TributaryMutable, SubstrateMutable) { - fn read_key_from_env(label: &'static str) -> Zeroizing { - let key_hex = - Zeroizing::new(env::var(label).unwrap_or_else(|| panic!("{label} wasn't provided"))); - let bytes = Zeroizing::new( - hex::decode(key_hex).unwrap_or_else(|_| panic!("{label} wasn't a valid hex string")), - ); - - let mut repr = ::Repr::default(); - if repr.as_ref().len() != bytes.len() { - panic!("{label} wasn't the correct length"); - } - repr.as_mut().copy_from_slice(bytes.as_slice()); - let res = Zeroizing::new( - Option::from(::from_repr(repr)) - .unwrap_or_else(|| panic!("{label} wasn't a valid scalar")), - ); - repr.as_mut().zeroize(); - res - } - - let key_gen = KeyGen::::new( - raw_db.clone(), - read_key_from_env::<::EmbeddedCurve>("SUBSTRATE_EVRF_KEY"), - read_key_from_env::<::EmbeddedCurve>("NETWORK_EVRF_KEY"), - ); - - let (multisig_manager, current_keys, actively_signing) = - MultisigManager::new(raw_db, network).await; - - let mut batch_signer = None; - let mut signers = HashMap::new(); - - for (i, key) in current_keys.iter().enumerate() { - let Some((session, (substrate_keys, network_keys))) = key_gen.keys(key) else { continue }; - let network_key = network_keys[0].group_key(); - - // If this is the oldest key, load the BatchSigner for it as the active BatchSigner - // The new key only takes responsibility once the old key is fully deprecated - // - // We don't have to load any state for this since the Scanner will re-fire any events - // necessary, only no longer scanning old blocks once Substrate acks them - if i == 0 { - batch_signer = Some(BatchSigner::new(N::NETWORK, session, substrate_keys)); - } - - // The Scanner re-fires events as needed for batch_signer yet not signer - // This is due to the transactions which we start signing from due to a block not being - // guaranteed to be signed before we stop scanning the block on reboot - // We could simplify the Signer flow by delaying when it acks a block, yet that'd: - // 1) Increase the startup time - // 2) Cause re-emission of Batch events, which we'd need to check the safety of - // (TODO: Do anyways?) - // 3) Violate the attempt counter (TODO: Is this already being violated?) - let mut signer = Signer::new(network.clone(), session, network_keys); - - // Sign any TXs being actively signed - for (plan, tx, eventuality) in &actively_signing { - if plan.key == network_key { - let mut txn = raw_db.txn(); - if let Some(msg) = - signer.sign_transaction(&mut txn, plan.id(), tx.clone(), eventuality).await - { - coordinator.send(msg).await; - } - // This should only have re-writes of existing data - drop(txn); - } - } - - signers.insert(session, signer); - } - - // Spawn a task to rebroadcast signed TXs yet to be mined into a finalized block - // This hedges against being dropped due to full mempools, temporarily too low of a fee... - tokio::spawn(Signer::::rebroadcast_task(raw_db.clone(), network.clone())); - - ( - raw_db.clone(), - TributaryMutable { key_gen, batch_signer, cosigner: None, slash_report_signer: None, signers }, - multisig_manager, - ) -} - -#[allow(clippy::await_holding_lock)] // Needed for txn, unfortunately can't be down-scoped -async fn run(mut raw_db: D, network: N, mut coordinator: Co) { - // We currently expect a contextless bidirectional mapping between these two values - // (which is that any value of A can be interpreted as B and vice versa) - // While we can write a contextual mapping, we have yet to do so - // This check ensures no network which doesn't have a bidirectional mapping is defined - assert_eq!(>::Id::default().as_ref().len(), BlockHash([0u8; 32]).0.len()); - - let (main_db, mut tributary_mutable, mut substrate_mutable) = - boot(&mut raw_db, &network, &mut coordinator).await; - - // We can't load this from the DB as we can't guarantee atomic increments with the ack function - // TODO: Load with a slight tolerance - let mut last_coordinator_msg = None; - - loop { - let mut txn = raw_db.txn(); - - log::trace!("new db txn in run"); - - let mut outer_msg = None; - - tokio::select! { - // This blocks the entire processor until it finishes handling this message - // KeyGen specifically may take a notable amount of processing time - // While that shouldn't be an issue in practice, as after processing an attempt it'll handle - // the other messages in the queue, it may be beneficial to parallelize these - // They could potentially be parallelized by type (KeyGen, Sign, Substrate) without issue - msg = coordinator.recv() => { - if let Some(last_coordinator_msg) = last_coordinator_msg { - assert_eq!(msg.id, last_coordinator_msg + 1); - } - last_coordinator_msg = Some(msg.id); - - // Only handle this if we haven't already - if HandledMessageDb::get(&main_db, msg.id).is_none() { - HandledMessageDb::set(&mut txn, msg.id, &()); - - // This is isolated to better think about how its ordered, or rather, about how the other - // cases aren't ordered - // - // While the coordinator messages are ordered, they're not deterministically ordered - // Tributary-caused messages are deterministically ordered, and Substrate-caused messages - // are deterministically-ordered, yet they're both shoved into a singular queue - // The order at which they're shoved in together isn't deterministic - // - // This is safe so long as Tributary and Substrate messages don't both expect mutable - // references over the same data - handle_coordinator_msg( - &mut txn, - &network, - &mut coordinator, - &mut tributary_mutable, - &mut substrate_mutable, - &msg, - ).await; - } - - outer_msg = Some(msg); - }, - - scanner_event = substrate_mutable.next_scanner_event() => { - let msg = substrate_mutable.scanner_event_to_multisig_event( - &mut txn, - &network, - scanner_event - ).await; - - match msg { - MultisigEvent::Batches(retired_key_new_key, batches) => { - // Start signing this batch - for batch in batches { - info!("created batch {} ({} instructions)", batch.id, batch.instructions.len()); - - // The coordinator expects BatchPreprocess to immediately follow Batch - coordinator.send( - messages::substrate::ProcessorMessage::Batch { batch: batch.clone() } - ).await; - - if let Some(batch_signer) = tributary_mutable.batch_signer.as_mut() { - if let Some(msg) = batch_signer.sign(&mut txn, batch) { - coordinator.send(msg).await; - } - } - } - - if let Some((retired_key, new_key)) = retired_key_new_key { - // Safe to mutate since all signing operations are done and no more will be added - if let Some(retired_session) = SessionDb::get(&txn, retired_key.to_bytes().as_ref()) { - tributary_mutable.signers.remove(&retired_session); - } - tributary_mutable.batch_signer.take(); - let keys = tributary_mutable.key_gen.keys(&new_key); - if let Some((session, (substrate_keys, _))) = keys { - tributary_mutable.batch_signer = - Some(BatchSigner::new(N::NETWORK, session, substrate_keys)); - } - } - }, - MultisigEvent::Completed(key, id, tx) => { - if let Some(session) = SessionDb::get(&txn, &key) { - let signer = tributary_mutable.signers.get_mut(&session).unwrap(); - if let Some(msg) = signer.completed(&mut txn, id, &tx) { - coordinator.send(msg).await; - } - } - } - } - }, - } - - txn.commit(); - if let Some(msg) = outer_msg { - coordinator.ack(msg).await; - } - } -} - #[tokio::main] async fn main() { - // Override the panic handler with one which will panic if any tokio task panics - { - let existing = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |panic| { - existing(panic); - const MSG: &str = "exiting the process due to a task panicking"; - println!("{MSG}"); - log::error!("{MSG}"); - std::process::exit(1); - })); - } - - if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string())); - } - env_logger::init(); - - #[allow(unused_variables, unreachable_code)] - let db = { - #[cfg(all(feature = "parity-db", feature = "rocksdb"))] - panic!("built with parity-db and rocksdb"); - #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] - let db = - serai_db::new_parity_db(&serai_env::var("DB_PATH").expect("path to DB wasn't specified")); - #[cfg(feature = "rocksdb")] - let db = - serai_db::new_rocksdb(&serai_env::var("DB_PATH").expect("path to DB wasn't specified")); - db - }; - - // Network configuration - let url = { - let login = env::var("NETWORK_RPC_LOGIN").expect("network RPC login wasn't specified"); - let hostname = env::var("NETWORK_RPC_HOSTNAME").expect("network RPC hostname wasn't specified"); - let port = env::var("NETWORK_RPC_PORT").expect("network port domain wasn't specified"); - "http://".to_string() + &login + "@" + &hostname + ":" + &port - }; - let network_id = match env::var("NETWORK").expect("network wasn't specified").as_str() { - "bitcoin" => NetworkId::Bitcoin, - "ethereum" => NetworkId::Ethereum, - "monero" => NetworkId::Monero, - _ => panic!("unrecognized network"), - }; - - let coordinator = MessageQueue::from_env(Service::Processor(network_id)); - match network_id { - #[cfg(feature = "bitcoin")] - NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await, #[cfg(feature = "ethereum")] NetworkId::Ethereum => { let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME") @@ -327,8 +73,5 @@ async fn main() { let relayer_url = relayer_hostname + ":" + &relayer_port; run(db.clone(), Ethereum::new(db, url, relayer_url).await, coordinator).await } - #[cfg(feature = "monero")] - NetworkId::Monero => run(db, Monero::new(url).await, coordinator).await, - _ => panic!("spawning a processor for an unsupported network"), } }