Fix ZeroMQ REQ send error after remote process crash/abort (#126)

Lee Clagett 2024-06-26 14:08:01 -04:00 committed by GitHub
parent 567c1a5f2d
commit 1a7e0a2c57

@@ -66,21 +66,46 @@ namespace lws
   namespace
   {
     namespace http = epee::net_utils::http;
+    constexpr const std::chrono::seconds reconnect_backoff{10};
-    expect<rpc::client*> thread_client(const rpc::client& gclient)
+    expect<rpc::client*> thread_client(const rpc::client& gclient, const bool reset = false)
     {
-      static boost::thread_specific_ptr<rpc::client> global;
-      rpc::client* thread_ptr = global.get();
-      if (thread_ptr)
-        return {thread_ptr};
+      struct tclient
+      {
+        rpc::client client;
+        std::chrono::steady_clock::time_point last_connect;
+        explicit tclient() noexcept
+          : client(), last_connect(std::chrono::seconds{0})
+        {}
+      };
+      static boost::thread_specific_ptr<tclient> global;
+      tclient* thread_ptr = global.get();
+      if (!thread_ptr)
+      {
+        thread_ptr = new tclient;
+        global.reset(thread_ptr);
+      }
+      if (reset || !thread_ptr->client)
+      {
+        // This reduces ZMQ internal errors with lack of file descriptors
+        const auto now = std::chrono::steady_clock::now();
+        if (now - thread_ptr->last_connect < reconnect_backoff)
+          return {error::daemon_timeout};
+        // careful, gclient and thread_ptr->client could be aliased
+        expect<rpc::client> new_client = gclient.clone();
+        thread_ptr->client = rpc::client{};
+        thread_ptr->last_connect = now;
+        if (!new_client)
+          return new_client.error();
-      rpc::client* const new_ptr = new rpc::client{std::move(*new_client)};
-      global.reset(new_ptr);
-      return {new_ptr};
+        thread_ptr->client = std::move(*new_client);
+      }
+      return {std::addressof(thread_ptr->client)};
     }
     expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout)
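
The reconnect path above keeps one client per thread and rate-limits reconnect attempts to one per reconnect_backoff window, which avoids churning through sockets and file descriptors while the daemon is unreachable. A rough standalone sketch of that pattern, using a made-up Client type and try_connect() helper rather than the project's rpc::client, could look like:

#include <chrono>
#include <optional>
#include <utility>

// Hypothetical stand-ins for illustration only; the real code uses rpc::client.
struct Client { bool connected = false; };
std::optional<Client> try_connect() { return Client{true}; }  // placeholder dial

// Returns nullptr while the backoff window is still open or the dial fails.
Client* thread_client_sketch(bool reset = false)
{
  constexpr std::chrono::seconds reconnect_backoff{10};
  struct state
  {
    Client client;
    std::chrono::steady_clock::time_point last_connect{};
  };
  thread_local state s; // one connection (and one backoff clock) per thread

  if (reset || !s.client.connected)
  {
    const auto now = std::chrono::steady_clock::now();
    if (now - s.last_connect < reconnect_backoff)
      return nullptr;            // too soon; surface a timeout-style error instead

    s.client = Client{};         // drop the old socket before dialing again
    s.last_connect = now;        // stamp the attempt even if it fails
    if (auto fresh = try_connect())
      s.client = std::move(*fresh);
    else
      return nullptr;
  }
  return &s.client;
}
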
@@ -92,8 +117,13 @@ namespace lws
         if (resp || resp != net::zmq::make_error_code(EFSM))
           break;
         // fix state machine by reading+discarding previously timed out response
-        if (tclient.get_message(timeout).matches(std::errc::timed_out))
-          return {error::daemon_timeout};
+        auto read = tclient.get_message(timeout);
+        if (!read)
+        {
+          // message could've been delivered, then dropped in process failure
+          thread_client(tclient, true);
+          return read.error();
+        }
       }
       return resp;
     }
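
For context on the EFSM branch: libzmq's REQ sockets enforce a strict send/recv alternation, so after a send whose reply was never read (for example because the remote process crashed mid-request), the next send fails with EFSM. Reading and discarding the stale reply normally restores the state machine; when that read fails as well, the patch now rebuilds the thread's client rather than leaving a wedged socket behind. A minimal sketch of the same sequence with the raw libzmq C API, assuming libzmq is installed and using a hypothetical endpoint rather than the project's epee/net::zmq wrappers:

#include <zmq.h>
#include <cstdio>

int main()
{
  void* ctx = zmq_ctx_new();
  void* req = zmq_socket(ctx, ZMQ_REQ);
  zmq_connect(req, "tcp://127.0.0.1:5555");       // hypothetical daemon endpoint

  int timeout_ms = 1000;                          // keep the demo from blocking forever
  zmq_setsockopt(req, ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms));

  zmq_send(req, "ping", 4, 0);                    // ok: the socket now expects a reply
  if (zmq_send(req, "ping", 4, 0) == -1 && zmq_errno() == EFSM)
    std::puts("second send rejected: must recv the pending reply first");

  char buf[64];
  if (zmq_recv(req, buf, sizeof(buf), 0) >= 0)
  {
    // stale reply read and discarded; the REQ socket may send again
  }
  else
  {
    // the read failed too (peer gone / timeout): discard the socket and
    // re-create it, analogous to thread_client(tclient, true) above
    zmq_close(req);
    req = zmq_socket(ctx, ZMQ_REQ);
    zmq_connect(req, "tcp://127.0.0.1:5555");
  }

  zmq_close(req);
  zmq_ctx_term(ctx);
  return 0;
}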