diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs
index fd938025..581c4081 100644
--- a/coordinator/src/db.rs
+++ b/coordinator/src/db.rs
@@ -8,6 +8,7 @@ use blake2::{
 use scale::{Encode, Decode};
 use serai_client::{
   primitives::NetworkId,
+  validator_sets::primitives::ValidatorSet,
   in_instructions::primitives::{Batch, SignedBatch},
 };
 
@@ -95,15 +96,24 @@ impl<D: Db> MainDb<D> {
     getter.get(Self::first_preprocess_key(network, id))
   }
 
+  fn last_received_batch_key(network: NetworkId) -> Vec<u8> {
+    Self::main_key(b"last_received_batch", network.encode())
+  }
   fn expected_batch_key(network: NetworkId, id: u32) -> Vec<u8> {
     Self::main_key(b"expected_batch", (network, id).encode())
   }
   pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) {
+    txn.put(Self::last_received_batch_key(batch.network), batch.id.to_le_bytes());
     txn.put(
       Self::expected_batch_key(batch.network, batch.id),
       Blake2b::<U32>::digest(batch.instructions.encode()),
     );
   }
+  pub fn last_received_batch<G: Get>(getter: &G, network: NetworkId) -> Option<u32> {
+    getter
+      .get(Self::last_received_batch_key(network))
+      .map(|id| u32::from_le_bytes(id.try_into().unwrap()))
+  }
   pub fn expected_batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> {
     getter.get(Self::expected_batch_key(network, id)).map(|batch| batch.try_into().unwrap())
   }
@@ -131,4 +141,14 @@ impl<D: Db> MainDb<D> {
       .get(Self::last_verified_batch_key(network))
       .map(|id| u32::from_le_bytes(id.try_into().unwrap()))
   }
+
+  fn did_handover_key(set: ValidatorSet) -> Vec<u8> {
+    Self::main_key(b"did_handover", set.encode())
+  }
+  pub fn set_did_handover(txn: &mut D::Transaction<'_>, set: ValidatorSet) {
+    txn.put(Self::did_handover_key(set), []);
+  }
+  pub fn did_handover<G: Get>(getter: &G, set: ValidatorSet) -> bool {
+    getter.get(Self::did_handover_key(set)).is_some()
+  }
 }
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 0dc79bd3..c1625ce5 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -283,7 +283,7 @@ pub(crate) async fn scan_tributaries<
             "couldn't connect to Serai node to publish set_keys TX: {:?}",
             e
           );
-          tokio::time::sleep(Duration::from_secs(10)).await;
+          sleep(Duration::from_secs(10)).await;
         }
       }
     }
@@ -540,6 +540,38 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
   txn.commit();
 }
 
+/// Verifies `Batch`s which have already been indexed from Substrate.
+async fn verify_published_batches<D: Db>(
+  txn: &mut D::Transaction<'_>,
+  network: NetworkId,
+  optimistic_up_to: u32,
+) -> Option<u32> {
+  let last = MainDb::<D>::last_verified_batch(txn, network);
+  for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
+    let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
+      break;
+    };
+    let off_chain = MainDb::<D>::expected_batch(txn, network, id).unwrap();
+    if on_chain != off_chain {
+      // Halt operations on this network and spin, as this is a critical fault
+      loop {
+        log::error!(
+          "{}! network: {:?} id: {} off-chain: {} on-chain: {}",
+          "on-chain batch doesn't match off-chain",
+          network,
+          id,
+          hex::encode(off_chain),
+          hex::encode(on_chain),
+        );
+        sleep(Duration::from_secs(60)).await;
+      }
+    }
+    MainDb::<D>::save_last_verified_batch(txn, network, id);
+  }
+
+  MainDb::<D>::last_verified_batch(txn, network)
+}
+
 async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
   mut db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@@ -569,8 +601,6 @@
     // TODO: Check this ID is sane (last handled ID or expected next ID)
     let msg = processors.recv(network).await;
 
-    // TODO: We need to verify the Batches published to Substrate
-
     if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
       let mut txn = db.txn();
 
@@ -648,38 +678,11 @@
           #[allow(clippy::let_unit_value, unused_variables)]
           let batch = ();
 
