mirror of
https://github.com/SChernykh/p2pool.git
synced 2024-11-16 15:57:39 +00:00
TCPServer fixes
- Proper shutdown sequence, uv_close() must be called from the event loop thread - Moved start_listening() to child class constructor because it must be ready before it can listen on sockets - Added simple memory leak detector for Windows. Linux users can enjoy the leak sanitizer
This commit is contained in:
parent
27e85a922b
commit
3f1ee9ce4b
7 changed files with 303 additions and 33 deletions
|
@ -53,6 +53,7 @@ set(SOURCES
|
||||||
src/keccak.cpp
|
src/keccak.cpp
|
||||||
src/log.cpp
|
src/log.cpp
|
||||||
src/main.cpp
|
src/main.cpp
|
||||||
|
src/memory_leak_debug.cpp
|
||||||
src/mempool.cpp
|
src/mempool.cpp
|
||||||
src/p2p_server.cpp
|
src/p2p_server.cpp
|
||||||
src/p2pool.cpp
|
src/p2pool.cpp
|
||||||
|
|
15
src/main.cpp
15
src/main.cpp
|
@ -43,6 +43,9 @@ static void usage()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void memory_tracking_start();
|
||||||
|
void memory_tracking_stop();
|
||||||
|
|
||||||
int main(int argc, char* argv[])
|
int main(int argc, char* argv[])
|
||||||
{
|
{
|
||||||
if (argc == 1) {
|
if (argc == 1) {
|
||||||
|
@ -57,6 +60,14 @@ int main(int argc, char* argv[])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p2pool::p2pool pool(argc, argv);
|
int result;
|
||||||
return pool.run();
|
|
||||||
|
memory_tracking_start();
|
||||||
|
{
|
||||||
|
p2pool::p2pool pool(argc, argv);
|
||||||
|
result = pool.run();
|
||||||
|
}
|
||||||
|
memory_tracking_stop();
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
215
src/memory_leak_debug.cpp
Normal file
215
src/memory_leak_debug.cpp
Normal file
|
@ -0,0 +1,215 @@
|
||||||
|
/*
|
||||||
|
* This file is part of the Monero P2Pool <https://github.com/SChernykh/p2pool>
|
||||||
|
* Copyright (c) 2021 SChernykh <https://github.com/SChernykh>
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, version 3.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but
|
||||||
|
* WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
* General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
// Simple memory leak detector for Windows users, works best in RelWithDebInfo configuration.
|
||||||
|
#if defined(_WIN32) && 0
|
||||||
|
|
||||||
|
#include "uv_util.h"
|
||||||
|
#include <atomic>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
#include "dbghelp.h"
|
||||||
|
|
||||||
|
#pragma comment(lib, "Dbghelp.lib")
|
||||||
|
|
||||||
|
namespace p2pool {
|
||||||
|
|
||||||
|
static bool track_memory = false;
|
||||||
|
|
||||||
|
constexpr size_t N = 1048576;
|
||||||
|
constexpr size_t MAX_FRAMES = 30;
|
||||||
|
|
||||||
|
struct TrackedAllocation
|
||||||
|
{
|
||||||
|
void* p;
|
||||||
|
void* stack_trace[MAX_FRAMES];
|
||||||
|
uint32_t thread_id;
|
||||||
|
uint32_t allocated_size;
|
||||||
|
};
|
||||||
|
|
||||||
|
static_assert(sizeof(TrackedAllocation) == 256, "");
|
||||||
|
|
||||||
|
uv_mutex_t allocation_lock;
|
||||||
|
std::hash<void*> hasher;
|
||||||
|
uint32_t first[N];
|
||||||
|
uint32_t next[N];
|
||||||
|
TrackedAllocation allocations[N];
|
||||||
|
uint32_t num_allocations = 0;
|
||||||
|
uint32_t cur_allocation_index = 1;
|
||||||
|
|
||||||
|
FORCEINLINE static void add_alocation(void* p, size_t size)
|
||||||
|
{
|
||||||
|
if (!track_memory) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* stack_trace[MAX_FRAMES];
|
||||||
|
DWORD hash;
|
||||||
|
CaptureStackBackTrace(1, MAX_FRAMES, stack_trace, &hash);
|
||||||
|
|
||||||
|
const DWORD thread_id = GetCurrentThreadId();
|
||||||
|
|
||||||
|
const size_t index = hasher(p) & (N - 1);
|
||||||
|
|
||||||
|
p2pool::MutexLock lock(allocation_lock);
|
||||||
|
|
||||||
|
++num_allocations;
|
||||||
|
if (num_allocations >= N / 2) {
|
||||||
|
// Make N two times bigger if this triggers
|
||||||
|
__debugbreak();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (uint64_t i = cur_allocation_index;; i = (i + 1) & (N - 1)) {
|
||||||
|
if (i && !allocations[i].allocated_size) {
|
||||||
|
cur_allocation_index = static_cast<uint32_t>(i);
|
||||||
|
allocations[i].allocated_size = static_cast<uint32_t>(size);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TrackedAllocation& t = allocations[cur_allocation_index];
|
||||||
|
t.p = p;
|
||||||
|
memcpy(t.stack_trace, stack_trace, sizeof(stack_trace));
|
||||||
|
next[cur_allocation_index] = first[index];
|
||||||
|
t.thread_id = thread_id;
|
||||||
|
first[index] = cur_allocation_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
FORCEINLINE static void remove_allocation(void* p)
|
||||||
|
{
|
||||||
|
if (!track_memory || !p) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
p2pool::MutexLock lock(allocation_lock);
|
||||||
|
|
||||||
|
--num_allocations;
|
||||||
|
|
||||||
|
const size_t index = hasher(p) & (N - 1);
|
||||||
|
|
||||||
|
bool found = false;
|
||||||
|
for (uint32_t prev = 0, k = first[index]; k != 0; prev = k, k = next[k]) {
|
||||||
|
if (allocations[k].p == p) {
|
||||||
|
found = true;
|
||||||
|
allocations[k].allocated_size = 0;
|
||||||
|
if (prev) {
|
||||||
|
next[prev] = next[k];
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
first[index] = next[k];
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) {
|
||||||
|
// Someone tried to deallocate a pointer that wasn't allocated before
|
||||||
|
__debugbreak();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FORCEINLINE static void* allocate(size_t n)
|
||||||
|
{
|
||||||
|
void* p = malloc(n + sizeof(TrackedAllocation));
|
||||||
|
if (!p) {
|
||||||
|
throw std::bad_alloc();
|
||||||
|
}
|
||||||
|
add_alocation(p, n);
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
FORCEINLINE static void deallocate(void* p)
|
||||||
|
{
|
||||||
|
remove_allocation(p);
|
||||||
|
free(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // p2pool
|
||||||
|
|
||||||
|
void memory_tracking_start()
|
||||||
|
{
|
||||||
|
using namespace p2pool;
|
||||||
|
|
||||||
|
uv_mutex_init_checked(&allocation_lock);
|
||||||
|
track_memory = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void memory_tracking_stop()
|
||||||
|
{
|
||||||
|
using namespace p2pool;
|
||||||
|
|
||||||
|
track_memory = false;
|
||||||
|
uv_mutex_destroy(&allocation_lock);
|
||||||
|
|
||||||
|
const HANDLE h = GetCurrentProcess();
|
||||||
|
SymInitialize(h, NULL, TRUE);
|
||||||
|
|
||||||
|
uint64_t total_leaks = 0;
|
||||||
|
|
||||||
|
for (uint32_t i = 0; i < N; ++i) {
|
||||||
|
if (allocations[i].allocated_size) {
|
||||||
|
total_leaks += allocations[i].allocated_size;
|
||||||
|
|
||||||
|
char buffer[sizeof(SYMBOL_INFO) + MAX_SYM_NAME * sizeof(TCHAR)] = {};
|
||||||
|
PSYMBOL_INFO pSymbol = reinterpret_cast<PSYMBOL_INFO>(buffer);
|
||||||
|
|
||||||
|
pSymbol->SizeOfStruct = sizeof(SYMBOL_INFO);
|
||||||
|
pSymbol->MaxNameLen = MAX_SYM_NAME;
|
||||||
|
|
||||||
|
IMAGEHLP_LINE64 line{};
|
||||||
|
line.SizeOfStruct = sizeof(IMAGEHLP_LINE64);
|
||||||
|
|
||||||
|
printf("Memory leak detected, %u bytes allocated by thread %u at:\n", allocations[i].allocated_size, allocations[i].thread_id);
|
||||||
|
for (size_t j = 0; j < MAX_FRAMES; ++j) {
|
||||||
|
const DWORD64 address = reinterpret_cast<DWORD64>(allocations[i].stack_trace[j]);
|
||||||
|
DWORD64 t1 = 0;
|
||||||
|
DWORD t2 = 0;
|
||||||
|
if (SymFromAddr(h, address, &t1, pSymbol) && SymGetLineFromAddr64(h, address, &t2, &line)) {
|
||||||
|
const char* s = line.FileName;
|
||||||
|
const char* file_name = nullptr;
|
||||||
|
while (*s) {
|
||||||
|
if ((*s == '\\') || (*s == '/')) {
|
||||||
|
file_name = s + 1;
|
||||||
|
}
|
||||||
|
++s;
|
||||||
|
}
|
||||||
|
printf("%-25s %s (line %lu)\n", file_name ? file_name : line.FileName, pSymbol->Name, line.LineNumber);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (total_leaks > 0) {
|
||||||
|
printf("%I64u bytes leaked\n\n", total_leaks);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
NOINLINE void* operator new(size_t n) { return p2pool::allocate(n); }
|
||||||
|
NOINLINE void* operator new[](size_t n) { return p2pool::allocate(n); }
|
||||||
|
NOINLINE void* operator new(size_t n, const std::nothrow_t&) noexcept { return p2pool::allocate(n); }
|
||||||
|
NOINLINE void* operator new[](size_t n, const std::nothrow_t&) noexcept { return p2pool::allocate(n); }
|
||||||
|
NOINLINE void operator delete(void* p) noexcept { p2pool::deallocate(p); }
|
||||||
|
NOINLINE void operator delete[](void* p) noexcept { p2pool::deallocate(p); }
|
||||||
|
NOINLINE void operator delete(void* p, size_t) noexcept { p2pool::deallocate(p); }
|
||||||
|
NOINLINE void operator delete[](void* p, size_t) noexcept { p2pool::deallocate(p); }
|
||||||
|
|
||||||
|
#else
|
||||||
|
void memory_tracking_start() {}
|
||||||
|
void memory_tracking_stop() {}
|
||||||
|
#endif
|
|
@ -37,7 +37,7 @@ static constexpr uint64_t DEFAULT_BAN_TIME = 600;
|
||||||
namespace p2pool {
|
namespace p2pool {
|
||||||
|
|
||||||
P2PServer::P2PServer(p2pool* pool)
|
P2PServer::P2PServer(p2pool* pool)
|
||||||
: TCPServer(P2PClient::allocate, pool->params().m_p2pAddresses)
|
: TCPServer(P2PClient::allocate)
|
||||||
, m_pool(pool)
|
, m_pool(pool)
|
||||||
, m_cache(new BlockCache())
|
, m_cache(new BlockCache())
|
||||||
, m_cacheLoaded(false)
|
, m_cacheLoaded(false)
|
||||||
|
@ -81,6 +81,7 @@ P2PServer::P2PServer(p2pool* pool)
|
||||||
panic();
|
panic();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start_listening(pool->params().m_p2pAddresses);
|
||||||
connect_to_peers(pool->params().m_p2pPeerList);
|
connect_to_peers(pool->params().m_p2pPeerList);
|
||||||
load_saved_peer_list();
|
load_saved_peer_list();
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ static constexpr uint64_t DEFAULT_BAN_TIME = 600;
|
||||||
namespace p2pool {
|
namespace p2pool {
|
||||||
|
|
||||||
StratumServer::StratumServer(p2pool* pool)
|
StratumServer::StratumServer(p2pool* pool)
|
||||||
: TCPServer(StratumClient::allocate, pool->params().m_stratumAddresses)
|
: TCPServer(StratumClient::allocate)
|
||||||
, m_pool(pool)
|
, m_pool(pool)
|
||||||
, m_extraNonce(0)
|
, m_extraNonce(0)
|
||||||
, m_rd{}
|
, m_rd{}
|
||||||
|
@ -53,6 +53,8 @@ StratumServer::StratumServer(p2pool* pool)
|
||||||
}
|
}
|
||||||
m_blobsAsync.data = this;
|
m_blobsAsync.data = this;
|
||||||
m_blobsQueue.reserve(2);
|
m_blobsQueue.reserve(2);
|
||||||
|
|
||||||
|
start_listening(pool->params().m_stratumAddresses);
|
||||||
}
|
}
|
||||||
|
|
||||||
StratumServer::~StratumServer()
|
StratumServer::~StratumServer()
|
||||||
|
|
|
@ -30,7 +30,7 @@ public:
|
||||||
struct Client;
|
struct Client;
|
||||||
typedef Client* (*allocate_client_callback)();
|
typedef Client* (*allocate_client_callback)();
|
||||||
|
|
||||||
TCPServer(allocate_client_callback allocate_new_client, const std::string& listen_addresses);
|
TCPServer(allocate_client_callback allocate_new_client);
|
||||||
virtual ~TCPServer();
|
virtual ~TCPServer();
|
||||||
|
|
||||||
template<typename T>
|
template<typename T>
|
||||||
|
@ -38,7 +38,7 @@ public:
|
||||||
|
|
||||||
bool connect_to_peer(bool is_v6, const char* ip, int port);
|
bool connect_to_peer(bool is_v6, const char* ip, int port);
|
||||||
|
|
||||||
void drop_connections();
|
void drop_connections() { uv_async_send(&m_dropConnectionsAsync); }
|
||||||
void shutdown_tcp();
|
void shutdown_tcp();
|
||||||
virtual void print_status();
|
virtual void print_status();
|
||||||
|
|
||||||
|
@ -169,17 +169,20 @@ private:
|
||||||
|
|
||||||
allocate_client_callback m_allocateNewClient;
|
allocate_client_callback m_allocateNewClient;
|
||||||
|
|
||||||
void start_listening(const std::string& listen_addresses);
|
void close_sockets(bool listen_sockets);
|
||||||
|
|
||||||
std::vector<uv_tcp_t*> m_listenSockets6;
|
std::vector<uv_tcp_t*> m_listenSockets6;
|
||||||
std::vector<uv_tcp_t*> m_listenSockets;
|
std::vector<uv_tcp_t*> m_listenSockets;
|
||||||
uv_thread_t m_loopThread;
|
uv_thread_t m_loopThread;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
void start_listening(const std::string& listen_addresses);
|
||||||
|
|
||||||
std::atomic<int> m_finished{ 0 };
|
std::atomic<int> m_finished{ 0 };
|
||||||
int m_listenPort;
|
int m_listenPort;
|
||||||
|
|
||||||
uv_loop_t m_loop;
|
uv_loop_t m_loop;
|
||||||
|
volatile bool m_loopStopped;
|
||||||
|
|
||||||
uv_mutex_t m_clientsListLock;
|
uv_mutex_t m_clientsListLock;
|
||||||
std::vector<Client*> m_preallocatedClients;
|
std::vector<Client*> m_preallocatedClients;
|
||||||
|
@ -192,6 +195,19 @@ protected:
|
||||||
|
|
||||||
uv_mutex_t m_pendingConnectionsLock;
|
uv_mutex_t m_pendingConnectionsLock;
|
||||||
std::set<raw_ip> m_pendingConnections;
|
std::set<raw_ip> m_pendingConnections;
|
||||||
|
|
||||||
|
uv_async_t m_dropConnectionsAsync;
|
||||||
|
static void on_drop_connections(uv_async_t* async) { reinterpret_cast<TCPServer*>(async->data)->close_sockets(false); }
|
||||||
|
|
||||||
|
uv_async_t m_shutdownAsync;
|
||||||
|
static void on_shutdown(uv_async_t* async)
|
||||||
|
{
|
||||||
|
TCPServer* server = reinterpret_cast<TCPServer*>(async->data);
|
||||||
|
server->close_sockets(true);
|
||||||
|
|
||||||
|
uv_close(reinterpret_cast<uv_handle_t*>(&server->m_dropConnectionsAsync), nullptr);
|
||||||
|
uv_close(reinterpret_cast<uv_handle_t*>(&server->m_shutdownAsync), nullptr);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace p2pool
|
} // namespace p2pool
|
||||||
|
|
|
@ -22,9 +22,10 @@ static thread_local bool server_event_loop_thread = false;
|
||||||
namespace p2pool {
|
namespace p2pool {
|
||||||
|
|
||||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||||
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback allocate_new_client, const std::string& listen_addresses)
|
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback allocate_new_client)
|
||||||
: m_allocateNewClient(allocate_new_client)
|
: m_allocateNewClient(allocate_new_client)
|
||||||
, m_listenPort(-1)
|
, m_listenPort(-1)
|
||||||
|
, m_loopStopped(false)
|
||||||
, m_numConnections(0)
|
, m_numConnections(0)
|
||||||
, m_numIncomingConnections(0)
|
, m_numIncomingConnections(0)
|
||||||
{
|
{
|
||||||
|
@ -34,6 +35,12 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
|
||||||
panic();
|
panic();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uv_async_init(&m_loop, &m_dropConnectionsAsync, on_drop_connections);
|
||||||
|
m_dropConnectionsAsync.data = this;
|
||||||
|
|
||||||
|
uv_async_init(&m_loop, &m_shutdownAsync, on_shutdown);
|
||||||
|
m_shutdownAsync.data = this;
|
||||||
|
|
||||||
uv_mutex_init_checked(&m_clientsListLock);
|
uv_mutex_init_checked(&m_clientsListLock);
|
||||||
uv_mutex_init_checked(&m_bansLock);
|
uv_mutex_init_checked(&m_bansLock);
|
||||||
uv_mutex_init_checked(&m_pendingConnectionsLock);
|
uv_mutex_init_checked(&m_pendingConnectionsLock);
|
||||||
|
@ -47,8 +54,6 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
|
||||||
m_connectedClientsList->m_next = m_connectedClientsList;
|
m_connectedClientsList->m_next = m_connectedClientsList;
|
||||||
m_connectedClientsList->m_prev = m_connectedClientsList;
|
m_connectedClientsList->m_prev = m_connectedClientsList;
|
||||||
|
|
||||||
start_listening(listen_addresses);
|
|
||||||
|
|
||||||
err = uv_thread_create(&m_loopThread, loop, this);
|
err = uv_thread_create(&m_loopThread, loop, this);
|
||||||
if (err) {
|
if (err) {
|
||||||
LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err));
|
LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err));
|
||||||
|
@ -63,6 +68,7 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::~TCPServer()
|
||||||
LOGERR(1, "TCP wasn't shutdown properly");
|
LOGERR(1, "TCP wasn't shutdown properly");
|
||||||
shutdown_tcp();
|
shutdown_tcp();
|
||||||
}
|
}
|
||||||
|
|
||||||
delete m_connectedClientsList;
|
delete m_connectedClientsList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,8 +367,21 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer_nolock(Client* cl
|
||||||
}
|
}
|
||||||
|
|
||||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||||
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::drop_connections()
|
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::close_sockets(bool listen_sockets)
|
||||||
{
|
{
|
||||||
|
if (!server_event_loop_thread) {
|
||||||
|
LOGERR(1, "closing sockets from another thread, this is not thread safe");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen_sockets) {
|
||||||
|
for (uv_tcp_t* s : m_listenSockets6) {
|
||||||
|
uv_close(reinterpret_cast<uv_handle_t*>(s), [](uv_handle_t* h) { delete reinterpret_cast<uv_tcp_t*>(h); });
|
||||||
|
}
|
||||||
|
for (uv_tcp_t* s : m_listenSockets) {
|
||||||
|
uv_close(reinterpret_cast<uv_handle_t*>(s), [](uv_handle_t* h) { delete reinterpret_cast<uv_tcp_t*>(h); });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MutexLock lock(m_clientsListLock);
|
MutexLock lock(m_clientsListLock);
|
||||||
|
|
||||||
size_t numClosed = 0;
|
size_t numClosed = 0;
|
||||||
|
@ -387,36 +406,37 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (uv_tcp_t* s : m_listenSockets6) {
|
uv_async_send(&m_shutdownAsync);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(s), [](uv_handle_t* h) { delete reinterpret_cast<uv_tcp_t*>(h); });
|
|
||||||
}
|
|
||||||
|
|
||||||
for (uv_tcp_t* s : m_listenSockets) {
|
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(s), [](uv_handle_t* h) { delete reinterpret_cast<uv_tcp_t*>(h); });
|
|
||||||
}
|
|
||||||
|
|
||||||
drop_connections();
|
|
||||||
|
|
||||||
// Give it 1 second to gracefully close connections
|
|
||||||
using namespace std::chrono;
|
using namespace std::chrono;
|
||||||
|
|
||||||
const system_clock::time_point start_time = system_clock::now();
|
const system_clock::time_point start_time = system_clock::now();
|
||||||
volatile uint32_t* n = &m_numConnections;
|
uint32_t counter = 0;
|
||||||
|
uv_async_t asy;
|
||||||
|
|
||||||
while (*n > 0) {
|
constexpr uint32_t timeout_seconds = 30;
|
||||||
if (duration_cast<milliseconds>(system_clock::now() - start_time).count() >= 1000) {
|
|
||||||
break;
|
while (!m_loopStopped) {
|
||||||
|
const int64_t elapsed_time = duration_cast<milliseconds>(system_clock::now() - start_time).count();
|
||||||
|
|
||||||
|
if (elapsed_time >= (counter + 1) * 1000) {
|
||||||
|
++counter;
|
||||||
|
if (counter < timeout_seconds) {
|
||||||
|
LOGINFO(5, "waiting for event loop to stop for " << (timeout_seconds - counter) << " more seconds...");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
LOGWARN(5, "timed out while waiting for event loop to stop");
|
||||||
|
uv_async_init(&m_loop, &asy, nullptr);
|
||||||
|
uv_stop(&m_loop);
|
||||||
|
uv_async_send(&asy);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::this_thread::sleep_for(milliseconds(1));
|
std::this_thread::sleep_for(milliseconds(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_async_t asy;
|
|
||||||
uv_async_init(&m_loop, &asy, NULL);
|
|
||||||
uv_stop(&m_loop);
|
|
||||||
uv_async_send(&asy);
|
|
||||||
|
|
||||||
uv_thread_join(&m_loopThread);
|
uv_thread_join(&m_loopThread);
|
||||||
uv_loop_close(&m_loop);
|
|
||||||
|
|
||||||
for (Client* c : m_preallocatedClients) {
|
for (Client* c : m_preallocatedClients) {
|
||||||
delete c;
|
delete c;
|
||||||
|
@ -425,6 +445,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
|
||||||
uv_mutex_destroy(&m_clientsListLock);
|
uv_mutex_destroy(&m_clientsListLock);
|
||||||
uv_mutex_destroy(&m_bansLock);
|
uv_mutex_destroy(&m_bansLock);
|
||||||
uv_mutex_destroy(&m_pendingConnectionsLock);
|
uv_mutex_destroy(&m_pendingConnectionsLock);
|
||||||
|
|
||||||
LOGINFO(1, "stopped");
|
LOGINFO(1, "stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -447,7 +468,7 @@ template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||||
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, SendCallbackBase&& callback)
|
bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, SendCallbackBase&& callback)
|
||||||
{
|
{
|
||||||
if (!server_event_loop_thread) {
|
if (!server_event_loop_thread) {
|
||||||
LOGERR(1, "sending data from another thread, this is not safe");
|
LOGERR(1, "sending data from another thread, this is not thread safe");
|
||||||
}
|
}
|
||||||
|
|
||||||
MutexLock lock0(client->m_sendLock);
|
MutexLock lock0(client->m_sendLock);
|
||||||
|
@ -507,7 +528,10 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
|
||||||
{
|
{
|
||||||
LOGINFO(1, "event loop started");
|
LOGINFO(1, "event loop started");
|
||||||
server_event_loop_thread = true;
|
server_event_loop_thread = true;
|
||||||
uv_run(&static_cast<TCPServer*>(data)->m_loop, UV_RUN_DEFAULT);
|
TCPServer* server = static_cast<TCPServer*>(data);
|
||||||
|
uv_run(&server->m_loop, UV_RUN_DEFAULT);
|
||||||
|
uv_loop_close(&server->m_loop);
|
||||||
|
server->m_loopStopped = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
|
||||||
|
|
Loading…
Reference in a new issue