mirror of
https://github.com/SChernykh/p2pool.git
synced 2025-01-03 17:29:24 +00:00
ZMQ reader: abort if connect to monerod failed
This commit is contained in:
parent
19043ace90
commit
4fce76576f
2 changed files with 39 additions and 2 deletions
|
@ -82,10 +82,14 @@ void ZMQReader::run()
|
||||||
char addr[32];
|
char addr[32];
|
||||||
|
|
||||||
snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort);
|
snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort);
|
||||||
m_subscriber.connect(addr);
|
if (!connect(addr, m_zmqPort)) {
|
||||||
|
throw zmq::error_t();
|
||||||
|
}
|
||||||
|
|
||||||
snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort);
|
snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort);
|
||||||
m_subscriber.connect(addr);
|
if (!connect(addr, m_publisherPort)) {
|
||||||
|
throw zmq::error_t();
|
||||||
|
}
|
||||||
|
|
||||||
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
|
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
|
||||||
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
|
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
|
||||||
|
@ -121,6 +125,38 @@ void ZMQReader::run()
|
||||||
LOGINFO(1, "worker thread stopped");
|
LOGINFO(1, "worker thread stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ZMQReader::connect(const char* address, uint32_t id)
|
||||||
|
{
|
||||||
|
struct ConnectMonitor : public zmq::monitor_t
|
||||||
|
{
|
||||||
|
void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE
|
||||||
|
{
|
||||||
|
LOGINFO(1, "connected to " << address);
|
||||||
|
connected = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool connected = false;
|
||||||
|
} monitor;
|
||||||
|
|
||||||
|
char buf[32];
|
||||||
|
snprintf(buf, sizeof(buf), "inproc://connect-mon-%u", id);
|
||||||
|
monitor.init(m_subscriber, buf);
|
||||||
|
m_subscriber.connect(address);
|
||||||
|
|
||||||
|
using namespace std::chrono;
|
||||||
|
const system_clock::time_point start_time = system_clock::now();
|
||||||
|
|
||||||
|
while (!monitor.connected && monitor.check_event(-1)) {
|
||||||
|
const int64_t elapsed_time = duration_cast<milliseconds>(system_clock::now() - start_time).count();
|
||||||
|
if (elapsed_time >= 3000) {
|
||||||
|
LOGERR(1, "failed to connect to " << address);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
void ZMQReader::parse(char* data, size_t size)
|
void ZMQReader::parse(char* data, size_t size)
|
||||||
{
|
{
|
||||||
char* value = data;
|
char* value = data;
|
||||||
|
|
|
@ -30,6 +30,7 @@ public:
|
||||||
private:
|
private:
|
||||||
static void run_wrapper(void* arg) { reinterpret_cast<ZMQReader*>(arg)->run(); }
|
static void run_wrapper(void* arg) { reinterpret_cast<ZMQReader*>(arg)->run(); }
|
||||||
void run();
|
void run();
|
||||||
|
bool connect(const char* address, uint32_t id);
|
||||||
|
|
||||||
void parse(char* data, size_t size);
|
void parse(char* data, size_t size);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue