From 1a7df1e355bb19767d6eb1b5dc91ff119badce97 Mon Sep 17 00:00:00 2001 From: hinto-janaiyo Date: Sat, 3 Dec 2022 16:02:34 -0500 Subject: [PATCH] helper: turn [Helper] fields into [Arc]'s, add p2pool watchdog --- src/helper.rs | 112 +++++++++++++++++++++++++------------------------- 1 file changed, 57 insertions(+), 55 deletions(-) diff --git a/src/helper.rs b/src/helper.rs index b11498c..b0b9b10 100644 --- a/src/helper.rs +++ b/src/helper.rs @@ -38,7 +38,8 @@ use std::{ sync::{Arc,Mutex}, path::PathBuf, - process::Command, + process::{Command,Stdio}, + fmt::Write, time::*, thread, }; @@ -50,14 +51,14 @@ use log::*; //---------------------------------------------------------------------------------------------------- [Helper] Struct // A meta struct holding all the data that gets processed in this thread pub struct Helper { - pub instant: Instant, // Gupax start as an [Instant] - pub human_time: HumanTime, // Gupax uptime formatting for humans - pub p2pool: Process, // P2Pool process state - pub xmrig: Process, // XMRig process state - pub pub_api_p2pool: PubP2poolApi, // P2Pool API state (for GUI thread) - pub pub_api_xmrig: PubXmrigApi, // XMRig API state (for GUI thread) - priv_api_p2pool: PrivP2poolApi, // For "watchdog" thread - priv_api_xmrig: PrivXmrigApi, // For "watchdog" thread + pub instant: Instant, // Gupax start as an [Instant] + pub human_time: HumanTime, // Gupax uptime formatting for humans + pub p2pool: Arc>, // P2Pool process state + pub xmrig: Arc>, // XMRig process state + pub pub_api_p2pool: Arc>, // P2Pool API state (for GUI thread) + pub pub_api_xmrig: Arc>, // XMRig API state (for GUI thread) + priv_api_p2pool: Arc>, // For "watchdog" thread + priv_api_xmrig: Arc>, // For "watchdog" thread } // Impl found at the very bottom of this file. @@ -328,41 +329,48 @@ impl Helper { Self { instant, human_time: HumanTime::into_human(instant.elapsed()), - p2pool: Process::new(ProcessName::P2pool, String::new(), PathBuf::new()), - xmrig: Process::new(ProcessName::Xmrig, String::new(), PathBuf::new()), - pub_api_p2pool: PubP2poolApi::new(), - pub_api_xmrig: PubXmrigApi::new(), - priv_api_p2pool: PrivP2poolApi::new(), - priv_api_xmrig: PrivXmrigApi::new(), + p2pool: Arc::new(Mutex::new(Process::new(ProcessName::P2pool, String::new(), PathBuf::new()))), + xmrig: Arc::new(Mutex::new(Process::new(ProcessName::Xmrig, String::new(), PathBuf::new()))), + pub_api_p2pool: Arc::new(Mutex::new(PubP2poolApi::new())), + pub_api_xmrig: Arc::new(Mutex::new(PubXmrigApi::new())), + priv_api_p2pool: Arc::new(Mutex::new(PrivP2poolApi::new())), + priv_api_xmrig: Arc::new(Mutex::new(PrivXmrigApi::new())), } } // The tokio runtime that blocks while async reading both STDOUT/STDERR // Cheaper than spawning 2 OS threads just to read 2 pipes (...right? :D) #[tokio::main] - async fn read_stdout_stderr(stdout: tokio::process::ChildStdout, stderr: tokio::process::ChildStderr) { + async fn read_stdout_stderr(process: &Arc>, stdout: tokio::process::ChildStdout, stderr: tokio::process::ChildStderr) { + let process_stdout = Arc::clone(process); + let process_stderr = Arc::clone(process); // Create STDOUT pipe job let stdout_job = tokio::spawn(async move { let mut stdout_reader = BufReader::new(stdout).lines(); while let Ok(Some(line)) = stdout_reader.next_line().await { - println!("{}", line); + writeln!(process_stdout.lock().unwrap().output, "{}", line); } }); // Create STDERR pipe job let stderr_job = tokio::spawn(async move { let mut stderr_reader = BufReader::new(stderr).lines(); while let Ok(Some(line)) = stderr_reader.next_line().await { - println!("{}", line); + writeln!(process_stderr.lock().unwrap().output, "{}", line); } }); // Block and read both until they are closed (automatic when process dies) + // The ordering of STDOUT/STDERR should be automatic thanks to the locks. tokio::join![stdout_job, stderr_job]; } //---------------------------------------------------------------------------------------------------- P2Pool specific // Intermediate function that parses the arguments, and spawns the P2Pool watchdog thread. - pub fn spawn_p2pool(state: &crate::disk::P2pool, api_path: &std::path::Path) { + pub fn spawn_p2pool(helper: &Arc>, state: &crate::disk::P2pool, path: std::path::PathBuf) { let mut args = Vec::with_capacity(500); + let path = path.clone(); + let mut api_path = path.clone(); + api_path.pop(); + // [Simple] if state.simple { // Build the p2pool argument @@ -403,17 +411,43 @@ impl Helper { crate::disk::print_dash(&format!("P2Pool | Launch arguments ... {:#?}", args)); // Spawn watchdog thread + let process = Arc::clone(&helper.lock().unwrap().p2pool); + let pub_api = Arc::clone(&helper.lock().unwrap().pub_api_p2pool); + let priv_api = Arc::clone(&helper.lock().unwrap().priv_api_p2pool); thread::spawn(move || { - Self::spawn_p2pool_watchdog(args); + Self::spawn_p2pool_watchdog(process, pub_api, priv_api, args, path); }); } // The actual P2Pool watchdog tokio runtime. #[tokio::main] - pub async fn spawn_p2pool_watchdog(args: Vec) { + async fn spawn_p2pool_watchdog(process: Arc>, pub_api: Arc>, priv_api: Arc>, args: Vec, path: std::path::PathBuf) { // 1. Create command - // 2. Spawn STDOUT/STDERR thread - // 3. Loop forever as watchdog until process dies + let mut child = tokio::process::Command::new(path) + .args(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .stdin(Stdio::piped()) + .spawn().unwrap(); + + // 2. Set start time + let now = Instant::now(); + process.lock().unwrap().start = now; + process.lock().unwrap().uptime = HumanTime::into_human(now.elapsed()); + + // 3. Spawn STDOUT/STDERR thread + thread::spawn(move || { + Self::read_stdout_stderr(&process, child.stdout.take().unwrap(), child.stderr.take().unwrap()); + }); + + // 4. Loop forever as watchdog until process dies + loop { + // a. Watch user SIGNAL + // b. Create STDIN task + // c. Create API task + // d. Execute async tasks + // e. Sleep (900ms) + } } //---------------------------------------------------------------------------------------------------- XMRig specific @@ -480,38 +514,6 @@ impl Helper { // 2. Loop init timestamp let start = Instant::now(); - // 3. Spawn child processes (if signal found) - let h = helper.lock().unwrap(); - if let ProcessSignal::Start = h.p2pool.signal { - // Start outer thread, start inner stdout/stderr pipe, loop in outer thread for stdin/signal/etc - if !h.p2pool.input.is_empty() { - // Process STDIN - } - } - drop(h); - let h = helper.lock().unwrap(); - if let ProcessSignal::Start = h.xmrig.signal { - // Start outer thread, start inner stdout/stderr pipe, loop in outer thread for stdin/signal/etc - if !h.xmrig.input.is_empty() { - // Process STDIN - } - } - drop(h); - - // 4. Collect P2Pool API task (if alive) - let h = helper.lock().unwrap(); - if let ProcessState::Alive = h.p2pool.state { - } - // 5. Collect XMRig HTTP API task (if alive) - if let ProcessState::Alive = h.xmrig.state { - } - drop(h); - - // 6. Execute all async tasks -// for job in jobs { -// job.await; -// } - // 7. Set Gupax/P2Pool/XMRig uptime let mut h = helper.lock().unwrap(); h.human_time = HumanTime::into_human(h.instant.elapsed());