successful fts rebroadcated message

This commit is contained in:
creating2morrow 2023-05-09 02:55:55 -04:00
parent a7135a53f7
commit 827f19a1c7

View file

@ -2,6 +2,7 @@
use crate::{contact, db, models::*, utils, reqres, i2p, gpg};
use std::error::Error;
use log::{debug, error, info};
use reqwest::StatusCode;
use rocket::serde::json::Json;
/// Create a new message
@ -119,7 +120,7 @@ pub fn find_all() -> Vec<Message> {
/// Tx message
async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> {
// TODO(c2m): Error handling for http 402 status
let host = utils::get_i2p_http_proxy();
let proxy = reqwest::Proxy::http(&host)?;
let client = reqwest::Client::builder().proxy(proxy).build();
@ -130,16 +131,13 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> {
return match client?.post(format!("http://{}/message/rx", out.to))
.header("proof", jwp).json(&out).send().await {
Ok(response) => {
let res = response.text().await;
debug!("send response: {:?}", res);
match res {
Ok(r) => {
if r.contains("402") { error!("Payment required"); }
// remove the mid from fts if necessary
remove_from_fts(String::from(&out.mid));
Ok(())
},
_ => Ok(()),
let status = response.status();
debug!("send response: {:?}", status.as_str());
if status == StatusCode::OK || status == StatusCode::PAYMENT_REQUIRED {
remove_from_fts(String::from(&out.mid));
return Ok(())
} else {
Ok(())
}
}
Err(e) => {
@ -179,7 +177,6 @@ async fn is_contact_online(contact: &String, jwp: String) -> Result<bool, Box<dy
debug!("check is contact online by version response: {:?}", res);
match res {
Ok(r) => {
if r.result.version == 0 { error!("Payment required"); }
if r.result.version != 0 { Ok(true) } else { Ok(false) }
},
_ => Ok(false),
@ -209,10 +206,14 @@ async fn send_to_retry(mid: String) {
}
debug!("writing fts message index {} for id: {}", msg_list, list_key);
db::Interface::write(&s.env, &s.handle, &String::from(list_key), &msg_list);
// restart fts if is empty
// restart fts if not empty
let list_key = format!("fts");
let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key));
if r == utils::empty_string() {
let v_mid = r.split(",");
let v: Vec<String> = v_mid.map(|s| String::from(s)).collect();
debug!("fts contents: {:#?}", v);
let cleared = is_fts_clear(r);
if !cleared {
debug!("restarting fts");
utils::restart_retry_fts();
}
@ -254,6 +255,14 @@ pub async fn retry_fts() {
}
let v_mid = r.split(",");
let v: Vec<String> = v_mid.map(|s| String::from(s)).collect();
debug!("fts contents: {:#?}", v);
let cleared = is_fts_clear(r);
if cleared {
// index was created but cleared
info!("terminating retry fts thread");
db::Interface::delete(&s.env, &s.handle, "fts");
break;
}
for m in v {
let message: Message = find(&m);
if message.mid != utils::empty_string() {
@ -279,3 +288,11 @@ fn validate_message(j: &Json<Message>) -> bool {
&& j.to == i2p::get_destination()
&& j.uid .len() < utils::string_limit()
}
fn is_fts_clear(r: String) -> bool {
let v_mid = r.split(",");
let v: Vec<String> = v_mid.map(|s| String::from(s)).collect();
debug!("fts contents: {:#?}", v);
v.len() >= 2 && v[v.len()-1] == utils::empty_string()
&& v[0] == utils::empty_string()
}