diff --git a/coordinator/src/db.rs b/coordinator/src/db.rs
index 8cc0f162..90df8192 100644
--- a/coordinator/src/db.rs
+++ b/coordinator/src/db.rs
@@ -37,6 +37,9 @@ impl<D: Db> MainDb<D> {
   fn acive_tributaries_key() -> Vec<u8> {
     Self::main_key(b"active_tributaries", [])
   }
+  fn retired_tributary_key(set: ValidatorSet) -> Vec<u8> {
+    Self::main_key(b"retired_tributary", set.encode())
+  }
   pub fn active_tributaries<G: Get>(getter: &G) -> (Vec<u8>, Vec<TributarySpec>) {
     let bytes = getter.get(Self::acive_tributaries_key()).unwrap_or(vec![]);
     let mut bytes_ref: &[u8] = bytes.as_ref();
@@ -60,6 +63,25 @@ impl<D: Db> MainDb<D> {
     spec.write(&mut existing_bytes).unwrap();
     txn.put(key, existing_bytes);
   }
+  pub fn retire_tributary(txn: &mut D::Transaction<'_>, set: ValidatorSet) {
+    let mut active = Self::active_tributaries(txn).1;
+    for i in 0 .. active.len() {
+      if active[i].set() == set {
+        active.remove(i);
+        break;
+      }
+    }
+
+    let mut bytes = vec![];
+    for active in active {
+      active.write(&mut bytes).unwrap();
+    }
+    txn.put(Self::acive_tributaries_key(), bytes);
+    txn.put(Self::retired_tributary_key(set), []);
+  }
+  pub fn is_tributary_retired<G: Get>(getter: &G, set: ValidatorSet) -> bool {
+    getter.get(Self::retired_tributary_key(set)).is_some()
+  }

   fn signed_transaction_key(nonce: u32) -> Vec<u8> {
     Self::main_key(b"signed_transaction", nonce.to_le_bytes())
diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index 65c6203e..2e20a78d 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -50,7 +50,7 @@ pub mod processors;
 use processors::Processors;

 mod substrate;
-use substrate::{SubstrateDb, is_active_set};
+use substrate::SubstrateDb;

 #[cfg(test)]
 pub mod tests;
@@ -67,18 +67,18 @@ pub enum TributaryEvent {
   TributaryRetired(ValidatorSet),
 }

+// TODO: Clean up the actual underlying Tributary/Tendermint tasks
+
 // Creates a new tributary and sends it to all listeners.
-// TODO: retire_tributary
 async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
   db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-  serai: &Serai,
   processors: &Pro,
   p2p: P,
   tributaries: &broadcast::Sender<TributaryEvent<D, P>>,
   spec: TributarySpec,
 ) {
-  if !is_active_set(serai, spec.set()).await {
+  if MainDb::<D>::is_tributary_retired(&db, spec.set()) {
     log::info!("not adding tributary {:?} since it's been retired", spec.set());
   }
@@ -363,12 +363,10 @@ async fn handle_processor_message<D: Db, Pro: Processors, P: P2p>(

   // If we have a relevant Tributary, check it's actually still relevant and has yet to be retired
   if let Some(relevant_tributary_value) = relevant_tributary {
-    if !is_active_set(
-      serai,
+    if MainDb::<D>::is_tributary_retired(
+      &txn,
       ValidatorSet { network: msg.network, session: relevant_tributary_value },
-    )
-    .await
-    {
+    ) {
       relevant_tributary = None;
     }
   }
@@ -382,8 +380,8 @@ async fn handle_processor_message<D: Db, Pro: Processors, P: P2p>(
   let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else {
     // Since we don't, sleep for a fraction of a second and return false, signaling we didn't
     // handle this message
-    // At the start of the loop which calls this function, we'll check for new tributaries, making
-    // this eventually resolve
+    // At the start of the loop which calls this function, we'll check for new tributaries,
+    // making this eventually resolve
     sleep(Duration::from_millis(100)).await;
     return false;
   };
@@ -659,25 +657,22 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
   let mut tributaries = HashMap::new();
   loop {
     match tributary_event.try_recv() {
-      Ok(event) => {
-        match event {
-          TributaryEvent::NewTributary(tributary) => {
-            let set = tributary.spec.set();
-            assert_eq!(set.network, network);
-            tributaries.insert(set.session, tributary);
-          }
-          // TOOD
-          TributaryEvent::TributaryRetired(_) => todo!(),
+      Ok(event) => match event {
+        TributaryEvent::NewTributary(tributary) => {
+          let set = tributary.spec.set();
+          assert_eq!(set.network, network);
+          tributaries.insert(set.session, tributary);
         }
-      }
+        TributaryEvent::TributaryRetired(set) => {
+          tributaries.remove(&set.session);
+        }
+      },
       Err(mpsc::error::TryRecvError::Empty) => {}
       Err(mpsc::error::TryRecvError::Disconnected) => {
         panic!("handle_processor_messages tributary_event sender closed")
       }
     }

-    // TODO: Remove the Tributary if it's retired
-
     // TODO: Check this ID is sane (last handled ID or expected next ID)
     let msg = processors.recv(network).await;
     if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await {
@@ -716,8 +711,9 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
       TributaryEvent::NewTributary(tributary) => channels[&tributary.spec.set().network]
         .send(TributaryEvent::NewTributary(tributary))
         .unwrap(),
-      // TODO
-      TributaryEvent::TributaryRetired(_) => todo!(),
+      TributaryEvent::TributaryRetired(set) => {
+        channels[&set.network].send(TributaryEvent::TributaryRetired(set)).unwrap()
+      }
     };
   }
 }
@@ -737,6 +733,8 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
     new_tributary_spec_send.send(spec).unwrap();
   }

+  let (tributary_retired_send, mut tributary_retired_recv) = mpsc::unbounded_channel();
+
   // Handle new Substrate blocks
   tokio::spawn(crate::substrate::scan_task(
     raw_db.clone(),
@@ -744,6 +742,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
     processors.clone(),
     serai.clone(),
     new_tributary_spec_send,
+    tributary_retired_send,
   ));

   // Handle the Tributaries
@@ -756,11 +755,21 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
   let tributary_event_listener_4 = tributary_event.subscribe();
   let tributary_event_listener_5 = tributary_event.subscribe();

+  // Emit TributaryEvent::TributaryRetired
+  tokio::spawn({
+    let tributary_event = tributary_event.clone();
+    async move {
+      loop {
+        let retired = tributary_retired_recv.recv().await.unwrap();
+        tributary_event.send(TributaryEvent::TributaryRetired(retired)).map_err(|_| ()).unwrap();
+      }
+    }
+  });
+
   // Spawn a task to further add Tributaries as needed
   tokio::spawn({
     let raw_db = raw_db.clone();
     let key = key.clone();
-    let serai = serai.clone();
     let processors = processors.clone();
     let p2p = p2p.clone();
     async move {
@@ -770,12 +779,11 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       tokio::spawn({
         let raw_db = raw_db.clone();
         let key = key.clone();
-        let serai = serai.clone();
         let processors = processors.clone();
         let p2p = p2p.clone();
         let tributary_event = tributary_event.clone();
         async move {
-          add_tributary(raw_db, key, &serai, &processors, p2p, &tributary_event, spec).await;
+          add_tributary(raw_db, key, &processors, p2p, &tributary_event, spec).await;
         }
       });
     }
@@ -790,14 +798,19 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
     let tributaries = Arc::new(RwLock::new(HashMap::new()));
     tokio::spawn({
       let tributaries = tributaries.clone();
+      let mut set_to_genesis = HashMap::new();
       async move {
         loop {
           match tributary_event_listener_1.recv().await {
             Ok(TributaryEvent::NewTributary(tributary)) => {
+              set_to_genesis.insert(tributary.spec.set(), tributary.spec.genesis());
               tributaries.write().await.insert(tributary.spec.genesis(), tributary.tributary);
             }
-            // TODO
-            Ok(TributaryEvent::TributaryRetired(_)) => todo!(),
+            Ok(TributaryEvent::TributaryRetired(set)) => {
+              if let Some(genesis) = set_to_genesis.remove(&set) {
+                tributaries.write().await.remove(&genesis);
+              }
+            }
             Err(broadcast::error::RecvError::Lagged(_)) => {
               panic!("recognized_id lagged to handle tributary_event")
             }
@@ -807,7 +820,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       }
     });

-    move |network, genesis, id_type, id, nonce| {
+    move |set: ValidatorSet, genesis, id_type, id, nonce| {
       let mut raw_db = raw_db.clone();
       let key = key.clone();
       let tributaries = tributaries.clone();
@@ -815,10 +828,11 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
         // The transactions for these are fired before the preprocesses are actually
         // received/saved, creating a race between Tributary ack and the availability of all
         // Preprocesses
-        // This waits until the necessary preprocess is available
+        // This waits until the necessary preprocess is available 0,
+        // TODO: Incorporate RecognizedIdType here?
         let get_preprocess = |raw_db, id| async move {
           loop {
-            let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, network, id) else {
+            let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id) else {
               sleep(Duration::from_millis(100)).await;
               continue;
             };
@@ -853,11 +867,19 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(

           let tributaries = tributaries.read().await;
           let Some(tributary) = tributaries.get(&genesis) else {
+            // If we don't have this Tributary because it's retired, break and move on
+            if MainDb::<D>::is_tributary_retired(&raw_db, set) {
+              break;
+            }
+
             // This may happen if the task above is simply slow
             log::warn!("tributary we don't have yet came to consensus on an Batch");
             continue;
           };
-          // This is safe to perform multiple times and solely needs atomicity within itself
+          // This is safe to perform multiple times and solely needs atomicity with regards to
+          // itself
+          // TODO: Should this not take a TXN accordingly? It's best practice to take a txn, yet
+          // taking a txn fails to declare its achieved independence
           let mut txn = raw_db.txn();
           publish_signed_transaction(&mut txn, tributary, tx).await;
           txn.commit();
@@ -885,11 +907,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
   tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), tributary_event_listener_3));

   // Handle P2P messages
-  tokio::spawn(p2p::handle_p2p_task(
-    Ristretto::generator() * key.deref(),
-    p2p,
-    tributary_event_listener_4,
-  ));
+  tokio::spawn(p2p::handle_p2p_task(p2p, tributary_event_listener_4));

   // Handle all messages from processors
   handle_processors(raw_db, key, serai, processors, tributary_event_listener_5).await;
diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs
index cfd73010..52c1c5cc 100644
--- a/coordinator/src/p2p.rs
+++ b/coordinator/src/p2p.rs
@@ -8,8 +8,6 @@ use std::{

 use async_trait::async_trait;

-use ciphersuite::{Ciphersuite, Ristretto};
-
 use serai_db::Db;

 use tokio::{
@@ -434,19 +432,20 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
 }

 pub async fn handle_p2p_task<D: Db, P: P2p>(
-  our_key: <Ristretto as Ciphersuite>::G,
   p2p: P,
   mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
 ) {
-  let channels = Arc::new(RwLock::new(HashMap::new()));
+  let channels = Arc::new(RwLock::new(HashMap::<_, mpsc::UnboundedSender<Message<P>>>::new()));
   tokio::spawn({
     let p2p = p2p.clone();
     let channels = channels.clone();
+    let mut set_to_genesis = HashMap::new();
     async move {
       loop {
         match tributary_event.recv().await.unwrap() {
           TributaryEvent::NewTributary(tributary) => {
             let genesis = tributary.spec.genesis();
+            set_to_genesis.insert(tributary.spec.set(), genesis);

             let (send, mut recv) = mpsc::unbounded_channel();
             channels.write().await.insert(genesis, send);
@@ -455,7 +454,10 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
               let p2p = p2p.clone();
               async move {
                 loop {
-                  let mut msg: Message<P> = recv.recv().await.unwrap();
+                  let Some(mut msg) = recv.recv().await else {
+                    // Channel closure happens when the tributary retires
+                    break;
+                  };

                   match msg.kind {
                     P2pMessageKind::KeepAlive => {}
@@ -564,8 +566,11 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
               }
             });
           }
-          // TODO
-          TributaryEvent::TributaryRetired(_) => todo!(),
+          TributaryEvent::TributaryRetired(set) => {
+            if let Some(genesis) = set_to_genesis.remove(&set) {
+              channels.write().await.remove(&genesis);
+            }
+          }
         }
       }
     }
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
index 49b1d1a5..ce4e7c4a 100644
--- a/coordinator/src/substrate/mod.rs
+++ b/coordinator/src/substrate/mod.rs
@@ -260,6 +260,7 @@ async fn handle_block<D: Db, Pro: Processors>(
   db: &mut SubstrateDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
+  tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
   processors: &Pro,
   serai: &Serai,
   block: Block,
@@ -317,6 +318,26 @@ async fn handle_block<D: Db, Pro: Processors>(
     event_id += 1;
   }

+  for retired_set in serai.as_of(hash).validator_sets().set_retired_events().await? {
+    let ValidatorSetsEvent::SetRetired { set } = retired_set else {
+      panic!("SetRetired event wasn't SetRetired: {retired_set:?}");
+    };
+
+    if set.network == NetworkId::Serai {
+      continue;
+    }
+
+    if !SubstrateDb::<D>::handled_event(&db.0, hash, event_id) {
+      log::info!("found fresh set retired event {:?}", retired_set);
+      let mut txn = db.0.txn();
+      crate::MainDb::<D>::retire_tributary(&mut txn, set);
+      tributary_retired.send(set).unwrap();
+      SubstrateDb::<D>::handle_event(&mut txn, hash, event_id);
+      txn.commit();
+    }
+    event_id += 1;
+  }
+
   // Finally, tell the processor of acknowledged blocks/burns
   // This uses a single event as, unlike prior events which individually executed code, all
   // following events share data collection
@@ -336,6 +357,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
   db: &mut SubstrateDb<D>,
   key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
   new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
+  tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
   processors: &Pro,
   serai: &Serai,
   next_block: &mut u64,
@@ -354,6 +376,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
       db,
       key,
       new_tributary_spec,
+      tributary_retired,
       processors,
       serai,
       if b == latest_number {
@@ -380,6 +403,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
   processors: Pro,
   serai: Arc<Serai>,
   new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
+  tributary_retired: mpsc::UnboundedSender<ValidatorSet>,
 ) {
   log::info!("scanning substrate");

@@ -431,6 +455,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
         &mut db,
         &key,
         &new_tributary_spec,
+        &tributary_retired,
         &processors,
         &serai,
         &mut next_substrate_block,
@@ -446,58 +471,6 @@ pub async fn scan_task<D: Db, Pro: Processors>(
   }
 }

-/// Returns if a ValidatorSet has yet to be retired.
-pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
-  // TODO: Track this from the Substrate scanner to reduce our overhead? We'd only have a DB
-  // call, instead of a series of network requests
-  let serai = loop {
-    let Ok(serai) = serai.with_current_latest_block().await else {
-      log::error!("couldn't get the latest block hash from serai when checking if set is active");
-      sleep(Duration::from_secs(5)).await;
-      continue;
-    };
-    break serai.validator_sets();
-  };
-
-  let latest_session = loop {
-    let Ok(res) = serai.session(set.network).await else {
-      log::error!("couldn't get the latest session from serai when checking if set is active");
-      sleep(Duration::from_secs(5)).await;
-      continue;
-    };
-    // If the on-chain Session is None, then this Session is greater and therefore, for the
-    // purposes here, active
-    let Some(res) = res else { return true };
-    break res;
-  };
-
-  if latest_session.0 > set.session.0 {
-    // If we're on the Session after the Session after this Session, then this Session is
-    // definitively completed
-    if latest_session.0 > (set.session.0 + 1) {
-      return false;
-    } else {
-      // Since the next session has started, check its handover status
-      let keys = loop {
-        let Ok(res) = serai.keys(set).await else {
-          log::error!(
-            "couldn't get the keys for a session from serai when checking if set is active"
-          );
-          sleep(Duration::from_secs(5)).await;
-          continue;
-        };
-        break res;
-      };
-      // If the keys have been deleted, then this Tributary is retired
-      if keys.is_none() {
-        return false;
-      }
-    }
-  }
-
-  true
-}
-
 /// Gets the expected ID for the next Batch.
 pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -> u32 {
   let mut first = true;
diff --git a/coordinator/src/tributary/handle.rs b/coordinator/src/tributary/handle.rs
index 7c92611d..030e47d6 100644
--- a/coordinator/src/tributary/handle.rs
+++ b/coordinator/src/tributary/handle.rs
@@ -426,7 +426,7 @@ pub(crate) async fn handle_application_tx<
       // Because this Batch has achieved synchrony, its batch ID should be authorized
       TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Batch(batch));
       let nonce = NonceDecider::<D>::handle_batch(txn, genesis, batch);
-      recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch, nonce).await;
+      recognized_id(spec.set(), genesis, RecognizedIdType::Batch, batch, nonce).await;
     }

     Transaction::SubstrateBlock(block) => {
@@ -438,7 +438,7 @@ pub(crate) async fn handle_application_tx<
       let nonces = NonceDecider::<D>::handle_substrate_block(txn, genesis, &plan_ids);
       for (nonce, id) in nonces.into_iter().zip(plan_ids.into_iter()) {
         TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Sign(id));
-        recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id, nonce).await;
+        recognized_id(spec.set(), genesis, RecognizedIdType::Plan, id, nonce).await;
       }
     }

diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index a12a8dd5..b5b26314 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -7,9 +7,7 @@ use ciphersuite::{Ciphersuite, Ristretto};

 use tokio::sync::broadcast;

-use serai_client::{
-  primitives::NetworkId, validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai,
-};
+use serai_client::{validator_sets::primitives::ValidatorSet, subxt::utils::Encoded, Serai};

 use tributary::{
   TransactionKind, Transaction as TributaryTransaction, Block, TributaryReader,
@@ -36,10 +34,10 @@ pub enum RecognizedIdType {
 }

 pub(crate) trait RIDTrait<FRid>:
-  Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid
+  Clone + Fn(ValidatorSet, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid
 {
 }
-impl<FRid, F: Clone + Fn(NetworkId, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid>
+impl<FRid, F: Clone + Fn(ValidatorSet, [u8; 32], RecognizedIdType, [u8; 32], u32) -> FRid>
   RIDTrait<FRid> for F
 {
 }
diff --git a/substrate/client/src/serai/validator_sets.rs b/substrate/client/src/serai/validator_sets.rs
index 8555c090..8294c6c6 100644
--- a/substrate/client/src/serai/validator_sets.rs
+++ b/substrate/client/src/serai/validator_sets.rs
@@ -33,6 +33,13 @@ impl<'a> SeraiValidatorSets<'a> {
       .await
   }

+  pub async fn set_retired_events(&self) -> Result<Vec<ValidatorSetsEvent>, SeraiError> {
+    self
+      .0
+      .events::<ValidatorSets, _>(|event| matches!(event, ValidatorSetsEvent::SetRetired { .. }))
+      .await
+  }
+
   pub async fn session(&self, network: NetworkId) -> Result<Option<Session>, SeraiError> {
     self.0.storage(PALLET, "CurrentSession", Some(vec![scale_value(network)])).await
   }
diff --git a/substrate/in-instructions/pallet/src/lib.rs b/substrate/in-instructions/pallet/src/lib.rs
index 4bb943ac..d7c67e45 100644
--- a/substrate/in-instructions/pallet/src/lib.rs
+++ b/substrate/in-instructions/pallet/src/lib.rs
@@ -174,7 +174,10 @@ pub mod pallet {
     // key is publishing `Batch`s. This should only happen once the current key has verified all
     // `Batch`s published by the prior key, meaning they are accepting the hand-over.
     if prior.is_some() && (!valid_by_prior) {
-      ValidatorSets::<T>::retire_session(network, Session(current_session.0 - 1));
+      ValidatorSets::<T>::retire_set(ValidatorSet {
+        network,
+        session: Session(current_session.0 - 1),
+      });
     }

     // check that this validator set isn't publishing a batch more than once per block
diff --git a/substrate/staking/pallet/src/lib.rs b/substrate/staking/pallet/src/lib.rs
index 1ec7d135..7c4fa730 100644
--- a/substrate/staking/pallet/src/lib.rs
+++ b/substrate/staking/pallet/src/lib.rs
@@ -13,7 +13,10 @@ pub mod pallet {

   use serai_primitives::{NetworkId, Amount, PublicKey};

-  use validator_sets_pallet::{primitives::Session, Config as VsConfig, Pallet as VsPallet};
+  use validator_sets_pallet::{
+    primitives::{Session, ValidatorSet},
+    Config as VsConfig, Pallet as VsPallet,
+  };
   use pallet_session::{Config as SessionConfig, SessionManager};

   #[pallet::error]
@@ -183,7 +186,12 @@ pub mod pallet {
       Some(VsPallet::<T>::select_validators(NetworkId::Serai))
     }

-    fn end_session(_end_index: u32) {}
+    fn end_session(end_index: u32) {
+      VsPallet::<T>::retire_set(ValidatorSet {
+        network: NetworkId::Serai,
+        session: Session(end_index),
+      })
+    }

     fn start_session(_start_index: u32) {}
   }
diff --git a/substrate/validator-sets/pallet/src/lib.rs b/substrate/validator-sets/pallet/src/lib.rs
index 81611390..139850aa 100644
--- a/substrate/validator-sets/pallet/src/lib.rs
+++ b/substrate/validator-sets/pallet/src/lib.rs
@@ -235,6 +235,7 @@ pub mod pallet {
   pub enum Event<T: Config> {
     NewSet { set: ValidatorSet },
     KeyGen { set: ValidatorSet, key_pair: KeyPair },
+    SetRetired { set: ValidatorSet },
   }

   impl<T: Config> Pallet<T> {
@@ -592,10 +593,10 @@ pub mod pallet {
       Self::participants(network).into()
     }

-    pub fn retire_session(network: NetworkId, session: Session) {
-      let set = ValidatorSet { network, session };
+    pub fn retire_set(set: ValidatorSet) {
       MuSigKeys::<T>::remove(set);
       Keys::<T>::remove(set);
+      Pallet::<T>::deposit_event(Event::SetRetired { set });
     }

     /// Take the amount deallocatable.
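
Not part of the patch — an editor's sketch for orientation. The retirement flow above is: the validator-sets pallet emits SetRetired, the Substrate scan task calls MainDb::retire_tributary and sends the set over an mpsc channel, run() rebroadcasts it as TributaryEvent::TributaryRetired, and every listener drops its per-set state. The toy program below mirrors only that listener pattern; ValidatorSet, Session, and TributaryEvent here are simplified stand-ins (not the real serai_client or coordinator types), and it assumes tokio with the macros and rt-multi-thread features enabled.

use std::collections::HashMap;
use tokio::sync::broadcast;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Session(u32);

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ValidatorSet {
  network: u8,
  session: Session,
}

// Stand-in for the coordinator's TributaryEvent; the real NewTributary variant
// carries an ActiveTributary, not just the set.
#[derive(Clone, Debug)]
enum TributaryEvent {
  NewTributary(ValidatorSet),
  TributaryRetired(ValidatorSet),
}

#[tokio::main]
async fn main() {
  let (send, mut recv) = broadcast::channel(16);

  // A listener keeps per-set state and drops it on retirement, mirroring the
  // set_to_genesis bookkeeping the patch adds in main.rs and p2p.rs.
  let listener = tokio::spawn(async move {
    let mut active: HashMap<ValidatorSet, ()> = HashMap::new();
    while let Ok(event) = recv.recv().await {
      match event {
        TributaryEvent::NewTributary(set) => {
          active.insert(set, ());
        }
        TributaryEvent::TributaryRetired(set) => {
          active.remove(&set);
          // Once nothing remains active, this toy listener exits.
          if active.is_empty() {
            break;
          }
        }
      }
    }
  });

  let set = ValidatorSet { network: 1, session: Session(0) };
  send.send(TributaryEvent::NewTributary(set)).unwrap();
  send.send(TributaryEvent::TributaryRetired(set)).unwrap();
  listener.await.unwrap();
}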