-          // Verify all `Batch`s which we've already indexed from Substrate
-          // This won't be complete, as it only runs when a `Batch` message is received, which
-          // will be before we get a `SignedBatch`. It is, however, incremental. We can use a
-          // complete version to finish the last section when we need a complete version.
-          let last = MainDb::<D>::last_verified_batch(&txn, msg.network);
-          // This variable exists so Rust can verify Send/Sync properties
-          let mut faulty = None;
-          for id in last.map(|last| last + 1).unwrap_or(0) ..= this_batch_id {
-            if let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(&txn, network, id) {
-              let off_chain = MainDb::<D>::expected_batch(&txn, network, id).unwrap();
-              if on_chain != off_chain {
-                faulty = Some((id, off_chain, on_chain));
-                break;
-              }
-              MainDb::<D>::save_last_verified_batch(&mut txn, msg.network, id);
-            }
-          }
-
-          if let Some((id, off_chain, on_chain)) = faulty {
-            // Halt operations on this network and spin, as this is a critical fault
-            loop {
-              log::error!(
-                "{}! network: {:?} id: {} off-chain: {} on-chain: {}",
-                "on-chain batch doesn't match off-chain",
-                network,
-                id,
-                hex::encode(off_chain),
-                hex::encode(on_chain),
-              );
-              sleep(Duration::from_secs(60)).await;
-            }
-          }
+          // This won't be complete, as this call only runs when a `Batch` message is
+          // received, which will be before we get a `SignedBatch`
+          // It is, however, incremental
+          // When we need a complete version, we call it in a loop elsewhere until it catches up
+          verify_published_batches::<D>(&mut txn, msg.network, this_batch_id).await;
 
           None
         }
@@ -705,7 +708,7 @@
                 "{} {network:?}",
                 "couldn't connect to Serai node to get the next batch ID for",
               );
-              tokio::time::sleep(Duration::from_secs(5)).await;
+              sleep(Duration::from_secs(5)).await;
             }
 
             first = false;
@@ -762,7 +765,7 @@
               // If we failed to publish it, restore it
               batches.push_front(batch);
               // Sleep for a few seconds before retrying to prevent hammering the node
-              tokio::time::sleep(Duration::from_secs(5)).await;
+              sleep(Duration::from_secs(5)).await;
             }
           }
 
@@ -895,13 +898,32 @@
                 id.attempt,
                 hex::encode(block),
               );
-              // If this is the first attempt instance, wait until we synchronize around
-              // the batch first
+
+              // If this is the first attempt instance, wait until we synchronize around the batch
+              // first
               if id.attempt == 0 {
                 MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess);
 
-                // TODO: If this is the new key's first Batch, only create this TX once we verify
+                // If this is the new key's first Batch, only create this TX once we verify
                 // all prior published `Batch`s
+                if (spec.set().session.0 != 0) && (!MainDb::<D>::did_handover(&txn, spec.set())) {
+                  let last_received = MainDb::<D>::last_received_batch(&txn, msg.network);
+                  if let Some(last_received) = last_received {
+                    // Decrease by 1 to get the ID of the Batch prior to this Batch
+                    let prior_sets_last_batch = last_received - 1;
+                    loop {
+                      let successfully_verified =
+                        verify_published_batches::<D>(&mut txn, msg.network, prior_sets_last_batch)
+                          .await;
+                      if successfully_verified == Some(prior_sets_last_batch) {
+                        break;
+                      }
+                      sleep(Duration::from_secs(5)).await;
+                    }
+                  }
+                  MainDb::<D>::set_did_handover(&mut txn, spec.set());
+                }
+
                 Some(Transaction::Batch(block.0, id.id))
               } else {
                 Some(Transaction::BatchPreprocess(SignData {
diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs
index a7d22747..36bdef41 100644
--- a/coordinator/src/tests/tributary/chain.rs
+++ b/coordinator/src/tests/tributary/chain.rs
@@ -54,7 +54,7 @@ pub fn new_spec<R: RngCore + CryptoRng>(
 
   let set_participants = keys
     .iter()
-    .map(|key| sr25519::Public((<Ristretto as Ciphersuite>::generator() * **key).to_bytes()))
+    .map(|key| (sr25519::Public((<Ristretto as Ciphersuite>::generator() * **key).to_bytes()), 1))
     .collect::<Vec<_>>();
 
   let res = TributarySpec::new(serai_block, start_time, set, set_participants);
diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs
index 0b2362e3..a1920960 100644
--- a/substrate/client/src/serai/validator_sets.rs
+++ b/substrate/client/src/serai/validator_sets.rs
@@ -61,7 +61,9 @@ impl Serai {
     key: Public,
     at_hash: [u8; 32],
   ) -> Result<Option<Amount>, SeraiError> {
-    self.storage(PALLET, "Allocations", Some(vec![scale_value(network), scale_value(key)]), at_hash).await
+    self
+      .storage(PALLET, "Allocations", Some(vec![scale_value(network), scale_value(key)]), at_hash)
+      .await
   }
 
   pub async fn get_validator_set_musig_key(