Add RabbitMQ support for payment hooks (#80)

This commit is contained in:
Lee *!* Clagett 2023-09-08 20:03:34 -04:00 committed by Lee *!* Clagett
parent aa171b77c3
commit 3b35ce2845
8 changed files with 236 additions and 10 deletions

View file

@ -34,6 +34,11 @@ set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)
option(BUILD_TESTS "Build Tests" OFF) option(BUILD_TESTS "Build Tests" OFF)
option(WITH_RMQ "Build with RMQ publish support" OFF)
if (WITH_RMQ)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DMLWS_RMQ_ENABLED")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMLWS_RMQ_ENABLED")
endif()
set(MONERO_LIBRARIES set(MONERO_LIBRARIES
daemon_messages daemon_messages
@ -160,6 +165,13 @@ if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE))
message(FATAL_ERROR "Boost libraries for monero build differs from this project") message(FATAL_ERROR "Boost libraries for monero build differs from this project")
endif() endif()
if (WITH_RMQ)
find_path(RMQ_INCLUDE_DIR "amqp.h")
find_library(RMQ_LIBRARY rabbitmq REQUIRED)
else()
set(RMQ_INCLUDE_DIR "")
set(RMQ_LIBRARY "")
endif()
set(LMDB_INCLUDE "${monero_LMDB_INCLUDE}") set(LMDB_INCLUDE "${monero_LMDB_INCLUDE}")
set(LMDB_LIB_PATH "monero::lmdb") set(LMDB_LIB_PATH "monero::lmdb")

55
docs/rmq.md Normal file
View file

@ -0,0 +1,55 @@
# monero-lws RabbitMQ Usage
Monero-lws uses RabbitMQ to provide JSON notifications of payment_id
(web)hooks. The feature is optional - by default LWS is _not_ compiled with
RabbitMQ support. The cmake option -DWITH_RMQ=ON must be specified during the
configuration stage to enable the feature.
The notification location is specified with `--rmq-address`, the login details
are specified with `--rmq-credentials` (specified as `user:pass`), the exchange
is specified with `--rmq-exchange`, and routing name is specified with
`--rmq-routing`.
## `json-full-payment_hook`
Only events of this type are sent to RabbitMQ. They are always "simulcast" with
the webhook URL. If the webhook URL is `zmq` then no simulcast occurs - the
event is only sent via ZeroMQ and/or RabbitMQ (if both are configured then it
sent to both, otherwise it is only sent to the one configured).
Example of the "raw" output to RabbitMQ:
```json
{
"index": 2,
"event": {
"event": "tx-confirmation",
"payment_id": "4f695d197f2a3c54",
"token": "single zmq wallet",
"confirmations": 1,
"event_id": "3894f98f5dd54af5857e4f8a961a4e57",
"tx_info": {
"id": {
"high": 0,
"low": 5666768
},
"block": 2265961,
"index": 1,
"amount": 3117324236131,
"timestamp": 1687301600,
"tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566",
"tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626",
"tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74",
"rct_mask": "4cdc4c4e340aacb4741ba20f9b0b859242ecdad2fcc251f71d81123a47db3400",
"payment_id": "4f695d197f2a3c54",
"unlock_time": 0,
"mixin_count": 15,
"coinbase": false
}
}
}
```
> `index` is a counter used to detect dropped messages. It is not useful to
RabbitMQ but is a carry-over from ZeroMQ (the same serialized message is used
to send to both).
> The `block` and `id` fields in the above example are NOT present when
`confirmations == 0`.

View file

@ -99,6 +99,8 @@ namespace lws
return "An unknown in-process message was received"; return "An unknown in-process message was received";
case error::system_clock_invalid_range: case error::system_clock_invalid_range:
return "System clock is out of range for account storage format"; return "System clock is out of range for account storage format";
case error::rmq_failure:
return "Failure within the RMQ library";
case error::tx_relay_failed: case error::tx_relay_failed:
return "The daemon failed to relay transaction from REST client"; return "The daemon failed to relay transaction from REST client";
default: default:

