Get ping times for all hosts

This commit is contained in:
SChernykh 2023-06-19 17:54:22 +02:00
parent 3c41388eb7
commit 1b9d14e89d
6 changed files with 87 additions and 39 deletions

View file

@ -78,6 +78,9 @@ struct CurlContext
std::string m_error;
curl_slist* m_headers;
uint64_t m_startTime;
uint64_t m_connectedTime;
};
CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
@ -90,6 +93,8 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
, m_handle(nullptr)
, m_req(req)
, m_headers(nullptr)
, m_startTime(0)
, m_connectedTime(0)
{
m_pollHandles.reserve(2);
@ -214,12 +219,17 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
throw std::runtime_error("curl_multi_add_handle failed");
}
m_startTime = microseconds_since_epoch();
}
CurlContext::~CurlContext()
{
double tcp_ping = 0.0;
if (m_error.empty() && !m_response.empty()) {
(*m_callback)(m_response.data(), m_response.size());
tcp_ping = static_cast<double>(m_connectedTime - m_startTime) / 1000.0;
(*m_callback)(m_response.data(), m_response.size(), tcp_ping);
}
delete m_callback;
@ -232,7 +242,7 @@ CurlContext::~CurlContext()
}
}
(*m_closeCallback)(m_error.c_str(), m_error.length());
(*m_closeCallback)(m_error.c_str(), m_error.length(), tcp_ping);
delete m_closeCallback;
curl_slist_free_all(m_headers);
@ -369,6 +379,8 @@ size_t CurlContext::on_write(const void* buffer, size_t size, size_t count)
void CurlContext::curl_perform(uv_poll_t* req, int status, int events)
{
CurlContext* ctx = reinterpret_cast<CurlContext*>(req->data);
int flags = 0;
if (status < 0) {
flags |= CURL_CSELECT_ERR;
@ -376,10 +388,13 @@ void CurlContext::curl_perform(uv_poll_t* req, int status, int events)
}
else {
if (events & UV_READABLE) flags |= CURL_CSELECT_IN;
if (events & UV_WRITABLE) flags |= CURL_CSELECT_OUT;
if (events & UV_WRITABLE) {
flags |= CURL_CSELECT_OUT;
if (!ctx->m_connectedTime) {
ctx->m_connectedTime = microseconds_since_epoch();
}
}
}
CurlContext* ctx = reinterpret_cast<CurlContext*>(req->data);
int running_handles = 0;
auto it = std::find_if(ctx->m_pollHandles.begin(), ctx->m_pollHandles.end(), [req](const auto& value) { return value.second == req; });
@ -471,7 +486,7 @@ void Call(const std::string& address, int port, const std::string& req, const st
}
catch (const std::exception& e) {
const char* msg = e.what();
(*close_cb)(msg, strlen(msg));
(*close_cb)(msg, strlen(msg), 0.0);
}
});

View file

