Added error handling for self-select mode.

This commit is contained in:
XMRig 2019-10-17 00:57:35 +07:00
parent 83a5923568
commit d783febad6
8 changed files with 141 additions and 52 deletions

View file

@ -78,9 +78,7 @@ private:
xmrig::HttpClient::HttpClient(int method, const String &url, IHttpListener *listener, const char *data, size_t size) : xmrig::HttpClient::HttpClient(int method, const String &url, IHttpListener *listener, const char *data, size_t size) :
HttpContext(HTTP_RESPONSE, listener), HttpContext(HTTP_RESPONSE, listener)
m_quiet(false),
m_port(0)
{ {
this->method = method; this->method = method;
this->url = url; this->url = url;
@ -127,7 +125,7 @@ void xmrig::HttpClient::onResolved(const Dns &dns, int status)
sockaddr *addr = dns.get().addr(m_port); sockaddr *addr = dns.get().addr(m_port);
uv_connect_t *req = new uv_connect_t; auto req = new uv_connect_t;
req->data = this; req->data = this;
uv_tcp_connect(req, m_tcp, addr, onConnect); uv_tcp_connect(req, m_tcp, addr, onConnect);
@ -140,7 +138,7 @@ void xmrig::HttpClient::handshake()
headers.insert({ "Connection", "close" }); headers.insert({ "Connection", "close" });
headers.insert({ "User-Agent", Platform::userAgent() }); headers.insert({ "User-Agent", Platform::userAgent() });
if (body.size()) { if (!body.empty()) {
headers.insert({ "Content-Length", std::to_string(body.size()) }); headers.insert({ "Content-Length", std::to_string(body.size()) });
} }
@ -169,14 +167,14 @@ void xmrig::HttpClient::read(const char *data, size_t size)
void xmrig::HttpClient::write(const std::string &header) void xmrig::HttpClient::write(const std::string &header)
{ {
ClientWriteBaton *baton = new ClientWriteBaton(header, std::move(body)); auto baton = new ClientWriteBaton(header, std::move(body));
uv_write(&baton->req, stream(), baton->bufs, baton->count(), ClientWriteBaton::onWrite); uv_write(&baton->req, stream(), baton->bufs, baton->count(), ClientWriteBaton::onWrite);
} }
void xmrig::HttpClient::onConnect(uv_connect_t *req, int status) void xmrig::HttpClient::onConnect(uv_connect_t *req, int status)
{ {
HttpClient *client = static_cast<HttpClient *>(req->data); auto client = static_cast<HttpClient *>(req->data);
if (!client) { if (!client) {
delete req; delete req;
return; return;
@ -205,7 +203,7 @@ void xmrig::HttpClient::onConnect(uv_connect_t *req, int status)
}, },
[](uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) [](uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf)
{ {
HttpClient *client = static_cast<HttpClient*>(tcp->data); auto client = static_cast<HttpClient*>(tcp->data);
if (nread >= 0) { if (nread >= 0) {
client->read(buf->base, static_cast<size_t>(nread)); client->read(buf->base, static_cast<size_t>(nread));

View file

@ -30,6 +30,7 @@
#include "base/net/http/HttpContext.h" #include "base/net/http/HttpContext.h"
#include "base/kernel/interfaces/IDnsListener.h" #include "base/kernel/interfaces/IDnsListener.h"
#include "base/tools/Object.h"
namespace xmrig { namespace xmrig {
@ -41,6 +42,8 @@ class String;
class HttpClient : public HttpContext, public IDnsListener class HttpClient : public HttpContext, public IDnsListener
{ {
public: public:
XMRIG_DISABLE_COPY_MOVE_DEFAULT(HttpClient);
HttpClient(int method, const String &url, IHttpListener *listener, const char *data = nullptr, size_t size = 0); HttpClient(int method, const String &url, IHttpListener *listener, const char *data = nullptr, size_t size = 0);
~HttpClient() override; ~HttpClient() override;
@ -57,13 +60,13 @@ protected:
virtual void read(const char *data, size_t size); virtual void read(const char *data, size_t size);
virtual void write(const std::string &header); virtual void write(const std::string &header);
bool m_quiet; bool m_quiet = false;
private: private:
static void onConnect(uv_connect_t *req, int status); static void onConnect(uv_connect_t *req, int status);
Dns *m_dns; Dns *m_dns;
uint16_t m_port; uint16_t m_port = 0;
}; };

View file

@ -136,7 +136,7 @@ void xmrig::HttpContext::closeAll()
int xmrig::HttpContext::onHeaderField(http_parser *parser, const char *at, size_t length) int xmrig::HttpContext::onHeaderField(http_parser *parser, const char *at, size_t length)
{ {
HttpContext *ctx = static_cast<HttpContext*>(parser->data); auto ctx = static_cast<HttpContext*>(parser->data);
if (ctx->m_wasHeaderValue) { if (ctx->m_wasHeaderValue) {
if (!ctx->m_lastHeaderField.empty()) { if (!ctx->m_lastHeaderField.empty()) {
@ -155,7 +155,7 @@ int xmrig::HttpContext::onHeaderField(http_parser *parser, const char *at, size_
int xmrig::HttpContext::onHeaderValue(http_parser *parser, const char *at, size_t length) int xmrig::HttpContext::onHeaderValue(http_parser *parser, const char *at, size_t length)
{ {
HttpContext *ctx = static_cast<HttpContext*>(parser->data); auto ctx = static_cast<HttpContext*>(parser->data);
if (!ctx->m_wasHeaderValue) { if (!ctx->m_wasHeaderValue) {
ctx->m_lastHeaderValue = std::string(at, length); ctx->m_lastHeaderValue = std::string(at, length);
@ -185,7 +185,7 @@ void xmrig::HttpContext::attach(http_parser_settings *settings)
settings->on_header_value = onHeaderValue; settings->on_header_value = onHeaderValue;
settings->on_headers_complete = [](http_parser* parser) -> int { settings->on_headers_complete = [](http_parser* parser) -> int {
HttpContext *ctx = static_cast<HttpContext*>(parser->data); auto ctx = static_cast<HttpContext*>(parser->data);
ctx->status = parser->status_code; ctx->status = parser->status_code;
if (parser->type == HTTP_REQUEST) { if (parser->type == HTTP_REQUEST) {
@ -208,7 +208,7 @@ void xmrig::HttpContext::attach(http_parser_settings *settings)
settings->on_message_complete = [](http_parser *parser) -> int settings->on_message_complete = [](http_parser *parser) -> int
{ {
HttpContext *ctx = static_cast<HttpContext*>(parser->data); auto ctx = static_cast<HttpContext*>(parser->data);
ctx->m_listener->onHttpData(*ctx); ctx->m_listener->onHttpData(*ctx);
ctx->m_listener = nullptr; ctx->m_listener = nullptr;

View file

@ -28,15 +28,16 @@
#define XMRIG_HTTPCONTEXT_H #define XMRIG_HTTPCONTEXT_H
typedef struct http_parser http_parser; using http_parser = struct http_parser;
typedef struct http_parser_settings http_parser_settings; using http_parser_settings = struct http_parser_settings;
typedef struct uv_connect_s uv_connect_t; using uv_connect_t = struct uv_connect_s;
typedef struct uv_handle_s uv_handle_t; using uv_handle_t = struct uv_handle_s;
typedef struct uv_stream_s uv_stream_t; using uv_stream_t = struct uv_stream_s;
typedef struct uv_tcp_s uv_tcp_t; using uv_tcp_t = struct uv_tcp_s;
#include "base/net/http/HttpData.h" #include "base/net/http/HttpData.h"
#include "base/tools/Object.h"
namespace xmrig { namespace xmrig {
@ -48,6 +49,8 @@ class IHttpListener;
class HttpContext : public HttpData class HttpContext : public HttpData
{ {
public: public:
XMRIG_DISABLE_COPY_MOVE_DEFAULT(HttpContext)
HttpContext(int parser_type, IHttpListener *listener); HttpContext(int parser_type, IHttpListener *listener);
virtual ~HttpContext(); virtual ~HttpContext();

View file

@ -24,7 +24,7 @@
*/ */
#include <assert.h> #include <cassert>
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <uv.h> #include <uv.h>

View file

@ -28,10 +28,10 @@
#define XMRIG_HTTPSCLIENT_H #define XMRIG_HTTPSCLIENT_H
typedef struct bio_st BIO; using BIO = struct bio_st;
typedef struct ssl_ctx_st SSL_CTX; using SSL_CTX = struct ssl_ctx_st;
typedef struct ssl_st SSL; using SSL = struct ssl_st;
typedef struct x509_st X509; using X509 = struct x509_st;
#include "base/net/http/HttpClient.h" #include "base/net/http/HttpClient.h"
@ -44,6 +44,8 @@ namespace xmrig {
class HttpsClient : public HttpClient class HttpsClient : public HttpClient
{ {
public: public:
XMRIG_DISABLE_COPY_MOVE_DEFAULT(HttpsClient)
HttpsClient(int method, const String &url, IHttpListener *listener, const char *data, size_t size, const String &fingerprint); HttpsClient(int method, const String &url, IHttpListener *listener, const char *data, size_t size, const String &fingerprint);
~HttpsClient() override; ~HttpsClient() override;

View file

@ -73,6 +73,20 @@ xmrig::SelfSelectClient::~SelfSelectClient()
} }
void xmrig::SelfSelectClient::tick(uint64_t now)
{
m_client->tick(now);
if (m_state == RetryState) {
if (Chrono::steadyMSecs() - m_timestamp < m_retryPause) {
return;
}
getBlockTemplate();
}
}
void xmrig::SelfSelectClient::onJobReceived(IClient *, const Job &job, const rapidjson::Value &) void xmrig::SelfSelectClient::onJobReceived(IClient *, const Job &job, const rapidjson::Value &)
{ {
m_job = job; m_job = job;
@ -96,7 +110,7 @@ bool xmrig::SelfSelectClient::parseResponse(int64_t id, rapidjson::Value &result
} }
if (error.IsObject()) { if (error.IsObject()) {
LOG_ERR("[%s:%d] error: " RED_BOLD("\"%s\"") RED_S ", code: %d", pool().daemon().host().data(), pool().daemon().port(), Json::getString(error, "message"), Json::getInt(error, "code")); LOG_ERR("[%s] error: " RED_BOLD("\"%s\"") RED_S ", code: %d", pool().daemon().url().data(), Json::getString(error, "message"), Json::getInt(error, "code"));
return false; return false;
} }
@ -107,7 +121,7 @@ bool xmrig::SelfSelectClient::parseResponse(int64_t id, rapidjson::Value &result
for (auto field : required_fields) { for (auto field : required_fields) {
if (!result.HasMember(field)) { if (!result.HasMember(field)) {
LOG_ERR("[%s:%d] required field " RED_BOLD("\"%s\"") RED_S " not found", pool().daemon().host().data(), pool().daemon().port(), field); LOG_ERR("[%s] required field " RED_BOLD("\"%s\"") RED_S " not found", pool().daemon().url().data(), field);
return false; return false;
} }
@ -122,14 +136,14 @@ bool xmrig::SelfSelectClient::parseResponse(int64_t id, rapidjson::Value &result
submitBlockTemplate(result); submitBlockTemplate(result);
return true; return true;
} }
void xmrig::SelfSelectClient::getBlockTemplate() void xmrig::SelfSelectClient::getBlockTemplate()
{ {
setState(WaitState);
using namespace rapidjson; using namespace rapidjson;
Document doc(kObjectType); Document doc(kObjectType);
auto &allocator = doc.GetAllocator(); auto &allocator = doc.GetAllocator();
@ -138,7 +152,7 @@ void xmrig::SelfSelectClient::getBlockTemplate()
params.AddMember("wallet_address", m_job.poolWallet().toJSON(), allocator); params.AddMember("wallet_address", m_job.poolWallet().toJSON(), allocator);
params.AddMember("extra_nonce", m_job.extraNonce().toJSON(), allocator); params.AddMember("extra_nonce", m_job.extraNonce().toJSON(), allocator);
JsonRequest::create(doc, sequence(), "getblocktemplate", params); JsonRequest::create(doc, m_sequence++, "getblocktemplate", params);
send(HTTP_POST, "/json_rpc", doc); send(HTTP_POST, "/json_rpc", doc);
} }
@ -146,15 +160,14 @@ void xmrig::SelfSelectClient::getBlockTemplate()
void xmrig::SelfSelectClient::retry() void xmrig::SelfSelectClient::retry()
{ {
// FIXME setState(RetryState);
} }
void xmrig::SelfSelectClient::send(int method, const char *url, const char *data, size_t size) void xmrig::SelfSelectClient::send(int method, const char *url, const char *data, size_t size)
{ {
LOG_DEBUG("[%s:%d] " MAGENTA_BOLD("\"%s %s\"") BLACK_BOLD_S " send (%zu bytes): \"%.*s\"", LOG_DEBUG("[%s] " MAGENTA_BOLD("\"%s %s\"") BLACK_BOLD_S " send (%zu bytes): \"%.*s\"",
pool().daemon().host().data(), pool().daemon().url().data(),
pool().daemon().port(),
http_method_str(static_cast<http_method>(method)), http_method_str(static_cast<http_method>(method)),
url, url,
size, size,
@ -172,7 +185,7 @@ void xmrig::SelfSelectClient::send(int method, const char *url, const char *data
client = new HttpClient(method, url, this, data, size); client = new HttpClient(method, url, this, data, size);
} }
client->setQuiet(m_quiet); client->setQuiet(isQuiet());
client->connect(pool().daemon().host(), pool().daemon().port()); client->connect(pool().daemon().host(), pool().daemon().port());
} }
@ -189,6 +202,37 @@ void xmrig::SelfSelectClient::send(int method, const char *url, const rapidjson:
} }
void xmrig::SelfSelectClient::setState(State state)
{
if (m_state == state) {
return;
}
switch (state) {
case IdleState:
m_timestamp = 0;
m_failures = 0;
break;
case WaitState:
m_timestamp = Chrono::steadyMSecs();
break;
case RetryState:
m_timestamp = Chrono::steadyMSecs();
if (m_failures > m_retries) {
m_listener->onClose(this, static_cast<int>(m_failures));
}
m_failures++;
break;
}
m_state = state;
}
void xmrig::SelfSelectClient::submitBlockTemplate(rapidjson::Value &result) void xmrig::SelfSelectClient::submitBlockTemplate(rapidjson::Value &result)
{ {
using namespace rapidjson; using namespace rapidjson;
@ -196,18 +240,35 @@ void xmrig::SelfSelectClient::submitBlockTemplate(rapidjson::Value &result)
auto &allocator = doc.GetAllocator(); auto &allocator = doc.GetAllocator();
Value params(kObjectType); Value params(kObjectType);
params.AddMember(StringRef(kId), m_job.clientId().toJSON(), allocator); params.AddMember(StringRef(kId), m_job.clientId().toJSON(), allocator);
params.AddMember(StringRef(kJobId), m_job.id().toJSON(), allocator); params.AddMember(StringRef(kJobId), m_job.id().toJSON(), allocator);
params.AddMember(StringRef(kBlob), result[kBlocktemplateBlob], allocator); params.AddMember(StringRef(kBlob), result[kBlocktemplateBlob], allocator);
params.AddMember(StringRef(kHeight), m_job.height(), allocator); params.AddMember(StringRef(kHeight), m_job.height(), allocator);
params.AddMember(StringRef(kDifficulty), result[kDifficulty], allocator); params.AddMember(StringRef(kDifficulty), result[kDifficulty], allocator);
params.AddMember(StringRef(kPrevHash), result[kPrevHash], allocator); params.AddMember(StringRef(kPrevHash), result[kPrevHash], allocator);
params.AddMember(StringRef(kSeedHash), result[kSeedHash], allocator); params.AddMember(StringRef(kSeedHash), result[kSeedHash], allocator);
params.AddMember(StringRef(kNextSeedHash), result[kNextSeedHash], allocator); params.AddMember(StringRef(kNextSeedHash), result[kNextSeedHash], allocator);
JsonRequest::create(doc, sequence(), "block_template", params); JsonRequest::create(doc, sequence(), "block_template", params);
send(doc, [this](const rapidjson::Value &result, bool success, uint64_t elapsed) { send(doc, [this](const rapidjson::Value &result, bool success, uint64_t elapsed) {
if (!success) {
if (!isQuiet()) {
LOG_ERR("[%s] error: " RED_BOLD("\"%s\"") RED_S ", code: %d", pool().daemon().url().data(), Json::getString(result, "message"), Json::getInt(result, "code"));
}
return retry();
}
if (!m_active) {
return;
}
if (m_failures > m_retries) {
m_listener->onLoginSuccess(this);
}
setState(IdleState);
m_listener->onJobReceived(this, m_job, rapidjson::Value{}); m_listener->onJobReceived(this, m_job, rapidjson::Value{});
}); });
} }
@ -219,18 +280,23 @@ void xmrig::SelfSelectClient::onHttpData(const HttpData &data)
return retry(); return retry();
} }
LOG_DEBUG("[%s:%d] received (%d bytes): \"%.*s\"", pool().daemon().host().data(), pool().daemon().port(), static_cast<int>(data.body.size()), static_cast<int>(data.body.size()), data.body.c_str()); LOG_DEBUG("[%s] received (%d bytes): \"%.*s\"", pool().daemon().url().data(), static_cast<int>(data.body.size()), static_cast<int>(data.body.size()), data.body.c_str());
rapidjson::Document doc; rapidjson::Document doc;
if (doc.Parse(data.body.c_str()).HasParseError()) { if (doc.Parse(data.body.c_str()).HasParseError()) {
if (!m_quiet) { if (!isQuiet()) {
LOG_ERR("[%s:%d] JSON decode failed: \"%s\"", pool().daemon().host().data(), pool().daemon().port(), rapidjson::GetParseError_En(doc.GetParseError())); LOG_ERR("[%s] JSON decode failed: \"%s\"", pool().daemon().url().data(), rapidjson::GetParseError_En(doc.GetParseError()));
} }
return retry(); return retry();
} }
if (!parseResponse(Json::getInt64(doc, "id", -1), doc["result"], Json::getObject(doc, "error"))) { const int64_t id = Json::getInt64(doc, "id", -1);
if (id > 0 && m_sequence - id != 1) {
return;
}
if (!parseResponse(id, doc["result"], Json::getObject(doc, "error"))) {
retry(); retry();
} }
} }

View file

@ -69,13 +69,14 @@ protected:
inline void setEnabled(bool enabled) override { m_client->setEnabled(enabled); } inline void setEnabled(bool enabled) override { m_client->setEnabled(enabled); }
inline void setPool(const Pool &pool) override { m_client->setPool(pool); } inline void setPool(const Pool &pool) override { m_client->setPool(pool); }
inline void setQuiet(bool quiet) override { m_client->setQuiet(quiet); m_quiet = quiet; } inline void setQuiet(bool quiet) override { m_client->setQuiet(quiet); m_quiet = quiet; }
inline void setRetries(int retries) override { m_client->setRetries(retries); } inline void setRetries(int retries) override { m_client->setRetries(retries); m_retries = retries; }
inline void setRetryPause(uint64_t ms) override { m_client->setRetryPause(ms); } inline void setRetryPause(uint64_t ms) override { m_client->setRetryPause(ms); m_retryPause = ms; }
inline void tick(uint64_t now) override { m_client->tick(now); }
void tick(uint64_t now) override;
// IClientListener // IClientListener
inline void onClose(IClient *, int failures) override { m_listener->onClose(this, failures); } inline void onClose(IClient *, int failures) override { m_listener->onClose(this, failures); setState(IdleState); m_active = false; }
inline void onLoginSuccess(IClient *) override { m_listener->onLoginSuccess(this); } inline void onLoginSuccess(IClient *) override { m_listener->onLoginSuccess(this); setState(IdleState); m_active = true; }
inline void onResultAccepted(IClient *, const SubmitResult &result, const char *error) override { m_listener->onResultAccepted(this, result, error); } inline void onResultAccepted(IClient *, const SubmitResult &result, const char *error) override { m_listener->onResultAccepted(this, result, error); }
inline void onVerifyAlgorithm(const IClient *, const Algorithm &algorithm, bool *ok) override { m_listener->onVerifyAlgorithm(this, algorithm, ok); } inline void onVerifyAlgorithm(const IClient *, const Algorithm &algorithm, bool *ok) override { m_listener->onVerifyAlgorithm(this, algorithm, ok); }
@ -86,17 +87,33 @@ protected:
void onHttpData(const HttpData &data) override; void onHttpData(const HttpData &data) override;
private: private:
enum State {
IdleState,
WaitState,
RetryState
};
inline bool isQuiet() const { return m_quiet || m_failures >= m_retries; }
bool parseResponse(int64_t id, rapidjson::Value &result, const rapidjson::Value &error); bool parseResponse(int64_t id, rapidjson::Value &result, const rapidjson::Value &error);
void getBlockTemplate(); void getBlockTemplate();
void retry(); void retry();
void send(int method, const char *url, const char *data = nullptr, size_t size = 0); void send(int method, const char *url, const char *data = nullptr, size_t size = 0);
void send(int method, const char *url, const rapidjson::Document &doc); void send(int method, const char *url, const rapidjson::Document &doc);
void setState(State state);
void submitBlockTemplate(rapidjson::Value &result); void submitBlockTemplate(rapidjson::Value &result);
bool m_quiet = false; bool m_active = false;
bool m_quiet = false;
IClient *m_client; IClient *m_client;
IClientListener *m_listener; IClientListener *m_listener;
int m_retries = 5;
int64_t m_failures = 0;
int64_t m_sequence = 1;
Job m_job; Job m_job;
State m_state = IdleState;
uint64_t m_retryPause = 5000;
uint64_t m_timestamp = 0;
}; };