mirror of
https://github.com/serai-dex/serai.git
synced 2025-01-18 00:34:52 +00:00
Start moving Coordinator to a multi-Tributary model
Previously, we only supported a single Tributary per network, and spawned a task to handle Processor messages per Tributary. We then moved to handling Processor messages per network, yet still only supported a single Tributary in that handling function. Now, when we handle a message, we load the Tributary which is relevant. Once we know it, we ensure we have it (preventing race conditions), and then proceed. We still need work to check whether we should have a Tributary, or whether we're not participating in it. We also need to check if a Tributary has been retired, meaning we shouldn't handle any transactions related to it, and to clean up retired Tributaries.
This commit is contained in:
parent
4a32f22418
commit
7d738a3677
8 changed files with 353 additions and 239 deletions
|
@ -2,7 +2,7 @@ use core::{ops::Deref, future::Future};
|
|||
use std::{
|
||||
sync::Arc,
|
||||
time::{SystemTime, Duration},
|
||||
collections::HashMap,
|
||||
collections::{VecDeque, HashMap},
|
||||
};
|
||||
|
||||
use zeroize::{Zeroize, Zeroizing};
|
||||
|
@ -527,286 +527,362 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
|||
serai: Arc<Serai>,
|
||||
mut processors: Pro,
|
||||
network: NetworkId,
|
||||
mut recv: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
|
||||
mut new_tributary: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
|
||||
) {
|
||||
let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn
|
||||
let pub_key = Ristretto::generator() * key.deref();
|
||||
|
||||
let ActiveTributary { spec, tributary } = recv.recv().await.unwrap();
|
||||
let genesis = spec.genesis();
|
||||
let mut tributaries = HashMap::new();
|
||||
|
||||
loop {
|
||||
match new_tributary.try_recv() {
|
||||
Ok(tributary) => {
|
||||
tributaries.insert(tributary.spec.set().session, tributary);
|
||||
}
|
||||
Err(mpsc::error::TryRecvError::Empty) => {}
|
||||
Err(mpsc::error::TryRecvError::Disconnected) => {
|
||||
panic!("handle_processor_messages new_tributary sender closed")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Check this ID is sane (last handled ID or expected next ID)
|
||||
let msg = processors.recv(network).await;
|
||||
|
||||
// TODO: We need to verify the Batches published to Substrate
|
||||
|
||||
if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
|
||||
let mut txn = db.txn();
|
||||
|
||||
// TODO: We probably want to NOP here, not panic?
|
||||
// TODO: We do have to track produced Batches in order to ensure their integrity
|
||||
let my_i = spec.i(pub_key).expect("processor message for network we aren't a validator in");
|
||||
|
||||
let tx = match msg.msg.clone() {
|
||||
let relevant_tributary = match &msg.msg {
|
||||
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
|
||||
key_gen::ProcessorMessage::Commitments { id, commitments } => {
|
||||
Some(Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()))
|
||||
}
|
||||
key_gen::ProcessorMessage::Shares { id, mut shares } => {
|
||||
// Create a MuSig-based machine to inform Substrate of this key generation
|
||||
let nonces = crate::tributary::dkg_confirmation_nonces(&key, &spec, id.attempt);
|
||||
|
||||
let mut tx_shares = Vec::with_capacity(shares.len());
|
||||
for i in 1 ..= spec.n() {
|
||||
let i = Participant::new(i).unwrap();
|
||||
if i == my_i {
|
||||
continue;
|
||||
}
|
||||
tx_shares.push(
|
||||
shares.remove(&i).expect("processor didn't send share for another validator"),
|
||||
);
|
||||
}
|
||||
|
||||
Some(Transaction::DkgShares {
|
||||
attempt: id.attempt,
|
||||
shares: tx_shares,
|
||||
confirmation_nonces: nonces,
|
||||
signed: Transaction::empty_signed(),
|
||||
})
|
||||
}
|
||||
key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => {
|
||||
assert_eq!(
|
||||
id.set.network, msg.network,
|
||||
"processor claimed to be a different network than it was for GeneratedKeyPair",
|
||||
);
|
||||
// TODO: Also check the other KeyGenId fields
|
||||
|
||||
// Tell the Tributary the key pair, get back the share for the MuSig
|
||||
// signature
|
||||
let share = crate::tributary::generated_key_pair::<D>(
|
||||
&mut txn,
|
||||
&key,
|
||||
&spec,
|
||||
&(Public(substrate_key), network_key.try_into().unwrap()),
|
||||
id.attempt,
|
||||
);
|
||||
|
||||
match share {
|
||||
Ok(share) => {
|
||||
Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed()))
|
||||
}
|
||||
Err(p) => {
|
||||
todo!("participant {p:?} sent invalid DKG confirmation preprocesses")
|
||||
}
|
||||
}
|
||||
}
|
||||
key_gen::ProcessorMessage::Commitments { id, .. } => Some(id.set.session),
|
||||
key_gen::ProcessorMessage::Shares { id, .. } => Some(id.set.session),
|
||||
key_gen::ProcessorMessage::GeneratedKeyPair { id, .. } => Some(id.set.session),
|
||||
},
|
||||
ProcessorMessage::Sign(msg) => match msg {
|
||||
sign::ProcessorMessage::Preprocess { id, preprocess } => {
|
||||
if id.attempt == 0 {
|
||||
MainDb::<D>::save_first_preprocess(&mut txn, network, id.id, preprocess);
|
||||
|
||||
None
|
||||
} else {
|
||||
Some(Transaction::SignPreprocess(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: preprocess,
|
||||
signed: Transaction::empty_signed(),
|
||||
}))
|
||||
}
|
||||
// TODO: Review replacing key with Session in messages?
|
||||
ProcessorMessage::Sign(inner_msg) => match inner_msg {
|
||||
sign::ProcessorMessage::Preprocess { id, .. } => {
|
||||
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
|
||||
}
|
||||
sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: share,
|
||||
signed: Transaction::empty_signed(),
|
||||
})),
|
||||
sign::ProcessorMessage::Completed { key: _, id, tx } => {
|
||||
let r = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
|
||||
#[allow(non_snake_case)]
|
||||
let R = <Ristretto as Ciphersuite>::generator() * r.deref();
|
||||
let mut tx = Transaction::SignCompleted {
|
||||
plan: id,
|
||||
tx_hash: tx,
|
||||
first_signer: pub_key,
|
||||
signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
|
||||
};
|
||||
let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
|
||||
match &mut tx {
|
||||
Transaction::SignCompleted { signature, .. } => {
|
||||
*signature = signed;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
Some(tx)
|
||||
sign::ProcessorMessage::Share { id, .. } => {
|
||||
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
|
||||
}
|
||||
sign::ProcessorMessage::Completed { key, .. } => {
|
||||
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, key).unwrap())
|
||||
}
|
||||
},
|
||||
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
|
||||
// This is a special case as it's relevant to *all* Tributaries
|
||||
coordinator::ProcessorMessage::SubstrateBlockAck { network, block, plans } => {
|
||||
assert_eq!(
|
||||
network, msg.network,
|
||||
*network, msg.network,
|
||||
"processor claimed to be a different network than it was for SubstrateBlockAck",
|
||||
);
|
||||
|
||||
// TODO: This needs to be scoped per multisig
|
||||
TributaryDb::<D>::set_plan_ids(&mut txn, genesis, block, &plans);
|
||||
// TODO: Find all Tributaries active at this Substrate block, and make sure we have
|
||||
// them all
|
||||
|
||||
Some(Transaction::SubstrateBlock(block))
|
||||
}
|
||||
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
|
||||
log::info!(
|
||||
"informed of batch (sign ID {}, attempt {}) for block {}",
|
||||
hex::encode(id.id),
|
||||
id.attempt,
|
||||
hex::encode(block),
|
||||
);
|
||||
// If this is the first attempt instance, wait until we synchronize around
|
||||
// the batch first
|
||||
if id.attempt == 0 {
|
||||
MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess);
|
||||
for tributary in tributaries.values() {
|
||||
// TODO: This needs to be scoped per multisig
|
||||
TributaryDb::<D>::set_plan_ids(&mut txn, tributary.spec.genesis(), *block, plans);
|
||||
|
||||
Some(Transaction::Batch(block.0, id.id))
|
||||
} else {
|
||||
Some(Transaction::BatchPreprocess(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: preprocess,
|
||||
signed: Transaction::empty_signed(),
|
||||
}))
|
||||
let tx = Transaction::SubstrateBlock(*block);
|
||||
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
|
||||
log::trace!("providing transaction {}", hex::encode(tx.hash()));
|
||||
let res = tributary.tributary.provide_transaction(tx).await;
|
||||
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
||||
panic!("provided an invalid transaction: {res:?}");
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
coordinator::ProcessorMessage::BatchShare { id, share } => {
|
||||
Some(Transaction::BatchShare(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: share.to_vec(),
|
||||
signed: Transaction::empty_signed(),
|
||||
}))
|
||||
coordinator::ProcessorMessage::BatchPreprocess { id, .. } => {
|
||||
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
|
||||
}
|
||||
coordinator::ProcessorMessage::BatchShare { id, .. } => {
|
||||
Some(substrate::SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
|
||||
}
|
||||
},
|
||||
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
|
||||
// If this is a new Batch, immediately publish it and don't do any further processing
|
||||
processor_messages::substrate::ProcessorMessage::Update { batch } => {
|
||||
assert_eq!(
|
||||
batch.batch.network, msg.network,
|
||||
"processor sent us a batch for a different network than it was for",
|
||||
);
|
||||
// TODO: Check this key's key pair's substrate key is authorized to publish
|
||||
// batches
|
||||
// TODO: Check this key's key pair's substrate key is authorized to publish batches
|
||||
|
||||
log::debug!("received batch {:?} {}", batch.batch.network, batch.batch.id);
|
||||
|
||||
// Save this batch to the disk
|
||||
MainDb::<D>::save_batch(&mut txn, batch);
|
||||
MainDb::<D>::save_batch(&mut txn, batch.clone());
|
||||
|
||||
/*
|
||||
Use a dedicated task to publish batches due to the latency potentially
|
||||
incurred.
|
||||
|
||||
This does not guarantee the batch has actually been published when the
|
||||
message is `ack`ed to message-queue. Accordingly, if we reboot, these
|
||||
batches would be dropped (as we wouldn't see the `Update` again, triggering
|
||||
our re-attempt to publish).
|
||||
|
||||
The solution to this is to have the task try not to publish the batch which
|
||||
caused it to be spawned, yet all saved batches which have yet to be published.
|
||||
This does risk having multiple tasks trying to publish all pending batches,
|
||||
yet these aren't notably complex.
|
||||
*/
|
||||
tokio::spawn({
|
||||
let db = db_clone.clone();
|
||||
let serai = serai.clone();
|
||||
let network = msg.network;
|
||||
async move {
|
||||
// Since we have a new batch, publish all batches yet to be published to
|
||||
// Serai
|
||||
// This handles the edge-case where batch n+1 is signed before batch n is
|
||||
while let Some(batch) = {
|
||||
// Get the next-to-execute batch ID
|
||||
let next = {
|
||||
let mut first = true;
|
||||
loop {
|
||||
if !first {
|
||||
log::error!(
|
||||
"{} {network:?}",
|
||||
"couldn't connect to Serai node to get the next batch ID for",
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
first = false;
|
||||
|
||||
let Ok(latest_block) = serai.get_latest_block().await else {
|
||||
continue;
|
||||
};
|
||||
let Ok(last) =
|
||||
serai.get_last_batch_for_network(latest_block.hash(), network).await
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
break if let Some(last) = last { last + 1 } else { 0 };
|
||||
}
|
||||
};
|
||||
|
||||
// If we have this batch, attempt to publish it
|
||||
MainDb::<D>::batch(&db, network, next)
|
||||
} {
|
||||
let id = batch.batch.id;
|
||||
let block = batch.batch.block;
|
||||
|
||||
let tx = Serai::execute_batch(batch);
|
||||
// This publish may fail if this transaction already exists in the
|
||||
// mempool, which is possible, or if this batch was already executed
|
||||
// on-chain
|
||||
// Either case will have eventual resolution and be handled by the
|
||||
// above check on if this batch should execute
|
||||
if serai.publish(&tx).await.is_ok() {
|
||||
log::info!("published batch {network:?} {id} (block {})", hex::encode(block));
|
||||
}
|
||||
// Get the next-to-execute batch ID
|
||||
async fn get_next(serai: &Serai, network: NetworkId) -> u32 {
|
||||
let mut first = true;
|
||||
loop {
|
||||
if !first {
|
||||
log::error!(
|
||||
"{} {network:?}",
|
||||
"couldn't connect to Serai node to get the next batch ID for",
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
first = false;
|
||||
|
||||
let Ok(latest_block) = serai.get_latest_block().await else {
|
||||
continue;
|
||||
};
|
||||
let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
break if let Some(last) = last { last + 1 } else { 0 };
|
||||
}
|
||||
});
|
||||
}
|
||||
let mut next = get_next(&serai, network).await;
|
||||
|
||||
// Since we have a new batch, publish all batches yet to be published to Serai
|
||||
// This handles the edge-case where batch n+1 is signed before batch n is
|
||||
let mut batches = VecDeque::new();
|
||||
while let Some(batch) = MainDb::<D>::batch(&txn, network, next) {
|
||||
batches.push_back(batch);
|
||||
next += 1;
|
||||
}
|
||||
|
||||
while let Some(batch) = batches.pop_front() {
|
||||
// If this Batch should no longer be published, continue
|
||||
if get_next(&serai, network).await > batch.batch.id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let tx = Serai::execute_batch(batch.clone());
|
||||
log::debug!(
|
||||
"attempting to publish batch {:?} {}",
|
||||
batch.batch.network,
|
||||
batch.batch.id,
|
||||
);
|
||||
// This publish may fail if this transaction already exists in the mempool, which is
|
||||
// possible, or if this batch was already executed on-chain
|
||||
// Either case will have eventual resolution and be handled by the above check on if
|
||||
// this batch should execute
|
||||
let res = serai.publish(&tx).await;
|
||||
if res.is_ok() {
|
||||
log::info!(
|
||||
"published batch {network:?} {} (block {})",
|
||||
batch.batch.id,
|
||||
hex::encode(batch.batch.block),
|
||||
);
|
||||
} else {
|
||||
log::debug!(
|
||||
"couldn't publish batch {:?} {}: {:?}",
|
||||
batch.batch.network,
|
||||
batch.batch.id,
|
||||
res,
|
||||
);
|
||||
// If we failed to publish it, restore it
|
||||
batches.push_front(batch);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
// If this created a transaction, publish it
|
||||
if let Some(mut tx) = tx {
|
||||
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
|
||||
// If there's a relevant Tributary...
|
||||
if let Some(relevant_tributary) = relevant_tributary {
|
||||
// Make sure we have it
|
||||
// TODO: Differentiate between we don't have it yet *and* we aren't participating in it
|
||||
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else {
|
||||
// Since we don't, sleep for a fraction of a second and move to the next loop iteration
|
||||
// At the start of the loop, we'll check for new tributaries, making this eventually
|
||||
// resolve
|
||||
// TODO: We may receive a Processor message for a *closed* Tributary
|
||||
// Handle that edge case
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
match tx.kind() {
|
||||
TransactionKind::Provided(_) => {
|
||||
log::trace!("providing transaction {}", hex::encode(tx.hash()));
|
||||
let res = tributary.provide_transaction(tx).await;
|
||||
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
||||
panic!("provided an invalid transaction: {res:?}");
|
||||
let genesis = spec.genesis();
|
||||
|
||||
let tx = match msg.msg.clone() {
|
||||
ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
|
||||
key_gen::ProcessorMessage::Commitments { id, commitments } => Some(
|
||||
Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed()),
|
||||
),
|
||||
key_gen::ProcessorMessage::Shares { id, mut shares } => {
|
||||
// Create a MuSig-based machine to inform Substrate of this key generation
|
||||
let nonces = crate::tributary::dkg_confirmation_nonces(&key, spec, id.attempt);
|
||||
|
||||
let mut tx_shares = Vec::with_capacity(shares.len());
|
||||
for i in 1 ..= spec.n() {
|
||||
let i = Participant::new(i).unwrap();
|
||||
if i ==
|
||||
spec
|
||||
.i(pub_key)
|
||||
.expect("processor message to DKG for a session we aren't a validator in")
|
||||
{
|
||||
continue;
|
||||
}
|
||||
tx_shares.push(
|
||||
shares.remove(&i).expect("processor didn't send share for another validator"),
|
||||
);
|
||||
}
|
||||
|
||||
Some(Transaction::DkgShares {
|
||||
attempt: id.attempt,
|
||||
shares: tx_shares,
|
||||
confirmation_nonces: nonces,
|
||||
signed: Transaction::empty_signed(),
|
||||
})
|
||||
}
|
||||
}
|
||||
TransactionKind::Unsigned => {
|
||||
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
|
||||
// Ignores the result since we can't differentiate already in-mempool from
|
||||
// already on-chain from invalid
|
||||
// TODO: Don't ignore the result
|
||||
tributary.add_transaction(tx).await;
|
||||
}
|
||||
TransactionKind::Signed(_) => {
|
||||
log::trace!("getting next nonce for Tributary TX in response to processor message");
|
||||
key_gen::ProcessorMessage::GeneratedKeyPair { id, substrate_key, network_key } => {
|
||||
assert_eq!(
|
||||
id.set.network, msg.network,
|
||||
"processor claimed to be a different network than it was for GeneratedKeyPair",
|
||||
);
|
||||
// TODO2: Also check the other KeyGenId fields
|
||||
|
||||
let nonce = loop {
|
||||
let Some(nonce) =
|
||||
NonceDecider::<D>::nonce(&txn, genesis, &tx).expect("signed TX didn't have nonce")
|
||||
else {
|
||||
// This can be None if:
|
||||
// 1) We scanned the relevant transaction(s) in a Tributary block
|
||||
// 2) The processor was sent a message and responded
|
||||
// 3) The Tributary TXN has yet to be committed
|
||||
log::warn!("nonce has yet to be saved for processor-instigated transaction");
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
// Tell the Tributary the key pair, get back the share for the MuSig signature
|
||||
let share = crate::tributary::generated_key_pair::<D>(
|
||||
&mut txn,
|
||||
&key,
|
||||
spec,
|
||||
&(Public(substrate_key), network_key.try_into().unwrap()),
|
||||
id.attempt,
|
||||
);
|
||||
|
||||
match share {
|
||||
Ok(share) => {
|
||||
Some(Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed()))
|
||||
}
|
||||
Err(p) => {
|
||||
todo!("participant {p:?} sent invalid DKG confirmation preprocesses")
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
ProcessorMessage::Sign(msg) => match msg {
|
||||
sign::ProcessorMessage::Preprocess { id, preprocess } => {
|
||||
if id.attempt == 0 {
|
||||
MainDb::<D>::save_first_preprocess(&mut txn, network, id.id, preprocess);
|
||||
|
||||
None
|
||||
} else {
|
||||
Some(Transaction::SignPreprocess(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: preprocess,
|
||||
signed: Transaction::empty_signed(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
sign::ProcessorMessage::Share { id, share } => Some(Transaction::SignShare(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: share,
|
||||
signed: Transaction::empty_signed(),
|
||||
})),
|
||||
sign::ProcessorMessage::Completed { key: _, id, tx } => {
|
||||
let r = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
|
||||
#[allow(non_snake_case)]
|
||||
let R = <Ristretto as Ciphersuite>::generator() * r.deref();
|
||||
let mut tx = Transaction::SignCompleted {
|
||||
plan: id,
|
||||
tx_hash: tx,
|
||||
first_signer: pub_key,
|
||||
signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
|
||||
};
|
||||
break nonce;
|
||||
};
|
||||
tx.sign(&mut OsRng, genesis, &key, nonce);
|
||||
let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
|
||||
match &mut tx {
|
||||
Transaction::SignCompleted { signature, .. } => {
|
||||
*signature = signed;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
Some(tx)
|
||||
}
|
||||
},
|
||||
ProcessorMessage::Coordinator(inner_msg) => match inner_msg {
|
||||
coordinator::ProcessorMessage::SubstrateBlockAck { .. } => unreachable!(),
|
||||
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocess } => {
|
||||
log::info!(
|
||||
"informed of batch (sign ID {}, attempt {}) for block {}",
|
||||
hex::encode(id.id),
|
||||
id.attempt,
|
||||
hex::encode(block),
|
||||
);
|
||||
// If this is the first attempt instance, wait until we synchronize around
|
||||
// the batch first
|
||||
if id.attempt == 0 {
|
||||
MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess);
|
||||
|
||||
publish_signed_transaction(&mut db_clone, &tributary, tx).await;
|
||||
Some(Transaction::Batch(block.0, id.id))
|
||||
} else {
|
||||
Some(Transaction::BatchPreprocess(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: preprocess,
|
||||
signed: Transaction::empty_signed(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
coordinator::ProcessorMessage::BatchShare { id, share } => {
|
||||
Some(Transaction::BatchShare(SignData {
|
||||
plan: id.id,
|
||||
attempt: id.attempt,
|
||||
data: share.to_vec(),
|
||||
signed: Transaction::empty_signed(),
|
||||
}))
|
||||
}
|
||||
},
|
||||
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
|
||||
processor_messages::substrate::ProcessorMessage::Update { .. } => unreachable!(),
|
||||
},
|
||||
};
|
||||
|
||||
// If this created a transaction, publish it
|
||||
if let Some(mut tx) = tx {
|
||||
log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
|
||||
|
||||
match tx.kind() {
|
||||
TransactionKind::Provided(_) => {
|
||||
log::trace!("providing transaction {}", hex::encode(tx.hash()));
|
||||
let res = tributary.provide_transaction(tx).await;
|
||||
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
|
||||
panic!("provided an invalid transaction: {res:?}");
|
||||
}
|
||||
}
|
||||
TransactionKind::Unsigned => {
|
||||
log::trace!("publishing unsigned transaction {}", hex::encode(tx.hash()));
|
||||
// Ignores the result since we can't differentiate already in-mempool from
|
||||
// already on-chain from invalid
|
||||
// TODO: Don't ignore the result
|
||||
tributary.add_transaction(tx).await;
|
||||
}
|
||||
TransactionKind::Signed(_) => {
|
||||
log::trace!("getting next nonce for Tributary TX in response to processor message");
|
||||
|
||||
let nonce = loop {
|
||||
let Some(nonce) = NonceDecider::<D>::nonce(&txn, genesis, &tx)
|
||||
.expect("signed TX didn't have nonce")
|
||||
else {
|
||||
// This can be None if:
|
||||
// 1) We scanned the relevant transaction(s) in a Tributary block
|
||||
// 2) The processor was sent a message and responded
|
||||
// 3) The Tributary TXN has yet to be committed
|
||||
log::warn!("nonce has yet to be saved for processor-instigated transaction");
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
};
|
||||
break nonce;
|
||||
};
|
||||
tx.sign(&mut OsRng, genesis, &key, nonce);
|
||||
|
||||
publish_signed_transaction(&mut db_clone, tributary, tx).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
use scale::{Encode, Decode};
|
||||
|
||||
pub use serai_db::*;
|
||||
|
||||
use serai_client::validator_sets::primitives::{Session, KeyPair};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SubstrateDb<D: Db>(pub D);
|
||||
impl<D: Db> SubstrateDb<D> {
|
||||
|
@ -33,4 +37,22 @@ impl<D: Db> SubstrateDb<D> {
|
|||
assert!(!Self::handled_event(txn, id, index));
|
||||
txn.put(Self::event_key(&id, index), []);
|
||||
}
|
||||
|
||||
fn session_key(key: &[u8]) -> Vec<u8> {
|
||||
Self::substrate_key(b"session", key)
|
||||
}
|
||||
pub fn session_for_key<G: Get>(getter: &G, key: &[u8]) -> Option<Session> {
|
||||
getter.get(Self::session_key(key)).map(|bytes| Session::decode(&mut bytes.as_ref()).unwrap())
|
||||
}
|
||||
pub fn save_session_for_keys(txn: &mut D::Transaction<'_>, key_pair: &KeyPair, session: Session) {
|
||||
let session = session.encode();
|
||||
let key_0 = Self::session_key(&key_pair.0);
|
||||
let existing = txn.get(&key_0);
|
||||
// This may trigger if 100% of a DKG are malicious, and they create a key equivalent to a prior
|
||||
// key. Since it requires 100% maliciousness, not just 67% maliciousness, this will only assert
|
||||
// in a modified-to-be-malicious stack, making it safe
|
||||
assert!(existing.is_none() || (existing.as_ref() == Some(&session)));
|
||||
txn.put(key_0, session.clone());
|
||||
txn.put(Self::session_key(&key_pair.1), session);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,13 +86,19 @@ async fn handle_new_set<D: Db, CNT: Clone + Fn(&mut D, TributarySpec)>(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_key_gen<Pro: Processors>(
|
||||
async fn handle_key_gen<D: Db, Pro: Processors>(
|
||||
db: &mut D,
|
||||
processors: &Pro,
|
||||
serai: &Serai,
|
||||
block: &Block,
|
||||
set: ValidatorSet,
|
||||
key_pair: KeyPair,
|
||||
) -> Result<(), SeraiError> {
|
||||
// This has to be saved *before* we send ConfirmKeyPair
|
||||
let mut txn = db.txn();
|
||||
SubstrateDb::<D>::save_session_for_keys(&mut txn, &key_pair, set.session);
|
||||
txn.commit();
|
||||
|
||||
processors
|
||||
.send(
|
||||
set.network,
|
||||
|
@ -254,7 +260,7 @@ async fn handle_block<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: Proces
|
|||
TributaryDb::<D>::set_key_pair(&mut txn, set, &key_pair);
|
||||
txn.commit();
|
||||
|
||||
handle_key_gen(processors, serai, &block, set, key_pair).await?;
|
||||
handle_key_gen(&mut db.0, processors, serai, &block, set, key_pair).await?;
|
||||
} else {
|
||||
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
|
||||
}
|
||||
|
|
|
@ -454,11 +454,12 @@ pub(crate) async fn handle_application_tx<
|
|||
) {
|
||||
Some(Some(preprocesses)) => {
|
||||
NonceDecider::<D>::selected_for_signing_batch(txn, genesis, data.plan);
|
||||
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
|
||||
processors
|
||||
.send(
|
||||
spec.set().network,
|
||||
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchPreprocesses {
|
||||
id: SignId { key: vec![], id: data.plan, attempt: data.attempt },
|
||||
id: SignId { key, id: data.plan, attempt: data.attempt },
|
||||
preprocesses,
|
||||
}),
|
||||
)
|
||||
|
@ -480,11 +481,12 @@ pub(crate) async fn handle_application_tx<
|
|||
&data.signed,
|
||||
) {
|
||||
Some(Some(shares)) => {
|
||||
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0.to_vec();
|
||||
processors
|
||||
.send(
|
||||
spec.set().network,
|
||||
CoordinatorMessage::Coordinator(coordinator::CoordinatorMessage::BatchShares {
|
||||
id: SignId { key: vec![], id: data.plan, attempt: data.attempt },
|
||||
id: SignId { key, id: data.plan, attempt: data.attempt },
|
||||
shares: shares
|
||||
.into_iter()
|
||||
.map(|(validator, share)| (validator, share.try_into().unwrap()))
|
||||
|
|
|
@ -56,6 +56,7 @@ impl TributarySpec {
|
|||
let mut validators = vec![];
|
||||
for (participant, amount) in set_data.participants {
|
||||
// TODO: Ban invalid keys from being validators on the Serai side
|
||||
// (make coordinator key a session key?)
|
||||
let participant = <Ristretto as Ciphersuite>::read_G::<&[u8]>(&mut participant.0.as_ref())
|
||||
.expect("invalid key registered as participant");
|
||||
// Give one weight on Tributary per bond instance
|
||||
|
|
|
@ -4,6 +4,7 @@ use std::collections::{VecDeque, HashMap};
|
|||
use rand_core::OsRng;
|
||||
|
||||
use transcript::{Transcript, RecommendedTranscript};
|
||||
use ciphersuite::group::GroupEncoding;
|
||||
use frost::{
|
||||
curve::Ristretto,
|
||||
ThresholdKeys,
|
||||
|
@ -177,9 +178,7 @@ impl<D: Db> SubstrateSigner<D> {
|
|||
// Update the attempt number
|
||||
self.attempt.insert(id, attempt);
|
||||
|
||||
// Doesn't set key since there's only one key active at a time
|
||||
// TODO: BatchSignId
|
||||
let id = SignId { key: vec![], id, attempt };
|
||||
let id = SignId { key: self.keys.group_key().to_bytes().to_vec(), id, attempt };
|
||||
info!("signing batch {} #{}", hex::encode(id.id), id.attempt);
|
||||
|
||||
// If we reboot mid-sign, the current design has us abort all signs and wait for latter
|
||||
|
|
|
@ -26,7 +26,11 @@ async fn test_substrate_signer() {
|
|||
|
||||
let id: u32 = 5;
|
||||
let block = BlockHash([0xaa; 32]);
|
||||
let mut actual_id = SignId { key: vec![], id: [0; 32], attempt: 0 };
|
||||
let mut actual_id = SignId {
|
||||
key: keys.values().next().unwrap().group_key().to_bytes().to_vec(),
|
||||
id: [0; 32],
|
||||
attempt: 0,
|
||||
};
|
||||
|
||||
let batch = Batch {
|
||||
network: NetworkId::Monero,
|
||||
|
|
|
@ -29,7 +29,11 @@ pub async fn batch(
|
|||
) -> u64 {
|
||||
let mut id = [0; 32];
|
||||
OsRng.fill_bytes(&mut id);
|
||||
let id = SignId { key: vec![], id, attempt: 0 };
|
||||
let id = SignId {
|
||||
key: (<Ristretto as Ciphersuite>::generator() * **substrate_key).to_bytes().to_vec(),
|
||||
id,
|
||||
attempt: 0,
|
||||
};
|
||||
|
||||
// Select a random participant to exclude, so we know for sure who *is* participating
|
||||
assert_eq!(COORDINATORS - THRESHOLD, 1);
|
||||
|
|
Loading…
Reference in a new issue