mirror of
https://github.com/monero-project/monero.git
synced 2025-01-23 19:15:57 +00:00
Merge pull request #7308
df2f00f
boosted_tcp_server: fix connection lifetime (anon)3833624
boosted_tcp_server: add segfault demo (anon)
This commit is contained in:
commit
9f6dcbd568
3 changed files with 169 additions and 7 deletions
|
@ -270,8 +270,6 @@ PRAGMA_WARNING_DISABLE_VS(4355)
|
||||||
//_dbg3("[sock " << socket().native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number);
|
//_dbg3("[sock " << socket().native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number);
|
||||||
CRITICAL_REGION_LOCAL(self->m_self_refs_lock);
|
CRITICAL_REGION_LOCAL(self->m_self_refs_lock);
|
||||||
//_dbg3("[sock " << socket().native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number);
|
//_dbg3("[sock " << socket().native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number);
|
||||||
if(m_was_shutdown)
|
|
||||||
return false;
|
|
||||||
++m_reference_count;
|
++m_reference_count;
|
||||||
m_self_ref = std::move(self);
|
m_self_ref = std::move(self);
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -891,12 +891,22 @@ template<class t_connection_context> template<class callback_t>
|
||||||
bool async_protocol_handler_config<t_connection_context>::foreach_connection(const callback_t &cb)
|
bool async_protocol_handler_config<t_connection_context>::foreach_connection(const callback_t &cb)
|
||||||
{
|
{
|
||||||
CRITICAL_REGION_LOCAL(m_connects_lock);
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
||||||
for(auto& c: m_connects)
|
std::vector<typename connections_map::mapped_type> conn;
|
||||||
{
|
conn.reserve(m_connects.size());
|
||||||
async_protocol_handler<t_connection_context>* aph = c.second;
|
|
||||||
if(!cb(aph->get_context_ref()))
|
auto scope_exit_handler = misc_utils::create_scope_leave_handler([&conn]{
|
||||||
|
for (auto &aph: conn)
|
||||||
|
aph->finish_outer_call();
|
||||||
|
});
|
||||||
|
|
||||||
|
for (auto &e: m_connects)
|
||||||
|
if (e.second->start_outer_call())
|
||||||
|
conn.push_back(e.second);
|
||||||
|
|
||||||
|
for (auto &aph: conn)
|
||||||
|
if (!cb(aph->get_context_ref()))
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
//------------------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------------------
|
||||||
|
@ -907,6 +917,10 @@ bool async_protocol_handler_config<t_connection_context>::for_connection(const b
|
||||||
async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
|
async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
|
||||||
if (!aph)
|
if (!aph)
|
||||||
return false;
|
return false;
|
||||||
|
if (!aph->start_outer_call())
|
||||||
|
return false;
|
||||||
|
auto scope_exit_handler = misc_utils::create_scope_leave_handler(
|
||||||
|
boost::bind(&async_protocol_handler<t_connection_context>::finish_outer_call, aph));
|
||||||
if(!cb(aph->get_context_ref()))
|
if(!cb(aph->get_context_ref()))
|
||||||
return false;
|
return false;
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
#include "include_base_utils.h"
|
#include "include_base_utils.h"
|
||||||
#include "string_tools.h"
|
#include "string_tools.h"
|
||||||
#include "net/abstract_tcp_server2.h"
|
#include "net/abstract_tcp_server2.h"
|
||||||
|
#include "net/levin_protocol_handler_async.h"
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
{
|
{
|
||||||
|
@ -132,3 +133,152 @@ TEST(boosted_tcp_server, worker_threads_are_exception_resistant)
|
||||||
ASSERT_TRUE(srv.timed_wait_server_stop(5 * 1000));
|
ASSERT_TRUE(srv.timed_wait_server_stop(5 * 1000));
|
||||||
ASSERT_TRUE(srv.deinit_server());
|
ASSERT_TRUE(srv.deinit_server());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST(test_epee_connection, test_lifetime)
|
||||||
|
{
|
||||||
|
struct context_t: epee::net_utils::connection_context_base {
|
||||||
|
static constexpr size_t get_max_bytes(int) noexcept { return -1; }
|
||||||
|
static constexpr int handshake_command() noexcept { return 1001; }
|
||||||
|
static constexpr bool handshake_complete() noexcept { return true; }
|
||||||
|
};
|
||||||
|
|
||||||
|
struct command_handler_t: epee::levin::levin_commands_handler<context_t> {
|
||||||
|
size_t delay;
|
||||||
|
command_handler_t(size_t delay = 0): delay(delay) {}
|
||||||
|
virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_slice&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; }
|
||||||
|
virtual int notify(int, const epee::span<const uint8_t>, context_t&) override { return {}; }
|
||||||
|
virtual void callback(context_t&) override {}
|
||||||
|
virtual void on_connection_new(context_t&) override {}
|
||||||
|
virtual void on_connection_close(context_t&) override {}
|
||||||
|
virtual ~command_handler_t() override {}
|
||||||
|
static void destroy(epee::levin::levin_commands_handler<context_t>* ptr) { delete ptr; }
|
||||||
|
};
|
||||||
|
|
||||||
|
using handler_t = epee::levin::async_protocol_handler<context_t>;
|
||||||
|
using connection_t = epee::net_utils::connection<handler_t>;
|
||||||
|
using connection_ptr = boost::shared_ptr<connection_t>;
|
||||||
|
using shared_state_t = typename connection_t::shared_state;
|
||||||
|
using shared_state_ptr = std::shared_ptr<shared_state_t>;
|
||||||
|
using tag_t = boost::uuids::uuid;
|
||||||
|
using tags_t = std::vector<tag_t>;
|
||||||
|
using io_context_t = boost::asio::io_service;
|
||||||
|
using endpoint_t = boost::asio::ip::tcp::endpoint;
|
||||||
|
using work_t = boost::asio::io_service::work;
|
||||||
|
using work_ptr = std::shared_ptr<work_t>;
|
||||||
|
using workers_t = std::vector<std::thread>;
|
||||||
|
using server_t = epee::net_utils::boosted_tcp_server<handler_t>;
|
||||||
|
|
||||||
|
io_context_t io_context;
|
||||||
|
work_ptr work(std::make_shared<work_t>(io_context));
|
||||||
|
|
||||||
|
workers_t workers;
|
||||||
|
while (workers.size() < 4) {
|
||||||
|
workers.emplace_back([&io_context]{
|
||||||
|
io_context.run();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint_t endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 5262);
|
||||||
|
server_t server(epee::net_utils::e_connection_type_P2P);
|
||||||
|
server.init_server(endpoint.port(),
|
||||||
|
endpoint.address().to_string(),
|
||||||
|
0,
|
||||||
|
"",
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
epee::net_utils::ssl_support_t::e_ssl_support_disabled
|
||||||
|
);
|
||||||
|
server.run_server(2, false);
|
||||||
|
server.get_config_shared()->set_handler(new command_handler_t, &command_handler_t::destroy);
|
||||||
|
|
||||||
|
io_context.post([&io_context, &work, &endpoint, &server]{
|
||||||
|
auto scope_exit_handler = epee::misc_utils::create_scope_leave_handler([&work]{
|
||||||
|
work.reset();
|
||||||
|
});
|
||||||
|
|
||||||
|
shared_state_ptr shared_state(std::make_shared<shared_state_t>());
|
||||||
|
shared_state->set_handler(new command_handler_t, &command_handler_t::destroy);
|
||||||
|
|
||||||
|
auto create_connection = [&io_context, &endpoint, &shared_state] {
|
||||||
|
connection_ptr conn(new connection_t(io_context, shared_state, {}, {}));
|
||||||
|
conn->socket().connect(endpoint);
|
||||||
|
conn->start({}, {});
|
||||||
|
context_t context;
|
||||||
|
conn->get_context(context);
|
||||||
|
auto tag = context.m_connection_id;
|
||||||
|
return tag;
|
||||||
|
};
|
||||||
|
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||||
|
auto tag = create_connection();
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 1);
|
||||||
|
bool success = shared_state->for_connection(tag, [shared_state](context_t& context){
|
||||||
|
shared_state->close(context.m_connection_id);
|
||||||
|
context.m_remote_address.get_zone();
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
ASSERT_TRUE(success);
|
||||||
|
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||||
|
constexpr auto N = 8;
|
||||||
|
tags_t tags(N);
|
||||||
|
for(auto &t: tags)
|
||||||
|
t = create_connection();
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == N);
|
||||||
|
size_t index = 0;
|
||||||
|
success = shared_state->foreach_connection([&index, shared_state, &tags, &create_connection](context_t& context){
|
||||||
|
if (!index)
|
||||||
|
for (const auto &t: tags)
|
||||||
|
shared_state->close(t);
|
||||||
|
|
||||||
|
shared_state->close(context.m_connection_id);
|
||||||
|
context.m_remote_address.get_zone();
|
||||||
|
++index;
|
||||||
|
|
||||||
|
for(auto i = 0; i < N; ++i)
|
||||||
|
create_connection();
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
ASSERT_TRUE(success);
|
||||||
|
ASSERT_TRUE(index == N);
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == N * N);
|
||||||
|
|
||||||
|
index = 0;
|
||||||
|
success = shared_state->foreach_connection([&index, shared_state](context_t& context){
|
||||||
|
shared_state->close(context.m_connection_id);
|
||||||
|
context.m_remote_address.get_zone();
|
||||||
|
++index;
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
ASSERT_TRUE(success);
|
||||||
|
ASSERT_TRUE(index == N * N);
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||||
|
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||||
|
constexpr auto DELAY = 30;
|
||||||
|
constexpr auto TIMEOUT = 1;
|
||||||
|
server.get_config_shared()->set_handler(new command_handler_t(DELAY), &command_handler_t::destroy);
|
||||||
|
for (auto i = 0; i < N; ++i) {
|
||||||
|
tag = create_connection();
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 1);
|
||||||
|
success = shared_state->invoke_async(1, {}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT);
|
||||||
|
ASSERT_TRUE(success);
|
||||||
|
while (shared_state->sock_count == 1) {
|
||||||
|
success = shared_state->foreach_connection([&shared_state, &tag](context_t&){
|
||||||
|
return shared_state->request_callback(tag);
|
||||||
|
});
|
||||||
|
ASSERT_TRUE(success);
|
||||||
|
}
|
||||||
|
shared_state->close(tag);
|
||||||
|
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (auto& w: workers) {
|
||||||
|
w.join();
|
||||||
|
}
|
||||||
|
server.send_stop_signal();
|
||||||
|
server.timed_wait_server_stop(5 * 1000);
|
||||||
|
server.deinit_server();
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue