Switch from epee http client to boost::beast. All HTTP now non-blocking. (#150)
Some checks failed
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-13, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-13, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-14, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-14, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-latest, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-latest, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=ON) (push) Has been cancelled

This commit is contained in:
Lee *!* Clagett 2024-12-04 17:25:07 -05:00 committed by GitHub
parent 18b5743596
commit b659200a2b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1095 additions and 257 deletions

View file

@ -195,7 +195,7 @@ namespace
{
std::shared_ptr<lws::rpc::scanner::client> client_;
bool operator()(lws::rpc::client&, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const lws::db::pow_sync> pow, const lws::scanner_options&)
bool operator()(boost::asio::io_context&, lws::rpc::client&, net::http::client&, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const lws::db::pow_sync> pow)
{
if (!client_)
return false;
@ -246,14 +246,12 @@ namespace
if (!users.empty())
{
static constexpr const lws::scanner_options opts{
epee::net_utils::ssl_verification_t::system_ca, false, false
};
static constexpr const lws::scanner_options opts{false, false};
auto new_client = MONERO_UNWRAP(zclient.clone());
MONERO_UNWRAP(new_client.watch_scan_signals());
send_users send{client};
if (!lws::scanner::loop(self.stop_, std::move(send), std::nullopt, std::move(new_client), std::move(users), *queue, opts, false))
if (!lws::scanner::loop(self, std::move(send), std::nullopt, std::move(new_client), std::move(users), *queue, opts, false))
return;
}
}
@ -275,7 +273,7 @@ namespace
MINFO("Using monerod ZMQ RPC at " << prog.monerod_rpc);
auto ctx = lws::rpc::context::make(std::move(prog.monerod_rpc), std::move(prog.monerod_sub), {}, {}, std::chrono::minutes{0}, false);
lws::scanner_sync self{};
lws::scanner_sync self{epee::net_utils::ssl_verification_t::system_ca};
/*! \NOTE `ctx will need a strand or lock if multiple threads use
`self.io.run()` in the future. */

View file

@ -26,8 +26,10 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
add_subdirectory(http)
set(monero-lws-net_sources zmq_async.cpp)
set(monero-lws-net_headers zmq_async.h)
add_library(monero-lws-net ${monero-lws-net_sources} ${monero-lws-net_headers})
target_link_libraries(monero-lws-net monero::libraries)
target_link_libraries(monero-lws-net monero-lws-net-http monero::libraries)

View file

@ -0,0 +1,33 @@
# Copyright (c) 2024, The Monero Project
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be
# used to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(monero-lws-net-http_sources client.cpp)
set(monero-lws-net-http_headers client.h slice_body.h)
add_library(monero-lws-net-http ${monero-lws-net-http_sources} ${monero-lws-net-http_headers})
target_link_libraries(monero-lws-net-http ${Boost_SYSTEM_LIBRARY} monero::libraries)

482
src/net/http/client.cpp Normal file
View file

@ -0,0 +1,482 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "client.h"
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/ip/address_v6.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core/flat_static_buffer.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/beast/version.hpp>
#include <boost/optional/optional.hpp>
#include <boost/thread/lock_types.hpp>
#include <cstdint>
#include <deque>
#include <limits>
#include <ostream>
#include "error.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_base.h" // monero/contrib/epee/include
#include "net/http/slice_body.h"
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
namespace net { namespace http
{
namespace
{
constexpr const unsigned http_version = 11;
constexpr const std::size_t http_parser_buffer_size = 16 * 1024;
constexpr const std::size_t max_body_size = 4 * 1024;
//! Timeout for 1 entire HTTP request (connect, handshake, send, receive).
constexpr const std::chrono::seconds message_timeout{30};
struct message
{
message(epee::byte_slice json_body, std::string host, std::string target, std::uint16_t port, bool https, std::function<server_response_func>&& notifier)
: json_body(std::move(json_body)),
notifier(std::move(notifier)),
host(std::move(host)),
target(std::move(target)),
port(port),
https(https)
{}
message(message&&) = default;
message(const message& rhs)
: json_body(rhs.json_body.clone()),
notifier(rhs.notifier),
host(rhs.host),
target(rhs.target),
port(rhs.port),
https(rhs.https)
{}
epee::byte_slice json_body;
std::function<server_response_func> notifier;
std::string host;
std::string target;
std::uint16_t port;
bool https;
};
std::ostream& operator<<(std::ostream& out, const message& src)
{
out << (src.https ? "https://" : "http://") << src.host << ':' << src.port << src.target;
return out;
}
} // anonymous
struct client_state
{
std::shared_ptr<boost::asio::ssl::context> ssl;
boost::beast::flat_static_buffer<http_parser_buffer_size> buffer;
std::deque<message> outgoing;
std::string last_host;
boost::asio::ip::tcp::resolver resolver;
boost::asio::steady_timer timer;
boost::asio::io_context::strand strand;
boost::asio::ip::tcp::endpoint endpoint;
boost::beast::http::request<slice_body> request;
boost::optional<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> sock;
boost::optional<boost::beast::http::parser<false, boost::beast::http::string_body>> parser;
std::size_t iteration;
client_state(boost::asio::io_context& io, std::shared_ptr<boost::asio::ssl::context> in)
: ssl(std::move(in)),
buffer{},
outgoing(),
last_host(),
resolver(io),
timer(io),
strand(io),
endpoint(),
request{},
sock(),
parser(),
iteration(0)
{
assert(ssl);
sock.emplace(io, *ssl);
}
template<typename F>
void async_write(F&& callback)
{
assert(sock);
assert(!outgoing.empty());
const bool no_body = outgoing.front().json_body.empty();
request = {
outgoing.front().notifier ? boost::beast::http::verb::get : boost::beast::http::verb::post,
outgoing.front().target,
http_version,
std::move(outgoing.front().json_body)
};
request.set(boost::beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING);
if (!no_body)
request.set(boost::beast::http::field::content_type, "application/json");
// Setting Host is tricky. Check for v6 and non-standard ports
boost::system::error_code error{};
boost::asio::ip::make_address_v6(outgoing.front().host, error);
if (!error)
request.set(boost::beast::http::field::host, "[" + outgoing.front().host + "]:" + std::to_string(outgoing.front().port));
else if ((outgoing.front().https && outgoing.front().port == 443) || (!outgoing.front().https && outgoing.front().port == 80))
request.set(boost::beast::http::field::host, outgoing.front().host);
else
request.set(boost::beast::http::field::host, outgoing.front().host + ":" + std::to_string(outgoing.front().port));
request.prepare_payload();
if (outgoing.front().https)
boost::beast::http::async_write(*sock, request, boost::asio::bind_executor(strand, std::forward<F>(callback)));
else
boost::beast::http::async_write(sock->next_layer(), request, boost::asio::bind_executor(strand, std::forward<F>(callback)));
}
template<typename F>
void async_read(F&& callback)
{
assert(sock);
assert(!outgoing.empty());
parser.emplace();
parser->body_limit(max_body_size);
if (outgoing.front().https)
boost::beast::http::async_read(*sock, buffer, *parser, boost::asio::bind_executor(strand, std::forward<F>(callback)));
else
boost::beast::http::async_read(sock->next_layer(), buffer, *parser, boost::asio::bind_executor(strand, std::forward<F>(callback)));
}
void notify_error(const boost::system::error_code& error) const
{
assert(!outgoing.empty());
if (outgoing.front().notifier)
outgoing.front().notifier(error, {});
}
};
namespace
{
class client_loop : public boost::asio::coroutine
{
std::shared_ptr<client_state> self_;
public:
explicit client_loop(std::shared_ptr<client_state> self) noexcept
: boost::asio::coroutine(), self_(std::move(self))
{}
bool set_timeout(const std::chrono::steady_clock::duration timeout)
{
if (!self_)
return false;
struct on_timeout
{
on_timeout() = delete;
std::shared_ptr<client_state> self_;
std::size_t iteration;
void operator()(boost::system::error_code error) const
{
if (!self_ || error == boost::asio::error::operation_aborted)
return;
if (iteration < self_->iteration)
return;
assert(self_->strand.running_in_this_thread());
assert(self_->sock);
if (!self_->outgoing.empty())
MWARNING("Timeout in HTTP attempt to " << self_->outgoing.front());
else
MERROR("Unexpected empty message stack");
self_->resolver.cancel();
self_->sock->next_layer().cancel(error);
self_->sock->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
}
};
self_->timer.expires_after(timeout);
self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_, self_->iteration}));
return true;
}
void operator()(boost::system::error_code error = {}, std::size_t = 0)
{
if (!self_)
return;
bool reuse = false;
bool is_https = false;
std::uint16_t last_port = 0;
client_state& self = *self_;
assert(self.strand.running_in_this_thread());
assert(self.sock);
BOOST_ASIO_CORO_REENTER(*this)
{
while (!self.outgoing.empty())
{
struct resolve
{
client_loop continue_;
void operator()(const boost::system::error_code error, const boost::asio::ip::tcp::resolver::results_type& ips)
{
if (error)
std::move(continue_)(error, 0);
else if (ips.empty())
std::move(continue_)(boost::asio::error::host_not_found, 0);
else if (continue_.self_)
{
continue_.self_->endpoint = *ips.begin();
std::move(continue_)(error, 0);
}
}
};
set_timeout(message_timeout);
if (!reuse)
{
MDEBUG("Resolving " << self.outgoing.front().host << " for HTTP");
BOOST_ASIO_CORO_YIELD self.resolver.async_resolve(
self.outgoing.front().host,
std::to_string(self.outgoing.front().port),
boost::asio::bind_executor(self.strand, resolve{std::move(*this)})
);
}
if (!error)
{
if (!reuse)
{
MDEBUG("Connecting to " << self.endpoint << " / " << self.outgoing.front().host << " for HTTP");
BOOST_ASIO_CORO_YIELD self.sock->next_layer().async_connect(
self.endpoint, boost::asio::bind_executor(self.strand, std::move(*this))
);
self.buffer.clear(); // do not re-use http buffer
}
if (!error)
{
if (!reuse && self.outgoing.front().https)
{
{
SSL* const ssl_ctx = self.sock->native_handle();
if (ssl_ctx)
SSL_set_tlsext_host_name(ssl_ctx, self.outgoing.front().host.c_str());
}
MDEBUG("Starting SSL handshake to " << self.outgoing.front().host << " for HTTP");
BOOST_ASIO_CORO_YIELD self.sock->async_handshake(
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::client,
boost::asio::bind_executor(self.strand, std::move(*this))
);
}
if (!error)
{
reuse = false;
MDEBUG("Sending " << self.outgoing.front().json_body.size() << " bytes in HTTP " << (self.outgoing.front().notifier ? "GET" : "POST") << " to " << self.outgoing.front());
BOOST_ASIO_CORO_YIELD self.async_write(std::move(*this));
if (!error)
{
MDEBUG("Starting read from " << self.outgoing.front() << " to previous HTTP message");
BOOST_ASIO_CORO_YIELD self.async_read(std::move(*this));
if (error)
MERROR("Failed to parse HTTP response from " << self.outgoing.front() << ": " << error.message());
else if (self.parser->get().result_int() != 200 && self.parser->get().result_int() != 201)
{
MERROR(self.outgoing.front() << " returned " << self.parser->get().result_int() << " status code");
self.notify_error(boost::asio::error::operation_not_supported);
}
else
{
MDEBUG(self.outgoing.front() << " successful");
reuse = self.parser->get().keep_alive();
if (self.outgoing.front().notifier)
self.outgoing.front().notifier({}, std::move(self.parser->get()).body());
}
}
else
MERROR("Failed HTTP " << (self.outgoing.front().notifier ? "GET" : "POST") << " to " << self.outgoing.front() << ": " << error.message());
}
else
MERROR("SSL handshake to " << self.outgoing.front().host << " failed: " << error.message());
}
else
MERROR("Failed to connect to " << self.outgoing.front().host << ": " << error.message());
}
else
MERROR("Failed to resolve TCP/IP address for " << self.outgoing.front().host << ": " << error.message());
if (error)
self.notify_error(error);
is_https = self.outgoing.front().https;
self.last_host = std::move(self.outgoing.front().host);
last_port = self.outgoing.front().port;
self.outgoing.pop_front();
reuse = reuse &&
!self.outgoing.empty() &&
self.last_host == self.outgoing.front().host &&
last_port == self.outgoing.front().port;
if (!reuse)
{
if (is_https)
{
MDEBUG("Starting SSL shutdown on " << self.last_host);
BOOST_ASIO_CORO_YIELD self.sock->async_shutdown(
boost::asio::bind_executor(self.strand, std::move(*this))
);
is_https = true;
}
MDEBUG("Cleaning up connection to " << self.last_host);
self.sock->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
self.sock->next_layer().close(error);
if (is_https) // must clear SSL state
self.sock.emplace(self.timer.get_executor(), *self.ssl);
}
++self.iteration;
}
}
}
};
boost::asio::ssl::context make_context(epee::net_utils::ssl_verification_t verify)
{
epee::net_utils::ssl_options_t ssl{
epee::net_utils::ssl_support_t::e_ssl_support_enabled
};
ssl.verification = verify;
return ssl.create_context();
}
} // anonymous
expect<void> client::queue_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body, std::function<server_response_func> notifier)
{
static constexpr const std::uint16_t max_port = std::numeric_limits<std::uint16_t>::max();
epee::net_utils::http::url_content parsed{};
if (!epee::net_utils::parse_url(std::move(url), parsed) || max_port < parsed.port)
return {lws::error::bad_url};
if (parsed.schema != "http" && parsed.schema != "https")
return {lws::error::bad_url};
if (!parsed.port)
parsed.port = (parsed.schema == "http") ? 80 : 443;
if (parsed.uri.empty())
parsed.uri = "/";
message msg{
std::move(json_body),
std::move(parsed.host),
std::move(parsed.uri),
std::uint16_t(parsed.port),
bool(parsed.schema == "https"),
std::move(notifier)
};
MDEBUG("Queueing HTTP " << (msg.notifier ? "GET" : "POST") << " to " << msg << " using " << this);
boost::unique_lock<boost::mutex> lock{sync_};
auto state = state_.lock();
if (!state)
{
// `make_shared` delays freeing of data section, use `make_unique`
MDEBUG("Creating new net::http::client_state for " << this);
state = {std::make_unique<client_state>(io, ssl_)};
state_ = state;
state->outgoing.push_back(std::move(msg));
lock.unlock();
boost::asio::post(state->strand, client_loop{state});
}
else
{
lock.unlock();
boost::asio::dispatch(
state->strand,
[state, msg = std::move(msg)] () mutable
{
const bool empty = state->outgoing.empty();
state->outgoing.push_back(std::move(msg));
if (empty)
boost::asio::post(state->strand, client_loop{state});
}
);
}
return success();
}
client::client(epee::net_utils::ssl_verification_t verify)
: state_(),
ssl_(std::make_shared<boost::asio::ssl::context>(make_context(verify))),
sync_()
{}
client::client(std::shared_ptr<boost::asio::ssl::context> ssl)
: state_(), ssl_(std::move(ssl)), sync_()
{
if (!ssl_)
throw std::logic_error{"boost::asio::ssl::context cannot be nullptr"};
}
client::~client()
{}
expect<void> client::post_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body)
{
return queue_async(io, std::move(url), std::move(json_body), {});
}
expect<void> client::get_async(boost::asio::io_context& io, std::string url, std::function<server_response_func> notifier)
{
if (!notifier)
throw std::logic_error{"net::http::client::get_async requires callback"};
return queue_async(io, std::move(url), epee::byte_slice{}, std::move(notifier));
}
}} // net // http

