Added support for multiple Monero hosts

Switch to the next host in list when P2Pool thinks current host is stuck or lagging
This commit is contained in:
SChernykh 2023-06-16 15:51:33 +02:00
parent 192f1d722d
commit b5fa34e1a1
15 changed files with 402 additions and 147 deletions

View file

@ -32,14 +32,13 @@ jobs:
make -j$(nproc)
- name: Run p2pool
timeout-minutes: 25
run: |
cd build
mkdir data
python ../tests/src/stratum_dummy.py 1 &
python ../tests/src/stratum_dummy.py 2 &
python ../tests/src/stratum_dummy.py 3 &
TSAN_OPTIONS="suppressions=../tests/src/tsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
TSAN_OPTIONS="suppressions=../tests/src/tsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
grep 'Synchronization finished successfully' p2pool.log
- name: Archive p2pool.log
@ -110,14 +109,13 @@ jobs:
make -j$(nproc)
- name: Run p2pool
timeout-minutes: 20
run: |
cd build
mkdir data
python ../tests/src/stratum_dummy.py 1 &
python ../tests/src/stratum_dummy.py 2 &
python ../tests/src/stratum_dummy.py 3 &
MSAN_OPTIONS="halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
MSAN_OPTIONS="halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
grep 'Synchronization finished successfully' p2pool.log
- name: Archive p2pool.log
@ -153,14 +151,13 @@ jobs:
make -j$(nproc)
- name: Run p2pool
timeout-minutes: 20
run: |
cd build
mkdir data
python ../tests/src/stratum_dummy.py 1 &
python ../tests/src/stratum_dummy.py 2 &
python ../tests/src/stratum_dummy.py 3 &
UBSAN_OPTIONS="suppressions=../tests/src/ubsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
UBSAN_OPTIONS="suppressions=../tests/src/ubsan_sup.txt halt_on_error=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
grep 'Synchronization finished successfully' p2pool.log
- name: Archive p2pool.log
@ -196,14 +193,13 @@ jobs:
make -j$(nproc)
- name: Run p2pool
timeout-minutes: 20
run: |
cd build
mkdir data
python ../tests/src/stratum_dummy.py 1 &
python ../tests/src/stratum_dummy.py 2 &
python ../tests/src/stratum_dummy.py 3 &
ASAN_OPTIONS="detect_stack_use_after_return=1 atexit=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
ASAN_OPTIONS="detect_stack_use_after_return=1 atexit=1" ./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
grep 'Synchronization finished successfully' p2pool.log
- name: Archive p2pool.log
@ -236,14 +232,13 @@ jobs:
make -j3
- name: Run p2pool
timeout-minutes: 20
run: |
cd build
mkdir data
python ../tests/src/stratum_dummy.py 1 &
python ../tests/src/stratum_dummy.py 2 &
python ../tests/src/stratum_dummy.py 3 &
./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
./p2pool --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
grep 'Synchronization finished successfully' p2pool.log
- name: Archive p2pool.log
@ -276,14 +271,13 @@ jobs:
& "C:\\Program Files\\Microsoft Visual Studio\\2022\\Enterprise\\Msbuild\\Current\\Bin\\amd64\\msbuild" /m /p:Configuration=Debug p2pool.vcxproj
- name: Run p2pool
timeout-minutes: 20
run: |
cd build/Debug
mkdir data
Start-Process python -ArgumentList "../../tests/src/stratum_dummy.py 1"
Start-Process python -ArgumentList "../../tests/src/stratum_dummy.py 2"
Start-Process python -ArgumentList "../../tests/src/stratum_dummy.py 3"
./p2pool.exe --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
./p2pool.exe --host xmrnode.facspro.net --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --data-api data --local-api --loglevel 6
findstr /C:"Synchronization finished successfully" p2pool.log
- name: Archive p2pool.log

View file

@ -120,7 +120,7 @@ if (WITH_RANDOMX)
set(SOURCES ${SOURCES} src/miner.cpp)
endif()
if (NOT STATIC_BINARY AND NOT STATIC_LIBS)
if (NOT ((CMAKE_CXX_COMPILER_ID MATCHES MSVC) OR STATIC_BINARY OR STATIC_LIBS))
include(FindCURL)
endif()

View file

@ -34,4 +34,17 @@
### Example command line
```
p2pool.exe --host 127.0.0.1 --rpc-port 18081 --zmq-port 18083 --wallet YOUR_WALLET_ADDRESS --stratum 0.0.0.0:3333 --p2p 0.0.0.0:37889
```
### Multiple backup hosts
You can have multiple hosts in command line. Each new host uses RPC and zmq-pub port numbers from the previous host (or default 18081/18083 if none were specified).
In this example, you have local Monero node running on ports 18081/18083 (RPC/zmq-pub), and several backup nodes running on ports 18089/18084. If P2Pool detects that the node it's currently using is unavailable or stuck, it will cycle to the next node in the list.
```
p2pool.exe --host 127.0.0.1 --host xmr1.rs.me --rpc-port 18089 --zmq-port 18084 --host xmr2.rs.me --host xmr3.rs.me --wallet YOUR_WALLET_ADDRESS
```

