Add ZMQ-PUB support for webhooks

This commit is contained in:
Lee *!* Clagett 2023-07-04 11:10:02 -04:00
parent 246c905e37
commit d78a920ba1
9 changed files with 129 additions and 22 deletions

View file

@ -146,23 +146,27 @@ height.
### webhook_add
This is used to track a specific payment ID to an address or all general
payments to an address (where payment ID is zero). Using this endpint requires
a web address for callback purposes, a primary (not integrated!) address, and
finally the type ("tx-confirmation"). The event will remain in the database
until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid)
or [webhook_delete](#webhook_delete)) is used to remove it.
a web address or `zmq` for callback purposes, a primary (not integrated!)
address, and finally the type ("tx-confirmation"). The event will remain in the
database until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid)
or [webhook_delete](#webhook_delete)) is used to remove it. All webhooks are
published over the ZMQ socket specified by `--zmq-pub` (when enabled/specified
on command line) in addition to any HTTP server specified in the callback.
> The provided URL will use SSL/TLS if `https://` is prefixed in the URL and
will use plaintext if `http://` is prefixed in the URL. SSL/TLS connections
will use the system certificate authority (root-CAs) by default, and will
ignore all authority checks if `--webhook-ssl-verification none` is provided
on the command line when starting `monero-lws-daemon`. The webhook will fail
if there is a mismatch of `http` and `https` between the two servers, and
will also fail if `https` verification is mismatched. The rule is: (1) if
the callback server has SSL/TLS disabled, the webhook should use `http://`,
(2) if the callback server has a self-signed certificate, `https://` and
`--webhook-ssl-verification none` should be used, and (3) if the callback
server is using "Let's Encrypt" (or similar), then `https://` with no
additional command line flag should be used.
will use plaintext if `http://` is prefixed in the URL. If `zmq` is provided
as the callback, notifications are performed _only_ over the ZMQ pub socket.
SSL/TLS connections will use the system certificate authority (root-CAs) by
default, and will ignore all authority checks if
`--webhook-ssl-verification none` is provided on the command line when
starting `monero-lws-daemon`. The webhook will fail if there is a mismatch of
`http` and `https` between the two servers, and will also fail if `https`
verification is mismatched. The rule is: (1) if the callback server has
SSL/TLS disabled, the webhook should use `http://`, (2) if the callback server
has a self-signed certificate, `https://` and `--webhook-ssl-verification none`
should be used, and (3) if the callback server is using "Let's Encrypt"
(or similar), then `https://` with no additional command line flag should be
used.
#### Initial Request to server

View file

@ -177,3 +177,4 @@ namespace lws
}
} // lws

View file

@ -263,7 +263,7 @@ namespace db
map_webhook_value(dest, source, payment_id);
}
void write_bytes(wire::json_writer& dest, const webhook_tx_confirmation& self)
void write_bytes(wire::writer& dest, const webhook_tx_confirmation& self)
{
crypto::hash8 payment_id;
static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy");

View file

@ -299,7 +299,7 @@ namespace db
webhook_value value;
output tx_info;
};
void write_bytes(wire::json_writer&, const webhook_tx_confirmation&);
void write_bytes(wire::writer&, const webhook_tx_confirmation&);
//! References a specific output that triggered a webhook
struct webhook_output

View file

