TCPServer: fixed data race during shutdown

This commit is contained in:
SChernykh 2022-10-18 18:41:58 +02:00
parent 0342e7ffb5
commit d081c8ea74
2 changed files with 70 additions and 43 deletions

View file

@ -163,7 +163,6 @@ protected:
int m_listenPort;
uv_loop_t m_loop;
std::atomic<bool> m_loopStopped;
uv_mutex_t m_clientsListLock;
std::vector<Client*> m_preallocatedClients;
@ -184,17 +183,12 @@ protected:
virtual void on_shutdown() = 0;
uv_async_t m_shutdownAsync;
static void on_shutdown(uv_async_t* async)
{
TCPServer* server = reinterpret_cast<TCPServer*>(async->data);
server->on_shutdown();
server->close_sockets(true);
uv_prepare_t m_shutdownPrepare;
uv_timer_t m_shutdownTimer;
uint32_t m_shutdownCountdown;
uint32_t m_numHandles;
uv_close(reinterpret_cast<uv_handle_t*>(&server->m_dropConnectionsAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&server->m_shutdownAsync), nullptr);
delete GetLoopUserData(&server->m_loop, false);
}
static void on_shutdown(uv_async_t* async);
};
} // namespace p2pool

View file

@ -31,9 +31,12 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
, m_finished(0)
, m_listenPort(-1)
, m_loop{}
, m_loopStopped{false}
, m_numConnections{ 0 }
, m_numIncomingConnections{ 0 }
, m_shutdownPrepare{}
, m_shutdownTimer{}
, m_shutdownCountdown(30)
, m_numHandles(0)
{
int err = uv_loop_init(&m_loop);
if (err) {
@ -433,35 +436,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
}
uv_async_send(&m_shutdownAsync);
using namespace std::chrono;
const auto start_time = steady_clock::now();
int64_t counter = 0;
uv_async_t asy;
constexpr uint32_t timeout_seconds = 30;
while (!m_loopStopped) {
const int64_t elapsed_time = duration_cast<milliseconds>(steady_clock::now() - start_time).count();
if (elapsed_time >= (counter + 1) * 1000) {
++counter;
if (counter < timeout_seconds) {
LOGINFO(1, "waiting for event loop to stop for " << (timeout_seconds - counter) << " more seconds...");
}
else {
LOGWARN(1, "timed out while waiting for event loop to stop");
uv_async_init(&m_loop, &asy, [](uv_async_t* h) { uv_close(reinterpret_cast<uv_handle_t*>(h), nullptr); });
uv_stop(&m_loop);
uv_async_send(&asy);
break;
}
}
std::this_thread::sleep_for(milliseconds(1));
}
uv_thread_join(&m_loopThread);
uv_mutex_destroy(&m_clientsListLock);
@ -598,7 +572,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
server->m_preallocatedClients.clear();
LOGINFO(1, "event loop stopped");
server->m_loopStopped = true;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
@ -729,7 +702,6 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
on_new_client(server, client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server, Client* client)
{
@ -825,6 +797,67 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
}
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_shutdown(uv_async_t* async)
{
TCPServer* s = reinterpret_cast<TCPServer*>(async->data);
s->on_shutdown();
s->close_sockets(true);
uv_close(reinterpret_cast<uv_handle_t*>(&s->m_dropConnectionsAsync), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&s->m_shutdownAsync), nullptr);
delete GetLoopUserData(&s->m_loop, false);
s->m_numHandles = 0;
uv_walk(&s->m_loop, [](uv_handle_t*, void* n) { (*reinterpret_cast<size_t*>(n))++; }, &s->m_numHandles);
uv_prepare_init(&s->m_loop, &s->m_shutdownPrepare);
s->m_shutdownPrepare.data = s;
uv_timer_init(&s->m_loop, &s->m_shutdownTimer);
s->m_shutdownTimer.data = s;
s->m_shutdownCountdown = 30;
uv_timer_start(&s->m_shutdownTimer,
[](uv_timer_t* h)
{
TCPServer* s = reinterpret_cast<TCPServer*>(h->data);
const uint32_t k = --s->m_shutdownCountdown;
if (k > 0) {
LOGINFO(1, "waiting for event loop to stop for " << k << " more seconds (" << s->m_numHandles << " handles left)...");
}
else {
LOGINFO(1, "force stopping the event loop...");
uv_timer_stop(&s->m_shutdownTimer);
uv_prepare_stop(&s->m_shutdownPrepare);
uv_close(reinterpret_cast<uv_handle_t*>(&s->m_shutdownTimer), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&s->m_shutdownPrepare), nullptr);
uv_stop(&s->m_loop);
}
}, 1000, 1000);
uv_prepare_start(&s->m_shutdownPrepare,
[](uv_prepare_t* h)
{
TCPServer* s = reinterpret_cast<TCPServer*>(h->data);
s->m_numHandles = 0;
uv_walk(&s->m_loop, [](uv_handle_t*, void* n) { (*reinterpret_cast<size_t*>(n))++; }, &s->m_numHandles);
if (s->m_numHandles > 2) {
// Don't count m_shutdownTimer and m_shutdownPrepare
s->m_numHandles -= 2;
}
else {
uv_timer_stop(&s->m_shutdownTimer);
uv_prepare_stop(&s->m_shutdownPrepare);
uv_close(reinterpret_cast<uv_handle_t*>(&s->m_shutdownTimer), nullptr);
uv_close(reinterpret_cast<uv_handle_t*>(&s->m_shutdownPrepare), nullptr);
}
});
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
: m_owner(nullptr)