From 68b9a4e56fd8bbe61143867b235158099d3a0d1d Mon Sep 17 00:00:00 2001 From: creating2morrow Date: Tue, 2 May 2023 01:11:15 -0400 Subject: [PATCH] begin testin fts message retry --- nevmes-core/src/message.rs | 31 ++++++++++++++----------------- nevmes-core/src/utils.rs | 4 ++-- src/controller.rs | 37 ++++++++++++++++++------------------- src/main.rs | 2 +- 4 files changed, 35 insertions(+), 39 deletions(-) diff --git a/nevmes-core/src/message.rs b/nevmes-core/src/message.rs index f129d6e..606f9bb 100644 --- a/nevmes-core/src/message.rs +++ b/nevmes-core/src/message.rs @@ -118,17 +118,14 @@ pub fn find_all() -> Vec { /// Tx message async fn send_message(out: &Message, jwp: &str) -> Result<(), Box> { - // TODO(c2m): Error handling for http 402 status let host = utils::get_i2p_http_proxy(); let proxy = reqwest::Proxy::http(&host)?; let client = reqwest::Client::builder().proxy(proxy).build(); - // TODO(?): Need some assistance with message retry logic - // check if the contact is online - // let is_online: bool = is_contact_online(String::from(jwp)).await.unwrap_or(false); - // if is_online { + let is_online: bool = is_contact_online(&out.to, String::from(jwp)).await.unwrap_or(false); + if is_online { return match client?.post(format!("http://{}/message/rx", out.to)) .header("proof", jwp).json(&out).send().await { Ok(response) => { @@ -138,7 +135,7 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box> { Ok(r) => { if r.contains("402") { error!("Payment required"); } // remove the mid from fts if necessary - // remove_from_retry(String::from(&out.mid)); + remove_from_retry(String::from(&out.mid)); Ok(()) }, _ => Ok(()), @@ -149,10 +146,10 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box> { Ok(()) } } - // } else { - // send_to_retry(String::from(&out.mid)).await; - // Ok(()) - // } + } else { + send_to_retry(String::from(&out.mid)).await; + Ok(()) + } } /// Returns decrypted hex string of the encrypted message @@ -169,14 +166,12 @@ pub fn delete(mid: &String) { db::Interface::delete(&s.env, &s.handle, &String::from(mid)); } -/* TODO(?): failed-to-send (fts) needs some work to say the least - /// ping the contact health check over i2p -async fn is_contact_online(jwp: String) -> Result> { +async fn is_contact_online(contact: &String, jwp: String) -> Result> { let host = utils::get_i2p_http_proxy(); let proxy = reqwest::Proxy::http(&host)?; let client = reqwest::Client::builder().proxy(proxy).build(); - match client?.get(format!("http://{}/xmr/rpc/version", host)) + match client?.get(format!("http://{}/xmr/rpc/version", contact)) .header("proof", jwp).send().await { Ok(response) => { let res = response.json::().await; @@ -198,14 +193,16 @@ async fn is_contact_online(jwp: String) -> Result> { /// stage message for async retry async fn send_to_retry(mid: String) { + info!("sending {} to fts", &mid); let s = db::Interface::open(); - // in order to retrieve FTS (failed-to-send), write keys to with fts + // in order to retrieve FTS (failed-to-send), write keys to db with fts let list_key = format!("fts"); let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key)); if r == utils::empty_string() { debug!("creating fts message index"); } let mut msg_list = [String::from(&r), String::from(&mid)].join(","); + // don't duplicate message ids in fts if String::from(&r).contains(&String::from(&mid)) { msg_list = r; } @@ -215,6 +212,7 @@ async fn send_to_retry(mid: String) { /// clear fts message from index fn remove_from_retry(mid: String) { + info!("removing id {} from fts", &mid); let s = db::Interface::open(); // in order to retrieve FTS (failed-to-send), write keys to with fts let list_key = format!("fts"); @@ -233,6 +231,7 @@ fn remove_from_retry(mid: String) { pub async fn retry_fts() { let tick: std::sync::mpsc::Receiver<()> = schedule_recv::periodic_ms(60000); loop { + debug!("running retry failed-to-send thread"); tick.recv().unwrap(); let s = db::Interface::open(); let list_key = format!("fts"); @@ -254,5 +253,3 @@ pub async fn retry_fts() { } } } - -*/ \ No newline at end of file diff --git a/nevmes-core/src/utils.rs b/nevmes-core/src/utils.rs index aedb36a..7657845 100644 --- a/nevmes-core/src/utils.rs +++ b/nevmes-core/src/utils.rs @@ -1,7 +1,7 @@ use rand_core::RngCore; use clap::Parser; use rocket::serde::json::Json; -use crate::{args, db, i2p, models, monero, gpg, utils, reqres}; +use crate::{args, db, i2p, message, models, monero, gpg, utils, reqres}; use log::{info, debug, error, warn}; use std::time::Duration; @@ -332,7 +332,7 @@ pub async fn start_up() { gen_app_gpg().await; let env: String = get_release_env().value(); start_gui(); - // { tokio::spawn(async { message::retry_fts().await; }); } + { tokio::spawn(async { message::retry_fts().await; }); } info!("{} - nevmes is online", env); } diff --git a/src/controller.rs b/src/controller.rs index a97100b..8797f0e 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -3,29 +3,27 @@ use rocket::response::status::Custom; use rocket::serde::json::Json; use rocket::{get, post}; -use nevmes_core::{contact, i2p, message, models, monero, reqres, proof}; +use nevmes_core::{contact, i2p, message, models, monero, proof, reqres}; // JSON APIs exposed over i2p /// Get payment API version -/// +/// /// Protected: false -/// +/// /// This also functions as a health check #[get("/version")] -pub async fn get_version -(_jwp: proof::PaymentProof) -> Custom> { +pub async fn get_version(_jwp: proof::PaymentProof) -> Custom> { Custom(Status::Ok, Json(monero::get_version().await)) } /// If i2p not in the state of rejecting tunnels this will return `open: true` -/// +/// /// Protected: false -/// +/// /// This also functions as a health check #[get("/status")] -pub async fn get_i2p_status -() -> Custom> { +pub async fn get_i2p_status() -> Custom> { Custom(Status::Ok, Json(i2p::get_proxy_status().await)) } @@ -39,17 +37,19 @@ pub async fn share_contact_info() -> Custom> { } /// Recieve messages here -/// +/// /// Protected: true -#[post("/", data="")] -pub async fn rx_message -(_jwp: proof::PaymentProof, message: Json) -> Custom> { +#[post("/", data = "")] +pub async fn rx_message( + _jwp: proof::PaymentProof, + message: Json, +) -> Custom> { message::rx(message).await; - Custom(Status::Ok, Json(Default::default()),) + Custom(Status::Ok, Json(Default::default())) } /// invoice generation -/// +/// /// Protected: false #[get("/")] pub async fn gen_invoice() -> Custom> { @@ -58,11 +58,10 @@ pub async fn gen_invoice() -> Custom> { } /// jwp generation -/// +/// /// Protected: false -#[post("/", data="")] -pub async fn gen_jwp -(proof: Json) -> Custom> { +#[post("/", data = "")] +pub async fn gen_jwp(proof: Json) -> Custom> { let jwp = proof::create_jwp(&proof).await; Custom(Status::Ok, Json(reqres::Jwp { jwp })) } diff --git a/src/main.rs b/src/main.rs index ca8510a..14e7d7b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,8 @@ #[macro_use] extern crate rocket; -use nevmes_core::*; use nevmes::*; +use nevmes_core::*; // The only changes in here should be mounting new controller methods #[launch]