diff --git a/src/Options.cpp b/src/Options.cpp index c996c755b..2df634413 100644 --- a/src/Options.cpp +++ b/src/Options.cpp @@ -120,7 +120,7 @@ Options::Options(int argc, char **argv) : m_background(false), m_colors(true), m_doubleHash(false), - m_keepalive(false), + m_keepAlive(false), m_nicehash(false), m_ready(false), m_safe(false), @@ -277,7 +277,7 @@ bool Options::parseArg(int key, char *arg) break; case 'k': /* --keepalive */ - m_keepalive = true; + m_keepAlive = true; break; case 'V': /* --version */ diff --git a/src/Options.h b/src/Options.h index 05efbcee6..227659f79 100644 --- a/src/Options.h +++ b/src/Options.h @@ -52,6 +52,7 @@ public: static Options *parse(int argc, char **argv); inline bool isReady() const { return m_ready; } + inline bool keepAlive() const { return m_keepAlive; } inline const char *pass() const { return m_pass; } inline const char *user() const { return m_user; } inline const Url *backupUrl() const { return m_backupUrl; } @@ -75,7 +76,7 @@ private: bool m_background; bool m_colors; bool m_doubleHash; - bool m_keepalive; + bool m_keepAlive; bool m_nicehash; bool m_ready; bool m_safe; diff --git a/src/net/Client.cpp b/src/net/Client.cpp index 6ae63b912..fbfe17f8d 100644 --- a/src/net/Client.cpp +++ b/src/net/Client.cpp @@ -29,6 +29,7 @@ Client::Client(int id, IClientListener *listener) : + m_keepAlive(false), m_host(nullptr), m_listener(listener), m_id(id), @@ -41,7 +42,7 @@ Client::Client(int id, IClientListener *listener) : m_stream(nullptr), m_socket(nullptr) { - m_resolver.data = this; + m_resolver.data = m_responseTimer.data = m_retriesTimer.data = m_keepAliveTimer.data = this; m_hints.ai_family = PF_INET; m_hints.ai_socktype = SOCK_STREAM; @@ -51,8 +52,10 @@ Client::Client(int id, IClientListener *listener) : m_recvBuf.base = static_cast(malloc(kRecvBufSize)); m_recvBuf.len = kRecvBufSize; - m_retriesTimer.data = this; - uv_timer_init(uv_default_loop(), &m_retriesTimer); + auto loop = uv_default_loop(); + uv_timer_init(loop, &m_retriesTimer); + uv_timer_init(loop, &m_responseTimer); + uv_timer_init(loop, &m_keepAliveTimer); } @@ -122,14 +125,11 @@ void Client::send(char *data) req->data = buf.base; uv_write(req, m_stream, &buf, 1, [](uv_write_t *req, int status) { - if (status) { - auto client = getClient(req->data); - LOG_ERR("[%s:%u] write error: \"%s\"", client->m_host, client->m_port, uv_strerror(status)); - } - free(req->data); free(req); }); + + uv_timer_start(&m_responseTimer, [](uv_timer_t* handle) { getClient(handle->data)->close(); }, kResponseTimeout, 0); } @@ -236,6 +236,8 @@ void Client::connect(struct sockaddr *addr) void Client::parse(char *line, size_t len) { + startTimeout(); + line[len - 1] = '\0'; LOG_DEBUG("[%s:%u] received (%d bytes): \"%s\"", m_host, m_port, len, line); @@ -310,8 +312,22 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error } +void Client::ping() +{ + char *req = static_cast(malloc(128)); + snprintf(req, 128, "{\"id\":%lld,\"jsonrpc\":\"2.0\",\"method\":\"keepalived\",\"params\":{\"id\":\"%s\"}}\n", m_sequence, m_rpcId); + + send(req); +} + + void Client::reconnect() { + uv_timer_stop(&m_responseTimer); + if (m_keepAlive) { + uv_timer_stop(&m_keepAliveTimer); + } + if (m_failures == -1) { return m_listener->onClose(this, -1); } @@ -335,6 +351,17 @@ void Client::setState(SocketState state) } +void Client::startTimeout() +{ + uv_timer_stop(&m_responseTimer); + if (!m_keepAlive) { + return; + } + + uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0); +} + + void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { auto client = getClient(handle->data); diff --git a/src/net/Client.h b/src/net/Client.h index a1e9997a1..7dbdf21fe 100644 --- a/src/net/Client.h +++ b/src/net/Client.h @@ -47,6 +47,9 @@ public: ClosingState }; + constexpr static int kResponseTimeout = 15 * 1000; + constexpr static int kKeepAliveTimeout = 60 * 1000; + Client(int id, IClientListener *listener); ~Client(); @@ -57,9 +60,10 @@ public: void send(char *data); void setUrl(const Url *url); - inline int id() const { return m_id; } - inline SocketState state() const { return m_state; } - inline void setRetryPause(int ms) { m_retryPause = ms; } + inline int id() const { return m_id; } + inline SocketState state() const { return m_state; } + inline void setKeepAlive(bool keepAlive) { m_keepAlive = keepAlive; } + inline void setRetryPause(int ms) { m_retryPause = ms; } private: constexpr static size_t kRecvBufSize = 4096; @@ -72,8 +76,10 @@ private: void parse(char *line, size_t len); void parseNotification(const char *method, const json_t *params); void parseResponse(int64_t id, const json_t *result, const json_t *error); + void ping(); void reconnect(); void setState(SocketState state); + void startTimeout(); static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); static void onClose(uv_handle_t *handle); @@ -83,6 +89,7 @@ private: static Client *getClient(void *data); + bool m_keepAlive; char *m_host; char m_rpcId[64]; IClientListener *m_listener; @@ -99,6 +106,8 @@ private: uv_getaddrinfo_t m_resolver; uv_stream_t *m_stream; uv_tcp_t *m_socket; + uv_timer_t m_keepAliveTimer; + uv_timer_t m_responseTimer; uv_timer_t m_retriesTimer; }; diff --git a/src/net/Network.cpp b/src/net/Network.cpp index 101c6b793..a69341674 100644 --- a/src/net/Network.cpp +++ b/src/net/Network.cpp @@ -41,7 +41,7 @@ Network::Network(const Options *options) : m_pools.reserve(2); m_agent = userAgent(); - std::unique_ptr url(new Url("donate.xmrig.com", 443)); + auto url = std::make_unique("donate.xmrig.com", 443); addPool(url.get()); addPool(m_options->url()); @@ -97,6 +97,7 @@ void Network::addPool(const Url *url) Client *client = new Client(m_pools.size(), this); client->setUrl(url); client->setRetryPause(m_options->retryPause() * 1000); + client->setKeepAlive(m_options->keepAlive()); m_pools.push_back(client); }