Add support to webhook/zmq track spends

This commit is contained in:
Lee Clagett 2024-02-27 20:23:44 -05:00
parent 4191956294
commit d8bfdb8a9d
6 changed files with 208 additions and 46 deletions

View file

@ -359,7 +359,7 @@ namespace db
namespace
{
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account"};
constexpr const char* map_webhook_type[] = {"tx-confirmation", "new-account", "tx-spend"};
template<typename F, typename T>
void map_webhook_key(F& format, T& self)
@ -423,6 +423,16 @@ namespace db
);
}
void write_bytes(wire::writer& dest, const webhook_tx_spend& self)
{
wire::object(dest,
wire::field<0>("event", std::cref(self.key.type)),
wire::field<1>("token", std::cref(self.value.second.token)),
wire::field<2>("event_id", std::cref(self.value.first.event_id)),
WIRE_FIELD_ID(3, tx_info)
);
}
void write_bytes(wire::json_writer& dest, const webhook_event& self)
{
crypto::hash8 payment_id;

View file

@ -335,7 +335,8 @@ namespace db
enum class webhook_type : std::uint8_t
{
tx_confirmation = 0, // cannot change values - stored in DB
new_account
new_account,
tx_spend
// unconfirmed_tx,
// new_block
// confirmed_tx,
@ -384,6 +385,15 @@ namespace db
};
void write_bytes(wire::writer&, const webhook_tx_confirmation&);
//! Returned by DB when a webhook event "tripped"
struct webhook_tx_spend
{
webhook_key key;
webhook_value value;
spend tx_info;
};
void write_bytes(wire::writer&, const webhook_tx_spend&);
//! References a specific output that triggered a webhook
struct webhook_output
{

View file

@ -2573,18 +2573,52 @@ namespace db
}
return success();
}
expect<void> check_spends(std::vector<webhook_tx_spend>& out, MDB_cursor& webhooks_cur, const lws::account& user)
{
const webhook_key hook_key{user.id(), webhook_type::tx_spend};
MDB_val key = lmdb::to_val(hook_key);
MDB_val value{};
// Find a tx_spend for user id
int err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_SET_KEY);
for (;;)
{
if (err)
{
if (err != MDB_NOTFOUND)
return {lmdb::error(err)};
break;
}
const auto hook = webhooks.get_value(value);
if (hook)
{
out.reserve(user.spends().size());
for (const spend& s : user.spends())
{
out.push_back(
webhook_tx_spend{hook_key, *hook, s}
);
}
}
err = mdb_cursor_get(&webhooks_cur, &key, &value, MDB_NEXT_DUP);
} // every hook_key
return success();
}
} // anonymous
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow)
expect<storage::updated> storage::update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const pow_sync> pow)
{
if (users.empty() && chain.empty())
return {std::make_pair(0, std::vector<webhook_tx_confirmation>{})};
return {updated{}};
MONERO_PRECOND(!chain.empty());
MONERO_PRECOND(db != nullptr);
if (!pow.empty())
MONERO_PRECOND(chain.size() == pow.size());
return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
return db->try_write([this, height, chain, users, pow] (MDB_txn& txn) -> expect<updated>
{
epee::span<const crypto::hash> chain_copy{chain};
epee::span<const pow_sync> pow_copy{pow};
@ -2593,7 +2627,7 @@ namespace db
const std::uint64_t first_new = lmdb::to_native(height) + 1;
// collect all .value() errors
std::pair<std::size_t, std::vector<webhook_tx_confirmation>> updated;
updated out{};
if (get_checkpoints().get_max_height() <= last_update)
{
cursor::blocks blocks_cur;
@ -2652,7 +2686,7 @@ namespace db
const auto cur_block = blocks.get_value<block_info>(value);
if (!cur_block)
return cur_block.error();
// If a reorg past a checkpoint is being attempted
// If a reorg past a checkpoint is being attempted
if (chain[chain.size() - 1] != cur_block->hash)
return {error::bad_blockchain};
@ -2743,13 +2777,14 @@ namespace db
MONERO_CHECK(check_hooks(*webhooks_cur, *events_cur, *user));
MONERO_CHECK(
add_ongoing_hooks(
updated.second, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
out.confirm_pubs, *webhooks_cur, *outputs_cur, *events_cur, user->id(), block_id(first_new), block_id(last_update + 1)
)
);
MONERO_CHECK(check_spends(out.spend_pubs, *webhooks_cur, *user));
++updated.first;
++out.accounts_updated;
} // ... for every account being updated ...
return {std::move(updated)};
return {std::move(out)};
});
}
@ -2954,7 +2989,7 @@ namespace db
key.user = MONERO_UNWRAP(accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup.id)>(lmvalue));
}
if (key.user == account_id::invalid && type == webhook_type::tx_confirmation)
if (key.user == account_id::invalid && (type == webhook_type::tx_confirmation || type == webhook_type::tx_spend))
return {error::bad_webhook};
lmkey = lmdb::to_val(key);

