From c982f3241581aaf78220119f18f04a3032fb0b28 Mon Sep 17 00:00:00 2001
From: Lee *!* Clagett <code@leeclagett.com>
Date: Tue, 4 Jul 2023 19:50:31 -0400
Subject: [PATCH] Add (build-time optional) support for RabbitMQ

---
 docs/administration.md  | 34 ++++++++-------
 docs/zmq.md             | 91 +++++++++++++++++++++++++++++++++++++++++
 src/db/account.cpp      |  1 +
 src/db/data.cpp         |  2 +-
 src/db/data.h           |  2 +-
 src/db/storage.cpp      |  1 +
 src/rpc/client.cpp      | 21 ++++++----
 src/rpc/client.h        | 17 +++++++-
 src/scanner.cpp         |  6 +++
 src/wire/adapted/span.h | 40 ++++++++++++++++++
 10 files changed, 190 insertions(+), 25 deletions(-)
 create mode 100644 docs/zmq.md
 create mode 100644 src/wire/adapted/span.h

diff --git a/docs/administration.md b/docs/administration.md
index 37a4275..7c6afbd 100644
--- a/docs/administration.md
+++ b/docs/administration.md
@@ -146,23 +146,27 @@ height.
 ### webhook_add
 This is used to track a specific payment ID to an address or all general
 payments to an address (where payment ID is zero). Using this endpint requires
-a web address for callback purposes, a primary (not integrated!) address, and
-finally the type ("tx-confirmation"). The event will remain in the database
-until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid)
-or [webhook_delete](#webhook_delete)) is used to remove it.
+a web address or `zmq` for callback purposes, a primary (not integrated!)
+address, and finally the type ("tx-confirmation"). The event will remain in the
+database until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid)
+or [webhook_delete](#webhook_delete)) is used to remove it. All webhooks are
+published over the ZMQ socket specified by `--zmq-pub` (when enabled/specified
+on command line) in addition to any HTTP server specified in the callback.
 
 > The provided URL will use SSL/TLS if `https://` is prefixed in the URL and
-will use plaintext if `http://` is prefixed in the URL. SSL/TLS connections
-will use the system certificate authority (root-CAs) by default, and will
-ignore all authority checks if `--webhook-ssl-verification none` is provided
-on the command line when starting `monero-lws-daemon`. The webhook will fail
-if there is a mismatch of `http` and `https` between the two servers, and
-will also fail if `https` verification is mismatched. The rule is: (1) if
-the callback server has SSL/TLS disabled, the webhook should use `http://`,
-(2) if the callback server has a self-signed certificate, `https://` and
-`--webhook-ssl-verification none` should be used, and (3) if the callback
-server is using "Let's Encrypt" (or similar), then `https://` with no
-additional command line flag should be used.
+will use plaintext if `http://` is prefixed in the URL. If `zmq` is provided
+as the callback, notifications are performed _only_ over the ZMQ pub socket.
+SSL/TLS connections will use the system certificate authority (root-CAs) by
+default, and will ignore all authority checks if
+`--webhook-ssl-verification none` is provided on the command line when
+starting `monero-lws-daemon`. The webhook will fail if there is a mismatch of
+`http` and `https` between the two servers, and will also fail if `https`
+verification is mismatched. The rule is: (1) if the callback server has
+SSL/TLS disabled, the webhook should use `http://`, (2) if the callback server
+has a self-signed certificate, `https://` and `--webhook-ssl-verification none`
+should be used, and (3) if the callback server is using "Let's Encrypt"
+(or similar), then `https://` with no additional command line flag should be
+used.
 
 
 #### Initial Request to server
diff --git a/docs/zmq.md b/docs/zmq.md
new file mode 100644
index 0000000..613e5c1
--- /dev/null
+++ b/docs/zmq.md
@@ -0,0 +1,91 @@
+# monero-lws ZeroMQ Usage
+Monero-lws uses ZeroMQ-RPC to retrieve information from a Monero daemon,
+ZeroMQ-SUB to get immediate notifications of blocks and transactions from a
+Monero daemon, and ZeroMQ-PUB to notify external applications of payment_id
+(web)hooks.
+
+## External "pub" socket
+The bind location of the ZMQ-PUB socket is specified with the `--zmq-pub`
+option. Users are still required to "subscribe" to topics:
+  * `json-full-hooks`: A JSON array of webhook events that have recently
+    triggered (identical output as webhook).
+  * `msgpack-full-hooks`: A msgpack array of webhook events that have recently
+    triggered (identical output as webhook).
+
+
+### `json-full-hooks`/`msgpack-full-hooks`i
+These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)),
+event is triggered. If the specified URL is `zmq`, then notifications are only
+done over the ZMQ-PUB socket, otherwise the notification is sent over ZMQ-PUB
+socket AND the specified URL. Invoking `webhook_add` with a `payment_id` of
+zeroes (the field is optional in `webhook_add), will match on all transactions
+that lack a payment_id`.
+
+Example of the "raw" output from ZMQ-SUB side:
+
+```json
+json-full-hooks:{
+  "index": 2,
+  "events": [
+    {
+      "event": "tx-confirmation",
+      "payment_id": "4f695d197f2a3c54",
+      "token": "single zmq wallet",
+      "confirmations": 1,
+      "event_id": "3894f98f5dd54af5857e4f8a961a4e57",
+      "tx_info": {
+        "id": {
+          "high": 0,
+          "low": 5666767
+        },
+        "block": 2265961,
+        "index": 0,
+        "amount": 100000000000,
+        "timestamp": 1687301600,
+        "tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566",
+        "tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626",
+        "tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74",
+        "rct_mask": "68459964f89d69b7a4b1e0a1a8a384cbc9a76363c8a6e99058d41906908bd005",
+        "payment_id": "4f695d197f2a3c54",
+        "unlock_time": 0,
+        "mixin_count": 15,
+        "coinbase": false
+      }
+    },
+    {
+      "event": "tx-confirmation",
+      "payment_id": "4f695d197f2a3c54",
+      "token": "single zmq wallet",
+      "confirmations": 1,
+      "event_id": "3894f98f5dd54af5857e4f8a961a4e57",
+      "tx_info": {
+        "id": {
+          "high": 0,
+          "low": 5666768
+        },
+        "block": 2265961,
+        "index": 1,
+        "amount": 3117324236131,
+        "timestamp": 1687301600,
+        "tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566",
+        "tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626",
+        "tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74",
+        "rct_mask": "4cdc4c4e340aacb4741ba20f9b0b859242ecdad2fcc251f71d81123a47db3400",
+        "payment_id": "4f695d197f2a3c54",
+        "unlock_time": 0,
+        "mixin_count": 15,
+        "coinbase": false
+      }
+    }
+  ]
+}
+```
+
+Notice the `json-full-hooks:` prefix - this is required for the ZMQ PUB/SUB
+subscription model. The subscriber requests data from a certain "topic" where
+matching is done by string prefixes.
+
+> `index` is a counter used to detect dropped messages.
+
+> The `block` and `id` fields in the above example are NOT present when
+`confirmations == 0`.
diff --git a/src/db/account.cpp b/src/db/account.cpp
index 40f9908..2c99894 100644
--- a/src/db/account.cpp
+++ b/src/db/account.cpp
@@ -177,3 +177,4 @@ namespace lws
   }
 } // lws
 
