Add support for keepAlive.

This commit is contained in:
XMRig 2017-06-07 06:48:00 +03:00
parent c29dc8bcf4
commit 5f1f901649
5 changed files with 53 additions and 15 deletions

View file

@ -120,7 +120,7 @@ Options::Options(int argc, char **argv) :
m_background(false), m_background(false),
m_colors(true), m_colors(true),
m_doubleHash(false), m_doubleHash(false),
m_keepalive(false), m_keepAlive(false),
m_nicehash(false), m_nicehash(false),
m_ready(false), m_ready(false),
m_safe(false), m_safe(false),
@ -277,7 +277,7 @@ bool Options::parseArg(int key, char *arg)
break; break;
case 'k': /* --keepalive */ case 'k': /* --keepalive */
m_keepalive = true; m_keepAlive = true;
break; break;
case 'V': /* --version */ case 'V': /* --version */

View file

@ -52,6 +52,7 @@ public:
static Options *parse(int argc, char **argv); static Options *parse(int argc, char **argv);
inline bool isReady() const { return m_ready; } inline bool isReady() const { return m_ready; }
inline bool keepAlive() const { return m_keepAlive; }
inline const char *pass() const { return m_pass; } inline const char *pass() const { return m_pass; }
inline const char *user() const { return m_user; } inline const char *user() const { return m_user; }
inline const Url *backupUrl() const { return m_backupUrl; } inline const Url *backupUrl() const { return m_backupUrl; }
@ -75,7 +76,7 @@ private:
bool m_background; bool m_background;
bool m_colors; bool m_colors;
bool m_doubleHash; bool m_doubleHash;
bool m_keepalive; bool m_keepAlive;
bool m_nicehash; bool m_nicehash;
bool m_ready; bool m_ready;
bool m_safe; bool m_safe;

View file

