Added send with callback.

This commit is contained in:
XMRig 2019-10-16 19:34:33 +07:00
parent 3752551e53
commit 83a5923568
9 changed files with 127 additions and 72 deletions

View file

@ -29,6 +29,9 @@
#include "rapidjson/fwd.h"
#include <functional>
namespace xmrig {
@ -51,32 +54,35 @@ public:
EXT_MAX
};
using Callback = std::function<void(const rapidjson::Value &result, bool success, uint64_t elapsed)>;
virtual ~IClient() = default;
virtual bool disconnect() = 0;
virtual bool hasExtension(Extension extension) const noexcept = 0;
virtual bool isEnabled() const = 0;
virtual bool isTLS() const = 0;
virtual const char *mode() const = 0;
virtual const char *tlsFingerprint() const = 0;
virtual const char *tlsVersion() const = 0;
virtual const Job &job() const = 0;
virtual const Pool &pool() const = 0;
virtual const String &ip() const = 0;
virtual int id() const = 0;
virtual int64_t send(const rapidjson::Value &obj) = 0;
virtual int64_t sequence() const = 0;
virtual int64_t submit(const JobResult &result) = 0;
virtual void connect() = 0;
virtual void connect(const Pool &pool) = 0;
virtual void deleteLater() = 0;
virtual void setAlgo(const Algorithm &algo) = 0;
virtual void setEnabled(bool enabled) = 0;
virtual void setPool(const Pool &pool) = 0;
virtual void setQuiet(bool quiet) = 0;
virtual void setRetries(int retries) = 0;
virtual void setRetryPause(uint64_t ms) = 0;
virtual void tick(uint64_t now) = 0;
virtual bool disconnect() = 0;
virtual bool hasExtension(Extension extension) const noexcept = 0;
virtual bool isEnabled() const = 0;
virtual bool isTLS() const = 0;
virtual const char *mode() const = 0;
virtual const char *tlsFingerprint() const = 0;
virtual const char *tlsVersion() const = 0;
virtual const Job &job() const = 0;
virtual const Pool &pool() const = 0;
virtual const String &ip() const = 0;
virtual int id() const = 0;
virtual int64_t send(const rapidjson::Value &obj, Callback callback) = 0;
virtual int64_t send(const rapidjson::Value &obj) = 0;
virtual int64_t sequence() const = 0;
virtual int64_t submit(const JobResult &result) = 0;
virtual void connect() = 0;
virtual void connect(const Pool &pool) = 0;
virtual void deleteLater() = 0;
virtual void setAlgo(const Algorithm &algo) = 0;
virtual void setEnabled(bool enabled) = 0;
virtual void setPool(const Pool &pool) = 0;
virtual void setQuiet(bool quiet) = 0;
virtual void setRetries(int retries) = 0;
virtual void setRetryPause(uint64_t ms) = 0;
virtual void tick(uint64_t now) = 0;
};

View file

@ -26,6 +26,7 @@
#include "base/kernel/interfaces/IClientListener.h"
#include "base/net/stratum/BaseClient.h"
#include "base/net/stratum/SubmitResult.h"
#include "rapidjson/document.h"
namespace xmrig {
@ -42,6 +43,32 @@ xmrig::BaseClient::BaseClient(int id, IClientListener *listener) :
}
bool xmrig::BaseClient::handleResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error)
{
if (id == 1) {
return false;
}
auto it = m_callbacks.find(id);
if (it != m_callbacks.end()) {
const uint64_t elapsed = Chrono::steadyMSecs() - it->second.ts;
if (error.IsObject()) {
it->second.callback(error, false, elapsed);
}
else {
it->second.callback(result, true, elapsed);
}
m_callbacks.erase(it);
return true;
}
return false;
}
bool xmrig::BaseClient::handleSubmitResponse(int64_t id, const char *error)
{
auto it = m_results.find(id);

View file

@ -32,6 +32,7 @@
#include "base/kernel/interfaces/IClient.h"
#include "base/net/stratum/Job.h"
#include "base/net/stratum/Pool.h"
#include "base/tools/Chrono.h"
namespace xmrig {
@ -70,8 +71,17 @@ protected:
ReconnectingState
};
struct SendResult
{
inline SendResult(Callback &&callback) : callback(callback), ts(Chrono::steadyMSecs()) {}
Callback callback;
const uint64_t ts;
};
inline bool isQuiet() const { return m_quiet || m_failures >= m_retries; }
bool handleResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error);
bool handleSubmitResponse(int64_t id, const char *error = nullptr);
bool m_quiet = false;
@ -82,6 +92,7 @@ protected:
Job m_job;
Pool m_pool;
SocketState m_state = UnconnectedState;
std::map<int64_t, SendResult> m_callbacks;
std::map<int64_t, SubmitResult> m_results;
String m_ip;
uint64_t m_retryPause = 5000;

