Update boost::asio usage to conform to newer standards: (#144)
Some checks failed
unix-ci / build-tests (macos-13, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-13, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-14, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-14, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (macos-latest, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (macos-latest, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=ON) (push) Has been cancelled
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=OFF) (push) Has been cancelled
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=ON) (push) Has been cancelled

* Convert boost::asio::io_service to boost::asio::io_context
  * Convert strand.wrap(...) to boost::asio::bind_executor(strand, ...)
  * Convert strand.dispatch(...) to boost::asio::dispatch(strand, ...)
  * Convert io_context.reset() to io_context.restart()
  * Convert null_buffers() usage to socket.async_wait(...)
  * Drop usage of GET_IO_SERVICE macro from monero
  * Refactor REST server to manage resources better
This commit is contained in:
Lee *!* Clagett 2024-11-20 10:53:40 -05:00 committed by GitHub
parent ef78f19986
commit 18b5743596
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 259 additions and 197 deletions

View file

@ -294,7 +294,7 @@ namespace
boost::this_thread::sleep_for(boost::chrono::seconds{1}); boost::this_thread::sleep_for(boost::chrono::seconds{1});
self.stop_ = false; self.stop_ = false;
self.io_.reset(); self.io_.restart();
if (self.has_shutdown()) if (self.has_shutdown())
return; return;

View file

@ -86,7 +86,7 @@ namespace net { namespace zmq
} }
} }
expect<async_client> async_client::make(boost::asio::io_service& io, socket zsock) expect<async_client> async_client::make(boost::asio::io_context& io, socket zsock)
{ {
MONERO_PRECOND(zsock != nullptr); MONERO_PRECOND(zsock != nullptr);

View file

@ -28,7 +28,7 @@
#pragma once #pragma once
#include <boost/asio/compose.hpp> #include <boost/asio/compose.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/posix/stream_descriptor.hpp> #include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <cstddef> #include <cstddef>
@ -63,7 +63,7 @@ namespace net { namespace zmq
asocket asock; asocket asock;
bool close; bool close;
static expect<async_client> make(boost::asio::io_service& io, socket zsock); static expect<async_client> make(boost::asio::io_context& io, socket zsock);
}; };
class read_msg_op class read_msg_op
@ -94,7 +94,7 @@ namespace net { namespace zmq
return self.complete(make_error_code(msg.error()), 0); return self.complete(make_error_code(msg.error()), 0);
// try again // try again
sock_->asock->async_read_some(boost::asio::null_buffers(), std::move(self)); sock_->asock->async_wait(boost::asio::posix::stream_descriptor::wait_read, std::move(self));
return; return;
} }
@ -133,7 +133,7 @@ namespace net { namespace zmq
return self.complete(make_error_code(status.error()), 0); return self.complete(make_error_code(status.error()), 0);
// try again // try again
sock_->asock->async_write_some(boost::asio::null_buffers(), std::move(self)); sock_->asock->async_wait(boost::asio::posix::stream_descriptor::wait_write, std::move(self));
return; return;
} }

View file

@ -27,8 +27,10 @@
#include "rest_server.h" #include "rest_server.h"
#include <algorithm> #include <algorithm>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
@ -47,6 +49,9 @@
#include <boost/beast/version.hpp> #include <boost/beast/version.hpp>
#include <boost/optional/optional.hpp> #include <boost/optional/optional.hpp>
#include <boost/range/counting_range.hpp> #include <boost/range/counting_range.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/thread/lock_types.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/tss.hpp> #include <boost/thread/tss.hpp>
#include <boost/utility/string_ref.hpp> #include <boost/utility/string_ref.hpp>
#include <cstring> #include <cstring>
@ -82,6 +87,53 @@
namespace lws namespace lws
{ {
struct runtime_options
{
const std::uint32_t max_subaddresses;
const epee::net_utils::ssl_verification_t webhook_verify;
const bool disable_admin_auth;
const bool auto_accept_creation;
};
struct rest_server_data
{
boost::asio::io_context io;
const db::storage disk;
const rpc::client client;
const runtime_options options;
std::vector<net::zmq::async_client> clients;
boost::mutex sync;
rest_server_data(db::storage disk, rpc::client client, runtime_options options)
: io(),
disk(std::move(disk)),
client(std::move(client)),
options(std::move(options)),
clients(),
sync()
{}
expect<net::zmq::async_client> get_async_client(boost::asio::io_context& io)
{
boost::unique_lock<boost::mutex> lock{sync};
if (!clients.empty())
{
net::zmq::async_client out{std::move(clients.back())};
clients.pop_back();
return out;
}
lock.unlock();
return client.make_async_client(io);
}
void store_async_client(net::zmq::async_client&& client)
{
const boost::lock_guard<boost::mutex> lock{sync};
client.close = false;
clients.push_back(std::move(client));
}
};
namespace namespace
{ {
namespace http = epee::net_utils::http; namespace http = epee::net_utils::http;
@ -213,51 +265,13 @@ namespace lws
std::atomic_flag rates_error_once = ATOMIC_FLAG_INIT; std::atomic_flag rates_error_once = ATOMIC_FLAG_INIT;
struct runtime_options
{
std::uint32_t max_subaddresses;
epee::net_utils::ssl_verification_t webhook_verify;
bool disable_admin_auth;
bool auto_accept_creation;
};
struct rest_server_data
{
const db::storage disk;
const rpc::client client;
const runtime_options options;
std::vector<net::zmq::async_client> clients;
boost::mutex sync;
expect<net::zmq::async_client> get_async_client(boost::asio::io_service& io)
{
boost::unique_lock<boost::mutex> lock{sync};
if (!clients.empty())
{
net::zmq::async_client out{std::move(clients.back())};
clients.pop_back();
return out;
}
lock.unlock();
return client.make_async_client(io);
}
void store_async_client(net::zmq::async_client&& client)
{
const boost::lock_guard<boost::mutex> lock{sync};
client.close = false;
clients.push_back(std::move(client));
}
};
struct daemon_status struct daemon_status
{ {
using request = rpc::daemon_status_request; using request = rpc::daemon_status_request;
using response = epee::byte_slice; // sometimes async using response = epee::byte_slice; // sometimes async
using async_response = rpc::daemon_status_response; using async_response = rpc::daemon_status_response;
static expect<response> handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume) static expect<response> handle(const request&, rest_server_data& data, std::function<async_complete>&& resume)
{ {
using info_rpc = cryptonote::rpc::GetInfo; using info_rpc = cryptonote::rpc::GetInfo;
@ -268,16 +282,16 @@ namespace lws
std::string in; std::string in;
net::zmq::async_client client; net::zmq::async_client client;
boost::asio::steady_timer timer; boost::asio::steady_timer timer;
boost::asio::io_service::strand strand; boost::asio::io_context::strand strand;
std::vector<std::function<async_complete>> resumers; std::vector<std::function<async_complete>> resumers;
frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) frame(rest_server_data& parent, net::zmq::async_client client)
: parent(std::addressof(parent)), : parent(std::addressof(parent)),
out(), out(),
in(), in(),
client(std::move(client)), client(std::move(client)),
timer(io), timer(parent.io),
strand(io), strand(parent.io),
resumers() resumers()
{ {
info_rpc::Request daemon_req{}; info_rpc::Request daemon_req{};
@ -321,6 +335,7 @@ namespace lws
void send_response(const boost::system::error_code error, const expect<copyable_slice>& value) void send_response(const boost::system::error_code error, const expect<copyable_slice>& value)
{ {
assert(self_ != nullptr); assert(self_ != nullptr);
assert(self_->strand.running_in_this_thread());
if (error) if (error)
MERROR("Failure in /daemon_status: " << error.message()); MERROR("Failure in /daemon_status: " << error.message());
@ -361,6 +376,7 @@ namespace lws
if (!self_ || error == boost::asio::error::operation_aborted) if (!self_ || error == boost::asio::error::operation_aborted)
return; return;
assert(self_->strand.running_in_this_thread());
MWARNING("Timeout on /daemon_status ZMQ call"); MWARNING("Timeout on /daemon_status ZMQ call");
self_->client.close = true; self_->client.close = true;
self_->client.asock->cancel(error); self_->client.asock->cancel(error);
@ -371,7 +387,7 @@ namespace lws
if (!self_->timer.expires_after(timeout) && expecting) if (!self_->timer.expires_after(timeout) && expecting)
return false; return false;
self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_}));
return true; return true;
} }
@ -383,18 +399,19 @@ namespace lws
return send_response(error, json_response(async_response{})); return send_response(error, json_response(async_response{}));
frame& self = *self_; frame& self = *self_;
assert(self.strand.running_in_this_thread());
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
set_timeout(std::chrono::seconds{2}, false); set_timeout(std::chrono::seconds{2}, false);
BOOST_ASIO_CORO_YIELD net::zmq::async_write( BOOST_ASIO_CORO_YIELD net::zmq::async_write(
self.client, std::move(self.out), self.strand.wrap(std::move(*this)) self.client, std::move(self.out), boost::asio::bind_executor(self.strand, std::move(*this))
); );
if (!set_timeout(std::chrono::seconds{5}, true)) if (!set_timeout(std::chrono::seconds{5}, true))
return send_response(boost::asio::error::operation_aborted, json_response(async_response{})); return send_response(boost::asio::error::operation_aborted, json_response(async_response{}));
BOOST_ASIO_CORO_YIELD net::zmq::async_read( BOOST_ASIO_CORO_YIELD net::zmq::async_read(
self.client, self.in, self.strand.wrap(std::move(*this)) self.client, self.in, boost::asio::bind_executor(self.strand, std::move(*this))
); );
if (!self.timer.cancel(error)) if (!self.timer.cancel(error))
@ -427,18 +444,18 @@ namespace lws
} }
}; };
expect<net::zmq::async_client> client = data.get_async_client(io); expect<net::zmq::async_client> client = data.get_async_client(data.io);
if (!client) if (!client)
return client.error(); return client.error();
active = std::make_shared<frame>(data, io, std::move(*client)); active = std::make_shared<frame>(data, std::move(*client));
cache.result = nullptr; cache.result = nullptr;
cache.status = active; cache.status = active;
active->resumers.push_back(std::move(resume)); active->resumers.push_back(std::move(resume));
lock.unlock(); lock.unlock();
MDEBUG("Starting new ZMQ request in /daemon_status"); MDEBUG("Starting new ZMQ request in /daemon_status");
active->strand.dispatch(async_handler{active}); boost::asio::dispatch(active->strand, async_handler{active});
return async_ready(); return async_ready();
} }
}; };
@ -448,7 +465,7 @@ namespace lws
using request = rpc::account_credentials; using request = rpc::account_credentials;
using response = rpc::get_address_info_response; using response = rpc::get_address_info_response;
static expect<response> handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&) static expect<response> handle(const request& req, const rest_server_data& data, std::function<async_complete>&&)
{ {
auto user = open_account(req, data.disk.clone()); auto user = open_account(req, data.disk.clone());
if (!user) if (!user)
@ -522,7 +539,7 @@ namespace lws
using request = rpc::account_credentials; using request = rpc::account_credentials;
using response = rpc::get_address_txs_response; using response = rpc::get_address_txs_response;
static expect<response> handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&) static expect<response> handle(const request& req, const rest_server_data& data, std::function<async_complete>&&)
{ {
auto user = open_account(req, data.disk.clone()); auto user = open_account(req, data.disk.clone());
if (!user) if (!user)
@ -646,7 +663,7 @@ namespace lws
using response = void; // always asynchronous response using response = void; // always asynchronous response
using async_response = rpc::get_random_outs_response; using async_response = rpc::get_random_outs_response;
static expect<response> handle(request&& req, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume) static expect<response> handle(request req, rest_server_data& data, std::function<async_complete>&& resume)
{ {
using distribution_rpc = cryptonote::rpc::GetOutputDistribution; using distribution_rpc = cryptonote::rpc::GetOutputDistribution;
using histogram_rpc = cryptonote::rpc::GetOutputHistogram; using histogram_rpc = cryptonote::rpc::GetOutputHistogram;
@ -665,11 +682,11 @@ namespace lws
boost::asio::strand<boost::asio::io_context::executor_type> strand; boost::asio::strand<boost::asio::io_context::executor_type> strand;
std::deque<std::pair<request, std::function<async_complete>>> resumers; std::deque<std::pair<request, std::function<async_complete>>> resumers;
frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) frame(rest_server_data& parent, net::zmq::async_client client)
: parent(std::addressof(parent)), : parent(std::addressof(parent)),
client(std::move(client)), client(std::move(client)),
timer(io), timer(parent.io),
strand(io.get_executor()), strand(parent.io.get_executor()),
resumers() resumers()
{} {}
}; };
@ -743,15 +760,15 @@ namespace lws
if (error == boost::asio::error::operation_aborted) if (error == boost::asio::error::operation_aborted)
return; return;
self->strand.dispatch( boost::asio::dispatch(
self->strand,
[self] () [self] ()
{ {
boost::system::error_code error{}; boost::system::error_code error{};
MWARNING("Timeout on /get_random_outs ZMQ call"); MWARNING("Timeout on /get_random_outs ZMQ call");
self->client.close = true; self->client.close = true;
self->client.asock->cancel(error); self->client.asock->cancel(error);
}, }
boost::asio::get_associated_allocator(*self)
); );
} }
); );
@ -888,10 +905,6 @@ namespace lws
class zmq_fetch_keys class zmq_fetch_keys
{ {
/* `std::function` needs a copyable functor. The functor was made
const and copied in the function instead of using a reference to
make the callback in `std::function` thread-safe. This shouldn't
be a problem now, but this is just-in-case of a future refactor. */
async_handler self_; async_handler self_;
boost::asio::yield_context yield_; boost::asio::yield_context yield_;
@ -963,11 +976,11 @@ namespace lws
} }
}; };
expect<net::zmq::async_client> client = data.get_async_client(io); expect<net::zmq::async_client> client = data.get_async_client(data.io);
if (!client) if (!client)
return client.error(); return client.error();
active = std::make_shared<frame>(data, io, std::move(*client)); active = std::make_shared<frame>(data, std::move(*client));
cache.status = active; cache.status = active;
active->resumers.emplace_back(std::move(req), std::move(resume)); active->resumers.emplace_back(std::move(req), std::move(resume));
@ -984,7 +997,7 @@ namespace lws
using request = rpc::account_credentials; using request = rpc::account_credentials;
using response = rpc::get_subaddrs_response; using response = rpc::get_subaddrs_response;
static expect<response> handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&) static expect<response> handle(request const& req, const rest_server_data& data, std::function<async_complete>&&)
{ {
auto user = open_account(req, data.disk.clone()); auto user = open_account(req, data.disk.clone());
if (!user) if (!user)
@ -1063,7 +1076,7 @@ namespace lws
); );
} }
static expect<response> handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume) static expect<response> handle(request&& req, rest_server_data& data, std::function<async_complete>&& resume)
{ {
struct frame struct frame
{ {
@ -1072,16 +1085,16 @@ namespace lws
std::string in; std::string in;
net::zmq::async_client client; net::zmq::async_client client;
boost::asio::steady_timer timer; boost::asio::steady_timer timer;
boost::asio::io_service::strand strand; boost::asio::io_context::strand strand;
std::vector<std::pair<request, std::function<async_complete>>> resumers; std::vector<std::pair<request, std::function<async_complete>>> resumers;
frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) frame(rest_server_data& parent, net::zmq::async_client client)
: parent(std::addressof(parent)), : parent(std::addressof(parent)),
out(), out(),
in(), in(),
client(std::move(client)), client(std::move(client)),
timer(io), timer(parent.io),
strand(io), strand(parent.io),
resumers() resumers()
{ {
rpc_command::Request req{}; rpc_command::Request req{};
@ -1130,6 +1143,7 @@ namespace lws
void send_response(const boost::system::error_code error, expect<rpc_command::Response> value) void send_response(const boost::system::error_code error, expect<rpc_command::Response> value)
{ {
assert(self_ != nullptr); assert(self_ != nullptr);
assert(self_->strand.running_in_this_thread());
if (error) if (error)
{ {
@ -1173,6 +1187,7 @@ namespace lws
if (!self_ || error == boost::asio::error::operation_aborted) if (!self_ || error == boost::asio::error::operation_aborted)
return; return;
assert(self_->strand.running_in_this_thread());
MWARNING("Timeout on /get_unspent_outs ZMQ call"); MWARNING("Timeout on /get_unspent_outs ZMQ call");
self_->client.close = true; self_->client.close = true;
self_->client.asock->cancel(error); self_->client.asock->cancel(error);
@ -1183,7 +1198,7 @@ namespace lws
if (!self_->timer.expires_after(timeout) && expecting) if (!self_->timer.expires_after(timeout) && expecting)
return false; return false;
self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_}));
return true; return true;
} }
@ -1197,18 +1212,19 @@ namespace lws
return send_response(error, default_response{}); return send_response(error, default_response{});
frame& self = *self_; frame& self = *self_;
assert(self.strand.running_in_this_thread());
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
set_timeout(std::chrono::seconds{2}, false); set_timeout(std::chrono::seconds{2}, false);
BOOST_ASIO_CORO_YIELD net::zmq::async_write( BOOST_ASIO_CORO_YIELD net::zmq::async_write(
self.client, std::move(self.out), self.strand.wrap(std::move(*this)) self.client, std::move(self.out), boost::asio::bind_executor(self.strand, std::move(*this))
); );
if (!set_timeout(std::chrono::seconds{5}, true)) if (!set_timeout(std::chrono::seconds{5}, true))
return send_response(boost::asio::error::operation_aborted, default_response{}); return send_response(boost::asio::error::operation_aborted, default_response{});
BOOST_ASIO_CORO_YIELD net::zmq::async_read( BOOST_ASIO_CORO_YIELD net::zmq::async_read(
self.client, self.in, self.strand.wrap(std::move(*this)) self.client, self.in, boost::asio::bind_executor(self.strand, std::move(*this))
); );
if (!self.timer.cancel(error)) if (!self.timer.cancel(error))
@ -1226,18 +1242,18 @@ namespace lws
} }
}; };
expect<net::zmq::async_client> client = data.get_async_client(io); expect<net::zmq::async_client> client = data.get_async_client(data.io);
if (!client) if (!client)
return client.error(); return client.error();
active = std::make_shared<frame>(data, io, std::move(*client)); active = std::make_shared<frame>(data, std::move(*client));
cache.result = rpc_command::Response{}; cache.result = rpc_command::Response{};
cache.status = active; cache.status = active;
active->resumers.emplace_back(std::move(req), std::move(resume)); active->resumers.emplace_back(std::move(req), std::move(resume));
lock.unlock(); lock.unlock();
MDEBUG("Starting new ZMQ request in /get_unspent_outs"); MDEBUG("Starting new ZMQ request in /get_unspent_outs");
active->strand.dispatch(async_handler{active}); boost::asio::dispatch(active->strand, async_handler{active});
return async_ready(); return async_ready();
} }
}; };
@ -1247,7 +1263,7 @@ namespace lws
using request = rpc::account_credentials; using request = rpc::account_credentials;
using response = rpc::import_response; using response = rpc::import_response;
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&) static expect<response> handle(request req, const rest_server_data& data, std::function<async_complete>&&)
{ {
bool new_request = false; bool new_request = false;
bool fulfilled = false; bool fulfilled = false;
@ -1287,7 +1303,7 @@ namespace lws
using request = rpc::login_request; using request = rpc::login_request;
using response = rpc::login_response; using response = rpc::login_response;
static expect<response> handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function<async_complete>&& resume) static expect<response> handle(request req, const rest_server_data& data, std::function<async_complete>&& resume)
{ {
if (!key_check(req.creds)) if (!key_check(req.creds))
return {lws::error::bad_view_key}; return {lws::error::bad_view_key};
@ -1344,7 +1360,7 @@ namespace lws
using request = rpc::provision_subaddrs_request; using request = rpc::provision_subaddrs_request;
using response = rpc::new_subaddrs_response; using response = rpc::new_subaddrs_response;
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&) static expect<response> handle(const request& req, const rest_server_data& data, std::function<async_complete>&&)
{ {
if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj) if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj)
return {lws::error::invalid_range}; return {lws::error::invalid_range};
@ -1408,7 +1424,7 @@ namespace lws
using response = void; // always async using response = void; // always async
using async_response = rpc::submit_raw_tx_response; using async_response = rpc::submit_raw_tx_response;
static expect<response> handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete> resume) static expect<response> handle(request req, rest_server_data& data, std::function<async_complete>&& resume)
{ {
using transaction_rpc = cryptonote::rpc::SendRawTxHex; using transaction_rpc = cryptonote::rpc::SendRawTxHex;
@ -1418,15 +1434,15 @@ namespace lws
std::string in; std::string in;
net::zmq::async_client client; net::zmq::async_client client;
boost::asio::steady_timer timer; boost::asio::steady_timer timer;
boost::asio::io_service::strand strand; boost::asio::io_context::strand strand;
std::deque<std::pair<epee::byte_slice, std::function<async_complete>>> resumers; std::deque<std::pair<epee::byte_slice, std::function<async_complete>>> resumers;
frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) frame(rest_server_data& parent, net::zmq::async_client client)
: parent(std::addressof(parent)), : parent(std::addressof(parent)),
in(), in(),
client(std::move(client)), client(std::move(client)),
timer(io), timer(parent.io),
strand(io), strand(parent.io),
resumers() resumers()
{} {}
}; };
@ -1468,6 +1484,7 @@ namespace lws
void send_response(const boost::system::error_code error, expect<copyable_slice> value) void send_response(const boost::system::error_code error, expect<copyable_slice> value)
{ {
assert(self_ != nullptr); assert(self_ != nullptr);
assert(self_->strand.running_in_this_thread());
std::deque<std::pair<epee::byte_slice, std::function<async_complete>>> resumers; std::deque<std::pair<epee::byte_slice, std::function<async_complete>>> resumers;
{ {
@ -1503,6 +1520,7 @@ namespace lws
if (!self_ || error == boost::asio::error::operation_aborted) if (!self_ || error == boost::asio::error::operation_aborted)
return; return;
assert(self_->strand.running_in_this_thread());
MWARNING("Timeout on /submit_raw_tx ZMQ call"); MWARNING("Timeout on /submit_raw_tx ZMQ call");
self_->client.close = true; self_->client.close = true;
self_->client.asock->cancel(error); self_->client.asock->cancel(error);
@ -1513,7 +1531,7 @@ namespace lws
if (!self_->timer.expires_after(timeout) && expecting) if (!self_->timer.expires_after(timeout) && expecting)
return false; return false;
self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_}));
return true; return true;
} }
@ -1525,6 +1543,7 @@ namespace lws
return send_response(error, async_ready()); return send_response(error, async_ready());
frame& self = *self_; frame& self = *self_;
assert(self.strand.running_in_this_thread());
epee::byte_slice next = nullptr; epee::byte_slice next = nullptr;
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
@ -1543,7 +1562,7 @@ namespace lws
set_timeout(std::chrono::seconds{10}, false); set_timeout(std::chrono::seconds{10}, false);
BOOST_ASIO_CORO_YIELD net::zmq::async_write( BOOST_ASIO_CORO_YIELD net::zmq::async_write(
self.client, std::move(next), self.strand.wrap(std::move(*this)) self.client, std::move(next), boost::asio::bind_executor(self.strand, std::move(*this))
); );
if (!set_timeout(std::chrono::seconds{20}, true)) if (!set_timeout(std::chrono::seconds{20}, true))
@ -1551,7 +1570,7 @@ namespace lws
self.in.clear(); // could be in moved-from state self.in.clear(); // could be in moved-from state
BOOST_ASIO_CORO_YIELD net::zmq::async_read( BOOST_ASIO_CORO_YIELD net::zmq::async_read(
self.client, self.in, self.strand.wrap(std::move(*this)) self.client, self.in, boost::asio::bind_executor(self.strand, std::move(*this))
); );
if (!self.timer.cancel(error)) if (!self.timer.cancel(error))
@ -1574,18 +1593,18 @@ namespace lws
} }
}; };
expect<net::zmq::async_client> client = data.get_async_client(io); expect<net::zmq::async_client> client = data.get_async_client(data.io);
if (!client) if (!client)
return client.error(); return client.error();
active = std::make_shared<frame>(data, io, std::move(*client)); active = std::make_shared<frame>(data, std::move(*client));
cache.status = active; cache.status = active;
active->resumers.emplace_back(std::move(msg), std::move(resume)); active->resumers.emplace_back(std::move(msg), std::move(resume));
lock.unlock(); lock.unlock();
MDEBUG("Starting new ZMQ request in /submit_raw_tx"); MDEBUG("Starting new ZMQ request in /submit_raw_tx");
active->strand.dispatch(async_handler{active}); boost::asio::dispatch(active->strand, async_handler{active});
return success(); return success();
} }
}; };
@ -1595,7 +1614,7 @@ namespace lws
using request = rpc::upsert_subaddrs_request; using request = rpc::upsert_subaddrs_request;
using response = rpc::new_subaddrs_response; using response = rpc::new_subaddrs_response;
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>) static expect<response> handle(request req, const rest_server_data& data, std::function<async_complete>&&)
{ {
if (!data.options.max_subaddresses) if (!data.options.max_subaddresses)
return {lws::error::max_subaddresses}; return {lws::error::max_subaddresses};
@ -1632,7 +1651,7 @@ namespace lws
}; };
template<typename E> template<typename E>
expect<epee::byte_slice> call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume) expect<epee::byte_slice> call(std::string&& root, rest_server_data& data, std::function<async_complete>&& resume)
{ {
using request = typename E::request; using request = typename E::request;
using response = typename E::response; using response = typename E::response;
@ -1647,7 +1666,7 @@ namespace lws
if (error) if (error)
return error; return error;
expect<response> resp = E::handle(std::move(req), io, data, std::move(resume)); expect<response> resp = E::handle(std::move(req), data, std::move(resume));
if (!resp) if (!resp)
return resp.error(); return resp.error();
return json_response(std::move(resp)); return json_response(std::move(resp));
@ -1672,7 +1691,7 @@ namespace lws
} }
template<typename E> template<typename E>
expect<epee::byte_slice> call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function<async_complete>&&) expect<epee::byte_slice> call_admin(std::string&& root, rest_server_data& data, std::function<async_complete>&&)
{ {
using request = typename E::request; using request = typename E::request;
@ -1713,7 +1732,7 @@ namespace lws
struct endpoint struct endpoint
{ {
char const* const name; char const* const name;
expect<epee::byte_slice> (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function<async_complete>&&); expect<epee::byte_slice> (*const run)(std::string&&, rest_server_data&, std::function<async_complete>&&);
const unsigned max_size; const unsigned max_size;
const bool is_async; const bool is_async;
}; };
@ -1824,18 +1843,16 @@ namespace lws
struct rest_server::internal struct rest_server::internal
{ {
rest_server_data data;
boost::optional<std::string> prefix; boost::optional<std::string> prefix;
boost::optional<std::string> admin_prefix; boost::optional<std::string> admin_prefix;
boost::optional<boost::asio::ssl::context> ssl_; boost::optional<boost::asio::ssl::context> ssl_;
boost::asio::ip::tcp::acceptor acceptor; boost::asio::ip::tcp::acceptor acceptor;
explicit internal(boost::asio::io_service& io_service, lws::db::storage disk, rpc::client client, runtime_options options) explicit internal(boost::asio::io_context& io)
: data{std::move(disk), std::move(client), std::move(options)} : prefix()
, prefix()
, admin_prefix() , admin_prefix()
, ssl_() , ssl_()
, acceptor(io_service) , acceptor(io)
{ {
assert(std::is_sorted(std::begin(endpoints), std::end(endpoints), by_name)); assert(std::is_sorted(std::begin(endpoints), std::end(endpoints), by_name));
} }
@ -1870,24 +1887,25 @@ namespace lws
template<typename Sock> template<typename Sock>
struct rest_server::connection struct rest_server::connection
{ {
rest_server_data* global_;
internal* parent_; internal* parent_;
Sock sock_; Sock sock_;
boost::beast::flat_static_buffer<http_parser_buffer_size> buffer_; boost::beast::flat_static_buffer<http_parser_buffer_size> buffer_;
boost::optional<boost::beast::http::parser<true, boost::beast::http::string_body>> parser_; boost::optional<boost::beast::http::parser<true, boost::beast::http::string_body>> parser_;
boost::beast::http::response<slice_body> response_; boost::beast::http::response<slice_body> response_;
boost::asio::steady_timer timer_; boost::asio::steady_timer timer_;
boost::asio::io_service::strand strand_; boost::asio::io_context::strand strand_;
bool keep_alive_; bool keep_alive_;
static boost::asio::ip::tcp::socket make_socket(std::true_type, internal* parent) static boost::asio::ip::tcp::socket make_socket(std::true_type, rest_server_data* global, internal*)
{ {
return boost::asio::ip::tcp::socket{GET_IO_SERVICE(parent->acceptor)}; return boost::asio::ip::tcp::socket{global->io};
} }
static boost::asio::ssl::stream<boost::asio::ip::tcp::socket> make_socket(std::false_type, internal* parent) static boost::asio::ssl::stream<boost::asio::ip::tcp::socket> make_socket(std::false_type, rest_server_data* global, internal* parent)
{ {
return boost::asio::ssl::stream<boost::asio::ip::tcp::socket>{ return boost::asio::ssl::stream<boost::asio::ip::tcp::socket>{
GET_IO_SERVICE(parent->acceptor), parent->ssl_.value() global->io, parent->ssl_.value()
}; };
} }
@ -1903,14 +1921,15 @@ namespace lws
boost::asio::ip::tcp::socket& sock() { return get_tcp(sock_); } boost::asio::ip::tcp::socket& sock() { return get_tcp(sock_); }
explicit connection(internal* parent) noexcept explicit connection(rest_server_data* global, internal* parent) noexcept
: parent_(parent), : global_(global),
sock_(make_socket(std::is_same<Sock, boost::asio::ip::tcp::socket>(), parent)), parent_(parent),
sock_(make_socket(std::is_same<Sock, boost::asio::ip::tcp::socket>(), global, parent)),
buffer_{}, buffer_{},
parser_{}, parser_{},
response_{}, response_{},
timer_(GET_IO_SERVICE(parent->acceptor)), timer_(global->io),
strand_(GET_IO_SERVICE(parent->acceptor)), strand_(global->io),
keep_alive_(true) keep_alive_(true)
{} {}
@ -1978,6 +1997,7 @@ namespace lws
if (!self_ || error == boost::asio::error::operation_aborted) if (!self_ || error == boost::asio::error::operation_aborted)
return; return;
assert(self_->strand_.running_in_this_thread());
MWARNING("Timeout on REST connection to " << self_->sock().remote_endpoint(error) << " / " << self_.get()); MWARNING("Timeout on REST connection to " << self_->sock().remote_endpoint(error) << " / " << self_.get());
self_->sock().cancel(error); self_->sock().cancel(error);
self_->shutdown(); self_->shutdown();
@ -1986,7 +2006,7 @@ namespace lws
if (!self->timer_.expires_after(timeout) && existing) if (!self->timer_.expires_after(timeout) && existing)
return false; // timeout queued, just abort return false; // timeout queued, just abort
self->timer_.async_wait(self->strand_.wrap(on_timeout{self})); self->timer_.async_wait(boost::asio::bind_executor(self->strand_, on_timeout{self}));
return true; return true;
} }
@ -2016,7 +2036,7 @@ namespace lws
connection<Sock>& self = *self_; connection<Sock>& self = *self_;
self.sock_.async_handshake( self.sock_.async_handshake(
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::server, boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::server,
self.strand_.wrap(std::move(*this)) boost::asio::bind_executor(self.strand_, std::move(*this))
); );
} }
@ -2057,7 +2077,9 @@ namespace lws
{ {
/* The `resumer` callback can be invoked in another strand (created /* The `resumer` callback can be invoked in another strand (created
by the handler function), and therefore needs to be "wrapped" to by the handler function), and therefore needs to be "wrapped" to
ensure thread safety. This also allows `resume` to be unwrapped. */ ensure thread safety. This also allows `resume` to be unwrapped.
DO NOT use `boost::asio::bind_executor` here as it doesn't create
a new callable like `wrap` does. */
const auto& self = self_; const auto& self = self_;
resumer = self->strand_.wrap( resumer = self->strand_.wrap(
[self, resume] (expect<copyable_slice> body) mutable [self, resume] (expect<copyable_slice> body) mutable
@ -2071,7 +2093,7 @@ namespace lws
} }
MDEBUG("Running REST handler " << handler->name << " on " << self_.get()); MDEBUG("Running REST handler " << handler->name << " on " << self_.get());
auto body = handler->run(std::move(self_->parser_->get()).body(), GET_IO_SERVICE(self_->timer_), self_->parent_->data, std::move(resumer)); auto body = handler->run(std::move(self_->parser_->get()).body(), *self_->global_, std::move(resumer));
if (!body) if (!body)
return self_->bad_request(body.error(), std::forward<F>(resume)); return self_->bad_request(body.error(), std::forward<F>(resume));
else if (!handler->is_async || !body->empty()) else if (!handler->is_async || !body->empty())
@ -2118,7 +2140,7 @@ namespace lws
MDEBUG("Reading new REST request from " << self_.get()); MDEBUG("Reading new REST request from " << self_.get());
BOOST_ASIO_CORO_YIELD boost::beast::http::async_read( BOOST_ASIO_CORO_YIELD boost::beast::http::async_read(
self.sock_, self.buffer_, *self.parser_, self.strand_.wrap(std::move(*this)) self.sock_, self.buffer_, *self.parser_, boost::asio::bind_executor(self.strand_, std::move(*this))
); );
// async_response will have its own timeouts set in handlers if async // async_response will have its own timeouts set in handlers if async
@ -2131,7 +2153,7 @@ namespace lws
connection<Sock>::set_timeout(self_, rest_response_timeout, false); connection<Sock>::set_timeout(self_, rest_response_timeout, false);
BOOST_ASIO_CORO_YIELD boost::beast::http::async_write( BOOST_ASIO_CORO_YIELD boost::beast::http::async_write(
self.sock_, self.response_, self.strand_.wrap(std::move(*this)) self.sock_, self.response_, boost::asio::bind_executor(self.strand_, std::move(*this))
); );
if (!self.keep_alive_) if (!self.keep_alive_)
@ -2144,24 +2166,25 @@ namespace lws
template<typename Sock> template<typename Sock>
struct rest_server::accept_loop final : public boost::asio::coroutine struct rest_server::accept_loop final : public boost::asio::coroutine
{ {
internal* self_; rest_server_data* global_;
internal* parent_;
std::shared_ptr<connection<Sock>> next_; std::shared_ptr<connection<Sock>> next_;
explicit accept_loop(internal* self) noexcept explicit accept_loop(rest_server_data* global, internal* parent) noexcept
: self_(self), next_(nullptr) : global_(global), parent_(parent), next_(nullptr)
{} {}
void operator()(boost::system::error_code error = {}) void operator()(boost::system::error_code error = {})
{ {
if (!self_) if (!global_ || !parent_)
return; return;
BOOST_ASIO_CORO_REENTER(*this) BOOST_ASIO_CORO_REENTER(*this)
{ {
for (;;) for (;;)
{ {
next_ = std::make_shared<connection<Sock>>(self_); next_ = std::make_shared<connection<Sock>>(global_, parent_);
BOOST_ASIO_CORO_YIELD self_->acceptor.async_accept(next_->sock(), std::move(*this)); BOOST_ASIO_CORO_YIELD parent_->acceptor.async_accept(next_->sock(), std::move(*this));
if (error) if (error)
{ {
@ -2170,7 +2193,7 @@ namespace lws
else else
{ {
MDEBUG("New connection to " << next_->sock().remote_endpoint(error) << " / " << next_.get()); MDEBUG("New connection to " << next_->sock().remote_endpoint(error) << " / " << next_.get());
next_->strand_.dispatch(handler_loop{next_}); boost::asio::dispatch(next_->strand_, handler_loop{next_});
} }
} }
} }
@ -2179,7 +2202,7 @@ namespace lws
void rest_server::run_io() void rest_server::run_io()
{ {
try { io_service_.run(); } try { global_->io.run(); }
catch (const std::exception& e) catch (const std::exception& e)
{ {
std::raise(SIGINT); std::raise(SIGINT);
@ -2193,13 +2216,15 @@ namespace lws
} }
rest_server::rest_server(epee::span<const std::string> addresses, std::vector<std::string> admin, db::storage disk, rpc::client client, configuration config) rest_server::rest_server(epee::span<const std::string> addresses, std::vector<std::string> admin, db::storage disk, rpc::client client, configuration config)
: io_service_(), ports_(), workers_() : global_(std::make_unique<rest_server_data>(std::move(disk), std::move(client), runtime_options{config.max_subaddresses, config.webhook_verify, config.disable_admin_auth, config.auto_accept_creation})),
ports_(),
workers_()
{ {
if (addresses.empty()) if (addresses.empty())
MONERO_THROW(common_error::kInvalidArgument, "REST server requires 1 or more addresses"); MONERO_THROW(common_error::kInvalidArgument, "REST server requires 1 or more addresses");
std::sort(admin.begin(), admin.end()); std::sort(admin.begin(), admin.end());
const auto init_port = [&admin] (internal& port, const std::string& address, configuration config, const bool is_admin) -> bool const auto init_port = [this, &admin] (internal& port, const std::string& address, configuration config, const bool is_admin) -> bool
{ {
epee::net_utils::http::url_content url{}; epee::net_utils::http::url_content url{};
if (!epee::net_utils::parse_url(address, url)) if (!epee::net_utils::parse_url(address, url))
@ -2295,24 +2320,23 @@ namespace lws
if (ssl_options) if (ssl_options)
{ {
port.ssl_ = ssl_options.create_context(); port.ssl_ = ssl_options.create_context();
accept_loop<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>{std::addressof(port)}(); accept_loop<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>{global_.get(), std::addressof(port)}();
} }
else else
accept_loop<boost::asio::ip::tcp::socket>{std::addressof(port)}(); accept_loop<boost::asio::ip::tcp::socket>{global_.get(), std::addressof(port)}();
return https; return https;
}; };
bool any_ssl = false; bool any_ssl = false;
const runtime_options options{config.max_subaddresses, config.webhook_verify, config.disable_admin_auth, config.auto_accept_creation};
for (const std::string& address : addresses) for (const std::string& address : addresses)
{ {
ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), options); ports_.emplace_back(global_->io);
any_ssl |= init_port(ports_.back(), address, config, false); any_ssl |= init_port(ports_.back(), address, config, false);
} }
for (const std::string& address : admin) for (const std::string& address : admin)
{ {
ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), options); ports_.emplace_back(global_->io);
any_ssl |= init_port(ports_.back(), address, config, true); any_ssl |= init_port(ports_.back(), address, config, true);
} }
@ -2328,7 +2352,7 @@ namespace lws
rest_server::~rest_server() noexcept rest_server::~rest_server() noexcept
{ {
io_service_.stop(); global_->io.stop();
for (auto& t : workers_) for (auto& t : workers_)
{ {
if (t.joinable()) if (t.joinable())

View file

@ -27,10 +27,10 @@
#pragma once #pragma once
#include <boost/asio/io_service.hpp>
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include <cstddef> #include <cstdint>
#include <list> #include <list>
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -41,19 +41,20 @@
namespace lws namespace lws
{ {
struct rest_server_data;
class rest_server class rest_server
{ {
struct internal; struct internal;
template<typename> struct connection; template<typename> struct connection;
template<typename> struct handler_loop; template<typename> struct handler_loop;
template<typename> struct accept_loop; template<typename> struct accept_loop;
boost::asio::io_service io_service_; //!< Put first so its destroyed last std::unique_ptr<rest_server_data> global_;
std::list<internal> ports_; std::list<internal> ports_;
std::vector<boost::thread> workers_; std::vector<boost::thread> workers_;
void run_io(); void run_io();
public: public:
struct configuration struct configuration
{ {
@ -66,14 +67,14 @@ namespace lws
bool disable_admin_auth; bool disable_admin_auth;
bool auto_accept_creation; bool auto_accept_creation;
}; };
explicit rest_server(epee::span<const std::string> addresses, std::vector<std::string> admin, db::storage disk, rpc::client client, configuration config); explicit rest_server(epee::span<const std::string> addresses, std::vector<std::string> admin, db::storage disk, rpc::client client, configuration config);
rest_server(rest_server&&) = delete; rest_server(rest_server&&) = delete;
rest_server(rest_server const&) = delete; rest_server(rest_server const&) = delete;
~rest_server() noexcept; ~rest_server() noexcept;
rest_server& operator=(rest_server&&) = delete; rest_server& operator=(rest_server&&) = delete;
rest_server& operator=(rest_server const&) = delete; rest_server& operator=(rest_server const&) = delete;
}; };

View file

@ -399,7 +399,7 @@ namespace rpc
return {lws::error::bad_daemon_response}; return {lws::error::bad_daemon_response};
} }
expect<net::zmq::async_client> client::make_async_client(boost::asio::io_service& io) const expect<net::zmq::async_client> client::make_async_client(boost::asio::io_context& io) const
{ {
MONERO_PRECOND(ctx != nullptr); MONERO_PRECOND(ctx != nullptr);

View file

@ -26,7 +26,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once #pragma once
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/optional/optional.hpp> #include <boost/optional/optional.hpp>
#include <chrono> #include <chrono>
#include <memory> #include <memory>
@ -139,7 +139,7 @@ namespace rpc
} }
//! \return `async_client` to daemon. Thread safe. //! \return `async_client` to daemon. Thread safe.
expect<net::zmq::async_client> make_async_client(boost::asio::io_service& io) const; expect<net::zmq::async_client> make_async_client(boost::asio::io_context& io) const;
/*! /*!
Queue `message` for sending to daemon. If the queue is full, wait a Queue `message` for sending to daemon. If the queue is full, wait a

View file

@ -27,14 +27,15 @@
#include "client.h" #include "client.h"
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/numeric/conversion/cast.hpp> #include <boost/numeric/conversion/cast.hpp>
#include <chrono> #include <chrono>
#include "common/expect.h" // monero/src #include "common/expect.h" // monero/src
#include "misc_log_ex.h" // monero/contrib/epee/include #include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "rpc/scanner/commands.h" #include "rpc/scanner/commands.h"
#include "rpc/scanner/connection.h" #include "rpc/scanner/connection.h"
#include "rpc/scanner/read_commands.h" #include "rpc/scanner/read_commands.h"
@ -121,9 +122,13 @@ namespace lws { namespace rpc { namespace scanner
{ {
MINFO("Attempting connection to " << self_->server_address_); MINFO("Attempting connection to " << self_->server_address_);
self_->connect_timer_.expires_from_now(connect_timeout); self_->connect_timer_.expires_from_now(connect_timeout);
self_->connect_timer_.async_wait(self_->strand_.wrap(close{self_})); self_->connect_timer_.async_wait(
boost::asio::bind_executor(self_->strand_, close{self_})
);
BOOST_ASIO_CORO_YIELD self_->sock_.async_connect(self_->server_address_, self_->strand_.wrap(*this)); BOOST_ASIO_CORO_YIELD self_->sock_.async_connect(
self_->server_address_, boost::asio::bind_executor(self_->strand_, *this)
);
if (!self_->connect_timer_.cancel() || error) if (!self_->connect_timer_.cancel() || error)
{ {
@ -135,7 +140,9 @@ namespace lws { namespace rpc { namespace scanner
MINFO("Retrying connection in " << std::chrono::seconds{reconnect_interval}.count() << " seconds"); MINFO("Retrying connection in " << std::chrono::seconds{reconnect_interval}.count() << " seconds");
self_->connect_timer_.expires_from_now(reconnect_interval); self_->connect_timer_.expires_from_now(reconnect_interval);
BOOST_ASIO_CORO_YIELD self_->connect_timer_.async_wait(self_->strand_.wrap(*this)); BOOST_ASIO_CORO_YIELD self_->connect_timer_.async_wait(
boost::asio::bind_executor(self_->strand_, *this)
);
} }
MINFO("Connection made to " << self_->server_address_); MINFO("Connection made to " << self_->server_address_);
@ -147,7 +154,7 @@ namespace lws { namespace rpc { namespace scanner
} }
}; };
client::client(boost::asio::io_service& io, const std::string& address, std::string pass, std::vector<std::shared_ptr<queue>> local) client::client(boost::asio::io_context& io, const std::string& address, std::string pass, std::vector<std::shared_ptr<queue>> local)
: connection(io), : connection(io),
local_(std::move(local)), local_(std::move(local)),
pass_(std::move(pass)), pass_(std::move(pass)),
@ -182,11 +189,14 @@ namespace lws { namespace rpc { namespace scanner
{ {
if (!self) if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self] () boost::asio::dispatch(
self->strand_,
[self] ()
{ {
if (!self->sock_.is_open()) if (!self->sock_.is_open())
connector{self}(); connector{self}();
}); }
);
} }
void client::push_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users) void client::push_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users)
@ -194,7 +204,9 @@ namespace lws { namespace rpc { namespace scanner
if (!self) if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self, users = std::move(users)] () mutable boost::asio::dispatch(
self->strand_,
[self, users = std::move(users)] () mutable
{ {
/* Keep this algorithm simple, one user at a time. A little more difficult /* Keep this algorithm simple, one user at a time. A little more difficult
to do multiples at once */ to do multiples at once */
@ -207,7 +219,8 @@ namespace lws { namespace rpc { namespace scanner
); );
self->next_push_ %= self->local_.size(); self->next_push_ %= self->local_.size();
} }
}); }
);
} }
void client::replace_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users) void client::replace_accounts(const std::shared_ptr<client>& self, std::vector<lws::account> users)
@ -215,7 +228,9 @@ namespace lws { namespace rpc { namespace scanner
if (!self) if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self, users = std::move(users)] () mutable boost::asio::dispatch(
self->strand_,
[self, users = std::move(users)] () mutable
{ {
MINFO("Received " << users.size() << " accounts as new workload"); MINFO("Received " << users.size() << " accounts as new workload");
for (std::size_t i = 0; i < self->local_.size(); ++i) for (std::size_t i = 0; i < self->local_.size(); ++i)
@ -230,7 +245,8 @@ namespace lws { namespace rpc { namespace scanner
self->local_[i]->replace_accounts(std::move(next)); self->local_[i]->replace_accounts(std::move(next));
} }
self->next_push_ = 0; self->next_push_ = 0;
}); }
);
} }
void client::send_update(const std::shared_ptr<client>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks) void client::send_update(const std::shared_ptr<client>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks)
@ -238,17 +254,20 @@ namespace lws { namespace rpc { namespace scanner
if (!self) if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () mutable boost::asio::dispatch(
self->strand_,
[self, users = std::move(users), blocks = std::move(blocks)] () mutable
{ {
if (!self->connected_) if (!self->connected_)
MONERO_THROW(common_error::kInvalidArgument, "not connected"); MONERO_THROW(common_error::kInvalidArgument, "not connected");
write_command(self, update_accounts{std::move(users), std::move(blocks)}); write_command(self, update_accounts{std::move(users), std::move(blocks)});
}); }
);
} }
void client::cleanup() void client::cleanup()
{ {
base_cleanup(); base_cleanup();
GET_IO_SERVICE(sock_).stop(); context().stop();
} }
}}} // lws // rpc // scanner }}} // lws // rpc // scanner