+
diff --git a/src/db/data.cpp b/src/db/data.cpp
index 89a35e8..d0b54e1 100644
--- a/src/db/data.cpp
+++ b/src/db/data.cpp
@@ -263,7 +263,7 @@ namespace db
     map_webhook_value(dest, source, payment_id);
   }
 
-  void write_bytes(wire::json_writer& dest, const webhook_tx_confirmation& self)
+  void write_bytes(wire::writer& dest, const webhook_tx_confirmation& self)
   {
     crypto::hash8 payment_id;
     static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy");
diff --git a/src/db/data.h b/src/db/data.h
index a8fdd76..45b2cde 100644
--- a/src/db/data.h
+++ b/src/db/data.h
@@ -299,7 +299,7 @@ namespace db
     webhook_value value;
     output tx_info;
   };
-  void write_bytes(wire::json_writer&, const webhook_tx_confirmation&);
+  void write_bytes(wire::writer&, const webhook_tx_confirmation&);
 
   //! References a specific output that triggered a webhook
   struct webhook_output
diff --git a/src/db/storage.cpp b/src/db/storage.cpp
index 0e7e356..687da91 100644
--- a/src/db/storage.cpp
+++ b/src/db/storage.cpp
@@ -2192,6 +2192,7 @@ namespace db
 
   expect<void> storage::add_webhook(const webhook_type type, const account_address& address, const webhook_value& event)
   {
+    if (event.second.url != "zmq")
     {
       epee::net_utils::http::url_content url{};
       if (event.second.url.empty() || !epee::net_utils::parse_url(event.second.url, url))
diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp
index fbb5d4c..a035e8e 100644
--- a/src/rpc/client.cpp
+++ b/src/rpc/client.cpp
@@ -183,6 +183,7 @@ namespace rpc
         , cache_time()
         , cache_interval(interval)
         , cached{}
+        , sync_pub()
         , sync_rates()
       {
         if (std::chrono::minutes{0} < cache_interval)
@@ -199,6 +200,7 @@ namespace rpc
       std::chrono::steady_clock::time_point cache_time;
       const std::chrono::minutes cache_interval;
       rates cached;
+      boost::mutex sync_pub;
       boost::mutex sync_rates;
     };
   } // detail
@@ -398,7 +400,7 @@ namespace rpc
       amqp_bytes_t message{};
       message.len = payload.size();
       message.bytes = const_cast<std::uint8_t*>(payload.data());
-      const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 1, 1, nullptr,  message);
+      const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 0, 0, nullptr,  message);
       if (rmq_rc != 0)
       {
         MERROR("Failed RMQ Publish with return code: " << rmq_rc);
@@ -460,12 +462,12 @@ namespace rpc
 
       std::string user;
       std::string pass;
-      boost::regex expression{"^\\w+:\\w+$"};
+      boost::regex expression{"(\\w+):(\\w+)"};
       boost::smatch matcher;
-      if (boost::regex_search(url.host, matcher, expression))
+      if (boost::regex_search(rmq_info.credentials, matcher, expression))
       {
-         user = matcher[0];
-         pass = matcher[1];
+         user = matcher[1];
+         pass = matcher[2];
       }
 
       rmq.conn.reset(amqp_new_connection());
@@ -479,17 +481,22 @@ namespace rpc
         MERROR("Unable to open RMQ socket: " << status);
         MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket");
       }
