helper: optimize PTY output parsing

(Stupid) Problem (caused by me):
--------------------------------
Up to 56million bytes of P2Pool/XMRig output were being held in memory
so they could be parsed. P2Pool output is the only one that needs
parsing as well, so double bad.

(Obvious) Solution:
-------------------
For XMRig:
Just don't write to an extra [String] buffer.

For P2Pool:
Parse the output, then... toss it out? You don't need the output
anymore after you parsed the values out, e.g: `Payouts`, `XMR Mined`.
Once they are parsed, add them to the current values instead of
completely overwriting them and then toss out the log buffer.

Now Gupax doesn't use stupid amounts of memory holding what is
essentially dead logs we already parsed. The parsing will be a lot
cheaper too since we aren't parsing the entire thing over and over
again (it was already pretty fast though).
This commit is contained in:
hinto-janaiyo 2022-12-14 17:37:29 -05:00
parent d3bbe2ece2
commit 631e4cc2db
No known key found for this signature in database
GPG key ID: B1C5A64B80691E45
2 changed files with 61 additions and 78 deletions

View file

@ -56,11 +56,8 @@ use log::*;
const LOCALE: num_format::Locale = num_format::Locale::en;
// The max amount of bytes of process output we are willing to
// hold in memory before it's too much and we need to reset.
const MAX_PROCESS_OUTPUT_BYTES: usize = 56_000_000;
// Just a little leeway so a reset will go off before the [String] allocates more memory.
const PROCESS_OUTPUT_LEEWAY: usize = MAX_PROCESS_OUTPUT_BYTES - 1000;
// The max bytes the GUI thread should hold
const MAX_GUI_OUTPUT_BYTES: usize = 500_000;
// Just a little leeway so a reset will go off before the [String] allocates more memory.
const GUI_OUTPUT_LEEWAY: usize = MAX_GUI_OUTPUT_BYTES - 1000;
@ -147,12 +144,12 @@ pub struct Process {
stdin: Option<Box<dyn portable_pty::MasterPty + Send>>, // A handle to the process's MasterPTY/STDIN
// This is the process's private output [String], used by both [Simple] and [Advanced].
// "full" contains the entire output (up to a month, or 50m bytes), "buf" will be written to
// the same as full, but it will be [swap()]'d by the "helper" thread into the GUIs [String].
// "parse" contains the output that will be parsed, then tossed out. "pub" will be written to
// the same as parse, but it will be [swap()]'d by the "helper" thread into the GUIs [String].
// The "helper" thread synchronizes this swap so that the data in here is moved there
// roughly once a second. GUI thread never touches this.
output_full: Arc<Mutex<String>>,
output_buf: Arc<Mutex<String>>,
output_parse: Arc<Mutex<String>>,
output_pub: Arc<Mutex<String>>,
// Start time of process.
start: std::time::Instant,
@ -168,9 +165,8 @@ impl Process {
start: Instant::now(),
stdin: Option::None,
child: Option::None,
// MAX should last around a month for P2Pool log level 3.
output_full: Arc::new(Mutex::new(String::with_capacity(MAX_PROCESS_OUTPUT_BYTES))),
output_buf: Arc::new(Mutex::new(String::new())),
output_parse: Arc::new(Mutex::new(String::with_capacity(500))),
output_pub: Arc::new(Mutex::new(String::with_capacity(500))),
input: vec![String::new()],
}
}
@ -248,32 +244,27 @@ impl Helper {
}
// Reads a PTY which combines STDOUT/STDERR for me, yay
fn read_pty(output_full: Arc<Mutex<String>>, output_buf: Arc<Mutex<String>>, reader: Box<dyn std::io::Read + Send>) {
fn read_pty(output_parse: Arc<Mutex<String>>, output_pub: Arc<Mutex<String>>, reader: Box<dyn std::io::Read + Send>, name: ProcessName) {
use std::io::BufRead;
let mut stdout = std::io::BufReader::new(reader).lines();
// We don't need to write twice for XMRig, since we dont parse it... yet.
if name == ProcessName::Xmrig {
while let Some(Ok(line)) = stdout.next() {
// println!("{}", line); // For debugging.
if let Err(e) = writeln!(output_full.lock().unwrap(), "{}", line) { error!("PTY | Output error: {}", e); }
if let Err(e) = writeln!(output_buf.lock().unwrap(), "{}", line) { error!("PTY | Output error: {}", e); }
// if let Err(e) = writeln!(output_parse.lock().unwrap(), "{}", line) { error!("PTY | Output error: {}", e); }
if let Err(e) = writeln!(output_pub.lock().unwrap(), "{}", line) { error!("PTY | Output error: {}", e); }
}
} else {
while let Some(Ok(line)) = stdout.next() {
// println!("{}", line); // For debugging.
if let Err(e) = writeln!(output_parse.lock().unwrap(), "{}", line) { error!("PTY | Output error: {}", e); }
if let Err(e) = writeln!(output_pub.lock().unwrap(), "{}", line) { error!("PTY | Output error: {}", e); }
}
}
}
// Reset output if larger than 55_999_000 bytes (around 1 week of logs).
// The actual [String] holds 56_000_000, but this allows for some leeway so it doesn't allocate more memory.
// Reset output if larger than max bytes.
// This will also append a message showing it was reset.
fn check_reset_output_full(output_full: &Arc<Mutex<String>>, name: ProcessName) {
debug!("{} Watchdog | Resetting [output_full]...", name);
let mut output_full = output_full.lock().unwrap();
if output_full.len() > PROCESS_OUTPUT_LEEWAY {
info!("{} | Output is nearing {} bytes, resetting!", MAX_PROCESS_OUTPUT_BYTES, name);
let text = format!("{}\n{} internal log is exceeding the maximum: {} bytes!\nI've reset the logs for you, your stats may now be inaccurate since they depend on these logs!\nI think you rather have that than have it hogging your memory, though!\n{}\n\n\n\n", HORI_CONSOLE, name, MAX_PROCESS_OUTPUT_BYTES, HORI_CONSOLE);
output_full.clear();
output_full.push_str(&text);
}
debug!("{} Watchdog | Resetting [output_full] ... OK", name);
}
// For the GUI thread
fn check_reset_gui_p2pool_output(gui_api: &Arc<Mutex<PubP2poolApi>>) {
debug!("P2Pool Watchdog | Resetting GUI output...");
let mut gui_api = gui_api.lock().unwrap();
@ -469,13 +460,13 @@ impl Helper {
// 3. Spawn PTY read thread
debug!("P2Pool | Spawning PTY read thread...");
let output_full = Arc::clone(&process.lock().unwrap().output_full);
let output_buf = Arc::clone(&process.lock().unwrap().output_buf);
let output_parse = Arc::clone(&process.lock().unwrap().output_parse);
let output_pub = Arc::clone(&process.lock().unwrap().output_pub);
thread::spawn(move || {
Self::read_pty(output_full, output_buf, reader);
Self::read_pty(output_parse, output_pub, reader, ProcessName::P2pool);
});
let output_full = Arc::clone(&process.lock().unwrap().output_full);
let output_buf = Arc::clone(&process.lock().unwrap().output_buf);
let output_parse = Arc::clone(&process.lock().unwrap().output_parse);
let output_pub = Arc::clone(&process.lock().unwrap().output_pub);
debug!("P2Pool | Cleaning old API files...");
// Attempt to remove stale API file
@ -573,13 +564,12 @@ impl Helper {
// Check if logs need resetting
debug!("P2Pool Watchdog | Attempting log reset check");
Self::check_reset_output_full(&output_full, ProcessName::P2pool);
debug!("P2Pool Watchdog | Attempting GUI log reset check");
Self::check_reset_gui_p2pool_output(&gui_api);
// Always update from output
debug!("P2Pool Watchdog | Starting [update_from_output()]");
PubP2poolApi::update_from_output(&pub_api, &output_full, &output_buf, start.elapsed(), &regex);
PubP2poolApi::update_from_output(&pub_api, &output_parse, &output_pub, start.elapsed(), &regex);
// Read API file into string
debug!("P2Pool Watchdog | Attempting API file read");
@ -608,22 +598,6 @@ impl Helper {
}
//---------------------------------------------------------------------------------------------------- XMRig specific, most functions are very similar to P2Pool's
// // Read P2Pool's API file.
// fn read_p2pool_api(path: &std::path::PathBuf) -> Result<String, std::io::Error> {
// match std::fs::read_to_string(path) {
// Ok(s) => Ok(s),
// Err(e) => { warn!("P2Pool API | [{}] read error: {}", path.display(), e); Err(e) },
// }
// }
//
// // Deserialize the above [String] into a [PrivP2poolApi]
// fn str_to_priv_p2pool_api(string: &str) -> Result<PrivP2poolApi, serde_json::Error> {
// match serde_json::from_str::<PrivP2poolApi>(string) {
// Ok(a) => Ok(a),
// Err(e) => { warn!("P2Pool API | Could not deserialize API data: {}", e); Err(e) },
// }
// }
//
// If processes are started with [sudo] on macOS, they must also
// be killed with [sudo] (even if I have a direct handle to it as the
// parent process...!). This is only needed on macOS, not Linux.
@ -838,13 +812,14 @@ impl Helper {
// 4. Spawn PTY read thread
debug!("XMRig | Spawning PTY read thread...");
let output_full = Arc::clone(&process.lock().unwrap().output_full);
let output_buf = Arc::clone(&process.lock().unwrap().output_buf);
let output_parse = Arc::clone(&process.lock().unwrap().output_parse);
let output_pub = Arc::clone(&process.lock().unwrap().output_pub);
thread::spawn(move || {
Self::read_pty(output_full, output_buf, reader);
Self::read_pty(output_parse, output_pub, reader, ProcessName::Xmrig);
});
let output_full = Arc::clone(&process.lock().unwrap().output_full);
let output_buf = Arc::clone(&process.lock().unwrap().output_buf);
// We don't parse anything in XMRigs output... yet.
// let output_parse = Arc::clone(&process.lock().unwrap().output_parse);
let output_pub = Arc::clone(&process.lock().unwrap().output_pub);
let client: hyper::Client<hyper::client::HttpConnector> = hyper::Client::builder().build(hyper::client::HttpConnector::new());
let start = process.lock().unwrap().start;
@ -929,13 +904,12 @@ impl Helper {
drop(lock);
// Check if logs need resetting
debug!("XMRig Watchdog | Attempting log reset check");
Self::check_reset_output_full(&output_full, ProcessName::Xmrig);
debug!("XMRig Watchdog | Attempting GUI log reset check");
Self::check_reset_gui_xmrig_output(&gui_api);
// Always update from output
debug!("XMRig Watchdog | Starting [update_from_output()]");
PubXmrigApi::update_from_output(&pub_api, &output_buf, start.elapsed());
PubXmrigApi::update_from_output(&pub_api, &output_pub, start.elapsed());
// Send an HTTP API request
debug!("XMRig Watchdog | Attempting HTTP API request...");
@ -1411,19 +1385,25 @@ impl PubP2poolApi {
}
// Mutate "watchdog"'s [PubP2poolApi] with data the process output.
// The [output] & [output_buf] are from the [Process] struct.
fn update_from_output(public: &Arc<Mutex<Self>>, output: &Arc<Mutex<String>>, output_buf: &Arc<Mutex<String>>, elapsed: std::time::Duration, regex: &P2poolRegex) {
fn update_from_output(public: &Arc<Mutex<Self>>, output_parse: &Arc<Mutex<String>>, output_pub: &Arc<Mutex<String>>, elapsed: std::time::Duration, regex: &P2poolRegex) {
// 1. Take the process's current output buffer and combine it with Pub (if not empty)
let mut output_buf = output_buf.lock().unwrap();
if !output_buf.is_empty() {
public.lock().unwrap().output.push_str(&std::mem::take(&mut *output_buf));
let mut output_pub = output_pub.lock().unwrap();
if !output_pub.is_empty() {
public.lock().unwrap().output.push_str(&std::mem::take(&mut *output_pub));
}
// 2. Parse the full STDOUT
let output = output.lock().unwrap();
let (payouts, xmr) = Self::calc_payouts_and_xmr(&output, regex);
let mut output_parse = output_parse.lock().unwrap();
let (payouts, xmr) = Self::calc_payouts_and_xmr(&output_parse, regex);
// 3. Throw away [output_parse]
output_parse.clear();
drop(output_parse);
let lock = public.lock().unwrap();
// 4. Add to current values
let (payouts, xmr) = (lock.payouts + payouts, lock.xmr + xmr);
drop(lock);
// 3. Calculate hour/day/month given elapsed time
// 5. Calculate hour/day/month given elapsed time
let elapsed_as_secs_f64 = elapsed.as_secs_f64();
// Payouts
let per_sec = (payouts as f64) / elapsed_as_secs_f64;
@ -1436,7 +1416,7 @@ impl PubP2poolApi {
let xmr_day = payouts_hour * 24.0;
let xmr_month = payouts_day * 30.0;
// 4. Mutate the struct with the new info
// 6. Mutate the struct with the new info
let mut public = public.lock().unwrap();
*public = Self {
uptime: HumanTime::into_human(elapsed),
@ -1590,13 +1570,15 @@ impl PubXmrigApi {
if !buf.is_empty() { gui_api.output.push_str(&buf); }
}
fn update_from_output(public: &Arc<Mutex<Self>>, output_buf: &Arc<Mutex<String>>, elapsed: std::time::Duration) {
// This combines the buffer from the PTY thread [output_pub]
// with the actual [PubApiXmrig] output field.
fn update_from_output(public: &Arc<Mutex<Self>>, output_pub: &Arc<Mutex<String>>, elapsed: std::time::Duration) {
// 1. Take process output buffer if not empty
let mut output_buf = output_buf.lock().unwrap();
let mut output_pub = output_pub.lock().unwrap();
let mut public = public.lock().unwrap();
// 2. Append
if !output_buf.is_empty() {
public.output.push_str(&std::mem::take(&mut *output_buf));
if !output_pub.is_empty() {
public.output.push_str(&std::mem::take(&mut *output_pub));
}
// 3. Update uptime
public.uptime = HumanTime::into_human(elapsed);

View file

@ -972,13 +972,13 @@ impl eframe::App for App {
let p2pool = self.p2pool.lock().unwrap();
let p2pool_is_alive = p2pool.is_alive();
let p2pool_is_waiting = p2pool.is_waiting();
let p2pool_state = ProcessState::Alive;
let p2pool_state = p2pool.state;
drop(p2pool);
debug!("App | Locking and collecting XMRig state...");
let xmrig = self.xmrig.lock().unwrap();
let xmrig_is_alive = xmrig.is_alive();
let xmrig_is_waiting = xmrig.is_waiting();
let xmrig_state = ProcessState::Alive;
let xmrig_state = xmrig.state;
drop(xmrig);
// This sets the top level Ui dimensions.
@ -1487,6 +1487,7 @@ impl eframe::App for App {
ui.add_sized([width, height], Hyperlink::from_label_and_url("Made by hinto-janaiyo".to_string(), "https://gupax.io"));
ui.add_sized([width, height], Label::new("egui is licensed under MIT & Apache-2.0"));
ui.add_sized([width, height], Label::new("Gupax, P2Pool, and XMRig are licensed under GPLv3"));
if cfg!(debug_assertions) { ui.label(format!("Gupax is running in debug mode - {}", self.now.elapsed().as_secs_f64())); }
});
}
Tab::Status => {