From 827f19a1c79f1115f5a3532a807d1ee9790633d6 Mon Sep 17 00:00:00 2001 From: creating2morrow Date: Tue, 9 May 2023 02:55:55 -0400 Subject: [PATCH] successful fts rebroadcated message --- nevmes-core/src/message.rs | 45 ++++++++++++++++++++++++++------------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/nevmes-core/src/message.rs b/nevmes-core/src/message.rs index 067f800..431c700 100644 --- a/nevmes-core/src/message.rs +++ b/nevmes-core/src/message.rs @@ -2,6 +2,7 @@ use crate::{contact, db, models::*, utils, reqres, i2p, gpg}; use std::error::Error; use log::{debug, error, info}; +use reqwest::StatusCode; use rocket::serde::json::Json; /// Create a new message @@ -119,7 +120,7 @@ pub fn find_all() -> Vec { /// Tx message async fn send_message(out: &Message, jwp: &str) -> Result<(), Box> { - // TODO(c2m): Error handling for http 402 status + let host = utils::get_i2p_http_proxy(); let proxy = reqwest::Proxy::http(&host)?; let client = reqwest::Client::builder().proxy(proxy).build(); @@ -130,16 +131,13 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box> { return match client?.post(format!("http://{}/message/rx", out.to)) .header("proof", jwp).json(&out).send().await { Ok(response) => { - let res = response.text().await; - debug!("send response: {:?}", res); - match res { - Ok(r) => { - if r.contains("402") { error!("Payment required"); } - // remove the mid from fts if necessary - remove_from_fts(String::from(&out.mid)); - Ok(()) - }, - _ => Ok(()), + let status = response.status(); + debug!("send response: {:?}", status.as_str()); + if status == StatusCode::OK || status == StatusCode::PAYMENT_REQUIRED { + remove_from_fts(String::from(&out.mid)); + return Ok(()) + } else { + Ok(()) } } Err(e) => { @@ -179,7 +177,6 @@ async fn is_contact_online(contact: &String, jwp: String) -> Result { - if r.result.version == 0 { error!("Payment required"); } if r.result.version != 0 { Ok(true) } else { Ok(false) } }, _ => Ok(false), @@ -209,10 +206,14 @@ async fn send_to_retry(mid: String) { } debug!("writing fts message index {} for id: {}", msg_list, list_key); db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list); - // restart fts if is empty + // restart fts if not empty let list_key = format!("fts"); let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key)); - if r == utils::empty_string() { + let v_mid = r.split(","); + let v: Vec = v_mid.map(|s| String::from(s)).collect(); + debug!("fts contents: {:#?}", v); + let cleared = is_fts_clear(r); + if !cleared { debug!("restarting fts"); utils::restart_retry_fts(); } @@ -254,6 +255,14 @@ pub async fn retry_fts() { } let v_mid = r.split(","); let v: Vec = v_mid.map(|s| String::from(s)).collect(); + debug!("fts contents: {:#?}", v); + let cleared = is_fts_clear(r); + if cleared { + // index was created but cleared + info!("terminating retry fts thread"); + db::Interface::delete(&s.env, &s.handle, "fts"); + break; + } for m in v { let message: Message = find(&m); if message.mid != utils::empty_string() { @@ -279,3 +288,11 @@ fn validate_message(j: &Json) -> bool { && j.to == i2p::get_destination() && j.uid .len() < utils::string_limit() } + +fn is_fts_clear(r: String) -> bool { + let v_mid = r.split(","); + let v: Vec = v_mid.map(|s| String::from(s)).collect(); + debug!("fts contents: {:#?}", v); + v.len() >= 2 && v[v.len()-1] == utils::empty_string() + && v[0] == utils::empty_string() +}