From 76cbe6cf1edcba3967c65d31769abcc04ed54dfe Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 30 Aug 2024 01:19:29 -0400 Subject: [PATCH] Have acknowledge_block take in the results of the InInstructions executed If any failed, the scanner now creates a Burn for the return. --- processor/primitives/src/output.rs | 2 +- processor/scanner/src/db.rs | 59 +++++++++++++++++++++++----- processor/scanner/src/lib.rs | 43 ++++++++++++++++++-- processor/scanner/src/report/db.rs | 61 ++++++++++++++++++++++++++++- processor/scanner/src/report/mod.rs | 51 ++++++++++++++++++------ processor/scanner/src/scan/mod.rs | 18 +++++++-- 6 files changed, 203 insertions(+), 31 deletions(-) diff --git a/processor/primitives/src/output.rs b/processor/primitives/src/output.rs index 777b2c52..9a300940 100644 --- a/processor/primitives/src/output.rs +++ b/processor/primitives/src/output.rs @@ -8,7 +8,7 @@ use serai_primitives::{ExternalAddress, Balance}; use crate::Id; /// An address on the external network. -pub trait Address: Send + Sync + TryFrom { +pub trait Address: Send + Sync + Into + TryFrom { /// Write this address. fn write(&self, writer: &mut impl io::Write) -> io::Result<()>; /// Read an address. diff --git a/processor/scanner/src/db.rs b/processor/scanner/src/db.rs index cc86afeb..f45d2966 100644 --- a/processor/scanner/src/db.rs +++ b/processor/scanner/src/db.rs @@ -47,11 +47,7 @@ impl OutputWithInInstruction { let mut opt = [0xff]; reader.read_exact(&mut opt)?; assert!((opt[0] == 0) || (opt[0] == 1)); - if opt[0] == 0 { - None - } else { - Some(AddressFor::::read(reader)?) - } + (opt[0] == 1).then(|| AddressFor::::read(reader)).transpose()? }; let in_instruction = InInstructionWithBalance::decode(&mut IoReader(reader)).map_err(io::Error::other)?; @@ -422,10 +418,39 @@ impl ScanToEventualityDb { } } +pub(crate) struct Returnable { + pub(crate) return_address: Option>, + pub(crate) in_instruction: InInstructionWithBalance, +} + +impl Returnable { + fn read(reader: &mut impl io::Read) -> io::Result { + let mut opt = [0xff]; + reader.read_exact(&mut opt).unwrap(); + assert!((opt[0] == 0) || (opt[0] == 1)); + + let return_address = (opt[0] == 1).then(|| AddressFor::::read(reader)).transpose()?; + + let in_instruction = + InInstructionWithBalance::decode(&mut IoReader(reader)).map_err(io::Error::other)?; + Ok(Returnable { return_address, in_instruction }) + } + fn write(&self, writer: &mut impl io::Write) -> io::Result<()> { + if let Some(return_address) = &self.return_address { + writer.write_all(&[1])?; + return_address.write(writer)?; + } else { + writer.write_all(&[0])?; + } + self.in_instruction.encode_to(writer); + Ok(()) + } +} + #[derive(BorshSerialize, BorshDeserialize)] struct BlockBoundInInstructions { block_number: u64, - in_instructions: Vec, + returnable_in_instructions: Vec, } db_channel! { @@ -439,22 +464,36 @@ impl ScanToReportDb { pub(crate) fn send_in_instructions( txn: &mut impl DbTxn, block_number: u64, - in_instructions: Vec, + returnable_in_instructions: &[Returnable], ) { - InInstructions::send(txn, (), &BlockBoundInInstructions { block_number, in_instructions }); + let mut buf = vec![]; + for returnable_in_instruction in returnable_in_instructions { + returnable_in_instruction.write(&mut buf).unwrap(); + } + InInstructions::send( + txn, + (), + &BlockBoundInInstructions { block_number, returnable_in_instructions: buf }, + ); } pub(crate) fn recv_in_instructions( txn: &mut impl DbTxn, block_number: u64, - ) -> Vec { + ) -> Vec> { let data = InInstructions::try_recv(txn, ()) .expect("receiving InInstructions for a scanned block not yet sent"); assert_eq!( block_number, data.block_number, "received InInstructions for a scanned block distinct than expected" ); - data.in_instructions + let mut buf = data.returnable_in_instructions.as_slice(); + + let mut returnable_in_instructions = vec![]; + while !buf.is_empty() { + returnable_in_instructions.push(Returnable::read(&mut buf).unwrap()); + } + returnable_in_instructions } } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 927fc145..93ed961d 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, Db}; use serai_primitives::{NetworkId, Coin, Amount}; use serai_in_instructions_primitives::Batch; -use serai_coins_primitives::OutInstructionWithBalance; +use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance}; use primitives::{task::*, Address, ReceivedOutput, Block}; @@ -327,6 +327,8 @@ impl Scanner { &mut self, mut txn: impl DbTxn, block_number: u64, + batch_id: u32, + in_instruction_succeededs: Vec, key_to_activate: Option>, ) { log::info!("acknowledging block {block_number}"); @@ -338,8 +340,12 @@ impl Scanner { if let Some(prior_highest_acknowledged_block) = ScannerGlobalDb::::highest_acknowledged_block(&txn) { - assert!(block_number > prior_highest_acknowledged_block, "acknowledging blocks out-of-order"); - for b in (prior_highest_acknowledged_block + 1) .. (block_number - 1) { + // If a single block produced multiple Batches, the block number won't increment + assert!( + block_number >= prior_highest_acknowledged_block, + "acknowledging blocks out-of-order" + ); + for b in (prior_highest_acknowledged_block + 1) .. block_number { assert!( !ScannerGlobalDb::::is_block_notable(&txn, b), "skipped acknowledging a block which was notable" @@ -352,6 +358,37 @@ impl Scanner { ScannerGlobalDb::::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate); } + // Return the balances for any InInstructions which failed to execute + { + let return_information = report::take_return_information::(&mut txn, batch_id) + .expect("didn't save the return information for Batch we published"); + assert_eq!( + in_instruction_succeededs.len(), + return_information.len(), + "amount of InInstruction succeededs differed from amount of return information saved" + ); + + // We map these into standard Burns + let mut returns = vec![]; + for (succeeded, return_information) in + in_instruction_succeededs.into_iter().zip(return_information) + { + if succeeded { + continue; + } + + if let Some(report::ReturnInformation { address, balance }) = return_information { + returns.push(OutInstructionWithBalance { + instruction: OutInstruction { address: address.into(), data: None }, + balance, + }); + } + } + // We send them as stemming from this block + // TODO: These should be handled with any Burns from this block + SubstrateToEventualityDb::send_burns(&mut txn, block_number, &returns); + } + // Commit the txn txn.commit(); // Run the Eventuality task since we've advanced it diff --git a/processor/scanner/src/report/db.rs b/processor/scanner/src/report/db.rs index 2fd98d4b..4c96a360 100644 --- a/processor/scanner/src/report/db.rs +++ b/processor/scanner/src/report/db.rs @@ -1,16 +1,34 @@ +use core::marker::PhantomData; +use std::io::{Read, Write}; + +use scale::{Encode, Decode, IoReader}; use serai_db::{Get, DbTxn, create_db}; +use serai_primitives::Balance; + +use primitives::Address; + +use crate::{ScannerFeed, AddressFor}; + create_db!( ScannerReport { // The next block to potentially report NextToPotentiallyReportBlock: () -> u64, // The next Batch ID to use NextBatchId: () -> u32, + + // The return addresses for the InInstructions within a Batch + SerializedReturnAddresses: (batch: u32) -> Vec, } ); -pub(crate) struct ReportDb; -impl ReportDb { +pub(crate) struct ReturnInformation { + pub(crate) address: AddressFor, + pub(crate) balance: Balance, +} + +pub(crate) struct ReportDb(PhantomData); +impl ReportDb { pub(crate) fn set_next_to_potentially_report_block( txn: &mut impl DbTxn, next_to_potentially_report_block: u64, @@ -26,4 +44,43 @@ impl ReportDb { NextBatchId::set(txn, &(id + 1)); id } + + pub(crate) fn save_return_information( + txn: &mut impl DbTxn, + id: u32, + return_information: &Vec>>, + ) { + let mut buf = Vec::with_capacity(return_information.len() * (32 + 1 + 8)); + for return_information in return_information { + if let Some(ReturnInformation { address, balance }) = return_information { + buf.write_all(&[1]).unwrap(); + address.write(&mut buf).unwrap(); + balance.encode_to(&mut buf); + } else { + buf.write_all(&[0]).unwrap(); + } + } + SerializedReturnAddresses::set(txn, id, &buf); + } + pub(crate) fn take_return_information( + txn: &mut impl DbTxn, + id: u32, + ) -> Option>>> { + let buf = SerializedReturnAddresses::get(txn, id)?; + let mut buf = buf.as_slice(); + + let mut res = Vec::with_capacity(buf.len() / (32 + 1 + 8)); + while !buf.is_empty() { + let mut opt = [0xff]; + buf.read_exact(&mut opt).unwrap(); + assert!((opt[0] == 0) || (opt[0] == 1)); + + res.push((opt[0] == 1).then(|| { + let address = AddressFor::::read(&mut buf).unwrap(); + let balance = Balance::decode(&mut IoReader(&mut buf)).unwrap(); + ReturnInformation { address, balance } + })); + } + Some(res) + } } diff --git a/processor/scanner/src/report/mod.rs b/processor/scanner/src/report/mod.rs index b789ea58..8ac2c06b 100644 --- a/processor/scanner/src/report/mod.rs +++ b/processor/scanner/src/report/mod.rs @@ -8,15 +8,23 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch}; use primitives::task::ContinuallyRan; use crate::{ - db::{ScannerGlobalDb, ScanToReportDb}, + db::{Returnable, ScannerGlobalDb, ScanToReportDb}, index, scan::next_to_scan_for_outputs_block, ScannerFeed, BatchPublisher, }; mod db; +pub(crate) use db::ReturnInformation; use db::ReportDb; +pub(crate) fn take_return_information( + txn: &mut impl DbTxn, + id: u32, +) -> Option>>> { + ReportDb::::take_return_information(txn, id) +} + /* This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. @@ -33,10 +41,10 @@ pub(crate) struct ReportTask { impl ReportTask { pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self { - if ReportDb::next_to_potentially_report_block(&db).is_none() { + if ReportDb::::next_to_potentially_report_block(&db).is_none() { // Initialize the DB let mut txn = db.txn(); - ReportDb::set_next_to_potentially_report_block(&mut txn, start_block); + ReportDb::::set_next_to_potentially_report_block(&mut txn, start_block); txn.commit(); } @@ -64,7 +72,7 @@ impl ContinuallyRan for ReportTask::next_to_potentially_report_block(&self.db) .expect("ReportTask run before writing the start block"); for b in next_to_potentially_report ..= highest_reportable { @@ -81,32 +89,53 @@ impl ContinuallyRan for ReportTask::acquire_batch_id(&mut txn); // start with empty batch let mut batches = vec![Batch { network, id: batch_id, block: BlockHash(block_hash), instructions: vec![] }]; + // We also track the return information for the InInstructions within a Batch in case they + // error + let mut return_information = vec![vec![]]; + + for Returnable { return_address, in_instruction } in in_instructions { + let balance = in_instruction.balance; - for instruction in in_instructions { let batch = batches.last_mut().unwrap(); - batch.instructions.push(instruction); + batch.instructions.push(in_instruction); // check if batch is over-size if batch.encode().len() > MAX_BATCH_SIZE { // pop the last instruction so it's back in size - let instruction = batch.instructions.pop().unwrap(); + let in_instruction = batch.instructions.pop().unwrap(); // bump the id for the new batch - batch_id = ReportDb::acquire_batch_id(&mut txn); + batch_id = ReportDb::::acquire_batch_id(&mut txn); // make a new batch with this instruction included batches.push(Batch { network, id: batch_id, block: BlockHash(block_hash), - instructions: vec![instruction], + instructions: vec![in_instruction], }); + // Since we're allocating a new batch, allocate a new set of return addresses for it + return_information.push(vec![]); } + + // For the set of return addresses for the InInstructions for the batch we just pushed + // onto, push this InInstruction's return addresses + return_information + .last_mut() + .unwrap() + .push(return_address.map(|address| ReturnInformation { address, balance })); + } + + // Save the return addresses to the databse + assert_eq!(batches.len(), return_information.len()); + for (batch, return_information) in batches.iter().zip(&return_information) { + assert_eq!(batch.instructions.len(), return_information.len()); + ReportDb::::save_return_information(&mut txn, batch.id, return_information); } for batch in batches { @@ -119,7 +148,7 @@ impl ContinuallyRan for ReportTask::set_next_to_potentially_report_block(&mut txn, b + 1); txn.commit(); } diff --git a/processor/scanner/src/scan/mod.rs b/processor/scanner/src/scan/mod.rs index 405861ba..4d6ca16e 100644 --- a/processor/scanner/src/scan/mod.rs +++ b/processor/scanner/src/scan/mod.rs @@ -13,7 +13,8 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block}; use crate::{ lifetime::LifetimeStage, db::{ - OutputWithInInstruction, SenderScanData, ScannerGlobalDb, ScanToReportDb, ScanToEventualityDb, + OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, ScanToReportDb, + ScanToEventualityDb, }, BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs, eventuality::latest_scannable_block, @@ -149,7 +150,13 @@ impl ContinuallyRan for ScanTask { queued_outputs }; for queued_output in queued_outputs { - in_instructions.push((queued_output.output.id(), queued_output.in_instruction)); + in_instructions.push(( + queued_output.output.id(), + Returnable { + return_address: queued_output.return_address, + in_instruction: queued_output.in_instruction, + }, + )); scan_data.received_external_outputs.push(queued_output.output); } @@ -302,7 +309,10 @@ impl ContinuallyRan for ScanTask { in_instructions.push(( output_with_in_instruction.output.id(), - output_with_in_instruction.in_instruction, + Returnable { + return_address: output_with_in_instruction.return_address, + in_instruction: output_with_in_instruction.in_instruction, + }, )); scan_data.received_external_outputs.push(output_with_in_instruction.output); } @@ -329,7 +339,7 @@ impl ContinuallyRan for ScanTask { let in_instructions = in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::>(); // Send the InInstructions to the report task - ScanToReportDb::::send_in_instructions(&mut txn, b, in_instructions); + ScanToReportDb::::send_in_instructions(&mut txn, b, &in_instructions); // Send the scan data to the eventuality task ScanToEventualityDb::::send_scan_data(&mut txn, b, &scan_data);