diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary.rs index 6f3020cb..7162bbe1 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary.rs @@ -479,7 +479,7 @@ pub(crate) async fn spawn_tributary<P: P2p>( // Spawn the scan task let (scan_tributary_task_def, scan_tributary_task) = Task::new(); tokio::spawn( - ScanTributaryTask::<_, P>::new(tributary_db.clone(), &set, reader) + ScanTributaryTask::<_, P>::new(tributary_db.clone(), set.clone(), reader) // This is the only handle for this TributaryProcessorMessagesTask, so when this task is // dropped, it will be too .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), diff --git a/coordinator/substrate/src/lib.rs b/coordinator/substrate/src/lib.rs index f313eb36..68566ff4 100644 --- a/coordinator/substrate/src/lib.rs +++ b/coordinator/substrate/src/lib.rs @@ -45,6 +45,9 @@ pub struct NewSetInformation { /// The validators, with the amount of key shares they have. pub validators: Vec<(SeraiAddress, u16)>, /// The eVRF public keys. + /// + /// This will have the necessary copies of the keys proper for each validator's weight, + /// accordingly syncing up with `participant_indexes`. pub evrf_public_keys: Vec<([u8; 32], Vec<u8>)>, /// The participant indexes, indexed by their validator. #[borsh(skip)] diff --git a/coordinator/tributary/src/db.rs b/coordinator/tributary/src/db.rs index aefe45d3..5812063e 100644 --- a/coordinator/tributary/src/db.rs +++ b/coordinator/tributary/src/db.rs @@ -168,7 +168,7 @@ impl Topic { } } - fn required_participation(&self, n: u64) -> u64 { + fn required_participation(&self, n: u16) -> u16 { let _ = self; // All of our topics require 2/3rds participation ((2 * n) / 3) + 1 @@ -218,7 +218,7 @@ create_db!( SubstrateBlockPlans: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> Vec<[u8; 32]>, // The weight accumulated for a topic. - AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64, + AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u16, // The entries accumulated for a topic, by validator. Accumulated: <D: Borshy>(set: ValidatorSet, topic: Topic, validator: SeraiAddress) -> D, @@ -360,11 +360,11 @@ impl TributaryDb { txn: &mut impl DbTxn, set: ValidatorSet, validators: &[SeraiAddress], - total_weight: u64, + total_weight: u16, block_number: u64, topic: Topic, validator: SeraiAddress, - validator_weight: u64, + validator_weight: u16, data: &D, ) -> DataSet<D> { // This function will only be called once for a (validator, topic) tuple due to how we handle diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index f37928c3..d8390511 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -109,32 +109,32 @@ struct ScanBlock<'a, TD: Db, TDT: DbTxn, P: P2p> { _td: PhantomData<TD>, _p2p: PhantomData<P>, tributary_txn: &'a mut TDT, - set: ValidatorSet, + set: &'a NewSetInformation, validators: &'a [SeraiAddress], - total_weight: u64, - validator_weights: &'a HashMap<SeraiAddress, u64>, + total_weight: u16, + validator_weights: &'a HashMap<SeraiAddress, u16>, } impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { fn potentially_start_cosign(&mut self) { // Don't start a new cosigning instance if we're actively running one - if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() { + if TributaryDb::actively_cosigning(self.tributary_txn, self.set.set).is_some() { return; } // Fetch the latest intended-to-be-cosigned block let Some(latest_substrate_block_to_cosign) = - TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) + TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set.set) else { return; }; // If it was already cosigned, return - if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) { + if TributaryDb::cosigned(self.tributary_txn, self.set.set, latest_substrate_block_to_cosign) { return; } let intent = - CosignIntents::take(self.tributary_txn, self.set, latest_substrate_block_to_cosign) + CosignIntents::take(self.tributary_txn, self.set.set, latest_substrate_block_to_cosign) .expect("Transaction::Cosign locally provided but CosignIntents wasn't populated"); assert_eq!( intent.block_hash, latest_substrate_block_to_cosign, @@ -144,16 +144,16 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Mark us as actively cosigning TributaryDb::start_cosigning( self.tributary_txn, - self.set, + self.set.set, latest_substrate_block_to_cosign, intent.block_number, ); // Send the message for the processor to start signing TributaryDb::send_message( self.tributary_txn, - self.set, + self.set.set, messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { - session: self.set.session, + session: self.set.set.session, intent, }, ); @@ -166,7 +166,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // TODO: The fact they can publish these TXs makes this a notable spam vector if TributaryDb::is_fatally_slashed( self.tributary_txn, - self.set, + self.set.set, SeraiAddress(signer.to_bytes()), ) { return; @@ -183,7 +183,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { if !self.validators.iter().any(|validator| *validator == participant) { TributaryDb::fatal_slash( self.tributary_txn, - self.set, + self.set.set, signer, "voted to remove non-existent participant", ); @@ -192,7 +192,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { match TributaryDb::accumulate( self.tributary_txn, - self.set, + self.set.set, self.validators, self.total_weight, block_number, @@ -203,7 +203,12 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { ) { DataSet::None => {} DataSet::Participating(_) => { - TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove"); + TributaryDb::fatal_slash( + self.tributary_txn, + self.set.set, + participant, + "voted to remove", + ); } }; } @@ -212,10 +217,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { Transaction::DkgParticipation { participation, signed } => { TributaryDb::send_message( self.tributary_txn, - self.set, + self.set.set, messages::key_gen::CoordinatorMessage::Participation { - session: self.set.session, - participant: todo!("TODO"), + session: self.set.set.session, + participant: self.set.participant_indexes[&signer(signed)][0], participation, }, ); @@ -233,7 +238,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Update the latest intended-to-be-cosigned Substrate block TributaryDb::set_latest_substrate_block_to_cosign( self.tributary_txn, - self.set, + self.set.set, substrate_block_hash, ); // Start a new cosign if we aren't already working on one @@ -246,32 +251,32 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose the next block to work on, we won't if it's already been cosigned. */ - TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash); + TributaryDb::mark_cosigned(self.tributary_txn, self.set.set, substrate_block_hash); // If we aren't actively cosigning this block, return // This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C, // and then receive Cosigned for B - if TributaryDb::actively_cosigning(self.tributary_txn, self.set) != + if TributaryDb::actively_cosigning(self.tributary_txn, self.set.set) != Some(substrate_block_hash) { return; } // Since this is the block we were cosigning, mark us as having finished cosigning - TributaryDb::finish_cosigning(self.tributary_txn, self.set); + TributaryDb::finish_cosigning(self.tributary_txn, self.set.set); // Start working on the next cosign self.potentially_start_cosign(); } Transaction::SubstrateBlock { hash } => { // Recognize all of the IDs this Substrate block causes to be signed - let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set, hash).expect( + let plans = SubstrateBlockPlans::take(self.tributary_txn, self.set.set, hash).expect( "Transaction::SubstrateBlock locally provided but SubstrateBlockPlans wasn't populated", ); for plan in plans { TributaryDb::recognize_topic( self.tributary_txn, - self.set, + self.set.set, Topic::Sign { id: VariantSignId::Transaction(plan), attempt: 0, @@ -284,7 +289,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Recognize the signing of this batch TributaryDb::recognize_topic( self.tributary_txn, - self.set, + self.set.set, Topic::Sign { id: VariantSignId::Batch(hash), attempt: 0, @@ -299,7 +304,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { if slash_points.len() != self.validators.len() { TributaryDb::fatal_slash( self.tributary_txn, - self.set, + self.set.set, signer, "slash report was for a distinct amount of signers", ); @@ -309,7 +314,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Accumulate, and if past the threshold, calculate *the* slash report and start signing it match TributaryDb::accumulate( self.tributary_txn, - self.set, + self.set.set, self.validators, self.total_weight, block_number, @@ -327,10 +332,6 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { have a supermajority agree the slash should be fatal. If there isn't a supermajority, but the median believe the slash should be fatal, we need to fallback to a large constant. - - Also, TODO, each slash point should probably be considered as - `MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses - various thresholds (1 day, 3 days, etc), a multiplier should be attached. */ let mut median_slash_report = Vec::with_capacity(self.validators.len()); for i in 0 .. self.validators.len() { @@ -384,7 +385,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Recognize the topic for signing the slash report TributaryDb::recognize_topic( self.tributary_txn, - self.set, + self.set.set, Topic::Sign { id: VariantSignId::SlashReport, attempt: 0, @@ -394,9 +395,9 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // Send the message for the processor to start signing TributaryDb::send_message( self.tributary_txn, - self.set, + self.set.set, messages::coordinator::CoordinatorMessage::SignSlashReport { - session: self.set.session, + session: self.set.set.session, slash_report: slash_report.try_into().unwrap(), }, ); @@ -408,10 +409,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { let topic = topic.unwrap(); let signer = signer(signed); - if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] { + if data.len() != usize::from(self.validator_weights[&signer]) { TributaryDb::fatal_slash( self.tributary_txn, - self.set, + self.set.set, signer, "signer signed with a distinct amount of key shares than they had key shares", ); @@ -420,7 +421,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { match TributaryDb::accumulate( self.tributary_txn, - self.set, + self.set.set, self.validators, self.total_weight, block_number, @@ -431,12 +432,22 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { ) { DataSet::None => {} DataSet::Participating(data_set) => { - let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId"); - let flatten_data_set = |data_set| todo!("TODO"); + let id = topic.sign_id(self.set.set).expect("Topic::Sign didn't have SignId"); + let flatten_data_set = |data_set: HashMap<_, Vec<_>>| { + let mut entries = HashMap::with_capacity(usize::from(self.total_weight)); + for (validator, shares) in data_set { + let indexes = &self.set.participant_indexes[&validator]; + assert_eq!(indexes.len(), shares.len()); + for (index, share) in indexes.iter().zip(shares) { + entries.insert(*index, share); + } + } + entries + }; let data_set = flatten_data_set(data_set); TributaryDb::send_message( self.tributary_txn, - self.set, + self.set.set, match round { SigningProtocolRound::Preprocess => { messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set } @@ -453,7 +464,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { } fn handle_block(mut self, block_number: u64, block: Block<Transaction>) { - TributaryDb::start_of_block(self.tributary_txn, self.set, block_number); + TributaryDb::start_of_block(self.tributary_txn, self.set.set, block_number); for tx in block.transactions { match tx { @@ -480,7 +491,7 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { // errors, mark the node as fatally slashed TributaryDb::fatal_slash( self.tributary_txn, - self.set, + self.set.set, SeraiAddress(msgs.0.msg.sender), &format!("invalid tendermint messages: {msgs:?}"), ); @@ -496,10 +507,10 @@ impl<'a, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, TD, TDT, P> { /// The task to scan the Tributary, populating `ProcessorMessages`. pub struct ScanTributaryTask<TD: Db, P: P2p> { tributary_db: TD, - set: ValidatorSet, + set: NewSetInformation, validators: Vec<SeraiAddress>, - total_weight: u64, - validator_weights: HashMap<SeraiAddress, u64>, + total_weight: u16, + validator_weights: HashMap<SeraiAddress, u16>, tributary: TributaryReader<TD, Transaction>, _p2p: PhantomData<P>, } @@ -508,14 +519,13 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> { /// Create a new instance of this task. pub fn new( tributary_db: TD, - new_set: &NewSetInformation, + set: NewSetInformation, tributary: TributaryReader<TD, Transaction>, ) -> Self { - let mut validators = Vec::with_capacity(new_set.validators.len()); + let mut validators = Vec::with_capacity(set.validators.len()); let mut total_weight = 0; - let mut validator_weights = HashMap::with_capacity(new_set.validators.len()); - for (validator, weight) in new_set.validators.iter().copied() { - let weight = u64::from(weight); + let mut validator_weights = HashMap::with_capacity(set.validators.len()); + for (validator, weight) in set.validators.iter().copied() { validators.push(validator); total_weight += weight; validator_weights.insert(validator, weight); @@ -523,7 +533,7 @@ impl<TD: Db, P: P2p> ScanTributaryTask<TD, P> { ScanTributaryTask { tributary_db, - set: new_set.set, + set, validators, total_weight, validator_weights, @@ -539,7 +549,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> { fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> { async move { let (mut last_block_number, mut last_block_hash) = - TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) + TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set.set) .unwrap_or((0, self.tributary.genesis())); let mut made_progress = false; @@ -558,7 +568,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> { if !self.tributary.locally_provided_txs_in_block(&block_hash, order) { return Err(format!( "didn't have the provided Transactions on-chain for set (ephemeral error): {:?}", - self.set + self.set.set )); } } @@ -568,7 +578,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> { _td: PhantomData::<TD>, _p2p: PhantomData::<P>, tributary_txn: &mut tributary_txn, - set: self.set, + set: &self.set, validators: &self.validators, total_weight: self.total_weight, validator_weights: &self.validator_weights, @@ -576,7 +586,7 @@ impl<TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<TD, P> { .handle_block(block_number, block); TributaryDb::set_last_handled_tributary_block( &mut tributary_txn, - self.set, + self.set.set, block_number, block_hash, ); diff --git a/processor/key-gen/src/generators.rs b/processor/key-gen/src/generators.rs index 3570ca6e..cff9c2f1 100644 --- a/processor/key-gen/src/generators.rs +++ b/processor/key-gen/src/generators.rs @@ -29,8 +29,8 @@ pub(crate) fn generators<C: EvrfCurve>() -> &'static EvrfGenerators<C> { .or_insert_with(|| { // If we haven't prior needed generators for this Ciphersuite, generate new ones Box::leak(Box::new(EvrfGenerators::<C>::new( - ((MAX_KEY_SHARES_PER_SET * 2 / 3) + 1).try_into().unwrap(), - MAX_KEY_SHARES_PER_SET.try_into().unwrap(), + (MAX_KEY_SHARES_PER_SET * 2 / 3) + 1, + MAX_KEY_SHARES_PER_SET, ))) }) .downcast_ref()