Add zero-confirmation support to webhooks (only) (#72)

This commit is contained in:
Lee *!* Clagett 2023-07-01 11:18:18 -04:00 committed by Lee *!* Clagett
parent f827dca8d1
commit fdbd3669a6
12 changed files with 408 additions and 32 deletions

View file

@ -34,6 +34,7 @@
#include "wire/json/write.h"
#include "wire/msgpack.h"
#include "wire/uuid.h"
#include "wire/wrapper/defaulted.h"
namespace lws
{
@ -126,9 +127,11 @@ namespace db
const auto payment_id = payment_bytes.empty() ?
nullptr : std::addressof(payment_bytes);
// defaulted will omit "id" and "block" when the output is in the
// txpool with no valid values.
wire::object(dest,
wire::field<0>("id", std::cref(self.spend_meta.id)),
wire::field<1>("block", self.link.height),
wire::optional_field<0>("id", wire::defaulted(std::cref(self.spend_meta.id), output_id::txpool())),
wire::optional_field<1>("block", wire::defaulted(self.link.height, block_id::txpool)),
wire::field<2>("index", self.spend_meta.index),
wire::field<3>("amount", self.spend_meta.amount),
wire::field<4>("timestamp", self.timestamp),

View file

@ -30,6 +30,7 @@
#include <cassert>
#include <cstdint>
#include <iosfwd>
#include <limits>
#include <string>
#include <utility>
@ -63,12 +64,19 @@ namespace db
WIRE_AS_INTEGER(account_time);
//! References a block height
enum class block_id : std::uint64_t {};
enum class block_id : std::uint64_t
{
txpool = std::uint64_t(-1) //! Represents not-yet-a-block
};
WIRE_AS_INTEGER(block_id);
//! References a global output number, as determined by the public chain
struct output_id
{
//! \return Special ID for outputs not yet in a block.
static constexpr output_id txpool() noexcept
{ return {0, std::numeric_limits<std::uint64_t>::max()}; }
std::uint64_t high; //!< Amount on public chain; rct outputs are `0`
std::uint64_t low; //!< Offset within `amount` on the public chain
};

View file

@ -749,6 +749,42 @@ namespace db
return requests.get_value<request_info>(value);
}
expect<std::vector<webhook_value>>
storage_reader::find_webhook(webhook_key const& key, crypto::hash8 const& payment_id, cursor::webhooks cur)
{
MONERO_PRECOND(txn != nullptr);
assert(db != nullptr);
MONERO_CHECK(check_cursor(*txn, db->tables.webhooks, cur));
webhook_dupsort dup{};
static_assert(sizeof(dup.payment_id) == sizeof(payment_id), "bad memcpy");
std::memcpy(std::addressof(dup.payment_id), std::addressof(payment_id), sizeof(payment_id));
MDB_val lkey = lmdb::to_val(key);
MDB_val lvalue = lmdb::to_val(dup);
std::vector<webhook_value> result{};
int err = mdb_cursor_get(cur.get(), &lkey, &lvalue, MDB_GET_BOTH_RANGE);
for (;;)
{
if (err)
{
if (err == MDB_NOTFOUND)
break;
return {lmdb::error(err)};
}
if (webhooks.get_fixed_value<MONERO_FIELD(webhook_dupsort, payment_id)>(lvalue) != dup.payment_id)
break;
result.push_back(MONERO_UNWRAP(webhooks.get_value(lvalue)));
err = mdb_cursor_get(cur.get(), &lkey, &lvalue, MDB_NEXT_DUP);
}
return result;
}
expect<std::vector<std::pair<webhook_key, std::vector<webhook_value>>>>
storage_reader::get_webhooks(cursor::webhooks cur)
{

View file

@ -133,6 +133,10 @@ namespace db
expect<request_info>
get_request(request type, account_address const& address, cursor::requests cur = nullptr) noexcept;
//! \return All webhook values associated with user `key` and `payment_id`.
expect<std::vector<webhook_value>>
find_webhook(webhook_key const& key, crypto::hash8 const& payment_id, cursor::webhooks cur = nullptr);
//! \return All webhooks in the DB
expect<std::vector<std::pair<webhook_key, std::vector<webhook_value>>>>
get_webhooks(cursor::webhooks cur = nullptr);

View file

@ -51,6 +51,7 @@ namespace rpc
constexpr const char abort_scan_signal[] = "SCAN";
constexpr const char abort_process_signal[] = "PROCESS";
constexpr const char minimal_chain_topic[] = "json-minimal-chain_main";
constexpr const char full_txpool_topic[] = "json-full-txpool_add";
constexpr const int daemon_zmq_linger = 0;
constexpr const std::chrono::seconds chain_poll_timeout{20};
constexpr const std::chrono::minutes chain_sub_timeout{2};
@ -225,10 +226,9 @@ namespace rpc
if (out.daemon_sub.get() == nullptr)
return net::zmq::get_error_code();
option = 1; // keep only last pub message from daemon
MONERO_ZMQ_CHECK(zmq_connect(out.daemon_sub.get(), out.ctx->sub_addr.c_str()));
MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon_sub.get(), ZMQ_CONFLATE, &option, sizeof(option)));
MONERO_CHECK(do_subscribe(out.daemon_sub.get(), minimal_chain_topic));
MONERO_CHECK(do_subscribe(out.daemon_sub.get(), full_txpool_topic));
}
out.signal_sub.reset(zmq_socket(out.ctx->comm.get(), ZMQ_SUB));
@ -250,7 +250,7 @@ namespace rpc
return do_subscribe(signal_sub.get(), abort_scan_signal);
}
expect<minimal_chain_pub> client::wait_for_block()
expect<std::vector<std::pair<client::topic, std::string>>> client::wait_for_block()
{
MONERO_PRECOND(ctx != nullptr);
assert(daemon != nullptr);
@ -271,14 +271,45 @@ namespace rpc
return ready.error();
}
}
expect<std::string> pub = net::zmq::receive(daemon_sub.get(), ZMQ_DONTWAIT);
if (!pub)
return pub.error();
if (!boost::string_ref{*pub}.starts_with(minimal_chain_topic))
return {lws::error::bad_daemon_response};
pub->erase(0, sizeof(minimal_chain_topic));
return minimal_chain_pub::from_json(std::move(*pub));
std::vector<std::pair<topic, std::string>> messages{};
for (; /*every message */ ;)
{
expect<std::string> pub = net::zmq::receive(daemon_sub.get(), ZMQ_DONTWAIT);
if (!pub)
{
if (pub == net::zmq::make_error_code(EAGAIN))
return {std::move(messages)};
return pub.error();
}
if (pub->size() < 5)
break; // for loop
switch (pub->at(5))
{
case 'm': // json-minimal-chain_main
if (boost::string_ref{*pub}.starts_with(minimal_chain_topic))
{
pub->erase(0, sizeof(minimal_chain_topic));
messages.emplace_back(topic::block, std::move(*pub));
}
else
MWARNING("Unexpected pub/sub message");
break;
case 'f': // json-full-txpool_add
if (boost::string_ref{*pub}.starts_with(full_txpool_topic))
{
pub->erase(0, sizeof(full_txpool_topic));
messages.emplace_back(topic::txpool, std::move(*pub));
}
else
MWARNING("Unexpected pub/sub message");
break;
default:
break;
}
} // for every message
return {lws::error::bad_daemon_response};
}
expect<void> client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept

