Most of coordinator Tributary retirement

Adds Event::SetRetired to validator-sets.

Emits TributaryEvent::TributaryRetired.

Replaces is_active_set, which made multiple network requests, with
is_tributary_retired, a DB read.
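
A minimal sketch of the flag pattern this relies on, using stand-in types (a plain in-memory map instead of the real serai_db transaction API, which is an assumption for illustration only): retiring writes an empty marker under a per-set key, and the check is simply whether that key exists.

use std::collections::HashMap;

// Stand-ins for the real ValidatorSet and serai_db types; only the pattern matters here.
#[derive(Clone, Copy)]
struct ValidatorSet { network: u8, session: u32 }

#[derive(Default)]
struct Db(HashMap<Vec<u8>, Vec<u8>>);

impl Db {
  fn retired_tributary_key(set: ValidatorSet) -> Vec<u8> {
    let mut key = b"retired_tributary".to_vec();
    key.push(set.network);
    key.extend(set.session.to_le_bytes());
    key
  }

  // Written once, when the SetRetired event is observed on Substrate.
  fn retire_tributary(&mut self, set: ValidatorSet) {
    self.0.insert(Self::retired_tributary_key(set), vec![]);
  }

  // A single local read, replacing the old series of Serai RPC calls.
  fn is_tributary_retired(&self, set: ValidatorSet) -> bool {
    self.0.contains_key(&Self::retired_tributary_key(set))
  }
}

fn main() {
  let mut db = Db::default();
  let set = ValidatorSet { network: 1, session: 0 };
  assert!(!db.is_tributary_retired(set));
  db.retire_tributary(set);
  assert!(db.is_tributary_retired(set));
}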

Performs most of the removals necessary upon TributaryRetired.
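
The removals follow one pattern throughout the diff below: keep a set -> genesis map beside each genesis-keyed collection so a TributaryRetired(set) event can evict the matching entries. A condensed sketch with stand-in types (the real maps hold ActiveTributary handles and per-Tributary channels):

use std::collections::HashMap;

// Stand-ins; real usage is keyed by the 32-byte Tributary genesis hash.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct ValidatorSet { network: u8, session: u32 }

enum TributaryEvent {
  NewTributary { set: ValidatorSet, genesis: [u8; 32] },
  TributaryRetired(ValidatorSet),
}

fn main() {
  let mut set_to_genesis = HashMap::new();
  let mut tributaries: HashMap<[u8; 32], &str> = HashMap::new();

  let set = ValidatorSet { network: 1, session: 0 };
  let events = [
    TributaryEvent::NewTributary { set, genesis: [0; 32] },
    TributaryEvent::TributaryRetired(set),
  ];

  for event in events {
    match event {
      TributaryEvent::NewTributary { set, genesis } => {
        set_to_genesis.insert(set, genesis);
        tributaries.insert(genesis, "tributary handle");
      }
      TributaryEvent::TributaryRetired(set) => {
        // The retirement event only carries the set, hence the indirection to genesis.
        if let Some(genesis) = set_to_genesis.remove(&set) {
          tributaries.remove(&genesis);
        }
      }
    }
  }
  assert!(tributaries.is_empty());
}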

Still needs to clean up the actual Tributary/Tendermint tasks.
Luke Parker 2023-10-14 16:47:25 -04:00
parent 5897efd7c7
commit 3b3fdd104b
10 changed files with 145 additions and 110 deletions

View file

@@ -37,6 +37,9 @@ impl<D: Db> MainDb<D> {
fn acive_tributaries_key() -> Vec<u8> {
Self::main_key(b"active_tributaries", [])
}
fn retired_tributary_key(set: ValidatorSet) -> Vec<u8> {
Self::main_key(b"retired_tributary", set.encode())
}
pub fn active_tributaries<G: Get>(getter: &G) -> (Vec<u8>, Vec<TributarySpec>) {
let bytes = getter.get(Self::acive_tributaries_key()).unwrap_or(vec![]);
let mut bytes_ref: &[u8] = bytes.as_ref();
@@ -60,6 +63,25 @@ impl<D: Db> MainDb<D> {
spec.write(&mut existing_bytes).unwrap();
txn.put(key, existing_bytes);
}
pub fn retire_tributary(txn: &mut D::Transaction<'_>, set: ValidatorSet) {
let mut active = Self::active_tributaries(txn).1;
for i in 0 .. active.len() {
if active[i].set() == set {
active.remove(i);
break;
}
}
let mut bytes = vec![];
for active in active {
active.write(&mut bytes).unwrap();
}
txn.put(Self::acive_tributaries_key(), bytes);
txn.put(Self::retired_tributary_key(set), []);
}
pub fn is_tributary_retired<G: Get>(getter: &G, set: ValidatorSet) -> bool {
getter.get(Self::retired_tributary_key(set)).is_some()
}
fn signed_transaction_key(nonce: u32) -> Vec<u8> {
Self::main_key(b"signed_transaction", nonce.to_le_bytes())

View file

@@ -50,7 +50,7 @@ pub mod processors;
use processors::Processors;
mod substrate;
use substrate::{SubstrateDb, is_active_set};
use substrate::SubstrateDb;
#[cfg(test)]
pub mod tests;
@@ -67,18 +67,18 @@ pub enum TributaryEvent<D: Db, P: P2p> {
TributaryRetired(ValidatorSet),
}
// TODO: Clean up the actual underlying Tributary/Tendermint tasks
// Creates a new tributary and sends it to all listeners.
// TODO: retire_tributary
async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: &Serai,
processors: &Pro,
p2p: P,
tributaries: &broadcast::Sender<TributaryEvent<D, P>>,
spec: TributarySpec,
) {
if !is_active_set(serai, spec.set()).await {
if MainDb::<D>::is_tributary_retired(&db, spec.set()) {
log::info!("not adding tributary {:?} since it's been retired", spec.set());
}
@@ -363,12 +363,10 @@ async fn handle_processor_message<D: Db, P: P2p>(
// If we have a relevant Tributary, check it's actually still relevant and has yet to be retired
if let Some(relevant_tributary_value) = relevant_tributary {
if !is_active_set(
serai,
if MainDb::<D>::is_tributary_retired(
&txn,
ValidatorSet { network: msg.network, session: relevant_tributary_value },
)
.await
{
) {
relevant_tributary = None;
}
}
@@ -382,8 +380,8 @@ async fn handle_processor_message<D: Db, P: P2p>(
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else {
// Since we don't, sleep for a fraction of a second and return false, signaling we didn't
// handle this message
// At the start of the loop which calls this function, we'll check for new tributaries, making
// this eventually resolve
// At the start of the loop which calls this function, we'll check for new tributaries,
// making this eventually resolve
sleep(Duration::from_millis(100)).await;
return false;
};
@@ -659,25 +657,22 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
let mut tributaries = HashMap::new();
loop {
match tributary_event.try_recv() {
Ok(event) => {
match event {
TributaryEvent::NewTributary(tributary) => {
let set = tributary.spec.set();
assert_eq!(set.network, network);
tributaries.insert(set.session, tributary);
}
// TOOD
TributaryEvent::TributaryRetired(_) => todo!(),
Ok(event) => match event {
TributaryEvent::NewTributary(tributary) => {
let set = tributary.spec.set();
assert_eq!(set.network, network);
tributaries.insert(set.session, tributary);
}
}
TributaryEvent::TributaryRetired(set) => {
tributaries.remove(&set.session);
}
},
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
panic!("handle_processor_messages tributary_event sender closed")
}
}
// TODO: Remove the Tributary if it's retired
// TODO: Check this ID is sane (last handled ID or expected next ID)
let msg = processors.recv(network).await;
if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await {
@@ -716,8 +711,9 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
TributaryEvent::NewTributary(tributary) => channels[&tributary.spec.set().network]
.send(TributaryEvent::NewTributary(tributary))
.unwrap(),
// TODO
TributaryEvent::TributaryRetired(_) => todo!(),
TributaryEvent::TributaryRetired(set) => {
channels[&set.network].send(TributaryEvent::TributaryRetired(set)).unwrap()
}
};
}
}
@@ -737,6 +733,8 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
new_tributary_spec_send.send(spec).unwrap();
}
let (tributary_retired_send, mut tributary_retired_recv) = mpsc::unbounded_channel();
// Handle new Substrate blocks
tokio::spawn(crate::substrate::scan_task(
raw_db.clone(),
@@ -744,6 +742,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
processors.clone(),
serai.clone(),
new_tributary_spec_send,
tributary_retired_send,
));
// Handle the Tributaries
@@ -756,11 +755,21 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let tributary_event_listener_4 = tributary_event.subscribe();
let tributary_event_listener_5 = tributary_event.subscribe();
// Emit TributaryEvent::TributaryRetired
tokio::spawn({
let tributary_event = tributary_event.clone();
async move {
loop {
let retired = tributary_retired_recv.recv().await.unwrap();
tributary_event.send(TributaryEvent::TributaryRetired(retired)).map_err(|_| ()).unwrap();
}
}
});
// Spawn a task to further add Tributaries as needed
tokio::spawn({
let raw_db = raw_db.clone();
let key = key.clone();
let serai = serai.clone();
let processors = processors.clone();
let p2p = p2p.clone();
async move {
@@ -770,12 +779,11 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
tokio::spawn({
let raw_db = raw_db.clone();
let key = key.clone();
let serai = serai.clone();
let processors = processors.clone();
let p2p = p2p.clone();
let tributary_event = tributary_event.clone();
async move {
add_tributary(raw_db, key, &serai, &processors, p2p, &tributary_event, spec).await;
add_tributary(raw_db, key, &processors, p2p, &tributary_event, spec).await;
}
});
}
@@ -790,14 +798,19 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let tributaries = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({
let tributaries = tributaries.clone();
let mut set_to_genesis = HashMap::new();
async move {
loop {
match tributary_event_listener_1.recv().await {
Ok(TributaryEvent::NewTributary(tributary)) => {
set_to_genesis.insert(tributary.spec.set(), tributary.spec.genesis());
tributaries.write().await.insert(tributary.spec.genesis(), tributary.tributary);
}
// TODO
Ok(TributaryEvent::TributaryRetired(_)) => todo!(),
Ok(TributaryEvent::TributaryRetired(set)) => {
if let Some(genesis) = set_to_genesis.remove(&set) {
tributaries.write().await.remove(&genesis);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("recognized_id lagged to handle tributary_event")
}
@@ -807,7 +820,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
}
});
move |network, genesis, id_type, id, nonce| {
move |set: ValidatorSet, genesis, id_type, id, nonce| {
let mut raw_db = raw_db.clone();
let key = key.clone();
let tributaries = tributaries.clone();
@@ -815,10 +828,11 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
// The transactions for these are fired before the preprocesses are actually
// received/saved, creating a race between Tributary ack and the availability of all
// Preprocesses
// This waits until the necessary preprocess is available
// TODO: Incorporate RecognizedIdType here?
let get_preprocess = |raw_db, id| async move {
loop {
let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, network, id) else {
let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id) else {
sleep(Duration::from_millis(100)).await;
continue;
};
@@ -853,11 +867,19 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
// If we don't have this Tributary because it's retired, break and move on
if MainDb::<D>::is_tributary_retired(&raw_db, set) {
break;
}
// This may happen if the task above is simply slow
log::warn!("tributary we don't have yet came to consensus on a Batch");
continue;
};
// This is safe to perform multiple times and solely needs atomicity within itself
// This is safe to perform multiple times and solely needs atomicity with regards to
// itself
// TODO: Should this not take a TXN accordingly? It's best practice to take a txn, yet
// taking a txn fails to declare its achieved independence
let mut txn = raw_db.txn();
publish_signed_transaction(&mut txn, tributary, tx).await;
txn.commit();
@@ -885,11 +907,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), tributary_event_listener_3));
// Handle P2P messages
tokio::spawn(p2p::handle_p2p_task(
Ristretto::generator() * key.deref(),
p2p,
tributary_event_listener_4,
));
tokio::spawn(p2p::handle_p2p_task(p2p, tributary_event_listener_4));
// Handle all messages from processors
handle_processors(raw_db, key, serai, processors, tributary_event_listener_5).await;

View file

@@ -8,8 +8,6 @@ use std::{
use async_trait::async_trait;
use ciphersuite::{Ciphersuite, Ristretto};
use serai_db::Db;
use tokio::{
@@ -434,19 +432,20 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
}
pub async fn handle_p2p_task<D: Db, P: P2p>(
our_key: <Ristretto as Ciphersuite>::G,
p2p: P,
mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
) {
let channels = Arc::new(RwLock::new(HashMap::new()));
let channels = Arc::new(RwLock::new(HashMap::<_, mpsc::UnboundedSender<Message<P>>>::new()));
tokio::spawn({
let p2p = p2p.clone();
let channels = channels.clone();
let mut set_to_genesis = HashMap::new();
async move {
loop {
match tributary_event.recv().await.unwrap() {
TributaryEvent::NewTributary(tributary) => {
let genesis = tributary.spec.genesis();
set_to_genesis.insert(tributary.spec.set(), genesis);
let (send, mut recv) = mpsc::unbounded_channel();
channels.write().await.insert(genesis, send);
@@ -455,7 +454,10 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
let p2p = p2p.clone();
async move {
loop {
let mut msg: Message<P> = recv.recv().await.unwrap();
let Some(mut msg) = recv.recv().await else {
// Channel closure happens when the tributary retires
break;
};
match msg.kind {
P2pMessageKind::KeepAlive => {}
@@ -564,8 +566,11 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
}
});
}
// TODO
TributaryEvent::TributaryRetired(_) => todo!(),
TributaryEvent::TributaryRetired(set) => {
if let Some(genesis) = set_to_genesis.remove(&set) {
channels.write().await.remove(&genesis);
}
}
}
}
}

View file

@@ -260,6 +260,7 @@ async fn handle_block<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
processors: &Pro,
serai: &Serai,
block: Block,
@@ -317,6 +318,26 @@ async fn handle_block<D: Db, Pro: Processors>(
event_id += 1;
}
for retired_set in serai.as_of(hash).validator_sets().set_retired_events().await? {
let ValidatorSetsEvent::SetRetired { set } = retired_set else {
panic!("SetRetired event wasn't SetRetired: {retired_set:?}");
};
if set.network == NetworkId::Serai {
continue;
}
if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
log::info!("found fresh set retired event {:?}", retired_set);
let mut txn = db.0.txn();
crate::MainDb::<D>::retire_tributary(&mut txn, set);
tributary_retired.send(set).unwrap();
SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
}
// Finally, tell the processor of acknowledged blocks/burns
// This uses a single event as, unlike prior events which individually executed code, all
// following events share data collection
@@ -336,6 +357,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
db: &mut SubstrateDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
processors: &Pro,
serai: &Serai,
next_block: &mut u64,
@@ -354,6 +376,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
db,
key,
new_tributary_spec,
tributary_retired,
processors,
serai,
if b == latest_number {
@@ -380,6 +403,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
processors: Pro,
serai: Arc<Serai>,
new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
tributary_retired: mpsc::UnboundedSender<ValidatorSet>,
) {
log::info!("scanning substrate");
@@ -431,6 +455,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
&mut db,
&key,
&new_tributary_spec,
&tributary_retired,
&processors,
&serai,
&mut next_substrate_block,
@@ -446,58 +471,6 @@ pub async fn scan_task<D: Db, Pro: Processors>(
}
}
/// Returns if a ValidatorSet has yet to be retired.
pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
// TODO: Track this from the Substrate scanner to reduce our overhead? We'd only have a DB
// call, instead of a series of network requests
let serai = loop {
let Ok(serai) = serai.with_current_latest_block().await else {
log::error!("couldn't get the latest block hash from serai when checking if set is active");
sleep(Duration::from_secs(5)).await;
continue;
};
break serai.validator_sets();
};
let latest_session = loop {
let Ok(res) = serai.session(set.network).await else {
log::error!("couldn't get the latest session from serai when checking if set is active");
sleep(Duration::from_secs(5)).await;
continue;
};
// If the on-chain Session is None, then this Session is greater and therefore, for the
// purposes here, active
let Some(res) = res else { return true };
break res;
};
if latest_session.0 > set.session.0 {
// If we're on the Session after the Session after this Session, then this Session is
// definitively completed
if latest_session.0 > (set.session.0 + 1) {
return false;
} else {
// Since the next session has started, check its handover status
let keys = loop {
let Ok(res) = serai.keys(set).await else {
log::error!(
"couldn't get the keys for a session from serai when checking if set is active"
);
sleep(Duration::from_secs(5)).await;
continue;
};
break res;
};
// If the keys have been deleted, then this Tributary is retired
if keys.is_none() {
return false;
}
}
}
true
}
/// Gets the expected ID for the next Batch.
pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -> u32 {
let mut first = true;

View file

@@ -426,7 +426,7 @@ pub(crate) async fn handle_application_tx<
// Because this Batch has achieved synchrony, its batch ID should be authorized
TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Batch(batch));
let nonce = NonceDecider::<D>::handle_batch(txn, genesis, batch);
recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch, nonce).await;
recognized_id(spec.set(), genesis, RecognizedIdType::Batch, batch, nonce).await;
}
Transaction::SubstrateBlock(block) => {
@@ -438,7 +438,7 @@ pub(crate) async fn handle_application_tx<
let nonces = NonceDecider::<D>::handle_substrate_block(txn, genesis, &plan_ids);
for (nonce, id) in nonces.into_iter().zip(plan_ids.into_iter()) {
TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Sign(id));
recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id, nonce).await;
recognized_id(spec.set(), genesis, RecognizedIdType::Plan, id, nonce).await;
}
}

View file

@@ -7,9 +7,7 @@ use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::broadcast;
use serai_client::{
primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai,
};
use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai};
use tributary::{
TransactionKind, Transaction as TributaryTransaction, Block, TributaryReader,
@@ -36,10 +34,10 @@ pub enum RecognizedIdType {
}
pub(crate) trait RIDTrait<FRid>:
Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid
Clone + Fn(ValidatorSet, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid
{
}
impl<FRid, F: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid>
impl<FRid, F: Clone + Fn(ValidatorSet, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid>
RIDTrait<FRid> for F
{
}

View file

@@ -33,6 +33,13 @@ impl<'a> SeraiValidatorSets<'a> {
.await
}
pub async fn set_retired_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
self
.0
.events::<ValidatorSets, _>(|event| matches!(event, ValidatorSetsEvent::SetRetired { .. }))
.await
}
pub async fn session(&self, network: NetworkId) -> Result<Option<Session>, SeraiError> {
self.0.storage(PALLET, "CurrentSession", Some(vec![scale_value(network)])).await
}

View file

@@ -174,7 +174,10 @@ pub mod pallet {
// key is publishing `Batch`s. This should only happen once the current key has verified all
// `Batch`s published by the prior key, meaning they are accepting the hand-over.
if prior.is_some() && (!valid_by_prior) {
ValidatorSets::<T>::retire_session(network, Session(current_session.0 - 1));
ValidatorSets::<T>::retire_set(ValidatorSet {
network,
session: Session(current_session.0 - 1),
});
}
// check that this validator set isn't publishing a batch more than once per block

View file

@@ -13,7 +13,10 @@ pub mod pallet {
use serai_primitives::{NetworkId, Amount, PublicKey};
use validator_sets_pallet::{primitives::Session, Config as VsConfig, Pallet as VsPallet};
use validator_sets_pallet::{
primitives::{Session, ValidatorSet},
Config as VsConfig, Pallet as VsPallet,
};
use pallet_session::{Config as SessionConfig, SessionManager};
#[pallet::error]
@@ -183,7 +186,12 @@ pub mod pallet {
Some(VsPallet::<T>::select_validators(NetworkId::Serai))
}
fn end_session(_end_index: u32) {}
fn end_session(end_index: u32) {
VsPallet::<T>::retire_set(ValidatorSet {
network: NetworkId::Serai,
session: Session(end_index),
})
}
fn start_session(_start_index: u32) {}
}

View file

@@ -235,6 +235,7 @@ pub mod pallet {
pub enum Event<T: Config> {
NewSet { set: ValidatorSet },
KeyGen { set: ValidatorSet, key_pair: KeyPair },
SetRetired { set: ValidatorSet },
}
impl<T: Config> Pallet<T> {
@@ -592,10 +593,10 @@ impl<T: Config> Pallet<T> {
Self::participants(network).into()
}
pub fn retire_session(network: NetworkId, session: Session) {
let set = ValidatorSet { network, session };
pub fn retire_set(set: ValidatorSet) {
MuSigKeys::<T>::remove(set);
Keys::<T>::remove(set);
Pallet::<T>::deposit_event(Event::SetRetired { set });
}
/// Take the amount deallocatable.