Fix too many open files issue with ZeroMQ (#100)

This commit is contained in:
Lee *!* Clagett 2024-04-06 15:50:50 -04:00 committed by Lee *!* Clagett
parent cde050baf1
commit f785cad557

View file

@ -28,6 +28,7 @@
#include <algorithm> #include <algorithm>
#include <boost/range/counting_range.hpp> #include <boost/range/counting_range.hpp>
#include <boost/thread/tss.hpp>
#include <boost/utility/string_ref.hpp> #include <boost/utility/string_ref.hpp>
#include <cstring> #include <cstring>
#include <limits> #include <limits>
@ -65,6 +66,22 @@ namespace lws
{ {
namespace http = epee::net_utils::http; namespace http = epee::net_utils::http;
expect<rpc::client*> thread_client(const rpc::client& gclient)
{
static boost::thread_specific_ptr<rpc::client> global;
rpc::client* thread_ptr = global.get();
if (thread_ptr)
return {thread_ptr};
expect<rpc::client> new_client = gclient.clone();
if (!new_client)
return new_client.error();
rpc::client* const new_ptr = new rpc::client{std::move(*new_client)};
global.reset(new_ptr);
return {new_ptr};
}
struct context : epee::net_utils::connection_context_base struct context : epee::net_utils::connection_context_base
{ {
context() context()
@ -360,9 +377,11 @@ namespace lws
if (50 < req.count || 20 < amounts.size()) if (50 < req.count || 20 < amounts.size())
return {lws::error::exceeded_rest_request_limit}; return {lws::error::exceeded_rest_request_limit};
expect<rpc::client> client = gclient.clone(); const expect<rpc::client*> tclient = thread_client(gclient);
if (!client) if (!tclient)
return client.error(); return tclient.error();
if (*tclient == nullptr)
throw std::logic_error{"Unexpected rpc::client nullptr"};
const std::greater<std::uint64_t> rsort{}; const std::greater<std::uint64_t> rsort{};
std::sort(amounts.begin(), amounts.end(), rsort); std::sort(amounts.begin(), amounts.end(), rsort);
@ -383,9 +402,9 @@ namespace lws
histogram_req.recent_cutoff = 0; histogram_req.recent_cutoff = 0;
epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req); epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req);
MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
auto histogram_resp = client->receive<histogram_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); auto histogram_resp = (*tclient)->receive<histogram_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
if (!histogram_resp) if (!histogram_resp)
return histogram_resp.error(); return histogram_resp.error();
if (histogram_resp->histogram.size() != histogram_req.amounts.size()) if (histogram_resp->histogram.size() != histogram_req.amounts.size())
@ -411,10 +430,10 @@ namespace lws
epee::byte_slice msg = epee::byte_slice msg =
rpc::client::make_message("get_output_distribution", distribution_req); rpc::client::make_message("get_output_distribution", distribution_req);
MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
auto distribution_resp = auto distribution_resp =
client->receive<distribution_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); (*tclient)->receive<distribution_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
if (!distribution_resp) if (!distribution_resp)
return distribution_resp.error(); return distribution_resp.error();
@ -438,32 +457,28 @@ namespace lws
const and copied in the function instead of using a reference to const and copied in the function instead of using a reference to
make the callback in `std::function` thread-safe. This shouldn't make the callback in `std::function` thread-safe. This shouldn't
be a problem now, but this is just-in-case of a future refactor. */ be a problem now, but this is just-in-case of a future refactor. */
rpc::client gclient; rpc::client* tclient;
public: public:
zmq_fetch_keys(rpc::client src) noexcept zmq_fetch_keys(rpc::client* src) noexcept
: gclient(std::move(src)) : tclient(src)
{} {}
zmq_fetch_keys(zmq_fetch_keys&&) = default; zmq_fetch_keys(zmq_fetch_keys&&) = default;
zmq_fetch_keys(zmq_fetch_keys const& rhs) zmq_fetch_keys(const zmq_fetch_keys&) = default;
: gclient(MONERO_UNWRAP(rhs.gclient.clone()))
{}
expect<std::vector<output_keys>> operator()(std::vector<lws::output_ref> ids) const expect<std::vector<output_keys>> operator()(std::vector<lws::output_ref> ids) const
{ {
using get_keys_rpc = cryptonote::rpc::GetOutputKeys; using get_keys_rpc = cryptonote::rpc::GetOutputKeys;
if (tclient == nullptr)
throw std::logic_error{"Unexpected nullptr in zmq_fetch_keys"};
get_keys_rpc::Request keys_req{}; get_keys_rpc::Request keys_req{};
keys_req.outputs = std::move(ids); keys_req.outputs = std::move(ids);
expect<rpc::client> client = gclient.clone();
if (!client)
return client.error();
epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req); epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req);
MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); MONERO_CHECK(tclient->send(std::move(msg), std::chrono::seconds{10}));
auto keys_resp = client->receive<get_keys_rpc::Response>(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION); auto keys_resp = tclient->receive<get_keys_rpc::Response>(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION);
if (!keys_resp) if (!keys_resp)
return keys_resp.error(); return keys_resp.error();
@ -477,7 +492,7 @@ namespace lws
epee::to_span(amounts), epee::to_span(amounts),
pick_rct, pick_rct,
epee::to_mut_span(histograms), epee::to_mut_span(histograms),
zmq_fetch_keys{std::move(*client)} zmq_fetch_keys{*tclient}
); );
if (!rings) if (!rings)
return rings.error(); return rings.error();
@ -516,15 +531,17 @@ namespace lws
if (!user) if (!user)
return user.error(); return user.error();
expect<rpc::client> client = gclient.clone(); const expect<rpc::client*> tclient = thread_client(gclient);
if (!client) if (!tclient)
return client.error(); return tclient.error();
if (*tclient == nullptr)
throw std::logic_error{"Unexpected rpc::client nullptr"};
{ {
rpc_command::Request req{}; rpc_command::Request req{};
req.num_grace_blocks = 10; req.num_grace_blocks = 10;
epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req); epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req);
MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10}));
} }
if ((req.use_dust && req.use_dust) || !req.dust_threshold) if ((req.use_dust && req.use_dust) || !req.dust_threshold)
@ -561,7 +578,7 @@ namespace lws
if (received < std::uint64_t(req.amount)) if (received < std::uint64_t(req.amount))
return {lws::error::account_not_found}; return {lws::error::account_not_found};
const auto resp = client->receive<rpc_command::Response>(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); const auto resp = (*tclient)->receive<rpc_command::Response>(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION);
if (!resp) if (!resp)
return resp.error(); return resp.error();
@ -659,11 +676,14 @@ namespace lws
if (!hooks->empty()) if (!hooks->empty())
{ {
expect<rpc::client> client = gclient.clone(); const expect<rpc::client*> tclient = thread_client(gclient);
if (!client) if (!tclient)
return client.error(); return tclient.error();
if (*tclient == nullptr)
throw std::logic_error{"Unexpected rpc::client nullptr"};
rpc::send_webhook( rpc::send_webhook(
*client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify **tclient, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify
); );
} }
return response{true, req.generated_locally}; return response{true, req.generated_locally};
@ -741,18 +761,20 @@ namespace lws
{ {
using transaction_rpc = cryptonote::rpc::SendRawTxHex; using transaction_rpc = cryptonote::rpc::SendRawTxHex;
expect<rpc::client> client = gclient.clone(); const expect<rpc::client*> tclient = thread_client(gclient);
if (!client) if (!tclient)
return client.error(); return tclient.error();
if (*tclient == nullptr)
throw std::logic_error{"Unexpected rpc::client nullptr"};
transaction_rpc::Request daemon_req{}; transaction_rpc::Request daemon_req{};
daemon_req.relay = true; daemon_req.relay = true;
daemon_req.tx_as_hex = std::move(req.tx); daemon_req.tx_as_hex = std::move(req.tx);
epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req); epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req);
MONERO_CHECK(client->send(std::move(message), std::chrono::seconds{10})); MONERO_CHECK((*tclient)->send(std::move(message), std::chrono::seconds{10}));
const auto daemon_resp = client->receive<transaction_rpc::Response>(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); const auto daemon_resp = (*tclient)->receive<transaction_rpc::Response>(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION);
if (!daemon_resp) if (!daemon_resp)
return daemon_resp.error(); return daemon_resp.error();
if (!daemon_resp->relayed) if (!daemon_resp->relayed)