New accounts are 'pushed' to worker threads (#102)

This commit is contained in:
Lee *!* Clagett 2024-04-08 12:58:43 -04:00 committed by GitHub
parent fb6e1dedd5
commit 08f8403f13
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 743 additions and 50 deletions

View file

@ -52,7 +52,7 @@
#include "rpc/admin.h"
#include "span.h" // monero/contrib/epee/include
#include "string_tools.h" // monero/contrib/epee/include
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/filters.h"
#include "wire/json/write.h"
#include "wire/wrapper/array.h"

View file

@ -33,6 +33,11 @@
#include "common/expect.h"
#include "db/data.h"
#include "db/string.h"
#include "wire/adapted/crypto.h"
#include "wire/adapted/pair.h"
#include "wire/msgpack.h"
#include "wire/vector.h"
#include "wire/wrapper/trusted_array.h"
namespace lws
{
@ -50,6 +55,10 @@ namespace lws
struct account::internal
{
internal()
: address(), id(db::account_id::invalid), pubs{}, view_key{}
{}
explicit internal(db::account const& source)
: address(db::address_string(source.address)), id(source.id), pubs(source.address), view_key()
{
@ -66,6 +75,23 @@ namespace lws
);
}
void read_bytes(wire::msgpack_reader& source)
{ map(source, *this); }
void write_bytes(wire::msgpack_writer& dest) const
{ map(dest, *this); }
template<typename F, typename T>
static void map(F& format, T& self)
{
wire::object(format,
WIRE_FIELD_ID(0, address),
WIRE_FIELD_ID(1, id),
WIRE_FIELD_ID(2, pubs),
WIRE_FIELD_ID(3, view_key)
);
}
std::string address;
db::account_id id;
db::account_address pubs;
@ -87,6 +113,23 @@ namespace lws
MONERO_THROW(::common_error::kInvalidArgument, "using moved from account");
}
template<typename F, typename T, typename U>
void account::map(F& format, T& self, U& immutable)
{
wire::object(format,
wire::field<0>("immutable_", std::ref(immutable)),
wire::optional_field<1>("spendable_", wire::trusted_array(std::ref(self.spendable_))),
wire::optional_field<2>("pubs_", wire::trusted_array(std::ref(self.pubs_))),
wire::optional_field<3>("spends_", wire::trusted_array(std::ref(self.spends_))),
wire::optional_field<4>("outputs_", wire::trusted_array(std::ref(self.outputs_))),
WIRE_FIELD_ID(5, height_)
);
}
account::account() noexcept
: immutable_(nullptr), spendable_(), pubs_(), spends_(), outputs_(), height_(db::block_id(0))
{}
account::account(db::account const& source, std::vector<std::pair<db::output_id, db::address_index>> spendable, std::vector<crypto::public_key> pubs)
: account(std::make_shared<internal>(source), source.scan_height, std::move(spendable), std::move(pubs))
{
@ -97,6 +140,21 @@ namespace lws
account::~account() noexcept
{}
void account::read_bytes(::wire::msgpack_reader& source)
{
auto immutable = std::make_shared<internal>();
map(source, *this, *immutable);
immutable_ = std::move(immutable);
std::sort(spendable_.begin(), spendable_.end());
std::sort(pubs_.begin(), pubs_.end(), sort_pubs{});
}
void account::write_bytes(::wire::msgpack_writer& dest) const
{
null_check();
map(dest, *this, *immutable_);
}
account account::clone() const
{
account result{immutable_, height_, spendable_, pubs_};

View file

@ -36,6 +36,8 @@
#include "fwd.h"
#include "db/data.h"
#include "db/fwd.h"
#include "wire/fwd.h"
#include "wire/msgpack/fwd.h"
namespace lws
{
@ -54,8 +56,14 @@ namespace lws
explicit account(std::shared_ptr<const internal> immutable, db::block_id height, std::vector<std::pair<db::output_id, db::address_index>> spendable, std::vector<crypto::public_key> pubs) noexcept;
void null_check() const;
template<typename F, typename T, typename U>
static void map(F& format, T& self, U& immutable);
public:
//! Construct an "invalid" account (for de-serialization)
account() noexcept;
//! Construct an account from `source` and current `spendable` outputs.
explicit account(db::account const& source, std::vector<std::pair<db::output_id, db::address_index>> spendable, std::vector<crypto::public_key> pubs);
@ -71,6 +79,12 @@ namespace lws
account& operator=(const account&) = delete;
account& operator=(account&&) = default;
//! Read into `this` from `source`.
void read_bytes(::wire::msgpack_reader& source);
//! Write to `dest` from `this`.
void write_bytes(::wire::msgpack_writer& dest) const;
//! \return A copy of `this`.
account clone() const;

View file

@ -1,4 +1,4 @@
// Copyright (c) 2018, The Monero Project
// Copyright (c) 2018-2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
@ -36,7 +36,7 @@
#include "ringct/rctTypes.h" // monero/src
#include "wire.h"
#include "wire/adapted/array.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/json/write.h"
#include "wire/msgpack.h"
#include "wire/uuid.h"
@ -54,7 +54,7 @@ namespace db
template<typename F, typename T>
void map_output_id(F& format, T& self)
{
wire::object(format, WIRE_FIELD(high), WIRE_FIELD(low));
wire::object(format, WIRE_FIELD_ID(0, high), WIRE_FIELD_ID(1, low));
}
}
WIRE_DEFINE_OBJECT(output_id, map_output_id);
@ -72,7 +72,7 @@ namespace db
template<typename F, typename T>
void map_account_address(F& format, T& self)
{
wire::object(format, WIRE_FIELD(spend_public), WIRE_FIELD(view_public));
wire::object(format, WIRE_FIELD_ID(0, spend_public), WIRE_FIELD_ID(1, view_public));
}
}
WIRE_DEFINE_OBJECT(account_address, map_account_address);
@ -250,6 +250,55 @@ namespace db
}
WIRE_DEFINE_OBJECT(transaction_link, map_transaction_link);
void read_bytes(wire::reader& source, output& self)
{
bool coinbase = false;
boost::optional<rct::key> rct;
boost::optional<std::vector<std::uint8_t>> payment_id;
wire::object(source,
wire::optional_field<0>("id", wire::defaulted(std::ref(self.spend_meta.id), output_id::txpool())),
wire::optional_field<1>("block", wire::defaulted(std::ref(self.link.height), block_id::txpool)),
wire::field<2>("index", std::ref(self.spend_meta.index)),
wire::field<3>("amount", std::ref(self.spend_meta.amount)),
wire::field<4>("timestamp", std::ref(self.timestamp)),
wire::field<5>("tx_hash", std::ref(self.link.tx_hash)),
wire::field<6>("tx_prefix_hash", std::ref(self.tx_prefix_hash)),
wire::field<7>("tx_public", std::ref(self.spend_meta.tx_public)),
wire::optional_field<8>("rct_mask", std::ref(rct)),
wire::optional_field<9>("payment_id", std::ref(payment_id)),
wire::field<10>("unlock_time", std::ref(self.unlock_time)),
wire::field<11>("mixin_count", std::ref(self.spend_meta.mixin_count)),
wire::field<12>("coinbase", std::ref(coinbase)),
wire::field<13>("fee", std::ref(self.fee)),
wire::field<14>("recipient", std::ref(self.recipient)),
wire::field<15>("pub", std::ref(self.pub))
);
std::uint8_t pay_length = 0;
if (payment_id)
pay_length = payment_id->size();
if (pay_length && pay_length != 8 && pay_length != 32)
WIRE_DLOG_THROW(wire::error::schema::binary, "Unexpected binary size");
if (pay_length == 8)
std::memcpy(std::addressof(self.payment_id.short_), payment_id->data(), 8);
if (pay_length == 32)
std::memcpy(std::addressof(self.payment_id.long_), payment_id->data(), 32);
if (rct)
std::memcpy(std::addressof(self.ringct_mask), std::addressof(*rct), sizeof(self.ringct_mask));
else
std::memset(std::addressof(self.ringct_mask), 0, sizeof(self.ringct_mask));
extra flags{};
if (coinbase)
flags = lws::db::coinbase_output;
if (rct)
flags = extra(lws::db::ringct_output | flags);
self.extra = db::pack(flags, pay_length);
}
void write_bytes(wire::writer& dest, const output& self)
{
const std::pair<db::extra, std::uint8_t> unpacked =
@ -286,7 +335,8 @@ namespace db
wire::field<11>("mixin_count", self.spend_meta.mixin_count),
wire::field<12>("coinbase", coinbase),
wire::field<13>("fee", self.fee),
wire::field<14>("recipient", self.recipient)
wire::field<14>("recipient", self.recipient),
wire::field<15>("pub", std::cref(self.pub))
);
}
@ -296,15 +346,15 @@ namespace db
void map_spend(F& format, T1& self, T2& payment_id)
{
wire::object(format,
wire::field("height", std::ref(self.link.height)),
wire::field("tx_hash", std::ref(self.link.tx_hash)),
WIRE_FIELD(image),
WIRE_FIELD(source),
WIRE_FIELD(timestamp),
WIRE_FIELD(unlock_time),
WIRE_FIELD(mixin_count),
wire::optional_field("payment_id", std::ref(payment_id)),
WIRE_FIELD(sender)
wire::field<0>("height", std::ref(self.link.height)),
wire::field<1>("tx_hash", std::ref(self.link.tx_hash)),
WIRE_FIELD_ID(2, image),
WIRE_FIELD_ID(3, source),
WIRE_FIELD_ID(4, timestamp),
WIRE_FIELD_ID(5, unlock_time),
WIRE_FIELD_ID(6, mixin_count),
wire::optional_field<7>("payment_id", std::ref(payment_id)),
WIRE_FIELD_ID(8, sender)
);
}
}

