From b5fa34e1a195b7dad0075370bb12c1be480a6c6b Mon Sep 17 00:00:00 2001 From: SChernykh Date: Fri, 16 Jun 2023 15:51:33 +0200 Subject: [PATCH] Added support for multiple Monero hosts Switch to the next host in list when P2Pool thinks current host is stuck or lagging --- .github/workflows/test-sync.yml | 18 ++-- CMakeLists.txt | 2 +- docs/COMMAND_LINE.MD | 13 +++ src/console_commands.cpp | 16 ++- src/p2p_server.cpp | 34 +++++-- src/p2p_server.h | 2 +- src/p2pool.cpp | 170 +++++++++++++++++++------------- src/p2pool.h | 31 ++++-- src/params.cpp | 83 +++++++++++++++- src/params.h | 30 +++++- src/pow_hash.cpp | 3 +- src/side_chain.cpp | 17 ++-- src/side_chain.h | 2 +- src/zmq_reader.cpp | 103 +++++++++++++------ src/zmq_reader.h | 25 ++++- 15 files changed, 402 insertions(+), 147 deletions(-) diff --git a/.github/workflows/test-sync.yml b/.github/workflows/test-sync.yml index 8d6e7e2..d5ff798 100644 --- a/.github/workflows/test-sync.yml +++ b/.github/workflows/test-sync.yml @@ -32,14 +32,13 @@ jobs: make -j$(nproc) - name: Run p2pool - timeout-minutes: 25 run: | cd build mkdir data python ../tests/src/stratum_dummy.py 1 & python ../tests/src/stratum_dummy.py 2 & python ../tests/src/stratum_dummy.py 3 & - TSAN_OPTIONS="suppressions=../tests/src/tsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 + TSAN_OPTIONS="suppressions=../tests/src/tsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 grep 'Synchronization finished successfully' p2pool.log - name: Archive p2pool.log @@ -110,14 +109,13 @@ jobs: make -j$(nproc) - name: Run p2pool - timeout-minutes: 20 run: | cd build mkdir data python ../tests/src/stratum_dummy.py 1 & python ../tests/src/stratum_dummy.py 2 & python ../tests/src/stratum_dummy.py 3 & - MSAN_OPTIONS="halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 + MSAN_OPTIONS="halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 grep 'Synchronization finished successfully' p2pool.log - name: Archive p2pool.log @@ -153,14 +151,13 @@ jobs: make -j$(nproc) - name: Run p2pool - timeout-minutes: 20 run: | cd build mkdir data python ../tests/src/stratum_dummy.py 1 & python ../tests/src/stratum_dummy.py 2 & python ../tests/src/stratum_dummy.py 3 & - UBSAN_OPTIONS="suppressions=../tests/src/ubsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 + UBSAN_OPTIONS="suppressions=../tests/src/ubsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 grep 'Synchronization finished successfully' p2pool.log - name: Archive p2pool.log @@ -196,14 +193,13 @@ jobs: make -j$(nproc) - name: Run p2pool - timeout-minutes: 20 run: | cd build mkdir data python ../tests/src/stratum_dummy.py 1 & python ../tests/src/stratum_dummy.py 2 & python ../tests/src/stratum_dummy.py 3 & - ASAN_OPTIONS="detect_stack_use_after_return=1 atexit=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 + ASAN_OPTIONS="detect_stack_use_after_return=1 atexit=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 grep 'Synchronization finished successfully' p2pool.log - name: Archive p2pool.log @@ -236,14 +232,13 @@ jobs: make -j3 - name: Run p2pool - timeout-minutes: 20 run: | cd build mkdir data python ../tests/src/stratum_dummy.py 1 & python ../tests/src/stratum_dummy.py 2 & python ../tests/src/stratum_dummy.py 3 & - ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 + ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 grep 'Synchronization finished successfully' p2pool.log - name: Archive p2pool.log @@ -276,14 +271,13 @@ jobs: & "C:\\Program Files\\Microsoft Visual Studio\\2022\\Enterprise\\Msbuild\\Current\\Bin\\amd64\\msbuild" /m /p:Configuration=Debug p2pool.vcxproj - name: Run p2pool - timeout-minutes: 20 run: | cd build/Debug mkdir data Start-Process python -ArgumentList "../../tests/src/stratum_dummy.py 1" Start-Process python -ArgumentList "../../tests/src/stratum_dummy.py 2" Start-Process python -ArgumentList "../../tests/src/stratum_dummy.py 3" - ./p2pool.exe --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 + ./p2pool.exe --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6 findstr /C:"Synchronization finished successfully" p2pool.log - name: Archive p2pool.log diff --git a/CMakeLists.txt b/CMakeLists.txt index d4e99ac..2bf7222 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -120,7 +120,7 @@ if (WITH_RANDOMX) set(SOURCES ${SOURCES} src/miner.cpp) endif() -if (NOT STATIC_BINARY AND NOT STATIC_LIBS) +if (NOT ((CMAKE_CXX_COMPILER_ID MATCHES MSVC) OR STATIC_BINARY OR STATIC_LIBS)) include(FindCURL) endif() diff --git a/docs/COMMAND_LINE.MD b/docs/COMMAND_LINE.MD index 307f31f..6ca2e09 100644 --- a/docs/COMMAND_LINE.MD +++ b/docs/COMMAND_LINE.MD @@ -34,4 +34,17 @@ ### Example command line +``` p2pool.exe --host 127.0.0.1 --rpc-port 18081 --zmq-port 18083 --wallet YOUR_WALLET_ADDRESS --stratum 0.0.0.0:3333 --p2p 0.0.0.0:37889 +``` + +### Multiple backup hosts + +You can have multiple hosts in command line. Each new host uses RPC and zmq-pub port numbers from the previous host (or default 18081/18083 if none were specified). + +In this example, you have local Monero node running on ports 18081/18083 (RPC/zmq-pub), and several backup nodes running on ports 18089/18084. If P2Pool detects that the node it's currently using is unavailable or stuck, it will cycle to the next node in the list. + + +``` +p2pool.exe --host 127.0.0.1 --host xmr1.rs.me --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --host xmr3.rs.me --wallet YOUR_WALLET_ADDRESS +``` diff --git a/src/console_commands.cpp b/src/console_commands.cpp index 9a775a7..89a006b 100644 --- a/src/console_commands.cpp +++ b/src/console_commands.cpp @@ -140,7 +140,7 @@ typedef struct cmd { cmdfunc *func; } cmd; -static cmdfunc do_help, do_status, do_loglevel, do_addpeers, do_droppeers, do_showpeers, do_showworkers, do_showbans, do_outpeers, do_inpeers, do_exit, do_version; +static cmdfunc do_help, do_status, do_loglevel, do_addpeers, do_droppeers, do_showpeers, do_showworkers, do_showbans, do_showhosts, do_nexthost, do_outpeers, do_inpeers, do_exit, do_version; #ifdef WITH_RANDOMX static cmdfunc do_start_mining, do_stop_mining; @@ -155,6 +155,8 @@ static cmd cmds[] = { { STRCONST("peers"), "", "show all peers", do_showpeers }, { STRCONST("workers"), "", "show all connected workers", do_showworkers }, { STRCONST("bans"), "", "show all banned IPs", do_showbans }, + { STRCONST("hosts"), "", "show Monero hosts", do_showhosts }, + { STRCONST("next_host"), "", "switch to the next Monero host", do_nexthost }, { STRCONST("outpeers"), "", "set maximum number of outgoing connections", do_outpeers }, { STRCONST("inpeers"), "", "set maximum number of incoming connections", do_inpeers }, #ifdef WITH_RANDOMX @@ -240,6 +242,18 @@ static void do_showbans(p2pool* m_pool, const char* /* args */) } } +// cppcheck-suppress constParameterCallback +static void do_showhosts(p2pool* m_pool, const char* /* args */) +{ + m_pool->print_hosts(); +} + +// cppcheck-suppress constParameterCallback +static void do_nexthost(p2pool* m_pool, const char* /* args */) +{ + m_pool->reconnect_to_host(); +} + // cppcheck-suppress constParameterCallback static void do_outpeers(p2pool* m_pool, const char* args) { diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 216f17c..03ecddd 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -615,9 +615,9 @@ void P2PServer::load_peer_list() void P2PServer::load_monerod_peer_list() { - const Params& params = m_pool->params(); + const Params::Host host = m_pool->current_host(); - JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin, m_socks5Proxy, + JSONRPCRequest::call(host.m_address, host.m_rpcPort, "/get_peer_list", host.m_rpcLogin, m_socks5Proxy, [this](const char* data, size_t size) { #define ERR_STR "/get_peer_list RPC request returned invalid JSON " @@ -1045,7 +1045,7 @@ void P2PServer::on_timer() update_peer_list(); save_peer_list_async(); update_peer_connections(); - check_zmq(); + check_host(); check_block_template(); api_update_local_stats(); } @@ -1165,25 +1165,41 @@ void P2PServer::download_missing_blocks() } } -void P2PServer::check_zmq() +void P2PServer::check_host() { - if ((m_timerCounter % 30) != 3) { + if (!m_pool->startup_finished()) { return; } if (!m_pool->zmq_running()) { LOGERR(1, "ZMQ is not running, restarting it"); - m_pool->restart_zmq(); + m_pool->reconnect_to_host(); return; } + const uint64_t height = m_pool->miner_data().height; + const SideChain& side_chain = m_pool->side_chain(); + + // If the latest 5 side chain blocks are 2 or more Monero blocks ahead, then the node is probably stuck + uint32_t counter = 5; + for (const PoolBlock* b = side_chain.chainTip(); b && (b->m_txinGenHeight >= height + 2); b = side_chain.find_block(b->m_parent)) { + if (--counter == 0) { + const Params::Host host = m_pool->current_host(); + LOGERR(1, host.m_displayName << " seems to be stuck, reconnecting"); + m_pool->reconnect_to_host(); + return; + } + } + const uint64_t cur_time = seconds_since_epoch(); const uint64_t last_active = m_pool->zmq_last_active(); + // If there were no ZMQ messages in the last 5 minutes, then the node is probably stuck if (cur_time >= last_active + 300) { const uint64_t dt = static_cast(cur_time - last_active); - LOGERR(1, "no ZMQ messages received from monerod in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!"); - m_pool->restart_zmq(); + const Params::Host host = m_pool->current_host(); + LOGERR(1, "no ZMQ messages received from " << host.m_displayName << " in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!"); + m_pool->reconnect_to_host(); } } @@ -1230,6 +1246,8 @@ P2PServer::P2PClient::P2PClient() void P2PServer::on_shutdown() { + save_peer_list(); + uv_timer_stop(&m_timer); uv_close(reinterpret_cast(&m_timer), nullptr); uv_close(reinterpret_cast(&m_broadcastAsync), nullptr); diff --git a/src/p2p_server.h b/src/p2p_server.h index c433fa6..3c4ec2b 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -191,7 +191,7 @@ private: void flush_cache(); void download_missing_blocks(); - void check_zmq(); + void check_host(); void check_block_template(); void update_peer_connections(); void update_peer_list(); diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 8b7c384..a65cdcb 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -69,36 +69,14 @@ p2pool::p2pool(int argc, char* argv[]) } #endif - if (!p->m_wallet.valid()) { - LOGERR(1, "Invalid wallet address. Try \"p2pool --help\"."); - throw std::exception(); - } - - m_hostStr = p->m_host; - - if (p->m_socks5Proxy.empty()) { - if (p->m_dns) { - bool is_v6; - if (!resolve_host(p->m_host, is_v6)) { - LOGERR(1, "resolve_host failed for " << p->m_host); - throw std::exception(); - } - } - else if (p->m_host.find_first_not_of("0123456789.:") != std::string::npos) { - LOGERR(1, "Can't resolve hostname " << p->m_host << " with DNS disabled"); + for (Params::Host& h : p->m_hosts) { + if (!h.init_display_name(*p)) { throw std::exception(); } } - { - const bool changed = (p->m_host != m_hostStr); - const std::string rpc_port = ':' + std::to_string(p->m_rpcPort); - const std::string zmq_port = ":ZMQ:" + std::to_string(p->m_zmqPort); - m_hostStr += rpc_port + zmq_port; - if (changed) { - m_hostStr += " (" + p->m_host + ')'; - } - } + m_currentHost = p->m_hosts.front(); + m_currentHostIndex = 0; hash pub, sec, eph_public_key; generate_keys(pub, sec); @@ -139,15 +117,17 @@ p2pool::p2pool(int argc, char* argv[]) } m_stopAsync.data = this; - err = uv_async_init(uv_default_loop_checked(), &m_restartZMQAsync, on_restart_zmq); + err = uv_async_init(uv_default_loop_checked(), &m_reconnectToHostAsync, on_reconnect_to_host); if (err) { LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); throw std::exception(); } - m_restartZMQAsync.data = this; + m_reconnectToHostAsync.data = this; + uv_rwlock_init_checked(&m_currentHostLock); uv_rwlock_init_checked(&m_mainchainLock); uv_rwlock_init_checked(&m_minerDataLock); + uv_rwlock_init_checked(&m_ZMQReaderLock); uv_mutex_init_checked(&m_foundBlocksLock); #ifdef WITH_RANDOMX uv_mutex_init_checked(&m_minerLock); @@ -204,8 +184,10 @@ p2pool::~p2pool() } #endif + uv_rwlock_destroy(&m_currentHostLock); uv_rwlock_destroy(&m_mainchainLock); uv_rwlock_destroy(&m_minerDataLock); + uv_rwlock_destroy(&m_ZMQReaderLock); uv_mutex_destroy(&m_foundBlocksLock); #ifdef WITH_RANDOMX uv_mutex_destroy(&m_minerLock); @@ -220,6 +202,20 @@ p2pool::~p2pool() delete m_params; } +void p2pool::print_hosts() const +{ + const Params::Host host = current_host(); + + for (const Params::Host& h : m_params->m_hosts) { + if (h.m_displayName == host.m_displayName) { + LOGINFO(0, log::LightCyan() << "-> " << h.m_displayName); + } + else { + LOGINFO(0, " " << h.m_displayName); + } + } +} + bool p2pool::calculate_hash(const void* data, size_t size, uint64_t height, const hash& seed, hash& result, bool force_light_mode) { return m_hasher->calculate(data, size, height, seed, result, force_light_mode); @@ -297,17 +293,15 @@ void p2pool::handle_miner_data(MinerData& data) { WriteLock lock(m_mainchainLock); - m_mainchainByHeight[data.height].difficulty = data.difficulty; + ChainMain& c0 = m_mainchainByHeight[data.height]; + c0.height = data.height; + c0.difficulty = data.difficulty; - ChainMain& c = m_mainchainByHeight[data.height - 1]; - c.height = data.height - 1; - c.id = data.prev_id; + ChainMain& c1 = m_mainchainByHeight[data.height - 1]; + c1.height = data.height - 1; + c1.id = data.prev_id; - // timestamp and reward is unknown here - c.timestamp = 0; - c.reward = 0; - - m_mainchainByHash[c.id] = c; + m_mainchainByHash[c1.id] = c1; cleanup_mainchain_data(data.height); } @@ -321,8 +315,11 @@ void p2pool::handle_miner_data(MinerData& data) m_updateSeed = true; update_median_timestamp(); + const Params::Host host = current_host(); + LOGINFO(2, "new miner data\n---------------------------------------------------------------------------------------------------------------" << + "\nhost = " << host.m_displayName << "\nmajor_version = " << data.major_version << "\nheight = " << data.height << "\nprev_id = " << log::LightBlue() << data.prev_id << log::NoColor() << @@ -370,7 +367,7 @@ void p2pool::handle_miner_data(MinerData& data) log::Stream s(buf); s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0"; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy, + JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, [this, h](const char* data, size_t size) { ChainMain block; @@ -528,7 +525,7 @@ void p2pool::on_stop(uv_async_t* async) uv_close(reinterpret_cast(&pool->m_submitBlockAsync), nullptr); uv_close(reinterpret_cast(&pool->m_blockTemplateAsync), nullptr); uv_close(reinterpret_cast(&pool->m_stopAsync), nullptr); - uv_close(reinterpret_cast(&pool->m_restartZMQAsync), nullptr); + uv_close(reinterpret_cast(&pool->m_reconnectToHostAsync), nullptr); init_signals(pool, false); @@ -603,7 +600,9 @@ void p2pool::submit_block() const } request.append("\"]}"); - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin, m_params->m_socks5Proxy, + const Params::Host host = current_host(); + + JSONRPCRequest::call(host.m_address, host.m_rpcPort, request, host.m_rpcLogin, m_params->m_socks5Proxy, [height, diff, template_id, nonce, extra_nonce, sidechain_id, is_external](const char* data, size_t size) { rapidjson::Document doc; @@ -714,13 +713,15 @@ void p2pool::download_block_headers(uint64_t current_height) char buf[log::Stream::BUF_SIZE + 1] = {}; log::Stream s(buf); + const Params::Host host = current_host(); + // First download 2 RandomX seeds const uint64_t seed_heights[2] = { prev_seed_height, seed_height }; for (uint64_t height : seed_heights) { s.m_pos = 0; s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0"; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy, + JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, [this, prev_seed_height, height](const char* data, size_t size) { ChainMain block; @@ -749,8 +750,8 @@ void p2pool::download_block_headers(uint64_t current_height) s.m_pos = 0; s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0"; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy, - [this, start_height, current_height](const char* data, size_t size) + JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, + [this, start_height, current_height, host](const char* data, size_t size) { if (parse_block_headers_range(data, size) == current_height - start_height) { update_median_timestamp(); @@ -762,15 +763,23 @@ void p2pool::download_block_headers(uint64_t current_height) start_mining(m_params->m_minerThreads); } #endif - try { - m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this); - } - catch (const std::exception& e) { - LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what()); - PANIC_STOP(); + { + WriteLock lock(m_ZMQReaderLock); + + try { + m_ZMQReader = new ZMQReader(host.m_address, host.m_zmqPort, m_params->m_socks5Proxy, this); + m_zmqLastActive = seconds_since_epoch(); + } + catch (const std::exception& e) { + LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what()); + PANIC_STOP(); + } } + api_update_network_stats(); get_miner_data(); + + m_startupFinished = true; } } else { @@ -857,17 +866,20 @@ void p2pool::stratum_on_block() void p2pool::get_info() { - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy, + const Params::Host host = current_host(); + + JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", host.m_rpcLogin, m_params->m_socks5Proxy, [this](const char* data, size_t size) { parse_get_info_rpc(data, size); }, - [this](const char* data, size_t size) + [this, host](const char* data, size_t size) { if (size > 0) { - LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); + LOGWARN(1, "get_info RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); if (!m_stopped) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + switch_host(); get_info(); } } @@ -970,17 +982,19 @@ void p2pool::parse_get_info_rpc(const char* data, size_t size) void p2pool::get_version() { + const Params::Host host = current_host(); + const uint64_t t1 = microseconds_since_epoch(); - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy, - [this, t1](const char* data, size_t size) + JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", host.m_rpcLogin, m_params->m_socks5Proxy, + [this, t1, host](const char* data, size_t size) { const double node_ping = static_cast(microseconds_since_epoch() - t1) / 1e3; if (node_ping < 100) { - LOGINFO(1, m_hostStr << " ping time is " << node_ping << " ms"); + LOGINFO(1, host.m_displayName << " ping time is " << node_ping << " ms"); } else { - LOGWARN(1, m_hostStr << " ping time is " << node_ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node."); + LOGWARN(1, host.m_displayName << " ping time is " << node_ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node."); } parse_get_version_rpc(data, size); }, @@ -1047,23 +1061,25 @@ void p2pool::parse_get_version_rpc(const char* data, size_t size) get_miner_data(); } -void p2pool::get_miner_data() +void p2pool::get_miner_data(bool retry) { if (m_getMinerDataPending) { return; } m_getMinerDataPending = true; - JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy, + const Params::Host host = current_host(); + + JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", host.m_rpcLogin, m_params->m_socks5Proxy, [this](const char* data, size_t size) { parse_get_miner_data_rpc(data, size); }, - [this](const char* data, size_t size) + [this, host, retry](const char* data, size_t size) { if (size > 0) { - LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); - if (!m_stopped) { + LOGWARN(1, "get_miner_data RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << (retry ? ", trying again in 1 second" : "")); + if (!m_stopped && retry) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); m_getMinerDataPending = false; get_miner_data(); @@ -1574,33 +1590,53 @@ void p2pool::stop() bool p2pool::zmq_running() const { + ReadLock lock(m_ZMQReaderLock); return m_ZMQReader && m_ZMQReader->is_running(); } -void p2pool::restart_zmq() +Params::Host p2pool::switch_host() { - // If p2pool is stopped, m_restartZMQAsync is most likely already closed + const Params::Host new_host = m_params->m_hosts[++m_currentHostIndex % m_params->m_hosts.size()]; + { + WriteLock lock(m_currentHostLock); + m_currentHost = new_host; + } + return new_host; +} + +void p2pool::reconnect_to_host() +{ + // If p2pool is stopped, m_reconnectToHostAsync is most likely already closed if (m_stopped) { return; } if (!is_main_thread()) { - uv_async_send(&m_restartZMQAsync); + uv_async_send(&m_reconnectToHostAsync); return; } - get_miner_data(); + const Params::Host new_host = switch_host(); - delete m_ZMQReader; + WriteLock lock(m_ZMQReaderLock); + + ZMQReader* old_reader = m_ZMQReader; m_ZMQReader = nullptr; + delete old_reader; + try { - m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this); + ZMQReader* new_reader = new ZMQReader(new_host.m_address, new_host.m_zmqPort, m_params->m_socks5Proxy, this); m_zmqLastActive = seconds_since_epoch(); + m_ZMQReader = new_reader; } catch (const std::exception& e) { LOGERR(1, "Couldn't restart ZMQ reader: exception " << e.what()); } + + if (m_ZMQReader) { + get_miner_data(false); + } } int p2pool::run() diff --git a/src/p2pool.h b/src/p2pool.h index 0c6bc31..cac8848 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -18,11 +18,11 @@ #pragma once #include "uv_util.h" +#include "params.h" #include namespace p2pool { -struct Params; class RandomX_Hasher_Base; class BlockTemplate; class Mempool; @@ -46,11 +46,18 @@ public: bool stopped() const { return m_stopped; } void stop(); - const std::string& host_str() const { return m_hostStr; } const Params& params() const { return *m_params; } BlockTemplate& block_template() { return *m_blockTemplate; } SideChain& side_chain() { return *m_sideChain; } + FORCEINLINE Params::Host current_host() const + { + ReadLock lock(m_currentHostLock); + return m_currentHost; + } + + void print_hosts() const; + FORCEINLINE MinerData miner_data() const { ReadLock lock(m_minerDataLock); @@ -98,24 +105,31 @@ public: bool zmq_running() const; uint64_t zmq_last_active() const { return m_zmqLastActive; } uint64_t start_time() const { return m_startTime; } - void restart_zmq(); + void reconnect_to_host(); + + bool startup_finished() const { return m_startupFinished.load(); } private: p2pool(const p2pool&) = delete; p2pool(p2pool&&) = delete; + Params::Host switch_host(); + static void on_submit_block(uv_async_t* async) { reinterpret_cast(async->data)->submit_block(); } static void on_update_block_template(uv_async_t* async) { reinterpret_cast(async->data)->update_block_template(); } static void on_stop(uv_async_t*); - static void on_restart_zmq(uv_async_t* async) { reinterpret_cast(async->data)->restart_zmq(); } + static void on_reconnect_to_host(uv_async_t* async) { reinterpret_cast(async->data)->reconnect_to_host(); } void submit_block() const; std::atomic m_stopped; - std::string m_hostStr; const Params* m_params; + mutable uv_rwlock_t m_currentHostLock; + Params::Host m_currentHost; + uint32_t m_currentHostIndex; + p2pool_api* m_api; SideChain* m_sideChain; RandomX_Hasher_Base* m_hasher; @@ -143,7 +157,7 @@ private: void get_version(); void parse_get_version_rpc(const char* data, size_t size); - void get_miner_data(); + void get_miner_data(bool retry = true); void parse_get_miner_data_rpc(const char* data, size_t size); bool parse_block_header(const char* data, size_t size, ChainMain& c); @@ -179,6 +193,8 @@ private: StratumServer* m_stratumServer = nullptr; P2PServer* m_p2pServer = nullptr; + std::atomic m_startupFinished{ false }; + #ifdef WITH_RANDOMX uv_mutex_t m_minerLock; Miner* m_miner = nullptr; @@ -205,8 +221,9 @@ private: std::atomic m_zmqLastActive; uint64_t m_startTime; - uv_async_t m_restartZMQAsync; + uv_async_t m_reconnectToHostAsync; + mutable uv_rwlock_t m_ZMQReaderLock; ZMQReader* m_ZMQReader = nullptr; hash m_getMinerDataHash; diff --git a/src/params.cpp b/src/params.cpp index b3eefb1..ea9a0ee 100644 --- a/src/params.cpp +++ b/src/params.cpp @@ -20,6 +20,8 @@ #include "stratum_server.h" #include "p2p_server.h" +constexpr char log_category_prefix[] = "P2Pool "; + void p2pool_usage(); namespace p2pool { @@ -30,17 +32,35 @@ Params::Params(int argc, char* const argv[]) bool ok = false; if ((strcmp(argv[i], "--host") == 0) && (i + 1 < argc)) { - m_host = argv[++i]; + const char* address = argv[++i]; + + if (m_hosts.empty()) { + m_hosts.emplace_back(Host()); + m_hosts.back().m_address = address; + } + else { + const Host& h = m_hosts.back(); + m_hosts.emplace_back(address, h.m_rpcPort, h.m_zmqPort, ""); + } + ok = true; } if ((strcmp(argv[i], "--rpc-port") == 0) && (i + 1 < argc)) { - m_rpcPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL); + if (m_hosts.empty()) { + m_hosts.emplace_back(Host()); + } + + m_hosts.back().m_rpcPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL); ok = true; } if ((strcmp(argv[i], "--zmq-port") == 0) && (i + 1 < argc)) { - m_zmqPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL); + if (m_hosts.empty()) { + m_hosts.emplace_back(Host()); + } + + m_hosts.back().m_zmqPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL); ok = true; } @@ -133,7 +153,11 @@ Params::Params(int argc, char* const argv[]) } if ((strcmp(argv[i], "--rpc-login") == 0) && (i + 1 < argc)) { - m_rpcLogin = argv[++i]; + if (m_hosts.empty()) { + m_hosts.emplace_back(Host()); + } + + m_hosts.back().m_rpcLogin = argv[++i]; ok = true; } @@ -172,6 +196,21 @@ Params::Params(int argc, char* const argv[]) } } + auto invalid_host = [](const Host& h) + { + if (!h.valid()) { + LOGERR(1, "Invalid host " << h.m_address << ':' << h.m_rpcPort << ":ZMQ:" << h.m_zmqPort << ". Try \"p2pool --help\"."); + return true; + } + return false; + }; + + m_hosts.erase(std::remove_if(m_hosts.begin(), m_hosts.end(), invalid_host), m_hosts.end()); + + if (m_hosts.empty()) { + m_hosts.emplace_back(Host()); + } + if (m_stratumAddresses.empty()) { const int stratum_port = DEFAULT_STRATUM_PORT; @@ -185,7 +224,41 @@ Params::Params(int argc, char* const argv[]) bool Params::valid() const { - return !m_host.empty() && m_rpcPort && m_zmqPort && m_wallet.valid(); + if (!m_wallet.valid()) { + LOGERR(1, "Invalid wallet address. Try \"p2pool --help\"."); + return false; + } + + return true; +} + +bool Params::Host::init_display_name(const Params& p) +{ + m_displayName = m_address; + + if (p.m_socks5Proxy.empty()) { + if (p.m_dns) { + bool is_v6; + if (!resolve_host(m_address, is_v6)) { + LOGERR(1, "resolve_host failed for " << m_address); + return false; + } + } + else if (m_address.find_first_not_of("0123456789.:") != std::string::npos) { + LOGERR(1, "Can't resolve hostname " << m_address << " with DNS disabled"); + return false; + } + } + + const bool changed = (m_address != m_displayName); + const std::string rpc_port = ':' + std::to_string(m_rpcPort); + const std::string zmq_port = ":ZMQ:" + std::to_string(m_zmqPort); + m_displayName += rpc_port + zmq_port; + if (changed) { + m_displayName += " (" + m_address + ')'; + } + + return true; } } // namespace p2pool diff --git a/src/params.h b/src/params.h index f0fc96e..a23e7aa 100644 --- a/src/params.h +++ b/src/params.h @@ -27,9 +27,32 @@ struct Params bool valid() const; - std::string m_host = "127.0.0.1"; - uint32_t m_rpcPort = 18081; - uint32_t m_zmqPort = 18083; + struct Host + { + Host() : m_address("127.0.0.1"), m_rpcPort(18081), m_zmqPort(18083) {} + + Host(const char* address, uint32_t rpcPort, uint32_t zmqPort, const char* rpcLogin) + : m_address(address) + , m_rpcPort(rpcPort) + , m_zmqPort(zmqPort) + , m_rpcLogin(rpcLogin) + {} + + bool valid() const { return !m_address.empty() && m_rpcPort && m_zmqPort && (m_rpcPort != m_zmqPort); } + + bool init_display_name(const Params& p); + + std::string m_address; + uint32_t m_rpcPort; + uint32_t m_zmqPort; + + std::string m_rpcLogin; + + std::string m_displayName; + }; + + std::vector m_hosts; + bool m_lightMode = false; Wallet m_wallet{ nullptr }; std::string m_stratumAddresses; @@ -49,7 +72,6 @@ struct Params uint32_t m_minerThreads = 0; bool m_mini = false; bool m_autoDiff = true; - std::string m_rpcLogin; std::string m_socks5Proxy; bool m_dns = true; uint32_t m_p2pExternalPort = 0; diff --git a/src/pow_hash.cpp b/src/pow_hash.cpp index 4afe46f..2221c16 100644 --- a/src/pow_hash.cpp +++ b/src/pow_hash.cpp @@ -447,8 +447,9 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h volatile bool done = false; const Params& params = m_pool->params(); + const Params::Host host = m_pool->current_host(); - JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin, params.m_socks5Proxy, + JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, params.m_socks5Proxy, [&result, &h](const char* data, size_t size) { rapidjson::Document doc; diff --git a/src/side_chain.cpp b/src/side_chain.cpp index e34c2d4..52f3ee3 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -67,7 +67,7 @@ SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name) , m_unclePenalty(20) , m_precalcFinished(false) #ifdef DEV_TEST_SYNC - , m_synchronizedTime(0) + , m_firstPruneTime(0) #endif { if (s_networkType == NetworkType::Invalid) { @@ -1022,7 +1022,7 @@ void SideChain::print_status(bool obtain_sidechain_lock) const } LOGINFO(0, "status" << - "\nMonero node = " << m_pool->host_str() << + "\nMonero node = " << m_pool->current_host().m_displayName << "\nMain chain height = " << m_pool->block_template().height() << "\nMain chain hashrate = " << log::Hashrate(network_hashrate) << "\nSide chain ID = " << (is_default() ? "default" : (is_mini() ? "mini" : m_consensusIdDisplayStr.c_str())) << @@ -1753,9 +1753,6 @@ void SideChain::update_chain_tip(const PoolBlock* block) // Also clear cache because it has data from all old blocks now clear_crypto_cache(); LOGINFO(0, log::LightCyan() << "SYNCHRONIZED"); -#ifdef DEV_TEST_SYNC - m_synchronizedTime = seconds_since_epoch(); -#endif } } prune_old_blocks(); @@ -2064,7 +2061,14 @@ void SideChain::prune_old_blocks() finish_precalc(); #ifdef DEV_TEST_SYNC - if (m_pool && m_precalcFinished.load() && (cur_time >= m_synchronizedTime + 120)) { + if (m_firstPruneTime == 0) { + m_firstPruneTime = seconds_since_epoch(); + + // Test Monero node switching + m_pool->reconnect_to_host(); + } + + if (m_pool && m_precalcFinished.load() && (cur_time >= m_firstPruneTime + 120)) { LOGINFO(0, log::LightGreen() << "[DEV] Synchronization finished successfully, stopping P2Pool now"); print_status(false); P2PServer* server = m_pool->p2p_server(); @@ -2073,6 +2077,7 @@ void SideChain::prune_old_blocks() server->print_bans(); server->show_peers_async(); } + m_pool->print_hosts(); m_pool->stop(); } #endif diff --git a/src/side_chain.h b/src/side_chain.h index 4ca1f91..5b2b51b 100644 --- a/src/side_chain.h +++ b/src/side_chain.h @@ -156,7 +156,7 @@ private: std::atomic m_precalcFinished; #ifdef DEV_TEST_SYNC - uint64_t m_synchronizedTime; + uint64_t m_firstPruneTime; #endif hash m_consensusHash; diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index 029db6d..121585d 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -25,7 +25,8 @@ static constexpr char log_category_prefix[] = "ZMQReader "; namespace p2pool { ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler) - : m_address(address) + : m_monitor(nullptr) + , m_address(address) , m_zmqPort(zmq_port) , m_proxy(proxy) , m_handler(handler) @@ -61,29 +62,30 @@ ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::s m_subscriber.set(zmq::sockopt::connect_timeout, 1000); + std::string addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort); + if (!connect(addr, false)) { + throw zmq::error_t(EFSM); + } + if (!m_proxy.empty()) { m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length())); } - std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort); - if (!connect(addr)) { + addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort); + if (!connect(addr, true)) { throw zmq::error_t(EFSM); } + m_address = addr; m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer()); - addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort); - if (!connect(addr)) { - throw zmq::error_t(EFSM); - } - m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main"); m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data"); m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add"); const int err = uv_thread_create(&m_worker, run_wrapper, this); if (err) { - LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err)); + LOGERR(1, "failed to start ZMQ worker thread, error " << uv_err_name(err)); throw zmq::error_t(EMTHREAD); } } @@ -92,18 +94,43 @@ ZMQReader::~ZMQReader() { LOGINFO(1, "stopping"); - m_finished.exchange(true); + stop(); + uv_thread_join(&m_worker); + + delete m_monitor; + + LOGINFO(1, "stopped"); +} + +void ZMQReader::stop() +{ + if (m_stopped.exchange(true)) { + return; + } try { - const char msg[] = "json-minimal-txpool_add:[]"; - m_publisher.send(zmq::const_buffer(msg, sizeof(msg) - 1)); - uv_thread_join(&m_worker); + static constexpr char dummy_msg[] = "json-minimal-txpool_add:[]"; + m_publisher.send(zmq::const_buffer(dummy_msg, sizeof(dummy_msg) - 1)); } catch (const std::exception& e) { LOGERR(1, "exception " << e.what()); } } +void ZMQReader::monitor_thread(void* arg) +{ + LOGINFO(1, "monitor thread ready"); + + ZMQReader* r = reinterpret_cast(arg); + + do {} while (!r->m_stopped && r->m_monitor->m_connected && r->m_monitor->check_event(-1)); + + // If not connected anymore, shut down ZMQReader entirely + r->stop(); + + LOGINFO(1, "monitor thread stopped"); +} + void ZMQReader::run_wrapper(void* arg) { reinterpret_cast(arg)->run(); @@ -112,8 +139,8 @@ void ZMQReader::run_wrapper(void* arg) void ZMQReader::run() { - m_threadRunning = true; - ON_SCOPE_LEAVE([this]() { m_threadRunning = false; }); + m_workerThreadRunning = true; + ON_SCOPE_LEAVE([this]() { m_workerThreadRunning = false; }); zmq_msg_t message = {}; @@ -123,6 +150,13 @@ void ZMQReader::run() throw zmq::error_t(errno); } + const int err = uv_thread_create(&m_monitorThread, monitor_thread, this); + if (err) { + LOGERR(1, "failed to start ZMQ monitor thread, error " << uv_err_name(err)); + throw zmq::error_t(EMTHREAD); + } + ON_SCOPE_LEAVE([this]() { uv_thread_join(&m_monitorThread); }); + LOGINFO(1, "worker thread ready"); do { @@ -131,7 +165,8 @@ void ZMQReader::run() throw zmq::error_t(errno); } - if (m_finished.load()) { + if (m_stopped) { + m_monitor->abort(); break; } @@ -145,19 +180,20 @@ void ZMQReader::run() zmq_msg_close(&message); } -bool ZMQReader::connect(const std::string& address) +void ZMQReader::Monitor::on_event_connected(const zmq_event_t&, const char* address) { - struct ConnectMonitor : public zmq::monitor_t - { - void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE - { - LOGINFO(1, "connected to " << address); - connected = true; - } + LOGINFO(1, "connected to " << address); + m_connected = true; +} - bool connected = false; - } monitor; +void ZMQReader::Monitor::on_event_disconnected(const zmq_event_t&, const char* address) +{ + LOGERR(1, "disconnected from " << address); + m_connected = false; +} +bool ZMQReader::connect(const std::string& address, bool keep_monitor) +{ static uint64_t id = 0; if (!id) { @@ -165,7 +201,7 @@ bool ZMQReader::connect(const std::string& address) id = (static_cast(rd()) << 32) | static_cast(rd()); } - char buf[log::Stream::BUF_SIZE + 1]; + char buf[64]; log::Stream s(buf); s << "inproc://p2pool-connect-mon-" << id << '\0'; ++id; @@ -173,16 +209,25 @@ bool ZMQReader::connect(const std::string& address) using namespace std::chrono; const auto start_time = steady_clock::now(); - monitor.init(m_subscriber, buf); + Monitor* monitor = new Monitor(); + monitor->init(m_subscriber, buf); m_subscriber.connect(address); - while (!monitor.connected && monitor.check_event(-1)) { + while (!monitor->m_connected && monitor->check_event(-1)) { if (duration_cast(steady_clock::now() - start_time).count() >= 1000) { LOGERR(1, "failed to connect to " << address); + delete monitor; return false; } } + if (keep_monitor) { + m_monitor = monitor; + } + else { + delete monitor; + } + return true; } diff --git a/src/zmq_reader.h b/src/zmq_reader.h index d349d5f..b5ea64e 100644 --- a/src/zmq_reader.h +++ b/src/zmq_reader.h @@ -27,12 +27,29 @@ public: ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler); ~ZMQReader(); - bool is_running() const { return m_threadRunning.load(); } + bool is_running() const { return m_workerThreadRunning.load(); } private: + struct Monitor : public zmq::monitor_t { + Monitor() : m_connected(false) {} + Monitor(const Monitor&) = delete; + + void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE; + void on_event_disconnected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE; + + std::atomic m_connected; + } *m_monitor; + + static void monitor_thread(void* arg); + + uv_thread_t m_monitorThread{}; + +private: + void stop(); + static void run_wrapper(void* arg); void run(); - bool connect(const std::string& address); + bool connect(const std::string& address, bool keep_monitor); void parse(char* data, size_t size); @@ -46,8 +63,8 @@ private: zmq::socket_t m_publisher{ m_context, ZMQ_PUB }; zmq::socket_t m_subscriber{ m_context, ZMQ_SUB }; uint16_t m_publisherPort = 0; - std::atomic m_finished{ false }; - std::atomic m_threadRunning{ false }; + std::atomic m_stopped{ false }; + std::atomic m_workerThreadRunning{ false }; TxMempoolData m_tx; MinerData m_minerData;