Add support for remote scanning via custom TCP (#118)
Some checks failed
unix-ci / build-tests (macos-12, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-12, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-13, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-13, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-latest, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-latest, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=ON) (push) Has been cancelled

This commit is contained in:
Lee *!* Clagett 2024-09-22 19:55:28 -04:00 committed by GitHub
parent 1575b56f05
commit 9868fd98e8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 2950 additions and 500 deletions

View file

@ -50,6 +50,7 @@ target_link_libraries(monero-lws-daemon-common
monero-lws-common
monero-lws-db
monero-lws-rpc
monero-lws-rpc-scanner
monero-lws-wire-json
monero-lws-util
${Boost_CHRONO_LIBRARY}
@ -66,6 +67,7 @@ target_link_libraries(monero-lws-daemon
PRIVATE
monero::libraries
monero-lws-daemon-common
monero-lws-rpc-scanner
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
)
@ -82,5 +84,18 @@ target_link_libraries(monero-lws-admin
Threads::Threads
)
add_executable(monero-lws-client client_main.cpp)
target_link_libraries(monero-lws-client
PRIVATE
monero::libraries
monero-lws-common
monero-lws-daemon-common
monero-lws-rpc
monero-lws-rpc-scanner
${Boost_PROGRAM_OPTIONS_LIBRARY}
Threads::Threads
)
install(TARGETS monero-lws-daemon DESTINATION bin)
install(TARGETS monero-lws-admin DESTINATION bin)
install(TARGETS monero-lws-client DESTINATION bin)

390
src/client_main.cpp Normal file
View file

@ -0,0 +1,390 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <boost/asio/signal_set.hpp>
#include <boost/filesystem/operations.hpp>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/parsers.hpp>
#include <boost/program_options/variables_map.hpp>
#include <boost/thread/thread.hpp>
#include <csignal>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>
#include "common/command_line.h" // monero/src/
#include "common/expect.h" // monero/src/
#include "common/util.h" // monero/src/
#include "config.h"
#include "cryptonote_config.h" // monero/src/
#include "db/storage.h"
#include "error.h"
#include "options.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "rpc/scanner/client.h"
#include "rpc/scanner/write_commands.h"
#include "scanner.h"
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "lws"
namespace
{
struct options
{
const command_line::arg_descriptor<std::string> config_file;
const command_line::arg_descriptor<unsigned short> log_level;
const command_line::arg_descriptor<std::string> lws_daemon;
const command_line::arg_descriptor<std::string> lws_pass;
const command_line::arg_descriptor<std::string> monerod_rpc;
const command_line::arg_descriptor<std::string> monerod_sub;
const command_line::arg_descriptor<std::string> network;
const command_line::arg_descriptor<std::size_t> scan_threads;
static std::string get_default_zmq()
{
static constexpr const char base[] = "tcp://127.0.0.1:";
switch (lws::config::network)
{
case cryptonote::TESTNET:
return base + std::to_string(config::testnet::ZMQ_RPC_DEFAULT_PORT);
case cryptonote::STAGENET:
return base + std::to_string(config::stagenet::ZMQ_RPC_DEFAULT_PORT);
case cryptonote::MAINNET:
default:
break;
}
return base + std::to_string(config::ZMQ_RPC_DEFAULT_PORT);
}
options()
: config_file{"config-file", "Specify any option in a config file; <name>=<value> on separate lines"}
, log_level{"log-level", "Log level [0-4]", 1}
, lws_daemon{"lws-daemon", "Specify monero-lws-daemon main process <[ip:]port>", ""}
, lws_pass{"lws-pass", "Specify monero-lws-daemon password", ""}
, monerod_rpc{"monerod-rpc", "Specify monero ZMQ RPC server <tcp://ip:port> or <ipc:///path>", get_default_zmq()}
, monerod_sub{"monerod-sub", "Specify monero ZMQ RPC server <tcp://ip:port> or <ipc:///path>", ""}
, network{"network", "<\"main\"|\"stage\"|\"test\"> - Blockchain net type", "main"}
, scan_threads{"scan-threads", "Number of scan threads", boost::thread::hardware_concurrency()}
{}
void prepare(boost::program_options::options_description& description) const
{
command_line::add_arg(description, config_file);
command_line::add_arg(description, log_level);
command_line::add_arg(description, lws_daemon);
command_line::add_arg(description, lws_pass);
command_line::add_arg(description, monerod_rpc);
command_line::add_arg(description, monerod_sub);
command_line::add_arg(description, network);
command_line::add_arg(description, scan_threads);
command_line::add_arg(description, command_line::arg_help);
}
void set_network(boost::program_options::variables_map const& args) const
{
const std::string net = command_line::get_arg(args, network);
if (net == "main")
lws::config::network = cryptonote::MAINNET;
else if (net == "stage")
lws::config::network = cryptonote::STAGENET;
else if (net == "test")
lws::config::network = cryptonote::TESTNET;
else
throw std::runtime_error{"Bad --network value"};
}
};
struct program
{
std::string lws_daemon;
std::string lws_pass;
std::string monerod_rpc;
std::string monerod_sub;
std::size_t scan_threads;
};
void print_help(std::ostream& out)
{
boost::program_options::options_description description{"Options"};
options{}.prepare(description);
out << "Usage: [options]" << std::endl;
out << description;
}
std::optional<program> get_program(int argc, char** argv)
{
namespace po = boost::program_options;
const options opts{};
po::variables_map args{};
{
po::options_description description{"Options"};
opts.prepare(description);
po::store(
po::command_line_parser(argc, argv).options(description).run(), args
);
po::notify(args);
if (!command_line::is_arg_defaulted(args, opts.config_file))
{
boost::filesystem::path config_path{command_line::get_arg(args, opts.config_file)};
if (!boost::filesystem::exists(config_path))
MONERO_THROW(lws::error::configuration, "Config file does not exist");
po::store(
po::parse_config_file<char>(config_path.string<std::string>().c_str(), description), args
);
po::notify(args);
}
}
if (command_line::get_arg(args, command_line::arg_help))
{
print_help(std::cout);
return std::nullopt;
}
opts.set_network(args); // do this first, sets global variable :/
mlog_set_log_level(command_line::get_arg(args, opts.log_level));
program prog{
command_line::get_arg(args, opts.lws_daemon),
command_line::get_arg(args, opts.lws_pass),
command_line::get_arg(args, opts.monerod_rpc),
command_line::get_arg(args, opts.monerod_sub),
command_line::get_arg(args, opts.scan_threads)
};
prog.scan_threads = std::max(std::size_t(1), prog.scan_threads);
if (command_line::is_arg_defaulted(args, opts.monerod_rpc))
prog.monerod_rpc = options::get_default_zmq();
return prog;
}
struct send_users
{
std::shared_ptr<lws::rpc::scanner::client> client_;
bool operator()(lws::rpc::client&, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const lws::db::pow_sync> pow, const lws::scanner_options&)
{
if (!client_)
return false;
if (users.empty())
return true;
if (!pow.empty() || chain.empty())
return false;
std::vector<crypto::hash> chain_copy{};
chain_copy.reserve(chain.size());
std::copy(chain.begin(), chain.end(), std::back_inserter(chain_copy));
std::vector<lws::account> users_copy{};
users_copy.reserve(users.size());
for (const auto& user : users)
users_copy.push_back(user.clone());
MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)");
lws::rpc::scanner::client::send_update(client_, std::move(users_copy), std::move(chain_copy));
return true;
}
};
void run_thread(lws::scanner_sync& self, std::shared_ptr<lws::rpc::scanner::client> client, lws::rpc::client& zclient, std::shared_ptr<lws::rpc::scanner::queue> queue)
{
struct stop_
{
lws::scanner_sync& self;
~stop_() { self.stop(); }
} stop{self};
if (!client || !queue)
return;
try
{
while (self.is_running())
{
std::vector<lws::account> users{};
auto status = queue->wait_for_accounts();
if (status.replace)
users = std::move(*status.replace);
users.insert(
users.end(),
std::make_move_iterator(status.push.begin()),
std::make_move_iterator(status.push.end())
);
if (!users.empty())
{
static constexpr const lws::scanner_options opts{
epee::net_utils::ssl_verification_t::system_ca, false, false
};
auto new_client = MONERO_UNWRAP(zclient.clone());
MONERO_UNWRAP(new_client.watch_scan_signals());
send_users send{client};
if (!lws::scanner::loop(self.stop_, std::move(send), std::nullopt, std::move(new_client), std::move(users), *queue, opts, false))
return;
}
}
}
catch (const std::exception& e)
{
self.shutdown();
MERROR(e.what());
}
catch (...)
{
self.shutdown();
MERROR("Unknown error");
}
}
void run(program prog)
{
MINFO("Using monerod ZMQ RPC at " << prog.monerod_rpc);
auto ctx = lws::rpc::context::make(std::move(prog.monerod_rpc), std::move(prog.monerod_sub), {}, {}, std::chrono::minutes{0}, false);
lws::scanner_sync self{};
/*! \NOTE `ctx will need a strand or lock if multiple threads use
`self.io.run()` in the future. */
boost::asio::signal_set signals{self.io_};
signals.add(SIGINT);
signals.async_wait([&self] (const boost::system::error_code& error, int)
{
if (error != boost::asio::error::operation_aborted)
self.shutdown();
});
for (;;)
{
if (self.stop_ && self.is_running())
boost::this_thread::sleep_for(boost::chrono::seconds{1});
self.stop_ = false;
self.io_.reset();
if (self.has_shutdown())
return;
std::vector<lws::rpc::client> zclients;
zclients.reserve(prog.scan_threads);
std::vector<std::shared_ptr<lws::rpc::scanner::queue>> queues{};
queues.resize(prog.scan_threads);
for (auto& queue : queues)
{
queue = std::make_shared<lws::rpc::scanner::queue>();
zclients.push_back(MONERO_UNWRAP(ctx.connect()));
}
auto client = std::make_shared<lws::rpc::scanner::client>(
self.io_, prog.lws_daemon, prog.lws_pass, queues
);
lws::rpc::scanner::client::connect(client);
std::vector<boost::thread> threads{};
threads.reserve(prog.scan_threads);
struct stop_
{
lws::scanner_sync& self;
lws::rpc::context& ctx;
std::vector<std::shared_ptr<lws::rpc::scanner::queue>> queues;
std::vector<boost::thread>& threads;
~stop_()
{
self.stop();
if (self.has_shutdown())
ctx.raise_abort_process();
else
ctx.raise_abort_scan();
for (const auto& queue : queues)
{
if (queue)
queue->stop();
}
for (auto& thread : threads)
thread.join();
}
} stop{self, ctx, queues, threads};
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
for (std::size_t i = 0; i < prog.scan_threads; ++i)
threads.emplace_back(attrs, std::bind(&run_thread, std::ref(self), client, std::ref(zclients[i]), queues[i]));
self.io_.run();
} // while scanner running
}
} // anonymous
int main(int argc, char** argv)
{
tools::on_startup(); // if it throws, don't use MERROR just print default msg
try
{
std::optional<program> prog;
try
{
prog = get_program(argc, argv);
}
catch (std::exception const& e)
{
std::cerr << e.what() << std::endl << std::endl;
print_help(std::cerr);
return EXIT_FAILURE;
}
if (prog)
run(std::move(*prog));
}
catch (std::exception const& e)
{
MERROR(e.what());
return EXIT_FAILURE;
}
catch (...)
{
MERROR("Unknown exception");
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}

View file

@ -40,7 +40,7 @@
#include "wire/msgpack/fwd.h"
namespace lws
{
{
//! Tracks a subset of DB account info for scanning/updating.
class account
{
@ -127,4 +127,18 @@ namespace lws
//! Track a possible `spend`.
void add_spend(db::spend const& spend);
};
struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
{
return left.scan_height() < right.scan_height();
}
bool operator()(db::account const& left, db::account const& right) const noexcept
{
return left.scan_height < right.scan_height;
}
};
} // lws

View file

@ -457,20 +457,37 @@ namespace db
map_webhook_value(dest, source, payment_id);
}
void write_bytes(wire::writer& dest, const webhook_tx_confirmation& self)
namespace
{
template<typename F, typename T, typename U>
void map_webhook_confirmation(F& format, T& self, U& payment_id)
{
wire::object(format,
wire::field<0>("event", std::ref(self.key.type)),
wire::field<1>("payment_id", std::ref(payment_id)),
wire::field<2>("token", std::ref(self.value.second.token)),
wire::field<3>("confirmations", std::ref(self.value.second.confirmations)),
wire::field<4>("event_id", std::ref(self.value.first.event_id)),
WIRE_FIELD_ID(5, tx_info)
);
}
}
void read_bytes(wire::reader& source, webhook_tx_confirmation& dest)
{
crypto::hash8 payment_id{};
map_webhook_confirmation(source, dest, payment_id);
static_assert(sizeof(payment_id) == sizeof(dest.value.first.payment_id), "bad memcpy");
std::memcpy(std::addressof(dest.value.first.payment_id), std::addressof(payment_id), sizeof(payment_id));
}
void write_bytes(wire::writer& dest, const webhook_tx_confirmation& source)
{
crypto::hash8 payment_id;
static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy");
std::memcpy(std::addressof(payment_id), std::addressof(self.value.first.payment_id), sizeof(payment_id));
// to be sent to remote url
wire::object(dest,
wire::field<0>("event", std::cref(self.key.type)),
wire::field<1>("payment_id", std::cref(payment_id)),
wire::field<2>("token", std::cref(self.value.second.token)),
wire::field<3>("confirmations", std::cref(self.value.second.confirmations)),
wire::field<4>("event_id", std::cref(self.value.first.event_id)),
WIRE_FIELD_ID(5, tx_info)
);
static_assert(sizeof(payment_id) == sizeof(source.value.first.payment_id), "bad memcpy");
std::memcpy(std::addressof(payment_id), std::addressof(source.value.first.payment_id), sizeof(payment_id));
map_webhook_confirmation(dest, source, payment_id);
}
static void write_bytes(wire::writer& dest, const output::spend_meta_& self)

View file

@ -291,8 +291,7 @@ namespace db
sizeof(output) == 8 + 32 + (8 * 3) + (4 * 2) + 32 + (8 * 2) + (32 * 3) + 7 + 1 + 32 + 8 + 2 * 4,
"padding in output"
);
void read_bytes(wire::reader&, output&);
void write_bytes(wire::writer&, const output&);
WIRE_DECLARE_OBJECT(output);
//! Information about a possible spend of a received `output`.
struct spend
@ -384,7 +383,7 @@ namespace db
webhook_value value;
output tx_info;
};
void write_bytes(wire::writer&, const webhook_tx_confirmation&);
WIRE_DECLARE_OBJECT(webhook_tx_confirmation);
//! Returned by DB when a webhook event "tripped"
struct webhook_tx_spend

