diff --git a/CMakeLists.txt b/CMakeLists.txt index 81ad18f6e..42bb51f89 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,6 +26,7 @@ set(HEADERS src/base/kernel/Entry.h src/base/kernel/interfaces/IClientListener.h src/base/kernel/interfaces/IConfigListener.h + src/base/kernel/interfaces/ILineListener.h src/base/kernel/interfaces/ISignalListener.h src/base/kernel/interfaces/IStrategy.h src/base/kernel/interfaces/IStrategyListener.h @@ -39,6 +40,7 @@ set(HEADERS src/base/net/stratum/strategies/FailoverStrategy.h src/base/net/stratum/strategies/SinglePoolStrategy.h src/base/net/stratum/SubmitResult.h + src/base/net/tools/RecvBuf.h src/base/net/tools/Storage.h src/base/tools/Arguments.h src/base/tools/Buffer.h diff --git a/src/base/kernel/interfaces/ILineListener.h b/src/base/kernel/interfaces/ILineListener.h new file mode 100644 index 000000000..1a6d49142 --- /dev/null +++ b/src/base/kernel/interfaces/ILineListener.h @@ -0,0 +1,50 @@ +/* XMRig + * Copyright 2010 Jeff Garzik + * Copyright 2012-2014 pooler + * Copyright 2014 Lucas Jones + * Copyright 2014-2016 Wolf9466 + * Copyright 2016 Jay D Dee + * Copyright 2017-2018 XMR-Stak , + * Copyright 2018-2019 SChernykh + * Copyright 2016-2019 XMRig , + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef XMRIG_ILINELISTENER_H +#define XMRIG_ILINELISTENER_H + + +#include + + +namespace xmrig { + + +class String; + + +class ILineListener +{ +public: + virtual ~ILineListener() = default; + + virtual void onLine(char *line, size_t size) = 0; +}; + + +} /* namespace xmrig */ + + +#endif // XMRIG_ILINELISTENER_H diff --git a/src/base/net/stratum/Client.cpp b/src/base/net/stratum/Client.cpp index 5b330311b..313f4ec39 100644 --- a/src/base/net/stratum/Client.cpp +++ b/src/base/net/stratum/Client.cpp @@ -83,7 +83,6 @@ xmrig::Client::Client(int id, const char *agent, IClientListener *listener) : m_retries(5), m_retryPause(5000), m_failures(0), - m_recvBufPos(0), m_state(UnconnectedState), m_tls(nullptr), m_expire(0), @@ -103,9 +102,6 @@ xmrig::Client::Client(int id, const char *agent, IClientListener *listener) : m_hints.ai_family = AF_UNSPEC; m_hints.ai_socktype = SOCK_STREAM; m_hints.ai_protocol = IPPROTO_TCP; - - m_recvBuf.base = m_buf; - m_recvBuf.len = sizeof(m_buf); } @@ -477,8 +473,8 @@ int xmrig::Client::resolve(const char *host) { setState(HostLookupState); - m_expire = 0; - m_recvBufPos = 0; + m_expire = 0; + m_recvBuf.reset(); if (m_failures == -1) { m_failures = 0; @@ -672,8 +668,6 @@ void xmrig::Client::parse(char *line, size_t len) { startTimeout(); - line[len - 1] = '\0'; - LOG_DEBUG("[%s] received (%d bytes): \"%s\"", m_pool.url(), len, line); if (len < 32 || line[0] != '{') { @@ -827,32 +821,42 @@ void xmrig::Client::ping() } -void xmrig::Client::read() +void xmrig::Client::read(ssize_t nread) { - char* end; - char* start = m_recvBuf.base; - size_t remaining = m_recvBufPos; + const size_t size = static_cast(nread); - while ((end = static_cast(memchr(start, '\n', remaining))) != nullptr) { - end++; - size_t len = end - start; - parse(start, len); - - remaining -= len; - start = end; + if (nread > 0 && size > m_recvBuf.available()) { + nread = UV_ENOBUFS; } - if (remaining == 0) { - m_recvBufPos = 0; + if (nread < 0) { + if (!isQuiet()) { + LOG_ERR("[%s] read error: \"%s\"", m_pool.url(), uv_strerror(static_cast(nread))); + } + + close(); return; } - if (start == m_recvBuf.base) { - return; + assert(client->m_listener != nullptr); + if (!m_listener) { + return reconnect(); } - memcpy(m_recvBuf.base, start, remaining); - m_recvBufPos = remaining; + m_recvBuf.nread(size); + +# ifndef XMRIG_NO_TLS + if (isTLS()) { + LOG_DEBUG("[%s] TLS received (%d bytes)", m_pool.url(), static_cast(nread)); + + m_tls->read(m_recvBuf.base(), m_recvBuf.pos()); + m_recvBuf.reset(); + } + else +# endif + { + m_recvBuf.getline(this); + } } @@ -873,7 +877,7 @@ void xmrig::Client::reconnect() setState(ConnectingState); m_failures++; - m_listener->onClose(this, (int) m_failures); + m_listener->onClose(this, static_cast(m_failures)); m_expire = Chrono::steadyMSecs() + m_retryPause; } @@ -903,15 +907,20 @@ void xmrig::Client::startTimeout() } -void xmrig::Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) +void xmrig::Client::onAllocBuffer(uv_handle_t *handle, size_t, uv_buf_t *buf) { auto client = getClient(handle->data); if (!client) { return; } - buf->base = &client->m_recvBuf.base[client->m_recvBufPos]; - buf->len = client->m_recvBuf.len - client->m_recvBufPos; + buf->base = client->m_recvBuf.current(); + +# ifdef _WIN32 + buf->len = static_cast(client->m_recvBuf.available()); +# else + buf->len = client->m_recvBuf.available(); +# endif } @@ -955,45 +964,11 @@ void xmrig::Client::onConnect(uv_connect_t *req, int status) } -void xmrig::Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) +void xmrig::Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *) { auto client = getClient(stream->data); - if (!client) { - return; - } - - if (nread < 0) { - if (!client->isQuiet()) { - LOG_ERR("[%s] read error: \"%s\"", client->m_pool.url(), uv_strerror((int) nread)); - } - - client->close(); - return; - } - - if ((size_t) nread > (sizeof(m_buf) - 8 - client->m_recvBufPos)) { - client->close(); - return; - } - - assert(client->m_listener != nullptr); - if (!client->m_listener) { - return client->reconnect(); - } - - client->m_recvBufPos += nread; - -# ifndef XMRIG_NO_TLS - if (client->isTLS()) { - LOG_DEBUG("[%s] TLS received (%d bytes)", client->m_pool.url(), static_cast(nread)); - - client->m_tls->read(client->m_recvBuf.base, client->m_recvBufPos); - client->m_recvBufPos = 0; - } - else -# endif - { - client->read(); + if (client) { + client->read(nread); } } diff --git a/src/base/net/stratum/Client.h b/src/base/net/stratum/Client.h index ed433ff95..07c82a8f8 100644 --- a/src/base/net/stratum/Client.h +++ b/src/base/net/stratum/Client.h @@ -32,9 +32,11 @@ #include +#include "base/kernel/interfaces/ILineListener.h" #include "base/net/stratum/Job.h" #include "base/net/stratum/Pool.h" #include "base/net/stratum/SubmitResult.h" +#include "base/net/tools/RecvBuf.h" #include "base/net/tools/Storage.h" #include "common/crypto/Algorithm.h" #include "rapidjson/fwd.h" @@ -50,7 +52,7 @@ class IClientListener; class JobResult; -class Client +class Client : public ILineListener { public: enum SocketState { @@ -78,7 +80,7 @@ public: # endif Client(int id, const char *agent, IClientListener *listener); - ~Client(); + ~Client() override; bool disconnect(); const char *tlsFingerprint() const; @@ -106,6 +108,9 @@ public: template inline bool has() const noexcept { return m_extensions.test(ext); } +protected: + inline void onLine(char *line, size_t size) override { parse(line, size); } + private: class Tls; @@ -129,7 +134,7 @@ private: void parseNotification(const char *method, const rapidjson::Value ¶ms, const rapidjson::Value &error); void parseResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error); void ping(); - void read(); + void read(ssize_t nread); void reconnect(); void setState(SocketState state); void startTimeout(); @@ -149,7 +154,6 @@ private: bool m_enabled; bool m_ipv6; bool m_quiet; - char m_buf[kInputBufferSize]; char m_ip[46]; char m_sendBuf[2048]; const char *m_agent; @@ -160,7 +164,7 @@ private: int64_t m_failures; Job m_job; Pool m_pool; - size_t m_recvBufPos; + RecvBuf m_recvBuf; SocketState m_state; std::bitset m_extensions; std::map m_results; @@ -170,7 +174,6 @@ private: uint64_t m_jobs; uint64_t m_keepAlive; uintptr_t m_key; - uv_buf_t m_recvBuf; uv_getaddrinfo_t m_resolver; uv_stream_t *m_stream; uv_tcp_t *m_socket; diff --git a/src/base/net/stratum/Tls.cpp b/src/base/net/stratum/Tls.cpp index e8948f34f..e96d9946e 100644 --- a/src/base/net/stratum/Tls.cpp +++ b/src/base/net/stratum/Tls.cpp @@ -135,7 +135,8 @@ void xmrig::Client::Tls::read(const char *data, size_t size) int bytes_read = 0; while ((bytes_read = SSL_read(m_ssl, m_buf, sizeof(m_buf))) > 0) { - m_client->parse(m_buf, bytes_read); + m_buf[bytes_read - 1] = '\0'; + m_client->parse(m_buf, static_cast(bytes_read)); } } diff --git a/src/base/net/tools/RecvBuf.h b/src/base/net/tools/RecvBuf.h new file mode 100644 index 000000000..5122c9804 --- /dev/null +++ b/src/base/net/tools/RecvBuf.h @@ -0,0 +1,99 @@ +/* XMRig + * Copyright 2010 Jeff Garzik + * Copyright 2012-2014 pooler + * Copyright 2014 Lucas Jones + * Copyright 2014-2016 Wolf9466 + * Copyright 2016 Jay D Dee + * Copyright 2017-2018 XMR-Stak , + * Copyright 2018-2019 SChernykh + * Copyright 2016-2019 XMRig , + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef XMRIG_RECVBUF_H +#define XMRIG_RECVBUF_H + + +#include + + +#include "base/kernel/interfaces/ILineListener.h" + + +namespace xmrig { + + +template +class RecvBuf +{ +public: + inline RecvBuf() : + m_buf(), + m_pos(0) + { + } + + inline char *base() { return m_buf; } + inline char *current() { return m_buf + m_pos; } + inline const char *base() const { return m_buf; } + inline const char *current() const { return m_buf + m_pos; } + inline size_t available() const { return N - m_pos; } + inline size_t pos() const { return m_pos; } + inline void nread(size_t size) { m_pos += size; } + inline void reset() { m_pos = 0; } + + constexpr inline size_t size() const { return N; } + + inline void getline(ILineListener *listener) + { + char *end; + char *start = m_buf; + size_t remaining = m_pos; + + while ((end = static_cast(memchr(start, '\n', remaining))) != nullptr) { + *end = '\0'; + + end++; + const size_t len = static_cast(end - start); + + listener->onLine(start, len - 1); + + remaining -= len; + start = end; + } + + if (remaining == 0) { + m_pos = 0; + return; + } + + if (start == m_buf) { + return; + } + + memcpy(m_buf, start, remaining); + m_pos = remaining; + } + +private: + char m_buf[N]; + size_t m_pos; +}; + + +} /* namespace xmrig */ + + +#endif /* XMRIG_RECVBUF_H */