Manually step the Tendermint machine when we synced a block over the network

This commit is contained in:
Luke Parker 2022-11-13 20:26:01 -05:00
parent 138866f64d
commit 8c51bc011d
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
2 changed files with 71 additions and 32 deletions

View file

@ -10,6 +10,7 @@ use log::{warn, error};
use futures::{
SinkExt, StreamExt,
lock::Mutex,
channel::mpsc::{self, UnboundedSender},
};
@ -26,7 +27,7 @@ use sp_consensus::{Error, BlockOrigin, Proposer, Environment};
use sc_consensus::import_queue::IncomingBlock;
use sc_service::ImportQueue;
use sc_client_api::{BlockBackend, Finalizer};
use sc_client_api::{BlockBackend, Finalizer, BlockchainEvents};
use sc_network::{ProtocolName, NetworkBlock};
use sc_network_gossip::GossipEngine;
@ -64,7 +65,7 @@ struct ActiveAuthority<T: TendermintValidator> {
>,
// Block producer
env: T::Environment,
env: Arc<Mutex<T::Environment>>,
announce: T::Network,
}
@ -74,6 +75,35 @@ pub struct TendermintAuthority<T: TendermintValidator> {
active: Option<ActiveAuthority<T>>,
}
async fn get_proposal<T: TendermintValidator>(
env: &Arc<Mutex<T::Environment>>,
import: &TendermintImport<T>,
header: &<T::Block as Block>::Header,
stub: bool
) -> T::Block {
let proposer =
env.lock().await.init(header).await.expect("Failed to create a proposer for the new block");
proposer
.propose(
import.inherent_data(*header.parent_hash()).await,
Digest::default(),
if stub {
Duration::ZERO
} else {
// The first processing time is to build the block.
// The second is for it to be downloaded (assumes a block won't take longer to download
// than it'll take to process)
// The third is for it to actually be processed
Duration::from_secs((T::BLOCK_PROCESSING_TIME_IN_SECONDS / 3).into())
},
Some(T::PROPOSED_BLOCK_SIZE_LIMIT),
)
.await
.expect("Failed to crate a new block proposal")
.block
}
impl<T: TendermintValidator> TendermintAuthority<T> {
/// Create a new TendermintAuthority.
pub fn new(import: TendermintImport<T>) -> Self {
@ -114,32 +144,8 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
)
}
pub(crate) async fn get_proposal(&mut self, header: &<T::Block as Block>::Header) -> T::Block {
let parent = *header.parent_hash();
let proposer = self
.active
.as_mut()
.unwrap()
.env
.init(header)
.await
.expect("Failed to create a proposer for the new block");
proposer
.propose(
self.import.inherent_data(parent).await,
Digest::default(),
// The first processing time is to build the block.
// The second is for it to be downloaded (assumes a block won't take longer to download
// than it'll take to process)
// The third is for it to actually be processed
Duration::from_secs((T::BLOCK_PROCESSING_TIME_IN_SECONDS / 3).into()),
Some(T::PROPOSED_BLOCK_SIZE_LIMIT),
)
.await
.expect("Failed to crate a new block proposal")
.block
async fn get_proposal(&mut self, header: &<T::Block as Block>::Header) -> T::Block {
get_proposal(&self.active.as_mut().unwrap().env, &self.import, header, false).await
}
/// Act as a network authority, proposing and voting on blocks. This should be spawned on a task
@ -175,6 +181,12 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
let (new_number_tx, mut new_number_rx) = mpsc::unbounded();
let (gossip_tx, mut gossip_rx) = mpsc::unbounded();
// Clone the import object
let import = self.import.clone();
// Move the env into an Arc
let env = Arc::new(Mutex::new(env));
// Create the Tendermint machine
let TendermintHandle { mut step, mut messages, machine } = {
// Set this struct as active
@ -185,7 +197,7 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
new_number: new_number_tx,
gossip: gossip_tx,
env,
env: env.clone(),
announce: network,
});
@ -201,16 +213,41 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
// Start receiving messages about the Tendermint process for this block
let mut recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number));
// Get finality events from Substrate
let mut finality = import.client.finality_notification_stream();
loop {
futures::select_biased! {
// GossipEngine closed down
_ = gossip => break,
// Synced a block from the network
notif = finality.next() => {
if let Some(notif) = notif {
let justifications = import.client.justifications(notif.hash).unwrap().unwrap();
step.send((
Commit::decode(&mut justifications.get(CONSENSUS_ID).unwrap().as_ref()).unwrap(),
// This will fail if syncing occurs radically faster than machine stepping takes
// TODO: Set true when initial syncing
get_proposal(&env, &import, &notif.header, false).await
)).await.unwrap();
let new_number = match (*notif.header.number()).try_into() {
Ok(number) => number,
Err(_) => panic!("BlockNumber exceeded u64"),
};
*number.write().unwrap() = new_number;
recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number))
} else {
break;
}
},
// Machine reached a new height
new_number = new_number_rx.next() => {
if let Some(new_number) = new_number {
*number.write().unwrap() = new_number;
recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number));
recv = gossip.messages_for(TendermintGossip::<T>::topic(new_number))
} else {
break;
}
@ -239,7 +276,7 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
continue;
}
}
).await.unwrap()
).await.unwrap();
} else {
break;
}

View file

@ -7,7 +7,7 @@ use sp_blockchain::HeaderBackend;
use sp_api::{StateBackend, StateBackendFor, TransactionFor, ApiExt, ProvideRuntimeApi};
use sp_consensus::{Error, Environment};
use sc_client_api::{BlockBackend, Backend, Finalizer};
use sc_client_api::{BlockBackend, Backend, Finalizer, BlockchainEvents};
use sc_block_builder::BlockBuilderApi;
use sc_consensus::{BlockImport, BasicQueue};
@ -81,6 +81,7 @@ pub trait TendermintClient: Send + Sync + 'static {
+ BlockBackend<Self::Block>
+ BlockImport<Self::Block, Transaction = Self::BackendTransaction>
+ Finalizer<Self::Block, Self::Backend>
+ BlockchainEvents<Self::Block>
+ ProvideRuntimeApi<Self::Block, Api = Self::Api>
+ 'static;
}
@ -100,6 +101,7 @@ pub trait TendermintClientMinimal: Send + Sync + 'static {
+ BlockBackend<Self::Block>
+ BlockImport<Self::Block, Transaction = TransactionFor<Self::Client, Self::Block>>
+ Finalizer<Self::Block, Self::Backend>
+ BlockchainEvents<Self::Block>
+ ProvideRuntimeApi<Self::Block, Api = Self::Api>
+ 'static;
}