@ -2192,6 +2192,7 @@ namespace db
expect<void> storage::add_webhook(const webhook_type type, const account_address& address, const webhook_value& event)
{
if (event.second.url != "zmq")
{
epee::net_utils::http::url_content url{};
if (event.second.url.empty() || !epee::net_utils::parse_url(event.second.url, url))

View file

@ -135,15 +135,17 @@ namespace rpc
{
struct context
{
explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
explicit context(zcontext comm, socket signal_pub, socket external_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
: comm(std::move(comm))
, signal_pub(std::move(signal_pub))
, external_pub(std::move(external_pub))
, daemon_addr(std::move(daemon_addr))
, sub_addr(std::move(sub_addr))
, rates_conn()
, cache_time()
, cache_interval(interval)
, cached{}
, sync_pub()
, sync_rates()
{
if (std::chrono::minutes{0} < cache_interval)
@ -152,12 +154,14 @@ namespace rpc
zcontext comm;
socket signal_pub;
socket external_pub;
const std::string daemon_addr;
const std::string sub_addr;
http::http_simple_client rates_conn;
std::chrono::steady_clock::time_point cache_time;
const std::chrono::minutes cache_interval;
rates cached;
boost::mutex sync_pub;
boost::mutex sync_rates;
};
} // detail
@ -243,6 +247,11 @@ namespace rpc
client::~client() noexcept
{}
bool client::has_publish() const noexcept
{
return ctx && ctx->external_pub;
}
expect<void> client::watch_scan_signals() noexcept
{
MONERO_PRECOND(ctx != nullptr);
@ -330,6 +339,17 @@ namespace rpc
return success();
}
expect<void> client::publish(epee::byte_slice payload)
{
MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr);
if (ctx->external_pub == nullptr)
return success();
const boost::unique_lock<boost::mutex> guard{ctx->sync_pub};
return net::zmq::send(std::move(payload), ctx->external_pub.get(), 0);
}
expect<rates> client::get_rates() const
{
MONERO_PRECOND(ctx != nullptr);
@ -343,7 +363,7 @@ namespace rpc
return ctx->cached;
}
context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval)
context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval)
{
zcontext comm{zmq_init(1)};
if (comm == nullptr)
@ -355,9 +375,19 @@ namespace rpc
if (zmq_bind(pub.get(), signal_endpoint) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
detail::socket external_pub = nullptr;
if (!pub_addr.empty())
{
external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)};
if (external_pub == nullptr)
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
}
return context{
std::make_shared<detail::context>(
std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval
std::move(comm), std::move(pub), std::move(external_pub), std::move(daemon_addr), std::move(sub_addr), rates_interval
)
};
}

View file

@ -38,6 +38,7 @@
#include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h"
#include "rpc/rates.h"
#include "span.h" // monero/contrib/epee/include
#include "util/source_location.h"
namespace lws
@ -112,6 +113,9 @@ namespace rpc
return ctx != nullptr;
}
//! \return True if an external pub/sub was setup
bool has_publish() const noexcept;
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
expect<void> watch_scan_signals() noexcept;
@ -132,6 +136,21 @@ namespace rpc
*/
expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
//! Publish `payload` to ZMQ external pub socket.
expect<void> publish(epee::byte_slice payload);
//! Publish `data` after `topic` to ZMQ external pub socket.
template<typename F, typename T>
expect<void> publish(const boost::string_ref topic, const T& data)
{
epee::byte_stream bytes{};
bytes.write(topic.data(), topic.size());
const std::error_code err = F::to_bytes(bytes, data);
if (err)
return err;
return publish(epee::byte_slice{std::move(bytes)});
}
//! \return Next available RPC message response from server
expect<std::string> get_message(std::chrono::seconds timeout);
@ -171,10 +190,11 @@ namespace rpc
\note All errors are exceptions; no recovery can occur.
\param daemon_addr Location of ZMQ enabled `monerod` RPC.
\param pub_addr Bind location for publishing ZMQ events.
\param rates_interval Frequency to retrieve exchange rates. Set value to
`<= 0` to disable exchange rate retrieval.
*/
static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval);
static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, std::chrono::minutes rates_interval);
context(context&&) = default;
context(context const&) = delete;

View file

