Flatten the map of preprocesses/shares, send Participant index with DkgParticipation

This commit is contained in:
Luke Parker 2025-01-15 14:24:51 -05:00
parent 167826aa88
commit f36bbcba25
No known key found for this signature in database
5 changed files with 74 additions and 61 deletions
coordinator
src
substrate/src
tributary/src
processor/key-gen/src

View file

@ -479,7 +479,7 @@ pub(crate) async fn spawn_tributary<P: P2p>(
// Spawn the scan task
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader)
ScanTributaryTask::<_, P>::new(tributary_db.clone(), set.clone(), reader)
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is
// dropped, it will be too
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),

View file

@ -45,6 +45,9 @@ pub struct NewSetInformation {
/// The validators, with the amount of key shares they have.
pub validators: Vec<(SeraiAddress, u16)>,
/// The eVRF public keys.
///
/// This will have the necessary copies of the keys proper for each validator's weight,
/// accordingly syncing up with `participant_indexes`.
pub evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
/// The participant indexes, indexed by their validator.
#[borsh(skip)]

View file

@ -168,7 +168,7 @@ impl Topic {
}
}
fn required_participation(&self, n: u64) -> u64 {
fn required_participation(&self, n: u16) -> u16 {
let _ = self;
// All of our topics require 2/3rds participation
((2 * n) / 3) + 1
@ -218,7 +218,7 @@ create_db!(
SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>,
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u16,
// The entries accumulated for a topic, by validator.
Accumulated: <D: Borshy>(set: ValidatorSet, topic: Topic, validator: SeraiAddress) -> D,
@ -360,11 +360,11 @@ impl TributaryDb {
txn: &mut impl DbTxn,
set: ValidatorSet,
validators: &[SeraiAddress],
total_weight: u64,
total_weight: u16,
block_number: u64,
topic: Topic,
validator: SeraiAddress,
validator_weight: u64,
validator_weight: u16,
data: &D,
) -> DataSet<D> {
// This function will only be called once for a (validator, topic) tuple due to how we handle

View file

@ -109,32 +109,32 @@ struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> {
_td: PhantomData<TD>,
_p2p: PhantomData<P>,
tributary_txn: &'a mut TDT,
set: ValidatorSet,
set: &'a NewSetInformation,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
total_weight: u16,
validator_weights: &'a HashMap<SeraiAddress, u16>,
}
impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
if TributaryDb::actively_cosigning(self.tributary_txn, self.set.set).is_some() {
return;
}
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set.set)
else {
return;
};
// If it was already cosigned, return
if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) {
if TributaryDb::cosigned(self.tributary_txn, self.set.set, latest_substrate_block_to_cosign) {
return;
}
let intent =
CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign)
CosignIntents::take(self.tributary_txn, self.set.set, latest_substrate_block_to_cosign)
.expect("Transaction::Cosign locally provided but CosignIntents wasn't populated");
assert_eq!(
intent.block_hash, latest_substrate_block_to_cosign,
@ -144,16 +144,16 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Mark us as actively cosigning
TributaryDb::start_cosigning(
self.tributary_txn,
self.set,
self.set.set,
latest_substrate_block_to_cosign,
intent.block_number,
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
self.set,
self.set.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
session: self.set.set.session,
intent,
},
);
@ -166,7 +166,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(
self.tributary_txn,
self.set,
self.set.set,
SeraiAddress(signer.to_bytes()),
) {
return;
@ -183,7 +183,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
if !self.validators.iter().any(|validator| *validator == participant) {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
self.set.set,
signer,
"voted to remove non-existent participant",
);
@ -192,7 +192,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.set.set,
self.validators,
self.total_weight,
block_number,
@ -203,7 +203,12 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
) {
DataSet::None => {}
DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove");
TributaryDb::fatal_slash(
self.tributary_txn,
self.set.set,
participant,
"voted to remove",
);
}
};
}
@ -212,10 +217,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
Transaction::DkgParticipation { participation, signed } => {
TributaryDb::send_message(
self.tributary_txn,
self.set,
self.set.set,
messages::key_gen::CoordinatorMessage::Participation {
session: self.set.session,
participant: todo!("TODO"),
session: self.set.set.session,
participant: self.set.participant_indexes[&signer(signed)][0],
participation,
},
);
@ -233,7 +238,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(
self.tributary_txn,
self.set,
self.set.set,
substrate_block_hash,
);
// Start a new cosign if we aren't already working on one
@ -246,32 +251,32 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose
the next block to work on, we won't if it's already been cosigned.
*/
TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash);
TributaryDb::mark_cosigned(self.tributary_txn, self.set.set, substrate_block_hash);
// If we aren't actively cosigning this block, return
// This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C,
// and then receive Cosigned for B
if TributaryDb::actively_cosigning(self.tributary_txn, self.set) !=
if TributaryDb::actively_cosigning(self.tributary_txn, self.set.set) !=
Some(substrate_block_hash)
{
return;
}
// Since this is the block we were cosigning, mark us as having finished cosigning
TributaryDb::finish_cosigning(self.tributary_txn, self.set);
TributaryDb::finish_cosigning(self.tributary_txn, self.set.set);
// Start working on the next cosign
self.potentially_start_cosign();
}
Transaction::SubstrateBlock { hash } => {
// Recognize all of the IDs this Substrate block causes to be signed
let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect(
let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set.set, hash).expect(
"Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated",
);
for plan in plans {
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
self.set.set,
Topic::Sign {
id: VariantSignId::Transaction(plan),
attempt: 0,
@ -284,7 +289,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Recognize the signing of this batch
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
self.set.set,
Topic::Sign {
id: VariantSignId::Batch(hash),
attempt: 0,
@ -299,7 +304,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
if slash_points.len() != self.validators.len() {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
self.set.set,
signer,
"slash report was for a distinct amount of signers",
);
@ -309,7 +314,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.set.set,
self.validators,
self.total_weight,
block_number,
@ -327,10 +332,6 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
have a supermajority agree the slash should be fatal. If there isn't a supermajority,
but the median believe the slash should be fatal, we need to fallback to a large
constant.
Also, TODO, each slash point should probably be considered as
`MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses
various thresholds (1 day, 3 days, etc), a multiplier should be attached.
*/
let mut median_slash_report = Vec::with_capacity(self.validators.len());
for i in 0 .. self.validators.len() {
@ -384,7 +385,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Recognize the topic for signing the slash report
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
self.set.set,
Topic::Sign {
id: VariantSignId::SlashReport,
attempt: 0,
@ -394,9 +395,9 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
self.set,
self.set.set,
messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session,
session: self.set.set.session,
slash_report: slash_report.try_into().unwrap(),
},
);
@ -408,10 +409,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
let topic = topic.unwrap();
let signer = signer(signed);
if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
if data.len() != usize::from(self.validator_weights[&signer]) {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
self.set.set,
signer,
"signer signed with a distinct amount of key shares than they had key shares",
);
@ -420,7 +421,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.set.set,
self.validators,
self.total_weight,
block_number,
@ -431,12 +432,22 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId");
let flatten_data_set = |data_set| todo!("TODO");
let id = topic.sign_id(self.set.set).expect("Topic::Sign didn't have SignId");
let flatten_data_set = |data_set: HashMap<_, Vec<_>>| {
let mut entries = HashMap::with_capacity(usize::from(self.total_weight));
for (validator, shares) in data_set {
let indexes = &self.set.participant_indexes[&validator];
assert_eq!(indexes.len(), shares.len());
for (index, share) in indexes.iter().zip(shares) {
entries.insert(*index, share);
}
}
entries
};
let data_set = flatten_data_set(data_set);
TributaryDb::send_message(
self.tributary_txn,
self.set,
self.set.set,
match round {
SigningProtocolRound::Preprocess => {
messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set }
@ -453,7 +464,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
}
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.tributary_txn, self.set, block_number);
TributaryDb::start_of_block(self.tributary_txn, self.set.set, block_number);
for tx in block.transactions {
match tx {
@ -480,7 +491,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
self.set.set,
SeraiAddress(msgs.0.msg.sender),
&format!("invalid tendermint messages: {msgs:?}"),
);
@ -496,10 +507,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> {
/// The task to scan the Tributary, populating `ProcessorMessages`.
pub struct ScanTributaryTask<TD: Db, P: P2p> {
tributary_db: TD,
set: ValidatorSet,
set: NewSetInformation,
validators: Vec<SeraiAddress>,
total_weight: u64,
validator_weights: HashMap<SeraiAddress, u64>,
total_weight: u16,
validator_weights: HashMap<SeraiAddress, u16>,
tributary: TributaryReader<TD, Transaction>,
_p2p: PhantomData<P>,
}
@ -508,14 +519,13 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
/// Create a new instance of this task.
pub fn new(
tributary_db: TD,
new_set: &NewSetInformation,
set: NewSetInformation,
tributary: TributaryReader<TD, Transaction>,
) -> Self {
let mut validators = Vec::with_capacity(new_set.validators.len());
let mut validators = Vec::with_capacity(set.validators.len());
let mut total_weight = 0;
let mut validator_weights = HashMap::with_capacity(new_set.validators.len());
for (validator, weight) in new_set.validators.iter().copied() {
let weight = u64::from(weight);
let mut validator_weights = HashMap::with_capacity(set.validators.len());
for (validator, weight) in set.validators.iter().copied() {
validators.push(validator);
total_weight += weight;
validator_weights.insert(validator, weight);
@ -523,7 +533,7 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> {
ScanTributaryTask {
tributary_db,
set: new_set.set,
set,
validators,
total_weight,
validator_weights,
@ -539,7 +549,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set.set)
.unwrap_or((0, self.tributary.genesis()));
let mut made_progress = false;
@ -558,7 +568,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
if !self.tributary.locally_provided_txs_in_block(&block_hash, order) {
return Err(format!(
"didn't have the provided Transactions on-chain for set (ephemeral error): {:?}",
self.set
self.set.set
));
}
}
@ -568,7 +578,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
_td: PhantomData::<TD>,
_p2p: PhantomData::<P>,
tributary_txn: &mut tributary_txn,
set: self.set,
set: &self.set,
validators: &self.validators,
total_weight: self.total_weight,
validator_weights: &self.validator_weights,
@ -576,7 +586,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> {
.handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(
&mut tributary_txn,
self.set,
self.set.set,
block_number,
block_hash,
);

View file

@ -29,8 +29,8 @@ pub(crate) fn generators<C: EvrfCurve>() -> &'static EvrfGenerators<C> {
.or_insert_with(|| {
// If we haven't prior needed generators for this Ciphersuite, generate new ones
Box::leak(Box::new(EvrfGenerators::<C>::new(
((MAX_KEY_SHARES_PER_SET * 2 / 3) + 1).try_into().unwrap(),
MAX_KEY_SHARES_PER_SET.try_into().unwrap(),
(MAX_KEY_SHARES_PER_SET * 2 / 3) + 1,
MAX_KEY_SHARES_PER_SET,
)))
})
.downcast_ref()