Reattempts (#483)

* Schedule re-attempts and add a (not yet filled out) match statement to actually execute them

A comment in the code explains the methodology; copied here:

"""
This is because we *always* re-attempt any protocol which had participation. That doesn't
mean we *should* re-attempt this protocol.

The alternatives were:
1) Note on-chain we completed a protocol, halting re-attempts upon 34%.
2) Vote on-chain to re-attempt a protocol.

This schema doesn't have any additional messages upon the success case (whereas
alternative #1 does) and doesn't have overhead (as alternative #2 does, sending votes and
then preprocesses. This only sends preprocesses).
"""

Any signing protocol which reaches sufficient participation will be
re-attempted until it no longer does.
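
For orientation, here is a minimal sketch of this scheduling flow, with an in-memory map standing in for the coordinator's ReattemptDb and a hypothetical is_completed callback standing in for the per-topic checks the match statement performs; the block-time constant is an assumed value:

```rust
use std::collections::HashMap;

// Stand-in for the coordinator's Topic enum, trimmed to two variants
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Topic {
  Dkg,
  Batch(u32),
}

#[derive(Default)]
struct ReattemptSchedule {
  // block number at which to re-attempt -> topics due at that block
  pending: HashMap<u32, Vec<Topic>>,
}

impl ReattemptSchedule {
  // Called once a protocol sees 2/3rds participation in its preprocesses
  fn schedule(&mut self, current_block: u32, attempt: u32, topic: Topic) {
    // Assumed block time for this sketch; the real constant is in milliseconds
    const TARGET_BLOCK_TIME_MS: u32 = 1000;
    // 5 minutes, expressed in blocks
    const BASE_DELAY: u32 = (5 * 60 * 1000) / TARGET_BLOCK_TIME_MS;
    // Later attempts wait longer, capped at 15 minutes
    let delay = BASE_DELAY * ((attempt / 3) + 1).min(3);
    self.pending.entry(current_block + delay).or_default().push(topic);
  }

  // Called on every block: anything due is re-attempted, unless it has since completed
  fn due(&mut self, block: u32, is_completed: impl Fn(Topic) -> bool) -> Vec<Topic> {
    self
      .pending
      .remove(&block)
      .unwrap_or_default()
      .into_iter()
      .filter(|topic| !is_completed(*topic))
      .collect()
  }
}
```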

* Have the Substrate scanner track DKG removals/completions for the Tributary code

* Don't keep trying to publish a participant removal if we've already set keys

* Pad out the re-attempt match a bit more

* Have CosignEvaluator reload from the DB

* Correctly schedule cosign re-attempts

* Actually spawn new DKG removal attempts

* Use u32 for Batch ID in SubstrateSignableId, finish Batch re-attempt routing

The batch ID was an opaque [u8; 5] which also encoded the network, yet that's
redundant (the network is already implied by the signing session) and unhelpful.
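
For reference, a sketch of the old encoding with a stand-in NetworkId enum (the real type lives in serai_client::primitives); the network's single SCALE byte plus the four-byte u32 is where the 5 bytes came from:

```rust
use scale::Encode; // parity-scale-codec, as aliased throughout the repo

// Stand-in for serai_client::primitives::NetworkId; unit variants encode as one byte
#[derive(Encode)]
enum NetworkId {
  Serai,
  Bitcoin,
  Ethereum,
  Monero,
}

// Old scheme: pack the network next to the u32 batch ID, yielding the opaque [u8; 5]
// New scheme: just the u32 batch ID itself
fn old_batch_sign_id(network: NetworkId, id: u32) -> [u8; 5] {
  (network, id).encode().try_into().unwrap()
}
```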

* Clarify a pair of TODOs in the coordinator

* Remove old TODO

* Final comment cleanup

* Correct usage of TARGET_BLOCK_TIME in reattempt scheduler

It's in milliseconds and I assumed it was in seconds.

* Have coordinator tests drop BatchReattempts which may exist yet aren't relevant

* Bug fix and pointless oddity removal

We scheduled a re-attempt upon receiving 2/3rds of preprocesses and again upon
receiving 2/3rds of shares, so any signing protocol could cause two re-attempts
instead of the intended one.
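
A self-contained sketch of the corrected condition, with names simplified; the actual guard in the diff also excludes the DkgConfirmation topic, which never gets its own re-attempt:

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
enum Label {
  Preprocess,
  Share,
}

// Schedule exactly one re-attempt per signing protocol: when the preprocess round,
// and only the preprocess round, crosses the 2/3rds threshold t
fn should_schedule_reattempt(label: Label, prior_received: u16, now_received: u16, t: u16) -> bool {
  // Equivalent to the diff's received_range.contains(&t), where
  // received_range = (prior_received + 1) ..= now_received
  let crossed_threshold = (prior_received < t) && (now_received >= t);
  (label == Label::Preprocess) && crossed_threshold
}
```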

The coordinator tests randomly generated the Batch ID since it was previously an
opaque byte array. While that didn't break the test, it was pointless and made
the already-succeeded check before re-attempting impossible to hit.

* Add log statements, correct a deadlock in coordinator tests

* Increase pessimistic timeout on recv_message to compensate for tighter best-case timeouts

* Further bump timeout by a minute

AFAICT, the GitHub CI run failed by just a few seconds.

This is also the worst case for a single instance, making it fine to be decently long.

* Further further bump timeout due to lack of distinct error
Luke Parker 2023-12-12 12:28:53 -05:00 committed by GitHub
parent b297b79f07
commit 6a172825aa
17 changed files with 437 additions and 191 deletions


@ -9,7 +9,7 @@ use tokio::{
time::sleep,
};
use scale::Encode;
use borsh::BorshSerialize;
use sp_application_crypto::RuntimePublic;
use serai_client::{
primitives::{NETWORKS, NetworkId, Signature},
@ -28,7 +28,8 @@ use crate::{
create_db! {
CosignDb {
ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> Vec<u8>,
ReceivedCosign: (set: ValidatorSet, block: [u8; 32]) -> CosignedBlock,
LatestCosign: (network: NetworkId) -> CosignedBlock,
DistinctChain: (set: ValidatorSet) -> (),
}
}
@ -37,7 +38,7 @@ pub struct CosignEvaluator<D: Db> {
db: Mutex<D>,
serai: Arc<Serai>,
stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
latest_cosigns: RwLock<HashMap<NetworkId, (u64, CosignedBlock)>>,
latest_cosigns: RwLock<HashMap<NetworkId, CosignedBlock>>,
}
impl<D: Db> CosignEvaluator<D> {
@ -50,10 +51,10 @@ impl<D: Db> CosignEvaluator<D> {
let latest_cosigns = self.latest_cosigns.read().await;
let mut highest_block = 0;
for (block_num, _) in latest_cosigns.values() {
for cosign in latest_cosigns.values() {
let mut networks = HashSet::new();
for (network, (sub_block_num, _)) in &*latest_cosigns {
if sub_block_num >= block_num {
for (network, sub_cosign) in &*latest_cosigns {
if sub_cosign.block_number >= cosign.block_number {
networks.insert(network);
}
}
@ -61,7 +62,7 @@ impl<D: Db> CosignEvaluator<D> {
networks.into_iter().map(|network| stakes.get(network).unwrap_or(&0)).sum::<u64>();
let needed_stake = ((total_stake * 2) / 3) + 1;
if (total_stake == 0) || (sum_stake > needed_stake) {
highest_block = highest_block.max(*block_num);
highest_block = highest_block.max(cosign.block_number);
}
}
@ -106,7 +107,7 @@ impl<D: Db> CosignEvaluator<D> {
async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
// If we already have this cosign or a newer cosign, return
if let Some(latest) = self.latest_cosigns.read().await.get(&cosign.network) {
if latest.0 >= cosign.block_number {
if latest.block_number >= cosign.block_number {
return Ok(());
}
}
@ -180,7 +181,8 @@ impl<D: Db> CosignEvaluator<D> {
{
let mut db = self.db.lock().await;
let mut txn = db.txn();
ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign.encode());
ReceivedCosign::set(&mut txn, set_with_keys, cosign.block, &cosign);
LatestCosign::set(&mut txn, set_with_keys.network, &(cosign));
txn.commit();
}
@ -258,7 +260,7 @@ impl<D: Db> CosignEvaluator<D> {
} else {
{
let mut latest_cosigns = self.latest_cosigns.write().await;
latest_cosigns.insert(cosign.network, (block.number(), cosign));
latest_cosigns.insert(cosign.network, cosign);
}
self.update_latest_cosign().await;
}
@ -268,11 +270,18 @@ impl<D: Db> CosignEvaluator<D> {
#[allow(clippy::new_ret_no_self)]
pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
let mut latest_cosigns = HashMap::new();
for network in NETWORKS {
if let Some(cosign) = LatestCosign::get(&db, network) {
latest_cosigns.insert(network, cosign);
}
}
let evaluator = Arc::new(Self {
db: Mutex::new(db),
serai,
stakes: RwLock::new(None),
latest_cosigns: RwLock::new(HashMap::new()),
latest_cosigns: RwLock::new(latest_cosigns),
});
// Spawn a task to update stakes regularly
@ -310,15 +319,11 @@ impl<D: Db> CosignEvaluator<D> {
tokio::spawn({
async move {
loop {
let cosigns = evaluator
.latest_cosigns
.read()
.await
.values()
.map(|cosign| cosign.1)
.collect::<Vec<_>>();
let cosigns = evaluator.latest_cosigns.read().await.values().cloned().collect::<Vec<_>>();
for cosign in cosigns {
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, cosign.encode()).await;
let mut buf = vec![];
cosign.serialize(&mut buf).unwrap();
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
}
sleep(Duration::from_secs(60)).await;
}


@ -18,6 +18,7 @@ use frost::Participant;
use serai_db::{DbTxn, Db};
use scale::Encode;
use borsh::BorshSerialize;
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
@ -248,7 +249,9 @@ async fn handle_processor_message<D: Db, P: P2p>(
},
};
cosign_channel.send(cosigned_block).unwrap();
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, cosigned_block.encode()).await;
let mut buf = vec![];
cosigned_block.serialize(&mut buf).unwrap();
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
None
}
},
@ -555,7 +558,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
let SubstrateSignableId::Batch(id) = id.id else {
panic!("BatchPreprocess SubstrateSignableId wasn't Batch")
};
id.encode()
id.to_le_bytes()
},
preprocesses.into_iter().map(Into::into).collect(),
);
@ -1057,7 +1060,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let mut tx = match id_type {
RecognizedIdType::Batch => Transaction::SubstrateSign(SignData {
data: get_preprocess(&raw_db, id_type, &id).await,
plan: SubstrateSignableId::Batch(id.as_slice().try_into().unwrap()),
plan: SubstrateSignableId::Batch(u32::from_le_bytes(id.try_into().unwrap())),
label: Label::Preprocess,
attempt: 0,
signed: Transaction::empty_signed(),


@ -8,7 +8,7 @@ use std::{
use async_trait::async_trait;
use scale::{Encode, Decode};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::primitives::NetworkId;
use serai_db::Db;
@ -39,7 +39,7 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignedBlock {
pub network: NetworkId,
pub block_number: u64,
@ -705,8 +705,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
}
}
P2pMessageKind::CosignedBlock => {
let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(msg) = CosignedBlock::decode(&mut scale::IoReader(&mut msg_ref)) else {
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
log::error!("received CosignedBlock message with invalidly serialized contents");
continue;
};


@ -12,7 +12,7 @@ use serai_client::{
SeraiError, Block, Serai, TemporalSerai,
primitives::{BlockHash, NetworkId},
validator_sets::{
primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares},
primitives::{ValidatorSet, amortize_excess_key_shares},
ValidatorSetsEvent,
},
in_instructions::InInstructionsEvent,
@ -25,7 +25,11 @@ use processor_messages::SubstrateContext;
use tokio::{sync::mpsc, time::sleep};
use crate::{Db, processors::Processors, tributary::TributarySpec};
use crate::{
Db,
processors::Processors,
tributary::{TributarySpec, SeraiDkgRemoval, SeraiDkgCompleted},
};
mod db;
pub use db::*;
@ -114,37 +118,6 @@ async fn handle_new_set<D: Db>(
Ok(())
}
async fn handle_key_gen<Pro: Processors>(
processors: &Pro,
serai: &Serai,
block: &Block,
set: ValidatorSet,
key_pair: KeyPair,
) -> Result<(), SeraiError> {
processors
.send(
set.network,
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext {
serai_time: block.time().unwrap() / 1000,
network_latest_finalized_block: serai
.as_of(block.hash())
.in_instructions()
.latest_block_for_network(set.network)
.await?
// The processor treats this as a magic value which will cause it to find a network
// block which has a time greater than or equal to the Serai time
.unwrap_or(BlockHash([0; 32])),
},
session: set.session,
key_pair,
},
)
.await;
Ok(())
}
async fn handle_batch_and_burns<Pro: Processors>(
txn: &mut impl DbTxn,
processors: &Pro,
@ -249,6 +222,19 @@ async fn handle_block<D: Db, Pro: Processors>(
// Define an indexed event ID.
let mut event_id = 0;
if HandledEvent::is_unhandled(db, hash, event_id) {
let mut txn = db.txn();
for removal in serai.as_of(hash).validator_sets().participant_removed_events().await? {
let ValidatorSetsEvent::ParticipantRemoved { set, removed } = removal else {
panic!("ParticipantRemoved event wasn't ParticipantRemoved: {removal:?}");
};
SeraiDkgRemoval::set(&mut txn, set, removed.0, &());
}
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
// If a new validator set was activated, create tributary/inform processor to do a DKG
for new_set in serai.as_of(hash).validator_sets().new_set_events().await? {
// Individually mark each event as handled so on reboot, we minimize duplicates
@ -279,12 +265,31 @@ async fn handle_block<D: Db, Pro: Processors>(
for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
if HandledEvent::is_unhandled(db, hash, event_id) {
log::info!("found fresh key gen event {:?}", key_gen);
if let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen {
handle_key_gen(processors, serai, &block, set, key_pair).await?;
} else {
let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen else {
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
}
};
processors
.send(
set.network,
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext {
serai_time: block.time().unwrap() / 1000,
network_latest_finalized_block: serai
.as_of(block.hash())
.in_instructions()
.latest_block_for_network(set.network)
.await?
// The processor treats this as a magic value which will cause it to find a network
// block which has a time greater than or equal to the Serai time
.unwrap_or(BlockHash([0; 32])),
},
session: set.session,
key_pair,
},
)
.await;
let mut txn = db.txn();
SeraiDkgCompleted::set(&mut txn, set, &());
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}


@ -223,27 +223,24 @@ fn serialize_transaction() {
{
let mut block = [0; 32];
OsRng.fill_bytes(&mut block);
let mut batch = [0; 5];
OsRng.fill_bytes(&mut batch);
let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
test_read_write(Transaction::Batch { block, batch });
}
test_read_write(Transaction::SubstrateBlock(OsRng.next_u64()));
{
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
test_read_write(Transaction::SubstrateSign(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(plan),
SubstrateSignableId::Batch(batch),
Label::Preprocess,
)));
}
{
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
test_read_write(Transaction::SubstrateSign(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(plan),
SubstrateSignableId::Batch(batch),
Label::Share,
)));
}


@ -1,10 +1,11 @@
use std::collections::HashMap;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use frost::Participant;
use serai_client::validator_sets::primitives::KeyPair;
use serai_client::validator_sets::primitives::{KeyPair, ValidatorSet};
use processor_messages::coordinator::SubstrateSignableId;
@ -14,7 +15,7 @@ use tributary::ReadWrite;
use crate::tributary::{Label, Transaction};
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode)]
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
pub enum Topic {
Dkg,
DkgConfirmation,
@ -44,6 +45,10 @@ pub enum Accumulation {
create_db!(
Tributary {
SeraiBlockNumber: (hash: [u8; 32]) -> u64,
SeraiDkgRemoval: (spec: ValidatorSet, removing: [u8; 32]) -> (),
SeraiDkgCompleted: (spec: ValidatorSet) -> (),
TributaryBlockNumber: (block: [u8; 32]) -> u32,
LastHandledBlock: (genesis: [u8; 32]) -> [u8; 32],
FatalSlashes: (genesis: [u8; 32]) -> Vec<[u8; 32]>,
FatallySlashed: (genesis: [u8; 32], account: [u8; 32]) -> (),
@ -54,7 +59,9 @@ create_db!(
(genesis: [u8; 32], removing: [u8; 32], attempt: u32) -> HashMap<Participant, Vec<u8>>,
DkgKeyPair: (genesis: [u8; 32], attempt: u32) -> KeyPair,
DkgCompleted: (genesis: [u8; 32]) -> (),
LocallyDkgRemoved: (genesis: [u8; 32], validator: [u8; 32]) -> (),
AttemptDb: (genesis: [u8; 32], topic: &Topic) -> u32,
ReattemptDb: (genesis: [u8; 32], block: u32) -> Vec<Topic>,
DataReceived: (genesis: [u8; 32], data_spec: &DataSpecification) -> u16,
DataDb: (genesis: [u8; 32], data_spec: &DataSpecification, signer_bytes: &[u8; 32]) -> Vec<u8>,
@ -82,6 +89,13 @@ impl AttemptDb {
Self::set(txn, genesis, &topic, &0u32);
}
pub fn start_next_attempt(txn: &mut impl DbTxn, genesis: [u8; 32], topic: Topic) -> u32 {
let next =
Self::attempt(txn, genesis, topic).expect("starting next attempt for unknown topic") + 1;
Self::set(txn, genesis, &topic, &next);
next
}
pub fn attempt(getter: &impl Get, genesis: [u8; 32], topic: Topic) -> Option<u32> {
let attempt = Self::get(getter, genesis, &topic);
// Don't require explicit recognition of the Dkg topic as it starts when the chain does
@ -92,6 +106,42 @@ impl AttemptDb {
}
}
impl ReattemptDb {
pub fn schedule_reattempt(
txn: &mut impl DbTxn,
genesis: [u8; 32],
current_block_number: u32,
topic: Topic,
) {
// 5 minutes
const BASE_REATTEMPT_DELAY: u32 = (5 * 60 * 1000) / tributary::tendermint::TARGET_BLOCK_TIME;
// 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5
// Assumes no event will take longer than 15 minutes, yet grows the time in case there are
// network bandwidth issues
let reattempt_delay = BASE_REATTEMPT_DELAY *
((AttemptDb::attempt(txn, genesis, topic)
.expect("scheduling re-attempt for unknown topic") /
3) +
1)
.min(3);
let upon_block = current_block_number + reattempt_delay;
#[allow(clippy::unwrap_or_default)]
let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]);
reattempts.push(topic);
Self::set(txn, genesis, upon_block, &reattempts);
}
pub fn take(txn: &mut impl DbTxn, genesis: [u8; 32], block_number: u32) -> Vec<Topic> {
#[allow(clippy::unwrap_or_default)]
let res = Self::get(txn, genesis, block_number).unwrap_or(vec![]);
if !res.is_empty() {
Self::del(txn, genesis, block_number);
}
res
}
}
impl SignedTransactionDb {
pub fn take_signed_transaction(
txn: &mut impl DbTxn,


@ -26,12 +26,9 @@ use serai_db::*;
use crate::{
processors::Processors,
tributary::{
SignData, Transaction, TributarySpec, SeraiBlockNumber, Topic, Label, DataSpecification,
DataSet, Accumulation,
*,
signing_protocol::{DkgConfirmer, DkgRemoval},
scanner::{RecognizedIdType, RIDTrait, PstTxType, PSTTrait, PTTTrait, TributaryBlockHandler},
FatallySlashed, DkgShare, DkgCompleted, PlanIds, ConfirmationNonces, RemovalNonces, DkgKeyPair,
AttemptDb, DataReceived, DataDb,
},
P2p,
};
@ -106,6 +103,7 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
signer: <Ristretto as Ciphersuite>::G,
data: &Vec<u8>,
) -> Accumulation {
log::debug!("accumulating entry for {:?} attempt #{}", &data_spec.topic, &data_spec.attempt);
let genesis = self.spec.genesis();
if DataDb::get(self.txn, genesis, data_spec, &signer.to_bytes()).is_some() {
panic!("accumulating data for a participant multiple times");
@ -121,13 +119,37 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
DataReceived::set(self.txn, genesis, data_spec, &now_received);
DataDb::set(self.txn, genesis, data_spec, &signer.to_bytes(), data);
let received_range = (prior_received + 1) ..= now_received;
// If 2/3rds of the network participated in this preprocess, queue it for an automatic
// re-attempt
// DkgConfirmation doesn't have a re-attempt as it's just an extension for Dkg
if (data_spec.label == Label::Preprocess) &&
received_range.contains(&self.spec.t()) &&
(data_spec.topic != Topic::DkgConfirmation)
{
// Double check the attempt on this entry, as we don't want to schedule a re-attempt if this
// is an old entry
// This is an assert, not part of the if check, as old data shouldn't be here in the first
// place
assert_eq!(AttemptDb::attempt(self.txn, genesis, data_spec.topic), Some(data_spec.attempt));
ReattemptDb::schedule_reattempt(self.txn, genesis, self.block_number, data_spec.topic);
// Because this attempt was participated in, it was justified
// The question becomes why did the prior attempt fail?
// TODO: Slash people who failed to participate as expected in the prior attempt
}
// If we have all the needed commitments/preprocesses/shares, tell the processor
let needed = if (data_spec.topic == Topic::Dkg) || (data_spec.topic == Topic::DkgConfirmation) {
self.spec.n()
} else {
self.spec.t()
};
if (prior_received < needed) && (now_received >= needed) {
let needs_everyone =
(data_spec.topic == Topic::Dkg) || (data_spec.topic == Topic::DkgConfirmation);
let needed = if needs_everyone { self.spec.n() } else { self.spec.t() };
if received_range.contains(&needed) {
log::debug!(
"accumulation for entry {:?} attempt #{} is ready",
&data_spec.topic,
&data_spec.attempt
);
return Accumulation::Ready({
let mut data = HashMap::new();
for validator in self.spec.validators().iter().map(|validator| validator.0) {
@ -187,6 +209,12 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
// If the attempt is lesser than the blockchain's, return
if data_spec.attempt < curr_attempt {
log::debug!(
"dated attempt published onto tributary for topic {:?} (used attempt {}, current {})",
data_spec.topic,
data_spec.attempt,
curr_attempt
);
return Accumulation::NotReady;
}
// If the attempt is greater, this is a premature publication, full slash
@ -541,17 +569,22 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
return;
};
// TODO: Only handle this if we're not actively removing any of the signers
// We need to only handle this if we're not actively removing any of the signers
// At the start of this function, we only handle messages from non-fatally slashed
// participants, so this is held
//
// The created Substrate call will fail if a removed validator was one of the signers
// Since:
// 1) publish_serai_tx will block this task until the TX is published
// 2) We won't scan any more TXs/blocks until we handle this TX
// The TX *must* be successfully published *before* we start removing any more
// signers
//
// Accordingly, if the signers aren't currently being removed, they won't be removed
// by the time this transaction is successfully published *unless* a malicious 34%
// participates with the non-participating 33% to continue operation and produce a
// distinct removal (since the non-participating won't block in this block)
//
// This breaks BFT and is accordingly within bounds
let tx = serai_client::SeraiValidatorSets::remove_participant(
@ -560,6 +593,7 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
signers,
Signature(signature),
);
LocallyDkgRemoved::set(self.txn, genesis, data.plan, &());
self
.publish_serai_tx
.publish_serai_tx(self.spec.set(), PstTxType::RemoveParticipant(data.plan), tx)
@ -597,7 +631,12 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
);
self
.recognized_id
.recognized_id(self.spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec())
.recognized_id(
self.spec.set(),
genesis,
RecognizedIdType::Batch,
batch.to_le_bytes().to_vec(),
)
.await;
}


@ -1,4 +1,4 @@
use core::{marker::PhantomData, future::Future, time::Duration};
use core::{marker::PhantomData, ops::Deref, future::Future, time::Duration};
use std::sync::Arc;
use rand_core::OsRng;
@ -15,6 +15,8 @@ use serai_client::{validator_sets::primitives::ValidatorSet, Serai};
use serai_db::DbTxn;
use processor_messages::coordinator::SubstrateSignableId;
use tributary::{
TransactionKind, Transaction as TributaryTransaction, TransactionError, Block, TributaryReader,
tendermint::{
@ -26,10 +28,8 @@ use tributary::{
use crate::{
Db,
processors::Processors,
tributary::{
TributarySpec, Label, SignData, Transaction, Topic, AttemptDb, LastHandledBlock,
FatallySlashed, DkgCompleted, signing_protocol::DkgRemoval,
},
substrate::BatchInstructionsHashDb,
tributary::{*, signing_protocol::*},
P2p,
};
@ -74,6 +74,7 @@ pub enum PstTxType {
#[async_trait::async_trait]
pub trait PSTTrait {
// TODO: Diversify publish_set_keys, publish_remove_participant, then remove PstTxType
async fn publish_serai_tx(
&self,
set: ValidatorSet,
@ -125,13 +126,31 @@ pub struct TributaryBlockHandler<
pub publish_tributary_tx: &'a PTT,
pub spec: &'a TributarySpec,
block: Block<Transaction>,
pub block_number: u32,
_p2p: PhantomData<P>,
}
impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P: P2p>
TributaryBlockHandler<'_, T, Pro, PST, PTT, RID, P>
{
async fn dkg_removal_attempt(&mut self, removing: [u8; 32], attempt: u32) {
let preprocess =
(DkgRemoval { spec: self.spec, key: self.our_key, txn: self.txn, removing, attempt })
.preprocess();
let mut tx = Transaction::DkgRemoval(SignData {
plan: removing,
attempt,
label: Label::Preprocess,
data: vec![preprocess.to_vec()],
signed: Transaction::empty_signed(),
});
tx.sign(&mut OsRng, self.spec.genesis(), self.our_key);
self.publish_tributary_tx.publish_tributary_tx(tx).await;
}
pub async fn fatal_slash(&mut self, slashing: [u8; 32], reason: &str) {
// TODO: If this fatal slash puts the remaining set below the threshold, spin
let genesis = self.spec.genesis();
log::warn!("fatally slashing {}. reason: {}", hex::encode(slashing), reason);
@ -144,23 +163,7 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
// If during a DKG, remove the participant
if DkgCompleted::get(self.txn, genesis).is_none() {
AttemptDb::recognize_topic(self.txn, genesis, Topic::DkgRemoval(slashing));
let preprocess = (DkgRemoval {
spec: self.spec,
key: self.our_key,
txn: self.txn,
removing: slashing,
attempt: 0,
})
.preprocess();
let mut tx = Transaction::DkgRemoval(SignData {
plan: slashing,
attempt: 0,
label: Label::Preprocess,
data: vec![preprocess.to_vec()],
signed: Transaction::empty_signed(),
});
tx.sign(&mut OsRng, genesis, self.our_key);
self.publish_tributary_tx.publish_tributary_tx(tx).await;
self.dkg_removal_attempt(slashing, 0).await;
}
}
@ -223,7 +226,135 @@ impl<T: DbTxn, Pro: Processors, PST: PSTTrait, PTT: PTTTrait, RID: RIDTrait, P:
}
}
// TODO: Trigger any necessary re-attempts
let genesis = self.spec.genesis();
for topic in ReattemptDb::take(self.txn, genesis, self.block_number) {
let attempt = AttemptDb::start_next_attempt(self.txn, genesis, topic);
log::info!("re-attempting {topic:?} with attempt {attempt}");
/*
All of these have the same common flow:
1) Check if this re-attempt is actually needed
2) If so, dispatch whatever events as needed
This is because we *always* re-attempt any protocol which had participation. That doesn't
mean we *should* re-attempt this protocol.
The alternatives were:
1) Note on-chain we completed a protocol, halting re-attempts upon 34%.
2) Vote on-chain to re-attempt a protocol.
This schema doesn't have any additional messages upon the success case (whereas
alternative #1 does) and doesn't have overhead (as alternative #2 does, sending votes and
then preprocesses. This only sends preprocesses).
*/
match topic {
Topic::Dkg => {
if DkgCompleted::get(self.txn, genesis).is_none() {
// Since it wasn't completed, instruct the processor to start the next attempt
let id =
processor_messages::key_gen::KeyGenId { session: self.spec.set().session, attempt };
let our_i = self.spec.i(Ristretto::generator() * self.our_key.deref()).unwrap();
// TODO: Handle removed parties (modify n/i to accept list of removed)
// TODO: Don't fatal slash, yet don't include, parties who have been offline so long as
// we still meet the needed threshold. We'd need a complete DKG protocol we then remove
// the offline participants from. publishing the DKG protocol completed without them.
let params =
frost::ThresholdParams::new(self.spec.t(), self.spec.n(), our_i.start).unwrap();
let shares = u16::from(our_i.end) - u16::from(our_i.start);
self
.processors
.send(
self.spec.set().network,
processor_messages::key_gen::CoordinatorMessage::GenerateKey { id, params, shares },
)
.await;
}
}
Topic::DkgConfirmation => {
panic!("re-attempting DkgConfirmation when we should be re-attempting the Dkg")
}
Topic::DkgRemoval(removing) => {
if DkgCompleted::get(self.txn, genesis).is_none() &&
LocallyDkgRemoved::get(self.txn, genesis, removing).is_none() &&
SeraiDkgCompleted::get(self.txn, self.spec.set()).is_none() &&
SeraiDkgRemoval::get(self.txn, self.spec.set(), removing).is_none()
{
// Since it wasn't completed, attempt a new DkgRemoval
self.dkg_removal_attempt(removing, attempt).await;
}
}
Topic::SubstrateSign(inner_id) => {
let id = processor_messages::coordinator::SubstrateSignId {
session: self.spec.set().session,
id: inner_id,
attempt,
};
match inner_id {
SubstrateSignableId::CosigningSubstrateBlock(block) => {
let block_number = SeraiBlockNumber::get(self.txn, block)
.expect("couldn't get the block number for prior attempted cosign");
// Check if the cosigner has a signature from our set for this block/a newer one
let latest_cosign =
crate::cosign_evaluator::LatestCosign::get(self.txn, self.spec.set().network)
.map(|cosign| cosign.block_number)
.unwrap_or(0);
if latest_cosign < block_number {
// Instruct the processor to start the next attempt
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
id,
block_number,
},
)
.await;
}
}
SubstrateSignableId::Batch(batch) => {
// If the Batch hasn't appeared on-chain...
if BatchInstructionsHashDb::get(self.txn, self.spec.set().network, batch).is_none() {
// Instruct the processor to start the next attempt
// The processor won't continue if it's already signed a Batch
// Prior checking if the Batch is on-chain just may reduce the non-participating
// 33% from publishing their re-attempt messages
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::BatchReattempt { id },
)
.await;
}
}
}
}
Topic::Sign(id) => {
// Instruct the processor to start the next attempt
// If it has already noted a completion, it won't send a preprocess and will simply drop
// the re-attempt message
self
.processors
.send(
self.spec.set().network,
processor_messages::sign::CoordinatorMessage::Reattempt {
id: processor_messages::sign::SignId {
session: self.spec.set().session,
id,
attempt,
},
},
)
.await;
}
}
}
}
}
@ -247,8 +378,10 @@ pub(crate) async fn handle_new_blocks<
) {
let genesis = tributary.genesis();
let mut last_block = LastHandledBlock::get(db, genesis).unwrap_or(genesis);
let mut block_number = TributaryBlockNumber::get(db, last_block).unwrap_or(0);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
block_number += 1;
// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
@ -264,6 +397,7 @@ pub(crate) async fn handle_new_blocks<
}
let mut txn = db.txn();
TributaryBlockNumber::set(&mut txn, next, &block_number);
(TributaryBlockHandler {
txn: &mut txn,
spec,
@ -273,6 +407,7 @@ pub(crate) async fn handle_new_blocks<
publish_serai_tx,
publish_tributary_tx,
block,
block_number,
_p2p: PhantomData::<P>,
})
.handle::<D>()
@ -368,6 +503,15 @@ pub(crate) async fn scan_tributaries_task<
}
}
PstTxType::RemoveParticipant(removed) => {
if let Ok(Some(_)) = serai.keys(spec.set()).await {
log::info!(
"keys were set before we {} {:?}",
"personally could publish the removal for",
hex::encode(removed)
);
break;
}
if let Ok(Some(participants)) =
serai.participants(spec.set().network).await
{


@ -132,7 +132,6 @@ impl<Id: Clone + PartialEq + Eq + Debug + Encode + Decode> SignData<Id> {
pub enum Transaction {
RemoveParticipant(Participant),
// Once this completes successfully, no more instances should be created.
DkgCommitments {
attempt: u32,
commitments: Vec<Vec<u8>>,
@ -170,7 +169,7 @@ pub enum Transaction {
// with the current processor, yet it would still be an improvement.
Batch {
block: [u8; 32],
batch: [u8; 5],
batch: u32,
},
// When a Serai block is finalized, with the contained batches, we can allow the associated plan
// IDs
@ -230,13 +229,13 @@ impl Debug for Transaction {
Transaction::Batch { block, batch } => fmt
.debug_struct("Transaction::Batch")
.field("block", &hex::encode(block))
.field("batch", &hex::encode(batch))
.field("batch", &batch)
.finish(),
Transaction::SubstrateBlock(block) => {
fmt.debug_struct("Transaction::SubstrateBlock").field("block", block).finish()
}
Transaction::SubstrateSign(sign_data) => {
fmt.debug_struct("Transaction::Substrate").field("sign_data", sign_data).finish()
fmt.debug_struct("Transaction::SubstrateSign").field("sign_data", sign_data).finish()
}
Transaction::Sign(sign_data) => {
fmt.debug_struct("Transaction::Sign").field("sign_data", sign_data).finish()
@ -390,9 +389,9 @@ impl ReadWrite for Transaction {
7 => {
let mut block = [0; 32];
reader.read_exact(&mut block)?;
let mut batch = [0; 5];
let mut batch = [0; 4];
reader.read_exact(&mut batch)?;
Ok(Transaction::Batch { block, batch })
Ok(Transaction::Batch { block, batch: u32::from_le_bytes(batch) })
}
8 => {
@ -514,7 +513,7 @@ impl ReadWrite for Transaction {
Transaction::Batch { block, batch } => {
writer.write_all(&[7])?;
writer.write_all(block)?;
writer.write_all(batch)
writer.write_all(&batch.to_le_bytes())
}
Transaction::SubstrateBlock(block) => {


@ -168,7 +168,7 @@ pub mod coordinator {
)]
pub enum SubstrateSignableId {
CosigningSubstrateBlock([u8; 32]),
Batch([u8; 5]),
Batch(u32),
}
#[derive(Clone, PartialEq, Eq, Hash, Debug, Encode, Decode, BorshSerialize, BorshDeserialize)]


@ -16,7 +16,6 @@ use frost_schnorrkel::Schnorrkel;
use log::{info, debug, warn};
use scale::Encode;
use serai_client::{
primitives::{NetworkId, BlockHash},
in_instructions::primitives::{Batch, SignedBatch, batch_message},
@ -26,15 +25,10 @@ use serai_client::{
use messages::coordinator::*;
use crate::{Get, DbTxn, Db, create_db};
// Generate an ID unique to a Batch
fn batch_sign_id(network: NetworkId, id: u32) -> [u8; 5] {
(network, id).encode().try_into().unwrap()
}
create_db!(
BatchSignerDb {
CompletedDb: (id: [u8; 5]) -> (),
AttemptDb: (id: [u8; 5], attempt: u32) -> (),
CompletedDb: (id: u32) -> (),
AttemptDb: (id: u32, attempt: u32) -> (),
BatchDb: (block: BlockHash) -> SignedBatch
}
);
@ -51,14 +45,12 @@ pub struct BatchSigner<D: Db> {
session: Session,
keys: Vec<ThresholdKeys<Ristretto>>,
signable: HashMap<[u8; 5], Batch>,
attempt: HashMap<[u8; 5], u32>,
signable: HashMap<u32, Batch>,
attempt: HashMap<u32, u32>,
#[allow(clippy::type_complexity)]
preprocessing:
HashMap<[u8; 5], (Vec<AlgorithmSignMachine<Ristretto, Schnorrkel>>, Vec<Preprocess>)>,
preprocessing: HashMap<u32, (Vec<AlgorithmSignMachine<Ristretto, Schnorrkel>>, Vec<Preprocess>)>,
#[allow(clippy::type_complexity)]
signing:
HashMap<[u8; 5], (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare>)>,
signing: HashMap<u32, (AlgorithmSignatureMachine<Ristretto, Schnorrkel>, Vec<SignatureShare>)>,
}
impl<D: Db> fmt::Debug for BatchSigner<D> {
@ -92,7 +84,7 @@ impl<D: Db> BatchSigner<D> {
}
}
fn verify_id(&self, id: &SubstrateSignId) -> Result<(Session, [u8; 5], u32), ()> {
fn verify_id(&self, id: &SubstrateSignId) -> Result<(Session, u32, u32), ()> {
let SubstrateSignId { session, id, attempt } = id;
let SubstrateSignableId::Batch(id) = id else { panic!("BatchSigner handed non-Batch") };
@ -104,17 +96,12 @@ impl<D: Db> BatchSigner<D> {
// rebooted OR we detected the signed batch on chain
// The latter is the expected flow for batches not actively being participated in
None => {
warn!("not attempting batch {} #{}", hex::encode(id), attempt);
warn!("not attempting batch {id} #{attempt}");
Err(())?;
}
Some(our_attempt) => {
if attempt != our_attempt {
warn!(
"sent signing data for batch {} #{} yet we have attempt #{}",
hex::encode(id),
attempt,
attempt
);
warn!("sent signing data for batch {id} #{attempt} yet we have attempt #{our_attempt}");
Err(())?;
}
}
@ -127,7 +114,7 @@ impl<D: Db> BatchSigner<D> {
async fn attempt(
&mut self,
txn: &mut D::Transaction<'_>,
id: [u8; 5],
id: u32,
attempt: u32,
) -> Option<ProcessorMessage> {
// See above commentary for why this doesn't emit SignedBatch
@ -138,12 +125,7 @@ impl<D: Db> BatchSigner<D> {
// Check if we're already working on this attempt
if let Some(curr_attempt) = self.attempt.get(&id) {
if curr_attempt >= &attempt {
warn!(
"told to attempt {} #{} yet we're already working on {}",
hex::encode(id),
attempt,
curr_attempt
);
warn!("told to attempt {id} #{attempt} yet we're already working on {curr_attempt}");
return None;
}
}
@ -163,7 +145,7 @@ impl<D: Db> BatchSigner<D> {
// Update the attempt number
self.attempt.insert(id, attempt);
info!("signing batch {} #{}", hex::encode(id), attempt);
info!("signing batch {id} #{attempt}");
// If we reboot mid-sign, the current design has us abort all signs and wait for latter
// attempts/new signing protocols
@ -180,9 +162,7 @@ impl<D: Db> BatchSigner<D> {
// TODO: This isn't complete as this txn may not be committed with the expected timing
if AttemptDb::get(txn, id, attempt).is_some() {
warn!(
"already attempted batch {}, attempt #{}. this is an error if we didn't reboot",
hex::encode(id),
attempt
"already attempted batch {id}, attempt #{attempt}. this is an error if we didn't reboot"
);
return None;
}
@ -215,7 +195,7 @@ impl<D: Db> BatchSigner<D> {
batch: Batch,
) -> Option<ProcessorMessage> {
debug_assert_eq!(self.network, batch.network);
let id = batch_sign_id(batch.network, batch.id);
let id = batch.id;
if CompletedDb::get(txn, id).is_some() {
debug!("Sign batch order for ID we've already completed signing");
// See batch_signed for commentary on why this simply returns
@ -246,10 +226,7 @@ impl<D: Db> BatchSigner<D> {
let (machines, our_preprocesses) = match self.preprocessing.remove(&id) {
// Either rebooted or RPC error, or some invariant
None => {
warn!(
"not preprocessing for {}. this is an error if we didn't reboot",
hex::encode(id),
);
warn!("not preprocessing for {id}. this is an error if we didn't reboot");
return None;
}
Some(preprocess) => preprocess,
@ -344,10 +321,7 @@ impl<D: Db> BatchSigner<D> {
panic!("never preprocessed yet signing?");
}
warn!(
"not preprocessing for {}. this is an error if we didn't reboot",
hex::encode(id)
);
warn!("not preprocessing for {id}. this is an error if we didn't reboot");
return None;
}
Some(signing) => signing,
@ -399,7 +373,7 @@ impl<D: Db> BatchSigner<D> {
},
};
info!("signed batch {} with attempt #{}", hex::encode(id), attempt);
info!("signed batch {id} with attempt #{attempt}");
let batch =
SignedBatch { batch: self.signable.remove(&id).unwrap(), signature: sig.into() };
@ -426,15 +400,13 @@ impl<D: Db> BatchSigner<D> {
}
pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, id: u32) {
let sign_id = batch_sign_id(self.network, id);
// Stop trying to sign for this batch
CompletedDb::set(txn, sign_id, &());
CompletedDb::set(txn, id, &());
self.signable.remove(&sign_id);
self.attempt.remove(&sign_id);
self.preprocessing.remove(&sign_id);
self.signing.remove(&sign_id);
self.signable.remove(&id);
self.attempt.remove(&id);
self.preprocessing.remove(&id);
self.signing.remove(&id);
// This doesn't emit SignedBatch because it doesn't have access to the SignedBatch
// This function is expected to only be called once Substrate acknowledges this block,


@ -13,7 +13,6 @@ use sp_application_crypto::{RuntimePublic, sr25519::Public};
use serai_db::{DbTxn, Db, MemDb};
use scale::Encode;
#[rustfmt::skip]
use serai_client::{primitives::*, in_instructions::primitives::*, validator_sets::primitives::Session};
@ -49,11 +48,8 @@ async fn test_batch_signer() {
],
};
let actual_id = SubstrateSignId {
session: Session(0),
id: SubstrateSignableId::Batch((batch.network, batch.id).encode().try_into().unwrap()),
attempt: 0,
};
let actual_id =
SubstrateSignId { session: Session(0), id: SubstrateSignableId::Batch(batch.id), attempt: 0 };
let mut signing_set = vec![];
while signing_set.len() < usize::from(keys.values().next().unwrap().params().t()) {


@ -35,6 +35,23 @@ impl<'a> SeraiValidatorSets<'a> {
.await
}
pub async fn participant_removed_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
self
.0
.events(|event| {
if let serai_abi::Event::ValidatorSets(event) = event {
if matches!(event, ValidatorSetsEvent::ParticipantRemoved { .. }) {
Some(event.clone())
} else {
None
}
} else {
None
}
})
.await
}
pub async fn key_gen_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
self
.0


@ -269,9 +269,22 @@ impl Processor {
assert_eq!(msg.id, *next_recv_id);
let msg_msg = borsh::from_slice(&msg.msg).unwrap();
if !is_cosign_message(&msg_msg) {
// Remove any BatchReattempts clogging the pipe
// TODO: Set up a wrapper around serai-client so we aren't throwing this away yet
// leave it for the tests
if matches!(
msg_msg,
messages::CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
)
) {
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
continue;
}
if !is_cosign_message(&msg_msg) {
continue;
};
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
msg_msg
@ -393,17 +406,13 @@ impl Processor {
*next_send_id += 1;
}
/// Receive a message from the coordinator as a processor.
pub async fn recv_message(&mut self) -> CoordinatorMessage {
async fn recv_message_inner(&mut self) -> CoordinatorMessage {
loop {
tokio::task::yield_now().await;
let mut queue_lock = self.queue.lock().await;
let (_, next_recv_id, queue) = &mut *queue_lock;
// Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes
let msg = tokio::time::timeout(Duration::from_secs(6 * 60), queue.next(Service::Coordinator))
.await
.unwrap();
let msg = queue.next(Service::Coordinator).await;
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, *next_recv_id);
@ -419,6 +428,13 @@ impl Processor {
}
}
/// Receive a message from the coordinator as a processor.
pub async fn recv_message(&mut self) -> CoordinatorMessage {
// Set a timeout of 15 minutes to allow effectively any protocol to occur without a fear of
// an arbitrary timeout cutting it short
tokio::time::timeout(Duration::from_secs(15 * 60), self.recv_message_inner()).await.unwrap()
}
pub async fn set_substrate_key(
&mut self,
substrate_key: Zeroizing<<Ristretto as Ciphersuite>::F>,


@ -38,9 +38,7 @@ pub async fn batch(
substrate_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
batch: Batch,
) -> u64 {
let mut id = [0; 5];
OsRng.fill_bytes(&mut id);
let id = SubstrateSignId { session, id: SubstrateSignableId::Batch(id), attempt: 0 };
let id = SubstrateSignId { session, id: SubstrateSignableId::Batch(batch.id), attempt: 0 };
for processor in processors.iter_mut() {
processor
@ -222,8 +220,19 @@ pub async fn batch(
// Verify the coordinator sends SubstrateBlock to all processors
let last_block = serai.finalized_block_by_number(last_serai_block).await.unwrap().unwrap();
for processor in processors {
// Handle a potential re-attempt message in the pipeline
let mut received = processor.recv_message().await;
if matches!(
received,
messages::CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
)
) {
received = processor.recv_message().await
}
assert_eq!(
processor.recv_message().await,
received,
messages::CoordinatorMessage::Substrate(
messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {


@ -46,8 +46,8 @@ pub(crate) fn new_test() -> (Vec<(Handles, <Ristretto as Ciphersuite>::F)>, Dock
// Use an RPC to enaluate if a condition was met, with the following time being a timeout
// https://github.com/serai-dex/serai/issues/340
pub(crate) async fn wait_for_tributary() {
tokio::time::sleep(Duration::from_secs(20)).await;
tokio::time::sleep(Duration::from_secs(15)).await;
if std::env::var("GITHUB_CI") == Ok("true".to_string()) {
tokio::time::sleep(Duration::from_secs(40)).await;
tokio::time::sleep(Duration::from_secs(6)).await;
}
}


@ -7,7 +7,6 @@ use dkg::{Participant, tests::clone_without};
use messages::{coordinator::*, SubstrateContext};
use scale::Encode;
use serai_client::{
primitives::{
BlockHash, Amount, Balance, crypto::RuntimePublic, PublicKey, SeraiAddress, NetworkId,
@ -28,11 +27,7 @@ pub(crate) async fn recv_batch_preprocesses(
batch: &Batch,
attempt: u32,
) -> (SubstrateSignId, HashMap<Participant, [u8; 64]>) {
let id = SubstrateSignId {
session,
id: SubstrateSignableId::Batch((batch.network, batch.id).encode().try_into().unwrap()),
attempt,
};
let id = SubstrateSignId { session, id: SubstrateSignableId::Batch(batch.id), attempt };
let mut block = None;
let mut preprocesses = HashMap::new();