From 8447021ba1f196b2c48205fe9bf8962d8900cede Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 22 Mar 2023 22:45:41 -0400 Subject: [PATCH] Add a way to check if blocks completed eventualities --- Cargo.lock | 2 +- processor/src/coins/bitcoin.rs | 72 +++++++++++++++++++++++++++- processor/src/coins/mod.rs | 44 +++++++++++++++++- processor/src/coins/monero.rs | 85 ++++++++++++++++++++++++++++++++-- processor/src/main.rs | 27 +++++++++-- processor/src/scanner.rs | 26 ++++++++++- 6 files changed, 244 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4578fbda..8285f677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5057,7 +5057,7 @@ dependencies = [ [[package]] name = "monero-generators" -version = "0.2.0" +version = "0.3.0" dependencies = [ "curve25519-dalek 3.2.0", "dalek-ff-group", diff --git a/processor/src/coins/bitcoin.rs b/processor/src/coins/bitcoin.rs index ccb13243..311b8440 100644 --- a/processor/src/coins/bitcoin.rs +++ b/processor/src/coins/bitcoin.rs @@ -10,6 +10,8 @@ use frost::{ ThresholdKeys, }; +use tokio::time::{Duration, sleep}; + use bitcoin_serai::{ bitcoin::{ hashes::Hash as HashTrait, @@ -39,7 +41,8 @@ use serai_client::coins::bitcoin::Address; use crate::{ coins::{ CoinError, Block as BlockTrait, OutputType, Output as OutputTrait, - Transaction as TransactionTrait, Eventuality, PostFeeBranch, Coin, drop_branches, amortize_fee, + Transaction as TransactionTrait, Eventuality, EventualitiesTracker, PostFeeBranch, Coin, + drop_branches, amortize_fee, }, Plan, }; @@ -154,6 +157,10 @@ impl TransactionTrait for Transaction { } impl Eventuality for OutPoint { + fn lookup(&self) -> Vec { + self.serialize() + } + fn read(reader: &mut R) -> io::Result { OutPoint::consensus_decode(reader) .map_err(|_| io::Error::new(io::ErrorKind::Other, "couldn't decode outpoint as eventuality")) @@ -358,6 +365,69 @@ impl Coin for Bitcoin { Ok(outputs) } + async fn get_eventuality_completions( + &self, + eventualities: &mut EventualitiesTracker, + block: &Self::Block, + ) -> HashMap<[u8; 32], [u8; 32]> { + let mut res = HashMap::new(); + if eventualities.map.is_empty() { + return res; + } + + async fn check_block( + eventualities: &mut EventualitiesTracker, + block: &Block, + res: &mut HashMap<[u8; 32], [u8; 32]>, + ) { + for tx in &block.txdata[1 ..] { + let input = &tx.input[0].previous_output; + if let Some((plan, eventuality)) = eventualities.map.remove(&input.serialize()) { + assert_eq!(input, &eventuality); + res.insert(plan, tx.id()); + } + } + + eventualities.block_number += 1; + } + + let this_block_hash = block.id(); + let this_block_num = (|| async { + loop { + match self.rpc.get_block_number(&this_block_hash).await { + Ok(number) => return number, + Err(e) => { + log::error!("couldn't get the block number for {}: {}", hex::encode(this_block_hash), e) + } + } + sleep(Duration::from_secs(60)).await; + } + })() + .await; + + for block_num in (eventualities.block_number + 1) .. this_block_num { + let block = { + let mut block; + while { + block = self.get_block(block_num).await; + block.is_err() + } { + log::error!("couldn't get block {}: {}", block_num, block.err().unwrap()); + sleep(Duration::from_secs(60)).await; + } + block.unwrap() + }; + + check_block(eventualities, &block, &mut res).await; + } + + // Also check the current block + check_block(eventualities, block, &mut res).await; + assert_eq!(eventualities.block_number, this_block_num); + + res + } + async fn prepare_send( &self, keys: ThresholdKeys, diff --git a/processor/src/coins/mod.rs b/processor/src/coins/mod.rs index bc73a7d2..379a965d 100644 --- a/processor/src/coins/mod.rs +++ b/processor/src/coins/mod.rs @@ -1,5 +1,5 @@ use core::fmt::Debug; -use std::io; +use std::{collections::HashMap, io}; use async_trait::async_trait; use thiserror::Error; @@ -113,10 +113,44 @@ pub trait Transaction: Send + Sync + Sized + Clone + Debug { } pub trait Eventuality: Send + Sync + Clone + Debug { + fn lookup(&self) -> Vec; + fn read(reader: &mut R) -> io::Result; fn serialize(&self) -> Vec; } +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct EventualitiesTracker { + // Lookup property (input, nonce, TX extra...) -> (plan ID, eventuality) + map: HashMap, ([u8; 32], E)>, + // Block number we've scanned these eventualities too + block_number: usize, +} + +impl EventualitiesTracker { + pub fn new() -> Self { + EventualitiesTracker { map: HashMap::new(), block_number: usize::MAX } + } + + pub fn register(&mut self, block_number: usize, id: [u8; 32], eventuality: E) { + log::info!("registering eventuality for {}", hex::encode(id)); + + let lookup = eventuality.lookup(); + if self.map.contains_key(&lookup) { + panic!("registering an eventuality multiple times or lookup collision"); + } + self.map.insert(lookup, (id, eventuality)); + // If our self tracker already went past this block number, set it back + self.block_number = self.block_number.min(block_number); + } +} + +impl Default for EventualitiesTracker { + fn default() -> Self { + Self::new() + } +} + pub trait Block: Send + Sync + Sized + Clone + Debug { type Id: 'static + Id; fn id(&self) -> Self::Id; @@ -246,6 +280,14 @@ pub trait Coin: 'static + Send + Sync + Clone + PartialEq + Eq + Debug { key: ::G, ) -> Result, CoinError>; + /// Get the registered eventualities completed within this block, and any prior blocks which + /// registered eventualities may have been completed in. + async fn get_eventuality_completions( + &self, + eventualities: &mut EventualitiesTracker, + block: &Self::Block, + ) -> HashMap<[u8; 32], >::Id>; + /// Prepare a SignableTransaction for a transaction. /// Returns None for the transaction if the SignableTransaction was dropped due to lack of value. #[rustfmt::skip] diff --git a/processor/src/coins/monero.rs b/processor/src/coins/monero.rs index c18787ed..4339c4a4 100644 --- a/processor/src/coins/monero.rs +++ b/processor/src/coins/monero.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{time::Duration, collections::HashMap, io}; use async_trait::async_trait; @@ -23,14 +23,16 @@ use monero_serai::{ }, }; +use tokio::time::sleep; + pub use serai_client::{primitives::MAX_DATA_LEN, coins::monero::Address}; use crate::{ Payment, Plan, additional_key, coins::{ CoinError, Block as BlockTrait, OutputType, Output as OutputTrait, - Transaction as TransactionTrait, Eventuality as EventualityTrait, PostFeeBranch, Coin, - drop_branches, amortize_fee, + Transaction as TransactionTrait, Eventuality as EventualityTrait, EventualitiesTracker, + PostFeeBranch, Coin, drop_branches, amortize_fee, }, }; @@ -104,6 +106,14 @@ impl TransactionTrait for Transaction { } impl EventualityTrait for Eventuality { + // Use the TX extra to look up potential matches + // While anyone can forge this, a transaction with distinct outputs won't actually match + // Extra includess the one time keys which are derived from the plan ID, so a collision here is a + // hash collision + fn lookup(&self) -> Vec { + self.extra().to_vec() + } + fn read(reader: &mut R) -> io::Result { Eventuality::read(reader) } @@ -137,7 +147,7 @@ impl BlockTrait for Block { #[derive(Clone, Debug)] pub struct Monero { - pub(crate) rpc: Rpc, + rpc: Rpc, } // Shim required for testing/debugging purposes due to generic arguments also necessitating trait // bounds @@ -280,6 +290,71 @@ impl Coin for Monero { Ok(outputs) } + async fn get_eventuality_completions( + &self, + eventualities: &mut EventualitiesTracker, + block: &Self::Block, + ) -> HashMap<[u8; 32], [u8; 32]> { + let block = &block.1; + + let mut res = HashMap::new(); + if eventualities.map.is_empty() { + return res; + } + + async fn check_block( + coin: &Monero, + eventualities: &mut EventualitiesTracker, + block: &MBlock, + res: &mut HashMap<[u8; 32], [u8; 32]>, + ) { + for hash in &block.txs { + let tx = { + let mut tx; + while { + tx = coin.get_transaction(hash).await; + tx.is_err() + } { + log::error!("couldn't get transaction {}: {}", hex::encode(hash), tx.err().unwrap()); + sleep(Duration::from_secs(60)).await; + } + tx.unwrap() + }; + + if let Some((_, eventuality)) = eventualities.map.get(&tx.prefix.extra) { + if eventuality.matches(&tx) { + res.insert(eventualities.map.remove(&tx.prefix.extra).unwrap().0, tx.hash()); + } + } + } + + eventualities.block_number += 1; + assert_eq!(eventualities.block_number, block.number()); + } + + for block_num in (eventualities.block_number + 1) .. block.number() { + let block = { + let mut block; + while { + block = self.get_block(block_num).await; + block.is_err() + } { + log::error!("couldn't get block {}: {}", block_num, block.err().unwrap()); + sleep(Duration::from_secs(60)).await; + } + block.unwrap() + }; + + check_block(self, eventualities, &block.1, &mut res).await; + } + + // Also check the current block + check_block(self, eventualities, block, &mut res).await; + assert_eq!(eventualities.block_number, block.number()); + + res + } + async fn prepare_send( &self, keys: ThresholdKeys, @@ -455,7 +530,7 @@ impl Coin for Monero { #[cfg(test)] async fn mine_block(&self) { // https://github.com/serai-dex/serai/issues/198 - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + sleep(std::time::Duration::from_millis(100)).await; #[derive(serde::Deserialize, Debug)] struct EmptyResponse {} diff --git a/processor/src/main.rs b/processor/src/main.rs index 20d5c96e..ce275e79 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -128,6 +128,7 @@ async fn prepare_send( async fn sign_plans( db: &mut MainDb, coin: &C, + scanner: &ScannerHandle, schedulers: &mut HashMap, Scheduler>, signers: &HashMap, SignerHandle>, context: SubstrateContext, @@ -162,7 +163,7 @@ async fn sign_plans( } if let Some((tx, eventuality)) = tx { - // TODO: Handle detection of already signed TXs (either on-chain or notified by a peer) + scanner.register_eventuality(block_number, id, eventuality.clone()).await; signers[key.as_ref()].sign_transaction(id, start, tx, eventuality).await; } } @@ -223,6 +224,10 @@ async fn run(raw_db: D, coin: C, mut coordinato prepare_send(&coin, &signer, block_number, fee, plan).await else { panic!("previously created transaction is no longer being created") }; + + scanner.register_eventuality(block_number, id, eventuality.clone()).await; + // TODO: Reconsider if the Signer should have the eventuality, or if just the coin/scanner + // should signer.sign_transaction(id, start, tx, eventuality).await; } @@ -360,7 +365,15 @@ async fn run(raw_db: D, coin: C, mut coordinato .get_mut(&key_vec) .expect("key we don't have a scheduler for acknowledged a block") .add_outputs(scanner.ack_block(key, block_id).await); - sign_plans(&mut main_db, &coin, &mut schedulers, &signers, context, plans).await; + sign_plans( + &mut main_db, + &coin, + &scanner, + &mut schedulers, + &signers, + context, + plans + ).await; } substrate::CoordinatorMessage::Burns { context, burns } => { @@ -381,7 +394,15 @@ async fn run(raw_db: D, coin: C, mut coordinato } let plans = scheduler.schedule(payments); - sign_plans(&mut main_db, &coin, &mut schedulers, &signers, context, plans).await; + sign_plans( + &mut main_db, + &coin, + &scanner, + &mut schedulers, + &signers, + context, + plans + ).await; } } } diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 4d6e3e07..b59e7fab 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -15,7 +15,7 @@ use tokio::{ use crate::{ DbTxn, Db, - coins::{Output, Block, Coin}, + coins::{Output, EventualitiesTracker, Block, Coin}, }; #[derive(Clone, Debug)] @@ -172,6 +172,8 @@ pub struct Scanner { db: ScannerDb, keys: Vec<::G>, + eventualities: EventualitiesTracker, + ram_scanned: HashMap, usize>, ram_outputs: HashSet>, @@ -198,6 +200,15 @@ impl ScannerHandle { res.unwrap_or(0) } + pub async fn register_eventuality( + &self, + block_number: usize, + id: [u8; 32], + eventuality: C::Eventuality, + ) { + self.scanner.write().await.eventualities.register(block_number, id, eventuality) + } + /// Rotate the key being scanned for. /// /// If no key has been prior set, this will become the key with no further actions. @@ -257,6 +268,8 @@ impl Scanner { db, keys: keys.clone(), + eventualities: EventualitiesTracker::new(), + ram_scanned: HashMap::new(), ram_outputs: HashSet::new(), @@ -338,6 +351,17 @@ impl Scanner { txn.commit(); } + // Clone coin because we can't borrow it while also mutably borrowing the eventualities + // Thankfully, coin is written to be a cheap clone + let coin = scanner.coin.clone(); + for (id, tx) in + coin.get_eventuality_completions(&mut scanner.eventualities, &block).await + { + // TODO: Fire Completed + let _ = id; + let _ = tx; + } + let outputs = match scanner.coin.get_outputs(&block, key).await { Ok(outputs) => outputs, Err(_) => {