ZMQReader: more reliable connect()

This commit is contained in:
SChernykh 2021-10-28 19:08:42 +02:00
parent 3d60ae8c32
commit a008eac8c6
2 changed files with 28 additions and 9 deletions

View file

@ -19,6 +19,7 @@
#include "zmq_reader.h"
#include "json_parsers.h"
#include <rapidjson/document.h>
#include <random>
static constexpr char log_category_prefix[] = "ZMQReader ";
@ -88,10 +89,14 @@ void ZMQReader::run()
char addr[32];
snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort);
while (!connect(addr, m_zmqPort)) { if (m_finished.load()) return; }
if (!connect(addr)) {
return;
}
snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort);
while (!connect(addr, m_publisherPort)) { if (m_finished.load()) return; }
if (!connect(addr)) {
return;
}
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
@ -126,7 +131,7 @@ void ZMQReader::run()
}
}
bool ZMQReader::connect(const char* address, uint32_t id)
bool ZMQReader::connect(const char* address)
{
struct ConnectMonitor : public zmq::monitor_t
{
@ -139,19 +144,33 @@ bool ZMQReader::connect(const char* address, uint32_t id)
bool connected = false;
} monitor;
char buf[32];
snprintf(buf, sizeof(buf), "inproc://connect-mon-%u", id);
static uint64_t id = 0;
if (!id) {
std::random_device rd;
id = (static_cast<uint64_t>(rd()) << 32) | static_cast<uint32_t>(rd());
}
char buf[log::Stream::BUF_SIZE + 1];
log::Stream s(buf);
s << "inproc://p2pool-connect-mon-" << id << '\0';
++id;
monitor.init(m_subscriber, buf);
m_subscriber.connect(address);
using namespace std::chrono;
const system_clock::time_point start_time = system_clock::now();
system_clock::time_point start_time = system_clock::now();
while (!monitor.connected && monitor.check_event(-1)) {
const int64_t elapsed_time = duration_cast<milliseconds>(system_clock::now() - start_time).count();
const system_clock::time_point cur_time = system_clock::now();
const int64_t elapsed_time = duration_cast<milliseconds>(cur_time - start_time).count();
if (elapsed_time >= 3000) {
LOGERR(1, "failed to connect to " << address);
return false;
if (m_finished.load()) {
return false;
}
start_time = cur_time;
}
}

View file

@ -30,7 +30,7 @@ public:
private:
static void run_wrapper(void* arg);
void run();
bool connect(const char* address, uint32_t id);
bool connect(const char* address);
void parse(char* data, size_t size);