View file

@ -934,6 +934,29 @@ namespace db
return accounts.get_value<account>(value);
}
expect<lws::account> storage_reader::get_full_account(const account& user)
{
std::vector<std::pair<db::output_id, db::address_index>> receives{};
std::vector<crypto::public_key> pubs{};
auto receive_list = get_outputs(user.id);
if (!receive_list)
return receive_list.error();
const std::size_t elems = receive_list->count();
receives.reserve(elems);
pubs.reserve(elems);
for (auto output = receive_list->make_iterator(); !output.is_end(); ++output)
{
auto id = output.get_value<MONERO_FIELD(db::output, spend_meta.id)>();
auto subaddr = output.get_value<MONERO_FIELD(db::output, recipient)>();
receives.emplace_back(std::move(id), std::move(subaddr));
pubs.emplace_back(output.get_value<MONERO_FIELD(db::output, pub)>());
}
return lws::account{user, std::move(receives), std::move(pubs)};
}
expect<std::pair<account_status, account>>
storage_reader::get_account(account_address const& address) noexcept
{
@ -2810,8 +2833,13 @@ namespace db
accounts_by_address.get_value<MONERO_FIELD(account_by_address, lookup)>(temp_value).value().status;
MONERO_LMDB_CHECK(mdb_cursor_get(accounts_cur.get(), &key, &value, MDB_GET_BOTH));
}
/* The check below is `<` instead of `!=` because of remote scanning -
a "check-in" can occur before the user accounts are replaced.
Duplicate writes should be supported as this (duplicate writes)
happened historically due to a different bug involving scan heights.*/
expect<account> existing = accounts.get_value<account>(value);
if (!existing || existing->scan_height != user->scan_height())
if (!existing || existing->scan_height < user->scan_height())
continue; // to next account
// Don't re-store data if already scanned

View file

@ -40,6 +40,7 @@
#include "lmdb/transaction.h"
#include "lmdb/key_stream.h"
#include "lmdb/value_stream.h"
#include "wire/msgpack/fwd.h"
namespace cryptonote { class checkpoints; }
namespace lws
@ -132,6 +133,9 @@ namespace db
//! \return Info for account `id` iff it has `status`.
expect<account> get_account(const account_status status, const account_id id) noexcept;
//! \return Account with outputs and spends
expect<lws::account> get_full_account(const account&);
//! \return Info related to `address`.
expect<std::pair<account_status, account>>
get_account(account_address const& address) noexcept;

View file

@ -1,4 +1,4 @@
# Copyright (c) 2020, The Monero Project
# Copyright (c) 2020-2024, The Monero Project
#
# All rights reserved.
#
@ -26,6 +26,8 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
add_subdirectory(scanner)
set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp light_wallet.cpp lws_pub.cpp rates.cpp)
set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h lws_pub.h rates.h)

View file

@ -223,63 +223,6 @@ namespace rpc
};
} // detail
expect<account_push> account_push::make(std::shared_ptr<detail::context> ctx) noexcept
{
MONERO_PRECOND(ctx != nullptr);
account_push out{ctx};
out.sock.reset(zmq_socket(ctx->comm.get(), ZMQ_PUSH));
if (out.sock == nullptr)
return {net::zmq::get_error_code()};
const std::string bind = account_endpoint + std::to_string(++ctx->account_counter);
MONERO_CHECK(do_set_option(out.sock.get(), ZMQ_LINGER, account_zmq_linger));
MONERO_ZMQ_CHECK(zmq_bind(out.sock.get(), bind.c_str()));
return {std::move(out)};
}
account_push::~account_push() noexcept
{}
expect<void> account_push::push(epee::span<const lws::account> accounts, std::chrono::seconds timeout)
{
MONERO_PRECOND(ctx != nullptr);
assert(sock.get() != nullptr);
for (const lws::account& account : accounts)
{
// use integer id values (quick and fast)
wire::msgpack_slice_writer dest{true};
try
{
wire_write::bytes(dest, account);
}
catch (const wire::exception& e)
{
return {e.code()};
}
epee::byte_slice message{dest.take_sink()};
/* This is being pushed by the thread that monitors for shutdown, so
no signal is expected. */
expect<void> sent;
const auto start = std::chrono::steady_clock::now();
while (!(sent = net::zmq::send(message.clone(), sock.get(), ZMQ_DONTWAIT)))
{
if (sent != net::zmq::make_error_code(EAGAIN))
return sent.error();
if (!scanner::is_running())
return {error::signal_abort_process};
const auto elapsed = std::chrono::steady_clock::now() - start;
if (timeout <= elapsed)
return {error::daemon_timeout};
boost::this_thread::sleep_for(boost::chrono::milliseconds{10});
}
}
return success();
}
expect<void> client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc)
{
expect<std::string> message = get_message(timeout);
@ -376,18 +319,6 @@ namespace rpc
return do_subscribe(signal_sub.get(), abort_scan_signal);
}
expect<void> client::enable_pull_accounts()
{
detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)};
if (new_sock == nullptr)
return {net::zmq::get_error_code()};
const std::string connect =
account_endpoint + std::to_string(ctx->account_counter);
MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str()));
account_pull = std::move(new_sock);
return success();
}
expect<std::vector<std::pair<client::topic, std::string>>> client::wait_for_block()
{
MONERO_PRECOND(ctx != nullptr);
@ -501,32 +432,6 @@ namespace rpc
return rc;
}
expect<std::vector<lws::account>> client::pull_accounts()
{
MONERO_PRECOND(ctx != nullptr);
if (!account_pull)
MONERO_CHECK(enable_pull_accounts());
std::vector<lws::account> out{};
for (;;)
{
expect<std::string> next = net::zmq::receive(account_pull.get(), ZMQ_DONTWAIT);
if (!next)
{
if (net::zmq::make_error_code(EAGAIN))
break;
return next.error();
}
out.emplace_back();
const std::error_code error =
wire::msgpack::from_bytes(epee::byte_slice{std::move(*next)}, out.back());
if (error)
return error;
}
return {std::move(out)};
}
expect<rates> client::get_rates() const
{
MONERO_PRECOND(ctx != nullptr);
@ -643,6 +548,13 @@ namespace rpc
return ctx->daemon_addr;
}
std::chrono::minutes context::cache_interval() const
{
if (ctx == nullptr)
MONERO_THROW(common_error::kInvalidArgument, "Invalid lws::rpc::context");
return ctx->cache_interval;
}
expect<void> context::raise_abort_scan() noexcept
{
MONERO_PRECOND(ctx != nullptr);

View file

@ -68,31 +68,6 @@ namespace rpc
std::string routing;
};
//! Every scanner "reset", a new socket is created so old messages are discarded
class account_push
{
std::shared_ptr<detail::context> ctx;
detail::socket sock;
explicit account_push(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), sock()
{}
public:
static expect<account_push> make(std::shared_ptr<detail::context> ctx) noexcept;
account_push(const account_push&) = delete;
account_push(account_push&&) = default;
~account_push() noexcept;
account_push& operator=(const account_push&) = delete;
account_push& operator=(account_push&&) = default;
//! Push new `accounts` to worker threads. Each account is sent in unique message
expect<void> push(epee::span<const lws::account> accounts, std::chrono::seconds timeout);
};
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
class client
{
@ -100,10 +75,9 @@ namespace rpc
detail::socket daemon;
detail::socket daemon_sub;
detail::socket signal_sub;
detail::socket account_pull;
explicit client(std::shared_ptr<detail::context> ctx) noexcept
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub(), account_pull()
: ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub()
{}
//! Expect `response` as the next message payload unless error.
@ -156,9 +130,6 @@ namespace rpc
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
expect<void> watch_scan_signals() noexcept;
//! Register `this` client as listening for new accounts
expect<void> enable_pull_accounts();
//! Wait for new block announce or internal timeout.
expect<std::vector<std::pair<topic, std::string>>> wait_for_block();
@ -259,18 +230,15 @@ namespace rpc
//! \return The full address of the monerod ZMQ daemon.
std::string const& daemon_address() const;
//! \return Exchange rate checking interval
std::chrono::minutes cache_interval() const;
//! \return Client connection. Thread-safe.
expect<client> connect() const noexcept
{
return client::make(ctx);
}
//! Create a new account push state
expect<account_push> bind_push() const noexcept
{
return account_push::make(ctx);
}
/*!
All block `client::send`, `client::receive`, and `client::wait` calls
originating from `this` object AND whose `watch_scan_signal` method was

View file

@ -34,4 +34,5 @@ namespace lws
class client;
}
struct rates;
class scan_manager;
}

View file

@ -0,0 +1,37 @@
# Copyright (c) 2024, The Monero Project
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be
# used to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(monero-lws-rpc-scanner_sources
client.cpp commands.cpp connection.cpp queue.cpp server.cpp write_commands.cpp
)
set(monero-lws-rpc-scanner_headers
client.h commands.h connection.h fwd.h queue.h read_commands.h server.h write_commands.h
)
add_library(monero-lws-rpc-scanner ${monero-lws-rpc-scanner_sources} ${monero-lws-rpc-scanner_headers})
target_link_libraries(monero-lws-rpc-scanner monero::libraries monero-lws-wire-msgpack)

254
src/rpc/scanner/client.cpp Normal file
View file

@ -0,0 +1,254 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "client.h"
#include <boost/asio/coroutine.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <chrono>
#include "common/expect.h" // monero/src
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "rpc/scanner/commands.h"
#include "rpc/scanner/connection.h"
#include "rpc/scanner/read_commands.h"
#include "rpc/scanner/server.h"
#include "rpc/scanner/write_commands.h"
namespace lws { namespace rpc { namespace scanner
{
namespace
{
//! Connection completion timeout
constexpr const std::chrono::seconds connect_timeout{5};
//! Retry connection timeout
constexpr const std::chrono::seconds reconnect_interval{10};
struct push_accounts_handler
{
using input = push_accounts;
static bool handle(const std::shared_ptr<client>& self, input msg)
{
if (!self)
return false;
if (msg.users.empty())
return true;
client::push_accounts(self, std::move(msg.users));
return true;
}
};
struct replace_accounts_handler
{
using input = replace_accounts;
static bool handle(const std::shared_ptr<client>& self, input msg)
{
if (!self)
return false;
// push empty accounts too, indicates we should stop scanning
client::replace_accounts(self, std::move(msg.users));
return true;
}
};
} // anonymous
//! \brief Closes the socket, forcing all outstanding ops to cancel.
struct client::close
{
std::shared_ptr<client> self_;
void operator()(const boost::system::error_code& error = {}) const
{
if (self_ && error != boost::asio::error::operation_aborted)
{
// The `cleanup()` function is meant to cleanup then destruct connection
assert(self_->strand_.running_in_this_thread());
boost::system::error_code error{};
self_->sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
error = boost::system::error_code{};
self_->sock_.close(error);
if (error)
MERROR("Error when closing socket: " << error.message());
}
}
};
//! \brief
class client::connector : public boost::asio::coroutine
{
std::shared_ptr<client> self_;
public:
explicit connector(std::shared_ptr<client> self)
: boost::asio::coroutine(), self_(std::move(self))
{}
void operator()(const boost::system::error_code& error = {})
{
if (!self_ || error == boost::asio::error::operation_aborted)
return;
assert(self_->strand_.running_in_this_thread());
BOOST_ASIO_CORO_REENTER(*this)
{
for (;;)
{
MINFO("Attempting connection to " << self_->server_address_);
self_->connect_timer_.expires_from_now(connect_timeout);
self_->connect_timer_.async_wait(self_->strand_.wrap(close{self_}));
BOOST_ASIO_CORO_YIELD self_->sock_.async_connect(self_->server_address_, self_->strand_.wrap(*this));
if (!self_->connect_timer_.cancel() || error)
{
MERROR("Connection attempt failed: " << error.message());
close{self_}();
}
else
break;
MINFO("Retrying connection in " << std::chrono::seconds{reconnect_interval}.count() << " seconds");
self_->connect_timer_.expires_from_now(reconnect_interval);
BOOST_ASIO_CORO_YIELD self_->connect_timer_.async_wait(self_->strand_.wrap(*this));
}
MINFO("Connection made to " << self_->server_address_);
self_->connected_ = true;
const auto threads = boost::numeric_cast<std::uint32_t>(self_->local_.size());
write_command(self_, initialize{self_->pass_, threads});
read_commands(self_);
}
}
};
client::client(boost::asio::io_service& io, const std::string& address, std::string pass, std::vector<std::shared_ptr<queue>> local)
: connection(io),
local_(std::move(local)),
pass_(std::move(pass)),
next_push_(0),
connect_timer_(io),
server_address_(rpc::scanner::server::get_endpoint(address)),
connected_(false)
{
for (const auto& queue : local_)
{
if (!queue)
MONERO_THROW(common_error::kInvalidArgument, "nullptr local queue");
}
}
client::~client()
{}
//! \return Handlers for commands from server
const std::array<client::command, 2>& client::commands() noexcept
{
static constexpr const std::array<command, 2> value{{
call<push_accounts_handler, client>,
call<replace_accounts_handler, client>
}};
static_assert(push_accounts_handler::input::id() == 0);
static_assert(replace_accounts_handler::input::id() == 1);
return value;
}
void client::connect(const std::shared_ptr<client>& self)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self] ()
{
if (!self->sock_.is_open())
connector{self}();
});
}
void client::push_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self, users = std::move(users)] () mutable
{
/* Keep this algorithm simple, one user at a time. A little more difficult
to do multiples at once */
MINFO("Adding " << users.size() << " new accounts to workload");
for (std::size_t i = 0; i < users.size(); ++i)
{
self->local_[self->next_push_++]->push_accounts(
std::make_move_iterator(users.begin() + i),
std::make_move_iterator(users.begin() + i + 1)
);
self->next_push_ %= self->local_.size();
}
});
}
void client::replace_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self, users = std::move(users)] () mutable
{
MINFO("Received " << users.size() << " accounts as new workload");
for (std::size_t i = 0; i < self->local_.size(); ++i)
{
// count == 0 is OK. This will tell the thread to stop working
const auto count = users.size() / (self->local_.size() - i);
std::vector<lws::account> next{
std::make_move_iterator(users.end() - count),
std::make_move_iterator(users.end())
};
users.erase(users.end() - count, users.end());
self->local_[i]->replace_accounts(std::move(next));
}
self->next_push_ = 0;
});
}
void client::send_update(const std::shared_ptr<client>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () mutable
{
if (!self->connected_)
MONERO_THROW(common_error::kInvalidArgument, "not connected");
write_command(self, update_accounts{std::move(users), std::move(blocks)});
});
}
void client::cleanup()
{
base_cleanup();
GET_IO_SERVICE(sock_).stop();
}
}}} // lws // rpc // scanner