View file

@ -75,6 +75,12 @@ namespace rpc
expect<void> get_response(cryptonote::rpc::Message& response, std::chrono::seconds timeout, source_location loc);
public:
enum class topic : std::uint8_t
{
block = 0, txpool
};
//! A client with no connection (all send/receive functions fail).
explicit client() noexcept
: ctx(), daemon(), daemon_sub(), signal_sub()
@ -110,7 +116,7 @@ namespace rpc
expect<void> watch_scan_signals() noexcept;
//! Wait for new block announce or internal timeout.
expect<minimal_chain_pub> wait_for_block();
expect<std::vector<std::pair<topic, std::string>>> wait_for_block();
//! \return A JSON message for RPC request `M`.
template<typename M>

View file

@ -27,6 +27,8 @@
#include "daemon_pub.h"
#include "cryptonote_basic/cryptonote_basic.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "wire/crypto.h"
#include "wire/error.h"
#include "wire/field.h"
@ -83,5 +85,19 @@ namespace rpc
return err;
return {std::move(out)};
}
static void read_bytes(wire::json_reader& source, full_txpool_pub& self)
{
wire_read::array(source, self.txes);
}
expect<full_txpool_pub> full_txpool_pub::from_json(std::string&& source)
{
full_txpool_pub out{};
std::error_code err = wire::json::from_bytes(std::move(source), out);
if (err)
return err;
return {std::move(out)};
}
}
}