72
src/net/http/client.h Normal file
View file

@ -0,0 +1,72 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/io_context.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/system/error_code.hpp>
#include <boost/thread/mutex.hpp>
#include <functional>
#include <memory>
#include <string>
#include "byte_slice.h" // monero/contrib/epee/include
#include "common/expect.h" // monero/src
#include "net/net_ssl.h" // monero/contrib/epee/include
namespace net { namespace http
{
struct client_state;
using server_response_func = void(boost::system::error_code, std::string);
//! Primarily for webhooks, where the response is (basically) ignored.
class client
{
std::weak_ptr<client_state> state_;
std::shared_ptr<boost::asio::ssl::context> ssl_;
boost::mutex sync_;
expect<void> queue_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body, std::function<server_response_func> notifier);
public:
explicit client(epee::net_utils::ssl_verification_t verify);
explicit client(std::shared_ptr<boost::asio::ssl::context> ssl);
~client();
const std::shared_ptr<boost::asio::ssl::context>& ssl_context() const noexcept
{ return ssl_; }
//! Never blocks. Thread safe. \return `success()` if `url` is valid.
expect<void> post_async(boost::asio::io_context& io, std::string url, epee::byte_slice json_body);
/*! Never blocks. Thread safe. Calls `notifier` with server response iff
`success()` is returned.
\return `success()` if `url` is valid. */
expect<void> get_async(boost::asio::io_context& io, std::string url, std::function<server_response_func> notifier);
};
}} // net // http

