xmrig/src/net/Client.cpp

652 lines
15 KiB
C++
Raw Normal View History

2017-06-04 17:52:21 +00:00
/* XMRig
* Copyright 2010 Jeff Garzik <jgarzik@pobox.com>
* Copyright 2012-2014 pooler <pooler@litecoinpool.org>
* Copyright 2014 Lucas Jones <https://github.com/lucasjones>
* Copyright 2014-2016 Wolf9466 <https://github.com/OhGodAPet>
* Copyright 2016 Jay D Dee <jayddee246@gmail.com>
* Copyright 2016-2017 XMRig <support@xmrig.com>
*
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2017-07-18 17:20:29 +00:00
#include <inttypes.h>
#include <iterator>
#include <stdio.h>
#include <string.h>
2017-06-10 10:32:27 +00:00
#include <utility>
2017-06-04 17:52:21 +00:00
#include "interfaces/IClientListener.h"
#include "log/Log.h"
2017-06-04 17:52:21 +00:00
#include "net/Client.h"
#include "net/Url.h"
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
2017-06-04 17:52:21 +00:00
#ifdef XMRIG_PROXY_PROJECT
# include "proxy/JobResult.h"
#else
# include "net/JobResult.h"
#endif
2017-06-15 19:30:56 +00:00
#ifdef _MSC_VER
# define strncasecmp(x,y,z) _strnicmp(x,y,z)
#endif
int64_t Client::m_sequence = 1;
Client::Client(int id, const char *agent, IClientListener *listener) :
2017-06-30 00:20:50 +00:00
m_quiet(false),
m_agent(agent),
2017-06-04 17:52:21 +00:00
m_listener(listener),
2017-06-06 23:46:52 +00:00
m_id(id),
2017-06-07 06:07:31 +00:00
m_retryPause(5000),
2017-06-07 01:19:32 +00:00
m_failures(0),
2017-06-04 17:52:21 +00:00
m_recvBufPos(0),
m_state(UnconnectedState),
m_expire(0),
2017-06-04 17:52:21 +00:00
m_stream(nullptr),
m_socket(nullptr)
{
2017-07-01 22:36:33 +00:00
memset(m_ip, 0, sizeof(m_ip));
2017-07-05 04:20:28 +00:00
memset(&m_hints, 0, sizeof(m_hints));
m_resolver.data = this;
2017-06-04 17:52:21 +00:00
m_hints.ai_family = PF_INET;
m_hints.ai_socktype = SOCK_STREAM;
m_hints.ai_protocol = IPPROTO_TCP;
m_recvBuf.base = m_buf;
m_recvBuf.len = sizeof(m_buf);
2017-06-07 01:19:32 +00:00
# ifndef XMRIG_PROXY_PROJECT
m_keepAliveTimer.data = this;
uv_timer_init(uv_default_loop(), &m_keepAliveTimer);
# endif
2017-06-04 17:52:21 +00:00
}
Client::~Client()
{
delete m_socket;
}
2017-06-06 23:46:52 +00:00
void Client::connect()
2017-06-04 17:52:21 +00:00
{
resolve(m_url.host());
2017-06-04 17:52:21 +00:00
}
/**
* @brief Connect to server.
*
* @param url
*/
void Client::connect(const Url *url)
{
2017-06-06 23:46:52 +00:00
setUrl(url);
resolve(m_url.host());
2017-06-04 17:52:21 +00:00
}
void Client::disconnect()
{
# ifndef XMRIG_PROXY_PROJECT
2017-07-18 02:20:36 +00:00
uv_timer_stop(&m_keepAliveTimer);
# endif
m_expire = 0;
2017-06-07 01:19:32 +00:00
m_failures = -1;
2017-06-04 17:52:21 +00:00
close();
}
2017-06-06 23:46:52 +00:00
void Client::setUrl(const Url *url)
{
2017-06-07 19:34:23 +00:00
if (!url || !url->isValid()) {
return;
}
m_url = url;
2017-06-06 23:46:52 +00:00
}
void Client::tick(uint64_t now)
{
if (m_expire == 0 || now < m_expire) {
return;
}
if (m_state == ConnectedState) {
LOG_DEBUG_ERR("[%s:%u] timeout", m_url.host(), m_url.port());
close();
}
if (m_state == ConnectingState) {
connect();
}
}
int64_t Client::submit(const JobResult &result)
2017-06-11 12:32:15 +00:00
{
# ifdef XMRIG_PROXY_PROJECT
const char *nonce = result.nonce;
const char *data = result.result;
# else
2017-06-11 12:32:15 +00:00
char nonce[9];
char data[65];
Job::toHex(reinterpret_cast<const unsigned char*>(&result.nonce), 4, nonce);
nonce[8] = '\0';
Job::toHex(result.result, 32, data);
data[64] = '\0';
# endif
2017-06-11 12:32:15 +00:00
const size_t size = snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRIu64 ",\"jsonrpc\":\"2.0\",\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"}}\n",
m_sequence, m_rpcId, result.jobId.data(), nonce, data);
2017-06-11 12:32:15 +00:00
2017-09-01 05:02:56 +00:00
m_results[m_sequence] = SubmitResult(m_sequence, result.diff, result.actualDiff());
return send(size);
2017-06-11 12:32:15 +00:00
}
bool Client::isCriticalError(const char *message)
{
if (!message) {
return false;
}
if (strncasecmp(message, "Unauthenticated", 15) == 0) {
return true;
}
if (strncasecmp(message, "your IP is banned", 17) == 0) {
return true;
}
if (strncasecmp(message, "IP Address currently banned", 27) == 0) {
return true;
}
return false;
}
bool Client::parseJob(const rapidjson::Value &params, int *code)
2017-06-06 17:43:52 +00:00
{
if (!params.IsObject()) {
2017-06-06 17:43:52 +00:00
*code = 2;
return false;
}
Job job(m_id, m_url.isNicehash());
if (!job.setId(params["job_id"].GetString())) {
2017-06-06 17:43:52 +00:00
*code = 3;
return false;
}
if (!job.setBlob(params["blob"].GetString())) {
2017-06-06 17:43:52 +00:00
*code = 4;
return false;
}
if (!job.setTarget(params["target"].GetString())) {
2017-06-06 17:43:52 +00:00
*code = 5;
return false;
}
if (m_job == job) {
2017-10-23 12:32:07 +00:00
if (!m_quiet) {
LOG_WARN("[%s:%u] duplicate job received, reconnect", m_url.host(), m_url.port());
}
close();
return false;
}
2017-06-10 10:32:27 +00:00
m_job = std::move(job);
2017-06-06 17:43:52 +00:00
return true;
}
bool Client::parseLogin(const rapidjson::Value &result, int *code)
2017-06-06 03:35:17 +00:00
{
const char *id = result["id"].GetString();
2017-06-06 03:35:17 +00:00
if (!id || strlen(id) >= sizeof(m_rpcId)) {
*code = 1;
return false;
}
memset(m_rpcId, 0, sizeof(m_rpcId));
memcpy(m_rpcId, id, strlen(id));
return parseJob(result["job"], code);
2017-06-06 03:35:17 +00:00
}
2017-06-04 17:52:21 +00:00
int Client::resolve(const char *host)
{
setState(HostLookupState);
m_expire = 0;
2017-06-04 17:52:21 +00:00
m_recvBufPos = 0;
2017-07-01 17:53:42 +00:00
if (m_failures == -1) {
m_failures = 0;
}
2017-06-04 17:52:21 +00:00
const int r = uv_getaddrinfo(uv_default_loop(), &m_resolver, Client::onResolved, host, NULL, &m_hints);
if (r) {
2017-06-30 00:20:50 +00:00
if (!m_quiet) {
LOG_ERR("[%s:%u] getaddrinfo error: \"%s\"", host, m_url.port(), uv_strerror(r));
}
2017-06-04 17:52:21 +00:00
return 1;
}
return 0;
}
int64_t Client::send(size_t size)
{
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size, m_sendBuf);
if (state() != ConnectedState || !uv_is_writable(m_stream)) {
LOG_DEBUG_ERR("[%s:%u] send failed, invalid state: %d", m_url.host(), m_url.port(), m_state);
return -1;
}
uv_buf_t buf = uv_buf_init(m_sendBuf, (unsigned int) size);
if (uv_try_write(m_stream, &buf, 1) < 0) {
close();
return -1;
}
m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
return m_sequence++;
}
2017-06-04 17:52:21 +00:00
void Client::close()
{
if (m_state == UnconnectedState || m_state == ClosingState || !m_socket) {
return;
}
setState(ClosingState);
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(m_socket)) == 0) {
uv_close(reinterpret_cast<uv_handle_t*>(m_socket), Client::onClose);
}
2017-06-04 17:52:21 +00:00
}
void Client::connect(struct sockaddr *addr)
{
setState(ConnectingState);
reinterpret_cast<struct sockaddr_in*>(addr)->sin_port = htons(m_url.port());
delete m_socket;
2017-06-04 17:52:21 +00:00
uv_connect_t *req = new uv_connect_t;
2017-06-04 17:52:21 +00:00
req->data = this;
m_socket = new uv_tcp_t;
2017-06-04 17:52:21 +00:00
m_socket->data = this;
uv_tcp_init(uv_default_loop(), m_socket);
uv_tcp_nodelay(m_socket, 1);
# ifndef WIN32
2017-06-04 17:52:21 +00:00
uv_tcp_keepalive(m_socket, 1, 60);
# endif
2017-06-04 17:52:21 +00:00
uv_tcp_connect(req, m_socket, reinterpret_cast<const sockaddr*>(addr), Client::onConnect);
2017-06-04 17:52:21 +00:00
}
void Client::login()
{
m_results.clear();
rapidjson::Document doc;
doc.SetObject();
auto &allocator = doc.GetAllocator();
doc.AddMember("id", 1, allocator);
doc.AddMember("jsonrpc", "2.0", allocator);
doc.AddMember("method", "login", allocator);
rapidjson::Value params(rapidjson::kObjectType);
params.AddMember("login", rapidjson::StringRef(m_url.user()), allocator);
params.AddMember("pass", rapidjson::StringRef(m_url.password()), allocator);
params.AddMember("agent", rapidjson::StringRef(m_agent), allocator);
doc.AddMember("params", params, allocator);
rapidjson::StringBuffer buffer(0, 512);
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
doc.Accept(writer);
const size_t size = buffer.GetSize();
if (size > (sizeof(m_buf) - 2)) {
return;
}
memcpy(m_sendBuf, buffer.GetString(), size);
m_sendBuf[size] = '\n';
m_sendBuf[size + 1] = '\0';
send(size + 1);
}
2017-06-04 17:52:21 +00:00
void Client::parse(char *line, size_t len)
{
2017-06-07 03:48:00 +00:00
startTimeout();
2017-06-04 17:52:21 +00:00
line[len - 1] = '\0';
LOG_DEBUG("[%s:%u] received (%d bytes): \"%s\"", m_url.host(), m_url.port(), len, line);
2017-06-04 17:52:21 +00:00
rapidjson::Document doc;
if (doc.ParseInsitu(line).HasParseError()) {
2017-06-30 00:20:50 +00:00
if (!m_quiet) {
LOG_ERR("[%s:%u] JSON decode failed: \"%s\"", m_url.host(), m_url.port(), rapidjson::GetParseError_En(doc.GetParseError()));
2017-06-30 00:20:50 +00:00
}
2017-06-04 17:52:21 +00:00
return;
}
2017-06-06 03:05:17 +00:00
if (!doc.IsObject()) {
return;
}
const rapidjson::Value &id = doc["id"];
if (id.IsInt64()) {
parseResponse(id.GetInt64(), doc["result"], doc["error"]);
2017-06-06 03:05:17 +00:00
}
else {
parseNotification(doc["method"].GetString(), doc["params"], doc["error"]);
2017-06-06 03:05:17 +00:00
}
}
void Client::parseNotification(const char *method, const rapidjson::Value &params, const rapidjson::Value &error)
2017-06-06 03:05:17 +00:00
{
if (error.IsObject()) {
2017-06-30 00:20:50 +00:00
if (!m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), error["message"].GetString(), error["code"].GetInt());
2017-06-30 00:20:50 +00:00
}
2017-06-07 04:34:14 +00:00
return;
}
2017-06-06 22:19:59 +00:00
if (!method) {
return;
}
if (strcmp(method, "job") == 0) {
int code = -1;
if (parseJob(params, &code)) {
m_listener->onJobReceived(this, m_job);
}
return;
}
2017-06-06 03:05:17 +00:00
LOG_WARN("[%s:%u] unsupported method: \"%s\"", m_url.host(), m_url.port(), method);
2017-06-06 03:05:17 +00:00
}
void Client::parseResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error)
2017-06-06 03:05:17 +00:00
{
if (error.IsObject()) {
const char *message = error["message"].GetString();
2017-06-30 00:20:50 +00:00
auto it = m_results.find(id);
if (it != m_results.end()) {
2017-09-01 05:02:56 +00:00
it->second.done();
m_listener->onResultAccepted(this, it->second, message);
m_results.erase(it);
}
else if (!m_quiet) {
LOG_ERR("[%s:%u] error: \"%s\", code: %d", m_url.host(), m_url.port(), message, error["code"].GetInt());
2017-06-30 00:20:50 +00:00
}
2017-06-06 03:05:17 +00:00
if (id == 1 || isCriticalError(message)) {
2017-06-06 03:05:17 +00:00
close();
}
return;
}
2017-06-06 03:35:17 +00:00
if (!result.IsObject()) {
2017-06-06 03:35:17 +00:00
return;
}
if (id == 1) {
int code = -1;
if (!parseLogin(result, &code)) {
2017-06-30 00:20:50 +00:00
if (!m_quiet) {
LOG_ERR("[%s:%u] login error code: %d", m_url.host(), m_url.port(), code);
}
2017-06-06 03:35:17 +00:00
return close();
}
2017-06-07 01:19:32 +00:00
m_failures = 0;
2017-06-06 22:19:59 +00:00
m_listener->onLoginSuccess(this);
m_listener->onJobReceived(this, m_job);
2017-06-06 03:35:17 +00:00
return;
}
auto it = m_results.find(id);
if (it != m_results.end()) {
2017-09-01 05:02:56 +00:00
it->second.done();
m_listener->onResultAccepted(this, it->second, nullptr);
m_results.erase(it);
}
2017-06-07 01:19:32 +00:00
}
2017-06-06 03:35:17 +00:00
2017-06-07 01:19:32 +00:00
2017-06-07 03:48:00 +00:00
void Client::ping()
{
send(snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRId64 ",\"jsonrpc\":\"2.0\",\"method\":\"keepalived\",\"params\":{\"id\":\"%s\"}}\n", m_sequence, m_rpcId));
2017-06-07 03:48:00 +00:00
}
2017-06-07 01:19:32 +00:00
void Client::reconnect()
{
2017-07-01 17:53:42 +00:00
setState(ConnectingState);
# ifndef XMRIG_PROXY_PROJECT
if (m_url.isKeepAlive()) {
2017-06-07 03:48:00 +00:00
uv_timer_stop(&m_keepAliveTimer);
}
# endif
2017-06-07 03:48:00 +00:00
2017-06-07 01:19:32 +00:00
if (m_failures == -1) {
return m_listener->onClose(this, -1);
}
m_failures++;
2017-08-14 06:30:41 +00:00
m_listener->onClose(this, (int) m_failures);
2017-06-07 01:19:32 +00:00
m_expire = uv_now(uv_default_loop()) + m_retryPause;
2017-06-04 17:52:21 +00:00
}
void Client::setState(SocketState state)
{
LOG_DEBUG("[%s:%u] state: %d", m_url.host(), m_url.port(), state);
2017-06-04 17:52:21 +00:00
if (m_state == state) {
return;
}
m_state = state;
}
2017-06-07 03:48:00 +00:00
void Client::startTimeout()
{
m_expire = 0;
# ifndef XMRIG_PROXY_PROJECT
if (!m_url.isKeepAlive()) {
2017-06-07 03:48:00 +00:00
return;
}
uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0);
# endif
2017-06-07 03:48:00 +00:00
}
2017-06-04 17:52:21 +00:00
void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
auto client = getClient(handle->data);
buf->base = &client->m_recvBuf.base[client->m_recvBufPos];
2017-08-15 00:13:11 +00:00
buf->len = client->m_recvBuf.len - client->m_recvBufPos;
2017-06-04 17:52:21 +00:00
}
void Client::onClose(uv_handle_t *handle)
{
auto client = getClient(handle->data);
delete client->m_socket;
2017-06-04 17:52:21 +00:00
client->m_stream = nullptr;
client->m_socket = nullptr;
client->setState(UnconnectedState);
2017-06-06 23:46:52 +00:00
2017-06-07 01:19:32 +00:00
client->reconnect();
2017-06-04 17:52:21 +00:00
}
void Client::onConnect(uv_connect_t *req, int status)
{
auto client = getClient(req->data);
if (status < 0) {
2017-06-30 00:20:50 +00:00
if (!client->m_quiet) {
LOG_ERR("[%s:%u] connect error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));
}
delete req;
2017-06-06 23:46:52 +00:00
client->close();
2017-06-04 17:52:21 +00:00
return;
}
client->m_stream = static_cast<uv_stream_t*>(req->handle);
client->m_stream->data = req->data;
client->setState(ConnectedState);
uv_read_start(client->m_stream, Client::onAllocBuffer, Client::onRead);
delete req;
2017-06-04 17:52:21 +00:00
client->login();
2017-06-04 17:52:21 +00:00
}
void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
auto client = getClient(stream->data);
if (nread < 0) {
2017-06-30 00:20:50 +00:00
if (nread != UV_EOF && !client->m_quiet) {
2017-08-14 06:30:41 +00:00
LOG_ERR("[%s:%u] read error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror((int) nread));
2017-06-04 17:52:21 +00:00
}
2017-06-06 23:46:52 +00:00
return client->close();;
2017-06-04 17:52:21 +00:00
}
if ((size_t) nread > (sizeof(m_buf) - 8 - client->m_recvBufPos)) {
2017-07-31 13:05:16 +00:00
return client->close();;
}
2017-06-04 17:52:21 +00:00
client->m_recvBufPos += nread;
char* end;
char* start = buf->base;
2017-06-04 17:52:21 +00:00
size_t remaining = client->m_recvBufPos;
while ((end = static_cast<char*>(memchr(start, '\n', remaining))) != nullptr) {
end++;
size_t len = end - start;
client->parse(start, len);
remaining -= len;
start = end;
}
if (remaining == 0) {
client->m_recvBufPos = 0;
return;
}
if (start == buf->base) {
2017-06-04 17:52:21 +00:00
return;
}
memcpy(buf->base, start, remaining);
2017-06-04 17:52:21 +00:00
client->m_recvBufPos = remaining;
}
void Client::onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res)
{
auto client = getClient(req->data);
if (status < 0) {
LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));
2017-06-07 01:19:32 +00:00
return client->reconnect();;
2017-06-04 17:52:21 +00:00
}
addrinfo *ptr = res;
std::vector<addrinfo*> ipv4;
2017-07-01 22:36:33 +00:00
while (ptr != nullptr) {
if (ptr->ai_family == AF_INET) {
ipv4.push_back(ptr);
}
ptr = ptr->ai_next;
}
if (ipv4.empty()) {
LOG_ERR("[%s:%u] DNS error: \"No IPv4 records found\"", client->m_url.host(), client->m_url.port());
return client->reconnect();
}
ptr = ipv4[rand() % ipv4.size()];
uv_ip4_name(reinterpret_cast<sockaddr_in*>(ptr->ai_addr), client->m_ip, 16);
2017-07-01 22:36:33 +00:00
client->connect(ptr->ai_addr);
2017-06-04 17:52:21 +00:00
uv_freeaddrinfo(res);
}