Have acknowledge_block take in the results of the InInstructions executed

If any failed, the scanner now creates a Burn for the return.
This commit is contained in:
Luke Parker 2024-08-30 01:19:29 -04:00
parent 5999f5d65a
commit 76cbe6cf1e
6 changed files with 203 additions and 31 deletions

View file

@ -8,7 +8,7 @@ use serai_primitives::{ExternalAddress, Balance};
use crate::Id; use crate::Id;
/// An address on the external network. /// An address on the external network.
pub trait Address: Send + Sync + TryFrom<ExternalAddress> { pub trait Address: Send + Sync + Into<ExternalAddress> + TryFrom<ExternalAddress> {
/// Write this address. /// Write this address.
fn write(&self, writer: &mut impl io::Write) -> io::Result<()>; fn write(&self, writer: &mut impl io::Write) -> io::Result<()>;
/// Read an address. /// Read an address.

View file

@ -47,11 +47,7 @@ impl<S: ScannerFeed> OutputWithInInstruction<S> {
let mut opt = [0xff]; let mut opt = [0xff];
reader.read_exact(&mut opt)?; reader.read_exact(&mut opt)?;
assert!((opt[0] == 0) || (opt[0] == 1)); assert!((opt[0] == 0) || (opt[0] == 1));
if opt[0] == 0 { (opt[0] == 1).then(|| AddressFor::<S>::read(reader)).transpose()?
None
} else {
Some(AddressFor::<S>::read(reader)?)
}
}; };
let in_instruction = let in_instruction =
InInstructionWithBalance::decode(&mut IoReader(reader)).map_err(io::Error::other)?; InInstructionWithBalance::decode(&mut IoReader(reader)).map_err(io::Error::other)?;
@ -422,10 +418,39 @@ impl<S: ScannerFeed> ScanToEventualityDb<S> {
} }
} }
pub(crate) struct Returnable<S: ScannerFeed> {
pub(crate) return_address: Option<AddressFor<S>>,
pub(crate) in_instruction: InInstructionWithBalance,
}
impl<S: ScannerFeed> Returnable<S> {
fn read(reader: &mut impl io::Read) -> io::Result<Self> {
let mut opt = [0xff];
reader.read_exact(&mut opt).unwrap();
assert!((opt[0] == 0) || (opt[0] == 1));
let return_address = (opt[0] == 1).then(|| AddressFor::<S>::read(reader)).transpose()?;
let in_instruction =
InInstructionWithBalance::decode(&mut IoReader(reader)).map_err(io::Error::other)?;
Ok(Returnable { return_address, in_instruction })
}
fn write(&self, writer: &mut impl io::Write) -> io::Result<()> {
if let Some(return_address) = &self.return_address {
writer.write_all(&[1])?;
return_address.write(writer)?;
} else {
writer.write_all(&[0])?;
}
self.in_instruction.encode_to(writer);
Ok(())
}
}
#[derive(BorshSerialize, BorshDeserialize)] #[derive(BorshSerialize, BorshDeserialize)]
struct BlockBoundInInstructions { struct BlockBoundInInstructions {
block_number: u64, block_number: u64,
in_instructions: Vec<InInstructionWithBalance>, returnable_in_instructions: Vec<u8>,
} }
db_channel! { db_channel! {
@ -439,22 +464,36 @@ impl<S: ScannerFeed> ScanToReportDb<S> {
pub(crate) fn send_in_instructions( pub(crate) fn send_in_instructions(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block_number: u64, block_number: u64,
in_instructions: Vec<InInstructionWithBalance>, returnable_in_instructions: &[Returnable<S>],
) { ) {
InInstructions::send(txn, (), &BlockBoundInInstructions { block_number, in_instructions }); let mut buf = vec![];
for returnable_in_instruction in returnable_in_instructions {
returnable_in_instruction.write(&mut buf).unwrap();
}
InInstructions::send(
txn,
(),
&BlockBoundInInstructions { block_number, returnable_in_instructions: buf },
);
} }
pub(crate) fn recv_in_instructions( pub(crate) fn recv_in_instructions(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
block_number: u64, block_number: u64,
) -> Vec<InInstructionWithBalance> { ) -> Vec<Returnable<S>> {
let data = InInstructions::try_recv(txn, ()) let data = InInstructions::try_recv(txn, ())
.expect("receiving InInstructions for a scanned block not yet sent"); .expect("receiving InInstructions for a scanned block not yet sent");
assert_eq!( assert_eq!(
block_number, data.block_number, block_number, data.block_number,
"received InInstructions for a scanned block distinct than expected" "received InInstructions for a scanned block distinct than expected"
); );
data.in_instructions let mut buf = data.returnable_in_instructions.as_slice();
let mut returnable_in_instructions = vec![];
while !buf.is_empty() {
returnable_in_instructions.push(Returnable::read(&mut buf).unwrap());
}
returnable_in_instructions
} }
} }

View file

@ -7,7 +7,7 @@ use serai_db::{Get, DbTxn, Db};
use serai_primitives::{NetworkId, Coin, Amount}; use serai_primitives::{NetworkId, Coin, Amount};
use serai_in_instructions_primitives::Batch; use serai_in_instructions_primitives::Batch;
use serai_coins_primitives::OutInstructionWithBalance; use serai_coins_primitives::{OutInstruction, OutInstructionWithBalance};
use primitives::{task::*, Address, ReceivedOutput, Block}; use primitives::{task::*, Address, ReceivedOutput, Block};
@ -327,6 +327,8 @@ impl<S: ScannerFeed> Scanner<S> {
&mut self, &mut self,
mut txn: impl DbTxn, mut txn: impl DbTxn,
block_number: u64, block_number: u64,
batch_id: u32,
in_instruction_succeededs: Vec<bool>,
key_to_activate: Option<KeyFor<S>>, key_to_activate: Option<KeyFor<S>>,
) { ) {
log::info!("acknowledging block {block_number}"); log::info!("acknowledging block {block_number}");
@ -338,8 +340,12 @@ impl<S: ScannerFeed> Scanner<S> {
if let Some(prior_highest_acknowledged_block) = if let Some(prior_highest_acknowledged_block) =
ScannerGlobalDb::<S>::highest_acknowledged_block(&txn) ScannerGlobalDb::<S>::highest_acknowledged_block(&txn)
{ {
assert!(block_number > prior_highest_acknowledged_block, "acknowledging blocks out-of-order"); // If a single block produced multiple Batches, the block number won't increment
for b in (prior_highest_acknowledged_block + 1) .. (block_number - 1) { assert!(
block_number >= prior_highest_acknowledged_block,
"acknowledging blocks out-of-order"
);
for b in (prior_highest_acknowledged_block + 1) .. block_number {
assert!( assert!(
!ScannerGlobalDb::<S>::is_block_notable(&txn, b), !ScannerGlobalDb::<S>::is_block_notable(&txn, b),
"skipped acknowledging a block which was notable" "skipped acknowledging a block which was notable"
@ -352,6 +358,37 @@ impl<S: ScannerFeed> Scanner<S> {
ScannerGlobalDb::<S>::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate); ScannerGlobalDb::<S>::queue_key(&mut txn, block_number + S::WINDOW_LENGTH, key_to_activate);
} }
// Return the balances for any InInstructions which failed to execute
{
let return_information = report::take_return_information::<S>(&mut txn, batch_id)
.expect("didn't save the return information for Batch we published");
assert_eq!(
in_instruction_succeededs.len(),
return_information.len(),
"amount of InInstruction succeededs differed from amount of return information saved"
);
// We map these into standard Burns
let mut returns = vec![];
for (succeeded, return_information) in
in_instruction_succeededs.into_iter().zip(return_information)
{
if succeeded {
continue;
}
if let Some(report::ReturnInformation { address, balance }) = return_information {
returns.push(OutInstructionWithBalance {
instruction: OutInstruction { address: address.into(), data: None },
balance,
});
}
}
// We send them as stemming from this block
// TODO: These should be handled with any Burns from this block
SubstrateToEventualityDb::send_burns(&mut txn, block_number, &returns);
}
// Commit the txn // Commit the txn
txn.commit(); txn.commit();
// Run the Eventuality task since we've advanced it // Run the Eventuality task since we've advanced it

View file

@ -1,16 +1,34 @@
use core::marker::PhantomData;
use std::io::{Read, Write};
use scale::{Encode, Decode, IoReader};
use serai_db::{Get, DbTxn, create_db}; use serai_db::{Get, DbTxn, create_db};
use serai_primitives::Balance;
use primitives::Address;
use crate::{ScannerFeed, AddressFor};
create_db!( create_db!(
ScannerReport { ScannerReport {
// The next block to potentially report // The next block to potentially report
NextToPotentiallyReportBlock: () -> u64, NextToPotentiallyReportBlock: () -> u64,
// The next Batch ID to use // The next Batch ID to use
NextBatchId: () -> u32, NextBatchId: () -> u32,
// The return addresses for the InInstructions within a Batch
SerializedReturnAddresses: (batch: u32) -> Vec<u8>,
} }
); );
pub(crate) struct ReportDb; pub(crate) struct ReturnInformation<S: ScannerFeed> {
impl ReportDb { pub(crate) address: AddressFor<S>,
pub(crate) balance: Balance,
}
pub(crate) struct ReportDb<S: ScannerFeed>(PhantomData<S>);
impl<S: ScannerFeed> ReportDb<S> {
pub(crate) fn set_next_to_potentially_report_block( pub(crate) fn set_next_to_potentially_report_block(
txn: &mut impl DbTxn, txn: &mut impl DbTxn,
next_to_potentially_report_block: u64, next_to_potentially_report_block: u64,
@ -26,4 +44,43 @@ impl ReportDb {
NextBatchId::set(txn, &(id + 1)); NextBatchId::set(txn, &(id + 1));
id id
} }
pub(crate) fn save_return_information(
txn: &mut impl DbTxn,
id: u32,
return_information: &Vec<Option<ReturnInformation<S>>>,
) {
let mut buf = Vec::with_capacity(return_information.len() * (32 + 1 + 8));
for return_information in return_information {
if let Some(ReturnInformation { address, balance }) = return_information {
buf.write_all(&[1]).unwrap();
address.write(&mut buf).unwrap();
balance.encode_to(&mut buf);
} else {
buf.write_all(&[0]).unwrap();
}
}
SerializedReturnAddresses::set(txn, id, &buf);
}
pub(crate) fn take_return_information(
txn: &mut impl DbTxn,
id: u32,
) -> Option<Vec<Option<ReturnInformation<S>>>> {
let buf = SerializedReturnAddresses::get(txn, id)?;
let mut buf = buf.as_slice();
let mut res = Vec::with_capacity(buf.len() / (32 + 1 + 8));
while !buf.is_empty() {
let mut opt = [0xff];
buf.read_exact(&mut opt).unwrap();
assert!((opt[0] == 0) || (opt[0] == 1));
res.push((opt[0] == 1).then(|| {
let address = AddressFor::<S>::read(&mut buf).unwrap();
let balance = Balance::decode(&mut IoReader(&mut buf)).unwrap();
ReturnInformation { address, balance }
}));
}
Some(res)
}
} }

View file

@ -8,15 +8,23 @@ use serai_in_instructions_primitives::{MAX_BATCH_SIZE, Batch};
use primitives::task::ContinuallyRan; use primitives::task::ContinuallyRan;
use crate::{ use crate::{
db::{ScannerGlobalDb, ScanToReportDb}, db::{Returnable, ScannerGlobalDb, ScanToReportDb},
index, index,
scan::next_to_scan_for_outputs_block, scan::next_to_scan_for_outputs_block,
ScannerFeed, BatchPublisher, ScannerFeed, BatchPublisher,
}; };
mod db; mod db;
pub(crate) use db::ReturnInformation;
use db::ReportDb; use db::ReportDb;
pub(crate) fn take_return_information<S: ScannerFeed>(
txn: &mut impl DbTxn,
id: u32,
) -> Option<Vec<Option<ReturnInformation<S>>>> {
ReportDb::<S>::take_return_information(txn, id)
}
/* /*
This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion. This task produces Batches for notable blocks, with all InInstructions, in an ordered fashion.
@ -33,10 +41,10 @@ pub(crate) struct ReportTask<D: Db, S: ScannerFeed, B: BatchPublisher> {
impl<D: Db, S: ScannerFeed, B: BatchPublisher> ReportTask<D, S, B> { impl<D: Db, S: ScannerFeed, B: BatchPublisher> ReportTask<D, S, B> {
pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self { pub(crate) fn new(mut db: D, batch_publisher: B, start_block: u64) -> Self {
if ReportDb::next_to_potentially_report_block(&db).is_none() { if ReportDb::<S>::next_to_potentially_report_block(&db).is_none() {
// Initialize the DB // Initialize the DB
let mut txn = db.txn(); let mut txn = db.txn();
ReportDb::set_next_to_potentially_report_block(&mut txn, start_block); ReportDb::<S>::set_next_to_potentially_report_block(&mut txn, start_block);
txn.commit(); txn.commit();
} }
@ -64,7 +72,7 @@ impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D,
last_scanned last_scanned
}; };
let next_to_potentially_report = ReportDb::next_to_potentially_report_block(&self.db) let next_to_potentially_report = ReportDb::<S>::next_to_potentially_report_block(&self.db)
.expect("ReportTask run before writing the start block"); .expect("ReportTask run before writing the start block");
for b in next_to_potentially_report ..= highest_reportable { for b in next_to_potentially_report ..= highest_reportable {
@ -81,32 +89,53 @@ impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D,
if notable { if notable {
let network = S::NETWORK; let network = S::NETWORK;
let block_hash = index::block_id(&txn, b); let block_hash = index::block_id(&txn, b);
let mut batch_id = ReportDb::acquire_batch_id(&mut txn); let mut batch_id = ReportDb::<S>::acquire_batch_id(&mut txn);
// start with empty batch // start with empty batch
let mut batches = let mut batches =
vec![Batch { network, id: batch_id, block: BlockHash(block_hash), instructions: vec![] }]; vec![Batch { network, id: batch_id, block: BlockHash(block_hash), instructions: vec![] }];
// We also track the return information for the InInstructions within a Batch in case they
// error
let mut return_information = vec![vec![]];
for Returnable { return_address, in_instruction } in in_instructions {
let balance = in_instruction.balance;
for instruction in in_instructions {
let batch = batches.last_mut().unwrap(); let batch = batches.last_mut().unwrap();
batch.instructions.push(instruction); batch.instructions.push(in_instruction);
// check if batch is over-size // check if batch is over-size
if batch.encode().len() > MAX_BATCH_SIZE { if batch.encode().len() > MAX_BATCH_SIZE {
// pop the last instruction so it's back in size // pop the last instruction so it's back in size
let instruction = batch.instructions.pop().unwrap(); let in_instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch // bump the id for the new batch
batch_id = ReportDb::acquire_batch_id(&mut txn); batch_id = ReportDb::<S>::acquire_batch_id(&mut txn);
// make a new batch with this instruction included // make a new batch with this instruction included
batches.push(Batch { batches.push(Batch {
network, network,
id: batch_id, id: batch_id,
block: BlockHash(block_hash), block: BlockHash(block_hash),
instructions: vec![instruction], instructions: vec![in_instruction],
}); });
// Since we're allocating a new batch, allocate a new set of return addresses for it
return_information.push(vec![]);
} }
// For the set of return addresses for the InInstructions for the batch we just pushed
// onto, push this InInstruction's return addresses
return_information
.last_mut()
.unwrap()
.push(return_address.map(|address| ReturnInformation { address, balance }));
}
// Save the return addresses to the databse
assert_eq!(batches.len(), return_information.len());
for (batch, return_information) in batches.iter().zip(&return_information) {
assert_eq!(batch.instructions.len(), return_information.len());
ReportDb::<S>::save_return_information(&mut txn, batch.id, return_information);
} }
for batch in batches { for batch in batches {
@ -119,7 +148,7 @@ impl<D: Db, S: ScannerFeed, B: BatchPublisher> ContinuallyRan for ReportTask<D,
} }
// Update the next to potentially report block // Update the next to potentially report block
ReportDb::set_next_to_potentially_report_block(&mut txn, b + 1); ReportDb::<S>::set_next_to_potentially_report_block(&mut txn, b + 1);
txn.commit(); txn.commit();
} }

View file

@ -13,7 +13,8 @@ use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Block};
use crate::{ use crate::{
lifetime::LifetimeStage, lifetime::LifetimeStage,
db::{ db::{
OutputWithInInstruction, SenderScanData, ScannerGlobalDb, ScanToReportDb, ScanToEventualityDb, OutputWithInInstruction, Returnable, SenderScanData, ScannerGlobalDb, ScanToReportDb,
ScanToEventualityDb,
}, },
BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs, BlockExt, ScannerFeed, AddressFor, OutputFor, Return, sort_outputs,
eventuality::latest_scannable_block, eventuality::latest_scannable_block,
@ -149,7 +150,13 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
queued_outputs queued_outputs
}; };
for queued_output in queued_outputs { for queued_output in queued_outputs {
in_instructions.push((queued_output.output.id(), queued_output.in_instruction)); in_instructions.push((
queued_output.output.id(),
Returnable {
return_address: queued_output.return_address,
in_instruction: queued_output.in_instruction,
},
));
scan_data.received_external_outputs.push(queued_output.output); scan_data.received_external_outputs.push(queued_output.output);
} }
@ -302,7 +309,10 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
in_instructions.push(( in_instructions.push((
output_with_in_instruction.output.id(), output_with_in_instruction.output.id(),
output_with_in_instruction.in_instruction, Returnable {
return_address: output_with_in_instruction.return_address,
in_instruction: output_with_in_instruction.in_instruction,
},
)); ));
scan_data.received_external_outputs.push(output_with_in_instruction.output); scan_data.received_external_outputs.push(output_with_in_instruction.output);
} }
@ -329,7 +339,7 @@ impl<D: Db, S: ScannerFeed> ContinuallyRan for ScanTask<D, S> {
let in_instructions = let in_instructions =
in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::<Vec<_>>(); in_instructions.into_iter().map(|(_id, in_instruction)| in_instruction).collect::<Vec<_>>();
// Send the InInstructions to the report task // Send the InInstructions to the report task
ScanToReportDb::<S>::send_in_instructions(&mut txn, b, in_instructions); ScanToReportDb::<S>::send_in_instructions(&mut txn, b, &in_instructions);
// Send the scan data to the eventuality task // Send the scan data to the eventuality task
ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data); ScanToEventualityDb::<S>::send_scan_data(&mut txn, b, &scan_data);