Initial Tributary handling

This commit is contained in:
Luke Parker 2023-04-20 05:05:17 -04:00
parent 9e1f3fc85c
commit 8041a0d845
No known key found for this signature in database
9 changed files with 413 additions and 42 deletions

View file

@ -1,8 +1,9 @@
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unreachable_code)]
#![allow(clippy::diverging_sub_expression)]
use std::time::Duration;
use std::{time::Duration, collections::HashMap};
use zeroize::Zeroizing;
@ -13,10 +14,7 @@ use serai_client::Serai;
use tokio::time::sleep;
mod db;
pub use db::*;
pub mod tributary;
mod tributary;
mod p2p;
pub use p2p::*;
@ -30,42 +28,63 @@ mod substrate;
mod tests;
async fn run<D: Db, Pro: Processor, P: P2p>(
db: D,
raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
p2p: P,
mut processor: Pro,
serai: Serai,
) {
let mut db = MainDb::new(db);
let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone());
let mut last_substrate_block = substrate_db.last_block();
let mut last_tributary_block = HashMap::<[u8; 32], _>::new();
let mut last_substrate_block = db.last_substrate_block();
tokio::spawn(async move {
loop {
match substrate::handle_new_blocks(
&mut db,
&key,
&p2p,
&mut processor,
&serai,
&mut last_substrate_block,
)
.await
{
Ok(()) => {}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
{
let key = key.clone();
let mut processor = processor.clone();
tokio::spawn(async move {
loop {
match substrate::handle_new_blocks(
&mut substrate_db,
&key,
&p2p,
&mut processor,
&serai,
&mut last_substrate_block,
)
.await
{
Ok(()) => sleep(Duration::from_secs(3)).await,
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
});
});
}
{
let mut tributary_db = tributary::TributaryDb::new(raw_db);
tokio::spawn(async move {
loop {
for (_, last_block) in last_tributary_block.iter_mut() {
tributary::scanner::handle_new_blocks::<_, _, P>(
&mut tributary_db,
&key,
&mut processor,
todo!(),
todo!(),
last_block,
)
.await;
}
sleep(Duration::from_secs(3)).await;
}
});
}
loop {
// Handle all messages from tributaries
// Handle all messages from processors
todo!()
}
}

View file

@ -12,13 +12,14 @@ pub struct Message {
}
#[async_trait::async_trait]
pub trait Processor: 'static + Send + Sync {
pub trait Processor: 'static + Send + Sync + Clone {
async fn send(&mut self, msg: CoordinatorMessage);
async fn recv(&mut self) -> Message;
async fn ack(&mut self, msg: Message);
}
// TODO: Move this to tests
#[derive(Clone)]
pub struct MemProcessor(Arc<RwLock<VecDeque<Message>>>);
impl MemProcessor {
#[allow(clippy::new_without_default)]

View file

@ -0,0 +1,92 @@
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
pub use serai_db::*;
#[derive(Debug)]
pub struct TributaryDb<D: Db>(pub D);
impl<D: Db> TributaryDb<D> {
pub fn new(db: D) -> Self {
Self(db)
}
fn tributary_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"TRIBUTARY", dst, key)
}
fn block_key(genesis: [u8; 32]) -> Vec<u8> {
Self::tributary_key(b"block", genesis)
}
pub fn set_last_block(&mut self, genesis: [u8; 32], block: [u8; 32]) {
let mut txn = self.0.txn();
txn.put(Self::block_key(genesis), block);
txn.commit();
}
pub fn last_block(&self, genesis: [u8; 32]) -> [u8; 32] {
self.0.get(Self::block_key(genesis)).unwrap_or(genesis.to_vec()).try_into().unwrap()
}
fn dkg_attempt_key(genesis: [u8; 32]) -> Vec<u8> {
Self::tributary_key(b"dkg_attempt", genesis)
}
pub fn dkg_attempt<G: Get>(getter: &G, genesis: [u8; 32]) -> u32 {
u32::from_le_bytes(
getter.get(Self::dkg_attempt_key(genesis)).unwrap_or(vec![0; 4]).try_into().unwrap(),
)
}
fn dkg_data_received_key(label: &'static [u8], genesis: &[u8], attempt: u32) -> Vec<u8> {
Self::tributary_key(
b"dkg_data_received",
[label, genesis, attempt.to_le_bytes().as_ref()].concat(),
)
}
fn dkg_data_key(
label: &'static [u8],
genesis: &[u8],
signer: &<Ristretto as Ciphersuite>::G,
attempt: u32,
) -> Vec<u8> {
Self::tributary_key(
b"dkg_data",
[label, genesis, signer.to_bytes().as_ref(), attempt.to_le_bytes().as_ref()].concat(),
)
}
pub fn dkg_data<G: Get>(
label: &'static [u8],
getter: &G,
genesis: [u8; 32],
signer: &<Ristretto as Ciphersuite>::G,
attempt: u32,
) -> Option<Vec<u8>> {
getter.get(Self::dkg_data_key(label, &genesis, signer, attempt))
}
pub fn set_dkg_data(
label: &'static [u8],
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
signer: &<Ristretto as Ciphersuite>::G,
attempt: u32,
data: &[u8],
) -> u16 {
let received_key = Self::dkg_data_received_key(label, &genesis, attempt);
let mut received =
u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap());
received += 1;
txn.put(received_key, received.to_le_bytes());
txn.put(Self::dkg_data_key(label, &genesis, signer, attempt), data);
received
}
fn event_key(id: &[u8], index: u32) -> Vec<u8> {
Self::tributary_key(b"event", [id, index.to_le_bytes().as_ref()].concat())
}
pub fn handled_event<G: Get>(getter: &G, id: [u8; 32], index: u32) -> bool {
getter.get(Self::event_key(&id, index)).is_some()
}
pub fn handle_event(txn: &mut D::Transaction<'_>, id: [u8; 32], index: u32) {
assert!(!Self::handled_event(txn, id, index));
txn.put(Self::event_key(&id, index), []);
}
}

