Finish Ethereum ScannerFeed

This commit is contained in:
Luke Parker 2024-09-19 02:41:07 -04:00
parent 1367e41510
commit 855e53164e
7 changed files with 126 additions and 247 deletions

View file

@ -1,146 +1,5 @@
/*
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use core::{fmt, time::Duration};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
io,
};
use async_trait::async_trait;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Secp256k1};
use frost::ThresholdKeys;
use ethereum_serai::{
alloy::{
primitives::U256,
rpc_types::{BlockTransactionsKind, BlockNumberOrTag, Transaction},
simple_request_transport::SimpleRequest,
rpc_client::ClientBuilder,
provider::{Provider, RootProvider},
},
crypto::{PublicKey, Signature},
erc20::Erc20,
deployer::Deployer,
router::{Router, Coin as EthereumCoin, InInstruction as EthereumInInstruction},
machine::*,
};
#[cfg(test)]
use ethereum_serai::alloy::primitives::B256;
use tokio::{
time::sleep,
sync::{RwLock, RwLockReadGuard},
};
#[cfg(not(test))]
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use serai_client::{
primitives::{Coin, Amount, Balance, NetworkId},
validator_sets::primitives::Session,
};
use crate::{
Db, Payment,
networks::{
OutputType, Output, Transaction as TransactionTrait, SignableTransaction, Block,
Eventuality as EventualityTrait, EventualitiesTracker, NetworkError, Network,
},
key_gen::NetworkKeyDb,
multisigs::scheduler::{
Scheduler as SchedulerTrait,
smart_contract::{Addendum, Scheduler},
},
};
#[derive(Clone)]
pub struct Ethereum<D: Db> {
// This DB is solely used to access the first key generated, as needed to determine the Router's
// address. Accordingly, all methods present are consistent to a Serai chain with a finalized
// first key (regardless of local state), and this is safe.
db: D,
#[cfg_attr(test, allow(unused))]
relayer_url: String,
provider: Arc<RootProvider<SimpleRequest>>,
deployer: Deployer,
router: Arc<RwLock<Option<Router>>>,
}
impl<D: Db> Ethereum<D> {
pub async fn new(db: D, daemon_url: String, relayer_url: String) -> Self {
let provider = Arc::new(RootProvider::new(
ClientBuilder::default().transport(SimpleRequest::new(daemon_url), true),
));
let mut deployer = Deployer::new(provider.clone()).await;
while !matches!(deployer, Ok(Some(_))) {
log::error!("Deployer wasn't deployed yet or networking error");
sleep(Duration::from_secs(5)).await;
deployer = Deployer::new(provider.clone()).await;
}
let deployer = deployer.unwrap().unwrap();
dbg!(&relayer_url);
dbg!(relayer_url.len());
Ethereum { db, relayer_url, provider, deployer, router: Arc::new(RwLock::new(None)) }
}
// Obtain a reference to the Router, sleeping until it's deployed if it hasn't already been.
// This is guaranteed to return Some.
pub async fn router(&self) -> RwLockReadGuard<'_, Option<Router>> {
// If we've already instantiated the Router, return a read reference
{
let router = self.router.read().await;
if router.is_some() {
return router;
}
}
// Instantiate it
let mut router = self.router.write().await;
// If another attempt beat us to it, return
if router.is_some() {
drop(router);
return self.router.read().await;
}
// Get the first key from the DB
let first_key =
NetworkKeyDb::get(&self.db, Session(0)).expect("getting outputs before confirming a key");
let key = Secp256k1::read_G(&mut first_key.as_slice()).unwrap();
let public_key = PublicKey::new(key).unwrap();
// Find the router
let mut found = self.deployer.find_router(self.provider.clone(), &public_key).await;
while !matches!(found, Ok(Some(_))) {
log::error!("Router wasn't deployed yet or networking error");
sleep(Duration::from_secs(5)).await;
found = self.deployer.find_router(self.provider.clone(), &public_key).await;
}
// Set it
*router = Some(found.unwrap().unwrap());
// Downgrade to a read lock
// Explicitly doesn't use `downgrade` so that another pending write txn can realize it's no
// longer necessary
drop(router);
self.router.read().await
}
}
#[async_trait] #[async_trait]
impl<D: Db> Network for Ethereum<D> { impl<D: Db> Network for Ethereum<D> {
const DUST: u64 = 0; // TODO
const COST_TO_AGGREGATE: u64 = 0;
async fn get_outputs( async fn get_outputs(
&self, &self,
block: &Self::Block, block: &Self::Block,
@ -220,66 +79,6 @@ impl<D: Db> Network for Ethereum<D> {
all_events all_events
} }
async fn get_eventuality_completions(
&self,
eventualities: &mut EventualitiesTracker<Self::Eventuality>,
block: &Self::Block,
) -> HashMap<
[u8; 32],
(
usize,
<Self::Transaction as TransactionTrait<Self>>::Id,
<Self::Eventuality as EventualityTrait>::Completion,
),
> {
let mut res = HashMap::new();
if eventualities.map.is_empty() {
return res;
}
let router = self.router().await;
let router = router.as_ref().unwrap();
let past_scanned_epoch = loop {
match self.get_block(eventualities.block_number).await {
Ok(block) => break block,
Err(e) => log::error!("couldn't get the last scanned block in the tracker: {}", e),
}
sleep(Duration::from_secs(10)).await;
};
assert_eq!(
past_scanned_epoch.start / 32,
u64::try_from(eventualities.block_number).unwrap(),
"assumption of tracker block number's relation to epoch start is incorrect"
);
// Iterate from after the epoch number in the tracker to the end of this epoch
for block_num in (past_scanned_epoch.end() + 1) ..= block.end() {
let executed = loop {
match router.executed_commands(block_num).await {
Ok(executed) => break executed,
Err(e) => log::error!("couldn't get the executed commands in block {block_num}: {e}"),
}
sleep(Duration::from_secs(10)).await;
};
for executed in executed {
let lookup = executed.nonce.to_le_bytes().to_vec();
if let Some((plan_id, eventuality)) = eventualities.map.get(&lookup) {
if let Some(command) =
SignedRouterCommand::new(&eventuality.0, eventuality.1.clone(), &executed.signature)
{
res.insert(*plan_id, (block_num.try_into().unwrap(), executed.tx_id, command));
eventualities.map.remove(&lookup);
}
}
}
}
eventualities.block_number = (block.start / 32).try_into().unwrap();
res
}
async fn publish_completion( async fn publish_completion(
&self, &self,
completion: &<Self::Eventuality as EventualityTrait>::Completion, completion: &<Self::Eventuality as EventualityTrait>::Completion,
@ -333,14 +132,6 @@ impl<D: Db> Network for Ethereum<D> {
} }
} }
async fn confirm_completion(
&self,
eventuality: &Self::Eventuality,
claim: &<Self::Eventuality as EventualityTrait>::Claim,
) -> Result<Option<<Self::Eventuality as EventualityTrait>::Completion>, NetworkError> {
Ok(SignedRouterCommand::new(&eventuality.0, eventuality.1.clone(), &claim.signature))
}
#[cfg(test)] #[cfg(test)]
async fn get_block_number(&self, id: &<Self::Block as Block<Self>>::Id) -> usize { async fn get_block_number(&self, id: &<Self::Block as Block<Self>>::Id) -> usize {
self self
@ -355,15 +146,6 @@ impl<D: Db> Network for Ethereum<D> {
.unwrap() .unwrap()
} }
#[cfg(test)]
async fn check_eventuality_by_claim(
&self,
eventuality: &Self::Eventuality,
claim: &<Self::Eventuality as EventualityTrait>::Claim,
) -> bool {
SignedRouterCommand::new(&eventuality.0, eventuality.1.clone(), &claim.signature).is_some()
}
#[cfg(test)] #[cfg(test)]
async fn get_transaction_by_eventuality( async fn get_transaction_by_eventuality(
&self, &self,
@ -474,4 +256,3 @@ impl<D: Db> Network for Ethereum<D> {
self.get_block(self.get_latest_block_number().await.unwrap()).await.unwrap() self.get_block(self.get_latest_block_number().await.unwrap()).await.unwrap()
} }
} }
*/