View file

@ -140,7 +140,7 @@ typedef struct cmd {
cmdfunc *func;
} cmd;
static cmdfunc do_help, do_status, do_loglevel, do_addpeers, do_droppeers, do_showpeers, do_showworkers, do_showbans, do_outpeers, do_inpeers, do_exit, do_version;
static cmdfunc do_help, do_status, do_loglevel, do_addpeers, do_droppeers, do_showpeers, do_showworkers, do_showbans, do_showhosts, do_nexthost, do_outpeers, do_inpeers, do_exit, do_version;
#ifdef WITH_RANDOMX
static cmdfunc do_start_mining, do_stop_mining;
@ -155,6 +155,8 @@ static cmd cmds[] = {
{ STRCONST("peers"), "", "show all peers", do_showpeers },
{ STRCONST("workers"), "", "show all connected workers", do_showworkers },
{ STRCONST("bans"), "", "show all banned IPs", do_showbans },
{ STRCONST("hosts"), "", "show Monero hosts", do_showhosts },
{ STRCONST("next_host"), "", "switch to the next Monero host", do_nexthost },
{ STRCONST("outpeers"), "<N>", "set maximum number of outgoing connections", do_outpeers },
{ STRCONST("inpeers"), "<N>", "set maximum number of incoming connections", do_inpeers },
#ifdef WITH_RANDOMX
@ -240,6 +242,18 @@ static void do_showbans(p2pool* m_pool, const char* /* args */)
}
}
// cppcheck-suppress constParameterCallback
static void do_showhosts(p2pool* m_pool, const char* /* args */)
{
m_pool->print_hosts();
}
// cppcheck-suppress constParameterCallback
static void do_nexthost(p2pool* m_pool, const char* /* args */)
{
m_pool->reconnect_to_host();
}
// cppcheck-suppress constParameterCallback
static void do_outpeers(p2pool* m_pool, const char* args)
{

View file

@ -615,9 +615,9 @@ void P2PServer::load_peer_list()
void P2PServer::load_monerod_peer_list()
{
const Params& params = m_pool->params();
const Params::Host host = m_pool->current_host();
JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin, m_socks5Proxy,
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "/get_peer_list", host.m_rpcLogin, m_socks5Proxy,
[this](const char* data, size_t size)
{
#define ERR_STR "/get_peer_list RPC request returned invalid JSON "
@ -1045,7 +1045,7 @@ void P2PServer::on_timer()
update_peer_list();
save_peer_list_async();
update_peer_connections();
check_zmq();
check_host();
check_block_template();
api_update_local_stats();
}
@ -1165,25 +1165,41 @@ void P2PServer::download_missing_blocks()
}
}
void P2PServer::check_zmq()
void P2PServer::check_host()
{
if ((m_timerCounter % 30) != 3) {
if (!m_pool->startup_finished()) {
return;
}
if (!m_pool->zmq_running()) {
LOGERR(1, "ZMQ is not running, restarting it");
m_pool->restart_zmq();
m_pool->reconnect_to_host();
return;
}
const uint64_t height = m_pool->miner_data().height;
const SideChain& side_chain = m_pool->side_chain();
// If the latest 5 side chain blocks are 2 or more Monero blocks ahead, then the node is probably stuck
uint32_t counter = 5;
for (const PoolBlock* b = side_chain.chainTip(); b && (b->m_txinGenHeight >= height + 2); b = side_chain.find_block(b->m_parent)) {
if (--counter == 0) {
const Params::Host host = m_pool->current_host();
LOGERR(1, host.m_displayName << " seems to be stuck, reconnecting");
m_pool->reconnect_to_host();
return;
}
}
const uint64_t cur_time = seconds_since_epoch();
const uint64_t last_active = m_pool->zmq_last_active();
// If there were no ZMQ messages in the last 5 minutes, then the node is probably stuck
if (cur_time >= last_active + 300) {
const uint64_t dt = static_cast<uint64_t>(cur_time - last_active);
LOGERR(1, "no ZMQ messages received from monerod in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!");
m_pool->restart_zmq();
const Params::Host host = m_pool->current_host();
LOGERR(1, "no ZMQ messages received from " << host.m_displayName << " in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!");
m_pool->reconnect_to_host();
}
}
@ -1230,6 +1246,8 @@ P2PServer::P2PClient::P2PClient()
void P2PServer::on_shutdown()
{
save_peer_list();
uv_timer_stop(&m_timer);
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);

View file

@ -191,7 +191,7 @@ private:
void flush_cache();
void download_missing_blocks();
void check_zmq();
void check_host();
void check_block_template();
void update_peer_connections();
void update_peer_list();

View file

@ -69,36 +69,14 @@ p2pool::p2pool(int argc, char* argv[])
}
#endif
if (!p->m_wallet.valid()) {
LOGERR(1, "Invalid wallet address. Try \"p2pool --help\".");
throw std::exception();
}
m_hostStr = p->m_host;
if (p->m_socks5Proxy.empty()) {
if (p->m_dns) {
bool is_v6;
if (!resolve_host(p->m_host, is_v6)) {
LOGERR(1, "resolve_host failed for " << p->m_host);
throw std::exception();
}
}
else if (p->m_host.find_first_not_of("0123456789.:") != std::string::npos) {
LOGERR(1, "Can't resolve hostname " << p->m_host << " with DNS disabled");
for (Params::Host& h : p->m_hosts) {
if (!h.init_display_name(*p)) {
throw std::exception();
}
}
{
const bool changed = (p->m_host != m_hostStr);
const std::string rpc_port = ':' + std::to_string(p->m_rpcPort);
const std::string zmq_port = ":ZMQ:" + std::to_string(p->m_zmqPort);
m_hostStr += rpc_port + zmq_port;
if (changed) {
m_hostStr += " (" + p->m_host + ')';
}
}
m_currentHost = p->m_hosts.front();
m_currentHostIndex = 0;
hash pub, sec, eph_public_key;
generate_keys(pub, sec);
@ -139,15 +117,17 @@ p2pool::p2pool(int argc, char* argv[])
}
m_stopAsync.data = this;
err = uv_async_init(uv_default_loop_checked(), &m_restartZMQAsync, on_restart_zmq);
err = uv_async_init(uv_default_loop_checked(), &m_reconnectToHostAsync, on_reconnect_to_host);
if (err) {
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
throw std::exception();
}
m_restartZMQAsync.data = this;
m_reconnectToHostAsync.data = this;
uv_rwlock_init_checked(&m_currentHostLock);
uv_rwlock_init_checked(&m_mainchainLock);
uv_rwlock_init_checked(&m_minerDataLock);
uv_rwlock_init_checked(&m_ZMQReaderLock);
uv_mutex_init_checked(&m_foundBlocksLock);
#ifdef WITH_RANDOMX
uv_mutex_init_checked(&m_minerLock);
@ -204,8 +184,10 @@ p2pool::~p2pool()
}
#endif
uv_rwlock_destroy(&m_currentHostLock);
uv_rwlock_destroy(&m_mainchainLock);
uv_rwlock_destroy(&m_minerDataLock);
uv_rwlock_destroy(&m_ZMQReaderLock);
uv_mutex_destroy(&m_foundBlocksLock);
#ifdef WITH_RANDOMX
uv_mutex_destroy(&m_minerLock);
@ -220,6 +202,20 @@ p2pool::~p2pool()
delete m_params;
}
void p2pool::print_hosts() const
{
const Params::Host host = current_host();
for (const Params::Host& h : m_params->m_hosts) {
if (h.m_displayName == host.m_displayName) {
LOGINFO(0, log::LightCyan() << "-> " << h.m_displayName);
}
else {
LOGINFO(0, " " << h.m_displayName);
}
}
}
bool p2pool::calculate_hash(const void* data, size_t size, uint64_t height, const hash& seed, hash& result, bool force_light_mode)
{
return m_hasher->calculate(data, size, height, seed, result, force_light_mode);
@ -297,17 +293,15 @@ void p2pool::handle_miner_data(MinerData& data)
{
WriteLock lock(m_mainchainLock);
m_mainchainByHeight[data.height].difficulty = data.difficulty;
ChainMain& c0 = m_mainchainByHeight[data.height];
c0.height = data.height;
c0.difficulty = data.difficulty;
ChainMain& c = m_mainchainByHeight[data.height - 1];
c.height = data.height - 1;
c.id = data.prev_id;
ChainMain& c1 = m_mainchainByHeight[data.height - 1];
c1.height = data.height - 1;
c1.id = data.prev_id;
// timestamp and reward is unknown here
c.timestamp = 0;
c.reward = 0;
m_mainchainByHash[c.id] = c;
m_mainchainByHash[c1.id] = c1;
cleanup_mainchain_data(data.height);
}
@ -321,8 +315,11 @@ void p2pool::handle_miner_data(MinerData& data)
m_updateSeed = true;
update_median_timestamp();
const Params::Host host = current_host();
LOGINFO(2,
"new miner data\n---------------------------------------------------------------------------------------------------------------" <<
"\nhost = " << host.m_displayName <<
"\nmajor_version = " << data.major_version <<
"\nheight = " << data.height <<
"\nprev_id = " << log::LightBlue() << data.prev_id << log::NoColor() <<
@ -370,7 +367,7 @@ void p2pool::handle_miner_data(MinerData& data)
log::Stream s(buf);
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0";
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy,
[this, h](const char* data, size_t size)
{
ChainMain block;
@ -528,7 +525,7 @@ void p2pool::on_stop(uv_async_t* async)
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_submitBlockAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_blockTemplateAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_stopAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_restartZMQAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_reconnectToHostAsync), nullptr);
init_signals(pool, false);
@ -603,7 +600,9 @@ void p2pool::submit_block() const
}
request.append("\"]}");
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin, m_params->m_socks5Proxy,
const Params::Host host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, request, host.m_rpcLogin, m_params->m_socks5Proxy,
[height, diff, template_id, nonce, extra_nonce, sidechain_id, is_external](const char* data, size_t size)
{
rapidjson::Document doc;
@ -714,13 +713,15 @@ void p2pool::download_block_headers(uint64_t current_height)
char buf[log::Stream::BUF_SIZE + 1] = {};
log::Stream s(buf);
const Params::Host host = current_host();
// First download 2 RandomX seeds
const uint64_t seed_heights[2] = { prev_seed_height, seed_height };
for (uint64_t height : seed_heights) {
s.m_pos = 0;
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0";
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy,
[this, prev_seed_height, height](const char* data, size_t size)
{
ChainMain block;
@ -749,8 +750,8 @@ void p2pool::download_block_headers(uint64_t current_height)
s.m_pos = 0;
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0";
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this, start_height, current_height](const char* data, size_t size)
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy,
[this, start_height, current_height, host](const char* data, size_t size)
{
if (parse_block_headers_range(data, size) == current_height - start_height) {
update_median_timestamp();
@ -762,15 +763,23 @@ void p2pool::download_block_headers(uint64_t current_height)
start_mining(m_params->m_minerThreads);
}
#endif
try {
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
}
catch (const std::exception& e) {
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
PANIC_STOP();
{
WriteLock lock(m_ZMQReaderLock);
try {
m_ZMQReader = new ZMQReader(host.m_address, host.m_zmqPort, m_params->m_socks5Proxy, this);
m_zmqLastActive = seconds_since_epoch();
}
catch (const std::exception& e) {
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
PANIC_STOP();
}
}
api_update_network_stats();
get_miner_data();
m_startupFinished = true;
}
}
else {
@ -857,17 +866,20 @@ void p2pool::stratum_on_block()
void p2pool::get_info()
{
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
const Params::Host host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", host.m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
{
parse_get_info_rpc(data, size);
},
[this](const char* data, size_t size)
[this, host](const char* data, size_t size)
{
if (size > 0) {
LOGWARN(1, "get_info RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
LOGWARN(1, "get_info RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
if (!m_stopped) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
switch_host();
get_info();
}
}
@ -970,17 +982,19 @@ void p2pool::parse_get_info_rpc(const char* data, size_t size)
void p2pool::get_version()
{
const Params::Host host = current_host();
const uint64_t t1 = microseconds_since_epoch();
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
[this, t1](const char* data, size_t size)
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", host.m_rpcLogin, m_params->m_socks5Proxy,
[this, t1, host](const char* data, size_t size)
{
const double node_ping = static_cast<double>(microseconds_since_epoch() - t1) / 1e3;
if (node_ping < 100) {
LOGINFO(1, m_hostStr << " ping time is " << node_ping << " ms");
LOGINFO(1, host.m_displayName << " ping time is " << node_ping << " ms");
}
else {
LOGWARN(1, m_hostStr << " ping time is " << node_ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node.");
LOGWARN(1, host.m_displayName << " ping time is " << node_ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node.");
}
parse_get_version_rpc(data, size);
},
@ -1047,23 +1061,25 @@ void p2pool::parse_get_version_rpc(const char* data, size_t size)
get_miner_data();
}
void p2pool::get_miner_data()
void p2pool::get_miner_data(bool retry)
{
if (m_getMinerDataPending) {
return;
}
m_getMinerDataPending = true;
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
const Params::Host host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", host.m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
{
parse_get_miner_data_rpc(data, size);
},
[this](const char* data, size_t size)
[this, host, retry](const char* data, size_t size)
{
if (size > 0) {
LOGWARN(1, "get_miner_data RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
if (!m_stopped) {
LOGWARN(1, "get_miner_data RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << (retry ? ", trying again in 1 second" : ""));
if (!m_stopped && retry) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
m_getMinerDataPending = false;
get_miner_data();
@ -1574,33 +1590,53 @@ void p2pool::stop()
bool p2pool::zmq_running() const
{
ReadLock lock(m_ZMQReaderLock);
return m_ZMQReader && m_ZMQReader->is_running();
}
void p2pool::restart_zmq()
Params::Host p2pool::switch_host()
{
// If p2pool is stopped, m_restartZMQAsync is most likely already closed
const Params::Host new_host = m_params->m_hosts[++m_currentHostIndex % m_params->m_hosts.size()];
{
WriteLock lock(m_currentHostLock);
m_currentHost = new_host;
}
return new_host;
}
void p2pool::reconnect_to_host()
{
// If p2pool is stopped, m_reconnectToHostAsync is most likely already closed
if (m_stopped) {
return;
}
if (!is_main_thread()) {
uv_async_send(&m_restartZMQAsync);
uv_async_send(&m_reconnectToHostAsync);
return;
}
get_miner_data();
const Params::Host new_host = switch_host();
delete m_ZMQReader;
WriteLock lock(m_ZMQReaderLock);
ZMQReader* old_reader = m_ZMQReader;
m_ZMQReader = nullptr;
delete old_reader;
try {
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
ZMQReader* new_reader = new ZMQReader(new_host.m_address, new_host.m_zmqPort, m_params->m_socks5Proxy, this);
m_zmqLastActive = seconds_since_epoch();
m_ZMQReader = new_reader;
}
catch (const std::exception& e) {
LOGERR(1, "Couldn't restart ZMQ reader: exception " << e.what());
}
if (m_ZMQReader) {
get_miner_data(false);
}
}
int p2pool::run()

View file

@ -18,11 +18,11 @@
#pragma once
#include "uv_util.h"
#include "params.h"
#include <map>
namespace p2pool {
struct Params;
class RandomX_Hasher_Base;
class BlockTemplate;
class Mempool;
@ -46,11 +46,18 @@ public:
bool stopped() const { return m_stopped; }
void stop();
const std::string& host_str() const { return m_hostStr; }
const Params& params() const { return *m_params; }
BlockTemplate& block_template() { return *m_blockTemplate; }
SideChain& side_chain() { return *m_sideChain; }
FORCEINLINE Params::Host current_host() const
{
ReadLock lock(m_currentHostLock);
return m_currentHost;
}
void print_hosts() const;
FORCEINLINE MinerData miner_data() const
{
ReadLock lock(m_minerDataLock);
@ -98,24 +105,31 @@ public:
bool zmq_running() const;
uint64_t zmq_last_active() const { return m_zmqLastActive; }
uint64_t start_time() const { return m_startTime; }
void restart_zmq();
void reconnect_to_host();
bool startup_finished() const { return m_startupFinished.load(); }
private:
p2pool(const p2pool&) = delete;
p2pool(p2pool&&) = delete;
Params::Host switch_host();
static void on_submit_block(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->submit_block(); }
static void on_update_block_template(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->update_block_template(); }
static void on_stop(uv_async_t*);
static void on_restart_zmq(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->restart_zmq(); }
static void on_reconnect_to_host(uv_async_t* async) { reinterpret_cast<p2pool*>(async->data)->reconnect_to_host(); }
void submit_block() const;
std::atomic<bool> m_stopped;
std::string m_hostStr;
const Params* m_params;
mutable uv_rwlock_t m_currentHostLock;
Params::Host m_currentHost;
uint32_t m_currentHostIndex;
p2pool_api* m_api;
SideChain* m_sideChain;
RandomX_Hasher_Base* m_hasher;
@ -143,7 +157,7 @@ private:
void get_version();
void parse_get_version_rpc(const char* data, size_t size);
void get_miner_data();
void get_miner_data(bool retry = true);
void parse_get_miner_data_rpc(const char* data, size_t size);
bool parse_block_header(const char* data, size_t size, ChainMain& c);
@ -179,6 +193,8 @@ private:
StratumServer* m_stratumServer = nullptr;
P2PServer* m_p2pServer = nullptr;
std::atomic<bool> m_startupFinished{ false };
#ifdef WITH_RANDOMX
uv_mutex_t m_minerLock;
Miner* m_miner = nullptr;
@ -205,8 +221,9 @@ private:
std::atomic<uint64_t> m_zmqLastActive;
uint64_t m_startTime;
uv_async_t m_restartZMQAsync;
uv_async_t m_reconnectToHostAsync;
mutable uv_rwlock_t m_ZMQReaderLock;
ZMQReader* m_ZMQReader = nullptr;
hash m_getMinerDataHash;

View file

@ -20,6 +20,8 @@
#include "stratum_server.h"
#include "p2p_server.h"
constexpr char log_category_prefix[] = "P2Pool ";
void p2pool_usage();
namespace p2pool {
@ -30,17 +32,35 @@ Params::Params(int argc, char* const argv[])
bool ok = false;
if ((strcmp(argv[i], "--host") == 0) && (i + 1 < argc)) {
m_host = argv[++i];
const char* address = argv[++i];
if (m_hosts.empty()) {
m_hosts.emplace_back(Host());
m_hosts.back().m_address = address;
}
else {
const Host& h = m_hosts.back();
m_hosts.emplace_back(address, h.m_rpcPort, h.m_zmqPort, "");
}
ok = true;
}
if ((strcmp(argv[i], "--rpc-port") == 0) && (i + 1 < argc)) {
m_rpcPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL);
if (m_hosts.empty()) {
m_hosts.emplace_back(Host());
}
m_hosts.back().m_rpcPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL);
ok = true;
}
if ((strcmp(argv[i], "--zmq-port") == 0) && (i + 1 < argc)) {
m_zmqPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL);
if (m_hosts.empty()) {
m_hosts.emplace_back(Host());
}
m_hosts.back().m_zmqPort = std::min(std::max(strtoul(argv[++i], nullptr, 10), 1UL), 65535UL);
ok = true;
}
@ -133,7 +153,11 @@ Params::Params(int argc, char* const argv[])
}
if ((strcmp(argv[i], "--rpc-login") == 0) && (i + 1 < argc)) {
m_rpcLogin = argv[++i];
if (m_hosts.empty()) {
m_hosts.emplace_back(Host());
}
m_hosts.back().m_rpcLogin = argv[++i];
ok = true;
}
@ -172,6 +196,21 @@ Params::Params(int argc, char* const argv[])
}
}
auto invalid_host = [](const Host& h)
{
if (!h.valid()) {
LOGERR(1, "Invalid host " << h.m_address << ':' << h.m_rpcPort << ":ZMQ:" << h.m_zmqPort << ". Try \"p2pool --help\".");
return true;
}
return false;
};
m_hosts.erase(std::remove_if(m_hosts.begin(), m_hosts.end(), invalid_host), m_hosts.end());
if (m_hosts.empty()) {
m_hosts.emplace_back(Host());
}
if (m_stratumAddresses.empty()) {
const int stratum_port = DEFAULT_STRATUM_PORT;
@ -185,7 +224,41 @@ Params::Params(int argc, char* const argv[])
bool Params::valid() const
{
return !m_host.empty() && m_rpcPort && m_zmqPort && m_wallet.valid();
if (!m_wallet.valid()) {
LOGERR(1, "Invalid wallet address. Try \"p2pool --help\".");
return false;
}
return true;
}
bool Params::Host::init_display_name(const Params& p)
{
m_displayName = m_address;
if (p.m_socks5Proxy.empty()) {
if (p.m_dns) {
bool is_v6;
if (!resolve_host(m_address, is_v6)) {
LOGERR(1, "resolve_host failed for " << m_address);
return false;
}
}
else if (m_address.find_first_not_of("0123456789.:") != std::string::npos) {
LOGERR(1, "Can't resolve hostname " << m_address << " with DNS disabled");
return false;
}
}
const bool changed = (m_address != m_displayName);
const std::string rpc_port = ':' + std::to_string(m_rpcPort);
const std::string zmq_port = ":ZMQ:" + std::to_string(m_zmqPort);
m_displayName += rpc_port + zmq_port;
if (changed) {
m_displayName += " (" + m_address + ')';
}
return true;
}
} // namespace p2pool

View file

@ -27,9 +27,32 @@ struct Params
bool valid() const;
std::string m_host = "127.0.0.1";
uint32_t m_rpcPort = 18081;
uint32_t m_zmqPort = 18083;
struct Host
{
Host() : m_address("127.0.0.1"), m_rpcPort(18081), m_zmqPort(18083) {}
Host(const char* address, uint32_t rpcPort, uint32_t zmqPort, const char* rpcLogin)
: m_address(address)
, m_rpcPort(rpcPort)
, m_zmqPort(zmqPort)
, m_rpcLogin(rpcLogin)
{}
bool valid() const { return !m_address.empty() && m_rpcPort && m_zmqPort && (m_rpcPort != m_zmqPort); }
bool init_display_name(const Params& p);
std::string m_address;
uint32_t m_rpcPort;
uint32_t m_zmqPort;
std::string m_rpcLogin;
std::string m_displayName;
};
std::vector<Host> m_hosts;
bool m_lightMode = false;
Wallet m_wallet{ nullptr };
std::string m_stratumAddresses;
@ -49,7 +72,6 @@ struct Params
uint32_t m_minerThreads = 0;
bool m_mini = false;
bool m_autoDiff = true;
std::string m_rpcLogin;
std::string m_socks5Proxy;
bool m_dns = true;
uint32_t m_p2pExternalPort = 0;

View file

@ -447,8 +447,9 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h
volatile bool done = false;
const Params& params = m_pool->params();
const Params::Host host = m_pool->current_host();
JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin, params.m_socks5Proxy,
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, params.m_socks5Proxy,
[&result, &h](const char* data, size_t size)
{
rapidjson::Document doc;

View file

@ -67,7 +67,7 @@ SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name)
, m_unclePenalty(20)
, m_precalcFinished(false)
#ifdef DEV_TEST_SYNC
, m_synchronizedTime(0)
, m_firstPruneTime(0)
#endif
{
if (s_networkType == NetworkType::Invalid) {
@ -1022,7 +1022,7 @@ void SideChain::print_status(bool obtain_sidechain_lock) const
}
LOGINFO(0, "status" <<
"\nMonero node = " << m_pool->host_str() <<
"\nMonero node = " << m_pool->current_host().m_displayName <<
"\nMain chain height = " << m_pool->block_template().height() <<
"\nMain chain hashrate = " << log::Hashrate(network_hashrate) <<
"\nSide chain ID = " << (is_default() ? "default" : (is_mini() ? "mini" : m_consensusIdDisplayStr.c_str())) <<
@ -1753,9 +1753,6 @@ void SideChain::update_chain_tip(const PoolBlock* block)
// Also clear cache because it has data from all old blocks now
clear_crypto_cache();
LOGINFO(0, log::LightCyan() << "SYNCHRONIZED");
#ifdef DEV_TEST_SYNC
m_synchronizedTime = seconds_since_epoch();
#endif
}
}
prune_old_blocks();
@ -2064,7 +2061,14 @@ void SideChain::prune_old_blocks()
finish_precalc();
#ifdef DEV_TEST_SYNC
if (m_pool && m_precalcFinished.load() && (cur_time >= m_synchronizedTime + 120)) {
if (m_firstPruneTime == 0) {
m_firstPruneTime = seconds_since_epoch();
// Test Monero node switching
m_pool->reconnect_to_host();
}
if (m_pool && m_precalcFinished.load() && (cur_time >= m_firstPruneTime + 120)) {
LOGINFO(0, log::LightGreen() << "[DEV] Synchronization finished successfully, stopping P2Pool now");
print_status(false);
P2PServer* server = m_pool->p2p_server();
@ -2073,6 +2077,7 @@ void SideChain::prune_old_blocks()
server->print_bans();
server->show_peers_async();
}
m_pool->print_hosts();
m_pool->stop();
}
#endif

View file

@ -156,7 +156,7 @@ private:
std::atomic<bool> m_precalcFinished;
#ifdef DEV_TEST_SYNC
uint64_t m_synchronizedTime;
uint64_t m_firstPruneTime;
#endif
hash m_consensusHash;

View file

@ -25,7 +25,8 @@ static constexpr char log_category_prefix[] = "ZMQReader ";
namespace p2pool {
ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler)
: m_address(address)
: m_monitor(nullptr)
, m_address(address)
, m_zmqPort(zmq_port)
, m_proxy(proxy)
, m_handler(handler)
@ -61,29 +62,30 @@ ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::s
m_subscriber.set(zmq::sockopt::connect_timeout, 1000);
std::string addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort);
if (!connect(addr, false)) {
throw zmq::error_t(EFSM);
}
if (!m_proxy.empty()) {
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length()));
}
std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort);
if (!connect(addr)) {
addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort);
if (!connect(addr, true)) {
throw zmq::error_t(EFSM);
}
m_address = addr;
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer());
addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort);
if (!connect(addr)) {
throw zmq::error_t(EFSM);
}
m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add");
const int err = uv_thread_create(&m_worker, run_wrapper, this);
if (err) {
LOGERR(1, "failed to start ZMQ thread, error " << uv_err_name(err));
LOGERR(1, "failed to start ZMQ worker thread, error " << uv_err_name(err));
throw zmq::error_t(EMTHREAD);
}
}
@ -92,18 +94,43 @@ ZMQReader::~ZMQReader()
{
LOGINFO(1, "stopping");
m_finished.exchange(true);
stop();
uv_thread_join(&m_worker);
delete m_monitor;
LOGINFO(1, "stopped");
}
void ZMQReader::stop()
{
if (m_stopped.exchange(true)) {
return;
}
try {
const char msg[] = "json-minimal-txpool_add:[]";
m_publisher.send(zmq::const_buffer(msg, sizeof(msg) - 1));
uv_thread_join(&m_worker);
static constexpr char dummy_msg[] = "json-minimal-txpool_add:[]";
m_publisher.send(zmq::const_buffer(dummy_msg, sizeof(dummy_msg) - 1));
}
catch (const std::exception& e) {
LOGERR(1, "exception " << e.what());
}
}
void ZMQReader::monitor_thread(void* arg)
{
LOGINFO(1, "monitor thread ready");
ZMQReader* r = reinterpret_cast<ZMQReader*>(arg);
do {} while (!r->m_stopped && r->m_monitor->m_connected && r->m_monitor->check_event(-1));
// If not connected anymore, shut down ZMQReader entirely
r->stop();
LOGINFO(1, "monitor thread stopped");
}
void ZMQReader::run_wrapper(void* arg)
{
reinterpret_cast<ZMQReader*>(arg)->run();
@ -112,8 +139,8 @@ void ZMQReader::run_wrapper(void* arg)
void ZMQReader::run()
{
m_threadRunning = true;
ON_SCOPE_LEAVE([this]() { m_threadRunning = false; });
m_workerThreadRunning = true;
ON_SCOPE_LEAVE([this]() { m_workerThreadRunning = false; });
zmq_msg_t message = {};
@ -123,6 +150,13 @@ void ZMQReader::run()
throw zmq::error_t(errno);
}
const int err = uv_thread_create(&m_monitorThread, monitor_thread, this);
if (err) {
LOGERR(1, "failed to start ZMQ monitor thread, error " << uv_err_name(err));
throw zmq::error_t(EMTHREAD);
}
ON_SCOPE_LEAVE([this]() { uv_thread_join(&m_monitorThread); });
LOGINFO(1, "worker thread ready");
do {
@ -131,7 +165,8 @@ void ZMQReader::run()
throw zmq::error_t(errno);
}
if (m_finished.load()) {
if (m_stopped) {
m_monitor->abort();
break;
}
@ -145,19 +180,20 @@ void ZMQReader::run()
zmq_msg_close(&message);
}
bool ZMQReader::connect(const std::string& address)
void ZMQReader::Monitor::on_event_connected(const zmq_event_t&, const char* address)
{
struct ConnectMonitor : public zmq::monitor_t
{
void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE
{
LOGINFO(1, "connected to " << address);
connected = true;
}
LOGINFO(1, "connected to " << address);
m_connected = true;
}
bool connected = false;
} monitor;
void ZMQReader::Monitor::on_event_disconnected(const zmq_event_t&, const char* address)
{
LOGERR(1, "disconnected from " << address);
m_connected = false;
}
bool ZMQReader::connect(const std::string& address, bool keep_monitor)
{
static uint64_t id = 0;
if (!id) {
@ -165,7 +201,7 @@ bool ZMQReader::connect(const std::string& address)
id = (static_cast<uint64_t>(rd()) << 32) | static_cast<uint32_t>(rd());
}
char buf[log::Stream::BUF_SIZE + 1];
char buf[64];
log::Stream s(buf);
s << "inproc://p2pool-connect-mon-" << id << '\0';
++id;
@ -173,16 +209,25 @@ bool ZMQReader::connect(const std::string& address)
using namespace std::chrono;
const auto start_time = steady_clock::now();
monitor.init(m_subscriber, buf);
Monitor* monitor = new Monitor();
monitor->init(m_subscriber, buf);
m_subscriber.connect(address);
while (!monitor.connected && monitor.check_event(-1)) {
while (!monitor->m_connected && monitor->check_event(-1)) {
if (duration_cast<milliseconds>(steady_clock::now() - start_time).count() >= 1000) {
LOGERR(1, "failed to connect to " << address);
delete monitor;
return false;
}
}
if (keep_monitor) {
m_monitor = monitor;
}
else {
delete monitor;
}
return true;
}

View file

@ -27,12 +27,29 @@ public:
ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler);
~ZMQReader();
bool is_running() const { return m_threadRunning.load(); }
bool is_running() const { return m_workerThreadRunning.load(); }
private:
struct Monitor : public zmq::monitor_t {
Monitor() : m_connected(false) {}
Monitor(const Monitor&) = delete;
void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE;
void on_event_disconnected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE;
std::atomic<bool> m_connected;
} *m_monitor;
static void monitor_thread(void* arg);
uv_thread_t m_monitorThread{};
private:
void stop();
static void run_wrapper(void* arg);
void run();
bool connect(const std::string& address);
bool connect(const std::string& address, bool keep_monitor);
void parse(char* data, size_t size);
@ -46,8 +63,8 @@ private:
zmq::socket_t m_publisher{ m_context, ZMQ_PUB };
zmq::socket_t m_subscriber{ m_context, ZMQ_SUB };
uint16_t m_publisherPort = 0;
std::atomic<bool> m_finished{ false };
std::atomic<bool> m_threadRunning{ false };
std::atomic<bool> m_stopped{ false };
std::atomic<bool> m_workerThreadRunning{ false };
TxMempoolData m_tx;
MinerData m_minerData;