From 3ad11685cca13791ab3daec98e2223454757b20a Mon Sep 17 00:00:00 2001
From: XMRig <support@xmrig.com>
Date: Sun, 11 Jun 2017 06:52:23 +0300
Subject: [PATCH] Job flow WIP.

---
 CMakeLists.txt               |  2 +-
 src/App.cpp                  |  2 +-
 src/Console.cpp              | 10 +++++-----
 src/Console.h                |  4 ++--
 src/net/Job.h                |  1 +
 src/net/Network.cpp          |  4 ++++
 src/workers/Handle.cpp       |  6 ++++--
 src/workers/Handle.h         | 11 ++++++++---
 src/workers/SingleWorker.cpp | 33 +++++++++++++++++++++++++++++++-
 src/workers/SingleWorker.h   |  6 ++++++
 src/workers/Worker.cpp       | 10 ++++++++--
 src/workers/Worker.h         |  5 +++++
 src/workers/Workers.cpp      | 37 ++++++++++++++++++++++++++++++------
 src/workers/Workers.h        | 20 +++++++++++++++++--
 14 files changed, 126 insertions(+), 25 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 801b33bb1..c22bf1764 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -84,7 +84,7 @@ endif()
 
 add_definitions(/D_GNU_SOURCE)
 add_definitions(/DUNICODE)
-add_definitions(/DAPP_DEBUG)
+#add_definitions(/DAPP_DEBUG)
 
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake")
 
diff --git a/src/App.cpp b/src/App.cpp
index 1a4f6d069..4078f4b7b 100644
--- a/src/App.cpp
+++ b/src/App.cpp
@@ -74,7 +74,7 @@ App::exec()
     Mem::allocate(m_options->algo(), m_options->threads(), m_options->doubleHash());
     Summary::print();
 
-    Workers::start(m_options->threads());
+    Workers::start(m_options->threads(), m_options->affinity(), m_options->nicehash());
 
     m_network->connect();
 
diff --git a/src/Console.cpp b/src/Console.cpp
index f57a71411..fd4ff4a5a 100644
--- a/src/Console.cpp
+++ b/src/Console.cpp
@@ -97,12 +97,12 @@ void Console::message(Console::Level level, const char* fmt, ...)
             m_colors ? kCL_N : ""
         );
 
-    uv_mutex_lock(&m_mutex);
+    pthread_mutex_lock(&m_mutex);
 
     vfprintf(stdout, buf, ap);
     fflush(stdout);
 
-    uv_mutex_unlock(&m_mutex);
+    pthread_mutex_unlock(&m_mutex);
 
     va_end(ap);
 }
@@ -121,12 +121,12 @@ void Console::text(const char* fmt, ...)
             m_colors ? kCL_N : ""
         );
 
-    uv_mutex_lock(&m_mutex);
+    pthread_mutex_lock(&m_mutex);
 
     vfprintf(stdout, buf, ap);
     fflush(stdout);
 
-    uv_mutex_unlock(&m_mutex);
+    pthread_mutex_unlock(&m_mutex);
 
     va_end(ap);
 }
@@ -135,5 +135,5 @@ void Console::text(const char* fmt, ...)
 Console::Console() :
     m_colors(true)
 {
-    uv_mutex_init(&m_mutex);
+    pthread_mutex_init(&m_mutex, nullptr);
 }
diff --git a/src/Console.h b/src/Console.h
index 73e047061..edd5ae8ca 100644
--- a/src/Console.h
+++ b/src/Console.h
@@ -25,7 +25,7 @@
 #define __CONSOLE_H__
 
 
-#include <uv.h>
+#include <pthread.h>
 
 
 class Console
@@ -61,7 +61,7 @@ private:
 
     static Console *m_self;
     bool m_colors;
-    uv_mutex_t m_mutex;
+    pthread_mutex_t m_mutex;
 };
 
 
diff --git a/src/net/Job.h b/src/net/Job.h
index 4dc186503..4f08e84cf 100644
--- a/src/net/Job.h
+++ b/src/net/Job.h
@@ -40,6 +40,7 @@ public:
     inline const char *id() const      { return m_id; }
     inline const uint8_t *blob() const { return m_blob; }
     inline int poolId() const          { return m_poolId; }
+    inline uint32_t *nonce()           { return reinterpret_cast<uint32_t*>(m_blob + 39); }
     inline uint32_t diff() const       { return m_diff; }
     inline uint32_t size() const       { return m_size; }
     inline uint64_t target() const     { return m_target; }
