Add a way to check if blocks completed eventualities

This commit is contained in:
Luke Parker 2023-03-22 22:45:41 -04:00
parent 11a0803ea5
commit 8447021ba1
No known key found for this signature in database
6 changed files with 244 additions and 12 deletions

2
Cargo.lock generated
View file

@ -5057,7 +5057,7 @@ dependencies = [
[[package]] [[package]]
name = "monero-generators" name = "monero-generators"
version = "0.2.0" version = "0.3.0"
dependencies = [ dependencies = [
"curve25519-dalek 3.2.0", "curve25519-dalek 3.2.0",
"dalek-ff-group", "dalek-ff-group",

View file

@ -10,6 +10,8 @@ use frost::{
ThresholdKeys, ThresholdKeys,
}; };
use tokio::time::{Duration, sleep};
use bitcoin_serai::{ use bitcoin_serai::{
bitcoin::{ bitcoin::{
hashes::Hash as HashTrait, hashes::Hash as HashTrait,
@ -39,7 +41,8 @@ use serai_client::coins::bitcoin::Address;
use crate::{ use crate::{
coins::{ coins::{
CoinError, Block as BlockTrait, OutputType, Output as OutputTrait, CoinError, Block as BlockTrait, OutputType, Output as OutputTrait,
Transaction as TransactionTrait, Eventuality, PostFeeBranch, Coin, drop_branches, amortize_fee, Transaction as TransactionTrait, Eventuality, EventualitiesTracker, PostFeeBranch, Coin,
drop_branches, amortize_fee,
}, },
Plan, Plan,
}; };
@ -154,6 +157,10 @@ impl TransactionTrait<Bitcoin> for Transaction {
} }
impl Eventuality for OutPoint { impl Eventuality for OutPoint {
fn lookup(&self) -> Vec<u8> {
self.serialize()
}
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> { fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
OutPoint::consensus_decode(reader) OutPoint::consensus_decode(reader)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "couldn't decode outpoint as eventuality")) .map_err(|_| io::Error::new(io::ErrorKind::Other, "couldn't decode outpoint as eventuality"))
@ -358,6 +365,69 @@ impl Coin for Bitcoin {
Ok(outputs) Ok(outputs)
} }
async fn get_eventuality_completions(
&self,
eventualities: &mut EventualitiesTracker<OutPoint>,
block: &Self::Block,
) -> HashMap<[u8; 32], [u8; 32]> {
let mut res = HashMap::new();
if eventualities.map.is_empty() {
return res;
}
async fn check_block(
eventualities: &mut EventualitiesTracker<OutPoint>,
block: &Block,
res: &mut HashMap<[u8; 32], [u8; 32]>,
) {
for tx in &block.txdata[1 ..] {
let input = &tx.input[0].previous_output;
if let Some((plan, eventuality)) = eventualities.map.remove(&input.serialize()) {
assert_eq!(input, &eventuality);
res.insert(plan, tx.id());
}
}
eventualities.block_number += 1;
}
let this_block_hash = block.id();
let this_block_num = (|| async {
loop {
match self.rpc.get_block_number(&this_block_hash).await {
Ok(number) => return number,
Err(e) => {
log::error!("couldn't get the block number for {}: {}", hex::encode(this_block_hash), e)
}
}
sleep(Duration::from_secs(60)).await;
}
})()
.await;
for block_num in (eventualities.block_number + 1) .. this_block_num {
let block = {
let mut block;
while {
block = self.get_block(block_num).await;
block.is_err()
} {
log::error!("couldn't get block {}: {}", block_num, block.err().unwrap());
sleep(Duration::from_secs(60)).await;
}
block.unwrap()
};
check_block(eventualities, &block, &mut res).await;
}
// Also check the current block
check_block(eventualities, block, &mut res).await;
assert_eq!(eventualities.block_number, this_block_num);
res
}
async fn prepare_send( async fn prepare_send(
&self, &self,
keys: ThresholdKeys<Secp256k1>, keys: ThresholdKeys<Secp256k1>,

View file

@ -1,5 +1,5 @@
use core::fmt::Debug; use core::fmt::Debug;
use std::io; use std::{collections::HashMap, io};
use async_trait::async_trait; use async_trait::async_trait;
use thiserror::Error; use thiserror::Error;
@ -113,10 +113,44 @@ pub trait Transaction<C: Coin>: Send + Sync + Sized + Clone + Debug {
} }
pub trait Eventuality: Send + Sync + Clone + Debug { pub trait Eventuality: Send + Sync + Clone + Debug {
fn lookup(&self) -> Vec<u8>;
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self>; fn read<R: io::Read>(reader: &mut R) -> io::Result<Self>;
fn serialize(&self) -> Vec<u8>; fn serialize(&self) -> Vec<u8>;
} }
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct EventualitiesTracker<E: Eventuality> {
// Lookup property (input, nonce, TX extra...) -> (plan ID, eventuality)
map: HashMap<Vec<u8>, ([u8; 32], E)>,
// Block number we've scanned these eventualities too
block_number: usize,
}
impl<E: Eventuality> EventualitiesTracker<E> {
pub fn new() -> Self {
EventualitiesTracker { map: HashMap::new(), block_number: usize::MAX }
}
pub fn register(&mut self, block_number: usize, id: [u8; 32], eventuality: E) {
log::info!("registering eventuality for {}", hex::encode(id));
let lookup = eventuality.lookup();
if self.map.contains_key(&lookup) {
panic!("registering an eventuality multiple times or lookup collision");
}
self.map.insert(lookup, (id, eventuality));
// If our self tracker already went past this block number, set it back
self.block_number = self.block_number.min(block_number);
}
}
impl<E: Eventuality> Default for EventualitiesTracker<E> {
fn default() -> Self {
Self::new()
}
}
pub trait Block<C: Coin>: Send + Sync + Sized + Clone + Debug { pub trait Block<C: Coin>: Send + Sync + Sized + Clone + Debug {
type Id: 'static + Id; type Id: 'static + Id;
fn id(&self) -> Self::Id; fn id(&self) -> Self::Id;
@ -246,6 +280,14 @@ pub trait Coin: 'static + Send + Sync + Clone + PartialEq + Eq + Debug {
key: <Self::Curve as Ciphersuite>::G, key: <Self::Curve as Ciphersuite>::G,
) -> Result<Vec<Self::Output>, CoinError>; ) -> Result<Vec<Self::Output>, CoinError>;
/// Get the registered eventualities completed within this block, and any prior blocks which
/// registered eventualities may have been completed in.
async fn get_eventuality_completions(
&self,
eventualities: &mut EventualitiesTracker<Self::Eventuality>,
block: &Self::Block,
) -> HashMap<[u8; 32], <Self::Transaction as Transaction<Self>>::Id>;
/// Prepare a SignableTransaction for a transaction. /// Prepare a SignableTransaction for a transaction.
/// Returns None for the transaction if the SignableTransaction was dropped due to lack of value. /// Returns None for the transaction if the SignableTransaction was dropped due to lack of value.
#[rustfmt::skip] #[rustfmt::skip]

View file

@ -1,4 +1,4 @@
use std::io; use std::{time::Duration, collections::HashMap, io};
use async_trait::async_trait; use async_trait::async_trait;
@ -23,14 +23,16 @@ use monero_serai::{
}, },
}; };
use tokio::time::sleep;
pub use serai_client::{primitives::MAX_DATA_LEN, coins::monero::Address}; pub use serai_client::{primitives::MAX_DATA_LEN, coins::monero::Address};
use crate::{ use crate::{
Payment, Plan, additional_key, Payment, Plan, additional_key,
coins::{ coins::{
CoinError, Block as BlockTrait, OutputType, Output as OutputTrait, CoinError, Block as BlockTrait, OutputType, Output as OutputTrait,
Transaction as TransactionTrait, Eventuality as EventualityTrait, PostFeeBranch, Coin, Transaction as TransactionTrait, Eventuality as EventualityTrait, EventualitiesTracker,
drop_branches, amortize_fee, PostFeeBranch, Coin, drop_branches, amortize_fee,
}, },
}; };
@ -104,6 +106,14 @@ impl TransactionTrait<Monero> for Transaction {
} }
impl EventualityTrait for Eventuality { impl EventualityTrait for Eventuality {
// Use the TX extra to look up potential matches
// While anyone can forge this, a transaction with distinct outputs won't actually match
// Extra includess the one time keys which are derived from the plan ID, so a collision here is a
// hash collision
fn lookup(&self) -> Vec<u8> {
self.extra().to_vec()
}
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> { fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
Eventuality::read(reader) Eventuality::read(reader)
} }
@ -137,7 +147,7 @@ impl BlockTrait<Monero> for Block {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Monero { pub struct Monero {
pub(crate) rpc: Rpc, rpc: Rpc,
} }
// Shim required for testing/debugging purposes due to generic arguments also necessitating trait // Shim required for testing/debugging purposes due to generic arguments also necessitating trait
// bounds // bounds
@ -280,6 +290,71 @@ impl Coin for Monero {
Ok(outputs) Ok(outputs)
} }
async fn get_eventuality_completions(
&self,
eventualities: &mut EventualitiesTracker<Eventuality>,
block: &Self::Block,
) -> HashMap<[u8; 32], [u8; 32]> {
let block = &block.1;
let mut res = HashMap::new();
if eventualities.map.is_empty() {
return res;
}
async fn check_block(
coin: &Monero,
eventualities: &mut EventualitiesTracker<Eventuality>,
block: &MBlock,
res: &mut HashMap<[u8; 32], [u8; 32]>,
) {
for hash in &block.txs {
let tx = {
let mut tx;
while {
tx = coin.get_transaction(hash).await;
tx.is_err()
} {
log::error!("couldn't get transaction {}: {}", hex::encode(hash), tx.err().unwrap());
sleep(Duration::from_secs(60)).await;
}
tx.unwrap()
};
if let Some((_, eventuality)) = eventualities.map.get(&tx.prefix.extra) {
if eventuality.matches(&tx) {
res.insert(eventualities.map.remove(&tx.prefix.extra).unwrap().0, tx.hash());
}
}
}
eventualities.block_number += 1;
assert_eq!(eventualities.block_number, block.number());
}
for block_num in (eventualities.block_number + 1) .. block.number() {
let block = {
let mut block;
while {
block = self.get_block(block_num).await;
block.is_err()
} {
log::error!("couldn't get block {}: {}", block_num, block.err().unwrap());
sleep(Duration::from_secs(60)).await;
}
block.unwrap()
};
check_block(self, eventualities, &block.1, &mut res).await;
}
// Also check the current block
check_block(self, eventualities, block, &mut res).await;
assert_eq!(eventualities.block_number, block.number());
res
}
async fn prepare_send( async fn prepare_send(
&self, &self,
keys: ThresholdKeys<Ed25519>, keys: ThresholdKeys<Ed25519>,
@ -455,7 +530,7 @@ impl Coin for Monero {
#[cfg(test)] #[cfg(test)]
async fn mine_block(&self) { async fn mine_block(&self) {
// https://github.com/serai-dex/serai/issues/198 // https://github.com/serai-dex/serai/issues/198
tokio::time::sleep(std::time::Duration::from_millis(100)).await; sleep(std::time::Duration::from_millis(100)).await;
#[derive(serde::Deserialize, Debug)] #[derive(serde::Deserialize, Debug)]
struct EmptyResponse {} struct EmptyResponse {}

View file

@ -128,6 +128,7 @@ async fn prepare_send<C: Coin, D: Db>(
async fn sign_plans<C: Coin, D: Db>( async fn sign_plans<C: Coin, D: Db>(
db: &mut MainDb<C, D>, db: &mut MainDb<C, D>,
coin: &C, coin: &C,
scanner: &ScannerHandle<C, D>,
schedulers: &mut HashMap<Vec<u8>, Scheduler<C>>, schedulers: &mut HashMap<Vec<u8>, Scheduler<C>>,
signers: &HashMap<Vec<u8>, SignerHandle<C, D>>, signers: &HashMap<Vec<u8>, SignerHandle<C, D>>,
context: SubstrateContext, context: SubstrateContext,
@ -162,7 +163,7 @@ async fn sign_plans<C: Coin, D: Db>(
} }
if let Some((tx, eventuality)) = tx { if let Some((tx, eventuality)) = tx {
// TODO: Handle detection of already signed TXs (either on-chain or notified by a peer) scanner.register_eventuality(block_number, id, eventuality.clone()).await;
signers[key.as_ref()].sign_transaction(id, start, tx, eventuality).await; signers[key.as_ref()].sign_transaction(id, start, tx, eventuality).await;
} }
} }
@ -223,6 +224,10 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
prepare_send(&coin, &signer, block_number, fee, plan).await else { prepare_send(&coin, &signer, block_number, fee, plan).await else {
panic!("previously created transaction is no longer being created") panic!("previously created transaction is no longer being created")
}; };
scanner.register_eventuality(block_number, id, eventuality.clone()).await;
// TODO: Reconsider if the Signer should have the eventuality, or if just the coin/scanner
// should
signer.sign_transaction(id, start, tx, eventuality).await; signer.sign_transaction(id, start, tx, eventuality).await;
} }
@ -360,7 +365,15 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
.get_mut(&key_vec) .get_mut(&key_vec)
.expect("key we don't have a scheduler for acknowledged a block") .expect("key we don't have a scheduler for acknowledged a block")
.add_outputs(scanner.ack_block(key, block_id).await); .add_outputs(scanner.ack_block(key, block_id).await);
sign_plans(&mut main_db, &coin, &mut schedulers, &signers, context, plans).await; sign_plans(
&mut main_db,
&coin,
&scanner,
&mut schedulers,
&signers,
context,
plans
).await;
} }
substrate::CoordinatorMessage::Burns { context, burns } => { substrate::CoordinatorMessage::Burns { context, burns } => {
@ -381,7 +394,15 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
} }
let plans = scheduler.schedule(payments); let plans = scheduler.schedule(payments);
sign_plans(&mut main_db, &coin, &mut schedulers, &signers, context, plans).await; sign_plans(
&mut main_db,
&coin,
&scanner,
&mut schedulers,
&signers,
context,
plans
).await;
} }
} }
} }

