mirror of
https://github.com/serai-dex/serai.git
synced 2025-04-23 06:28:14 +00:00
implement db macro for processor/signer.rs (#438)
* implement db macro for processor/signer.rs * Use () * CompletedDb -> CompletionsDb * () -> &() --------- Co-authored-by: Luke Parker <lukeparker5132@gmail.com>
This commit is contained in:
parent
aa9daea6b0
commit
8634d90b6b
1 changed files with 83 additions and 100 deletions
|
@ -2,7 +2,6 @@ use core::{marker::PhantomData, fmt};
|
|||
use std::collections::HashMap;
|
||||
|
||||
use rand_core::OsRng;
|
||||
|
||||
use ciphersuite::group::GroupEncoding;
|
||||
use frost::{
|
||||
ThresholdKeys, FrostError,
|
||||
|
@ -13,69 +12,74 @@ use log::{info, debug, warn, error};
|
|||
|
||||
use scale::Encode;
|
||||
use messages::sign::*;
|
||||
|
||||
pub use serai_db::*;
|
||||
|
||||
use crate::{
|
||||
Get, DbTxn, Db,
|
||||
networks::{Transaction, Eventuality, Network},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SignerDb<N: Network, D: Db>(D, PhantomData<N>);
|
||||
impl<N: Network, D: Db> SignerDb<N, D> {
|
||||
fn sign_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
|
||||
D::key(b"SIGNER", dst, key)
|
||||
create_db!(
|
||||
SignerDb {
|
||||
CompletionsDb: (id: [u8; 32]) -> Vec<u8>,
|
||||
EventualityDb: (id: [u8; 32]) -> Vec<u8>,
|
||||
AttemptDb: (id: &SignId) -> (),
|
||||
TransactionDb: (id: &[u8]) -> Vec<u8>,
|
||||
ActiveSignsDb: () -> Vec<[u8; 32]>,
|
||||
CompletedOnChainDb: (id: &[u8; 32]) -> (),
|
||||
}
|
||||
);
|
||||
|
||||
fn active_signs_key() -> Vec<u8> {
|
||||
Self::sign_key(b"active_signs", [])
|
||||
}
|
||||
fn completed_on_chain_key(id: &[u8; 32]) -> Vec<u8> {
|
||||
Self::sign_key(b"completed_on_chain", id)
|
||||
}
|
||||
fn active_signs<G: Get>(getter: &G) -> Vec<[u8; 32]> {
|
||||
let active = getter.get(Self::active_signs_key()).unwrap_or(vec![]);
|
||||
let mut active_ref = active.as_slice();
|
||||
let mut res = vec![];
|
||||
while !active_ref.is_empty() {
|
||||
res.push(active_ref[.. 32].try_into().unwrap());
|
||||
active_ref = &active_ref[32 ..];
|
||||
}
|
||||
res
|
||||
}
|
||||
fn add_active_sign(txn: &mut D::Transaction<'_>, id: &[u8; 32]) {
|
||||
if txn.get(Self::completed_on_chain_key(id)).is_some() {
|
||||
impl ActiveSignsDb {
|
||||
fn add_active_sign(txn: &mut impl DbTxn, id: &[u8; 32]) {
|
||||
if CompletedOnChainDb::get(txn, id).is_some() {
|
||||
return;
|
||||
}
|
||||
let key = Self::active_signs_key();
|
||||
let mut active = txn.get(&key).unwrap_or(vec![]);
|
||||
active.extend(id);
|
||||
txn.put(key, active);
|
||||
let mut active = ActiveSignsDb::get(txn).unwrap_or_default();
|
||||
active.push(*id);
|
||||
ActiveSignsDb::set(txn, &active);
|
||||
}
|
||||
fn complete_on_chain(txn: &mut D::Transaction<'_>, id: &[u8; 32]) {
|
||||
txn.put(Self::completed_on_chain_key(id), []);
|
||||
txn.put(
|
||||
Self::active_signs_key(),
|
||||
Self::active_signs(txn)
|
||||
}
|
||||
|
||||
impl CompletedOnChainDb {
|
||||
fn complete_on_chain(txn: &mut impl DbTxn, id: &[u8; 32]) {
|
||||
CompletedOnChainDb::set(txn, id, &());
|
||||
ActiveSignsDb::set(
|
||||
txn,
|
||||
&ActiveSignsDb::get(txn)
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter(|active| active != id)
|
||||
.flatten()
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
}
|
||||
}
|
||||
impl CompletionsDb {
|
||||
fn completions<N: Network>(
|
||||
getter: &impl Get,
|
||||
id: [u8; 32],
|
||||
) -> Vec<<N::Transaction as Transaction<N>>::Id> {
|
||||
let completions = Self::get(getter, id).unwrap_or_default();
|
||||
let mut completions_ref = completions.as_slice();
|
||||
let mut res = vec![];
|
||||
while !completions_ref.is_empty() {
|
||||
let mut id = <N::Transaction as Transaction<N>>::Id::default();
|
||||
let id_len = id.as_ref().len();
|
||||
id.as_mut().copy_from_slice(&completions_ref[.. id_len]);
|
||||
completions_ref = &completions_ref[id_len ..];
|
||||
res.push(id);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn transaction_key(id: &<N::Transaction as Transaction<N>>::Id) -> Vec<u8> {
|
||||
Self::sign_key(b"tx", id)
|
||||
}
|
||||
fn completions_key(id: [u8; 32]) -> Vec<u8> {
|
||||
Self::sign_key(b"completed", id)
|
||||
}
|
||||
fn complete(txn: &mut D::Transaction<'_>, id: [u8; 32], tx: &N::Transaction) {
|
||||
fn complete<N: Network>(txn: &mut impl DbTxn, id: [u8; 32], tx: &N::Transaction) {
|
||||
let tx_id = tx.id();
|
||||
// Transactions can be completed by multiple signatures
|
||||
// Save every solution in order to be robust
|
||||
let tx_id = tx.id();
|
||||
txn.put(Self::transaction_key(&tx_id), tx.serialize());
|
||||
|
||||
let mut existing = txn.get(Self::completions_key(id)).unwrap_or(vec![]);
|
||||
|
||||
TransactionDb::save_transaction::<N>(txn, tx);
|
||||
let mut existing = Self::get(txn, id).unwrap_or_default();
|
||||
// Don't add this TX if it's already present
|
||||
let tx_len = tx_id.as_ref().len();
|
||||
assert_eq!(existing.len() % tx_len, 0);
|
||||
|
@ -89,50 +93,30 @@ impl<N: Network, D: Db> SignerDb<N, D> {
|
|||
}
|
||||
|
||||
existing.extend(tx_id.as_ref());
|
||||
txn.put(Self::completions_key(id), existing);
|
||||
Self::set(txn, id, &existing);
|
||||
}
|
||||
fn completions<G: Get>(getter: &G, id: [u8; 32]) -> Vec<<N::Transaction as Transaction<N>>::Id> {
|
||||
let completions = getter.get(Self::completions_key(id)).unwrap_or(vec![]);
|
||||
let mut completions_ref = completions.as_slice();
|
||||
let mut res = vec![];
|
||||
while !completions_ref.is_empty() {
|
||||
let mut id = <N::Transaction as Transaction<N>>::Id::default();
|
||||
let id_len = id.as_ref().len();
|
||||
id.as_mut().copy_from_slice(&completions_ref[.. id_len]);
|
||||
completions_ref = &completions_ref[id_len ..];
|
||||
res.push(id);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
impl EventualityDb {
|
||||
fn save_eventuality<N: Network>(txn: &mut impl DbTxn, id: [u8; 32], eventuality: N::Eventuality) {
|
||||
txn.put(Self::key(id), eventuality.serialize());
|
||||
}
|
||||
fn transaction<G: Get>(
|
||||
getter: &G,
|
||||
|
||||
fn eventuality<N: Network>(getter: &impl Get, id: [u8; 32]) -> Option<N::Eventuality> {
|
||||
Some(N::Eventuality::read(&mut getter.get(Self::key(id))?.as_slice()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
impl TransactionDb {
|
||||
fn save_transaction<N: Network>(txn: &mut impl DbTxn, tx: &N::Transaction) {
|
||||
Self::set(txn, tx.id().as_ref(), &tx.serialize());
|
||||
}
|
||||
|
||||
fn transaction<N: Network>(
|
||||
getter: &impl Get,
|
||||
id: <N::Transaction as Transaction<N>>::Id,
|
||||
) -> Option<N::Transaction> {
|
||||
getter
|
||||
.get(Self::transaction_key(&id))
|
||||
.map(|tx| N::Transaction::read(&mut tx.as_slice()).unwrap())
|
||||
}
|
||||
|
||||
fn eventuality_key(id: [u8; 32]) -> Vec<u8> {
|
||||
Self::sign_key(b"eventuality", id)
|
||||
}
|
||||
fn save_eventuality(txn: &mut D::Transaction<'_>, id: [u8; 32], eventuality: N::Eventuality) {
|
||||
txn.put(Self::eventuality_key(id), eventuality.serialize());
|
||||
}
|
||||
fn eventuality<G: Get>(getter: &G, id: [u8; 32]) -> Option<N::Eventuality> {
|
||||
Some(
|
||||
N::Eventuality::read::<&[u8]>(&mut getter.get(Self::eventuality_key(id))?.as_ref()).unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
fn attempt_key(id: &SignId) -> Vec<u8> {
|
||||
Self::sign_key(b"attempt", id.encode())
|
||||
}
|
||||
fn attempt(txn: &mut D::Transaction<'_>, id: &SignId) {
|
||||
txn.put(Self::attempt_key(id), []);
|
||||
}
|
||||
fn has_attempt<G: Get>(getter: &G, id: &SignId) -> bool {
|
||||
getter.get(Self::attempt_key(id)).is_some()
|
||||
Self::get(getter, id.as_ref()).map(|tx| N::Transaction::read(&mut tx.as_slice()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,12 +159,12 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
pub async fn rebroadcast_task(db: D, network: N) {
|
||||
log::info!("rebroadcasting transactions for plans whose completions yet to be confirmed...");
|
||||
loop {
|
||||
for active in SignerDb::<N, D>::active_signs(&db) {
|
||||
for completion in SignerDb::<N, D>::completions(&db, active) {
|
||||
for active in ActiveSignsDb::get(&db).unwrap_or_default() {
|
||||
for completion in CompletionsDb::completions::<N>(&db, active) {
|
||||
log::info!("rebroadcasting {}", hex::encode(&completion));
|
||||
// TODO: Don't drop the error entirely. Check for invariants
|
||||
let _ = network
|
||||
.publish_transaction(&SignerDb::<N, D>::transaction(&db, completion).unwrap())
|
||||
.publish_transaction(&TransactionDb::transaction::<N>(&db, completion).unwrap())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
@ -237,7 +221,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
|
||||
#[must_use]
|
||||
fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool {
|
||||
if !SignerDb::<N, D>::completions(txn, id).is_empty() {
|
||||
if !CompletionsDb::completions::<N>(txn, id).is_empty() {
|
||||
debug!(
|
||||
"SignTransaction/Reattempt order for {}, which we've already completed signing",
|
||||
hex::encode(id)
|
||||
|
@ -284,8 +268,8 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
let first_completion = !self.already_completed(txn, id);
|
||||
|
||||
// Save this completion to the DB
|
||||
SignerDb::<N, D>::complete_on_chain(txn, &id);
|
||||
SignerDb::<N, D>::complete(txn, id, &tx);
|
||||
CompletedOnChainDb::complete_on_chain(txn, &id);
|
||||
CompletionsDb::complete::<N>(txn, id, &tx);
|
||||
|
||||
if first_completion {
|
||||
Some(self.complete(id, tx.id()))
|
||||
|
@ -303,7 +287,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
id: [u8; 32],
|
||||
tx_id: &<N::Transaction as Transaction<N>>::Id,
|
||||
) -> Option<ProcessorMessage> {
|
||||
if let Some(eventuality) = SignerDb::<N, D>::eventuality(txn, id) {
|
||||
if let Some(eventuality) = EventualityDb::eventuality::<N>(txn, id) {
|
||||
// Transaction hasn't hit our mempool/was dropped for a different signature
|
||||
// The latter can happen given certain latency conditions/a single malicious signer
|
||||
// In the case of a single malicious signer, they can drag multiple honest validators down
|
||||
|
@ -324,7 +308,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
let first_completion = !self.already_completed(txn, id);
|
||||
|
||||
// Save this completion to the DB
|
||||
SignerDb::<N, D>::complete(txn, id, &tx);
|
||||
CompletionsDb::complete::<N>(txn, id, &tx);
|
||||
|
||||
if first_completion {
|
||||
return Some(self.complete(id, tx.id()));
|
||||
|
@ -338,7 +322,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
}
|
||||
} else {
|
||||
// If we don't have this in RAM, it should be because we already finished signing it
|
||||
assert!(!SignerDb::<N, D>::completions(txn, id).is_empty());
|
||||
assert!(!CompletionsDb::completions::<N>(txn, id).is_empty());
|
||||
info!(
|
||||
"signer {} informed of the eventuality completion for plan {}, {}",
|
||||
hex::encode(self.keys[0].group_key().to_bytes()),
|
||||
|
@ -405,7 +389,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
//
|
||||
// Only run if this hasn't already been attempted
|
||||
// TODO: This isn't complete as this txn may not be committed with the expected timing
|
||||
if SignerDb::<N, D>::has_attempt(txn, &id) {
|
||||
if AttemptDb::get(txn, &id).is_some() {
|
||||
warn!(
|
||||
"already attempted {} #{}. this is an error if we didn't reboot",
|
||||
hex::encode(id.id),
|
||||
|
@ -413,8 +397,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
SignerDb::<N, D>::attempt(txn, &id);
|
||||
AttemptDb::set(txn, &id, &());
|
||||
|
||||
// Attempt to create the TX
|
||||
let mut machines = vec![];
|
||||
|
@ -451,13 +434,13 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
) -> Option<ProcessorMessage> {
|
||||
// The caller is expected to re-issue sign orders on reboot
|
||||
// This is solely used by the rebroadcast task
|
||||
SignerDb::<N, D>::add_active_sign(txn, &id);
|
||||
ActiveSignsDb::add_active_sign(txn, &id);
|
||||
|
||||
if self.already_completed(txn, id) {
|
||||
return None;
|
||||
}
|
||||
|
||||
SignerDb::<N, D>::save_eventuality(txn, id, eventuality);
|
||||
EventualityDb::save_eventuality::<N>(txn, id, eventuality);
|
||||
|
||||
self.signable.insert(id, tx);
|
||||
self.attempt(txn, id, 0).await
|
||||
|
@ -605,7 +588,7 @@ impl<N: Network, D: Db> Signer<N, D> {
|
|||
};
|
||||
|
||||
// Save the transaction in case it's needed for recovery
|
||||
SignerDb::<N, D>::complete(txn, id.id, &tx);
|
||||
CompletionsDb::complete::<N>(txn, id.id, &tx);
|
||||
|
||||
// Publish it
|
||||
let tx_id = tx.id();
|
||||
|
|
Loading…
Reference in a new issue