Remove BatchSigned

SubstrateBlock's provision of the most recently acknowledged block has
equivalent information with the same latency. Accordingly, there's no need for
it.
This commit is contained in:
Luke Parker 2023-04-17 20:16:58 -04:00
parent e26b861d25
commit 5a499de4ca
No known key found for this signature in database
7 changed files with 89 additions and 67 deletions

View file

@ -83,7 +83,7 @@ async fn handle_new_set<D: Db, Pro: Processor, P: P2p>(
.unwrap(); .unwrap();
// Trigger a DKG // Trigger a DKG
// TODO: Check how the processor handles thi being fired multiple times // TODO: Check how the processor handles this being fired multiple times
// We already have a unique event ID based on block, event index (where event index is // We already have a unique event ID based on block, event index (where event index is
// the one generated in this handle_block function) // the one generated in this handle_block function)
// We could use that on this end and the processor end? // We could use that on this end and the processor end?
@ -114,9 +114,9 @@ async fn handle_key_gen<D: Db, Pro: Processor>(
.expect("KeyGen occurred for a set which doesn't exist") .expect("KeyGen occurred for a set which doesn't exist")
.is_some() .is_some()
{ {
// TODO: Check how the processor handles thi being fired multiple times // TODO: Check how the processor handles this being fired multiple times
processor processor
.send(CoordinatorMessage::KeyGen( .send(CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair { processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext { context: SubstrateContext {
coin_latest_finalized_block: serai coin_latest_finalized_block: serai
@ -168,23 +168,8 @@ async fn handle_batch_and_burns<D: Db, Pro: Processor>(
// prior batches // prior batches
// Since batches within a block are guaranteed to be ordered, thanks to their incremental ID, // Since batches within a block are guaranteed to be ordered, thanks to their incremental ID,
// the last batch will be the latest batch, so its block will be the latest block // the last batch will be the latest batch, so its block will be the latest block
// This is just a mild optimization to prevent needing an additional RPC call to grab this
batch_block.insert(network, network_block); batch_block.insert(network, network_block);
// TODO: Check how the processor handles thi being fired multiple times
processor
.send(CoordinatorMessage::Coordinator(
processor_messages::coordinator::CoordinatorMessage::BatchSigned {
key: get_coin_key(
serai,
// TODO2
ValidatorSet { network, session: Session(0) },
)
.await?
.expect("ValidatorSet without keys signed a batch"),
block: network_block,
},
))
.await;
} else { } else {
panic!("Batch event wasn't Batch: {batch:?}"); panic!("Batch event wasn't Batch: {batch:?}");
} }
@ -228,7 +213,7 @@ async fn handle_batch_and_burns<D: Db, Pro: Processor>(
.expect("network had a batch/burn yet never set a latest block") .expect("network had a batch/burn yet never set a latest block")
}; };
// TODO: Check how the processor handles thi being fired multiple times // TODO: Check how the processor handles this being fired multiple times
processor processor
.send(CoordinatorMessage::Substrate( .send(CoordinatorMessage::Substrate(
processor_messages::substrate::CoordinatorMessage::SubstrateBlock { processor_messages::substrate::CoordinatorMessage::SubstrateBlock {

View file

@ -110,8 +110,6 @@ pub mod coordinator {
BatchShares { id: SignId, shares: HashMap<Participant, [u8; 32]> }, BatchShares { id: SignId, shares: HashMap<Participant, [u8; 32]> },
// Re-attempt a batch signing protocol. // Re-attempt a batch signing protocol.
BatchReattempt { id: SignId }, BatchReattempt { id: SignId },
// Needed so a client which didn't participate in signing can still realize signing completed
BatchSigned { key: Vec<u8>, block: BlockHash },
} }
impl CoordinatorMessage { impl CoordinatorMessage {
@ -120,7 +118,6 @@ pub mod coordinator {
CoordinatorMessage::BatchPreprocesses { id, .. } => BlockHash(id.id), CoordinatorMessage::BatchPreprocesses { id, .. } => BlockHash(id.id),
CoordinatorMessage::BatchShares { id, .. } => BlockHash(id.id), CoordinatorMessage::BatchShares { id, .. } => BlockHash(id.id),
CoordinatorMessage::BatchReattempt { id } => BlockHash(id.id), CoordinatorMessage::BatchReattempt { id } => BlockHash(id.id),
CoordinatorMessage::BatchSigned { block, .. } => *block,
}) })
} }
@ -129,7 +126,6 @@ pub mod coordinator {
CoordinatorMessage::BatchPreprocesses { id, .. } => &id.key, CoordinatorMessage::BatchPreprocesses { id, .. } => &id.key,
CoordinatorMessage::BatchShares { id, .. } => &id.key, CoordinatorMessage::BatchShares { id, .. } => &id.key,
CoordinatorMessage::BatchReattempt { id } => &id.key, CoordinatorMessage::BatchReattempt { id } => &id.key,
CoordinatorMessage::BatchSigned { key, .. } => key,
} }
} }
} }

View file

@ -141,9 +141,13 @@ struct TributaryMutable<C: Coin, D: Db> {
key_gen: KeyGen<C, D>, key_gen: KeyGen<C, D>,
signers: HashMap<Vec<u8>, Signer<C, D>>, signers: HashMap<Vec<u8>, Signer<C, D>>,
// This isn't mutably borrowed by Substrate. It is also mutably borrowed by the Scanner. // This is also mutably borrowed by the Scanner.
// The safety of this is from the Scanner starting new sign tasks, and Tributary only mutating // The Scanner starts new sign tasks.
// already-created sign tasks. The Scanner does not mutate sign tasks post-creation. // The Tributary mutates already-created signed tasks, potentially completing them.
// Substrate may mark tasks as completed, invalidating any existing mutable borrows.
// The safety of this follows as written above.
// TODO: There should only be one SubstrateSigner at a time (see #277)
substrate_signers: HashMap<Vec<u8>, SubstrateSigner<D>>, substrate_signers: HashMap<Vec<u8>, SubstrateSigner<D>>,
} }
@ -343,7 +347,13 @@ async fn handle_coordinator_msg<D: Db, C: Coin, Co: Coordinator>(
let key = <C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); let key = <C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();
// We now have to acknowledge every block for this key up to the acknowledged block // We now have to acknowledge every block for this key up to the acknowledged block
let outputs = substrate_mutable.scanner.ack_up_to_block(key, block_id).await; let (blocks, outputs) = substrate_mutable.scanner.ack_up_to_block(key, block_id).await;
// Since this block was acknowledged, we no longer have to sign the batch for it
for block in blocks {
for (_, signer) in tributary_mutable.substrate_signers.iter_mut() {
signer.batch_signed(block);
}
}
let mut payments = vec![]; let mut payments = vec![];
for out in burns { for out in burns {

View file

@ -14,6 +14,8 @@ use tokio::{
time::sleep, time::sleep,
}; };
use serai_client::primitives::BlockHash;
use crate::{ use crate::{
Get, DbTxn, Db, Get, DbTxn, Db,
coins::{Output, Transaction, EventualitiesTracker, Block, Coin}, coins::{Output, Transaction, EventualitiesTracker, Block, Coin},
@ -175,13 +177,19 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
fn scanned_block_key(key: &<C::Curve as Ciphersuite>::G) -> Vec<u8> { fn scanned_block_key(key: &<C::Curve as Ciphersuite>::G) -> Vec<u8> {
Self::scanner_key(b"scanned_block", key.to_bytes()) Self::scanner_key(b"scanned_block", key.to_bytes())
} }
#[allow(clippy::type_complexity)]
fn save_scanned_block( fn save_scanned_block(
txn: &mut D::Transaction<'_>, txn: &mut D::Transaction<'_>,
key: &<C::Curve as Ciphersuite>::G, key: &<C::Curve as Ciphersuite>::G,
block: usize, block: usize,
) -> Vec<C::Output> { ) -> (Option<<C::Block as Block<C>>::Id>, Vec<C::Output>) {
let outputs = Self::block(txn, block).and_then(|id| Self::outputs(txn, key, &id)); let id = Self::block(txn, block); // It may be None for the first key rotated to
let outputs = outputs.unwrap_or(vec![]); let outputs = if let Some(id) = id.as_ref() {
Self::outputs(txn, key, id).unwrap_or(vec![])
} else {
vec![]
};
// Mark all the outputs from this block as seen // Mark all the outputs from this block as seen
for output in &outputs { for output in &outputs {
@ -191,7 +199,7 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
txn.put(Self::scanned_block_key(key), u64::try_from(block).unwrap().to_le_bytes()); txn.put(Self::scanned_block_key(key), u64::try_from(block).unwrap().to_le_bytes());
// Return this block's outputs so they can be pruned from the RAM cache // Return this block's outputs so they can be pruned from the RAM cache
outputs (id, outputs)
} }
fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize { fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize {
let bytes = self let bytes = self
@ -269,7 +277,8 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
info!("Rotating to key {}", hex::encode(key.to_bytes())); info!("Rotating to key {}", hex::encode(key.to_bytes()));
let mut txn = scanner.db.0.txn(); let mut txn = scanner.db.0.txn();
assert!(ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, activation_number).is_empty()); let (_, outputs) = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, activation_number);
assert!(outputs.is_empty());
ScannerDb::<C, D>::add_active_key(&mut txn, key); ScannerDb::<C, D>::add_active_key(&mut txn, key);
txn.commit(); txn.commit();
scanner.keys.push(key); scanner.keys.push(key);
@ -284,7 +293,7 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
&mut self, &mut self,
key: <C::Curve as Ciphersuite>::G, key: <C::Curve as Ciphersuite>::G,
id: <C::Block as Block<C>>::Id, id: <C::Block as Block<C>>::Id,
) -> Vec<C::Output> { ) -> (Vec<BlockHash>, Vec<C::Output>) {
let mut scanner = self.scanner.write().await; let mut scanner = self.scanner.write().await;
debug!("Block {} acknowledged", hex::encode(&id)); debug!("Block {} acknowledged", hex::encode(&id));
@ -294,11 +303,16 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
// Get the number of the last block we acknowledged // Get the number of the last block we acknowledged
let prior = scanner.db.latest_scanned_block(key); let prior = scanner.db.latest_scanned_block(key);
let mut blocks = vec![];
let mut outputs = vec![]; let mut outputs = vec![];
let mut txn = scanner.db.0.txn(); let mut txn = scanner.db.0.txn();
for number in (prior + 1) ..= number { for number in (prior + 1) ..= number {
outputs.extend(ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number)); let (block, these_outputs) = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number);
let block = BlockHash(block.unwrap().as_ref().try_into().unwrap());
blocks.push(block);
outputs.extend(these_outputs);
} }
assert_eq!(blocks.last().unwrap().as_ref(), id.as_ref());
// TODO: This likely needs to be atomic with the scheduler? // TODO: This likely needs to be atomic with the scheduler?
txn.commit(); txn.commit();
@ -306,7 +320,7 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
assert!(scanner.ram_outputs.remove(output.id().as_ref())); assert!(scanner.ram_outputs.remove(output.id().as_ref()));
} }
outputs (blocks, outputs)
} }
} }

View file

@ -18,7 +18,10 @@ use frost_schnorrkel::Schnorrkel;
use log::{info, debug, warn}; use log::{info, debug, warn};
use serai_client::in_instructions::primitives::{Batch, SignedBatch}; use serai_client::{
primitives::BlockHash,
in_instructions::primitives::{Batch, SignedBatch},
};
use messages::{sign::SignId, coordinator::*}; use messages::{sign::SignId, coordinator::*};
use crate::{DbTxn, Db}; use crate::{DbTxn, Db};
@ -104,13 +107,10 @@ impl<D: Db> SubstrateSigner<D> {
// Check the attempt lines up // Check the attempt lines up
match self.attempt.get(&id.id) { match self.attempt.get(&id.id) {
// If we don't have an attempt logged, it's because the coordinator is faulty OR because we // If we don't have an attempt logged, it's because the coordinator is faulty OR because we
// rebooted // rebooted OR we detected the signed batch on chain
// The latter is the expected flow for batches not actively being participated in
None => { None => {
warn!( warn!("not attempting batch {} #{}", hex::encode(id.id), id.attempt);
"not attempting batch {} #{}. this is an error if we didn't reboot",
hex::encode(id.id),
id.attempt
);
Err(())?; Err(())?;
} }
Some(attempt) => { Some(attempt) => {
@ -204,7 +204,7 @@ impl<D: Db> SubstrateSigner<D> {
pub async fn sign(&mut self, batch: Batch) { pub async fn sign(&mut self, batch: Batch) {
if self.db.completed(batch.block.0) { if self.db.completed(batch.block.0) {
debug!("Sign batch order for ID we've already completed signing"); debug!("Sign batch order for ID we've already completed signing");
// See BatchSigned for commentary on why this simply returns // See batch_signed for commentary on why this simply returns
return; return;
} }
@ -318,26 +318,28 @@ impl<D: Db> SubstrateSigner<D> {
CoordinatorMessage::BatchReattempt { id } => { CoordinatorMessage::BatchReattempt { id } => {
self.attempt(id.id, id.attempt).await; self.attempt(id.id, id.attempt).await;
} }
CoordinatorMessage::BatchSigned { key: _, block } => {
// Stop trying to sign for this batch
let mut txn = self.db.0.txn();
SubstrateSignerDb::<D>::complete(&mut txn, block.0);
txn.commit();
self.signable.remove(&block.0);
self.attempt.remove(&block.0);
self.preprocessing.remove(&block.0);
self.signing.remove(&block.0);
// This doesn't emit SignedBatch because it doesn't have access to the SignedBatch
// The coordinator is expected to only claim a batch was signed if it's on the Substrate
// chain, hence why it's unnecessary to check it/back it up here
// This also doesn't emit any further events since all mutation happen on the
// substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is meant to
// end up triggering)
}
} }
} }
pub fn batch_signed(&mut self, block: BlockHash) {
// Stop trying to sign for this batch
let mut txn = self.db.0.txn();
SubstrateSignerDb::<D>::complete(&mut txn, block.0);
txn.commit();
self.signable.remove(&block.0);
self.attempt.remove(&block.0);
self.preprocessing.remove(&block.0);
self.signing.remove(&block.0);
// This doesn't emit SignedBatch because it doesn't have access to the SignedBatch
// This function is expected to only be called once Substrate acknowledges this block,
// which means its batch must have been signed
// While a successive batch's signing would also cause this block to be acknowledged, Substrate
// guarantees a batch's ordered inclusion
// This also doesn't emit any further events since all mutation from the Batch being signed
// happens on the substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is
// meant to end up triggering)
}
} }

View file

@ -7,6 +7,8 @@ use frost::Participant;
use tokio::time::timeout; use tokio::time::timeout;
use serai_client::primitives::BlockHash;
use serai_db::MemDb; use serai_db::MemDb;
use crate::{ use crate::{
@ -25,13 +27,14 @@ pub async fn test_scanner<C: Coin>(coin: C) {
} }
let first = Arc::new(Mutex::new(true)); let first = Arc::new(Mutex::new(true));
let activation_number = coin.get_latest_block_number().await.unwrap();
let db = MemDb::new(); let db = MemDb::new();
let new_scanner = || async { let new_scanner = || async {
let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone()); let (mut scanner, active_keys) = Scanner::new(coin.clone(), db.clone());
let mut first = first.lock().unwrap(); let mut first = first.lock().unwrap();
if *first { if *first {
assert!(active_keys.is_empty()); assert!(active_keys.is_empty());
scanner.rotate_key(coin.get_latest_block_number().await.unwrap(), keys.group_key()).await; scanner.rotate_key(activation_number, keys.group_key()).await;
*first = false; *first = false;
} else { } else {
assert_eq!(active_keys.len(), 1); assert_eq!(active_keys.len(), 1);
@ -68,7 +71,19 @@ pub async fn test_scanner<C: Coin>(coin: C) {
verify_event(new_scanner().await).await; verify_event(new_scanner().await).await;
// Acknowledge the block // Acknowledge the block
assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id.clone()).await, outputs);
// Acknowledging it should yield a list of all blocks since the last acknowledged block
let mut blocks = vec![];
let mut curr_block = activation_number + 1;
loop {
let block = coin.get_block(curr_block).await.unwrap().id();
blocks.push(BlockHash(block.as_ref().try_into().unwrap()));
if block == block_id {
break;
}
curr_block += 1;
}
assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id).await, (blocks, outputs));
// There should be no more events // There should be no more events
assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err()); assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err());

View file

@ -115,7 +115,7 @@ pub async fn test_wallet<C: Coin>(coin: C) {
// Check the Scanner DB can reload the outputs // Check the Scanner DB can reload the outputs
assert_eq!( assert_eq!(
scanner.ack_up_to_block(key, block.id()).await, scanner.ack_up_to_block(key, block.id()).await.1,
[first_outputs, outputs].concat().to_vec() [first_outputs, outputs].concat().to_vec()
); );
} }