From a81d71ae29d0bc8479df82102888c4710cb0d80f Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Wed, 23 Oct 2024 11:31:03 -0400 Subject: [PATCH] Switch from epee http server to boost::beast http server. Min boost 1.70 (#136) There is roughly a 7.4% increase in performance in the switch to boost::beast. Additionally, the REST endpoints `/daemon_status`, `/get_unspent_outs`, and `/submit_raw_tx` do not block in ZMQ calls, allowing for better response times regardless of `monerod` status. The REST endpoints `/login and `/get_random_outs` still need updates to prevent blocking (`/login` is conditional on DB state). --- CMakeLists.txt | 2 +- README.md | 2 +- src/CMakeLists.txt | 3 + src/net/CMakeLists.txt | 33 + src/net/zmq_async.cpp | 103 +++ src/net/zmq_async.h | 165 +++++ src/rest_server.cpp | 1360 +++++++++++++++++++++++++++++++------- src/rest_server.h | 9 +- src/rpc/client.cpp | 72 +- src/rpc/client.h | 44 +- src/rpc/light_wallet.cpp | 9 + src/rpc/light_wallet.h | 4 +- src/rpc/webhook.h | 4 +- src/util/CMakeLists.txt | 2 +- src/util/http_server.h | 124 ---- 15 files changed, 1525 insertions(+), 411 deletions(-) create mode 100644 src/net/CMakeLists.txt create mode 100644 src/net/zmq_async.cpp create mode 100644 src/net/zmq_async.h delete mode 100644 src/util/http_server.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5daea52..f394276 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -157,7 +157,7 @@ if(STATIC) set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_RUNTIME ON) endif() -find_package(Boost 1.58 QUIET REQUIRED COMPONENTS chrono filesystem program_options regex serialization system thread) +find_package(Boost 1.70 QUIET REQUIRED COMPONENTS chrono filesystem program_options regex serialization system thread) if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE)) message(STATUS "Found Boost_THREAD_LIBRARY: ${Boost_THREAD_LIBRARY}") diff --git a/README.md b/README.md index 4d54241..309425b 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ library archives (`.a`). | ------------ | ------------- | -------- | -------------------- | ------------ | ------------------ | ------------------- | -------- | --------------- | | GCC | 4.7.3 | NO | `build-essential` | `base-devel` | `base-devel` | `gcc` | NO | | | CMake | 3.1 | NO | `cmake` | `cmake` | `cmake` | `cmake` | NO | | -| Boost | 1.58 | NO | `libboost-all-dev` | `boost` | `boost-devel` | `boost-devel` | NO | C++ libraries | +| Boost | 1.70 | NO | `libboost-all-dev` | `boost` | `boost-devel` | `boost-devel` | NO | C++ libraries | | monero | 0.15 | NO | | | | | NO | Monero libraries| | OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `libressl-devel` | `openssl-devel` | NO | sha256 sum | | libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `zeromq-devel` | `zeromq-devel` | NO | ZeroMQ library | diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 34cb213..3d84c62 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -31,6 +31,7 @@ include_directories(.) add_subdirectory(lmdb) add_subdirectory(wire) add_subdirectory(db) +add_subdirectory(net) add_subdirectory(rpc) add_subdirectory(util) @@ -49,12 +50,14 @@ target_link_libraries(monero-lws-daemon-common ${MONERO_lmdb} monero-lws-common monero-lws-db + monero-lws-net monero-lws-rpc monero-lws-rpc-scanner monero-lws-wire-json monero-lws-util ${Boost_CHRONO_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} + ${Boost_SYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY} ${Boost_THREAD_LIBS_INIT} ${EXTRA_LIBRARIES} diff --git a/src/net/CMakeLists.txt b/src/net/CMakeLists.txt new file mode 100644 index 0000000..588d2f1 --- /dev/null +++ b/src/net/CMakeLists.txt @@ -0,0 +1,33 @@ +# Copyright (c) 2024, The Monero Project +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +set(monero-lws-net_sources zmq_async.cpp) +set(monero-lws-net_headers zmq_async.h) + +add_library(monero-lws-net ${monero-lws-net_sources} ${monero-lws-net_headers}) +target_link_libraries(monero-lws-net monero::libraries) diff --git a/src/net/zmq_async.cpp b/src/net/zmq_async.cpp new file mode 100644 index 0000000..b7e8405 --- /dev/null +++ b/src/net/zmq_async.cpp @@ -0,0 +1,103 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "zmq_async.h" + +#include + +namespace net { namespace zmq +{ + const boost::system::error_category& boost_error_category() noexcept + { + struct category final : boost::system::error_category + { + virtual const char* name() const noexcept override final + { + return "error::error_category()"; + } + + virtual std::string message(int value) const override final + { + char const* const msg = zmq_strerror(value); + if (msg) + return msg; + return "zmq_strerror failure"; + } + + virtual boost::system::error_condition default_error_condition(int value) const noexcept override final + { + // maps specific errors to generic `std::errc` cases. + switch (value) + { + case EFSM: + case ETERM: + break; + default: + /* zmq is using cerrno errors. C++ spec indicates that `std::errc` + values must be identical to the cerrno value. So just map every zmq + specific error to the generic errc equivalent. zmq extensions must + be in the switch or they map to a non-existent errc enum value. */ + return boost::system::errc::errc_t(value); + } + return boost::system::error_condition{value, *this}; + } + }; + static const category instance{}; + return instance; + } + + boost::system::error_code make_error_code(std::error_code code) + { + if (std::addressof(code.category()) != std::addressof(error_category())) + throw std::logic_error{"Expected only ZMQ errors"}; + return boost::system::error_code{code.value(), boost_error_category()}; + } + + void free_descriptor::operator()(adescriptor* ptr) noexcept + { + if (ptr) + { + ptr->release(); // release ASIO ownership, destroys all queued handlers + delete ptr; + } + } + + expect async_client::make(boost::asio::io_service& io, socket zsock) + { + MONERO_PRECOND(zsock != nullptr); + + int fd = 0; + std::size_t length = sizeof(fd); + if (zmq_getsockopt(zsock.get(), ZMQ_FD, &fd, &length) != 0) + return net::zmq::get_error_code(); + + async_client out{std::move(zsock), nullptr, false}; + out.asock.reset(new adescriptor{io, fd}); + return out; + } +}} // net // zmq + diff --git a/src/net/zmq_async.h b/src/net/zmq_async.h new file mode 100644 index 0000000..471f4ef --- /dev/null +++ b/src/net/zmq_async.h @@ -0,0 +1,165 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "byte_slice.h" // monero/contrib/epee/include +#include "common/expect.h" // monero/src +#include "net/zmq.h" + +namespace net { namespace zmq +{ + //! \return Category for ZMQ errors. + const boost::system::error_category& boost_error_category() noexcept; + + //! \return `code` iff the `::net::zmq` error category + boost::system::error_code make_error_code(std::error_code code); + + using adescriptor = boost::asio::posix::stream_descriptor; + + struct free_descriptor + { + void operator()(adescriptor* ptr) noexcept; + }; + + using asocket = std::unique_ptr; + + struct async_client + { + async_client() = delete; + socket zsock; + asocket asock; + bool close; + + static expect make(boost::asio::io_service& io, socket zsock); + }; + + class read_msg_op + { + async_client* sock_; + std::string* msg_; + + public: + read_msg_op(async_client& sock, std::string& msg) + : sock_(std::addressof(sock)), msg_(std::addressof(msg)) + {} + + template + void operator()(F& self, const boost::system::error_code error = {}, std::size_t = 0) + { + if (error) + return self.complete(error, 0); + if (!sock_) + return; + if (sock_->close) + return self.complete(boost::asio::error::operation_aborted, 0); + + assert(sock_->zsock && sock_->asock); + expect msg = receive(sock_->zsock.get(), ZMQ_DONTWAIT); + if (!msg) + { + if (msg != make_error_code(EAGAIN)) + return self.complete(make_error_code(msg.error()), 0); + + // try again + sock_->asock->async_read_some(boost::asio::null_buffers(), std::move(self)); + return; + } + + *msg_ = std::move(*msg); + self.complete(error, msg_->size()); + } + }; + + class write_msg_op + { + async_client* sock_; + epee::byte_slice msg_; + + public: + write_msg_op(async_client& sock, epee::byte_slice msg) + : sock_(std::addressof(sock)), msg_(std::move(msg)) + {} + + template + void operator()(F& self, const boost::system::error_code error = {}, std::size_t = 0) + { + if (error) + return self.complete(error, 0); + if (!sock_) + return; + if (sock_->close) + return self.complete(boost::asio::error::operation_aborted, 0); + + assert(sock_->zsock && sock_->asock); + + expect status = + ::net::zmq::send(msg_.clone(), sock_->zsock.get(), ZMQ_DONTWAIT); + if (!status) + { + if (status != ::net::zmq::make_error_code(EAGAIN)) + return self.complete(make_error_code(status.error()), 0); + + // try again + sock_->asock->async_write_some(boost::asio::null_buffers(), std::move(self)); + return; + } + + self.complete(error, msg_.size()); + } + }; + + //! Cannot have an `async_read` and `async_write` at same time (edge trigger) + template + void async_read(async_client& sock, std::string& buffer, F&& f) + { + // async_compose is required for correct strand invocation, etc + boost::asio::async_compose( + read_msg_op{sock, buffer}, f, *sock.asock + ); + } + + //! Cannot have an `async_write` and `async_read` at same time (edge trigger) + template + void async_write(async_client& sock, epee::byte_slice msg, F&& f) + { + // async_compose is required for correct strand invocation, etc + boost::asio::async_compose( + write_msg_op{sock, std::move(msg)}, f, *sock.asock + ); + } + +}} // net // zmq + diff --git a/src/rest_server.cpp b/src/rest_server.cpp index ce502c2..ac860c3 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -27,6 +27,24 @@ #include "rest_server.h" #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -45,17 +63,16 @@ #include "db/string.h" #include "error.h" #include "lmdb/util.h" // monero/src -#include "net/http_base.h" // monero/contrib/epee/include #include "net/net_parse_helpers.h" // monero/contrib/epee/include #include "net/net_ssl.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src +#include "net/zmq_async.h" #include "rpc/admin.h" #include "rpc/client.h" #include "rpc/daemon_messages.h" // monero/src #include "rpc/light_wallet.h" #include "rpc/rates.h" #include "rpc/webhook.h" -#include "util/http_server.h" #include "util/gamma_picker.h" #include "util/random_outputs.h" #include "util/source_location.h" @@ -67,7 +84,37 @@ namespace lws namespace { namespace http = epee::net_utils::http; - constexpr const std::chrono::seconds reconnect_backoff{10}; + constexpr const std::size_t http_parser_buffer_size = 16 * 1024; + constexpr const std::chrono::seconds zmq_reconnect_backoff{10}; + constexpr const std::chrono::seconds rest_handshake_timeout{5}; + constexpr const std::chrono::seconds rest_request_timeout{5}; + constexpr const std::chrono::seconds rest_response_timeout{15}; + + //! `/daemon_status` and `get_unspent_outs` caches ZMQ result for this long + constexpr const std::chrono::seconds daemon_cache_timeout{5}; + + struct copyable_slice + { + epee::byte_slice value; + + copyable_slice(epee::byte_slice value) noexcept + : value(std::move(value)) + {} + + copyable_slice(copyable_slice&&) = default; + copyable_slice(const copyable_slice& rhs) noexcept + : value(rhs.value.clone()) + {} + + copyable_slice& operator=(copyable_slice&&) = default; + copyable_slice& operator=(const copyable_slice& rhs) noexcept + { + if (this != std::addressof(rhs)) + value = rhs.value.clone(); + return *this; + } + }; + using async_complete = void(expect); expect thread_client(const rpc::client& gclient, const bool reset = false) { @@ -93,7 +140,7 @@ namespace lws { // This reduces ZMQ internal errors with lack of file descriptors const auto now = std::chrono::steady_clock::now(); - if (now - thread_ptr->last_connect < reconnect_backoff) + if (now - thread_ptr->last_connect < zmq_reconnect_backoff) return {error::daemon_timeout}; // careful, gclient and thread_ptr->client could be aliased @@ -129,13 +176,6 @@ namespace lws return resp; } - struct context : epee::net_utils::connection_context_base - { - context() - : epee::net_utils::connection_context_base() - {} - }; - bool is_locked(std::uint64_t unlock_time, db::block_id last) noexcept { if (unlock_time > CRYPTONOTE_MAX_BLOCK_NUMBER) @@ -202,6 +242,34 @@ namespace lws return {std::make_pair(user->second, std::move(*reader))}; } + //! For endpoints that _sometimes_ generate async responses + expect async_ready() noexcept + { return epee::byte_slice{}; } + + //! Helper for `call` function when handling an _always_ async endpoint + expect json_response(const expect&) noexcept + { return epee::byte_slice{}; } + + //! Helper for `call` function when handling a _sometimes_ async endpoint + expect json_response(expect source) noexcept + { return source; } + + //! Immediately generate JSON from `source` + template + expect json_response(const T& source) + { + std::error_code error{}; + epee::byte_slice out{}; + if ((error = wire::json::to_bytes(out, source))) + return error; + return {std::move(out)}; + } + + //! Helper for `call` function when handling a _never_ async endpoint + template + expect json_response(const expect& source) + { return json_response(source.value()); } + std::atomic_flag rates_error_once = ATOMIC_FLAG_INIT; struct runtime_options @@ -212,57 +280,225 @@ namespace lws bool auto_accept_creation; }; + struct rest_server_data + { + const db::storage disk; + const rpc::client client; + const runtime_options options; + + std::vector clients; + boost::mutex sync; + + expect get_async_client(boost::asio::io_service& io) + { + boost::unique_lock lock{sync}; + if (!clients.empty()) + { + net::zmq::async_client out{std::move(clients.back())}; + clients.pop_back(); + return out; + } + lock.unlock(); + return client.make_async_client(io); + } + + void store_async_client(net::zmq::async_client&& client) + { + const boost::lock_guard lock{sync}; + client.close = false; + clients.push_back(std::move(client)); + } + }; + struct daemon_status { using request = rpc::daemon_status_request; - using response = rpc::daemon_status_response; + using response = epee::byte_slice; // sometimes async + using async_response = rpc::daemon_status_response; - static expect handle(request, const db::storage&, const rpc::client& gclient, const runtime_options&) + static expect handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function resume) { using info_rpc = cryptonote::rpc::GetInfo; - const expect tclient = thread_client(gclient); - if (!tclient) - return tclient.error(); - if (*tclient == nullptr) - throw std::logic_error{"Unexpected rpc::client nullptr"}; + struct frame + { + rest_server_data* parent; + epee::byte_slice out; + std::string in; + net::zmq::async_client client; + boost::asio::steady_timer timer; + boost::asio::io_service::strand strand; + std::vector> resumers; - // default to an unavailable daemon - response resp{ - .network = rpc::network_type(lws::config::network), - .state = rpc::daemon_state::unavailable + frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + : parent(std::addressof(parent)), + out(), + in(), + client(std::move(client)), + timer(io), + strand(io), + resumers() + { + info_rpc::Request daemon_req{}; + out = rpc::client::make_message("get_info", daemon_req); + } }; - info_rpc::Request daemon_req{}; - epee::byte_slice message = rpc::client::make_message("get_info", daemon_req); - const expect sent = send_with_retry(**tclient, std::move(message), std::chrono::seconds{2}); - if (!sent) + struct cached_result { - if (sent.matches(std::errc::timed_out)) - return resp; - return sent.error(); + std::weak_ptr status; + epee::byte_slice result; + std::chrono::steady_clock::time_point last; + boost::mutex sync; + + cached_result() noexcept + : status(), result(), last(std::chrono::seconds{0}), sync() + {} + }; + + static cached_result cache; + boost::unique_lock lock{cache.sync}; + + if (!cache.result.empty() && std::chrono::steady_clock::now() - cache.last < daemon_cache_timeout) + return cache.result.clone(); + + auto active = cache.status.lock(); + if (active) + { + active->resumers.push_back(std::move(resume)); + return async_ready(); } - const auto daemon_resp = (*tclient)->receive(std::chrono::seconds{4}, MLWS_CURRENT_LOCATION); - if (!daemon_resp) + struct async_handler : public boost::asio::coroutine { - if (daemon_resp.matches(std::errc::timed_out)) - return resp; - return daemon_resp.error(); - } + std::shared_ptr self_; - resp.outgoing_connections_count = daemon_resp->info.outgoing_connections_count; - resp.incoming_connections_count = daemon_resp->info.incoming_connections_count; - resp.height = daemon_resp->info.height; - resp.target_height = daemon_resp->info.target_height; + explicit async_handler(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)) + {} - if (!resp.outgoing_connections_count && !resp.incoming_connections_count) - resp.state = rpc::daemon_state::no_connections; - else if (resp.target_height && (resp.target_height - resp.height) >= 5) - resp.state = rpc::daemon_state::synchronizing; - else - resp.state = rpc::daemon_state::ok; - return resp; + void send_response(const boost::system::error_code error, const expect& value) + { + assert(self_ != nullptr); + + if (error) + MERROR("Failure in /daemon_status: " << error.message()); + else + { + // only re-use REQ socket if in proper state + MDEBUG("Completed ZMQ request in /daemon_status"); + self_->parent->store_async_client(std::move(self_->client)); + } + + std::vector> resumers; + { + const boost::lock_guard lock{cache.sync}; + cache.status.reset(); // prevent more resumers being added + resumers.swap(self_->resumers); + if (value) + { + cache.result = value->value.clone(); + cache.last = std::chrono::steady_clock::now(); + } + else + cache.result = nullptr; // serialization error + } + + // send default constructed response if I/O `error` + for (const auto& r : resumers) + r(value); + } + + bool set_timeout(std::chrono::steady_clock::duration timeout, const bool expecting) const + { + struct on_timeout + { + std::shared_ptr self_; + + void operator()(boost::system::error_code error) const + { + if (!self_ || error == boost::asio::error::operation_aborted) + return; + + MWARNING("Timeout on /daemon_status ZMQ call"); + self_->client.close = true; + self_->client.asock->cancel(error); + } + }; + + assert(self_ != nullptr); + if (!self_->timer.expires_after(timeout) && expecting) + return false; + + self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); + return true; + } + + void operator()(boost::system::error_code error = {}, const std::size_t bytes = 0) + { + if (!self_) + return; + if (error) + return send_response(error, json_response(async_response{})); + + frame& self = *self_; + BOOST_ASIO_CORO_REENTER(*this) + { + set_timeout(std::chrono::seconds{2}, false); + BOOST_ASIO_CORO_YIELD net::zmq::async_write( + self.client, std::move(self.out), self.strand.wrap(std::move(*this)) + ); + + if (!set_timeout(std::chrono::seconds{5}, true)) + return send_response(boost::asio::error::operation_aborted, json_response(async_response{})); + + BOOST_ASIO_CORO_YIELD net::zmq::async_read( + self.client, self.in, self.strand.wrap(std::move(*this)) + ); + + if (!self.timer.cancel(error)) + return send_response(boost::asio::error::operation_aborted, json_response(async_response{})); + + { + info_rpc::Response daemon_resp{}; + const expect status = + rpc::parse_response(daemon_resp, std::move(self.in)); + if (!status) + return send_response({}, status.error()); + + async_response resp{}; + + resp.outgoing_connections_count = daemon_resp.info.outgoing_connections_count; + resp.incoming_connections_count = daemon_resp.info.incoming_connections_count; + resp.height = daemon_resp.info.height; + resp.target_height = daemon_resp.info.target_height; + + if (!resp.outgoing_connections_count && !resp.incoming_connections_count) + resp.state = rpc::daemon_state::no_connections; + else if (resp.target_height && (resp.target_height - resp.height) >= 5) + resp.state = rpc::daemon_state::synchronizing; + else + resp.state = rpc::daemon_state::ok; + + send_response({}, json_response(std::move(resp))); + } + } + } + }; + + expect client = data.get_async_client(io); + if (!client) + return client.error(); + + active = std::make_shared(data, io, std::move(*client)); + cache.result = nullptr; + cache.status = active; + active->resumers.push_back(std::move(resume)); + lock.unlock(); + + MDEBUG("Starting new ZMQ request in /daemon_status"); + active->strand.dispatch(async_handler{active}); + return async_ready(); } }; @@ -271,9 +507,9 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_address_info_response; - static expect handle(const request& req, db::storage disk, rpc::client const& client, runtime_options const&) + static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function) { - auto user = open_account(req, std::move(disk)); + auto user = open_account(req, data.disk.clone()); if (!user) return user.error(); @@ -331,7 +567,8 @@ namespace lws resp.total_sent = rpc::safe_uint64(std::uint64_t(resp.total_sent) + meta->amount); } - resp.rates = client.get_rates(); + // `get_rates()` nevers does I/O, so handler can remain synchronous + resp.rates = data.client.get_rates(); if (!resp.rates && !rates_error_once.test_and_set(std::memory_order_relaxed)) MWARNING("Unable to retrieve exchange rates: " << resp.rates.error().message()); @@ -344,9 +581,9 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_address_txs_response; - static expect handle(const request& req, db::storage disk, rpc::client const&, runtime_options const&) + static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function) { - auto user = open_account(req, std::move(disk)); + auto user = open_account(req, data.disk.clone()); if (!user) return user.error(); @@ -465,9 +702,10 @@ namespace lws struct get_random_outs { using request = rpc::get_random_outs_request; - using response = rpc::get_random_outs_response; + using response = void; // always asynchronous response + using async_response = rpc::get_random_outs_response; - static expect handle(request req, const db::storage&, rpc::client const& gclient, runtime_options const&) + static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function resume) { using distribution_rpc = cryptonote::rpc::GetOutputDistribution; using histogram_rpc = cryptonote::rpc::GetOutputHistogram; @@ -478,7 +716,7 @@ namespace lws if (50 < req.count || 20 < amounts.size()) return {lws::error::exceeded_rest_request_limit}; - const expect tclient = thread_client(gclient); + const expect tclient = thread_client(data.client); if (!tclient) return tclient.error(); if (*tclient == nullptr) @@ -598,7 +836,8 @@ namespace lws if (!rings) return rings.error(); - return response{std::move(*rings)}; + resume(json_response(async_response{std::move(*rings)})); + return success(); } }; @@ -607,9 +846,9 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_subaddrs_response; - static expect handle(request const& req, db::storage disk, rpc::client const&, runtime_options const& options) + static expect handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function) { - auto user = open_account(req, std::move(disk)); + auto user = open_account(req, data.disk.clone()); if (!user) return user.error(); auto subaddrs = user->second.get_subaddresses(user->first.id); @@ -622,29 +861,19 @@ namespace lws struct get_unspent_outs { using request = rpc::get_unspent_outs_request; - using response = rpc::get_unspent_outs_response; + using response = epee::byte_slice; // somtimes async response + using async_response = rpc::get_unspent_outs_response; + using rpc_command = cryptonote::rpc::GetFeeEstimate; - static expect handle(request req, db::storage disk, rpc::client const& gclient, runtime_options const&) + static expect generate_response(request req, const expect& rpc, db::storage disk) { - using rpc_command = cryptonote::rpc::GetFeeEstimate; + if (!rpc) + return rpc.error(); auto user = open_account(req.creds, std::move(disk)); if (!user) return user.error(); - const expect tclient = thread_client(gclient); - if (!tclient) - return tclient.error(); - if (*tclient == nullptr) - throw std::logic_error{"Unexpected rpc::client nullptr"}; - - { - rpc_command::Request req{}; - req.num_grace_blocks = 10; - epee::byte_slice msg = rpc::client::make_message("get_dynamic_fee_estimate", req); - MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10})); - } - if ((req.use_dust && *req.use_dust) || !req.dust_threshold) req.dust_threshold = rpc::safe_uint64(0); @@ -679,17 +908,199 @@ namespace lws if (received < std::uint64_t(req.amount)) return {lws::error::account_not_found}; - const auto resp = (*tclient)->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); - if (!resp) - return resp.error(); - - if (resp->size_scale == 0 || 1024 < resp->size_scale || resp->fee_mask == 0) + if (rpc->size_scale == 0 || 1024 < rpc->size_scale || rpc->fee_mask == 0) return {lws::error::bad_daemon_response}; const std::uint64_t per_byte_fee = - resp->estimated_base_fee / resp->size_scale; + rpc->estimated_base_fee / rpc->size_scale; - return response{per_byte_fee, resp->fee_mask, rpc::safe_uint64(received), std::move(unspent), std::move(req.creds.key)}; + return json_response( + async_response{ + per_byte_fee, + rpc->fee_mask, + rpc::safe_uint64(received), + std::move(unspent), + std::move(req.creds.key) + } + ); + } + + static expect handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function resume) + { + struct frame + { + rest_server_data* parent; + epee::byte_slice out; + std::string in; + net::zmq::async_client client; + boost::asio::steady_timer timer; + boost::asio::io_service::strand strand; + std::vector>> resumers; + + frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + : parent(std::addressof(parent)), + out(), + in(), + client(std::move(client)), + timer(io), + strand(io), + resumers() + { + rpc_command::Request req{}; + req.num_grace_blocks = 10; + out = rpc::client::make_message("get_dynamic_fee_estimate", req); + } + }; + + struct cached_result + { + std::weak_ptr status; + rpc_command::Response result; + std::chrono::steady_clock::time_point last; + boost::mutex sync; + + cached_result() noexcept + : status(), result{}, last(std::chrono::seconds{0}), sync() + {} + }; + + static cached_result cache; + boost::unique_lock lock{cache.sync}; + + if (std::chrono::steady_clock::now() - cache.last < daemon_cache_timeout) + { + rpc_command::Response result = cache.result; + lock.unlock(); + return generate_response(std::move(req), std::move(result), data.disk.clone()); + } + + auto active = cache.status.lock(); + if (active) + { + active->resumers.emplace_back(std::move(req), std::move(resume)); + return async_ready(); + } + + struct async_handler : public boost::asio::coroutine + { + std::shared_ptr self_; + + explicit async_handler(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)) + {} + + void send_response(const boost::system::error_code error, expect value) + { + assert(self_ != nullptr); + + if (error) + { + MERROR("Failure in /get_unspent_outs: " << error.message()); + value = {lws::error::daemon_timeout}; // old previous behavior + } + else + { + // only re-use REQ socket if in proper state + MDEBUG("Completed ZMQ request in /get_unspent_outs"); + self_->parent->store_async_client(std::move(self_->client)); + } + + std::vector>> resumers; + { + const boost::lock_guard lock{cache.sync}; + cache.status.reset(); // prevent more resumers being added + resumers.swap(self_->resumers); + if (value) + { + cache.result = *value; + cache.last = std::chrono::steady_clock::now(); + } + else + cache.result = rpc_command::Response{}; + } + + // if `value` is error, it will return immediately + for (auto& r : resumers) + r.second(generate_response(std::move(r.first), value, self_->parent->disk.clone())); + } + + bool set_timeout(std::chrono::steady_clock::duration timeout, const bool expecting) const + { + struct on_timeout + { + std::shared_ptr self_; + + void operator()(boost::system::error_code error) const + { + if (!self_ || error == boost::asio::error::operation_aborted) + return; + + MWARNING("Timeout on /get_unspent_outs ZMQ call"); + self_->client.close = true; + self_->client.asock->cancel(error); + } + }; + + assert(self_ != nullptr); + if (!self_->timer.expires_after(timeout) && expecting) + return false; + + self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); + return true; + } + + void operator()(boost::system::error_code error = {}, const std::size_t bytes = 0) + { + using default_response = rpc_command::Response; + + if (!self_) + return; + if (error) + return send_response(error, default_response{}); + + frame& self = *self_; + BOOST_ASIO_CORO_REENTER(*this) + { + set_timeout(std::chrono::seconds{2}, false); + BOOST_ASIO_CORO_YIELD net::zmq::async_write( + self.client, std::move(self.out), self.strand.wrap(std::move(*this)) + ); + + if (!set_timeout(std::chrono::seconds{5}, true)) + return send_response(boost::asio::error::operation_aborted, default_response{}); + + BOOST_ASIO_CORO_YIELD net::zmq::async_read( + self.client, self.in, self.strand.wrap(std::move(*this)) + ); + + if (!self.timer.cancel(error)) + return send_response(boost::asio::error::operation_aborted, default_response{}); + + { + rpc_command::Response daemon_resp{}; + const expect status = + rpc::parse_response(daemon_resp, std::move(self.in)); + if (!status) + return send_response({}, status.error()); + return send_response({}, std::move(daemon_resp)); + } + } + } + }; + + expect client = data.get_async_client(io); + if (!client) + return client.error(); + + active = std::make_shared(data, io, std::move(*client)); + cache.result = rpc_command::Response{}; + cache.status = active; + active->resumers.emplace_back(std::move(req), std::move(resume)); + lock.unlock(); + + MDEBUG("Starting new ZMQ request in /get_unspent_outs"); + active->strand.dispatch(async_handler{active}); + return async_ready(); } }; @@ -698,12 +1109,12 @@ namespace lws using request = rpc::account_credentials; using response = rpc::import_response; - static expect handle(request req, db::storage disk, rpc::client const&, runtime_options const&) + static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) { bool new_request = false; bool fulfilled = false; { - auto user = open_account(req, disk.clone()); + auto user = open_account(req, data.disk.clone()); if (!user) return user.error(); @@ -725,7 +1136,7 @@ namespace lws } // close reader if (new_request) - MONERO_CHECK(disk.import_request(req.address, db::block_id(0))); + MONERO_CHECK(data.disk.clone().import_request(req.address, db::block_id(0))); const char* status = new_request ? "Accepted, waiting for approval" : (fulfilled ? "Approved" : "Waiting for Approval"); @@ -738,11 +1149,12 @@ namespace lws using request = rpc::login_request; using response = rpc::login_response; - static expect handle(request req, db::storage disk, rpc::client const& gclient, runtime_options const& options) + static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function resume) { if (!key_check(req.creds)) return {lws::error::bad_view_key}; + auto disk = data.disk.clone(); { auto reader = disk.start_read(); if (!reader) @@ -768,7 +1180,7 @@ namespace lws if (!hooks) return hooks.error(); - if (options.auto_accept_creation) + if (data.options.auto_accept_creation) { const auto accepted = disk.accept_requests(db::request::create, {std::addressof(req.creds.address), 1}); if (!accepted) @@ -777,16 +1189,14 @@ namespace lws if (!hooks->empty()) { - const expect tclient = thread_client(gclient); - if (!tclient) - return tclient.error(); - if (*tclient == nullptr) - throw std::logic_error{"Unexpected rpc::client nullptr"}; + // webhooks are not needed for response, so just queue i/o and + // log errors when it fails rpc::send_webhook( - **tclient, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify + data.client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, data.options.webhook_verify ); } + return response{true, req.generated_locally}; } }; @@ -796,14 +1206,14 @@ namespace lws using request = rpc::provision_subaddrs_request; using response = rpc::new_subaddrs_response; - static expect handle(request req, db::storage disk, rpc::client const&, runtime_options const& options) + static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) { if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj) return {lws::error::invalid_range}; db::account_id id = db::account_id::invalid; { - auto user = open_account(req.creds, disk.clone()); + auto user = open_account(req.creds, data.disk.clone()); if (!user) return user.error(); id = user->first.id; @@ -821,7 +1231,7 @@ namespace lws { if (std::numeric_limits::max() / n_major < n_minor) return {lws::error::max_subaddresses}; - if (options.max_subaddresses < n_major * n_minor) + if (data.options.max_subaddresses < n_major * n_minor) return {lws::error::max_subaddresses}; std::vector ranges; @@ -832,7 +1242,7 @@ namespace lws db::major_index(elem), db::index_ranges{{db::index_range{db::minor_index(minor_i), db::minor_index(minor_i + n_minor - 1)}}} ); } - auto upserted = disk.upsert_subaddresses(id, req.creds.address, req.creds.key, ranges, options.max_subaddresses); + auto upserted = data.disk.clone().upsert_subaddresses(id, req.creds.address, req.creds.key, ranges, data.options.max_subaddresses); if (!upserted) return upserted.error(); new_ranges = std::move(*upserted); @@ -841,6 +1251,7 @@ namespace lws if (get_all) { // must start a new read after the last write + auto disk = data.disk.clone(); auto reader = disk.start_read(); if (!reader) return reader.error(); @@ -856,32 +1267,188 @@ namespace lws struct submit_raw_tx { using request = rpc::submit_raw_tx_request; - using response = rpc::submit_raw_tx_response; + using response = void; // always async + using async_response = rpc::submit_raw_tx_response; - static expect handle(request req, const db::storage& disk, const rpc::client& gclient, const runtime_options&) + static expect handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function resume) { using transaction_rpc = cryptonote::rpc::SendRawTxHex; - const expect tclient = thread_client(gclient); - if (!tclient) - return tclient.error(); - if (*tclient == nullptr) - throw std::logic_error{"Unexpected rpc::client nullptr"}; + struct frame + { + rest_server_data* parent; + std::string in; + net::zmq::async_client client; + boost::asio::steady_timer timer; + boost::asio::io_service::strand strand; + std::deque>> resumers; + + frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + : parent(std::addressof(parent)), + in(), + client(std::move(client)), + timer(io), + strand(io), + resumers() + {} + }; + + struct cached_result + { + std::weak_ptr status; + boost::mutex sync; + + cached_result() noexcept + : status(), sync() + {} + }; transaction_rpc::Request daemon_req{}; daemon_req.relay = true; daemon_req.tx_as_hex = std::move(req.tx); - epee::byte_slice message = rpc::client::make_message("send_raw_tx_hex", daemon_req); - MONERO_CHECK(send_with_retry(**tclient, std::move(message), std::chrono::seconds{10})); + epee::byte_slice msg = rpc::client::make_message("send_raw_tx_hex", daemon_req); - const auto daemon_resp = (*tclient)->receive(std::chrono::seconds{20}, MLWS_CURRENT_LOCATION); - if (!daemon_resp) - return daemon_resp.error(); - if (!daemon_resp->relayed) - return {lws::error::tx_relay_failed}; + static cached_result cache; + boost::unique_lock lock{cache.sync}; - return response{"OK"}; + auto active = cache.status.lock(); + if (active) + { + active->resumers.emplace_back(std::move(msg), std::move(resume)); + return success(); + } + + struct async_handler : public boost::asio::coroutine + { + std::shared_ptr self_; + + explicit async_handler(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)) + {} + + void send_response(const boost::system::error_code error, expect value) + { + assert(self_ != nullptr); + + std::deque>> resumers; + { + const boost::lock_guard lock{cache.sync}; + if (error) + { + // Prevent further resumers, ZMQ REQ/REP in bad state + MERROR("Failure in /submit_raw_tx: " << error.message()); + value = {lws::error::daemon_timeout}; + cache.status.reset(); + resumers.swap(self_->resumers); + } + else + { + MDEBUG("Completed ZMQ request in /submit_raw_tx"); + resumers.push_back(std::move(self_->resumers.front())); + self_->resumers.pop_front(); + } + } + + for (const auto& r : resumers) + r.second(value); + } + + bool set_timeout(std::chrono::steady_clock::duration timeout, const bool expecting) const + { + struct on_timeout + { + std::shared_ptr self_; + + void operator()(boost::system::error_code error) const + { + if (!self_ || error == boost::asio::error::operation_aborted) + return; + + MWARNING("Timeout on /submit_raw_tx ZMQ call"); + self_->client.close = true; + self_->client.asock->cancel(error); + } + }; + + assert(self_ != nullptr); + if (!self_->timer.expires_after(timeout) && expecting) + return false; + + self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); + return true; + } + + void operator()(boost::system::error_code error = {}, const std::size_t bytes = 0) + { + if (!self_) + return; + if (error) + return send_response(error, async_ready()); + + frame& self = *self_; + epee::byte_slice next = nullptr; + BOOST_ASIO_CORO_REENTER(*this) + { + for (;;) + { + { + const boost::lock_guard lock{cache.sync}; + if (self_->resumers.empty()) + { + cache.status.reset(); + self_->parent->store_async_client(std::move(self_->client)); + return; + } + next = std::move(self_->resumers.front().first); + } + + set_timeout(std::chrono::seconds{10}, false); + BOOST_ASIO_CORO_YIELD net::zmq::async_write( + self.client, std::move(next), self.strand.wrap(std::move(*this)) + ); + + if (!set_timeout(std::chrono::seconds{20}, true)) + return send_response(boost::asio::error::operation_aborted, async_ready()); + + self.in.clear(); // could be in moved-from state + BOOST_ASIO_CORO_YIELD net::zmq::async_read( + self.client, self.in, self.strand.wrap(std::move(*this)) + ); + + if (!self.timer.cancel(error)) + return send_response(boost::asio::error::operation_aborted, async_ready()); + + { + transaction_rpc::Response daemon_resp{}; + const expect status = + rpc::parse_response(daemon_resp, std::move(self.in)); + + if (!status) + send_response({}, status.error()); + else if (!daemon_resp.relayed) + send_response({}, {lws::error::tx_relay_failed}); + else + send_response({}, json_response(async_response{"OK"})); + } + } + } + } + }; + + expect client = data.get_async_client(io); + if (!client) + return client.error(); + + active = std::make_shared(data, io, std::move(*client)); + cache.status = active; + + active->resumers.emplace_back(std::move(msg), std::move(resume)); + lock.unlock(); + + MDEBUG("Starting new ZMQ request in /submit_raw_tx"); + active->strand.dispatch(async_handler{active}); + return success(); } }; @@ -890,14 +1457,14 @@ namespace lws using request = rpc::upsert_subaddrs_request; using response = rpc::new_subaddrs_response; - static expect handle(request req, db::storage disk, rpc::client const&, runtime_options const& options) + static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) { - if (!options.max_subaddresses) + if (!data.options.max_subaddresses) return {lws::error::max_subaddresses}; db::account_id id = db::account_id::invalid; { - auto user = open_account(req.creds, disk.clone()); + auto user = open_account(req.creds, data.disk.clone()); if (!user) return user.error(); id = user->first.id; @@ -906,8 +1473,9 @@ namespace lws const bool get_all = req.get_all.value_or(true); std::vector all_ranges; + auto disk = data.disk.clone(); auto new_ranges = - disk.upsert_subaddresses(id, req.creds.address, req.creds.key, req.subaddrs, options.max_subaddresses); + disk.upsert_subaddresses(id, req.creds.address, req.creds.key, req.subaddrs, data.options.max_subaddresses); if (!new_ranges) return new_ranges.error(); @@ -926,24 +1494,25 @@ namespace lws }; template - expect call(std::string&& root, db::storage disk, const rpc::client& gclient, const runtime_options& options) + expect call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function resume) { using request = typename E::request; using response = typename E::response; + if (std::is_same() && !resume) + throw std::logic_error{"async REST handler not setup properly"}; + if (std::is_same() && !resume) + throw std::logic_error{"async REST handler not setup properly"}; + request req{}; std::error_code error = wire::json::from_bytes(std::move(root), req); if (error) return error; - expect resp = E::handle(std::move(req), std::move(disk), gclient, options); + expect resp = E::handle(std::move(req), io, data, std::move(resume)); if (!resp) return resp.error(); - - epee::byte_slice out{}; - if ((error = wire::json::to_bytes(out, *resp))) - return error; - return {std::move(out)}; + return json_response(std::move(resp)); } template @@ -965,7 +1534,7 @@ namespace lws } template - expect call_admin(std::string&& root, db::storage disk, const rpc::client&, const runtime_options& options) + expect call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function) { using request = typename E::request; @@ -976,7 +1545,8 @@ namespace lws return error; } - if (!options.disable_admin_auth) + db::storage disk = data.disk.clone(); + if (!data.options.disable_admin_auth) { if (!req.auth) return {error::account_not_found}; @@ -1005,41 +1575,57 @@ namespace lws struct endpoint { char const* const name; - expect (*const run)(std::string&&, db::storage, rpc::client const&, const runtime_options&); + expect (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function); const unsigned max_size; + const bool is_async; }; + constexpr unsigned get_max(const endpoint* start, endpoint const* const end) noexcept + { + unsigned current_max = 0; + for ( ; start != end; ++start) + current_max = std::max(current_max, start->max_size); + return current_max; + } + constexpr const endpoint endpoints[] = { - {"/daemon_status", call, 1024}, - {"/get_address_info", call, 2 * 1024}, - {"/get_address_txs", call, 2 * 1024}, - {"/get_random_outs", call, 2 * 1024}, - {"/get_subaddrs", call, 2 * 1024}, - {"/get_txt_records", nullptr, 0 }, - {"/get_unspent_outs", call, 2 * 1024}, - {"/import_wallet_request", call, 2 * 1024}, - {"/login", call, 2 * 1024}, - {"/provision_subaddrs", call, 2 * 1024}, - {"/submit_raw_tx", call, 50 * 1024}, - {"/upsert_subaddrs", call, 10 * 1024} + {"/daemon_status", call, 1024, true}, + {"/get_address_info", call, 2 * 1024, false}, + {"/get_address_txs", call, 2 * 1024, false}, + {"/get_random_outs", call, 2 * 1024, true}, + {"/get_subaddrs", call, 2 * 1024, false}, + {"/get_txt_records", nullptr, 0, false}, + {"/get_unspent_outs", call, 2 * 1024, true}, + {"/import_wallet_request", call, 2 * 1024, false}, + {"/login", call, 2 * 1024, false}, + {"/provision_subaddrs", call, 2 * 1024, false}, + {"/submit_raw_tx", call, 50 * 1024, true}, + {"/upsert_subaddrs", call, 10 * 1024, false} }; + constexpr const unsigned max_standard_endpoint_size = + get_max(std::begin(endpoints), std::end(endpoints)); constexpr const endpoint admin_endpoints[] = { - {"/accept_requests", call_admin, 50 * 1024}, - {"/add_account", call_admin, 50 * 1024}, - {"/list_accounts", call_admin, 100}, - {"/list_requests", call_admin, 100}, - {"/modify_account_status", call_admin, 50 * 1024}, - {"/reject_requests", call_admin, 50 * 1024}, - {"/rescan", call_admin, 50 * 1024}, - {"/validate", call_admin, 50 * 1024}, - {"/webhook_add", call_admin, 50 * 1024}, - {"/webhook_delete", call_admin, 50 * 1024}, - {"/webhook_delete_uuid", call_admin,50 * 1024}, - {"/webhook_list", call_admin, 100} + {"/accept_requests", call_admin, 50 * 1024, false}, + {"/add_account", call_admin, 50 * 1024, false}, + {"/list_accounts", call_admin, 100, false}, + {"/list_requests", call_admin, 100, false}, + {"/modify_account_status", call_admin, 50 * 1024, false}, + {"/reject_requests", call_admin, 50 * 1024, false}, + {"/rescan", call_admin, 50 * 1024, false}, + {"/validate", call_admin, 50 * 1024, false}, + {"/webhook_add", call_admin, 50 * 1024, false}, + {"/webhook_delete", call_admin, 50 * 1024, false}, + {"/webhook_delete_uuid", call_admin,50 * 1024, false}, + {"/webhook_list", call_admin, 100, false} }; + constexpr const unsigned max_admin_endpoint_size = + get_max(std::begin(endpoints), std::end(endpoints)); + + constexpr const unsigned max_endpoint_size = + std::max(max_standard_endpoint_size, max_admin_endpoint_size); struct by_name_ { @@ -1049,13 +1635,13 @@ namespace lws return std::strcmp(left.name, right.name) < 0; return false; } - bool operator()(const boost::string_ref left, endpoint const& right) const noexcept + bool operator()(const boost::beast::string_view left, endpoint const& right) const noexcept { if (right.name) return left < right.name; return false; } - bool operator()(endpoint const& left, const boost::string_ref right) const noexcept + bool operator()(endpoint const& left, const boost::beast::string_view right) const noexcept { if (left.name) return left.name < right; @@ -1063,28 +1649,60 @@ namespace lws } }; constexpr const by_name_ by_name{}; + + struct slice_body + { + using value_type = epee::byte_slice; + + static std::size_t size(const value_type& source) noexcept + { + return source.size(); + } + + struct writer + { + epee::byte_slice body_; + + using const_buffers_type = boost::asio::const_buffer; + + template + explicit writer(boost::beast::http::header const&, value_type const& body) + : body_(body.clone()) + {} + + void init(boost::beast::error_code& ec) + { + ec = {}; + } + + boost::optional> get(boost::beast::error_code& ec) + { + ec = {}; + return {{const_buffers_type{body_.data(), body_.size()}, false}}; + } + }; + }; } // anonymous - struct rest_server::internal final : public lws::http_server_impl_base + struct rest_server::internal { - db::storage disk; - rpc::client client; + rest_server_data data; boost::optional prefix; boost::optional admin_prefix; - runtime_options options; + boost::optional ssl_; + boost::asio::ip::tcp::acceptor acceptor; explicit internal(boost::asio::io_service& io_service, lws::db::storage disk, rpc::client client, runtime_options options) - : lws::http_server_impl_base(io_service) - , disk(std::move(disk)) - , client(std::move(client)) + : data{std::move(disk), std::move(client), std::move(options)} , prefix() , admin_prefix() - , options(std::move(options)) + , ssl_() + , acceptor(io_service) { assert(std::is_sorted(std::begin(endpoints), std::end(endpoints), by_name)); } - const endpoint* get_endpoint(boost::string_ref uri) const + const endpoint* get_endpoint(boost::beast::string_view uri) const { using span = epee::span; span handlers = nullptr; @@ -1109,86 +1727,335 @@ namespace lws return nullptr; return handler; } + }; - virtual bool - handle_http_request(const http::http_request_info& query, http::http_response_info& response, context& ctx) - override final + template + struct rest_server::connection + { + internal* parent_; + Sock sock_; + boost::beast::flat_static_buffer buffer_; + boost::optional> parser_; + boost::beast::http::response response_; + boost::asio::steady_timer timer_; + boost::asio::io_service::strand strand_; + bool keep_alive_; + + static boost::asio::ip::tcp::socket make_socket(std::true_type, internal* parent) { - endpoint const* const handler = get_endpoint(query.m_URI); - if (!handler) + return boost::asio::ip::tcp::socket{GET_IO_SERVICE(parent->acceptor)}; + } + + static boost::asio::ssl::stream make_socket(std::false_type, internal* parent) + { + return boost::asio::ssl::stream{ + GET_IO_SERVICE(parent->acceptor), parent->ssl_.value() + }; + } + + static boost::asio::ip::tcp::socket& get_tcp(boost::asio::ip::tcp::socket& sock) + { + return sock; + } + + static boost::asio::ip::tcp::socket& get_tcp(boost::asio::ssl::stream& sock) + { + return sock.next_layer(); + } + + boost::asio::ip::tcp::socket& sock() { return get_tcp(sock_); } + + explicit connection(internal* parent) noexcept + : parent_(parent), + sock_(make_socket(std::is_same(), parent)), + buffer_{}, + parser_{}, + response_{}, + timer_(GET_IO_SERVICE(parent->acceptor)), + strand_(GET_IO_SERVICE(parent->acceptor)), + keep_alive_(true) + {} + + ~connection() + { + MDEBUG("Destroying connection " << this); + } + + template + void bad_request(const boost::beast::http::status status, F&& resume) + { + MDEBUG("Sending HTTP status " << int(status) << " to " << this); + + assert(strand_.running_in_this_thread()); + response_ = {status, parser_->get().version()}; + response_.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + response_.keep_alive(keep_alive_); + response_.prepare_payload(); + resume(); + } + + template + void bad_request(const std::error_code error, F&& resume) + { + boost::system::error_code ec{}; + MINFO("REST error: " << error.message() << " from " << sock().remote_endpoint(ec) << " / " << this); + + assert(strand_.running_in_this_thread()); + if (error.category() == wire::error::rapidjson_category() || error == lws::error::invalid_range) + return bad_request(boost::beast::http::status::bad_request, std::forward(resume)); + else if (error == lws::error::account_not_found || error == lws::error::duplicate_request) + return bad_request(boost::beast::http::status::forbidden, std::forward(resume)); + else if (error == lws::error::max_subaddresses) + return bad_request(boost::beast::http::status::conflict, std::forward(resume)); + else if (error.default_error_condition() == std::errc::timed_out || error.default_error_condition() == std::errc::no_lock_available) + return bad_request(boost::beast::http::status::service_unavailable, std::forward(resume)); + return bad_request(boost::beast::http::status::internal_server_error, std::forward(resume)); + } + + template + void valid_request(epee::byte_slice body, F&& resume) + { + MDEBUG("Sending HTTP 200 OK (" << body.size() << " bytes) to " << this); + + assert(strand_.running_in_this_thread()); + response_ = {boost::beast::http::status::ok, parser_->get().version(), std::move(body)}; + response_.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING); + response_.set(boost::beast::http::field::content_type, "application/json"); + response_.keep_alive(keep_alive_); + response_.prepare_payload(); + resume(); // runs in strand + } + + static bool set_timeout(const std::shared_ptr& self, const std::chrono::steady_clock::duration timeout, const bool existing) + { + if (!self) + return false; + + struct on_timeout { - response.m_response_code = 404; - response.m_response_comment = "Not Found"; - return true; - } + std::shared_ptr self_; - if (handler->run == nullptr) - { - response.m_response_code = 501; - response.m_response_comment = "Not Implemented"; - return true; - } - - if (handler->max_size < query.m_body.size()) - { - MINFO("Client exceeded maximum body size (" << handler->max_size << " bytes)"); - response.m_response_code = 400; - response.m_response_comment = "Bad Request"; - return true; - } - - if (query.m_http_method != http::http_method_post) - { - response.m_response_code = 405; - response.m_response_comment = "Method Not Allowed"; - return true; - } - - // \TODO remove copy of json string here :/ - auto body = handler->run(std::string{query.m_body}, disk.clone(), client, options); - if (!body) - { - MINFO(body.error().message() << " from " << ctx.m_remote_address.str() << " on " << handler->name); - - if (body.error().category() == wire::error::rapidjson_category() || body == lws::error::invalid_range) + void operator()(boost::system::error_code error) const { - response.m_response_code = 400; - response.m_response_comment = "Bad Request"; - } - else if (body == lws::error::account_not_found || body == lws::error::duplicate_request) - { - response.m_response_code = 403; - response.m_response_comment = "Forbidden"; - } - else if (body == lws::error::max_subaddresses) - { - response.m_response_code = 409; - response.m_response_comment = "Conflict"; - } - else if (body.matches(std::errc::timed_out) || body.matches(std::errc::no_lock_available)) - { - response.m_response_code = 503; - response.m_response_comment = "Service Unavailable"; - } - else - { - response.m_response_code = 500; - response.m_response_comment = "Internal Server Error"; - } - return true; - } + if (!self_ || error == boost::asio::error::operation_aborted) + return; - response.m_response_code = 200; - response.m_response_comment = "OK"; - response.m_mime_tipe = "application/json"; - response.m_header_info.m_content_type = "application/json"; - response.m_body.assign(reinterpret_cast(body->data()), body->size()); // \TODO Remove copy here too!s + MWARNING("Timeout on REST connection to " << self_->sock().remote_endpoint(error) << " / " << self_.get()); + self_->sock().cancel(error); + self_->shutdown(); + } + }; + + if (!self->timer_.expires_after(timeout) && existing) + return false; // timeout queued, just abort + self->timer_.async_wait(self->strand_.wrap(on_timeout{self})); return true; } + + void shutdown() + { + boost::system::error_code ec{}; + MDEBUG("Shutting down REST socket to " << this); + sock().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + timer_.cancel(ec); + } }; + template + struct rest_server::handler_loop final : public boost::asio::coroutine + { + std::shared_ptr> self_; + + explicit handler_loop(std::shared_ptr> self) noexcept + : boost::asio::coroutine(), self_(std::move(self)) + {} + + static void async_handshake(const boost::asio::ip::tcp::socket&) noexcept + {} + + void async_handshake(boost::asio::ssl::stream& sock) + { + connection& self = *self_; + self.sock_.async_handshake( + boost::asio::ssl::stream::server, + self.strand_.wrap(std::move(*this)) + ); + } + + template + void async_response(F&& resume) + { + assert(self_ != nullptr); + assert(self_->strand_.running_in_this_thread()); + + // checks access for `parser_` on first use + self_->keep_alive_ = self_->parser_->get().keep_alive(); + const auto target = self_->parser_.value().get().target(); + + MDEBUG("Received HTTP request from " << self_.get() << " to target " << target); + + // Checked access for `parser_` on first use + endpoint const* const handler = self_->parent_->get_endpoint(target); + if (!handler) + return self_->bad_request(boost::beast::http::status::not_found, std::forward(resume)); + + if (handler->run == nullptr) + return self_->bad_request(boost::beast::http::status::not_implemented, std::forward(resume)); + + const auto payload_size = + self_->parser_->get().payload_size().value_or(std::numeric_limits::max()); + if (handler->max_size < payload_size) + { + boost::system::error_code error{}; + MINFO("REST client (" << self_->sock().remote_endpoint(error) << " / " << self_.get() << ") exceeded maximum body size (" << handler->max_size << " bytes)"); + return self_->bad_request(boost::beast::http::status::bad_request, std::forward(resume)); + } + + if (self_->parser_->get().method() != boost::beast::http::verb::post) + return self_->bad_request(boost::beast::http::status::method_not_allowed, std::forward(resume)); + + std::function resumer; + if (handler->is_async) + { + /* The `resumer` callback can be invoked in another strand (created + by the handler function), and therefore needs to be "wrapped" to + ensure thread safety. This also allows `resume` to be unwrapped. */ + const auto& self = self_; + resumer = self->strand_.wrap( + [self, resume] (expect body) mutable + { + if (!body) + self->bad_request(body.error(), std::move(resume)); + else + self->valid_request(std::move(body->value), std::move(resume)); + } + ); + } + + MDEBUG("Running REST handler " << handler->name << " on " << self_.get()); + auto body = handler->run(std::move(self_->parser_->get()).body(), GET_IO_SERVICE(self_->timer_), self_->parent_->data, std::move(resumer)); + if (!body) + return self_->bad_request(body.error(), std::forward(resume)); + else if (!handler->is_async || !body->empty()) + return self_->valid_request(std::move(*body), std::forward(resume)); + // else wait for `resumer` to continue response coroutine + MDEBUG("REST response to " << self_.get() << " is being generated async"); + } + + void operator()(boost::system::error_code error = {}, const std::size_t bytes = 0) + { + using not_ssl = std::is_same; + + if (!self_) + return; + + assert(self_->strand_.running_in_this_thread()); + if (error) + { + boost::system::error_code ec{}; + if (error != boost::asio::error::operation_aborted && error != boost::beast::http::error::end_of_stream) + MERROR("Error on REST socket (" << self_->sock().remote_endpoint(ec) << " / " << self_.get() << "): " << error.message()); + return self_->shutdown(); + } + + connection& self = *self_; + const bool not_first = bool(self.parser_ || !not_ssl()); + BOOST_ASIO_CORO_REENTER(*this) + { + // still need if statement, otherwise YIELD exits. + if (!not_ssl()) + { + MDEBUG("Performing SSL handshake to " << self_.get()); + connection::set_timeout(self_, rest_handshake_timeout, false); + BOOST_ASIO_CORO_YIELD async_handshake(self.sock_); + } + + for (;;) + { + self.parser_.emplace(); + self.parser_->body_limit(max_endpoint_size); + + if (!connection::set_timeout(self_, rest_request_timeout, not_first)) + return self.shutdown(); + + MDEBUG("Reading new REST request from " << self_.get()); + BOOST_ASIO_CORO_YIELD boost::beast::http::async_read( + self.sock_, self.buffer_, *self.parser_, self.strand_.wrap(std::move(*this)) + ); + + // async_response will have its own timeouts set in handlers if async + if (!self.timer_.cancel(error)) + return self.shutdown(); + + /* async_response flow has MDEBUG statements for outgoing messages. + async_response will also `self_->strand_.wrap` when necessary. */ + BOOST_ASIO_CORO_YIELD async_response(handler_loop{*this}); + + connection::set_timeout(self_, rest_response_timeout, false); + BOOST_ASIO_CORO_YIELD boost::beast::http::async_write( + self.sock_, self.response_, self.strand_.wrap(std::move(*this)) + ); + + if (!self.keep_alive_) + return self.shutdown(); + } + } + } + }; + + template + struct rest_server::accept_loop final : public boost::asio::coroutine + { + internal* self_; + std::shared_ptr> next_; + + explicit accept_loop(internal* self) noexcept + : self_(self), next_(nullptr) + {} + + void operator()(boost::system::error_code error = {}) + { + if (!self_) + return; + + BOOST_ASIO_CORO_REENTER(*this) + { + for (;;) + { + next_ = std::make_shared>(self_); + BOOST_ASIO_CORO_YIELD self_->acceptor.async_accept(next_->sock(), std::move(*this)); + + if (error) + { + MERROR("Acceptor failed: " << error.message()); + } + else + { + MDEBUG("New connection to " << next_->sock().remote_endpoint(error) << " / " << next_.get()); + next_->strand_.dispatch(handler_loop{next_}); + } + } + } + } + }; + + void rest_server::run_io() + { + try { io_service_.run(); } + catch (const std::exception& e) + { + std::raise(SIGINT); + MERROR("Error in REST I/O thread: " << e.what()); + } + catch (...) + { + std::raise(SIGINT); + MERROR("Unexpected error in REST I/O thread"); + } + } + rest_server::rest_server(epee::span addresses, std::vector admin, db::storage disk, rpc::client client, configuration config) - : io_service_(), ports_() + : io_service_(), ports_(), workers_() { if (addresses.empty()) MONERO_THROW(common_error::kInvalidArgument, "REST server requires 1 or more addresses"); @@ -1273,8 +2140,27 @@ namespace lws ssl_options.verification = epee::net_utils::ssl_verification_t::none; // clients verified with view key ssl_options.auth = std::move(config.auth); - if (!port.init(std::to_string(url.port), std::move(url.host), std::move(config.access_controls), std::move(ssl_options))) - MONERO_THROW(lws::error::http_server, "REST server failed to initialize"); + boost::asio::ip::tcp::endpoint endpoint{ + boost::asio::ip::address::from_string(url.host), + boost::lexical_cast(url.port) + }; + + port.acceptor.open(endpoint.protocol()); + +#if !defined(_WIN32) + port.acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); +#endif + + port.acceptor.bind(endpoint); + port.acceptor.listen(); + + if (ssl_options) + { + port.ssl_ = ssl_options.create_context(); + accept_loop>{std::addressof(port)}(); + } + else + accept_loop{std::addressof(port)}(); return https; }; @@ -1297,10 +2183,18 @@ namespace lws if (!any_ssl && expect_ssl) MONERO_THROW(lws::error::configuration, "Specified SSL key/cert without specifying https capable REST server"); - if (!ports_.front().run(threads, false)) - MONERO_THROW(lws::error::http_server, "REST server failed to run"); + workers_.reserve(threads); + for (std::size_t i = 0; i < threads; ++i) + workers_.emplace_back(std::bind(&rest_server::run_io, this)); } rest_server::~rest_server() noexcept - {} + { + io_service_.stop(); + for (auto& t : workers_) + { + if (t.joinable()) + t.join(); + } + } } // lws diff --git a/src/rest_server.h b/src/rest_server.h index 9e6e33f..2329ee1 100644 --- a/src/rest_server.h +++ b/src/rest_server.h @@ -28,6 +28,7 @@ #pragma once #include +#include #include #include #include @@ -43,9 +44,15 @@ namespace lws class rest_server { struct internal; + template struct connection; + template struct handler_loop; + template struct accept_loop; - boost::asio::io_service io_service_; + boost::asio::io_service io_service_; //!< Put first so its destroyed last std::list ports_; + std::vector workers_; + + void run_io(); public: struct configuration diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index c38239f..0275e03 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -39,6 +39,7 @@ #include "misc_log_ex.h" // monero/contrib/epee/include #include "net/http_client.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src +#include "net/zmq_async.h" #include "scanner.h" #include "serialization/json_object.h" // monero/src #include "wire/msgpack.h" @@ -140,7 +141,7 @@ namespace rpc break; const int err = zmq_errno(); if (err != EINTR) - return net::zmq::make_error_code(err); + return ::net::zmq::make_error_code(err); } if (items[0].revents) return success(); @@ -186,7 +187,7 @@ namespace rpc { struct context { - explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon) + explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon) : comm(std::move(comm)) , signal_pub(std::move(signal_pub)) , external_pub(std::move(external_pub)) @@ -207,8 +208,8 @@ namespace rpc } zcontext comm; - socket signal_pub; - socket external_pub; + net::zmq::socket signal_pub; + net::zmq::socket external_pub; rcontext rmq; const std::string daemon_addr; const std::string sub_addr; @@ -223,19 +224,15 @@ namespace rpc }; } // detail - expect client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc) + expect parse_response(cryptonote::rpc::Message& parser, std::string msg, source_location loc) { - expect message = get_message(timeout); - if (!message) - return message.error(); - try { - cryptonote::rpc::FullMessage fm{std::move(*message)}; + cryptonote::rpc::FullMessage fm{std::move(msg)}; const cryptonote::rpc::error json_error = fm.getError(); if (!json_error.use) { - response.fromJson(fm.getMessage()); + parser.fromJson(fm.getMessage()); return success(); } @@ -249,6 +246,30 @@ namespace rpc return {lws::error::bad_daemon_response}; } + expect client::make_daemon(const std::shared_ptr& ctx) noexcept + { + assert(ctx != nullptr); + + net::zmq::socket daemon{zmq_socket(ctx->comm.get(), ZMQ_REQ)}; + + if (daemon.get() == nullptr) + return net::zmq::get_error_code(); + MONERO_CHECK(do_set_option(daemon.get(), ZMQ_LINGER, daemon_zmq_linger)); + if (ctx->untrusted_daemon) + MONERO_CHECK(do_set_option(daemon.get(), ZMQ_MAXMSGSIZE, max_msg_req)); + MONERO_ZMQ_CHECK(zmq_connect(daemon.get(), ctx->daemon_addr.c_str())); + + return daemon; + } + + expect client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc) + { + expect message = get_message(timeout); + if (!message) + return message.error(); + return parse_response(response, std::move(*message), loc); + } + expect client::get_message(std::chrono::seconds timeout) { MONERO_PRECOND(ctx != nullptr); @@ -274,13 +295,10 @@ namespace rpc client out{std::move(ctx)}; - out.daemon.reset(zmq_socket(out.ctx->comm.get(), ZMQ_REQ)); - if (out.daemon.get() == nullptr) - return net::zmq::get_error_code(); - MONERO_CHECK(do_set_option(out.daemon.get(), ZMQ_LINGER, daemon_zmq_linger)); - if (out.ctx->untrusted_daemon) - MONERO_CHECK(do_set_option(out.daemon.get(), ZMQ_MAXMSGSIZE, max_msg_req)); - MONERO_ZMQ_CHECK(zmq_connect(out.daemon.get(), out.ctx->daemon_addr.c_str())); + expect daemon = make_daemon(out.ctx); + if (!daemon) + return daemon.error(); + out.daemon = std::move(*daemon); if (!out.ctx->sub_addr.empty()) { @@ -381,6 +399,16 @@ namespace rpc return {lws::error::bad_daemon_response}; } + expect client::make_async_client(boost::asio::io_service& io) const + { + MONERO_PRECOND(ctx != nullptr); + + expect daemon = make_daemon(ctx); + if (!daemon) + return daemon.error(); + return net::zmq::async_client::make(io, std::move(*daemon)); + } + expect client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept { MONERO_PRECOND(ctx != nullptr); @@ -399,7 +427,7 @@ namespace rpc return success(); } - expect client::publish(epee::byte_slice payload) + expect client::publish(epee::byte_slice payload) const { MONERO_PRECOND(ctx != nullptr); assert(daemon != nullptr); @@ -451,16 +479,16 @@ namespace rpc if (comm == nullptr) MONERO_THROW(net::zmq::get_error_code(), "zmq_init"); - detail::socket pub{zmq_socket(comm.get(), ZMQ_PUB)}; + net::zmq::socket pub{zmq_socket(comm.get(), ZMQ_PUB)}; if (pub == nullptr) MONERO_THROW(net::zmq::get_error_code(), "zmq_socket"); if (zmq_bind(pub.get(), signal_endpoint) < 0) MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); - detail::socket external_pub = nullptr; + net::zmq::socket external_pub = nullptr; if (!pub_addr.empty()) { - external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)}; + external_pub = net::zmq::socket{zmq_socket(comm.get(), ZMQ_PUB)}; if (external_pub == nullptr) MONERO_THROW(net::zmq::get_error_code(), "zmq_socket"); if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0) diff --git a/src/rpc/client.h b/src/rpc/client.h index 6b992dd..e93ea0f 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -26,6 +26,7 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #pragma once +#include #include #include #include @@ -36,27 +37,19 @@ #include "byte_slice.h" // monero/contrib/epee/include #include "db/fwd.h" #include "common/expect.h" // monero/src +#include "net/zmq.h" // monero/src #include "rpc/message.h" // monero/src #include "rpc/daemon_pub.h" #include "rpc/rates.h" #include "util/source_location.h" +namespace net { namespace zmq { struct async_client; }} namespace lws { namespace rpc { namespace detail { - struct close - { - void operator()(void* ptr) const noexcept - { - if (ptr) - zmq_close(ptr); - } - }; - using socket = std::unique_ptr; - struct context; } @@ -68,18 +61,23 @@ namespace rpc std::string routing; }; - //! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. + expect parse_response(cryptonote::rpc::Message& parser, std::string msg, source_location loc = {}); + + //! Abstraction for ZMQ RPC client. All `const` and `static` methods are thread-safe. class client { std::shared_ptr ctx; - detail::socket daemon; - detail::socket daemon_sub; - detail::socket signal_sub; + net::zmq::socket daemon; + net::zmq::socket daemon_sub; + net::zmq::socket signal_sub; explicit client(std::shared_ptr ctx) noexcept : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub() {} + //! \return Connection to daemon REQ/REP. + static expect make_daemon(const std::shared_ptr& ctx) noexcept; + //! Expect `response` as the next message payload unless error. expect get_response(cryptonote::rpc::Message& response, std::chrono::seconds timeout, source_location loc); @@ -140,6 +138,9 @@ namespace rpc return cryptonote::rpc::FullMessage::getRequest(name, message, 0); } + //! \return `async_client` to daemon. Thread safe. + expect make_async_client(boost::asio::io_service& io) const; + /*! Queue `message` for sending to daemon. If the queue is full, wait a maximum of `timeout` seconds or until `context::raise_abort_scan` or @@ -148,11 +149,11 @@ namespace rpc expect send(epee::byte_slice message, std::chrono::seconds timeout) noexcept; //! Publish `payload` to ZMQ external pub socket. - expect publish(epee::byte_slice payload); + expect publish(epee::byte_slice payload) const; //! Publish `data` after `topic` to ZMQ external pub socket. template - expect publish(const boost::string_ref topic, const T& data) + expect publish(const boost::string_ref topic, const T& data) const { epee::byte_stream bytes{}; bytes.write(topic.data(), topic.size()); @@ -174,15 +175,8 @@ namespace rpc return response; } - //! Retrieve new accounts to be scanned on this thread. - expect> pull_accounts(); - - /*! - \note This is the one function that IS thread-safe. Multiple threads can - call this function with the same `this` argument. - - \return Recent exchange rates. - */ + /*! Never blocks for I/O - that is performed on another thread. + \return Recent exchange rates. */ expect get_rates() const; }; diff --git a/src/rpc/light_wallet.cpp b/src/rpc/light_wallet.cpp index 926b7b7..7200a81 100644 --- a/src/rpc/light_wallet.cpp +++ b/src/rpc/light_wallet.cpp @@ -34,6 +34,7 @@ #include #include +#include "config.h" #include "db/string.h" #include "error.h" #include "time_helper.h" // monero/contrib/epee/include @@ -198,6 +199,14 @@ namespace lws namespace rpc { + daemon_status_response::daemon_status_response() + : outgoing_connections_count(0), + incoming_connections_count(0), + height(0), + network(lws::rpc::network_type(lws::config::network)), + state(daemon_state::unavailable) + {} + namespace { constexpr const char* map_daemon_state[] = {"ok", "no_connections", "synchronizing", "unavailable"}; diff --git a/src/rpc/light_wallet.h b/src/rpc/light_wallet.h index 1fe8724..802e837 100644 --- a/src/rpc/light_wallet.h +++ b/src/rpc/light_wallet.h @@ -94,7 +94,9 @@ namespace rpc struct daemon_status_response { - daemon_status_response() = delete; + //! Defaults to current network in unavailable state + daemon_status_response(); + std::uint64_t outgoing_connections_count; std::uint64_t incoming_connections_count; std::uint64_t height; diff --git a/src/rpc/webhook.h b/src/rpc/webhook.h index 2888df8..cdd79f8 100644 --- a/src/rpc/webhook.h +++ b/src/rpc/webhook.h @@ -138,7 +138,7 @@ namespace lws { namespace rpc } template - void zmq_send(rpc::client& client, const epee::span events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic) + void zmq_send(const rpc::client& client, const epee::span events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic) { // Each `T` should have a unique count. This is desired. struct zmq_order @@ -174,7 +174,7 @@ namespace lws { namespace rpc template void send_webhook( - rpc::client& client, + const rpc::client& client, const epee::span events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic, diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index 2fbadf1..8c45860 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -27,7 +27,7 @@ # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. set(monero-lws-util_sources blocks.cpp gamma_picker.cpp random_outputs.cpp source_location.cpp transactions.cpp) -set(monero-lws-util_headers blocks.h fwd.h gamma_picker.h http_server.h random_outputs.h source_location.h transactions.h) +set(monero-lws-util_headers blocks.h fwd.h gamma_picker.h random_outputs.h source_location.h transactions.h) add_library(monero-lws-util ${monero-lws-util_sources} ${monero-lws-util_headers}) target_link_libraries(monero-lws-util monero::libraries monero-lws-db) diff --git a/src/util/http_server.h b/src/util/http_server.h deleted file mode 100644 index d19bb57..0000000 --- a/src/util/http_server.h +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) 2020, The Monero Project -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are -// permitted provided that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other -// materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be -// used to endorse or promote products derived from this software without specific -// prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#include -#include -#include - -#include "misc_log_ex.h" -#include "net/abstract_tcp_server2.h" // monero/contrib/epee/include -#include "net/http_protocol_handler.h" // monero/contrib/epee/include -#include "net/http_server_handlers_map2.h" // monero/contrib/epee/include - -#undef MONERO_DEFAULT_LOG_CATEGORY -#define MONERO_DEFAULT_LOG_CATEGORY "net.http" - - -namespace lws -{ - template - class http_server_impl_base: public epee::net_utils::http::i_http_server_handler - { - - public: - http_server_impl_base() - : m_net_server(epee::net_utils::e_connection_type_RPC) - {} - - explicit http_server_impl_base(boost::asio::io_service& external_io_service) - : m_net_server(external_io_service, epee::net_utils::e_connection_type_RPC) - {} - - bool init(const std::string& bind_port, const std::string& bind_ip, - std::vector access_control_origins, epee::net_utils::ssl_options_t ssl_options) - { - - //set self as callback handler - m_net_server.get_config_object().m_phandler = static_cast(this); - - //here set folder for hosting reqests - m_net_server.get_config_object().m_folder = ""; - - //set access control allow origins if configured - std::sort(access_control_origins.begin(), access_control_origins.end()); - m_net_server.get_config_object().m_access_control_origins = std::move(access_control_origins); - - - MGINFO("Binding on " << bind_ip << " (IPv4):" << bind_port); - bool res = m_net_server.init_server(bind_port, bind_ip, bind_port, std::string{}, false, true, std::move(ssl_options)); - if(!res) - { - LOG_ERROR("Failed to bind server"); - return false; - } - return true; - } - - bool run(size_t threads_count, bool wait = true) - { - //go to loop - MINFO("Run net_service loop( " << threads_count << " threads)..."); - if(!m_net_server.run_server(threads_count, wait)) - { - LOG_ERROR("Failed to run net tcp server!"); - } - - if(wait) - MINFO("net_service loop stopped."); - return true; - } - - bool deinit() - { - return m_net_server.deinit_server(); - } - - bool timed_wait_server_stop(uint64_t ms) - { - return m_net_server.timed_wait_server_stop(ms); - } - - bool send_stop_signal() - { - m_net_server.send_stop_signal(); - return true; - } - - int get_binded_port() - { - return m_net_server.get_binded_port(); - } - - long get_connections_count() const - { - return m_net_server.get_connections_count(); - } - - protected: - epee::net_utils::boosted_tcp_server > m_net_server; - }; -} // lws