Dedicated tasks per-Processor in coordinator

This isn't meaningful yet, as we still have serialized reading messages from
Processors, yet is a step closer.
This commit is contained in:
Luke Parker 2023-09-25 21:54:52 -04:00
parent 60491a091f
commit e1801b57c9
No known key found for this signature in database

View file

@ -175,9 +175,8 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
// Save it to the database // Save it to the database
MainDb::new(db).add_active_tributary(&spec); MainDb::new(db).add_active_tributary(&spec);
// Add it to the queue // If we reboot before this is read, the fact it was saved to the database means it'll be
// If we reboot before this is read from the queue, the fact it was saved to the database // handled on reboot
// means it'll be handled on reboot
new_tributary_spec.send(spec).unwrap(); new_tributary_spec.send(spec).unwrap();
}, },
&processors, &processors,
@ -416,8 +415,8 @@ pub async fn handle_p2p<D: Db, P: P2p>(
// Have up to three nodes respond // Have up to three nodes respond
let responders = u64::from(tributary.spec.n().min(3)); let responders = u64::from(tributary.spec.n().min(3));
// Decide which nodes will respond by using the latest block's hash as a mutually agreed // Decide which nodes will respond by using the latest block's hash as a mutually
// upon entropy source // agreed upon entropy source
// This isn't a secure source of entropy, yet it's fine for this // This isn't a secure source of entropy, yet it's fine for this
let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap());
// If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9)
@ -502,7 +501,7 @@ pub async fn publish_signed_transaction<D: Db, P: P2p>(
} }
pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>( pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
mut db: D, db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>, serai: Arc<Serai>,
mut processors: Pro, mut processors: Pro,
@ -510,318 +509,364 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
) { ) {
let pub_key = Ristretto::generator() * key.deref(); let pub_key = Ristretto::generator() * key.deref();
let mut tributaries = HashMap::new(); let channels = Arc::new(RwLock::new(HashMap::new()));
loop { tokio::spawn({
while let Ok(tributary) = { let processors = processors.clone();
match new_tributary.try_recv() { let channels = channels.clone();
Ok(tributary) => Ok(tributary), async move {
Err(broadcast::error::TryRecvError::Empty) => Err(()), loop {
Err(broadcast::error::TryRecvError::Lagged(_)) => { let channels = channels.clone();
panic!("handle_processors lagged to handle new_tributary") let ActiveTributary { spec, tributary } = new_tributary.recv().await.unwrap();
} let genesis = spec.genesis();
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), tokio::spawn({
} let mut db = db.clone();
} { let key = key.clone();
tributaries.insert(tributary.spec.genesis(), tributary); let serai = serai.clone();
} let mut processors = processors.clone();
async move {
let (send, mut recv) = mpsc::unbounded_channel();
// TODO: Support multisig rotation (not per-Tributary yet per-network?)
channels.write().await.insert(spec.set().network, send);
loop {
let msg: processors::Message = recv.recv().await.unwrap();
// TODO: We probably want to NOP here, not panic?
// TODO: We do have to track produced Batches in order to ensure their integrity
let my_i =
spec.i(pub_key).expect("processor message for network we aren't a validator in");
let tx = match msg.msg.clone() {
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
key_gen::ProcessorMessage::Commitments { id, commitments } => {
Some(Transaction::DkgCommitments(
id.attempt,
commitments,
Transaction::empty_signed(),
))
}
key_gen::ProcessorMessage::Shares { id, mut shares } => {
// Create a MuSig-based machine to inform Substrate of this key generation
let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt);
let mut tx_shares = Vec::with_capacity(shares.len());
for i in 1 ..= spec.n() {
let i = Participant::new(i).unwrap();
if i == my_i {
continue;
}
tx_shares.push(
shares
.remove(&i)
.expect("processor didn't send share for another validator"),
);
}
Some(Transaction::DkgShares {
attempt: id.attempt,
shares: tx_shares,
confirmation_nonces: nonces,
signed: Transaction::empty_signed(),
})
}
key_gen::ProcessorMessage::GeneratedKeyPair {
id,
substrate_key,
network_key,
} => {
assert_eq!(
id.set.network, msg.network,
"processor claimed to be a different network than it was for GeneratedKeyPair",
);
// TODO: Also check the other KeyGenId fields
// Tell the Tributary the key pair, get back the share for the MuSig signature
let mut txn = db.txn();
let share = crate::tributary::generated_key_pair::<D>(
&mut txn,
&key,
&spec,
&(Public(substrate_key), network_key.try_into().unwrap()),
id.attempt,
);
txn.commit();
match share {
Ok(share) => Some(Transaction::DkgConfirmed(
id.attempt,
share,
Transaction::empty_signed(),
)),
Err(p) => {
todo!("participant {p:?} sent invalid DKG confirmation preprocesses")
}
}
}
},
ProcessorMessage::Sign(msg) => match msg {
sign::ProcessorMessage::Preprocess { id, preprocess } => {
if id.attempt == 0 {
let mut txn = db.txn();
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
txn.commit();
None
} else {
Some(Transaction::SignPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
}))
}
}
sign::ProcessorMessage::Share { id, share } => {
Some(Transaction::SignShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share,
signed: Transaction::empty_signed(),
}))
}
sign::ProcessorMessage::Completed { key: _, id, tx } => {
let r = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
#[allow(non_snake_case)]
let R = <Ristretto as Ciphersuite>::generator() * r.deref();
let mut tx = Transaction::SignCompleted {
plan: id,
tx_hash: tx,
first_signer: pub_key,
signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
};
let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
match &mut tx {
Transaction::SignCompleted { signature, .. } => {
*signature = signed;
}
_ => unreachable!(),
}
Some(tx)
}
},
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
assert_eq!(
network, msg.network,
"processor claimed to be a different network than it was for SubstrateBlockAck",
);
// Safe to use its own txn since this is static and just needs to be written
// before we provide SubstrateBlock
let mut txn = db.txn();
// TODO: This needs to be scoped per multisig
TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
txn.commit();
Some(Transaction::SubstrateBlock(block))
}
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
log::info!(
"informed of batch (sign ID {}, attempt {}) for block {}",
hex::encode(id.id),
id.attempt,
hex::encode(block),
);
// If this is the first attempt instance, wait until we synchronize around the
// batch first
if id.attempt == 0 {
// Save the preprocess to disk so we can publish it later
// This is fine to use its own TX since it's static and just needs to be
// written before this message finishes it handling (or with this message's
// finished handling)
let mut txn = db.txn();
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
txn.commit();
Some(Transaction::Batch(block.0, id.id))
} else {
Some(Transaction::BatchPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
}))
}
}
coordinator::ProcessorMessage::BatchShare { id, share } => {
Some(Transaction::BatchShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share.to_vec(),
signed: Transaction::empty_signed(),
}))
}
},
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Update { batch } => {
assert_eq!(
batch.batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
);
// TODO: Check this key's key pair's substrate key is authorized to publish
// batches
// Save this batch to the disk
MainDb::new(&mut db).save_batch(batch);
/*
Use a dedicated task to publish batches due to the latency potentially
incurred.
This does not guarantee the batch has actually been published when the
message is `ack`ed to message-queue. Accordingly, if we reboot, these batches
would be dropped (as we wouldn't see the `Update` again, triggering our
re-attempt to publish).
The solution to this is to have the task try not to publish the batch which
caused it to be spawned, yet all saved batches which have yet to published.
This does risk having multiple tasks trying to publish all pending batches,
yet these aren't notably complex.
*/
tokio::spawn({
let mut db = db.clone();
let serai = serai.clone();
let network = msg.network;
async move {
// Since we have a new batch, publish all batches yet to be published to
// Serai
// This handles the edge-case where batch n+1 is signed before batch n is
while let Some(batch) = {
// Get the next-to-execute batch ID
let next = {
let mut first = true;
loop {
if !first {
log::error!(
"{} {network:?}",
"couldn't connect to Serai node to get the next batch ID for",
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
first = false;
let Ok(latest_block) = serai.get_latest_block().await else {
continue;
};
let Ok(last) = serai
.get_last_batch_for_network(latest_block.hash(), network)
.await
else {
continue;
};
break if let Some(last) = last { last + 1 } else { 0 };
}
};
// If we have this batch, attempt to publish it
MainDb::new(&mut db).batch(network, next)
} {
let id = batch.batch.id;
let block = batch.batch.block;
let tx = Serai::execute_batch(batch);
// This publish may fail if this transactions already exists in the
// mempool, which is possible, or if this batch was already executed
// on-chain
// Either case will have eventual resolution and be handled by the above
// check on if this batch should execute
if serai.publish(&tx).await.is_ok() {
log::info!(
"published batch {network:?} {id} (block {})",
hex::encode(block)
);
}
}
}
});
None
}
},
};
// If this created a transaction, publish it
if let Some(mut tx) = tx {
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
match tx.kind() {
TransactionKind::Provided(_) => {
log::trace!("providing transaction {}", hex::encode(tx.hash()));
let res = tributary.provide_transaction(tx).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
panic!("provided an invalid transaction: {res:?}");
}
}
TransactionKind::Unsigned => {
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
// Ignores the result since we can't differentiate already in-mempool from
// already on-chain from invalid
// TODO: Don't ignore the result
tributary.add_transaction(tx).await;
}
TransactionKind::Signed(_) => {
log::trace!(
"getting next nonce for Tributary TX in response to processor message"
);
let nonce = loop {
let Some(nonce) = NonceDecider::<D>::nonce(&db, genesis, &tx)
.expect("signed TX didn't have nonce")
else {
// This can be None if:
// 1) We scanned the relevant transaction(s) in a Tributary block
// 2) The processor was sent a message and responded
// 3) The Tributary TXN has yet to be committed
log::warn!(
"nonce has yet to be saved for processor-instigated transaction"
);
sleep(Duration::from_millis(100)).await;
continue;
};
break nonce;
};
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_signed_transaction(&tributary, tx).await;
}
}
}
processors.ack(msg).await;
}
}
});
}
}
});
let mut last_msg = None;
loop {
// TODO: We dispatch this to an async task per-processor, yet we don't move to the next message
// yet as all processor messages are shoved into a global queue.
// Modify message-queue to offer per-sender queues, not per-receiver.
// Alternatively, a peek method with local delineation of handled messages would work.
// TODO: Dispatch this message to a task dedicated to handling this processor, preventing one
// processor from holding up all the others. This would require a peek method be added to the
// message-queue (to view multiple future messages at once)
// TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it? // TODO: Do we handle having handled a message, by DB, yet having rebooted before `ack`ing it?
// Does the processor? // Does the processor?
let msg = processors.recv().await; let msg = processors.recv().await;
if last_msg == Some(msg.id) {
// TODO2: This is slow, and only works as long as a network only has a single Tributary sleep(Duration::from_secs(1)).await;
// (which means there's a lack of multisig rotation) continue;
let spec = {
let mut spec = None;
for tributary in tributaries.values() {
if tributary.spec.set().network == msg.network {
spec = Some(tributary.spec.clone());
break;
}
}
spec.expect("received message from processor we don't have a tributary for")
};
let genesis = spec.genesis();
// TODO: We probably want to NOP here, not panic?
let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in");
let tx = match msg.msg.clone() {
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
key_gen::ProcessorMessage::Commitments { id, commitments } => {
Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()))
}
key_gen::ProcessorMessage::Shares { id, mut shares } => {
// Create a MuSig-based machine to inform Substrate of this key generation
let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt);
let mut tx_shares = Vec::with_capacity(shares.len());
for i in 1 ..= spec.n() {
let i = Participant::new(i).unwrap();
if i == my_i {
continue;
}
tx_shares
.push(shares.remove(&i).expect("processor didn't send share for another validator"));
}
Some(Transaction::DkgShares {
attempt: id.attempt,
shares: tx_shares,
confirmation_nonces: nonces,
signed: Transaction::empty_signed(),
})
}
key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => {
assert_eq!(
id.set.network, msg.network,
"processor claimed to be a different network than it was for GeneratedKeyPair",
);
// TODO: Also check the other KeyGenId fields
// Tell the Tributary the key pair, get back the share for the MuSig signature
let mut txn = db.txn();
let share = crate::tributary::generated_key_pair::<D>(
&mut txn,
&key,
&spec,
&(Public(substrate_key), network_key.try_into().unwrap()),
id.attempt,
);
txn.commit();
match share {
Ok(share) => {
Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed()))
}
Err(p) => todo!("participant {p:?} sent invalid DKG confirmation preprocesses"),
}
}
},
ProcessorMessage::Sign(msg) => match msg {
sign::ProcessorMessage::Preprocess { id, preprocess } => {
if id.attempt == 0 {
let mut txn = db.txn();
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
txn.commit();
None
} else {
Some(Transaction::SignPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
}))
}
}
sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share,
signed: Transaction::empty_signed(),
})),
sign::ProcessorMessage::Completed { key: _, id, tx } => {
let r = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
#[allow(non_snake_case)]
let R = <Ristretto as Ciphersuite>::generator() * r.deref();
let mut tx = Transaction::SignCompleted {
plan: id,
tx_hash: tx,
first_signer: pub_key,
signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
};
let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
match &mut tx {
Transaction::SignCompleted { signature, .. } => {
*signature = signed;
}
_ => unreachable!(),
}
Some(tx)
}
},
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
assert_eq!(
network, msg.network,
"processor claimed to be a different network than it was for SubstrateBlockAck",
);
// Safe to use its own txn since this is static and just needs to be written before we
// provide SubstrateBlock
let mut txn = db.txn();
// TODO: This needs to be scoped per multisig
TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
txn.commit();
Some(Transaction::SubstrateBlock(block))
}
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
log::info!(
"informed of batch (sign ID {}, attempt {}) for block {}",
hex::encode(id.id),
id.attempt,
hex::encode(block),
);
// If this is the first attempt instance, wait until we synchronize around the batch
// first
if id.attempt == 0 {
// Save the preprocess to disk so we can publish it later
// This is fine to use its own TX since it's static and just needs to be written
// before this message finishes it handling (or with this message's finished handling)
let mut txn = db.txn();
MainDb::<D>::save_first_preprocess(&mut txn, id.id, preprocess);
txn.commit();
Some(Transaction::Batch(block.0, id.id))
} else {
Some(Transaction::BatchPreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocess,
signed: Transaction::empty_signed(),
}))
}
}
coordinator::ProcessorMessage::BatchShare { id, share } => {
Some(Transaction::BatchShare(SignData {
plan: id.id,
attempt: id.attempt,
data: share.to_vec(),
signed: Transaction::empty_signed(),
}))
}
},
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Update { batch } => {
assert_eq!(
batch.batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
);
// TODO: Check this key's key pair's substrate key is authorized to publish batches
// Save this batch to the disk
MainDb::new(&mut db).save_batch(batch);
/*
Use a dedicated task to publish batches due to the latency potentially incurred.
This does not guarantee the batch has actually been published when the message is
`ack`ed to message-queue. Accordingly, if we reboot, these batches would be dropped
(as we wouldn't see the `Update` again, triggering our re-attempt to publish).
The solution to this is to have the task try not to publish the batch which caused it
to be spawned, yet all saved batches which have yet to published. This does risk having
multiple tasks trying to publish all pending batches, yet these aren't notably complex.
*/
tokio::spawn({
let mut db = db.clone();
let serai = serai.clone();
let network = msg.network;
async move {
// Since we have a new batch, publish all batches yet to be published to Serai
// This handles the edge-case where batch n+1 is signed before batch n is
while let Some(batch) = {
// Get the next-to-execute batch ID
let next = {
let mut first = true;
loop {
if !first {
log::error!(
"couldn't connect to Serai node to get the next batch ID for {network:?}",
);
tokio::time::sleep(Duration::from_secs(5)).await;
}
first = false;
let Ok(latest_block) = serai.get_latest_block().await else { continue };
let Ok(last) =
serai.get_last_batch_for_network(latest_block.hash(), network).await
else {
continue;
};
break if let Some(last) = last { last + 1 } else { 0 };
}
};
// If we have this batch, attempt to publish it
MainDb::new(&mut db).batch(network, next)
} {
let id = batch.batch.id;
let block = batch.batch.block;
let tx = Serai::execute_batch(batch);
// This publish may fail if this transactions already exists in the mempool, which
// is possible, or if this batch was already executed on-chain
// Either case will have eventual resolution and be handled by the above check on
// if this block should execute
if serai.publish(&tx).await.is_ok() {
log::info!("published batch {network:?} {id} (block {})", hex::encode(block));
}
}
}
});
None
}
},
};
// If this created a transaction, publish it
if let Some(mut tx) = tx {
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
let Some(tributary) = tributaries.get(&genesis) else {
// TODO: This can happen since Substrate tells the Processor to generate commitments
// at the same time it tells the Tributary to be created
// There's no guarantee the Tributary will have been created though
panic!("processor is operating on tributary we don't have");
};
let tributary = &tributary.tributary;
match tx.kind() {
TransactionKind::Provided(_) => {
log::trace!("providing transaction {}", hex::encode(tx.hash()));
let res = tributary.provide_transaction(tx).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
panic!("provided an invalid transaction: {res:?}");
}
}
TransactionKind::Unsigned => {
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
// Ignores the result since we can't differentiate already in-mempool from already
// on-chain from invalid
// TODO: Don't ignore the result
tributary.add_transaction(tx).await;
}
TransactionKind::Signed(_) => {
log::trace!("getting next nonce for Tributary TX in response to processor message");
let nonce = loop {
let Some(nonce) =
NonceDecider::<D>::nonce(&db, genesis, &tx).expect("signed TX didn't have nonce")
else {
// This can be None if:
// 1) We scanned the relevant transaction(s) in a Tributary block
// 2) The processor was sent a message and responded
// 3) The Tributary TXN has yet to be committed
log::warn!("nonce has yet to be saved for processor-instigated transaction");
sleep(Duration::from_millis(100)).await;
continue;
};
break nonce;
};
tx.sign(&mut OsRng, genesis, &key, nonce);
publish_signed_transaction(tributary, tx).await;
}
}
} }
last_msg = Some(msg.id);
processors.ack(msg).await; // TODO: Race conditions with above tributary availability?
// TODO: How does this hold up to multisig rotation?
if let Some(channel) = channels.read().await.get(&msg.network) {
channel.send(msg).unwrap();
} else {
log::warn!("received processor message for network we don't have a channel for");
}
} }
} }
@ -893,7 +938,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
loop { loop {
match new_tributary_listener_1.recv().await { match new_tributary_listener_1.recv().await {
Ok(tributary) => { Ok(tributary) => {
tributaries.write().await.insert(tributary.spec.genesis(), tributary); tributaries.write().await.insert(tributary.spec.genesis(), tributary.tributary);
} }
Err(broadcast::error::RecvError::Lagged(_)) => { Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("recognized_id lagged to handle new_tributary") panic!("recognized_id lagged to handle new_tributary")
@ -943,9 +988,10 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let tributaries = tributaries.read().await; let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else { let Some(tributary) = tributaries.get(&genesis) else {
// TODO: This may happen if the task above is simply slow
panic!("tributary we don't have came to consensus on an Batch"); panic!("tributary we don't have came to consensus on an Batch");
}; };
publish_signed_transaction(&tributary.tributary, tx).await; publish_signed_transaction(tributary, tx).await;
} }
} }
}; };