diff --git a/src/db/data.cpp b/src/db/data.cpp index 5f7e188..89a35e8 100644 --- a/src/db/data.cpp +++ b/src/db/data.cpp @@ -34,6 +34,7 @@ #include "wire/json/write.h" #include "wire/msgpack.h" #include "wire/uuid.h" +#include "wire/wrapper/defaulted.h" namespace lws { @@ -126,9 +127,11 @@ namespace db const auto payment_id = payment_bytes.empty() ? nullptr : std::addressof(payment_bytes); + // defaulted will omit "id" and "block" when the output is in the + // txpool with no valid values. wire::object(dest, - wire::field<0>("id", std::cref(self.spend_meta.id)), - wire::field<1>("block", self.link.height), + wire::optional_field<0>("id", wire::defaulted(std::cref(self.spend_meta.id), output_id::txpool())), + wire::optional_field<1>("block", wire::defaulted(self.link.height, block_id::txpool)), wire::field<2>("index", self.spend_meta.index), wire::field<3>("amount", self.spend_meta.amount), wire::field<4>("timestamp", self.timestamp), diff --git a/src/db/data.h b/src/db/data.h index bf355ad..a8fdd76 100644 --- a/src/db/data.h +++ b/src/db/data.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -63,12 +64,19 @@ namespace db WIRE_AS_INTEGER(account_time); //! References a block height - enum class block_id : std::uint64_t {}; + enum class block_id : std::uint64_t + { + txpool = std::uint64_t(-1) //! Represents not-yet-a-block + }; WIRE_AS_INTEGER(block_id); //! References a global output number, as determined by the public chain struct output_id { + //! \return Special ID for outputs not yet in a block. + static constexpr output_id txpool() noexcept + { return {0, std::numeric_limits::max()}; } + std::uint64_t high; //!< Amount on public chain; rct outputs are `0` std::uint64_t low; //!< Offset within `amount` on the public chain }; diff --git a/src/db/storage.cpp b/src/db/storage.cpp index 5bf9438..0e7e356 100644 --- a/src/db/storage.cpp +++ b/src/db/storage.cpp @@ -749,6 +749,42 @@ namespace db return requests.get_value(value); } + expect> + storage_reader::find_webhook(webhook_key const& key, crypto::hash8 const& payment_id, cursor::webhooks cur) + { + MONERO_PRECOND(txn != nullptr); + assert(db != nullptr); + MONERO_CHECK(check_cursor(*txn, db->tables.webhooks, cur)); + + webhook_dupsort dup{}; + + static_assert(sizeof(dup.payment_id) == sizeof(payment_id), "bad memcpy"); + std::memcpy(std::addressof(dup.payment_id), std::addressof(payment_id), sizeof(payment_id)); + + MDB_val lkey = lmdb::to_val(key); + MDB_val lvalue = lmdb::to_val(dup); + + std::vector result{}; + int err = mdb_cursor_get(cur.get(), &lkey, &lvalue, MDB_GET_BOTH_RANGE); + for (;;) + { + if (err) + { + if (err == MDB_NOTFOUND) + break; + return {lmdb::error(err)}; + } + + if (webhooks.get_fixed_value(lvalue) != dup.payment_id) + break; + + result.push_back(MONERO_UNWRAP(webhooks.get_value(lvalue))); + err = mdb_cursor_get(cur.get(), &lkey, &lvalue, MDB_NEXT_DUP); + } + + return result; + } + expect>>> storage_reader::get_webhooks(cursor::webhooks cur) { diff --git a/src/db/storage.h b/src/db/storage.h index 87a22ce..a6c00cd 100644 --- a/src/db/storage.h +++ b/src/db/storage.h @@ -133,6 +133,10 @@ namespace db expect get_request(request type, account_address const& address, cursor::requests cur = nullptr) noexcept; + //! \return All webhook values associated with user `key` and `payment_id`. + expect> + find_webhook(webhook_key const& key, crypto::hash8 const& payment_id, cursor::webhooks cur = nullptr); + //! \return All webhooks in the DB expect>>> get_webhooks(cursor::webhooks cur = nullptr); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 4ce7048..14b5b41 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -51,6 +51,7 @@ namespace rpc constexpr const char abort_scan_signal[] = "SCAN"; constexpr const char abort_process_signal[] = "PROCESS"; constexpr const char minimal_chain_topic[] = "json-minimal-chain_main"; + constexpr const char full_txpool_topic[] = "json-full-txpool_add"; constexpr const int daemon_zmq_linger = 0; constexpr const std::chrono::seconds chain_poll_timeout{20}; constexpr const std::chrono::minutes chain_sub_timeout{2}; @@ -225,10 +226,9 @@ namespace rpc if (out.daemon_sub.get() == nullptr) return net::zmq::get_error_code(); - option = 1; // keep only last pub message from daemon MONERO_ZMQ_CHECK(zmq_connect(out.daemon_sub.get(), out.ctx->sub_addr.c_str())); - MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon_sub.get(), ZMQ_CONFLATE, &option, sizeof(option))); MONERO_CHECK(do_subscribe(out.daemon_sub.get(), minimal_chain_topic)); + MONERO_CHECK(do_subscribe(out.daemon_sub.get(), full_txpool_topic)); } out.signal_sub.reset(zmq_socket(out.ctx->comm.get(), ZMQ_SUB)); @@ -250,7 +250,7 @@ namespace rpc return do_subscribe(signal_sub.get(), abort_scan_signal); } - expect client::wait_for_block() + expect>> client::wait_for_block() { MONERO_PRECOND(ctx != nullptr); assert(daemon != nullptr); @@ -271,14 +271,45 @@ namespace rpc return ready.error(); } } - expect pub = net::zmq::receive(daemon_sub.get(), ZMQ_DONTWAIT); - if (!pub) - return pub.error(); - if (!boost::string_ref{*pub}.starts_with(minimal_chain_topic)) - return {lws::error::bad_daemon_response}; - pub->erase(0, sizeof(minimal_chain_topic)); - return minimal_chain_pub::from_json(std::move(*pub)); + std::vector> messages{}; + for (; /*every message */ ;) + { + expect pub = net::zmq::receive(daemon_sub.get(), ZMQ_DONTWAIT); + if (!pub) + { + if (pub == net::zmq::make_error_code(EAGAIN)) + return {std::move(messages)}; + return pub.error(); + } + if (pub->size() < 5) + break; // for loop + + switch (pub->at(5)) + { + case 'm': // json-minimal-chain_main + if (boost::string_ref{*pub}.starts_with(minimal_chain_topic)) + { + pub->erase(0, sizeof(minimal_chain_topic)); + messages.emplace_back(topic::block, std::move(*pub)); + } + else + MWARNING("Unexpected pub/sub message"); + break; + case 'f': // json-full-txpool_add + if (boost::string_ref{*pub}.starts_with(full_txpool_topic)) + { + pub->erase(0, sizeof(full_txpool_topic)); + messages.emplace_back(topic::txpool, std::move(*pub)); + } + else + MWARNING("Unexpected pub/sub message"); + break; + default: + break; + } + } // for every message + return {lws::error::bad_daemon_response}; } expect client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept diff --git a/src/rpc/client.h b/src/rpc/client.h index 08d37a0..06db02f 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -75,6 +75,12 @@ namespace rpc expect get_response(cryptonote::rpc::Message& response, std::chrono::seconds timeout, source_location loc); public: + + enum class topic : std::uint8_t + { + block = 0, txpool + }; + //! A client with no connection (all send/receive functions fail). explicit client() noexcept : ctx(), daemon(), daemon_sub(), signal_sub() @@ -110,7 +116,7 @@ namespace rpc expect watch_scan_signals() noexcept; //! Wait for new block announce or internal timeout. - expect wait_for_block(); + expect>> wait_for_block(); //! \return A JSON message for RPC request `M`. template diff --git a/src/rpc/daemon_pub.cpp b/src/rpc/daemon_pub.cpp index 1bb343f..944d3c7 100644 --- a/src/rpc/daemon_pub.cpp +++ b/src/rpc/daemon_pub.cpp @@ -27,6 +27,8 @@ #include "daemon_pub.h" +#include "cryptonote_basic/cryptonote_basic.h" // monero/src +#include "rpc/daemon_zmq.h" #include "wire/crypto.h" #include "wire/error.h" #include "wire/field.h" @@ -83,5 +85,19 @@ namespace rpc return err; return {std::move(out)}; } + + static void read_bytes(wire::json_reader& source, full_txpool_pub& self) + { + wire_read::array(source, self.txes); + } + + expect full_txpool_pub::from_json(std::string&& source) + { + full_txpool_pub out{}; + std::error_code err = wire::json::from_bytes(std::move(source), out); + if (err) + return err; + return {std::move(out)}; + } } } diff --git a/src/rpc/daemon_pub.h b/src/rpc/daemon_pub.h index 60ed33d..6f31372 100644 --- a/src/rpc/daemon_pub.h +++ b/src/rpc/daemon_pub.h @@ -34,6 +34,7 @@ #include "crypto/hash.h" // monero/src #include "wire/json/fwd.h" +namespace cryptonote { class transaction; } namespace lws { namespace rpc @@ -46,5 +47,12 @@ namespace rpc static expect from_json(std::string&&); }; + + struct full_txpool_pub + { + std::vector txes; + + static expect from_json(std::string&&); + }; } } diff --git a/src/rpc/daemon_zmq.cpp b/src/rpc/daemon_zmq.cpp index 5ef55c8..1e62d5f 100644 --- a/src/rpc/daemon_zmq.cpp +++ b/src/rpc/daemon_zmq.cpp @@ -42,6 +42,7 @@ namespace constexpr const std::size_t default_inputs = 2; constexpr const std::size_t default_outputs = 4; constexpr const std::size_t default_txextra_size = 2048; + constexpr const std::size_t default_txpool_size = 32; } namespace rct @@ -141,7 +142,7 @@ namespace cryptonote ); } - static void read_bytes(wire::json_reader& source, transaction& self) + void read_bytes(wire::json_reader& source, transaction& self) { self.vin.reserve(default_inputs); self.vout.reserve(default_outputs); @@ -177,6 +178,11 @@ namespace cryptonote self.transactions.reserve(default_transaction_count); wire::object(source, WIRE_FIELD(block), WIRE_FIELD(transactions)); } + + static void read_bytes(wire::json_reader& source, tx_in_pool& self) + { + wire::object(source, WIRE_FIELD(tx), WIRE_FIELD(tx_hash)); + } } // rpc } // cryptonote @@ -187,3 +193,8 @@ void lws::rpc::read_bytes(wire::json_reader& source, get_blocks_fast_response& s wire::object(source, WIRE_FIELD(blocks), WIRE_FIELD(output_indices), WIRE_FIELD(start_height), WIRE_FIELD(current_height)); } +void lws::rpc::read_bytes(wire::json_reader& source, get_transaction_pool_response& self) +{ + self.transactions.reserve(default_txpool_size); + wire::object(source, WIRE_FIELD(transactions)); +} diff --git a/src/rpc/daemon_zmq.h b/src/rpc/daemon_zmq.h index 3f694e1..3bd3896 100644 --- a/src/rpc/daemon_zmq.h +++ b/src/rpc/daemon_zmq.h @@ -40,9 +40,13 @@ namespace crypto namespace cryptonote { + class transaction; + void read_bytes(wire::json_reader& source, transaction& self); + namespace rpc { struct block_with_transactions; + struct tx_in_pool; } } @@ -71,5 +75,21 @@ namespace rpc using response = get_blocks_fast_response; }; void read_bytes(wire::json_reader&, get_blocks_fast_response&); + + struct get_transaction_pool_request + { + get_transaction_pool_request() = delete; + }; + struct get_transaction_pool_response + { + get_transaction_pool_response() = delete; + std::vector transactions; + }; + struct get_transaction_pool + { + using request = get_transaction_pool_request; + using response = get_transaction_pool_response; + }; + void read_bytes(wire::json_reader&, get_transaction_pool_response&); } // rpc } // lws diff --git a/src/scanner.cpp b/src/scanner.cpp index 5cf0391..a8b4492 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -35,6 +35,8 @@ #include #include #include +#include +#include #include #include #include @@ -50,7 +52,8 @@ #include "misc_log_ex.h" // monero/contrib/epee/include #include "net/http_client.h" #include "net/net_parse_helpers.h" -#include "rpc/daemon_messages.h" // monero/src +#include "rpc/daemon_messages.h" // monero/src +#include "rpc/message_data_structs.h" // monero/src #include "rpc/daemon_zmq.h" #include "rpc/json.h" #include "util/source_location.h" @@ -119,9 +122,16 @@ namespace lws } } - bool is_new_block(db::storage& disk, const account& user, const rpc::minimal_chain_pub& chain) + bool is_new_block(std::string&& chain_msg, db::storage& disk, const account& user) { - if (user.scan_height() < db::block_id(chain.top_block_height)) + const auto chain = rpc::minimal_chain_pub::from_json(std::move(chain_msg)); + if (!chain) + { + MERROR("Unable to parse blockchain notification: " << chain.error()); + return false; + } + + if (user.scan_height() < db::block_id(chain->top_block_height)) return true; auto reader = disk.start_read(); @@ -132,8 +142,8 @@ namespace lws } // check possible chain rollback daemon side - const expect id = reader->get_block_hash(db::block_id(chain.top_block_height)); - if (!id || *id != chain.top_block_id) + const expect id = reader->get_block_hash(db::block_id(chain->top_block_height)); + if (!id || *id != chain->top_block_id) return true; // check possible chain rollback from other thread @@ -241,13 +251,103 @@ namespace lws } }; - void scan_transaction( + struct add_spend + { + void operator()(lws::account& user, const db::spend& spend) const + { user.add_spend(spend); } + }; + struct add_output + { + bool operator()(lws::account& user, const db::output& out) const + { return user.add_out(out); } + }; + + struct null_spend + { + void operator()(lws::account&, const db::spend&) const noexcept + {} + }; + struct send_webhook + { + db::storage const& disk_; + rpc::client& client_; + net::ssl_verification_t verify_mode_; + std::unordered_map txpool_; + + bool operator()(lws::account& user, const db::output& out) + { + /* Upstream monerod does not send all fields for a transaction, so + mempool notifications cannot compute tx_hash correctly (it is not + sent separately, a further blunder). Instead, if there are matching + outputs with webhooks, fetch mempool to compare tx_prefix_hash and + then use corresponding tx_hash. */ + const db::webhook_key key{user.id(), db::webhook_type::tx_confirmation}; + std::vector hooks{}; + { + auto reader = disk_.start_read(); + if (!reader) + { + MERROR("Unable to lookup webhook on tx in pool: " << reader.error().message()); + return false; + } + auto found = reader->find_webhook(key, out.payment_id.short_); + if (!found) + { + MERROR("Failed db lookup for webhooks: " << found.error().message()); + return false; + } + hooks = std::move(*found); + } + + if (!hooks.empty() && txpool_.empty()) + { + cryptonote::rpc::GetTransactionPool::Request req{}; + if (!send(client_, rpc::client::make_message("get_transaction_pool", req))) + { + MERROR("Unable to compute tx hash for webhook, aborting"); + return false; + } + auto resp = client_.get_message(std::chrono::seconds{3}); + if (!resp) + { + MERROR("Unable to get txpool: " << resp.error().message()); + return false; + } + + rpc::json::response txpool{}; + const std::error_code err = wire::json::from_bytes(std::move(*resp), txpool); + if (err) + MONERO_THROW(err, "Invalid json-rpc"); + for (auto& tx : txpool.result.transactions) + txpool_.emplace(get_transaction_prefix_hash(tx.tx), tx.tx_hash); + } + + std::vector events{}; + for (auto& hook : hooks) + { + events.push_back(db::webhook_tx_confirmation{key, std::move(hook), out}); + events.back().value.second.confirmations = 0; + + const auto hash = txpool_.find(out.tx_prefix_hash); + if (hash != txpool_.end()) + events.back().tx_info.link.tx_hash = hash->second; + else + events.pop_back(); //cannot compute tx_hash + } + send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_); + return true; + } + }; + + void scan_transaction_base( epee::span users, const db::block_id height, const std::uint64_t timestamp, crypto::hash const& tx_hash, cryptonote::transaction const& tx, - std::vector const& out_ids) + std::vector const& out_ids, + std::function spend_action, + std::function output_action) { if (2 < tx.version) throw std::runtime_error{"Unsupported tx version"}; @@ -302,7 +402,8 @@ namespace lws goffset += offset; if (user.has_spendable(db::output_id{in_data->amount, goffset})) { - user.add_spend( + spend_action( + user, db::spend{ db::transaction_link{height, tx_hash}, in_data->k_image, @@ -377,7 +478,8 @@ namespace lws } } - const bool added = user.add_out( + const bool added = output_action( + user, db::output{ db::transaction_link{height, tx_hash}, db::output::spend_meta_{ @@ -405,6 +507,39 @@ namespace lws } // for all users } + void scan_transaction( + epee::span users, + const db::block_id height, + const std::uint64_t timestamp, + crypto::hash const& tx_hash, + cryptonote::transaction const& tx, + std::vector const& out_ids) + { + scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, add_spend{}, add_output{}); + } + + void scan_transactions(std::string&& txpool_msg, epee::span users, db::storage const& disk, rpc::client& client, const net::ssl_verification_t verify_mode) + { + // uint64::max is for txpool + static const std::vector fake_outs( + 256, std::numeric_limits::max() + ); + + const auto parsed = rpc::full_txpool_pub::from_json(std::move(txpool_msg)); + if (!parsed) + { + MERROR("Failed parsing txpool pub: " << parsed.error().message()); + return; + } + + const auto time = + boost::numeric_cast(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())); + + send_webhook sender{disk, client, verify_mode}; + for (const auto& tx : parsed->txes) + scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, null_spend{}, sender); + } + void update_rates(rpc::context& ctx) { const expect> new_rates = ctx.retrieve_rates(); @@ -489,20 +624,40 @@ namespace lws if (fetched.result.blocks.size() <= 1) { // synced to top of chain, wait for next blocks - for (;;) + for (bool wait_for_block = true; wait_for_block; ) { - const expect new_block = client.wait_for_block(); - if (new_block.matches(std::errc::interrupted)) - return; - if (!new_block || is_new_block(disk, users.front(), *new_block)) - break; - } + expect>> new_pubs = client.wait_for_block(); + if (new_pubs.matches(std::errc::interrupted)) + return; // reset entire state (maybe shutdown) + + if (!new_pubs) + break; // exit wait for block loop, and try fetching new blocks + + // put txpool messages before block messages + static_assert(rpc::client::topic::block < rpc::client::topic::txpool, "bad sort"); + std::sort(new_pubs->begin(), new_pubs->end(), std::greater<>{}); + + // process txpool first + auto message = new_pubs->begin(); + for ( ; message != new_pubs->end(); ++message) + { + if (message->first != rpc::client::topic::txpool) + break; // inner for loop + scan_transactions(std::move(message->second), epee::to_mut_span(users), disk, client, webhook_verify); + } + + for ( ; message != new_pubs->end(); ++message) + { + if (message->first == rpc::client::topic::block && is_new_block(std::move(message->second), disk, users.front())) + wait_for_block = false; + } + } // wait for block // request next chunk of blocks if (!send(client, block_request.clone())) return; continue; // to next get_blocks_fast read - } + } // if only one block was fetched // request next chunk of blocks if (!send(client, block_request.clone())) diff --git a/src/wire/wrapper/defaulted.h b/src/wire/wrapper/defaulted.h new file mode 100644 index 0000000..f3529d7 --- /dev/null +++ b/src/wire/wrapper/defaulted.h @@ -0,0 +1,78 @@ +// Copyright (c) 2021-2023, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "wire/field.h" +#include "wire/traits.h" + +//! An optional field that is omitted when a default value is used +#define WIRE_FIELD_DEFAULTED(name, default_) \ + ::wire::optional_field( #name , ::wire::defaulted(std::ref( self . name ), default_ )) + +namespace wire +{ + /*! A wrapper that tells `wire::writer`s to skip field generation when default + value, and tells `wire::reader`s to use default value when field not present. */ + template + struct defaulted_ + { + using value_type = typename unwrap_reference::type; + + T value; + U default_; + + constexpr const value_type& get_value() const noexcept { return value; } + value_type& get_value() noexcept { return value; } + + // concept requirements for optional fields + + constexpr explicit operator bool() const { return get_value() != default_; } + value_type& emplace() noexcept { return get_value(); } + + constexpr const value_type& operator*() const noexcept { return get_value(); } + value_type& operator*() noexcept { return get_value(); } + + void reset() { get_value() = default_; } + }; + + //! Links `value` with `default_`. + template + inline constexpr defaulted_ defaulted(T value, U default_) + { + return {std::move(value), std::move(default_)}; + } + + /* read/write functions not needed since `defaulted_` meets the concept + requirements for an optional type (optional fields are handled + directly by the generic read/write code because the field name is omitted + entirely when the value is "empty"). */ +} // wire +