+
       if (!user.empty() || !pass.empty())
       {
         if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL)
           MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket");
       }
-      if (amqp_channel_open(rmq.conn.get(), rmq_channel) != nullptr)
+      if (amqp_channel_open(rmq.conn.get(), rmq_channel) == nullptr)
         MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel");
+
+      if (amqp_get_rpc_reply(rmq.conn.get()).reply_type != AMQP_RESPONSE_NORMAL)
+        MONERO_THROW(error::rmq_failure, "Failed receiving channel open reply");
+
       MINFO("Connected to RMQ server " << url.host << ":" << url.port);
     }
 #else // !MLWS_RMQ_ENABLED
-    if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty())
+    if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty() || !rmq_info.credentials.empty())
       MONERO_THROW(error::configuration, "RabbitMQ support not enabled");
 #endif
 
diff --git a/src/rpc/client.h b/src/rpc/client.h
index 48f49a0..e19baa5 100644
--- a/src/rpc/client.h
+++ b/src/rpc/client.h
@@ -120,7 +120,7 @@ namespace rpc
       return ctx != nullptr;
     }
 
-    //! True if an external pub/sub was setup
+    //! \return True if an external pub/sub was setup
     bool has_publish() const noexcept;
 
     //! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
@@ -143,6 +143,21 @@ namespace rpc
     */
     expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
 
+    //! Publish `payload` to ZMQ external pub socket.
+    expect<void> publish(epee::byte_slice payload);
+
+    //! Publish `data` after `topic` to ZMQ external pub socket.
+    template<typename F, typename T>
+    expect<void> publish(const boost::string_ref topic, const T& data)
+    {
+      epee::byte_stream bytes{};
+      bytes.write(topic.data(), topic.size());
+      const std::error_code err = F::to_bytes(bytes, data);
+      if (err)
+        return err;
+      return publish(epee::byte_slice{std::move(bytes)});
+    }
+
     //! \return Next available RPC message response from server
     expect<std::string> get_message(std::chrono::seconds timeout);
 
diff --git a/src/scanner.cpp b/src/scanner.cpp
index c40b3eb..acd7313 100644
--- a/src/scanner.cpp
+++ b/src/scanner.cpp
@@ -58,7 +58,9 @@
 #include "rpc/json.h"
 #include "util/source_location.h"
 #include "util/transactions.h"
+#include "wire/adapted/span.h"
 #include "wire/json.h"
+#include "wire/msgpack.h"
 
 #include "serialization/json_object.h"
 
@@ -211,6 +213,8 @@ namespace lws
 
       for (const db::webhook_tx_confirmation& event : events)
       {
+        if (event.value.second.url == "zmq")
+          continue;
         if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url))
         {
           MERROR("Bad URL for webhook event: " << event.value.second.url);
@@ -375,6 +379,7 @@ namespace lws
             events.pop_back(); //cannot compute tx_hash
         }
         send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_);
+        send_via_zmq(client_, epee::to_span(events));
         return true;
       }
     };
@@ -781,6 +786,7 @@ namespace lws
 
           MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
           send_via_http(epee::to_span(updated->second), std::chrono::seconds{5}, webhook_verify);
+          send_via_zmq(client, epee::to_span(updated->second));
           if (updated->first != users.size())
           {
             MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");
diff --git a/src/wire/adapted/span.h b/src/wire/adapted/span.h
new file mode 100644
index 0000000..bfaa393
--- /dev/null
+++ b/src/wire/adapted/span.h
@@ -0,0 +1,40 @@
+// Copyright (c) 2023, The Monero Project
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without modification, are
+// permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of
+//    conditions and the following disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list
+//    of conditions and the following disclaimer in the documentation and/or other
+//    materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be
+//    used to endorse or promote products derived from this software without specific
+//    prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#pragma once
+
+#include "span.h" // monero/contrib/epee/include
+#include "wire/traits.h"
+
+namespace wire
+{
+  //! Enable span types for array output
+  template<typename T>
+  struct is_array<epee::span<T>>
+    : std::true_type
+  {};
+}