mirror of
https://github.com/monero-project/monero.git
synced 2024-11-17 00:07:38 +00:00
Merge pull request #7460
2935a0c
async_protocol_handler_config: fix deadlock (anon)c877705
async_protocol_handler_config: add deadlock demo (anon)
This commit is contained in:
commit
a2b046dafc
2 changed files with 134 additions and 28 deletions
|
@ -769,36 +769,32 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p
|
|||
template<class t_connection_context>
|
||||
void async_protocol_handler_config<t_connection_context>::delete_connections(size_t count, bool incoming)
|
||||
{
|
||||
std::vector <boost::uuids::uuid> connections;
|
||||
std::vector<typename connections_map::mapped_type> connections;
|
||||
|
||||
auto scope_exit_handler = misc_utils::create_scope_leave_handler([&connections]{
|
||||
for (auto &aph: connections)
|
||||
aph->finish_outer_call();
|
||||
});
|
||||
|
||||
CRITICAL_REGION_BEGIN(m_connects_lock);
|
||||
for (auto& c: m_connects)
|
||||
{
|
||||
if (c.second->m_connection_context.m_is_income == incoming)
|
||||
connections.push_back(c.first);
|
||||
if (c.second->start_outer_call())
|
||||
connections.push_back(c.second);
|
||||
}
|
||||
|
||||
// close random connections from the provided set
|
||||
// TODO or better just keep removing random elements (performance)
|
||||
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
|
||||
shuffle(connections.begin(), connections.end(), std::default_random_engine(seed));
|
||||
while (count > 0 && connections.size() > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto i = connections.end() - 1;
|
||||
async_protocol_handler<t_connection_context> *conn = m_connects.at(*i);
|
||||
m_connects.erase(*i);
|
||||
conn->close();
|
||||
connections.erase(i);
|
||||
}
|
||||
catch (const std::out_of_range &e)
|
||||
{
|
||||
MWARNING("Connection not found in m_connects, continuing");
|
||||
}
|
||||
--count;
|
||||
}
|
||||
for (size_t i = 0; i < connections.size() && i < count; ++i)
|
||||
m_connects.erase(connections[i]->get_connection_id());
|
||||
|
||||
CRITICAL_REGION_END();
|
||||
|
||||
for (size_t i = 0; i < connections.size() && i < count; ++i)
|
||||
connections[i]->close();
|
||||
}
|
||||
//------------------------------------------------------------------------------------------
|
||||
template<class t_connection_context>
|
||||
|
@ -860,18 +856,19 @@ int async_protocol_handler_config<t_connection_context>::invoke_async(int comman
|
|||
template<class t_connection_context> template<class callback_t>
|
||||
bool async_protocol_handler_config<t_connection_context>::foreach_connection(const callback_t &cb)
|
||||
{
|
||||
CRITICAL_REGION_LOCAL(m_connects_lock);
|
||||
std::vector<typename connections_map::mapped_type> conn;
|
||||
conn.reserve(m_connects.size());
|
||||
|
||||
auto scope_exit_handler = misc_utils::create_scope_leave_handler([&conn]{
|
||||
for (auto &aph: conn)
|
||||
aph->finish_outer_call();
|
||||
});
|
||||
|
||||
CRITICAL_REGION_BEGIN(m_connects_lock);
|
||||
conn.reserve(m_connects.size());
|
||||
for (auto &e: m_connects)
|
||||
if (e.second->start_outer_call())
|
||||
conn.push_back(e.second);
|
||||
CRITICAL_REGION_END()
|
||||
|
||||
for (auto &aph: conn)
|
||||
if (!cb(aph->get_context_ref()))
|
||||
|
@ -883,11 +880,8 @@ bool async_protocol_handler_config<t_connection_context>::foreach_connection(con
|
|||
template<class t_connection_context> template<class callback_t>
|
||||
bool async_protocol_handler_config<t_connection_context>::for_connection(const boost::uuids::uuid &connection_id, const callback_t &cb)
|
||||
{
|
||||
CRITICAL_REGION_LOCAL(m_connects_lock);
|
||||
async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
|
||||
if (!aph)
|
||||
return false;
|
||||
if (!aph->start_outer_call())
|
||||
async_protocol_handler<t_connection_context>* aph = nullptr;
|
||||
if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
|
||||
return false;
|
||||
auto scope_exit_handler = misc_utils::create_scope_leave_handler(
|
||||
boost::bind(&async_protocol_handler<t_connection_context>::finish_outer_call, aph));
|
||||
|
@ -953,12 +947,14 @@ int async_protocol_handler_config<t_connection_context>::send(byte_slice message
|
|||
template<class t_connection_context>
|
||||
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id)
|
||||
{
|
||||
CRITICAL_REGION_LOCAL(m_connects_lock);
|
||||
async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
|
||||
if (!aph)
|
||||
async_protocol_handler<t_connection_context>* aph = nullptr;
|
||||
if (find_and_lock_connection(connection_id, aph) != LEVIN_OK)
|
||||
return false;
|
||||
auto scope_exit_handler = misc_utils::create_scope_leave_handler(
|
||||
boost::bind(&async_protocol_handler<t_connection_context>::finish_outer_call, aph));
|
||||
if (!aph->close())
|
||||
return false;
|
||||
CRITICAL_REGION_LOCAL(m_connects_lock);
|
||||
m_connects.erase(connection_id);
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -170,6 +170,7 @@ TEST(test_epee_connection, test_lifetime)
|
|||
using connection_ptr = boost::shared_ptr<connection_t>;
|
||||
using shared_state_t = typename connection_t::shared_state;
|
||||
using shared_state_ptr = std::shared_ptr<shared_state_t>;
|
||||
using shared_states_t = std::vector<shared_state_ptr>;
|
||||
using tag_t = boost::uuids::uuid;
|
||||
using tags_t = std::vector<tag_t>;
|
||||
using io_context_t = boost::asio::io_service;
|
||||
|
@ -344,6 +345,115 @@ TEST(test_epee_connection, test_lifetime)
|
|||
shared_state->del_out_connections(1);
|
||||
ASSERT_TRUE(shared_state->get_connections_count() == 0);
|
||||
}
|
||||
|
||||
shared_states_t shared_states;
|
||||
while (shared_states.size() < 2) {
|
||||
shared_states.emplace_back(std::make_shared<shared_state_t>());
|
||||
shared_states.back()->set_handler(new command_handler_t(ZERO_DELAY,
|
||||
[&shared_states]{
|
||||
for (auto &s: shared_states) {
|
||||
auto success = s->foreach_connection([](context_t&){
|
||||
return true;
|
||||
});
|
||||
ASSERT_TRUE(success);
|
||||
}
|
||||
}
|
||||
),
|
||||
&command_handler_t::destroy
|
||||
);
|
||||
}
|
||||
workers_t workers;
|
||||
|
||||
for (auto &s: shared_states) {
|
||||
workers.emplace_back([&io_context, &s, &endpoint]{
|
||||
for (auto i = 0; i < N * N; ++i) {
|
||||
connection_ptr conn(new connection_t(io_context, s, {}, {}));
|
||||
conn->socket().connect(endpoint);
|
||||
conn->start({}, {});
|
||||
io_context.post([conn]{
|
||||
conn->cancel();
|
||||
});
|
||||
conn.reset();
|
||||
s->del_out_connections(1);
|
||||
while (s->sock_count);
|
||||
}
|
||||
});
|
||||
}
|
||||
for (;workers.size(); workers.pop_back())
|
||||
workers.back().join();
|
||||
|
||||
for (auto &s: shared_states) {
|
||||
workers.emplace_back([&io_context, &s, &endpoint]{
|
||||
for (auto i = 0; i < N * N; ++i) {
|
||||
connection_ptr conn(new connection_t(io_context, s, {}, {}));
|
||||
conn->socket().connect(endpoint);
|
||||
conn->start({}, {});
|
||||
conn->cancel();
|
||||
while (conn.use_count() > 1);
|
||||
s->foreach_connection([&io_context, &s, &endpoint, &conn](context_t& context){
|
||||
conn.reset(new connection_t(io_context, s, {}, {}));
|
||||
conn->socket().connect(endpoint);
|
||||
conn->start({}, {});
|
||||
conn->cancel();
|
||||
while (conn.use_count() > 1);
|
||||
conn.reset();
|
||||
return true;
|
||||
});
|
||||
while (s->sock_count);
|
||||
}
|
||||
});
|
||||
}
|
||||
for (;workers.size(); workers.pop_back())
|
||||
workers.back().join();
|
||||
|
||||
for (auto &s: shared_states) {
|
||||
workers.emplace_back([&io_context, &s, &endpoint]{
|
||||
for (auto i = 0; i < N; ++i) {
|
||||
connection_ptr conn(new connection_t(io_context, s, {}, {}));
|
||||
conn->socket().connect(endpoint);
|
||||
conn->start({}, {});
|
||||
context_t context;
|
||||
conn->get_context(context);
|
||||
auto tag = context.m_connection_id;
|
||||
conn->cancel();
|
||||
while (conn.use_count() > 1);
|
||||
s->for_connection(tag, [&io_context, &s, &endpoint, &conn](context_t& context){
|
||||
conn.reset(new connection_t(io_context, s, {}, {}));
|
||||
conn->socket().connect(endpoint);
|
||||
conn->start({}, {});
|
||||
conn->cancel();
|
||||
while (conn.use_count() > 1);
|
||||
conn.reset();
|
||||
return true;
|
||||
});
|
||||
while (s->sock_count);
|
||||
}
|
||||
});
|
||||
}
|
||||
for (;workers.size(); workers.pop_back())
|
||||
workers.back().join();
|
||||
|
||||
for (auto &s: shared_states) {
|
||||
workers.emplace_back([&io_context, &s, &endpoint]{
|
||||
for (auto i = 0; i < N; ++i) {
|
||||
connection_ptr conn(new connection_t(io_context, s, {}, {}));
|
||||
conn->socket().connect(endpoint);
|
||||
conn->start({}, {});
|
||||
context_t context;
|
||||
conn->get_context(context);
|
||||
auto tag = context.m_connection_id;
|
||||
io_context.post([conn]{
|
||||
conn->cancel();
|
||||
});
|
||||
conn.reset();
|
||||
s->close(tag);
|
||||
while (s->sock_count);
|
||||
}
|
||||
});
|
||||
}
|
||||
for (;workers.size(); workers.pop_back())
|
||||
workers.back().join();
|
||||
|
||||
});
|
||||
|
||||
for (auto& w: workers) {
|
||||
|
|
Loading…
Reference in a new issue