diff --git a/CMakeLists.txt b/CMakeLists.txt index 28d89a854..1f42950ed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,7 @@ set(HEADERS src/net/Job.h src/net/JobResult.h src/net/Network.h + src/net/Storage.h src/net/strategies/DonateStrategy.h src/net/strategies/FailoverStrategy.h src/net/strategies/SinglePoolStrategy.h diff --git a/src/net/Client.cpp b/src/net/Client.cpp index c8a93f24c..ec9dc863c 100644 --- a/src/net/Client.cpp +++ b/src/net/Client.cpp @@ -52,6 +52,7 @@ int64_t Client::m_sequence = 1; +xmrig::Storage Client::m_storage; Client::Client(int id, const char *agent, IClientListener *listener) : @@ -67,13 +68,17 @@ Client::Client(int id, const char *agent, IClientListener *listener) : m_state(UnconnectedState), m_expire(0), m_jobs(0), + m_keepAlive(0), + m_key(0), m_stream(nullptr), m_socket(nullptr) { + m_key = m_storage.add(this); + memset(m_ip, 0, sizeof(m_ip)); memset(&m_hints, 0, sizeof(m_hints)); - m_resolver.data = this; + m_resolver.data = m_storage.ptr(m_key); m_hints.ai_family = AF_UNSPEC; m_hints.ai_socktype = SOCK_STREAM; @@ -81,11 +86,6 @@ Client::Client(int id, const char *agent, IClientListener *listener) : m_recvBuf.base = m_buf; m_recvBuf.len = sizeof(m_buf); - -# ifndef XMRIG_PROXY_PROJECT - m_keepAliveTimer.data = this; - uv_timer_init(uv_default_loop(), &m_keepAliveTimer); -# endif } @@ -121,8 +121,13 @@ void Client::deleteLater() m_listener = nullptr; - if (!disconnect()) { - delete this; + if (state() == HostLookupState) { + uv_cancel(reinterpret_cast(&m_resolver)); + return; + } + + if (!disconnect() && m_state != ClosingState) { + m_storage.remove(m_key); } } @@ -139,17 +144,17 @@ void Client::setUrl(const Url *url) void Client::tick(uint64_t now) { - if (m_expire == 0 || now < m_expire) { - return; - } - if (m_state == ConnectedState) { - LOG_DEBUG_ERR("[%s:%u] timeout", m_url.host(), m_url.port()); - close(); + if (m_expire && now > m_expire) { + LOG_DEBUG_ERR("[%s:%u] timeout", m_url.host(), m_url.port()); + close(); + } + else if (m_keepAlive && now > m_keepAlive) { + ping(); + } } - - if (m_state == ConnectingState) { + if (m_expire && now > m_expire && m_state == ConnectingState) { connect(); } } @@ -157,12 +162,9 @@ void Client::tick(uint64_t now) bool Client::disconnect() { -# ifndef XMRIG_PROXY_PROJECT - uv_timer_stop(&m_keepAliveTimer); -# endif - - m_expire = 0; - m_failures = -1; + m_keepAlive = 0; + m_expire = 0; + m_failures = -1; return close(); } @@ -404,10 +406,10 @@ void Client::connect(sockaddr *addr) delete m_socket; uv_connect_t *req = new uv_connect_t; - req->data = this; + req->data = m_storage.ptr(m_key); m_socket = new uv_tcp_t; - m_socket->data = this; + m_socket->data = m_storage.ptr(m_key); uv_tcp_init(uv_default_loop(), m_socket); uv_tcp_nodelay(m_socket, 1); @@ -567,7 +569,7 @@ void Client::parseResponse(int64_t id, const rapidjson::Value &result, const rap LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), message, error["code"].GetInt()); } - if (id == 1 || isCriticalError(message)) { + if (isCriticalError(message)) { close(); } @@ -613,18 +615,13 @@ void Client::ping() void Client::reconnect() { if (!m_listener) { - delete this; + m_storage.remove(m_key); return; } setState(ConnectingState); - -# ifndef XMRIG_PROXY_PROJECT - if (m_url.isKeepAlive()) { - uv_timer_stop(&m_keepAliveTimer); - } -# endif + m_keepAlive = 0; if (m_failures == -1) { return m_listener->onClose(this, -1); @@ -653,13 +650,9 @@ void Client::startTimeout() { m_expire = 0; -# ifndef XMRIG_PROXY_PROJECT - if (!m_url.isKeepAlive()) { - return; + if (m_url.keepAlive()) { + m_keepAlive = uv_now(uv_default_loop()) + (m_url.keepAlive() * 1000); } - - uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0); -# endif } @@ -690,6 +683,7 @@ void Client::onConnect(uv_connect_t *req, int status) { auto client = getClient(req->data); if (!client) { + delete req; return; } @@ -735,6 +729,11 @@ void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) return; } + assert(client->m_listener != nullptr); + if (!client->m_listener) { + return client->reconnect(); + } + client->m_recvBufPos += nread; char* end; @@ -771,6 +770,11 @@ void Client::onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res) return; } + assert(client->m_listener != nullptr); + if (!client->m_listener) { + return client->reconnect(); + } + if (status < 0) { if (!client->m_quiet) { LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status)); diff --git a/src/net/Client.h b/src/net/Client.h index fff7a1560..fc0335c65 100644 --- a/src/net/Client.h +++ b/src/net/Client.h @@ -32,6 +32,7 @@ #include "net/Id.h" #include "net/Job.h" +#include "net/Storage.h" #include "net/SubmitResult.h" #include "net/Url.h" #include "rapidjson/fwd.h" @@ -53,9 +54,9 @@ public: }; constexpr static int kResponseTimeout = 20 * 1000; - constexpr static int kKeepAliveTimeout = 60 * 1000; Client(int id, const char *agent, IClientListener *listener); + ~Client(); bool disconnect(); int64_t submit(const JobResult &result); @@ -76,8 +77,6 @@ public: inline void setRetryPause(int ms) { m_retryPause = ms; } private: - ~Client(); - bool close(); bool isCriticalError(const char *message); bool parseJob(const rapidjson::Value ¶ms, int *code); @@ -103,7 +102,7 @@ private: static void onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); static void onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res); - static inline Client *getClient(void *data) { return static_cast(data); } + static inline Client *getClient(void *data) { return m_storage.get(data); } addrinfo m_hints; bool m_ipv6; @@ -120,10 +119,11 @@ private: Job m_job; size_t m_recvBufPos; SocketState m_state; - static int64_t m_sequence; std::map m_results; uint64_t m_expire; uint64_t m_jobs; + uint64_t m_keepAlive; + uintptr_t m_key; Url m_url; uv_buf_t m_recvBuf; uv_getaddrinfo_t m_resolver; @@ -131,9 +131,8 @@ private: uv_tcp_t *m_socket; xmrig::Id m_rpcId; -# ifndef XMRIG_PROXY_PROJECT - uv_timer_t m_keepAliveTimer; -# endif + static int64_t m_sequence; + static xmrig::Storage m_storage; }; diff --git a/src/net/Storage.h b/src/net/Storage.h new file mode 100644 index 000000000..105547ec5 --- /dev/null +++ b/src/net/Storage.h @@ -0,0 +1,97 @@ +/* XMRig + * Copyright 2010 Jeff Garzik + * Copyright 2012-2014 pooler + * Copyright 2014 Lucas Jones + * Copyright 2014-2016 Wolf9466 + * Copyright 2016 Jay D Dee + * Copyright 2017-2018 XMR-Stak , + * Copyright 2016-2018 XMRig , + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef __STORAGE_H__ +#define __STORAGE_H__ + + +#include +#include + +#include "log/Log.h" + + +namespace xmrig { + + +template +class Storage +{ +public: + inline Storage() : + m_counter(0) + { + } + + + inline uintptr_t add(TYPE *ptr) + { + m_data[m_counter] = ptr; + + return m_counter++; + } + + + inline static void *ptr(uintptr_t id) { return reinterpret_cast(id); } + + + inline TYPE *get(void *id) const { return get(reinterpret_cast(id)); } + inline TYPE *get(uintptr_t id) const + { + assert(m_data.count(id) > 0); + + if (m_data.count(id) == 0) { + return nullptr; + } + + return m_data.at(id); + } + + + inline void remove(void *id) { remove(reinterpret_cast(id)); } + inline void remove(uintptr_t id) + { + TYPE *obj = get(id); + if (obj == nullptr) { + return; + } + + auto it = m_data.find(id); + if (it != m_data.end()) { + m_data.erase(it); + } + + delete obj; + } + + +private: + std::map m_data; + uint64_t m_counter; +}; + + +} /* namespace xmrig */ + + +#endif /* __STORAGE_H__ */ diff --git a/src/net/Url.h b/src/net/Url.h index 45db4457f..4c2c94352 100644 --- a/src/net/Url.h +++ b/src/net/Url.h @@ -41,7 +41,6 @@ public: Url(const char *host, uint16_t port, const char *user = nullptr, const char *password = nullptr, int keepAlive = 0, bool nicehash = false, int variant = -1); ~Url(); - inline bool isKeepAlive() const { return m_keepAlive > 0; } // FIXME: replace isKeepAlive to keepAlive inline bool isNicehash() const { return m_nicehash; } inline bool isValid() const { return m_host && m_port > 0; } inline const char *host() const { return m_host; }