mirror of
https://github.com/serai-dex/serai.git
synced 2024-12-25 21:19:35 +00:00
Add hooks to the main loop
Lets the Ethereum processor track the first key set as soon as it's set.
This commit is contained in:
parent
a691be21c8
commit
1367e41510
9 changed files with 67 additions and 23 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -8355,6 +8355,7 @@ dependencies = [
|
||||||
"serai-processor-ethereum-primitives",
|
"serai-processor-ethereum-primitives",
|
||||||
"serai-processor-ethereum-router",
|
"serai-processor-ethereum-router",
|
||||||
"serai-processor-key-gen",
|
"serai-processor-key-gen",
|
||||||
|
"serai-processor-messages",
|
||||||
"serai-processor-primitives",
|
"serai-processor-primitives",
|
||||||
"serai-processor-scanner",
|
"serai-processor-scanner",
|
||||||
"serai-processor-scheduler-primitives",
|
"serai-processor-scheduler-primitives",
|
||||||
|
|
|
@ -157,8 +157,18 @@ async fn first_block_after_time<S: ScannerFeed>(feed: &S, serai_time: u64) -> u6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Hooks to run during the main loop.
|
||||||
|
pub trait Hooks {
|
||||||
|
/// A hook to run upon receiving a message.
|
||||||
|
fn on_message(txn: &mut impl DbTxn, msg: &messages::CoordinatorMessage);
|
||||||
|
}
|
||||||
|
impl Hooks for () {
|
||||||
|
fn on_message(_: &mut impl DbTxn, _: &messages::CoordinatorMessage) {}
|
||||||
|
}
|
||||||
|
|
||||||
/// The main loop of a Processor, interacting with the Coordinator.
|
/// The main loop of a Processor, interacting with the Coordinator.
|
||||||
pub async fn main_loop<
|
pub async fn main_loop<
|
||||||
|
H: Hooks,
|
||||||
S: ScannerFeed,
|
S: ScannerFeed,
|
||||||
K: KeyGenParams<ExternalNetworkCiphersuite: Ciphersuite<G = KeyFor<S>>>,
|
K: KeyGenParams<ExternalNetworkCiphersuite: Ciphersuite<G = KeyFor<S>>>,
|
||||||
Sch: Clone
|
Sch: Clone
|
||||||
|
@ -183,6 +193,7 @@ pub async fn main_loop<
|
||||||
let db_clone = db.clone();
|
let db_clone = db.clone();
|
||||||
let mut txn = db.txn();
|
let mut txn = db.txn();
|
||||||
let msg = coordinator.next_message(&mut txn).await;
|
let msg = coordinator.next_message(&mut txn).await;
|
||||||
|
H::on_message(&mut txn, &msg);
|
||||||
let mut txn = Some(txn);
|
let mut txn = Some(txn);
|
||||||
match msg {
|
match msg {
|
||||||
messages::CoordinatorMessage::KeyGen(msg) => {
|
messages::CoordinatorMessage::KeyGen(msg) => {
|
||||||
|
|
|
@ -57,7 +57,7 @@ async fn main() {
|
||||||
tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![]));
|
tokio::spawn(TxIndexTask(feed.clone()).continually_run(index_task, vec![]));
|
||||||
core::mem::forget(index_handle);
|
core::mem::forget(index_handle);
|
||||||
|
|
||||||
bin::main_loop::<_, KeyGenParams, _>(db, feed.clone(), Scheduler::new(Planner), feed).await;
|
bin::main_loop::<(), _, KeyGenParams, _>(db, feed.clone(), Scheduler::new(Planner), feed).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -49,6 +49,7 @@ tokio = { version = "1", default-features = false, features = ["rt-multi-thread"
|
||||||
serai-env = { path = "../../common/env" }
|
serai-env = { path = "../../common/env" }
|
||||||
serai-db = { path = "../../common/db" }
|
serai-db = { path = "../../common/db" }
|
||||||
|
|
||||||
|
messages = { package = "serai-processor-messages", path = "../messages" }
|
||||||
key-gen = { package = "serai-processor-key-gen", path = "../key-gen" }
|
key-gen = { package = "serai-processor-key-gen", path = "../key-gen" }
|
||||||
|
|
||||||
primitives = { package = "serai-processor-primitives", path = "../primitives" }
|
primitives = { package = "serai-processor-primitives", path = "../primitives" }
|
||||||
|
|
|
@ -6,13 +6,6 @@ use std::{sync::Arc, io, collections::HashSet};
|
||||||
|
|
||||||
use group::ff::PrimeField;
|
use group::ff::PrimeField;
|
||||||
|
|
||||||
/*
|
|
||||||
use k256::{
|
|
||||||
elliptic_curve::{group::GroupEncoding, sec1},
|
|
||||||
ProjectivePoint,
|
|
||||||
};
|
|
||||||
*/
|
|
||||||
|
|
||||||
use alloy_core::primitives::{hex::FromHex, Address, U256, Bytes, TxKind};
|
use alloy_core::primitives::{hex::FromHex, Address, U256, Bytes, TxKind};
|
||||||
use alloy_consensus::TxLegacy;
|
use alloy_consensus::TxLegacy;
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,13 @@ use alloy_simple_request_transport::SimpleRequest;
|
||||||
use alloy_rpc_client::ClientBuilder;
|
use alloy_rpc_client::ClientBuilder;
|
||||||
use alloy_provider::{Provider, RootProvider};
|
use alloy_provider::{Provider, RootProvider};
|
||||||
|
|
||||||
|
use serai_client::validator_sets::primitives::Session;
|
||||||
|
|
||||||
use serai_env as env;
|
use serai_env as env;
|
||||||
|
use serai_db::{Get, DbTxn, create_db};
|
||||||
|
|
||||||
|
use ::primitives::EncodableG;
|
||||||
|
use ::key_gen::KeyGenParams as KeyGenParamsTrait;
|
||||||
|
|
||||||
mod primitives;
|
mod primitives;
|
||||||
pub(crate) use crate::primitives::*;
|
pub(crate) use crate::primitives::*;
|
||||||
|
@ -27,6 +33,28 @@ use scheduler::{SmartContract, Scheduler};
|
||||||
mod publisher;
|
mod publisher;
|
||||||
use publisher::TransactionPublisher;
|
use publisher::TransactionPublisher;
|
||||||
|
|
||||||
|
create_db! {
|
||||||
|
EthereumProcessor {
|
||||||
|
// The initial key for Serai on Ethereum
|
||||||
|
InitialSeraiKey: () -> EncodableG<k256::ProjectivePoint>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SetInitialKey;
|
||||||
|
impl bin::Hooks for SetInitialKey {
|
||||||
|
fn on_message(txn: &mut impl DbTxn, msg: &messages::CoordinatorMessage) {
|
||||||
|
if let messages::CoordinatorMessage::Substrate(
|
||||||
|
messages::substrate::CoordinatorMessage::SetKeys { session, key_pair, .. },
|
||||||
|
) = msg
|
||||||
|
{
|
||||||
|
assert_eq!(*session, Session(0));
|
||||||
|
let key = KeyGenParams::decode_key(key_pair.1.as_ref())
|
||||||
|
.expect("invalid Ethereum key confirmed on Substrate");
|
||||||
|
InitialSeraiKey::set(txn, &EncodableG(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let db = bin::init();
|
let db = bin::init();
|
||||||
|
@ -45,11 +73,11 @@ async fn main() {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
bin::main_loop::<_, KeyGenParams, _>(
|
bin::main_loop::<SetInitialKey, _, KeyGenParams, _>(
|
||||||
db,
|
db.clone(),
|
||||||
Rpc { provider: provider.clone() },
|
Rpc { provider: provider.clone() },
|
||||||
Scheduler::new(SmartContract { chain_id }),
|
Scheduler::new(SmartContract { chain_id }),
|
||||||
TransactionPublisher::new(provider, {
|
TransactionPublisher::new(db, provider, {
|
||||||
let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME")
|
let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME")
|
||||||
.expect("ethereum relayer hostname wasn't specified")
|
.expect("ethereum relayer hostname wasn't specified")
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
|
@ -6,7 +6,7 @@ use serai_client::networks::ethereum::Address;
|
||||||
|
|
||||||
use primitives::{ReceivedOutput, EventualityTracker};
|
use primitives::{ReceivedOutput, EventualityTracker};
|
||||||
|
|
||||||
use ethereum_router::Executed;
|
use ethereum_router::{InInstruction as EthereumInInstruction, Executed};
|
||||||
|
|
||||||
use crate::{output::Output, transaction::Eventuality};
|
use crate::{output::Output, transaction::Eventuality};
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ impl primitives::BlockHeader for Epoch {
|
||||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||||
pub(crate) struct FullEpoch {
|
pub(crate) struct FullEpoch {
|
||||||
epoch: Epoch,
|
epoch: Epoch,
|
||||||
outputs: Vec<Output>,
|
instructions: Vec<EthereumInInstruction>,
|
||||||
executed: Vec<Executed>,
|
executed: Vec<Executed>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ impl primitives::Block for FullEpoch {
|
||||||
// Associate all outputs with the latest active key
|
// Associate all outputs with the latest active key
|
||||||
// We don't associate these with the current key within the SC as that'll cause outputs to be
|
// We don't associate these with the current key within the SC as that'll cause outputs to be
|
||||||
// marked for forwarding if the SC is delayed to actually rotate
|
// marked for forwarding if the SC is delayed to actually rotate
|
||||||
todo!("TODO")
|
self.instructions.iter().cloned().map(|instruction| Output { key, instruction }).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
|
|
|
@ -13,22 +13,27 @@ use tokio::{
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use serai_db::Db;
|
||||||
|
|
||||||
use ethereum_schnorr::PublicKey;
|
use ethereum_schnorr::PublicKey;
|
||||||
use ethereum_router::{OutInstructions, Router};
|
use ethereum_router::{OutInstructions, Router};
|
||||||
|
|
||||||
use crate::transaction::{Action, Transaction};
|
use crate::{
|
||||||
|
InitialSeraiKey,
|
||||||
|
transaction::{Action, Transaction},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct TransactionPublisher {
|
pub(crate) struct TransactionPublisher<D: Db> {
|
||||||
initial_serai_key: PublicKey,
|
db: D,
|
||||||
rpc: Arc<RootProvider<SimpleRequest>>,
|
rpc: Arc<RootProvider<SimpleRequest>>,
|
||||||
router: Arc<RwLock<Option<Router>>>,
|
router: Arc<RwLock<Option<Router>>>,
|
||||||
relayer_url: String,
|
relayer_url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionPublisher {
|
impl<D: Db> TransactionPublisher<D> {
|
||||||
pub(crate) fn new(rpc: Arc<RootProvider<SimpleRequest>>, relayer_url: String) -> Self {
|
pub(crate) fn new(db: D, rpc: Arc<RootProvider<SimpleRequest>>, relayer_url: String) -> Self {
|
||||||
Self { initial_serai_key: todo!("TODO"), rpc, router: Arc::new(RwLock::new(None)), relayer_url }
|
Self { db, rpc, router: Arc::new(RwLock::new(None)), relayer_url }
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will always return Ok(Some(_)) or Err(_), never Ok(None)
|
// This will always return Ok(Some(_)) or Err(_), never Ok(None)
|
||||||
|
@ -43,7 +48,12 @@ impl TransactionPublisher {
|
||||||
let mut router = self.router.write().await;
|
let mut router = self.router.write().await;
|
||||||
// Check again if it's None in case a different task already did this
|
// Check again if it's None in case a different task already did this
|
||||||
if router.is_none() {
|
if router.is_none() {
|
||||||
let Some(router_actual) = Router::new(self.rpc.clone(), &self.initial_serai_key).await?
|
let Some(router_actual) = Router::new(
|
||||||
|
self.rpc.clone(),
|
||||||
|
&PublicKey::new(InitialSeraiKey::get(&self.db).unwrap().0)
|
||||||
|
.expect("initial key used by Serai wasn't representable on Ethereum"),
|
||||||
|
)
|
||||||
|
.await?
|
||||||
else {
|
else {
|
||||||
Err(TransportErrorKind::Custom(
|
Err(TransportErrorKind::Custom(
|
||||||
"publishing transaction yet couldn't find router on chain. was our node reset?"
|
"publishing transaction yet couldn't find router on chain. was our node reset?"
|
||||||
|
@ -60,7 +70,7 @@ impl TransactionPublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl signers::TransactionPublisher<Transaction> for TransactionPublisher {
|
impl<D: Db> signers::TransactionPublisher<Transaction> for TransactionPublisher<D> {
|
||||||
type EphemeralError = RpcError<TransportErrorKind>;
|
type EphemeralError = RpcError<TransportErrorKind>;
|
||||||
|
|
||||||
fn publish(
|
fn publish(
|
||||||
|
|
|
@ -33,7 +33,7 @@ async fn main() {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
bin::main_loop::<_, KeyGenParams, _>(
|
bin::main_loop::<(), _, KeyGenParams, _>(
|
||||||
db,
|
db,
|
||||||
feed.clone(),
|
feed.clone(),
|
||||||
Scheduler::new(Planner(feed.clone())),
|
Scheduler::new(Planner(feed.clone())),
|
||||||
|
|
Loading…
Reference in a new issue