View file

@ -3,27 +3,101 @@ use std::{io, collections::HashMap};
use blake2::{Digest, Blake2s256};
use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::{Ciphersuite, Ristretto};
use frost::Participant;
use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet;
use serai_client::validator_sets::primitives::{ValidatorSet, ValidatorSetData};
#[rustfmt::skip]
use tributary::{
ReadWrite, Signed, TransactionError, TransactionKind, Transaction as TransactionTrait,
};
pub fn genesis(serai_block: [u8; 32], set: ValidatorSet) -> [u8; 32] {
// Calculate the genesis for this Tributary
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis");
// This locks it to a specific Serai chain
genesis.append_message(b"serai_block", serai_block);
genesis.append_message(b"session", set.session.0.to_le_bytes());
genesis.append_message(b"network", set.network.encode());
let genesis = genesis.challenge(b"genesis");
let genesis_ref: &[u8] = genesis.as_ref();
genesis_ref[.. 32].try_into().unwrap()
mod db;
pub use db::*;
pub mod scanner;
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct TributarySpec {
serai_block: [u8; 32],
start_time: u64,
set: ValidatorSet,
validators: Vec<(<Ristretto as Ciphersuite>::G, u64)>,
}
impl TributarySpec {
pub fn new(
serai_block: [u8; 32],
start_time: u64,
set: ValidatorSet,
set_data: ValidatorSetData,
) -> TributarySpec {
let mut validators = vec![];
for (participant, amount) in set_data.participants {
// TODO: Ban invalid keys from being validators on the Serai side
let participant = <Ristretto as Ciphersuite>::read_G::<&[u8]>(&mut participant.0.as_ref())
.expect("invalid key registered as participant");
// Give one weight on Tributary per bond instance
validators.push((participant, amount.0 / set_data.bond.0));
}
Self { serai_block, start_time, set, validators }
}
pub fn set(&self) -> ValidatorSet {
self.set
}
pub fn genesis(&self) -> [u8; 32] {
// Calculate the genesis for this Tributary
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis");
// This locks it to a specific Serai chain
genesis.append_message(b"serai_block", self.serai_block);
genesis.append_message(b"session", self.set.session.0.to_le_bytes());
genesis.append_message(b"network", self.set.network.encode());
let genesis = genesis.challenge(b"genesis");
let genesis_ref: &[u8] = genesis.as_ref();
genesis_ref[.. 32].try_into().unwrap()
}
pub fn start_time(&self) -> u64 {
self.start_time
}
pub fn n(&self) -> u16 {
// TODO: Support multiple key shares
// self.validators.iter().map(|(_, weight)| u16::try_from(weight).unwrap()).sum()
self.validators().len().try_into().unwrap()
}
pub fn t(&self) -> u16 {
(2 * (self.n() / 3)) + 1
}
pub fn i(&self, key: <Ristretto as Ciphersuite>::G) -> Option<Participant> {
let mut i = 1;
// TODO: Support multiple key shares
for (validator, _weight) in &self.validators {
if validator == &key {
// return (i .. (i + weight)).to_vec();
return Some(Participant::new(i).unwrap());
}
// i += weight;
i += 1;
}
None
}
pub fn validators(&self) -> HashMap<<Ristretto as Ciphersuite>::G, u64> {
let mut res = HashMap::new();
for (key, amount) in self.validators.clone() {
res.insert(key, amount);
}
res
}
}
#[derive(Clone, PartialEq, Eq, Debug)]

View file

@ -0,0 +1,173 @@
use core::ops::Deref;
use std::collections::HashMap;
use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
use tributary::{Signed, Block, P2p, Tributary};
use processor_messages::{
key_gen::{self, KeyGenId},
CoordinatorMessage,
};
use serai_db::DbTxn;
use crate::{
Db,
processor::Processor,
tributary::{TributaryDb, TributarySpec, Transaction},
};
// Handle a specific Tributary block
async fn handle_block<D: Db, Pro: Processor, P: P2p>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
processor: &mut Pro,
spec: &TributarySpec,
tributary: &Tributary<D, Transaction, P>,
block: Block<Transaction>,
) {
let hash = block.hash();
let mut event_id = 0;
for tx in block.transactions {
if !TributaryDb::<D>::handled_event(&db.0, hash, event_id) {
let mut txn = db.0.txn();
let mut handle_dkg = |label, attempt, mut bytes: Vec<u8>, signed: Signed| {
// If they've already published a TX for this attempt, slash
if let Some(data) =
TributaryDb::<D>::dkg_data(label, &txn, tributary.genesis(), &signed.signer, attempt)
{
if data != bytes {
// TODO: Full slash
todo!();
}
// TODO: Slash
return None;
}
// If the attempt is lesser than the blockchain's, slash
let curr_attempt = TributaryDb::<D>::dkg_attempt(&txn, tributary.genesis());
if attempt < curr_attempt {
// TODO: Slash for being late
return None;
}
if attempt > curr_attempt {
// TODO: Full slash
todo!();
}
// Store this data
let received = TributaryDb::<D>::set_dkg_data(
label,
&mut txn,
tributary.genesis(),
&signed.signer,
attempt,
&bytes,
);
// If we have all commitments/shares, tell the processor
if received == spec.n() {
let mut data = HashMap::new();
for validator in spec.validators().keys() {
data.insert(
spec.i(*validator).unwrap(),
if validator == &signed.signer {
bytes.split_off(0)
} else {
TributaryDb::<D>::dkg_data(label, &txn, tributary.genesis(), validator, attempt)
.unwrap_or_else(|| {
panic!(
"received all DKG data yet couldn't load {} for a validator",
std::str::from_utf8(label).unwrap(),
)
})
},
);
}
return Some((KeyGenId { set: spec.set(), attempt }, data));
}
None
};
match tx {
Transaction::DkgCommitments(attempt, bytes, signed) => {
if let Some((id, commitments)) = handle_dkg(b"commitments", attempt, bytes, signed) {
processor
.send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Commitments {
id,
commitments,
}))
.await;
}
}
Transaction::DkgShares(attempt, mut shares, signed) => {
if shares.len() != usize::from(spec.n()) {
// TODO: Full slash
continue;
}
let bytes = shares
.remove(
&spec
.i(Ristretto::generator() * key.deref())
.expect("in a tributary we're not a validator for"),
)
.unwrap();
if let Some((id, shares)) = handle_dkg(b"shares", attempt, bytes, signed) {
processor
.send(CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Shares { id, shares }))
.await;
}
}
Transaction::SignPreprocess(..) => todo!(),
Transaction::SignShare(..) => todo!(),
Transaction::FinalizedBlock(..) => todo!(),
Transaction::BatchPreprocess(..) => todo!(),
Transaction::BatchShare(..) => todo!(),
}
TributaryDb::<D>::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
}
}
pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
processor: &mut Pro,
spec: &TributarySpec,
tributary: &Tributary<D, Transaction, P>,
last_block: &mut [u8; 32],
) {
// Check if there's been a new Tributary block
let latest = tributary.tip();
if latest == *last_block {
return;
}
let mut blocks = vec![tributary.block(&latest).unwrap()];
while blocks.last().unwrap().parent() != *last_block {
blocks.push(tributary.block(&blocks.last().unwrap().parent()).unwrap());
}
while let Some(block) = blocks.pop() {
let hash = block.hash();
handle_block(db, key, processor, spec, tributary, block).await;
*last_block = hash;
db.set_last_block(tributary.genesis(), *last_block);
}
}

View file

@ -132,6 +132,10 @@ impl<T: Transaction> Block<T> {
res
}
pub fn parent(&self) -> [u8; 32] {
self.header.parent
}
pub fn hash(&self) -> [u8; 32] {
self.header.hash()
}

View file

@ -84,6 +84,10 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
res
}
pub(crate) fn genesis(&self) -> [u8; 32] {
self.genesis
}
pub(crate) fn tip(&self) -> [u8; 32] {
self.tip
}

View file

@ -126,6 +126,9 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
Some(Self { network, synced_block, messages })
}
pub fn genesis(&self) -> [u8; 32] {
self.network.blockchain.read().unwrap().genesis()
}
pub fn tip(&self) -> [u8; 32] {
self.network.blockchain.read().unwrap().tip()
}

View file

@ -259,6 +259,7 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
self.p2p.broadcast(to_broadcast).await
}
async fn slash(&mut self, validator: Self::ValidatorId) {
// TODO: Handle this slash
log::error!(
"validator {} triggered a slash event on tributary {}",
hex::encode(validator),