Finish scan task

This commit is contained in:
Luke Parker 2024-08-23 22:09:54 -04:00
parent ce805c8cc8
commit fd12cc0213
4 changed files with 122 additions and 51 deletions

View file

@ -24,8 +24,9 @@ struct SeraiKeyDbEntry<K: Borshy> {
}
pub(crate) struct SeraiKey<K> {
pub(crate) stage: LifetimeStage,
pub(crate) key: K,
pub(crate) stage: LifetimeStage,
pub(crate) block_at_which_reporting_starts: u64,
}
pub(crate) struct OutputWithInInstruction<S: ScannerFeed> {
@ -81,6 +82,9 @@ create_db!(
// This collapses from `bool` to `()`, using if the value was set for true and false otherwise
NotableBlock: (number: u64) -> (),
SerializedQueuedOutputs: (block_number: u64) -> Vec<u8>,
SerializedForwardedOutputsIndex: (block_number: u64) -> Vec<u8>,
SerializedForwardedOutput: (output_id: &[u8]) -> Vec<u8>,
SerializedOutputs: (block_number: u64) -> Vec<u8>,
}
);
@ -138,14 +142,13 @@ impl<S: ScannerFeed> ScannerDb<S> {
if block_number < raw_keys[i].activation_block_number {
continue;
}
keys.push(SeraiKey {
key: raw_keys[i].key.0,
stage: LifetimeStage::calculate::<S>(
let (stage, block_at_which_reporting_starts) =
LifetimeStage::calculate_stage_and_reporting_start_block::<S>(
block_number,
raw_keys[i].activation_block_number,
raw_keys.get(i + 1).map(|key| key.activation_block_number),
),
});
);
keys.push(SeraiKey { key: raw_keys[i].key.0, stage, block_at_which_reporting_starts });
}
assert!(keys.len() <= 2);
Some(keys)
@ -226,6 +229,53 @@ impl<S: ScannerFeed> ScannerDb<S> {
HighestAcknowledgedBlock::get(getter)
}
pub(crate) fn take_queued_outputs(
txn: &mut impl DbTxn,
block_number: u64,
) -> Vec<OutputWithInInstruction<S>> {
todo!("TODO")
}
pub(crate) fn queue_return(
txn: &mut impl DbTxn,
block_queued_from: u64,
return_addr: AddressFor<S>,
output: OutputFor<S>,
) {
todo!("TODO")
}
pub(crate) fn queue_output_until_block(
txn: &mut impl DbTxn,
queue_for_block: u64,
output: &OutputWithInInstruction<S>,
) {
let mut outputs =
SerializedQueuedOutputs::get(txn, queue_for_block).unwrap_or(Vec::with_capacity(128));
output.write(&mut outputs).unwrap();
SerializedQueuedOutputs::set(txn, queue_for_block, &outputs);
}
pub(crate) fn save_output_being_forwarded(
txn: &mut impl DbTxn,
block_forwarded_from: u64,
output: &OutputWithInInstruction<S>,
) {
let mut buf = Vec::with_capacity(128);
output.write(&mut buf).unwrap();
let id = output.output.id();
// Save this to an index so we can later fetch all outputs to forward
let mut forwarded_outputs = SerializedForwardedOutputsIndex::get(txn, block_forwarded_from)
.unwrap_or(Vec::with_capacity(32));
forwarded_outputs.extend(id.as_ref());
SerializedForwardedOutputsIndex::set(txn, block_forwarded_from, &forwarded_outputs);
// Save the output itself
SerializedForwardedOutput::set(txn, id.as_ref(), &buf);
}
pub(crate) fn set_in_instructions(
txn: &mut impl DbTxn,
block_number: u64,

View file

@ -195,6 +195,8 @@ impl<S: ScannerFeed> Scanner<S> {
}
/// Register the Eventualities caused by a block.
// TODO: Replace this with a callback returned by acknowledge_block which panics if it's not
// called yet dropped
pub fn register_eventualities(&mut self, block_number: u64, eventualities: Vec<()>) {
todo!("TODO")
}

View file

@ -35,16 +35,16 @@ pub(crate) enum LifetimeStage {
}
impl LifetimeStage {
/// Get the stage of its lifetime this multisig is in based on when the next multisig's key
/// activates.
/// Get the stage of its lifetime this multisig is in, and the block at which we start reporting
/// outputs to it.
///
/// Panics if the multisig being calculated for isn't actually active and a variety of other
/// insane cases.
pub(crate) fn calculate<S: ScannerFeed>(
pub(crate) fn calculate_stage_and_reporting_start_block<S: ScannerFeed>(
block_number: u64,
activation_block_number: u64,
next_keys_activation_block_number: Option<u64>,
) -> Self {
) -> (Self, u64) {
assert!(
activation_block_number >= block_number,
"calculating lifetime stage for an inactive multisig"
@ -53,13 +53,15 @@ impl LifetimeStage {
// activation block itself is the first block within this window
let active_yet_not_reporting_end_block =
activation_block_number + S::CONFIRMATIONS + S::TEN_MINUTES;
// The exclusive end block is the inclusive start block
let reporting_start_block = active_yet_not_reporting_end_block;
if block_number < active_yet_not_reporting_end_block {
return LifetimeStage::ActiveYetNotReporting;
return (LifetimeStage::ActiveYetNotReporting, reporting_start_block);
}
let Some(next_keys_activation_block_number) = next_keys_activation_block_number else {
// If there is no next multisig, this is the active multisig
return LifetimeStage::Active;
return (LifetimeStage::Active, reporting_start_block);
};
assert!(
@ -72,14 +74,14 @@ impl LifetimeStage {
let new_active_yet_not_reporting_end_block =
next_keys_activation_block_number + S::CONFIRMATIONS + S::TEN_MINUTES;
if block_number < new_active_yet_not_reporting_end_block {
return LifetimeStage::Active;
return (LifetimeStage::Active, reporting_start_block);
}
// Step 4 details a further CONFIRMATIONS
let new_active_and_used_for_change_end_block =
new_active_yet_not_reporting_end_block + S::CONFIRMATIONS;
if block_number < new_active_and_used_for_change_end_block {
return LifetimeStage::UsingNewForChange;
return (LifetimeStage::UsingNewForChange, reporting_start_block);
}
// Step 5 details a further 6 hours
@ -87,10 +89,10 @@ impl LifetimeStage {
let new_active_and_forwarded_to_end_block =
new_active_and_used_for_change_end_block + (6 * 6 * S::TEN_MINUTES);
if block_number < new_active_and_forwarded_to_end_block {
return LifetimeStage::Forwarding;
return (LifetimeStage::Forwarding, reporting_start_block);
}
// Step 6
LifetimeStage::Finishing
(LifetimeStage::Finishing, reporting_start_block)
}
}

View file

@ -103,7 +103,10 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
let mut keys = ScannerDb::<S>::active_keys_as_of_next_to_scan_for_outputs_block(&self.db)
.expect("scanning for a blockchain without any keys set");
let mut in_instructions = vec![];
let mut txn = self.db.txn();
let mut in_instructions = ScannerDb::<S>::take_queued_outputs(&mut txn, b);
// Scan for each key
for key in keys {
for output in block.scan_for_outputs(key.key) {
@ -152,24 +155,6 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
continue;
}
// Drop External outputs if they're to a multisig which won't report them
// This means we should report any External output we save to disk here
#[allow(clippy::match_same_arms)]
match key.stage {
// TODO: Delay External outputs
LifetimeStage::ActiveYetNotReporting => todo!("TODO"),
// We should report External outputs in these cases
LifetimeStage::Active | LifetimeStage::UsingNewForChange => {}
// We should report External outputs only once forwarded, where they'll appear as
// OutputType::Forwarded
LifetimeStage::Forwarding => todo!("TODO"),
// We should drop these as we should not be handling new External outputs at this
// time
LifetimeStage::Finishing => {
continue;
}
}
// Check this isn't dust
let balance_to_use = {
let mut balance = output.balance();
@ -190,27 +175,59 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
balance
};
// Decode and save the InInstruction/return addr for this output
match in_instruction_from_output::<S>(&output) {
(return_address, Some(instruction)) => {
let in_instruction =
InInstructionWithBalance { instruction, balance: balance_to_use };
// TODO: Make a proper struct out of this
in_instructions.push(OutputWithInInstruction {
output,
return_address,
in_instruction,
});
todo!("TODO: Save to be reported")
// Fetch the InInstruction/return addr for this output
let output_with_in_instruction = match in_instruction_from_output::<S>(&output) {
(return_address, Some(instruction)) => OutputWithInInstruction {
output,
return_address,
in_instruction: InInstructionWithBalance { instruction, balance: balance_to_use },
},
(Some(return_addr), None) => {
// Since there was no instruction here, return this since we parsed a return address
ScannerDb::<S>::queue_return(&mut txn, b, return_addr, output);
continue;
}
// Since we didn't receive an instruction nor can we return this, move on
(None, None) => continue,
};
// Drop External outputs if they're to a multisig which won't report them
// This means we should report any External output we save to disk here
#[allow(clippy::match_same_arms)]
match key.stage {
// This multisig isn't yet reporting its External outputs to avoid a DoS
// Queue the output to be reported when this multisig starts reporting
LifetimeStage::ActiveYetNotReporting => {
ScannerDb::<S>::queue_output_until_block(
&mut txn,
key.block_at_which_reporting_starts,
&output_with_in_instruction,
);
continue;
}
// We should report External outputs in these cases
LifetimeStage::Active | LifetimeStage::UsingNewForChange => {}
// We should report External outputs only once forwarded, where they'll appear as
// OutputType::Forwarded. We save them now for when they appear
LifetimeStage::Forwarding => {
// When the forwarded output appears, we can see which Plan it's associated with and
// from there recover this output
ScannerDb::<S>::save_output_being_forwarded(&mut txn, &output_with_in_instruction);
continue;
}
// We should drop these as we should not be handling new External outputs at this
// time
LifetimeStage::Finishing => {
continue;
}
(Some(return_addr), None) => todo!("TODO: Queue return"),
// Since we didn't receive an instruction nor can we return this, accumulate it
(None, None) => {}
}
// Ensures we didn't miss a `continue` above
assert!(matches!(key.stage, LifetimeStage::Active | LifetimeStage::UsingNewForChange));
in_instructions.push(output_with_in_instruction);
}
}
let mut txn = self.db.txn();
// Save the in instructions
ScannerDb::<S>::set_in_instructions(&mut txn, b, in_instructions);
// Update the next to scan block