Better error handling of Monero RPC requests

This commit is contained in:
SChernykh 2021-09-10 16:18:16 +02:00
parent 36ee76d4d2
commit 74096248e1
4 changed files with 130 additions and 38 deletions

View file

@ -25,14 +25,16 @@ static constexpr char log_category_prefix[] = "JSONRPCRequest ";
namespace p2pool { namespace p2pool {
JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb) JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb, CallbackBase* close_cb)
: m_socket{} : m_socket{}
, m_connect{} , m_connect{}
, m_write{} , m_write{}
, m_callback(cb) , m_callback(cb)
, m_closeCallback(close_cb)
, m_contentLength(0) , m_contentLength(0)
, m_contentLengthHeader(false) , m_contentLengthHeader(false)
, m_readBufInUse(false) , m_readBufInUse(false)
, m_valid(true)
{ {
m_readBuf[0] = '\0'; m_readBuf[0] = '\0';
@ -44,6 +46,8 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C
const int err = uv_ip6_addr(address, port, reinterpret_cast<sockaddr_in6*>(&addr)); const int err = uv_ip6_addr(address, port, reinterpret_cast<sockaddr_in6*>(&addr));
if (err) { if (err) {
LOGERR(1, "invalid IP address " << address << " or port " << port); LOGERR(1, "invalid IP address " << address << " or port " << port);
m_valid = false;
return;
} }
} }
@ -67,6 +71,7 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C
const int err = uv_tcp_connect(&m_connect, &m_socket, reinterpret_cast<const sockaddr*>(&addr), on_connect); const int err = uv_tcp_connect(&m_connect, &m_socket, reinterpret_cast<const sockaddr*>(&addr), on_connect);
if (err) { if (err) {
LOGERR(1, "failed to initiate tcp connection to " << address << ", error " << uv_err_name(err)); LOGERR(1, "failed to initiate tcp connection to " << address << ", error " << uv_err_name(err));
m_valid = false;
} }
} }
@ -75,7 +80,8 @@ void JSONRPCRequest::on_connect(uv_connect_t* req, int status)
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(req->data); JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(req->data);
if (status != 0) { if (status != 0) {
LOGERR(1, "failed to connect, error " << uv_err_name(status)); pThis->m_error = uv_err_name(status);
LOGERR(1, "failed to connect, error " << pThis->m_error);
pThis->close(); pThis->close();
return; return;
} }
@ -92,7 +98,8 @@ void JSONRPCRequest::on_write(uv_write_t* handle, int status)
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(handle->data); JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(handle->data);
if (status != 0) { if (status != 0) {
LOGERR(1, "failed to send request, error " << uv_err_name(status)); pThis->m_error = uv_err_name(status);
LOGERR(1, "failed to send request, error " << pThis->m_error);
pThis->close(); pThis->close();
return; return;
} }
@ -123,7 +130,8 @@ void JSONRPCRequest::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t*
} }
else if (nread < 0) { else if (nread < 0) {
if (nread != UV_EOF){ if (nread != UV_EOF){
LOGERR(1, "failed to read response, error " << uv_err_name(static_cast<int>(nread))); pThis->m_error = uv_err_name(static_cast<int>(nread));
LOGERR(1, "failed to read response, error " << pThis->m_error);
} }
pThis->close(); pThis->close();
} }
@ -195,8 +203,10 @@ void JSONRPCRequest::on_read(const char* data, size_t size)
const llhttp_errno result = llhttp_execute(&parser, m_response.c_str(), m_response.length()); const llhttp_errno result = llhttp_execute(&parser, m_response.c_str(), m_response.length());
if (result != HPE_OK) { if (result != HPE_OK) {
LOGERR(1, "failed to parse response, result = " << static_cast<int>(result)); m_error = "failed to parse response";
LOGERR(1, m_error << ", result = " << static_cast<int>(result));
close(); close();
return;
} }
if (!m_callback) { if (!m_callback) {
@ -212,12 +222,17 @@ void JSONRPCRequest::close()
void JSONRPCRequest::on_close(uv_handle_t* handle) void JSONRPCRequest::on_close(uv_handle_t* handle)
{ {
delete static_cast<JSONRPCRequest*>(handle->data); JSONRPCRequest* req = static_cast<JSONRPCRequest*>(handle->data);
if (req->m_closeCallback) {
(*req->m_closeCallback)(req->m_error.c_str(), req->m_error.length());
}
delete req;
} }
JSONRPCRequest::~JSONRPCRequest() JSONRPCRequest::~JSONRPCRequest()
{ {
delete m_callback; delete m_callback;
delete m_closeCallback;
} }
} // namespace p2pool } // namespace p2pool

View file

@ -26,7 +26,22 @@ public:
static FORCEINLINE void call(const char* address, int port, const char* req, T&& cb) static FORCEINLINE void call(const char* address, int port, const char* req, T&& cb)
{ {
// It will be deleted in one of the tcp callbacks eventually // It will be deleted in one of the tcp callbacks eventually
new JSONRPCRequest(address, port, req, new Callback<T>(std::move(cb))); JSONRPCRequest* r = new JSONRPCRequest(address, port, req, new Callback<T>(std::move(cb)), nullptr);
if (!r->m_valid) {
delete r;
}
}
template<typename T, typename U>
static FORCEINLINE void call(const char* address, int port, const char* req, T&& cb, U&& close_cb)
{
// It will be deleted in one of the tcp callbacks eventually
JSONRPCRequest* r = new JSONRPCRequest(address, port, req, new Callback<T>(std::move(cb)), new Callback<U>(std::move(close_cb)));
if (!r->m_valid) {
constexpr char err[] = "internal error";
close_cb(err, sizeof(err) - 1);
delete r;
}
} }
private: private:
@ -47,7 +62,7 @@ private:
T m_cb; T m_cb;
}; };
JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb); JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb, CallbackBase* close_cb);
~JSONRPCRequest(); ~JSONRPCRequest();
static void on_connect(uv_connect_t* req, int status); static void on_connect(uv_connect_t* req, int status);
@ -63,6 +78,7 @@ private:
uv_write_t m_write; uv_write_t m_write;
CallbackBase* m_callback; CallbackBase* m_callback;
CallbackBase* m_closeCallback;
uint32_t m_contentLength; uint32_t m_contentLength;
bool m_contentLengthHeader; bool m_contentLengthHeader;
@ -70,6 +86,8 @@ private:
std::string m_response; std::string m_response;
char m_readBuf[65536]; char m_readBuf[65536];
bool m_readBufInUse; bool m_readBufInUse;
bool m_valid;
std::string m_error;
}; };
} // namespace p2pool } // namespace p2pool

