From 6e258bb210cee239378f8fe9b8395d090642147e Mon Sep 17 00:00:00 2001 From: SChernykh Date: Wed, 19 Apr 2023 11:36:12 +0200 Subject: [PATCH] Refactored TCPServer to reduce code duplication --- CMakeLists.txt | 2 +- src/console_commands.cpp | 4 +- src/console_commands.h | 9 +- src/crypto.cpp | 1 + src/json_rpc_request.h | 22 +--- src/p2p_server.cpp | 19 +-- src/p2p_server.h | 6 +- src/p2pool_api.cpp | 2 +- src/p2pool_api.h | 22 +--- src/stratum_server.cpp | 12 +- src/stratum_server.h | 6 +- src/{tcp_server.inl => tcp_server.cpp} | 176 +++++++++++-------------- src/tcp_server.h | 44 +++---- src/util.h | 21 +++ src/uv_util.h | 19 +-- tests/CMakeLists.txt | 1 + tests/src/pool_block_tests.cpp | 2 +- 17 files changed, 163 insertions(+), 205 deletions(-) rename src/{tcp_server.inl => tcp_server.cpp} (81%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8b12d0a..44776ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,7 +56,6 @@ set(HEADERS src/side_chain.h src/stratum_server.h src/tcp_server.h - src/tcp_server.inl src/util.h src/uv_util.h src/wallet.h @@ -85,6 +84,7 @@ set(SOURCES src/pow_hash.cpp src/side_chain.cpp src/stratum_server.cpp + src/tcp_server.cpp src/util.cpp src/wallet.cpp src/zmq_reader.cpp diff --git a/src/console_commands.cpp b/src/console_commands.cpp index fc312c6..4240109 100644 --- a/src/console_commands.cpp +++ b/src/console_commands.cpp @@ -32,12 +32,10 @@ static constexpr char log_category_prefix[] = "ConsoleCommands "; static constexpr int DEFAULT_BACKLOG = 1; -#include "tcp_server.inl" - namespace p2pool { ConsoleCommands::ConsoleCommands(p2pool* pool) - : TCPServer(ConsoleClient::allocate) + : TCPServer(DEFAULT_BACKLOG, ConsoleClient::allocate) , m_pool(pool) , m_tty{} , m_stdin_pipe{} diff --git a/src/console_commands.h b/src/console_commands.h index a79e41d..316ff0c 100644 --- a/src/console_commands.h +++ b/src/console_commands.h @@ -24,7 +24,7 @@ namespace p2pool { class p2pool; -class ConsoleCommands : public TCPServer<256, 256> +class ConsoleCommands : public TCPServer { public: explicit ConsoleCommands(p2pool* pool); @@ -32,6 +32,7 @@ public: struct ConsoleClient : public Client { + ConsoleClient() : Client(m_consoleReadBuf, sizeof(m_consoleReadBuf)) {} ~ConsoleClient() {} static Client* allocate() { return new ConsoleClient(); } @@ -41,19 +42,23 @@ public: bool on_connect() override { return true; }; bool on_read(char* data, uint32_t size) override { static_cast(m_owner)->process_input(m_command, data, size); return true; }; + char m_consoleReadBuf[1024] = {}; + std::string m_command; }; void on_shutdown() override; private: + const char* get_category() const override { return "ConsoleCommands "; } + p2pool* m_pool; uv_tty_t m_tty; uv_pipe_t m_stdin_pipe; uv_stream_t* m_stdin_handle; - char m_readBuf[64]; + char m_readBuf[1024]; bool m_readBufInUse; std::string m_command; diff --git a/src/crypto.cpp b/src/crypto.cpp index e6bbb50..7764eba 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -356,6 +356,7 @@ private: hash m_derivation; uint32_t m_viewTags1[2] = { 0xFFFFFFFFUL, 0xFFFFFFFFUL }; std::vector m_viewTags2; + // cppcheck-suppress unusedStructMember uint64_t m_timestamp; FORCEINLINE bool find_view_tag(size_t output_index, uint8_t& view_tag) const diff --git a/src/json_rpc_request.h b/src/json_rpc_request.h index fbf0eeb..7c88a28 100644 --- a/src/json_rpc_request.h +++ b/src/json_rpc_request.h @@ -20,29 +20,17 @@ namespace p2pool { namespace JSONRPCRequest { -struct CallbackBase -{ - virtual ~CallbackBase() {} - virtual void operator()(const char* data, size_t size) = 0; -}; - -template -struct Callback : public CallbackBase -{ - explicit FORCEINLINE Callback(T&& cb) : m_cb(std::move(cb)) {} - void operator()(const char* data, size_t size) override { m_cb(data, size); } - -private: - Callback& operator=(Callback&&) = delete; - T m_cb; -}; +typedef Callback::Base CallbackBase; void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); template FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr) { - Call(address, port, req, auth, proxy, new Callback(std::move(cb)), new Callback(std::move(close_cb)), loop); + typedef Callback::Derived CallbackT; + typedef Callback::Derived CallbackU; + + Call(address, port, req, auth, proxy, new CallbackT(std::move(cb)), new CallbackU(std::move(close_cb)), loop); } } // namespace JSONRPCRequest diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index c55614f..7865ddd 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -41,12 +41,10 @@ static constexpr uint64_t DEFAULT_BAN_TIME = 600; static constexpr size_t SEND_BUF_MIN_SIZE = 256; -#include "tcp_server.inl" - namespace p2pool { P2PServer::P2PServer(p2pool* pool) - : TCPServer(P2PClient::allocate) + : TCPServer(DEFAULT_BACKLOG, P2PClient::allocate) , m_pool(pool) , m_cache(pool->params().m_blockCache ? new BlockCache() : nullptr) , m_cacheLoaded(false) @@ -62,6 +60,8 @@ P2PServer::P2PServer(p2pool* pool) , m_lookForMissingBlocks(true) , m_fastestPeer(nullptr) { + m_callbackBuf.resize(P2P_BUF_SIZE); + m_blockDeserializeBuf.reserve(131072); // Diffuse the initial state in case it has low quality @@ -1188,7 +1188,8 @@ void P2PServer::check_block_template() } P2PServer::P2PClient::P2PClient() - : m_peerId(0) + : Client(m_p2pReadBuf, sizeof(m_p2pReadBuf)) + , m_peerId(0) , m_connectedTime(0) , m_broadcastMaxHeight(0) , m_expectedMessage(MessageId::HANDSHAKE_CHALLENGE) @@ -1211,6 +1212,7 @@ P2PServer::P2PClient::P2PClient() , m_lastBlockrequestTimestamp(0) , m_broadcastedHashes{} { + m_p2pReadBuf[0] = '\0'; } void P2PServer::on_shutdown() @@ -1352,7 +1354,7 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size) return false; } - if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) { + if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) { LOGERR(1, "peer " << static_cast(m_addrString) << " invalid data pointer or size in on_read()"); ban(DEFAULT_BAN_TIME); server->remove_peer_from_list(this); @@ -1811,9 +1813,8 @@ bool P2PServer::P2PClient::check_handshake_solution(const hash& solution, const bool P2PServer::P2PClient::on_handshake_challenge(const uint8_t* buf) { - check_event_loop_thread(__func__); - P2PServer* server = static_cast(m_owner); + server->check_event_loop_thread(__func__); uint8_t challenge[CHALLENGE_SIZE]; memcpy(challenge, buf, CHALLENGE_SIZE); @@ -2084,9 +2085,9 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size, bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) { - check_event_loop_thread(__func__); - P2PServer* server = static_cast(m_owner); + server->check_event_loop_thread(__func__); + const uint64_t cur_time = seconds_since_epoch(); const bool first = (m_prevIncomingPeerListRequest == 0); diff --git a/src/p2p_server.h b/src/p2p_server.h index 8302f1b..ea72593 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -36,7 +36,7 @@ static constexpr uint32_t PROTOCOL_VERSION_1_1 = 0x00010001UL; static constexpr uint32_t SUPPORTED_PROTOCOL_VERSION = PROTOCOL_VERSION_1_1; -class P2PServer : public TCPServer +class P2PServer : public TCPServer { public: enum class MessageId { @@ -114,6 +114,8 @@ public: const char* software_name() const; + alignas(8) char m_p2pReadBuf[P2P_BUF_SIZE]; + uint64_t m_peerId; uint64_t m_connectedTime; uint64_t m_broadcastMaxHeight; @@ -167,6 +169,8 @@ public: const PoolBlock* get_block() const { return m_block; } private: + const char* get_category() const override { return "P2PServer "; } + p2pool* m_pool; BlockCache* m_cache; bool m_cacheLoaded; diff --git a/src/p2pool_api.cpp b/src/p2pool_api.cpp index b26704d..82e71e3 100644 --- a/src/p2pool_api.cpp +++ b/src/p2pool_api.cpp @@ -103,7 +103,7 @@ void p2pool_api::on_stop() uv_close(reinterpret_cast(&m_dumpToFileAsync), nullptr); } -void p2pool_api::dump_to_file_async_internal(Category category, const char* filename, DumpFileCallbackBase&& callback) +void p2pool_api::dump_to_file_async_internal(Category category, const char* filename, Callback::Base&& callback) { std::vector buf(16384); log::Stream s(buf.data(), buf.size()); diff --git a/src/p2pool_api.h b/src/p2pool_api.h index bd053ca..29a1298 100644 --- a/src/p2pool_api.h +++ b/src/p2pool_api.h @@ -38,7 +38,7 @@ public: void on_stop(); template - void set(Category category, const char* filename, T&& callback) { dump_to_file_async_internal(category, filename, DumpFileCallback(std::move(callback))); } + void set(Category category, const char* filename, T&& callback) { dump_to_file_async_internal(category, filename, Callback::Derived(std::move(callback))); } private: void create_dir(const std::string& path); @@ -54,25 +54,7 @@ private: std::vector buf; }; - struct DumpFileCallbackBase - { - virtual ~DumpFileCallbackBase() {} - virtual void operator()(log::Stream&) = 0; - }; - - template - struct DumpFileCallback : public DumpFileCallbackBase - { - explicit FORCEINLINE DumpFileCallback(T&& callback) : m_callback(std::move(callback)) {} - void operator()(log::Stream& s) override { m_callback(s); } - - private: - DumpFileCallback& operator=(DumpFileCallback&&) = delete; - - T m_callback; - }; - - void dump_to_file_async_internal(Category category, const char* filename, DumpFileCallbackBase&& callback); + void dump_to_file_async_internal(Category category, const char* filename, Callback::Base&& callback); void dump_to_file(); static void on_fs_open(uv_fs_t* req); static void on_fs_write(uv_fs_t* req); diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index a3d6c82..3aa557b 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -37,12 +37,10 @@ static constexpr int32_t BAD_SHARE_POINTS = -5; static constexpr int32_t GOOD_SHARE_POINTS = 1; static constexpr int32_t BAN_THRESHOLD_POINTS = -15; -#include "tcp_server.inl" - namespace p2pool { StratumServer::StratumServer(p2pool* pool) - : TCPServer(StratumClient::allocate) + : TCPServer(DEFAULT_BACKLOG, StratumClient::allocate) , m_pool(pool) , m_autoDiff(pool->params().m_autoDiff) , m_rng(RandomDeviceSeed::instance) @@ -57,6 +55,8 @@ StratumServer::StratumServer(p2pool* pool) , m_totalFailedShares(0) , m_apiLastUpdateTime(0) { + m_callbackBuf.resize(STRATUM_BUF_SIZE); + // Diffuse the initial state in case it has low quality m_rng.discard(10000); @@ -1027,7 +1027,8 @@ void StratumServer::on_shutdown() } StratumServer::StratumClient::StratumClient() - : m_rpcId(0) + : Client(m_stratumReadBuf, sizeof(m_stratumReadBuf)) + , m_rpcId(0) , m_perConnectionJobId(0) , m_connectedTime(0) , m_jobs{} @@ -1040,6 +1041,7 @@ StratumServer::StratumClient::StratumClient() , m_lastJobTarget(0) , m_score(0) { + m_stratumReadBuf[0] = '\0'; } void StratumServer::StratumClient::reset() @@ -1072,7 +1074,7 @@ bool StratumServer::StratumClient::on_connect() bool StratumServer::StratumClient::on_read(char* data, uint32_t size) { - if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) { + if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) { LOGERR(1, "client: invalid data pointer or size in on_read()"); ban(DEFAULT_BAN_TIME); return false; diff --git a/src/stratum_server.h b/src/stratum_server.h index c6e9a93..98f6872 100644 --- a/src/stratum_server.h +++ b/src/stratum_server.h @@ -28,7 +28,7 @@ class BlockTemplate; static constexpr size_t STRATUM_BUF_SIZE = log::Stream::BUF_SIZE + 1; static constexpr int DEFAULT_STRATUM_PORT = 3333; -class StratumServer : public TCPServer +class StratumServer : public TCPServer { public: explicit StratumServer(p2pool *pool); @@ -52,6 +52,8 @@ public: bool process_login(rapidjson::Document& doc, uint32_t id); bool process_submit(rapidjson::Document& doc, uint32_t id); + alignas(8) char m_stratumReadBuf[STRATUM_BUF_SIZE]; + uint32_t m_rpcId; uint32_t m_perConnectionJobId; uint64_t m_connectedTime; @@ -95,6 +97,8 @@ public: void reset_share_counters(); private: + const char* get_category() const override { return "StratumServer "; } + void print_stratum_status() const; void update_auto_diff(StratumClient* client, const uint64_t timestamp, const uint64_t hashes); diff --git a/src/tcp_server.inl b/src/tcp_server.cpp similarity index 81% rename from src/tcp_server.inl rename to src/tcp_server.cpp index 514d839..495bb74 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.cpp @@ -15,15 +15,17 @@ * along with this program. If not, see . */ -#include +#include "common.h" +#include "tcp_server.h" -static thread_local bool server_event_loop_thread = false; +static thread_local void* server_event_loop_thread = nullptr; +static thread_local const char* log_category_prefix = "TCPServer "; namespace p2pool { -template -TCPServer::TCPServer(allocate_client_callback allocate_new_client) +TCPServer::TCPServer(int default_backlog, allocate_client_callback allocate_new_client) : m_allocateNewClient(allocate_new_client) + , m_defaultBacklog(default_backlog) , m_loopThread{} #ifdef WITH_UPNP , m_portMapping(0) @@ -71,9 +73,7 @@ TCPServer::TCPServer(allocate_client_callback all m_connectedClientsList->m_prev = m_connectedClientsList; } -template -// cppcheck-suppress functionStatic -TCPServer::~TCPServer() +TCPServer::~TCPServer() { if (m_finished.load() == 0) { LOGERR(1, "TCP wasn't shutdown properly"); @@ -83,10 +83,7 @@ TCPServer::~TCPServer() delete m_connectedClientsList; } - -template -template -void TCPServer::parse_address_list(const std::string& address_list, T callback) +void TCPServer::parse_address_list_internal(const std::string& address_list, Callback::Base&& callback) { if (address_list.empty()) { return; @@ -120,7 +117,7 @@ void TCPServer::parse_address_list(const std::str callback(is_v6, address, ip, port); } else { - LOGWARN(1, "invalid IP:port " << address); + error_invalid_ip(address); } } @@ -130,8 +127,7 @@ void TCPServer::parse_address_list(const std::str } } -template -bool TCPServer::start_listening(bool is_v6, const std::string& ip, int port, std::string address) +bool TCPServer::start_listening(bool is_v6, const std::string& ip, int port, std::string address) { if ((m_listenPort >= 0) && (m_listenPort != port)) { LOGERR(1, "all sockets must be listening on the same port number, fix the command line"); @@ -205,7 +201,7 @@ bool TCPServer::start_listening(bool is_v6, const } } - err = uv_listen(reinterpret_cast(socket), DEFAULT_BACKLOG, on_new_connection); + err = uv_listen(reinterpret_cast(socket), m_defaultBacklog, on_new_connection); if (err) { LOGERR(1, "failed to listen on tcp server socket " << address << ", error " << uv_err_name(err)); return false; @@ -226,8 +222,7 @@ bool TCPServer::start_listening(bool is_v6, const return true; } -template -void TCPServer::start_listening(const std::string& listen_addresses, bool upnp) +void TCPServer::start_listening(const std::string& listen_addresses, bool upnp) { if (listen_addresses.empty()) { LOGERR(1, "listen address not set"); @@ -257,8 +252,7 @@ void TCPServer::start_listening(const std::string } } -template -bool TCPServer::connect_to_peer(bool is_v6, const char* ip, int port) +bool TCPServer::connect_to_peer(bool is_v6, const char* ip, int port) { if (!ip || (strlen(ip) > sizeof(Client::m_addrString) - 16)) { LOGERR(1, "failed to parse IP address, too long"); @@ -290,8 +284,7 @@ bool TCPServer::connect_to_peer(bool is_v6, const return connect_to_peer(client); } -template -bool TCPServer::connect_to_peer(bool is_v6, const raw_ip& ip, int port) +bool TCPServer::connect_to_peer(bool is_v6, const raw_ip& ip, int port) { if (m_finished.load()) { return false; @@ -307,8 +300,7 @@ bool TCPServer::connect_to_peer(bool is_v6, const return connect_to_peer(client); } -template -bool TCPServer::is_banned(const raw_ip& ip) +bool TCPServer::is_banned(const raw_ip& ip) { if (ip.is_localhost()) { return false; @@ -330,8 +322,7 @@ bool TCPServer::is_banned(const raw_ip& ip) return false; } -template -bool TCPServer::connect_to_peer(Client* client) +bool TCPServer::connect_to_peer(Client* client) { if (is_banned(client->m_addr)) { LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " is banned, not connecting to it"); @@ -360,7 +351,11 @@ bool TCPServer::connect_to_peer(Client* client) return false; } - static_assert(sizeof(client->m_readBuf) >= sizeof(uv_connect_t), "READ_BUF_SIZE must be large enough"); + if (client->m_readBufSize < sizeof(uv_connect_t)) { + LOGERR(1, "client read buf size is too small (" << client->m_readBufSize << " bytes), expected at least " << sizeof(uv_connect_t) << " bytes"); + uv_close(reinterpret_cast(&client->m_socket), on_connection_error); + return false; + } uv_connect_t* connect_request = reinterpret_cast(client->m_readBuf); memset(connect_request, 0, sizeof(uv_connect_t)); @@ -412,17 +407,15 @@ bool TCPServer::connect_to_peer(Client* client) } #ifdef P2POOL_DEBUGGING -template -void TCPServer::check_event_loop_thread(const char* func) +void TCPServer::check_event_loop_thread(const char* func) const { - if (!server_event_loop_thread) { + if (server_event_loop_thread != this) { LOGERR(1, func << " called from another thread, this is not thread safe"); } } #endif -template -void TCPServer::close_sockets(bool listen_sockets) +void TCPServer::close_sockets(bool listen_sockets) { check_event_loop_thread(__func__); @@ -456,8 +449,12 @@ void TCPServer::close_sockets(bool listen_sockets } } -template -void TCPServer::shutdown_tcp() +void TCPServer::error_invalid_ip(const std::string& address) +{ + LOGERR(1, "invalid IP:port " << address); +} + +void TCPServer::shutdown_tcp() { if (m_finished.exchange(1)) { return; @@ -478,16 +475,14 @@ void TCPServer::shutdown_tcp() LOGINFO(1, "stopped"); } -template -void TCPServer::print_status() +void TCPServer::print_status() { LOGINFO(0, "status" << "\nConnections = " << m_numConnections.load() << " (" << m_numIncomingConnections.load() << " incoming)" ); } -template -void TCPServer::ban(const raw_ip& ip, uint64_t seconds) +void TCPServer::ban(const raw_ip& ip, uint64_t seconds) { if (ip.is_localhost()) { return; @@ -499,8 +494,7 @@ void TCPServer::ban(const raw_ip& ip, uint64_t se m_bans[ip] = ban_time; } -template -void TCPServer::print_bans() +void TCPServer::print_bans() { using namespace std::chrono; const auto cur_time = steady_clock::now(); @@ -515,8 +509,7 @@ void TCPServer::print_bans() } } -template -bool TCPServer::send_internal(Client* client, SendCallbackBase&& callback) +bool TCPServer::send_internal(Client* client, Callback::Base&& callback) { check_event_loop_thread(__func__); @@ -525,12 +518,10 @@ bool TCPServer::send_internal(Client* client, Sen return true; } - // callback_buf is used in only 1 thread, so it's safe - static uint8_t callback_buf[WRITE_BUF_SIZE]; - const size_t bytes_written = callback(callback_buf, sizeof(callback_buf)); + const size_t bytes_written = callback(m_callbackBuf.data(), m_callbackBuf.size()); - if (bytes_written > WRITE_BUF_SIZE) { - LOGERR(0, "send callback wrote " << bytes_written << " bytes, expected no more than " << WRITE_BUF_SIZE << " bytes"); + if (bytes_written > m_callbackBuf.size()) { + LOGERR(0, "send callback wrote " << bytes_written << " bytes, expected no more than " << m_callbackBuf.size() << " bytes"); PANIC_STOP(); } @@ -553,7 +544,7 @@ bool TCPServer::send_internal(Client* client, Sen } } - memcpy(buf->m_data, callback_buf, bytes_written); + memcpy(buf->m_data, m_callbackBuf.data(), bytes_written); uv_buf_t bufs[1]; bufs[0].base = reinterpret_cast(buf->m_data); @@ -569,15 +560,16 @@ bool TCPServer::send_internal(Client* client, Sen return true; } -template -void TCPServer::loop(void* data) +void TCPServer::loop(void* data) { - LOGINFO(1, "event loop started"); - server_event_loop_thread = true; TCPServer* server = static_cast(data); + log_category_prefix = server->get_category(); - server->m_preallocatedClients.reserve(DEFAULT_BACKLOG); - for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) { + LOGINFO(1, "event loop started"); + server_event_loop_thread = data; + + server->m_preallocatedClients.reserve(server->m_defaultBacklog); + for (int i = 0; i < server->m_defaultBacklog; ++i) { WriteBuf* wb = new WriteBuf(); const size_t capacity = wb->m_dataCapacity; @@ -622,8 +614,7 @@ void TCPServer::loop(void* data) LOGINFO(1, "event loop stopped"); } -template -void TCPServer::on_new_connection(uv_stream_t* server, int status) +void TCPServer::on_new_connection(uv_stream_t* server, int status) { TCPServer* pThis = static_cast(server->data); @@ -639,17 +630,16 @@ void TCPServer::on_new_connection(uv_stream_t* se pThis->on_new_client(server); } -template -void TCPServer::on_connection_close(uv_handle_t* handle) +void TCPServer::on_connection_close(uv_handle_t* handle) { - check_event_loop_thread(__func__); - Client* client = static_cast(handle->data); TCPServer* owner = client->m_owner; LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " disconnected"); if (owner) { + owner->check_event_loop_thread(__func__); + Client* prev_in_list = client->m_prev; Client* next_in_list = client->m_next; @@ -672,15 +662,13 @@ void TCPServer::on_connection_close(uv_handle_t* } } -template -void TCPServer::on_connection_error(uv_handle_t* handle) +void TCPServer::on_connection_error(uv_handle_t* handle) { Client* client = reinterpret_cast(handle->data); client->m_owner->return_client(client); } -template -void TCPServer::on_connect(uv_connect_t* req, int status) +void TCPServer::on_connect(uv_connect_t* req, int status) { Client* client = reinterpret_cast(req->data); @@ -706,8 +694,7 @@ void TCPServer::on_connect(uv_connect_t* req, int server->on_new_client(nullptr, client); } -template -void TCPServer::on_new_client(uv_stream_t* server) +void TCPServer::on_new_client(uv_stream_t* server) { if (m_finished.load()) { return; @@ -741,8 +728,7 @@ void TCPServer::on_new_client(uv_stream_t* server on_new_client(server, client); } -template -void TCPServer::on_new_client(uv_stream_t* server, Client* client) +void TCPServer::on_new_client(uv_stream_t* server, Client* client) { check_event_loop_thread(__func__); @@ -836,8 +822,7 @@ void TCPServer::on_new_client(uv_stream_t* server } } -template -void TCPServer::on_shutdown(uv_async_t* async) +void TCPServer::on_shutdown(uv_async_t* async) { TCPServer* s = reinterpret_cast(async->data); s->on_shutdown(); @@ -897,8 +882,7 @@ void TCPServer::on_shutdown(uv_async_t* async) }); } -template -typename TCPServer::WriteBuf* TCPServer::get_write_buffer(size_t size_hint) +TCPServer::WriteBuf* TCPServer::get_write_buffer(size_t size_hint) { WriteBuf* buf; @@ -925,8 +909,7 @@ typename TCPServer::WriteBuf* TCPServer -void TCPServer::return_write_buffer(WriteBuf* buf) +void TCPServer::return_write_buffer(WriteBuf* buf) { const size_t capacity = buf->m_dataCapacity; @@ -938,8 +921,7 @@ void TCPServer::return_write_buffer(WriteBuf* buf m_writeBuffers.emplace(capacity, buf); } -template -typename TCPServer::Client* TCPServer::get_client() +TCPServer::Client* TCPServer::get_client() { Client* c; @@ -957,16 +939,16 @@ typename TCPServer::Client* TCPServer -void TCPServer::return_client(Client* c) +void TCPServer::return_client(Client* c) { ASAN_POISON_MEMORY_REGION(c, c->size()); m_preallocatedClients.push_back(c); } -template -TCPServer::Client::Client() - : m_owner(nullptr) +TCPServer::Client::Client(char* read_buf, size_t size) + : m_readBuf(read_buf) + , m_readBufSize(static_cast(size)) + , m_owner(nullptr) , m_prev(nullptr) , m_next(nullptr) , m_socket{} @@ -982,11 +964,10 @@ TCPServer::Client::Client() , m_resetCounter{ 0 } { m_readBuf[0] = '\0'; - m_readBuf[READ_BUF_SIZE - 1] = '\0'; + m_readBuf[m_readBufSize - 1] = '\0'; } -template -void TCPServer::Client::reset() +void TCPServer::Client::reset() { m_resetCounter.fetch_add(1); @@ -1004,11 +985,10 @@ void TCPServer::Client::reset() m_addrString[0] = '\0'; m_socks5ProxyState = Socks5ProxyState::Default; m_readBuf[0] = '\0'; - m_readBuf[READ_BUF_SIZE - 1] = '\0'; + m_readBuf[m_readBufSize - 1] = '\0'; } -template -void TCPServer::Client::on_alloc(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf) +void TCPServer::Client::on_alloc(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf) { Client* pThis = static_cast(handle->data); @@ -1019,20 +999,19 @@ void TCPServer::Client::on_alloc(uv_handle_t* han return; } - if (pThis->m_numRead >= sizeof(pThis->m_readBuf)) { + if (pThis->m_numRead >= pThis->m_readBufSize) { LOGWARN(4, "client " << static_cast(pThis->m_addrString) << " read buffer is full"); buf->len = 0; buf->base = nullptr; return; } - buf->len = sizeof(pThis->m_readBuf) - pThis->m_numRead; + buf->len = pThis->m_readBufSize - pThis->m_numRead; buf->base = pThis->m_readBuf + pThis->m_numRead; pThis->m_readBufInUse = true; } -template -void TCPServer::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) +void TCPServer::Client::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { Client* client = static_cast(stream->data); client->m_readBufInUse = false; @@ -1067,10 +1046,9 @@ void TCPServer::Client::on_read(uv_stream_t* stre } } -template -bool TCPServer::Client::on_proxy_handshake(char* data, uint32_t size) +bool TCPServer::Client::on_proxy_handshake(char* data, uint32_t size) { - if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + sizeof(m_readBuf))) { + if ((data != m_readBuf + m_numRead) || (data + size > m_readBuf + m_readBufSize)) { LOGERR(1, "peer " << static_cast(m_addrString) << " invalid data pointer or size in on_read()"); return false; } @@ -1189,8 +1167,7 @@ bool TCPServer::Client::on_proxy_handshake(char* return true; } -template -void TCPServer::Client::on_write(uv_write_t* req, int status) +void TCPServer::Client::on_write(uv_write_t* req, int status) { WriteBuf* buf = static_cast(req->data); Client* client = buf->m_client; @@ -1206,8 +1183,7 @@ void TCPServer::Client::on_write(uv_write_t* req, } } -template -void TCPServer::Client::close() +void TCPServer::Client::close() { if (m_isClosing || !m_owner) { // Already closed @@ -1225,8 +1201,7 @@ void TCPServer::Client::close() } } -template -void TCPServer::Client::ban(uint64_t seconds) +void TCPServer::Client::ban(uint64_t seconds) { if (m_addr.is_localhost()) { return; @@ -1238,8 +1213,7 @@ void TCPServer::Client::ban(uint64_t seconds) } } -template -void TCPServer::Client::init_addr_string() +void TCPServer::Client::init_addr_string() { const char* addr_str; char addr_str_buf[64]; diff --git a/src/tcp_server.h b/src/tcp_server.h index 0bd0447..081cf60 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -22,19 +22,15 @@ namespace p2pool { -template class TCPServer : public nocopy_nomove { public: struct Client; typedef Client* (*allocate_client_callback)(); - explicit TCPServer(allocate_client_callback allocate_new_client); + TCPServer(int default_backlog, allocate_client_callback allocate_new_client); virtual ~TCPServer(); - template - void parse_address_list(const std::string& address_list, T callback); - bool connect_to_peer(bool is_v6, const char* ip, int port); void drop_connections_async() { if (m_finished.load() == 0) { uv_async_send(&m_dropConnectionsAsync); } } @@ -53,7 +49,7 @@ public: struct Client { - Client(); + Client(char* read_buf, size_t size); virtual ~Client() {} virtual size_t size() const = 0; @@ -74,7 +70,8 @@ public: void init_addr_string(); - alignas(8) char m_readBuf[READ_BUF_SIZE]; + char* m_readBuf; + uint32_t m_readBufSize; TCPServer* m_owner; @@ -116,26 +113,14 @@ public: WriteBuf* get_write_buffer(size_t size_hint); void return_write_buffer(WriteBuf* buf); - struct SendCallbackBase + template + FORCEINLINE static void parse_address_list(const std::string& address_list, T&& callback) { - virtual ~SendCallbackBase() {} - virtual size_t operator()(void*, size_t) = 0; - }; + return parse_address_list_internal(address_list, Callback::Derived(std::move(callback))); + } template - struct SendCallback : public SendCallbackBase - { - explicit FORCEINLINE SendCallback(T&& callback) : m_callback(std::move(callback)) {} - size_t operator()(void* buf, size_t buf_size) override { return m_callback(buf, buf_size); } - - private: - SendCallback& operator=(SendCallback&&) = delete; - - T m_callback; - }; - - template - FORCEINLINE bool send(Client* client, T&& callback) { return send_internal(client, SendCallback(std::move(callback))); } + FORCEINLINE bool send(Client* client, T&& callback) { return send_internal(client, Callback::Derived(std::move(callback))); } private: static void on_new_connection(uv_stream_t* server, int status); @@ -147,20 +132,27 @@ private: bool connect_to_peer(Client* client); - bool send_internal(Client* client, SendCallbackBase&& callback); + bool send_internal(Client* client, Callback::Base&& callback); allocate_client_callback m_allocateNewClient; void close_sockets(bool listen_sockets); + static void error_invalid_ip(const std::string& address); std::vector m_listenSockets6; std::vector m_listenSockets; protected: + virtual const char* get_category() const { return "TCPServer "; } + + std::vector m_callbackBuf; + int m_defaultBacklog; + uv_thread_t m_loopThread; static void loop(void* data); + static void parse_address_list_internal(const std::string& address_list, Callback::Base&& callback); void start_listening(const std::string& listen_addresses, bool upnp); bool start_listening(bool is_v6, const std::string& ip, int port, std::string address = std::string()); @@ -179,7 +171,7 @@ protected: uv_loop_t m_loop; #ifdef P2POOL_DEBUGGING - static void check_event_loop_thread(const char *func); + void check_event_loop_thread(const char *func) const; #else static FORCEINLINE void check_event_loop_thread(const char*) {} #endif diff --git a/src/util.h b/src/util.h index 3072069..d519374 100644 --- a/src/util.h +++ b/src/util.h @@ -275,6 +275,27 @@ struct PerfTimer #define PERFLOG(level, name) PerfTimer CONCAT(perf_timer_, __LINE__)(level, name) #endif +template +struct Callback +{ + struct Base + { + virtual ~Base() {} + virtual R operator()(Args...) = 0; + }; + + template + struct Derived : public Base + { + explicit FORCEINLINE Derived(T&& cb) : m_cb(std::move(cb)) {} + R operator()(Args... args) override { return m_cb(args...); } + + private: + Derived& operator=(Derived&&) = delete; + T m_cb; + }; +}; + } // namespace p2pool void memory_tracking_start(); diff --git a/src/uv_util.h b/src/uv_util.h index 7764916..e4e6f0c 100644 --- a/src/uv_util.h +++ b/src/uv_util.h @@ -64,22 +64,7 @@ void uv_rwlock_init_checked(uv_rwlock_t* lock); void uv_async_init_checked(uv_loop_t* loop, uv_async_t* async, uv_async_cb async_cb); uv_loop_t* uv_default_loop_checked(); -struct UV_LoopCallbackBase -{ - virtual ~UV_LoopCallbackBase() {} - virtual void operator()() = 0; -}; - -template -struct UV_LoopCallback : public UV_LoopCallbackBase -{ - explicit FORCEINLINE UV_LoopCallback(T&& cb) : m_cb(std::move(cb)) {} - void operator()() override { m_cb(); } - -private: - UV_LoopCallback& operator=(UV_LoopCallback&&) = delete; - T m_cb; -}; +typedef Callback::Base UV_LoopCallbackBase; struct UV_LoopUserData { @@ -147,7 +132,7 @@ bool CallOnLoop(uv_loop_t* loop, T&& callback) return false; } - UV_LoopCallbackBase* cb = new UV_LoopCallback(std::move(callback)); + UV_LoopCallbackBase* cb = new Callback::Derived(std::move(callback)); { MutexLock lock(data->m_callbacksLock); data->m_callbacks.push_back(cb); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 39e6c93..0440de7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,6 +55,7 @@ set(SOURCES ../src/pow_hash.cpp ../src/side_chain.cpp ../src/stratum_server.cpp + ../src/tcp_server.cpp ../src/util.cpp ../src/wallet.cpp ../src/zmq_reader.cpp diff --git a/tests/src/pool_block_tests.cpp b/tests/src/pool_block_tests.cpp index 2b76386..c4aaab6 100644 --- a/tests/src/pool_block_tests.cpp +++ b/tests/src/pool_block_tests.cpp @@ -87,7 +87,7 @@ TEST(pool_block, deserialize) class RandomX_Hasher_Test : public RandomX_Hasher_Base { public: - bool calculate(const void* data, size_t size, uint64_t, const hash&, hash& result, bool force_light_mode) override + bool calculate(const void* data, size_t size, uint64_t, const hash&, hash& result, bool /*force_light_mode*/) override { if (size == 76) { char buf[76 * 2 + 1];