Fix ZeroMQ invalid REQ/REP state after read timeout. (#125)
Some checks are pending
unix-ci / build-tests (macos-12, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-12, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-latest, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-latest, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=ON) (push) Waiting to run

After a read timeout, the thread-local ZMQ client's REQ socket is still
expecting a read, not a send. This change attempts to read and discard one
message before retrying the REQ send. This has proven effective at
recovering from a temporarily unavailable daemon.
This commit is contained in:
Lee *!* Clagett 2024-06-25 22:07:02 -04:00 committed by Lee *!* Clagett
parent c2ca85ef30
commit 2883e77367

View file

@ -47,6 +47,7 @@
#include "net/http_base.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
#include "rpc/admin.h"
#include "rpc/client.h"
#include "rpc/daemon_messages.h" // monero/src
@ -82,6 +83,21 @@ namespace lws
return {new_ptr};
}
// Send `message` through `tclient`, making at most two send attempts.
//
// Each attempt sends a clone of `message` with the given `timeout`. The
// result of an attempt is returned as-is when the send succeeded, or when it
// failed with anything other than the ZeroMQ `EFSM` error code. On `EFSM`,
// one pending message is read (and dropped) with the same `timeout` before
// the next attempt; if that read itself times out, `error::daemon_timeout`
// is returned immediately.
expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout)
{
  expect<void> outcome{common_error::kInvalidArgument};
  unsigned attempts_left = 2;
  while (attempts_left != 0)
  {
    --attempts_left;
    outcome = tclient.send(message.clone(), timeout);

    // Nothing to repair when the send went through
    if (outcome)
      break;

    // Only a REQ/REP state error is worth a retry; report anything else
    if (outcome != net::zmq::make_error_code(EFSM))
      break;

    // Repair the state machine: consume and drop the reply that an earlier
    // (timed out) request was still waiting on
    if (tclient.get_message(timeout).matches(std::errc::timed_out))
      return {error::daemon_timeout};
  }
  return outcome;
}
struct context : epee::net_utils::connection_context_base
{
context()
@ -402,7 +418,7 @@ namespace lws
histogram_req.recent_cutoff = 0;
epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req);
MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));
auto histogram_resp = (*tclient)->receive<histogram_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
if (!histogram_resp)
@ -430,7 +446,7 @@ namespace lws
epee::byte_slice msg =
rpc::client::make_message("get_output_distribution", distribution_req);
MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));
auto distribution_resp =
(*tclient)->receive<distribution_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
@ -476,7 +492,7 @@ namespace lws
keys_req.outputs = std::move(ids);
epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req);
MONERO_CHECK(tclient->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(*tclient, std::move(msg), std::chrono::seconds{10}));
auto keys_resp = tclient->receive<get_keys_rpc::Response>(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION);
if (!keys_resp)
@ -541,7 +557,7 @@ namespace lws
rpc_command::Request req{};
req.num_grace_blocks = 10;
epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req);
MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));
}
if ((req.use_dust && *req.use_dust) || !req.dust_threshold)
@ -772,7 +788,7 @@ namespace lws
daemon_req.tx_as_hex = std::move(req.tx);
epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req);
MONERO_CHECK((*tclient)->send(std::move(message), std::chrono::seconds{10}));
MONERO_CHECK(send_with_retry(**tclient, std::move(message), std::chrono::seconds{10}));
const auto daemon_resp = (*tclient)->receive<transaction_rpc::Response>(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION);
if (!daemon_resp)