From de7268f9e60de20e957b1ae7e9773e525d838bbf Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Sat, 6 Apr 2024 15:50:50 -0400 Subject: [PATCH] Fix too many open files issue with ZeroMQ (#100) --- src/rest_server.cpp | 90 ++++++++++++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/src/rest_server.cpp b/src/rest_server.cpp index fd3bc88..25092a3 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -65,6 +66,22 @@ namespace lws { namespace http = epee::net_utils::http; + expect thread_client(const rpc::client& gclient) + { + static boost::thread_specific_ptr global; + rpc::client* thread_ptr = global.get(); + if (thread_ptr) + return {thread_ptr}; + + expect new_client = gclient.clone(); + if (!new_client) + return new_client.error(); + + rpc::client* const new_ptr = new rpc::client{std::move(*new_client)}; + global.reset(new_ptr); + return {new_ptr}; + } + struct context : epee::net_utils::connection_context_base { context() @@ -360,9 +377,11 @@ namespace lws if (50 < req.count || 20 < amounts.size()) return {lws::error::exceeded_rest_request_limit}; - expect client = gclient.clone(); - if (!client) - return client.error(); + const expect tclient = thread_client(gclient); + if (!tclient) + return tclient.error(); + if (*tclient == nullptr) + throw std::logic_error{"Unexpected rpc::client nullptr"}; const std::greater rsort{}; std::sort(amounts.begin(), amounts.end(), rsort); @@ -383,9 +402,9 @@ namespace lws histogram_req.recent_cutoff = 0; epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req); - MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10})); - auto histogram_resp = client->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); + auto histogram_resp = (*tclient)->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); if (!histogram_resp) return histogram_resp.error(); if (histogram_resp->histogram.size() != histogram_req.amounts.size()) @@ -411,10 +430,10 @@ namespace lws epee::byte_slice msg = rpc::client::make_message("get_output_distribution", distribution_req); - MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10})); auto distribution_resp = - client->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); + (*tclient)->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); if (!distribution_resp) return distribution_resp.error(); @@ -438,32 +457,28 @@ namespace lws const and copied in the function instead of using a reference to make the callback in `std::function` thread-safe. This shouldn't be a problem now, but this is just-in-case of a future refactor. */ - rpc::client gclient; + rpc::client* tclient; public: - zmq_fetch_keys(rpc::client src) noexcept - : gclient(std::move(src)) + zmq_fetch_keys(rpc::client* src) noexcept + : tclient(src) {} zmq_fetch_keys(zmq_fetch_keys&&) = default; - zmq_fetch_keys(zmq_fetch_keys const& rhs) - : gclient(MONERO_UNWRAP(rhs.gclient.clone())) - {} + zmq_fetch_keys(const zmq_fetch_keys&) = default; expect> operator()(std::vector ids) const { using get_keys_rpc = cryptonote::rpc::GetOutputKeys; + if (tclient == nullptr) + throw std::logic_error{"Unexpected nullptr in zmq_fetch_keys"}; get_keys_rpc::Request keys_req{}; keys_req.outputs = std::move(ids); - expect client = gclient.clone(); - if (!client) - return client.error(); - epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req); - MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK(tclient->send(std::move(msg), std::chrono::seconds{10})); - auto keys_resp = client->receive(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION); + auto keys_resp = tclient->receive(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION); if (!keys_resp) return keys_resp.error(); @@ -477,7 +492,7 @@ namespace lws epee::to_span(amounts), pick_rct, epee::to_mut_span(histograms), - zmq_fetch_keys{std::move(*client)} + zmq_fetch_keys{*tclient} ); if (!rings) return rings.error(); @@ -516,15 +531,17 @@ namespace lws if (!user) return user.error(); - expect client = gclient.clone(); - if (!client) - return client.error(); + const expect tclient = thread_client(gclient); + if (!tclient) + return tclient.error(); + if (*tclient == nullptr) + throw std::logic_error{"Unexpected rpc::client nullptr"}; { rpc_command::Request req{}; req.num_grace_blocks = 10; epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req); - MONERO_CHECK(client->send(std::move(msg), std::chrono::seconds{10})); + MONERO_CHECK((*tclient)->send(std::move(msg), std::chrono::seconds{10})); } if ((req.use_dust && req.use_dust) || !req.dust_threshold) @@ -561,7 +578,7 @@ namespace lws if (received < std::uint64_t(req.amount)) return {lws::error::account_not_found}; - const auto resp = client->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); + const auto resp = (*tclient)->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); if (!resp) return resp.error(); @@ -659,11 +676,14 @@ namespace lws if (!hooks->empty()) { - expect client = gclient.clone(); - if (!client) - return client.error(); + const expect tclient = thread_client(gclient); + if (!tclient) + return tclient.error(); + if (*tclient == nullptr) + throw std::logic_error{"Unexpected rpc::client nullptr"}; + rpc::send_webhook( - *client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify + **tclient, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify ); } return response{true, req.generated_locally}; @@ -741,18 +761,20 @@ namespace lws { using transaction_rpc = cryptonote::rpc::SendRawTxHex; - expect client = gclient.clone(); - if (!client) - return client.error(); + const expect tclient = thread_client(gclient); + if (!tclient) + return tclient.error(); + if (*tclient == nullptr) + throw std::logic_error{"Unexpected rpc::client nullptr"}; transaction_rpc::Request daemon_req{}; daemon_req.relay = true; daemon_req.tx_as_hex = std::move(req.tx); epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req); - MONERO_CHECK(client->send(std::move(message), std::chrono::seconds{10})); + MONERO_CHECK((*tclient)->send(std::move(message), std::chrono::seconds{10})); - const auto daemon_resp = client->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); + const auto daemon_resp = (*tclient)->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); if (!daemon_resp) return daemon_resp.error(); if (!daemon_resp->relayed)