diff --git a/processor/scanner/src/lib.rs b/processor/scanner/src/lib.rs index a5c5c038..6ac45223 100644 --- a/processor/scanner/src/lib.rs +++ b/processor/scanner/src/lib.rs @@ -145,25 +145,27 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone { getter: &(impl Send + Sync + Get), number: u64, ) -> impl Send + Future> { - async move {let block = match self.unchecked_block_by_number(number).await { - Ok(block) => block, - Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?, - }; + async move { + let block = match self.unchecked_block_by_number(number).await { + Ok(block) => block, + Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?, + }; - // Check the ID of this block is the expected ID - { - let expected = crate::index::block_id(getter, number); - if block.id() != expected { - panic!( - "finalized chain reorganized from {} to {} at {}", - hex::encode(expected), - hex::encode(block.id()), - number, - ); + // Check the ID of this block is the expected ID + { + let expected = crate::index::block_id(getter, number); + if block.id() != expected { + panic!( + "finalized chain reorganized from {} to {} at {}", + hex::encode(expected), + hex::encode(block.id()), + number, + ); + } } - } - Ok(block)} + Ok(block) + } } /// The dust threshold for the specified coin. diff --git a/processor/signers/src/lib.rs b/processor/signers/src/lib.rs index c76fbd32..a6714fdf 100644 --- a/processor/signers/src/lib.rs +++ b/processor/signers/src/lib.rs @@ -65,8 +65,10 @@ pub trait Coordinator: 'static + Send + Sync { ) -> impl Send + Future>; /// Publish a `Batch`. - fn publish_batch(&mut self, batch: Batch) - -> impl Send + Future>; + fn publish_batch( + &mut self, + batch: Batch, + ) -> impl Send + Future>; /// Publish a `SignedBatch`. fn publish_signed_batch( diff --git a/processor/signers/src/transaction/mod.rs b/processor/signers/src/transaction/mod.rs index f089e931..efb20217 100644 --- a/processor/signers/src/transaction/mod.rs +++ b/processor/signers/src/transaction/mod.rs @@ -92,145 +92,147 @@ impl> impl>> ContinuallyRan for TransactionSignerTask { - fn run_iteration(&mut self) -> impl Send + Future> {async{ - let mut iterated = false; + fn run_iteration(&mut self) -> impl Send + Future> { + async { + let mut iterated = false; - // Check for new transactions to sign - loop { - let mut txn = self.db.txn(); - let Some(tx) = TransactionsToSign::::try_recv(&mut txn, &self.keys[0].group_key()) else { - break; - }; - iterated = true; + // Check for new transactions to sign + loop { + let mut txn = self.db.txn(); + let Some(tx) = TransactionsToSign::::try_recv(&mut txn, &self.keys[0].group_key()) + else { + break; + }; + iterated = true; - // Save this to the database as a transaction to sign - self.active_signing_protocols.insert(tx.id()); - ActiveSigningProtocols::set( - &mut txn, - self.session, - &self.active_signing_protocols.iter().copied().collect(), - ); - { - let mut buf = Vec::with_capacity(256); - tx.write(&mut buf).unwrap(); - SerializedSignableTransactions::set(&mut txn, tx.id(), &buf); + // Save this to the database as a transaction to sign + self.active_signing_protocols.insert(tx.id()); + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + { + let mut buf = Vec::with_capacity(256); + tx.write(&mut buf).unwrap(); + SerializedSignableTransactions::set(&mut txn, tx.id(), &buf); + } + + let mut machines = Vec::with_capacity(self.keys.len()); + for keys in &self.keys { + machines.push(tx.clone().sign(keys.clone())); + } + for msg in self.attempt_manager.register(VariantSignId::Transaction(tx.id()), machines) { + TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + + txn.commit(); } - let mut machines = Vec::with_capacity(self.keys.len()); - for keys in &self.keys { - machines.push(tx.clone().sign(keys.clone())); - } - for msg in self.attempt_manager.register(VariantSignId::Transaction(tx.id()), machines) { - TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + // Check for completed Eventualities (meaning we should no longer sign for these transactions) + loop { + let mut txn = self.db.txn(); + let Some(id) = CompletedEventualities::try_recv(&mut txn, &self.keys[0].group_key()) else { + break; + }; + + /* + We may have yet to register this signing protocol. + + While `TransactionsToSign` is populated before `CompletedEventualities`, we could + theoretically have `TransactionsToSign` populated with a new transaction _while iterating + over `CompletedEventualities`_, and then have `CompletedEventualities` populated. In that + edge case, we will see the completion notification before we see the transaction. + + In such a case, we break (dropping the txn, re-queueing the completion notification). On + the task's next iteration, we'll process the transaction from `TransactionsToSign` and be + able to make progress. + */ + if !self.active_signing_protocols.remove(&id) { + break; + } + iterated = true; + + // Since it was, remove this as an active signing protocol + ActiveSigningProtocols::set( + &mut txn, + self.session, + &self.active_signing_protocols.iter().copied().collect(), + ); + // Clean up the database + SerializedSignableTransactions::del(&mut txn, id); + SerializedTransactions::del(&mut txn, id); + + // We retire with a txn so we either successfully flag this Eventuality as completed, and + // won't re-register it (making this retire safe), or we don't flag it, meaning we will + // re-register it, yet that's safe as we have yet to retire it + self.attempt_manager.retire(&mut txn, VariantSignId::Transaction(id)); + + txn.commit(); } - txn.commit(); - } + // Handle any messages sent to us + loop { + let mut txn = self.db.txn(); + let Some(msg) = CoordinatorToTransactionSignerMessages::try_recv(&mut txn, self.session) + else { + break; + }; + iterated = true; - // Check for completed Eventualities (meaning we should no longer sign for these transactions) - loop { - let mut txn = self.db.txn(); - let Some(id) = CompletedEventualities::try_recv(&mut txn, &self.keys[0].group_key()) else { - break; - }; + match self.attempt_manager.handle(msg) { + Response::Messages(msgs) => { + for msg in msgs { + TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + } + } + Response::Signature { id, signature: signed_tx } => { + let signed_tx: TransactionFor = signed_tx.into(); - /* - We may have yet to register this signing protocol. + // Save this transaction to the database + { + let mut buf = Vec::with_capacity(256); + signed_tx.write(&mut buf).unwrap(); + SerializedTransactions::set( + &mut txn, + match id { + VariantSignId::Transaction(id) => id, + _ => panic!("TransactionSignerTask signed a non-transaction"), + }, + &buf, + ); + } - While `TransactionsToSign` is populated before `CompletedEventualities`, we could - theoretically have `TransactionsToSign` populated with a new transaction _while iterating - over `CompletedEventualities`_, and then have `CompletedEventualities` populated. In that - edge case, we will see the completion notification before we see the transaction. - - In such a case, we break (dropping the txn, re-queueing the completion notification). On - the task's next iteration, we'll process the transaction from `TransactionsToSign` and be - able to make progress. - */ - if !self.active_signing_protocols.remove(&id) { - break; - } - iterated = true; - - // Since it was, remove this as an active signing protocol - ActiveSigningProtocols::set( - &mut txn, - self.session, - &self.active_signing_protocols.iter().copied().collect(), - ); - // Clean up the database - SerializedSignableTransactions::del(&mut txn, id); - SerializedTransactions::del(&mut txn, id); - - // We retire with a txn so we either successfully flag this Eventuality as completed, and - // won't re-register it (making this retire safe), or we don't flag it, meaning we will - // re-register it, yet that's safe as we have yet to retire it - self.attempt_manager.retire(&mut txn, VariantSignId::Transaction(id)); - - txn.commit(); - } - - // Handle any messages sent to us - loop { - let mut txn = self.db.txn(); - let Some(msg) = CoordinatorToTransactionSignerMessages::try_recv(&mut txn, self.session) - else { - break; - }; - iterated = true; - - match self.attempt_manager.handle(msg) { - Response::Messages(msgs) => { - for msg in msgs { - TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); + match self.publisher.publish(signed_tx).await { + Ok(()) => {} + Err(e) => log::warn!("couldn't broadcast transaction: {e:?}"), + } } } - Response::Signature { id, signature: signed_tx } => { - let signed_tx: TransactionFor = signed_tx.into(); - // Save this transaction to the database - { - let mut buf = Vec::with_capacity(256); - signed_tx.write(&mut buf).unwrap(); - SerializedTransactions::set( - &mut txn, - match id { - VariantSignId::Transaction(id) => id, - _ => panic!("TransactionSignerTask signed a non-transaction"), - }, - &buf, - ); - } + txn.commit(); + } - match self.publisher.publish(signed_tx).await { - Ok(()) => {} - Err(e) => log::warn!("couldn't broadcast transaction: {e:?}"), - } + // If it's been five minutes since the last publication, republish the transactions for all + // active signing protocols + if Instant::now().duration_since(self.last_publication) > Duration::from_secs(5 * 60) { + for tx in &self.active_signing_protocols { + let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue }; + let mut tx_buf = tx_buf.as_slice(); + let tx = TransactionFor::::read(&mut tx_buf).unwrap(); + assert!(tx_buf.is_empty()); + + self + .publisher + .publish(tx) + .await + .map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?; } + + self.last_publication = Instant::now(); } - txn.commit(); + Ok(iterated) } - - // If it's been five minutes since the last publication, republish the transactions for all - // active signing protocols - if Instant::now().duration_since(self.last_publication) > Duration::from_secs(5 * 60) { - for tx in &self.active_signing_protocols { - let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue }; - let mut tx_buf = tx_buf.as_slice(); - let tx = TransactionFor::::read(&mut tx_buf).unwrap(); - assert!(tx_buf.is_empty()); - - self - .publisher - .publish(tx) - .await - .map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?; - } - - self.last_publication = Instant::now(); - } - - Ok(iterated) } } -}