mirror of
https://github.com/SChernykh/p2pool.git
synced 2025-01-24 03:05:57 +00:00
Merge branch 'proxy'
This commit is contained in:
commit
c49e8d4770
25 changed files with 571 additions and 305 deletions
8
.github/workflows/c-cpp.yml
vendored
8
.github/workflows/c-cpp.yml
vendored
|
@ -70,7 +70,7 @@ jobs:
|
|||
run: |
|
||||
cd external/src/curl
|
||||
autoreconf -fi
|
||||
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
make -j$(nproc)
|
||||
|
||||
- name: Build libuv
|
||||
|
@ -139,7 +139,7 @@ jobs:
|
|||
run: |
|
||||
cd external/src/curl
|
||||
autoreconf -fi
|
||||
./configure --host=aarch64-linux-gnu --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
./configure --host=aarch64-linux-gnu --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
make -j$(nproc)
|
||||
|
||||
- name: Build libuv
|
||||
|
@ -195,7 +195,7 @@ jobs:
|
|||
run: |
|
||||
cd external/src/curl
|
||||
autoreconf -fi
|
||||
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
make -j$(nproc)
|
||||
|
||||
- name: Build libuv
|
||||
|
@ -307,7 +307,7 @@ jobs:
|
|||
run: |
|
||||
cd external/src/curl
|
||||
autoreconf -fi
|
||||
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-proxy --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
./configure --without-ssl --without-hyper --without-zlib --without-brotli --without-zstd --without-default-ssl-backend --without-ca-bundle --without-ca-path --without-ca-fallback --without-libpsl --without-libgsasl --without-librtmp --without-winidn --without-libidn2 --without-nghttp2 --without-ngtcp2 --without-nghttp3 --without-quiche --without-msh3 --without-zsh-functions-dir --without-fish-functions-dir --disable-ftp --disable-file --disable-ldap --disable-ldaps --disable-rtsp --disable-dict --disable-telnet --disable-tftp --disable-pop3 --disable-imap --disable-smb --disable-smtp --disable-gopher --disable-mqtt --disable-manual --disable-ntlm --disable-ntlm-wb --disable-tls-srp --disable-unix-sockets --disable-cookies --disable-socketpair --disable-doh --disable-dateparse --disable-netrc --disable-progress-meter --disable-dnsshuffle --disable-hsts --disable-alt-svc --disable-ares
|
||||
make -j3
|
||||
|
||||
- name: Build libuv
|
||||
|
|
6
.github/workflows/test-sync.yml
vendored
6
.github/workflows/test-sync.yml
vendored
|
@ -30,7 +30,7 @@ jobs:
|
|||
timeout-minutes: 15
|
||||
run: |
|
||||
cd build
|
||||
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4
|
||||
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6
|
||||
|
||||
- name: Archive p2pool.log
|
||||
uses: actions/upload-artifact@v2
|
||||
|
@ -62,7 +62,7 @@ jobs:
|
|||
timeout-minutes: 15
|
||||
run: |
|
||||
cd build
|
||||
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4
|
||||
./p2pool --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6
|
||||
|
||||
- name: Archive p2pool.log
|
||||
uses: actions/upload-artifact@v2
|
||||
|
@ -94,7 +94,7 @@ jobs:
|
|||
timeout-minutes: 15
|
||||
run: |
|
||||
cd build/Release
|
||||
./p2pool.exe --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 4
|
||||
./p2pool.exe --host p2pmd.xmrvsbeast.com --zmq-port 18084 --wallet 44MnN1f3Eto8DZYUWuE5XZNUtE3vcRzt2j6PzqWpPau34e6Cf4fAxt6X2MBmrm6F9YMEiMNjN6W4Shn4pLcfNAja621jwyg --no-cache --loglevel 6
|
||||
|
||||
- name: Archive p2pool.log
|
||||
uses: actions/upload-artifact@v2
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
--mini Connect to p2pool-mini sidechain. Note that it will also change default p2p port from 37889 to 37888
|
||||
--no-autodiff Disable automatic difficulty adjustment for miners connected to stratum
|
||||
--rpc-login Specify username[:password] required for Monero RPC server
|
||||
--socks5 Specify IP:port of a SOCKS5 proxy to use for outgoing connections
|
||||
```
|
||||
|
||||
### Example command line
|
||||
|
|
|
@ -910,11 +910,12 @@ void BlockTemplate::calc_merkle_tree_main_branch()
|
|||
}
|
||||
}
|
||||
|
||||
bool BlockTemplate::get_difficulties(const uint32_t template_id, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const
|
||||
bool BlockTemplate::get_difficulties(const uint32_t template_id, uint64_t& height, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const
|
||||
{
|
||||
ReadLock lock(m_lock);
|
||||
|
||||
if (template_id == m_templateId) {
|
||||
height = m_height;
|
||||
mainchain_difficulty = m_difficulty;
|
||||
sidechain_difficulty = m_poolBlockTemplate->m_difficulty;
|
||||
return true;
|
||||
|
@ -923,7 +924,7 @@ bool BlockTemplate::get_difficulties(const uint32_t template_id, difficulty_type
|
|||
const BlockTemplate* old = m_oldTemplates[template_id % array_size(&BlockTemplate::m_oldTemplates)];
|
||||
|
||||
if (old && (template_id == old->m_templateId)) {
|
||||
return old->get_difficulties(template_id, mainchain_difficulty, sidechain_difficulty);
|
||||
return old->get_difficulties(template_id, height, mainchain_difficulty, sidechain_difficulty);
|
||||
}
|
||||
|
||||
return false;
|
||||
|
|
|
@ -40,7 +40,7 @@ public:
|
|||
|
||||
void update(const MinerData& data, const Mempool& mempool, Wallet* miner_wallet);
|
||||
|
||||
bool get_difficulties(const uint32_t template_id, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const;
|
||||
bool get_difficulties(const uint32_t template_id, uint64_t& height, difficulty_type& mainchain_difficulty, difficulty_type& sidechain_difficulty) const;
|
||||
uint32_t get_hashing_blob(const uint32_t template_id, uint32_t extra_nonce, uint8_t (&blob)[128], uint64_t& height, difficulty_type& difficulty, difficulty_type& sidechain_difficulty, hash& seed_hash, size_t& nonce_offset) const;
|
||||
|
||||
uint32_t get_hashing_blob(uint32_t extra_nonce, uint8_t (&blob)[128], uint64_t& height, difficulty_type& difficulty, difficulty_type& sidechain_difficulty, hash& seed_hash, size_t& nonce_offset, uint32_t& template_id) const;
|
||||
|
|
|
@ -161,21 +161,21 @@ static void do_loglevel(p2pool * /* m_pool */, const char *args)
|
|||
static void do_addpeers(p2pool *m_pool, const char *args)
|
||||
{
|
||||
if (m_pool->p2p_server()) {
|
||||
m_pool->p2p_server()->connect_to_peers(args);
|
||||
m_pool->p2p_server()->connect_to_peers_async(args);
|
||||
}
|
||||
}
|
||||
|
||||
static void do_droppeers(p2pool *m_pool, const char * /* args */)
|
||||
{
|
||||
if (m_pool->p2p_server()) {
|
||||
m_pool->p2p_server()->drop_connections();
|
||||
m_pool->p2p_server()->drop_connections_async();
|
||||
}
|
||||
}
|
||||
|
||||
static void do_showpeers(p2pool* m_pool, const char* /* args */)
|
||||
{
|
||||
if (m_pool->p2p_server()) {
|
||||
m_pool->p2p_server()->show_peers();
|
||||
m_pool->p2p_server()->show_peers_async();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ namespace JSONRPCRequest {
|
|||
|
||||
struct CurlContext
|
||||
{
|
||||
CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
|
||||
CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
|
||||
~CurlContext();
|
||||
|
||||
static int socket_func(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp)
|
||||
|
@ -73,7 +73,6 @@ struct CurlContext
|
|||
|
||||
std::string m_url;
|
||||
std::string m_req;
|
||||
std::string m_auth;
|
||||
|
||||
std::vector<char> m_response;
|
||||
std::string m_error;
|
||||
|
@ -81,7 +80,7 @@ struct CurlContext
|
|||
curl_slist* m_headers;
|
||||
};
|
||||
|
||||
CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
: m_callback(cb)
|
||||
, m_closeCallback(close_cb)
|
||||
, m_loop(loop)
|
||||
|
@ -90,7 +89,6 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
|
|||
, m_multiHandle(nullptr)
|
||||
, m_handle(nullptr)
|
||||
, m_req(req)
|
||||
, m_auth(auth)
|
||||
, m_headers(nullptr)
|
||||
{
|
||||
m_pollHandles.reserve(2);
|
||||
|
@ -176,19 +174,31 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string
|
|||
curl_easy_setopt_checked(m_handle, CURLOPT_WRITEFUNCTION, write_func);
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_WRITEDATA, this);
|
||||
|
||||
const int timeout = proxy.empty() ? 1 : 5;
|
||||
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_URL, m_url.c_str());
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_POSTFIELDS, m_req.c_str());
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_CONNECTTIMEOUT, 1);
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_TIMEOUT, 10);
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_CONNECTTIMEOUT, timeout);
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_TIMEOUT, timeout * 10);
|
||||
|
||||
m_headers = curl_slist_append(m_headers, "Content-Type: application/json");
|
||||
if (m_headers) {
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_HTTPHEADER, m_headers);
|
||||
}
|
||||
|
||||
if (!m_auth.empty()) {
|
||||
if (!auth.empty()) {
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST | CURLAUTH_ONLY);
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_USERPWD, m_auth.c_str());
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_USERPWD, auth.c_str());
|
||||
}
|
||||
|
||||
if (!proxy.empty()) {
|
||||
if (is_localhost(address)) {
|
||||
LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address);
|
||||
}
|
||||
else {
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5_HOSTNAME);
|
||||
curl_easy_setopt_checked(m_handle, CURLOPT_PROXY, proxy.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
CURLMcode curl_err = curl_multi_add_handle(m_multiHandle, m_handle);
|
||||
|
@ -443,7 +453,7 @@ void CurlContext::shutdown()
|
|||
}
|
||||
}
|
||||
|
||||
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
{
|
||||
if (!loop) {
|
||||
loop = uv_default_loop();
|
||||
|
@ -453,7 +463,7 @@ void Call(const std::string& address, int port, const std::string& req, const st
|
|||
[=]()
|
||||
{
|
||||
try {
|
||||
new CurlContext(address, port, req, auth, cb, close_cb, loop);
|
||||
new CurlContext(address, port, req, auth, proxy, cb, close_cb, loop);
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
const char* msg = e.what();
|
||||
|
|
|
@ -37,12 +37,12 @@ private:
|
|||
T m_cb;
|
||||
};
|
||||
|
||||
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
|
||||
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
|
||||
|
||||
template<typename T, typename U>
|
||||
FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr)
|
||||
FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr)
|
||||
{
|
||||
Call(address, port, req, auth, new Callback<T>(std::move(cb)), new Callback<U>(std::move(close_cb)), loop);
|
||||
Call(address, port, req, auth, proxy, new Callback<T>(std::move(cb)), new Callback<U>(std::move(close_cb)), loop);
|
||||
}
|
||||
|
||||
} // namespace JSONRPCRequest
|
||||
|
|
|
@ -48,6 +48,7 @@ void p2pool_usage()
|
|||
"--mini Connect to p2pool-mini sidechain. Note that it will also change default p2p port from %d to %d\n"
|
||||
"--no-autodiff Disable automatic difficulty adjustment for miners connected to stratum\n"
|
||||
"--rpc-login Specify username[:password] required for Monero RPC server\n"
|
||||
"--socks5 Specify IP:port of a SOCKS5 proxy to use for outgoing connections\n"
|
||||
"--help Show this help message\n\n"
|
||||
"Example command line:\n\n"
|
||||
"%s --host 127.0.0.1 --rpc-port 18081 --zmq-port 18083 --wallet YOUR_WALLET_ADDRESS --stratum 0.0.0.0:%d --p2p 0.0.0.0:%d\n\n",
|
||||
|
|
|
@ -68,6 +68,19 @@ P2PServer::P2PServer(p2pool* pool)
|
|||
|
||||
const Params& params = pool->params();
|
||||
|
||||
if (!params.m_socks5Proxy.empty()) {
|
||||
parse_address_list(params.m_socks5Proxy,
|
||||
[this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port)
|
||||
{
|
||||
if (!str_to_ip(is_v6, ip.c_str(), m_socks5ProxyIP)) {
|
||||
panic();
|
||||
}
|
||||
m_socks5ProxyV6 = is_v6;
|
||||
m_socks5ProxyPort = port;
|
||||
});
|
||||
m_socks5Proxy = params.m_socks5Proxy;
|
||||
}
|
||||
|
||||
set_max_outgoing_peers(params.m_maxOutgoingPeers);
|
||||
set_max_incoming_peers(params.m_maxIncomingPeers);
|
||||
|
||||
|
@ -77,6 +90,7 @@ P2PServer::P2PServer(p2pool* pool)
|
|||
uv_mutex_init_checked(&m_broadcastLock);
|
||||
uv_mutex_init_checked(&m_missingBlockRequestsLock);
|
||||
uv_rwlock_init_checked(&m_cachedBlocksLock);
|
||||
uv_mutex_init_checked(&m_connectToPeersLock);
|
||||
|
||||
int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast);
|
||||
if (err) {
|
||||
|
@ -86,6 +100,20 @@ P2PServer::P2PServer(p2pool* pool)
|
|||
m_broadcastAsync.data = this;
|
||||
m_broadcastQueue.reserve(2);
|
||||
|
||||
err = uv_async_init(&m_loop, &m_connectToPeersAsync, on_connect_to_peers);
|
||||
if (err) {
|
||||
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
|
||||
panic();
|
||||
}
|
||||
m_connectToPeersAsync.data = this;
|
||||
|
||||
err = uv_async_init(&m_loop, &m_showPeersAsync, on_show_peers);
|
||||
if (err) {
|
||||
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
|
||||
panic();
|
||||
}
|
||||
m_showPeersAsync.data = this;
|
||||
|
||||
err = uv_timer_init(&m_loop, &m_timer);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to create timer, error " << uv_err_name(err));
|
||||
|
@ -114,6 +142,8 @@ P2PServer::~P2PServer()
|
|||
uv_timer_stop(&m_timer);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync), nullptr);
|
||||
|
||||
shutdown_tcp();
|
||||
|
||||
|
@ -126,6 +156,8 @@ P2PServer::~P2PServer()
|
|||
clear_cached_blocks();
|
||||
uv_rwlock_destroy(&m_cachedBlocksLock);
|
||||
|
||||
uv_mutex_destroy(&m_connectToPeersLock);
|
||||
|
||||
delete m_block;
|
||||
delete m_cache;
|
||||
|
||||
|
@ -179,12 +211,42 @@ void P2PServer::store_in_cache(const PoolBlock& block)
|
|||
}
|
||||
}
|
||||
|
||||
void P2PServer::connect_to_peers_async(const char* peer_list)
|
||||
{
|
||||
{
|
||||
MutexLock lock(m_connectToPeersLock);
|
||||
if (!m_connectToPeersData.empty()) {
|
||||
m_connectToPeersData.append(1, ',');
|
||||
}
|
||||
m_connectToPeersData.append(peer_list);
|
||||
}
|
||||
|
||||
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync))) {
|
||||
uv_async_send(&m_connectToPeersAsync);
|
||||
}
|
||||
}
|
||||
|
||||
void P2PServer::on_connect_to_peers(uv_async_t* handle)
|
||||
{
|
||||
P2PServer* server = reinterpret_cast<P2PServer*>(handle->data);
|
||||
|
||||
std::string peer_list;
|
||||
{
|
||||
MutexLock lock(server->m_connectToPeersLock);
|
||||
peer_list = std::move(server->m_connectToPeersData);
|
||||
}
|
||||
|
||||
if (!peer_list.empty()) {
|
||||
server->connect_to_peers(peer_list);
|
||||
}
|
||||
}
|
||||
|
||||
void P2PServer::connect_to_peers(const std::string& peer_list)
|
||||
{
|
||||
parse_address_list(peer_list,
|
||||
[this](bool is_v6, const std::string& /*address*/, std::string ip, int port)
|
||||
{
|
||||
if (resolve_host(ip, is_v6)) {
|
||||
if (!m_socks5Proxy.empty() || resolve_host(ip, is_v6)) {
|
||||
connect_to_peer(is_v6, ip.c_str(), port);
|
||||
}
|
||||
});
|
||||
|
@ -219,7 +281,7 @@ void P2PServer::update_peer_connections()
|
|||
connected_clients.reserve(m_numConnections);
|
||||
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
|
||||
const int timeout = client->m_handshakeComplete ? 300 : 10;
|
||||
if (cur_time >= client->m_lastAlive + timeout) {
|
||||
if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) {
|
||||
const uint64_t idle_time = static_cast<uint64_t>(cur_time - client->m_lastAlive);
|
||||
LOGWARN(5, "peer " << static_cast<char*>(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting");
|
||||
client->close();
|
||||
|
@ -507,29 +569,10 @@ void P2PServer::load_peer_list()
|
|||
[this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port)
|
||||
{
|
||||
Peer p;
|
||||
if (is_v6) {
|
||||
sockaddr_in6 addr6;
|
||||
const int err = uv_ip6_addr(ip.c_str(), port, &addr6);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err));
|
||||
if (!str_to_ip(is_v6, ip.c_str(), p.m_addr)) {
|
||||
return;
|
||||
}
|
||||
p.m_isV6 = true;
|
||||
memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr));
|
||||
}
|
||||
else {
|
||||
sockaddr_in addr4;
|
||||
const int err = uv_ip4_addr(ip.c_str(), port, &addr4);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err));
|
||||
return;
|
||||
}
|
||||
p.m_isV6 = false;
|
||||
p.m_addr = {};
|
||||
p.m_addr.data[10] = 0xFF;
|
||||
p.m_addr.data[11] = 0xFF;
|
||||
memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr));
|
||||
}
|
||||
p.m_isV6 = is_v6;
|
||||
|
||||
bool already_added = false;
|
||||
for (const Peer& peer : m_peerList) {
|
||||
|
@ -555,7 +598,7 @@ void P2PServer::load_monerod_peer_list()
|
|||
{
|
||||
const Params& params = m_pool->params();
|
||||
|
||||
JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin,
|
||||
JSONRPCRequest::call(params.m_host, params.m_rpcPort, "/get_peer_list", params.m_rpcLogin, m_socks5Proxy,
|
||||
[this](const char* data, size_t size)
|
||||
{
|
||||
#define ERR_STR "/get_peer_list RPC request returned invalid JSON "
|
||||
|
@ -604,28 +647,11 @@ void P2PServer::load_monerod_peer_list()
|
|||
|
||||
Peer p;
|
||||
p.m_lastSeen = last_seen;
|
||||
p.m_isV6 = (strchr(ip, ':') != 0);
|
||||
|
||||
if (strchr(ip, ':')) {
|
||||
sockaddr_in6 addr6;
|
||||
const int err = uv_ip6_addr(ip, port, &addr6);
|
||||
if (err) {
|
||||
if (!str_to_ip(p.m_isV6, ip, p.m_addr)) {
|
||||
continue;
|
||||
}
|
||||
p.m_isV6 = true;
|
||||
memcpy(p.m_addr.data, &addr6.sin6_addr, sizeof(in6_addr));
|
||||
}
|
||||
else {
|
||||
sockaddr_in addr4;
|
||||
const int err = uv_ip4_addr(ip, port, &addr4);
|
||||
if (err) {
|
||||
continue;
|
||||
}
|
||||
p.m_isV6 = false;
|
||||
p.m_addr = {};
|
||||
p.m_addr.data[10] = 0xFF;
|
||||
p.m_addr.data[11] = 0xFF;
|
||||
memcpy(p.m_addr.data + 12, &addr4.sin_addr, sizeof(in_addr));
|
||||
}
|
||||
|
||||
p.m_port = port;
|
||||
p.m_numFailedConnections = 0;
|
||||
|
@ -635,8 +661,8 @@ void P2PServer::load_monerod_peer_list()
|
|||
}
|
||||
}
|
||||
|
||||
// Put recently active peers first in the list
|
||||
std::sort(m_peerListMonero.begin(), m_peerListMonero.end(), [](const Peer& a, const Peer& b) { return a.m_lastSeen > b.m_lastSeen; });
|
||||
// Put recently active peers last in the list (it will be scanned backwards)
|
||||
std::sort(m_peerListMonero.begin(), m_peerListMonero.end(), [](const Peer& a, const Peer& b) { return a.m_lastSeen < b.m_lastSeen; });
|
||||
|
||||
LOGINFO(4, "monerod peer list loaded (" << m_peerListMonero.size() << " peers)");
|
||||
},
|
||||
|
@ -793,7 +819,7 @@ void P2PServer::on_broadcast()
|
|||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
for (P2PClient* client = static_cast<P2PClient*>(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
|
||||
if (!client->m_handshakeComplete || !client->m_handshakeSolutionSent) {
|
||||
if (!client->is_good()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -875,6 +901,13 @@ void P2PServer::print_status()
|
|||
);
|
||||
}
|
||||
|
||||
void P2PServer::show_peers_async()
|
||||
{
|
||||
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync))) {
|
||||
uv_async_send(&m_showPeersAsync);
|
||||
}
|
||||
}
|
||||
|
||||
void P2PServer::show_peers()
|
||||
{
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
@ -1133,14 +1166,16 @@ bool P2PServer::P2PClient::on_connect()
|
|||
return false;
|
||||
}
|
||||
|
||||
// Don't allow multiple connections to/from the same IP
|
||||
// Don't allow multiple connections to/from the same IP (except localhost)
|
||||
// server->m_clientsListLock is already locked here
|
||||
if (!m_addr.is_localhost()) {
|
||||
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
|
||||
if ((client != this) && (client->m_addr == m_addr)) {
|
||||
LOGINFO(5, "peer " << static_cast<char*>(m_addrString) << " is already connected as " << static_cast<char*>(client->m_addrString));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
m_lastAlive = seconds_since_epoch();
|
||||
return send_handshake_challenge();
|
||||
|
@ -1891,7 +1926,7 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
|
|||
uint32_t n = 0;
|
||||
|
||||
for (P2PClient* client = static_cast<P2PClient*>(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast<P2PClient*>(client->m_next)) {
|
||||
if ((client->m_listenPort < 0) || (client->m_addr == m_addr)) {
|
||||
if (!client->is_good() || (client->m_addr == m_addr)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,6 +51,7 @@ public:
|
|||
void clear_cached_blocks();
|
||||
void store_in_cache(const PoolBlock& block);
|
||||
|
||||
void connect_to_peers_async(const char* peer_list);
|
||||
void connect_to_peers(const std::string& peer_list);
|
||||
void on_connect_failed(bool is_v6, const raw_ip& ip, int port) override;
|
||||
|
||||
|
@ -134,7 +135,7 @@ public:
|
|||
uint64_t get_peerId() const { return m_peerId; }
|
||||
|
||||
void print_status() override;
|
||||
void show_peers();
|
||||
void show_peers_async();
|
||||
size_t peer_list_size() const { MutexLock lock(m_peerListLock); return m_peerList.size(); }
|
||||
|
||||
uint32_t max_outgoing_peers() const { return m_maxOutgoingPeers; }
|
||||
|
@ -221,6 +222,17 @@ private:
|
|||
|
||||
static void on_broadcast(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->on_broadcast(); }
|
||||
void on_broadcast();
|
||||
|
||||
uv_mutex_t m_connectToPeersLock;
|
||||
uv_async_t m_connectToPeersAsync;
|
||||
std::string m_connectToPeersData;
|
||||
|
||||
static void on_connect_to_peers(uv_async_t* handle);
|
||||
|
||||
uv_async_t m_showPeersAsync;
|
||||
|
||||
static void on_show_peers(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->show_peers(); }
|
||||
void show_peers();
|
||||
};
|
||||
|
||||
} // namespace p2pool
|
||||
|
|
|
@ -65,7 +65,7 @@ p2pool::p2pool(int argc, char* argv[])
|
|||
}
|
||||
|
||||
bool is_v6;
|
||||
if (!resolve_host(m_params->m_host, is_v6)) {
|
||||
if (m_params->m_socks5Proxy.empty() && !resolve_host(m_params->m_host, is_v6)) {
|
||||
LOGERR(1, "resolve_host failed for " << m_params->m_host);
|
||||
throw std::exception();
|
||||
}
|
||||
|
@ -322,7 +322,7 @@ void p2pool::handle_miner_data(MinerData& data)
|
|||
log::Stream s(buf);
|
||||
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0";
|
||||
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[this, h](const char* data, size_t size)
|
||||
{
|
||||
ChainMain block;
|
||||
|
@ -457,6 +457,8 @@ void p2pool::submit_block_async(const std::vector<uint8_t>& blob)
|
|||
}
|
||||
}
|
||||
|
||||
bool init_signals(p2pool* pool, bool init);
|
||||
|
||||
void p2pool::on_stop(uv_async_t* async)
|
||||
{
|
||||
p2pool* pool = reinterpret_cast<p2pool*>(async->data);
|
||||
|
@ -470,9 +472,11 @@ void p2pool::on_stop(uv_async_t* async)
|
|||
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_stopAsync), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&pool->m_restartZMQAsync), nullptr);
|
||||
|
||||
init_signals(pool, false);
|
||||
|
||||
uv_loop_t* loop = uv_default_loop_checked();
|
||||
delete GetLoopUserData(loop, false);
|
||||
uv_stop(loop);
|
||||
loop->data = nullptr;
|
||||
}
|
||||
|
||||
void p2pool::submit_block() const
|
||||
|
@ -532,7 +536,7 @@ void p2pool::submit_block() const
|
|||
}
|
||||
request.append("\"]}");
|
||||
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, request, m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[height, diff, template_id, nonce, extra_nonce, is_external](const char* data, size_t size)
|
||||
{
|
||||
rapidjson::Document doc;
|
||||
|
@ -648,7 +652,7 @@ void p2pool::download_block_headers(uint64_t current_height)
|
|||
s.m_pos = 0;
|
||||
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0";
|
||||
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[this, prev_seed_height, height](const char* data, size_t size)
|
||||
{
|
||||
ChainMain block;
|
||||
|
@ -677,14 +681,14 @@ void p2pool::download_block_headers(uint64_t current_height)
|
|||
s.m_pos = 0;
|
||||
s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0";
|
||||
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, buf, m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[this, start_height, current_height](const char* data, size_t size)
|
||||
{
|
||||
if (parse_block_headers_range(data, size) == current_height - start_height) {
|
||||
update_median_timestamp();
|
||||
if (m_serversStarted.exchange(1) == 0) {
|
||||
try {
|
||||
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
|
||||
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
LOGERR(1, "Couldn't start ZMQ reader: exception " << e.what());
|
||||
|
@ -785,7 +789,7 @@ void p2pool::stratum_on_block()
|
|||
|
||||
void p2pool::get_info()
|
||||
{
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[this](const char* data, size_t size)
|
||||
{
|
||||
parse_get_info_rpc(data, size);
|
||||
|
@ -897,7 +901,7 @@ void p2pool::parse_get_info_rpc(const char* data, size_t size)
|
|||
|
||||
void p2pool::get_version()
|
||||
{
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[this](const char* data, size_t size)
|
||||
{
|
||||
parse_get_version_rpc(data, size);
|
||||
|
@ -967,7 +971,7 @@ void p2pool::get_miner_data()
|
|||
{
|
||||
m_getMinerDataPending = true;
|
||||
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin,
|
||||
JSONRPCRequest::call(m_params->m_host, m_params->m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", m_params->m_rpcLogin, m_params->m_socks5Proxy,
|
||||
[this](const char* data, size_t size)
|
||||
{
|
||||
parse_get_miner_data_rpc(data, size);
|
||||
|
@ -1427,7 +1431,7 @@ static bool init_uv_threadpool()
|
|||
return (uv_queue_work(uv_default_loop_checked(), &dummy, [](uv_work_t*) {}, nullptr) == 0);
|
||||
}
|
||||
|
||||
static bool init_signals(p2pool* pool)
|
||||
bool init_signals(p2pool* pool, bool init)
|
||||
{
|
||||
#ifdef SIGPIPE
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
@ -1447,6 +1451,14 @@ static bool init_signals(p2pool* pool)
|
|||
|
||||
static uv_signal_t signals[array_size(signal_names)];
|
||||
|
||||
if (!init) {
|
||||
for (size_t i = 0; i < array_size(signals); ++i) {
|
||||
uv_signal_stop(&signals[i]);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&signals[i]), nullptr);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < array_size(signal_names); ++i) {
|
||||
uv_signal_init(uv_default_loop_checked(), &signals[i]);
|
||||
signals[i].data = pool;
|
||||
|
@ -1488,7 +1500,7 @@ void p2pool::restart_zmq()
|
|||
m_ZMQReader = nullptr;
|
||||
|
||||
try {
|
||||
m_ZMQReader = new ZMQReader(m_params->m_host.c_str(), m_params->m_zmqPort, this);
|
||||
m_ZMQReader = new ZMQReader(m_params->m_host, m_params->m_zmqPort, m_params->m_socks5Proxy, this);
|
||||
m_zmqLastActive = seconds_since_epoch();
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
|
@ -1498,7 +1510,7 @@ void p2pool::restart_zmq()
|
|||
|
||||
int p2pool::run()
|
||||
{
|
||||
if (!m_params->ok()) {
|
||||
if (!m_params->valid()) {
|
||||
LOGERR(1, "Invalid or missing command line. Try \"p2pool --help\".");
|
||||
return 1;
|
||||
}
|
||||
|
@ -1508,7 +1520,7 @@ int p2pool::run()
|
|||
return 1;
|
||||
}
|
||||
|
||||
if (!init_signals(this)) {
|
||||
if (!init_signals(this, true)) {
|
||||
LOGERR(1, "failed to initialize signal handlers");
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -142,7 +142,7 @@ private:
|
|||
void get_miner_data();
|
||||
void parse_get_miner_data_rpc(const char* data, size_t size);
|
||||
|
||||
bool parse_block_header(const char* data, size_t size, ChainMain& result);
|
||||
bool parse_block_header(const char* data, size_t size, ChainMain& c);
|
||||
uint32_t parse_block_headers_range(const char* data, size_t size);
|
||||
|
||||
void api_update_network_stats();
|
||||
|
|
|
@ -135,6 +135,11 @@ Params::Params(int argc, char* argv[])
|
|||
ok = true;
|
||||
}
|
||||
|
||||
if ((strcmp(argv[i], "--socks5") == 0) && (i + 1 < argc)) {
|
||||
m_socks5Proxy = argv[++i];
|
||||
ok = true;
|
||||
}
|
||||
|
||||
if (!ok) {
|
||||
fprintf(stderr, "Unknown command line parameter %s\n\n", argv[i]);
|
||||
p2pool_usage();
|
||||
|
@ -153,7 +158,7 @@ Params::Params(int argc, char* argv[])
|
|||
}
|
||||
}
|
||||
|
||||
bool Params::ok() const
|
||||
bool Params::valid() const
|
||||
{
|
||||
return !m_host.empty() && m_rpcPort && m_zmqPort && m_wallet.valid();
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ struct Params
|
|||
{
|
||||
Params(int argc, char* argv[]);
|
||||
|
||||
bool ok() const;
|
||||
bool valid() const;
|
||||
|
||||
std::string m_host = "127.0.0.1";
|
||||
uint32_t m_rpcPort = 18081;
|
||||
|
@ -50,6 +50,7 @@ struct Params
|
|||
bool m_mini = false;
|
||||
bool m_autoDiff = true;
|
||||
std::string m_rpcLogin;
|
||||
std::string m_socks5Proxy;
|
||||
};
|
||||
|
||||
} // namespace p2pool
|
||||
|
|
|
@ -438,7 +438,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h
|
|||
|
||||
const Params& params = m_pool->params();
|
||||
|
||||
JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin,
|
||||
JSONRPCRequest::call(params.m_host, params.m_rpcPort, buf, params.m_rpcLogin, params.m_socks5Proxy,
|
||||
[&result, &h](const char* data, size_t size)
|
||||
{
|
||||
rapidjson::Document doc;
|
||||
|
|
|
@ -100,7 +100,7 @@ public:
|
|||
explicit RandomX_Hasher_RPC(p2pool* pool);
|
||||
~RandomX_Hasher_RPC();
|
||||
|
||||
bool calculate(const void* data, size_t size, uint64_t height, const hash& seed, hash& result) override;
|
||||
bool calculate(const void* data_ptr, size_t size, uint64_t height, const hash& seed, hash& h) override;
|
||||
|
||||
private:
|
||||
static void loop(void* data);
|
||||
|
|
|
@ -361,9 +361,10 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
|
|||
|
||||
if (found) {
|
||||
BlockTemplate& block = m_pool->block_template();
|
||||
uint64_t height;
|
||||
difficulty_type mainchain_diff, sidechain_diff;
|
||||
|
||||
if (!block.get_difficulties(template_id, mainchain_diff, sidechain_diff)) {
|
||||
if (!block.get_difficulties(template_id, height, mainchain_diff, sidechain_diff)) {
|
||||
LOGWARN(4, "client " << static_cast<char*>(client->m_addrString) << " got a stale share");
|
||||
return send(client,
|
||||
[id](void* buf, size_t buf_size)
|
||||
|
@ -408,6 +409,8 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo
|
|||
share->m_target = target;
|
||||
share->m_resultHash = resultHash;
|
||||
share->m_sidechainDifficulty = sidechain_diff;
|
||||
share->m_mainchainHeight = height;
|
||||
share->m_effort = -1.0;
|
||||
share->m_timestamp = seconds_since_epoch();
|
||||
|
||||
uint64_t rem;
|
||||
|
@ -852,14 +855,12 @@ void StratumServer::on_share_found(uv_work_t* req)
|
|||
|
||||
const uint64_t n = server->m_cumulativeHashes + hashes;
|
||||
const double diff = sidechain_difficulty.to_double();
|
||||
const double effort = static_cast<double>(n - server->m_cumulativeHashesAtLastShare) * 100.0 / diff;
|
||||
share->m_effort = static_cast<double>(n - server->m_cumulativeHashesAtLastShare) * 100.0 / diff;
|
||||
server->m_cumulativeHashesAtLastShare = n;
|
||||
|
||||
server->m_cumulativeFoundSharesDiff += diff;
|
||||
++server->m_totalFoundShares;
|
||||
|
||||
const char* s = client->m_customUser;
|
||||
LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << height << ", diff " << sidechain_difficulty << ", client " << static_cast<char*>(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << effort << '%');
|
||||
pool->submit_sidechain_block(share->m_templateId, share->m_nonce, share->m_extraNonce);
|
||||
}
|
||||
|
||||
|
@ -882,13 +883,16 @@ void StratumServer::on_share_found(uv_work_t* req)
|
|||
void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
|
||||
{
|
||||
SubmittedShare* share = reinterpret_cast<SubmittedShare*>(req->data);
|
||||
StratumClient* client = share->m_client;
|
||||
|
||||
if (share->m_highEnoughDifficulty) {
|
||||
const char* s = client->m_customUser;
|
||||
LOGINFO(0, log::Green() << "SHARE FOUND: mainchain height " << share->m_mainchainHeight << ", diff " << share->m_sidechainDifficulty << ", client " << static_cast<char*>(client->m_addrString) << (*s ? " user " : "") << s << ", effort " << share->m_effort << '%');
|
||||
bkg_jobs_tracker.stop("StratumServer::on_share_found");
|
||||
}
|
||||
|
||||
ON_SCOPE_LEAVE([share]() { share->m_server->m_submittedSharesPool.push_back(share); });
|
||||
|
||||
StratumClient* client = share->m_client;
|
||||
StratumServer* server = share->m_server;
|
||||
|
||||
const bool bad_share = (share->m_result == SubmittedShare::Result::LOW_DIFF) || (share->m_result == SubmittedShare::Result::INVALID_POW);
|
||||
|
|
|
@ -139,6 +139,8 @@ private:
|
|||
uint64_t m_target;
|
||||
hash m_resultHash;
|
||||
difficulty_type m_sidechainDifficulty;
|
||||
uint64_t m_mainchainHeight;
|
||||
double m_effort;
|
||||
uint64_t m_timestamp;
|
||||
uint64_t m_hashes;
|
||||
bool m_highEnoughDifficulty;
|
||||
|
|
|
@ -36,7 +36,7 @@ public:
|
|||
|
||||
bool connect_to_peer(bool is_v6, const char* ip, int port);
|
||||
|
||||
void drop_connections() { uv_async_send(&m_dropConnectionsAsync); }
|
||||
void drop_connections_async() { uv_async_send(&m_dropConnectionsAsync); }
|
||||
void shutdown_tcp();
|
||||
virtual void print_status();
|
||||
|
||||
|
@ -45,7 +45,7 @@ public:
|
|||
int listen_port() const { return m_listenPort; }
|
||||
|
||||
bool connect_to_peer(bool is_v6, const raw_ip& ip, int port);
|
||||
virtual void on_connect_failed(bool is_v6, const raw_ip& ip, int port);
|
||||
virtual void on_connect_failed(bool /*is_v6*/, const raw_ip& /*ip*/, int /*port*/) {}
|
||||
|
||||
void ban(const raw_ip& ip, uint64_t seconds);
|
||||
virtual void print_bans();
|
||||
|
@ -58,6 +58,7 @@ public:
|
|||
virtual void reset();
|
||||
virtual bool on_connect() = 0;
|
||||
virtual bool on_read(char* data, uint32_t size) = 0;
|
||||
bool on_proxy_handshake(char* data, uint32_t size);
|
||||
virtual void on_read_failed(int /*err*/) {}
|
||||
virtual void on_disconnected() {}
|
||||
|
||||
|
@ -68,7 +69,7 @@ public:
|
|||
void close();
|
||||
void ban(uint64_t seconds);
|
||||
|
||||
void init_addr_string(bool is_v6, const sockaddr_storage* peer_addr);
|
||||
void init_addr_string();
|
||||
|
||||
alignas(8) char m_readBuf[READ_BUF_SIZE];
|
||||
|
||||
|
@ -88,7 +89,13 @@ public:
|
|||
|
||||
raw_ip m_addr;
|
||||
int m_port;
|
||||
char m_addrString[64];
|
||||
char m_addrString[72];
|
||||
|
||||
enum class Socks5ProxyState {
|
||||
Default,
|
||||
MethodSelectionSent,
|
||||
ConnectRequestSent,
|
||||
} m_socks5ProxyState;
|
||||
|
||||
std::atomic<uint32_t> m_resetCounter;
|
||||
};
|
||||
|
@ -100,7 +107,6 @@ public:
|
|||
std::vector<uint8_t> m_data;
|
||||
};
|
||||
|
||||
uv_mutex_t m_writeBuffersLock;
|
||||
std::vector<WriteBuf*> m_writeBuffers;
|
||||
|
||||
struct SendCallbackBase
|
||||
|
@ -131,9 +137,9 @@ private:
|
|||
static void on_connection_error(uv_handle_t* handle);
|
||||
static void on_connect(uv_connect_t* req, int status);
|
||||
void on_new_client(uv_stream_t* server);
|
||||
void on_new_client_nolock(uv_stream_t* server, Client* client);
|
||||
void on_new_client(uv_stream_t* server, Client* client);
|
||||
|
||||
bool connect_to_peer_nolock(Client* client, bool is_v6, const sockaddr* addr);
|
||||
bool connect_to_peer(Client* client);
|
||||
|
||||
bool send_internal(Client* client, SendCallbackBase&& callback);
|
||||
|
||||
|
@ -148,6 +154,11 @@ private:
|
|||
protected:
|
||||
void start_listening(const std::string& listen_addresses);
|
||||
|
||||
std::string m_socks5Proxy;
|
||||
bool m_socks5ProxyV6;
|
||||
raw_ip m_socks5ProxyIP;
|
||||
int m_socks5ProxyPort;
|
||||
|
||||
std::atomic<int> m_finished;
|
||||
int m_listenPort;
|
||||
|
||||
|
@ -165,7 +176,6 @@ protected:
|
|||
|
||||
bool is_banned(const raw_ip& ip);
|
||||
|
||||
uv_mutex_t m_pendingConnectionsLock;
|
||||
unordered_set<raw_ip> m_pendingConnections;
|
||||
|
||||
uv_async_t m_dropConnectionsAsync;
|
||||
|
|
|
@ -25,6 +25,9 @@ template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
|||
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback allocate_new_client)
|
||||
: m_allocateNewClient(allocate_new_client)
|
||||
, m_loopThread{}
|
||||
, m_socks5ProxyV6(false)
|
||||
, m_socks5ProxyIP{}
|
||||
, m_socks5ProxyPort(-1)
|
||||
, m_finished(0)
|
||||
, m_listenPort(-1)
|
||||
, m_loop{}
|
||||
|
@ -57,18 +60,6 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
|
|||
|
||||
uv_mutex_init_checked(&m_clientsListLock);
|
||||
uv_mutex_init_checked(&m_bansLock);
|
||||
uv_mutex_init_checked(&m_pendingConnectionsLock);
|
||||
uv_mutex_init_checked(&m_writeBuffersLock);
|
||||
|
||||
m_writeBuffers.resize(DEFAULT_BACKLOG);
|
||||
for (size_t i = 0; i < m_writeBuffers.size(); ++i) {
|
||||
m_writeBuffers[i] = new WriteBuf();
|
||||
}
|
||||
|
||||
m_preallocatedClients.reserve(DEFAULT_BACKLOG);
|
||||
for (int i = 0; i < DEFAULT_BACKLOG; ++i) {
|
||||
m_preallocatedClients.emplace_back(m_allocateNewClient());
|
||||
}
|
||||
|
||||
m_connectedClientsList = m_allocateNewClient();
|
||||
m_connectedClientsList->m_next = m_connectedClientsList;
|
||||
|
@ -228,8 +219,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
|
|||
return false;
|
||||
}
|
||||
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
if (m_finished.load()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -247,48 +236,27 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
|
|||
|
||||
client->m_owner = this;
|
||||
client->m_port = port;
|
||||
client->m_isV6 = is_v6;
|
||||
|
||||
log::Stream s(client->m_addrString);
|
||||
|
||||
sockaddr_storage addr;
|
||||
if (is_v6) {
|
||||
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
|
||||
const int err = uv_ip6_addr(ip, port, addr6);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err));
|
||||
if (!str_to_ip(is_v6, ip, client->m_addr)) {
|
||||
m_preallocatedClients.push_back(client);
|
||||
return false;
|
||||
}
|
||||
|
||||
memcpy(client->m_addr.data, &addr6->sin6_addr, sizeof(in6_addr));
|
||||
|
||||
log::Stream s(client->m_addrString);
|
||||
if (is_v6) {
|
||||
s << '[' << ip << "]:" << port << '\0';
|
||||
}
|
||||
else {
|
||||
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
|
||||
const int err = uv_ip4_addr(ip, port, addr4);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err));
|
||||
m_preallocatedClients.push_back(client);
|
||||
return false;
|
||||
}
|
||||
|
||||
client->m_addr = {};
|
||||
client->m_addr.data[10] = 0xFF;
|
||||
client->m_addr.data[11] = 0xFF;
|
||||
memcpy(client->m_addr.data + 12, &addr4->sin_addr, sizeof(in_addr));
|
||||
|
||||
s << ip << ':' << port << '\0';
|
||||
}
|
||||
|
||||
return connect_to_peer_nolock(client, is_v6, reinterpret_cast<sockaddr*>(&addr));
|
||||
return connect_to_peer(client);
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const raw_ip& ip, int port)
|
||||
{
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
if (m_finished.load()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -307,29 +275,10 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
|
|||
client->m_owner = this;
|
||||
client->m_addr = ip;
|
||||
client->m_port = port;
|
||||
client->m_isV6 = is_v6;
|
||||
client->init_addr_string();
|
||||
|
||||
sockaddr_storage addr{};
|
||||
|
||||
if (is_v6) {
|
||||
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
|
||||
addr6->sin6_family = AF_INET6;
|
||||
memcpy(&addr6->sin6_addr, ip.data, sizeof(in6_addr));
|
||||
addr6->sin6_port = htons(static_cast<uint16_t>(port));
|
||||
}
|
||||
else {
|
||||
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
|
||||
addr4->sin_family = AF_INET;
|
||||
memcpy(&addr4->sin_addr, ip.data + 12, sizeof(in_addr));
|
||||
addr4->sin_port = htons(static_cast<uint16_t>(port));
|
||||
}
|
||||
|
||||
client->init_addr_string(is_v6, &addr);
|
||||
return connect_to_peer_nolock(client, is_v6, reinterpret_cast<sockaddr*>(&addr));
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect_failed(bool, const raw_ip&, int)
|
||||
{
|
||||
return connect_to_peer(client);
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
|
@ -356,7 +305,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::is_banned(const raw_ip& ip)
|
|||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* client, bool is_v6, const sockaddr* addr)
|
||||
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
|
||||
{
|
||||
if (is_banned(client->m_addr)) {
|
||||
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " is banned, not connecting to it");
|
||||
|
@ -364,8 +313,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
|
|||
return false;
|
||||
}
|
||||
|
||||
client->m_isV6 = is_v6;
|
||||
|
||||
int err = uv_tcp_init(&m_loop, &client->m_socket);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err));
|
||||
|
@ -381,8 +328,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
|
|||
return false;
|
||||
}
|
||||
|
||||
MutexLock lock(m_pendingConnectionsLock);
|
||||
|
||||
if (!m_pendingConnections.insert(client->m_addr).second) {
|
||||
LOGINFO(6, "there is already a pending connection to this IP, not connecting to " << log::Gray() << static_cast<char*>(client->m_addrString));
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&client->m_socket), on_connection_error);
|
||||
|
@ -391,9 +336,40 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
|
|||
|
||||
uv_connect_t* connect_request = reinterpret_cast<uv_connect_t*>(client->m_readBuf);
|
||||
memset(connect_request, 0, sizeof(uv_connect_t));
|
||||
|
||||
connect_request->data = client;
|
||||
err = uv_tcp_connect(connect_request, &client->m_socket, addr, on_connect);
|
||||
|
||||
sockaddr_storage addr{};
|
||||
|
||||
if (m_socks5Proxy.empty()) {
|
||||
if (client->m_isV6) {
|
||||
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
|
||||
addr6->sin6_family = AF_INET6;
|
||||
memcpy(&addr6->sin6_addr, client->m_addr.data, sizeof(in6_addr));
|
||||
addr6->sin6_port = htons(static_cast<uint16_t>(client->m_port));
|
||||
}
|
||||
else {
|
||||
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
|
||||
addr4->sin_family = AF_INET;
|
||||
memcpy(&addr4->sin_addr, client->m_addr.data + 12, sizeof(in_addr));
|
||||
addr4->sin_port = htons(static_cast<uint16_t>(client->m_port));
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (m_socks5ProxyV6) {
|
||||
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
|
||||
addr6->sin6_family = AF_INET6;
|
||||
memcpy(&addr6->sin6_addr, m_socks5ProxyIP.data, sizeof(in6_addr));
|
||||
addr6->sin6_port = htons(static_cast<uint16_t>(m_socks5ProxyPort));
|
||||
}
|
||||
else {
|
||||
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
|
||||
addr4->sin_family = AF_INET;
|
||||
memcpy(&addr4->sin_addr, m_socks5ProxyIP.data + 12, sizeof(in_addr));
|
||||
addr4->sin_port = htons(static_cast<uint16_t>(m_socks5ProxyPort));
|
||||
}
|
||||
}
|
||||
|
||||
err = uv_tcp_connect(connect_request, &client->m_socket, reinterpret_cast<sockaddr*>(&addr), on_connect);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to initiate tcp connection to " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
|
||||
m_pendingConnections.erase(client->m_addr);
|
||||
|
@ -429,9 +405,9 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets
|
|||
}
|
||||
}
|
||||
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
size_t numClosed = 0;
|
||||
{
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) {
|
||||
uv_handle_t* h = reinterpret_cast<uv_handle_t*>(&c->m_socket);
|
||||
|
@ -440,6 +416,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets
|
|||
++numClosed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (numClosed > 0) {
|
||||
LOGWARN(1, "closed " << numClosed << " active client connections");
|
||||
|
@ -485,21 +462,8 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
|
|||
|
||||
uv_thread_join(&m_loopThread);
|
||||
|
||||
for (Client* c : m_preallocatedClients) {
|
||||
delete c;
|
||||
}
|
||||
|
||||
uv_mutex_destroy(&m_clientsListLock);
|
||||
uv_mutex_destroy(&m_bansLock);
|
||||
uv_mutex_destroy(&m_pendingConnectionsLock);
|
||||
|
||||
{
|
||||
MutexLock lock(m_writeBuffersLock);
|
||||
for (WriteBuf* buf : m_writeBuffers) {
|
||||
delete buf;
|
||||
}
|
||||
}
|
||||
uv_mutex_destroy(&m_writeBuffersLock);
|
||||
|
||||
LOGINFO(1, "stopped");
|
||||
}
|
||||
|
@ -561,17 +525,13 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
|
|||
return true;
|
||||
}
|
||||
|
||||
WriteBuf* buf = nullptr;
|
||||
WriteBuf* buf;
|
||||
|
||||
{
|
||||
MutexLock lock(m_writeBuffersLock);
|
||||
if (!m_writeBuffers.empty()) {
|
||||
buf = m_writeBuffers.back();
|
||||
m_writeBuffers.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
if (!buf) {
|
||||
else {
|
||||
buf = new WriteBuf();
|
||||
}
|
||||
|
||||
|
@ -586,10 +546,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
|
|||
|
||||
if (bytes_written == 0) {
|
||||
LOGWARN(1, "send callback wrote 0 bytes, nothing to do");
|
||||
{
|
||||
MutexLock lock(m_writeBuffersLock);
|
||||
m_writeBuffers.push_back(buf);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -604,11 +561,8 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
|
|||
|
||||
const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write);
|
||||
if (err) {
|
||||
{
|
||||
MutexLock lock(m_writeBuffersLock);
|
||||
m_writeBuffers.push_back(buf);
|
||||
}
|
||||
LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
|
||||
m_writeBuffers.push_back(buf);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -621,8 +575,27 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
|
|||
LOGINFO(1, "event loop started");
|
||||
server_event_loop_thread = true;
|
||||
TCPServer* server = static_cast<TCPServer*>(data);
|
||||
|
||||
server->m_writeBuffers.resize(DEFAULT_BACKLOG);
|
||||
server->m_preallocatedClients.reserve(DEFAULT_BACKLOG);
|
||||
for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) {
|
||||
server->m_writeBuffers[i] = new WriteBuf();
|
||||
server->m_preallocatedClients.emplace_back(server->m_allocateNewClient());
|
||||
}
|
||||
|
||||
uv_run(&server->m_loop, UV_RUN_DEFAULT);
|
||||
uv_loop_close(&server->m_loop);
|
||||
|
||||
for (WriteBuf* buf : server->m_writeBuffers) {
|
||||
delete buf;
|
||||
}
|
||||
server->m_writeBuffers.clear();
|
||||
|
||||
for (Client* c : server->m_preallocatedClients) {
|
||||
delete c;
|
||||
}
|
||||
server->m_preallocatedClients.clear();
|
||||
|
||||
LOGINFO(1, "event loop stopped");
|
||||
server->m_loopStopped = true;
|
||||
}
|
||||
|
@ -647,10 +620,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_connection(uv_stream_t* se
|
|||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t* handle)
|
||||
{
|
||||
if (!server_event_loop_thread) {
|
||||
LOGERR(1, "on_connection_close called from another thread, this is not thread safe");
|
||||
}
|
||||
|
||||
Client* client = static_cast<Client*>(handle->data);
|
||||
TCPServer* owner = client->m_owner;
|
||||
|
||||
|
@ -685,10 +654,7 @@ template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
|||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_error(uv_handle_t* handle)
|
||||
{
|
||||
Client* client = reinterpret_cast<Client*>(handle->data);
|
||||
TCPServer* server = client->m_owner;
|
||||
|
||||
MutexLock lock(server->m_clientsListLock);
|
||||
server->m_preallocatedClients.push_back(client);
|
||||
client->m_owner->m_preallocatedClients.push_back(client);
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
|
@ -701,12 +667,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect(uv_connect_t* req, int
|
|||
return;
|
||||
}
|
||||
|
||||
{
|
||||
MutexLock lock(server->m_pendingConnectionsLock);
|
||||
server->m_pendingConnections.erase(client->m_addr);
|
||||
}
|
||||
|
||||
MutexLock lock(server->m_clientsListLock);
|
||||
|
||||
if (status) {
|
||||
if (status == UV_ETIMEDOUT) {
|
||||
|
@ -720,14 +681,12 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connect(uv_connect_t* req, int
|
|||
return;
|
||||
}
|
||||
|
||||
server->on_new_client_nolock(nullptr, client);
|
||||
server->on_new_client(nullptr, client);
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server)
|
||||
{
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
if (m_finished.load()) {
|
||||
return;
|
||||
}
|
||||
|
@ -766,20 +725,26 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
|
|||
return;
|
||||
}
|
||||
|
||||
on_new_client_nolock(server, client);
|
||||
on_new_client(server, client);
|
||||
}
|
||||
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client_nolock(uv_stream_t* server, Client* client)
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server, Client* client)
|
||||
{
|
||||
MutexLock lock(m_clientsListLock);
|
||||
|
||||
client->m_prev = m_connectedClientsList;
|
||||
client->m_next = m_connectedClientsList->m_next;
|
||||
m_connectedClientsList->m_next->m_prev = client;
|
||||
m_connectedClientsList->m_next = client;
|
||||
|
||||
++m_numConnections;
|
||||
client->m_isIncoming = false;
|
||||
|
||||
client->m_isIncoming = (server != nullptr);
|
||||
|
||||
if (client->m_isIncoming) {
|
||||
client->m_isV6 = (std::find(m_listenSockets6.begin(), m_listenSockets6.end(), reinterpret_cast<uv_tcp_t*>(server)) != m_listenSockets6.end());
|
||||
|
||||
sockaddr_storage peer_addr;
|
||||
int peer_addr_len = static_cast<int>(sizeof(peer_addr));
|
||||
|
@ -790,16 +755,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client_nolock(uv_stream_t*
|
|||
return;
|
||||
}
|
||||
|
||||
bool is_v6;
|
||||
if (server) {
|
||||
is_v6 = (std::find(m_listenSockets6.begin(), m_listenSockets6.end(), reinterpret_cast<uv_tcp_t*>(server)) != m_listenSockets6.end());
|
||||
client->m_isV6 = is_v6;
|
||||
}
|
||||
else {
|
||||
is_v6 = client->m_isV6;
|
||||
}
|
||||
|
||||
if (is_v6) {
|
||||
if (client->m_isV6) {
|
||||
memcpy(client->m_addr.data, &reinterpret_cast<sockaddr_in6*>(&peer_addr)->sin6_addr, sizeof(in6_addr));
|
||||
client->m_port = ntohs(reinterpret_cast<sockaddr_in6*>(&peer_addr)->sin6_port);
|
||||
}
|
||||
|
@ -811,17 +767,11 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client_nolock(uv_stream_t*
|
|||
client->m_port = ntohs(reinterpret_cast<sockaddr_in*>(&peer_addr)->sin_port);
|
||||
}
|
||||
|
||||
client->init_addr_string(is_v6, &peer_addr);
|
||||
|
||||
if (server) {
|
||||
LOGINFO(5, "new connection from " << log::Gray() << static_cast<char*>(client->m_addrString));
|
||||
client->m_isIncoming = true;
|
||||
client->init_addr_string();
|
||||
++m_numIncomingConnections;
|
||||
}
|
||||
else {
|
||||
LOGINFO(5, "new connection to " << log::Gray() << static_cast<char*>(client->m_addrString));
|
||||
client->m_isIncoming = false;
|
||||
}
|
||||
|
||||
LOGINFO(5, "new connection " << (client->m_isIncoming ? "from " : "to ") << log::Gray() << static_cast<char*>(client->m_addrString));
|
||||
|
||||
if (is_banned(client->m_addr)) {
|
||||
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " is banned, disconnecting");
|
||||
|
@ -829,12 +779,44 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client_nolock(uv_stream_t*
|
|||
return;
|
||||
}
|
||||
|
||||
if (client->m_owner->m_finished.load() || !client->on_connect()) {
|
||||
TCPServer* owner = client->m_owner;
|
||||
|
||||
if (owner->m_finished.load()) {
|
||||
client->close();
|
||||
return;
|
||||
}
|
||||
|
||||
err = uv_read_start(reinterpret_cast<uv_stream_t*>(&client->m_socket), Client::on_alloc, Client::on_read);
|
||||
if (owner->m_socks5Proxy.empty()) {
|
||||
if (!client->on_connect()) {
|
||||
client->close();
|
||||
return;
|
||||
}
|
||||
}
|
||||
else {
|
||||
const bool result = owner->send(client,
|
||||
[](void* buf, size_t buf_size) -> size_t
|
||||
{
|
||||
if (buf_size < 3) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* p = reinterpret_cast<uint8_t*>(buf);
|
||||
p[0] = 5; // Protocol version (SOCKS5)
|
||||
p[1] = 1; // NMETHODS
|
||||
p[2] = 0; // Method 0 (no authentication)
|
||||
|
||||
return 3;
|
||||
});
|
||||
|
||||
if (result) {
|
||||
client->m_socks5ProxyState = Client::Socks5ProxyState::MethodSelectionSent;
|
||||
}
|
||||
else {
|
||||
client->close();
|
||||
}
|
||||
}
|
||||
|
||||
const int err = uv_read_start(reinterpret_cast<uv_stream_t*>(&client->m_socket), Client::on_alloc, Client::on_read);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to start reading from client connection, error " << uv_err_name(err));
|
||||
client->close();
|
||||
|
@ -855,6 +837,7 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
|
|||
, m_addr{}
|
||||
, m_port(0)
|
||||
, m_addrString{}
|
||||
, m_socks5ProxyState(Socks5ProxyState::Default)
|
||||
, m_resetCounter{ 0 }
|
||||
{
|
||||
m_readBuf[0] = '\0';
|
||||
|
@ -878,6 +861,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::reset()
|
|||
m_addr = {};
|
||||
m_port = -1;
|
||||
m_addrString[0] = '\0';
|
||||
m_socks5ProxyState = Socks5ProxyState::Default;
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
|
@ -907,34 +891,158 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_alloc(uv_handle_t* han
|
|||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
|
||||
{
|
||||
Client* pThis = static_cast<Client*>(stream->data);
|
||||
pThis->m_readBufInUse = false;
|
||||
Client* client = static_cast<Client*>(stream->data);
|
||||
client->m_readBufInUse = false;
|
||||
|
||||
if (pThis->m_isClosing) {
|
||||
LOGWARN(5, "client " << static_cast<const char*>(pThis->m_addrString) << " is being disconnected but data received from it, nread = " << nread << ". Ignoring it.");
|
||||
if (client->m_isClosing) {
|
||||
LOGWARN(5, "client " << static_cast<const char*>(client->m_addrString) << " is being disconnected but data received from it, nread = " << nread << ". Ignoring it.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread > 0) {
|
||||
if (pThis->m_owner && !pThis->m_owner->m_finished.load()) {
|
||||
if (!pThis->on_read(buf->base, static_cast<uint32_t>(nread))) {
|
||||
pThis->close();
|
||||
if (client->m_owner && !client->m_owner->m_finished.load()) {
|
||||
if (client->m_socks5ProxyState == Socks5ProxyState::Default) {
|
||||
if (!client->on_read(buf->base, static_cast<uint32_t>(nread))) {
|
||||
client->close();
|
||||
}
|
||||
}
|
||||
else if (!client->on_proxy_handshake(buf->base, static_cast<uint32_t>(nread))) {
|
||||
client->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (nread < 0) {
|
||||
if (nread != UV_EOF) {
|
||||
const int err = static_cast<int>(nread);
|
||||
LOGWARN(5, "client " << static_cast<const char*>(pThis->m_addrString) << " failed to read response, err = " << uv_err_name(err));
|
||||
pThis->on_read_failed(err);
|
||||
LOGWARN(5, "client " << static_cast<const char*>(client->m_addrString) << " failed to read response, err = " << uv_err_name(err));
|
||||
client->on_read_failed(err);
|
||||
}
|
||||
else {
|
||||
pThis->on_disconnected();
|
||||
client->on_disconnected();
|
||||
}
|
||||
pThis->close();
|
||||
client->close();
|
||||
}
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_proxy_handshake(char* data, uint32_t size)
|
||||
{
|
||||
if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) {
|
||||
LOGERR(1, "peer " << static_cast<char*>(m_addrString) << " invalid data pointer or size in on_read()");
|
||||
return false;
|
||||
}
|
||||
m_numRead += size;
|
||||
|
||||
uint32_t n = 0;
|
||||
|
||||
switch (m_socks5ProxyState) {
|
||||
case Socks5ProxyState::MethodSelectionSent:
|
||||
if (m_numRead >= 2) {
|
||||
if ((m_readBuf[0] != 5) && (m_readBuf[1] != 0)) {
|
||||
LOGWARN(5, "SOCKS5 proxy returned an invalid METHOD selection message");
|
||||
return false;
|
||||
}
|
||||
n = 2;
|
||||
|
||||
const bool result = m_owner->send(this,
|
||||
[this](void* buf, size_t buf_size) -> size_t
|
||||
{
|
||||
if (buf_size < 20) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint8_t* p = reinterpret_cast<uint8_t*>(buf);
|
||||
p[0] = 5; // Protocol version (SOCKS5)
|
||||
p[1] = 1; // CONNECT
|
||||
p[2] = 0; // RESERVED
|
||||
if (m_isV6) {
|
||||
p[3] = 4; // ATYP
|
||||
memcpy(p + 4, m_addr.data, 16);
|
||||
p[20] = static_cast<uint8_t>(m_port >> 8);
|
||||
p[21] = static_cast<uint8_t>(m_port & 0xFF);
|
||||
}
|
||||
else {
|
||||
p[3] = 1; // ATYP
|
||||
memcpy(p + 4, m_addr.data + 12, 4);
|
||||
p[8] = static_cast<uint8_t>(m_port >> 8);
|
||||
p[9] = static_cast<uint8_t>(m_port & 0xFF);
|
||||
}
|
||||
|
||||
return m_isV6 ? 22 : 10;
|
||||
});
|
||||
|
||||
if (result) {
|
||||
m_socks5ProxyState = Socks5ProxyState::ConnectRequestSent;
|
||||
}
|
||||
else {
|
||||
close();
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case Socks5ProxyState::ConnectRequestSent:
|
||||
if (m_numRead >= 4) {
|
||||
uint8_t* p = reinterpret_cast<uint8_t*>(m_readBuf);
|
||||
if ((p[0] != 5) && (p[1] != 0) && p[2] != 0) {
|
||||
LOGWARN(5, "SOCKS5 proxy returned an invalid reply to CONNECT");
|
||||
return false;
|
||||
}
|
||||
|
||||
switch (p[3]) {
|
||||
case 1:
|
||||
if (m_numRead >= 10) {
|
||||
m_socks5ProxyState = Socks5ProxyState::Default;
|
||||
n = 10;
|
||||
}
|
||||
break;
|
||||
case 3:
|
||||
if (m_numRead >= 5) {
|
||||
const uint32_t len = p[4];
|
||||
if (m_numRead >= 7 + len) {
|
||||
m_socks5ProxyState = Socks5ProxyState::Default;
|
||||
n = 7 + len;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 4:
|
||||
if (m_numRead >= 22) {
|
||||
m_socks5ProxyState = Socks5ProxyState::Default;
|
||||
n = 22;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
// Move the possible unfinished message to the beginning of m_readBuf to free up more space for reading
|
||||
if (n > 0) {
|
||||
m_numRead -= n;
|
||||
if (m_numRead > 0) {
|
||||
memmove(m_readBuf, m_readBuf + n, m_numRead);
|
||||
}
|
||||
}
|
||||
|
||||
if (m_socks5ProxyState == Socks5ProxyState::Default) {
|
||||
if (!on_connect()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (m_numRead > 0) {
|
||||
const uint32_t nread = m_numRead;
|
||||
m_numRead = 0;
|
||||
if (!on_read(m_readBuf, nread)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req, int status)
|
||||
{
|
||||
|
@ -943,7 +1051,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req,
|
|||
TCPServer* server = client->m_owner;
|
||||
|
||||
if (server) {
|
||||
MutexLock lock(server->m_writeBuffersLock);
|
||||
server->m_writeBuffers.push_back(buf);
|
||||
}
|
||||
|
||||
|
@ -982,16 +1089,16 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::ban(uint64_t seconds)
|
|||
}
|
||||
|
||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string(bool is_v6, const sockaddr_storage* peer_addr)
|
||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string()
|
||||
{
|
||||
const char* addr_str;
|
||||
char addr_str_buf[64];
|
||||
|
||||
if (is_v6) {
|
||||
addr_str = inet_ntop(AF_INET6, &reinterpret_cast<const sockaddr_in6*>(peer_addr)->sin6_addr, addr_str_buf, sizeof(addr_str_buf));
|
||||
if (m_isV6) {
|
||||
addr_str = inet_ntop(AF_INET6, m_addr.data, addr_str_buf, sizeof(addr_str_buf));
|
||||
}
|
||||
else {
|
||||
addr_str = inet_ntop(AF_INET, &reinterpret_cast<const sockaddr_in*>(peer_addr)->sin_addr, addr_str_buf, sizeof(addr_str_buf));
|
||||
addr_str = inet_ntop(AF_INET, m_addr.data + 12, addr_str_buf, sizeof(addr_str_buf));
|
||||
}
|
||||
|
||||
if (addr_str) {
|
||||
|
@ -1001,11 +1108,11 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::init_addr_string(bool is_
|
|||
}
|
||||
|
||||
log::Stream s(m_addrString);
|
||||
if (is_v6) {
|
||||
s << '[' << log::const_buf(addr_str, n) << "]:" << ntohs(reinterpret_cast<const sockaddr_in6*>(peer_addr)->sin6_port) << '\0';
|
||||
if (m_isV6) {
|
||||
s << '[' << log::const_buf(addr_str, n) << "]:" << m_port << '\0';
|
||||
}
|
||||
else {
|
||||
s << log::const_buf(addr_str, n) << ':' << ntohs(reinterpret_cast<const sockaddr_in*>(peer_addr)->sin_port) << '\0';
|
||||
s << log::const_buf(addr_str, n) << ':' << m_port << '\0';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
51
src/util.cpp
51
src/util.cpp
|
@ -432,6 +432,57 @@ NOINLINE uint64_t bsr_reference(uint64_t x)
|
|||
return bsr8_table.data[y >> 24] - n0 - n1 - n2;
|
||||
}
|
||||
|
||||
bool str_to_ip(bool is_v6, const char* ip, raw_ip& result)
|
||||
{
|
||||
sockaddr_storage addr;
|
||||
|
||||
if (is_v6) {
|
||||
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(&addr);
|
||||
const int err = uv_ip6_addr(ip, 0, addr6);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to parse IPv6 address " << ip << ", error " << uv_err_name(err));
|
||||
return false;
|
||||
}
|
||||
memcpy(result.data, &addr6->sin6_addr, sizeof(in6_addr));
|
||||
}
|
||||
else {
|
||||
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(&addr);
|
||||
const int err = uv_ip4_addr(ip, 0, addr4);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to parse IPv4 address " << ip << ", error " << uv_err_name(err));
|
||||
return false;
|
||||
}
|
||||
result = {};
|
||||
result.data[10] = 0xFF;
|
||||
result.data[11] = 0xFF;
|
||||
memcpy(result.data + 12, &addr4->sin_addr, sizeof(in_addr));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool is_localhost(const std::string& host)
|
||||
{
|
||||
if (host.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (host.compare("localhost") == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (host.find_first_not_of("0123456789.:") != std::string::npos) {
|
||||
return false;
|
||||
}
|
||||
|
||||
raw_ip addr;
|
||||
if (!str_to_ip(host.find(':') != std::string::npos, host.c_str(), addr)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return addr.is_localhost();
|
||||
}
|
||||
|
||||
UV_LoopUserData* GetLoopUserData(uv_loop_t* loop, bool create)
|
||||
{
|
||||
UV_LoopUserData* data = reinterpret_cast<UV_LoopUserData*>(loop->data);
|
||||
|
|
|
@ -219,6 +219,9 @@ FORCEINLINE uint64_t bsr(uint64_t x)
|
|||
#define bsr bsr_reference
|
||||
#endif
|
||||
|
||||
bool str_to_ip(bool is_v6, const char* ip, raw_ip& result);
|
||||
bool is_localhost(const std::string& host);
|
||||
|
||||
} // namespace p2pool
|
||||
|
||||
void memory_tracking_start();
|
||||
|
|
|
@ -24,14 +24,20 @@ static constexpr char log_category_prefix[] = "ZMQReader ";
|
|||
|
||||
namespace p2pool {
|
||||
|
||||
ZMQReader::ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler)
|
||||
ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler)
|
||||
: m_address(address)
|
||||
, m_zmqPort(zmq_port)
|
||||
, m_proxy(proxy)
|
||||
, m_handler(handler)
|
||||
, m_tx()
|
||||
, m_minerData()
|
||||
, m_chainmainData()
|
||||
{
|
||||
if (!m_proxy.empty() && is_localhost(address)) {
|
||||
LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address);
|
||||
m_proxy.clear();
|
||||
}
|
||||
|
||||
for (uint32_t i = m_publisherPort; i < std::numeric_limits<uint16_t>::max(); ++i) {
|
||||
try {
|
||||
m_publisherPort = 0;
|
||||
|
@ -84,14 +90,18 @@ void ZMQReader::run_wrapper(void* arg)
|
|||
void ZMQReader::run()
|
||||
{
|
||||
try {
|
||||
char addr[32];
|
||||
if (!m_proxy.empty()) {
|
||||
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length()));
|
||||
}
|
||||
|
||||
snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort);
|
||||
std::string addr = "tcp://" + m_address + ':' + std::to_string(m_zmqPort);
|
||||
if (!connect(addr)) {
|
||||
return;
|
||||
}
|
||||
|
||||
snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort);
|
||||
m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer());
|
||||
|
||||
addr = "tcp://127.0.0.1:" + std::to_string(m_publisherPort);
|
||||
if (!connect(addr)) {
|
||||
return;
|
||||
}
|
||||
|
@ -128,7 +138,7 @@ void ZMQReader::run()
|
|||
}
|
||||
}
|
||||
|
||||
bool ZMQReader::connect(const char* address)
|
||||
bool ZMQReader::connect(const std::string& address)
|
||||
{
|
||||
struct ConnectMonitor : public zmq::monitor_t
|
||||
{
|
||||
|
|
|
@ -24,18 +24,19 @@ namespace p2pool {
|
|||
|
||||
class ZMQReader {
|
||||
public:
|
||||
ZMQReader(const char* address, uint32_t zmq_port, MinerCallbackHandler* handler);
|
||||
ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler);
|
||||
~ZMQReader();
|
||||
|
||||
private:
|
||||
static void run_wrapper(void* arg);
|
||||
void run();
|
||||
bool connect(const char* address);
|
||||
bool connect(const std::string& address);
|
||||
|
||||
void parse(char* data, size_t size);
|
||||
|
||||
const char* m_address;
|
||||
std::string m_address;
|
||||
uint32_t m_zmqPort;
|
||||
std::string m_proxy;
|
||||
MinerCallbackHandler* m_handler;
|
||||
|
||||
uv_thread_t m_worker{};
|
||||
|
|
Loading…
Reference in a new issue