From 80604e8133fc781e526e1936a704ac79cfa709ae Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Mon, 8 Apr 2024 12:58:43 -0400 Subject: [PATCH] New accounts are 'pushed' to worker threads (#102) --- src/admin_main.cpp | 2 +- src/db/account.cpp | 58 ++++++++ src/db/account.h | 14 ++ src/db/data.cpp | 78 +++++++++-- src/db/data.h | 1 + src/rest_server.cpp | 2 +- src/rpc/admin.cpp | 2 +- src/rpc/client.cpp | 104 ++++++++++++++- src/rpc/client.h | 41 +++++- src/rpc/daemon_pub.cpp | 2 +- src/rpc/daemon_zmq.cpp | 2 +- src/rpc/light_wallet.cpp | 2 +- src/rpc/lws_pub.cpp | 2 +- src/scanner.cpp | 122 +++++++++++++---- src/wire/CMakeLists.txt | 2 +- src/wire/{ => adapted}/crypto.h | 6 + src/wire/adapted/pair.h | 50 +++++++ src/wire/read.h | 5 + src/wire/wrapper/trusted_array.h | 76 +++++++++++ tests/unit/db/CMakeLists.txt | 1 + tests/unit/db/account.test.cpp | 221 +++++++++++++++++++++++++++++++ 21 files changed, 743 insertions(+), 50 deletions(-) rename src/wire/{ => adapted}/crypto.h (93%) create mode 100644 src/wire/adapted/pair.h create mode 100644 src/wire/wrapper/trusted_array.h create mode 100644 tests/unit/db/account.test.cpp diff --git a/src/admin_main.cpp b/src/admin_main.cpp index a114014..8e06b0c 100644 --- a/src/admin_main.cpp +++ b/src/admin_main.cpp @@ -52,7 +52,7 @@ #include "rpc/admin.h" #include "span.h" // monero/contrib/epee/include #include "string_tools.h" // monero/contrib/epee/include -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/filters.h" #include "wire/json/write.h" #include "wire/wrapper/array.h" diff --git a/src/db/account.cpp b/src/db/account.cpp index 79c6bba..7752827 100644 --- a/src/db/account.cpp +++ b/src/db/account.cpp @@ -33,6 +33,11 @@ #include "common/expect.h" #include "db/data.h" #include "db/string.h" +#include "wire/adapted/crypto.h" +#include "wire/adapted/pair.h" +#include "wire/msgpack.h" +#include "wire/vector.h" +#include "wire/wrapper/trusted_array.h" namespace lws { @@ -50,6 +55,10 @@ namespace lws struct account::internal { + internal() + : address(), id(db::account_id::invalid), pubs{}, view_key{} + {} + explicit internal(db::account const& source) : address(db::address_string(source.address)), id(source.id), pubs(source.address), view_key() { @@ -66,6 +75,23 @@ namespace lws ); } + void read_bytes(wire::msgpack_reader& source) + { map(source, *this); } + + void write_bytes(wire::msgpack_writer& dest) const + { map(dest, *this); } + + template + static void map(F& format, T& self) + { + wire::object(format, + WIRE_FIELD_ID(0, address), + WIRE_FIELD_ID(1, id), + WIRE_FIELD_ID(2, pubs), + WIRE_FIELD_ID(3, view_key) + ); + } + std::string address; db::account_id id; db::account_address pubs; @@ -87,6 +113,23 @@ namespace lws MONERO_THROW(::common_error::kInvalidArgument, "using moved from account"); } + template + void account::map(F& format, T& self, U& immutable) + { + wire::object(format, + wire::field<0>("immutable_", std::ref(immutable)), + wire::optional_field<1>("spendable_", wire::trusted_array(std::ref(self.spendable_))), + wire::optional_field<2>("pubs_", wire::trusted_array(std::ref(self.pubs_))), + wire::optional_field<3>("spends_", wire::trusted_array(std::ref(self.spends_))), + wire::optional_field<4>("outputs_", wire::trusted_array(std::ref(self.outputs_))), + WIRE_FIELD_ID(5, height_) + ); + } + + account::account() noexcept + : immutable_(nullptr), spendable_(), pubs_(), spends_(), outputs_(), height_(db::block_id(0)) + {} + account::account(db::account const& source, std::vector> spendable, std::vector pubs) : account(std::make_shared(source), source.scan_height, std::move(spendable), std::move(pubs)) { @@ -97,6 +140,21 @@ namespace lws account::~account() noexcept {} + void account::read_bytes(::wire::msgpack_reader& source) + { + auto immutable = std::make_shared(); + map(source, *this, *immutable); + immutable_ = std::move(immutable); + std::sort(spendable_.begin(), spendable_.end()); + std::sort(pubs_.begin(), pubs_.end(), sort_pubs{}); + } + + void account::write_bytes(::wire::msgpack_writer& dest) const + { + null_check(); + map(dest, *this, *immutable_); + } + account account::clone() const { account result{immutable_, height_, spendable_, pubs_}; diff --git a/src/db/account.h b/src/db/account.h index 8e630ea..c044081 100644 --- a/src/db/account.h +++ b/src/db/account.h @@ -36,6 +36,8 @@ #include "fwd.h" #include "db/data.h" #include "db/fwd.h" +#include "wire/fwd.h" +#include "wire/msgpack/fwd.h" namespace lws { @@ -54,8 +56,14 @@ namespace lws explicit account(std::shared_ptr immutable, db::block_id height, std::vector> spendable, std::vector pubs) noexcept; void null_check() const; + template + static void map(F& format, T& self, U& immutable); + public: + //! Construct an "invalid" account (for de-serialization) + account() noexcept; + //! Construct an account from `source` and current `spendable` outputs. explicit account(db::account const& source, std::vector> spendable, std::vector pubs); @@ -71,6 +79,12 @@ namespace lws account& operator=(const account&) = delete; account& operator=(account&&) = default; + //! Read into `this` from `source`. + void read_bytes(::wire::msgpack_reader& source); + + //! Write to `dest` from `this`. + void write_bytes(::wire::msgpack_writer& dest) const; + //! \return A copy of `this`. account clone() const; diff --git a/src/db/data.cpp b/src/db/data.cpp index 08c9c3a..8b0b348 100644 --- a/src/db/data.cpp +++ b/src/db/data.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2018, The Monero Project +// Copyright (c) 2018-2024, The Monero Project // All rights reserved. // // Redistribution and use in source and binary forms, with or without modification, are @@ -36,7 +36,7 @@ #include "ringct/rctTypes.h" // monero/src #include "wire.h" #include "wire/adapted/array.h" -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/json/write.h" #include "wire/msgpack.h" #include "wire/uuid.h" @@ -54,7 +54,7 @@ namespace db template void map_output_id(F& format, T& self) { - wire::object(format, WIRE_FIELD(high), WIRE_FIELD(low)); + wire::object(format, WIRE_FIELD_ID(0, high), WIRE_FIELD_ID(1, low)); } } WIRE_DEFINE_OBJECT(output_id, map_output_id); @@ -72,7 +72,7 @@ namespace db template void map_account_address(F& format, T& self) { - wire::object(format, WIRE_FIELD(spend_public), WIRE_FIELD(view_public)); + wire::object(format, WIRE_FIELD_ID(0, spend_public), WIRE_FIELD_ID(1, view_public)); } } WIRE_DEFINE_OBJECT(account_address, map_account_address); @@ -250,6 +250,55 @@ namespace db } WIRE_DEFINE_OBJECT(transaction_link, map_transaction_link); + void read_bytes(wire::reader& source, output& self) + { + bool coinbase = false; + boost::optional rct; + boost::optional> payment_id; + + wire::object(source, + wire::optional_field<0>("id", wire::defaulted(std::ref(self.spend_meta.id), output_id::txpool())), + wire::optional_field<1>("block", wire::defaulted(std::ref(self.link.height), block_id::txpool)), + wire::field<2>("index", std::ref(self.spend_meta.index)), + wire::field<3>("amount", std::ref(self.spend_meta.amount)), + wire::field<4>("timestamp", std::ref(self.timestamp)), + wire::field<5>("tx_hash", std::ref(self.link.tx_hash)), + wire::field<6>("tx_prefix_hash", std::ref(self.tx_prefix_hash)), + wire::field<7>("tx_public", std::ref(self.spend_meta.tx_public)), + wire::optional_field<8>("rct_mask", std::ref(rct)), + wire::optional_field<9>("payment_id", std::ref(payment_id)), + wire::field<10>("unlock_time", std::ref(self.unlock_time)), + wire::field<11>("mixin_count", std::ref(self.spend_meta.mixin_count)), + wire::field<12>("coinbase", std::ref(coinbase)), + wire::field<13>("fee", std::ref(self.fee)), + wire::field<14>("recipient", std::ref(self.recipient)), + wire::field<15>("pub", std::ref(self.pub)) + ); + + std::uint8_t pay_length = 0; + if (payment_id) + pay_length = payment_id->size(); + + if (pay_length && pay_length != 8 && pay_length != 32) + WIRE_DLOG_THROW(wire::error::schema::binary, "Unexpected binary size"); + if (pay_length == 8) + std::memcpy(std::addressof(self.payment_id.short_), payment_id->data(), 8); + if (pay_length == 32) + std::memcpy(std::addressof(self.payment_id.long_), payment_id->data(), 32); + + if (rct) + std::memcpy(std::addressof(self.ringct_mask), std::addressof(*rct), sizeof(self.ringct_mask)); + else + std::memset(std::addressof(self.ringct_mask), 0, sizeof(self.ringct_mask)); + + extra flags{}; + if (coinbase) + flags = lws::db::coinbase_output; + if (rct) + flags = extra(lws::db::ringct_output | flags); + self.extra = db::pack(flags, pay_length); + } + void write_bytes(wire::writer& dest, const output& self) { const std::pair unpacked = @@ -286,7 +335,8 @@ namespace db wire::field<11>("mixin_count", self.spend_meta.mixin_count), wire::field<12>("coinbase", coinbase), wire::field<13>("fee", self.fee), - wire::field<14>("recipient", self.recipient) + wire::field<14>("recipient", self.recipient), + wire::field<15>("pub", std::cref(self.pub)) ); } @@ -296,15 +346,15 @@ namespace db void map_spend(F& format, T1& self, T2& payment_id) { wire::object(format, - wire::field("height", std::ref(self.link.height)), - wire::field("tx_hash", std::ref(self.link.tx_hash)), - WIRE_FIELD(image), - WIRE_FIELD(source), - WIRE_FIELD(timestamp), - WIRE_FIELD(unlock_time), - WIRE_FIELD(mixin_count), - wire::optional_field("payment_id", std::ref(payment_id)), - WIRE_FIELD(sender) + wire::field<0>("height", std::ref(self.link.height)), + wire::field<1>("tx_hash", std::ref(self.link.tx_hash)), + WIRE_FIELD_ID(2, image), + WIRE_FIELD_ID(3, source), + WIRE_FIELD_ID(4, timestamp), + WIRE_FIELD_ID(5, unlock_time), + WIRE_FIELD_ID(6, mixin_count), + wire::optional_field<7>("payment_id", std::ref(payment_id)), + WIRE_FIELD_ID(8, sender) ); } } diff --git a/src/db/data.h b/src/db/data.h index b25cfcc..82c6b6e 100644 --- a/src/db/data.h +++ b/src/db/data.h @@ -291,6 +291,7 @@ namespace db sizeof(output) == 8 + 32 + (8 * 3) + (4 * 2) + 32 + (8 * 2) + (32 * 3) + 7 + 1 + 32 + 8 + 2 * 4, "padding in output" ); + void read_bytes(wire::reader&, output&); void write_bytes(wire::writer&, const output&); //! Information about a possible spend of a received `output`. diff --git a/src/rest_server.cpp b/src/rest_server.cpp index c61717c..c75166a 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -57,7 +57,7 @@ #include "util/gamma_picker.h" #include "util/random_outputs.h" #include "util/source_location.h" -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/json.h" namespace lws diff --git a/src/rpc/admin.cpp b/src/rpc/admin.cpp index 0b42316..c2498d7 100644 --- a/src/rpc/admin.cpp +++ b/src/rpc/admin.cpp @@ -36,7 +36,7 @@ #include "error.h" #include "span.h" // monero/contrib/epee/include #include "wire.h" -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/error.h" #include "wire/json/write.h" #include "wire/traits.h" diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index f68f73d..f2fb0c3 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -34,11 +34,14 @@ #include #include "common/error.h" // monero/contrib/epee/include +#include "db/account.h" #include "error.h" #include "misc_log_ex.h" // monero/contrib/epee/include #include "net/http_client.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src +#include "scanner.h" #include "serialization/json_object.h" // monero/src +#include "wire/msgpack.h" #if MLWS_RMQ_ENABLED #include #include @@ -53,11 +56,13 @@ namespace rpc namespace { constexpr const char signal_endpoint[] = "inproc://signal"; + constexpr const char account_endpoint[] = "inproc://account"; // append integer every new `account_push` constexpr const char abort_scan_signal[] = "SCAN"; constexpr const char abort_process_signal[] = "PROCESS"; constexpr const char minimal_chain_topic[] = "json-minimal-chain_main"; constexpr const char full_txpool_topic[] = "json-full-txpool_add"; constexpr const int daemon_zmq_linger = 0; + constexpr const int account_zmq_linger = 0; constexpr const std::int64_t max_msg_sub = 10 * 1024 * 1024; // 50 MiB constexpr const std::int64_t max_msg_req = 350 * 1024 * 1024; // 350 MiB constexpr const std::chrono::seconds chain_poll_timeout{20}; @@ -192,6 +197,7 @@ namespace rpc , cache_time() , cache_interval(interval) , cached{} + , account_counter(0) , sync_pub() , sync_rates() , untrusted_daemon(untrusted_daemon) @@ -210,12 +216,70 @@ namespace rpc std::chrono::steady_clock::time_point cache_time; const std::chrono::minutes cache_interval; rates cached; + std::atomic account_counter; boost::mutex sync_pub; boost::mutex sync_rates; const bool untrusted_daemon; }; } // detail + expect account_push::make(std::shared_ptr ctx) noexcept + { + MONERO_PRECOND(ctx != nullptr); + + account_push out{ctx}; + out.sock.reset(zmq_socket(ctx->comm.get(), ZMQ_PUSH)); + if (out.sock == nullptr) + return {net::zmq::get_error_code()}; + + const std::string bind = account_endpoint + std::to_string(++ctx->account_counter); + MONERO_CHECK(do_set_option(out.sock.get(), ZMQ_LINGER, account_zmq_linger)); + MONERO_ZMQ_CHECK(zmq_bind(out.sock.get(), bind.c_str())); + return {std::move(out)}; + } + + account_push::~account_push() noexcept + {} + + expect account_push::push(epee::span accounts, std::chrono::seconds timeout) + { + MONERO_PRECOND(ctx != nullptr); + assert(sock.get() != nullptr); + + for (const lws::account& account : accounts) + { + // use integer id values (quick and fast) + wire::msgpack_slice_writer dest{true}; + try + { + wire_write::bytes(dest, account); + } + catch (const wire::exception& e) + { + return {e.code()}; + } + epee::byte_slice message{dest.take_sink()}; + + /* This is being pushed by the thread that monitors for shutdown, so + no signal is expected. */ + expect sent; + const auto start = std::chrono::steady_clock::now(); + while (!(sent = net::zmq::send(message.clone(), sock.get(), ZMQ_DONTWAIT))) + { + if (sent != net::zmq::make_error_code(EAGAIN)) + return sent.error(); + if (!scanner::is_running()) + return {error::signal_abort_process}; + const auto elapsed = std::chrono::steady_clock::now() - start; + if (timeout <= elapsed) + return {error::daemon_timeout}; + + boost::this_thread::sleep_for(boost::chrono::milliseconds{10}); + } + } + return success(); + } + expect client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc) { expect message = get_message(timeout); @@ -312,6 +376,18 @@ namespace rpc return do_subscribe(signal_sub.get(), abort_scan_signal); } + expect client::enable_pull_accounts() + { + detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)}; + if (new_sock == nullptr) + return {net::zmq::get_error_code()}; + const std::string connect = + account_endpoint + std::to_string(ctx->account_counter); + MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str())); + account_pull = std::move(new_sock); + return success(); + } + expect>> client::wait_for_block() { MONERO_PRECOND(ctx != nullptr); @@ -425,6 +501,32 @@ namespace rpc return rc; } + expect> client::pull_accounts() + { + MONERO_PRECOND(ctx != nullptr); + + if (!account_pull) + MONERO_CHECK(enable_pull_accounts()); + + std::vector out{}; + for (;;) + { + expect next = net::zmq::receive(account_pull.get(), ZMQ_DONTWAIT); + if (!next) + { + if (net::zmq::make_error_code(EAGAIN)) + break; + return next.error(); + } + out.emplace_back(); + const std::error_code error = + wire::msgpack::from_bytes(epee::byte_slice{std::move(*next)}, out.back()); + if (error) + return error; + } + return {std::move(out)}; + } + expect client::get_rates() const { MONERO_PRECOND(ctx != nullptr); @@ -459,7 +561,7 @@ namespace rpc if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0) MONERO_THROW(net::zmq::get_error_code(), "zmq_bind"); } - + rcontext rmq{}; #ifdef MLWS_RMQ_ENABLED if (!rmq_info.address.empty()) diff --git a/src/rpc/client.h b/src/rpc/client.h index 502c885..9b9a161 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -34,6 +34,7 @@ #include #include "byte_slice.h" // monero/contrib/epee/include +#include "db/fwd.h" #include "common/expect.h" // monero/src #include "rpc/message.h" // monero/src #include "rpc/daemon_pub.h" @@ -67,6 +68,31 @@ namespace rpc std::string routing; }; + //! Every scanner "reset", a new socket is created so old messages are discarded + class account_push + { + std::shared_ptr ctx; + detail::socket sock; + + explicit account_push(std::shared_ptr ctx) noexcept + : ctx(std::move(ctx)), sock() + {} + + public: + static expect make(std::shared_ptr ctx) noexcept; + + account_push(const account_push&) = delete; + account_push(account_push&&) = default; + + ~account_push() noexcept; + + account_push& operator=(const account_push&) = delete; + account_push& operator=(account_push&&) = default; + + //! Push new `accounts` to worker threads. Each account is sent in unique message + expect push(epee::span accounts, std::chrono::seconds timeout); + }; + //! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. class client { @@ -74,9 +100,10 @@ namespace rpc detail::socket daemon; detail::socket daemon_sub; detail::socket signal_sub; + detail::socket account_pull; explicit client(std::shared_ptr ctx) noexcept - : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub() + : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub(), account_pull() {} //! Expect `response` as the next message payload unless error. @@ -129,6 +156,9 @@ namespace rpc //! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`. expect watch_scan_signals() noexcept; + //! Register `this` client as listening for new accounts + expect enable_pull_accounts(); + //! Wait for new block announce or internal timeout. expect>> wait_for_block(); @@ -173,6 +203,9 @@ namespace rpc return response; } + //! Retrieve new accounts to be scanned on this thread. + expect> pull_accounts(); + /*! \note This is the one function that IS thread-safe. Multiple threads can call this function with the same `this` argument. @@ -232,6 +265,12 @@ namespace rpc return client::make(ctx); } + //! Create a new account push state + expect bind_push() const noexcept + { + return account_push::make(ctx); + } + /*! All block `client::send`, `client::receive`, and `client::wait` calls originating from `this` object AND whose `watch_scan_signal` method was diff --git a/src/rpc/daemon_pub.cpp b/src/rpc/daemon_pub.cpp index a809127..989ea7c 100644 --- a/src/rpc/daemon_pub.cpp +++ b/src/rpc/daemon_pub.cpp @@ -29,7 +29,7 @@ #include "cryptonote_basic/cryptonote_basic.h" // monero/src #include "rpc/daemon_zmq.h" -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/error.h" #include "wire/field.h" #include "wire/traits.h" diff --git a/src/rpc/daemon_zmq.cpp b/src/rpc/daemon_zmq.cpp index 1c05ca3..b468e73 100644 --- a/src/rpc/daemon_zmq.cpp +++ b/src/rpc/daemon_zmq.cpp @@ -31,7 +31,7 @@ #include "cryptonote_config.h" // monero/src #include "crypto/crypto.h" // monero/src #include "rpc/message_data_structs.h" // monero/src -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/json.h" #include "wire/wrapper/array.h" #include "wire/wrapper/variant.h" diff --git a/src/rpc/light_wallet.cpp b/src/rpc/light_wallet.cpp index 9f829b8..378d08c 100644 --- a/src/rpc/light_wallet.cpp +++ b/src/rpc/light_wallet.cpp @@ -40,7 +40,7 @@ #include "ringct/rctOps.h" // monero/src #include "span.h" // monero/contrib/epee/include #include "util/random_outputs.h" -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/error.h" #include "wire/json.h" #include "wire/traits.h" diff --git a/src/rpc/lws_pub.cpp b/src/rpc/lws_pub.cpp index cc81acc..bbf5f74 100644 --- a/src/rpc/lws_pub.cpp +++ b/src/rpc/lws_pub.cpp @@ -31,7 +31,7 @@ #include "db/account.h" #include "rpc/client.h" #include "rpc/webhook.h" -#include "wire/crypto.h" +#include "wire/adapted/crypto.h" #include "wire/wrapper/array.h" #include "wire/wrappers_impl.h" #include "wire/write.h" diff --git a/src/scanner.cpp b/src/scanner.cpp index 7c02273..41cdefb 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -694,6 +694,35 @@ namespace lws return; } + { + expect> new_accounts = client.pull_accounts(); + if (!new_accounts) + { + MERROR("Failed to pull new accounts: " << new_accounts.error().message()); + return; // get all active accounts the easy way + } + if (!new_accounts->empty()) + { + MINFO("Received " << new_accounts->size() << " new account(s) for scanning"); + std::sort(new_accounts->begin(), new_accounts->end(), by_height{}); + const db::block_id oldest = new_accounts->front().scan_height(); + users.insert( + users.end(), + std::make_move_iterator(new_accounts->begin()), + std::make_move_iterator(new_accounts->end()) + ); + if (std::uint64_t(oldest) < fetched->start_height) + { + req.start_height = std::uint64_t(oldest); + block_request = rpc::client::make_message("get_blocks_fast", req); + if (!send(client, block_request.clone())) + return; + continue; // to next get_blocks_fast read + } + // else, the oldest new account is within the newly fetched range + } + } + // prep for next blocks retrieval req.start_height = fetched->start_height + fetched->blocks.size() - 1; block_request = rpc::client::make_message("get_blocks_fast", req); @@ -913,6 +942,27 @@ namespace lws } } + lws::account prep_account(db::storage_reader& reader, const lws::db::account& user) + { + std::vector> receives{}; + std::vector pubs{}; + auto receive_list = MONERO_UNWRAP(reader.get_outputs(user.id)); + + const std::size_t elems = receive_list.count(); + receives.reserve(elems); + pubs.reserve(elems); + + for (auto output = receive_list.make_iterator(); !output.is_end(); ++output) + { + auto id = output.get_value(); + auto subaddr = output.get_value(); + receives.emplace_back(std::move(id), std::move(subaddr)); + pubs.emplace_back(output.get_value()); + } + + return lws::account{user, std::move(receives), std::move(pubs)}; + } + /*! Launches `thread_count` threads to run `scan_loop`, and then polls for active account changes in background @@ -964,9 +1014,13 @@ namespace lws threads.reserve(thread_count); std::sort(users.begin(), users.end(), by_height{}); + // enable the new bind point before registering pull accounts + lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push()); + MINFO("Starting scan loops on " << std::min(thread_count, users.size()) << " thread(s) with " << users.size() << " account(s)"); bool leader_thread = true; + bool remaining_threads = true; while (!users.empty() && --thread_count) { const std::size_t per_thread = std::max(std::size_t(1), users.size() / (thread_count + 1)); @@ -977,7 +1031,8 @@ namespace lws users.erase(users.end() - count, users.end()); rpc::client client = MONERO_UNWRAP(ctx.connect()); - client.watch_scan_signals(); + MONERO_UNWRAP(client.watch_scan_signals()); + MONERO_UNWRAP(client.enable_pull_accounts()); auto data = std::make_shared( std::move(client), disk.clone(), std::move(thread_users), opts @@ -989,12 +1044,14 @@ namespace lws if (!users.empty()) { rpc::client client = MONERO_UNWRAP(ctx.connect()); - client.watch_scan_signals(); + MONERO_UNWRAP(client.watch_scan_signals()); + MONERO_UNWRAP(client.enable_pull_accounts()); auto data = std::make_shared( std::move(client), disk.clone(), std::move(users), opts ); threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread)); + remaining_threads = false; } auto last_check = std::chrono::steady_clock::now(); @@ -1035,22 +1092,51 @@ namespace lws auto current_users = MONERO_UNWRAP( reader->get_accounts(db::account_status::active, std::move(accounts_cur)) ); - if (current_users.count() != active.size()) + if (current_users.count() < active.size()) + { + // cannot remove accounts via ZMQ (yet) + MINFO("Decrease in active user accounts detected, stopping scan threads..."); + return; + } + std::vector active_copy = active; + std::vector new_; + for (auto user = current_users.make_iterator(); !user.is_end(); ++user) + { + const db::account_id user_id = user.get_value(); + const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id); + if (loc == active_copy.end() || *loc != user_id) + { + new_.emplace_back(prep_account(*reader, user.get_value())); + active.insert( + std::lower_bound(active.begin(), active.end(), user_id), user_id + ); + } + else + active_copy.erase(loc); + } + + if (!active_copy.empty()) { MINFO("Change in active user accounts detected, stopping scan threads..."); return; } - - for (auto user = current_users.make_iterator(); !user.is_end(); ++user) + if (!new_.empty()) { - const db::account_id user_id = user.get_value(); - if (!std::binary_search(active.begin(), active.end(), user_id)) + if (remaining_threads) { - MINFO("Change in active user accounts detected, stopping scan threads..."); + MINFO("Received new account(s), starting more thread(s)"); return; } - } + const auto pushed = pusher.push(epee::to_span(new_), std::chrono::seconds{1}); + if (!pushed) + { + MERROR("Failed to push new account to workers: " << pushed.error().message()); + return; // pull in new accounts by resetting state + } + else + MINFO("Pushed " << new_.size() << " new accounts to worker thread(s)"); + } read_txn = reader->finish_read(); accounts_cur = current_users.give_cursor(); } // while scanning @@ -1293,23 +1379,7 @@ namespace lws for (db::account user : accounts.make_range()) { - std::vector> receives{}; - std::vector pubs{}; - auto receive_list = MONERO_UNWRAP(reader.get_outputs(user.id)); - - const std::size_t elems = receive_list.count(); - receives.reserve(elems); - pubs.reserve(elems); - - for (auto output = receive_list.make_iterator(); !output.is_end(); ++output) - { - auto id = output.get_value(); - auto subaddr = output.get_value(); - receives.emplace_back(std::move(id), std::move(subaddr)); - pubs.emplace_back(output.get_value()); - } - - users.emplace_back(user, std::move(receives), std::move(pubs)); + users.emplace_back(prep_account(reader, user)); active.insert( std::lower_bound(active.begin(), active.end(), user.id), user.id ); diff --git a/src/wire/CMakeLists.txt b/src/wire/CMakeLists.txt index 15fd61b..e0dee99 100644 --- a/src/wire/CMakeLists.txt +++ b/src/wire/CMakeLists.txt @@ -27,7 +27,7 @@ # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. set(monero-lws-wire_sources error.cpp read.cpp write.cpp) -set(monero-lws-wire_headers crypto.h error.h field.h filters.h fwd.h json.h read.h traits.h vector.h write.h) +set(monero-lws-wire_headers error.h field.h filters.h fwd.h json.h read.h traits.h vector.h write.h) add_library(monero-lws-wire ${monero-lws-wire_sources} ${monero-lws-wire_headers}) target_include_directories(monero-lws-wire PUBLIC "${LMDB_INCLUDE}") diff --git a/src/wire/crypto.h b/src/wire/adapted/crypto.h similarity index 93% rename from src/wire/crypto.h rename to src/wire/adapted/crypto.h index 34e42f5..dab53ba 100644 --- a/src/wire/crypto.h +++ b/src/wire/adapted/crypto.h @@ -41,6 +41,12 @@ namespace crypto { source.binary(epee::as_mut_byte_span(unwrap(unwrap(self)))); } + + template + void write_bytes(W& dest, const crypto::secret_key& self) + { + dest.binary(epee::as_byte_span(unwrap(unwrap(self)))); + } } namespace wire diff --git a/src/wire/adapted/pair.h b/src/wire/adapted/pair.h new file mode 100644 index 0000000..682ab9c --- /dev/null +++ b/src/wire/adapted/pair.h @@ -0,0 +1,50 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "wire/field.h" +#include "wire/read.h" +#include "wire/write.h" + +namespace wire +{ + template + void map_pair(F& format, T& self) + { + wire::object(format, WIRE_FIELD_ID(0, first), WIRE_FIELD_ID(1, second)); + } + + template + void read_bytes(R& source, std::pair& dest) + { map_pair(source, dest); } + + template + void write_bytes(W& dest, const std::pair& source) + { map_pair(dest, source); } +} + diff --git a/src/wire/read.h b/src/wire/read.h index 1c44319..107b7d6 100644 --- a/src/wire/read.h +++ b/src/wire/read.h @@ -170,6 +170,11 @@ namespace wire inline std::enable_if_t::value> read_bytes(R& source, T& dest) { source.binary(epee::as_mut_byte_span(dest)); } + //! Use `read_bytes(...)` method if available for `T`. + template + inline auto read_bytes(R& source, T& dest) -> decltype(dest.read_bytes(source)) + { return dest.read_bytes(source); } + namespace integer { [[noreturn]] void throw_exception(std::intmax_t value, std::intmax_t min, std::intmax_t max); diff --git a/src/wire/wrapper/trusted_array.h b/src/wire/wrapper/trusted_array.h new file mode 100644 index 0000000..c4ac3eb --- /dev/null +++ b/src/wire/wrapper/trusted_array.h @@ -0,0 +1,76 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include "wire/traits.h" +#include "wire/read.h" +#include "wire/write.h" + +namespace wire +{ + //! \brief Wrapper that removes read constraints + template + struct trusted_array_ + { + using container_type = wire::unwrap_reference_t; + T container; + + const container_type& get_container() const noexcept { return container; } + container_type& get_container() noexcept { return container; } + + // concept requirements for optional fields + + explicit operator bool() const noexcept { return !get_container().empty(); } + trusted_array_& emplace() noexcept { return *this; } + + trusted_array_& operator*() noexcept { return *this; } + const trusted_array_& operator*() const noexcept { return *this; } + + void reset() { get_container().clear(); } + }; + + template + trusted_array_ trusted_array(T value) + { + return {std::move(value)}; + } + + template + void read_bytes(R& source, trusted_array_ dest) + { + wire_read::array_unchecked(source, dest.get_container(), 0, std::numeric_limits::max()); + } + + template + void write_bytes(W& dest, const trusted_array_ source) + { + wire_write::array(dest, source.get_container()); + } +} diff --git a/tests/unit/db/CMakeLists.txt b/tests/unit/db/CMakeLists.txt index f082f55..5d90837 100644 --- a/tests/unit/db/CMakeLists.txt +++ b/tests/unit/db/CMakeLists.txt @@ -27,6 +27,7 @@ # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. add_library(monero-lws-unit-db OBJECT + account.test.cpp chain.test.cpp data.test.cpp storage.test.cpp diff --git a/tests/unit/db/account.test.cpp b/tests/unit/db/account.test.cpp new file mode 100644 index 0000000..58a9569 --- /dev/null +++ b/tests/unit/db/account.test.cpp @@ -0,0 +1,221 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "framework.test.h" + +#include "crypto/crypto.h" // monero/src +#include "cryptonote_basic/account.h" // monero/src +#include "db/account.h" +#include "wire/msgpack.h" + +LWS_CASE("lws::account serialization") +{ + cryptonote::account_keys keys{}; + crypto::generate_keys(keys.m_account_address.m_spend_public_key, keys.m_spend_secret_key); + crypto::generate_keys(keys.m_account_address.m_view_public_key, keys.m_view_secret_key); + + lws::db::account db_account{ + lws::db::account_id(44), + lws::db::account_time(500000), + lws::db::account_address{ + keys.m_account_address.m_view_public_key, + keys.m_account_address.m_spend_public_key + }, + {}, + lws::db::block_id(1000000), + lws::db::block_id(500000), + lws::db::account_time(250000) + }; + std::memcpy( + std::addressof(db_account.key), + std::addressof(unwrap(unwrap(keys.m_view_secret_key))), + sizeof(db_account.key) + ); + + const std::vector> spendable{ + { + lws::db::output_id{100, 2000}, + lws::db::address_index{lws::db::major_index(1), lws::db::minor_index(34)} + } + }; + const std::vector pubs{crypto::rand()}; + + lws::account account{db_account, spendable, pubs}; + EXPECT(account); + + const lws::db::transaction_link link{ + lws::db::block_id(4000), crypto::rand() + }; + const crypto::public_key tx_public = crypto::rand(); + const crypto::hash tx_prefix = crypto::rand(); + const crypto::public_key pub = crypto::rand(); + const rct::key ringct = crypto::rand(); + const auto extra = + lws::db::extra(lws::db::extra::coinbase_output | lws::db::extra::ringct_output); + const auto payment_id_ = crypto::rand(); + const crypto::hash payment_id = crypto::rand(); + const crypto::key_image image = crypto::rand(); + + account.add_out( + lws::db::output{ + link, + lws::db::output::spend_meta_{ + lws::db::output_id{500, 30}, + std::uint64_t(40000), + std::uint32_t(16), + std::uint32_t(2), + tx_public + }, + std::uint64_t(7000), + std::uint64_t(4670), + tx_prefix, + pub, + ringct, + {0, 0, 0, 0, 0, 0, 0}, + lws::db::pack(extra, sizeof(crypto::hash)), + payment_id_, + std::uint64_t(33444), + lws::db::address_index{lws::db::major_index(2), lws::db::minor_index(66)} + } + ); + account.add_spend( + lws::db::spend{ + link, + image, + lws::db::output_id{10, 70000}, + std::uint64_t(66), + std::uint64_t(1500), + std::uint32_t(16), + {0, 0, 0}, + 32, + payment_id, + lws::db::address_index{lws::db::major_index(4), lws::db::minor_index(55)} + } + ); + + const std::string account_address = account.address(); + EXPECT(account.id() == db_account.id); + EXPECT(account.view_public() == db_account.address.view_public); + EXPECT(account.spend_public() == db_account.address.spend_public); + EXPECT(account.view_key() == keys.m_view_secret_key); + EXPECT(account.scan_height() == db_account.scan_height); + { + const auto result = account.get_spendable(lws::db::output_id{100, 2000}); + EXPECT(bool(result)); + EXPECT(result->maj_i == lws::db::major_index(1)); + EXPECT(result->min_i == lws::db::minor_index(34)); + } + EXPECT(account.outputs().size() == 1); + EXPECT(account.outputs()[0].link == link); + EXPECT(account.outputs()[0].spend_meta.id.high == 500); + EXPECT(account.outputs()[0].spend_meta.id.low == 30); + EXPECT(account.outputs()[0].spend_meta.amount == 40000); + EXPECT(account.outputs()[0].spend_meta.mixin_count == 16); + EXPECT(account.outputs()[0].spend_meta.index == 2); + EXPECT(account.outputs()[0].spend_meta.tx_public == tx_public); + EXPECT(account.outputs()[0].timestamp == 7000); + EXPECT(account.outputs()[0].unlock_time == 4670); + EXPECT(account.outputs()[0].tx_prefix_hash == tx_prefix); + EXPECT(account.outputs()[0].pub == pub); + EXPECT(account.outputs()[0].ringct_mask == ringct); + { + const auto unpacked = lws::db::unpack(account.outputs()[0].extra); + EXPECT(unpacked.first == extra); + EXPECT(unpacked.second == sizeof(crypto::hash)); + } + EXPECT(account.outputs()[0].recipient.maj_i == lws::db::major_index(2)); + EXPECT(account.outputs()[0].recipient.min_i == lws::db::minor_index(66)); + + EXPECT(account.spends().size() == 1); + EXPECT(account.spends()[0].link == link); + EXPECT(account.spends()[0].image == image); + EXPECT(account.spends()[0].source.high == 10); + EXPECT(account.spends()[0].source.low == 70000); + EXPECT(account.spends()[0].timestamp == 66); + EXPECT(account.spends()[0].unlock_time == 1500); + EXPECT(account.spends()[0].mixin_count == 16); + EXPECT(account.spends()[0].length == 32); + EXPECT(account.spends()[0].payment_id == payment_id); + EXPECT(account.spends()[0].sender.maj_i == lws::db::major_index(4)); + EXPECT(account.spends()[0].sender.min_i == lws::db::minor_index(55)); + + lws::account copy{}; + EXPECT(!copy); + + { + wire::msgpack_slice_writer dest{true}; + wire_write::bytes(dest, account); + EXPECT(!wire::msgpack::from_bytes(epee::byte_slice{dest.take_sink()}, copy)); + } + + EXPECT(copy); + EXPECT(copy.address() == account_address); + EXPECT(copy.id() == db_account.id); + EXPECT(copy.view_public() == db_account.address.view_public); + EXPECT(copy.spend_public() == db_account.address.spend_public); + EXPECT(copy.view_key() == keys.m_view_secret_key); + EXPECT(copy.scan_height() == db_account.scan_height); + { + const auto result = copy.get_spendable(lws::db::output_id{100, 2000}); + EXPECT(bool(result)); + EXPECT(result->maj_i == lws::db::major_index(1)); + EXPECT(result->min_i == lws::db::minor_index(34)); + } + EXPECT(copy.outputs().size() == 1); + EXPECT(copy.outputs()[0].link == link); + EXPECT(copy.outputs()[0].spend_meta.id.high == 500); + EXPECT(copy.outputs()[0].spend_meta.id.low == 30); + EXPECT(copy.outputs()[0].spend_meta.amount == 40000); + EXPECT(copy.outputs()[0].spend_meta.mixin_count == 16); + EXPECT(copy.outputs()[0].spend_meta.index == 2); + EXPECT(copy.outputs()[0].spend_meta.tx_public == tx_public); + EXPECT(copy.outputs()[0].timestamp == 7000); + EXPECT(copy.outputs()[0].unlock_time == 4670); + EXPECT(copy.outputs()[0].tx_prefix_hash == tx_prefix); + EXPECT(copy.outputs()[0].pub == pub); + EXPECT(copy.outputs()[0].ringct_mask == ringct); + { + const auto unpacked = lws::db::unpack(copy.outputs()[0].extra); + EXPECT(unpacked.first == extra); + EXPECT(unpacked.second == sizeof(crypto::hash)); + } + EXPECT(copy.outputs()[0].recipient.maj_i == lws::db::major_index(2)); + EXPECT(copy.outputs()[0].recipient.min_i == lws::db::minor_index(66)); + + EXPECT(copy.spends().size() == 1); + EXPECT(copy.spends()[0].link == link); + EXPECT(copy.spends()[0].image == image); + EXPECT(copy.spends()[0].source.high == 10); + EXPECT(copy.spends()[0].source.low == 70000); + EXPECT(copy.spends()[0].timestamp == 66); + EXPECT(copy.spends()[0].unlock_time == 1500); + EXPECT(copy.spends()[0].mixin_count == 16); + EXPECT(copy.spends()[0].length == 32); + EXPECT(copy.spends()[0].payment_id == payment_id); + EXPECT(copy.spends()[0].sender.maj_i == lws::db::major_index(4)); + EXPECT(copy.spends()[0].sender.min_i == lws::db::minor_index(55)); +}