Document how receiving a Processor message does indeed make its Tributary relevant

This commit is contained in:
Luke Parker 2023-09-28 20:04:20 -04:00
parent 4d1212ec65
commit 83b3a5c31c
No known key found for this signature in database
2 changed files with 16 additions and 4 deletions

View file

@ -554,6 +554,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
let mut txn = db.txn(); let mut txn = db.txn();
let relevant_tributary = match &msg.msg { let relevant_tributary = match &msg.msg {
// We'll only receive these if we fired GenerateKey, which we'll only do if if we're
// in-set, making the Tributary relevant
ProcessorMessage::KeyGen(inner_msg) => match inner_msg { ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
key_gen::ProcessorMessage::Commitments { id, .. } => Some(id.set.session), key_gen::ProcessorMessage::Commitments { id, .. } => Some(id.set.session),
key_gen::ProcessorMessage::Shares { id, .. } => Some(id.set.session), key_gen::ProcessorMessage::Shares { id, .. } => Some(id.set.session),
@ -561,18 +563,23 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
}, },
// TODO: Review replacing key with Session in messages? // TODO: Review replacing key with Session in messages?
ProcessorMessage::Sign(inner_msg) => match inner_msg { ProcessorMessage::Sign(inner_msg) => match inner_msg {
// We'll only receive Preprocess and Share if we're actively signing
sign::ProcessorMessage::Preprocess { id, .. } => { sign::ProcessorMessage::Preprocess { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap()) Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
} }
sign::ProcessorMessage::Share { id, .. } => { sign::ProcessorMessage::Share { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap()) Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
} }
// While the Processor's Scanner will always emit Completed, that's routed through the
// Signer and only becomes a ProcessorMessage::Completed if the Signer is present and
// confirms it
sign::ProcessorMessage::Completed { key, .. } => { sign::ProcessorMessage::Completed { key, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, key).unwrap()) Some(substrate::SubstrateDb::<D>::session_for_key(&txn, key).unwrap())
} }
}, },
ProcessorMessage::Coordinator(inner_msg) => match inner_msg { ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
// This is a special case as it's relevant to *all* Tributaries // This is a special case as it's relevant to *all* Tributaries
// It doesn't return a Tributary to become `relevant_tributary` though
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => { coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
assert_eq!( assert_eq!(
*network, msg.network, *network, msg.network,
@ -597,6 +604,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
None None
} }
// We'll only fire these if we are the Substrate signer, making the Tributary relevant
coordinator::ProcessorMessage::BatchPreprocess { id, .. } => { coordinator::ProcessorMessage::BatchPreprocess { id, .. } => {
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap()) Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
} }
@ -605,7 +613,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
} }
}, },
ProcessorMessage::Substrate(inner_msg) => match inner_msg { ProcessorMessage::Substrate(inner_msg) => match inner_msg {
// If this is a new Batch, immediately publish it and don't do any further processing // If this is a new Batch, immediately publish it (if we can)
// This doesn't return a relevant Tributary as there's no Tributary with action expected
processor_messages::substrate::ProcessorMessage::Update { batch } => { processor_messages::substrate::ProcessorMessage::Update { batch } => {
assert_eq!( assert_eq!(
batch.batch.network, msg.network, batch.batch.network, msg.network,
@ -694,13 +703,14 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
// If there's a relevant Tributary... // If there's a relevant Tributary...
if let Some(relevant_tributary) = relevant_tributary { if let Some(relevant_tributary) = relevant_tributary {
// Make sure we have it // Make sure we have it
// TODO: Differentiate between we don't have it yet *and* we aren't participating in it // Per the reasoning above, we only return a Tributary as relevant if we're a participant
// Accordingly, we do *need* to have this Tributary now to handle it UNLESS the Tributary
// has already completed and this is simply an old message
// TODO: Check if the Tributary has already been completed
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else { let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else {
// Since we don't, sleep for a fraction of a second and move to the next loop iteration // Since we don't, sleep for a fraction of a second and move to the next loop iteration
// At the start of the loop, we'll check for new tributaries, making this eventually // At the start of the loop, we'll check for new tributaries, making this eventually
// resolve // resolve
// TODO: We may receive a Processor message for a *closed* Tributary
// Handle that edge case
sleep(Duration::from_millis(100)).await; sleep(Duration::from_millis(100)).await;
continue; continue;
}; };

View file

@ -566,6 +566,8 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
} }
SignerEvent::SignedTransaction { id, tx } => { SignerEvent::SignedTransaction { id, tx } => {
// It is important ProcessorMessage::Completed is only emitted if a Signer we had
// created the TX completed (which having it only emitted after a SignerEvent ensures)
coordinator coordinator
.send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed { .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
key: key.clone(), key: key.clone(),