diff --git a/src/client_main.cpp b/src/client_main.cpp index 37d6c91..aa99899 100644 --- a/src/client_main.cpp +++ b/src/client_main.cpp @@ -195,7 +195,7 @@ namespace { std::shared_ptr client_; - bool operator()(lws::rpc::client&, epee::span chain, epee::span users, epee::span pow, const lws::scanner_options&) + bool operator()(boost::asio::io_context&, lws::rpc::client&, net::http::client&, epee::span chain, epee::span users, epee::span pow) { if (!client_) return false; @@ -246,14 +246,12 @@ namespace if (!users.empty()) { - static constexpr const lws::scanner_options opts{ - epee::net_utils::ssl_verification_t::system_ca, false, false - }; + static constexpr const lws::scanner_options opts{false, false}; auto new_client = MONERO_UNWRAP(zclient.clone()); MONERO_UNWRAP(new_client.watch_scan_signals()); send_users send{client}; - if (!lws::scanner::loop(self.stop_, std::move(send), std::nullopt, std::move(new_client), std::move(users), *queue, opts, false)) + if (!lws::scanner::loop(self, std::move(send), std::nullopt, std::move(new_client), std::move(users), *queue, opts, false)) return; } } @@ -275,7 +273,7 @@ namespace MINFO("Using monerod ZMQ RPC at " << prog.monerod_rpc); auto ctx = lws::rpc::context::make(std::move(prog.monerod_rpc), std::move(prog.monerod_sub), {}, {}, std::chrono::minutes{0}, false); - lws::scanner_sync self{}; + lws::scanner_sync self{epee::net_utils::ssl_verification_t::system_ca}; /*! \NOTE `ctx will need a strand or lock if multiple threads use `self.io.run()` in the future. */ diff --git a/src/net/CMakeLists.txt b/src/net/CMakeLists.txt index 588d2f1..3d2e06e 100644 --- a/src/net/CMakeLists.txt +++ b/src/net/CMakeLists.txt @@ -26,8 +26,10 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +add_subdirectory(http) + set(monero-lws-net_sources zmq_async.cpp) set(monero-lws-net_headers zmq_async.h) add_library(monero-lws-net ${monero-lws-net_sources} ${monero-lws-net_headers}) -target_link_libraries(monero-lws-net monero::libraries) +target_link_libraries(monero-lws-net monero-lws-net-http monero::libraries) diff --git a/src/net/http/CMakeLists.txt b/src/net/http/CMakeLists.txt new file mode 100644 index 0000000..042408a --- /dev/null +++ b/src/net/http/CMakeLists.txt @@ -0,0 +1,33 @@ +# Copyright (c) 2024, The Monero Project +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +set(monero-lws-net-http_sources client.cpp) +set(monero-lws-net-http_headers client.h slice_body.h) + +add_library(monero-lws-net-http ${monero-lws-net-http_sources} ${monero-lws-net-http_headers}) +target_link_libraries(monero-lws-net-http ${Boost_SYSTEM_LIBRARY} monero::libraries) diff --git a/src/net/http/client.cpp b/src/net/http/client.cpp new file mode 100644 index 0000000..18ef9cb --- /dev/null +++ b/src/net/http/client.cpp @@ -0,0 +1,482 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "client.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "error.h" +#include "misc_log_ex.h" // monero/contrib/epee/include +#include "net/http_base.h" // monero/contrib/epee/include +#include "net/http/slice_body.h" +#include "net/net_parse_helpers.h" // monero/contrib/epee/include + +namespace net { namespace http +{ + namespace + { + constexpr const unsigned http_version = 11; + constexpr const std::size_t http_parser_buffer_size = 16 * 1024; + constexpr const std::size_t max_body_size = 4 * 1024; + + //! Timeout for 1 entire HTTP request (connect, handshake, send, receive). + constexpr const std::chrono::seconds message_timeout{30}; + + struct message + { + message(epee::byte_slice json_body, std::string host, std::string target, std::uint16_t port, bool https, std::function&& notifier) + : json_body(std::move(json_body)), + notifier(std::move(notifier)), + host(std::move(host)), + target(std::move(target)), + port(port), + https(https) + {} + + message(message&&) = default; + message(const message& rhs) + : json_body(rhs.json_body.clone()), + notifier(rhs.notifier), + host(rhs.host), + target(rhs.target), + port(rhs.port), + https(rhs.https) + {} + + epee::byte_slice json_body; + std::function notifier; + std::string host; + std::string target; + std::uint16_t port; + bool https; + }; + + std::ostream& operator<<(std::ostream& out, const message& src) + { + out << (src.https ? "https://" : "http://") << src.host << ':' << src.port << src.target; + return out; + } + } // anonymous + + struct client_state + { + std::shared_ptr ssl; + boost::beast::flat_static_buffer buffer; + std::deque outgoing; + std::string last_host; + boost::asio::ip::tcp::resolver resolver; + boost::asio::steady_timer timer; + boost::asio::io_context::strand strand; + boost::asio::ip::tcp::endpoint endpoint; + boost::beast::http::request request; + boost::optional> sock; + boost::optional> parser; + std::size_t iteration; + + client_state(boost::asio::io_context& io, std::shared_ptr in) + : ssl(std::move(in)), + buffer{}, + outgoing(), + last_host(), + resolver(io), + timer(io), + strand(io), + endpoint(), + request{}, + sock(), + parser(), + iteration(0) + { + assert(ssl); + sock.emplace(io, *ssl); + } + + template + void async_write(F&& callback) + { + assert(sock); + assert(!outgoing.empty()); + const bool no_body = outgoing.front().json_body.empty(); + request = { + outgoing.front().notifier ? boost::beast::http::verb::get : boost::beast::http::verb::post, + outgoing.front().target, + http_version, + std::move(outgoing.front().json_body) + }; + request.set(boost::beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING); + if (!no_body) + request.set(boost::beast::http::field::content_type, "application/json"); + + // Setting Host is tricky. Check for v6 and non-standard ports + boost::system::error_code error{}; + boost::asio::ip::make_address_v6(outgoing.front().host, error); + if (!error) + request.set(boost::beast::http::field::host, "[" + outgoing.front().host + "]:" + std::to_string(outgoing.front().port)); + else if ((outgoing.front().https && outgoing.front().port == 443) || (!outgoing.front().https && outgoing.front().port == 80)) + request.set(boost::beast::http::field::host, outgoing.front().host); + else + request.set(boost::beast::http::field::host, outgoing.front().host + ":" + std::to_string(outgoing.front().port)); + + request.prepare_payload(); + if (outgoing.front().https) + boost::beast::http::async_write(*sock, request, boost::asio::bind_executor(strand, std::forward(callback))); + else + boost::beast::http::async_write(sock->next_layer(), request, boost::asio::bind_executor(strand, std::forward(callback))); + } + + template + void async_read(F&& callback) + { + assert(sock); + assert(!outgoing.empty()); + parser.emplace(); + parser->body_limit(max_body_size); + if (outgoing.front().https) + boost::beast::http::async_read(*sock, buffer, *parser, boost::asio::bind_executor(strand, std::forward(callback))); + else + boost::beast::http::async_read(sock->next_layer(), buffer, *parser, boost::asio::bind_executor(strand, std::forward(callback))); + } + + void notify_error(const boost::system::error_code& error) const + { + assert(!outgoing.empty()); + if (outgoing.front().notifier) + outgoing.front().notifier(error, {}); + } + }; + + namespace + { + class client_loop : public boost::asio::coroutine + { + std::shared_ptr self_; + + public: + explicit client_loop(std::shared_ptr self) noexcept + : boost::asio::coroutine(), self_(std::move(self)) + {} + + bool set_timeout(const std::chrono::steady_clock::duration timeout) + { + if (!self_) + return false; + + struct on_timeout + { + on_timeout() = delete; + std::shared_ptr self_; + std::size_t iteration; + + void operator()(boost::system::error_code error) const + { + if (!self_ || error == boost::asio::error::operation_aborted) + return; + if (iteration < self_->iteration) + return; + + assert(self_->strand.running_in_this_thread()); + assert(self_->sock); + if (!self_->outgoing.empty()) + MWARNING("Timeout in HTTP attempt to " << self_->outgoing.front()); + else + MERROR("Unexpected empty message stack"); + + self_->resolver.cancel(); + self_->sock->next_layer().cancel(error); + self_->sock->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, error); + } + }; + + self_->timer.expires_after(timeout); + self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_, self_->iteration})); + return true; + } + + void operator()(boost::system::error_code error = {}, std::size_t = 0) + { + if (!self_) + return; + + bool reuse = false; + bool is_https = false; + std::uint16_t last_port = 0; + client_state& self = *self_; + assert(self.strand.running_in_this_thread()); + assert(self.sock); + BOOST_ASIO_CORO_REENTER(*this) + { + while (!self.outgoing.empty()) + { + struct resolve + { + client_loop continue_; + + void operator()(const boost::system::error_code error, const boost::asio::ip::tcp::resolver::results_type& ips) + { + if (error) + std::move(continue_)(error, 0); + else if (ips.empty()) + std::move(continue_)(boost::asio::error::host_not_found, 0); + else if (continue_.self_) + { + continue_.self_->endpoint = *ips.begin(); + std::move(continue_)(error, 0); + } + } + }; + + set_timeout(message_timeout); + + if (!reuse) + { + MDEBUG("Resolving " << self.outgoing.front().host << " for HTTP"); + BOOST_ASIO_CORO_YIELD self.resolver.async_resolve( + self.outgoing.front().host, + std::to_string(self.outgoing.front().port), + boost::asio::bind_executor(self.strand, resolve{std::move(*this)}) + ); + } + + if (!error) + { + if (!reuse) + { + MDEBUG("Connecting to " << self.endpoint << " / " << self.outgoing.front().host << " for HTTP"); + BOOST_ASIO_CORO_YIELD self.sock->next_layer().async_connect( + self.endpoint, boost::asio::bind_executor(self.strand, std::move(*this)) + ); + self.buffer.clear(); // do not re-use http buffer + } + + if (!error) + { + if (!reuse && self.outgoing.front().https) + { + { + SSL* const ssl_ctx = self.sock->native_handle(); + if (ssl_ctx) + SSL_set_tlsext_host_name(ssl_ctx, self.outgoing.front().host.c_str()); + } + MDEBUG("Starting SSL handshake to " << self.outgoing.front().host << " for HTTP"); + BOOST_ASIO_CORO_YIELD self.sock->async_handshake( + boost::asio::ssl::stream::client, + boost::asio::bind_executor(self.strand, std::move(*this)) + ); + } + + if (!error) + { + reuse = false; + MDEBUG("Sending " << self.outgoing.front().json_body.size() << " bytes in HTTP " << (self.outgoing.front().notifier ? "GET" : "POST") << " to " << self.outgoing.front()); + BOOST_ASIO_CORO_YIELD self.async_write(std::move(*this)); + + if (!error) + { + MDEBUG("Starting read from " << self.outgoing.front() << " to previous HTTP message"); + BOOST_ASIO_CORO_YIELD self.async_read(std::move(*this)); + + if (error) + MERROR("Failed to parse HTTP response from " << self.outgoing.front() << ": " << error.message()); + else if (self.parser->get().result_int() != 200 && self.parser->get().result_int() != 201) + { + MERROR(self.outgoing.front() << " returned " << self.parser->get().result_int() << " status code"); + self.notify_error(boost::asio::error::operation_not_supported); + } + else + { + MDEBUG(self.outgoing.front() << " successful"); + reuse = self.parser->get().keep_alive(); + if (self.outgoing.front().notifier) + self.outgoing.front().notifier({}, std::move(self.parser->get()).body()); + } + } + else + MERROR("Failed HTTP " << (self.outgoing.front().notifier ? "GET" : "POST") << " to " << self.outgoing.front() << ": " << error.message()); + } + else + MERROR("SSL handshake to " << self.outgoing.front().host << " failed: " << error.message()); + } + else + MERROR("Failed to connect to " << self.outgoing.front().host << ": " << error.message()); + } + else + MERROR("Failed to resolve TCP/IP address for " << self.outgoing.front().host << ": " << error.message()); + + if (error) + self.notify_error(error); + + is_https = self.outgoing.front().https; + self.last_host = std::move(self.outgoing.front().host); + last_port = self.outgoing.front().port; + self.outgoing.pop_front(); + + reuse = reuse && + !self.outgoing.empty() && + self.last_host == self.outgoing.front().host && + last_port == self.outgoing.front().port; + + if (!reuse) + { + if (is_https) + { + MDEBUG("Starting SSL shutdown on " << self.last_host); + BOOST_ASIO_CORO_YIELD self.sock->async_shutdown( + boost::asio::bind_executor(self.strand, std::move(*this)) + ); + is_https = true; + } + + MDEBUG("Cleaning up connection to " << self.last_host); + self.sock->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, error); + self.sock->next_layer().close(error); + if (is_https) // must clear SSL state + self.sock.emplace(self.timer.get_executor(), *self.ssl); + } + + ++self.iteration; + } + } + } + }; + + boost::asio::ssl::context make_context(epee::net_utils::ssl_verification_t verify) + { + epee::net_utils::ssl_options_t ssl{ + epee::net_utils::ssl_support_t::e_ssl_support_enabled + }; + ssl.verification = verify; + return ssl.create_context(); + } + } // anonymous + + expect client::queue_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body, std::function notifier) + { + static constexpr const std::uint16_t max_port = std::numeric_limits::max(); + + epee::net_utils::http::url_content parsed{}; + if (!epee::net_utils::parse_url(std::move(url), parsed) || max_port < parsed.port) + return {lws::error::bad_url}; + if (parsed.schema != "http" && parsed.schema != "https") + return {lws::error::bad_url}; + + if (!parsed.port) + parsed.port = (parsed.schema == "http") ? 80 : 443; + + if (parsed.uri.empty()) + parsed.uri = "/"; + + message msg{ + std::move(json_body), + std::move(parsed.host), + std::move(parsed.uri), + std::uint16_t(parsed.port), + bool(parsed.schema == "https"), + std::move(notifier) + }; + + MDEBUG("Queueing HTTP " << (msg.notifier ? "GET" : "POST") << " to " << msg << " using " << this); + boost::unique_lock lock{sync_}; + auto state = state_.lock(); + if (!state) + { + // `make_shared` delays freeing of data section, use `make_unique` + MDEBUG("Creating new net::http::client_state for " << this); + state = {std::make_unique(io, ssl_)}; + state_ = state; + state->outgoing.push_back(std::move(msg)); + + lock.unlock(); + boost::asio::post(state->strand, client_loop{state}); + } + else + { + lock.unlock(); + boost::asio::dispatch( + state->strand, + [state, msg = std::move(msg)] () mutable + { + const bool empty = state->outgoing.empty(); + state->outgoing.push_back(std::move(msg)); + if (empty) + boost::asio::post(state->strand, client_loop{state}); + } + ); + } + + return success(); + } + + client::client(epee::net_utils::ssl_verification_t verify) + : state_(), + ssl_(std::make_shared(make_context(verify))), + sync_() + {} + + client::client(std::shared_ptr ssl) + : state_(), ssl_(std::move(ssl)), sync_() + { + if (!ssl_) + throw std::logic_error{"boost::asio::ssl::context cannot be nullptr"}; + } + + client::~client() + {} + + expect client::post_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body) + { + return queue_async(io, std::move(url), std::move(json_body), {}); + } + + expect client::get_async(boost::asio::io_context& io, std::string url, std::function notifier) + { + if (!notifier) + throw std::logic_error{"net::http::client::get_async requires callback"}; + return queue_async(io, std::move(url), epee::byte_slice{}, std::move(notifier)); + } +}} // net // http + diff --git a/src/net/http/client.h b/src/net/http/client.h new file mode 100644 index 0000000..194c47c --- /dev/null +++ b/src/net/http/client.h @@ -0,0 +1,72 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "byte_slice.h" // monero/contrib/epee/include +#include "common/expect.h" // monero/src +#include "net/net_ssl.h" // monero/contrib/epee/include + +namespace net { namespace http +{ + struct client_state; + using server_response_func = void(boost::system::error_code, std::string); + + //! Primarily for webhooks, where the response is (basically) ignored. + class client + { + std::weak_ptr state_; + std::shared_ptr ssl_; + boost::mutex sync_; + + expect queue_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body, std::function notifier); + + public: + explicit client(epee::net_utils::ssl_verification_t verify); + explicit client(std::shared_ptr ssl); + ~client(); + + const std::shared_ptr& ssl_context() const noexcept + { return ssl_; } + + //! Never blocks. Thread safe. \return `success()` if `url` is valid. + expect post_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body); + + /*! Never blocks. Thread safe. Calls `notifier` with server response iff + `success()` is returned. + \return `success()` if `url` is valid. */ + expect get_async(boost::asio::io_context& io, std::string url, std::function notifier); + }; +}} // net // http + diff --git a/src/net/http/slice_body.h b/src/net/http/slice_body.h new file mode 100644 index 0000000..2330897 --- /dev/null +++ b/src/net/http/slice_body.h @@ -0,0 +1,77 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include +#include +#include "byte_slice.h" // monero/contrib/epee/include + +namespace net { namespace http +{ + //! Trait for `boost::beast` body type + struct slice_body + { + using value_type = epee::byte_slice; + + static std::uint64_t size(const value_type& source) noexcept + { + static_assert(!std::numeric_limits::is_signed, "expected unsigned"); + static_assert( + std::numeric_limits::max() <= std::numeric_limits::max(), + "unexpected size_t max value" + ); + return source.size(); + } + + struct writer + { + epee::byte_slice body_; + + using const_buffers_type = boost::asio::const_buffer; + + template + explicit writer(boost::beast::http::header const&, value_type const& body) + : body_(body.clone()) + {} + + void init(boost::beast::error_code& ec) + { + ec = {}; + } + + boost::optional> get(boost::beast::error_code& ec) + { + ec = {}; + return {{const_buffers_type{body_.data(), body_.size()}, false}}; + } + }; + }; +}} // net // http diff --git a/src/rest_server.cpp b/src/rest_server.cpp index 7786b1a..820fa50 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -55,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -69,8 +70,11 @@ #include "db/string.h" #include "error.h" #include "lmdb/util.h" // monero/src +#include "net/http/client.h" +#include "net/http/slice_body.h" #include "net/net_parse_helpers.h" // monero/contrib/epee/include #include "net/net_ssl.h" // monero/contrib/epee/include +#include "net/net_utils_base.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src #include "net/zmq_async.h" #include "rpc/admin.h" @@ -102,6 +106,7 @@ namespace lws const rpc::client client; const runtime_options options; std::vector clients; + net::http::client webhook_client; boost::mutex sync; rest_server_data(db::storage disk, rpc::client client, runtime_options options) @@ -109,6 +114,7 @@ namespace lws disk(std::move(disk)), client(std::move(client)), options(std::move(options)), + webhook_client(options.webhook_verify), clients(), sync() {} @@ -1303,7 +1309,7 @@ namespace lws using request = rpc::login_request; using response = rpc::login_response; - static expect handle(request req, const rest_server_data& data, std::function&& resume) + static expect handle(request req, rest_server_data& data, std::function&&) { if (!key_check(req.creds)) return {lws::error::bad_view_key}; @@ -1346,8 +1352,8 @@ namespace lws // webhooks are not needed for response, so just queue i/o and // log errors when it fails - rpc::send_webhook( - data.client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, data.options.webhook_verify + rpc::send_webhook_async( + data.io, data.client, data.webhook_client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:" ); } @@ -1892,7 +1898,7 @@ namespace lws Sock sock_; boost::beast::flat_static_buffer buffer_; boost::optional> parser_; - boost::beast::http::response response_; + boost::beast::http::response response_; boost::asio::steady_timer timer_; boost::asio::io_context::strand strand_; bool keep_alive_; diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 3ced99c..1f6bdbb 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -42,7 +42,7 @@ #include "net/zmq_async.h" #include "scanner.h" #include "serialization/json_object.h" // monero/src -#include "wire/msgpack.h" +#include "wire/json/write.h" #if MLWS_RMQ_ENABLED #include #include @@ -50,6 +50,15 @@ namespace lws { + // Not in `rates.h` - defaulting to JSON output seems odd + std::ostream& operator<<(std::ostream& out, lws::rates const& src) + { + wire::json_stream_writer dest{out}; + lws::write_bytes(dest, src); + dest.finish(); + return out; + } + namespace rpc { namespace http = epee::net_utils::http; @@ -194,17 +203,16 @@ namespace rpc , rmq(std::move(rmq)) , daemon_addr(std::move(daemon_addr)) , sub_addr(std::move(sub_addr)) - , rates_conn() + , rates_conn(epee::net_utils::ssl_verification_t::system_ca) , cache_time() , cache_interval(interval) , cached{} - , account_counter(0) , sync_pub() , sync_rates() , untrusted_daemon(untrusted_daemon) + , rates_running() { - if (std::chrono::minutes{0} < cache_interval) - rates_conn.set_server(crypto_compare.host, boost::none, epee::net_utils::ssl_support_t::e_ssl_support_enabled); + rates_running.clear(); } zcontext comm; @@ -213,14 +221,14 @@ namespace rpc rcontext rmq; const std::string daemon_addr; const std::string sub_addr; - http::http_simple_client rates_conn; + net::http::client rates_conn; std::chrono::steady_clock::time_point cache_time; const std::chrono::minutes cache_interval; rates cached; - std::atomic account_counter; boost::mutex sync_pub; boost::mutex sync_rates; const bool untrusted_daemon; + std::atomic_flag rates_running; }; } // detail @@ -597,37 +605,44 @@ namespace rpc return do_signal(ctx->signal_pub.get(), abort_process_signal); } - expect> context::retrieve_rates() + expect context::retrieve_rates_async(boost::asio::io_context& io) { MONERO_PRECOND(ctx != nullptr); if (ctx->cache_interval <= std::chrono::minutes{0}) - return boost::make_optional(false, ctx->cached); + return success(); - const auto now = std::chrono::steady_clock::now(); - if (now - ctx->cache_time < ctx->cache_interval) - return boost::make_optional(false, ctx->cached); + if (ctx->rates_running.test_and_set()) + return success(); - expect fresh{lws::error::exchange_rates_fetch}; + auto& self = ctx; + const expect rc = ctx->rates_conn.get_async( + io, crypto_compare.url, [self] (boost::system::error_code error, std::string body) + { + expect fresh{lws::error::exchange_rates_fetch}; + if (!error) + { + fresh = crypto_compare(std::move(body)); + if (fresh) + MINFO("Updated exchange rates: " << *fresh); + else + MERROR("Failed to parse exchange rates: " << fresh.error()); + } + else + MERROR("Failed to retrieve exchange rates: " << error.message()); - const http::http_response_info* info = nullptr; - const bool retrieved = - ctx->rates_conn.invoke_get(crypto_compare.path, std::chrono::seconds{20}, std::string{}, std::addressof(info)) && - info != nullptr && - info->m_response_code == 200; - - // \TODO Remove copy below - if (retrieved) - fresh = crypto_compare(std::string{info->m_body}); - - const boost::unique_lock lock{ctx->sync_rates}; - ctx->cache_time = now; - if (fresh) - { - ctx->cached = *fresh; - return boost::make_optional(*fresh); - } - return fresh.error(); + const auto now = std::chrono::steady_clock::now(); + if (fresh) + { + const boost::lock_guard lock{self->sync_rates}; + self->cache_time = now; + self->cached = std::move(*fresh); + } + self->rates_running.clear(); + }); + if (!rc) + ctx->rates_running.clear(); + return rc; } } // rpc } // lws diff --git a/src/rpc/client.h b/src/rpc/client.h index 6dba671..719f1b8 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -148,10 +148,10 @@ namespace rpc */ expect send(epee::byte_slice message, std::chrono::seconds timeout) noexcept; - //! Publish `payload` to ZMQ external pub socket. + //! Publish `payload` to ZMQ external pub socket. Blocks iff RMQ. expect publish(epee::byte_slice payload) const; - //! Publish `data` after `topic` to ZMQ external pub socket. + //! Publish `data` after `topic` to ZMQ external pub socket. Blocks iff RMQ. template expect publish(const boost::string_ref topic, const T& data) const { @@ -250,14 +250,11 @@ namespace rpc expect raise_abort_process() noexcept; /*! - Retrieve exchange rates, if enabled and past cache interval. Not - thread-safe (this can be invoked from one thread only, but this is - thread-safe with `client::get_rates()`). All clients will see new rates - immediately. + Retrieve exchange rates, if enabled. Thread-safe. All clients will see + new rates once completed. - \return Rates iff they were updated. - */ - expect> retrieve_rates(); + \return `success()` if HTTP GET was queued. */ + expect retrieve_rates_async(boost::asio::io_context& io); }; } // rpc } // lws diff --git a/src/rpc/rates.cpp b/src/rpc/rates.cpp index 51ddab8..480d08b 100644 --- a/src/rpc/rates.cpp +++ b/src/rpc/rates.cpp @@ -65,10 +65,10 @@ namespace lws namespace rpc { - const char crypto_compare_::host[] = "https://min-api.cryptocompare.com:443"; - const char crypto_compare_::path[] = - "/data/price?fsym=XMR&tsyms=AUD,BRL,BTC,CAD,CHF,CNY,EUR,GBP," - "HKD,INR,JPY,KRW,MXN,NOK,NZD,SEK,SGD,TRY,USD,RUB,ZAR"; + const char crypto_compare_::url[] = + "https://min-api.cryptocompare.com" + "/data/price?fsym=XMR&tsyms=AUD,BRL,BTC,CAD,CHF,CNY,EUR,GBP," + "HKD,INR,JPY,KRW,MXN,NOK,NZD,SEK,SGD,TRY,USD,RUB,ZAR"; expect crypto_compare_::operator()(std::string&& body) const { diff --git a/src/rpc/rates.h b/src/rpc/rates.h index d5ceaa2..53d543b 100644 --- a/src/rpc/rates.h +++ b/src/rpc/rates.h @@ -64,8 +64,7 @@ namespace lws { struct crypto_compare_ { - static const char host[]; - static const char path[]; + static const char url[]; expect operator()(std::string&& body) const; }; diff --git a/src/rpc/scanner/server.cpp b/src/rpc/scanner/server.cpp index 15d3fba..22deb12 100644 --- a/src/rpc/scanner/server.cpp +++ b/src/rpc/scanner/server.cpp @@ -458,7 +458,7 @@ namespace lws { namespace rpc { namespace scanner }; } - server::server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify) + server::server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, std::shared_ptr ssl) : strand_(io), check_timer_(io), acceptor_(io), @@ -467,11 +467,11 @@ namespace lws { namespace rpc { namespace scanner active_(std::move(active)), disk_(std::move(disk)), zclient_(std::move(zclient)), + webhook_(std::move(ssl)), accounts_cur_{}, next_thread_(0), pass_hashed_(), pass_salt_(), - webhook_verify_(webhook_verify), stop_(false) { std::sort(active_.begin(), active_.end()); @@ -563,8 +563,7 @@ namespace lws { namespace rpc { namespace scanner self->strand_, [self, users = std::move(users), blocks = std::move(blocks)] () { - const lws::scanner_options opts{self->webhook_verify_, false, false}; - if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts)) + if (!lws::user_data::store(self->strand_.context(), self->disk_, self->zclient_, self->webhook_, epee::to_span(blocks), epee::to_span(users), nullptr)) { self->do_stop(); self->strand_.context().stop(); diff --git a/src/rpc/scanner/server.h b/src/rpc/scanner/server.h index 8dc50fa..2c0101c 100644 --- a/src/rpc/scanner/server.h +++ b/src/rpc/scanner/server.h @@ -38,6 +38,7 @@ #include "db/fwd.h" #include "db/storage.h" +#include "net/http/client.h" #include "net/net_ssl.h" // monero/contrib/epee/include #include "rpc/client.h" #include "rpc/scanner/queue.h" @@ -65,11 +66,11 @@ namespace lws { namespace rpc { namespace scanner std::vector active_; db::storage disk_; rpc::client zclient_; + net::http::client webhook_; db::cursor::accounts accounts_cur_; std::size_t next_thread_; std::array pass_hashed_; std::array pass_salt_; - const ssl_verification_t webhook_verify_; bool stop_; //! Async acceptor routine @@ -85,7 +86,7 @@ namespace lws { namespace rpc { namespace scanner public: static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address); - explicit server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify); + explicit server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, std::shared_ptr ssl); server(const server&) = delete; server(server&&) = delete; diff --git a/src/rpc/webhook.h b/src/rpc/webhook.h index cdd79f8..c89a522 100644 --- a/src/rpc/webhook.h +++ b/src/rpc/webhook.h @@ -27,161 +27,103 @@ #pragma once +#include #include #include #include #include #include "byte_slice.h" // monero/contrib/epee/include #include "misc_log_ex.h" // monero/contrib/epee/include -#include "net/http_client.h" // monero/contrib/epee/include +#include "net/http/client.h" #include "span.h" #include "wire/json.h" #include "wire/msgpack.h" namespace lws { namespace rpc { - namespace net = epee::net_utils; - - template - void http_send(net::http::http_simple_client& client, boost::string_ref uri, const T& event, const net::http::fields_list& params, const std::chrono::milliseconds timeout) +template +void http_async(boost::asio::io_context& io, net::http::client& client, const epee::span events) +{ + for (const auto& event : events) { - if (uri.empty()) - uri = "/"; - - epee::byte_slice bytes{}; - const std::string& url = event.value.second.url; - const std::error_code json_error = wire::json::to_bytes(bytes, event); - const net::http::http_response_info* info = nullptr; - if (json_error) + if (event.value.second.url != "zmq") { - MERROR("Failed to generate webhook JSON: " << json_error.message()); - return; - } - - MINFO("Sending webhook to " << url); - if (!client.invoke(uri, "POST", std::string{bytes.begin(), bytes.end()}, timeout, std::addressof(info), params)) - { - MERROR("Failed to invoke http request to " << url); - return; - } - - if (!info) - { - MERROR("Failed to invoke http request to " << url << ", internal error (null response ptr)"); - return; - } - - if (info->m_response_code != 200 && info->m_response_code != 201) - { - MERROR("Failed to invoke http request to " << url << ", wrong response code: " << info->m_response_code); - return; + epee::byte_slice bytes{}; + const std::error_code json_error = wire::json::to_bytes(bytes, event); + if (!json_error) + { + MINFO("Sending webhook to " << event.value.second.url); + const expect rc = + client.post_async(io, event.value.second.url, std::move(bytes)); + if (!rc) + MERROR("Failed to send HTTP webhook to " << event.value.second.url << ": " << rc.error().message()); + } + else + MERROR("Failed to generate webhook JSON: " << json_error.message()); } } +} - template - void http_send(const epee::span events, const std::chrono::milliseconds timeout, net::ssl_verification_t verify_mode) +template +struct zmq_index_single +{ + const std::uint64_t index; + const T& event; +}; + +template +void write_bytes(wire::writer& dest, const zmq_index_single& self) +{ + wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(event)); +} + +template +void zmq_send(const rpc::client& client, const epee::span events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic) +{ + // Each `T` should have a unique count. This is desired. + struct zmq_order { - if (events.empty()) - return; + std::uint64_t current; + boost::mutex sync; - net::http::url_content url{}; - net::http::http_simple_client client{}; + zmq_order() + : current(0), sync() + {} + }; - net::http::fields_list params; - params.emplace_back("Content-Type", "application/json; charset=utf-8"); + static zmq_order ordering{}; + + //! \TODO monitor XPUB to cull the serialization + if (!events.empty() && client.has_publish()) + { + // make sure the event is queued to zmq in order. + const boost::unique_lock guard{ordering.sync}; for (const auto& event : events) { - if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url)) - { - MERROR("Bad URL for webhook event: " << event.value.second.url); - continue; - } - - const bool https = (url.schema == "https"); - if (!https && url.schema != "http") - { - MERROR("Only http or https connections: " << event.value.second.url); - continue; - } - - const net::ssl_support_t ssl_mode = https ? - net::ssl_support_t::e_ssl_support_enabled : net::ssl_support_t::e_ssl_support_disabled; - net::ssl_options_t ssl_options{ssl_mode}; - if (https) - ssl_options.verification = verify_mode; - - if (url.port == 0) - url.port = https ? 443 : 80; - - client.set_server(url.host, std::to_string(url.port), boost::none, std::move(ssl_options)); - if (client.connect(timeout)) - http_send(client, url.uri, event, params, timeout); - else - MERROR("Unable to send webhook to " << event.value.second.url); - - client.disconnect(); + const zmq_index_single index{ordering.current++, event}; + MINFO("Sending ZMQ-PUB topics " << json_topic << " and " << msgpack_topic); + expect result = success(); + if (!(result = client.publish(json_topic, index))) + MERROR("Failed to serialize+send " << json_topic << " " << result.error().message()); + if (!(result = client.publish(msgpack_topic, index))) + MERROR("Failed to serialize+send " << msgpack_topic << " " << result.error().message()); } } +} - template - struct zmq_index_single - { - const std::uint64_t index; - const T& event; - }; - - template - void write_bytes(wire::writer& dest, const zmq_index_single& self) - { - wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(event)); - } - - template - void zmq_send(const rpc::client& client, const epee::span events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic) - { - // Each `T` should have a unique count. This is desired. - struct zmq_order - { - std::uint64_t current; - boost::mutex sync; - - zmq_order() - : current(0), sync() - {} - }; - - static zmq_order ordering{}; - - //! \TODO monitor XPUB to cull the serialization - if (!events.empty() && client.has_publish()) - { - // make sure the event is queued to zmq in order. - const boost::unique_lock guard{ordering.sync}; - - for (const auto& event : events) - { - const zmq_index_single index{ordering.current++, event}; - MINFO("Sending ZMQ-PUB topics " << json_topic << " and " << msgpack_topic); - expect result = success(); - if (!(result = client.publish(json_topic, index))) - MERROR("Failed to serialize+send " << json_topic << " " << result.error().message()); - if (!(result = client.publish(msgpack_topic, index))) - MERROR("Failed to serialize+send " << msgpack_topic << " " << result.error().message()); - } - } - } - - template - void send_webhook( +template + void send_webhook_async( + boost::asio::io_context& io, const rpc::client& client, + net::http::client& http_client, const epee::span events, const boost::string_ref json_topic, - const boost::string_ref msgpack_topic, - const std::chrono::seconds timeout, - epee::net_utils::ssl_verification_t verify_mode) + const boost::string_ref msgpack_topic) { - http_send(events, timeout, verify_mode); + http_async(io, http_client, events); + + // ZMQ PUB sockets never block, but RMQ does. No easy way around this. zmq_send(client, events, json_topic, msgpack_topic); } }} // lws // rpc diff --git a/src/scanner.cpp b/src/scanner.cpp index 9aeb584..3ffc7de 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -56,6 +56,7 @@ #include "misc_log_ex.h" // monero/contrib/epee/include #include "net/net_parse_helpers.h" #include "net/net_ssl.h" // monero/contrib/epee/include +#include "net/net_utils_base.h" // monero/contrib/epee/include #include "rpc/daemon_messages.h" // monero/src #include "rpc/daemon_zmq.h" #include "rpc/json.h" @@ -75,18 +76,9 @@ namespace lws { - // Not in `rates.h` - defaulting to JSON output seems odd - std::ostream& operator<<(std::ostream& out, lws::rates const& src) - { - wire::json_stream_writer dest{out}; - lws::write_bytes(dest, src); - dest.finish(); - return out; - } - namespace { - namespace net = epee::net_utils; + namespace enet = epee::net_utils; constexpr const std::chrono::minutes block_rpc_timeout{2}; constexpr const std::chrono::seconds send_timeout{30}; @@ -152,9 +144,9 @@ namespace lws return true; } - void send_payment_hook(rpc::client& client, const epee::span events, net::ssl_verification_t verify_mode) + void send_payment_hook(boost::asio::io_context& io, rpc::client& client, net::http::client& http, const epee::span events) { - rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode); + rpc::send_webhook_async(io, client, http, events, "json-full-payment_hook:", "msgpack-full-payment_hook:"); } std::size_t get_target_time(db::block_id height) @@ -194,9 +186,9 @@ namespace lws vec.erase(vec.begin()); }; - void send_spend_hook(rpc::client& client, const epee::span events, net::ssl_verification_t verify_mode) + void send_spend_hook(boost::asio::io_context& io, rpc::client& client, net::http::client& http, const epee::span events) { - rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode); + rpc::send_webhook_async(io, client, http, events, "json-full-spend_hook:", "msgpack-full-spend_hook:"); } struct add_spend @@ -219,7 +211,7 @@ namespace lws { db::storage const& disk_; rpc::client& client_; - net::ssl_verification_t verify_mode_; + scanner_sync& http_; std::unordered_map txpool_; bool operator()(expect& reader, lws::account& user, const db::output& out) @@ -290,7 +282,7 @@ namespace lws else events.pop_back(); //cannot compute tx_hash } - send_payment_hook(client_, epee::to_span(events), verify_mode_); + send_payment_hook(http_.io_, client_, http_.webhooks_, epee::to_span(events)); return true; } }; @@ -567,7 +559,7 @@ namespace lws scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, reader, add_spend{}, add_output{}); } - void scan_transactions(std::string&& txpool_msg, epee::span users, db::storage const& disk, rpc::client& client, const scanner_options& opts) + void scan_transactions(std::string&& txpool_msg, epee::span users, db::storage const& disk, scanner_sync& self, rpc::client& client, const scanner_options& opts) { // uint64::max is for txpool static const std::vector fake_outs( @@ -585,20 +577,11 @@ namespace lws boost::numeric_cast(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())); subaddress_reader reader{std::optional{disk.clone()}, opts.enable_subaddresses}; - send_webhook sender{disk, client, opts.webhook_verify}; + send_webhook sender{disk, client, self}; for (const auto& tx : parsed->txes) scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender); } - void update_rates(rpc::context& ctx) - { - const expect> new_rates = ctx.retrieve_rates(); - if (!new_rates) - MERROR("Failed to retrieve exchange rates: " << new_rates.error().message()); - else if (*new_rates) - MINFO("Updated exchange rates: " << *(*new_rates)); - } - void do_scan_loop(scanner_sync& self, std::shared_ptr data, const bool leader_thread) noexcept { struct stop_ @@ -629,7 +612,7 @@ namespace lws auto new_client = MONERO_UNWRAP(client.clone()); MONERO_UNWRAP(new_client.watch_scan_signals()); user_data store_local{disk.clone()}; - if (!scanner::loop(self.stop_, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread)) + if (!scanner::loop(self, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread)) return; } @@ -657,8 +640,8 @@ namespace lws } } // anonymous - scanner::scanner(db::storage disk) - : disk_(std::move(disk)), sync_(), signals_(sync_.io_) + scanner::scanner(db::storage disk, epee::net_utils::ssl_verification_t webhook_verify) + : disk_(std::move(disk)), sync_(webhook_verify), signals_(sync_.io_) { signals_.add(SIGINT); signals_.async_wait([this] (const boost::system::error_code& error, int) @@ -671,7 +654,7 @@ namespace lws scanner::~scanner() {} - bool scanner::loop(const std::atomic& stop, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread) + bool scanner::loop(scanner_sync& self, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread) { if (users.empty()) return true; @@ -698,7 +681,7 @@ namespace lws if (opts.untrusted_daemon && disk) last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk->start_read()).get_last_pow_block()).id; - while (!stop) + while (!self.stop_) { blockchain.clear(); new_pow.clear(); @@ -796,7 +779,7 @@ namespace lws { if (!disk || message->first != rpc::client::topic::txpool) break; // inner for loop - scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, client, opts); + scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, self, client, opts); } for ( ; message != new_pubs->end(); ++message) @@ -885,7 +868,7 @@ namespace lws pow_window.median_timestamps.erase(pow_window.median_timestamps.begin()); // longhash takes a while, check is_running - if (stop) + if (self.stop_) return false; diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(db::block_id(fetched->start_height))); @@ -942,7 +925,7 @@ namespace lws } // for each block reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write - if (!store(client, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow), opts)) + if (!store(self.io_, client, self.webhooks_, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow))) return false; // TODO @@ -1051,7 +1034,7 @@ namespace lws MONERO_UNWRAP(ctx.connect()), queues, std::move(active), - opts.webhook_verify + self.webhooks_.ssl_context() ); rpc::scanner::server::start_user_checking(server); @@ -1274,7 +1257,7 @@ namespace lws } } // anonymous - bool user_data::store(db::storage& disk, rpc::client& client, const epee::span chain, const epee::span users, const epee::span pow, const scanner_options& opts) + bool user_data::store(boost::asio::io_context& io, db::storage& disk, rpc::client& client, net::http::client& webhook, const epee::span chain, const epee::span users, const epee::span pow) { if (users.empty()) return true; @@ -1293,8 +1276,8 @@ namespace lws } MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)"); - send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify); - send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify); + send_payment_hook(io, client, webhook, epee::to_span(updated->confirm_pubs)); + send_spend_hook(io, client, webhook, epee::to_span(updated->spend_pubs)); if (updated->accounts_updated != users.size()) { MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting"); @@ -1309,9 +1292,9 @@ namespace lws return true; } - bool user_data::operator()(rpc::client& client, const epee::span chain, const epee::span users, const epee::span pow, const scanner_options& opts) + bool user_data::operator()(boost::asio::io_context& io, rpc::client& client, net::http::client& webhook, const epee::span chain, const epee::span users, const epee::span pow) { - return store(disk_, client, chain, users, pow, opts); + return store(io, disk_, client, webhook, chain, users, pow); } expect scanner::sync(rpc::client client, const bool untrusted_daemon) @@ -1335,26 +1318,25 @@ namespace lws /*! \NOTE Be careful about references and lifetimes of the callbacks. The ones below are safe because no `io_context::run()` call is after the - destruction of the references. - - \NOTE That `ctx` will need a strand or lock if multiple - `io_context::run()` calls are used. */ - + destruction of the references. */ boost::asio::steady_timer rate_timer{sync_.io_}; class rate_updater { + boost::asio::io_context& io_; boost::asio::steady_timer& rate_timer_; rpc::context& ctx_; const std::chrono::minutes rate_interval_; public: - explicit rate_updater(boost::asio::steady_timer& rate_timer, rpc::context& ctx) - : rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval()) + explicit rate_updater(boost::asio::io_context& io, boost::asio::steady_timer& rate_timer, rpc::context& ctx) + : io_(io), rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval()) {} void operator()(const boost::system::error_code& error = {}) const { - update_rates(ctx_); + const expect status = ctx_.retrieve_rates_async(io_); + if (!status) + MERROR("Unable to retrieve exchange rates: " << status.error()); rate_timer_.expires_from_now(rate_interval_); rate_timer_.async_wait(*this); } @@ -1363,7 +1345,7 @@ namespace lws }; { - rate_updater updater{rate_timer, ctx}; + rate_updater updater{sync_.io_, rate_timer, ctx}; if (std::chrono::minutes{0} < updater.rate_interval()) updater(); } diff --git a/src/scanner.h b/src/scanner.h index ab04bef..746b398 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -35,6 +35,7 @@ #include "db/fwd.h" #include "db/storage.h" +#include "net/http/client.h" #include "net/net_ssl.h" // monero/contrib/epee/include #include "rpc/client.h" #include "rpc/scanner/fwd.h" @@ -44,7 +45,6 @@ namespace lws { struct scanner_options { - epee::net_utils::ssl_verification_t webhook_verify; bool enable_subaddresses; bool untrusted_daemon; }; @@ -69,20 +69,21 @@ namespace lws /*! Store updated accounts locally (`disk`), and send ZMQ/RMQ/webhook events. `users` must be sorted by height (lowest first). */ - static bool store(db::storage& disk, rpc::client& zclient, epee::span chain, epee::span users, epee::span pow, const scanner_options&); + static bool store(boost::asio::io_context& io, db::storage& disk, rpc::client& zclient, net::http::client& webhook ,epee::span chain, epee::span users, epee::span pow); //! `users` must be sorted by height (lowest first) - bool operator()(rpc::client& zclient, epee::span chain, epee::span users, epee::span pow, const scanner_options&); + bool operator()(boost::asio::io_context& io, rpc::client& zclient, net::http::client& webhook, epee::span chain, epee::span users, epee::span pow); }; struct scanner_sync { boost::asio::io_context io_; + net::http::client webhooks_; std::atomic stop_; //!< Stop scanning but do not shutdown std::atomic shutdown_; //!< Exit scanner::run - explicit scanner_sync() - : io_(), stop_(false), shutdown_(false) + explicit scanner_sync(epee::net_utils::ssl_verification_t webhook_verify) + : io_(), webhooks_(webhook_verify), stop_(false), shutdown_(false) {} bool is_running() const noexcept { return !stop_ && !shutdown_; } @@ -104,18 +105,18 @@ namespace lws public: //! Register `SIGINT` handler and keep a copy of `disk` - explicit scanner(db::storage disk); + explicit scanner(db::storage disk, epee::net_utils::ssl_verification_t webhook_verify); ~scanner(); //! Callback for storing user account (typically local lmdb, but perhaps remote rpc) - using store_func = std::function, epee::span, epee::span, const scanner_options&)>; + using store_func = std::function, epee::span, epee::span)>; /*! Run _just_ the inner scanner loop while `self.is_running() == true`. * \throw std::exception on hard errors (shutdown) conditions \return True iff `queue` indicates thread now has zero accounts. False indicates a soft, typically recoverable error. */ - static bool loop(const std::atomic& stop, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread); + static bool loop(scanner_sync& self, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread); //! Use `client` to sync blockchain data, and \return client if successful. expect sync(rpc::client client, const bool untrusted_daemon = false); diff --git a/src/server_main.cpp b/src/server_main.cpp index c8c1bda..643993a 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -296,13 +296,12 @@ namespace auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval, prog.untrusted_daemon); //! SIGINT handle registered by `scanner` constructor - lws::scanner scanner{disk.clone()}; + lws::scanner scanner{disk.clone(), prog.rest_config.webhook_verify}; MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); auto client = scanner.sync(ctx.connect().value(), prog.untrusted_daemon).value(); const auto enable_subaddresses = bool(prog.rest_config.max_subaddresses); - const auto webhook_verify = prog.rest_config.webhook_verify; lws::rest_server server{ epee::to_span(prog.rest_servers), prog.admin_rest_servers, std::move(disk), std::move(client), std::move(prog.rest_config) }; @@ -317,7 +316,7 @@ namespace prog.scan_threads, std::move(prog.lws_server_addr), std::move(prog.lws_server_pass), - lws::scanner_options{webhook_verify, enable_subaddresses, prog.untrusted_daemon} + lws::scanner_options{enable_subaddresses, prog.untrusted_daemon} ); } } // anonymous diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index ea99115..13fd43d 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -31,6 +31,7 @@ target_include_directories(monero-lws-unit-framework PUBLIC ${CMAKE_CURRENT_SOUR target_link_libraries(monero-lws-unit-framework) add_subdirectory(db) +add_subdirectory(net) add_subdirectory(rpc) add_subdirectory(wire) @@ -40,6 +41,8 @@ target_link_libraries(monero-lws-unit monero-lws-daemon-common monero-lws-unit-db monero-lws-unit-framework + monero-lws-unit-net + monero-lws-unit-net-http monero-lws-unit-rpc monero-lws-unit-wire monero-lws-unit-wire-json diff --git a/tests/unit/net/CMakeLists.txt b/tests/unit/net/CMakeLists.txt new file mode 100644 index 0000000..d946629 --- /dev/null +++ b/tests/unit/net/CMakeLists.txt @@ -0,0 +1,32 @@ +# Copyright (c) 2024, The Monero Project +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +add_subdirectory(http) + +add_library(monero-lws-unit-net) +target_link_libraries(monero-lws-unit-net monero-lws-unit-net-http) diff --git a/tests/unit/net/http/CMakeLists.txt b/tests/unit/net/http/CMakeLists.txt new file mode 100644 index 0000000..425d121 --- /dev/null +++ b/tests/unit/net/http/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright (c) 2024, The Monero Project +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +add_library(monero-lws-unit-net-http OBJECT client.test.cpp) +target_link_libraries( + monero-lws-unit-net-http + monero-lws-net-http + monero-lws-unit-framework + monero::libraries) diff --git a/tests/unit/net/http/client.test.cpp b/tests/unit/net/http/client.test.cpp new file mode 100644 index 0000000..da6a675 --- /dev/null +++ b/tests/unit/net/http/client.test.cpp @@ -0,0 +1,166 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "framework.test.h" + +#include +#include +#include +#include +#include "crypto/crypto.h" // monero/src +#include "net/http/client.h" +#include "net/http_server_impl_base.h" // monero/contrib/epee/include + +namespace +{ + constexpr const std::uint16_t server_port = 10000; + constexpr const std::uint16_t invalid_server_port = 10001; + + namespace enet = epee::net_utils; + struct context : enet::connection_context_base + { + context() + : enet::connection_context_base() + {} + }; + + struct handler : epee::http_server_impl_base + { + handler() + : epee::http_server_impl_base() + {} + + virtual bool + handle_http_request(const enet::http::http_request_info& query, enet::http::http_response_info& response, context&) + override final + { + if (query.m_URI == "/") + response.m_response_code = 404; + else + response.m_response_code = 200; + response.m_body = query.m_URI; + return true; + } + }; +} + +LWS_CASE("net::http::client") +{ + + boost::asio::io_context io; + handler server{}; + server.init(&crypto::generate_random_bytes_thread_safe, std::to_string(server_port)); + server.run(1, false); + + SETUP("server and client") + { + net::http::client client{epee::net_utils::ssl_verification_t::none}; + + SECTION("GET 200 OK") + { + std::atomic done = false; + const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body) + { + EXPECT(!error); + EXPECT(body == "/some_endpoint"); + done = true; + }; + client.get_async( + io, "http://127.0.0.1:" + std::to_string(server_port) + "/some_endpoint", handler + ); + + while (!done) + { + io.run_one(); + io.restart(); + } + } + + SECTION("GET 200 OK Twice") + { + std::atomic done = 0; + const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body) + { + EXPECT(!error); + EXPECT(body == "/some_endpoint"); + ++done; + }; + client.get_async( + io, "http://127.0.0.1:" + std::to_string(server_port) + "/some_endpoint", handler + ); + client.get_async( + io, "http://127.0.0.1:" + std::to_string(server_port) + "/some_endpoint", handler + ); + + while (done != 2) + { + io.run_one(); + io.restart(); + } + } + + SECTION("GET 404 NOT FOUND") + { + std::atomic done = false; + const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body) + { + EXPECT(error == boost::asio::error::operation_not_supported); + EXPECT(body.empty()); + done = true; + }; + client.get_async( + io, "http://127.0.0.1:" + std::to_string(server_port), handler + ); + + while (!done) + { + io.run_one(); + io.restart(); + } + } + + SECTION("GET (Invalid server address)") + { + std::atomic done = false; + const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body) + { + EXPECT(error == boost::asio::error::connection_refused); + EXPECT(body.empty()); + done = true; + }; + client.get_async( + io, "http://127.0.0.1:" + std::to_string(invalid_server_port), handler + ); + + while (!done) + { + io.run_one(); + io.restart(); + } + } + } +} diff --git a/tests/unit/scanner.test.cpp b/tests/unit/scanner.test.cpp index 724ac19..3968ef1 100644 --- a/tests/unit/scanner.test.cpp +++ b/tests/unit/scanner.test.cpp @@ -350,7 +350,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") std::vector messages{}; messages.push_back(to_json_rpc(1)); - lws::scanner scanner{db.clone()}; + lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none}; boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; @@ -384,7 +384,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, {hashes.data(), 1}); { - lws::scanner scanner{db.clone()}; + lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none}; boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect()))); @@ -408,7 +408,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") message.hashes.resize(1); messages.push_back(daemon_response(message)); - lws::scanner scanner{db.clone()}; + lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none}; boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect()))); @@ -516,7 +516,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") messages.push_back(daemon_response(hmessage)); { - lws::scanner scanner{db.clone()}; + lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none}; boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect()))); @@ -534,10 +534,8 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") bmessage.output_indices.resize(1); messages.push_back(daemon_response(bmessage)); { - static constexpr const lws::scanner_options opts{ - epee::net_utils::ssl_verification_t::none, true, false - }; - lws::scanner scanner{db.clone()}; + static constexpr const lws::scanner_options opts{true, false}; + lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none}; boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; scanner.run(std::move(rpc), 1, {}, {}, opts);