From cc531d630ed11e4c5c0002ea0b3240134df5ada9 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 25 Apr 2023 02:36:20 -0400 Subject: [PATCH] Add a UID function to messages When we receive messages, we're provided with a message ID we can use to prevent handling an item multiple times. That doesn't prevent us from *sending* an item multiple times though. Thanks to the UID system, we can now not send if already present. Alternatively, we can remove the ordered message ID for just the UID, allowing duplicates to be sent without issue, and handled on the receiving end. --- Cargo.lock | 1 + processor/messages/Cargo.toml | 1 + processor/messages/src/lib.rs | 176 +++++++++++++++++++++++++++++++--- processor/src/main.rs | 6 ++ processor/src/plan.rs | 1 + 5 files changed, 173 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b956c2e5..020c80db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6655,6 +6655,7 @@ dependencies = [ name = "processor-messages" version = "0.1.0" dependencies = [ + "bincode", "dkg", "in-instructions-primitives", "serai-primitives", diff --git a/processor/messages/Cargo.toml b/processor/messages/Cargo.toml index 5908f77d..3086a3cd 100644 --- a/processor/messages/Cargo.toml +++ b/processor/messages/Cargo.toml @@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] zeroize = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] } +bincode = "1" dkg = { path = "../../crypto/dkg", features = ["serde"] } diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 73b187c1..ee2634fb 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -6,7 +6,7 @@ use serde::{Serialize, Deserialize}; use dkg::{Participant, ThresholdParams}; -use serai_primitives::BlockHash; +use serai_primitives::{BlockHash, NetworkId}; use in_instructions_primitives::SignedBatch; use tokens_primitives::OutInstructionWithBalance; use validator_sets_primitives::{ValidatorSet, KeyPair}; @@ -76,16 +76,6 @@ pub mod sign { Completed { key: Vec, id: [u8; 32], tx: Vec }, } - #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] - pub enum ProcessorMessage { - // Created preprocess for the specified signing protocol. - Preprocess { id: SignId, preprocess: Vec }, - // Signed share for the specified signing protocol. - Share { id: SignId, share: Vec }, - // Completed a signing protocol already. - Completed { key: Vec, id: [u8; 32], tx: Vec }, - } - impl CoordinatorMessage { pub fn required_block(&self) -> Option { None @@ -100,6 +90,17 @@ pub mod sign { } } } + + #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] + pub enum ProcessorMessage { + // Created preprocess for the specified signing protocol. + Preprocess { id: SignId, preprocess: Vec }, + // Signed share for the specified signing protocol. + Share { id: SignId, share: Vec }, + // Completed a signing protocol already. + // TODO: Move this to SignId + Completed { key: Vec, id: [u8; 32], tx: Vec }, + } } pub mod coordinator { @@ -134,7 +135,7 @@ pub mod coordinator { #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum ProcessorMessage { - SubstrateBlockAck { block: u64, plans: Vec<[u8; 32]> }, + SubstrateBlockAck { network: NetworkId, block: u64, plans: Vec<[u8; 32]> }, BatchPreprocess { id: SignId, preprocess: Vec }, BatchShare { id: SignId, share: [u8; 32] }, } @@ -152,6 +153,7 @@ pub mod substrate { }, SubstrateBlock { context: SubstrateContext, + network: NetworkId, block: u64, key: Vec, burns: Vec, @@ -207,3 +209,153 @@ pub enum ProcessorMessage { Coordinator(coordinator::ProcessorMessage), Substrate(substrate::ProcessorMessage), } + +const COORDINATOR_UID: u8 = 0; +const PROCESSSOR_UID: u8 = 1; + +const TYPE_KEY_GEN_UID: u8 = 2; +const TYPE_SIGN_UID: u8 = 3; +const TYPE_COORDINATOR_UID: u8 = 4; +const TYPE_SUBSTRATE_UID: u8 = 5; + +impl CoordinatorMessage { + /// A unique ID for this message, which should be unique across the validator's entire system, + /// including all of its processors. + /// + /// This doesn't use H(msg.serialize()) as it's meant to be unique to intent, not unique to + /// values. While the values should be consistent per intent, that assumption isn't required + /// here. + // TODO: Should this use borsh intead of bincode? + pub fn uid(&self) -> Vec { + match self { + CoordinatorMessage::KeyGen(msg) => { + // Unique since key gen ID embeds the validator set and attempt + let (sub, id) = match msg { + key_gen::CoordinatorMessage::GenerateKey { id, .. } => (0, id), + key_gen::CoordinatorMessage::Commitments { id, .. } => (1, id), + key_gen::CoordinatorMessage::Shares { id, .. } => (2, id), + }; + + let mut res = vec![COORDINATOR_UID, TYPE_KEY_GEN_UID, sub]; + res.extend(&bincode::serialize(id).unwrap()); + res + } + CoordinatorMessage::Sign(msg) => { + let (sub, id) = match msg { + // Unique since SignId includes a hash of the coin, and specific transaction info + sign::CoordinatorMessage::Preprocesses { id, .. } => (0, bincode::serialize(id).unwrap()), + sign::CoordinatorMessage::Shares { id, .. } => (1, bincode::serialize(id).unwrap()), + sign::CoordinatorMessage::Reattempt { id } => (2, bincode::serialize(id).unwrap()), + // TODO: This doesn't embed the attempt. Accordingly, multiple distinct completions will + // be flattened. This isn't acceptable. + sign::CoordinatorMessage::Completed { id, .. } => (3, id.to_vec()), + }; + + let mut res = vec![COORDINATOR_UID, TYPE_SIGN_UID, sub]; + res.extend(&id); + res + } + CoordinatorMessage::Coordinator(msg) => { + let (sub, id) = match msg { + // Unique since this embeds the batch ID (hash of it, including its network) and attempt + coordinator::CoordinatorMessage::BatchPreprocesses { id, .. } => { + (0, bincode::serialize(id).unwrap()) + } + coordinator::CoordinatorMessage::BatchShares { id, .. } => { + (1, bincode::serialize(id).unwrap()) + } + coordinator::CoordinatorMessage::BatchReattempt { id, .. } => { + (2, bincode::serialize(id).unwrap()) + } + }; + + let mut res = vec![COORDINATOR_UID, TYPE_COORDINATOR_UID, sub]; + res.extend(&id); + res + } + CoordinatorMessage::Substrate(msg) => { + let (sub, id) = match msg { + // Unique since there's only one key pair for a set + substrate::CoordinatorMessage::ConfirmKeyPair { set, .. } => { + (0, bincode::serialize(set).unwrap()) + } + substrate::CoordinatorMessage::SubstrateBlock { network, block, .. } => { + (1, bincode::serialize(&(network, block)).unwrap()) + } + }; + + let mut res = vec![COORDINATOR_UID, TYPE_SUBSTRATE_UID, sub]; + res.extend(&id); + res + } + } + } +} + +impl ProcessorMessage { + /// A unique ID for this message, which should be unique across the validator's entire system, + /// including all of its processors. + /// + /// This doesn't use H(msg.serialize()) as it's meant to be unique to intent, not unique to + /// values. While the values should be consistent per intent, that assumption isn't required + /// here. + pub fn uid(&self) -> Vec { + match self { + ProcessorMessage::KeyGen(msg) => { + let (sub, id) = match msg { + // Unique since KeyGenId + key_gen::ProcessorMessage::Commitments { id, .. } => (0, id), + key_gen::ProcessorMessage::Shares { id, .. } => (1, id), + key_gen::ProcessorMessage::GeneratedKeyPair { id, .. } => (2, id), + }; + + let mut res = vec![PROCESSSOR_UID, TYPE_KEY_GEN_UID, sub]; + res.extend(&bincode::serialize(id).unwrap()); + res + } + ProcessorMessage::Sign(msg) => { + let (sub, id) = match msg { + // Unique since SignId + sign::ProcessorMessage::Preprocess { id, .. } => (0, bincode::serialize(id).unwrap()), + sign::ProcessorMessage::Share { id, .. } => (1, bincode::serialize(id).unwrap()), + // Unique since a processor will only sign a TX once + sign::ProcessorMessage::Completed { id, .. } => (2, id.to_vec()), + }; + + let mut res = vec![PROCESSSOR_UID, TYPE_SIGN_UID, sub]; + res.extend(&id); + res + } + ProcessorMessage::Coordinator(msg) => { + let (sub, id) = match msg { + coordinator::ProcessorMessage::SubstrateBlockAck { network, block, .. } => { + (0, bincode::serialize(&(network, block)).unwrap()) + } + // Unique since SignId + coordinator::ProcessorMessage::BatchPreprocess { id, .. } => { + (1, bincode::serialize(id).unwrap()) + } + coordinator::ProcessorMessage::BatchShare { id, .. } => { + (2, bincode::serialize(id).unwrap()) + } + }; + + let mut res = vec![PROCESSSOR_UID, TYPE_COORDINATOR_UID, sub]; + res.extend(&id); + res + } + ProcessorMessage::Substrate(msg) => { + let (sub, id) = match msg { + // Unique since network and ID binding + substrate::ProcessorMessage::Update { batch, .. } => { + (0, bincode::serialize(&(batch.batch.network, batch.batch.id)).unwrap()) + } + }; + + let mut res = vec![PROCESSSOR_UID, TYPE_SUBSTRATE_UID, sub]; + res.extend(&id); + res + } + } + } +} diff --git a/processor/src/main.rs b/processor/src/main.rs index d39ea9cb..f1412ce0 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -383,6 +383,7 @@ async fn handle_coordinator_msg( messages::substrate::CoordinatorMessage::SubstrateBlock { context, + network, block, key: key_vec, burns, @@ -408,7 +409,11 @@ async fn handle_coordinator_msg( instruction: OutInstruction { address, data }, balance, } = out; + // TODO: Check network is this coin's network + assert_eq!(balance.coin.network(), network); + if let Ok(address) = C::Address::try_from(address.consume()) { + // TODO: Add coin to payment payments.push(Payment { address, data: data.map(|data| data.consume()), @@ -426,6 +431,7 @@ async fn handle_coordinator_msg( coordinator .send(ProcessorMessage::Coordinator( messages::coordinator::ProcessorMessage::SubstrateBlockAck { + network, block, plans: plans.iter().map(|plan| plan.id()).collect(), }, diff --git a/processor/src/plan.rs b/processor/src/plan.rs index 28c62f84..e705661f 100644 --- a/processor/src/plan.rs +++ b/processor/src/plan.rs @@ -81,6 +81,7 @@ impl Plan { pub fn transcript(&self) -> RecommendedTranscript { let mut transcript = RecommendedTranscript::new(b"Serai Processor Plan ID"); transcript.domain_separate(b"meta"); + transcript.append_message(b"network", C::ID); transcript.append_message(b"key", self.key.to_bytes()); transcript.domain_separate(b"inputs");