From 0d2396476278d304daafd3f94465abae29a6be1c Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 23 Oct 2023 05:07:38 -0400 Subject: [PATCH] Resolve #335 --- processor/src/main.rs | 4 + processor/src/networks/bitcoin.rs | 5 ++ processor/src/networks/mod.rs | 1 + processor/src/networks/monero.rs | 4 + processor/src/signer.rs | 126 ++++++++++++++++++++++++------ 5 files changed, 114 insertions(+), 26 deletions(-) diff --git a/processor/src/main.rs b/processor/src/main.rs index 19b0ed06..799c12cf 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -459,6 +459,10 @@ async fn boot( signers.insert(key.as_ref().to_vec(), signer); } + // Spawn a task to rebroadcast signed TXs yet to be mined into a finalized block + // This hedges against being dropped due to full mempools, temporarily too low of a fee... + tokio::spawn(Signer::::rebroadcast_task(raw_db.clone(), network.clone())); + (main_db, TributaryMutable { key_gen, substrate_signer, signers }, multisig_manager) } diff --git a/processor/src/networks/bitcoin.rs b/processor/src/networks/bitcoin.rs index 985d39a2..532200ce 100644 --- a/processor/src/networks/bitcoin.rs +++ b/processor/src/networks/bitcoin.rs @@ -163,6 +163,11 @@ impl TransactionTrait for Transaction { self.consensus_encode(&mut buf).unwrap(); buf } + fn read(reader: &mut R) -> io::Result { + Transaction::consensus_decode(reader) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{e}"))) + } + #[cfg(test)] async fn fee(&self, network: &Bitcoin) -> u64 { let mut value = 0; diff --git a/processor/src/networks/mod.rs b/processor/src/networks/mod.rs index da2b6078..5d8fea11 100644 --- a/processor/src/networks/mod.rs +++ b/processor/src/networks/mod.rs @@ -119,6 +119,7 @@ pub trait Transaction: Send + Sync + Sized + Clone + Debug { type Id: 'static + Id; fn id(&self) -> Self::Id; fn serialize(&self) -> Vec; + fn read(reader: &mut R) -> io::Result; #[cfg(test)] async fn fee(&self, network: &N) -> u64; diff --git a/processor/src/networks/monero.rs b/processor/src/networks/monero.rs index cc92a267..a89f9e0b 100644 --- a/processor/src/networks/monero.rs +++ b/processor/src/networks/monero.rs @@ -114,6 +114,10 @@ impl TransactionTrait for Transaction { fn serialize(&self) -> Vec { self.serialize() } + fn read(reader: &mut R) -> io::Result { + Transaction::read(reader) + } + #[cfg(test)] async fn fee(&self, _: &Monero) -> u64 { self.rct_signatures.base.fee diff --git a/processor/src/signer.rs b/processor/src/signer.rs index e098c07c..ac3adc80 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -31,35 +31,92 @@ impl SignerDb { D::key(b"SIGNER", dst, key) } - fn completed_key(id: [u8; 32]) -> Vec { + fn active_signs_key() -> Vec { + Self::sign_key(b"active_signs", []) + } + fn completed_on_chain_key(id: &[u8; 32]) -> Vec { + Self::sign_key(b"completed_on_chain", id) + } + fn active_signs(getter: &G) -> Vec<[u8; 32]> { + let active = getter.get(Self::active_signs_key()).unwrap_or(vec![]); + let mut active_ref = active.as_slice(); + let mut res = vec![]; + while !active_ref.is_empty() { + res.push(active_ref[.. 32].try_into().unwrap()); + active_ref = &active_ref[32 ..]; + } + res + } + fn add_active_sign(txn: &mut D::Transaction<'_>, id: &[u8; 32]) { + if txn.get(Self::completed_on_chain_key(id)).is_some() { + return; + } + let key = Self::active_signs_key(); + let mut active = txn.get(&key).unwrap_or(vec![]); + active.extend(id); + txn.put(key, active); + } + fn complete_on_chain(txn: &mut D::Transaction<'_>, id: &[u8; 32]) { + txn.put(Self::completed_on_chain_key(id), []); + txn.put( + Self::active_signs_key(), + Self::active_signs(txn) + .into_iter() + .filter(|active| active != id) + .flatten() + .collect::>(), + ); + } + + fn transaction_key(id: &>::Id) -> Vec { + Self::sign_key(b"tx", id) + } + fn completions_key(id: [u8; 32]) -> Vec { Self::sign_key(b"completed", id) } - fn complete( - txn: &mut D::Transaction<'_>, - id: [u8; 32], - tx: &>::Id, - ) { + fn complete(txn: &mut D::Transaction<'_>, id: [u8; 32], tx: &N::Transaction) { // Transactions can be completed by multiple signatures // Save every solution in order to be robust - let mut existing = txn.get(Self::completed_key(id)).unwrap_or(vec![]); + let tx_id = tx.id(); + txn.put(Self::transaction_key(&tx_id), tx.serialize()); + + let mut existing = txn.get(Self::completions_key(id)).unwrap_or(vec![]); // Don't add this TX if it's already present - let tx_len = tx.as_ref().len(); + let tx_len = tx_id.as_ref().len(); assert_eq!(existing.len() % tx_len, 0); let mut i = 0; while i < existing.len() { - if &existing[i .. (i + tx_len)] == tx.as_ref() { + if &existing[i .. (i + tx_len)] == tx_id.as_ref() { return; } i += tx_len; } - existing.extend(tx.as_ref()); - txn.put(Self::completed_key(id), existing); + existing.extend(tx_id.as_ref()); + txn.put(Self::completions_key(id), existing); } - fn completed(getter: &G, id: [u8; 32]) -> Option> { - getter.get(Self::completed_key(id)) + fn completions(getter: &G, id: [u8; 32]) -> Vec<>::Id> { + let completions = getter.get(Self::completions_key(id)).unwrap_or(vec![]); + let mut completions_ref = completions.as_slice(); + let mut res = vec![]; + while !completions_ref.is_empty() { + let mut id = >::Id::default(); + let id_len = id.as_ref().len(); + id.as_mut().copy_from_slice(&completions_ref[.. id_len]); + completions_ref = &completions_ref[id_len ..]; + res.push(id); + } + res + } + fn transaction( + getter: &G, + id: >::Id, + ) -> Option { + getter + .get(Self::transaction_key(&id)) + .map(|tx| N::Transaction::read(&mut tx.as_slice()).unwrap()) } fn eventuality_key(id: [u8; 32]) -> Vec { @@ -83,10 +140,6 @@ impl SignerDb { fn has_attempt(getter: &G, id: &SignId) -> bool { getter.get(Self::attempt_key(id)).is_some() } - - fn save_transaction(txn: &mut D::Transaction<'_>, tx: &N::Transaction) { - txn.put(Self::sign_key(b"tx", tx.id()), tx.serialize()); - } } pub struct Signer { @@ -122,6 +175,25 @@ impl fmt::Debug for Signer { } impl Signer { + /// Rebroadcast already signed TXs which haven't had their completions mined into a sufficiently + /// confirmed block. + pub async fn rebroadcast_task(db: D, network: N) { + log::info!("rebroadcasting transactions for plans whose completions yet to be confirmed..."); + loop { + for active in SignerDb::::active_signs(&db) { + for completion in SignerDb::::completions(&db, active) { + log::info!("rebroadcasting {}", hex::encode(&completion)); + // TODO: Don't drop the error entirely. Check for invariants + let _ = network + .publish_transaction(&SignerDb::::transaction(&db, completion).unwrap()) + .await; + } + } + // Only run every five minutes so we aren't frequently loading tens to hundreds of KB from + // the DB + tokio::time::sleep(core::time::Duration::from_secs(5 * 60)).await; + } + } pub fn new(network: N, keys: ThresholdKeys) -> Signer { Signer { db: PhantomData, @@ -170,7 +242,7 @@ impl Signer { } fn already_completed(&self, txn: &mut D::Transaction<'_>, id: [u8; 32]) -> bool { - if SignerDb::::completed(txn, id).is_some() { + if !SignerDb::::completions(txn, id).is_empty() { debug!( "SignTransaction/Reattempt order for {}, which we've already completed signing", hex::encode(id) @@ -202,8 +274,8 @@ impl Signer { let first_completion = !self.already_completed(txn, id); // Save this completion to the DB - SignerDb::::save_transaction(txn, &tx); - SignerDb::::complete(txn, id, &tx.id()); + SignerDb::::complete_on_chain(txn, &id); + SignerDb::::complete(txn, id, &tx); if first_completion { self.complete(id, tx.id()); @@ -238,8 +310,7 @@ impl Signer { let first_completion = !self.already_completed(txn, id); // Save this completion to the DB - SignerDb::::save_transaction(txn, &tx); - SignerDb::::complete(txn, id, tx_id); + SignerDb::::complete(txn, id, &tx); if first_completion { self.complete(id, tx.id()); @@ -255,7 +326,7 @@ impl Signer { } else { // If we don't have this in RAM, it should be because we already finished signing it // TODO: Will the coordinator ever send us Completed for an unknown ID? - assert!(SignerDb::::completed(txn, id).is_some()); + assert!(!SignerDb::::completions(txn, id).is_empty()); info!( "signer {} informed of the eventuality completion for plan {}, {}", hex::encode(self.keys.group_key().to_bytes()), @@ -356,6 +427,10 @@ impl Signer { tx: N::SignableTransaction, eventuality: N::Eventuality, ) { + // The caller is expected to re-issue sign orders on reboot + // This is solely used by the rebroadcast task + SignerDb::::add_active_sign(txn, &id); + if self.already_completed(txn, id) { return; } @@ -462,11 +537,10 @@ impl Signer { }; // Save the transaction in case it's needed for recovery - SignerDb::::save_transaction(txn, &tx); - let tx_id = tx.id(); - SignerDb::::complete(txn, id.id, &tx_id); + SignerDb::::complete(txn, id.id, &tx); // Publish it + let tx_id = tx.id(); if let Err(e) = self.network.publish_transaction(&tx).await { error!("couldn't publish {:?}: {:?}", tx, e); } else {