View file

@ -34,6 +34,7 @@
#include "crypto/hash.h" // monero/src
#include "wire/json/fwd.h"
namespace cryptonote { class transaction; }
namespace lws
{
namespace rpc
@ -46,5 +47,12 @@ namespace rpc
static expect<minimal_chain_pub> from_json(std::string&&);
};
struct full_txpool_pub
{
std::vector<cryptonote::transaction> txes;
static expect<full_txpool_pub> from_json(std::string&&);
};
}
}

View file

@ -42,6 +42,7 @@ namespace
constexpr const std::size_t default_inputs = 2;
constexpr const std::size_t default_outputs = 4;
constexpr const std::size_t default_txextra_size = 2048;
constexpr const std::size_t default_txpool_size = 32;
}
namespace rct
@ -141,7 +142,7 @@ namespace cryptonote
);
}
static void read_bytes(wire::json_reader& source, transaction& self)
void read_bytes(wire::json_reader& source, transaction& self)
{
self.vin.reserve(default_inputs);
self.vout.reserve(default_outputs);
@ -177,6 +178,11 @@ namespace cryptonote
self.transactions.reserve(default_transaction_count);
wire::object(source, WIRE_FIELD(block), WIRE_FIELD(transactions));
}
static void read_bytes(wire::json_reader& source, tx_in_pool& self)
{
wire::object(source, WIRE_FIELD(tx), WIRE_FIELD(tx_hash));
}
} // rpc
} // cryptonote
@ -187,3 +193,8 @@ void lws::rpc::read_bytes(wire::json_reader& source, get_blocks_fast_response& s
wire::object(source, WIRE_FIELD(blocks), WIRE_FIELD(output_indices), WIRE_FIELD(start_height), WIRE_FIELD(current_height));
}
void lws::rpc::read_bytes(wire::json_reader& source, get_transaction_pool_response& self)
{
self.transactions.reserve(default_txpool_size);
wire::object(source, WIRE_FIELD(transactions));
}

View file

@ -40,9 +40,13 @@ namespace crypto
namespace cryptonote
{
class transaction;
void read_bytes(wire::json_reader& source, transaction& self);
namespace rpc
{
struct block_with_transactions;
struct tx_in_pool;
}
}
@ -71,5 +75,21 @@ namespace rpc
using response = get_blocks_fast_response;
};
void read_bytes(wire::json_reader&, get_blocks_fast_response&);
struct get_transaction_pool_request
{
get_transaction_pool_request() = delete;
};
struct get_transaction_pool_response
{
get_transaction_pool_response() = delete;
std::vector<cryptonote::rpc::tx_in_pool> transactions;
};
struct get_transaction_pool
{
using request = get_transaction_pool_request;
using response = get_transaction_pool_response;
};
void read_bytes(wire::json_reader&, get_transaction_pool_response&);
} // rpc
} // lws

View file