87
src/rpc/scanner/client.h Normal file
View file

@ -0,0 +1,87 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>
#include "crypto/hash.h" // monero/src
#include "db/fwd.h"
#include "rpc/scanner/connection.h"
#include "rpc/scanner/queue.h"
namespace lws { namespace rpc { namespace scanner
{
//! \brief
class client : public connection
{
const std::vector<std::shared_ptr<queue>> local_;
const std::string pass_;
std::size_t next_push_;
boost::asio::steady_timer connect_timer_;
const boost::asio::ip::tcp::endpoint server_address_;
bool connected_;
struct close;
class connector;
public:
using command = bool(*)(const std::shared_ptr<client>&);
//! Does not start connection to `address`, see `connect`.
explicit client(boost::asio::io_context& io, const std::string& address, std::string pass, std::vector<std::shared_ptr<queue>> local);
client(const client&) = delete;
client(client&&) = delete;
~client();
client& operator=(const client&) = delete;
client& operator=(client&&) = delete;
//! \return Handlers for client commands
static const std::array<command, 2>& commands() noexcept;
//! Start a connect loop on `self`.
static void connect(const std::shared_ptr<client>& self);
//! Push `users` to local queues. Synchronizes with latest connection.
static void push_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users);
//! Replace `users` on local queues. Synchronizes with latest connection.
static void replace_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users);
//! Send `users` upstream for disk storage
static void send_update(const std::shared_ptr<client>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
//! Closes socket and calls stop on `io_service`.
void cleanup();
};
}}} // lws // rpc // scanner

View file

@ -0,0 +1,85 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "commands.h"
#include "db/account.h"
#include "db/data.h"
#include "wire/adapted/crypto.h"
#include "wire/vector.h"
#include "wire/msgpack.h"
#include "wire/wrapper/trusted_array.h"
namespace lws { namespace rpc { namespace scanner
{
namespace
{
template<typename F, typename T>
void map_initialize(F& format, T& self)
{
wire::object(format, WIRE_FIELD_ID(0, pass), WIRE_FIELD_ID(1, threads));
}
}
WIRE_MSGPACK_DEFINE_OBJECT(initialize, map_initialize);
namespace
{
template<typename F, typename T>
void map_account_update(F& format, T& self)
{
wire::object(format,
wire::optional_field<0>("users", wire::trusted_array(std::ref(self.users))),
wire::optional_field<1>("blocks", wire::trusted_array(std::ref(self.blocks)))
);
}
}
WIRE_MSGPACK_DEFINE_OBJECT(update_accounts, map_account_update)
namespace
{
template<typename F, typename T>
void map_push_accounts(F& format, T& self)
{
wire::object(format,
wire::optional_field<0>("users", wire::trusted_array(std::ref(self.users)))
);
}
}
WIRE_MSGPACK_DEFINE_OBJECT(push_accounts, map_push_accounts);
namespace
{
template<typename F, typename T>
void map_replace_accounts(F& format, T& self)
{
wire::object(format,
wire::optional_field<0>("users", wire::trusted_array(std::ref(self.users)))
);
}
}
WIRE_MSGPACK_DEFINE_OBJECT(replace_accounts, map_replace_accounts);
}}} // lws // rpc // scanner

101
src/rpc/scanner/commands.h Normal file
View file

@ -0,0 +1,101 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/endian/buffers.hpp>
#include <cstdint>
#include <vector>
#include "crypto/hash.h" // monero/src
#include "db/fwd.h"
#include "wire/msgpack/fwd.h"
namespace lws { namespace rpc { namespace scanner
{
/*
`monero-lws-daemon` is "server" and `monero-lws-scanner` is "client".
*/
//! \brief Data sent before msgpack payload
struct header
{
using length_type = boost::endian::little_uint32_buf_t;
header() = delete;
std::uint8_t version;
std::uint8_t id;
length_type length;
};
static_assert(sizeof(header) == 6);
/*
Client to server messages.
*/
//! \brief Inform server of info needed to spawn work to client.
struct initialize
{
initialize() = delete;
static constexpr std::uint8_t id() noexcept { return 0; }
std::string pass;
std::uint32_t threads;
};
WIRE_MSGPACK_DECLARE_OBJECT(initialize);
//! Command that updates database account records
struct update_accounts
{
update_accounts() = delete;
static constexpr std::uint8_t id() noexcept { return 1; }
std::vector<lws::account> users;
std::vector<crypto::hash> blocks;
};
WIRE_MSGPACK_DECLARE_OBJECT(update_accounts);
/*
Server to client messages.
*/
//! \brief New accounts to add/push to scanning list
struct push_accounts
{
push_accounts() = delete;
static constexpr std::uint8_t id() noexcept { return 0; }
std::vector<lws::account> users;
};
WIRE_MSGPACK_DECLARE_OBJECT(push_accounts);
//! \brief Replace account scanning list with this new one
struct replace_accounts
{
replace_accounts() = delete;
static constexpr const std::uint8_t id() noexcept { return 1; }
std::vector<lws::account> users;
};
WIRE_MSGPACK_DECLARE_OBJECT(replace_accounts);
}}} // lws // rpc // scanner

View file

@ -0,0 +1,83 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "connection.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
namespace lws { namespace rpc { namespace scanner
{
connection::connection(boost::asio::io_service& io)
: read_buf_(),
write_bufs_(),
sock_(io),
write_timeout_(io),
strand_(io),
next_{},
cleanup_(false)
{}
connection::~connection()
{}
boost::asio::ip::tcp::endpoint connection::remote_endpoint()
{
boost::system::error_code error{};
return sock_.remote_endpoint(error);
}
boost::asio::mutable_buffer connection::read_buffer(const std::size_t size)
{
assert(strand_.running_in_this_thread());
read_buf_.clear();
read_buf_.put_n(0, size);
return boost::asio::mutable_buffer(read_buf_.data(), size);
}
boost::asio::const_buffer connection::write_buffer() const
{
assert(strand_.running_in_this_thread());
if (write_bufs_.empty())
return boost::asio::const_buffer(nullptr, 0);
return boost::asio::const_buffer(write_bufs_.front().data(), write_bufs_.front().size());
}
void connection::base_cleanup()
{
assert(strand_.running_in_this_thread());
if (!cleanup_)
MINFO("Disconnected from " << remote_endpoint() << " / " << this);
cleanup_ = true;
boost::system::error_code error{};
sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
error = boost::system::error_code{};
sock_.close(error);
if (error)
MERROR("Error when closing socket: " << error.message());
write_timeout_.cancel();
}
}}} // lws // rpc // scanner

View file

@ -0,0 +1,71 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <atomic>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <cstdint>
#include <deque>
#include <string>
#include "byte_slice.h" // monero/contrib/epee/include
#include "byte_stream.h" // monero/contrib/epee/include
#include "rpc/scanner/commands.h"
namespace lws { namespace rpc { namespace scanner
{
//! \brief Base class for `client_connection` and `server_connection`. Always use `strand_`.
struct connection
{
// Leave public for coroutines `read_commands` and `write_commands`
epee::byte_stream read_buf_;
std::deque<epee::byte_slice> write_bufs_;
boost::asio::ip::tcp::socket sock_;
boost::asio::steady_timer write_timeout_;
boost::asio::io_service::strand strand_;
header next_;
bool cleanup_;
explicit connection(boost::asio::io_service& io);
~connection();
boost::asio::ip::tcp::endpoint remote_endpoint();
//! \return ASIO compatible read buffer of `size`.
boost::asio::mutable_buffer read_buffer(const std::size_t size);
//! \return ASIO compatible write buffer
boost::asio::const_buffer write_buffer() const;
//! Cancels operations on socket and timer. Also updates `cleanup_ = true`.
void base_cleanup();
};
}}} // lws // rpc // scanner

42
src/rpc/scanner/fwd.h Normal file
View file

@ -0,0 +1,42 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
namespace lws { namespace rpc { namespace scanner
{
class client;
struct connection;
struct give_accounts;
struct header;
struct push_accounts;
class queue;
template<typename> struct do_read_commands;
class server;
struct take_accounts;
struct update_accounts;
template<typename> struct write_commands;
}}} // lws // rpc // scanner

91
src/rpc/scanner/queue.cpp Normal file
View file

@ -0,0 +1,91 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "queue.h"
#include "db/account.h"
namespace lws { namespace rpc { namespace scanner
{
queue::status queue::do_get_accounts()
{
status out{
std::move(replace_), std::move(push_), user_count_
};
replace_ = std::nullopt;
push_.clear();
push_.shrink_to_fit();
return out;
}
queue::queue()
: replace_(), push_(), user_count_(0), sync_(), poll_(), stop_(false)
{}
queue::~queue()
{}
void queue::stop()
{
{
const boost::lock_guard<boost::mutex> lock{sync_};
stop_ = true;
}
poll_.notify_all();
}
std::size_t queue::user_count()
{
const boost::lock_guard<boost::mutex> lock{sync_};
return user_count_;
}
queue::status queue::get_accounts()
{
const boost::lock_guard<boost::mutex> lock{sync_};
return do_get_accounts();
}
queue::status queue::wait_for_accounts()
{
boost::unique_lock<boost::mutex> lock{sync_};
if (!replace_ && push_.empty() && !stop_)
poll_.wait(lock, [this] () { return replace_ || !push_.empty() || stop_; });
return do_get_accounts();
}
void queue::replace_accounts(std::vector<lws::account> users)
{
{
const boost::lock_guard<boost::mutex> lock{sync_};
replace_ = std::move(users);
user_count_ = replace_->size();
push_.clear();
}
poll_.notify_all();
}
}}} // lws // rpc // scanner

94
src/rpc/scanner/queue.h Normal file
View file

@ -0,0 +1,94 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/thread/mutex.hpp>
#include <cstddef>
#include <optional>
#include <vector>
#include "db/fwd.h"
#include "rpc/scanner/commands.h"
namespace lws { namespace rpc { namespace scanner
{
//! Notifies worker thread of new accounts to scan. All functions thread-safe.
class queue
{
public:
//! Status of upstream scan requests.
struct status
{
std::optional<std::vector<lws::account>> replace; //!< Empty optional means replace **not** requested.
std::vector<lws::account> push;
std::size_t user_count;
};
private:
std::optional<std::vector<lws::account>> replace_;
std::vector<lws::account> push_;
std::size_t user_count_;
boost::mutex sync_;
boost::condition_variable poll_;
bool stop_;
status do_get_accounts();
public:
queue();
~queue();
//! `wait_for_accounts()` will return immediately, permanently.
void stop();
//! \return Total number of users given to this local thread
std::size_t user_count();
//! \return Replacement and "push" accounts
status get_accounts();
//! Blocks until replace or push accounts become available OR `stop()` is called.
status wait_for_accounts();
//! Replace existing accounts on thread with new `users`
void replace_accounts(std::vector<lws::account> users);
//! Push/add new accounts to scan on thread
template<typename T>
void push_accounts(T begin, T end)
{
{
const boost::lock_guard<boost::mutex> lock{sync_};
user_count_ += (end - begin);
push_.insert(push_.end(), std::make_move_iterator(begin), std::make_move_iterator(end));
}
poll_.notify_all();
}
};
}}} // lws // rpc // scanner

View file

