Stratum: check found P2Pool shares one at a time

Parallel checks could result in a deadlock
This commit is contained in:
SChernykh 2024-10-31 12:10:10 +01:00
parent 031a1c2eea
commit a3006cb51a
3 changed files with 87 additions and 67 deletions

View file

@ -52,6 +52,7 @@
#include <array>
#include <vector>
#include <deque>
#include <string>
#include <algorithm>
#include <atomic>

View file

@ -72,13 +72,6 @@ StratumServer::StratumServer(p2pool* pool)
m_extraNonce = get_random32();
m_submittedSharesPool.resize(10);
for (size_t i = 0; i < m_submittedSharesPool.size(); ++i) {
SubmittedShare* share = new SubmittedShare{};
ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare));
m_submittedSharesPool[i] = share;
}
uv_async_init_checked(&m_loop, &m_blobsAsync, on_blobs_ready);
m_blobsAsync.data = this;
m_blobsQueue.reserve(2);
@ -106,11 +99,6 @@ StratumServer::~StratumServer()
uv_mutex_destroy(&m_showWorkersLock);
uv_mutex_destroy(&m_rngLock);
uv_rwlock_destroy(&m_hashrateDataLock);
for (SubmittedShare* share : m_submittedSharesPool) {
ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare));
delete share;
}
}
void StratumServer::on_block(const BlockTemplate& block)
@ -440,72 +428,77 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
}
}
SubmittedShare* share;
if (!m_submittedSharesPool.empty()) {
share = m_submittedSharesPool.back();
m_submittedSharesPool.pop_back();
ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare));
}
else {
share = new SubmittedShare{};
}
if (target >= TARGET_4_BYTES_LIMIT) {
// "Low diff share" fix: adjust target to the same value as XMRig would use
target = std::numeric_limits<uint64_t>::max() / (std::numeric_limits<uint32_t>::max() / (target >> 32));
}
share->m_req.data = share;
share->m_server = this;
share->m_client = client;
share->m_clientIPv6 = client->m_isV6;
share->m_clientAddr = client->m_addr;
memcpy(share->m_clientAddrString, client->m_addrString, sizeof(share->m_clientAddrString));
memcpy(share->m_clientCustomUser, client->m_customUser, sizeof(share->m_clientCustomUser));
share->m_clientResetCounter = client->m_resetCounter.load();
share->m_rpcId = client->m_rpcId;
share->m_id = id;
share->m_templateId = template_id;
share->m_nonce = nonce;
share->m_extraNonce = extra_nonce;
share->m_target = target;
share->m_resultHash = resultHash;
share->m_sidechainDifficulty = sidechain_diff;
share->m_mainchainHeight = height;
share->m_sidechainHeight = sidechain_height;
share->m_effort = -1.0;
share->m_timestamp = seconds_since_epoch();
SubmittedShare share{};
share.m_req.data = &share;
share.m_allocated = false;
share.m_server = this;
share.m_client = client;
share.m_clientIPv6 = client->m_isV6;
share.m_clientAddr = client->m_addr;
memcpy(share.m_clientAddrString, client->m_addrString, sizeof(share.m_clientAddrString));
memcpy(share.m_clientCustomUser, client->m_customUser, sizeof(share.m_clientCustomUser));
share.m_clientResetCounter = client->m_resetCounter.load();
share.m_rpcId = client->m_rpcId;
share.m_id = id;
share.m_templateId = template_id;
share.m_nonce = nonce;
share.m_extraNonce = extra_nonce;
share.m_target = target;
share.m_resultHash = resultHash;
share.m_sidechainDifficulty = sidechain_diff;
share.m_mainchainHeight = height;
share.m_sidechainHeight = sidechain_height;
share.m_effort = -1.0;
share.m_timestamp = seconds_since_epoch();
uint64_t rem;
share->m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1;
share->m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash);
share->m_score = 0;
share.m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1;
share.m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash);
share.m_score = 0;
// Don't count shares that were found during sync
const SideChain& side_chain = m_pool->side_chain();
const PoolBlock* tip = side_chain.chainTip();
if (tip && (sidechain_height + side_chain.chain_window_size() < tip->m_sidechainHeight)) {
share->m_highEnoughDifficulty = false;
share.m_highEnoughDifficulty = false;
}
update_auto_diff(client, share->m_timestamp, share->m_hashes);
update_auto_diff(client, share.m_timestamp, share.m_hashes);
// If this share is below sidechain difficulty, process it in this thread because it'll be quick
if (!share->m_highEnoughDifficulty) {
on_share_found(&share->m_req);
on_after_share_found(&share->m_req, 0);
if (!share.m_highEnoughDifficulty) {
on_share_found(&share.m_req);
on_after_share_found(&share.m_req, 0);
return true;
}
// Else switch to a worker thread to check PoW which can take a long time
const int err = uv_queue_work(&m_loop, &share->m_req, on_share_found, on_after_share_found);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
SubmittedShare* share2 = new SubmittedShare(share);
// If uv_queue_work failed, process this share here anyway
on_share_found(&share->m_req);
on_after_share_found(&share->m_req, 0);
share2->m_req.data = share2;
share2->m_allocated = true;
m_pendingShareChecks.push_back(share2);
LOGINFO(5, "on_submit: pending share checks count = " << m_pendingShareChecks.size());
// If there were no pending share checks, run on_share_found in background
// on_after_share_found will pick the remaining share checks
if (m_pendingShareChecks.size() == 1) {
const int err = uv_queue_work(&m_loop, &share2->m_req, on_share_found, on_after_share_found);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
// If uv_queue_work failed, process this share here anyway
on_share_found(&share2->m_req);
on_after_share_found(&share2->m_req, 0);
}
}
return true;
@ -1017,6 +1010,38 @@ void StratumServer::on_share_found(uv_work_t* req)
void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
{
SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data);
StratumServer* server = share->m_server;
server->check_event_loop_thread(__func__);
ON_SCOPE_LEAVE([share, server]()
{
if (!share->m_allocated) {
return;
}
auto it = std::find(server->m_pendingShareChecks.begin(), server->m_pendingShareChecks.end(), share);
if (it != server->m_pendingShareChecks.end()) {
server->m_pendingShareChecks.erase(it);
}
delete share;
if (!server->m_pendingShareChecks.empty()) {
SubmittedShare* share2 = server->m_pendingShareChecks.front();
const int err = uv_queue_work(&server->m_loop, &share2->m_req, on_share_found, on_after_share_found);
if (err) {
LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
// If uv_queue_work failed, process this share here anyway
server->on_share_found(&share2->m_req);
server->on_after_share_found(&share2->m_req, 0);
}
}
LOGINFO(5, "on_after_share_found: pending share checks count = " << server->m_pendingShareChecks.size());
});
if (share->m_highEnoughDifficulty) {
const char* s = share->m_clientCustomUser;
@ -1040,14 +1065,6 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
BACKGROUND_JOB_STOP(StratumServer::on_share_found);
}
StratumServer* server = share->m_server;
ON_SCOPE_LEAVE([share, server]()
{
ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare));
server->m_submittedSharesPool.push_back(share);
});
const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW);
StratumClient* client = share->m_client;

View file

@ -147,6 +147,8 @@ private:
struct SubmittedShare
{
uv_work_t m_req;
bool m_allocated;
StratumServer* m_server;
StratumClient* m_client;
bool m_clientIPv6;
@ -180,8 +182,6 @@ private:
} m_result;
};
std::vector<SubmittedShare*> m_submittedSharesPool;
struct HashrateData
{
uint64_t m_timestamp;
@ -204,6 +204,8 @@ private:
std::atomic<uint64_t> m_apiLastUpdateTime;
std::deque<SubmittedShare*> m_pendingShareChecks;
void update_hashrate_data(uint64_t hashes, uint64_t timestamp);
void api_update_local_stats(uint64_t timestamp);