From a771fbe1c6ee23ce4213e6931ae9bd8bf099b2ba Mon Sep 17 00:00:00 2001 From: Luke Parker <lukeparker5132@gmail.com> Date: Tue, 27 Aug 2024 16:43:50 -0400 Subject: [PATCH] Logs, documentation, misc --- processor/scanner/src/db.rs | 31 ++- processor/scanner/src/eventuality/mod.rs | 31 ++- processor/scanner/src/lib.rs | 313 ++--------------------- processor/scanner/src/scan.rs | 2 +- 4 files changed, 69 insertions(+), 308 deletions(-) diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index b4d7c27b..d53bf7c7 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -82,25 +82,34 @@ create_db!( pub(crate) struct ScannerDb<S: ScannerFeed>(PhantomData<S>); impl<S: ScannerFeed> ScannerDb<S> { - // activation_block_number is inclusive, so the key will be scanned for starting at the specified - // block + /// Queue a key. + /// + /// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks + /// after the next block acknowledged after they've been set. There is no requirement that any + /// prior keys have had their processing completed (meaning what should be a length-2 vector may + /// be a length-n vector). + /// + /// A new key MUST NOT be queued to activate a block preceding the finishing of the key prior to + /// its prior. There MUST only be two keys active at one time. + /// + /// activation_block_number is inclusive, so the key will be scanned for starting at the + /// specified block. pub(crate) fn queue_key(txn: &mut impl DbTxn, activation_block_number: u64, key: KeyFor<S>) { // Set this block as notable NotableBlock::set(txn, activation_block_number, &()); + // TODO: Panic if we've ever seen this key before + // Push the key let mut keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> = ActiveKeys::get(txn).unwrap_or(vec![]); - for key_i in &keys { - if key == key_i.key.0 { - panic!("queueing a key prior queued"); - } - } keys.push(SeraiKeyDbEntry { activation_block_number, key: BorshG(key) }); ActiveKeys::set(txn, &keys); } + /// Retire a key. + /// + /// The key retired must be the oldest key. There must be another key actively tracked. // TODO: This will be called from the Eventuality task yet this field is read by the scan task // We need to write the argument for its safety - // TODO: retire_key needs to set the notable block pub(crate) fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) { let mut keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> = ActiveKeys::get(txn).expect("retiring key yet no active keys"); @@ -110,6 +119,9 @@ impl<S: ScannerFeed> ScannerDb<S> { keys.remove(0); ActiveKeys::set(txn, &keys); } + /// Fetch the active keys, as of the next-to-scan-for-outputs Block. + /// + /// This means the scan task should scan for all keys returned by this. pub(crate) fn active_keys_as_of_next_to_scan_for_outputs_block( getter: &impl Get, ) -> Option<Vec<SeraiKey<KeyFor<S>>>> { @@ -131,7 +143,7 @@ impl<S: ScannerFeed> ScannerDb<S> { ); keys.push(SeraiKey { key: raw_keys[i].key.0, stage, block_at_which_reporting_starts }); } - assert!(keys.len() <= 2); + assert!(keys.len() <= 2, "more than two keys active"); Some(keys) } @@ -152,7 +164,6 @@ impl<S: ScannerFeed> ScannerDb<S> { // We can only scan up to whatever block we've checked the Eventualities of, plus the window // length. Since this returns an inclusive bound, we need to subtract 1 // See `eventuality.rs` for more info - // TODO: Adjust based on register eventualities NextToCheckForEventualitiesBlock::get(getter).map(|b| b + S::WINDOW_LENGTH - 1) } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index 20e24112..3a472ce2 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -117,7 +117,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas let block = self.feed.block_by_number(&self.db, b).await?; - log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); + log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id())); /* This is proper as the keys for the next to scan block (at most `WINDOW_LENGTH` ahead, @@ -147,13 +147,21 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas let mut outputs = received_external_outputs; for key in keys { - let completed_eventualities = { + let (eventualities_is_empty, completed_eventualities) = { let mut eventualities = EventualityDb::<S>::eventualities(&txn, key.key); let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities); EventualityDb::<S>::set_eventualities(&mut txn, key.key, &eventualities); - completed_eventualities + (eventualities.active_eventualities.is_empty(), completed_eventualities) }; + for (tx, completed_eventuality) in completed_eventualities { + log::info!( + "eventuality {} resolved by {}", + hex::encode(completed_eventuality.id()), + hex::encode(tx.as_ref()) + ); + } + // Fetch all non-External outputs let mut non_external_outputs = block.scan_for_outputs(key.key); non_external_outputs.retain(|output| output.kind() != OutputType::External); @@ -213,7 +221,6 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas outputs.extend(non_external_outputs); } - // TODO: This also has to intake Burns let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns }; scheduler_update.outputs.sort_by(sort_outputs); scheduler_update.forwards.sort_by(sort_outputs); @@ -234,6 +241,22 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities); } + for key in keys { + if key.stage == LifetimeStage::Finishing { + let eventualities = EventualityDb::<S>::eventualities(&txn, key.key); + if eventualities.active_eventualities.is_empty() { + log::info!( + "key {} has finished and is being retired", + hex::encode(key.key.to_bytes().as_ref()) + ); + + ScannerDb::<S>::flag_notable(&mut txn, b + S::WINDOW_LENGTH); + // TODO: Retire the key + todo!("TODO") + } + } + } + // Update the next to check block ScannerDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check); txn.commit(); diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index d245e255..2d19207f 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -196,38 +196,34 @@ impl<S: ScannerFeed> Scanner<S> { /// /// This means this block was ordered on Serai in relation to `Burn` events, and all validators /// have achieved synchrony on it. - // TODO: If we're acknowledging block `b`, the Eventuality task was already eligible to check it - // for Eventualities. We need this to block until the Eventuality task has actually checked it. - // TODO: Does the prior TODO hold with how the callback is now handled? pub fn acknowledge_block( &mut self, + txn: &mut impl DbTxn, block_number: u64, - key_to_activate: Option<()>, - ) -> Vec<OutputFor<S>> { + key_to_activate: Option<KeyFor<S>>, + ) { + log::info!("acknowledging block {block_number}"); + assert!( + ScannerDb::<S>::is_block_notable(txn, block_number), + "acknowledging a block which wasn't notable" + ); + ScannerDb::<S>::set_highest_acknowledged_block(txn, block_number); + ScannerDb::<S>::queue_key(txn, block_number + S::WINDOW_LENGTH); + } + + /// Queue Burns. + /// + /// The scanner only updates the scheduler with new outputs upon acknowledging a block. We can + /// safely queue Burns so long as they're only actually added once we've handled the outputs from + /// the block acknowledged prior to their queueing. + pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: Vec<()>) { + let queue_as_of = ScannerDb::<S>::highest_acknowledged_block(txn) + .expect("queueing Burns yet never acknowledged a block"); todo!("TODO") } } /* -#[derive(Clone, Debug)] -pub enum ScannerEvent<N: Network> { - // Block scanned - Block { - is_retirement_block: bool, - block: <N::Block as Block<N>>::Id, - outputs: Vec<N::Output>, - }, - // Eventuality completion found on-chain - // TODO: Move this from a tuple - Completed( - Vec<u8>, - usize, - [u8; 32], - <N::Transaction as Transaction<N>>::Id, - <N::Eventuality as Eventuality>::Completion, - ), -} - #[derive(Clone, Debug)] struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>); impl<N: Network, D: Db> ScannerDb<N, D> { @@ -258,182 +254,8 @@ impl<N: Network, D: Db> ScannerDb<N, D> { .get(Self::scanned_block_key()) .map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap()) } - - fn retirement_block_key(key: &<N::Curve as Ciphersuite>::G) -> Vec<u8> { - Self::scanner_key(b"retirement_block", key.to_bytes()) - } - fn save_retirement_block( - txn: &mut D::Transaction<'_>, - key: &<N::Curve as Ciphersuite>::G, - block: usize, - ) { - txn.put(Self::retirement_block_key(key), u64::try_from(block).unwrap().to_le_bytes()); - } - fn retirement_block<G: Get>(getter: &G, key: &<N::Curve as Ciphersuite>::G) -> Option<usize> { - getter - .get(Self::retirement_block_key(key)) - .map(|bytes| usize::try_from(u64::from_le_bytes(bytes.try_into().unwrap())).unwrap()) - } } -impl<N: Network, D: Db> ScannerHandle<N, D> { - /// Acknowledge having handled a block. - /// - /// Creates a lock over the Scanner, preventing its independent scanning operations until - /// released. - /// - /// This must only be called on blocks which have been scanned in-memory. - pub async fn ack_block( - &mut self, - txn: &mut D::Transaction<'_>, - id: <N::Block as Block<N>>::Id, - ) -> (bool, Vec<N::Output>) { - debug!("block {} acknowledged", hex::encode(&id)); - - let mut scanner = self.scanner.long_term_acquire().await; - - // Get the number for this block - let number = ScannerDb::<N, D>::block_number(txn, &id) - .expect("main loop trying to operate on data we haven't scanned"); - log::trace!("block {} was {number}", hex::encode(&id)); - - let outputs = ScannerDb::<N, D>::save_scanned_block(txn, number); - // This has a race condition if we try to ack a block we scanned on a prior boot, and we have - // yet to scan it on this boot - assert!(number <= scanner.ram_scanned.unwrap()); - for output in &outputs { - assert!(scanner.ram_outputs.remove(output.id().as_ref())); - } - - assert_eq!(scanner.need_ack.pop_front().unwrap(), number); - - self.held_scanner = Some(scanner); - - // Load the key from the DB, as it will have already been removed from RAM if retired - let key = ScannerDb::<N, D>::keys(txn)[0].1; - let is_retirement_block = ScannerDb::<N, D>::retirement_block(txn, &key) == Some(number); - if is_retirement_block { - ScannerDb::<N, D>::retire_key(txn); - } - (is_retirement_block, outputs) - } - - pub async fn register_eventuality( - &mut self, - key: &[u8], - block_number: usize, - id: [u8; 32], - eventuality: N::Eventuality, - ) { - let mut lock; - // We won't use held_scanner if we're re-registering on boot - (if let Some(scanner) = self.held_scanner.as_mut() { - scanner - } else { - lock = Some(self.scanner.write().await); - lock.as_mut().unwrap().as_mut().unwrap() - }) - .eventualities - .get_mut(key) - .unwrap() - .register(block_number, id, eventuality) - } - - pub async fn release_lock(&mut self) { - self.scanner.restore(self.held_scanner.take().unwrap()).await - } -} - -impl<N: Network, D: Db> Scanner<N, D> { - #[allow(clippy::type_complexity, clippy::new_ret_no_self)] - pub fn new( - network: N, - db: D, - ) -> (ScannerHandle<N, D>, Vec<(usize, <N::Curve as Ciphersuite>::G)>) { - let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel(); - - let keys = ScannerDb::<N, D>::keys(&db); - let mut eventualities = HashMap::new(); - for key in &keys { - eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new()); - } - } - - // An async function, to be spawned on a task, to discover and report outputs - async fn run( - mut db: D, - network: N, - scanner_hold: ScannerHold<N, D>, - mut multisig_completed: mpsc::UnboundedReceiver<bool>, - ) { - loop { - for block_being_scanned in (ram_scanned + 1) ..= latest_block_to_scan { - // Redo the checks for if we're too far ahead - { - let needing_ack = { - let scanner_lock = scanner_hold.read().await; - let scanner = scanner_lock.as_ref().unwrap(); - scanner.need_ack.front().copied() - }; - - if let Some(needing_ack) = needing_ack { - let limit = needing_ack + N::CONFIRMATIONS; - assert!(block_being_scanned <= limit); - if block_being_scanned == limit { - break; - } - } - } - - let Ok(block) = network.get_block(block_being_scanned).await else { - warn!("couldn't get block {block_being_scanned}"); - break; - }; - let block_id = block.id(); - - info!("scanning block: {} ({block_being_scanned})", hex::encode(&block_id)); - - // Scan new blocks - // TODO: This lock acquisition may be long-lived... - let mut scanner_lock = scanner_hold.write().await; - let scanner = scanner_lock.as_mut().unwrap(); - - let mut has_activation = false; - let mut outputs = vec![]; - let mut completion_block_numbers = vec![]; - for (activation_number, key) in scanner.keys.clone() { - if activation_number > block_being_scanned { - continue; - } - - if activation_number == block_being_scanned { - has_activation = true; - } - - for (id, (block_number, tx, completion)) in network - .get_eventuality_completions(scanner.eventualities.get_mut(&key_vec).unwrap(), &block) - .await - { - info!( - "eventuality {} resolved by {}, as found on chain", - hex::encode(id), - hex::encode(tx.as_ref()) - ); - - completion_block_numbers.push(block_number); - // This must be before the mission of ScannerEvent::Block, per commentary in mod.rs - if !scanner.emit(ScannerEvent::Completed( - key_vec.clone(), - block_number, - id, - tx, - completion, - )) { - return; - } - } - } - // Panic if we've already seen these outputs for output in &outputs { let id = output.id(); @@ -482,99 +304,4 @@ impl<N: Network, D: Db> Scanner<N, D> { } scanner.ram_outputs.insert(id); } - - // We could remove this, if instead of doing the first block which passed - // requirements + CONFIRMATIONS, we simply emitted an event for every block where - // `number % CONFIRMATIONS == 0` (once at the final stage for the existing multisig) - // There's no need at this point, yet the latter may be more suitable for modeling... - async fn check_multisig_completed<N: Network, D: Db>( - db: &mut D, - multisig_completed: &mut mpsc::UnboundedReceiver<bool>, - block_number: usize, - ) -> bool { - match multisig_completed.recv().await { - None => { - info!("Scanner handler was dropped. Shutting down?"); - false - } - Some(completed) => { - // Set the retirement block as block_number + CONFIRMATIONS - if completed { - let mut txn = db.txn(); - // The retiring key is the earliest one still around - let retiring_key = ScannerDb::<N, D>::keys(&txn)[0].1; - // This value is static w.r.t. the key - ScannerDb::<N, D>::save_retirement_block( - &mut txn, - &retiring_key, - block_number + N::CONFIRMATIONS, - ); - txn.commit(); - } - true - } - } - } - - drop(scanner_lock); - // Now that we've dropped the Scanner lock, we need to handle the multisig_completed - // channel before we decide if this block should be fired or not - // (holding the Scanner risks a deadlock) - for block_number in completion_block_numbers { - if !check_multisig_completed::<N, _>(&mut db, &mut multisig_completed, block_number).await - { - return; - }; - } - - // Reacquire the scanner - let mut scanner_lock = scanner_hold.write().await; - let scanner = scanner_lock.as_mut().unwrap(); - - // Only emit an event if any of the following is true: - // - This is an activation block - // - This is a retirement block - // - There's outputs - // as only those blocks are meaningful and warrant obtaining synchrony over - let is_retirement_block = - ScannerDb::<N, D>::retirement_block(&db, &scanner.keys[0].1) == Some(block_being_scanned); - let sent_block = if has_activation || is_retirement_block || (!outputs.is_empty()) { - // Save the outputs to disk - let mut txn = db.txn(); - ScannerDb::<N, D>::save_outputs(&mut txn, &block_id, &outputs); - txn.commit(); - - // Send all outputs - if !scanner.emit(ScannerEvent::Block { is_retirement_block, block: block_id, outputs }) { - return; - } - - // Since we're creating a Batch, mark it as needing ack - scanner.need_ack.push_back(block_being_scanned); - true - } else { - false - }; - - // Remove it from memory - if is_retirement_block { - let retired = scanner.keys.remove(0).1; - scanner.eventualities.remove(retired.to_bytes().as_ref()); - } - drop(scanner_lock); - // If we sent a Block event, once again check multisig_completed - if sent_block && - (!check_multisig_completed::<N, _>( - &mut db, - &mut multisig_completed, - block_being_scanned, - ) - .await) - { - return; - } - } - } - } -} */ diff --git a/processor/scanner/src/scan.rs b/processor/scanner/src/scan.rs index 8617ec18..f176680e 100644 --- a/processor/scanner/src/scan.rs +++ b/processor/scanner/src/scan.rs @@ -245,7 +245,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> { // Send the scan data to the eventuality task ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data); - // Send the in instructions to the report task + // Send the InInstructions to the report task ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions); // Update the next to scan block ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);