terminate fts thread when empty

This commit is contained in:
creating2morrow 2023-05-04 10:37:11 -04:00
parent 15111ebc6d
commit edcd0f3d97
3 changed files with 36 additions and 4 deletions

View file

@ -124,4 +124,11 @@ pub struct Args {
default_value = "false" default_value = "false"
)] )]
pub remote_access: bool, pub remote_access: bool,
/// Remove all failed-to-send messages from db on app startup
#[arg(
long,
help = "this will clear failed-to-send messages from the databse",
default_value = "false"
)]
pub clear_fts: bool,
} }

View file

@ -136,7 +136,7 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> {
Ok(r) => { Ok(r) => {
if r.contains("402") { error!("Payment required"); } if r.contains("402") { error!("Payment required"); }
// remove the mid from fts if necessary // remove the mid from fts if necessary
remove_from_retry(String::from(&out.mid)); remove_from_fts(String::from(&out.mid));
Ok(()) Ok(())
}, },
_ => Ok(()), _ => Ok(()),
@ -209,10 +209,17 @@ async fn send_to_retry(mid: String) {
} }
debug!("writing fts message index {} for id: {}", msg_list, list_key); debug!("writing fts message index {} for id: {}", msg_list, list_key);
db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list); db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list);
// restart fts if is empty
let list_key = format!("fts");
let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key));
if r == utils::empty_string() {
debug!("restarting fts");
utils::restart_retry_fts();
}
} }
/// clear fts message from index /// clear fts message from index
fn remove_from_retry(mid: String) { fn remove_from_fts(mid: String) {
info!("removing id {} from fts", &mid); info!("removing id {} from fts", &mid);
let s = db::Interface::open(); let s = db::Interface::open();
// in order to retrieve FTS (failed-to-send), write keys to with fts // in order to retrieve FTS (failed-to-send), write keys to with fts
@ -228,7 +235,11 @@ fn remove_from_retry(mid: String) {
db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list); db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list);
} }
/// triggered on app startup, retries to send fts every minute /// Triggered on app startup, retries to send fts every minute
///
/// FTS thread terminates when empty and gets restarted on the next
///
/// failed-to-send message.
pub async fn retry_fts() { pub async fn retry_fts() {
let tick: std::sync::mpsc::Receiver<()> = schedule_recv::periodic_ms(60000); let tick: std::sync::mpsc::Receiver<()> = schedule_recv::periodic_ms(60000);
loop { loop {
@ -239,6 +250,7 @@ pub async fn retry_fts() {
let r = db::Interface::read(&s.env, &s.handle, &String::from(list_key)); let r = db::Interface::read(&s.env, &s.handle, &String::from(list_key));
if r == utils::empty_string() { if r == utils::empty_string() {
info!("fts message index not found"); info!("fts message index not found");
break; // terminate fts if no message to send
} }
let v_mid = r.split(","); let v_mid = r.split(",");
let v: Vec<String> = v_mid.map(|s| String::from(s)).collect(); let v: Vec<String> = v_mid.map(|s| String::from(s)).collect();

View file

@ -335,6 +335,7 @@ pub async fn start_up() {
info!("nevmes is starting up"); info!("nevmes is starting up");
let args = args::Args::parse(); let args = args::Args::parse();
if args.remote_access { start_micro_servers(); } if args.remote_access { start_micro_servers(); }
if args.clear_fts { clear_fts(); }
gen_signing_keys(); gen_signing_keys();
if !is_using_remote_node() { monero::start_daemon(); } if !is_using_remote_node() { monero::start_daemon(); }
create_wallet_dir(); create_wallet_dir();
@ -386,6 +387,18 @@ pub fn kill_child_processes(cm: bool) {
debug!("{:?}", i2pz_output.stdout); debug!("{:?}", i2pz_output.stdout);
} }
/// We can restart fts from since it gets terminated when empty
pub fn restart_retry_fts() {
tokio::spawn(async move { message::retry_fts().await; });
}
/// Called on app startup if `--clear-fts` flag is passed.
fn clear_fts() {
info!("clear fts");
let s = db::Interface::open();
db::Interface::delete(&s.env, &s.handle, "fts");
}
/// Move temp files to /tmp /// Move temp files to /tmp
pub fn stage_cleanup(f: String) { pub fn stage_cleanup(f: String) {
info!("staging {} for cleanup", &f); info!("staging {} for cleanup", &f);