diff --git a/docs/zmq.md b/docs/zmq.md index 9d6b1e0..4c22a7c 100644 --- a/docs/zmq.md +++ b/docs/zmq.md @@ -19,6 +19,10 @@ option. Users are still required to "subscribe" to topics: with their new height and block hash. * `msgpack-minimal-scanned:` A msgpack object of a list of user primary addresses with their new height and block hash. + * `json-full-spend_hook': A JSON object of a webhook spend event that has + recently triggerd (identical output as webhook). + * `msgpack-full-spend_hook`: A msgpack object of a single new account + creation that has recently triggered (identical output as webhook). ### `json-full-payment_hook`/`msgpack-full-payment_hook` @@ -100,7 +104,6 @@ where matching is done by string prefixes. > `index` is a counter used to detect dropped messages. - ### `json-minimal-scanned`/`msgpack-minimal-scanned` These topics receive PUB messages when a thread has finished scanning 1+ accounts. The last block height and hash is sent. @@ -115,9 +118,61 @@ json-minimal-scanned:{ "addresses": [ "9xkhhJSa7ZhS5sAcTix6ozL14RwdgxbV7JZVFW4rCghN7GidutaykfxDHfgW45UPiCTXncuvZ91GNSGgxs3b2Cin9TU8nP3" ] + +> `index` is a counter used to detect dropped messages. + +### `json-full-spend_hook`/`msgpack-full-spend_hook` +These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)), +event is triggered for a spend (`tx-spend`). If the specified URL is +`zmq`, then notifications are only done over the ZMQ-PUB socket, otherwise the +notification is sent over ZMQ-PUB socket AND the specified URL. Invoking +`webhook_add` with a `payment_id` or `confirmation` results in a NOP because +both fields are unused for spends. This event is only triggered on +confirmation==1 (`confirmation` field on `webhook_add`s have no effect, and +mempool spends are not scanned). The intent is to notify the user of unexpected +spend operations. The end user will need to use `tx_info.input.image`, +`tx_info.source.index`, and `tx_info.source.tx_public` to determine if the +output was actually spent or being used as a decoy. + +Example of the "raw" output from ZMQ-SUB side: + +```json +json-full-spend_hook:{ + "index": 0, + "event": { + "event": "tx-spend", + "token": "spend-xmr", + "event_id": "7ff047aa74e14f4aa978469bc0eec8ec", + "tx_info": { + "input": { + "height": 2464207, + "tx_hash": "97d4e66c4968b16fec7662adc9f8562c49108d3c5e7030c4d6dd32d97fb62540", + "image": "b0fe7acd9e17bb8b9ac2daae36d4cb607ac60ed8a101cc9b2e1f74016cf80b24", + "source": { + "high": 0, + "low": 6246316 + }, + "timestamp": 1711902214, + "unlock_time": 0, + "mixin_count": 15, + "sender": { + "maj_i": 0, + "min_i": 0 + } + }, + "source": { + "id": { + "high": 0, + "low": 6246316 + }, + "amount": 10000000000, + "mixin": 15, + "index": 0, + "tx_public": "426ccd6d39535a1ee8636d14978581e580fcea35c8d3843ceb32eb688a0197f7" + } + } } } ``` > `index` is a counter used to detect dropped messages - diff --git a/src/db/data.cpp b/src/db/data.cpp index 21cc81f..08c9c3a 100644 --- a/src/db/data.cpp +++ b/src/db/data.cpp @@ -359,7 +359,7 @@ namespace db namespace { - constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account"}; + constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account", "tx-spend"}; template<typename F, typename T> void map_webhook_key(F& format, T& self) @@ -423,6 +423,32 @@ namespace db ); } + static void write_bytes(wire::writer& dest, const output::spend_meta_& self) + { + wire::object(dest, + WIRE_FIELD_ID(0, id), + wire::field<1>("amount", self.amount), + wire::field<2>("mixin", self.mixin_count), + wire::field<3>("index", self.index), + WIRE_FIELD_ID(4, tx_public) + ); + } + + static void write_bytes(wire::writer& dest, const webhook_tx_spend::tx_info_& self) + { + wire::object(dest, WIRE_FIELD_ID(0, input), WIRE_FIELD_ID(1, source)); + } + + void write_bytes(wire::writer& dest, const webhook_tx_spend& self) + { + wire::object(dest, + wire::field<0>("event", std::cref(self.key.type)), + wire::field<1>("token", std::cref(self.value.second.token)), + wire::field<2>("event_id", std::cref(self.value.first.event_id)), + WIRE_FIELD_ID(3, tx_info) + ); + } + void write_bytes(wire::json_writer& dest, const webhook_event& self) { crypto::hash8 payment_id; diff --git a/src/db/data.h b/src/db/data.h index 10aca85..b25cfcc 100644 --- a/src/db/data.h +++ b/src/db/data.h @@ -335,7 +335,8 @@ namespace db enum class webhook_type : std::uint8_t { tx_confirmation = 0, // cannot change values - stored in DB - new_account + new_account, + tx_spend // unconfirmed_tx, // new_block // confirmed_tx, @@ -384,6 +385,19 @@ namespace db }; void write_bytes(wire::writer&, const webhook_tx_confirmation&); + //! Returned by DB when a webhook event "tripped" + struct webhook_tx_spend + { + webhook_key key; + webhook_value value; + struct tx_info_ + { + spend input; + output::spend_meta_ source; + } tx_info; + }; + void write_bytes(wire::writer&, const webhook_tx_spend&); + //! References a specific output that triggered a webhook struct webhook_output { diff --git a/src/db/storage.cpp b/src/db/storage.cpp index 8b2cdf1..6a30a9a 100644 --- a/src/db/storage.cpp +++ b/src/db/storage.cpp @@ -2573,18 +2573,69 @@ namespace db } return success(); } + + expect<void> check_spends(std::vector<webhook_tx_spend>& out, MDB_cursor& webhooks_cur, MDB_cursor& outputs_cur, const lws::account& user) + { + const account_id user_id = user.id(); + const webhook_key hook_key{user_id, webhook_type::tx_spend}; + MDB_val key = lmdb::to_val(hook_key); + MDB_val value{}; + + // Find a tx_spend for user id + int err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_SET_KEY); + for (;;) + { + if (err) + { + if (err != MDB_NOTFOUND) + return {lmdb::error(err)}; + break; + } + + const auto hook = webhooks.get_value(value); + if (hook) + { + out.reserve(user.spends().size()); + for (const spend& s : user.spends()) + { + key = lmdb::to_val(user_id); + value = lmdb::to_val(s.link.height); + err = mdb_cursor_get(&outputs_cur, &key, &value, MDB_GET_BOTH_RANGE); + + expect<output::spend_meta_> meta{common_error::kInvalidArgument}; + for (;;) + { + if (err) + return {lmdb::error(err)}; + meta = outputs.get_value<MONERO_FIELD(output, spend_meta)>(value); + if (!meta) + return meta.error(); + if (meta->id == s.source) + break; + err = mdb_cursor_get(&outputs_cur, &key, &value, MDB_PREV_DUP); + } + + out.push_back( + webhook_tx_spend{hook_key, *hook, {s, *meta}} + ); + } + } + err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_NEXT_DUP); + } // every hook_key + return success(); + } } // anonymous - expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow) + expect<storage::updated> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow) { if (users.empty() && chain.empty()) - return {std::make_pair(0, std::vector<webhook_tx_confirmation>{})}; + return {updated{}}; MONERO_PRECOND(!chain.empty()); MONERO_PRECOND(db != nullptr); if (!pow.empty()) MONERO_PRECOND(chain.size() == pow.size()); - return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> + return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<updated> { epee::span<const crypto::hash> chain_copy{chain}; epee::span<const pow_sync> pow_copy{pow}; @@ -2593,7 +2644,7 @@ namespace db const std::uint64_t first_new = lmdb::to_native(height) + 1; // collect all .value() errors - std::pair<std::size_t, std::vector<webhook_tx_confirmation>> updated; + updated out{}; if (get_checkpoints().get_max_height() <= last_update) { cursor::blocks blocks_cur; @@ -2652,7 +2703,7 @@ namespace db const auto cur_block = blocks.get_value<block_info>(value); if (!cur_block) return cur_block.error(); - // If a reorg past a checkpoint is being attempted + // If a reorg past a checkpoint is being attempted if (chain[chain.size() - 1] != cur_block->hash) return {error::bad_blockchain}; @@ -2743,13 +2794,14 @@ namespace db MONERO_CHECK(check_hooks(*webhooks_cur, *events_cur, *user)); MONERO_CHECK( add_ongoing_hooks( - updated.second, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1) + out.confirm_pubs, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1) ) ); + MONERO_CHECK(check_spends(out.spend_pubs, *webhooks_cur, *outputs_cur, *user)); - ++updated.first; + ++out.accounts_updated; } // ... for every account being updated ... - return {std::move(updated)}; + return {std::move(out)}; }); } @@ -2954,7 +3006,7 @@ namespace db key.user = MONERO_UNWRAP(accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup.id)>(lmvalue)); } - if (key.user == account_id::invalid && type == webhook_type::tx_confirmation) + if (key.user == account_id::invalid && (type == webhook_type::tx_confirmation || type == webhook_type::tx_spend)) return {error::bad_webhook}; lmkey = lmdb::to_val(key); diff --git a/src/db/storage.h b/src/db/storage.h index f6d57ee..46961b1 100644 --- a/src/db/storage.h +++ b/src/db/storage.h @@ -264,6 +264,13 @@ namespace db expect<std::vector<account_address>> reject_requests(request req, epee::span<const account_address> addresses); + //! Status of an `update` request + struct updated + { + std::vector<webhook_tx_spend> spend_pubs; + std::vector<webhook_tx_confirmation> confirm_pubs; + std::size_t accounts_updated; + }; /*! Updates the status of user accounts, even if inactive or hidden. Duplicate receives or spends provided in `accts` are silently ignored. If a gap in @@ -274,15 +281,15 @@ namespace db \param chain List of block hashes that `accts` were scanned against. \param accts Updated to `height + chain.size()` scan height. - \return Number of updated accounts, and a list of webhooks that triggered. + \return Status via `updated` object. */ - expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> + expect<updated> update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> accts, epee::span<const pow_sync> pow); /*! Adds subaddresses to an account. Upon success, an account will immediately begin tracking them in the scanner. - + \param id of the account to associate new indexes \param addresss of the account (needed to generate subaddress publc key) \param view_key of the account (needed to generate subaddress public key) @@ -300,7 +307,7 @@ namespace db /*! Add webhook to be tracked in the database. The webhook will "call" the specified URL with JSON/msgpack information when the event occurs. - + \param type The webhook event type to be tracked by the DB. \param address is required for `type == tx_confirmation`, and is not not needed for all other types. diff --git a/src/rpc/admin.cpp b/src/rpc/admin.cpp index cc05c95..0b42316 100644 --- a/src/rpc/admin.cpp +++ b/src/rpc/admin.cpp @@ -320,6 +320,10 @@ namespace lws { namespace rpc if (req.address) return {error::bad_webhook}; break; + case db::webhook_type::tx_spend: + if (!req.address) + return {error::bad_webhook}; + break; default: return {error::bad_webhook}; } diff --git a/src/scanner.cpp b/src/scanner.cpp index 151bd02..d136127 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -217,6 +217,11 @@ namespace lws vec.erase(vec.begin()); }; + void send_spend_hook(rpc::client& client, const epee::span<const db::webhook_tx_spend> events, net::ssl_verification_t verify_mode) + { + rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode); + } + struct by_height { bool operator()(account const& left, account const& right) const noexcept @@ -880,11 +885,11 @@ namespace lws } MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)"); - send_payment_hook(client, epee::to_span(updated->second), opts.webhook_verify); - - if (updated->first != users.size()) + send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify); + send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify); + if (updated->accounts_updated != users.size()) { - MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting"); + MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting"); return; } diff --git a/tests/unit/db/webhook.test.cpp b/tests/unit/db/webhook.test.cpp index 38424b3..dc90f04 100644 --- a/tests/unit/db/webhook.test.cpp +++ b/tests/unit/db/webhook.test.cpp @@ -64,6 +64,26 @@ namespace } ); } + void add_spend(lws::account& account, const lws::db::block_id last_id) + { + account.add_spend( + lws::db::spend{ + lws::db::transaction_link{ + lws::db::block_id(lmdb::to_native(last_id) + 1), + crypto::rand<crypto::hash>() + }, + crypto::rand<crypto::key_image>(), + lws::db::output_id{0, 100}, + std::uint64_t(2000), + std::uint64_t(0), + std::uint32_t(16), + {0, 0, 0}, + std::uint8_t(0), + crypto::rand<crypto::hash>(), + lws::db::address_index{lws::db::major_index(1), lws::db::minor_index(0)} + } + ); + } } LWS_CASE("db::storage::*_webhook") @@ -143,25 +163,26 @@ LWS_CASE("db::storage::*_webhook") auto updated = db.update(head.id, chain, {std::addressof(full_account), 1}, nullptr); EXPECT(!updated.has_error()); - EXPECT(updated->first == 1); + EXPECT(updated->spend_pubs.empty()); + EXPECT(updated->accounts_updated == 1); if (i < 3) { - EXPECT(updated->second.size() == 1); - EXPECT(updated->second[0].key.user == lws::db::account_id(1)); - EXPECT(updated->second[0].key.type == lws::db::webhook_type::tx_confirmation); - EXPECT(updated->second[0].value.first.payment_id == 500); - EXPECT(updated->second[0].value.first.event_id == id); - EXPECT(updated->second[0].value.second.url == "http://the_url"); - EXPECT(updated->second[0].value.second.token == "the_token"); - EXPECT(updated->second[0].value.second.confirmations == i + 1); + EXPECT(updated->confirm_pubs.size() == 1); + EXPECT(updated->confirm_pubs[0].key.user == lws::db::account_id(1)); + EXPECT(updated->confirm_pubs[0].key.type == lws::db::webhook_type::tx_confirmation); + EXPECT(updated->confirm_pubs[0].value.first.payment_id == 500); + EXPECT(updated->confirm_pubs[0].value.first.event_id == id); + EXPECT(updated->confirm_pubs[0].value.second.url == "http://the_url"); + EXPECT(updated->confirm_pubs[0].value.second.token == "the_token"); + EXPECT(updated->confirm_pubs[0].value.second.confirmations == i + 1); - EXPECT(updated->second[0].tx_info.link == outs[0].link); - EXPECT(updated->second[0].tx_info.spend_meta.id == outs[0].spend_meta.id); - EXPECT(updated->second[0].tx_info.pub == outs[0].pub); - EXPECT(updated->second[0].tx_info.payment_id.short_ == outs[0].payment_id.short_); + EXPECT(updated->confirm_pubs[0].tx_info.link == outs[0].link); + EXPECT(updated->confirm_pubs[0].tx_info.spend_meta.id == outs[0].spend_meta.id); + EXPECT(updated->confirm_pubs[0].tx_info.pub == outs[0].pub); + EXPECT(updated->confirm_pubs[0].tx_info.payment_id.short_ == outs[0].payment_id.short_); } else - EXPECT(updated->second.empty()); + EXPECT(updated->confirm_pubs.empty()); full_account.updated(head.id); head = {lws::db::block_id(lmdb::to_native(head.id) + 1), chain[1]}; @@ -187,24 +208,107 @@ LWS_CASE("db::storage::*_webhook") const auto updated = db.update(last_block.id, chain, {std::addressof(full_account), 1}, nullptr); EXPECT(!updated.has_error()); - EXPECT(updated->first == 1); - EXPECT(updated->second.size() == 3); + EXPECT(updated->spend_pubs.empty()); + EXPECT(updated->accounts_updated == 1); + EXPECT(updated->confirm_pubs.size() == 3); for (unsigned i = 0; i < 3; ++i) { - EXPECT(updated->second[i].key.user == lws::db::account_id(1)); - EXPECT(updated->second[i].key.type == lws::db::webhook_type::tx_confirmation); - EXPECT(updated->second[i].value.first.payment_id == 500); - EXPECT(updated->second[i].value.first.event_id == id); - EXPECT(updated->second[i].value.second.url == "http://the_url"); - EXPECT(updated->second[i].value.second.token == "the_token"); - EXPECT(updated->second[i].value.second.confirmations == i + 1); + EXPECT(updated->confirm_pubs[i].key.user == lws::db::account_id(1)); + EXPECT(updated->confirm_pubs[i].key.type == lws::db::webhook_type::tx_confirmation); + EXPECT(updated->confirm_pubs[i].value.first.payment_id == 500); + EXPECT(updated->confirm_pubs[i].value.first.event_id == id); + EXPECT(updated->confirm_pubs[i].value.second.url == "http://the_url"); + EXPECT(updated->confirm_pubs[i].value.second.token == "the_token"); + EXPECT(updated->confirm_pubs[i].value.second.confirmations == i + 1); - EXPECT(updated->second[i].tx_info.link == outs[0].link); - EXPECT(updated->second[i].tx_info.spend_meta.id == outs[0].spend_meta.id); - EXPECT(updated->second[i].tx_info.pub == outs[0].pub); - EXPECT(updated->second[i].tx_info.payment_id.short_ == outs[0].payment_id.short_); + EXPECT(updated->confirm_pubs[i].tx_info.link == outs[0].link); + EXPECT(updated->confirm_pubs[i].tx_info.spend_meta.id == outs[0].spend_meta.id); + EXPECT(updated->confirm_pubs[i].tx_info.pub == outs[0].pub); + EXPECT(updated->confirm_pubs[i].tx_info.payment_id.short_ == outs[0].payment_id.short_); } } + SECTION("Add db spend") + { + const boost::uuids::uuid other_id = boost::uuids::random_generator{}(); + { + lws::db::webhook_value value{ + lws::db::webhook_dupsort{0, other_id}, + lws::db::webhook_data{"http://the_url_spend", "the_token_spend"} + }; + MONERO_UNWRAP( + db.add_webhook(lws::db::webhook_type::tx_spend, account, std::move(value)) + ); + } + SECTION("storage::get_webhooks()") + { + lws::db::storage_reader reader = MONERO_UNWRAP(db.start_read()); + const auto result = MONERO_UNWRAP(reader.get_webhooks()); + EXPECT(result.size() == 2); + EXPECT(result[0].first.user == lws::db::account_id(1)); + EXPECT(result[0].first.type == lws::db::webhook_type::tx_confirmation); + EXPECT(result[0].second.size() == 1); + EXPECT(result[0].second[0].first.payment_id == 500); + EXPECT(result[0].second[0].first.event_id == id); + EXPECT(result[0].second[0].second.url == "http://the_url"); + EXPECT(result[0].second[0].second.token == "the_token"); + EXPECT(result[0].second[0].second.confirmations == 3); + + EXPECT(result[1].first.user == lws::db::account_id(1)); + EXPECT(result[1].first.type == lws::db::webhook_type::tx_spend); + EXPECT(result[1].second.size() == 1); + EXPECT(result[1].second[0].first.payment_id == 0); + EXPECT(result[1].second[0].first.event_id == other_id); + EXPECT(result[1].second[0].second.url == "http://the_url_spend"); + EXPECT(result[1].second[0].second.token == "the_token_spend"); + EXPECT(result[1].second[0].second.confirmations == 0); + } + const crypto::hash chain[5] = { + last_block.hash, + crypto::rand<crypto::hash>(), + crypto::rand<crypto::hash>(), + crypto::rand<crypto::hash>(), + crypto::rand<crypto::hash>() + }; + + lws::account full_account = lws::db::test::make_account(account, view); + full_account.updated(last_block.id); + EXPECT(add_out(full_account, last_block.id, 500)); + add_spend(full_account, last_block.id); + const auto outs = full_account.outputs(); + const auto spends = full_account.spends(); + EXPECT(outs.size() == 1); + EXPECT(spends.size() == 1); + + const auto updated = db.update(last_block.id, chain, {std::addressof(full_account), 1}, nullptr); + EXPECT(!updated.has_error()); + EXPECT(updated->accounts_updated == 1); + EXPECT(updated->confirm_pubs.size() == 3); + EXPECT(updated->spend_pubs.size() == 1); + + EXPECT(updated->spend_pubs[0].key.user == lws::db::account_id(1)); + EXPECT(updated->spend_pubs[0].key.type == lws::db::webhook_type::tx_spend); + EXPECT(updated->spend_pubs[0].value.first.payment_id == 0); + EXPECT(updated->spend_pubs[0].value.first.event_id == other_id); + EXPECT(updated->spend_pubs[0].value.second.url == "http://the_url_spend"); + EXPECT(updated->spend_pubs[0].value.second.token == "the_token_spend"); + EXPECT(updated->spend_pubs[0].value.second.confirmations == 0); + + EXPECT(updated->spend_pubs[0].tx_info.input.link == spends[0].link); + EXPECT(updated->spend_pubs[0].tx_info.input.image == spends[0].image); + EXPECT(updated->spend_pubs[0].tx_info.input.timestamp == spends[0].timestamp); + EXPECT(updated->spend_pubs[0].tx_info.input.unlock_time == spends[0].unlock_time); + EXPECT(updated->spend_pubs[0].tx_info.input.mixin_count == spends[0].mixin_count); + EXPECT(updated->spend_pubs[0].tx_info.input.length == spends[0].length); + EXPECT(updated->spend_pubs[0].tx_info.input.payment_id == spends[0].payment_id); + EXPECT(updated->spend_pubs[0].tx_info.input.sender.maj_i == lws::db::major_index(1)); + EXPECT(updated->spend_pubs[0].tx_info.input.sender.min_i == lws::db::minor_index(0)); + + EXPECT(updated->spend_pubs[0].tx_info.source.id == outs[0].spend_meta.id); + EXPECT(updated->spend_pubs[0].tx_info.source.amount == outs[0].spend_meta.amount); + EXPECT(updated->spend_pubs[0].tx_info.source.mixin_count == outs[0].spend_meta.mixin_count); + EXPECT(updated->spend_pubs[0].tx_info.source.index == outs[0].spend_meta.index); + EXPECT(updated->spend_pubs[0].tx_info.source.tx_public == outs[0].spend_meta.tx_public); + } } }