View file

@ -349,6 +349,8 @@ void p2pool::submit_block() const
size_t nonce_offset = 0; size_t nonce_offset = 0;
size_t extra_nonce_offset = 0; size_t extra_nonce_offset = 0;
bool is_external = false;
if (submit_data.blob.empty()) { if (submit_data.blob.empty()) {
LOGINFO(0, "submit_block: height = " << height << ", template id = " << submit_data.template_id << ", nonce = " << submit_data.nonce << ", extra_nonce = " << submit_data.extra_nonce); LOGINFO(0, "submit_block: height = " << height << ", template id = " << submit_data.template_id << ", nonce = " << submit_data.nonce << ", extra_nonce = " << submit_data.extra_nonce);
@ -360,6 +362,7 @@ void p2pool::submit_block() const
} }
else { else {
LOGINFO(0, "submit_block: height = " << height << ", external blob (" << submit_data.blob.size() << " bytes)"); LOGINFO(0, "submit_block: height = " << height << ", external blob (" << submit_data.blob.size() << " bytes)");
is_external = true;
} }
std::string request; std::string request;
@ -391,7 +394,7 @@ void p2pool::submit_block() const
const uint32_t extra_nonce = submit_data.extra_nonce; const uint32_t extra_nonce = submit_data.extra_nonce;
JSONRPCRequest::call(m_params->m_host.c_str(), m_params->m_rpcPort, request.c_str(), JSONRPCRequest::call(m_params->m_host.c_str(), m_params->m_rpcPort, request.c_str(),
[height, diff, template_id, nonce, extra_nonce](const char* data, size_t size) [height, diff, template_id, nonce, extra_nonce, is_external](const char* data, size_t size)
{ {
rapidjson::Document doc; rapidjson::Document doc;
if (doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(data, size).HasParseError() || !doc.IsObject()) { if (doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(data, size).HasParseError() || !doc.IsObject()) {
@ -414,7 +417,12 @@ void p2pool::submit_block() const
error_msg = it->value.GetString(); error_msg = it->value.GetString();
} }
LOGERR(0, "submit_block: daemon returned error: '" << (error_msg ? error_msg : "unknown error") << "', template id = " << template_id << ", nonce = " << nonce << ", extra_nonce = " << extra_nonce); if (is_external) {
LOGWARN(4, "submit_block (external blob): daemon returned error: " << (error_msg ? error_msg : "unknown error"));
}
else {
LOGERR(0, "submit_block: daemon returned error: '" << (error_msg ? error_msg : "unknown error") << "', template id = " << template_id << ", nonce = " << nonce << ", extra_nonce = " << extra_nonce);
}
return; return;
} }
@ -429,6 +437,17 @@ void p2pool::submit_block() const
} }
LOGWARN(0, "submit_block: daemon sent unrecognizable reply: " << log::const_buf(data, size)); LOGWARN(0, "submit_block: daemon sent unrecognizable reply: " << log::const_buf(data, size));
},
[is_external](const char* data, size_t size)
{
if (size > 0) {
if (is_external) {
LOGWARN(4, "submit_block (external blob): RPC request failed, error " << log::const_buf(data, size));
}
else {
LOGERR(0, "submit_block (external blob): RPC request failed, error " << log::const_buf(data, size));
}
}
}); });
} }
@ -485,6 +504,13 @@ void p2pool::download_block_headers(uint64_t current_height)
LOGERR(1, "fatal error: couldn't download block header for height " << height); LOGERR(1, "fatal error: couldn't download block header for height " << height);
panic(); panic();
} }
},
[height](const char* data, size_t size)
{
if (size > 0) {
LOGERR(1, "fatal error: couldn't download block header for height " << height << ", error " << log::const_buf(data, size));
panic();
}
}); });
} }
@ -506,6 +532,13 @@ void p2pool::download_block_headers(uint64_t current_height)
LOGERR(1, "fatal error: couldn't download block headers for heights " << current_height - BLOCK_HEADERS_REQUIRED << " - " << current_height - 1); LOGERR(1, "fatal error: couldn't download block headers for heights " << current_height - BLOCK_HEADERS_REQUIRED << " - " << current_height - 1);
panic(); panic();
} }
},
[current_height](const char* data, size_t size)
{
if (size > 0) {
LOGERR(1, "fatal error: couldn't download block headers for heights " << current_height - BLOCK_HEADERS_REQUIRED << " - " << current_height - 1 << ", error " << log::const_buf(data, size));
panic();
}
}); });
} }
@ -582,36 +615,52 @@ void p2pool::get_info()
[this](const char* data, size_t size) [this](const char* data, size_t size)
{ {
parse_get_info_rpc(data, size); parse_get_info_rpc(data, size);
}); },
[this](const char* data, size_t size)
if (m_api) { {
std::ifstream f(FOUND_BLOCKS_FILE); if (size > 0) {
if (f.is_open()) { LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
while (!f.eof()) { std::this_thread::sleep_for(std::chrono::milliseconds(1000));
time_t timestamp; get_info();
f >> timestamp;
if (f.eof()) break;
uint64_t height;
f >> height;
if (f.eof()) break;
hash id;
f >> id;
if (f.eof()) break;
difficulty_type block_difficulty;
f >> block_difficulty;
if (f.eof()) break;
difficulty_type cumulative_difficulty;
f >> cumulative_difficulty;
m_foundBlocks.emplace_back(timestamp, height, id, block_difficulty, cumulative_difficulty);
} }
api_update_block_found(nullptr); });
} }
void p2pool::load_found_blocks()
{
if (!m_api) {
return;
} }
std::ifstream f(FOUND_BLOCKS_FILE);
if (!f.is_open()) {
return;
}
while (!f.eof()) {
time_t timestamp;
f >> timestamp;
if (f.eof()) break;
uint64_t height;
f >> height;
if (f.eof()) break;
hash id;
f >> id;
if (f.eof()) break;
difficulty_type block_difficulty;
f >> block_difficulty;
if (f.eof()) break;
difficulty_type cumulative_difficulty;
f >> cumulative_difficulty;
m_foundBlocks.emplace_back(timestamp, height, id, block_difficulty, cumulative_difficulty);
}
api_update_block_found(nullptr);
} }
void p2pool::parse_get_info_rpc(const char* data, size_t size) void p2pool::parse_get_info_rpc(const char* data, size_t size)
@ -668,6 +717,14 @@ void p2pool::get_miner_data()
[this](const char* data, size_t size) [this](const char* data, size_t size)
{ {
parse_get_miner_data_rpc(data, size); parse_get_miner_data_rpc(data, size);
},
[this](const char* data, size_t size)
{
if (size > 0) {
LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
get_miner_data();
}
}); });
} }
@ -1084,6 +1141,7 @@ int p2pool::run()
try { try {
ZMQReader z(m_params->m_host.c_str(), m_params->m_zmqPort, this); ZMQReader z(m_params->m_host.c_str(), m_params->m_zmqPort, this);
get_info(); get_info();
load_found_blocks();
const int rc = uv_run(uv_default_loop_checked(), UV_RUN_DEFAULT); const int rc = uv_run(uv_default_loop_checked(), UV_RUN_DEFAULT);
LOGINFO(1, "uv_run exited, result = " << rc); LOGINFO(1, "uv_run exited, result = " << rc);
} }

View file

@ -107,6 +107,7 @@ private:
void stratum_on_block(); void stratum_on_block();
void get_info(); void get_info();
void load_found_blocks();
void parse_get_info_rpc(const char* data, size_t size); void parse_get_info_rpc(const char* data, size_t size);
void get_miner_data(); void get_miner_data();