diff --git a/docs/administration.md b/docs/administration.md index 37a4275..7c6afbd 100644 --- a/docs/administration.md +++ b/docs/administration.md @@ -146,23 +146,27 @@ height. ### webhook_add This is used to track a specific payment ID to an address or all general payments to an address (where payment ID is zero). Using this endpint requires -a web address for callback purposes, a primary (not integrated!) address, and -finally the type ("tx-confirmation"). The event will remain in the database -until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid) -or [webhook_delete](#webhook_delete)) is used to remove it. +a web address or `zmq` for callback purposes, a primary (not integrated!) +address, and finally the type ("tx-confirmation"). The event will remain in the +database until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid) +or [webhook_delete](#webhook_delete)) is used to remove it. All webhooks are +published over the ZMQ socket specified by `--zmq-pub` (when enabled/specified +on command line) in addition to any HTTP server specified in the callback. > The provided URL will use SSL/TLS if `https://` is prefixed in the URL and -will use plaintext if `http://` is prefixed in the URL. SSL/TLS connections -will use the system certificate authority (root-CAs) by default, and will -ignore all authority checks if `--webhook-ssl-verification none` is provided -on the command line when starting `monero-lws-daemon`. The webhook will fail -if there is a mismatch of `http` and `https` between the two servers, and -will also fail if `https` verification is mismatched. The rule is: (1) if -the callback server has SSL/TLS disabled, the webhook should use `http://`, -(2) if the callback server has a self-signed certificate, `https://` and -`--webhook-ssl-verification none` should be used, and (3) if the callback -server is using "Let's Encrypt" (or similar), then `https://` with no -additional command line flag should be used. +will use plaintext if `http://` is prefixed in the URL. If `zmq` is provided +as the callback, notifications are performed _only_ over the ZMQ pub socket. +SSL/TLS connections will use the system certificate authority (root-CAs) by +default, and will ignore all authority checks if +`--webhook-ssl-verification none` is provided on the command line when +starting `monero-lws-daemon`. The webhook will fail if there is a mismatch of +`http` and `https` between the two servers, and will also fail if `https` +verification is mismatched. The rule is: (1) if the callback server has +SSL/TLS disabled, the webhook should use `http://`, (2) if the callback server +has a self-signed certificate, `https://` and `--webhook-ssl-verification none` +should be used, and (3) if the callback server is using "Let's Encrypt" +(or similar), then `https://` with no additional command line flag should be +used. #### Initial Request to server diff --git a/docs/zmq.md b/docs/zmq.md new file mode 100644 index 0000000..9a28481 --- /dev/null +++ b/docs/zmq.md @@ -0,0 +1,65 @@ +# monero-lws ZeroMQ Usage +Monero-lws uses ZeroMQ-RPC to retrieve information from a Monero daemon, +ZeroMQ-SUB to get immediate notifications of blocks and transactions from a +Monero daemon, and ZeroMQ-PUB to notify external applications of payment_id +(web)hooks. + +## External "pub" socket +The bind location of the ZMQ-PUB socket is specified with the `--zmq-pub` +option. Users are still required to "subscribe" to topics: + * `json-full-pyment_hook`: A JSON array of webhook payment events that have + recently triggered (identical output as webhook). + * `msgpack-full-payment_hook`: A msgpack array of webhook payment events that + have recently triggered (identical output as webhook). + + +### `json-full-payment_hook`/`msgpack-full-payment_hook` +These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)), +event is triggered. If the specified URL is `zmq`, then notifications are only +done over the ZMQ-PUB socket, otherwise the notification is sent over ZMQ-PUB +socket AND the specified URL. Invoking `webhook_add` with a `payment_id` of +zeroes (the field is optional in `webhook_add), will match on all transactions +that lack a `payment_id`. + +Example of the "raw" output from ZMQ-SUB side: + +```json +json-full-payment_hook:{ + "index": 2, + "event": { + "event": "tx-confirmation", + "payment_id": "4f695d197f2a3c54", + "token": "single zmq wallet", + "confirmations": 1, + "event_id": "3894f98f5dd54af5857e4f8a961a4e57", + "tx_info": { + "id": { + "high": 0, + "low": 5666768 + }, + "block": 2265961, + "index": 1, + "amount": 3117324236131, + "timestamp": 1687301600, + "tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566", + "tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626", + "tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74", + "rct_mask": "4cdc4c4e340aacb4741ba20f9b0b859242ecdad2fcc251f71d81123a47db3400", + "payment_id": "4f695d197f2a3c54", + "unlock_time": 0, + "mixin_count": 15, + "coinbase": false + } + } +} + +``` + +Notice the `json-full-payment_hook:` prefix - this is required for the ZMQ PUB/SUB +subscription model. The subscriber requests data from a certain "topic" where +matching is done by string prefixes. + +> `index` is a counter used to detect dropped messages. + +> The `block` and `id` fields in the above example are NOT present when +`confirmations == 0`. diff --git a/src/db/account.cpp b/src/db/account.cpp index 40f9908..2c99894 100644 --- a/src/db/account.cpp +++ b/src/db/account.cpp @@ -177,3 +177,4 @@ namespace lws } } // lws + diff --git a/src/db/data.cpp b/src/db/data.cpp index 89a35e8..d0b54e1 100644 --- a/src/db/data.cpp +++ b/src/db/data.cpp @@ -263,7 +263,7 @@ namespace db map_webhook_value(dest, source, payment_id); } - void write_bytes(wire::json_writer& dest, const webhook_tx_confirmation& self) + void write_bytes(wire::writer& dest, const webhook_tx_confirmation& self) { crypto::hash8 payment_id; static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy"); diff --git a/src/db/data.h b/src/db/data.h index a8fdd76..45b2cde 100644 --- a/src/db/data.h +++ b/src/db/data.h @@ -299,7 +299,7 @@ namespace db webhook_value value; output tx_info; }; - void write_bytes(wire::json_writer&, const webhook_tx_confirmation&); + void write_bytes(wire::writer&, const webhook_tx_confirmation&); //! References a specific output that triggered a webhook struct webhook_output diff --git a/src/db/storage.cpp b/src/db/storage.cpp index 0e7e356..687da91 100644 --- a/src/db/storage.cpp +++ b/src/db/storage.cpp @@ -2192,6 +2192,7 @@ namespace db expect storage::add_webhook(const webhook_type type, const account_address& address, const webhook_value& event) { + if (event.second.url != "zmq") { epee::net_utils::http::url_content url{}; if (event.second.url.empty() || !epee::net_utils::parse_url(event.second.url, url)) diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 6cf158a..4023faa 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -135,15 +135,17 @@ namespace rpc { struct context { - explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) + explicit context(zcontext comm, socket signal_pub, socket external_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) : comm(std::move(comm)) , signal_pub(std::move(signal_pub)) + , external_pub(std::move(external_pub)) , daemon_addr(std::move(daemon_addr)) , sub_addr(std::move(sub_addr)) , rates_conn() , cache_time() , cache_interval(interval) , cached{} + , sync_pub() , sync_rates() { if (std::chrono::minutes{0} < cache_interval) @@ -152,12 +154,14 @@ namespace rpc zcontext comm; socket signal_pub; + socket external_pub; const std::string daemon_addr; const std::string sub_addr; http::http_simple_client rates_conn; std::chrono::steady_clock::time_point cache_time; const std::chrono::minutes cache_interval; rates cached; + boost::mutex sync_pub; boost::mutex sync_rates; }; } // detail @@ -243,6 +247,11 @@ namespace rpc client::~client() noexcept {} + bool client::has_publish() const noexcept + { + return ctx && ctx->external_pub; + } + expect client::watch_scan_signals() noexcept { MONERO_PRECOND(ctx != nullptr); @@ -330,6 +339,17 @@ namespace rpc return success(); } + expect client::publish(epee::byte_slice payload) + { + MONERO_PRECOND(ctx != nullptr); + assert(daemon != nullptr); + if (ctx->external_pub == nullptr) + return success(); + + const boost::unique_lock guard{ctx->sync_pub}; + return net::zmq::send(std::move(payload), ctx->external_pub.get(), 0); + } + expect client::get_rates() const { MONERO_PRECOND(ctx != nullptr); @@ -343,7 +363,7 @@ namespace rpc return ctx->cached; } - context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval) + context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval) { zcontext comm{zmq_init(1)}; if (comm == nullptr) @@ -355,9 +375,19 @@ namespace rpc if (zmq_bind(pub.get(), signal_endpoint) < 0) MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); + detail::socket external_pub = nullptr; + if (!pub_addr.empty()) + { + external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)}; + if (external_pub == nullptr) + MONERO_THROW(net::zmq::get_error_code(), "zmq_socket"); + if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0) + MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); + } + return context{ std::make_shared( - std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval + std::move(comm), std::move(pub), std::move(external_pub), std::move(daemon_addr), std::move(sub_addr), rates_interval ) }; } diff --git a/src/rpc/client.h b/src/rpc/client.h index 06db02f..8de0185 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -38,6 +38,7 @@ #include "rpc/message.h" // monero/src #include "rpc/daemon_pub.h" #include "rpc/rates.h" +#include "span.h" // monero/contrib/epee/include #include "util/source_location.h" namespace lws @@ -112,6 +113,9 @@ namespace rpc return ctx != nullptr; } + //! \return True if an external pub/sub was setup + bool has_publish() const noexcept; + //! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`. expect watch_scan_signals() noexcept; @@ -132,6 +136,21 @@ namespace rpc */ expect send(epee::byte_slice message, std::chrono::seconds timeout) noexcept; + //! Publish `payload` to ZMQ external pub socket. + expect publish(epee::byte_slice payload); + + //! Publish `data` after `topic` to ZMQ external pub socket. + template + expect publish(const boost::string_ref topic, const T& data) + { + epee::byte_stream bytes{}; + bytes.write(topic.data(), topic.size()); + const std::error_code err = F::to_bytes(bytes, data); + if (err) + return err; + return publish(epee::byte_slice{std::move(bytes)}); + } + //! \return Next available RPC message response from server expect get_message(std::chrono::seconds timeout); @@ -171,10 +190,11 @@ namespace rpc \note All errors are exceptions; no recovery can occur. \param daemon_addr Location of ZMQ enabled `monerod` RPC. + \param pub_addr Bind location for publishing ZMQ events. \param rates_interval Frequency to retrieve exchange rates. Set value to `<= 0` to disable exchange rate retrieval. */ - static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval); + static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval); context(context&&) = default; context(context const&) = delete; diff --git a/src/scanner.cpp b/src/scanner.cpp index 894bac4..c82a555 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -58,7 +58,9 @@ #include "rpc/json.h" #include "util/source_location.h" #include "util/transactions.h" +#include "wire/adapted/span.h" #include "wire/json.h" +#include "wire/msgpack.h" #include "serialization/json_object.h" @@ -211,6 +213,8 @@ namespace lws for (const db::webhook_tx_confirmation& event : events) { + if (event.value.second.url == "zmq") + continue; if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url)) { MERROR("Bad URL for webhook event: " << event.value.second.url); @@ -243,6 +247,50 @@ namespace lws } } + struct zmq_index_single + { + const std::uint64_t index; + const db::webhook_tx_confirmation& event; + }; + + void write_bytes(wire::writer& dest, const zmq_index_single& self) + { + wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(event)); + } + + void send_via_zmq(rpc::client& client, const epee::span events) + { + struct zmq_order + { + std::uint64_t current; + boost::mutex sync; + + zmq_order() + : current(0), sync() + {} + }; + + static zmq_order ordering{}; + + //! \TODO monitor XPUB to cull the serialization + if (!events.empty() && client.has_publish()) + { + // make sure the event is queued to zmq in order. + const boost::unique_lock guard{ordering.sync}; + + for (const auto& event : events) + { + const zmq_index_single index{ordering.current++, event}; + MINFO("Sending ZMQ-PUB topics json-full-payment_hook and msgpack-full-payment_hook"); + expect result = success(); + if (!(result = client.publish("json-full-payment_hook:", index))) + MERROR("Failed to serialize+send json-full-payment_hook: " << result.error().message()); + if (!(result = client.publish("msgpack-full-payment_hook:", index))) + MERROR("Failed to serialize+send msgpack-full-payment_hook: " << result.error().message()); + } + } + } + struct by_height { bool operator()(account const& left, account const& right) const noexcept @@ -335,6 +383,7 @@ namespace lws events.pop_back(); //cannot compute tx_hash } send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_); + send_via_zmq(client_, epee::to_span(events)); return true; } }; @@ -741,6 +790,7 @@ namespace lws MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)"); send_via_http(epee::to_span(updated->second), std::chrono::seconds{5}, webhook_verify); + send_via_zmq(client, epee::to_span(updated->second)); if (updated->first != users.size()) { MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting"); diff --git a/src/server_main.cpp b/src/server_main.cpp index 02ba810..4eda9d0 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -56,6 +56,7 @@ namespace { const command_line::arg_descriptor daemon_rpc; const command_line::arg_descriptor daemon_sub; + const command_line::arg_descriptor zmq_pub; const command_line::arg_descriptor> rest_servers; const command_line::arg_descriptor> admin_rest_servers; const command_line::arg_descriptor rest_ssl_key; @@ -90,6 +91,7 @@ namespace : lws::options() , daemon_rpc{"daemon", "://
: of a monerod ZMQ RPC", get_default_zmq()} , daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""} + , zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""} , rest_servers{"rest-server", "[(https|http)://
:][/] for incoming connections, multiple declarations allowed"} , admin_rest_servers{"admin-rest-server", "[(https|http])://
:][/] for incoming admin connections, multiple declarations allowed"} , rest_ssl_key{"rest-ssl-key", " to PEM formatted SSL key for https REST server", ""} @@ -112,6 +114,7 @@ namespace lws::options::prepare(description); command_line::add_arg(description, daemon_rpc); command_line::add_arg(description, daemon_sub); + command_line::add_arg(description, zmq_pub); description.add_options()(rest_servers.name, boost::program_options::value>()->default_value({rest_default}, rest_default), rest_servers.description); command_line::add_arg(description, admin_rest_servers); command_line::add_arg(description, rest_ssl_key); @@ -136,6 +139,7 @@ namespace lws::rest_server::configuration rest_config; std::string daemon_rpc; std::string daemon_sub; + std::string zmq_pub; std::string webhook_ssl_verification; std::chrono::minutes rates_interval; std::size_t scan_threads; @@ -189,6 +193,7 @@ namespace }, command_line::get_arg(args, opts.daemon_rpc), command_line::get_arg(args, opts.daemon_sub), + command_line::get_arg(args, opts.zmq_pub), command_line::get_arg(args, opts.webhook_ssl_verification), std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)}, command_line::get_arg(args, opts.scan_threads), @@ -210,7 +215,7 @@ namespace boost::filesystem::create_directories(prog.db_path); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); - auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), prog.rates_interval); + auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), prog.rates_interval); MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value(); diff --git a/src/wire/adapted/span.h b/src/wire/adapted/span.h new file mode 100644 index 0000000..bfaa393 --- /dev/null +++ b/src/wire/adapted/span.h @@ -0,0 +1,40 @@ +// Copyright (c) 2023, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "span.h" // monero/contrib/epee/include +#include "wire/traits.h" + +namespace wire +{ + //! Enable span types for array output + template + struct is_array> + : std::true_type + {}; +}