Save the scheduler to disk

This is a horrible impl which does a full serialization of everything on every change.
It's just the minimal set of changes to resolve this TODO and enable testnet deployment.
This commit is contained in:
Luke Parker 2023-07-30 14:11:04 -04:00
parent d8033504c8
commit 2815046b21
5 changed files with 144 additions and 29 deletions
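
In rough terms, the pattern the diff below introduces: every mutating Scheduler method takes a DB transaction, reserializes the entire scheduler state, and overwrites a single per-key DB entry, so the scheduler can be rebuilt from disk on boot. The following is a minimal, self-contained sketch of that save-on-every-mutation pattern; the KeyValueDb, MemDb, and ToyScheduler names are illustrative stand-ins, not the processor's actual Db/DbTxn traits.

use std::collections::HashMap;

// Hypothetical key-value store standing in for the processor's DB abstraction.
trait KeyValueDb {
  fn put(&mut self, key: Vec<u8>, value: Vec<u8>);
  fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

struct MemDb(HashMap<Vec<u8>, Vec<u8>>);
impl KeyValueDb for MemDb {
  fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
    self.0.insert(key, value);
  }
  fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
    self.0.get(key).cloned()
  }
}

// Toy scheduler: two u64 counters, serialized as little-endian bytes.
struct ToyScheduler {
  queued: u64,
  completed: u64,
}

impl ToyScheduler {
  fn db_key() -> Vec<u8> {
    b"SCHEDULER_toy".to_vec()
  }

  // Full serialization of all state, rewritten on every mutation.
  fn serialize(&self) -> Vec<u8> {
    let mut res = Vec::with_capacity(16);
    res.extend(self.queued.to_le_bytes());
    res.extend(self.completed.to_le_bytes());
    res
  }

  fn read(bytes: &[u8]) -> ToyScheduler {
    ToyScheduler {
      queued: u64::from_le_bytes(bytes[.. 8].try_into().unwrap()),
      completed: u64::from_le_bytes(bytes[8 .. 16].try_into().unwrap()),
    }
  }

  fn new(db: &mut impl KeyValueDb) -> ToyScheduler {
    let res = ToyScheduler { queued: 0, completed: 0 };
    // Persist immediately so a later load doesn't find the key missing.
    db.put(Self::db_key(), res.serialize());
    res
  }

  fn from_db(db: &impl KeyValueDb) -> ToyScheduler {
    Self::read(&db.get(&Self::db_key()).expect("no scheduler saved under this key"))
  }

  // Every mutation ends by writing the whole serialized state back to the DB.
  fn schedule(&mut self, db: &mut impl KeyValueDb) {
    self.queued += 1;
    db.put(Self::db_key(), self.serialize());
  }
}

fn main() {
  let mut db = MemDb(HashMap::new());
  let mut scheduler = ToyScheduler::new(&mut db);
  scheduler.schedule(&mut db);
  // After a reboot, the scheduler is rebuilt from the stored bytes.
  let restored = ToyScheduler::from_db(&db);
  assert_eq!(restored.queued, 1);
}

As the commit message says, rewriting the full serialization on every change is wasteful for large state; an incremental approach is left as a TODO in the actual code below.
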


@@ -25,7 +25,7 @@ services:
entrypoint: /scripts/entry-dev.sh
# TODO: Use expose, not ports
ports:
- "18443"
- "18443:18443"
ethereum:
profiles:
@@ -50,7 +50,7 @@ services:
entrypoint: /scripts/entry-dev.sh
# TODO: Use expose, not ports
ports:
- "18081"
- "18081:18081"
# Infrastructure


@@ -218,7 +218,7 @@ async fn sign_plans<C: Coin, D: Db>(
.schedulers
.get_mut(key.as_ref())
.expect("didn't have a scheduler for a key we have a plan for")
.created_output(branch.expected, branch.actual);
.created_output::<D>(txn, branch.expected, branch.actual);
}
if let Some((tx, eventuality)) = tx {
@@ -322,7 +322,7 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
// TODO: This assumes the coin has a monotonic clock for its blocks' times, which
// isn't a viable assumption
// If the latest block number is 10, then the block indexd by 1 has 10 confirms
// If the latest block number is 10, then the block indexed by 1 has 10 confirms
// 10 + 1 - 10 = 1
while get_block(
coin,
@@ -381,7 +381,7 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
substrate_mutable.scanner.rotate_key(txn, activation_number, key).await;
substrate_mutable
.schedulers
.insert(key.to_bytes().as_ref().to_vec(), Scheduler::<C>::new(key));
.insert(key.to_bytes().as_ref().to_vec(), Scheduler::<C>::new::<D>(txn, key));
tributary_mutable
.signers
@@ -434,7 +434,7 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
.schedulers
.get_mut(&key_vec)
.expect("key we don't have a scheduler for acknowledged a block")
.schedule(outputs, payments);
.schedule::<D>(txn, outputs, payments);
coordinator
.send(ProcessorMessage::Coordinator(
@@ -498,14 +498,14 @@ async fn boot<C: Coin, D: Db>(
// The scanner has no long-standing orders to re-issue
let (mut scanner, active_keys) = Scanner::new(coin.clone(), raw_db.clone());
let schedulers = HashMap::<Vec<u8>, Scheduler<C>>::new();
let mut schedulers = HashMap::<Vec<u8>, Scheduler<C>>::new();
let mut substrate_signers = HashMap::new();
let mut signers = HashMap::new();
let main_db = MainDb::new(raw_db.clone());
for key in &active_keys {
// TODO: Load existing schedulers
schedulers.insert(key.to_bytes().as_ref().to_vec(), Scheduler::from_db(raw_db, *key).unwrap());
let (substrate_keys, coin_keys) = key_gen.keys(key);
@@ -589,14 +589,6 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(mut raw_db: D, coin: C, mut coordi
substrate_mutable.scanner.drop_eventuality(id).await;
main_db.finish_signing(&mut txn, key, id);
txn.commit();
// TODO
// 1) We need to stop signing whenever a peer informs us or the chain has an
// eventuality
// 2) If a peer informed us of an eventuality without an outbound payment, stop
// scanning the chain for it (or at least ack it's solely for sanity purposes?)
// 3) When the chain has an eventuality, if it had an outbound payment, report it up to
// Substrate for logging purposes
}
}
}


@@ -1,14 +1,17 @@
use std::collections::{VecDeque, HashMap};
use std::{
io::{self, Read},
collections::{VecDeque, HashMap},
};
use frost::curve::Ciphersuite;
use ciphersuite::{group::GroupEncoding, Ciphersuite};
use crate::{
coins::{Output, Coin},
Payment, Plan,
DbTxn, Db, Payment, Plan,
};
/// Stateless, deterministic output/payment manager.
#[derive(Debug)]
#[derive(PartialEq, Eq, Debug)]
pub struct Scheduler<C: Coin> {
key: <C::Curve as Ciphersuite>::G,
@@ -38,15 +41,115 @@ pub struct Scheduler<C: Coin> {
payments: VecDeque<Payment<C>>,
}
fn scheduler_key<D: Db, G: GroupEncoding>(key: &G) -> Vec<u8> {
D::key(b"SCHEDULER", b"scheduler", key.to_bytes())
}
impl<C: Coin> Scheduler<C> {
pub fn new(key: <C::Curve as Ciphersuite>::G) -> Self {
Scheduler {
fn read<R: Read>(key: <C::Curve as Ciphersuite>::G, reader: &mut R) -> io::Result<Self> {
let mut read_plans = || -> io::Result<_> {
let mut all_plans = HashMap::new();
let mut all_plans_len = [0; 4];
reader.read_exact(&mut all_plans_len)?;
for _ in 0 .. u32::from_le_bytes(all_plans_len) {
let mut amount = [0; 8];
reader.read_exact(&mut amount)?;
let amount = u64::from_le_bytes(amount);
let mut plans = VecDeque::new();
let mut plans_len = [0; 4];
reader.read_exact(&mut plans_len)?;
for _ in 0 .. u32::from_le_bytes(plans_len) {
let mut payments = vec![];
let mut payments_len = [0; 4];
reader.read_exact(&mut payments_len)?;
for _ in 0 .. u32::from_le_bytes(payments_len) {
payments.push(Payment::read(reader)?);
}
plans.push_back(payments);
}
all_plans.insert(amount, plans);
}
Ok(all_plans)
};
let queued_plans = read_plans()?;
let plans = read_plans()?;
let mut utxos = vec![];
let mut utxos_len = [0; 4];
reader.read_exact(&mut utxos_len)?;
for _ in 0 .. u32::from_le_bytes(utxos_len) {
utxos.push(C::Output::read(reader)?);
}
let mut payments = VecDeque::new();
let mut payments_len = [0; 4];
reader.read_exact(&mut payments_len)?;
for _ in 0 .. u32::from_le_bytes(payments_len) {
payments.push_back(Payment::read(reader)?);
}
Ok(Scheduler { key, queued_plans, plans, utxos, payments })
}
// TODO: Get rid of this
// We reserialize the entire scheduler on any mutation to save it to the DB which is horrible
// We should have an incremental solution
fn serialize(&self) -> Vec<u8> {
let mut res = Vec::with_capacity(4096);
let mut write_plans = |plans: &HashMap<u64, VecDeque<Vec<Payment<C>>>>| {
res.extend(u32::try_from(plans.len()).unwrap().to_le_bytes());
for (amount, list_of_plans) in plans {
res.extend(amount.to_le_bytes());
res.extend(u32::try_from(list_of_plans.len()).unwrap().to_le_bytes());
for plan in list_of_plans {
res.extend(u32::try_from(plan.len()).unwrap().to_le_bytes());
for payment in plan {
payment.write(&mut res).unwrap();
}
}
}
};
write_plans(&self.queued_plans);
write_plans(&self.plans);
res.extend(u32::try_from(self.utxos.len()).unwrap().to_le_bytes());
for utxo in &self.utxos {
utxo.write(&mut res).unwrap();
}
res.extend(u32::try_from(self.payments.len()).unwrap().to_le_bytes());
for payment in &self.payments {
payment.write(&mut res).unwrap();
}
debug_assert_eq!(&Self::read(self.key, &mut res.as_slice()).unwrap(), self);
res
}
pub fn new<D: Db>(txn: &mut D::Transaction<'_>, key: <C::Curve as Ciphersuite>::G) -> Self {
let res = Scheduler {
key,
queued_plans: HashMap::new(),
plans: HashMap::new(),
utxos: vec![],
payments: VecDeque::new(),
}
};
// Save it to disk so from_db won't panic if we don't mutate it before rebooting
txn.put(scheduler_key::<D, _>(&res.key), res.serialize());
res
}
pub fn from_db<D: Db>(db: &D, key: <C::Curve as Ciphersuite>::G) -> io::Result<Self> {
let scheduler = db.get(scheduler_key::<D, _>(&key)).unwrap_or_else(|| {
panic!("loading scheduler from DB without scheduler for {}", hex::encode(key.to_bytes()))
});
let mut reader_slice = scheduler.as_slice();
let reader = &mut reader_slice;
Self::read(key, reader)
}
fn execute(&mut self, inputs: Vec<C::Output>, mut payments: Vec<Payment<C>>) -> Plan<C> {
@@ -141,7 +244,12 @@ impl<C: Coin> Scheduler<C> {
}
// Schedule a series of outputs/payments.
pub fn schedule(&mut self, utxos: Vec<C::Output>, payments: Vec<Payment<C>>) -> Vec<Plan<C>> {
pub fn schedule<D: Db>(
&mut self,
txn: &mut D::Transaction<'_>,
utxos: Vec<C::Output>,
payments: Vec<Payment<C>>,
) -> Vec<Plan<C>> {
let mut plans = self.add_outputs(utxos);
log::info!("scheduling {} new payments", payments.len());
@@ -222,6 +330,8 @@ impl<C: Coin> Scheduler<C> {
self.utxos.extend(utxos);
}
txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
log::info!(
"created {} plans containing {} payments to sign",
plans.len(),
@@ -235,7 +345,12 @@ impl<C: Coin> Scheduler<C> {
// This can be called whenever, so long as it's properly ordered
// (it's independent to Serai/the chain we're scheduling over, yet still expects outputs to be
// created in the same order Plans are returned in)
pub fn created_output(&mut self, expected: u64, actual: Option<u64>) {
pub fn created_output<D: Db>(
&mut self,
txn: &mut D::Transaction<'_>,
expected: u64,
actual: Option<u64>,
) {
log::debug!("output expected to have {} had {:?} after fees", expected, actual);
// Get the payments this output is expected to handle
@@ -280,5 +395,8 @@ impl<C: Coin> Scheduler<C> {
}
self.plans.entry(actual).or_insert(VecDeque::new()).push_back(payments);
// TODO: This shows how ridiculous the serialize function is
txn.put(scheduler_key::<D, _>(&self.key), self.serialize());
}
}


@@ -49,10 +49,15 @@ pub async fn test_wallet<C: Coin>(coin: C) {
}
};
let mut scheduler = Scheduler::new(key);
let mut txn = db.txn();
let mut scheduler = Scheduler::new::<MemDb>(&mut txn, key);
let amount = 2 * C::DUST;
let plans = scheduler
.schedule(outputs.clone(), vec![Payment { address: C::address(key), data: None, amount }]);
let plans = scheduler.schedule::<MemDb>(
&mut txn,
outputs.clone(),
vec![Payment { address: C::address(key), data: None, amount }],
);
txn.commit();
assert_eq!(
plans,
vec![Plan {


@@ -85,7 +85,7 @@ pub fn reproducibly_builds() {
break;
}
// If we didn't get resuts from all runners, panic
// If we didn't get results from all runners, panic
for item in &res {
if item.is_none() {
panic!("couldn't get runtime hashes within allowed time");