View file

@ -291,6 +291,7 @@ namespace db
sizeof(output) == 8 + 32 + (8 * 3) + (4 * 2) + 32 + (8 * 2) + (32 * 3) + 7 + 1 + 32 + 8 + 2 * 4,
"padding in output"
);
void read_bytes(wire::reader&, output&);
void write_bytes(wire::writer&, const output&);
//! Information about a possible spend of a received `output`.

View file

@ -57,7 +57,7 @@
#include "util/gamma_picker.h"
#include "util/random_outputs.h"
#include "util/source_location.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/json.h"
namespace lws

View file

@ -36,7 +36,7 @@
#include "error.h"
#include "span.h" // monero/contrib/epee/include
#include "wire.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/error.h"
#include "wire/json/write.h"
#include "wire/traits.h"

View file

@ -34,11 +34,14 @@
#include <system_error>
#include "common/error.h" // monero/contrib/epee/include
#include "db/account.h"
#include "error.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/http_client.h" // monero/contrib/epee/include
#include "net/zmq.h" // monero/src
#include "scanner.h"
#include "serialization/json_object.h" // monero/src
#include "wire/msgpack.h"
#if MLWS_RMQ_ENABLED
#include <amqp.h>
#include <amqp_tcp_socket.h>
@ -53,11 +56,13 @@ namespace rpc
namespace
{
constexpr const char signal_endpoint[] = "inproc://signal";
constexpr const char account_endpoint[] = "inproc://account"; // append integer every new `account_push`
constexpr const char abort_scan_signal[] = "SCAN";
constexpr const char abort_process_signal[] = "PROCESS";
constexpr const char minimal_chain_topic[] = "json-minimal-chain_main";
constexpr const char full_txpool_topic[] = "json-full-txpool_add";
constexpr const int daemon_zmq_linger = 0;
constexpr const int account_zmq_linger = 0;
constexpr const std::int64_t max_msg_sub = 10 * 1024 * 1024; // 50 MiB
constexpr const std::int64_t max_msg_req = 350 * 1024 * 1024; // 350 MiB
constexpr const std::chrono::seconds chain_poll_timeout{20};
@ -192,6 +197,7 @@ namespace rpc
, cache_time()
, cache_interval(interval)
, cached{}
, account_counter(0)
, sync_pub()
, sync_rates()
, untrusted_daemon(untrusted_daemon)
@ -210,12 +216,70 @@ namespace rpc
std::chrono::steady_clock::time_point cache_time;
const std::chrono::minutes cache_interval;
rates cached;
std::atomic<unsigned> account_counter;
boost::mutex sync_pub;
boost::mutex sync_rates;
const bool untrusted_daemon;
};
} // detail
expect<account_push> account_push::make(std::shared_ptr<detail::context> ctx) noexcept
{
MONERO_PRECOND(ctx != nullptr);
account_push out{ctx};
out.sock.reset(zmq_socket(ctx->comm.get(), ZMQ_PUSH));
if (out.sock == nullptr)
return {net::zmq::get_error_code()};
const std::string bind = account_endpoint + std::to_string(++ctx->account_counter);
MONERO_CHECK(do_set_option(out.sock.get(), ZMQ_LINGER, account_zmq_linger));
MONERO_ZMQ_CHECK(zmq_bind(out.sock.get(), bind.c_str()));
return {std::move(out)};
}
account_push::~account_push() noexcept
{}
expect<void> account_push::push(epee::span<const lws::account> accounts, std::chrono::seconds timeout)
{
MONERO_PRECOND(ctx != nullptr);
assert(sock.get() != nullptr);
for (const lws::account& account : accounts)
{
// use integer id values (quick and fast)
wire::msgpack_slice_writer dest{true};
try
{
wire_write::bytes(dest, account);
}
catch (const wire::exception& e)
{
return {e.code()};
}
epee::byte_slice message{dest.take_sink()};
/* This is being pushed by the thread that monitors for shutdown, so
no signal is expected. */
expect<void> sent;
const auto start = std::chrono::steady_clock::now();
while (!(sent = net::zmq::send(message.clone(), sock.get(), ZMQ_DONTWAIT)))
{
if (sent != net::zmq::make_error_code(EAGAIN))
return sent.error();
if (!scanner::is_running())
return {error::signal_abort_process};
const auto elapsed = std::chrono::steady_clock::now() - start;
if (timeout <= elapsed)
return {error::daemon_timeout};
boost::this_thread::sleep_for(boost::chrono::milliseconds{10});
}
}
return success();
}
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc)
{
expect<std::string> message = get_message(timeout);
@ -312,6 +376,18 @@ namespace rpc
return do_subscribe(signal_sub.get(), abort_scan_signal);
}
expect<void> client::enable_pull_accounts()
{
detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)};
if (new_sock == nullptr)
return {net::zmq::get_error_code()};
const std::string connect =
account_endpoint + std::to_string(ctx->account_counter);
MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str()));
account_pull = std::move(new_sock);
return success();
}
expect<std::vector<std::pair<client::topic, std::string>>> client::wait_for_block()
{
MONERO_PRECOND(ctx != nullptr);
@ -425,6 +501,32 @@ namespace rpc
return rc;
}
expect<std::vector<lws::account>> client::pull_accounts()
{
MONERO_PRECOND(ctx != nullptr);
if (!account_pull)
MONERO_CHECK(enable_pull_accounts());
std::vector<lws::account> out{};
for (;;)
{
expect<std::string> next = net::zmq::receive(account_pull.get(), ZMQ_DONTWAIT);
if (!next)
{
if (net::zmq::make_error_code(EAGAIN))
break;
return next.error();
}
out.emplace_back();
const std::error_code error =
wire::msgpack::from_bytes(epee::byte_slice{std::move(*next)}, out.back());
if (error)
return error;
}
return {std::move(out)};
}
expect<rates> client::get_rates() const
{
MONERO_PRECOND(ctx != nullptr);
@ -459,7 +561,7 @@ namespace rpc
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
}
rcontext rmq{};
#ifdef MLWS_RMQ_ENABLED
if (!rmq_info.address.empty())

