From a3006cb51a2624b47128d42e17297a6c5f04c7bd Mon Sep 17 00:00:00 2001
From: SChernykh <15806605+SChernykh@users.noreply.github.com>
Date: Thu, 31 Oct 2024 12:10:10 +0100
Subject: [PATCH] Stratum: check found P2Pool shares one at a time

Parallel checks could result in a deadlock
---
 src/common.h           |   1 +
 src/stratum_server.cpp | 147 +++++++++++++++++++++++------------------
 src/stratum_server.h   |   6 +-
 3 files changed, 87 insertions(+), 67 deletions(-)

diff --git a/src/common.h b/src/common.h
index 6851816..5110398 100644
--- a/src/common.h
+++ b/src/common.h
@@ -52,6 +52,7 @@
 
 #include <array>
 #include <vector>
+#include <deque>
 #include <string>
 #include <algorithm>
 #include <atomic>
diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp
index 3bd6fa2..95ecb03 100644
--- a/src/stratum_server.cpp
+++ b/src/stratum_server.cpp
@@ -72,13 +72,6 @@ StratumServer::StratumServer(p2pool* pool)
 
 	m_extraNonce = get_random32();
 
-	m_submittedSharesPool.resize(10);
-	for (size_t i = 0; i < m_submittedSharesPool.size(); ++i) {
-		SubmittedShare* share = new SubmittedShare{};
-		ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare));
-		m_submittedSharesPool[i] = share;
-	}
-
 	uv_async_init_checked(&m_loop, &m_blobsAsync, on_blobs_ready);
 	m_blobsAsync.data = this;
 	m_blobsQueue.reserve(2);
@@ -106,11 +99,6 @@ StratumServer::~StratumServer()
 	uv_mutex_destroy(&m_showWorkersLock);
 	uv_mutex_destroy(&m_rngLock);
 	uv_rwlock_destroy(&m_hashrateDataLock);
-
-	for (SubmittedShare* share : m_submittedSharesPool) {
-		ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare));
-		delete share;
-	}
 }
 
 void StratumServer::on_block(const BlockTemplate& block)
@@ -440,72 +428,77 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
 			}
 		}
 
-		SubmittedShare* share;
-
-		if (!m_submittedSharesPool.empty()) {
-			share = m_submittedSharesPool.back();
-			m_submittedSharesPool.pop_back();
-			ASAN_UNPOISON_MEMORY_REGION(share, sizeof(SubmittedShare));
-		}
-		else {
-			share = new SubmittedShare{};
-		}
-
 		if (target >= TARGET_4_BYTES_LIMIT) {
 			// "Low diff share" fix: adjust target to the same value as XMRig would use
 			target = std::numeric_limits<uint64_t>::max() / (std::numeric_limits<uint32_t>::max() / (target >> 32));
 		}
 
-		share->m_req.data = share;
-		share->m_server = this;
-		share->m_client = client;
-		share->m_clientIPv6 = client->m_isV6;
-		share->m_clientAddr = client->m_addr;
-		memcpy(share->m_clientAddrString, client->m_addrString, sizeof(share->m_clientAddrString));
-		memcpy(share->m_clientCustomUser, client->m_customUser, sizeof(share->m_clientCustomUser));
-		share->m_clientResetCounter = client->m_resetCounter.load();
-		share->m_rpcId = client->m_rpcId;
-		share->m_id = id;
-		share->m_templateId = template_id;
-		share->m_nonce = nonce;
-		share->m_extraNonce = extra_nonce;
-		share->m_target = target;
-		share->m_resultHash = resultHash;
-		share->m_sidechainDifficulty = sidechain_diff;
-		share->m_mainchainHeight = height;
-		share->m_sidechainHeight = sidechain_height;
-		share->m_effort = -1.0;
-		share->m_timestamp = seconds_since_epoch();
+		SubmittedShare share{};
+
+		share.m_req.data = &share;
+		share.m_allocated = false;
+
+		share.m_server = this;
+		share.m_client = client;
+		share.m_clientIPv6 = client->m_isV6;
+		share.m_clientAddr = client->m_addr;
+		memcpy(share.m_clientAddrString, client->m_addrString, sizeof(share.m_clientAddrString));
+		memcpy(share.m_clientCustomUser, client->m_customUser, sizeof(share.m_clientCustomUser));
+		share.m_clientResetCounter = client->m_resetCounter.load();
+		share.m_rpcId = client->m_rpcId;
+		share.m_id = id;
+		share.m_templateId = template_id;
+		share.m_nonce = nonce;
+		share.m_extraNonce = extra_nonce;
+		share.m_target = target;
+		share.m_resultHash = resultHash;
+		share.m_sidechainDifficulty = sidechain_diff;
+		share.m_mainchainHeight = height;
+		share.m_sidechainHeight = sidechain_height;
+		share.m_effort = -1.0;
+		share.m_timestamp = seconds_since_epoch();
 
 		uint64_t rem;