View file

@ -137,6 +137,16 @@ const char *xmrig::Client::tlsVersion() const
}
int64_t xmrig::Client::send(const rapidjson::Value &obj, Callback callback)
{
assert(obj["id"] == sequence());
m_callbacks.insert({ sequence(), std::move(callback) });
return send(obj);
}
int64_t xmrig::Client::send(const rapidjson::Value &obj)
{
using namespace rapidjson;
@ -736,6 +746,10 @@ void xmrig::Client::parseNotification(const char *method, const rapidjson::Value
void xmrig::Client::parseResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error)
{
if (handleResponse(id, result, error)) {
return;
}
if (error.IsObject()) {
const char *message = error["message"].GetString();

View file

@ -77,6 +77,7 @@ protected:
bool isTLS() const override;
const char *tlsFingerprint() const override;
const char *tlsVersion() const override;
int64_t send(const rapidjson::Value &obj, Callback callback) override;
int64_t send(const rapidjson::Value &obj) override;
int64_t submit(const JobResult &result) override;
void connect() override;

View file

@ -54,13 +54,14 @@ protected:
void onHttpData(const HttpData &data) override;
void onTimer(const Timer *timer) override;
inline bool hasExtension(Extension) const noexcept override { return false; }
inline const char *mode() const override { return "daemon"; }
inline const char *tlsFingerprint() const override { return m_tlsFingerprint; }
inline const char *tlsVersion() const override { return m_tlsVersion; }
inline int64_t send(const rapidjson::Value &) override { return -1; }
inline void deleteLater() override { delete this; }
inline void tick(uint64_t) override {}
inline bool hasExtension(Extension) const noexcept override { return false; }
inline const char *mode() const override { return "daemon"; }
inline const char *tlsFingerprint() const override { return m_tlsFingerprint; }
inline const char *tlsVersion() const override { return m_tlsVersion; }
inline int64_t send(const rapidjson::Value &, Callback) override { return -1; }
inline int64_t send(const rapidjson::Value &) override { return -1; }
inline void deleteLater() override { delete this; }
inline void tick(uint64_t) override {}
private:
bool isOutdated(uint64_t height, const char *hash) const;

View file

@ -122,7 +122,7 @@ bool xmrig::SelfSelectClient::parseResponse(int64_t id, rapidjson::Value &result
submitBlockTemplate(result);
m_listener->onJobReceived(this, m_job, rapidjson::Value{});
return true;
}
@ -207,7 +207,9 @@ void xmrig::SelfSelectClient::submitBlockTemplate(rapidjson::Value &result)
JsonRequest::create(doc, sequence(), "block_template", params);
send(doc);
send(doc, [this](const rapidjson::Value &result, bool success, uint64_t elapsed) {
m_listener->onJobReceived(this, m_job, rapidjson::Value{});
});
}

View file

@ -47,30 +47,31 @@ public:
protected:
// IClient
inline bool disconnect() override { return m_client->disconnect(); }
inline bool hasExtension(Extension extension) const noexcept override { return m_client->hasExtension(extension); }
inline bool isEnabled() const override { return m_client->isEnabled(); }
inline bool isTLS() const override { return m_client->isTLS(); }
inline const char *mode() const override { return m_client->mode(); }
inline const char *tlsFingerprint() const override { return m_client->tlsFingerprint(); }
inline const char *tlsVersion() const override { return m_client->tlsVersion(); }
inline const Job &job() const override { return m_client->job(); }
inline const Pool &pool() const override { return m_client->pool(); }
inline const String &ip() const override { return m_client->ip(); }
inline int id() const override { return m_client->id(); }
inline int64_t send(const rapidjson::Value &obj) override { return m_client->send(obj); }
inline int64_t sequence() const override { return m_client->sequence(); }
inline int64_t submit(const JobResult &result) override { return m_client->submit(result); }
inline void connect() override { m_client->connect(); }
inline void connect(const Pool &pool) override { m_client->connect(pool); }
inline void deleteLater() override { m_client->deleteLater(); }
inline void setAlgo(const Algorithm &algo) override { m_client->setAlgo(algo); }
inline void setEnabled(bool enabled) override { m_client->setEnabled(enabled); }
inline void setPool(const Pool &pool) override { m_client->setPool(pool); }
inline void setQuiet(bool quiet) override { m_client->setQuiet(quiet); m_quiet = quiet; }
inline void setRetries(int retries) override { m_client->setRetries(retries); }
inline void setRetryPause(uint64_t ms) override { m_client->setRetryPause(ms); }
inline void tick(uint64_t now) override { m_client->tick(now); }
inline bool disconnect() override { return m_client->disconnect(); }
inline bool hasExtension(Extension extension) const noexcept override { return m_client->hasExtension(extension); }
inline bool isEnabled() const override { return m_client->isEnabled(); }
inline bool isTLS() const override { return m_client->isTLS(); }
inline const char *mode() const override { return m_client->mode(); }
inline const char *tlsFingerprint() const override { return m_client->tlsFingerprint(); }
inline const char *tlsVersion() const override { return m_client->tlsVersion(); }
inline const Job &job() const override { return m_client->job(); }
inline const Pool &pool() const override { return m_client->pool(); }
inline const String &ip() const override { return m_client->ip(); }
inline int id() const override { return m_client->id(); }
inline int64_t send(const rapidjson::Value &obj, Callback callback) override { return m_client->send(obj, callback); }
inline int64_t send(const rapidjson::Value &obj) override { return m_client->send(obj); }
inline int64_t sequence() const override { return m_client->sequence(); }
inline int64_t submit(const JobResult &result) override { return m_client->submit(result); }
inline void connect() override { m_client->connect(); }
inline void connect(const Pool &pool) override { m_client->connect(pool); }
inline void deleteLater() override { m_client->deleteLater(); }
inline void setAlgo(const Algorithm &algo) override { m_client->setAlgo(algo); }
inline void setEnabled(bool enabled) override { m_client->setEnabled(enabled); }
inline void setPool(const Pool &pool) override { m_client->setPool(pool); }
inline void setQuiet(bool quiet) override { m_client->setQuiet(quiet); m_quiet = quiet; }
inline void setRetries(int retries) override { m_client->setRetries(retries); }
inline void setRetryPause(uint64_t ms) override { m_client->setRetryPause(ms); }
inline void tick(uint64_t now) override { m_client->tick(now); }
// IClientListener
inline void onClose(IClient *, int failures) override { m_listener->onClose(this, failures); }

View file

@ -35,34 +35,26 @@ namespace xmrig {
class SubmitResult
{
public:
inline SubmitResult() :
reqId(0),
seq(0),
actualDiff(0),
diff(0),
elapsed(0),
m_start(0)
{}
SubmitResult() = default;
inline SubmitResult(int64_t seq, uint64_t diff, uint64_t actualDiff, int64_t reqId = 0) :
reqId(reqId),
seq(seq),
actualDiff(actualDiff),
diff(diff),
elapsed(0),
m_start(Chrono::steadyMSecs())
{}
inline void done() { elapsed = Chrono::steadyMSecs() - m_start; }
int64_t reqId;
int64_t seq;
uint64_t actualDiff;
uint64_t diff;
uint64_t elapsed;
int64_t reqId = 0;
int64_t seq = 0;
uint64_t actualDiff = 0;
uint64_t diff = 0;
uint64_t elapsed = 0;
private:
uint64_t m_start;
uint64_t m_start = 0;
};