@ -29,6 +29,7 @@
Client::Client(int id, IClientListener *listener) : Client::Client(int id, IClientListener *listener) :
m_keepAlive(false),
m_host(nullptr), m_host(nullptr),
m_listener(listener), m_listener(listener),
m_id(id), m_id(id),
@ -41,7 +42,7 @@ Client::Client(int id, IClientListener *listener) :
m_stream(nullptr), m_stream(nullptr),
m_socket(nullptr) m_socket(nullptr)
{ {
m_resolver.data = this; m_resolver.data = m_responseTimer.data = m_retriesTimer.data = m_keepAliveTimer.data = this;
m_hints.ai_family = PF_INET; m_hints.ai_family = PF_INET;
m_hints.ai_socktype = SOCK_STREAM; m_hints.ai_socktype = SOCK_STREAM;
@ -51,8 +52,10 @@ Client::Client(int id, IClientListener *listener) :
m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize)); m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize));
m_recvBuf.len = kRecvBufSize; m_recvBuf.len = kRecvBufSize;
m_retriesTimer.data = this; auto loop = uv_default_loop();
uv_timer_init(uv_default_loop(), &m_retriesTimer); uv_timer_init(loop, &m_retriesTimer);
uv_timer_init(loop, &m_responseTimer);
uv_timer_init(loop, &m_keepAliveTimer);
} }
@ -122,14 +125,11 @@ void Client::send(char *data)
req->data = buf.base; req->data = buf.base;
uv_write(req, m_stream, &buf, 1, [](uv_write_t *req, int status) { uv_write(req, m_stream, &buf, 1, [](uv_write_t *req, int status) {
if (status) {
auto client = getClient(req->data);
LOG_ERR("[%s:%u] write error: \"%s\"", client->m_host, client->m_port, uv_strerror(status));
}
free(req->data); free(req->data);
free(req); free(req);
}); });
uv_timer_start(&m_responseTimer, [](uv_timer_t* handle) { getClient(handle->data)->close(); }, kResponseTimeout, 0);
} }
@ -236,6 +236,8 @@ void Client::connect(struct sockaddr *addr)
void Client::parse(char *line, size_t len) void Client::parse(char *line, size_t len)
{ {
startTimeout();
line[len - 1] = '\0'; line[len - 1] = '\0';
LOG_DEBUG("[%s:%u] received (%d bytes): \"%s\"", m_host, m_port, len, line); LOG_DEBUG("[%s:%u] received (%d bytes): \"%s\"", m_host, m_port, len, line);
@ -310,8 +312,22 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error
} }
void Client::ping()
{
char *req = static_cast<char*>(malloc(128));
snprintf(req, 128, "{\"id\":%lld,\"jsonrpc\":\"2.0\",\"method\":\"keepalived\",\"params\":{\"id\":\"%s\"}}\n", m_sequence, m_rpcId);
send(req);
}
void Client::reconnect() void Client::reconnect()
{ {
uv_timer_stop(&m_responseTimer);
if (m_keepAlive) {
uv_timer_stop(&m_keepAliveTimer);
}
if (m_failures == -1) { if (m_failures == -1) {
return m_listener->onClose(this, -1); return m_listener->onClose(this, -1);
} }
@ -335,6 +351,17 @@ void Client::setState(SocketState state)
} }
void Client::startTimeout()
{
uv_timer_stop(&m_responseTimer);
if (!m_keepAlive) {
return;
}
uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0);
}
void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{ {
auto client = getClient(handle->data); auto client = getClient(handle->data);

View file

@ -47,6 +47,9 @@ public:
ClosingState ClosingState
}; };
constexpr static int kResponseTimeout = 15 * 1000;
constexpr static int kKeepAliveTimeout = 60 * 1000;
Client(int id, IClientListener *listener); Client(int id, IClientListener *listener);
~Client(); ~Client();
@ -57,9 +60,10 @@ public:
void send(char *data); void send(char *data);
void setUrl(const Url *url); void setUrl(const Url *url);
inline int id() const { return m_id; } inline int id() const { return m_id; }
inline SocketState state() const { return m_state; } inline SocketState state() const { return m_state; }
inline void setRetryPause(int ms) { m_retryPause = ms; } inline void setKeepAlive(bool keepAlive) { m_keepAlive = keepAlive; }
inline void setRetryPause(int ms) { m_retryPause = ms; }
private: private:
constexpr static size_t kRecvBufSize = 4096; constexpr static size_t kRecvBufSize = 4096;
@ -72,8 +76,10 @@ private:
void parse(char *line, size_t len); void parse(char *line, size_t len);
void parseNotification(const char *method, const json_t *params); void parseNotification(const char *method, const json_t *params);
void parseResponse(int64_t id, const json_t *result, const json_t *error); void parseResponse(int64_t id, const json_t *result, const json_t *error);
void ping();
void reconnect(); void reconnect();
void setState(SocketState state); void setState(SocketState state);
void startTimeout();
static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void onClose(uv_handle_t *handle); static void onClose(uv_handle_t *handle);
@ -83,6 +89,7 @@ private:
static Client *getClient(void *data); static Client *getClient(void *data);
bool m_keepAlive;
char *m_host; char *m_host;
char m_rpcId[64]; char m_rpcId[64];
IClientListener *m_listener; IClientListener *m_listener;
@ -99,6 +106,8 @@ private:
uv_getaddrinfo_t m_resolver; uv_getaddrinfo_t m_resolver;
uv_stream_t *m_stream; uv_stream_t *m_stream;
uv_tcp_t *m_socket; uv_tcp_t *m_socket;
uv_timer_t m_keepAliveTimer;
uv_timer_t m_responseTimer;
uv_timer_t m_retriesTimer; uv_timer_t m_retriesTimer;
}; };

View file

@ -41,7 +41,7 @@ Network::Network(const Options *options) :
m_pools.reserve(2); m_pools.reserve(2);
m_agent = userAgent(); m_agent = userAgent();
std::unique_ptr<Url> url(new Url("donate.xmrig.com", 443)); auto url = std::make_unique<Url>("donate.xmrig.com", 443);
addPool(url.get()); addPool(url.get());
addPool(m_options->url()); addPool(m_options->url());
@ -97,6 +97,7 @@ void Network::addPool(const Url *url)
Client *client = new Client(m_pools.size(), this); Client *client = new Client(m_pools.size(), this);
client->setUrl(url); client->setUrl(url);
client->setRetryPause(m_options->retryPause() * 1000); client->setRetryPause(m_options->retryPause() * 1000);
client->setKeepAlive(m_options->keepAlive());
m_pools.push_back(client); m_pools.push_back(client);
} }