From 6c6ef1c1b8346e42d8101ff8eb371ce43a4a8f95 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Wed, 11 May 2022 11:44:42 +0200 Subject: [PATCH] Restart ZMQ connection if it looks dead --- src/p2p_server.cpp | 1 + src/p2pool.cpp | 38 +++++++++++++++++++++++++++++++++++++- src/p2pool.h | 3 +++ src/zmq_reader.cpp | 16 +++++++--------- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index a0bc6cf..b56acc3 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -1029,6 +1029,7 @@ void P2PServer::check_zmq() if (cur_time >= last_active + 300) { const uint64_t dt = static_cast<uint64_t>(cur_time - last_active); LOGERR(1, "no ZMQ messages received from monerod in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!"); + m_pool->restart_zmq(); } } diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 0c7c31a..79511fe 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -108,6 +108,13 @@ p2pool::p2pool(int argc, char* argv[]) } m_stopAsync.data = this; + err = uv_async_init(uv_default_loop_checked(), &m_restartZMQAsync, on_restart_zmq); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + panic(); + } + m_restartZMQAsync.data = this; + uv_rwlock_init_checked(&m_mainchainLock); uv_rwlock_init_checked(&m_minerDataLock); uv_mutex_init_checked(&m_foundBlocksLock); @@ -438,6 +445,7 @@ void p2pool::on_stop(uv_async_t* async) uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_submitBlockAsync), nullptr); uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_blockTemplateAsync), nullptr); uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_stopAsync), nullptr); + uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_restartZMQAsync), nullptr); uv_stop(uv_default_loop()); } @@ -632,7 +640,14 @@ void p2pool::download_block_headers(uint64_t current_height) if (parse_block_headers_range(data, size) == current_height - start_height) { update_median_timestamp(); if (m_serversStarted.exchange(1) == 0) { - m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), 
m_params->m_zmqPort, this); + try { + m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this); + } + catch (const std::exception& e) { + LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what()); + panic(); + } + m_stratumServer = new StratumServer(this); m_p2pServer = new P2PServer(this); #ifdef WITH_RANDOMX @@ -1376,6 +1391,27 @@ void p2pool::stop() uv_async_send(&m_stopAsync); } +void p2pool::restart_zmq() +{ + if (!is_main_thread()) { + uv_async_send(&m_restartZMQAsync); + return; + } + + get_miner_data(); + + delete m_ZMQReader; + m_ZMQReader = nullptr; + + try { + m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this); + m_zmqLastActive = seconds_since_epoch(); + } + catch (const std::exception& e) { + LOGERR(1, "Couldn't restart ZMQ reader: exception " << e.what()); + } +} + int p2pool::run() { if (!m_params->ok()) { diff --git a/src/p2pool.h b/src/p2pool.h index fe05d8b..781f4b0 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -95,6 +95,7 @@ public: uint64_t zmq_last_active() const { return m_zmqLastActive; } uint64_t start_time() const { return m_startTime; } + void restart_zmq(); private: p2pool(const p2pool&) = delete; @@ -103,6 +104,7 @@ private: static void on_submit_block(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->submit_block(); } static void on_update_block_template(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->update_block_template(); } static void on_stop(uv_async_t*); + static void on_restart_zmq(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->restart_zmq(); } void submit_block() const; @@ -197,6 +199,7 @@ private: std::atomic<uint64_t> m_zmqLastActive; uint64_t m_startTime; + uv_async_t m_restartZMQAsync; ZMQReader* m_ZMQReader = nullptr; }; diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index baa6da1..14b39f7 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -49,13 +49,13 @@ ZMQReader::ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandle if 
(!m_publisherPort) { LOGERR(1, "failed to to bind ZMQ publisher port, aborting"); - panic(); + throw zmq::error_t(EFSM); } const int err = uv_thread_create(&m_worker, run_wrapper, this); if (err) { LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err)); - panic(); + throw zmq::error_t(EFSM); } } @@ -71,8 +71,7 @@ ZMQReader::~ZMQReader() uv_thread_join(&m_worker); } catch (const std::exception& e) { - LOGERR(1, "exception " << e.what() << ", aborting"); - panic(); + LOGERR(1, "exception " << e.what()); } } @@ -104,7 +103,7 @@ void ZMQReader::run() zmq_msg_t message; int rc = zmq_msg_init(&message); if (rc != 0) { - throw zmq::error_t(); + throw zmq::error_t(errno); } LOGINFO(1, "worker thread ready"); @@ -112,7 +111,7 @@ void ZMQReader::run() do { rc = zmq_msg_recv(&message, m_subscriber, 0); if (rc < 0) { - throw zmq::error_t(); + throw zmq::error_t(errno); } if (m_finished.load()) { @@ -125,8 +124,7 @@ void ZMQReader::run() zmq_msg_close(&message); } catch (const std::exception& e) { - LOGERR(1, "exception " << e.what() << ", aborting"); - panic(); + LOGERR(1, "exception " << e.what()); } } @@ -196,7 +194,7 @@ void ZMQReader::parse(char* data, size_t size) using namespace rapidjson; Document doc; - if (doc.Parse(value, end - value).HasParseError()) { + if (doc.Parse(value, end - value).HasParseError()) { LOGWARN(1, "ZeroMQ message failed to parse, skipping it"); return; }