From 3f1ee9ce4bdebb5ca7f07f5878e5131a458fe1f0 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Thu, 26 Aug 2021 23:27:05 +0200 Subject: [PATCH] TCPServer fixes - Proper shutdown sequence, uv_close() must be called from the event loop thread - Moved start_listening() to child class constructor because it must be ready before it can listen on sockets - Added simple memory leak detector for Windows. Linux users can enjoy the leak sanitizer --- CMakeLists.txt | 1 + src/main.cpp | 15 ++- src/memory_leak_debug.cpp | 215 ++++++++++++++++++++++++++++++++++++++ src/p2p_server.cpp | 3 +- src/stratum_server.cpp | 4 +- src/tcp_server.h | 22 +++- src/tcp_server.inl | 76 +++++++++----- 7 files changed, 303 insertions(+), 33 deletions(-) create mode 100644 src/memory_leak_debug.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c335b07..a4155b0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -53,6 +53,7 @@ set(SOURCES src/keccak.cpp src/log.cpp src/main.cpp + src/memory_leak_debug.cpp src/mempool.cpp src/p2p_server.cpp src/p2pool.cpp diff --git a/src/main.cpp b/src/main.cpp index a21991f..275492c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -43,6 +43,9 @@ static void usage() ); } +void memory_tracking_start(); +void memory_tracking_stop(); + int main(int argc, char* argv[]) { if (argc == 1) { @@ -57,6 +60,14 @@ int main(int argc, char* argv[]) } } - p2pool::p2pool pool(argc, argv); - return pool.run(); + int result; + + memory_tracking_start(); + { + p2pool::p2pool pool(argc, argv); + result = pool.run(); + } + memory_tracking_stop(); + + return result; } diff --git a/src/memory_leak_debug.cpp b/src/memory_leak_debug.cpp new file mode 100644 index 0000000..75db321 --- /dev/null +++ b/src/memory_leak_debug.cpp @@ -0,0 +1,215 @@ +/* + * This file is part of the Monero P2Pool + * Copyright (c) 2021 SChernykh + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "common.h" + +// Simple memory leak detector for Windows users, works best in RelWithDebInfo configuration. +#if defined(_WIN32) && 0 + +#include "uv_util.h" +#include +#include + +#include "dbghelp.h" + +#pragma comment(lib, "Dbghelp.lib") + +namespace p2pool { + +static bool track_memory = false; + +constexpr size_t N = 1048576; +constexpr size_t MAX_FRAMES = 30; + +struct TrackedAllocation +{ + void* p; + void* stack_trace[MAX_FRAMES]; + uint32_t thread_id; + uint32_t allocated_size; +}; + +static_assert(sizeof(TrackedAllocation) == 256, ""); + +uv_mutex_t allocation_lock; +std::hash hasher; +uint32_t first[N]; +uint32_t next[N]; +TrackedAllocation allocations[N]; +uint32_t num_allocations = 0; +uint32_t cur_allocation_index = 1; + +FORCEINLINE static void add_alocation(void* p, size_t size) +{ + if (!track_memory) { + return; + } + + void* stack_trace[MAX_FRAMES]; + DWORD hash; + CaptureStackBackTrace(1, MAX_FRAMES, stack_trace, &hash); + + const DWORD thread_id = GetCurrentThreadId(); + + const size_t index = hasher(p) & (N - 1); + + p2pool::MutexLock lock(allocation_lock); + + ++num_allocations; + if (num_allocations >= N / 2) { + // Make N two times bigger if this triggers + __debugbreak(); + } + + for (uint64_t i = cur_allocation_index;; i = (i + 1) & (N - 1)) { + if (i && !allocations[i].allocated_size) { + cur_allocation_index = static_cast(i); + allocations[i].allocated_size = static_cast(size); + break; + } + } + + TrackedAllocation& t = allocations[cur_allocation_index]; + t.p = p; + memcpy(t.stack_trace, stack_trace, sizeof(stack_trace)); + next[cur_allocation_index] = first[index]; + t.thread_id = thread_id; + first[index] = cur_allocation_index; +} + +FORCEINLINE static void remove_allocation(void* p) +{ + if (!track_memory || !p) { + return; + } + + p2pool::MutexLock lock(allocation_lock); + + --num_allocations; + + const size_t index = hasher(p) & (N - 1); + + bool found = false; + for (uint32_t prev = 0, k = first[index]; k != 0; prev = k, k = next[k]) { + if (allocations[k].p == p) { + found = true; + allocations[k].allocated_size = 0; + if (prev) { + next[prev] = next[k]; + } + else { + first[index] = next[k]; + } + return; + } + } + if (!found) { + // Someone tried to deallocate a pointer that wasn't allocated before + __debugbreak(); + } +} + +FORCEINLINE static void* allocate(size_t n) +{ + void* p = malloc(n + sizeof(TrackedAllocation)); + if (!p) { + throw std::bad_alloc(); + } + add_alocation(p, n); + return p; +} + +FORCEINLINE static void deallocate(void* p) +{ + remove_allocation(p); + free(p); +} + +} // p2pool + +void memory_tracking_start() +{ + using namespace p2pool; + + uv_mutex_init_checked(&allocation_lock); + track_memory = true; +} + +void memory_tracking_stop() +{ + using namespace p2pool; + + track_memory = false; + uv_mutex_destroy(&allocation_lock); + + const HANDLE h = GetCurrentProcess(); + SymInitialize(h, NULL, TRUE); + + uint64_t total_leaks = 0; + + for (uint32_t i = 0; i < N; ++i) { + if (allocations[i].allocated_size) { + total_leaks += allocations[i].allocated_size; + + char buffer[sizeof(SYMBOL_INFO) + MAX_SYM_NAME * sizeof(TCHAR)] = {}; + PSYMBOL_INFO pSymbol = reinterpret_cast(buffer); + + pSymbol->SizeOfStruct = sizeof(SYMBOL_INFO); + pSymbol->MaxNameLen = MAX_SYM_NAME; + + IMAGEHLP_LINE64 line{}; + line.SizeOfStruct = sizeof(IMAGEHLP_LINE64); + + printf("Memory leak detected, %u bytes allocated by thread %u at:\n", allocations[i].allocated_size, allocations[i].thread_id); + for (size_t j = 0; j < MAX_FRAMES; ++j) { + const DWORD64 address = reinterpret_cast(allocations[i].stack_trace[j]); + DWORD64 t1 = 0; + DWORD t2 = 0; + if (SymFromAddr(h, address, &t1, pSymbol) && SymGetLineFromAddr64(h, address, &t2, &line)) { + const char* s = line.FileName; + const char* file_name = nullptr; + while (*s) { + if ((*s == '\\') || (*s == '/')) { + file_name = s + 1; + } + ++s; + } + printf("%-25s %s (line %lu)\n", file_name ? file_name : line.FileName, pSymbol->Name, line.LineNumber); + } + } + printf("\n"); + } + } + + if (total_leaks > 0) { + printf("%I64u bytes leaked\n\n", total_leaks); + } +} + +NOINLINE void* operator new(size_t n) { return p2pool::allocate(n); } +NOINLINE void* operator new[](size_t n) { return p2pool::allocate(n); } +NOINLINE void* operator new(size_t n, const std::nothrow_t&) noexcept { return p2pool::allocate(n); } +NOINLINE void* operator new[](size_t n, const std::nothrow_t&) noexcept { return p2pool::allocate(n); } +NOINLINE void operator delete(void* p) noexcept { p2pool::deallocate(p); } +NOINLINE void operator delete[](void* p) noexcept { p2pool::deallocate(p); } +NOINLINE void operator delete(void* p, size_t) noexcept { p2pool::deallocate(p); } +NOINLINE void operator delete[](void* p, size_t) noexcept { p2pool::deallocate(p); } + +#else +void memory_tracking_start() {} +void memory_tracking_stop() {} +#endif diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 2552da9..a2b5abc 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -37,7 +37,7 @@ static constexpr uint64_t DEFAULT_BAN_TIME = 600; namespace p2pool { P2PServer::P2PServer(p2pool* pool) - : TCPServer(P2PClient::allocate, pool->params().m_p2pAddresses) + : TCPServer(P2PClient::allocate) , m_pool(pool) , m_cache(new BlockCache()) , m_cacheLoaded(false) @@ -81,6 +81,7 @@ P2PServer::P2PServer(p2pool* pool) panic(); } + start_listening(pool->params().m_p2pAddresses); connect_to_peers(pool->params().m_p2pPeerList); load_saved_peer_list(); } diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index 7b74441..3bb3bb9 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -31,7 +31,7 @@ static constexpr uint64_t DEFAULT_BAN_TIME = 600; namespace p2pool { StratumServer::StratumServer(p2pool* pool) - : TCPServer(StratumClient::allocate, pool->params().m_stratumAddresses) + : TCPServer(StratumClient::allocate) , m_pool(pool) , m_extraNonce(0) , m_rd{} @@ -53,6 +53,8 @@ StratumServer::StratumServer(p2pool* pool) } m_blobsAsync.data = this; m_blobsQueue.reserve(2); + + start_listening(pool->params().m_stratumAddresses); } StratumServer::~StratumServer() diff --git a/src/tcp_server.h b/src/tcp_server.h index 5dc5aa5..c2774bd 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -30,7 +30,7 @@ public: struct Client; typedef Client* (*allocate_client_callback)(); - TCPServer(allocate_client_callback allocate_new_client, const std::string& listen_addresses); + TCPServer(allocate_client_callback allocate_new_client); virtual ~TCPServer(); template @@ -38,7 +38,7 @@ public: bool connect_to_peer(bool is_v6, const char* ip, int port); - void drop_connections(); + void drop_connections() { uv_async_send(&m_dropConnectionsAsync); } void shutdown_tcp(); virtual void print_status(); @@ -169,17 +169,20 @@ private: allocate_client_callback m_allocateNewClient; - void start_listening(const std::string& listen_addresses); + void close_sockets(bool listen_sockets); std::vector m_listenSockets6; std::vector m_listenSockets; uv_thread_t m_loopThread; protected: + void start_listening(const std::string& listen_addresses); + std::atomic m_finished{ 0 }; int m_listenPort; uv_loop_t m_loop; + volatile bool m_loopStopped; uv_mutex_t m_clientsListLock; std::vector m_preallocatedClients; @@ -192,6 +195,19 @@ protected: uv_mutex_t m_pendingConnectionsLock; std::set m_pendingConnections; + + uv_async_t m_dropConnectionsAsync; + static void on_drop_connections(uv_async_t* async) { reinterpret_cast(async->data)->close_sockets(false); } + + uv_async_t m_shutdownAsync; + static void on_shutdown(uv_async_t* async) + { + TCPServer* server = reinterpret_cast(async->data); + server->close_sockets(true); + + uv_close(reinterpret_cast(&server->m_dropConnectionsAsync), nullptr); + uv_close(reinterpret_cast(&server->m_shutdownAsync), nullptr); + } }; } // namespace p2pool diff --git a/src/tcp_server.inl b/src/tcp_server.inl index df2c3b6..af4160e 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -22,9 +22,10 @@ static thread_local bool server_event_loop_thread = false; namespace p2pool { template -TCPServer::TCPServer(allocate_client_callback allocate_new_client, const std::string& listen_addresses) +TCPServer::TCPServer(allocate_client_callback allocate_new_client) : m_allocateNewClient(allocate_new_client) , m_listenPort(-1) + , m_loopStopped(false) , m_numConnections(0) , m_numIncomingConnections(0) { @@ -34,6 +35,12 @@ TCPServer::TCPServer(allocate_client_callback all panic(); } + uv_async_init(&m_loop, &m_dropConnectionsAsync, on_drop_connections); + m_dropConnectionsAsync.data = this; + + uv_async_init(&m_loop, &m_shutdownAsync, on_shutdown); + m_shutdownAsync.data = this; + uv_mutex_init_checked(&m_clientsListLock); uv_mutex_init_checked(&m_bansLock); uv_mutex_init_checked(&m_pendingConnectionsLock); @@ -47,8 +54,6 @@ TCPServer::TCPServer(allocate_client_callback all m_connectedClientsList->m_next = m_connectedClientsList; m_connectedClientsList->m_prev = m_connectedClientsList; - start_listening(listen_addresses); - err = uv_thread_create(&m_loopThread, loop, this); if (err) { LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err)); @@ -63,6 +68,7 @@ TCPServer::~TCPServer() LOGERR(1, "TCP wasn't shutdown properly"); shutdown_tcp(); } + delete m_connectedClientsList; } @@ -361,8 +367,21 @@ bool TCPServer::connect_to_peer_nolock(Client* cl } template -void TCPServer::drop_connections() +void TCPServer::close_sockets(bool listen_sockets) { + if (!server_event_loop_thread) { + LOGERR(1, "closing sockets from another thread, this is not thread safe"); + } + + if (listen_sockets) { + for (uv_tcp_t* s : m_listenSockets6) { + uv_close(reinterpret_cast(s), [](uv_handle_t* h) { delete reinterpret_cast(h); }); + } + for (uv_tcp_t* s : m_listenSockets) { + uv_close(reinterpret_cast(s), [](uv_handle_t* h) { delete reinterpret_cast(h); }); + } + } + MutexLock lock(m_clientsListLock); size_t numClosed = 0; @@ -387,36 +406,37 @@ void TCPServer::shutdown_tcp() return; } - for (uv_tcp_t* s : m_listenSockets6) { - uv_close(reinterpret_cast(s), [](uv_handle_t* h) { delete reinterpret_cast(h); }); - } + uv_async_send(&m_shutdownAsync); - for (uv_tcp_t* s : m_listenSockets) { - uv_close(reinterpret_cast(s), [](uv_handle_t* h) { delete reinterpret_cast(h); }); - } - - drop_connections(); - - // Give it 1 second to gracefully close connections using namespace std::chrono; const system_clock::time_point start_time = system_clock::now(); - volatile uint32_t* n = &m_numConnections; + uint32_t counter = 0; + uv_async_t asy; - while (*n > 0) { - if (duration_cast(system_clock::now() - start_time).count() >= 1000) { - break; + constexpr uint32_t timeout_seconds = 30; + + while (!m_loopStopped) { + const int64_t elapsed_time = duration_cast(system_clock::now() - start_time).count(); + + if (elapsed_time >= (counter + 1) * 1000) { + ++counter; + if (counter < timeout_seconds) { + LOGINFO(5, "waiting for event loop to stop for " << (timeout_seconds - counter) << " more seconds..."); + } + else { + LOGWARN(5, "timed out while waiting for event loop to stop"); + uv_async_init(&m_loop, &asy, nullptr); + uv_stop(&m_loop); + uv_async_send(&asy); + break; + } } + std::this_thread::sleep_for(milliseconds(1)); } - uv_async_t asy; - uv_async_init(&m_loop, &asy, NULL); - uv_stop(&m_loop); - uv_async_send(&asy); - uv_thread_join(&m_loopThread); - uv_loop_close(&m_loop); for (Client* c : m_preallocatedClients) { delete c; @@ -425,6 +445,7 @@ void TCPServer::shutdown_tcp() uv_mutex_destroy(&m_clientsListLock); uv_mutex_destroy(&m_bansLock); uv_mutex_destroy(&m_pendingConnectionsLock); + LOGINFO(1, "stopped"); } @@ -447,7 +468,7 @@ template bool TCPServer::send_internal(Client* client, SendCallbackBase&& callback) { if (!server_event_loop_thread) { - LOGERR(1, "sending data from another thread, this is not safe"); + LOGERR(1, "sending data from another thread, this is not thread safe"); } MutexLock lock0(client->m_sendLock); @@ -507,7 +528,10 @@ void TCPServer::loop(void* data) { LOGINFO(1, "event loop started"); server_event_loop_thread = true; - uv_run(&static_cast(data)->m_loop, UV_RUN_DEFAULT); + TCPServer* server = static_cast(data); + uv_run(&server->m_loop, UV_RUN_DEFAULT); + uv_loop_close(&server->m_loop); + server->m_loopStopped = true; } template