@ -58,7 +58,9 @@
#include "rpc/json.h"
#include "util/source_location.h"
#include "util/transactions.h"
#include "wire/adapted/span.h"
#include "wire/json.h"
#include "wire/msgpack.h"
#include "serialization/json_object.h"
@ -211,6 +213,8 @@ namespace lws
for (const db::webhook_tx_confirmation& event : events)
{
if (event.value.second.url == "zmq")
continue;
if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url))
{
MERROR("Bad URL for webhook event: " << event.value.second.url);
@ -243,6 +247,46 @@ namespace lws
}
}
struct zmq_index
{
const std::uint64_t index;
const epee::span<const db::webhook_tx_confirmation> events;
};
void write_bytes(wire::writer& dest, const zmq_index& self)
{
wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(events));
}
void send_via_zmq(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events)
{
struct zmq_order
{
std::uint64_t current;
boost::mutex sync;
zmq_order()
: current(0), sync()
{}
};
static zmq_order ordering{};
//! \TODO monitor XPUB to cull the serialization
if (!events.empty() && client.has_publish())
{
// make sure the event is queued to zmq in order.
const boost::unique_lock<boost::mutex> guard{ordering.sync};
const zmq_index index{ordering.current++, events};
MINFO("Sending ZMQ-PUB topics json-full-hooks and msgpack-full-hooks");
expect<void> result = success();
if (!(result = client.publish<wire::json>("json-full-hooks:", index)))
MERROR("Failed to serialize+send json-full-hooks: " << result.error().message());
if (!(result = client.publish<wire::msgpack>("msgpack-full-hooks:", index)))
MERROR("Failed to serialize+send msgpack-full-hooks: " << result.error().message());
}
}
struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
@ -335,6 +379,7 @@ namespace lws
events.pop_back(); //cannot compute tx_hash
}
send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_);
send_via_zmq(client_, epee::to_span(events));
return true;
}
};
@ -741,6 +786,7 @@ namespace lws
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_via_http(epee::to_span(updated->second), std::chrono::seconds{5}, webhook_verify);
send_via_zmq(client, epee::to_span(updated->second));
if (updated->first != users.size())
{
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");

View file

@ -56,6 +56,7 @@ namespace
{
const command_line::arg_descriptor<std::string> daemon_rpc;
const command_line::arg_descriptor<std::string> daemon_sub;
const command_line::arg_descriptor<std::string> zmq_pub;
const command_line::arg_descriptor<std::vector<std::string>> rest_servers;
const command_line::arg_descriptor<std::vector<std::string>> admin_rest_servers;
const command_line::arg_descriptor<std::string> rest_ssl_key;
@ -90,6 +91,7 @@ namespace
: lws::options()
, daemon_rpc{"daemon", "<protocol>://<address>:<port> of a monerod ZMQ RPC", get_default_zmq()}
, daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""}
, zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""}
, rest_servers{"rest-server", "[(https|http)://<address>:]<port>[/<prefix>] for incoming connections, multiple declarations allowed"}
, admin_rest_servers{"admin-rest-server", "[(https|http])://<address>:]<port>[/<prefix>] for incoming admin connections, multiple declarations allowed"}
, rest_ssl_key{"rest-ssl-key", "<path> to PEM formatted SSL key for https REST server", ""}
@ -112,6 +114,7 @@ namespace
lws::options::prepare(description);
command_line::add_arg(description, daemon_rpc);
command_line::add_arg(description, daemon_sub);
command_line::add_arg(description, zmq_pub);
description.add_options()(rest_servers.name, boost::program_options::value<std::vector<std::string>>()->default_value({rest_default}, rest_default), rest_servers.description);
command_line::add_arg(description, admin_rest_servers);
command_line::add_arg(description, rest_ssl_key);
@ -136,6 +139,7 @@ namespace
lws::rest_server::configuration rest_config;
std::string daemon_rpc;
std::string daemon_sub;
std::string zmq_pub;
std::string webhook_ssl_verification;
std::chrono::minutes rates_interval;
std::size_t scan_threads;
@ -189,6 +193,7 @@ namespace
},
command_line::get_arg(args, opts.daemon_rpc),
command_line::get_arg(args, opts.daemon_sub),
command_line::get_arg(args, opts.zmq_pub),
command_line::get_arg(args, opts.webhook_ssl_verification),
std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)},
command_line::get_arg(args, opts.scan_threads),
@ -210,7 +215,7 @@ namespace
boost::filesystem::create_directories(prog.db_path);
auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max);
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), prog.rates_interval);
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), prog.rates_interval);
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();