From 1367e4151021eecf9f6ce4b059e4acadbc442f8b Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 19 Sep 2024 01:31:52 -0400 Subject: [PATCH] Add hooks to the main loop Lets the Ethereum processor track the first key set as soon as it's set. --- Cargo.lock | 1 + processor/bin/src/lib.rs | 11 +++++++ processor/bitcoin/src/main.rs | 2 +- processor/ethereum/Cargo.toml | 1 + processor/ethereum/router/src/lib.rs | 7 ----- processor/ethereum/src/main.rs | 34 ++++++++++++++++++++-- processor/ethereum/src/primitives/block.rs | 6 ++-- processor/ethereum/src/publisher.rs | 26 ++++++++++++----- processor/monero/src/main.rs | 2 +- 9 files changed, 67 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e2faecb..00cb2ac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8355,6 +8355,7 @@ dependencies = [ "serai-processor-ethereum-primitives", "serai-processor-ethereum-router", "serai-processor-key-gen", + "serai-processor-messages", "serai-processor-primitives", "serai-processor-scanner", "serai-processor-scheduler-primitives", diff --git a/processor/bin/src/lib.rs b/processor/bin/src/lib.rs index 7758b1ea..651514ad 100644 --- a/processor/bin/src/lib.rs +++ b/processor/bin/src/lib.rs @@ -157,8 +157,18 @@ async fn first_block_after_time(feed: &S, serai_time: u64) -> u6 } } +/// Hooks to run during the main loop. +pub trait Hooks { + /// A hook to run upon receiving a message. + fn on_message(txn: &mut impl DbTxn, msg: &messages::CoordinatorMessage); +} +impl Hooks for () { + fn on_message(_: &mut impl DbTxn, _: &messages::CoordinatorMessage) {} +} + /// The main loop of a Processor, interacting with the Coordinator. pub async fn main_loop< + H: Hooks, S: ScannerFeed, K: KeyGenParams>>, Sch: Clone @@ -183,6 +193,7 @@ pub async fn main_loop< let db_clone = db.clone(); let mut txn = db.txn(); let msg = coordinator.next_message(&mut txn).await; + H::on_message(&mut txn, &msg); let mut txn = Some(txn); match msg { messages::CoordinatorMessage::KeyGen(msg) => { diff --git a/processor/bitcoin/src/main.rs b/processor/bitcoin/src/main.rs index f260c47c..5feb3e25 100644 --- a/processor/bitcoin/src/main.rs +++ b/processor/bitcoin/src/main.rs @@ -57,7 +57,7 @@ async fn main() { tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![])); core::mem::forget(index_handle); - bin::main_loop::<_, KeyGenParams, _>(db, feed.clone(), Scheduler::new(Planner), feed).await; + bin::main_loop::<(), _, KeyGenParams, _>(db, feed.clone(), Scheduler::new(Planner), feed).await; } /* diff --git a/processor/ethereum/Cargo.toml b/processor/ethereum/Cargo.toml index 649e3fb8..c2a6f581 100644 --- a/processor/ethereum/Cargo.toml +++ b/processor/ethereum/Cargo.toml @@ -49,6 +49,7 @@ tokio = { version = "1", default-features = false, features = ["rt-multi-thread" serai-env = { path = "../../common/env" } serai-db = { path = "../../common/db" } +messages = { package = "serai-processor-messages", path = "../messages" } key-gen = { package = "serai-processor-key-gen", path = "../key-gen" } primitives = { package = "serai-processor-primitives", path = "../primitives" } diff --git a/processor/ethereum/router/src/lib.rs b/processor/ethereum/router/src/lib.rs index 344e2bee..d56c514f 100644 --- a/processor/ethereum/router/src/lib.rs +++ b/processor/ethereum/router/src/lib.rs @@ -6,13 +6,6 @@ use std::{sync::Arc, io, collections::HashSet}; use group::ff::PrimeField; -/* -use k256::{ - elliptic_curve::{group::GroupEncoding, sec1}, - ProjectivePoint, -}; -*/ - use alloy_core::primitives::{hex::FromHex, Address, U256, Bytes, TxKind}; use alloy_consensus::TxLegacy; diff --git a/processor/ethereum/src/main.rs b/processor/ethereum/src/main.rs index 0ebf0f59..bfb9a8df 100644 --- a/processor/ethereum/src/main.rs +++ b/processor/ethereum/src/main.rs @@ -13,7 +13,13 @@ use alloy_simple_request_transport::SimpleRequest; use alloy_rpc_client::ClientBuilder; use alloy_provider::{Provider, RootProvider}; +use serai_client::validator_sets::primitives::Session; + use serai_env as env; +use serai_db::{Get, DbTxn, create_db}; + +use ::primitives::EncodableG; +use ::key_gen::KeyGenParams as KeyGenParamsTrait; mod primitives; pub(crate) use crate::primitives::*; @@ -27,6 +33,28 @@ use scheduler::{SmartContract, Scheduler}; mod publisher; use publisher::TransactionPublisher; +create_db! { + EthereumProcessor { + // The initial key for Serai on Ethereum + InitialSeraiKey: () -> EncodableG, + } +} + +struct SetInitialKey; +impl bin::Hooks for SetInitialKey { + fn on_message(txn: &mut impl DbTxn, msg: &messages::CoordinatorMessage) { + if let messages::CoordinatorMessage::Substrate( + messages::substrate::CoordinatorMessage::SetKeys { session, key_pair, .. }, + ) = msg + { + assert_eq!(*session, Session(0)); + let key = KeyGenParams::decode_key(key_pair.1.as_ref()) + .expect("invalid Ethereum key confirmed on Substrate"); + InitialSeraiKey::set(txn, &EncodableG(key)); + } + } +} + #[tokio::main] async fn main() { let db = bin::init(); @@ -45,11 +73,11 @@ async fn main() { } }; - bin::main_loop::<_, KeyGenParams, _>( - db, + bin::main_loop::( + db.clone(), Rpc { provider: provider.clone() }, Scheduler::new(SmartContract { chain_id }), - TransactionPublisher::new(provider, { + TransactionPublisher::new(db, provider, { let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME") .expect("ethereum relayer hostname wasn't specified") .to_string(); diff --git a/processor/ethereum/src/primitives/block.rs b/processor/ethereum/src/primitives/block.rs index a6268c0b..cd26b400 100644 --- a/processor/ethereum/src/primitives/block.rs +++ b/processor/ethereum/src/primitives/block.rs @@ -6,7 +6,7 @@ use serai_client::networks::ethereum::Address; use primitives::{ReceivedOutput, EventualityTracker}; -use ethereum_router::Executed; +use ethereum_router::{InInstruction as EthereumInInstruction, Executed}; use crate::{output::Output, transaction::Eventuality}; @@ -43,7 +43,7 @@ impl primitives::BlockHeader for Epoch { #[derive(Clone, PartialEq, Eq, Debug)] pub(crate) struct FullEpoch { epoch: Epoch, - outputs: Vec, + instructions: Vec, executed: Vec, } @@ -72,7 +72,7 @@ impl primitives::Block for FullEpoch { // Associate all outputs with the latest active key // We don't associate these with the current key within the SC as that'll cause outputs to be // marked for forwarding if the SC is delayed to actually rotate - todo!("TODO") + self.instructions.iter().cloned().map(|instruction| Output { key, instruction }).collect() } #[allow(clippy::type_complexity)] diff --git a/processor/ethereum/src/publisher.rs b/processor/ethereum/src/publisher.rs index 03b1d24c..5a7a958a 100644 --- a/processor/ethereum/src/publisher.rs +++ b/processor/ethereum/src/publisher.rs @@ -13,22 +13,27 @@ use tokio::{ net::TcpStream, }; +use serai_db::Db; + use ethereum_schnorr::PublicKey; use ethereum_router::{OutInstructions, Router}; -use crate::transaction::{Action, Transaction}; +use crate::{ + InitialSeraiKey, + transaction::{Action, Transaction}, +}; #[derive(Clone)] -pub(crate) struct TransactionPublisher { - initial_serai_key: PublicKey, +pub(crate) struct TransactionPublisher { + db: D, rpc: Arc>, router: Arc>>, relayer_url: String, } -impl TransactionPublisher { - pub(crate) fn new(rpc: Arc>, relayer_url: String) -> Self { - Self { initial_serai_key: todo!("TODO"), rpc, router: Arc::new(RwLock::new(None)), relayer_url } +impl TransactionPublisher { + pub(crate) fn new(db: D, rpc: Arc>, relayer_url: String) -> Self { + Self { db, rpc, router: Arc::new(RwLock::new(None)), relayer_url } } // This will always return Ok(Some(_)) or Err(_), never Ok(None) @@ -43,7 +48,12 @@ impl TransactionPublisher { let mut router = self.router.write().await; // Check again if it's None in case a different task already did this if router.is_none() { - let Some(router_actual) = Router::new(self.rpc.clone(), &self.initial_serai_key).await? + let Some(router_actual) = Router::new( + self.rpc.clone(), + &PublicKey::new(InitialSeraiKey::get(&self.db).unwrap().0) + .expect("initial key used by Serai wasn't representable on Ethereum"), + ) + .await? else { Err(TransportErrorKind::Custom( "publishing transaction yet couldn't find router on chain. was our node reset?" @@ -60,7 +70,7 @@ impl TransactionPublisher { } } -impl signers::TransactionPublisher for TransactionPublisher { +impl signers::TransactionPublisher for TransactionPublisher { type EphemeralError = RpcError; fn publish( diff --git a/processor/monero/src/main.rs b/processor/monero/src/main.rs index d36118d0..b5c67f12 100644 --- a/processor/monero/src/main.rs +++ b/processor/monero/src/main.rs @@ -33,7 +33,7 @@ async fn main() { }, }; - bin::main_loop::<_, KeyGenParams, _>( + bin::main_loop::<(), _, KeyGenParams, _>( db, feed.clone(), Scheduler::new(Planner(feed.clone())),