@ -0,0 +1,148 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/coroutine.hpp>
#include <boost/asio/read.hpp>
#include <cstring>
#include <limits>
#include <memory>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>
#include "byte_slice.h" // monero/contrib/epee/include
#include "db/account.h"
#include "misc_log_ex.h"
#include "rpc/scanner/connection.h"
#include "wire/msgpack/base.h"
#include "wire/msgpack/read.h"
namespace lws { namespace rpc { namespace scanner
{
/*! Function for binding to command callables. Must be exeucting "inside of"
connection strand.
\tparam F concept requirements:
* Must have inner `typedef` named `input` which specifies a type
that can read from msgpack bytes.
* Must have static function `handle` with interface
`bool(std::shared_ptr<T>, F::input)`.
\tparam T concept requirements:
* Must be derived from `lws::rpc::scanner::connection`. */
template<typename F, typename T>
bool call(const std::shared_ptr<T>& self)
{
static_assert(std::is_base_of<connection, T>{});
if (!self)
return false;
assert(self->strand_.running_in_this_thread());
typename F::input data{};
const std::error_code error =
wire::msgpack::from_bytes(epee::byte_slice{std::move(self->read_buf_)}, data);
self->read_buf_.clear();
if (error)
{
MERROR("Failed to unpack message (from " << self->remote_endpoint() << "): " << error.message());
return false;
}
return F::handle(self, std::move(data));
}
/*! \brief ASIO coroutine for reading remote client OR server commands.
\tparam T concept requirements:
* Must be derived from `lws::rpc::scanner::connection`.
* Must have `cleanup()` function that invokes `base_cleanup()`, and
does any other necessary work given that the socket connection is being
terminated.
* Must have a static `commands()` function, which returns a `std::array`
of `bool(std::shared_ptr<T>)` callables. The position in the array
determines the command number. */
template<typename T>
class do_read_commands : public boost::asio::coroutine
{
static_assert(std::is_base_of<connection, T>{});
const std::shared_ptr<T> self_;
public:
explicit do_read_commands(std::shared_ptr<T> self)
: boost::asio::coroutine(), self_(std::move(self))
{}
//! Invoke with no arguments to start read commands loop
void operator()(const boost::system::error_code& error = {}, const std::size_t transferred = 0)
{
if (!self_)
return;
assert(self_->strand_.running_in_this_thread());
if (error)
{
if (error != boost::asio::error::operation_aborted)
{
MERROR("Read error on socket (" << self_->remote_endpoint() << "): " << error.message());
self_->cleanup();
}
return;
}
if (self_->cleanup_)
return; // callback queued before cancellation
BOOST_ASIO_CORO_REENTER(*this)
{
for (;;) // multiple commands
{
// indefinite read timeout (waiting for next command)
BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(sizeof(self_->next_)), self_->strand_.wrap(*this));
std::memcpy(std::addressof(self_->next_), self_->read_buf_.data(), sizeof(self_->next_));
static_assert(std::numeric_limits<header::length_type::value_type>::max() <= std::numeric_limits<std::size_t>::max());
BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(self_->next_.length.value()), self_->strand_.wrap(*this));
const auto& commands = T::commands();
if (commands.size() <= self_->next_.id || !commands[self_->next_.id](self_))
{
self_->cleanup();
return; // stop reading commands
}
}
}
}
};
template<typename T>
bool read_commands(const std::shared_ptr<T>& self)
{
if (!self)
return false;
self->strand_.dispatch(do_read_commands{self});
return true;
}
}}} // lws // rpc // scanner

528
src/rpc/scanner/server.cpp Normal file
View file

@ -0,0 +1,528 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "server.h"
#include <boost/asio/coroutine.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/numeric/conversion/cast.hpp>
#include <sodium/utils.h>
#include <sodium/randombytes.h>
#include <vector>
#include "byte_slice.h" // monero/contrib/epee/include
#include "byte_stream.h" // monero/contrib/epee/include
#include "common/expect.h" // monero/src/
#include "error.h"
#include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "rpc/scanner/commands.h"
#include "rpc/scanner/connection.h"
#include "rpc/scanner/read_commands.h"
#include "rpc/scanner/write_commands.h"
#include "scanner.h"
namespace lws { namespace rpc { namespace scanner
{
namespace
{
//! Use remote scanning only if users-per-local-thread exceeds this
constexpr const std::size_t remote_threshold = 100;
//! Threshold for resetting/replacing state instead of pushing
constexpr const std::size_t replace_threshold = 10000;
//! \brief Handler for server to initialize new scanner
struct initialize_handler
{
using input = initialize;
static bool handle(const std::shared_ptr<server_connection>& self, input msg);
};
//! \brief Handler for request to update accounts
struct update_accounts_handler
{
using input = update_accounts;
static bool handle(const std::shared_ptr<server_connection>& self, input msg);
};
using command = bool(*)(const std::shared_ptr<server_connection>&);
} // anonymous
//! \brief Context/state for remote `monero-lws-scanner` instance.
struct server_connection : connection
{
const std::shared_ptr<server> parent_;
std::size_t threads_; //!< Number of scan threads at remote process
public:
explicit server_connection(std::shared_ptr<server> parent, boost::asio::io_service& io)
: connection(io),
parent_(std::move(parent)),
threads_(0)
{
if (!parent_)
MONERO_THROW(common_error::kInvalidArgument, "nullptr parent");
}
//! \return Handlers for commands from client
static const std::array<command, 2>& commands() noexcept
{
static constexpr const std::array<command, 2> value{{
call<initialize_handler, server_connection>,
call<update_accounts_handler, server_connection>
}};
static_assert(initialize_handler::input::id() == 0);
static_assert(update_accounts_handler::input::id() == 1);
return value;
}
//! Cancels pending operations and "pushes" accounts to other processes
void cleanup()
{
base_cleanup();
}
};
namespace
{
bool initialize_handler::handle(const std::shared_ptr<server_connection>& self, const input msg)
{
if (!self)
return false;
assert(self->strand_.running_in_this_thread());
if (self->threads_)
{
MERROR("Client ( " << self->remote_endpoint() << ") invoked initialize twice, closing connection");
return false;
}
if (!msg.threads)
{
MERROR("Client (" << self->remote_endpoint() << ") intialized with 0 threads");
return false;
}
if (!self->parent_->check_pass(msg.pass))
{
MERROR("Client (" << self->remote_endpoint() << ") provided invalid pass");
return false;
}
self->threads_ = boost::numeric_cast<std::size_t>(msg.threads);
server::replace_users(self->parent_);
return true;
}
bool update_accounts_handler::handle(const std::shared_ptr<server_connection>& self, input msg)
{
if (!self)
return false;
if (msg.users.empty())
return true;
server::store(self->parent_, std::move(msg.users), std::move(msg.blocks));
return true;
}
} // anonymous
class server::acceptor : public boost::asio::coroutine
{
std::shared_ptr<server> self_;
std::shared_ptr<server_connection> next_;
public:
explicit acceptor(std::shared_ptr<server> self)
: boost::asio::coroutine(), self_(std::move(self)), next_(nullptr)
{}
void operator()(const boost::system::error_code& error = {})
{
if (!self_ || error)
{
if (error == boost::asio::error::operation_aborted)
return; // exiting
MONERO_THROW(error, "server acceptor failed");
}
assert(self_->strand_.running_in_this_thread());
BOOST_ASIO_CORO_REENTER(*this)
{
for (;;)
{
next_ = std::make_shared<server_connection>(self_, GET_IO_SERVICE(self_->check_timer_));
BOOST_ASIO_CORO_YIELD self_->acceptor_.async_accept(next_->sock_, self_->strand_.wrap(*this));
MINFO("New connection to " << next_->remote_endpoint() << " / " << next_.get());
self_->remote_.emplace(next_);
read_commands(std::move(next_));
}
}
}
};
struct server::check_users
{
std::shared_ptr<server> self_;
void operator()(const boost::system::error_code& error = {}) const
{
if (!self_ || error == boost::asio::error::operation_aborted)
return;
assert(self_->strand_.running_in_this_thread());
self_->check_timer_.expires_from_now(account_poll_interval);
self_->check_timer_.async_wait(self_->strand_.wrap(*this));
std::size_t total_threads = self_->local_.size();
std::vector<std::shared_ptr<server_connection>> remotes{};
remotes.reserve(self_->remote_.size());
for (auto& remote : self_->remote_)
{
auto conn = remote.lock();
if (!conn)
{
// connection loss detected, re-shuffle accounts
self_->do_replace_users();
return;
}
if (std::numeric_limits<std::size_t>::max() - total_threads < conn->threads_)
MONERO_THROW(error::configuration, "Exceeded max threads (size_t) across all systems");
total_threads += conn->threads_;
remotes.push_back(std::move(conn));
}
if (!total_threads)
{
MWARNING("Currently no worker threads, waiting for new clients");
return;
}
auto reader = self_->disk_.start_read(std::move(self_->read_txn_));
if (!reader)
{
if (reader.matches(std::errc::no_lock_available))
{
MWARNING("Failed to open DB read handle, retrying later");
return;
}
MONERO_THROW(reader.error(), "Failed to open DB read handle");
}
auto current_users = MONERO_UNWRAP(
reader->get_accounts(db::account_status::active, std::move(self_->accounts_cur_))
);
if (current_users.count() < self_->active_.size())
{
// a shrinking user base, re-shuffle
self_->do_replace_users();
return;
}
std::vector<db::account_id> active_copy = self_->active_;
std::vector<lws::account> new_accounts;
for (auto user = current_users.make_iterator(); !user.is_end(); ++user)
{
const db::account_id user_id = user.get_value<MONERO_FIELD(db::account, id)>();
const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id);
if (loc == active_copy.end() || *loc != user_id)
{
new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value<db::account>())));
if (replace_threshold < new_accounts.size())
{
self_->do_replace_users();
return;
}
self_->active_.insert(
std::lower_bound(self_->active_.begin(), self_->active_.end(), user_id),
user_id
);
}
else
active_copy.erase(loc);
}
if (!active_copy.empty())
{
self_->do_replace_users();
return;
}
self_->next_thread_ %= total_threads;
while (!new_accounts.empty())
{
if (self_->next_thread_ < self_->local_.size())
{
self_->local_[self_->next_thread_]->push_accounts(
std::make_move_iterator(new_accounts.end() - 1),
std::make_move_iterator(new_accounts.end())
);
new_accounts.erase(new_accounts.end() - 1);
++self_->next_thread_;
}
else
{
std::size_t j = 0;
for (auto offset = self_->local_.size(); j < remotes.size(); ++j)
{
if (self_->next_thread_ <= offset)
break;
offset += remotes[j]->threads_;
}
const auto user_count = std::min(new_accounts.size(), remotes[j]->threads_);
std::vector<lws::account> next{
std::make_move_iterator(new_accounts.end() - user_count),
std::make_move_iterator(new_accounts.end())
};
new_accounts.erase(new_accounts.end() - user_count);
write_command(remotes[j], push_accounts{std::move(next)});
self_->next_thread_ += remotes[j]->threads_;
}
self_->next_thread_ %= total_threads;
}
self_->read_txn_ = reader->finish_read();
self_->accounts_cur_ = current_users.give_cursor();
}
};
void server::do_replace_users()
{
assert(strand_.running_in_this_thread());
MINFO("Updating/replacing user account(s) on worker thread(s)");
std::size_t remaining_threads = local_.size();
std::vector<std::shared_ptr<server_connection>> remotes;
remotes.reserve(remote_.size());
for (auto remote = remote_.begin(); remote != remote_.end(); )
{
auto conn = remote->lock();
if (conn)
{
if (std::numeric_limits<std::size_t>::max() - remaining_threads < conn->threads_)
MONERO_THROW(error::configuration, "Exceeded max threads (size_t) across all systems");
remaining_threads += conn->threads_;
remotes.push_back(std::move(conn));
++remote;
}
else
remote = remote_.erase(remote);
}
if (!remaining_threads)
{
MWARNING("Currently no worker threads, waiting for new clients");
return;
}
std::vector<db::account_id> active{};
std::vector<db::account> users{};
auto reader = MONERO_UNWRAP(disk_.start_read());
{
auto active_users = MONERO_UNWRAP(reader.get_accounts(db::account_status::active));
const auto active_count = active_users.count();
active.reserve(active_count);
users.reserve(active_count);
for (auto user : active_users.make_range())
{
active.insert(std::lower_bound(active.begin(), active.end(), user.id), user.id);
users.insert(std::lower_bound(users.begin(), users.end(), user, by_height{}), user);
}
}
// if under `remote_threshold` users per thread, use local scanning only
if (local_.size() && (users.size() / local_.size()) < remote_threshold)
remaining_threads = local_.size();
// make sure to notify of zero users too!
for (auto& local : local_)
{
const auto user_count = users.size() / remaining_threads;
std::vector<lws::account> next{};
next.reserve(user_count);
for (std::size_t j = 0; !users.empty() && j < user_count; ++j)
{
next.push_back(MONERO_UNWRAP(reader.get_full_account(users.back())));
users.erase(users.end() - 1);
}
local->replace_accounts(std::move(next));
--remaining_threads;
}
// make sure to notify of zero users too!
for (auto& remote : remotes)
{
const auto users_per_thread = users.size() / std::max(std::size_t(1), remaining_threads);
const auto user_count = std::max(std::size_t(1), users_per_thread) * remote->threads_;
std::vector<lws::account> next{};
next.reserve(user_count);
for (std::size_t j = 0; !users.empty() && j < user_count; ++j)
{
next.push_back(MONERO_UNWRAP(reader.get_full_account(users.back())));
users.erase(users.end() - 1);
}
write_command(remote, replace_accounts{std::move(next)});
remaining_threads -= std::min(remaining_threads, remote->threads_);
}
next_thread_ = 0;
active_ = std::move(active);
}
boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address)
{
std::string host;
std::string port;
{
const auto split = address.rfind(':');
if (split == std::string::npos)
{
host = "0.0.0.0";
port = address;
}
else
{
host = address.substr(0, split);
port = address.substr(split + 1);
}
}
return boost::asio::ip::tcp::endpoint{
boost::asio::ip::address::from_string(host), boost::lexical_cast<unsigned short>(port)
};
}
server::server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify)
: strand_(io),
check_timer_(io),
acceptor_(io),
remote_(),
local_(std::move(local)),
active_(std::move(active)),
disk_(std::move(disk)),
zclient_(std::move(zclient)),
read_txn_{},
accounts_cur_{},
next_thread_(0),
pass_hashed_(),
pass_salt_(),
webhook_verify_(webhook_verify)
{
std::sort(active_.begin(), active_.end());
for (const auto& local : local_)
{
if (!local)
MONERO_THROW(common_error::kInvalidArgument, "given nullptr local queue");
}
std::memset(pass_hashed_.data(), 0, pass_hashed_.size());
randombytes_buf(pass_salt_.data(), pass_salt_.size());
}
server::~server() noexcept
{}
bool server::check_pass(const std::string& pass) const noexcept
{
std::array<unsigned char, 32> out;
std::memset(out.data(), 0, out.size());
compute_hash(out, pass);
return sodium_memcmp(out.data(), pass_hashed_.data(), out.size()) == 0;
}
void server::compute_hash(std::array<unsigned char, 32>& out, const std::string& pass) const noexcept
{
if (out.size() < crypto_pwhash_BYTES_MIN)
MONERO_THROW(error::crypto_failure, "Invalid output size");
if (crypto_pwhash_BYTES_MAX < out.size())
MONERO_THROW(error::crypto_failure, "Invalid output size");
if (pass.size() < crypto_pwhash_PASSWD_MIN && crypto_pwhash_PASSWD_MAX < pass.size())
MONERO_THROW(error::crypto_failure, "Invalid password size");
if (crypto_pwhash(out.data(), out.size(), pass.data(), pass.size(), pass_salt_.data(),
crypto_pwhash_OPSLIMIT_MIN, crypto_pwhash_MEMLIMIT_MIN, crypto_pwhash_ALG_DEFAULT) != 0)
MONERO_THROW(error::crypto_failure, "Failed password hashing");
}
void server::start_acceptor(const std::shared_ptr<server>& self, const std::string& address, std::string pass)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
if (address.empty())
return;
auto endpoint = get_endpoint(address);
self->strand_.dispatch([self, endpoint = std::move(endpoint), pass = std::move(pass)] ()
{
self->acceptor_.close();
self->acceptor_.open(endpoint.protocol());
self->acceptor_.bind(endpoint);
self->acceptor_.listen();
MINFO("Listening at " << endpoint << " for scanner clients");
self->compute_hash(self->pass_hashed_, pass);
acceptor{std::move(self)}();
});
}
void server::start_user_checking(const std::shared_ptr<server>& self)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch(check_users{self});
}
void server::replace_users(const std::shared_ptr<server>& self)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self] () { self->do_replace_users(); });
}
void server::store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks)
{
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
std::sort(users.begin(), users.end(), by_height{});
self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] ()
{
const lws::scanner_options opts{self->webhook_verify_, false, false};
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))
GET_IO_SERVICE(self->check_timer_).stop();
});
}
}}} // lws // rpc // scanner

