From 8420f5f1b1184c1bc0350223310fd602db466716 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Mon, 17 Apr 2023 16:22:46 +0200 Subject: [PATCH] TCPServer: reduced write buffer reallocations --- src/tcp_server.h | 5 +++-- src/tcp_server.inl | 35 ++++++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/tcp_server.h b/src/tcp_server.h index 5677d16..3e84c79 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -18,6 +18,7 @@ #pragma once #include "uv_util.h" +#include namespace p2pool { @@ -110,9 +111,9 @@ public: size_t m_dataCapacity = 0; }; - std::vector m_writeBuffers; + std::multimap m_writeBuffers; - WriteBuf* get_write_buffer(); + WriteBuf* get_write_buffer(size_t size_hint); void return_write_buffer(WriteBuf* buf); struct SendCallbackBase diff --git a/src/tcp_server.inl b/src/tcp_server.inl index 4bf3427..a054cc8 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -492,8 +492,6 @@ bool TCPServer::send_internal(Client* client, Sen return true; } - WriteBuf* buf = get_write_buffer(); - // callback_buf is used in only 1 thread, so it's safe static uint8_t callback_buf[WRITE_BUF_SIZE]; const size_t bytes_written = callback(callback_buf, sizeof(callback_buf)); @@ -505,10 +503,11 @@ bool TCPServer::send_internal(Client* client, Sen if (bytes_written == 0) { LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); - return_write_buffer(buf); return true; } + WriteBuf* buf = get_write_buffer(bytes_written); + buf->m_write.data = buf; buf->m_client = client; @@ -544,14 +543,17 @@ void TCPServer::loop(void* data) server_event_loop_thread = true; TCPServer* server = static_cast(data); - server->m_writeBuffers.resize(DEFAULT_BACKLOG); server->m_preallocatedClients.reserve(DEFAULT_BACKLOG); for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) { WriteBuf* wb = new WriteBuf(); + const size_t capacity = wb->m_dataCapacity; + Client* c = server->m_allocateNewClient(); + ASAN_POISON_MEMORY_REGION(wb, sizeof(WriteBuf)); ASAN_POISON_MEMORY_REGION(c, c->size()); - server->m_writeBuffers[i] = wb; + + server->m_writeBuffers.emplace(capacity, wb); server->m_preallocatedClients.emplace_back(c); } @@ -565,7 +567,9 @@ void TCPServer::loop(void* data) LOGWARN(1, "uv_loop_close returned error " << uv_err_name(err)); } - for (WriteBuf* buf : server->m_writeBuffers) { + for (const auto& it : server->m_writeBuffers) { + WriteBuf* buf = it.second; + ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf)); if (buf->m_data) { ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); @@ -861,13 +865,20 @@ void TCPServer::on_shutdown(uv_async_t* async) } template -typename TCPServer::WriteBuf* TCPServer::get_write_buffer() +typename TCPServer::WriteBuf* TCPServer::get_write_buffer(size_t size_hint) { WriteBuf* buf; if (!m_writeBuffers.empty()) { - buf = m_writeBuffers.back(); - m_writeBuffers.pop_back(); + // Try to find the smallest buffer that still has enough capacity + // If there is no buffer with enough capacity, just take the largest available buffer + auto it = m_writeBuffers.lower_bound(size_hint); + if (it == m_writeBuffers.end()) { + it = std::prev(it); + } + + buf = it->second; + m_writeBuffers.erase(it); ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf)); if (buf->m_data) { @@ -884,12 +895,14 @@ typename TCPServer::WriteBuf* TCPServer void TCPServer::return_write_buffer(WriteBuf* buf) { + const size_t capacity = buf->m_dataCapacity; + if (buf->m_data) { - ASAN_POISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); + ASAN_POISON_MEMORY_REGION(buf->m_data, capacity); } ASAN_POISON_MEMORY_REGION(buf, sizeof(WriteBuf)); - m_writeBuffers.push_back(buf); + m_writeBuffers.emplace(capacity, buf); } template