mirror of
https://github.com/SChernykh/p2pool.git
synced 2024-11-17 08:17:55 +00:00
Don't let user connect to a node without ZMQ
This commit is contained in:
parent
c1a1249be1
commit
146d29b627
5 changed files with 59 additions and 44 deletions
|
@ -1153,6 +1153,12 @@ void P2PServer::check_zmq()
|
|||
return;
|
||||
}
|
||||
|
||||
if (!m_pool->zmq_running()) {
|
||||
LOGERR(1, "ZMQ is not running, restarting it");
|
||||
m_pool->restart_zmq();
|
||||
return;
|
||||
}
|
||||
|
||||
const uint64_t cur_time = seconds_since_epoch();
|
||||
const uint64_t last_active = m_pool->zmq_last_active();
|
||||
|
||||
|
|
|
@ -735,14 +735,6 @@ void p2pool::download_block_headers(uint64_t current_height)
|
|||
if (parse_block_headers_range(data, size) == current_height - start_height) {
|
||||
update_median_timestamp();
|
||||
if (m_serversStarted.exchange(1) == 0) {
|
||||
try {
|
||||
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
|
||||
PANIC_STOP();
|
||||
}
|
||||
|
||||
m_stratumServer = new StratumServer(this);
|
||||
m_p2pServer = new P2PServer(this);
|
||||
#ifdef WITH_RANDOMX
|
||||
|
@ -750,6 +742,13 @@ void p2pool::download_block_headers(uint64_t current_height)
|
|||
start_mining(m_params->m_minerThreads);
|
||||
}
|
||||
#endif
|
||||
try {
|
||||
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
|
||||
PANIC_STOP();
|
||||
}
|
||||
api_update_network_stats();
|
||||
get_miner_data();
|
||||
}
|
||||
|
@ -1535,6 +1534,11 @@ void p2pool::stop()
|
|||
}
|
||||
}
|
||||
|
||||
bool p2pool::zmq_running() const
|
||||
{
|
||||
return m_ZMQReader && m_ZMQReader->is_running();
|
||||
}
|
||||
|
||||
void p2pool::restart_zmq()
|
||||
{
|
||||
// If p2pool is stopped, m_restartZMQAsync is most likely already closed
|
||||
|
|
|
@ -95,6 +95,7 @@ public:
|
|||
void stop_mining();
|
||||
#endif
|
||||
|
||||
bool zmq_running() const;
|
||||
uint64_t zmq_last_active() const { return m_zmqLastActive; }
|
||||
uint64_t start_time() const { return m_startTime; }
|
||||
void restart_zmq();
|
||||
|
|
|
@ -58,10 +58,32 @@ ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::s
|
|||
throw zmq::error_t(EFSM);
|
||||
}
|
||||
|
||||
m_subscriber.set(zmq::sockopt::connect_timeout, 1000);
|
||||
|
||||
if (!m_proxy.empty()) {
|
||||
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length()));
|
||||
}
|
||||
|
||||
std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort);
|
||||
if (!connect(addr)) {
|
||||
throw zmq::error_t(EFSM);
|
||||
}
|
||||
|
||||
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer());
|
||||
|
||||
addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort);
|
||||
if (!connect(addr)) {
|
||||
throw zmq::error_t(EFSM);
|
||||
}
|
||||
|
||||
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
|
||||
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
|
||||
m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add");
|
||||
|
||||
const int err = uv_thread_create(&m_worker, run_wrapper, this);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err));
|
||||
throw zmq::error_t(EFSM);
|
||||
throw zmq::error_t(EMTHREAD);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,7 +91,7 @@ ZMQReader::~ZMQReader()
|
|||
{
|
||||
LOGINFO(1, "stopping");
|
||||
|
||||
m_finished.exchange(1);
|
||||
m_finished.exchange(true);
|
||||
|
||||
try {
|
||||
const char msg[] = "json-minimal-txpool_add:[]";
|
||||
|
@ -89,28 +111,12 @@ void ZMQReader::run_wrapper(void* arg)
|
|||
|
||||
void ZMQReader::run()
|
||||
{
|
||||
m_threadRunning = true;
|
||||
ON_SCOPE_LEAVE([this]() { m_threadRunning = false; });
|
||||
|
||||
zmq_msg_t message = {};
|
||||
|
||||
try {
|
||||
if (!m_proxy.empty()) {
|
||||
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length()));
|
||||
}
|
||||
|
||||
std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort);
|
||||
if (!connect(addr)) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer());
|
||||
|
||||
addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort);
|
||||
if (!connect(addr)) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
|
||||
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
|
||||
m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add");
|
||||
|
||||
zmq_msg_t message;
|
||||
int rc = zmq_msg_init(&message);
|
||||
if (rc != 0) {
|
||||
throw zmq::error_t(errno);
|
||||
|
@ -130,12 +136,12 @@ void ZMQReader::run()
|
|||
|
||||
parse(reinterpret_cast<char*>(zmq_msg_data(&message)), zmq_msg_size(&message));
|
||||
} while (true);
|
||||
|
||||
zmq_msg_close(&message);
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
LOGERR(1, "exception " << e.what());
|
||||
}
|
||||
|
||||
zmq_msg_close(&message);
|
||||
}
|
||||
|
||||
bool ZMQReader::connect(const std::string& address)
|
||||
|
@ -163,22 +169,17 @@ bool ZMQReader::connect(const std::string& address)
|
|||
s << "inproc://p2pool-connect-mon-" << id << '\0';
|
||||
++id;
|
||||
|
||||
using namespace std::chrono;
|
||||
const auto start_time = steady_clock::now();
|
||||
|
||||
monitor.init(m_subscriber, buf);
|
||||
m_subscriber.connect(address);
|
||||
|
||||
using namespace std::chrono;
|
||||
steady_clock::time_point start_time = steady_clock::now();
|
||||
|
||||
while (!monitor.connected && monitor.check_event(-1)) {
|
||||
const steady_clock::time_point cur_time = steady_clock::now();
|
||||
const int64_t elapsed_time = duration_cast<milliseconds>(cur_time - start_time).count();
|
||||
if (elapsed_time >= 3000) {
|
||||
if (duration_cast<milliseconds>(steady_clock::now() - start_time).count() >= 1000) {
|
||||
LOGERR(1, "failed to connect to " << address);
|
||||
if (m_finished.load()) {
|
||||
return false;
|
||||
}
|
||||
start_time = cur_time;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
|
|
@ -27,6 +27,8 @@ public:
|
|||
ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler);
|
||||
~ZMQReader();
|
||||
|
||||
bool is_running() const { return m_threadRunning.load(); }
|
||||
|
||||
private:
|
||||
static void run_wrapper(void* arg);
|
||||
void run();
|
||||
|
@ -44,7 +46,8 @@ private:
|
|||
zmq::socket_t m_publisher{ m_context, ZMQ_PUB };
|
||||
zmq::socket_t m_subscriber{ m_context, ZMQ_SUB };
|
||||
uint16_t m_publisherPort = 37891;
|
||||
std::atomic<int> m_finished{ 0 };
|
||||
std::atomic<bool> m_finished{ false };
|
||||
std::atomic<bool> m_threadRunning{ false };
|
||||
|
||||
TxMempoolData m_tx;
|
||||
MinerData m_minerData;
|
||||
|
|
Loading…
Reference in a new issue