View file

@ -264,6 +264,13 @@ namespace db
expect<std::vector<account_address>>
reject_requests(request req, epee::span<const account_address> addresses);
//! Status of an `update` request
struct updated
{
std::vector<webhook_tx_spend> spend_pubs;
std::vector<webhook_tx_confirmation> confirm_pubs;
std::size_t accounts_updated;
};
/*!
Updates the status of user accounts, even if inactive or hidden. Duplicate
receives or spends provided in `accts` are silently ignored. If a gap in
@ -274,15 +281,15 @@ namespace db
\param chain List of block hashes that `accts` were scanned against.
\param accts Updated to `height + chain.size()` scan height.
\return Number of updated accounts, and a list of webhooks that triggered.
\return Status via `updated` object.
*/
expect<std::pair<std::size_t, std::vector<webhook_tx_confirmation>>>
expect<updated>
update(block_id height, epee::span<const crypto::hash> chain, epee::span<const lws::account> accts, epee::span<const pow_sync> pow);
/*!
Adds subaddresses to an account. Upon success, an account will
immediately begin tracking them in the scanner.
\param id of the account to associate new indexes
\param addresss of the account (needed to generate subaddress publc key)
\param view_key of the account (needed to generate subaddress public key)
@ -300,7 +307,7 @@ namespace db
/*!
Add webhook to be tracked in the database. The webhook will "call"
the specified URL with JSON/msgpack information when the event occurs.
\param type The webhook event type to be tracked by the DB.
\param address is required for `type == tx_confirmation`, and is not
not needed for all other types.

View file

@ -217,6 +217,11 @@ namespace lws
vec.erase(vec.begin());
};
void send_spend_hook(rpc::client& client, const epee::span<const db::webhook_tx_spend> events, net::ssl_verification_t verify_mode)
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
}
struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
@ -880,11 +885,11 @@ namespace lws
}
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->second), opts.webhook_verify);
if (updated->first != users.size())
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->first << " account(s) out of " << users.size() << ", resetting");
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return;
}

View file

@ -64,6 +64,26 @@ namespace
}
);
}
void add_spend(lws::account& account, const lws::db::block_id last_id)
{
account.add_spend(
lws::db::spend{
lws::db::transaction_link{
lws::db::block_id(lmdb::to_native(last_id) + 1),
crypto::rand<crypto::hash>()
},
crypto::rand<crypto::key_image>(),
lws::db::output_id{0, 200},
std::uint64_t(2000),
std::uint64_t(0),
std::uint32_t(16),
{0, 0, 0},
std::uint8_t(0),
crypto::rand<crypto::hash>(),
lws::db::address_index{lws::db::major_index(1), lws::db::minor_index(0)}
}
);
}
}
LWS_CASE("db::storage::*_webhook")
@ -143,25 +163,26 @@ LWS_CASE("db::storage::*_webhook")
auto updated = db.update(head.id, chain, {std::addressof(full_account), 1}, nullptr);
EXPECT(!updated.has_error());
EXPECT(updated->first == 1);
EXPECT(updated->spend_pubs.empty());
EXPECT(updated->accounts_updated == 1);
if (i < 3)
{
EXPECT(updated->second.size() == 1);
EXPECT(updated->second[0].key.user == lws::db::account_id(1));
EXPECT(updated->second[0].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->second[0].value.first.payment_id == 500);
EXPECT(updated->second[0].value.first.event_id == id);
EXPECT(updated->second[0].value.second.url == "http://the_url");
EXPECT(updated->second[0].value.second.token == "the_token");
EXPECT(updated->second[0].value.second.confirmations == i + 1);
EXPECT(updated->confirm_pubs.size() == 1);
EXPECT(updated->confirm_pubs[0].key.user == lws::db::account_id(1));
EXPECT(updated->confirm_pubs[0].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->confirm_pubs[0].value.first.payment_id == 500);
EXPECT(updated->confirm_pubs[0].value.first.event_id == id);
EXPECT(updated->confirm_pubs[0].value.second.url == "http://the_url");
EXPECT(updated->confirm_pubs[0].value.second.token == "the_token");
EXPECT(updated->confirm_pubs[0].value.second.confirmations == i + 1);
EXPECT(updated->second[0].tx_info.link == outs[0].link);
EXPECT(updated->second[0].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->second[0].tx_info.pub == outs[0].pub);
EXPECT(updated->second[0].tx_info.payment_id.short_ == outs[0].payment_id.short_);
EXPECT(updated->confirm_pubs[0].tx_info.link == outs[0].link);
EXPECT(updated->confirm_pubs[0].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->confirm_pubs[0].tx_info.pub == outs[0].pub);
EXPECT(updated->confirm_pubs[0].tx_info.payment_id.short_ == outs[0].payment_id.short_);
}
else
EXPECT(updated->second.empty());
EXPECT(updated->confirm_pubs.empty());
full_account.updated(head.id);
head = {lws::db::block_id(lmdb::to_native(head.id) + 1), chain[1]};
@ -187,24 +208,98 @@ LWS_CASE("db::storage::*_webhook")
const auto updated = db.update(last_block.id, chain, {std::addressof(full_account), 1}, nullptr);
EXPECT(!updated.has_error());
EXPECT(updated->first == 1);
EXPECT(updated->second.size() == 3);
EXPECT(updated->spend_pubs.empty());
EXPECT(updated->accounts_updated == 1);
EXPECT(updated->confirm_pubs.size() == 3);
for (unsigned i = 0; i < 3; ++i)
{
EXPECT(updated->second[i].key.user == lws::db::account_id(1));
EXPECT(updated->second[i].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->second[i].value.first.payment_id == 500);
EXPECT(updated->second[i].value.first.event_id == id);
EXPECT(updated->second[i].value.second.url == "http://the_url");
EXPECT(updated->second[i].value.second.token == "the_token");
EXPECT(updated->second[i].value.second.confirmations == i + 1);
EXPECT(updated->confirm_pubs[i].key.user == lws::db::account_id(1));
EXPECT(updated->confirm_pubs[i].key.type == lws::db::webhook_type::tx_confirmation);
EXPECT(updated->confirm_pubs[i].value.first.payment_id == 500);
EXPECT(updated->confirm_pubs[i].value.first.event_id == id);
EXPECT(updated->confirm_pubs[i].value.second.url == "http://the_url");
EXPECT(updated->confirm_pubs[i].value.second.token == "the_token");
EXPECT(updated->confirm_pubs[i].value.second.confirmations == i + 1);
EXPECT(updated->second[i].tx_info.link == outs[0].link);
EXPECT(updated->second[i].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->second[i].tx_info.pub == outs[0].pub);
EXPECT(updated->second[i].tx_info.payment_id.short_ == outs[0].payment_id.short_);
EXPECT(updated->confirm_pubs[i].tx_info.link == outs[0].link);
EXPECT(updated->confirm_pubs[i].tx_info.spend_meta.id == outs[0].spend_meta.id);
EXPECT(updated->confirm_pubs[i].tx_info.pub == outs[0].pub);
EXPECT(updated->confirm_pubs[i].tx_info.payment_id.short_ == outs[0].payment_id.short_);
}
}
SECTION("Add db spend")
{
const boost::uuids::uuid other_id = boost::uuids::random_generator{}();
{
lws::db::webhook_value value{
lws::db::webhook_dupsort{0, other_id},
lws::db::webhook_data{"http://the_url_spend", "the_token_spend"}
};
MONERO_UNWRAP(
db.add_webhook(lws::db::webhook_type::tx_spend, account, std::move(value))
);
}
SECTION("storage::get_webhooks()")
{
lws::db::storage_reader reader = MONERO_UNWRAP(db.start_read());
const auto result = MONERO_UNWRAP(reader.get_webhooks());
EXPECT(result.size() == 2);
EXPECT(result[0].first.user == lws::db::account_id(1));
EXPECT(result[0].first.type == lws::db::webhook_type::tx_confirmation);
EXPECT(result[0].second.size() == 1);
EXPECT(result[0].second[0].first.payment_id == 500);
EXPECT(result[0].second[0].first.event_id == id);
EXPECT(result[0].second[0].second.url == "http://the_url");
EXPECT(result[0].second[0].second.token == "the_token");
EXPECT(result[0].second[0].second.confirmations == 3);
EXPECT(result[1].first.user == lws::db::account_id(1));
EXPECT(result[1].first.type == lws::db::webhook_type::tx_spend);
EXPECT(result[1].second.size() == 1);
EXPECT(result[1].second[0].first.payment_id == 0);
EXPECT(result[1].second[0].first.event_id == other_id);
EXPECT(result[1].second[0].second.url == "http://the_url_spend");
EXPECT(result[1].second[0].second.token == "the_token_spend");
EXPECT(result[1].second[0].second.confirmations == 0);
}
const crypto::hash chain[5] = {
last_block.hash,
crypto::rand<crypto::hash>(),
crypto::rand<crypto::hash>(),
crypto::rand<crypto::hash>(),
crypto::rand<crypto::hash>()
};
lws::account full_account = lws::db::test::make_account(account, view);
full_account.updated(last_block.id);
add_spend(full_account, lws::db::block_id(105));
const auto spends = full_account.spends();
EXPECT(spends.size() == 1);
const auto updated = db.update(last_block.id, chain, {std::addressof(full_account), 1}, nullptr);
EXPECT(!updated.has_error());
EXPECT(updated->accounts_updated == 1);
EXPECT(updated->confirm_pubs.empty());
EXPECT(updated->spend_pubs.size() == 1);
EXPECT(updated->spend_pubs[0].key.user == lws::db::account_id(1));
EXPECT(updated->spend_pubs[0].key.type == lws::db::webhook_type::tx_spend);
EXPECT(updated->spend_pubs[0].value.first.payment_id == 0);
EXPECT(updated->spend_pubs[0].value.first.event_id == other_id);
EXPECT(updated->spend_pubs[0].value.second.url == "http://the_url_spend");
EXPECT(updated->spend_pubs[0].value.second.token == "the_token_spend");
EXPECT(updated->spend_pubs[0].value.second.confirmations == 0);
EXPECT(updated->spend_pubs[0].tx_info.link == spends[0].link);
EXPECT(updated->spend_pubs[0].tx_info.image == spends[0].image);
EXPECT(updated->spend_pubs[0].tx_info.timestamp == spends[0].timestamp);
EXPECT(updated->spend_pubs[0].tx_info.unlock_time == spends[0].unlock_time);
EXPECT(updated->spend_pubs[0].tx_info.mixin_count == spends[0].mixin_count);
EXPECT(updated->spend_pubs[0].tx_info.length == spends[0].length);
EXPECT(updated->spend_pubs[0].tx_info.payment_id == spends[0].payment_id);
EXPECT(updated->spend_pubs[0].tx_info.sender.maj_i == lws::db::major_index(1));
EXPECT(updated->spend_pubs[0].tx_info.sender.min_i == lws::db::minor_index(0));
}
}
}