Reload Tributaries

add_active_tributary writes the spec to disk before it returns, so even if the
VecDeque it pushes to isn't popped, the tributary will still be loaded on boot.
This commit is contained in:
Luke Parker 2023-04-23 04:31:00 -04:00
parent 2b09309adc
commit f2d9d70068
No known key found for this signature in database
5 changed files with 130 additions and 15 deletions

44
coordinator/src/db.rs Normal file
View file

@ -0,0 +1,44 @@
pub use serai_db::*;
use crate::tributary::TributarySpec;
#[derive(Debug)]
pub struct MainDb<D: Db>(pub D);
impl<D: Db> MainDb<D> {
pub fn new(db: D) -> Self {
Self(db)
}
fn main_key(dst: &'static [u8], key: impl AsRef<[u8]>) -> Vec<u8> {
D::key(b"MAIN", dst, key)
}
fn acive_tributaries_key() -> Vec<u8> {
Self::main_key(b"active_tributaries", [])
}
pub fn active_tributaries(&self) -> (Vec<u8>, Vec<TributarySpec>) {
let bytes = self.0.get(Self::acive_tributaries_key()).unwrap_or(vec![]);
let mut bytes_ref: &[u8] = bytes.as_ref();
let mut tributaries = vec![];
while !bytes_ref.is_empty() {
tributaries.push(TributarySpec::read(&mut bytes_ref).unwrap());
}
(bytes, tributaries)
}
pub fn add_active_tributary(&mut self, spec: &TributarySpec) {
let key = Self::acive_tributaries_key();
let (mut existing_bytes, existing) = self.active_tributaries();
for tributary in &existing {
if tributary == spec {
return;
}
}
spec.write(&mut existing_bytes).unwrap();
let mut txn = self.0.txn();
txn.put(key, existing_bytes);
txn.commit();
}
}

View file

