From 673cf8fd47cd0b4e7232a2e6ec4fb10184047c6c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 19 Sep 2024 01:00:31 -0400 Subject: [PATCH] Pass the latest active key to the Block's scan function Effectively necessary for networks on which we utilize account abstraction in order to know what key to associate the received coins with. --- networks/ethereum/relayer/src/main.rs | 3 ++- processor/bitcoin/src/primitives/block.rs | 6 +++++- processor/ethereum/src/primitives/block.rs | 13 ++++++++++++- processor/monero/src/primitives/block.rs | 6 +++++- processor/primitives/src/block.rs | 6 +++++- processor/scanner/src/db.rs | 1 + processor/scanner/src/eventuality/mod.rs | 14 +++++++++++++- processor/scanner/src/lib.rs | 6 +++--- processor/scanner/src/scan/mod.rs | 15 ++++++++++++++- 9 files changed, 60 insertions(+), 10 deletions(-) diff --git a/networks/ethereum/relayer/src/main.rs b/networks/ethereum/relayer/src/main.rs index f5a7e0f9..6424c90f 100644 --- a/networks/ethereum/relayer/src/main.rs +++ b/networks/ethereum/relayer/src/main.rs @@ -91,7 +91,8 @@ async fn main() { let Ok(_) = socket.read_exact(&mut buf).await else { break }; let transaction = db.get(&buf[.. 4]).unwrap_or(vec![]); - let Ok(()) = socket.write_all(&u32::try_from(transaction.len()).unwrap().to_le_bytes()).await + let Ok(()) = + socket.write_all(&u32::try_from(transaction.len()).unwrap().to_le_bytes()).await else { break; }; diff --git a/processor/bitcoin/src/primitives/block.rs b/processor/bitcoin/src/primitives/block.rs index e3df7e69..02b8e595 100644 --- a/processor/bitcoin/src/primitives/block.rs +++ b/processor/bitcoin/src/primitives/block.rs @@ -43,7 +43,11 @@ impl primitives::Block for Block { primitives::BlockHeader::id(&BlockHeader(self.1.header)) } - fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec { + fn scan_for_outputs_unordered( + &self, + _latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec { let scanner = scanner(key); let mut res = vec![]; diff --git a/processor/ethereum/src/primitives/block.rs b/processor/ethereum/src/primitives/block.rs index 2c0e0505..a6268c0b 100644 --- a/processor/ethereum/src/primitives/block.rs +++ b/processor/ethereum/src/primitives/block.rs @@ -59,8 +59,19 @@ impl primitives::Block for FullEpoch { self.epoch.end_hash } - fn scan_for_outputs_unordered(&self, _key: Self::Key) -> Vec { + fn scan_for_outputs_unordered( + &self, + latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec { // Only return these outputs for the latest key + if latest_active_key != key { + return vec![]; + } + + // Associate all outputs with the latest active key + // We don't associate these with the current key within the SC as that'll cause outputs to be + // marked for forwarding if the SC is delayed to actually rotate todo!("TODO") } diff --git a/processor/monero/src/primitives/block.rs b/processor/monero/src/primitives/block.rs index 70a559c1..6afae429 100644 --- a/processor/monero/src/primitives/block.rs +++ b/processor/monero/src/primitives/block.rs @@ -40,7 +40,11 @@ impl primitives::Block for Block { self.0.block.hash() } - fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec { + fn scan_for_outputs_unordered( + &self, + _latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec { let mut scanner = GuaranteedScanner::new(view_pair(key)); scanner.register_subaddress(EXTERNAL_SUBADDRESS); scanner.register_subaddress(BRANCH_SUBADDRESS); diff --git a/processor/primitives/src/block.rs b/processor/primitives/src/block.rs index da481247..a3dec40b 100644 --- a/processor/primitives/src/block.rs +++ b/processor/primitives/src/block.rs @@ -43,7 +43,11 @@ pub trait Block: Send + Sync + Sized + Clone + Debug { /// Scan all outputs within this block to find the outputs spendable by this key. /// /// No assumption on the order of the returned outputs is made. - fn scan_for_outputs_unordered(&self, key: Self::Key) -> Vec; + fn scan_for_outputs_unordered( + &self, + latest_active_key: Self::Key, + key: Self::Key, + ) -> Vec; /// Check if this block resolved any Eventualities. /// diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index 49ab1785..884e0e2b 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -28,6 +28,7 @@ struct SeraiKeyDbEntry { key: K, } +#[derive(Clone)] pub(crate) struct SeraiKey { pub(crate) key: K, pub(crate) stage: LifetimeStage, diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 99fea2fb..bb3e4b7e 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -273,6 +273,18 @@ impl> ContinuallyRan for EventualityTas log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); let (keys, keys_with_stages) = self.keys_and_keys_with_stages(b); + let latest_active_key = { + let mut keys_with_stages = keys_with_stages.clone(); + loop { + // Use the most recent key + let (key, stage) = keys_with_stages.pop().unwrap(); + // Unless this key is active, but not yet reporting + if stage == LifetimeStage::ActiveYetNotReporting { + continue; + } + break key; + } + }; let mut txn = self.db.txn(); @@ -307,7 +319,7 @@ impl> ContinuallyRan for EventualityTas } // Fetch all non-External outputs - let mut non_external_outputs = block.scan_for_outputs(key.key); + let mut non_external_outputs = block.scan_for_outputs(latest_active_key, key.key); non_external_outputs.retain(|output| output.kind() != OutputType::External); // Drop any outputs less than the dust limit non_external_outputs.retain(|output| { diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 1b6afaa9..e591d210 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -46,11 +46,11 @@ pub(crate) fn sort_outputs /// Extension traits around Block. pub(crate) trait BlockExt: Block { - fn scan_for_outputs(&self, key: Self::Key) -> Vec; + fn scan_for_outputs(&self, latest_active_key: Self::Key, key: Self::Key) -> Vec; } impl BlockExt for B { - fn scan_for_outputs(&self, key: Self::Key) -> Vec { - let mut outputs = self.scan_for_outputs_unordered(key); + fn scan_for_outputs(&self, latest_active_key: Self::Key, key: Self::Key) -> Vec { + let mut outputs = self.scan_for_outputs_unordered(latest_active_key, key); outputs.sort_by(sort_outputs); outputs } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index b235ff15..7004a4d9 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -122,6 +122,19 @@ impl ContinuallyRan for ScanTask { let keys = ScannerGlobalDb::::active_keys_as_of_next_to_scan_for_outputs_block(&txn) .expect("scanning for a blockchain without any keys set"); + let latest_active_key = { + let mut keys = keys.clone(); + loop { + // Use the most recent key + let key = keys.pop().unwrap(); + // Unless this key is active, but not yet reporting + if key.stage == LifetimeStage::ActiveYetNotReporting { + continue; + } + break key.key; + } + }; + // The scan data for this block let mut scan_data = SenderScanData { block_number: b, @@ -157,7 +170,7 @@ impl ContinuallyRan for ScanTask { // Scan for each key for key in &keys { - for output in block.scan_for_outputs(key.key) { + for output in block.scan_for_outputs(latest_active_key, key.key) { assert_eq!(output.key(), key.key); /*