diff --git a/src/net/Network.cpp b/src/net/Network.cpp
index c6862e3bd..02c68fd1b 100644
--- a/src/net/Network.cpp
+++ b/src/net/Network.cpp
@@ -30,6 +30,7 @@
 #include "net/Network.h"
 #include "net/Url.h"
 #include "Options.h"
+#include "workers/Workers.h"
 
 
 Network::Network(const Options *options) :
@@ -83,6 +84,7 @@ void Network::onClose(Client *client, int failures)
 
     if (m_pool == id) {
         m_pool = 0;
+        Workers::pause();
     }
 
     if (id == 1 && m_pools.size() > 2 && failures == m_options->retries()) {
@@ -152,6 +154,8 @@ void Network::setJob(Client *client, const Job &job)
     else {
         LOG_INFO("new job from \"%s:%d\", diff: %d", client->host(), client->port(), job.diff());
     }
+
+    Workers::setJob(job);
 }
 
 
diff --git a/src/workers/Handle.cpp b/src/workers/Handle.cpp
index 527b5d6de..6e4d78ed0 100644
--- a/src/workers/Handle.cpp
+++ b/src/workers/Handle.cpp
@@ -25,8 +25,10 @@
 #include "workers/Handle.h"
 
 
-Handle::Handle(int id) :
-    m_id(id),
+Handle::Handle(int threadId, int64_t affinity, bool nicehash) :
+    m_nicehash(nicehash),
+    m_threadId(threadId),
+    m_affinity(affinity),
     m_worker(nullptr)
 {
 }
diff --git a/src/workers/Handle.h b/src/workers/Handle.h
index e697a6180..59bd47931 100644
--- a/src/workers/Handle.h
+++ b/src/workers/Handle.h
@@ -26,6 +26,7 @@
 
 
 #include <pthread.h>
+#include <stdint.h>
 
 
 class IWorker;
@@ -34,14 +35,18 @@ class IWorker;
 class Handle
 {
 public:
-    Handle(int id);
+    Handle(int threadId, int64_t affinity, bool nicehash);
     void start(void *(*callback) (void *));
 
-    inline int id() const                  { return m_id; }
+    inline bool nicehash() const           { return m_nicehash; }
+    inline int threadId() const            { return m_threadId; }
+    inline int64_t affinity() const        { return m_affinity; }
     inline void setWorker(IWorker *worker) { m_worker = worker; }
 
 private:
-    int m_id;
+    bool m_nicehash;
+    int m_threadId;
+    int64_t m_affinity;
     IWorker *m_worker;
     pthread_t m_thread;
 };
diff --git a/src/workers/SingleWorker.cpp b/src/workers/SingleWorker.cpp
index 95eb41572..eacab9411 100644
--- a/src/workers/SingleWorker.cpp
+++ b/src/workers/SingleWorker.cpp
@@ -22,6 +22,7 @@
  */
 
 
+#include <atomic>
 #include <thread>
 #include <pthread.h>
 
@@ -40,5 +41,35 @@ SingleWorker::SingleWorker(Handle *handle)
 
 void SingleWorker::start()
 {
-//    Workers::submit();
+    while (true) {
+        if (Workers::isPaused()) {
+            do {
+                LOG_ERR("SLEEP WAIT FOR WORK");
+                std::this_thread::sleep_for(std::chrono::milliseconds(200));
+            }
+            while (Workers::isPaused());
+
+            consumeJob();
+        }
+
+        while (!Workers::isOutdated(m_sequence)) {
+            LOG_ERR("WORK %lld %lld", Workers::sequence(), m_sequence);
+
+            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
+
+            sched_yield();
+        }
+
+        consumeJob();
+    }
+}
+
+
+
+void SingleWorker::consumeJob()
+{
+    m_job = Workers::job();
+    m_sequence = Workers::sequence();
+
+    LOG_WARN("consumeJob");
 }
diff --git a/src/workers/SingleWorker.h b/src/workers/SingleWorker.h
index 1c569d977..9820172cb 100644
--- a/src/workers/SingleWorker.h
+++ b/src/workers/SingleWorker.h
@@ -25,6 +25,7 @@
 #define __SINGLEWORKER_H__
 
 
+#include "net/Job.h"
 #include "workers/Worker.h"
 
 
@@ -37,6 +38,11 @@ public:
     SingleWorker(Handle *handle);
 
     void start() override;
+
+private:
+    void consumeJob();
+
+    Job m_job;
 };
 
 
diff --git a/src/workers/Worker.cpp b/src/workers/Worker.cpp
index 24f275e94..1756dbc16 100644
--- a/src/workers/Worker.cpp
+++ b/src/workers/Worker.cpp
@@ -22,17 +22,23 @@
  */
 
 
