From 41919562946bb4dd7adfc6d1fb57a57a9197ba95 Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Sun, 31 Mar 2024 13:32:33 -0400 Subject: [PATCH] ZMQ pub blocks (#97) --- docs/zmq.md | 26 ++++++++++++ src/db/fwd.h | 2 + src/db/storage.h | 2 +- src/rpc/CMakeLists.txt | 4 +- src/rpc/fwd.h | 4 ++ src/rpc/lws_pub.cpp | 89 ++++++++++++++++++++++++++++++++++++++++++ src/rpc/lws_pub.h | 38 ++++++++++++++++++ src/rpc/webhook.h | 28 +++++++++++++ src/scanner.cpp | 8 +++- src/wire/write.h | 8 +++- 10 files changed, 204 insertions(+), 5 deletions(-) create mode 100644 src/rpc/lws_pub.cpp create mode 100644 src/rpc/lws_pub.h diff --git a/docs/zmq.md b/docs/zmq.md index a44e2c4..9d6b1e0 100644 --- a/docs/zmq.md +++ b/docs/zmq.md @@ -15,6 +15,10 @@ option. Users are still required to "subscribe" to topics: creation that has recently triggered (identical output as webhook). * `msgpack-full-new_account_hook`: A msgpack object of a single new account creation that has recently triggered (identical output as webhook). + * `json-minimal-scanned`: A JSON object of a list of user primary addresses, + with their new height and block hash. + * `msgpack-minimal-scanned:` A msgpack object of a list of user primary + addresses with their new height and block hash. ### `json-full-payment_hook`/`msgpack-full-payment_hook` @@ -95,3 +99,25 @@ PUB/SUB subscription model. The subscriber requests data from a certain "topic" where matching is done by string prefixes. > `index` is a counter used to detect dropped messages. + + +### `json-minimal-scanned`/`msgpack-minimal-scanned` +These topics receive PUB messages when a thread has finished scanning 1+ +accounts. The last block height and hash is sent. + +Example of the "raw" output from ZMQ-SUB side: +```json +json-minimal-scanned:{ + "index": 13, + "event": { + "height": 2438536, + "id": "9197e1c6f3de28a98dfc579325903e5416ef1ba2681043c54b5fff0d39645a7f", + "addresses": [ + "9xkhhJSa7ZhS5sAcTix6ozL14RwdgxbV7JZVFW4rCghN7GidutaykfxDHfgW45UPiCTXncuvZ91GNSGgxs3b2Cin9TU8nP3" + ] + } +} +``` + +> `index` is a counter used to detect dropped messages + diff --git a/src/db/fwd.h b/src/db/fwd.h index 3027823..583b240 100644 --- a/src/db/fwd.h +++ b/src/db/fwd.h @@ -31,6 +31,8 @@ namespace lws { + class account; + namespace db { enum account_flags : std::uint8_t; diff --git a/src/db/storage.h b/src/db/storage.h index a0a6937..f6d57ee 100644 --- a/src/db/storage.h +++ b/src/db/storage.h @@ -274,7 +274,7 @@ namespace db \param chain List of block hashes that `accts` were scanned against. \param accts Updated to `height + chain.size()` scan height. - \return True iff LMDB successfully committed the update. + \return Number of updated accounts, and a list of webhooks that triggered. */ expect>> update(block_id height, epee::span chain, epee::span accts, epee::span pow); diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 6b28d11..ec6a3cb 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -26,8 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp light_wallet.cpp rates.cpp) -set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h) +set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp light_wallet.cpp lws_pub.cpp rates.cpp) +set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h lws_pub.h rates.h) add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers}) target_include_directories(monero-lws-rpc PRIVATE ${RMQ_INCLUDE_DIR}) diff --git a/src/rpc/fwd.h b/src/rpc/fwd.h index 6c9c317..7713c3a 100644 --- a/src/rpc/fwd.h +++ b/src/rpc/fwd.h @@ -29,5 +29,9 @@ namespace lws { + namespace rpc + { + class client; + } struct rates; } diff --git a/src/rpc/lws_pub.cpp b/src/rpc/lws_pub.cpp new file mode 100644 index 0000000..cc81acc --- /dev/null +++ b/src/rpc/lws_pub.cpp @@ -0,0 +1,89 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "lws_pub.h" + +#include +#include "db/account.h" +#include "rpc/client.h" +#include "rpc/webhook.h" +#include "wire/crypto.h" +#include "wire/wrapper/array.h" +#include "wire/wrappers_impl.h" +#include "wire/write.h" + +namespace +{ + constexpr const char json_topic[] = "json-minimal-scanned:"; + constexpr const char msgpack_topic[] = "msgpack-minimal-scanned:"; + + struct get_address + { + std::reference_wrapper user; + }; + + void write_bytes(wire::writer& dest, const get_address& self) + { + wire_write::bytes(dest, self.user.get().address()); + } + + struct minimal_scanned + { + std::reference_wrapper id; + const lws::db::block_id height; + epee::span users; + }; + + void write_bytes(wire::writer& dest, const minimal_scanned& self) + { + const auto just_address = [] (const lws::account& user) + { + return get_address{std::cref(user)}; + }; + + wire::object(dest, + WIRE_FIELD_ID(0, height), + WIRE_FIELD_ID(1, id), + wire::field<2>("addresses", wire::array(self.users | boost::adaptors::transformed(just_address))) + ); + } +} // anonymous + +namespace lws { namespace rpc +{ + void publish_scanned(rpc::client& client, const crypto::hash& id, epee::span users) + { + if (users.empty()) + return; + + const minimal_scanned output{ + std::cref(id), users[0].scan_height(), users + }; + const epee::span event{std::addressof(output), 1}; + zmq_send(client, event, json_topic, msgpack_topic); + } +}} // lws // rpc diff --git a/src/rpc/lws_pub.h b/src/rpc/lws_pub.h new file mode 100644 index 0000000..fc66c03 --- /dev/null +++ b/src/rpc/lws_pub.h @@ -0,0 +1,38 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "db/fwd.h" +#include "rpc/fwd.h" +#include "crypto/hash.h" // monero/src +#include "span.h" + +namespace lws { namespace rpc +{ + void publish_scanned(rpc::client& client, const crypto::hash& id, epee::span users); +}} // lws // rpc diff --git a/src/rpc/webhook.h b/src/rpc/webhook.h index 6e1a44b..2888df8 100644 --- a/src/rpc/webhook.h +++ b/src/rpc/webhook.h @@ -1,3 +1,31 @@ +// Copyright (c) 2023-2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once #include #include diff --git a/src/scanner.cpp b/src/scanner.cpp index 7696770..151bd02 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -58,6 +58,7 @@ #include "rpc/daemon_messages.h" // monero/src #include "rpc/daemon_zmq.h" #include "rpc/json.h" +#include "rpc/lws_pub.h" #include "rpc/message_data_structs.h" // monero/src #include "rpc/webhook.h" #include "util/blocks.h" @@ -741,7 +742,7 @@ namespace lws if (untrusted_daemon) new_pow.push_back(db::pow_sync{fetched->blocks.front().block.timestamp}); - auto blocks = epee::to_span(fetched->blocks); + auto blocks = epee::to_mut_span(fetched->blocks); auto indices = epee::to_span(fetched->output_indices); if (fetched->start_height != 1) @@ -880,6 +881,7 @@ namespace lws MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)"); send_payment_hook(client, epee::to_span(updated->second), opts.webhook_verify); + if (updated->first != users.size()) { MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting"); @@ -888,6 +890,10 @@ namespace lws for (account& user : users) user.updated(db::block_id(fetched->start_height)); + + // Publish when all scan threads have past this block + if (!blockchain.empty() && client.has_publish()) + rpc::publish_scanned(client, blockchain.back(), epee::to_span(users)); } } catch (std::exception const& e) diff --git a/src/wire/write.h b/src/wire/write.h index 2756146..c53410b 100644 --- a/src/wire/write.h +++ b/src/wire/write.h @@ -193,7 +193,13 @@ namespace wire_write template inline std::size_t array_size_(std::true_type, const T& source) - { return boost::size(source); } + { + static_assert( + !std::is_same::iterator_category, std::input_iterator_tag>{}, + "Input iterators must use json (or similar) derived classes directly" + ); + return boost::size(source); + } template inline constexpr std::size_t array_size_(std::false_type, const T&) noexcept