diff --git a/substrate/node/src/service.rs b/substrate/node/src/service.rs index e0aa7e4b..20579990 100644 --- a/substrate/node/src/service.rs +++ b/substrate/node/src/service.rs @@ -184,7 +184,11 @@ pub async fn new_full(config: Configuration) -> Result> { +pub struct TendermintGossip> { number: Arc>, signature_scheme: Arc, } +impl> TendermintGossip { + pub(crate) fn new(number: Arc>, signature_scheme: Arc) -> TendermintGossip { + TendermintGossip { number, signature_scheme } + } + + pub(crate) fn topic(number: u64) -> B::Hash { + <::Hashing as Hash>::hash( + &[b"Tendermint Block Topic".as_ref(), &number.to_le_bytes()].concat(), + ) + } +} + impl> Validator for TendermintGossip { @@ -36,8 +48,6 @@ impl> Val return ValidationResult::Discard; } - ValidationResult::ProcessAndKeep(<::Hashing as Hash>::hash( - &[b"Tendermint Topic".as_ref(), &msg.number().0.to_le_bytes()].concat(), - )) + ValidationResult::ProcessAndKeep(Self::topic::(msg.number().0)) } } diff --git a/substrate/tendermint/client/src/lib.rs b/substrate/tendermint/client/src/lib.rs index 3343a3fd..7f2cdd70 100644 --- a/substrate/tendermint/client/src/lib.rs +++ b/substrate/tendermint/client/src/lib.rs @@ -7,6 +7,7 @@ use sp_api::{TransactionFor, ProvideRuntimeApi}; use sc_executor::{NativeVersion, NativeExecutionDispatch, NativeElseWasmExecutor}; use sc_transaction_pool::FullPool; +use sc_network::NetworkService; use sc_service::{TaskManager, TFullClient}; use substrate_prometheus_endpoint::Registry; @@ -26,7 +27,7 @@ mod verifier; mod import_queue; use import_queue::TendermintImportQueue; -mod gossip; +pub(crate) mod gossip; mod select_chain; pub use select_chain::TendermintSelectChain; @@ -85,6 +86,7 @@ impl> TendermintValidator for TendermintValidatorFirm { DisableProofRecording, >; + type Network = Arc::Hash>>; type Announce = A; } diff --git a/substrate/tendermint/client/src/tendermint.rs b/substrate/tendermint/client/src/tendermint.rs index 7b8051c4..54f9e6ca 100644 --- a/substrate/tendermint/client/src/tendermint.rs +++ b/substrate/tendermint/client/src/tendermint.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use log::warn; -use tokio::sync::RwLock as AsyncRwLock; +use tokio::{sync::RwLock as AsyncRwLock, task::yield_now}; use sp_core::{Encode, Decode, sr25519::Signature}; use sp_inherents::{InherentData, InherentDataProvider, CreateInherentDataProviders}; @@ -24,6 +24,9 @@ use sc_consensus::{ForkChoiceStrategy, BlockImportParams, import_queue::Incoming use sc_service::ImportQueue; use sc_client_api::{BlockBackend, Finalizer}; +use sc_network_gossip::GossipEngine; + +use substrate_prometheus_endpoint::Registry; use tendermint_machine::{ ext::{BlockError, BlockNumber, Commit, Network}, @@ -35,6 +38,7 @@ use crate::{ types::TendermintValidator, validators::TendermintValidators, import_queue::{ImportFuture, TendermintImportQueue}, + gossip::TendermintGossip, Announce, }; @@ -43,6 +47,7 @@ pub(crate) struct TendermintImport { validators: Arc>, + number: Arc>, importing_block: Arc::Hash>>>, pub(crate) machine: Arc>>>, @@ -57,7 +62,7 @@ pub(crate) struct TendermintImport { pub struct TendermintAuthority(pub(crate) TendermintImport); impl TendermintAuthority { - pub async fn validate(mut self) { + pub async fn validate(mut self, network: T::Network, registry: Option<&Registry>) { let info = self.0.client.info(); // Header::Number: TryInto doesn't implement Debug and can't be unwrapped @@ -84,13 +89,55 @@ impl TendermintAuthority { .get_proposal(&self.0.client.header(BlockId::Hash(info.best_hash)).unwrap().unwrap()) .await; - *self.0.machine.write().unwrap() = Some(TendermintMachine::new( + *self.0.number.write().unwrap() = last_number.0 + 1; + let mut gossip = GossipEngine::new( + network, + "tendermint", + Arc::new(TendermintGossip::new(self.0.number.clone(), self.0.validators.clone())), + registry, + ); + + let handle = TendermintMachine::new( self.0.clone(), // TODO 0, // ValidatorId (last_number, last_time), proposal, - )); + ); + + let mut last_number = last_number.0 + 1; + let mut recv = gossip + .messages_for(TendermintGossip::>::topic::(last_number)); + loop { + match recv.try_next() { + Ok(Some(msg)) => handle + .messages + .send(match SignedMessage::decode(&mut msg.message.as_ref()) { + Ok(msg) => msg, + Err(e) => { + warn!("couldn't decode valid message: {}", e); + continue; + } + }) + .await + .unwrap(), + Ok(None) => break, + // No messages available + Err(_) => { + // Check if we the block updated and should be listening on a different topic + let curr = *self.0.number.read().unwrap(); + if last_number != curr { + last_number = curr; + // TODO: Will this return existing messages on the new height? Or will those have been + // ignored and are now gone? + recv = gossip.messages_for(TendermintGossip::>::topic::< + T::Block, + >(last_number)); + } + yield_now().await; + } + } + } } } @@ -101,6 +148,7 @@ impl Clone for TendermintImport { validators: self.validators.clone(), + number: self.number.clone(), importing_block: self.importing_block.clone(), machine: self.machine.clone(), @@ -126,6 +174,7 @@ impl TendermintImport { validators: Arc::new(TendermintValidators::new(client.clone())), + number: Arc::new(RwLock::new(0)), importing_block: Arc::new(RwLock::new(None)), machine: Arc::new(RwLock::new(None)), @@ -388,6 +437,7 @@ impl Network for TendermintImport { .finalize_block(BlockId::Hash(hash), Some(justification), true) .map_err(|_| Error::InvalidJustification) .unwrap(); + *self.number.write().unwrap() += 1; self.announce.announce(hash); self.get_proposal(block.header()).await diff --git a/substrate/tendermint/client/src/types.rs b/substrate/tendermint/client/src/types.rs index a21ae977..5b2ce8ba 100644 --- a/substrate/tendermint/client/src/types.rs +++ b/substrate/tendermint/client/src/types.rs @@ -8,6 +8,7 @@ use sp_consensus::Environment; use sc_consensus::BlockImport; use sc_client_api::{BlockBackend, Backend, Finalizer}; +use sc_network_gossip::Network; use sp_tendermint::TendermintApi; @@ -71,5 +72,6 @@ pub trait TendermintValidator: TendermintClient { type CIDP: CreateInherentDataProviders + 'static; type Environment: Send + Sync + Environment + 'static; + type Network: Clone + Send + Sync + Network + 'static; type Announce: Announce; }