View file

@ -15,7 +15,7 @@ use tokio::{
use crate::{ use crate::{
DbTxn, Db, DbTxn, Db,
coins::{Output, Block, Coin}, coins::{Output, EventualitiesTracker, Block, Coin},
}; };
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -172,6 +172,8 @@ pub struct Scanner<C: Coin, D: Db> {
db: ScannerDb<C, D>, db: ScannerDb<C, D>,
keys: Vec<<C::Curve as Ciphersuite>::G>, keys: Vec<<C::Curve as Ciphersuite>::G>,
eventualities: EventualitiesTracker<C::Eventuality>,
ram_scanned: HashMap<Vec<u8>, usize>, ram_scanned: HashMap<Vec<u8>, usize>,
ram_outputs: HashSet<Vec<u8>>, ram_outputs: HashSet<Vec<u8>>,
@ -198,6 +200,15 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
res.unwrap_or(0) res.unwrap_or(0)
} }
pub async fn register_eventuality(
&self,
block_number: usize,
id: [u8; 32],
eventuality: C::Eventuality,
) {
self.scanner.write().await.eventualities.register(block_number, id, eventuality)
}
/// Rotate the key being scanned for. /// Rotate the key being scanned for.
/// ///
/// If no key has been prior set, this will become the key with no further actions. /// If no key has been prior set, this will become the key with no further actions.
@ -257,6 +268,8 @@ impl<C: Coin, D: Db> Scanner<C, D> {
db, db,
keys: keys.clone(), keys: keys.clone(),
eventualities: EventualitiesTracker::new(),
ram_scanned: HashMap::new(), ram_scanned: HashMap::new(),
ram_outputs: HashSet::new(), ram_outputs: HashSet::new(),
@ -338,6 +351,17 @@ impl<C: Coin, D: Db> Scanner<C, D> {
txn.commit(); txn.commit();
} }
// Clone coin because we can't borrow it while also mutably borrowing the eventualities
// Thankfully, coin is written to be a cheap clone
let coin = scanner.coin.clone();
for (id, tx) in
coin.get_eventuality_completions(&mut scanner.eventualities, &block).await
{
// TODO: Fire Completed
let _ = id;
let _ = tx;
}
let outputs = match scanner.coin.get_outputs(&block, key).await { let outputs = match scanner.coin.get_outputs(&block, key).await {
Ok(outputs) => outputs, Ok(outputs) => outputs,
Err(_) => { Err(_) => {