View file

@ -34,6 +34,7 @@
#include <zmq.h>
#include "byte_slice.h" // monero/contrib/epee/include
#include "db/fwd.h"
#include "common/expect.h" // monero/src
#include "rpc/message.h" // monero/src
#include "rpc/daemon_pub.h"
@ -67,6 +68,31 @@ namespace rpc
std::string routing;
};
//! Every scanner "reset", a new socket is created so old messages are discarded
class account_push
{
std::shared_ptr<detail::context> ctx;
detail::socket sock;
explicit account_push(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), sock()
{}
public:
static expect<account_push> make(std::shared_ptr<detail::context> ctx) noexcept;
account_push(const account_push&) = delete;
account_push(account_push&&) = default;
~account_push() noexcept;
account_push& operator=(const account_push&) = delete;
account_push& operator=(account_push&&) = default;
//! Push new `accounts` to worker threads. Each account is sent in unique message
expect<void> push(epee::span<const lws::account> accounts, std::chrono::seconds timeout);
};
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
class client
{
@ -74,9 +100,10 @@ namespace rpc
detail::socket daemon;
detail::socket daemon_sub;
detail::socket signal_sub;
detail::socket account_pull;
explicit client(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub()
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub(), account_pull()
{}
//! Expect `response` as the next message payload unless error.
@ -129,6 +156,9 @@ namespace rpc
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
expect<void> watch_scan_signals() noexcept;
//! Register `this` client as listening for new accounts
expect<void> enable_pull_accounts();
//! Wait for new block announce or internal timeout.
expect<std::vector<std::pair<topic, std::string>>> wait_for_block();
@ -173,6 +203,9 @@ namespace rpc
return response;
}
//! Retrieve new accounts to be scanned on this thread.
expect<std::vector<lws::account>> pull_accounts();
/*!
\note This is the one function that IS thread-safe. Multiple threads can
call this function with the same `this` argument.
@ -232,6 +265,12 @@ namespace rpc
return client::make(ctx);
}
//! Create a new account push state
expect<account_push> bind_push() const noexcept
{
return account_push::make(ctx);
}
/*!
All block `client::send`, `client::receive`, and `client::wait` calls
originating from `this` object AND whose `watch_scan_signal` method was

View file

@ -29,7 +29,7 @@
#include "cryptonote_basic/cryptonote_basic.h" // monero/src
#include "rpc/daemon_zmq.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/error.h"
#include "wire/field.h"
#include "wire/traits.h"

View file

@ -31,7 +31,7 @@
#include "cryptonote_config.h" // monero/src
#include "crypto/crypto.h" // monero/src
#include "rpc/message_data_structs.h" // monero/src
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/json.h"
#include "wire/wrapper/array.h"
#include "wire/wrapper/variant.h"

View file

@ -40,7 +40,7 @@
#include "ringct/rctOps.h" // monero/src
#include "span.h" // monero/contrib/epee/include
#include "util/random_outputs.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/error.h"
#include "wire/json.h"
#include "wire/traits.h"

View file

@ -31,7 +31,7 @@
#include "db/account.h"
#include "rpc/client.h"
#include "rpc/webhook.h"
#include "wire/crypto.h"
#include "wire/adapted/crypto.h"
#include "wire/wrapper/array.h"
#include "wire/wrappers_impl.h"
#include "wire/write.h"

View file

@ -694,6 +694,35 @@ namespace lws
return;
}
{
expect<std::vector<lws::account>> new_accounts = client.pull_accounts();
if (!new_accounts)
{
MERROR("Failed to pull new accounts: " << new_accounts.error().message());
return; // get all active accounts the easy way
}
if (!new_accounts->empty())
{
MINFO("Received " << new_accounts->size() << " new account(s) for scanning");
std::sort(new_accounts->begin(), new_accounts->end(), by_height{});
const db::block_id oldest = new_accounts->front().scan_height();
users.insert(
users.end(),
std::make_move_iterator(new_accounts->begin()),
std::make_move_iterator(new_accounts->end())
);
if (std::uint64_t(oldest) < fetched->start_height)
{
req.start_height = std::uint64_t(oldest);
block_request = rpc::client::make_message("get_blocks_fast", req);
if (!send(client, block_request.clone()))
return;
continue; // to next get_blocks_fast read
}
// else, the oldest new account is within the newly fetched range
}
}
// prep for next blocks retrieval
req.start_height = fetched->start_height + fetched->blocks.size() - 1;
block_request = rpc::client::make_message("get_blocks_fast", req);
@ -913,6 +942,27 @@ namespace lws
}
}
lws::account prep_account(db::storage_reader& reader, const lws::db::account& user)
{
std::vector<std::pair<db::output_id, db::address_index>> receives{};
std::vector<crypto::public_key> pubs{};
auto receive_list = MONERO_UNWRAP(reader.get_outputs(user.id));
const std::size_t elems = receive_list.count();
receives.reserve(elems);
pubs.reserve(elems);
for (auto output = receive_list.make_iterator(); !output.is_end(); ++output)
{
auto id = output.get_value<MONERO_FIELD(db::output, spend_meta.id)>();
auto subaddr = output.get_value<MONERO_FIELD(db::output, recipient)>();
receives.emplace_back(std::move(id), std::move(subaddr));
pubs.emplace_back(output.get_value<MONERO_FIELD(db::output, pub)>());
}
return lws::account{user, std::move(receives), std::move(pubs)};
}
/*!
Launches `thread_count` threads to run `scan_loop`, and then polls for
active account changes in background
@ -964,9 +1014,13 @@ namespace lws
threads.reserve(thread_count);
std::sort(users.begin(), users.end(), by_height{});
// enable the new bind point before registering pull accounts
lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push());
MINFO("Starting scan loops on " << std::min(thread_count, users.size()) << " thread(s) with " << users.size() << " account(s)");
bool leader_thread = true;
bool remaining_threads = true;
while (!users.empty() && --thread_count)
{
const std::size_t per_thread = std::max(std::size_t(1), users.size() / (thread_count + 1));
@ -977,7 +1031,8 @@ namespace lws
users.erase(users.end() - count, users.end());
rpc::client client = MONERO_UNWRAP(ctx.connect());
client.watch_scan_signals();
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(thread_users), opts
@ -989,12 +1044,14 @@ namespace lws
if (!users.empty())
{
rpc::client client = MONERO_UNWRAP(ctx.connect());
client.watch_scan_signals();
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(users), opts
);
threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread));
remaining_threads = false;
}
auto last_check = std::chrono::steady_clock::now();
@ -1035,22 +1092,51 @@ namespace lws
auto current_users = MONERO_UNWRAP(
reader->get_accounts(db::account_status::active, std::move(accounts_cur))
);
if (current_users.count() != active.size())
if (current_users.count() < active.size())
{
// cannot remove accounts via ZMQ (yet)
MINFO("Decrease in active user accounts detected, stopping scan threads...");
return;
}
std::vector<db::account_id> active_copy = active;
std::vector<lws::account> new_;
for (auto user = current_users.make_iterator(); !user.is_end(); ++user)
{
const db::account_id user_id = user.get_value<MONERO_FIELD(db::account, id)>();
const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id);
if (loc == active_copy.end() || *loc != user_id)
{
new_.emplace_back(prep_account(*reader, user.get_value<db::account>()));
active.insert(
std::lower_bound(active.begin(), active.end(), user_id), user_id
);
}
else
active_copy.erase(loc);
}
if (!active_copy.empty())
{
MINFO("Change in active user accounts detected, stopping scan threads...");
return;
}
for (auto user = current_users.make_iterator(); !user.is_end(); ++user)
if (!new_.empty())
{
const db::account_id user_id = user.get_value<MONERO_FIELD(db::account, id)>();
if (!std::binary_search(active.begin(), active.end(), user_id))
if (remaining_threads)
{
MINFO("Change in active user accounts detected, stopping scan threads...");
MINFO("Received new account(s), starting more thread(s)");
return;
}
}
const auto pushed = pusher.push(epee::to_span(new_), std::chrono::seconds{1});
if (!pushed)
{
MERROR("Failed to push new account to workers: " << pushed.error().message());
return; // pull in new accounts by resetting state
}
else
MINFO("Pushed " << new_.size() << " new accounts to worker thread(s)");
}
read_txn = reader->finish_read();
accounts_cur = current_users.give_cursor();
} // while scanning
@ -1293,23 +1379,7 @@ namespace lws
for (db::account user : accounts.make_range())
{
std::vector<std::pair<db::output_id, db::address_index>> receives{};
std::vector<crypto::public_key> pubs{};
auto receive_list = MONERO_UNWRAP(reader.get_outputs(user.id));
const std::size_t elems = receive_list.count();
receives.reserve(elems);
pubs.reserve(elems);
for (auto output = receive_list.make_iterator(); !output.is_end(); ++output)
{
auto id = output.get_value<MONERO_FIELD(db::output, spend_meta.id)>();
auto subaddr = output.get_value<MONERO_FIELD(db::output, recipient)>();
receives.emplace_back(std::move(id), std::move(subaddr));
pubs.emplace_back(output.get_value<MONERO_FIELD(db::output, pub)>());
}
users.emplace_back(user, std::move(receives), std::move(pubs));
users.emplace_back(prep_account(reader, user));
active.insert(
std::lower_bound(active.begin(), active.end(), user.id), user.id
);

View file

@ -27,7 +27,7 @@
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(monero-lws-wire_sources error.cpp read.cpp write.cpp)
set(monero-lws-wire_headers crypto.h error.h field.h filters.h fwd.h json.h read.h traits.h vector.h write.h)
set(monero-lws-wire_headers error.h field.h filters.h fwd.h json.h read.h traits.h vector.h write.h)
add_library(monero-lws-wire ${monero-lws-wire_sources} ${monero-lws-wire_headers})
target_include_directories(monero-lws-wire PUBLIC "${LMDB_INCLUDE}")

View file

@ -41,6 +41,12 @@ namespace crypto
{
source.binary(epee::as_mut_byte_span(unwrap(unwrap(self))));
}
template<typename W>
void write_bytes(W& dest, const crypto::secret_key& self)
{
dest.binary(epee::as_byte_span(unwrap(unwrap(self))));
}
}
namespace wire

50
src/wire/adapted/pair.h Normal file
View file

@ -0,0 +1,50 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include "wire/field.h"
#include "wire/read.h"
#include "wire/write.h"
namespace wire
{
template<typename F, typename T>
void map_pair(F& format, T& self)
{
wire::object(format, WIRE_FIELD_ID(0, first), WIRE_FIELD_ID(1, second));
}
template<typename R, typename T, typename U>
void read_bytes(R& source, std::pair<T, U>& dest)
{ map_pair(source, dest); }
template<typename W, typename T, typename U>
void write_bytes(W& dest, const std::pair<T, U>& source)
{ map_pair(dest, source); }
}

View file

@ -170,6 +170,11 @@ namespace wire
inline std::enable_if_t<is_blob<T>::value> read_bytes(R& source, T& dest)
{ source.binary(epee::as_mut_byte_span(dest)); }
//! Use `read_bytes(...)` method if available for `T`.
template<typename R, typename T>
inline auto read_bytes(R& source, T& dest) -> decltype(dest.read_bytes(source))
{ return dest.read_bytes(source); }
namespace integer
{
[[noreturn]] void throw_exception(std::intmax_t value, std::intmax_t min, std::intmax_t max);

View file

@ -0,0 +1,76 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <cstddef>
#include <limits>
#include "wire/traits.h"
#include "wire/read.h"
#include "wire/write.h"
namespace wire
{
//! \brief Wrapper that removes read constraints
template<typename T>
struct trusted_array_
{
using container_type = wire::unwrap_reference_t<T>;
T container;
const container_type& get_container() const noexcept { return container; }
container_type& get_container() noexcept { return container; }
// concept requirements for optional fields
explicit operator bool() const noexcept { return !get_container().empty(); }
trusted_array_& emplace() noexcept { return *this; }
trusted_array_& operator*() noexcept { return *this; }
const trusted_array_& operator*() const noexcept { return *this; }
void reset() { get_container().clear(); }
};
template<typename T>
trusted_array_<T> trusted_array(T value)
{
return {std::move(value)};
}
template<typename R, typename T>
void read_bytes(R& source, trusted_array_<T> dest)
{
wire_read::array_unchecked(source, dest.get_container(), 0, std::numeric_limits<std::size_t>::max());
}
template<typename W, typename T>
void write_bytes(W& dest, const trusted_array_<T> source)
{
wire_write::array(dest, source.get_container());
}
}

View file

@ -27,6 +27,7 @@
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
add_library(monero-lws-unit-db OBJECT
account.test.cpp
chain.test.cpp
data.test.cpp
storage.test.cpp

View file

@ -0,0 +1,221 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "framework.test.h"
#include "crypto/crypto.h" // monero/src
#include "cryptonote_basic/account.h" // monero/src
#include "db/account.h"
#include "wire/msgpack.h"
LWS_CASE("lws::account serialization")
{
cryptonote::account_keys keys{};
crypto::generate_keys(keys.m_account_address.m_spend_public_key, keys.m_spend_secret_key);
crypto::generate_keys(keys.m_account_address.m_view_public_key, keys.m_view_secret_key);
lws::db::account db_account{
lws::db::account_id(44),
lws::db::account_time(500000),
lws::db::account_address{
keys.m_account_address.m_view_public_key,
keys.m_account_address.m_spend_public_key
},
{},
lws::db::block_id(1000000),
lws::db::block_id(500000),
lws::db::account_time(250000)
};
std::memcpy(
std::addressof(db_account.key),
std::addressof(unwrap(unwrap(keys.m_view_secret_key))),
sizeof(db_account.key)
);
const std::vector<std::pair<lws::db::output_id, lws::db::address_index>> spendable{
{
lws::db::output_id{100, 2000},
lws::db::address_index{lws::db::major_index(1), lws::db::minor_index(34)}
}
};
const std::vector<crypto::public_key> pubs{crypto::rand<crypto::public_key>()};
lws::account account{db_account, spendable, pubs};
EXPECT(account);
const lws::db::transaction_link link{
lws::db::block_id(4000), crypto::rand<crypto::hash>()
};
const crypto::public_key tx_public = crypto::rand<crypto::public_key>();
const crypto::hash tx_prefix = crypto::rand<crypto::hash>();
const crypto::public_key pub = crypto::rand<crypto::public_key>();
const rct::key ringct = crypto::rand<rct::key>();
const auto extra =
lws::db::extra(lws::db::extra::coinbase_output | lws::db::extra::ringct_output);
const auto payment_id_ = crypto::rand<lws::db::output::payment_id_>();
const crypto::hash payment_id = crypto::rand<crypto::hash>();
const crypto::key_image image = crypto::rand<crypto::key_image>();
account.add_out(
lws::db::output{
link,
lws::db::output::spend_meta_{
lws::db::output_id{500, 30},
std::uint64_t(40000),
std::uint32_t(16),
std::uint32_t(2),
tx_public
},
std::uint64_t(7000),
std::uint64_t(4670),
tx_prefix,
pub,
ringct,
{0, 0, 0, 0, 0, 0, 0},
lws::db::pack(extra, sizeof(crypto::hash)),
payment_id_,
std::uint64_t(33444),
lws::db::address_index{lws::db::major_index(2), lws::db::minor_index(66)}
}
);
account.add_spend(
lws::db::spend{
link,
image,
lws::db::output_id{10, 70000},
std::uint64_t(66),
std::uint64_t(1500),
std::uint32_t(16),
{0, 0, 0},
32,
payment_id,
lws::db::address_index{lws::db::major_index(4), lws::db::minor_index(55)}
}
);
const std::string account_address = account.address();
EXPECT(account.id() == db_account.id);
EXPECT(account.view_public() == db_account.address.view_public);
EXPECT(account.spend_public() == db_account.address.spend_public);
EXPECT(account.view_key() == keys.m_view_secret_key);
EXPECT(account.scan_height() == db_account.scan_height);
{
const auto result = account.get_spendable(lws::db::output_id{100, 2000});
EXPECT(bool(result));
EXPECT(result->maj_i == lws::db::major_index(1));
EXPECT(result->min_i == lws::db::minor_index(34));
}
EXPECT(account.outputs().size() == 1);
EXPECT(account.outputs()[0].link == link);
EXPECT(account.outputs()[0].spend_meta.id.high == 500);
EXPECT(account.outputs()[0].spend_meta.id.low == 30);
EXPECT(account.outputs()[0].spend_meta.amount == 40000);
EXPECT(account.outputs()[0].spend_meta.mixin_count == 16);
EXPECT(account.outputs()[0].spend_meta.index == 2);
EXPECT(account.outputs()[0].spend_meta.tx_public == tx_public);
EXPECT(account.outputs()[0].timestamp == 7000);
EXPECT(account.outputs()[0].unlock_time == 4670);
EXPECT(account.outputs()[0].tx_prefix_hash == tx_prefix);
EXPECT(account.outputs()[0].pub == pub);
EXPECT(account.outputs()[0].ringct_mask == ringct);
{
const auto unpacked = lws::db::unpack(account.outputs()[0].extra);
EXPECT(unpacked.first == extra);
EXPECT(unpacked.second == sizeof(crypto::hash));
}
EXPECT(account.outputs()[0].recipient.maj_i == lws::db::major_index(2));
EXPECT(account.outputs()[0].recipient.min_i == lws::db::minor_index(66));
EXPECT(account.spends().size() == 1);
EXPECT(account.spends()[0].link == link);
EXPECT(account.spends()[0].image == image);
EXPECT(account.spends()[0].source.high == 10);
EXPECT(account.spends()[0].source.low == 70000);
EXPECT(account.spends()[0].timestamp == 66);
EXPECT(account.spends()[0].unlock_time == 1500);
EXPECT(account.spends()[0].mixin_count == 16);
EXPECT(account.spends()[0].length == 32);
EXPECT(account.spends()[0].payment_id == payment_id);
EXPECT(account.spends()[0].sender.maj_i == lws::db::major_index(4));
EXPECT(account.spends()[0].sender.min_i == lws::db::minor_index(55));
lws::account copy{};
EXPECT(!copy);
{
wire::msgpack_slice_writer dest{true};
wire_write::bytes(dest, account);
EXPECT(!wire::msgpack::from_bytes(epee::byte_slice{dest.take_sink()}, copy));
}
EXPECT(copy);
EXPECT(copy.address() == account_address);
EXPECT(copy.id() == db_account.id);
EXPECT(copy.view_public() == db_account.address.view_public);
EXPECT(copy.spend_public() == db_account.address.spend_public);
EXPECT(copy.view_key() == keys.m_view_secret_key);
EXPECT(copy.scan_height() == db_account.scan_height);
{
const auto result = copy.get_spendable(lws::db::output_id{100, 2000});
EXPECT(bool(result));
EXPECT(result->maj_i == lws::db::major_index(1));
EXPECT(result->min_i == lws::db::minor_index(34));
}
EXPECT(copy.outputs().size() == 1);
EXPECT(copy.outputs()[0].link == link);
EXPECT(copy.outputs()[0].spend_meta.id.high == 500);
EXPECT(copy.outputs()[0].spend_meta.id.low == 30);
EXPECT(copy.outputs()[0].spend_meta.amount == 40000);
EXPECT(copy.outputs()[0].spend_meta.mixin_count == 16);
EXPECT(copy.outputs()[0].spend_meta.index == 2);
EXPECT(copy.outputs()[0].spend_meta.tx_public == tx_public);
EXPECT(copy.outputs()[0].timestamp == 7000);
EXPECT(copy.outputs()[0].unlock_time == 4670);
EXPECT(copy.outputs()[0].tx_prefix_hash == tx_prefix);
EXPECT(copy.outputs()[0].pub == pub);
EXPECT(copy.outputs()[0].ringct_mask == ringct);
{
const auto unpacked = lws::db::unpack(copy.outputs()[0].extra);
EXPECT(unpacked.first == extra);
EXPECT(unpacked.second == sizeof(crypto::hash));
}
EXPECT(copy.outputs()[0].recipient.maj_i == lws::db::major_index(2));
EXPECT(copy.outputs()[0].recipient.min_i == lws::db::minor_index(66));
EXPECT(copy.spends().size() == 1);
EXPECT(copy.spends()[0].link == link);
EXPECT(copy.spends()[0].image == image);
EXPECT(copy.spends()[0].source.high == 10);
EXPECT(copy.spends()[0].source.low == 70000);
EXPECT(copy.spends()[0].timestamp == 66);
EXPECT(copy.spends()[0].unlock_time == 1500);
EXPECT(copy.spends()[0].mixin_count == 16);
EXPECT(copy.spends()[0].length == 32);
EXPECT(copy.spends()[0].payment_id == payment_id);
EXPECT(copy.spends()[0].sender.maj_i == lws::db::major_index(4));
EXPECT(copy.spends()[0].sender.min_i == lws::db::minor_index(55));
}