Fixed JSON RPC handling during shutdown

This commit is contained in:
SChernykh 2022-07-05 14:29:41 +02:00
parent 340a3e85c8
commit 4ec0fe8d96
3 changed files with 54 additions and 9 deletions

View file

@ -365,7 +365,7 @@ void Call(const std::string& address, int port, const std::string& req, const st
loop = uv_default_loop(); loop = uv_default_loop();
} }
CallOnLoop(loop, const bool result = CallOnLoop(loop,
[=]() [=]()
{ {
try { try {
@ -376,6 +376,10 @@ void Call(const std::string& address, int port, const std::string& req, const st
(*close_cb)(msg, strlen(msg)); (*close_cb)(msg, strlen(msg));
} }
}); });
if (!result) {
LOGERR(1, "JSON RPC \"" << req << "\" failed");
}
} }
} // namespace JSONRPCRequest } // namespace JSONRPCRequest

View file

@ -775,9 +775,11 @@ void p2pool::get_info()
{ {
if (size > 0) { if (size > 0) {
LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
if (!m_stopped) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
get_info(); get_info();
} }
}
}); });
} }
@ -820,6 +822,10 @@ void p2pool::load_found_blocks()
void p2pool::parse_get_info_rpc(const char* data, size_t size) void p2pool::parse_get_info_rpc(const char* data, size_t size)
{ {
if (m_stopped) {
return;
}
rapidjson::Document doc; rapidjson::Document doc;
doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(data, size); doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(data, size);
@ -881,14 +887,20 @@ void p2pool::get_version()
{ {
if (size > 0) { if (size > 0) {
LOGWARN(1, "get_version RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); LOGWARN(1, "get_version RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
if (!m_stopped) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
get_version(); get_version();
} }
}
}); });
} }
void p2pool::parse_get_version_rpc(const char* data, size_t size) void p2pool::parse_get_version_rpc(const char* data, size_t size)
{ {
if (m_stopped) {
return;
}
rapidjson::Document doc; rapidjson::Document doc;
doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(data, size); doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(data, size);
@ -945,9 +957,11 @@ void p2pool::get_miner_data()
{ {
if (size > 0) { if (size > 0) {
LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
if (!m_stopped) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::this_thread::sleep_for(std::chrono::milliseconds(1000));
get_miner_data(); get_miner_data();
} }
}
else { else {
m_getMinerDataPending = false; m_getMinerDataPending = false;
} }
@ -956,6 +970,10 @@ void p2pool::get_miner_data()
void p2pool::parse_get_miner_data_rpc(const char* data, size_t size) void p2pool::parse_get_miner_data_rpc(const char* data, size_t size)
{ {
if (m_stopped) {
return;
}
hash h; hash h;
keccak(reinterpret_cast<const uint8_t*>(data), static_cast<int>(size), h.h, HASH_SIZE); keccak(reinterpret_cast<const uint8_t*>(data), static_cast<int>(size), h.h, HASH_SIZE);
if (h == m_getMinerDataHash) { if (h == m_getMinerDataHash) {

View file

@ -137,9 +137,12 @@ struct UV_LoopUserData
UV_LoopUserData* GetLoopUserData(uv_loop_t* loop, bool create = true); UV_LoopUserData* GetLoopUserData(uv_loop_t* loop, bool create = true);
template<typename T> template<typename T>
void CallOnLoop(uv_loop_t* loop, T&& callback) bool CallOnLoop(uv_loop_t* loop, T&& callback)
{ {
UV_LoopUserData* data = GetLoopUserData(loop, false); UV_LoopUserData* data = GetLoopUserData(loop, false);
if (!data) {
return false;
}
UV_LoopCallbackBase* cb = new UV_LoopCallback<T>(std::move(callback)); UV_LoopCallbackBase* cb = new UV_LoopCallback<T>(std::move(callback));
{ {
@ -147,7 +150,27 @@ void CallOnLoop(uv_loop_t* loop, T&& callback)
data->m_callbacks.push_back(cb); data->m_callbacks.push_back(cb);
} }
uv_async_send(data->m_async); if (uv_async_send(data->m_async) == 0) {
return true;
}
// Clean up after uv_async_send error
bool found = false;
{
MutexLock lock(data->m_callbacksLock);
auto it = std::find(data->m_callbacks.begin(), data->m_callbacks.end(), cb);
if (it != data->m_callbacks.end()) {
found = true;
data->m_callbacks.erase(it);
}
}
if (found) {
delete cb;
}
return false;
} }
} // namespace p2pool } // namespace p2pool