From 6dc9ea772ab4300a235dab99ab5987b483be91a9 Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Wed, 26 Jun 2024 14:08:01 -0400 Subject: [PATCH] Fix ZeroMQ REQ send error after remote process crash/abort (#126) --- src/rest_server.cpp | 56 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/src/rest_server.cpp b/src/rest_server.cpp index 92235eb..6efa4d7 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -66,21 +66,46 @@ namespace lws namespace { namespace http = epee::net_utils::http; + constexpr const std::chrono::seconds reconnect_backoff{10}; - expect thread_client(const rpc::client& gclient) + expect thread_client(const rpc::client& gclient, const bool reset = false) { - static boost::thread_specific_ptr global; - rpc::client* thread_ptr = global.get(); - if (thread_ptr) - return {thread_ptr}; + struct tclient + { + rpc::client client; + std::chrono::steady_clock::time_point last_connect; - expect new_client = gclient.clone(); - if (!new_client) - return new_client.error(); + explicit tclient() noexcept + : client(), last_connect(std::chrono::seconds{0}) + {} + }; + static boost::thread_specific_ptr global; - rpc::client* const new_ptr = new rpc::client{std::move(*new_client)}; - global.reset(new_ptr); - return {new_ptr}; + tclient* thread_ptr = global.get(); + if (!thread_ptr) + { + thread_ptr = new tclient; + global.reset(thread_ptr); + } + + if (reset || !thread_ptr->client) + { + // This reduces ZMQ internal errors with lack of file descriptors + const auto now = std::chrono::steady_clock::now(); + if (now - thread_ptr->last_connect < reconnect_backoff) + return {error::daemon_timeout}; + + // careful, gclient and thread_ptr->client could be aliased + expect new_client = gclient.clone(); + thread_ptr->client = rpc::client{}; + thread_ptr->last_connect = now; + if (!new_client) + return new_client.error(); + + thread_ptr->client = std::move(*new_client); + } + + return {std::addressof(thread_ptr->client)}; } expect send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout) @@ -92,8 +117,13 @@ namespace lws if (resp || resp != net::zmq::make_error_code(EFSM)) break; // fix state machine by reading+discarding previously timed out response - if (tclient.get_message(timeout).matches(std::errc::timed_out)) - return {error::daemon_timeout}; + auto read = tclient.get_message(timeout); + if (!read) + { + // message could've been delivered, then dropped in process failure + thread_client(tclient, true); + return read.error(); + } } return resp; }