77
src/net/http/slice_body.h Normal file
View file

@ -0,0 +1,77 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/buffer.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/http/message.hpp>
#include <cstdint>
#include <limits>
#include "byte_slice.h" // monero/contrib/epee/include
namespace net { namespace http
{
//! Trait for `boost::beast` body type
struct slice_body
{
using value_type = epee::byte_slice;
static std::uint64_t size(const value_type& source) noexcept
{
static_assert(!std::numeric_limits<std::size_t>::is_signed, "expected unsigned");
static_assert(
std::numeric_limits<std::size_t>::max() <= std::numeric_limits<std::uint64_t>::max(),
"unexpected size_t max value"
);
return source.size();
}
struct writer
{
epee::byte_slice body_;
using const_buffers_type = boost::asio::const_buffer;
template<bool is_request, typename Fields>
explicit writer(boost::beast::http::header<is_request, Fields> const&, value_type const& body)
: body_(body.clone())
{}
void init(boost::beast::error_code& ec)
{
ec = {};
}
boost::optional<std::pair<const_buffers_type, bool>> get(boost::beast::error_code& ec)
{
ec = {};
return {{const_buffers_type{body_.data(), body_.size()}, false}};
}
};
};
}} // net // http

View file

@ -55,6 +55,7 @@
#include <boost/thread/tss.hpp>
#include <boost/utility/string_ref.hpp>
#include <cstring>
#include <functional>
#include <limits>
#include <string>
#include <utility>
@ -69,8 +70,11 @@
#include "db/string.h"
#include "error.h"
#include "lmdb/util.h" // monero/src
#include "net/http/client.h"
#include "net/http/slice_body.h"
#include "net/net_parse_helpers.h" // monero/contrib/epee/include
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
#include "net/zmq_async.h"
#include "rpc/admin.h"
@ -102,6 +106,7 @@ namespace lws
const rpc::client client;
const runtime_options options;
std::vector<net::zmq::async_client> clients;
net::http::client webhook_client;
boost::mutex sync;
rest_server_data(db::storage disk, rpc::client client, runtime_options options)
@ -109,6 +114,7 @@ namespace lws
disk(std::move(disk)),
client(std::move(client)),
options(std::move(options)),
webhook_client(options.webhook_verify),
clients(),
sync()
{}
@ -1303,7 +1309,7 @@ namespace lws
using request = rpc::login_request;
using response = rpc::login_response;
static expect<response> handle(request req, const rest_server_data& data, std::function<async_complete>&& resume)
static expect<response> handle(request req, rest_server_data& data, std::function<async_complete>&&)
{
if (!key_check(req.creds))
return {lws::error::bad_view_key};
@ -1346,8 +1352,8 @@ namespace lws
// webhooks are not needed for response, so just queue i/o and
// log errors when it fails
rpc::send_webhook(
data.client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, data.options.webhook_verify
rpc::send_webhook_async(
data.io, data.client, data.webhook_client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:"
);
}
@ -1892,7 +1898,7 @@ namespace lws
Sock sock_;
boost::beast::flat_static_buffer<http_parser_buffer_size> buffer_;
boost::optional<boost::beast::http::parser<true, boost::beast::http::string_body>> parser_;
boost::beast::http::response<slice_body> response_;
boost::beast::http::response<net::http::slice_body> response_;
boost::asio::steady_timer timer_;
boost::asio::io_context::strand strand_;
bool keep_alive_;

View file

@ -42,7 +42,7 @@
#include "net/zmq_async.h"
#include "scanner.h"
#include "serialization/json_object.h" // monero/src
#include "wire/msgpack.h"
#include "wire/json/write.h"
#if MLWS_RMQ_ENABLED
#include <amqp.h>
#include <amqp_tcp_socket.h>
@ -50,6 +50,15 @@
namespace lws
{
// Not in `rates.h` - defaulting to JSON output seems odd
std::ostream& operator<<(std::ostream& out, lws::rates const& src)
{
wire::json_stream_writer dest{out};
lws::write_bytes(dest, src);
dest.finish();
return out;
}
namespace rpc
{
namespace http = epee::net_utils::http;
@ -194,17 +203,16 @@ namespace rpc
, rmq(std::move(rmq))
, daemon_addr(std::move(daemon_addr))
, sub_addr(std::move(sub_addr))
, rates_conn()
, rates_conn(epee::net_utils::ssl_verification_t::system_ca)
, cache_time()
, cache_interval(interval)
, cached{}
, account_counter(0)
, sync_pub()
, sync_rates()
, untrusted_daemon(untrusted_daemon)
, rates_running()
{
if (std::chrono::minutes{0} < cache_interval)
rates_conn.set_server(crypto_compare.host, boost::none, epee::net_utils::ssl_support_t::e_ssl_support_enabled);
rates_running.clear();
}
zcontext comm;
@ -213,14 +221,14 @@ namespace rpc
rcontext rmq;
const std::string daemon_addr;
const std::string sub_addr;
http::http_simple_client rates_conn;
net::http::client rates_conn;
std::chrono::steady_clock::time_point cache_time;
const std::chrono::minutes cache_interval;
rates cached;
std::atomic<unsigned> account_counter;
boost::mutex sync_pub;
boost::mutex sync_rates;
const bool untrusted_daemon;
std::atomic_flag rates_running;
};
} // detail
@ -597,37 +605,44 @@ namespace rpc
return do_signal(ctx->signal_pub.get(), abort_process_signal);
}
expect<boost::optional<lws::rates>> context::retrieve_rates()
expect<void> context::retrieve_rates_async(boost::asio::io_context& io)
{
MONERO_PRECOND(ctx != nullptr);
if (ctx->cache_interval <= std::chrono::minutes{0})
return boost::make_optional(false, ctx->cached);
return success();
if (ctx->rates_running.test_and_set())
return success();
auto& self = ctx;
const expect<void> rc = ctx->rates_conn.get_async(
io, crypto_compare.url, [self] (boost::system::error_code error, std::string body)
{
expect<rates> fresh{lws::error::exchange_rates_fetch};
if (!error)
{
fresh = crypto_compare(std::move(body));
if (fresh)
MINFO("Updated exchange rates: " << *fresh);
else
MERROR("Failed to parse exchange rates: " << fresh.error());
}
else
MERROR("Failed to retrieve exchange rates: " << error.message());
const auto now = std::chrono::steady_clock::now();
if (now - ctx->cache_time < ctx->cache_interval)
return boost::make_optional(false, ctx->cached);
expect<rates> fresh{lws::error::exchange_rates_fetch};
const http::http_response_info* info = nullptr;
const bool retrieved =
ctx->rates_conn.invoke_get(crypto_compare.path, std::chrono::seconds{20}, std::string{}, std::addressof(info)) &&
info != nullptr &&
info->m_response_code == 200;
// \TODO Remove copy below
if (retrieved)
fresh = crypto_compare(std::string{info->m_body});
const boost::unique_lock<boost::mutex> lock{ctx->sync_rates};
ctx->cache_time = now;
if (fresh)
{
ctx->cached = *fresh;
return boost::make_optional(*fresh);
const boost::lock_guard<boost::mutex> lock{self->sync_rates};
self->cache_time = now;
self->cached = std::move(*fresh);
}
return fresh.error();
self->rates_running.clear();
});
if (!rc)
ctx->rates_running.clear();
return rc;
}
} // rpc
} // lws