-		share->m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1;
-		share->m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash);
-		share->m_score = 0;
+		share.m_hashes = (target > 1) ? udiv128(1, 0, target, &rem) : 1;
+		share.m_highEnoughDifficulty = sidechain_diff.check_pow(resultHash);
+		share.m_score = 0;
 
 		// Don't count shares that were found during sync
 		const SideChain& side_chain = m_pool->side_chain();
 		const PoolBlock* tip = side_chain.chainTip();
 		if (tip && (sidechain_height + side_chain.chain_window_size() < tip->m_sidechainHeight)) {
-			share->m_highEnoughDifficulty = false;
+			share.m_highEnoughDifficulty = false;
 		}
 
-		update_auto_diff(client, share->m_timestamp, share->m_hashes);
+		update_auto_diff(client, share.m_timestamp, share.m_hashes);
 
 		// If this share is below sidechain difficulty, process it in this thread because it'll be quick
-		if (!share->m_highEnoughDifficulty) {
-			on_share_found(&share->m_req);
-			on_after_share_found(&share->m_req, 0);
+		if (!share.m_highEnoughDifficulty) {
+			on_share_found(&share.m_req);
+			on_after_share_found(&share.m_req, 0);
 			return true;
 		}
 
 		// Else switch to a worker thread to check PoW which can take a long time
-		const int err = uv_queue_work(&m_loop, &share->m_req, on_share_found, on_after_share_found);
-		if (err) {
-			LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
+		SubmittedShare* share2 = new SubmittedShare(share);
 
-			// If uv_queue_work failed, process this share here anyway
-			on_share_found(&share->m_req);
-			on_after_share_found(&share->m_req, 0);
+		share2->m_req.data = share2;
+		share2->m_allocated = true;
+
+		m_pendingShareChecks.push_back(share2);
+		LOGINFO(5, "on_submit: pending share checks count = " << m_pendingShareChecks.size());
+
+		// If there were no pending share checks, run on_share_found in background
+		// on_after_share_found will pick the remaining share checks
+		if (m_pendingShareChecks.size() == 1) {
+			const int err = uv_queue_work(&m_loop, &share2->m_req, on_share_found, on_after_share_found);
+			if (err) {
+				LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
+
+				// If uv_queue_work failed, process this share here anyway
+				on_share_found(&share2->m_req);
+				on_after_share_found(&share2->m_req, 0);
+			}
 		}
 
 		return true;
@@ -1017,6 +1010,38 @@ void StratumServer::on_share_found(uv_work_t* req)
 void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
 {
 	SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data);
+	StratumServer* server = share->m_server;
+
+	server->check_event_loop_thread(__func__);
+
+	ON_SCOPE_LEAVE([share, server]()
+		{
+			if (!share->m_allocated) {
+				return;
+			}
+
+			auto it = std::find(server->m_pendingShareChecks.begin(), server->m_pendingShareChecks.end(), share);
+			if (it != server->m_pendingShareChecks.end()) {
+				server->m_pendingShareChecks.erase(it);
+			}
+
+			delete share;
+
+			if (!server->m_pendingShareChecks.empty()) {
+				SubmittedShare* share2 = server->m_pendingShareChecks.front();
+
+				const int err = uv_queue_work(&server->m_loop, &share2->m_req, on_share_found, on_after_share_found);
+				if (err) {
+					LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err));
+
+					// If uv_queue_work failed, process this share here anyway
+					server->on_share_found(&share2->m_req);
+					server->on_after_share_found(&share2->m_req, 0);
+				}
+			}
+
+			LOGINFO(5, "on_after_share_found: pending share checks count = " << server->m_pendingShareChecks.size());
+		});
 
 	if (share->m_highEnoughDifficulty) {
 		const char* s = share->m_clientCustomUser;
@@ -1040,14 +1065,6 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
 		BACKGROUND_JOB_STOP(StratumServer::on_share_found);
 	}
 
-	StratumServer* server = share->m_server;
-
-	ON_SCOPE_LEAVE([share, server]()
-		{
-			ASAN_POISON_MEMORY_REGION(share, sizeof(SubmittedShare));
-			server->m_submittedSharesPool.push_back(share);
-		});
-
 	const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW);
 
 	StratumClient* client = share->m_client;
diff --git a/src/stratum_server.h b/src/stratum_server.h
index babbb24..e92d875 100644
--- a/src/stratum_server.h
+++ b/src/stratum_server.h
@@ -147,6 +147,8 @@ private:
 	struct SubmittedShare
 	{
 		uv_work_t m_req;
+		bool m_allocated;
+
 		StratumServer* m_server;
 		StratumClient* m_client;
 		bool m_clientIPv6;
@@ -180,8 +182,6 @@ private:
 		} m_result;
 	};
 
-	std::vector<SubmittedShare*> m_submittedSharesPool;
-
 	struct HashrateData
 	{
 		uint64_t m_timestamp;
@@ -204,6 +204,8 @@ private:
 
 	std::atomic<uint64_t> m_apiLastUpdateTime;
 
+	std::deque<SubmittedShare*> m_pendingShareChecks;
+
 	void update_hashrate_data(uint64_t hashes, uint64_t timestamp);
 	void api_update_local_stats(uint64_t timestamp);