diff --git a/README.md b/README.md index d60d60c..4d54241 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ API, and the server will scan for incoming Monero blockchain transactions. Differences from [OpenMonero](https://github.com/moneroexamples/openmonero): - LMDB instead of MySQL - View keys stored in database - scanning occurs continuously in background - - Uses ZeroMQ interface to `monerod` ("push" support coming soon) + - Uses ZeroMQ interface to `monerod` with chain subscription ("push") support - Uses amd64 ASM acceleration from Monero project, if available diff --git a/src/db/storage.cpp b/src/db/storage.cpp index 4c6bb5f..7e90151 100644 --- a/src/db/storage.cpp +++ b/src/db/storage.cpp @@ -288,7 +288,7 @@ namespace db } //! \return Current block hash at `id` using `cur`. - expect get_block_hash(MDB_cursor& cur, block_id id) noexcept + expect do_get_block_hash(MDB_cursor& cur, block_id id) noexcept { MDB_val key = lmdb::to_val(blocks_version); MDB_val value = lmdb::to_val(id); @@ -340,7 +340,7 @@ namespace db /// /// TODO Trim blockchain after a checkpoint has been reached /// - const crypto::hash genesis = MONERO_UNWRAP(get_block_hash(*cur, block_id(0))); + const crypto::hash genesis = MONERO_UNWRAP(do_get_block_hash(*cur, block_id(0))); if (genesis != points.begin()->second) { MONERO_THROW( @@ -383,7 +383,7 @@ namespace db const auto add_block = [&cur, &out] (std::uint64_t id) -> expect { - expect next = get_block_hash(cur, block_id(id)); + expect next = do_get_block_hash(cur, block_id(id)); if (!next) return next.error(); out.push_back(block_info{block_id(id), std::move(*next)}); @@ -493,6 +493,17 @@ namespace db return blocks.get_value(value); } + expect storage_reader::get_block_hash(const block_id height) noexcept + { + MONERO_PRECOND(txn != nullptr); + assert(db != nullptr); + + MONERO_CHECK(check_cursor(*txn, db->tables.blocks, curs.blocks_cur)); + assert(curs.blocks_cur != nullptr); + + return do_get_block_hash(*curs.blocks_cur, height); + } + expect> storage_reader::get_chain_sync() { MONERO_PRECOND(txn != nullptr); @@ -526,8 +537,30 @@ namespace db return accounts.get_value_stream(status, std::move(cur)); } + expect storage_reader::get_account(const account_status status, const account_id id) noexcept + { + MONERO_PRECOND(txn != nullptr); + assert(db != nullptr); + + cursor::accounts cur; + MONERO_CHECK(check_cursor(*txn, db->tables.accounts, cur)); + assert(cur != nullptr); + + MDB_val key = lmdb::to_val(status); + MDB_val value = lmdb::to_val(id); + const int err = mdb_cursor_get(cur.get(), &key, &value, MDB_GET_BOTH); + if (err) + { + if (err == MDB_NOTFOUND) + return {lws::error::account_not_found}; + return {lmdb::error(err)}; + } + + return accounts.get_value(value); + } + expect> - storage_reader::get_account(account_address const& address, cursor::accounts& cur) noexcept + storage_reader::get_account(account_address const& address) noexcept { MONERO_PRECOND(txn != nullptr); assert(db != nullptr); @@ -556,14 +589,7 @@ namespace db if (!lookup) return lookup.error(); - MONERO_CHECK(check_cursor(*txn, db->tables.accounts, cur)); - assert(cur != nullptr); - - key = lmdb::to_val(lookup->status); - value = lmdb::to_val(lookup->id); - MONERO_LMDB_CHECK(mdb_cursor_get(cur.get(), &key, &value, MDB_GET_BOTH)); - - const expect user = accounts.get_value(value); + const expect user = get_account(lookup->status, lookup->id); if (!user) return user.error(); return {{lookup->status, *user}}; @@ -1002,7 +1028,7 @@ namespace db cursor::blocks blocks_cur; MONERO_CHECK(check_cursor(txn, this->db->tables.blocks, blocks_cur)); - expect hash = get_block_hash(*blocks_cur, height); + expect hash = do_get_block_hash(*blocks_cur, height); if (!hash) return hash.error(); @@ -1697,7 +1723,7 @@ namespace db std::min(lmdb::to_native(last_block->id), last_update); const expect hash_check = - get_block_hash(*blocks_cur, block_id(last_same)); + do_get_block_hash(*blocks_cur, block_id(last_same)); if (!hash_check) return hash_check.error(); diff --git a/src/db/storage.h b/src/db/storage.h index 7df4362..522bab0 100644 --- a/src/db/storage.h +++ b/src/db/storage.h @@ -89,6 +89,9 @@ namespace db //! \return Last known block. expect get_last_block() noexcept; + //! \return "Our" block hash at `height`. + expect get_block_hash(const block_id height) noexcept; + //! \return List for `GetHashesFast` to sync blockchain with daemon. expect> get_chain_sync(); @@ -100,16 +103,12 @@ namespace db expect> get_accounts(account_status status, cursor::accounts cur = nullptr) noexcept; - //! \return Info related to `address` or `lmdb::error(MDB_NOT_FOUND)`. - expect> - get_account(account_address const& address, cursor::accounts& cur) noexcept; + //! \return Info for account `id` iff it has `status`. + expect get_account(const account_status status, const account_id id) noexcept; + //! \return Info related to `address`. expect> - get_account(account_address const& address) noexcept - { - cursor::accounts cur; - return get_account(address, cur); - } + get_account(account_address const& address) noexcept; //! \return All outputs received by `id`. expect> diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 3646686..934f4be 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -26,8 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -set(monero-lws-rpc_sources client.cpp daemon_zmq.cpp light_wallet.cpp rates.cpp) -set(monero-lws-rpc_headers client.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h) +set(monero-lws-rpc_sources client.cpp daemon_pub.cpp daemon_zmq.cpp light_wallet.cpp rates.cpp) +set(monero-lws-rpc_headers client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h) add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers}) target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json) diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 6a4a26b..13a399d 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -28,12 +28,14 @@ #include "client.h" #include +#include #include #include #include "common/error.h" // monero/contrib/epee/include #include "error.h" -#include "net/http_client.h" // monero/contrib/epee/include/net +#include "misc_log_ex.h" // monero/contrib/epee/include +#include "net/http_client.h" // monero/contrib/epee/include #include "net/zmq.h" // monero/src namespace lws @@ -47,8 +49,11 @@ namespace rpc constexpr const char signal_endpoint[] = "inproc://signal"; constexpr const char abort_scan_signal[] = "SCAN"; constexpr const char abort_process_signal[] = "PROCESS"; + constexpr const char minimal_chain_topic[] = "json-minimal-chain_main"; constexpr const int daemon_zmq_linger = 0; - + constexpr const std::chrono::seconds chain_poll_timeout{20}; + constexpr const std::chrono::minutes chain_sub_timeout{2}; + struct terminate { void operator()(void* ptr) const noexcept @@ -112,14 +117,14 @@ namespace rpc template expect do_signal(void* signal_pub, const char (&signal)[N]) noexcept { - MONERO_ZMQ_CHECK(zmq_send(signal_pub, signal, sizeof(signal), 0)); + MONERO_ZMQ_CHECK(zmq_send(signal_pub, signal, sizeof(signal) - 1, 0)); return success(); } template expect do_subscribe(void* signal_sub, const char (&signal)[N]) noexcept { - MONERO_ZMQ_CHECK(zmq_setsockopt(signal_sub, ZMQ_SUBSCRIBE, signal, sizeof(signal))); + MONERO_ZMQ_CHECK(zmq_setsockopt(signal_sub, ZMQ_SUBSCRIBE, signal, sizeof(signal) - 1)); return success(); } } // anonymous @@ -128,10 +133,11 @@ namespace rpc { struct context { - explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::chrono::minutes interval) + explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval) : comm(std::move(comm)) , signal_pub(std::move(signal_pub)) , daemon_addr(std::move(daemon_addr)) + , sub_addr(std::move(sub_addr)) , rates_conn() , cache_time() , cache_interval(interval) @@ -144,7 +150,8 @@ namespace rpc zcontext comm; socket signal_pub; - std::string daemon_addr; + const std::string daemon_addr; + const std::string sub_addr; http::http_simple_client rates_conn; std::chrono::steady_clock::time_point cache_time; const std::chrono::minutes cache_interval; @@ -176,14 +183,26 @@ namespace rpc { MONERO_PRECOND(ctx != nullptr); - const int linger = daemon_zmq_linger; + int option = daemon_zmq_linger; client out{std::move(ctx)}; out.daemon.reset(zmq_socket(out.ctx->comm.get(), ZMQ_REQ)); if (out.daemon.get() == nullptr) return net::zmq::get_error_code(); MONERO_ZMQ_CHECK(zmq_connect(out.daemon.get(), out.ctx->daemon_addr.c_str())); - MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon.get(), ZMQ_LINGER, &linger, sizeof(linger))); + MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon.get(), ZMQ_LINGER, &option, sizeof(option))); + + if (!out.ctx->sub_addr.empty()) + { + out.daemon_sub.reset(zmq_socket(out.ctx->comm.get(), ZMQ_SUB)); + if (out.daemon_sub.get() == nullptr) + return net::zmq::get_error_code(); + + option = 1; // keep only last pub message from daemon + MONERO_ZMQ_CHECK(zmq_connect(out.daemon_sub.get(), out.ctx->sub_addr.c_str())); + MONERO_ZMQ_CHECK(zmq_setsockopt(out.daemon_sub.get(), ZMQ_CONFLATE, &option, sizeof(option))); + MONERO_CHECK(do_subscribe(out.daemon_sub.get(), minimal_chain_topic)); + } out.signal_sub.reset(zmq_socket(out.ctx->comm.get(), ZMQ_SUB)); if (out.signal_sub.get() == nullptr) @@ -204,12 +223,35 @@ namespace rpc return do_subscribe(signal_sub.get(), abort_scan_signal); } - expect client::wait(std::chrono::seconds timeout) noexcept + expect client::wait_for_block() { MONERO_PRECOND(ctx != nullptr); assert(daemon != nullptr); assert(signal_sub != nullptr); - return do_wait(daemon.get(), signal_sub.get(), 0, timeout); + + if (daemon_sub == nullptr) + { + MONERO_CHECK(do_wait(daemon.get(), signal_sub.get(), 0, chain_poll_timeout)); + return {lws::error::daemon_timeout}; + } + + { + const expect ready = do_wait(daemon_sub.get(), signal_sub.get(), ZMQ_POLLIN, chain_sub_timeout); + if (!ready) + { + if (ready == lws::error::daemon_timeout) + MWARNING("ZeroMQ Pub/Sub chain timeout, check connection settings"); + return ready.error(); + } + } + expect pub = net::zmq::receive(daemon_sub.get(), ZMQ_DONTWAIT); + if (!pub) + return pub.error(); + + if (!boost::string_ref{*pub}.starts_with(minimal_chain_topic)) + return {lws::error::bad_daemon_response}; + pub->erase(0, sizeof(minimal_chain_topic)); + return minimal_chain_pub::from_json(std::move(*pub)); } expect client::send(epee::byte_slice message, std::chrono::seconds timeout) noexcept @@ -243,7 +285,7 @@ namespace rpc return ctx->cached; } - context context::make(std::string daemon_addr, std::chrono::minutes rates_interval) + context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval) { zcontext comm{zmq_init(1)}; if (comm == nullptr) @@ -257,7 +299,7 @@ namespace rpc return context{ std::make_shared( - std::move(comm), std::move(pub), std::move(daemon_addr), rates_interval + std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval ) }; } diff --git a/src/rpc/client.h b/src/rpc/client.h index 92178ae..d403641 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -35,8 +35,9 @@ #include "byte_slice.h" // monero/contrib/epee/include #include "common/expect.h" // monero/src -#include "rates.h" #include "rpc/message.h" // monero/src +#include "rpc/daemon_pub.h" +#include "rpc/rates.h" namespace lws { @@ -62,16 +63,17 @@ namespace rpc { std::shared_ptr ctx; detail::socket daemon; + detail::socket daemon_sub; detail::socket signal_sub; - explicit client(std::shared_ptr ctx) - : ctx(std::move(ctx)), daemon(), signal_sub() + explicit client(std::shared_ptr ctx) noexcept + : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub() {} public: //! A client with no connection (all send/receive functions fail). explicit client() noexcept - : ctx(), daemon(), signal_sub() + : ctx(), daemon(), daemon_sub(), signal_sub() {} static expect make(std::shared_ptr ctx) noexcept; @@ -103,8 +105,8 @@ namespace rpc //! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`. expect watch_scan_signals() noexcept; - //! Block until `timeout` or until `context::stop()` is invoked. - expect wait(std::chrono::seconds timeout) noexcept; + //! Wait for new block announce or internal timeout. + expect wait_for_block(); //! \return A JSON message for RPC request `M`. template @@ -167,7 +169,7 @@ namespace rpc \param rates_interval Frequency to retrieve exchange rates. Set value to `<= 0` to disable exchange rate retrieval. */ - static context make(std::string daemon_addr, std::chrono::minutes rates_interval); + static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval); context(context&&) = default; context(context const&) = delete; diff --git a/src/rpc/daemon_pub.cpp b/src/rpc/daemon_pub.cpp new file mode 100644 index 0000000..dcee5ed --- /dev/null +++ b/src/rpc/daemon_pub.cpp @@ -0,0 +1,83 @@ +// Copyright (c) 2020, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "daemon_pub.h" + +#include "wire/crypto.h" +#include "wire/error.h" +#include "wire/field.h" +#include "wire/traits.h" +#include "wire/json/read.h" + +namespace +{ + struct dummy_chain_array + { + using value_type = crypto::hash; + + std::uint64_t count; + std::reference_wrapper id; + + void clear() noexcept {} + void reserve(std::size_t) noexcept {} + + crypto::hash& back() noexcept { return id; } + void emplace_back() { ++count; } + }; +} + +namespace wire +{ + template<> + struct is_array + : std::true_type + {}; +} + +namespace lws +{ +namespace rpc +{ + static void read_bytes(wire::json_reader& src, minimal_chain_pub& self) + { + dummy_chain_array chain{0, std::ref(self.top_block_id)}; + wire::object(src, + wire::field("first_height", std::ref(self.top_block_height)), + wire::field("ids", std::ref(chain)) + ); + + self.top_block_height += chain.count - 1; + if (chain.count == 0) + WIRE_DLOG_THROW(wire::error::schema::binary, "expected at least one block hash"); + } + + expect minimal_chain_pub::from_json(std::string&& source) + { + return wire::json::from_bytes(std::move(source)); + } +} +} diff --git a/src/rpc/daemon_pub.h b/src/rpc/daemon_pub.h new file mode 100644 index 0000000..60ed33d --- /dev/null +++ b/src/rpc/daemon_pub.h @@ -0,0 +1,50 @@ +// Copyright (c) 2020, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "common/expect.h" // monero/src +#include "crypto/hash.h" // monero/src +#include "wire/json/fwd.h" + +namespace lws +{ +namespace rpc +{ + //! Represents only the last block listed in "minimal-chain_main" pub. + struct minimal_chain_pub + { + std::uint64_t top_block_height; + crypto::hash top_block_id; + + static expect from_json(std::string&&); + }; +} +} diff --git a/src/scanner.cpp b/src/scanner.cpp index fc3294c..37c197b 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -74,7 +74,6 @@ namespace lws namespace { constexpr const std::chrono::seconds account_poll_interval{10}; - constexpr const std::chrono::seconds block_poll_interval{20}; constexpr const std::chrono::minutes block_rpc_timeout{2}; constexpr const std::chrono::seconds send_timeout{30}; constexpr const std::chrono::seconds sync_rpc_timeout{30}; @@ -113,6 +112,28 @@ namespace lws } } + bool is_new_block(db::storage& disk, const account& user, const rpc::minimal_chain_pub& chain) + { + if (user.scan_height() < db::block_id(chain.top_block_height)) + return true; + + auto reader = disk.start_read(); + if (!reader) + { + MWARNING("Failed to start DB read: " << reader.error()); + return true; + } + + // check possible chain rollback daemon side + const expect id = reader->get_block_hash(db::block_id(chain.top_block_height)); + if (!id || *id != chain.top_block_id) + return true; + + // check possible chain rollback from other thread + const expect user_db = reader->get_account(db::account_status::active, user.id()); + return !user_db || user_db->scan_height != user.scan_height(); + } + bool send(rpc::client& client, epee::byte_slice message) { const expect sent = client.send(std::move(message), send_timeout); @@ -352,7 +373,7 @@ namespace lws MWARNING("Block retrieval timeout, retrying"); if (!send(client, block_request.clone())) return; - continue; + continue; // to next get_blocks_fast read } MONERO_THROW(resp.error(), "Failed to retrieve blocks from daemon"); } @@ -367,20 +388,32 @@ namespace lws return; } - // retrieve next blocks in background + // prep for next blocks retrieval req.start_height = fetched.result.start_height + fetched.result.blocks.size() - 1; block_request = rpc::client::make_message("get_blocks_fast", req); - if (!send(client, block_request.clone())) - return; if (fetched.result.blocks.size() <= 1) { - // ... how about some ZMQ push stuff? we can only dream ... - if (client.wait(block_poll_interval).matches(std::errc::interrupted)) + // synced to top of chain, wait for next blocks + for (;;) + { + const expect new_block = client.wait_for_block(); + if (new_block.matches(std::errc::interrupted)) + return; + if (!new_block || is_new_block(disk, users.front(), *new_block)) + break; + } + + // request next chunk of blocks + if (!send(client, block_request.clone())) return; - continue; + continue; // to next get_blocks_fast read } + // request next chunk of blocks + if (!send(client, block_request.clone())) + return; + if (fetched.result.blocks.size() != fetched.result.output_indices.size()) throw std::runtime_error{"Bad daemon response - need same number of blocks and indices"}; diff --git a/src/server_main.cpp b/src/server_main.cpp index 0caaf3a..546c0e4 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -55,6 +55,7 @@ namespace struct options : lws::options { const command_line::arg_descriptor daemon_rpc; + const command_line::arg_descriptor daemon_sub; const command_line::arg_descriptor> rest_servers; const command_line::arg_descriptor rest_ssl_key; const command_line::arg_descriptor rest_ssl_cert; @@ -85,6 +86,7 @@ namespace options() : lws::options() , daemon_rpc{"daemon", "://
: of a monerod ZMQ RPC", get_default_zmq()} + , daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""} , rest_servers{"rest-server", "[(https|http)://
:] for incoming connections, multiple declarations allowed"} , rest_ssl_key{"rest-ssl-key", " to PEM formatted SSL key for https REST server", ""} , rest_ssl_cert{"rest-ssl-certificate", " to PEM formatted SSL certificate (chains supported) for https REST server", ""} @@ -103,6 +105,7 @@ namespace lws::options::prepare(description); command_line::add_arg(description, daemon_rpc); + command_line::add_arg(description, daemon_sub); description.add_options()(rest_servers.name, boost::program_options::value>()->default_value({rest_default}, rest_default), rest_servers.description); command_line::add_arg(description, rest_ssl_key); command_line::add_arg(description, rest_ssl_cert); @@ -122,6 +125,7 @@ namespace std::vector rest_servers; lws::rest_server::configuration rest_config; std::string daemon_rpc; + std::string daemon_sub; std::chrono::minutes rates_interval; std::size_t scan_threads; unsigned create_queue_max; @@ -171,6 +175,7 @@ namespace command_line::get_arg(args, opts.external_bind) }, command_line::get_arg(args, opts.daemon_rpc), + command_line::get_arg(args, opts.daemon_sub), std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)}, command_line::get_arg(args, opts.scan_threads), command_line::get_arg(args, opts.create_queue_max), @@ -191,7 +196,7 @@ namespace boost::filesystem::create_directories(prog.db_path); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); - auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), prog.rates_interval); + auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), prog.rates_interval); MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();