Provide a way to create the machine

The BasicQueue returned obscures the TendermintImport struct. 
Accordingly, a Future scoped with access is returned upwards, which when 
awaited will create the machine. This makes creating the machine 
optional while maintaining scope boundaries.

Is sufficient to create a 1-node net which produces and finalizes 
blocks.
This commit is contained in:
Luke Parker 2022-10-22 03:41:49 -04:00
parent 39984bd07b
commit 9b0dca06d0
No known key found for this signature in database
GPG key ID: F9F1386DB1E119B6
8 changed files with 104 additions and 89 deletions

1
Cargo.lock generated
View file

@ -7387,6 +7387,7 @@ name = "serai-consensus"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"log",
"sc-basic-authorship",
"sc-client-api",

View file

@ -17,6 +17,7 @@ async-trait = "0.1"
log = "0.4"
futures = "0.3"
tokio = { version = "1", features = ["sync", "rt"] }
sp-core = { git = "https://github.com/serai-dex/substrate" }

View file

@ -1,16 +1,15 @@
use std::{
pin::Pin,
sync::{Arc, RwLock},
task::{Poll, Context},
task::{Poll, /* Wake, Waker, */ Context},
future::Future,
time::SystemTime,
};
use tokio::runtime::Handle;
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::traits::{Header, Block};
use sp_blockchain::HeaderBackend;
use sp_api::{TransactionFor, ProvideRuntimeApi};
use sp_api::{BlockId, TransactionFor, ProvideRuntimeApi};
use sp_consensus::{Error, Environment};
use sc_consensus::{BlockImport, BlockImportStatus, BlockImportError, Link, BasicQueue};
@ -20,6 +19,8 @@ use sc_client_api::{Backend, Finalizer};
use substrate_prometheus_endpoint::Registry;
use tendermint_machine::{ext::BlockNumber, TendermintMachine};
use crate::tendermint::TendermintImport;
pub type TendermintImportQueue<Block, Transaction> = BasicQueue<Block, Transaction>;
@ -84,16 +85,33 @@ pub fn import_queue<
env: E,
spawner: &impl sp_core::traits::SpawnEssentialNamed,
registry: Option<&Registry>,
) -> TendermintImportQueue<B, TransactionFor<C, B>>
) -> (impl Future<Output = ()>, TendermintImportQueue<B, TransactionFor<C, B>>)
where
I::Error: Into<Error>,
TransactionFor<C, B>: Send + Sync + 'static,
{
let import = TendermintImport::new(client, inner, providers, env);
let authority = {
let machine_clone = import.machine.clone();
let mut import_clone = import.clone();
async move {
*machine_clone.write().unwrap() = Some(TendermintMachine::new(
import_clone.clone(),
// TODO
0,
(BlockNumber(1), SystemTime::now()),
import_clone
.get_proposal(&import_clone.client.header(BlockId::Number(0u8.into())).unwrap().unwrap())
.await,
));
}
};
let boxed = Box::new(import.clone());
let queue =
|| BasicQueue::new(import.clone(), boxed.clone(), Some(boxed.clone()), spawner, registry);
*Handle::current().block_on(import.queue.write()) = Some(queue());
queue()
*futures::executor::block_on(import.queue.write()) = Some(queue());
(authority, queue())
}

View file

@ -9,14 +9,12 @@ use sp_blockchain::HeaderBackend;
use sp_api::{TransactionFor, ProvideRuntimeApi};
use sp_consensus::{Error, Environment};
use sc_consensus::{BlockImport, JustificationImport, BasicQueue};
use sc_consensus::{BlockImport, JustificationImport};
use sc_client_api::{Backend, Finalizer};
use crate::tendermint::TendermintImport;
pub type TendermintImportQueue<Block, Transaction> = BasicQueue<Block, Transaction>;
#[async_trait]
impl<
B: Block,

View file

