From 97617f92df2b3565eca31232f736a454ee50f307 Mon Sep 17 00:00:00 2001 From: Lee *!* Clagett Date: Fri, 25 Oct 2024 19:28:35 -0400 Subject: [PATCH] Fix several bugs found in new server scanner code (#146) --- src/rpc/scanner/server.cpp | 60 ++++++++++++++++++++++++++++---- src/rpc/scanner/server.h | 10 ++++-- src/rpc/scanner/write_commands.h | 3 +- src/scanner.cpp | 44 ++++++++++++++--------- 4 files changed, 90 insertions(+), 27 deletions(-) diff --git a/src/rpc/scanner/server.cpp b/src/rpc/scanner/server.cpp index 15888c1..1885019 100644 --- a/src/rpc/scanner/server.cpp +++ b/src/rpc/scanner/server.cpp @@ -28,6 +28,7 @@ #include "server.h" #include +#include #include #include #include @@ -163,12 +164,16 @@ namespace lws { namespace rpc { namespace scanner void operator()(const boost::system::error_code& error = {}) { - if (!self_ || error) + if (error) { if (error == boost::asio::error::operation_aborted) return; // exiting MONERO_THROW(error, "server acceptor failed"); } + + if (!self_ || self_->stop_) + return; + assert(self_->strand_.running_in_this_thread()); BOOST_ASIO_CORO_REENTER(*this) { @@ -192,7 +197,7 @@ namespace lws { namespace rpc { namespace scanner void operator()(const boost::system::error_code& error = {}) const { - if (!self_ || error == boost::asio::error::operation_aborted) + if (!self_ || self_->stop_ || error == boost::asio::error::operation_aborted) return; assert(self_->strand_.running_in_this_thread()); @@ -223,7 +228,7 @@ namespace lws { namespace rpc { namespace scanner return; } - auto reader = self_->disk_.start_read(std::move(self_->read_txn_)); + auto reader = self_->disk_.start_read(); if (!reader) { if (reader.matches(std::errc::no_lock_available)) @@ -240,6 +245,8 @@ namespace lws { namespace rpc { namespace scanner if (current_users.count() < self_->active_.size()) { // a shrinking user base, re-shuffle + reader->finish_read(); + self_->accounts_cur_ = current_users.give_cursor(); self_->do_replace_users(); return; } @@ -254,6 +261,8 @@ namespace lws { namespace rpc { namespace scanner new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value()))); if (replace_threshold < new_accounts.size()) { + reader->finish_read(); + self_->accounts_cur_ = current_users.give_cursor(); self_->do_replace_users(); return; } @@ -268,6 +277,8 @@ namespace lws { namespace rpc { namespace scanner if (!active_copy.empty()) { + reader->finish_read(); + self_->accounts_cur_ = current_users.give_cursor(); self_->do_replace_users(); return; } @@ -306,7 +317,7 @@ namespace lws { namespace rpc { namespace scanner self_->next_thread_ %= total_threads; } - self_->read_txn_ = reader->finish_read(); + reader->finish_read(); self_->accounts_cur_ = current_users.give_cursor(); } }; @@ -401,6 +412,28 @@ namespace lws { namespace rpc { namespace scanner active_ = std::move(active); } + void server::do_stop() + { + assert(strand_.running_in_this_thread()); + if (stop_) + return; + + MDEBUG("Stopping rpc::scanner::server async operations"); + boost::system::error_code error{}; + check_timer_.cancel(error); + acceptor_.cancel(error); + acceptor_.close(error); + + for (auto& remote : remote_) + { + const auto conn = remote.lock(); + if (conn) + boost::asio::dispatch(conn->strand_, [conn] () { conn->cleanup(); }); + } + + stop_ = true; + } + boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address) { std::string host; @@ -432,12 +465,12 @@ namespace lws { namespace rpc { namespace scanner active_(std::move(active)), disk_(std::move(disk)), zclient_(std::move(zclient)), - read_txn_{}, accounts_cur_{}, next_thread_(0), pass_hashed_(), pass_salt_(), - webhook_verify_(webhook_verify) + webhook_verify_(webhook_verify), + stop_(false) { std::sort(active_.begin(), active_.end()); for (const auto& local : local_) @@ -488,6 +521,9 @@ namespace lws { namespace rpc { namespace scanner { self->acceptor_.close(); self->acceptor_.open(endpoint.protocol()); +#if !defined(_WIN32) + self->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); +#endif self->acceptor_.bind(endpoint); self->acceptor_.listen(); @@ -522,7 +558,17 @@ namespace lws { namespace rpc { namespace scanner { const lws::scanner_options opts{self->webhook_verify_, false, false}; if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts)) - GET_IO_SERVICE(self->check_timer_).stop(); + { + self->do_stop(); + self->strand_.context().stop(); + } }); } + + void server::stop(const std::shared_ptr& self) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + boost::asio::dispatch(self->strand_, [self] () { self->do_stop(); }); + } }}} // lws // rpc // scanner diff --git a/src/rpc/scanner/server.h b/src/rpc/scanner/server.h index 6be369c..68d4d9a 100644 --- a/src/rpc/scanner/server.h +++ b/src/rpc/scanner/server.h @@ -65,12 +65,12 @@ namespace lws { namespace rpc { namespace scanner std::vector active_; db::storage disk_; rpc::client zclient_; - lmdb::suspended_txn read_txn_; db::cursor::accounts accounts_cur_; std::size_t next_thread_; std::array pass_hashed_; std::array pass_salt_; const ssl_verification_t webhook_verify_; + bool stop_; //! Async acceptor routine class acceptor; @@ -79,6 +79,9 @@ namespace lws { namespace rpc { namespace scanner //! Reset `local_` and `remote_` scanners. Must be called in `strand_`. void do_replace_users(); + //! Stop all async operations + void do_stop(); + public: static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address); @@ -105,6 +108,9 @@ namespace lws { namespace rpc { namespace scanner static void replace_users(const std::shared_ptr& self); //! Update `users` information on local DB - static void store(const std::shared_ptr& self, std::vector users, std::vector blocks); + static void store(const std::shared_ptr& self, std::vector users, std::vector blocks); + + //! Stop a running instance of all operations + static void stop(const std::shared_ptr& self); }; }}} // lws // rpc // scanner diff --git a/src/rpc/scanner/write_commands.h b/src/rpc/scanner/write_commands.h index d13384b..fbc17b0 100644 --- a/src/rpc/scanner/write_commands.h +++ b/src/rpc/scanner/write_commands.h @@ -27,6 +27,7 @@ #pragma once #include +#include #include #include #include @@ -167,7 +168,7 @@ namespace lws { namespace rpc { namespace scanner if (msg.empty()) { - self->cleanup(); + boost::asio::dispatch(self->strand_, [self] () { self->cleanup(); }); return; } diff --git a/src/scanner.cpp b/src/scanner.cpp index c8ac798..739e21f 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -27,6 +27,7 @@ #include "scanner.h" #include +#include #include #include #include @@ -1044,23 +1045,27 @@ namespace lws users.clear(); users.shrink_to_fit(); - { - auto server = std::make_shared( - self.io_, - disk.clone(), - MONERO_UNWRAP(ctx.connect()), - queues, - std::move(active), - opts.webhook_verify - ); + auto server = std::make_shared( + self.io_, + disk.clone(), + MONERO_UNWRAP(ctx.connect()), + queues, + std::move(active), + opts.webhook_verify + ); - rpc::scanner::server::start_user_checking(server); - if (!lws_server_addr.empty()) - rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass)); - } + rpc::scanner::server::start_user_checking(server); + if (!lws_server_addr.empty()) + rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass)); - // Blocks until sigint, local scanner issue, or exception + // Blocks until sigint, local scanner issue, storage issue, or exception self.io_.run(); + self.io_.restart(); + + // Make sure server stops because we could re-start after blockchain sync + rpc::scanner::server::stop(server); + self.io_.poll(); + self.io_.restart(); } template @@ -1396,14 +1401,19 @@ namespace lws boost::asio::steady_timer poll{sync_.io_}; poll.expires_from_now(rpc::scanner::account_poll_interval); - poll.async_wait([] (boost::system::error_code) {}); + const auto ready = poll.async_wait(boost::asio::use_future); - sync_.io_.run_one(); + /* The exchange rates timer could run while waiting, so ensure that + the correct timer was run. */ + while (!has_shutdown() && ready.wait_for(std::chrono::seconds{0}) == std::future_status::timeout) + { + sync_.io_.run_one(); + sync_.io_.restart(); + } } else check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts); - sync_.io_.reset(); if (has_shutdown()) return;