helper: p2pool - fix args, basic watchdog loop, add STDOUT/STDERR handle to [Process] struct

This commit is contained in:
hinto-janaiyo 2022-12-04 11:24:38 -05:00
parent d9d71c40d4
commit 5d293054cf
No known key found for this signature in database
GPG key ID: B1C5A64B80691E45
5 changed files with 58 additions and 28 deletions

6
Cargo.lock generated
View file

@ -4,7 +4,7 @@ version = 3
[[package]] [[package]]
name = "Gupax" name = "Gupax"
version = "0.5.0" version = "0.7.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"arti-client", "arti-client",
@ -2500,9 +2500,9 @@ dependencies = [
[[package]] [[package]]
name = "num-format" name = "num-format"
version = "0.4.3" version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54b862ff8df690cf089058c98b183676a7ed0f974cc08b426800093227cbff3b" checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [ dependencies = [
"arrayvec 0.7.2", "arrayvec 0.7.2",
"itoa", "itoa",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "Gupax" name = "Gupax"
version = "0.5.0" version = "0.7.0"
authors = ["hinto-janaiyo <hinto.janaiyo@protonmail.com>"] authors = ["hinto-janaiyo <hinto.janaiyo@protonmail.com>"]
description = "GUI for P2Pool+XMRig" description = "GUI for P2Pool+XMRig"
documentation = "https://github.com/hinto-janaiyo/gupax" documentation = "https://github.com/hinto-janaiyo/gupax"

View file

@ -54,6 +54,7 @@ pub const BLACK: egui::Color32 = egui::Color32::BLACK;
// [Duration] constants // [Duration] constants
pub const SECOND: std::time::Duration = std::time::Duration::from_secs(1); pub const SECOND: std::time::Duration = std::time::Duration::from_secs(1);
pub const MILLI_900: std::time::Duration = std::time::Duration::from_millis(900);
pub const TOKIO_SECOND: tokio::time::Duration = std::time::Duration::from_secs(1); pub const TOKIO_SECOND: tokio::time::Duration = std::time::Duration::from_secs(1);
// OS specific // OS specific

View file

@ -75,7 +75,7 @@ pub struct Process {
pub signal: ProcessSignal, // Did the user click [Start/Stop/Restart]? pub signal: ProcessSignal, // Did the user click [Start/Stop/Restart]?
start: Instant, // Start time of process start: Instant, // Start time of process
pub uptime: HumanTime, // Human readable process uptime pub uptime: HumanTime, // Human readable process uptime
pub output: String, // This is the process's stdout + stderr pub output: String, // This is the process's PUBLIC stdout + stderr
// STDIN Problem: // STDIN Problem:
// - User can input many many commands in 1 second // - User can input many many commands in 1 second
// - The process loop only processes every 1 second // - The process loop only processes every 1 second
@ -86,7 +86,10 @@ pub struct Process {
// - When the user inputs something, push it to a [Vec] // - When the user inputs something, push it to a [Vec]
// - In the process loop, loop over every [Vec] element and // - In the process loop, loop over every [Vec] element and
// send each one individually to the process stdin // send each one individually to the process stdin
stdin: Option<std::process::ChildStdin>, // A handle to the process's STDIN pub child: Option<Arc<Mutex<tokio::process::Child>>>, // A handle to the actual child process
stdout: Option<tokio::process::ChildStdout>, // A handle to the process's STDOUT
stderr: Option<tokio::process::ChildStderr>, // A handle to the process's STDERR
stdin: Option<tokio::process::ChildStdin>, // A handle to the process's STDIN
pub input: Vec<String>, pub input: Vec<String>,
} }
@ -100,7 +103,10 @@ impl Process {
signal: ProcessSignal::None, signal: ProcessSignal::None,
start: now, start: now,
uptime: HumanTime::into_human(now.elapsed()), uptime: HumanTime::into_human(now.elapsed()),
stdout: Option::None,
stderr: Option::None,
stdin: Option::None, stdin: Option::None,
child: Option::None,
// P2Pool log level 1 produces a bit less than 100,000 lines a day. // P2Pool log level 1 produces a bit less than 100,000 lines a day.
// Assuming each line averages 80 UTF-8 scalars (80 bytes), then this // Assuming each line averages 80 UTF-8 scalars (80 bytes), then this
// initial buffer should last around a week (56MB) before resetting. // initial buffer should last around a week (56MB) before resetting.
@ -540,13 +546,17 @@ impl Helper {
// The tokio runtime that blocks while async reading both STDOUT/STDERR // The tokio runtime that blocks while async reading both STDOUT/STDERR
// Cheaper than spawning 2 OS threads just to read 2 pipes (...right? :D) // Cheaper than spawning 2 OS threads just to read 2 pipes (...right? :D)
#[tokio::main] #[tokio::main]
async fn read_stdout_stderr(process: &Arc<Mutex<Process>>, stdout: tokio::process::ChildStdout, stderr: tokio::process::ChildStderr) { async fn read_stdout_stderr(process: Arc<Mutex<Process>>) {
let process_stdout = Arc::clone(process); let process_stdout = Arc::clone(&process);
let process_stderr = Arc::clone(process); let process_stderr = Arc::clone(&process);
let stdout = process.lock().unwrap().stdout.take().unwrap();
let stderr = process.lock().unwrap().stderr.take().unwrap();
// Create STDOUT pipe job // Create STDOUT pipe job
let stdout_job = tokio::spawn(async move { let stdout_job = tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines(); let mut stdout_reader = BufReader::new(stdout).lines();
while let Ok(Some(line)) = stdout_reader.next_line().await { while let Ok(Some(line)) = stdout_reader.next_line().await {
// println!("{}", line); // For debugging.
writeln!(process_stdout.lock().unwrap().output, "{}", line); writeln!(process_stdout.lock().unwrap().output, "{}", line);
} }
}); });
@ -554,6 +564,7 @@ impl Helper {
let stderr_job = tokio::spawn(async move { let stderr_job = tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr).lines(); let mut stderr_reader = BufReader::new(stderr).lines();
while let Ok(Some(line)) = stderr_reader.next_line().await { while let Ok(Some(line)) = stderr_reader.next_line().await {
// println!("{}", line); // For debugging.
writeln!(process_stderr.lock().unwrap().output, "{}", line); writeln!(process_stderr.lock().unwrap().output, "{}", line);
} }
}); });
@ -574,11 +585,11 @@ impl Helper {
if state.simple { if state.simple {
// Build the p2pool argument // Build the p2pool argument
let (ip, rpc, zmq) = crate::node::enum_to_ip_rpc_zmq_tuple(state.node); // Get: (IP, RPC, ZMQ) let (ip, rpc, zmq) = crate::node::enum_to_ip_rpc_zmq_tuple(state.node); // Get: (IP, RPC, ZMQ)
args.push(format!("--wallet {}", state.address)); // Wallet Address args.push("--wallet".to_string()); args.push(state.address.clone()); // Wallet address
args.push(format!("--host {}", ip)); // IP Address args.push("--host".to_string()); args.push(ip.to_string()); // IP Address
args.push(format!("--rpc-port {}", rpc)); // RPC Port args.push("--rpc-port".to_string()); args.push(rpc.to_string()); // RPC Port
args.push(format!("--zmq-port {}", zmq)); // ZMQ Port args.push("--zmq-port".to_string()); args.push(zmq.to_string()); // ZMQ Port
args.push(format!("--data-api {}", api_path.display())); // API Path args.push("--data-api".to_string()); args.push(api_path.display().to_string()); // API Path
args.push("--local-api".to_string()); // Enable API args.push("--local-api".to_string()); // Enable API
args.push("--no-color".to_string()); // Remove color escape sequences, Gupax terminal can't parse it :( args.push("--no-color".to_string()); // Remove color escape sequences, Gupax terminal can't parse it :(
args.push("--mini".to_string()); // P2Pool Mini args.push("--mini".to_string()); // P2Pool Mini
@ -622,30 +633,44 @@ impl Helper {
#[tokio::main] #[tokio::main]
async fn spawn_p2pool_watchdog(process: Arc<Mutex<Process>>, pub_api: Arc<Mutex<PubP2poolApi>>, priv_api: Arc<Mutex<PrivP2poolApi>>, args: Vec<String>, path: std::path::PathBuf) { async fn spawn_p2pool_watchdog(process: Arc<Mutex<Process>>, pub_api: Arc<Mutex<PubP2poolApi>>, priv_api: Arc<Mutex<PrivP2poolApi>>, args: Vec<String>, path: std::path::PathBuf) {
// 1. Create command // 1. Create command
let mut child = tokio::process::Command::new(path) let child = Arc::new(Mutex::new(tokio::process::Command::new(path)
.args(args) .args(args)
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.spawn().unwrap(); .spawn().unwrap()));
// 2. Set start time // 2. Set process state
let now = Instant::now(); let mut lock = process.lock().unwrap();
process.lock().unwrap().start = now; lock.state = ProcessState::Alive;
process.lock().unwrap().uptime = HumanTime::into_human(now.elapsed()); lock.signal = ProcessSignal::None;
lock.start = Instant::now();
lock.child = Some(Arc::clone(&child));
lock.stdin = Some(child.lock().unwrap().stdin.take().unwrap());
drop(lock);
// 3. Spawn STDOUT/STDERR thread // 3. Spawn STDOUT/STDERR thread
let process_clone = Arc::clone(&process);
thread::spawn(move || { thread::spawn(move || {
Self::read_stdout_stderr(&process, child.stdout.take().unwrap(), child.stderr.take().unwrap()); Self::read_stdout_stderr(process_clone);
}); });
// 4. Loop forever as watchdog until process dies // 4. Loop forever as watchdog until process dies
loop { loop {
// a. Watch user SIGNAL // a. Watch user SIGNAL
match process.lock().unwrap().signal {
ProcessSignal::Stop => {},
ProcessSignal::Restart => {},
_ => {},
}
// b. Create STDIN task // b. Create STDIN task
if !process.lock().unwrap().input.is_empty() { /* process it */ }
// c. Create API task // c. Create API task
let async_file_read = { /* tokio async file read job */ };
// d. Execute async tasks // d. Execute async tasks
// e. Sleep (900ms) tokio::join![/* jobs */];
// f. Sleep (900ms)
std::thread::sleep(MILLI_900);
} }
} }

View file

@ -1018,14 +1018,18 @@ impl eframe::App for App {
if ui.add_sized([width, height], Button::new("")).on_hover_text("Restart P2Pool").clicked() { self.p2pool = false; } if ui.add_sized([width, height], Button::new("")).on_hover_text("Restart P2Pool").clicked() { self.p2pool = false; }
if ui.add_sized([width, height], Button::new("")).on_hover_text("Stop P2Pool").clicked() { self.p2pool = false; } if ui.add_sized([width, height], Button::new("")).on_hover_text("Stop P2Pool").clicked() { self.p2pool = false; }
ui.add_enabled_ui(false, |ui| { ui.add_enabled_ui(false, |ui| {
ui.add_sized([width, height], Button::new("")).on_hover_text("Start P2Pool"); if ui.add_sized([width, height], Button::new("")).on_hover_text("Start P2Pool").clicked() {
Helper::spawn_p2pool(&self.helper, &self.state.p2pool, self.state.gupax.absolute_p2pool_path.clone());
}
}); });
} else { } else {
ui.add_enabled_ui(false, |ui| { ui.add_enabled_ui(false, |ui| {
ui.add_sized([width, height], Button::new("")).on_hover_text("Restart P2Pool"); ui.add_sized([width, height], Button::new("")).on_hover_text("Restart P2Pool");
ui.add_sized([width, height], Button::new("")).on_hover_text("Stop P2Pool"); ui.add_sized([width, height], Button::new("")).on_hover_text("Stop P2Pool");
}); });
if ui.add_sized([width, height], Button::new("")).on_hover_text("Start P2Pool").clicked() { self.p2pool = true; } if ui.add_sized([width, height], Button::new("")).on_hover_text("Start P2Pool").clicked() {
Helper::spawn_p2pool(&self.helper, &self.state.p2pool, self.state.gupax.absolute_p2pool_path.clone());
}
} }
}); });
}, },