TCPServer: make write buffers global

Reduced memory usage per connection.
This commit is contained in:
SChernykh 2021-10-31 15:25:59 +01:00
parent 998c2ba72f
commit dff2413cae
2 changed files with 39 additions and 37 deletions

View file

@ -89,6 +89,11 @@ public:
char m_readBuf[READ_BUF_SIZE]; char m_readBuf[READ_BUF_SIZE];
uint32_t m_numRead; uint32_t m_numRead;
std::atomic<uint32_t> m_resetCounter{ 0 };
uv_mutex_t m_sendLock;
};
struct WriteBuf struct WriteBuf
{ {
Client* m_client; Client* m_client;
@ -99,11 +104,6 @@ public:
uv_mutex_t m_writeBuffersLock; uv_mutex_t m_writeBuffersLock;
std::vector<WriteBuf*> m_writeBuffers; std::vector<WriteBuf*> m_writeBuffers;
std::atomic<uint32_t> m_resetCounter{ 0 };
uv_mutex_t m_sendLock;
};
struct SendCallbackBase struct SendCallbackBase
{ {
virtual ~SendCallbackBase() {} virtual ~SendCallbackBase() {}

View file

@ -46,6 +46,12 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::TCPServer(allocate_client_callback all
uv_mutex_init_checked(&m_clientsListLock); uv_mutex_init_checked(&m_clientsListLock);
uv_mutex_init_checked(&m_bansLock); uv_mutex_init_checked(&m_bansLock);
uv_mutex_init_checked(&m_pendingConnectionsLock); uv_mutex_init_checked(&m_pendingConnectionsLock);
uv_mutex_init_checked(&m_writeBuffersLock);
m_writeBuffers.resize(DEFAULT_BACKLOG);
for (size_t i = 0; i < m_writeBuffers.size(); ++i) {
m_writeBuffers[i] = new WriteBuf();
}
m_preallocatedClients.reserve(DEFAULT_BACKLOG); m_preallocatedClients.reserve(DEFAULT_BACKLOG);
for (int i = 0; i < DEFAULT_BACKLOG; ++i) { for (int i = 0; i < DEFAULT_BACKLOG; ++i) {
@ -465,6 +471,14 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::shutdown_tcp()
uv_mutex_destroy(&m_bansLock); uv_mutex_destroy(&m_bansLock);
uv_mutex_destroy(&m_pendingConnectionsLock); uv_mutex_destroy(&m_pendingConnectionsLock);
{
MutexLock lock(m_writeBuffersLock);
for (WriteBuf* buf : m_writeBuffers) {
delete buf;
}
}
uv_mutex_destroy(&m_writeBuffersLock);
LOGINFO(1, "stopped"); LOGINFO(1, "stopped");
} }
@ -492,18 +506,18 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
MutexLock lock0(client->m_sendLock); MutexLock lock0(client->m_sendLock);
typename Client::WriteBuf* buf = nullptr; WriteBuf* buf = nullptr;
{ {
MutexLock lock(client->m_writeBuffersLock); MutexLock lock(m_writeBuffersLock);
if (!client->m_writeBuffers.empty()) { if (!m_writeBuffers.empty()) {
buf = client->m_writeBuffers.back(); buf = m_writeBuffers.back();
client->m_writeBuffers.pop_back(); m_writeBuffers.pop_back();
} }
} }
if (!buf) { if (!buf) {
buf = new typename Client::WriteBuf(); buf = new WriteBuf();
} }
const size_t bytes_written = callback(buf->m_data); const size_t bytes_written = callback(buf->m_data);
@ -516,8 +530,8 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
if (bytes_written == 0) { if (bytes_written == 0) {
LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); LOGWARN(1, "send callback wrote 0 bytes, nothing to do");
{ {
MutexLock lock(client->m_writeBuffersLock); MutexLock lock(m_writeBuffersLock);
client->m_writeBuffers.push_back(buf); m_writeBuffers.push_back(buf);
} }
return true; return true;
} }
@ -532,8 +546,8 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write); const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write);
if (err) { if (err) {
{ {
MutexLock lock(client->m_writeBuffersLock); MutexLock lock(m_writeBuffersLock);
client->m_writeBuffers.push_back(buf); m_writeBuffers.push_back(buf);
} }
LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err)); LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
return false; return false;
@ -764,27 +778,14 @@ TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
{ {
Client::reset(); Client::reset();
uv_mutex_init_checked(&m_writeBuffersLock);
uv_mutex_init_checked(&m_sendLock); uv_mutex_init_checked(&m_sendLock);
m_readBuf[0] = '\0'; m_readBuf[0] = '\0';
m_writeBuffers.resize(2);
for (size_t i = 0; i < m_writeBuffers.size(); ++i) {
m_writeBuffers[i] = new WriteBuf();
}
} }
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE> template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::~Client() TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::~Client()
{ {
{
MutexLock lock(m_writeBuffersLock);
for (WriteBuf* buf : m_writeBuffers) {
delete buf;
}
}
uv_mutex_destroy(&m_writeBuffersLock);
uv_mutex_destroy(&m_sendLock); uv_mutex_destroy(&m_sendLock);
} }
@ -861,12 +862,13 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_read(uv_stream_t* stre
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE> template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req, int status) void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req, int status)
{ {
Client::WriteBuf* buf = static_cast<Client::WriteBuf*>(req->data); WriteBuf* buf = static_cast<WriteBuf*>(req->data);
Client* client = buf->m_client; Client* client = buf->m_client;
TCPServer* server = client->m_owner;
{ if (server) {
MutexLock lock(client->m_writeBuffersLock); MutexLock lock(server->m_writeBuffersLock);
client->m_writeBuffers.push_back(buf); server->m_writeBuffers.push_back(buf);
} }
if (status != 0) { if (status != 0) {