View file

@ -148,10 +148,10 @@ namespace rpc
*/
expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
//! Publish `payload` to ZMQ external pub socket.
//! Publish `payload` to ZMQ external pub socket. Blocks iff RMQ.
expect<void> publish(epee::byte_slice payload) const;
//! Publish `data` after `topic` to ZMQ external pub socket.
//! Publish `data` after `topic` to ZMQ external pub socket. Blocks iff RMQ.
template<typename F, typename T>
expect<void> publish(const boost::string_ref topic, const T& data) const
{
@ -250,14 +250,11 @@ namespace rpc
expect<void> raise_abort_process() noexcept;
/*!
Retrieve exchange rates, if enabled and past cache interval. Not
thread-safe (this can be invoked from one thread only, but this is
thread-safe with `client::get_rates()`). All clients will see new rates
immediately.
Retrieve exchange rates, if enabled. Thread-safe. All clients will see
new rates once completed.
\return Rates iff they were updated.
*/
expect<boost::optional<lws::rates>> retrieve_rates();
\return `success()` if HTTP GET was queued. */
expect<void> retrieve_rates_async(boost::asio::io_context& io);
};
} // rpc
} // lws

View file

@ -65,8 +65,8 @@ namespace lws
namespace rpc
{
const char crypto_compare_::host[] = "https://min-api.cryptocompare.com:443";
const char crypto_compare_::path[] =
const char crypto_compare_::url[] =
"https://min-api.cryptocompare.com"
"/data/price?fsym=XMR&tsyms=AUD,BRL,BTC,CAD,CHF,CNY,EUR,GBP,"
"HKD,INR,JPY,KRW,MXN,NOK,NZD,SEK,SGD,TRY,USD,RUB,ZAR";

View file

@ -64,8 +64,7 @@ namespace lws
{
struct crypto_compare_
{
static const char host[];
static const char path[];
static const char url[];
expect<lws::rates> operator()(std::string&& body) const;
};

View file

@ -458,7 +458,7 @@ namespace lws { namespace rpc { namespace scanner
};
}
server::server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify)
server::server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, std::shared_ptr<boost::asio::ssl::context> ssl)
: strand_(io),
check_timer_(io),
acceptor_(io),
@ -467,11 +467,11 @@ namespace lws { namespace rpc { namespace scanner
active_(std::move(active)),
disk_(std::move(disk)),
zclient_(std::move(zclient)),
webhook_(std::move(ssl)),
accounts_cur_{},
next_thread_(0),
pass_hashed_(),
pass_salt_(),
webhook_verify_(webhook_verify),
stop_(false)
{
std::sort(active_.begin(), active_.end());
@ -563,8 +563,7 @@ namespace lws { namespace rpc { namespace scanner
self->strand_,
[self, users = std::move(users), blocks = std::move(blocks)] ()
{
const lws::scanner_options opts{self->webhook_verify_, false, false};
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))
if (!lws::user_data::store(self->strand_.context(), self->disk_, self->zclient_, self->webhook_, epee::to_span(blocks), epee::to_span(users), nullptr))
{
self->do_stop();
self->strand_.context().stop();

View file

@ -38,6 +38,7 @@
#include "db/fwd.h"
#include "db/storage.h"
#include "net/http/client.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/client.h"
#include "rpc/scanner/queue.h"
@ -65,11 +66,11 @@ namespace lws { namespace rpc { namespace scanner
std::vector<db::account_id> active_;
db::storage disk_;
rpc::client zclient_;
net::http::client webhook_;
db::cursor::accounts accounts_cur_;
std::size_t next_thread_;
std::array<unsigned char, 32> pass_hashed_;
std::array<unsigned char, crypto_pwhash_SALTBYTES> pass_salt_;
const ssl_verification_t webhook_verify_;
bool stop_;
//! Async acceptor routine
@ -85,7 +86,7 @@ namespace lws { namespace rpc { namespace scanner
public:
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);
explicit server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify);
explicit server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, std::shared_ptr<boost::asio::ssl::context> ssl);
server(const server&) = delete;
server(server&&) = delete;

View file

@ -27,100 +27,40 @@
#pragma once
#include <boost/asio/io_context.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/utility/string_ref.hpp>
#include <chrono>
#include <string>
#include "byte_slice.h" // monero/contrib/epee/include
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include
#include "net/http/client.h"
#include "span.h"
#include "wire/json.h"
#include "wire/msgpack.h"
namespace lws { namespace rpc
{
namespace net = epee::net_utils;
template<typename T>
void http_send(net::http::http_simple_client& client, boost::string_ref uri, const T& event, const net::http::fields_list& params, const std::chrono::milliseconds timeout)
void http_async(boost::asio::io_context& io, net::http::client& client, const epee::span<const T> events)
{
if (uri.empty())
uri = "/";
epee::byte_slice bytes{};
const std::string& url = event.value.second.url;
const std::error_code json_error = wire::json::to_bytes(bytes, event);
const net::http::http_response_info* info = nullptr;
if (json_error)
{
MERROR("Failed to generate webhook JSON: " << json_error.message());
return;
}
MINFO("Sending webhook to " << url);
if (!client.invoke(uri, "POST", std::string{bytes.begin(), bytes.end()}, timeout, std::addressof(info), params))
{
MERROR("Failed to invoke http request to " << url);
return;
}
if (!info)
{
MERROR("Failed to invoke http request to " << url << ", internal error (null response ptr)");
return;
}
if (info->m_response_code != 200 && info->m_response_code != 201)
{
MERROR("Failed to invoke http request to " << url << ", wrong response code: " << info->m_response_code);
return;
}
}
template<typename T>
void http_send(const epee::span<const T> events, const std::chrono::milliseconds timeout, net::ssl_verification_t verify_mode)
{
if (events.empty())
return;
net::http::url_content url{};
net::http::http_simple_client client{};
net::http::fields_list params;
params.emplace_back("Content-Type", "application/json; charset=utf-8");
for (const auto& event : events)
{
if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url))
if (event.value.second.url != "zmq")
{
MERROR("Bad URL for webhook event: " << event.value.second.url);
continue;
}
const bool https = (url.schema == "https");
if (!https && url.schema != "http")
epee::byte_slice bytes{};
const std::error_code json_error = wire::json::to_bytes(bytes, event);
if (!json_error)
{
MERROR("Only http or https connections: " << event.value.second.url);
continue;
MINFO("Sending webhook to " << event.value.second.url);
const expect<void> rc =
client.post_async(io, event.value.second.url, std::move(bytes));
if (!rc)
MERROR("Failed to send HTTP webhook to " << event.value.second.url << ": " << rc.error().message());
}
const net::ssl_support_t ssl_mode = https ?
net::ssl_support_t::e_ssl_support_enabled : net::ssl_support_t::e_ssl_support_disabled;
net::ssl_options_t ssl_options{ssl_mode};
if (https)
ssl_options.verification = verify_mode;
if (url.port == 0)
url.port = https ? 443 : 80;
client.set_server(url.host, std::to_string(url.port), boost::none, std::move(ssl_options));
if (client.connect(timeout))
http_send(client, url.uri, event, params, timeout);
else
MERROR("Unable to send webhook to " << event.value.second.url);
client.disconnect();
MERROR("Failed to generate webhook JSON: " << json_error.message());
}
}
}
@ -173,15 +113,17 @@ namespace lws { namespace rpc
}
template<typename T>
void send_webhook(
void send_webhook_async(
boost::asio::io_context& io,
const rpc::client& client,
net::http::client& http_client,
const epee::span<const T> events,
const boost::string_ref json_topic,
const boost::string_ref msgpack_topic,
const std::chrono::seconds timeout,
epee::net_utils::ssl_verification_t verify_mode)
const boost::string_ref msgpack_topic)
{
http_send(events, timeout, verify_mode);
http_async(io, http_client, events);
// ZMQ PUB sockets never block, but RMQ does. No easy way around this.
zmq_send(client, events, json_topic, msgpack_topic);
}
}} // lws // rpc

View file

@ -56,6 +56,7 @@
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_parse_helpers.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "rpc/json.h"
@ -75,18 +76,9 @@
namespace lws
{
// Not in `rates.h` - defaulting to JSON output seems odd
std::ostream& operator<<(std::ostream& out, lws::rates const& src)
{
wire::json_stream_writer dest{out};
lws::write_bytes(dest, src);
dest.finish();
return out;
}
namespace
{
namespace net = epee::net_utils;
namespace enet = epee::net_utils;
constexpr const std::chrono::minutes block_rpc_timeout{2};
constexpr const std::chrono::seconds send_timeout{30};
@ -152,9 +144,9 @@ namespace lws
return true;
}
void send_payment_hook(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events, net::ssl_verification_t verify_mode)
void send_payment_hook(boost::asio::io_context& io, rpc::client& client, net::http::client& http, const epee::span<const db::webhook_tx_confirmation> events)
{
rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode);
rpc::send_webhook_async(io, client, http, events, "json-full-payment_hook:", "msgpack-full-payment_hook:");
}
std::size_t get_target_time(db::block_id height)
@ -194,9 +186,9 @@ namespace lws
vec.erase(vec.begin());
};
void send_spend_hook(rpc::client& client, const epee::span<const db::webhook_tx_spend> events, net::ssl_verification_t verify_mode)
void send_spend_hook(boost::asio::io_context& io, rpc::client& client, net::http::client& http, const epee::span<const db::webhook_tx_spend> events)
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
rpc::send_webhook_async(io, client, http, events, "json-full-spend_hook:", "msgpack-full-spend_hook:");
}
struct add_spend
@ -219,7 +211,7 @@ namespace lws
{
db::storage const& disk_;
rpc::client& client_;
net::ssl_verification_t verify_mode_;
scanner_sync& http_;
std::unordered_map<crypto::hash, crypto::hash> txpool_;
bool operator()(expect<db::storage_reader>& reader, lws::account& user, const db::output& out)
@ -290,7 +282,7 @@ namespace lws
else
events.pop_back(); //cannot compute tx_hash
}
send_payment_hook(client_, epee::to_span(events), verify_mode_);
send_payment_hook(http_.io_, client_, http_.webhooks_, epee::to_span(events));
return true;
}
};
@ -567,7 +559,7 @@ namespace lws
scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, reader, add_spend{}, add_output{});
}
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const scanner_options& opts)
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, scanner_sync& self, rpc::client& client, const scanner_options& opts)
{
// uint64::max is for txpool
static const std::vector<std::uint64_t> fake_outs(
@ -585,20 +577,11 @@ namespace lws
boost::numeric_cast<std::uint64_t>(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
subaddress_reader reader{std::optional<db::storage>{disk.clone()}, opts.enable_subaddresses};
send_webhook sender{disk, client, opts.webhook_verify};
send_webhook sender{disk, client, self};
for (const auto& tx : parsed->txes)
scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender);
}
void update_rates(rpc::context& ctx)
{
const expect<boost::optional<lws::rates>> new_rates = ctx.retrieve_rates();
if (!new_rates)
MERROR("Failed to retrieve exchange rates: " << new_rates.error().message());
else if (*new_rates)
MINFO("Updated exchange rates: " << *(*new_rates));
}
void do_scan_loop(scanner_sync& self, std::shared_ptr<thread_data> data, const bool leader_thread) noexcept
{
struct stop_
@ -629,7 +612,7 @@ namespace lws
auto new_client = MONERO_UNWRAP(client.clone());
MONERO_UNWRAP(new_client.watch_scan_signals());
user_data store_local{disk.clone()};
if (!scanner::loop(self.stop_, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread))
if (!scanner::loop(self, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread))
return;
}
@ -657,8 +640,8 @@ namespace lws
}
} // anonymous
scanner::scanner(db::storage disk)
: disk_(std::move(disk)), sync_(), signals_(sync_.io_)
scanner::scanner(db::storage disk, epee::net_utils::ssl_verification_t webhook_verify)
: disk_(std::move(disk)), sync_(webhook_verify), signals_(sync_.io_)
{
signals_.add(SIGINT);
signals_.async_wait([this] (const boost::system::error_code& error, int)
@ -671,7 +654,7 @@ namespace lws
scanner::~scanner()
{}
bool scanner::loop(const std::atomic<bool>& stop, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread)
bool scanner::loop(scanner_sync& self, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread)
{
if (users.empty())
return true;
@ -698,7 +681,7 @@ namespace lws
if (opts.untrusted_daemon && disk)
last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk->start_read()).get_last_pow_block()).id;
while (!stop)
while (!self.stop_)
{
blockchain.clear();
new_pow.clear();
@ -796,7 +779,7 @@ namespace lws
{
if (!disk || message->first != rpc::client::topic::txpool)
break; // inner for loop
scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, client, opts);
scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, self, client, opts);
}
for ( ; message != new_pubs->end(); ++message)
@ -885,7 +868,7 @@ namespace lws
pow_window.median_timestamps.erase(pow_window.median_timestamps.begin());
// longhash takes a while, check is_running
if (stop)
if (self.stop_)
return false;
diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(db::block_id(fetched->start_height)));
@ -942,7 +925,7 @@ namespace lws
} // for each block
reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write
if (!store(client, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow), opts))
if (!store(self.io_, client, self.webhooks_, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow)))
return false;
// TODO
@ -1051,7 +1034,7 @@ namespace lws
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
self.webhooks_.ssl_context()
);
rpc::scanner::server::start_user_checking(server);
@ -1274,7 +1257,7 @@ namespace lws
}
} // anonymous
bool user_data::store(db::storage& disk, rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
bool user_data::store(boost::asio::io_context& io, db::storage& disk, rpc::client& client, net::http::client& webhook, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow)
{
if (users.empty())
return true;
@ -1293,8 +1276,8 @@ namespace lws
}
MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
send_payment_hook(io, client, webhook, epee::to_span(updated->confirm_pubs));
send_spend_hook(io, client, webhook, epee::to_span(updated->spend_pubs));
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
@ -1309,9 +1292,9 @@ namespace lws
return true;
}
bool user_data::operator()(rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
bool user_data::operator()(boost::asio::io_context& io, rpc::client& client, net::http::client& webhook, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow)
{
return store(disk_, client, chain, users, pow, opts);
return store(io, disk_, client, webhook, chain, users, pow);
}
expect<rpc::client> scanner::sync(rpc::client client, const bool untrusted_daemon)
@ -1335,26 +1318,25 @@ namespace lws
/*! \NOTE Be careful about references and lifetimes of the callbacks. The
ones below are safe because no `io_context::run()` call is after the
destruction of the references.
\NOTE That `ctx` will need a strand or lock if multiple
`io_context::run()` calls are used. */
destruction of the references. */
boost::asio::steady_timer rate_timer{sync_.io_};
class rate_updater
{
boost::asio::io_context& io_;
boost::asio::steady_timer& rate_timer_;
rpc::context& ctx_;
const std::chrono::minutes rate_interval_;
public:
explicit rate_updater(boost::asio::steady_timer& rate_timer, rpc::context& ctx)
: rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval())
explicit rate_updater(boost::asio::io_context& io, boost::asio::steady_timer& rate_timer, rpc::context& ctx)
: io_(io), rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval())
{}
void operator()(const boost::system::error_code& error = {}) const
{
update_rates(ctx_);
const expect<void> status = ctx_.retrieve_rates_async(io_);
if (!status)
MERROR("Unable to retrieve exchange rates: " << status.error());
rate_timer_.expires_from_now(rate_interval_);
rate_timer_.async_wait(*this);
}
@ -1363,7 +1345,7 @@ namespace lws
};
{
rate_updater updater{rate_timer, ctx};
rate_updater updater{sync_.io_, rate_timer, ctx};
if (std::chrono::minutes{0} < updater.rate_interval())
updater();
}