@ -23,6 +23,9 @@ use ::tributary::Tributary;
mod tributary;
use crate::tributary::{TributarySpec, Transaction};
mod db;
use db::MainDb;
mod p2p;
pub use p2p::*;
@ -48,9 +51,9 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
mut processor: Pro,
serai: Serai,
) {
let add_new_tributary = |spec: TributarySpec| async {
let add_new_tributary = |db, spec: TributarySpec| async {
MainDb(db).add_active_tributary(&spec);
NEW_TRIBUTARIES.write().await.push_back(spec);
// TODO: Save this tributary's information to the databae before returning
};
{
@ -92,6 +95,7 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
let mut tributaries = HashMap::<[u8; 32], ActiveTributary<D, P>>::new();
// TODO: Use a db on a distinct volume
async fn add_tributary<D: Db, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@ -113,7 +117,10 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
tributaries.insert(tributary.genesis(), ActiveTributary { spec, tributary });
}
// TODO: Reload tributaries
// TODO: Can MainDb take a borrow?
for spec in MainDb(raw_db.clone()).active_tributaries().1 {
add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await;
}
let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
tokio::spawn(async move {
@ -130,7 +137,7 @@ async fn run<D: Db, Pro: Processor, P: P2p>(
}
}
for (genesis, ActiveTributary { spec, tributary }) in tributaries.iter_mut() {
for ActiveTributary { spec, tributary } in tributaries.values() {
tributary::scanner::handle_new_blocks::<_, _, P>(
&mut tributary_db,
&key,

View file

@ -41,10 +41,10 @@ async fn in_set(
async fn handle_new_set<
D: Db,
Fut: Future<Output = ()>,
ANT: Clone + Fn(TributarySpec) -> Fut,
ANT: Clone + Fn(D, TributarySpec) -> Fut,
Pro: Processor,
>(
db: D,
db: &D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
add_new_tributary: ANT,
processor: &mut Pro,
@ -56,7 +56,7 @@ async fn handle_new_set<
let set_data = serai.get_validator_set(set).await?.expect("NewSet for set which doesn't exist");
let spec = TributarySpec::new(block.hash(), block.time().unwrap(), set, set_data);
add_new_tributary(spec.clone());
add_new_tributary(db.clone(), spec.clone());
// Trigger a DKG
// TODO: Check how the processor handles this being fired multiple times
@ -210,7 +210,7 @@ async fn handle_batch_and_burns<Pro: Processor>(
async fn handle_block<
D: Db,
Fut: Future<Output = ()>,
ANT: Clone + Fn(TributarySpec) -> Fut,
ANT: Clone + Fn(D, TributarySpec) -> Fut,
Pro: Processor,
P: P2p,
>(
@ -236,7 +236,7 @@ async fn handle_block<
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
if let ValidatorSetsEvent::NewSet { set } = new_set {
// TODO2: Use a DB on a dedicated volume
handle_new_set(db.0.clone(), key, add_new_tributary.clone(), processor, serai, &block, set)
handle_new_set(&db.0, key, add_new_tributary.clone(), processor, serai, &block, set)
.await?;
} else {
panic!("NewSet event wasn't NewSet: {new_set:?}");
@ -281,7 +281,7 @@ async fn handle_block<
pub async fn handle_new_blocks<
D: Db,
Fut: Future<Output = ()>,
ANT: Clone + Fn(TributarySpec) -> Fut,
ANT: Clone + Fn(D, TributarySpec) -> Fut,
Pro: Processor,
P: P2p,
>(

View file

@ -61,7 +61,9 @@ pub fn new_spec<R: RngCore + CryptoRng>(
.unwrap(),
};
TributarySpec::new(serai_block, start_time, set, set_data)
let res = TributarySpec::new(serai_block, start_time, set, set_data);
assert_eq!(TributarySpec::read::<&[u8]>(&mut res.serialize().as_ref()).unwrap(), res);
res
}
pub async fn new_tributaries(

View file

@ -1,5 +1,8 @@
use core::ops::Deref;
use std::{io, collections::HashMap};
use std::{
io::{self, Read, Write},
collections::HashMap,
};
use zeroize::Zeroizing;
use rand_core::{RngCore, CryptoRng};
@ -7,13 +10,19 @@ use rand_core::{RngCore, CryptoRng};
use blake2::{Digest, Blake2s256};
use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use ciphersuite::{
group::{ff::Field, GroupEncoding},
Ciphersuite, Ristretto,
};
use schnorr::SchnorrSignature;
use frost::Participant;
use scale::Encode;
use scale::{Encode, Decode};
use serai_client::validator_sets::primitives::{ValidatorSet, ValidatorSetData};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet, ValidatorSetData},
};
#[rustfmt::skip]
use tributary::{
@ -99,6 +108,59 @@ impl TributarySpec {
pub fn validators(&self) -> Vec<(<Ristretto as Ciphersuite>::G, u64)> {
self.validators.clone()
}
pub fn write<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_all(&self.serai_block)?;
writer.write_all(&self.start_time.to_le_bytes())?;
writer.write_all(&self.set.session.0.to_le_bytes())?;
let network_encoded = self.set.network.encode();
assert_eq!(network_encoded.len(), 1);
writer.write_all(&network_encoded)?;
writer.write_all(&u32::try_from(self.validators.len()).unwrap().to_le_bytes())?;
for validator in &self.validators {
writer.write_all(&validator.0.to_bytes())?;
writer.write_all(&validator.1.to_le_bytes())?;
}
Ok(())
}
pub fn serialize(&self) -> Vec<u8> {
let mut res = vec![];
self.write(&mut res).unwrap();
res
}
pub fn read<R: Read>(reader: &mut R) -> io::Result<Self> {
let mut serai_block = [0; 32];
reader.read_exact(&mut serai_block)?;
let mut start_time = [0; 8];
reader.read_exact(&mut start_time)?;
let start_time = u64::from_le_bytes(start_time);
let mut session = [0; 4];
reader.read_exact(&mut session)?;
let session = Session(u32::from_le_bytes(session));
let mut network = [0; 1];
reader.read_exact(&mut network)?;
let network = NetworkId::decode(&mut &network[..])
.map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid network"))?;
let mut validators_len = [0; 4];
reader.read_exact(&mut validators_len)?;
let validators_len = usize::try_from(u32::from_le_bytes(validators_len)).unwrap();
let mut validators = Vec::with_capacity(validators_len);
for _ in 0 .. validators_len {
let key = Ristretto::read_G(reader)?;
let mut bond = [0; 8];
reader.read_exact(&mut bond)?;
validators.push((key, u64::from_le_bytes(bond)));
}
Ok(Self { serai_block, start_time, set: ValidatorSet { session, network }, validators })
}
}
#[derive(Clone, PartialEq, Eq, Debug)]