2021-08-22 10:20:59 +00:00
|
|
|
/*
|
|
|
|
* This file is part of the Monero P2Pool <https://github.com/SChernykh/p2pool>
|
|
|
|
* Copyright (c) 2021 SChernykh <https://github.com/SChernykh>
|
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU General Public License as published by
|
|
|
|
* the Free Software Foundation, version 3.
|
|
|
|
*
|
|
|
|
* This program is distributed in the hope that it will be useful, but
|
|
|
|
* WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
|
|
* General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "common.h"
|
|
|
|
#include "zmq_reader.h"
|
|
|
|
#include "json_parsers.h"
|
|
|
|
#include <rapidjson/document.h>
|
|
|
|
|
|
|
|
// Log category tag for this translation unit.
// NOTE(review): presumably picked up by the LOG* macros used below - confirm in the logging header.
static constexpr char log_category_prefix[] = "ZMQReader ";
|
|
|
|
|
|
|
|
namespace p2pool {
|
|
|
|
|
2021-09-05 09:50:56 +00:00
|
|
|
/**
 * Binds the local publisher socket to a free loopback port and starts the worker thread.
 *
 * @param address   host of the remote ZMQ publisher, stored in m_address and used by run()
 * @param zmq_port  port of the remote ZMQ publisher, stored in m_zmqPort and used by run()
 * @param handler   receiver of the callbacks invoked from parse()
 *
 * Calls panic() if no port could be bound or if the worker thread could not be created.
 */
ZMQReader::ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler)
	: m_address(address)
	, m_zmqPort(zmq_port)
	, m_handler(handler)
	, m_tx()
	, m_minerData()
	, m_chainmainData()
{
	// On entry m_publisherPort holds the first port to try. It's reset to 0 before scanning,
	// so a non-zero value afterwards always means "this port was actually bound".
	const uint32_t first_port = m_publisherPort;
	const uint32_t last_port = std::numeric_limits<uint16_t>::max();
	m_publisherPort = 0;

	// The range is inclusive: 65535 is a valid TCP port too
	for (uint32_t port = first_port; port <= last_port; ++port) {
		try {
			char addr[32];
			snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", port);
			m_publisher.bind(addr);

			// bind() didn't throw, remember the port for run() and stop scanning
			m_publisherPort = static_cast<uint16_t>(port);
			break;
		}
		catch (const std::exception& e) {
			LOGWARN(1, "failed to to bind port " << port << " for ZMQ publisher, error " << e.what());
		}
	}

	if (!m_publisherPort) {
		LOGERR(1, "failed to to bind ZMQ publisher port, aborting");
		panic();
	}

	const int err = uv_thread_create(&m_worker, run_wrapper, this);
	if (err) {
		LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err));
		panic();
	}
}
|
|
|
|
|
|
|
|
/**
 * Stops the worker thread and waits for it to exit.
 *
 * Sets m_finished, then publishes an empty "json-minimal-txpool_add" message on the
 * local publisher socket (run() connects to that socket and subscribes to this topic),
 * and finally joins the worker thread. Calls panic() if any of these steps throws.
 */