View file

@ -26,7 +26,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once #pragma once
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <cstddef> #include <cstddef>
@ -81,7 +81,7 @@ namespace lws { namespace rpc { namespace scanner
//! Send `users` upstream for disk storage //! Send `users` upstream for disk storage
static void send_update(const std::shared_ptr<client>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks); static void send_update(const std::shared_ptr<client>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks);
//! Closes socket and calls stop on `io_service`. //! Closes socket and calls stop on `io_context`.
void cleanup(); void cleanup();
}; };
}}} // lws // rpc // scanner }}} // lws // rpc // scanner

View file

@ -31,7 +31,7 @@
namespace lws { namespace rpc { namespace scanner namespace lws { namespace rpc { namespace scanner
{ {
connection::connection(boost::asio::io_service& io) connection::connection(boost::asio::io_context& io)
: read_buf_(), : read_buf_(),
write_bufs_(), write_bufs_(),
sock_(io), sock_(io),

View file

@ -28,7 +28,7 @@
#include <atomic> #include <atomic>
#include <boost/asio/buffer.hpp> #include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
@ -50,13 +50,15 @@ namespace lws { namespace rpc { namespace scanner
std::deque<epee::byte_slice> write_bufs_; std::deque<epee::byte_slice> write_bufs_;
boost::asio::ip::tcp::socket sock_; boost::asio::ip::tcp::socket sock_;
boost::asio::steady_timer write_timeout_; boost::asio::steady_timer write_timeout_;
boost::asio::io_service::strand strand_; boost::asio::io_context::strand strand_;
header next_; header next_;
bool cleanup_; bool cleanup_;
explicit connection(boost::asio::io_service& io); explicit connection(boost::asio::io_context& io);
~connection(); ~connection();
boost::asio::io_context& context() const { return strand_.context(); }
boost::asio::ip::tcp::endpoint remote_endpoint(); boost::asio::ip::tcp::endpoint remote_endpoint();
//! \return ASIO compatible read buffer of `size`. //! \return ASIO compatible read buffer of `size`.

View file

@ -26,7 +26,9 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once #pragma once
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/read.hpp> #include <boost/asio/read.hpp>
#include <cstring> #include <cstring>
#include <limits> #include <limits>
@ -120,11 +122,15 @@ namespace lws { namespace rpc { namespace scanner
for (;;) // multiple commands for (;;) // multiple commands
{ {
// indefinite read timeout (waiting for next command) // indefinite read timeout (waiting for next command)
BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(sizeof(self_->next_)), self_->strand_.wrap(*this)); BOOST_ASIO_CORO_YIELD boost::asio::async_read(
self_->sock_, self_->read_buffer(sizeof(self_->next_)), boost::asio::bind_executor(self_->strand_, *this)
);
std::memcpy(std::addressof(self_->next_), self_->read_buf_.data(), sizeof(self_->next_)); std::memcpy(std::addressof(self_->next_), self_->read_buf_.data(), sizeof(self_->next_));
static_assert(std::numeric_limits<header::length_type::value_type>::max() <= std::numeric_limits<std::size_t>::max()); static_assert(std::numeric_limits<header::length_type::value_type>::max() <= std::numeric_limits<std::size_t>::max());
BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(self_->next_.length.value()), self_->strand_.wrap(*this)); BOOST_ASIO_CORO_YIELD boost::asio::async_read(
self_->sock_, self_->read_buffer(self_->next_.length.value()), boost::asio::bind_executor(self_->strand_, *this)
);
const auto& commands = T::commands(); const auto& commands = T::commands();
if (commands.size() <= self_->next_.id || !commands[self_->next_.id](self_)) if (commands.size() <= self_->next_.id || !commands[self_->next_.id](self_))
@ -142,7 +148,7 @@ namespace lws { namespace rpc { namespace scanner
{ {
if (!self) if (!self)
return false; return false;
self->strand_.dispatch(do_read_commands{self}); boost::asio::dispatch(self->strand_, do_read_commands{self});
return true; return true;
} }
}}} // lws // rpc // scanner }}} // lws // rpc // scanner

View file

@ -27,6 +27,7 @@
#include "server.h" #include "server.h"
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp> #include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
@ -39,7 +40,6 @@
#include "common/expect.h" // monero/src/ #include "common/expect.h" // monero/src/
#include "error.h" #include "error.h"
#include "misc_log_ex.h" // monero/contrib/epee/include #include "misc_log_ex.h" // monero/contrib/epee/include
#include "net/net_utils_base.h" // monero/contrib/epee/include
#include "rpc/scanner/commands.h" #include "rpc/scanner/commands.h"
#include "rpc/scanner/connection.h" #include "rpc/scanner/connection.h"
#include "rpc/scanner/read_commands.h" #include "rpc/scanner/read_commands.h"
@ -80,7 +80,7 @@ namespace lws { namespace rpc { namespace scanner
std::size_t threads_; //!< Number of scan threads at remote process std::size_t threads_; //!< Number of scan threads at remote process
public: public:
explicit server_connection(std::shared_ptr<server> parent, boost::asio::io_service& io) explicit server_connection(std::shared_ptr<server> parent, boost::asio::io_context& io)
: connection(io), : connection(io),
parent_(std::move(parent)), parent_(std::move(parent)),
threads_(0) threads_(0)
@ -179,8 +179,10 @@ namespace lws { namespace rpc { namespace scanner
{ {
for (;;) for (;;)
{ {
next_ = std::make_shared<server_connection>(self_, GET_IO_SERVICE(self_->check_timer_)); next_ = std::make_shared<server_connection>(self_, self_->strand_.context());
BOOST_ASIO_CORO_YIELD self_->acceptor_.async_accept(next_->sock_, self_->strand_.wrap(*this)); BOOST_ASIO_CORO_YIELD self_->acceptor_.async_accept(
next_->sock_, boost::asio::bind_executor(self_->strand_, *this)
);
MINFO("New connection to " << next_->remote_endpoint() << " / " << next_.get()); MINFO("New connection to " << next_->remote_endpoint() << " / " << next_.get());
@ -202,7 +204,7 @@ namespace lws { namespace rpc { namespace scanner
assert(self_->strand_.running_in_this_thread()); assert(self_->strand_.running_in_this_thread());
self_->check_timer_.expires_from_now(account_poll_interval); self_->check_timer_.expires_from_now(account_poll_interval);
self_->check_timer_.async_wait(self_->strand_.wrap(*this)); self_->check_timer_.async_wait(boost::asio::bind_executor(self_->strand_, *this));
std::size_t total_threads = self_->local_.size(); std::size_t total_threads = self_->local_.size();
std::vector<std::shared_ptr<server_connection>> remotes{}; std::vector<std::shared_ptr<server_connection>> remotes{};
@ -456,7 +458,7 @@ namespace lws { namespace rpc { namespace scanner
}; };
} }
server::server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify) server::server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify)
: strand_(io), : strand_(io),
check_timer_(io), check_timer_(io),
acceptor_(io), acceptor_(io),
@ -517,7 +519,9 @@ namespace lws { namespace rpc { namespace scanner
return; return;
auto endpoint = get_endpoint(address); auto endpoint = get_endpoint(address);
self->strand_.dispatch([self, endpoint = std::move(endpoint), pass = std::move(pass)] () boost::asio::dispatch(
self->strand_,
[self, endpoint = std::move(endpoint), pass = std::move(pass)] ()
{ {
self->acceptor_.close(); self->acceptor_.close();
self->acceptor_.open(endpoint.protocol()); self->acceptor_.open(endpoint.protocol());
@ -531,21 +535,22 @@ namespace lws { namespace rpc { namespace scanner
self->compute_hash(self->pass_hashed_, pass); self->compute_hash(self->pass_hashed_, pass);
acceptor{std::move(self)}(); acceptor{std::move(self)}();
}); }
);
} }
void server::start_user_checking(const std::shared_ptr<server>& self) void server::start_user_checking(const std::shared_ptr<server>& self)
{ {
if (!self) if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch(check_users{self}); boost::asio::dispatch(self->strand_, check_users{self});
} }
void server::replace_users(const std::shared_ptr<server>& self) void server::replace_users(const std::shared_ptr<server>& self)
{ {
if (!self) if (!self)
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
self->strand_.dispatch([self] () { self->do_replace_users(); }); boost::asio::dispatch(self->strand_, [self] () { self->do_replace_users(); });
} }
void server::store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks) void server::store(const std::shared_ptr<server>& self, std::vector<lws::account> users, std::vector<crypto::hash> blocks)
@ -554,7 +559,9 @@ namespace lws { namespace rpc { namespace scanner
MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); MONERO_THROW(common_error::kInvalidArgument, "nullptr self");
std::sort(users.begin(), users.end(), by_height{}); std::sort(users.begin(), users.end(), by_height{});
self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () boost::asio::dispatch(
self->strand_,
[self, users = std::move(users), blocks = std::move(blocks)] ()
{ {
const lws::scanner_options opts{self->webhook_verify_, false, false}; const lws::scanner_options opts{self->webhook_verify_, false, false};
if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts)) if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts))

View file

@ -27,7 +27,7 @@
#pragma once #pragma once
#include <array> #include <array>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
@ -57,7 +57,7 @@ namespace lws { namespace rpc { namespace scanner
needed (basically a REST server on either end). */ needed (basically a REST server on either end). */
class server class server
{ {
boost::asio::io_service::strand strand_; boost::asio::io_context::strand strand_;
boost::asio::steady_timer check_timer_; boost::asio::steady_timer check_timer_;
boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::acceptor acceptor_;
std::set<std::weak_ptr<server_connection>, std::owner_less<std::weak_ptr<server_connection>>> remote_; std::set<std::weak_ptr<server_connection>, std::owner_less<std::weak_ptr<server_connection>>> remote_;
@ -85,7 +85,7 @@ namespace lws { namespace rpc { namespace scanner
public: public:
static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address); static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address);
explicit server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify); explicit server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector<std::shared_ptr<queue>> local, std::vector<db::account_id> active, ssl_verification_t webhook_verify);
server(const server&) = delete; server(const server&) = delete;
server(server&&) = delete; server(server&&) = delete;

View file

@ -26,6 +26,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once #pragma once
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/coroutine.hpp> #include <boost/asio/coroutine.hpp>
#include <boost/asio/dispatch.hpp> #include <boost/asio/dispatch.hpp>
#include <boost/asio/write.hpp> #include <boost/asio/write.hpp>
@ -118,8 +119,10 @@ namespace lws { namespace rpc { namespace scanner
while (!self_->write_bufs_.empty()) while (!self_->write_bufs_.empty())
{ {
self_->write_timeout_.expires_from_now(std::chrono::seconds{10}); self_->write_timeout_.expires_from_now(std::chrono::seconds{10});
self_->write_timeout_.async_wait(self_->strand_.wrap(timeout<T>{self_})); self_->write_timeout_.async_wait(boost::asio::bind_executor(self_->strand_, timeout<T>{self_}));
BOOST_ASIO_CORO_YIELD boost::asio::async_write(self_->sock_, self_->write_buffer(), self_->strand_.wrap(*this)); BOOST_ASIO_CORO_YIELD boost::asio::async_write(
self_->sock_, self_->write_buffer(), boost::asio::bind_executor(self_->strand_, *this)
);
self_->write_timeout_.cancel(); self_->write_timeout_.cancel();
self_->write_bufs_.pop_front(); self_->write_bufs_.pop_front();
} }
@ -205,6 +208,6 @@ namespace lws { namespace rpc { namespace scanner
} }
}; };
self->strand_.dispatch(queue_slice{self, std::move(msg)}); boost::asio::dispatch(self->strand_, queue_slice{self, std::move(msg)});
} }
}}} // lws // rpc // scanner }}} // lws // rpc // scanner

View file

@ -1080,7 +1080,7 @@ namespace lws
{ {
// Run possible SIGINT handler // Run possible SIGINT handler
self.io_.poll_one(); self.io_.poll_one();
self.io_.reset(); self.io_.restart();
if (self.has_shutdown()) if (self.has_shutdown())
return {lws::error::signal_abort_process}; return {lws::error::signal_abort_process};
@ -1098,7 +1098,7 @@ namespace lws
{ {
// Run possible SIGINT handler // Run possible SIGINT handler
self.io_.poll_one(); self.io_.poll_one();
self.io_.reset(); self.io_.restart();
if (self.has_shutdown()) if (self.has_shutdown())
return {lws::error::signal_abort_process}; return {lws::error::signal_abort_process};
@ -1334,11 +1334,11 @@ namespace lws
thread_count = std::max(std::size_t(1), thread_count); thread_count = std::max(std::size_t(1), thread_count);
/*! \NOTE Be careful about references and lifetimes of the callbacks. The /*! \NOTE Be careful about references and lifetimes of the callbacks. The
ones below are safe because no `io_service::run()` call is after the ones below are safe because no `io_context::run()` call is after the
destruction of the references. destruction of the references.
\NOTE That `ctx` will need a strand or lock if multiple \NOTE That `ctx` will need a strand or lock if multiple
`io_service::run()` calls are used. */ `io_context::run()` calls are used. */
boost::asio::steady_timer rate_timer{sync_.io_}; boost::asio::steady_timer rate_timer{sync_.io_};
class rate_updater class rate_updater

View file

@ -27,7 +27,7 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <boost/asio/io_service.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp> #include <boost/asio/signal_set.hpp>
#include <boost/optional/optional.hpp> #include <boost/optional/optional.hpp>
#include <cstdint> #include <cstdint>
@ -77,7 +77,7 @@ namespace lws
struct scanner_sync struct scanner_sync
{ {
boost::asio::io_service io_; boost::asio::io_context io_;
std::atomic<bool> stop_; //!< Stop scanning but do not shutdown std::atomic<bool> stop_; //!< Stop scanning but do not shutdown
std::atomic<bool> shutdown_; //!< Exit scanner::run std::atomic<bool> shutdown_; //!< Exit scanner::run