Fix ZeroMQ REQ send error after remote process crash/abort (#126)

This commit is contained in:
Lee *!* Clagett 2024-06-26 14:08:01 -04:00 committed by Lee *!* Clagett
parent 27b682ba69
commit 57c67ae0e4

View file

@@ -66,21 +66,46 @@ namespace lws
namespace namespace
{ {
namespace http = epee::net_utils::http; namespace http = epee::net_utils::http;
constexpr const std::chrono::seconds reconnect_backoff{10};
//! Return the calling thread's private `rpc::client`, (re)creating it on demand.
/*!
    Each thread keeps one `rpc::client` in thread-specific storage. The client
    is replaced with a fresh `gclient.clone()` when it tests false (`!client`)
    or when `reset` is true.

    Reconnect attempts are rate limited: once an attempt has been made on this
    thread, a further attempt within `reconnect_backoff` is refused.

    \param gclient Client that is cloned to produce the thread-local client.
    \param reset When true, replace the thread-local client even if it
      currently tests valid.

    \return Pointer to the thread-local client (owned by the thread-specific
      storage, not by the caller). `error::daemon_timeout` if a reconnect was
      needed but refused by the backoff. The error from `gclient.clone()` if
      cloning failed; the thread-local client is left default-constructed in
      that case.
*/
expect<rpc::client*> thread_client(const rpc::client& gclient, const bool reset = false)
{
  struct tclient
  {
    rpc::client client;
    std::chrono::steady_clock::time_point last_connect;
    bool attempted; //!< False until the first connect attempt on this thread

    explicit tclient() noexcept
      : client(), last_connect(std::chrono::seconds{0}), attempted(false)
    {}
  };
  static boost::thread_specific_ptr<tclient> global;

  tclient* thread_ptr = global.get();
  if (!thread_ptr)
  {
    thread_ptr = new tclient;
    global.reset(thread_ptr); // thread-specific storage takes the pointer
  }

  if (reset || !thread_ptr->client)
  {
    // This reduces ZMQ internal errors with lack of file descriptors
    const auto now = std::chrono::steady_clock::now();

    /* Only throttle after a real attempt. The epoch of `steady_clock` is
       unspecified, so comparing `now` against a zero time_point could refuse
       the very first connect when the clock reads less than the backoff. */
    if (thread_ptr->attempted && now - thread_ptr->last_connect < reconnect_backoff)
      return {error::daemon_timeout};

    // careful, gclient and thread_ptr->client could be aliased; clone first
    expect<rpc::client> new_client = gclient.clone();
    thread_ptr->client = rpc::client{};
    thread_ptr->last_connect = now;
    thread_ptr->attempted = true;
    if (!new_client)
      return new_client.error();
    thread_ptr->client = std::move(*new_client);
  }

  return {std::addressof(thread_ptr->client)};
}
expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout) expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout)
@@ -92,8 +117,13 @@ namespace lws
if (resp || resp != net::zmq::make_error_code(EFSM)) if (resp || resp != net::zmq::make_error_code(EFSM))
break; break;
// fix state machine by reading+discarding previously timed out response // fix state machine by reading+discarding previously timed out response
if (tclient.get_message(timeout).matches(std::errc::timed_out)) auto read = tclient.get_message(timeout);
return {error::daemon_timeout}; if (!read)
{
// message could've been delivered, then dropped in process failure
thread_client(tclient, true);
return read.error();
}
} }
return resp; return resp;
} }