@ -1,7 +1,6 @@
use std::sync::Arc;
use std::{sync::Arc, future::Future};
use sp_api::TransactionFor;
use sp_consensus::Error;
use sc_executor::{NativeVersion, NativeExecutionDispatch, NativeElseWasmExecutor};
use sc_transaction_pool::FullPool;
@ -50,8 +49,8 @@ pub fn import_queue(
client: Arc<FullClient>,
pool: Arc<FullPool<Block, FullClient>>,
registry: Option<&Registry>,
) -> Result<TendermintImportQueue<Block, TransactionFor<FullClient, Block>>, Error> {
Ok(import_queue::import_queue(
) -> (impl Future<Output = ()>, TendermintImportQueue<Block, TransactionFor<FullClient, Block>>) {
import_queue::import_queue(
client.clone(),
client.clone(),
Arc::new(|_, _| async { Ok(sp_timestamp::InherentDataProvider::from_system_time()) }),
@ -64,18 +63,7 @@ pub fn import_queue(
),
&task_manager.spawn_essential_handle(),
registry,
))
}
// If we're an authority, produce blocks
pub fn authority(
task_manager: &TaskManager,
client: Arc<FullClient>,
network: Arc<sc_network::NetworkService<Block, <Block as sp_runtime::traits::Block>::Hash>>,
pool: Arc<FullPool<Block, FullClient>>,
registry: Option<&Registry>,
) {
todo!()
)
}
/*

View file

@ -28,7 +28,7 @@ use sc_client_api::{Backend, Finalizer};
use tendermint_machine::{
ext::{BlockError, Commit, Network},
SignedMessage,
SignedMessage, TendermintHandle,
};
use crate::{
@ -45,13 +45,16 @@ pub(crate) struct TendermintImport<
I: Send + Sync + BlockImport<B, Transaction = TransactionFor<C, B>> + 'static,
CIDP: CreateInherentDataProviders<B, ()> + 'static,
E: Send + Sync + Environment<B> + 'static,
> {
> where
TransactionFor<C, B>: Send + Sync + 'static,
{
_block: PhantomData<B>,
_backend: PhantomData<Be>,
importing_block: Arc<RwLock<Option<B::Hash>>>,
pub(crate) machine: Arc<RwLock<Option<TendermintHandle<Self>>>>,
client: Arc<C>,
pub(crate) client: Arc<C>,
pub(crate) inner: Arc<AsyncRwLock<I>>,
providers: Arc<CIDP>,
@ -67,6 +70,8 @@ impl<
CIDP: CreateInherentDataProviders<B, ()> + 'static,
E: Send + Sync + Environment<B> + 'static,
> Clone for TendermintImport<B, Be, C, I, CIDP, E>
where
TransactionFor<C, B>: Send + Sync + 'static,
{
fn clone(&self) -> Self {
TendermintImport {
@ -74,6 +79,7 @@ impl<
_backend: PhantomData,
importing_block: self.importing_block.clone(),
machine: self.machine.clone(),
client: self.client.clone(),
inner: self.inner.clone(),
@ -107,6 +113,7 @@ where
_backend: PhantomData,
importing_block: Arc::new(RwLock::new(None)),
machine: Arc::new(RwLock::new(None)),
client,
inner: Arc::new(AsyncRwLock::new(inner)),
@ -233,28 +240,28 @@ where
Ok(())
}
async fn get_proposal(&mut self, block: &B) -> B {
let inherent_data = match self.providers.create_inherent_data_providers(block.hash(), ()).await
{
Ok(providers) => match providers.create_inherent_data() {
Ok(data) => Some(data),
pub(crate) async fn get_proposal(&mut self, header: &B::Header) -> B {
let inherent_data =
match self.providers.create_inherent_data_providers(header.hash(), ()).await {
Ok(providers) => match providers.create_inherent_data() {
Ok(data) => Some(data),
Err(err) => {
warn!(target: "tendermint", "Failed to create inherent data: {}", err);
None
}
},
Err(err) => {
warn!(target: "tendermint", "Failed to create inherent data: {}", err);
warn!(target: "tendermint", "Failed to create inherent data providers: {}", err);
None
}
},
Err(err) => {
warn!(target: "tendermint", "Failed to create inherent data providers: {}", err);
None
}
}
.unwrap_or_else(InherentData::new);
.unwrap_or_else(InherentData::new);
let proposer = self
.env
.write()
.await
.init(block.header())
.init(header)
.await
.expect("Failed to create a proposer for the new block");
// TODO: Production time, size limit
@ -355,6 +362,6 @@ where
async fn add_block(&mut self, block: B, commit: Commit<TendermintSigner>) -> B {
self.import_justification_actual(block.hash(), (CONSENSUS_ID, commit.encode())).unwrap();
self.get_proposal(&block).await
self.get_proposal(block.header()).await
}
}

View file

@ -60,23 +60,23 @@ pub fn run() -> sc_cli::Result<()> {
Some(Subcommand::CheckBlock(cmd)) => cli.create_runner(cmd)?.async_run(|config| {
let PartialComponents { client, task_manager, import_queue, .. } =
service::new_partial(&config)?;
service::new_partial(&config)?.1;
Ok((cmd.run(client, import_queue), task_manager))
}),
Some(Subcommand::ExportBlocks(cmd)) => cli.create_runner(cmd)?.async_run(|config| {
let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?;
let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?.1;
Ok((cmd.run(client, config.database), task_manager))
}),
Some(Subcommand::ExportState(cmd)) => cli.create_runner(cmd)?.async_run(|config| {
let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?;
let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?.1;
Ok((cmd.run(client, config.chain_spec), task_manager))
}),
Some(Subcommand::ImportBlocks(cmd)) => cli.create_runner(cmd)?.async_run(|config| {
let PartialComponents { client, task_manager, import_queue, .. } =
service::new_partial(&config)?;
service::new_partial(&config)?.1;
Ok((cmd.run(client, import_queue), task_manager))
}),
@ -85,14 +85,15 @@ pub fn run() -> sc_cli::Result<()> {
}
Some(Subcommand::Revert(cmd)) => cli.create_runner(cmd)?.async_run(|config| {
let PartialComponents { client, task_manager, backend, .. } = service::new_partial(&config)?;
let PartialComponents { client, task_manager, backend, .. } =
service::new_partial(&config)?.1;
Ok((cmd.run(client, backend, None), task_manager))
}),
Some(Subcommand::Benchmark(cmd)) => cli.create_runner(cmd)?.sync_run(|config| match cmd {
BenchmarkCmd::Pallet(cmd) => cmd.run::<Block, service::ExecutorDispatch>(config),
BenchmarkCmd::Block(cmd) => cmd.run(service::new_partial(&config)?.client),
BenchmarkCmd::Block(cmd) => cmd.run(service::new_partial(&config)?.1.client),
#[cfg(not(feature = "runtime-benchmarks"))]
BenchmarkCmd::Storage(_) => {
@ -101,12 +102,12 @@ pub fn run() -> sc_cli::Result<()> {
#[cfg(feature = "runtime-benchmarks")]
BenchmarkCmd::Storage(cmd) => {
let PartialComponents { client, backend, .. } = service::new_partial(&config)?;
let PartialComponents { client, backend, .. } = service::new_partial(&config)?.1;
cmd.run(config, client, backend.expose_db(), backend.expose_storage())
}
BenchmarkCmd::Overhead(cmd) => {
let client = service::new_partial(&config)?.client;
let client = service::new_partial(&config)?.1.client;
cmd.run(
config,
client.clone(),
@ -117,7 +118,7 @@ pub fn run() -> sc_cli::Result<()> {
}
BenchmarkCmd::Extrinsic(cmd) => {
let client = service::new_partial(&config)?.client;
let client = service::new_partial(&config)?.1.client;
cmd.run(
client.clone(),
inherent_benchmark_data()?,
@ -134,7 +135,7 @@ pub fn run() -> sc_cli::Result<()> {
}
None => cli.create_runner(&cli.run)?.run_node_until_exit(|config| async {
service::new_full(config).map_err(sc_cli::Error::Service)
service::new_full(config).await.map_err(sc_cli::Error::Service)
}),
}
}

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, future::Future};
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_executor::NativeElseWasmExecutor;
@ -19,7 +19,9 @@ type PartialComponents = sc_service::PartialComponents<
Option<Telemetry>,
>;
pub fn new_partial(config: &Configuration) -> Result<PartialComponents, ServiceError> {
pub fn new_partial(
config: &Configuration,
) -> Result<(impl Future<Output = ()>, PartialComponents), ServiceError> {
if config.keystore_remote.is_some() {
return Err(ServiceError::Other("Remote Keystores are not supported".to_string()));
}
@ -63,38 +65,44 @@ pub fn new_partial(config: &Configuration) -> Result<PartialComponents, ServiceE
client.clone(),
);
let import_queue = serai_consensus::import_queue(
let (authority, import_queue) = serai_consensus::import_queue(
&task_manager,
client.clone(),
transaction_pool.clone(),
config.prometheus_registry(),
)?;
);
let select_chain = serai_consensus::TendermintSelectChain::new(backend.clone());
Ok(sc_service::PartialComponents {
client,
backend,
task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: telemetry,
})
Ok((
authority,
sc_service::PartialComponents {
client,
backend,
task_manager,
import_queue,
keystore_container,
select_chain,
transaction_pool,
other: telemetry,
},
))
}
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
client,
backend,
mut task_manager,
import_queue,
keystore_container,
select_chain: _,
other: mut telemetry,
transaction_pool,
} = new_partial(&config)?;
pub async fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
let (
authority,
sc_service::PartialComponents {
client,
backend,
mut task_manager,
import_queue,
keystore_container,
select_chain: _,
other: mut telemetry,
transaction_pool,
},
) = new_partial(&config)?;
let (network, system_rpc_tx, tx_handler_controller, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
@ -116,9 +124,6 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
);
}
let role = config.role.clone();
let prometheus_registry = config.prometheus_registry().cloned();
let rpc_extensions_builder = {
let client = client.clone();
let pool = transaction_pool.clone();
@ -133,6 +138,8 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
})
};
let is_authority = config.role.is_authority();
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
network: network.clone(),
client: client.clone(),
@ -147,14 +154,8 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
telemetry: telemetry.as_mut(),
})?;
if role.is_authority() {
serai_consensus::authority(
&task_manager,
client,
network,
transaction_pool,
prometheus_registry.as_ref(),
);
if is_authority {
authority.await;
}
network_starter.start_network();