merge in zmq-pub changes

This commit is contained in:
Lee *!* Clagett 2023-07-31 16:14:52 -04:00
parent 3254ab8344
commit 9359c514a6
7 changed files with 122 additions and 19 deletions

View file

@ -305,7 +305,7 @@ Example response:
```json
{
"payment_id": "0000000000000000",
"event_id": "fa10a4db485145f1a24dc09c19a79d43",
"event_id": "c5a735e71b1e4f0a8bfaeff661d0b38a"",
"token": "1234",
"confirmations": 1,
"url": "http://127.0.0.1:7000"

View file

@ -2,24 +2,28 @@
Monero-lws uses ZeroMQ-RPC to retrieve information from a Monero daemon,
ZeroMQ-SUB to get immediate notifications of blocks and transactions from a
Monero daemon, and ZeroMQ-PUB to notify external applications of payment_id
(web)hooks.
and new account (web)hooks.
## External "pub" socket
The bind location of the ZMQ-PUB socket is specified with the `--zmq-pub`
option. Users are still required to "subscribe" to topics:
* `json-full-pyment_hook`: A JSON array of webhook payment events that have
recently triggered (identical output as webhook).
* `msgpack-full-payment_hook`: A msgpack array of webhook payment events that
have recently triggered (identical output as webhook).
* `json-full-payment_hook`: A JSON object of a single webhook payment event
that has recently triggered (identical output as webhook).
* `msgpack-full-payment_hook`: A msgpack object of a webhook payment events
that have recently triggered (identical output as webhook).
* `json-full-new_account_hook`: A JSON object of a single new account
creation that has recently triggered (identical output as webhook).
* `msgpack-full-new_account_hook`: A msgpack object of a single new account
creation that has recently triggered (identical output as webhook).
### `json-full-payment_hook`/`msgpack-full-payment_hook`
These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)),
event is triggered. If the specified URL is `zmq`, then notifications are only
done over the ZMQ-PUB socket, otherwise the notification is sent over ZMQ-PUB
socket AND the specified URL. Invoking `webhook_add` with a `payment_id` of
zeroes (the field is optional in `webhook_add), will match on all transactions
that lack a `payment_id`.
event is triggered for a payment (`tx-confirmation`). If the specified URL is
`zmq`, then notifications are only done over the ZMQ-PUB socket, otherwise the
notification is sent over ZMQ-PUB socket AND the specified URL. Invoking
`webhook_add` with a `payment_id` of zeroes (the field is optional in
`webhook_add`), will match on all transactions that lack a `payment_id`.
Example of the "raw" output from ZMQ-SUB side:
@ -63,3 +67,31 @@ matching is done by string prefixes.
> The `block` and `id` fields in the above example are NOT present when
`confirmations == 0`.
### `json-full-new_account_hook`/`msgpack-full-new_account_hook`
These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)),
event is triggered for a new account (`new-account`). If the specified URL is
`zmq`, then notifications are only done over the ZMQ-PUB socket, otherwise the
notification is sent over ZMQ-PUB socket AND the specified URL. Invoking
`webhook_add` with a `payment_id` of zeroes (the field is optional in
`webhook_add`), will match on all transactions that lack a `payment_id`.
Example of the "raw" output from ZMQ-SUB side:
```json
json-full-new_account_hook:{
"index": 2,
"event": {
"event": "new-account",
"event_id": "c5a735e71b1e4f0a8bfaeff661d0b38a",
"token": "",
"address": "9zGwnfWRMTF9nFVW9DNKp46aJ43CRtQBWNFvPqFVSN3RUKHuc37u2RDi2GXGp1wRdSRo5juS828FqgyxkumDaE4s9qyyi9B"
}
}
```
Notice the `json-full-new_account_hook:` prefix - this is required for the ZMQ
PUB/SUB subscription model. The subscriber requests data from a certain "topic"
where matching is done by string prefixes.
> `index` is a counter used to detect dropped messages.

View file