View file

@ -75,8 +75,8 @@ async fn main() {
bin::main_loop::<SetInitialKey, _, KeyGenParams, _>( bin::main_loop::<SetInitialKey, _, KeyGenParams, _>(
db.clone(), db.clone(),
Rpc { provider: provider.clone() }, Rpc { db: db.clone(), provider: provider.clone() },
Scheduler::new(SmartContract { chain_id }), Scheduler::<bin::Db>::new(SmartContract { chain_id }),
TransactionPublisher::new(db, provider, { TransactionPublisher::new(db, provider, {
let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME") let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME")
.expect("ethereum relayer hostname wasn't specified") .expect("ethereum relayer hostname wasn't specified")

View file

@ -20,8 +20,6 @@ pub(crate) struct Epoch {
pub(crate) start: u64, pub(crate) start: u64,
// The hash of the last block within this Epoch. // The hash of the last block within this Epoch.
pub(crate) end_hash: [u8; 32], pub(crate) end_hash: [u8; 32],
// The monotonic time for this Epoch.
pub(crate) time: u64,
} }
impl Epoch { impl Epoch {
@ -42,9 +40,9 @@ impl primitives::BlockHeader for Epoch {
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct FullEpoch { pub(crate) struct FullEpoch {
epoch: Epoch, pub(crate) epoch: Epoch,
instructions: Vec<EthereumInInstruction>, pub(crate) instructions: Vec<EthereumInInstruction>,
executed: Vec<Executed>, pub(crate) executed: Vec<Executed>,
} }
impl primitives::Block for FullEpoch { impl primitives::Block for FullEpoch {

View file

@ -8,3 +8,5 @@ pub(crate) const DAI: [u8; 20] =
Ok(res) => res, Ok(res) => res,
Err(_) => panic!("invalid non-test DAI hex address"), Err(_) => panic!("invalid non-test DAI hex address"),
}; };
pub(crate) const TOKENS: [[u8; 20]; 1] = [DAI];

View file

@ -50,7 +50,11 @@ impl<D: Db> TransactionPublisher<D> {
if router.is_none() { if router.is_none() {
let Some(router_actual) = Router::new( let Some(router_actual) = Router::new(
self.rpc.clone(), self.rpc.clone(),
&PublicKey::new(InitialSeraiKey::get(&self.db).unwrap().0) &PublicKey::new(
InitialSeraiKey::get(&self.db)
.expect("publishing a transaction yet never confirmed a key")
.0,
)
.expect("initial key used by Serai wasn't representable on Ethereum"), .expect("initial key used by Serai wasn't representable on Ethereum"),
) )
.await? .await?

View file

@ -1,6 +1,7 @@
use core::future::Future; use core::future::Future;
use std::sync::Arc; use std::{sync::Arc, collections::HashSet};
use alloy_core::primitives::B256;
use alloy_rpc_types_eth::{BlockTransactionsKind, BlockNumberOrTag}; use alloy_rpc_types_eth::{BlockTransactionsKind, BlockNumberOrTag};
use alloy_transport::{RpcError, TransportErrorKind}; use alloy_transport::{RpcError, TransportErrorKind};
use alloy_simple_request_transport::SimpleRequest; use alloy_simple_request_transport::SimpleRequest;
@ -8,16 +9,26 @@ use alloy_provider::{Provider, RootProvider};
use serai_client::primitives::{NetworkId, Coin, Amount}; use serai_client::primitives::{NetworkId, Coin, Amount};
use serai_db::Db;
use scanner::ScannerFeed; use scanner::ScannerFeed;
use crate::block::{Epoch, FullEpoch}; use ethereum_schnorr::PublicKey;
use ethereum_erc20::{TopLevelTransfer, Erc20};
use ethereum_router::{Coin as EthereumCoin, InInstruction as EthereumInInstruction, Router};
use crate::{
TOKENS, InitialSeraiKey,
block::{Epoch, FullEpoch},
};
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Rpc { pub(crate) struct Rpc<D: Db> {
pub(crate) db: D,
pub(crate) provider: Arc<RootProvider<SimpleRequest>>, pub(crate) provider: Arc<RootProvider<SimpleRequest>>,
} }
impl ScannerFeed for Rpc { impl<D: Db> ScannerFeed for Rpc<D> {
const NETWORK: NetworkId = NetworkId::Ethereum; const NETWORK: NetworkId = NetworkId::Ethereum;
// We only need one confirmation as Ethereum properly finalizes // We only need one confirmation as Ethereum properly finalizes
@ -62,7 +73,22 @@ impl ScannerFeed for Rpc {
&self, &self,
number: u64, number: u64,
) -> impl Send + Future<Output = Result<u64, Self::EphemeralError>> { ) -> impl Send + Future<Output = Result<u64, Self::EphemeralError>> {
async move { todo!("TODO") } async move {
let header = self
.provider
.get_block(BlockNumberOrTag::Number(number).into(), BlockTransactionsKind::Hashes)
.await?
.ok_or_else(|| {
TransportErrorKind::Custom(
"asked for time of a block our node doesn't have".to_string().into(),
)
})?
.header;
// This is monotonic ever since the merge
// https://github.com/ethereum/consensus-specs/blob/4afe39822c9ad9747e0f5635cca117c18441ec1b
// /specs/bellatrix/beacon-chain.md?plain=1#L393-L394
Ok(header.timestamp)
}
} }
fn unchecked_block_header_by_number( fn unchecked_block_header_by_number(
@ -104,25 +130,91 @@ impl ScannerFeed for Rpc {
.header; .header;
let end_hash = end_header.hash.into(); let end_hash = end_header.hash.into();
let time = end_header.timestamp;
Ok(Epoch { prior_end_hash, start, end_hash, time }) Ok(Epoch { prior_end_hash, start, end_hash })
} }
} }
#[rustfmt::skip] // It wants to improperly format the `async move` to a single line
fn unchecked_block_by_number( fn unchecked_block_by_number(
&self, &self,
number: u64, number: u64,
) -> impl Send + Future<Output = Result<Self::Block, Self::EphemeralError>> { ) -> impl Send + Future<Output = Result<Self::Block, Self::EphemeralError>> {
async move { async move {
todo!("TODO") let epoch = self.unchecked_block_header_by_number(number).await?;
let mut instructions = vec![];
let mut executed = vec![];
let Some(router) = Router::new(
self.provider.clone(),
&PublicKey::new(
InitialSeraiKey::get(&self.db).expect("fetching a block yet never confirmed a key").0,
)
.expect("initial key used by Serai wasn't representable on Ethereum"),
)
.await?
else {
// The Router wasn't deployed yet so we cannot have any on-chain interactions
// If the Router has been deployed by the block we've synced to, it won't have any events
// for these blocks anways, so this doesn't risk a consensus split
// TODO: This does as we can have top-level transfers to the router before it's deployed
return Ok(FullEpoch { epoch, instructions, executed });
};
let mut to_check = epoch.end_hash;
while to_check != epoch.prior_end_hash {
let to_check_block = self
.provider
.get_block(B256::from(to_check).into(), BlockTransactionsKind::Hashes)
.await?
.ok_or_else(|| {
TransportErrorKind::Custom(
format!(
"ethereum node didn't have requested block: {}. was the node reset?",
hex::encode(to_check)
)
.into(),
)
})?
.header;
instructions.append(
&mut router.in_instructions(to_check_block.number, &HashSet::from(TOKENS)).await?,
);
for token in TOKENS {
for TopLevelTransfer { id, from, amount, data } in
Erc20::new(self.provider.clone(), token)
.top_level_transfers(to_check_block.number, router.address())
.await?
{
instructions.push(EthereumInInstruction {
id: (id, u64::MAX),
from,
coin: EthereumCoin::Erc20(token),
amount,
data,
});
}
}
executed.append(&mut router.executed(to_check_block.number).await?);
to_check = *to_check_block.parent_hash;
}
Ok(FullEpoch { epoch, instructions, executed })
} }
} }
fn dust(coin: Coin) -> Amount { fn dust(coin: Coin) -> Amount {
assert_eq!(coin.network(), NetworkId::Ethereum); assert_eq!(coin.network(), NetworkId::Ethereum);
todo!("TODO") #[allow(clippy::inconsistent_digit_grouping)]
match coin {
// 5 USD if Ether is ~3300 USD
Coin::Ether => Amount(1_500_00),
// 5 DAI
Coin::Dai => Amount(5_000_000_00),
_ => unreachable!(),
}
} }
fn cost_to_aggregate( fn cost_to_aggregate(
@ -132,7 +224,7 @@ impl ScannerFeed for Rpc {
) -> impl Send + Future<Output = Result<Amount, Self::EphemeralError>> { ) -> impl Send + Future<Output = Result<Amount, Self::EphemeralError>> {
async move { async move {
assert_eq!(coin.network(), NetworkId::Ethereum); assert_eq!(coin.network(), NetworkId::Ethereum);
// TODO // There is no cost to aggregate as we receive to an account
Ok(Amount(0)) Ok(Amount(0))
} }
} }

View file

@ -2,6 +2,8 @@ use alloy_core::primitives::U256;
use serai_client::primitives::{NetworkId, Coin, Balance}; use serai_client::primitives::{NetworkId, Coin, Balance};
use serai_db::Db;
use primitives::Payment; use primitives::Payment;
use scanner::{KeyFor, AddressFor, EventualityFor}; use scanner::{KeyFor, AddressFor, EventualityFor};
@ -32,15 +34,15 @@ fn balance_to_ethereum_amount(balance: Balance) -> U256 {
pub(crate) struct SmartContract { pub(crate) struct SmartContract {
pub(crate) chain_id: U256, pub(crate) chain_id: U256,
} }
impl smart_contract_scheduler::SmartContract<Rpc> for SmartContract { impl<D: Db> smart_contract_scheduler::SmartContract<Rpc<D>> for SmartContract {
type SignableTransaction = Action; type SignableTransaction = Action;
fn rotate( fn rotate(
&self, &self,
nonce: u64, nonce: u64,
retiring_key: KeyFor<Rpc>, retiring_key: KeyFor<Rpc<D>>,
new_key: KeyFor<Rpc>, new_key: KeyFor<Rpc<D>>,
) -> (Self::SignableTransaction, EventualityFor<Rpc>) { ) -> (Self::SignableTransaction, EventualityFor<Rpc<D>>) {
let action = Action::SetKey { let action = Action::SetKey {
chain_id: self.chain_id, chain_id: self.chain_id,
nonce, nonce,
@ -52,9 +54,9 @@ impl smart_contract_scheduler::SmartContract<Rpc> for SmartContract {
fn fulfill( fn fulfill(
&self, &self,
nonce: u64, nonce: u64,
key: KeyFor<Rpc>, key: KeyFor<Rpc<D>>,
payments: Vec<Payment<AddressFor<Rpc>>>, payments: Vec<Payment<AddressFor<Rpc<D>>>>,
) -> Vec<(Self::SignableTransaction, EventualityFor<Rpc>)> { ) -> Vec<(Self::SignableTransaction, EventualityFor<Rpc<D>>)> {
let mut outs = Vec::with_capacity(payments.len()); let mut outs = Vec::with_capacity(payments.len());
for payment in payments { for payment in payments {
outs.push(( outs.push((
@ -75,4 +77,4 @@ impl smart_contract_scheduler::SmartContract<Rpc> for SmartContract {
} }
} }
pub(crate) type Scheduler = smart_contract_scheduler::Scheduler<Rpc, SmartContract>; pub(crate) type Scheduler<D> = smart_contract_scheduler::Scheduler<Rpc<D>, SmartContract>;