Added enable_pull_accounts to remove racy registration

This commit is contained in:
Lee *!* Clagett 2024-04-07 14:14:24 -04:00
parent 75e899ebd6
commit 1b0e4ff642
3 changed files with 21 additions and 13 deletions

View file

@ -376,6 +376,18 @@ namespace rpc
return do_subscribe(signal_sub.get(), abort_scan_signal);
}
expect<void> client::enable_pull_accounts()
{
detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)};
if (new_sock == nullptr)
return {net::zmq::get_error_code()};
const std::string connect =
account_endpoint + std::to_string(ctx->account_counter);
MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str()));
account_pull = std::move(new_sock);
return success();
}
expect<std::vector<std::pair<client::topic, std::string>>> client::wait_for_block()
{
MONERO_PRECOND(ctx != nullptr);
@ -494,15 +506,7 @@ namespace rpc
MONERO_PRECOND(ctx != nullptr);
if (!account_pull)
{
detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)};
if (new_sock == nullptr)
return {net::zmq::get_error_code()};
const std::string connect =
account_endpoint + std::to_string(ctx->account_counter);
MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str()));
account_pull = std::move(new_sock);
}
MONERO_CHECK(enable_pull_accounts());
std::vector<lws::account> out{};
for (;;)

View file

@ -157,7 +157,7 @@ namespace rpc
expect<void> watch_scan_signals() noexcept;
//! Register `this` client as listening for new accounts
expect<void> enable_pull_new_accounts() noexcept;
expect<void> enable_pull_accounts();
//! Wait for new block announce or internal timeout.
expect<std::vector<std::pair<topic, std::string>>> wait_for_block();

View file

@ -1014,6 +1014,9 @@ namespace lws
threads.reserve(thread_count);
std::sort(users.begin(), users.end(), by_height{});
// enable the new bind point before registering pull accounts
lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push());
MINFO("Starting scan loops on " << std::min(thread_count, users.size()) << " thread(s) with " << users.size() << " account(s)");
bool leader_thread = true;
@ -1027,7 +1030,8 @@ namespace lws
users.erase(users.end() - count, users.end());
rpc::client client = MONERO_UNWRAP(ctx.connect());
client.watch_scan_signals();
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(thread_users), opts
@ -1039,7 +1043,8 @@ namespace lws
if (!users.empty())
{
rpc::client client = MONERO_UNWRAP(ctx.connect());
client.watch_scan_signals();
MONERO_UNWRAP(client.watch_scan_signals());
MONERO_UNWRAP(client.enable_pull_accounts());
auto data = std::make_shared<thread_data>(
std::move(client), disk.clone(), std::move(users), opts
@ -1052,7 +1057,6 @@ namespace lws
lmdb::suspended_txn read_txn{};
db::cursor::accounts accounts_cur{};
boost::unique_lock<boost::mutex> lock{self.sync};
lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push());
while (scanner::is_running())
{