@ -293,7 +293,7 @@ namespace db
);
}
void write_bytes(wire::json_writer& dest, const webhook_new_account& self)
void write_bytes(wire::writer& dest, const webhook_new_account& self)
{
wire::object(dest,
wire::field<0>("event_id", std::cref(self.value.first.event_id)),

View file

@ -323,7 +323,7 @@ namespace db
webhook_value value;
account_address account;
};
void write_bytes(wire::json_writer&, const webhook_new_account&);
void write_bytes(wire::writer&, const webhook_new_account&);
bool operator==(transaction_link const& left, transaction_link const& right) noexcept;
bool operator<(transaction_link const& left, transaction_link const& right) noexcept;

View file

@ -599,7 +599,7 @@ namespace lws
using request = rpc::login_request;
using response = rpc::login_response;
static expect<response> handle(request req, db::storage disk, rpc::client const&, runtime_options const& options)
static expect<response> handle(request req, db::storage disk, rpc::client const& gclient, runtime_options const& options)
{
if (!key_check(req.creds))
return {lws::error::bad_view_key};
@ -628,7 +628,16 @@ namespace lws
const auto hooks = disk.creation_request(req.creds.address, req.creds.key, flags);
if (!hooks)
return hooks.error();
rpc::http_send(epee::to_span(*hooks), std::chrono::seconds{5}, options.webhook_verify);
if (!hooks->empty())
{
expect<rpc::client> client = gclient.clone();
if (!client)
return client.error();
rpc::send_webhook(
*client, epee::to_span(*hooks), "json-full-new_account_hook:", "msgpack-full-new_account_hook:", std::chrono::seconds{5}, options.webhook_verify
);
}
return response{true, req.generated_locally};
}
};

View file

@ -1,4 +1,5 @@
#include <boost/thread/mutex.hpp>
#include <boost/utility/string_ref.hpp>
#include <chrono>
#include <string>
@ -7,6 +8,7 @@
#include "net/http_client.h" // monero/contrib/epee/include
#include "span.h"
#include "wire/json.h"
#include "wire/msgpack.h"
namespace lws { namespace rpc
{
@ -94,4 +96,64 @@ namespace lws { namespace rpc
}
}
template<typename T>
struct zmq_index_single
{
const std::uint64_t index;
const T& event;
};
template<typename T>
void write_bytes(wire::writer& dest, const zmq_index_single<T>& self)
{
wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(event));
}
template<typename T>
void zmq_send(rpc::client& client, const epee::span<const T> events, const boost::string_ref json_topic, const boost::string_ref msgpack_topic)
{
// Each `T` should have a unique count. This is desired.
struct zmq_order
{
std::uint64_t current;
boost::mutex sync;
zmq_order()
: current(0), sync()
{}
};
static zmq_order ordering{};
//! \TODO monitor XPUB to cull the serialization
if (!events.empty() && client.has_publish())
{
// make sure the event is queued to zmq in order.
const boost::unique_lock<boost::mutex> guard{ordering.sync};
for (const auto& event : events)
{
const zmq_index_single<T> index{ordering.current++, event};
MINFO("Sending ZMQ-PUB topics " << json_topic << " and " << msgpack_topic);
expect<void> result = success();
if (!(result = client.publish<wire::json>(json_topic, index)))
MERROR("Failed to serialize+send " << json_topic << " " << result.error().message());
if (!(result = client.publish<wire::msgpack>(msgpack_topic, index)))
MERROR("Failed to serialize+send " << msgpack_topic << " " << result.error().message());
}
}
}
template<typename T>
void send_webhook(
rpc::client& client,
const epee::span<const T> events,
const boost::string_ref json_topic,
const boost::string_ref msgpack_topic,
const std::chrono::seconds timeout,
epee::net_utils::ssl_verification_t verify_mode)
{
http_send(events, timeout, verify_mode);
zmq_send(client, events, json_topic, msgpack_topic);
}
}} // lws // rpc

View file

@ -163,9 +163,9 @@ namespace lws
return true;
}
void send_payment_hook(const epee::span<const db::webhook_tx_confirmation>, net::ssl_verififcation_t verify_mode)
void send_payment_hook(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events, net::ssl_verification_t verify_mode)
{
rpc::send_webhook(events, "json-full-payment_hook", "msgpack-full-payment_hook" std::chrono::seconds{5}, verify_mode);
rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode);
}
struct by_height
@ -259,7 +259,7 @@ namespace lws
else
events.pop_back(); //cannot compute tx_hash
}
send_payment_hook(epee::to_span(events), verify_mode_);
send_payment_hook(client_, epee::to_span(events), verify_mode_);
return true;
}
};
@ -665,7 +665,7 @@ namespace lws
}
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(epee::to_span(updated->second), webhook_verify);
send_payment_hook(client, epee::to_span(updated->second), webhook_verify);
if (updated->first != users.size())
{
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");