From 27b682ba69bda30594f5bf8861175a1c1c6b4ca1 Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Tue, 25 Jun 2024 22:07:02 -0400 Subject: [PATCH] Fix ZeroMQ invalid REQ/REP state after read timeout. (#125) After a read timeout, the thread-local ZMQ client is expecting a read, not a send, on the REQ socket. This attempts to read+discard 1 message before retrying the REQ send. This has proven effective at recovering from a temporarily unavailable daemon. --- src/rest_server.cpp | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/rest_server.cpp b/src/rest_server.cpp index c75166a..92235eb 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -47,6 +47,7 @@ #include "net/http_base.h" // monero/contrib/epee/include #include "net/net_parse_helpers.h" // monero/contrib/epee/include #include "net/net_ssl.h" // monero/contrib/epee/include +#include "net/zmq.h" // monero/src #include "rpc/admin.h" #include "rpc/client.h" #include "rpc/daemon_messages.h" // monero/src @@ -82,6 +83,21 @@ namespace lws return {new_ptr}; } + expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout) + { + expect<void> resp{common_error::kInvalidArgument}; + for (unsigned i = 0; i < 2; ++i) + { + resp = tclient.send(message.clone(), timeout); + if (resp || resp != net::zmq::make_error_code(EFSM)) + break; + // fix state machine by reading+discarding previously timed out response + if (tclient.get_message(timeout).matches(std::errc::timed_out)) + return {error::daemon_timeout}; + } + return resp; + } + struct context : epee::net_utils::connection_context_base { context() @@ -402,7 +418,7 @@ namespace lws histogram_req.recent_cutoff = 0; epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req); - MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10})); auto
histogram_resp = (*tclient)->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); if (!histogram_resp) @@ -430,7 +446,7 @@ namespace lws epee::byte_slice msg = rpc::client::make_message("get_output_distribution", distribution_req); - MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10})); auto distribution_resp = (*tclient)->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); @@ -476,7 +492,7 @@ namespace lws keys_req.outputs = std::move(ids); epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req); - MONERO_CHECK(tclient->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK(send_with_retry(*tclient, std::move(msg), std::chrono::seconds{10})); auto keys_resp = tclient->receive(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION); if (!keys_resp) @@ -541,7 +557,7 @@ namespace lws rpc_command::Request req{}; req.num_grace_blocks = 10; epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req); - MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10})); } if ((req.use_dust && *req.use_dust) || !req.dust_threshold) @@ -772,7 +788,7 @@ namespace lws daemon_req.tx_as_hex = std::move(req.tx); epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req); - MONERO_CHECK((*tclient)->send(std::move(message), std::chrono::seconds{10})); + MONERO_CHECK(send_with_retry(**tclient, std::move(message), std::chrono::seconds{10})); const auto daemon_resp = (*tclient)->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); if (!daemon_resp)