Switch from epee http server to boost::beast http server. Min boost 1.70 (#136)

There is roughly a 7.4% increase in performance in the switch to
boost::beast. Additionally, the REST endpoints `/daemon_status`,
`/get_unspent_outs`, and `/submit_raw_tx` do not block in ZMQ calls,
allowing for better response times regardless of `monerod` status.

The REST endpoints `/login and `/get_random_outs` still need updates
to prevent blocking (`/login` is conditional on DB state).
This commit is contained in:
Lee *!* Clagett 2024-10-23 11:31:03 -04:00 committed by GitHub
parent 9d09c561d3
commit a81d71ae29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1525 additions and 411 deletions

View file

@ -157,7 +157,7 @@ if(STATIC)
set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_STATIC_RUNTIME ON) set(Boost_USE_STATIC_RUNTIME ON)
endif() endif()
find_package(Boost 1.58 QUIET REQUIRED COMPONENTS chrono filesystem program_options regex serialization system thread) find_package(Boost 1.70 QUIET REQUIRED COMPONENTS chrono filesystem program_options regex serialization system thread)
if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE)) if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE))
message(STATUS "Found Boost_THREAD_LIBRARY: ${Boost_THREAD_LIBRARY}") message(STATUS "Found Boost_THREAD_LIBRARY: ${Boost_THREAD_LIBRARY}")

View file

@ -62,7 +62,7 @@ library archives (`.a`).
| ------------ | ------------- | -------- | -------------------- | ------------ | ------------------ | ------------------- | -------- | --------------- | | ------------ | ------------- | -------- | -------------------- | ------------ | ------------------ | ------------------- | -------- | --------------- |
| GCC | 4.7.3 | NO | `build-essential` | `base-devel` | `base-devel` | `gcc` | NO | | | GCC | 4.7.3 | NO | `build-essential` | `base-devel` | `base-devel` | `gcc` | NO | |
| CMake | 3.1 | NO | `cmake` | `cmake` | `cmake` | `cmake` | NO | | | CMake | 3.1 | NO | `cmake` | `cmake` | `cmake` | `cmake` | NO | |
| Boost | 1.58 | NO | `libboost-all-dev` | `boost` | `boost-devel` | `boost-devel` | NO | C++ libraries | | Boost | 1.70 | NO | `libboost-all-dev` | `boost` | `boost-devel` | `boost-devel` | NO | C++ libraries |
| monero | 0.15 | NO | | | | | NO | Monero libraries| | monero | 0.15 | NO | | | | | NO | Monero libraries|
| OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `libressl-devel` | `openssl-devel` | NO | sha256 sum | | OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `libressl-devel` | `openssl-devel` | NO | sha256 sum |
| libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `zeromq-devel` | `zeromq-devel` | NO | ZeroMQ library | | libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `zeromq-devel` | `zeromq-devel` | NO | ZeroMQ library |

View file

@ -31,6 +31,7 @@ include_directories(.)
add_subdirectory(lmdb) add_subdirectory(lmdb)
add_subdirectory(wire) add_subdirectory(wire)
add_subdirectory(db) add_subdirectory(db)
add_subdirectory(net)
add_subdirectory(rpc) add_subdirectory(rpc)
add_subdirectory(util) add_subdirectory(util)
@ -49,12 +50,14 @@ target_link_libraries(monero-lws-daemon-common
${MONERO_lmdb} ${MONERO_lmdb}
monero-lws-common monero-lws-common
monero-lws-db monero-lws-db
monero-lws-net
monero-lws-rpc monero-lws-rpc
monero-lws-rpc-scanner monero-lws-rpc-scanner
monero-lws-wire-json monero-lws-wire-json
monero-lws-util monero-lws-util
${Boost_CHRONO_LIBRARY} ${Boost_CHRONO_LIBRARY}
${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY} ${Boost_THREAD_LIBRARY}
${Boost_THREAD_LIBS_INIT} ${Boost_THREAD_LIBS_INIT}
${EXTRA_LIBRARIES} ${EXTRA_LIBRARIES}

33
src/net/CMakeLists.txt Normal file
View file

@ -0,0 +1,33 @@
# Copyright (c) 2024, The Monero Project
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be
# used to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(monero-lws-net_sources zmq_async.cpp)
set(monero-lws-net_headers zmq_async.h)
add_library(monero-lws-net ${monero-lws-net_sources} ${monero-lws-net_headers})
target_link_libraries(monero-lws-net monero::libraries)

103
src/net/zmq_async.cpp Normal file
View file

@ -0,0 +1,103 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "zmq_async.h"
#include <stdexcept>
namespace net { namespace zmq
{
const boost::system::error_category& boost_error_category() noexcept
{
struct category final : boost::system::error_category
{
virtual const char* name() const noexcept override final
{
return "error::error_category()";
}
virtual std::string message(int value) const override final
{
char const* const msg = zmq_strerror(value);
if (msg)
return msg;
return "zmq_strerror failure";
}
virtual boost::system::error_condition default_error_condition(int value) const noexcept override final
{
// maps specific errors to generic `std::errc` cases.
switch (value)
{
case EFSM:
case ETERM:
break;
default:
/* zmq is using cerrno errors. C++ spec indicates that `std::errc`
values must be identical to the cerrno value. So just map every zmq
specific error to the generic errc equivalent. zmq extensions must
be in the switch or they map to a non-existent errc enum value. */
return boost::system::errc::errc_t(value);
}
return boost::system::error_condition{value, *this};
}
};
static const category instance{};
return instance;
}
boost::system::error_code make_error_code(std::error_code code)
{
if (std::addressof(code.category()) != std::addressof(error_category()))
throw std::logic_error{"Expected only ZMQ errors"};
return boost::system::error_code{code.value(), boost_error_category()};
}
void free_descriptor::operator()(adescriptor* ptr) noexcept
{
if (ptr)
{
ptr->release(); // release ASIO ownership, destroys all queued handlers
delete ptr;
}
}
expect<async_client> async_client::make(boost::asio::io_service& io, socket zsock)
{
MONERO_PRECOND(zsock != nullptr);
int fd = 0;
std::size_t length = sizeof(fd);
if (zmq_getsockopt(zsock.get(), ZMQ_FD, &fd, &length) != 0)
return net::zmq::get_error_code();
async_client out{std::move(zsock), nullptr, false};
out.asock.reset(new adescriptor{io, fd});
return out;
}
}} // net // zmq

165
src/net/zmq_async.h Normal file
View file

@ -0,0 +1,165 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/compose.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <memory>
#include <string>
#include <zmq.h>
#include "byte_slice.h" // monero/contrib/epee/include
#include "common/expect.h" // monero/src
#include "net/zmq.h"
namespace net { namespace zmq
{
//! \return Category for ZMQ errors.
const boost::system::error_category& boost_error_category() noexcept;
//! \return `code` iff the `::net::zmq` error category
boost::system::error_code make_error_code(std::error_code code);
using adescriptor = boost::asio::posix::stream_descriptor;
struct free_descriptor
{
void operator()(adescriptor* ptr) noexcept;
};
using asocket = std::unique_ptr<adescriptor, free_descriptor>;
struct async_client
{
async_client() = delete;
socket zsock;
asocket asock;
bool close;
static expect<async_client> make(boost::asio::io_service& io, socket zsock);
};
class read_msg_op
{
async_client* sock_;
std::string* msg_;
public:
read_msg_op(async_client& sock, std::string& msg)
: sock_(std::addressof(sock)), msg_(std::addressof(msg))
{}
template<typename F>
void operator()(F& self, const boost::system::error_code error = {}, std::size_t = 0)
{
if (error)
return self.complete(error, 0);
if (!sock_)
return;
if (sock_->close)
return self.complete(boost::asio::error::operation_aborted, 0);
assert(sock_->zsock && sock_->asock);
expect<std::string> msg = receive(sock_->zsock.get(), ZMQ_DONTWAIT);
if (!msg)
{
if (msg != make_error_code(EAGAIN))
return self.complete(make_error_code(msg.error()), 0);
// try again
sock_->asock->async_read_some(boost::asio::null_buffers(), std::move(self));
return;
}
*msg_ = std::move(*msg);
self.complete(error, msg_->size());
}
};
class write_msg_op
{
async_client* sock_;
epee::byte_slice msg_;
public:
write_msg_op(async_client& sock, epee::byte_slice msg)
: sock_(std::addressof(sock)), msg_(std::move(msg))
{}
template<typename F>
void operator()(F& self, const boost::system::error_code error = {}, std::size_t = 0)
{
if (error)
return self.complete(error, 0);
if (!sock_)
return;
if (sock_->close)
return self.complete(boost::asio::error::operation_aborted, 0);
assert(sock_->zsock && sock_->asock);
expect<void> status =
::net::zmq::send(msg_.clone(), sock_->zsock.get(), ZMQ_DONTWAIT);
if (!status)
{
if (status != ::net::zmq::make_error_code(EAGAIN))
return self.complete(make_error_code(status.error()), 0);
// try again
sock_->asock->async_write_some(boost::asio::null_buffers(), std::move(self));
return;
}
self.complete(error, msg_.size());
}
};
//! Cannot have an `async_read` and `async_write` at same time (edge trigger)
template<typename F>
void async_read(async_client& sock, std::string& buffer, F&& f)
{
// async_compose is required for correct strand invocation, etc
boost::asio::async_compose<F, void(boost::system::error_code, std::size_t)>(
read_msg_op{sock, buffer}, f, *sock.asock
);
}
//! Cannot have an `async_write` and `async_read` at same time (edge trigger)
template<typename F>
void async_write(async_client& sock, epee::byte_slice msg, F&& f)
{
// async_compose is required for correct strand invocation, etc
boost::asio::async_compose<F, void(boost::system::error_code, std::size_t)>(
write_msg_op{sock, std::move(msg)}, f, *sock.asock
);
}
}} // net // zmq

File diff suppressed because it is too large Load diff

View file

@ -28,6 +28,7 @@
#pragma once #pragma once
#include <boost/asio/io_service.hpp> #include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp>
#include <cstddef> #include <cstddef>
#include <list> #include <list>
#include <string> #include <string>
@ -43,9 +44,15 @@ namespace lws
class rest_server class rest_server
{ {
struct internal; struct internal;
template<typename> struct connection;
template<typename> struct handler_loop;
template<typename> struct accept_loop;
boost::asio::io_service io_service_; boost::asio::io_service io_service_; //!< Put first so its destroyed last
std::list<internal> ports_; std::list<internal> ports_;
std::vector<boost::thread> workers_;
void run_io();
public: public:
struct configuration struct configuration

View file

@ -39,6 +39,7 @@
#include "misc_log_ex.h" // monero/contrib/epee/include #include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include #include "net/http_client.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src #include "net/zmq.h" // monero/src
#include "net/zmq_async.h"
#include "scanner.h" #include "scanner.h"
#include "serialization/json_object.h" // monero/src #include "serialization/json_object.h" // monero/src
#include "wire/msgpack.h" #include "wire/msgpack.h"
@ -140,7 +141,7 @@ namespace rpc
break; break;
const int err = zmq_errno(); const int err = zmq_errno();
if (err != EINTR) if (err != EINTR)
return net::zmq::make_error_code(err); return ::net::zmq::make_error_code(err);
} }
if (items[0].revents) if (items[0].revents)
return success(); return success();
@ -186,7 +187,7 @@ namespace rpc
{ {
struct context struct context
{ {
explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon) explicit context(zcontext comm, net::zmq::socket signal_pub, net::zmq::socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval, bool untrusted_daemon)
: comm(std::move(comm)) : comm(std::move(comm))
, signal_pub(std::move(signal_pub)) , signal_pub(std::move(signal_pub))
, external_pub(std::move(external_pub)) , external_pub(std::move(external_pub))
@ -207,8 +208,8 @@ namespace rpc
} }
zcontext comm; zcontext comm;
socket signal_pub; net::zmq::socket signal_pub;
socket external_pub; net::zmq::socket external_pub;
rcontext rmq; rcontext rmq;
const std::string daemon_addr; const std::string daemon_addr;
const std::string sub_addr; const std::string sub_addr;
@ -223,19 +224,15 @@ namespace rpc
}; };
} // detail } // detail
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc) expect<void> parse_response(cryptonote::rpc::Message& parser, std::string msg, source_location loc)
{ {
expect<std::string> message = get_message(timeout);
if (!message)
return message.error();
try try
{ {
cryptonote::rpc::FullMessage fm{std::move(*message)}; cryptonote::rpc::FullMessage fm{std::move(msg)};
const cryptonote::rpc::error json_error = fm.getError(); const cryptonote::rpc::error json_error = fm.getError();
if (!json_error.use) if (!json_error.use)
{ {
response.fromJson(fm.getMessage()); parser.fromJson(fm.getMessage());
return success(); return success();
} }
@ -249,6 +246,30 @@ namespace rpc
return {lws::error::bad_daemon_response}; return {lws::error::bad_daemon_response};
} }
expect<net::zmq::socket> client::make_daemon(const std::shared_ptr<detail::context>& ctx) noexcept
{
assert(ctx != nullptr);
net::zmq::socket daemon{zmq_socket(ctx->comm.get(), ZMQ_REQ)};
if (daemon.get() == nullptr)
return net::zmq::get_error_code();
MONERO_CHECK(do_set_option(daemon.get(), ZMQ_LINGER, daemon_zmq_linger));
if (ctx->untrusted_daemon)
MONERO_CHECK(do_set_option(daemon.get(), ZMQ_MAXMSGSIZE, max_msg_req));
MONERO_ZMQ_CHECK(zmq_connect(daemon.get(), ctx->daemon_addr.c_str()));
return daemon;
}
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc)
{
expect<std::string> message = get_message(timeout);
if (!message)
return message.error();
return parse_response(response, std::move(*message), loc);
}
expect<std::string> client::get_message(std::chrono::seconds timeout) expect<std::string> client::get_message(std::chrono::seconds timeout)
{ {
MONERO_PRECOND(ctx != nullptr); MONERO_PRECOND(ctx != nullptr);
@ -274,13 +295,10 @@ namespace rpc
client out{std::move(ctx)}; client out{std::move(ctx)};
out.daemon.reset(zmq_socket(out.ctx->comm.get(), ZMQ_REQ)); expect<net::zmq::socket> daemon = make_daemon(out.ctx);
if (out.daemon.get() == nullptr) if (!daemon)
return net::zmq::get_error_code(); return daemon.error();
MONERO_CHECK(do_set_option(out.daemon.get(), ZMQ_LINGER, daemon_zmq_linger)); out.daemon = std::move(*daemon);
if (out.ctx->untrusted_daemon)
MONERO_CHECK(do_set_option(out.daemon.get(), ZMQ_MAXMSGSIZE, max_msg_req));
MONERO_ZMQ_CHECK(zmq_connect(out.daemon.get(), out.ctx->daemon_addr.c_str()));
if (!out.ctx->sub_addr.empty()) if (!out.ctx->sub_addr.empty())
{ {
@ -381,6 +399,16 @@ namespace rpc
return {lws::error::bad_daemon_response}; return {lws::error::bad_daemon_response};
} }
expect<net::zmq::async_client> client::make_async_client(boost::asio::io_service& io) const
{
MONERO_PRECOND(ctx != nullptr);
expect<net::zmq::socket> daemon = make_daemon(ctx);
if (!daemon)
return daemon.error();
return net::zmq::async_client::make(io, std::move(*daemon));
}
expect<void> client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept expect<void> client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept
{ {
MONERO_PRECOND(ctx != nullptr); MONERO_PRECOND(ctx != nullptr);
@ -399,7 +427,7 @@ namespace rpc
return success(); return success();
} }
expect<void> client::publish(epee::byte_slice payload) expect<void> client::publish(epee::byte_slice payload) const
{ {
MONERO_PRECOND(ctx != nullptr); MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr); assert(daemon != nullptr);
@ -451,16 +479,16 @@ namespace rpc
if (comm == nullptr) if (comm == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_init"); MONERO_THROW(net::zmq::get_error_code(), "zmq_init");
detail::socket pub{zmq_socket(comm.get(), ZMQ_PUB)}; net::zmq::socket pub{zmq_socket(comm.get(), ZMQ_PUB)};
if (pub == nullptr) if (pub == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket"); MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
if (zmq_bind(pub.get(), signal_endpoint) < 0) if (zmq_bind(pub.get(), signal_endpoint) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
detail::socket external_pub = nullptr; net::zmq::socket external_pub = nullptr;
if (!pub_addr.empty()) if (!pub_addr.empty())
{ {
external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)}; external_pub = net::zmq::socket{zmq_socket(comm.get(), ZMQ_PUB)};
if (external_pub == nullptr) if (external_pub == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket"); MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0) if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)

View file

@ -26,6 +26,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once #pragma once
#include <boost/asio/io_service.hpp>
#include <boost/optional/optional.hpp> #include <boost/optional/optional.hpp>
#include <chrono> #include <chrono>
#include <memory> #include <memory>
@ -36,27 +37,19 @@
#include "byte_slice.h" // monero/contrib/epee/include #include "byte_slice.h" // monero/contrib/epee/include
#include "db/fwd.h" #include "db/fwd.h"
#include "common/expect.h" // monero/src #include "common/expect.h" // monero/src
#include "net/zmq.h" // monero/src
#include "rpc/message.h" // monero/src #include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h" #include "rpc/daemon_pub.h"
#include "rpc/rates.h" #include "rpc/rates.h"
#include "util/source_location.h" #include "util/source_location.h"
namespace net { namespace zmq { struct async_client; }}
namespace lws namespace lws
{ {
namespace rpc namespace rpc
{ {
namespace detail namespace detail
{ {
struct close
{
void operator()(void* ptr) const noexcept
{
if (ptr)
zmq_close(ptr);
}
};
using socket = std::unique_ptr<void, close>;
struct context; struct context;
} }
@ -68,18 +61,23 @@ namespace rpc
std::string routing; std::string routing;
}; };
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. expect<void> parse_response(cryptonote::rpc::Message& parser, std::string msg, source_location loc = {});
//! Abstraction for ZMQ RPC client. All `const` and `static` methods are thread-safe.
class client class client
{ {
std::shared_ptr<detail::context> ctx; std::shared_ptr<detail::context> ctx;
detail::socket daemon; net::zmq::socket daemon;
detail::socket daemon_sub; net::zmq::socket daemon_sub;
detail::socket signal_sub; net::zmq::socket signal_sub;
explicit client(std::shared_ptr<detail::context> ctx) noexcept explicit client(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub() : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub()
{} {}
//! \return Connection to daemon REQ/REP.
static expect<net::zmq::socket> make_daemon(const std::shared_ptr<detail::context>& ctx) noexcept;
//! Expect `response` as the next message payload unless error. //! Expect `response` as the next message payload unless error.
expect<void> get_response(cryptonote::rpc::Message& response, std::chrono::seconds timeout, source_location loc); expect<void> get_response(cryptonote::rpc::Message& response, std::chrono::seconds timeout, source_location loc);
@ -140,6 +138,9 @@ namespace rpc
return cryptonote::rpc::FullMessage::getRequest(name, message, 0); return cryptonote::rpc::FullMessage::getRequest(name, message, 0);
} }
//! \return `async_client` to daemon. Thread safe.
expect<net::zmq::async_client> make_async_client(boost::asio::io_service& io) const;
/*! /*!
Queue `message` for sending to daemon. If the queue is full, wait a Queue `message` for sending to daemon. If the queue is full, wait a
maximum of `timeout` seconds or until `context::raise_abort_scan` or maximum of `timeout` seconds or until `context::raise_abort_scan` or
@ -148,11 +149,11 @@ namespace rpc
expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept; expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
//! Publish `payload` to ZMQ external pub socket. //! Publish `payload` to ZMQ external pub socket.
expect<void> publish(epee::byte_slice payload); expect<void> publish(epee::byte_slice payload) const;
//! Publish `data` after `topic` to ZMQ external pub socket. //! Publish `data` after `topic` to ZMQ external pub socket.
template<typename F, typename T> template<typename F, typename T>
expect<void> publish(const boost::string_ref topic, const T& data) expect<void> publish(const boost::string_ref topic, const T& data) const
{ {
epee::byte_stream bytes{}; epee::byte_stream bytes{};
bytes.write(topic.data(), topic.size()); bytes.write(topic.data(), topic.size());
@ -174,15 +175,8 @@ namespace rpc
return response; return response;
} }
//! Retrieve new accounts to be scanned on this thread. /*! Never blocks for I/O - that is performed on another thread.
expect<std::vector<lws::account>> pull_accounts(); \return Recent exchange rates. */
/*!
\note This is the one function that IS thread-safe. Multiple threads can
call this function with the same `this` argument.
\return Recent exchange rates.
*/
expect<rates> get_rates() const; expect<rates> get_rates() const;
}; };

View file

@ -34,6 +34,7 @@
#include <stdexcept> #include <stdexcept>
#include <type_traits> #include <type_traits>
#include "config.h"
#include "db/string.h" #include "db/string.h"
#include "error.h" #include "error.h"
#include "time_helper.h" // monero/contrib/epee/include #include "time_helper.h" // monero/contrib/epee/include
@ -198,6 +199,14 @@ namespace lws
namespace rpc namespace rpc
{ {
daemon_status_response::daemon_status_response()
: outgoing_connections_count(0),
incoming_connections_count(0),
height(0),
network(lws::rpc::network_type(lws::config::network)),
state(daemon_state::unavailable)
{}
namespace namespace
{ {
constexpr const char* map_daemon_state[] = {"ok", "no_connections", "synchronizing", "unavailable"}; constexpr const char* map_daemon_state[] = {"ok", "no_connections", "synchronizing", "unavailable"};

View file

@ -94,7 +94,9 @@ namespace rpc
struct daemon_status_response struct daemon_status_response
{ {
daemon_status_response() = delete; //! Defaults to current network in unavailable state
daemon_status_response();
std::uint64_t outgoing_connections_count; std::uint64_t outgoing_connections_count;
std::uint64_t incoming_connections_count; std::uint64_t incoming_connections_count;
std::uint64_t height; std::uint64_t height;

View file

@ -138,7 +138,7 @@ namespace lws { namespace rpc
} }
template<typename T> template<typename T>
void zmq_send(rpc::client& client, const epee::span<const T> events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic) void zmq_send(const rpc::client& client, const epee::span<const T> events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic)
{ {
// Each `T` should have a unique count. This is desired. // Each `T` should have a unique count. This is desired.
struct zmq_order struct zmq_order
@ -174,7 +174,7 @@ namespace lws { namespace rpc
template<typename T> template<typename T>
void send_webhook( void send_webhook(
rpc::client& client, const rpc::client& client,
const epee::span<const T> events, const epee::span<const T> events,
const boost::string_ref json_topic, const boost::string_ref json_topic,
const boost::string_ref msgpack_topic, const boost::string_ref msgpack_topic,

View file

@ -27,7 +27,7 @@
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(monero-lws-util_sources blocks.cpp gamma_picker.cpp random_outputs.cpp source_location.cpp transactions.cpp) set(monero-lws-util_sources blocks.cpp gamma_picker.cpp random_outputs.cpp source_location.cpp transactions.cpp)
set(monero-lws-util_headers blocks.h fwd.h gamma_picker.h http_server.h random_outputs.h source_location.h transactions.h) set(monero-lws-util_headers blocks.h fwd.h gamma_picker.h random_outputs.h source_location.h transactions.h)
add_library(monero-lws-util ${monero-lws-util_sources} ${monero-lws-util_headers}) add_library(monero-lws-util ${monero-lws-util_sources} ${monero-lws-util_headers})
target_link_libraries(monero-lws-util monero::libraries monero-lws-db) target_link_libraries(monero-lws-util monero::libraries monero-lws-db)

View file

@ -1,124 +0,0 @@
// Copyright (c) 2020, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <boost/bind/bind.hpp>
#include <boost/thread.hpp>
#include <boost/optional/optional.hpp>
#include "misc_log_ex.h"
#include "net/abstract_tcp_server2.h" // monero/contrib/epee/include
#include "net/http_protocol_handler.h" // monero/contrib/epee/include
#include "net/http_server_handlers_map2.h" // monero/contrib/epee/include
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net.http"
namespace lws
{
template<class t_child_class, class t_connection_context = epee::net_utils::connection_context_base>
class http_server_impl_base: public epee::net_utils::http::i_http_server_handler<t_connection_context>
{
public:
http_server_impl_base()
: m_net_server(epee::net_utils::e_connection_type_RPC)
{}
explicit http_server_impl_base(boost::asio::io_service& external_io_service)
: m_net_server(external_io_service, epee::net_utils::e_connection_type_RPC)
{}
bool init(const std::string& bind_port, const std::string& bind_ip,
std::vector<std::string> access_control_origins, epee::net_utils::ssl_options_t ssl_options)
{
//set self as callback handler
m_net_server.get_config_object().m_phandler = static_cast<t_child_class*>(this);
//here set folder for hosting reqests
m_net_server.get_config_object().m_folder = "";
//set access control allow origins if configured
std::sort(access_control_origins.begin(), access_control_origins.end());
m_net_server.get_config_object().m_access_control_origins = std::move(access_control_origins);
MGINFO("Binding on " << bind_ip << " (IPv4):" << bind_port);
bool res = m_net_server.init_server(bind_port, bind_ip, bind_port, std::string{}, false, true, std::move(ssl_options));
if(!res)
{
LOG_ERROR("Failed to bind server");
return false;
}
return true;
}
bool run(size_t threads_count, bool wait = true)
{
//go to loop
MINFO("Run net_service loop( " << threads_count << " threads)...");
if(!m_net_server.run_server(threads_count, wait))
{
LOG_ERROR("Failed to run net tcp server!");
}
if(wait)
MINFO("net_service loop stopped.");
return true;
}
bool deinit()
{
return m_net_server.deinit_server();
}
bool timed_wait_server_stop(uint64_t ms)
{
return m_net_server.timed_wait_server_stop(ms);
}
bool send_stop_signal()
{
m_net_server.send_stop_signal();
return true;
}
int get_binded_port()
{
return m_net_server.get_binded_port();
}
long get_connections_count() const
{
return m_net_server.get_connections_count();
}
protected:
epee::net_utils::boosted_tcp_server<epee::net_utils::http::http_custom_handler<t_connection_context> > m_net_server;
};
} // lws