110
src/rpc/scanner/server.h Normal file
View file

@ -0,0 +1,110 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <array>
#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <memory>
#include <set>
#include <sodium/crypto_pwhash.h>
#include <string>
#include "db/fwd.h"
#include "db/storage.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/client.h"
#include "rpc/scanner/queue.h"
namespace lws { namespace rpc { namespace scanner
{
//! Checking frequency for local user db changes
constexpr const std::chrono::seconds account_poll_interval{10};
using ssl_verification_t = epee::net_utils::ssl_verification_t;
struct server_connection;
/*!
\brief Manages local and remote scanning for the primary daemon.
\note HTTP and ZMQ were not used because a two-way messaging system were
needed (basically a REST server on either end). */
class server
{
boost::asio::io_service::strand strand_;
boost::asio::steady_timer check_timer_;
boost::asio::ip::tcp::acceptor acceptor_;
std::set<std::weak_ptr<server_connection>, std::owner_less<std::weak_ptr<server_connection>>> remote_;
std::vector<std::shared_ptr<queue>> local_;
std::vector<db::account_id> active_;
db::storage disk_;
rpc::client zclient_;
lmdb::suspended_txn read_txn_;
db::cursor::accounts accounts_cur_;
std::size_t next_thread_;
std::array<unsigned char, 32> pass_hashed_;
std::array<unsigned char, crypto_pwhash_SALTBYTES> pass_salt_;
const ssl_verification_t webhook_verify_;
//! Async acceptor routine
class acceptor;
struct check_users;
//! Reset `local_` and `remote_` scanners. Must be called in `strand_`.
void do_replace_users();
public:
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);
explicit server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify);
server(const server&) = delete;
server(server&&) = delete;
~server() noexcept;
server& operator=(const server&) = delete;
server& operator=(server&&) = delete;
//! \return True if `pass` matches expected
bool check_pass(const std::string& pass) const noexcept;
void compute_hash(std::array<unsigned char, 32>& out, const std::string& pass) const noexcept;
//! Start listening for incoming connections on `address`.
static void start_acceptor(const std::shared_ptr<server>& self, const std::string& address, std::string pass);
//! Start timed checks of local DB for change in user state
static void start_user_checking(const std::shared_ptr<server>& self);
//! Replace users/accounts on all local and remote threads
static void replace_users(const std::shared_ptr<server>& self);
//! Update `users` information on local DB
static void store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
};
}}} // lws // rpc // scanner

View file

@ -0,0 +1,55 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "write_commands.h"
#include <cstring>
#include <limits>
namespace lws { namespace rpc { namespace scanner
{
epee::byte_slice complete_command(const std::uint8_t id, epee::byte_stream sink)
{
if (sink.size() < sizeof(header))
{
MERROR("Message sink was unexpectedly shrunk on message");
return nullptr;
}
using value_type = header::length_type::value_type;
if (std::numeric_limits<value_type>::max() < sink.size() - sizeof(header))
{
MERROR("Message to exceeds max size");
return nullptr;
}
const header head{0, id, header::length_type{value_type(sink.size() - sizeof(header))}};
std::memcpy(sink.data(), std::addressof(head), sizeof(head));
return epee::byte_slice{std::move(sink)};
}
}}} // lws // rpc // scanner

View file

@ -0,0 +1,209 @@
// Copyright (c) 2024, The Monero Project
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/asio/coroutine.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <memory>
#include <system_error>
#include <type_traits>
#include <vector>
#include "byte_slice.h" // monero/contrib/epee/include
#include "byte_stream.h" // monero/contrib/epee/include
#include "commands.h"
#include "common/expect.h"// monero/src
#include "crypto/hash.h" // monero/src
#include "db/account.h"
#include "misc_log_ex.h"
#include "rpc/scanner/commands.h"
#include "rpc/scanner/connection.h"
#include "wire/msgpack/write.h"
namespace lws { namespace rpc { namespace scanner
{
constexpr const std::size_t max_write_buffers = 100;
/* \brief ASIO handler for write timeouts
\tparam T concept requirements:
* Must be derived from `lws::rpc::scanner::connection`.
* Must have `cleanup()` function that invokes `base_cleanup()`, and
does any other necessary work given that the socket connection is being
terminated. */
template<typename T>
struct timeout
{
static_assert(std::is_base_of<connection, T>{});
std::shared_ptr<T> self_;
void operator()(const boost::system::error_code& error) const
{
if (self_ && error != boost::asio::error::operation_aborted)
{
assert(self_->strand_.running_in_this_thread());
MERROR("Write timeout on socket (" << self_->remote_endpoint() << ")");
self_->cleanup();
}
}
};
/*! \brief ASIO coroutine for write client OR server commands.
\tparam T concept requirements:
* Must be derived from `lws::rpc::scanner::connection`.
* Must have `cleanup()` function that invokes `base_cleanup()`, and
does any other necessary work given that the socket connection is being
terminated. */
template<typename T>
class write_buffers : public boost::asio::coroutine
{
static_assert(std::is_base_of<connection, T>{});
std::shared_ptr<T> self_;
public:
explicit write_buffers(std::shared_ptr<T> self)
: boost::asio::coroutine(), self_(std::move(self))
{}
write_buffers(write_buffers&&) = default;
write_buffers(const write_buffers&) = default;
void operator()(const boost::system::error_code& error = {}, std::size_t = 0)
{
if (!self_)
return;
assert(self_->strand_.running_in_this_thread());
if (error)
{
if (error != boost::asio::error::operation_aborted)
{
MERROR("Write error on socket (" << self_->remote_endpoint() << "): " << error.message());
self_->cleanup();
}
self_->write_timeout_.cancel();
return;
}
if (self_->cleanup_)
return; // callback queued before cancellation
BOOST_ASIO_CORO_REENTER(*this)
{
while (!self_->write_bufs_.empty())
{
self_->write_timeout_.expires_from_now(std::chrono::seconds{10});
self_->write_timeout_.async_wait(self_->strand_.wrap(timeout<T>{self_}));
BOOST_ASIO_CORO_YIELD boost::asio::async_write(self_->sock_, self_->write_buffer(), self_->strand_.wrap(*this));
self_->write_timeout_.cancel();
self_->write_bufs_.pop_front();
}
}
}
};
//! \return Completed message using `sink` as source.
epee::byte_slice complete_command(std::uint8_t id, epee::byte_stream sink);
/*! Writes "raw" `header` then `data` as msgpack, and queues for writing to
`self`. Also starts ASIO async writing (via `write_buffers`) if the queue
was empty before queueing `data`.
\tparam T must meet concept requirements for `T` outlined in
`write_commands`.
\tparam U concept requirements:
* must be serializable to msgpack using `wire` engine.
* must have static function `id` which returns an `std::uint8_t` to
identify the command on the remote side. */
template<typename T, typename U>
void write_command(const std::shared_ptr<T>& self, const U& data)
{
static_assert(std::is_base_of<connection, T>{});
if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
epee::byte_slice msg = nullptr;
try
{
epee::byte_stream sink{};
sink.put_n(0, sizeof(header));
// use integer keys for speed (default to_bytes uses strings)
wire::msgpack_slice_writer dest{std::move(sink), true};
wire_write::bytes(dest, data);
msg = complete_command(U::id(), dest.take_sink());
}
catch (const wire::exception& e)
{
MERROR("Failed to serialize msgpack for remote (" << self.get() << ") command: " << e.what());
throw; // this should rarely happen, so just shutdown
}
if (msg.empty())
{
self->cleanup();
return;
}
class queue_slice
{
std::shared_ptr<T> self_;
epee::byte_slice msg_;
public:
explicit queue_slice(std::shared_ptr<T> self, epee::byte_slice msg)
: self_(std::move(self)), msg_(std::move(msg))
{}
queue_slice(queue_slice&&) = default;
queue_slice(const queue_slice& rhs)
: self_(rhs.self_), msg_(rhs.msg_.clone())
{}
void operator()()
{
if (!self_)
return;
const bool queue = self_->write_bufs_.empty();
self_->write_bufs_.push_back(std::move(msg_));
if (queue)
write_buffers{self_}();
else if (max_write_buffers <= self_->write_bufs_.size())
{
MERROR("Exceeded max buffer size for connection: " << self_->remote_endpoint());
self_->cleanup();
}
}
};
self->strand_.dispatch(queue_slice{self, std::move(msg)});
}
}}} // lws // rpc // scanner

View file