View file

@ -35,6 +35,7 @@
#include "db/fwd.h"
#include "db/storage.h"
#include "net/http/client.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/client.h"
#include "rpc/scanner/fwd.h"
@ -44,7 +45,6 @@ namespace lws
{
struct scanner_options
{
epee::net_utils::ssl_verification_t webhook_verify;
bool enable_subaddresses;
bool untrusted_daemon;
};
@ -69,20 +69,21 @@ namespace lws
/*! Store updated accounts locally (`disk`), and send ZMQ/RMQ/webhook
events. `users` must be sorted by height (lowest first). */
static bool store(db::storage& disk, rpc::client& zclient, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const db::pow_sync> pow, const scanner_options&);
static bool store(boost::asio::io_context& io, db::storage& disk, rpc::client& zclient, net::http::client& webhook ,epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const db::pow_sync> pow);
//! `users` must be sorted by height (lowest first)
bool operator()(rpc::client& zclient, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const db::pow_sync> pow, const scanner_options&);
bool operator()(boost::asio::io_context& io, rpc::client& zclient, net::http::client& webhook, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const db::pow_sync> pow);
};
struct scanner_sync
{
boost::asio::io_context io_;
net::http::client webhooks_;
std::atomic<bool> stop_; //!< Stop scanning but do not shutdown
std::atomic<bool> shutdown_; //!< Exit scanner::run
explicit scanner_sync()
: io_(), stop_(false), shutdown_(false)
explicit scanner_sync(epee::net_utils::ssl_verification_t webhook_verify)
: io_(), webhooks_(webhook_verify), stop_(false), shutdown_(false)
{}
bool is_running() const noexcept { return !stop_ && !shutdown_; }
@ -104,18 +105,18 @@ namespace lws
public:
//! Register `SIGINT` handler and keep a copy of `disk`
explicit scanner(db::storage disk);
explicit scanner(db::storage disk, epee::net_utils::ssl_verification_t webhook_verify);
~scanner();
//! Callback for storing user account (typically local lmdb, but perhaps remote rpc)
using store_func = std::function<bool(rpc::client&, epee::span<const crypto::hash>, epee::span<const lws::account>, epee::span<const db::pow_sync>, const scanner_options&)>;
using store_func = std::function<bool(boost::asio::io_context&, rpc::client&, net::http::client&, epee::span<const crypto::hash>, epee::span<const lws::account>, epee::span<const db::pow_sync>)>;
/*! Run _just_ the inner scanner loop while `self.is_running() == true`.
*
\throw std::exception on hard errors (shutdown) conditions
\return True iff `queue` indicates thread now has zero accounts. False
indicates a soft, typically recoverable error. */
static bool loop(const std::atomic<bool>& stop, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread);
static bool loop(scanner_sync& self, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread);
//! Use `client` to sync blockchain data, and \return client if successful.
expect<rpc::client> sync(rpc::client client, const bool untrusted_daemon = false);

View file

@ -296,13 +296,12 @@ namespace
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval, prog.untrusted_daemon);
//! SIGINT handle registered by `scanner` constructor
lws::scanner scanner{disk.clone()};
lws::scanner scanner{disk.clone(), prog.rest_config.webhook_verify};
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
auto client = scanner.sync(ctx.connect().value(), prog.untrusted_daemon).value();
const auto enable_subaddresses = bool(prog.rest_config.max_subaddresses);
const auto webhook_verify = prog.rest_config.webhook_verify;
lws::rest_server server{
epee::to_span(prog.rest_servers), prog.admin_rest_servers, std::move(disk), std::move(client), std::move(prog.rest_config)
};
@ -317,7 +316,7 @@ namespace
prog.scan_threads,
std::move(prog.lws_server_addr),
std::move(prog.lws_server_pass),
lws::scanner_options{webhook_verify, enable_subaddresses, prog.untrusted_daemon}
lws::scanner_options{enable_subaddresses, prog.untrusted_daemon}
);
}
} // anonymous

