Backport unified client timer from proxy project.

This commit is contained in:
XMRig 2017-08-14 08:41:54 +03:00
parent 5ad1a48489
commit 9975b4e4cd
13 changed files with 105 additions and 22 deletions

View file

@ -41,6 +41,7 @@ public:
virtual void connect() = 0; virtual void connect() = 0;
virtual void resume() = 0; virtual void resume() = 0;
virtual void stop() = 0; virtual void stop() = 0;
virtual void tick(uint64_t now) = 0;
}; };

View file

@ -77,10 +77,14 @@ private:
#ifdef APP_DEBUG #ifdef APP_DEBUG
# define LOG_DEBUG(x, ...) Log::i()->message(Log::DEBUG, x, ##__VA_ARGS__) # define LOG_DEBUG(x, ...) Log::i()->message(Log::DEBUG, x, ##__VA_ARGS__)
#else
# define LOG_DEBUG(x, ...)
#endif
#if defined(APP_DEBUG) || defined(APP_DEVEL)
# define LOG_DEBUG_ERR(x, ...) Log::i()->message(Log::ERR, x, ##__VA_ARGS__) # define LOG_DEBUG_ERR(x, ...) Log::i()->message(Log::ERR, x, ##__VA_ARGS__)
# define LOG_DEBUG_WARN(x, ...) Log::i()->message(Log::WARNING, x, ##__VA_ARGS__) # define LOG_DEBUG_WARN(x, ...) Log::i()->message(Log::WARNING, x, ##__VA_ARGS__)
#else #else
# define LOG_DEBUG(x, ...)
# define LOG_DEBUG_ERR(x, ...) # define LOG_DEBUG_ERR(x, ...)
# define LOG_DEBUG_WARN(x, ...) # define LOG_DEBUG_WARN(x, ...)
#endif #endif

View file

