diff --git a/src/json_rpc_request.cpp b/src/json_rpc_request.cpp index 773b725..2a2cb4d 100644 --- a/src/json_rpc_request.cpp +++ b/src/json_rpc_request.cpp @@ -25,14 +25,16 @@ static constexpr char log_category_prefix[] = "JSONRPCRequest "; namespace p2pool { -JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb) +JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb, CallbackBase* close_cb) : m_socket{} , m_connect{} , m_write{} , m_callback(cb) + , m_closeCallback(close_cb) , m_contentLength(0) , m_contentLengthHeader(false) , m_readBufInUse(false) + , m_valid(true) { m_readBuf[0] = '\0'; @@ -44,6 +46,8 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C const int err = uv_ip6_addr(address, port, reinterpret_cast(&addr)); if (err) { LOGERR(1, "invalid IP address " << address << " or port " << port); + m_valid = false; + return; } } @@ -67,6 +71,7 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C const int err = uv_tcp_connect(&m_connect, &m_socket, reinterpret_cast(&addr), on_connect); if (err) { LOGERR(1, "failed to initiate tcp connection to " << address << ", error " << uv_err_name(err)); + m_valid = false; } } @@ -75,7 +80,8 @@ void JSONRPCRequest::on_connect(uv_connect_t* req, int status) JSONRPCRequest* pThis = static_cast(req->data); if (status != 0) { - LOGERR(1, "failed to connect, error " << uv_err_name(status)); + pThis->m_error = uv_err_name(status); + LOGERR(1, "failed to connect, error " << pThis->m_error); pThis->close(); return; } @@ -92,7 +98,8 @@ void JSONRPCRequest::on_write(uv_write_t* handle, int status) JSONRPCRequest* pThis = static_cast(handle->data); if (status != 0) { - LOGERR(1, "failed to send request, error " << uv_err_name(status)); + pThis->m_error = uv_err_name(status); + LOGERR(1, "failed to send request, error " << pThis->m_error); pThis->close(); return; } @@ -123,7 +130,8 @@ void JSONRPCRequest::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* } else if (nread < 0) { if (nread != UV_EOF){ - LOGERR(1, "failed to read response, error " << uv_err_name(static_cast(nread))); + pThis->m_error = uv_err_name(static_cast(nread)); + LOGERR(1, "failed to read response, error " << pThis->m_error); } pThis->close(); } @@ -195,8 +203,10 @@ void JSONRPCRequest::on_read(const char* data, size_t size) const llhttp_errno result = llhttp_execute(&parser, m_response.c_str(), m_response.length()); if (result != HPE_OK) { - LOGERR(1, "failed to parse response, result = " << static_cast(result)); + m_error = "failed to parse response"; + LOGERR(1, m_error << ", result = " << static_cast(result)); close(); + return; } if (!m_callback) { @@ -212,12 +222,17 @@ void JSONRPCRequest::close() void JSONRPCRequest::on_close(uv_handle_t* handle) { - delete static_cast(handle->data); + JSONRPCRequest* req = static_cast(handle->data); + if (req->m_closeCallback) { + (*req->m_closeCallback)(req->m_error.c_str(), req->m_error.length()); + } + delete req; } JSONRPCRequest::~JSONRPCRequest() { delete m_callback; + delete m_closeCallback; } } // namespace p2pool diff --git a/src/json_rpc_request.h b/src/json_rpc_request.h index a57e4b2..38309cb 100644 --- a/src/json_rpc_request.h +++ b/src/json_rpc_request.h @@ -26,7 +26,22 @@ public: static FORCEINLINE void call(const char* address, int port, const char* req, T&& cb) { // It will be deleted in one of the tcp callbacks eventually - new JSONRPCRequest(address, port, req, new Callback(std::move(cb))); + JSONRPCRequest* r = new JSONRPCRequest(address, port, req, new Callback(std::move(cb)), nullptr); + if (!r->m_valid) { + delete r; + } + } + + template + static FORCEINLINE void call(const char* address, int port, const char* req, T&& cb, U&& close_cb) + { + // It will be deleted in one of the tcp callbacks eventually + JSONRPCRequest* r = new JSONRPCRequest(address, port, req, new Callback(std::move(cb)), new Callback(std::move(close_cb))); + if (!r->m_valid) { + constexpr char err[] = "internal error"; + close_cb(err, sizeof(err) - 1); + delete r; + } } private: @@ -47,7 +62,7 @@ private: T m_cb; }; - JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb); + JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb, CallbackBase* close_cb); ~JSONRPCRequest(); static void on_connect(uv_connect_t* req, int status); @@ -63,6 +78,7 @@ private: uv_write_t m_write; CallbackBase* m_callback; + CallbackBase* m_closeCallback; uint32_t m_contentLength; bool m_contentLengthHeader; @@ -70,6 +86,8 @@ private: std::string m_response; char m_readBuf[65536]; bool m_readBufInUse; + bool m_valid; + std::string m_error; }; } // namespace p2pool diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 1162017..35af7af 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -349,6 +349,8 @@ void p2pool::submit_block() const size_t nonce_offset = 0; size_t extra_nonce_offset = 0; + bool is_external = false; + if (submit_data.blob.empty()) { LOGINFO(0, "submit_block: height = " << height << ", template id = " << submit_data.template_id << ", nonce = " << submit_data.nonce << ", extra_nonce = " << submit_data.extra_nonce); @@ -360,6 +362,7 @@ void p2pool::submit_block() const } else { LOGINFO(0, "submit_block: height = " << height << ", external blob (" << submit_data.blob.size() << " bytes)"); + is_external = true; } std::string request; @@ -391,7 +394,7 @@ void p2pool::submit_block() const const uint32_t extra_nonce = submit_data.extra_nonce; JSONRPCRequest::call(m_params->m_host.c_str(), m_params->m_rpcPort, request.c_str(), - [height, diff, template_id, nonce, extra_nonce](const char* data, size_t size) + [height, diff, template_id, nonce, extra_nonce, is_external](const char* data, size_t size) { rapidjson::Document doc; if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { @@ -414,7 +417,12 @@ void p2pool::submit_block() const error_msg = it->value.GetString(); } - LOGERR(0, "submit_block: daemon returned error: '" << (error_msg ? error_msg : "unknown error") << "', template id = " << template_id << ", nonce = " << nonce << ", extra_nonce = " << extra_nonce); + if (is_external) { + LOGWARN(4, "submit_block (external blob): daemon returned error: " << (error_msg ? error_msg : "unknown error")); + } + else { + LOGERR(0, "submit_block: daemon returned error: '" << (error_msg ? error_msg : "unknown error") << "', template id = " << template_id << ", nonce = " << nonce << ", extra_nonce = " << extra_nonce); + } return; } @@ -429,6 +437,17 @@ void p2pool::submit_block() const } LOGWARN(0, "submit_block: daemon sent unrecognizable reply: " << log::const_buf(data, size)); + }, + [is_external](const char* data, size_t size) + { + if (size > 0) { + if (is_external) { + LOGWARN(4, "submit_block (external blob): RPC request failed, error " << log::const_buf(data, size)); + } + else { + LOGERR(0, "submit_block (external blob): RPC request failed, error " << log::const_buf(data, size)); + } + } }); } @@ -485,6 +504,13 @@ void p2pool::download_block_headers(uint64_t current_height) LOGERR(1, "fatal error: couldn't download block header for height " << height); panic(); } + }, + [height](const char* data, size_t size) + { + if (size > 0) { + LOGERR(1, "fatal error: couldn't download block header for height " << height << ", error " << log::const_buf(data, size)); + panic(); + } }); } @@ -506,6 +532,13 @@ void p2pool::download_block_headers(uint64_t current_height) LOGERR(1, "fatal error: couldn't download block headers for heights " << current_height - BLOCK_HEADERS_REQUIRED << " - " << current_height - 1); panic(); } + }, + [current_height](const char* data, size_t size) + { + if (size > 0) { + LOGERR(1, "fatal error: couldn't download block headers for heights " << current_height - BLOCK_HEADERS_REQUIRED << " - " << current_height - 1 << ", error " << log::const_buf(data, size)); + panic(); + } }); } @@ -582,36 +615,52 @@ void p2pool::get_info() [this](const char* data, size_t size) { parse_get_info_rpc(data, size); - }); - - if (m_api) { - std::ifstream f(FOUND_BLOCKS_FILE); - if (f.is_open()) { - while (!f.eof()) { - time_t timestamp; - f >> timestamp; - if (f.eof()) break; - - uint64_t height; - f >> height; - if (f.eof()) break; - - hash id; - f >> id; - if (f.eof()) break; - - difficulty_type block_difficulty; - f >> block_difficulty; - if (f.eof()) break; - - difficulty_type cumulative_difficulty; - f >> cumulative_difficulty; - - m_foundBlocks.emplace_back(timestamp, height, id, block_difficulty, cumulative_difficulty); + }, + [this](const char* data, size_t size) + { + if (size > 0) { + LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + get_info(); } - api_update_block_found(nullptr); - } + }); +} + +void p2pool::load_found_blocks() +{ + if (!m_api) { + return; } + + std::ifstream f(FOUND_BLOCKS_FILE); + if (!f.is_open()) { + return; + } + + while (!f.eof()) { + time_t timestamp; + f >> timestamp; + if (f.eof()) break; + + uint64_t height; + f >> height; + if (f.eof()) break; + + hash id; + f >> id; + if (f.eof()) break; + + difficulty_type block_difficulty; + f >> block_difficulty; + if (f.eof()) break; + + difficulty_type cumulative_difficulty; + f >> cumulative_difficulty; + + m_foundBlocks.emplace_back(timestamp, height, id, block_difficulty, cumulative_difficulty); + } + + api_update_block_found(nullptr); } void p2pool::parse_get_info_rpc(const char* data, size_t size) @@ -668,6 +717,14 @@ void p2pool::get_miner_data() [this](const char* data, size_t size) { parse_get_miner_data_rpc(data, size); + }, + [this](const char* data, size_t size) + { + if (size > 0) { + LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + get_miner_data(); + } }); } @@ -1084,6 +1141,7 @@ int p2pool::run() try { ZMQReader z(m_params->m_host.c_str(), m_params->m_zmqPort, this); get_info(); + load_found_blocks(); const int rc = uv_run(uv_default_loop_checked(), UV_RUN_DEFAULT); LOGINFO(1, "uv_run exited, result = " << rc); } diff --git a/src/p2pool.h b/src/p2pool.h index ab99b37..a551e15 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -107,6 +107,7 @@ private: void stratum_on_block(); void get_info(); + void load_found_blocks(); void parse_get_info_rpc(const char* data, size_t size); void get_miner_data();