@ -60,6 +60,8 @@
#include "rpc/json.h"
#include "rpc/lws_pub.h"
#include "rpc/message_data_structs.h" // monero/src
#include "rpc/scanner/queue.h"
#include "rpc/scanner/server.h"
#include "rpc/webhook.h"
#include "util/blocks.h"
#include "util/source_location.h"
@ -72,8 +74,6 @@
namespace lws
{
std::atomic<bool> scanner::running{true};
// Not in `rates.h` - defaulting to JSON output seems odd
std::ostream& operator<<(std::ostream& out, lws::rates const& src)
{
@ -87,54 +87,24 @@ namespace lws
{
namespace net = epee::net_utils;
constexpr const std::chrono::seconds account_poll_interval{10};
constexpr const std::chrono::minutes block_rpc_timeout{2};
constexpr const std::chrono::seconds send_timeout{30};
constexpr const std::chrono::seconds sync_rpc_timeout{30};
struct thread_sync
{
boost::mutex sync;
boost::condition_variable user_poll;
std::atomic<bool> update;
};
struct options
{
net::ssl_verification_t webhook_verify;
bool enable_subaddresses;
bool untrusted_daemon;
};
struct thread_data
{
explicit thread_data(rpc::client client, db::storage disk, std::vector<lws::account> users, options opts)
: client(std::move(client)), disk(std::move(disk)), users(std::move(users)), opts(opts)
explicit thread_data(rpc::client client, db::storage disk, std::vector<lws::account> users, std::shared_ptr<rpc::scanner::queue> queue, scanner_options opts)
: client(std::move(client)), disk(std::move(disk)), users(std::move(users)), queue(std::move(queue)), opts(std::move(opts))
{}
rpc::client client;
db::storage disk;
std::vector<lws::account> users;
options opts;
std::shared_ptr<rpc::scanner::queue> queue;
scanner_options opts;
};
// until we have a signal-handler safe notification system
void checked_wait(const std::chrono::nanoseconds wait)
{
static constexpr const std::chrono::milliseconds interval{500};
const auto start = std::chrono::steady_clock::now();
while (scanner::is_running())
{
const auto current = std::chrono::steady_clock::now() - start;
if (wait <= current)
break;
const auto sleep_time = std::min(wait - current, std::chrono::nanoseconds{interval});
boost::this_thread::sleep_for(boost::chrono::nanoseconds{sleep_time.count()});
}
}
bool is_new_block(std::string&& chain_msg, db::storage& disk, const account& user)
bool is_new_block(std::string&& chain_msg, std::optional<db::storage>& disk, const account& user)
{
const auto chain = rpc::minimal_chain_pub::from_json(std::move(chain_msg));
if (!chain)
@ -146,7 +116,13 @@ namespace lws
if (user.scan_height() < db::block_id(chain->top_block_height))
return true;
auto reader = disk.start_read();
if (!disk)
{
MWARNING("Assuming new block - no access to local DB");
return true;
}
auto reader = disk->start_read();
if (!reader)
{
MWARNING("Failed to start DB read: " << reader.error());
@ -179,7 +155,7 @@ namespace lws
{
rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode);
}
std::size_t get_target_time(db::block_id height)
{
const hardfork_t* fork = nullptr;
@ -221,15 +197,7 @@ namespace lws
{
rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode);
}
struct by_height
{
bool operator()(account const& left, account const& right) const noexcept
{
return left.scan_height() < right.scan_height();
}
};
struct add_spend
{
void operator()(lws::account& user, const db::spend& spend) const
@ -322,12 +290,12 @@ namespace lws
expect<db::storage_reader> reader;
db::cursor::subaddress_indexes cur;
subaddress_reader(db::storage const& disk, const bool enable_subaddresses)
subaddress_reader(std::optional<db::storage> const& disk, const bool enable_subaddresses)
: reader(common_error::kInvalidArgument), cur(nullptr)
{
if (enable_subaddresses)
if (disk && enable_subaddresses)
{
reader = disk.start_read();
reader = disk->start_read();
if (!reader)
MERROR("Subadress lookup failure: " << reader.error().message());
}
@ -588,7 +556,7 @@ namespace lws
scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, reader, add_spend{}, add_output{});
}
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const options& opts)
void scan_transactions(std::string&& txpool_msg, epee::span<lws::account> users, db::storage const& disk, rpc::client& client, const scanner_options& opts)
{
// uint64::max is for txpool
static const std::vector<std::uint64_t> fake_outs(
@ -605,7 +573,7 @@ namespace lws
const auto time =
boost::numeric_cast<std::uint64_t>(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
subaddress_reader reader{disk, opts.enable_subaddresses};
subaddress_reader reader{std::optional<db::storage>{disk.clone()}, opts.enable_subaddresses};
send_webhook sender{disk, client, opts.webhook_verify};
for (const auto& tx : parsed->txes)
scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender);
@ -620,49 +588,106 @@ namespace lws
MINFO("Updated exchange rates: " << *(*new_rates));
}
void scan_loop(thread_sync& self, std::shared_ptr<thread_data> data, const bool untrusted_daemon, const bool leader_thread) noexcept
void do_scan_loop(scanner_sync& self, std::shared_ptr<thread_data> data, const bool leader_thread) noexcept
{
try
struct stop_
{
scanner_sync& self;
~stop_() { self.stop(); }
} stop{self};
// thread entry point, so wrap everything in `try { } catch (...) {}`
try
{
// boost::thread doesn't support move-only types + attributes
rpc::client client{std::move(data->client)};
db::storage disk{std::move(data->disk)};
std::vector<lws::account> users{std::move(data->users)};
const options opts = std::move(data->opts);
assert(!users.empty());
assert(std::is_sorted(users.begin(), users.end(), by_height{}));
const std::shared_ptr<rpc::scanner::queue> queue{std::move(data->queue)};
const scanner_options opts{std::move(data->opts)};
data.reset();
struct stop_
{
thread_sync& self;
~stop_() noexcept
{
self.update = true;
self.user_poll.notify_one();
}
} stop{self};
if (!queue)
return;
// RPC server assumes that `start_height == 0` means use
while (self.is_running())
{
if (!users.empty())
{
auto new_client = MONERO_UNWRAP(client.clone());
MONERO_UNWRAP(new_client.watch_scan_signals());
user_data store_local{disk.clone()};
if (!scanner::loop(self.stop_, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread))
return;
}
users.clear();
auto status = queue->wait_for_accounts();
if (status.replace)
users = std::move(*status.replace);
users.insert(
users.end(),
std::make_move_iterator(status.push.begin()),
std::make_move_iterator(status.push.end())
);
}
}
catch (std::exception const& e)
{
self.shutdown();
MERROR(e.what());
}
catch (...)
{
self.shutdown();
MERROR("Unknown exception");
}
}
} // anonymous
scanner::scanner(db::storage disk)
: disk_(std::move(disk)), sync_(), signals_(sync_.io_)
{
signals_.add(SIGINT);
signals_.async_wait([this] (const boost::system::error_code& error, int)
{
if (error != boost::asio::error::operation_aborted)
shutdown();
});
}
scanner::~scanner()
{}
bool scanner::loop(const std::atomic<bool>& stop, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread)
{
if (users.empty())
return true;
{ // previous `try` block; leave to prevent git blame spam
std::sort(users.begin(), users.end(), by_height{});
/// RPC server assumes that `start_height == 0` means use
// block ids. This technically skips genesis block.
cryptonote::rpc::GetBlocksFast::Request req{};
req.start_height = std::uint64_t(users.begin()->scan_height());
req.start_height = std::max(std::uint64_t(1), req.start_height);
req.prune = !untrusted_daemon;
req.prune = !opts.untrusted_daemon;
epee::byte_slice block_request = rpc::client::make_message("get_blocks_fast", req);
if (!send(client, block_request.clone()))
return;
return false;
std::vector<crypto::hash> blockchain{};
std::vector<db::pow_sync> new_pow{};
db::pow_window pow_window{};
const db::block_info last_checkpoint = db::storage::get_last_checkpoint();
const db::block_id last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk.start_read()).get_last_pow_block()).id;
while (!self.update && scanner::is_running())
db::block_id last_pow{};
if (opts.untrusted_daemon && disk)
last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk->start_read()).get_last_pow_block()).id;
while (!stop)
{
blockchain.clear();
new_pow.clear();
@ -674,7 +699,7 @@ namespace lws
if (timeout)
MWARNING("Block retrieval timeout, resetting scanner");
if (timeout || resp.matches(std::errc::interrupted))
return;
return false;
MONERO_THROW(resp.error(), "Failed to retrieve blocks from daemon");
}
@ -682,7 +707,7 @@ namespace lws
if (!fetched)
{
MERROR("Failed to retrieve next blocks: " << fetched.error().message() << ". Resetting state and trying again");
return;
return false;
}
if (fetched->blocks.empty())
@ -691,35 +716,46 @@ namespace lws
if (fetched->start_height != req.start_height)
{
MWARNING("Daemon sent wrong blocks, resetting state");
return;
return false;
}
{
expect<std::vector<lws::account>> new_accounts = client.pull_accounts();
if (!new_accounts)
bool resort = false;
auto status = queue.get_accounts();
if (status.replace && status.replace->empty() && status.push.empty())
return true; // no work explictly given, leave
if (status.replace)
{
MERROR("Failed to pull new accounts: " << new_accounts.error().message());
return; // get all active accounts the easy way
MINFO("Received " << status.replace->size() << " replacement account(s) for scanning");
users = std::move(*status.replace);
resort = true;
}
if (!new_accounts->empty())
if (!status.push.empty())
{
MINFO("Received " << new_accounts->size() << " new account(s) for scanning");
std::sort(new_accounts->begin(), new_accounts->end(), by_height{});
const db::block_id oldest = new_accounts->front().scan_height();
MINFO("Received " << status.push.size() << " new account(s) for scanning");
users.insert(
users.end(),
std::make_move_iterator(new_accounts->begin()),
std::make_move_iterator(new_accounts->end())
std::make_move_iterator(status.push.begin()),
std::make_move_iterator(status.push.end())
);
resort = true;
}
if (resort)
{
assert(!users.empty()); // by logic from above
std::sort(users.begin(), users.end(), by_height{});
const db::block_id oldest = users.front().scan_height();
if (std::uint64_t(oldest) < fetched->start_height)
{
req.start_height = std::uint64_t(oldest);
block_request = rpc::client::make_message("get_blocks_fast", req);
if (!send(client, block_request.clone()))
return;
return false;
continue; // to next get_blocks_fast read
}
// else, the oldest new account is within the newly fetched range
// else, the oldest new account is within the newly fetch range
}
}
@ -734,7 +770,7 @@ namespace lws
{
expect<std::vector<std::pair<rpc::client::topic, std::string>>> new_pubs = client.wait_for_block();
if (new_pubs.matches(std::errc::interrupted))
return; // reset entire state (maybe shutdown)
return false; // reset entire state (maybe shutdown)
if (!new_pubs)
break; // exit wait for block loop, and try fetching new blocks
@ -747,9 +783,9 @@ namespace lws
auto message = new_pubs->begin();
for ( ; message != new_pubs->end(); ++message)
{
if (message->first != rpc::client::topic::txpool)
if (!disk || message->first != rpc::client::topic::txpool)
break; // inner for loop
scan_transactions(std::move(message->second), epee::to_mut_span(users), disk, client, opts);
scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, client, opts);
}
for ( ; message != new_pubs->end(); ++message)
@ -761,19 +797,19 @@ namespace lws
// request next chunk of blocks
if (!send(client, block_request.clone()))
return;
return false;
continue; // to next get_blocks_fast read
} // if only one block was fetched
// request next chunk of blocks
if (!send(client, block_request.clone()))
return;
return false;
if (fetched->blocks.size() != fetched->output_indices.size())
throw std::runtime_error{"Bad daemon response - need same number of blocks and indices"};
blockchain.push_back(cryptonote::get_block_hash(fetched->blocks.front().block));
if (untrusted_daemon)
if (opts.untrusted_daemon)
new_pow.push_back(db::pow_sync{fetched->blocks.front().block.timestamp});
auto blocks = epee::to_mut_span(fetched->blocks);
@ -788,10 +824,10 @@ namespace lws
else
fetched->start_height = 0;
if (untrusted_daemon)
if (disk && opts.untrusted_daemon)
{
pow_window = MONERO_UNWRAP(
MONERO_UNWRAP(disk.start_read()).get_pow_window(db::block_id(fetched->start_height))
MONERO_UNWRAP(disk->start_read()).get_pow_window(db::block_id(fetched->start_height))
);
}
@ -826,7 +862,7 @@ namespace lws
reader
);
if (untrusted_daemon)
if (opts.untrusted_daemon)
{
if (block.prev_id != blockchain.back())
MONERO_THROW(error::bad_blockchain, "A blocks prev_id does not match");
@ -838,19 +874,19 @@ namespace lws
pow_window.median_timestamps.erase(pow_window.median_timestamps.begin());
// longhash takes a while, check is_running
if (!scanner::is_running())
return;
if (stop)
return false;
diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(db::block_id(fetched->start_height)));
// skip POW hashing if done previously
if (last_pow < db::block_id(fetched->start_height))
if (disk && last_pow < db::block_id(fetched->start_height))
{
if (!verify_timestamp(block.timestamp, pow_window.median_timestamps))
MONERO_THROW(error::bad_blockchain, "Block failed timestamp check - possible chain forgery");
const crypto::hash pow =
get_block_longhash(get_block_hashing_blob(block), db::block_id(fetched->start_height), block.major_version, disk, initial_height, epee::to_span(blockchain));
get_block_longhash(get_block_hashing_blob(block), db::block_id(fetched->start_height), block.major_version, *disk, initial_height, epee::to_span(blockchain));
if (!cryptonote::check_hash(pow, diff))
MONERO_THROW(error::bad_blockchain, "Block had too low difficulty");
}
@ -862,7 +898,7 @@ namespace lws
for (auto tx_data : boost::combine(block.tx_hashes, txes, indices))
{
if (untrusted_daemon)
if (opts.untrusted_daemon)
{
if (cryptonote::get_transaction_hash(boost::get<1>(tx_data)) != boost::get<0>(tx_data))
MONERO_THROW(error::bad_blockchain, "Hash of transaction does not match hash in block");
@ -879,7 +915,7 @@ namespace lws
);
}
if (untrusted_daemon)
if (opts.untrusted_daemon)
{
const auto last_difficulty =
pow_window.cumulative_diffs.empty() ?
@ -895,255 +931,130 @@ namespace lws
} // for each block
reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write
auto updated = disk.update(
users.front().scan_height(), epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow)
);
if (!updated)
{
if (updated == lws::error::blockchain_reorg)
{
MINFO("Blockchain reorg detected, resetting state");
return;
}
MONERO_THROW(updated.error(), "Failed to update accounts on disk");
}
if (!store(client, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow), opts))
return false;
if (untrusted_daemon && leader_thread && fetched->start_height % 4 == 0 && last_pow < db::block_id(fetched->start_height))
// TODO
if (opts.untrusted_daemon && leader_thread && fetched->start_height % 4 == 0 && last_pow < db::block_id(fetched->start_height))
{
MINFO("On chain with hash " << blockchain.back() << " and difficulty " << diff << " at height " << fetched->start_height);
}
MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return;
}
for (account& user : users)
user.updated(db::block_id(fetched->start_height));
// Publish when all scan threads have past this block
if (!blockchain.empty() && client.has_publish())
rpc::publish_scanned(client, blockchain.back(), epee::to_span(users));
}
}
catch (std::exception const& e)
{
scanner::stop();
MERROR(e.what());
}
catch (...)
{
scanner::stop();
MERROR("Unknown exception");
}
}
lws::account prep_account(db::storage_reader& reader, const lws::db::account& user)
{
std::vector<std::pair<db::output_id, db::address_index>> receives{};
std::vector<crypto::public_key> pubs{};
auto receive_list = MONERO_UNWRAP(reader.get_outputs(user.id));
const std::size_t elems = receive_list.count();
receives.reserve(elems);
pubs.reserve(elems);
for (auto output = receive_list.make_iterator(); !output.is_end(); ++output)
{
auto id = output.get_value<MONERO_FIELD(db::output, spend_meta.id)>();
auto subaddr = output.get_value<MONERO_FIELD(db::output, recipient)>();
receives.emplace_back(std::move(id), std::move(subaddr));
pubs.emplace_back(output.get_value<MONERO_FIELD(db::output, pub)>());
}
return lws::account{user, std::move(receives), std::move(pubs)};
}
return false;
} // end scan_loop
namespace
{
/*!
Launches `thread_count` threads to run `scan_loop`, and then polls for
active account changes in background
*/
void check_loop(db::storage disk, rpc::context& ctx, std::size_t thread_count, std::vector<lws::account> users, std::vector<db::account_id> active, const options opts)
void check_loop(scanner_sync& self, db::storage disk, rpc::context& ctx, const std::size_t thread_count, const std::string& lws_server_addr, std::string lws_server_pass, std::vector<lws::account> users, std::vector<db::account_id> active, const scanner_options& opts)
{
assert(0 < thread_count);
assert(0 < users.size());
assert(users.size() == active.size());
assert(thread_count || !lws_server_addr.empty());
assert(!thread_count || !users.empty());
thread_sync self{};
std::vector<boost::thread> threads{};
threads.reserve(thread_count);
std::vector<std::shared_ptr<rpc::scanner::queue>> queues;
queues.resize(thread_count);
struct join_
{
thread_sync& self;
std::vector<boost::thread>& threads;
scanner_sync& self;
rpc::context& ctx;
std::vector<std::shared_ptr<rpc::scanner::queue>>& queues;
std::vector<boost::thread>& threads;
~join_() noexcept
{
self.update = true;
ctx.raise_abort_scan();
self.stop();
if (self.has_shutdown())
ctx.raise_abort_process();
else
ctx.raise_abort_scan();
for (const auto& queue : queues)
{
if (queue)
queue->stop();
}
for (auto& thread : threads)
thread.join();
}
} join{self, threads, ctx};
} join{self, ctx, queues, threads};
/*
The algorithm here is extremely basic. Users are divided evenly amongst
the configurable thread count, and grouped by scan height. If an old
account appears, some accounts (grouped on that thread) will be delayed
in processing waiting for that account to catch up. Its not the greatest,
but this "will have to do" for the first cut.
Its not expected that many people will be running
"enterprise level" of nodes where accounts are constantly added.
but this "will have to do" - but we're getting closer to fixing that
too.
Another "issue" is that each thread works independently instead of more
cooperatively for scanning. This requires a bit more synchronization, so
was left for later. Its likely worth doing to reduce the number of
transfers from the daemon, and the bottleneck on the writes into LMDB.
If the active user list changes, all threads are stopped/joined, and
everything is re-started.
*/
self.stop_ = false;
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
threads.reserve(thread_count);
std::sort(users.begin(), users.end(), by_height{});
// enable the new bind point before registering pull accounts
lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push());
MINFO("Starting scan loops on " << thread_count << " thread(s) with " << users.size() << " account(s)");
MINFO("Starting scan loops on " << std::min(thread_count, users.size()) << " thread(s) with " << users.size() << " account(s)");
bool leader_thread = true;
bool remaining_threads = true;
while (!users.empty() && --thread_count)
for (std::size_t i = 0; i < queues.size(); ++i)
{
const std::size_t per_thread = std::max(std::size_t(1), users.size() / (thread_count + 1));
const std::size_t count = std::min(per_thread, users.size());
queues[i] = std::make_shared<rpc::scanner::queue>();
// this can create threads with no active accounts, they just wait
const std::size_t count = users.size() / (queues.size() - i);
std::vector<lws::account> thread_users{
std::make_move_iterator(users.end() - count), std::make_move_iterator(users.end())
};
users.erase(users.end() - count, users.end());
rpc::client client = MONERO_UNWRAP(ctx.connect());
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(thread_users), opts
MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts
);
threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread));
leader_thread = false;
threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0));
}
if (!users.empty())
{
rpc::client client = MONERO_UNWRAP(ctx.connect());
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
users.clear();
users.shrink_to_fit();
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(users), opts
{
auto server = std::make_shared<rpc::scanner::server>(
self.io_,
disk.clone(),
MONERO_UNWRAP(ctx.connect()),
queues,
std::move(active),
opts.webhook_verify
);
threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread));
remaining_threads = false;
rpc::scanner::server::start_user_checking(server);
if (!lws_server_addr.empty())
rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass));
}
auto last_check = std::chrono::steady_clock::now();
lmdb::suspended_txn read_txn{};
db::cursor::accounts accounts_cur{};
boost::unique_lock<boost::mutex> lock{self.sync};
while (scanner::is_running())
{
update_rates(ctx);
for (;;)
{
//! \TODO use signalfd + ZMQ? Windows is the difficult case...
self.user_poll.wait_for(lock, boost::chrono::seconds{1});
if (self.update || !scanner::is_running())
return;
auto this_check = std::chrono::steady_clock::now();
if (account_poll_interval <= (this_check - last_check))
{
last_check = this_check;
break;
}
}
auto reader = disk.start_read(std::move(read_txn));
if (!reader)
{
if (reader.matches(std::errc::no_lock_available))
{
MWARNING("Failed to open DB read handle, retrying later");
continue;
}
MONERO_THROW(reader.error(), "Failed to open DB read handle");
}
auto current_users = MONERO_UNWRAP(
reader->get_accounts(db::account_status::active, std::move(accounts_cur))
);
if (current_users.count() < active.size())
{
// cannot remove accounts via ZMQ (yet)
MINFO("Decrease in active user accounts detected, stopping scan threads...");
return;
}
std::vector<db::account_id> active_copy = active;
std::vector<lws::account> new_;
for (auto user = current_users.make_iterator(); !user.is_end(); ++user)
{
const db::account_id user_id = user.get_value<MONERO_FIELD(db::account, id)>();
const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id);
if (loc == active_copy.end() || *loc != user_id)
{
new_.emplace_back(prep_account(*reader, user.get_value<db::account>()));
active.insert(
std::lower_bound(active.begin(), active.end(), user_id), user_id
);
}
else
active_copy.erase(loc);
}
if (!active_copy.empty())
{
MINFO("Change in active user accounts detected, stopping scan threads...");
return;
}
if (!new_.empty())
{
if (remaining_threads)
{
MINFO("Received new account(s), starting more thread(s)");
return;
}
const auto pushed = pusher.push(epee::to_span(new_), std::chrono::seconds{1});
if (!pushed)
{
MERROR("Failed to push new account to workers: " << pushed.error().message());
return; // pull in new accounts by resetting state
}
else
MINFO("Pushed " << new_.size() << " new accounts to worker thread(s)");
}
read_txn = reader->finish_read();
accounts_cur = current_users.give_cursor();
} // while scanning
// Blocks until sigint, local scanner issue, or exception
self.io_.run();
}
template<typename R, typename Q>
expect<typename R::response> fetch_chain(rpc::client& client, const char* endpoint, const Q& req)
expect<typename R::response> fetch_chain(const scanner_sync& self, rpc::client& client, const char* endpoint, const Q& req)
{
expect<void> sent{lws::error::daemon_timeout};
@ -1152,7 +1063,7 @@ namespace lws
while (!(sent = client.send(std::move(msg), std::chrono::seconds{1})))
{
if (!scanner::is_running())
if (self.has_shutdown())
return {lws::error::signal_abort_process};
if (sync_rpc_timeout <= (std::chrono::steady_clock::now() - start))
@ -1167,7 +1078,7 @@ namespace lws
while (!(resp = client.get_message(std::chrono::seconds{1})))
{
if (!scanner::is_running())
if (self.has_shutdown())
return {lws::error::signal_abort_process};
if (sync_rpc_timeout <= (std::chrono::steady_clock::now() - start))
@ -1180,7 +1091,7 @@ namespace lws
}
// does not validate blockchain hashes
expect<rpc::client> sync_quick(db::storage disk, rpc::client client)
expect<rpc::client> sync_quick(const scanner_sync& self, db::storage disk, rpc::client client)
{
MINFO("Starting blockchain sync with daemon");
@ -1193,7 +1104,7 @@ namespace lws
if (req.known_hashes.empty())
return {lws::error::bad_blockchain};
auto resp = fetch_chain<rpc::get_hashes_fast>(client, "get_hashes_fast", req);
auto resp = fetch_chain<rpc::get_hashes_fast>(self, client, "get_hashes_fast", req);
if (!resp)
return resp.error();
@ -1219,7 +1130,7 @@ namespace lws
}
// validates blockchain hashes
expect<rpc::client> sync_full(db::storage disk, rpc::client client)
expect<rpc::client> sync_full(const scanner_sync& self, db::storage disk, rpc::client client)
{
MINFO("Starting blockchain sync with daemon");
@ -1235,7 +1146,7 @@ namespace lws
if (req.block_ids.empty())
return {lws::error::bad_blockchain};
auto resp = fetch_chain<rpc::get_blocks_fast>(client, "get_blocks_fast", req);
auto resp = fetch_chain<rpc::get_blocks_fast>(self, client, "get_blocks_fast", req);
if (!resp)
return resp.error();
@ -1297,7 +1208,7 @@ namespace lws
pow_window.median_timestamps.erase(pow_window.median_timestamps.begin());
// longhash takes a while, check is_running
if (!scanner::is_running())
if (self.has_shutdown())
return {error::signal_abort_process};
diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(height));
@ -1342,37 +1253,119 @@ namespace lws
}
} // anonymous
expect<rpc::client> scanner::sync(db::storage disk, rpc::client client, const bool untrusted_daemon)
bool user_data::store(db::storage& disk, rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
{
if (untrusted_daemon)
return sync_full(std::move(disk), std::move(client));
return sync_quick(std::move(disk), std::move(client));
if (users.empty())
return true;
if (!std::is_sorted(users.begin(), users.end(), by_height{}))
throw std::logic_error{"users must be sorted!"};
auto updated = disk.update(users[0].scan_height(), chain, users, pow);
if (!updated)
{
if (updated == lws::error::blockchain_reorg)
{
MINFO("Blockchain reorg detected, resetting state");
return false;
}
MONERO_THROW(updated.error(), "Failed to update accounts on disk");
}
MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)");
send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify);
send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify);
if (updated->accounts_updated != users.size())
{
MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting");
return false;
}
// Publish when all scan threads have past this block
// only address is printed from users, so height doesn't need updating
if (!chain.empty() && client.has_publish())
rpc::publish_scanned(client, chain[chain.size() - 1], epee::to_span(users));
return true;
}
void scanner::run(db::storage disk, rpc::context ctx, std::size_t thread_count, const epee::net_utils::ssl_verification_t webhook_verify, const bool enable_subaddresses, const bool untrusted_daemon)
bool user_data::operator()(rpc::client& client, const epee::span<const crypto::hash> chain, const epee::span<const lws::account> users, const epee::span<const db::pow_sync> pow, const scanner_options& opts)
{
thread_count = std::max(std::size_t(1), thread_count);
return store(disk_, client, chain, users, pow, opts);
}
expect<rpc::client> scanner::sync(rpc::client client, const bool untrusted_daemon)
{
if (has_shutdown())
MONERO_THROW(common_error::kInvalidArgument, "this has shutdown");
if (untrusted_daemon)
return sync_full(sync_, disk_.clone(), std::move(client));
return sync_quick(sync_, disk_.clone(), std::move(client));
}
void scanner::run(rpc::context ctx, std::size_t thread_count, const std::string& lws_server_addr, std::string lws_server_pass, const scanner_options& opts)
{
if (has_shutdown())
MONERO_THROW(common_error::kInvalidArgument, "this has shutdown");
if (!lws_server_addr.empty() && (opts.enable_subaddresses || opts.untrusted_daemon))
MONERO_THROW(error::configuration, "Cannot use remote scanner with subaddresses or untrusted daemon");
if (lws_server_addr.empty())
thread_count = std::max(std::size_t(1), thread_count);
/*! \NOTE Be careful about references and lifetimes of the callbacks. The
ones below are safe because no `io_service::run()` call is after the
destruction of the references.
\NOTE That `ctx` will need a strand or lock if multiple
`io_service::run()` calls are used. */
boost::asio::steady_timer rate_timer{sync_.io_};
class rate_updater
{
boost::asio::steady_timer& rate_timer_;
rpc::context& ctx_;
const std::chrono::minutes rate_interval_;
public:
explicit rate_updater(boost::asio::steady_timer& rate_timer, rpc::context& ctx)
: rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval())
{}
void operator()(const boost::system::error_code& error = {}) const
{
update_rates(ctx_);
rate_timer_.expires_from_now(rate_interval_);
rate_timer_.async_wait(*this);
}
std::chrono::minutes rate_interval() const noexcept { return rate_interval_; }
};
{
rate_updater updater{rate_timer, ctx};
if (std::chrono::minutes{0} < updater.rate_interval())
updater();
}
rpc::client client{};
for (;;)
{
const auto last = std::chrono::steady_clock::now();
update_rates(ctx);
std::vector<db::account_id> active;
std::vector<lws::account> users;
if (thread_count)
{
MINFO("Retrieving current active account list");
auto reader = MONERO_UNWRAP(disk.start_read());
auto reader = MONERO_UNWRAP(disk_.start_read());
auto accounts = MONERO_UNWRAP(
reader.get_accounts(db::account_status::active)
);
for (db::account user : accounts.make_range())
{
users.emplace_back(prep_account(reader, user));
users.emplace_back(MONERO_UNWRAP(reader.get_full_account(user)));
active.insert(
std::lower_bound(active.begin(), active.end(), user.id), user.id
);
@ -1381,21 +1374,27 @@ namespace lws
reader.finish_read();
} // cleanup DB reader
if (users.empty())
if (thread_count && users.empty())
{
MINFO("No active accounts");
checked_wait(account_poll_interval - (std::chrono::steady_clock::now() - last));
boost::asio::steady_timer poll{sync_.io_};
poll.expires_from_now(rpc::scanner::account_poll_interval);
poll.async_wait([] (boost::system::error_code) {});
sync_.io_.run_one();
}
else
check_loop(disk.clone(), ctx, thread_count, std::move(users), std::move(active), options{webhook_verify, enable_subaddresses, untrusted_daemon});
check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts);
if (!scanner::is_running())
sync_.io_.reset();
if (has_shutdown())
return;
if (!client)
client = MONERO_UNWRAP(ctx.connect());
expect<rpc::client> synced = sync(disk.clone(), std::move(client), untrusted_daemon);
expect<rpc::client> synced = sync(std::move(client), opts.untrusted_daemon);
if (!synced)
{
if (!synced.matches(std::errc::timed_out))

View file

@ -27,37 +27,109 @@
#pragma once
#include <atomic>
#include <boost/asio/io_service.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/optional/optional.hpp>
#include <cstdint>
#include <string>
#include "db/fwd.h"
#include "db/storage.h"
#include "net/net_ssl.h" // monero/contrib/epee/include
#include "rpc/client.h"
#include "rpc/scanner/fwd.h"
#include "span.h" // monero/contrib/epee/include
namespace lws
{
struct scanner_options
{
epee::net_utils::ssl_verification_t webhook_verify;
bool enable_subaddresses;
bool untrusted_daemon;
};
//! Used in `scan_loop` by server
class user_data
{
db::storage disk_;
public:
user_data(db::storage disk)
: disk_(std::move(disk))
{}
user_data(user_data const& rhs)
: disk_(rhs.disk_.clone())
{}
user_data(user_data&& rhs)
: disk_(std::move(rhs.disk_))
{}
/*! Store updated accounts locally (`disk`), and send ZMQ/RMQ/webhook
events. `users` must be sorted by height (lowest first). */
static bool store(db::storage& disk, rpc::client& zclient, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const db::pow_sync> pow, const scanner_options&);
//! `users` must be sorted by height (lowest first)
bool operator()(rpc::client& zclient, epee::span<const crypto::hash> chain, epee::span<const lws::account> users, epee::span<const db::pow_sync> pow, const scanner_options&);
};
struct scanner_sync
{
boost::asio::io_service io_;
std::atomic<bool> stop_; //!< Stop scanning but do not shutdown
std::atomic<bool> shutdown_; //!< Exit scanner::run
explicit scanner_sync()
: io_(), stop_(false), shutdown_(false)
{}
bool is_running() const noexcept { return !stop_ && !shutdown_; }
bool has_shutdown() const noexcept { return shutdown_; }
void stop() { stop_ = true; io_.stop(); }
void shutdown() { shutdown_ = true; stop(); }
};
//! Scans all active `db::account`s. Detects if another process changes active list.
class scanner
{
static std::atomic<bool> running;
scanner() = delete;
db::storage disk_;
scanner_sync sync_;
boost::asio::signal_set signals_; //!< Detect SIGINT requested shutdown
public:
//! Register `SIGINT` handler and keep a copy of `disk`
explicit scanner(db::storage disk);
~scanner();
//! Callback for storing user account (typically local lmdb, but perhaps remote rpc)
using store_func = std::function<bool(rpc::client&, epee::span<const crypto::hash>, epee::span<const lws::account>, epee::span<const db::pow_sync>, const scanner_options&)>;
/*! Run _just_ the inner scanner loop while `self.is_running() == true`.
*
\throw std::exception on hard errors (shutdown) conditions
\return True iff `queue` indicates thread now has zero accounts. False
indicates a soft, typically recoverable error. */
static bool loop(const std::atomic<bool>& stop, store_func store, std::optional<db::storage> disk, rpc::client client, std::vector<lws::account> users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread);
//! Use `client` to sync blockchain data, and \return client if successful.
static expect<rpc::client> sync(db::storage disk, rpc::client client, const bool untrusted_daemon = false);
expect<rpc::client> sync(rpc::client client, const bool untrusted_daemon = false);
//! Poll daemon until `stop()` is called, using `thread_count` threads.
static void run(db::storage disk, rpc::context ctx, std::size_t thread_count, epee::net_utils::ssl_verification_t webhook_verify, bool enable_subaddresses, bool untrusted_daemon = false);
//! Poll daemon until `shutdown()` is called, using `thread_count` threads.
void run(rpc::context ctx, std::size_t thread_count, const std::string& server_addr, std::string server_pass, const scanner_options&);
//! \return True if `stop()` has never been called.
static bool is_running() noexcept { return running; }
//! \return True iff `stop()` and `shutdown()` has never been called
bool is_running() const noexcept { return sync_.is_running(); }
//! Stops all scanner instances globally.
static void stop() noexcept { running = false; }
//! \return True if `shutdown()` has been been called.
bool has_shutdown() const noexcept { return sync_.has_shutdown(); }
//! For testing, \post is_running() == true
static void reset() noexcept { running = true; }
//! Stop scan threads, but do not shutdown scanner.
void stop() { sync_.stop(); }
// Stop scan threads AND shutdown scanner.
void shutdown() { sync_.shutdown(); }
};
} // lws

View file

@ -66,6 +66,8 @@ namespace
#endif
const command_line::arg_descriptor<std::vector<std::string>> rest_servers;
const command_line::arg_descriptor<std::vector<std::string>> admin_rest_servers;
const command_line::arg_descriptor<std::string> lws_server_addr;
const command_line::arg_descriptor<std::string> lws_server_pass;
const command_line::arg_descriptor<std::string> rest_ssl_key;
const command_line::arg_descriptor<std::string> rest_ssl_cert;
const command_line::arg_descriptor<std::size_t> rest_threads;
@ -111,6 +113,8 @@ namespace
#endif
, rest_servers{"rest-server", "[(https|http)://<address>:]<port>[/<prefix>] for incoming connections, multiple declarations allowed"}
, admin_rest_servers{"admin-rest-server", "[(https|http])://<address>:]<port>[/<prefix>] for incoming admin connections, multiple declarations allowed"}
, lws_server_addr{"lws-server-addr", "[<ip>:]<port> to listen for lws-clients", ""}
, lws_server_pass{"lws-server-pass", "Password for lws-clients connecting to server", ""}
, rest_ssl_key{"rest-ssl-key", "<path> to PEM formatted SSL key for https REST server", ""}
, rest_ssl_cert{"rest-ssl-certificate", "<path> to PEM formatted SSL certificate (chains supported) for https REST server", ""}
, rest_threads{"rest-threads", "Number of threads to process REST connections", 1}
@ -135,6 +139,8 @@ namespace
lws::options::prepare(description);
command_line::add_arg(description, daemon_rpc);
command_line::add_arg(description, daemon_sub);
command_line::add_arg(description, lws_server_addr);
command_line::add_arg(description, lws_server_pass);
command_line::add_arg(description, zmq_pub);
#ifdef MLWS_RMQ_ENABLED
command_line::add_arg(description, rmq_address);
@ -167,6 +173,8 @@ namespace
std::string db_path;
std::vector<std::string> rest_servers;
std::vector<std::string> admin_rest_servers;
std::string lws_server_addr;
std::string lws_server_pass;
lws::rest_server::configuration rest_config;
std::string daemon_rpc;
std::string daemon_sub;
@ -236,11 +244,13 @@ namespace
command_line::get_arg(args, opts.db_path),
command_line::get_arg(args, opts.rest_servers),
command_line::get_arg(args, opts.admin_rest_servers),
command_line::get_arg(args, opts.lws_server_addr),
command_line::get_arg(args, opts.lws_server_pass),
lws::rest_server::configuration{
{command_line::get_arg(args, opts.rest_ssl_key), command_line::get_arg(args, opts.rest_ssl_cert)},
command_line::get_arg(args, opts.access_controls),
command_line::get_arg(args, opts.rest_threads),
command_line::get_arg(args, opts.max_subaddresses),
command_line::get_arg(args, opts.max_subaddresses),
webhook_verify,
command_line::get_arg(args, opts.external_bind),
command_line::get_arg(args, opts.disable_admin_auth),
@ -266,8 +276,12 @@ namespace
command_line::get_arg(args, opts.untrusted_daemon)
};
if (!prog.lws_server_addr.empty() && (prog.rest_config.max_subaddresses || prog.untrusted_daemon))
MONERO_THROW(lws::error::configuration, "Remote scanning cannot be used with subaddresses or untrusted daemon");
prog.rest_config.threads = std::max(std::size_t(1), prog.rest_config.threads);
prog.scan_threads = std::max(std::size_t(1), prog.scan_threads);
if (prog.lws_server_addr.empty())
prog.scan_threads = std::max(std::size_t(1), prog.scan_threads);
if (command_line::is_arg_defaulted(args, opts.daemon_rpc))
prog.daemon_rpc = options::get_default_zmq();
@ -277,19 +291,20 @@ namespace
void run(program prog)
{
std::signal(SIGINT, [] (int) { lws::scanner::stop(); });
boost::filesystem::create_directories(prog.db_path);
auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max);
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval, prog.untrusted_daemon);
//! SIGINT handle registered by `scanner` constructor
lws::scanner scanner{disk.clone()};
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
auto client = lws::scanner::sync(disk.clone(), ctx.connect().value(), prog.untrusted_daemon).value();
auto client = scanner.sync(ctx.connect().value(), prog.untrusted_daemon).value();
const auto enable_subaddresses = bool(prog.rest_config.max_subaddresses);
const auto webhook_verify = prog.rest_config.webhook_verify;
lws::rest_server server{
epee::to_span(prog.rest_servers), prog.admin_rest_servers, disk.clone(), std::move(client), std::move(prog.rest_config)
epee::to_span(prog.rest_servers), prog.admin_rest_servers, std::move(disk), std::move(client), std::move(prog.rest_config)
};
for (const std::string& address : prog.rest_servers)
MINFO("Listening for REST clients at " << address);
@ -297,7 +312,13 @@ namespace
MINFO("Listening for REST admin clients at " << address);
// blocks until SIGINT
lws::scanner::run(std::move(disk), std::move(ctx), prog.scan_threads, webhook_verify, enable_subaddresses, prog.untrusted_daemon);
scanner.run(
std::move(ctx),
prog.scan_threads,
std::move(prog.lws_server_addr),
std::move(prog.lws_server_pass),
lws::scanner_options{webhook_verify, enable_subaddresses, prog.untrusted_daemon}
);
}
} // anonymous

