diff --git a/src/Options.cpp b/src/Options.cpp index 331db8434..c996c755b 100644 --- a/src/Options.cpp +++ b/src/Options.cpp @@ -131,7 +131,7 @@ Options::Options(int argc, char **argv) : m_donateLevel(kDonateLevel), m_maxCpuUsage(75), m_retries(5), - m_retryPause(5), + m_retryPause(2), m_threads(0), m_affinity(-1L), m_backupUrl(nullptr), diff --git a/src/Options.h b/src/Options.h index 421d19f4c..05efbcee6 100644 --- a/src/Options.h +++ b/src/Options.h @@ -56,6 +56,7 @@ public: inline const char *user() const { return m_user; } inline const Url *backupUrl() const { return m_backupUrl; } inline const Url *url() const { return m_url; } + inline int retryPause() const { return m_retryPause; } private: Options(int argc, char **argv); diff --git a/src/interfaces/IClientListener.h b/src/interfaces/IClientListener.h index 8a2a439f0..73a855161 100644 --- a/src/interfaces/IClientListener.h +++ b/src/interfaces/IClientListener.h @@ -34,6 +34,7 @@ class IClientListener public: virtual ~IClientListener() {} + virtual void onClose(Client *client, int failures); virtual void onJobReceived(Client *client, const Job &job) = 0; virtual void onLoginCredentialsRequired(Client *client) = 0; virtual void onLoginSuccess(Client *client) = 0; diff --git a/src/net/Client.cpp b/src/net/Client.cpp index a8a4f5d57..6ae63b912 100644 --- a/src/net/Client.cpp +++ b/src/net/Client.cpp @@ -32,7 +32,8 @@ Client::Client(int id, IClientListener *listener) : m_host(nullptr), m_listener(listener), m_id(id), - m_retries(0), + m_retryPause(2000), + m_failures(0), m_sequence(1), m_recvBufPos(0), m_state(UnconnectedState), @@ -49,6 +50,9 @@ Client::Client(int id, IClientListener *listener) : m_recvBuf.base = static_cast(malloc(kRecvBufSize)); m_recvBuf.len = kRecvBufSize; + + m_retriesTimer.data = this; + uv_timer_init(uv_default_loop(), &m_retriesTimer); } @@ -80,6 +84,8 @@ void Client::connect(const Url *url) void Client::disconnect() { + m_failures = -1; + close(); } @@ -296,11 +302,24 @@ void Client::parseResponse(int64_t id, const json_t *result, const json_t *error return close(); } + m_failures = 0; m_listener->onLoginSuccess(this); m_listener->onJobReceived(this, m_job); return; } +} + +void Client::reconnect() +{ + if (m_failures == -1) { + return m_listener->onClose(this, -1); + } + + m_failures++; + m_listener->onClose(this, m_failures); + + uv_timer_start(&m_retriesTimer, [](uv_timer_t *handle) { getClient(handle->data)->connect(); }, m_retryPause, 0); } @@ -335,7 +354,7 @@ void Client::onClose(uv_handle_t *handle) client->m_socket = nullptr; client->setState(UnconnectedState); - LOG_NOTICE("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); + client->reconnect(); } @@ -405,7 +424,7 @@ void Client::onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res) auto client = getClient(req->data); if (status < 0) { LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_host, client->m_port, uv_strerror(status)); - return client->close();; + return client->reconnect();; } client->connect(res->ai_addr); diff --git a/src/net/Client.h b/src/net/Client.h index 8df215599..a1e9997a1 100644 --- a/src/net/Client.h +++ b/src/net/Client.h @@ -57,8 +57,9 @@ public: void send(char *data); void setUrl(const Url *url); - inline int id() const { return m_id; } - inline SocketState state() const { return m_state; } + inline int id() const { return m_id; } + inline SocketState state() const { return m_state; } + inline void setRetryPause(int ms) { m_retryPause = ms; } private: constexpr static size_t kRecvBufSize = 4096; @@ -71,6 +72,7 @@ private: void parse(char *line, size_t len); void parseNotification(const char *method, const json_t *params); void parseResponse(int64_t id, const json_t *result, const json_t *error); + void reconnect(); void setState(SocketState state); static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); @@ -85,7 +87,8 @@ private: char m_rpcId[64]; IClientListener *m_listener; int m_id; - int64_t m_retries; + int m_retryPause; + int64_t m_failures; int64_t m_sequence; Job m_job; size_t m_recvBufPos; @@ -96,6 +99,7 @@ private: uv_getaddrinfo_t m_resolver; uv_stream_t *m_stream; uv_tcp_t *m_socket; + uv_timer_t m_retriesTimer; }; diff --git a/src/net/Network.cpp b/src/net/Network.cpp index 1f0a55e95..101c6b793 100644 --- a/src/net/Network.cpp +++ b/src/net/Network.cpp @@ -65,6 +65,12 @@ void Network::connect() } +void Network::onClose(Client *client, int failures) +{ + LOG_DEBUG("CLOSE %d %d", client->id(), failures); +} + + void Network::onJobReceived(Client *client, const Job &job) { @@ -90,6 +96,7 @@ void Network::addPool(const Url *url) Client *client = new Client(m_pools.size(), this); client->setUrl(url); + client->setRetryPause(m_options->retryPause() * 1000); m_pools.push_back(client); } diff --git a/src/net/Network.h b/src/net/Network.h index 2b7cf2082..89787012f 100644 --- a/src/net/Network.h +++ b/src/net/Network.h @@ -46,6 +46,7 @@ public: static char *userAgent(); protected: + void onClose(Client *client, int failures) override; void onJobReceived(Client *client, const Job &job) override; void onLoginCredentialsRequired(Client *client) override; void onLoginSuccess(Client *client) override;