View file

@ -63,6 +63,7 @@ namespace lws
signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received
signal_unknown, //!< An unknown in process ZMQ PUB was received signal_unknown, //!< An unknown in process ZMQ PUB was received
system_clock_invalid_range, //!< System clock is out of range for storage format system_clock_invalid_range, //!< System clock is out of range for storage format
rmq_failure, //!< Error within RMQ library
tx_relay_failed //!< Daemon failed to relayed tx from REST client tx_relay_failed //!< Daemon failed to relayed tx from REST client
}; };

View file

@ -30,4 +30,5 @@ set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp li
set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h) set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h)
add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers}) add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers})
target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper) target_include_directories(monero-lws-rpc PRIVATE ${RMQ_INCLUDE_DIR})
target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper ${RMQ_LIBRARY})

View file

@ -27,6 +27,7 @@
#include "client.h" #include "client.h"
#include <boost/algorithm/string/predicate.hpp>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/utility/string_ref.hpp> #include <boost/utility/string_ref.hpp>
#include <cassert> #include <cassert>
@ -38,6 +39,10 @@
#include "net/http_client.h" // monero/contrib/epee/include #include "net/http_client.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src #include "net/zmq.h" // monero/src
#include "serialization/json_object.h" // monero/src #include "serialization/json_object.h" // monero/src
#if MLWS_RMQ_ENABLED
#include <amqp.h>
#include <amqp_tcp_socket.h>
#endif
namespace lws namespace lws
{ {
@ -72,6 +77,38 @@ namespace rpc
}; };
using zcontext = std::unique_ptr<void, terminate>; using zcontext = std::unique_ptr<void, terminate>;
#ifdef MLWS_RMQ_ENABLED
constexpr const unsigned rmq_channel = 1;
struct rdestroy
{
void operator()(amqp_connection_state_t ptr) const noexcept
{
if (ptr)
{
amqp_channel_close(ptr, rmq_channel, AMQP_REPLY_SUCCESS);
amqp_connection_close(ptr, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(ptr);
}
}
};
struct rcontext
{
using connection = std::unique_ptr<amqp_connection_state_t_, rdestroy>;
bool is_available() const noexcept { return conn != nullptr; }
connection conn;
std::string exchange;
std::string routing;
};
#else // !MLWS_RMQ_ENABLED
struct rcontext
{
static constexpr bool is_available() noexcept { return false; }
};
#endif
expect<void> do_wait(void* daemon, void* signal_sub, short events, std::chrono::milliseconds timeout) noexcept expect<void> do_wait(void* daemon, void* signal_sub, short events, std::chrono::milliseconds timeout) noexcept
{ {
if (timeout <= std::chrono::seconds{0}) if (timeout <= std::chrono::seconds{0})
@ -135,10 +172,11 @@ namespace rpc
{ {
struct context struct context
{ {
explicit context(zcontext comm, socket signal_pub, socket external_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
: comm(std::move(comm)) : comm(std::move(comm))
, signal_pub(std::move(signal_pub)) , signal_pub(std::move(signal_pub))
, external_pub(std::move(external_pub)) , external_pub(std::move(external_pub))
, rmq(std::move(rmq))
, daemon_addr(std::move(daemon_addr)) , daemon_addr(std::move(daemon_addr))
, sub_addr(std::move(sub_addr)) , sub_addr(std::move(sub_addr))
, rates_conn() , rates_conn()
@ -155,6 +193,7 @@ namespace rpc
zcontext comm; zcontext comm;
socket signal_pub; socket signal_pub;
socket external_pub; socket external_pub;
rcontext rmq;
const std::string daemon_addr; const std::string daemon_addr;
const std::string sub_addr; const std::string sub_addr;
http::http_simple_client rates_conn; http::http_simple_client rates_conn;
@ -249,7 +288,7 @@ namespace rpc
bool client::has_publish() const noexcept bool client::has_publish() const noexcept
{ {
return ctx && ctx->external_pub; return ctx && (ctx->external_pub || ctx->rmq.is_available());
} }
expect<void> client::watch_scan_signals() noexcept expect<void> client::watch_scan_signals() noexcept
@ -343,11 +382,33 @@ namespace rpc
{ {
MONERO_PRECOND(ctx != nullptr); MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr); assert(daemon != nullptr);
if (ctx->external_pub == nullptr) if (ctx->external_pub == nullptr && !ctx->rmq.is_available())
return success(); return success();
expect<void> rc = success();
const boost::unique_lock<boost::mutex> guard{ctx->sync_pub}; const boost::unique_lock<boost::mutex> guard{ctx->sync_pub};
return net::zmq::send(std::move(payload), ctx->external_pub.get(), 0); if (ctx->external_pub)
rc = net::zmq::send(payload.clone(), ctx->external_pub.get(), 0);
#ifdef MLWS_RMQ_ENABLED
if (ctx->rmq.is_available() && boost::algorithm::starts_with(payload, boost::string_ref{payment_topic_json()}))
{
const auto topic = reinterpret_cast<const std::uint8_t*>(std::memchr(payload.data(), ':', payload.size()));
if (topic && topic >= payload.data())
payload.remove_prefix(topic - payload.data() + 1);
amqp_bytes_t message{};
message.len = payload.size();
message.bytes = const_cast<std::uint8_t*>(payload.data());
const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 0, 0, nullptr, message);
if (rmq_rc != 0)
{
MERROR("Failed RMQ Publish with return code: " << rmq_rc);
return {error::rmq_failure};
}
}
#endif
return rc;
} }
expect<rates> client::get_rates() const expect<rates> client::get_rates() const
@ -363,7 +424,7 @@ namespace rpc
return ctx->cached; return ctx->cached;
} }
context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval) context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval)
{ {
zcontext comm{zmq_init(1)}; zcontext comm{zmq_init(1)};
if (comm == nullptr) if (comm == nullptr)
@ -385,9 +446,63 @@ namespace rpc
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
} }
rcontext rmq{};
#ifdef MLWS_RMQ_ENABLED
if (!rmq_info.address.empty())
{
rmq.exchange = std::move(rmq_info.exchange);
rmq.routing = std::move(rmq_info.routing);
epee::net_utils::http::url_content url{};
if (!epee::net_utils::parse_url(rmq_info.address, url))
MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ");
if (url.port == 0)
MONERO_THROW(error::configuration, "No port specified for RMQ");
if (url.uri.empty())
url.uri = "/";
std::string user;
std::string pass;
boost::regex expression{"(\\w+):(\\w+)"};
boost::smatch matcher;
if (boost::regex_search(rmq_info.credentials, matcher, expression))
{
user = matcher[1];
pass = matcher[2];
}
rmq.conn.reset(amqp_new_connection());
const auto socket = amqp_tcp_socket_new(rmq.conn.get());
if (!socket)
MONERO_THROW(error::configuration, "Unable to create RMQ socket");
int status = amqp_socket_open(socket, url.host.c_str(), url.port);
if (status != 0)
{
MERROR("Unable to open RMQ socket: " << status);
MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket");
}
if (!user.empty() || !pass.empty())
{
if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL)
MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket");
}
if (amqp_channel_open(rmq.conn.get(), rmq_channel) == nullptr)
MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel");
if (amqp_get_rpc_reply(rmq.conn.get()).reply_type != AMQP_RESPONSE_NORMAL)
MONERO_THROW(error::rmq_failure, "Failed receiving channel open reply");
MINFO("Connected to RMQ server " << url.host << ":" << url.port);
}
#else // !MLWS_RMQ_ENABLED
if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty() || !rmq_info.credentials.empty())
MONERO_THROW(error::configuration, "RabbitMQ support not enabled");
#endif
return context{ return context{
std::make_shared<detail::context>( std::make_shared<detail::context>(
std::move(comm), std::move(pub), std::move(external_pub), std::move(daemon_addr), std::move(sub_addr), rates_interval std::move(comm), std::move(pub), std::move(external_pub), std::move(rmq), std::move(daemon_addr), std::move(sub_addr), rates_interval
) )
}; };
} }