View file

@ -223,17 +223,23 @@ namespace
}
return out;
}
void scanner_thread(lws::scanner& scanner, void* ctx, const std::vector<epee::byte_slice>& reply)
{
struct stop_
{
lws::scanner& scanner;
~stop_() { scanner.shutdown(); };
} stop{scanner};
lws_test::rpc_thread(ctx, reply);
}
} // anonymous
namespace lws_test
{
void rpc_thread(void* ctx, const std::vector<epee::byte_slice>& reply)
{
struct stop_
{
~stop_() noexcept { lws::scanner::stop(); };
} stop{};
try
{
net::zmq::socket server{};
@ -321,7 +327,6 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
SETUP("lws::rpc::context, ZMQ_REP Server, and lws::db::storage")
{
lws::scanner::reset();
auto rpc =
lws::rpc::context::make(lws_test::rpc_rendevous, {}, {}, {}, std::chrono::minutes{0}, false);
@ -345,9 +350,11 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
std::vector<epee::byte_slice> messages{};
messages.push_back(to_json_rpc(1));
boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages));
lws::scanner scanner{db.clone()};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(!lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect())));
EXPECT(!scanner.sync(MONERO_UNWRAP(rpc.connect())));
lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, hashes);
}
@ -377,9 +384,10 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, {hashes.data(), 1});
{
boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages));
lws::scanner scanner{db.clone()};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect())));
EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect())));
lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, epee::to_span(hashes));
}
@ -400,9 +408,10 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
message.hashes.resize(1);
messages.push_back(daemon_response(message));
boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages));
lws::scanner scanner{db.clone()};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect())));
EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect())));
lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, epee::to_span(hashes));
}
}
@ -507,15 +516,14 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
messages.push_back(daemon_response(hmessage));
{
boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages));
lws::scanner scanner{db.clone()};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
EXPECT(lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect())));
EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect())));
lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, epee::to_span(hashes));
}
}
lws::scanner::reset();
EXPECT(db.add_account(account, keys.m_view_secret_key));
EXPECT(db.add_account(account2, keys2.m_view_secret_key));
@ -526,9 +534,13 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run")
bmessage.output_indices.resize(1);
messages.push_back(daemon_response(bmessage));
{
boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages));
static constexpr const lws::scanner_options opts{
epee::net_utils::ssl_verification_t::none, true, false
};
lws::scanner scanner{db.clone()};
boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages));
const join on_scope_exit{server_thread};
lws::scanner::run(db.clone(), std::move(rpc), 1, epee::net_utils::ssl_verification_t::none, true);
scanner.run(std::move(rpc), 1, {}, {}, opts);
}
hashes.push_back(cryptonote::get_block_hash(bmessage.blocks.back().block));

View file

@ -27,6 +27,7 @@
#include <vector>
#include "byte_slice.h" // monero/contrib/epee/include
#include "fwd.h"
namespace lws_test
{