View file

@ -31,6 +31,7 @@ target_include_directories(monero-lws-unit-framework PUBLIC ${CMAKE_CURRENT_SOUR
target_link_libraries(monero-lws-unit-framework)
add_subdirectory(db)
add_subdirectory(net)
add_subdirectory(rpc)
add_subdirectory(wire)
@ -40,6 +41,8 @@ target_link_libraries(monero-lws-unit
monero-lws-daemon-common
monero-lws-unit-db
monero-lws-unit-framework
monero-lws-unit-net
monero-lws-unit-net-http
monero-lws-unit-rpc
monero-lws-unit-wire
monero-lws-unit-wire-json

View file

@ -0,0 +1,32 @@
# Copyright (c) 2024, The Monero Project
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be
# used to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
add_subdirectory(http)
add_library(monero-lws-unit-net)
target_link_libraries(monero-lws-unit-net monero-lws-unit-net-http)

View file

@ -0,0 +1,34 @@
# Copyright (c) 2024, The Monero Project
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be
# used to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
add_library(monero-lws-unit-net-http OBJECT client.test.cpp)
target_link_libraries(
monero-lws-unit-net-http
monero-lws-net-http
monero-lws-unit-framework
monero::libraries)

View file

@ -0,0 +1,166 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "framework.test.h"
#include <atomic>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <string>
#include "crypto/crypto.h" // monero/src
#include "net/http/client.h"
#include "net/http_server_impl_base.h" // monero/contrib/epee/include
namespace
{
constexpr const std::uint16_t server_port = 10000;
constexpr const std::uint16_t invalid_server_port = 10001;
namespace enet = epee::net_utils;
struct context : enet::connection_context_base
{
context()
: enet::connection_context_base()
{}
};
struct handler : epee::http_server_impl_base<handler, context>
{
handler()
: epee::http_server_impl_base<handler, context>()
{}
virtual bool
handle_http_request(const enet::http::http_request_info& query, enet::http::http_response_info& response, context&)
override final
{
if (query.m_URI == "/")
response.m_response_code = 404;
else
response.m_response_code = 200;
response.m_body = query.m_URI;
return true;
}
};
}
LWS_CASE("net::http::client")
{
boost::asio::io_context io;
handler server{};
server.init(&crypto::generate_random_bytes_thread_safe, std::to_string(server_port));
server.run(1, false);
SETUP("server and client")
{
net::http::client client{epee::net_utils::ssl_verification_t::none};
SECTION("GET 200 OK")
{
std::atomic<bool> done = false;
const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body)
{
EXPECT(!error);
EXPECT(body == "/some_endpoint");
done = true;
};
client.get_async(
io, "http://127.0.0.1:" + std::to_string(server_port) + "/some_endpoint", handler
);
while (!done)
{
io.run_one();
io.restart();
}
}
SECTION("GET 200 OK Twice")
{
std::atomic<unsigned> done = 0;
const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body)
{
EXPECT(!error);
EXPECT(body == "/some_endpoint");
++done;
};
client.get_async(
io, "http://127.0.0.1:" + std::to_string(server_port) + "/some_endpoint", handler
);
client.get_async(
io, "http://127.0.0.1:" + std::to_string(server_port) + "/some_endpoint", handler
);
while (done != 2)
{
io.run_one();
io.restart();
}
}
SECTION("GET 404 NOT FOUND")
{
std::atomic<bool> done = false;
const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body)
{
EXPECT(error == boost::asio::error::operation_not_supported);
EXPECT(body.empty());
done = true;
};
client.get_async(
io, "http://127.0.0.1:" + std::to_string(server_port), handler
);
while (!done)
{
io.run_one();
io.restart();
}
}
SECTION("GET (Invalid server address)")
{
std::atomic<bool> done = false;
const auto handler = [&done, &lest_env] (boost::system::error_code error, std::string body)
{
EXPECT(error == boost::asio::error::connection_refused);
EXPECT(body.empty());
done = true;
};
client.get_async(
io, "http://127.0.0.1:" + std::to_string(invalid_server_port), handler
);
while (!done)
{
io.run_one();
io.restart();
}
}
}
}

View file

@ -350,7 +350,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
std::vector<epee::byte_slice> messages{};
messages.push_back(to_json_rpc(1));
lws::scanner scanner{db.clone()};
lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
@ -384,7 +384,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, {hashes.data(), 1});
{
lws::scanner scanner{db.clone()};
lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect())));
@ -408,7 +408,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
message.hashes.resize(1);
messages.push_back(daemon_response(message));
lws::scanner scanner{db.clone()};
lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect())));
@ -516,7 +516,7 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
messages.push_back(daemon_response(hmessage));
{
lws::scanner scanner{db.clone()};
lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect())));
@ -534,10 +534,8 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
bmessage.output_indices.resize(1);
messages.push_back(daemon_response(bmessage));
{
static constexpr const lws::scanner_options opts{
epee::net_utils::ssl_verification_t::none, true, false
};
lws::scanner scanner{db.clone()};
static constexpr const lws::scanner_options opts{true, false};
lws::scanner scanner{db.clone(), epee::net_utils::ssl_verification_t::none};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
scanner.run(std::move(rpc), 1, {}, {}, opts);