View file

@ -38,7 +38,6 @@
#include "rpc/message.h" // monero/src #include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h" #include "rpc/daemon_pub.h"
#include "rpc/rates.h" #include "rpc/rates.h"
#include "span.h" // monero/contrib/epee/include
#include "util/source_location.h" #include "util/source_location.h"
namespace lws namespace lws
@ -60,6 +59,14 @@ namespace rpc
struct context; struct context;
} }
struct rmq_details
{
std::string address;
std::string credentials;
std::string exchange;
std::string routing;
};
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. //! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
class client class client
{ {
@ -77,6 +84,9 @@ namespace rpc
public: public:
static constexpr const char* payment_topic_json() { return "json-full-payment_hook:"; }
static constexpr const char* payment_topic_msgpack() { return "msgpack-full-payment_hook:"; }
enum class topic : std::uint8_t enum class topic : std::uint8_t
{ {
block = 0, txpool block = 0, txpool
@ -191,10 +201,11 @@ namespace rpc
\param daemon_addr Location of ZMQ enabled `monerod` RPC. \param daemon_addr Location of ZMQ enabled `monerod` RPC.
\param pub_addr Bind location for publishing ZMQ events. \param pub_addr Bind location for publishing ZMQ events.
\param rmq_info Required information for RMQ publishing (if enabled)
\param rates_interval Frequency to retrieve exchange rates. Set value to \param rates_interval Frequency to retrieve exchange rates. Set value to
`<= 0` to disable exchange rate retrieval. `<= 0` to disable exchange rate retrieval.
*/ */
static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval); static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval);
context(context&&) = default; context(context&&) = default;
context(context const&) = delete; context(context const&) = delete;

View file

@ -58,6 +58,12 @@ namespace
const command_line::arg_descriptor<std::string> daemon_rpc; const command_line::arg_descriptor<std::string> daemon_rpc;
const command_line::arg_descriptor<std::string> daemon_sub; const command_line::arg_descriptor<std::string> daemon_sub;
const command_line::arg_descriptor<std::string> zmq_pub; const command_line::arg_descriptor<std::string> zmq_pub;
#ifdef MLWS_RMQ_ENABLED
const command_line::arg_descriptor<std::string> rmq_address;
const command_line::arg_descriptor<std::string> rmq_credentials;
const command_line::arg_descriptor<std::string> rmq_exchange;
const command_line::arg_descriptor<std::string> rmq_routing;
#endif
const command_line::arg_descriptor<std::vector<std::string>> rest_servers; const command_line::arg_descriptor<std::vector<std::string>> rest_servers;
const command_line::arg_descriptor<std::vector<std::string>> admin_rest_servers; const command_line::arg_descriptor<std::vector<std::string>> admin_rest_servers;
const command_line::arg_descriptor<std::string> rest_ssl_key; const command_line::arg_descriptor<std::string> rest_ssl_key;
@ -94,6 +100,12 @@ namespace
, daemon_rpc{"daemon", "<protocol>://<address>:<port> of a monerod ZMQ RPC", get_default_zmq()} , daemon_rpc{"daemon", "<protocol>://<address>:<port> of a monerod ZMQ RPC", get_default_zmq()}
, daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""} , daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""}
, zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""} , zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""}
#ifdef MLWS_RMQ_ENABLED
, rmq_address{"rmq-address", "tcp://<host>/[vhost]"}
, rmq_credentials{"rmq-credentials", "<user>:<pass>"}
, rmq_exchange{"rmq-exchange", "Name of the RMQ exchange"}
, rmq_routing{"rmq-routing", "Routing for the specified exchange"}
#endif
, rest_servers{"rest-server", "[(https|http)://<address>:]<port>[/<prefix>] for incoming connections, multiple declarations allowed"} , rest_servers{"rest-server", "[(https|http)://<address>:]<port>[/<prefix>] for incoming connections, multiple declarations allowed"}
, admin_rest_servers{"admin-rest-server", "[(https|http])://<address>:]<port>[/<prefix>] for incoming admin connections, multiple declarations allowed"} , admin_rest_servers{"admin-rest-server", "[(https|http])://<address>:]<port>[/<prefix>] for incoming admin connections, multiple declarations allowed"}
, rest_ssl_key{"rest-ssl-key", "<path> to PEM formatted SSL key for https REST server", ""} , rest_ssl_key{"rest-ssl-key", "<path> to PEM formatted SSL key for https REST server", ""}
@ -118,6 +130,12 @@ namespace
command_line::add_arg(description, daemon_rpc); command_line::add_arg(description, daemon_rpc);
command_line::add_arg(description, daemon_sub); command_line::add_arg(description, daemon_sub);
command_line::add_arg(description, zmq_pub); command_line::add_arg(description, zmq_pub);
#ifdef MLWS_RMQ_ENABLED
command_line::add_arg(description, rmq_address);
command_line::add_arg(description, rmq_credentials);
command_line::add_arg(description, rmq_exchange);
command_line::add_arg(description, rmq_routing);
#endif
description.add_options()(rest_servers.name, boost::program_options::value<std::vector<std::string>>()->default_value({rest_default}, rest_default), rest_servers.description); description.add_options()(rest_servers.name, boost::program_options::value<std::vector<std::string>>()->default_value({rest_default}, rest_default), rest_servers.description);
command_line::add_arg(description, admin_rest_servers); command_line::add_arg(description, admin_rest_servers);
command_line::add_arg(description, rest_ssl_key); command_line::add_arg(description, rest_ssl_key);
@ -144,6 +162,7 @@ namespace
std::string daemon_rpc; std::string daemon_rpc;
std::string daemon_sub; std::string daemon_sub;
std::string zmq_pub; std::string zmq_pub;
lws::rpc::rmq_details rmq;
std::string webhook_ssl_verification; std::string webhook_ssl_verification;
std::chrono::minutes rates_interval; std::chrono::minutes rates_interval;
std::size_t scan_threads; std::size_t scan_threads;
@ -218,6 +237,16 @@ namespace
command_line::get_arg(args, opts.daemon_rpc), command_line::get_arg(args, opts.daemon_rpc),
command_line::get_arg(args, opts.daemon_sub), command_line::get_arg(args, opts.daemon_sub),
command_line::get_arg(args, opts.zmq_pub), command_line::get_arg(args, opts.zmq_pub),
#ifdef MLWS_RMQ_ENABLED
lws::rpc::rmq_details{
command_line::get_arg(args, opts.rmq_address),
command_line::get_arg(args, opts.rmq_credentials),
command_line::get_arg(args, opts.rmq_exchange),
command_line::get_arg(args, opts.rmq_routing)
},
#else
lws::rpc::rmq_details{},
#endif
command_line::get_arg(args, opts.webhook_ssl_verification), command_line::get_arg(args, opts.webhook_ssl_verification),
std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)}, std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)},
command_line::get_arg(args, opts.scan_threads), command_line::get_arg(args, opts.scan_threads),
@ -239,7 +268,7 @@ namespace
boost::filesystem::create_directories(prog.db_path); boost::filesystem::create_directories(prog.db_path);
auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max);
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), prog.rates_interval); auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval);
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value(); auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();