From ef78f199862825c67d1e4576380749db4bfaef8b Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Tue, 19 Nov 2024 19:21:32 -0500 Subject: [PATCH] /get_random_outs is now fully async using stackful coroutines (#142) --- CMakeLists.txt | 2 +- src/CMakeLists.txt | 1 + src/net/zmq_async.h | 8 +- src/rest_server.cpp | 486 ++++++++++++++++++++++++++++---------------- 4 files changed, 318 insertions(+), 179 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4259d88..8edd8dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -161,7 +161,7 @@ if(STATIC) set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_RUNTIME ON) endif() -find_package(Boost 1.70 QUIET REQUIRED COMPONENTS chrono filesystem program_options regex serialization system thread) +find_package(Boost 1.70 QUIET REQUIRED COMPONENTS chrono coroutine filesystem program_options regex serialization system thread) if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE)) message(STATUS "Found Boost_THREAD_LIBRARY: ${Boost_THREAD_LIBRARY}") diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 973b155..f6a9072 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -56,6 +56,7 @@ target_link_libraries(monero-lws-daemon-common monero-lws-wire-json monero-lws-util ${Boost_CHRONO_LIBRARY} + ${Boost_COROUTINE_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY} diff --git a/src/net/zmq_async.h b/src/net/zmq_async.h index 471f4ef..e4bf597 100644 --- a/src/net/zmq_async.h +++ b/src/net/zmq_async.h @@ -143,20 +143,20 @@ namespace net { namespace zmq //! Cannot have an `async_read` and `async_write` at same time (edge trigger) template - void async_read(async_client& sock, std::string& buffer, F&& f) + auto async_read(async_client& sock, std::string& buffer, F&& f) { // async_compose is required for correct strand invocation, etc - boost::asio::async_compose( + return boost::asio::async_compose( read_msg_op{sock, buffer}, f, *sock.asock ); } //! Cannot have an `async_write` and `async_read` at same time (edge trigger) template - void async_write(async_client& sock, epee::byte_slice msg, F&& f) + auto async_write(async_client& sock, epee::byte_slice msg, F&& f) { // async_compose is required for correct strand invocation, etc - boost::asio::async_compose( + return boost::asio::async_compose( write_msg_op{sock, std::move(msg)}, f, *sock.asock ); } diff --git a/src/rest_server.cpp b/src/rest_server.cpp index ac860c3..b772cc5 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -116,66 +117,6 @@ namespace lws }; using async_complete = void(expect); - expect thread_client(const rpc::client& gclient, const bool reset = false) - { - struct tclient - { - rpc::client client; - std::chrono::steady_clock::time_point last_connect; - - explicit tclient() noexcept - : client(), last_connect(std::chrono::seconds{0}) - {} - }; - static boost::thread_specific_ptr global; - - tclient* thread_ptr = global.get(); - if (!thread_ptr) - { - thread_ptr = new tclient; - global.reset(thread_ptr); - } - - if (reset || !thread_ptr->client) - { - // This reduces ZMQ internal errors with lack of file descriptors - const auto now = std::chrono::steady_clock::now(); - if (now - thread_ptr->last_connect < zmq_reconnect_backoff) - return {error::daemon_timeout}; - - // careful, gclient and thread_ptr->client could be aliased - expect new_client = gclient.clone(); - thread_ptr->client = rpc::client{}; - thread_ptr->last_connect = now; - if (!new_client) - return new_client.error(); - - thread_ptr->client = std::move(*new_client); - } - - return {std::addressof(thread_ptr->client)}; - } - - expect send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout) - { - expect resp{common_error::kInvalidArgument}; - for (unsigned i = 0; i < 2; ++i) - { - resp = tclient.send(message.clone(), timeout); - if (resp || resp != net::zmq::make_error_code(EFSM)) - break; - // fix state machine by reading+discarding previously timed out response - auto read = tclient.get_message(timeout); - if (!read) - { - // message could've been delivered, then dropped in process failure - thread_client(tclient, true); - return read.error(); - } - } - return resp; - } - bool is_locked(std::uint64_t unlock_time, db::block_id last) noexcept { if (unlock_time > CRYPTONOTE_MAX_BLOCK_NUMBER) @@ -316,7 +257,7 @@ namespace lws using response = epee::byte_slice; // sometimes async using async_response = rpc::daemon_status_response; - static expect handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function resume) + static expect handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function&& resume) { using info_rpc = cryptonote::rpc::GetInfo; @@ -507,7 +448,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_address_info_response; - static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function&&) { auto user = open_account(req, data.disk.clone()); if (!user) @@ -581,7 +522,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_address_txs_response; - static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function&&) { auto user = open_account(req, data.disk.clone()); if (!user) @@ -705,138 +646,335 @@ namespace lws using response = void; // always asynchronous response using async_response = rpc::get_random_outs_response; - static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function resume) + static expect handle(request&& req, boost::asio::io_service& io, rest_server_data& data, std::function&& resume) { using distribution_rpc = cryptonote::rpc::GetOutputDistribution; using histogram_rpc = cryptonote::rpc::GetOutputHistogram; using distribution_rpc = cryptonote::rpc::GetOutputDistribution; - std::vector amounts = std::move(req.amounts.values); - - if (50 < req.count || 20 < amounts.size()) + if (50 < req.count || 20 < req.amounts.values.size()) return {lws::error::exceeded_rest_request_limit}; - const expect tclient = thread_client(data.client); - if (!tclient) - return tclient.error(); - if (*tclient == nullptr) - throw std::logic_error{"Unexpected rpc::client nullptr"}; + std::sort(req.amounts.values.begin(), req.amounts.values.end(), std::greater<>{}); - const std::greater rsort{}; - std::sort(amounts.begin(), amounts.end(), rsort); - const std::size_t ringct_count = - amounts.end() - std::lower_bound(amounts.begin(), amounts.end(), 0, rsort); - - std::vector histograms{}; - if (ringct_count < amounts.size()) + struct frame { - // reuse allocated vector memory - amounts.resize(amounts.size() - ringct_count); + rest_server_data* parent; + net::zmq::async_client client; + boost::asio::steady_timer timer; + boost::asio::strand strand; + std::deque>> resumers; - histogram_rpc::Request histogram_req{}; - histogram_req.amounts = std::move(amounts); - histogram_req.min_count = 0; - histogram_req.max_count = 0; - histogram_req.unlocked = true; - histogram_req.recent_cutoff = 0; + frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + : parent(std::addressof(parent)), + client(std::move(client)), + timer(io), + strand(io.get_executor()), + resumers() + {} + }; - epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req); - MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10})); + struct cached_result + { + std::weak_ptr status; + boost::mutex sync; - auto histogram_resp = (*tclient)->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); - if (!histogram_resp) - return histogram_resp.error(); - if (histogram_resp->histogram.size() != histogram_req.amounts.size()) - return {lws::error::bad_daemon_response}; + cached_result() noexcept + : status(), sync() + {} + }; - histograms = std::move(histogram_resp->histogram); + static cached_result cache; + boost::unique_lock lock{cache.sync}; - amounts = std::move(histogram_req.amounts); - amounts.insert(amounts.end(), ringct_count, 0); + auto active = cache.status.lock(); + if (active) + { + active->resumers.emplace_back(std::move(req), std::move(resume)); + return success(); } - std::vector distributions{}; - if (ringct_count) + struct async_handler { - distribution_rpc::Request distribution_req{}; - if (ringct_count == amounts.size()) - distribution_req.amounts = std::move(amounts); + std::shared_ptr self_; - distribution_req.amounts.resize(1); - distribution_req.from_height = 0; - distribution_req.to_height = 0; - distribution_req.cumulative = true; - - epee::byte_slice msg = - rpc::client::make_message("get_output_distribution", distribution_req); - MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10})); - - auto distribution_resp = - (*tclient)->receive(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION); - if (!distribution_resp) - return distribution_resp.error(); - - if (distribution_resp->distributions.size() != 1) - return {lws::error::bad_daemon_response}; - if (distribution_resp->distributions[0].amount != 0) - return {lws::error::bad_daemon_response}; - - distributions = std::move(distribution_resp->distributions[0].data.distribution); - - if (amounts.empty()) - { - amounts = std::move(distribution_req.amounts); - amounts.insert(amounts.end(), ringct_count - 1, 0); - } - } - - class zmq_fetch_keys - { - /* `std::function` needs a copyable functor. The functor was made - const and copied in the function instead of using a reference to - make the callback in `std::function` thread-safe. This shouldn't - be a problem now, but this is just-in-case of a future refactor. */ - rpc::client* tclient; - public: - zmq_fetch_keys(rpc::client* src) noexcept - : tclient(src) + explicit async_handler(std::shared_ptr self) + : self_(std::move(self)) {} - zmq_fetch_keys(zmq_fetch_keys&&) = default; - zmq_fetch_keys(const zmq_fetch_keys&) = default; - - expect> operator()(std::vector ids) const + void send_response(const boost::system::error_code error, expect value) const { - using get_keys_rpc = cryptonote::rpc::GetOutputKeys; - if (tclient == nullptr) - throw std::logic_error{"Unexpected nullptr in zmq_fetch_keys"}; + assert(self_ != nullptr); - get_keys_rpc::Request keys_req{}; - keys_req.outputs = std::move(ids); + std::deque>> resumers; + { + const boost::lock_guard lock{cache.sync}; + if (error) + { + // Prevent further resumers, ZMQ REQ/REP in bad state + MERROR("Failure in /get_random_outs: " << error.message()); + if (value) + value = {lws::error::daemon_timeout}; + cache.status.reset(); + resumers.swap(self_->resumers); + } + else + { + MDEBUG("Completed ZMQ request in /get_random_outs"); + resumers.push_back(std::move(self_->resumers.front())); + self_->resumers.pop_front(); + } + } - epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req); - MONERO_CHECK(send_with_retry(*tclient, std::move(msg), std::chrono::seconds{10})); + for (const auto& r : resumers) + r.second(value); + } - auto keys_resp = tclient->receive(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION); - if (!keys_resp) - return keys_resp.error(); + bool set_timeout(std::chrono::steady_clock::duration timeout, const bool expecting) const + { + assert(self_ != nullptr); + if (!self_->timer.expires_after(timeout) && expecting) + return false; - return {std::move(keys_resp->keys)}; + auto& self = self_; + self->timer.async_wait( + [self] (boost::system::error_code error) + { + if (error == boost::asio::error::operation_aborted) + return; + + self->strand.dispatch( + [self] () + { + boost::system::error_code error{}; + MWARNING("Timeout on /get_random_outs ZMQ call"); + self->client.close = true; + self->client.asock->cancel(error); + }, + boost::asio::get_associated_allocator(*self) + ); + } + ); + return true; + } + + void operator()(boost::asio::yield_context yield) const + { + if (!self_) + return; + + std::chrono::steady_clock::time_point last{std::chrono::seconds{0}}; + std::vector distributions{}; + request next{}; + + for (;;) + { + { + const boost::lock_guard lock{cache.sync}; + if (self_->resumers.empty()) + { + cache.status.reset(); + self_->parent->store_async_client(std::move(self_->client)); + MDEBUG("Finishing ZMQ coroutine in /get_random_outs"); + return; + } + next = std::move(self_->resumers.front().first); + } + + boost::system::error_code error{}; + std::vector histograms{}; + const std::size_t ringct_count = + next.amounts.values.end() - + std::lower_bound( + next.amounts.values.begin(), next.amounts.values.end(), 0, std::greater<>{} + ); + + if (ringct_count < next.amounts.values.size()) + { + // reuse allocated vector memory + next.amounts.values.resize(next.amounts.values.size() - ringct_count); + + histogram_rpc::Request histogram_req{}; + histogram_req.amounts = std::move(next.amounts.values); + histogram_req.min_count = 0; + histogram_req.max_count = 0; + histogram_req.unlocked = true; + histogram_req.recent_cutoff = 0; + + epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req); + + MDEBUG("Fetching histograms for /get_random_outs"); + set_timeout(std::chrono::seconds{10}, false); + net::zmq::async_write(self_->client, std::move(msg), yield[error]); + if (error) + return send_response(error, async_ready()); + if (!set_timeout(std::chrono::minutes{3}, true)) + return send_response(boost::asio::error::operation_aborted, async_ready()); + + std::string in; + net::zmq::async_read(self_->client, in, yield[error]); + if (error) + return send_response(error, async_ready()); + if (!self_->timer.cancel(error)) + return send_response(boost::asio::error::operation_aborted, async_ready()); + + histogram_rpc::Response histogram_resp{}; + const expect status = + rpc::parse_response(histogram_resp, std::move(in)); + if (!status) + return send_response(boost::asio::error::invalid_argument, status.error()); + if (histogram_resp.histogram.size() != histogram_req.amounts.size()) + return send_response(boost::asio::error::invalid_argument, {lws::error::bad_daemon_response}); + + histograms = std::move(histogram_resp.histogram); + + next.amounts.values = std::move(histogram_req.amounts); + next.amounts.values.insert(next.amounts.values.end(), ringct_count, 0); + } + + if (ringct_count && (distributions.empty() || (daemon_cache_timeout < std::chrono::steady_clock::now() - last))) + { + distribution_rpc::Request distribution_req{}; + if (ringct_count == next.amounts.values.size()) + { + distribution_req.amounts = std::move(next.amounts.values); + next.amounts.values.clear(); + } + + distribution_req.amounts.resize(1); + distribution_req.from_height = 0; + distribution_req.to_height = 0; + distribution_req.cumulative = true; + + epee::byte_slice msg = + rpc::client::make_message("get_output_distribution", distribution_req); + + MDEBUG("Fetching distributions for /get_random_outs"); + set_timeout(std::chrono::seconds{10}, false); + net::zmq::async_write(self_->client, std::move(msg), yield[error]); + if (error) + return send_response(error, async_ready()); + if (!set_timeout(std::chrono::minutes{3}, true)) + return send_response(boost::asio::error::operation_aborted, async_ready()); + + std::string in; + net::zmq::async_read(self_->client, in, yield[error]); + if (error) + return send_response(error, async_ready()); + if (!self_->timer.cancel(error)) + return send_response(boost::asio::error::operation_aborted, async_ready()); + + distribution_rpc::Response distribution_resp{}; + const expect status = + rpc::parse_response(distribution_resp, std::move(in)); + if (!status) + return send_response(boost::asio::error::invalid_argument, status.error()); + if (distribution_resp.distributions.size() != 1) + return send_response(boost::asio::error::invalid_argument, {lws::error::bad_daemon_response}); + if (distribution_resp.distributions[0].amount != 0) + return send_response(boost::asio::error::invalid_argument, {lws::error::bad_daemon_response}); + + last = std::chrono::steady_clock::now(); + distributions = std::move(distribution_resp.distributions[0].data.distribution); + + if (next.amounts.values.empty()) + { + next.amounts.values = std::move(distribution_req.amounts); + next.amounts.values.insert( + next.amounts.values.end(), ringct_count - 1, 0 + ); + } + } + + class zmq_fetch_keys + { + /* `std::function` needs a copyable functor. The functor was made + const and copied in the function instead of using a reference to + make the callback in `std::function` thread-safe. This shouldn't + be a problem now, but this is just-in-case of a future refactor. */ + async_handler self_; + boost::asio::yield_context yield_; + + public: + zmq_fetch_keys(async_handler self, boost::asio::yield_context yield) + : self_(std::move(self)), yield_(std::move(yield)) + {} + + zmq_fetch_keys(zmq_fetch_keys&&) = default; + zmq_fetch_keys(const zmq_fetch_keys&) = default; + + expect> operator()(std::vector ids) const + { + using get_keys_rpc = cryptonote::rpc::GetOutputKeys; + if (self_.self_ == nullptr) + throw std::logic_error{"Unexpected nullptr in zmq_fetch_keys"}; + + boost::system::error_code error{}; + get_keys_rpc::Request keys_req{}; + keys_req.outputs = std::move(ids); + + epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req); + + self_.set_timeout(std::chrono::seconds{10}, false); + net::zmq::async_write(self_.self_->client, std::move(msg), yield_[error]); + + if (error) + { + MERROR("Internal ZMQ error in /get_random_outs: " << error.message()); + return {error::daemon_timeout}; + } + if (!self_.set_timeout(std::chrono::seconds{10}, true)) + return {error::daemon_timeout}; + + std::string in; + net::zmq::async_read(self_.self_->client, in, yield_[error]); + if (error) + { + MERROR("Internal ZMQ error in /get_random_outs: " << error.message()); + return {error::daemon_timeout}; + } + if (!self_.self_->timer.cancel(error)) + return {error::daemon_timeout}; + + get_keys_rpc::Response keys_resp{}; + const expect status = + rpc::parse_response(keys_resp, std::move(in)); + if (!status) + return status.error(); + + return {std::move(keys_resp.keys)}; + } + }; + + lws::gamma_picker pick_rct{std::move(distributions)}; + auto rings = pick_random_outputs( + next.count, + epee::to_span(next.amounts.values), + pick_rct, + epee::to_mut_span(histograms), + zmq_fetch_keys{*this, yield} + ); + distributions = pick_rct.take_offsets(); + if (!rings) + return send_response(boost::asio::error::invalid_argument, rings.error()); + else + send_response({}, json_response(async_response{std::move(*rings)})); + } } }; - lws::gamma_picker pick_rct{std::move(distributions)}; - auto rings = pick_random_outputs( - req.count, - epee::to_span(amounts), - pick_rct, - epee::to_mut_span(histograms), - zmq_fetch_keys{*tclient} - ); - if (!rings) - return rings.error(); + expect client = data.get_async_client(io); + if (!client) + return client.error(); - resume(json_response(async_response{std::move(*rings)})); + active = std::make_shared(data, io, std::move(*client)); + cache.status = active; + + active->resumers.emplace_back(std::move(req), std::move(resume)); + lock.unlock(); + + MDEBUG("Starting new ZMQ coroutine in /get_random_outs"); + boost::asio::spawn(active->strand, async_handler{active}); return success(); } }; @@ -846,7 +984,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_subaddrs_response; - static expect handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function&&) { auto user = open_account(req, data.disk.clone()); if (!user) @@ -925,7 +1063,7 @@ namespace lws ); } - static expect handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function resume) + static expect handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function&& resume) { struct frame { @@ -1109,7 +1247,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::import_response; - static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function&&) { bool new_request = false; bool fulfilled = false; @@ -1149,7 +1287,7 @@ namespace lws using request = rpc::login_request; using response = rpc::login_response; - static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function resume) + static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function&& resume) { if (!key_check(req.creds)) return {lws::error::bad_view_key}; @@ -1206,7 +1344,7 @@ namespace lws using request = rpc::provision_subaddrs_request; using response = rpc::new_subaddrs_response; - static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function&&) { if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj) return {lws::error::invalid_range}; @@ -1494,7 +1632,7 @@ namespace lws }; template - expect call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function resume) + expect call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function&& resume) { using request = typename E::request; using response = typename E::response; @@ -1534,7 +1672,7 @@ namespace lws } template - expect call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function) + expect call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function&&) { using request = typename E::request; @@ -1575,7 +1713,7 @@ namespace lws struct endpoint { char const* const name; - expect (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function); + expect (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function&&); const unsigned max_size; const bool is_async; };