From edcd0f3d97de4cbd251b64bb1ff3f7460e6ec33b Mon Sep 17 00:00:00 2001 From: creating2morrow Date: Thu, 4 May 2023 10:37:11 -0400 Subject: [PATCH] terminate fts thread when empty --- nevmes-core/src/args.rs | 7 +++++++ nevmes-core/src/message.rs | 18 +++++++++++++++--- nevmes-core/src/utils.rs | 15 ++++++++++++++- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/nevmes-core/src/args.rs b/nevmes-core/src/args.rs index 8df0d83..225e5ce 100644 --- a/nevmes-core/src/args.rs +++ b/nevmes-core/src/args.rs @@ -124,4 +124,11 @@ pub struct Args { default_value = "false" )] pub remote_access: bool, + /// Remove all failed-to-send messages from db on app startup + #[arg( + long, + help = "this will clear failed-to-send messages from the databse", + default_value = "false" + )] + pub clear_fts: bool, } diff --git a/nevmes-core/src/message.rs b/nevmes-core/src/message.rs index c99c057..517532f 100644 --- a/nevmes-core/src/message.rs +++ b/nevmes-core/src/message.rs @@ -136,7 +136,7 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box> { Ok(r) => { if r.contains("402") { error!("Payment required"); } // remove the mid from fts if necessary - remove_from_retry(String::from(&out.mid)); + remove_from_fts(String::from(&out.mid)); Ok(()) }, _ => Ok(()), @@ -209,10 +209,17 @@ async fn send_to_retry(mid: String) { } debug!("writing fts message index {} for id: {}", msg_list, list_key); db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list); + // restart fts if is empty + let list_key = format!("fts"); + let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key)); + if r == utils::empty_string() { + debug!("restarting fts"); + utils::restart_retry_fts(); + } } /// clear fts message from index -fn remove_from_retry(mid: String) { +fn remove_from_fts(mid: String) { info!("removing id {} from fts", &mid); let s = db::Interface::open(); // in order to retrieve FTS (failed-to-send), write keys to with fts @@ -228,7 +235,11 @@ fn remove_from_retry(mid: String) { db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list); } -/// triggered on app startup, retries to send fts every minute +/// Triggered on app startup, retries to send fts every minute +/// +/// FTS thread terminates when empty and gets restarted on the next +/// +/// failed-to-send message. pub async fn retry_fts() { let tick: std::sync::mpsc::Receiver<()> = schedule_recv::periodic_ms(60000); loop { @@ -239,6 +250,7 @@ pub async fn retry_fts() { let r = db::Interface::read(&s.env, &s.handle, &String::from(list_key)); if r == utils::empty_string() { info!("fts message index not found"); + break; // terminate fts if no message to send } let v_mid = r.split(","); let v: Vec = v_mid.map(|s| String::from(s)).collect(); diff --git a/nevmes-core/src/utils.rs b/nevmes-core/src/utils.rs index 4eb839f..5e45a0b 100644 --- a/nevmes-core/src/utils.rs +++ b/nevmes-core/src/utils.rs @@ -334,7 +334,8 @@ fn start_gui() { pub async fn start_up() { info!("nevmes is starting up"); let args = args::Args::parse(); - if args.remote_access { start_micro_servers(); } + if args.remote_access { start_micro_servers(); } + if args.clear_fts { clear_fts(); } gen_signing_keys(); if !is_using_remote_node() { monero::start_daemon(); } create_wallet_dir(); @@ -386,6 +387,18 @@ pub fn kill_child_processes(cm: bool) { debug!("{:?}", i2pz_output.stdout); } +/// We can restart fts from since it gets terminated when empty +pub fn restart_retry_fts() { + tokio::spawn(async move { message::retry_fts().await; }); +} + +/// Called on app startup if `--clear-fts` flag is passed. +fn clear_fts() { + info!("clear fts"); + let s = db::Interface::open(); + db::Interface::delete(&s.env, &s.handle, "fts"); +} + /// Move temp files to /tmp pub fn stage_cleanup(f: String) { info!("staging {} for cleanup", &f);