Logs, documentation, misc

This commit is contained in:
Luke Parker 2024-08-27 16:43:50 -04:00
parent 9cebdf7c68
commit a771fbe1c6
4 changed files with 69 additions and 308 deletions

View file

@ -82,25 +82,34 @@ create_db!(
pub(crate) struct ScannerDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ScannerDb<S> {
// activation_block_number is inclusive, so the key will be scanned for starting at the specified
// block
/// Queue a key.
///
/// Keys may be queued whenever, so long as they're scheduled to activate `WINDOW_LENGTH` blocks
/// after the next block acknowledged after they've been set. There is no requirement that any
/// prior keys have had their processing completed (meaning what should be a length-2 vector may
/// be a length-n vector).
///
/// A new key MUST NOT be queued to activate a block preceding the finishing of the key prior to
/// its prior. There MUST only be two keys active at one time.
///
/// activation_block_number is inclusive, so the key will be scanned for starting at the
/// specified block.
pub(crate) fn queue_key(txn: &mut impl DbTxn, activation_block_number: u64, key: KeyFor<S>) {
// Set this block as notable
NotableBlock::set(txn, activation_block_number, &());
// TODO: Panic if we've ever seen this key before
// Push the key
let mut keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> = ActiveKeys::get(txn).unwrap_or(vec![]);
for key_i in &keys {
if key == key_i.key.0 {
panic!("queueing a key prior queued");
}
}
keys.push(SeraiKeyDbEntry { activation_block_number, key: BorshG(key) });
ActiveKeys::set(txn, &keys);
}
/// Retire a key.
///
/// The key retired must be the oldest key. There must be another key actively tracked.
// TODO: This will be called from the Eventuality task yet this field is read by the scan task
// We need to write the argument for its safety
// TODO: retire_key needs to set the notable block
pub(crate) fn retire_key(txn: &mut impl DbTxn, key: KeyFor<S>) {
let mut keys: Vec<SeraiKeyDbEntry<BorshG<KeyFor<S>>>> =
ActiveKeys::get(txn).expect("retiring key yet no active keys");
@ -110,6 +119,9 @@ impl<S: ScannerFeed> ScannerDb<S> {
keys.remove(0);
ActiveKeys::set(txn, &keys);
}
/// Fetch the active keys, as of the next-to-scan-for-outputs Block.
///
/// This means the scan task should scan for all keys returned by this.
pub(crate) fn active_keys_as_of_next_to_scan_for_outputs_block(
getter: &impl Get,
) -> Option<Vec<SeraiKey<KeyFor<S>>>> {
@ -131,7 +143,7 @@ impl<S: ScannerFeed> ScannerDb<S> {
);
keys.push(SeraiKey { key: raw_keys[i].key.0, stage, block_at_which_reporting_starts });
}
assert!(keys.len() <= 2);
assert!(keys.len() <= 2, "more than two keys active");
Some(keys)
}
@ -152,7 +164,6 @@ impl<S: ScannerFeed> ScannerDb<S> {
// We can only scan up to whatever block we've checked the Eventualities of, plus the window
// length. Since this returns an inclusive bound, we need to subtract 1
// See `eventuality.rs` for more info
// TODO: Adjust based on register eventualities
NextToCheckForEventualitiesBlock::get(getter).map(|b| b + S::WINDOW_LENGTH - 1)
}

View file

@ -117,7 +117,7 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
let block = self.feed.block_by_number(&self.db, b).await?;
log::info!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
log::debug!("checking eventuality completions in block: {} ({b})", hex::encode(block.id()));
/*
This is proper as the keys for the next to scan block (at most `WINDOW_LENGTH` ahead,
@ -147,13 +147,21 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
let mut outputs = received_external_outputs;
for key in keys {
let completed_eventualities = {
let (eventualities_is_empty, completed_eventualities) = {
let mut eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
let completed_eventualities = block.check_for_eventuality_resolutions(&mut eventualities);
EventualityDb::<S>::set_eventualities(&mut txn, key.key, &eventualities);
completed_eventualities
(eventualities.active_eventualities.is_empty(), completed_eventualities)
};
for (tx, completed_eventuality) in completed_eventualities {
log::info!(
"eventuality {} resolved by {}",
hex::encode(completed_eventuality.id()),
hex::encode(tx.as_ref())
);
}
// Fetch all non-External outputs
let mut non_external_outputs = block.scan_for_outputs(key.key);
non_external_outputs.retain(|output| output.kind() != OutputType::External);
@ -213,7 +221,6 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
outputs.extend(non_external_outputs);
}
// TODO: This also has to intake Burns
let mut scheduler_update = SchedulerUpdate { outputs, forwards, returns };
scheduler_update.outputs.sort_by(sort_outputs);
scheduler_update.forwards.sort_by(sort_outputs);
@ -234,6 +241,22 @@ impl<D: Db, S: ScannerFeed, Sch: Scheduler<S>> ContinuallyRan for EventualityTas
EventualityDb::<S>::set_eventualities(&mut txn, key, &eventualities);
}
for key in keys {
if key.stage == LifetimeStage::Finishing {
let eventualities = EventualityDb::<S>::eventualities(&txn, key.key);
if eventualities.active_eventualities.is_empty() {
log::info!(
"key {} has finished and is being retired",
hex::encode(key.key.to_bytes().as_ref())
);
ScannerDb::<S>::flag_notable(&mut txn, b + S::WINDOW_LENGTH);
// TODO: Retire the key
todo!("TODO")
}
}
}
// Update the next to check block
ScannerDb::<S>::set_next_to_check_for_eventualities_block(&mut txn, next_to_check);
txn.commit();

View file

@ -196,38 +196,34 @@ impl<S: ScannerFeed> Scanner<S> {
///
/// This means this block was ordered on Serai in relation to `Burn` events, and all validators
/// have achieved synchrony on it.
// TODO: If we're acknowledging block `b`, the Eventuality task was already eligible to check it
// for Eventualities. We need this to block until the Eventuality task has actually checked it.
// TODO: Does the prior TODO hold with how the callback is now handled?
pub fn acknowledge_block(
&mut self,
txn: &mut impl DbTxn,
block_number: u64,
key_to_activate: Option<()>,
) -> Vec<OutputFor<S>> {
key_to_activate: Option<KeyFor<S>>,
) {
log::info!("acknowledging block {block_number}");
assert!(
ScannerDb::<S>::is_block_notable(txn, block_number),
"acknowledging a block which wasn't notable"
);
ScannerDb::<S>::set_highest_acknowledged_block(txn, block_number);
ScannerDb::<S>::queue_key(txn, block_number + S::WINDOW_LENGTH);
}
/// Queue Burns.
///
/// The scanner only updates the scheduler with new outputs upon acknowledging a block. We can
/// safely queue Burns so long as they're only actually added once we've handled the outputs from
/// the block acknowledged prior to their queueing.
pub fn queue_burns(&mut self, txn: &mut impl DbTxn, burns: Vec<()>) {
let queue_as_of = ScannerDb::<S>::highest_acknowledged_block(txn)
.expect("queueing Burns yet never acknowledged a block");
todo!("TODO")
}
}
/*
#[derive(Clone, Debug)]
pub enum ScannerEvent<N: Network> {
// Block scanned
Block {
is_retirement_block: bool,
block: <N::Block as Block<N>>::Id,
outputs: Vec<N::Output>,
},
// Eventuality completion found on-chain
// TODO: Move this from a tuple
Completed(
Vec<u8>,
usize,
[u8; 32],
<N::Transaction as Transaction<N>>::Id,
<N::Eventuality as Eventuality>::Completion,
),
}
#[derive(Clone, Debug)]
struct ScannerDb<N: Network, D: Db>(PhantomData<N>, PhantomData<D>);
impl<N: Network, D: Db> ScannerDb<N, D> {
@ -258,182 +254,8 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
.get(Self::scanned_block_key())
.map(|bytes| u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap())
}
fn retirement_block_key(key: &<N::Curve as Ciphersuite>::G) -> Vec<u8> {
Self::scanner_key(b"retirement_block", key.to_bytes())
}
fn save_retirement_block(
txn: &mut D::Transaction<'_>,
key: &<N::Curve as Ciphersuite>::G,
block: usize,
) {
txn.put(Self::retirement_block_key(key), u64::try_from(block).unwrap().to_le_bytes());
}
fn retirement_block<G: Get>(getter: &G, key: &<N::Curve as Ciphersuite>::G) -> Option<usize> {
getter
.get(Self::retirement_block_key(key))
.map(|bytes| usize::try_from(u64::from_le_bytes(bytes.try_into().unwrap())).unwrap())
}
}
impl<N: Network, D: Db> ScannerHandle<N, D> {
/// Acknowledge having handled a block.
///
/// Creates a lock over the Scanner, preventing its independent scanning operations until
/// released.
///
/// This must only be called on blocks which have been scanned in-memory.
pub async fn ack_block(
&mut self,
txn: &mut D::Transaction<'_>,
id: <N::Block as Block<N>>::Id,
) -> (bool, Vec<N::Output>) {
debug!("block {} acknowledged", hex::encode(&id));
let mut scanner = self.scanner.long_term_acquire().await;
// Get the number for this block
let number = ScannerDb::<N, D>::block_number(txn, &id)
.expect("main loop trying to operate on data we haven't scanned");
log::trace!("block {} was {number}", hex::encode(&id));
let outputs = ScannerDb::<N, D>::save_scanned_block(txn, number);
// This has a race condition if we try to ack a block we scanned on a prior boot, and we have
// yet to scan it on this boot
assert!(number <= scanner.ram_scanned.unwrap());
for output in &outputs {
assert!(scanner.ram_outputs.remove(output.id().as_ref()));
}
assert_eq!(scanner.need_ack.pop_front().unwrap(), number);
self.held_scanner = Some(scanner);
// Load the key from the DB, as it will have already been removed from RAM if retired
let key = ScannerDb::<N, D>::keys(txn)[0].1;
let is_retirement_block = ScannerDb::<N, D>::retirement_block(txn, &key) == Some(number);
if is_retirement_block {
ScannerDb::<N, D>::retire_key(txn);
}
(is_retirement_block, outputs)
}
pub async fn register_eventuality(
&mut self,
key: &[u8],
block_number: usize,
id: [u8; 32],
eventuality: N::Eventuality,
) {
let mut lock;
// We won't use held_scanner if we're re-registering on boot
(if let Some(scanner) = self.held_scanner.as_mut() {
scanner
} else {
lock = Some(self.scanner.write().await);
lock.as_mut().unwrap().as_mut().unwrap()
})
.eventualities
.get_mut(key)
.unwrap()
.register(block_number, id, eventuality)
}
pub async fn release_lock(&mut self) {
self.scanner.restore(self.held_scanner.take().unwrap()).await
}
}
impl<N: Network, D: Db> Scanner<N, D> {
#[allow(clippy::type_complexity, clippy::new_ret_no_self)]
pub fn new(
network: N,
db: D,
) -> (ScannerHandle<N, D>, Vec<(usize, <N::Curve as Ciphersuite>::G)>) {
let (multisig_completed_send, multisig_completed_recv) = mpsc::unbounded_channel();
let keys = ScannerDb::<N, D>::keys(&db);
let mut eventualities = HashMap::new();
for key in &keys {
eventualities.insert(key.1.to_bytes().as_ref().to_vec(), EventualitiesTracker::new());
}
}
// An async function, to be spawned on a task, to discover and report outputs
async fn run(
mut db: D,
network: N,
scanner_hold: ScannerHold<N, D>,
mut multisig_completed: mpsc::UnboundedReceiver<bool>,
) {
loop {
for block_being_scanned in (ram_scanned + 1) ..= latest_block_to_scan {
// Redo the checks for if we're too far ahead
{
let needing_ack = {
let scanner_lock = scanner_hold.read().await;
let scanner = scanner_lock.as_ref().unwrap();
scanner.need_ack.front().copied()
};
if let Some(needing_ack) = needing_ack {
let limit = needing_ack + N::CONFIRMATIONS;
assert!(block_being_scanned <= limit);
if block_being_scanned == limit {
break;
}
}
}
let Ok(block) = network.get_block(block_being_scanned).await else {
warn!("couldn't get block {block_being_scanned}");
break;
};
let block_id = block.id();
info!("scanning block: {} ({block_being_scanned})", hex::encode(&block_id));
// Scan new blocks
// TODO: This lock acquisition may be long-lived...
let mut scanner_lock = scanner_hold.write().await;
let scanner = scanner_lock.as_mut().unwrap();
let mut has_activation = false;
let mut outputs = vec![];
let mut completion_block_numbers = vec![];
for (activation_number, key) in scanner.keys.clone() {
if activation_number > block_being_scanned {
continue;
}
if activation_number == block_being_scanned {
has_activation = true;
}
for (id, (block_number, tx, completion)) in network
.get_eventuality_completions(scanner.eventualities.get_mut(&key_vec).unwrap(), &block)
.await
{
info!(
"eventuality {} resolved by {}, as found on chain",
hex::encode(id),
hex::encode(tx.as_ref())
);
completion_block_numbers.push(block_number);
// This must be before the mission of ScannerEvent::Block, per commentary in mod.rs
if !scanner.emit(ScannerEvent::Completed(
key_vec.clone(),
block_number,
id,
tx,
completion,
)) {
return;
}
}
}
// Panic if we've already seen these outputs
for output in &outputs {
let id = output.id();
@ -482,99 +304,4 @@ impl<N: Network, D: Db> Scanner<N, D> {
}
scanner.ram_outputs.insert(id);
}
// We could remove this, if instead of doing the first block which passed
// requirements + CONFIRMATIONS, we simply emitted an event for every block where
// `number % CONFIRMATIONS == 0` (once at the final stage for the existing multisig)
// There's no need at this point, yet the latter may be more suitable for modeling...
async fn check_multisig_completed<N: Network, D: Db>(
db: &mut D,
multisig_completed: &mut mpsc::UnboundedReceiver<bool>,
block_number: usize,
) -> bool {
match multisig_completed.recv().await {
None => {
info!("Scanner handler was dropped. Shutting down?");
false
}
Some(completed) => {
// Set the retirement block as block_number + CONFIRMATIONS
if completed {
let mut txn = db.txn();
// The retiring key is the earliest one still around
let retiring_key = ScannerDb::<N, D>::keys(&txn)[0].1;
// This value is static w.r.t. the key
ScannerDb::<N, D>::save_retirement_block(
&mut txn,
&retiring_key,
block_number + N::CONFIRMATIONS,
);
txn.commit();
}
true
}
}
}
drop(scanner_lock);
// Now that we've dropped the Scanner lock, we need to handle the multisig_completed
// channel before we decide if this block should be fired or not
// (holding the Scanner risks a deadlock)
for block_number in completion_block_numbers {
if !check_multisig_completed::<N, _>(&mut db, &mut multisig_completed, block_number).await
{
return;
};
}
// Reacquire the scanner
let mut scanner_lock = scanner_hold.write().await;
let scanner = scanner_lock.as_mut().unwrap();
// Only emit an event if any of the following is true:
// - This is an activation block
// - This is a retirement block
// - There's outputs
// as only those blocks are meaningful and warrant obtaining synchrony over
let is_retirement_block =
ScannerDb::<N, D>::retirement_block(&db, &scanner.keys[0].1) == Some(block_being_scanned);
let sent_block = if has_activation || is_retirement_block || (!outputs.is_empty()) {
// Save the outputs to disk
let mut txn = db.txn();
ScannerDb::<N, D>::save_outputs(&mut txn, &block_id, &outputs);
txn.commit();
// Send all outputs
if !scanner.emit(ScannerEvent::Block { is_retirement_block, block: block_id, outputs }) {
return;
}
// Since we're creating a Batch, mark it as needing ack
scanner.need_ack.push_back(block_being_scanned);
true
} else {
false
};
// Remove it from memory
if is_retirement_block {
let retired = scanner.keys.remove(0).1;
scanner.eventualities.remove(retired.to_bytes().as_ref());
}
drop(scanner_lock);
// If we sent a Block event, once again check multisig_completed
if sent_block &&
(!check_multisig_completed::<N, _>(
&mut db,
&mut multisig_completed,
block_being_scanned,
)
.await)
{
return;
}
}
}
}
}
*/

View file

@ -245,7 +245,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanForOutputsTask<D, S> {
// Send the scan data to the eventuality task
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);
// Send the in instructions to the report task
// Send the InInstructions to the report task
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions);
// Update the next to scan block
ScannerDb::<S>::set_next_to_scan_for_outputs_block(&mut txn, b + 1);