ZMQ Pub Spends (#101)

This commit is contained in:
Lee *!* Clagett 2024-04-07 19:48:12 -04:00 committed by Lee *!* Clagett
parent fe9d861dfb
commit 38c4999555
8 changed files with 315 additions and 48 deletions

View file

@ -19,6 +19,10 @@ option. Users are still required to "subscribe" to topics:
with their new height and block hash.
* `msgpack-minimal-scanned:` A msgpack object of a list of user primary
addresses with their new height and block hash.
* `json-full-spend_hook': A JSON object of a webhook spend event that has
recently triggerd (identical output as webhook).
* `msgpack-full-spend_hook`: A msgpack object of a single new account
creation that has recently triggered (identical output as webhook).
### `json-full-payment_hook`/`msgpack-full-payment_hook`
@ -100,7 +104,6 @@ where matching is done by string prefixes.
> `index` is a counter used to detect dropped messages.
### `json-minimal-scanned`/`msgpack-minimal-scanned`
These topics receive PUB messages when a thread has finished scanning 1+
accounts. The last block height and hash is sent.
@ -115,9 +118,61 @@ json-minimal-scanned:{
"addresses": [
"9xkhhJSa7ZhS5sAcTix6ozL14RwdgxbV7JZVFW4rCghN7GidutaykfxDHfgW45UPiCTXncuvZ91GNSGgxs3b2Cin9TU8nP3"
]
> `index` is a counter used to detect dropped messages.
### `json-full-spend_hook`/`msgpack-full-spend_hook`
These topics receive PUB messages when a webhook ([`webhook_add`](administration.md)),
event is triggered for a spend (`tx-spend`). If the specified URL is
`zmq`, then notifications are only done over the ZMQ-PUB socket, otherwise the
notification is sent over ZMQ-PUB socket AND the specified URL. Invoking
`webhook_add` with a `payment_id` or `confirmation` results in a NOP because
both fields are unused for spends. This event is only triggered on
confirmation==1 (`confirmation` field on `webhook_add`s have no effect, and
mempool spends are not scanned). The intent is to notify the user of unexpected
spend operations. The end user will need to use `tx_info.input.image`,
`tx_info.source.index`, and `tx_info.source.tx_public` to determine if the
output was actually spent or being used as a decoy.
Example of the "raw" output from ZMQ-SUB side:
```json
json-full-spend_hook:{
"index": 0,
"event": {
"event": "tx-spend",
"token": "spend-xmr",
"event_id": "7ff047aa74e14f4aa978469bc0eec8ec",
"tx_info": {
"input": {
"height": 2464207,
"tx_hash": "97d4e66c4968b16fec7662adc9f8562c49108d3c5e7030c4d6dd32d97fb62540",
"image": "b0fe7acd9e17bb8b9ac2daae36d4cb607ac60ed8a101cc9b2e1f74016cf80b24",
"source": {
"high": 0,
"low": 6246316
},
"timestamp": 1711902214,
"unlock_time": 0,
"mixin_count": 15,
"sender": {
"maj_i": 0,
"min_i": 0
}
},
"source": {
"id": {
"high": 0,
"low": 6246316
},
"amount": 10000000000,
"mixin": 15,
"index": 0,
"tx_public": "426ccd6d39535a1ee8636d14978581e580fcea35c8d3843ceb32eb688a0197f7"
}
}
}
}
```
> `index` is a counter used to detect dropped messages

View file

@ -359,7 +359,7 @@ namespace db
namespace
{
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account"};
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account", "tx-spend"};
template<typename F, typename T>
void map_webhook_key(F& format, T& self)
@ -423,6 +423,32 @@ namespace db
);
}
static void write_bytes(wire::writer& dest, const output::spend_meta_& self)
{
wire::object(dest,
WIRE_FIELD_ID(0, id),
wire::field<1>("amount", self.amount),
wire::field<2>("mixin", self.mixin_count),
wire::field<3>("index", self.index),
WIRE_FIELD_ID(4, tx_public)
);
}
static void write_bytes(wire::writer& dest, const webhook_tx_spend::tx_info_& self)
{
wire::object(dest, WIRE_FIELD_ID(0, input), WIRE_FIELD_ID(1, source));
}
void write_bytes(wire::writer& dest, const webhook_tx_spend& self)
{
wire::object(dest,
wire::field<0>("event", std::cref(self.key.type)),
wire::field<1>("token", std::cref(self.value.second.token)),
wire::field<2>("event_id", std::cref(self.value.first.event_id)),
WIRE_FIELD_ID(3, tx_info)
);
}
void write_bytes(wire::json_writer& dest, const webhook_event& self)
{
crypto::hash8 payment_id;

View file

@ -335,7 +335,8 @@ namespace db
enum class webhook_type : std::uint8_t
{
tx_confirmation = 0, // cannot change values - stored in DB
new_account
new_account,
tx_spend
// unconfirmed_tx,
// new_block
// confirmed_tx,
@ -384,6 +385,19 @@ namespace db
};
void write_bytes(wire::writer&, const webhook_tx_confirmation&);
//! Returned by DB when a webhook event "tripped"
struct webhook_tx_spend
{
webhook_key key;
webhook_value value;
struct tx_info_
{
spend input;
output::spend_meta_ source;
} tx_info;
};
void write_bytes(wire::writer&, const webhook_tx_spend&);
//! References a specific output that triggered a webhook
struct webhook_output
{

View file

@ -2573,18 +2573,69 @@ namespace db
}
return success();
}
expect<void> check_spends(std::vector<webhook_tx_spend>& out, MDB_cursor& webhooks_cur, MDB_cursor& outputs_cur, const lws::account& user)
{
const account_id user_id = user.id();
const webhook_key hook_key{user_id, webhook_type::tx_spend};
MDB_val key = lmdb::to_val(hook_key);
MDB_val value{};
// Find a tx_spend for user id
int err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_SET_KEY);
for (;;)
{
if (err)
{
if (err != MDB_NOTFOUND)
return {lmdb::error(err)};
break;
}
const auto hook = webhooks.get_value(value);
if (hook)
{
out.reserve(user.spends().size());
for (const spend& s : user.spends())
{
key = lmdb::to_val(user_id);
value = lmdb::to_val(s.link.height);
err = mdb_cursor_get(&outputs_cur, &key, &value, MDB_GET_BOTH_RANGE);
expect<output::spend_meta_> meta{common_error::kInvalidArgument};
for (;;)
{
if (err)
return {lmdb::error(err)};
meta = outputs.get_value<MONERO_FIELD(output, spend_meta)>(value);
if (!meta)
return meta.error();
if (meta->id == s.source)
break;
err = mdb_cursor_get(&outputs_cur, &key, &value, MDB_PREV_DUP);
}
out.push_back(
webhook_tx_spend{hook_key, *hook, {s, *meta}}
);
}
}
err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_NEXT_DUP);
} // every hook_key
return success();
}
} // anonymous
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow)
expect<storage::updated> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow)
{
if (users.empty() && chain.empty())
return {std::make_pair(0, std::vector<webhook_tx_confirmation>{})};
return {updated{}};
MONERO_PRECOND(!chain.empty());
MONERO_PRECOND(db != nullptr);
if (!pow.empty())
MONERO_PRECOND(chain.size() == pow.size());
return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<updated>
{
epee::span<const crypto::hash> chain_copy{chain};
epee::span<const pow_sync> pow_copy{pow};
@ -2593,7 +2644,7 @@ namespace db
const std::uint64_t first_new = lmdb::to_native(height) + 1;
// collect all .value() errors
std::pair<std::size_t, std::vector<webhook_tx_confirmation>> updated;
updated out{};
if (get_checkpoints().get_max_height() <= last_update)
{
cursor::blocks blocks_cur;
@ -2652,7 +2703,7 @@ namespace db
const auto cur_block = blocks.get_value<block_info>(value);
if (!cur_block)
return cur_block.error();
// If a reorg past a checkpoint is being attempted
// If a reorg past a checkpoint is being attempted
if (chain[chain.size() - 1] != cur_block->hash)
return {error::bad_blockchain};
@ -2743,13 +2794,14 @@ namespace db
MONERO_CHECK(check_hooks(*webhooks_cur, *events_cur, *user));
MONERO_CHECK(
add_ongoing_hooks(
updated.second, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
out.confirm_pubs, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
)
);
MONERO_CHECK(check_spends(out.spend_pubs, *webhooks_cur, *outputs_cur, *user));
++updated.first;
++out.accounts_updated;
} // ... for every account being updated ...
return {std::move(updated)};
return {std::move(out)};
});
}
@ -2954,7 +3006,7 @@ namespace db
key.user = MONERO_UNWRAP(accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup.id)>(lmvalue));
}
if (key.user == account_id::invalid && type == webhook_type::tx_confirmation)
if (key.user == account_id::invalid && (type == webhook_type::tx_confirmation || type == webhook_type::tx_spend))
return {error::bad_webhook};
lmkey = lmdb::to_val(key);

View file

@ -264,6 +264,13 @@ namespace db
expect<std::vector<account_address>>
reject_requests(request req, epee::span<const account_address> addresses);
//! Status of an `update` request
struct updated
{
std::vector<webhook_tx_spend> spend_pubs;
std::vector<webhook_tx_confirmation> confirm_pubs;
std::size_t accounts_updated;
};
/*!
Updates the status of user accounts, even if inactive or hidden. Duplicate
receives or spends provided in `accts` are silently ignored. If a gap in
@ -274,15 +281,15 @@ namespace db
\param chain List of block hashes that `accts` were scanned against.
\param accts Updated to `height + chain.size()` scan height.
\return Number of updated accounts, and a list of webhooks that triggered.
\return Status via `updated` object.
*/
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
expect<updated>
update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> accts, epee::span<const pow_sync> pow);
/*!
Adds subaddresses to an account. Upon success, an account will
immediately begin tracking them in the scanner.
\param id of the account to associate new indexes
\param addresss of the account (needed to generate subaddress publc key)
\param view_key of the account (needed to generate subaddress public key)
@ -300,7 +307,7 @@ namespace db
/*!
Add webhook to be tracked in the database. The webhook will "call"
the specified URL with JSON/msgpack information when the event occurs.
\param type The webhook event type to be tracked by the DB.
\param address is required for `type == tx_confirmation`, and is not
not needed for all other types.

View file

@ -320,6 +320,10 @@ namespace lws { namespace rpc
if (req.address)
return {error::bad_webhook};
break;
case db::webhook_type::tx_spend:
if (!req.address)
return {error::bad_webhook};
break;
default:
return {error::bad_webhook};
}

View file

@ -217,6 +217,11 @@ namespace lws
vec.erase(vec.begin());
};
void send_spend_hook(rpc::client& client, const epee::span<const db::webhook_tx_spend> events, net::ssl_verification_t verify_mode)
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
}
struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
@ -880,11 +885,11 @@ namespace lws
}
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->second), opts.webhook_verify);
if (updated->first != users.size())
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return;
}

View file

@ -64,6 +64,26 @@ namespace
}
);
}
void add_spend(lws::account& account, const lws::db::block_id last_id)
{
account.add_spend(
lws::db::spend{
lws::db::transaction_link{
lws::db::block_id(lmdb::to_native(last_id) + 1),
crypto::rand<crypto::hash>()
},
crypto::rand<crypto::key_image>(),
lws::db::output_id{0, 100},
std::uint64_t(2000),
std::uint64_t(0),
std::uint32_t(16),
{0, 0, 0},
std::uint8_t(0),
crypto::rand<crypto::hash>(),
lws::db::address_index{lws::db::major_index(1), lws::db::minor_index(0)}
}
);
}
}
LWS_CASE("db::storage::*_webhook")
@ -143,25 +163,26 @@ LWS_CASE("db::storage::*_webhook")
auto updated = db.update(head.id, chain, {std::addressof(full_account), 1}, nullptr);
EXPECT(!updated.has_error());
EXPECT(updated->first == 1);
EXPECT(updated->spend_pubs.empty());
EXPECT(updated->accounts_updated == 1);
if (i < 3)
{
EXPECT(updated->second.size() == 1);
EXPECT(updated->second[0].key.user == lws::db::account_id(1));
EXPECT(updated->second[0].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->second[0].value.first.payment_id == 500);
EXPECT(updated->second[0].value.first.event_id == id);
EXPECT(updated->second[0].value.second.url == "http://the_url");
EXPECT(updated->second[0].value.second.token == "the_token");
EXPECT(updated->second[0].value.second.confirmations == i + 1);
EXPECT(updated->confirm_pubs.size() == 1);
EXPECT(updated->confirm_pubs[0].key.user == lws::db::account_id(1));
EXPECT(updated->confirm_pubs[0].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->confirm_pubs[0].value.first.payment_id == 500);
EXPECT(updated->confirm_pubs[0].value.first.event_id == id);
EXPECT(updated->confirm_pubs[0].value.second.url == "http://the_url");
EXPECT(updated->confirm_pubs[0].value.second.token == "the_token");
EXPECT(updated->confirm_pubs[0].value.second.confirmations == i + 1);
EXPECT(updated->second[0].tx_info.link == outs[0].link);
EXPECT(updated->second[0].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->second[0].tx_info.pub == outs[0].pub);
EXPECT(updated->second[0].tx_info.payment_id.short_ == outs[0].payment_id.short_);
EXPECT(updated->confirm_pubs[0].tx_info.link == outs[0].link);
EXPECT(updated->confirm_pubs[0].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->confirm_pubs[0].tx_info.pub == outs[0].pub);
EXPECT(updated->confirm_pubs[0].tx_info.payment_id.short_ == outs[0].payment_id.short_);
}
else
EXPECT(updated->second.empty());
EXPECT(updated->confirm_pubs.empty());
full_account.updated(head.id);
head = {lws::db::block_id(lmdb::to_native(head.id) + 1), chain[1]};
@ -187,24 +208,107 @@ LWS_CASE("db::storage::*_webhook")
const auto updated = db.update(last_block.id, chain, {std::addressof(full_account), 1}, nullptr);
EXPECT(!updated.has_error());
EXPECT(updated->first == 1);
EXPECT(updated->second.size() == 3);
EXPECT(updated->spend_pubs.empty());
EXPECT(updated->accounts_updated == 1);
EXPECT(updated->confirm_pubs.size() == 3);
for (unsigned i = 0; i < 3; ++i)
{
EXPECT(updated->second[i].key.user == lws::db::account_id(1));
EXPECT(updated->second[i].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->second[i].value.first.payment_id == 500);
EXPECT(updated->second[i].value.first.event_id == id);
EXPECT(updated->second[i].value.second.url == "http://the_url");
EXPECT(updated->second[i].value.second.token == "the_token");
EXPECT(updated->second[i].value.second.confirmations == i + 1);
EXPECT(updated->confirm_pubs[i].key.user == lws::db::account_id(1));
EXPECT(updated->confirm_pubs[i].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->confirm_pubs[i].value.first.payment_id == 500);
EXPECT(updated->confirm_pubs[i].value.first.event_id == id);
EXPECT(updated->confirm_pubs[i].value.second.url == "http://the_url");
EXPECT(updated->confirm_pubs[i].value.second.token == "the_token");
EXPECT(updated->confirm_pubs[i].value.second.confirmations == i + 1);
EXPECT(updated->second[i].tx_info.link == outs[0].link);
EXPECT(updated->second[i].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->second[i].tx_info.pub == outs[0].pub);
EXPECT(updated->second[i].tx_info.payment_id.short_ == outs[0].payment_id.short_);
EXPECT(updated->confirm_pubs[i].tx_info.link == outs[0].link);
EXPECT(updated->confirm_pubs[i].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->confirm_pubs[i].tx_info.pub == outs[0].pub);
EXPECT(updated->confirm_pubs[i].tx_info.payment_id.short_ == outs[0].payment_id.short_);
}
}
SECTION("Add db spend")
{
const boost::uuids::uuid other_id = boost::uuids::random_generator{}();
{
lws::db::webhook_value value{
lws::db::webhook_dupsort{0, other_id},
lws::db::webhook_data{"http://the_url_spend", "the_token_spend"}
};
MONERO_UNWRAP(
db.add_webhook(lws::db::webhook_type::tx_spend, account, std::move(value))
);
}
SECTION("storage::get_webhooks()")
{
lws::db::storage_reader reader = MONERO_UNWRAP(db.start_read());
const auto result = MONERO_UNWRAP(reader.get_webhooks());
EXPECT(result.size() == 2);
EXPECT(result[0].first.user == lws::db::account_id(1));
EXPECT(result[0].first.type == lws::db::webhook_type::tx_confirmation);
EXPECT(result[0].second.size() == 1);
EXPECT(result[0].second[0].first.payment_id == 500);
EXPECT(result[0].second[0].first.event_id == id);
EXPECT(result[0].second[0].second.url == "http://the_url");
EXPECT(result[0].second[0].second.token == "the_token");
EXPECT(result[0].second[0].second.confirmations == 3);
EXPECT(result[1].first.user == lws::db::account_id(1));
EXPECT(result[1].first.type == lws::db::webhook_type::tx_spend);
EXPECT(result[1].second.size() == 1);
EXPECT(result[1].second[0].first.payment_id == 0);
EXPECT(result[1].second[0].first.event_id == other_id);
EXPECT(result[1].second[0].second.url == "http://the_url_spend");
EXPECT(result[1].second[0].second.token == "the_token_spend");
EXPECT(result[1].second[0].second.confirmations == 0);
}
const crypto::hash chain[5] = {
last_block.hash,
crypto::rand<crypto::hash>(),
crypto::rand<crypto::hash>(),
crypto::rand<crypto::hash>(),
crypto::rand<crypto::hash>()
};
lws::account full_account = lws::db::test::make_account(account, view);
full_account.updated(last_block.id);
EXPECT(add_out(full_account, last_block.id, 500));
add_spend(full_account, last_block.id);
const auto outs = full_account.outputs();
const auto spends = full_account.spends();
EXPECT(outs.size() == 1);
EXPECT(spends.size() == 1);
const auto updated = db.update(last_block.id, chain, {std::addressof(full_account), 1}, nullptr);
EXPECT(!updated.has_error());
EXPECT(updated->accounts_updated == 1);
EXPECT(updated->confirm_pubs.size() == 3);
EXPECT(updated->spend_pubs.size() == 1);
EXPECT(updated->spend_pubs[0].key.user == lws::db::account_id(1));
EXPECT(updated->spend_pubs[0].key.type == lws::db::webhook_type::tx_spend);
EXPECT(updated->spend_pubs[0].value.first.payment_id == 0);
EXPECT(updated->spend_pubs[0].value.first.event_id == other_id);
EXPECT(updated->spend_pubs[0].value.second.url == "http://the_url_spend");
EXPECT(updated->spend_pubs[0].value.second.token == "the_token_spend");
EXPECT(updated->spend_pubs[0].value.second.confirmations == 0);
EXPECT(updated->spend_pubs[0].tx_info.input.link == spends[0].link);
EXPECT(updated->spend_pubs[0].tx_info.input.image == spends[0].image);
EXPECT(updated->spend_pubs[0].tx_info.input.timestamp == spends[0].timestamp);
EXPECT(updated->spend_pubs[0].tx_info.input.unlock_time == spends[0].unlock_time);
EXPECT(updated->spend_pubs[0].tx_info.input.mixin_count == spends[0].mixin_count);
EXPECT(updated->spend_pubs[0].tx_info.input.length == spends[0].length);
EXPECT(updated->spend_pubs[0].tx_info.input.payment_id == spends[0].payment_id);
EXPECT(updated->spend_pubs[0].tx_info.input.sender.maj_i == lws::db::major_index(1));
EXPECT(updated->spend_pubs[0].tx_info.input.sender.min_i == lws::db::minor_index(0));
EXPECT(updated->spend_pubs[0].tx_info.source.id == outs[0].spend_meta.id);
EXPECT(updated->spend_pubs[0].tx_info.source.amount == outs[0].spend_meta.amount);
EXPECT(updated->spend_pubs[0].tx_info.source.mixin_count == outs[0].spend_meta.mixin_count);
EXPECT(updated->spend_pubs[0].tx_info.source.index == outs[0].spend_meta.index);
EXPECT(updated->spend_pubs[0].tx_info.source.tx_public == outs[0].spend_meta.tx_public);
}
}
}