helper: turn [Helper] fields into [Arc]'s, add p2pool watchdog

This commit is contained in:
hinto-janaiyo 2022-12-03 16:02:34 -05:00
parent 0a8deee359
commit 1a7df1e355
No known key found for this signature in database
GPG key ID: B1C5A64B80691E45

View file

@ -38,7 +38,8 @@
use std::{
sync::{Arc,Mutex},
path::PathBuf,
process::Command,
process::{Command,Stdio},
fmt::Write,
time::*,
thread,
};
@ -52,12 +53,12 @@ use log::*;
pub struct Helper {
pub instant: Instant, // Gupax start as an [Instant]
pub human_time: HumanTime, // Gupax uptime formatting for humans
pub p2pool: Process, // P2Pool process state
pub xmrig: Process, // XMRig process state
pub pub_api_p2pool: PubP2poolApi, // P2Pool API state (for GUI thread)
pub pub_api_xmrig: PubXmrigApi, // XMRig API state (for GUI thread)
priv_api_p2pool: PrivP2poolApi, // For "watchdog" thread
priv_api_xmrig: PrivXmrigApi, // For "watchdog" thread
pub p2pool: Arc<Mutex<Process>>, // P2Pool process state
pub xmrig: Arc<Mutex<Process>>, // XMRig process state
pub pub_api_p2pool: Arc<Mutex<PubP2poolApi>>, // P2Pool API state (for GUI thread)
pub pub_api_xmrig: Arc<Mutex<PubXmrigApi>>, // XMRig API state (for GUI thread)
priv_api_p2pool: Arc<Mutex<PrivP2poolApi>>, // For "watchdog" thread
priv_api_xmrig: Arc<Mutex<PrivXmrigApi>>, // For "watchdog" thread
}
// Impl found at the very bottom of this file.
@ -328,41 +329,48 @@ impl Helper {
Self {
instant,
human_time: HumanTime::into_human(instant.elapsed()),
p2pool: Process::new(ProcessName::P2pool, String::new(), PathBuf::new()),
xmrig: Process::new(ProcessName::Xmrig, String::new(), PathBuf::new()),
pub_api_p2pool: PubP2poolApi::new(),
pub_api_xmrig: PubXmrigApi::new(),
priv_api_p2pool: PrivP2poolApi::new(),
priv_api_xmrig: PrivXmrigApi::new(),
p2pool: Arc::new(Mutex::new(Process::new(ProcessName::P2pool, String::new(), PathBuf::new()))),
xmrig: Arc::new(Mutex::new(Process::new(ProcessName::Xmrig, String::new(), PathBuf::new()))),
pub_api_p2pool: Arc::new(Mutex::new(PubP2poolApi::new())),
pub_api_xmrig: Arc::new(Mutex::new(PubXmrigApi::new())),
priv_api_p2pool: Arc::new(Mutex::new(PrivP2poolApi::new())),
priv_api_xmrig: Arc::new(Mutex::new(PrivXmrigApi::new())),
}
}
// The tokio runtime that blocks while async reading both STDOUT/STDERR
// Cheaper than spawning 2 OS threads just to read 2 pipes (...right? :D)
#[tokio::main]
async fn read_stdout_stderr(stdout: tokio::process::ChildStdout, stderr: tokio::process::ChildStderr) {
async fn read_stdout_stderr(process: &Arc<Mutex<Process>>, stdout: tokio::process::ChildStdout, stderr: tokio::process::ChildStderr) {
let process_stdout = Arc::clone(process);
let process_stderr = Arc::clone(process);
// Create STDOUT pipe job
let stdout_job = tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = stdout_reader.next_line().await {
println!("{}", line);
writeln!(process_stdout.lock().unwrap().output, "{}", line);
}
});
// Create STDERR pipe job
let stderr_job = tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = stderr_reader.next_line().await {
println!("{}", line);
writeln!(process_stderr.lock().unwrap().output, "{}", line);
}
});
// Block and read both until they are closed (automatic when process dies)
// The ordering of STDOUT/STDERR should be automatic thanks to the locks.
tokio::join![stdout_job, stderr_job];
}
//---------------------------------------------------------------------------------------------------- P2Pool specific
// Intermediate function that parses the arguments, and spawns the P2Pool watchdog thread.
pub fn spawn_p2pool(state: &crate::disk::P2pool, api_path: &std::path::Path) {
pub fn spawn_p2pool(helper: &Arc<Mutex<Self>>, state: &crate::disk::P2pool, path: std::path::PathBuf) {
let mut args = Vec::with_capacity(500);
let path = path.clone();
let mut api_path = path.clone();
api_path.pop();
// [Simple]
if state.simple {
// Build the p2pool argument
@ -403,17 +411,43 @@ impl Helper {
crate::disk::print_dash(&format!("P2Pool | Launch arguments ... {:#?}", args));
// Spawn watchdog thread
let process = Arc::clone(&helper.lock().unwrap().p2pool);
let pub_api = Arc::clone(&helper.lock().unwrap().pub_api_p2pool);
let priv_api = Arc::clone(&helper.lock().unwrap().priv_api_p2pool);
thread::spawn(move || {
Self::spawn_p2pool_watchdog(args);
Self::spawn_p2pool_watchdog(process, pub_api, priv_api, args, path);
});
}
// The actual P2Pool watchdog tokio runtime.
#[tokio::main]
pub async fn spawn_p2pool_watchdog(args: Vec<String>) {
async fn spawn_p2pool_watchdog(process: Arc<Mutex<Process>>, pub_api: Arc<Mutex<PubP2poolApi>>, priv_api: Arc<Mutex<PrivP2poolApi>>, args: Vec<String>, path: std::path::PathBuf) {
// 1. Create command
// 2. Spawn STDOUT/STDERR thread
// 3. Loop forever as watchdog until process dies
let mut child = tokio::process::Command::new(path)
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::piped())
.spawn().unwrap();
// 2. Set start time
let now = Instant::now();
process.lock().unwrap().start = now;
process.lock().unwrap().uptime = HumanTime::into_human(now.elapsed());
// 3. Spawn STDOUT/STDERR thread
thread::spawn(move || {
Self::read_stdout_stderr(&process, child.stdout.take().unwrap(), child.stderr.take().unwrap());
});
// 4. Loop forever as watchdog until process dies
loop {
// a. Watch user SIGNAL
// b. Create STDIN task
// c. Create API task
// d. Execute async tasks
// e. Sleep (900ms)
}
}
//---------------------------------------------------------------------------------------------------- XMRig specific
@ -480,38 +514,6 @@ impl Helper {
// 2. Loop init timestamp
let start = Instant::now();
// 3. Spawn child processes (if signal found)
let h = helper.lock().unwrap();
if let ProcessSignal::Start = h.p2pool.signal {
// Start outer thread, start inner stdout/stderr pipe, loop in outer thread for stdin/signal/etc
if !h.p2pool.input.is_empty() {
// Process STDIN
}
}
drop(h);
let h = helper.lock().unwrap();
if let ProcessSignal::Start = h.xmrig.signal {
// Start outer thread, start inner stdout/stderr pipe, loop in outer thread for stdin/signal/etc
if !h.xmrig.input.is_empty() {
// Process STDIN
}
}
drop(h);
// 4. Collect P2Pool API task (if alive)
let h = helper.lock().unwrap();
if let ProcessState::Alive = h.p2pool.state {
}
// 5. Collect XMRig HTTP API task (if alive)
if let ProcessState::Alive = h.xmrig.state {
}
drop(h);
// 6. Execute all async tasks
// for job in jobs {
// job.await;
// }
// 7. Set Gupax/P2Pool/XMRig uptime
let mut h = helper.lock().unwrap();
h.human_time = HumanTime::into_human(h.instant.elapsed());