From d081c8ea74384304815a1f69e46256276ea65563 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Tue, 18 Oct 2022 18:41:58 +0200 Subject: [PATCH] TCPServer: fixed data race during shutdown --- src/tcp_server.h | 16 +++----- src/tcp_server.inl | 97 +++++++++++++++++++++++++++++++--------------- 2 files changed, 70 insertions(+), 43 deletions(-) diff --git a/src/tcp_server.h b/src/tcp_server.h index 834f4f2..96bf876 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -163,7 +163,6 @@ protected: int m_listenPort; uv_loop_t m_loop; - std::atomic m_loopStopped; uv_mutex_t m_clientsListLock; std::vector m_preallocatedClients; @@ -184,17 +183,12 @@ protected: virtual void on_shutdown() = 0; uv_async_t m_shutdownAsync; - static void on_shutdown(uv_async_t* async) - { - TCPServer* server = reinterpret_cast(async->data); - server->on_shutdown(); - server->close_sockets(true); + uv_prepare_t m_shutdownPrepare; + uv_timer_t m_shutdownTimer; + uint32_t m_shutdownCountdown; + uint32_t m_numHandles; - uv_close(reinterpret_cast(&server->m_dropConnectionsAsync), nullptr); - uv_close(reinterpret_cast(&server->m_shutdownAsync), nullptr); - - delete GetLoopUserData(&server->m_loop, false); - } + static void on_shutdown(uv_async_t* async); }; } // namespace p2pool diff --git a/src/tcp_server.inl b/src/tcp_server.inl index f860709..4481eb0 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -31,9 +31,12 @@ TCPServer::TCPServer(allocate_client_callback all , m_finished(0) , m_listenPort(-1) , m_loop{} - , m_loopStopped{false} , m_numConnections{ 0 } , m_numIncomingConnections{ 0 } + , m_shutdownPrepare{} + , m_shutdownTimer{} + , m_shutdownCountdown(30) + , m_numHandles(0) { int err = uv_loop_init(&m_loop); if (err) { @@ -433,35 +436,6 @@ void TCPServer::shutdown_tcp() } uv_async_send(&m_shutdownAsync); - - using namespace std::chrono; - - const auto start_time = steady_clock::now(); - int64_t counter = 0; - uv_async_t asy; - - constexpr uint32_t timeout_seconds = 30; - - while (!m_loopStopped) { - const int64_t elapsed_time = duration_cast(steady_clock::now() - start_time).count(); - - if (elapsed_time >= (counter + 1) * 1000) { - ++counter; - if (counter < timeout_seconds) { - LOGINFO(1, "waiting for event loop to stop for " << (timeout_seconds - counter) << " more seconds..."); - } - else { - LOGWARN(1, "timed out while waiting for event loop to stop"); - uv_async_init(&m_loop, &asy, [](uv_async_t* h) { uv_close(reinterpret_cast(h), nullptr); }); - uv_stop(&m_loop); - uv_async_send(&asy); - break; - } - } - - std::this_thread::sleep_for(milliseconds(1)); - } - uv_thread_join(&m_loopThread); uv_mutex_destroy(&m_clientsListLock); @@ -598,7 +572,6 @@ void TCPServer::loop(void* data) server->m_preallocatedClients.clear(); LOGINFO(1, "event loop stopped"); - server->m_loopStopped = true; } template @@ -729,7 +702,6 @@ void TCPServer::on_new_client(uv_stream_t* server on_new_client(server, client); } - template void TCPServer::on_new_client(uv_stream_t* server, Client* client) { @@ -825,6 +797,67 @@ void TCPServer::on_new_client(uv_stream_t* server } } +template +void TCPServer::on_shutdown(uv_async_t* async) +{ + TCPServer* s = reinterpret_cast(async->data); + s->on_shutdown(); + s->close_sockets(true); + + uv_close(reinterpret_cast(&s->m_dropConnectionsAsync), nullptr); + uv_close(reinterpret_cast(&s->m_shutdownAsync), nullptr); + + delete GetLoopUserData(&s->m_loop, false); + + s->m_numHandles = 0; + uv_walk(&s->m_loop, [](uv_handle_t*, void* n) { (*reinterpret_cast(n))++; }, &s->m_numHandles); + + uv_prepare_init(&s->m_loop, &s->m_shutdownPrepare); + s->m_shutdownPrepare.data = s; + + uv_timer_init(&s->m_loop, &s->m_shutdownTimer); + s->m_shutdownTimer.data = s; + s->m_shutdownCountdown = 30; + + uv_timer_start(&s->m_shutdownTimer, + [](uv_timer_t* h) + { + TCPServer* s = reinterpret_cast(h->data); + const uint32_t k = --s->m_shutdownCountdown; + if (k > 0) { + LOGINFO(1, "waiting for event loop to stop for " << k << " more seconds (" << s->m_numHandles << " handles left)..."); + } + else { + LOGINFO(1, "force stopping the event loop..."); + uv_timer_stop(&s->m_shutdownTimer); + uv_prepare_stop(&s->m_shutdownPrepare); + uv_close(reinterpret_cast(&s->m_shutdownTimer), nullptr); + uv_close(reinterpret_cast(&s->m_shutdownPrepare), nullptr); + uv_stop(&s->m_loop); + } + }, 1000, 1000); + + uv_prepare_start(&s->m_shutdownPrepare, + [](uv_prepare_t* h) + { + TCPServer* s = reinterpret_cast(h->data); + + s->m_numHandles = 0; + uv_walk(&s->m_loop, [](uv_handle_t*, void* n) { (*reinterpret_cast(n))++; }, &s->m_numHandles); + + if (s->m_numHandles > 2) { + // Don't count m_shutdownTimer and m_shutdownPrepare + s->m_numHandles -= 2; + } + else { + uv_timer_stop(&s->m_shutdownTimer); + uv_prepare_stop(&s->m_shutdownPrepare); + uv_close(reinterpret_cast(&s->m_shutdownTimer), nullptr); + uv_close(reinterpret_cast(&s->m_shutdownPrepare), nullptr); + } + }); +} + template TCPServer::Client::Client() : m_owner(nullptr)