diff --git a/Cargo.lock b/Cargo.lock index f87293fb..346eb5d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7387,6 +7387,7 @@ name = "serai-consensus" version = "0.1.0" dependencies = [ "async-trait", + "futures", "log", "sc-basic-authorship", "sc-client-api", diff --git a/substrate/consensus/Cargo.toml b/substrate/consensus/Cargo.toml index 0de507e4..e5e398cc 100644 --- a/substrate/consensus/Cargo.toml +++ b/substrate/consensus/Cargo.toml @@ -17,6 +17,7 @@ async-trait = "0.1" log = "0.4" +futures = "0.3" tokio = { version = "1", features = ["sync", "rt"] } sp-core = { git = "https://github.com/serai-dex/substrate" } diff --git a/substrate/consensus/src/import_queue.rs b/substrate/consensus/src/import_queue.rs index 0a8d433a..0313b377 100644 --- a/substrate/consensus/src/import_queue.rs +++ b/substrate/consensus/src/import_queue.rs @@ -1,16 +1,15 @@ use std::{ pin::Pin, sync::{Arc, RwLock}, - task::{Poll, Context}, + task::{Poll, /* Wake, Waker, */ Context}, future::Future, + time::SystemTime, }; -use tokio::runtime::Handle; - use sp_inherents::CreateInherentDataProviders; use sp_runtime::traits::{Header, Block}; use sp_blockchain::HeaderBackend; -use sp_api::{TransactionFor, ProvideRuntimeApi}; +use sp_api::{BlockId, TransactionFor, ProvideRuntimeApi}; use sp_consensus::{Error, Environment}; use sc_consensus::{BlockImport, BlockImportStatus, BlockImportError, Link, BasicQueue}; @@ -20,6 +19,8 @@ use sc_client_api::{Backend, Finalizer}; use substrate_prometheus_endpoint::Registry; +use tendermint_machine::{ext::BlockNumber, TendermintMachine}; + use crate::tendermint::TendermintImport; pub type TendermintImportQueue = BasicQueue; @@ -84,16 +85,33 @@ pub fn import_queue< env: E, spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, -) -> TendermintImportQueue> +) -> (impl Future, TendermintImportQueue>) where I::Error: Into, TransactionFor: Send + Sync + 'static, { let import = TendermintImport::new(client, inner, providers, env); + + let authority = { + let machine_clone = import.machine.clone(); + let mut import_clone = import.clone(); + async move { + *machine_clone.write().unwrap() = Some(TendermintMachine::new( + import_clone.clone(), + // TODO + 0, + (BlockNumber(1), SystemTime::now()), + import_clone + .get_proposal(&import_clone.client.header(BlockId::Number(0u8.into())).unwrap().unwrap()) + .await, + )); + } + }; + let boxed = Box::new(import.clone()); let queue = || BasicQueue::new(import.clone(), boxed.clone(), Some(boxed.clone()), spawner, registry); - *Handle::current().block_on(import.queue.write()) = Some(queue()); - queue() + *futures::executor::block_on(import.queue.write()) = Some(queue()); + (authority, queue()) } diff --git a/substrate/consensus/src/justification_import.rs b/substrate/consensus/src/justification_import.rs index fb5b5b53..7245029b 100644 --- a/substrate/consensus/src/justification_import.rs +++ b/substrate/consensus/src/justification_import.rs @@ -9,14 +9,12 @@ use sp_blockchain::HeaderBackend; use sp_api::{TransactionFor, ProvideRuntimeApi}; use sp_consensus::{Error, Environment}; -use sc_consensus::{BlockImport, JustificationImport, BasicQueue}; +use sc_consensus::{BlockImport, JustificationImport}; use sc_client_api::{Backend, Finalizer}; use crate::tendermint::TendermintImport; -pub type TendermintImportQueue = BasicQueue; - #[async_trait] impl< B: Block, diff --git a/substrate/consensus/src/lib.rs b/substrate/consensus/src/lib.rs index 4d7233c9..e5c31ec8 100644 --- a/substrate/consensus/src/lib.rs +++ b/substrate/consensus/src/lib.rs @@ -1,7 +1,6 @@ -use std::sync::Arc; +use std::{sync::Arc, future::Future}; use sp_api::TransactionFor; -use sp_consensus::Error; use sc_executor::{NativeVersion, NativeExecutionDispatch, NativeElseWasmExecutor}; use sc_transaction_pool::FullPool; @@ -50,8 +49,8 @@ pub fn import_queue( client: Arc, pool: Arc>, registry: Option<&Registry>, -) -> Result>, Error> { - Ok(import_queue::import_queue( +) -> (impl Future, TendermintImportQueue>) { + import_queue::import_queue( client.clone(), client.clone(), Arc::new(|_, _| async { Ok(sp_timestamp::InherentDataProvider::from_system_time()) }), @@ -64,18 +63,7 @@ pub fn import_queue( ), &task_manager.spawn_essential_handle(), registry, - )) -} - -// If we're an authority, produce blocks -pub fn authority( - task_manager: &TaskManager, - client: Arc, - network: Arc::Hash>>, - pool: Arc>, - registry: Option<&Registry>, -) { - todo!() + ) } /* diff --git a/substrate/consensus/src/tendermint.rs b/substrate/consensus/src/tendermint.rs index bc1d1d57..cb88a77a 100644 --- a/substrate/consensus/src/tendermint.rs +++ b/substrate/consensus/src/tendermint.rs @@ -28,7 +28,7 @@ use sc_client_api::{Backend, Finalizer}; use tendermint_machine::{ ext::{BlockError, Commit, Network}, - SignedMessage, + SignedMessage, TendermintHandle, }; use crate::{ @@ -45,13 +45,16 @@ pub(crate) struct TendermintImport< I: Send + Sync + BlockImport> + 'static, CIDP: CreateInherentDataProviders + 'static, E: Send + Sync + Environment + 'static, -> { +> where + TransactionFor: Send + Sync + 'static, +{ _block: PhantomData, _backend: PhantomData, importing_block: Arc>>, + pub(crate) machine: Arc>>>, - client: Arc, + pub(crate) client: Arc, pub(crate) inner: Arc>, providers: Arc, @@ -67,6 +70,8 @@ impl< CIDP: CreateInherentDataProviders + 'static, E: Send + Sync + Environment + 'static, > Clone for TendermintImport +where + TransactionFor: Send + Sync + 'static, { fn clone(&self) -> Self { TendermintImport { @@ -74,6 +79,7 @@ impl< _backend: PhantomData, importing_block: self.importing_block.clone(), + machine: self.machine.clone(), client: self.client.clone(), inner: self.inner.clone(), @@ -107,6 +113,7 @@ where _backend: PhantomData, importing_block: Arc::new(RwLock::new(None)), + machine: Arc::new(RwLock::new(None)), client, inner: Arc::new(AsyncRwLock::new(inner)), @@ -233,28 +240,28 @@ where Ok(()) } - async fn get_proposal(&mut self, block: &B) -> B { - let inherent_data = match self.providers.create_inherent_data_providers(block.hash(), ()).await - { - Ok(providers) => match providers.create_inherent_data() { - Ok(data) => Some(data), + pub(crate) async fn get_proposal(&mut self, header: &B::Header) -> B { + let inherent_data = + match self.providers.create_inherent_data_providers(header.hash(), ()).await { + Ok(providers) => match providers.create_inherent_data() { + Ok(data) => Some(data), + Err(err) => { + warn!(target: "tendermint", "Failed to create inherent data: {}", err); + None + } + }, Err(err) => { - warn!(target: "tendermint", "Failed to create inherent data: {}", err); + warn!(target: "tendermint", "Failed to create inherent data providers: {}", err); None } - }, - Err(err) => { - warn!(target: "tendermint", "Failed to create inherent data providers: {}", err); - None } - } - .unwrap_or_else(InherentData::new); + .unwrap_or_else(InherentData::new); let proposer = self .env .write() .await - .init(block.header()) + .init(header) .await .expect("Failed to create a proposer for the new block"); // TODO: Production time, size limit @@ -355,6 +362,6 @@ where async fn add_block(&mut self, block: B, commit: Commit) -> B { self.import_justification_actual(block.hash(), (CONSENSUS_ID, commit.encode())).unwrap(); - self.get_proposal(&block).await + self.get_proposal(block.header()).await } } diff --git a/substrate/node/src/command.rs b/substrate/node/src/command.rs index c087d817..03fbc533 100644 --- a/substrate/node/src/command.rs +++ b/substrate/node/src/command.rs @@ -60,23 +60,23 @@ pub fn run() -> sc_cli::Result<()> { Some(Subcommand::CheckBlock(cmd)) => cli.create_runner(cmd)?.async_run(|config| { let PartialComponents { client, task_manager, import_queue, .. } = - service::new_partial(&config)?; + service::new_partial(&config)?.1; Ok((cmd.run(client, import_queue), task_manager)) }), Some(Subcommand::ExportBlocks(cmd)) => cli.create_runner(cmd)?.async_run(|config| { - let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?; + let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?.1; Ok((cmd.run(client, config.database), task_manager)) }), Some(Subcommand::ExportState(cmd)) => cli.create_runner(cmd)?.async_run(|config| { - let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?; + let PartialComponents { client, task_manager, .. } = service::new_partial(&config)?.1; Ok((cmd.run(client, config.chain_spec), task_manager)) }), Some(Subcommand::ImportBlocks(cmd)) => cli.create_runner(cmd)?.async_run(|config| { let PartialComponents { client, task_manager, import_queue, .. } = - service::new_partial(&config)?; + service::new_partial(&config)?.1; Ok((cmd.run(client, import_queue), task_manager)) }), @@ -85,14 +85,15 @@ pub fn run() -> sc_cli::Result<()> { } Some(Subcommand::Revert(cmd)) => cli.create_runner(cmd)?.async_run(|config| { - let PartialComponents { client, task_manager, backend, .. } = service::new_partial(&config)?; + let PartialComponents { client, task_manager, backend, .. } = + service::new_partial(&config)?.1; Ok((cmd.run(client, backend, None), task_manager)) }), Some(Subcommand::Benchmark(cmd)) => cli.create_runner(cmd)?.sync_run(|config| match cmd { BenchmarkCmd::Pallet(cmd) => cmd.run::(config), - BenchmarkCmd::Block(cmd) => cmd.run(service::new_partial(&config)?.client), + BenchmarkCmd::Block(cmd) => cmd.run(service::new_partial(&config)?.1.client), #[cfg(not(feature = "runtime-benchmarks"))] BenchmarkCmd::Storage(_) => { @@ -101,12 +102,12 @@ pub fn run() -> sc_cli::Result<()> { #[cfg(feature = "runtime-benchmarks")] BenchmarkCmd::Storage(cmd) => { - let PartialComponents { client, backend, .. } = service::new_partial(&config)?; + let PartialComponents { client, backend, .. } = service::new_partial(&config)?.1; cmd.run(config, client, backend.expose_db(), backend.expose_storage()) } BenchmarkCmd::Overhead(cmd) => { - let client = service::new_partial(&config)?.client; + let client = service::new_partial(&config)?.1.client; cmd.run( config, client.clone(), @@ -117,7 +118,7 @@ pub fn run() -> sc_cli::Result<()> { } BenchmarkCmd::Extrinsic(cmd) => { - let client = service::new_partial(&config)?.client; + let client = service::new_partial(&config)?.1.client; cmd.run( client.clone(), inherent_benchmark_data()?, @@ -134,7 +135,7 @@ pub fn run() -> sc_cli::Result<()> { } None => cli.create_runner(&cli.run)?.run_node_until_exit(|config| async { - service::new_full(config).map_err(sc_cli::Error::Service) + service::new_full(config).await.map_err(sc_cli::Error::Service) }), } } diff --git a/substrate/node/src/service.rs b/substrate/node/src/service.rs index b97ce890..214ae0d6 100644 --- a/substrate/node/src/service.rs +++ b/substrate/node/src/service.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, future::Future}; use sc_service::{error::Error as ServiceError, Configuration, TaskManager}; use sc_executor::NativeElseWasmExecutor; @@ -19,7 +19,9 @@ type PartialComponents = sc_service::PartialComponents< Option, >; -pub fn new_partial(config: &Configuration) -> Result { +pub fn new_partial( + config: &Configuration, +) -> Result<(impl Future, PartialComponents), ServiceError> { if config.keystore_remote.is_some() { return Err(ServiceError::Other("Remote Keystores are not supported".to_string())); } @@ -63,38 +65,44 @@ pub fn new_partial(config: &Configuration) -> Result Result { - let sc_service::PartialComponents { - client, - backend, - mut task_manager, - import_queue, - keystore_container, - select_chain: _, - other: mut telemetry, - transaction_pool, - } = new_partial(&config)?; +pub async fn new_full(config: Configuration) -> Result { + let ( + authority, + sc_service::PartialComponents { + client, + backend, + mut task_manager, + import_queue, + keystore_container, + select_chain: _, + other: mut telemetry, + transaction_pool, + }, + ) = new_partial(&config)?; let (network, system_rpc_tx, tx_handler_controller, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { @@ -116,9 +124,6 @@ pub fn new_full(config: Configuration) -> Result { ); } - let role = config.role.clone(); - let prometheus_registry = config.prometheus_registry().cloned(); - let rpc_extensions_builder = { let client = client.clone(); let pool = transaction_pool.clone(); @@ -133,6 +138,8 @@ pub fn new_full(config: Configuration) -> Result { }) }; + let is_authority = config.role.is_authority(); + sc_service::spawn_tasks(sc_service::SpawnTasksParams { network: network.clone(), client: client.clone(), @@ -147,14 +154,8 @@ pub fn new_full(config: Configuration) -> Result { telemetry: telemetry.as_mut(), })?; - if role.is_authority() { - serai_consensus::authority( - &task_manager, - client, - network, - transaction_pool, - prometheus_registry.as_ref(), - ); + if is_authority { + authority.await; } network_starter.start_network();