From 8041a0d8450214042e28834178a9b8c6e84d0344 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 20 Apr 2023 05:05:17 -0400 Subject: [PATCH] Initial Tributary handling --- coordinator/src/main.rs | 79 +++++--- coordinator/src/processor.rs | 3 +- coordinator/src/tributary/db.rs | 92 ++++++++++ .../src/{tributary.rs => tributary/mod.rs} | 96 ++++++++-- coordinator/src/tributary/scanner.rs | 173 ++++++++++++++++++ coordinator/tributary/src/block.rs | 4 + coordinator/tributary/src/blockchain.rs | 4 + coordinator/tributary/src/lib.rs | 3 + coordinator/tributary/src/tendermint.rs | 1 + 9 files changed, 413 insertions(+), 42 deletions(-) create mode 100644 coordinator/src/tributary/db.rs rename coordinator/src/{tributary.rs => tributary/mod.rs} (75%) create mode 100644 coordinator/src/tributary/scanner.rs diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index b3c00be3..0044c08b 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,8 +1,9 @@ +#![allow(dead_code)] #![allow(unused_variables)] #![allow(unreachable_code)] #![allow(clippy::diverging_sub_expression)] -use std::time::Duration; +use std::{time::Duration, collections::HashMap}; use zeroize::Zeroizing; @@ -13,10 +14,7 @@ use serai_client::Serai; use tokio::time::sleep; -mod db; -pub use db::*; - -pub mod tributary; +mod tributary; mod p2p; pub use p2p::*; @@ -30,42 +28,63 @@ mod substrate; mod tests; async fn run( - db: D, + raw_db: D, key: Zeroizing<::F>, p2p: P, mut processor: Pro, serai: Serai, ) { - let mut db = MainDb::new(db); + let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone()); + let mut last_substrate_block = substrate_db.last_block(); + let mut last_tributary_block = HashMap::<[u8; 32], _>::new(); - let mut last_substrate_block = db.last_substrate_block(); - - tokio::spawn(async move { - loop { - match substrate::handle_new_blocks( - &mut db, - &key, - &p2p, - &mut processor, - &serai, - &mut last_substrate_block, - ) - .await - { - Ok(()) => {} - Err(e) => { - log::error!("couldn't communicate with serai node: {e}"); - sleep(Duration::from_secs(5)).await; + { + let key = key.clone(); + let mut processor = processor.clone(); + tokio::spawn(async move { + loop { + match substrate::handle_new_blocks( + &mut substrate_db, + &key, + &p2p, + &mut processor, + &serai, + &mut last_substrate_block, + ) + .await + { + Ok(()) => sleep(Duration::from_secs(3)).await, + Err(e) => { + log::error!("couldn't communicate with serai node: {e}"); + sleep(Duration::from_secs(5)).await; + } } } - } - }); + }); + } + + { + let mut tributary_db = tributary::TributaryDb::new(raw_db); + tokio::spawn(async move { + loop { + for (_, last_block) in last_tributary_block.iter_mut() { + tributary::scanner::handle_new_blocks::<_, _, P>( + &mut tributary_db, + &key, + &mut processor, + todo!(), + todo!(), + last_block, + ) + .await; + } + sleep(Duration::from_secs(3)).await; + } + }); + } loop { - // Handle all messages from tributaries - // Handle all messages from processors - todo!() } } diff --git a/coordinator/src/processor.rs b/coordinator/src/processor.rs index 5817e290..3c6cda5c 100644 --- a/coordinator/src/processor.rs +++ b/coordinator/src/processor.rs @@ -12,13 +12,14 @@ pub struct Message { } #[async_trait::async_trait] -pub trait Processor: 'static + Send + Sync { +pub trait Processor: 'static + Send + Sync + Clone { async fn send(&mut self, msg: CoordinatorMessage); async fn recv(&mut self) -> Message; async fn ack(&mut self, msg: Message); } // TODO: Move this to tests +#[derive(Clone)] pub struct MemProcessor(Arc>>); impl MemProcessor { #[allow(clippy::new_without_default)] diff --git a/coordinator/src/tributary/db.rs b/coordinator/src/tributary/db.rs new file mode 100644 index 00000000..9dc47ae1 --- /dev/null +++ b/coordinator/src/tributary/db.rs @@ -0,0 +1,92 @@ +use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; + +pub use serai_db::*; + +#[derive(Debug)] +pub struct TributaryDb(pub D); +impl TributaryDb { + pub fn new(db: D) -> Self { + Self(db) + } + + fn tributary_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec { + D::key(b"TRIBUTARY", dst, key) + } + + fn block_key(genesis: [u8; 32]) -> Vec { + Self::tributary_key(b"block", genesis) + } + pub fn set_last_block(&mut self, genesis: [u8; 32], block: [u8; 32]) { + let mut txn = self.0.txn(); + txn.put(Self::block_key(genesis), block); + txn.commit(); + } + pub fn last_block(&self, genesis: [u8; 32]) -> [u8; 32] { + self.0.get(Self::block_key(genesis)).unwrap_or(genesis.to_vec()).try_into().unwrap() + } + + fn dkg_attempt_key(genesis: [u8; 32]) -> Vec { + Self::tributary_key(b"dkg_attempt", genesis) + } + pub fn dkg_attempt(getter: &G, genesis: [u8; 32]) -> u32 { + u32::from_le_bytes( + getter.get(Self::dkg_attempt_key(genesis)).unwrap_or(vec![0; 4]).try_into().unwrap(), + ) + } + + fn dkg_data_received_key(label: &'static [u8], genesis: &[u8], attempt: u32) -> Vec { + Self::tributary_key( + b"dkg_data_received", + [label, genesis, attempt.to_le_bytes().as_ref()].concat(), + ) + } + fn dkg_data_key( + label: &'static [u8], + genesis: &[u8], + signer: &::G, + attempt: u32, + ) -> Vec { + Self::tributary_key( + b"dkg_data", + [label, genesis, signer.to_bytes().as_ref(), attempt.to_le_bytes().as_ref()].concat(), + ) + } + pub fn dkg_data( + label: &'static [u8], + getter: &G, + genesis: [u8; 32], + signer: &::G, + attempt: u32, + ) -> Option> { + getter.get(Self::dkg_data_key(label, &genesis, signer, attempt)) + } + pub fn set_dkg_data( + label: &'static [u8], + txn: &mut D::Transaction<'_>, + genesis: [u8; 32], + signer: &::G, + attempt: u32, + data: &[u8], + ) -> u16 { + let received_key = Self::dkg_data_received_key(label, &genesis, attempt); + let mut received = + u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap()); + received += 1; + + txn.put(received_key, received.to_le_bytes()); + txn.put(Self::dkg_data_key(label, &genesis, signer, attempt), data); + + received + } + + fn event_key(id: &[u8], index: u32) -> Vec { + Self::tributary_key(b"event", [id, index.to_le_bytes().as_ref()].concat()) + } + pub fn handled_event(getter: &G, id: [u8; 32], index: u32) -> bool { + getter.get(Self::event_key(&id, index)).is_some() + } + pub fn handle_event(txn: &mut D::Transaction<'_>, id: [u8; 32], index: u32) { + assert!(!Self::handled_event(txn, id, index)); + txn.put(Self::event_key(&id, index), []); + } +} diff --git a/coordinator/src/tributary.rs b/coordinator/src/tributary/mod.rs similarity index 75% rename from coordinator/src/tributary.rs rename to coordinator/src/tributary/mod.rs index 2fe03d24..8d879177 100644 --- a/coordinator/src/tributary.rs +++ b/coordinator/src/tributary/mod.rs @@ -3,27 +3,101 @@ use std::{io, collections::HashMap}; use blake2::{Digest, Blake2s256}; use transcript::{Transcript, RecommendedTranscript}; +use ciphersuite::{Ciphersuite, Ristretto}; use frost::Participant; use scale::Encode; -use serai_client::validator_sets::primitives::ValidatorSet; +use serai_client::validator_sets::primitives::{ValidatorSet, ValidatorSetData}; #[rustfmt::skip] use tributary::{ ReadWrite, Signed, TransactionError, TransactionKind, Transaction as TransactionTrait, }; -pub fn genesis(serai_block: [u8; 32], set: ValidatorSet) -> [u8; 32] { - // Calculate the genesis for this Tributary - let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis"); - // This locks it to a specific Serai chain - genesis.append_message(b"serai_block", serai_block); - genesis.append_message(b"session", set.session.0.to_le_bytes()); - genesis.append_message(b"network", set.network.encode()); - let genesis = genesis.challenge(b"genesis"); - let genesis_ref: &[u8] = genesis.as_ref(); - genesis_ref[.. 32].try_into().unwrap() +mod db; +pub use db::*; + +pub mod scanner; + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct TributarySpec { + serai_block: [u8; 32], + start_time: u64, + set: ValidatorSet, + validators: Vec<(::G, u64)>, +} + +impl TributarySpec { + pub fn new( + serai_block: [u8; 32], + start_time: u64, + set: ValidatorSet, + set_data: ValidatorSetData, + ) -> TributarySpec { + let mut validators = vec![]; + for (participant, amount) in set_data.participants { + // TODO: Ban invalid keys from being validators on the Serai side + let participant = ::read_G::<&[u8]>(&mut participant.0.as_ref()) + .expect("invalid key registered as participant"); + // Give one weight on Tributary per bond instance + validators.push((participant, amount.0 / set_data.bond.0)); + } + + Self { serai_block, start_time, set, validators } + } + + pub fn set(&self) -> ValidatorSet { + self.set + } + + pub fn genesis(&self) -> [u8; 32] { + // Calculate the genesis for this Tributary + let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis"); + // This locks it to a specific Serai chain + genesis.append_message(b"serai_block", self.serai_block); + genesis.append_message(b"session", self.set.session.0.to_le_bytes()); + genesis.append_message(b"network", self.set.network.encode()); + let genesis = genesis.challenge(b"genesis"); + let genesis_ref: &[u8] = genesis.as_ref(); + genesis_ref[.. 32].try_into().unwrap() + } + + pub fn start_time(&self) -> u64 { + self.start_time + } + + pub fn n(&self) -> u16 { + // TODO: Support multiple key shares + // self.validators.iter().map(|(_, weight)| u16::try_from(weight).unwrap()).sum() + self.validators().len().try_into().unwrap() + } + + pub fn t(&self) -> u16 { + (2 * (self.n() / 3)) + 1 + } + + pub fn i(&self, key: ::G) -> Option { + let mut i = 1; + // TODO: Support multiple key shares + for (validator, _weight) in &self.validators { + if validator == &key { + // return (i .. (i + weight)).to_vec(); + return Some(Participant::new(i).unwrap()); + } + // i += weight; + i += 1; + } + None + } + + pub fn validators(&self) -> HashMap<::G, u64> { + let mut res = HashMap::new(); + for (key, amount) in self.validators.clone() { + res.insert(key, amount); + } + res + } } #[derive(Clone, PartialEq, Eq, Debug)] diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs new file mode 100644 index 00000000..1871fc27 --- /dev/null +++ b/coordinator/src/tributary/scanner.rs @@ -0,0 +1,173 @@ +use core::ops::Deref; +use std::collections::HashMap; + +use zeroize::Zeroizing; + +use ciphersuite::{Ciphersuite, Ristretto}; + +use tributary::{Signed, Block, P2p, Tributary}; + +use processor_messages::{ + key_gen::{self, KeyGenId}, + CoordinatorMessage, +}; + +use serai_db::DbTxn; + +use crate::{ + Db, + processor::Processor, + tributary::{TributaryDb, TributarySpec, Transaction}, +}; + +// Handle a specific Tributary block +async fn handle_block( + db: &mut TributaryDb, + key: &Zeroizing<::F>, + processor: &mut Pro, + spec: &TributarySpec, + tributary: &Tributary, + block: Block, +) { + let hash = block.hash(); + + let mut event_id = 0; + for tx in block.transactions { + if !TributaryDb::::handled_event(&db.0, hash, event_id) { + let mut txn = db.0.txn(); + + let mut handle_dkg = |label, attempt, mut bytes: Vec, signed: Signed| { + // If they've already published a TX for this attempt, slash + if let Some(data) = + TributaryDb::::dkg_data(label, &txn, tributary.genesis(), &signed.signer, attempt) + { + if data != bytes { + // TODO: Full slash + todo!(); + } + + // TODO: Slash + return None; + } + + // If the attempt is lesser than the blockchain's, slash + let curr_attempt = TributaryDb::::dkg_attempt(&txn, tributary.genesis()); + if attempt < curr_attempt { + // TODO: Slash for being late + return None; + } + if attempt > curr_attempt { + // TODO: Full slash + todo!(); + } + + // Store this data + let received = TributaryDb::::set_dkg_data( + label, + &mut txn, + tributary.genesis(), + &signed.signer, + attempt, + &bytes, + ); + + // If we have all commitments/shares, tell the processor + if received == spec.n() { + let mut data = HashMap::new(); + for validator in spec.validators().keys() { + data.insert( + spec.i(*validator).unwrap(), + if validator == &signed.signer { + bytes.split_off(0) + } else { + TributaryDb::::dkg_data(label, &txn, tributary.genesis(), validator, attempt) + .unwrap_or_else(|| { + panic!( + "received all DKG data yet couldn't load {} for a validator", + std::str::from_utf8(label).unwrap(), + ) + }) + }, + ); + } + + return Some((KeyGenId { set: spec.set(), attempt }, data)); + } + None + }; + + match tx { + Transaction::DkgCommitments(attempt, bytes, signed) => { + if let Some((id, commitments)) = handle_dkg(b"commitments", attempt, bytes, signed) { + processor + .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments { + id, + commitments, + })) + .await; + } + } + + Transaction::DkgShares(attempt, mut shares, signed) => { + if shares.len() != usize::from(spec.n()) { + // TODO: Full slash + continue; + } + + let bytes = shares + .remove( + &spec + .i(Ristretto::generator() * key.deref()) + .expect("in a tributary we're not a validator for"), + ) + .unwrap(); + + if let Some((id, shares)) = handle_dkg(b"shares", attempt, bytes, signed) { + processor + .send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares { id, shares })) + .await; + } + } + + Transaction::SignPreprocess(..) => todo!(), + Transaction::SignShare(..) => todo!(), + + Transaction::FinalizedBlock(..) => todo!(), + + Transaction::BatchPreprocess(..) => todo!(), + Transaction::BatchShare(..) => todo!(), + } + + TributaryDb::::handle_event(&mut txn, hash, event_id); + txn.commit(); + } + event_id += 1; + } +} + +pub async fn handle_new_blocks( + db: &mut TributaryDb, + key: &Zeroizing<::F>, + processor: &mut Pro, + spec: &TributarySpec, + tributary: &Tributary, + last_block: &mut [u8; 32], +) { + // Check if there's been a new Tributary block + let latest = tributary.tip(); + if latest == *last_block { + return; + } + + let mut blocks = vec![tributary.block(&latest).unwrap()]; + while blocks.last().unwrap().parent() != *last_block { + blocks.push(tributary.block(&blocks.last().unwrap().parent()).unwrap()); + } + + while let Some(block) = blocks.pop() { + let hash = block.hash(); + handle_block(db, key, processor, spec, tributary, block).await; + *last_block = hash; + db.set_last_block(tributary.genesis(), *last_block); + } +} diff --git a/coordinator/tributary/src/block.rs b/coordinator/tributary/src/block.rs index 5c356554..0ee1895f 100644 --- a/coordinator/tributary/src/block.rs +++ b/coordinator/tributary/src/block.rs @@ -132,6 +132,10 @@ impl Block { res } + pub fn parent(&self) -> [u8; 32] { + self.header.parent + } + pub fn hash(&self) -> [u8; 32] { self.header.hash() } diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index adbe946b..e787b223 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -84,6 +84,10 @@ impl Blockchain { res } + pub(crate) fn genesis(&self) -> [u8; 32] { + self.genesis + } + pub(crate) fn tip(&self) -> [u8; 32] { self.tip } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 60be7030..e9ea3539 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -126,6 +126,9 @@ impl Tributary { Some(Self { network, synced_block, messages }) } + pub fn genesis(&self) -> [u8; 32] { + self.network.blockchain.read().unwrap().genesis() + } pub fn tip(&self) -> [u8; 32] { self.network.blockchain.read().unwrap().tip() } diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 81ec7cb0..59814ffc 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -259,6 +259,7 @@ impl Network for TendermintNetwork { self.p2p.broadcast(to_broadcast).await } async fn slash(&mut self, validator: Self::ValidatorId) { + // TODO: Handle this slash log::error!( "validator {} triggered a slash event on tributary {}", hex::encode(validator),