@ -35,6 +35,8 @@
#include <cassert>
#include <chrono>
#include <cstring>
#include <functional>
#include <limits>
#include <type_traits>
#include <utility>
#include <vector>
@ -50,7 +52,8 @@
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h"
#include "net/net_parse_helpers.h"
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/daemon_messages.h" // monero/src
#include "rpc/message_data_structs.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "rpc/json.h"
#include "util/source_location.h"
@ -119,9 +122,16 @@ namespace lws
}
}
bool is_new_block(db::storage& disk, const account& user, const rpc::minimal_chain_pub& chain)
bool is_new_block(std::string&& chain_msg, db::storage& disk, const account& user)
{
if (user.scan_height() < db::block_id(chain.top_block_height))
const auto chain = rpc::minimal_chain_pub::from_json(std::move(chain_msg));
if (!chain)
{
MERROR("Unable to parse blockchain notification: " << chain.error());
return false;
}
if (user.scan_height() < db::block_id(chain->top_block_height))
return true;
auto reader = disk.start_read();
@ -132,8 +142,8 @@ namespace lws
}
// check possible chain rollback daemon side
const expect<crypto::hash> id = reader->get_block_hash(db::block_id(chain.top_block_height));
if (!id || *id != chain.top_block_id)
const expect<crypto::hash> id = reader->get_block_hash(db::block_id(chain->top_block_height));
if (!id || *id != chain->top_block_id)
return true;
// check possible chain rollback from other thread
@ -241,13 +251,103 @@ namespace lws
}
};
void scan_transaction(
struct add_spend
{
void operator()(lws::account& user, const db::spend& spend) const
{ user.add_spend(spend); }
};
struct add_output
{
bool operator()(lws::account& user, const db::output& out) const
{ return user.add_out(out); }
};
struct null_spend
{
void operator()(lws::account&, const db::spend&) const noexcept
{}
};
struct send_webhook
{
db::storage const& disk_;
rpc::client& client_;
net::ssl_verification_t verify_mode_;
std::unordered_map<crypto::hash, crypto::hash> txpool_;
bool operator()(lws::account& user, const db::output& out)
{
/* Upstream monerod does not send all fields for a transaction, so
mempool notifications cannot compute tx_hash correctly (it is not
sent separately, a further blunder). Instead, if there are matching
outputs with webhooks, fetch mempool to compare tx_prefix_hash and
then use corresponding tx_hash. */
const db::webhook_key key{user.id(), db::webhook_type::tx_confirmation};
std::vector<db::webhook_value> hooks{};
{
auto reader = disk_.start_read();
if (!reader)
{
MERROR("Unable to lookup webhook on tx in pool: " << reader.error().message());
return false;
}
auto found = reader->find_webhook(key, out.payment_id.short_);
if (!found)
{
MERROR("Failed db lookup for webhooks: " << found.error().message());
return false;
}
hooks = std::move(*found);
}
if (!hooks.empty() && txpool_.empty())
{
cryptonote::rpc::GetTransactionPool::Request req{};
if (!send(client_, rpc::client::make_message("get_transaction_pool", req)))
{
MERROR("Unable to compute tx hash for webhook, aborting");
return false;
}
auto resp = client_.get_message(std::chrono::seconds{3});
if (!resp)
{
MERROR("Unable to get txpool: " << resp.error().message());
return false;
}
rpc::json<rpc::get_transaction_pool>::response txpool{};
const std::error_code err = wire::json::from_bytes(std::move(*resp), txpool);
if (err)
MONERO_THROW(err, "Invalid json-rpc");
for (auto& tx : txpool.result.transactions)
txpool_.emplace(get_transaction_prefix_hash(tx.tx), tx.tx_hash);
}
std::vector<db::webhook_tx_confirmation> events{};
for (auto& hook : hooks)
{
events.push_back(db::webhook_tx_confirmation{key, std::move(hook), out});
events.back().value.second.confirmations = 0;
const auto hash = txpool_.find(out.tx_prefix_hash);
if (hash != txpool_.end())
events.back().tx_info.link.tx_hash = hash->second;
else
events.pop_back(); //cannot compute tx_hash
}
send_via_http(epee::to_span(events), std::chrono::seconds{5}, verify_mode_);
return true;
}
};
void scan_transaction_base(
epee::span<lws::account> users,
const db::block_id height,
const std::uint64_t timestamp,
crypto::hash const& tx_hash,
cryptonote::transaction const& tx,
std::vector<std::uint64_t> const& out_ids)
std::vector<std::uint64_t> const& out_ids,
std::function<void(lws::account&, const db::spend&)> spend_action,
std::function<bool(lws::account&, const db::output&)> output_action)
{
if (2 < tx.version)
throw std::runtime_error{"Unsupported tx version"};
@ -302,7 +402,8 @@ namespace lws
goffset += offset;
if (user.has_spendable(db::output_id{in_data->amount, goffset}))
{
user.add_spend(
spend_action(
user,
db::spend{
db::transaction_link{height, tx_hash},
in_data->k_image,
@ -377,7 +478,8 @@ namespace lws
}
}
const bool added = user.add_out(
const bool added = output_action(
user,
db::output{
db::transaction_link{height, tx_hash},
db::output::spend_meta_{
@ -405,6 +507,39 @@ namespace lws
} // for all users
}
void scan_transaction(
epee::span<lws::account> users,
const db::block_id height,
const std::uint64_t timestamp,
crypto::hash const& tx_hash,
cryptonote::transaction const& tx,
std::vector<std::uint64_t> const& out_ids)
{
scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, add_spend{}, add_output{});
}
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const net::ssl_verification_t verify_mode)
{
// uint64::max is for txpool
static const std::vector<std::uint64_t> fake_outs(
256, std::numeric_limits<std::uint64_t>::max()
);
const auto parsed = rpc::full_txpool_pub::from_json(std::move(txpool_msg));
if (!parsed)
{
MERROR("Failed parsing txpool pub: " << parsed.error().message());
return;
}
const auto time =
boost::numeric_cast<std::uint64_t>(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
send_webhook sender{disk, client, verify_mode};
for (const auto& tx : parsed->txes)
scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, null_spend{}, sender);
}
void update_rates(rpc::context& ctx)
{
const expect<boost::optional<lws::rates>> new_rates = ctx.retrieve_rates();
@ -489,20 +624,40 @@ namespace lws
if (fetched.result.blocks.size() <= 1)
{
// synced to top of chain, wait for next blocks
for (;;)
for (bool wait_for_block = true; wait_for_block; )
{
const expect<rpc::minimal_chain_pub> new_block = client.wait_for_block();
if (new_block.matches(std::errc::interrupted))
return;
if (!new_block || is_new_block(disk, users.front(), *new_block))
break;
}
expect<std::vector<std::pair<rpc::client::topic, std::string>>> new_pubs = client.wait_for_block();
if (new_pubs.matches(std::errc::interrupted))
return; // reset entire state (maybe shutdown)
if (!new_pubs)
break; // exit wait for block loop, and try fetching new blocks
// put txpool messages before block messages
static_assert(rpc::client::topic::block < rpc::client::topic::txpool, "bad sort");
std::sort(new_pubs->begin(), new_pubs->end(), std::greater<>{});
// process txpool first
auto message = new_pubs->begin();
for ( ; message != new_pubs->end(); ++message)
{
if (message->first != rpc::client::topic::txpool)
break; // inner for loop
scan_transactions(std::move(message->second), epee::to_mut_span(users), disk, client, webhook_verify);
}
for ( ; message != new_pubs->end(); ++message)
{
if (message->first == rpc::client::topic::block && is_new_block(std::move(message->second), disk, users.front()))
wait_for_block = false;
}
} // wait for block
// request next chunk of blocks
if (!send(client, block_request.clone()))
return;
continue; // to next get_blocks_fast read
}
} // if only one block was fetched
// request next chunk of blocks
if (!send(client, block_request.clone()))

View file

@ -0,0 +1,78 @@
// Copyright (c) 2021-2023, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <functional>
#include <utility>
#include "wire/field.h"
#include "wire/traits.h"
//! An optional field that is omitted when a default value is used
#define WIRE_FIELD_DEFAULTED(name, default_) \
::wire::optional_field( #name , ::wire::defaulted(std::ref( self . name ), default_ ))
namespace wire
{
/*! A wrapper that tells `wire::writer`s to skip field generation when default
value, and tells `wire::reader`s to use default value when field not present. */
template<typename T, typename U>
struct defaulted_
{
using value_type = typename unwrap_reference<T>::type;
T value;
U default_;
constexpr const value_type& get_value() const noexcept { return value; }
value_type& get_value() noexcept { return value; }
// concept requirements for optional fields
constexpr explicit operator bool() const { return get_value() != default_; }
value_type& emplace() noexcept { return get_value(); }
constexpr const value_type& operator*() const noexcept { return get_value(); }
value_type& operator*() noexcept { return get_value(); }
void reset() { get_value() = default_; }
};
//! Links `value` with `default_`.
template<typename T, typename U>
inline constexpr defaulted_<T, U> defaulted(T value, U default_)
{
return {std::move(value), std::move(default_)};
}
/* read/write functions not needed since `defaulted_` meets the concept
requirements for an optional type (optional fields are handled
directly by the generic read/write code because the field name is omitted
entirely when the value is "empty"). */
} // wire