diff --git a/src/base/kernel/interfaces/IClient.h b/src/base/kernel/interfaces/IClient.h index 06e129684..db88638ad 100644 --- a/src/base/kernel/interfaces/IClient.h +++ b/src/base/kernel/interfaces/IClient.h @@ -29,6 +29,9 @@ #include "rapidjson/fwd.h" +#include + + namespace xmrig { @@ -51,32 +54,35 @@ public: EXT_MAX }; + using Callback = std::function; + virtual ~IClient() = default; - virtual bool disconnect() = 0; - virtual bool hasExtension(Extension extension) const noexcept = 0; - virtual bool isEnabled() const = 0; - virtual bool isTLS() const = 0; - virtual const char *mode() const = 0; - virtual const char *tlsFingerprint() const = 0; - virtual const char *tlsVersion() const = 0; - virtual const Job &job() const = 0; - virtual const Pool &pool() const = 0; - virtual const String &ip() const = 0; - virtual int id() const = 0; - virtual int64_t send(const rapidjson::Value &obj) = 0; - virtual int64_t sequence() const = 0; - virtual int64_t submit(const JobResult &result) = 0; - virtual void connect() = 0; - virtual void connect(const Pool &pool) = 0; - virtual void deleteLater() = 0; - virtual void setAlgo(const Algorithm &algo) = 0; - virtual void setEnabled(bool enabled) = 0; - virtual void setPool(const Pool &pool) = 0; - virtual void setQuiet(bool quiet) = 0; - virtual void setRetries(int retries) = 0; - virtual void setRetryPause(uint64_t ms) = 0; - virtual void tick(uint64_t now) = 0; + virtual bool disconnect() = 0; + virtual bool hasExtension(Extension extension) const noexcept = 0; + virtual bool isEnabled() const = 0; + virtual bool isTLS() const = 0; + virtual const char *mode() const = 0; + virtual const char *tlsFingerprint() const = 0; + virtual const char *tlsVersion() const = 0; + virtual const Job &job() const = 0; + virtual const Pool &pool() const = 0; + virtual const String &ip() const = 0; + virtual int id() const = 0; + virtual int64_t send(const rapidjson::Value &obj, Callback callback) = 0; + virtual int64_t send(const rapidjson::Value &obj) = 0; + virtual int64_t sequence() const = 0; + virtual int64_t submit(const JobResult &result) = 0; + virtual void connect() = 0; + virtual void connect(const Pool &pool) = 0; + virtual void deleteLater() = 0; + virtual void setAlgo(const Algorithm &algo) = 0; + virtual void setEnabled(bool enabled) = 0; + virtual void setPool(const Pool &pool) = 0; + virtual void setQuiet(bool quiet) = 0; + virtual void setRetries(int retries) = 0; + virtual void setRetryPause(uint64_t ms) = 0; + virtual void tick(uint64_t now) = 0; }; diff --git a/src/base/net/stratum/BaseClient.cpp b/src/base/net/stratum/BaseClient.cpp index ccffa7ce7..56e5ad7c5 100644 --- a/src/base/net/stratum/BaseClient.cpp +++ b/src/base/net/stratum/BaseClient.cpp @@ -26,6 +26,7 @@ #include "base/kernel/interfaces/IClientListener.h" #include "base/net/stratum/BaseClient.h" #include "base/net/stratum/SubmitResult.h" +#include "rapidjson/document.h" namespace xmrig { @@ -42,6 +43,32 @@ xmrig::BaseClient::BaseClient(int id, IClientListener *listener) : } +bool xmrig::BaseClient::handleResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error) +{ + if (id == 1) { + return false; + } + + auto it = m_callbacks.find(id); + if (it != m_callbacks.end()) { + const uint64_t elapsed = Chrono::steadyMSecs() - it->second.ts; + + if (error.IsObject()) { + it->second.callback(error, false, elapsed); + } + else { + it->second.callback(result, true, elapsed); + } + + m_callbacks.erase(it); + + return true; + } + + return false; +} + + bool xmrig::BaseClient::handleSubmitResponse(int64_t id, const char *error) { auto it = m_results.find(id); diff --git a/src/base/net/stratum/BaseClient.h b/src/base/net/stratum/BaseClient.h index 0d73dc9bf..974e61a5e 100644 --- a/src/base/net/stratum/BaseClient.h +++ b/src/base/net/stratum/BaseClient.h @@ -32,6 +32,7 @@ #include "base/kernel/interfaces/IClient.h" #include "base/net/stratum/Job.h" #include "base/net/stratum/Pool.h" +#include "base/tools/Chrono.h" namespace xmrig { @@ -70,8 +71,17 @@ protected: ReconnectingState }; + struct SendResult + { + inline SendResult(Callback &&callback) : callback(callback), ts(Chrono::steadyMSecs()) {} + + Callback callback; + const uint64_t ts; + }; + inline bool isQuiet() const { return m_quiet || m_failures >= m_retries; } + bool handleResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error); bool handleSubmitResponse(int64_t id, const char *error = nullptr); bool m_quiet = false; @@ -82,6 +92,7 @@ protected: Job m_job; Pool m_pool; SocketState m_state = UnconnectedState; + std::map m_callbacks; std::map m_results; String m_ip; uint64_t m_retryPause = 5000; diff --git a/src/base/net/stratum/Client.cpp b/src/base/net/stratum/Client.cpp index 543495335..3619e4e9f 100644 --- a/src/base/net/stratum/Client.cpp +++ b/src/base/net/stratum/Client.cpp @@ -137,6 +137,16 @@ const char *xmrig::Client::tlsVersion() const } +int64_t xmrig::Client::send(const rapidjson::Value &obj, Callback callback) +{ + assert(obj["id"] == sequence()); + + m_callbacks.insert({ sequence(), std::move(callback) }); + + return send(obj); +} + + int64_t xmrig::Client::send(const rapidjson::Value &obj) { using namespace rapidjson; @@ -736,6 +746,10 @@ void xmrig::Client::parseNotification(const char *method, const rapidjson::Value void xmrig::Client::parseResponse(int64_t id, const rapidjson::Value &result, const rapidjson::Value &error) { + if (handleResponse(id, result, error)) { + return; + } + if (error.IsObject()) { const char *message = error["message"].GetString(); diff --git a/src/base/net/stratum/Client.h b/src/base/net/stratum/Client.h index 8ff58c639..da4234845 100644 --- a/src/base/net/stratum/Client.h +++ b/src/base/net/stratum/Client.h @@ -77,6 +77,7 @@ protected: bool isTLS() const override; const char *tlsFingerprint() const override; const char *tlsVersion() const override; + int64_t send(const rapidjson::Value &obj, Callback callback) override; int64_t send(const rapidjson::Value &obj) override; int64_t submit(const JobResult &result) override; void connect() override; diff --git a/src/base/net/stratum/DaemonClient.h b/src/base/net/stratum/DaemonClient.h index 0932b2bee..e819c07d6 100644 --- a/src/base/net/stratum/DaemonClient.h +++ b/src/base/net/stratum/DaemonClient.h @@ -54,13 +54,14 @@ protected: void onHttpData(const HttpData &data) override; void onTimer(const Timer *timer) override; - inline bool hasExtension(Extension) const noexcept override { return false; } - inline const char *mode() const override { return "daemon"; } - inline const char *tlsFingerprint() const override { return m_tlsFingerprint; } - inline const char *tlsVersion() const override { return m_tlsVersion; } - inline int64_t send(const rapidjson::Value &) override { return -1; } - inline void deleteLater() override { delete this; } - inline void tick(uint64_t) override {} + inline bool hasExtension(Extension) const noexcept override { return false; } + inline const char *mode() const override { return "daemon"; } + inline const char *tlsFingerprint() const override { return m_tlsFingerprint; } + inline const char *tlsVersion() const override { return m_tlsVersion; } + inline int64_t send(const rapidjson::Value &, Callback) override { return -1; } + inline int64_t send(const rapidjson::Value &) override { return -1; } + inline void deleteLater() override { delete this; } + inline void tick(uint64_t) override {} private: bool isOutdated(uint64_t height, const char *hash) const; diff --git a/src/base/net/stratum/SelfSelectClient.cpp b/src/base/net/stratum/SelfSelectClient.cpp index 7398d6ee4..3b6034a7e 100644 --- a/src/base/net/stratum/SelfSelectClient.cpp +++ b/src/base/net/stratum/SelfSelectClient.cpp @@ -122,7 +122,7 @@ bool xmrig::SelfSelectClient::parseResponse(int64_t id, rapidjson::Value &result submitBlockTemplate(result); - m_listener->onJobReceived(this, m_job, rapidjson::Value{}); + return true; } @@ -207,7 +207,9 @@ void xmrig::SelfSelectClient::submitBlockTemplate(rapidjson::Value &result) JsonRequest::create(doc, sequence(), "block_template", params); - send(doc); + send(doc, [this](const rapidjson::Value &result, bool success, uint64_t elapsed) { + m_listener->onJobReceived(this, m_job, rapidjson::Value{}); + }); } diff --git a/src/base/net/stratum/SelfSelectClient.h b/src/base/net/stratum/SelfSelectClient.h index aa39d4306..d8bc5585e 100644 --- a/src/base/net/stratum/SelfSelectClient.h +++ b/src/base/net/stratum/SelfSelectClient.h @@ -47,30 +47,31 @@ public: protected: // IClient - inline bool disconnect() override { return m_client->disconnect(); } - inline bool hasExtension(Extension extension) const noexcept override { return m_client->hasExtension(extension); } - inline bool isEnabled() const override { return m_client->isEnabled(); } - inline bool isTLS() const override { return m_client->isTLS(); } - inline const char *mode() const override { return m_client->mode(); } - inline const char *tlsFingerprint() const override { return m_client->tlsFingerprint(); } - inline const char *tlsVersion() const override { return m_client->tlsVersion(); } - inline const Job &job() const override { return m_client->job(); } - inline const Pool &pool() const override { return m_client->pool(); } - inline const String &ip() const override { return m_client->ip(); } - inline int id() const override { return m_client->id(); } - inline int64_t send(const rapidjson::Value &obj) override { return m_client->send(obj); } - inline int64_t sequence() const override { return m_client->sequence(); } - inline int64_t submit(const JobResult &result) override { return m_client->submit(result); } - inline void connect() override { m_client->connect(); } - inline void connect(const Pool &pool) override { m_client->connect(pool); } - inline void deleteLater() override { m_client->deleteLater(); } - inline void setAlgo(const Algorithm &algo) override { m_client->setAlgo(algo); } - inline void setEnabled(bool enabled) override { m_client->setEnabled(enabled); } - inline void setPool(const Pool &pool) override { m_client->setPool(pool); } - inline void setQuiet(bool quiet) override { m_client->setQuiet(quiet); m_quiet = quiet; } - inline void setRetries(int retries) override { m_client->setRetries(retries); } - inline void setRetryPause(uint64_t ms) override { m_client->setRetryPause(ms); } - inline void tick(uint64_t now) override { m_client->tick(now); } + inline bool disconnect() override { return m_client->disconnect(); } + inline bool hasExtension(Extension extension) const noexcept override { return m_client->hasExtension(extension); } + inline bool isEnabled() const override { return m_client->isEnabled(); } + inline bool isTLS() const override { return m_client->isTLS(); } + inline const char *mode() const override { return m_client->mode(); } + inline const char *tlsFingerprint() const override { return m_client->tlsFingerprint(); } + inline const char *tlsVersion() const override { return m_client->tlsVersion(); } + inline const Job &job() const override { return m_client->job(); } + inline const Pool &pool() const override { return m_client->pool(); } + inline const String &ip() const override { return m_client->ip(); } + inline int id() const override { return m_client->id(); } + inline int64_t send(const rapidjson::Value &obj, Callback callback) override { return m_client->send(obj, callback); } + inline int64_t send(const rapidjson::Value &obj) override { return m_client->send(obj); } + inline int64_t sequence() const override { return m_client->sequence(); } + inline int64_t submit(const JobResult &result) override { return m_client->submit(result); } + inline void connect() override { m_client->connect(); } + inline void connect(const Pool &pool) override { m_client->connect(pool); } + inline void deleteLater() override { m_client->deleteLater(); } + inline void setAlgo(const Algorithm &algo) override { m_client->setAlgo(algo); } + inline void setEnabled(bool enabled) override { m_client->setEnabled(enabled); } + inline void setPool(const Pool &pool) override { m_client->setPool(pool); } + inline void setQuiet(bool quiet) override { m_client->setQuiet(quiet); m_quiet = quiet; } + inline void setRetries(int retries) override { m_client->setRetries(retries); } + inline void setRetryPause(uint64_t ms) override { m_client->setRetryPause(ms); } + inline void tick(uint64_t now) override { m_client->tick(now); } // IClientListener inline void onClose(IClient *, int failures) override { m_listener->onClose(this, failures); } diff --git a/src/base/net/stratum/SubmitResult.h b/src/base/net/stratum/SubmitResult.h index 5abd3e4bf..1b49acb43 100644 --- a/src/base/net/stratum/SubmitResult.h +++ b/src/base/net/stratum/SubmitResult.h @@ -35,34 +35,26 @@ namespace xmrig { class SubmitResult { public: - inline SubmitResult() : - reqId(0), - seq(0), - actualDiff(0), - diff(0), - elapsed(0), - m_start(0) - {} + SubmitResult() = default; inline SubmitResult(int64_t seq, uint64_t diff, uint64_t actualDiff, int64_t reqId = 0) : reqId(reqId), seq(seq), actualDiff(actualDiff), diff(diff), - elapsed(0), m_start(Chrono::steadyMSecs()) {} inline void done() { elapsed = Chrono::steadyMSecs() - m_start; } - int64_t reqId; - int64_t seq; - uint64_t actualDiff; - uint64_t diff; - uint64_t elapsed; + int64_t reqId = 0; + int64_t seq = 0; + uint64_t actualDiff = 0; + uint64_t diff = 0; + uint64_t elapsed = 0; private: - uint64_t m_start; + uint64_t m_start = 0; };