diff --git a/src/tcp_server.h b/src/tcp_server.h index 0d30124..bac83c0 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -89,21 +89,21 @@ public: char m_readBuf[READ_BUF_SIZE]; uint32_t m_numRead; - struct WriteBuf - { - Client* m_client; - uv_write_t m_write; - char m_data[WRITE_BUF_SIZE]; - }; - - uv_mutex_t m_writeBuffersLock; - std::vector m_writeBuffers; - std::atomic m_resetCounter{ 0 }; uv_mutex_t m_sendLock; }; + struct WriteBuf + { + Client* m_client; + uv_write_t m_write; + char m_data[WRITE_BUF_SIZE]; + }; + + uv_mutex_t m_writeBuffersLock; + std::vector m_writeBuffers; + struct SendCallbackBase { virtual ~SendCallbackBase() {} diff --git a/src/tcp_server.inl b/src/tcp_server.inl index 73d61ba..0753c03 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -46,6 +46,12 @@ TCPServer::TCPServer(allocate_client_callback all uv_mutex_init_checked(&m_clientsListLock); uv_mutex_init_checked(&m_bansLock); uv_mutex_init_checked(&m_pendingConnectionsLock); + uv_mutex_init_checked(&m_writeBuffersLock); + + m_writeBuffers.resize(DEFAULT_BACKLOG); + for (size_t i = 0; i < m_writeBuffers.size(); ++i) { + m_writeBuffers[i] = new WriteBuf(); + } m_preallocatedClients.reserve(DEFAULT_BACKLOG); for (int i = 0; i < DEFAULT_BACKLOG; ++i) { @@ -465,6 +471,14 @@ void TCPServer::shutdown_tcp() uv_mutex_destroy(&m_bansLock); uv_mutex_destroy(&m_pendingConnectionsLock); + { + MutexLock lock(m_writeBuffersLock); + for (WriteBuf* buf : m_writeBuffers) { + delete buf; + } + } + uv_mutex_destroy(&m_writeBuffersLock); + LOGINFO(1, "stopped"); } @@ -492,18 +506,18 @@ bool TCPServer::send_internal(Client* client, Sen MutexLock lock0(client->m_sendLock); - typename Client::WriteBuf* buf = nullptr; + WriteBuf* buf = nullptr; { - MutexLock lock(client->m_writeBuffersLock); - if (!client->m_writeBuffers.empty()) { - buf = client->m_writeBuffers.back(); - client->m_writeBuffers.pop_back(); + MutexLock lock(m_writeBuffersLock); + if (!m_writeBuffers.empty()) { + buf = m_writeBuffers.back(); + m_writeBuffers.pop_back(); } } if (!buf) { - buf = new typename Client::WriteBuf(); + buf = new WriteBuf(); } const size_t bytes_written = callback(buf->m_data); @@ -516,8 +530,8 @@ bool TCPServer::send_internal(Client* client, Sen if (bytes_written == 0) { LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); { - MutexLock lock(client->m_writeBuffersLock); - client->m_writeBuffers.push_back(buf); + MutexLock lock(m_writeBuffersLock); + m_writeBuffers.push_back(buf); } return true; } @@ -532,8 +546,8 @@ bool TCPServer::send_internal(Client* client, Sen const int err = uv_write(&buf->m_write, reinterpret_cast(&client->m_socket), bufs, 1, Client::on_write); if (err) { { - MutexLock lock(client->m_writeBuffersLock); - client->m_writeBuffers.push_back(buf); + MutexLock lock(m_writeBuffersLock); + m_writeBuffers.push_back(buf); } LOGWARN(1, "failed to start writing data to client connection " << static_cast(client->m_addrString) << ", error " << uv_err_name(err)); return false; @@ -764,27 +778,14 @@ TCPServer::Client::Client() { Client::reset(); - uv_mutex_init_checked(&m_writeBuffersLock); uv_mutex_init_checked(&m_sendLock); m_readBuf[0] = '\0'; - - m_writeBuffers.resize(2); - for (size_t i = 0; i < m_writeBuffers.size(); ++i) { - m_writeBuffers[i] = new WriteBuf(); - } } template TCPServer::Client::~Client() { - { - MutexLock lock(m_writeBuffersLock); - for (WriteBuf* buf : m_writeBuffers) { - delete buf; - } - } - uv_mutex_destroy(&m_writeBuffersLock); uv_mutex_destroy(&m_sendLock); } @@ -861,12 +862,13 @@ void TCPServer::Client::on_read(uv_stream_t* stre template void TCPServer::Client::on_write(uv_write_t* req, int status) { - Client::WriteBuf* buf = static_cast(req->data); + WriteBuf* buf = static_cast(req->data); Client* client = buf->m_client; + TCPServer* server = client->m_owner; - { - MutexLock lock(client->m_writeBuffersLock); - client->m_writeBuffers.push_back(buf); + if (server) { + MutexLock lock(server->m_writeBuffersLock); + server->m_writeBuffers.push_back(buf); } if (status != 0) {