+#include "Cpu.h"
+#include "Mem.h"
 #include "workers/Handle.h"
 #include "workers/Worker.h"
-#include "Mem.h"
 
 
 Worker::Worker(Handle *handle) :
+    m_nicehash(handle->nicehash()),
     m_handle(handle),
-    m_id(handle->id())
+    m_id(handle->threadId())
 {
     m_handle->setWorker(this);
 
+    if (Cpu::threads() > 1 && m_handle->affinity() != -1L) {
+        Cpu::setAffinity(m_id, m_handle->affinity());
+    }
+
     m_ctx = Mem::create(m_id);
 }
 
diff --git a/src/workers/Worker.h b/src/workers/Worker.h
index e68534ae8..f62083d0b 100644
--- a/src/workers/Worker.h
+++ b/src/workers/Worker.h
@@ -25,6 +25,9 @@
 #define __WORKER_H__
 
 
+#include <stdint.h>
+
+
 #include "interfaces/IWorker.h"
 
 
@@ -39,9 +42,11 @@ public:
     ~Worker();
 
 protected:
+    bool m_nicehash;
     cryptonight_ctx *m_ctx;
     Handle *m_handle;
     int m_id;
+    uint64_t m_sequence;
 };
 
 
diff --git a/src/workers/Workers.cpp b/src/workers/Workers.cpp
index b9dbc484a..1916be72e 100644
--- a/src/workers/Workers.cpp
+++ b/src/workers/Workers.cpp
@@ -21,27 +21,52 @@
  *   along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
-
-#include <pthread.h>
-
-
 #include "Console.h"
 #include "workers/Handle.h"
 #include "workers/SingleWorker.h"
 #include "workers/Workers.h"
 
+
+Job Workers::m_job;
+pthread_rwlock_t Workers::m_rwlock;
+std::atomic<int> Workers::m_paused;
+std::atomic<uint64_t> Workers::m_sequence;
 std::vector<Handle*> Workers::m_workers;
 uv_async_t Workers::m_async;
 
 
-void Workers::start(int threads)
+Job Workers::job()
+{
+    pthread_rwlock_rdlock(&m_rwlock);
+    Job job = m_job;
+    pthread_rwlock_unlock(&m_rwlock);
+
+    return std::move(job);
+}
+
+
+void Workers::setJob(const Job &job)
+{
+    pthread_rwlock_wrlock(&m_rwlock);
+    m_job = job;
+    pthread_rwlock_unlock(&m_rwlock);
+
+    m_sequence++;
+    m_paused = 0;
+}
+
+
+void Workers::start(int threads, int64_t affinity, bool nicehash)
 {
     LOG_NOTICE("start %d", pthread_self());
 
+    m_sequence = 0;
+    m_paused   = 1;
+
     uv_async_init(uv_default_loop(), &m_async, Workers::onResult);
 
     for (int i = 0; i < threads; ++i) {
-        Handle *handle = new Handle(i);
+        Handle *handle = new Handle(i, affinity, nicehash);
         m_workers.push_back(handle);
         handle->start(Workers::onReady);
     }
diff --git a/src/workers/Workers.h b/src/workers/Workers.h
index 48485770d..7d6328cad 100644
--- a/src/workers/Workers.h
+++ b/src/workers/Workers.h
@@ -25,8 +25,13 @@
 #define __WORKERS_H__
 
 
-#include <vector>
+#include <atomic>
+#include <pthread.h>
 #include <uv.h>
+#include <vector>
+
+
+#include "net/Job.h"
 
 
 class Handle;
@@ -35,13 +40,24 @@ class Handle;
 class Workers
 {
 public:
-    static void start(int threads);
+    static Job job();
+    static void setJob(const Job &job);
+    static void start(int threads, int64_t affinity, bool nicehash);
     static void submit();
 
+    static inline bool isOutdated(uint64_t sequence) { return m_sequence.load(std::memory_order_relaxed) != sequence; }
+    static inline bool isPaused()                    { return m_paused.load(std::memory_order_relaxed) == 1; }
+    static inline uint64_t sequence()                { return m_sequence.load(std::memory_order_relaxed); }
+    static inline void pause()                       { m_paused = 1; }
+
 private:
     static void *onReady(void *arg);
     static void onResult(uv_async_t *handle);
 
+    static Job m_job;
+    static pthread_rwlock_t m_rwlock;
+    static std::atomic<int> m_paused;
+    static std::atomic<uint64_t> m_sequence;
     static std::vector<Handle*> m_workers;
     static uv_async_t m_async;
 };