ZMQReader::~ZMQReader()
{
	LOGINFO(1, "stopping");

	// The flag must be set before the message below is sent:
	// run() checks it right after each received message
	m_finished.exchange(1);

	try {
		const char wakeup_msg[] = "json-minimal-txpool_add:[]";

		// sizeof() - 1 leaves the terminating zero out of the payload
		const zmq::const_buffer payload(wakeup_msg, sizeof(wakeup_msg) - 1);
		m_publisher.send(payload);

		uv_thread_join(&m_worker);
	}
	catch (const std::exception& e) {
		LOGERR(1, "exception " << e.what() << ", aborting");
		panic();
	}
}
|
|
|
|
|
2021-10-20 22:22:53 +00:00
|
|
|
void ZMQReader::run_wrapper(void* arg)
|
|
|
|
{
|
|
|
|
reinterpret_cast<ZMQReader*>(arg)->run();
|
|
|
|
LOGINFO(1, "worker thread stopped");
|
|
|
|
}
|
|
|
|
|
2021-08-22 10:20:59 +00:00
|
|
|
void ZMQReader::run()
|
|
|
|
{
|
|
|
|
try {
|
|
|
|
char addr[32];
|
|
|
|
|
|
|
|
snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort);
|
2021-10-28 17:08:42 +00:00
|
|
|
if (!connect(addr)) {
|
|
|
|
return;
|
|
|
|
}
|
2021-08-22 10:20:59 +00:00
|
|
|
|
|
|
|
snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort);
|
2021-10-28 17:08:42 +00:00
|
|
|
if (!connect(addr)) {
|
|
|
|
return;
|
|
|
|
}
|
2021-08-22 10:20:59 +00:00
|
|
|
|
|
|
|
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
|
2021-08-26 13:36:40 +00:00
|
|
|
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
|
2021-08-22 10:20:59 +00:00
|
|
|
m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add");
|
|
|
|
|
2021-08-27 08:32:31 +00:00
|
|
|
zmq_msg_t message;
|
|
|
|
int rc = zmq_msg_init(&message);
|
|
|
|
if (rc != 0) {
|
|
|
|
throw zmq::error_t();
|
|
|
|
}
|
|
|
|
|
2021-08-22 10:20:59 +00:00
|
|
|
LOGINFO(1, "worker thread ready");
|
|
|
|
|
|
|
|
do {
|
|
|
|
rc = zmq_msg_recv(&message, m_subscriber, 0);
|
|
|
|
if (rc < 0) {
|
|
|
|
throw zmq::error_t();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (m_finished.load()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
parse(reinterpret_cast<char*>(zmq_msg_data(&message)), zmq_msg_size(&message));
|
|
|
|
} while (true);
|
2021-08-27 08:32:31 +00:00
|
|
|
|
|
|
|
zmq_msg_close(&message);
|
2021-08-22 10:20:59 +00:00
|
|
|
}
|
|
|
|
catch (const std::exception& e) {
|
|
|
|
LOGERR(1, "exception " << e.what() << ", aborting");
|
|
|
|
panic();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-28 17:08:42 +00:00
|
|
|
bool ZMQReader::connect(const char* address)
|
2021-09-27 09:32:49 +00:00
|
|
|
{
|
|
|
|
struct ConnectMonitor : public zmq::monitor_t
|
|
|
|
{
|
|
|
|
void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE
|
|
|
|
{
|
|
|
|
LOGINFO(1, "connected to " << address);
|
|
|
|
connected = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool connected = false;
|
|
|
|
} monitor;
|
|
|
|
|
2021-10-28 17:08:42 +00:00
|
|
|
static uint64_t id = 0;
|
|
|
|
|
|
|
|
if (!id) {
|
|
|
|
std::random_device rd;
|
|
|
|
id = (static_cast<uint64_t>(rd()) << 32) | static_cast<uint32_t>(rd());
|
|
|
|
}
|
|
|
|
|
|
|
|
char buf[log::Stream::BUF_SIZE + 1];
|
|
|
|
log::Stream s(buf);
|
|
|
|
s << "inproc://p2pool-connect-mon-" << id << '\0';
|
|
|
|
++id;
|
|
|
|
|
2021-09-27 09:32:49 +00:00
|
|
|
monitor.init(m_subscriber, buf);
|
|
|
|
m_subscriber.connect(address);
|
|
|
|
|
|
|
|
using namespace std::chrono;
|
2021-10-28 17:08:42 +00:00
|
|
|
system_clock::time_point start_time = system_clock::now();
|
2021-09-27 09:32:49 +00:00
|
|
|
|
|
|
|
while (!monitor.connected && monitor.check_event(-1)) {
|
2021-10-28 17:08:42 +00:00
|
|
|
const system_clock::time_point cur_time = system_clock::now();
|
|
|
|
const int64_t elapsed_time = duration_cast<milliseconds>(cur_time - start_time).count();
|
2021-09-27 09:32:49 +00:00
|
|
|
if (elapsed_time >= 3000) {
|
|
|
|
LOGERR(1, "failed to connect to " << address);
|
2021-10-28 17:08:42 +00:00
|
|
|
if (m_finished.load()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
start_time = cur_time;
|
2021-09-27 09:32:49 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-08-22 10:20:59 +00:00
|
|
|
void ZMQReader::parse(char* data, size_t size)
|
|
|
|
{
|
|
|
|
char* value = data;
|
|
|
|
char* end = data + size;
|
|
|
|
|
|
|
|
while ((value < end) && (*value != ':')) {
|
|
|
|
++value;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (value >= end) {
|
|
|
|
LOGWARN(1, "ZeroMQ message doesn't have ':' delimiter, skipping it");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
*value = '\0';
|
|
|
|
++value;
|
|
|
|
|
|
|
|
using namespace rapidjson;
|
|
|
|
|
|
|
|
Document doc;
|
2021-08-29 06:34:26 +00:00
|
|
|
if (doc.Parse<rapidjson::kParseCommentsFlag | rapidjson::kParseTrailingCommasFlag>(value, end - value).HasParseError()) {
|
|
|
|
LOGWARN(1, "ZeroMQ message failed to parse, skipping it");
|
|
|
|
return;
|
|
|
|
}
|
2021-08-22 10:20:59 +00:00
|
|
|
|
|
|
|
if (strcmp(data, "json-minimal-txpool_add") == 0) {
|
|
|
|
if (!doc.IsArray()) {
|
|
|
|
LOGWARN(1, "json-minimal-txpool_add is not an array, skipping it");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2021-09-14 10:54:42 +00:00
|
|
|
m_tx.time_received = time(nullptr);
|
|
|
|
|
2021-08-22 10:20:59 +00:00
|
|
|
for (SizeType i = 0, n = doc.Size(); i < n; ++i) {
|
|
|
|
const auto& v = doc[i];
|
|
|
|
if (PARSE(v, m_tx, id) && PARSE(v, m_tx, blob_size) && PARSE(v, m_tx, weight) && PARSE(v, m_tx, fee)) {
|
|
|
|
m_handler->handle_tx(m_tx);
|
|
|
|
}
|
|
|
|
else {
|
|
|
|
LOGWARN(1, "transaction #" << (i + 1) << " in json-minimal-txpool_add failed to parse, skipped it");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-08-26 13:36:40 +00:00
|
|
|
else if (strcmp(data, "json-full-miner_data") == 0) {
|
2021-08-22 10:20:59 +00:00
|
|
|
if (!doc.IsObject()) {
|
2021-08-26 13:36:40 +00:00
|
|
|
LOGWARN(1, "json-full-miner_data is not an object, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!PARSE(doc, m_minerData, major_version) ||
|
|
|
|
!PARSE(doc, m_minerData, height) ||
|
|
|
|
!PARSE(doc, m_minerData, prev_id) ||
|
|
|
|
!PARSE(doc, m_minerData, seed_hash) ||
|
|
|
|
!PARSE(doc, m_minerData, median_weight) ||
|
|
|
|
!PARSE(doc, m_minerData, already_generated_coins) ||
|
|
|
|
!PARSE(doc, m_minerData, difficulty)) {
|
2021-08-26 13:36:40 +00:00
|
|
|
LOGWARN(1, "json-full-miner_data failed to parse, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!doc.HasMember("tx_backlog")) {
|
2021-08-26 13:36:40 +00:00
|
|
|
LOGWARN(1, "json-full-miner_data doesn't have 'tx_backlog', skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
const auto& tx_backlog = doc["tx_backlog"];
|
|
|
|
|
|
|
|
if (!tx_backlog.IsArray()) {
|
2021-08-26 13:36:40 +00:00
|
|
|
LOGWARN(1, "'tx_backlog' in json-full-miner_data is not an array, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
m_minerData.tx_backlog.clear();
|
|
|
|
|
|
|
|
const SizeType n = tx_backlog.Size();
|
|
|
|
m_minerData.tx_backlog.reserve(n);
|
|
|
|
|
|
|
|
for (SizeType i = 0; i < n; ++i) {
|
|
|
|
const auto& v = tx_backlog[i];
|
|
|
|
if (PARSE(v, m_tx, id) && PARSE(v, m_tx, weight) && PARSE(v, m_tx, fee)) {
|
|
|
|
m_minerData.tx_backlog.push_back(m_tx);
|
|
|
|
}
|
|
|
|
else {
|
2021-08-26 13:36:40 +00:00
|
|
|
LOGWARN(1, "transaction #" << (i + 1) << " in json-full-miner_data `tx_backlog` failed to parse, skipped it");
|
2021-08-22 10:20:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
m_handler->handle_miner_data(m_minerData);
|
|
|
|
}
|
|
|
|
else if (strcmp(data, "json-full-chain_main") == 0) {
|
|
|
|
if (!doc.IsArray()) {
|
|
|
|
LOGWARN(1, "json-full-chain_main is not an object, skipping it");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto arr = doc.GetArray();
|
|
|
|
for (Value* i = arr.begin(); i != arr.end(); ++i) {
|
|
|
|
if (!PARSE(*i, m_chainmainData, timestamp)) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main timestamp failed to parse, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto it = i->FindMember("miner_tx");
|
|
|
|
if ((it == i->MemberEnd()) || !it->value.IsObject()) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main miner_tx not found, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto extra_it = it->value.FindMember("extra");
|
|
|
|
if ((extra_it == it->value.MemberEnd()) || !extra_it->value.IsString()) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main extra not found, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto inputs_it = it->value.FindMember("inputs");
|
|
|
|
if ((inputs_it == it->value.MemberEnd()) || !inputs_it->value.IsArray()) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main inputs not found, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2021-09-01 08:43:10 +00:00
|
|
|
// Get block reward from miner_tx outputs
|
|
|
|
m_chainmainData.reward = 0;
|
|
|
|
|
|
|
|
auto outputs_it = it->value.FindMember("outputs");
|
|
|
|
if ((outputs_it != it->value.MemberEnd()) && outputs_it->value.IsArray()) {
|
|
|
|
auto outputs = outputs_it->value.GetArray();
|
|
|
|
for (SizeType j = 0, n = outputs.Size(); j < n; ++j) {
|
|
|
|
if (outputs[j].IsObject()) {
|
|
|
|
auto amount_it = outputs[j].FindMember("amount");
|
|
|
|
if (amount_it != outputs[j].MemberEnd() && amount_it->value.IsUint64()) {
|
|
|
|
m_chainmainData.reward += amount_it->value.GetUint64();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else {
|
2021-09-18 08:03:06 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main outputs not found");
|
2021-09-01 08:43:10 +00:00
|
|
|
}
|
|
|
|
|
2021-08-22 10:20:59 +00:00
|
|
|
auto inputs = inputs_it->value.GetArray();
|
|
|
|
if ((inputs.Size() == 0) || !inputs[0].IsObject()) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main inputs is not an array, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
it = inputs[0].FindMember("gen");
|
|
|
|
if ((it == inputs[0].MemberEnd()) || !it->value.IsObject()) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main gen not found, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!PARSE(it->value, m_chainmainData, height)) {
|
2021-09-01 08:43:10 +00:00
|
|
|
LOGWARN(1, "json-full-chain_main height not found, skipping it");
|
2021-08-22 10:20:59 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
m_handler->handle_chain_main(m_chainmainData, extra_it->value.GetString());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace p2pool
|