diff --git a/CMakeLists.txt b/CMakeLists.txt index 37272f1..041b4ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,11 @@ set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED ON) option(BUILD_TESTS "Build Tests" OFF) +option(WITH_RMQ "Build with RMQ publish support" OFF) +if (WITH_RMQ) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DMLWS_RMQ_ENABLED") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMLWS_RMQ_ENABLED") +endif() set(MONERO_LIBRARIES daemon_messages @@ -160,6 +165,13 @@ if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE)) message(FATAL_ERROR "Boost libraries for monero build differs from this project") endif() +if (WITH_RMQ) + find_path(RMQ_INCLUDE_DIR "amqp.h") + find_library(RMQ_LIBRARY rabbitmq REQUIRED) +else() + set(RMQ_INCLUDE_DIR "") + set(RMQ_LIBRARY "") +endif() set(LMDB_INCLUDE "${monero_LMDB_INCLUDE}") set(LMDB_LIB_PATH "monero::lmdb") diff --git a/docs/rmq.md b/docs/rmq.md new file mode 100644 index 0000000..66bbd14 --- /dev/null +++ b/docs/rmq.md @@ -0,0 +1,55 @@ +# monero-lws RabbitMQ Usage +Monero-lws uses RabbitMQ to provide JSON notifications of payment_id +(web)hooks. The feature is optional - by default LWS is _not_ compiled with +RabbitMQ support. The cmake option -DWITH_RMQ=ON must be specified during the +configuration stage to enable the feature. + +The notification location is specified with `--rmq-address`, the login details +are specified with `--rmq-credentials` (specified as `user:pass`), the exchange +is specified with `--rmq-exchange`, and routing name is specified with +`--rmq-routing`. + +## `json-full-payment_hook` +Only events of this type are sent to RabbitMQ. They are always "simulcast" with +the webhook URL. If the webhook URL is `zmq` then no simulcast occurs - the +event is only sent via ZeroMQ and/or RabbitMQ (if both are configured then it +sent to both, otherwise it is only sent to the one configured). + +Example of the "raw" output to RabbitMQ: + +```json +{ + "index": 2, + "event": { + "event": "tx-confirmation", + "payment_id": "4f695d197f2a3c54", + "token": "single zmq wallet", + "confirmations": 1, + "event_id": "3894f98f5dd54af5857e4f8a961a4e57", + "tx_info": { + "id": { + "high": 0, + "low": 5666768 + }, + "block": 2265961, + "index": 1, + "amount": 3117324236131, + "timestamp": 1687301600, + "tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566", + "tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626", + "tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74", + "rct_mask": "4cdc4c4e340aacb4741ba20f9b0b859242ecdad2fcc251f71d81123a47db3400", + "payment_id": "4f695d197f2a3c54", + "unlock_time": 0, + "mixin_count": 15, + "coinbase": false + } + } +} +``` +> `index` is a counter used to detect dropped messages. It is not useful to +RabbitMQ but is a carry-over from ZeroMQ (the same serialized message is used +to send to both). + +> The `block` and `id` fields in the above example are NOT present when +`confirmations == 0`. diff --git a/src/error.cpp b/src/error.cpp index 318614b..e5d4642 100644 --- a/src/error.cpp +++ b/src/error.cpp @@ -99,6 +99,8 @@ namespace lws return "An unknown in-process message was received"; case error::system_clock_invalid_range: return "System clock is out of range for account storage format"; + case error::rmq_failure: + return "Failure within the RMQ library"; case error::tx_relay_failed: return "The daemon failed to relay transaction from REST client"; default: diff --git a/src/error.h b/src/error.h index 45bb4b9..c243c28 100644 --- a/src/error.h +++ b/src/error.h @@ -63,6 +63,7 @@ namespace lws signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received signal_unknown, //!< An unknown in process ZMQ PUB was received system_clock_invalid_range, //!< System clock is out of range for storage format + rmq_failure, //!< Error within RMQ library tx_relay_failed //!< Daemon failed to relayed tx from REST client }; diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 1d55fd2..538363a 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -30,4 +30,5 @@ set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp li set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h) add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers}) -target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper) +target_include_directories(monero-lws-rpc PRIVATE ${RMQ_INCLUDE_DIR}) +target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper ${RMQ_LIBRARY}) diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 4023faa..db438a9 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -27,6 +27,7 @@ #include "client.h" +#include #include #include #include @@ -38,6 +39,10 @@ #include "net/http_client.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src #include "serialization/json_object.h" // monero/src +#if MLWS_RMQ_ENABLED + #include + #include +#endif namespace lws { @@ -72,6 +77,38 @@ namespace rpc }; using zcontext = std::unique_ptr; +#ifdef MLWS_RMQ_ENABLED + constexpr const unsigned rmq_channel = 1; + struct rdestroy + { + void operator()(amqp_connection_state_t ptr) const noexcept + { + if (ptr) + { + amqp_channel_close(ptr, rmq_channel, AMQP_REPLY_SUCCESS); + amqp_connection_close(ptr, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(ptr); + } + } + }; + struct rcontext + { + using connection = std::unique_ptr; + + bool is_available() const noexcept { return conn != nullptr; } + + connection conn; + std::string exchange; + std::string routing; + }; +#else // !MLWS_RMQ_ENABLED + + struct rcontext + { + static constexpr bool is_available() noexcept { return false; } + }; +#endif + expect do_wait(void* daemon, void* signal_sub, short events, std::chrono::milliseconds timeout) noexcept { if (timeout <= std::chrono::seconds{0}) @@ -135,10 +172,11 @@ namespace rpc { struct context { - explicit context(zcontext comm, socket signal_pub, socket external_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) + explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) : comm(std::move(comm)) , signal_pub(std::move(signal_pub)) , external_pub(std::move(external_pub)) + , rmq(std::move(rmq)) , daemon_addr(std::move(daemon_addr)) , sub_addr(std::move(sub_addr)) , rates_conn() @@ -155,6 +193,7 @@ namespace rpc zcontext comm; socket signal_pub; socket external_pub; + rcontext rmq; const std::string daemon_addr; const std::string sub_addr; http::http_simple_client rates_conn; @@ -249,7 +288,7 @@ namespace rpc bool client::has_publish() const noexcept { - return ctx && ctx->external_pub; + return ctx && (ctx->external_pub || ctx->rmq.is_available()); } expect client::watch_scan_signals() noexcept @@ -343,11 +382,33 @@ namespace rpc { MONERO_PRECOND(ctx != nullptr); assert(daemon != nullptr); - if (ctx->external_pub == nullptr) + if (ctx->external_pub == nullptr && !ctx->rmq.is_available()) return success(); + expect rc = success(); const boost::unique_lock guard{ctx->sync_pub}; - return net::zmq::send(std::move(payload), ctx->external_pub.get(), 0); + if (ctx->external_pub) + rc = net::zmq::send(payload.clone(), ctx->external_pub.get(), 0); + +#ifdef MLWS_RMQ_ENABLED + if (ctx->rmq.is_available() && boost::algorithm::starts_with(payload, boost::string_ref{payment_topic_json()})) + { + const auto topic = reinterpret_cast(std::memchr(payload.data(), ':', payload.size())); + if (topic && topic >= payload.data()) + payload.remove_prefix(topic - payload.data() + 1); + + amqp_bytes_t message{}; + message.len = payload.size(); + message.bytes = const_cast(payload.data()); + const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 0, 0, nullptr, message); + if (rmq_rc != 0) + { + MERROR("Failed RMQ Publish with return code: " << rmq_rc); + return {error::rmq_failure}; + } + } +#endif + return rc; } expect client::get_rates() const @@ -363,7 +424,7 @@ namespace rpc return ctx->cached; } - context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval) + context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval) { zcontext comm{zmq_init(1)}; if (comm == nullptr) @@ -385,9 +446,63 @@ namespace rpc MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); } + rcontext rmq{}; +#ifdef MLWS_RMQ_ENABLED + if (!rmq_info.address.empty()) + { + rmq.exchange = std::move(rmq_info.exchange); + rmq.routing = std::move(rmq_info.routing); + epee::net_utils::http::url_content url{}; + if (!epee::net_utils::parse_url(rmq_info.address, url)) + MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ"); + if (url.port == 0) + MONERO_THROW(error::configuration, "No port specified for RMQ"); + if (url.uri.empty()) + url.uri = "/"; + + std::string user; + std::string pass; + boost::regex expression{"(\\w+):(\\w+)"}; + boost::smatch matcher; + if (boost::regex_search(rmq_info.credentials, matcher, expression)) + { + user = matcher[1]; + pass = matcher[2]; + } + + rmq.conn.reset(amqp_new_connection()); + const auto socket = amqp_tcp_socket_new(rmq.conn.get()); + if (!socket) + MONERO_THROW(error::configuration, "Unable to create RMQ socket"); + + int status = amqp_socket_open(socket, url.host.c_str(), url.port); + if (status != 0) + { + MERROR("Unable to open RMQ socket: " << status); + MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket"); + } + + if (!user.empty() || !pass.empty()) + { + if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL) + MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket"); + } + if (amqp_channel_open(rmq.conn.get(), rmq_channel) == nullptr) + MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel"); + + if (amqp_get_rpc_reply(rmq.conn.get()).reply_type != AMQP_RESPONSE_NORMAL) + MONERO_THROW(error::rmq_failure, "Failed receiving channel open reply"); + + MINFO("Connected to RMQ server " << url.host << ":" << url.port); + } +#else // !MLWS_RMQ_ENABLED + if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty() || !rmq_info.credentials.empty()) + MONERO_THROW(error::configuration, "RabbitMQ support not enabled"); +#endif + return context{ std::make_shared( - std::move(comm), std::move(pub), std::move(external_pub), std::move(daemon_addr), std::move(sub_addr), rates_interval + std::move(comm), std::move(pub), std::move(external_pub), std::move(rmq), std::move(daemon_addr), std::move(sub_addr), rates_interval ) }; } diff --git a/src/rpc/client.h b/src/rpc/client.h index 8de0185..8574e21 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -38,7 +38,6 @@ #include "rpc/message.h" // monero/src #include "rpc/daemon_pub.h" #include "rpc/rates.h" -#include "span.h" // monero/contrib/epee/include #include "util/source_location.h" namespace lws @@ -60,6 +59,14 @@ namespace rpc struct context; } + struct rmq_details + { + std::string address; + std::string credentials; + std::string exchange; + std::string routing; + }; + //! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. class client { @@ -77,6 +84,9 @@ namespace rpc public: + static constexpr const char* payment_topic_json() { return "json-full-payment_hook:"; } + static constexpr const char* payment_topic_msgpack() { return "msgpack-full-payment_hook:"; } + enum class topic : std::uint8_t { block = 0, txpool @@ -191,10 +201,11 @@ namespace rpc \param daemon_addr Location of ZMQ enabled `monerod` RPC. \param pub_addr Bind location for publishing ZMQ events. + \param rmq_info Required information for RMQ publishing (if enabled) \param rates_interval Frequency to retrieve exchange rates. Set value to `<= 0` to disable exchange rate retrieval. */ - static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval); + static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval); context(context&&) = default; context(context const&) = delete; diff --git a/src/server_main.cpp b/src/server_main.cpp index 3fce76e..7348fe0 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -58,6 +58,12 @@ namespace const command_line::arg_descriptor daemon_rpc; const command_line::arg_descriptor daemon_sub; const command_line::arg_descriptor zmq_pub; +#ifdef MLWS_RMQ_ENABLED + const command_line::arg_descriptor rmq_address; + const command_line::arg_descriptor rmq_credentials; + const command_line::arg_descriptor rmq_exchange; + const command_line::arg_descriptor rmq_routing; +#endif const command_line::arg_descriptor> rest_servers; const command_line::arg_descriptor> admin_rest_servers; const command_line::arg_descriptor rest_ssl_key; @@ -94,6 +100,12 @@ namespace , daemon_rpc{"daemon", "://
: of a monerod ZMQ RPC", get_default_zmq()} , daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""} , zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""} +#ifdef MLWS_RMQ_ENABLED + , rmq_address{"rmq-address", "tcp:///[vhost]"} + , rmq_credentials{"rmq-credentials", ":"} + , rmq_exchange{"rmq-exchange", "Name of the RMQ exchange"} + , rmq_routing{"rmq-routing", "Routing for the specified exchange"} +#endif , rest_servers{"rest-server", "[(https|http)://
:][/] for incoming connections, multiple declarations allowed"} , admin_rest_servers{"admin-rest-server", "[(https|http])://
:][/] for incoming admin connections, multiple declarations allowed"} , rest_ssl_key{"rest-ssl-key", " to PEM formatted SSL key for https REST server", ""} @@ -118,6 +130,12 @@ namespace command_line::add_arg(description, daemon_rpc); command_line::add_arg(description, daemon_sub); command_line::add_arg(description, zmq_pub); +#ifdef MLWS_RMQ_ENABLED + command_line::add_arg(description, rmq_address); + command_line::add_arg(description, rmq_credentials); + command_line::add_arg(description, rmq_exchange); + command_line::add_arg(description, rmq_routing); +#endif description.add_options()(rest_servers.name, boost::program_options::value>()->default_value({rest_default}, rest_default), rest_servers.description); command_line::add_arg(description, admin_rest_servers); command_line::add_arg(description, rest_ssl_key); @@ -144,6 +162,7 @@ namespace std::string daemon_rpc; std::string daemon_sub; std::string zmq_pub; + lws::rpc::rmq_details rmq; std::string webhook_ssl_verification; std::chrono::minutes rates_interval; std::size_t scan_threads; @@ -218,6 +237,16 @@ namespace command_line::get_arg(args, opts.daemon_rpc), command_line::get_arg(args, opts.daemon_sub), command_line::get_arg(args, opts.zmq_pub), +#ifdef MLWS_RMQ_ENABLED + lws::rpc::rmq_details{ + command_line::get_arg(args, opts.rmq_address), + command_line::get_arg(args, opts.rmq_credentials), + command_line::get_arg(args, opts.rmq_exchange), + command_line::get_arg(args, opts.rmq_routing) + }, +#else + lws::rpc::rmq_details{}, +#endif command_line::get_arg(args, opts.webhook_ssl_verification), std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)}, command_line::get_arg(args, opts.scan_threads), @@ -239,7 +268,7 @@ namespace boost::filesystem::create_directories(prog.db_path); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); - auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), prog.rates_interval); + auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval); MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();