TCPServer: reduced write buffer reallocations

This commit is contained in:
SChernykh 2023-04-17 16:22:46 +02:00
parent 80352f6b30
commit 8420f5f1b1
2 changed files with 27 additions and 13 deletions

View file

@ -18,6 +18,7 @@
#pragma once #pragma once
#include "uv_util.h" #include "uv_util.h"
#include <map>
namespace p2pool { namespace p2pool {
@ -110,9 +111,9 @@ public:
size_t m_dataCapacity = 0; size_t m_dataCapacity = 0;
}; };
std::vector<WriteBuf*> m_writeBuffers; std::multimap<size_t, WriteBuf*> m_writeBuffers;
WriteBuf* get_write_buffer(); WriteBuf* get_write_buffer(size_t size_hint);
void return_write_buffer(WriteBuf* buf); void return_write_buffer(WriteBuf* buf);
struct SendCallbackBase struct SendCallbackBase

View file

@ -492,8 +492,6 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
return true; return true;
} }
WriteBuf* buf = get_write_buffer();
// callback_buf is used in only 1 thread, so it's safe // callback_buf is used in only 1 thread, so it's safe
static uint8_t callback_buf[WRITE_BUF_SIZE]; static uint8_t callback_buf[WRITE_BUF_SIZE];
const size_t bytes_written = callback(callback_buf, sizeof(callback_buf)); const size_t bytes_written = callback(callback_buf, sizeof(callback_buf));
@ -505,10 +503,11 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
if (bytes_written == 0) { if (bytes_written == 0) {
LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); LOGWARN(1, "send callback wrote 0 bytes, nothing to do");
return_write_buffer(buf);
return true; return true;
} }
WriteBuf* buf = get_write_buffer(bytes_written);
buf->m_write.data = buf; buf->m_write.data = buf;
buf->m_client = client; buf->m_client = client;
@ -544,14 +543,17 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
server_event_loop_thread = true; server_event_loop_thread = true;
TCPServer* server = static_cast<TCPServer*>(data); TCPServer* server = static_cast<TCPServer*>(data);
server->m_writeBuffers.resize(DEFAULT_BACKLOG);
server->m_preallocatedClients.reserve(DEFAULT_BACKLOG); server->m_preallocatedClients.reserve(DEFAULT_BACKLOG);
for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) { for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) {
WriteBuf* wb = new WriteBuf(); WriteBuf* wb = new WriteBuf();
const size_t capacity = wb->m_dataCapacity;
Client* c = server->m_allocateNewClient(); Client* c = server->m_allocateNewClient();
ASAN_POISON_MEMORY_REGION(wb, sizeof(WriteBuf)); ASAN_POISON_MEMORY_REGION(wb, sizeof(WriteBuf));
ASAN_POISON_MEMORY_REGION(c, c->size()); ASAN_POISON_MEMORY_REGION(c, c->size());
server->m_writeBuffers[i] = wb;
server->m_writeBuffers.emplace(capacity, wb);
server->m_preallocatedClients.emplace_back(c); server->m_preallocatedClients.emplace_back(c);
} }
@ -565,7 +567,9 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
LOGWARN(1, "uv_loop_close returned error " << uv_err_name(err)); LOGWARN(1, "uv_loop_close returned error " << uv_err_name(err));
} }
for (WriteBuf* buf : server->m_writeBuffers) { for (const auto& it : server->m_writeBuffers) {
WriteBuf* buf = it.second;
ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf)); ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf));
if (buf->m_data) { if (buf->m_data) {
ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity);
@ -861,13 +865,20 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_shutdown(uv_async_t* async)
} }
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE> template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::WriteBuf* TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::get_write_buffer() typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::WriteBuf* TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::get_write_buffer(size_t size_hint)
{ {
WriteBuf* buf; WriteBuf* buf;
if (!m_writeBuffers.empty()) { if (!m_writeBuffers.empty()) {
buf = m_writeBuffers.back(); // Try to find the smallest buffer that still has enough capacity
m_writeBuffers.pop_back(); // If there is no buffer with enough capacity, just take the largest available buffer
auto it = m_writeBuffers.lower_bound(size_hint);
if (it == m_writeBuffers.end()) {
it = std::prev(it);
}
buf = it->second;
m_writeBuffers.erase(it);
ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf)); ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf));
if (buf->m_data) { if (buf->m_data) {
@ -884,12 +895,14 @@ typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::WriteBuf* TCPServer<READ_BUF_
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE> template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_write_buffer(WriteBuf* buf) void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_write_buffer(WriteBuf* buf)
{ {
const size_t capacity = buf->m_dataCapacity;
if (buf->m_data) { if (buf->m_data) {
ASAN_POISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); ASAN_POISON_MEMORY_REGION(buf->m_data, capacity);
} }
ASAN_POISON_MEMORY_REGION(buf, sizeof(WriteBuf)); ASAN_POISON_MEMORY_REGION(buf, sizeof(WriteBuf));
m_writeBuffers.push_back(buf); m_writeBuffers.emplace(capacity, buf);
} }
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE> template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>