cargo fmt signers/scanner

This commit is contained in:
Luke Parker 2024-09-13 05:10:37 -04:00
parent 1b39138472
commit b4e94f3d51
3 changed files with 147 additions and 141 deletions

View file

@ -145,25 +145,27 @@ pub trait ScannerFeed: 'static + Send + Sync + Clone {
getter: &(impl Send + Sync + Get), getter: &(impl Send + Sync + Get),
number: u64, number: u64,
) -> impl Send + Future<Output = Result<Self::Block, String>> { ) -> impl Send + Future<Output = Result<Self::Block, String>> {
async move {let block = match self.unchecked_block_by_number(number).await { async move {
Ok(block) => block, let block = match self.unchecked_block_by_number(number).await {
Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?, Ok(block) => block,
}; Err(e) => Err(format!("couldn't fetch block {number}: {e:?}"))?,
};
// Check the ID of this block is the expected ID // Check the ID of this block is the expected ID
{ {
let expected = crate::index::block_id(getter, number); let expected = crate::index::block_id(getter, number);
if block.id() != expected { if block.id() != expected {
panic!( panic!(
"finalized chain reorganized from {} to {} at {}", "finalized chain reorganized from {} to {} at {}",
hex::encode(expected), hex::encode(expected),
hex::encode(block.id()), hex::encode(block.id()),
number, number,
); );
}
} }
}
Ok(block)} Ok(block)
}
} }
/// The dust threshold for the specified coin. /// The dust threshold for the specified coin.

View file

