diff --git a/substrate/tendermint/client/src/authority/gossip.rs b/substrate/tendermint/client/src/authority/gossip.rs index b8af1a08..e7317031 100644 --- a/substrate/tendermint/client/src/authority/gossip.rs +++ b/substrate/tendermint/client/src/authority/gossip.rs @@ -49,6 +49,8 @@ impl<T: TendermintValidator> Validator<T::Block> for TendermintGossip<T> { return ValidationResult::Discard; } + // Verify the signature here so we don't carry invalid messages in our gossip layer + // This will cause double verification of the signature, yet that's a minimal cost if !msg.verify_signature(&self.signature_scheme) { return ValidationResult::Discard; } diff --git a/substrate/tendermint/client/src/authority/mod.rs b/substrate/tendermint/client/src/authority/mod.rs index 7d910158..314d4369 100644 --- a/substrate/tendermint/client/src/authority/mod.rs +++ b/substrate/tendermint/client/src/authority/mod.rs @@ -5,9 +5,12 @@ use std::{ use async_trait::async_trait; -use log::warn; +use log::{warn, error}; -use tokio::task::yield_now; +use futures::{ + StreamExt, + channel::mpsc::{self, UnboundedSender}, +}; use sp_core::{Encode, Decode}; use sp_keystore::CryptoStore; @@ -52,13 +55,11 @@ use import_future::ImportFuture; struct ActiveAuthority<T: TendermintValidator> { signer: TendermintSigner<T>, - // Block whose gossip is being tracked - number: Arc<RwLock<u64>>, + // Notification channel for when we start a new number + new_number: UnboundedSender<u64>, // Outgoing message queue, placed here as the GossipEngine itself can't be - gossip_queue: Arc< - RwLock< - Vec<SignedMessage<u16, T::Block, <TendermintValidators<T> as SignatureScheme>::Signature>>, - >, + gossip: UnboundedSender< + SignedMessage<u16, T::Block, <TendermintValidators<T> as SignatureScheme>::Signature>, >, // Block producer @@ -148,21 +149,26 @@ impl<T: TendermintValidator> TendermintAuthority<T> { registry: Option<&Registry>, ) { let (best_hash, last) = self.get_last(); - let mut last_number = last.0 .0 + 1; + 
let new_number = last.0 .0 + 1; // Shared references between us and the Tendermint machine (and its actions via its Network // trait) - let number = Arc::new(RwLock::new(last_number)); - let gossip_queue = Arc::new(RwLock::new(vec![])); + let number = Arc::new(RwLock::new(new_number)); // Create the gossip network let mut gossip = GossipEngine::new( network.clone(), PROTOCOL_NAME, + protocol, Arc::new(TendermintGossip::new(number.clone(), self.import.validators.clone())), registry, ); + // This should only have a single value, yet a bounded channel with a capacity of 1 would cause + // a firm bound. It's not worth having a backlog crash the node since we aren't constrained + let (new_number_tx, mut new_number_rx) = mpsc::unbounded(); + let (gossip_tx, mut gossip_rx) = mpsc::unbounded(); + // Create the Tendermint machine let handle = { // Set this struct as active @@ -170,8 +176,8 @@ impl<T: TendermintValidator> TendermintAuthority<T> { self.active = Some(ActiveAuthority { signer: TendermintSigner(keys, self.import.validators.clone()), - number: number.clone(), - gossip_queue: gossip_queue.clone(), + new_number: new_number_tx, + gossip: gossip_tx, env, announce: network, @@ -186,55 +192,51 @@ impl<T: TendermintValidator> TendermintAuthority<T> { }; // Start receiving messages about the Tendermint process for this block - let mut recv = gossip.messages_for(TendermintGossip::<T>::topic(last_number)); + let mut recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number)); - 'outer: loop { - // Send out any queued messages - let mut queue = gossip_queue.write().unwrap().drain(..).collect::<Vec<_>>(); - for msg in queue.drain(..) { - gossip.gossip_message(TendermintGossip::<T>::topic(msg.number().0), msg.encode(), false); - } + loop { + futures::select_biased! 
{ + // GossipEngine closed down + _ = gossip => break, - // Handle any received messages - // This inner loop enables handling all pending messages before acquiring the out-queue lock - // again - // TODO: Move to a select model. The disadvantage of this is we'll more frequently acquire - // the above lock, despite lack of reason to do so - let _ = futures::poll!(&mut gossip); - 'inner: loop { - match recv.try_next() { - Ok(Some(msg)) => handle - .messages - .send(match SignedMessage::decode(&mut msg.message.as_ref()) { - Ok(msg) => msg, - Err(e) => { - warn!(target: "tendermint", "Couldn't decode valid message: {}", e); - continue; - } - }) - .await - .unwrap(), + // Machine reached a new height + new_number = new_number_rx.next() => { + if let Some(new_number) = new_number { + *number.write().unwrap() = new_number; + recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number)); + } else { + break; + } + }, - // Ok(None) IS NOT when there aren't messages available. It's when the channel is closed - // If we're no longer receiving messages from the network, it must no longer be running - // We should no longer be accordingly - Ok(None) => break 'outer, + // Message to broadcast + msg = gossip_rx.next() => { + if let Some(msg) = msg { + let topic = TendermintGossip::<T>::topic(msg.number().0); + gossip.gossip_message(topic, msg.encode(), false); + } else { + break; + } + }, - // No messages available - Err(_) => { - // Check if we the block updated and should be listening on a different topic - let curr = *number.read().unwrap(); - if last_number != curr { - last_number = curr; - // TODO: Will this return existing messages on the new height? Or will those have - // been ignored and are now gone? 
- recv = gossip.messages_for(TendermintGossip::<T>::topic(last_number)); - } - - // If there are no messages available, yield to not hog the thread, then return to the - // outer loop - yield_now().await; - break 'inner; + // Received a message + msg = recv.next() => { + if let Some(msg) = msg { + handle + .messages + .send(match SignedMessage::decode(&mut msg.message.as_ref()) { + Ok(msg) => msg, + Err(e) => { + // This is guaranteed to be valid thanks to the gossip validator, assuming + // that pipeline is correct. That's why this doesn't panic + error!(target: "tendermint", "Couldn't decode valid message: {}", e); + continue; + } + }) + .await + .unwrap() + } else { + break; } } } @@ -267,7 +269,13 @@ impl<T: TendermintValidator> Network for TendermintAuthority<T> { &mut self, msg: SignedMessage<u16, Self::Block, <TendermintValidators<T> as SignatureScheme>::Signature>, ) { - self.active.as_mut().unwrap().gossip_queue.write().unwrap().push(msg); + if self.active.as_mut().unwrap().gossip.unbounded_send(msg).is_err() { + warn!( + target: "tendermint", + "Attempted to broadcast a message except the gossip channel is closed. {}", + "Is the node shutting down?" + ); + } } async fn slash(&mut self, _validator: u16) { @@ -344,7 +352,18 @@ impl<T: TendermintValidator> Network for TendermintAuthority<T> { .finalize_block(BlockId::Hash(hash), Some(justification), true) .map_err(|_| Error::InvalidJustification) .unwrap(); - *self.active.as_mut().unwrap().number.write().unwrap() += 1; + + let number: u64 = match (*block.header().number()).try_into() { + Ok(number) => number, + Err(_) => panic!("BlockNumber exceeded u64"), + }; + if self.active.as_mut().unwrap().new_number.unbounded_send(number + 1).is_err() { + warn!( + target: "tendermint", + "Attempted to send a new number to the gossip handler except it's closed. {}", + "Is the node shutting down?" + ); + } self.active.as_ref().unwrap().announce.announce_block(hash, None); self.get_proposal(block.header()).await