@ -20,15 +20,15 @@
namespace p2pool {
namespace JSONRPCRequest {
typedef Callback<void, const char*, size_t>::Base CallbackBase;
typedef Callback<void, const char*, size_t, double>::Base CallbackBase;
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
template<typename T, typename U>
FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr)
{
typedef Callback<void, const char*, size_t>::Derived<T> CallbackT;
typedef Callback<void, const char*, size_t>::Derived<U> CallbackU;
typedef Callback<void, const char*, size_t, double>::Derived<T> CallbackT;
typedef Callback<void, const char*, size_t, double>::Derived<U> CallbackU;
Call(address, port, req, auth, proxy, new CallbackT(std::move(cb)), new CallbackU(std::move(close_cb)), loop);
}

View file

@ -618,7 +618,7 @@ void P2PServer::load_monerod_peer_list()
const Params::Host host = m_pool->current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "/get_peer_list", host.m_rpcLogin, m_socks5Proxy,
[this](const char* data, size_t size)
[this](const char* data, size_t size, double)
{
#define ERR_STR "/get_peer_list RPC request returned invalid JSON "
@ -685,7 +685,7 @@ void P2PServer::load_monerod_peer_list()
LOGINFO(4, "monerod peer list loaded (" << m_peerListMonero.size() << " peers)");
},
[](const char* data, size_t size)
[](const char* data, size_t size, double)
{
if (size > 0) {
LOGWARN(4, "/get_peer_list RPC request failed: error " << log::const_buf(data, size));

View file

@ -78,6 +78,8 @@ p2pool::p2pool(int argc, char* argv[])
m_currentHost = p->m_hosts.front();
m_currentHostIndex = 0;
m_hostPing.resize(p->m_hosts.size());
hash pub, sec, eph_public_key;
generate_keys(pub, sec);
@ -202,16 +204,43 @@ p2pool::~p2pool()
delete m_params;
}
void p2pool::update_host_ping(const std::string& display_name, double ping)
{
if (ping < 100) {
LOGINFO(1, display_name << " ping is " << ping << " ms");
}
else {
LOGWARN(1, display_name << " ping is " << ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node.");
}
const std::vector<Params::Host>& v = m_params->m_hosts;
for (size_t i = 0, n = v.size(); i < n; ++i) {
if (v[i].m_displayName == display_name) {
m_hostPing[i] = ping;
return;
}
}
}
void p2pool::print_hosts() const
{
const Params::Host host = current_host();
for (const Params::Host& h : m_params->m_hosts) {
for (size_t i = 0, n = m_params->m_hosts.size(); i < n; ++i) {
const Params::Host& h = m_params->m_hosts[i];
char buf[64] = {};
if (m_hostPing[i] > 0.0) {
log::Stream s(buf);
s << " (" << m_hostPing[i] << " ms)";
}
if (h.m_displayName == host.m_displayName) {
LOGINFO(0, log::LightCyan() << "-> " << h.m_displayName);
LOGINFO(0, log::LightCyan() << "-> " << h.m_displayName << buf);
}
else {
LOGINFO(0, " " << h.m_displayName);
LOGINFO(0, " " << h.m_displayName << buf);
}
}
}
@ -368,14 +397,14 @@ void p2pool::handle_miner_data(MinerData& data)
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0";
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy,
[this, h](const char* data, size_t size)
[this, h](const char* data, size_t size, double)
{
ChainMain block;
if (!parse_block_header(data, size, block)) {
LOGERR(1, "couldn't download block header for height " << h);
}
},
[h](const char* data, size_t size)
[h](const char* data, size_t size, double)
{
if (size > 0) {
LOGERR(1, "couldn't download block header for height " << h << ", error " << log::const_buf(data, size));
@ -603,7 +632,7 @@ void p2pool::submit_block() const
const Params::Host host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, request, host.m_rpcLogin, m_params->m_socks5Proxy,
[height, diff, template_id, nonce, extra_nonce, sidechain_id, is_external](const char* data, size_t size)
[height, diff, template_id, nonce, extra_nonce, sidechain_id, is_external](const char* data, size_t size, double)
{
rapidjson::Document doc;
if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) {
@ -647,7 +676,7 @@ void p2pool::submit_block() const
LOGWARN(0, "submit_block: daemon sent unrecognizable reply: " << log::const_buf(data, size));
},
[is_external](const char* data, size_t size)
[is_external](const char* data, size_t size, double)
{
if (size > 0) {
if (is_external) {
@ -722,7 +751,7 @@ void p2pool::download_block_headers(uint64_t current_height)
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0";
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy,
[this, prev_seed_height, height](const char* data, size_t size)
[this, prev_seed_height, height](const char* data, size_t size, double)
{
ChainMain block;
if (parse_block_header(data, size, block)) {
@ -736,7 +765,7 @@ void p2pool::download_block_headers(uint64_t current_height)
PANIC_STOP();
}
},
[height](const char* data, size_t size)
[height](const char* data, size_t size, double)
{
if (size > 0) {
LOGERR(1, "fatal error: couldn't download block header for seed height " << height << ", error " << log::const_buf(data, size));
@ -751,7 +780,7 @@ void p2pool::download_block_headers(uint64_t current_height)
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0";
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy,
[this, start_height, current_height, host](const char* data, size_t size)
[this, start_height, current_height, host](const char* data, size_t size, double)
{
if (parse_block_headers_range(data, size) == current_height - start_height) {
update_median_timestamp();
@ -779,6 +808,16 @@ void p2pool::download_block_headers(uint64_t current_height)
api_update_network_stats();
get_miner_data();
// Get ping times for all other hosts
for (const Params::Host& h : m_params->m_hosts) {
const std::string& name = h.m_displayName;
if (name != host.m_displayName) {
JSONRPCRequest::call(h.m_address, h.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", h.m_rpcLogin, m_params->m_socks5Proxy,
[this, name](const char*, size_t, double tcp_ping) { update_host_ping(name, tcp_ping); },
[](const char*, size_t, double) {});
}
}
m_startupFinished = true;
}
}
@ -787,7 +826,7 @@ void p2pool::download_block_headers(uint64_t current_height)
download_block_headers(current_height);
}
},
[this, start_height, current_height](const char* data, size_t size)
[this, start_height, current_height](const char* data, size_t size, double)
{
if (size > 0) {
LOGERR(1, "Couldn't download block headers for heights " << start_height << " - " << current_height - 1 << ", error " << log::const_buf(data, size));
@ -869,11 +908,11 @@ void p2pool::get_info()
const Params::Host host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", host.m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
[this](const char* data, size_t size, double)
{
parse_get_info_rpc(data, size);
},
[this, host](const char* data, size_t size)
[this, host](const char* data, size_t size, double)
{
if (size > 0) {
LOGWARN(1, "get_info RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
@ -984,21 +1023,12 @@ void p2pool::get_version()
{
const Params::Host host = current_host();
const uint64_t t1 = microseconds_since_epoch();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", host.m_rpcLogin, m_params->m_socks5Proxy,
[this, t1, host](const char* data, size_t size)
[this, host](const char* data, size_t size, double)
{
const double node_ping = static_cast<double>(microseconds_since_epoch() - t1) / 1e3;
if (node_ping < 100) {
LOGINFO(1, host.m_displayName << " ping time is " << node_ping << " ms");
}
else {
LOGWARN(1, host.m_displayName << " ping time is " << node_ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node.");
}
parse_get_version_rpc(data, size);
},
[this](const char* data, size_t size)
[this](const char* data, size_t size, double)
{
if (size > 0) {
LOGWARN(1, "get_version RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second");
@ -1071,11 +1101,12 @@ void p2pool::get_miner_data(bool retry)
const Params::Host host = current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", host.m_rpcLogin, m_params->m_socks5Proxy,
[this](const char* data, size_t size)
[this, host](const char* data, size_t size, double tcp_ping)
{
parse_get_miner_data_rpc(data, size);
update_host_ping(host.m_displayName, tcp_ping);
},
[this, host, retry](const char* data, size_t size)
[this, host, retry](const char* data, size_t size, double)
{
if (size > 0) {
LOGWARN(1, "get_miner_data RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << (retry ? ", trying again in 1 second" : ""));

View file

@ -56,6 +56,7 @@ public:
return m_currentHost;
}
void update_host_ping(const std::string& display_name, double ping);
void print_hosts() const;
FORCEINLINE MinerData miner_data() const
@ -125,6 +126,7 @@ private:
std::atomic<bool> m_stopped;
const Params* m_params;
std::vector<double> m_hostPing;
mutable uv_rwlock_t m_currentHostLock;
Params::Host m_currentHost;

View file

@ -450,7 +450,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h
const Params::Host host = m_pool->current_host();
JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, params.m_socks5Proxy,
[&result, &h](const char* data, size_t size)
[&result, &h](const char* data, size_t size, double)
{
rapidjson::Document doc;
if (doc.Parse(data, size).HasParseError() || !parseValue(doc, "result", h)) {
@ -460,7 +460,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h
}
result = 1;
},
[this, &result, &done](const char* data, size_t size)
[this, &result, &done](const char* data, size_t size, double)
{
if (size > 0) {
LOGWARN(3, "RPC calc_pow: server returned error " << log::const_buf(data, size));