Backport changes from proxy.

This commit is contained in:
XMRig 2018-03-24 00:10:39 +07:00
parent ff68840220
commit 8aa73318c8
9 changed files with 80 additions and 61 deletions

View file

@ -39,7 +39,6 @@ public:
virtual bool isActive() const = 0;
virtual int64_t submit(const JobResult &result) = 0;
virtual void connect() = 0;
virtual void release() = 0;
virtual void resume() = 0;
virtual void stop() = 0;
virtual void tick(uint64_t now) = 0;

View file

@ -112,6 +112,20 @@ void Client::connect(const Url *url)
}
void Client::deleteLater()
{
if (!m_listener) {
return;
}
m_listener = nullptr;
if (!disconnect()) {
delete this;
}
}
void Client::setUrl(const Url *url)
{
if (!url || !url->isValid()) {
@ -172,7 +186,12 @@ int64_t Client::submit(const JobResult &result)
const size_t size = snprintf(m_sendBuf, sizeof(m_sendBuf), "{\"id\":%" PRIu64 ",\"jsonrpc\":\"2.0\",\"method\":\"submit\",\"params\":{\"id\":\"%s\",\"job_id\":\"%s\",\"nonce\":\"%s\",\"result\":\"%s\"}}\n",
m_sequence, m_rpcId.data(), result.jobId.data(), nonce, data);
# ifdef XMRIG_PROXY_PROJECT
m_results[m_sequence] = SubmitResult(m_sequence, result.diff, result.actualDiff(), result.id);
# else
m_results[m_sequence] = SubmitResult(m_sequence, result.diff, result.actualDiff());
# endif
return send(size);
}
@ -185,9 +204,16 @@ bool Client::close()
setState(ClosingState);
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(m_socket)) == 0) {
uv_close(reinterpret_cast<uv_handle_t*>(m_socket), Client::onClose);
}
uv_read_stop(reinterpret_cast<uv_stream_t*>(m_socket));
uv_shutdown(new uv_shutdown_t, reinterpret_cast<uv_stream_t*>(m_socket), [](uv_shutdown_t* req, int status) {
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(req->handle)) == 0) {
uv_close(reinterpret_cast<uv_handle_t*>(req->handle), Client::onClose);
}
delete req;
});
return true;
}
@ -222,7 +248,14 @@ bool Client::parseJob(const rapidjson::Value &params, int *code)
return false;
}
# ifdef XMRIG_PROXY_PROJECT
Job job(m_id, m_url.variant());
job.setClientId(m_rpcId);
job.setCoin(m_url.coin());
# else
Job job(m_id, m_nicehash, m_url.algo(), m_url.variant());
# endif
if (!job.setId(params["job_id"].GetString())) {
*code = 3;
return false;
@ -272,7 +305,9 @@ bool Client::parseLogin(const rapidjson::Value &result, int *code)
return false;
}
# ifndef XMRIG_PROXY_PROJECT
m_nicehash = m_url.isNicehash();
# endif
if (result.HasMember("extensions")) {
parseExtensions(result["extensions"]);
@ -296,7 +331,7 @@ int Client::resolve(const char *host)
m_failures = 0;
}
const int r = uv_getaddrinfo(uv_default_loop(), &m_resolver, Client::onResolved, host, NULL, &m_hints);
const int r = uv_getaddrinfo(uv_default_loop(), &m_resolver, Client::onResolved, host, nullptr, &m_hints);
if (r) {
if (!m_quiet) {
LOG_ERR("[%s:%u] getaddrinfo error: \"%s\"", host, m_url.port(), uv_strerror(r));
@ -550,6 +585,12 @@ void Client::ping()
void Client::reconnect()
{
if (!m_listener) {
delete this;
return;
}
setState(ConnectingState);
# ifndef XMRIG_PROXY_PROJECT
@ -598,6 +639,9 @@ void Client::startTimeout()
void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
auto client = getClient(handle->data);
if (!client) {
return;
}
buf->base = &client->m_recvBuf.base[client->m_recvBufPos];
buf->len = client->m_recvBuf.len - client->m_recvBufPos;
@ -607,6 +651,9 @@ void Client::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t
void Client::onClose(uv_handle_t *handle)
{
auto client = getClient(handle->data);
if (!client) {
return;
}
delete client->m_socket;
@ -621,6 +668,10 @@ void Client::onClose(uv_handle_t *handle)
void Client::onConnect(uv_connect_t *req, int status)
{
auto client = getClient(req->data);
if (!client) {
return;
}
if (status < 0) {
if (!client->m_quiet) {
LOG_ERR("[%s:%u] connect error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));
@ -645,6 +696,10 @@ void Client::onConnect(uv_connect_t *req, int status)
void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
auto client = getClient(stream->data);
if (!client) {
return;
}
if (nread < 0) {
if (nread != UV_EOF && !client->m_quiet) {
LOG_ERR("[%s:%u] read error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror((int) nread));
@ -691,6 +746,10 @@ void Client::onRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
void Client::onResolved(uv_getaddrinfo_t *req, int status, struct addrinfo *res)
{
auto client = getClient(req->data);
if (!client) {
return;
}
if (status < 0) {
if (!client->m_quiet) {
LOG_ERR("[%s:%u] DNS error: \"%s\"", client->m_url.host(), client->m_url.port(), uv_strerror(status));

View file

@ -56,12 +56,12 @@ public:
constexpr static int kKeepAliveTimeout = 60 * 1000;
Client(int id, const char *agent, IClientListener *listener);
~Client();
bool disconnect();
int64_t submit(const JobResult &result);
void connect();
void connect(const Url *url);
void deleteLater();
void setUrl(const Url *url);
void tick(uint64_t now);
@ -76,6 +76,8 @@ public:
inline void setRetryPause(int ms) { m_retryPause = ms; }
private:
~Client();
bool close();
bool isCriticalError(const char *message);
bool parseJob(const rapidjson::Value &params, int *code);

View file

@ -37,8 +37,8 @@ extern "C"
}
const static char *kDonatePool = "thanks.xmrig.com";
const static char *kDonatePoolIP = "45.76.34.221";
const static char *kDonatePool1 = "miner.fee.xmrig.com";
const static char *kDonatePool2 = "emergency.fee.xmrig.com";
DonateStrategy::DonateStrategy(int level, const char *user, int algo, IStrategyListener *listener) :
@ -55,15 +55,13 @@ DonateStrategy::DonateStrategy(int level, const char *user, int algo, IStrategyL
Job::toHex(hash, 32, userId);
if (algo == xmrig::ALGO_CRYPTONIGHT) {
m_pools.push_back(new Url(kDonatePool, 80, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePool, 443, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePoolIP, 80, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePoolIP, 443, userId, nullptr, false, true));
m_pools.push_back(new Url("emergency.xmrig.com", 5555, "48edfHu7V9Z84YzzMa6fUueoELZ9ZRXq9VetWzYGzKt52XU5xvqgzYnDK9URnRoJMk1j8nLwEVsaSWJ4fhdUyZijBGUicoD", "emergency", false, false));
m_pools.push_back(new Url(kDonatePool1, 6666, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePool1, 80, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePool2, 5555, "48edfHu7V9Z84YzzMa6fUueoELZ9ZRXq9VetWzYGzKt52XU5xvqgzYnDK9URnRoJMk1j8nLwEVsaSWJ4fhdUyZijBGUicoD", "emergency", false, false));
}
else {
m_pools.push_back(new Url(kDonatePool, 5555, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePoolIP, 5555, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePool1, 5555, userId, nullptr, false, true));
m_pools.push_back(new Url(kDonatePool1, 7777, userId, nullptr, false, true));
}
m_strategy = new FailoverStrategy(m_pools, 1, 1, this, true);
@ -77,6 +75,7 @@ DonateStrategy::DonateStrategy(int level, const char *user, int algo, IStrategyL
DonateStrategy::~DonateStrategy()
{
delete m_strategy;
}
@ -92,11 +91,6 @@ void DonateStrategy::connect()
}
void DonateStrategy::release()
{
}
void DonateStrategy::stop()
{
uv_timer_stop(&m_timer);

View file

@ -51,7 +51,6 @@ public:
int64_t submit(const JobResult &result) override;
void connect() override;
void release() override;
void stop() override;
void tick(uint64_t now) override;

View file

@ -29,7 +29,6 @@
FailoverStrategy::FailoverStrategy(const std::vector<Url*> &urls, int retryPause, int retries, IStrategyListener *listener, bool quiet) :
m_release(false),
m_quiet(quiet),
m_retries(retries),
m_retryPause(retryPause),
@ -47,13 +46,17 @@ FailoverStrategy::FailoverStrategy(const std::vector<Url*> &urls, int retryPause
FailoverStrategy::~FailoverStrategy()
{
for (Client *client : m_pools) {
delete client;
client->deleteLater();
}
}
int64_t FailoverStrategy::submit(const JobResult &result)
{
if (m_active == -1) {
return -1;
}
return m_pools[m_active]->submit(result);
}
@ -64,18 +67,6 @@ void FailoverStrategy::connect()
}
void FailoverStrategy::release()
{
m_release = true;
for (size_t i = 0; i < m_pools.size(); ++i) {
if (m_pools[i]->disconnect()) {
m_remaining++;
}
}
}
void FailoverStrategy::resume()
{
if (!isActive()) {
@ -110,14 +101,6 @@ void FailoverStrategy::tick(uint64_t now)
void FailoverStrategy::onClose(Client *client, int failures)
{
if (failures == -1) {
if (m_release) {
m_remaining--;
if (m_remaining == 0) {
delete this;
}
}
return;
}

View file

@ -48,7 +48,6 @@ public:
int64_t submit(const JobResult &result) override;
void connect() override;
void release() override;
void resume() override;
void stop() override;
void tick(uint64_t now) override;
@ -62,7 +61,6 @@ protected:
private:
void add(const Url *url);
bool m_release;
const bool m_quiet;
const int m_retries;
const int m_retryPause;

View file

@ -30,7 +30,6 @@
SinglePoolStrategy::SinglePoolStrategy(const Url *url, int retryPause, IStrategyListener *listener, bool quiet) :
m_active(false),
m_release(false),
m_listener(listener)
{
m_client = new Client(0, Platform::userAgent(), this);
@ -42,7 +41,7 @@ SinglePoolStrategy::SinglePoolStrategy(const Url *url, int retryPause, IStrategy
SinglePoolStrategy::~SinglePoolStrategy()
{
delete m_client;
m_client->deleteLater();
}
@ -58,13 +57,6 @@ void SinglePoolStrategy::connect()
}
void SinglePoolStrategy::release()
{
m_release = true;
m_client->disconnect();
}
void SinglePoolStrategy::resume()
{
if (!isActive()) {
@ -89,11 +81,6 @@ void SinglePoolStrategy::tick(uint64_t now)
void SinglePoolStrategy::onClose(Client *client, int failures)
{
if (m_release) {
delete this;
return;
}
if (!isActive()) {
return;
}

View file

@ -45,7 +45,6 @@ public:
int64_t submit(const JobResult &result) override;
void connect() override;
void release() override;
void resume() override;
void stop() override;
void tick(uint64_t now) override;
@ -58,7 +57,6 @@ protected:
private:
bool m_active;
bool m_release;
Client *m_client;
IStrategyListener *m_listener;
};