From 680c9ab304530b1f8bbc411954034a7470b2dde8 Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Tue, 4 Jul 2023 10:31:29 -0400 Subject: [PATCH] :RMQ not yet working --- CMakeLists.txt | 13 ++++ src/error.cpp | 2 + src/error.h | 1 + src/rpc/CMakeLists.txt | 3 +- src/rpc/client.cpp | 144 ++++++++++++++++++++++++++++++++++++++++- src/rpc/client.h | 15 ++++- src/scanner.cpp | 40 ++++++++++++ src/server_main.cpp | 36 ++++++++++- 8 files changed, 248 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1782191..711cebd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,11 @@ set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED ON) option(BUILD_TESTS "Build Tests" OFF) +option(WITH_RMQ "Build with RMQ publish support" OFF) +if (WITH_RMQ) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DMLWS_RMQ_ENABLED") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMLWS_RMQ_ENABLED") +endif() set(MONERO_LIBRARIES daemon_messages @@ -160,6 +165,13 @@ if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE)) message(FATAL_ERROR "Boost libraries for monero build differs from this project") endif() +if (WITH_RMQ) + find_path(RMQ_INCLUDE_DIR "amqp.h") + find_library(RMQ_LIBRARY rabbitmq REQUIRED) +else() + set(RMQ_INCLUDE_DIR "") + set(RMQ_LIBRARY "") +endif() set(LMDB_INCLUDE "${monero_LMDB_INCLUDE}") set(LMDB_LIB_PATH "monero::lmdb") @@ -212,6 +224,7 @@ if(APPLE) list(APPEND IMPORTED_MONERO_LIBRARIES ${IOKIT_LIBRARY}) endif() + add_library(monero::libraries INTERFACE IMPORTED) set_property(TARGET monero::libraries PROPERTY INTERFACE_INCLUDE_DIRECTORIES diff --git a/src/error.cpp b/src/error.cpp index a1d9ec2..d797555 100644 --- a/src/error.cpp +++ b/src/error.cpp @@ -97,6 +97,8 @@ namespace lws return "An unknown in-process message was received"; case error::system_clock_invalid_range: return "System clock is out of range for account storage format"; + case error::rmq_failure: + return "Failure within the RMQ library"; case error::tx_relay_failed: return "The daemon failed to relay transaction from REST client"; default: diff --git a/src/error.h b/src/error.h index 0bb0591..a50b81e 100644 --- a/src/error.h +++ b/src/error.h @@ -62,6 +62,7 @@ namespace lws signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received signal_unknown, //!< An unknown in process ZMQ PUB was received system_clock_invalid_range, //!< System clock is out of range for storage format + rmq_failure, //!< Error within RMQ library tx_relay_failed //!< Daemon failed to relayed tx from REST client }; diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 1d55fd2..538363a 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -30,4 +30,5 @@ set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp li set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h) add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers}) -target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper) +target_include_directories(monero-lws-rpc PRIVATE ${RMQ_INCLUDE_DIR}) +target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper ${RMQ_LIBRARY}) diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 14b5b41..fbb5d4c 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -27,6 +27,7 @@ #include "client.h" +#include #include #include #include @@ -38,6 +39,10 @@ #include "net/http_client.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src #include "serialization/json_object.h" // monero/src +#if MLWS_RMQ_ENABLED + #include + #include +#endif namespace lws { @@ -72,6 +77,38 @@ namespace rpc }; using zcontext = std::unique_ptr; +#ifdef MLWS_RMQ_ENABLED + constexpr const unsigned rmq_channel = 1; + struct rdestroy + { + void operator()(amqp_connection_state_t ptr) const noexcept + { + if (ptr) + { + amqp_channel_close(ptr, rmq_channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(ptr, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(ptr); + } + } + }; + struct rcontext + { + using connection = std::unique_ptr; + + bool is_available() const noexcept { return conn != nullptr; } + + connection conn; + std::string exchange; + std::string routing; + }; +#else // !MLWS_RMQ_ENABLED + + struct rcontext + { + static constexpr bool is_available() noexcept { return false; } + }; +#endif + expect do_wait(void* daemon, void* signal_sub, short events, std::chrono::milliseconds timeout) noexcept { if (timeout <= std::chrono::seconds{0}) @@ -135,9 +172,11 @@ namespace rpc { struct context { - explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) + explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) : comm(std::move(comm)) , signal_pub(std::move(signal_pub)) + , external_pub(std::move(external_pub)) + , rmq(std::move(rmq)) , daemon_addr(std::move(daemon_addr)) , sub_addr(std::move(sub_addr)) , rates_conn() @@ -152,6 +191,8 @@ namespace rpc zcontext comm; socket signal_pub; + socket external_pub; + rcontext rmq; const std::string daemon_addr; const std::string sub_addr; http::http_simple_client rates_conn; @@ -243,6 +284,11 @@ namespace rpc client::~client() noexcept {} + bool client::has_publish() const noexcept + { + return ctx && (ctx->external_pub || ctx->rmq.is_available()); + } + expect client::watch_scan_signals() noexcept { MONERO_PRECOND(ctx != nullptr); @@ -330,6 +376,39 @@ namespace rpc return success(); } + expect client::publish(epee::byte_slice payload) + { + MONERO_PRECOND(ctx != nullptr); + assert(daemon != nullptr); + if (ctx->external_pub == nullptr && !ctx->rmq.is_available()) + return success(); + + expect rc = success(); + const boost::unique_lock guard{ctx->sync_pub}; + if (ctx->external_pub) + rc = net::zmq::send(payload.clone(), ctx->external_pub.get(), 0); + +#ifdef MLWS_RMQ_ENABLED + if (ctx->rmq.is_available() && boost::algorithm::starts_with(payload, boost::string_ref{"json"})) + { + const auto topic = reinterpret_cast(std::memchr(payload.data(), ':', payload.size())); + if (topic && topic >= payload.data()) + payload.remove_prefix(topic - payload.data() + 1); + + amqp_bytes_t message{}; + message.len = payload.size(); + message.bytes = const_cast(payload.data()); + const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 1, 1, nullptr, message); + if (rmq_rc != 0) + { + MERROR("Failed RMQ Publish with return code: " << rmq_rc); + return {error::rmq_failure}; + } + } +#endif + return rc; + } + expect client::get_rates() const { MONERO_PRECOND(ctx != nullptr); @@ -343,7 +422,7 @@ namespace rpc return ctx->cached; } - context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval) + context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval) { zcontext comm{zmq_init(1)}; if (comm == nullptr) @@ -355,9 +434,68 @@ namespace rpc if (zmq_bind(pub.get(), signal_endpoint) < 0) MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); + detail::socket external_pub = nullptr; + if (!pub_addr.empty()) + { + external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)}; + if (external_pub == nullptr) + MONERO_THROW(net::zmq::get_error_code(), "zmq_socket"); + if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0) + MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); + } + + rcontext rmq{}; +#ifdef MLWS_RMQ_ENABLED + if (!rmq_info.address.empty()) + { + rmq.exchange = std::move(rmq_info.exchange); + rmq.routing = std::move(rmq_info.routing); + epee::net_utils::http::url_content url{}; + if (!epee::net_utils::parse_url(rmq_info.address, url)) + MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ"); + if (url.port == 0) + MONERO_THROW(error::configuration, "No port specified for RMQ"); + if (url.uri.empty()) + url.uri = "/"; + + std::string user; + std::string pass; + boost::regex expression{"^\\w+:\\w+$"}; + boost::smatch matcher; + if (boost::regex_search(url.host, matcher, expression)) + { + user = matcher[0]; + pass = matcher[1]; + } + + rmq.conn.reset(amqp_new_connection()); + const auto socket = amqp_tcp_socket_new(rmq.conn.get()); + if (!socket) + MONERO_THROW(error::configuration, "Unable to create RMQ socket"); + + int status = amqp_socket_open(socket, url.host.c_str(), url.port); + if (status != 0) + { + MERROR("Unable to open RMQ socket: " << status); + MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket"); + } + if (!user.empty() || !pass.empty()) + { + if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL) + MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket"); + } + if (amqp_channel_open(rmq.conn.get(), rmq_channel) != nullptr) + MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel"); + MINFO("Connected to RMQ server " << url.host << ":" << url.port); + } +#else // !MLWS_RMQ_ENABLED + if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty()) + MONERO_THROW(error::configuration, "RabbitMQ support not enabled"); +#endif + return context{ std::make_shared( - std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval + std::move(comm), std::move(pub), std::move(external_pub), std::move(rmq), std::move(daemon_addr), std::move(sub_addr), rates_interval ) }; } diff --git a/src/rpc/client.h b/src/rpc/client.h index 06db02f..48f49a0 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -59,6 +59,14 @@ namespace rpc struct context; } + struct rmq_details + { + std::string address; + std::string credentials; + std::string exchange; + std::string routing; + }; + //! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. class client { @@ -112,6 +120,9 @@ namespace rpc return ctx != nullptr; } + //! True if an external pub/sub was setup + bool has_publish() const noexcept; + //! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`. expect watch_scan_signals() noexcept; @@ -171,10 +182,12 @@ namespace rpc \note All errors are exceptions; no recovery can occur. \param daemon_addr Location of ZMQ enabled `monerod` RPC. + \param pub_addr Bind location for publishing ZMQ events. + \param rmq_info Required information for RMQ publishing (if enabled) \param rates_interval Frequency to retrieve exchange rates. Set value to `<= 0` to disable exchange rate retrieval. */ - static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval); + static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval); context(context&&) = default; context(context const&) = delete; diff --git a/src/scanner.cpp b/src/scanner.cpp index a8b4492..c40b3eb 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -243,6 +243,46 @@ namespace lws } } + struct zmq_index + { + const std::uint64_t index; + const epee::span events; + }; + + void write_bytes(wire::writer& dest, const zmq_index& self) + { + wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(events)); + } + + void send_via_zmq(rpc::client& client, const epee::span events) + { + struct zmq_order + { + std::uint64_t current; + boost::mutex sync; + + zmq_order() + : current(0), sync() + {} + }; + + static zmq_order ordering{}; + + //! \TODO monitor XPUB to cull the serialization + if (!events.empty() && client.has_publish()) + { + // make sure the event is queued to zmq in order. + const boost::unique_lock guard{ordering.sync}; + const zmq_index index{ordering.current++, events}; + MINFO("Sending ZMQ/RMQ PUB topics json-full-hooks and msgpack-full-hooks"); + expect result = success(); + if (!(result = client.publish("json-full-hooks:", index))) + MERROR("Failed to serialize+send json-full-hooks: " << result.error().message()); + if (!(result = client.publish("msgpack-full-hooks:", index))) + MERROR("Failed to serialize+send msgpack-full-hooks: " << result.error().message()); + } + } + struct by_height { bool operator()(account const& left, account const& right) const noexcept diff --git a/src/server_main.cpp b/src/server_main.cpp index 02ba810..12255bf 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -56,6 +56,13 @@ namespace { const command_line::arg_descriptor daemon_rpc; const command_line::arg_descriptor daemon_sub; + const command_line::arg_descriptor zmq_pub; +#ifdef MLWS_RMQ_ENABLED + const command_line::arg_descriptor rmq_address; + const command_line::arg_descriptor rmq_credentials; + const command_line::arg_descriptor rmq_exchange; + const command_line::arg_descriptor rmq_routing; +#endif const command_line::arg_descriptor> rest_servers; const command_line::arg_descriptor> admin_rest_servers; const command_line::arg_descriptor rest_ssl_key; @@ -90,6 +97,13 @@ namespace : lws::options() , daemon_rpc{"daemon", "://
: of a monerod ZMQ RPC", get_default_zmq()} , daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""} + , zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""} +#ifdef MLWS_RMQ_ENABLED + , rmq_address{"rmq-address", "tcp:///[vhost]"} + , rmq_credentials{"rmq-credentials", ":"} + , rmq_exchange{"rmq-exchange", "Name of the RMQ exchange"} + , rmq_routing{"rmq-routing", "Routing for the specified exchange"} +#endif , rest_servers{"rest-server", "[(https|http)://
:][/] for incoming connections, multiple declarations allowed"} , admin_rest_servers{"admin-rest-server", "[(https|http])://
:][/] for incoming admin connections, multiple declarations allowed"} , rest_ssl_key{"rest-ssl-key", " to PEM formatted SSL key for https REST server", ""} @@ -112,6 +126,13 @@ namespace lws::options::prepare(description); command_line::add_arg(description, daemon_rpc); command_line::add_arg(description, daemon_sub); + command_line::add_arg(description, zmq_pub); +#ifdef MLWS_RMQ_ENABLED + command_line::add_arg(description, rmq_address); + command_line::add_arg(description, rmq_credentials); + command_line::add_arg(description, rmq_exchange); + command_line::add_arg(description, rmq_routing); +#endif description.add_options()(rest_servers.name, boost::program_options::value>()->default_value({rest_default}, rest_default), rest_servers.description); command_line::add_arg(description, admin_rest_servers); command_line::add_arg(description, rest_ssl_key); @@ -136,6 +157,8 @@ namespace lws::rest_server::configuration rest_config; std::string daemon_rpc; std::string daemon_sub; + std::string zmq_pub; + lws::rpc::rmq_details rmq; std::string webhook_ssl_verification; std::chrono::minutes rates_interval; std::size_t scan_threads; @@ -189,6 +212,17 @@ namespace }, command_line::get_arg(args, opts.daemon_rpc), command_line::get_arg(args, opts.daemon_sub), + command_line::get_arg(args, opts.zmq_pub), +#ifdef MLWS_RMQ_ENABLED + lws::rpc::rmq_details{ + command_line::get_arg(args, opts.rmq_address), + command_line::get_arg(args, opts.rmq_credentials), + command_line::get_arg(args, opts.rmq_exchange), + command_line::get_arg(args, opts.rmq_routing) + }, +#else + lws::rpc::rmq_details{}, +#endif command_line::get_arg(args, opts.webhook_ssl_verification), std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)}, command_line::get_arg(args, opts.scan_threads), @@ -210,7 +244,7 @@ namespace boost::filesystem::create_directories(prog.db_path); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); - auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), prog.rates_interval); + auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval); MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();