mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-11 05:14:41 +00:00
Add sanity checks we haven't prior reported an InInstruction for/accumulated an output
This commit is contained in:
parent
2ca7fccb08
commit
a8b9b7bad3
5 changed files with 80 additions and 88 deletions
|
@ -3,9 +3,9 @@ use core::marker::PhantomData;
|
|||
use scale::Encode;
|
||||
use serai_db::{Get, DbTxn, create_db};
|
||||
|
||||
use primitives::{EncodableG, Eventuality, EventualityTracker};
|
||||
use primitives::{EncodableG, ReceivedOutput, Eventuality, EventualityTracker};
|
||||
|
||||
use crate::{ScannerFeed, KeyFor, EventualityFor};
|
||||
use crate::{ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor};
|
||||
|
||||
create_db!(
|
||||
ScannerEventuality {
|
||||
|
@ -15,6 +15,8 @@ create_db!(
|
|||
LatestHandledNotableBlock: () -> u64,
|
||||
|
||||
SerializedEventualities: <K: Encode>(key: K) -> Vec<u8>,
|
||||
|
||||
AccumulatedOutput: (id: &[u8]) -> (),
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -65,4 +67,17 @@ impl<S: ScannerFeed> EventualityDb<S> {
|
|||
}
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) fn prior_accumulated_output(
|
||||
getter: &impl Get,
|
||||
id: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id,
|
||||
) -> bool {
|
||||
AccumulatedOutput::get(getter, id.as_ref()).is_some()
|
||||
}
|
||||
pub(crate) fn accumulated_output(
|
||||
txn: &mut impl DbTxn,
|
||||
id: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id,
|
||||
) {
|
||||
AccumulatedOutput::set(txn, id.as_ref(), &());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,8 @@ use crate::{
|
|||
SeraiKey, OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb,
|
||||
ScanToEventualityDb,
|
||||
},
|
||||
BlockExt, ScannerFeed, KeyFor, EventualityFor, SchedulerUpdate, Scheduler, sort_outputs,
|
||||
BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler,
|
||||
sort_outputs,
|
||||
scan::{next_to_scan_for_outputs_block, queue_output_until_block},
|
||||
};
|
||||
|
||||
|
@ -349,6 +350,22 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
|
|||
scheduler_update.outputs.sort_by(sort_outputs);
|
||||
scheduler_update.forwards.sort_by(sort_outputs);
|
||||
scheduler_update.returns.sort_by(|a, b| sort_outputs(&a.output, &b.output));
|
||||
|
||||
// Sanity check we've never accumulated these outputs before
|
||||
{
|
||||
let a: core::slice::Iter<'_, OutputFor<S>> = scheduler_update.outputs.iter();
|
||||
let b: core::slice::Iter<'_, OutputFor<S>> = scheduler_update.forwards.iter();
|
||||
let c = scheduler_update.returns.iter().map(|output_to_return| &output_to_return.output);
|
||||
|
||||
for output in a.chain(b).chain(c) {
|
||||
assert!(
|
||||
!EventualityDb::<S>::prior_accumulated_output(&txn, &output.id()),
|
||||
"prior accumulated an output with this ID"
|
||||
);
|
||||
EventualityDb::<S>::accumulated_output(&mut txn, &output.id());
|
||||
}
|
||||
}
|
||||
|
||||
// Intake the new Eventualities
|
||||
let new_eventualities =
|
||||
self.scheduler.update(&mut txn, &keys_with_stages, scheduler_update);
|
||||
|
@ -375,7 +392,6 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
|
|||
// Now that we've intaked any Eventualities caused, check if we're retiring any keys
|
||||
if key.stage == LifetimeStage::Finishing {
|
||||
let eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
|
||||
// TODO: This assumes the Scheduler is empty
|
||||
if eventualities.active_eventualities.is_empty() {
|
||||
log::info!(
|
||||
"key {} has finished and is being retired",
|
||||
|
|
|
@ -200,7 +200,6 @@ pub trait Scheduler<S: ScannerFeed>: 'static + Send {
|
|||
/// certain time period. With `flush_key`, all outputs should be directed towards fulfilling some
|
||||
/// obligation or the `new_key`. Every output MUST be connected to an Eventuality. If a key no
|
||||
/// longer has active Eventualities, it MUST be able to be retired.
|
||||
// TODO: Call this
|
||||
fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor<S>, new_key: KeyFor<S>);
|
||||
|
||||
/// Retire a key as it'll no longer be used.
|
||||
|
@ -384,81 +383,3 @@ impl<S: ScannerFeed> Scanner<S> {
|
|||
SubstrateToEventualityDb::send_burns(txn, queue_as_of, burns)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
#[derive(Clone, Debug)]
|
||||
struct ScannerGlobalDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
|
||||
impl<N: Network, D: Db> ScannerGlobalDb<N, D> {
|
||||
fn seen_key(id: &<N::Output as Output<N>>::Id) -> Vec<u8> {
|
||||
Self::scanner_key(b"seen", id)
|
||||
}
|
||||
fn seen<G: Get>(getter: &G, id: &<N::Output as Output<N>>::Id) -> bool {
|
||||
getter.get(Self::seen_key(id)).is_some()
|
||||
}
|
||||
|
||||
fn save_scanned_block(txn: &mut D::Transaction<'_>, block: usize) -> Vec<N::Output> {
|
||||
let id = Self::block(txn, block); // It may be None for the first key rotated to
|
||||
let outputs =
|
||||
if let Some(id) = id.as_ref() { Self::outputs(txn, id).unwrap_or(vec![]) } else { vec![] };
|
||||
|
||||
// Mark all the outputs from this block as seen
|
||||
for output in &outputs {
|
||||
txn.put(Self::seen_key(&output.id()), b"");
|
||||
}
|
||||
|
||||
txn.put(Self::scanned_block_key(), u64::try_from(block).unwrap().to_le_bytes());
|
||||
|
||||
// Return this block's outputs so they can be pruned from the RAM cache
|
||||
outputs
|
||||
}
|
||||
}
|
||||
|
||||
// Panic if we've already seen these outputs
|
||||
for output in &outputs {
|
||||
let id = output.id();
|
||||
info!(
|
||||
"block {} had output {} worth {:?}",
|
||||
hex::encode(&block_id),
|
||||
hex::encode(&id),
|
||||
output.balance(),
|
||||
);
|
||||
|
||||
// On Bitcoin, the output ID should be unique for a given chain
|
||||
// On Monero, it's trivial to make an output sharing an ID with another
|
||||
// We should only scan outputs with valid IDs however, which will be unique
|
||||
|
||||
/*
|
||||
The safety of this code must satisfy the following conditions:
|
||||
1) seen is not set for the first occurrence
|
||||
2) seen is set for any future occurrence
|
||||
|
||||
seen is only written to after this code completes. Accordingly, it cannot be set
|
||||
before the first occurrence UNLESSS it's set, yet the last scanned block isn't.
|
||||
They are both written in the same database transaction, preventing this.
|
||||
|
||||
As for future occurrences, the RAM entry ensures they're handled properly even if
|
||||
the database has yet to be set.
|
||||
|
||||
On reboot, which will clear the RAM, if seen wasn't set, neither was latest scanned
|
||||
block. Accordingly, this will scan from some prior block, re-populating the RAM.
|
||||
|
||||
If seen was set, then this will be successfully read.
|
||||
|
||||
There's also no concern ram_outputs was pruned, yet seen wasn't set, as pruning
|
||||
from ram_outputs will acquire a write lock (preventing this code from acquiring
|
||||
its own write lock and running), and during its holding of the write lock, it
|
||||
commits the transaction setting seen and the latest scanned block.
|
||||
|
||||
This last case isn't true. Committing seen/latest_scanned_block happens after
|
||||
relinquishing the write lock.
|
||||
|
||||
TODO2: Only update ram_outputs after committing the TXN in question.
|
||||
*/
|
||||
let seen = ScannerGlobalDb::<N, D>::seen(&db, &id);
|
||||
let id = id.as_ref().to_vec();
|
||||
if seen || scanner.ram_outputs.contains(&id) {
|
||||
panic!("scanned an output multiple times");
|
||||
}
|
||||
scanner.ram_outputs.insert(id);
|
||||
}
|
||||
*/
|
||||
|
|
|
@ -2,7 +2,9 @@ use core::marker::PhantomData;
|
|||
|
||||
use serai_db::{Get, DbTxn, create_db};
|
||||
|
||||
use crate::{db::OutputWithInInstruction, ScannerFeed};
|
||||
use primitives::ReceivedOutput;
|
||||
|
||||
use crate::{db::OutputWithInInstruction, ScannerFeed, KeyFor, AddressFor, OutputFor};
|
||||
|
||||
create_db!(
|
||||
ScannerScan {
|
||||
|
@ -10,6 +12,8 @@ create_db!(
|
|||
NextToScanForOutputsBlock: () -> u64,
|
||||
|
||||
SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
|
||||
|
||||
ReportedInInstructionForOutput: (id: &[u8]) -> (),
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -38,7 +42,6 @@ impl<S: ScannerFeed> ScanDb<S> {
|
|||
}
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) fn queue_output_until_block(
|
||||
txn: &mut impl DbTxn,
|
||||
queue_for_block: u64,
|
||||
|
@ -49,4 +52,17 @@ impl<S: ScannerFeed> ScanDb<S> {
|
|||
output.write(&mut outputs).unwrap();
|
||||
SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
|
||||
}
|
||||
|
||||
pub(crate) fn prior_reported_in_instruction_for_output(
|
||||
getter: &impl Get,
|
||||
id: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id,
|
||||
) -> bool {
|
||||
ReportedInInstructionForOutput::get(getter, id.as_ref()).is_some()
|
||||
}
|
||||
pub(crate) fn reported_in_instruction_for_output(
|
||||
txn: &mut impl DbTxn,
|
||||
id: &<OutputFor<S> as ReceivedOutput<KeyFor<S>, AddressFor<S>>>::Id,
|
||||
) {
|
||||
ReportedInInstructionForOutput::set(txn, id.as_ref(), &());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -149,8 +149,8 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
|
|||
queued_outputs
|
||||
};
|
||||
for queued_output in queued_outputs {
|
||||
in_instructions.push((queued_output.output.id(), queued_output.in_instruction));
|
||||
scan_data.received_external_outputs.push(queued_output.output);
|
||||
in_instructions.push(queued_output.in_instruction);
|
||||
}
|
||||
|
||||
// We subtract the cost to aggregate from some outputs we scan
|
||||
|
@ -297,13 +297,37 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
|
|||
// Ensures we didn't miss a `continue` above
|
||||
assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange));
|
||||
|
||||
scan_data.received_external_outputs.push(output_with_in_instruction.output.clone());
|
||||
in_instructions.push(output_with_in_instruction.in_instruction);
|
||||
in_instructions.push((
|
||||
output_with_in_instruction.output.id(),
|
||||
output_with_in_instruction.in_instruction,
|
||||
));
|
||||
scan_data.received_external_outputs.push(output_with_in_instruction.output);
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the InInstructions by the output ID
|
||||
in_instructions.sort_by(|(output_id_a, _), (output_id_b, _)| {
|
||||
use core::cmp::{Ordering, Ord};
|
||||
let res = output_id_a.as_ref().cmp(output_id_b.as_ref());
|
||||
assert!(res != Ordering::Equal, "two outputs within a collection had the same ID");
|
||||
res
|
||||
});
|
||||
// Check we haven't prior reported an InInstruction for this output
|
||||
// This is a sanity check which is intended to prevent multiple instances of sriXYZ on-chain
|
||||
// due to a single output
|
||||
for (id, _) in &in_instructions {
|
||||
assert!(
|
||||
!ScanDb::<S>::prior_reported_in_instruction_for_output(&txn, id),
|
||||
"prior reported an InInstruction for an output with this ID"
|
||||
);
|
||||
ScanDb::<S>::reported_in_instruction_for_output(&mut txn, id);
|
||||
}
|
||||
// Reformat the InInstructions to just the InInstructions
|
||||
let in_instructions =
|
||||
in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::<Vec<_>>();
|
||||
// Send the InInstructions to the report task
|
||||
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
|
||||
|
||||
// Send the scan data to the eventuality task
|
||||
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
|
||||
// Update the next to scan block
|
||||
|
|
Loading…
Reference in a new issue