mirror of
https://github.com/vtnerd/monero-lws.git
synced 2025-01-05 02:09:24 +00:00
/get_random_outs is now fully async using stackful coroutines (#142)
Some checks are pending
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-13, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-13, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-14, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-14, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-latest, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-latest, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=OFF) (push) Waiting to run
Some checks are pending
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-latest, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-13, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-13, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-14, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-14, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (macos-latest, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (macos-latest, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-20.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=OFF) (push) Waiting to run
unix-ci / build-tests (ubuntu-24.04, WITH_RMQ=ON) (push) Waiting to run
unix-ci / build-tests (ubuntu-22.04, WITH_RMQ=OFF) (push) Waiting to run
This commit is contained in:
parent
cd80138722
commit
ef78f19986
4 changed files with 318 additions and 179 deletions
|
@ -161,7 +161,7 @@ if(STATIC)
|
||||||
set(Boost_USE_STATIC_LIBS ON)
|
set(Boost_USE_STATIC_LIBS ON)
|
||||||
set(Boost_USE_STATIC_RUNTIME ON)
|
set(Boost_USE_STATIC_RUNTIME ON)
|
||||||
endif()
|
endif()
|
||||||
find_package(Boost 1.70 QUIET REQUIRED COMPONENTS chrono filesystem program_options regex serialization system thread)
|
find_package(Boost 1.70 QUIET REQUIRED COMPONENTS chrono coroutine filesystem program_options regex serialization system thread)
|
||||||
|
|
||||||
if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE))
|
if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE))
|
||||||
message(STATUS "Found Boost_THREAD_LIBRARY: ${Boost_THREAD_LIBRARY}")
|
message(STATUS "Found Boost_THREAD_LIBRARY: ${Boost_THREAD_LIBRARY}")
|
||||||
|
|
|
@ -56,6 +56,7 @@ target_link_libraries(monero-lws-daemon-common
|
||||||
monero-lws-wire-json
|
monero-lws-wire-json
|
||||||
monero-lws-util
|
monero-lws-util
|
||||||
${Boost_CHRONO_LIBRARY}
|
${Boost_CHRONO_LIBRARY}
|
||||||
|
${Boost_COROUTINE_LIBRARY}
|
||||||
${Boost_PROGRAM_OPTIONS_LIBRARY}
|
${Boost_PROGRAM_OPTIONS_LIBRARY}
|
||||||
${Boost_SYSTEM_LIBRARY}
|
${Boost_SYSTEM_LIBRARY}
|
||||||
${Boost_THREAD_LIBRARY}
|
${Boost_THREAD_LIBRARY}
|
||||||
|
|
|
@ -143,20 +143,20 @@ namespace net { namespace zmq
|
||||||
|
|
||||||
//! Cannot have an `async_read` and `async_write` at same time (edge trigger)
|
//! Cannot have an `async_read` and `async_write` at same time (edge trigger)
|
||||||
template<typename F>
|
template<typename F>
|
||||||
void async_read(async_client& sock, std::string& buffer, F&& f)
|
auto async_read(async_client& sock, std::string& buffer, F&& f)
|
||||||
{
|
{
|
||||||
// async_compose is required for correct strand invocation, etc
|
// async_compose is required for correct strand invocation, etc
|
||||||
boost::asio::async_compose<F, void(boost::system::error_code, std::size_t)>(
|
return boost::asio::async_compose<F, void(boost::system::error_code, std::size_t)>(
|
||||||
read_msg_op{sock, buffer}, f, *sock.asock
|
read_msg_op{sock, buffer}, f, *sock.asock
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
//! Cannot have an `async_write` and `async_read` at same time (edge trigger)
|
//! Cannot have an `async_write` and `async_read` at same time (edge trigger)
|
||||||
template<typename F>
|
template<typename F>
|
||||||
void async_write(async_client& sock, epee::byte_slice msg, F&& f)
|
auto async_write(async_client& sock, epee::byte_slice msg, F&& f)
|
||||||
{
|
{
|
||||||
// async_compose is required for correct strand invocation, etc
|
// async_compose is required for correct strand invocation, etc
|
||||||
boost::asio::async_compose<F, void(boost::system::error_code, std::size_t)>(
|
return boost::asio::async_compose<F, void(boost::system::error_code, std::size_t)>(
|
||||||
write_msg_op{sock, std::move(msg)}, f, *sock.asock
|
write_msg_op{sock, std::move(msg)}, f, *sock.asock
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <boost/asio/ip/tcp.hpp>
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
#include <boost/asio/io_service.hpp>
|
#include <boost/asio/io_service.hpp>
|
||||||
|
#include <boost/asio/spawn.hpp>
|
||||||
#include <boost/asio/steady_timer.hpp>
|
#include <boost/asio/steady_timer.hpp>
|
||||||
#include <boost/asio/strand.hpp>
|
#include <boost/asio/strand.hpp>
|
||||||
#include <boost/beast/core/error.hpp>
|
#include <boost/beast/core/error.hpp>
|
||||||
|
@ -116,66 +117,6 @@ namespace lws
|
||||||
};
|
};
|
||||||
using async_complete = void(expect<copyable_slice>);
|
using async_complete = void(expect<copyable_slice>);
|
||||||
|
|
||||||
expect<rpc::client*> thread_client(const rpc::client& gclient, const bool reset = false)
|
|
||||||
{
|
|
||||||
struct tclient
|
|
||||||
{
|
|
||||||
rpc::client client;
|
|
||||||
std::chrono::steady_clock::time_point last_connect;
|
|
||||||
|
|
||||||
explicit tclient() noexcept
|
|
||||||
: client(), last_connect(std::chrono::seconds{0})
|
|
||||||
{}
|
|
||||||
};
|
|
||||||
static boost::thread_specific_ptr<tclient> global;
|
|
||||||
|
|
||||||
tclient* thread_ptr = global.get();
|
|
||||||
if (!thread_ptr)
|
|
||||||
{
|
|
||||||
thread_ptr = new tclient;
|
|
||||||
global.reset(thread_ptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reset || !thread_ptr->client)
|
|
||||||
{
|
|
||||||
// This reduces ZMQ internal errors with lack of file descriptors
|
|
||||||
const auto now = std::chrono::steady_clock::now();
|
|
||||||
if (now - thread_ptr->last_connect < zmq_reconnect_backoff)
|
|
||||||
return {error::daemon_timeout};
|
|
||||||
|
|
||||||
// careful, gclient and thread_ptr->client could be aliased
|
|
||||||
expect<rpc::client> new_client = gclient.clone();
|
|
||||||
thread_ptr->client = rpc::client{};
|
|
||||||
thread_ptr->last_connect = now;
|
|
||||||
if (!new_client)
|
|
||||||
return new_client.error();
|
|
||||||
|
|
||||||
thread_ptr->client = std::move(*new_client);
|
|
||||||
}
|
|
||||||
|
|
||||||
return {std::addressof(thread_ptr->client)};
|
|
||||||
}
|
|
||||||
|
|
||||||
expect<void> send_with_retry(rpc::client& tclient, epee::byte_slice message, const std::chrono::seconds timeout)
|
|
||||||
{
|
|
||||||
expect<void> resp{common_error::kInvalidArgument};
|
|
||||||
for (unsigned i = 0; i < 2; ++i)
|
|
||||||
{
|
|
||||||
resp = tclient.send(message.clone(), timeout);
|
|
||||||
if (resp || resp != net::zmq::make_error_code(EFSM))
|
|
||||||
break;
|
|
||||||
// fix state machine by reading+discarding previously timed out response
|
|
||||||
auto read = tclient.get_message(timeout);
|
|
||||||
if (!read)
|
|
||||||
{
|
|
||||||
// message could've been delivered, then dropped in process failure
|
|
||||||
thread_client(tclient, true);
|
|
||||||
return read.error();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return resp;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_locked(std::uint64_t unlock_time, db::block_id last) noexcept
|
bool is_locked(std::uint64_t unlock_time, db::block_id last) noexcept
|
||||||
{
|
{
|
||||||
if (unlock_time > CRYPTONOTE_MAX_BLOCK_NUMBER)
|
if (unlock_time > CRYPTONOTE_MAX_BLOCK_NUMBER)
|
||||||
|
@ -316,7 +257,7 @@ namespace lws
|
||||||
using response = epee::byte_slice; // sometimes async
|
using response = epee::byte_slice; // sometimes async
|
||||||
using async_response = rpc::daemon_status_response;
|
using async_response = rpc::daemon_status_response;
|
||||||
|
|
||||||
static expect<response> handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete> resume)
|
static expect<response> handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume)
|
||||||
{
|
{
|
||||||
using info_rpc = cryptonote::rpc::GetInfo;
|
using info_rpc = cryptonote::rpc::GetInfo;
|
||||||
|
|
||||||
|
@ -507,7 +448,7 @@ namespace lws
|
||||||
using request = rpc::account_credentials;
|
using request = rpc::account_credentials;
|
||||||
using response = rpc::get_address_info_response;
|
using response = rpc::get_address_info_response;
|
||||||
|
|
||||||
static expect<response> handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>)
|
static expect<response> handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&)
|
||||||
{
|
{
|
||||||
auto user = open_account(req, data.disk.clone());
|
auto user = open_account(req, data.disk.clone());
|
||||||
if (!user)
|
if (!user)
|
||||||
|
@ -581,7 +522,7 @@ namespace lws
|
||||||
using request = rpc::account_credentials;
|
using request = rpc::account_credentials;
|
||||||
using response = rpc::get_address_txs_response;
|
using response = rpc::get_address_txs_response;
|
||||||
|
|
||||||
static expect<response> handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>)
|
static expect<response> handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&)
|
||||||
{
|
{
|
||||||
auto user = open_account(req, data.disk.clone());
|
auto user = open_account(req, data.disk.clone());
|
||||||
if (!user)
|
if (!user)
|
||||||
|
@ -705,62 +646,200 @@ namespace lws
|
||||||
using response = void; // always asynchronous response
|
using response = void; // always asynchronous response
|
||||||
using async_response = rpc::get_random_outs_response;
|
using async_response = rpc::get_random_outs_response;
|
||||||
|
|
||||||
static expect<response> handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function<async_complete> resume)
|
static expect<response> handle(request&& req, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume)
|
||||||
{
|
{
|
||||||
using distribution_rpc = cryptonote::rpc::GetOutputDistribution;
|
using distribution_rpc = cryptonote::rpc::GetOutputDistribution;
|
||||||
using histogram_rpc = cryptonote::rpc::GetOutputHistogram;
|
using histogram_rpc = cryptonote::rpc::GetOutputHistogram;
|
||||||
using distribution_rpc = cryptonote::rpc::GetOutputDistribution;
|
using distribution_rpc = cryptonote::rpc::GetOutputDistribution;
|
||||||
|
|
||||||
std::vector<std::uint64_t> amounts = std::move(req.amounts.values);
|
if (50 < req.count || 20 < req.amounts.values.size())
|
||||||
|
|
||||||
if (50 < req.count || 20 < amounts.size())
|
|
||||||
return {lws::error::exceeded_rest_request_limit};
|
return {lws::error::exceeded_rest_request_limit};
|
||||||
|
|
||||||
const expect<rpc::client*> tclient = thread_client(data.client);
|
std::sort(req.amounts.values.begin(), req.amounts.values.end(), std::greater<>{});
|
||||||
if (!tclient)
|
|
||||||
return tclient.error();
|
|
||||||
if (*tclient == nullptr)
|
|
||||||
throw std::logic_error{"Unexpected rpc::client nullptr"};
|
|
||||||
|
|
||||||
const std::greater<std::uint64_t> rsort{};
|
struct frame
|
||||||
std::sort(amounts.begin(), amounts.end(), rsort);
|
{
|
||||||
const std::size_t ringct_count =
|
rest_server_data* parent;
|
||||||
amounts.end() - std::lower_bound(amounts.begin(), amounts.end(), 0, rsort);
|
net::zmq::async_client client;
|
||||||
|
boost::asio::steady_timer timer;
|
||||||
|
boost::asio::strand<boost::asio::io_context::executor_type> strand;
|
||||||
|
std::deque<std::pair<request, std::function<async_complete>>> resumers;
|
||||||
|
|
||||||
|
frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client)
|
||||||
|
: parent(std::addressof(parent)),
|
||||||
|
client(std::move(client)),
|
||||||
|
timer(io),
|
||||||
|
strand(io.get_executor()),
|
||||||
|
resumers()
|
||||||
|
{}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct cached_result
|
||||||
|
{
|
||||||
|
std::weak_ptr<frame> status;
|
||||||
|
boost::mutex sync;
|
||||||
|
|
||||||
|
cached_result() noexcept
|
||||||
|
: status(), sync()
|
||||||
|
{}
|
||||||
|
};
|
||||||
|
|
||||||
|
static cached_result cache;
|
||||||
|
boost::unique_lock<boost::mutex> lock{cache.sync};
|
||||||
|
|
||||||
|
auto active = cache.status.lock();
|
||||||
|
if (active)
|
||||||
|
{
|
||||||
|
active->resumers.emplace_back(std::move(req), std::move(resume));
|
||||||
|
return success();
|
||||||
|
}
|
||||||
|
|
||||||
|
struct async_handler
|
||||||
|
{
|
||||||
|
std::shared_ptr<frame> self_;
|
||||||
|
|
||||||
|
explicit async_handler(std::shared_ptr<frame> self)
|
||||||
|
: self_(std::move(self))
|
||||||
|
{}
|
||||||
|
|
||||||
|
void send_response(const boost::system::error_code error, expect<copyable_slice> value) const
|
||||||
|
{
|
||||||
|
assert(self_ != nullptr);
|
||||||
|
|
||||||
|
std::deque<std::pair<request, std::function<async_complete>>> resumers;
|
||||||
|
{
|
||||||
|
const boost::lock_guard<boost::mutex> lock{cache.sync};
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
// Prevent further resumers, ZMQ REQ/REP in bad state
|
||||||
|
MERROR("Failure in /get_random_outs: " << error.message());
|
||||||
|
if (value)
|
||||||
|
value = {lws::error::daemon_timeout};
|
||||||
|
cache.status.reset();
|
||||||
|
resumers.swap(self_->resumers);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MDEBUG("Completed ZMQ request in /get_random_outs");
|
||||||
|
resumers.push_back(std::move(self_->resumers.front()));
|
||||||
|
self_->resumers.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& r : resumers)
|
||||||
|
r.second(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool set_timeout(std::chrono::steady_clock::duration timeout, const bool expecting) const
|
||||||
|
{
|
||||||
|
assert(self_ != nullptr);
|
||||||
|
if (!self_->timer.expires_after(timeout) && expecting)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
auto& self = self_;
|
||||||
|
self->timer.async_wait(
|
||||||
|
[self] (boost::system::error_code error)
|
||||||
|
{
|
||||||
|
if (error == boost::asio::error::operation_aborted)
|
||||||
|
return;
|
||||||
|
|
||||||
|
self->strand.dispatch(
|
||||||
|
[self] ()
|
||||||
|
{
|
||||||
|
boost::system::error_code error{};
|
||||||
|
MWARNING("Timeout on /get_random_outs ZMQ call");
|
||||||
|
self->client.close = true;
|
||||||
|
self->client.asock->cancel(error);
|
||||||
|
},
|
||||||
|
boost::asio::get_associated_allocator(*self)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void operator()(boost::asio::yield_context yield) const
|
||||||
|
{
|
||||||
|
if (!self_)
|
||||||
|
return;
|
||||||
|
|
||||||
|
std::chrono::steady_clock::time_point last{std::chrono::seconds{0}};
|
||||||
|
std::vector<std::uint64_t> distributions{};
|
||||||
|
request next{};
|
||||||
|
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
const boost::lock_guard<boost::mutex> lock{cache.sync};
|
||||||
|
if (self_->resumers.empty())
|
||||||
|
{
|
||||||
|
cache.status.reset();
|
||||||
|
self_->parent->store_async_client(std::move(self_->client));
|
||||||
|
MDEBUG("Finishing ZMQ coroutine in /get_random_outs");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
next = std::move(self_->resumers.front().first);
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::system::error_code error{};
|
||||||
std::vector<lws::histogram> histograms{};
|
std::vector<lws::histogram> histograms{};
|
||||||
if (ringct_count < amounts.size())
|
const std::size_t ringct_count =
|
||||||
|
next.amounts.values.end() -
|
||||||
|
std::lower_bound(
|
||||||
|
next.amounts.values.begin(), next.amounts.values.end(), 0, std::greater<>{}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (ringct_count < next.amounts.values.size())
|
||||||
{
|
{
|
||||||
// reuse allocated vector memory
|
// reuse allocated vector memory
|
||||||
amounts.resize(amounts.size() - ringct_count);
|
next.amounts.values.resize(next.amounts.values.size() - ringct_count);
|
||||||
|
|
||||||
histogram_rpc::Request histogram_req{};
|
histogram_rpc::Request histogram_req{};
|
||||||
histogram_req.amounts = std::move(amounts);
|
histogram_req.amounts = std::move(next.amounts.values);
|
||||||
histogram_req.min_count = 0;
|
histogram_req.min_count = 0;
|
||||||
histogram_req.max_count = 0;
|
histogram_req.max_count = 0;
|
||||||
histogram_req.unlocked = true;
|
histogram_req.unlocked = true;
|
||||||
histogram_req.recent_cutoff = 0;
|
histogram_req.recent_cutoff = 0;
|
||||||
|
|
||||||
epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req);
|
epee::byte_slice msg = rpc::client::make_message("get_output_histogram", histogram_req);
|
||||||
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));
|
|
||||||
|
|
||||||
auto histogram_resp = (*tclient)->receive<histogram_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
|
MDEBUG("Fetching histograms for /get_random_outs");
|
||||||
if (!histogram_resp)
|
set_timeout(std::chrono::seconds{10}, false);
|
||||||
return histogram_resp.error();
|
net::zmq::async_write(self_->client, std::move(msg), yield[error]);
|
||||||
if (histogram_resp->histogram.size() != histogram_req.amounts.size())
|
if (error)
|
||||||
return {lws::error::bad_daemon_response};
|
return send_response(error, async_ready());
|
||||||
|
if (!set_timeout(std::chrono::minutes{3}, true))
|
||||||
|
return send_response(boost::asio::error::operation_aborted, async_ready());
|
||||||
|
|
||||||
histograms = std::move(histogram_resp->histogram);
|
std::string in;
|
||||||
|
net::zmq::async_read(self_->client, in, yield[error]);
|
||||||
|
if (error)
|
||||||
|
return send_response(error, async_ready());
|
||||||
|
if (!self_->timer.cancel(error))
|
||||||
|
return send_response(boost::asio::error::operation_aborted, async_ready());
|
||||||
|
|
||||||
amounts = std::move(histogram_req.amounts);
|
histogram_rpc::Response histogram_resp{};
|
||||||
amounts.insert(amounts.end(), ringct_count, 0);
|
const expect<void> status =
|
||||||
|
rpc::parse_response(histogram_resp, std::move(in));
|
||||||
|
if (!status)
|
||||||
|
return send_response(boost::asio::error::invalid_argument, status.error());
|
||||||
|
if (histogram_resp.histogram.size() != histogram_req.amounts.size())
|
||||||
|
return send_response(boost::asio::error::invalid_argument, {lws::error::bad_daemon_response});
|
||||||
|
|
||||||
|
histograms = std::move(histogram_resp.histogram);
|
||||||
|
|
||||||
|
next.amounts.values = std::move(histogram_req.amounts);
|
||||||
|
next.amounts.values.insert(next.amounts.values.end(), ringct_count, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::uint64_t> distributions{};
|
if (ringct_count && (distributions.empty() || (daemon_cache_timeout < std::chrono::steady_clock::now() - last)))
|
||||||
if (ringct_count)
|
|
||||||
{
|
{
|
||||||
distribution_rpc::Request distribution_req{};
|
distribution_rpc::Request distribution_req{};
|
||||||
if (ringct_count == amounts.size())
|
if (ringct_count == next.amounts.values.size())
|
||||||
distribution_req.amounts = std::move(amounts);
|
{
|
||||||
|
distribution_req.amounts = std::move(next.amounts.values);
|
||||||
|
next.amounts.values.clear();
|
||||||
|
}
|
||||||
|
|
||||||
distribution_req.amounts.resize(1);
|
distribution_req.amounts.resize(1);
|
||||||
distribution_req.from_height = 0;
|
distribution_req.from_height = 0;
|
||||||
|
@ -769,24 +848,41 @@ namespace lws
|
||||||
|
|
||||||
epee::byte_slice msg =
|
epee::byte_slice msg =
|
||||||
rpc::client::make_message("get_output_distribution", distribution_req);
|
rpc::client::make_message("get_output_distribution", distribution_req);
|
||||||
MONERO_CHECK(send_with_retry(**tclient, std::move(msg), std::chrono::seconds{10}));
|
|
||||||
|
|
||||||
auto distribution_resp =
|
MDEBUG("Fetching distributions for /get_random_outs");
|
||||||
(*tclient)->receive<distribution_rpc::Response>(std::chrono::minutes{3}, MLWS_CURRENT_LOCATION);
|
set_timeout(std::chrono::seconds{10}, false);
|
||||||
if (!distribution_resp)
|
net::zmq::async_write(self_->client, std::move(msg), yield[error]);
|
||||||
return distribution_resp.error();
|
if (error)
|
||||||
|
return send_response(error, async_ready());
|
||||||
|
if (!set_timeout(std::chrono::minutes{3}, true))
|
||||||
|
return send_response(boost::asio::error::operation_aborted, async_ready());
|
||||||
|
|
||||||
if (distribution_resp->distributions.size() != 1)
|
std::string in;
|
||||||
return {lws::error::bad_daemon_response};
|
net::zmq::async_read(self_->client, in, yield[error]);
|
||||||
if (distribution_resp->distributions[0].amount != 0)
|
if (error)
|
||||||
return {lws::error::bad_daemon_response};
|
return send_response(error, async_ready());
|
||||||
|
if (!self_->timer.cancel(error))
|
||||||
|
return send_response(boost::asio::error::operation_aborted, async_ready());
|
||||||
|
|
||||||
distributions = std::move(distribution_resp->distributions[0].data.distribution);
|
distribution_rpc::Response distribution_resp{};
|
||||||
|
const expect<void> status =
|
||||||
|
rpc::parse_response(distribution_resp, std::move(in));
|
||||||
|
if (!status)
|
||||||
|
return send_response(boost::asio::error::invalid_argument, status.error());
|
||||||
|
if (distribution_resp.distributions.size() != 1)
|
||||||
|
return send_response(boost::asio::error::invalid_argument, {lws::error::bad_daemon_response});
|
||||||
|
if (distribution_resp.distributions[0].amount != 0)
|
||||||
|
return send_response(boost::asio::error::invalid_argument, {lws::error::bad_daemon_response});
|
||||||
|
|
||||||
if (amounts.empty())
|
last = std::chrono::steady_clock::now();
|
||||||
|
distributions = std::move(distribution_resp.distributions[0].data.distribution);
|
||||||
|
|
||||||
|
if (next.amounts.values.empty())
|
||||||
{
|
{
|
||||||
amounts = std::move(distribution_req.amounts);
|
next.amounts.values = std::move(distribution_req.amounts);
|
||||||
amounts.insert(amounts.end(), ringct_count - 1, 0);
|
next.amounts.values.insert(
|
||||||
|
next.amounts.values.end(), ringct_count - 1, 0
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -796,10 +892,12 @@ namespace lws
|
||||||
const and copied in the function instead of using a reference to
|
const and copied in the function instead of using a reference to
|
||||||
make the callback in `std::function` thread-safe. This shouldn't
|
make the callback in `std::function` thread-safe. This shouldn't
|
||||||
be a problem now, but this is just-in-case of a future refactor. */
|
be a problem now, but this is just-in-case of a future refactor. */
|
||||||
rpc::client* tclient;
|
async_handler self_;
|
||||||
|
boost::asio::yield_context yield_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
zmq_fetch_keys(rpc::client* src) noexcept
|
zmq_fetch_keys(async_handler self, boost::asio::yield_context yield)
|
||||||
: tclient(src)
|
: self_(std::move(self)), yield_(std::move(yield))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
zmq_fetch_keys(zmq_fetch_keys&&) = default;
|
zmq_fetch_keys(zmq_fetch_keys&&) = default;
|
||||||
|
@ -808,35 +906,75 @@ namespace lws
|
||||||
expect<std::vector<output_keys>> operator()(std::vector<lws::output_ref> ids) const
|
expect<std::vector<output_keys>> operator()(std::vector<lws::output_ref> ids) const
|
||||||
{
|
{
|
||||||
using get_keys_rpc = cryptonote::rpc::GetOutputKeys;
|
using get_keys_rpc = cryptonote::rpc::GetOutputKeys;
|
||||||
if (tclient == nullptr)
|
if (self_.self_ == nullptr)
|
||||||
throw std::logic_error{"Unexpected nullptr in zmq_fetch_keys"};
|
throw std::logic_error{"Unexpected nullptr in zmq_fetch_keys"};
|
||||||
|
|
||||||
|
boost::system::error_code error{};
|
||||||
get_keys_rpc::Request keys_req{};
|
get_keys_rpc::Request keys_req{};
|
||||||
keys_req.outputs = std::move(ids);
|
keys_req.outputs = std::move(ids);
|
||||||
|
|
||||||
epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req);
|
epee::byte_slice msg = rpc::client::make_message("get_output_keys", keys_req);
|
||||||
MONERO_CHECK(send_with_retry(*tclient, std::move(msg), std::chrono::seconds{10}));
|
|
||||||
|
|
||||||
auto keys_resp = tclient->receive<get_keys_rpc::Response>(std::chrono::seconds{10}, MLWS_CURRENT_LOCATION);
|
self_.set_timeout(std::chrono::seconds{10}, false);
|
||||||
if (!keys_resp)
|
net::zmq::async_write(self_.self_->client, std::move(msg), yield_[error]);
|
||||||
return keys_resp.error();
|
|
||||||
|
|
||||||
return {std::move(keys_resp->keys)};
|
if (error)
|
||||||
|
{
|
||||||
|
MERROR("Internal ZMQ error in /get_random_outs: " << error.message());
|
||||||
|
return {error::daemon_timeout};
|
||||||
|
}
|
||||||
|
if (!self_.set_timeout(std::chrono::seconds{10}, true))
|
||||||
|
return {error::daemon_timeout};
|
||||||
|
|
||||||
|
std::string in;
|
||||||
|
net::zmq::async_read(self_.self_->client, in, yield_[error]);
|
||||||
|
if (error)
|
||||||
|
{
|
||||||
|
MERROR("Internal ZMQ error in /get_random_outs: " << error.message());
|
||||||
|
return {error::daemon_timeout};
|
||||||
|
}
|
||||||
|
if (!self_.self_->timer.cancel(error))
|
||||||
|
return {error::daemon_timeout};
|
||||||
|
|
||||||
|
get_keys_rpc::Response keys_resp{};
|
||||||
|
const expect<void> status =
|
||||||
|
rpc::parse_response(keys_resp, std::move(in));
|
||||||
|
if (!status)
|
||||||
|
return status.error();
|
||||||
|
|
||||||
|
return {std::move(keys_resp.keys)};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
lws::gamma_picker pick_rct{std::move(distributions)};
|
lws::gamma_picker pick_rct{std::move(distributions)};
|
||||||
auto rings = pick_random_outputs(
|
auto rings = pick_random_outputs(
|
||||||
req.count,
|
next.count,
|
||||||
epee::to_span(amounts),
|
epee::to_span(next.amounts.values),
|
||||||
pick_rct,
|
pick_rct,
|
||||||
epee::to_mut_span(histograms),
|
epee::to_mut_span(histograms),
|
||||||
zmq_fetch_keys{*tclient}
|
zmq_fetch_keys{*this, yield}
|
||||||
);
|
);
|
||||||
|
distributions = pick_rct.take_offsets();
|
||||||
if (!rings)
|
if (!rings)
|
||||||
return rings.error();
|
return send_response(boost::asio::error::invalid_argument, rings.error());
|
||||||
|
else
|
||||||
|
send_response({}, json_response(async_response{std::move(*rings)}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
resume(json_response(async_response{std::move(*rings)}));
|
expect<net::zmq::async_client> client = data.get_async_client(io);
|
||||||
|
if (!client)
|
||||||
|
return client.error();
|
||||||
|
|
||||||
|
active = std::make_shared<frame>(data, io, std::move(*client));
|
||||||
|
cache.status = active;
|
||||||
|
|
||||||
|
active->resumers.emplace_back(std::move(req), std::move(resume));
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
MDEBUG("Starting new ZMQ coroutine in /get_random_outs");
|
||||||
|
boost::asio::spawn(active->strand, async_handler{active});
|
||||||
return success();
|
return success();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -846,7 +984,7 @@ namespace lws
|
||||||
using request = rpc::account_credentials;
|
using request = rpc::account_credentials;
|
||||||
using response = rpc::get_subaddrs_response;
|
using response = rpc::get_subaddrs_response;
|
||||||
|
|
||||||
static expect<response> handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>)
|
static expect<response> handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&)
|
||||||
{
|
{
|
||||||
auto user = open_account(req, data.disk.clone());
|
auto user = open_account(req, data.disk.clone());
|
||||||
if (!user)
|
if (!user)
|
||||||
|
@ -925,7 +1063,7 @@ namespace lws
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
static expect<response> handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete> resume)
|
static expect<response> handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume)
|
||||||
{
|
{
|
||||||
struct frame
|
struct frame
|
||||||
{
|
{
|
||||||
|
@ -1109,7 +1247,7 @@ namespace lws
|
||||||
using request = rpc::account_credentials;
|
using request = rpc::account_credentials;
|
||||||
using response = rpc::import_response;
|
using response = rpc::import_response;
|
||||||
|
|
||||||
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>)
|
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&)
|
||||||
{
|
{
|
||||||
bool new_request = false;
|
bool new_request = false;
|
||||||
bool fulfilled = false;
|
bool fulfilled = false;
|
||||||
|
@ -1149,7 +1287,7 @@ namespace lws
|
||||||
using request = rpc::login_request;
|
using request = rpc::login_request;
|
||||||
using response = rpc::login_response;
|
using response = rpc::login_response;
|
||||||
|
|
||||||
static expect<response> handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function<async_complete> resume)
|
static expect<response> handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function<async_complete>&& resume)
|
||||||
{
|
{
|
||||||
if (!key_check(req.creds))
|
if (!key_check(req.creds))
|
||||||
return {lws::error::bad_view_key};
|
return {lws::error::bad_view_key};
|
||||||
|
@ -1206,7 +1344,7 @@ namespace lws
|
||||||
using request = rpc::provision_subaddrs_request;
|
using request = rpc::provision_subaddrs_request;
|
||||||
using response = rpc::new_subaddrs_response;
|
using response = rpc::new_subaddrs_response;
|
||||||
|
|
||||||
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>)
|
static expect<response> handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function<async_complete>&&)
|
||||||
{
|
{
|
||||||
if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj)
|
if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj)
|
||||||
return {lws::error::invalid_range};
|
return {lws::error::invalid_range};
|
||||||
|
@ -1494,7 +1632,7 @@ namespace lws
|
||||||
};
|
};
|
||||||
|
|
||||||
template<typename E>
|
template<typename E>
|
||||||
expect<epee::byte_slice> call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete> resume)
|
expect<epee::byte_slice> call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function<async_complete>&& resume)
|
||||||
{
|
{
|
||||||
using request = typename E::request;
|
using request = typename E::request;
|
||||||
using response = typename E::response;
|
using response = typename E::response;
|
||||||
|
@ -1534,7 +1672,7 @@ namespace lws
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename E>
|
template<typename E>
|
||||||
expect<epee::byte_slice> call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function<async_complete>)
|
expect<epee::byte_slice> call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function<async_complete>&&)
|
||||||
{
|
{
|
||||||
using request = typename E::request;
|
using request = typename E::request;
|
||||||
|
|
||||||
|
@ -1575,7 +1713,7 @@ namespace lws
|
||||||
struct endpoint
|
struct endpoint
|
||||||
{
|
{
|
||||||
char const* const name;
|
char const* const name;
|
||||||
expect<epee::byte_slice> (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function<async_complete>);
|
expect<epee::byte_slice> (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function<async_complete>&&);
|
||||||
const unsigned max_size;
|
const unsigned max_size;
|
||||||
const bool is_async;
|
const bool is_async;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue