diff --git a/Cargo.lock b/Cargo.lock index 935e95d8..7512f35c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8656,6 +8656,7 @@ dependencies = [ "group", "log", "parity-scale-codec", + "serai-coins-primitives", "serai-primitives", "tokio", ] @@ -8674,9 +8675,7 @@ dependencies = [ "serai-db", "serai-in-instructions-primitives", "serai-primitives", - "serai-processor-messages", "serai-processor-primitives", - "thiserror", "tokio", ] @@ -8712,6 +8711,17 @@ dependencies = [ [[package]] name = "serai-processor-transaction-chaining-scheduler" version = "0.1.0" +dependencies = [ + "borsh", + "group", + "parity-scale-codec", + "serai-coins-primitives", + "serai-db", + "serai-primitives", + "serai-processor-primitives", + "serai-processor-scanner", + "serai-processor-utxo-scheduler-primitives", +] [[package]] name = "serai-processor-utxo-scheduler-primitives" diff --git a/processor/primitives/Cargo.toml b/processor/primitives/Cargo.toml index 9427a604..dd1b74ea 100644 --- a/processor/primitives/Cargo.toml +++ b/processor/primitives/Cargo.toml @@ -22,6 +22,7 @@ async-trait = { version = "0.1", default-features = false } group = { version = "0.13", default-features = false } serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] } +serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std"] } scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } diff --git a/processor/primitives/src/lib.rs b/processor/primitives/src/lib.rs index 7a8be219..4e45fa8f 100644 --- a/processor/primitives/src/lib.rs +++ b/processor/primitives/src/lib.rs @@ -21,6 +21,9 @@ pub use eventuality::*; mod block; pub use block::*; +mod payment; +pub use payment::*; + /// An ID for an output/transaction/block/etc. /// /// IDs don't need to implement `Copy`, enabling `[u8; 33]`, `[u8; 64]` to be used. IDs are still diff --git a/processor/primitives/src/payment.rs b/processor/primitives/src/payment.rs new file mode 100644 index 00000000..1bbb0604 --- /dev/null +++ b/processor/primitives/src/payment.rs @@ -0,0 +1,42 @@ +use serai_primitives::{Balance, Data}; +use serai_coins_primitives::OutInstructionWithBalance; + +use crate::Address; + +/// A payment to fulfill. +#[derive(Clone)] +pub struct Payment { + address: A, + balance: Balance, + data: Option>, +} + +impl TryFrom for Payment { + type Error = (); + fn try_from(out_instruction_with_balance: OutInstructionWithBalance) -> Result { + Ok(Payment { + address: out_instruction_with_balance.instruction.address.try_into().map_err(|_| ())?, + balance: out_instruction_with_balance.balance, + data: out_instruction_with_balance.instruction.data.map(Data::consume), + }) + } +} + +impl Payment { + /// Create a new Payment. + pub fn new(address: A, balance: Balance, data: Option>) -> Self { + Payment { address, balance, data } + } + /// The address to pay. + pub fn address(&self) -> &A { + &self.address + } + /// The balance to transfer. + pub fn balance(&self) -> Balance { + self.balance + } + /// The data to associate with this payment. + pub fn data(&self) -> &Option> { + &self.data + } +} diff --git a/processor/scanner/Cargo.toml b/processor/scanner/Cargo.toml index e7cdef97..c2dc31fe 100644 --- a/processor/scanner/Cargo.toml +++ b/processor/scanner/Cargo.toml @@ -19,7 +19,6 @@ workspace = true [dependencies] # Macros async-trait = { version = "0.1", default-features = false } -thiserror = { version = "1", default-features = false } # Encoders hex = { version = "0.4", default-features = false, features = ["std"] } @@ -37,7 +36,6 @@ serai-db = { path = "../../common/db" } serai-primitives = { path = "../../substrate/primitives", default-features = false, features = ["std"] } serai-in-instructions-primitives = { path = "../../substrate/in-instructions/primitives", default-features = false, features = ["std"] } -serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std"] } +serai-coins-primitives = { path = "../../substrate/coins/primitives", default-features = false, features = ["std", "borsh"] } -messages = { package = "serai-processor-messages", path = "../messages" } primitives = { package = "serai-processor-primitives", path = "../primitives" } diff --git a/processor/scanner/src/eventuality/mod.rs b/processor/scanner/src/eventuality/mod.rs index bfc879ea..83ec50ab 100644 --- a/processor/scanner/src/eventuality/mod.rs +++ b/processor/scanner/src/eventuality/mod.rs @@ -4,7 +4,7 @@ use group::GroupEncoding; use serai_db::{Get, DbTxn, Db}; -use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block}; +use primitives::{task::ContinuallyRan, OutputType, ReceivedOutput, Eventuality, Block, Payment}; use crate::{ lifetime::LifetimeStage, @@ -12,7 +12,7 @@ use crate::{ SeraiKey, OutputWithInInstruction, ReceiverScanData, ScannerGlobalDb, SubstrateToEventualityDb, ScanToEventualityDb, }, - BlockExt, ScannerFeed, KeyFor, OutputFor, EventualityFor, Payment, SchedulerUpdate, Scheduler, + BlockExt, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate, Scheduler, sort_outputs, scan::{next_to_scan_for_outputs_block, queue_output_until_block}, }; @@ -168,7 +168,10 @@ impl> EventualityTask { let new_eventualities = self.scheduler.fulfill( &mut txn, &keys_with_stages, - burns.into_iter().filter_map(|burn| Payment::try_from(burn).ok()).collect(), + burns + .into_iter() + .filter_map(|burn| Payment::>::try_from(burn).ok()) + .collect(), ); intake_eventualities::(&mut txn, new_eventualities); } diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index 80cf96be..4d33d0d0 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -5,11 +5,11 @@ use group::GroupEncoding; use serai_db::{Get, DbTxn, Db}; -use serai_primitives::{NetworkId, Coin, Amount, Balance, Data}; +use serai_primitives::{NetworkId, Coin, Amount}; use serai_in_instructions_primitives::Batch; use serai_coins_primitives::OutInstructionWithBalance; -use primitives::{task::*, Address, ReceivedOutput, Block}; +use primitives::{task::*, Address, ReceivedOutput, Block, Payment}; // Logic for deciding where in its lifetime a multisig is. mod lifetime; @@ -195,6 +195,16 @@ impl Return { let output = OutputFor::::read(reader)?; Ok(Return { address, output }) } + + /// The address to return the output to. + pub fn address(&self) -> &AddressFor { + &self.address + } + + /// The output to return. + pub fn output(&self) -> &OutputFor { + &self.output + } } /// An update for the scheduler. @@ -219,40 +229,6 @@ impl SchedulerUpdate { } } -/// A payment to fulfill. -#[derive(Clone)] -pub struct Payment { - address: AddressFor, - balance: Balance, - data: Option>, -} - -impl TryFrom for Payment { - type Error = (); - fn try_from(out_instruction_with_balance: OutInstructionWithBalance) -> Result { - Ok(Payment { - address: out_instruction_with_balance.instruction.address.try_into().map_err(|_| ())?, - balance: out_instruction_with_balance.balance, - data: out_instruction_with_balance.instruction.data.map(Data::consume), - }) - } -} - -impl Payment { - /// The address to pay. - pub fn address(&self) -> &AddressFor { - &self.address - } - /// The balance to transfer. - pub fn balance(&self) -> Balance { - self.balance - } - /// The data to associate with this payment. - pub fn data(&self) -> &Option> { - &self.data - } -} - /// The object responsible for accumulating outputs and planning new transactions. pub trait Scheduler: 'static + Send { /// Activate a key. @@ -327,7 +303,7 @@ pub trait Scheduler: 'static + Send { &mut self, txn: &mut impl DbTxn, active_keys: &[(KeyFor, LifetimeStage)], - payments: Vec>, + payments: Vec>>, ) -> HashMap, Vec>>; } diff --git a/processor/scanner/src/lifetime.rs b/processor/scanner/src/lifetime.rs index bef6af8b..e07f5f42 100644 --- a/processor/scanner/src/lifetime.rs +++ b/processor/scanner/src/lifetime.rs @@ -6,7 +6,7 @@ use crate::ScannerFeed; /// rotation process. Steps 7-8 regard a multisig which isn't retiring yet retired, and /// accordingly, no longer exists, so they are not modelled here (as this only models active /// multisigs. Inactive multisigs aren't represented in the first place). -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy, PartialEq, Debug)] pub enum LifetimeStage { /// A new multisig, once active, shouldn't actually start receiving coins until several blocks /// later. If any UI is premature in sending to this multisig, we delay to report the outputs to diff --git a/processor/scheduler/utxo/primitives/src/lib.rs b/processor/scheduler/utxo/primitives/src/lib.rs index 61dd9d88..2c6da97b 100644 --- a/processor/scheduler/utxo/primitives/src/lib.rs +++ b/processor/scheduler/utxo/primitives/src/lib.rs @@ -6,12 +6,12 @@ use core::fmt::Debug; use serai_primitives::{Coin, Amount}; -use primitives::ReceivedOutput; -use scanner::{Payment, ScannerFeed, AddressFor, OutputFor}; +use primitives::{ReceivedOutput, Payment}; +use scanner::{ScannerFeed, KeyFor, AddressFor, OutputFor}; /// An object able to plan a transaction. #[async_trait::async_trait] -pub trait TransactionPlanner { +pub trait TransactionPlanner: 'static + Send + Sync { /// An error encountered when determining the fee rate. /// /// This MUST be an ephemeral error. Retrying fetching data from the blockchain MUST eventually @@ -33,17 +33,22 @@ pub trait TransactionPlanner { coin: Coin, ) -> Result; + /// The branch address for this key of Serai's. + fn branch_address(key: KeyFor) -> AddressFor; + /// The change address for this key of Serai's. + fn change_address(key: KeyFor) -> AddressFor; + /// The forwarding address for this key of Serai's. + fn forwarding_address(key: KeyFor) -> AddressFor; + /// Calculate the for a tansaction with this structure. /// /// The fee rate, inputs, and payments, will all be for the same coin. The returned fee is /// denominated in this coin. fn calculate_fee( - &self, - block_number: u64, fee_rate: Self::FeeRate, inputs: Vec>, - payments: Vec>, - change: Option>, + payments: Vec>>, + change: Option>, ) -> Amount; /// Plan a transaction. @@ -53,12 +58,10 @@ pub trait TransactionPlanner { /// /// `change` will always be an address belonging to the Serai network. fn plan( - &self, - block_number: u64, fee_rate: Self::FeeRate, inputs: Vec>, - payments: Vec>, - change: Option>, + payments: Vec>>, + change: Option>, ) -> Self::PlannedTransaction; /// Obtain a PlannedTransaction via amortizing the fee over the payments. @@ -69,13 +72,11 @@ pub trait TransactionPlanner { /// /// Returns `None` if the fee exceeded the inputs, or `Some` otherwise. fn plan_transaction_with_fee_amortization( - &self, operating_costs: &mut u64, - block_number: u64, fee_rate: Self::FeeRate, inputs: Vec>, - mut payments: Vec>, - change: Option>, + mut payments: Vec>>, + mut change: Option>, ) -> Option { // Sanity checks { @@ -102,9 +103,7 @@ pub trait TransactionPlanner { // Sort payments from high amount to low amount payments.sort_by(|a, b| a.balance().amount.0.cmp(&b.balance().amount.0).reverse()); - let mut fee = self - .calculate_fee(block_number, fee_rate, inputs.clone(), payments.clone(), change.clone()) - .0; + let mut fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0; let mut amortized = 0; while !payments.is_empty() { // We need to pay the fee, and any accrued operating costs, minus what we've already @@ -124,9 +123,7 @@ pub trait TransactionPlanner { if payments.last().unwrap().balance().amount.0 <= (per_payment_fee + S::dust(coin).0) { amortized += payments.pop().unwrap().balance().amount.0; // Recalculate the fee and try again - fee = self - .calculate_fee(block_number, fee_rate, inputs.clone(), payments.clone(), change.clone()) - .0; + fee = Self::calculate_fee(fee_rate, inputs.clone(), payments.clone(), change).0; continue; } // Break since all of these payments shouldn't be dropped @@ -167,6 +164,15 @@ pub trait TransactionPlanner { amortized += per_payment_fee; } assert!(amortized >= (*operating_costs + fee)); + + // If the change is less than the dust, drop it + let would_be_change = inputs.iter().map(|input| input.balance().amount.0).sum::() - + payments.iter().map(|payment| payment.balance().amount.0).sum::() - + fee; + if would_be_change < S::dust(coin).0 { + change = None; + *operating_costs += would_be_change; + } } // Update the amount of operating costs @@ -174,6 +180,6 @@ pub trait TransactionPlanner { } // Because we amortized, or accrued as operating costs, the fee, make the transaction - Some(self.plan(block_number, fee_rate, inputs, payments, change)) + Some(Self::plan(fee_rate, inputs, payments, change)) } } diff --git a/processor/scheduler/utxo/transaction-chaining/Cargo.toml b/processor/scheduler/utxo/transaction-chaining/Cargo.toml index 360da6c5..d54d0f85 100644 --- a/processor/scheduler/utxo/transaction-chaining/Cargo.toml +++ b/processor/scheduler/utxo/transaction-chaining/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "serai-processor-transaction-chaining-scheduler" version = "0.1.0" -description = "Scheduler for networks with transaction chaining for the Serai processor" +description = "Scheduler for UTXO networks with transaction chaining for the Serai processor" license = "AGPL-3.0-only" -repository = "https://github.com/serai-dex/serai/tree/develop/processor/scheduler/transaction-chaining" +repository = "https://github.com/serai-dex/serai/tree/develop/processor/scheduler/utxo/transaction-chaining" authors = ["Luke Parker "] keywords = [] edition = "2021" @@ -14,9 +14,22 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [package.metadata.cargo-machete] -ignored = ["scale"] +ignored = ["scale", "borsh"] [lints] workspace = true [dependencies] +group = { version = "0.13", default-features = false } + +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std"] } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +serai-primitives = { path = "../../../../substrate/primitives", default-features = false, features = ["std"] } +serai-coins-primitives = { path = "../../../../substrate/coins/primitives", default-features = false, features = ["std"] } + +serai-db = { path = "../../../../common/db" } + +primitives = { package = "serai-processor-primitives", path = "../../../primitives" } +scheduler-primitives = { package = "serai-processor-utxo-scheduler-primitives", path = "../primitives" } +scanner = { package = "serai-processor-scanner", path = "../../../scanner" } diff --git a/processor/scheduler/utxo/transaction-chaining/LICENSE b/processor/scheduler/utxo/transaction-chaining/LICENSE index 41d5a261..e091b149 100644 --- a/processor/scheduler/utxo/transaction-chaining/LICENSE +++ b/processor/scheduler/utxo/transaction-chaining/LICENSE @@ -1,6 +1,6 @@ AGPL-3.0-only license -Copyright (c) 2022-2024 Luke Parker +Copyright (c) 2024 Luke Parker This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License Version 3 as diff --git a/processor/scheduler/utxo/transaction-chaining/src/db.rs b/processor/scheduler/utxo/transaction-chaining/src/db.rs new file mode 100644 index 00000000..20c574e9 --- /dev/null +++ b/processor/scheduler/utxo/transaction-chaining/src/db.rs @@ -0,0 +1,49 @@ +use core::marker::PhantomData; + +use group::GroupEncoding; + +use serai_primitives::Coin; + +use serai_db::{Get, DbTxn, create_db}; + +use primitives::ReceivedOutput; +use scanner::{ScannerFeed, KeyFor, OutputFor}; + +create_db! { + TransactionChainingScheduler { + SerializedOutputs: (key: &[u8], coin: Coin) -> Vec, + } +} + +pub(crate) struct Db(PhantomData); +impl Db { + pub(crate) fn outputs( + getter: &impl Get, + key: KeyFor, + coin: Coin, + ) -> Option>> { + let buf = SerializedOutputs::get(getter, key.to_bytes().as_ref(), coin)?; + let mut buf = buf.as_slice(); + + let mut res = Vec::with_capacity(buf.len() / 128); + while !buf.is_empty() { + res.push(OutputFor::::read(&mut buf).unwrap()); + } + Some(res) + } + pub(crate) fn set_outputs( + txn: &mut impl DbTxn, + key: KeyFor, + coin: Coin, + outputs: &[OutputFor], + ) { + let mut buf = Vec::with_capacity(outputs.len() * 128); + for output in outputs { + output.write(&mut buf).unwrap(); + } + SerializedOutputs::set(txn, key.to_bytes().as_ref(), coin, &buf); + } + pub(crate) fn del_outputs(txn: &mut impl DbTxn, key: KeyFor, coin: Coin) { + SerializedOutputs::del(txn, key.to_bytes().as_ref(), coin); + } +} diff --git a/processor/scheduler/utxo/transaction-chaining/src/lib.rs b/processor/scheduler/utxo/transaction-chaining/src/lib.rs index 3639aa04..63635696 100644 --- a/processor/scheduler/utxo/transaction-chaining/src/lib.rs +++ b/processor/scheduler/utxo/transaction-chaining/src/lib.rs @@ -1,3 +1,151 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] #![doc = include_str!("../README.md")] #![deny(missing_docs)] + +use core::marker::PhantomData; +use std::collections::HashMap; + +use serai_primitives::Coin; + +use serai_db::DbTxn; + +use primitives::{ReceivedOutput, Payment}; +use scanner::{ + LifetimeStage, ScannerFeed, KeyFor, AddressFor, OutputFor, EventualityFor, SchedulerUpdate, + Scheduler as SchedulerTrait, +}; +use scheduler_primitives::*; + +mod db; +use db::Db; + +/// A planned transaction. +pub struct PlannedTransaction { + /// The signable transaction. + signable: T, + /// The outputs we'll receive from this. + effected_received_outputs: OutputFor, + /// The Evtnuality to watch for. + eventuality: EventualityFor, +} + +/// A scheduler of transactions for networks premised on the UTXO model which support +/// transaction chaining. +pub struct Scheduler< + S: ScannerFeed, + T, + P: TransactionPlanner>, +>(PhantomData, PhantomData, PhantomData

); + +impl>> + Scheduler +{ + fn accumulate_outputs(txn: &mut impl DbTxn, key: KeyFor, outputs: &[OutputFor]) { + // Accumulate them in memory + let mut outputs_by_coin = HashMap::with_capacity(1); + for output in outputs.iter().filter(|output| output.key() == key) { + let coin = output.balance().coin; + if let std::collections::hash_map::Entry::Vacant(e) = outputs_by_coin.entry(coin) { + e.insert(Db::::outputs(txn, key, coin).unwrap()); + } + outputs_by_coin.get_mut(&coin).unwrap().push(output.clone()); + } + + // Flush them to the database + for (coin, outputs) in outputs_by_coin { + Db::::set_outputs(txn, key, coin, &outputs); + } + } +} + +impl< + S: ScannerFeed, + T: 'static + Send + Sync, + P: TransactionPlanner>, + > SchedulerTrait for Scheduler +{ + fn activate_key(&mut self, txn: &mut impl DbTxn, key: KeyFor) { + for coin in S::NETWORK.coins() { + Db::::set_outputs(txn, key, *coin, &vec![]); + } + } + + fn flush_key(&mut self, txn: &mut impl DbTxn, retiring_key: KeyFor, new_key: KeyFor) { + todo!("TODO") + } + + fn retire_key(&mut self, txn: &mut impl DbTxn, key: KeyFor) { + for coin in S::NETWORK.coins() { + assert!(Db::::outputs(txn, key, *coin).is_none()); + Db::::del_outputs(txn, key, *coin); + } + } + + fn update( + &mut self, + txn: &mut impl DbTxn, + active_keys: &[(KeyFor, LifetimeStage)], + update: SchedulerUpdate, + ) -> HashMap, Vec>> { + // Accumulate all the outputs + for key in active_keys { + Self::accumulate_outputs(txn, key.0, update.outputs()); + } + + let mut fee_rates: HashMap = todo!("TODO"); + + // Create the transactions for the forwards/burns + { + let mut planned_txs = vec![]; + for forward in update.forwards() { + let forward_to_key = active_keys.last().unwrap(); + assert_eq!(forward_to_key.1, LifetimeStage::Active); + + let Some(plan) = P::plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + &mut 0, + fee_rates[&forward.balance().coin], + vec![forward.clone()], + vec![Payment::new(P::forwarding_address(forward_to_key.0), forward.balance(), None)], + None, + ) else { + continue; + }; + planned_txs.push(plan); + } + for to_return in update.returns() { + let out_instruction = + Payment::new(to_return.address().clone(), to_return.output().balance(), None); + let Some(plan) = P::plan_transaction_with_fee_amortization( + // This uses 0 for the operating costs as we don't incur any here + &mut 0, + fee_rates[&out_instruction.balance().coin], + vec![to_return.output().clone()], + vec![out_instruction], + None, + ) else { + continue; + }; + planned_txs.push(plan); + } + + // TODO: Send the transactions off for signing + // TODO: Return the eventualities + todo!("TODO") + } + } + + fn fulfill( + &mut self, + txn: &mut impl DbTxn, + active_keys: &[(KeyFor, LifetimeStage)], + payments: Vec>>, + ) -> HashMap, Vec>> { + // TODO: Find the key to use for fulfillment + // TODO: Sort outputs and payments by amount + // TODO: For as long as we don't have sufficiently aggregated inputs to handle all payments, + // aggregate + // TODO: Create the tree for the payments + todo!("TODO") + } +}