@ -56,13 +56,14 @@ Client::Client(int id, const char *agent, IClientListener *listener) :
m_failures(0), m_failures(0),
m_recvBufPos(0), m_recvBufPos(0),
m_state(UnconnectedState), m_state(UnconnectedState),
m_expire(0),
m_stream(nullptr), m_stream(nullptr),
m_socket(nullptr) m_socket(nullptr)
{ {
memset(m_ip, 0, sizeof(m_ip)); memset(m_ip, 0, sizeof(m_ip));
memset(&m_hints, 0, sizeof(m_hints)); memset(&m_hints, 0, sizeof(m_hints));
m_resolver.data = m_responseTimer.data = m_retriesTimer.data = m_keepAliveTimer.data = this; m_resolver.data = this;
m_hints.ai_family = PF_INET; m_hints.ai_family = PF_INET;
m_hints.ai_socktype = SOCK_STREAM; m_hints.ai_socktype = SOCK_STREAM;
@ -71,10 +72,10 @@ Client::Client(int id, const char *agent, IClientListener *listener) :
m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize)); m_recvBuf.base = static_cast<char*>(malloc(kRecvBufSize));
m_recvBuf.len = kRecvBufSize; m_recvBuf.len = kRecvBufSize;
auto loop = uv_default_loop(); # ifndef XMRIG_PROXY_PROJECT
uv_timer_init(loop, &m_retriesTimer); m_keepAliveTimer.data = this;
uv_timer_init(loop, &m_responseTimer); uv_timer_init(uv_default_loop(), &m_keepAliveTimer);
uv_timer_init(loop, &m_keepAliveTimer); # endif
} }
@ -93,7 +94,7 @@ Client::~Client()
int64_t Client::send(char *data, size_t size) int64_t Client::send(char *data, size_t size)
{ {
LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size ? size : strlen(data), data); LOG_DEBUG("[%s:%u] send (%d bytes): \"%s\"", m_url.host(), m_url.port(), size ? size : strlen(data), data);
if (state() != ConnectedState) { if (state() != ConnectedState || !uv_is_writable(m_stream)) {
LOG_DEBUG_ERR("[%s:%u] send failed, invalid state: %d", m_url.host(), m_url.port(), m_state); LOG_DEBUG_ERR("[%s:%u] send failed, invalid state: %d", m_url.host(), m_url.port(), m_state);
return -1; return -1;
} }
@ -108,8 +109,7 @@ int64_t Client::send(char *data, size_t size)
delete req; delete req;
}); });
uv_timer_start(&m_responseTimer, [](uv_timer_t *handle) { getClient(handle->data)->close(); }, kResponseTimeout, 0); m_expire = uv_now(uv_default_loop()) + kResponseTimeout;
return m_sequence++; return m_sequence++;
} }
@ -134,9 +134,11 @@ void Client::connect(const Url *url)
void Client::disconnect() void Client::disconnect()
{ {
# ifndef XMRIG_PROXY_PROJECT
uv_timer_stop(&m_keepAliveTimer); uv_timer_stop(&m_keepAliveTimer);
uv_timer_stop(&m_responseTimer); # endif
uv_timer_stop(&m_retriesTimer);
m_expire = 0;
m_failures = -1; m_failures = -1;
close(); close();
@ -153,6 +155,24 @@ void Client::setUrl(const Url *url)
} }
void Client::tick(uint64_t now)
{
if (m_expire == 0 || now < m_expire) {
return;
}
if (m_state == ConnectedState) {
LOG_DEBUG_ERR("[%s:%u] timeout", m_url.host(), m_url.port());
close();
}
if (m_state == ConnectingState) {
connect();
}
}
int64_t Client::submit(const JobResult &result) int64_t Client::submit(const JobResult &result)
{ {
char *req = static_cast<char*>(malloc(345)); char *req = static_cast<char*>(malloc(345));
@ -231,6 +251,7 @@ int Client::resolve(const char *host)
{ {
setState(HostLookupState); setState(HostLookupState);
m_expire = 0;
m_recvBufPos = 0; m_recvBufPos = 0;
if (m_failures == -1) { if (m_failures == -1) {
@ -432,10 +453,11 @@ void Client::reconnect()
{ {
setState(ConnectingState); setState(ConnectingState);
uv_timer_stop(&m_responseTimer); # ifndef XMRIG_PROXY_PROJECT
if (m_url.isKeepAlive()) { if (m_url.isKeepAlive()) {
uv_timer_stop(&m_keepAliveTimer); uv_timer_stop(&m_keepAliveTimer);
} }
# endif
if (m_failures == -1) { if (m_failures == -1) {
return m_listener->onClose(this, -1); return m_listener->onClose(this, -1);
@ -444,7 +466,7 @@ void Client::reconnect()
m_failures++; m_failures++;
m_listener->onClose(this, m_failures); m_listener->onClose(this, m_failures);
uv_timer_start(&m_retriesTimer, [](uv_timer_t *handle) { getClient(handle->data)->connect(); }, m_retryPause, 0); m_expire = uv_now(uv_default_loop()) + m_retryPause;
} }
@ -462,12 +484,15 @@ void Client::setState(SocketState state)
void Client::startTimeout() void Client::startTimeout()
{ {
uv_timer_stop(&m_responseTimer); m_expire = 0;
# ifndef XMRIG_PROXY_PROJECT
if (!m_url.isKeepAlive()) { if (!m_url.isKeepAlive()) {
return; return;
} }
uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0); uv_timer_start(&m_keepAliveTimer, [](uv_timer_t *handle) { getClient(handle->data)->ping(); }, kKeepAliveTimeout, 0);
# endif
} }

View file

@ -62,6 +62,7 @@ public:
void connect(const Url *url); void connect(const Url *url);
void disconnect(); void disconnect();
void setUrl(const Url *url); void setUrl(const Url *url);
void tick(uint64_t now);
inline bool isReady() const { return m_state == ConnectedState && m_failures == 0; } inline bool isReady() const { return m_state == ConnectedState && m_failures == 0; }
inline const char *host() const { return m_url.host(); } inline const char *host() const { return m_url.host(); }
@ -112,14 +113,16 @@ private:
SocketState m_state; SocketState m_state;
static int64_t m_sequence; static int64_t m_sequence;
std::map<int64_t, SubmitResult> m_results; std::map<int64_t, SubmitResult> m_results;
uint64_t m_expire;
Url m_url; Url m_url;
uv_buf_t m_recvBuf; uv_buf_t m_recvBuf;
uv_getaddrinfo_t m_resolver; uv_getaddrinfo_t m_resolver;
uv_stream_t *m_stream; uv_stream_t *m_stream;
uv_tcp_t *m_socket; uv_tcp_t *m_socket;
# ifndef XMRIG_PROXY_PROJECT
uv_timer_t m_keepAliveTimer; uv_timer_t m_keepAliveTimer;
uv_timer_t m_responseTimer; # endif
uv_timer_t m_retriesTimer;
}; };

View file

@ -39,7 +39,6 @@
Network::Network(const Options *options) : Network::Network(const Options *options) :
m_donateActive(false),
m_options(options), m_options(options),
m_donate(nullptr), m_donate(nullptr),
m_accepted(0), m_accepted(0),
@ -62,6 +61,11 @@ Network::Network(const Options *options) :
if (m_options->donateLevel() > 0) { if (m_options->donateLevel() > 0) {
m_donate = new DonateStrategy(m_agent, this); m_donate = new DonateStrategy(m_agent, this);
} }
m_timer.data = this;
uv_timer_init(uv_default_loop(), &m_timer);
uv_timer_start(&m_timer, Network::onTick, kTickInterval, kTickInterval);
} }
@ -164,3 +168,21 @@ void Network::setJob(Client *client, const Job &job)
Workers::setJob(job); Workers::setJob(job);
} }
void Network::tick()
{
const uint64_t now = uv_now(uv_default_loop());
m_strategy->tick(now);
if (m_donate) {
m_donate->tick(now);
}
}
void Network::onTick(uv_timer_t *handle)
{
static_cast<Network*>(handle->data)->tick();
}

View file

@ -57,15 +57,20 @@ protected:
void onResultAccepted(Client *client, int64_t seq, uint32_t diff, uint64_t ms, const char *error) override; void onResultAccepted(Client *client, int64_t seq, uint32_t diff, uint64_t ms, const char *error) override;
private: private:
void setJob(Client *client, const Job &job); constexpr static int kTickInterval = 1 * 1000;
void setJob(Client *client, const Job &job);
void tick();
static void onTick(uv_timer_t *handle);
bool m_donateActive;
char *m_agent; char *m_agent;
const Options *m_options; const Options *m_options;
IStrategy *m_donate; IStrategy *m_donate;
IStrategy *m_strategy; IStrategy *m_strategy;
uint64_t m_accepted; uint64_t m_accepted;
uint64_t m_rejected; uint64_t m_rejected;
uv_timer_t m_timer;
}; };

