mirror of
https://github.com/vtnerd/monero-lws.git
synced 2025-03-12 09:27:39 +00:00
Add (build-time optional) support for RabbitMQ
This commit is contained in:
parent
680c9ab304
commit
c982f32415
10 changed files with 190 additions and 25 deletions
|
@ -146,23 +146,27 @@ height.
|
|||
### webhook_add
|
||||
This is used to track a specific payment ID to an address or all general
|
||||
payments to an address (where payment ID is zero). Using this endpint requires
|
||||
a web address for callback purposes, a primary (not integrated!) address, and
|
||||
finally the type ("tx-confirmation"). The event will remain in the database
|
||||
until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid)
|
||||
or [webhook_delete](#webhook_delete)) is used to remove it.
|
||||
a web address or `zmq` for callback purposes, a primary (not integrated!)
|
||||
address, and finally the type ("tx-confirmation"). The event will remain in the
|
||||
database until one of the delete commands ([webhook_delete_uuid](#webhook_delete_uuid)
|
||||
or [webhook_delete](#webhook_delete)) is used to remove it. All webhooks are
|
||||
published over the ZMQ socket specified by `--zmq-pub` (when enabled/specified
|
||||
on command line) in addition to any HTTP server specified in the callback.
|
||||
|
||||
> The provided URL will use SSL/TLS if `https://` is prefixed in the URL and
|
||||
will use plaintext if `http://` is prefixed in the URL. SSL/TLS connections
|
||||
will use the system certificate authority (root-CAs) by default, and will
|
||||
ignore all authority checks if `--webhook-ssl-verification none` is provided
|
||||
on the command line when starting `monero-lws-daemon`. The webhook will fail
|
||||
if there is a mismatch of `http` and `https` between the two servers, and
|
||||
will also fail if `https` verification is mismatched. The rule is: (1) if
|
||||
the callback server has SSL/TLS disabled, the webhook should use `http://`,
|
||||
(2) if the callback server has a self-signed certificate, `https://` and
|
||||
`--webhook-ssl-verification none` should be used, and (3) if the callback
|
||||
server is using "Let's Encrypt" (or similar), then `https://` with no
|
||||
additional command line flag should be used.
|
||||
will use plaintext if `http://` is prefixed in the URL. If `zmq` is provided
|
||||
as the callback, notifications are performed _only_ over the ZMQ pub socket.
|
||||
SSL/TLS connections will use the system certificate authority (root-CAs) by
|
||||
default, and will ignore all authority checks if
|
||||
`--webhook-ssl-verification none` is provided on the command line when
|
||||
starting `monero-lws-daemon`. The webhook will fail if there is a mismatch of
|
||||
`http` and `https` between the two servers, and will also fail if `https`
|
||||
verification is mismatched. The rule is: (1) if the callback server has
|
||||
SSL/TLS disabled, the webhook should use `http://`, (2) if the callback server
|
||||
has a self-signed certificate, `https://` and `--webhook-ssl-verification none`
|
||||
should be used, and (3) if the callback server is using "Let's Encrypt"
|
||||
(or similar), then `https://` with no additional command line flag should be
|
||||
used.
|
||||
|
||||
|
||||
#### Initial Request to server
|
||||
|
|
91
docs/zmq.md
Normal file
91
docs/zmq.md
Normal file
|
@ -0,0 +1,91 @@
|
|||
# monero-lws ZeroMQ Usage
|
||||
Monero-lws uses ZeroMQ-RPC to retrieve information from a Monero daemon,
|
||||
ZeroMQ-SUB to get immediate notifications of blocks and transactions from a
|
||||
Monero daemon, and ZeroMQ-PUB to notify external applications of payment_id
|
||||
(web)hooks.
|
||||
|
||||
## External "pub" socket
|
||||
The bind location of the ZMQ-PUB socket is specified with the `--zmq-pub`
|
||||
option. Users are still required to "subscribe" to topics:
|
||||
* `json-full-hooks`: A JSON array of webhook events that have recently
|
||||
triggered (identical output as webhook).
|
||||
* `msgpack-full-hooks`: A msgpack array of webhook events that have recently
|
||||
triggered (identical output as webhook).
|
||||
|
||||
|
||||
### `json-full-hooks`/`msgpack-full-hooks`i
|
||||
These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)),
|
||||
event is triggered. If the specified URL is `zmq`, then notifications are only
|
||||
done over the ZMQ-PUB socket, otherwise the notification is sent over ZMQ-PUB
|
||||
socket AND the specified URL. Invoking `webhook_add` with a `payment_id` of
|
||||
zeroes (the field is optional in `webhook_add), will match on all transactions
|
||||
that lack a payment_id`.
|
||||
|
||||
Example of the "raw" output from ZMQ-SUB side:
|
||||
|
||||
```json
|
||||
json-full-hooks:{
|
||||
"index": 2,
|
||||
"events": [
|
||||
{
|
||||
"event": "tx-confirmation",
|
||||
"payment_id": "4f695d197f2a3c54",
|
||||
"token": "single zmq wallet",
|
||||
"confirmations": 1,
|
||||
"event_id": "3894f98f5dd54af5857e4f8a961a4e57",
|
||||
"tx_info": {
|
||||
"id": {
|
||||
"high": 0,
|
||||
"low": 5666767
|
||||
},
|
||||
"block": 2265961,
|
||||
"index": 0,
|
||||
"amount": 100000000000,
|
||||
"timestamp": 1687301600,
|
||||
"tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566",
|
||||
"tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626",
|
||||
"tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74",
|
||||
"rct_mask": "68459964f89d69b7a4b1e0a1a8a384cbc9a76363c8a6e99058d41906908bd005",
|
||||
"payment_id": "4f695d197f2a3c54",
|
||||
"unlock_time": 0,
|
||||
"mixin_count": 15,
|
||||
"coinbase": false
|
||||
}
|
||||
},
|
||||
{
|
||||
"event": "tx-confirmation",
|
||||
"payment_id": "4f695d197f2a3c54",
|
||||
"token": "single zmq wallet",
|
||||
"confirmations": 1,
|
||||
"event_id": "3894f98f5dd54af5857e4f8a961a4e57",
|
||||
"tx_info": {
|
||||
"id": {
|
||||
"high": 0,
|
||||
"low": 5666768
|
||||
},
|
||||
"block": 2265961,
|
||||
"index": 1,
|
||||
"amount": 3117324236131,
|
||||
"timestamp": 1687301600,
|
||||
"tx_hash": "ef3187775584351cc5109de124b877bcc530fb3fdbf77895329dd447902cc566",
|
||||
"tx_prefix_hash": "064884b8a8f903edcfebab830707ed44b633438b47c95a83320f4438b1b28626",
|
||||
"tx_public": "54dce1a6eebafa2fdedcea5e373ef9de1c3d2737ae9f809e80958d1ba4590d74",
|
||||
"rct_mask": "4cdc4c4e340aacb4741ba20f9b0b859242ecdad2fcc251f71d81123a47db3400",
|
||||
"payment_id": "4f695d197f2a3c54",
|
||||
"unlock_time": 0,
|
||||
"mixin_count": 15,
|
||||
"coinbase": false
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Notice the `json-full-hooks:` prefix - this is required for the ZMQ PUB/SUB
|
||||
subscription model. The subscriber requests data from a certain "topic" where
|
||||
matching is done by string prefixes.
|
||||
|
||||
> `index` is a counter used to detect dropped messages.
|
||||
|
||||
> The `block` and `id` fields in the above example are NOT present when
|
||||
`confirmations == 0`.
|
|
@ -177,3 +177,4 @@ namespace lws
|
|||
}
|
||||
} // lws
|
||||
|
||||
|
||||
|
|
|
@ -263,7 +263,7 @@ namespace db
|
|||
map_webhook_value(dest, source, payment_id);
|
||||
}
|
||||
|
||||
void write_bytes(wire::json_writer& dest, const webhook_tx_confirmation& self)
|
||||
void write_bytes(wire::writer& dest, const webhook_tx_confirmation& self)
|
||||
{
|
||||
crypto::hash8 payment_id;
|
||||
static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy");
|
||||
|
|
|
@ -299,7 +299,7 @@ namespace db
|
|||
webhook_value value;
|
||||
output tx_info;
|
||||
};
|
||||
void write_bytes(wire::json_writer&, const webhook_tx_confirmation&);
|
||||
void write_bytes(wire::writer&, const webhook_tx_confirmation&);
|
||||
|
||||
//! References a specific output that triggered a webhook
|
||||
struct webhook_output
|
||||
|
|
|
@ -2192,6 +2192,7 @@ namespace db
|
|||
|
||||
expect<void> storage::add_webhook(const webhook_type type, const account_address& address, const webhook_value& event)
|
||||
{
|
||||
if (event.second.url != "zmq")
|
||||
{
|
||||
epee::net_utils::http::url_content url{};
|
||||
if (event.second.url.empty() || !epee::net_utils::parse_url(event.second.url, url))
|
||||
|
|
|
@ -183,6 +183,7 @@ namespace rpc
|
|||
, cache_time()
|
||||
, cache_interval(interval)
|
||||
, cached{}
|
||||
, sync_pub()
|
||||
, sync_rates()
|
||||
{
|
||||
if (std::chrono::minutes{0} < cache_interval)
|
||||
|
@ -199,6 +200,7 @@ namespace rpc
|
|||
std::chrono::steady_clock::time_point cache_time;
|
||||
const std::chrono::minutes cache_interval;
|
||||
rates cached;
|
||||
boost::mutex sync_pub;
|
||||
boost::mutex sync_rates;
|
||||
};
|
||||
} // detail
|
||||
|
@ -398,7 +400,7 @@ namespace rpc
|
|||
amqp_bytes_t message{};
|
||||
message.len = payload.size();
|
||||
message.bytes = const_cast<std::uint8_t*>(payload.data());
|
||||
const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 1, 1, nullptr, message);
|
||||
const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 0, 0, nullptr, message);
|
||||
if (rmq_rc != 0)
|
||||
{
|
||||
MERROR("Failed RMQ Publish with return code: " << rmq_rc);
|
||||
|
@ -460,12 +462,12 @@ namespace rpc
|
|||
|
||||
std::string user;
|
||||
std::string pass;
|
||||
boost::regex expression{"^\\w+:\\w+$"};
|
||||
boost::regex expression{"(\\w+):(\\w+)"};
|
||||
boost::smatch matcher;
|
||||
if (boost::regex_search(url.host, matcher, expression))
|
||||
if (boost::regex_search(rmq_info.credentials, matcher, expression))
|
||||
{
|
||||
user = matcher[0];
|
||||
pass = matcher[1];
|
||||
user = matcher[1];
|
||||
pass = matcher[2];
|
||||
}
|
||||
|
||||
rmq.conn.reset(amqp_new_connection());
|
||||
|
@ -479,17 +481,22 @@ namespace rpc
|
|||
MERROR("Unable to open RMQ socket: " << status);
|
||||
MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket");
|
||||
}
|
||||
|
||||
if (!user.empty() || !pass.empty())
|
||||
{
|
||||
if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL)
|
||||
MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket");
|
||||
}
|
||||
if (amqp_channel_open(rmq.conn.get(), rmq_channel) != nullptr)
|
||||
if (amqp_channel_open(rmq.conn.get(), rmq_channel) == nullptr)
|
||||
MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel");
|
||||
|
||||
if (amqp_get_rpc_reply(rmq.conn.get()).reply_type != AMQP_RESPONSE_NORMAL)
|
||||
MONERO_THROW(error::rmq_failure, "Failed receiving channel open reply");
|
||||
|
||||
MINFO("Connected to RMQ server " << url.host << ":" << url.port);
|
||||
}
|
||||
#else // !MLWS_RMQ_ENABLED
|
||||
if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty())
|
||||
if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty() || !rmq_info.credentials.empty())
|
||||
MONERO_THROW(error::configuration, "RabbitMQ support not enabled");
|
||||
#endif
|
||||
|
||||
|
|
|
@ -120,7 +120,7 @@ namespace rpc
|
|||
return ctx != nullptr;
|
||||
}
|
||||
|
||||
//! True if an external pub/sub was setup
|
||||
//! \return True if an external pub/sub was setup
|
||||
bool has_publish() const noexcept;
|
||||
|
||||
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
|
||||
|
@ -143,6 +143,21 @@ namespace rpc
|
|||
*/
|
||||
expect<void> send(epee::byte_slice message, std::chrono::seconds timeout) noexcept;
|
||||
|
||||
//! Publish `payload` to ZMQ external pub socket.
|
||||
expect<void> publish(epee::byte_slice payload);
|
||||
|
||||
//! Publish `data` after `topic` to ZMQ external pub socket.
|
||||
template<typename F, typename T>
|
||||
expect<void> publish(const boost::string_ref topic, const T& data)
|
||||
{
|
||||
epee::byte_stream bytes{};
|
||||
bytes.write(topic.data(), topic.size());
|
||||
const std::error_code err = F::to_bytes(bytes, data);
|
||||
if (err)
|
||||
return err;
|
||||
return publish(epee::byte_slice{std::move(bytes)});
|
||||
}
|
||||
|
||||
//! \return Next available RPC message response from server
|
||||
expect<std::string> get_message(std::chrono::seconds timeout);
|
||||
|
||||
|
|
|
@ -58,7 +58,9 @@
|
|||
#include "rpc/json.h"
|
||||
#include "util/source_location.h"
|
||||
#include "util/transactions.h"
|
||||
#include "wire/adapted/span.h"
|
||||
#include "wire/json.h"
|
||||
#include "wire/msgpack.h"
|
||||
|
||||
#include "serialization/json_object.h"
|
||||
|
||||
|
@ -211,6 +213,8 @@ namespace lws
|
|||
|
||||
for (const db::webhook_tx_confirmation& event : events)
|
||||
{
|
||||
if (event.value.second.url == "zmq")
|
||||
continue;
|
||||
if (event.value.second.url.empty() || !net::parse_url(event.value.second.url, url))
|
||||
{
|
||||
MERROR("Bad URL for webhook event: " << event.value.second.url);
|
||||
|
@ -375,6 +379,7 @@ namespace lws
|
|||
events.pop_back(); //cannot compute tx_hash
|
||||
}
|
||||
send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_);
|
||||
send_via_zmq(client_, epee::to_span(events));
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
@ -781,6 +786,7 @@ namespace lws
|
|||
|
||||
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
|
||||
send_via_http(epee::to_span(updated->second), std::chrono::seconds{5}, webhook_verify);
|
||||
send_via_zmq(client, epee::to_span(updated->second));
|
||||
if (updated->first != users.size())
|
||||
{
|
||||
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");
|
||||
|
|
40
src/wire/adapted/span.h
Normal file
40
src/wire/adapted/span.h
Normal file
|
@ -0,0 +1,40 @@
|
|||
// Copyright (c) 2023, The Monero Project
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without modification, are
|
||||
// permitted provided that the following conditions are met:
|
||||
//
|
||||
// 1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
// conditions and the following disclaimer.
|
||||
//
|
||||
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
// of conditions and the following disclaimer in the documentation and/or other
|
||||
// materials provided with the distribution.
|
||||
//
|
||||
// 3. Neither the name of the copyright holder nor the names of its contributors may be
|
||||
// used to endorse or promote products derived from this software without specific
|
||||
// prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
|
||||
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
||||
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
|
||||
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "span.h" // monero/contrib/epee/include
|
||||
#include "wire/traits.h"
|
||||
|
||||
namespace wire
|
||||
{
|
||||
//! Enable span types for array output
|
||||
template<typename T>
|
||||
struct is_array<epee::span<T>>
|
||||
: std::true_type
|
||||
{};
|
||||
}
|
Loading…
Reference in a new issue