begin testin fts message retry

This commit is contained in:
creating2morrow 2023-05-02 01:11:15 -04:00
parent d14fa2c6a6
commit 68b9a4e56f
4 changed files with 35 additions and 39 deletions

View file

@ -118,17 +118,14 @@ pub fn find_all() -> Vec<Message> {
/// Tx message /// Tx message
async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> { async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> {
// TODO(c2m): Error handling for http 402 status // TODO(c2m): Error handling for http 402 status
let host = utils::get_i2p_http_proxy(); let host = utils::get_i2p_http_proxy();
let proxy = reqwest::Proxy::http(&host)?; let proxy = reqwest::Proxy::http(&host)?;
let client = reqwest::Client::builder().proxy(proxy).build(); let client = reqwest::Client::builder().proxy(proxy).build();
// TODO(?): Need some assistance with message retry logic
// check if the contact is online // check if the contact is online
// let is_online: bool = is_contact_online(String::from(jwp)).await.unwrap_or(false); let is_online: bool = is_contact_online(&out.to, String::from(jwp)).await.unwrap_or(false);
// if is_online { if is_online {
return match client?.post(format!("http://{}/message/rx", out.to)) return match client?.post(format!("http://{}/message/rx", out.to))
.header("proof", jwp).json(&out).send().await { .header("proof", jwp).json(&out).send().await {
Ok(response) => { Ok(response) => {
@ -138,7 +135,7 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> {
Ok(r) => { Ok(r) => {
if r.contains("402") { error!("Payment required"); } if r.contains("402") { error!("Payment required"); }
// remove the mid from fts if necessary // remove the mid from fts if necessary
// remove_from_retry(String::from(&out.mid)); remove_from_retry(String::from(&out.mid));
Ok(()) Ok(())
}, },
_ => Ok(()), _ => Ok(()),
@ -149,10 +146,10 @@ async fn send_message(out: &Message, jwp: &str) -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
} }
// } else { } else {
// send_to_retry(String::from(&out.mid)).await; send_to_retry(String::from(&out.mid)).await;
// Ok(()) Ok(())
// } }
} }
/// Returns decrypted hex string of the encrypted message /// Returns decrypted hex string of the encrypted message
@ -169,14 +166,12 @@ pub fn delete(mid: &String) {
db::Interface::delete(&s.env, &s.handle, &String::from(mid)); db::Interface::delete(&s.env, &s.handle, &String::from(mid));
} }
/* TODO(?): failed-to-send (fts) needs some work to say the least
/// ping the contact health check over i2p /// ping the contact health check over i2p
async fn is_contact_online(jwp: String) -> Result<bool, Box<dyn Error>> { async fn is_contact_online(contact: &String, jwp: String) -> Result<bool, Box<dyn Error>> {
let host = utils::get_i2p_http_proxy(); let host = utils::get_i2p_http_proxy();
let proxy = reqwest::Proxy::http(&host)?; let proxy = reqwest::Proxy::http(&host)?;
let client = reqwest::Client::builder().proxy(proxy).build(); let client = reqwest::Client::builder().proxy(proxy).build();
match client?.get(format!("http://{}/xmr/rpc/version", host)) match client?.get(format!("http://{}/xmr/rpc/version", contact))
.header("proof", jwp).send().await { .header("proof", jwp).send().await {
Ok(response) => { Ok(response) => {
let res = response.json::<reqres::XmrRpcVersionResponse>().await; let res = response.json::<reqres::XmrRpcVersionResponse>().await;
@ -198,14 +193,16 @@ async fn is_contact_online(jwp: String) -> Result<bool, Box<dyn Error>> {
/// stage message for async retry /// stage message for async retry
async fn send_to_retry(mid: String) { async fn send_to_retry(mid: String) {
info!("sending {} to fts", &mid);
let s = db::Interface::open(); let s = db::Interface::open();
// in order to retrieve FTS (failed-to-send), write keys to with fts // in order to retrieve FTS (failed-to-send), write keys to db with fts
let list_key = format!("fts"); let list_key = format!("fts");
let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key)); let r = db::Interface::read(&s.env, &s.handle, &String::from(&list_key));
if r == utils::empty_string() { if r == utils::empty_string() {
debug!("creating fts message index"); debug!("creating fts message index");
} }
let mut msg_list = [String::from(&r), String::from(&mid)].join(","); let mut msg_list = [String::from(&r), String::from(&mid)].join(",");
// don't duplicate message ids in fts
if String::from(&r).contains(&String::from(&mid)) { if String::from(&r).contains(&String::from(&mid)) {
msg_list = r; msg_list = r;
} }
@ -215,6 +212,7 @@ async fn send_to_retry(mid: String) {
/// clear fts message from index /// clear fts message from index
fn remove_from_retry(mid: String) { fn remove_from_retry(mid: String) {
info!("removing id {} from fts", &mid);
let s = db::Interface::open(); let s = db::Interface::open();
// in order to retrieve FTS (failed-to-send), write keys to with fts // in order to retrieve FTS (failed-to-send), write keys to with fts
let list_key = format!("fts"); let list_key = format!("fts");
@ -233,6 +231,7 @@ fn remove_from_retry(mid: String) {
pub async fn retry_fts() { pub async fn retry_fts() {
let tick: std::sync::mpsc::Receiver<()> = schedule_recv::periodic_ms(60000); let tick: std::sync::mpsc::Receiver<()> = schedule_recv::periodic_ms(60000);
loop { loop {
debug!("running retry failed-to-send thread");
tick.recv().unwrap(); tick.recv().unwrap();
let s = db::Interface::open(); let s = db::Interface::open();
let list_key = format!("fts"); let list_key = format!("fts");
@ -254,5 +253,3 @@ pub async fn retry_fts() {
} }
} }
} }
*/

View file

@ -1,7 +1,7 @@
use rand_core::RngCore; use rand_core::RngCore;
use clap::Parser; use clap::Parser;
use rocket::serde::json::Json; use rocket::serde::json::Json;
use crate::{args, db, i2p, models, monero, gpg, utils, reqres}; use crate::{args, db, i2p, message, models, monero, gpg, utils, reqres};
use log::{info, debug, error, warn}; use log::{info, debug, error, warn};
use std::time::Duration; use std::time::Duration;
@ -332,7 +332,7 @@ pub async fn start_up() {
gen_app_gpg().await; gen_app_gpg().await;
let env: String = get_release_env().value(); let env: String = get_release_env().value();
start_gui(); start_gui();
// { tokio::spawn(async { message::retry_fts().await; }); } { tokio::spawn(async { message::retry_fts().await; }); }
info!("{} - nevmes is online", env); info!("{} - nevmes is online", env);
} }

View file

@ -3,29 +3,27 @@ use rocket::response::status::Custom;
use rocket::serde::json::Json; use rocket::serde::json::Json;
use rocket::{get, post}; use rocket::{get, post};
use nevmes_core::{contact, i2p, message, models, monero, reqres, proof}; use nevmes_core::{contact, i2p, message, models, monero, proof, reqres};
// JSON APIs exposed over i2p // JSON APIs exposed over i2p
/// Get payment API version /// Get payment API version
/// ///
/// Protected: false /// Protected: false
/// ///
/// This also functions as a health check /// This also functions as a health check
#[get("/version")] #[get("/version")]
pub async fn get_version pub async fn get_version(_jwp: proof::PaymentProof) -> Custom<Json<reqres::XmrRpcVersionResponse>> {
(_jwp: proof::PaymentProof) -> Custom<Json<reqres::XmrRpcVersionResponse>> {
Custom(Status::Ok, Json(monero::get_version().await)) Custom(Status::Ok, Json(monero::get_version().await))
} }
/// If i2p not in the state of rejecting tunnels this will return `open: true` /// If i2p not in the state of rejecting tunnels this will return `open: true`
/// ///
/// Protected: false /// Protected: false
/// ///
/// This also functions as a health check /// This also functions as a health check
#[get("/status")] #[get("/status")]
pub async fn get_i2p_status pub async fn get_i2p_status() -> Custom<Json<i2p::HttpProxyStatus>> {
() -> Custom<Json<i2p::HttpProxyStatus>> {
Custom(Status::Ok, Json(i2p::get_proxy_status().await)) Custom(Status::Ok, Json(i2p::get_proxy_status().await))
} }
@ -39,17 +37,19 @@ pub async fn share_contact_info() -> Custom<Json<models::Contact>> {
} }
/// Recieve messages here /// Recieve messages here
/// ///
/// Protected: true /// Protected: true
#[post("/", data="<message>")] #[post("/", data = "<message>")]
pub async fn rx_message pub async fn rx_message(
(_jwp: proof::PaymentProof, message: Json<models::Message>) -> Custom<Json<models::Message>> { _jwp: proof::PaymentProof,
message: Json<models::Message>,
) -> Custom<Json<models::Message>> {
message::rx(message).await; message::rx(message).await;
Custom(Status::Ok, Json(Default::default()),) Custom(Status::Ok, Json(Default::default()))
} }
/// invoice generation /// invoice generation
/// ///
/// Protected: false /// Protected: false
#[get("/")] #[get("/")]
pub async fn gen_invoice() -> Custom<Json<reqres::Invoice>> { pub async fn gen_invoice() -> Custom<Json<reqres::Invoice>> {
@ -58,11 +58,10 @@ pub async fn gen_invoice() -> Custom<Json<reqres::Invoice>> {
} }
/// jwp generation /// jwp generation
/// ///
/// Protected: false /// Protected: false
#[post("/", data="<proof>")] #[post("/", data = "<proof>")]
pub async fn gen_jwp pub async fn gen_jwp(proof: Json<proof::TxProof>) -> Custom<Json<reqres::Jwp>> {
(proof: Json<proof::TxProof>) -> Custom<Json<reqres::Jwp>> {
let jwp = proof::create_jwp(&proof).await; let jwp = proof::create_jwp(&proof).await;
Custom(Status::Ok, Json(reqres::Jwp { jwp })) Custom(Status::Ok, Json(reqres::Jwp { jwp }))
} }

View file

@ -1,8 +1,8 @@
#[macro_use] #[macro_use]
extern crate rocket; extern crate rocket;
use nevmes_core::*;
use nevmes::*; use nevmes::*;
use nevmes_core::*;
// The only changes in here should be mounting new controller methods // The only changes in here should be mounting new controller methods
#[launch] #[launch]