@ -65,8 +65,10 @@ pub trait Coordinator: 'static + Send + Sync {
) -> impl Send + Future<Output = Result<(), Self::EphemeralError>>; ) -> impl Send + Future<Output = Result<(), Self::EphemeralError>>;
/// Publish a `Batch`. /// Publish a `Batch`.
fn publish_batch(&mut self, batch: Batch) fn publish_batch(
-> impl Send + Future<Output = Result<(), Self::EphemeralError>>; &mut self,
batch: Batch,
) -> impl Send + Future<Output = Result<(), Self::EphemeralError>>;
/// Publish a `SignedBatch`. /// Publish a `SignedBatch`.
fn publish_signed_batch( fn publish_signed_batch(

View file

@ -92,145 +92,147 @@ impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>
impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan impl<D: Db, ST: SignableTransaction, P: TransactionPublisher<TransactionFor<ST>>> ContinuallyRan
for TransactionSignerTask<D, ST, P> for TransactionSignerTask<D, ST, P>
{ {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {async{ fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
let mut iterated = false; async {
let mut iterated = false;
// Check for new transactions to sign // Check for new transactions to sign
loop { loop {
let mut txn = self.db.txn(); let mut txn = self.db.txn();
let Some(tx) = TransactionsToSign::<ST>::try_recv(&mut txn, &self.keys[0].group_key()) else { let Some(tx) = TransactionsToSign::<ST>::try_recv(&mut txn, &self.keys[0].group_key())
break; else {
}; break;
iterated = true; };
iterated = true;
// Save this to the database as a transaction to sign // Save this to the database as a transaction to sign
self.active_signing_protocols.insert(tx.id()); self.active_signing_protocols.insert(tx.id());
ActiveSigningProtocols::set( ActiveSigningProtocols::set(
&mut txn, &mut txn,
self.session, self.session,
&self.active_signing_protocols.iter().copied().collect(), &self.active_signing_protocols.iter().copied().collect(),
); );
{ {
let mut buf = Vec::with_capacity(256); let mut buf = Vec::with_capacity(256);
tx.write(&mut buf).unwrap(); tx.write(&mut buf).unwrap();
SerializedSignableTransactions::set(&mut txn, tx.id(), &buf); SerializedSignableTransactions::set(&mut txn, tx.id(), &buf);
}
let mut machines = Vec::with_capacity(self.keys.len());
for keys in &self.keys {
machines.push(tx.clone().sign(keys.clone()));
}
for msg in self.attempt_manager.register(VariantSignId::Transaction(tx.id()), machines) {
TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
txn.commit();
} }
let mut machines = Vec::with_capacity(self.keys.len()); // Check for completed Eventualities (meaning we should no longer sign for these transactions)
for keys in &self.keys { loop {
machines.push(tx.clone().sign(keys.clone())); let mut txn = self.db.txn();
} let Some(id) = CompletedEventualities::try_recv(&mut txn, &self.keys[0].group_key()) else {
for msg in self.attempt_manager.register(VariantSignId::Transaction(tx.id()), machines) { break;
TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg); };
/*
We may have yet to register this signing protocol.
While `TransactionsToSign` is populated before `CompletedEventualities`, we could
theoretically have `TransactionsToSign` populated with a new transaction _while iterating
over `CompletedEventualities`_, and then have `CompletedEventualities` populated. In that
edge case, we will see the completion notification before we see the transaction.
In such a case, we break (dropping the txn, re-queueing the completion notification). On
the task's next iteration, we'll process the transaction from `TransactionsToSign` and be
able to make progress.
*/
if !self.active_signing_protocols.remove(&id) {
break;
}
iterated = true;
// Since it was, remove this as an active signing protocol
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
SerializedSignableTransactions::del(&mut txn, id);
SerializedTransactions::del(&mut txn, id);
// We retire with a txn so we either successfully flag this Eventuality as completed, and
// won't re-register it (making this retire safe), or we don't flag it, meaning we will
// re-register it, yet that's safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, VariantSignId::Transaction(id));
txn.commit();
} }
txn.commit(); // Handle any messages sent to us
} loop {
let mut txn = self.db.txn();
let Some(msg) = CoordinatorToTransactionSignerMessages::try_recv(&mut txn, self.session)
else {
break;
};
iterated = true;
// Check for completed Eventualities (meaning we should no longer sign for these transactions) match self.attempt_manager.handle(msg) {
loop { Response::Messages(msgs) => {
let mut txn = self.db.txn(); for msg in msgs {
let Some(id) = CompletedEventualities::try_recv(&mut txn, &self.keys[0].group_key()) else { TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
break; }
}; }
Response::Signature { id, signature: signed_tx } => {
let signed_tx: TransactionFor<ST> = signed_tx.into();
/* // Save this transaction to the database
We may have yet to register this signing protocol. {
let mut buf = Vec::with_capacity(256);
signed_tx.write(&mut buf).unwrap();
SerializedTransactions::set(
&mut txn,
match id {
VariantSignId::Transaction(id) => id,
_ => panic!("TransactionSignerTask signed a non-transaction"),
},
&buf,
);
}
While `TransactionsToSign` is populated before `CompletedEventualities`, we could match self.publisher.publish(signed_tx).await {
theoretically have `TransactionsToSign` populated with a new transaction _while iterating Ok(()) => {}
over `CompletedEventualities`_, and then have `CompletedEventualities` populated. In that Err(e) => log::warn!("couldn't broadcast transaction: {e:?}"),
edge case, we will see the completion notification before we see the transaction. }
In such a case, we break (dropping the txn, re-queueing the completion notification). On
the task's next iteration, we'll process the transaction from `TransactionsToSign` and be
able to make progress.
*/
if !self.active_signing_protocols.remove(&id) {
break;
}
iterated = true;
// Since it was, remove this as an active signing protocol
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
SerializedSignableTransactions::del(&mut txn, id);
SerializedTransactions::del(&mut txn, id);
// We retire with a txn so we either successfully flag this Eventuality as completed, and
// won't re-register it (making this retire safe), or we don't flag it, meaning we will
// re-register it, yet that's safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, VariantSignId::Transaction(id));
txn.commit();
}
// Handle any messages sent to us
loop {
let mut txn = self.db.txn();
let Some(msg) = CoordinatorToTransactionSignerMessages::try_recv(&mut txn, self.session)
else {
break;
};
iterated = true;
match self.attempt_manager.handle(msg) {
Response::Messages(msgs) => {
for msg in msgs {
TransactionSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
} }
} }
Response::Signature { id, signature: signed_tx } => {
let signed_tx: TransactionFor<ST> = signed_tx.into();
// Save this transaction to the database txn.commit();
{ }
let mut buf = Vec::with_capacity(256);
signed_tx.write(&mut buf).unwrap();
SerializedTransactions::set(
&mut txn,
match id {
VariantSignId::Transaction(id) => id,
_ => panic!("TransactionSignerTask signed a non-transaction"),
},
&buf,
);
}
match self.publisher.publish(signed_tx).await { // If it's been five minutes since the last publication, republish the transactions for all
Ok(()) => {} // active signing protocols
Err(e) => log::warn!("couldn't broadcast transaction: {e:?}"), if Instant::now().duration_since(self.last_publication) > Duration::from_secs(5 * 60) {
} for tx in &self.active_signing_protocols {
let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue };
let mut tx_buf = tx_buf.as_slice();
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty());
self
.publisher
.publish(tx)
.await
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
} }
self.last_publication = Instant::now();
} }
txn.commit(); Ok(iterated)
} }
// If it's been five minutes since the last publication, republish the transactions for all
// active signing protocols
if Instant::now().duration_since(self.last_publication) > Duration::from_secs(5 * 60) {
for tx in &self.active_signing_protocols {
let Some(tx_buf) = SerializedTransactions::get(&self.db, *tx) else { continue };
let mut tx_buf = tx_buf.as_slice();
let tx = TransactionFor::<ST>::read(&mut tx_buf).unwrap();
assert!(tx_buf.is_empty());
self
.publisher
.publish(tx)
.await
.map_err(|e| format!("couldn't re-broadcast transactions: {e:?}"))?;
}
self.last_publication = Instant::now();
}
Ok(iterated)
} }
} }
}