From 855e53164ed878636506e65757feff6da28884ae Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 19 Sep 2024 02:41:07 -0400 Subject: [PATCH] Finish Ethereum ScannerFeed --- processor/ethereum/TODO/old_processor.rs | 219 --------------------- processor/ethereum/src/main.rs | 4 +- processor/ethereum/src/primitives/block.rs | 8 +- processor/ethereum/src/primitives/mod.rs | 2 + processor/ethereum/src/publisher.rs | 8 +- processor/ethereum/src/rpc.rs | 114 +++++++++-- processor/ethereum/src/scheduler.rs | 18 +- 7 files changed, 126 insertions(+), 247 deletions(-) diff --git a/processor/ethereum/TODO/old_processor.rs b/processor/ethereum/TODO/old_processor.rs index a8f55c79..2e2daa3e 100644 --- a/processor/ethereum/TODO/old_processor.rs +++ b/processor/ethereum/TODO/old_processor.rs @@ -1,146 +1,5 @@ -/* -#![cfg_attr(docsrs, feature(doc_auto_cfg))] -#![doc = include_str!("../README.md")] -#![deny(missing_docs)] - -use core::{fmt, time::Duration}; -use std::{ - sync::Arc, - collections::{HashSet, HashMap}, - io, -}; - -use async_trait::async_trait; - -use ciphersuite::{group::GroupEncoding, Ciphersuite, Secp256k1}; -use frost::ThresholdKeys; - -use ethereum_serai::{ - alloy::{ - primitives::U256, - rpc_types::{BlockTransactionsKind, BlockNumberOrTag, Transaction}, - simple_request_transport::SimpleRequest, - rpc_client::ClientBuilder, - provider::{Provider, RootProvider}, - }, - crypto::{PublicKey, Signature}, - erc20::Erc20, - deployer::Deployer, - router::{Router, Coin as EthereumCoin, InInstruction as EthereumInInstruction}, - machine::*, -}; -#[cfg(test)] -use ethereum_serai::alloy::primitives::B256; - -use tokio::{ - time::sleep, - sync::{RwLock, RwLockReadGuard}, -}; -#[cfg(not(test))] -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, -}; - -use serai_client::{ - primitives::{Coin, Amount, Balance, NetworkId}, - validator_sets::primitives::Session, -}; - -use crate::{ - Db, Payment, - networks::{ - OutputType, Output, Transaction as TransactionTrait, SignableTransaction, Block, - Eventuality as EventualityTrait, EventualitiesTracker, NetworkError, Network, - }, - key_gen::NetworkKeyDb, - multisigs::scheduler::{ - Scheduler as SchedulerTrait, - smart_contract::{Addendum, Scheduler}, - }, -}; - -#[derive(Clone)] -pub struct Ethereum { - // This DB is solely used to access the first key generated, as needed to determine the Router's - // address. Accordingly, all methods present are consistent to a Serai chain with a finalized - // first key (regardless of local state), and this is safe. - db: D, - #[cfg_attr(test, allow(unused))] - relayer_url: String, - provider: Arc>, - deployer: Deployer, - router: Arc>>, -} -impl Ethereum { - pub async fn new(db: D, daemon_url: String, relayer_url: String) -> Self { - let provider = Arc::new(RootProvider::new( - ClientBuilder::default().transport(SimpleRequest::new(daemon_url), true), - )); - - let mut deployer = Deployer::new(provider.clone()).await; - while !matches!(deployer, Ok(Some(_))) { - log::error!("Deployer wasn't deployed yet or networking error"); - sleep(Duration::from_secs(5)).await; - deployer = Deployer::new(provider.clone()).await; - } - let deployer = deployer.unwrap().unwrap(); - - dbg!(&relayer_url); - dbg!(relayer_url.len()); - Ethereum { db, relayer_url, provider, deployer, router: Arc::new(RwLock::new(None)) } - } - - // Obtain a reference to the Router, sleeping until it's deployed if it hasn't already been. - // This is guaranteed to return Some. - pub async fn router(&self) -> RwLockReadGuard<'_, Option> { - // If we've already instantiated the Router, return a read reference - { - let router = self.router.read().await; - if router.is_some() { - return router; - } - } - - // Instantiate it - let mut router = self.router.write().await; - // If another attempt beat us to it, return - if router.is_some() { - drop(router); - return self.router.read().await; - } - - // Get the first key from the DB - let first_key = - NetworkKeyDb::get(&self.db, Session(0)).expect("getting outputs before confirming a key"); - let key = Secp256k1::read_G(&mut first_key.as_slice()).unwrap(); - let public_key = PublicKey::new(key).unwrap(); - - // Find the router - let mut found = self.deployer.find_router(self.provider.clone(), &public_key).await; - while !matches!(found, Ok(Some(_))) { - log::error!("Router wasn't deployed yet or networking error"); - sleep(Duration::from_secs(5)).await; - found = self.deployer.find_router(self.provider.clone(), &public_key).await; - } - - // Set it - *router = Some(found.unwrap().unwrap()); - - // Downgrade to a read lock - // Explicitly doesn't use `downgrade` so that another pending write txn can realize it's no - // longer necessary - drop(router); - self.router.read().await - } -} - #[async_trait] impl Network for Ethereum { - const DUST: u64 = 0; // TODO - - const COST_TO_AGGREGATE: u64 = 0; - async fn get_outputs( &self, block: &Self::Block, @@ -220,66 +79,6 @@ impl Network for Ethereum { all_events } - async fn get_eventuality_completions( - &self, - eventualities: &mut EventualitiesTracker, - block: &Self::Block, - ) -> HashMap< - [u8; 32], - ( - usize, - >::Id, - ::Completion, - ), - > { - let mut res = HashMap::new(); - if eventualities.map.is_empty() { - return res; - } - - let router = self.router().await; - let router = router.as_ref().unwrap(); - - let past_scanned_epoch = loop { - match self.get_block(eventualities.block_number).await { - Ok(block) => break block, - Err(e) => log::error!("couldn't get the last scanned block in the tracker: {}", e), - } - sleep(Duration::from_secs(10)).await; - }; - assert_eq!( - past_scanned_epoch.start / 32, - u64::try_from(eventualities.block_number).unwrap(), - "assumption of tracker block number's relation to epoch start is incorrect" - ); - - // Iterate from after the epoch number in the tracker to the end of this epoch - for block_num in (past_scanned_epoch.end() + 1) ..= block.end() { - let executed = loop { - match router.executed_commands(block_num).await { - Ok(executed) => break executed, - Err(e) => log::error!("couldn't get the executed commands in block {block_num}: {e}"), - } - sleep(Duration::from_secs(10)).await; - }; - - for executed in executed { - let lookup = executed.nonce.to_le_bytes().to_vec(); - if let Some((plan_id, eventuality)) = eventualities.map.get(&lookup) { - if let Some(command) = - SignedRouterCommand::new(&eventuality.0, eventuality.1.clone(), &executed.signature) - { - res.insert(*plan_id, (block_num.try_into().unwrap(), executed.tx_id, command)); - eventualities.map.remove(&lookup); - } - } - } - } - eventualities.block_number = (block.start / 32).try_into().unwrap(); - - res - } - async fn publish_completion( &self, completion: &::Completion, @@ -333,14 +132,6 @@ impl Network for Ethereum { } } - async fn confirm_completion( - &self, - eventuality: &Self::Eventuality, - claim: &::Claim, - ) -> Result::Completion>, NetworkError> { - Ok(SignedRouterCommand::new(&eventuality.0, eventuality.1.clone(), &claim.signature)) - } - #[cfg(test)] async fn get_block_number(&self, id: &>::Id) -> usize { self @@ -355,15 +146,6 @@ impl Network for Ethereum { .unwrap() } - #[cfg(test)] - async fn check_eventuality_by_claim( - &self, - eventuality: &Self::Eventuality, - claim: &::Claim, - ) -> bool { - SignedRouterCommand::new(&eventuality.0, eventuality.1.clone(), &claim.signature).is_some() - } - #[cfg(test)] async fn get_transaction_by_eventuality( &self, @@ -474,4 +256,3 @@ impl Network for Ethereum { self.get_block(self.get_latest_block_number().await.unwrap()).await.unwrap() } } -*/ diff --git a/processor/ethereum/src/main.rs b/processor/ethereum/src/main.rs index bfb9a8df..7acdffdb 100644 --- a/processor/ethereum/src/main.rs +++ b/processor/ethereum/src/main.rs @@ -75,8 +75,8 @@ async fn main() { bin::main_loop::( db.clone(), - Rpc { provider: provider.clone() }, - Scheduler::new(SmartContract { chain_id }), + Rpc { db: db.clone(), provider: provider.clone() }, + Scheduler::::new(SmartContract { chain_id }), TransactionPublisher::new(db, provider, { let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME") .expect("ethereum relayer hostname wasn't specified") diff --git a/processor/ethereum/src/primitives/block.rs b/processor/ethereum/src/primitives/block.rs index cd26b400..d5f0cb99 100644 --- a/processor/ethereum/src/primitives/block.rs +++ b/processor/ethereum/src/primitives/block.rs @@ -20,8 +20,6 @@ pub(crate) struct Epoch { pub(crate) start: u64, // The hash of the last block within this Epoch. pub(crate) end_hash: [u8; 32], - // The monotonic time for this Epoch. - pub(crate) time: u64, } impl Epoch { @@ -42,9 +40,9 @@ impl primitives::BlockHeader for Epoch { #[derive(Clone, PartialEq, Eq, Debug)] pub(crate) struct FullEpoch { - epoch: Epoch, - instructions: Vec, - executed: Vec, + pub(crate) epoch: Epoch, + pub(crate) instructions: Vec, + pub(crate) executed: Vec, } impl primitives::Block for FullEpoch { diff --git a/processor/ethereum/src/primitives/mod.rs b/processor/ethereum/src/primitives/mod.rs index f0d31802..00a5980f 100644 --- a/processor/ethereum/src/primitives/mod.rs +++ b/processor/ethereum/src/primitives/mod.rs @@ -8,3 +8,5 @@ pub(crate) const DAI: [u8; 20] = Ok(res) => res, Err(_) => panic!("invalid non-test DAI hex address"), }; + +pub(crate) const TOKENS: [[u8; 20]; 1] = [DAI]; diff --git a/processor/ethereum/src/publisher.rs b/processor/ethereum/src/publisher.rs index 5a7a958a..4a62bad7 100644 --- a/processor/ethereum/src/publisher.rs +++ b/processor/ethereum/src/publisher.rs @@ -50,8 +50,12 @@ impl TransactionPublisher { if router.is_none() { let Some(router_actual) = Router::new( self.rpc.clone(), - &PublicKey::new(InitialSeraiKey::get(&self.db).unwrap().0) - .expect("initial key used by Serai wasn't representable on Ethereum"), + &PublicKey::new( + InitialSeraiKey::get(&self.db) + .expect("publishing a transaction yet never confirmed a key") + .0, + ) + .expect("initial key used by Serai wasn't representable on Ethereum"), ) .await? else { diff --git a/processor/ethereum/src/rpc.rs b/processor/ethereum/src/rpc.rs index e3f25f86..a53e6b33 100644 --- a/processor/ethereum/src/rpc.rs +++ b/processor/ethereum/src/rpc.rs @@ -1,6 +1,7 @@ use core::future::Future; -use std::sync::Arc; +use std::{sync::Arc, collections::HashSet}; +use alloy_core::primitives::B256; use alloy_rpc_types_eth::{BlockTransactionsKind, BlockNumberOrTag}; use alloy_transport::{RpcError, TransportErrorKind}; use alloy_simple_request_transport::SimpleRequest; @@ -8,16 +9,26 @@ use alloy_provider::{Provider, RootProvider}; use serai_client::primitives::{NetworkId, Coin, Amount}; +use serai_db::Db; + use scanner::ScannerFeed; -use crate::block::{Epoch, FullEpoch}; +use ethereum_schnorr::PublicKey; +use ethereum_erc20::{TopLevelTransfer, Erc20}; +use ethereum_router::{Coin as EthereumCoin, InInstruction as EthereumInInstruction, Router}; + +use crate::{ + TOKENS, InitialSeraiKey, + block::{Epoch, FullEpoch}, +}; #[derive(Clone)] -pub(crate) struct Rpc { +pub(crate) struct Rpc { + pub(crate) db: D, pub(crate) provider: Arc>, } -impl ScannerFeed for Rpc { +impl ScannerFeed for Rpc { const NETWORK: NetworkId = NetworkId::Ethereum; // We only need one confirmation as Ethereum properly finalizes @@ -62,7 +73,22 @@ impl ScannerFeed for Rpc { &self, number: u64, ) -> impl Send + Future> { - async move { todo!("TODO") } + async move { + let header = self + .provider + .get_block(BlockNumberOrTag::Number(number).into(), BlockTransactionsKind::Hashes) + .await? + .ok_or_else(|| { + TransportErrorKind::Custom( + "asked for time of a block our node doesn't have".to_string().into(), + ) + })? + .header; + // This is monotonic ever since the merge + // https://github.com/ethereum/consensus-specs/blob/4afe39822c9ad9747e0f5635cca117c18441ec1b + // /specs/bellatrix/beacon-chain.md?plain=1#L393-L394 + Ok(header.timestamp) + } } fn unchecked_block_header_by_number( @@ -104,25 +130,91 @@ impl ScannerFeed for Rpc { .header; let end_hash = end_header.hash.into(); - let time = end_header.timestamp; - Ok(Epoch { prior_end_hash, start, end_hash, time }) + Ok(Epoch { prior_end_hash, start, end_hash }) } } - #[rustfmt::skip] // It wants to improperly format the `async move` to a single line fn unchecked_block_by_number( &self, number: u64, ) -> impl Send + Future> { async move { - todo!("TODO") + let epoch = self.unchecked_block_header_by_number(number).await?; + let mut instructions = vec![]; + let mut executed = vec![]; + + let Some(router) = Router::new( + self.provider.clone(), + &PublicKey::new( + InitialSeraiKey::get(&self.db).expect("fetching a block yet never confirmed a key").0, + ) + .expect("initial key used by Serai wasn't representable on Ethereum"), + ) + .await? + else { + // The Router wasn't deployed yet so we cannot have any on-chain interactions + // If the Router has been deployed by the block we've synced to, it won't have any events + // for these blocks anways, so this doesn't risk a consensus split + // TODO: This does as we can have top-level transfers to the router before it's deployed + return Ok(FullEpoch { epoch, instructions, executed }); + }; + + let mut to_check = epoch.end_hash; + while to_check != epoch.prior_end_hash { + let to_check_block = self + .provider + .get_block(B256::from(to_check).into(), BlockTransactionsKind::Hashes) + .await? + .ok_or_else(|| { + TransportErrorKind::Custom( + format!( + "ethereum node didn't have requested block: {}. was the node reset?", + hex::encode(to_check) + ) + .into(), + ) + })? + .header; + + instructions.append( + &mut router.in_instructions(to_check_block.number, &HashSet::from(TOKENS)).await?, + ); + for token in TOKENS { + for TopLevelTransfer { id, from, amount, data } in + Erc20::new(self.provider.clone(), token) + .top_level_transfers(to_check_block.number, router.address()) + .await? + { + instructions.push(EthereumInInstruction { + id: (id, u64::MAX), + from, + coin: EthereumCoin::Erc20(token), + amount, + data, + }); + } + } + + executed.append(&mut router.executed(to_check_block.number).await?); + + to_check = *to_check_block.parent_hash; + } + + Ok(FullEpoch { epoch, instructions, executed }) } } fn dust(coin: Coin) -> Amount { assert_eq!(coin.network(), NetworkId::Ethereum); - todo!("TODO") + #[allow(clippy::inconsistent_digit_grouping)] + match coin { + // 5 USD if Ether is ~3300 USD + Coin::Ether => Amount(1_500_00), + // 5 DAI + Coin::Dai => Amount(5_000_000_00), + _ => unreachable!(), + } } fn cost_to_aggregate( @@ -132,7 +224,7 @@ impl ScannerFeed for Rpc { ) -> impl Send + Future> { async move { assert_eq!(coin.network(), NetworkId::Ethereum); - // TODO + // There is no cost to aggregate as we receive to an account Ok(Amount(0)) } } diff --git a/processor/ethereum/src/scheduler.rs b/processor/ethereum/src/scheduler.rs index 6683eeac..39f3fed3 100644 --- a/processor/ethereum/src/scheduler.rs +++ b/processor/ethereum/src/scheduler.rs @@ -2,6 +2,8 @@ use alloy_core::primitives::U256; use serai_client::primitives::{NetworkId, Coin, Balance}; +use serai_db::Db; + use primitives::Payment; use scanner::{KeyFor, AddressFor, EventualityFor}; @@ -32,15 +34,15 @@ fn balance_to_ethereum_amount(balance: Balance) -> U256 { pub(crate) struct SmartContract { pub(crate) chain_id: U256, } -impl smart_contract_scheduler::SmartContract for SmartContract { +impl smart_contract_scheduler::SmartContract> for SmartContract { type SignableTransaction = Action; fn rotate( &self, nonce: u64, - retiring_key: KeyFor, - new_key: KeyFor, - ) -> (Self::SignableTransaction, EventualityFor) { + retiring_key: KeyFor>, + new_key: KeyFor>, + ) -> (Self::SignableTransaction, EventualityFor>) { let action = Action::SetKey { chain_id: self.chain_id, nonce, @@ -52,9 +54,9 @@ impl smart_contract_scheduler::SmartContract for SmartContract { fn fulfill( &self, nonce: u64, - key: KeyFor, - payments: Vec>>, - ) -> Vec<(Self::SignableTransaction, EventualityFor)> { + key: KeyFor>, + payments: Vec>>>, + ) -> Vec<(Self::SignableTransaction, EventualityFor>)> { let mut outs = Vec::with_capacity(payments.len()); for payment in payments { outs.push(( @@ -75,4 +77,4 @@ impl smart_contract_scheduler::SmartContract for SmartContract { } } -pub(crate) type Scheduler = smart_contract_scheduler::Scheduler; +pub(crate) type Scheduler = smart_contract_scheduler::Scheduler, SmartContract>;