View file

@ -69,6 +69,12 @@ void DonateStrategy::stop()
} }
void DonateStrategy::tick(uint64_t now)
{
m_client->tick(now);
}
void DonateStrategy::onClose(Client *client, int failures) void DonateStrategy::onClose(Client *client, int failures)
{ {
} }

View file

@ -49,6 +49,7 @@ public:
int64_t submit(const JobResult &result) override; int64_t submit(const JobResult &result) override;
void connect() override; void connect() override;
void stop() override; void stop() override;
void tick(uint64_t now) override;
protected: protected:
void onClose(Client *client, int failures) override; void onClose(Client *client, int failures) override;

View file

@ -74,6 +74,14 @@ void FailoverStrategy::stop()
} }
void FailoverStrategy::tick(uint64_t now)
{
for (Client *client : m_pools) {
client->tick(now);
}
}
void FailoverStrategy::onClose(Client *client, int failures) void FailoverStrategy::onClose(Client *client, int failures)
{ {
if (failures == -1) { if (failures == -1) {

View file

@ -49,6 +49,7 @@ public:
void connect() override; void connect() override;
void resume() override; void resume() override;
void stop() override; void stop() override;
void tick(uint64_t now) override;
protected: protected:
void onClose(Client *client, int failures) override; void onClose(Client *client, int failures) override;

View file

@ -66,6 +66,12 @@ void SinglePoolStrategy::stop()
} }
void SinglePoolStrategy::tick(uint64_t now)
{
m_client->tick(now);
}
void SinglePoolStrategy::onClose(Client *client, int failures) void SinglePoolStrategy::onClose(Client *client, int failures)
{ {
if (!isActive()) { if (!isActive()) {

View file

@ -46,6 +46,7 @@ public:
void connect() override; void connect() override;
void resume() override; void resume() override;
void stop() override; void stop() override;
void tick(uint64_t now) override;
protected: protected:
void onClose(Client *client, int failures) override; void onClose(Client *client, int failures) override;

View file

@ -27,14 +27,14 @@
#define APP_ID "xmrig" #define APP_ID "xmrig"
#define APP_NAME "XMRig" #define APP_NAME "XMRig"
#define APP_DESC "Monero (XMR) CPU miner" #define APP_DESC "Monero (XMR) CPU miner"
#define APP_VERSION "2.2.1" #define APP_VERSION "2.3.0-dev"
#define APP_DOMAIN "xmrig.com" #define APP_DOMAIN "xmrig.com"
#define APP_SITE "www.xmrig.com" #define APP_SITE "www.xmrig.com"
#define APP_COPYRIGHT "Copyright (C) 2016-2017 xmrig.com" #define APP_COPYRIGHT "Copyright (C) 2016-2017 xmrig.com"
#define APP_VER_MAJOR 2 #define APP_VER_MAJOR 2
#define APP_VER_MINOR 2 #define APP_VER_MINOR 3
#define APP_VER_BUILD 1 #define APP_VER_BUILD 0
#define APP_VER_REV 0 #define APP_VER_REV 0
#ifdef _MSC_VER #ifdef _MSC_VER