diff --git a/processor/scanner/src/eventuality/db.rs b/processor/scanner/src/eventuality/db.rs index 2bd02025..3e5088d1 100644 --- a/processor/scanner/src/eventuality/db.rs +++ b/processor/scanner/src/eventuality/db.rs @@ -3,9 +3,9 @@ use core::marker::PhantomData; use scale::Encode; use serai_db::{Get, DbTxn, create_db}; -use primitives::{EncodableG, Eventuality, EventualityTracker}; +use primitives::{EncodableG, ReceivedOutput, Eventuality, EventualityTracker}; -use crate::{ScannerFeed, KeyFor, EventualityFor}; +use crate::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor}; create_db!( ScannerEventuality { @@ -15,6 +15,8 @@ create_db!( LatestHandledNotableBlock: () -> u64, SerializedEventualities: (key: K) -> Vec, + + AccumulatedOutput: (id: &[u8]) -> (), } ); @@ -65,4 +67,17 @@ impl EventualityDb { } res } + + pub(crate) fn prior_accumulated_output( + getter: &impl Get, + id: & as ReceivedOutput, AddressFor>>::Id, + ) -> bool { + AccumulatedOutput::get(getter, id.as_ref()).is_some() + } + pub(crate) fn accumulated_output( + txn: &mut impl DbTxn, + id: & as ReceivedOutput, AddressFor>>::Id, + ) { + AccumulatedOutput::set(txn, id.as_ref(), &()); + } } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 400c5690..43f6b784 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -12,7 +12,8 @@ use crate::{ SeraiKey, OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb, ScanToEventualityDb, }, - BlockExt, ScannerFeed, KeyFor, EventualityFor, SchedulerUpdate, Scheduler, sort_outputs, + BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler, + sort_outputs, scan::{next_to_scan_for_outputs_block, queue_output_until_block}, }; @@ -349,6 +350,22 @@ impl> ContinuallyRan for EventualityTas scheduler_update.outputs.sort_by(sort_outputs); scheduler_update.forwards.sort_by(sort_outputs); scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output)); + + // Sanity check we've never accumulated these outputs before + { + let a: core::slice::Iter<'_, OutputFor> = scheduler_update.outputs.iter(); + let b: core::slice::Iter<'_, OutputFor> = scheduler_update.forwards.iter(); + let c = scheduler_update.returns.iter().map(|output_to_return| &output_to_return.output); + + for output in a.chain(b).chain(c) { + assert!( + !EventualityDb::::prior_accumulated_output(&txn, &output.id()), + "prior accumulated an output with this ID" + ); + EventualityDb::::accumulated_output(&mut txn, &output.id()); + } + } + // Intake the new Eventualities let new_eventualities = self.scheduler.update(&mut txn, &keys_with_stages, scheduler_update); @@ -375,7 +392,6 @@ impl> ContinuallyRan for EventualityTas // Now that we've intaked any Eventualities caused, check if we're retiring any keys if key.stage == LifetimeStage::Finishing { let eventualities = EventualityDb::::eventualities(&txn, key.key); - // TODO: This assumes the Scheduler is empty if eventualities.active_eventualities.is_empty() { log::info!( "key {} has finished and is being retired", diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 2cbae096..7c6466ff 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -200,7 +200,6 @@ pub trait Scheduler: 'static + Send { /// certain time period. With `flush_key`, all outputs should be directed towards fulfilling some /// obligation or the `new_key`. Every output MUST be connected to an Eventuality. If a key no /// longer has active Eventualities, it MUST be able to be retired. - // TODO: Call this fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor, new_key: KeyFor); /// Retire a key as it'll no longer be used. @@ -384,81 +383,3 @@ impl Scanner { SubstrateToEventualityDb::send_burns(txn, queue_as_of, burns) } } - -/* -#[derive(Clone, Debug)] -struct ScannerGlobalDb(PhantomData, PhantomData); -impl ScannerGlobalDb { - fn seen_key(id: &>::Id) -> Vec { - Self::scanner_key(b"seen", id) - } - fn seen(getter: &G, id: &>::Id) -> bool { - getter.get(Self::seen_key(id)).is_some() - } - - fn save_scanned_block(txn: &mut D::Transaction<'_>, block: usize) -> Vec { - let id = Self::block(txn, block); // It may be None for the first key rotated to - let outputs = - if let Some(id) = id.as_ref() { Self::outputs(txn, id).unwrap_or(vec![]) } else { vec![] }; - - // Mark all the outputs from this block as seen - for output in &outputs { - txn.put(Self::seen_key(&output.id()), b""); - } - - txn.put(Self::scanned_block_key(), u64::try_from(block).unwrap().to_le_bytes()); - - // Return this block's outputs so they can be pruned from the RAM cache - outputs - } -} - - // Panic if we've already seen these outputs - for output in &outputs { - let id = output.id(); - info!( - "block {} had output {} worth {:?}", - hex::encode(&block_id), - hex::encode(&id), - output.balance(), - ); - - // On Bitcoin, the output ID should be unique for a given chain - // On Monero, it's trivial to make an output sharing an ID with another - // We should only scan outputs with valid IDs however, which will be unique - - /* - The safety of this code must satisfy the following conditions: - 1) seen is not set for the first occurrence - 2) seen is set for any future occurrence - - seen is only written to after this code completes. Accordingly, it cannot be set - before the first occurrence UNLESSS it's set, yet the last scanned block isn't. - They are both written in the same database transaction, preventing this. - - As for future occurrences, the RAM entry ensures they're handled properly even if - the database has yet to be set. - - On reboot, which will clear the RAM, if seen wasn't set, neither was latest scanned - block. Accordingly, this will scan from some prior block, re-populating the RAM. - - If seen was set, then this will be successfully read. - - There's also no concern ram_outputs was pruned, yet seen wasn't set, as pruning - from ram_outputs will acquire a write lock (preventing this code from acquiring - its own write lock and running), and during its holding of the write lock, it - commits the transaction setting seen and the latest scanned block. - - This last case isn't true. Committing seen/latest_scanned_block happens after - relinquishing the write lock. - - TODO2: Only update ram_outputs after committing the TXN in question. - */ - let seen = ScannerGlobalDb::::seen(&db, &id); - let id = id.as_ref().to_vec(); - if seen || scanner.ram_outputs.contains(&id) { - panic!("scanned an output multiple times"); - } - scanner.ram_outputs.insert(id); - } -*/ diff --git a/processor/scanner/src/scan/db.rs b/processor/scanner/src/scan/db.rs index 6df84df1..44023bc8 100644 --- a/processor/scanner/src/scan/db.rs +++ b/processor/scanner/src/scan/db.rs @@ -2,7 +2,9 @@ use core::marker::PhantomData; use serai_db::{Get, DbTxn, create_db}; -use crate::{db::OutputWithInInstruction, ScannerFeed}; +use primitives::ReceivedOutput; + +use crate::{db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor}; create_db!( ScannerScan { @@ -10,6 +12,8 @@ create_db!( NextToScanForOutputsBlock: () -> u64, SerializedQueuedOutputs: (block_number: u64) -> Vec, + + ReportedInInstructionForOutput: (id: &[u8]) -> (), } ); @@ -38,7 +42,6 @@ impl ScanDb { } res } - pub(crate) fn queue_output_until_block( txn: &mut impl DbTxn, queue_for_block: u64, @@ -49,4 +52,17 @@ impl ScanDb { output.write(&mut outputs).unwrap(); SerializedQueuedOutputs::set(txn, queue_for_block, &outputs); } + + pub(crate) fn prior_reported_in_instruction_for_output( + getter: &impl Get, + id: & as ReceivedOutput, AddressFor>>::Id, + ) -> bool { + ReportedInInstructionForOutput::get(getter, id.as_ref()).is_some() + } + pub(crate) fn reported_in_instruction_for_output( + txn: &mut impl DbTxn, + id: & as ReceivedOutput, AddressFor>>::Id, + ) { + ReportedInInstructionForOutput::set(txn, id.as_ref(), &()); + } } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 59d0f197..f76adb00 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -149,8 +149,8 @@ impl ContinuallyRan for ScanTask { queued_outputs }; for queued_output in queued_outputs { + in_instructions.push((queued_output.output.id(), queued_output.in_instruction)); scan_data.received_external_outputs.push(queued_output.output); - in_instructions.push(queued_output.in_instruction); } // We subtract the cost to aggregate from some outputs we scan @@ -297,13 +297,37 @@ impl ContinuallyRan for ScanTask { // Ensures we didn't miss a `continue` above assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange)); - scan_data.received_external_outputs.push(output_with_in_instruction.output.clone()); - in_instructions.push(output_with_in_instruction.in_instruction); + in_instructions.push(( + output_with_in_instruction.output.id(), + output_with_in_instruction.in_instruction, + )); + scan_data.received_external_outputs.push(output_with_in_instruction.output); } } + // Sort the InInstructions by the output ID + in_instructions.sort_by(|(output_id_a, _), (output_id_b, _)| { + use core::cmp::{Ordering, Ord}; + let res = output_id_a.as_ref().cmp(output_id_b.as_ref()); + assert!(res != Ordering::Equal, "two outputs within a collection had the same ID"); + res + }); + // Check we haven't prior reported an InInstruction for this output + // This is a sanity check which is intended to prevent multiple instances of sriXYZ on-chain + // due to a single output + for (id, _) in &in_instructions { + assert!( + !ScanDb::::prior_reported_in_instruction_for_output(&txn, id), + "prior reported an InInstruction for an output with this ID" + ); + ScanDb::::reported_in_instruction_for_output(&mut txn, id); + } + // Reformat the InInstructions to just the InInstructions + let in_instructions = + in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::>(); // Send the InInstructions to the report task ScanToReportDb::::send_in_instructions(&mut txn, b, in_instructions); + // Send the scan data to the eventuality task ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data); // Update the next to scan block